xref: /spdk/lib/bdev/bdev.c (revision 310f324e3846950532387a92345022b3e3cc97a3)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/io_channel.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/bdev.h"
49 #include "spdk_internal/log.h"
50 #include "spdk/string.h"
51 
52 #ifdef SPDK_CONFIG_VTUNE
53 #include "ittnotify.h"
54 #include "ittnotify_types.h"
55 int __itt_init_ittlib(const char *, __itt_group_id);
56 #endif
57 
58 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
59 #define SPDK_BDEV_IO_CACHE_SIZE			256
60 #define BUF_SMALL_POOL_SIZE			8192
61 #define BUF_LARGE_POOL_SIZE			1024
62 #define NOMEM_THRESHOLD_COUNT			8
63 #define ZERO_BUFFER_SIZE			0x100000
64 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
65 #define SPDK_BDEV_SEC_TO_USEC			1000000ULL
66 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
67 
68 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
69 typedef STAILQ_HEAD(, spdk_bdev_io) bdev_io_stailq_t;
70 
71 struct spdk_bdev_mgr {
72 	struct spdk_mempool *bdev_io_pool;
73 
74 	struct spdk_mempool *buf_small_pool;
75 	struct spdk_mempool *buf_large_pool;
76 
77 	void *zero_buffer;
78 
79 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
80 
81 	TAILQ_HEAD(, spdk_bdev) bdevs;
82 
83 	bool init_complete;
84 	bool module_init_complete;
85 
86 #ifdef SPDK_CONFIG_VTUNE
87 	__itt_domain	*domain;
88 #endif
89 };
90 
91 static struct spdk_bdev_mgr g_bdev_mgr = {
92 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
93 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
94 	.init_complete = false,
95 	.module_init_complete = false,
96 };
97 
98 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
99 static void			*g_init_cb_arg = NULL;
100 
101 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
102 static void			*g_fini_cb_arg = NULL;
103 static struct spdk_thread	*g_fini_thread = NULL;
104 
105 
106 struct spdk_bdev_mgmt_channel {
107 	bdev_io_stailq_t need_buf_small;
108 	bdev_io_stailq_t need_buf_large;
109 
110 	/*
111 	 * Each thread keeps a cache of bdev_io - this allows
112 	 *  bdev threads which are *not* DPDK threads to still
113 	 *  benefit from a per-thread bdev_io cache.  Without
114 	 *  this, non-DPDK threads fetching from the mempool
115 	 *  incur a cmpxchg on get and put.
116 	 */
117 	bdev_io_stailq_t per_thread_cache;
118 	uint32_t	per_thread_cache_count;
119 
120 	TAILQ_HEAD(, spdk_bdev_module_channel) module_channels;
121 };
122 
123 struct spdk_bdev_desc {
124 	struct spdk_bdev		*bdev;
125 	spdk_bdev_remove_cb_t		remove_cb;
126 	void				*remove_ctx;
127 	bool				write;
128 	TAILQ_ENTRY(spdk_bdev_desc)	link;
129 };
130 
131 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
132 #define BDEV_CH_QOS_ENABLED		(1 << 1)
133 
134 struct spdk_bdev_channel {
135 	struct spdk_bdev	*bdev;
136 
137 	/* The channel for the underlying device */
138 	struct spdk_io_channel	*channel;
139 
140 	/* Channel for the bdev manager */
141 	struct spdk_io_channel	*mgmt_channel;
142 
143 	struct spdk_bdev_io_stat stat;
144 
145 	/*
146 	 * Count of I/O submitted through this channel and waiting for completion.
147 	 * Incremented before submit_request() is called on an spdk_bdev_io.
148 	 */
149 	uint64_t		io_outstanding;
150 
151 	bdev_io_tailq_t		queued_resets;
152 
153 	uint32_t		flags;
154 
155 	/*
156 	 * Rate limiting on this channel.
157 	 * Queue of IO awaiting issue because QoS rate limiting is in effect
158 	 *  on this channel.
159 	 */
160 	bdev_io_tailq_t		qos_io;
161 
162 	/*
163 	 * Rate limiting on this channel.
164 	 * Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and
165 	 *  only valid for the master channel which manages the outstanding IOs.
166 	 */
167 	uint64_t		qos_max_ios_per_timeslice;
168 
169 	/*
170 	 * Rate limiting on this channel.
171 	 * Submitted IO in one timeslice (e.g., 1ms)
172 	 */
173 	uint64_t		io_submitted_this_timeslice;
174 
175 	/*
176 	 * Rate limiting on this channel.
177 	 * QoS poller that runs periodically, once per timeslice (one millisecond).
178 	 */
179 	struct spdk_poller	*qos_poller;
180 
181 	/* Per-device channel */
182 	struct spdk_bdev_module_channel *module_ch;
183 
184 #ifdef SPDK_CONFIG_VTUNE
185 	uint64_t		start_tsc;
186 	uint64_t		interval_tsc;
187 	__itt_string_handle	*handle;
188 #endif
189 
190 };
191 
192 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
193 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
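/*
 * Editorial note (not in the original source): these macros register the bdev as an
 * io_device using an address offset by one byte from the spdk_bdev pointer itself.
 * This keeps the bdev's io_device handle distinct from any other io_device that might
 * be registered at the raw bdev pointer, while remaining trivially convertible back.
 */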
194 
195 /*
196  * Per-module (or per-io_device) channel. Multiple bdevs built on the same io_device
197  * will queue their IO awaiting retry here. This makes it possible to retry sending
198  * IO to one bdev after IO from another bdev completes.
199  */
200 struct spdk_bdev_module_channel {
201 	/*
202 	 * Count of I/O submitted to bdev module and waiting for completion.
203 	 * Incremented before submit_request() is called on an spdk_bdev_io.
204 	 */
205 	uint64_t		io_outstanding;
206 
207 	/*
208 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
209 	 *  on this channel.
210 	 */
211 	bdev_io_tailq_t		nomem_io;
212 
213 	/*
214 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
215 	 */
216 	uint64_t		nomem_threshold;
217 
218 	/* I/O channel allocated by a bdev module */
219 	struct spdk_io_channel	*module_ch;
220 
221 	uint32_t		ref;
222 
223 	TAILQ_ENTRY(spdk_bdev_module_channel) link;
224 };
225 
226 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
227 
228 struct spdk_bdev *
229 spdk_bdev_first(void)
230 {
231 	struct spdk_bdev *bdev;
232 
233 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
234 	if (bdev) {
235 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
236 	}
237 
238 	return bdev;
239 }
240 
241 struct spdk_bdev *
242 spdk_bdev_next(struct spdk_bdev *prev)
243 {
244 	struct spdk_bdev *bdev;
245 
246 	bdev = TAILQ_NEXT(prev, link);
247 	if (bdev) {
248 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
249 	}
250 
251 	return bdev;
252 }
253 
254 static struct spdk_bdev *
255 _bdev_next_leaf(struct spdk_bdev *bdev)
256 {
257 	while (bdev != NULL) {
258 		if (bdev->claim_module == NULL) {
259 			return bdev;
260 		} else {
261 			bdev = TAILQ_NEXT(bdev, link);
262 		}
263 	}
264 
265 	return bdev;
266 }
267 
268 struct spdk_bdev *
269 spdk_bdev_first_leaf(void)
270 {
271 	struct spdk_bdev *bdev;
272 
273 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
274 
275 	if (bdev) {
276 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
277 	}
278 
279 	return bdev;
280 }
281 
282 struct spdk_bdev *
283 spdk_bdev_next_leaf(struct spdk_bdev *prev)
284 {
285 	struct spdk_bdev *bdev;
286 
287 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
288 
289 	if (bdev) {
290 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
291 	}
292 
293 	return bdev;
294 }
295 
296 struct spdk_bdev *
297 spdk_bdev_get_by_name(const char *bdev_name)
298 {
299 	struct spdk_bdev_alias *tmp;
300 	struct spdk_bdev *bdev = spdk_bdev_first();
301 
302 	while (bdev != NULL) {
303 		if (strcmp(bdev_name, bdev->name) == 0) {
304 			return bdev;
305 		}
306 
307 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
308 			if (strcmp(bdev_name, tmp->alias) == 0) {
309 				return bdev;
310 			}
311 		}
312 
313 		bdev = spdk_bdev_next(bdev);
314 	}
315 
316 	return NULL;
317 }
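/*
 * Illustrative usage (editorial; the name "Malloc0" is hypothetical): callers typically
 * look a bdev up by name or alias before opening it, e.g.
 *
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Malloc0");
 *	if (bdev == NULL) {
 *		// not registered (yet)
 *	}
 */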
318 
319 static void
320 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
321 {
322 	assert(bdev_io->get_buf_cb != NULL);
323 	assert(buf != NULL);
324 	assert(bdev_io->u.bdev.iovs != NULL);
325 
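	/*
	 * Editorial note: the buffer pools are created with an extra 512 bytes per element
	 * (see spdk_bdev_initialize() below), so rounding the pointer up to the next
	 * 512-byte boundary always stays inside the element and yields an aligned data
	 * buffer of at least buf_len bytes.
	 */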
326 	bdev_io->buf = buf;
327 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
328 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
329 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
330 }
331 
332 static void
333 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
334 {
335 	struct spdk_mempool *pool;
336 	struct spdk_bdev_io *tmp;
337 	void *buf;
338 	bdev_io_stailq_t *stailq;
339 	struct spdk_bdev_mgmt_channel *ch;
340 
341 	assert(bdev_io->u.bdev.iovcnt == 1);
342 
343 	buf = bdev_io->buf;
344 	ch = bdev_io->mgmt_ch;
345 
346 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
347 		pool = g_bdev_mgr.buf_small_pool;
348 		stailq = &ch->need_buf_small;
349 	} else {
350 		pool = g_bdev_mgr.buf_large_pool;
351 		stailq = &ch->need_buf_large;
352 	}
353 
354 	if (STAILQ_EMPTY(stailq)) {
355 		spdk_mempool_put(pool, buf);
356 	} else {
357 		tmp = STAILQ_FIRST(stailq);
358 		STAILQ_REMOVE_HEAD(stailq, buf_link);
359 		spdk_bdev_io_set_buf(tmp, buf);
360 	}
361 }
362 
363 void
364 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
365 {
366 	struct spdk_mempool *pool;
367 	bdev_io_stailq_t *stailq;
368 	void *buf = NULL;
369 	struct spdk_bdev_mgmt_channel *ch;
370 
371 	assert(cb != NULL);
372 	assert(bdev_io->u.bdev.iovs != NULL);
373 
374 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
375 		/* Buffer already present */
376 		cb(bdev_io->ch->channel, bdev_io);
377 		return;
378 	}
379 
380 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
381 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
382 
383 	bdev_io->buf_len = len;
384 	bdev_io->get_buf_cb = cb;
385 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
386 		pool = g_bdev_mgr.buf_small_pool;
387 		stailq = &ch->need_buf_small;
388 	} else {
389 		pool = g_bdev_mgr.buf_large_pool;
390 		stailq = &ch->need_buf_large;
391 	}
392 
393 	buf = spdk_mempool_get(pool);
394 
395 	if (!buf) {
396 		STAILQ_INSERT_TAIL(stailq, bdev_io, buf_link);
397 	} else {
398 		spdk_bdev_io_set_buf(bdev_io, buf);
399 	}
400 }
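/*
 * Illustrative sketch (not part of the original file; my_get_buf_cb is a hypothetical
 * name): the generic read path typically defers buffer allocation until the bdev module
 * actually needs one, e.g.
 *
 *	static void
 *	my_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		// bdev_io->u.bdev.iovs[0] now points at a pool buffer of the requested length
 *	}
 *
 *	spdk_bdev_io_get_buf(bdev_io, my_get_buf_cb,
 *			     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 *
 * If no buffer is available, the bdev_io is parked on the need_buf_small/large queue and
 * the callback fires later from spdk_bdev_io_put_buf() when another I/O releases a buffer.
 */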
401 
402 static int
403 spdk_bdev_module_get_max_ctx_size(void)
404 {
405 	struct spdk_bdev_module *bdev_module;
406 	int max_bdev_module_size = 0;
407 
408 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
409 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
410 			max_bdev_module_size = bdev_module->get_ctx_size();
411 		}
412 	}
413 
414 	return max_bdev_module_size;
415 }
416 
417 void
418 spdk_bdev_config_text(FILE *fp)
419 {
420 	struct spdk_bdev_module *bdev_module;
421 
422 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
423 		if (bdev_module->config_text) {
424 			bdev_module->config_text(fp);
425 		}
426 	}
427 }
428 
429 void
430 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
431 {
432 	struct spdk_bdev_module *bdev_module;
433 	struct spdk_bdev *bdev;
434 
435 	assert(w != NULL);
436 
437 	spdk_json_write_array_begin(w);
438 
439 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
440 		if (bdev_module->config_json) {
441 			bdev_module->config_json(w);
442 		}
443 	}
444 
445 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, link) {
446 		spdk_bdev_config_json(bdev, w);
447 	}
448 
449 	spdk_json_write_array_end(w);
450 }
451 
452 static int
453 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
454 {
455 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
456 
457 	STAILQ_INIT(&ch->need_buf_small);
458 	STAILQ_INIT(&ch->need_buf_large);
459 
460 	STAILQ_INIT(&ch->per_thread_cache);
461 	ch->per_thread_cache_count = 0;
462 
463 	TAILQ_INIT(&ch->module_channels);
464 
465 	return 0;
466 }
467 
468 static void
469 spdk_bdev_mgmt_channel_free_resources(struct spdk_bdev_mgmt_channel *ch)
470 {
471 	struct spdk_bdev_io *bdev_io;
472 
473 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
474 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n");
475 	}
476 
477 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
478 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
479 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
480 		ch->per_thread_cache_count--;
481 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
482 	}
483 
484 	assert(ch->per_thread_cache_count == 0);
485 }
486 
487 static void
488 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
489 {
490 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
491 
492 	spdk_bdev_mgmt_channel_free_resources(ch);
493 }
494 
495 static void
496 spdk_bdev_init_complete(int rc)
497 {
498 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
499 	void *cb_arg = g_init_cb_arg;
500 
501 	g_bdev_mgr.init_complete = true;
502 	g_init_cb_fn = NULL;
503 	g_init_cb_arg = NULL;
504 
505 	cb_fn(cb_arg, rc);
506 }
507 
508 static void
509 spdk_bdev_module_action_complete(void)
510 {
511 	struct spdk_bdev_module *m;
512 
513 	/*
514 	 * Don't finish bdev subsystem initialization if
515 	 * module pre-initialization is still in progress, or
516 	 * the subsystem has already been initialized.
517 	 */
518 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
519 		return;
520 	}
521 
522 	/*
523 	 * Check all bdev modules for inits/examinations in progress. If any
524 	 * exist, return immediately since we cannot finish bdev subsystem
525 	 * initialization until all are completed.
526 	 */
527 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
528 		if (m->action_in_progress > 0) {
529 			return;
530 		}
531 	}
532 
533 	/*
534 	 * Modules already finished initialization - now that all
535 	 * the bdev modules have finished their asynchronous I/O
536 	 * processing, the entire bdev layer can be marked as complete.
537 	 */
538 	spdk_bdev_init_complete(0);
539 }
540 
541 static void
542 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
543 {
544 	assert(module->action_in_progress > 0);
545 	module->action_in_progress--;
546 	spdk_bdev_module_action_complete();
547 }
548 
549 void
550 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
551 {
552 	spdk_bdev_module_action_done(module);
553 }
554 
555 void
556 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
557 {
558 	spdk_bdev_module_action_done(module);
559 }
560 
561 static int
562 spdk_bdev_modules_init(void)
563 {
564 	struct spdk_bdev_module *module;
565 	int rc = 0;
566 
567 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
568 		rc = module->module_init();
569 		if (rc != 0) {
570 			break;
571 		}
572 	}
573 
574 	g_bdev_mgr.module_init_complete = true;
575 	return rc;
576 }
577 void
578 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
579 {
580 	int cache_size;
581 	int rc = 0;
582 	char mempool_name[32];
583 
584 	assert(cb_fn != NULL);
585 
586 	g_init_cb_fn = cb_fn;
587 	g_init_cb_arg = cb_arg;
588 
589 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
590 
591 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
592 				  SPDK_BDEV_IO_POOL_SIZE,
593 				  sizeof(struct spdk_bdev_io) +
594 				  spdk_bdev_module_get_max_ctx_size(),
595 				  0,
596 				  SPDK_ENV_SOCKET_ID_ANY);
597 
598 	if (g_bdev_mgr.bdev_io_pool == NULL) {
599 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
600 		spdk_bdev_init_complete(-1);
601 		return;
602 	}
603 
604 	/**
605 	 * Ensure no more than half of the total buffers end up in local caches, by
606 	 *   using spdk_env_get_core_count() to determine how many local caches we need
607 	 *   to account for.
608 	 */
609 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
610 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
611 
612 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
613 				    BUF_SMALL_POOL_SIZE,
614 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
615 				    cache_size,
616 				    SPDK_ENV_SOCKET_ID_ANY);
617 	if (!g_bdev_mgr.buf_small_pool) {
618 		SPDK_ERRLOG("could not allocate small buffer pool\n");
619 		spdk_bdev_init_complete(-1);
620 		return;
621 	}
622 
623 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
624 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
625 
626 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
627 				    BUF_LARGE_POOL_SIZE,
628 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
629 				    cache_size,
630 				    SPDK_ENV_SOCKET_ID_ANY);
631 	if (!g_bdev_mgr.buf_large_pool) {
632 		SPDK_ERRLOG("could not allocate large buffer pool\n");
633 		spdk_bdev_init_complete(-1);
634 		return;
635 	}
636 
637 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
638 				 NULL);
639 	if (!g_bdev_mgr.zero_buffer) {
640 		SPDK_ERRLOG("create bdev zero buffer failed\n");
641 		spdk_bdev_init_complete(-1);
642 		return;
643 	}
644 
645 #ifdef SPDK_CONFIG_VTUNE
646 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
647 #endif
648 
649 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
650 				spdk_bdev_mgmt_channel_destroy,
651 				sizeof(struct spdk_bdev_mgmt_channel));
652 
653 	rc = spdk_bdev_modules_init();
654 	if (rc != 0) {
655 		SPDK_ERRLOG("bdev modules init failed\n");
656 		spdk_bdev_init_complete(-1);
657 		return;
658 	}
659 
660 	spdk_bdev_module_action_complete();
661 }
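/*
 * Illustrative sketch (editorial; bdev_init_done is a hypothetical callback): the bdev
 * layer is brought up once per application, usually by the event framework's bdev
 * subsystem rather than by calling this directly:
 *
 *	static void
 *	bdev_init_done(void *cb_arg, int rc)
 *	{
 *		// rc != 0 means module init or pool allocation failed
 *	}
 *
 *	spdk_bdev_initialize(bdev_init_done, NULL);
 *
 * The callback may fire asynchronously, after all modules finish their init/examine work.
 */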
662 
663 static void
664 spdk_bdev_module_finish_cb(void *io_device)
665 {
666 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
667 
668 	cb_fn(g_fini_cb_arg);
669 	g_fini_cb_fn = NULL;
670 	g_fini_cb_arg = NULL;
671 }
672 
673 static void
674 spdk_bdev_module_finish_complete(struct spdk_io_channel_iter *i, int status)
675 {
676 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
677 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
678 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
679 			    SPDK_BDEV_IO_POOL_SIZE);
680 	}
681 
682 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
683 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
684 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
685 			    BUF_SMALL_POOL_SIZE);
686 		assert(false);
687 	}
688 
689 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
690 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
691 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
692 			    BUF_LARGE_POOL_SIZE);
693 		assert(false);
694 	}
695 
696 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
697 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
698 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
699 	spdk_dma_free(g_bdev_mgr.zero_buffer);
700 
701 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb);
702 }
703 
704 static void
705 mgmt_channel_free_resources(struct spdk_io_channel_iter *i)
706 {
707 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
708 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
709 
710 	spdk_bdev_mgmt_channel_free_resources(ch);
711 	spdk_for_each_channel_continue(i, 0);
712 }
713 
714 static void
715 spdk_bdev_module_finish_iter(void *arg)
716 {
717 	/* Notice that this variable is static. It is saved between calls to
718 	 * this function. */
719 	static struct spdk_bdev_module *resume_bdev_module = NULL;
720 	struct spdk_bdev_module *bdev_module;
721 
722 	/* Start iterating from the last touched module */
723 	if (!resume_bdev_module) {
724 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
725 	} else {
726 		bdev_module = TAILQ_NEXT(resume_bdev_module, tailq);
727 	}
728 
729 	while (bdev_module) {
730 		if (bdev_module->async_fini) {
731 			/* Save our place so we can resume later. We must
732 			 * save the variable here, before calling module_fini()
733 			 * below, because in some cases the module may immediately
734 			 * call spdk_bdev_module_finish_done() and re-enter
735 			 * this function to continue iterating. */
736 			resume_bdev_module = bdev_module;
737 		}
738 
739 		if (bdev_module->module_fini) {
740 			bdev_module->module_fini();
741 		}
742 
743 		if (bdev_module->async_fini) {
744 			return;
745 		}
746 
747 		bdev_module = TAILQ_NEXT(bdev_module, tailq);
748 	}
749 
750 	resume_bdev_module = NULL;
751 	spdk_for_each_channel(&g_bdev_mgr, mgmt_channel_free_resources, NULL,
752 			      spdk_bdev_module_finish_complete);
753 }
754 
755 void
756 spdk_bdev_module_finish_done(void)
757 {
758 	if (spdk_get_thread() != g_fini_thread) {
759 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
760 	} else {
761 		spdk_bdev_module_finish_iter(NULL);
762 	}
763 }
764 
765 static void
766 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
767 {
768 	struct spdk_bdev *bdev = cb_arg;
769 
770 	if (bdeverrno && bdev) {
771 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
772 			     bdev->name);
773 
774 		/*
775 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
776 		 *  bdev; try to recover by manually removing this bdev from the list and continuing
777 		 *  with the next bdev in the list.
778 		 */
779 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
780 	}
781 
782 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
783 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
784 		/*
785 		 * Bdev module finish needs to be deferred as we might be in the middle of some context
786 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
787 		 * after returning.
788 		 */
789 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
790 		return;
791 	}
792 
793 	/*
794 	 * Unregister the first bdev in the list.
795 	 *
796 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
797 	 *  calling the remove_cb of the descriptors first.
798 	 *
799 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
800 	 *  will be called again via the unregister completion callback to continue the cleanup
801 	 *  process with the next bdev.
802 	 */
803 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
804 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
805 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
806 }
807 
808 static void
809 _spdk_bdev_finish_unregister_bdevs(void)
810 {
811 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
812 }
813 
814 void
815 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
816 {
817 	assert(cb_fn != NULL);
818 
819 	g_fini_thread = spdk_get_thread();
820 
821 	g_fini_cb_fn = cb_fn;
822 	g_fini_cb_arg = cb_arg;
823 
824 	_spdk_bdev_finish_unregister_bdevs();
825 }
826 
827 static struct spdk_bdev_io *
828 spdk_bdev_get_io(struct spdk_io_channel *_ch)
829 {
830 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
831 	struct spdk_bdev_io *bdev_io;
832 
833 	if (ch->per_thread_cache_count > 0) {
834 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
835 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
836 		ch->per_thread_cache_count--;
837 	} else {
838 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
839 		if (!bdev_io) {
840 			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
841 			return NULL;
842 		}
843 	}
844 
845 	bdev_io->mgmt_ch = ch;
846 
847 	return bdev_io;
848 }
849 
850 static void
851 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
852 {
853 	struct spdk_bdev_mgmt_channel *ch = bdev_io->mgmt_ch;
854 
855 	if (bdev_io->buf != NULL) {
856 		spdk_bdev_io_put_buf(bdev_io);
857 	}
858 
859 	if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) {
860 		ch->per_thread_cache_count++;
861 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link);
862 	} else {
863 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
864 	}
865 }
866 
867 static void
868 _spdk_bdev_qos_io_submit(void *ctx)
869 {
870 	struct spdk_bdev_channel	*ch = ctx;
871 	struct spdk_bdev_io		*bdev_io = NULL;
872 	struct spdk_bdev		*bdev = ch->bdev;
873 	struct spdk_bdev_module_channel *shared_ch = ch->module_ch;
874 
875 	while (!TAILQ_EMPTY(&ch->qos_io)) {
876 		if (ch->io_submitted_this_timeslice < ch->qos_max_ios_per_timeslice) {
877 			bdev_io = TAILQ_FIRST(&ch->qos_io);
878 			TAILQ_REMOVE(&ch->qos_io, bdev_io, link);
879 			ch->io_submitted_this_timeslice++;
880 			ch->io_outstanding++;
881 			shared_ch->io_outstanding++;
882 			bdev->fn_table->submit_request(ch->channel, bdev_io);
883 		} else {
884 			break;
885 		}
886 	}
887 }
888 
889 static void
890 _spdk_bdev_io_submit(void *ctx)
891 {
892 	struct spdk_bdev_io *bdev_io = ctx;
893 	struct spdk_bdev *bdev = bdev_io->bdev;
894 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
895 	struct spdk_io_channel *ch = bdev_ch->channel;
896 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
897 
898 	bdev_io->submit_tsc = spdk_get_ticks();
899 	bdev_ch->io_outstanding++;
900 	shared_ch->io_outstanding++;
901 	bdev_io->in_submit_request = true;
902 	if (spdk_likely(bdev_ch->flags == 0)) {
903 		if (spdk_likely(TAILQ_EMPTY(&shared_ch->nomem_io))) {
904 			bdev->fn_table->submit_request(ch, bdev_io);
905 		} else {
906 			bdev_ch->io_outstanding--;
907 			shared_ch->io_outstanding--;
908 			TAILQ_INSERT_TAIL(&shared_ch->nomem_io, bdev_io, link);
909 		}
910 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
911 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
912 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
913 		bdev_ch->io_outstanding--;
914 		shared_ch->io_outstanding--;
915 		TAILQ_INSERT_TAIL(&bdev_ch->qos_io, bdev_io, link);
916 		_spdk_bdev_qos_io_submit(bdev_ch);
917 	} else {
918 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
919 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
920 	}
921 	bdev_io->in_submit_request = false;
922 }
923 
924 static void
925 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
926 {
927 	struct spdk_bdev *bdev = bdev_io->bdev;
928 
929 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
930 
931 	/* QoS channel and thread have been properly configured */
932 	if (bdev->ios_per_sec > 0 && bdev->qos_channel && bdev->qos_thread) {
933 		bdev_io->io_submit_ch = bdev_io->ch;
934 		bdev_io->ch = bdev->qos_channel;
935 		spdk_thread_send_msg(bdev->qos_thread, _spdk_bdev_io_submit, bdev_io);
936 	} else {
937 		_spdk_bdev_io_submit(bdev_io);
938 	}
939 }
940 
941 static void
942 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
943 {
944 	struct spdk_bdev *bdev = bdev_io->bdev;
945 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
946 	struct spdk_io_channel *ch = bdev_ch->channel;
947 
948 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
949 
950 	bdev_io->in_submit_request = true;
951 	bdev->fn_table->submit_request(ch, bdev_io);
952 	bdev_io->in_submit_request = false;
953 }
954 
955 static void
956 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
957 		  struct spdk_bdev *bdev, void *cb_arg,
958 		  spdk_bdev_io_completion_cb cb)
959 {
960 	bdev_io->bdev = bdev;
961 	bdev_io->caller_ctx = cb_arg;
962 	bdev_io->cb = cb;
963 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
964 	bdev_io->in_submit_request = false;
965 	bdev_io->buf = NULL;
966 	bdev_io->io_submit_ch = NULL;
967 }
968 
969 bool
970 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
971 {
972 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
973 }
974 
975 int
976 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
977 {
978 	if (bdev->fn_table->dump_info_json) {
979 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
980 	}
981 
982 	return 0;
983 }
984 
985 void
986 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
987 {
988 	assert(bdev != NULL);
989 	assert(w != NULL);
990 
991 	if (bdev->fn_table->write_config_json) {
992 		bdev->fn_table->write_config_json(bdev, w);
993 	} else {
994 		spdk_json_write_object_begin(w);
995 		spdk_json_write_named_string(w, "name", bdev->name);
996 		spdk_json_write_object_end(w);
997 	}
998 }
999 
1000 static void
1001 spdk_bdev_qos_get_max_ios_per_timeslice(struct spdk_bdev_channel *qos_ch)
1002 {
1003 	uint64_t		qos_max_ios_per_timeslice = 0;
1004 	struct spdk_bdev	*bdev = qos_ch->bdev;
1005 
1006 	qos_max_ios_per_timeslice = bdev->ios_per_sec * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1007 				    SPDK_BDEV_SEC_TO_USEC;
1008 	qos_ch->qos_max_ios_per_timeslice = spdk_max(qos_max_ios_per_timeslice,
1009 					    SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
1010 }
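/*
 * Worked example (editorial): with bdev->ios_per_sec = 10000 and a 1000 usec timeslice,
 * the budget computed above is 10000 * 1000 / 1000000 = 10 IOs per timeslice.  The
 * spdk_max() keeps the budget at no less than SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE (1),
 * so very low rate limits still make forward progress each timeslice.
 */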
1011 
1012 static int
1013 spdk_bdev_channel_poll_qos(void *arg)
1014 {
1015 	struct spdk_bdev_channel	*ch = arg;
1016 
1017 	/* Reset for next round of rate limiting */
1018 	ch->io_submitted_this_timeslice = 0;
1019 	spdk_bdev_qos_get_max_ios_per_timeslice(ch);
1020 
1021 	_spdk_bdev_qos_io_submit(ch);
1022 
1023 	return -1;
1024 }
1025 
1026 static int
1027 _spdk_bdev_channel_create(struct spdk_bdev_channel *ch, void *io_device)
1028 {
1029 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1030 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1031 	struct spdk_bdev_module_channel	*shared_ch;
1032 
1033 	ch->bdev = bdev;
1034 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1035 	if (!ch->channel) {
1036 		return -1;
1037 	}
1038 
1039 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
1040 	if (!ch->mgmt_channel) {
1041 		return -1;
1042 	}
1043 
1044 	mgmt_ch = spdk_io_channel_get_ctx(ch->mgmt_channel);
1045 	TAILQ_FOREACH(shared_ch, &mgmt_ch->module_channels, link) {
1046 		if (shared_ch->module_ch == ch->channel) {
1047 			shared_ch->ref++;
1048 			break;
1049 		}
1050 	}
1051 
1052 	if (shared_ch == NULL) {
1053 		shared_ch = calloc(1, sizeof(*shared_ch));
1054 		if (!shared_ch) {
1055 			return -1;
1056 		}
1057 
1058 		shared_ch->io_outstanding = 0;
1059 		TAILQ_INIT(&shared_ch->nomem_io);
1060 		shared_ch->nomem_threshold = 0;
1061 		shared_ch->module_ch = ch->channel;
1062 		shared_ch->ref = 1;
1063 		TAILQ_INSERT_TAIL(&mgmt_ch->module_channels, shared_ch, link);
1064 	}
1065 
1066 	memset(&ch->stat, 0, sizeof(ch->stat));
1067 	ch->io_outstanding = 0;
1068 	TAILQ_INIT(&ch->queued_resets);
1069 	TAILQ_INIT(&ch->qos_io);
1070 	ch->qos_max_ios_per_timeslice = 0;
1071 	ch->io_submitted_this_timeslice = 0;
1072 	ch->qos_poller = NULL;
1073 	ch->flags = 0;
1074 	ch->module_ch = shared_ch;
1075 
1076 	return 0;
1077 }
1078 
1079 static void
1080 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1081 {
1082 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1083 	struct spdk_bdev_module_channel	*shared_ch = NULL;
1084 
1085 	if (!ch) {
1086 		return;
1087 	}
1088 
1089 	if (ch->channel) {
1090 		spdk_put_io_channel(ch->channel);
1091 	}
1092 
1093 	if (ch->mgmt_channel) {
1094 		shared_ch = ch->module_ch;
1095 		if (shared_ch) {
1096 			assert(ch->io_outstanding == 0);
1097 			assert(shared_ch->ref > 0);
1098 			shared_ch->ref--;
1099 			if (shared_ch->ref == 0) {
1100 				mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
1101 				assert(shared_ch->io_outstanding == 0);
1102 				TAILQ_REMOVE(&mgmt_channel->module_channels, shared_ch, link);
1103 				free(shared_ch);
1104 			}
1105 		}
1106 		spdk_put_io_channel(ch->mgmt_channel);
1107 	}
1108 }
1109 
1110 /* Caller must hold bdev->mutex. */
1111 static int
1112 spdk_bdev_qos_channel_create(struct spdk_bdev *bdev)
1113 {
1114 	assert(bdev->qos_channel == NULL);
1115 	assert(bdev->qos_thread == NULL);
1116 
1117 	bdev->qos_channel = calloc(1, sizeof(struct spdk_bdev_channel));
1118 	if (!bdev->qos_channel) {
1119 		return -1;
1120 	}
1121 
1122 	bdev->qos_thread = spdk_get_thread();
1123 	if (!bdev->qos_thread) {
1124 		free(bdev->qos_channel);
1125 		bdev->qos_channel = NULL;
1126 		return -1;
1127 	}
1128 
1129 	if (_spdk_bdev_channel_create(bdev->qos_channel, __bdev_to_io_dev(bdev)) != 0) {
1130 		free(bdev->qos_channel);
1131 		bdev->qos_channel = NULL;
1132 		bdev->qos_thread = NULL;
1133 		return -1;
1134 	}
1135 
1136 	bdev->qos_channel->flags |= BDEV_CH_QOS_ENABLED;
1137 	spdk_bdev_qos_get_max_ios_per_timeslice(bdev->qos_channel);
1138 	bdev->qos_channel->qos_poller = spdk_poller_register(
1139 						spdk_bdev_channel_poll_qos,
1140 						bdev->qos_channel,
1141 						SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1142 
1143 	return 0;
1144 }
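/*
 * Design note (editorial): the QoS channel is created lazily by whichever thread creates
 * the first I/O channel for a rate-limited bdev (see spdk_bdev_channel_create() below),
 * and that thread is recorded as bdev->qos_thread.  spdk_bdev_io_submit() later routes
 * rate-limited I/O to this thread with spdk_thread_send_msg().
 */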
1145 
1146 static int
1147 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1148 {
1149 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1150 	struct spdk_bdev_channel	*ch = ctx_buf;
1151 
1152 	if (_spdk_bdev_channel_create(ch, io_device) != 0) {
1153 		_spdk_bdev_channel_destroy_resource(ch);
1154 		return -1;
1155 	}
1156 
1157 #ifdef SPDK_CONFIG_VTUNE
1158 	{
1159 		char *name;
1160 		__itt_init_ittlib(NULL, 0);
1161 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1162 		if (!name) {
1163 			_spdk_bdev_channel_destroy_resource(ch);
1164 			return -1;
1165 		}
1166 		ch->handle = __itt_string_handle_create(name);
1167 		free(name);
1168 		ch->start_tsc = spdk_get_ticks();
1169 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1170 	}
1171 #endif
1172 
1173 	pthread_mutex_lock(&bdev->mutex);
1174 
1175 	/* Rate limiting is enabled on this bdev but its QoS channel has not been created yet */
1176 	if (bdev->ios_per_sec > 0 && bdev->qos_channel == NULL) {
1177 		if (spdk_bdev_qos_channel_create(bdev) != 0) {
1178 			_spdk_bdev_channel_destroy_resource(ch);
1179 			pthread_mutex_unlock(&bdev->mutex);
1180 			return -1;
1181 		}
1182 	}
1183 
1184 	bdev->channel_count++;
1185 
1186 	pthread_mutex_unlock(&bdev->mutex);
1187 
1188 	return 0;
1189 }
1190 
1191 /*
1192  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1193  *  linked using the spdk_bdev_io buf_link STAILQ_ENTRY.
1194  */
1195 static void
1196 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1197 {
1198 	bdev_io_stailq_t tmp;
1199 	struct spdk_bdev_io *bdev_io;
1200 
1201 	STAILQ_INIT(&tmp);
1202 
1203 	while (!STAILQ_EMPTY(queue)) {
1204 		bdev_io = STAILQ_FIRST(queue);
1205 		STAILQ_REMOVE_HEAD(queue, buf_link);
1206 		if (bdev_io->ch == ch) {
1207 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1208 		} else {
1209 			STAILQ_INSERT_TAIL(&tmp, bdev_io, buf_link);
1210 		}
1211 	}
1212 
1213 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1214 }
1215 
1216 /*
1217  * Abort I/O that are queued waiting for submission.  These types of I/O are
1218  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1219  */
1220 static void
1221 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1222 {
1223 	struct spdk_bdev_io *bdev_io, *tmp;
1224 
1225 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
1226 		if (bdev_io->ch == ch) {
1227 			TAILQ_REMOVE(queue, bdev_io, link);
1228 			/*
1229 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1230 			 *  been submitted to the bdev module.  Since in this case it
1231 			 *  hadn't, bump io_outstanding to account for the decrement
1232 			 *  that spdk_bdev_io_complete() will do.
1233 			 */
1234 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1235 				ch->io_outstanding++;
1236 				ch->module_ch->io_outstanding++;
1237 			}
1238 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1239 		}
1240 	}
1241 }
1242 
1243 static void
1244 _spdk_bdev_channel_destroy(struct spdk_bdev_channel *ch)
1245 {
1246 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1247 	struct spdk_bdev_module_channel	*shared_ch = ch->module_ch;
1248 
1249 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
1250 
1251 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1252 	_spdk_bdev_abort_queued_io(&ch->qos_io, ch);
1253 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, ch);
1254 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
1255 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
1256 
1257 	_spdk_bdev_channel_destroy_resource(ch);
1258 }
1259 
1260 static void
1261 spdk_bdev_qos_channel_destroy(void *ctx)
1262 {
1263 	struct spdk_bdev_channel *qos_channel = ctx;
1264 
1265 	_spdk_bdev_channel_destroy(qos_channel);
1266 
1267 	spdk_poller_unregister(&qos_channel->qos_poller);
1268 	free(qos_channel);
1269 }
1270 
1271 static void
1272 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1273 {
1274 	struct spdk_bdev_channel	*ch = ctx_buf;
1275 	struct spdk_bdev		*bdev = ch->bdev;
1276 
1277 	_spdk_bdev_channel_destroy(ch);
1278 
1279 	pthread_mutex_lock(&bdev->mutex);
1280 	bdev->channel_count--;
1281 	if (bdev->channel_count == 0 && bdev->qos_channel != NULL) {
1282 		/* All I/O channels for this bdev have been destroyed - destroy the QoS channel. */
1283 		spdk_thread_send_msg(bdev->qos_thread, spdk_bdev_qos_channel_destroy,
1284 				     bdev->qos_channel);
1285 
1286 		/*
1287 		 * Set qos_channel to NULL within the critical section so that
1288 		 * if another channel is created, it will see qos_channel == NULL and
1289 		 * re-create the QoS channel even if the asynchronous qos_channel_destroy
1290 		 * isn't finished yet.
1291 		 */
1292 		bdev->qos_channel = NULL;
1293 		bdev->qos_thread = NULL;
1294 	}
1295 	pthread_mutex_unlock(&bdev->mutex);
1296 }
1297 
1298 int
1299 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1300 {
1301 	struct spdk_bdev_alias *tmp;
1302 
1303 	if (alias == NULL) {
1304 		SPDK_ERRLOG("Empty alias passed\n");
1305 		return -EINVAL;
1306 	}
1307 
1308 	if (spdk_bdev_get_by_name(alias)) {
1309 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1310 		return -EEXIST;
1311 	}
1312 
1313 	tmp = calloc(1, sizeof(*tmp));
1314 	if (tmp == NULL) {
1315 		SPDK_ERRLOG("Unable to allocate alias\n");
1316 		return -ENOMEM;
1317 	}
1318 
1319 	tmp->alias = strdup(alias);
1320 	if (tmp->alias == NULL) {
1321 		free(tmp);
1322 		SPDK_ERRLOG("Unable to allocate alias\n");
1323 		return -ENOMEM;
1324 	}
1325 
1326 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1327 
1328 	return 0;
1329 }
1330 
1331 int
1332 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1333 {
1334 	struct spdk_bdev_alias *tmp;
1335 
1336 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1337 		if (strcmp(alias, tmp->alias) == 0) {
1338 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1339 			free(tmp->alias);
1340 			free(tmp);
1341 			return 0;
1342 		}
1343 	}
1344 
1345 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1346 
1347 	return -ENOENT;
1348 }
1349 
1350 struct spdk_io_channel *
1351 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1352 {
1353 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1354 }
1355 
1356 const char *
1357 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1358 {
1359 	return bdev->name;
1360 }
1361 
1362 const char *
1363 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1364 {
1365 	return bdev->product_name;
1366 }
1367 
1368 const struct spdk_bdev_aliases_list *
1369 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1370 {
1371 	return &bdev->aliases;
1372 }
1373 
1374 uint32_t
1375 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1376 {
1377 	return bdev->blocklen;
1378 }
1379 
1380 uint64_t
1381 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1382 {
1383 	return bdev->blockcnt;
1384 }
1385 
1386 size_t
1387 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1388 {
1389 	/* TODO: push this logic down to the bdev modules */
1390 	if (bdev->need_aligned_buffer) {
1391 		return bdev->blocklen;
1392 	}
1393 
1394 	return 1;
1395 }
1396 
1397 uint32_t
1398 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1399 {
1400 	return bdev->optimal_io_boundary;
1401 }
1402 
1403 bool
1404 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1405 {
1406 	return bdev->write_cache;
1407 }
1408 
1409 const struct spdk_uuid *
1410 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1411 {
1412 	return &bdev->uuid;
1413 }
1414 
1415 int
1416 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1417 {
1418 	int ret;
1419 
1420 	pthread_mutex_lock(&bdev->mutex);
1421 
1422 	/* bdev has open descriptors */
1423 	if (!TAILQ_EMPTY(&bdev->open_descs) &&
1424 	    bdev->blockcnt > size) {
1425 		ret = -EBUSY;
1426 	} else {
1427 		bdev->blockcnt = size;
1428 		ret = 0;
1429 	}
1430 
1431 	pthread_mutex_unlock(&bdev->mutex);
1432 
1433 	return ret;
1434 }
1435 
1436 /*
1437  * Convert I/O offset and length from bytes to blocks.
1438  *
1439  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1440  */
1441 static uint64_t
1442 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1443 			  uint64_t num_bytes, uint64_t *num_blocks)
1444 {
1445 	uint32_t block_size = bdev->blocklen;
1446 
1447 	*offset_blocks = offset_bytes / block_size;
1448 	*num_blocks = num_bytes / block_size;
1449 
1450 	return (offset_bytes % block_size) | (num_bytes % block_size);
1451 }
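/*
 * Worked example (editorial): with a 512-byte block size, offset_bytes = 4096 and
 * num_bytes = 8192 give *offset_blocks = 8, *num_blocks = 16 and a return value of 0.
 * If either byte value is not a multiple of the block size, the OR of the two
 * remainders is non-zero and callers reject the request with -EINVAL.
 */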
1452 
1453 static bool
1454 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1455 {
1456 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
1457 	 * that the sum has overflowed and the offset has wrapped around. */
1458 	if (offset_blocks + num_blocks < offset_blocks) {
1459 		return false;
1460 	}
1461 
1462 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1463 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1464 		return false;
1465 	}
1466 
1467 	return true;
1468 }
1469 
1470 int
1471 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1472 	       void *buf, uint64_t offset, uint64_t nbytes,
1473 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1474 {
1475 	uint64_t offset_blocks, num_blocks;
1476 
1477 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1478 		return -EINVAL;
1479 	}
1480 
1481 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1482 }
1483 
1484 int
1485 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1486 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1487 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1488 {
1489 	struct spdk_bdev *bdev = desc->bdev;
1490 	struct spdk_bdev_io *bdev_io;
1491 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1492 
1493 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1494 		return -EINVAL;
1495 	}
1496 
1497 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1498 	if (!bdev_io) {
1499 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1500 		return -ENOMEM;
1501 	}
1502 
1503 	bdev_io->ch = channel;
1504 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1505 	bdev_io->u.bdev.iov.iov_base = buf;
1506 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1507 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1508 	bdev_io->u.bdev.iovcnt = 1;
1509 	bdev_io->u.bdev.num_blocks = num_blocks;
1510 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1511 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1512 
1513 	spdk_bdev_io_submit(bdev_io);
1514 	return 0;
1515 }
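/*
 * Illustrative usage (editorial; read_done and the sizes are hypothetical): read the
 * first 8 blocks of an open bdev into a pre-allocated buffer that satisfies
 * spdk_bdev_get_buf_align():
 *
 *	static void
 *	read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	rc = spdk_bdev_read_blocks(desc, io_ch, buf, 0, 8, read_done, NULL);
 *	if (rc == -ENOMEM) {
 *		// no spdk_bdev_io available right now; retry later
 *	}
 */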
1516 
1517 int
1518 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1519 		struct iovec *iov, int iovcnt,
1520 		uint64_t offset, uint64_t nbytes,
1521 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1522 {
1523 	uint64_t offset_blocks, num_blocks;
1524 
1525 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1526 		return -EINVAL;
1527 	}
1528 
1529 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1530 }
1531 
1532 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1533 			   struct iovec *iov, int iovcnt,
1534 			   uint64_t offset_blocks, uint64_t num_blocks,
1535 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1536 {
1537 	struct spdk_bdev *bdev = desc->bdev;
1538 	struct spdk_bdev_io *bdev_io;
1539 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1540 
1541 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1542 		return -EINVAL;
1543 	}
1544 
1545 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1546 	if (!bdev_io) {
1547 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
1548 		return -ENOMEM;
1549 	}
1550 
1551 	bdev_io->ch = channel;
1552 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1553 	bdev_io->u.bdev.iovs = iov;
1554 	bdev_io->u.bdev.iovcnt = iovcnt;
1555 	bdev_io->u.bdev.num_blocks = num_blocks;
1556 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1557 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1558 
1559 	spdk_bdev_io_submit(bdev_io);
1560 	return 0;
1561 }
1562 
1563 int
1564 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1565 		void *buf, uint64_t offset, uint64_t nbytes,
1566 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1567 {
1568 	uint64_t offset_blocks, num_blocks;
1569 
1570 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1571 		return -EINVAL;
1572 	}
1573 
1574 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1575 }
1576 
1577 int
1578 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1579 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1580 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1581 {
1582 	struct spdk_bdev *bdev = desc->bdev;
1583 	struct spdk_bdev_io *bdev_io;
1584 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1585 
1586 	if (!desc->write) {
1587 		return -EBADF;
1588 	}
1589 
1590 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1591 		return -EINVAL;
1592 	}
1593 
1594 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1595 	if (!bdev_io) {
1596 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1597 		return -ENOMEM;
1598 	}
1599 
1600 	bdev_io->ch = channel;
1601 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1602 	bdev_io->u.bdev.iov.iov_base = buf;
1603 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1604 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1605 	bdev_io->u.bdev.iovcnt = 1;
1606 	bdev_io->u.bdev.num_blocks = num_blocks;
1607 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1608 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1609 
1610 	spdk_bdev_io_submit(bdev_io);
1611 	return 0;
1612 }
1613 
1614 int
1615 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1616 		 struct iovec *iov, int iovcnt,
1617 		 uint64_t offset, uint64_t len,
1618 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1619 {
1620 	uint64_t offset_blocks, num_blocks;
1621 
1622 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1623 		return -EINVAL;
1624 	}
1625 
1626 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1627 }
1628 
1629 int
1630 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1631 			struct iovec *iov, int iovcnt,
1632 			uint64_t offset_blocks, uint64_t num_blocks,
1633 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1634 {
1635 	struct spdk_bdev *bdev = desc->bdev;
1636 	struct spdk_bdev_io *bdev_io;
1637 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1638 
1639 	if (!desc->write) {
1640 		return -EBADF;
1641 	}
1642 
1643 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1644 		return -EINVAL;
1645 	}
1646 
1647 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1648 	if (!bdev_io) {
1649 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1650 		return -ENOMEM;
1651 	}
1652 
1653 	bdev_io->ch = channel;
1654 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1655 	bdev_io->u.bdev.iovs = iov;
1656 	bdev_io->u.bdev.iovcnt = iovcnt;
1657 	bdev_io->u.bdev.num_blocks = num_blocks;
1658 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1659 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1660 
1661 	spdk_bdev_io_submit(bdev_io);
1662 	return 0;
1663 }
1664 
1665 int
1666 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1667 		       uint64_t offset, uint64_t len,
1668 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1669 {
1670 	uint64_t offset_blocks, num_blocks;
1671 
1672 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1673 		return -EINVAL;
1674 	}
1675 
1676 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1677 }
1678 
1679 int
1680 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1681 			      uint64_t offset_blocks, uint64_t num_blocks,
1682 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1683 {
1684 	struct spdk_bdev *bdev = desc->bdev;
1685 	struct spdk_bdev_io *bdev_io;
1686 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1687 	uint64_t len;
1688 	bool split_request = false;
1689 
1690 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1691 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1692 		return -ERANGE;
1693 	}
1694 
1695 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1696 		return -EINVAL;
1697 	}
1698 
1699 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1700 
1701 	if (!bdev_io) {
1702 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1703 		return -ENOMEM;
1704 	}
1705 
1706 	bdev_io->ch = channel;
1707 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1708 
1709 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1710 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1711 		bdev_io->u.bdev.num_blocks = num_blocks;
1712 		bdev_io->u.bdev.iovs = NULL;
1713 		bdev_io->u.bdev.iovcnt = 0;
1714 
1715 	} else {
1716 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1717 
1718 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1719 
1720 		if (len > ZERO_BUFFER_SIZE) {
1721 			split_request = true;
1722 			len = ZERO_BUFFER_SIZE;
1723 		}
1724 
1725 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1726 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1727 		bdev_io->u.bdev.iov.iov_len = len;
1728 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1729 		bdev_io->u.bdev.iovcnt = 1;
1730 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1731 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1732 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1733 	}
1734 
1735 	if (split_request) {
1736 		bdev_io->u.bdev.stored_user_cb = cb;
1737 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1738 	} else {
1739 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1740 	}
1741 	spdk_bdev_io_submit(bdev_io);
1742 	return 0;
1743 }
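/*
 * Worked example (editorial): if the bdev lacks native WRITE_ZEROES support and the
 * request covers 8 MiB worth of blocks, the code above issues a single 1 MiB
 * (ZERO_BUFFER_SIZE) write from g_bdev_mgr.zero_buffer and stores the remaining 7 MiB
 * in split_remaining_num_blocks; spdk_bdev_write_zeroes_split() (defined later in this
 * file) then re-issues further chunks until the whole range has been zeroed.
 */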
1744 
1745 int
1746 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1747 		uint64_t offset, uint64_t nbytes,
1748 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1749 {
1750 	uint64_t offset_blocks, num_blocks;
1751 
1752 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1753 		return -EINVAL;
1754 	}
1755 
1756 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1757 }
1758 
1759 int
1760 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1761 		       uint64_t offset_blocks, uint64_t num_blocks,
1762 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1763 {
1764 	struct spdk_bdev *bdev = desc->bdev;
1765 	struct spdk_bdev_io *bdev_io;
1766 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1767 
1768 	if (!desc->write) {
1769 		return -EBADF;
1770 	}
1771 
1772 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1773 		return -EINVAL;
1774 	}
1775 
1776 	if (num_blocks == 0) {
1777 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1778 		return -EINVAL;
1779 	}
1780 
1781 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1782 	if (!bdev_io) {
1783 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1784 		return -ENOMEM;
1785 	}
1786 
1787 	bdev_io->ch = channel;
1788 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1789 	bdev_io->u.bdev.iov.iov_base = NULL;
1790 	bdev_io->u.bdev.iov.iov_len = 0;
1791 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1792 	bdev_io->u.bdev.iovcnt = 1;
1793 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1794 	bdev_io->u.bdev.num_blocks = num_blocks;
1795 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1796 
1797 	spdk_bdev_io_submit(bdev_io);
1798 	return 0;
1799 }
1800 
1801 int
1802 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1803 		uint64_t offset, uint64_t length,
1804 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1805 {
1806 	uint64_t offset_blocks, num_blocks;
1807 
1808 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1809 		return -EINVAL;
1810 	}
1811 
1812 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1813 }
1814 
1815 int
1816 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1817 		       uint64_t offset_blocks, uint64_t num_blocks,
1818 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1819 {
1820 	struct spdk_bdev *bdev = desc->bdev;
1821 	struct spdk_bdev_io *bdev_io;
1822 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1823 
1824 	if (!desc->write) {
1825 		return -EBADF;
1826 	}
1827 
1828 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1829 		return -EINVAL;
1830 	}
1831 
1832 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1833 	if (!bdev_io) {
1834 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1835 		return -ENOMEM;
1836 	}
1837 
1838 	bdev_io->ch = channel;
1839 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1840 	bdev_io->u.bdev.iovs = NULL;
1841 	bdev_io->u.bdev.iovcnt = 0;
1842 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1843 	bdev_io->u.bdev.num_blocks = num_blocks;
1844 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1845 
1846 	spdk_bdev_io_submit(bdev_io);
1847 	return 0;
1848 }
1849 
1850 static void
1851 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1852 {
1853 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1854 	struct spdk_bdev_io *bdev_io;
1855 
1856 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1857 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1858 	spdk_bdev_io_submit_reset(bdev_io);
1859 }
1860 
1861 static void
1862 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1863 {
1864 	struct spdk_io_channel		*ch;
1865 	struct spdk_bdev_channel	*channel;
1866 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1867 	struct spdk_bdev_module_channel	*shared_ch;
1868 
1869 	ch = spdk_io_channel_iter_get_channel(i);
1870 	channel = spdk_io_channel_get_ctx(ch);
1871 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1872 	shared_ch = channel->module_ch;
1873 
1874 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1875 
1876 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, channel);
1877 	_spdk_bdev_abort_queued_io(&channel->qos_io, channel);
1878 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1879 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1880 
1881 	spdk_for_each_channel_continue(i, 0);
1882 }
1883 
1884 static void
1885 _spdk_bdev_reset_freeze_qos_channel(void *ctx)
1886 {
1887 	struct spdk_bdev		*bdev = ctx;
1888 	struct spdk_bdev_mgmt_channel	*mgmt_channel = NULL;
1889 	struct spdk_bdev_channel	*qos_channel = bdev->qos_channel;
1890 	struct spdk_bdev_module_channel	*shared_ch = NULL;
1891 
1892 	if (qos_channel) {
1893 		shared_ch = qos_channel->module_ch;
1894 		mgmt_channel = spdk_io_channel_get_ctx(qos_channel->mgmt_channel);
1895 
1896 		qos_channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1897 
1898 		_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, qos_channel);
1899 		_spdk_bdev_abort_queued_io(&qos_channel->qos_io, qos_channel);
1900 		_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, qos_channel);
1901 		_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, qos_channel);
1902 	}
1903 }
1904 
1905 static void
1906 _spdk_bdev_start_reset(void *ctx)
1907 {
1908 	struct spdk_bdev_channel *ch = ctx;
1909 
1910 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
1911 			      ch, _spdk_bdev_reset_dev);
1912 }
1913 
1914 static void
1915 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1916 {
1917 	struct spdk_bdev *bdev = ch->bdev;
1918 
1919 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1920 
1921 	pthread_mutex_lock(&bdev->mutex);
1922 	if (bdev->reset_in_progress == NULL) {
1923 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1924 		/*
1925 		 * Take a channel reference for the target bdev for the life of this
1926 		 *  reset.  This guards against the channel getting destroyed while
1927 		 *  spdk_for_each_channel() calls related to this reset IO are in
1928 		 *  progress.  We will release the reference when this reset is
1929 		 *  completed.
1930 		 */
1931 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1932 		_spdk_bdev_start_reset(ch);
1933 	}
1934 	pthread_mutex_unlock(&bdev->mutex);
1935 }
1936 
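/*
 * Illustrative use of spdk_bdev_reset() (reset_done, desc and ch are
 * placeholder names, not part of this file):
 *
 *	static void
 *	reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		SPDK_NOTICELOG("reset %s\n", success ? "succeeded" : "failed");
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	rc = spdk_bdev_reset(desc, ch, reset_done, NULL);
 */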
1937 int
1938 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1939 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1940 {
1941 	struct spdk_bdev *bdev = desc->bdev;
1942 	struct spdk_bdev_io *bdev_io;
1943 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1944 
1945 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1946 	if (!bdev_io) {
1947 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1948 		return -ENOMEM;
1949 	}
1950 
1951 	bdev_io->ch = channel;
1952 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1953 	bdev_io->u.reset.ch_ref = NULL;
1954 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1955 
1956 	pthread_mutex_lock(&bdev->mutex);
1957 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1958 	pthread_mutex_unlock(&bdev->mutex);
1959 
1960 	_spdk_bdev_channel_start_reset(channel);
1961 
1962 	/* Explicitly handle the QoS bdev channel separately, since it has no associated IO channel */
1963 	if (bdev->qos_thread) {
1964 		spdk_thread_send_msg(bdev->qos_thread,
1965 				     _spdk_bdev_reset_freeze_qos_channel, bdev);
1966 	}
1967 
1968 	return 0;
1969 }
1970 
1971 void
1972 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1973 		      struct spdk_bdev_io_stat *stat)
1974 {
1975 #ifdef SPDK_CONFIG_VTUNE
1976 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1977 	memset(stat, 0, sizeof(*stat));
1978 	return;
1979 #endif
1980 
1981 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1982 
1983 	channel->stat.ticks_rate = spdk_get_ticks_hz();
1984 	*stat = channel->stat;
1985 	memset(&channel->stat, 0, sizeof(channel->stat));
1986 }
1987 
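/*
 * NVMe passthru helpers: the descriptor must have been opened for write and
 * the command is forwarded to the underlying module unmodified.  Minimal
 * admin-command sketch (the cdw10 value, buffer handling and admin_done are
 * assumptions for illustration; see spdk/nvme_spec.h for the real field layout):
 *
 *	struct spdk_nvme_cmd cmd = {};
 *
 *	cmd.opc = SPDK_NVME_OPC_IDENTIFY;
 *	cmd.cdw10 = 1;
 *	rc = spdk_bdev_nvme_admin_passthru(desc, ch, &cmd, buf, 4096,
 *					   admin_done, NULL);
 */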
1988 int
1989 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1990 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1991 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1992 {
1993 	struct spdk_bdev *bdev = desc->bdev;
1994 	struct spdk_bdev_io *bdev_io;
1995 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1996 
1997 	if (!desc->write) {
1998 		return -EBADF;
1999 	}
2000 
2001 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
2002 	if (!bdev_io) {
2003 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
2004 		return -ENOMEM;
2005 	}
2006 
2007 	bdev_io->ch = channel;
2008 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2009 	bdev_io->u.nvme_passthru.cmd = *cmd;
2010 	bdev_io->u.nvme_passthru.buf = buf;
2011 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2012 	bdev_io->u.nvme_passthru.md_buf = NULL;
2013 	bdev_io->u.nvme_passthru.md_len = 0;
2014 
2015 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2016 
2017 	spdk_bdev_io_submit(bdev_io);
2018 	return 0;
2019 }
2020 
2021 int
2022 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2023 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2024 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2025 {
2026 	struct spdk_bdev *bdev = desc->bdev;
2027 	struct spdk_bdev_io *bdev_io;
2028 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2029 
2030 	if (!desc->write) {
2031 		/*
2032 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2033 		 *  to easily determine if the command is a read or write, but for now just
2034 		 *  do not allow io_passthru with a read-only descriptor.
2035 		 */
2036 		return -EBADF;
2037 	}
2038 
2039 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
2040 	if (!bdev_io) {
2041 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
2042 		return -ENOMEM;
2043 	}
2044 
2045 	bdev_io->ch = channel;
2046 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2047 	bdev_io->u.nvme_passthru.cmd = *cmd;
2048 	bdev_io->u.nvme_passthru.buf = buf;
2049 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2050 	bdev_io->u.nvme_passthru.md_buf = NULL;
2051 	bdev_io->u.nvme_passthru.md_len = 0;
2052 
2053 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2054 
2055 	spdk_bdev_io_submit(bdev_io);
2056 	return 0;
2057 }
2058 
2059 int
2060 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2061 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2062 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2063 {
2064 	struct spdk_bdev *bdev = desc->bdev;
2065 	struct spdk_bdev_io *bdev_io;
2066 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2067 
2068 	if (!desc->write) {
2069 		/*
2070 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2071 		 *  to easily determine if the command is a read or write, but for now just
2072 		 *  do not allow io_passthru with a read-only descriptor.
2073 		 */
2074 		return -EBADF;
2075 	}
2076 
2077 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
2078 	if (!bdev_io) {
2079 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru_md\n");
2080 		return -ENOMEM;
2081 	}
2082 
2083 	bdev_io->ch = channel;
2084 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2085 	bdev_io->u.nvme_passthru.cmd = *cmd;
2086 	bdev_io->u.nvme_passthru.buf = buf;
2087 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2088 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2089 	bdev_io->u.nvme_passthru.md_len = md_len;
2090 
2091 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2092 
2093 	spdk_bdev_io_submit(bdev_io);
2094 	return 0;
2095 }
2096 
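/*
 * spdk_bdev_free_io() returns a completed bdev_io to its pool.  Callers must
 * invoke it from (or after) their completion callback; freeing an I/O that is
 * still in SPDK_BDEV_IO_STATUS_PENDING state is a caller bug and is caught by
 * the assert below.
 */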
2097 int
2098 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
2099 {
2100 	if (!bdev_io) {
2101 		SPDK_ERRLOG("bdev_io is NULL\n");
2102 		return -1;
2103 	}
2104 
2105 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
2106 		SPDK_ERRLOG("bdev_io is in pending state\n");
2107 		assert(false);
2108 		return -1;
2109 	}
2110 
2111 	spdk_bdev_put_io(bdev_io);
2112 
2113 	return 0;
2114 }
2115 
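/*
 * Called from spdk_bdev_io_complete() whenever the shared nomem_io queue is
 * non-empty.  Resubmission is throttled by nomem_threshold so that enough
 * outstanding I/O complete (and release module resources) before the queued
 * requests are retried.
 */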
2116 static void
2117 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2118 {
2119 	struct spdk_bdev *bdev = bdev_ch->bdev;
2120 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
2121 	struct spdk_bdev_io *bdev_io;
2122 
2123 	if (shared_ch->io_outstanding > shared_ch->nomem_threshold) {
2124 		/*
2125 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2126 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2127 		 *  the context of a completion, because the resources for the I/O are
2128 		 *  not released until control returns to the bdev poller.  Also, we
2129 		 *  may require several small I/O to complete before a larger I/O
2130 		 *  (that requires splitting) can be submitted.
2131 		 */
2132 		return;
2133 	}
2134 
2135 	while (!TAILQ_EMPTY(&shared_ch->nomem_io)) {
2136 		bdev_io = TAILQ_FIRST(&shared_ch->nomem_io);
2137 		TAILQ_REMOVE(&shared_ch->nomem_io, bdev_io, link);
2138 		bdev_io->ch->io_outstanding++;
2139 		shared_ch->io_outstanding++;
2140 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
2141 		bdev->fn_table->submit_request(bdev_io->ch->channel, bdev_io);
2142 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
2143 			break;
2144 		}
2145 	}
2146 }
2147 
2148 static inline void
2149 _spdk_bdev_io_complete(void *ctx)
2150 {
2151 	struct spdk_bdev_io *bdev_io = ctx;
2152 
2153 	if (spdk_unlikely(bdev_io->in_submit_request || bdev_io->io_submit_ch)) {
2154 		/*
2155 		 * Send the completion to the thread that originally submitted the I/O,
2156 		 * which may not be the current thread in the case of QoS.
2157 		 */
2158 		if (bdev_io->io_submit_ch) {
2159 			bdev_io->ch = bdev_io->io_submit_ch;
2160 			bdev_io->io_submit_ch = NULL;
2161 		}
2162 
2163 		/*
2164 		 * Defer completion to avoid potential infinite recursion if the
2165 		 * user's completion callback issues a new I/O.
2166 		 */
2167 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
2168 				     _spdk_bdev_io_complete, bdev_io);
2169 		return;
2170 	}
2171 
2172 	assert(bdev_io->cb != NULL);
2173 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->ch->channel));
2174 
2175 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS,
2176 		    bdev_io->caller_ctx);
2177 }
2178 
2179 static void
2180 _spdk_bdev_unfreeze_qos_channel(void *ctx)
2181 {
2182 	struct spdk_bdev	*bdev = ctx;
2183 
2184 	if (bdev->qos_channel) {
2185 		bdev->qos_channel->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2186 		assert(TAILQ_EMPTY(&bdev->qos_channel->queued_resets));
2187 	}
2188 }
2189 
2190 static void
2191 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2192 {
2193 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2194 
2195 	if (bdev_io->u.reset.ch_ref != NULL) {
2196 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2197 		bdev_io->u.reset.ch_ref = NULL;
2198 	}
2199 
2200 	_spdk_bdev_io_complete(bdev_io);
2201 }
2202 
2203 static void
2204 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2205 {
2206 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2207 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2208 
2209 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2210 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2211 		_spdk_bdev_channel_start_reset(ch);
2212 	}
2213 
2214 	spdk_for_each_channel_continue(i, 0);
2215 }
2216 
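/*
 * spdk_bdev_io_complete() is the completion entry point used by bdev modules.
 * A typical module completion path looks roughly like this (sketch; the names
 * are placeholders, not part of this file):
 *
 *	static void
 *	my_module_io_done(void *ctx, int rc)
 *	{
 *		struct spdk_bdev_io *bdev_io = ctx;
 *
 *		spdk_bdev_io_complete(bdev_io, rc == 0 ?
 *				      SPDK_BDEV_IO_STATUS_SUCCESS :
 *				      SPDK_BDEV_IO_STATUS_FAILED);
 *	}
 *
 * Completing with SPDK_BDEV_IO_STATUS_NOMEM requeues the I/O for retry rather
 * than reporting a failure to the caller.
 */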
2217 void
2218 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2219 {
2220 	struct spdk_bdev *bdev = bdev_io->bdev;
2221 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
2222 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
2223 
2224 	bdev_io->status = status;
2225 
2226 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2227 		bool unlock_channels = false;
2228 
2229 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2230 			SPDK_ERRLOG("NOMEM returned for reset\n");
2231 		}
2232 		pthread_mutex_lock(&bdev->mutex);
2233 		if (bdev_io == bdev->reset_in_progress) {
2234 			bdev->reset_in_progress = NULL;
2235 			unlock_channels = true;
2236 		}
2237 		pthread_mutex_unlock(&bdev->mutex);
2238 
2239 		if (unlock_channels) {
2240 			/* Explicitly handle the QoS bdev channel separately, since it has no associated IO channel */
2241 			if (bdev->qos_thread) {
2242 				spdk_thread_send_msg(bdev->qos_thread,
2243 						     _spdk_bdev_unfreeze_qos_channel, bdev);
2244 			}
2245 
2246 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2247 					      bdev_io, _spdk_bdev_reset_complete);
2248 			return;
2249 		}
2250 	} else {
2251 		assert(bdev_ch->io_outstanding > 0);
2252 		assert(shared_ch->io_outstanding > 0);
2253 		bdev_ch->io_outstanding--;
2254 		shared_ch->io_outstanding--;
2255 
2256 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2257 			TAILQ_INSERT_HEAD(&shared_ch->nomem_io, bdev_io, link);
2258 			/*
2259 			 * Wait for some of the outstanding I/O to complete before we
2260 			 *  retry any of the nomem_io.  Normally we will wait for
2261 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2262 			 *  depth channels we will instead wait for half to complete.
2263 			 */
2264 			shared_ch->nomem_threshold = spdk_max((int64_t)shared_ch->io_outstanding / 2,
2265 							      (int64_t)shared_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
2266 			return;
2267 		}
2268 
2269 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_ch->nomem_io))) {
2270 			_spdk_bdev_ch_retry_io(bdev_ch);
2271 		}
2272 	}
2273 
2274 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2275 		switch (bdev_io->type) {
2276 		case SPDK_BDEV_IO_TYPE_READ:
2277 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
2278 			bdev_ch->stat.num_read_ops++;
2279 			bdev_ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2280 			break;
2281 		case SPDK_BDEV_IO_TYPE_WRITE:
2282 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
2283 			bdev_ch->stat.num_write_ops++;
2284 			bdev_ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2285 			break;
2286 		default:
2287 			break;
2288 		}
2289 	}
2290 
2291 #ifdef SPDK_CONFIG_VTUNE
2292 	uint64_t now_tsc = spdk_get_ticks();
2293 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
2294 		uint64_t data[5];
2295 
2296 		data[0] = bdev_ch->stat.num_read_ops;
2297 		data[1] = bdev_ch->stat.bytes_read;
2298 		data[2] = bdev_ch->stat.num_write_ops;
2299 		data[3] = bdev_ch->stat.bytes_written;
2300 		data[4] = bdev->fn_table->get_spin_time ?
2301 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
2302 
2303 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
2304 				   __itt_metadata_u64, 5, data);
2305 
2306 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
2307 		bdev_ch->start_tsc = now_tsc;
2308 	}
2309 #endif
2310 
2311 	_spdk_bdev_io_complete(bdev_io);
2312 }
2313 
2314 void
2315 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2316 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2317 {
2318 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2319 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2320 	} else {
2321 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2322 		bdev_io->error.scsi.sc = sc;
2323 		bdev_io->error.scsi.sk = sk;
2324 		bdev_io->error.scsi.asc = asc;
2325 		bdev_io->error.scsi.ascq = ascq;
2326 	}
2327 
2328 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2329 }
2330 
2331 void
2332 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2333 			     int *sc, int *sk, int *asc, int *ascq)
2334 {
2335 	assert(sc != NULL);
2336 	assert(sk != NULL);
2337 	assert(asc != NULL);
2338 	assert(ascq != NULL);
2339 
2340 	switch (bdev_io->status) {
2341 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2342 		*sc = SPDK_SCSI_STATUS_GOOD;
2343 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2344 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2345 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2346 		break;
2347 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2348 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2349 		break;
2350 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2351 		*sc = bdev_io->error.scsi.sc;
2352 		*sk = bdev_io->error.scsi.sk;
2353 		*asc = bdev_io->error.scsi.asc;
2354 		*ascq = bdev_io->error.scsi.ascq;
2355 		break;
2356 	default:
2357 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2358 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2359 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2360 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2361 		break;
2362 	}
2363 }
2364 
2365 void
2366 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2367 {
2368 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2369 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2370 	} else {
2371 		bdev_io->error.nvme.sct = sct;
2372 		bdev_io->error.nvme.sc = sc;
2373 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2374 	}
2375 
2376 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2377 }
2378 
2379 void
2380 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2381 {
2382 	assert(sct != NULL);
2383 	assert(sc != NULL);
2384 
2385 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2386 		*sct = bdev_io->error.nvme.sct;
2387 		*sc = bdev_io->error.nvme.sc;
2388 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2389 		*sct = SPDK_NVME_SCT_GENERIC;
2390 		*sc = SPDK_NVME_SC_SUCCESS;
2391 	} else {
2392 		*sct = SPDK_NVME_SCT_GENERIC;
2393 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2394 	}
2395 }
2396 
2397 struct spdk_thread *
2398 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2399 {
2400 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
2401 }
2402 
2403 static int
2404 spdk_bdev_init(struct spdk_bdev *bdev)
2405 {
2406 	assert(bdev->module != NULL);
2407 
2408 	if (!bdev->name) {
2409 		SPDK_ERRLOG("Bdev name is NULL\n");
2410 		return -EINVAL;
2411 	}
2412 
2413 	if (spdk_bdev_get_by_name(bdev->name)) {
2414 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2415 		return -EEXIST;
2416 	}
2417 
2418 	bdev->status = SPDK_BDEV_STATUS_READY;
2419 
2420 	TAILQ_INIT(&bdev->open_descs);
2421 
2422 	TAILQ_INIT(&bdev->aliases);
2423 
2424 	bdev->reset_in_progress = NULL;
2425 
2426 	spdk_io_device_register(__bdev_to_io_dev(bdev),
2427 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2428 				sizeof(struct spdk_bdev_channel));
2429 
2430 	pthread_mutex_init(&bdev->mutex, NULL);
2431 	return 0;
2432 }
2433 
2434 static void
2435 spdk_bdev_fini(struct spdk_bdev *bdev)
2436 {
2437 	pthread_mutex_destroy(&bdev->mutex);
2438 
2439 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), NULL);
2440 }
2441 
2442 static void
2443 spdk_bdev_start(struct spdk_bdev *bdev)
2444 {
2445 	struct spdk_bdev_module *module;
2446 
2447 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2448 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
2449 
2450 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
2451 		if (module->examine) {
2452 			module->action_in_progress++;
2453 			module->examine(bdev);
2454 		}
2455 	}
2456 }
2457 
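/*
 * A module registers a bdev by filling in the fields it owns and then calling
 * spdk_bdev_register().  Sketch of the commonly required fields (my_bdev,
 * my_ctx, my_fn_table and my_module are placeholders, not part of this file):
 *
 *	my_bdev->name = strdup("MyBdev0");
 *	my_bdev->blocklen = 512;
 *	my_bdev->blockcnt = num_blocks;
 *	my_bdev->ctxt = my_ctx;
 *	my_bdev->fn_table = &my_fn_table;
 *	my_bdev->module = &my_module;
 *	rc = spdk_bdev_register(my_bdev);
 */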
2458 int
2459 spdk_bdev_register(struct spdk_bdev *bdev)
2460 {
2461 	int rc = spdk_bdev_init(bdev);
2462 
2463 	if (rc == 0) {
2464 		spdk_bdev_start(bdev);
2465 	}
2466 
2467 	return rc;
2468 }
2469 
2470 static void
2471 spdk_vbdev_remove_base_bdevs(struct spdk_bdev *vbdev)
2472 {
2473 	struct spdk_bdev **bdevs;
2474 	struct spdk_bdev *base;
2475 	size_t i, j, k;
2476 	bool found;
2477 
2478 	/* Iterate over base bdevs to remove vbdev from them. */
2479 	for (i = 0; i < vbdev->base_bdevs_cnt; i++) {
2480 		found = false;
2481 		base = vbdev->base_bdevs[i];
2482 
2483 		for (j = 0; j < base->vbdevs_cnt; j++) {
2484 			if (base->vbdevs[j] != vbdev) {
2485 				continue;
2486 			}
2487 
2488 			for (k = j; k + 1 < base->vbdevs_cnt; k++) {
2489 				base->vbdevs[k] = base->vbdevs[k + 1];
2490 			}
2491 
2492 			base->vbdevs_cnt--;
2493 			if (base->vbdevs_cnt > 0) {
2494 				bdevs = realloc(base->vbdevs, base->vbdevs_cnt * sizeof(bdevs[0]));
2495 				/* It would be odd if shrinking a memory block failed. */
2496 				assert(bdevs);
2497 				base->vbdevs = bdevs;
2498 			} else {
2499 				free(base->vbdevs);
2500 				base->vbdevs = NULL;
2501 			}
2502 
2503 			found = true;
2504 			break;
2505 		}
2506 
2507 		if (!found) {
2508 			SPDK_WARNLOG("Bdev '%s' is not a base bdev of '%s'.\n", base->name, vbdev->name);
2509 		}
2510 	}
2511 
2512 	free(vbdev->base_bdevs);
2513 	vbdev->base_bdevs = NULL;
2514 	vbdev->base_bdevs_cnt = 0;
2515 }
2516 
2517 static int
2518 spdk_vbdev_set_base_bdevs(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, size_t cnt)
2519 {
2520 	struct spdk_bdev **vbdevs;
2521 	struct spdk_bdev *base;
2522 	size_t i;
2523 
2524 	/* Adding base bdevs isn't supported (yet?). */
2525 	assert(vbdev->base_bdevs_cnt == 0);
2526 
2527 	vbdev->base_bdevs = malloc(cnt * sizeof(vbdev->base_bdevs[0]));
2528 	if (!vbdev->base_bdevs) {
2529 		SPDK_ERRLOG("%s - malloc() failed\n", vbdev->name);
2530 		return -ENOMEM;
2531 	}
2532 
2533 	memcpy(vbdev->base_bdevs, base_bdevs, cnt * sizeof(vbdev->base_bdevs[0]));
2534 	vbdev->base_bdevs_cnt = cnt;
2535 
2536 	/* Iterate over base bdevs to add this vbdev to them. */
2537 	for (i = 0; i < cnt; i++) {
2538 		base = vbdev->base_bdevs[i];
2539 
2540 		assert(base != NULL);
2541 		assert(base->claim_module != NULL);
2542 
2543 		vbdevs = realloc(base->vbdevs, (base->vbdevs_cnt + 1) * sizeof(vbdevs[0]));
2544 		if (!vbdevs) {
2545 			SPDK_ERRLOG("%s - realloc() failed\n", base->name);
2546 			spdk_vbdev_remove_base_bdevs(vbdev);
2547 			return -ENOMEM;
2548 		}
2549 
2550 		vbdevs[base->vbdevs_cnt] = vbdev;
2551 		base->vbdevs = vbdevs;
2552 		base->vbdevs_cnt++;
2553 	}
2554 
2555 	return 0;
2556 }
2557 
2558 int
2559 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2560 {
2561 	int rc;
2562 
2563 	rc = spdk_bdev_init(vbdev);
2564 	if (rc) {
2565 		return rc;
2566 	}
2567 
2568 	if (base_bdev_count == 0) {
2569 		spdk_bdev_start(vbdev);
2570 		return 0;
2571 	}
2572 
2573 	rc = spdk_vbdev_set_base_bdevs(vbdev, base_bdevs, base_bdev_count);
2574 	if (rc) {
2575 		spdk_bdev_fini(vbdev);
2576 		return rc;
2577 	}
2578 
2579 	spdk_bdev_start(vbdev);
2580 	return 0;
2582 }
2583 
2584 void
2585 spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
2586 {
2587 	if (bdev->unregister_cb != NULL) {
2588 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
2589 	}
2590 }
2591 
2592 static void
2593 _remove_notify(void *arg)
2594 {
2595 	struct spdk_bdev_desc *desc = arg;
2596 
2597 	desc->remove_cb(desc->remove_ctx);
2598 }
2599 
2600 void
2601 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2602 {
2603 	struct spdk_bdev_desc	*desc, *tmp;
2604 	int			rc;
2605 	bool			do_destruct = true;
2606 
2607 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2608 
2609 	pthread_mutex_lock(&bdev->mutex);
2610 
2611 	spdk_vbdev_remove_base_bdevs(bdev);
2612 
2613 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
2614 	bdev->unregister_cb = cb_fn;
2615 	bdev->unregister_ctx = cb_arg;
2616 
2617 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
2618 		if (desc->remove_cb) {
2619 			do_destruct = false;
2620 			/*
2621 			 * Defer invocation of the remove_cb to a separate message that will
2622 			 *  run later on this thread.  This ensures this context unwinds and
2623 			 *  we don't recursively unregister this bdev again if the remove_cb
2624 			 *  immediately closes its descriptor.
2625 			 */
2626 			spdk_thread_send_msg(spdk_get_thread(), _remove_notify, desc);
2627 		}
2628 	}
2629 
2630 	if (!do_destruct) {
2631 		pthread_mutex_unlock(&bdev->mutex);
2632 		return;
2633 	}
2634 
2635 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
2636 	pthread_mutex_unlock(&bdev->mutex);
2637 
2638 	spdk_bdev_fini(bdev);
2639 
2640 	rc = bdev->fn_table->destruct(bdev->ctxt);
2641 	if (rc < 0) {
2642 		SPDK_ERRLOG("destruct failed\n");
2643 	}
2644 	if (rc <= 0 && cb_fn != NULL) {
2645 		cb_fn(cb_arg, rc);
2646 	}
2647 }
2648 
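/*
 * Typical consumer open/close sequence (sketch; "Nvme0n1", hotremove_cb and
 * the omitted error handling are placeholders, not part of this file):
 *
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Nvme0n1");
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *ch;
 *
 *	rc = spdk_bdev_open(bdev, true, hotremove_cb, NULL, &desc);
 *	ch = spdk_bdev_get_io_channel(desc);
 *	(submit I/O on ch)
 *	spdk_put_io_channel(ch);
 *	spdk_bdev_close(desc);
 */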
2649 int
2650 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2651 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2652 {
2653 	struct spdk_bdev_desc *desc;
2654 
2655 	desc = calloc(1, sizeof(*desc));
2656 	if (desc == NULL) {
2657 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2658 		return -ENOMEM;
2659 	}
2660 
2661 	pthread_mutex_lock(&bdev->mutex);
2662 
2663 	if (write && bdev->claim_module) {
2664 		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
2665 		free(desc);
2666 		pthread_mutex_unlock(&bdev->mutex);
2667 		return -EPERM;
2668 	}
2669 
2670 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
2671 
2672 	desc->bdev = bdev;
2673 	desc->remove_cb = remove_cb;
2674 	desc->remove_ctx = remove_ctx;
2675 	desc->write = write;
2676 	*_desc = desc;
2677 
2678 	pthread_mutex_unlock(&bdev->mutex);
2679 
2680 	return 0;
2681 }
2682 
2683 void
2684 spdk_bdev_close(struct spdk_bdev_desc *desc)
2685 {
2686 	struct spdk_bdev *bdev = desc->bdev;
2687 	bool do_unregister = false;
2688 
2689 	pthread_mutex_lock(&bdev->mutex);
2690 
2691 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
2692 	free(desc);
2693 
2694 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
2695 		do_unregister = true;
2696 	}
2697 	pthread_mutex_unlock(&bdev->mutex);
2698 
2699 	if (do_unregister == true) {
2700 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
2701 	}
2702 }
2703 
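/*
 * Claiming gives a virtual bdev module exclusive write ownership of a base
 * bdev: while claimed, spdk_bdev_open() with write == true fails with -EPERM
 * until spdk_bdev_module_release_bdev() is called.  Passing an already-open
 * descriptor also promotes it to a write descriptor.
 */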
2704 int
2705 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
2706 			    struct spdk_bdev_module *module)
2707 {
2708 	if (bdev->claim_module != NULL) {
2709 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
2710 			    bdev->claim_module->name);
2711 		return -EPERM;
2712 	}
2713 
2714 	if (desc && !desc->write) {
2715 		desc->write = true;
2716 	}
2717 
2718 	bdev->claim_module = module;
2719 	return 0;
2720 }
2721 
2722 void
2723 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
2724 {
2725 	assert(bdev->claim_module != NULL);
2726 	bdev->claim_module = NULL;
2727 }
2728 
2729 struct spdk_bdev *
2730 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
2731 {
2732 	return desc->bdev;
2733 }
2734 
2735 void
2736 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
2737 {
2738 	struct iovec *iovs;
2739 	int iovcnt;
2740 
2741 	if (bdev_io == NULL) {
2742 		return;
2743 	}
2744 
2745 	switch (bdev_io->type) {
2746 	case SPDK_BDEV_IO_TYPE_READ:
2747 		iovs = bdev_io->u.bdev.iovs;
2748 		iovcnt = bdev_io->u.bdev.iovcnt;
2749 		break;
2750 	case SPDK_BDEV_IO_TYPE_WRITE:
2751 		iovs = bdev_io->u.bdev.iovs;
2752 		iovcnt = bdev_io->u.bdev.iovcnt;
2753 		break;
2754 	default:
2755 		iovs = NULL;
2756 		iovcnt = 0;
2757 		break;
2758 	}
2759 
2760 	if (iovp) {
2761 		*iovp = iovs;
2762 	}
2763 	if (iovcntp) {
2764 		*iovcntp = iovcnt;
2765 	}
2766 }
2767 
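/*
 * Bdev modules normally land in this list through the
 * SPDK_BDEV_MODULE_REGISTER() constructor macro (see spdk_internal/bdev.h for
 * the exact signature in this tree) rather than by calling
 * spdk_bdev_module_list_add() directly.
 */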
2768 void
2769 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
2770 {
2772 	if (spdk_bdev_module_list_find(bdev_module->name)) {
2773 		fprintf(stderr, "ERROR: module '%s' already registered.\n", bdev_module->name);
2774 		assert(false);
2775 	}
2776 
2777 	if (bdev_module->async_init) {
2778 		bdev_module->action_in_progress = 1;
2779 	}
2780 
2781 	/*
2782 	 * Modules with examine callbacks must be initialized first, so they are
2783 	 *  ready to handle examine callbacks from later modules that will
2784 	 *  register physical bdevs.
2785 	 */
2786 	if (bdev_module->examine != NULL) {
2787 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2788 	} else {
2789 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2790 	}
2791 }
2792 
2793 struct spdk_bdev_module *
2794 spdk_bdev_module_list_find(const char *name)
2795 {
2796 	struct spdk_bdev_module *bdev_module;
2797 
2798 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
2799 		if (strcmp(name, bdev_module->name) == 0) {
2800 			break;
2801 		}
2802 	}
2803 
2804 	return bdev_module;
2805 }
2806 
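/*
 * Completion callback used while splitting a large write_zeroes request into
 * ZERO_BUFFER_SIZE-sized chunks: each successful chunk advances
 * split_current_offset_blocks and shrinks split_remaining_num_blocks, and the
 * final chunk restores the caller's original completion callback before the
 * last resubmission.
 */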
2807 static void
2808 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2809 {
2810 	uint64_t len;
2811 
2812 	if (!success) {
2813 		bdev_io->cb = bdev_io->u.bdev.stored_user_cb;
2814 		_spdk_bdev_io_complete(bdev_io);
2815 		return;
2816 	}
2817 
2818 	/* No need to repeat the error checking from write_zeroes_blocks; this request already passed those checks. */
2819 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks,
2820 		       ZERO_BUFFER_SIZE);
2821 
2822 	bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks;
2823 	bdev_io->u.bdev.iov.iov_len = len;
2824 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2825 	bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2826 	bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2827 
2828 	/* If this round completes the I/O, restore the original user callback. */
2829 	if (bdev_io->u.bdev.split_remaining_num_blocks == 0) {
2830 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb);
2831 	} else {
2832 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2833 	}
2834 	spdk_bdev_io_submit(bdev_io);
2835 }
2836 
2837 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
2838