xref: /spdk/lib/bdev/bdev.c (revision 61e8486c10bd179811225d792add75a8fcc936ce)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 #include "spdk/conf.h"
39 
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/io_channel.h"
43 #include "spdk/likely.h"
44 #include "spdk/queue.h"
45 #include "spdk/nvme_spec.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/util.h"
48 
49 #include "spdk_internal/bdev.h"
50 #include "spdk_internal/log.h"
51 #include "spdk/string.h"
52 
53 #ifdef SPDK_CONFIG_VTUNE
54 #include "ittnotify.h"
55 #include "ittnotify_types.h"
56 int __itt_init_ittlib(const char *, __itt_group_id);
57 #endif
58 
59 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
60 #define SPDK_BDEV_IO_CACHE_SIZE			256
61 #define BUF_SMALL_POOL_SIZE			8192
62 #define BUF_LARGE_POOL_SIZE			1024
63 #define NOMEM_THRESHOLD_COUNT			8
64 #define ZERO_BUFFER_SIZE			0x100000
65 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
66 #define SPDK_BDEV_SEC_TO_USEC			1000000ULL
67 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
68 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
69 
70 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
71 typedef STAILQ_HEAD(, spdk_bdev_io) bdev_io_stailq_t;
72 
73 struct spdk_bdev_mgr {
74 	struct spdk_mempool *bdev_io_pool;
75 
76 	struct spdk_mempool *buf_small_pool;
77 	struct spdk_mempool *buf_large_pool;
78 
79 	void *zero_buffer;
80 
81 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
82 
83 	TAILQ_HEAD(, spdk_bdev) bdevs;
84 
85 	bool init_complete;
86 	bool module_init_complete;
87 
88 #ifdef SPDK_CONFIG_VTUNE
89 	__itt_domain	*domain;
90 #endif
91 };
92 
93 static struct spdk_bdev_mgr g_bdev_mgr = {
94 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
95 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
96 	.init_complete = false,
97 	.module_init_complete = false,
98 };
99 
100 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
101 static void			*g_init_cb_arg = NULL;
102 
103 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
104 static void			*g_fini_cb_arg = NULL;
105 static struct spdk_thread	*g_fini_thread = NULL;
106 
107 
108 struct spdk_bdev_mgmt_channel {
109 	bdev_io_stailq_t need_buf_small;
110 	bdev_io_stailq_t need_buf_large;
111 
112 	/*
113 	 * Each thread keeps a cache of bdev_io - this allows
114 	 *  bdev threads which are *not* DPDK threads to still
115 	 *  benefit from a per-thread bdev_io cache.  Without
116 	 *  this, non-DPDK threads fetching from the mempool
117 	 *  incur a cmpxchg on get and put.
118 	 */
119 	bdev_io_stailq_t per_thread_cache;
120 	uint32_t	per_thread_cache_count;
121 
122 	TAILQ_HEAD(, spdk_bdev_module_channel) module_channels;
123 };
124 
125 struct spdk_bdev_desc {
126 	struct spdk_bdev		*bdev;
127 	spdk_bdev_remove_cb_t		remove_cb;
128 	void				*remove_ctx;
129 	bool				write;
130 	TAILQ_ENTRY(spdk_bdev_desc)	link;
131 };
132 
133 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
134 #define BDEV_CH_QOS_ENABLED		(1 << 1)
135 
136 struct spdk_bdev_channel {
137 	struct spdk_bdev	*bdev;
138 
139 	/* The channel for the underlying device */
140 	struct spdk_io_channel	*channel;
141 
142 	/* Channel for the bdev manager */
143 	struct spdk_io_channel	*mgmt_channel;
144 
145 	struct spdk_bdev_io_stat stat;
146 
147 	/*
148 	 * Count of I/O submitted through this channel and waiting for completion.
149 	 * Incremented before submit_request() is called on an spdk_bdev_io.
150 	 */
151 	uint64_t		io_outstanding;
152 
153 	bdev_io_tailq_t		queued_resets;
154 
155 	uint32_t		flags;
156 
157 	/*
158 	 * Rate limiting on this channel.
159 	 * Queue of IO awaiting issue because QoS rate limiting is in effect
160 	 *  on this channel.
161 	 */
162 	bdev_io_tailq_t		qos_io;
163 
164 	/*
165 	 * Rate limiting on this channel.
166 	 * Maximum number of IOs allowed to be issued in one timeslice (e.g., 1ms);
167 	 *  only valid for the master channel, which manages the outstanding IOs.
168 	 */
169 	uint64_t		qos_max_ios_per_timeslice;
170 
171 	/*
172 	 * Rate limiting on this channel.
173 	 * Number of IOs submitted in the current timeslice (e.g., 1ms)
174 	 */
175 	uint64_t		io_submitted_this_timeslice;
176 
177 	/*
178 	 * Rate limiting on this channel.
179 	 * QoS poller that runs periodically, once per millisecond timeslice.
180 	 */
181 	struct spdk_poller	*qos_poller;
182 
183 	/* Per-module (per-io_device) channel, shared with other bdevs on the same io_device */
184 	struct spdk_bdev_module_channel *module_ch;
185 
186 #ifdef SPDK_CONFIG_VTUNE
187 	uint64_t		start_tsc;
188 	uint64_t		interval_tsc;
189 	__itt_string_handle	*handle;
190 #endif
191 
192 };
193 
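/*
 * The io_device used for a bdev is the bdev pointer offset by one byte, so the
 *  io_device key never collides with the bdev pointer itself.
 */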
194 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
195 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
196 
197 /*
198  * Per-module (or per-io_device) channel. Multiple bdevs built on the same io_device
199  *  will queue their IO awaiting retry here. This makes it possible to retry sending
200  *  IO to one bdev after IO from another bdev completes.
201  */
202 struct spdk_bdev_module_channel {
203 	/*
204 	 * Count of I/O submitted to bdev module and waiting for completion.
205 	 * Incremented before submit_request() is called on an spdk_bdev_io.
206 	 */
207 	uint64_t		io_outstanding;
208 
209 	/*
210 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
211 	 *  on this channel.
212 	 */
213 	bdev_io_tailq_t		nomem_io;
214 
215 	/*
216 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
217 	 */
218 	uint64_t		nomem_threshold;
219 
220 	/* I/O channel allocated by a bdev module */
221 	struct spdk_io_channel	*module_ch;
222 
223 	uint32_t		ref;
224 
225 	TAILQ_ENTRY(spdk_bdev_module_channel) link;
226 };
227 
228 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
229 
230 struct spdk_bdev *
231 spdk_bdev_first(void)
232 {
233 	struct spdk_bdev *bdev;
234 
235 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
236 	if (bdev) {
237 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
238 	}
239 
240 	return bdev;
241 }
242 
243 struct spdk_bdev *
244 spdk_bdev_next(struct spdk_bdev *prev)
245 {
246 	struct spdk_bdev *bdev;
247 
248 	bdev = TAILQ_NEXT(prev, link);
249 	if (bdev) {
250 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
251 	}
252 
253 	return bdev;
254 }
255 
256 static struct spdk_bdev *
257 _bdev_next_leaf(struct spdk_bdev *bdev)
258 {
259 	while (bdev != NULL) {
260 		if (bdev->claim_module == NULL) {
261 			return bdev;
262 		} else {
263 			bdev = TAILQ_NEXT(bdev, link);
264 		}
265 	}
266 
267 	return bdev;
268 }
269 
270 struct spdk_bdev *
271 spdk_bdev_first_leaf(void)
272 {
273 	struct spdk_bdev *bdev;
274 
275 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
276 
277 	if (bdev) {
278 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
279 	}
280 
281 	return bdev;
282 }
283 
284 struct spdk_bdev *
285 spdk_bdev_next_leaf(struct spdk_bdev *prev)
286 {
287 	struct spdk_bdev *bdev;
288 
289 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
290 
291 	if (bdev) {
292 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
293 	}
294 
295 	return bdev;
296 }
297 
298 struct spdk_bdev *
299 spdk_bdev_get_by_name(const char *bdev_name)
300 {
301 	struct spdk_bdev_alias *tmp;
302 	struct spdk_bdev *bdev = spdk_bdev_first();
303 
304 	while (bdev != NULL) {
305 		if (strcmp(bdev_name, bdev->name) == 0) {
306 			return bdev;
307 		}
308 
309 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
310 			if (strcmp(bdev_name, tmp->alias) == 0) {
311 				return bdev;
312 			}
313 		}
314 
315 		bdev = spdk_bdev_next(bdev);
316 	}
317 
318 	return NULL;
319 }
320 
321 static void
322 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
323 {
324 	assert(bdev_io->get_buf_cb != NULL);
325 	assert(buf != NULL);
326 	assert(bdev_io->u.bdev.iovs != NULL);
327 
328 	bdev_io->buf = buf;
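	/* Round the data pointer up to the next 512-byte boundary.  The buffer pools
	 *  allocate SPDK_BDEV_SMALL_BUF_MAX_SIZE/LARGE_BUF_MAX_SIZE + 512 bytes per
	 *  element, so the aligned pointer still leaves buf_len usable bytes. */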
329 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
330 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
331 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
332 }
333 
334 static void
335 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
336 {
337 	struct spdk_mempool *pool;
338 	struct spdk_bdev_io *tmp;
339 	void *buf;
340 	bdev_io_stailq_t *stailq;
341 	struct spdk_bdev_mgmt_channel *ch;
342 
343 	assert(bdev_io->u.bdev.iovcnt == 1);
344 
345 	buf = bdev_io->buf;
346 	ch = bdev_io->mgmt_ch;
347 
348 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
349 		pool = g_bdev_mgr.buf_small_pool;
350 		stailq = &ch->need_buf_small;
351 	} else {
352 		pool = g_bdev_mgr.buf_large_pool;
353 		stailq = &ch->need_buf_large;
354 	}
355 
356 	if (STAILQ_EMPTY(stailq)) {
357 		spdk_mempool_put(pool, buf);
358 	} else {
359 		tmp = STAILQ_FIRST(stailq);
360 		STAILQ_REMOVE_HEAD(stailq, buf_link);
361 		spdk_bdev_io_set_buf(tmp, buf);
362 	}
363 }
364 
365 void
366 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
367 {
368 	struct spdk_mempool *pool;
369 	bdev_io_stailq_t *stailq;
370 	void *buf = NULL;
371 	struct spdk_bdev_mgmt_channel *ch;
372 
373 	assert(cb != NULL);
374 	assert(bdev_io->u.bdev.iovs != NULL);
375 
376 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
377 		/* Buffer already present */
378 		cb(bdev_io->ch->channel, bdev_io);
379 		return;
380 	}
381 
382 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
383 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
384 
385 	bdev_io->buf_len = len;
386 	bdev_io->get_buf_cb = cb;
387 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
388 		pool = g_bdev_mgr.buf_small_pool;
389 		stailq = &ch->need_buf_small;
390 	} else {
391 		pool = g_bdev_mgr.buf_large_pool;
392 		stailq = &ch->need_buf_large;
393 	}
394 
395 	buf = spdk_mempool_get(pool);
396 
397 	if (!buf) {
398 		STAILQ_INSERT_TAIL(stailq, bdev_io, buf_link);
399 	} else {
400 		spdk_bdev_io_set_buf(bdev_io, buf);
401 	}
402 }
403 
404 static int
405 spdk_bdev_module_get_max_ctx_size(void)
406 {
407 	struct spdk_bdev_module *bdev_module;
408 	int max_bdev_module_size = 0;
409 
410 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
411 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
412 			max_bdev_module_size = bdev_module->get_ctx_size();
413 		}
414 	}
415 
416 	return max_bdev_module_size;
417 }
418 
419 void
420 spdk_bdev_config_text(FILE *fp)
421 {
422 	struct spdk_bdev_module *bdev_module;
423 
424 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
425 		if (bdev_module->config_text) {
426 			bdev_module->config_text(fp);
427 		}
428 	}
429 }
430 
431 void
432 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
433 {
434 	struct spdk_bdev_module *bdev_module;
435 	struct spdk_bdev *bdev;
436 
437 	assert(w != NULL);
438 
439 	spdk_json_write_array_begin(w);
440 
441 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
442 		if (bdev_module->config_json) {
443 			bdev_module->config_json(w);
444 		}
445 	}
446 
447 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, link) {
448 		spdk_bdev_config_json(bdev, w);
449 	}
450 
451 	spdk_json_write_array_end(w);
452 }
453 
454 static int
455 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
456 {
457 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
458 
459 	STAILQ_INIT(&ch->need_buf_small);
460 	STAILQ_INIT(&ch->need_buf_large);
461 
462 	STAILQ_INIT(&ch->per_thread_cache);
463 	ch->per_thread_cache_count = 0;
464 
465 	TAILQ_INIT(&ch->module_channels);
466 
467 	return 0;
468 }
469 
470 static void
471 spdk_bdev_mgmt_channel_free_resources(struct spdk_bdev_mgmt_channel *ch)
472 {
473 	struct spdk_bdev_io *bdev_io;
474 
475 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
476 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n");
477 	}
478 
479 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
480 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
481 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
482 		ch->per_thread_cache_count--;
483 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
484 	}
485 
486 	assert(ch->per_thread_cache_count == 0);
487 }
488 
489 static void
490 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
491 {
492 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
493 
494 	spdk_bdev_mgmt_channel_free_resources(ch);
495 }
496 
497 static void
498 spdk_bdev_init_complete(int rc)
499 {
500 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
501 	void *cb_arg = g_init_cb_arg;
502 
503 	g_bdev_mgr.init_complete = true;
504 	g_init_cb_fn = NULL;
505 	g_init_cb_arg = NULL;
506 
507 	cb_fn(cb_arg, rc);
508 }
509 
510 static void
511 spdk_bdev_module_action_complete(void)
512 {
513 	struct spdk_bdev_module *m;
514 
515 	/*
516 	 * Don't finish bdev subsystem initialization if
517 	 * module pre-initialization is still in progress, or
518 	 * the subsystem has already been initialized.
519 	 */
520 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
521 		return;
522 	}
523 
524 	/*
525 	 * Check all bdev modules for inits/examinations in progress. If any
526 	 * exist, return immediately since we cannot finish bdev subsystem
527 	 * initialization until all are completed.
528 	 */
529 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
530 		if (m->action_in_progress > 0) {
531 			return;
532 		}
533 	}
534 
535 	/*
536 	 * Modules already finished initialization - now that all
537 	 * the bdev modules have finished their asynchronous I/O
538 	 * processing, the entire bdev layer can be marked as complete.
539 	 */
540 	spdk_bdev_init_complete(0);
541 }
542 
543 static void
544 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
545 {
546 	assert(module->action_in_progress > 0);
547 	module->action_in_progress--;
548 	spdk_bdev_module_action_complete();
549 }
550 
551 void
552 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
553 {
554 	spdk_bdev_module_action_done(module);
555 }
556 
557 void
558 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
559 {
560 	spdk_bdev_module_action_done(module);
561 }
562 
563 static int
564 spdk_bdev_modules_init(void)
565 {
566 	struct spdk_bdev_module *module;
567 	int rc = 0;
568 
569 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
570 		rc = module->module_init();
571 		if (rc != 0) {
572 			break;
573 		}
574 	}
575 
576 	g_bdev_mgr.module_init_complete = true;
577 	return rc;
578 }
579 void
580 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
581 {
582 	int cache_size;
583 	int rc = 0;
584 	char mempool_name[32];
585 
586 	assert(cb_fn != NULL);
587 
588 	g_init_cb_fn = cb_fn;
589 	g_init_cb_arg = cb_arg;
590 
591 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
592 
593 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
594 				  SPDK_BDEV_IO_POOL_SIZE,
595 				  sizeof(struct spdk_bdev_io) +
596 				  spdk_bdev_module_get_max_ctx_size(),
597 				  0,
598 				  SPDK_ENV_SOCKET_ID_ANY);
599 
600 	if (g_bdev_mgr.bdev_io_pool == NULL) {
601 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
602 		spdk_bdev_init_complete(-1);
603 		return;
604 	}
605 
606 	/*
607 	 * Ensure no more than half of the total buffers end up in local caches, by
608 	 *   using spdk_env_get_core_count() to determine how many local caches we need
609 	 *   to account for.
610 	 */
611 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
612 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
613 
614 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
615 				    BUF_SMALL_POOL_SIZE,
616 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
617 				    cache_size,
618 				    SPDK_ENV_SOCKET_ID_ANY);
619 	if (!g_bdev_mgr.buf_small_pool) {
620 		SPDK_ERRLOG("create small buffer pool failed\n");
621 		spdk_bdev_init_complete(-1);
622 		return;
623 	}
624 
625 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
626 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
627 
628 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
629 				    BUF_LARGE_POOL_SIZE,
630 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
631 				    cache_size,
632 				    SPDK_ENV_SOCKET_ID_ANY);
633 	if (!g_bdev_mgr.buf_large_pool) {
634 		SPDK_ERRLOG("create large buffer pool failed\n");
635 		spdk_bdev_init_complete(-1);
636 		return;
637 	}
638 
639 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
640 				 NULL);
641 	if (!g_bdev_mgr.zero_buffer) {
642 		SPDK_ERRLOG("create bdev zero buffer failed\n");
643 		spdk_bdev_init_complete(-1);
644 		return;
645 	}
646 
647 #ifdef SPDK_CONFIG_VTUNE
648 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
649 #endif
650 
651 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
652 				spdk_bdev_mgmt_channel_destroy,
653 				sizeof(struct spdk_bdev_mgmt_channel));
654 
655 	rc = spdk_bdev_modules_init();
656 	if (rc != 0) {
657 		SPDK_ERRLOG("bdev modules init failed\n");
658 		spdk_bdev_init_complete(-1);
659 		return;
660 	}
661 
662 	spdk_bdev_module_action_complete();
663 }
664 
665 static void
666 spdk_bdev_module_finish_cb(void *io_device)
667 {
668 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
669 
670 	cb_fn(g_fini_cb_arg);
671 	g_fini_cb_fn = NULL;
672 	g_fini_cb_arg = NULL;
673 }
674 
675 static void
676 spdk_bdev_module_finish_complete(struct spdk_io_channel_iter *i, int status)
677 {
678 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
679 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
680 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
681 			    SPDK_BDEV_IO_POOL_SIZE);
682 	}
683 
684 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
685 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
686 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
687 			    BUF_SMALL_POOL_SIZE);
688 		assert(false);
689 	}
690 
691 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
692 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
693 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
694 			    BUF_LARGE_POOL_SIZE);
695 		assert(false);
696 	}
697 
698 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
699 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
700 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
701 	spdk_dma_free(g_bdev_mgr.zero_buffer);
702 
703 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb);
704 }
705 
706 static void
707 mgmt_channel_free_resources(struct spdk_io_channel_iter *i)
708 {
709 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
710 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
711 
712 	spdk_bdev_mgmt_channel_free_resources(ch);
713 	spdk_for_each_channel_continue(i, 0);
714 }
715 
716 static void
717 spdk_bdev_module_finish_iter(void *arg)
718 {
719 	/* Notice that this variable is static. It is saved between calls to
720 	 * this function. */
721 	static struct spdk_bdev_module *resume_bdev_module = NULL;
722 	struct spdk_bdev_module *bdev_module;
723 
724 	/* Start iterating from the last touched module */
725 	if (!resume_bdev_module) {
726 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
727 	} else {
728 		bdev_module = TAILQ_NEXT(resume_bdev_module, tailq);
729 	}
730 
731 	while (bdev_module) {
732 		if (bdev_module->async_fini) {
733 			/* Save our place so we can resume later. We must
734 			 * save the variable here, before calling module_fini()
735 			 * below, because in some cases the module may immediately
736 			 * call spdk_bdev_module_finish_done() and re-enter
737 			 * this function to continue iterating. */
738 			resume_bdev_module = bdev_module;
739 		}
740 
741 		if (bdev_module->module_fini) {
742 			bdev_module->module_fini();
743 		}
744 
745 		if (bdev_module->async_fini) {
746 			return;
747 		}
748 
749 		bdev_module = TAILQ_NEXT(bdev_module, tailq);
750 	}
751 
752 	resume_bdev_module = NULL;
753 	spdk_for_each_channel(&g_bdev_mgr, mgmt_channel_free_resources, NULL,
754 			      spdk_bdev_module_finish_complete);
755 }
756 
757 void
758 spdk_bdev_module_finish_done(void)
759 {
760 	if (spdk_get_thread() != g_fini_thread) {
761 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
762 	} else {
763 		spdk_bdev_module_finish_iter(NULL);
764 	}
765 }
766 
767 static void
768 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
769 {
770 	struct spdk_bdev *bdev = cb_arg;
771 
772 	if (bdeverrno && bdev) {
773 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
774 			     bdev->name);
775 
776 		/*
777 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
778 		 *  bdev; try to continue by manually removing this bdev from the list and continue
779 		 *  with the next bdev in the list.
780 		 */
781 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
782 	}
783 
784 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
785 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
786 		/*
787 		 * Bdev module finish needs to be deferred, as we might be in the middle of some context
788 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
789 		 * after returning.
790 		 */
791 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
792 		return;
793 	}
794 
795 	/*
796 	 * Unregister the first bdev in the list.
797 	 *
798 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
799 	 *  calling the remove_cb of the descriptors first.
800 	 *
801 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
802 	 *  will be called again via the unregister completion callback to continue the cleanup
803 	 *  process with the next bdev.
804 	 */
805 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
806 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
807 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
808 }
809 
810 static void
811 _spdk_bdev_finish_unregister_bdevs(void)
812 {
813 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
814 }
815 
816 void
817 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
818 {
819 	assert(cb_fn != NULL);
820 
821 	g_fini_thread = spdk_get_thread();
822 
823 	g_fini_cb_fn = cb_fn;
824 	g_fini_cb_arg = cb_arg;
825 
826 	_spdk_bdev_finish_unregister_bdevs();
827 }
828 
829 static struct spdk_bdev_io *
830 spdk_bdev_get_io(struct spdk_io_channel *_ch)
831 {
832 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
833 	struct spdk_bdev_io *bdev_io;
834 
835 	if (ch->per_thread_cache_count > 0) {
836 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
837 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, buf_link);
838 		ch->per_thread_cache_count--;
839 	} else {
840 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
841 		if (!bdev_io) {
842 			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
843 			return NULL;
844 		}
845 	}
846 
847 	bdev_io->mgmt_ch = ch;
848 
849 	return bdev_io;
850 }
851 
852 static void
853 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
854 {
855 	struct spdk_bdev_mgmt_channel *ch = bdev_io->mgmt_ch;
856 
857 	if (bdev_io->buf != NULL) {
858 		spdk_bdev_io_put_buf(bdev_io);
859 	}
860 
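	/* Return the bdev_io to this thread's cache if there is room; otherwise give it
	 *  back to the global pool. */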
861 	if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) {
862 		ch->per_thread_cache_count++;
863 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link);
864 	} else {
865 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
866 	}
867 }
868 
869 static void
870 _spdk_bdev_qos_io_submit(void *ctx)
871 {
872 	struct spdk_bdev_channel	*ch = ctx;
873 	struct spdk_bdev_io		*bdev_io = NULL;
874 	struct spdk_bdev		*bdev = ch->bdev;
875 	struct spdk_bdev_module_channel *shared_ch = ch->module_ch;
876 
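	/* Drain queued I/O until this timeslice's quota (qos_max_ios_per_timeslice) is used up. */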
877 	while (!TAILQ_EMPTY(&ch->qos_io)) {
878 		if (ch->io_submitted_this_timeslice < ch->qos_max_ios_per_timeslice) {
879 			bdev_io = TAILQ_FIRST(&ch->qos_io);
880 			TAILQ_REMOVE(&ch->qos_io, bdev_io, link);
881 			ch->io_submitted_this_timeslice++;
882 			ch->io_outstanding++;
883 			shared_ch->io_outstanding++;
884 			bdev->fn_table->submit_request(ch->channel, bdev_io);
885 		} else {
886 			break;
887 		}
888 	}
889 }
890 
891 static void
892 _spdk_bdev_io_submit(void *ctx)
893 {
894 	struct spdk_bdev_io *bdev_io = ctx;
895 	struct spdk_bdev *bdev = bdev_io->bdev;
896 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
897 	struct spdk_io_channel *ch = bdev_ch->channel;
898 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
899 
900 	bdev_io->submit_tsc = spdk_get_ticks();
901 	bdev_ch->io_outstanding++;
902 	shared_ch->io_outstanding++;
903 	bdev_io->in_submit_request = true;
904 	if (spdk_likely(bdev_ch->flags == 0)) {
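		/* If earlier I/O on this module channel hit NOMEM and are queued for retry,
		 *  queue this I/O behind them rather than submitting it immediately. */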
905 		if (spdk_likely(TAILQ_EMPTY(&shared_ch->nomem_io))) {
906 			bdev->fn_table->submit_request(ch, bdev_io);
907 		} else {
908 			bdev_ch->io_outstanding--;
909 			shared_ch->io_outstanding--;
910 			TAILQ_INSERT_TAIL(&shared_ch->nomem_io, bdev_io, link);
911 		}
912 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
913 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
914 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
915 		bdev_ch->io_outstanding--;
916 		shared_ch->io_outstanding--;
917 		TAILQ_INSERT_TAIL(&bdev_ch->qos_io, bdev_io, link);
918 		_spdk_bdev_qos_io_submit(bdev_ch);
919 	} else {
920 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
921 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
922 	}
923 	bdev_io->in_submit_request = false;
924 }
925 
926 static void
927 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
928 {
929 	struct spdk_bdev *bdev = bdev_io->bdev;
930 
931 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
932 
933 	/* QoS channel and thread have been properly configured */
934 	if (bdev->ios_per_sec > 0 && bdev->qos_channel && bdev->qos_thread) {
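		/* Route the I/O to the QoS channel on the QoS thread.  The original submitting
		 *  channel is remembered in io_submit_ch, presumably so the completion can be
		 *  routed back to it. */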
935 		bdev_io->io_submit_ch = bdev_io->ch;
936 		bdev_io->ch = bdev->qos_channel;
937 		spdk_thread_send_msg(bdev->qos_thread, _spdk_bdev_io_submit, bdev_io);
938 	} else {
939 		_spdk_bdev_io_submit(bdev_io);
940 	}
941 }
942 
943 static void
944 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
945 {
946 	struct spdk_bdev *bdev = bdev_io->bdev;
947 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
948 	struct spdk_io_channel *ch = bdev_ch->channel;
949 
950 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
951 
952 	bdev_io->in_submit_request = true;
953 	bdev->fn_table->submit_request(ch, bdev_io);
954 	bdev_io->in_submit_request = false;
955 }
956 
957 static void
958 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
959 		  struct spdk_bdev *bdev, void *cb_arg,
960 		  spdk_bdev_io_completion_cb cb)
961 {
962 	bdev_io->bdev = bdev;
963 	bdev_io->caller_ctx = cb_arg;
964 	bdev_io->cb = cb;
965 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
966 	bdev_io->in_submit_request = false;
967 	bdev_io->buf = NULL;
968 	bdev_io->io_submit_ch = NULL;
969 }
970 
971 bool
972 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
973 {
974 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
975 }
976 
977 int
978 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
979 {
980 	if (bdev->fn_table->dump_info_json) {
981 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
982 	}
983 
984 	return 0;
985 }
986 
987 void
988 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
989 {
990 	assert(bdev != NULL);
991 	assert(w != NULL);
992 
993 	if (bdev->fn_table->write_config_json) {
994 		bdev->fn_table->write_config_json(bdev, w);
995 	} else {
996 		spdk_json_write_object_begin(w);
997 		spdk_json_write_named_string(w, "name", bdev->name);
998 		spdk_json_write_object_end(w);
999 	}
1000 }
1001 
1002 static void
1003 spdk_bdev_qos_get_max_ios_per_timeslice(struct spdk_bdev_channel *qos_ch)
1004 {
1005 	uint64_t		qos_max_ios_per_timeslice = 0;
1006 	struct spdk_bdev	*bdev = qos_ch->bdev;
1007 
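	/* With the default 1000 us timeslice this works out to ios_per_sec / 1000, clamped
	 *  to at least SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE. */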
1008 	qos_max_ios_per_timeslice = bdev->ios_per_sec * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1009 				    SPDK_BDEV_SEC_TO_USEC;
1010 	qos_ch->qos_max_ios_per_timeslice = spdk_max(qos_max_ios_per_timeslice,
1011 					    SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
1012 }
1013 
1014 static int
1015 spdk_bdev_channel_poll_qos(void *arg)
1016 {
1017 	struct spdk_bdev_channel	*ch = arg;
1018 
1019 	/* Reset for next round of rate limiting */
1020 	ch->io_submitted_this_timeslice = 0;
1021 	spdk_bdev_qos_get_max_ios_per_timeslice(ch);
1022 
1023 	_spdk_bdev_qos_io_submit(ch);
1024 
1025 	return -1;
1026 }
1027 
1028 static int
1029 _spdk_bdev_channel_create(struct spdk_bdev_channel *ch, void *io_device)
1030 {
1031 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1032 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1033 	struct spdk_bdev_module_channel	*shared_ch;
1034 
1035 	ch->bdev = bdev;
1036 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1037 	if (!ch->channel) {
1038 		return -1;
1039 	}
1040 
1041 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
1042 	if (!ch->mgmt_channel) {
1043 		return -1;
1044 	}
1045 
1046 	mgmt_ch = spdk_io_channel_get_ctx(ch->mgmt_channel);
1047 	TAILQ_FOREACH(shared_ch, &mgmt_ch->module_channels, link) {
1048 		if (shared_ch->module_ch == ch->channel) {
1049 			shared_ch->ref++;
1050 			break;
1051 		}
1052 	}
1053 
1054 	if (shared_ch == NULL) {
1055 		shared_ch = calloc(1, sizeof(*shared_ch));
1056 		if (!shared_ch) {
1057 			return -1;
1058 		}
1059 
1060 		shared_ch->io_outstanding = 0;
1061 		TAILQ_INIT(&shared_ch->nomem_io);
1062 		shared_ch->nomem_threshold = 0;
1063 		shared_ch->module_ch = ch->channel;
1064 		shared_ch->ref = 1;
1065 		TAILQ_INSERT_TAIL(&mgmt_ch->module_channels, shared_ch, link);
1066 	}
1067 
1068 	memset(&ch->stat, 0, sizeof(ch->stat));
1069 	ch->io_outstanding = 0;
1070 	TAILQ_INIT(&ch->queued_resets);
1071 	TAILQ_INIT(&ch->qos_io);
1072 	ch->qos_max_ios_per_timeslice = 0;
1073 	ch->io_submitted_this_timeslice = 0;
1074 	ch->qos_poller = NULL;
1075 	ch->flags = 0;
1076 	ch->module_ch = shared_ch;
1077 
1078 	return 0;
1079 }
1080 
1081 static void
1082 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1083 {
1084 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1085 	struct spdk_bdev_module_channel	*shared_ch = NULL;
1086 
1087 	if (!ch) {
1088 		return;
1089 	}
1090 
1091 	if (ch->channel) {
1092 		spdk_put_io_channel(ch->channel);
1093 	}
1094 
1095 	if (ch->mgmt_channel) {
1096 		shared_ch = ch->module_ch;
1097 		if (shared_ch) {
1098 			assert(ch->io_outstanding == 0);
1099 			assert(shared_ch->ref > 0);
1100 			shared_ch->ref--;
1101 			if (shared_ch->ref == 0) {
1102 				mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
1103 				assert(shared_ch->io_outstanding == 0);
1104 				TAILQ_REMOVE(&mgmt_channel->module_channels, shared_ch, link);
1105 				free(shared_ch);
1106 			}
1107 		}
1108 		spdk_put_io_channel(ch->mgmt_channel);
1109 	}
1110 }
1111 
1112 /* Caller must hold bdev->mutex. */
1113 static int
1114 spdk_bdev_qos_channel_create(struct spdk_bdev *bdev)
1115 {
1116 	assert(bdev->qos_channel == NULL);
1117 	assert(bdev->qos_thread == NULL);
1118 
1119 	bdev->qos_channel = calloc(1, sizeof(struct spdk_bdev_channel));
1120 	if (!bdev->qos_channel) {
1121 		return -1;
1122 	}
1123 
1124 	bdev->qos_thread = spdk_get_thread();
1125 	if (!bdev->qos_thread) {
1126 		free(bdev->qos_channel);
1127 		bdev->qos_channel = NULL;
1128 		return -1;
1129 	}
1130 
1131 	if (_spdk_bdev_channel_create(bdev->qos_channel, __bdev_to_io_dev(bdev)) != 0) {
1132 		free(bdev->qos_channel);
1133 		bdev->qos_channel = NULL;
1134 		bdev->qos_thread = NULL;
1135 		return -1;
1136 	}
1137 
1138 	bdev->qos_channel->flags |= BDEV_CH_QOS_ENABLED;
1139 	spdk_bdev_qos_get_max_ios_per_timeslice(bdev->qos_channel);
1140 	bdev->qos_channel->qos_poller = spdk_poller_register(
1141 						spdk_bdev_channel_poll_qos,
1142 						bdev->qos_channel,
1143 						SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1144 
1145 	return 0;
1146 }
1147 
1148 static int
1149 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1150 {
1151 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1152 	struct spdk_bdev_channel	*ch = ctx_buf;
1153 
1154 	if (_spdk_bdev_channel_create(ch, io_device) != 0) {
1155 		_spdk_bdev_channel_destroy_resource(ch);
1156 		return -1;
1157 	}
1158 
1159 #ifdef SPDK_CONFIG_VTUNE
1160 	{
1161 		char *name;
1162 		__itt_init_ittlib(NULL, 0);
1163 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1164 		if (!name) {
1165 			_spdk_bdev_channel_destroy_resource(ch);
1166 			return -1;
1167 		}
1168 		ch->handle = __itt_string_handle_create(name);
1169 		free(name);
1170 		ch->start_tsc = spdk_get_ticks();
1171 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1172 	}
1173 #endif
1174 
1175 	pthread_mutex_lock(&bdev->mutex);
1176 
1177 	/* Rate limiting is enabled on this bdev but its QoS channel has not been created yet */
1178 	if (bdev->ios_per_sec > 0 && bdev->qos_channel == NULL) {
1179 		if (spdk_bdev_qos_channel_create(bdev) != 0) {
1180 			_spdk_bdev_channel_destroy_resource(ch);
1181 			pthread_mutex_unlock(&bdev->mutex);
1182 			return -1;
1183 		}
1184 	}
1185 
1186 	bdev->channel_count++;
1187 
1188 	pthread_mutex_unlock(&bdev->mutex);
1189 
1190 	return 0;
1191 }
1192 
1193 /*
1194  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1195  *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
1196  */
1197 static void
1198 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1199 {
1200 	bdev_io_stailq_t tmp;
1201 	struct spdk_bdev_io *bdev_io;
1202 
1203 	STAILQ_INIT(&tmp);
1204 
1205 	while (!STAILQ_EMPTY(queue)) {
1206 		bdev_io = STAILQ_FIRST(queue);
1207 		STAILQ_REMOVE_HEAD(queue, buf_link);
1208 		if (bdev_io->ch == ch) {
1209 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1210 		} else {
1211 			STAILQ_INSERT_TAIL(&tmp, bdev_io, buf_link);
1212 		}
1213 	}
1214 
1215 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1216 }
1217 
1218 /*
1219  * Abort I/O that are queued waiting for submission.  These types of I/O are
1220  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1221  */
1222 static void
1223 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1224 {
1225 	struct spdk_bdev_io *bdev_io, *tmp;
1226 
1227 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
1228 		if (bdev_io->ch == ch) {
1229 			TAILQ_REMOVE(queue, bdev_io, link);
1230 			/*
1231 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1232 			 *  been submitted to the bdev module.  Since in this case it
1233 			 *  hadn't, bump io_outstanding to account for the decrement
1234 			 *  that spdk_bdev_io_complete() will do.
1235 			 */
1236 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1237 				ch->io_outstanding++;
1238 				ch->module_ch->io_outstanding++;
1239 			}
1240 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1241 		}
1242 	}
1243 }
1244 
1245 static void
1246 _spdk_bdev_channel_destroy(struct spdk_bdev_channel *ch)
1247 {
1248 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1249 	struct spdk_bdev_module_channel	*shared_ch = ch->module_ch;
1250 
1251 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
1252 
1253 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1254 	_spdk_bdev_abort_queued_io(&ch->qos_io, ch);
1255 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, ch);
1256 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
1257 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
1258 
1259 	_spdk_bdev_channel_destroy_resource(ch);
1260 }
1261 
1262 static void
1263 spdk_bdev_qos_channel_destroy(void *ctx)
1264 {
1265 	struct spdk_bdev_channel *qos_channel = ctx;
1266 
1267 	_spdk_bdev_channel_destroy(qos_channel);
1268 
1269 	spdk_poller_unregister(&qos_channel->qos_poller);
1270 	free(qos_channel);
1271 }
1272 
1273 static void
1274 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1275 {
1276 	struct spdk_bdev_channel	*ch = ctx_buf;
1277 	struct spdk_bdev		*bdev = ch->bdev;
1278 
1279 	_spdk_bdev_channel_destroy(ch);
1280 
1281 	pthread_mutex_lock(&bdev->mutex);
1282 	bdev->channel_count--;
1283 	if (bdev->channel_count == 0 && bdev->qos_channel != NULL) {
1284 		/* All I/O channels for this bdev have been destroyed - destroy the QoS channel. */
1285 		spdk_thread_send_msg(bdev->qos_thread, spdk_bdev_qos_channel_destroy,
1286 				     bdev->qos_channel);
1287 
1288 		/*
1289 		 * Set qos_channel to NULL within the critical section so that
1290 		 * if another channel is created, it will see qos_channel == NULL and
1291 		 * re-create the QoS channel even if the asynchronous qos_channel_destroy
1292 		 * isn't finished yet.
1293 		 */
1294 		bdev->qos_channel = NULL;
1295 		bdev->qos_thread = NULL;
1296 	}
1297 	pthread_mutex_unlock(&bdev->mutex);
1298 }
1299 
1300 int
1301 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1302 {
1303 	struct spdk_bdev_alias *tmp;
1304 
1305 	if (alias == NULL) {
1306 		SPDK_ERRLOG("Empty alias passed\n");
1307 		return -EINVAL;
1308 	}
1309 
1310 	if (spdk_bdev_get_by_name(alias)) {
1311 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1312 		return -EEXIST;
1313 	}
1314 
1315 	tmp = calloc(1, sizeof(*tmp));
1316 	if (tmp == NULL) {
1317 		SPDK_ERRLOG("Unable to allocate alias\n");
1318 		return -ENOMEM;
1319 	}
1320 
1321 	tmp->alias = strdup(alias);
1322 	if (tmp->alias == NULL) {
1323 		free(tmp);
1324 		SPDK_ERRLOG("Unable to allocate alias\n");
1325 		return -ENOMEM;
1326 	}
1327 
1328 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1329 
1330 	return 0;
1331 }
1332 
1333 int
1334 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1335 {
1336 	struct spdk_bdev_alias *tmp;
1337 
1338 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1339 		if (strcmp(alias, tmp->alias) == 0) {
1340 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1341 			free(tmp->alias);
1342 			free(tmp);
1343 			return 0;
1344 		}
1345 	}
1346 
1347 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1348 
1349 	return -ENOENT;
1350 }
1351 
1352 struct spdk_io_channel *
1353 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1354 {
1355 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1356 }
1357 
1358 const char *
1359 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1360 {
1361 	return bdev->name;
1362 }
1363 
1364 const char *
1365 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1366 {
1367 	return bdev->product_name;
1368 }
1369 
1370 const struct spdk_bdev_aliases_list *
1371 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1372 {
1373 	return &bdev->aliases;
1374 }
1375 
1376 uint32_t
1377 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1378 {
1379 	return bdev->blocklen;
1380 }
1381 
1382 uint64_t
1383 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1384 {
1385 	return bdev->blockcnt;
1386 }
1387 
1388 size_t
1389 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1390 {
1391 	/* TODO: push this logic down to the bdev modules */
1392 	if (bdev->need_aligned_buffer) {
1393 		return bdev->blocklen;
1394 	}
1395 
1396 	return 1;
1397 }
1398 
1399 uint32_t
1400 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1401 {
1402 	return bdev->optimal_io_boundary;
1403 }
1404 
1405 bool
1406 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1407 {
1408 	return bdev->write_cache;
1409 }
1410 
1411 const struct spdk_uuid *
1412 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1413 {
1414 	return &bdev->uuid;
1415 }
1416 
1417 int
1418 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1419 {
1420 	int ret;
1421 
1422 	pthread_mutex_lock(&bdev->mutex);
1423 
1424 	/* bdev has open descriptors */
1425 	if (!TAILQ_EMPTY(&bdev->open_descs) &&
1426 	    bdev->blockcnt > size) {
1427 		ret = -EBUSY;
1428 	} else {
1429 		bdev->blockcnt = size;
1430 		ret = 0;
1431 	}
1432 
1433 	pthread_mutex_unlock(&bdev->mutex);
1434 
1435 	return ret;
1436 }
1437 
1438 /*
1439  * Convert I/O offset and length from bytes to blocks.
1440  *
1441  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1442  */
1443 static uint64_t
1444 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1445 			  uint64_t num_bytes, uint64_t *num_blocks)
1446 {
1447 	uint32_t block_size = bdev->blocklen;
1448 
1449 	*offset_blocks = offset_bytes / block_size;
1450 	*num_blocks = num_bytes / block_size;
1451 
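	/* Non-zero if either the offset or the length is not a multiple of the block size. */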
1452 	return (offset_bytes % block_size) | (num_bytes % block_size);
1453 }
1454 
1455 static bool
1456 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1457 {
1458 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; indicates there
1459 	 * has been an overflow and hence the offset has been wrapped around */
1460 	if (offset_blocks + num_blocks < offset_blocks) {
1461 		return false;
1462 	}
1463 
1464 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1465 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1466 		return false;
1467 	}
1468 
1469 	return true;
1470 }
1471 
1472 int
1473 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1474 	       void *buf, uint64_t offset, uint64_t nbytes,
1475 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1476 {
1477 	uint64_t offset_blocks, num_blocks;
1478 
1479 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1480 		return -EINVAL;
1481 	}
1482 
1483 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1484 }
1485 
1486 int
1487 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1488 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1489 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1490 {
1491 	struct spdk_bdev *bdev = desc->bdev;
1492 	struct spdk_bdev_io *bdev_io;
1493 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1494 
1495 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1496 		return -EINVAL;
1497 	}
1498 
1499 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1500 	if (!bdev_io) {
1501 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1502 		return -ENOMEM;
1503 	}
1504 
1505 	bdev_io->ch = channel;
1506 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1507 	bdev_io->u.bdev.iov.iov_base = buf;
1508 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1509 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1510 	bdev_io->u.bdev.iovcnt = 1;
1511 	bdev_io->u.bdev.num_blocks = num_blocks;
1512 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1513 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1514 
1515 	spdk_bdev_io_submit(bdev_io);
1516 	return 0;
1517 }
1518 
1519 int
1520 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1521 		struct iovec *iov, int iovcnt,
1522 		uint64_t offset, uint64_t nbytes,
1523 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1524 {
1525 	uint64_t offset_blocks, num_blocks;
1526 
1527 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1528 		return -EINVAL;
1529 	}
1530 
1531 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1532 }
1533 
1534 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1535 			   struct iovec *iov, int iovcnt,
1536 			   uint64_t offset_blocks, uint64_t num_blocks,
1537 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1538 {
1539 	struct spdk_bdev *bdev = desc->bdev;
1540 	struct spdk_bdev_io *bdev_io;
1541 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1542 
1543 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1544 		return -EINVAL;
1545 	}
1546 
1547 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1548 	if (!bdev_io) {
1549 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
1550 		return -ENOMEM;
1551 	}
1552 
1553 	bdev_io->ch = channel;
1554 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1555 	bdev_io->u.bdev.iovs = iov;
1556 	bdev_io->u.bdev.iovcnt = iovcnt;
1557 	bdev_io->u.bdev.num_blocks = num_blocks;
1558 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1559 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1560 
1561 	spdk_bdev_io_submit(bdev_io);
1562 	return 0;
1563 }
1564 
1565 int
1566 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1567 		void *buf, uint64_t offset, uint64_t nbytes,
1568 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1569 {
1570 	uint64_t offset_blocks, num_blocks;
1571 
1572 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1573 		return -EINVAL;
1574 	}
1575 
1576 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1577 }
1578 
1579 int
1580 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1581 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1582 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1583 {
1584 	struct spdk_bdev *bdev = desc->bdev;
1585 	struct spdk_bdev_io *bdev_io;
1586 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1587 
1588 	if (!desc->write) {
1589 		return -EBADF;
1590 	}
1591 
1592 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1593 		return -EINVAL;
1594 	}
1595 
1596 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1597 	if (!bdev_io) {
1598 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1599 		return -ENOMEM;
1600 	}
1601 
1602 	bdev_io->ch = channel;
1603 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1604 	bdev_io->u.bdev.iov.iov_base = buf;
1605 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1606 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1607 	bdev_io->u.bdev.iovcnt = 1;
1608 	bdev_io->u.bdev.num_blocks = num_blocks;
1609 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1610 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1611 
1612 	spdk_bdev_io_submit(bdev_io);
1613 	return 0;
1614 }
1615 
1616 int
1617 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1618 		 struct iovec *iov, int iovcnt,
1619 		 uint64_t offset, uint64_t len,
1620 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1621 {
1622 	uint64_t offset_blocks, num_blocks;
1623 
1624 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1625 		return -EINVAL;
1626 	}
1627 
1628 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1629 }
1630 
1631 int
1632 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1633 			struct iovec *iov, int iovcnt,
1634 			uint64_t offset_blocks, uint64_t num_blocks,
1635 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1636 {
1637 	struct spdk_bdev *bdev = desc->bdev;
1638 	struct spdk_bdev_io *bdev_io;
1639 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1640 
1641 	if (!desc->write) {
1642 		return -EBADF;
1643 	}
1644 
1645 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1646 		return -EINVAL;
1647 	}
1648 
1649 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1650 	if (!bdev_io) {
1651 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1652 		return -ENOMEM;
1653 	}
1654 
1655 	bdev_io->ch = channel;
1656 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1657 	bdev_io->u.bdev.iovs = iov;
1658 	bdev_io->u.bdev.iovcnt = iovcnt;
1659 	bdev_io->u.bdev.num_blocks = num_blocks;
1660 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1661 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1662 
1663 	spdk_bdev_io_submit(bdev_io);
1664 	return 0;
1665 }
1666 
1667 int
1668 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1669 		       uint64_t offset, uint64_t len,
1670 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1671 {
1672 	uint64_t offset_blocks, num_blocks;
1673 
1674 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1675 		return -EINVAL;
1676 	}
1677 
1678 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1679 }
1680 
1681 int
1682 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1683 			      uint64_t offset_blocks, uint64_t num_blocks,
1684 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1685 {
1686 	struct spdk_bdev *bdev = desc->bdev;
1687 	struct spdk_bdev_io *bdev_io;
1688 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1689 	uint64_t len;
1690 	bool split_request = false;
1691 
1692 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1693 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1694 		return -ERANGE;
1695 	}
1696 
1697 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1698 		return -EINVAL;
1699 	}
1700 
1701 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1702 
1703 	if (!bdev_io) {
1704 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1705 		return -ENOMEM;
1706 	}
1707 
1708 	bdev_io->ch = channel;
1709 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1710 
1711 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1712 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1713 		bdev_io->u.bdev.num_blocks = num_blocks;
1714 		bdev_io->u.bdev.iovs = NULL;
1715 		bdev_io->u.bdev.iovcnt = 0;
1716 
1717 	} else {
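		/* The bdev module does not support WRITE_ZEROES, so emulate it with plain writes
		 *  from the shared zero buffer.  Requests larger than ZERO_BUFFER_SIZE are split
		 *  and continued from the spdk_bdev_write_zeroes_split() completion callback. */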
1718 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1719 
1720 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1721 
1722 		if (len > ZERO_BUFFER_SIZE) {
1723 			split_request = true;
1724 			len = ZERO_BUFFER_SIZE;
1725 		}
1726 
1727 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1728 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1729 		bdev_io->u.bdev.iov.iov_len = len;
1730 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1731 		bdev_io->u.bdev.iovcnt = 1;
1732 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1733 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1734 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1735 	}
1736 
1737 	if (split_request) {
1738 		bdev_io->u.bdev.stored_user_cb = cb;
1739 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1740 	} else {
1741 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1742 	}
1743 	spdk_bdev_io_submit(bdev_io);
1744 	return 0;
1745 }
1746 
1747 int
1748 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1749 		uint64_t offset, uint64_t nbytes,
1750 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1751 {
1752 	uint64_t offset_blocks, num_blocks;
1753 
1754 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1755 		return -EINVAL;
1756 	}
1757 
1758 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1759 }
1760 
1761 int
1762 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1763 		       uint64_t offset_blocks, uint64_t num_blocks,
1764 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1765 {
1766 	struct spdk_bdev *bdev = desc->bdev;
1767 	struct spdk_bdev_io *bdev_io;
1768 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1769 
1770 	if (!desc->write) {
1771 		return -EBADF;
1772 	}
1773 
1774 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1775 		return -EINVAL;
1776 	}
1777 
1778 	if (num_blocks == 0) {
1779 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1780 		return -EINVAL;
1781 	}
1782 
1783 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1784 	if (!bdev_io) {
1785 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1786 		return -ENOMEM;
1787 	}
1788 
1789 	bdev_io->ch = channel;
1790 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1791 	bdev_io->u.bdev.iov.iov_base = NULL;
1792 	bdev_io->u.bdev.iov.iov_len = 0;
1793 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1794 	bdev_io->u.bdev.iovcnt = 1;
1795 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1796 	bdev_io->u.bdev.num_blocks = num_blocks;
1797 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1798 
1799 	spdk_bdev_io_submit(bdev_io);
1800 	return 0;
1801 }
1802 
1803 int
1804 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1805 		uint64_t offset, uint64_t length,
1806 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1807 {
1808 	uint64_t offset_blocks, num_blocks;
1809 
1810 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1811 		return -EINVAL;
1812 	}
1813 
1814 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1815 }
1816 
1817 int
1818 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1819 		       uint64_t offset_blocks, uint64_t num_blocks,
1820 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1821 {
1822 	struct spdk_bdev *bdev = desc->bdev;
1823 	struct spdk_bdev_io *bdev_io;
1824 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1825 
1826 	if (!desc->write) {
1827 		return -EBADF;
1828 	}
1829 
1830 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1831 		return -EINVAL;
1832 	}
1833 
1834 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1835 	if (!bdev_io) {
1836 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1837 		return -ENOMEM;
1838 	}
1839 
1840 	bdev_io->ch = channel;
1841 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1842 	bdev_io->u.bdev.iovs = NULL;
1843 	bdev_io->u.bdev.iovcnt = 0;
1844 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1845 	bdev_io->u.bdev.num_blocks = num_blocks;
1846 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1847 
1848 	spdk_bdev_io_submit(bdev_io);
1849 	return 0;
1850 }
1851 
1852 static void
1853 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1854 {
1855 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1856 	struct spdk_bdev_io *bdev_io;
1857 
1858 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1859 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1860 	spdk_bdev_io_submit_reset(bdev_io);
1861 }
1862 
1863 static void
1864 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1865 {
1866 	struct spdk_io_channel		*ch;
1867 	struct spdk_bdev_channel	*channel;
1868 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1869 	struct spdk_bdev_module_channel	*shared_ch;
1870 
1871 	ch = spdk_io_channel_iter_get_channel(i);
1872 	channel = spdk_io_channel_get_ctx(ch);
1873 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1874 	shared_ch = channel->module_ch;
1875 
1876 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1877 
1878 	_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, channel);
1879 	_spdk_bdev_abort_queued_io(&channel->qos_io, channel);
1880 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1881 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1882 
1883 	spdk_for_each_channel_continue(i, 0);
1884 }
1885 
1886 static void
1887 _spdk_bdev_reset_freeze_qos_channel(void *ctx)
1888 {
1889 	struct spdk_bdev		*bdev = ctx;
1890 	struct spdk_bdev_mgmt_channel	*mgmt_channel = NULL;
1891 	struct spdk_bdev_channel	*qos_channel = bdev->qos_channel;
1892 	struct spdk_bdev_module_channel	*shared_ch = NULL;
1893 
1894 	if (qos_channel) {
1895 		shared_ch = qos_channel->module_ch;
1896 		mgmt_channel = spdk_io_channel_get_ctx(qos_channel->mgmt_channel);
1897 
1898 		qos_channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1899 
1900 		_spdk_bdev_abort_queued_io(&shared_ch->nomem_io, qos_channel);
1901 		_spdk_bdev_abort_queued_io(&qos_channel->qos_io, qos_channel);
1902 		_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, qos_channel);
1903 		_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, qos_channel);
1904 	}
1905 }
1906 
1907 static void
1908 _spdk_bdev_start_reset(void *ctx)
1909 {
1910 	struct spdk_bdev_channel *ch = ctx;
1911 
1912 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
1913 			      ch, _spdk_bdev_reset_dev);
1914 }
1915 
1916 static void
1917 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1918 {
1919 	struct spdk_bdev *bdev = ch->bdev;
1920 
1921 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1922 
1923 	pthread_mutex_lock(&bdev->mutex);
1924 	if (bdev->reset_in_progress == NULL) {
1925 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1926 		/*
1927 		 * Take a channel reference for the target bdev for the life of this
1928 		 *  reset.  This guards against the channel getting destroyed while
1929 		 *  spdk_for_each_channel() calls related to this reset IO are in
1930 		 *  progress.  We will release the reference when this reset is
1931 		 *  completed.
1932 		 */
1933 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1934 		_spdk_bdev_start_reset(ch);
1935 	}
1936 	pthread_mutex_unlock(&bdev->mutex);
1937 }
1938 
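/*
 * Submit a reset to the bdev.  The reset is queued on the calling channel and
 * only started if no other reset is already in progress for this bdev; the
 * QoS channel, which has no associated IO channel, is frozen by a message to
 * the QoS thread.
 *
 * Minimal usage sketch (the callback and argument names are illustrative):
 *
 *	static void
 *	reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	rc = spdk_bdev_reset(desc, ch, reset_done, NULL);
 */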
1939 int
1940 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1941 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1942 {
1943 	struct spdk_bdev *bdev = desc->bdev;
1944 	struct spdk_bdev_io *bdev_io;
1945 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1946 
1947 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1948 	if (!bdev_io) {
1949 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1950 		return -ENOMEM;
1951 	}
1952 
1953 	bdev_io->ch = channel;
1954 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1955 	bdev_io->u.reset.ch_ref = NULL;
1956 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1957 
1958 	pthread_mutex_lock(&bdev->mutex);
1959 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1960 	pthread_mutex_unlock(&bdev->mutex);
1961 
1962 	_spdk_bdev_channel_start_reset(channel);
1963 
1964 	/* Explicitly handle the QoS bdev channel, which has no associated IO channel */
1965 	if (bdev->qos_thread) {
1966 		spdk_thread_send_msg(bdev->qos_thread,
1967 				     _spdk_bdev_reset_freeze_qos_channel, bdev);
1968 	}
1969 
1970 	return 0;
1971 }
1972 
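/*
 * Return the accumulated I/O statistics for a channel and reset them.  When
 * VTune integration is enabled the statistics are consumed internally, so a
 * zeroed structure is returned instead.
 */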
1973 void
1974 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1975 		      struct spdk_bdev_io_stat *stat)
1976 {
1977 #ifdef SPDK_CONFIG_VTUNE
1978 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1979 	memset(stat, 0, sizeof(*stat));
1980 	return;
1981 #endif
1982 
1983 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1984 
1985 	channel->stat.ticks_rate = spdk_get_ticks_hz();
1986 	*stat = channel->stat;
1987 	memset(&channel->stat, 0, sizeof(channel->stat));
1988 }
1989 
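/*
 * NVMe passthru entry points.  Each variant copies the caller's command into
 * the bdev_io, records the data buffer (and, for the _md variant, the
 * metadata buffer) and submits the request.  A write-capable descriptor is
 * required in all cases.
 */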
1990 int
1991 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1992 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1993 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1994 {
1995 	struct spdk_bdev *bdev = desc->bdev;
1996 	struct spdk_bdev_io *bdev_io;
1997 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1998 
1999 	if (!desc->write) {
2000 		return -EBADF;
2001 	}
2002 
2003 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
2004 	if (!bdev_io) {
2005 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
2006 		return -ENOMEM;
2007 	}
2008 
2009 	bdev_io->ch = channel;
2010 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2011 	bdev_io->u.nvme_passthru.cmd = *cmd;
2012 	bdev_io->u.nvme_passthru.buf = buf;
2013 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2014 	bdev_io->u.nvme_passthru.md_buf = NULL;
2015 	bdev_io->u.nvme_passthru.md_len = 0;
2016 
2017 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2018 
2019 	spdk_bdev_io_submit(bdev_io);
2020 	return 0;
2021 }
2022 
2023 int
2024 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2025 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2026 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2027 {
2028 	struct spdk_bdev *bdev = desc->bdev;
2029 	struct spdk_bdev_io *bdev_io;
2030 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2031 
2032 	if (!desc->write) {
2033 		/*
2034 		 * Do not try to parse the NVMe command - we could potentially use bits in the
2035 		 *  opcode to determine whether the command is a read or a write, but for now
2036 		 *  just do not allow io_passthru with a read-only descriptor.
2037 		 */
2038 		return -EBADF;
2039 	}
2040 
2041 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
2042 	if (!bdev_io) {
2043 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
2044 		return -ENOMEM;
2045 	}
2046 
2047 	bdev_io->ch = channel;
2048 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2049 	bdev_io->u.nvme_passthru.cmd = *cmd;
2050 	bdev_io->u.nvme_passthru.buf = buf;
2051 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2052 	bdev_io->u.nvme_passthru.md_buf = NULL;
2053 	bdev_io->u.nvme_passthru.md_len = 0;
2054 
2055 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2056 
2057 	spdk_bdev_io_submit(bdev_io);
2058 	return 0;
2059 }
2060 
2061 int
2062 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2063 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2064 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2065 {
2066 	struct spdk_bdev *bdev = desc->bdev;
2067 	struct spdk_bdev_io *bdev_io;
2068 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2069 
2070 	if (!desc->write) {
2071 		/*
2072 		 * Do not try to parse the NVMe command - we could potentially use bits in the
2073 		 *  opcode to determine whether the command is a read or a write, but for now
2074 		 *  just do not allow io_passthru with a read-only descriptor.
2075 		 */
2076 		return -EBADF;
2077 	}
2078 
2079 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
2080 	if (!bdev_io) {
2081 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru_md\n");
2082 		return -ENOMEM;
2083 	}
2084 
2085 	bdev_io->ch = channel;
2086 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2087 	bdev_io->u.nvme_passthru.cmd = *cmd;
2088 	bdev_io->u.nvme_passthru.buf = buf;
2089 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2090 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2091 	bdev_io->u.nvme_passthru.md_len = md_len;
2092 
2093 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2094 
2095 	spdk_bdev_io_submit(bdev_io);
2096 	return 0;
2097 }
2098 
2099 int
2100 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
2101 {
2102 	if (!bdev_io) {
2103 		SPDK_ERRLOG("bdev_io is NULL\n");
2104 		return -1;
2105 	}
2106 
2107 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
2108 		SPDK_ERRLOG("bdev_io is in pending state\n");
2109 		assert(false);
2110 		return -1;
2111 	}
2112 
2113 	spdk_bdev_put_io(bdev_io);
2114 
2115 	return 0;
2116 }
2117 
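/*
 * Retry I/O that previously failed with NOMEM.  Requests are resubmitted in
 * queue order once the number of outstanding I/O on the shared module channel
 * has dropped to the nomem_threshold, and resubmission stops as soon as one
 * of them hits NOMEM again.
 */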
2118 static void
2119 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2120 {
2121 	struct spdk_bdev *bdev = bdev_ch->bdev;
2122 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
2123 	struct spdk_bdev_io *bdev_io;
2124 
2125 	if (shared_ch->io_outstanding > shared_ch->nomem_threshold) {
2126 		/*
2127 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2128 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2129 		 *  the context of a completion, because the resources for the I/O are
2130 		 *  not released until control returns to the bdev poller.  Also, we
2131 		 *  may require several small I/O to complete before a larger I/O
2132 		 *  (that requires splitting) can be submitted.
2133 		 */
2134 		return;
2135 	}
2136 
2137 	while (!TAILQ_EMPTY(&shared_ch->nomem_io)) {
2138 		bdev_io = TAILQ_FIRST(&shared_ch->nomem_io);
2139 		TAILQ_REMOVE(&shared_ch->nomem_io, bdev_io, link);
2140 		bdev_io->ch->io_outstanding++;
2141 		shared_ch->io_outstanding++;
2142 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
2143 		bdev->fn_table->submit_request(bdev_io->ch->channel, bdev_io);
2144 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
2145 			break;
2146 		}
2147 	}
2148 }
2149 
2150 static inline void
2151 _spdk_bdev_io_complete(void *ctx)
2152 {
2153 	struct spdk_bdev_io *bdev_io = ctx;
2154 
2155 	if (spdk_unlikely(bdev_io->in_submit_request || bdev_io->io_submit_ch)) {
2156 		/*
2157 		 * Send the completion to the thread that originally submitted the I/O,
2158 		 * which may not be the current thread in the case of QoS.
2159 		 */
2160 		if (bdev_io->io_submit_ch) {
2161 			bdev_io->ch = bdev_io->io_submit_ch;
2162 			bdev_io->io_submit_ch = NULL;
2163 		}
2164 
2165 		/*
2166 		 * Defer completion to avoid potential infinite recursion if the
2167 		 * user's completion callback issues a new I/O.
2168 		 */
2169 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
2170 				     _spdk_bdev_io_complete, bdev_io);
2171 		return;
2172 	}
2173 
2174 	assert(bdev_io->cb != NULL);
2175 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->ch->channel));
2176 
2177 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS,
2178 		    bdev_io->caller_ctx);
2179 }
2180 
2181 static void
2182 _spdk_bdev_unfreeze_qos_channel(void *ctx)
2183 {
2184 	struct spdk_bdev	*bdev = ctx;
2185 
2186 	if (bdev->qos_channel) {
2187 		bdev->qos_channel->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2188 		assert(TAILQ_EMPTY(&bdev->qos_channel->queued_resets));
2189 	}
2190 }
2191 
2192 static void
2193 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2194 {
2195 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2196 
2197 	if (bdev_io->u.reset.ch_ref != NULL) {
2198 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2199 		bdev_io->u.reset.ch_ref = NULL;
2200 	}
2201 
2202 	_spdk_bdev_io_complete(bdev_io);
2203 }
2204 
2205 static void
2206 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2207 {
2208 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2209 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2210 
2211 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2212 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2213 		_spdk_bdev_channel_start_reset(ch);
2214 	}
2215 
2216 	spdk_for_each_channel_continue(i, 0);
2217 }
2218 
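/*
 * Common completion path for all bdev I/O.  Resets get special handling (all
 * channels are unfrozen once the in-progress reset finishes), NOMEM
 * completions are requeued on the shared nomem queue, and successful reads
 * and writes update the per-channel statistics before the user callback runs.
 */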
2219 void
2220 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2221 {
2222 	struct spdk_bdev *bdev = bdev_io->bdev;
2223 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
2224 	struct spdk_bdev_module_channel	*shared_ch = bdev_ch->module_ch;
2225 
2226 	bdev_io->status = status;
2227 
2228 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2229 		bool unlock_channels = false;
2230 
2231 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2232 			SPDK_ERRLOG("NOMEM returned for reset\n");
2233 		}
2234 		pthread_mutex_lock(&bdev->mutex);
2235 		if (bdev_io == bdev->reset_in_progress) {
2236 			bdev->reset_in_progress = NULL;
2237 			unlock_channels = true;
2238 		}
2239 		pthread_mutex_unlock(&bdev->mutex);
2240 
2241 		if (unlock_channels) {
2242 			/* Explicitly handle the QoS bdev channel, which has no associated IO channel */
2243 			if (bdev->qos_thread) {
2244 				spdk_thread_send_msg(bdev->qos_thread,
2245 						     _spdk_bdev_unfreeze_qos_channel, bdev);
2246 			}
2247 
2248 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2249 					      bdev_io, _spdk_bdev_reset_complete);
2250 			return;
2251 		}
2252 	} else {
2253 		assert(bdev_ch->io_outstanding > 0);
2254 		assert(shared_ch->io_outstanding > 0);
2255 		bdev_ch->io_outstanding--;
2256 		shared_ch->io_outstanding--;
2257 
2258 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2259 			TAILQ_INSERT_HEAD(&shared_ch->nomem_io, bdev_io, link);
2260 			/*
2261 			 * Wait for some of the outstanding I/O to complete before we
2262 			 *  retry any of the nomem_io.  Normally we will wait for
2263 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2264 			 *  depth channels we will instead wait for half to complete.
2265 			 */
2266 			shared_ch->nomem_threshold = spdk_max((int64_t)shared_ch->io_outstanding / 2,
2267 							      (int64_t)shared_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
2268 			return;
2269 		}
2270 
2271 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_ch->nomem_io))) {
2272 			_spdk_bdev_ch_retry_io(bdev_ch);
2273 		}
2274 	}
2275 
2276 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2277 		switch (bdev_io->type) {
2278 		case SPDK_BDEV_IO_TYPE_READ:
2279 			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
2280 			bdev_ch->stat.num_read_ops++;
2281 			bdev_ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2282 			break;
2283 		case SPDK_BDEV_IO_TYPE_WRITE:
2284 			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
2285 			bdev_ch->stat.num_write_ops++;
2286 			bdev_ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->submit_tsc);
2287 			break;
2288 		default:
2289 			break;
2290 		}
2291 	}
2292 
2293 #ifdef SPDK_CONFIG_VTUNE
2294 	uint64_t now_tsc = spdk_get_ticks();
2295 	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
2296 		uint64_t data[5];
2297 
2298 		data[0] = bdev_ch->stat.num_read_ops;
2299 		data[1] = bdev_ch->stat.bytes_read;
2300 		data[2] = bdev_ch->stat.num_write_ops;
2301 		data[3] = bdev_ch->stat.bytes_written;
2302 		data[4] = bdev->fn_table->get_spin_time ?
2303 			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;
2304 
2305 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
2306 				   __itt_metadata_u64, 5, data);
2307 
2308 		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
2309 		bdev_ch->start_tsc = now_tsc;
2310 	}
2311 #endif
2312 
2313 	_spdk_bdev_io_complete(bdev_io);
2314 }
2315 
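/*
 * Complete an I/O with an explicit SCSI status.  Anything other than
 * SPDK_SCSI_STATUS_GOOD is recorded as a SCSI error so the sense information
 * can later be retrieved with spdk_bdev_io_get_scsi_status().
 */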
2316 void
2317 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2318 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2319 {
2320 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2321 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2322 	} else {
2323 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2324 		bdev_io->error.scsi.sc = sc;
2325 		bdev_io->error.scsi.sk = sk;
2326 		bdev_io->error.scsi.asc = asc;
2327 		bdev_io->error.scsi.ascq = ascq;
2328 	}
2329 
2330 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2331 }
2332 
2333 void
2334 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2335 			     int *sc, int *sk, int *asc, int *ascq)
2336 {
2337 	assert(sc != NULL);
2338 	assert(sk != NULL);
2339 	assert(asc != NULL);
2340 	assert(ascq != NULL);
2341 
2342 	switch (bdev_io->status) {
2343 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2344 		*sc = SPDK_SCSI_STATUS_GOOD;
2345 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2346 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2347 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2348 		break;
2349 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2350 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2351 		break;
2352 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2353 		*sc = bdev_io->error.scsi.sc;
2354 		*sk = bdev_io->error.scsi.sk;
2355 		*asc = bdev_io->error.scsi.asc;
2356 		*ascq = bdev_io->error.scsi.ascq;
2357 		break;
2358 	default:
2359 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2360 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2361 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2362 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2363 		break;
2364 	}
2365 }
2366 
2367 void
2368 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2369 {
2370 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2371 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
2372 	} else {
2373 		bdev_io->error.nvme.sct = sct;
2374 		bdev_io->error.nvme.sc = sc;
2375 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2376 	}
2377 
2378 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
2379 }
2380 
2381 void
2382 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2383 {
2384 	assert(sct != NULL);
2385 	assert(sc != NULL);
2386 
2387 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2388 		*sct = bdev_io->error.nvme.sct;
2389 		*sc = bdev_io->error.nvme.sc;
2390 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2391 		*sct = SPDK_NVME_SCT_GENERIC;
2392 		*sc = SPDK_NVME_SC_SUCCESS;
2393 	} else {
2394 		*sct = SPDK_NVME_SCT_GENERIC;
2395 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2396 	}
2397 }
2398 
2399 struct spdk_thread *
2400 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2401 {
2402 	return spdk_io_channel_get_thread(bdev_io->ch->channel);
2403 }
2404 
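/*
 * Apply a rate limit to this bdev if one is given in the [QoS] section of the
 * configuration file.  Each Limit_IOPS entry names a bdev and an IOPS value
 * that must be a positive multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC.  A
 * hypothetical entry for a bdev named "Malloc0" would look like:
 *
 *	[QoS]
 *	  Limit_IOPS  Malloc0  20000
 */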
2405 static void
2406 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
2407 {
2408 	struct spdk_conf_section	*sp = NULL;
2409 	const char			*val = NULL;
2410 	int				ios_per_sec = 0;
2411 	int				i = 0;
2412 
2413 	sp = spdk_conf_find_section(NULL, "QoS");
2414 	if (!sp) {
2415 		return;
2416 	}
2417 
2418 	while (true) {
2419 		val = spdk_conf_section_get_nmval(sp, "Limit_IOPS", i, 0);
2420 		if (!val) {
2421 			break;
2422 		}
2423 
2424 		if (strcmp(bdev->name, val) != 0) {
2425 			i++;
2426 			continue;
2427 		}
2428 
2429 		val = spdk_conf_section_get_nmval(sp, "Limit_IOPS", i, 1);
2430 		if (!val) {
2431 			return;
2432 		}
2433 
2434 		ios_per_sec = (int)strtol(val, NULL, 10);
2435 		if (ios_per_sec > 0) {
2436 			if (ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) {
2437 				SPDK_ERRLOG("Assigned IOPS %u on bdev %s is not multiple of %u\n",
2438 					    ios_per_sec, bdev->name, SPDK_BDEV_QOS_MIN_IOS_PER_SEC);
2439 				SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name);
2440 			} else {
2441 				bdev->ios_per_sec = (uint64_t)ios_per_sec;
2442 				SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS:%lu\n",
2443 					      bdev->name, bdev->ios_per_sec);
2444 			}
2445 		}
2446 
2447 		return;
2448 	}
2449 }
2450 
2451 static int
2452 spdk_bdev_init(struct spdk_bdev *bdev)
2453 {
2454 	assert(bdev->module != NULL);
2455 
2456 	if (!bdev->name) {
2457 		SPDK_ERRLOG("Bdev name is NULL\n");
2458 		return -EINVAL;
2459 	}
2460 
2461 	if (spdk_bdev_get_by_name(bdev->name)) {
2462 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2463 		return -EEXIST;
2464 	}
2465 
2466 	bdev->status = SPDK_BDEV_STATUS_READY;
2467 
2468 	TAILQ_INIT(&bdev->open_descs);
2469 
2470 	TAILQ_INIT(&bdev->aliases);
2471 
2472 	bdev->reset_in_progress = NULL;
2473 
2474 	_spdk_bdev_qos_config(bdev);
2475 
2476 	spdk_io_device_register(__bdev_to_io_dev(bdev),
2477 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2478 				sizeof(struct spdk_bdev_channel));
2479 
2480 	pthread_mutex_init(&bdev->mutex, NULL);
2481 	return 0;
2482 }
2483 
2484 static void
2485 spdk_bdev_fini(struct spdk_bdev *bdev)
2486 {
2487 	pthread_mutex_destroy(&bdev->mutex);
2488 
2489 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), NULL);
2490 }
2491 
2492 static void
2493 spdk_bdev_start(struct spdk_bdev *bdev)
2494 {
2495 	struct spdk_bdev_module *module;
2496 
2497 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2498 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
2499 
2500 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
2501 		if (module->examine) {
2502 			module->action_in_progress++;
2503 			module->examine(bdev);
2504 		}
2505 	}
2506 }
2507 
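/*
 * Register a bdev with the bdev layer.  On success the bdev is added to the
 * global list and every module with an examine callback gets a chance to
 * claim it or build virtual bdevs on top of it.
 */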
2508 int
2509 spdk_bdev_register(struct spdk_bdev *bdev)
2510 {
2511 	int rc = spdk_bdev_init(bdev);
2512 
2513 	if (rc == 0) {
2514 		spdk_bdev_start(bdev);
2515 	}
2516 
2517 	return rc;
2518 }
2519 
2520 static void
2521 spdk_vbdev_remove_base_bdevs(struct spdk_bdev *vbdev)
2522 {
2523 	struct spdk_bdev **bdevs;
2524 	struct spdk_bdev *base;
2525 	size_t i, j, k;
2526 	bool found;
2527 
2528 	/* Iterate over base bdevs to remove vbdev from them. */
2529 	for (i = 0; i < vbdev->base_bdevs_cnt; i++) {
2530 		found = false;
2531 		base = vbdev->base_bdevs[i];
2532 
2533 		for (j = 0; j < base->vbdevs_cnt; j++) {
2534 			if (base->vbdevs[j] != vbdev) {
2535 				continue;
2536 			}
2537 
2538 			for (k = j; k + 1 < base->vbdevs_cnt; k++) {
2539 				base->vbdevs[k] = base->vbdevs[k + 1];
2540 			}
2541 
2542 			base->vbdevs_cnt--;
2543 			if (base->vbdevs_cnt > 0) {
2544 				bdevs = realloc(base->vbdevs, base->vbdevs_cnt * sizeof(bdevs[0]));
2545 				/* Shrinking a memory block should not fail. */
2546 				assert(bdevs);
2547 				base->vbdevs = bdevs;
2548 			} else {
2549 				free(base->vbdevs);
2550 				base->vbdevs = NULL;
2551 			}
2552 
2553 			found = true;
2554 			break;
2555 		}
2556 
2557 		if (!found) {
2558 			SPDK_WARNLOG("Bdev '%s' is not a base bdev of '%s'.\n", base->name, vbdev->name);
2559 		}
2560 	}
2561 
2562 	free(vbdev->base_bdevs);
2563 	vbdev->base_bdevs = NULL;
2564 	vbdev->base_bdevs_cnt = 0;
2565 }
2566 
2567 static int
2568 spdk_vbdev_set_base_bdevs(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, size_t cnt)
2569 {
2570 	struct spdk_bdev **vbdevs;
2571 	struct spdk_bdev *base;
2572 	size_t i;
2573 
2574 	/* Adding base bdevs isn't supported (yet?). */
2575 	assert(vbdev->base_bdevs_cnt == 0);
2576 
2577 	vbdev->base_bdevs = malloc(cnt * sizeof(vbdev->base_bdevs[0]));
2578 	if (!vbdev->base_bdevs) {
2579 		SPDK_ERRLOG("%s - malloc() failed\n", vbdev->name);
2580 		return -ENOMEM;
2581 	}
2582 
2583 	memcpy(vbdev->base_bdevs, base_bdevs, cnt * sizeof(vbdev->base_bdevs[0]));
2584 	vbdev->base_bdevs_cnt = cnt;
2585 
2586 	/* Iterate over base bdevs to add this vbdev to them. */
2587 	for (i = 0; i < cnt; i++) {
2588 		base = vbdev->base_bdevs[i];
2589 
2590 		assert(base != NULL);
2591 		assert(base->claim_module != NULL);
2592 
2593 		vbdevs = realloc(base->vbdevs, (base->vbdevs_cnt + 1) * sizeof(vbdevs[0]));
2594 		if (!vbdevs) {
2595 			SPDK_ERRLOG("%s - realloc() failed\n", base->name);
2596 			spdk_vbdev_remove_base_bdevs(vbdev);
2597 			return -ENOMEM;
2598 		}
2599 
2600 		vbdevs[base->vbdevs_cnt] = vbdev;
2601 		base->vbdevs = vbdevs;
2602 		base->vbdevs_cnt++;
2603 	}
2604 
2605 	return 0;
2606 }
2607 
2608 int
2609 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2610 {
2611 	int rc;
2612 
2613 	rc = spdk_bdev_init(vbdev);
2614 	if (rc) {
2615 		return rc;
2616 	}
2617 
2618 	if (base_bdev_count == 0) {
2619 		spdk_bdev_start(vbdev);
2620 		return 0;
2621 	}
2622 
2623 	rc = spdk_vbdev_set_base_bdevs(vbdev, base_bdevs, base_bdev_count);
2624 	if (rc) {
2625 		spdk_bdev_fini(vbdev);
2626 		return rc;
2627 	}
2628 
2629 	spdk_bdev_start(vbdev);
2630 	return 0;
2631 
2632 }
2633 
2634 void
2635 spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
2636 {
2637 	if (bdev->unregister_cb != NULL) {
2638 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
2639 	}
2640 }
2641 
2642 static void
2643 _remove_notify(void *arg)
2644 {
2645 	struct spdk_bdev_desc *desc = arg;
2646 
2647 	desc->remove_cb(desc->remove_ctx);
2648 }
2649 
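/*
 * Unregister a bdev.  If any open descriptor registered a remove callback,
 * the callback is scheduled on this thread and destruction is deferred until
 * the descriptors are closed; otherwise the bdev is removed from the global
 * list and destructed immediately.
 */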
2650 void
2651 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2652 {
2653 	struct spdk_bdev_desc	*desc, *tmp;
2654 	int			rc;
2655 	bool			do_destruct = true;
2656 
2657 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2658 
2659 	pthread_mutex_lock(&bdev->mutex);
2660 
2661 	spdk_vbdev_remove_base_bdevs(bdev);
2662 
2663 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
2664 	bdev->unregister_cb = cb_fn;
2665 	bdev->unregister_ctx = cb_arg;
2666 
2667 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
2668 		if (desc->remove_cb) {
2669 			do_destruct = false;
2670 			/*
2671 			 * Defer invocation of the remove_cb to a separate message that will
2672 			 *  run later on this thread.  This ensures this context unwinds and
2673 			 *  we don't recursively unregister this bdev again if the remove_cb
2674 			 *  immediately closes its descriptor.
2675 			 */
2676 			spdk_thread_send_msg(spdk_get_thread(), _remove_notify, desc);
2677 		}
2678 	}
2679 
2680 	if (!do_destruct) {
2681 		pthread_mutex_unlock(&bdev->mutex);
2682 		return;
2683 	}
2684 
2685 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
2686 	pthread_mutex_unlock(&bdev->mutex);
2687 
2688 	spdk_bdev_fini(bdev);
2689 
2690 	rc = bdev->fn_table->destruct(bdev->ctxt);
2691 	if (rc < 0) {
2692 		SPDK_ERRLOG("destruct failed\n");
2693 	}
2694 	if (rc <= 0 && cb_fn != NULL) {
2695 		cb_fn(cb_arg, rc);
2696 	}
2697 }
2698 
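/*
 * Open a descriptor on a bdev.  Opening for write fails with -EPERM while the
 * bdev is claimed by a module.  The optional remove callback is invoked if
 * the bdev is unregistered while the descriptor is still open.
 *
 * Minimal usage sketch (error handling omitted):
 *
 *	struct spdk_bdev_desc *desc;
 *
 *	if (spdk_bdev_open(bdev, false, NULL, NULL, &desc) == 0) {
 *		... submit I/O through desc and an I/O channel ...
 *		spdk_bdev_close(desc);
 *	}
 */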
2699 int
2700 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2701 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2702 {
2703 	struct spdk_bdev_desc *desc;
2704 
2705 	desc = calloc(1, sizeof(*desc));
2706 	if (desc == NULL) {
2707 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2708 		return -ENOMEM;
2709 	}
2710 
2711 	pthread_mutex_lock(&bdev->mutex);
2712 
2713 	if (write && bdev->claim_module) {
2714 		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
2715 		free(desc);
2716 		pthread_mutex_unlock(&bdev->mutex);
2717 		return -EPERM;
2718 	}
2719 
2720 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
2721 
2722 	desc->bdev = bdev;
2723 	desc->remove_cb = remove_cb;
2724 	desc->remove_ctx = remove_ctx;
2725 	desc->write = write;
2726 	*_desc = desc;
2727 
2728 	pthread_mutex_unlock(&bdev->mutex);
2729 
2730 	return 0;
2731 }
2732 
2733 void
2734 spdk_bdev_close(struct spdk_bdev_desc *desc)
2735 {
2736 	struct spdk_bdev *bdev = desc->bdev;
2737 	bool do_unregister = false;
2738 
2739 	pthread_mutex_lock(&bdev->mutex);
2740 
2741 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
2742 	free(desc);
2743 
2744 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
2745 		do_unregister = true;
2746 	}
2747 	pthread_mutex_unlock(&bdev->mutex);
2748 
2749 	if (do_unregister == true) {
2750 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
2751 	}
2752 }
2753 
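/*
 * Claim a bdev for exclusive write access by a module.  Only one module may
 * hold the claim at a time; if a descriptor is supplied it is promoted to
 * write access.
 */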
2754 int
2755 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
2756 			    struct spdk_bdev_module *module)
2757 {
2758 	if (bdev->claim_module != NULL) {
2759 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
2760 			    bdev->claim_module->name);
2761 		return -EPERM;
2762 	}
2763 
2764 	if (desc && !desc->write) {
2765 		desc->write = true;
2766 	}
2767 
2768 	bdev->claim_module = module;
2769 	return 0;
2770 }
2771 
2772 void
2773 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
2774 {
2775 	assert(bdev->claim_module != NULL);
2776 	bdev->claim_module = NULL;
2777 }
2778 
2779 struct spdk_bdev *
2780 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
2781 {
2782 	return desc->bdev;
2783 }
2784 
2785 void
2786 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
2787 {
2788 	struct iovec *iovs;
2789 	int iovcnt;
2790 
2791 	if (bdev_io == NULL) {
2792 		return;
2793 	}
2794 
2795 	switch (bdev_io->type) {
2796 	case SPDK_BDEV_IO_TYPE_READ:
2797 		iovs = bdev_io->u.bdev.iovs;
2798 		iovcnt = bdev_io->u.bdev.iovcnt;
2799 		break;
2800 	case SPDK_BDEV_IO_TYPE_WRITE:
2801 		iovs = bdev_io->u.bdev.iovs;
2802 		iovcnt = bdev_io->u.bdev.iovcnt;
2803 		break;
2804 	default:
2805 		iovs = NULL;
2806 		iovcnt = 0;
2807 		break;
2808 	}
2809 
2810 	if (iovp) {
2811 		*iovp = iovs;
2812 	}
2813 	if (iovcntp) {
2814 		*iovcntp = iovcnt;
2815 	}
2816 }
2817 
2818 void
2819 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
2820 {
2821 
2822 	if (spdk_bdev_module_list_find(bdev_module->name)) {
2823 		fprintf(stderr, "ERROR: module '%s' already registered.\n", bdev_module->name);
2824 		assert(false);
2825 	}
2826 
2827 	if (bdev_module->async_init) {
2828 		bdev_module->action_in_progress = 1;
2829 	}
2830 
2831 	/*
2832 	 * Modules with examine callbacks must be initialized first, so they are
2833 	 *  ready to handle examine callbacks from later modules that will
2834 	 *  register physical bdevs.
2835 	 */
2836 	if (bdev_module->examine != NULL) {
2837 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2838 	} else {
2839 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
2840 	}
2841 }
2842 
2843 struct spdk_bdev_module *
2844 spdk_bdev_module_list_find(const char *name)
2845 {
2846 	struct spdk_bdev_module *bdev_module;
2847 
2848 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
2849 		if (strcmp(name, bdev_module->name) == 0) {
2850 			break;
2851 		}
2852 	}
2853 
2854 	return bdev_module;
2855 }
2856 
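/*
 * Completion callback used to split a large write_zeroes request into
 * ZERO_BUFFER_SIZE sized chunks.  Each completion submits the next chunk, and
 * the original user callback is restored for the final one.
 */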
2857 static void
2858 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
2859 {
2860 	uint64_t len;
2861 
2862 	if (!success) {
2863 		bdev_io->cb = bdev_io->u.bdev.stored_user_cb;
2864 		_spdk_bdev_io_complete(bdev_io);
2865 		return;
2866 	}
2867 
2868 	/* No need to repeat the error checking from write_zeroes_blocks because this request already passed those checks. */
2869 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks,
2870 		       ZERO_BUFFER_SIZE);
2871 
2872 	bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks;
2873 	bdev_io->u.bdev.iov.iov_len = len;
2874 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
2875 	bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
2876 	bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
2877 
2878 	/* If this round completes the I/O, restore the original user callback */
2879 	if (bdev_io->u.bdev.split_remaining_num_blocks == 0) {
2880 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb);
2881 	} else {
2882 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
2883 	}
2884 	spdk_bdev_io_submit(bdev_io);
2885 }
2886 
2887 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
2888