xref: /spdk/lib/bdev/bdev.c (revision 2baeea7dd43483689a430ab2f03091373a626a7b)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/likely.h"
42 #include "spdk/queue.h"
43 #include "spdk/nvme_spec.h"
44 #include "spdk/scsi_spec.h"
45 #include "spdk/util.h"
46 
47 #include "spdk_internal/bdev.h"
48 #include "spdk_internal/log.h"
49 #include "spdk/string.h"
50 
51 #ifdef SPDK_CONFIG_VTUNE
52 #include "ittnotify.h"
53 #include "ittnotify_types.h"
54 int __itt_init_ittlib(const char *, __itt_group_id);
55 #endif
56 
57 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
58 #define BUF_SMALL_POOL_SIZE	8192
59 #define BUF_LARGE_POOL_SIZE	1024
60 #define NOMEM_THRESHOLD_COUNT	8
61 #define ZERO_BUFFER_SIZE	0x100000
62 
63 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
64 
65 struct spdk_bdev_mgr {
66 	struct spdk_mempool *bdev_io_pool;
67 
68 	struct spdk_mempool *buf_small_pool;
69 	struct spdk_mempool *buf_large_pool;
70 
71 	void *zero_buffer;
72 
73 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
74 
75 	TAILQ_HEAD(, spdk_bdev) bdevs;
76 
77 	spdk_bdev_poller_start_cb start_poller_fn;
78 	spdk_bdev_poller_stop_cb stop_poller_fn;
79 
80 	bool init_complete;
81 	bool module_init_complete;
82 
83 #ifdef SPDK_CONFIG_VTUNE
84 	__itt_domain	*domain;
85 #endif
86 };
87 
88 static struct spdk_bdev_mgr g_bdev_mgr = {
89 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
90 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
91 	.start_poller_fn = NULL,
92 	.stop_poller_fn = NULL,
93 	.init_complete = false,
94 	.module_init_complete = false,
95 };
96 
97 static spdk_bdev_init_cb	g_cb_fn = NULL;
98 static void			*g_cb_arg = NULL;
99 
100 
101 struct spdk_bdev_mgmt_channel {
102 	bdev_io_tailq_t need_buf_small;
103 	bdev_io_tailq_t need_buf_large;
104 };
105 
106 struct spdk_bdev_desc {
107 	struct spdk_bdev		*bdev;
108 	spdk_bdev_remove_cb_t		remove_cb;
109 	void				*remove_ctx;
110 	bool				write;
111 	TAILQ_ENTRY(spdk_bdev_desc)	link;
112 };
113 
114 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
115 
116 struct spdk_bdev_channel {
117 	struct spdk_bdev	*bdev;
118 
119 	/* The channel for the underlying device */
120 	struct spdk_io_channel	*channel;
121 
122 	/* Channel for the bdev manager */
123 	struct spdk_io_channel *mgmt_channel;
124 
125 	struct spdk_bdev_io_stat stat;
126 
127 	/*
128 	 * Count of I/O submitted to bdev module and waiting for completion.
129 	 * Incremented before submit_request() is called on an spdk_bdev_io.
130 	 */
131 	uint64_t		io_outstanding;
132 
133 	bdev_io_tailq_t		queued_resets;
134 
135 	/*
136 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
137 	 *  on this channel.
138 	 */
139 	bdev_io_tailq_t		nomem_io;
140 
141 	/*
142 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
143 	 */
144 	uint64_t		nomem_threshold;
145 
146 	uint32_t		flags;
147 
148 #ifdef SPDK_CONFIG_VTUNE
149 	uint64_t		start_tsc;
150 	uint64_t		interval_tsc;
151 	__itt_string_handle	*handle;
152 #endif
153 
154 };
155 
156 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
157 
158 struct spdk_bdev *
159 spdk_bdev_first(void)
160 {
161 	struct spdk_bdev *bdev;
162 
163 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
164 	if (bdev) {
165 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
166 	}
167 
168 	return bdev;
169 }
170 
171 struct spdk_bdev *
172 spdk_bdev_next(struct spdk_bdev *prev)
173 {
174 	struct spdk_bdev *bdev;
175 
176 	bdev = TAILQ_NEXT(prev, link);
177 	if (bdev) {
178 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
179 	}
180 
181 	return bdev;
182 }
183 
184 static struct spdk_bdev *
185 _bdev_next_leaf(struct spdk_bdev *bdev)
186 {
187 	while (bdev != NULL) {
188 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
189 			return bdev;
190 		} else {
191 			bdev = TAILQ_NEXT(bdev, link);
192 		}
193 	}
194 
195 	return bdev;
196 }
197 
198 struct spdk_bdev *
199 spdk_bdev_first_leaf(void)
200 {
201 	struct spdk_bdev *bdev;
202 
203 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
204 
205 	if (bdev) {
206 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
207 	}
208 
209 	return bdev;
210 }
211 
212 struct spdk_bdev *
213 spdk_bdev_next_leaf(struct spdk_bdev *prev)
214 {
215 	struct spdk_bdev *bdev;
216 
217 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
218 
219 	if (bdev) {
220 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
221 	}
222 
223 	return bdev;
224 }
225 
226 struct spdk_bdev *
227 spdk_bdev_get_by_name(const char *bdev_name)
228 {
229 	struct spdk_bdev *bdev = spdk_bdev_first();
230 
231 	while (bdev != NULL) {
232 		if (strcmp(bdev_name, bdev->name) == 0) {
233 			return bdev;
234 		}
235 		bdev = spdk_bdev_next(bdev);
236 	}
237 
238 	return NULL;
239 }
240 
241 static void
242 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
243 {
244 	assert(bdev_io->get_buf_cb != NULL);
245 	assert(buf != NULL);
246 	assert(bdev_io->u.bdev.iovs != NULL);
247 
248 	bdev_io->buf = buf;
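	/*
	 * Point the iov at the first 512-byte-aligned address after buf.  The
	 *  buffer pools allocate an extra 512 bytes per element (see
	 *  spdk_bdev_initialize()), so at least buf_len usable bytes remain
	 *  after this adjustment.
	 */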
249 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
250 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
251 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
252 }
253 
254 static void
255 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
256 {
257 	struct spdk_mempool *pool;
258 	struct spdk_bdev_io *tmp;
259 	void *buf;
260 	bdev_io_tailq_t *tailq;
261 	struct spdk_bdev_mgmt_channel *ch;
262 
263 	assert(bdev_io->u.bdev.iovcnt == 1);
264 
265 	buf = bdev_io->buf;
266 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
267 
268 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
269 		pool = g_bdev_mgr.buf_small_pool;
270 		tailq = &ch->need_buf_small;
271 	} else {
272 		pool = g_bdev_mgr.buf_large_pool;
273 		tailq = &ch->need_buf_large;
274 	}
275 
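	/*
	 * If another I/O is waiting for a buffer of this size class, hand the
	 *  buffer straight to the oldest waiter instead of returning it to the pool.
	 */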
276 	if (TAILQ_EMPTY(tailq)) {
277 		spdk_mempool_put(pool, buf);
278 	} else {
279 		tmp = TAILQ_FIRST(tailq);
280 		TAILQ_REMOVE(tailq, tmp, buf_link);
281 		spdk_bdev_io_set_buf(tmp, buf);
282 	}
283 }
284 
285 void
286 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
287 {
288 	struct spdk_mempool *pool;
289 	bdev_io_tailq_t *tailq;
290 	void *buf = NULL;
291 	struct spdk_bdev_mgmt_channel *ch;
292 
293 	assert(cb != NULL);
294 	assert(bdev_io->u.bdev.iovs != NULL);
295 
296 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
297 		/* Buffer already present */
298 		cb(bdev_io->ch->channel, bdev_io);
299 		return;
300 	}
301 
302 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
303 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
304 
305 	bdev_io->buf_len = len;
306 	bdev_io->get_buf_cb = cb;
307 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
308 		pool = g_bdev_mgr.buf_small_pool;
309 		tailq = &ch->need_buf_small;
310 	} else {
311 		pool = g_bdev_mgr.buf_large_pool;
312 		tailq = &ch->need_buf_large;
313 	}
314 
315 	buf = spdk_mempool_get(pool);
316 
317 	if (!buf) {
318 		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
319 	} else {
320 		spdk_bdev_io_set_buf(bdev_io, buf);
321 	}
322 }
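/*
 * Illustrative sketch (not part of this file): a bdev module that needs a data
 *  buffer for a read typically defers the actual submission until the buffer is
 *  available.  The names my_read_once_buf_ready and my_module_submit_read are
 *  hypothetical.
 *
 *	static void
 *	my_read_once_buf_ready(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		// bdev_io->u.bdev.iovs[0] now points at a buffer of the requested length
 *		my_module_submit_read(ch, bdev_io);
 *	}
 *
 *	// In the module's submit_request() path for SPDK_BDEV_IO_TYPE_READ:
 *	spdk_bdev_io_get_buf(bdev_io, my_read_once_buf_ready,
 *			     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 */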
323 
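/*
 * Each spdk_bdev_io is allocated with enough trailing space to hold the per-I/O
 *  context of whichever registered module needs the most, so a single pool can
 *  serve all modules (see the bdev_io_pool sizing in spdk_bdev_initialize()).
 */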
324 static int
325 spdk_bdev_module_get_max_ctx_size(void)
326 {
327 	struct spdk_bdev_module_if *bdev_module;
328 	int max_bdev_module_size = 0;
329 
330 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
331 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
332 			max_bdev_module_size = bdev_module->get_ctx_size();
333 		}
334 	}
335 
336 	return max_bdev_module_size;
337 }
338 
339 void
340 spdk_bdev_config_text(FILE *fp)
341 {
342 	struct spdk_bdev_module_if *bdev_module;
343 
344 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
345 		if (bdev_module->config_text) {
346 			bdev_module->config_text(fp);
347 		}
348 	}
349 }
350 
351 static int
352 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
353 {
354 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
355 
356 	TAILQ_INIT(&ch->need_buf_small);
357 	TAILQ_INIT(&ch->need_buf_large);
358 
359 	return 0;
360 }
361 
362 static void
363 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
364 {
365 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
366 
367 	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
368 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
369 	}
370 }
371 
372 static void
373 spdk_bdev_init_complete(int rc)
374 {
375 	spdk_bdev_init_cb cb_fn = g_cb_fn;
376 	void *cb_arg = g_cb_arg;
377 
378 	g_bdev_mgr.init_complete = true;
379 	g_cb_fn = NULL;
380 	g_cb_arg = NULL;
381 
382 	cb_fn(cb_arg, rc);
383 }
384 
385 static void
386 spdk_bdev_module_action_complete(void)
387 {
388 	struct spdk_bdev_module_if *m;
389 
390 	/*
391 	 * Don't finish bdev subsystem initialization if
392 	 * module pre-initialization is still in progress, or
393 	 * the subsystem has already been initialized.
394 	 */
395 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
396 		return;
397 	}
398 
399 	/*
400 	 * Check all bdev modules for inits/examinations in progress. If any
401 	 * exist, return immediately since we cannot finish bdev subsystem
402 	 * initialization until all are completed.
403 	 */
404 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
405 		if (m->action_in_progress > 0) {
406 			return;
407 		}
408 	}
409 
410 	/*
411 	 * Modules already finished initialization - now that all
412 	 * the bdev modules have finished their asynchronous I/O
413 	 * processing, the entire bdev layer can be marked as complete.
414 	 */
415 	spdk_bdev_init_complete(0);
416 }
417 
418 static void
419 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
420 {
421 	assert(module->action_in_progress > 0);
422 	module->action_in_progress--;
423 	spdk_bdev_module_action_complete();
424 }
425 
426 void
427 spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
428 {
429 	spdk_bdev_module_action_done(module);
430 }
431 
432 void
433 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
434 {
435 	spdk_bdev_module_action_done(module);
436 }
437 
438 static int
439 spdk_bdev_modules_init(void)
440 {
441 	struct spdk_bdev_module_if *module;
442 	int rc = 0;
443 
444 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
445 		rc = module->module_init();
446 		if (rc != 0) {
447 			break;
448 		}
449 	}
450 
451 	g_bdev_mgr.module_init_complete = true;
452 	return rc;
453 }
454 
455 void
456 spdk_bdev_poller_start(struct spdk_bdev_poller **ppoller,
457 		       spdk_bdev_poller_fn fn,
458 		       void *arg,
459 		       uint64_t period_microseconds)
460 {
461 	g_bdev_mgr.start_poller_fn(ppoller, fn, arg, period_microseconds);
462 }
463 
464 void
465 spdk_bdev_poller_stop(struct spdk_bdev_poller **ppoller)
466 {
467 	g_bdev_mgr.stop_poller_fn(ppoller);
468 }
469 
470 void
471 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg,
472 		     spdk_bdev_poller_start_cb start_poller_fn,
473 		     spdk_bdev_poller_stop_cb stop_poller_fn)
474 {
475 	int cache_size;
476 	int rc = 0;
477 	char mempool_name[32];
478 
479 	assert(cb_fn != NULL);
480 
481 	g_cb_fn = cb_fn;
482 	g_cb_arg = cb_arg;
483 
484 	g_bdev_mgr.start_poller_fn = start_poller_fn;
485 	g_bdev_mgr.stop_poller_fn = stop_poller_fn;
486 
487 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
488 
489 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
490 				  SPDK_BDEV_IO_POOL_SIZE,
491 				  sizeof(struct spdk_bdev_io) +
492 				  spdk_bdev_module_get_max_ctx_size(),
493 				  64,
494 				  SPDK_ENV_SOCKET_ID_ANY);
495 
496 	if (g_bdev_mgr.bdev_io_pool == NULL) {
497 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
498 		spdk_bdev_init_complete(-1);
499 		return;
500 	}
501 
502 	/**
503 	 * Ensure no more than half of the total buffers end up in local caches, by
504 	 *   using spdk_env_get_core_count() to determine how many local caches we need
505 	 *   to account for.
506 	 */
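	/* For example, with 4 cores: 8192 / (2 * 4) = 1024 entries per per-core cache. */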
507 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
508 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
509 
510 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
511 				    BUF_SMALL_POOL_SIZE,
512 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
513 				    cache_size,
514 				    SPDK_ENV_SOCKET_ID_ANY);
515 	if (!g_bdev_mgr.buf_small_pool) {
516 		SPDK_ERRLOG("create small buffer pool failed\n");
517 		spdk_bdev_init_complete(-1);
518 		return;
519 	}
520 
521 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
522 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
523 
524 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
525 				    BUF_LARGE_POOL_SIZE,
526 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
527 				    cache_size,
528 				    SPDK_ENV_SOCKET_ID_ANY);
529 	if (!g_bdev_mgr.buf_large_pool) {
530 		SPDK_ERRLOG("create large buffer pool failed\n");
531 		spdk_bdev_init_complete(-1);
532 		return;
533 	}
534 
535 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
536 				 NULL);
537 	if (!g_bdev_mgr.zero_buffer) {
538 		SPDK_ERRLOG("create bdev zero buffer failed\n");
539 		spdk_bdev_init_complete(-1);
540 		return;
541 	}
542 
543 #ifdef SPDK_CONFIG_VTUNE
544 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
545 #endif
546 
547 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
548 				spdk_bdev_mgmt_channel_destroy,
549 				sizeof(struct spdk_bdev_mgmt_channel));
550 
551 	rc = spdk_bdev_modules_init();
552 	if (rc != 0) {
553 		SPDK_ERRLOG("bdev modules init failed\n");
554 		spdk_bdev_init_complete(-1);
555 		return;
556 	}
557 
558 	spdk_bdev_module_action_complete();
559 }
560 
561 void
562 spdk_bdev_finish(void)
563 {
564 	struct spdk_bdev_module_if *bdev_module;
565 
566 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
567 		if (bdev_module->module_fini) {
568 			bdev_module->module_fini();
569 		}
570 	}
571 
572 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
573 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
574 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
575 			    SPDK_BDEV_IO_POOL_SIZE);
576 	}
577 
578 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
579 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
580 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
581 			    BUF_SMALL_POOL_SIZE);
582 		assert(false);
583 	}
584 
585 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
586 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
587 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
588 			    BUF_LARGE_POOL_SIZE);
589 		assert(false);
590 	}
591 
592 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
593 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
594 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
595 	spdk_dma_free(g_bdev_mgr.zero_buffer);
596 
597 	spdk_io_device_unregister(&g_bdev_mgr, NULL);
598 }
599 
600 struct spdk_bdev_io *
601 spdk_bdev_get_io(void)
602 {
603 	struct spdk_bdev_io *bdev_io;
604 
605 	bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
606 	if (!bdev_io) {
607 		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
608 		abort();
609 	}
610 
611 	memset(bdev_io, 0, offsetof(struct spdk_bdev_io, u));
612 
613 	return bdev_io;
614 }
615 
616 static void
617 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
618 {
619 	if (bdev_io->buf != NULL) {
620 		spdk_bdev_io_put_buf(bdev_io);
621 	}
622 
623 	spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
624 }
625 
626 static void
627 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
628 {
629 	struct spdk_bdev *bdev = bdev_io->bdev;
630 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
631 	struct spdk_io_channel *ch = bdev_ch->channel;
632 
633 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
634 
635 	bdev_ch->io_outstanding++;
636 	bdev_io->in_submit_request = true;
637 	if (spdk_likely(bdev_ch->flags == 0)) {
638 		if (spdk_likely(TAILQ_EMPTY(&bdev_ch->nomem_io))) {
639 			bdev->fn_table->submit_request(ch, bdev_io);
640 		} else {
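			/*
			 * Older I/O is already queued waiting for resources; queue this one
			 *  behind it to preserve submission order.  It was never handed to
			 *  the module, so undo the io_outstanding increment above.
			 */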
641 			bdev_ch->io_outstanding--;
642 			TAILQ_INSERT_TAIL(&bdev_ch->nomem_io, bdev_io, link);
643 		}
644 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
645 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
646 	} else {
647 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
648 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
649 	}
650 	bdev_io->in_submit_request = false;
651 }
652 
653 static void
654 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
655 {
656 	struct spdk_bdev *bdev = bdev_io->bdev;
657 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
658 	struct spdk_io_channel *ch = bdev_ch->channel;
659 
660 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
661 
662 	bdev_io->in_submit_request = true;
663 	bdev->fn_table->submit_request(ch, bdev_io);
664 	bdev_io->in_submit_request = false;
665 }
666 
667 static void
668 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
669 		  struct spdk_bdev *bdev, void *cb_arg,
670 		  spdk_bdev_io_completion_cb cb)
671 {
672 	bdev_io->bdev = bdev;
673 	bdev_io->caller_ctx = cb_arg;
674 	bdev_io->cb = cb;
675 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
676 	bdev_io->in_submit_request = false;
677 }
678 
679 bool
680 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
681 {
682 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
683 }
684 
685 int
686 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
687 {
688 	if (bdev->fn_table->dump_config_json) {
689 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
690 	}
691 
692 	return 0;
693 }
694 
695 static int
696 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
697 {
698 	struct spdk_bdev		*bdev = io_device;
699 	struct spdk_bdev_channel	*ch = ctx_buf;
700 
701 	ch->bdev = io_device;
702 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
703 	if (!ch->channel) {
704 		return -1;
705 	}
706 
707 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
708 	if (!ch->mgmt_channel) {
709 		spdk_put_io_channel(ch->channel);
710 		return -1;
711 	}
712 
713 	memset(&ch->stat, 0, sizeof(ch->stat));
714 	ch->io_outstanding = 0;
715 	TAILQ_INIT(&ch->queued_resets);
716 	TAILQ_INIT(&ch->nomem_io);
717 	ch->nomem_threshold = 0;
718 	ch->flags = 0;
719 
720 #ifdef SPDK_CONFIG_VTUNE
721 	{
722 		char *name;
723 		__itt_init_ittlib(NULL, 0);
724 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
725 		if (!name) {
726 			spdk_put_io_channel(ch->channel);
727 			spdk_put_io_channel(ch->mgmt_channel);
728 			return -1;
729 		}
730 		ch->handle = __itt_string_handle_create(name);
731 		free(name);
732 		ch->start_tsc = spdk_get_ticks();
733 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
734 	}
735 #endif
736 
737 	return 0;
738 }
739 
740 /*
741  * Abort I/O that are waiting on a data buffer.  These types of I/O are
742  *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
743  */
744 static void
745 _spdk_bdev_abort_buf_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
746 {
747 	struct spdk_bdev_io *bdev_io, *tmp;
748 
749 	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
750 		if (bdev_io->ch == ch) {
751 			TAILQ_REMOVE(queue, bdev_io, buf_link);
752 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
753 		}
754 	}
755 }
756 
757 /*
758  * Abort I/O that are queued waiting for submission.  These types of I/O are
759  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
760  */
761 static void
762 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
763 {
764 	struct spdk_bdev_io *bdev_io, *tmp;
765 
766 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
767 		if (bdev_io->ch == ch) {
768 			TAILQ_REMOVE(queue, bdev_io, link);
769 			/*
770 			 * spdk_bdev_io_complete() assumes that the completed I/O had
771 			 *  been submitted to the bdev module.  Since in this case it
772 			 *  hadn't, bump io_outstanding to account for the decrement
773 			 *  that spdk_bdev_io_complete() will do.
774 			 */
775 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
776 				ch->io_outstanding++;
777 			}
778 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
779 		}
780 	}
781 }
782 
783 static void
784 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
785 {
786 	struct spdk_bdev_channel	*ch = ctx_buf;
787 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
788 
789 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
790 
791 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
792 	_spdk_bdev_abort_queued_io(&ch->nomem_io, ch);
793 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
794 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
795 
796 	spdk_put_io_channel(ch->channel);
797 	spdk_put_io_channel(ch->mgmt_channel);
798 	assert(ch->io_outstanding == 0);
799 }
800 
801 struct spdk_io_channel *
802 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
803 {
804 	return spdk_get_io_channel(desc->bdev);
805 }
806 
807 const char *
808 spdk_bdev_get_name(const struct spdk_bdev *bdev)
809 {
810 	return bdev->name;
811 }
812 
813 const char *
814 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
815 {
816 	return bdev->product_name;
817 }
818 
819 uint32_t
820 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
821 {
822 	return bdev->blocklen;
823 }
824 
825 uint64_t
826 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
827 {
828 	return bdev->blockcnt;
829 }
830 
831 size_t
832 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
833 {
834 	/* TODO: push this logic down to the bdev modules */
835 	if (bdev->need_aligned_buffer) {
836 		return bdev->blocklen;
837 	}
838 
839 	return 1;
840 }
841 
842 uint32_t
843 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
844 {
845 	return bdev->optimal_io_boundary;
846 }
847 
848 bool
849 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
850 {
851 	return bdev->write_cache;
852 }
853 
854 /*
855  * Convert I/O offset and length from bytes to blocks.
856  *
857  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
858  */
859 static uint64_t
860 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
861 			  uint64_t num_bytes, uint64_t *num_blocks)
862 {
863 	uint32_t block_size = bdev->blocklen;
864 
865 	*offset_blocks = offset_bytes / block_size;
866 	*num_blocks = num_bytes / block_size;
867 
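	/* Non-zero iff either the offset or the length is not block aligned. */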
868 	return (offset_bytes % block_size) | (num_bytes % block_size);
869 }
870 
871 static bool
872 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
873 {
874 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
875 	 * that the 64-bit sum overflowed and wrapped around. */
876 	if (offset_blocks + num_blocks < offset_blocks) {
877 		return false;
878 	}
879 
880 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
881 	if (offset_blocks + num_blocks > bdev->blockcnt) {
882 		return false;
883 	}
884 
885 	return true;
886 }
887 
888 int
889 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
890 	       void *buf, uint64_t offset, uint64_t nbytes,
891 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
892 {
893 	uint64_t offset_blocks, num_blocks;
894 
895 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
896 		return -EINVAL;
897 	}
898 
899 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
900 }
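/*
 * Illustrative caller sketch (not part of this file): open a bdev read-only,
 *  get an I/O channel on the current thread, and read the first 4 KiB.  This
 *  assumes the bdev layer is already initialized; the names "Nvme0n1",
 *  read_done and my_ctx are hypothetical.
 *
 *	static void
 *	read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		// consume the data on success, then release the I/O
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Nvme0n1");
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *ch;
 *	void *buf = spdk_dma_zmalloc(4096, spdk_bdev_get_buf_align(bdev), NULL);
 *
 *	if (spdk_bdev_open(bdev, false, NULL, NULL, &desc) == 0) {
 *		ch = spdk_bdev_get_io_channel(desc);
 *		spdk_bdev_read(desc, ch, buf, 0, 4096, read_done, my_ctx);
 *	}
 */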
901 
902 int
903 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
904 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
905 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
906 {
907 	struct spdk_bdev *bdev = desc->bdev;
908 	struct spdk_bdev_io *bdev_io;
909 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
910 
911 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
912 		return -EINVAL;
913 	}
914 
915 	bdev_io = spdk_bdev_get_io();
916 	if (!bdev_io) {
917 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
918 		return -ENOMEM;
919 	}
920 
921 	bdev_io->ch = channel;
922 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
923 	bdev_io->u.bdev.iov.iov_base = buf;
924 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
925 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
926 	bdev_io->u.bdev.iovcnt = 1;
927 	bdev_io->u.bdev.num_blocks = num_blocks;
928 	bdev_io->u.bdev.offset_blocks = offset_blocks;
929 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
930 
931 	spdk_bdev_io_submit(bdev_io);
932 	return 0;
933 }
934 
935 int
936 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
937 		struct iovec *iov, int iovcnt,
938 		uint64_t offset, uint64_t nbytes,
939 		spdk_bdev_io_completion_cb cb, void *cb_arg)
940 {
941 	uint64_t offset_blocks, num_blocks;
942 
943 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
944 		return -EINVAL;
945 	}
946 
947 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
948 }
949 
950 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
951 			   struct iovec *iov, int iovcnt,
952 			   uint64_t offset_blocks, uint64_t num_blocks,
953 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
954 {
955 	struct spdk_bdev *bdev = desc->bdev;
956 	struct spdk_bdev_io *bdev_io;
957 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
958 
959 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
960 		return -EINVAL;
961 	}
962 
963 	bdev_io = spdk_bdev_get_io();
964 	if (!bdev_io) {
965 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
966 		return -ENOMEM;
967 	}
968 
969 	bdev_io->ch = channel;
970 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
971 	bdev_io->u.bdev.iovs = iov;
972 	bdev_io->u.bdev.iovcnt = iovcnt;
973 	bdev_io->u.bdev.num_blocks = num_blocks;
974 	bdev_io->u.bdev.offset_blocks = offset_blocks;
975 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
976 
977 	spdk_bdev_io_submit(bdev_io);
978 	return 0;
979 }
980 
981 int
982 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
983 		void *buf, uint64_t offset, uint64_t nbytes,
984 		spdk_bdev_io_completion_cb cb, void *cb_arg)
985 {
986 	uint64_t offset_blocks, num_blocks;
987 
988 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
989 		return -EINVAL;
990 	}
991 
992 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
993 }
994 
995 int
996 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
997 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
998 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
999 {
1000 	struct spdk_bdev *bdev = desc->bdev;
1001 	struct spdk_bdev_io *bdev_io;
1002 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1003 
1004 	if (!desc->write) {
1005 		return -EBADF;
1006 	}
1007 
1008 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1009 		return -EINVAL;
1010 	}
1011 
1012 	bdev_io = spdk_bdev_get_io();
1013 	if (!bdev_io) {
1014 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1015 		return -ENOMEM;
1016 	}
1017 
1018 	bdev_io->ch = channel;
1019 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1020 	bdev_io->u.bdev.iov.iov_base = buf;
1021 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1022 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1023 	bdev_io->u.bdev.iovcnt = 1;
1024 	bdev_io->u.bdev.num_blocks = num_blocks;
1025 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1026 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1027 
1028 	spdk_bdev_io_submit(bdev_io);
1029 	return 0;
1030 }
1031 
1032 int
1033 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1034 		 struct iovec *iov, int iovcnt,
1035 		 uint64_t offset, uint64_t len,
1036 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1037 {
1038 	uint64_t offset_blocks, num_blocks;
1039 
1040 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1041 		return -EINVAL;
1042 	}
1043 
1044 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1045 }
1046 
1047 int
1048 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1049 			struct iovec *iov, int iovcnt,
1050 			uint64_t offset_blocks, uint64_t num_blocks,
1051 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1052 {
1053 	struct spdk_bdev *bdev = desc->bdev;
1054 	struct spdk_bdev_io *bdev_io;
1055 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1056 
1057 	if (!desc->write) {
1058 		return -EBADF;
1059 	}
1060 
1061 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1062 		return -EINVAL;
1063 	}
1064 
1065 	bdev_io = spdk_bdev_get_io();
1066 	if (!bdev_io) {
1067 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1068 		return -ENOMEM;
1069 	}
1070 
1071 	bdev_io->ch = channel;
1072 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1073 	bdev_io->u.bdev.iovs = iov;
1074 	bdev_io->u.bdev.iovcnt = iovcnt;
1075 	bdev_io->u.bdev.num_blocks = num_blocks;
1076 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1077 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1078 
1079 	spdk_bdev_io_submit(bdev_io);
1080 	return 0;
1081 }
1082 
1083 int
1084 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1085 		       uint64_t offset, uint64_t len,
1086 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1087 {
1088 	uint64_t offset_blocks, num_blocks;
1089 
1090 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1091 		return -EINVAL;
1092 	}
1093 
1094 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1095 }
1096 
1097 int
1098 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1099 			      uint64_t offset_blocks, uint64_t num_blocks,
1100 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1101 {
1102 	struct spdk_bdev *bdev = desc->bdev;
1103 	struct spdk_bdev_io *bdev_io;
1104 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1105 	uint64_t len;
1106 	bool split_request = false;
1107 
1108 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1109 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1110 		return -ERANGE;
1111 	}
1112 
1113 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1114 		return -EINVAL;
1115 	}
1116 
1117 	bdev_io = spdk_bdev_get_io();
1118 
1119 	if (!bdev_io) {
1120 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1121 		return -ENOMEM;
1122 	}
1123 
1124 	bdev_io->ch = channel;
1125 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1126 
1127 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1128 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1129 		bdev_io->u.bdev.num_blocks = num_blocks;
1130 		bdev_io->u.bdev.iovs = NULL;
1131 		bdev_io->u.bdev.iovcnt = 0;
1132 
1133 	} else {
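		/*
		 * The module does not support WRITE_ZEROES natively, so emulate it by
		 *  writing from the shared zero buffer.  A request larger than
		 *  ZERO_BUFFER_SIZE is issued in chunks: the first chunk is submitted
		 *  here, with spdk_bdev_write_zeroes_split() installed as its
		 *  completion callback to drive the remainder.
		 */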
1134 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1135 
1136 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1137 
1138 		if (len > ZERO_BUFFER_SIZE) {
1139 			split_request = true;
1140 			len = ZERO_BUFFER_SIZE;
1141 		}
1142 
1143 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1144 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1145 		bdev_io->u.bdev.iov.iov_len = len;
1146 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1147 		bdev_io->u.bdev.iovcnt = 1;
1148 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1149 		bdev_io->split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1150 		bdev_io->split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1151 	}
1152 
1153 	if (split_request) {
1154 		bdev_io->stored_user_cb = cb;
1155 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1156 	} else {
1157 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1158 	}
1159 	spdk_bdev_io_submit(bdev_io);
1160 	return 0;
1161 }
1162 
1163 int
1164 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1165 		uint64_t offset, uint64_t nbytes,
1166 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1167 {
1168 	uint64_t offset_blocks, num_blocks;
1169 
1170 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1171 		return -EINVAL;
1172 	}
1173 
1174 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1175 }
1176 
1177 int
1178 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1179 		       uint64_t offset_blocks, uint64_t num_blocks,
1180 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1181 {
1182 	struct spdk_bdev *bdev = desc->bdev;
1183 	struct spdk_bdev_io *bdev_io;
1184 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1185 
1186 	if (!desc->write) {
1187 		return -EBADF;
1188 	}
1189 
1190 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1191 		return -EINVAL;
1192 	}
1193 
1194 	if (num_blocks == 0) {
1195 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1196 		return -EINVAL;
1197 	}
1198 
1199 	bdev_io = spdk_bdev_get_io();
1200 	if (!bdev_io) {
1201 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1202 		return -ENOMEM;
1203 	}
1204 
1205 	bdev_io->ch = channel;
1206 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1207 	bdev_io->u.bdev.iov.iov_base = NULL;
1208 	bdev_io->u.bdev.iov.iov_len = 0;
1209 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1210 	bdev_io->u.bdev.iovcnt = 1;
1211 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1212 	bdev_io->u.bdev.num_blocks = num_blocks;
1213 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1214 
1215 	spdk_bdev_io_submit(bdev_io);
1216 	return 0;
1217 }
1218 
1219 int
1220 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1221 		uint64_t offset, uint64_t length,
1222 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1223 {
1224 	uint64_t offset_blocks, num_blocks;
1225 
1226 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1227 		return -EINVAL;
1228 	}
1229 
1230 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1231 }
1232 
1233 int
1234 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1235 		       uint64_t offset_blocks, uint64_t num_blocks,
1236 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1237 {
1238 	struct spdk_bdev *bdev = desc->bdev;
1239 	struct spdk_bdev_io *bdev_io;
1240 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1241 
1242 	if (!desc->write) {
1243 		return -EBADF;
1244 	}
1245 
1246 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1247 		return -EINVAL;
1248 	}
1249 
1250 	bdev_io = spdk_bdev_get_io();
1251 	if (!bdev_io) {
1252 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1253 		return -ENOMEM;
1254 	}
1255 
1256 	bdev_io->ch = channel;
1257 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1258 	bdev_io->u.bdev.iovs = NULL;
1259 	bdev_io->u.bdev.iovcnt = 0;
1260 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1261 	bdev_io->u.bdev.num_blocks = num_blocks;
1262 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1263 
1264 	spdk_bdev_io_submit(bdev_io);
1265 	return 0;
1266 }
1267 
1268 static void
1269 _spdk_bdev_reset_dev(void *io_device, void *ctx)
1270 {
1271 	struct spdk_bdev_channel *ch = ctx;
1272 	struct spdk_bdev_io *bdev_io;
1273 
1274 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1275 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1276 	spdk_bdev_io_submit_reset(bdev_io);
1277 }
1278 
1279 static void
1280 _spdk_bdev_reset_abort_channel(void *io_device, struct spdk_io_channel *ch,
1281 			       void *ctx)
1282 {
1283 	struct spdk_bdev_channel	*channel;
1284 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1285 
1286 	channel = spdk_io_channel_get_ctx(ch);
1287 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1288 
1289 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1290 
1291 	_spdk_bdev_abort_queued_io(&channel->nomem_io, channel);
1292 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1293 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1294 }
1295 
1296 static void
1297 _spdk_bdev_start_reset(void *ctx)
1298 {
1299 	struct spdk_bdev_channel *ch = ctx;
1300 
1301 	spdk_for_each_channel(ch->bdev, _spdk_bdev_reset_abort_channel,
1302 			      ch, _spdk_bdev_reset_dev);
1303 }
1304 
1305 static void
1306 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1307 {
1308 	struct spdk_bdev *bdev = ch->bdev;
1309 
1310 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1311 
1312 	pthread_mutex_lock(&bdev->mutex);
1313 	if (bdev->reset_in_progress == NULL) {
1314 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1315 		/*
1316 		 * Take a channel reference for the target bdev for the life of this
1317 		 *  reset.  This guards against the channel getting destroyed while
1318 		 *  spdk_for_each_channel() calls related to this reset IO are in
1319 		 *  progress.  We will release the reference when this reset is
1320 		 *  completed.
1321 		 */
1322 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(bdev);
1323 		_spdk_bdev_start_reset(ch);
1324 	}
1325 	pthread_mutex_unlock(&bdev->mutex);
1326 }
1327 
1328 static void
1329 _spdk_bdev_complete_reset_channel(void *io_device, struct spdk_io_channel *_ch, void *ctx)
1330 {
1331 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1332 
1333 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
1334 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
1335 		_spdk_bdev_channel_start_reset(ch);
1336 	}
1337 }
1338 
1339 int
1340 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1341 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1342 {
1343 	struct spdk_bdev *bdev = desc->bdev;
1344 	struct spdk_bdev_io *bdev_io;
1345 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1346 
1347 	bdev_io = spdk_bdev_get_io();
1348 	if (!bdev_io) {
1349 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1350 		return -ENOMEM;
1351 	}
1352 
1353 	bdev_io->ch = channel;
1354 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1355 	bdev_io->u.reset.ch_ref = NULL;
1356 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1357 
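	/*
	 * Queue the reset on this channel.  Only one reset runs against a bdev at a
	 *  time: _spdk_bdev_channel_start_reset() submits it only if no other reset
	 *  is in progress; otherwise it is started later from
	 *  _spdk_bdev_complete_reset_channel().
	 */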
1358 	pthread_mutex_lock(&bdev->mutex);
1359 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1360 	pthread_mutex_unlock(&bdev->mutex);
1361 
1362 	_spdk_bdev_channel_start_reset(channel);
1363 
1364 	return 0;
1365 }
1366 
1367 void
1368 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1369 		      struct spdk_bdev_io_stat *stat)
1370 {
1371 #ifdef SPDK_CONFIG_VTUNE
1372 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1373 	memset(stat, 0, sizeof(*stat));
1374 	return;
1375 #endif
1376 
1377 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1378 
1379 	*stat = channel->stat;
1380 	memset(&channel->stat, 0, sizeof(channel->stat));
1381 }
1382 
1383 int
1384 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1385 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1386 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1387 {
1388 	struct spdk_bdev *bdev = desc->bdev;
1389 	struct spdk_bdev_io *bdev_io;
1390 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1391 
1392 	if (!desc->write) {
1393 		return -EBADF;
1394 	}
1395 
1396 	bdev_io = spdk_bdev_get_io();
1397 	if (!bdev_io) {
1398 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1399 		return -ENOMEM;
1400 	}
1401 
1402 	bdev_io->ch = channel;
1403 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1404 	bdev_io->u.nvme_passthru.cmd = *cmd;
1405 	bdev_io->u.nvme_passthru.buf = buf;
1406 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1407 
1408 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1409 
1410 	spdk_bdev_io_submit(bdev_io);
1411 	return 0;
1412 }
1413 
1414 int
1415 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1416 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1417 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1418 {
1419 	struct spdk_bdev *bdev = desc->bdev;
1420 	struct spdk_bdev_io *bdev_io;
1421 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1422 
1423 	if (!desc->write) {
1424 		/*
1425 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1426 		 *  to easily determine if the command is a read or write, but for now just
1427 		 *  do not allow io_passthru with a read-only descriptor.
1428 		 */
1429 		return -EBADF;
1430 	}
1431 
1432 	bdev_io = spdk_bdev_get_io();
1433 	if (!bdev_io) {
1434 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
1435 		return -ENOMEM;
1436 	}
1437 
1438 	bdev_io->ch = channel;
1439 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1440 	bdev_io->u.nvme_passthru.cmd = *cmd;
1441 	bdev_io->u.nvme_passthru.buf = buf;
1442 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1443 
1444 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1445 
1446 	spdk_bdev_io_submit(bdev_io);
1447 	return 0;
1448 }
1449 
1450 int
1451 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1452 {
1453 	if (!bdev_io) {
1454 		SPDK_ERRLOG("bdev_io is NULL\n");
1455 		return -1;
1456 	}
1457 
1458 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1459 		SPDK_ERRLOG("bdev_io is in pending state\n");
1460 		assert(false);
1461 		return -1;
1462 	}
1463 
1464 	spdk_bdev_put_io(bdev_io);
1465 
1466 	return 0;
1467 }
1468 
1469 static void
1470 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
1471 {
1472 	struct spdk_bdev *bdev = bdev_ch->bdev;
1473 	struct spdk_bdev_io *bdev_io;
1474 
1475 	if (bdev_ch->io_outstanding > bdev_ch->nomem_threshold) {
1476 		/*
1477 		 * Allow some more I/O to complete before retrying the nomem_io queue.
1478 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
1479 		 *  the context of a completion, because the resources for the I/O are
1480 		 *  not released until control returns to the bdev poller.  Also, we
1481 		 *  may require several small I/O to complete before a larger I/O
1482 		 *  (that requires splitting) can be submitted.
1483 		 */
1484 		return;
1485 	}
1486 
1487 	while (!TAILQ_EMPTY(&bdev_ch->nomem_io)) {
1488 		bdev_io = TAILQ_FIRST(&bdev_ch->nomem_io);
1489 		TAILQ_REMOVE(&bdev_ch->nomem_io, bdev_io, link);
1490 		bdev_ch->io_outstanding++;
1491 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
1492 		bdev->fn_table->submit_request(bdev_ch->channel, bdev_io);
1493 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
1494 			break;
1495 		}
1496 	}
1497 }
1498 
1499 static void
1500 _spdk_bdev_io_complete(void *ctx)
1501 {
1502 	struct spdk_bdev_io *bdev_io = ctx;
1503 
1504 	assert(bdev_io->cb != NULL);
1505 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1506 }
1507 
1508 void
1509 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1510 {
1511 	struct spdk_bdev *bdev = bdev_io->bdev;
1512 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
1513 
1514 	bdev_io->status = status;
1515 
1516 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
1517 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
1518 			SPDK_ERRLOG("NOMEM returned for reset\n");
1519 		}
1520 		pthread_mutex_lock(&bdev->mutex);
1521 		if (bdev_io == bdev->reset_in_progress) {
1522 			bdev->reset_in_progress = NULL;
1523 		}
1524 		pthread_mutex_unlock(&bdev->mutex);
1525 		if (bdev_io->u.reset.ch_ref != NULL) {
1526 			spdk_put_io_channel(bdev_io->u.reset.ch_ref);
1527 		}
1528 		spdk_for_each_channel(bdev, _spdk_bdev_complete_reset_channel, NULL, NULL);
1529 	} else {
1530 		assert(bdev_ch->io_outstanding > 0);
1531 		bdev_ch->io_outstanding--;
1532 		if (spdk_likely(status != SPDK_BDEV_IO_STATUS_NOMEM)) {
1533 			if (spdk_unlikely(!TAILQ_EMPTY(&bdev_ch->nomem_io))) {
1534 				_spdk_bdev_ch_retry_io(bdev_ch);
1535 			}
1536 		} else {
1537 			TAILQ_INSERT_HEAD(&bdev_ch->nomem_io, bdev_io, link);
1538 			/*
1539 			 * Wait for some of the outstanding I/O to complete before we
1540 			 *  retry any of the nomem_io.  Normally we will wait for
1541 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
1542 			 *  depth channels we will instead wait for half to complete.
1543 			 */
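			/* e.g. 64 outstanding: max(32, 56) = 56, so wait for 8 completions;
			 *  10 outstanding: max(5, 2) = 5, so wait for half of them. */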
1544 			bdev_ch->nomem_threshold = spdk_max(bdev_ch->io_outstanding / 2,
1545 							    bdev_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
1546 			return;
1547 		}
1548 	}
1549 
1550 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1551 		switch (bdev_io->type) {
1552 		case SPDK_BDEV_IO_TYPE_READ:
1553 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1554 			bdev_ch->stat.num_read_ops++;
1555 			break;
1556 		case SPDK_BDEV_IO_TYPE_WRITE:
1557 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1558 			bdev_ch->stat.num_write_ops++;
1559 			break;
1560 		default:
1561 			break;
1562 		}
1563 	}
1564 
1565 #ifdef SPDK_CONFIG_VTUNE
1566 	uint64_t now_tsc = spdk_get_ticks();
1567 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
1568 		uint64_t data[5];
1569 
1570 		data[0] = bdev_ch->stat.num_read_ops;
1571 		data[1] = bdev_ch->stat.bytes_read;
1572 		data[2] = bdev_ch->stat.num_write_ops;
1573 		data[3] = bdev_ch->stat.bytes_written;
1574 		data[4] = bdev->fn_table->get_spin_time ?
1575 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
1576 
1577 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
1578 				   __itt_metadata_u64, 5, data);
1579 
1580 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
1581 		bdev_ch->start_tsc = now_tsc;
1582 	}
1583 #endif
1584 
1585 	if (bdev_io->in_submit_request || bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
1586 		/*
1587 		 * Defer completion to avoid potential infinite recursion if the
1588 		 * user's completion callback issues a new I/O.
1589 		 */
1590 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
1591 				     _spdk_bdev_io_complete, bdev_io);
1592 	} else {
1593 		_spdk_bdev_io_complete(bdev_io);
1594 	}
1595 }
1596 
1597 void
1598 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
1599 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
1600 {
1601 	if (sc == SPDK_SCSI_STATUS_GOOD) {
1602 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1603 	} else {
1604 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
1605 		bdev_io->error.scsi.sc = sc;
1606 		bdev_io->error.scsi.sk = sk;
1607 		bdev_io->error.scsi.asc = asc;
1608 		bdev_io->error.scsi.ascq = ascq;
1609 	}
1610 
1611 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1612 }
1613 
1614 void
1615 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
1616 			     int *sc, int *sk, int *asc, int *ascq)
1617 {
1618 	assert(sc != NULL);
1619 	assert(sk != NULL);
1620 	assert(asc != NULL);
1621 	assert(ascq != NULL);
1622 
1623 	switch (bdev_io->status) {
1624 	case SPDK_BDEV_IO_STATUS_SUCCESS:
1625 		*sc = SPDK_SCSI_STATUS_GOOD;
1626 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
1627 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1628 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1629 		break;
1630 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
1631 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
1632 		break;
1633 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
1634 		*sc = bdev_io->error.scsi.sc;
1635 		*sk = bdev_io->error.scsi.sk;
1636 		*asc = bdev_io->error.scsi.asc;
1637 		*ascq = bdev_io->error.scsi.ascq;
1638 		break;
1639 	default:
1640 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
1641 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
1642 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1643 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1644 		break;
1645 	}
1646 }
1647 
1648 void
1649 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
1650 {
1651 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
1652 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1653 	} else {
1654 		bdev_io->error.nvme.sct = sct;
1655 		bdev_io->error.nvme.sc = sc;
1656 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
1657 	}
1658 
1659 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1660 }
1661 
1662 void
1663 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
1664 {
1665 	assert(sct != NULL);
1666 	assert(sc != NULL);
1667 
1668 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
1669 		*sct = bdev_io->error.nvme.sct;
1670 		*sc = bdev_io->error.nvme.sc;
1671 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1672 		*sct = SPDK_NVME_SCT_GENERIC;
1673 		*sc = SPDK_NVME_SC_SUCCESS;
1674 	} else {
1675 		*sct = SPDK_NVME_SCT_GENERIC;
1676 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1677 	}
1678 }
1679 
1680 struct spdk_thread *
1681 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
1682 {
1683 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
1684 }
1685 
1686 static void
1687 _spdk_bdev_register(struct spdk_bdev *bdev)
1688 {
1689 	struct spdk_bdev_module_if *module;
1690 
1691 	assert(bdev->module != NULL);
1692 
1693 	bdev->status = SPDK_BDEV_STATUS_READY;
1694 
1695 	TAILQ_INIT(&bdev->open_descs);
1696 
1697 	TAILQ_INIT(&bdev->vbdevs);
1698 	TAILQ_INIT(&bdev->base_bdevs);
1699 
1700 	bdev->reset_in_progress = NULL;
1701 
1702 	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
1703 				sizeof(struct spdk_bdev_channel));
1704 
1705 	pthread_mutex_init(&bdev->mutex, NULL);
1706 	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Inserting bdev %s into list\n", bdev->name);
1707 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
1708 
1709 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
1710 		if (module->examine) {
1711 			module->action_in_progress++;
1712 			module->examine(bdev);
1713 		}
1714 	}
1715 }
1716 
1717 void
1718 spdk_bdev_register(struct spdk_bdev *bdev)
1719 {
1720 	_spdk_bdev_register(bdev);
1721 }
1722 
1723 void
1724 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
1725 {
1726 	int i;
1727 
1728 	_spdk_bdev_register(vbdev);
1729 	for (i = 0; i < base_bdev_count; i++) {
1730 		assert(base_bdevs[i] != NULL);
1731 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
1732 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
1733 	}
1734 }
1735 
1736 void
1737 spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
1738 {
1739 	if (bdev->unregister_cb != NULL) {
1740 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
1741 	}
1742 }
1743 
1744 void
1745 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
1746 {
1747 	struct spdk_bdev_desc	*desc, *tmp;
1748 	int			rc;
1749 	bool			do_destruct = true;
1750 
1751 	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Removing bdev %s from list\n", bdev->name);
1752 
1753 	pthread_mutex_lock(&bdev->mutex);
1754 
1755 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
1756 	bdev->unregister_cb = cb_fn;
1757 	bdev->unregister_ctx = cb_arg;
1758 
1759 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
1760 		if (desc->remove_cb) {
1761 			pthread_mutex_unlock(&bdev->mutex);
1762 			do_destruct = false;
1763 			desc->remove_cb(desc->remove_ctx);
1764 			pthread_mutex_lock(&bdev->mutex);
1765 		}
1766 	}
1767 
1768 	if (!do_destruct) {
1769 		pthread_mutex_unlock(&bdev->mutex);
1770 		return;
1771 	}
1772 
1773 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
1774 	pthread_mutex_unlock(&bdev->mutex);
1775 
1776 	pthread_mutex_destroy(&bdev->mutex);
1777 
1778 	spdk_io_device_unregister(bdev, NULL);
1779 
1780 	rc = bdev->fn_table->destruct(bdev->ctxt);
1781 	if (rc < 0) {
1782 		SPDK_ERRLOG("destruct failed\n");
1783 	}
1784 	if (rc <= 0 && cb_fn != NULL) {
1785 		cb_fn(cb_arg, rc);
1786 	}
1787 }
1788 
1789 void
1790 spdk_vbdev_unregister(struct spdk_bdev *vbdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
1791 {
1792 	struct spdk_bdev *base_bdev;
1793 
1794 	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
1795 	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
1796 		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
1797 	}
1798 	spdk_bdev_unregister(vbdev, cb_fn, cb_arg);
1799 }
1800 
1801 int
1802 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
1803 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
1804 {
1805 	struct spdk_bdev_desc *desc;
1806 
1807 	desc = calloc(1, sizeof(*desc));
1808 	if (desc == NULL) {
1809 		return -ENOMEM;
1810 	}
1811 
1812 	pthread_mutex_lock(&bdev->mutex);
1813 
1814 	if (write && bdev->claim_module) {
1815 		SPDK_ERRLOG("failed, %s already claimed\n", bdev->name);
1816 		free(desc);
1817 		pthread_mutex_unlock(&bdev->mutex);
1818 		return -EPERM;
1819 	}
1820 
1821 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
1822 
1823 	desc->bdev = bdev;
1824 	desc->remove_cb = remove_cb;
1825 	desc->remove_ctx = remove_ctx;
1826 	desc->write = write;
1827 	*_desc = desc;
1828 
1829 	pthread_mutex_unlock(&bdev->mutex);
1830 
1831 	return 0;
1832 }
1833 
1834 void
1835 spdk_bdev_close(struct spdk_bdev_desc *desc)
1836 {
1837 	struct spdk_bdev *bdev = desc->bdev;
1838 	bool do_unregister = false;
1839 
1840 	pthread_mutex_lock(&bdev->mutex);
1841 
1842 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
1843 	free(desc);
1844 
1845 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
1846 		do_unregister = true;
1847 	}
1848 	pthread_mutex_unlock(&bdev->mutex);
1849 
1850 	if (do_unregister == true) {
1851 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
1852 	}
1853 }
1854 
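/*
 * Illustrative sketch (not part of this file): a virtual bdev module typically
 *  opens its base bdev and then claims it so no other writer can open it.  The
 *  names my_hotremove_cb and g_my_module are hypothetical.
 *
 *	struct spdk_bdev_desc *desc;
 *
 *	if (spdk_bdev_open(base_bdev, false, my_hotremove_cb, NULL, &desc) == 0 &&
 *	    spdk_bdev_module_claim_bdev(base_bdev, desc, &g_my_module) == 0) {
 *		// base_bdev is now claimed; desc->write has been promoted to true
 *	}
 */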
1855 int
1856 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
1857 			    struct spdk_bdev_module_if *module)
1858 {
1859 	if (bdev->claim_module != NULL) {
1860 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
1861 			    bdev->claim_module->name);
1862 		return -EPERM;
1863 	}
1864 
1865 	if (desc && !desc->write) {
1866 		desc->write = true;
1867 	}
1868 
1869 	bdev->claim_module = module;
1870 	return 0;
1871 }
1872 
1873 void
1874 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
1875 {
1876 	assert(bdev->claim_module != NULL);
1877 	bdev->claim_module = NULL;
1878 }
1879 
1880 struct spdk_bdev *
1881 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
1882 {
1883 	return desc->bdev;
1884 }
1885 
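/*
 * Return the iovec array describing the data buffers of a read or write I/O.
 * For other I/O types *iovp is set to NULL and *iovcntp to 0; a NULL bdev_io
 * leaves the output parameters untouched.
 */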
1886 void
1887 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
1888 {
1889 	struct iovec *iovs;
1890 	int iovcnt;
1891 
1892 	if (bdev_io == NULL) {
1893 		return;
1894 	}
1895 
1896 	switch (bdev_io->type) {
1897 	case SPDK_BDEV_IO_TYPE_READ:
1901 	case SPDK_BDEV_IO_TYPE_WRITE:
1902 		iovs = bdev_io->u.bdev.iovs;
1903 		iovcnt = bdev_io->u.bdev.iovcnt;
1904 		break;
1905 	default:
1906 		iovs = NULL;
1907 		iovcnt = 0;
1908 		break;
1909 	}
1910 
1911 	if (iovp) {
1912 		*iovp = iovs;
1913 	}
1914 	if (iovcntp) {
1915 		*iovcntp = iovcnt;
1916 	}
1917 }
1918 
1919 void
1920 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
1921 {
1922 	/*
1923 	 * Modules with examine callbacks must be initialized first, so they are
1924 	 *  ready to handle examine callbacks from later modules that will
1925 	 *  register physical bdevs.
1926 	 */
1927 	if (bdev_module->examine != NULL) {
1928 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1929 	} else {
1930 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1931 	}
1932 }
1933 
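/*
 * Release a part base: close its descriptor on the underlying bdev (if still
 * open) and hand the structure back to the owning module's free callback.
 */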
1934 void
1935 spdk_bdev_part_base_free(struct spdk_bdev_part_base *base)
1936 {
1937 	if (base->desc) {
1938 		spdk_bdev_close(base->desc);
1939 		base->desc = NULL;
1940 	}
1941 	base->base_free_fn(base);
1942 }
1943 
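/*
 * Free one partition bdev. The per-part io_device is unregistered and the part
 * is unlinked from its base's tailq. When the last part drops its reference,
 * the claim on the base bdev is released and the base itself is freed.
 */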
1944 void
1945 spdk_bdev_part_free(struct spdk_bdev_part *part)
1946 {
1947 	struct spdk_bdev_part_base *base;
1948 
1949 	assert(part);
1950 	assert(part->base);
1951 
1952 	base = part->base;
1953 	spdk_io_device_unregister(&part->base, NULL);
1954 	TAILQ_REMOVE(base->tailq, part, tailq);
1955 	free(part->bdev.name);
1956 	free(part);
1957 
1958 	if (__sync_sub_and_fetch(&base->ref, 1) == 0) {
1959 		spdk_bdev_module_release_bdev(base->bdev);
1960 		spdk_bdev_part_base_free(base);
1961 	}
1962 }
1963 
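/* Free every partition currently linked on the given tailq. */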
1964 void
1965 spdk_bdev_part_tailq_fini(struct bdev_part_tailq *tailq)
1966 {
1967 	struct spdk_bdev_part *part, *tmp;
1968 
1969 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
1970 		spdk_bdev_part_free(part);
1971 	}
1972 }
1973 
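/*
 * Hot-remove handler for a base bdev: unregister every partition that was
 * built on top of it.
 */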
1974 void
1975 spdk_bdev_part_base_hotremove(struct spdk_bdev *base_bdev, struct bdev_part_tailq *tailq)
1976 {
1977 	struct spdk_bdev_part *part, *tmp;
1978 
1979 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
1980 		if (part->base->bdev == base_bdev) {
1981 			spdk_vbdev_unregister(&part->bdev, NULL, NULL);
1982 		}
1983 	}
1984 }
1985 
1986 static bool
1987 spdk_bdev_part_io_type_supported(void *_part, enum spdk_bdev_io_type io_type)
1988 {
1989 	struct spdk_bdev_part *part = _part;
1990 
1991 	return part->base->bdev->fn_table->io_type_supported(part->base->bdev, io_type);
1992 }
1993 
1994 static struct spdk_io_channel *
1995 spdk_bdev_part_get_io_channel(void *_part)
1996 {
1997 	struct spdk_bdev_part *part = _part;
1998 
1999 	return spdk_get_io_channel(&part->base);
2000 }
2001 
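/*
 * Completion callback for I/O forwarded to the base bdev: propagate the status
 * to the partition's I/O and free the base I/O.
 */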
2002 static void
2003 spdk_bdev_part_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2004 {
2005 	struct spdk_bdev_io *part_io = cb_arg;
2006 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
2007 
2008 	spdk_bdev_io_complete(part_io, status);
2009 	spdk_bdev_free_io(bdev_io);
2010 }
2011 
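/*
 * Completion callback used while splitting a large write_zeroes request into
 * ZERO_BUFFER_SIZE chunks. Each round zeroes the next chunk; once no blocks
 * remain, the original user callback is restored so the final completion is
 * reported to the caller.
 */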
2012 static void
2013 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2014 {
2015 	uint64_t len;
2016 
2017 	if (!success) {
2018 		bdev_io->cb = bdev_io->stored_user_cb;
2019 		_spdk_bdev_io_complete(bdev_io);
2020 		return;
2021 	}
2022 
2023 	/* No need to repeat the error checking from spdk_bdev_write_zeroes_blocks(); this request already passed those checks. */
2024 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->split_remaining_num_blocks,
2025 		       ZERO_BUFFER_SIZE);
2026 
2027 	bdev_io->u.bdev.offset_blocks = bdev_io->split_current_offset_blocks;
2028 	bdev_io->u.bdev.iov.iov_len = len;
2029 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2030 	bdev_io->split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2031 	bdev_io->split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2032 
2033 	/* If this round completes the I/O, restore the original user callback before resubmitting. */
2034 	if (bdev_io->split_remaining_num_blocks == 0) {
2035 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->stored_user_cb);
2036 	} else {
2037 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2038 	}
2039 	spdk_bdev_io_submit(bdev_io);
2040 }
2041 
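/*
 * Submit an I/O received on a partition to the underlying base bdev,
 * translating the LBA by the partition's offset_blocks where applicable.
 * Unsupported I/O types and submission failures are completed with
 * SPDK_BDEV_IO_STATUS_FAILED.
 */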
2042 void
2043 spdk_bdev_part_submit_request(struct spdk_bdev_part_channel *ch, struct spdk_bdev_io *bdev_io)
2044 {
2045 	struct spdk_bdev_part *part = ch->part;
2046 	struct spdk_io_channel *base_ch = ch->base_ch;
2047 	struct spdk_bdev_desc *base_desc = part->base->desc;
2048 	uint64_t offset;
2049 	int rc = 0;
2050 
2051 	/* Modify the I/O to adjust for the offset within the base bdev. */
2052 	switch (bdev_io->type) {
2053 	case SPDK_BDEV_IO_TYPE_READ:
2054 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2055 		rc = spdk_bdev_readv_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2056 					    bdev_io->u.bdev.iovcnt, offset,
2057 					    bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2058 					    bdev_io);
2059 		break;
2060 	case SPDK_BDEV_IO_TYPE_WRITE:
2061 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2062 		rc = spdk_bdev_writev_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2063 					     bdev_io->u.bdev.iovcnt, offset,
2064 					     bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2065 					     bdev_io);
2066 		break;
2067 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
2068 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2069 		rc = spdk_bdev_write_zeroes_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2070 						   spdk_bdev_part_complete_io, bdev_io);
2071 		break;
2072 	case SPDK_BDEV_IO_TYPE_UNMAP:
2073 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2074 		rc = spdk_bdev_unmap_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2075 					    spdk_bdev_part_complete_io, bdev_io);
2076 		break;
2077 	case SPDK_BDEV_IO_TYPE_FLUSH:
2078 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2079 		rc = spdk_bdev_flush_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2080 					    spdk_bdev_part_complete_io, bdev_io);
2081 		break;
2082 	case SPDK_BDEV_IO_TYPE_RESET:
2083 		rc = spdk_bdev_reset(base_desc, base_ch,
2084 				     spdk_bdev_part_complete_io, bdev_io);
2085 		break;
2086 	default:
2087 		SPDK_ERRLOG("unsupported I/O type %d\n", bdev_io->type);
2088 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2089 		return;
2090 	}
2091 
2092 	if (rc != 0) {
2093 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2094 	}
2095 }
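
/*
 * Per-thread channel constructor for a partition: acquire an I/O channel on
 * the base bdev and run the module's optional per-channel create callback.
 */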
2096 static int
2097 spdk_bdev_part_channel_create_cb(void *io_device, void *ctx_buf)
2098 {
2099 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2100 	struct spdk_bdev_part_channel *ch = ctx_buf;
2101 
2102 	ch->part = part;
2103 	ch->base_ch = spdk_bdev_get_io_channel(part->base->desc);
2104 	if (ch->base_ch == NULL) {
2105 		return -1;
2106 	}
2107 
2108 	if (part->base->ch_create_cb) {
2109 		return part->base->ch_create_cb(io_device, ctx_buf);
2110 	} else {
2111 		return 0;
2112 	}
2113 }
2114 
2115 static void
2116 spdk_bdev_part_channel_destroy_cb(void *io_device, void *ctx_buf)
2117 {
2118 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2119 	struct spdk_bdev_part_channel *ch = ctx_buf;
2120 
2121 	if (part->base->ch_destroy_cb) {
2122 		part->base->ch_destroy_cb(io_device, ctx_buf);
2123 	}
2124 	spdk_put_io_channel(ch->base_ch);
2125 }
2126 
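/*
 * Initialize a part base on top of an existing bdev. The supplied fn_table is
 * patched to route get_io_channel and io_type_supported through the part
 * helpers, and a read-only descriptor is opened on the base bdev. On failure
 * the base is released through spdk_bdev_part_base_free() and -1 is returned.
 */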
2127 int
2128 spdk_bdev_part_base_construct(struct spdk_bdev_part_base *base, struct spdk_bdev *bdev,
2129 			      spdk_bdev_remove_cb_t remove_cb, struct spdk_bdev_module_if *module,
2130 			      struct spdk_bdev_fn_table *fn_table, struct bdev_part_tailq *tailq,
2131 			      spdk_bdev_part_base_free_fn free_fn,
2132 			      uint32_t channel_size, spdk_io_channel_create_cb ch_create_cb,
2133 			      spdk_io_channel_destroy_cb ch_destroy_cb)
2134 {
2135 	int rc;
2136 
2137 	fn_table->get_io_channel = spdk_bdev_part_get_io_channel;
2138 	fn_table->io_type_supported = spdk_bdev_part_io_type_supported;
2139 
2140 	base->bdev = bdev;
2141 	base->desc = NULL;
2142 	base->ref = 0;
2143 	base->module = module;
2144 	base->fn_table = fn_table;
2145 	base->tailq = tailq;
2146 	base->claimed = false;
2147 	base->channel_size = channel_size;
2148 	base->ch_create_cb = ch_create_cb;
2149 	base->ch_destroy_cb = ch_destroy_cb;
2150 	base->base_free_fn = free_fn;
2151 
2152 	rc = spdk_bdev_open(bdev, false, remove_cb, bdev, &base->desc);
2153 	if (rc) {
2154 		spdk_bdev_part_base_free(base);
2155 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
2156 		return -1;
2157 	}
2158 
2159 	return 0;
2160 }
2161 
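/*
 * Construct one partition bdev exposing num_blocks blocks starting at
 * offset_blocks of the base bdev. name must be heap-allocated; it is freed on
 * failure and owned by the partition afterwards. The first partition claims
 * the base bdev for the owning module; every partition registers a per-part
 * io_device and a virtual bdev on top of the base.
 *
 * Minimal illustrative call (part, base and base_name are hypothetical; real
 * modules embed spdk_bdev_part in their own context structure):
 *
 *	rc = spdk_bdev_part_construct(part, base,
 *				      spdk_sprintf_alloc("%sp0", base_name),
 *				      0, 1024, "Example Partition Disk");
 */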
2162 int
2163 spdk_bdev_part_construct(struct spdk_bdev_part *part, struct spdk_bdev_part_base *base,
2164 			 char *name, uint64_t offset_blocks, uint64_t num_blocks,
2165 			 char *product_name)
2166 {
2167 	part->bdev.name = name;
2168 	part->bdev.blocklen = base->bdev->blocklen;
2169 	part->bdev.blockcnt = num_blocks;
2170 	part->offset_blocks = offset_blocks;
2171 
2172 	part->bdev.write_cache = base->bdev->write_cache;
2173 	part->bdev.need_aligned_buffer = base->bdev->need_aligned_buffer;
2174 	part->bdev.product_name = product_name;
2175 	part->bdev.ctxt = part;
2176 	part->bdev.module = base->module;
2177 	part->bdev.fn_table = base->fn_table;
2178 
2179 	__sync_fetch_and_add(&base->ref, 1);
2180 	part->base = base;
2181 
2182 	if (!base->claimed) {
2183 		int rc;
2184 
2185 		rc = spdk_bdev_module_claim_bdev(base->bdev, base->desc, base->module);
2186 		if (rc) {
2187 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(base->bdev));
2188 			free(part->bdev.name);
2189 			return -1;
2190 		}
2191 		base->claimed = true;
2192 	}
2193 
2194 	spdk_io_device_register(&part->base, spdk_bdev_part_channel_create_cb,
2195 				spdk_bdev_part_channel_destroy_cb,
2196 				base->channel_size);
2197 	spdk_vbdev_register(&part->bdev, &base->bdev, 1);
2198 	TAILQ_INSERT_TAIL(base->tailq, part, tailq);
2199 
2200 	return 0;
2201 }
2202 
2203 SPDK_LOG_REGISTER_TRACE_FLAG("bdev", SPDK_TRACE_BDEV)
2204