xref: /spdk/lib/bdev/bdev.c (revision 6c54c13cd4ef10dac9b17b30072fb930a8f95a5c)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/io_channel.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/bdev.h"
49 #include "spdk_internal/log.h"
50 #include "spdk/string.h"
51 
52 #ifdef SPDK_CONFIG_VTUNE
53 #include "ittnotify.h"
54 #include "ittnotify_types.h"
55 int __itt_init_ittlib(const char *, __itt_group_id);
56 #endif
57 
58 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
59 #define BUF_SMALL_POOL_SIZE	8192
60 #define BUF_LARGE_POOL_SIZE	1024
61 #define NOMEM_THRESHOLD_COUNT	8
62 #define ZERO_BUFFER_SIZE	0x100000
63 
64 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
65 
66 struct spdk_bdev_mgr {
67 	struct spdk_mempool *bdev_io_pool;
68 
69 	struct spdk_mempool *buf_small_pool;
70 	struct spdk_mempool *buf_large_pool;
71 
72 	void *zero_buffer;
73 
74 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
75 
76 	TAILQ_HEAD(, spdk_bdev) bdevs;
77 
78 	spdk_bdev_poller_start_cb start_poller_fn;
79 	spdk_bdev_poller_stop_cb stop_poller_fn;
80 
81 	bool init_complete;
82 	bool module_init_complete;
83 
84 #ifdef SPDK_CONFIG_VTUNE
85 	__itt_domain	*domain;
86 #endif
87 };
88 
89 static struct spdk_bdev_mgr g_bdev_mgr = {
90 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
91 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
92 	.start_poller_fn = NULL,
93 	.stop_poller_fn = NULL,
94 	.init_complete = false,
95 	.module_init_complete = false,
96 };
97 
98 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
99 static void			*g_init_cb_arg = NULL;
100 
101 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
102 static void			*g_fini_cb_arg = NULL;
103 struct spdk_bdev_module_if	*g_bdev_module = NULL;
104 struct spdk_thread		*g_fini_thread = NULL;
105 
106 
107 struct spdk_bdev_mgmt_channel {
108 	bdev_io_tailq_t need_buf_small;
109 	bdev_io_tailq_t need_buf_large;
110 };
111 
112 struct spdk_bdev_desc {
113 	struct spdk_bdev		*bdev;
114 	spdk_bdev_remove_cb_t		remove_cb;
115 	void				*remove_ctx;
116 	bool				write;
117 	TAILQ_ENTRY(spdk_bdev_desc)	link;
118 };
119 
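/*
 * Channel flag: while a reset is outstanding on a channel, any new I/O submitted to
 *  it is failed immediately (see spdk_bdev_io_submit()).
 */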
120 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
121 
122 struct spdk_bdev_channel {
123 	struct spdk_bdev	*bdev;
124 
125 	/* The channel for the underlying device */
126 	struct spdk_io_channel	*channel;
127 
128 	/* Channel for the bdev manager */
129 	struct spdk_io_channel *mgmt_channel;
130 
131 	struct spdk_bdev_io_stat stat;
132 
133 	/*
134 	 * Count of I/O submitted to bdev module and waiting for completion.
135 	 * Incremented before submit_request() is called on an spdk_bdev_io.
136 	 */
137 	uint64_t		io_outstanding;
138 
139 	bdev_io_tailq_t		queued_resets;
140 
141 	/*
142 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
143 	 *  on this channel.
144 	 */
145 	bdev_io_tailq_t		nomem_io;
146 
147 	/*
148 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
149 	 */
150 	uint64_t		nomem_threshold;
151 
152 	uint32_t		flags;
153 
154 #ifdef SPDK_CONFIG_VTUNE
155 	uint64_t		start_tsc;
156 	uint64_t		interval_tsc;
157 	__itt_string_handle	*handle;
158 #endif
159 
160 };
161 
162 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
163 
164 struct spdk_bdev *
165 spdk_bdev_first(void)
166 {
167 	struct spdk_bdev *bdev;
168 
169 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
170 	if (bdev) {
171 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
172 	}
173 
174 	return bdev;
175 }
176 
177 struct spdk_bdev *
178 spdk_bdev_next(struct spdk_bdev *prev)
179 {
180 	struct spdk_bdev *bdev;
181 
182 	bdev = TAILQ_NEXT(prev, link);
183 	if (bdev) {
184 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
185 	}
186 
187 	return bdev;
188 }
189 
190 static struct spdk_bdev *
191 _bdev_next_leaf(struct spdk_bdev *bdev)
192 {
193 	while (bdev != NULL) {
194 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
195 			return bdev;
196 		} else {
197 			bdev = TAILQ_NEXT(bdev, link);
198 		}
199 	}
200 
201 	return bdev;
202 }
203 
204 struct spdk_bdev *
205 spdk_bdev_first_leaf(void)
206 {
207 	struct spdk_bdev *bdev;
208 
209 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
210 
211 	if (bdev) {
212 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Starting bdev iteration at %s\n", bdev->name);
213 	}
214 
215 	return bdev;
216 }
217 
218 struct spdk_bdev *
219 spdk_bdev_next_leaf(struct spdk_bdev *prev)
220 {
221 	struct spdk_bdev *bdev;
222 
223 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
224 
225 	if (bdev) {
226 		SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
227 	}
228 
229 	return bdev;
230 }
231 
232 struct spdk_bdev *
233 spdk_bdev_get_by_name(const char *bdev_name)
234 {
235 	struct spdk_bdev *bdev = spdk_bdev_first();
236 
237 	while (bdev != NULL) {
238 		if (strcmp(bdev_name, bdev->name) == 0) {
239 			return bdev;
240 		}
241 		bdev = spdk_bdev_next(bdev);
242 	}
243 
244 	return NULL;
245 }
246 
247 static void
248 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
249 {
250 	assert(bdev_io->get_buf_cb != NULL);
251 	assert(buf != NULL);
252 	assert(bdev_io->u.bdev.iovs != NULL);
253 
254 	bdev_io->buf = buf;
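	/*
	 * Align the data buffer within the mempool element to a 512-byte boundary.  The
	 *  buffer pools allocate an extra 512 bytes per element (see spdk_bdev_initialize()),
	 *  so the aligned region still holds buf_len bytes.
	 */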
255 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
256 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
257 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
258 }
259 
260 static void
261 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
262 {
263 	struct spdk_mempool *pool;
264 	struct spdk_bdev_io *tmp;
265 	void *buf;
266 	bdev_io_tailq_t *tailq;
267 	struct spdk_bdev_mgmt_channel *ch;
268 
269 	assert(bdev_io->u.bdev.iovcnt == 1);
270 
271 	buf = bdev_io->buf;
272 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
273 
274 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
275 		pool = g_bdev_mgr.buf_small_pool;
276 		tailq = &ch->need_buf_small;
277 	} else {
278 		pool = g_bdev_mgr.buf_large_pool;
279 		tailq = &ch->need_buf_large;
280 	}
281 
282 	if (TAILQ_EMPTY(tailq)) {
283 		spdk_mempool_put(pool, buf);
284 	} else {
285 		tmp = TAILQ_FIRST(tailq);
286 		TAILQ_REMOVE(tailq, tmp, buf_link);
287 		spdk_bdev_io_set_buf(tmp, buf);
288 	}
289 }
290 
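/*
 * Obtain a data buffer for a bdev_io whose first iovec does not have one yet.  A buffer
 *  is taken from the small or large pool based on len; if the pool is empty, the bdev_io
 *  is queued on the per-thread management channel and cb is invoked later, when
 *  spdk_bdev_io_put_buf() hands a buffer back.
 */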
291 void
292 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
293 {
294 	struct spdk_mempool *pool;
295 	bdev_io_tailq_t *tailq;
296 	void *buf = NULL;
297 	struct spdk_bdev_mgmt_channel *ch;
298 
299 	assert(cb != NULL);
300 	assert(bdev_io->u.bdev.iovs != NULL);
301 
302 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
303 		/* Buffer already present */
304 		cb(bdev_io->ch->channel, bdev_io);
305 		return;
306 	}
307 
308 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
309 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
310 
311 	bdev_io->buf_len = len;
312 	bdev_io->get_buf_cb = cb;
313 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
314 		pool = g_bdev_mgr.buf_small_pool;
315 		tailq = &ch->need_buf_small;
316 	} else {
317 		pool = g_bdev_mgr.buf_large_pool;
318 		tailq = &ch->need_buf_large;
319 	}
320 
321 	buf = spdk_mempool_get(pool);
322 
323 	if (!buf) {
324 		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
325 	} else {
326 		spdk_bdev_io_set_buf(bdev_io, buf);
327 	}
328 }
329 
330 static int
331 spdk_bdev_module_get_max_ctx_size(void)
332 {
333 	struct spdk_bdev_module_if *bdev_module;
334 	int max_bdev_module_size = 0;
335 
336 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
337 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
338 			max_bdev_module_size = bdev_module->get_ctx_size();
339 		}
340 	}
341 
342 	return max_bdev_module_size;
343 }
344 
345 void
346 spdk_bdev_config_text(FILE *fp)
347 {
348 	struct spdk_bdev_module_if *bdev_module;
349 
350 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
351 		if (bdev_module->config_text) {
352 			bdev_module->config_text(fp);
353 		}
354 	}
355 }
356 
357 static int
358 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
359 {
360 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
361 
362 	TAILQ_INIT(&ch->need_buf_small);
363 	TAILQ_INIT(&ch->need_buf_large);
364 
365 	return 0;
366 }
367 
368 static void
369 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
370 {
371 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
372 
373 	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
374 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
375 	}
376 }
377 
378 static void
379 spdk_bdev_init_complete(int rc)
380 {
381 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
382 	void *cb_arg = g_init_cb_arg;
383 
384 	g_bdev_mgr.init_complete = true;
385 	g_init_cb_fn = NULL;
386 	g_init_cb_arg = NULL;
387 
388 	cb_fn(cb_arg, rc);
389 }
390 
391 static void
392 spdk_bdev_module_action_complete(void)
393 {
394 	struct spdk_bdev_module_if *m;
395 
396 	/*
397 	 * Don't finish bdev subsystem initialization if
398 	 * module pre-initialization is still in progress, or
399 	 * the subsystem has already been initialized.
400 	 */
401 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
402 		return;
403 	}
404 
405 	/*
406 	 * Check all bdev modules for inits/examinations in progress. If any
407 	 * exist, return immediately since we cannot finish bdev subsystem
408 	 * initialization until all are completed.
409 	 */
410 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
411 		if (m->action_in_progress > 0) {
412 			return;
413 		}
414 	}
415 
416 	/*
417 	 * Modules already finished initialization - now that all
418 	 * the bdev modules have finished their asynchronous I/O
419 	 * processing, the entire bdev layer can be marked as complete.
420 	 */
421 	spdk_bdev_init_complete(0);
422 }
423 
424 static void
425 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
426 {
427 	assert(module->action_in_progress > 0);
428 	module->action_in_progress--;
429 	spdk_bdev_module_action_complete();
430 }
431 
432 void
433 spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
434 {
435 	spdk_bdev_module_action_done(module);
436 }
437 
438 void
439 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
440 {
441 	spdk_bdev_module_action_done(module);
442 }
443 
444 static int
445 spdk_bdev_modules_init(void)
446 {
447 	struct spdk_bdev_module_if *module;
448 	int rc = 0;
449 
450 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
451 		rc = module->module_init();
452 		if (rc != 0) {
453 			break;
454 		}
455 	}
456 
457 	g_bdev_mgr.module_init_complete = true;
458 	return rc;
459 }
460 
461 void
462 spdk_bdev_poller_start(struct spdk_bdev_poller **ppoller,
463 		       spdk_bdev_poller_fn fn,
464 		       void *arg,
465 		       uint64_t period_microseconds)
466 {
467 	g_bdev_mgr.start_poller_fn(ppoller, fn, arg, period_microseconds);
468 }
469 
470 void
471 spdk_bdev_poller_stop(struct spdk_bdev_poller **ppoller)
472 {
473 	g_bdev_mgr.stop_poller_fn(ppoller);
474 }
475 
476 void
477 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg,
478 		     spdk_bdev_poller_start_cb start_poller_fn,
479 		     spdk_bdev_poller_stop_cb stop_poller_fn)
480 {
481 	int cache_size;
482 	int rc = 0;
483 	char mempool_name[32];
484 
485 	assert(cb_fn != NULL);
486 
487 	g_init_cb_fn = cb_fn;
488 	g_init_cb_arg = cb_arg;
489 
490 	g_bdev_mgr.start_poller_fn = start_poller_fn;
491 	g_bdev_mgr.stop_poller_fn = stop_poller_fn;
492 
493 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
494 
495 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
496 				  SPDK_BDEV_IO_POOL_SIZE,
497 				  sizeof(struct spdk_bdev_io) +
498 				  spdk_bdev_module_get_max_ctx_size(),
499 				  64,
500 				  SPDK_ENV_SOCKET_ID_ANY);
501 
502 	if (g_bdev_mgr.bdev_io_pool == NULL) {
503 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
504 		spdk_bdev_init_complete(-1);
505 		return;
506 	}
507 
508 	/**
509 	 * Ensure no more than half of the total buffers end up in local caches, by
510 	 *   using spdk_env_get_core_count() to determine how many local caches we need
511 	 *   to account for.
512 	 */
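	/*
	 * For example, on a 4-core system each core's small-buffer cache holds
	 *  BUF_SMALL_POOL_SIZE / (2 * 4) = 8192 / 8 = 1024 buffers.
	 */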
513 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
514 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
515 
516 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
517 				    BUF_SMALL_POOL_SIZE,
518 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
519 				    cache_size,
520 				    SPDK_ENV_SOCKET_ID_ANY);
521 	if (!g_bdev_mgr.buf_small_pool) {
522 		SPDK_ERRLOG("create rbuf small pool failed\n");
523 		spdk_bdev_init_complete(-1);
524 		return;
525 	}
526 
527 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
528 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
529 
530 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
531 				    BUF_LARGE_POOL_SIZE,
532 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
533 				    cache_size,
534 				    SPDK_ENV_SOCKET_ID_ANY);
535 	if (!g_bdev_mgr.buf_large_pool) {
536 		SPDK_ERRLOG("create rbuf large pool failed\n");
537 		spdk_bdev_init_complete(-1);
538 		return;
539 	}
540 
541 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
542 				 NULL);
543 	if (!g_bdev_mgr.zero_buffer) {
544 		SPDK_ERRLOG("create bdev zero buffer failed\n");
545 		spdk_bdev_init_complete(-1);
546 		return;
547 	}
548 
549 #ifdef SPDK_CONFIG_VTUNE
550 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
551 #endif
552 
553 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
554 				spdk_bdev_mgmt_channel_destroy,
555 				sizeof(struct spdk_bdev_mgmt_channel));
556 
557 	rc = spdk_bdev_modules_init();
558 	if (rc != 0) {
559 		SPDK_ERRLOG("bdev modules init failed\n");
560 		spdk_bdev_init_complete(-1);
561 		return;
562 	}
563 
564 	spdk_bdev_module_action_complete();
565 }
566 
567 static void
568 spdk_bdev_module_finish_cb(void *io_device)
569 {
570 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
571 
572 	cb_fn(g_fini_cb_arg);
573 	g_fini_cb_fn = NULL;
574 	g_fini_cb_arg = NULL;
575 }
576 
577 static void
578 spdk_bdev_module_finish_complete(void)
579 {
580 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
581 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
582 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
583 			    SPDK_BDEV_IO_POOL_SIZE);
584 	}
585 
586 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
587 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
588 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
589 			    BUF_SMALL_POOL_SIZE);
590 		assert(false);
591 	}
592 
593 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
594 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
595 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
596 			    BUF_LARGE_POOL_SIZE);
597 		assert(false);
598 	}
599 
600 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
601 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
602 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
603 	spdk_dma_free(g_bdev_mgr.zero_buffer);
604 
605 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb);
606 }
607 
608 static void
609 _call_next_module_fini(void *arg)
610 {
611 	struct spdk_bdev_module_if *module = arg;
612 
613 	module->module_fini();
614 }
615 
616 void
617 spdk_bdev_module_finish_done(void)
618 {
619 	if (spdk_get_thread() != g_fini_thread) {
620 		SPDK_ERRLOG("%s changed threads\n", g_bdev_module->name);
621 	}
622 
623 	if (!g_bdev_module) {
624 		g_bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
625 	} else {
626 		g_bdev_module = TAILQ_NEXT(g_bdev_module, tailq);
627 	}
628 
629 	if (!g_bdev_module) {
630 		spdk_bdev_module_finish_complete();
631 		return;
632 	}
633 
634 	if (g_bdev_module->module_fini) {
635 		spdk_thread_send_msg(g_fini_thread, _call_next_module_fini, g_bdev_module);
636 	}
637 
638 	if (!g_bdev_module->async_fini) {
639 		spdk_bdev_module_finish_done();
640 	}
641 }
642 
643 void
644 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
645 {
646 	assert(cb_fn != NULL);
647 
648 	g_fini_thread = spdk_get_thread();
649 
650 	g_fini_cb_fn = cb_fn;
651 	g_fini_cb_arg = cb_arg;
652 
653 	spdk_bdev_module_finish_done();
654 }
655 
656 struct spdk_bdev_io *
657 spdk_bdev_get_io(void)
658 {
659 	struct spdk_bdev_io *bdev_io;
660 
661 	bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
662 	if (!bdev_io) {
663 		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
664 		abort();
665 	}
666 
667 	memset(bdev_io, 0, offsetof(struct spdk_bdev_io, u));
668 
669 	return bdev_io;
670 }
671 
672 static void
673 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
674 {
675 	if (bdev_io->buf != NULL) {
676 		spdk_bdev_io_put_buf(bdev_io);
677 	}
678 
679 	spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
680 }
681 
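/*
 * Submit a bdev_io to the underlying bdev module.  I/O queued behind an earlier ENOMEM
 *  completion is not overtaken: new I/O joins the tail of nomem_io instead of being
 *  submitted out of order.  I/O submitted while a reset is in progress on the channel
 *  is failed immediately.
 */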
682 static void
683 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
684 {
685 	struct spdk_bdev *bdev = bdev_io->bdev;
686 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
687 	struct spdk_io_channel *ch = bdev_ch->channel;
688 
689 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
690 
691 	bdev_ch->io_outstanding++;
692 	bdev_io->in_submit_request = true;
693 	if (spdk_likely(bdev_ch->flags == 0)) {
694 		if (spdk_likely(TAILQ_EMPTY(&bdev_ch->nomem_io))) {
695 			bdev->fn_table->submit_request(ch, bdev_io);
696 		} else {
697 			bdev_ch->io_outstanding--;
698 			TAILQ_INSERT_TAIL(&bdev_ch->nomem_io, bdev_io, link);
699 		}
700 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
701 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
702 	} else {
703 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
704 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
705 	}
706 	bdev_io->in_submit_request = false;
707 }
708 
709 static void
710 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
711 {
712 	struct spdk_bdev *bdev = bdev_io->bdev;
713 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
714 	struct spdk_io_channel *ch = bdev_ch->channel;
715 
716 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
717 
718 	bdev_io->in_submit_request = true;
719 	bdev->fn_table->submit_request(ch, bdev_io);
720 	bdev_io->in_submit_request = false;
721 }
722 
723 static void
724 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
725 		  struct spdk_bdev *bdev, void *cb_arg,
726 		  spdk_bdev_io_completion_cb cb)
727 {
728 	bdev_io->bdev = bdev;
729 	bdev_io->caller_ctx = cb_arg;
730 	bdev_io->cb = cb;
731 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
732 	bdev_io->in_submit_request = false;
733 }
734 
735 bool
736 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
737 {
738 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
739 }
740 
741 int
742 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
743 {
744 	if (bdev->fn_table->dump_config_json) {
745 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
746 	}
747 
748 	return 0;
749 }
750 
751 static int
752 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
753 {
754 	struct spdk_bdev		*bdev = io_device;
755 	struct spdk_bdev_channel	*ch = ctx_buf;
756 
757 	ch->bdev = io_device;
758 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
759 	if (!ch->channel) {
760 		return -1;
761 	}
762 
763 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
764 	if (!ch->mgmt_channel) {
765 		spdk_put_io_channel(ch->channel);
766 		return -1;
767 	}
768 
769 	memset(&ch->stat, 0, sizeof(ch->stat));
770 	ch->io_outstanding = 0;
771 	TAILQ_INIT(&ch->queued_resets);
772 	TAILQ_INIT(&ch->nomem_io);
773 	ch->nomem_threshold = 0;
774 	ch->flags = 0;
775 
776 #ifdef SPDK_CONFIG_VTUNE
777 	{
778 		char *name;
779 		__itt_init_ittlib(NULL, 0);
780 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
781 		if (!name) {
782 			spdk_put_io_channel(ch->channel);
783 			spdk_put_io_channel(ch->mgmt_channel);
784 			return -1;
785 		}
786 		ch->handle = __itt_string_handle_create(name);
787 		free(name);
788 		ch->start_tsc = spdk_get_ticks();
789 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
790 	}
791 #endif
792 
793 	return 0;
794 }
795 
796 /*
797  * Abort I/O that are waiting on a data buffer.  These types of I/O are
798  *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
799  */
800 static void
801 _spdk_bdev_abort_buf_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
802 {
803 	struct spdk_bdev_io *bdev_io, *tmp;
804 
805 	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
806 		if (bdev_io->ch == ch) {
807 			TAILQ_REMOVE(queue, bdev_io, buf_link);
808 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
809 		}
810 	}
811 }
812 
813 /*
814  * Abort I/O that are queued waiting for submission.  These types of I/O are
815  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
816  */
817 static void
818 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
819 {
820 	struct spdk_bdev_io *bdev_io, *tmp;
821 
822 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
823 		if (bdev_io->ch == ch) {
824 			TAILQ_REMOVE(queue, bdev_io, link);
825 			/*
826 			 * spdk_bdev_io_complete() assumes that the completed I/O had
827 			 *  been submitted to the bdev module.  Since in this case it
828 			 *  hadn't, bump io_outstanding to account for the decrement
829 			 *  that spdk_bdev_io_complete() will do.
830 			 */
831 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
832 				ch->io_outstanding++;
833 			}
834 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
835 		}
836 	}
837 }
838 
839 static void
840 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
841 {
842 	struct spdk_bdev_channel	*ch = ctx_buf;
843 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
844 
845 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
846 
847 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
848 	_spdk_bdev_abort_queued_io(&ch->nomem_io, ch);
849 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
850 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
851 
852 	spdk_put_io_channel(ch->channel);
853 	spdk_put_io_channel(ch->mgmt_channel);
854 	assert(ch->io_outstanding == 0);
855 }
856 
857 struct spdk_io_channel *
858 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
859 {
860 	return spdk_get_io_channel(desc->bdev);
861 }
862 
863 const char *
864 spdk_bdev_get_name(const struct spdk_bdev *bdev)
865 {
866 	return bdev->name;
867 }
868 
869 const char *
870 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
871 {
872 	return bdev->product_name;
873 }
874 
875 uint32_t
876 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
877 {
878 	return bdev->blocklen;
879 }
880 
881 uint64_t
882 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
883 {
884 	return bdev->blockcnt;
885 }
886 
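/*
 * Callers typically honor this alignment when allocating I/O buffers, for example with
 *  spdk_dma_zmalloc(len, spdk_bdev_get_buf_align(bdev), NULL).  (Illustrative only;
 *  "len" stands for the caller's transfer size.)
 */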
887 size_t
888 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
889 {
890 	/* TODO: push this logic down to the bdev modules */
891 	if (bdev->need_aligned_buffer) {
892 		return bdev->blocklen;
893 	}
894 
895 	return 1;
896 }
897 
898 uint32_t
899 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
900 {
901 	return bdev->optimal_io_boundary;
902 }
903 
904 bool
905 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
906 {
907 	return bdev->write_cache;
908 }
909 
910 /*
911  * Convert I/O offset and length from bytes to blocks.
912  *
913  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
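 *
 * For example, with a 512-byte block size, offset_bytes = 4096 and num_bytes = 8192
 *  convert to offset_blocks = 8 and num_blocks = 16.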
914  */
915 static uint64_t
916 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
917 			  uint64_t num_bytes, uint64_t *num_blocks)
918 {
919 	uint32_t block_size = bdev->blocklen;
920 
921 	*offset_blocks = offset_bytes / block_size;
922 	*num_blocks = num_bytes / block_size;
923 
924 	return (offset_bytes % block_size) | (num_bytes % block_size);
925 }
926 
927 static bool
928 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
929 {
930 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
931 	 * that the sum overflowed and the offset wrapped around. */
932 	if (offset_blocks + num_blocks < offset_blocks) {
933 		return false;
934 	}
935 
936 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
937 	if (offset_blocks + num_blocks > bdev->blockcnt) {
938 		return false;
939 	}
940 
941 	return true;
942 }
943 
944 int
945 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
946 	       void *buf, uint64_t offset, uint64_t nbytes,
947 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
948 {
949 	uint64_t offset_blocks, num_blocks;
950 
951 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
952 		return -EINVAL;
953 	}
954 
955 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
956 }
957 
958 int
959 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
960 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
961 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
962 {
963 	struct spdk_bdev *bdev = desc->bdev;
964 	struct spdk_bdev_io *bdev_io;
965 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
966 
967 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
968 		return -EINVAL;
969 	}
970 
971 	bdev_io = spdk_bdev_get_io();
972 	if (!bdev_io) {
973 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
974 		return -ENOMEM;
975 	}
976 
977 	bdev_io->ch = channel;
978 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
979 	bdev_io->u.bdev.iov.iov_base = buf;
980 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
981 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
982 	bdev_io->u.bdev.iovcnt = 1;
983 	bdev_io->u.bdev.num_blocks = num_blocks;
984 	bdev_io->u.bdev.offset_blocks = offset_blocks;
985 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
986 
987 	spdk_bdev_io_submit(bdev_io);
988 	return 0;
989 }
990 
991 int
992 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
993 		struct iovec *iov, int iovcnt,
994 		uint64_t offset, uint64_t nbytes,
995 		spdk_bdev_io_completion_cb cb, void *cb_arg)
996 {
997 	uint64_t offset_blocks, num_blocks;
998 
999 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1000 		return -EINVAL;
1001 	}
1002 
1003 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1004 }
1005 
1006 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1007 			   struct iovec *iov, int iovcnt,
1008 			   uint64_t offset_blocks, uint64_t num_blocks,
1009 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1010 {
1011 	struct spdk_bdev *bdev = desc->bdev;
1012 	struct spdk_bdev_io *bdev_io;
1013 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1014 
1015 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1016 		return -EINVAL;
1017 	}
1018 
1019 	bdev_io = spdk_bdev_get_io();
1020 	if (!bdev_io) {
1021 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
1022 		return -ENOMEM;
1023 	}
1024 
1025 	bdev_io->ch = channel;
1026 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1027 	bdev_io->u.bdev.iovs = iov;
1028 	bdev_io->u.bdev.iovcnt = iovcnt;
1029 	bdev_io->u.bdev.num_blocks = num_blocks;
1030 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1031 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1032 
1033 	spdk_bdev_io_submit(bdev_io);
1034 	return 0;
1035 }
1036 
1037 int
1038 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1039 		void *buf, uint64_t offset, uint64_t nbytes,
1040 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1041 {
1042 	uint64_t offset_blocks, num_blocks;
1043 
1044 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1045 		return -EINVAL;
1046 	}
1047 
1048 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1049 }
1050 
1051 int
1052 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1053 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1054 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1055 {
1056 	struct spdk_bdev *bdev = desc->bdev;
1057 	struct spdk_bdev_io *bdev_io;
1058 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1059 
1060 	if (!desc->write) {
1061 		return -EBADF;
1062 	}
1063 
1064 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1065 		return -EINVAL;
1066 	}
1067 
1068 	bdev_io = spdk_bdev_get_io();
1069 	if (!bdev_io) {
1070 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1071 		return -ENOMEM;
1072 	}
1073 
1074 	bdev_io->ch = channel;
1075 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1076 	bdev_io->u.bdev.iov.iov_base = buf;
1077 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1078 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1079 	bdev_io->u.bdev.iovcnt = 1;
1080 	bdev_io->u.bdev.num_blocks = num_blocks;
1081 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1082 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1083 
1084 	spdk_bdev_io_submit(bdev_io);
1085 	return 0;
1086 }
1087 
1088 int
1089 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1090 		 struct iovec *iov, int iovcnt,
1091 		 uint64_t offset, uint64_t len,
1092 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1093 {
1094 	uint64_t offset_blocks, num_blocks;
1095 
1096 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1097 		return -EINVAL;
1098 	}
1099 
1100 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1101 }
1102 
1103 int
1104 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1105 			struct iovec *iov, int iovcnt,
1106 			uint64_t offset_blocks, uint64_t num_blocks,
1107 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1108 {
1109 	struct spdk_bdev *bdev = desc->bdev;
1110 	struct spdk_bdev_io *bdev_io;
1111 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1112 
1113 	if (!desc->write) {
1114 		return -EBADF;
1115 	}
1116 
1117 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1118 		return -EINVAL;
1119 	}
1120 
1121 	bdev_io = spdk_bdev_get_io();
1122 	if (!bdev_io) {
1123 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1124 		return -ENOMEM;
1125 	}
1126 
1127 	bdev_io->ch = channel;
1128 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1129 	bdev_io->u.bdev.iovs = iov;
1130 	bdev_io->u.bdev.iovcnt = iovcnt;
1131 	bdev_io->u.bdev.num_blocks = num_blocks;
1132 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1133 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1134 
1135 	spdk_bdev_io_submit(bdev_io);
1136 	return 0;
1137 }
1138 
1139 int
1140 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1141 		       uint64_t offset, uint64_t len,
1142 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1143 {
1144 	uint64_t offset_blocks, num_blocks;
1145 
1146 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1147 		return -EINVAL;
1148 	}
1149 
1150 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1151 }
1152 
1153 int
1154 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1155 			      uint64_t offset_blocks, uint64_t num_blocks,
1156 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1157 {
1158 	struct spdk_bdev *bdev = desc->bdev;
1159 	struct spdk_bdev_io *bdev_io;
1160 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1161 	uint64_t len;
1162 	bool split_request = false;
1163 
1164 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1165 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1166 		return -ERANGE;
1167 	}
1168 
1169 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1170 		return -EINVAL;
1171 	}
1172 
1173 	bdev_io = spdk_bdev_get_io();
1174 
1175 	if (!bdev_io) {
1176 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1177 		return -ENOMEM;
1178 	}
1179 
1180 	bdev_io->ch = channel;
1181 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1182 
1183 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1184 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1185 		bdev_io->u.bdev.num_blocks = num_blocks;
1186 		bdev_io->u.bdev.iovs = NULL;
1187 		bdev_io->u.bdev.iovcnt = 0;
1188 
1189 	} else {
1190 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1191 
1192 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1193 
1194 		if (len > ZERO_BUFFER_SIZE) {
1195 			split_request = true;
1196 			len = ZERO_BUFFER_SIZE;
1197 		}
1198 
1199 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1200 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1201 		bdev_io->u.bdev.iov.iov_len = len;
1202 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1203 		bdev_io->u.bdev.iovcnt = 1;
1204 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1205 		bdev_io->split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1206 		bdev_io->split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1207 	}
1208 
1209 	if (split_request) {
1210 		bdev_io->stored_user_cb = cb;
1211 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1212 	} else {
1213 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1214 	}
1215 	spdk_bdev_io_submit(bdev_io);
1216 	return 0;
1217 }
1218 
1219 int
1220 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1221 		uint64_t offset, uint64_t nbytes,
1222 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1223 {
1224 	uint64_t offset_blocks, num_blocks;
1225 
1226 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1227 		return -EINVAL;
1228 	}
1229 
1230 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1231 }
1232 
1233 int
1234 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1235 		       uint64_t offset_blocks, uint64_t num_blocks,
1236 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1237 {
1238 	struct spdk_bdev *bdev = desc->bdev;
1239 	struct spdk_bdev_io *bdev_io;
1240 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1241 
1242 	if (!desc->write) {
1243 		return -EBADF;
1244 	}
1245 
1246 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1247 		return -EINVAL;
1248 	}
1249 
1250 	if (num_blocks == 0) {
1251 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1252 		return -EINVAL;
1253 	}
1254 
1255 	bdev_io = spdk_bdev_get_io();
1256 	if (!bdev_io) {
1257 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1258 		return -ENOMEM;
1259 	}
1260 
1261 	bdev_io->ch = channel;
1262 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1263 	bdev_io->u.bdev.iov.iov_base = NULL;
1264 	bdev_io->u.bdev.iov.iov_len = 0;
1265 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1266 	bdev_io->u.bdev.iovcnt = 1;
1267 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1268 	bdev_io->u.bdev.num_blocks = num_blocks;
1269 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1270 
1271 	spdk_bdev_io_submit(bdev_io);
1272 	return 0;
1273 }
1274 
1275 int
1276 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1277 		uint64_t offset, uint64_t length,
1278 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1279 {
1280 	uint64_t offset_blocks, num_blocks;
1281 
1282 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1283 		return -EINVAL;
1284 	}
1285 
1286 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1287 }
1288 
1289 int
1290 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1291 		       uint64_t offset_blocks, uint64_t num_blocks,
1292 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1293 {
1294 	struct spdk_bdev *bdev = desc->bdev;
1295 	struct spdk_bdev_io *bdev_io;
1296 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1297 
1298 	if (!desc->write) {
1299 		return -EBADF;
1300 	}
1301 
1302 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1303 		return -EINVAL;
1304 	}
1305 
1306 	bdev_io = spdk_bdev_get_io();
1307 	if (!bdev_io) {
1308 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1309 		return -ENOMEM;
1310 	}
1311 
1312 	bdev_io->ch = channel;
1313 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1314 	bdev_io->u.bdev.iovs = NULL;
1315 	bdev_io->u.bdev.iovcnt = 0;
1316 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1317 	bdev_io->u.bdev.num_blocks = num_blocks;
1318 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1319 
1320 	spdk_bdev_io_submit(bdev_io);
1321 	return 0;
1322 }
1323 
1324 static void
1325 _spdk_bdev_reset_dev(void *io_device, void *ctx)
1326 {
1327 	struct spdk_bdev_channel *ch = ctx;
1328 	struct spdk_bdev_io *bdev_io;
1329 
1330 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1331 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1332 	spdk_bdev_io_submit_reset(bdev_io);
1333 }
1334 
1335 static void
1336 _spdk_bdev_reset_abort_channel(void *io_device, struct spdk_io_channel *ch,
1337 			       void *ctx)
1338 {
1339 	struct spdk_bdev_channel	*channel;
1340 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1341 
1342 	channel = spdk_io_channel_get_ctx(ch);
1343 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1344 
1345 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1346 
1347 	_spdk_bdev_abort_queued_io(&channel->nomem_io, channel);
1348 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1349 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1350 }
1351 
1352 static void
1353 _spdk_bdev_start_reset(void *ctx)
1354 {
1355 	struct spdk_bdev_channel *ch = ctx;
1356 
1357 	spdk_for_each_channel(ch->bdev, _spdk_bdev_reset_abort_channel,
1358 			      ch, _spdk_bdev_reset_dev);
1359 }
1360 
1361 static void
1362 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1363 {
1364 	struct spdk_bdev *bdev = ch->bdev;
1365 
1366 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1367 
1368 	pthread_mutex_lock(&bdev->mutex);
1369 	if (bdev->reset_in_progress == NULL) {
1370 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1371 		/*
1372 		 * Take a channel reference for the target bdev for the life of this
1373 		 *  reset.  This guards against the channel getting destroyed while
1374 		 *  spdk_for_each_channel() calls related to this reset IO are in
1375 		 *  progress.  We will release the reference when this reset is
1376 		 *  completed.
1377 		 */
1378 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(bdev);
1379 		_spdk_bdev_start_reset(ch);
1380 	}
1381 	pthread_mutex_unlock(&bdev->mutex);
1382 }
1383 
1384 static void
1385 _spdk_bdev_complete_reset_channel(void *io_device, struct spdk_io_channel *_ch, void *ctx)
1386 {
1387 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1388 
1389 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
1390 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
1391 		_spdk_bdev_channel_start_reset(ch);
1392 	}
1393 }
1394 
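/*
 * Queue a reset on the calling channel.  Only one reset per bdev is outstanding at a
 *  time; while it runs, every channel of the bdev is marked BDEV_CH_RESET_IN_PROGRESS
 *  and its queued and buffer-waiting I/O are aborted before the reset reaches the module.
 */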
1395 int
1396 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1397 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1398 {
1399 	struct spdk_bdev *bdev = desc->bdev;
1400 	struct spdk_bdev_io *bdev_io;
1401 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1402 
1403 	bdev_io = spdk_bdev_get_io();
1404 	if (!bdev_io) {
1405 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1406 		return -ENOMEM;
1407 	}
1408 
1409 	bdev_io->ch = channel;
1410 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1411 	bdev_io->u.reset.ch_ref = NULL;
1412 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1413 
1414 	pthread_mutex_lock(&bdev->mutex);
1415 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1416 	pthread_mutex_unlock(&bdev->mutex);
1417 
1418 	_spdk_bdev_channel_start_reset(channel);
1419 
1420 	return 0;
1421 }
1422 
1423 void
1424 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1425 		      struct spdk_bdev_io_stat *stat)
1426 {
1427 #ifdef SPDK_CONFIG_VTUNE
1428 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1429 	memset(stat, 0, sizeof(*stat));
1430 	return;
1431 #endif
1432 
1433 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1434 
1435 	*stat = channel->stat;
1436 	memset(&channel->stat, 0, sizeof(channel->stat));
1437 }
1438 
1439 int
1440 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1441 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1442 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1443 {
1444 	struct spdk_bdev *bdev = desc->bdev;
1445 	struct spdk_bdev_io *bdev_io;
1446 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1447 
1448 	if (!desc->write) {
1449 		return -EBADF;
1450 	}
1451 
1452 	bdev_io = spdk_bdev_get_io();
1453 	if (!bdev_io) {
1454 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1455 		return -ENOMEM;
1456 	}
1457 
1458 	bdev_io->ch = channel;
1459 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1460 	bdev_io->u.nvme_passthru.cmd = *cmd;
1461 	bdev_io->u.nvme_passthru.buf = buf;
1462 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1463 
1464 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1465 
1466 	spdk_bdev_io_submit(bdev_io);
1467 	return 0;
1468 }
1469 
1470 int
1471 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1472 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1473 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1474 {
1475 	struct spdk_bdev *bdev = desc->bdev;
1476 	struct spdk_bdev_io *bdev_io;
1477 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1478 
1479 	if (!desc->write) {
1480 		/*
1481 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1482 		 *  to easily determine if the command is a read or write, but for now just
1483 		 *  do not allow io_passthru with a read-only descriptor.
1484 		 */
1485 		return -EBADF;
1486 	}
1487 
1488 	bdev_io = spdk_bdev_get_io();
1489 	if (!bdev_io) {
1490 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
1491 		return -ENOMEM;
1492 	}
1493 
1494 	bdev_io->ch = channel;
1495 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1496 	bdev_io->u.nvme_passthru.cmd = *cmd;
1497 	bdev_io->u.nvme_passthru.buf = buf;
1498 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1499 
1500 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1501 
1502 	spdk_bdev_io_submit(bdev_io);
1503 	return 0;
1504 }
1505 
1506 int
1507 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1508 {
1509 	if (!bdev_io) {
1510 		SPDK_ERRLOG("bdev_io is NULL\n");
1511 		return -1;
1512 	}
1513 
1514 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1515 		SPDK_ERRLOG("bdev_io is in pending state\n");
1516 		assert(false);
1517 		return -1;
1518 	}
1519 
1520 	spdk_bdev_put_io(bdev_io);
1521 
1522 	return 0;
1523 }
1524 
1525 static void
1526 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
1527 {
1528 	struct spdk_bdev *bdev = bdev_ch->bdev;
1529 	struct spdk_bdev_io *bdev_io;
1530 
1531 	if (bdev_ch->io_outstanding > bdev_ch->nomem_threshold) {
1532 		/*
1533 		 * Allow some more I/O to complete before retrying the nomem_io queue.
1534 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
1535 		 *  the context of a completion, because the resources for the I/O are
1536 		 *  not released until control returns to the bdev poller.  Also, we
1537 		 *  may require several small I/O to complete before a larger I/O
1538 		 *  (that requires splitting) can be submitted.
1539 		 */
1540 		return;
1541 	}
1542 
1543 	while (!TAILQ_EMPTY(&bdev_ch->nomem_io)) {
1544 		bdev_io = TAILQ_FIRST(&bdev_ch->nomem_io);
1545 		TAILQ_REMOVE(&bdev_ch->nomem_io, bdev_io, link);
1546 		bdev_ch->io_outstanding++;
1547 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
1548 		bdev->fn_table->submit_request(bdev_ch->channel, bdev_io);
1549 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
1550 			break;
1551 		}
1552 	}
1553 }
1554 
1555 static void
1556 _spdk_bdev_io_complete(void *ctx)
1557 {
1558 	struct spdk_bdev_io *bdev_io = ctx;
1559 
1560 	assert(bdev_io->cb != NULL);
1561 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1562 }
1563 
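/*
 * Complete a bdev_io.  Resets clear reset_in_progress and unfreeze every channel of the
 *  bdev; a NOMEM completion re-queues the I/O on nomem_io for a later retry instead of
 *  completing it; and the user callback may be deferred to a thread message to avoid
 *  unbounded recursion when completion happens from within submit_request().
 */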
1564 void
1565 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1566 {
1567 	struct spdk_bdev *bdev = bdev_io->bdev;
1568 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
1569 
1570 	bdev_io->status = status;
1571 
1572 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
1573 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
1574 			SPDK_ERRLOG("NOMEM returned for reset\n");
1575 		}
1576 		pthread_mutex_lock(&bdev->mutex);
1577 		if (bdev_io == bdev->reset_in_progress) {
1578 			bdev->reset_in_progress = NULL;
1579 		}
1580 		pthread_mutex_unlock(&bdev->mutex);
1581 		if (bdev_io->u.reset.ch_ref != NULL) {
1582 			spdk_put_io_channel(bdev_io->u.reset.ch_ref);
1583 		}
1584 		spdk_for_each_channel(bdev, _spdk_bdev_complete_reset_channel, NULL, NULL);
1585 	} else {
1586 		assert(bdev_ch->io_outstanding > 0);
1587 		bdev_ch->io_outstanding--;
1588 		if (spdk_likely(status != SPDK_BDEV_IO_STATUS_NOMEM)) {
1589 			if (spdk_unlikely(!TAILQ_EMPTY(&bdev_ch->nomem_io))) {
1590 				_spdk_bdev_ch_retry_io(bdev_ch);
1591 			}
1592 		} else {
1593 			TAILQ_INSERT_HEAD(&bdev_ch->nomem_io, bdev_io, link);
1594 			/*
1595 			 * Wait for some of the outstanding I/O to complete before we
1596 			 *  retry any of the nomem_io.  Normally we will wait for
1597 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
1598 			 *  depth channels we will instead wait for half to complete.
1599 			 */
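			/* e.g. with io_outstanding == 64 the threshold becomes spdk_max(32, 56) = 56;
			 *  with io_outstanding == 10 it becomes spdk_max(5, 2) = 5. */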
1600 			bdev_ch->nomem_threshold = spdk_max(bdev_ch->io_outstanding / 2,
1601 							    bdev_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
1602 			return;
1603 		}
1604 	}
1605 
1606 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1607 		switch (bdev_io->type) {
1608 		case SPDK_BDEV_IO_TYPE_READ:
1609 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1610 			bdev_ch->stat.num_read_ops++;
1611 			break;
1612 		case SPDK_BDEV_IO_TYPE_WRITE:
1613 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
1614 			bdev_ch->stat.num_write_ops++;
1615 			break;
1616 		default:
1617 			break;
1618 		}
1619 	}
1620 
1621 #ifdef SPDK_CONFIG_VTUNE
1622 	uint64_t now_tsc = spdk_get_ticks();
1623 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
1624 		uint64_t data[5];
1625 
1626 		data[0] = bdev_ch->stat.num_read_ops;
1627 		data[1] = bdev_ch->stat.bytes_read;
1628 		data[2] = bdev_ch->stat.num_write_ops;
1629 		data[3] = bdev_ch->stat.bytes_written;
1630 		data[4] = bdev->fn_table->get_spin_time ?
1631 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
1632 
1633 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
1634 				   __itt_metadata_u64, 5, data);
1635 
1636 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
1637 		bdev_ch->start_tsc = now_tsc;
1638 	}
1639 #endif
1640 
1641 	if (bdev_io->in_submit_request || bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
1642 		/*
1643 		 * Defer completion to avoid potential infinite recursion if the
1644 		 * user's completion callback issues a new I/O.
1645 		 */
1646 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
1647 				     _spdk_bdev_io_complete, bdev_io);
1648 	} else {
1649 		_spdk_bdev_io_complete(bdev_io);
1650 	}
1651 }
1652 
1653 void
1654 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
1655 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
1656 {
1657 	if (sc == SPDK_SCSI_STATUS_GOOD) {
1658 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1659 	} else {
1660 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
1661 		bdev_io->error.scsi.sc = sc;
1662 		bdev_io->error.scsi.sk = sk;
1663 		bdev_io->error.scsi.asc = asc;
1664 		bdev_io->error.scsi.ascq = ascq;
1665 	}
1666 
1667 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1668 }
1669 
1670 void
1671 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
1672 			     int *sc, int *sk, int *asc, int *ascq)
1673 {
1674 	assert(sc != NULL);
1675 	assert(sk != NULL);
1676 	assert(asc != NULL);
1677 	assert(ascq != NULL);
1678 
1679 	switch (bdev_io->status) {
1680 	case SPDK_BDEV_IO_STATUS_SUCCESS:
1681 		*sc = SPDK_SCSI_STATUS_GOOD;
1682 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
1683 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1684 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1685 		break;
1686 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
1687 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
1688 		break;
1689 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
1690 		*sc = bdev_io->error.scsi.sc;
1691 		*sk = bdev_io->error.scsi.sk;
1692 		*asc = bdev_io->error.scsi.asc;
1693 		*ascq = bdev_io->error.scsi.ascq;
1694 		break;
1695 	default:
1696 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
1697 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
1698 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1699 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1700 		break;
1701 	}
1702 }
1703 
1704 void
1705 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
1706 {
1707 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
1708 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1709 	} else {
1710 		bdev_io->error.nvme.sct = sct;
1711 		bdev_io->error.nvme.sc = sc;
1712 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
1713 	}
1714 
1715 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1716 }
1717 
1718 void
1719 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
1720 {
1721 	assert(sct != NULL);
1722 	assert(sc != NULL);
1723 
1724 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
1725 		*sct = bdev_io->error.nvme.sct;
1726 		*sc = bdev_io->error.nvme.sc;
1727 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1728 		*sct = SPDK_NVME_SCT_GENERIC;
1729 		*sc = SPDK_NVME_SC_SUCCESS;
1730 	} else {
1731 		*sct = SPDK_NVME_SCT_GENERIC;
1732 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1733 	}
1734 }
1735 
1736 struct spdk_thread *
1737 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
1738 {
1739 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
1740 }
1741 
1742 static void
1743 _spdk_bdev_register(struct spdk_bdev *bdev)
1744 {
1745 	struct spdk_bdev_module_if *module;
1746 
1747 	assert(bdev->module != NULL);
1748 
1749 	bdev->status = SPDK_BDEV_STATUS_READY;
1750 
1751 	TAILQ_INIT(&bdev->open_descs);
1752 
1753 	TAILQ_INIT(&bdev->vbdevs);
1754 	TAILQ_INIT(&bdev->base_bdevs);
1755 
1756 	bdev->reset_in_progress = NULL;
1757 
1758 	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
1759 				sizeof(struct spdk_bdev_channel));
1760 
1761 	pthread_mutex_init(&bdev->mutex, NULL);
1762 	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Inserting bdev %s into list\n", bdev->name);
1763 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
1764 
1765 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
1766 		if (module->examine) {
1767 			module->action_in_progress++;
1768 			module->examine(bdev);
1769 		}
1770 	}
1771 }
1772 
1773 void
1774 spdk_bdev_register(struct spdk_bdev *bdev)
1775 {
1776 	_spdk_bdev_register(bdev);
1777 }
1778 
1779 void
1780 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
1781 {
1782 	int i;
1783 
1784 	_spdk_bdev_register(vbdev);
1785 	for (i = 0; i < base_bdev_count; i++) {
1786 		assert(base_bdevs[i] != NULL);
1787 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
1788 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
1789 	}
1790 }
1791 
1792 void
1793 spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
1794 {
1795 	if (bdev->unregister_cb != NULL) {
1796 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
1797 	}
1798 }
1799 
1800 void
1801 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
1802 {
1803 	struct spdk_bdev_desc	*desc, *tmp;
1804 	int			rc;
1805 	bool			do_destruct = true;
1806 
1807 	SPDK_DEBUGLOG(SPDK_TRACE_BDEV, "Removing bdev %s from list\n", bdev->name);
1808 
1809 	pthread_mutex_lock(&bdev->mutex);
1810 
1811 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
1812 	bdev->unregister_cb = cb_fn;
1813 	bdev->unregister_ctx = cb_arg;
1814 
1815 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
1816 		if (desc->remove_cb) {
1817 			pthread_mutex_unlock(&bdev->mutex);
1818 			do_destruct = false;
1819 			desc->remove_cb(desc->remove_ctx);
1820 			pthread_mutex_lock(&bdev->mutex);
1821 		}
1822 	}
1823 
1824 	if (!do_destruct) {
1825 		pthread_mutex_unlock(&bdev->mutex);
1826 		return;
1827 	}
1828 
1829 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
1830 	pthread_mutex_unlock(&bdev->mutex);
1831 
1832 	pthread_mutex_destroy(&bdev->mutex);
1833 
1834 	spdk_io_device_unregister(bdev, NULL);
1835 
1836 	rc = bdev->fn_table->destruct(bdev->ctxt);
1837 	if (rc < 0) {
1838 		SPDK_ERRLOG("destruct failed\n");
1839 	}
1840 	if (rc <= 0 && cb_fn != NULL) {
1841 		cb_fn(cb_arg, rc);
1842 	}
1843 }
1844 
1845 void
1846 spdk_vbdev_unregister(struct spdk_bdev *vbdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
1847 {
1848 	struct spdk_bdev *base_bdev;
1849 
1850 	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
1851 	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
1852 		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
1853 	}
1854 	spdk_bdev_unregister(vbdev, cb_fn, cb_arg);
1855 }
1856 
1857 int
1858 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
1859 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
1860 {
1861 	struct spdk_bdev_desc *desc;
1862 
1863 	desc = calloc(1, sizeof(*desc));
1864 	if (desc == NULL) {
1865 		return -ENOMEM;
1866 	}
1867 
1868 	pthread_mutex_lock(&bdev->mutex);
1869 
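	/* A bdev claimed by a module cannot be opened for writing by anyone else. */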
1870 	if (write && bdev->claim_module) {
1871 		SPDK_ERRLOG("failed, %s already claimed\n", bdev->name);
1872 		free(desc);
1873 		pthread_mutex_unlock(&bdev->mutex);
1874 		return -EPERM;
1875 	}
1876 
1877 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
1878 
1879 	desc->bdev = bdev;
1880 	desc->remove_cb = remove_cb;
1881 	desc->remove_ctx = remove_ctx;
1882 	desc->write = write;
1883 	*_desc = desc;
1884 
1885 	pthread_mutex_unlock(&bdev->mutex);
1886 
1887 	return 0;
1888 }
1889 
1890 void
1891 spdk_bdev_close(struct spdk_bdev_desc *desc)
1892 {
1893 	struct spdk_bdev *bdev = desc->bdev;
1894 	bool do_unregister = false;
1895 
1896 	pthread_mutex_lock(&bdev->mutex);
1897 
1898 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
1899 	free(desc);
1900 
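	/*
	 * If an unregister is pending and this was the last open descriptor,
	 *  complete the deferred unregister now.
	 */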
1901 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
1902 		do_unregister = true;
1903 	}
1904 	pthread_mutex_unlock(&bdev->mutex);
1905 
1906 	if (do_unregister == true) {
1907 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
1908 	}
1909 }
1910 
1911 int
1912 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
1913 			    struct spdk_bdev_module_if *module)
1914 {
1915 	if (bdev->claim_module != NULL) {
1916 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
1917 			    bdev->claim_module->name);
1918 		return -EPERM;
1919 	}
1920 
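	/* Claiming a bdev implies write access, so upgrade a read-only descriptor. */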
1921 	if (desc && !desc->write) {
1922 		desc->write = true;
1923 	}
1924 
1925 	bdev->claim_module = module;
1926 	return 0;
1927 }
1928 
1929 void
1930 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
1931 {
1932 	assert(bdev->claim_module != NULL);
1933 	bdev->claim_module = NULL;
1934 }
1935 
1936 struct spdk_bdev *
1937 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
1938 {
1939 	return desc->bdev;
1940 }
1941 
1942 void
1943 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
1944 {
1945 	struct iovec *iovs;
1946 	int iovcnt;
1947 
1948 	if (bdev_io == NULL) {
1949 		return;
1950 	}
1951 
1952 	switch (bdev_io->type) {
1953 	case SPDK_BDEV_IO_TYPE_READ:
1954 		iovs = bdev_io->u.bdev.iovs;
1955 		iovcnt = bdev_io->u.bdev.iovcnt;
1956 		break;
1957 	case SPDK_BDEV_IO_TYPE_WRITE:
1958 		iovs = bdev_io->u.bdev.iovs;
1959 		iovcnt = bdev_io->u.bdev.iovcnt;
1960 		break;
1961 	default:
1962 		iovs = NULL;
1963 		iovcnt = 0;
1964 		break;
1965 	}
1966 
1967 	if (iovp) {
1968 		*iovp = iovs;
1969 	}
1970 	if (iovcntp) {
1971 		*iovcntp = iovcnt;
1972 	}
1973 }
1974 
1975 void
1976 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
1977 {
1978 	/*
1979 	 * Modules with examine callbacks must be initialized first, so they are
1980 	 *  ready to handle examine callbacks from later modules that will
1981 	 *  register physical bdevs.
1982 	 */
1983 	if (bdev_module->examine != NULL) {
1984 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1985 	} else {
1986 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1987 	}
1988 }
1989 
1990 void
1991 spdk_bdev_part_base_free(struct spdk_bdev_part_base *base)
1992 {
1993 	if (base->desc) {
1994 		spdk_bdev_close(base->desc);
1995 		base->desc = NULL;
1996 	}
1997 	base->base_free_fn(base);
1998 }
1999 
2000 void
2001 spdk_bdev_part_free(struct spdk_bdev_part *part)
2002 {
2003 	struct spdk_bdev_part_base *base;
2004 
2005 	assert(part);
2006 	assert(part->base);
2007 
2008 	base = part->base;
2009 	spdk_io_device_unregister(&part->base, NULL);
2010 	TAILQ_REMOVE(base->tailq, part, tailq);
2011 	free(part->bdev.name);
2012 	free(part);
2013 
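	/*
	 * When the last part referencing this base is freed, release the
	 *  module's claim on the base bdev and free the base itself.
	 */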
2014 	if (__sync_sub_and_fetch(&base->ref, 1) == 0) {
2015 		spdk_bdev_module_release_bdev(base->bdev);
2016 		spdk_bdev_part_base_free(base);
2017 	}
2018 }
2019 
2020 void
2021 spdk_bdev_part_tailq_fini(struct bdev_part_tailq *tailq)
2022 {
2023 	struct spdk_bdev_part *part, *tmp;
2024 
2025 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
2026 		spdk_bdev_part_free(part);
2027 	}
2028 }
2029 
2030 void
2031 spdk_bdev_part_base_hotremove(struct spdk_bdev *base_bdev, struct bdev_part_tailq *tailq)
2032 {
2033 	struct spdk_bdev_part *part, *tmp;
2034 
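	/* Unregister every part that was built on top of the removed base bdev. */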
2035 	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
2036 		if (part->base->bdev == base_bdev) {
2037 			spdk_vbdev_unregister(&part->bdev, NULL, NULL);
2038 		}
2039 	}
2040 }
2041 
2042 static bool
2043 spdk_bdev_part_io_type_supported(void *_part, enum spdk_bdev_io_type io_type)
2044 {
2045 	struct spdk_bdev_part *part = _part;
2046 
2047 	return part->base->bdev->fn_table->io_type_supported(part->base->bdev, io_type);
2048 }
2049 
2050 static struct spdk_io_channel *
2051 spdk_bdev_part_get_io_channel(void *_part)
2052 {
2053 	struct spdk_bdev_part *part = _part;
2054 
2055 	return spdk_get_io_channel(&part->base);
2056 }
2057 
2058 static void
2059 spdk_bdev_part_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2060 {
2061 	struct spdk_bdev_io *part_io = cb_arg;
2062 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
2063 
2064 	spdk_bdev_io_complete(part_io, status);
2065 	spdk_bdev_free_io(bdev_io);
2066 }
2067 
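/*
 * Completion callback for each chunk of a split write_zeroes request: on
 *  success, submit the next chunk of at most ZERO_BUFFER_SIZE bytes until
 *  split_remaining_num_blocks reaches zero.
 */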
2068 static void
2069 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2070 {
2071 	uint64_t len;
2072 
2073 	if (!success) {
2074 		bdev_io->cb = bdev_io->stored_user_cb;
2075 		_spdk_bdev_io_complete(bdev_io);
2076 		return;
2077 	}
2078 
2079 	/* No need to repeat the error checking from write_zeroes_blocks; this request already passed those checks. */
2080 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->split_remaining_num_blocks,
2081 		       ZERO_BUFFER_SIZE);
2082 
2083 	bdev_io->u.bdev.offset_blocks = bdev_io->split_current_offset_blocks;
2084 	bdev_io->u.bdev.iov.iov_len = len;
2085 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2086 	bdev_io->split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2087 	bdev_io->split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2088 
2089 	/* If this round completes the I/O, restore the original user callback; otherwise keep splitting. */
2090 	if (bdev_io->split_remaining_num_blocks == 0) {
2091 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->stored_user_cb);
2092 	} else {
2093 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2094 	}
2095 	spdk_bdev_io_submit(bdev_io);
2096 }
2097 
2098 void
2099 spdk_bdev_part_submit_request(struct spdk_bdev_part_channel *ch, struct spdk_bdev_io *bdev_io)
2100 {
2101 	struct spdk_bdev_part *part = ch->part;
2102 	struct spdk_io_channel *base_ch = ch->base_ch;
2103 	struct spdk_bdev_desc *base_desc = part->base->desc;
2104 	uint64_t offset;
2105 	int rc = 0;
2106 
2107 	/* Modify the I/O to adjust for the offset within the base bdev. */
2108 	switch (bdev_io->type) {
2109 	case SPDK_BDEV_IO_TYPE_READ:
2110 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2111 		rc = spdk_bdev_readv_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2112 					    bdev_io->u.bdev.iovcnt, offset,
2113 					    bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2114 					    bdev_io);
2115 		break;
2116 	case SPDK_BDEV_IO_TYPE_WRITE:
2117 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2118 		rc = spdk_bdev_writev_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
2119 					     bdev_io->u.bdev.iovcnt, offset,
2120 					     bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
2121 					     bdev_io);
2122 		break;
2123 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
2124 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2125 		rc = spdk_bdev_write_zeroes_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2126 						   spdk_bdev_part_complete_io, bdev_io);
2127 		break;
2128 	case SPDK_BDEV_IO_TYPE_UNMAP:
2129 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2130 		rc = spdk_bdev_unmap_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2131 					    spdk_bdev_part_complete_io, bdev_io);
2132 		break;
2133 	case SPDK_BDEV_IO_TYPE_FLUSH:
2134 		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
2135 		rc = spdk_bdev_flush_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
2136 					    spdk_bdev_part_complete_io, bdev_io);
2137 		break;
2138 	case SPDK_BDEV_IO_TYPE_RESET:
2139 		rc = spdk_bdev_reset(base_desc, base_ch,
2140 				     spdk_bdev_part_complete_io, bdev_io);
2141 		break;
2142 	default:
2143 		SPDK_ERRLOG("unknown I/O type %d\n", bdev_io->type);
2144 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2145 		return;
2146 	}
2147 
2148 	if (rc != 0) {
2149 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
2150 	}
2151 }

2152 static int
2153 spdk_bdev_part_channel_create_cb(void *io_device, void *ctx_buf)
2154 {
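	/* The io_device registered for a part is &part->base; recover the part from it. */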
2155 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2156 	struct spdk_bdev_part_channel *ch = ctx_buf;
2157 
2158 	ch->part = part;
2159 	ch->base_ch = spdk_bdev_get_io_channel(part->base->desc);
2160 	if (ch->base_ch == NULL) {
2161 		return -1;
2162 	}
2163 
2164 	if (part->base->ch_create_cb) {
2165 		return part->base->ch_create_cb(io_device, ctx_buf);
2166 	} else {
2167 		return 0;
2168 	}
2169 }
2170 
2171 static void
2172 spdk_bdev_part_channel_destroy_cb(void *io_device, void *ctx_buf)
2173 {
2174 	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
2175 	struct spdk_bdev_part_channel *ch = ctx_buf;
2176 
2177 	if (part->base->ch_destroy_cb) {
2178 		part->base->ch_destroy_cb(io_device, ctx_buf);
2179 	}
2180 	spdk_put_io_channel(ch->base_ch);
2181 }
2182 
2183 int
2184 spdk_bdev_part_base_construct(struct spdk_bdev_part_base *base, struct spdk_bdev *bdev,
2185 			      spdk_bdev_remove_cb_t remove_cb, struct spdk_bdev_module_if *module,
2186 			      struct spdk_bdev_fn_table *fn_table, struct bdev_part_tailq *tailq,
2187 			      spdk_bdev_part_base_free_fn free_fn,
2188 			      uint32_t channel_size, spdk_io_channel_create_cb ch_create_cb,
2189 			      spdk_io_channel_destroy_cb ch_destroy_cb)
2190 {
2191 	int rc;
2192 
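	/*
	 * Parts share the base's fn_table, so route channel creation and
	 *  io_type_supported queries through the part helpers.
	 */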
2193 	fn_table->get_io_channel = spdk_bdev_part_get_io_channel;
2194 	fn_table->io_type_supported = spdk_bdev_part_io_type_supported;
2195 
2196 	base->bdev = bdev;
2197 	base->desc = NULL;
2198 	base->ref = 0;
2199 	base->module = module;
2200 	base->fn_table = fn_table;
2201 	base->tailq = tailq;
2202 	base->claimed = false;
2203 	base->channel_size = channel_size;
2204 	base->ch_create_cb = ch_create_cb;
2205 	base->ch_destroy_cb = ch_destroy_cb;
2206 	base->base_free_fn = free_fn;
2207 
2208 	rc = spdk_bdev_open(bdev, false, remove_cb, bdev, &base->desc);
2209 	if (rc) {
2210 		spdk_bdev_part_base_free(base);
2211 		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
2212 		return -1;
2213 	}
2214 
2215 	return 0;
2216 }
2217 
2218 int
2219 spdk_bdev_part_construct(struct spdk_bdev_part *part, struct spdk_bdev_part_base *base,
2220 			 char *name, uint64_t offset_blocks, uint64_t num_blocks,
2221 			 char *product_name)
2222 {
2223 	part->bdev.name = name;
2224 	part->bdev.blocklen = base->bdev->blocklen;
2225 	part->bdev.blockcnt = num_blocks;
2226 	part->offset_blocks = offset_blocks;
2227 
2228 	part->bdev.write_cache = base->bdev->write_cache;
2229 	part->bdev.need_aligned_buffer = base->bdev->need_aligned_buffer;
2230 	part->bdev.product_name = product_name;
2231 	part->bdev.ctxt = part;
2232 	part->bdev.module = base->module;
2233 	part->bdev.fn_table = base->fn_table;
2234 
2235 	__sync_fetch_and_add(&base->ref, 1);
2236 	part->base = base;
2237 
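	/* Only the first part constructed on a base claims the base bdev for the module. */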
2238 	if (!base->claimed) {
2239 		int rc;
2240 
2241 		rc = spdk_bdev_module_claim_bdev(base->bdev, base->desc, base->module);
2242 		if (rc) {
2243 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(base->bdev));
2244 			free(part->bdev.name);
2245 			return -1;
2246 		}
2247 		base->claimed = true;
2248 	}
2249 
2250 	spdk_io_device_register(&part->base, spdk_bdev_part_channel_create_cb,
2251 				spdk_bdev_part_channel_destroy_cb,
2252 				base->channel_size);
2253 	spdk_vbdev_register(&part->bdev, &base->bdev, 1);
2254 	TAILQ_INSERT_TAIL(base->tailq, part, tailq);
2255 
2256 	return 0;
2257 }
2258 
2259 SPDK_LOG_REGISTER_TRACE_FLAG("bdev", SPDK_TRACE_BDEV)
2260