/*-
 *   BSD LICENSE
 *
 *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"

#include "spdk/env.h"
#include "spdk/io_channel.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"
#include "spdk/util.h"

#include "spdk_internal/bdev.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

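/*
 * Each spdk_bdev_io pool element also reserves room for the largest bdev
 * module context (see spdk_bdev_module_get_max_ctx_size()), and each buffer
 * pool element reserves an extra 512 bytes for alignment (see
 * spdk_bdev_io_set_buf()).
 */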
#define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
#define BUF_SMALL_POOL_SIZE	8192
#define BUF_LARGE_POOL_SIZE	1024

typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;

	TAILQ_HEAD(, spdk_bdev) bdevs;

	spdk_bdev_poller_start_cb start_poller_fn;
	spdk_bdev_poller_stop_cb stop_poller_fn;

	bool init_complete;
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain	*domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.start_poller_fn = NULL,
	.stop_poller_fn = NULL,
	.init_complete = false,
	.module_init_complete = false,
};

static spdk_bdev_init_cb	g_cb_fn = NULL;
static void			*g_cb_arg = NULL;


struct spdk_bdev_mgmt_channel {
	bdev_io_tailq_t need_buf_small;
	bdev_io_tailq_t need_buf_large;
};

struct spdk_bdev_desc {
	struct spdk_bdev		*bdev;
	spdk_bdev_remove_cb_t		remove_cb;
	void				*remove_ctx;
	bool				write;
	TAILQ_ENTRY(spdk_bdev_desc)	link;
};

#define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Channel for the bdev manager */
	struct spdk_io_channel *mgmt_channel;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

	bdev_io_tailq_t		queued_resets;

	uint32_t		flags;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
#endif

};

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (TAILQ_EMPTY(&bdev->vbdevs)) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}
		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

static void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
{
	assert(bdev_io->get_buf_cb != NULL);
	assert(buf != NULL);
	assert(bdev_io->u.bdev.iovs != NULL);

	bdev_io->buf = buf;
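	/*
	 * Advance to the next 512-byte boundary past buf (even if buf is
	 * already aligned); this is why each buffer pool element reserves an
	 * extra 512 bytes beyond its nominal maximum size.
	 */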
	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf;
	bdev_io_tailq_t *tailq;
	uint64_t length;
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io->u.bdev.iovcnt == 1);

	length = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
	buf = bdev_io->buf;

	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);

	if (length <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		tailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		tailq = &ch->need_buf_large;
	}

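	/*
	 * If another I/O on this thread is waiting for a buffer, hand this
	 * one off directly instead of returning it to the pool.
	 */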
	if (TAILQ_EMPTY(tailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = TAILQ_FIRST(tailq);
		TAILQ_REMOVE(tailq, tmp, buf_link);
		spdk_bdev_io_set_buf(tmp, buf);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb)
{
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
	struct spdk_mempool *pool;
	bdev_io_tailq_t *tailq;
	void *buf = NULL;
	struct spdk_bdev_mgmt_channel *ch;

	assert(cb != NULL);
	assert(bdev_io->u.bdev.iovs != NULL);

	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
		/* Buffer already present */
		cb(bdev_io->ch->channel, bdev_io);
		return;
	}

	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);

	bdev_io->get_buf_cb = cb;
	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		tailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		tailq = &ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);

	if (!buf) {
		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
	} else {
		spdk_bdev_io_set_buf(bdev_io, buf);
	}
}

static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module_if *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module_if *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;

	TAILQ_INIT(&ch->need_buf_small);
	TAILQ_INIT(&ch->need_buf_large);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;

	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
	}
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_cb_fn;
	void *cb_arg = g_cb_arg;

	g_bdev_mgr.init_complete = true;
	g_cb_fn = NULL;
	g_cb_arg = NULL;

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_action_complete(void)
{
	struct spdk_bdev_module_if *m;

	/*
	 * Don't finish bdev subsystem initialization if
	 * module pre-initialization is still in progress, or
	 * if the subsystem has already been initialized.
	 */
	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
		return;
	}

	/*
	 * Check all bdev modules for inits/examinations in progress. If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
		if (m->action_in_progress > 0) {
			return;
		}
	}

	/*
	 * Modules already finished initialization - now that all
	 * the bdev modules have finished their asynchronous I/O
	 * processing, the entire bdev layer can be marked as complete.
	 */
	spdk_bdev_init_complete(0);
}

static void
spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
{
	assert(module->action_in_progress > 0);
	module->action_in_progress--;
	spdk_bdev_module_action_complete();
}

void
spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
{
	spdk_bdev_module_action_done(module);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
{
	spdk_bdev_module_action_done(module);
}

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module_if *module;
	int rc = 0;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
		rc = module->module_init();
		if (rc != 0) {
			break;
		}
	}

	g_bdev_mgr.module_init_complete = true;
	return rc;
}

void
spdk_bdev_poller_start(struct spdk_bdev_poller **ppoller,
		       spdk_bdev_poller_fn fn,
		       void *arg,
		       uint32_t lcore,
		       uint64_t period_microseconds)
{
	g_bdev_mgr.start_poller_fn(ppoller, fn, arg, lcore, period_microseconds);
}

void
spdk_bdev_poller_stop(struct spdk_bdev_poller **ppoller)
{
	g_bdev_mgr.stop_poller_fn(ppoller);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg,
		     spdk_bdev_poller_start_cb start_poller_fn,
		     spdk_bdev_poller_stop_cb stop_poller_fn)
{
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	g_cb_fn = cb_fn;
	g_cb_arg = cb_arg;

	g_bdev_mgr.start_poller_fn = start_poller_fn;
	g_bdev_mgr.stop_poller_fn = stop_poller_fn;

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  SPDK_BDEV_IO_POOL_SIZE,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  64,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	/*
	 * Ensure no more than half of the total buffers end up in local caches, by
	 *   using spdk_env_get_core_count() to determine how many local caches we need
	 *   to account for.
	 */
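	/* e.g., with 4 cores: 8192 / (2 * 4) = 1024 small buffers may be cached per core (4096 total, half the pool). */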
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("could not allocate small buffer pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("could not allocate large buffer pool\n");
		spdk_bdev_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel));

	rc = spdk_bdev_modules_init();
	if (rc != 0) {
		SPDK_ERRLOG("bdev modules init failed\n");
		spdk_bdev_init_complete(-1);
		return;
	}

	spdk_bdev_module_action_complete();
}

int
spdk_bdev_finish(void)
{
	struct spdk_bdev_module_if *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}
	}

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    SPDK_BDEV_IO_POOL_SIZE);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);

	spdk_io_device_unregister(&g_bdev_mgr, NULL);

	return 0;
}

struct spdk_bdev_io *
spdk_bdev_get_io(void)
{
	struct spdk_bdev_io *bdev_io;

	bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	if (!bdev_io) {
		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
		abort();
	}

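	/*
	 * Zero only the common header; the per-I/O type union (and the module
	 * context area that follows the struct) is initialized by the
	 * submission paths that use it.
	 */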
	memset(bdev_io, 0, offsetof(struct spdk_bdev_io, u));

	return bdev_io;
}

static void
spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
{
	if (!bdev_io) {
		return;
	}

	if (bdev_io->buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
}

static void
spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
	struct spdk_io_channel *ch = bdev_ch->channel;

	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);

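	/*
	 * in_submit_request is set around the submit call so that a completion
	 * arriving synchronously from within submit_request() is deferred via
	 * a thread message rather than invoked recursively (see
	 * spdk_bdev_io_complete()).
	 */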
	bdev_ch->io_outstanding++;
	bdev_io->in_submit_request = true;
	if (spdk_likely(bdev_ch->flags == 0)) {
		bdev->fn_table->submit_request(ch, bdev_io);
	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	} else {
		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
	bdev_io->in_submit_request = false;
}

static void
spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
	struct spdk_io_channel *ch = bdev_ch->channel;

	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);

	bdev_io->in_submit_request = true;
	bdev->fn_table->submit_request(ch, bdev_io);
	bdev_io->in_submit_request = false;
}

static void
spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
		  struct spdk_bdev *bdev, void *cb_arg,
		  spdk_bdev_io_completion_cb cb)
{
	bdev_io->bdev = bdev;
	bdev_io->caller_ctx = cb_arg;
	bdev_io->cb = cb;
	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
	bdev_io->in_submit_request = false;
}

bool
spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
}

int
spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	if (bdev->fn_table->dump_config_json) {
		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
	}

	return 0;
}

static int
spdk_bdev_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev		*bdev = io_device;
	struct spdk_bdev_channel	*ch = ctx_buf;

	ch->bdev = io_device;
	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
	memset(&ch->stat, 0, sizeof(ch->stat));
	ch->io_outstanding = 0;
	TAILQ_INIT(&ch->queued_resets);
	ch->flags = 0;

#ifdef SPDK_CONFIG_VTUNE
	{
		char *name;
		__itt_init_ittlib(NULL, 0);
		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
		if (!name) {
			return -1;
		}
		ch->handle = __itt_string_handle_create(name);
		free(name);
		ch->start_tsc = spdk_get_ticks();
		ch->interval_tsc = spdk_get_ticks_hz() / 100;
	}
#endif

	return 0;
}

/*
 * Abort I/O that are waiting on a data buffer.  These types of I/O are
 *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_buf_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io, *tmp;

	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
		if (bdev_io->ch == ch) {
			TAILQ_REMOVE(queue, bdev_io, buf_link);
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		}
	}
}

/*
 * Abort I/O that are queued waiting for submission.  These types of I/O are
 *  linked using the spdk_bdev_io link TAILQ_ENTRY.
 */
static void
_spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io, *tmp;

	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
		if (bdev_io->ch == ch) {
			TAILQ_REMOVE(queue, bdev_io, link);
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		}
	}
}

static void
spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel	*ch = ctx_buf;
	struct spdk_bdev_mgmt_channel	*mgmt_channel;

	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);

	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);

	spdk_put_io_channel(ch->channel);
	spdk_put_io_channel(ch->mgmt_channel);
	assert(ch->io_outstanding == 0);
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(desc->bdev);
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
{
	return bdev->product_name;
}

uint32_t
spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
{
	return bdev->blocklen;
}

uint64_t
spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
{
	return bdev->blockcnt;
}

size_t
spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
{
	/* TODO: push this logic down to the bdev modules */
	if (bdev->need_aligned_buffer) {
		return bdev->blocklen;
	}

	return 1;
}

uint32_t
spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
{
	return bdev->optimal_io_boundary;
}

bool
spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
{
	return bdev->write_cache;
}

/*
 * Convert I/O offset and length from bytes to blocks.
 *
 * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
 */
static uint64_t
spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
			  uint64_t num_bytes, uint64_t *num_blocks)
{
	uint32_t block_size = bdev->blocklen;

	*offset_blocks = offset_bytes / block_size;
	*num_blocks = num_bytes / block_size;

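	/* OR the two remainders together: the result is nonzero iff either
	 * byte value was not an exact multiple of the block size. */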
	return (offset_bytes % block_size) | (num_bytes % block_size);
}

static bool
spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
{
	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
	 * has been an overflow and hence the offset has been wrapped around */
	if (offset_blocks + num_blocks < offset_blocks) {
		return false;
	}

	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
	if (offset_blocks + num_blocks > bdev->blockcnt) {
		return false;
	}

	return true;
}

int
spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
	       void *buf, uint64_t offset, uint64_t nbytes,
	       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
		      spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.bdev.iov.iov_base = buf;
	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
	bdev_io->u.bdev.iovcnt = 1;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		struct iovec *iov, int iovcnt,
		uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		       struct iovec *iov, int iovcnt,
		       uint64_t offset_blocks, uint64_t num_blocks,
		       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.bdev.iovs = iov;
	bdev_io->u.bdev.iovcnt = iovcnt;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		void *buf, uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
		       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
	bdev_io->u.bdev.iov.iov_base = buf;
	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
	bdev_io->u.bdev.iovcnt = 1;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		 struct iovec *iov, int iovcnt,
		 uint64_t offset, uint64_t len,
		 spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			struct iovec *iov, int iovcnt,
			uint64_t offset_blocks, uint64_t num_blocks,
			spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
	bdev_io->u.bdev.iovs = iov;
	bdev_io->u.bdev.iovcnt = iovcnt;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		       uint64_t offset, uint64_t len,
		       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			      uint64_t offset_blocks, uint64_t num_blocks,
			      spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->u.bdev.iovs = NULL;
	bdev_io->u.bdev.iovcnt = 0;
	bdev_io->u.bdev.num_blocks = num_blocks;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;

	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		       uint64_t offset_blocks, uint64_t num_blocks,
		       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	if (num_blocks == 0) {
		SPDK_ERRLOG("Can't unmap 0 blocks\n");
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
	bdev_io->u.bdev.iovcnt = 1;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	bdev_io->u.bdev.num_blocks = num_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		uint64_t offset, uint64_t length,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	uint64_t offset_blocks, num_blocks;

	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
		return -EINVAL;
	}

	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
}

int
spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		       uint64_t offset_blocks, uint64_t num_blocks,
		       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
	bdev_io->u.bdev.iovs = NULL;
	bdev_io->u.bdev.iovcnt = 0;
	bdev_io->u.bdev.offset_blocks = offset_blocks;
	bdev_io->u.bdev.num_blocks = num_blocks;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

static void
_spdk_bdev_reset_dev(void *io_device, void *ctx)
{
	struct spdk_bdev_channel *ch = ctx;
	struct spdk_bdev_io *bdev_io;

	bdev_io = TAILQ_FIRST(&ch->queued_resets);
	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
	spdk_bdev_io_submit_reset(bdev_io);
}

static void
_spdk_bdev_reset_abort_channel(void *io_device, struct spdk_io_channel *ch,
			       void *ctx)
{
	struct spdk_bdev_channel	*channel;
	struct spdk_bdev_mgmt_channel	*mgmt_channel;

	channel = spdk_io_channel_get_ctx(ch);
	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);

	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;

	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
}

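/*
 * Freeze the bdev for reset: spdk_for_each_channel() invokes
 * _spdk_bdev_reset_abort_channel() on each channel's thread to mark it
 * RESET_IN_PROGRESS and fail any I/O still waiting for a buffer, then calls
 * _spdk_bdev_reset_dev() once every channel has been visited to submit the
 * queued reset to the bdev module.
 */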
static void
_spdk_bdev_start_reset(void *ctx)
{
	struct spdk_bdev_channel *ch = ctx;

	spdk_for_each_channel(ch->bdev, _spdk_bdev_reset_abort_channel,
			      ch, _spdk_bdev_reset_dev);
}

static void
_spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
{
	struct spdk_bdev *bdev = ch->bdev;

	assert(!TAILQ_EMPTY(&ch->queued_resets));

	pthread_mutex_lock(&bdev->mutex);
	if (bdev->reset_in_progress == NULL) {
		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
		/*
		 * Take a channel reference for the target bdev for the life of this
		 *  reset.  This guards against the channel getting destroyed while
		 *  spdk_for_each_channel() calls related to this reset IO are in
		 *  progress.  We will release the reference when this reset is
		 *  completed.
		 */
		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(bdev);
		_spdk_bdev_start_reset(ch);
	}
	pthread_mutex_unlock(&bdev->mutex);
}

static void
_spdk_bdev_complete_reset_channel(void *io_device, struct spdk_io_channel *_ch, void *ctx)
{
	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);

	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
	if (!TAILQ_EMPTY(&ch->queued_resets)) {
		_spdk_bdev_channel_start_reset(ch);
	}
}

int
spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
	bdev_io->u.reset.ch_ref = NULL;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	pthread_mutex_lock(&bdev->mutex);
	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
	pthread_mutex_unlock(&bdev->mutex);

	_spdk_bdev_channel_start_reset(channel);

	return 0;
}

void
spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
		      struct spdk_bdev_io_stat *stat)
{
#ifdef SPDK_CONFIG_VTUNE
	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
	memset(stat, 0, sizeof(*stat));
	return;
#endif

	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	*stat = channel->stat;
	memset(&channel->stat, 0, sizeof(channel->stat));
}

int
spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
			      spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!desc->write) {
		return -EBADF;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
	bdev_io->u.nvme_passthru.cmd = *cmd;
	bdev_io->u.nvme_passthru.buf = buf;
	bdev_io->u.nvme_passthru.nbytes = nbytes;

	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
			   spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	if (!desc->write) {
		/*
		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
		 *  to easily determine if the command is a read or write, but for now just
		 *  do not allow io_passthru with a read-only descriptor.
		 */
		return -EBADF;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
	bdev_io->u.nvme_passthru.cmd = *cmd;
	bdev_io->u.nvme_passthru.buf = buf;
	bdev_io->u.nvme_passthru.nbytes = nbytes;

	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	spdk_bdev_io_submit(bdev_io);
	return 0;
}

int
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io is NULL\n");
		return -1;
	}

	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
		SPDK_ERRLOG("bdev_io is in pending state\n");
		assert(false);
		return -1;
	}

	spdk_bdev_put_io(bdev_io);

	return 0;
}

static void
_spdk_bdev_io_complete(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;

	assert(bdev_io->cb != NULL);
	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
}

void
spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev *bdev = bdev_io->bdev;
	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;

	bdev_io->status = status;

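	/*
	 * Resets are not counted in io_outstanding (see
	 * spdk_bdev_io_submit_reset()), so only non-reset I/O decrement the
	 * counter in the else branch below.
	 */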
	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
		pthread_mutex_lock(&bdev->mutex);
		if (bdev_io == bdev->reset_in_progress) {
			bdev->reset_in_progress = NULL;
		}
		pthread_mutex_unlock(&bdev->mutex);
		if (bdev_io->u.reset.ch_ref != NULL) {
			spdk_put_io_channel(bdev_io->u.reset.ch_ref);
		}
		spdk_for_each_channel(bdev, _spdk_bdev_complete_reset_channel, NULL, NULL);
	} else {
		assert(bdev_ch->io_outstanding > 0);
		bdev_ch->io_outstanding--;
	}

	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		switch (bdev_io->type) {
		case SPDK_BDEV_IO_TYPE_READ:
			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
			bdev_ch->stat.num_read_ops++;
			break;
		case SPDK_BDEV_IO_TYPE_WRITE:
			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
			bdev_ch->stat.num_write_ops++;
			break;
		default:
			break;
		}
	}

#ifdef SPDK_CONFIG_VTUNE
	uint64_t now_tsc = spdk_get_ticks();
	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
		uint64_t data[5];

		data[0] = bdev_ch->stat.num_read_ops;
		data[1] = bdev_ch->stat.bytes_read;
		data[2] = bdev_ch->stat.num_write_ops;
		data[3] = bdev_ch->stat.bytes_written;
		data[4] = bdev->fn_table->get_spin_time ?
			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;

		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
				   __itt_metadata_u64, 5, data);

		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
		bdev_ch->start_tsc = now_tsc;
	}
#endif

	if (bdev_io->in_submit_request || bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
		/*
		 * Defer completion to avoid potential infinite recursion if the
		 * user's completion callback issues a new I/O.
		 */
		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
				     _spdk_bdev_io_complete, bdev_io);
	} else {
		_spdk_bdev_io_complete(bdev_io);
	}
}

void
spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
{
	if (sc == SPDK_SCSI_STATUS_GOOD) {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
	} else {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
		bdev_io->error.scsi.sc = sc;
		bdev_io->error.scsi.sk = sk;
		bdev_io->error.scsi.asc = asc;
		bdev_io->error.scsi.ascq = ascq;
	}

	spdk_bdev_io_complete(bdev_io, bdev_io->status);
}

void
spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
			     int *sc, int *sk, int *asc, int *ascq)
{
	assert(sc != NULL);
	assert(sk != NULL);
	assert(asc != NULL);
	assert(ascq != NULL);

	switch (bdev_io->status) {
	case SPDK_BDEV_IO_STATUS_SUCCESS:
		*sc = SPDK_SCSI_STATUS_GOOD;
		*sk = SPDK_SCSI_SENSE_NO_SENSE;
		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
		break;
	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
		break;
	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
		*sc = bdev_io->error.scsi.sc;
		*sk = bdev_io->error.scsi.sk;
		*asc = bdev_io->error.scsi.asc;
		*ascq = bdev_io->error.scsi.ascq;
		break;
	default:
		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
		break;
	}
}

void
spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
{
	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
	} else {
		bdev_io->error.nvme.sct = sct;
		bdev_io->error.nvme.sc = sc;
		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
	}

	spdk_bdev_io_complete(bdev_io, bdev_io->status);
}

void
spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
{
	assert(sct != NULL);
	assert(sc != NULL);

	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
		*sct = bdev_io->error.nvme.sct;
		*sc = bdev_io->error.nvme.sc;
	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_SUCCESS;
	} else {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
	}
}

static void
_spdk_bdev_register(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module_if *module;

	assert(bdev->module != NULL);

	bdev->status = SPDK_BDEV_STATUS_READY;

	TAILQ_INIT(&bdev->open_descs);
	bdev->bdev_opened = false;

	TAILQ_INIT(&bdev->vbdevs);
	TAILQ_INIT(&bdev->base_bdevs);

	bdev->reset_in_progress = NULL;

	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
				sizeof(struct spdk_bdev_channel));

	pthread_mutex_init(&bdev->mutex, NULL);
	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Inserting bdev %s into list\n", bdev->name);
	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
		if (module->examine) {
			module->action_in_progress++;
			module->examine(bdev);
		}
	}
}

void
spdk_bdev_register(struct spdk_bdev *bdev)
{
	_spdk_bdev_register(bdev);
}

void
spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
{
	int i;

	_spdk_bdev_register(vbdev);
	for (i = 0; i < base_bdev_count; i++) {
		assert(base_bdevs[i] != NULL);
		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
	}
}

void
spdk_bdev_unregister(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc	*desc, *tmp;
	int			rc;
	bool			do_destruct = true;

	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Removing bdev %s from list\n", bdev->name);

	pthread_mutex_lock(&bdev->mutex);

	bdev->status = SPDK_BDEV_STATUS_REMOVING;

	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
		if (desc->remove_cb) {
			pthread_mutex_unlock(&bdev->mutex);
			do_destruct = false;
			desc->remove_cb(desc->remove_ctx);
			pthread_mutex_lock(&bdev->mutex);
		}
	}

	if (!do_destruct) {
		pthread_mutex_unlock(&bdev->mutex);
		return;
	}

	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
	pthread_mutex_unlock(&bdev->mutex);

	pthread_mutex_destroy(&bdev->mutex);

	spdk_io_device_unregister(bdev, NULL);

	rc = bdev->fn_table->destruct(bdev->ctxt);
	if (rc < 0) {
		SPDK_ERRLOG("destruct failed\n");
	}
}

void
spdk_vbdev_unregister(struct spdk_bdev *vbdev)
{
	struct spdk_bdev *base_bdev;

	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
	}
	spdk_bdev_unregister(vbdev);
}

bool
spdk_is_bdev_opened(struct spdk_bdev *bdev)
{
	struct spdk_bdev *base;

	if (bdev->bdev_opened) {
		return true;
	}

	TAILQ_FOREACH(base, &bdev->base_bdevs, base_bdev_link) {
		if (spdk_is_bdev_opened(base)) {
			return true;
		}
	}

	return false;
}

int
spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
	       void *remove_ctx, struct spdk_bdev_desc **_desc)
{
	struct spdk_bdev_desc *desc;

	desc = calloc(1, sizeof(*desc));
	if (desc == NULL) {
		return -ENOMEM;
	}

	pthread_mutex_lock(&bdev->mutex);

	if (write && bdev->claim_module) {
		SPDK_ERRLOG("failed, %s already claimed\n", bdev->name);
		free(desc);
		pthread_mutex_unlock(&bdev->mutex);
		return -EPERM;
	}

	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);

	bdev->bdev_opened = true;

	desc->bdev = bdev;
	desc->remove_cb = remove_cb;
	desc->remove_ctx = remove_ctx;
	desc->write = write;
	*_desc = desc;

	pthread_mutex_unlock(&bdev->mutex);

	return 0;
}
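
/*
 * Illustrative usage (a sketch, not part of this file): how a consumer might
 * open a bdev by name and issue a single-block read.  The bdev name
 * "Nvme0n1", the buffer, and the callback are hypothetical; buf must be
 * num_blocks * blocklen bytes and satisfy spdk_bdev_get_buf_align().
 *
 *	static void
 *	read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Nvme0n1");
 *	struct spdk_bdev_desc *desc;
 *
 *	if (bdev && spdk_bdev_open(bdev, false, NULL, NULL, &desc) == 0) {
 *		struct spdk_io_channel *ch = spdk_bdev_get_io_channel(desc);
 *
 *		spdk_bdev_read_blocks(desc, ch, buf, 0, 1, read_done, NULL);
 *		// ...after the completion callback runs:
 *		spdk_put_io_channel(ch);
 *		spdk_bdev_close(desc);
 *	}
 */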

void
spdk_bdev_close(struct spdk_bdev_desc *desc)
{
	struct spdk_bdev *bdev = desc->bdev;
	bool do_unregister = false;

	pthread_mutex_lock(&bdev->mutex);

	bdev->bdev_opened = false;

	TAILQ_REMOVE(&bdev->open_descs, desc, link);
	free(desc);

	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
		do_unregister = true;
	}
	pthread_mutex_unlock(&bdev->mutex);

	if (do_unregister == true) {
		spdk_bdev_unregister(bdev);
	}
}

int
spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
			    struct spdk_bdev_module_if *module)
{
	if (bdev->claim_module != NULL) {
		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
			    bdev->claim_module->name);
		return -EPERM;
	}

	if (desc && !desc->write) {
		desc->write = true;
	}

	bdev->claim_module = module;
	return 0;
}

void
spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
{
	assert(bdev->claim_module != NULL);
	bdev->claim_module = NULL;
}

struct spdk_bdev *
spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
{
	return desc->bdev;
}

void
spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
{
	struct iovec *iovs;
	int iovcnt;

	if (bdev_io == NULL) {
		return;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		iovs = bdev_io->u.bdev.iovs;
		iovcnt = bdev_io->u.bdev.iovcnt;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		iovs = bdev_io->u.bdev.iovs;
		iovcnt = bdev_io->u.bdev.iovcnt;
		break;
	default:
		iovs = NULL;
		iovcnt = 0;
		break;
	}

	if (iovp) {
		*iovp = iovs;
	}
	if (iovcntp) {
		*iovcntp = iovcnt;
	}
}

void
spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
{
	/*
	 * Modules with examine callbacks must be initialized first, so they are
	 *  ready to handle examine callbacks from later modules that will
	 *  register physical bdevs.
	 */
	if (bdev_module->examine != NULL) {
		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
	} else {
		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
	}
}

void
spdk_bdev_part_base_free(struct spdk_bdev_part_base *base)
{
	assert(base->bdev);
	assert(base->desc);
	spdk_bdev_close(base->desc);
	free(base);
}

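/*
 * Each part holds a reference on its base (taken in
 * spdk_bdev_part_construct()); freeing the last part releases the claim on
 * the base bdev and frees the base itself.
 */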
void
spdk_bdev_part_free(struct spdk_bdev_part *part)
{
	struct spdk_bdev_part_base *base;

	assert(part);
	assert(part->base);

	base = part->base;
	spdk_io_device_unregister(&part->base, NULL);
	TAILQ_REMOVE(base->tailq, part, tailq);
	free(part->bdev.name);
	free(part);

	if (__sync_sub_and_fetch(&base->ref, 1) == 0) {
		spdk_bdev_module_release_bdev(base->bdev);
		spdk_bdev_part_base_free(base);
	}
}

void
spdk_bdev_part_tailq_fini(struct bdev_part_tailq *tailq)
{
	struct spdk_bdev_part *part, *tmp;

	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
		spdk_bdev_part_free(part);
	}
}

void
spdk_bdev_part_base_hotremove(struct spdk_bdev *base_bdev, struct bdev_part_tailq *tailq)
{
	struct spdk_bdev_part *part, *tmp;

	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
		if (part->base->bdev == base_bdev) {
			spdk_bdev_unregister(&part->bdev);
		}
	}
}

static bool
spdk_bdev_part_io_type_supported(void *_part, enum spdk_bdev_io_type io_type)
{
	struct spdk_bdev_part *part = _part;

	return part->base->bdev->fn_table->io_type_supported(part->base->bdev, io_type);
}

static struct spdk_io_channel *
spdk_bdev_part_get_io_channel(void *_part)
{
	struct spdk_bdev_part *part = _part;

	return spdk_get_io_channel(&part->base);
}

static void
spdk_bdev_part_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *part_io = cb_arg;
	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;

	spdk_bdev_io_complete(part_io, status);
	spdk_bdev_free_io(bdev_io);
}

void
spdk_bdev_part_submit_request(struct spdk_bdev_part_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_part *part = ch->part;
	struct spdk_io_channel *base_ch = ch->base_ch;
	struct spdk_bdev_desc *base_desc = part->base->desc;
	uint64_t offset;
	int rc = 0;

	/* Modify the I/O to adjust for the offset within the base bdev. */
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_readv_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
					    bdev_io->u.bdev.iovcnt, offset,
					    bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
					    bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_writev_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
					     bdev_io->u.bdev.iovcnt, offset,
					     bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
					     bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_write_zeroes_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
						   spdk_bdev_part_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_unmap_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
					    spdk_bdev_part_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_flush_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
					    spdk_bdev_part_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_RESET:
		rc = spdk_bdev_reset(base_desc, base_ch,
				     spdk_bdev_part_complete_io, bdev_io);
		break;
	default:
		SPDK_ERRLOG("split: unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	if (rc != 0) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static int
spdk_bdev_part_channel_create_cb(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
	struct spdk_bdev_part_channel *ch = ctx_buf;

	ch->part = part;
	ch->base_ch = spdk_bdev_get_io_channel(part->base->desc);
	if (ch->base_ch == NULL) {
		return -1;
	}

	if (part->base->ch_create_cb) {
		return part->base->ch_create_cb(io_device, ctx_buf);
	} else {
		return 0;
	}
}

static void
spdk_bdev_part_channel_destroy_cb(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
	struct spdk_bdev_part_channel *ch = ctx_buf;

	if (part->base->ch_destroy_cb) {
		part->base->ch_destroy_cb(io_device, ctx_buf);
	}
	spdk_put_io_channel(ch->base_ch);
}

int
spdk_bdev_part_base_construct(struct spdk_bdev_part_base *base, struct spdk_bdev *bdev,
			      spdk_bdev_remove_cb_t remove_cb, struct spdk_bdev_module_if *module,
			      struct spdk_bdev_fn_table *fn_table, struct bdev_part_tailq *tailq,
			      uint32_t channel_size, spdk_io_channel_create_cb ch_create_cb,
			      spdk_io_channel_destroy_cb ch_destroy_cb)
{
	int rc;

	fn_table->get_io_channel = spdk_bdev_part_get_io_channel;
	fn_table->io_type_supported = spdk_bdev_part_io_type_supported;

	base->bdev = bdev;
	base->ref = 0;
	base->module = module;
	base->fn_table = fn_table;
	base->tailq = tailq;
	base->claimed = false;
	base->channel_size = channel_size;
	base->ch_create_cb = ch_create_cb;
	base->ch_destroy_cb = ch_destroy_cb;

	rc = spdk_bdev_open(bdev, false, remove_cb, bdev, &base->desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		return -1;
	}

	return 0;
}

int
spdk_bdev_part_construct(struct spdk_bdev_part *part, struct spdk_bdev_part_base *base,
			 char *name, uint64_t offset_blocks, uint64_t num_blocks,
			 char *product_name)
{
	part->bdev.name = name;
	part->bdev.blocklen = base->bdev->blocklen;
	part->bdev.blockcnt = num_blocks;
	part->offset_blocks = offset_blocks;

	part->bdev.write_cache = base->bdev->write_cache;
	part->bdev.need_aligned_buffer = base->bdev->need_aligned_buffer;
	part->bdev.product_name = product_name;
	part->bdev.ctxt = part;
	part->bdev.module = base->module;
	part->bdev.fn_table = base->fn_table;

	__sync_fetch_and_add(&base->ref, 1);
	part->base = base;

	if (!base->claimed) {
		int rc;

		rc = spdk_bdev_module_claim_bdev(base->bdev, base->desc, base->module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(base->bdev));
			free(part->bdev.name);
			return -1;
		}
		base->claimed = true;
	}

	spdk_io_device_register(&part->base, spdk_bdev_part_channel_create_cb,
				spdk_bdev_part_channel_destroy_cb,
				base->channel_size);
	spdk_vbdev_register(&part->bdev, &base->bdev, 1);
	TAILQ_INSERT_TAIL(base->tailq, part, tailq);

	return 0;
}

SPDK_LOG_REGISTER_TRACE_FLAG("bdev", SPDK_TRACE_BDEV)