xref: /spdk/lib/bdev/bdev.c (revision 94bc8cfdbaa79976292d40c0d74228007190581f)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/likely.h"
42 #include "spdk/queue.h"
43 #include "spdk/nvme_spec.h"
44 #include "spdk/scsi_spec.h"
45 #include "spdk/util.h"
46 
47 #include "spdk_internal/bdev.h"
48 #include "spdk_internal/log.h"
49 #include "spdk/string.h"
50 
51 #ifdef SPDK_CONFIG_VTUNE
52 #include "ittnotify.h"
53 #include "ittnotify_types.h"
54 int __itt_init_ittlib(const char *, __itt_group_id);
55 #endif
56 
57 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
58 #define BUF_SMALL_POOL_SIZE	8192
59 #define BUF_LARGE_POOL_SIZE	1024
60 #define NOMEM_THRESHOLD_COUNT	8
61 
62 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
63 
64 struct spdk_bdev_mgr {
65 	struct spdk_mempool *bdev_io_pool;
66 
67 	struct spdk_mempool *buf_small_pool;
68 	struct spdk_mempool *buf_large_pool;
69 
70 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
71 
72 	TAILQ_HEAD(, spdk_bdev) bdevs;
73 
74 	spdk_bdev_poller_start_cb start_poller_fn;
75 	spdk_bdev_poller_stop_cb stop_poller_fn;
76 
77 	bool init_complete;
78 	bool module_init_complete;
79 
80 #ifdef SPDK_CONFIG_VTUNE
81 	__itt_domain	*domain;
82 #endif
83 };
84 
85 static struct spdk_bdev_mgr g_bdev_mgr = {
86 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
87 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
88 	.start_poller_fn = NULL,
89 	.stop_poller_fn = NULL,
90 	.init_complete = false,
91 	.module_init_complete = false,
92 };
93 
94 static spdk_bdev_init_cb	g_cb_fn = NULL;
95 static void			*g_cb_arg = NULL;
96 
97 
98 struct spdk_bdev_mgmt_channel {
99 	bdev_io_tailq_t need_buf_small;
100 	bdev_io_tailq_t need_buf_large;
101 };
102 
103 struct spdk_bdev_desc {
104 	struct spdk_bdev		*bdev;
105 	spdk_bdev_remove_cb_t		remove_cb;
106 	void				*remove_ctx;
107 	bool				write;
108 	TAILQ_ENTRY(spdk_bdev_desc)	link;
109 };
110 
111 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
112 
113 struct spdk_bdev_channel {
114 	struct spdk_bdev	*bdev;
115 
116 	/* The channel for the underlying device */
117 	struct spdk_io_channel	*channel;
118 
119 	/* Channel for the bdev manager */
120 	struct spdk_io_channel *mgmt_channel;
121 
122 	struct spdk_bdev_io_stat stat;
123 
124 	/*
125 	 * Count of I/O submitted to bdev module and waiting for completion.
126 	 * Incremented before submit_request() is called on an spdk_bdev_io.
127 	 */
128 	uint64_t		io_outstanding;
129 
130 	bdev_io_tailq_t		queued_resets;
131 
132 	/*
133 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
134 	 *  on this channel.
135 	 */
136 	bdev_io_tailq_t		nomem_io;
137 
138 	/*
139 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
140 	 */
141 	uint64_t		nomem_threshold;
142 
143 	uint32_t		flags;
144 
145 #ifdef SPDK_CONFIG_VTUNE
146 	uint64_t		start_tsc;
147 	uint64_t		interval_tsc;
148 	__itt_string_handle	*handle;
149 #endif
150 
151 };
152 
153 struct spdk_bdev *
154 spdk_bdev_first(void)
155 {
156 	struct spdk_bdev *bdev;
157 
158 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
159 	if (bdev) {
160 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
161 	}
162 
163 	return bdev;
164 }
165 
166 struct spdk_bdev *
167 spdk_bdev_next(struct spdk_bdev *prev)
168 {
169 	struct spdk_bdev *bdev;
170 
171 	bdev = TAILQ_NEXT(prev, link);
172 	if (bdev) {
173 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
174 	}
175 
176 	return bdev;
177 }
178 
179 static struct spdk_bdev *
180 _bdev_next_leaf(struct spdk_bdev *bdev)
181 {
182 	while (bdev != NULL) {
183 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
184 			return bdev;
185 		} else {
186 			bdev = TAILQ_NEXT(bdev, link);
187 		}
188 	}
189 
190 	return bdev;
191 }
192 
193 struct spdk_bdev *
194 spdk_bdev_first_leaf(void)
195 {
196 	struct spdk_bdev *bdev;
197 
198 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
199 
200 	if (bdev) {
201 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
202 	}
203 
204 	return bdev;
205 }
206 
207 struct spdk_bdev *
208 spdk_bdev_next_leaf(struct spdk_bdev *prev)
209 {
210 	struct spdk_bdev *bdev;
211 
212 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
213 
214 	if (bdev) {
215 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
216 	}
217 
218 	return bdev;
219 }
220 
221 struct spdk_bdev *
222 spdk_bdev_get_by_name(const char *bdev_name)
223 {
224 	struct spdk_bdev *bdev = spdk_bdev_first();
225 
226 	while (bdev != NULL) {
227 		if (strcmp(bdev_name, bdev->name) == 0) {
228 			return bdev;
229 		}
230 		bdev = spdk_bdev_next(bdev);
231 	}
232 
233 	return NULL;
234 }
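
/*
 * Illustrative usage sketch (not part of this file): callers typically
 * resolve a name to a bdev before opening it.  "Malloc0" is a
 * hypothetical bdev name.
 *
 *	struct spdk_bdev *bdev;
 *
 *	bdev = spdk_bdev_get_by_name("Malloc0");
 *	if (bdev == NULL) {
 *		SPDK_ERRLOG("bdev Malloc0 does not exist\n");
 *		return -ENODEV;
 *	}
 */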
235 
236 static void
237 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
238 {
239 	assert(bdev_io->get_buf_cb != NULL);
240 	assert(buf != NULL);
241 	assert(bdev_io->u.bdev.iovs != NULL);
242 
243 	bdev_io->buf = buf;
244 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
245 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
246 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
247 }
248 
249 static void
250 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
251 {
252 	struct spdk_mempool *pool;
253 	struct spdk_bdev_io *tmp;
254 	void *buf;
255 	bdev_io_tailq_t *tailq;
256 	struct spdk_bdev_mgmt_channel *ch;
257 
258 	assert(bdev_io->u.bdev.iovcnt == 1);
259 
260 	buf = bdev_io->buf;
261 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
262 
263 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
264 		pool = g_bdev_mgr.buf_small_pool;
265 		tailq = &ch->need_buf_small;
266 	} else {
267 		pool = g_bdev_mgr.buf_large_pool;
268 		tailq = &ch->need_buf_large;
269 	}
270 
271 	if (TAILQ_EMPTY(tailq)) {
272 		spdk_mempool_put(pool, buf);
273 	} else {
274 		tmp = TAILQ_FIRST(tailq);
275 		TAILQ_REMOVE(tailq, tmp, buf_link);
276 		spdk_bdev_io_set_buf(tmp, buf);
277 	}
278 }
279 
280 void
281 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
282 {
283 	struct spdk_mempool *pool;
284 	bdev_io_tailq_t *tailq;
285 	void *buf = NULL;
286 	struct spdk_bdev_mgmt_channel *ch;
287 
288 	assert(cb != NULL);
289 	assert(bdev_io->u.bdev.iovs != NULL);
290 
291 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
292 		/* Buffer already present */
293 		cb(bdev_io->ch->channel, bdev_io);
294 		return;
295 	}
296 
297 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
298 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
299 
300 	bdev_io->buf_len = len;
301 	bdev_io->get_buf_cb = cb;
302 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
303 		pool = g_bdev_mgr.buf_small_pool;
304 		tailq = &ch->need_buf_small;
305 	} else {
306 		pool = g_bdev_mgr.buf_large_pool;
307 		tailq = &ch->need_buf_large;
308 	}
309 
310 	buf = spdk_mempool_get(pool);
311 
312 	if (!buf) {
313 		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
314 	} else {
315 		spdk_bdev_io_set_buf(bdev_io, buf);
316 	}
317 }
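
/*
 * Illustrative sketch of a caller (assumed code, not part of this file):
 * a bdev module that needs a data buffer for a READ defers the real
 * submission into the get_buf callback.  The callback may run immediately
 * (buffer available) or later, when another I/O returns a buffer to an
 * exhausted pool.  my_read_get_buf_cb/my_submit_read are hypothetical.
 *
 *	static void
 *	my_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		// u.bdev.iovs[0] now points at a valid, aligned buffer
 *		my_submit_read(ch, bdev_io);
 *	}
 *
 *	// in the READ path, when no buffer was supplied by the caller:
 *	spdk_bdev_io_get_buf(bdev_io, my_read_get_buf_cb,
 *			     bdev_io->u.bdev.num_blocks * bdev->blocklen);
 */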
318 
319 static int
320 spdk_bdev_module_get_max_ctx_size(void)
321 {
322 	struct spdk_bdev_module_if *bdev_module;
323 	int max_bdev_module_size = 0;
324 
325 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
326 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
327 			max_bdev_module_size = bdev_module->get_ctx_size();
328 		}
329 	}
330 
331 	return max_bdev_module_size;
332 }
333 
334 void
335 spdk_bdev_config_text(FILE *fp)
336 {
337 	struct spdk_bdev_module_if *bdev_module;
338 
339 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
340 		if (bdev_module->config_text) {
341 			bdev_module->config_text(fp);
342 		}
343 	}
344 }
345 
346 static int
347 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
348 {
349 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
350 
351 	TAILQ_INIT(&ch->need_buf_small);
352 	TAILQ_INIT(&ch->need_buf_large);
353 
354 	return 0;
355 }
356 
357 static void
358 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
359 {
360 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
361 
362 	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
363 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
364 	}
365 }
366 
367 static void
368 spdk_bdev_init_complete(int rc)
369 {
370 	spdk_bdev_init_cb cb_fn = g_cb_fn;
371 	void *cb_arg = g_cb_arg;
372 
373 	g_bdev_mgr.init_complete = true;
374 	g_cb_fn = NULL;
375 	g_cb_arg = NULL;
376 
377 	cb_fn(cb_arg, rc);
378 }
379 
380 static void
381 spdk_bdev_module_action_complete(void)
382 {
383 	struct spdk_bdev_module_if *m;
384 
385 	/*
386 	 * Don't finish bdev subsystem initialization if
387 	 * module pre-initialization is still in progress, or
388 	 * the subsystem has already been initialized.
389 	 */
390 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
391 		return;
392 	}
393 
394 	/*
395 	 * Check all bdev modules for inits/examinations in progress. If any
396 	 * exist, return immediately since we cannot finish bdev subsystem
397 	 * initialization until all are completed.
398 	 */
399 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
400 		if (m->action_in_progress > 0) {
401 			return;
402 		}
403 	}
404 
405 	/*
406 	 * Modules already finished initialization - now that all
407 	 * the bdev modules have finished their asynchronous I/O
408 	 * processing, the entire bdev layer can be marked as complete.
409 	 */
410 	spdk_bdev_init_complete(0);
411 }
412 
413 static void
414 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
415 {
416 	assert(module->action_in_progress > 0);
417 	module->action_in_progress--;
418 	spdk_bdev_module_action_complete();
419 }
420 
421 void
422 spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
423 {
424 	spdk_bdev_module_action_done(module);
425 }
426 
427 void
428 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
429 {
430 	spdk_bdev_module_action_done(module);
431 }
432 
433 static int
434 spdk_bdev_modules_init(void)
435 {
436 	struct spdk_bdev_module_if *module;
437 	int rc = 0;
438 
439 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
440 		rc = module->module_init();
441 		if (rc != 0) {
442 			break;
443 		}
444 	}
445 
446 	g_bdev_mgr.module_init_complete = true;
447 	return rc;
448 }
449 
450 void
451 spdk_bdev_poller_start(struct spdk_bdev_poller **ppoller,
452 		       spdk_bdev_poller_fn fn,
453 		       void *arg,
454 		       uint32_t lcore,
455 		       uint64_t period_microseconds)
456 {
457 	g_bdev_mgr.start_poller_fn(ppoller, fn, arg, lcore, period_microseconds);
458 }
459 
460 void
461 spdk_bdev_poller_stop(struct spdk_bdev_poller **ppoller)
462 {
463 	g_bdev_mgr.stop_poller_fn(ppoller);
464 }
465 
466 void
467 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg,
468 		     spdk_bdev_poller_start_cb start_poller_fn,
469 		     spdk_bdev_poller_stop_cb stop_poller_fn)
470 {
471 	int cache_size;
472 	int rc = 0;
473 	char mempool_name[32];
474 
475 	assert(cb_fn != NULL);
476 
477 	g_cb_fn = cb_fn;
478 	g_cb_arg = cb_arg;
479 
480 	g_bdev_mgr.start_poller_fn = start_poller_fn;
481 	g_bdev_mgr.stop_poller_fn = stop_poller_fn;
482 
483 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
484 
485 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
486 				  SPDK_BDEV_IO_POOL_SIZE,
487 				  sizeof(struct spdk_bdev_io) +
488 				  spdk_bdev_module_get_max_ctx_size(),
489 				  64,
490 				  SPDK_ENV_SOCKET_ID_ANY);
491 
492 	if (g_bdev_mgr.bdev_io_pool == NULL) {
493 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
494 		spdk_bdev_init_complete(-1);
495 		return;
496 	}
497 
498 	/**
499 	 * Ensure no more than half of the total buffers end up in local caches, by
500 	 *   using spdk_env_get_core_count() to determine how many local caches we need
501 	 *   to account for.
502 	 */
503 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
504 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
505 
506 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
507 				    BUF_SMALL_POOL_SIZE,
508 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
509 				    cache_size,
510 				    SPDK_ENV_SOCKET_ID_ANY);
511 	if (!g_bdev_mgr.buf_small_pool) {
512 		SPDK_ERRLOG("create buf small pool failed\n");
513 		spdk_bdev_init_complete(-1);
514 		return;
515 	}
516 
517 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
518 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
519 
520 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
521 				    BUF_LARGE_POOL_SIZE,
522 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
523 				    cache_size,
524 				    SPDK_ENV_SOCKET_ID_ANY);
525 	if (!g_bdev_mgr.buf_large_pool) {
526 		SPDK_ERRLOG("create buf large pool failed\n");
527 		spdk_bdev_init_complete(-1);
528 		return;
529 	}
530 
531 #ifdef SPDK_CONFIG_VTUNE
532 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
533 #endif
534 
535 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
536 				spdk_bdev_mgmt_channel_destroy,
537 				sizeof(struct spdk_bdev_mgmt_channel));
538 
539 	rc = spdk_bdev_modules_init();
540 	if (rc != 0) {
541 		SPDK_ERRLOG("bdev modules init failed\n");
542 		spdk_bdev_init_complete(-1);
543 		return;
544 	}
545 
546 	spdk_bdev_module_action_complete();
547 }
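
/*
 * Illustrative sketch (assumed application glue, not part of this file):
 * the event framework hands in its poller hooks plus a completion
 * callback; initialization finishes asynchronously once every module's
 * init and examine work is done.  The my_* names are hypothetical.
 *
 *	static void
 *	my_init_done(void *cb_arg, int rc)
 *	{
 *		if (rc != 0) {
 *			SPDK_ERRLOG("bdev subsystem failed to initialize\n");
 *		}
 *	}
 *
 *	spdk_bdev_initialize(my_init_done, NULL, my_start_poller, my_stop_poller);
 */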
548 
549 int
550 spdk_bdev_finish(void)
551 {
552 	struct spdk_bdev_module_if *bdev_module;
553 
554 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
555 		if (bdev_module->module_fini) {
556 			bdev_module->module_fini();
557 		}
558 	}
559 
560 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
561 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
562 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
563 			    SPDK_BDEV_IO_POOL_SIZE);
564 	}
565 
566 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
567 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
568 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
569 			    BUF_SMALL_POOL_SIZE);
570 		assert(false);
571 	}
572 
573 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
574 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
575 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
576 			    BUF_LARGE_POOL_SIZE);
577 		assert(false);
578 	}
579 
580 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
581 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
582 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
583 
584 	spdk_io_device_unregister(&g_bdev_mgr, NULL);
585 
586 	return 0;
587 }
588 
589 struct spdk_bdev_io *
590 spdk_bdev_get_io(void)
591 {
592 	struct spdk_bdev_io *bdev_io;
593 
594 	bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
595 	if (!bdev_io) {
596 		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
597 		abort();
598 	}
599 
600 	memset(bdev_io, 0, offsetof(struct spdk_bdev_io, u));
601 
602 	return bdev_io;
603 }
604 
605 static void
606 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
607 {
608 	if (bdev_io->buf != NULL) {
609 		spdk_bdev_io_put_buf(bdev_io);
610 	}
611 
612 	spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
613 }
614 
615 static void
616 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
617 {
618 	struct spdk_bdev *bdev = bdev_io->bdev;
619 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
620 	struct spdk_io_channel *ch = bdev_ch->channel;
621 
622 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
623 
624 	bdev_ch->io_outstanding++;
625 	bdev_io->in_submit_request = true;
626 	if (spdk_likely(bdev_ch->flags == 0)) {
627 		if (spdk_likely(TAILQ_EMPTY(&bdev_ch->nomem_io))) {
628 			bdev->fn_table->submit_request(ch, bdev_io);
629 		} else {
630 			bdev_ch->io_outstanding--;
631 			TAILQ_INSERT_TAIL(&bdev_ch->nomem_io, bdev_io, link);
632 		}
633 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
634 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
635 	} else {
636 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
637 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
638 	}
639 	bdev_io->in_submit_request = false;
640 }
641 
642 static void
643 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
644 {
645 	struct spdk_bdev *bdev = bdev_io->bdev;
646 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
647 	struct spdk_io_channel *ch = bdev_ch->channel;
648 
649 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
650 
651 	bdev_io->in_submit_request = true;
652 	bdev->fn_table->submit_request(ch, bdev_io);
653 	bdev_io->in_submit_request = false;
654 }
655 
656 static void
657 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
658 		  struct spdk_bdev *bdev, void *cb_arg,
659 		  spdk_bdev_io_completion_cb cb)
660 {
661 	bdev_io->bdev = bdev;
662 	bdev_io->caller_ctx = cb_arg;
663 	bdev_io->cb = cb;
664 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
665 	bdev_io->in_submit_request = false;
666 }
667 
668 bool
669 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
670 {
671 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
672 }
673 
674 int
675 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
676 {
677 	if (bdev->fn_table->dump_config_json) {
678 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
679 	}
680 
681 	return 0;
682 }
683 
684 static int
685 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
686 {
687 	struct spdk_bdev		*bdev = io_device;
688 	struct spdk_bdev_channel	*ch = ctx_buf;
689 
690 	ch->bdev = io_device;
691 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
692 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
693 	memset(&ch->stat, 0, sizeof(ch->stat));
694 	ch->io_outstanding = 0;
695 	TAILQ_INIT(&ch->queued_resets);
696 	TAILQ_INIT(&ch->nomem_io);
697 	ch->nomem_threshold = 0;
698 	ch->flags = 0;
699 
700 #ifdef SPDK_CONFIG_VTUNE
701 	{
702 		char *name;
703 		__itt_init_ittlib(NULL, 0);
704 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
705 		if (!name) {
706 			return -1;
707 		}
708 		ch->handle = __itt_string_handle_create(name);
709 		free(name);
710 		ch->start_tsc = spdk_get_ticks();
711 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
712 	}
713 #endif
714 
715 	return 0;
716 }
717 
718 /*
719  * Abort I/O that are waiting on a data buffer.  These types of I/O are
720  *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
721  */
722 static void
723 _spdk_bdev_abort_buf_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
724 {
725 	struct spdk_bdev_io *bdev_io, *tmp;
726 
727 	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
728 		if (bdev_io->ch == ch) {
729 			TAILQ_REMOVE(queue, bdev_io, buf_link);
730 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
731 		}
732 	}
733 }
734 
735 /*
736  * Abort I/O that are queued waiting for submission.  These types of I/O are
737  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
738  */
739 static void
740 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
741 {
742 	struct spdk_bdev_io *bdev_io, *tmp;
743 
744 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
745 		if (bdev_io->ch == ch) {
746 			TAILQ_REMOVE(queue, bdev_io, link);
747 			/*
748 			 * spdk_bdev_io_complete() assumes that the completed I/O had
749 			 *  been submitted to the bdev module.  Since in this case it
750 			 *  hadn't, bump io_outstanding to account for the decrement
751 			 *  that spdk_bdev_io_complete() will do.
752 			 */
753 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
754 				ch->io_outstanding++;
755 			}
756 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
757 		}
758 	}
759 }
760 
761 static void
762 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
763 {
764 	struct spdk_bdev_channel	*ch = ctx_buf;
765 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
766 
767 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
768 
769 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
770 	_spdk_bdev_abort_queued_io(&ch->nomem_io, ch);
771 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
772 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
773 
774 	spdk_put_io_channel(ch->channel);
775 	spdk_put_io_channel(ch->mgmt_channel);
776 	assert(ch->io_outstanding == 0);
777 }
778 
779 struct spdk_io_channel *
780 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
781 {
782 	return spdk_get_io_channel(desc->bdev);
783 }
784 
785 const char *
786 spdk_bdev_get_name(const struct spdk_bdev *bdev)
787 {
788 	return bdev->name;
789 }
790 
791 const char *
792 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
793 {
794 	return bdev->product_name;
795 }
796 
797 uint32_t
798 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
799 {
800 	return bdev->blocklen;
801 }
802 
803 uint64_t
804 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
805 {
806 	return bdev->blockcnt;
807 }
808 
809 size_t
810 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
811 {
812 	/* TODO: push this logic down to the bdev modules */
813 	if (bdev->need_aligned_buffer) {
814 		return bdev->blocklen;
815 	}
816 
817 	return 1;
818 }
819 
820 uint32_t
821 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
822 {
823 	return bdev->optimal_io_boundary;
824 }
825 
826 bool
827 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
828 {
829 	return bdev->write_cache;
830 }
831 
832 /*
833  * Convert I/O offset and length from bytes to blocks.
834  *
835  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
836  */
837 static uint64_t
838 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
839 			  uint64_t num_bytes, uint64_t *num_blocks)
840 {
841 	uint32_t block_size = bdev->blocklen;
842 
843 	*offset_blocks = offset_bytes / block_size;
844 	*num_blocks = num_bytes / block_size;
845 
846 	return (offset_bytes % block_size) | (num_bytes % block_size);
847 }
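
/*
 * Worked example for the divisibility check above: with a 512-byte block
 * size, offset_bytes = 4096 and num_bytes = 1024 yield offset_blocks = 8,
 * num_blocks = 2, and (4096 % 512) | (1024 % 512) == 0 (valid).  With
 * num_bytes = 1000 instead, (1000 % 512) != 0, so the OR is non-zero and
 * the caller returns -EINVAL.
 */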
848 
849 static bool
850 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
851 {
852 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
853 	 * an overflow has occurred and the offset has wrapped around. */
854 	if (offset_blocks + num_blocks < offset_blocks) {
855 		return false;
856 	}
857 
858 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
859 	if (offset_blocks + num_blocks > bdev->blockcnt) {
860 		return false;
861 	}
862 
863 	return true;
864 }
865 
866 int
867 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
868 	       void *buf, uint64_t offset, uint64_t nbytes,
869 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
870 {
871 	uint64_t offset_blocks, num_blocks;
872 
873 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
874 		return -EINVAL;
875 	}
876 
877 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
878 }
879 
880 int
881 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
882 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
883 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
884 {
885 	struct spdk_bdev *bdev = desc->bdev;
886 	struct spdk_bdev_io *bdev_io;
887 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
888 
889 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
890 		return -EINVAL;
891 	}
892 
893 	bdev_io = spdk_bdev_get_io();
894 	if (!bdev_io) {
895 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
896 		return -ENOMEM;
897 	}
898 
899 	bdev_io->ch = channel;
900 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
901 	bdev_io->u.bdev.iov.iov_base = buf;
902 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
903 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
904 	bdev_io->u.bdev.iovcnt = 1;
905 	bdev_io->u.bdev.num_blocks = num_blocks;
906 	bdev_io->u.bdev.offset_blocks = offset_blocks;
907 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
908 
909 	spdk_bdev_io_submit(bdev_io);
910 	return 0;
911 }
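
/*
 * Illustrative usage sketch (not part of this file): read the first block
 * into a pre-allocated buffer that satisfies spdk_bdev_get_buf_align().
 * my_read_done is a hypothetical completion callback; it must eventually
 * release the I/O with spdk_bdev_free_io().
 *
 *	static void
 *	my_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	rc = spdk_bdev_read_blocks(desc, ch, buf, 0, 1, my_read_done, NULL);
 *	if (rc != 0) {
 *		// -EINVAL or -ENOMEM; no I/O was submitted
 *	}
 */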
912 
913 int
914 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
915 		struct iovec *iov, int iovcnt,
916 		uint64_t offset, uint64_t nbytes,
917 		spdk_bdev_io_completion_cb cb, void *cb_arg)
918 {
919 	uint64_t offset_blocks, num_blocks;
920 
921 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
922 		return -EINVAL;
923 	}
924 
925 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
926 }
927 
928 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
929 			   struct iovec *iov, int iovcnt,
930 			   uint64_t offset_blocks, uint64_t num_blocks,
931 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
932 {
933 	struct spdk_bdev *bdev = desc->bdev;
934 	struct spdk_bdev_io *bdev_io;
935 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
936 
937 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
938 		return -EINVAL;
939 	}
940 
941 	bdev_io = spdk_bdev_get_io();
942 	if (!bdev_io) {
943 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
944 		return -ENOMEM;
945 	}
946 
947 	bdev_io->ch = channel;
948 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
949 	bdev_io->u.bdev.iovs = iov;
950 	bdev_io->u.bdev.iovcnt = iovcnt;
951 	bdev_io->u.bdev.num_blocks = num_blocks;
952 	bdev_io->u.bdev.offset_blocks = offset_blocks;
953 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
954 
955 	spdk_bdev_io_submit(bdev_io);
956 	return 0;
957 }
958 
959 int
960 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
961 		void *buf, uint64_t offset, uint64_t nbytes,
962 		spdk_bdev_io_completion_cb cb, void *cb_arg)
963 {
964 	uint64_t offset_blocks, num_blocks;
965 
966 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
967 		return -EINVAL;
968 	}
969 
970 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
971 }
972 
973 int
974 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
975 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
976 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
977 {
978 	struct spdk_bdev *bdev = desc->bdev;
979 	struct spdk_bdev_io *bdev_io;
980 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
981 
982 	if (!desc->write) {
983 		return -EBADF;
984 	}
985 
986 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
987 		return -EINVAL;
988 	}
989 
990 	bdev_io = spdk_bdev_get_io();
991 	if (!bdev_io) {
992 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
993 		return -ENOMEM;
994 	}
995 
996 	bdev_io->ch = channel;
997 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
998 	bdev_io->u.bdev.iov.iov_base = buf;
999 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1000 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1001 	bdev_io->u.bdev.iovcnt = 1;
1002 	bdev_io->u.bdev.num_blocks = num_blocks;
1003 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1004 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1005 
1006 	spdk_bdev_io_submit(bdev_io);
1007 	return 0;
1008 }
1009 
1010 int
1011 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1012 		 struct iovec *iov, int iovcnt,
1013 		 uint64_t offset, uint64_t len,
1014 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1015 {
1016 	uint64_t offset_blocks, num_blocks;
1017 
1018 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1019 		return -EINVAL;
1020 	}
1021 
1022 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1023 }
1024 
1025 int
1026 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1027 			struct iovec *iov, int iovcnt,
1028 			uint64_t offset_blocks, uint64_t num_blocks,
1029 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1030 {
1031 	struct spdk_bdev *bdev = desc->bdev;
1032 	struct spdk_bdev_io *bdev_io;
1033 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1034 
1035 	if (!desc->write) {
1036 		return -EBADF;
1037 	}
1038 
1039 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1040 		return -EINVAL;
1041 	}
1042 
1043 	bdev_io = spdk_bdev_get_io();
1044 	if (!bdev_io) {
1045 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1046 		return -ENOMEM;
1047 	}
1048 
1049 	bdev_io->ch = channel;
1050 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1051 	bdev_io->u.bdev.iovs = iov;
1052 	bdev_io->u.bdev.iovcnt = iovcnt;
1053 	bdev_io->u.bdev.num_blocks = num_blocks;
1054 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1055 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1056 
1057 	spdk_bdev_io_submit(bdev_io);
1058 	return 0;
1059 }
1060 
1061 int
1062 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1063 		       uint64_t offset, uint64_t len,
1064 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1065 {
1066 	uint64_t offset_blocks, num_blocks;
1067 
1068 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1069 		return -EINVAL;
1070 	}
1071 
1072 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1073 }
1074 
1075 int
1076 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1077 			      uint64_t offset_blocks, uint64_t num_blocks,
1078 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1079 {
1080 	struct spdk_bdev *bdev = desc->bdev;
1081 	struct spdk_bdev_io *bdev_io;
1082 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1083 
1084 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1085 		return -EINVAL;
1086 	}
1087 
1088 	bdev_io = spdk_bdev_get_io();
1089 	if (!bdev_io) {
1090 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1091 		return -ENOMEM;
1092 	}
1093 
1094 	bdev_io->ch = channel;
1095 	bdev_io->u.bdev.iovs = NULL;
1096 	bdev_io->u.bdev.iovcnt = 0;
1097 	bdev_io->u.bdev.num_blocks = num_blocks;
1098 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1099 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1100 
1101 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1102 
1103 	spdk_bdev_io_submit(bdev_io);
1104 	return 0;
1105 }
1106 
1107 int
1108 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1109 		uint64_t offset, uint64_t nbytes,
1110 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1111 {
1112 	uint64_t offset_blocks, num_blocks;
1113 
1114 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1115 		return -EINVAL;
1116 	}
1117 
1118 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1119 }
1120 
1121 int
1122 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1123 		       uint64_t offset_blocks, uint64_t num_blocks,
1124 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1125 {
1126 	struct spdk_bdev *bdev = desc->bdev;
1127 	struct spdk_bdev_io *bdev_io;
1128 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1129 
1130 	if (!desc->write) {
1131 		return -EBADF;
1132 	}
1133 
1134 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1135 		return -EINVAL;
1136 	}
1137 
1138 	if (num_blocks == 0) {
1139 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1140 		return -EINVAL;
1141 	}
1142 
1143 	bdev_io = spdk_bdev_get_io();
1144 	if (!bdev_io) {
1145 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1146 		return -ENOMEM;
1147 	}
1148 
1149 	bdev_io->ch = channel;
1150 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1151 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1152 	bdev_io->u.bdev.iovcnt = 1;
1153 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1154 	bdev_io->u.bdev.num_blocks = num_blocks;
1155 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1156 
1157 	spdk_bdev_io_submit(bdev_io);
1158 	return 0;
1159 }
1160 
1161 int
1162 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1163 		uint64_t offset, uint64_t length,
1164 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1165 {
1166 	uint64_t offset_blocks, num_blocks;
1167 
1168 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1169 		return -EINVAL;
1170 	}
1171 
1172 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1173 }
1174 
1175 int
1176 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1177 		       uint64_t offset_blocks, uint64_t num_blocks,
1178 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1179 {
1180 	struct spdk_bdev *bdev = desc->bdev;
1181 	struct spdk_bdev_io *bdev_io;
1182 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1183 
1184 	if (!desc->write) {
1185 		return -EBADF;
1186 	}
1187 
1188 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1189 		return -EINVAL;
1190 	}
1191 
1192 	bdev_io = spdk_bdev_get_io();
1193 	if (!bdev_io) {
1194 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1195 		return -ENOMEM;
1196 	}
1197 
1198 	bdev_io->ch = channel;
1199 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1200 	bdev_io->u.bdev.iovs = NULL;
1201 	bdev_io->u.bdev.iovcnt = 0;
1202 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1203 	bdev_io->u.bdev.num_blocks = num_blocks;
1204 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1205 
1206 	spdk_bdev_io_submit(bdev_io);
1207 	return 0;
1208 }
1209 
1210 static void
1211 _spdk_bdev_reset_dev(void *io_device, void *ctx)
1212 {
1213 	struct spdk_bdev_channel *ch = ctx;
1214 	struct spdk_bdev_io *bdev_io;
1215 
1216 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1217 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1218 	spdk_bdev_io_submit_reset(bdev_io);
1219 }
1220 
1221 static void
1222 _spdk_bdev_reset_abort_channel(void *io_device, struct spdk_io_channel *ch,
1223 			       void *ctx)
1224 {
1225 	struct spdk_bdev_channel	*channel;
1226 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1227 
1228 	channel = spdk_io_channel_get_ctx(ch);
1229 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1230 
1231 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1232 
1233 	_spdk_bdev_abort_queued_io(&channel->nomem_io, channel);
1234 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1235 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1236 }
1237 
1238 static void
1239 _spdk_bdev_start_reset(void *ctx)
1240 {
1241 	struct spdk_bdev_channel *ch = ctx;
1242 
1243 	spdk_for_each_channel(ch->bdev, _spdk_bdev_reset_abort_channel,
1244 			      ch, _spdk_bdev_reset_dev);
1245 }
1246 
1247 static void
1248 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1249 {
1250 	struct spdk_bdev *bdev = ch->bdev;
1251 
1252 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1253 
1254 	pthread_mutex_lock(&bdev->mutex);
1255 	if (bdev->reset_in_progress == NULL) {
1256 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1257 		/*
1258 		 * Take a channel reference for the target bdev for the life of this
1259 		 *  reset.  This guards against the channel getting destroyed while
1260 		 *  spdk_for_each_channel() calls related to this reset IO are in
1261 		 *  progress.  We will release the reference when this reset is
1262 		 *  completed.
1263 		 */
1264 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(bdev);
1265 		_spdk_bdev_start_reset(ch);
1266 	}
1267 	pthread_mutex_unlock(&bdev->mutex);
1268 }
1269 
1270 static void
1271 _spdk_bdev_complete_reset_channel(void *io_device, struct spdk_io_channel *_ch, void *ctx)
1272 {
1273 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1274 
1275 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
1276 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
1277 		_spdk_bdev_channel_start_reset(ch);
1278 	}
1279 }
1280 
1281 int
1282 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1283 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1284 {
1285 	struct spdk_bdev *bdev = desc->bdev;
1286 	struct spdk_bdev_io *bdev_io;
1287 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1288 
1289 	bdev_io = spdk_bdev_get_io();
1290 	if (!bdev_io) {
1291 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1292 		return -ENOMEM;
1293 	}
1294 
1295 	bdev_io->ch = channel;
1296 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1297 	bdev_io->u.reset.ch_ref = NULL;
1298 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1299 
1300 	pthread_mutex_lock(&bdev->mutex);
1301 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1302 	pthread_mutex_unlock(&bdev->mutex);
1303 
1304 	_spdk_bdev_channel_start_reset(channel);
1305 
1306 	return 0;
1307 }
1308 
1309 void
1310 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1311 		      struct spdk_bdev_io_stat *stat)
1312 {
1313 #ifdef SPDK_CONFIG_VTUNE
1314 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1315 	memset(stat, 0, sizeof(*stat));
1316 	return;
1317 #endif
1318 
1319 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1320 
1321 	*stat = channel->stat;
1322 	memset(&channel->stat, 0, sizeof(channel->stat));
1323 }
1324 
1325 int
1326 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1327 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1328 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1329 {
1330 	struct spdk_bdev *bdev = desc->bdev;
1331 	struct spdk_bdev_io *bdev_io;
1332 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1333 
1334 	if (!desc->write) {
1335 		return -EBADF;
1336 	}
1337 
1338 	bdev_io = spdk_bdev_get_io();
1339 	if (!bdev_io) {
1340 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1341 		return -ENOMEM;
1342 	}
1343 
1344 	bdev_io->ch = channel;
1345 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1346 	bdev_io->u.nvme_passthru.cmd = *cmd;
1347 	bdev_io->u.nvme_passthru.buf = buf;
1348 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1349 
1350 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1351 
1352 	spdk_bdev_io_submit(bdev_io);
1353 	return 0;
1354 }
1355 
1356 int
1357 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1358 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1359 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1360 {
1361 	struct spdk_bdev *bdev = desc->bdev;
1362 	struct spdk_bdev_io *bdev_io;
1363 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1364 
1365 	if (!desc->write) {
1366 		/*
1367 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1368 		 *  to easily determine if the command is a read or write, but for now just
1369 		 *  do not allow io_passthru with a read-only descriptor.
1370 		 */
1371 		return -EBADF;
1372 	}
1373 
1374 	bdev_io = spdk_bdev_get_io();
1375 	if (!bdev_io) {
1376 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
1377 		return -ENOMEM;
1378 	}
1379 
1380 	bdev_io->ch = channel;
1381 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1382 	bdev_io->u.nvme_passthru.cmd = *cmd;
1383 	bdev_io->u.nvme_passthru.buf = buf;
1384 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1385 
1386 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1387 
1388 	spdk_bdev_io_submit(bdev_io);
1389 	return 0;
1390 }
1391 
1392 int
1393 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1394 {
1395 	if (!bdev_io) {
1396 		SPDK_ERRLOG("bdev_io is NULL\n");
1397 		return -1;
1398 	}
1399 
1400 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1401 		SPDK_ERRLOG("bdev_io is in pending state\n");
1402 		assert(false);
1403 		return -1;
1404 	}
1405 
1406 	spdk_bdev_put_io(bdev_io);
1407 
1408 	return 0;
1409 }
1410 
1411 static void
1412 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
1413 {
1414 	struct spdk_bdev *bdev = bdev_ch->bdev;
1415 	struct spdk_bdev_io *bdev_io;
1416 
1417 	if (bdev_ch->io_outstanding > bdev_ch->nomem_threshold) {
1418 		/*
1419 		 * Allow some more I/O to complete before retrying the nomem_io queue.
1420 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
1421 		 *  the context of a completion, because the resources for the I/O are
1422 		 *  not released until control returns to the bdev poller.  Also, we
1423 		 *  may require several small I/O to complete before a larger I/O
1424 		 *  (that requires splitting) can be submitted.
1425 		 */
1426 		return;
1427 	}
1428 
1429 	while (!TAILQ_EMPTY(&bdev_ch->nomem_io)) {
1430 		bdev_io = TAILQ_FIRST(&bdev_ch->nomem_io);
1431 		TAILQ_REMOVE(&bdev_ch->nomem_io, bdev_io, link);
1432 		bdev_ch->io_outstanding++;
1433 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
1434 		bdev->fn_table->submit_request(bdev_ch->channel, bdev_io);
1435 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
1436 			break;
1437 		}
1438 	}
1439 }
1440 
1441 static void
1442 _spdk_bdev_io_complete(void *ctx)
1443 {
1444 	struct spdk_bdev_io *bdev_io = ctx;
1445 
1446 	assert(bdev_io->cb != NULL);
1447 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1448 }
1449 
1450 void
1451 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1452 {
1453 	struct spdk_bdev *bdev = bdev_io->bdev;
1454 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
1455 
1456 	bdev_io->status = status;
1457 
1458 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
1459 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
1460 			SPDK_ERRLOG("NOMEM returned for reset\n");
1461 		}
1462 		pthread_mutex_lock(&bdev->mutex);
1463 		if (bdev_io == bdev->reset_in_progress) {
1464 			bdev->reset_in_progress = NULL;
1465 		}
1466 		pthread_mutex_unlock(&bdev->mutex);
1467 		if (bdev_io->u.reset.ch_ref != NULL) {
1468 			spdk_put_io_channel(bdev_io->u.reset.ch_ref);
1469 		}
1470 		spdk_for_each_channel(bdev, _spdk_bdev_complete_reset_channel, NULL, NULL);
1471 	} else {
1472 		assert(bdev_ch->io_outstanding > 0);
1473 		bdev_ch->io_outstanding--;
1474 		if (spdk_likely(status != SPDK_BDEV_IO_STATUS_NOMEM)) {
1475 			if (spdk_unlikely(!TAILQ_EMPTY(&bdev_ch->nomem_io))) {
1476 				_spdk_bdev_ch_retry_io(bdev_ch);
1477 			}
1478 		} else {
1479 			TAILQ_INSERT_HEAD(&bdev_ch->nomem_io, bdev_io, link);
1480 			/*
1481 			 * Wait for some of the outstanding I/O to complete before we
1482 			 *  retry any of the nomem_io.  Normally we will wait for
1483 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
1484 			 *  depth channels we will instead wait for half to complete.
1485 			 */
1486 			bdev_ch->nomem_threshold = spdk_max(bdev_ch->io_outstanding / 2,
1487 							    bdev_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
1488 			return;
1489 		}
1490 	}
1491 
1492 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1493 		switch (bdev_io->type) {
1494 		case SPDK_BDEV_IO_TYPE_READ:
1495 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1496 			bdev_ch->stat.num_read_ops++;
1497 			break;
1498 		case SPDK_BDEV_IO_TYPE_WRITE:
1499 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1500 			bdev_ch->stat.num_write_ops++;
1501 			break;
1502 		default:
1503 			break;
1504 		}
1505 	}
1506 
1507 #ifdef SPDK_CONFIG_VTUNE
1508 	uint64_t now_tsc = spdk_get_ticks();
1509 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
1510 		uint64_t data[5];
1511 
1512 		data[0] = bdev_ch->stat.num_read_ops;
1513 		data[1] = bdev_ch->stat.bytes_read;
1514 		data[2] = bdev_ch->stat.num_write_ops;
1515 		data[3] = bdev_ch->stat.bytes_written;
1516 		data[4] = bdev->fn_table->get_spin_time ?
1517 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
1518 
1519 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
1520 				   __itt_metadata_u64, 5, data);
1521 
1522 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
1523 		bdev_ch->start_tsc = now_tsc;
1524 	}
1525 #endif
1526 
1527 	if (bdev_io->in_submit_request || bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
1528 		/*
1529 		 * Defer completion to avoid potential infinite recursion if the
1530 		 * user's completion callback issues a new I/O.
1531 		 */
1532 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
1533 				     _spdk_bdev_io_complete, bdev_io);
1534 	} else {
1535 		_spdk_bdev_io_complete(bdev_io);
1536 	}
1537 }
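
/*
 * Illustrative sketch (assumed bdev module code, not part of this file):
 * a module whose backing driver is out of resources completes the I/O
 * with SPDK_BDEV_IO_STATUS_NOMEM; the logic above then parks it on
 * nomem_io and resubmits once io_outstanding drains below
 * nomem_threshold.  my_driver_submit is hypothetical.
 *
 *	rc = my_driver_submit(dev, bdev_io);
 *	if (rc == -ENOMEM) {
 *		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
 *	} else if (rc != 0) {
 *		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
 *	}
 */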
1538 
1539 void
1540 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
1541 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
1542 {
1543 	if (sc == SPDK_SCSI_STATUS_GOOD) {
1544 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1545 	} else {
1546 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
1547 		bdev_io->error.scsi.sc = sc;
1548 		bdev_io->error.scsi.sk = sk;
1549 		bdev_io->error.scsi.asc = asc;
1550 		bdev_io->error.scsi.ascq = ascq;
1551 	}
1552 
1553 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1554 }
1555 
1556 void
1557 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
1558 			     int *sc, int *sk, int *asc, int *ascq)
1559 {
1560 	assert(sc != NULL);
1561 	assert(sk != NULL);
1562 	assert(asc != NULL);
1563 	assert(ascq != NULL);
1564 
1565 	switch (bdev_io->status) {
1566 	case SPDK_BDEV_IO_STATUS_SUCCESS:
1567 		*sc = SPDK_SCSI_STATUS_GOOD;
1568 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
1569 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1570 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1571 		break;
1572 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
1573 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
1574 		break;
1575 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
1576 		*sc = bdev_io->error.scsi.sc;
1577 		*sk = bdev_io->error.scsi.sk;
1578 		*asc = bdev_io->error.scsi.asc;
1579 		*ascq = bdev_io->error.scsi.ascq;
1580 		break;
1581 	default:
1582 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
1583 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
1584 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1585 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1586 		break;
1587 	}
1588 }
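
/*
 * Illustrative sketch (assumed SCSI target code, not part of this file):
 * a target translates a completed bdev I/O into wire-level SCSI fields.
 *
 *	int sc, sk, asc, ascq;
 *
 *	spdk_bdev_io_get_scsi_status(bdev_io, &sc, &sk, &asc, &ascq);
 *	// when sc != SPDK_SCSI_STATUS_GOOD, build sense data from
 *	// sk/asc/ascq before responding to the initiator
 */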
1589 
1590 void
1591 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
1592 {
1593 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
1594 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1595 	} else {
1596 		bdev_io->error.nvme.sct = sct;
1597 		bdev_io->error.nvme.sc = sc;
1598 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
1599 	}
1600 
1601 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1602 }
1603 
1604 void
1605 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
1606 {
1607 	assert(sct != NULL);
1608 	assert(sc != NULL);
1609 
1610 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
1611 		*sct = bdev_io->error.nvme.sct;
1612 		*sc = bdev_io->error.nvme.sc;
1613 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1614 		*sct = SPDK_NVME_SCT_GENERIC;
1615 		*sc = SPDK_NVME_SC_SUCCESS;
1616 	} else {
1617 		*sct = SPDK_NVME_SCT_GENERIC;
1618 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1619 	}
1620 }
1621 
1622 static void
1623 _spdk_bdev_register(struct spdk_bdev *bdev)
1624 {
1625 	struct spdk_bdev_module_if *module;
1626 
1627 	assert(bdev->module != NULL);
1628 
1629 	bdev->status = SPDK_BDEV_STATUS_READY;
1630 
1631 	TAILQ_INIT(&bdev->open_descs);
1632 	bdev->bdev_opened = false;
1633 
1634 	TAILQ_INIT(&bdev->vbdevs);
1635 	TAILQ_INIT(&bdev->base_bdevs);
1636 
1637 	bdev->reset_in_progress = NULL;
1638 
1639 	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
1640 				sizeof(struct spdk_bdev_channel));
1641 
1642 	pthread_mutex_init(&bdev->mutex, NULL);
1643 	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Inserting bdev %s into list\n", bdev->name);
1644 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
1645 
1646 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
1647 		if (module->examine) {
1648 			module->action_in_progress++;
1649 			module->examine(bdev);
1650 		}
1651 	}
1652 }
1653 
1654 void
1655 spdk_bdev_register(struct spdk_bdev *bdev)
1656 {
1657 	_spdk_bdev_register(bdev);
1658 }
1659 
1660 void
1661 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
1662 {
1663 	int i;
1664 
1665 	_spdk_bdev_register(vbdev);
1666 	for (i = 0; i < base_bdev_count; i++) {
1667 		assert(base_bdevs[i] != NULL);
1668 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
1669 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
1670 	}
1671 }
1672 
1673 void
1674 spdk_bdev_unregister(struct spdk_bdev *bdev)
1675 {
1676 	struct spdk_bdev_desc	*desc, *tmp;
1677 	int			rc;
1678 	bool			do_destruct = true;
1679 
1680 	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Removing bdev %s from list\n", bdev->name);
1681 
1682 	pthread_mutex_lock(&bdev->mutex);
1683 
1684 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
1685 
1686 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
1687 		if (desc->remove_cb) {
1688 			pthread_mutex_unlock(&bdev->mutex);
1689 			do_destruct = false;
1690 			desc->remove_cb(desc->remove_ctx);
1691 			pthread_mutex_lock(&bdev->mutex);
1692 		}
1693 	}
1694 
1695 	if (!do_destruct) {
1696 		pthread_mutex_unlock(&bdev->mutex);
1697 		return;
1698 	}
1699 
1700 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
1701 	pthread_mutex_unlock(&bdev->mutex);
1702 
1703 	pthread_mutex_destroy(&bdev->mutex);
1704 
1705 	spdk_io_device_unregister(bdev, NULL);
1706 
1707 	rc = bdev->fn_table->destruct(bdev->ctxt);
1708 	if (rc < 0) {
1709 		SPDK_ERRLOG("destruct failed\n");
1710 	}
1711 }
1712 
1713 void
1714 spdk_vbdev_unregister(struct spdk_bdev *vbdev)
1715 {
1716 	struct spdk_bdev *base_bdev;
1717 
1718 	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
1719 	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
1720 		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
1721 	}
1722 	spdk_bdev_unregister(vbdev);
1723 }
1724 
1725 bool
1726 spdk_is_bdev_opened(struct spdk_bdev *bdev)
1727 {
1728 	struct spdk_bdev *base;
1729 
1730 	if (bdev->bdev_opened) {
1731 		return true;
1732 	}
1733 
1734 	TAILQ_FOREACH(base, &bdev->base_bdevs, base_bdev_link) {
1735 		if (spdk_is_bdev_opened(base)) {
1736 			return true;
1737 		}
1738 	}
1739 
1740 	return false;
1741 }
1742 
1743 int
1744 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
1745 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
1746 {
1747 	struct spdk_bdev_desc *desc;
1748 
1749 	desc = calloc(1, sizeof(*desc));
1750 	if (desc == NULL) {
1751 		return -ENOMEM;
1752 	}
1753 
1754 	pthread_mutex_lock(&bdev->mutex);
1755 
1756 	if (write && bdev->claim_module) {
1757 		SPDK_ERRLOG("failed, %s already claimed\n", bdev->name);
1758 		free(desc);
1759 		pthread_mutex_unlock(&bdev->mutex);
1760 		return -EPERM;
1761 	}
1762 
1763 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
1764 
1765 	bdev->bdev_opened = true;
1766 
1767 	desc->bdev = bdev;
1768 	desc->remove_cb = remove_cb;
1769 	desc->remove_ctx = remove_ctx;
1770 	desc->write = write;
1771 	*_desc = desc;
1772 
1773 	pthread_mutex_unlock(&bdev->mutex);
1774 
1775 	return 0;
1776 }
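
/*
 * Illustrative usage sketch (not part of this file): open a bdev
 * read/write with a hot-remove callback, then close it when done.
 * my_bdev_removed/my_ctx are hypothetical; the remove callback should
 * arrange for the descriptor to be closed so a pending unregister can
 * complete.
 *
 *	struct spdk_bdev_desc *desc;
 *	int rc;
 *
 *	rc = spdk_bdev_open(bdev, true, my_bdev_removed, my_ctx, &desc);
 *	if (rc != 0) {
 *		return rc;	// e.g. -EPERM if a module claimed the bdev
 *	}
 *	...
 *	spdk_bdev_close(desc);
 */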
1777 
1778 void
1779 spdk_bdev_close(struct spdk_bdev_desc *desc)
1780 {
1781 	struct spdk_bdev *bdev = desc->bdev;
1782 	bool do_unregister = false;
1783 
1784 	pthread_mutex_lock(&bdev->mutex);
1785 
1786 	bdev->bdev_opened = false;
1787 
1788 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
1789 	free(desc);
1790 
1791 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
1792 		do_unregister = true;
1793 	}
1794 	pthread_mutex_unlock(&bdev->mutex);
1795 
1796 	if (do_unregister == true) {
1797 		spdk_bdev_unregister(bdev);
1798 	}
1799 }
1800 
1801 int
1802 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
1803 			    struct spdk_bdev_module_if *module)
1804 {
1805 	if (bdev->claim_module != NULL) {
1806 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
1807 			    bdev->claim_module->name);
1808 		return -EPERM;
1809 	}
1810 
1811 	if (desc && !desc->write) {
1812 		desc->write = true;
1813 	}
1814 
1815 	bdev->claim_module = module;
1816 	return 0;
1817 }
1818 
1819 void
1820 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
1821 {
1822 	assert(bdev->claim_module != NULL);
1823 	bdev->claim_module = NULL;
1824 }
1825 
1826 struct spdk_bdev *
1827 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
1828 {
1829 	return desc->bdev;
1830 }
1831 
1832 void
1833 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
1834 {
1835 	struct iovec *iovs;
1836 	int iovcnt;
1837 
1838 	if (bdev_io == NULL) {
1839 		return;
1840 	}
1841 
1842 	switch (bdev_io->type) {
1843 	case SPDK_BDEV_IO_TYPE_READ:
1844 		iovs = bdev_io->u.bdev.iovs;
1845 		iovcnt = bdev_io->u.bdev.iovcnt;
1846 		break;
1847 	case SPDK_BDEV_IO_TYPE_WRITE:
1848 		iovs = bdev_io->u.bdev.iovs;
1849 		iovcnt = bdev_io->u.bdev.iovcnt;
1850 		break;
1851 	default:
1852 		iovs = NULL;
1853 		iovcnt = 0;
1854 		break;
1855 	}
1856 
1857 	if (iovp) {
1858 		*iovp = iovs;
1859 	}
1860 	if (iovcntp) {
1861 		*iovcntp = iovcnt;
1862 	}
1863 }
1864 
1865 void
1866 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
1867 {
1868 	/*
1869 	 * Modules with examine callbacks must be initialized first, so they are
1870 	 *  ready to handle examine callbacks from later modules that will
1871 	 *  register physical bdevs.
1872 	 */
1873 	if (bdev_module->examine != NULL) {
1874 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1875 	} else {
1876 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1877 	}
1878 }
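
/*
 * Illustrative sketch (not part of this file): a bdev module defines a
 * struct spdk_bdev_module_if and adds it from a constructor, typically
 * via the SPDK_BDEV_MODULE_REGISTER() convenience macro in
 * spdk_internal/bdev.h.  The field list below is abbreviated and the
 * my_* names are hypothetical.
 *
 *	static struct spdk_bdev_module_if my_if = {
 *		.module_init	= my_module_init,
 *		.module_fini	= my_module_fini,
 *		.examine	= my_examine,
 *	};
 *
 *	__attribute__((constructor)) static void
 *	my_module_register(void)
 *	{
 *		spdk_bdev_module_list_add(&my_if);
 *	}
 */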
1879 
1880 void
1881 spdk_bdev_part_base_free(struct spdk_bdev_part_base *base)
1882 {
1883 	assert(base->bdev);
1884 	assert(base->desc);
1885 	spdk_bdev_close(base->desc);
1886 	free(base);
1887 }
1888 
void
spdk_bdev_part_free(struct spdk_bdev_part *part)
{
	struct spdk_bdev_part_base *base;

	assert(part);
	assert(part->base);

	base = part->base;
	spdk_io_device_unregister(&part->base, NULL);
	TAILQ_REMOVE(base->tailq, part, tailq);
	free(part->bdev.name);
	free(part);

	if (__sync_sub_and_fetch(&base->ref, 1) == 0) {
		spdk_bdev_module_release_bdev(base->bdev);
		spdk_bdev_part_base_free(base);
	}
}

void
spdk_bdev_part_tailq_fini(struct bdev_part_tailq *tailq)
{
	struct spdk_bdev_part *part, *tmp;

	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
		spdk_bdev_part_free(part);
	}
}

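/* Unregister every part that sits on top of a base bdev that is being hot-removed. */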
void
spdk_bdev_part_base_hotremove(struct spdk_bdev *base_bdev, struct bdev_part_tailq *tailq)
{
	struct spdk_bdev_part *part, *tmp;

	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
		if (part->base->bdev == base_bdev) {
			spdk_bdev_unregister(&part->bdev);
		}
	}
}

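/*
 * fn_table callbacks shared by all part vbdevs. I/O type queries are
 * answered by the base bdev; I/O channels are per-part channels that hold
 * a reference to a channel on the base (see the channel create/destroy
 * callbacks below).
 */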
static bool
spdk_bdev_part_io_type_supported(void *_part, enum spdk_bdev_io_type io_type)
{
	struct spdk_bdev_part *part = _part;

	return part->base->bdev->fn_table->io_type_supported(part->base->bdev, io_type);
}

static struct spdk_io_channel *
spdk_bdev_part_get_io_channel(void *_part)
{
	struct spdk_bdev_part *part = _part;

	return spdk_get_io_channel(&part->base);
}

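/*
 * Completion callback for I/O forwarded to the base bdev: propagate the
 * status to the part's I/O and free the base I/O.
 */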
static void
spdk_bdev_part_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *part_io = cb_arg;
	enum spdk_bdev_io_status status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;

	spdk_bdev_io_complete(part_io, status);
	spdk_bdev_free_io(bdev_io);
}

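/*
 * Translate an I/O submitted to a part vbdev into an I/O on the base bdev
 * by shifting the block offset by the part's starting offset, then
 * resubmit it through the base descriptor.
 */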
void
spdk_bdev_part_submit_request(struct spdk_bdev_part_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_part *part = ch->part;
	struct spdk_io_channel *base_ch = ch->base_ch;
	struct spdk_bdev_desc *base_desc = part->base->desc;
	uint64_t offset;
	int rc = 0;

	/* Modify the I/O to adjust for the offset within the base bdev. */
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_readv_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
					    bdev_io->u.bdev.iovcnt, offset,
					    bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
					    bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_writev_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
					     bdev_io->u.bdev.iovcnt, offset,
					     bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
					     bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_write_zeroes_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
						   spdk_bdev_part_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_unmap_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
					    spdk_bdev_part_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_flush_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
					    spdk_bdev_part_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_RESET:
		rc = spdk_bdev_reset(base_desc, base_ch,
				     spdk_bdev_part_complete_io, bdev_io);
		break;
	default:
		SPDK_ERRLOG("unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	if (rc != 0) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

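/*
 * Per-channel setup for a part vbdev: obtain an I/O channel on the base
 * bdev, then run the module-specific channel create callback, if any.
 */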
static int
spdk_bdev_part_channel_create_cb(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
	struct spdk_bdev_part_channel *ch = ctx_buf;

	ch->part = part;
	ch->base_ch = spdk_bdev_get_io_channel(part->base->desc);
	if (ch->base_ch == NULL) {
		return -1;
	}

	if (part->base->ch_create_cb) {
		return part->base->ch_create_cb(io_device, ctx_buf);
	} else {
		return 0;
	}
}

static void
spdk_bdev_part_channel_destroy_cb(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
	struct spdk_bdev_part_channel *ch = ctx_buf;

	if (part->base->ch_destroy_cb) {
		part->base->ch_destroy_cb(io_device, ctx_buf);
	}
	spdk_put_io_channel(ch->base_ch);
}

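/*
 * Initialize the shared state for a family of parts carved out of one base
 * bdev: hook the forwarding callbacks into the module's fn_table and open a
 * read-only descriptor on the base. The descriptor is upgraded to write
 * access when the first part claims the base.
 */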
int
spdk_bdev_part_base_construct(struct spdk_bdev_part_base *base, struct spdk_bdev *bdev,
			      spdk_bdev_remove_cb_t remove_cb, struct spdk_bdev_module_if *module,
			      struct spdk_bdev_fn_table *fn_table, struct bdev_part_tailq *tailq,
			      uint32_t channel_size, spdk_io_channel_create_cb ch_create_cb,
			      spdk_io_channel_destroy_cb ch_destroy_cb)
{
	int rc;

	fn_table->get_io_channel = spdk_bdev_part_get_io_channel;
	fn_table->io_type_supported = spdk_bdev_part_io_type_supported;

	base->bdev = bdev;
	base->ref = 0;
	base->module = module;
	base->fn_table = fn_table;
	base->tailq = tailq;
	base->claimed = false;
	base->channel_size = channel_size;
	base->ch_create_cb = ch_create_cb;
	base->ch_destroy_cb = ch_destroy_cb;

	rc = spdk_bdev_open(bdev, false, remove_cb, bdev, &base->desc);
	if (rc) {
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		return -1;
	}

	return 0;
}

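/*
 * Register one part as a virtual bdev on top of the base. The first part
 * constructed claims the base bdev for the module; each part also takes a
 * reference on the base that is dropped again in spdk_bdev_part_free().
 *
 * A minimal usage sketch (names and sizes are illustrative only). Note
 * that the name must be heap-allocated, because the part code frees it:
 *
 *	part = calloc(1, sizeof(*part));
 *	rc = spdk_bdev_part_construct(part, base,
 *				      spdk_sprintf_alloc("%sp0", base_name),
 *				      0, 1024, "Split Disk");
 */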
int
spdk_bdev_part_construct(struct spdk_bdev_part *part, struct spdk_bdev_part_base *base,
			 char *name, uint64_t offset_blocks, uint64_t num_blocks,
			 char *product_name)
{
	part->bdev.name = name;
	part->bdev.blocklen = base->bdev->blocklen;
	part->bdev.blockcnt = num_blocks;
	part->offset_blocks = offset_blocks;

	part->bdev.write_cache = base->bdev->write_cache;
	part->bdev.need_aligned_buffer = base->bdev->need_aligned_buffer;
	part->bdev.product_name = product_name;
	part->bdev.ctxt = part;
	part->bdev.module = base->module;
	part->bdev.fn_table = base->fn_table;

	__sync_fetch_and_add(&base->ref, 1);
	part->base = base;

	if (!base->claimed) {
		int rc;

		rc = spdk_bdev_module_claim_bdev(base->bdev, base->desc, base->module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(base->bdev));
			free(part->bdev.name);
			return -1;
		}
		base->claimed = true;
	}

	spdk_io_device_register(&part->base, spdk_bdev_part_channel_create_cb,
				spdk_bdev_part_channel_destroy_cb,
				base->channel_size);
	spdk_vbdev_register(&part->bdev, &base->bdev, 1);
	TAILQ_INSERT_TAIL(base->tailq, part, tailq);

	return 0;
}

SPDK_LOG_REGISTER_TRACE_FLAG("bdev", SPDK_TRACE_BDEV)