xref: /spdk/lib/bdev/bdev.c (revision 5e799ff470261620a1d3952cad88c003785108a2)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/io_channel.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/bdev.h"
49 #include "spdk_internal/log.h"
50 #include "spdk/string.h"
51 
52 #ifdef SPDK_CONFIG_VTUNE
53 #include "ittnotify.h"
54 #include "ittnotify_types.h"
55 int __itt_init_ittlib(const char *, __itt_group_id);
56 #endif
57 
/* Number of spdk_bdev_io elements in the global bdev_io mempool. */
#define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
/* Upper bound on spdk_bdev_io objects parked in one management channel's cache. */
#define SPDK_BDEV_IO_CACHE_SIZE	256
/* Element counts of the small and the large data buffer mempools. */
#define BUF_SMALL_POOL_SIZE	8192
#define BUF_LARGE_POOL_SIZE	1024
/* NOTE(review): not referenced in this part of the file; presumably tied to
 * spdk_bdev_channel.nomem_threshold - confirm against the rest of the file. */
#define NOMEM_THRESHOLD_COUNT	8
/* Byte size of the zero buffer; the same value is also passed as the second
 * argument of spdk_dma_zmalloc() when the buffer is allocated. */
#define ZERO_BUFFER_SIZE	0x100000

/* Queue head types for spdk_bdev_io: doubly linked (TAILQ) and singly linked (STAILQ). */
typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
typedef STAILQ_HEAD(, spdk_bdev_io) bdev_io_stailq_t;
67 
/* Global state of the bdev layer; the single instance is g_bdev_mgr. */
struct spdk_bdev_mgr {
	/* Pool that spdk_bdev_io objects are allocated from. */
	struct spdk_mempool *bdev_io_pool;

	/* Data buffer pools used by spdk_bdev_io_get_buf(), picked by request length. */
	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	/* ZERO_BUFFER_SIZE bytes obtained from spdk_dma_zmalloc(). */
	void *zero_buffer;

	/* List of bdev modules. */
	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;

	/* List of bdevs walked by spdk_bdev_first()/spdk_bdev_next(). */
	TAILQ_HEAD(, spdk_bdev) bdevs;

	/* Set to true by spdk_bdev_init_complete(). */
	bool init_complete;
	/* Set to true at the end of spdk_bdev_modules_init(). */
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	/* VTune domain created in spdk_bdev_initialize(). */
	__itt_domain	*domain;
#endif
};
87 
88 static struct spdk_bdev_mgr g_bdev_mgr = {
89 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
90 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
91 	.init_complete = false,
92 	.module_init_complete = false,
93 };
94 
95 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
96 static void			*g_init_cb_arg = NULL;
97 
98 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
99 static void			*g_fini_cb_arg = NULL;
100 static struct spdk_thread	*g_fini_thread = NULL;
101 
102 
103 struct spdk_bdev_mgmt_channel {
104 	bdev_io_stailq_t need_buf_small;
105 	bdev_io_stailq_t need_buf_large;
106 
107 	/*
108 	 * Each thread keeps a cache of bdev_io - this allows
109 	 *  bdev threads which are *not* DPDK threads to still
110 	 *  benefit from a per-thread bdev_io cache.  Without
111 	 *  this, non-DPDK threads fetching from the mempool
112 	 *  incur a cmpxchg on get and put.
113 	 */
114 	bdev_io_stailq_t per_thread_cache;
115 	uint32_t	per_thread_cache_count;
116 };
117 
118 struct spdk_bdev_desc {
119 	struct spdk_bdev		*bdev;
120 	spdk_bdev_remove_cb_t		remove_cb;
121 	void				*remove_ctx;
122 	bool				write;
123 	TAILQ_ENTRY(spdk_bdev_desc)	link;
124 };
125 
126 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
127 
128 struct spdk_bdev_channel {
129 	struct spdk_bdev	*bdev;
130 
131 	/* The channel for the underlying device */
132 	struct spdk_io_channel	*channel;
133 
134 	/* Channel for the bdev manager */
135 	struct spdk_io_channel *mgmt_channel;
136 
137 	struct spdk_bdev_io_stat stat;
138 
139 	/*
140 	 * Count of I/O submitted to bdev module and waiting for completion.
141 	 * Incremented before submit_request() is called on an spdk_bdev_io.
142 	 */
143 	uint64_t		io_outstanding;
144 
145 	bdev_io_tailq_t		queued_resets;
146 
147 	/*
148 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
149 	 *  on this channel.
150 	 */
151 	bdev_io_tailq_t		nomem_io;
152 
153 	/*
154 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
155 	 */
156 	uint64_t		nomem_threshold;
157 
158 	uint32_t		flags;
159 
160 #ifdef SPDK_CONFIG_VTUNE
161 	uint64_t		start_tsc;
162 	uint64_t		interval_tsc;
163 	__itt_string_handle	*handle;
164 #endif
165 
166 };
167 
/* Forward declaration; the definition appears later in this file. */
static void
spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success,
			     void *cb_arg);
169 
170 struct spdk_bdev *
171 spdk_bdev_first(void)
172 {
173 	struct spdk_bdev *bdev;
174 
175 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
176 	if (bdev) {
177 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
178 	}
179 
180 	return bdev;
181 }
182 
183 struct spdk_bdev *
184 spdk_bdev_next(struct spdk_bdev *prev)
185 {
186 	struct spdk_bdev *bdev;
187 
188 	bdev = TAILQ_NEXT(prev, link);
189 	if (bdev) {
190 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
191 	}
192 
193 	return bdev;
194 }
195 
196 static struct spdk_bdev *
197 _bdev_next_leaf(struct spdk_bdev *bdev)
198 {
199 	while (bdev != NULL) {
200 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
201 			return bdev;
202 		} else {
203 			bdev = TAILQ_NEXT(bdev, link);
204 		}
205 	}
206 
207 	return bdev;
208 }
209 
210 struct spdk_bdev *
211 spdk_bdev_first_leaf(void)
212 {
213 	struct spdk_bdev *bdev;
214 
215 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
216 
217 	if (bdev) {
218 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
219 	}
220 
221 	return bdev;
222 }
223 
224 struct spdk_bdev *
225 spdk_bdev_next_leaf(struct spdk_bdev *prev)
226 {
227 	struct spdk_bdev *bdev;
228 
229 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
230 
231 	if (bdev) {
232 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
233 	}
234 
235 	return bdev;
236 }
237 
238 struct spdk_bdev *
239 spdk_bdev_get_by_name(const char *bdev_name)
240 {
241 	struct spdk_bdev_alias *tmp;
242 	struct spdk_bdev *bdev = spdk_bdev_first();
243 
244 	while (bdev != NULL) {
245 		if (strcmp(bdev_name, bdev->name) == 0) {
246 			return bdev;
247 		}
248 
249 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
250 			if (strcmp(bdev_name, tmp->alias) == 0) {
251 				return bdev;
252 			}
253 		}
254 
255 		bdev = spdk_bdev_next(bdev);
256 	}
257 
258 	return NULL;
259 }
260 
261 static void
262 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
263 {
264 	assert(bdev_io->get_buf_cb != NULL);
265 	assert(buf != NULL);
266 	assert(bdev_io->u.bdev.iovs != NULL);
267 
268 	bdev_io->buf = buf;
269 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
270 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
271 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
272 }
273 
274 static void
275 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
276 {
277 	struct spdk_mempool *pool;
278 	struct spdk_bdev_io *tmp;
279 	void *buf;
280 	bdev_io_stailq_t *stailq;
281 	struct spdk_bdev_mgmt_channel *ch;
282 
283 	assert(bdev_io->u.bdev.iovcnt == 1);
284 
285 	buf = bdev_io->buf;
286 	ch = bdev_io->mgmt_ch;
287 
288 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
289 		pool = g_bdev_mgr.buf_small_pool;
290 		stailq = &ch->need_buf_small;
291 	} else {
292 		pool = g_bdev_mgr.buf_large_pool;
293 		stailq = &ch->need_buf_large;
294 	}
295 
296 	if (STAILQ_EMPTY(stailq)) {
297 		spdk_mempool_put(pool, buf);
298 	} else {
299 		tmp = STAILQ_FIRST(stailq);
300 		STAILQ_REMOVE_HEAD(stailq, buf_link);
301 		spdk_bdev_io_set_buf(tmp, buf);
302 	}
303 }
304 
305 void
306 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
307 {
308 	struct spdk_mempool *pool;
309 	bdev_io_stailq_t *stailq;
310 	void *buf = NULL;
311 	struct spdk_bdev_mgmt_channel *ch;
312 
313 	assert(cb != NULL);
314 	assert(bdev_io->u.bdev.iovs != NULL);
315 
316 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
317 		/* Buffer already present */
318 		cb(bdev_io->ch->channel, bdev_io);
319 		return;
320 	}
321 
322 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
323 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
324 
325 	bdev_io->buf_len = len;
326 	bdev_io->get_buf_cb = cb;
327 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
328 		pool = g_bdev_mgr.buf_small_pool;
329 		stailq = &ch->need_buf_small;
330 	} else {
331 		pool = g_bdev_mgr.buf_large_pool;
332 		stailq = &ch->need_buf_large;
333 	}
334 
335 	buf = spdk_mempool_get(pool);
336 
337 	if (!buf) {
338 		STAILQ_INSERT_TAIL(stailq, bdev_io, buf_link);
339 	} else {
340 		spdk_bdev_io_set_buf(bdev_io, buf);
341 	}
342 }
343 
344 static int
345 spdk_bdev_module_get_max_ctx_size(void)
346 {
347 	struct spdk_bdev_module_if *bdev_module;
348 	int max_bdev_module_size = 0;
349 
350 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
351 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
352 			max_bdev_module_size = bdev_module->get_ctx_size();
353 		}
354 	}
355 
356 	return max_bdev_module_size;
357 }
358 
359 void
360 spdk_bdev_config_text(FILE *fp)
361 {
362 	struct spdk_bdev_module_if *bdev_module;
363 
364 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
365 		if (bdev_module->config_text) {
366 			bdev_module->config_text(fp);
367 		}
368 	}
369 }
370 
371 static int
372 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
373 {
374 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
375 
376 	STAILQ_INIT(&ch->need_buf_small);
377 	STAILQ_INIT(&ch->need_buf_large);
378 
379 	STAILQ_INIT(&ch->per_thread_cache);
380 	ch->per_thread_cache_count = 0;
381 
382 	return 0;
383 }
384 
385 static void
386 spdk_bdev_mgmt_channel_free_resources(struct spdk_bdev_mgmt_channel *ch)
387 {
388 	struct spdk_bdev_io *bdev_io;
389 
390 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
391 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n");
392 	}
393 
394 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
395 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
396 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
397 		ch->per_thread_cache_count--;
398 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
399 	}
400 
401 	assert(ch->per_thread_cache_count == 0);
402 }
403 
/*
 * Channel-destroy callback for the bdev manager I/O device; releases the
 *  resources held by the management channel in ctx_buf.
 */
static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	spdk_bdev_mgmt_channel_free_resources(ctx_buf);
}
411 
412 static void
413 spdk_bdev_init_complete(int rc)
414 {
415 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
416 	void *cb_arg = g_init_cb_arg;
417 
418 	g_bdev_mgr.init_complete = true;
419 	g_init_cb_fn = NULL;
420 	g_init_cb_arg = NULL;
421 
422 	cb_fn(cb_arg, rc);
423 }
424 
425 static void
426 spdk_bdev_module_action_complete(void)
427 {
428 	struct spdk_bdev_module_if *m;
429 
430 	/*
431 	 * Don't finish bdev subsystem initialization if
432 	 * module pre-initialization is still in progress, or
433 	 * the subsystem been already initialized.
434 	 */
435 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
436 		return;
437 	}
438 
439 	/*
440 	 * Check all bdev modules for inits/examinations in progress. If any
441 	 * exist, return immediately since we cannot finish bdev subsystem
442 	 * initialization until all are completed.
443 	 */
444 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
445 		if (m->action_in_progress > 0) {
446 			return;
447 		}
448 	}
449 
450 	/*
451 	 * Modules already finished initialization - now that all
452 	 * the bdev modules have finished their asynchronous I/O
453 	 * processing, the entire bdev layer can be marked as complete.
454 	 */
455 	spdk_bdev_init_complete(0);
456 }
457 
458 static void
459 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
460 {
461 	assert(module->action_in_progress > 0);
462 	module->action_in_progress--;
463 	spdk_bdev_module_action_complete();
464 }
465 
/*
 * Called by a bdev module to report that an initialization action it had
 *  in progress is now done.
 */
void
spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
{
	spdk_bdev_module_action_done(module);
}
471 
/*
 * Called by a bdev module to report that an examine action it had in
 *  progress is now done.
 */
void
spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
{
	spdk_bdev_module_action_done(module);
}
477 
478 static int
479 spdk_bdev_modules_init(void)
480 {
481 	struct spdk_bdev_module_if *module;
482 	int rc = 0;
483 
484 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
485 		rc = module->module_init();
486 		if (rc != 0) {
487 			break;
488 		}
489 	}
490 
491 	g_bdev_mgr.module_init_complete = true;
492 	return rc;
493 }
494 void
495 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
496 {
497 	int cache_size;
498 	int rc = 0;
499 	char mempool_name[32];
500 
501 	assert(cb_fn != NULL);
502 
503 	g_init_cb_fn = cb_fn;
504 	g_init_cb_arg = cb_arg;
505 
506 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
507 
508 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
509 				  SPDK_BDEV_IO_POOL_SIZE,
510 				  sizeof(struct spdk_bdev_io) +
511 				  spdk_bdev_module_get_max_ctx_size(),
512 				  0,
513 				  SPDK_ENV_SOCKET_ID_ANY);
514 
515 	if (g_bdev_mgr.bdev_io_pool == NULL) {
516 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
517 		spdk_bdev_init_complete(-1);
518 		return;
519 	}
520 
521 	/**
522 	 * Ensure no more than half of the total buffers end up local caches, by
523 	 *   using spdk_env_get_core_count() to determine how many local caches we need
524 	 *   to account for.
525 	 */
526 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
527 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
528 
529 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
530 				    BUF_SMALL_POOL_SIZE,
531 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
532 				    cache_size,
533 				    SPDK_ENV_SOCKET_ID_ANY);
534 	if (!g_bdev_mgr.buf_small_pool) {
535 		SPDK_ERRLOG("create rbuf small pool failed\n");
536 		spdk_bdev_init_complete(-1);
537 		return;
538 	}
539 
540 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
541 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
542 
543 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
544 				    BUF_LARGE_POOL_SIZE,
545 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
546 				    cache_size,
547 				    SPDK_ENV_SOCKET_ID_ANY);
548 	if (!g_bdev_mgr.buf_large_pool) {
549 		SPDK_ERRLOG("create rbuf large pool failed\n");
550 		spdk_bdev_init_complete(-1);
551 		return;
552 	}
553 
554 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
555 				 NULL);
556 	if (!g_bdev_mgr.zero_buffer) {
557 		SPDK_ERRLOG("create bdev zero buffer failed\n");
558 		spdk_bdev_init_complete(-1);
559 		return;
560 	}
561 
562 #ifdef SPDK_CONFIG_VTUNE
563 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
564 #endif
565 
566 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
567 				spdk_bdev_mgmt_channel_destroy,
568 				sizeof(struct spdk_bdev_mgmt_channel));
569 
570 	rc = spdk_bdev_modules_init();
571 	if (rc != 0) {
572 		SPDK_ERRLOG("bdev modules init failed\n");
573 		spdk_bdev_init_complete(-1);
574 		return;
575 	}
576 
577 	spdk_bdev_module_action_complete();
578 }
579 
580 static void
581 spdk_bdev_module_finish_cb(void *io_device)
582 {
583 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
584 
585 	cb_fn(g_fini_cb_arg);
586 	g_fini_cb_fn = NULL;
587 	g_fini_cb_arg = NULL;
588 }
589 
590 static void
591 spdk_bdev_module_finish_complete(struct spdk_io_channel_iter *i, int status)
592 {
593 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
594 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
595 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
596 			    SPDK_BDEV_IO_POOL_SIZE);
597 	}
598 
599 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
600 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
601 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
602 			    BUF_SMALL_POOL_SIZE);
603 		assert(false);
604 	}
605 
606 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
607 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
608 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
609 			    BUF_LARGE_POOL_SIZE);
610 		assert(false);
611 	}
612 
613 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
614 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
615 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
616 	spdk_dma_free(g_bdev_mgr.zero_buffer);
617 
618 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb);
619 }
620 
/*
 * Per-channel step of the shutdown iteration: releases the resources of
 *  the management channel the iterator currently points at, then moves the
 *  iteration forward with status 0.
 */
static void
mgmt_channel_free_resources(struct spdk_io_channel_iter *i)
{
	struct spdk_bdev_mgmt_channel *mgmt_ch;

	mgmt_ch = spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));
	spdk_bdev_mgmt_channel_free_resources(mgmt_ch);

	spdk_for_each_channel_continue(i, 0);
}
630 
631 static void
632 spdk_bdev_module_finish_iter(void *arg)
633 {
634 	/* Notice that this variable is static. It is saved between calls to
635 	 * this function. */
636 	static struct spdk_bdev_module_if *resume_bdev_module = NULL;
637 	struct spdk_bdev_module_if *bdev_module;
638 
639 	/* Start iterating from the last touched module */
640 	if (!resume_bdev_module) {
641 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
642 	} else {
643 		bdev_module = TAILQ_NEXT(resume_bdev_module, tailq);
644 	}
645 
646 	while (bdev_module) {
647 		if (bdev_module->async_fini) {
648 			/* Save our place so we can resume later. We must
649 			 * save the variable here, before calling module_fini()
650 			 * below, because in some cases the module may immediately
651 			 * call spdk_bdev_module_finish_done() and re-enter
652 			 * this function to continue iterating. */
653 			resume_bdev_module = bdev_module;
654 		}
655 
656 		if (bdev_module->module_fini) {
657 			bdev_module->module_fini();
658 		}
659 
660 		if (bdev_module->async_fini) {
661 			return;
662 		}
663 
664 		bdev_module = TAILQ_NEXT(bdev_module, tailq);
665 	}
666 
667 	resume_bdev_module = NULL;
668 	spdk_for_each_channel(&g_bdev_mgr, mgmt_channel_free_resources, NULL,
669 			      spdk_bdev_module_finish_complete);
670 }
671 
672 void
673 spdk_bdev_module_finish_done(void)
674 {
675 	if (spdk_get_thread() != g_fini_thread) {
676 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
677 	} else {
678 		spdk_bdev_module_finish_iter(NULL);
679 	}
680 }
681 
682 static void
683 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
684 {
685 	struct spdk_bdev *bdev = cb_arg;
686 
687 	if (bdeverrno && bdev) {
688 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
689 			     bdev->name);
690 
691 		/*
692 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
693 		 *  bdev; try to continue by manually removing this bdev from the list and continue
694 		 *  with the next bdev in the list.
695 		 */
696 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
697 	}
698 
699 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
700 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
701 		spdk_bdev_module_finish_iter(NULL);
702 		return;
703 	}
704 
705 	/*
706 	 * Unregister the first bdev in the list.
707 	 *
708 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
709 	 *  calling the remove_cb of the descriptors first.
710 	 *
711 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
712 	 *  will be called again via the unregister completion callback to continue the cleanup
713 	 *  process with the next bdev.
714 	 */
715 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
716 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
717 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
718 }
719 
/*
 * Kick off unregistration of all bdevs; there is no previously finished
 *  bdev and no error to report on the first step.
 */
static void
_spdk_bdev_finish_unregister_bdevs(void)
{
	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
}
725 
726 void
727 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
728 {
729 	assert(cb_fn != NULL);
730 
731 	g_fini_thread = spdk_get_thread();
732 
733 	g_fini_cb_fn = cb_fn;
734 	g_fini_cb_arg = cb_arg;
735 
736 	_spdk_bdev_finish_unregister_bdevs();
737 }
738 
739 static struct spdk_bdev_io *
740 spdk_bdev_get_io(struct spdk_io_channel *_ch)
741 {
742 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
743 	struct spdk_bdev_io *bdev_io;
744 
745 	if (ch->per_thread_cache_count > 0) {
746 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
747 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
748 		ch->per_thread_cache_count--;
749 	} else {
750 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
751 		if (!bdev_io) {
752 			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
753 			abort();
754 		}
755 	}
756 
757 	bdev_io->mgmt_ch = ch;
758 
759 	return bdev_io;
760 }
761 
762 static void
763 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
764 {
765 	struct spdk_bdev_mgmt_channel *ch = bdev_io->mgmt_ch;
766 
767 	if (bdev_io->buf != NULL) {
768 		spdk_bdev_io_put_buf(bdev_io);
769 	}
770 
771 	if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) {
772 		ch->per_thread_cache_count++;
773 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link);
774 	} else {
775 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
776 	}
777 }
778 
779 static void
780 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
781 {
782 	struct spdk_bdev *bdev = bdev_io->bdev;
783 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
784 	struct spdk_io_channel *ch = bdev_ch->channel;
785 
786 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
787 
788 	bdev_ch->io_outstanding++;
789 	bdev_io->in_submit_request = true;
790 	if (spdk_likely(bdev_ch->flags == 0)) {
791 		if (spdk_likely(TAILQ_EMPTY(&bdev_ch->nomem_io))) {
792 			bdev->fn_table->submit_request(ch, bdev_io);
793 		} else {
794 			bdev_ch->io_outstanding--;
795 			TAILQ_INSERT_TAIL(&bdev_ch->nomem_io, bdev_io, link);
796 		}
797 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
798 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
799 	} else {
800 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
801 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
802 	}
803 	bdev_io->in_submit_request = false;
804 }
805 
806 static void
807 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
808 {
809 	struct spdk_bdev *bdev = bdev_io->bdev;
810 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
811 	struct spdk_io_channel *ch = bdev_ch->channel;
812 
813 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
814 
815 	bdev_io->in_submit_request = true;
816 	bdev->fn_table->submit_request(ch, bdev_io);
817 	bdev_io->in_submit_request = false;
818 }
819 
820 static void
821 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
822 		  struct spdk_bdev *bdev, void *cb_arg,
823 		  spdk_bdev_io_completion_cb cb)
824 {
825 	bdev_io->bdev = bdev;
826 	bdev_io->caller_ctx = cb_arg;
827 	bdev_io->cb = cb;
828 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
829 	bdev_io->in_submit_request = false;
830 	bdev_io->buf = NULL;
831 }
832 
833 bool
834 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
835 {
836 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
837 }
838 
839 int
840 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
841 {
842 	if (bdev->fn_table->dump_config_json) {
843 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
844 	}
845 
846 	return 0;
847 }
848 
849 static int
850 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
851 {
852 	struct spdk_bdev		*bdev = io_device;
853 	struct spdk_bdev_channel	*ch = ctx_buf;
854 
855 	ch->bdev = io_device;
856 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
857 	if (!ch->channel) {
858 		return -1;
859 	}
860 
861 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
862 	if (!ch->mgmt_channel) {
863 		spdk_put_io_channel(ch->channel);
864 		return -1;
865 	}
866 
867 	memset(&ch->stat, 0, sizeof(ch->stat));
868 	ch->io_outstanding = 0;
869 	TAILQ_INIT(&ch->queued_resets);
870 	TAILQ_INIT(&ch->nomem_io);
871 	ch->nomem_threshold = 0;
872 	ch->flags = 0;
873 
874 #ifdef SPDK_CONFIG_VTUNE
875 	{
876 		char *name;
877 		__itt_init_ittlib(NULL, 0);
878 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
879 		if (!name) {
880 			spdk_put_io_channel(ch->channel);
881 			spdk_put_io_channel(ch->mgmt_channel);
882 			return -1;
883 		}
884 		ch->handle = __itt_string_handle_create(name);
885 		free(name);
886 		ch->start_tsc = spdk_get_ticks();
887 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
888 	}
889 #endif
890 
891 	return 0;
892 }
893 
894 /*
895  * Abort I/O that are waiting on a data buffer.  These types of I/O are
896  *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
897  */
898 static void
899 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
900 {
901 	bdev_io_stailq_t tmp;
902 	struct spdk_bdev_io *bdev_io;
903 
904 	STAILQ_INIT(&tmp);
905 
906 	while (!STAILQ_EMPTY(queue)) {
907 		bdev_io = STAILQ_FIRST(queue);
908 		STAILQ_REMOVE_HEAD(queue, buf_link);
909 		if (bdev_io->ch == ch) {
910 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
911 		} else {
912 			STAILQ_INSERT_TAIL(&tmp, bdev_io, buf_link);
913 		}
914 	}
915 
916 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
917 }
918 
919 /*
920  * Abort I/O that are queued waiting for submission.  These types of I/O are
921  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
922  */
923 static void
924 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
925 {
926 	struct spdk_bdev_io *bdev_io, *tmp;
927 
928 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
929 		if (bdev_io->ch == ch) {
930 			TAILQ_REMOVE(queue, bdev_io, link);
931 			/*
932 			 * spdk_bdev_io_complete() assumes that the completed I/O had
933 			 *  been submitted to the bdev module.  Since in this case it
934 			 *  hadn't, bump io_outstanding to account for the decrement
935 			 *  that spdk_bdev_io_complete() will do.
936 			 */
937 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
938 				ch->io_outstanding++;
939 			}
940 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
941 		}
942 	}
943 }
944 
945 static void
946 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
947 {
948 	struct spdk_bdev_channel	*ch = ctx_buf;
949 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
950 
951 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
952 
953 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
954 	_spdk_bdev_abort_queued_io(&ch->nomem_io, ch);
955 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
956 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
957 
958 	spdk_put_io_channel(ch->channel);
959 	spdk_put_io_channel(ch->mgmt_channel);
960 	assert(ch->io_outstanding == 0);
961 }
962 
963 int
964 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
965 {
966 	struct spdk_bdev_alias *tmp;
967 
968 	if (alias == NULL) {
969 		SPDK_ERRLOG("Empty alias passed\n");
970 		return -EINVAL;
971 	}
972 
973 	if (spdk_bdev_get_by_name(alias)) {
974 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
975 		return -EEXIST;
976 	}
977 
978 	tmp = calloc(1, sizeof(*tmp));
979 	if (tmp == NULL) {
980 		SPDK_ERRLOG("Unable to allocate alias\n");
981 		return -ENOMEM;
982 	}
983 
984 	tmp->alias = strdup(alias);
985 	if (tmp->alias == NULL) {
986 		free(tmp);
987 		SPDK_ERRLOG("Unable to allocate alias\n");
988 		return -ENOMEM;
989 	}
990 
991 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
992 
993 	return 0;
994 }
995 
996 int
997 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
998 {
999 	struct spdk_bdev_alias *tmp;
1000 
1001 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1002 		if (strcmp(alias, tmp->alias) == 0) {
1003 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1004 			free(tmp->alias);
1005 			free(tmp);
1006 			return 0;
1007 		}
1008 	}
1009 
1010 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exists\n", alias);
1011 
1012 	return -ENOENT;
1013 }
1014 
1015 struct spdk_io_channel *
1016 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1017 {
1018 	return spdk_get_io_channel(desc->bdev);
1019 }
1020 
1021 const char *
1022 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1023 {
1024 	return bdev->name;
1025 }
1026 
1027 const char *
1028 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1029 {
1030 	return bdev->product_name;
1031 }
1032 
1033 const struct spdk_bdev_aliases_list *
1034 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1035 {
1036 	return &bdev->aliases;
1037 }
1038 
1039 uint32_t
1040 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1041 {
1042 	return bdev->blocklen;
1043 }
1044 
1045 uint64_t
1046 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1047 {
1048 	return bdev->blockcnt;
1049 }
1050 
1051 size_t
1052 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1053 {
1054 	/* TODO: push this logic down to the bdev modules */
1055 	if (bdev->need_aligned_buffer) {
1056 		return bdev->blocklen;
1057 	}
1058 
1059 	return 1;
1060 }
1061 
1062 uint32_t
1063 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1064 {
1065 	return bdev->optimal_io_boundary;
1066 }
1067 
1068 bool
1069 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1070 {
1071 	return bdev->write_cache;
1072 }
1073 
1074 /*
1075  * Convert I/O offset and length from bytes to blocks.
1076  *
1077  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1078  */
1079 static uint64_t
1080 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1081 			  uint64_t num_bytes, uint64_t *num_blocks)
1082 {
1083 	uint32_t block_size = bdev->blocklen;
1084 
1085 	*offset_blocks = offset_bytes / block_size;
1086 	*num_blocks = num_bytes / block_size;
1087 
1088 	return (offset_bytes % block_size) | (num_bytes % block_size);
1089 }
1090 
1091 static bool
1092 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1093 {
1094 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
1095 	 * has been an overflow and hence the offset has been wrapped around */
1096 	if (offset_blocks + num_blocks < offset_blocks) {
1097 		return false;
1098 	}
1099 
1100 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1101 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1102 		return false;
1103 	}
1104 
1105 	return true;
1106 }
1107 
1108 int
1109 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1110 	       void *buf, uint64_t offset, uint64_t nbytes,
1111 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1112 {
1113 	uint64_t offset_blocks, num_blocks;
1114 
1115 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1116 		return -EINVAL;
1117 	}
1118 
1119 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1120 }
1121 
1122 int
1123 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1124 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1125 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1126 {
1127 	struct spdk_bdev *bdev = desc->bdev;
1128 	struct spdk_bdev_io *bdev_io;
1129 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1130 
1131 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1132 		return -EINVAL;
1133 	}
1134 
1135 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1136 	if (!bdev_io) {
1137 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n");
1138 		return -ENOMEM;
1139 	}
1140 
1141 	bdev_io->ch = channel;
1142 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1143 	bdev_io->u.bdev.iov.iov_base = buf;
1144 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1145 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1146 	bdev_io->u.bdev.iovcnt = 1;
1147 	bdev_io->u.bdev.num_blocks = num_blocks;
1148 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1149 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1150 
1151 	spdk_bdev_io_submit(bdev_io);
1152 	return 0;
1153 }
1154 
1155 int
1156 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1157 		struct iovec *iov, int iovcnt,
1158 		uint64_t offset, uint64_t nbytes,
1159 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1160 {
1161 	uint64_t offset_blocks, num_blocks;
1162 
1163 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1164 		return -EINVAL;
1165 	}
1166 
1167 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1168 }
1169 
1170 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1171 			   struct iovec *iov, int iovcnt,
1172 			   uint64_t offset_blocks, uint64_t num_blocks,
1173 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1174 {
1175 	struct spdk_bdev *bdev = desc->bdev;
1176 	struct spdk_bdev_io *bdev_io;
1177 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1178 
1179 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1180 		return -EINVAL;
1181 	}
1182 
1183 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1184 	if (!bdev_io) {
1185 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n");
1186 		return -ENOMEM;
1187 	}
1188 
1189 	bdev_io->ch = channel;
1190 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1191 	bdev_io->u.bdev.iovs = iov;
1192 	bdev_io->u.bdev.iovcnt = iovcnt;
1193 	bdev_io->u.bdev.num_blocks = num_blocks;
1194 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1195 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1196 
1197 	spdk_bdev_io_submit(bdev_io);
1198 	return 0;
1199 }
1200 
1201 int
1202 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1203 		void *buf, uint64_t offset, uint64_t nbytes,
1204 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1205 {
1206 	uint64_t offset_blocks, num_blocks;
1207 
1208 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1209 		return -EINVAL;
1210 	}
1211 
1212 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1213 }
1214 
1215 int
1216 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1217 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1218 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1219 {
1220 	struct spdk_bdev *bdev = desc->bdev;
1221 	struct spdk_bdev_io *bdev_io;
1222 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1223 
1224 	if (!desc->write) {
1225 		return -EBADF;
1226 	}
1227 
1228 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1229 		return -EINVAL;
1230 	}
1231 
1232 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1233 	if (!bdev_io) {
1234 		SPDK_ERRLOG("bdev_io memory allocation failed duing write\n");
1235 		return -ENOMEM;
1236 	}
1237 
1238 	bdev_io->ch = channel;
1239 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1240 	bdev_io->u.bdev.iov.iov_base = buf;
1241 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1242 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1243 	bdev_io->u.bdev.iovcnt = 1;
1244 	bdev_io->u.bdev.num_blocks = num_blocks;
1245 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1246 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1247 
1248 	spdk_bdev_io_submit(bdev_io);
1249 	return 0;
1250 }
1251 
1252 int
1253 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1254 		 struct iovec *iov, int iovcnt,
1255 		 uint64_t offset, uint64_t len,
1256 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1257 {
1258 	uint64_t offset_blocks, num_blocks;
1259 
1260 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1261 		return -EINVAL;
1262 	}
1263 
1264 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1265 }
1266 
1267 int
1268 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1269 			struct iovec *iov, int iovcnt,
1270 			uint64_t offset_blocks, uint64_t num_blocks,
1271 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1272 {
1273 	struct spdk_bdev *bdev = desc->bdev;
1274 	struct spdk_bdev_io *bdev_io;
1275 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1276 
1277 	if (!desc->write) {
1278 		return -EBADF;
1279 	}
1280 
1281 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1282 		return -EINVAL;
1283 	}
1284 
1285 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1286 	if (!bdev_io) {
1287 		SPDK_ERRLOG("bdev_io memory allocation failed duing writev\n");
1288 		return -ENOMEM;
1289 	}
1290 
1291 	bdev_io->ch = channel;
1292 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1293 	bdev_io->u.bdev.iovs = iov;
1294 	bdev_io->u.bdev.iovcnt = iovcnt;
1295 	bdev_io->u.bdev.num_blocks = num_blocks;
1296 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1297 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1298 
1299 	spdk_bdev_io_submit(bdev_io);
1300 	return 0;
1301 }
1302 
1303 int
1304 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1305 		       uint64_t offset, uint64_t len,
1306 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1307 {
1308 	uint64_t offset_blocks, num_blocks;
1309 
1310 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1311 		return -EINVAL;
1312 	}
1313 
1314 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1315 }
1316 
1317 int
1318 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1319 			      uint64_t offset_blocks, uint64_t num_blocks,
1320 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1321 {
1322 	struct spdk_bdev *bdev = desc->bdev;
1323 	struct spdk_bdev_io *bdev_io;
1324 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1325 	uint64_t len;
1326 	bool split_request = false;
1327 
1328 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1329 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1330 		return -ERANGE;
1331 	}
1332 
1333 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1334 		return -EINVAL;
1335 	}
1336 
1337 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1338 
1339 	if (!bdev_io) {
1340 		SPDK_ERRLOG("bdev_io memory allocation failed duing write_zeroes\n");
1341 		return -ENOMEM;
1342 	}
1343 
1344 	bdev_io->ch = channel;
1345 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1346 
1347 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1348 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1349 		bdev_io->u.bdev.num_blocks = num_blocks;
1350 		bdev_io->u.bdev.iovs = NULL;
1351 		bdev_io->u.bdev.iovcnt = 0;
1352 
1353 	} else {
1354 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1355 
1356 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1357 
1358 		if (len > ZERO_BUFFER_SIZE) {
1359 			split_request = true;
1360 			len = ZERO_BUFFER_SIZE;
1361 		}
1362 
1363 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1364 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1365 		bdev_io->u.bdev.iov.iov_len = len;
1366 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1367 		bdev_io->u.bdev.iovcnt = 1;
1368 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1369 		bdev_io->split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1370 		bdev_io->split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1371 	}
1372 
1373 	if (split_request) {
1374 		bdev_io->stored_user_cb = cb;
1375 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1376 	} else {
1377 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1378 	}
1379 	spdk_bdev_io_submit(bdev_io);
1380 	return 0;
1381 }
1382 
1383 int
1384 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1385 		uint64_t offset, uint64_t nbytes,
1386 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1387 {
1388 	uint64_t offset_blocks, num_blocks;
1389 
1390 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1391 		return -EINVAL;
1392 	}
1393 
1394 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1395 }
1396 
1397 int
1398 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1399 		       uint64_t offset_blocks, uint64_t num_blocks,
1400 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1401 {
1402 	struct spdk_bdev *bdev = desc->bdev;
1403 	struct spdk_bdev_io *bdev_io;
1404 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1405 
1406 	if (!desc->write) {
1407 		return -EBADF;
1408 	}
1409 
1410 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1411 		return -EINVAL;
1412 	}
1413 
1414 	if (num_blocks == 0) {
1415 		SPDK_ERRLOG("Can't unmap 0 bytes\n");
1416 		return -EINVAL;
1417 	}
1418 
1419 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1420 	if (!bdev_io) {
1421 		SPDK_ERRLOG("bdev_io memory allocation failed duing unmap\n");
1422 		return -ENOMEM;
1423 	}
1424 
1425 	bdev_io->ch = channel;
1426 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1427 	bdev_io->u.bdev.iov.iov_base = NULL;
1428 	bdev_io->u.bdev.iov.iov_len = 0;
1429 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1430 	bdev_io->u.bdev.iovcnt = 1;
1431 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1432 	bdev_io->u.bdev.num_blocks = num_blocks;
1433 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1434 
1435 	spdk_bdev_io_submit(bdev_io);
1436 	return 0;
1437 }
1438 
1439 int
1440 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1441 		uint64_t offset, uint64_t length,
1442 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1443 {
1444 	uint64_t offset_blocks, num_blocks;
1445 
1446 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1447 		return -EINVAL;
1448 	}
1449 
1450 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1451 }
1452 
1453 int
1454 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1455 		       uint64_t offset_blocks, uint64_t num_blocks,
1456 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1457 {
1458 	struct spdk_bdev *bdev = desc->bdev;
1459 	struct spdk_bdev_io *bdev_io;
1460 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1461 
1462 	if (!desc->write) {
1463 		return -EBADF;
1464 	}
1465 
1466 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1467 		return -EINVAL;
1468 	}
1469 
1470 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1471 	if (!bdev_io) {
1472 		SPDK_ERRLOG("bdev_io memory allocation failed duing flush\n");
1473 		return -ENOMEM;
1474 	}
1475 
1476 	bdev_io->ch = channel;
1477 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1478 	bdev_io->u.bdev.iovs = NULL;
1479 	bdev_io->u.bdev.iovcnt = 0;
1480 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1481 	bdev_io->u.bdev.num_blocks = num_blocks;
1482 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1483 
1484 	spdk_bdev_io_submit(bdev_io);
1485 	return 0;
1486 }
1487 
1488 static void
1489 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1490 {
1491 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1492 	struct spdk_bdev_io *bdev_io;
1493 
1494 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1495 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1496 	spdk_bdev_io_submit_reset(bdev_io);
1497 }
1498 
1499 static void
1500 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1501 {
1502 	struct spdk_io_channel 		*ch;
1503 	struct spdk_bdev_channel	*channel;
1504 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1505 
1506 	ch = spdk_io_channel_iter_get_channel(i);
1507 	channel = spdk_io_channel_get_ctx(ch);
1508 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1509 
1510 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1511 
1512 	_spdk_bdev_abort_queued_io(&channel->nomem_io, channel);
1513 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1514 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1515 
1516 	spdk_for_each_channel_continue(i, 0);
1517 }
1518 
1519 static void
1520 _spdk_bdev_start_reset(void *ctx)
1521 {
1522 	struct spdk_bdev_channel *ch = ctx;
1523 
1524 	spdk_for_each_channel(ch->bdev, _spdk_bdev_reset_freeze_channel,
1525 			      ch, _spdk_bdev_reset_dev);
1526 }
1527 
1528 static void
1529 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1530 {
1531 	struct spdk_bdev *bdev = ch->bdev;
1532 
1533 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1534 
1535 	pthread_mutex_lock(&bdev->mutex);
1536 	if (bdev->reset_in_progress == NULL) {
1537 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1538 		/*
1539 		 * Take a channel reference for the target bdev for the life of this
1540 		 *  reset.  This guards against the channel getting destroyed while
1541 		 *  spdk_for_each_channel() calls related to this reset IO are in
1542 		 *  progress.  We will release the reference when this reset is
1543 		 *  completed.
1544 		 */
1545 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(bdev);
1546 		_spdk_bdev_start_reset(ch);
1547 	}
1548 	pthread_mutex_unlock(&bdev->mutex);
1549 }
1550 
1551 int
1552 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1553 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1554 {
1555 	struct spdk_bdev *bdev = desc->bdev;
1556 	struct spdk_bdev_io *bdev_io;
1557 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1558 
1559 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1560 	if (!bdev_io) {
1561 		SPDK_ERRLOG("bdev_io memory allocation failed duing reset\n");
1562 		return -ENOMEM;
1563 	}
1564 
1565 	bdev_io->ch = channel;
1566 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1567 	bdev_io->u.reset.ch_ref = NULL;
1568 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1569 
1570 	pthread_mutex_lock(&bdev->mutex);
1571 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1572 	pthread_mutex_unlock(&bdev->mutex);
1573 
1574 	_spdk_bdev_channel_start_reset(channel);
1575 
1576 	return 0;
1577 }
1578 
1579 void
1580 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1581 		      struct spdk_bdev_io_stat *stat)
1582 {
1583 #ifdef SPDK_CONFIG_VTUNE
1584 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1585 	memset(stat, 0, sizeof(*stat));
1586 	return;
1587 #endif
1588 
1589 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1590 
1591 	*stat = channel->stat;
1592 	memset(&channel->stat, 0, sizeof(channel->stat));
1593 }
1594 
1595 int
1596 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1597 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1598 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1599 {
1600 	struct spdk_bdev *bdev = desc->bdev;
1601 	struct spdk_bdev_io *bdev_io;
1602 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1603 
1604 	if (!desc->write) {
1605 		return -EBADF;
1606 	}
1607 
1608 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1609 	if (!bdev_io) {
1610 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1611 		return -ENOMEM;
1612 	}
1613 
1614 	bdev_io->ch = channel;
1615 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1616 	bdev_io->u.nvme_passthru.cmd = *cmd;
1617 	bdev_io->u.nvme_passthru.buf = buf;
1618 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1619 	bdev_io->u.nvme_passthru.md_buf = NULL;
1620 	bdev_io->u.nvme_passthru.md_len = 0;
1621 
1622 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1623 
1624 	spdk_bdev_io_submit(bdev_io);
1625 	return 0;
1626 }
1627 
1628 int
1629 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1630 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1631 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1632 {
1633 	struct spdk_bdev *bdev = desc->bdev;
1634 	struct spdk_bdev_io *bdev_io;
1635 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1636 
1637 	if (!desc->write) {
1638 		/*
1639 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1640 		 *  to easily determine if the command is a read or write, but for now just
1641 		 *  do not allow io_passthru with a read-only descriptor.
1642 		 */
1643 		return -EBADF;
1644 	}
1645 
1646 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1647 	if (!bdev_io) {
1648 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1649 		return -ENOMEM;
1650 	}
1651 
1652 	bdev_io->ch = channel;
1653 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1654 	bdev_io->u.nvme_passthru.cmd = *cmd;
1655 	bdev_io->u.nvme_passthru.buf = buf;
1656 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1657 	bdev_io->u.nvme_passthru.md_buf = NULL;
1658 	bdev_io->u.nvme_passthru.md_len = 0;
1659 
1660 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1661 
1662 	spdk_bdev_io_submit(bdev_io);
1663 	return 0;
1664 }
1665 
1666 int
1667 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1668 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
1669 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1670 {
1671 	struct spdk_bdev *bdev = desc->bdev;
1672 	struct spdk_bdev_io *bdev_io;
1673 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1674 
1675 	if (!desc->write) {
1676 		/*
1677 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1678 		 *  to easily determine if the command is a read or write, but for now just
1679 		 *  do not allow io_passthru with a read-only descriptor.
1680 		 */
1681 		return -EBADF;
1682 	}
1683 
1684 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1685 	if (!bdev_io) {
1686 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1687 		return -ENOMEM;
1688 	}
1689 
1690 	bdev_io->ch = channel;
1691 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
1692 	bdev_io->u.nvme_passthru.cmd = *cmd;
1693 	bdev_io->u.nvme_passthru.buf = buf;
1694 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1695 	bdev_io->u.nvme_passthru.md_buf = md_buf;
1696 	bdev_io->u.nvme_passthru.md_len = md_len;
1697 
1698 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1699 
1700 	spdk_bdev_io_submit(bdev_io);
1701 	return 0;
1702 }
1703 
1704 int
1705 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1706 {
1707 	if (!bdev_io) {
1708 		SPDK_ERRLOG("bdev_io is NULL\n");
1709 		return -1;
1710 	}
1711 
1712 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1713 		SPDK_ERRLOG("bdev_io is in pending state\n");
1714 		assert(false);
1715 		return -1;
1716 	}
1717 
1718 	spdk_bdev_put_io(bdev_io);
1719 
1720 	return 0;
1721 }
1722 
1723 static void
1724 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
1725 {
1726 	struct spdk_bdev *bdev = bdev_ch->bdev;
1727 	struct spdk_bdev_io *bdev_io;
1728 
1729 	if (bdev_ch->io_outstanding > bdev_ch->nomem_threshold) {
1730 		/*
1731 		 * Allow some more I/O to complete before retrying the nomem_io queue.
1732 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
1733 		 *  the context of a completion, because the resources for the I/O are
1734 		 *  not released until control returns to the bdev poller.  Also, we
1735 		 *  may require several small I/O to complete before a larger I/O
1736 		 *  (that requires splitting) can be submitted.
1737 		 */
1738 		return;
1739 	}
1740 
1741 	while (!TAILQ_EMPTY(&bdev_ch->nomem_io)) {
1742 		bdev_io = TAILQ_FIRST(&bdev_ch->nomem_io);
1743 		TAILQ_REMOVE(&bdev_ch->nomem_io, bdev_io, link);
1744 		bdev_ch->io_outstanding++;
1745 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
1746 		bdev->fn_table->submit_request(bdev_ch->channel, bdev_io);
1747 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
1748 			break;
1749 		}
1750 	}
1751 }
1752 
1753 static void
1754 _spdk_bdev_io_complete(void *ctx)
1755 {
1756 	struct spdk_bdev_io *bdev_io = ctx;
1757 
1758 	assert(bdev_io->cb != NULL);
1759 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1760 }
1761 
1762 static void
1763 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
1764 {
1765 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
1766 
1767 	if (bdev_io->u.reset.ch_ref != NULL) {
1768 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
1769 		bdev_io->u.reset.ch_ref = NULL;
1770 	}
1771 
1772 	_spdk_bdev_io_complete(bdev_io);
1773 }
1774 
1775 static void
1776 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
1777 {
1778 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
1779 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1780 
1781 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
1782 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
1783 		_spdk_bdev_channel_start_reset(ch);
1784 	}
1785 
1786 	spdk_for_each_channel_continue(i, 0);
1787 }
1788 
1789 void
1790 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1791 {
1792 	struct spdk_bdev *bdev = bdev_io->bdev;
1793 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
1794 
1795 	bdev_io->status = status;
1796 
1797 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
1798 		bool unlock_channels = false;
1799 
1800 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
1801 			SPDK_ERRLOG("NOMEM returned for reset\n");
1802 		}
1803 		pthread_mutex_lock(&bdev->mutex);
1804 		if (bdev_io == bdev->reset_in_progress) {
1805 			bdev->reset_in_progress = NULL;
1806 			unlock_channels = true;
1807 		}
1808 		pthread_mutex_unlock(&bdev->mutex);
1809 
1810 		if (unlock_channels) {
1811 			spdk_for_each_channel(bdev, _spdk_bdev_unfreeze_channel, bdev_io,
1812 					      _spdk_bdev_reset_complete);
1813 			return;
1814 		}
1815 	} else {
1816 		assert(bdev_ch->io_outstanding > 0);
1817 		bdev_ch->io_outstanding--;
1818 		if (spdk_likely(status != SPDK_BDEV_IO_STATUS_NOMEM)) {
1819 			if (spdk_unlikely(!TAILQ_EMPTY(&bdev_ch->nomem_io))) {
1820 				_spdk_bdev_ch_retry_io(bdev_ch);
1821 			}
1822 		} else {
1823 			TAILQ_INSERT_HEAD(&bdev_ch->nomem_io, bdev_io, link);
1824 			/*
1825 			 * Wait for some of the outstanding I/O to complete before we
1826 			 *  retry any of the nomem_io.  Normally we will wait for
1827 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
1828 			 *  depth channels we will instead wait for half to complete.
1829 			 */
1830 			bdev_ch->nomem_threshold = spdk_max((int64_t)bdev_ch->io_outstanding / 2,
1831 							    (int64_t)bdev_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
1832 			return;
1833 		}
1834 	}
1835 
1836 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1837 		switch (bdev_io->type) {
1838 		case SPDK_BDEV_IO_TYPE_READ:
1839 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1840 			bdev_ch->stat.num_read_ops++;
1841 			break;
1842 		case SPDK_BDEV_IO_TYPE_WRITE:
1843 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1844 			bdev_ch->stat.num_write_ops++;
1845 			break;
1846 		default:
1847 			break;
1848 		}
1849 	}
1850 
1851 #ifdef SPDK_CONFIG_VTUNE
1852 	uint64_t now_tsc = spdk_get_ticks();
1853 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
1854 		uint64_t data[5];
1855 
1856 		data[0] = bdev_ch->stat.num_read_ops;
1857 		data[1] = bdev_ch->stat.bytes_read;
1858 		data[2] = bdev_ch->stat.num_write_ops;
1859 		data[3] = bdev_ch->stat.bytes_written;
1860 		data[4] = bdev->fn_table->get_spin_time ?
1861 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
1862 
1863 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
1864 				   __itt_metadata_u64, 5, data);
1865 
1866 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
1867 		bdev_ch->start_tsc = now_tsc;
1868 	}
1869 #endif
1870 
1871 	if (bdev_io->in_submit_request) {
1872 		/*
1873 		 * Defer completion to avoid potential infinite recursion if the
1874 		 * user's completion callback issues a new I/O.
1875 		 */
1876 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
1877 				     _spdk_bdev_io_complete, bdev_io);
1878 	} else {
1879 		_spdk_bdev_io_complete(bdev_io);
1880 	}
1881 }
1882 
1883 void
1884 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
1885 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
1886 {
1887 	if (sc == SPDK_SCSI_STATUS_GOOD) {
1888 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1889 	} else {
1890 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
1891 		bdev_io->error.scsi.sc = sc;
1892 		bdev_io->error.scsi.sk = sk;
1893 		bdev_io->error.scsi.asc = asc;
1894 		bdev_io->error.scsi.ascq = ascq;
1895 	}
1896 
1897 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1898 }
1899 
1900 void
1901 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
1902 			     int *sc, int *sk, int *asc, int *ascq)
1903 {
1904 	assert(sc != NULL);
1905 	assert(sk != NULL);
1906 	assert(asc != NULL);
1907 	assert(ascq != NULL);
1908 
1909 	switch (bdev_io->status) {
1910 	case SPDK_BDEV_IO_STATUS_SUCCESS:
1911 		*sc = SPDK_SCSI_STATUS_GOOD;
1912 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
1913 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1914 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1915 		break;
1916 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
1917 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
1918 		break;
1919 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
1920 		*sc = bdev_io->error.scsi.sc;
1921 		*sk = bdev_io->error.scsi.sk;
1922 		*asc = bdev_io->error.scsi.asc;
1923 		*ascq = bdev_io->error.scsi.ascq;
1924 		break;
1925 	default:
1926 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
1927 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
1928 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1929 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1930 		break;
1931 	}
1932 }
1933 
1934 void
1935 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
1936 {
1937 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
1938 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1939 	} else {
1940 		bdev_io->error.nvme.sct = sct;
1941 		bdev_io->error.nvme.sc = sc;
1942 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
1943 	}
1944 
1945 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1946 }
1947 
1948 void
1949 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
1950 {
1951 	assert(sct != NULL);
1952 	assert(sc != NULL);
1953 
1954 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
1955 		*sct = bdev_io->error.nvme.sct;
1956 		*sc = bdev_io->error.nvme.sc;
1957 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1958 		*sct = SPDK_NVME_SCT_GENERIC;
1959 		*sc = SPDK_NVME_SC_SUCCESS;
1960 	} else {
1961 		*sct = SPDK_NVME_SCT_GENERIC;
1962 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1963 	}
1964 }
1965 
1966 struct spdk_thread *
1967 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
1968 {
1969 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
1970 }
1971 
1972 static int
1973 _spdk_bdev_register(struct spdk_bdev *bdev)
1974 {
1975 	struct spdk_bdev_module_if *module;
1976 
1977 	assert(bdev->module != NULL);
1978 
1979 	if (!bdev->name) {
1980 		SPDK_ERRLOG("Bdev name is NULL\n");
1981 		return -EINVAL;
1982 	}
1983 
1984 	if (spdk_bdev_get_by_name(bdev->name)) {
1985 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
1986 		return -EEXIST;
1987 	}
1988 
1989 	bdev->status = SPDK_BDEV_STATUS_READY;
1990 
1991 	TAILQ_INIT(&bdev->open_descs);
1992 
1993 	TAILQ_INIT(&bdev->vbdevs);
1994 	TAILQ_INIT(&bdev->base_bdevs);
1995 
1996 	TAILQ_INIT(&bdev->aliases);
1997 
1998 	bdev->reset_in_progress = NULL;
1999 
2000 	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2001 				sizeof(struct spdk_bdev_channel));
2002 
2003 	pthread_mutex_init(&bdev->mutex, NULL);
2004 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2005 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
2006 
2007 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
2008 		if (module->examine) {
2009 			module->action_in_progress++;
2010 			module->examine(bdev);
2011 		}
2012 	}
2013 
2014 	return 0;
2015 }
2016 
/* Register a bdev; returns the result of the common registration path. */
int
spdk_bdev_register(struct spdk_bdev *bdev)
{
	int rc = _spdk_bdev_register(bdev);

	return rc;
}
2022 
2023 int
2024 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2025 {
2026 	int i, rc;
2027 
2028 	rc = _spdk_bdev_register(vbdev);
2029 	if (rc) {
2030 		return rc;
2031 	}
2032 
2033 	for (i = 0; i < base_bdev_count; i++) {
2034 		assert(base_bdevs[i] != NULL);
2035 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
2036 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
2037 	}
2038 
2039 	return 0;
2040 }
2041 
2042 void
2043 spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
2044 {
2045 	if (bdev->unregister_cb != NULL) {
2046 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
2047 	}
2048 }
2049 
2050 void
2051 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2052 {
2053 	struct spdk_bdev_desc	*desc, *tmp;
2054 	int			rc;
2055 	bool			do_destruct = true;
2056 
2057 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2058 
2059 	pthread_mutex_lock(&bdev->mutex);
2060 
2061 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
2062 	bdev->unregister_cb = cb_fn;
2063 	bdev->unregister_ctx = cb_arg;
2064 
2065 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
2066 		if (desc->remove_cb) {
2067 			pthread_mutex_unlock(&bdev->mutex);
2068 			do_destruct = false;
2069 			desc->remove_cb(desc->remove_ctx);
2070 			pthread_mutex_lock(&bdev->mutex);
2071 		}
2072 	}
2073 
2074 	if (!do_destruct) {
2075 		pthread_mutex_unlock(&bdev->mutex);
2076 		return;
2077 	}
2078 
2079 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
2080 	pthread_mutex_unlock(&bdev->mutex);
2081 
2082 	pthread_mutex_destroy(&bdev->mutex);
2083 
2084 	spdk_io_device_unregister(bdev, NULL);
2085 
2086 	rc = bdev->fn_table->destruct(bdev->ctxt);
2087 	if (rc < 0) {
2088 		SPDK_ERRLOG("destruct failed\n");
2089 	}
2090 	if (rc <= 0 && cb_fn != NULL) {
2091 		cb_fn(cb_arg, rc);
2092 	}
2093 }
2094 
2095 void
2096 spdk_vbdev_unregister(struct spdk_bdev *vbdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2097 {
2098 	struct spdk_bdev *base_bdev;
2099 
2100 	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
2101 	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
2102 		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
2103 	}
2104 	spdk_bdev_unregister(vbdev, cb_fn, cb_arg);
2105 }
2106 
2107 int
2108 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2109 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2110 {
2111 	struct spdk_bdev_desc *desc;
2112 
2113 	desc = calloc(1, sizeof(*desc));
2114 	if (desc == NULL) {
2115 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2116 		return -ENOMEM;
2117 	}
2118 
2119 	pthread_mutex_lock(&bdev->mutex);
2120 
2121 	if (write && bdev->claim_module) {
2122 		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
2123 		free(desc);
2124 		pthread_mutex_unlock(&bdev->mutex);
2125 		return -EPERM;
2126 	}
2127 
2128 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
2129 
2130 	desc->bdev = bdev;
2131 	desc->remove_cb = remove_cb;
2132 	desc->remove_ctx = remove_ctx;
2133 	desc->write = write;
2134 	*_desc = desc;
2135 
2136 	pthread_mutex_unlock(&bdev->mutex);
2137 
2138 	return 0;
2139 }
2140 
2141 void
2142 spdk_bdev_close(struct spdk_bdev_desc *desc)
2143 {
2144 	struct spdk_bdev *bdev = desc->bdev;
2145 	bool do_unregister = false;
2146 
2147 	pthread_mutex_lock(&bdev->mutex);
2148 
2149 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
2150 	free(desc);
2151 
2152 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
2153 		do_unregister = true;
2154 	}
2155 	pthread_mutex_unlock(&bdev->mutex);
2156 
2157 	if (do_unregister == true) {
2158 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
2159 	}
2160 }
2161 
2162 int
2163 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
2164 			    struct spdk_bdev_module_if *module)
2165 {
2166 	if (bdev->claim_module != NULL) {
2167 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
2168 			    bdev->claim_module->name);
2169 		return -EPERM;
2170 	}
2171 
2172 	if (desc && !desc->write) {
2173 		desc->write = true;
2174 	}
2175 
2176 	bdev->claim_module = module;
2177 	return 0;
2178 }
2179 
2180 void
2181 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
2182 {
2183 	assert(bdev->claim_module != NULL);
2184 	bdev->claim_module = NULL;
2185 }
2186 
2187 struct spdk_bdev *
2188 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
2189 {
2190 	return desc->bdev;
2191 }
2192 
2193 void
2194 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
2195 {
2196 	struct iovec *iovs;
2197 	int iovcnt;
2198 
2199 	if (bdev_io == NULL) {
2200 		return;
2201 	}
2202 
2203 	switch (bdev_io->type) {
2204 	case SPDK_BDEV_IO_TYPE_READ:
2205 		iovs = bdev_io->u.bdev.iovs;
2206 		iovcnt = bdev_io->u.bdev.iovcnt;
2207 		break;
2208 	case SPDK_BDEV_IO_TYPE_WRITE:
2209 		iovs = bdev_io->u.bdev.iovs;
2210 		iovcnt = bdev_io->u.bdev.iovcnt;
2211 		break;
2212 	default:
2213 		iovs = NULL;
2214 		iovcnt = 0;
2215 		break;
2216 	}
2217 
2218 	if (iovp) {
2219 		*iovp = iovs;
2220 	}
2221 	if (iovcntp) {
2222 		*iovcntp = iovcnt;
2223 	}
2224 }
2225 
2226 void
2227 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
2228 {
2229 	/*
2230 	 * Modules with examine callbacks must be initialized first, so they are
2231 	 *  ready to handle examine callbacks from later modules that will
2232 	 *  register physical bdevs.
2233 	 */
2234 	if (bdev_module->examine != NULL) {
2235 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2236 	} else {
2237 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2238 	}
2239 }
2240 
2241 void
2242 spdk_bdev_part_base_free(struct spdk_bdev_part_base *base)
2243 {
2244 	if (base->desc) {
2245 		spdk_bdev_close(base->desc);
2246 		base->desc = NULL;
2247 	}
2248 	base->base_free_fn(base);
2249 }
2250 
2251 void
2252 spdk_bdev_part_free(struct spdk_bdev_part *part)
2253 {
2254 	struct spdk_bdev_part_base *base;
2255 
2256 	assert(part);
2257 	assert(part->base);
2258 
2259 	base = part->base;
2260 	spdk_io_device_unregister(&part->base, NULL);
2261 	TAILQ_REMOVE(base->tailq, part, tailq);
2262 	free(part->bdev.name);
2263 	free(part);
2264 
2265 	if (__sync_sub_and_fetch(&base->ref, 1) == 0) {
2266 		spdk_bdev_module_release_bdev(base->bdev);
2267 		spdk_bdev_part_base_free(base);
2268 	}
2269 }
2270 
2271 void
2272 spdk_bdev_part_base_hotremove(struct spdk_bdev *base_bdev, struct bdev_part_tailq *tailq)
2273 {
2274 	struct spdk_bdev_part *part, *tmp;
2275 
2276 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
2277 		if (part->base->bdev == base_bdev) {
2278 			spdk_vbdev_unregister(&part->bdev, NULL, NULL);
2279 		}
2280 	}
2281 }
2282 
2283 static bool
2284 spdk_bdev_part_io_type_supported(void *_part, enum spdk_bdev_io_type io_type)
2285 {
2286 	struct spdk_bdev_part *part = _part;
2287 
2288 	return part->base->bdev->fn_table->io_type_supported(part->base->bdev, io_type);
2289 }
2290 
2291 static struct spdk_io_channel *
2292 spdk_bdev_part_get_io_channel(void *_part)
2293 {
2294 	struct spdk_bdev_part *part = _part;
2295 
2296 	return spdk_get_io_channel(&part->base);
2297 }
2298 
2299 static void
2300 spdk_bdev_part_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2301 {
2302 	struct spdk_bdev_io *part_io = cb_arg;
2303 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
2304 
2305 	spdk_bdev_io_complete(part_io, status);
2306 	spdk_bdev_free_io(bdev_io);
2307 }
2308 
2309 static void
2310 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2311 {
2312 	uint64_t len;
2313 
2314 	if (!success) {
2315 		bdev_io->cb = bdev_io->stored_user_cb;
2316 		_spdk_bdev_io_complete(bdev_io);
2317 		return;
2318 	}
2319 
2320 	/* no need to perform the error checking from write_zeroes_blocks because this request already passed those checks. */
2321 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->split_remaining_num_blocks,
2322 		       ZERO_BUFFER_SIZE);
2323 
2324 	bdev_io->u.bdev.offset_blocks = bdev_io->split_current_offset_blocks;
2325 	bdev_io->u.bdev.iov.iov_len = len;
2326 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2327 	bdev_io->split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2328 	bdev_io->split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2329 
2330 	/* if this round completes the i/o, change the callback to be the original user callback */
2331 	if (bdev_io->split_remaining_num_blocks == 0) {
2332 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->stored_user_cb);
2333 	} else {
2334 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2335 	}
2336 	spdk_bdev_io_submit(bdev_io);
2337 }
2338 
2339 void
2340 spdk_bdev_part_submit_request(struct spdk_bdev_part_channel *ch, struct spdk_bdev_io *bdev_io)
2341 {
2342 	struct spdk_bdev_part *part = ch->part;
2343 	struct spdk_io_channel *base_ch = ch->base_ch;
2344 	struct spdk_bdev_desc *base_desc = part->base->desc;
2345 	uint64_t offset;
2346 	int rc = 0;
2347 
2348 	/* Modify the I/O to adjust for the offset within the base bdev. */
2349 	switch (bdev_io->type) {
2350 	case SPDK_BDEV_IO_TYPE_READ:
2351 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2352 		rc = spdk_bdev_readv_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2353 					    bdev_io->u.bdev.iovcnt, offset,
2354 					    bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2355 					    bdev_io);
2356 		break;
2357 	case SPDK_BDEV_IO_TYPE_WRITE:
2358 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2359 		rc = spdk_bdev_writev_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2360 					     bdev_io->u.bdev.iovcnt, offset,
2361 					     bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2362 					     bdev_io);
2363 		break;
2364 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
2365 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2366 		rc = spdk_bdev_write_zeroes_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2367 						   spdk_bdev_part_complete_io, bdev_io);
2368 		break;
2369 	case SPDK_BDEV_IO_TYPE_UNMAP:
2370 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2371 		rc = spdk_bdev_unmap_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2372 					    spdk_bdev_part_complete_io, bdev_io);
2373 		break;
2374 	case SPDK_BDEV_IO_TYPE_FLUSH:
2375 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2376 		rc = spdk_bdev_flush_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2377 					    spdk_bdev_part_complete_io, bdev_io);
2378 		break;
2379 	case SPDK_BDEV_IO_TYPE_RESET:
2380 		rc = spdk_bdev_reset(base_desc, base_ch,
2381 				     spdk_bdev_part_complete_io, bdev_io);
2382 		break;
2383 	default:
2384 		SPDK_ERRLOG("split: unknown I/O type %d\n", bdev_io->type);
2385 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2386 		return;
2387 	}
2388 
2389 	if (rc != 0) {
2390 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2391 	}
2392 }
2393 static int
2394 spdk_bdev_part_channel_create_cb(void *io_device, void *ctx_buf)
2395 {
2396 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2397 	struct spdk_bdev_part_channel *ch = ctx_buf;
2398 
2399 	ch->part = part;
2400 	ch->base_ch = spdk_bdev_get_io_channel(part->base->desc);
2401 	if (ch->base_ch == NULL) {
2402 		return -1;
2403 	}
2404 
2405 	if (part->base->ch_create_cb) {
2406 		return part->base->ch_create_cb(io_device, ctx_buf);
2407 	} else {
2408 		return 0;
2409 	}
2410 }
2411 
2412 static void
2413 spdk_bdev_part_channel_destroy_cb(void *io_device, void *ctx_buf)
2414 {
2415 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2416 	struct spdk_bdev_part_channel *ch = ctx_buf;
2417 
2418 	if (part->base->ch_destroy_cb) {
2419 		part->base->ch_destroy_cb(io_device, ctx_buf);
2420 	}
2421 	spdk_put_io_channel(ch->base_ch);
2422 }
2423 
2424 int
2425 spdk_bdev_part_base_construct(struct spdk_bdev_part_base *base, struct spdk_bdev *bdev,
2426 			      spdk_bdev_remove_cb_t remove_cb, struct spdk_bdev_module_if *module,
2427 			      struct spdk_bdev_fn_table *fn_table, struct bdev_part_tailq *tailq,
2428 			      spdk_bdev_part_base_free_fn free_fn,
2429 			      uint32_t channel_size, spdk_io_channel_create_cb ch_create_cb,
2430 			      spdk_io_channel_destroy_cb ch_destroy_cb)
2431 {
2432 	int rc;
2433 
2434 	fn_table->get_io_channel = spdk_bdev_part_get_io_channel;
2435 	fn_table->io_type_supported = spdk_bdev_part_io_type_supported;
2436 
2437 	base->bdev = bdev;
2438 	base->desc = NULL;
2439 	base->ref = 0;
2440 	base->module = module;
2441 	base->fn_table = fn_table;
2442 	base->tailq = tailq;
2443 	base->claimed = false;
2444 	base->channel_size = channel_size;
2445 	base->ch_create_cb = ch_create_cb;
2446 	base->ch_destroy_cb = ch_destroy_cb;
2447 	base->base_free_fn = free_fn;
2448 
2449 	rc = spdk_bdev_open(bdev, false, remove_cb, bdev, &base->desc);
2450 	if (rc) {
2451 		spdk_bdev_part_base_free(base);
2452 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
2453 		return -1;
2454 	}
2455 
2456 	return 0;
2457 }
2458 
2459 int
2460 spdk_bdev_part_construct(struct spdk_bdev_part *part, struct spdk_bdev_part_base *base,
2461 			 char *name, uint64_t offset_blocks, uint64_t num_blocks,
2462 			 char *product_name)
2463 {
2464 	part->bdev.name = name;
2465 	part->bdev.blocklen = base->bdev->blocklen;
2466 	part->bdev.blockcnt = num_blocks;
2467 	part->offset_blocks = offset_blocks;
2468 
2469 	part->bdev.write_cache = base->bdev->write_cache;
2470 	part->bdev.need_aligned_buffer = base->bdev->need_aligned_buffer;
2471 	part->bdev.product_name = product_name;
2472 	part->bdev.ctxt = part;
2473 	part->bdev.module = base->module;
2474 	part->bdev.fn_table = base->fn_table;
2475 
2476 	__sync_fetch_and_add(&base->ref, 1);
2477 	part->base = base;
2478 
2479 	if (!base->claimed) {
2480 		int rc;
2481 
2482 		rc = spdk_bdev_module_claim_bdev(base->bdev, base->desc, base->module);
2483 		if (rc) {
2484 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(base->bdev));
2485 			free(part->bdev.name);
2486 			return -1;
2487 		}
2488 		base->claimed = true;
2489 	}
2490 
2491 	spdk_io_device_register(&part->base, spdk_bdev_part_channel_create_cb,
2492 				spdk_bdev_part_channel_destroy_cb,
2493 				base->channel_size);
2494 	spdk_vbdev_register(&part->bdev, &base->bdev, 1);
2495 	TAILQ_INSERT_TAIL(base->tailq, part, tailq);
2496 
2497 	return 0;
2498 }
2499 
2500 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
2501