xref: /spdk/lib/bdev/bdev.c (revision 8a0a98d35e21f282088edf28b9e8da66ec390e3a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 #include "spdk/conf.h"
39 
40 #include "spdk/env.h"
41 #include "spdk/event.h"
42 #include "spdk/thread.h"
43 #include "spdk/likely.h"
44 #include "spdk/queue.h"
45 #include "spdk/nvme_spec.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/util.h"
48 
49 #include "spdk/bdev_module.h"
50 #include "spdk_internal/log.h"
51 #include "spdk/string.h"
52 
53 #ifdef SPDK_CONFIG_VTUNE
54 #include "ittnotify.h"
55 #include "ittnotify_types.h"
56 int __itt_init_ittlib(const char *, __itt_group_id);
57 #endif
58 
59 #define SPDK_BDEV_IO_POOL_SIZE			(64 * 1024)
60 #define SPDK_BDEV_IO_CACHE_SIZE			256
61 #define BUF_SMALL_POOL_SIZE			8192
62 #define BUF_LARGE_POOL_SIZE			1024
63 #define NOMEM_THRESHOLD_COUNT			8
64 #define ZERO_BUFFER_SIZE			0x100000
65 #define SPDK_BDEV_QOS_TIMESLICE_IN_USEC		1000
66 #define SPDK_BDEV_SEC_TO_USEC			1000000ULL
67 #define SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE	1
68 #define SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE	512
69 #define SPDK_BDEV_QOS_MIN_IOS_PER_SEC		10000
70 #define SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC	10
71 
72 enum spdk_bdev_qos_type {
73 	SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT = 0,
74 	SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT,
75 	SPDK_BDEV_QOS_NUM_TYPES /* Keep last */
76 };
77 
78 static const char *qos_type_str[SPDK_BDEV_QOS_NUM_TYPES] = {"Limit_IOPS", "Limit_BWPS"};
79 
80 struct spdk_bdev_mgr {
81 	struct spdk_mempool *bdev_io_pool;
82 
83 	struct spdk_mempool *buf_small_pool;
84 	struct spdk_mempool *buf_large_pool;
85 
86 	void *zero_buffer;
87 
88 	TAILQ_HEAD(, spdk_bdev_module) bdev_modules;
89 
90 	TAILQ_HEAD(, spdk_bdev) bdevs;
91 
92 	bool init_complete;
93 	bool module_init_complete;
94 
95 #ifdef SPDK_CONFIG_VTUNE
96 	__itt_domain	*domain;
97 #endif
98 };
99 
100 static struct spdk_bdev_mgr g_bdev_mgr = {
101 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
102 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
103 	.init_complete = false,
104 	.module_init_complete = false,
105 };
106 
107 static struct spdk_bdev_opts	g_bdev_opts = {
108 	.bdev_io_pool_size = SPDK_BDEV_IO_POOL_SIZE,
109 	.bdev_io_cache_size = SPDK_BDEV_IO_CACHE_SIZE,
110 };
111 
112 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
113 static void			*g_init_cb_arg = NULL;
114 
115 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
116 static void			*g_fini_cb_arg = NULL;
117 static struct spdk_thread	*g_fini_thread = NULL;
118 
119 struct spdk_bdev_qos {
120 	/** Rate limit, in I/O per second */
121 	uint64_t iops_rate_limit;
122 
123 	/** Rate limit, in bytes per second */
124 	uint64_t byte_rate_limit;
125 
126 	/** The channel that all I/O are funneled through */
127 	struct spdk_bdev_channel *ch;
128 
129 	/** The thread on which the poller is running. */
130 	struct spdk_thread *thread;
131 
132 	/** Queue of I/O waiting to be issued. */
133 	bdev_io_tailq_t queued;
134 
135 	/** Maximum allowed IOs to be issued in one timeslice (e.g., 1ms) and
136 	 *  only valid for the master channel which manages the outstanding IOs. */
137 	uint64_t max_ios_per_timeslice;
138 
139 	/** Maximum allowed bytes to be issued in one timeslice (e.g., 1ms) and
140 	 *  only valid for the master channel which manages the outstanding IOs. */
141 	uint64_t max_byte_per_timeslice;
142 
143 	/** Submitted IO in one timeslice (e.g., 1ms) */
144 	uint64_t io_submitted_this_timeslice;
145 
146 	/** Submitted bytes in one timeslice (e.g., 1ms) */
147 	uint64_t byte_submitted_this_timeslice;
148 
149 	/** Poller that processes queued I/O commands each time slice. */
150 	struct spdk_poller *poller;
151 };
152 
153 struct spdk_bdev_mgmt_channel {
154 	bdev_io_stailq_t need_buf_small;
155 	bdev_io_stailq_t need_buf_large;
156 
157 	/*
158 	 * Each thread keeps a cache of bdev_io - this allows
159 	 *  bdev threads which are *not* DPDK threads to still
160 	 *  benefit from a per-thread bdev_io cache.  Without
161 	 *  this, non-DPDK threads fetching from the mempool
162 	 *  incur a cmpxchg on get and put.
163 	 */
164 	bdev_io_stailq_t per_thread_cache;
165 	uint32_t	per_thread_cache_count;
166 	uint32_t	bdev_io_cache_size;
167 
168 	TAILQ_HEAD(, spdk_bdev_shared_resource)	shared_resources;
169 	TAILQ_HEAD(, spdk_bdev_io_wait_entry)	io_wait_queue;
170 };
171 
172 /*
173  * Per-module (or per-io_device) data. Multiple bdevs built on the same io_device
174  * will queue their I/O awaiting retry here. This makes it possible to retry sending
175  * I/O to one bdev after I/O from another bdev completes.
176  */
177 struct spdk_bdev_shared_resource {
178 	/* The bdev management channel */
179 	struct spdk_bdev_mgmt_channel *mgmt_ch;
180 
181 	/*
182 	 * Count of I/O submitted to bdev module and waiting for completion.
183 	 * Incremented before submit_request() is called on an spdk_bdev_io.
184 	 */
185 	uint64_t		io_outstanding;
186 
187 	/*
188 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
189 	 *  on this channel.
190 	 */
191 	bdev_io_tailq_t		nomem_io;
192 
193 	/*
194 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
195 	 */
196 	uint64_t		nomem_threshold;
197 
198 	/* I/O channel allocated by a bdev module */
199 	struct spdk_io_channel	*shared_ch;
200 
201 	/* Refcount of bdev channels using this resource */
202 	uint32_t		ref;
203 
204 	TAILQ_ENTRY(spdk_bdev_shared_resource) link;
205 };
206 
207 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
208 #define BDEV_CH_QOS_ENABLED		(1 << 1)
209 
210 struct spdk_bdev_channel {
211 	struct spdk_bdev	*bdev;
212 
213 	/* The channel for the underlying device */
214 	struct spdk_io_channel	*channel;
215 
216 	/* Per io_device per thread data */
217 	struct spdk_bdev_shared_resource *shared_resource;
218 
219 	struct spdk_bdev_io_stat stat;
220 
221 	/*
222 	 * Count of I/O submitted through this channel and waiting for completion.
223 	 * Incremented before submit_request() is called on an spdk_bdev_io.
224 	 */
225 	uint64_t		io_outstanding;
226 
227 	bdev_io_tailq_t		queued_resets;
228 
229 	uint32_t		flags;
230 
231 #ifdef SPDK_CONFIG_VTUNE
232 	uint64_t		start_tsc;
233 	uint64_t		interval_tsc;
234 	__itt_string_handle	*handle;
235 	struct spdk_bdev_io_stat prev_stat;
236 #endif
237 
238 };
239 
240 struct spdk_bdev_desc {
241 	struct spdk_bdev		*bdev;
242 	spdk_bdev_remove_cb_t		remove_cb;
243 	void				*remove_ctx;
244 	bool				remove_scheduled;
245 	bool				write;
246 	TAILQ_ENTRY(spdk_bdev_desc)	link;
247 };
248 
249 struct spdk_bdev_iostat_ctx {
250 	struct spdk_bdev_io_stat *stat;
251 	spdk_bdev_get_device_stat_cb cb;
252 	void *cb_arg;
253 };
254 
255 #define __bdev_to_io_dev(bdev)		(((char *)bdev) + 1)
256 #define __bdev_from_io_dev(io_dev)	((struct spdk_bdev *)(((char *)io_dev) - 1))
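/*
 * The +1/-1 above only serves to produce an io_device handle that is distinct
 * from the bdev pointer itself; the shifted pointer is used purely as a lookup
 * key (e.g. for spdk_get_io_channel()) and is not meant to be dereferenced.
 */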
257 
258 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
259 
260 void
261 spdk_bdev_get_opts(struct spdk_bdev_opts *opts)
262 {
263 	*opts = g_bdev_opts;
264 }
265 
266 int
267 spdk_bdev_set_opts(struct spdk_bdev_opts *opts)
268 {
269 	uint32_t min_pool_size;
270 
271 	/*
272 	 * Add 1 to the thread count to account for the extra mgmt_ch that gets created during subsystem
273 	 *  initialization.  A second mgmt_ch will be created on the same thread when the application starts
274 	 *  but before the deferred put_io_channel event is executed for the first mgmt_ch.
275 	 */
276 	min_pool_size = opts->bdev_io_cache_size * (spdk_thread_get_count() + 1);
277 	if (opts->bdev_io_pool_size < min_pool_size) {
278 		SPDK_ERRLOG("bdev_io_pool_size %" PRIu32 " is not compatible with bdev_io_cache_size %" PRIu32
279 			    " and %" PRIu32 " threads\n", opts->bdev_io_pool_size, opts->bdev_io_cache_size,
280 			    spdk_thread_get_count());
281 		SPDK_ERRLOG("bdev_io_pool_size must be at least %" PRIu32 "\n", min_pool_size);
282 		return -1;
283 	}
284 
285 	g_bdev_opts = *opts;
286 	return 0;
287 }
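/*
 * Usage sketch (illustrative only): callers typically read the current options,
 * adjust them, and apply them before the bdev subsystem is initialized:
 *
 *	struct spdk_bdev_opts opts;
 *
 *	spdk_bdev_get_opts(&opts);
 *	opts.bdev_io_pool_size = 2 * SPDK_BDEV_IO_POOL_SIZE;
 *	if (spdk_bdev_set_opts(&opts) != 0) {
 *		// rejected: pool size too small for cache size * (thread count + 1)
 *	}
 */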
288 
289 struct spdk_bdev *
290 spdk_bdev_first(void)
291 {
292 	struct spdk_bdev *bdev;
293 
294 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
295 	if (bdev) {
296 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
297 	}
298 
299 	return bdev;
300 }
301 
302 struct spdk_bdev *
303 spdk_bdev_next(struct spdk_bdev *prev)
304 {
305 	struct spdk_bdev *bdev;
306 
307 	bdev = TAILQ_NEXT(prev, link);
308 	if (bdev) {
309 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
310 	}
311 
312 	return bdev;
313 }
314 
315 static struct spdk_bdev *
316 _bdev_next_leaf(struct spdk_bdev *bdev)
317 {
318 	while (bdev != NULL) {
319 		if (bdev->claim_module == NULL) {
320 			return bdev;
321 		} else {
322 			bdev = TAILQ_NEXT(bdev, link);
323 		}
324 	}
325 
326 	return bdev;
327 }
328 
329 struct spdk_bdev *
330 spdk_bdev_first_leaf(void)
331 {
332 	struct spdk_bdev *bdev;
333 
334 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
335 
336 	if (bdev) {
337 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
338 	}
339 
340 	return bdev;
341 }
342 
343 struct spdk_bdev *
344 spdk_bdev_next_leaf(struct spdk_bdev *prev)
345 {
346 	struct spdk_bdev *bdev;
347 
348 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
349 
350 	if (bdev) {
351 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
352 	}
353 
354 	return bdev;
355 }
356 
357 struct spdk_bdev *
358 spdk_bdev_get_by_name(const char *bdev_name)
359 {
360 	struct spdk_bdev_alias *tmp;
361 	struct spdk_bdev *bdev = spdk_bdev_first();
362 
363 	while (bdev != NULL) {
364 		if (strcmp(bdev_name, bdev->name) == 0) {
365 			return bdev;
366 		}
367 
368 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
369 			if (strcmp(bdev_name, tmp->alias) == 0) {
370 				return bdev;
371 			}
372 		}
373 
374 		bdev = spdk_bdev_next(bdev);
375 	}
376 
377 	return NULL;
378 }
379 
380 static void
381 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
382 {
383 	assert(bdev_io->internal.get_buf_cb != NULL);
384 	assert(buf != NULL);
385 	assert(bdev_io->u.bdev.iovs != NULL);
386 
387 	bdev_io->internal.buf = buf;
388 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
389 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->internal.buf_len;
390 	bdev_io->internal.get_buf_cb(bdev_io->internal.ch->channel, bdev_io);
391 }
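/*
 * Alignment sketch: buffers in the small/large pools are allocated 512 bytes
 * larger than their nominal size (see the "+ 512" in spdk_bdev_initialize()
 * below), so rounding (buf + 512) down to a 512-byte boundary always yields an
 * aligned iov_base with at least buf_len usable bytes.  For example,
 * buf == 0x1234 gives iov_base == 0x1400.
 */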
392 
393 static void
394 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
395 {
396 	struct spdk_mempool *pool;
397 	struct spdk_bdev_io *tmp;
398 	void *buf;
399 	bdev_io_stailq_t *stailq;
400 	struct spdk_bdev_mgmt_channel *ch;
401 
402 	assert(bdev_io->u.bdev.iovcnt == 1);
403 
404 	buf = bdev_io->internal.buf;
405 	ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
406 
407 	if (bdev_io->internal.buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
408 		pool = g_bdev_mgr.buf_small_pool;
409 		stailq = &ch->need_buf_small;
410 	} else {
411 		pool = g_bdev_mgr.buf_large_pool;
412 		stailq = &ch->need_buf_large;
413 	}
414 
415 	if (STAILQ_EMPTY(stailq)) {
416 		spdk_mempool_put(pool, buf);
417 	} else {
418 		tmp = STAILQ_FIRST(stailq);
419 		STAILQ_REMOVE_HEAD(stailq, internal.buf_link);
420 		spdk_bdev_io_set_buf(tmp, buf);
421 	}
422 }
423 
424 void
425 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
426 {
427 	struct spdk_mempool *pool;
428 	bdev_io_stailq_t *stailq;
429 	void *buf = NULL;
430 	struct spdk_bdev_mgmt_channel *mgmt_ch;
431 
432 	assert(cb != NULL);
433 	assert(bdev_io->u.bdev.iovs != NULL);
434 
435 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
436 		/* Buffer already present */
437 		cb(bdev_io->internal.ch->channel, bdev_io);
438 		return;
439 	}
440 
441 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
442 	mgmt_ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
443 
444 	bdev_io->internal.buf_len = len;
445 	bdev_io->internal.get_buf_cb = cb;
446 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
447 		pool = g_bdev_mgr.buf_small_pool;
448 		stailq = &mgmt_ch->need_buf_small;
449 	} else {
450 		pool = g_bdev_mgr.buf_large_pool;
451 		stailq = &mgmt_ch->need_buf_large;
452 	}
453 
454 	buf = spdk_mempool_get(pool);
455 
456 	if (!buf) {
457 		STAILQ_INSERT_TAIL(stailq, bdev_io, internal.buf_link);
458 	} else {
459 		spdk_bdev_io_set_buf(bdev_io, buf);
460 	}
461 }
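/*
 * Typical use (illustrative): a bdev module's read path calls
 * spdk_bdev_io_get_buf(bdev_io, cb, len) when the caller supplied no data buffer.
 * The callback runs immediately if a pooled buffer (or an existing iov_base) is
 * available; otherwise the I/O waits on need_buf_small/need_buf_large and the
 * callback runs once another I/O returns a buffer via spdk_bdev_io_put_buf().
 */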
462 
463 static int
464 spdk_bdev_module_get_max_ctx_size(void)
465 {
466 	struct spdk_bdev_module *bdev_module;
467 	int max_bdev_module_size = 0;
468 
469 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
470 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
471 			max_bdev_module_size = bdev_module->get_ctx_size();
472 		}
473 	}
474 
475 	return max_bdev_module_size;
476 }
477 
478 void
479 spdk_bdev_config_text(FILE *fp)
480 {
481 	struct spdk_bdev_module *bdev_module;
482 
483 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
484 		if (bdev_module->config_text) {
485 			bdev_module->config_text(fp);
486 		}
487 	}
488 }
489 
490 void
491 spdk_bdev_subsystem_config_json(struct spdk_json_write_ctx *w)
492 {
493 	struct spdk_bdev_module *bdev_module;
494 	struct spdk_bdev *bdev;
495 
496 	assert(w != NULL);
497 
498 	spdk_json_write_array_begin(w);
499 
500 	spdk_json_write_object_begin(w);
501 	spdk_json_write_named_string(w, "method", "set_bdev_options");
502 	spdk_json_write_name(w, "params");
503 	spdk_json_write_object_begin(w);
504 	spdk_json_write_named_uint32(w, "bdev_io_pool_size", g_bdev_opts.bdev_io_pool_size);
505 	spdk_json_write_named_uint32(w, "bdev_io_cache_size", g_bdev_opts.bdev_io_cache_size);
506 	spdk_json_write_object_end(w);
507 	spdk_json_write_object_end(w);
508 
509 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
510 		if (bdev_module->config_json) {
511 			bdev_module->config_json(w);
512 		}
513 	}
514 
515 	TAILQ_FOREACH(bdev, &g_bdev_mgr.bdevs, link) {
516 		spdk_bdev_config_json(bdev, w);
517 	}
518 
519 	spdk_json_write_array_end(w);
520 }
521 
522 static int
523 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
524 {
525 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
526 	struct spdk_bdev_io *bdev_io;
527 	uint32_t i;
528 
529 	STAILQ_INIT(&ch->need_buf_small);
530 	STAILQ_INIT(&ch->need_buf_large);
531 
532 	STAILQ_INIT(&ch->per_thread_cache);
533 	ch->bdev_io_cache_size = g_bdev_opts.bdev_io_cache_size;
534 
535 	/* Pre-populate bdev_io cache to ensure this thread cannot be starved. */
536 	ch->per_thread_cache_count = 0;
537 	for (i = 0; i < ch->bdev_io_cache_size; i++) {
538 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
539 		assert(bdev_io != NULL);
540 		ch->per_thread_cache_count++;
541 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
542 	}
543 
544 	TAILQ_INIT(&ch->shared_resources);
545 	TAILQ_INIT(&ch->io_wait_queue);
546 
547 	return 0;
548 }
549 
550 static void
551 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
552 {
553 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
554 	struct spdk_bdev_io *bdev_io;
555 
556 	if (!STAILQ_EMPTY(&ch->need_buf_small) || !STAILQ_EMPTY(&ch->need_buf_large)) {
557 		SPDK_ERRLOG("Pending I/O list wasn't empty on mgmt channel free\n");
558 	}
559 
560 	if (!TAILQ_EMPTY(&ch->shared_resources)) {
561 		SPDK_ERRLOG("Module channel list wasn't empty on mgmt channel free\n");
562 	}
563 
564 	while (!STAILQ_EMPTY(&ch->per_thread_cache)) {
565 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
566 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
567 		ch->per_thread_cache_count--;
568 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
569 	}
570 
571 	assert(ch->per_thread_cache_count == 0);
572 }
573 
574 static void
575 spdk_bdev_init_complete(int rc)
576 {
577 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
578 	void *cb_arg = g_init_cb_arg;
579 	struct spdk_bdev_module *m;
580 
581 	g_bdev_mgr.init_complete = true;
582 	g_init_cb_fn = NULL;
583 	g_init_cb_arg = NULL;
584 
585 	/*
586 	 * For modules that need to know when subsystem init is complete,
587 	 * inform them now.
588 	 */
589 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
590 		if (m->init_complete) {
591 			m->init_complete();
592 		}
593 	}
594 
595 	cb_fn(cb_arg, rc);
596 }
597 
598 static void
599 spdk_bdev_module_action_complete(void)
600 {
601 	struct spdk_bdev_module *m;
602 
603 	/*
604 	 * Don't finish bdev subsystem initialization if
605 	 * module pre-initialization is still in progress, or
606  * the subsystem has already been initialized.
607 	 */
608 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
609 		return;
610 	}
611 
612 	/*
613 	 * Check all bdev modules for inits/examinations in progress. If any
614 	 * exist, return immediately since we cannot finish bdev subsystem
615 	 * initialization until all are completed.
616 	 */
617 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
618 		if (m->action_in_progress > 0) {
619 			return;
620 		}
621 	}
622 
623 	/*
624 	 * Modules already finished initialization - now that all
625 	 * the bdev modules have finished their asynchronous I/O
626 	 * processing, the entire bdev layer can be marked as complete.
627 	 */
628 	spdk_bdev_init_complete(0);
629 }
630 
631 static void
632 spdk_bdev_module_action_done(struct spdk_bdev_module *module)
633 {
634 	assert(module->action_in_progress > 0);
635 	module->action_in_progress--;
636 	spdk_bdev_module_action_complete();
637 }
638 
639 void
640 spdk_bdev_module_init_done(struct spdk_bdev_module *module)
641 {
642 	spdk_bdev_module_action_done(module);
643 }
644 
645 void
646 spdk_bdev_module_examine_done(struct spdk_bdev_module *module)
647 {
648 	spdk_bdev_module_action_done(module);
649 }
650 
651 static int
652 spdk_bdev_modules_init(void)
653 {
654 	struct spdk_bdev_module *module;
655 	int rc = 0;
656 
657 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
658 		rc = module->module_init();
659 		if (rc != 0) {
660 			break;
661 		}
662 	}
663 
664 	g_bdev_mgr.module_init_complete = true;
665 	return rc;
666 }
667 
668 void
669 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
670 {
671 	struct spdk_conf_section *sp;
672 	struct spdk_bdev_opts bdev_opts;
673 	int32_t bdev_io_pool_size, bdev_io_cache_size;
674 	int cache_size;
675 	int rc = 0;
676 	char mempool_name[32];
677 
678 	assert(cb_fn != NULL);
679 
680 	sp = spdk_conf_find_section(NULL, "Bdev");
681 	if (sp != NULL) {
682 		spdk_bdev_get_opts(&bdev_opts);
683 
684 		bdev_io_pool_size = spdk_conf_section_get_intval(sp, "BdevIoPoolSize");
685 		if (bdev_io_pool_size >= 0) {
686 			bdev_opts.bdev_io_pool_size = bdev_io_pool_size;
687 		}
688 
689 		bdev_io_cache_size = spdk_conf_section_get_intval(sp, "BdevIoCacheSize");
690 		if (bdev_io_cache_size >= 0) {
691 			bdev_opts.bdev_io_cache_size = bdev_io_cache_size;
692 		}
693 
694 		if (spdk_bdev_set_opts(&bdev_opts)) {
695 			spdk_bdev_init_complete(-1);
696 			return;
697 		}
698 
699 		assert(memcmp(&bdev_opts, &g_bdev_opts, sizeof(bdev_opts)) == 0);
700 	}
701 
702 	g_init_cb_fn = cb_fn;
703 	g_init_cb_arg = cb_arg;
704 
705 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
706 
707 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
708 				  g_bdev_opts.bdev_io_pool_size,
709 				  sizeof(struct spdk_bdev_io) +
710 				  spdk_bdev_module_get_max_ctx_size(),
711 				  0,
712 				  SPDK_ENV_SOCKET_ID_ANY);
713 
714 	if (g_bdev_mgr.bdev_io_pool == NULL) {
715 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
716 		spdk_bdev_init_complete(-1);
717 		return;
718 	}
719 
720 	/**
721 	 * Ensure no more than half of the total buffers end up in local caches, by
722 	 *   using spdk_thread_get_count() to determine how many local caches we need
723 	 *   to account for.
724 	 */
725 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_thread_get_count());
726 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
727 
728 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
729 				    BUF_SMALL_POOL_SIZE,
730 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
731 				    cache_size,
732 				    SPDK_ENV_SOCKET_ID_ANY);
733 	if (!g_bdev_mgr.buf_small_pool) {
734 		SPDK_ERRLOG("create rbuf small pool failed\n");
735 		spdk_bdev_init_complete(-1);
736 		return;
737 	}
738 
739 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_thread_get_count());
740 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
741 
742 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
743 				    BUF_LARGE_POOL_SIZE,
744 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
745 				    cache_size,
746 				    SPDK_ENV_SOCKET_ID_ANY);
747 	if (!g_bdev_mgr.buf_large_pool) {
748 		SPDK_ERRLOG("create rbuf large pool failed\n");
749 		spdk_bdev_init_complete(-1);
750 		return;
751 	}
752 
753 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
754 				 NULL);
755 	if (!g_bdev_mgr.zero_buffer) {
756 		SPDK_ERRLOG("create bdev zero buffer failed\n");
757 		spdk_bdev_init_complete(-1);
758 		return;
759 	}
760 
761 #ifdef SPDK_CONFIG_VTUNE
762 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
763 #endif
764 
765 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
766 				spdk_bdev_mgmt_channel_destroy,
767 				sizeof(struct spdk_bdev_mgmt_channel));
768 
769 	rc = spdk_bdev_modules_init();
770 	if (rc != 0) {
771 		SPDK_ERRLOG("bdev modules init failed\n");
772 		spdk_bdev_init_complete(-1);
773 		return;
774 	}
775 
776 	spdk_bdev_module_action_complete();
777 }
778 
779 static void
780 spdk_bdev_mgr_unregister_cb(void *io_device)
781 {
782 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
783 
784 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != g_bdev_opts.bdev_io_pool_size) {
785 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
786 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
787 			    g_bdev_opts.bdev_io_pool_size);
788 	}
789 
790 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
791 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
792 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
793 			    BUF_SMALL_POOL_SIZE);
794 		assert(false);
795 	}
796 
797 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
798 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
799 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
800 			    BUF_LARGE_POOL_SIZE);
801 		assert(false);
802 	}
803 
804 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
805 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
806 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
807 	spdk_dma_free(g_bdev_mgr.zero_buffer);
808 
809 	cb_fn(g_fini_cb_arg);
810 	g_fini_cb_fn = NULL;
811 	g_fini_cb_arg = NULL;
812 }
813 
814 static struct spdk_bdev_module *g_resume_bdev_module = NULL;
815 
816 static void
817 spdk_bdev_module_finish_iter(void *arg)
818 {
819 	struct spdk_bdev_module *bdev_module;
820 
821 	/* Start iterating from the last touched module */
822 	if (!g_resume_bdev_module) {
823 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
824 	} else {
825 		bdev_module = TAILQ_NEXT(g_resume_bdev_module, tailq);
826 	}
827 
828 	while (bdev_module) {
829 		if (bdev_module->async_fini) {
830 			/* Save our place so we can resume later. We must
831 			 * save the variable here, before calling module_fini()
832 			 * below, because in some cases the module may immediately
833 			 * call spdk_bdev_module_finish_done() and re-enter
834 			 * this function to continue iterating. */
835 			g_resume_bdev_module = bdev_module;
836 		}
837 
838 		if (bdev_module->module_fini) {
839 			bdev_module->module_fini();
840 		}
841 
842 		if (bdev_module->async_fini) {
843 			return;
844 		}
845 
846 		bdev_module = TAILQ_NEXT(bdev_module, tailq);
847 	}
848 
849 	g_resume_bdev_module = NULL;
850 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_mgr_unregister_cb);
851 }
852 
853 void
854 spdk_bdev_module_finish_done(void)
855 {
856 	if (spdk_get_thread() != g_fini_thread) {
857 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
858 	} else {
859 		spdk_bdev_module_finish_iter(NULL);
860 	}
861 }
862 
863 static void
864 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
865 {
866 	struct spdk_bdev *bdev = cb_arg;
867 
868 	if (bdeverrno && bdev) {
869 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
870 			     bdev->name);
871 
872 		/*
873 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
874 		 *  bdev; try to recover by manually removing this bdev from the list and continuing
875 		 *  with the next bdev in the list.
876 		 */
877 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
878 	}
879 
880 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
881 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
882 		/*
883 		 * Bdev module finish needs to be deferred as we might be in the middle of some context
884 		 * (like bdev part free) that will use this bdev (or private bdev driver ctx data)
885 		 * after returning.
886 		 */
887 		spdk_thread_send_msg(spdk_get_thread(), spdk_bdev_module_finish_iter, NULL);
888 		return;
889 	}
890 
891 	/*
892 	 * Unregister the first bdev in the list.
893 	 *
894 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
895 	 *  calling the remove_cb of the descriptors first.
896 	 *
897 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
898 	 *  will be called again via the unregister completion callback to continue the cleanup
899 	 *  process with the next bdev.
900 	 */
901 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
902 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
903 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
904 }
905 
906 void
907 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
908 {
909 	assert(cb_fn != NULL);
910 
911 	g_fini_thread = spdk_get_thread();
912 
913 	g_fini_cb_fn = cb_fn;
914 	g_fini_cb_arg = cb_arg;
915 
916 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
917 }
918 
919 static struct spdk_bdev_io *
920 spdk_bdev_get_io(struct spdk_bdev_channel *channel)
921 {
922 	struct spdk_bdev_mgmt_channel *ch = channel->shared_resource->mgmt_ch;
923 	struct spdk_bdev_io *bdev_io;
924 
925 	if (ch->per_thread_cache_count > 0) {
926 		bdev_io = STAILQ_FIRST(&ch->per_thread_cache);
927 		STAILQ_REMOVE_HEAD(&ch->per_thread_cache, internal.buf_link);
928 		ch->per_thread_cache_count--;
929 	} else if (spdk_unlikely(!TAILQ_EMPTY(&ch->io_wait_queue))) {
930 		/*
931 		 * Don't try to look for bdev_ios in the global pool if there are
932 		 * waiters on bdev_ios - we don't want this caller to jump the line.
933 		 */
934 		bdev_io = NULL;
935 	} else {
936 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
937 	}
938 
939 	return bdev_io;
940 }
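/*
 * Note: a NULL return here surfaces to the public submission APIs as -ENOMEM;
 * callers can retry later, for example by queueing a struct spdk_bdev_io_wait_entry,
 * which is serviced from the io_wait_queue handling in spdk_bdev_free_io() below.
 */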
941 
942 void
943 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
944 {
945 	struct spdk_bdev_mgmt_channel *ch = bdev_io->internal.ch->shared_resource->mgmt_ch;
946 
947 	assert(bdev_io != NULL);
948 	assert(bdev_io->internal.status != SPDK_BDEV_IO_STATUS_PENDING);
949 
950 	if (bdev_io->internal.buf != NULL) {
951 		spdk_bdev_io_put_buf(bdev_io);
952 	}
953 
954 	if (ch->per_thread_cache_count < ch->bdev_io_cache_size) {
955 		ch->per_thread_cache_count++;
956 		STAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, internal.buf_link);
957 		while (ch->per_thread_cache_count > 0 && !TAILQ_EMPTY(&ch->io_wait_queue)) {
958 			struct spdk_bdev_io_wait_entry *entry;
959 
960 			entry = TAILQ_FIRST(&ch->io_wait_queue);
961 			TAILQ_REMOVE(&ch->io_wait_queue, entry, link);
962 			entry->cb_fn(entry->cb_arg);
963 		}
964 	} else {
965 		/* We should never have a full cache with entries on the io wait queue. */
966 		assert(TAILQ_EMPTY(&ch->io_wait_queue));
967 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
968 	}
969 }
970 
971 static uint64_t
972 _spdk_bdev_get_io_size_in_byte(struct spdk_bdev_io *bdev_io)
973 {
974 	struct spdk_bdev	*bdev = bdev_io->bdev;
975 
976 	switch (bdev_io->type) {
977 	case SPDK_BDEV_IO_TYPE_NVME_ADMIN:
978 	case SPDK_BDEV_IO_TYPE_NVME_IO:
979 	case SPDK_BDEV_IO_TYPE_NVME_IO_MD:
980 		return bdev_io->u.nvme_passthru.nbytes;
981 	case SPDK_BDEV_IO_TYPE_READ:
982 	case SPDK_BDEV_IO_TYPE_WRITE:
983 	case SPDK_BDEV_IO_TYPE_UNMAP:
984 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
985 		return bdev_io->u.bdev.num_blocks * bdev->blocklen;
986 	default:
987 		return 0;
988 	}
989 }
990 
991 static void
992 _spdk_bdev_qos_io_submit(struct spdk_bdev_channel *ch)
993 {
994 	struct spdk_bdev_io		*bdev_io = NULL;
995 	struct spdk_bdev		*bdev = ch->bdev;
996 	struct spdk_bdev_qos		*qos = bdev->qos;
997 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
998 
999 	while (!TAILQ_EMPTY(&qos->queued)) {
1000 		if (qos->max_ios_per_timeslice > 0 &&
1001 		    qos->io_submitted_this_timeslice >= qos->max_ios_per_timeslice) {
1002 			break;
1003 		}
1004 
1005 		if (qos->max_byte_per_timeslice > 0 &&
1006 		    qos->byte_submitted_this_timeslice >= qos->max_byte_per_timeslice) {
1007 			break;
1008 		}
1009 
1010 		bdev_io = TAILQ_FIRST(&qos->queued);
1011 		TAILQ_REMOVE(&qos->queued, bdev_io, link);
1012 		qos->io_submitted_this_timeslice++;
1013 		qos->byte_submitted_this_timeslice += _spdk_bdev_get_io_size_in_byte(bdev_io);
1014 		ch->io_outstanding++;
1015 		shared_resource->io_outstanding++;
1016 		bdev->fn_table->submit_request(ch->channel, bdev_io);
1017 	}
1018 }
1019 
1020 static void
1021 _spdk_bdev_io_submit(void *ctx)
1022 {
1023 	struct spdk_bdev_io *bdev_io = ctx;
1024 	struct spdk_bdev *bdev = bdev_io->bdev;
1025 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1026 	struct spdk_io_channel *ch = bdev_ch->channel;
1027 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
1028 
1029 	bdev_io->internal.submit_tsc = spdk_get_ticks();
1030 	bdev_ch->io_outstanding++;
1031 	shared_resource->io_outstanding++;
1032 	bdev_io->internal.in_submit_request = true;
1033 	if (spdk_likely(bdev_ch->flags == 0)) {
1034 		if (spdk_likely(TAILQ_EMPTY(&shared_resource->nomem_io))) {
1035 			bdev->fn_table->submit_request(ch, bdev_io);
1036 		} else {
1037 			bdev_ch->io_outstanding--;
1038 			shared_resource->io_outstanding--;
1039 			TAILQ_INSERT_TAIL(&shared_resource->nomem_io, bdev_io, link);
1040 		}
1041 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
1042 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1043 	} else if (bdev_ch->flags & BDEV_CH_QOS_ENABLED) {
1044 		bdev_ch->io_outstanding--;
1045 		shared_resource->io_outstanding--;
1046 		TAILQ_INSERT_TAIL(&bdev->qos->queued, bdev_io, link);
1047 		_spdk_bdev_qos_io_submit(bdev_ch);
1048 	} else {
1049 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
1050 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1051 	}
1052 	bdev_io->internal.in_submit_request = false;
1053 }
1054 
1055 static void
1056 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
1057 {
1058 	struct spdk_bdev *bdev = bdev_io->bdev;
1059 	struct spdk_thread *thread = spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
1060 
1061 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1062 
1063 	if (bdev_io->internal.ch->flags & BDEV_CH_QOS_ENABLED) {
1064 		if (thread == bdev->qos->thread) {
1065 			_spdk_bdev_io_submit(bdev_io);
1066 		} else {
1067 			bdev_io->internal.io_submit_ch = bdev_io->internal.ch;
1068 			bdev_io->internal.ch = bdev->qos->ch;
1069 			spdk_thread_send_msg(bdev->qos->thread, _spdk_bdev_io_submit, bdev_io);
1070 		}
1071 	} else {
1072 		_spdk_bdev_io_submit(bdev_io);
1073 	}
1074 }
1075 
1076 static void
1077 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
1078 {
1079 	struct spdk_bdev *bdev = bdev_io->bdev;
1080 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
1081 	struct spdk_io_channel *ch = bdev_ch->channel;
1082 
1083 	assert(bdev_io->internal.status == SPDK_BDEV_IO_STATUS_PENDING);
1084 
1085 	bdev_io->internal.in_submit_request = true;
1086 	bdev->fn_table->submit_request(ch, bdev_io);
1087 	bdev_io->internal.in_submit_request = false;
1088 }
1089 
1090 static void
1091 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
1092 		  struct spdk_bdev *bdev, void *cb_arg,
1093 		  spdk_bdev_io_completion_cb cb)
1094 {
1095 	bdev_io->bdev = bdev;
1096 	bdev_io->internal.caller_ctx = cb_arg;
1097 	bdev_io->internal.cb = cb;
1098 	bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
1099 	bdev_io->internal.in_submit_request = false;
1100 	bdev_io->internal.buf = NULL;
1101 	bdev_io->internal.io_submit_ch = NULL;
1102 }
1103 
1104 bool
1105 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
1106 {
1107 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
1108 }
1109 
1110 int
1111 spdk_bdev_dump_info_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1112 {
1113 	if (bdev->fn_table->dump_info_json) {
1114 		return bdev->fn_table->dump_info_json(bdev->ctxt, w);
1115 	}
1116 
1117 	return 0;
1118 }
1119 
1120 void
1121 spdk_bdev_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1122 {
1123 	assert(bdev != NULL);
1124 	assert(w != NULL);
1125 
1126 	if (bdev->fn_table->write_config_json) {
1127 		bdev->fn_table->write_config_json(bdev, w);
1128 	} else {
1129 		spdk_json_write_object_begin(w);
1130 		spdk_json_write_named_string(w, "name", bdev->name);
1131 		spdk_json_write_object_end(w);
1132 	}
1133 }
1134 
1135 static void
1136 spdk_bdev_qos_update_max_quota_per_timeslice(struct spdk_bdev_qos *qos)
1137 {
1138 	uint64_t max_ios_per_timeslice = 0, max_byte_per_timeslice = 0;
1139 
1140 	if (qos->iops_rate_limit > 0) {
1141 		max_ios_per_timeslice = qos->iops_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1142 					SPDK_BDEV_SEC_TO_USEC;
1143 		qos->max_ios_per_timeslice = spdk_max(max_ios_per_timeslice,
1144 						      SPDK_BDEV_QOS_MIN_IO_PER_TIMESLICE);
1145 	}
1146 
1147 	if (qos->byte_rate_limit > 0) {
1148 		max_byte_per_timeslice = qos->byte_rate_limit * SPDK_BDEV_QOS_TIMESLICE_IN_USEC /
1149 					 SPDK_BDEV_SEC_TO_USEC;
1150 		qos->max_byte_per_timeslice = spdk_max(max_byte_per_timeslice,
1151 						       SPDK_BDEV_QOS_MIN_BYTE_PER_TIMESLICE);
1152 	}
1153 }
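/*
 * Example of the math above: with iops_rate_limit == 10000 and the 1000 usec
 * timeslice, max_ios_per_timeslice == 10000 * 1000 / 1000000 == 10; with
 * byte_rate_limit == 10 * 1024 * 1024 (10 MB/s), max_byte_per_timeslice ==
 * 10485760 * 1000 / 1000000 == 10485.  Each result is raised to the
 * corresponding SPDK_BDEV_QOS_MIN_*_PER_TIMESLICE value if it comes out smaller.
 */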
1154 
1155 static int
1156 spdk_bdev_channel_poll_qos(void *arg)
1157 {
1158 	struct spdk_bdev_qos *qos = arg;
1159 
1160 	/* Reset for next round of rate limiting */
1161 	qos->io_submitted_this_timeslice = 0;
1162 	qos->byte_submitted_this_timeslice = 0;
1163 
1164 	_spdk_bdev_qos_io_submit(qos->ch);
1165 
1166 	return -1;
1167 }
1168 
1169 static void
1170 _spdk_bdev_channel_destroy_resource(struct spdk_bdev_channel *ch)
1171 {
1172 	struct spdk_bdev_shared_resource *shared_resource;
1173 
1174 	if (!ch) {
1175 		return;
1176 	}
1177 
1178 	if (ch->channel) {
1179 		spdk_put_io_channel(ch->channel);
1180 	}
1181 
1182 	assert(ch->io_outstanding == 0);
1183 
1184 	shared_resource = ch->shared_resource;
1185 	if (shared_resource) {
1186 		assert(ch->io_outstanding == 0);
1187 		assert(shared_resource->ref > 0);
1188 		shared_resource->ref--;
1189 		if (shared_resource->ref == 0) {
1190 			assert(shared_resource->io_outstanding == 0);
1191 			TAILQ_REMOVE(&shared_resource->mgmt_ch->shared_resources, shared_resource, link);
1192 			spdk_put_io_channel(spdk_io_channel_from_ctx(shared_resource->mgmt_ch));
1193 			free(shared_resource);
1194 		}
1195 	}
1196 }
1197 
1198 /* Caller must hold bdev->mutex. */
1199 static int
1200 _spdk_bdev_enable_qos(struct spdk_bdev *bdev, struct spdk_bdev_channel *ch)
1201 {
1202 	struct spdk_bdev_qos *qos = bdev->qos;
1203 
1204 	/* Rate limiting on this bdev enabled */
1205 	if (qos) {
1206 		if (qos->ch == NULL) {
1207 			struct spdk_io_channel *io_ch;
1208 
1209 			SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Selecting channel %p as QoS channel for bdev %s on thread %p\n", ch,
1210 				      bdev->name, spdk_get_thread());
1211 
1212 			/* No qos channel has been selected, so set one up */
1213 
1214 			/* Take another reference to ch */
1215 			io_ch = spdk_get_io_channel(__bdev_to_io_dev(bdev));
1216 			qos->ch = ch;
1217 
1218 			qos->thread = spdk_io_channel_get_thread(io_ch);
1219 
1220 			TAILQ_INIT(&qos->queued);
1221 			spdk_bdev_qos_update_max_quota_per_timeslice(qos);
1222 			qos->io_submitted_this_timeslice = 0;
1223 			qos->byte_submitted_this_timeslice = 0;
1224 
1225 			qos->poller = spdk_poller_register(spdk_bdev_channel_poll_qos,
1226 							   qos,
1227 							   SPDK_BDEV_QOS_TIMESLICE_IN_USEC);
1228 		}
1229 
1230 		ch->flags |= BDEV_CH_QOS_ENABLED;
1231 	}
1232 
1233 	return 0;
1234 }
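/*
 * Note: the first channel that reaches this point on a QoS-enabled bdev becomes
 * the QoS "master" channel - its thread owns the poller, and spdk_bdev_io_submit()
 * funnels QoS-managed I/O from other threads to that thread via spdk_thread_send_msg().
 */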
1235 
1236 static int
1237 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
1238 {
1239 	struct spdk_bdev		*bdev = __bdev_from_io_dev(io_device);
1240 	struct spdk_bdev_channel	*ch = ctx_buf;
1241 	struct spdk_io_channel		*mgmt_io_ch;
1242 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1243 	struct spdk_bdev_shared_resource *shared_resource;
1244 
1245 	ch->bdev = bdev;
1246 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
1247 	if (!ch->channel) {
1248 		return -1;
1249 	}
1250 
1251 	mgmt_io_ch = spdk_get_io_channel(&g_bdev_mgr);
1252 	if (!mgmt_io_ch) {
1253 		return -1;
1254 	}
1255 
1256 	mgmt_ch = spdk_io_channel_get_ctx(mgmt_io_ch);
1257 	TAILQ_FOREACH(shared_resource, &mgmt_ch->shared_resources, link) {
1258 		if (shared_resource->shared_ch == ch->channel) {
1259 			spdk_put_io_channel(mgmt_io_ch);
1260 			shared_resource->ref++;
1261 			break;
1262 		}
1263 	}
1264 
1265 	if (shared_resource == NULL) {
1266 		shared_resource = calloc(1, sizeof(*shared_resource));
1267 		if (shared_resource == NULL) {
1268 			spdk_put_io_channel(mgmt_io_ch);
1269 			return -1;
1270 		}
1271 
1272 		shared_resource->mgmt_ch = mgmt_ch;
1273 		shared_resource->io_outstanding = 0;
1274 		TAILQ_INIT(&shared_resource->nomem_io);
1275 		shared_resource->nomem_threshold = 0;
1276 		shared_resource->shared_ch = ch->channel;
1277 		shared_resource->ref = 1;
1278 		TAILQ_INSERT_TAIL(&mgmt_ch->shared_resources, shared_resource, link);
1279 	}
1280 
1281 	memset(&ch->stat, 0, sizeof(ch->stat));
1282 	ch->stat.ticks_rate = spdk_get_ticks_hz();
1283 	ch->io_outstanding = 0;
1284 	TAILQ_INIT(&ch->queued_resets);
1285 	ch->flags = 0;
1286 	ch->shared_resource = shared_resource;
1287 
1288 #ifdef SPDK_CONFIG_VTUNE
1289 	{
1290 		char *name;
1291 		__itt_init_ittlib(NULL, 0);
1292 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
1293 		if (!name) {
1294 			_spdk_bdev_channel_destroy_resource(ch);
1295 			return -1;
1296 		}
1297 		ch->handle = __itt_string_handle_create(name);
1298 		free(name);
1299 		ch->start_tsc = spdk_get_ticks();
1300 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
1301 		memset(&ch->prev_stat, 0, sizeof(ch->prev_stat));
1302 	}
1303 #endif
1304 
1305 	pthread_mutex_lock(&bdev->mutex);
1306 
1307 	if (_spdk_bdev_enable_qos(bdev, ch)) {
1308 		_spdk_bdev_channel_destroy_resource(ch);
1309 		pthread_mutex_unlock(&bdev->mutex);
1310 		return -1;
1311 	}
1312 
1313 	pthread_mutex_unlock(&bdev->mutex);
1314 
1315 	return 0;
1316 }
1317 
1318 /*
1319  * Abort I/O that are waiting on a data buffer.  These types of I/O are
1320  *  linked using the spdk_bdev_io internal.buf_link TAILQ_ENTRY.
1321  */
1322 static void
1323 _spdk_bdev_abort_buf_io(bdev_io_stailq_t *queue, struct spdk_bdev_channel *ch)
1324 {
1325 	bdev_io_stailq_t tmp;
1326 	struct spdk_bdev_io *bdev_io;
1327 
1328 	STAILQ_INIT(&tmp);
1329 
1330 	while (!STAILQ_EMPTY(queue)) {
1331 		bdev_io = STAILQ_FIRST(queue);
1332 		STAILQ_REMOVE_HEAD(queue, internal.buf_link);
1333 		if (bdev_io->internal.ch == ch) {
1334 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1335 		} else {
1336 			STAILQ_INSERT_TAIL(&tmp, bdev_io, internal.buf_link);
1337 		}
1338 	}
1339 
1340 	STAILQ_SWAP(&tmp, queue, spdk_bdev_io);
1341 }
1342 
1343 /*
1344  * Abort I/O that are queued waiting for submission.  These types of I/O are
1345  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
1346  */
1347 static void
1348 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
1349 {
1350 	struct spdk_bdev_io *bdev_io, *tmp;
1351 
1352 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
1353 		if (bdev_io->internal.ch == ch) {
1354 			TAILQ_REMOVE(queue, bdev_io, link);
1355 			/*
1356 			 * spdk_bdev_io_complete() assumes that the completed I/O had
1357 			 *  been submitted to the bdev module.  Since in this case it
1358 			 *  hadn't, bump io_outstanding to account for the decrement
1359 			 *  that spdk_bdev_io_complete() will do.
1360 			 */
1361 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
1362 				ch->io_outstanding++;
1363 				ch->shared_resource->io_outstanding++;
1364 			}
1365 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1366 		}
1367 	}
1368 }
1369 
1370 static void
1371 spdk_bdev_qos_channel_destroy(void *cb_arg)
1372 {
1373 	struct spdk_bdev_qos *qos = cb_arg;
1374 
1375 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
1376 	spdk_poller_unregister(&qos->poller);
1377 
1378 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Free QoS %p.\n", qos);
1379 
1380 	free(qos);
1381 }
1382 
1383 static int
1384 spdk_bdev_qos_destroy(struct spdk_bdev *bdev)
1385 {
1386 	/*
1387 	 * Cleanly shutting down the QoS poller is tricky, because
1388 	 * during the asynchronous operation the user could open
1389 	 * a new descriptor and create a new channel, spawning
1390 	 * a new QoS poller.
1391 	 *
1392 	 * The strategy is to create a new QoS structure here and swap it
1393 	 * in. The shutdown path then continues to refer to the old one
1394 	 * until it completes and then releases it.
1395 	 */
1396 	struct spdk_bdev_qos *new_qos, *old_qos;
1397 
1398 	old_qos = bdev->qos;
1399 
1400 	new_qos = calloc(1, sizeof(*new_qos));
1401 	if (!new_qos) {
1402 		SPDK_ERRLOG("Unable to allocate memory to shut down QoS.\n");
1403 		return -ENOMEM;
1404 	}
1405 
1406 	/* Copy the old QoS data into the newly allocated structure */
1407 	memcpy(new_qos, old_qos, sizeof(*new_qos));
1408 
1409 	/* Zero out the key parts of the QoS structure */
1410 	new_qos->ch = NULL;
1411 	new_qos->thread = NULL;
1412 	new_qos->max_ios_per_timeslice = 0;
1413 	new_qos->max_byte_per_timeslice = 0;
1414 	new_qos->io_submitted_this_timeslice = 0;
1415 	new_qos->byte_submitted_this_timeslice = 0;
1416 	new_qos->poller = NULL;
1417 	TAILQ_INIT(&new_qos->queued);
1418 
1419 	bdev->qos = new_qos;
1420 
1421 	spdk_thread_send_msg(old_qos->thread, spdk_bdev_qos_channel_destroy,
1422 			     old_qos);
1423 
1424 	/* It is safe to continue with destroying the bdev even though the QoS channel hasn't
1425 	 * been destroyed yet. The destruction path will end up waiting for the final
1426 	 * channel to be put before it releases resources. */
1427 
1428 	return 0;
1429 }
1430 
1431 static void
1432 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
1433 {
1434 	struct spdk_bdev_channel	*ch = ctx_buf;
1435 	struct spdk_bdev_mgmt_channel	*mgmt_ch;
1436 	struct spdk_bdev_shared_resource *shared_resource = ch->shared_resource;
1437 
1438 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Destroying channel %p for bdev %s on thread %p\n", ch, ch->bdev->name,
1439 		      spdk_get_thread());
1440 
1441 	mgmt_ch = shared_resource->mgmt_ch;
1442 
1443 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
1444 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, ch);
1445 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_small, ch);
1446 	_spdk_bdev_abort_buf_io(&mgmt_ch->need_buf_large, ch);
1447 
1448 	_spdk_bdev_channel_destroy_resource(ch);
1449 }
1450 
1451 int
1452 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
1453 {
1454 	struct spdk_bdev_alias *tmp;
1455 
1456 	if (alias == NULL) {
1457 		SPDK_ERRLOG("Empty alias passed\n");
1458 		return -EINVAL;
1459 	}
1460 
1461 	if (spdk_bdev_get_by_name(alias)) {
1462 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
1463 		return -EEXIST;
1464 	}
1465 
1466 	tmp = calloc(1, sizeof(*tmp));
1467 	if (tmp == NULL) {
1468 		SPDK_ERRLOG("Unable to allocate alias\n");
1469 		return -ENOMEM;
1470 	}
1471 
1472 	tmp->alias = strdup(alias);
1473 	if (tmp->alias == NULL) {
1474 		free(tmp);
1475 		SPDK_ERRLOG("Unable to allocate alias\n");
1476 		return -ENOMEM;
1477 	}
1478 
1479 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
1480 
1481 	return 0;
1482 }
1483 
1484 int
1485 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
1486 {
1487 	struct spdk_bdev_alias *tmp;
1488 
1489 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
1490 		if (strcmp(alias, tmp->alias) == 0) {
1491 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
1492 			free(tmp->alias);
1493 			free(tmp);
1494 			return 0;
1495 		}
1496 	}
1497 
1498 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1499 
1500 	return -ENOENT;
1501 }
1502 
1503 struct spdk_io_channel *
1504 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1505 {
1506 	return spdk_get_io_channel(__bdev_to_io_dev(desc->bdev));
1507 }
1508 
1509 const char *
1510 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1511 {
1512 	return bdev->name;
1513 }
1514 
1515 const char *
1516 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1517 {
1518 	return bdev->product_name;
1519 }
1520 
1521 const struct spdk_bdev_aliases_list *
1522 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1523 {
1524 	return &bdev->aliases;
1525 }
1526 
1527 uint32_t
1528 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1529 {
1530 	return bdev->blocklen;
1531 }
1532 
1533 uint64_t
1534 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1535 {
1536 	return bdev->blockcnt;
1537 }
1538 
1539 uint64_t
1540 spdk_bdev_get_qos_ios_per_sec(struct spdk_bdev *bdev)
1541 {
1542 	uint64_t iops_rate_limit = 0;
1543 
1544 	pthread_mutex_lock(&bdev->mutex);
1545 	if (bdev->qos) {
1546 		iops_rate_limit = bdev->qos->iops_rate_limit;
1547 	}
1548 	pthread_mutex_unlock(&bdev->mutex);
1549 
1550 	return iops_rate_limit;
1551 }
1552 
1553 size_t
1554 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1555 {
1556 	/* TODO: push this logic down to the bdev modules */
1557 	if (bdev->need_aligned_buffer) {
1558 		return bdev->blocklen;
1559 	}
1560 
1561 	return 1;
1562 }
1563 
1564 uint32_t
1565 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1566 {
1567 	return bdev->optimal_io_boundary;
1568 }
1569 
1570 bool
1571 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1572 {
1573 	return bdev->write_cache;
1574 }
1575 
1576 const struct spdk_uuid *
1577 spdk_bdev_get_uuid(const struct spdk_bdev *bdev)
1578 {
1579 	return &bdev->uuid;
1580 }
1581 
1582 int
1583 spdk_bdev_notify_blockcnt_change(struct spdk_bdev *bdev, uint64_t size)
1584 {
1585 	int ret;
1586 
1587 	pthread_mutex_lock(&bdev->mutex);
1588 
1589 	/* bdev has open descriptors */
1590 	if (!TAILQ_EMPTY(&bdev->open_descs) &&
1591 	    bdev->blockcnt > size) {
1592 		ret = -EBUSY;
1593 	} else {
1594 		bdev->blockcnt = size;
1595 		ret = 0;
1596 	}
1597 
1598 	pthread_mutex_unlock(&bdev->mutex);
1599 
1600 	return ret;
1601 }
1602 
1603 /*
1604  * Convert I/O offset and length from bytes to blocks.
1605  *
1606  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1607  */
1608 static uint64_t
1609 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1610 			  uint64_t num_bytes, uint64_t *num_blocks)
1611 {
1612 	uint32_t block_size = bdev->blocklen;
1613 
1614 	*offset_blocks = offset_bytes / block_size;
1615 	*num_blocks = num_bytes / block_size;
1616 
1617 	return (offset_bytes % block_size) | (num_bytes % block_size);
1618 }
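/*
 * Worked example: with blocklen == 512, offset_bytes == 4096 and num_bytes == 8192
 * yield offset_blocks == 8, num_blocks == 16 and a return value of 0.  With
 * num_bytes == 1000 the OR of the remainders is non-zero, and the callers below
 * translate that into -EINVAL.
 */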
1619 
1620 static bool
1621 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1622 {
1623 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this indicates
1624 	 * that an overflow occurred and the offset has wrapped around. */
1625 	if (offset_blocks + num_blocks < offset_blocks) {
1626 		return false;
1627 	}
1628 
1629 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1630 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1631 		return false;
1632 	}
1633 
1634 	return true;
1635 }
1636 
1637 int
1638 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1639 	       void *buf, uint64_t offset, uint64_t nbytes,
1640 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1641 {
1642 	uint64_t offset_blocks, num_blocks;
1643 
1644 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1645 		return -EINVAL;
1646 	}
1647 
1648 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1649 }
1650 
1651 int
1652 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1653 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1654 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1655 {
1656 	struct spdk_bdev *bdev = desc->bdev;
1657 	struct spdk_bdev_io *bdev_io;
1658 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1659 
1660 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1661 		return -EINVAL;
1662 	}
1663 
1664 	bdev_io = spdk_bdev_get_io(channel);
1665 	if (!bdev_io) {
1666 		return -ENOMEM;
1667 	}
1668 
1669 	bdev_io->internal.ch = channel;
1670 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1671 	bdev_io->u.bdev.iov.iov_base = buf;
1672 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1673 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1674 	bdev_io->u.bdev.iovcnt = 1;
1675 	bdev_io->u.bdev.num_blocks = num_blocks;
1676 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1677 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1678 
1679 	spdk_bdev_io_submit(bdev_io);
1680 	return 0;
1681 }
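/*
 * Usage sketch (illustrative; desc, io_ch and buf are assumed to exist): submit a
 * one-block read and handle back-pressure from an exhausted bdev_io pool.
 *
 *	static void read_done(struct spdk_bdev_io *bdev_io, bool success, void *ctx)
 *	{
 *		spdk_bdev_free_io(bdev_io);
 *	}
 *
 *	rc = spdk_bdev_read_blocks(desc, io_ch, buf, 0, 1, read_done, NULL);
 *	if (rc == -ENOMEM) {
 *		// all bdev_io objects are in use - retry later
 *	}
 */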
1682 
1683 int
1684 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1685 		struct iovec *iov, int iovcnt,
1686 		uint64_t offset, uint64_t nbytes,
1687 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1688 {
1689 	uint64_t offset_blocks, num_blocks;
1690 
1691 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1692 		return -EINVAL;
1693 	}
1694 
1695 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1696 }
1697 
1698 int spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1699 			   struct iovec *iov, int iovcnt,
1700 			   uint64_t offset_blocks, uint64_t num_blocks,
1701 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1702 {
1703 	struct spdk_bdev *bdev = desc->bdev;
1704 	struct spdk_bdev_io *bdev_io;
1705 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1706 
1707 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1708 		return -EINVAL;
1709 	}
1710 
1711 	bdev_io = spdk_bdev_get_io(channel);
1712 	if (!bdev_io) {
1713 		return -ENOMEM;
1714 	}
1715 
1716 	bdev_io->internal.ch = channel;
1717 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1718 	bdev_io->u.bdev.iovs = iov;
1719 	bdev_io->u.bdev.iovcnt = iovcnt;
1720 	bdev_io->u.bdev.num_blocks = num_blocks;
1721 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1722 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1723 
1724 	spdk_bdev_io_submit(bdev_io);
1725 	return 0;
1726 }
1727 
1728 int
1729 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1730 		void *buf, uint64_t offset, uint64_t nbytes,
1731 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1732 {
1733 	uint64_t offset_blocks, num_blocks;
1734 
1735 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1736 		return -EINVAL;
1737 	}
1738 
1739 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1740 }
1741 
1742 int
1743 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1744 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1745 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1746 {
1747 	struct spdk_bdev *bdev = desc->bdev;
1748 	struct spdk_bdev_io *bdev_io;
1749 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1750 
1751 	if (!desc->write) {
1752 		return -EBADF;
1753 	}
1754 
1755 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1756 		return -EINVAL;
1757 	}
1758 
1759 	bdev_io = spdk_bdev_get_io(channel);
1760 	if (!bdev_io) {
1761 		return -ENOMEM;
1762 	}
1763 
1764 	bdev_io->internal.ch = channel;
1765 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1766 	bdev_io->u.bdev.iov.iov_base = buf;
1767 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1768 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1769 	bdev_io->u.bdev.iovcnt = 1;
1770 	bdev_io->u.bdev.num_blocks = num_blocks;
1771 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1772 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1773 
1774 	spdk_bdev_io_submit(bdev_io);
1775 	return 0;
1776 }
1777 
1778 int
1779 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1780 		 struct iovec *iov, int iovcnt,
1781 		 uint64_t offset, uint64_t len,
1782 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1783 {
1784 	uint64_t offset_blocks, num_blocks;
1785 
1786 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1787 		return -EINVAL;
1788 	}
1789 
1790 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1791 }
1792 
1793 int
1794 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1795 			struct iovec *iov, int iovcnt,
1796 			uint64_t offset_blocks, uint64_t num_blocks,
1797 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1798 {
1799 	struct spdk_bdev *bdev = desc->bdev;
1800 	struct spdk_bdev_io *bdev_io;
1801 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1802 
1803 	if (!desc->write) {
1804 		return -EBADF;
1805 	}
1806 
1807 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1808 		return -EINVAL;
1809 	}
1810 
1811 	bdev_io = spdk_bdev_get_io(channel);
1812 	if (!bdev_io) {
1813 		return -ENOMEM;
1814 	}
1815 
1816 	bdev_io->internal.ch = channel;
1817 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1818 	bdev_io->u.bdev.iovs = iov;
1819 	bdev_io->u.bdev.iovcnt = iovcnt;
1820 	bdev_io->u.bdev.num_blocks = num_blocks;
1821 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1822 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1823 
1824 	spdk_bdev_io_submit(bdev_io);
1825 	return 0;
1826 }
1827 
1828 int
1829 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1830 		       uint64_t offset, uint64_t len,
1831 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1832 {
1833 	uint64_t offset_blocks, num_blocks;
1834 
1835 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1836 		return -EINVAL;
1837 	}
1838 
1839 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1840 }
1841 
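/*
 * The block-based write-zeroes path below has two cases.  If the backing
 * module supports SPDK_BDEV_IO_TYPE_WRITE_ZEROES, the request is passed
 * through unchanged.  Otherwise it is emulated with regular writes from the
 * shared zero buffer, at most ZERO_BUFFER_SIZE bytes at a time, and
 * spdk_bdev_write_zeroes_split() (defined later in this file) re-submits the
 * remainder from each completion until split_remaining_num_blocks reaches
 * zero.  For example (illustrative numbers), zeroing a range three times
 * larger than ZERO_BUFFER_SIZE on such a bdev results in three sequential
 * WRITE submissions that reuse the same bdev_io.
 */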
1842 int
1843 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1844 			      uint64_t offset_blocks, uint64_t num_blocks,
1845 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1846 {
1847 	struct spdk_bdev *bdev = desc->bdev;
1848 	struct spdk_bdev_io *bdev_io;
1849 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1850 	uint64_t len;
1851 	bool split_request = false;
1852 
1853 	if (!desc->write) {
1854 		return -EBADF;
1855 	}
1856 
1857 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1858 		return -EINVAL;
1859 	}
1860 
1861 	bdev_io = spdk_bdev_get_io(channel);
1862 
1863 	if (!bdev_io) {
1864 		return -ENOMEM;
1865 	}
1866 
1867 	bdev_io->internal.ch = channel;
1868 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1869 
1870 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1871 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1872 		bdev_io->u.bdev.num_blocks = num_blocks;
1873 		bdev_io->u.bdev.iovs = NULL;
1874 		bdev_io->u.bdev.iovcnt = 0;
1875 
1876 	} else {
1877 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1878 
1879 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1880 
1881 		if (len > ZERO_BUFFER_SIZE) {
1882 			split_request = true;
1883 			len = ZERO_BUFFER_SIZE;
1884 		}
1885 
1886 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1887 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1888 		bdev_io->u.bdev.iov.iov_len = len;
1889 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1890 		bdev_io->u.bdev.iovcnt = 1;
1891 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1892 		bdev_io->u.bdev.split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1893 		bdev_io->u.bdev.split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1894 	}
1895 
1896 	if (split_request) {
1897 		bdev_io->u.bdev.stored_user_cb = cb;
1898 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1899 	} else {
1900 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1901 	}
1902 	spdk_bdev_io_submit(bdev_io);
1903 	return 0;
1904 }
1905 
1906 int
1907 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1908 		uint64_t offset, uint64_t nbytes,
1909 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1910 {
1911 	uint64_t offset_blocks, num_blocks;
1912 
1913 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1914 		return -EINVAL;
1915 	}
1916 
1917 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1918 }
1919 
1920 int
1921 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1922 		       uint64_t offset_blocks, uint64_t num_blocks,
1923 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1924 {
1925 	struct spdk_bdev *bdev = desc->bdev;
1926 	struct spdk_bdev_io *bdev_io;
1927 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1928 
1929 	if (!desc->write) {
1930 		return -EBADF;
1931 	}
1932 
1933 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1934 		return -EINVAL;
1935 	}
1936 
1937 	if (num_blocks == 0) {
1938 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1939 		return -EINVAL;
1940 	}
1941 
1942 	bdev_io = spdk_bdev_get_io(channel);
1943 	if (!bdev_io) {
1944 		return -ENOMEM;
1945 	}
1946 
1947 	bdev_io->internal.ch = channel;
1948 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1949 	bdev_io->u.bdev.iov.iov_base = NULL;
1950 	bdev_io->u.bdev.iov.iov_len = 0;
1951 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1952 	bdev_io->u.bdev.iovcnt = 1;
1953 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1954 	bdev_io->u.bdev.num_blocks = num_blocks;
1955 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1956 
1957 	spdk_bdev_io_submit(bdev_io);
1958 	return 0;
1959 }
1960 
1961 int
1962 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1963 		uint64_t offset, uint64_t length,
1964 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1965 {
1966 	uint64_t offset_blocks, num_blocks;
1967 
1968 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1969 		return -EINVAL;
1970 	}
1971 
1972 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1973 }
1974 
1975 int
1976 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1977 		       uint64_t offset_blocks, uint64_t num_blocks,
1978 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1979 {
1980 	struct spdk_bdev *bdev = desc->bdev;
1981 	struct spdk_bdev_io *bdev_io;
1982 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1983 
1984 	if (!desc->write) {
1985 		return -EBADF;
1986 	}
1987 
1988 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1989 		return -EINVAL;
1990 	}
1991 
1992 	bdev_io = spdk_bdev_get_io(channel);
1993 	if (!bdev_io) {
1994 		return -ENOMEM;
1995 	}
1996 
1997 	bdev_io->internal.ch = channel;
1998 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1999 	bdev_io->u.bdev.iovs = NULL;
2000 	bdev_io->u.bdev.iovcnt = 0;
2001 	bdev_io->u.bdev.offset_blocks = offset_blocks;
2002 	bdev_io->u.bdev.num_blocks = num_blocks;
2003 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2004 
2005 	spdk_bdev_io_submit(bdev_io);
2006 	return 0;
2007 }
2008 
2009 static void
2010 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
2011 {
2012 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
2013 	struct spdk_bdev_io *bdev_io;
2014 
2015 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
2016 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
2017 	spdk_bdev_io_submit_reset(bdev_io);
2018 }
2019 
2020 static void
2021 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
2022 {
2023 	struct spdk_io_channel		*ch;
2024 	struct spdk_bdev_channel	*channel;
2025 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
2026 	struct spdk_bdev_shared_resource *shared_resource;
2027 	bdev_io_tailq_t			tmp_queued;
2028 
2029 	TAILQ_INIT(&tmp_queued);
2030 
2031 	ch = spdk_io_channel_iter_get_channel(i);
2032 	channel = spdk_io_channel_get_ctx(ch);
2033 	shared_resource = channel->shared_resource;
2034 	mgmt_channel = shared_resource->mgmt_ch;
2035 
2036 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
2037 
2038 	if ((channel->flags & BDEV_CH_QOS_ENABLED) != 0) {
2039 		/* The QoS object is always valid and readable while
2040 		 * the channel flag is set, so the lock here should not
2041 		 * be necessary. We're not in the fast path though, so
2042 		 * just take it anyway. */
2043 		pthread_mutex_lock(&channel->bdev->mutex);
2044 		if (channel->bdev->qos->ch == channel) {
2045 			TAILQ_SWAP(&channel->bdev->qos->queued, &tmp_queued, spdk_bdev_io, link);
2046 		}
2047 		pthread_mutex_unlock(&channel->bdev->mutex);
2048 	}
2049 
2050 	_spdk_bdev_abort_queued_io(&shared_resource->nomem_io, channel);
2051 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
2052 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
2053 	_spdk_bdev_abort_queued_io(&tmp_queued, channel);
2054 
2055 	spdk_for_each_channel_continue(i, 0);
2056 }
2057 
2058 static void
2059 _spdk_bdev_start_reset(void *ctx)
2060 {
2061 	struct spdk_bdev_channel *ch = ctx;
2062 
2063 	spdk_for_each_channel(__bdev_to_io_dev(ch->bdev), _spdk_bdev_reset_freeze_channel,
2064 			      ch, _spdk_bdev_reset_dev);
2065 }
2066 
2067 static void
2068 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
2069 {
2070 	struct spdk_bdev *bdev = ch->bdev;
2071 
2072 	assert(!TAILQ_EMPTY(&ch->queued_resets));
2073 
2074 	pthread_mutex_lock(&bdev->mutex);
2075 	if (bdev->reset_in_progress == NULL) {
2076 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
2077 		/*
2078 		 * Take a channel reference for the target bdev for the life of this
2079 		 *  reset.  This guards against the channel getting destroyed while
2080 		 *  spdk_for_each_channel() calls related to this reset IO are in
2081 		 *  progress.  We will release the reference when this reset is
2082 		 *  completed.
2083 		 */
2084 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(__bdev_to_io_dev(bdev));
2085 		_spdk_bdev_start_reset(ch);
2086 	}
2087 	pthread_mutex_unlock(&bdev->mutex);
2088 }
2089 
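/*
 * Reset flow summary: spdk_bdev_reset() below queues the reset on the
 * submitting channel and, if no reset is already in progress, freezes every
 * channel of the bdev (aborting I/O queued for buffers, nomem retries and
 * QoS), submits the reset to the module, and unfreezes the channels when it
 * completes.  Minimal usage sketch with hypothetical callback/context names:
 *
 *	rc = spdk_bdev_reset(desc, ch, reset_done, ctx);
 */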
2090 int
2091 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2092 		spdk_bdev_io_completion_cb cb, void *cb_arg)
2093 {
2094 	struct spdk_bdev *bdev = desc->bdev;
2095 	struct spdk_bdev_io *bdev_io;
2096 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2097 
2098 	bdev_io = spdk_bdev_get_io(channel);
2099 	if (!bdev_io) {
2100 		return -ENOMEM;
2101 	}
2102 
2103 	bdev_io->internal.ch = channel;
2104 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
2105 	bdev_io->u.reset.ch_ref = NULL;
2106 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2107 
2108 	pthread_mutex_lock(&bdev->mutex);
2109 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
2110 	pthread_mutex_unlock(&bdev->mutex);
2111 
2112 	_spdk_bdev_channel_start_reset(channel);
2113 
2114 	return 0;
2115 }
2116 
2117 void
2118 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2119 		      struct spdk_bdev_io_stat *stat)
2120 {
2121 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2122 
2123 	*stat = channel->stat;
2124 }
2125 
2126 static void
2127 _spdk_bdev_get_device_stat_done(struct spdk_io_channel_iter *i, int status)
2128 {
2129 	void *io_device = spdk_io_channel_iter_get_io_device(i);
2130 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2131 
2132 	bdev_iostat_ctx->cb(__bdev_from_io_dev(io_device), bdev_iostat_ctx->stat,
2133 			    bdev_iostat_ctx->cb_arg, 0);
2134 	free(bdev_iostat_ctx);
2135 }
2136 
2137 static void
2138 _spdk_bdev_get_each_channel_stat(struct spdk_io_channel_iter *i)
2139 {
2140 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx = spdk_io_channel_iter_get_ctx(i);
2141 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2142 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2143 
2144 	bdev_iostat_ctx->stat->bytes_read += channel->stat.bytes_read;
2145 	bdev_iostat_ctx->stat->num_read_ops += channel->stat.num_read_ops;
2146 	bdev_iostat_ctx->stat->bytes_written += channel->stat.bytes_written;
2147 	bdev_iostat_ctx->stat->num_write_ops += channel->stat.num_write_ops;
2148 
2149 	spdk_for_each_channel_continue(i, 0);
2150 }
2151 
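/*
 * spdk_bdev_get_device_stat() below aggregates the per-channel counters by
 * iterating every channel of the bdev; the supplied callback runs once the
 * iteration finishes.  Illustrative sketch (stat_done and ctx are hypothetical,
 * and the caller-provided stat should be zero-initialized and stay valid until
 * the callback fires):
 *
 *	static struct spdk_bdev_io_stat stat;
 *
 *	spdk_bdev_get_device_stat(bdev, &stat, stat_done, ctx);
 */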
2152 void
2153 spdk_bdev_get_device_stat(struct spdk_bdev *bdev, struct spdk_bdev_io_stat *stat,
2154 			  spdk_bdev_get_device_stat_cb cb, void *cb_arg)
2155 {
2156 	struct spdk_bdev_iostat_ctx *bdev_iostat_ctx;
2157 
2158 	assert(bdev != NULL);
2159 	assert(stat != NULL);
2160 	assert(cb != NULL);
2161 
2162 	bdev_iostat_ctx = calloc(1, sizeof(struct spdk_bdev_iostat_ctx));
2163 	if (bdev_iostat_ctx == NULL) {
2164 		SPDK_ERRLOG("Unable to allocate memory for spdk_bdev_iostat_ctx\n");
2165 		cb(bdev, stat, cb_arg, -ENOMEM);
2166 		return;
2167 	}
2168 
2169 	bdev_iostat_ctx->stat = stat;
2170 	bdev_iostat_ctx->cb = cb;
2171 	bdev_iostat_ctx->cb_arg = cb_arg;
2172 
2173 	spdk_for_each_channel(__bdev_to_io_dev(bdev),
2174 			      _spdk_bdev_get_each_channel_stat,
2175 			      bdev_iostat_ctx,
2176 			      _spdk_bdev_get_device_stat_done);
2177 }
2178 
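/*
 * Illustrative sketch for the NVMe admin passthrough path below (admin_done,
 * ctx, buf and the buffer size are hypothetical; exact command construction is
 * up to the caller):
 *
 *	struct spdk_nvme_cmd cmd = {};
 *
 *	cmd.opc = SPDK_NVME_OPC_IDENTIFY;
 *	cmd.cdw10 = SPDK_NVME_IDENTIFY_CTRLR;
 *	rc = spdk_bdev_nvme_admin_passthru(desc, ch, &cmd, buf, 4096, admin_done, ctx);
 */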
2179 int
2180 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2181 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2182 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2183 {
2184 	struct spdk_bdev *bdev = desc->bdev;
2185 	struct spdk_bdev_io *bdev_io;
2186 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2187 
2188 	if (!desc->write) {
2189 		return -EBADF;
2190 	}
2191 
2192 	bdev_io = spdk_bdev_get_io(channel);
2193 	if (!bdev_io) {
2194 		return -ENOMEM;
2195 	}
2196 
2197 	bdev_io->internal.ch = channel;
2198 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
2199 	bdev_io->u.nvme_passthru.cmd = *cmd;
2200 	bdev_io->u.nvme_passthru.buf = buf;
2201 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2202 	bdev_io->u.nvme_passthru.md_buf = NULL;
2203 	bdev_io->u.nvme_passthru.md_len = 0;
2204 
2205 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2206 
2207 	spdk_bdev_io_submit(bdev_io);
2208 	return 0;
2209 }
2210 
2211 int
2212 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2213 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
2214 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
2215 {
2216 	struct spdk_bdev *bdev = desc->bdev;
2217 	struct spdk_bdev_io *bdev_io;
2218 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2219 
2220 	if (!desc->write) {
2221 		/*
2222 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2223 		 *  to easily determine if the command is a read or write, but for now just
2224 		 *  do not allow io_passthru with a read-only descriptor.
2225 		 */
2226 		return -EBADF;
2227 	}
2228 
2229 	bdev_io = spdk_bdev_get_io(channel);
2230 	if (!bdev_io) {
2231 		return -ENOMEM;
2232 	}
2233 
2234 	bdev_io->internal.ch = channel;
2235 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
2236 	bdev_io->u.nvme_passthru.cmd = *cmd;
2237 	bdev_io->u.nvme_passthru.buf = buf;
2238 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2239 	bdev_io->u.nvme_passthru.md_buf = NULL;
2240 	bdev_io->u.nvme_passthru.md_len = 0;
2241 
2242 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2243 
2244 	spdk_bdev_io_submit(bdev_io);
2245 	return 0;
2246 }
2247 
2248 int
2249 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
2250 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
2251 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
2252 {
2253 	struct spdk_bdev *bdev = desc->bdev;
2254 	struct spdk_bdev_io *bdev_io;
2255 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2256 
2257 	if (!desc->write) {
2258 		/*
2259 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
2260 		 *  to easily determine if the command is a read or write, but for now just
2261 		 *  do not allow io_passthru with a read-only descriptor.
2262 		 */
2263 		return -EBADF;
2264 	}
2265 
2266 	bdev_io = spdk_bdev_get_io(channel);
2267 	if (!bdev_io) {
2268 		return -ENOMEM;
2269 	}
2270 
2271 	bdev_io->internal.ch = channel;
2272 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
2273 	bdev_io->u.nvme_passthru.cmd = *cmd;
2274 	bdev_io->u.nvme_passthru.buf = buf;
2275 	bdev_io->u.nvme_passthru.nbytes = nbytes;
2276 	bdev_io->u.nvme_passthru.md_buf = md_buf;
2277 	bdev_io->u.nvme_passthru.md_len = md_len;
2278 
2279 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
2280 
2281 	spdk_bdev_io_submit(bdev_io);
2282 	return 0;
2283 }
2284 
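/*
 * spdk_bdev_queue_io_wait() below implements the standard -ENOMEM retry
 * pattern: when a submission fails because no spdk_bdev_io is available, the
 * caller registers a wait entry and retries from its cb_fn once an I/O
 * completes on this channel.  Sketch with hypothetical retry_write/ctx names:
 *
 *	rc = spdk_bdev_write_blocks(desc, ch, buf, offset, num_blocks, write_done, ctx);
 *	if (rc == -ENOMEM) {
 *		ctx->bdev_io_wait.bdev = bdev;
 *		ctx->bdev_io_wait.cb_fn = retry_write;
 *		ctx->bdev_io_wait.cb_arg = ctx;
 *		spdk_bdev_queue_io_wait(bdev, ch, &ctx->bdev_io_wait);
 *	}
 */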
2285 int
2286 spdk_bdev_queue_io_wait(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
2287 			struct spdk_bdev_io_wait_entry *entry)
2288 {
2289 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
2290 	struct spdk_bdev_mgmt_channel *mgmt_ch = channel->shared_resource->mgmt_ch;
2291 
2292 	if (bdev != entry->bdev) {
2293 		SPDK_ERRLOG("bdevs do not match\n");
2294 		return -EINVAL;
2295 	}
2296 
2297 	if (mgmt_ch->per_thread_cache_count > 0) {
2298 		SPDK_ERRLOG("Cannot queue io_wait if a spdk_bdev_io is available in the per-thread cache\n");
2299 		return -EINVAL;
2300 	}
2301 
2302 	TAILQ_INSERT_TAIL(&mgmt_ch->io_wait_queue, entry, link);
2303 	return 0;
2304 }
2305 
2306 static void
2307 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
2308 {
2309 	struct spdk_bdev *bdev = bdev_ch->bdev;
2310 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2311 	struct spdk_bdev_io *bdev_io;
2312 
2313 	if (shared_resource->io_outstanding > shared_resource->nomem_threshold) {
2314 		/*
2315 		 * Allow some more I/O to complete before retrying the nomem_io queue.
2316 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
2317 		 *  the context of a completion, because the resources for the I/O are
2318 		 *  not released until control returns to the bdev poller.  Also, we
2319 		 *  may require several small I/O to complete before a larger I/O
2320 		 *  (that requires splitting) can be submitted.
2321 		 */
2322 		return;
2323 	}
2324 
2325 	while (!TAILQ_EMPTY(&shared_resource->nomem_io)) {
2326 		bdev_io = TAILQ_FIRST(&shared_resource->nomem_io);
2327 		TAILQ_REMOVE(&shared_resource->nomem_io, bdev_io, link);
2328 		bdev_io->internal.ch->io_outstanding++;
2329 		shared_resource->io_outstanding++;
2330 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_PENDING;
2331 		bdev->fn_table->submit_request(bdev_io->internal.ch->channel, bdev_io);
2332 		if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NOMEM) {
2333 			break;
2334 		}
2335 	}
2336 }
2337 
2338 static inline void
2339 _spdk_bdev_io_complete(void *ctx)
2340 {
2341 	struct spdk_bdev_io *bdev_io = ctx;
2342 
2343 	if (spdk_unlikely(bdev_io->internal.in_submit_request || bdev_io->internal.io_submit_ch)) {
2344 		/*
2345 		 * Send the completion to the thread that originally submitted the I/O,
2346 		 * which may not be the current thread in the case of QoS.
2347 		 */
2348 		if (bdev_io->internal.io_submit_ch) {
2349 			bdev_io->internal.ch = bdev_io->internal.io_submit_ch;
2350 			bdev_io->internal.io_submit_ch = NULL;
2351 		}
2352 
2353 		/*
2354 		 * Defer completion to avoid potential infinite recursion if the
2355 		 * user's completion callback issues a new I/O.
2356 		 */
2357 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->internal.ch->channel),
2358 				     _spdk_bdev_io_complete, bdev_io);
2359 		return;
2360 	}
2361 
2362 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2363 		switch (bdev_io->type) {
2364 		case SPDK_BDEV_IO_TYPE_READ:
2365 			bdev_io->internal.ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2366 			bdev_io->internal.ch->stat.num_read_ops++;
2367 			bdev_io->internal.ch->stat.read_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc);
2368 			break;
2369 		case SPDK_BDEV_IO_TYPE_WRITE:
2370 			bdev_io->internal.ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
2371 			bdev_io->internal.ch->stat.num_write_ops++;
2372 			bdev_io->internal.ch->stat.write_latency_ticks += (spdk_get_ticks() - bdev_io->internal.submit_tsc);
2373 			break;
2374 		default:
2375 			break;
2376 		}
2377 	}
2378 
2379 #ifdef SPDK_CONFIG_VTUNE
2380 	uint64_t now_tsc = spdk_get_ticks();
2381 	if (now_tsc > (bdev_io->internal.ch->start_tsc + bdev_io->internal.ch->interval_tsc)) {
2382 		uint64_t data[5];
2383 
2384 		data[0] = bdev_io->internal.ch->stat.num_read_ops - bdev_io->internal.ch->prev_stat.num_read_ops;
2385 		data[1] = bdev_io->internal.ch->stat.bytes_read - bdev_io->internal.ch->prev_stat.bytes_read;
2386 		data[2] = bdev_io->internal.ch->stat.num_write_ops - bdev_io->internal.ch->prev_stat.num_write_ops;
2387 		data[3] = bdev_io->internal.ch->stat.bytes_written - bdev_io->internal.ch->prev_stat.bytes_written;
2388 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
2389 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->internal.ch->channel) : 0;
2390 
2391 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->internal.ch->handle,
2392 				   __itt_metadata_u64, 5, data);
2393 
2394 		bdev_io->internal.ch->prev_stat = bdev_io->internal.ch->stat;
2395 		bdev_io->internal.ch->start_tsc = now_tsc;
2396 	}
2397 #endif
2398 
2399 	assert(bdev_io->internal.cb != NULL);
2400 	assert(spdk_get_thread() == spdk_io_channel_get_thread(bdev_io->internal.ch->channel));
2401 
2402 	bdev_io->internal.cb(bdev_io, bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS,
2403 			     bdev_io->internal.caller_ctx);
2404 }
2405 
2406 static void
2407 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
2408 {
2409 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
2410 
2411 	if (bdev_io->u.reset.ch_ref != NULL) {
2412 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
2413 		bdev_io->u.reset.ch_ref = NULL;
2414 	}
2415 
2416 	_spdk_bdev_io_complete(bdev_io);
2417 }
2418 
2419 static void
2420 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
2421 {
2422 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
2423 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
2424 
2425 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
2426 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
2427 		_spdk_bdev_channel_start_reset(ch);
2428 	}
2429 
2430 	spdk_for_each_channel_continue(i, 0);
2431 }
2432 
2433 void
2434 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
2435 {
2436 	struct spdk_bdev *bdev = bdev_io->bdev;
2437 	struct spdk_bdev_channel *bdev_ch = bdev_io->internal.ch;
2438 	struct spdk_bdev_shared_resource *shared_resource = bdev_ch->shared_resource;
2439 
2440 	bdev_io->internal.status = status;
2441 
2442 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
2443 		bool unlock_channels = false;
2444 
2445 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
2446 			SPDK_ERRLOG("NOMEM returned for reset\n");
2447 		}
2448 		pthread_mutex_lock(&bdev->mutex);
2449 		if (bdev_io == bdev->reset_in_progress) {
2450 			bdev->reset_in_progress = NULL;
2451 			unlock_channels = true;
2452 		}
2453 		pthread_mutex_unlock(&bdev->mutex);
2454 
2455 		if (unlock_channels) {
2456 			spdk_for_each_channel(__bdev_to_io_dev(bdev), _spdk_bdev_unfreeze_channel,
2457 					      bdev_io, _spdk_bdev_reset_complete);
2458 			return;
2459 		}
2460 	} else {
2461 		assert(bdev_ch->io_outstanding > 0);
2462 		assert(shared_resource->io_outstanding > 0);
2463 		bdev_ch->io_outstanding--;
2464 		shared_resource->io_outstanding--;
2465 
2466 		if (spdk_unlikely(status == SPDK_BDEV_IO_STATUS_NOMEM)) {
2467 			TAILQ_INSERT_HEAD(&shared_resource->nomem_io, bdev_io, link);
2468 			/*
2469 			 * Wait for some of the outstanding I/O to complete before we
2470 			 *  retry any of the nomem_io.  Normally we will wait for
2471 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
2472 			 *  depth channels we will instead wait for half to complete.
2473 			 */
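			/* Worked example with the NOMEM_THRESHOLD_COUNT defined at the top of
			 *  this file: with 100 I/O outstanding the threshold becomes 92, while
			 *  with only 10 outstanding it becomes 5 (half). */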
2474 			shared_resource->nomem_threshold = spdk_max((int64_t)shared_resource->io_outstanding / 2,
2475 							   (int64_t)shared_resource->io_outstanding - NOMEM_THRESHOLD_COUNT);
2476 			return;
2477 		}
2478 
2479 		if (spdk_unlikely(!TAILQ_EMPTY(&shared_resource->nomem_io))) {
2480 			_spdk_bdev_ch_retry_io(bdev_ch);
2481 		}
2482 	}
2483 
2484 	_spdk_bdev_io_complete(bdev_io);
2485 }
2486 
2487 void
2488 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
2489 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
2490 {
2491 	if (sc == SPDK_SCSI_STATUS_GOOD) {
2492 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2493 	} else {
2494 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
2495 		bdev_io->error.scsi.sc = sc;
2496 		bdev_io->error.scsi.sk = sk;
2497 		bdev_io->error.scsi.asc = asc;
2498 		bdev_io->error.scsi.ascq = ascq;
2499 	}
2500 
2501 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2502 }
2503 
2504 void
2505 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
2506 			     int *sc, int *sk, int *asc, int *ascq)
2507 {
2508 	assert(sc != NULL);
2509 	assert(sk != NULL);
2510 	assert(asc != NULL);
2511 	assert(ascq != NULL);
2512 
2513 	switch (bdev_io->internal.status) {
2514 	case SPDK_BDEV_IO_STATUS_SUCCESS:
2515 		*sc = SPDK_SCSI_STATUS_GOOD;
2516 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
2517 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2518 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2519 		break;
2520 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
2521 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
2522 		break;
2523 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
2524 		*sc = bdev_io->error.scsi.sc;
2525 		*sk = bdev_io->error.scsi.sk;
2526 		*asc = bdev_io->error.scsi.asc;
2527 		*ascq = bdev_io->error.scsi.ascq;
2528 		break;
2529 	default:
2530 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
2531 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
2532 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
2533 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
2534 		break;
2535 	}
2536 }
2537 
2538 void
2539 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
2540 {
2541 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
2542 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_SUCCESS;
2543 	} else {
2544 		bdev_io->error.nvme.sct = sct;
2545 		bdev_io->error.nvme.sc = sc;
2546 		bdev_io->internal.status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
2547 	}
2548 
2549 	spdk_bdev_io_complete(bdev_io, bdev_io->internal.status);
2550 }
2551 
2552 void
2553 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
2554 {
2555 	assert(sct != NULL);
2556 	assert(sc != NULL);
2557 
2558 	if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
2559 		*sct = bdev_io->error.nvme.sct;
2560 		*sc = bdev_io->error.nvme.sc;
2561 	} else if (bdev_io->internal.status == SPDK_BDEV_IO_STATUS_SUCCESS) {
2562 		*sct = SPDK_NVME_SCT_GENERIC;
2563 		*sc = SPDK_NVME_SC_SUCCESS;
2564 	} else {
2565 		*sct = SPDK_NVME_SCT_GENERIC;
2566 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
2567 	}
2568 }
2569 
2570 struct spdk_thread *
2571 spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
2572 {
2573 	return spdk_io_channel_get_thread(bdev_io->internal.ch->channel);
2574 }
2575 
2576 static void
2577 _spdk_bdev_qos_config_type(struct spdk_bdev *bdev, uint64_t qos_set,
2578 			   enum spdk_bdev_qos_type qos_type)
2579 {
2580 	uint64_t	min_qos_set = 0;
2581 
2582 	switch (qos_type) {
2583 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2584 		min_qos_set = SPDK_BDEV_QOS_MIN_IOS_PER_SEC;
2585 		break;
2586 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2587 		min_qos_set = SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC;
2588 		break;
2589 	default:
2590 		SPDK_ERRLOG("Unsupported QoS type.\n");
2591 		return;
2592 	}
2593 
2594 	if (qos_set % min_qos_set) {
2595 		SPDK_ERRLOG("Assigned QoS %" PRIu64 " on bdev %s is not a multiple of %" PRIu64 "\n",
2596 			    qos_set, bdev->name, min_qos_set);
2597 		SPDK_ERRLOG("Failed to enable QoS on this bdev %s\n", bdev->name);
2598 		return;
2599 	}
2600 
2601 	if (!bdev->qos) {
2602 		bdev->qos = calloc(1, sizeof(*bdev->qos));
2603 		if (!bdev->qos) {
2604 			SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
2605 			return;
2606 		}
2607 	}
2608 
2609 	switch (qos_type) {
2610 	case SPDK_BDEV_QOS_RW_IOPS_RATE_LIMIT:
2611 		bdev->qos->iops_rate_limit = qos_set;
2612 		break;
2613 	case SPDK_BDEV_QOS_RW_BYTEPS_RATE_LIMIT:
2614 		bdev->qos->byte_rate_limit = qos_set * 1024 * 1024;
2615 		break;
2616 	default:
2617 		break;
2618 	}
2619 
2620 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Bdev:%s QoS type:%d set:%" PRIu64 "\n",
2621 		      bdev->name, qos_type, qos_set);
2622 
2623 	return;
2624 }
2625 
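/*
 * _spdk_bdev_qos_config() below reads rate limits from the legacy config
 * file.  Illustrative section layout (bdev name and limit values are
 * placeholders); the IOPS limit must be a multiple of
 * SPDK_BDEV_QOS_MIN_IOS_PER_SEC and the bandwidth limit, given in MB/s, a
 * multiple of SPDK_BDEV_QOS_MIN_BW_IN_MB_PER_SEC:
 *
 *	[QoS]
 *	  Limit_IOPS Malloc0 20000
 *	  Limit_BWPS Malloc0 100
 */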
2626 static void
2627 _spdk_bdev_qos_config(struct spdk_bdev *bdev)
2628 {
2629 	struct spdk_conf_section	*sp = NULL;
2630 	const char			*val = NULL;
2631 	uint64_t			qos_set = 0;
2632 	int				i = 0, j = 0;
2633 
2634 	sp = spdk_conf_find_section(NULL, "QoS");
2635 	if (!sp) {
2636 		return;
2637 	}
2638 
2639 	while (j < SPDK_BDEV_QOS_NUM_TYPES) {
2640 		i = 0;
2641 		while (true) {
2642 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 0);
2643 			if (!val) {
2644 				break;
2645 			}
2646 
2647 			if (strcmp(bdev->name, val) != 0) {
2648 				i++;
2649 				continue;
2650 			}
2651 
2652 			val = spdk_conf_section_get_nmval(sp, qos_type_str[j], i, 1);
2653 			if (val) {
2654 				qos_set = strtoull(val, NULL, 10);
2655 				_spdk_bdev_qos_config_type(bdev, qos_set, j);
2656 			}
2657 
2658 			break;
2659 		}
2660 
2661 		j++;
2662 	}
2663 
2664 	return;
2665 }
2666 
2667 static int
2668 spdk_bdev_init(struct spdk_bdev *bdev)
2669 {
2670 	assert(bdev->module != NULL);
2671 
2672 	if (!bdev->name) {
2673 		SPDK_ERRLOG("Bdev name is NULL\n");
2674 		return -EINVAL;
2675 	}
2676 
2677 	if (spdk_bdev_get_by_name(bdev->name)) {
2678 		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
2679 		return -EEXIST;
2680 	}
2681 
2682 	bdev->status = SPDK_BDEV_STATUS_READY;
2683 
2684 	TAILQ_INIT(&bdev->open_descs);
2685 
2686 	TAILQ_INIT(&bdev->aliases);
2687 
2688 	bdev->reset_in_progress = NULL;
2689 
2690 	_spdk_bdev_qos_config(bdev);
2691 
2692 	spdk_io_device_register(__bdev_to_io_dev(bdev),
2693 				spdk_bdev_channel_create, spdk_bdev_channel_destroy,
2694 				sizeof(struct spdk_bdev_channel));
2695 
2696 	pthread_mutex_init(&bdev->mutex, NULL);
2697 	return 0;
2698 }
2699 
2700 static void
2701 spdk_bdev_destroy_cb(void *io_device)
2702 {
2703 	int			rc;
2704 	struct spdk_bdev	*bdev;
2705 	spdk_bdev_unregister_cb	cb_fn;
2706 	void			*cb_arg;
2707 
2708 	bdev = __bdev_from_io_dev(io_device);
2709 	cb_fn = bdev->unregister_cb;
2710 	cb_arg = bdev->unregister_ctx;
2711 
2712 	rc = bdev->fn_table->destruct(bdev->ctxt);
2713 	if (rc < 0) {
2714 		SPDK_ERRLOG("destruct failed\n");
2715 	}
2716 	if (rc <= 0 && cb_fn != NULL) {
2717 		cb_fn(cb_arg, rc);
2718 	}
2719 }
2720 
2721 
2722 static void
2723 spdk_bdev_fini(struct spdk_bdev *bdev)
2724 {
2725 	pthread_mutex_destroy(&bdev->mutex);
2726 
2727 	free(bdev->qos);
2728 
2729 	spdk_io_device_unregister(__bdev_to_io_dev(bdev), spdk_bdev_destroy_cb);
2730 }
2731 
2732 static void
2733 spdk_bdev_start(struct spdk_bdev *bdev)
2734 {
2735 	struct spdk_bdev_module *module;
2736 
2737 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
2738 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
2739 
2740 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
2741 		if (module->examine) {
2742 			module->action_in_progress++;
2743 			module->examine(bdev);
2744 		}
2745 	}
2746 }
2747 
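/*
 * Minimal registration sketch for a bdev module (illustrative; my_fn_table,
 * my_module and the geometry values are hypothetical):
 *
 *	bdev->name = "MyBdev0";
 *	bdev->blocklen = 512;
 *	bdev->blockcnt = num_blocks;
 *	bdev->fn_table = &my_fn_table;
 *	bdev->module = &my_module;
 *	rc = spdk_bdev_register(bdev);
 *
 * spdk_bdev_init() validates the name and sets up the per-bdev I/O device and
 * QoS configuration; spdk_bdev_start() then offers the new bdev to every
 * registered module's examine() callback.
 */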
2748 int
2749 spdk_bdev_register(struct spdk_bdev *bdev)
2750 {
2751 	int rc = spdk_bdev_init(bdev);
2752 
2753 	if (rc == 0) {
2754 		spdk_bdev_start(bdev);
2755 	}
2756 
2757 	return rc;
2758 }
2759 
2760 static void
2761 spdk_vbdev_remove_base_bdevs(struct spdk_bdev *vbdev)
2762 {
2763 	struct spdk_bdev **bdevs;
2764 	struct spdk_bdev *base;
2765 	size_t i, j, k;
2766 	bool found;
2767 
2768 	/* Iterate over base bdevs to remove vbdev from them. */
2769 	for (i = 0; i < vbdev->base_bdevs_cnt; i++) {
2770 		found = false;
2771 		base = vbdev->base_bdevs[i];
2772 
2773 		for (j = 0; j < base->vbdevs_cnt; j++) {
2774 			if (base->vbdevs[j] != vbdev) {
2775 				continue;
2776 			}
2777 
2778 			for (k = j; k + 1 < base->vbdevs_cnt; k++) {
2779 				base->vbdevs[k] = base->vbdevs[k + 1];
2780 			}
2781 
2782 			base->vbdevs_cnt--;
2783 			if (base->vbdevs_cnt > 0) {
2784 				bdevs = realloc(base->vbdevs, base->vbdevs_cnt * sizeof(bdevs[0]));
2785 				/* Shrinking an allocated block with realloc() should not fail. */
2786 				assert(bdevs);
2787 				base->vbdevs = bdevs;
2788 			} else {
2789 				free(base->vbdevs);
2790 				base->vbdevs = NULL;
2791 			}
2792 
2793 			found = true;
2794 			break;
2795 		}
2796 
2797 		if (!found) {
2798 			SPDK_WARNLOG("Bdev '%s' is not base bdev of '%s'.\n", base->name, vbdev->name);
2799 		}
2800 	}
2801 
2802 	free(vbdev->base_bdevs);
2803 	vbdev->base_bdevs = NULL;
2804 	vbdev->base_bdevs_cnt = 0;
2805 }
2806 
2807 static int
2808 spdk_vbdev_set_base_bdevs(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, size_t cnt)
2809 {
2810 	struct spdk_bdev **vbdevs;
2811 	struct spdk_bdev *base;
2812 	size_t i;
2813 
2814 	/* Adding base bdevs isn't supported (yet?). */
2815 	assert(vbdev->base_bdevs_cnt == 0);
2816 
2817 	vbdev->base_bdevs = malloc(cnt * sizeof(vbdev->base_bdevs[0]));
2818 	if (!vbdev->base_bdevs) {
2819 		SPDK_ERRLOG("%s - malloc() failed\n", vbdev->name);
2820 		return -ENOMEM;
2821 	}
2822 
2823 	memcpy(vbdev->base_bdevs, base_bdevs, cnt * sizeof(vbdev->base_bdevs[0]));
2824 	vbdev->base_bdevs_cnt = cnt;
2825 
2826 	/* Iterate over base bdevs to add this vbdev to them. */
2827 	for (i = 0; i < cnt; i++) {
2828 		base = vbdev->base_bdevs[i];
2829 
2830 		assert(base != NULL);
2831 		assert(base->claim_module != NULL);
2832 
2833 		vbdevs = realloc(base->vbdevs, (base->vbdevs_cnt + 1) * sizeof(vbdevs[0]));
2834 		if (!vbdevs) {
2835 			SPDK_ERRLOG("%s - realloc() failed\n", base->name);
2836 			spdk_vbdev_remove_base_bdevs(vbdev);
2837 			return -ENOMEM;
2838 		}
2839 
2840 		vbdevs[base->vbdevs_cnt] = vbdev;
2841 		base->vbdevs = vbdevs;
2842 		base->vbdevs_cnt++;
2843 	}
2844 
2845 	return 0;
2846 }
2847 
2848 int
2849 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
2850 {
2851 	int rc;
2852 
2853 	rc = spdk_bdev_init(vbdev);
2854 	if (rc) {
2855 		return rc;
2856 	}
2857 
2858 	if (base_bdev_count == 0) {
2859 		spdk_bdev_start(vbdev);
2860 		return 0;
2861 	}
2862 
2863 	rc = spdk_vbdev_set_base_bdevs(vbdev, base_bdevs, base_bdev_count);
2864 	if (rc) {
2865 		spdk_bdev_fini(vbdev);
2866 		return rc;
2867 	}
2868 
2869 	spdk_bdev_start(vbdev);
2870 	return 0;
2871 
2872 }
2873 
2874 void
2875 spdk_bdev_destruct_done(struct spdk_bdev *bdev, int bdeverrno)
2876 {
2877 	if (bdev->unregister_cb != NULL) {
2878 		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
2879 	}
2880 }
2881 
2882 static void
2883 _remove_notify(void *arg)
2884 {
2885 	struct spdk_bdev_desc *desc = arg;
2886 
2887 	desc->remove_cb(desc->remove_ctx);
2888 }
2889 
2890 void
2891 spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
2892 {
2893 	struct spdk_bdev_desc	*desc, *tmp;
2894 	bool			do_destruct = true;
2895 	struct spdk_thread	*thread;
2896 
2897 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);
2898 
2899 	thread = spdk_get_thread();
2900 	if (!thread) {
2901 		/* The user called this from a non-SPDK thread. */
2902 		cb_fn(cb_arg, -ENOTSUP);
2903 		return;
2904 	}
2905 
2906 	pthread_mutex_lock(&bdev->mutex);
2907 
2908 	spdk_vbdev_remove_base_bdevs(bdev);
2909 
2910 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
2911 	bdev->unregister_cb = cb_fn;
2912 	bdev->unregister_ctx = cb_arg;
2913 
2914 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
2915 		if (desc->remove_cb) {
2916 			do_destruct = false;
2917 			/*
2918 			 * Defer invocation of the remove_cb to a separate message that will
2919 			 *  run later on this thread.  This ensures this context unwinds and
2920 			 *  we don't recursively unregister this bdev again if the remove_cb
2921 			 *  immediately closes its descriptor.
2922 			 */
2923 			if (!desc->remove_scheduled) {
2924 				/* Avoid scheduling removal of the same descriptor multiple times. */
2925 				desc->remove_scheduled = true;
2926 				spdk_thread_send_msg(thread, _remove_notify, desc);
2927 			}
2928 		}
2929 	}
2930 
2931 	if (!do_destruct) {
2932 		pthread_mutex_unlock(&bdev->mutex);
2933 		return;
2934 	}
2935 
2936 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
2937 	pthread_mutex_unlock(&bdev->mutex);
2938 
2939 	spdk_bdev_fini(bdev);
2940 }
2941 
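/*
 * Open/close usage sketch (hotremove_cb and ctx are hypothetical).  The
 * hot-remove callback, if provided, is invoked when the bdev is unregistered
 * while the descriptor is still open; the descriptor should then be closed
 * from that callback or shortly afterwards:
 *
 *	rc = spdk_bdev_open(bdev, true, hotremove_cb, ctx, &desc);
 *	...
 *	spdk_bdev_close(desc);
 */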
2942 int
2943 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
2944 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
2945 {
2946 	struct spdk_bdev_desc *desc;
2947 
2948 	desc = calloc(1, sizeof(*desc));
2949 	if (desc == NULL) {
2950 		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
2951 		return -ENOMEM;
2952 	}
2953 
2954 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Opening descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
2955 		      spdk_get_thread());
2956 
2957 	pthread_mutex_lock(&bdev->mutex);
2958 
2959 	if (write && bdev->claim_module) {
2960 		SPDK_ERRLOG("Could not open %s - already claimed\n", bdev->name);
2961 		free(desc);
2962 		pthread_mutex_unlock(&bdev->mutex);
2963 		return -EPERM;
2964 	}
2965 
2966 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
2967 
2968 	desc->bdev = bdev;
2969 	desc->remove_cb = remove_cb;
2970 	desc->remove_ctx = remove_ctx;
2971 	desc->write = write;
2972 	*_desc = desc;
2973 
2974 	pthread_mutex_unlock(&bdev->mutex);
2975 
2976 	return 0;
2977 }
2978 
2979 void
2980 spdk_bdev_close(struct spdk_bdev_desc *desc)
2981 {
2982 	struct spdk_bdev *bdev = desc->bdev;
2983 	bool do_unregister = false;
2984 
2985 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closing descriptor %p for bdev %s on thread %p\n", desc, bdev->name,
2986 		      spdk_get_thread());
2987 
2988 	pthread_mutex_lock(&bdev->mutex);
2989 
2990 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
2991 	free(desc);
2992 
2993 	/* If there are no more open descriptors, tear down the QoS resources. */
2994 	if (bdev->qos && TAILQ_EMPTY(&bdev->open_descs)) {
2995 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Closed last descriptor for bdev %s on thread %p. Stopping QoS.\n",
2996 			      bdev->name, spdk_get_thread());
2997 
2998 		if (spdk_bdev_qos_destroy(bdev)) {
2999 			/* There isn't anything we can do to recover here. Just let the
3000 			 * old QoS poller keep running. The QoS handling won't change
3001 			 * cores when the user allocates a new channel, but it won't break. */
3002 			SPDK_ERRLOG("Unable to shut down QoS poller. It will continue running on the current thread.\n");
3003 		}
3004 	}
3005 
3006 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
3007 		do_unregister = true;
3008 	}
3009 	pthread_mutex_unlock(&bdev->mutex);
3010 
3011 	if (do_unregister == true) {
3012 		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
3013 	}
3014 }
3015 
3016 int
3017 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
3018 			    struct spdk_bdev_module *module)
3019 {
3020 	if (bdev->claim_module != NULL) {
3021 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
3022 			    bdev->claim_module->name);
3023 		return -EPERM;
3024 	}
3025 
3026 	if (desc && !desc->write) {
3027 		desc->write = true;
3028 	}
3029 
3030 	bdev->claim_module = module;
3031 	return 0;
3032 }
3033 
3034 void
3035 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
3036 {
3037 	assert(bdev->claim_module != NULL);
3038 	bdev->claim_module = NULL;
3039 }
3040 
3041 struct spdk_bdev *
3042 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
3043 {
3044 	return desc->bdev;
3045 }
3046 
3047 void
3048 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
3049 {
3050 	struct iovec *iovs;
3051 	int iovcnt;
3052 
3053 	if (bdev_io == NULL) {
3054 		return;
3055 	}
3056 
3057 	switch (bdev_io->type) {
3058 	case SPDK_BDEV_IO_TYPE_READ:
3059 		iovs = bdev_io->u.bdev.iovs;
3060 		iovcnt = bdev_io->u.bdev.iovcnt;
3061 		break;
3062 	case SPDK_BDEV_IO_TYPE_WRITE:
3063 		iovs = bdev_io->u.bdev.iovs;
3064 		iovcnt = bdev_io->u.bdev.iovcnt;
3065 		break;
3066 	default:
3067 		iovs = NULL;
3068 		iovcnt = 0;
3069 		break;
3070 	}
3071 
3072 	if (iovp) {
3073 		*iovp = iovs;
3074 	}
3075 	if (iovcntp) {
3076 		*iovcntp = iovcnt;
3077 	}
3078 }
3079 
3080 void
3081 spdk_bdev_module_list_add(struct spdk_bdev_module *bdev_module)
3082 {
3083 
3084 	if (spdk_bdev_module_list_find(bdev_module->name)) {
3085 		SPDK_ERRLOG("ERROR: module '%s' already registered.\n", bdev_module->name);
3086 		assert(false);
3087 	}
3088 
3089 	if (bdev_module->async_init) {
3090 		bdev_module->action_in_progress = 1;
3091 	}
3092 
3093 	/*
3094 	 * Modules with examine callbacks must be initialized first, so they are
3095 	 *  ready to handle examine callbacks from later modules that will
3096 	 *  register physical bdevs.
3097 	 */
3098 	if (bdev_module->examine != NULL) {
3099 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
3100 	} else {
3101 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
3102 	}
3103 }
3104 
3105 struct spdk_bdev_module *
3106 spdk_bdev_module_list_find(const char *name)
3107 {
3108 	struct spdk_bdev_module *bdev_module;
3109 
3110 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
3111 		if (strcmp(name, bdev_module->name) == 0) {
3112 			break;
3113 		}
3114 	}
3115 
3116 	return bdev_module;
3117 }
3118 
3119 static void
3120 spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
3121 {
3122 	uint64_t len;
3123 
3124 	if (!success) {
3125 		bdev_io->internal.cb = bdev_io->u.bdev.stored_user_cb;
3126 		_spdk_bdev_io_complete(bdev_io);
3127 		return;
3128 	}
3129 
3130 	/* No need to repeat the error checking from spdk_bdev_write_zeroes_blocks; this request already passed those checks. */
3131 	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->u.bdev.split_remaining_num_blocks,
3132 		       ZERO_BUFFER_SIZE);
3133 
3134 	bdev_io->u.bdev.offset_blocks = bdev_io->u.bdev.split_current_offset_blocks;
3135 	bdev_io->u.bdev.iov.iov_len = len;
3136 	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
3137 	bdev_io->u.bdev.split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
3138 	bdev_io->u.bdev.split_current_offset_blocks += bdev_io->u.bdev.num_blocks;
3139 
3140 	/* If this round completes the I/O, restore the original user callback. */
3141 	if (bdev_io->u.bdev.split_remaining_num_blocks == 0) {
3142 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->u.bdev.stored_user_cb);
3143 	} else {
3144 		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
3145 	}
3146 	spdk_bdev_io_submit(bdev_io);
3147 }
3148 
3149 struct set_qos_limit_ctx {
3150 	void (*cb_fn)(void *cb_arg, int status);
3151 	void *cb_arg;
3152 	struct spdk_bdev *bdev;
3153 };
3154 
3155 static void
3156 _spdk_bdev_set_qos_limit_done(struct set_qos_limit_ctx *ctx, int status)
3157 {
3158 	pthread_mutex_lock(&ctx->bdev->mutex);
3159 	ctx->bdev->qos_mod_in_progress = false;
3160 	pthread_mutex_unlock(&ctx->bdev->mutex);
3161 
3162 	ctx->cb_fn(ctx->cb_arg, status);
3163 	free(ctx);
3164 }
3165 
3166 static void
3167 _spdk_bdev_disable_qos_done(void *cb_arg)
3168 {
3169 	struct set_qos_limit_ctx *ctx = cb_arg;
3170 	struct spdk_bdev *bdev = ctx->bdev;
3171 	struct spdk_bdev_qos *qos;
3172 
3173 	pthread_mutex_lock(&bdev->mutex);
3174 	qos = bdev->qos;
3175 	bdev->qos = NULL;
3176 	pthread_mutex_unlock(&bdev->mutex);
3177 
3178 	_spdk_bdev_abort_queued_io(&qos->queued, qos->ch);
3179 	spdk_put_io_channel(spdk_io_channel_from_ctx(qos->ch));
3180 	spdk_poller_unregister(&qos->poller);
3181 
3182 	free(qos);
3183 
3184 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3185 }
3186 
3187 static void
3188 _spdk_bdev_disable_qos_msg_done(struct spdk_io_channel_iter *i, int status)
3189 {
3190 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3191 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3192 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3193 	struct spdk_thread *thread;
3194 
3195 	pthread_mutex_lock(&bdev->mutex);
3196 	thread = bdev->qos->thread;
3197 	pthread_mutex_unlock(&bdev->mutex);
3198 
3199 	spdk_thread_send_msg(thread, _spdk_bdev_disable_qos_done, ctx);
3200 }
3201 
3202 static void
3203 _spdk_bdev_disable_qos_msg(struct spdk_io_channel_iter *i)
3204 {
3205 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3206 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3207 
3208 	bdev_ch->flags &= ~BDEV_CH_QOS_ENABLED;
3209 
3210 	spdk_for_each_channel_continue(i, 0);
3211 }
3212 
3213 static void
3214 _spdk_bdev_update_qos_limit_iops_msg(void *cb_arg)
3215 {
3216 	struct set_qos_limit_ctx *ctx = cb_arg;
3217 	struct spdk_bdev *bdev = ctx->bdev;
3218 
3219 	pthread_mutex_lock(&bdev->mutex);
3220 	spdk_bdev_qos_update_max_quota_per_timeslice(bdev->qos);
3221 	pthread_mutex_unlock(&bdev->mutex);
3222 
3223 	_spdk_bdev_set_qos_limit_done(ctx, 0);
3224 }
3225 
3226 static void
3227 _spdk_bdev_enable_qos_msg(struct spdk_io_channel_iter *i)
3228 {
3229 	void *io_device = spdk_io_channel_iter_get_io_device(i);
3230 	struct spdk_bdev *bdev = __bdev_from_io_dev(io_device);
3231 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3232 	struct spdk_bdev_channel *bdev_ch = spdk_io_channel_get_ctx(ch);
3233 	int rc;
3234 
3235 	pthread_mutex_lock(&bdev->mutex);
3236 	rc = _spdk_bdev_enable_qos(bdev, bdev_ch);
3237 	pthread_mutex_unlock(&bdev->mutex);
3238 	spdk_for_each_channel_continue(i, rc);
3239 }
3240 
3241 static void
3242 _spdk_bdev_enable_qos_done(struct spdk_io_channel_iter *i, int status)
3243 {
3244 	struct set_qos_limit_ctx *ctx = spdk_io_channel_iter_get_ctx(i);
3245 
3246 	_spdk_bdev_set_qos_limit_done(ctx, status);
3247 }
3248 
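/*
 * Usage sketch for the rate-limit setter below (qos_done and ctx are
 * hypothetical).  A non-zero value enables or updates the limit; zero
 * disables it.  The value must be a multiple of SPDK_BDEV_QOS_MIN_IOS_PER_SEC:
 *
 *	spdk_bdev_set_qos_limit_iops(bdev, 20000, qos_done, ctx);	// enable or update
 *	spdk_bdev_set_qos_limit_iops(bdev, 0, qos_done, ctx);		// disable
 */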
3249 void
3250 spdk_bdev_set_qos_limit_iops(struct spdk_bdev *bdev, uint64_t ios_per_sec,
3251 			     void (*cb_fn)(void *cb_arg, int status), void *cb_arg)
3252 {
3253 	struct set_qos_limit_ctx *ctx;
3254 
3255 	if (ios_per_sec > 0 && ios_per_sec % SPDK_BDEV_QOS_MIN_IOS_PER_SEC) {
3256 		SPDK_ERRLOG("Requested ios_per_sec limit %" PRIu64 " is not a multiple of %u\n",
3257 			    ios_per_sec, SPDK_BDEV_QOS_MIN_IOS_PER_SEC);
3258 		cb_fn(cb_arg, -EINVAL);
3259 		return;
3260 	}
3261 
3262 	ctx = calloc(1, sizeof(*ctx));
3263 	if (ctx == NULL) {
3264 		cb_fn(cb_arg, -ENOMEM);
3265 		return;
3266 	}
3267 
3268 	ctx->cb_fn = cb_fn;
3269 	ctx->cb_arg = cb_arg;
3270 	ctx->bdev = bdev;
3271 
3272 	pthread_mutex_lock(&bdev->mutex);
3273 	if (bdev->qos_mod_in_progress) {
3274 		pthread_mutex_unlock(&bdev->mutex);
3275 		free(ctx);
3276 		cb_fn(cb_arg, -EAGAIN);
3277 		return;
3278 	}
3279 	bdev->qos_mod_in_progress = true;
3280 
3281 	if (ios_per_sec > 0) {
3282 		if (bdev->qos == NULL) {
3283 			/* Enabling */
3284 			bdev->qos = calloc(1, sizeof(*bdev->qos));
3285 			if (!bdev->qos) {
3286 				pthread_mutex_unlock(&bdev->mutex);
3287 				SPDK_ERRLOG("Unable to allocate memory for QoS tracking\n");
3288 				free(ctx);
3289 				cb_fn(cb_arg, -ENOMEM);
3290 				return;
3291 			}
3292 
3293 			bdev->qos->iops_rate_limit = ios_per_sec;
3294 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3295 					      _spdk_bdev_enable_qos_msg, ctx,
3296 					      _spdk_bdev_enable_qos_done);
3297 		} else {
3298 			/* Updating */
3299 			bdev->qos->iops_rate_limit = ios_per_sec;
3300 			spdk_thread_send_msg(bdev->qos->thread, _spdk_bdev_update_qos_limit_iops_msg, ctx);
3301 		}
3302 	} else {
3303 		if (bdev->qos != NULL) {
3304 			/* Disabling */
3305 			spdk_for_each_channel(__bdev_to_io_dev(bdev),
3306 					      _spdk_bdev_disable_qos_msg, ctx,
3307 					      _spdk_bdev_disable_qos_msg_done);
3308 		} else {
3309 			pthread_mutex_unlock(&bdev->mutex);
3310 			_spdk_bdev_set_qos_limit_done(ctx, 0);
3311 			return;
3312 		}
3313 	}
3314 
3315 	pthread_mutex_unlock(&bdev->mutex);
3316 }
3317 
3318 SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)
3319