xref: /spdk/lib/bdev/bdev.c (revision 1f935c7a9b5b930feffd7ce2598a842d39def5a4)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/likely.h"
42 #include "spdk/queue.h"
43 #include "spdk/nvme_spec.h"
44 #include "spdk/scsi_spec.h"
45 #include "spdk/util.h"
46 
47 #include "spdk_internal/bdev.h"
48 #include "spdk_internal/log.h"
49 #include "spdk/string.h"
50 
51 #ifdef SPDK_CONFIG_VTUNE
52 #include "ittnotify.h"
53 #include "ittnotify_types.h"
54 int __itt_init_ittlib(const char *, __itt_group_id);
55 #endif
56 
57 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
58 #define BUF_SMALL_POOL_SIZE	8192
59 #define BUF_LARGE_POOL_SIZE	1024
60 
61 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
62 
63 struct spdk_bdev_mgr {
64 	struct spdk_mempool *bdev_io_pool;
65 
66 	struct spdk_mempool *buf_small_pool;
67 	struct spdk_mempool *buf_large_pool;
68 
69 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
70 
71 	TAILQ_HEAD(, spdk_bdev) bdevs;
72 
73 	spdk_bdev_poller_start_cb start_poller_fn;
74 	spdk_bdev_poller_stop_cb stop_poller_fn;
75 
76 	bool init_complete;
77 	bool module_init_complete;
78 
79 #ifdef SPDK_CONFIG_VTUNE
80 	__itt_domain	*domain;
81 #endif
82 };
83 
84 static struct spdk_bdev_mgr g_bdev_mgr = {
85 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
86 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
87 	.start_poller_fn = NULL,
88 	.stop_poller_fn = NULL,
89 	.init_complete = false,
90 	.module_init_complete = false,
91 };
92 
93 static spdk_bdev_init_cb	g_cb_fn = NULL;
94 static void			*g_cb_arg = NULL;
95 
97 struct spdk_bdev_mgmt_channel {
98 	bdev_io_tailq_t need_buf_small;
99 	bdev_io_tailq_t need_buf_large;
100 };
101 
102 struct spdk_bdev_desc {
103 	struct spdk_bdev		*bdev;
104 	spdk_bdev_remove_cb_t		remove_cb;
105 	void				*remove_ctx;
106 	bool				write;
107 	TAILQ_ENTRY(spdk_bdev_desc)	link;
108 };
109 
110 struct spdk_bdev_channel {
111 	struct spdk_bdev	*bdev;
112 
113 	/* The channel for the underlying device */
114 	struct spdk_io_channel	*channel;
115 
116 	/* Channel for the bdev manager */
117 	struct spdk_io_channel *mgmt_channel;
118 
119 	struct spdk_bdev_io_stat stat;
120 
121 	/*
122 	 * Count of I/O submitted to bdev module and waiting for completion.
123 	 * Incremented before submit_request() is called on an spdk_bdev_io.
124 	 */
125 	uint64_t		io_outstanding;
126 
127 	bdev_io_tailq_t		queued_resets;
128 
129 #ifdef SPDK_CONFIG_VTUNE
130 	uint64_t		start_tsc;
131 	uint64_t		interval_tsc;
132 	__itt_string_handle	*handle;
133 #endif
134 
135 };
136 
137 struct spdk_bdev *
138 spdk_bdev_first(void)
139 {
140 	struct spdk_bdev *bdev;
141 
142 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
143 	if (bdev) {
144 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
145 	}
146 
147 	return bdev;
148 }
149 
150 struct spdk_bdev *
151 spdk_bdev_next(struct spdk_bdev *prev)
152 {
153 	struct spdk_bdev *bdev;
154 
155 	bdev = TAILQ_NEXT(prev, link);
156 	if (bdev) {
157 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
158 	}
159 
160 	return bdev;
161 }
162 
163 static struct spdk_bdev *
164 _bdev_next_leaf(struct spdk_bdev *bdev)
165 {
166 	while (bdev != NULL) {
167 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
168 			return bdev;
169 		} else {
170 			bdev = TAILQ_NEXT(bdev, link);
171 		}
172 	}
173 
174 	return bdev;
175 }
176 
177 struct spdk_bdev *
178 spdk_bdev_first_leaf(void)
179 {
180 	struct spdk_bdev *bdev;
181 
182 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
183 
184 	if (bdev) {
185 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
186 	}
187 
188 	return bdev;
189 }
190 
191 struct spdk_bdev *
192 spdk_bdev_next_leaf(struct spdk_bdev *prev)
193 {
194 	struct spdk_bdev *bdev;
195 
196 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
197 
198 	if (bdev) {
199 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
200 	}
201 
202 	return bdev;
203 }
204 
205 struct spdk_bdev *
206 spdk_bdev_get_by_name(const char *bdev_name)
207 {
208 	struct spdk_bdev *bdev = spdk_bdev_first();
209 
210 	while (bdev != NULL) {
211 		if (strcmp(bdev_name, bdev->name) == 0) {
212 			return bdev;
213 		}
214 		bdev = spdk_bdev_next(bdev);
215 	}
216 
217 	return NULL;
218 }
219 
220 static void
221 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
222 {
223 	assert(bdev_io->get_buf_cb != NULL);
224 	assert(buf != NULL);
225 	assert(bdev_io->u.bdev.iovs != NULL);
226 
227 	bdev_io->buf = buf;
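	/*
	 * Round the buffer address up to the next 512-byte boundary.  Adding 512
	 *  and then masking off the low 9 bits yields an aligned address that is
	 *  strictly greater than buf and at most buf + 512, which is why the
	 *  buffer pools are created with 512 bytes of slack per element.
	 */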
228 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
229 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
230 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
231 }
232 
233 static void
234 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
235 {
236 	struct spdk_mempool *pool;
237 	struct spdk_bdev_io *tmp;
238 	void *buf;
239 	bdev_io_tailq_t *tailq;
240 	uint64_t length;
241 	struct spdk_bdev_mgmt_channel *ch;
242 
243 	assert(bdev_io->u.bdev.iovcnt == 1);
244 
245 	length = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
246 	buf = bdev_io->buf;
247 
248 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
249 
250 	if (length <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
251 		pool = g_bdev_mgr.buf_small_pool;
252 		tailq = &ch->need_buf_small;
253 	} else {
254 		pool = g_bdev_mgr.buf_large_pool;
255 		tailq = &ch->need_buf_large;
256 	}
257 
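	/*
	 * If another I/O is already waiting for a buffer of this size class,
	 *  hand the buffer directly to that waiter rather than returning it
	 *  to the pool.
	 */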
258 	if (TAILQ_EMPTY(tailq)) {
259 		spdk_mempool_put(pool, buf);
260 	} else {
261 		tmp = TAILQ_FIRST(tailq);
262 		TAILQ_REMOVE(tailq, tmp, buf_link);
263 		spdk_bdev_io_set_buf(tmp, buf);
264 	}
265 }
266 
267 void
268 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb)
269 {
270 	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
271 	struct spdk_mempool *pool;
272 	bdev_io_tailq_t *tailq;
273 	void *buf = NULL;
274 	struct spdk_bdev_mgmt_channel *ch;
275 
276 	assert(cb != NULL);
277 	assert(bdev_io->u.bdev.iovs != NULL);
278 
279 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
280 		/* Buffer already present */
281 		cb(bdev_io->ch->channel, bdev_io);
282 		return;
283 	}
284 
285 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
286 
287 	bdev_io->get_buf_cb = cb;
288 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
289 		pool = g_bdev_mgr.buf_small_pool;
290 		tailq = &ch->need_buf_small;
291 	} else {
292 		pool = g_bdev_mgr.buf_large_pool;
293 		tailq = &ch->need_buf_large;
294 	}
295 
296 	buf = spdk_mempool_get(pool);
297 
298 	if (!buf) {
299 		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
300 	} else {
301 		spdk_bdev_io_set_buf(bdev_io, buf);
302 	}
303 }
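
/*
 * Example (sketch): a bdev module whose READ path needs a data buffer would
 *  typically call spdk_bdev_io_get_buf() from its submit path and continue in
 *  the callback once a buffer is available.  The names my_read_get_buf_cb and
 *  my_module_submit_read are hypothetical.
 *
 *	static void
 *	my_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		// bdev_io->u.bdev.iovs[0] now points at an aligned buffer
 *		my_module_submit_read(ch, bdev_io);
 *	}
 *
 *	spdk_bdev_io_get_buf(bdev_io, my_read_get_buf_cb);
 */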
304 
305 static int
306 spdk_bdev_module_get_max_ctx_size(void)
307 {
308 	struct spdk_bdev_module_if *bdev_module;
309 	int max_bdev_module_size = 0;
310 
311 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
312 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
313 			max_bdev_module_size = bdev_module->get_ctx_size();
314 		}
315 	}
316 
317 	return max_bdev_module_size;
318 }
319 
320 void
321 spdk_bdev_config_text(FILE *fp)
322 {
323 	struct spdk_bdev_module_if *bdev_module;
324 
325 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
326 		if (bdev_module->config_text) {
327 			bdev_module->config_text(fp);
328 		}
329 	}
330 }
331 
332 static int
333 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
334 {
335 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
336 
337 	TAILQ_INIT(&ch->need_buf_small);
338 	TAILQ_INIT(&ch->need_buf_large);
339 
340 	return 0;
341 }
342 
343 static void
344 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
345 {
346 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
347 
348 	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
349 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
350 	}
351 }
352 
353 static void
354 spdk_bdev_init_complete(int rc)
355 {
356 	spdk_bdev_init_cb cb_fn = g_cb_fn;
357 	void *cb_arg = g_cb_arg;
358 
359 	g_bdev_mgr.init_complete = true;
360 	g_cb_fn = NULL;
361 	g_cb_arg = NULL;
362 
363 	cb_fn(cb_arg, rc);
364 }
365 
366 static void
367 spdk_bdev_module_action_complete(void)
368 {
369 	struct spdk_bdev_module_if *m;
370 
371 	/*
372 	 * Don't finish bdev subsystem initialization if
373 	 * module pre-initialization is still in progress, or
374 	 * the subsystem has already been initialized.
375 	 */
376 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
377 		return;
378 	}
379 
380 	/*
381 	 * Check all bdev modules for inits/examinations in progress. If any
382 	 * exist, return immediately since we cannot finish bdev subsystem
383 	 * initialization until all are completed.
384 	 */
385 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
386 		if (m->action_in_progress > 0) {
387 			return;
388 		}
389 	}
390 
391 	/*
392 	 * Modules already finished initialization - now that all
393 	 * the bdev modules have finished their asynchronous I/O
394 	 * processing, the entire bdev layer can be marked as complete.
395 	 */
396 	spdk_bdev_init_complete(0);
397 }
398 
399 static void
400 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
401 {
402 	assert(module->action_in_progress > 0);
403 	module->action_in_progress--;
404 	spdk_bdev_module_action_complete();
405 }
406 
407 void
408 spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
409 {
410 	spdk_bdev_module_action_done(module);
411 }
412 
413 void
414 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
415 {
416 	spdk_bdev_module_action_done(module);
417 }
418 
419 static int
420 spdk_bdev_modules_init(void)
421 {
422 	struct spdk_bdev_module_if *module;
423 	int rc = 0;
424 
425 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
426 		rc = module->module_init();
427 		if (rc != 0) {
428 			break;
429 		}
430 	}
431 
432 	g_bdev_mgr.module_init_complete = true;
433 	return rc;
434 }
435 
436 void
437 spdk_bdev_poller_start(struct spdk_bdev_poller **ppoller,
438 		       spdk_bdev_poller_fn fn,
439 		       void *arg,
440 		       uint32_t lcore,
441 		       uint64_t period_microseconds)
442 {
443 	g_bdev_mgr.start_poller_fn(ppoller, fn, arg, lcore, period_microseconds);
444 }
445 
446 void
447 spdk_bdev_poller_stop(struct spdk_bdev_poller **ppoller)
448 {
449 	g_bdev_mgr.stop_poller_fn(ppoller);
450 }
451 
452 void
453 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg,
454 		     spdk_bdev_poller_start_cb start_poller_fn,
455 		     spdk_bdev_poller_stop_cb stop_poller_fn)
456 {
457 	int cache_size;
458 	int rc = 0;
459 	char mempool_name[32];
460 
461 	assert(cb_fn != NULL);
462 
463 	g_cb_fn = cb_fn;
464 	g_cb_arg = cb_arg;
465 
466 	g_bdev_mgr.start_poller_fn = start_poller_fn;
467 	g_bdev_mgr.stop_poller_fn = stop_poller_fn;
468 
469 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
470 
471 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
472 				  SPDK_BDEV_IO_POOL_SIZE,
473 				  sizeof(struct spdk_bdev_io) +
474 				  spdk_bdev_module_get_max_ctx_size(),
475 				  64,
476 				  SPDK_ENV_SOCKET_ID_ANY);
477 
478 	if (g_bdev_mgr.bdev_io_pool == NULL) {
479 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
480 		spdk_bdev_init_complete(-1);
481 		return;
482 	}
483 
484 	/**
485 	 * Ensure no more than half of the total buffers end up in local caches, by
486 	 *   using spdk_env_get_core_count() to determine how many local caches we need
487 	 *   to account for.
488 	 */
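	/*
	 * For example, with BUF_SMALL_POOL_SIZE = 8192 and 4 cores, each
	 *  per-core cache holds at most 8192 / (2 * 4) = 1024 buffers, so the
	 *  caches combined consume at most half of the pool.
	 */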
489 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
490 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
491 
492 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
493 				    BUF_SMALL_POOL_SIZE,
494 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
495 				    cache_size,
496 				    SPDK_ENV_SOCKET_ID_ANY);
497 	if (!g_bdev_mgr.buf_small_pool) {
498 		SPDK_ERRLOG("create rbuf small pool failed\n");
499 		spdk_bdev_init_complete(-1);
500 		return;
501 	}
502 
503 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
504 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
505 
506 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
507 				    BUF_LARGE_POOL_SIZE,
508 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
509 				    cache_size,
510 				    SPDK_ENV_SOCKET_ID_ANY);
511 	if (!g_bdev_mgr.buf_large_pool) {
512 		SPDK_ERRLOG("create rbuf large pool failed\n");
513 		spdk_bdev_init_complete(-1);
514 		return;
515 	}
516 
517 #ifdef SPDK_CONFIG_VTUNE
518 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
519 #endif
520 
521 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
522 				spdk_bdev_mgmt_channel_destroy,
523 				sizeof(struct spdk_bdev_mgmt_channel));
524 
525 	rc = spdk_bdev_modules_init();
526 	if (rc != 0) {
527 		SPDK_ERRLOG("bdev modules init failed\n");
528 		spdk_bdev_init_complete(-1);
529 		return;
530 	}
531 
532 	spdk_bdev_module_action_complete();
533 }
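
/*
 * Example (sketch): an application framework drives this initialization with
 *  its own poller hooks and a completion callback.  All names below are
 *  hypothetical.
 *
 *	static void
 *	bdev_init_done(void *cb_arg, int rc)
 *	{
 *		printf("bdev layer ready, rc=%d\n", rc);
 *	}
 *
 *	spdk_bdev_initialize(bdev_init_done, NULL,
 *			     my_start_poller, my_stop_poller);
 */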
534 
535 int
536 spdk_bdev_finish(void)
537 {
538 	struct spdk_bdev_module_if *bdev_module;
539 
540 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
541 		if (bdev_module->module_fini) {
542 			bdev_module->module_fini();
543 		}
544 	}
545 
546 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
547 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
548 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
549 			    SPDK_BDEV_IO_POOL_SIZE);
550 	}
551 
552 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
553 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
554 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
555 			    BUF_SMALL_POOL_SIZE);
556 		assert(false);
557 	}
558 
559 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
560 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
561 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
562 			    BUF_LARGE_POOL_SIZE);
563 		assert(false);
564 	}
565 
566 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
567 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
568 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
569 
570 	spdk_io_device_unregister(&g_bdev_mgr, NULL);
571 
572 	return 0;
573 }
574 
575 struct spdk_bdev_io *
576 spdk_bdev_get_io(void)
577 {
578 	struct spdk_bdev_io *bdev_io;
579 
580 	bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
581 	if (!bdev_io) {
582 		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
583 		abort();
584 	}
585 
586 	memset(bdev_io, 0, offsetof(struct spdk_bdev_io, u));
587 
588 	return bdev_io;
589 }
590 
591 static void
592 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
593 {
594 	if (!bdev_io) {
595 		return;
596 	}
597 
598 	if (bdev_io->buf != NULL) {
599 		spdk_bdev_io_put_buf(bdev_io);
600 	}
601 
602 	spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
603 }
604 
605 static void
606 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
607 {
608 	struct spdk_bdev *bdev = bdev_io->bdev;
609 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
610 	struct spdk_io_channel *ch = bdev_ch->channel;
611 
612 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
613 
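	/*
	 * in_submit_request lets spdk_bdev_io_complete() detect completions that
	 *  happen synchronously inside submit_request() and defer the user
	 *  callback to a message, avoiding unbounded recursion.
	 */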
614 	bdev_ch->io_outstanding++;
615 	bdev_io->in_submit_request = true;
616 	bdev->fn_table->submit_request(ch, bdev_io);
617 	bdev_io->in_submit_request = false;
618 }
619 
620 static void
621 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
622 {
623 	struct spdk_bdev *bdev = bdev_io->bdev;
624 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
625 	struct spdk_io_channel *ch = bdev_ch->channel;
626 
627 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
628 
629 	bdev_io->in_submit_request = true;
630 	bdev->fn_table->submit_request(ch, bdev_io);
631 	bdev_io->in_submit_request = false;
632 }
633 
634 static void
635 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
636 		  struct spdk_bdev *bdev, void *cb_arg,
637 		  spdk_bdev_io_completion_cb cb)
638 {
639 	bdev_io->bdev = bdev;
640 	bdev_io->caller_ctx = cb_arg;
641 	bdev_io->cb = cb;
642 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
643 	bdev_io->in_submit_request = false;
644 }
645 
646 bool
647 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
648 {
649 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
650 }
651 
652 int
653 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
654 {
655 	if (bdev->fn_table->dump_config_json) {
656 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
657 	}
658 
659 	return 0;
660 }
661 
662 static int
663 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
664 {
665 	struct spdk_bdev		*bdev = io_device;
666 	struct spdk_bdev_channel	*ch = ctx_buf;
667 
668 	ch->bdev = io_device;
669 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
670 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
671 	memset(&ch->stat, 0, sizeof(ch->stat));
672 	ch->io_outstanding = 0;
673 	TAILQ_INIT(&ch->queued_resets);
674 
675 #ifdef SPDK_CONFIG_VTUNE
676 	{
677 		char *name;
678 		__itt_init_ittlib(NULL, 0);
679 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
680 		if (!name) {
681 			return -1;
682 		}
683 		ch->handle = __itt_string_handle_create(name);
684 		free(name);
685 		ch->start_tsc = spdk_get_ticks();
686 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
687 	}
688 #endif
689 
690 	return 0;
691 }
692 
693 /*
694  * Abort I/O that are waiting on a data buffer.  These types of I/O are
695  *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
696  */
697 static void
698 _spdk_bdev_abort_buf_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
699 {
700 	struct spdk_bdev_io *bdev_io, *tmp;
701 
702 	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
703 		if (bdev_io->ch == ch) {
704 			TAILQ_REMOVE(queue, bdev_io, buf_link);
705 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
706 		}
707 	}
708 }
709 
710 /*
711  * Abort I/O that are queued waiting for submission.  These types of I/O are
712  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
713  */
714 static void
715 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
716 {
717 	struct spdk_bdev_io *bdev_io, *tmp;
718 
719 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
720 		if (bdev_io->ch == ch) {
721 			TAILQ_REMOVE(queue, bdev_io, link);
722 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
723 		}
724 	}
725 }
726 
727 static void
728 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
729 {
730 	struct spdk_bdev_channel	*ch = ctx_buf;
731 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
732 
733 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
734 
735 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
736 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
737 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
738 
739 	spdk_put_io_channel(ch->channel);
740 	spdk_put_io_channel(ch->mgmt_channel);
741 	assert(ch->io_outstanding == 0);
742 }
743 
744 struct spdk_io_channel *
745 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
746 {
747 	return spdk_get_io_channel(desc->bdev);
748 }
749 
750 const char *
751 spdk_bdev_get_name(const struct spdk_bdev *bdev)
752 {
753 	return bdev->name;
754 }
755 
756 const char *
757 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
758 {
759 	return bdev->product_name;
760 }
761 
762 uint32_t
763 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
764 {
765 	return bdev->blocklen;
766 }
767 
768 uint64_t
769 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
770 {
771 	return bdev->blockcnt;
772 }
773 
774 size_t
775 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
776 {
777 	/* TODO: push this logic down to the bdev modules */
778 	if (bdev->need_aligned_buffer) {
779 		return bdev->blocklen;
780 	}
781 
782 	return 1;
783 }
784 
785 uint32_t
786 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
787 {
788 	return bdev->optimal_io_boundary;
789 }
790 
791 bool
792 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
793 {
794 	return bdev->write_cache;
795 }
796 
797 /*
798  * Convert I/O offset and length from bytes to blocks.
799  *
800  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
801  */
802 static uint64_t
803 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
804 			  uint64_t num_bytes, uint64_t *num_blocks)
805 {
806 	uint32_t block_size = bdev->blocklen;
807 
808 	*offset_blocks = offset_bytes / block_size;
809 	*num_blocks = num_bytes / block_size;
810 
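	/*
	 * OR-ing the remainders yields a nonzero value if either byte count is
	 *  not block-aligned.  For example, with a 512-byte block size,
	 *  offset_bytes = 1024 and num_bytes = 520 returns 0 | 8 = 8 (invalid).
	 */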
811 	return (offset_bytes % block_size) | (num_bytes % block_size);
812 }
813 
814 static bool
815 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
816 {
817 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
818 	 * that the addition overflowed and the offset wrapped around. */
819 	if (offset_blocks + num_blocks < offset_blocks) {
820 		return false;
821 	}
822 
823 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
824 	if (offset_blocks + num_blocks > bdev->blockcnt) {
825 		return false;
826 	}
827 
828 	return true;
829 }
830 
831 int
832 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
833 	       void *buf, uint64_t offset, uint64_t nbytes,
834 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
835 {
836 	uint64_t offset_blocks, num_blocks;
837 
838 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
839 		return -EINVAL;
840 	}
841 
842 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
843 }
844 
845 int
846 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
847 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
848 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
849 {
850 	struct spdk_bdev *bdev = desc->bdev;
851 	struct spdk_bdev_io *bdev_io;
852 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
853 
854 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
855 		return -EINVAL;
856 	}
857 
858 	bdev_io = spdk_bdev_get_io();
859 	if (!bdev_io) {
860 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
861 		return -ENOMEM;
862 	}
863 
864 	bdev_io->ch = channel;
865 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
866 	bdev_io->u.bdev.iov.iov_base = buf;
867 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
868 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
869 	bdev_io->u.bdev.iovcnt = 1;
870 	bdev_io->u.bdev.num_blocks = num_blocks;
871 	bdev_io->u.bdev.offset_blocks = offset_blocks;
872 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
873 
874 	spdk_bdev_io_submit(bdev_io);
875 	return 0;
876 }
877 
878 int
879 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
880 		struct iovec *iov, int iovcnt,
881 		uint64_t offset, uint64_t nbytes,
882 		spdk_bdev_io_completion_cb cb, void *cb_arg)
883 {
884 	uint64_t offset_blocks, num_blocks;
885 
886 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
887 		return -EINVAL;
888 	}
889 
890 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
891 }
892 
893 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
894 			   struct iovec *iov, int iovcnt,
895 			   uint64_t offset_blocks, uint64_t num_blocks,
896 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
897 {
898 	struct spdk_bdev *bdev = desc->bdev;
899 	struct spdk_bdev_io *bdev_io;
900 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
901 
902 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
903 		return -EINVAL;
904 	}
905 
906 	bdev_io = spdk_bdev_get_io();
907 	if (!bdev_io) {
908 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
909 		return -ENOMEM;
910 	}
911 
912 	bdev_io->ch = channel;
913 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
914 	bdev_io->u.bdev.iovs = iov;
915 	bdev_io->u.bdev.iovcnt = iovcnt;
916 	bdev_io->u.bdev.num_blocks = num_blocks;
917 	bdev_io->u.bdev.offset_blocks = offset_blocks;
918 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
919 
920 	spdk_bdev_io_submit(bdev_io);
921 	return 0;
922 }
923 
924 int
925 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
926 		void *buf, uint64_t offset, uint64_t nbytes,
927 		spdk_bdev_io_completion_cb cb, void *cb_arg)
928 {
929 	uint64_t offset_blocks, num_blocks;
930 
931 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
932 		return -EINVAL;
933 	}
934 
935 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
936 }
937 
938 int
939 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
940 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
941 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
942 {
943 	struct spdk_bdev *bdev = desc->bdev;
944 	struct spdk_bdev_io *bdev_io;
945 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
946 
947 	if (!desc->write) {
948 		return -EBADF;
949 	}
950 
951 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
952 		return -EINVAL;
953 	}
954 
955 	bdev_io = spdk_bdev_get_io();
956 	if (!bdev_io) {
957 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
958 		return -ENOMEM;
959 	}
960 
961 	bdev_io->ch = channel;
962 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
963 	bdev_io->u.bdev.iov.iov_base = buf;
964 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
965 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
966 	bdev_io->u.bdev.iovcnt = 1;
967 	bdev_io->u.bdev.num_blocks = num_blocks;
968 	bdev_io->u.bdev.offset_blocks = offset_blocks;
969 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
970 
971 	spdk_bdev_io_submit(bdev_io);
972 	return 0;
973 }
974 
975 int
976 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
977 		 struct iovec *iov, int iovcnt,
978 		 uint64_t offset, uint64_t len,
979 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
980 {
981 	uint64_t offset_blocks, num_blocks;
982 
983 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
984 		return -EINVAL;
985 	}
986 
987 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
988 }
989 
990 int
991 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
992 			struct iovec *iov, int iovcnt,
993 			uint64_t offset_blocks, uint64_t num_blocks,
994 			spdk_bdev_io_completion_cb cb, void *cb_arg)
995 {
996 	struct spdk_bdev *bdev = desc->bdev;
997 	struct spdk_bdev_io *bdev_io;
998 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
999 
1000 	if (!desc->write) {
1001 		return -EBADF;
1002 	}
1003 
1004 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1005 		return -EINVAL;
1006 	}
1007 
1008 	bdev_io = spdk_bdev_get_io();
1009 	if (!bdev_io) {
1010 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1011 		return -ENOMEM;
1012 	}
1013 
1014 	bdev_io->ch = channel;
1015 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1016 	bdev_io->u.bdev.iovs = iov;
1017 	bdev_io->u.bdev.iovcnt = iovcnt;
1018 	bdev_io->u.bdev.num_blocks = num_blocks;
1019 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1020 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1021 
1022 	spdk_bdev_io_submit(bdev_io);
1023 	return 0;
1024 }
1025 
1026 int
1027 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1028 		       uint64_t offset, uint64_t len,
1029 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1030 {
1031 	uint64_t offset_blocks, num_blocks;
1032 
1033 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1034 		return -EINVAL;
1035 	}
1036 
1037 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1038 }
1039 
1040 int
1041 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1042 			      uint64_t offset_blocks, uint64_t num_blocks,
1043 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1044 {
1045 	struct spdk_bdev *bdev = desc->bdev;
1046 	struct spdk_bdev_io *bdev_io;
1047 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1048 
1049 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1050 		return -EINVAL;
1051 	}
1052 
1053 	bdev_io = spdk_bdev_get_io();
1054 	if (!bdev_io) {
1055 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1056 		return -ENOMEM;
1057 	}
1058 
1059 	bdev_io->ch = channel;
1060 	bdev_io->u.bdev.num_blocks = num_blocks;
1061 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1062 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1063 
1064 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1065 
1066 	spdk_bdev_io_submit(bdev_io);
1067 	return 0;
1068 }
1069 
1070 int
1071 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1072 		uint64_t offset, uint64_t nbytes,
1073 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1074 {
1075 	uint64_t offset_blocks, num_blocks;
1076 
1077 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1078 		return -EINVAL;
1079 	}
1080 
1081 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1082 }
1083 
1084 int
1085 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1086 		       uint64_t offset_blocks, uint64_t num_blocks,
1087 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1088 {
1089 	struct spdk_bdev *bdev = desc->bdev;
1090 	struct spdk_bdev_io *bdev_io;
1091 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1092 
1093 	if (!desc->write) {
1094 		return -EBADF;
1095 	}
1096 
1097 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1098 		return -EINVAL;
1099 	}
1100 
1101 	if (num_blocks == 0) {
1102 		SPDK_ERRLOG("Can't unmap 0 bytes\n");
1103 		return -EINVAL;
1104 	}
1105 
1106 	bdev_io = spdk_bdev_get_io();
1107 	if (!bdev_io) {
1108 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1109 		return -ENOMEM;
1110 	}
1111 
1112 	bdev_io->ch = channel;
1113 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1114 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1115 	bdev_io->u.bdev.num_blocks = num_blocks;
1116 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1117 
1118 	spdk_bdev_io_submit(bdev_io);
1119 	return 0;
1120 }
1121 
1122 int
1123 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1124 		uint64_t offset, uint64_t length,
1125 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1126 {
1127 	uint64_t offset_blocks, num_blocks;
1128 
1129 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1130 		return -EINVAL;
1131 	}
1132 
1133 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1134 }
1135 
1136 int
1137 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1138 		       uint64_t offset_blocks, uint64_t num_blocks,
1139 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1140 {
1141 	struct spdk_bdev *bdev = desc->bdev;
1142 	struct spdk_bdev_io *bdev_io;
1143 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1144 
1145 	if (!desc->write) {
1146 		return -EBADF;
1147 	}
1148 
1149 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1150 		return -EINVAL;
1151 	}
1152 
1153 	bdev_io = spdk_bdev_get_io();
1154 	if (!bdev_io) {
1155 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1156 		return -ENOMEM;
1157 	}
1158 
1159 	bdev_io->ch = channel;
1160 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1161 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1162 	bdev_io->u.bdev.num_blocks = num_blocks;
1163 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1164 
1165 	spdk_bdev_io_submit(bdev_io);
1166 	return 0;
1167 }
1168 
1169 static void
1170 _spdk_bdev_reset_dev(void *io_device, void *ctx)
1171 {
1172 	struct spdk_bdev_channel *ch = ctx;
1173 	struct spdk_bdev_io *bdev_io;
1174 
1175 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1176 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1177 	spdk_bdev_io_submit_reset(bdev_io);
1178 }
1179 
1180 static void
1181 _spdk_bdev_reset_abort_channel(void *io_device, struct spdk_io_channel *ch,
1182 			       void *ctx)
1183 {
1184 	struct spdk_bdev_channel	*channel;
1185 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1186 
1187 	channel = spdk_io_channel_get_ctx(ch);
1188 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1189 
1190 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1191 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1192 }
1193 
1194 static void
1195 _spdk_bdev_start_reset(void *ctx)
1196 {
1197 	struct spdk_bdev_channel *ch = ctx;
1198 
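	/*
	 * Walk every channel for this bdev and abort its buffer-wait I/O; when
	 *  the iteration completes, _spdk_bdev_reset_dev() submits the queued
	 *  reset to the bdev module.
	 */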
1199 	spdk_for_each_channel(ch->bdev, _spdk_bdev_reset_abort_channel,
1200 			      ch, _spdk_bdev_reset_dev);
1201 }
1202 
1203 static void
1204 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1205 {
1206 	struct spdk_bdev *bdev = ch->bdev;
1207 
1208 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1209 
1210 	pthread_mutex_lock(&bdev->mutex);
1211 	if (bdev->reset_in_progress == NULL) {
1212 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1213 		/*
1214 		 * Take a channel reference for the target bdev for the life of this
1215 		 *  reset.  This guards against the channel getting destroyed while
1216 		 *  spdk_for_each_channel() calls related to this reset IO are in
1217 		 *  progress.  We will release the reference when this reset is
1218 		 *  completed.
1219 		 */
1220 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(bdev);
1221 		_spdk_bdev_start_reset(ch);
1222 	}
1223 	pthread_mutex_unlock(&bdev->mutex);
1224 }
1225 
1226 static void
1227 _spdk_bdev_complete_reset_channel(void *io_device, struct spdk_io_channel *_ch, void *ctx)
1228 {
1229 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1230 
1231 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
1232 		_spdk_bdev_channel_start_reset(ch);
1233 	}
1234 }
1235 
1236 int
1237 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1238 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1239 {
1240 	struct spdk_bdev *bdev = desc->bdev;
1241 	struct spdk_bdev_io *bdev_io;
1242 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1243 
1244 	bdev_io = spdk_bdev_get_io();
1245 	if (!bdev_io) {
1246 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1247 		return -ENOMEM;
1248 	}
1249 
1250 	bdev_io->ch = channel;
1251 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1252 	bdev_io->u.reset.ch_ref = NULL;
1253 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1254 
1255 	pthread_mutex_lock(&bdev->mutex);
1256 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1257 	pthread_mutex_unlock(&bdev->mutex);
1258 
1259 	_spdk_bdev_channel_start_reset(channel);
1260 
1261 	return 0;
1262 }
1263 
1264 void
1265 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1266 		      struct spdk_bdev_io_stat *stat)
1267 {
1268 #ifdef SPDK_CONFIG_VTUNE
1269 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1270 	memset(stat, 0, sizeof(*stat));
1271 	return;
1272 #endif
1273 
1274 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1275 
1276 	*stat = channel->stat;
1277 	memset(&channel->stat, 0, sizeof(channel->stat));
1278 }
1279 
1280 int
1281 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1282 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1283 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1284 {
1285 	struct spdk_bdev *bdev = desc->bdev;
1286 	struct spdk_bdev_io *bdev_io;
1287 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1288 
1289 	if (!desc->write) {
1290 		return -EBADF;
1291 	}
1292 
1293 	bdev_io = spdk_bdev_get_io();
1294 	if (!bdev_io) {
1295 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1296 		return -ENOMEM;
1297 	}
1298 
1299 	bdev_io->ch = channel;
1300 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1301 	bdev_io->u.nvme_passthru.cmd = *cmd;
1302 	bdev_io->u.nvme_passthru.buf = buf;
1303 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1304 
1305 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1306 
1307 	spdk_bdev_io_submit(bdev_io);
1308 	return 0;
1309 }
1310 
1311 int
1312 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1313 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1314 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1315 {
1316 	struct spdk_bdev *bdev = desc->bdev;
1317 	struct spdk_bdev_io *bdev_io;
1318 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1319 
1320 	if (!desc->write) {
1321 		/*
1322 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1323 		 *  to easily determine if the command is a read or write, but for now just
1324 		 *  do not allow io_passthru with a read-only descriptor.
1325 		 */
1326 		return -EBADF;
1327 	}
1328 
1329 	bdev_io = spdk_bdev_get_io();
1330 	if (!bdev_io) {
1331 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
1332 		return -ENOMEM;
1333 	}
1334 
1335 	bdev_io->ch = channel;
1336 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1337 	bdev_io->u.nvme_passthru.cmd = *cmd;
1338 	bdev_io->u.nvme_passthru.buf = buf;
1339 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1340 
1341 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1342 
1343 	spdk_bdev_io_submit(bdev_io);
1344 	return 0;
1345 }
1346 
1347 int
1348 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1349 {
1350 	if (!bdev_io) {
1351 		SPDK_ERRLOG("bdev_io is NULL\n");
1352 		return -1;
1353 	}
1354 
1355 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1356 		SPDK_ERRLOG("bdev_io is in pending state\n");
1357 		assert(false);
1358 		return -1;
1359 	}
1360 
1361 	spdk_bdev_put_io(bdev_io);
1362 
1363 	return 0;
1364 }
1365 
1366 static void
1367 _spdk_bdev_io_complete(void *ctx)
1368 {
1369 	struct spdk_bdev_io *bdev_io = ctx;
1370 
1371 	assert(bdev_io->cb != NULL);
1372 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1373 }
1374 
1375 void
1376 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1377 {
1378 	bdev_io->status = status;
1379 
1380 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
1381 		pthread_mutex_lock(&bdev_io->bdev->mutex);
1382 		if (bdev_io == bdev_io->bdev->reset_in_progress) {
1383 			bdev_io->bdev->reset_in_progress = NULL;
1384 		}
1385 		pthread_mutex_unlock(&bdev_io->bdev->mutex);
1386 		if (bdev_io->u.reset.ch_ref != NULL) {
1387 			spdk_put_io_channel(bdev_io->u.reset.ch_ref);
1388 		}
1389 		spdk_for_each_channel(bdev_io->bdev, _spdk_bdev_complete_reset_channel, NULL, NULL);
1390 	} else {
1391 		assert(bdev_io->ch->io_outstanding > 0);
1392 		bdev_io->ch->io_outstanding--;
1393 	}
1394 
1395 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1396 		switch (bdev_io->type) {
1397 		case SPDK_BDEV_IO_TYPE_READ:
1398 			bdev_io->ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
1399 			bdev_io->ch->stat.num_read_ops++;
1400 			break;
1401 		case SPDK_BDEV_IO_TYPE_WRITE:
1402 			bdev_io->ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
1403 			bdev_io->ch->stat.num_write_ops++;
1404 			break;
1405 		default:
1406 			break;
1407 		}
1408 	}
1409 
1410 #ifdef SPDK_CONFIG_VTUNE
1411 	uint64_t now_tsc = spdk_get_ticks();
1412 	if (now_tsc > (bdev_io->ch->start_tsc + bdev_io->ch->interval_tsc)) {
1413 		uint64_t data[5];
1414 
1415 		data[0] = bdev_io->ch->stat.num_read_ops;
1416 		data[1] = bdev_io->ch->stat.bytes_read;
1417 		data[2] = bdev_io->ch->stat.num_write_ops;
1418 		data[3] = bdev_io->ch->stat.bytes_written;
1419 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
1420 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->ch->channel) : 0;
1421 
1422 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->ch->handle,
1423 				   __itt_metadata_u64, 5, data);
1424 
1425 		memset(&bdev_io->ch->stat, 0, sizeof(bdev_io->ch->stat));
1426 		bdev_io->ch->start_tsc = now_tsc;
1427 	}
1428 #endif
1429 
1430 	if (bdev_io->in_submit_request || bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
1431 		/*
1432 		 * Defer completion to avoid potential infinite recursion if the
1433 		 * user's completion callback issues a new I/O.
1434 		 */
1435 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
1436 				     _spdk_bdev_io_complete, bdev_io);
1437 	} else {
1438 		_spdk_bdev_io_complete(bdev_io);
1439 	}
1440 }
1441 
1442 void
1443 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
1444 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
1445 {
1446 	if (sc == SPDK_SCSI_STATUS_GOOD) {
1447 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1448 	} else {
1449 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
1450 		bdev_io->error.scsi.sc = sc;
1451 		bdev_io->error.scsi.sk = sk;
1452 		bdev_io->error.scsi.asc = asc;
1453 		bdev_io->error.scsi.ascq = ascq;
1454 	}
1455 
1456 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1457 }
1458 
1459 void
1460 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
1461 			     int *sc, int *sk, int *asc, int *ascq)
1462 {
1463 	assert(sc != NULL);
1464 	assert(sk != NULL);
1465 	assert(asc != NULL);
1466 	assert(ascq != NULL);
1467 
1468 	switch (bdev_io->status) {
1469 	case SPDK_BDEV_IO_STATUS_SUCCESS:
1470 		*sc = SPDK_SCSI_STATUS_GOOD;
1471 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
1472 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1473 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1474 		break;
1475 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
1476 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
1477 		break;
1478 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
1479 		*sc = bdev_io->error.scsi.sc;
1480 		*sk = bdev_io->error.scsi.sk;
1481 		*asc = bdev_io->error.scsi.asc;
1482 		*ascq = bdev_io->error.scsi.ascq;
1483 		break;
1484 	default:
1485 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
1486 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
1487 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1488 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1489 		break;
1490 	}
1491 }
1492 
1493 void
1494 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
1495 {
1496 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
1497 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1498 	} else {
1499 		bdev_io->error.nvme.sct = sct;
1500 		bdev_io->error.nvme.sc = sc;
1501 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
1502 	}
1503 
1504 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1505 }
1506 
1507 void
1508 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
1509 {
1510 	assert(sct != NULL);
1511 	assert(sc != NULL);
1512 
1513 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
1514 		*sct = bdev_io->error.nvme.sct;
1515 		*sc = bdev_io->error.nvme.sc;
1516 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1517 		*sct = SPDK_NVME_SCT_GENERIC;
1518 		*sc = SPDK_NVME_SC_SUCCESS;
1519 	} else {
1520 		*sct = SPDK_NVME_SCT_GENERIC;
1521 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1522 	}
1523 }
1524 
1525 static void
1526 _spdk_bdev_register(struct spdk_bdev *bdev)
1527 {
1528 	struct spdk_bdev_module_if *module;
1529 
1530 	assert(bdev->module != NULL);
1531 
1532 	bdev->status = SPDK_BDEV_STATUS_READY;
1533 
1534 	TAILQ_INIT(&bdev->open_descs);
1535 	bdev->bdev_opened = false;
1536 
1537 	TAILQ_INIT(&bdev->vbdevs);
1538 	TAILQ_INIT(&bdev->base_bdevs);
1539 
1540 	bdev->reset_in_progress = NULL;
1541 
1542 	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
1543 				sizeof(struct spdk_bdev_channel));
1544 
1545 	pthread_mutex_init(&bdev->mutex, NULL);
1546 	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Inserting bdev %s into list\n", bdev->name);
1547 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
1548 
1549 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
1550 		if (module->examine) {
1551 			module->action_in_progress++;
1552 			module->examine(bdev);
1553 		}
1554 	}
1555 }
1556 
1557 void
1558 spdk_bdev_register(struct spdk_bdev *bdev)
1559 {
1560 	_spdk_bdev_register(bdev);
1561 }
1562 
1563 void
1564 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
1565 {
1566 	int i;
1567 
1568 	_spdk_bdev_register(vbdev);
1569 	for (i = 0; i < base_bdev_count; i++) {
1570 		assert(base_bdevs[i] != NULL);
1571 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
1572 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
1573 	}
1574 }
1575 
1576 void
1577 spdk_bdev_unregister(struct spdk_bdev *bdev)
1578 {
1579 	struct spdk_bdev_desc	*desc, *tmp;
1580 	int			rc;
1581 	bool			do_destruct = true;
1582 
1583 	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Removing bdev %s from list\n", bdev->name);
1584 
1585 	pthread_mutex_lock(&bdev->mutex);
1586 
1587 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
1588 
1589 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
1590 		if (desc->remove_cb) {
1591 			pthread_mutex_unlock(&bdev->mutex);
1592 			do_destruct = false;
1593 			desc->remove_cb(desc->remove_ctx);
1594 			pthread_mutex_lock(&bdev->mutex);
1595 		}
1596 	}
1597 
1598 	if (!do_destruct) {
1599 		pthread_mutex_unlock(&bdev->mutex);
1600 		return;
1601 	}
1602 
1603 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
1604 	pthread_mutex_unlock(&bdev->mutex);
1605 
1606 	pthread_mutex_destroy(&bdev->mutex);
1607 
1608 	spdk_io_device_unregister(bdev, NULL);
1609 
1610 	rc = bdev->fn_table->destruct(bdev->ctxt);
1611 	if (rc < 0) {
1612 		SPDK_ERRLOG("destruct failed\n");
1613 	}
1614 }
1615 
1616 void
1617 spdk_vbdev_unregister(struct spdk_bdev *vbdev)
1618 {
1619 	struct spdk_bdev *base_bdev;
1620 
1621 	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
1622 	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
1623 		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
1624 	}
1625 	spdk_bdev_unregister(vbdev);
1626 }
1627 
1628 bool
1629 spdk_is_bdev_opened(struct spdk_bdev *bdev)
1630 {
1631 	struct spdk_bdev *base;
1632 
1633 	if (bdev->bdev_opened) {
1634 		return true;
1635 	}
1636 
1637 	TAILQ_FOREACH(base, &bdev->base_bdevs, base_bdev_link) {
1638 		if (spdk_is_bdev_opened(base)) {
1639 			return true;
1640 		}
1641 	}
1642 
1643 	return false;
1644 }
1645 
1646 int
1647 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
1648 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
1649 {
1650 	struct spdk_bdev_desc *desc;
1651 
1652 	desc = calloc(1, sizeof(*desc));
1653 	if (desc == NULL) {
1654 		return -ENOMEM;
1655 	}
1656 
1657 	pthread_mutex_lock(&bdev->mutex);
1658 
1659 	if (write && bdev->claim_module) {
1660 		SPDK_ERRLOG("failed, %s already claimed\n", bdev->name);
1661 		free(desc);
1662 		pthread_mutex_unlock(&bdev->mutex);
1663 		return -EPERM;
1664 	}
1665 
1666 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
1667 
1668 	bdev->bdev_opened = true;
1669 
1670 	desc->bdev = bdev;
1671 	desc->remove_cb = remove_cb;
1672 	desc->remove_ctx = remove_ctx;
1673 	desc->write = write;
1674 	*_desc = desc;
1675 
1676 	pthread_mutex_unlock(&bdev->mutex);
1677 
1678 	return 0;
1679 }
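
/*
 * Example (sketch): the typical descriptor-based consumer flow.  Error
 *  handling is abbreviated, and my_hotremove_cb, my_write_done, buf, and
 *  my_ctx are hypothetical.
 *
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *io_ch;
 *
 *	if (spdk_bdev_open(bdev, true, my_hotremove_cb, my_ctx, &desc) == 0) {
 *		io_ch = spdk_bdev_get_io_channel(desc);
 *		spdk_bdev_write(desc, io_ch, buf, 0, 4096, my_write_done, my_ctx);
 *		// ... once my_write_done has fired and freed the I/O ...
 *		spdk_put_io_channel(io_ch);
 *		spdk_bdev_close(desc);
 *	}
 */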
1680 
1681 void
1682 spdk_bdev_close(struct spdk_bdev_desc *desc)
1683 {
1684 	struct spdk_bdev *bdev = desc->bdev;
1685 	bool do_unregister = false;
1686 
1687 	pthread_mutex_lock(&bdev->mutex);
1688 
1689 	bdev->bdev_opened = false;
1690 
1691 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
1692 	free(desc);
1693 
1694 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
1695 		do_unregister = true;
1696 	}
1697 	pthread_mutex_unlock(&bdev->mutex);
1698 
1699 	if (do_unregister == true) {
1700 		spdk_bdev_unregister(bdev);
1701 	}
1702 }
1703 
1704 int
1705 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
1706 			    struct spdk_bdev_module_if *module)
1707 {
1708 	if (bdev->claim_module != NULL) {
1709 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
1710 			    bdev->claim_module->name);
1711 		return -EPERM;
1712 	}
1713 
1714 	if (desc && !desc->write) {
1715 		desc->write = true;
1716 	}
1717 
1718 	bdev->claim_module = module;
1719 	return 0;
1720 }
1721 
1722 void
1723 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
1724 {
1725 	assert(bdev->claim_module != NULL);
1726 	bdev->claim_module = NULL;
1727 }
1728 
1729 struct spdk_bdev *
1730 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
1731 {
1732 	return desc->bdev;
1733 }
1734 
1735 void
1736 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
1737 {
1738 	struct iovec *iovs;
1739 	int iovcnt;
1740 
1741 	if (bdev_io == NULL) {
1742 		return;
1743 	}
1744 
1745 	switch (bdev_io->type) {
1746 	case SPDK_BDEV_IO_TYPE_READ:
1747 		iovs = bdev_io->u.bdev.iovs;
1748 		iovcnt = bdev_io->u.bdev.iovcnt;
1749 		break;
1750 	case SPDK_BDEV_IO_TYPE_WRITE:
1751 		iovs = bdev_io->u.bdev.iovs;
1752 		iovcnt = bdev_io->u.bdev.iovcnt;
1753 		break;
1754 	default:
1755 		iovs = NULL;
1756 		iovcnt = 0;
1757 		break;
1758 	}
1759 
1760 	if (iovp) {
1761 		*iovp = iovs;
1762 	}
1763 	if (iovcntp) {
1764 		*iovcntp = iovcnt;
1765 	}
1766 }
1767 
1768 void
1769 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
1770 {
1771 	/*
1772 	 * Modules with examine callbacks must be initialized first, so they are
1773 	 *  ready to handle examine callbacks from later modules that will
1774 	 *  register physical bdevs.
1775 	 */
1776 	if (bdev_module->examine != NULL) {
1777 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1778 	} else {
1779 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1780 	}
1781 }
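
/*
 * Note: bdev modules normally do not call this directly; they register at
 *  load time via the SPDK_BDEV_MODULE_REGISTER() constructor macro (see
 *  spdk_internal/bdev.h for its exact signature in this tree), which invokes
 *  spdk_bdev_module_list_add() on the module's struct spdk_bdev_module_if.
 */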
1782 
1783 void
1784 spdk_bdev_part_base_free(struct spdk_bdev_part_base *base)
1785 {
1786 	assert(base->bdev);
1787 	assert(base->desc);
1788 	spdk_bdev_close(base->desc);
1789 	free(base);
1790 }
1791 
1792 void
1793 spdk_bdev_part_free(struct spdk_bdev_part *part)
1794 {
1795 	struct spdk_bdev_part_base *base;
1796 
1797 	assert(part);
1798 	assert(part->base);
1799 
1800 	base = part->base;
1801 	spdk_io_device_unregister(&part->base, NULL);
1802 	TAILQ_REMOVE(base->tailq, part, tailq);
1803 	free(part->bdev.name);
1804 	free(part);
1805 
1806 	if (__sync_sub_and_fetch(&base->ref, 1) == 0) {
1807 		spdk_bdev_module_release_bdev(base->bdev);
1808 		spdk_bdev_part_base_free(base);
1809 	}
1810 }
1811 
1812 void
1813 spdk_bdev_part_tailq_fini(struct bdev_part_tailq *tailq)
1814 {
1815 	struct spdk_bdev_part *part, *tmp;
1816 
1817 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
1818 		spdk_bdev_part_free(part);
1819 	}
1820 }
1821 
1822 void
1823 spdk_bdev_part_base_hotremove(struct spdk_bdev *base_bdev, struct bdev_part_tailq *tailq)
1824 {
1825 	struct spdk_bdev_part *part, *tmp;
1826 
1827 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
1828 		if (part->base->bdev == base_bdev) {
1829 			spdk_bdev_unregister(&part->bdev);
1830 		}
1831 	}
1832 }
1833 
1834 static bool
1835 spdk_bdev_part_io_type_supported(void *_part, enum spdk_bdev_io_type io_type)
1836 {
1837 	struct spdk_bdev_part *part = _part;
1838 
1839 	return part->base->bdev->fn_table->io_type_supported(part->base->bdev, io_type);
1840 }
1841 
1842 static struct spdk_io_channel *
1843 spdk_bdev_part_get_io_channel(void *_part)
1844 {
1845 	struct spdk_bdev_part *part = _part;
1846 
1847 	return spdk_get_io_channel(&part->base);
1848 }
1849 
1850 static void
1851 spdk_bdev_part_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1852 {
1853 	struct spdk_bdev_io *part_io = cb_arg;
1854 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1855 
1856 	spdk_bdev_io_complete(part_io, status);
1857 	spdk_bdev_free_io(bdev_io);
1858 }
1859 
1860 void
1861 spdk_bdev_part_submit_request(struct spdk_bdev_part_channel *ch, struct spdk_bdev_io *bdev_io)
1862 {
1863 	struct spdk_bdev_part *part = ch->part;
1864 	struct spdk_io_channel *base_ch = ch->base_ch;
1865 	struct spdk_bdev_desc *base_desc = part->base->desc;
1866 	uint64_t offset;
1867 	int rc = 0;
1868 
1869 	/* Modify the I/O to adjust for the offset within the base bdev. */
1870 	switch (bdev_io->type) {
1871 	case SPDK_BDEV_IO_TYPE_READ:
1872 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
1873 		rc = spdk_bdev_readv_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
1874 					    bdev_io->u.bdev.iovcnt, offset,
1875 					    bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
1876 					    bdev_io);
1877 		break;
1878 	case SPDK_BDEV_IO_TYPE_WRITE:
1879 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
1880 		rc = spdk_bdev_writev_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
1881 					     bdev_io->u.bdev.iovcnt, offset,
1882 					     bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
1883 					     bdev_io);
1884 		break;
1885 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1886 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
1887 		rc = spdk_bdev_write_zeroes_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
1888 						   spdk_bdev_part_complete_io, bdev_io);
1889 		break;
1890 	case SPDK_BDEV_IO_TYPE_UNMAP:
1891 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
1892 		rc = spdk_bdev_unmap_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
1893 					    spdk_bdev_part_complete_io, bdev_io);
1894 		break;
1895 	case SPDK_BDEV_IO_TYPE_FLUSH:
1896 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
1897 		rc = spdk_bdev_flush_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
1898 					    spdk_bdev_part_complete_io, bdev_io);
1899 		break;
1900 	case SPDK_BDEV_IO_TYPE_RESET:
1901 		rc = spdk_bdev_reset(base_desc, base_ch,
1902 				     spdk_bdev_part_complete_io, bdev_io);
1903 		break;
1904 	default:
1905 		SPDK_ERRLOG("split: unknown I/O type %d\n", bdev_io->type);
1906 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1907 		return;
1908 	}
1909 
1910 	if (rc != 0) {
1911 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1912 	}
1913 }

1914 static int
1915 spdk_bdev_part_channel_create_cb(void *io_device, void *ctx_buf)
1916 {
1917 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
1918 	struct spdk_bdev_part_channel *ch = ctx_buf;
1919 
1920 	ch->part = part;
1921 	ch->base_ch = spdk_bdev_get_io_channel(part->base->desc);
1922 	if (ch->base_ch == NULL) {
1923 		return -1;
1924 	}
1925 
1926 	if (part->base->ch_create_cb) {
1927 		return part->base->ch_create_cb(io_device, ctx_buf);
1928 	} else {
1929 		return 0;
1930 	}
1931 }
1932 
1933 static void
1934 spdk_bdev_part_channel_destroy_cb(void *io_device, void *ctx_buf)
1935 {
1936 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
1937 	struct spdk_bdev_part_channel *ch = ctx_buf;
1938 
1939 	if (part->base->ch_destroy_cb) {
1940 		part->base->ch_destroy_cb(io_device, ctx_buf);
1941 	}
1942 	spdk_put_io_channel(ch->base_ch);
1943 }
1944 
1945 int
1946 spdk_bdev_part_base_construct(struct spdk_bdev_part_base *base, struct spdk_bdev *bdev,
1947 			      spdk_bdev_remove_cb_t remove_cb, struct spdk_bdev_module_if *module,
1948 			      struct spdk_bdev_fn_table *fn_table, struct bdev_part_tailq *tailq,
1949 			      uint32_t channel_size, spdk_io_channel_create_cb ch_create_cb,
1950 			      spdk_io_channel_destroy_cb ch_destroy_cb)
1951 {
1952 	int rc;
1953 
1954 	fn_table->get_io_channel = spdk_bdev_part_get_io_channel;
1955 	fn_table->io_type_supported = spdk_bdev_part_io_type_supported;
1956 
1957 	base->bdev = bdev;
1958 	base->ref = 0;
1959 	base->module = module;
1960 	base->fn_table = fn_table;
1961 	base->tailq = tailq;
1962 	base->claimed = false;
1963 	base->channel_size = channel_size;
1964 	base->ch_create_cb = ch_create_cb;
1965 	base->ch_destroy_cb = ch_destroy_cb;
1966 
1967 	rc = spdk_bdev_open(bdev, false, remove_cb, bdev, &base->desc);
1968 	if (rc) {
1969 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
1970 		return -1;
1971 	}
1972 
1973 	return 0;
1974 }
1975 
1976 int
1977 spdk_bdev_part_construct(struct spdk_bdev_part *part, struct spdk_bdev_part_base *base,
1978 			 char *name, uint64_t offset_blocks, uint64_t num_blocks,
1979 			 char *product_name)
1980 {
1981 	part->bdev.name = name;
1982 	part->bdev.blocklen = base->bdev->blocklen;
1983 	part->bdev.blockcnt = num_blocks;
1984 	part->offset_blocks = offset_blocks;
1985 
1986 	part->bdev.write_cache = base->bdev->write_cache;
1987 	part->bdev.need_aligned_buffer = base->bdev->need_aligned_buffer;
1988 	part->bdev.product_name = product_name;
1989 	part->bdev.ctxt = part;
1990 	part->bdev.module = base->module;
1991 	part->bdev.fn_table = base->fn_table;
1992 
1993 	__sync_fetch_and_add(&base->ref, 1);
1994 	part->base = base;
1995 
1996 	if (!base->claimed) {
1997 		int rc;
1998 
1999 		rc = spdk_bdev_module_claim_bdev(base->bdev, base->desc, base->module);
2000 		if (rc) {
2001 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(base->bdev));
2002 			free(part->bdev.name);
2003 			return -1;
2004 		}
2005 		base->claimed = true;
2006 	}
2007 
2008 	spdk_io_device_register(&part->base, spdk_bdev_part_channel_create_cb,
2009 				spdk_bdev_part_channel_destroy_cb,
2010 				base->channel_size);
2011 	spdk_vbdev_register(&part->bdev, &base->bdev, 1);
2012 	TAILQ_INSERT_TAIL(base->tailq, part, tailq);
2013 
2014 	return 0;
2015 }
2016 
2017 SPDK_LOG_REGISTER_TRACE_FLAG("bdev", SPDK_TRACE_BDEV)
2018