1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/io_channel.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/bdev.h"
49 #include "spdk_internal/log.h"
50 #include "spdk/string.h"
51 
52 #ifdef SPDK_CONFIG_VTUNE
53 #include "ittnotify.h"
54 #include "ittnotify_types.h"
55 int __itt_init_ittlib(const char *, __itt_group_id);
56 #endif
57 
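/*
 * Defaults for the global bdev_io pool, the small/large data-buffer pools
 *  handed out by spdk_bdev_io_get_buf(), the NOMEM retry threshold, and the
 *  zero buffer used to emulate write_zeroes.
 */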
58 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
59 #define BUF_SMALL_POOL_SIZE	8192
60 #define BUF_LARGE_POOL_SIZE	1024
61 #define NOMEM_THRESHOLD_COUNT	8
62 #define ZERO_BUFFER_SIZE	0x100000
63 
64 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
65 
66 struct spdk_bdev_mgr {
67 	struct spdk_mempool *bdev_io_pool;
68 
69 	struct spdk_mempool *buf_small_pool;
70 	struct spdk_mempool *buf_large_pool;
71 
72 	void *zero_buffer;
73 
74 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
75 
76 	TAILQ_HEAD(, spdk_bdev) bdevs;
77 
78 	bool init_complete;
79 	bool module_init_complete;
80 
81 #ifdef SPDK_CONFIG_VTUNE
82 	__itt_domain	*domain;
83 #endif
84 };
85 
86 static struct spdk_bdev_mgr g_bdev_mgr = {
87 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
88 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
89 	.init_complete = false,
90 	.module_init_complete = false,
91 };
92 
93 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
94 static void			*g_init_cb_arg = NULL;
95 
96 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
97 static void			*g_fini_cb_arg = NULL;
98 static struct spdk_thread	*g_fini_thread = NULL;
99 
100 
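/*
 * Per-thread context for the bdev manager.  I/O parked on these queues are
 *  waiting for a shared data buffer of the matching size class to be returned.
 */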
101 struct spdk_bdev_mgmt_channel {
102 	bdev_io_tailq_t need_buf_small;
103 	bdev_io_tailq_t need_buf_large;
104 };
105 
106 struct spdk_bdev_desc {
107 	struct spdk_bdev		*bdev;
108 	spdk_bdev_remove_cb_t		remove_cb;
109 	void				*remove_ctx;
110 	bool				write;
111 	TAILQ_ENTRY(spdk_bdev_desc)	link;
112 };
113 
114 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
115 
116 struct spdk_bdev_channel {
117 	struct spdk_bdev	*bdev;
118 
119 	/* The channel for the underlying device */
120 	struct spdk_io_channel	*channel;
121 
122 	/* Channel for the bdev manager */
123 	struct spdk_io_channel *mgmt_channel;
124 
125 	struct spdk_bdev_io_stat stat;
126 
127 	/*
128 	 * Count of I/O submitted to bdev module and waiting for completion.
129 	 * Incremented before submit_request() is called on an spdk_bdev_io.
130 	 */
131 	uint64_t		io_outstanding;
132 
133 	bdev_io_tailq_t		queued_resets;
134 
135 	/*
136 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
137 	 *  on this channel.
138 	 */
139 	bdev_io_tailq_t		nomem_io;
140 
141 	/*
142 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
143 	 */
144 	uint64_t		nomem_threshold;
145 
146 	uint32_t		flags;
147 
148 #ifdef SPDK_CONFIG_VTUNE
149 	uint64_t		start_tsc;
150 	uint64_t		interval_tsc;
151 	__itt_string_handle	*handle;
152 #endif
153 
154 };
155 
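/*
 * Completion callback used when write_zeroes must be emulated by repeatedly
 *  writing the shared zero buffer; it resubmits the remaining range.  See
 *  spdk_bdev_write_zeroes_blocks().
 */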
156 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
157 
158 struct spdk_bdev *
159 spdk_bdev_first(void)
160 {
161 	struct spdk_bdev *bdev;
162 
163 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
164 	if (bdev) {
165 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
166 	}
167 
168 	return bdev;
169 }
170 
171 struct spdk_bdev *
172 spdk_bdev_next(struct spdk_bdev *prev)
173 {
174 	struct spdk_bdev *bdev;
175 
176 	bdev = TAILQ_NEXT(prev, link);
177 	if (bdev) {
178 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
179 	}
180 
181 	return bdev;
182 }
183 
184 static struct spdk_bdev *
185 _bdev_next_leaf(struct spdk_bdev *bdev)
186 {
187 	while (bdev != NULL) {
188 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
189 			return bdev;
190 		} else {
191 			bdev = TAILQ_NEXT(bdev, link);
192 		}
193 	}
194 
195 	return bdev;
196 }
197 
198 struct spdk_bdev *
199 spdk_bdev_first_leaf(void)
200 {
201 	struct spdk_bdev *bdev;
202 
203 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
204 
205 	if (bdev) {
206 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
207 	}
208 
209 	return bdev;
210 }
211 
212 struct spdk_bdev *
213 spdk_bdev_next_leaf(struct spdk_bdev *prev)
214 {
215 	struct spdk_bdev *bdev;
216 
217 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
218 
219 	if (bdev) {
220 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
221 	}
222 
223 	return bdev;
224 }
225 
226 struct spdk_bdev *
227 spdk_bdev_get_by_name(const char *bdev_name)
228 {
229 	struct spdk_bdev *bdev = spdk_bdev_first();
230 
231 	while (bdev != NULL) {
232 		if (strcmp(bdev_name, bdev->name) == 0) {
233 			return bdev;
234 		}
235 		bdev = spdk_bdev_next(bdev);
236 	}
237 
238 	return NULL;
239 }
240 
241 static void
242 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
243 {
244 	assert(bdev_io->get_buf_cb != NULL);
245 	assert(buf != NULL);
246 	assert(bdev_io->u.bdev.iovs != NULL);
247 
248 	bdev_io->buf = buf;
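	/*
	 * Align the buffer start to a 512-byte boundary.  The buffer pools
	 *  allocate an extra 512 bytes per element precisely to allow for
	 *  this adjustment.
	 */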
249 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
250 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
251 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
252 }
253 
254 static void
255 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
256 {
257 	struct spdk_mempool *pool;
258 	struct spdk_bdev_io *tmp;
259 	void *buf;
260 	bdev_io_tailq_t *tailq;
261 	struct spdk_bdev_mgmt_channel *ch;
262 
263 	assert(bdev_io->u.bdev.iovcnt == 1);
264 
265 	buf = bdev_io->buf;
266 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
267 
268 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
269 		pool = g_bdev_mgr.buf_small_pool;
270 		tailq = &ch->need_buf_small;
271 	} else {
272 		pool = g_bdev_mgr.buf_large_pool;
273 		tailq = &ch->need_buf_large;
274 	}
275 
276 	if (TAILQ_EMPTY(tailq)) {
277 		spdk_mempool_put(pool, buf);
278 	} else {
279 		tmp = TAILQ_FIRST(tailq);
280 		TAILQ_REMOVE(tailq, tmp, buf_link);
281 		spdk_bdev_io_set_buf(tmp, buf);
282 	}
283 }
284 
285 void
286 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
287 {
288 	struct spdk_mempool *pool;
289 	bdev_io_tailq_t *tailq;
290 	void *buf = NULL;
291 	struct spdk_bdev_mgmt_channel *ch;
292 
293 	assert(cb != NULL);
294 	assert(bdev_io->u.bdev.iovs != NULL);
295 
296 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
297 		/* Buffer already present */
298 		cb(bdev_io->ch->channel, bdev_io);
299 		return;
300 	}
301 
302 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
303 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
304 
305 	bdev_io->buf_len = len;
306 	bdev_io->get_buf_cb = cb;
307 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
308 		pool = g_bdev_mgr.buf_small_pool;
309 		tailq = &ch->need_buf_small;
310 	} else {
311 		pool = g_bdev_mgr.buf_large_pool;
312 		tailq = &ch->need_buf_large;
313 	}
314 
315 	buf = spdk_mempool_get(pool);
316 
317 	if (!buf) {
318 		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
319 	} else {
320 		spdk_bdev_io_set_buf(bdev_io, buf);
321 	}
322 }
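
/*
 * A minimal usage sketch from a bdev module's read path (the callback name is
 *  hypothetical), assuming the request needs a bounce buffer of 'len' bytes:
 *
 *	static void my_read_get_buf_cb(struct spdk_io_channel *ch,
 *				       struct spdk_bdev_io *bdev_io)
 *	{
 *		// bdev_io->u.bdev.iovs[0] now points at an aligned buffer
 *	}
 *
 *	spdk_bdev_io_get_buf(bdev_io, my_read_get_buf_cb, len);
 *
 * If no buffer is available, the I/O is queued and the callback fires later,
 *  when another I/O returns its buffer via spdk_bdev_io_put_buf().
 */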
323 
324 static int
325 spdk_bdev_module_get_max_ctx_size(void)
326 {
327 	struct spdk_bdev_module_if *bdev_module;
328 	int max_bdev_module_size = 0;
329 
330 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
331 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
332 			max_bdev_module_size = bdev_module->get_ctx_size();
333 		}
334 	}
335 
336 	return max_bdev_module_size;
337 }
338 
339 void
340 spdk_bdev_config_text(FILE *fp)
341 {
342 	struct spdk_bdev_module_if *bdev_module;
343 
344 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
345 		if (bdev_module->config_text) {
346 			bdev_module->config_text(fp);
347 		}
348 	}
349 }
350 
351 static int
352 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
353 {
354 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
355 
356 	TAILQ_INIT(&ch->need_buf_small);
357 	TAILQ_INIT(&ch->need_buf_large);
358 
359 	return 0;
360 }
361 
362 static void
363 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
364 {
365 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
366 
367 	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
368 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
369 	}
370 }
371 
372 static void
373 spdk_bdev_init_complete(int rc)
374 {
375 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
376 	void *cb_arg = g_init_cb_arg;
377 
378 	g_bdev_mgr.init_complete = true;
379 	g_init_cb_fn = NULL;
380 	g_init_cb_arg = NULL;
381 
382 	cb_fn(cb_arg, rc);
383 }
384 
385 static void
386 spdk_bdev_module_action_complete(void)
387 {
388 	struct spdk_bdev_module_if *m;
389 
390 	/*
391 	 * Don't finish bdev subsystem initialization if
392 	 * module pre-initialization is still in progress, or
393 	 * the subsystem has already been initialized.
394 	 */
395 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
396 		return;
397 	}
398 
399 	/*
400 	 * Check all bdev modules for inits/examinations in progress. If any
401 	 * exist, return immediately since we cannot finish bdev subsystem
402 	 * initialization until all are completed.
403 	 */
404 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
405 		if (m->action_in_progress > 0) {
406 			return;
407 		}
408 	}
409 
410 	/*
411 	 * Modules already finished initialization - now that all
412 	 * the bdev modules have finished their asynchronous I/O
413 	 * processing, the entire bdev layer can be marked as complete.
414 	 */
415 	spdk_bdev_init_complete(0);
416 }
417 
418 static void
419 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
420 {
421 	assert(module->action_in_progress > 0);
422 	module->action_in_progress--;
423 	spdk_bdev_module_action_complete();
424 }
425 
426 void
427 spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
428 {
429 	spdk_bdev_module_action_done(module);
430 }
431 
432 void
433 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
434 {
435 	spdk_bdev_module_action_done(module);
436 }
437 
438 static int
439 spdk_bdev_modules_init(void)
440 {
441 	struct spdk_bdev_module_if *module;
442 	int rc = 0;
443 
444 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
445 		rc = module->module_init();
446 		if (rc != 0) {
447 			break;
448 		}
449 	}
450 
451 	g_bdev_mgr.module_init_complete = true;
452 	return rc;
453 }

454 void
455 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
456 {
457 	int cache_size;
458 	int rc = 0;
459 	char mempool_name[32];
460 
461 	assert(cb_fn != NULL);
462 
463 	g_init_cb_fn = cb_fn;
464 	g_init_cb_arg = cb_arg;
465 
466 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
467 
468 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
469 				  SPDK_BDEV_IO_POOL_SIZE,
470 				  sizeof(struct spdk_bdev_io) +
471 				  spdk_bdev_module_get_max_ctx_size(),
472 				  64,
473 				  SPDK_ENV_SOCKET_ID_ANY);
474 
475 	if (g_bdev_mgr.bdev_io_pool == NULL) {
476 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
477 		spdk_bdev_init_complete(-1);
478 		return;
479 	}
480 
481 	/**
482 	 * Ensure no more than half of the total buffers end up in local caches, by
483 	 *   using spdk_env_get_core_count() to determine how many local caches we need
484 	 *   to account for.
485 	 */
486 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
487 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
488 
489 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
490 				    BUF_SMALL_POOL_SIZE,
491 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
492 				    cache_size,
493 				    SPDK_ENV_SOCKET_ID_ANY);
494 	if (!g_bdev_mgr.buf_small_pool) {
495 		SPDK_ERRLOG("create buf small pool failed\n");
496 		spdk_bdev_init_complete(-1);
497 		return;
498 	}
499 
500 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
501 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
502 
503 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
504 				    BUF_LARGE_POOL_SIZE,
505 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
506 				    cache_size,
507 				    SPDK_ENV_SOCKET_ID_ANY);
508 	if (!g_bdev_mgr.buf_large_pool) {
509 		SPDK_ERRLOG("create buf large pool failed\n");
510 		spdk_bdev_init_complete(-1);
511 		return;
512 	}
513 
514 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
515 				 NULL);
516 	if (!g_bdev_mgr.zero_buffer) {
517 		SPDK_ERRLOG("create bdev zero buffer failed\n");
518 		spdk_bdev_init_complete(-1);
519 		return;
520 	}
521 
522 #ifdef SPDK_CONFIG_VTUNE
523 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
524 #endif
525 
526 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
527 				spdk_bdev_mgmt_channel_destroy,
528 				sizeof(struct spdk_bdev_mgmt_channel));
529 
530 	rc = spdk_bdev_modules_init();
531 	if (rc != 0) {
532 		SPDK_ERRLOG("bdev modules init failed\n");
533 		spdk_bdev_init_complete(-1);
534 		return;
535 	}
536 
537 	spdk_bdev_module_action_complete();
538 }
539 
540 static void
541 spdk_bdev_module_finish_cb(void *io_device)
542 {
543 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
544 
545 	cb_fn(g_fini_cb_arg);
546 	g_fini_cb_fn = NULL;
547 	g_fini_cb_arg = NULL;
548 }
549 
550 static void
551 spdk_bdev_module_finish_complete(void)
552 {
553 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
554 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
555 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
556 			    SPDK_BDEV_IO_POOL_SIZE);
557 	}
558 
559 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
560 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
561 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
562 			    BUF_SMALL_POOL_SIZE);
563 		assert(false);
564 	}
565 
566 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
567 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
568 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
569 			    BUF_LARGE_POOL_SIZE);
570 		assert(false);
571 	}
572 
573 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
574 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
575 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
576 	spdk_dma_free(g_bdev_mgr.zero_buffer);
577 
578 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb);
579 }
580 
581 static void
582 spdk_bdev_module_finish_iter(void *arg)
583 {
584 	/* Notice that this variable is static. It is saved between calls to
585 	 * this function. */
586 	static struct spdk_bdev_module_if *resume_bdev_module = NULL;
587 	struct spdk_bdev_module_if *bdev_module;
588 
589 	/* Start iterating from the last touched module */
590 	if (!resume_bdev_module) {
591 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
592 	} else {
593 		bdev_module = TAILQ_NEXT(resume_bdev_module, tailq);
594 	}
595 
596 	while (bdev_module) {
597 		if (bdev_module->async_fini) {
598 			/* Save our place so we can resume later. We must
599 			 * save the variable here, before calling module_fini()
600 			 * below, because in some cases the module may immediately
601 			 * call spdk_bdev_module_finish_done() and re-enter
602 			 * this function to continue iterating. */
603 			resume_bdev_module = bdev_module;
604 		}
605 
606 		if (bdev_module->module_fini) {
607 			bdev_module->module_fini();
608 		}
609 
610 		if (bdev_module->async_fini) {
611 			return;
612 		}
613 
614 		bdev_module = TAILQ_NEXT(bdev_module, tailq);
615 	}
616 
617 	resume_bdev_module = NULL;
618 	spdk_bdev_module_finish_complete();
619 }
620 
621 void
622 spdk_bdev_module_finish_done(void)
623 {
624 	if (spdk_get_thread() != g_fini_thread) {
625 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
626 	} else {
627 		spdk_bdev_module_finish_iter(NULL);
628 	}
629 }
630 
631 void
632 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
633 {
634 	assert(cb_fn != NULL);
635 
636 	g_fini_thread = spdk_get_thread();
637 
638 	g_fini_cb_fn = cb_fn;
639 	g_fini_cb_arg = cb_arg;
640 
641 	spdk_bdev_module_finish_iter(NULL);
642 }
643 
644 struct spdk_bdev_io *
645 spdk_bdev_get_io(void)
646 {
647 	struct spdk_bdev_io *bdev_io;
648 
649 	bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
650 	if (!bdev_io) {
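		/* Pool exhaustion is treated as fatal here; the pool holds SPDK_BDEV_IO_POOL_SIZE (64K) entries. */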
651 		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
652 		abort();
653 	}
654 
655 	memset(bdev_io, 0, offsetof(struct spdk_bdev_io, u));
656 
657 	return bdev_io;
658 }
659 
660 static void
661 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
662 {
663 	if (bdev_io->buf != NULL) {
664 		spdk_bdev_io_put_buf(bdev_io);
665 	}
666 
667 	spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
668 }
669 
670 static void
671 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
672 {
673 	struct spdk_bdev *bdev = bdev_io->bdev;
674 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
675 	struct spdk_io_channel *ch = bdev_ch->channel;
676 
677 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
678 
679 	bdev_ch->io_outstanding++;
680 	bdev_io->in_submit_request = true;
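	/*
	 * On the normal path, submit directly only if no earlier I/O are already
	 *  queued waiting for resources; otherwise queue this one behind them so
	 *  that submission order is preserved.
	 */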
681 	if (spdk_likely(bdev_ch->flags == 0)) {
682 		if (spdk_likely(TAILQ_EMPTY(&bdev_ch->nomem_io))) {
683 			bdev->fn_table->submit_request(ch, bdev_io);
684 		} else {
685 			bdev_ch->io_outstanding--;
686 			TAILQ_INSERT_TAIL(&bdev_ch->nomem_io, bdev_io, link);
687 		}
688 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
689 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
690 	} else {
691 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
692 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
693 	}
694 	bdev_io->in_submit_request = false;
695 }
696 
697 static void
698 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
699 {
700 	struct spdk_bdev *bdev = bdev_io->bdev;
701 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
702 	struct spdk_io_channel *ch = bdev_ch->channel;
703 
704 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
705 
706 	bdev_io->in_submit_request = true;
707 	bdev->fn_table->submit_request(ch, bdev_io);
708 	bdev_io->in_submit_request = false;
709 }
710 
711 static void
712 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
713 		  struct spdk_bdev *bdev, void *cb_arg,
714 		  spdk_bdev_io_completion_cb cb)
715 {
716 	bdev_io->bdev = bdev;
717 	bdev_io->caller_ctx = cb_arg;
718 	bdev_io->cb = cb;
719 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
720 	bdev_io->in_submit_request = false;
721 }
722 
723 bool
724 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
725 {
726 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
727 }
728 
729 int
730 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
731 {
732 	if (bdev->fn_table->dump_config_json) {
733 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
734 	}
735 
736 	return 0;
737 }
738 
739 static int
740 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
741 {
742 	struct spdk_bdev		*bdev = io_device;
743 	struct spdk_bdev_channel	*ch = ctx_buf;
744 
745 	ch->bdev = io_device;
746 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
747 	if (!ch->channel) {
748 		return -1;
749 	}
750 
751 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
752 	if (!ch->mgmt_channel) {
753 		spdk_put_io_channel(ch->channel);
754 		return -1;
755 	}
756 
757 	memset(&ch->stat, 0, sizeof(ch->stat));
758 	ch->io_outstanding = 0;
759 	TAILQ_INIT(&ch->queued_resets);
760 	TAILQ_INIT(&ch->nomem_io);
761 	ch->nomem_threshold = 0;
762 	ch->flags = 0;
763 
764 #ifdef SPDK_CONFIG_VTUNE
765 	{
766 		char *name;
767 		__itt_init_ittlib(NULL, 0);
768 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
769 		if (!name) {
770 			spdk_put_io_channel(ch->channel);
771 			spdk_put_io_channel(ch->mgmt_channel);
772 			return -1;
773 		}
774 		ch->handle = __itt_string_handle_create(name);
775 		free(name);
776 		ch->start_tsc = spdk_get_ticks();
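		/* Report VTune stats at most once every 1/100th of a second. */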
777 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
778 	}
779 #endif
780 
781 	return 0;
782 }
783 
784 /*
785  * Abort I/O that are waiting on a data buffer.  These types of I/O are
786  *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
787  */
788 static void
789 _spdk_bdev_abort_buf_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
790 {
791 	struct spdk_bdev_io *bdev_io, *tmp;
792 
793 	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
794 		if (bdev_io->ch == ch) {
795 			TAILQ_REMOVE(queue, bdev_io, buf_link);
796 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
797 		}
798 	}
799 }
800 
801 /*
802  * Abort I/O that are queued waiting for submission.  These types of I/O are
803  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
804  */
805 static void
806 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
807 {
808 	struct spdk_bdev_io *bdev_io, *tmp;
809 
810 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
811 		if (bdev_io->ch == ch) {
812 			TAILQ_REMOVE(queue, bdev_io, link);
813 			/*
814 			 * spdk_bdev_io_complete() assumes that the completed I/O had
815 			 *  been submitted to the bdev module.  Since in this case it
816 			 *  hadn't, bump io_outstanding to account for the decrement
817 			 *  that spdk_bdev_io_complete() will do.
818 			 */
819 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
820 				ch->io_outstanding++;
821 			}
822 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
823 		}
824 	}
825 }
826 
827 static void
828 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
829 {
830 	struct spdk_bdev_channel	*ch = ctx_buf;
831 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
832 
833 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
834 
835 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
836 	_spdk_bdev_abort_queued_io(&ch->nomem_io, ch);
837 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
838 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
839 
840 	spdk_put_io_channel(ch->channel);
841 	spdk_put_io_channel(ch->mgmt_channel);
842 	assert(ch->io_outstanding == 0);
843 }
844 
845 struct spdk_io_channel *
846 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
847 {
848 	return spdk_get_io_channel(desc->bdev);
849 }
850 
851 const char *
852 spdk_bdev_get_name(const struct spdk_bdev *bdev)
853 {
854 	return bdev->name;
855 }
856 
857 const char *
858 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
859 {
860 	return bdev->product_name;
861 }
862 
863 uint32_t
864 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
865 {
866 	return bdev->blocklen;
867 }
868 
869 uint64_t
870 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
871 {
872 	return bdev->blockcnt;
873 }
874 
875 size_t
876 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
877 {
878 	/* TODO: push this logic down to the bdev modules */
879 	if (bdev->need_aligned_buffer) {
880 		return bdev->blocklen;
881 	}
882 
883 	return 1;
884 }
885 
886 uint32_t
887 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
888 {
889 	return bdev->optimal_io_boundary;
890 }
891 
892 bool
893 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
894 {
895 	return bdev->write_cache;
896 }
897 
898 /*
899  * Convert I/O offset and length from bytes to blocks.
900  *
901  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
902  */
903 static uint64_t
904 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
905 			  uint64_t num_bytes, uint64_t *num_blocks)
906 {
907 	uint32_t block_size = bdev->blocklen;
908 
909 	*offset_blocks = offset_bytes / block_size;
910 	*num_blocks = num_bytes / block_size;
911 
912 	return (offset_bytes % block_size) | (num_bytes % block_size);
913 }
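
/*
 * For example, with a 512-byte block size, offset_bytes = 4096 and
 *  num_bytes = 1024 yield *offset_blocks = 8 and *num_blocks = 2 with a
 *  return value of 0; num_bytes = 1000 would make the return value non-zero.
 */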
914 
915 static bool
916 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
917 {
918 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
919 	 * an overflow, i.e. the offset wrapped around. */
920 	if (offset_blocks + num_blocks < offset_blocks) {
921 		return false;
922 	}
923 
924 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
925 	if (offset_blocks + num_blocks > bdev->blockcnt) {
926 		return false;
927 	}
928 
929 	return true;
930 }
931 
932 int
933 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
934 	       void *buf, uint64_t offset, uint64_t nbytes,
935 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
936 {
937 	uint64_t offset_blocks, num_blocks;
938 
939 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
940 		return -EINVAL;
941 	}
942 
943 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
944 }
945 
946 int
947 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
948 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
949 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
950 {
951 	struct spdk_bdev *bdev = desc->bdev;
952 	struct spdk_bdev_io *bdev_io;
953 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
954 
955 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
956 		return -EINVAL;
957 	}
958 
959 	bdev_io = spdk_bdev_get_io();
960 	if (!bdev_io) {
961 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
962 		return -ENOMEM;
963 	}
964 
965 	bdev_io->ch = channel;
966 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
967 	bdev_io->u.bdev.iov.iov_base = buf;
968 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
969 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
970 	bdev_io->u.bdev.iovcnt = 1;
971 	bdev_io->u.bdev.num_blocks = num_blocks;
972 	bdev_io->u.bdev.offset_blocks = offset_blocks;
973 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
974 
975 	spdk_bdev_io_submit(bdev_io);
976 	return 0;
977 }
978 
979 int
980 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
981 		struct iovec *iov, int iovcnt,
982 		uint64_t offset, uint64_t nbytes,
983 		spdk_bdev_io_completion_cb cb, void *cb_arg)
984 {
985 	uint64_t offset_blocks, num_blocks;
986 
987 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
988 		return -EINVAL;
989 	}
990 
991 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
992 }
993 
994 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
995 			   struct iovec *iov, int iovcnt,
996 			   uint64_t offset_blocks, uint64_t num_blocks,
997 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
998 {
999 	struct spdk_bdev *bdev = desc->bdev;
1000 	struct spdk_bdev_io *bdev_io;
1001 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1002 
1003 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1004 		return -EINVAL;
1005 	}
1006 
1007 	bdev_io = spdk_bdev_get_io();
1008 	if (!bdev_io) {
1009 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
1010 		return -ENOMEM;
1011 	}
1012 
1013 	bdev_io->ch = channel;
1014 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1015 	bdev_io->u.bdev.iovs = iov;
1016 	bdev_io->u.bdev.iovcnt = iovcnt;
1017 	bdev_io->u.bdev.num_blocks = num_blocks;
1018 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1019 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1020 
1021 	spdk_bdev_io_submit(bdev_io);
1022 	return 0;
1023 }
1024 
1025 int
1026 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1027 		void *buf, uint64_t offset, uint64_t nbytes,
1028 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1029 {
1030 	uint64_t offset_blocks, num_blocks;
1031 
1032 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1033 		return -EINVAL;
1034 	}
1035 
1036 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1037 }
1038 
1039 int
1040 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1041 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1042 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1043 {
1044 	struct spdk_bdev *bdev = desc->bdev;
1045 	struct spdk_bdev_io *bdev_io;
1046 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1047 
1048 	if (!desc->write) {
1049 		return -EBADF;
1050 	}
1051 
1052 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1053 		return -EINVAL;
1054 	}
1055 
1056 	bdev_io = spdk_bdev_get_io();
1057 	if (!bdev_io) {
1058 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1059 		return -ENOMEM;
1060 	}
1061 
1062 	bdev_io->ch = channel;
1063 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1064 	bdev_io->u.bdev.iov.iov_base = buf;
1065 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1066 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1067 	bdev_io->u.bdev.iovcnt = 1;
1068 	bdev_io->u.bdev.num_blocks = num_blocks;
1069 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1070 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1071 
1072 	spdk_bdev_io_submit(bdev_io);
1073 	return 0;
1074 }
1075 
1076 int
1077 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1078 		 struct iovec *iov, int iovcnt,
1079 		 uint64_t offset, uint64_t len,
1080 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1081 {
1082 	uint64_t offset_blocks, num_blocks;
1083 
1084 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1085 		return -EINVAL;
1086 	}
1087 
1088 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1089 }
1090 
1091 int
1092 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1093 			struct iovec *iov, int iovcnt,
1094 			uint64_t offset_blocks, uint64_t num_blocks,
1095 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1096 {
1097 	struct spdk_bdev *bdev = desc->bdev;
1098 	struct spdk_bdev_io *bdev_io;
1099 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1100 
1101 	if (!desc->write) {
1102 		return -EBADF;
1103 	}
1104 
1105 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1106 		return -EINVAL;
1107 	}
1108 
1109 	bdev_io = spdk_bdev_get_io();
1110 	if (!bdev_io) {
1111 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1112 		return -ENOMEM;
1113 	}
1114 
1115 	bdev_io->ch = channel;
1116 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1117 	bdev_io->u.bdev.iovs = iov;
1118 	bdev_io->u.bdev.iovcnt = iovcnt;
1119 	bdev_io->u.bdev.num_blocks = num_blocks;
1120 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1121 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1122 
1123 	spdk_bdev_io_submit(bdev_io);
1124 	return 0;
1125 }
1126 
1127 int
1128 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1129 		       uint64_t offset, uint64_t len,
1130 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1131 {
1132 	uint64_t offset_blocks, num_blocks;
1133 
1134 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1135 		return -EINVAL;
1136 	}
1137 
1138 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1139 }
1140 
1141 int
1142 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1143 			      uint64_t offset_blocks, uint64_t num_blocks,
1144 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1145 {
1146 	struct spdk_bdev *bdev = desc->bdev;
1147 	struct spdk_bdev_io *bdev_io;
1148 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1149 	uint64_t len;
1150 	bool split_request = false;
1151 
1152 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1153 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1154 		return -ERANGE;
1155 	}
1156 
1157 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1158 		return -EINVAL;
1159 	}
1160 
1161 	bdev_io = spdk_bdev_get_io();
1162 
1163 	if (!bdev_io) {
1164 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1165 		return -ENOMEM;
1166 	}
1167 
1168 	bdev_io->ch = channel;
1169 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1170 
1171 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1172 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1173 		bdev_io->u.bdev.num_blocks = num_blocks;
1174 		bdev_io->u.bdev.iovs = NULL;
1175 		bdev_io->u.bdev.iovcnt = 0;
1176 
1177 	} else {
1178 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1179 
1180 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1181 
1182 		if (len > ZERO_BUFFER_SIZE) {
1183 			split_request = true;
1184 			len = ZERO_BUFFER_SIZE;
1185 		}
1186 
1187 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1188 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1189 		bdev_io->u.bdev.iov.iov_len = len;
1190 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1191 		bdev_io->u.bdev.iovcnt = 1;
1192 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1193 		bdev_io->split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1194 		bdev_io->split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1195 	}
1196 
1197 	if (split_request) {
1198 		bdev_io->stored_user_cb = cb;
1199 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1200 	} else {
1201 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1202 	}
1203 	spdk_bdev_io_submit(bdev_io);
1204 	return 0;
1205 }
1206 
1207 int
1208 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1209 		uint64_t offset, uint64_t nbytes,
1210 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1211 {
1212 	uint64_t offset_blocks, num_blocks;
1213 
1214 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1215 		return -EINVAL;
1216 	}
1217 
1218 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1219 }
1220 
1221 int
1222 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1223 		       uint64_t offset_blocks, uint64_t num_blocks,
1224 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1225 {
1226 	struct spdk_bdev *bdev = desc->bdev;
1227 	struct spdk_bdev_io *bdev_io;
1228 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1229 
1230 	if (!desc->write) {
1231 		return -EBADF;
1232 	}
1233 
1234 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1235 		return -EINVAL;
1236 	}
1237 
1238 	if (num_blocks == 0) {
1239 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1240 		return -EINVAL;
1241 	}
1242 
1243 	bdev_io = spdk_bdev_get_io();
1244 	if (!bdev_io) {
1245 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1246 		return -ENOMEM;
1247 	}
1248 
1249 	bdev_io->ch = channel;
1250 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1251 	bdev_io->u.bdev.iov.iov_base = NULL;
1252 	bdev_io->u.bdev.iov.iov_len = 0;
1253 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1254 	bdev_io->u.bdev.iovcnt = 1;
1255 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1256 	bdev_io->u.bdev.num_blocks = num_blocks;
1257 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1258 
1259 	spdk_bdev_io_submit(bdev_io);
1260 	return 0;
1261 }
1262 
1263 int
1264 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1265 		uint64_t offset, uint64_t length,
1266 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1267 {
1268 	uint64_t offset_blocks, num_blocks;
1269 
1270 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1271 		return -EINVAL;
1272 	}
1273 
1274 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1275 }
1276 
1277 int
1278 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1279 		       uint64_t offset_blocks, uint64_t num_blocks,
1280 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1281 {
1282 	struct spdk_bdev *bdev = desc->bdev;
1283 	struct spdk_bdev_io *bdev_io;
1284 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1285 
1286 	if (!desc->write) {
1287 		return -EBADF;
1288 	}
1289 
1290 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1291 		return -EINVAL;
1292 	}
1293 
1294 	bdev_io = spdk_bdev_get_io();
1295 	if (!bdev_io) {
1296 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1297 		return -ENOMEM;
1298 	}
1299 
1300 	bdev_io->ch = channel;
1301 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1302 	bdev_io->u.bdev.iovs = NULL;
1303 	bdev_io->u.bdev.iovcnt = 0;
1304 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1305 	bdev_io->u.bdev.num_blocks = num_blocks;
1306 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1307 
1308 	spdk_bdev_io_submit(bdev_io);
1309 	return 0;
1310 }
1311 
1312 static void
1313 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1314 {
1315 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1316 	struct spdk_bdev_io *bdev_io;
1317 
1318 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1319 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1320 	spdk_bdev_io_submit_reset(bdev_io);
1321 }
1322 
1323 static void
1324 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1325 {
1326 	struct spdk_io_channel 		*ch;
1327 	struct spdk_bdev_channel	*channel;
1328 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1329 
1330 	ch = spdk_io_channel_iter_get_channel(i);
1331 	channel = spdk_io_channel_get_ctx(ch);
1332 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1333 
1334 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1335 
1336 	_spdk_bdev_abort_queued_io(&channel->nomem_io, channel);
1337 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1338 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1339 
1340 	spdk_for_each_channel_continue(i, 0);
1341 }
1342 
1343 static void
1344 _spdk_bdev_start_reset(void *ctx)
1345 {
1346 	struct spdk_bdev_channel *ch = ctx;
1347 
1348 	spdk_for_each_channel(ch->bdev, _spdk_bdev_reset_freeze_channel,
1349 			      ch, _spdk_bdev_reset_dev);
1350 }
1351 
1352 static void
1353 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1354 {
1355 	struct spdk_bdev *bdev = ch->bdev;
1356 
1357 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1358 
1359 	pthread_mutex_lock(&bdev->mutex);
1360 	if (bdev->reset_in_progress == NULL) {
1361 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1362 		/*
1363 		 * Take a channel reference for the target bdev for the life of this
1364 		 *  reset.  This guards against the channel getting destroyed while
1365 		 *  spdk_for_each_channel() calls related to this reset IO are in
1366 		 *  progress.  We will release the reference when this reset is
1367 		 *  completed.
1368 		 */
1369 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(bdev);
1370 		_spdk_bdev_start_reset(ch);
1371 	}
1372 	pthread_mutex_unlock(&bdev->mutex);
1373 }
1374 
1375 int
1376 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1377 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1378 {
1379 	struct spdk_bdev *bdev = desc->bdev;
1380 	struct spdk_bdev_io *bdev_io;
1381 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1382 
1383 	bdev_io = spdk_bdev_get_io();
1384 	if (!bdev_io) {
1385 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1386 		return -ENOMEM;
1387 	}
1388 
1389 	bdev_io->ch = channel;
1390 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1391 	bdev_io->u.reset.ch_ref = NULL;
1392 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1393 
1394 	pthread_mutex_lock(&bdev->mutex);
1395 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1396 	pthread_mutex_unlock(&bdev->mutex);
1397 
1398 	_spdk_bdev_channel_start_reset(channel);
1399 
1400 	return 0;
1401 }
1402 
1403 void
1404 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1405 		      struct spdk_bdev_io_stat *stat)
1406 {
1407 #ifdef SPDK_CONFIG_VTUNE
1408 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1409 	memset(stat, 0, sizeof(*stat));
1410 	return;
1411 #endif
1412 
1413 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1414 
1415 	*stat = channel->stat;
1416 	memset(&channel->stat, 0, sizeof(channel->stat));
1417 }
1418 
1419 int
1420 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1421 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1422 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1423 {
1424 	struct spdk_bdev *bdev = desc->bdev;
1425 	struct spdk_bdev_io *bdev_io;
1426 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1427 
1428 	if (!desc->write) {
1429 		return -EBADF;
1430 	}
1431 
1432 	bdev_io = spdk_bdev_get_io();
1433 	if (!bdev_io) {
1434 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1435 		return -ENOMEM;
1436 	}
1437 
1438 	bdev_io->ch = channel;
1439 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1440 	bdev_io->u.nvme_passthru.cmd = *cmd;
1441 	bdev_io->u.nvme_passthru.buf = buf;
1442 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1443 	bdev_io->u.nvme_passthru.md_buf = NULL;
1444 	bdev_io->u.nvme_passthru.md_len = 0;
1445 
1446 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1447 
1448 	spdk_bdev_io_submit(bdev_io);
1449 	return 0;
1450 }
1451 
1452 int
1453 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1454 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1455 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1456 {
1457 	struct spdk_bdev *bdev = desc->bdev;
1458 	struct spdk_bdev_io *bdev_io;
1459 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1460 
1461 	if (!desc->write) {
1462 		/*
1463 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1464 		 *  to easily determine if the command is a read or write, but for now just
1465 		 *  do not allow io_passthru with a read-only descriptor.
1466 		 */
1467 		return -EBADF;
1468 	}
1469 
1470 	bdev_io = spdk_bdev_get_io();
1471 	if (!bdev_io) {
1472 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
1473 		return -ENOMEM;
1474 	}
1475 
1476 	bdev_io->ch = channel;
1477 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1478 	bdev_io->u.nvme_passthru.cmd = *cmd;
1479 	bdev_io->u.nvme_passthru.buf = buf;
1480 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1481 	bdev_io->u.nvme_passthru.md_buf = NULL;
1482 	bdev_io->u.nvme_passthru.md_len = 0;
1483 
1484 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1485 
1486 	spdk_bdev_io_submit(bdev_io);
1487 	return 0;
1488 }
1489 
1490 int
1491 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1492 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
1493 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1494 {
1495 	struct spdk_bdev *bdev = desc->bdev;
1496 	struct spdk_bdev_io *bdev_io;
1497 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1498 
1499 	if (!desc->write) {
1500 		/*
1501 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1502 		 *  to easily determine if the command is a read or write, but for now just
1503 		 *  do not allow io_passthru with a read-only descriptor.
1504 		 */
1505 		return -EBADF;
1506 	}
1507 
1508 	bdev_io = spdk_bdev_get_io();
1509 	if (!bdev_io) {
1510 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru_md\n");
1511 		return -ENOMEM;
1512 	}
1513 
1514 	bdev_io->ch = channel;
1515 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
1516 	bdev_io->u.nvme_passthru.cmd = *cmd;
1517 	bdev_io->u.nvme_passthru.buf = buf;
1518 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1519 	bdev_io->u.nvme_passthru.md_buf = md_buf;
1520 	bdev_io->u.nvme_passthru.md_len = md_len;
1521 
1522 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1523 
1524 	spdk_bdev_io_submit(bdev_io);
1525 	return 0;
1526 }
1527 
1528 int
1529 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1530 {
1531 	if (!bdev_io) {
1532 		SPDK_ERRLOG("bdev_io is NULL\n");
1533 		return -1;
1534 	}
1535 
1536 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1537 		SPDK_ERRLOG("bdev_io is in pending state\n");
1538 		assert(false);
1539 		return -1;
1540 	}
1541 
1542 	spdk_bdev_put_io(bdev_io);
1543 
1544 	return 0;
1545 }
1546 
1547 static void
1548 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
1549 {
1550 	struct spdk_bdev *bdev = bdev_ch->bdev;
1551 	struct spdk_bdev_io *bdev_io;
1552 
1553 	if (bdev_ch->io_outstanding > bdev_ch->nomem_threshold) {
1554 		/*
1555 		 * Allow some more I/O to complete before retrying the nomem_io queue.
1556 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
1557 		 *  the context of a completion, because the resources for the I/O are
1558 		 *  not released until control returns to the bdev poller.  Also, we
1559 		 *  may require several small I/O to complete before a larger I/O
1560 		 *  (that requires splitting) can be submitted.
1561 		 */
1562 		return;
1563 	}
1564 
1565 	while (!TAILQ_EMPTY(&bdev_ch->nomem_io)) {
1566 		bdev_io = TAILQ_FIRST(&bdev_ch->nomem_io);
1567 		TAILQ_REMOVE(&bdev_ch->nomem_io, bdev_io, link);
1568 		bdev_ch->io_outstanding++;
1569 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
1570 		bdev->fn_table->submit_request(bdev_ch->channel, bdev_io);
1571 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
1572 			break;
1573 		}
1574 	}
1575 }
1576 
1577 static void
1578 _spdk_bdev_io_complete(void *ctx)
1579 {
1580 	struct spdk_bdev_io *bdev_io = ctx;
1581 
1582 	assert(bdev_io->cb != NULL);
1583 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1584 }
1585 
1586 static void
1587 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
1588 {
1589 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
1590 
1591 	if (bdev_io->u.reset.ch_ref != NULL) {
1592 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
1593 		bdev_io->u.reset.ch_ref = NULL;
1594 	}
1595 
1596 	_spdk_bdev_io_complete(bdev_io);
1597 }
1598 
1599 static void
1600 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
1601 {
1602 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
1603 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1604 
1605 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
1606 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
1607 		_spdk_bdev_channel_start_reset(ch);
1608 	}
1609 
1610 	spdk_for_each_channel_continue(i, 0);
1611 }
1612 
1613 void
1614 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1615 {
1616 	struct spdk_bdev *bdev = bdev_io->bdev;
1617 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
1618 
1619 	bdev_io->status = status;
1620 
1621 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
1622 		bool unlock_channels = false;
1623 
1624 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
1625 			SPDK_ERRLOG("NOMEM returned for reset\n");
1626 		}
1627 		pthread_mutex_lock(&bdev->mutex);
1628 		if (bdev_io == bdev->reset_in_progress) {
1629 			bdev->reset_in_progress = NULL;
1630 			unlock_channels = true;
1631 		}
1632 		pthread_mutex_unlock(&bdev->mutex);
1633 
1634 		if (unlock_channels) {
1635 			spdk_for_each_channel(bdev, _spdk_bdev_unfreeze_channel, bdev_io,
1636 					      _spdk_bdev_reset_complete);
1637 			return;
1638 		}
1639 	} else {
1640 		assert(bdev_ch->io_outstanding > 0);
1641 		bdev_ch->io_outstanding--;
1642 		if (spdk_likely(status != SPDK_BDEV_IO_STATUS_NOMEM)) {
1643 			if (spdk_unlikely(!TAILQ_EMPTY(&bdev_ch->nomem_io))) {
1644 				_spdk_bdev_ch_retry_io(bdev_ch);
1645 			}
1646 		} else {
1647 			TAILQ_INSERT_HEAD(&bdev_ch->nomem_io, bdev_io, link);
1648 			/*
1649 			 * Wait for some of the outstanding I/O to complete before we
1650 			 *  retry any of the nomem_io.  Normally we will wait for
1651 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
1652 			 *  depth channels we will instead wait for half to complete.
1653 			 */
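			/* E.g. 100 outstanding I/O give a threshold of 92; 10 give 5. */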
1654 			bdev_ch->nomem_threshold = spdk_max(bdev_ch->io_outstanding / 2,
1655 							    bdev_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
1656 			return;
1657 		}
1658 	}
1659 
1660 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1661 		switch (bdev_io->type) {
1662 		case SPDK_BDEV_IO_TYPE_READ:
1663 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1664 			bdev_ch->stat.num_read_ops++;
1665 			break;
1666 		case SPDK_BDEV_IO_TYPE_WRITE:
1667 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1668 			bdev_ch->stat.num_write_ops++;
1669 			break;
1670 		default:
1671 			break;
1672 		}
1673 	}
1674 
1675 #ifdef SPDK_CONFIG_VTUNE
1676 	uint64_t now_tsc = spdk_get_ticks();
1677 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
1678 		uint64_t data[5];
1679 
1680 		data[0] = bdev_ch->stat.num_read_ops;
1681 		data[1] = bdev_ch->stat.bytes_read;
1682 		data[2] = bdev_ch->stat.num_write_ops;
1683 		data[3] = bdev_ch->stat.bytes_written;
1684 		data[4] = bdev->fn_table->get_spin_time ?
1685 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
1686 
1687 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
1688 				   __itt_metadata_u64, 5, data);
1689 
1690 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
1691 		bdev_ch->start_tsc = now_tsc;
1692 	}
1693 #endif
1694 
1695 	if (bdev_io->in_submit_request) {
1696 		/*
1697 		 * Defer completion to avoid potential infinite recursion if the
1698 		 * user's completion callback issues a new I/O.
1699 		 */
1700 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
1701 				     _spdk_bdev_io_complete, bdev_io);
1702 	} else {
1703 		_spdk_bdev_io_complete(bdev_io);
1704 	}
1705 }
1706 
1707 void
1708 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
1709 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
1710 {
1711 	if (sc == SPDK_SCSI_STATUS_GOOD) {
1712 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1713 	} else {
1714 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
1715 		bdev_io->error.scsi.sc = sc;
1716 		bdev_io->error.scsi.sk = sk;
1717 		bdev_io->error.scsi.asc = asc;
1718 		bdev_io->error.scsi.ascq = ascq;
1719 	}
1720 
1721 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1722 }
1723 
1724 void
1725 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
1726 			     int *sc, int *sk, int *asc, int *ascq)
1727 {
1728 	assert(sc != NULL);
1729 	assert(sk != NULL);
1730 	assert(asc != NULL);
1731 	assert(ascq != NULL);
1732 
1733 	switch (bdev_io->status) {
1734 	case SPDK_BDEV_IO_STATUS_SUCCESS:
1735 		*sc = SPDK_SCSI_STATUS_GOOD;
1736 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
1737 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1738 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1739 		break;
1740 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
1741 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
1742 		break;
1743 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
1744 		*sc = bdev_io->error.scsi.sc;
1745 		*sk = bdev_io->error.scsi.sk;
1746 		*asc = bdev_io->error.scsi.asc;
1747 		*ascq = bdev_io->error.scsi.ascq;
1748 		break;
1749 	default:
1750 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
1751 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
1752 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1753 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1754 		break;
1755 	}
1756 }
1757 
1758 void
1759 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
1760 {
1761 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
1762 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1763 	} else {
1764 		bdev_io->error.nvme.sct = sct;
1765 		bdev_io->error.nvme.sc = sc;
1766 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
1767 	}
1768 
1769 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1770 }
1771 
1772 void
1773 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
1774 {
1775 	assert(sct != NULL);
1776 	assert(sc != NULL);
1777 
1778 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
1779 		*sct = bdev_io->error.nvme.sct;
1780 		*sc = bdev_io->error.nvme.sc;
1781 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1782 		*sct = SPDK_NVME_SCT_GENERIC;
1783 		*sc = SPDK_NVME_SC_SUCCESS;
1784 	} else {
1785 		*sct = SPDK_NVME_SCT_GENERIC;
1786 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1787 	}
1788 }
1789 
1790 struct spdk_thread *
1791 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
1792 {
1793 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
1794 }
1795 
1796 static int
1797 _spdk_bdev_register(struct spdk_bdev *bdev)
1798 {
1799 	struct spdk_bdev_module_if *module;
1800 
1801 	assert(bdev->module != NULL);
1802 
1803 	if (!bdev->name) {
1804 		SPDK_ERRLOG("Bdev name is NULL\n");
1805 		return -EINVAL;
1806 	}
1807 
1808 	if (spdk_bdev_get_by_name(bdev->name)) {
1809 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
1810 		return -EEXIST;
1811 	}
1812 
1813 	bdev->status = SPDK_BDEV_STATUS_READY;
1814 
1815 	TAILQ_INIT(&bdev->open_descs);
1816 
1817 	TAILQ_INIT(&bdev->vbdevs);
1818 	TAILQ_INIT(&bdev->base_bdevs);
1819 
1820 	bdev->reset_in_progress = NULL;
1821 
1822 	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
1823 				sizeof(struct spdk_bdev_channel));
1824 
1825 	pthread_mutex_init(&bdev->mutex, NULL);
1826 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
1827 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
1828 
1829 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
1830 		if (module->examine) {
1831 			module->action_in_progress++;
1832 			module->examine(bdev);
1833 		}
1834 	}
1835 
1836 	return 0;
1837 }
1838 
1839 int
1840 spdk_bdev_register(struct spdk_bdev *bdev)
1841 {
1842 	return _spdk_bdev_register(bdev);
1843 }
1844 
1845 int
1846 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
1847 {
1848 	int i, rc;
1849 
1850 	rc = _spdk_bdev_register(vbdev);
1851 	if (rc) {
1852 		return rc;
1853 	}
1854 
1855 	for (i = 0; i < base_bdev_count; i++) {
1856 		assert(base_bdevs[i] != NULL);
1857 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
1858 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
1859 	}
1860 
1861 	return 0;
1862 }
1863 
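/*
 * Illustrative sketch: a virtual bdev layered on one base.  After this
 * call the vbdev sits on the base's vbdevs list and the base on the
 * vbdev's base_bdevs list, which spdk_vbdev_unregister() below unlinks.
 * hypothetical_vbdev is an assumption.
 *
 *	struct spdk_bdev *base = spdk_bdev_get_by_name("Hypo0");
 *
 *	rc = spdk_vbdev_register(&hypothetical_vbdev, &base, 1);
 */
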
1864 void
1865 spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
1866 {
1867 	if (bdev->unregister_cb != NULL) {
1868 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
1869 	}
1870 }
1871 
1872 void
1873 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
1874 {
1875 	struct spdk_bdev_desc	*desc, *tmp;
1876 	int			rc;
1877 	bool			do_destruct = true;
1878 
1879 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
1880 
1881 	pthread_mutex_lock(&bdev->mutex);
1882 
1883 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
1884 	bdev->unregister_cb = cb_fn;
1885 	bdev->unregister_ctx = cb_arg;
1886 
1887 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
1888 		if (desc->remove_cb) {
1889 			pthread_mutex_unlock(&bdev->mutex);
1890 			do_destruct = false;
1891 			desc->remove_cb(desc->remove_ctx);
1892 			pthread_mutex_lock(&bdev->mutex);
1893 		}
1894 	}
1895 
1896 	if (!do_destruct) {
1897 		pthread_mutex_unlock(&bdev->mutex);
1898 		return;
1899 	}
1900 
1901 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
1902 	pthread_mutex_unlock(&bdev->mutex);
1903 
1904 	pthread_mutex_destroy(&bdev->mutex);
1905 
1906 	spdk_io_device_unregister(bdev, NULL);
1907 
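	/*
	 * By convention, a positive return from destruct means the module
	 * will finish tearing down asynchronously and report completion via
	 * spdk_bdev_unregister_done(); in that case the callback must not be
	 * invoked here.
	 */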
1908 	rc = bdev->fn_table->destruct(bdev->ctxt);
1909 	if (rc < 0) {
1910 		SPDK_ERRLOG("destruct failed\n");
1911 	}
1912 	if (rc <= 0 && cb_fn != NULL) {
1913 		cb_fn(cb_arg, rc);
1914 	}
1915 }
1916 
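/*
 * Illustrative sketch: tearing a bdev down with completion notification.
 * If any open descriptor registered a remove_cb, actual destruction is
 * deferred until the last descriptor closes (see spdk_bdev_close()).
 * hypothetical_unregister_done() is an assumption.
 *
 *	static void
 *	hypothetical_unregister_done(void *cb_arg, int bdeverrno)
 *	{
 *		SPDK_NOTICELOG("bdev gone, rc=%d\n", bdeverrno);
 *	}
 *
 *	spdk_bdev_unregister(bdev, hypothetical_unregister_done, NULL);
 */
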
1917 void
1918 spdk_vbdev_unregister(struct spdk_bdev *vbdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
1919 {
1920 	struct spdk_bdev *base_bdev;
1921 
1922 	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
1923 	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
1924 		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
1925 	}
1926 	spdk_bdev_unregister(vbdev, cb_fn, cb_arg);
1927 }
1928 
1929 int
1930 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
1931 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
1932 {
1933 	struct spdk_bdev_desc *desc;
1934 
1935 	desc = calloc(1, sizeof(*desc));
1936 	if (desc == NULL) {
1937 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
1938 		return -ENOMEM;
1939 	}
1940 
1941 	pthread_mutex_lock(&bdev->mutex);
1942 
1943 	if (write && bdev->claim_module) {
1944 		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
1945 		free(desc);
1946 		pthread_mutex_unlock(&bdev->mutex);
1947 		return -EPERM;
1948 	}
1949 
1950 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
1951 
1952 	desc->bdev = bdev;
1953 	desc->remove_cb = remove_cb;
1954 	desc->remove_ctx = remove_ctx;
1955 	desc->write = write;
1956 	*_desc = desc;
1957 
1958 	pthread_mutex_unlock(&bdev->mutex);
1959 
1960 	return 0;
1961 }
1962 
1963 void
1964 spdk_bdev_close(struct spdk_bdev_desc *desc)
1965 {
1966 	struct spdk_bdev *bdev = desc->bdev;
1967 	bool do_unregister = false;
1968 
1969 	pthread_mutex_lock(&bdev->mutex);
1970 
1971 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
1972 	free(desc);
1973 
1974 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
1975 		do_unregister = true;
1976 	}
1977 	pthread_mutex_unlock(&bdev->mutex);
1978 
1979 	if (do_unregister == true) {
1980 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
1981 	}
1982 }
1983 
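/*
 * Illustrative sketch: the descriptor lifecycle around I/O.  The remove
 * callback fires if the bdev is unregistered while this descriptor is
 * still open; hypothetical_hotremove_cb is an assumption.
 *
 *	struct spdk_bdev_desc *desc;
 *
 *	rc = spdk_bdev_open(bdev, true, hypothetical_hotremove_cb, NULL, &desc);
 *	if (rc == 0) {
 *		... submit I/O on a channel from spdk_bdev_get_io_channel(desc) ...
 *		spdk_bdev_close(desc);
 *	}
 */
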
1984 int
1985 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
1986 			    struct spdk_bdev_module_if *module)
1987 {
1988 	if (bdev->claim_module != NULL) {
1989 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
1990 			    bdev->claim_module->name);
1991 		return -EPERM;
1992 	}
1993 
1994 	if (desc && !desc->write) {
1995 		desc->write = true;
1996 	}
1997 
1998 	bdev->claim_module = module;
1999 	return 0;
2000 }
2001 
2002 void
2003 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
2004 {
2005 	assert(bdev->claim_module != NULL);
2006 	bdev->claim_module = NULL;
2007 }
2008 
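/*
 * Illustrative sketch: a virtual bdev module typically claims its base
 * bdev from its examine callback; note that a successful claim upgrades
 * the descriptor to write access as a side effect.  hypothetical_if is
 * an assumption.
 *
 *	if (spdk_bdev_module_claim_bdev(base, desc, &hypothetical_if) == 0) {
 *		... build vbdevs on top; call
 *		spdk_bdev_module_release_bdev(base) at teardown ...
 *	}
 */
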
2009 struct spdk_bdev *
2010 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
2011 {
2012 	return desc->bdev;
2013 }
2014 
2015 void
2016 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
2017 {
2018 	struct iovec *iovs;
2019 	int iovcnt;
2020 
2021 	if (bdev_io == NULL) {
2022 		return;
2023 	}
2024 
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		iovs = bdev_io->u.bdev.iovs;
		iovcnt = bdev_io->u.bdev.iovcnt;
		break;
	default:
		iovs = NULL;
		iovcnt = 0;
		break;
	}
2039 
2040 	if (iovp) {
2041 		*iovp = iovs;
2042 	}
2043 	if (iovcntp) {
2044 		*iovcntp = iovcnt;
2045 	}
2046 }
2047 
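/*
 * Illustrative sketch: protocol targets use this accessor to walk the
 * data buffers of a read or write I/O, e.g. from a completion callback.
 *
 *	struct iovec *iovs;
 *	int i, iovcnt;
 *
 *	spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
 *	for (i = 0; i < iovcnt; i++) {
 *		... consume iovs[i].iov_base / iovs[i].iov_len ...
 *	}
 */
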
2048 void
2049 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
2050 {
	/*
	 * Modules with examine callbacks must be initialized first, so that
	 *  they are ready to handle examine callbacks triggered when later
	 *  modules register physical bdevs during their own initialization.
	 */
2056 	if (bdev_module->examine != NULL) {
2057 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2058 	} else {
2059 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2060 	}
2061 }
2062 
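/*
 * Illustrative sketch: a minimal module definition.  Only the fields this
 * file actually touches (.name, .examine, plus an init hook) are shown;
 * see struct spdk_bdev_module_if in spdk_internal/bdev.h for the rest.
 * An examine callback must eventually signal completion so that
 * action_in_progress is decremented again.
 *
 *	static struct spdk_bdev_module_if hypothetical_if = {
 *		.name = "hypothetical",
 *		.module_init = hypothetical_init,
 *		.examine = hypothetical_examine,
 *	};
 *
 * Registering at startup then reduces to:
 *
 *	spdk_bdev_module_list_add(&hypothetical_if);
 */
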
2063 void
2064 spdk_bdev_part_base_free(struct spdk_bdev_part_base *base)
2065 {
2066 	if (base->desc) {
2067 		spdk_bdev_close(base->desc);
2068 		base->desc = NULL;
2069 	}
2070 	base->base_free_fn(base);
2071 }
2072 
2073 void
2074 spdk_bdev_part_free(struct spdk_bdev_part *part)
2075 {
2076 	struct spdk_bdev_part_base *base;
2077 
2078 	assert(part);
2079 	assert(part->base);
2080 
2081 	base = part->base;
2082 	spdk_io_device_unregister(&part->base, NULL);
2083 	TAILQ_REMOVE(base->tailq, part, tailq);
2084 	free(part->bdev.name);
2085 	free(part);
2086 
2087 	if (__sync_sub_and_fetch(&base->ref, 1) == 0) {
2088 		spdk_bdev_module_release_bdev(base->bdev);
2089 		spdk_bdev_part_base_free(base);
2090 	}
2091 }
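/*
 * Note: base->ref counts the constructed parts (incremented in
 * spdk_bdev_part_construct() below), so the claim on the base bdev is
 * released and the base itself freed only when the last part goes away.
 */
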
2092 
2093 void
2094 spdk_bdev_part_tailq_fini(struct bdev_part_tailq *tailq)
2095 {
2096 	struct spdk_bdev_part *part, *tmp;
2097 
2098 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
2099 		spdk_bdev_part_free(part);
2100 	}
2101 }
2102 
2103 void
2104 spdk_bdev_part_base_hotremove(struct spdk_bdev *base_bdev, struct bdev_part_tailq *tailq)
2105 {
2106 	struct spdk_bdev_part *part, *tmp;
2107 
2108 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
2109 		if (part->base->bdev == base_bdev) {
2110 			spdk_vbdev_unregister(&part->bdev, NULL, NULL);
2111 		}
2112 	}
2113 }
2114 
2115 static bool
2116 spdk_bdev_part_io_type_supported(void *_part, enum spdk_bdev_io_type io_type)
2117 {
2118 	struct spdk_bdev_part *part = _part;
2119 
2120 	return part->base->bdev->fn_table->io_type_supported(part->base->bdev, io_type);
2121 }
2122 
2123 static struct spdk_io_channel *
2124 spdk_bdev_part_get_io_channel(void *_part)
2125 {
2126 	struct spdk_bdev_part *part = _part;
2127 
2128 	return spdk_get_io_channel(&part->base);
2129 }
2130 
2131 static void
2132 spdk_bdev_part_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2133 {
2134 	struct spdk_bdev_io *part_io = cb_arg;
	enum spdk_bdev_io_status status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
2136 
2137 	spdk_bdev_io_complete(part_io, status);
2138 	spdk_bdev_free_io(bdev_io);
2139 }
2140 
2141 static void
2142 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2143 {
2144 	uint64_t len;
2145 
2146 	if (!success) {
2147 		bdev_io->cb = bdev_io->stored_user_cb;
2148 		_spdk_bdev_io_complete(bdev_io);
2149 		return;
2150 	}
2151 
	/* No need to repeat the error checking from write_zeroes_blocks; this request already passed those checks. */
	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->split_remaining_num_blocks,
		       ZERO_BUFFER_SIZE);
	/* Round down to a whole number of blocks so that iov_len and num_blocks
	 * stay consistent for block sizes that do not divide ZERO_BUFFER_SIZE
	 * evenly.
	 */
	len -= len % spdk_bdev_get_block_size(bdev_io->bdev);
2155 
2156 	bdev_io->u.bdev.offset_blocks = bdev_io->split_current_offset_blocks;
2157 	bdev_io->u.bdev.iov.iov_len = len;
2158 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2159 	bdev_io->split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2160 	bdev_io->split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2161 
	/* If this round completes the I/O, change the callback back to the original user callback. */
2163 	if (bdev_io->split_remaining_num_blocks == 0) {
2164 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->stored_user_cb);
2165 	} else {
2166 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2167 	}
2168 	spdk_bdev_io_submit(bdev_io);
2169 }
2170 
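/*
 * Worked example: with a 512-byte block size, ZERO_BUFFER_SIZE (1 MiB)
 * covers 2048 blocks per round, so zeroing 5000 blocks proceeds in rounds
 * of 2048, 2048 and 904 blocks, with the user's callback restored for the
 * final round.
 */
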
2171 void
2172 spdk_bdev_part_submit_request(struct spdk_bdev_part_channel *ch, struct spdk_bdev_io *bdev_io)
2173 {
2174 	struct spdk_bdev_part *part = ch->part;
2175 	struct spdk_io_channel *base_ch = ch->base_ch;
2176 	struct spdk_bdev_desc *base_desc = part->base->desc;
2177 	uint64_t offset;
2178 	int rc = 0;
2179 
2180 	/* Modify the I/O to adjust for the offset within the base bdev. */
2181 	switch (bdev_io->type) {
2182 	case SPDK_BDEV_IO_TYPE_READ:
2183 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2184 		rc = spdk_bdev_readv_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2185 					    bdev_io->u.bdev.iovcnt, offset,
2186 					    bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2187 					    bdev_io);
2188 		break;
2189 	case SPDK_BDEV_IO_TYPE_WRITE:
2190 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2191 		rc = spdk_bdev_writev_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2192 					     bdev_io->u.bdev.iovcnt, offset,
2193 					     bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2194 					     bdev_io);
2195 		break;
2196 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
2197 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2198 		rc = spdk_bdev_write_zeroes_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2199 						   spdk_bdev_part_complete_io, bdev_io);
2200 		break;
2201 	case SPDK_BDEV_IO_TYPE_UNMAP:
2202 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2203 		rc = spdk_bdev_unmap_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2204 					    spdk_bdev_part_complete_io, bdev_io);
2205 		break;
2206 	case SPDK_BDEV_IO_TYPE_FLUSH:
2207 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2208 		rc = spdk_bdev_flush_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2209 					    spdk_bdev_part_complete_io, bdev_io);
2210 		break;
2211 	case SPDK_BDEV_IO_TYPE_RESET:
2212 		rc = spdk_bdev_reset(base_desc, base_ch,
2213 				     spdk_bdev_part_complete_io, bdev_io);
2214 		break;
2215 	default:
		SPDK_ERRLOG("unknown I/O type %d\n", bdev_io->type);
2217 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2218 		return;
2219 	}
2220 
2221 	if (rc != 0) {
2222 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2223 	}
2224 }
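
/*
 * Worked example: for a part constructed with offset_blocks = 2048, an
 * I/O to the part's LBA 0 is reissued to the base bdev at LBA 2048; only
 * the offset changes, the iovecs pass through untouched.
 */
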
2225 static int
2226 spdk_bdev_part_channel_create_cb(void *io_device, void *ctx_buf)
2227 {
2228 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2229 	struct spdk_bdev_part_channel *ch = ctx_buf;
2230 
2231 	ch->part = part;
2232 	ch->base_ch = spdk_bdev_get_io_channel(part->base->desc);
2233 	if (ch->base_ch == NULL) {
2234 		return -1;
2235 	}
2236 
2237 	if (part->base->ch_create_cb) {
2238 		return part->base->ch_create_cb(io_device, ctx_buf);
2239 	} else {
2240 		return 0;
2241 	}
2242 }
2243 
2244 static void
2245 spdk_bdev_part_channel_destroy_cb(void *io_device, void *ctx_buf)
2246 {
2247 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2248 	struct spdk_bdev_part_channel *ch = ctx_buf;
2249 
2250 	if (part->base->ch_destroy_cb) {
2251 		part->base->ch_destroy_cb(io_device, ctx_buf);
2252 	}
2253 	spdk_put_io_channel(ch->base_ch);
2254 }
2255 
2256 int
2257 spdk_bdev_part_base_construct(struct spdk_bdev_part_base *base, struct spdk_bdev *bdev,
2258 			      spdk_bdev_remove_cb_t remove_cb, struct spdk_bdev_module_if *module,
2259 			      struct spdk_bdev_fn_table *fn_table, struct bdev_part_tailq *tailq,
2260 			      spdk_bdev_part_base_free_fn free_fn,
2261 			      uint32_t channel_size, spdk_io_channel_create_cb ch_create_cb,
2262 			      spdk_io_channel_destroy_cb ch_destroy_cb)
2263 {
2264 	int rc;
2265 
2266 	fn_table->get_io_channel = spdk_bdev_part_get_io_channel;
2267 	fn_table->io_type_supported = spdk_bdev_part_io_type_supported;
2268 
2269 	base->bdev = bdev;
2270 	base->desc = NULL;
2271 	base->ref = 0;
2272 	base->module = module;
2273 	base->fn_table = fn_table;
2274 	base->tailq = tailq;
2275 	base->claimed = false;
2276 	base->channel_size = channel_size;
2277 	base->ch_create_cb = ch_create_cb;
2278 	base->ch_destroy_cb = ch_destroy_cb;
2279 	base->base_free_fn = free_fn;
2280 
2281 	rc = spdk_bdev_open(bdev, false, remove_cb, bdev, &base->desc);
2282 	if (rc) {
2283 		spdk_bdev_part_base_free(base);
2284 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
2285 		return -1;
2286 	}
2287 
2288 	return 0;
2289 }
2290 
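/*
 * Illustrative sketch: a partitioning module sets up one shared base per
 * underlying bdev.  The hypothetical_* names stand in for module-provided
 * storage and callbacks.  The base is opened read-only here; the write
 * claim is taken lazily in spdk_bdev_part_construct() below.
 *
 *	rc = spdk_bdev_part_base_construct(&hypothetical_base, base_bdev,
 *					   hypothetical_hotremove_cb,
 *					   &hypothetical_if, &hypothetical_fn_table,
 *					   &hypothetical_tailq, hypothetical_free_fn,
 *					   sizeof(struct spdk_bdev_part_channel),
 *					   NULL, NULL);
 */
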
2291 int
2292 spdk_bdev_part_construct(struct spdk_bdev_part *part, struct spdk_bdev_part_base *base,
2293 			 char *name, uint64_t offset_blocks, uint64_t num_blocks,
2294 			 char *product_name)
2295 {
2296 	part->bdev.name = name;
2297 	part->bdev.blocklen = base->bdev->blocklen;
2298 	part->bdev.blockcnt = num_blocks;
2299 	part->offset_blocks = offset_blocks;
2300 
2301 	part->bdev.write_cache = base->bdev->write_cache;
2302 	part->bdev.need_aligned_buffer = base->bdev->need_aligned_buffer;
2303 	part->bdev.product_name = product_name;
2304 	part->bdev.ctxt = part;
2305 	part->bdev.module = base->module;
2306 	part->bdev.fn_table = base->fn_table;
2307 
2308 	__sync_fetch_and_add(&base->ref, 1);
2309 	part->base = base;
2310 
2311 	if (!base->claimed) {
2312 		int rc;
2313 
2314 		rc = spdk_bdev_module_claim_bdev(base->bdev, base->desc, base->module);
2315 		if (rc) {
2316 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(base->bdev));
2317 			free(part->bdev.name);
2318 			return -1;
2319 		}
2320 		base->claimed = true;
2321 	}
2322 
2323 	spdk_io_device_register(&part->base, spdk_bdev_part_channel_create_cb,
2324 				spdk_bdev_part_channel_destroy_cb,
2325 				base->channel_size);
2326 	spdk_vbdev_register(&part->bdev, &base->bdev, 1);
2327 	TAILQ_INSERT_TAIL(base->tailq, part, tailq);
2328 
2329 	return 0;
2330 }
2331 
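/*
 * Illustrative sketch: carving a 1024-block part out of the base at block
 * offset 2048.  The name must be heap-allocated, since it is freed in
 * spdk_bdev_part_free().  hypothetical_part and hypothetical_base are
 * assumptions.
 *
 *	rc = spdk_bdev_part_construct(&hypothetical_part, &hypothetical_base,
 *				      spdk_sprintf_alloc("HypoDisk0p1"),
 *				      2048, 1024, "Hypothetical Partition");
 */
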
2332 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
2333