xref: /spdk/lib/bdev/bdev.c (revision 4d36735401a968630e14533383f8d8b8fd610b3a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/io_channel.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/bdev.h"
49 #include "spdk_internal/log.h"
50 #include "spdk/string.h"
51 
52 #ifdef SPDK_CONFIG_VTUNE
53 #include "ittnotify.h"
54 #include "ittnotify_types.h"
55 int __itt_init_ittlib(const char *, __itt_group_id);
56 #endif
57 
58 #define SPDK_BDEV_IO_POOL_SIZE		(64 * 1024)
59 #define SPDK_BDEV_IO_CACHE_SIZE		256
60 #define BUF_SMALL_POOL_SIZE		8192
61 #define BUF_LARGE_POOL_SIZE		1024
62 #define NOMEM_THRESHOLD_COUNT		8
63 #define ZERO_BUFFER_SIZE		0x100000
64 #define SPDK_BDEV_QOS_TIMESLICE_IN_US	1000
65 
66 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
67 typedef STAILQ_HEAD(, spdk_bdev_io) bdev_io_stailq_t;
68 
69 struct spdk_bdev_mgr {
70 	struct spdk_mempool *bdev_io_pool;
71 
72 	struct spdk_mempool *buf_small_pool;
73 	struct spdk_mempool *buf_large_pool;
74 
75 	void *zero_buffer;
76 
77 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
78 
79 	TAILQ_HEAD(, spdk_bdev) bdevs;
80 
81 	bool init_complete;
82 	bool module_init_complete;
83 
84 #ifdef SPDK_CONFIG_VTUNE
85 	__itt_domain	*domain;
86 #endif
87 };
88 
89 static struct spdk_bdev_mgr g_bdev_mgr = {
90 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
91 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
92 	.init_complete = false,
93 	.module_init_complete = false,
94 };
95 
96 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
97 static void			*g_init_cb_arg = NULL;
98 
99 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
100 static void			*g_fini_cb_arg = NULL;
101 static struct spdk_thread	*g_fini_thread = NULL;
102 
103 
104 struct spdk_bdev_mgmt_channel {
105 	bdev_io_stailq_t need_buf_small;
106 	bdev_io_stailq_t need_buf_large;
107 
108 	/*
109 	 * Each thread keeps a cache of bdev_io - this allows
110 	 *  bdev threads which are *not* DPDK threads to still
111 	 *  benefit from a per-thread bdev_io cache.  Without
112 	 *  this, non-DPDK threads fetching from the mempool
113 	 *  incur a cmpxchg on get and put.
114 	 */
115 	bdev_io_stailq_t per_thread_cache;
116 	uint32_t	per_thread_cache_count;
117 
118 	TAILQ_HEAD(, spdk_bdev_module_channel) module_channels;
119 };
120 
121 struct spdk_bdev_desc {
122 	struct spdk_bdev		*bdev;
123 	spdk_bdev_remove_cb_t		remove_cb;
124 	void				*remove_ctx;
125 	bool				write;
126 	TAILQ_ENTRY(spdk_bdev_desc)	link;
127 };
128 
129 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
130 
131 struct spdk_bdev_channel {
132 	struct spdk_bdev	*bdev;
133 
134 	/* The channel for the underlying device */
135 	struct spdk_io_channel	*channel;
136 
137 	/* Channel for the bdev manager */
138 	struct spdk_io_channel	*mgmt_channel;
139 
140 	struct spdk_bdev_io_stat stat;
141 
142 	bdev_io_tailq_t		queued_resets;
143 
144 	uint32_t		flags;
145 
146 	/*
147 	 * Rate limiting on this channel.
148 	 * Queue of I/O awaiting issue because QoS rate limiting is in effect
149 	 *  on this channel.
150 	 */
151 	bdev_io_tailq_t		qos_io;
152 
153 	/*
154 	 * Rate limiting on this channel.
155 	 * Maximum number of I/Os allowed to be issued in one timeslice (e.g., 1ms);
156 	 *  only valid for the master channel, which manages the outstanding I/Os.
157 	 */
158 	uint64_t		qos_max_ios_per_timeslice;
159 
160 	/*
161 	 * Rate limiting on this channel.
162 	 * Number of I/Os submitted in the current timeslice (e.g., 1ms).
163 	 */
164 	uint64_t		io_submitted_this_timeslice;
165 
166 	/*
167 	 * Rate limiting on this channel.
168 	 * QoS poller that runs periodically, with millisecond granularity.
169 	 */
170 	struct spdk_poller	*qos_poller;
171 
172 	/* Per-device channel */
173 	struct spdk_bdev_module_channel *module_ch;
174 
175 #ifdef SPDK_CONFIG_VTUNE
176 	uint64_t		start_tsc;
177 	uint64_t		interval_tsc;
178 	__itt_string_handle	*handle;
179 #endif
180 
181 };
182 
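/*
 * The bdev is registered as an io_device using an address offset by one byte
 * from the bdev pointer itself.  This is assumed to be done so that the
 * io_device handle cannot collide with any io_device a bdev module might
 * register using the bdev pointer directly.
 */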
183 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
184 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
185 
186 /*
187  * Per-module (or per-io_device) channel. Multiple bdevs built on the same io_device
188  * will queue their I/O awaiting retry here. This makes it possible to retry sending
189  * I/O to one bdev after I/O from another bdev completes.
190  */
191 struct spdk_bdev_module_channel {
192 	/*
193 	 * Count of I/O submitted to bdev module and waiting for completion.
194 	 * Incremented before submit_request() is called on an spdk_bdev_io.
195 	 */
196 	uint64_t		io_outstanding;
197 
198 	/*
199 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
200 	 *  on this channel.
201 	 */
202 	bdev_io_tailq_t		nomem_io;
203 
204 	/*
205 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
206 	 */
207 	uint64_t		nomem_threshold;
208 
209 	/* I/O channel allocated by a bdev module */
210 	struct spdk_io_channel	*module_ch;
211 
212 	uint32_t		ref;
213 
214 	TAILQ_ENTRY(spdk_bdev_module_channel) link;
215 };
216 
217 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
218 
219 struct spdk_bdev *
220 spdk_bdev_first(void)
221 {
222 	struct spdk_bdev *bdev;
223 
224 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
225 	if (bdev) {
226 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
227 	}
228 
229 	return bdev;
230 }
231 
232 struct spdk_bdev *
233 spdk_bdev_next(struct spdk_bdev *prev)
234 {
235 	struct spdk_bdev *bdev;
236 
237 	bdev = TAILQ_NEXT(prev, link);
238 	if (bdev) {
239 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
240 	}
241 
242 	return bdev;
243 }
244 
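/*
 * A "leaf" bdev is one that has not been claimed by a virtual bdev module
 *  (claim_module == NULL).
 */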
245 static struct spdk_bdev *
246 _bdev_next_leaf(struct spdk_bdev *bdev)
247 {
248 	while (bdev != NULL) {
249 		if (bdev->claim_module == NULL) {
250 			return bdev;
251 		} else {
252 			bdev = TAILQ_NEXT(bdev, link);
253 		}
254 	}
255 
256 	return bdev;
257 }
258 
259 struct spdk_bdev *
260 spdk_bdev_first_leaf(void)
261 {
262 	struct spdk_bdev *bdev;
263 
264 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
265 
266 	if (bdev) {
267 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
268 	}
269 
270 	return bdev;
271 }
272 
273 struct spdk_bdev *
274 spdk_bdev_next_leaf(struct spdk_bdev *prev)
275 {
276 	struct spdk_bdev *bdev;
277 
278 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
279 
280 	if (bdev) {
281 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
282 	}
283 
284 	return bdev;
285 }
286 
287 struct spdk_bdev *
288 spdk_bdev_get_by_name(const char *bdev_name)
289 {
290 	struct spdk_bdev_alias *tmp;
291 	struct spdk_bdev *bdev = spdk_bdev_first();
292 
293 	while (bdev != NULL) {
294 		if (strcmp(bdev_name, bdev->name) == 0) {
295 			return bdev;
296 		}
297 
298 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
299 			if (strcmp(bdev_name, tmp->alias) == 0) {
300 				return bdev;
301 			}
302 		}
303 
304 		bdev = spdk_bdev_next(bdev);
305 	}
306 
307 	return NULL;
308 }
309 
310 static void
311 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
312 {
313 	assert(bdev_io->get_buf_cb != NULL);
314 	assert(buf != NULL);
315 	assert(bdev_io->u.bdev.iovs != NULL);
316 
317 	bdev_io->buf = buf;
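	/* Buffers in the small/large pools are allocated with 512 bytes of slack
	 *  (see spdk_bdev_initialize()), so the data pointer can be rounded up to
	 *  the next 512-byte boundary here. */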
318 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
319 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
320 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
321 }
322 
323 static void
324 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
325 {
326 	struct spdk_mempool *pool;
327 	struct spdk_bdev_io *tmp;
328 	void *buf;
329 	bdev_io_stailq_t *stailq;
330 	struct spdk_bdev_mgmt_channel *ch;
331 
332 	assert(bdev_io->u.bdev.iovcnt == 1);
333 
334 	buf = bdev_io->buf;
335 	ch = bdev_io->mgmt_ch;
336 
337 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
338 		pool = g_bdev_mgr.buf_small_pool;
339 		stailq = &ch->need_buf_small;
340 	} else {
341 		pool = g_bdev_mgr.buf_large_pool;
342 		stailq = &ch->need_buf_large;
343 	}
344 
345 	if (STAILQ_EMPTY(stailq)) {
346 		spdk_mempool_put(pool, buf);
347 	} else {
348 		tmp = STAILQ_FIRST(stailq);
349 		STAILQ_REMOVE_HEAD(stailq, buf_link);
350 		spdk_bdev_io_set_buf(tmp, buf);
351 	}
352 }
353 
354 void
355 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
356 {
357 	struct spdk_mempool *pool;
358 	bdev_io_stailq_t *stailq;
359 	void *buf = NULL;
360 	struct spdk_bdev_mgmt_channel *ch;
361 
362 	assert(cb != NULL);
363 	assert(bdev_io->u.bdev.iovs != NULL);
364 
365 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
366 		/* Buffer already present */
367 		cb(bdev_io->ch->channel, bdev_io);
368 		return;
369 	}
370 
371 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
372 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
373 
374 	bdev_io->buf_len = len;
375 	bdev_io->get_buf_cb = cb;
376 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
377 		pool = g_bdev_mgr.buf_small_pool;
378 		stailq = &ch->need_buf_small;
379 	} else {
380 		pool = g_bdev_mgr.buf_large_pool;
381 		stailq = &ch->need_buf_large;
382 	}
383 
384 	buf = spdk_mempool_get(pool);
385 
386 	if (!buf) {
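		/* Pool is exhausted; park the request on the need_buf queue.  It will be
		 *  handed a buffer from spdk_bdev_io_put_buf() when another I/O releases one. */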
387 		STAILQ_INSERT_TAIL(stailq, bdev_io, buf_link);
388 	} else {
389 		spdk_bdev_io_set_buf(bdev_io, buf);
390 	}
391 }
392 
393 static int
394 spdk_bdev_module_get_max_ctx_size(void)
395 {
396 	struct spdk_bdev_module_if *bdev_module;
397 	int max_bdev_module_size = 0;
398 
399 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
400 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
401 			max_bdev_module_size = bdev_module->get_ctx_size();
402 		}
403 	}
404 
405 	return max_bdev_module_size;
406 }
407 
408 void
409 spdk_bdev_config_text(FILE *fp)
410 {
411 	struct spdk_bdev_module_if *bdev_module;
412 
413 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
414 		if (bdev_module->config_text) {
415 			bdev_module->config_text(fp);
416 		}
417 	}
418 }
419 
420 static int
421 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
422 {
423 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
424 
425 	STAILQ_INIT(&ch->need_buf_small);
426 	STAILQ_INIT(&ch->need_buf_large);
427 
428 	STAILQ_INIT(&ch->per_thread_cache);
429 	ch->per_thread_cache_count = 0;
430 
431 	TAILQ_INIT(&ch->module_channels);
432 
433 	return 0;
434 }
435 
436 static void
437 spdk_bdev_mgmt_channel_free_resources(struct spdk_bdev_mgmt_channel *ch)
438 {
439 	struct spdk_bdev_io *bdev_io;
440 
441 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
442 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n");
443 	}
444 
445 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
446 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
447 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
448 		ch->per_thread_cache_count--;
449 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
450 	}
451 
452 	assert(ch->per_thread_cache_count == 0);
453 }
454 
455 static void
456 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
457 {
458 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
459 
460 	spdk_bdev_mgmt_channel_free_resources(ch);
461 }
462 
463 static void
464 spdk_bdev_init_complete(int rc)
465 {
466 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
467 	void *cb_arg = g_init_cb_arg;
468 
469 	g_bdev_mgr.init_complete = true;
470 	g_init_cb_fn = NULL;
471 	g_init_cb_arg = NULL;
472 
473 	cb_fn(cb_arg, rc);
474 }
475 
476 static void
477 spdk_bdev_module_action_complete(void)
478 {
479 	struct spdk_bdev_module_if *m;
480 
481 	/*
482 	 * Don't finish bdev subsystem initialization if
483 	 * module pre-initialization is still in progress, or
484 	 * the subsystem has already been initialized.
485 	 */
486 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
487 		return;
488 	}
489 
490 	/*
491 	 * Check all bdev modules for inits/examinations in progress. If any
492 	 * exist, return immediately since we cannot finish bdev subsystem
493 	 * initialization until all are completed.
494 	 */
495 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
496 		if (m->action_in_progress > 0) {
497 			return;
498 		}
499 	}
500 
501 	/*
502 	 * Modules already finished initialization - now that all
503 	 * the bdev modules have finished their asynchronous I/O
504 	 * processing, the entire bdev layer can be marked as complete.
505 	 */
506 	spdk_bdev_init_complete(0);
507 }
508 
509 static void
510 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
511 {
512 	assert(module->action_in_progress > 0);
513 	module->action_in_progress--;
514 	spdk_bdev_module_action_complete();
515 }
516 
517 void
518 spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
519 {
520 	spdk_bdev_module_action_done(module);
521 }
522 
523 void
524 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
525 {
526 	spdk_bdev_module_action_done(module);
527 }
528 
529 static int
530 spdk_bdev_modules_init(void)
531 {
532 	struct spdk_bdev_module_if *module;
533 	int rc = 0;
534 
535 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
536 		rc = module->module_init();
537 		if (rc != 0) {
538 			break;
539 		}
540 	}
541 
542 	g_bdev_mgr.module_init_complete = true;
543 	return rc;
544 }
545 void
546 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
547 {
548 	int cache_size;
549 	int rc = 0;
550 	char mempool_name[32];
551 
552 	assert(cb_fn != NULL);
553 
554 	g_init_cb_fn = cb_fn;
555 	g_init_cb_arg = cb_arg;
556 
557 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
558 
559 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
560 				  SPDK_BDEV_IO_POOL_SIZE,
561 				  sizeof(struct spdk_bdev_io) +
562 				  spdk_bdev_module_get_max_ctx_size(),
563 				  0,
564 				  SPDK_ENV_SOCKET_ID_ANY);
565 
566 	if (g_bdev_mgr.bdev_io_pool == NULL) {
567 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
568 		spdk_bdev_init_complete(-1);
569 		return;
570 	}
571 
572 	/**
573 	 * Ensure no more than half of the total buffers end up in local caches, by
574 	 *   using spdk_env_get_core_count() to determine how many local caches we need
575 	 *   to account for.
576 	 */
577 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
578 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
579 
580 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
581 				    BUF_SMALL_POOL_SIZE,
582 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
583 				    cache_size,
584 				    SPDK_ENV_SOCKET_ID_ANY);
585 	if (!g_bdev_mgr.buf_small_pool) {
586 		SPDK_ERRLOG("create buf small pool failed\n");
587 		spdk_bdev_init_complete(-1);
588 		return;
589 	}
590 
591 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
592 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
593 
594 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
595 				    BUF_LARGE_POOL_SIZE,
596 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
597 				    cache_size,
598 				    SPDK_ENV_SOCKET_ID_ANY);
599 	if (!g_bdev_mgr.buf_large_pool) {
600 		SPDK_ERRLOG("create buf large pool failed\n");
601 		spdk_bdev_init_complete(-1);
602 		return;
603 	}
604 
605 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
606 				 NULL);
607 	if (!g_bdev_mgr.zero_buffer) {
608 		SPDK_ERRLOG("create bdev zero buffer failed\n");
609 		spdk_bdev_init_complete(-1);
610 		return;
611 	}
612 
613 #ifdef SPDK_CONFIG_VTUNE
614 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
615 #endif
616 
617 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
618 				spdk_bdev_mgmt_channel_destroy,
619 				sizeof(struct spdk_bdev_mgmt_channel));
620 
621 	rc = spdk_bdev_modules_init();
622 	if (rc != 0) {
623 		SPDK_ERRLOG("bdev modules init failed\n");
624 		spdk_bdev_init_complete(-1);
625 		return;
626 	}
627 
628 	spdk_bdev_module_action_complete();
629 }
630 
631 static void
632 spdk_bdev_module_finish_cb(void *io_device)
633 {
634 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
635 
636 	cb_fn(g_fini_cb_arg);
637 	g_fini_cb_fn = NULL;
638 	g_fini_cb_arg = NULL;
639 }
640 
641 static void
642 spdk_bdev_module_finish_complete(struct spdk_io_channel_iter *i, int status)
643 {
644 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
645 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
646 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
647 			    SPDK_BDEV_IO_POOL_SIZE);
648 	}
649 
650 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
651 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
652 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
653 			    BUF_SMALL_POOL_SIZE);
654 		assert(false);
655 	}
656 
657 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
658 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
659 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
660 			    BUF_LARGE_POOL_SIZE);
661 		assert(false);
662 	}
663 
664 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
665 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
666 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
667 	spdk_dma_free(g_bdev_mgr.zero_buffer);
668 
669 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb);
670 }
671 
672 static void
673 mgmt_channel_free_resources(struct spdk_io_channel_iter *i)
674 {
675 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
676 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
677 
678 	spdk_bdev_mgmt_channel_free_resources(ch);
679 	spdk_for_each_channel_continue(i, 0);
680 }
681 
682 static void
683 spdk_bdev_module_finish_iter(void *arg)
684 {
685 	/* Notice that this variable is static. It is saved between calls to
686 	 * this function. */
687 	static struct spdk_bdev_module_if *resume_bdev_module = NULL;
688 	struct spdk_bdev_module_if *bdev_module;
689 
690 	/* Start iterating from the last touched module */
691 	if (!resume_bdev_module) {
692 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
693 	} else {
694 		bdev_module = TAILQ_NEXT(resume_bdev_module, tailq);
695 	}
696 
697 	while (bdev_module) {
698 		if (bdev_module->async_fini) {
699 			/* Save our place so we can resume later. We must
700 			 * save the variable here, before calling module_fini()
701 			 * below, because in some cases the module may immediately
702 			 * call spdk_bdev_module_finish_done() and re-enter
703 			 * this function to continue iterating. */
704 			resume_bdev_module = bdev_module;
705 		}
706 
707 		if (bdev_module->module_fini) {
708 			bdev_module->module_fini();
709 		}
710 
711 		if (bdev_module->async_fini) {
712 			return;
713 		}
714 
715 		bdev_module = TAILQ_NEXT(bdev_module, tailq);
716 	}
717 
718 	resume_bdev_module = NULL;
719 	spdk_for_each_channel(&g_bdev_mgr, mgmt_channel_free_resources, NULL,
720 			      spdk_bdev_module_finish_complete);
721 }
722 
723 void
724 spdk_bdev_module_finish_done(void)
725 {
726 	if (spdk_get_thread() != g_fini_thread) {
727 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
728 	} else {
729 		spdk_bdev_module_finish_iter(NULL);
730 	}
731 }
732 
733 static void
734 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
735 {
736 	struct spdk_bdev *bdev = cb_arg;
737 
738 	if (bdeverrno && bdev) {
739 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
740 			     bdev->name);
741 
742 		/*
743 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
744 		 *  bdev; try to continue by manually removing this bdev from the list and moving
745 		 *  on to the next bdev in the list.
746 		 */
747 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
748 	}
749 
750 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
751 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
752 		spdk_bdev_module_finish_iter(NULL);
753 		return;
754 	}
755 
756 	/*
757 	 * Unregister the first bdev in the list.
758 	 *
759 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
760 	 *  calling the remove_cb of the descriptors first.
761 	 *
762 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
763 	 *  will be called again via the unregister completion callback to continue the cleanup
764 	 *  process with the next bdev.
765 	 */
766 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
767 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
768 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
769 }
770 
771 static void
772 _spdk_bdev_finish_unregister_bdevs(void)
773 {
774 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
775 }
776 
777 void
778 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
779 {
780 	assert(cb_fn != NULL);
781 
782 	g_fini_thread = spdk_get_thread();
783 
784 	g_fini_cb_fn = cb_fn;
785 	g_fini_cb_arg = cb_arg;
786 
787 	_spdk_bdev_finish_unregister_bdevs();
788 }
789 
790 static struct spdk_bdev_io *
791 spdk_bdev_get_io(struct spdk_io_channel *_ch)
792 {
793 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
794 	struct spdk_bdev_io *bdev_io;
795 
796 	if (ch->per_thread_cache_count > 0) {
797 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
798 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
799 		ch->per_thread_cache_count--;
800 	} else {
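		/* Per-thread cache is empty; fall back to the shared bdev_io mempool.
		 *  Exhaustion of the shared pool is treated as fatal below. */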
801 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
802 		if (!bdev_io) {
803 			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
804 			abort();
805 		}
806 	}
807 
808 	bdev_io->mgmt_ch = ch;
809 
810 	return bdev_io;
811 }
812 
813 static void
814 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
815 {
816 	struct spdk_bdev_mgmt_channel *ch = bdev_io->mgmt_ch;
817 
818 	if (bdev_io->buf != NULL) {
819 		spdk_bdev_io_put_buf(bdev_io);
820 	}
821 
822 	if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) {
823 		ch->per_thread_cache_count++;
824 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link);
825 	} else {
826 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
827 	}
828 }
829 
830 static void
831 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
832 {
833 	struct spdk_bdev *bdev = bdev_io->bdev;
834 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
835 	struct spdk_io_channel *ch = bdev_ch->channel;
836 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
837 
838 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
839 
840 	bdev_io->submit_tsc = spdk_get_ticks();
841 	shared_ch->io_outstanding++;
842 	bdev_io->in_submit_request = true;
843 	if (spdk_likely(bdev_ch->flags == 0)) {
844 		if (spdk_likely(TAILQ_EMPTY(&shared_ch->nomem_io))) {
845 			bdev->fn_table->submit_request(ch, bdev_io);
846 		} else {
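			/* The module previously returned NOMEM for other I/O on this shared
			 *  channel; queue behind them to preserve submission ordering. */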
847 			shared_ch->io_outstanding--;
848 			TAILQ_INSERT_TAIL(&shared_ch->nomem_io, bdev_io, link);
849 		}
850 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
851 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
852 	} else {
853 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
854 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
855 	}
856 	bdev_io->in_submit_request = false;
857 }
858 
859 static void
860 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
861 {
862 	struct spdk_bdev *bdev = bdev_io->bdev;
863 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
864 	struct spdk_io_channel *ch = bdev_ch->channel;
865 
866 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
867 
868 	bdev_io->in_submit_request = true;
869 	bdev->fn_table->submit_request(ch, bdev_io);
870 	bdev_io->in_submit_request = false;
871 }
872 
873 static void
874 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
875 		  struct spdk_bdev *bdev, void *cb_arg,
876 		  spdk_bdev_io_completion_cb cb)
877 {
878 	bdev_io->bdev = bdev;
879 	bdev_io->caller_ctx = cb_arg;
880 	bdev_io->cb = cb;
881 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
882 	bdev_io->in_submit_request = false;
883 	bdev_io->buf = NULL;
884 }
885 
886 bool
887 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
888 {
889 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
890 }
891 
892 int
893 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
894 {
895 	if (bdev->fn_table->dump_info_json) {
896 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
897 	}
898 
899 	return 0;
900 }
901 
902 static int
903 _spdk_bdev_channel_create(struct spdk_bdev_channel *ch, void *io_device)
904 {
905 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
906 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
907 	struct spdk_bdev_module_channel	*shared_ch;
908 
909 	ch->bdev = bdev;
910 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
911 	if (!ch->channel) {
912 		return -1;
913 	}
914 
915 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
916 	if (!ch->mgmt_channel) {
917 		return -1;
918 	}
919 
920 	mgmt_ch = spdk_io_channel_get_ctx(ch->mgmt_channel);
921 	TAILQ_FOREACH(shared_ch, &mgmt_ch->module_channels, link) {
922 		if (shared_ch->module_ch == ch->channel) {
923 			shared_ch->ref++;
924 			break;
925 		}
926 	}
927 
928 	if (shared_ch == NULL) {
929 		shared_ch = calloc(1, sizeof(*shared_ch));
930 		if (!shared_ch) {
931 			return -1;
932 		}
933 
934 		shared_ch->io_outstanding = 0;
935 		TAILQ_INIT(&shared_ch->nomem_io);
936 		shared_ch->nomem_threshold = 0;
937 		shared_ch->module_ch = ch->channel;
938 		shared_ch->ref = 1;
939 		TAILQ_INSERT_TAIL(&mgmt_ch->module_channels, shared_ch, link);
940 	}
941 
942 	memset(&ch->stat, 0, sizeof(ch->stat));
943 	TAILQ_INIT(&ch->queued_resets);
944 	TAILQ_INIT(&ch->qos_io);
945 	ch->qos_max_ios_per_timeslice = 0;
946 	ch->io_submitted_this_timeslice = 0;
947 	ch->qos_poller = NULL;
948 	ch->flags = 0;
949 	ch->module_ch = shared_ch;
950 
951 	return 0;
952 }
953 
954 static void
955 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
956 {
957 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
958 	struct spdk_bdev_module_channel	*shared_ch = ch->module_ch;
959 
960 	if (ch->channel) {
961 		spdk_put_io_channel(ch->channel);
962 	}
963 
964 	if (ch->mgmt_channel) {
965 		if (shared_ch) {
966 			assert(shared_ch->ref > 0);
967 			shared_ch->ref--;
968 			if (shared_ch->ref == 0) {
969 				mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
970 				assert(shared_ch->io_outstanding == 0);
971 				TAILQ_REMOVE(&mgmt_channel->module_channels, shared_ch, link);
972 				free(shared_ch);
973 			}
974 		}
975 		spdk_put_io_channel(ch->mgmt_channel);
976 	}
977 }
978 
979 static int
980 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
981 {
982 	struct spdk_bdev_channel	*ch = ctx_buf;
983 
984 	if (_spdk_bdev_channel_create(ch, io_device) != 0) {
985 		_spdk_bdev_channel_destroy_resource(ch);
986 		return -1;
987 	}
988 
989 #ifdef SPDK_CONFIG_VTUNE
990 	{
991 		char *name;
992 		__itt_init_ittlib(NULL, 0);
993 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
994 		if (!name) {
995 			_spdk_bdev_channel_destroy_resource(ch);
996 			return -1;
997 		}
998 		ch->handle = __itt_string_handle_create(name);
999 		free(name);
1000 		ch->start_tsc = spdk_get_ticks();
1001 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1002 	}
1003 #endif
1004 
1005 	return 0;
1006 }
1007 
1008 /*
1009  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1010  *  linked using the spdk_bdev_io buf_link STAILQ_ENTRY.
1011  */
1012 static void
1013 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1014 {
1015 	bdev_io_stailq_t tmp;
1016 	struct spdk_bdev_io *bdev_io;
1017 
1018 	STAILQ_INIT(&tmp);
1019 
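	/* Fail I/O that belong to this channel; collect everything else on a
	 *  temporary list and swap it back so other channels' requests keep
	 *  their original order. */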
1020 	while (!STAILQ_EMPTY(queue)) {
1021 		bdev_io = STAILQ_FIRST(queue);
1022 		STAILQ_REMOVE_HEAD(queue, buf_link);
1023 		if (bdev_io->ch == ch) {
1024 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1025 		} else {
1026 			STAILQ_INSERT_TAIL(&tmp, bdev_io, buf_link);
1027 		}
1028 	}
1029 
1030 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1031 }
1032 
1033 /*
1034  * Abort I/O that are queued waiting for submission.  These types of I/O are
1035  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1036  */
1037 static void
1038 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1039 {
1040 	struct spdk_bdev_io *bdev_io, *tmp;
1041 
1042 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
1043 		if (bdev_io->ch == ch) {
1044 			TAILQ_REMOVE(queue, bdev_io, link);
1045 			/*
1046 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1047 			 *  been submitted to the bdev module.  Since in this case it
1048 			 *  hadn't, bump io_outstanding to account for the decrement
1049 			 *  that spdk_bdev_io_complete() will do.
1050 			 */
1051 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1052 				ch->module_ch->io_outstanding++;
1053 			}
1054 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1055 		}
1056 	}
1057 }
1058 
1059 static void
1060 _spdk_bdev_channel_destroy(struct spdk_bdev_channel *ch)
1061 {
1062 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1063 	struct spdk_bdev_module_channel	*shared_ch = ch->module_ch;
1064 
1065 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
1066 
1067 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1068 	_spdk_bdev_abort_queued_io(&ch->qos_io, ch);
1069 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, ch);
1070 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
1071 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
1072 
1073 	_spdk_bdev_channel_destroy_resource(ch);
1074 }
1075 
1076 static void
1077 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1078 {
1079 	struct spdk_bdev_channel	*ch = ctx_buf;
1080 
1081 	_spdk_bdev_channel_destroy(ch);
1082 }
1083 
1084 int
1085 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1086 {
1087 	struct spdk_bdev_alias *tmp;
1088 
1089 	if (alias == NULL) {
1090 		SPDK_ERRLOG("NULL alias passed\n");
1091 		return -EINVAL;
1092 	}
1093 
1094 	if (spdk_bdev_get_by_name(alias)) {
1095 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1096 		return -EEXIST;
1097 	}
1098 
1099 	tmp = calloc(1, sizeof(*tmp));
1100 	if (tmp == NULL) {
1101 		SPDK_ERRLOG("Unable to allocate alias\n");
1102 		return -ENOMEM;
1103 	}
1104 
1105 	tmp->alias = strdup(alias);
1106 	if (tmp->alias == NULL) {
1107 		free(tmp);
1108 		SPDK_ERRLOG("Unable to allocate alias\n");
1109 		return -ENOMEM;
1110 	}
1111 
1112 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1113 
1114 	return 0;
1115 }
1116 
1117 int
1118 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1119 {
1120 	struct spdk_bdev_alias *tmp;
1121 
1122 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1123 		if (strcmp(alias, tmp->alias) == 0) {
1124 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1125 			free(tmp->alias);
1126 			free(tmp);
1127 			return 0;
1128 		}
1129 	}
1130 
1131 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1132 
1133 	return -ENOENT;
1134 }
1135 
1136 struct spdk_io_channel *
1137 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1138 {
1139 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1140 }
1141 
1142 const char *
1143 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1144 {
1145 	return bdev->name;
1146 }
1147 
1148 const char *
1149 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1150 {
1151 	return bdev->product_name;
1152 }
1153 
1154 const struct spdk_bdev_aliases_list *
1155 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1156 {
1157 	return &bdev->aliases;
1158 }
1159 
1160 uint32_t
1161 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1162 {
1163 	return bdev->blocklen;
1164 }
1165 
1166 uint64_t
1167 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1168 {
1169 	return bdev->blockcnt;
1170 }
1171 
1172 size_t
1173 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1174 {
1175 	/* TODO: push this logic down to the bdev modules */
1176 	if (bdev->need_aligned_buffer) {
1177 		return bdev->blocklen;
1178 	}
1179 
1180 	return 1;
1181 }
1182 
1183 uint32_t
1184 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1185 {
1186 	return bdev->optimal_io_boundary;
1187 }
1188 
1189 bool
1190 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1191 {
1192 	return bdev->write_cache;
1193 }
1194 
1195 const struct spdk_uuid *
1196 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1197 {
1198 	return &bdev->uuid;
1199 }
1200 
1201 int
1202 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1203 {
1204 	int ret;
1205 
1206 	pthread_mutex_lock(&bdev->mutex);
1207 
1208 	/* Shrinking the bdev is not allowed while it has open descriptors */
1209 	if (!TAILQ_EMPTY(&bdev->open_descs) &&
1210 	    bdev->blockcnt > size) {
1211 		ret = -EBUSY;
1212 	} else {
1213 		bdev->blockcnt = size;
1214 		ret = 0;
1215 	}
1216 
1217 	pthread_mutex_unlock(&bdev->mutex);
1218 
1219 	return ret;
1220 }
1221 
1222 /*
1223  * Convert I/O offset and length from bytes to blocks.
1224  *
1225  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1226  */
1227 static uint64_t
1228 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1229 			  uint64_t num_bytes, uint64_t *num_blocks)
1230 {
1231 	uint32_t block_size = bdev->blocklen;
1232 
1233 	*offset_blocks = offset_bytes / block_size;
1234 	*num_blocks = num_bytes / block_size;
1235 
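	/* Non-zero if either the offset or the length is not a multiple of the block size. */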
1236 	return (offset_bytes % block_size) | (num_bytes % block_size);
1237 }
1238 
1239 static bool
1240 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1241 {
1242 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
1243 	 * has been an overflow and hence the offset has been wrapped around */
1244 	if (offset_blocks + num_blocks < offset_blocks) {
1245 		return false;
1246 	}
1247 
1248 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1249 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1250 		return false;
1251 	}
1252 
1253 	return true;
1254 }
1255 
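/*
 * Illustrative caller sketch (not part of this file; names such as read_done_cb,
 * cb_arg, buf, offset and nbytes are placeholders, and error handling is omitted):
 *
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *ch;
 *
 *	spdk_bdev_open(bdev, false, NULL, NULL, &desc);
 *	ch = spdk_bdev_get_io_channel(desc);
 *	spdk_bdev_read(desc, ch, buf, offset, nbytes, read_done_cb, cb_arg);
 *
 * The completion callback must eventually release the I/O with spdk_bdev_free_io().
 */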
1256 int
1257 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1258 	       void *buf, uint64_t offset, uint64_t nbytes,
1259 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1260 {
1261 	uint64_t offset_blocks, num_blocks;
1262 
1263 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1264 		return -EINVAL;
1265 	}
1266 
1267 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1268 }
1269 
1270 int
1271 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1272 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1273 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1274 {
1275 	struct spdk_bdev *bdev = desc->bdev;
1276 	struct spdk_bdev_io *bdev_io;
1277 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1278 
1279 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1280 		return -EINVAL;
1281 	}
1282 
1283 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1284 	if (!bdev_io) {
1285 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1286 		return -ENOMEM;
1287 	}
1288 
1289 	bdev_io->ch = channel;
1290 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1291 	bdev_io->u.bdev.iov.iov_base = buf;
1292 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1293 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1294 	bdev_io->u.bdev.iovcnt = 1;
1295 	bdev_io->u.bdev.num_blocks = num_blocks;
1296 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1297 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1298 
1299 	spdk_bdev_io_submit(bdev_io);
1300 	return 0;
1301 }
1302 
1303 int
1304 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1305 		struct iovec *iov, int iovcnt,
1306 		uint64_t offset, uint64_t nbytes,
1307 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1308 {
1309 	uint64_t offset_blocks, num_blocks;
1310 
1311 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1312 		return -EINVAL;
1313 	}
1314 
1315 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1316 }
1317 
1318 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1319 			   struct iovec *iov, int iovcnt,
1320 			   uint64_t offset_blocks, uint64_t num_blocks,
1321 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1322 {
1323 	struct spdk_bdev *bdev = desc->bdev;
1324 	struct spdk_bdev_io *bdev_io;
1325 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1326 
1327 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1328 		return -EINVAL;
1329 	}
1330 
1331 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1332 	if (!bdev_io) {
1333 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1334 		return -ENOMEM;
1335 	}
1336 
1337 	bdev_io->ch = channel;
1338 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1339 	bdev_io->u.bdev.iovs = iov;
1340 	bdev_io->u.bdev.iovcnt = iovcnt;
1341 	bdev_io->u.bdev.num_blocks = num_blocks;
1342 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1343 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1344 
1345 	spdk_bdev_io_submit(bdev_io);
1346 	return 0;
1347 }
1348 
1349 int
1350 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1351 		void *buf, uint64_t offset, uint64_t nbytes,
1352 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1353 {
1354 	uint64_t offset_blocks, num_blocks;
1355 
1356 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1357 		return -EINVAL;
1358 	}
1359 
1360 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1361 }
1362 
1363 int
1364 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1365 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1366 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1367 {
1368 	struct spdk_bdev *bdev = desc->bdev;
1369 	struct spdk_bdev_io *bdev_io;
1370 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1371 
1372 	if (!desc->write) {
1373 		return -EBADF;
1374 	}
1375 
1376 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1377 		return -EINVAL;
1378 	}
1379 
1380 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1381 	if (!bdev_io) {
1382 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1383 		return -ENOMEM;
1384 	}
1385 
1386 	bdev_io->ch = channel;
1387 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1388 	bdev_io->u.bdev.iov.iov_base = buf;
1389 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1390 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1391 	bdev_io->u.bdev.iovcnt = 1;
1392 	bdev_io->u.bdev.num_blocks = num_blocks;
1393 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1394 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1395 
1396 	spdk_bdev_io_submit(bdev_io);
1397 	return 0;
1398 }
1399 
1400 int
1401 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1402 		 struct iovec *iov, int iovcnt,
1403 		 uint64_t offset, uint64_t len,
1404 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1405 {
1406 	uint64_t offset_blocks, num_blocks;
1407 
1408 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1409 		return -EINVAL;
1410 	}
1411 
1412 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1413 }
1414 
1415 int
1416 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1417 			struct iovec *iov, int iovcnt,
1418 			uint64_t offset_blocks, uint64_t num_blocks,
1419 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1420 {
1421 	struct spdk_bdev *bdev = desc->bdev;
1422 	struct spdk_bdev_io *bdev_io;
1423 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1424 
1425 	if (!desc->write) {
1426 		return -EBADF;
1427 	}
1428 
1429 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1430 		return -EINVAL;
1431 	}
1432 
1433 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1434 	if (!bdev_io) {
1435 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1436 		return -ENOMEM;
1437 	}
1438 
1439 	bdev_io->ch = channel;
1440 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1441 	bdev_io->u.bdev.iovs = iov;
1442 	bdev_io->u.bdev.iovcnt = iovcnt;
1443 	bdev_io->u.bdev.num_blocks = num_blocks;
1444 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1445 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1446 
1447 	spdk_bdev_io_submit(bdev_io);
1448 	return 0;
1449 }
1450 
1451 int
1452 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1453 		       uint64_t offset, uint64_t len,
1454 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1455 {
1456 	uint64_t offset_blocks, num_blocks;
1457 
1458 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1459 		return -EINVAL;
1460 	}
1461 
1462 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1463 }
1464 
1465 int
1466 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1467 			      uint64_t offset_blocks, uint64_t num_blocks,
1468 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1469 {
1470 	struct spdk_bdev *bdev = desc->bdev;
1471 	struct spdk_bdev_io *bdev_io;
1472 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1473 	uint64_t len;
1474 	bool split_request = false;
1475 
1476 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1477 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1478 		return -ERANGE;
1479 	}
1480 
1481 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1482 		return -EINVAL;
1483 	}
1484 
1485 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1486 
1487 	if (!bdev_io) {
1488 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1489 		return -ENOMEM;
1490 	}
1491 
1492 	bdev_io->ch = channel;
1493 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1494 
1495 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1496 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1497 		bdev_io->u.bdev.num_blocks = num_blocks;
1498 		bdev_io->u.bdev.iovs = NULL;
1499 		bdev_io->u.bdev.iovcnt = 0;
1500 
1501 	} else {
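		/* The bdev module does not support WRITE_ZEROES natively, so emulate it
		 *  with regular writes of the shared zero buffer.  Requests larger than
		 *  ZERO_BUFFER_SIZE are split; the remainder is issued from
		 *  spdk_bdev_write_zeroes_split() as each chunk completes. */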
1502 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1503 
1504 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1505 
1506 		if (len > ZERO_BUFFER_SIZE) {
1507 			split_request = true;
1508 			len = ZERO_BUFFER_SIZE;
1509 		}
1510 
1511 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1512 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1513 		bdev_io->u.bdev.iov.iov_len = len;
1514 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1515 		bdev_io->u.bdev.iovcnt = 1;
1516 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1517 		bdev_io->split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1518 		bdev_io->split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1519 	}
1520 
1521 	if (split_request) {
1522 		bdev_io->stored_user_cb = cb;
1523 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1524 	} else {
1525 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1526 	}
1527 	spdk_bdev_io_submit(bdev_io);
1528 	return 0;
1529 }
1530 
1531 int
1532 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1533 		uint64_t offset, uint64_t nbytes,
1534 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1535 {
1536 	uint64_t offset_blocks, num_blocks;
1537 
1538 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1539 		return -EINVAL;
1540 	}
1541 
1542 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1543 }
1544 
1545 int
1546 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1547 		       uint64_t offset_blocks, uint64_t num_blocks,
1548 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1549 {
1550 	struct spdk_bdev *bdev = desc->bdev;
1551 	struct spdk_bdev_io *bdev_io;
1552 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1553 
1554 	if (!desc->write) {
1555 		return -EBADF;
1556 	}
1557 
1558 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1559 		return -EINVAL;
1560 	}
1561 
1562 	if (num_blocks == 0) {
1563 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1564 		return -EINVAL;
1565 	}
1566 
1567 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1568 	if (!bdev_io) {
1569 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1570 		return -ENOMEM;
1571 	}
1572 
1573 	bdev_io->ch = channel;
1574 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1575 	bdev_io->u.bdev.iov.iov_base = NULL;
1576 	bdev_io->u.bdev.iov.iov_len = 0;
1577 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1578 	bdev_io->u.bdev.iovcnt = 1;
1579 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1580 	bdev_io->u.bdev.num_blocks = num_blocks;
1581 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1582 
1583 	spdk_bdev_io_submit(bdev_io);
1584 	return 0;
1585 }
1586 
1587 int
1588 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1589 		uint64_t offset, uint64_t length,
1590 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1591 {
1592 	uint64_t offset_blocks, num_blocks;
1593 
1594 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1595 		return -EINVAL;
1596 	}
1597 
1598 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1599 }
1600 
1601 int
1602 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1603 		       uint64_t offset_blocks, uint64_t num_blocks,
1604 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1605 {
1606 	struct spdk_bdev *bdev = desc->bdev;
1607 	struct spdk_bdev_io *bdev_io;
1608 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1609 
1610 	if (!desc->write) {
1611 		return -EBADF;
1612 	}
1613 
1614 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1615 		return -EINVAL;
1616 	}
1617 
1618 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1619 	if (!bdev_io) {
1620 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1621 		return -ENOMEM;
1622 	}
1623 
1624 	bdev_io->ch = channel;
1625 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1626 	bdev_io->u.bdev.iovs = NULL;
1627 	bdev_io->u.bdev.iovcnt = 0;
1628 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1629 	bdev_io->u.bdev.num_blocks = num_blocks;
1630 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1631 
1632 	spdk_bdev_io_submit(bdev_io);
1633 	return 0;
1634 }
1635 
1636 static void
1637 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1638 {
1639 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1640 	struct spdk_bdev_io *bdev_io;
1641 
1642 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1643 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1644 	spdk_bdev_io_submit_reset(bdev_io);
1645 }
1646 
1647 static void
1648 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1649 {
1650 	struct spdk_io_channel		*ch;
1651 	struct spdk_bdev_channel	*channel;
1652 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1653 	struct spdk_bdev_module_channel	*shared_ch;
1654 
1655 	ch = spdk_io_channel_iter_get_channel(i);
1656 	channel = spdk_io_channel_get_ctx(ch);
1657 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1658 	shared_ch = channel->module_ch;
1659 
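	/* Mark the channel so that any new I/O submitted while the reset is in
	 *  progress fails immediately in spdk_bdev_io_submit(), then abort
	 *  everything still queued for this channel. */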
1660 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1661 
1662 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, channel);
1663 	_spdk_bdev_abort_queued_io(&channel->qos_io, channel);
1664 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1665 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1666 
1667 	spdk_for_each_channel_continue(i, 0);
1668 }
1669 
1670 static void
1671 _spdk_bdev_start_reset(void *ctx)
1672 {
1673 	struct spdk_bdev_channel *ch = ctx;
1674 
1675 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
1676 			      ch, _spdk_bdev_reset_dev);
1677 }
1678 
1679 static void
1680 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1681 {
1682 	struct spdk_bdev *bdev = ch->bdev;
1683 
1684 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1685 
1686 	pthread_mutex_lock(&bdev->mutex);
1687 	if (bdev->reset_in_progress == NULL) {
1688 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1689 		/*
1690 		 * Take a channel reference for the target bdev for the life of this
1691 		 *  reset.  This guards against the channel getting destroyed while
1692 		 *  spdk_for_each_channel() calls related to this reset IO are in
1693 		 *  progress.  We will release the reference when this reset is
1694 		 *  completed.
1695 		 */
1696 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1697 		_spdk_bdev_start_reset(ch);
1698 	}
1699 	pthread_mutex_unlock(&bdev->mutex);
1700 }
1701 
1702 int
1703 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1704 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1705 {
1706 	struct spdk_bdev *bdev = desc->bdev;
1707 	struct spdk_bdev_io *bdev_io;
1708 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1709 
1710 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1711 	if (!bdev_io) {
1712 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1713 		return -ENOMEM;
1714 	}
1715 
1716 	bdev_io->ch = channel;
1717 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1718 	bdev_io->u.reset.ch_ref = NULL;
1719 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1720 
1721 	pthread_mutex_lock(&bdev->mutex);
1722 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1723 	pthread_mutex_unlock(&bdev->mutex);
1724 
1725 	_spdk_bdev_channel_start_reset(channel);
1726 
1727 	return 0;
1728 }
1729 
1730 void
1731 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1732 		      struct spdk_bdev_io_stat *stat)
1733 {
1734 #ifdef SPDK_CONFIG_VTUNE
1735 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1736 	memset(stat, 0, sizeof(*stat));
1737 	return;
1738 #endif
1739 
1740 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1741 
1742 	channel->stat.ticks_rate = spdk_get_ticks_hz();
1743 	*stat = channel->stat;
1744 	memset(&channel->stat, 0, sizeof(channel->stat));
1745 }
1746 
1747 int
1748 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1749 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1750 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1751 {
1752 	struct spdk_bdev *bdev = desc->bdev;
1753 	struct spdk_bdev_io *bdev_io;
1754 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1755 
1756 	if (!desc->write) {
1757 		return -EBADF;
1758 	}
1759 
1760 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1761 	if (!bdev_io) {
1762 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1763 		return -ENOMEM;
1764 	}
1765 
1766 	bdev_io->ch = channel;
1767 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1768 	bdev_io->u.nvme_passthru.cmd = *cmd;
1769 	bdev_io->u.nvme_passthru.buf = buf;
1770 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1771 	bdev_io->u.nvme_passthru.md_buf = NULL;
1772 	bdev_io->u.nvme_passthru.md_len = 0;
1773 
1774 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1775 
1776 	spdk_bdev_io_submit(bdev_io);
1777 	return 0;
1778 }
1779 
1780 int
1781 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1782 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1783 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1784 {
1785 	struct spdk_bdev *bdev = desc->bdev;
1786 	struct spdk_bdev_io *bdev_io;
1787 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1788 
1789 	if (!desc->write) {
1790 		/*
1791 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1792 		 *  to easily determine if the command is a read or write, but for now just
1793 		 *  do not allow io_passthru with a read-only descriptor.
1794 		 */
1795 		return -EBADF;
1796 	}
1797 
1798 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1799 	if (!bdev_io) {
1800 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
1801 		return -ENOMEM;
1802 	}
1803 
1804 	bdev_io->ch = channel;
1805 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1806 	bdev_io->u.nvme_passthru.cmd = *cmd;
1807 	bdev_io->u.nvme_passthru.buf = buf;
1808 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1809 	bdev_io->u.nvme_passthru.md_buf = NULL;
1810 	bdev_io->u.nvme_passthru.md_len = 0;
1811 
1812 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1813 
1814 	spdk_bdev_io_submit(bdev_io);
1815 	return 0;
1816 }
1817 
1818 int
1819 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1820 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
1821 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1822 {
1823 	struct spdk_bdev *bdev = desc->bdev;
1824 	struct spdk_bdev_io *bdev_io;
1825 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1826 
1827 	if (!desc->write) {
1828 		/*
1829 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1830 		 *  to easily determine if the command is a read or write, but for now just
1831 		 *  do not allow io_passthru with a read-only descriptor.
1832 		 */
1833 		return -EBADF;
1834 	}
1835 
1836 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1837 	if (!bdev_io) {
1838 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru_md\n");
1839 		return -ENOMEM;
1840 	}
1841 
1842 	bdev_io->ch = channel;
1843 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
1844 	bdev_io->u.nvme_passthru.cmd = *cmd;
1845 	bdev_io->u.nvme_passthru.buf = buf;
1846 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1847 	bdev_io->u.nvme_passthru.md_buf = md_buf;
1848 	bdev_io->u.nvme_passthru.md_len = md_len;
1849 
1850 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1851 
1852 	spdk_bdev_io_submit(bdev_io);
1853 	return 0;
1854 }
1855 
1856 int
1857 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1858 {
1859 	if (!bdev_io) {
1860 		SPDK_ERRLOG("bdev_io is NULL\n");
1861 		return -1;
1862 	}
1863 
1864 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1865 		SPDK_ERRLOG("bdev_io is in pending state\n");
1866 		assert(false);
1867 		return -1;
1868 	}
1869 
1870 	spdk_bdev_put_io(bdev_io);
1871 
1872 	return 0;
1873 }
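
/*
 * Sketch of a typical completion callback paired with spdk_bdev_free_io().
 * The function and argument names are illustrative only:
 *
 *	static void
 *	io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		if (!success) {
 *			int sct, sc;
 *
 *			spdk_bdev_io_get_nvme_status(bdev_io, &sct, &sc);
 *			// map sct/sc to the caller's own error reporting
 *		}
 *		spdk_bdev_free_io(bdev_io);
 *	}
 */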
1874 
1875 static void
1876 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
1877 {
1878 	struct spdk_bdev *bdev = bdev_ch->bdev;
1879 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
1880 	struct spdk_bdev_io *bdev_io;
1881 
1882 	if (shared_ch->io_outstanding > shared_ch->nomem_threshold) {
1883 		/*
1884 		 * Allow some more I/O to complete before retrying the nomem_io queue.
1885 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
1886 		 *  the context of a completion, because the resources for the I/O are
1887 		 *  not released until control returns to the bdev poller.  Also, we
1888 		 *  may require several small I/O to complete before a larger I/O
1889 		 *  (that requires splitting) can be submitted.
1890 		 */
1891 		return;
1892 	}
1893 
1894 	while (!TAILQ_EMPTY(&shared_ch->nomem_io)) {
1895 		bdev_io = TAILQ_FIRST(&shared_ch->nomem_io);
1896 		TAILQ_REMOVE(&shared_ch->nomem_io, bdev_io, link);
1897 		shared_ch->io_outstanding++;
1898 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
1899 		bdev->fn_table->submit_request(bdev_io->ch->channel, bdev_io);
1900 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
1901 			break;
1902 		}
1903 	}
1904 }
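
/*
 * For context: an I/O lands on the nomem_io queue when the backing module's
 * submit_request() completes it with SPDK_BDEV_IO_STATUS_NOMEM.  A rough
 * sketch (the queue-full check is a hypothetical helper):
 *
 *	static void
 *	my_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		if (my_queue_full(ch)) {
 *			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
 *			return;
 *		}
 *		// ... issue the I/O to the backing device ...
 *	}
 */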
1905 
1906 static void
1907 _spdk_bdev_io_complete(void *ctx)
1908 {
1909 	struct spdk_bdev_io *bdev_io = ctx;
1910 
1911 	assert(bdev_io->cb != NULL);
1912 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1913 }
1914 
1915 static void
1916 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
1917 {
1918 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
1919 
1920 	if (bdev_io->u.reset.ch_ref != NULL) {
1921 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
1922 		bdev_io->u.reset.ch_ref = NULL;
1923 	}
1924 
1925 	_spdk_bdev_io_complete(bdev_io);
1926 }
1927 
1928 static void
1929 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
1930 {
1931 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
1932 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1933 
1934 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
1935 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
1936 		_spdk_bdev_channel_start_reset(ch);
1937 	}
1938 
1939 	spdk_for_each_channel_continue(i, 0);
1940 }
1941 
1942 void
1943 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1944 {
1945 	struct spdk_bdev *bdev = bdev_io->bdev;
1946 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
1947 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
1948 
1949 	bdev_io->status = status;
1950 
1951 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
1952 		bool unlock_channels = false;
1953 
1954 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
1955 			SPDK_ERRLOG("NOMEM returned for reset\n");
1956 		}
1957 		pthread_mutex_lock(&bdev->mutex);
1958 		if (bdev_io == bdev->reset_in_progress) {
1959 			bdev->reset_in_progress = NULL;
1960 			unlock_channels = true;
1961 		}
1962 		pthread_mutex_unlock(&bdev->mutex);
1963 
1964 		if (unlock_channels) {
1965 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
1966 					      bdev_io, _spdk_bdev_reset_complete);
1967 			return;
1968 		}
1969 	} else {
1970 		assert(shared_ch->io_outstanding > 0);
1971 		shared_ch->io_outstanding--;
1972 		if (spdk_likely(status != SPDK_BDEV_IO_STATUS_NOMEM)) {
1973 			if (spdk_unlikely(!TAILQ_EMPTY(&shared_ch->nomem_io))) {
1974 				_spdk_bdev_ch_retry_io(bdev_ch);
1975 			}
1976 		} else {
1977 			TAILQ_INSERT_HEAD(&shared_ch->nomem_io, bdev_io, link);
1978 			/*
1979 			 * Wait for some of the outstanding I/O to complete before we
1980 			 *  retry any of the nomem_io.  Normally we will wait for
1981 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
1982 			 *  depth channels we will instead wait for half to complete.
1983 			 */
1984 			shared_ch->nomem_threshold = spdk_max((int64_t)shared_ch->io_outstanding / 2,
1985 							      (int64_t)shared_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
1986 			return;
1987 		}
1988 	}
1989 
1990 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1991 		switch (bdev_io->type) {
1992 		case SPDK_BDEV_IO_TYPE_READ:
1993 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1994 			bdev_ch->stat.num_read_ops++;
1995 			bdev_ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
1996 			break;
1997 		case SPDK_BDEV_IO_TYPE_WRITE:
1998 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1999 			bdev_ch->stat.num_write_ops++;
2000 			bdev_ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2001 			break;
2002 		default:
2003 			break;
2004 		}
2005 	}
2006 
2007 #ifdef SPDK_CONFIG_VTUNE
2008 	uint64_t now_tsc = spdk_get_ticks();
2009 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
2010 		uint64_t data[5];
2011 
2012 		data[0] = bdev_ch->stat.num_read_ops;
2013 		data[1] = bdev_ch->stat.bytes_read;
2014 		data[2] = bdev_ch->stat.num_write_ops;
2015 		data[3] = bdev_ch->stat.bytes_written;
2016 		data[4] = bdev->fn_table->get_spin_time ?
2017 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
2018 
2019 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
2020 				   __itt_metadata_u64, 5, data);
2021 
2022 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
2023 		bdev_ch->start_tsc = now_tsc;
2024 	}
2025 #endif
2026 
2027 	if (bdev_io->in_submit_request) {
2028 		/*
2029 		 * Defer completion to avoid potential infinite recursion if the
2030 		 * user's completion callback issues a new I/O.
2031 		 */
2032 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
2033 				     _spdk_bdev_io_complete, bdev_io);
2034 	} else {
2035 		_spdk_bdev_io_complete(bdev_io);
2036 	}
2037 }
2038 
2039 void
2040 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2041 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2042 {
2043 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2044 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2045 	} else {
2046 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2047 		bdev_io->error.scsi.sc = sc;
2048 		bdev_io->error.scsi.sk = sk;
2049 		bdev_io->error.scsi.asc = asc;
2050 		bdev_io->error.scsi.ascq = ascq;
2051 	}
2052 
2053 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2054 }
2055 
2056 void
2057 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2058 			     int *sc, int *sk, int *asc, int *ascq)
2059 {
2060 	assert(sc != NULL);
2061 	assert(sk != NULL);
2062 	assert(asc != NULL);
2063 	assert(ascq != NULL);
2064 
2065 	switch (bdev_io->status) {
2066 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2067 		*sc = SPDK_SCSI_STATUS_GOOD;
2068 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2069 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2070 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2071 		break;
2072 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2073 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2074 		break;
2075 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2076 		*sc = bdev_io->error.scsi.sc;
2077 		*sk = bdev_io->error.scsi.sk;
2078 		*asc = bdev_io->error.scsi.asc;
2079 		*ascq = bdev_io->error.scsi.ascq;
2080 		break;
2081 	default:
2082 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2083 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2084 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2085 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2086 		break;
2087 	}
2088 }
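
/*
 * A SCSI front-end would typically call this from its completion callback to
 * build the status it returns to the initiator.  Sketch only; the task and
 * sense-data helper are hypothetical:
 *
 *	int sc, sk, asc, ascq;
 *
 *	spdk_bdev_io_get_scsi_status(bdev_io, &sc, &sk, &asc, &ascq);
 *	my_task_build_sense_data(task, sc, sk, asc, ascq);
 */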
2089 
2090 void
2091 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2092 {
2093 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2094 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2095 	} else {
2096 		bdev_io->error.nvme.sct = sct;
2097 		bdev_io->error.nvme.sc = sc;
2098 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2099 	}
2100 
2101 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2102 }
2103 
2104 void
2105 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2106 {
2107 	assert(sct != NULL);
2108 	assert(sc != NULL);
2109 
2110 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2111 		*sct = bdev_io->error.nvme.sct;
2112 		*sc = bdev_io->error.nvme.sc;
2113 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2114 		*sct = SPDK_NVME_SCT_GENERIC;
2115 		*sc = SPDK_NVME_SC_SUCCESS;
2116 	} else {
2117 		*sct = SPDK_NVME_SCT_GENERIC;
2118 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2119 	}
2120 }
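
/*
 * Likewise, an NVMe front-end can translate the bdev status into the
 * completion it posts back to the host.  The spdk_nvme_cpl field names below
 * are assumptions about the NVMe spec structures, not defined in this file:
 *
 *	int sct, sc;
 *
 *	spdk_bdev_io_get_nvme_status(bdev_io, &sct, &sc);
 *	cpl->status.sct = sct;
 *	cpl->status.sc = sc;
 */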
2121 
2122 struct spdk_thread *
2123 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2124 {
2125 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
2126 }
2127 
2128 static int
2129 _spdk_bdev_register(struct spdk_bdev *bdev)
2130 {
2131 	struct spdk_bdev_module_if *module;
2132 
2133 	assert(bdev->module != NULL);
2134 
2135 	if (!bdev->name) {
2136 		SPDK_ERRLOG("Bdev name is NULL\n");
2137 		return -EINVAL;
2138 	}
2139 
2140 	if (spdk_bdev_get_by_name(bdev->name)) {
2141 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2142 		return -EEXIST;
2143 	}
2144 
2145 	bdev->status = SPDK_BDEV_STATUS_READY;
2146 
2147 	TAILQ_INIT(&bdev->open_descs);
2148 
2149 	TAILQ_INIT(&bdev->vbdevs);
2150 	TAILQ_INIT(&bdev->base_bdevs);
2151 
2152 	TAILQ_INIT(&bdev->aliases);
2153 
2154 	bdev->reset_in_progress = NULL;
2155 
2156 	spdk_io_device_register(__bdev_to_io_dev(bdev),
2157 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2158 				sizeof(struct spdk_bdev_channel));
2159 
2160 	pthread_mutex_init(&bdev->mutex, NULL);
2161 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2162 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
2163 
2164 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
2165 		if (module->examine) {
2166 			module->action_in_progress++;
2167 			module->examine(bdev);
2168 		}
2169 	}
2170 
2171 	return 0;
2172 }
2173 
2174 int
2175 spdk_bdev_register(struct spdk_bdev *bdev)
2176 {
2177 	return _spdk_bdev_register(bdev);
2178 }
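
/*
 * Registration sketch for a backing module.  Field names follow the public
 * struct spdk_bdev; the my_disk container, fn_table and module handle are
 * illustrative:
 *
 *	my_disk->bdev.name = strdup("MyDisk0");
 *	my_disk->bdev.product_name = "My Disk";
 *	my_disk->bdev.blocklen = 512;
 *	my_disk->bdev.blockcnt = num_blocks;
 *	my_disk->bdev.ctxt = my_disk;
 *	my_disk->bdev.fn_table = &my_fn_table;
 *	my_disk->bdev.module = &my_bdev_module;
 *	rc = spdk_bdev_register(&my_disk->bdev);
 */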
2179 
2180 int
2181 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2182 {
2183 	int i, rc;
2184 
2185 	rc = _spdk_bdev_register(vbdev);
2186 	if (rc) {
2187 		return rc;
2188 	}
2189 
2190 	for (i = 0; i < base_bdev_count; i++) {
2191 		assert(base_bdevs[i] != NULL);
2192 		assert(base_bdevs[i]->claim_module != NULL);
2193 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
2194 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
2195 	}
2196 
2197 	return 0;
2198 }
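
/*
 * Sketch: registering a virtual bdev built on a single base bdev that the
 * module has already claimed (see spdk_bdev_module_claim_bdev below).  The
 * my_vbdev container is illustrative:
 *
 *	struct spdk_bdev *base_array[1] = { base_bdev };
 *
 *	my_vbdev->bdev.blocklen = base_bdev->blocklen;
 *	my_vbdev->bdev.blockcnt = base_bdev->blockcnt;
 *	rc = spdk_vbdev_register(&my_vbdev->bdev, base_array, 1);
 */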
2199 
2200 void
2201 spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
2202 {
2203 	if (bdev->unregister_cb != NULL) {
2204 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
2205 	}
2206 }
2207 
2208 static void
2209 _remove_notify(void *arg)
2210 {
2211 	struct spdk_bdev_desc *desc = arg;
2212 
2213 	desc->remove_cb(desc->remove_ctx);
2214 }
2215 
2216 void
2217 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2218 {
2219 	struct spdk_bdev_desc	*desc, *tmp;
2220 	int			rc;
2221 	bool			do_destruct = true;
2222 	struct spdk_bdev	*base_bdev;
2223 
2224 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2225 
2226 	pthread_mutex_lock(&bdev->mutex);
2227 
2228 	if (!TAILQ_EMPTY(&bdev->base_bdevs)) {
2229 		TAILQ_FOREACH(base_bdev, &bdev->base_bdevs, base_bdev_link) {
2230 			TAILQ_REMOVE(&base_bdev->vbdevs, bdev, vbdev_link);
2231 		}
2232 	}
2233 
2234 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
2235 	bdev->unregister_cb = cb_fn;
2236 	bdev->unregister_ctx = cb_arg;
2237 
2238 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
2239 		if (desc->remove_cb) {
2240 			do_destruct = false;
2241 			/*
2242 			 * Defer invocation of the remove_cb to a separate message that will
2243 			 *  run later on this thread.  This ensures this context unwinds and
2244 			 *  we don't recursively unregister this bdev again if the remove_cb
2245 			 *  immediately closes its descriptor.
2246 			 */
2247 			spdk_thread_send_msg(spdk_get_thread(), _remove_notify, desc);
2248 		}
2249 	}
2250 
2251 	if (!do_destruct) {
2252 		pthread_mutex_unlock(&bdev->mutex);
2253 		return;
2254 	}
2255 
2256 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
2257 	pthread_mutex_unlock(&bdev->mutex);
2258 
2259 	pthread_mutex_destroy(&bdev->mutex);
2260 
2261 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), NULL);
2262 
2263 	rc = bdev->fn_table->destruct(bdev->ctxt);
2264 	if (rc < 0) {
2265 		SPDK_ERRLOG("destruct failed\n");
2266 	}
2267 	if (rc <= 0 && cb_fn != NULL) {
2268 		cb_fn(cb_arg, rc);
2269 	}
2270 }
2271 
2272 int
2273 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2274 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2275 {
2276 	struct spdk_bdev_desc *desc;
2277 
2278 	desc = calloc(1, sizeof(*desc));
2279 	if (desc == NULL) {
2280 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2281 		return -ENOMEM;
2282 	}
2283 
2284 	pthread_mutex_lock(&bdev->mutex);
2285 
2286 	if (write && bdev->claim_module) {
2287 		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
2288 		free(desc);
2289 		pthread_mutex_unlock(&bdev->mutex);
2290 		return -EPERM;
2291 	}
2292 
2293 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
2294 
2295 	desc->bdev = bdev;
2296 	desc->remove_cb = remove_cb;
2297 	desc->remove_ctx = remove_ctx;
2298 	desc->write = write;
2299 	*_desc = desc;
2300 
2301 	pthread_mutex_unlock(&bdev->mutex);
2302 
2303 	return 0;
2304 }
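
/*
 * Typical open sequence from an application.  The hot-remove callback is
 * user-supplied; spdk_bdev_get_io_channel() is assumed from the public bdev
 * API:
 *
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *io_ch;
 *
 *	rc = spdk_bdev_open(bdev, true, my_hot_remove_cb, my_ctx, &desc);
 *	if (rc == 0) {
 *		io_ch = spdk_bdev_get_io_channel(desc);
 *		// ... submit I/O on io_ch ...
 *		spdk_put_io_channel(io_ch);
 *		spdk_bdev_close(desc);
 *	}
 */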
2305 
2306 void
2307 spdk_bdev_close(struct spdk_bdev_desc *desc)
2308 {
2309 	struct spdk_bdev *bdev = desc->bdev;
2310 	bool do_unregister = false;
2311 
2312 	pthread_mutex_lock(&bdev->mutex);
2313 
2314 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
2315 	free(desc);
2316 
2317 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
2318 		do_unregister = true;
2319 	}
2320 	pthread_mutex_unlock(&bdev->mutex);
2321 
2322 	if (do_unregister == true) {
2323 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
2324 	}
2325 }
2326 
2327 int
2328 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
2329 			    struct spdk_bdev_module_if *module)
2330 {
2331 	if (bdev->claim_module != NULL) {
2332 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
2333 			    bdev->claim_module->name);
2334 		return -EPERM;
2335 	}
2336 
2337 	if (desc && !desc->write) {
2338 		desc->write = true;
2339 	}
2340 
2341 	bdev->claim_module = module;
2342 	return 0;
2343 }
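
/*
 * A virtual bdev module typically claims its base bdev from its examine()
 * callback, keeping the descriptor open for the lifetime of the claim.
 * Sketch only:
 *
 *	rc = spdk_bdev_open(base_bdev, false, base_hotremove_cb, ctx, &desc);
 *	if (rc == 0) {
 *		rc = spdk_bdev_module_claim_bdev(base_bdev, desc, &my_bdev_module);
 *	}
 */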
2344 
2345 void
2346 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
2347 {
2348 	assert(bdev->claim_module != NULL);
2349 	bdev->claim_module = NULL;
2350 }
2351 
2352 struct spdk_bdev *
2353 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
2354 {
2355 	return desc->bdev;
2356 }
2357 
2358 void
2359 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
2360 {
2361 	struct iovec *iovs;
2362 	int iovcnt;
2363 
2364 	if (bdev_io == NULL) {
2365 		return;
2366 	}
2367 
2368 	switch (bdev_io->type) {
2369 	case SPDK_BDEV_IO_TYPE_READ:
2370 		iovs = bdev_io->u.bdev.iovs;
2371 		iovcnt = bdev_io->u.bdev.iovcnt;
2372 		break;
2373 	case SPDK_BDEV_IO_TYPE_WRITE:
2374 		iovs = bdev_io->u.bdev.iovs;
2375 		iovcnt = bdev_io->u.bdev.iovcnt;
2376 		break;
2377 	default:
2378 		iovs = NULL;
2379 		iovcnt = 0;
2380 		break;
2381 	}
2382 
2383 	if (iovp) {
2384 		*iovp = iovs;
2385 	}
2386 	if (iovcntp) {
2387 		*iovcntp = iovcnt;
2388 	}
2389 }
2390 
2391 void
2392 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
2393 {
2394 
2395 	if (spdk_bdev_module_list_find(bdev_module->name)) {
2396 		fprintf(stderr, "ERROR: module '%s' already registered.\n", bdev_module->name);
2397 		assert(false);
2398 	}
2399 
2400 	if (bdev_module->async_init) {
2401 		bdev_module->action_in_progress = 1;
2402 	}
2403 
2404 	/*
2405 	 * Modules with examine callbacks must be initialized first, so they are
2406 	 *  ready to handle examine callbacks from later modules that will
2407 	 *  register physical bdevs.
2408 	 */
2409 	if (bdev_module->examine != NULL) {
2410 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2411 	} else {
2412 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2413 	}
2414 }
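
/*
 * Bdev modules normally do not call spdk_bdev_module_list_add() directly;
 * they declare a struct spdk_bdev_module_if and register it through the
 * SPDK_BDEV_MODULE_REGISTER helper (see spdk_internal/bdev.h), which is
 * expected to invoke this function from a constructor before the bdev layer
 * is initialized.
 */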
2415 
2416 struct spdk_bdev_module_if *
2417 spdk_bdev_module_list_find(const char *name)
2418 {
2419 	struct spdk_bdev_module_if *bdev_module;
2420 
2421 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
2422 		if (strcmp(name, bdev_module->name) == 0) {
2423 			break;
2424 		}
2425 	}
2426 
2427 	return bdev_module;
2428 }
2429 
2430 static void
2431 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2432 {
2433 	uint64_t len;
2434 
2435 	if (!success) {
2436 		bdev_io->cb = bdev_io->stored_user_cb;
2437 		_spdk_bdev_io_complete(bdev_io);
2438 		return;
2439 	}
2440 
2441 	/*
	 * No need to repeat the error checking from write_zeroes_blocks - this
	 *  request already passed those checks on its first submission.
	 */
2442 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->split_remaining_num_blocks,
2443 		       ZERO_BUFFER_SIZE);
2444 
2445 	bdev_io->u.bdev.offset_blocks = bdev_io->split_current_offset_blocks;
2446 	bdev_io->u.bdev.iov.iov_len = len;
2447 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2448 	bdev_io->split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2449 	bdev_io->split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2450 
2451 	/* If this round completes the I/O, restore the original user callback. */
2452 	if (bdev_io->split_remaining_num_blocks == 0) {
2453 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->stored_user_cb);
2454 	} else {
2455 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2456 	}
2457 	spdk_bdev_io_submit(bdev_io);
2458 }
2459 
2460 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
2461