xref: /spdk/lib/bdev/bdev.c (revision ab29d2ce5de614bd44cfbcbc1a794a0adcdda93c)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/likely.h"
42 #include "spdk/queue.h"
43 #include "spdk/nvme_spec.h"
44 #include "spdk/scsi_spec.h"
45 #include "spdk/util.h"
46 
47 #include "spdk_internal/bdev.h"
48 #include "spdk_internal/log.h"
49 #include "spdk/string.h"
50 
51 #ifdef SPDK_CONFIG_VTUNE
52 #include "ittnotify.h"
53 #include "ittnotify_types.h"
54 int __itt_init_ittlib(const char *, __itt_group_id);
55 #endif
56 
57 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
58 #define BUF_SMALL_POOL_SIZE	8192
59 #define BUF_LARGE_POOL_SIZE	1024
60 
61 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
62 
63 struct spdk_bdev_mgr {
64 	struct spdk_mempool *bdev_io_pool;
65 
66 	struct spdk_mempool *buf_small_pool;
67 	struct spdk_mempool *buf_large_pool;
68 
69 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
70 
71 	TAILQ_HEAD(, spdk_bdev) bdevs;
72 
73 	spdk_bdev_poller_start_cb start_poller_fn;
74 	spdk_bdev_poller_stop_cb stop_poller_fn;
75 
76 	bool init_complete;
77 	bool module_init_complete;
78 
79 #ifdef SPDK_CONFIG_VTUNE
80 	__itt_domain	*domain;
81 #endif
82 };
83 
84 static struct spdk_bdev_mgr g_bdev_mgr = {
85 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
86 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
87 	.start_poller_fn = NULL,
88 	.stop_poller_fn = NULL,
89 	.init_complete = false,
90 	.module_init_complete = false,
91 };
92 
93 static spdk_bdev_init_cb	g_cb_fn = NULL;
94 static void			*g_cb_arg = NULL;
95 
96 
97 struct spdk_bdev_mgmt_channel {
98 	bdev_io_tailq_t need_buf_small;
99 	bdev_io_tailq_t need_buf_large;
100 };
101 
102 struct spdk_bdev_desc {
103 	struct spdk_bdev		*bdev;
104 	spdk_bdev_remove_cb_t		remove_cb;
105 	void				*remove_ctx;
106 	bool				write;
107 	TAILQ_ENTRY(spdk_bdev_desc)	link;
108 };
109 
110 struct spdk_bdev_channel {
111 	struct spdk_bdev	*bdev;
112 
113 	/* The channel for the underlying device */
114 	struct spdk_io_channel	*channel;
115 
116 	/* Channel for the bdev manager */
117 	struct spdk_io_channel *mgmt_channel;
118 
119 	struct spdk_bdev_io_stat stat;
120 
121 	/*
122 	 * Count of I/O submitted to bdev module and waiting for completion.
123 	 * Incremented before submit_request() is called on an spdk_bdev_io.
124 	 */
125 	uint64_t		io_outstanding;
126 
127 	bdev_io_tailq_t		queued_resets;
128 
129 #ifdef SPDK_CONFIG_VTUNE
130 	uint64_t		start_tsc;
131 	uint64_t		interval_tsc;
132 	__itt_string_handle	*handle;
133 #endif
134 
135 };
136 
137 struct spdk_bdev *
138 spdk_bdev_first(void)
139 {
140 	struct spdk_bdev *bdev;
141 
142 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
143 	if (bdev) {
144 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
145 	}
146 
147 	return bdev;
148 }
149 
150 struct spdk_bdev *
151 spdk_bdev_next(struct spdk_bdev *prev)
152 {
153 	struct spdk_bdev *bdev;
154 
155 	bdev = TAILQ_NEXT(prev, link);
156 	if (bdev) {
157 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
158 	}
159 
160 	return bdev;
161 }
162 
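/*
 * Advance to the first bdev at or after 'bdev' in the global list that has no
 * virtual bdevs built on top of it (i.e. a leaf bdev).
 */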
163 static struct spdk_bdev *
164 _bdev_next_leaf(struct spdk_bdev *bdev)
165 {
166 	while (bdev != NULL) {
167 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
168 			return bdev;
169 		} else {
170 			bdev = TAILQ_NEXT(bdev, link);
171 		}
172 	}
173 
174 	return bdev;
175 }
176 
177 struct spdk_bdev *
178 spdk_bdev_first_leaf(void)
179 {
180 	struct spdk_bdev *bdev;
181 
182 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
183 
184 	if (bdev) {
185 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
186 	}
187 
188 	return bdev;
189 }
190 
191 struct spdk_bdev *
192 spdk_bdev_next_leaf(struct spdk_bdev *prev)
193 {
194 	struct spdk_bdev *bdev;
195 
196 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
197 
198 	if (bdev) {
199 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
200 	}
201 
202 	return bdev;
203 }
204 
205 struct spdk_bdev *
206 spdk_bdev_get_by_name(const char *bdev_name)
207 {
208 	struct spdk_bdev *bdev = spdk_bdev_first();
209 
210 	while (bdev != NULL) {
211 		if (strcmp(bdev_name, bdev->name) == 0) {
212 			return bdev;
213 		}
214 		bdev = spdk_bdev_next(bdev);
215 	}
216 
217 	return NULL;
218 }
219 
220 static void
221 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
222 {
223 	assert(bdev_io->get_buf_cb != NULL);
224 	assert(buf != NULL);
225 	assert(bdev_io->u.bdev.iovs != NULL);
226 
227 	bdev_io->buf = buf;
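	/*
	 * Point iov_base at the first 512-byte-aligned address past buf.  The
	 * buffer pools allocate an extra 512 bytes per element to leave room
	 * for this adjustment.
	 */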
228 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
229 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
230 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
231 }
232 
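/*
 * Return a bdev_io's data buffer.  If another I/O queued on the same
 * management channel is waiting for a buffer of this size class, hand the
 * buffer directly to that I/O instead of returning it to the pool.
 */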
233 static void
234 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
235 {
236 	struct spdk_mempool *pool;
237 	struct spdk_bdev_io *tmp;
238 	void *buf;
239 	bdev_io_tailq_t *tailq;
240 	uint64_t length;
241 	struct spdk_bdev_mgmt_channel *ch;
242 
243 	assert(bdev_io->u.bdev.iovcnt == 1);
244 
245 	length = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
246 	buf = bdev_io->buf;
247 
248 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
249 
250 	if (length <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
251 		pool = g_bdev_mgr.buf_small_pool;
252 		tailq = &ch->need_buf_small;
253 	} else {
254 		pool = g_bdev_mgr.buf_large_pool;
255 		tailq = &ch->need_buf_large;
256 	}
257 
258 	if (TAILQ_EMPTY(tailq)) {
259 		spdk_mempool_put(pool, buf);
260 	} else {
261 		tmp = TAILQ_FIRST(tailq);
262 		TAILQ_REMOVE(tailq, tmp, buf_link);
263 		spdk_bdev_io_set_buf(tmp, buf);
264 	}
265 }
266 
267 void
268 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb)
269 {
270 	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
271 	struct spdk_mempool *pool;
272 	bdev_io_tailq_t *tailq;
273 	void *buf = NULL;
274 	struct spdk_bdev_mgmt_channel *ch;
275 
276 	assert(cb != NULL);
277 	assert(bdev_io->u.bdev.iovs != NULL);
278 
279 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
280 		/* Buffer already present */
281 		cb(bdev_io->ch->channel, bdev_io);
282 		return;
283 	}
284 
285 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
286 
287 	bdev_io->get_buf_cb = cb;
288 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
289 		pool = g_bdev_mgr.buf_small_pool;
290 		tailq = &ch->need_buf_small;
291 	} else {
292 		pool = g_bdev_mgr.buf_large_pool;
293 		tailq = &ch->need_buf_large;
294 	}
295 
296 	buf = spdk_mempool_get(pool);
297 
298 	if (!buf) {
299 		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
300 	} else {
301 		spdk_bdev_io_set_buf(bdev_io, buf);
302 	}
303 }
304 
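/*
 * Example usage (editor's sketch, not part of the original source): a bdev
 * module that needs a data buffer for a READ typically defers the real work
 * until the buffer has been allocated.  my_submit_request and my_do_read are
 * hypothetical module functions.
 *
 *	static void
 *	my_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		// bdev_io->u.bdev.iovs[0] now describes a buffer large enough
 *		// for num_blocks * blocklen bytes
 *		my_do_read(ch, bdev_io);
 *	}
 *
 *	static void
 *	my_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
 *			spdk_bdev_io_get_buf(bdev_io, my_read_get_buf_cb);
 *			return;
 *		}
 *		...
 *	}
 */
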
305 static int
306 spdk_bdev_module_get_max_ctx_size(void)
307 {
308 	struct spdk_bdev_module_if *bdev_module;
309 	int max_bdev_module_size = 0;
310 
311 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
312 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
313 			max_bdev_module_size = bdev_module->get_ctx_size();
314 		}
315 	}
316 
317 	return max_bdev_module_size;
318 }
319 
320 void
321 spdk_bdev_config_text(FILE *fp)
322 {
323 	struct spdk_bdev_module_if *bdev_module;
324 
325 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
326 		if (bdev_module->config_text) {
327 			bdev_module->config_text(fp);
328 		}
329 	}
330 }
331 
332 static int
333 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
334 {
335 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
336 
337 	TAILQ_INIT(&ch->need_buf_small);
338 	TAILQ_INIT(&ch->need_buf_large);
339 
340 	return 0;
341 }
342 
343 static void
344 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
345 {
346 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
347 
348 	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
349 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
350 	}
351 }
352 
353 static void
354 spdk_bdev_init_complete(int rc)
355 {
356 	spdk_bdev_init_cb cb_fn = g_cb_fn;
357 	void *cb_arg = g_cb_arg;
358 
359 	g_bdev_mgr.init_complete = true;
360 	g_cb_fn = NULL;
361 	g_cb_arg = NULL;
362 
363 	cb_fn(cb_arg, rc);
364 }
365 
366 static void
367 spdk_bdev_module_action_complete(void)
368 {
369 	struct spdk_bdev_module_if *m;
370 
371 	/*
372 	 * Don't finish bdev subsystem initialization if
373 	 * module pre-initialization is still in progress, or
374 	 * the subsystem has already been initialized.
375 	 */
376 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
377 		return;
378 	}
379 
380 	/*
381 	 * Check all bdev modules for inits/examinations in progress. If any
382 	 * exist, return immediately since we cannot finish bdev subsystem
383 	 * initialization until all are completed.
384 	 */
385 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
386 		if (m->action_in_progress > 0) {
387 			return;
388 		}
389 	}
390 
391 	/*
392 	 * Modules already finished initialization - now that all
393 	 * the bdev modules have finished their asynchronous I/O
394 	 * processing, the entire bdev layer can be marked as complete.
395 	 */
396 	spdk_bdev_init_complete(0);
397 }
398 
399 static void
400 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
401 {
402 	assert(module->action_in_progress > 0);
403 	module->action_in_progress--;
404 	spdk_bdev_module_action_complete();
405 }
406 
407 void
408 spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
409 {
410 	spdk_bdev_module_action_done(module);
411 }
412 
413 void
414 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
415 {
416 	spdk_bdev_module_action_done(module);
417 }
418 
419 static int
420 spdk_bdev_modules_init(void)
421 {
422 	struct spdk_bdev_module_if *module;
423 	int rc = 0;
424 
425 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
426 		rc = module->module_init();
427 		if (rc != 0) {
428 			break;
429 		}
430 	}
431 
432 	g_bdev_mgr.module_init_complete = true;
433 	return rc;
434 }
435 
436 void
437 spdk_bdev_poller_start(struct spdk_bdev_poller **ppoller,
438 		       spdk_bdev_poller_fn fn,
439 		       void *arg,
440 		       uint32_t lcore,
441 		       uint64_t period_microseconds)
442 {
443 	g_bdev_mgr.start_poller_fn(ppoller, fn, arg, lcore, period_microseconds);
444 }
445 
446 void
447 spdk_bdev_poller_stop(struct spdk_bdev_poller **ppoller)
448 {
449 	g_bdev_mgr.stop_poller_fn(ppoller);
450 }
451 
452 void
453 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg,
454 		     spdk_bdev_poller_start_cb start_poller_fn,
455 		     spdk_bdev_poller_stop_cb stop_poller_fn)
456 {
457 	int cache_size;
458 	int rc = 0;
459 	char mempool_name[32];
460 
461 	assert(cb_fn != NULL);
462 
463 	g_cb_fn = cb_fn;
464 	g_cb_arg = cb_arg;
465 
466 	g_bdev_mgr.start_poller_fn = start_poller_fn;
467 	g_bdev_mgr.stop_poller_fn = stop_poller_fn;
468 
469 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
470 
471 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
472 				  SPDK_BDEV_IO_POOL_SIZE,
473 				  sizeof(struct spdk_bdev_io) +
474 				  spdk_bdev_module_get_max_ctx_size(),
475 				  64,
476 				  SPDK_ENV_SOCKET_ID_ANY);
477 
478 	if (g_bdev_mgr.bdev_io_pool == NULL) {
479 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
480 		spdk_bdev_init_complete(-1);
481 		return;
482 	}
483 
484 	/**
485 	 * Ensure no more than half of the total buffers end up in local caches, by
486 	 *   using spdk_env_get_core_count() to determine how many local caches we need
487 	 *   to account for.
488 	 */
489 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
490 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
491 
492 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
493 				    BUF_SMALL_POOL_SIZE,
494 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
495 				    cache_size,
496 				    SPDK_ENV_SOCKET_ID_ANY);
497 	if (!g_bdev_mgr.buf_small_pool) {
498 		SPDK_ERRLOG("create rbuf small pool failed\n");
499 		spdk_bdev_init_complete(-1);
500 		return;
501 	}
502 
503 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
504 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
505 
506 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
507 				    BUF_LARGE_POOL_SIZE,
508 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
509 				    cache_size,
510 				    SPDK_ENV_SOCKET_ID_ANY);
511 	if (!g_bdev_mgr.buf_large_pool) {
512 		SPDK_ERRLOG("create rbuf large pool failed\n");
513 		spdk_bdev_init_complete(-1);
514 		return;
515 	}
516 
517 #ifdef SPDK_CONFIG_VTUNE
518 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
519 #endif
520 
521 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
522 				spdk_bdev_mgmt_channel_destroy,
523 				sizeof(struct spdk_bdev_mgmt_channel));
524 
525 	rc = spdk_bdev_modules_init();
526 	if (rc != 0) {
527 		SPDK_ERRLOG("bdev modules init failed\n");
528 		spdk_bdev_init_complete(-1);
529 		return;
530 	}
531 
532 	spdk_bdev_module_action_complete();
533 }
534 
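/*
 * Example usage (editor's sketch, not part of the original source): the
 * application framework supplies its poller hooks and waits for the init
 * callback before accepting I/O.  my_init_done, my_start_poller and
 * my_stop_poller are hypothetical framework functions.
 *
 *	static void
 *	my_init_done(void *cb_arg, int rc)
 *	{
 *		if (rc != 0) {
 *			// a mempool or module failed to initialize - shut down
 *		}
 *	}
 *
 *	spdk_bdev_initialize(my_init_done, NULL, my_start_poller, my_stop_poller);
 */
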
535 int
536 spdk_bdev_finish(void)
537 {
538 	struct spdk_bdev_module_if *bdev_module;
539 
540 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
541 		if (bdev_module->module_fini) {
542 			bdev_module->module_fini();
543 		}
544 	}
545 
546 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
547 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
548 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
549 			    SPDK_BDEV_IO_POOL_SIZE);
550 	}
551 
552 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
553 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
554 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
555 			    BUF_SMALL_POOL_SIZE);
556 		assert(false);
557 	}
558 
559 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
560 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
561 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
562 			    BUF_LARGE_POOL_SIZE);
563 		assert(false);
564 	}
565 
566 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
567 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
568 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
569 
570 	spdk_io_device_unregister(&g_bdev_mgr, NULL);
571 
572 	return 0;
573 }
574 
575 struct spdk_bdev_io *
576 spdk_bdev_get_io(void)
577 {
578 	struct spdk_bdev_io *bdev_io;
579 
580 	bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
581 	if (!bdev_io) {
582 		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
583 		abort();
584 	}
585 
586 	memset(bdev_io, 0, offsetof(struct spdk_bdev_io, u));
587 
588 	return bdev_io;
589 }
590 
591 static void
592 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
593 {
594 	if (!bdev_io) {
595 		return;
596 	}
597 
598 	if (bdev_io->buf != NULL) {
599 		spdk_bdev_io_put_buf(bdev_io);
600 	}
601 
602 	spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
603 }
604 
605 static void
606 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
607 {
608 	struct spdk_bdev *bdev = bdev_io->bdev;
609 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
610 	struct spdk_io_channel *ch = bdev_ch->channel;
611 
612 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
613 
614 	bdev_ch->io_outstanding++;
615 	bdev_io->in_submit_request = true;
616 	bdev->fn_table->submit_request(ch, bdev_io);
617 	bdev_io->in_submit_request = false;
618 }
619 
620 static void
621 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
622 {
623 	struct spdk_bdev *bdev = bdev_io->bdev;
624 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
625 	struct spdk_io_channel *ch = bdev_ch->channel;
626 
627 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
628 
629 	bdev_io->in_submit_request = true;
630 	bdev->fn_table->submit_request(ch, bdev_io);
631 	bdev_io->in_submit_request = false;
632 }
633 
634 static void
635 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
636 		  struct spdk_bdev *bdev, void *cb_arg,
637 		  spdk_bdev_io_completion_cb cb)
638 {
639 	bdev_io->bdev = bdev;
640 	bdev_io->caller_ctx = cb_arg;
641 	bdev_io->cb = cb;
642 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
643 	bdev_io->in_submit_request = false;
644 }
645 
646 bool
647 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
648 {
649 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
650 }
651 
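/*
 * Example usage (editor's sketch, not part of the original source): probe for
 * optional functionality before issuing the corresponding I/O type.
 *
 *	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
 *		// spdk_bdev_unmap()/spdk_bdev_unmap_blocks() may be used
 *	}
 */
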
652 int
653 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
654 {
655 	if (bdev->fn_table->dump_config_json) {
656 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
657 	}
658 
659 	return 0;
660 }
661 
662 static int
663 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
664 {
665 	struct spdk_bdev		*bdev = io_device;
666 	struct spdk_bdev_channel	*ch = ctx_buf;
667 
668 	ch->bdev = io_device;
669 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
670 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
671 	memset(&ch->stat, 0, sizeof(ch->stat));
672 	ch->io_outstanding = 0;
673 	TAILQ_INIT(&ch->queued_resets);
674 
675 #ifdef SPDK_CONFIG_VTUNE
676 	{
677 		char *name;
678 		__itt_init_ittlib(NULL, 0);
679 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
680 		if (!name) {
681 			return -1;
682 		}
683 		ch->handle = __itt_string_handle_create(name);
684 		free(name);
685 		ch->start_tsc = spdk_get_ticks();
686 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
687 	}
688 #endif
689 
690 	return 0;
691 }
692 
693 static void
694 _spdk_bdev_abort_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
695 {
696 	struct spdk_bdev_io *bdev_io, *tmp;
697 
698 	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
699 		if (bdev_io->ch == ch) {
700 			TAILQ_REMOVE(queue, bdev_io, buf_link);
701 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
702 		}
703 	}
704 }
705 
706 static void
707 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
708 {
709 	struct spdk_bdev_channel	*ch = ctx_buf;
710 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
711 
712 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
713 
714 	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, ch);
715 	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, ch);
716 
717 	spdk_put_io_channel(ch->channel);
718 	spdk_put_io_channel(ch->mgmt_channel);
719 	assert(ch->io_outstanding == 0);
720 }
721 
722 struct spdk_io_channel *
723 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
724 {
725 	return spdk_get_io_channel(desc->bdev);
726 }
727 
728 const char *
729 spdk_bdev_get_name(const struct spdk_bdev *bdev)
730 {
731 	return bdev->name;
732 }
733 
734 const char *
735 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
736 {
737 	return bdev->product_name;
738 }
739 
740 uint32_t
741 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
742 {
743 	return bdev->blocklen;
744 }
745 
746 uint64_t
747 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
748 {
749 	return bdev->blockcnt;
750 }
751 
752 size_t
753 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
754 {
755 	/* TODO: push this logic down to the bdev modules */
756 	if (bdev->need_aligned_buffer) {
757 		return bdev->blocklen;
758 	}
759 
760 	return 1;
761 }
762 
763 uint32_t
764 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
765 {
766 	return bdev->optimal_io_boundary;
767 }
768 
769 bool
770 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
771 {
772 	return bdev->write_cache;
773 }
774 
775 /*
776  * Convert I/O offset and length from bytes to blocks.
777  *
778  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
779  */
780 static uint64_t
781 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
782 			  uint64_t num_bytes, uint64_t *num_blocks)
783 {
784 	uint32_t block_size = bdev->blocklen;
785 
786 	*offset_blocks = offset_bytes / block_size;
787 	*num_blocks = num_bytes / block_size;
788 
789 	return (offset_bytes % block_size) | (num_bytes % block_size);
790 }
791 
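/*
 * Worked example (editor's note): with a 512-byte blocklen, offset_bytes =
 * 4096 and num_bytes = 8192 yield offset_blocks = 8, num_blocks = 16 and a
 * return value of 0.  With offset_bytes = 4100, 4100 % 512 != 0, so the
 * return value is non-zero and the byte-based wrappers below fail with
 * -EINVAL.
 */
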
792 static bool
793 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
794 {
795 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
796 	 * that the sum overflowed and the offset wrapped around. */
797 	if (offset_blocks + num_blocks < offset_blocks) {
798 		return false;
799 	}
800 
801 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
802 	if (offset_blocks + num_blocks > bdev->blockcnt) {
803 		return false;
804 	}
805 
806 	return true;
807 }
808 
809 int
810 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
811 	       void *buf, uint64_t offset, uint64_t nbytes,
812 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
813 {
814 	uint64_t offset_blocks, num_blocks;
815 
816 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
817 		return -EINVAL;
818 	}
819 
820 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
821 }
822 
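/*
 * Example usage (editor's sketch, not part of the original source): a
 * byte-based read from the start of a bdev.  The bdev name "Malloc0" and
 * my_read_done are hypothetical; error handling is omitted.
 *
 *	static void
 *	my_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		// consume the data if success is true, then release the I/O
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Malloc0");
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *ch;
 *	void *buf = spdk_dma_zmalloc(4096, spdk_bdev_get_buf_align(bdev), NULL);
 *
 *	spdk_bdev_open(bdev, false, NULL, NULL, &desc);
 *	ch = spdk_bdev_get_io_channel(desc);
 *	spdk_bdev_read(desc, ch, buf, 0, 4096, my_read_done, NULL);
 */
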
823 int
824 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
825 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
826 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
827 {
828 	struct spdk_bdev *bdev = desc->bdev;
829 	struct spdk_bdev_io *bdev_io;
830 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
831 
832 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
833 		return -EINVAL;
834 	}
835 
836 	bdev_io = spdk_bdev_get_io();
837 	if (!bdev_io) {
838 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n");
839 		return -ENOMEM;
840 	}
841 
842 	bdev_io->ch = channel;
843 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
844 	bdev_io->u.bdev.iov.iov_base = buf;
845 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
846 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
847 	bdev_io->u.bdev.iovcnt = 1;
848 	bdev_io->u.bdev.num_blocks = num_blocks;
849 	bdev_io->u.bdev.offset_blocks = offset_blocks;
850 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
851 
852 	spdk_bdev_io_submit(bdev_io);
853 	return 0;
854 }
855 
856 int
857 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
858 		struct iovec *iov, int iovcnt,
859 		uint64_t offset, uint64_t nbytes,
860 		spdk_bdev_io_completion_cb cb, void *cb_arg)
861 {
862 	uint64_t offset_blocks, num_blocks;
863 
864 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
865 		return -EINVAL;
866 	}
867 
868 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
869 }
870 
871 int
spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
872 		       struct iovec *iov, int iovcnt,
873 		       uint64_t offset_blocks, uint64_t num_blocks,
874 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
875 {
876 	struct spdk_bdev *bdev = desc->bdev;
877 	struct spdk_bdev_io *bdev_io;
878 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
879 
880 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
881 		return -EINVAL;
882 	}
883 
884 	bdev_io = spdk_bdev_get_io();
885 	if (!bdev_io) {
886 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n");
887 		return -ENOMEM;
888 	}
889 
890 	bdev_io->ch = channel;
891 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
892 	bdev_io->u.bdev.iovs = iov;
893 	bdev_io->u.bdev.iovcnt = iovcnt;
894 	bdev_io->u.bdev.num_blocks = num_blocks;
895 	bdev_io->u.bdev.offset_blocks = offset_blocks;
896 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
897 
898 	spdk_bdev_io_submit(bdev_io);
899 	return 0;
900 }
901 
902 int
903 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
904 		void *buf, uint64_t offset, uint64_t nbytes,
905 		spdk_bdev_io_completion_cb cb, void *cb_arg)
906 {
907 	uint64_t offset_blocks, num_blocks;
908 
909 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
910 		return -EINVAL;
911 	}
912 
913 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
914 }
915 
916 int
917 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
918 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
919 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
920 {
921 	struct spdk_bdev *bdev = desc->bdev;
922 	struct spdk_bdev_io *bdev_io;
923 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
924 
925 	if (!desc->write) {
926 		return -EBADF;
927 	}
928 
929 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
930 		return -EINVAL;
931 	}
932 
933 	bdev_io = spdk_bdev_get_io();
934 	if (!bdev_io) {
935 		SPDK_ERRLOG("bdev_io memory allocation failed duing write\n");
936 		return -ENOMEM;
937 	}
938 
939 	bdev_io->ch = channel;
940 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
941 	bdev_io->u.bdev.iov.iov_base = buf;
942 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
943 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
944 	bdev_io->u.bdev.iovcnt = 1;
945 	bdev_io->u.bdev.num_blocks = num_blocks;
946 	bdev_io->u.bdev.offset_blocks = offset_blocks;
947 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
948 
949 	spdk_bdev_io_submit(bdev_io);
950 	return 0;
951 }
952 
953 int
954 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
955 		 struct iovec *iov, int iovcnt,
956 		 uint64_t offset, uint64_t len,
957 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
958 {
959 	uint64_t offset_blocks, num_blocks;
960 
961 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
962 		return -EINVAL;
963 	}
964 
965 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
966 }
967 
968 int
969 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
970 			struct iovec *iov, int iovcnt,
971 			uint64_t offset_blocks, uint64_t num_blocks,
972 			spdk_bdev_io_completion_cb cb, void *cb_arg)
973 {
974 	struct spdk_bdev *bdev = desc->bdev;
975 	struct spdk_bdev_io *bdev_io;
976 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
977 
978 	if (!desc->write) {
979 		return -EBADF;
980 	}
981 
982 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
983 		return -EINVAL;
984 	}
985 
986 	bdev_io = spdk_bdev_get_io();
987 	if (!bdev_io) {
988 		SPDK_ERRLOG("bdev_io memory allocation failed duing writev\n");
989 		return -ENOMEM;
990 	}
991 
992 	bdev_io->ch = channel;
993 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
994 	bdev_io->u.bdev.iovs = iov;
995 	bdev_io->u.bdev.iovcnt = iovcnt;
996 	bdev_io->u.bdev.num_blocks = num_blocks;
997 	bdev_io->u.bdev.offset_blocks = offset_blocks;
998 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
999 
1000 	spdk_bdev_io_submit(bdev_io);
1001 	return 0;
1002 }
1003 
1004 int
1005 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1006 		       uint64_t offset, uint64_t len,
1007 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1008 {
1009 	uint64_t offset_blocks, num_blocks;
1010 
1011 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1012 		return -EINVAL;
1013 	}
1014 
1015 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1016 }
1017 
1018 int
1019 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1020 			      uint64_t offset_blocks, uint64_t num_blocks,
1021 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1022 {
1023 	struct spdk_bdev *bdev = desc->bdev;
1024 	struct spdk_bdev_io *bdev_io;
1025 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1026 
1027 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1028 		return -EINVAL;
1029 	}
1030 
1031 	bdev_io = spdk_bdev_get_io();
1032 	if (!bdev_io) {
1033 		SPDK_ERRLOG("bdev_io memory allocation failed duing write_zeroes\n");
1034 		return -ENOMEM;
1035 	}
1036 
1037 	bdev_io->ch = channel;
1038 	bdev_io->u.bdev.num_blocks = num_blocks;
1039 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1040 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1041 
1042 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1043 
1044 	spdk_bdev_io_submit(bdev_io);
1045 	return 0;
1046 }
1047 
1048 int
1049 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1050 		uint64_t offset, uint64_t nbytes,
1051 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1052 {
1053 	uint64_t offset_blocks, num_blocks;
1054 
1055 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1056 		return -EINVAL;
1057 	}
1058 
1059 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1060 }
1061 
1062 int
1063 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1064 		       uint64_t offset_blocks, uint64_t num_blocks,
1065 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1066 {
1067 	struct spdk_bdev *bdev = desc->bdev;
1068 	struct spdk_bdev_io *bdev_io;
1069 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1070 
1071 	if (!desc->write) {
1072 		return -EBADF;
1073 	}
1074 
1075 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1076 		return -EINVAL;
1077 	}
1078 
1079 	if (num_blocks == 0) {
1080 		SPDK_ERRLOG("Can't unmap 0 bytes\n");
1081 		return -EINVAL;
1082 	}
1083 
1084 	bdev_io = spdk_bdev_get_io();
1085 	if (!bdev_io) {
1086 		SPDK_ERRLOG("bdev_io memory allocation failed duing unmap\n");
1087 		return -ENOMEM;
1088 	}
1089 
1090 	bdev_io->ch = channel;
1091 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1092 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1093 	bdev_io->u.bdev.num_blocks = num_blocks;
1094 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1095 
1096 	spdk_bdev_io_submit(bdev_io);
1097 	return 0;
1098 }
1099 
1100 int
1101 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1102 		uint64_t offset, uint64_t length,
1103 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1104 {
1105 	uint64_t offset_blocks, num_blocks;
1106 
1107 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1108 		return -EINVAL;
1109 	}
1110 
1111 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1112 }
1113 
1114 int
1115 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1116 		       uint64_t offset_blocks, uint64_t num_blocks,
1117 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1118 {
1119 	struct spdk_bdev *bdev = desc->bdev;
1120 	struct spdk_bdev_io *bdev_io;
1121 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1122 
1123 	if (!desc->write) {
1124 		return -EBADF;
1125 	}
1126 
1127 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1128 		return -EINVAL;
1129 	}
1130 
1131 	bdev_io = spdk_bdev_get_io();
1132 	if (!bdev_io) {
1133 		SPDK_ERRLOG("bdev_io memory allocation failed duing flush\n");
1134 		return -ENOMEM;
1135 	}
1136 
1137 	bdev_io->ch = channel;
1138 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1139 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1140 	bdev_io->u.bdev.num_blocks = num_blocks;
1141 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1142 
1143 	spdk_bdev_io_submit(bdev_io);
1144 	return 0;
1145 }
1146 
1147 static void
1148 _spdk_bdev_reset_dev(void *io_device, void *ctx)
1149 {
1150 	struct spdk_bdev_channel *ch = ctx;
1151 	struct spdk_bdev_io *bdev_io;
1152 
1153 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1154 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1155 	spdk_bdev_io_submit_reset(bdev_io);
1156 }
1157 
1158 static void
1159 _spdk_bdev_reset_abort_channel(void *io_device, struct spdk_io_channel *ch,
1160 			       void *ctx)
1161 {
1162 	struct spdk_bdev_channel	*channel;
1163 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1164 
1165 	channel = spdk_io_channel_get_ctx(ch);
1166 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1167 
1168 	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, channel);
1169 	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, channel);
1170 }
1171 
1172 static void
1173 _spdk_bdev_start_reset(void *ctx)
1174 {
1175 	struct spdk_bdev_channel *ch = ctx;
1176 
1177 	spdk_for_each_channel(ch->bdev, _spdk_bdev_reset_abort_channel,
1178 			      ch, _spdk_bdev_reset_dev);
1179 }
1180 
1181 static void
1182 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1183 {
1184 	struct spdk_bdev *bdev = ch->bdev;
1185 
1186 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1187 
1188 	pthread_mutex_lock(&bdev->mutex);
1189 	if (!bdev->reset_in_progress) {
1190 		bdev->reset_in_progress = true;
1191 		/*
1192 		 * Take a channel reference for the target bdev for the life of this
1193 		 *  reset.  This guards against the channel getting destroyed while
1194 		 *  spdk_for_each_channel() calls related to this reset IO are in
1195 		 *  progress.  We will release the reference when this reset is
1196 		 *  completed.
1197 		 */
1198 		TAILQ_FIRST(&ch->queued_resets)->u.reset.ch_ref = spdk_get_io_channel(bdev);
1199 		_spdk_bdev_start_reset(ch);
1200 	}
1201 	pthread_mutex_unlock(&bdev->mutex);
1202 }
1203 
1204 static void
1205 _spdk_bdev_complete_reset_channel(void *io_device, struct spdk_io_channel *_ch, void *ctx)
1206 {
1207 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1208 
1209 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
1210 		_spdk_bdev_channel_start_reset(ch);
1211 	}
1212 }
1213 
1214 int
1215 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1216 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1217 {
1218 	struct spdk_bdev *bdev = desc->bdev;
1219 	struct spdk_bdev_io *bdev_io;
1220 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1221 
1222 	bdev_io = spdk_bdev_get_io();
1223 	if (!bdev_io) {
1224 		SPDK_ERRLOG("bdev_io memory allocation failed duing reset\n");
1225 		return -ENOMEM;;
1226 	}
1227 
1228 	bdev_io->ch = channel;
1229 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1230 	bdev_io->u.reset.ch_ref = NULL;
1231 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1232 
1233 	pthread_mutex_lock(&bdev->mutex);
1234 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1235 	pthread_mutex_unlock(&bdev->mutex);
1236 
1237 	_spdk_bdev_channel_start_reset(channel);
1238 
1239 	return 0;
1240 }
1241 
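/*
 * Example usage (editor's sketch, not part of the original source):
 * my_reset_done is hypothetical.  Note that a reset first aborts any I/O
 * still waiting for data buffers on every channel of the bdev.
 *
 *	static void
 *	my_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	spdk_bdev_reset(desc, ch, my_reset_done, NULL);
 */
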
1242 void
1243 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1244 		      struct spdk_bdev_io_stat *stat)
1245 {
1246 #ifdef SPDK_CONFIG_VTUNE
1247 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1248 	memset(stat, 0, sizeof(*stat));
1249 	return;
1250 #endif
1251 
1252 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1253 
1254 	*stat = channel->stat;
1255 	memset(&channel->stat, 0, sizeof(channel->stat));
1256 }
1257 
1258 int
1259 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1260 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1261 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1262 {
1263 	struct spdk_bdev *bdev = desc->bdev;
1264 	struct spdk_bdev_io *bdev_io;
1265 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1266 
1267 	if (!desc->write) {
1268 		return -EBADF;
1269 	}
1270 
1271 	bdev_io = spdk_bdev_get_io();
1272 	if (!bdev_io) {
1273 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1274 		return -ENOMEM;
1275 	}
1276 
1277 	bdev_io->ch = channel;
1278 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1279 	bdev_io->u.nvme_passthru.cmd = *cmd;
1280 	bdev_io->u.nvme_passthru.buf = buf;
1281 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1282 
1283 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1284 
1285 	spdk_bdev_io_submit(bdev_io);
1286 	return 0;
1287 }
1288 
1289 int
1290 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1291 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1292 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1293 {
1294 	struct spdk_bdev *bdev = desc->bdev;
1295 	struct spdk_bdev_io *bdev_io;
1296 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1297 
1298 	if (!desc->write) {
1299 		/*
1300 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1301 		 *  to easily determine if the command is a read or write, but for now just
1302 		 *  do not allow io_passthru with a read-only descriptor.
1303 		 */
1304 		return -EBADF;
1305 	}
1306 
1307 	bdev_io = spdk_bdev_get_io();
1308 	if (!bdev_io) {
1309 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1310 		return -ENOMEM;
1311 	}
1312 
1313 	bdev_io->ch = channel;
1314 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1315 	bdev_io->u.nvme_passthru.cmd = *cmd;
1316 	bdev_io->u.nvme_passthru.buf = buf;
1317 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1318 
1319 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1320 
1321 	spdk_bdev_io_submit(bdev_io);
1322 	return 0;
1323 }
1324 
1325 int
1326 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1327 {
1328 	if (!bdev_io) {
1329 		SPDK_ERRLOG("bdev_io is NULL\n");
1330 		return -1;
1331 	}
1332 
1333 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1334 		SPDK_ERRLOG("bdev_io is in pending state\n");
1335 		assert(false);
1336 		return -1;
1337 	}
1338 
1339 	spdk_bdev_put_io(bdev_io);
1340 
1341 	return 0;
1342 }
1343 
1344 static void
1345 _spdk_bdev_io_complete(void *ctx)
1346 {
1347 	struct spdk_bdev_io *bdev_io = ctx;
1348 
1349 	assert(bdev_io->cb != NULL);
1350 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1351 }
1352 
1353 void
1354 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1355 {
1356 	bdev_io->status = status;
1357 
1358 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
1359 		pthread_mutex_lock(&bdev_io->bdev->mutex);
1360 		bdev_io->bdev->reset_in_progress = false;
1361 		pthread_mutex_unlock(&bdev_io->bdev->mutex);
1362 		if (bdev_io->u.reset.ch_ref != NULL) {
1363 			spdk_put_io_channel(bdev_io->u.reset.ch_ref);
1364 		}
1365 		spdk_for_each_channel(bdev_io->bdev, _spdk_bdev_complete_reset_channel, NULL, NULL);
1366 	} else {
1367 		assert(bdev_io->ch->io_outstanding > 0);
1368 		bdev_io->ch->io_outstanding--;
1369 	}
1370 
1371 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1372 		switch (bdev_io->type) {
1373 		case SPDK_BDEV_IO_TYPE_READ:
1374 			bdev_io->ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
1375 			bdev_io->ch->stat.num_read_ops++;
1376 			break;
1377 		case SPDK_BDEV_IO_TYPE_WRITE:
1378 			bdev_io->ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
1379 			bdev_io->ch->stat.num_write_ops++;
1380 			break;
1381 		default:
1382 			break;
1383 		}
1384 	}
1385 
1386 #ifdef SPDK_CONFIG_VTUNE
1387 	uint64_t now_tsc = spdk_get_ticks();
1388 	if (now_tsc > (bdev_io->ch->start_tsc + bdev_io->ch->interval_tsc)) {
1389 		uint64_t data[5];
1390 
1391 		data[0] = bdev_io->ch->stat.num_read_ops;
1392 		data[1] = bdev_io->ch->stat.bytes_read;
1393 		data[2] = bdev_io->ch->stat.num_write_ops;
1394 		data[3] = bdev_io->ch->stat.bytes_written;
1395 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
1396 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->ch->channel) : 0;
1397 
1398 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->ch->handle,
1399 				   __itt_metadata_u64, 5, data);
1400 
1401 		memset(&bdev_io->ch->stat, 0, sizeof(bdev_io->ch->stat));
1402 		bdev_io->ch->start_tsc = now_tsc;
1403 	}
1404 #endif
1405 
1406 	if (bdev_io->in_submit_request || bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
1407 		/*
1408 		 * Defer completion to avoid potential infinite recursion if the
1409 		 * user's completion callback issues a new I/O.
1410 		 */
1411 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
1412 				     _spdk_bdev_io_complete, bdev_io);
1413 	} else {
1414 		_spdk_bdev_io_complete(bdev_io);
1415 	}
1416 }
1417 
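/*
 * Example usage (editor's sketch, not part of the original source): a bdev
 * module completes an I/O from its own completion path.  my_driver_cpl and
 * driver_status are hypothetical.
 *
 *	static void
 *	my_driver_cpl(void *ctx, int driver_status)
 *	{
 *		struct spdk_bdev_io *bdev_io = ctx;
 *
 *		spdk_bdev_io_complete(bdev_io, driver_status == 0 ?
 *				      SPDK_BDEV_IO_STATUS_SUCCESS :
 *				      SPDK_BDEV_IO_STATUS_FAILED);
 *	}
 */
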
1418 void
1419 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
1420 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
1421 {
1422 	if (sc == SPDK_SCSI_STATUS_GOOD) {
1423 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1424 	} else {
1425 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
1426 		bdev_io->error.scsi.sc = sc;
1427 		bdev_io->error.scsi.sk = sk;
1428 		bdev_io->error.scsi.asc = asc;
1429 		bdev_io->error.scsi.ascq = ascq;
1430 	}
1431 
1432 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1433 }
1434 
1435 void
1436 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
1437 			     int *sc, int *sk, int *asc, int *ascq)
1438 {
1439 	assert(sc != NULL);
1440 	assert(sk != NULL);
1441 	assert(asc != NULL);
1442 	assert(ascq != NULL);
1443 
1444 	switch (bdev_io->status) {
1445 	case SPDK_BDEV_IO_STATUS_SUCCESS:
1446 		*sc = SPDK_SCSI_STATUS_GOOD;
1447 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
1448 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1449 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1450 		break;
1451 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
1452 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
1453 		break;
1454 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
1455 		*sc = bdev_io->error.scsi.sc;
1456 		*sk = bdev_io->error.scsi.sk;
1457 		*asc = bdev_io->error.scsi.asc;
1458 		*ascq = bdev_io->error.scsi.ascq;
1459 		break;
1460 	default:
1461 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
1462 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
1463 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1464 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1465 		break;
1466 	}
1467 }
1468 
1469 void
1470 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
1471 {
1472 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
1473 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1474 	} else {
1475 		bdev_io->error.nvme.sct = sct;
1476 		bdev_io->error.nvme.sc = sc;
1477 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
1478 	}
1479 
1480 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1481 }
1482 
1483 void
1484 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
1485 {
1486 	assert(sct != NULL);
1487 	assert(sc != NULL);
1488 
1489 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
1490 		*sct = bdev_io->error.nvme.sct;
1491 		*sc = bdev_io->error.nvme.sc;
1492 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1493 		*sct = SPDK_NVME_SCT_GENERIC;
1494 		*sc = SPDK_NVME_SC_SUCCESS;
1495 	} else {
1496 		*sct = SPDK_NVME_SCT_GENERIC;
1497 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1498 	}
1499 }
1500 
1501 static void
1502 _spdk_bdev_register(struct spdk_bdev *bdev)
1503 {
1504 	struct spdk_bdev_module_if *module;
1505 
1506 	assert(bdev->module != NULL);
1507 
1508 	bdev->status = SPDK_BDEV_STATUS_READY;
1509 
1510 	TAILQ_INIT(&bdev->open_descs);
1511 	bdev->bdev_opened = false;
1512 
1513 	TAILQ_INIT(&bdev->vbdevs);
1514 	TAILQ_INIT(&bdev->base_bdevs);
1515 
1516 	bdev->reset_in_progress = false;
1517 
1518 	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
1519 				sizeof(struct spdk_bdev_channel));
1520 
1521 	pthread_mutex_init(&bdev->mutex, NULL);
1522 	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Inserting bdev %s into list\n", bdev->name);
1523 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
1524 
1525 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
1526 		if (module->examine) {
1527 			module->action_in_progress++;
1528 			module->examine(bdev);
1529 		}
1530 	}
1531 }
1532 
1533 void
1534 spdk_bdev_register(struct spdk_bdev *bdev)
1535 {
1536 	_spdk_bdev_register(bdev);
1537 }
1538 
1539 void
1540 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
1541 {
1542 	int i;
1543 
1544 	_spdk_bdev_register(vbdev);
1545 	for (i = 0; i < base_bdev_count; i++) {
1546 		assert(base_bdevs[i] != NULL);
1547 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
1548 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
1549 	}
1550 }
1551 
1552 void
1553 spdk_bdev_unregister(struct spdk_bdev *bdev)
1554 {
1555 	struct spdk_bdev_desc	*desc, *tmp;
1556 	int			rc;
1557 	bool			do_destruct = true;
1558 
1559 	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Removing bdev %s from list\n", bdev->name);
1560 
1561 	pthread_mutex_lock(&bdev->mutex);
1562 
1563 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
1564 
1565 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
1566 		if (desc->remove_cb) {
1567 			pthread_mutex_unlock(&bdev->mutex);
1568 			do_destruct = false;
1569 			desc->remove_cb(desc->remove_ctx);
1570 			pthread_mutex_lock(&bdev->mutex);
1571 		}
1572 	}
1573 
1574 	if (!do_destruct) {
1575 		pthread_mutex_unlock(&bdev->mutex);
1576 		return;
1577 	}
1578 
1579 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
1580 	pthread_mutex_unlock(&bdev->mutex);
1581 
1582 	pthread_mutex_destroy(&bdev->mutex);
1583 
1584 	spdk_io_device_unregister(bdev, NULL);
1585 
1586 	rc = bdev->fn_table->destruct(bdev->ctxt);
1587 	if (rc < 0) {
1588 		SPDK_ERRLOG("destruct failed\n");
1589 	}
1590 }
1591 
1592 void
1593 spdk_vbdev_unregister(struct spdk_bdev *vbdev)
1594 {
1595 	struct spdk_bdev *base_bdev;
1596 
1597 	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
1598 	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
1599 		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
1600 	}
1601 	spdk_bdev_unregister(vbdev);
1602 }
1603 
1604 bool
1605 spdk_is_bdev_opened(struct spdk_bdev *bdev)
1606 {
1607 	struct spdk_bdev *base;
1608 
1609 	if (bdev->bdev_opened) {
1610 		return true;
1611 	}
1612 
1613 	TAILQ_FOREACH(base, &bdev->base_bdevs, base_bdev_link) {
1614 		if (spdk_is_bdev_opened(base)) {
1615 			return true;
1616 		}
1617 	}
1618 
1619 	return false;
1620 }
1621 
1622 int
1623 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
1624 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
1625 {
1626 	struct spdk_bdev_desc *desc;
1627 
1628 	desc = calloc(1, sizeof(*desc));
1629 	if (desc == NULL) {
1630 		return -ENOMEM;
1631 	}
1632 
1633 	pthread_mutex_lock(&bdev->mutex);
1634 
1635 	if (write && bdev->claim_module) {
1636 		SPDK_ERRLOG("failed, %s already claimed\n", bdev->name);
1637 		free(desc);
1638 		pthread_mutex_unlock(&bdev->mutex);
1639 		return -EPERM;
1640 	}
1641 
1642 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
1643 
1644 	bdev->bdev_opened = true;
1645 
1646 	desc->bdev = bdev;
1647 	desc->remove_cb = remove_cb;
1648 	desc->remove_ctx = remove_ctx;
1649 	desc->write = write;
1650 	*_desc = desc;
1651 
1652 	pthread_mutex_unlock(&bdev->mutex);
1653 
1654 	return 0;
1655 }
1656 
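/*
 * Example usage (editor's sketch, not part of the original source): open a
 * bdev for writing with a hot-remove callback.  struct my_disk and
 * my_bdev_removed are hypothetical.
 *
 *	static void
 *	my_bdev_removed(void *remove_ctx)
 *	{
 *		struct my_disk *disk = remove_ctx;
 *
 *		// quiesce new submissions, then spdk_bdev_close(disk->desc)
 *	}
 *
 *	rc = spdk_bdev_open(bdev, true, my_bdev_removed, disk, &disk->desc);
 *	if (rc == -EPERM) {
 *		// another module has a write claim on this bdev
 *	}
 */
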
1657 void
1658 spdk_bdev_close(struct spdk_bdev_desc *desc)
1659 {
1660 	struct spdk_bdev *bdev = desc->bdev;
1661 	bool do_unregister = false;
1662 
1663 	pthread_mutex_lock(&bdev->mutex);
1664 
1665 	bdev->bdev_opened = false;
1666 
1667 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
1668 	free(desc);
1669 
1670 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
1671 		do_unregister = true;
1672 	}
1673 	pthread_mutex_unlock(&bdev->mutex);
1674 
1675 	if (do_unregister == true) {
1676 		spdk_bdev_unregister(bdev);
1677 	}
1678 }
1679 
1680 int
1681 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
1682 			    struct spdk_bdev_module_if *module)
1683 {
1684 	if (bdev->claim_module != NULL) {
1685 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
1686 			    bdev->claim_module->name);
1687 		return -EPERM;
1688 	}
1689 
1690 	if (desc && !desc->write) {
1691 		desc->write = true;
1692 	}
1693 
1694 	bdev->claim_module = module;
1695 	return 0;
1696 }
1697 
1698 void
1699 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
1700 {
1701 	assert(bdev->claim_module != NULL);
1702 	bdev->claim_module = NULL;
1703 }
1704 
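/*
 * Example usage (editor's sketch, not part of the original source): a virtual
 * bdev module takes exclusive write ownership of its base bdev and releases
 * it when the virtual bdev is destroyed.  my_module_if is hypothetical.
 *
 *	if (spdk_bdev_module_claim_bdev(base_bdev, desc, &my_module_if) != 0) {
 *		// some other module already owns this bdev
 *	}
 *	...
 *	spdk_bdev_module_release_bdev(base_bdev);
 */
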
1705 struct spdk_bdev *
1706 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
1707 {
1708 	return desc->bdev;
1709 }
1710 
1711 void
1712 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
1713 {
1714 	struct iovec *iovs;
1715 	int iovcnt;
1716 
1717 	if (bdev_io == NULL) {
1718 		return;
1719 	}
1720 
1721 	switch (bdev_io->type) {
1722 	case SPDK_BDEV_IO_TYPE_READ:
1723 		iovs = bdev_io->u.bdev.iovs;
1724 		iovcnt = bdev_io->u.bdev.iovcnt;
1725 		break;
1726 	case SPDK_BDEV_IO_TYPE_WRITE:
1727 		iovs = bdev_io->u.bdev.iovs;
1728 		iovcnt = bdev_io->u.bdev.iovcnt;
1729 		break;
1730 	default:
1731 		iovs = NULL;
1732 		iovcnt = 0;
1733 		break;
1734 	}
1735 
1736 	if (iovp) {
1737 		*iovp = iovs;
1738 	}
1739 	if (iovcntp) {
1740 		*iovcntp = iovcnt;
1741 	}
1742 }
1743 
1744 void
1745 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
1746 {
1747 	/*
1748 	 * Modules with examine callbacks must be initialized first, so they are
1749 	 *  ready to handle examine callbacks from later modules that will
1750 	 *  register physical bdevs.
1751 	 */
1752 	if (bdev_module->examine != NULL) {
1753 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1754 	} else {
1755 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1756 	}
1757 }
1758 
1759 void
1760 spdk_bdev_part_base_free(struct spdk_bdev_part_base *base)
1761 {
1762 	assert(base->bdev);
1763 	assert(base->desc);
1764 	spdk_bdev_close(base->desc);
1765 	free(base);
1766 }
1767 
1768 void
1769 spdk_bdev_part_free(struct spdk_bdev_part *part)
1770 {
1771 	struct spdk_bdev_part_base *base;
1772 
1773 	assert(part);
1774 	assert(part->base);
1775 
1776 	base = part->base;
1777 	spdk_io_device_unregister(&part->base, NULL);
1778 	TAILQ_REMOVE(base->tailq, part, tailq);
1779 	free(part->bdev.name);
1780 	free(part);
1781 
1782 	if (__sync_sub_and_fetch(&base->ref, 1) == 0) {
1783 		spdk_bdev_module_release_bdev(base->bdev);
1784 		spdk_bdev_part_base_free(base);
1785 	}
1786 }
1787 
1788 void
1789 spdk_bdev_part_tailq_fini(struct bdev_part_tailq *tailq)
1790 {
1791 	struct spdk_bdev_part *part, *tmp;
1792 
1793 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
1794 		spdk_bdev_part_free(part);
1795 	}
1796 }
1797 
1798 void
1799 spdk_bdev_part_base_hotremove(struct spdk_bdev *base_bdev, struct bdev_part_tailq *tailq)
1800 {
1801 	struct spdk_bdev_part *part, *tmp;
1802 
1803 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
1804 		if (part->base->bdev == base_bdev) {
1805 			spdk_bdev_unregister(&part->bdev);
1806 		}
1807 	}
1808 }
1809 
1810 static bool
1811 spdk_bdev_part_io_type_supported(void *_part, enum spdk_bdev_io_type io_type)
1812 {
1813 	struct spdk_bdev_part *part = _part;
1814 
1815 	return part->base->bdev->fn_table->io_type_supported(part->base->bdev, io_type);
1816 }
1817 
1818 static struct spdk_io_channel *
1819 spdk_bdev_part_get_io_channel(void *_part)
1820 {
1821 	struct spdk_bdev_part *part = _part;
1822 
1823 	return spdk_get_io_channel(&part->base);
1824 }
1825 
1826 static void
1827 spdk_bdev_part_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1828 {
1829 	struct spdk_bdev_io *part_io = cb_arg;
1830 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1831 
1832 	spdk_bdev_io_complete(part_io, status);
1833 	spdk_bdev_free_io(bdev_io);
1834 }
1835 
1836 void
1837 spdk_bdev_part_submit_request(struct spdk_bdev_part_channel *ch, struct spdk_bdev_io *bdev_io)
1838 {
1839 	struct spdk_bdev_part *part = ch->part;
1840 	struct spdk_io_channel *base_ch = ch->base_ch;
1841 	struct spdk_bdev_desc *base_desc = part->base->desc;
1842 	uint64_t offset;
1843 	int rc = 0;
1844 
1845 	/* Modify the I/O to adjust for the offset within the base bdev. */
1846 	switch (bdev_io->type) {
1847 	case SPDK_BDEV_IO_TYPE_READ:
1848 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
1849 		rc = spdk_bdev_readv_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
1850 					    bdev_io->u.bdev.iovcnt, offset,
1851 					    bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
1852 					    bdev_io);
1853 		break;
1854 	case SPDK_BDEV_IO_TYPE_WRITE:
1855 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
1856 		rc = spdk_bdev_writev_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
1857 					     bdev_io->u.bdev.iovcnt, offset,
1858 					     bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
1859 					     bdev_io);
1860 		break;
1861 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1862 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
1863 		rc = spdk_bdev_write_zeroes_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
1864 						   spdk_bdev_part_complete_io, bdev_io);
1865 		break;
1866 	case SPDK_BDEV_IO_TYPE_UNMAP:
1867 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
1868 		rc = spdk_bdev_unmap_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
1869 					    spdk_bdev_part_complete_io, bdev_io);
1870 		break;
1871 	case SPDK_BDEV_IO_TYPE_FLUSH:
1872 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
1873 		rc = spdk_bdev_flush_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
1874 					    spdk_bdev_part_complete_io, bdev_io);
1875 		break;
1876 	case SPDK_BDEV_IO_TYPE_RESET:
1877 		rc = spdk_bdev_reset(base_desc, base_ch,
1878 				     spdk_bdev_part_complete_io, bdev_io);
1879 		break;
1880 	default:
1881 		SPDK_ERRLOG("split: unknown I/O type %d\n", bdev_io->type);
1882 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1883 		return;
1884 	}
1885 
1886 	if (rc != 0) {
1887 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1888 	}
1889 }

1890 static int
1891 spdk_bdev_part_channel_create_cb(void *io_device, void *ctx_buf)
1892 {
1893 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
1894 	struct spdk_bdev_part_channel *ch = ctx_buf;
1895 
1896 	ch->part = part;
1897 	ch->base_ch = spdk_bdev_get_io_channel(part->base->desc);
1898 	if (ch->base_ch == NULL) {
1899 		return -1;
1900 	}
1901 
1902 	if (part->base->ch_create_cb) {
1903 		return part->base->ch_create_cb(io_device, ctx_buf);
1904 	} else {
1905 		return 0;
1906 	}
1907 }
1908 
1909 static void
1910 spdk_bdev_part_channel_destroy_cb(void *io_device, void *ctx_buf)
1911 {
1912 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
1913 	struct spdk_bdev_part_channel *ch = ctx_buf;
1914 
1915 	if (part->base->ch_destroy_cb) {
1916 		part->base->ch_destroy_cb(io_device, ctx_buf);
1917 	}
1918 	spdk_put_io_channel(ch->base_ch);
1919 }
1920 
1921 int
1922 spdk_bdev_part_base_construct(struct spdk_bdev_part_base *base, struct spdk_bdev *bdev,
1923 			      spdk_bdev_remove_cb_t remove_cb, struct spdk_bdev_module_if *module,
1924 			      struct spdk_bdev_fn_table *fn_table, struct bdev_part_tailq *tailq,
1925 			      uint32_t channel_size, spdk_io_channel_create_cb ch_create_cb,
1926 			      spdk_io_channel_destroy_cb ch_destroy_cb)
1927 {
1928 	int rc;
1929 
1930 	fn_table->get_io_channel = spdk_bdev_part_get_io_channel;
1931 	fn_table->io_type_supported = spdk_bdev_part_io_type_supported;
1932 
1933 	base->bdev = bdev;
1934 	base->ref = 0;
1935 	base->module = module;
1936 	base->fn_table = fn_table;
1937 	base->tailq = tailq;
1938 	base->claimed = false;
1939 	base->channel_size = channel_size;
1940 	base->ch_create_cb = ch_create_cb;
1941 	base->ch_destroy_cb = ch_destroy_cb;
1942 
1943 	rc = spdk_bdev_open(bdev, false, remove_cb, bdev, &base->desc);
1944 	if (rc) {
1945 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
1946 		return -1;
1947 	}
1948 
1949 	return 0;
1950 }
1951 
1952 int
1953 spdk_bdev_part_construct(struct spdk_bdev_part *part, struct spdk_bdev_part_base *base,
1954 			 char *name, uint64_t offset_blocks, uint64_t num_blocks,
1955 			 char *product_name)
1956 {
1957 	part->bdev.name = name;
1958 	part->bdev.blocklen = base->bdev->blocklen;
1959 	part->bdev.blockcnt = num_blocks;
1960 	part->offset_blocks = offset_blocks;
1961 
1962 	part->bdev.write_cache = base->bdev->write_cache;
1963 	part->bdev.need_aligned_buffer = base->bdev->need_aligned_buffer;
1964 	part->bdev.product_name = product_name;
1965 	part->bdev.ctxt = part;
1966 	part->bdev.module = base->module;
1967 	part->bdev.fn_table = base->fn_table;
1968 
1969 	__sync_fetch_and_add(&base->ref, 1);
1970 	part->base = base;
1971 
1972 	if (!base->claimed) {
1973 		int rc;
1974 
1975 		rc = spdk_bdev_module_claim_bdev(base->bdev, base->desc, base->module);
1976 		if (rc) {
1977 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(base->bdev));
1978 			free(part->bdev.name);
1979 			return -1;
1980 		}
1981 		base->claimed = true;
1982 	}
1983 
1984 	spdk_io_device_register(&part->base, spdk_bdev_part_channel_create_cb,
1985 				spdk_bdev_part_channel_destroy_cb,
1986 				base->channel_size);
1987 	spdk_vbdev_register(&part->bdev, &base->bdev, 1);
1988 	TAILQ_INSERT_TAIL(base->tailq, part, tailq);
1989 
1990 	return 0;
1991 }
1992 
1993 SPDK_LOG_REGISTER_TRACE_FLAG("bdev", SPDK_TRACE_BDEV)
1994