xref: /spdk/lib/bdev/bdev.c (revision edc5610fbc70a08c4b52f5c42a1ab22622319fcb)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/event.h"
41 #include "spdk/io_channel.h"
42 #include "spdk/likely.h"
43 #include "spdk/queue.h"
44 #include "spdk/nvme_spec.h"
45 #include "spdk/scsi_spec.h"
46 #include "spdk/util.h"
47 
48 #include "spdk_internal/bdev.h"
49 #include "spdk_internal/log.h"
50 #include "spdk/string.h"
51 
52 #ifdef SPDK_CONFIG_VTUNE
53 #include "ittnotify.h"
54 #include "ittnotify_types.h"
55 int __itt_init_ittlib(const char *, __itt_group_id);
56 #endif
57 
58 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
59 #define SPDK_BDEV_IO_CACHE_SIZE	256
60 #define BUF_SMALL_POOL_SIZE	8192
61 #define BUF_LARGE_POOL_SIZE	1024
62 #define NOMEM_THRESHOLD_COUNT	8
63 #define ZERO_BUFFER_SIZE	0x100000
64 
65 typedef TAILQ_HEAD(, spdk_bdev_io) bdev_io_tailq_t;
66 
67 struct spdk_bdev_mgr {
68 	struct spdk_mempool *bdev_io_pool;
69 
70 	struct spdk_mempool *buf_small_pool;
71 	struct spdk_mempool *buf_large_pool;
72 
73 	void *zero_buffer;
74 
75 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
76 
77 	TAILQ_HEAD(, spdk_bdev) bdevs;
78 
79 	bool init_complete;
80 	bool module_init_complete;
81 
82 #ifdef SPDK_CONFIG_VTUNE
83 	__itt_domain	*domain;
84 #endif
85 };
86 
87 static struct spdk_bdev_mgr g_bdev_mgr = {
88 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
89 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
90 	.init_complete = false,
91 	.module_init_complete = false,
92 };
93 
94 static spdk_bdev_init_cb	g_init_cb_fn = NULL;
95 static void			*g_init_cb_arg = NULL;
96 
97 static spdk_bdev_fini_cb	g_fini_cb_fn = NULL;
98 static void			*g_fini_cb_arg = NULL;
99 static struct spdk_thread	*g_fini_thread = NULL;
100 
101 
102 struct spdk_bdev_mgmt_channel {
103 	bdev_io_tailq_t need_buf_small;
104 	bdev_io_tailq_t need_buf_large;
105 
106 	/*
107 	 * Each thread keeps a cache of bdev_io - this allows
108 	 *  bdev threads which are *not* DPDK threads to still
109 	 *  benefit from a per-thread bdev_io cache.  Without
110 	 *  this, non-DPDK threads fetching from the mempool
111 	 *  incur a cmpxchg on get and put.
112 	 */
113 	bdev_io_tailq_t per_thread_cache;
114 	uint32_t	per_thread_cache_count;
115 };
116 
117 struct spdk_bdev_desc {
118 	struct spdk_bdev		*bdev;
119 	spdk_bdev_remove_cb_t		remove_cb;
120 	void				*remove_ctx;
121 	bool				write;
122 	TAILQ_ENTRY(spdk_bdev_desc)	link;
123 };
124 
125 #define BDEV_CH_RESET_IN_PROGRESS	(1 << 0)
126 
127 struct spdk_bdev_channel {
128 	struct spdk_bdev	*bdev;
129 
130 	/* The channel for the underlying device */
131 	struct spdk_io_channel	*channel;
132 
133 	/* Channel for the bdev manager */
134 	struct spdk_io_channel *mgmt_channel;
135 
136 	struct spdk_bdev_io_stat stat;
137 
138 	/*
139 	 * Count of I/O submitted to bdev module and waiting for completion.
140 	 * Incremented before submit_request() is called on an spdk_bdev_io.
141 	 */
142 	uint64_t		io_outstanding;
143 
144 	bdev_io_tailq_t		queued_resets;
145 
146 	/*
147 	 * Queue of IO awaiting retry because of a previous NOMEM status returned
148 	 *  on this channel.
149 	 */
150 	bdev_io_tailq_t		nomem_io;
151 
152 	/*
153 	 * Threshold which io_outstanding must drop to before retrying nomem_io.
154 	 */
155 	uint64_t		nomem_threshold;
156 
157 	uint32_t		flags;
158 
159 #ifdef SPDK_CONFIG_VTUNE
160 	uint64_t		start_tsc;
161 	uint64_t		interval_tsc;
162 	__itt_string_handle	*handle;
163 #endif
164 
165 };
166 
167 static void spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
168 
169 struct spdk_bdev *
170 spdk_bdev_first(void)
171 {
172 	struct spdk_bdev *bdev;
173 
174 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
175 	if (bdev) {
176 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
177 	}
178 
179 	return bdev;
180 }
181 
182 struct spdk_bdev *
183 spdk_bdev_next(struct spdk_bdev *prev)
184 {
185 	struct spdk_bdev *bdev;
186 
187 	bdev = TAILQ_NEXT(prev, link);
188 	if (bdev) {
189 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
190 	}
191 
192 	return bdev;
193 }
194 
195 static struct spdk_bdev *
196 _bdev_next_leaf(struct spdk_bdev *bdev)
197 {
198 	while (bdev != NULL) {
199 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
200 			return bdev;
201 		} else {
202 			bdev = TAILQ_NEXT(bdev, link);
203 		}
204 	}
205 
206 	return bdev;
207 }
208 
209 struct spdk_bdev *
210 spdk_bdev_first_leaf(void)
211 {
212 	struct spdk_bdev *bdev;
213 
214 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
215 
216 	if (bdev) {
217 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Starting bdev iteration at %s\n", bdev->name);
218 	}
219 
220 	return bdev;
221 }
222 
223 struct spdk_bdev *
224 spdk_bdev_next_leaf(struct spdk_bdev *prev)
225 {
226 	struct spdk_bdev *bdev;
227 
228 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
229 
230 	if (bdev) {
231 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Continuing bdev iteration at %s\n", bdev->name);
232 	}
233 
234 	return bdev;
235 }
236 
237 struct spdk_bdev *
238 spdk_bdev_get_by_name(const char *bdev_name)
239 {
240 	struct spdk_bdev_alias *tmp;
241 	struct spdk_bdev *bdev = spdk_bdev_first();
242 
243 	while (bdev != NULL) {
244 		if (strcmp(bdev_name, bdev->name) == 0) {
245 			return bdev;
246 		}
247 
248 		TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
249 			if (strcmp(bdev_name, tmp->alias) == 0) {
250 				return bdev;
251 			}
252 		}
253 
254 		bdev = spdk_bdev_next(bdev);
255 	}
256 
257 	return NULL;
258 }
259 
260 static void
261 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
262 {
263 	assert(bdev_io->get_buf_cb != NULL);
264 	assert(buf != NULL);
265 	assert(bdev_io->u.bdev.iovs != NULL);
266 
267 	bdev_io->buf = buf;
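	/*
	 * Buffers in the pools are allocated with 512 bytes of slack so the
	 *  payload can be rounded up to the next 512-byte boundary on the
	 *  next line: e.g. buf == 0x1010 -> 0x1010 + 512 = 0x1210, which the
	 *  mask rounds down to the aligned address 0x1200.
	 */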
268 	bdev_io->u.bdev.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
269 	bdev_io->u.bdev.iovs[0].iov_len = bdev_io->buf_len;
270 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
271 }
272 
273 static void
274 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
275 {
276 	struct spdk_mempool *pool;
277 	struct spdk_bdev_io *tmp;
278 	void *buf;
279 	bdev_io_tailq_t *tailq;
280 	struct spdk_bdev_mgmt_channel *ch;
281 
282 	assert(bdev_io->u.bdev.iovcnt == 1);
283 
284 	buf = bdev_io->buf;
285 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
286 
287 	if (bdev_io->buf_len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
288 		pool = g_bdev_mgr.buf_small_pool;
289 		tailq = &ch->need_buf_small;
290 	} else {
291 		pool = g_bdev_mgr.buf_large_pool;
292 		tailq = &ch->need_buf_large;
293 	}
294 
295 	if (TAILQ_EMPTY(tailq)) {
296 		spdk_mempool_put(pool, buf);
297 	} else {
298 		tmp = TAILQ_FIRST(tailq);
299 		TAILQ_REMOVE(tailq, tmp, buf_link);
300 		spdk_bdev_io_set_buf(tmp, buf);
301 	}
302 }
303 
304 void
305 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb, uint64_t len)
306 {
307 	struct spdk_mempool *pool;
308 	bdev_io_tailq_t *tailq;
309 	void *buf = NULL;
310 	struct spdk_bdev_mgmt_channel *ch;
311 
312 	assert(cb != NULL);
313 	assert(bdev_io->u.bdev.iovs != NULL);
314 
315 	if (spdk_unlikely(bdev_io->u.bdev.iovs[0].iov_base != NULL)) {
316 		/* Buffer already present */
317 		cb(bdev_io->ch->channel, bdev_io);
318 		return;
319 	}
320 
321 	assert(len <= SPDK_BDEV_LARGE_BUF_MAX_SIZE);
322 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
323 
324 	bdev_io->buf_len = len;
325 	bdev_io->get_buf_cb = cb;
326 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
327 		pool = g_bdev_mgr.buf_small_pool;
328 		tailq = &ch->need_buf_small;
329 	} else {
330 		pool = g_bdev_mgr.buf_large_pool;
331 		tailq = &ch->need_buf_large;
332 	}
333 
334 	buf = spdk_mempool_get(pool);
335 
336 	if (!buf) {
337 		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
338 	} else {
339 		spdk_bdev_io_set_buf(bdev_io, buf);
340 	}
341 }
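
/*
 * Illustrative sketch (not part of this file): a bdev module whose READ path
 *  needs a data buffer would typically defer its work until one is available.
 *  `my_bdev_read_get_buf_cb` and `my_bdev_submit_read` are hypothetical names.
 *
 *	static void
 *	my_bdev_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
 *	{
 *		// bdev_io->u.bdev.iovs[0].iov_base is now valid
 *		my_bdev_submit_read(ch, bdev_io);
 *	}
 *
 *	// In submit_request(), for SPDK_BDEV_IO_TYPE_READ:
 *	spdk_bdev_io_get_buf(bdev_io, my_bdev_read_get_buf_cb,
 *			     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
 */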
342 
343 static int
344 spdk_bdev_module_get_max_ctx_size(void)
345 {
346 	struct spdk_bdev_module_if *bdev_module;
347 	int max_bdev_module_size = 0;
348 
349 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
350 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
351 			max_bdev_module_size = bdev_module->get_ctx_size();
352 		}
353 	}
354 
355 	return max_bdev_module_size;
356 }
357 
358 void
359 spdk_bdev_config_text(FILE *fp)
360 {
361 	struct spdk_bdev_module_if *bdev_module;
362 
363 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
364 		if (bdev_module->config_text) {
365 			bdev_module->config_text(fp);
366 		}
367 	}
368 }
369 
370 static int
371 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
372 {
373 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
374 
375 	TAILQ_INIT(&ch->need_buf_small);
376 	TAILQ_INIT(&ch->need_buf_large);
377 
378 	TAILQ_INIT(&ch->per_thread_cache);
379 	ch->per_thread_cache_count = 0;
380 
381 	return 0;
382 }
383 
384 static void
385 spdk_bdev_mgmt_channel_free_resources(struct spdk_bdev_mgmt_channel *ch)
386 {
387 	struct spdk_bdev_io *bdev_io;
388 
389 	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
390 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel free\n");
391 	}
392 
393 	while (!TAILQ_EMPTY(&ch->per_thread_cache)) {
394 		bdev_io = TAILQ_FIRST(&ch->per_thread_cache);
395 		TAILQ_REMOVE(&ch->per_thread_cache, bdev_io, buf_link);
396 		ch->per_thread_cache_count--;
397 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
398 	}
399 
400 	assert(ch->per_thread_cache_count == 0);
401 }
402 
403 static void
404 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
405 {
406 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
407 
408 	spdk_bdev_mgmt_channel_free_resources(ch);
409 }
410 
411 static void
412 spdk_bdev_init_complete(int rc)
413 {
414 	spdk_bdev_init_cb cb_fn = g_init_cb_fn;
415 	void *cb_arg = g_init_cb_arg;
416 
417 	g_bdev_mgr.init_complete = true;
418 	g_init_cb_fn = NULL;
419 	g_init_cb_arg = NULL;
420 
421 	cb_fn(cb_arg, rc);
422 }
423 
424 static void
425 spdk_bdev_module_action_complete(void)
426 {
427 	struct spdk_bdev_module_if *m;
428 
429 	/*
430 	 * Don't finish bdev subsystem initialization if
431 	 * module pre-initialization is still in progress, or
432 	 * if the subsystem has already been initialized.
433 	 */
434 	if (!g_bdev_mgr.module_init_complete || g_bdev_mgr.init_complete) {
435 		return;
436 	}
437 
438 	/*
439 	 * Check all bdev modules for inits/examinations in progress. If any
440 	 * exist, return immediately since we cannot finish bdev subsystem
441 	 * initialization until all are completed.
442 	 */
443 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
444 		if (m->action_in_progress > 0) {
445 			return;
446 		}
447 	}
448 
449 	/*
450 	 * Modules already finished initialization - now that all
451 	 * the bdev modules have finished their asynchronous I/O
452 	 * processing, the entire bdev layer can be marked as complete.
453 	 */
454 	spdk_bdev_init_complete(0);
455 }
456 
457 static void
458 spdk_bdev_module_action_done(struct spdk_bdev_module_if *module)
459 {
460 	assert(module->action_in_progress > 0);
461 	module->action_in_progress--;
462 	spdk_bdev_module_action_complete();
463 }
464 
465 void
466 spdk_bdev_module_init_done(struct spdk_bdev_module_if *module)
467 {
468 	spdk_bdev_module_action_done(module);
469 }
470 
471 void
472 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
473 {
474 	spdk_bdev_module_action_done(module);
475 }
476 
477 static int
478 spdk_bdev_modules_init(void)
479 {
480 	struct spdk_bdev_module_if *module;
481 	int rc = 0;
482 
483 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
484 		rc = module->module_init();
485 		if (rc != 0) {
486 			break;
487 		}
488 	}
489 
490 	g_bdev_mgr.module_init_complete = true;
491 	return rc;
492 }

493 void
494 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg)
495 {
496 	int cache_size;
497 	int rc = 0;
498 	char mempool_name[32];
499 
500 	assert(cb_fn != NULL);
501 
502 	g_init_cb_fn = cb_fn;
503 	g_init_cb_arg = cb_arg;
504 
505 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
506 
507 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
508 				  SPDK_BDEV_IO_POOL_SIZE,
509 				  sizeof(struct spdk_bdev_io) +
510 				  spdk_bdev_module_get_max_ctx_size(),
511 				  0,
512 				  SPDK_ENV_SOCKET_ID_ANY);
513 
514 	if (g_bdev_mgr.bdev_io_pool == NULL) {
515 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
516 		spdk_bdev_init_complete(-1);
517 		return;
518 	}
519 
520 	/*
521 	 * Ensure no more than half of the total buffers end up in local caches, by
522 	 *   using spdk_env_get_core_count() to determine how many local caches we need
523 	 *   to account for.
524 	 */
525 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
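	/* e.g. with 4 cores: 8192 / (2 * 4) = 1024 buffers cached per core,
	 *  so the caches can hold at most 4096 of the 8192 buffers. */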
526 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
527 
528 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
529 				    BUF_SMALL_POOL_SIZE,
530 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
531 				    cache_size,
532 				    SPDK_ENV_SOCKET_ID_ANY);
533 	if (!g_bdev_mgr.buf_small_pool) {
534 		SPDK_ERRLOG("create rbuf small pool failed\n");
535 		spdk_bdev_init_complete(-1);
536 		return;
537 	}
538 
539 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
540 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
541 
542 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
543 				    BUF_LARGE_POOL_SIZE,
544 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
545 				    cache_size,
546 				    SPDK_ENV_SOCKET_ID_ANY);
547 	if (!g_bdev_mgr.buf_large_pool) {
548 		SPDK_ERRLOG("create rbuf large pool failed\n");
549 		spdk_bdev_init_complete(-1);
550 		return;
551 	}
552 
553 	g_bdev_mgr.zero_buffer = spdk_dma_zmalloc(ZERO_BUFFER_SIZE, ZERO_BUFFER_SIZE,
554 				 NULL);
555 	if (!g_bdev_mgr.zero_buffer) {
556 		SPDK_ERRLOG("create bdev zero buffer failed\n");
557 		spdk_bdev_init_complete(-1);
558 		return;
559 	}
560 
561 #ifdef SPDK_CONFIG_VTUNE
562 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
563 #endif
564 
565 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
566 				spdk_bdev_mgmt_channel_destroy,
567 				sizeof(struct spdk_bdev_mgmt_channel));
568 
569 	rc = spdk_bdev_modules_init();
570 	if (rc != 0) {
571 		SPDK_ERRLOG("bdev modules init failed\n");
572 		spdk_bdev_init_complete(-1);
573 		return;
574 	}
575 
576 	spdk_bdev_module_action_complete();
577 }
578 
579 static void
580 spdk_bdev_module_finish_cb(void *io_device)
581 {
582 	spdk_bdev_fini_cb cb_fn = g_fini_cb_fn;
583 
584 	cb_fn(g_fini_cb_arg);
585 	g_fini_cb_fn = NULL;
586 	g_fini_cb_arg = NULL;
587 }
588 
589 static void
590 spdk_bdev_module_finish_complete(struct spdk_io_channel_iter *i, int status)
591 {
592 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
593 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
594 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
595 			    SPDK_BDEV_IO_POOL_SIZE);
596 	}
597 
598 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
599 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
600 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
601 			    BUF_SMALL_POOL_SIZE);
602 		assert(false);
603 	}
604 
605 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
606 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
607 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
608 			    BUF_LARGE_POOL_SIZE);
609 		assert(false);
610 	}
611 
612 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
613 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
614 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
615 	spdk_dma_free(g_bdev_mgr.zero_buffer);
616 
617 	spdk_io_device_unregister(&g_bdev_mgr, spdk_bdev_module_finish_cb);
618 }
619 
620 static void
621 mgmt_channel_free_resources(struct spdk_io_channel_iter *i)
622 {
623 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
624 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
625 
626 	spdk_bdev_mgmt_channel_free_resources(ch);
627 	spdk_for_each_channel_continue(i, 0);
628 }
629 
630 static void
631 spdk_bdev_module_finish_iter(void *arg)
632 {
633 	/* Notice that this variable is static. It is saved between calls to
634 	 * this function. */
635 	static struct spdk_bdev_module_if *resume_bdev_module = NULL;
636 	struct spdk_bdev_module_if *bdev_module;
637 
638 	/* Start iterating from the last touched module */
639 	if (!resume_bdev_module) {
640 		bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
641 	} else {
642 		bdev_module = TAILQ_NEXT(resume_bdev_module, tailq);
643 	}
644 
645 	while (bdev_module) {
646 		if (bdev_module->async_fini) {
647 			/* Save our place so we can resume later. We must
648 			 * save the variable here, before calling module_fini()
649 			 * below, because in some cases the module may immediately
650 			 * call spdk_bdev_module_finish_done() and re-enter
651 			 * this function to continue iterating. */
652 			resume_bdev_module = bdev_module;
653 		}
654 
655 		if (bdev_module->module_fini) {
656 			bdev_module->module_fini();
657 		}
658 
659 		if (bdev_module->async_fini) {
660 			return;
661 		}
662 
663 		bdev_module = TAILQ_NEXT(bdev_module, tailq);
664 	}
665 
666 	resume_bdev_module = NULL;
667 	spdk_for_each_channel(&g_bdev_mgr, mgmt_channel_free_resources, NULL,
668 			      spdk_bdev_module_finish_complete);
669 }
670 
671 void
672 spdk_bdev_module_finish_done(void)
673 {
674 	if (spdk_get_thread() != g_fini_thread) {
675 		spdk_thread_send_msg(g_fini_thread, spdk_bdev_module_finish_iter, NULL);
676 	} else {
677 		spdk_bdev_module_finish_iter(NULL);
678 	}
679 }
680 
681 static void
682 _spdk_bdev_finish_unregister_bdevs_iter(void *cb_arg, int bdeverrno)
683 {
684 	struct spdk_bdev *bdev = cb_arg;
685 
686 	if (bdeverrno && bdev) {
687 		SPDK_WARNLOG("Unable to unregister bdev '%s' during spdk_bdev_finish()\n",
688 			     bdev->name);
689 
690 		/*
691 		 * Since the call to spdk_bdev_unregister() failed, we have no way to free this
692 		 *  bdev; try to continue by manually removing this bdev from the list and moving
693 		 *  on to the next bdev in the list.
694 		 */
695 		TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
696 	}
697 
698 	if (TAILQ_EMPTY(&g_bdev_mgr.bdevs)) {
699 		SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Done unregistering bdevs\n");
700 		spdk_bdev_module_finish_iter(NULL);
701 		return;
702 	}
703 
704 	/*
705 	 * Unregister the first bdev in the list.
706 	 *
707 	 * spdk_bdev_unregister() will handle the case where the bdev has open descriptors by
708 	 *  calling the remove_cb of the descriptors first.
709 	 *
710 	 * Once this bdev and all of its open descriptors have been cleaned up, this function
711 	 *  will be called again via the unregister completion callback to continue the cleanup
712 	 *  process with the next bdev.
713 	 */
714 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
715 	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Unregistering bdev '%s'\n", bdev->name);
716 	spdk_bdev_unregister(bdev, _spdk_bdev_finish_unregister_bdevs_iter, bdev);
717 }
718 
719 static void
720 _spdk_bdev_finish_unregister_bdevs(void)
721 {
722 	_spdk_bdev_finish_unregister_bdevs_iter(NULL, 0);
723 }
724 
725 void
726 spdk_bdev_finish(spdk_bdev_fini_cb cb_fn, void *cb_arg)
727 {
728 	assert(cb_fn != NULL);
729 
730 	g_fini_thread = spdk_get_thread();
731 
732 	g_fini_cb_fn = cb_fn;
733 	g_fini_cb_arg = cb_arg;
734 
735 	_spdk_bdev_finish_unregister_bdevs();
736 }
737 
738 static struct spdk_bdev_io *
739 spdk_bdev_get_io(struct spdk_io_channel *_ch)
740 {
741 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(_ch);
742 	struct spdk_bdev_io *bdev_io;
743 
744 	if (ch->per_thread_cache_count > 0) {
745 		bdev_io = TAILQ_FIRST(&ch->per_thread_cache);
746 		TAILQ_REMOVE(&ch->per_thread_cache, bdev_io, buf_link);
747 		ch->per_thread_cache_count--;
748 	} else {
749 		bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
750 		if (!bdev_io) {
751 			SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
752 			abort();
753 		}
754 	}
755 
756 	return bdev_io;
757 }
758 
759 static void
760 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
761 {
762 	struct spdk_bdev_mgmt_channel *ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
763 
764 	if (bdev_io->buf != NULL) {
765 		spdk_bdev_io_put_buf(bdev_io);
766 	}
767 
768 	if (ch->per_thread_cache_count < SPDK_BDEV_IO_CACHE_SIZE) {
769 		ch->per_thread_cache_count++;
770 		TAILQ_INSERT_TAIL(&ch->per_thread_cache, bdev_io, buf_link);
771 	} else {
772 		spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
773 	}
774 }
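
/*
 * Together, spdk_bdev_get_io() and spdk_bdev_put_io() implement the per-thread
 *  cache described above: up to SPDK_BDEV_IO_CACHE_SIZE (256) spdk_bdev_io
 *  structures are recycled lock-free on the owning thread, and only cache
 *  misses or overflow touch the shared g_bdev_mgr.bdev_io_pool mempool.
 */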
775 
776 static void
777 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
778 {
779 	struct spdk_bdev *bdev = bdev_io->bdev;
780 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
781 	struct spdk_io_channel *ch = bdev_ch->channel;
782 
783 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
784 
785 	bdev_ch->io_outstanding++;
786 	bdev_io->in_submit_request = true;
787 	if (spdk_likely(bdev_ch->flags == 0)) {
788 		if (spdk_likely(TAILQ_EMPTY(&bdev_ch->nomem_io))) {
789 			bdev->fn_table->submit_request(ch, bdev_io);
790 		} else {
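			/*
			 * Earlier I/O on this channel are still queued with NOMEM
			 *  status; queue behind them to preserve submission order.
			 *  _spdk_bdev_ch_retry_io() will resubmit this I/O later.
			 */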
791 			bdev_ch->io_outstanding--;
792 			TAILQ_INSERT_TAIL(&bdev_ch->nomem_io, bdev_io, link);
793 		}
794 	} else if (bdev_ch->flags & BDEV_CH_RESET_IN_PROGRESS) {
795 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
796 	} else {
797 		SPDK_ERRLOG("unknown bdev_ch flag %x found\n", bdev_ch->flags);
798 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
799 	}
800 	bdev_io->in_submit_request = false;
801 }
802 
803 static void
804 spdk_bdev_io_submit_reset(struct spdk_bdev_io *bdev_io)
805 {
806 	struct spdk_bdev *bdev = bdev_io->bdev;
807 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
808 	struct spdk_io_channel *ch = bdev_ch->channel;
809 
810 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
811 
812 	bdev_io->in_submit_request = true;
813 	bdev->fn_table->submit_request(ch, bdev_io);
814 	bdev_io->in_submit_request = false;
815 }
816 
817 static void
818 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
819 		  struct spdk_bdev *bdev, void *cb_arg,
820 		  spdk_bdev_io_completion_cb cb)
821 {
822 	bdev_io->bdev = bdev;
823 	bdev_io->caller_ctx = cb_arg;
824 	bdev_io->cb = cb;
825 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
826 	bdev_io->in_submit_request = false;
827 	bdev_io->buf = NULL;
828 }
829 
830 bool
831 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
832 {
833 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
834 }
835 
836 int
837 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
838 {
839 	if (bdev->fn_table->dump_config_json) {
840 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
841 	}
842 
843 	return 0;
844 }
845 
846 static int
847 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
848 {
849 	struct spdk_bdev		*bdev = io_device;
850 	struct spdk_bdev_channel	*ch = ctx_buf;
851 
852 	ch->bdev = io_device;
853 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
854 	if (!ch->channel) {
855 		return -1;
856 	}
857 
858 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
859 	if (!ch->mgmt_channel) {
860 		spdk_put_io_channel(ch->channel);
861 		return -1;
862 	}
863 
864 	memset(&ch->stat, 0, sizeof(ch->stat));
865 	ch->io_outstanding = 0;
866 	TAILQ_INIT(&ch->queued_resets);
867 	TAILQ_INIT(&ch->nomem_io);
868 	ch->nomem_threshold = 0;
869 	ch->flags = 0;
870 
871 #ifdef SPDK_CONFIG_VTUNE
872 	{
873 		char *name;
874 		__itt_init_ittlib(NULL, 0);
875 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
876 		if (!name) {
877 			spdk_put_io_channel(ch->channel);
878 			spdk_put_io_channel(ch->mgmt_channel);
879 			return -1;
880 		}
881 		ch->handle = __itt_string_handle_create(name);
882 		free(name);
883 		ch->start_tsc = spdk_get_ticks();
884 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
885 	}
886 #endif
887 
888 	return 0;
889 }
890 
891 /*
892  * Abort I/O that are waiting on a data buffer.  These types of I/O are
893  *  linked using the spdk_bdev_io buf_link TAILQ_ENTRY.
894  */
895 static void
896 _spdk_bdev_abort_buf_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
897 {
898 	struct spdk_bdev_io *bdev_io, *tmp;
899 
900 	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
901 		if (bdev_io->ch == ch) {
902 			TAILQ_REMOVE(queue, bdev_io, buf_link);
903 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
904 		}
905 	}
906 }
907 
908 /*
909  * Abort I/O that are queued waiting for submission.  These types of I/O are
910  *  linked using the spdk_bdev_io link TAILQ_ENTRY.
911  */
912 static void
913 _spdk_bdev_abort_queued_io(bdev_io_tailq_t *queue, struct spdk_bdev_channel *ch)
914 {
915 	struct spdk_bdev_io *bdev_io, *tmp;
916 
917 	TAILQ_FOREACH_SAFE(bdev_io, queue, link, tmp) {
918 		if (bdev_io->ch == ch) {
919 			TAILQ_REMOVE(queue, bdev_io, link);
920 			/*
921 			 * spdk_bdev_io_complete() assumes that the completed I/O had
922 			 *  been submitted to the bdev module.  Since in this case it
923 			 *  hadn't, bump io_outstanding to account for the decrement
924 			 *  that spdk_bdev_io_complete() will do.
925 			 */
926 			if (bdev_io->type != SPDK_BDEV_IO_TYPE_RESET) {
927 				ch->io_outstanding++;
928 			}
929 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
930 		}
931 	}
932 }
933 
934 static void
935 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
936 {
937 	struct spdk_bdev_channel	*ch = ctx_buf;
938 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
939 
940 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
941 
942 	_spdk_bdev_abort_queued_io(&ch->queued_resets, ch);
943 	_spdk_bdev_abort_queued_io(&ch->nomem_io, ch);
944 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, ch);
945 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, ch);
946 
947 	spdk_put_io_channel(ch->channel);
948 	spdk_put_io_channel(ch->mgmt_channel);
949 	assert(ch->io_outstanding == 0);
950 }
951 
952 int
953 spdk_bdev_alias_add(struct spdk_bdev *bdev, const char *alias)
954 {
955 	struct spdk_bdev_alias *tmp;
956 
957 	if (alias == NULL) {
958 		SPDK_ERRLOG("Empty alias passed\n");
959 		return -EINVAL;
960 	}
961 
962 	if (spdk_bdev_get_by_name(alias)) {
963 		SPDK_ERRLOG("Bdev name/alias: %s already exists\n", alias);
964 		return -EEXIST;
965 	}
966 
967 	tmp = calloc(1, sizeof(*tmp));
968 	if (tmp == NULL) {
969 		SPDK_ERRLOG("Unable to allocate alias\n");
970 		return -ENOMEM;
971 	}
972 
973 	tmp->alias = strdup(alias);
974 	if (tmp->alias == NULL) {
975 		free(tmp);
976 		SPDK_ERRLOG("Unable to allocate alias\n");
977 		return -ENOMEM;
978 	}
979 
980 	TAILQ_INSERT_TAIL(&bdev->aliases, tmp, tailq);
981 
982 	return 0;
983 }
984 
985 int
986 spdk_bdev_alias_del(struct spdk_bdev *bdev, const char *alias)
987 {
988 	struct spdk_bdev_alias *tmp;
989 
990 	TAILQ_FOREACH(tmp, &bdev->aliases, tailq) {
991 		if (strcmp(alias, tmp->alias) == 0) {
992 			TAILQ_REMOVE(&bdev->aliases, tmp, tailq);
993 			free(tmp->alias);
994 			free(tmp);
995 			return 0;
996 		}
997 	}
998 
999 	SPDK_INFOLOG(SPDK_LOG_BDEV, "Alias %s does not exist\n", alias);
1000 
1001 	return -ENOENT;
1002 }
1003 
1004 struct spdk_io_channel *
1005 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
1006 {
1007 	return spdk_get_io_channel(desc->bdev);
1008 }
1009 
1010 const char *
1011 spdk_bdev_get_name(const struct spdk_bdev *bdev)
1012 {
1013 	return bdev->name;
1014 }
1015 
1016 const char *
1017 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
1018 {
1019 	return bdev->product_name;
1020 }
1021 
1022 const struct spdk_bdev_aliases_list *
1023 spdk_bdev_get_aliases(const struct spdk_bdev *bdev)
1024 {
1025 	return &bdev->aliases;
1026 }
1027 
1028 uint32_t
1029 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
1030 {
1031 	return bdev->blocklen;
1032 }
1033 
1034 uint64_t
1035 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
1036 {
1037 	return bdev->blockcnt;
1038 }
1039 
1040 size_t
1041 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
1042 {
1043 	/* TODO: push this logic down to the bdev modules */
1044 	if (bdev->need_aligned_buffer) {
1045 		return bdev->blocklen;
1046 	}
1047 
1048 	return 1;
1049 }
1050 
1051 uint32_t
1052 spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
1053 {
1054 	return bdev->optimal_io_boundary;
1055 }
1056 
1057 bool
1058 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
1059 {
1060 	return bdev->write_cache;
1061 }
1062 
1063 /*
1064  * Convert I/O offset and length from bytes to blocks.
1065  *
1066  * Returns zero on success or non-zero if the byte parameters aren't divisible by the block size.
1067  */
1068 static uint64_t
1069 spdk_bdev_bytes_to_blocks(struct spdk_bdev *bdev, uint64_t offset_bytes, uint64_t *offset_blocks,
1070 			  uint64_t num_bytes, uint64_t *num_blocks)
1071 {
1072 	uint32_t block_size = bdev->blocklen;
1073 
1074 	*offset_blocks = offset_bytes / block_size;
1075 	*num_blocks = num_bytes / block_size;
1076 
1077 	return (offset_bytes % block_size) | (num_bytes % block_size);
1078 }
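
/*
 * For example, with a 512-byte block size: offset_bytes = 4096 and
 *  num_bytes = 8192 yield offset_blocks = 8, num_blocks = 16 and return 0;
 *  offset_bytes = 4097 would return non-zero (4097 % 512 != 0), so the
 *  caller rejects the request with -EINVAL.
 */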
1079 
1080 static bool
1081 spdk_bdev_io_valid_blocks(struct spdk_bdev *bdev, uint64_t offset_blocks, uint64_t num_blocks)
1082 {
1083 	/* Return failure if offset_blocks + num_blocks is less than offset_blocks; this
1084 	 * indicates the sum overflowed and the offset wrapped around. */
1085 	if (offset_blocks + num_blocks < offset_blocks) {
1086 		return false;
1087 	}
1088 
1089 	/* Return failure if offset_blocks + num_blocks exceeds the size of the bdev */
1090 	if (offset_blocks + num_blocks > bdev->blockcnt) {
1091 		return false;
1092 	}
1093 
1094 	return true;
1095 }
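
/*
 * For example, offset_blocks = UINT64_MAX - 1 with num_blocks = 4 wraps
 *  around to 2, which is caught by the overflow check above.
 */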
1096 
1097 int
1098 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1099 	       void *buf, uint64_t offset, uint64_t nbytes,
1100 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
1101 {
1102 	uint64_t offset_blocks, num_blocks;
1103 
1104 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1105 		return -EINVAL;
1106 	}
1107 
1108 	return spdk_bdev_read_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1109 }
1110 
1111 int
1112 spdk_bdev_read_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1113 		      void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1114 		      spdk_bdev_io_completion_cb cb, void *cb_arg)
1115 {
1116 	struct spdk_bdev *bdev = desc->bdev;
1117 	struct spdk_bdev_io *bdev_io;
1118 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1119 
1120 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1121 		return -EINVAL;
1122 	}
1123 
1124 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1125 	if (!bdev_io) {
1126 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
1127 		return -ENOMEM;
1128 	}
1129 
1130 	bdev_io->ch = channel;
1131 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1132 	bdev_io->u.bdev.iov.iov_base = buf;
1133 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1134 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1135 	bdev_io->u.bdev.iovcnt = 1;
1136 	bdev_io->u.bdev.num_blocks = num_blocks;
1137 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1138 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1139 
1140 	spdk_bdev_io_submit(bdev_io);
1141 	return 0;
1142 }
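
/*
 * Illustrative usage sketch (not part of this file); error handling is
 *  omitted, `read_done` is a hypothetical completion callback, and `buf`
 *  must satisfy spdk_bdev_get_buf_align():
 *
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *ch;
 *
 *	spdk_bdev_open(bdev, false, NULL, NULL, &desc);
 *	ch = spdk_bdev_get_io_channel(desc);
 *	spdk_bdev_read_blocks(desc, ch, buf, 0, 1, read_done, NULL);
 */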
1143 
1144 int
1145 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1146 		struct iovec *iov, int iovcnt,
1147 		uint64_t offset, uint64_t nbytes,
1148 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1149 {
1150 	uint64_t offset_blocks, num_blocks;
1151 
1152 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1153 		return -EINVAL;
1154 	}
1155 
1156 	return spdk_bdev_readv_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1157 }
1158 
1159 int
spdk_bdev_readv_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1160 			   struct iovec *iov, int iovcnt,
1161 			   uint64_t offset_blocks, uint64_t num_blocks,
1162 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1163 {
1164 	struct spdk_bdev *bdev = desc->bdev;
1165 	struct spdk_bdev_io *bdev_io;
1166 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1167 
1168 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1169 		return -EINVAL;
1170 	}
1171 
1172 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1173 	if (!bdev_io) {
1174 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
1175 		return -ENOMEM;
1176 	}
1177 
1178 	bdev_io->ch = channel;
1179 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
1180 	bdev_io->u.bdev.iovs = iov;
1181 	bdev_io->u.bdev.iovcnt = iovcnt;
1182 	bdev_io->u.bdev.num_blocks = num_blocks;
1183 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1184 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1185 
1186 	spdk_bdev_io_submit(bdev_io);
1187 	return 0;
1188 }
1189 
1190 int
1191 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1192 		void *buf, uint64_t offset, uint64_t nbytes,
1193 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1194 {
1195 	uint64_t offset_blocks, num_blocks;
1196 
1197 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1198 		return -EINVAL;
1199 	}
1200 
1201 	return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
1202 }
1203 
1204 int
1205 spdk_bdev_write_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1206 		       void *buf, uint64_t offset_blocks, uint64_t num_blocks,
1207 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1208 {
1209 	struct spdk_bdev *bdev = desc->bdev;
1210 	struct spdk_bdev_io *bdev_io;
1211 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1212 
1213 	if (!desc->write) {
1214 		return -EBADF;
1215 	}
1216 
1217 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1218 		return -EINVAL;
1219 	}
1220 
1221 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1222 	if (!bdev_io) {
1223 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
1224 		return -ENOMEM;
1225 	}
1226 
1227 	bdev_io->ch = channel;
1228 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1229 	bdev_io->u.bdev.iov.iov_base = buf;
1230 	bdev_io->u.bdev.iov.iov_len = num_blocks * bdev->blocklen;
1231 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1232 	bdev_io->u.bdev.iovcnt = 1;
1233 	bdev_io->u.bdev.num_blocks = num_blocks;
1234 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1235 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1236 
1237 	spdk_bdev_io_submit(bdev_io);
1238 	return 0;
1239 }
1240 
1241 int
1242 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1243 		 struct iovec *iov, int iovcnt,
1244 		 uint64_t offset, uint64_t len,
1245 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
1246 {
1247 	uint64_t offset_blocks, num_blocks;
1248 
1249 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1250 		return -EINVAL;
1251 	}
1252 
1253 	return spdk_bdev_writev_blocks(desc, ch, iov, iovcnt, offset_blocks, num_blocks, cb, cb_arg);
1254 }
1255 
1256 int
1257 spdk_bdev_writev_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1258 			struct iovec *iov, int iovcnt,
1259 			uint64_t offset_blocks, uint64_t num_blocks,
1260 			spdk_bdev_io_completion_cb cb, void *cb_arg)
1261 {
1262 	struct spdk_bdev *bdev = desc->bdev;
1263 	struct spdk_bdev_io *bdev_io;
1264 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1265 
1266 	if (!desc->write) {
1267 		return -EBADF;
1268 	}
1269 
1270 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1271 		return -EINVAL;
1272 	}
1273 
1274 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1275 	if (!bdev_io) {
1276 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
1277 		return -ENOMEM;
1278 	}
1279 
1280 	bdev_io->ch = channel;
1281 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1282 	bdev_io->u.bdev.iovs = iov;
1283 	bdev_io->u.bdev.iovcnt = iovcnt;
1284 	bdev_io->u.bdev.num_blocks = num_blocks;
1285 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1286 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1287 
1288 	spdk_bdev_io_submit(bdev_io);
1289 	return 0;
1290 }
1291 
1292 int
1293 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1294 		       uint64_t offset, uint64_t len,
1295 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1296 {
1297 	uint64_t offset_blocks, num_blocks;
1298 
1299 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, len, &num_blocks) != 0) {
1300 		return -EINVAL;
1301 	}
1302 
1303 	return spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1304 }
1305 
1306 int
1307 spdk_bdev_write_zeroes_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1308 			      uint64_t offset_blocks, uint64_t num_blocks,
1309 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1310 {
1311 	struct spdk_bdev *bdev = desc->bdev;
1312 	struct spdk_bdev_io *bdev_io;
1313 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1314 	uint64_t len;
1315 	bool split_request = false;
1316 
1317 	if (num_blocks > UINT64_MAX / spdk_bdev_get_block_size(bdev)) {
1318 		SPDK_ERRLOG("length argument out of range in write_zeroes\n");
1319 		return -ERANGE;
1320 	}
1321 
1322 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1323 		return -EINVAL;
1324 	}
1325 
1326 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1327 
1328 	if (!bdev_io) {
1329 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
1330 		return -ENOMEM;
1331 	}
1332 
1333 	bdev_io->ch = channel;
1334 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1335 
1336 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1337 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1338 		bdev_io->u.bdev.num_blocks = num_blocks;
1339 		bdev_io->u.bdev.iovs = NULL;
1340 		bdev_io->u.bdev.iovcnt = 0;
1341 
1342 	} else {
1343 		assert(spdk_bdev_get_block_size(bdev) <= ZERO_BUFFER_SIZE);
1344 
1345 		len = spdk_bdev_get_block_size(bdev) * num_blocks;
1346 
1347 		if (len > ZERO_BUFFER_SIZE) {
1348 			split_request = true;
1349 			len = ZERO_BUFFER_SIZE;
1350 		}
1351 
1352 		bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
1353 		bdev_io->u.bdev.iov.iov_base = g_bdev_mgr.zero_buffer;
1354 		bdev_io->u.bdev.iov.iov_len = len;
1355 		bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1356 		bdev_io->u.bdev.iovcnt = 1;
1357 		bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev);
1358 		bdev_io->split_remaining_num_blocks = num_blocks - bdev_io->u.bdev.num_blocks;
1359 		bdev_io->split_current_offset_blocks = offset_blocks + bdev_io->u.bdev.num_blocks;
1360 	}
1361 
1362 	if (split_request) {
1363 		bdev_io->stored_user_cb = cb;
1364 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, spdk_bdev_write_zeroes_split);
1365 	} else {
1366 		spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1367 	}
1368 	spdk_bdev_io_submit(bdev_io);
1369 	return 0;
1370 }
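
/*
 * Worked example of the split path above: with 512-byte blocks and
 *  num_blocks = 10000, len = 5120000 bytes exceeds ZERO_BUFFER_SIZE (1 MiB),
 *  so the first child write covers 2048 blocks and the remaining 7952 blocks
 *  are carried in split_remaining_num_blocks for spdk_bdev_write_zeroes_split()
 *  to issue in subsequent 1 MiB chunks.
 */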
1371 
1372 int
1373 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1374 		uint64_t offset, uint64_t nbytes,
1375 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1376 {
1377 	uint64_t offset_blocks, num_blocks;
1378 
1379 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, nbytes, &num_blocks) != 0) {
1380 		return -EINVAL;
1381 	}
1382 
1383 	return spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1384 }
1385 
1386 int
1387 spdk_bdev_unmap_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1388 		       uint64_t offset_blocks, uint64_t num_blocks,
1389 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1390 {
1391 	struct spdk_bdev *bdev = desc->bdev;
1392 	struct spdk_bdev_io *bdev_io;
1393 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1394 
1395 	if (!desc->write) {
1396 		return -EBADF;
1397 	}
1398 
1399 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1400 		return -EINVAL;
1401 	}
1402 
1403 	if (num_blocks == 0) {
1404 		SPDK_ERRLOG("Can't unmap 0 blocks\n");
1405 		return -EINVAL;
1406 	}
1407 
1408 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1409 	if (!bdev_io) {
1410 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1411 		return -ENOMEM;
1412 	}
1413 
1414 	bdev_io->ch = channel;
1415 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1416 	bdev_io->u.bdev.iov.iov_base = NULL;
1417 	bdev_io->u.bdev.iov.iov_len = 0;
1418 	bdev_io->u.bdev.iovs = &bdev_io->u.bdev.iov;
1419 	bdev_io->u.bdev.iovcnt = 1;
1420 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1421 	bdev_io->u.bdev.num_blocks = num_blocks;
1422 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1423 
1424 	spdk_bdev_io_submit(bdev_io);
1425 	return 0;
1426 }
1427 
1428 int
1429 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1430 		uint64_t offset, uint64_t length,
1431 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1432 {
1433 	uint64_t offset_blocks, num_blocks;
1434 
1435 	if (spdk_bdev_bytes_to_blocks(desc->bdev, offset, &offset_blocks, length, &num_blocks) != 0) {
1436 		return -EINVAL;
1437 	}
1438 
1439 	return spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, cb, cb_arg);
1440 }
1441 
1442 int
1443 spdk_bdev_flush_blocks(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1444 		       uint64_t offset_blocks, uint64_t num_blocks,
1445 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
1446 {
1447 	struct spdk_bdev *bdev = desc->bdev;
1448 	struct spdk_bdev_io *bdev_io;
1449 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1450 
1451 	if (!desc->write) {
1452 		return -EBADF;
1453 	}
1454 
1455 	if (!spdk_bdev_io_valid_blocks(bdev, offset_blocks, num_blocks)) {
1456 		return -EINVAL;
1457 	}
1458 
1459 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1460 	if (!bdev_io) {
1461 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1462 		return -ENOMEM;
1463 	}
1464 
1465 	bdev_io->ch = channel;
1466 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1467 	bdev_io->u.bdev.iovs = NULL;
1468 	bdev_io->u.bdev.iovcnt = 0;
1469 	bdev_io->u.bdev.offset_blocks = offset_blocks;
1470 	bdev_io->u.bdev.num_blocks = num_blocks;
1471 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1472 
1473 	spdk_bdev_io_submit(bdev_io);
1474 	return 0;
1475 }
1476 
1477 static void
1478 _spdk_bdev_reset_dev(struct spdk_io_channel_iter *i, int status)
1479 {
1480 	struct spdk_bdev_channel *ch = spdk_io_channel_iter_get_ctx(i);
1481 	struct spdk_bdev_io *bdev_io;
1482 
1483 	bdev_io = TAILQ_FIRST(&ch->queued_resets);
1484 	TAILQ_REMOVE(&ch->queued_resets, bdev_io, link);
1485 	spdk_bdev_io_submit_reset(bdev_io);
1486 }
1487 
1488 static void
1489 _spdk_bdev_reset_freeze_channel(struct spdk_io_channel_iter *i)
1490 {
1491 	struct spdk_io_channel		*ch;
1492 	struct spdk_bdev_channel	*channel;
1493 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1494 
1495 	ch = spdk_io_channel_iter_get_channel(i);
1496 	channel = spdk_io_channel_get_ctx(ch);
1497 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1498 
1499 	channel->flags |= BDEV_CH_RESET_IN_PROGRESS;
1500 
1501 	_spdk_bdev_abort_queued_io(&channel->nomem_io, channel);
1502 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_small, channel);
1503 	_spdk_bdev_abort_buf_io(&mgmt_channel->need_buf_large, channel);
1504 
1505 	spdk_for_each_channel_continue(i, 0);
1506 }
1507 
1508 static void
1509 _spdk_bdev_start_reset(void *ctx)
1510 {
1511 	struct spdk_bdev_channel *ch = ctx;
1512 
1513 	spdk_for_each_channel(ch->bdev, _spdk_bdev_reset_freeze_channel,
1514 			      ch, _spdk_bdev_reset_dev);
1515 }
1516 
1517 static void
1518 _spdk_bdev_channel_start_reset(struct spdk_bdev_channel *ch)
1519 {
1520 	struct spdk_bdev *bdev = ch->bdev;
1521 
1522 	assert(!TAILQ_EMPTY(&ch->queued_resets));
1523 
1524 	pthread_mutex_lock(&bdev->mutex);
1525 	if (bdev->reset_in_progress == NULL) {
1526 		bdev->reset_in_progress = TAILQ_FIRST(&ch->queued_resets);
1527 		/*
1528 		 * Take a channel reference for the target bdev for the life of this
1529 		 *  reset.  This guards against the channel getting destroyed while
1530 		 *  spdk_for_each_channel() calls related to this reset IO are in
1531 		 *  progress.  We will release the reference when this reset is
1532 		 *  completed.
1533 		 */
1534 		bdev->reset_in_progress->u.reset.ch_ref = spdk_get_io_channel(bdev);
1535 		_spdk_bdev_start_reset(ch);
1536 	}
1537 	pthread_mutex_unlock(&bdev->mutex);
1538 }
1539 
1540 int
1541 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1542 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1543 {
1544 	struct spdk_bdev *bdev = desc->bdev;
1545 	struct spdk_bdev_io *bdev_io;
1546 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1547 
1548 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1549 	if (!bdev_io) {
1550 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1551 		return -ENOMEM;
1552 	}
1553 
1554 	bdev_io->ch = channel;
1555 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1556 	bdev_io->u.reset.ch_ref = NULL;
1557 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1558 
1559 	pthread_mutex_lock(&bdev->mutex);
1560 	TAILQ_INSERT_TAIL(&channel->queued_resets, bdev_io, link);
1561 	pthread_mutex_unlock(&bdev->mutex);
1562 
1563 	_spdk_bdev_channel_start_reset(channel);
1564 
1565 	return 0;
1566 }
1567 
1568 void
1569 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1570 		      struct spdk_bdev_io_stat *stat)
1571 {
1572 #ifdef SPDK_CONFIG_VTUNE
1573 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1574 	memset(stat, 0, sizeof(*stat));
1575 	return;
1576 #endif
1577 
1578 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1579 
1580 	*stat = channel->stat;
1581 	memset(&channel->stat, 0, sizeof(channel->stat));
1582 }
1583 
1584 int
1585 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1586 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1587 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1588 {
1589 	struct spdk_bdev *bdev = desc->bdev;
1590 	struct spdk_bdev_io *bdev_io;
1591 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1592 
1593 	if (!desc->write) {
1594 		return -EBADF;
1595 	}
1596 
1597 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1598 	if (!bdev_io) {
1599 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1600 		return -ENOMEM;
1601 	}
1602 
1603 	bdev_io->ch = channel;
1604 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1605 	bdev_io->u.nvme_passthru.cmd = *cmd;
1606 	bdev_io->u.nvme_passthru.buf = buf;
1607 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1608 	bdev_io->u.nvme_passthru.md_buf = NULL;
1609 	bdev_io->u.nvme_passthru.md_len = 0;
1610 
1611 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1612 
1613 	spdk_bdev_io_submit(bdev_io);
1614 	return 0;
1615 }
1616 
1617 int
1618 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1619 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1620 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1621 {
1622 	struct spdk_bdev *bdev = desc->bdev;
1623 	struct spdk_bdev_io *bdev_io;
1624 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1625 
1626 	if (!desc->write) {
1627 		/*
1628 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1629 		 *  to easily determine if the command is a read or write, but for now just
1630 		 *  do not allow io_passthru with a read-only descriptor.
1631 		 */
1632 		return -EBADF;
1633 	}
1634 
1635 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1636 	if (!bdev_io) {
1637 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
1638 		return -ENOMEM;
1639 	}
1640 
1641 	bdev_io->ch = channel;
1642 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1643 	bdev_io->u.nvme_passthru.cmd = *cmd;
1644 	bdev_io->u.nvme_passthru.buf = buf;
1645 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1646 	bdev_io->u.nvme_passthru.md_buf = NULL;
1647 	bdev_io->u.nvme_passthru.md_len = 0;
1648 
1649 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1650 
1651 	spdk_bdev_io_submit(bdev_io);
1652 	return 0;
1653 }
1654 
1655 int
1656 spdk_bdev_nvme_io_passthru_md(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1657 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes, void *md_buf, size_t md_len,
1658 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1659 {
1660 	struct spdk_bdev *bdev = desc->bdev;
1661 	struct spdk_bdev_io *bdev_io;
1662 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1663 
1664 	if (!desc->write) {
1665 		/*
1666 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1667 		 *  to easily determine if the command is a read or write, but for now just
1668 		 *  do not allow io_passthru with a read-only descriptor.
1669 		 */
1670 		return -EBADF;
1671 	}
1672 
1673 	bdev_io = spdk_bdev_get_io(channel->mgmt_channel);
1674 	if (!bdev_io) {
1675 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru_md\n");
1676 		return -ENOMEM;
1677 	}
1678 
1679 	bdev_io->ch = channel;
1680 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO_MD;
1681 	bdev_io->u.nvme_passthru.cmd = *cmd;
1682 	bdev_io->u.nvme_passthru.buf = buf;
1683 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1684 	bdev_io->u.nvme_passthru.md_buf = md_buf;
1685 	bdev_io->u.nvme_passthru.md_len = md_len;
1686 
1687 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1688 
1689 	spdk_bdev_io_submit(bdev_io);
1690 	return 0;
1691 }
1692 
1693 int
1694 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1695 {
1696 	if (!bdev_io) {
1697 		SPDK_ERRLOG("bdev_io is NULL\n");
1698 		return -1;
1699 	}
1700 
1701 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1702 		SPDK_ERRLOG("bdev_io is in pending state\n");
1703 		assert(false);
1704 		return -1;
1705 	}
1706 
1707 	spdk_bdev_put_io(bdev_io);
1708 
1709 	return 0;
1710 }
1711 
1712 static void
1713 _spdk_bdev_ch_retry_io(struct spdk_bdev_channel *bdev_ch)
1714 {
1715 	struct spdk_bdev *bdev = bdev_ch->bdev;
1716 	struct spdk_bdev_io *bdev_io;
1717 
1718 	if (bdev_ch->io_outstanding > bdev_ch->nomem_threshold) {
1719 		/*
1720 		 * Allow some more I/O to complete before retrying the nomem_io queue.
1721 		 *  Some drivers (such as nvme) cannot immediately take a new I/O in
1722 		 *  the context of a completion, because the resources for the I/O are
1723 		 *  not released until control returns to the bdev poller.  Also, we
1724 		 *  may require several small I/O to complete before a larger I/O
1725 		 *  (that requires splitting) can be submitted.
1726 		 */
1727 		return;
1728 	}
1729 
1730 	while (!TAILQ_EMPTY(&bdev_ch->nomem_io)) {
1731 		bdev_io = TAILQ_FIRST(&bdev_ch->nomem_io);
1732 		TAILQ_REMOVE(&bdev_ch->nomem_io, bdev_io, link);
1733 		bdev_ch->io_outstanding++;
1734 		bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
1735 		bdev->fn_table->submit_request(bdev_ch->channel, bdev_io);
1736 		if (bdev_io->status == SPDK_BDEV_IO_STATUS_NOMEM) {
1737 			break;
1738 		}
1739 	}
1740 }
1741 
1742 static void
1743 _spdk_bdev_io_complete(void *ctx)
1744 {
1745 	struct spdk_bdev_io *bdev_io = ctx;
1746 
1747 	assert(bdev_io->cb != NULL);
1748 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1749 }
1750 
1751 static void
1752 _spdk_bdev_reset_complete(struct spdk_io_channel_iter *i, int status)
1753 {
1754 	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
1755 
1756 	if (bdev_io->u.reset.ch_ref != NULL) {
1757 		spdk_put_io_channel(bdev_io->u.reset.ch_ref);
1758 		bdev_io->u.reset.ch_ref = NULL;
1759 	}
1760 
1761 	_spdk_bdev_io_complete(bdev_io);
1762 }
1763 
1764 static void
1765 _spdk_bdev_unfreeze_channel(struct spdk_io_channel_iter *i)
1766 {
1767 	struct spdk_io_channel *_ch = spdk_io_channel_iter_get_channel(i);
1768 	struct spdk_bdev_channel *ch = spdk_io_channel_get_ctx(_ch);
1769 
1770 	ch->flags &= ~BDEV_CH_RESET_IN_PROGRESS;
1771 	if (!TAILQ_EMPTY(&ch->queued_resets)) {
1772 		_spdk_bdev_channel_start_reset(ch);
1773 	}
1774 
1775 	spdk_for_each_channel_continue(i, 0);
1776 }
1777 
1778 void
1779 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1780 {
1781 	struct spdk_bdev *bdev = bdev_io->bdev;
1782 	struct spdk_bdev_channel *bdev_ch = bdev_io->ch;
1783 
1784 	bdev_io->status = status;
1785 
1786 	if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_RESET)) {
1787 		bool unlock_channels = false;
1788 
1789 		if (status == SPDK_BDEV_IO_STATUS_NOMEM) {
1790 			SPDK_ERRLOG("NOMEM returned for reset\n");
1791 		}
1792 		pthread_mutex_lock(&bdev->mutex);
1793 		if (bdev_io == bdev->reset_in_progress) {
1794 			bdev->reset_in_progress = NULL;
1795 			unlock_channels = true;
1796 		}
1797 		pthread_mutex_unlock(&bdev->mutex);
1798 
1799 		if (unlock_channels) {
1800 			spdk_for_each_channel(bdev, _spdk_bdev_unfreeze_channel, bdev_io,
1801 					      _spdk_bdev_reset_complete);
1802 			return;
1803 		}
1804 	} else {
1805 		assert(bdev_ch->io_outstanding > 0);
1806 		bdev_ch->io_outstanding--;
1807 		if (spdk_likely(status != SPDK_BDEV_IO_STATUS_NOMEM)) {
1808 			if (spdk_unlikely(!TAILQ_EMPTY(&bdev_ch->nomem_io))) {
1809 				_spdk_bdev_ch_retry_io(bdev_ch);
1810 			}
1811 		} else {
1812 			TAILQ_INSERT_HEAD(&bdev_ch->nomem_io, bdev_io, link);
1813 			/*
1814 			 * Wait for some of the outstanding I/O to complete before we
1815 			 *  retry any of the nomem_io.  Normally we will wait for
1816 			 *  NOMEM_THRESHOLD_COUNT I/O to complete but for low queue
1817 			 *  depth channels we will instead wait for half to complete.
1818 			 */
1819 			bdev_ch->nomem_threshold = spdk_max((int64_t)bdev_ch->io_outstanding / 2,
1820 							    (int64_t)bdev_ch->io_outstanding - NOMEM_THRESHOLD_COUNT);
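			/*
			 * e.g. with NOMEM_THRESHOLD_COUNT == 8: 100 outstanding I/O
			 *  give a threshold of max(50, 92) = 92, while a queue depth
			 *  of 10 gives max(5, 2) = 5 (half, for shallow channels).
			 */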
1821 			return;
1822 		}
1823 	}

	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		switch (bdev_io->type) {
		case SPDK_BDEV_IO_TYPE_READ:
			bdev_ch->stat.bytes_read += bdev_io->u.bdev.num_blocks * bdev->blocklen;
			bdev_ch->stat.num_read_ops++;
			break;
		case SPDK_BDEV_IO_TYPE_WRITE:
			bdev_ch->stat.bytes_written += bdev_io->u.bdev.num_blocks * bdev->blocklen;
			bdev_ch->stat.num_write_ops++;
			break;
		default:
			break;
		}
	}

#ifdef SPDK_CONFIG_VTUNE
	uint64_t now_tsc = spdk_get_ticks();
	if (now_tsc > (bdev_ch->start_tsc + bdev_ch->interval_tsc)) {
		uint64_t data[5];

		data[0] = bdev_ch->stat.num_read_ops;
		data[1] = bdev_ch->stat.bytes_read;
		data[2] = bdev_ch->stat.num_write_ops;
		data[3] = bdev_ch->stat.bytes_written;
		data[4] = bdev->fn_table->get_spin_time ?
			  bdev->fn_table->get_spin_time(bdev_ch->channel) : 0;

		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_ch->handle,
				   __itt_metadata_u64, 5, data);

		memset(&bdev_ch->stat, 0, sizeof(bdev_ch->stat));
		bdev_ch->start_tsc = now_tsc;
	}
#endif

	if (bdev_io->in_submit_request) {
		/*
		 * Defer completion to avoid potential infinite recursion if the
		 * user's completion callback issues a new I/O.
		 */
		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_ch->channel),
				     _spdk_bdev_io_complete, bdev_io);
	} else {
		_spdk_bdev_io_complete(bdev_io);
	}
}
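
/*
 * Illustrative sketch (my_bdev_request_done and hw_ok are hypothetical
 * names, not part of this file): a backing module typically reports
 * completion by calling spdk_bdev_io_complete() from the channel's thread:
 *
 *	static void
 *	my_bdev_request_done(void *ctx, bool hw_ok)
 *	{
 *		struct spdk_bdev_io *bdev_io = ctx;
 *
 *		spdk_bdev_io_complete(bdev_io, hw_ok ? SPDK_BDEV_IO_STATUS_SUCCESS :
 *				      SPDK_BDEV_IO_STATUS_FAILED);
 *	}
 *
 * Passing SPDK_BDEV_IO_STATUS_NOMEM instead queues the I/O for retry as
 * described above.
 */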

void
spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
{
	if (sc == SPDK_SCSI_STATUS_GOOD) {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
	} else {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
		bdev_io->error.scsi.sc = sc;
		bdev_io->error.scsi.sk = sk;
		bdev_io->error.scsi.asc = asc;
		bdev_io->error.scsi.ascq = ascq;
	}

	spdk_bdev_io_complete(bdev_io, bdev_io->status);
}

void
spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
			     int *sc, int *sk, int *asc, int *ascq)
{
	assert(sc != NULL);
	assert(sk != NULL);
	assert(asc != NULL);
	assert(ascq != NULL);

	switch (bdev_io->status) {
	case SPDK_BDEV_IO_STATUS_SUCCESS:
		*sc = SPDK_SCSI_STATUS_GOOD;
		*sk = SPDK_SCSI_SENSE_NO_SENSE;
		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
		break;
	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
		break;
	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
		*sc = bdev_io->error.scsi.sc;
		*sk = bdev_io->error.scsi.sk;
		*asc = bdev_io->error.scsi.asc;
		*ascq = bdev_io->error.scsi.ascq;
		break;
	default:
		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
		break;
	}
}
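
/*
 * Usage sketch (hypothetical caller): a SCSI frontend can turn any
 * completion into sense data without knowing which backend produced it:
 *
 *	int sc, sk, asc, ascq;
 *
 *	spdk_bdev_io_get_scsi_status(bdev_io, &sc, &sk, &asc, &ascq);
 *
 * NVMe errors are translated via spdk_scsi_nvme_translate(); any other
 * failure degrades to CHECK CONDITION / ABORTED COMMAND.
 */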

void
spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
{
	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
	} else {
		bdev_io->error.nvme.sct = sct;
		bdev_io->error.nvme.sc = sc;
		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
	}

	spdk_bdev_io_complete(bdev_io, bdev_io->status);
}

void
spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
{
	assert(sct != NULL);
	assert(sc != NULL);

	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
		*sct = bdev_io->error.nvme.sct;
		*sc = bdev_io->error.nvme.sc;
	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_SUCCESS;
	} else {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
	}
}
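
/*
 * Usage sketch (hypothetical caller): an NVMe frontend recovers the status
 * code type/code pair the same way:
 *
 *	int sct, sc;
 *
 *	spdk_bdev_io_get_nvme_status(bdev_io, &sct, &sc);
 *
 * Failures that did not originate from an NVMe backend are reported as
 * SPDK_NVME_SC_INTERNAL_DEVICE_ERROR so the error is never silently lost.
 */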

struct spdk_thread *
spdk_bdev_io_get_thread(struct spdk_bdev_io *bdev_io)
{
	return spdk_io_channel_get_thread(bdev_io->ch->channel);
}

static int
_spdk_bdev_register(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module_if *module;

	assert(bdev->module != NULL);

	if (!bdev->name) {
		SPDK_ERRLOG("Bdev name is NULL\n");
		return -EINVAL;
	}

	if (spdk_bdev_get_by_name(bdev->name)) {
		SPDK_ERRLOG("Bdev name:%s already exists\n", bdev->name);
		return -EEXIST;
	}

	bdev->status = SPDK_BDEV_STATUS_READY;

	TAILQ_INIT(&bdev->open_descs);

	TAILQ_INIT(&bdev->vbdevs);
	TAILQ_INIT(&bdev->base_bdevs);

	TAILQ_INIT(&bdev->aliases);

	bdev->reset_in_progress = NULL;

	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
				sizeof(struct spdk_bdev_channel));

	pthread_mutex_init(&bdev->mutex, NULL);
	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Inserting bdev %s into list\n", bdev->name);
	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
		if (module->examine) {
			module->action_in_progress++;
			module->examine(bdev);
		}
	}

	return 0;
}

int
spdk_bdev_register(struct spdk_bdev *bdev)
{
	return _spdk_bdev_register(bdev);
}
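
/*
 * Registration sketch (illustrative; my_fn_table, my_module and the sizes
 * shown are assumptions about a typical backend, not requirements of this
 * file):
 *
 *	static struct spdk_bdev g_my_bdev = {
 *		.name = "MyBdev0",
 *		.product_name = "My Product",
 *		.blocklen = 512,
 *		.blockcnt = 2048,
 *		.fn_table = &my_fn_table,
 *		.module = &my_module,
 *	};
 *
 *	rc = spdk_bdev_register(&g_my_bdev);	// -EINVAL, -EEXIST or 0
 */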

int
spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
{
	int i, rc;

	rc = _spdk_bdev_register(vbdev);
	if (rc) {
		return rc;
	}

	for (i = 0; i < base_bdev_count; i++) {
		assert(base_bdevs[i] != NULL);
		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
	}

	return 0;
}

void
spdk_bdev_unregister_done(struct spdk_bdev *bdev, int bdeverrno)
{
	if (bdev->unregister_cb != NULL) {
		bdev->unregister_cb(bdev->unregister_ctx, bdeverrno);
	}
}

void
spdk_bdev_unregister(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev_desc	*desc, *tmp;
	int			rc;
	bool			do_destruct = true;

	SPDK_DEBUGLOG(SPDK_LOG_BDEV, "Removing bdev %s from list\n", bdev->name);

	pthread_mutex_lock(&bdev->mutex);

	bdev->status = SPDK_BDEV_STATUS_REMOVING;
	bdev->unregister_cb = cb_fn;
	bdev->unregister_ctx = cb_arg;

	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
		if (desc->remove_cb) {
			pthread_mutex_unlock(&bdev->mutex);
			do_destruct = false;
			desc->remove_cb(desc->remove_ctx);
			pthread_mutex_lock(&bdev->mutex);
		}
	}

	if (!do_destruct) {
		pthread_mutex_unlock(&bdev->mutex);
		return;
	}

	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
	pthread_mutex_unlock(&bdev->mutex);

	pthread_mutex_destroy(&bdev->mutex);

	spdk_io_device_unregister(bdev, NULL);

	rc = bdev->fn_table->destruct(bdev->ctxt);
	if (rc < 0) {
		SPDK_ERRLOG("destruct failed\n");
	}
	if (rc <= 0 && cb_fn != NULL) {
		cb_fn(cb_arg, rc);
	}
}

void
spdk_vbdev_unregister(struct spdk_bdev *vbdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
{
	struct spdk_bdev *base_bdev;

	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
	}
	spdk_bdev_unregister(vbdev, cb_fn, cb_arg);
}

int
spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
	       void *remove_ctx, struct spdk_bdev_desc **_desc)
{
	struct spdk_bdev_desc *desc;

	desc = calloc(1, sizeof(*desc));
	if (desc == NULL) {
		SPDK_ERRLOG("Failed to allocate memory for bdev descriptor\n");
		return -ENOMEM;
	}

	pthread_mutex_lock(&bdev->mutex);

	if (write && bdev->claim_module) {
		SPDK_INFOLOG(SPDK_LOG_BDEV, "Could not open %s - already claimed\n", bdev->name);
		free(desc);
		pthread_mutex_unlock(&bdev->mutex);
		return -EPERM;
	}

	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);

	desc->bdev = bdev;
	desc->remove_cb = remove_cb;
	desc->remove_ctx = remove_ctx;
	desc->write = write;
	*_desc = desc;

	pthread_mutex_unlock(&bdev->mutex);

	return 0;
}
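
/*
 * Usage sketch (my_remove_cb and my_ctx are hypothetical): consumers obtain
 * a descriptor before doing I/O and must close it when finished:
 *
 *	struct spdk_bdev_desc *desc;
 *
 *	if (spdk_bdev_open(bdev, true, my_remove_cb, my_ctx, &desc) == 0) {
 *		...
 *		spdk_bdev_close(desc);
 *	}
 *
 * Opening for write fails with -EPERM while a module holds a claim on the
 * bdev.
 */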

void
spdk_bdev_close(struct spdk_bdev_desc *desc)
{
	struct spdk_bdev *bdev = desc->bdev;
	bool do_unregister = false;

	pthread_mutex_lock(&bdev->mutex);

	TAILQ_REMOVE(&bdev->open_descs, desc, link);
	free(desc);

	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
		do_unregister = true;
	}
	pthread_mutex_unlock(&bdev->mutex);

	if (do_unregister) {
		spdk_bdev_unregister(bdev, bdev->unregister_cb, bdev->unregister_ctx);
	}
}

int
spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
			    struct spdk_bdev_module_if *module)
{
	if (bdev->claim_module != NULL) {
		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
			    bdev->claim_module->name);
		return -EPERM;
	}

	if (desc && !desc->write) {
		desc->write = true;
	}

	bdev->claim_module = module;
	return 0;
}

void
spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
{
	assert(bdev->claim_module != NULL);
	bdev->claim_module = NULL;
}
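
/*
 * Claim sketch (my_module is hypothetical): a virtual bdev module claims
 * its base before building on top of it and releases the claim on teardown;
 * spdk_bdev_part_construct()/spdk_bdev_part_free() below are real users of
 * this pairing:
 *
 *	if (spdk_bdev_module_claim_bdev(bdev, desc, &my_module) == 0) {
 *		...
 *		spdk_bdev_module_release_bdev(bdev);
 *	}
 */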

struct spdk_bdev *
spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
{
	return desc->bdev;
}

void
spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
{
	struct iovec *iovs;
	int iovcnt;

	if (bdev_io == NULL) {
		return;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
	case SPDK_BDEV_IO_TYPE_WRITE:
		iovs = bdev_io->u.bdev.iovs;
		iovcnt = bdev_io->u.bdev.iovcnt;
		break;
	default:
		iovs = NULL;
		iovcnt = 0;
		break;
	}

	if (iovp) {
		*iovp = iovs;
	}
	if (iovcntp) {
		*iovcntp = iovcnt;
	}
}
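
/*
 * Usage sketch: transports that need the scatter-gather list of a read or
 * write can fetch it without reaching into bdev_io internals:
 *
 *	struct iovec *iovs;
 *	int iovcnt;
 *
 *	spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
 *
 * For I/O types other than read and write, iovs is NULL and iovcnt is 0.
 */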

void
spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
{
	/*
	 * Modules with examine callbacks must be initialized first, so they are
	 *  ready to handle examine callbacks from later modules that will
	 *  register physical bdevs.
	 */
	if (bdev_module->examine != NULL) {
		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
	} else {
		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
	}
}
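
/*
 * Registration sketch (my_examine is a hypothetical callback): a module
 * that provides .examine lands at the head of the list, ahead of the
 * physical bdev modules whose devices it will examine:
 *
 *	static struct spdk_bdev_module_if g_my_module = {
 *		.name = "my_module",
 *		.examine = my_examine,
 *	};
 *
 *	spdk_bdev_module_list_add(&g_my_module);
 */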

void
spdk_bdev_part_base_free(struct spdk_bdev_part_base *base)
{
	if (base->desc) {
		spdk_bdev_close(base->desc);
		base->desc = NULL;
	}
	base->base_free_fn(base);
}

void
spdk_bdev_part_free(struct spdk_bdev_part *part)
{
	struct spdk_bdev_part_base *base;

	assert(part);
	assert(part->base);

	base = part->base;
	spdk_io_device_unregister(&part->base, NULL);
	TAILQ_REMOVE(base->tailq, part, tailq);
	free(part->bdev.name);
	free(part);

	if (__sync_sub_and_fetch(&base->ref, 1) == 0) {
		spdk_bdev_module_release_bdev(base->bdev);
		spdk_bdev_part_base_free(base);
	}
}

void
spdk_bdev_part_base_hotremove(struct spdk_bdev *base_bdev, struct bdev_part_tailq *tailq)
{
	struct spdk_bdev_part *part, *tmp;

	TAILQ_FOREACH_SAFE(part, tailq, tailq, tmp) {
		if (part->base->bdev == base_bdev) {
			spdk_vbdev_unregister(&part->bdev, NULL, NULL);
		}
	}
}

static bool
spdk_bdev_part_io_type_supported(void *_part, enum spdk_bdev_io_type io_type)
{
	struct spdk_bdev_part *part = _part;

	return part->base->bdev->fn_table->io_type_supported(part->base->bdev, io_type);
}

static struct spdk_io_channel *
spdk_bdev_part_get_io_channel(void *_part)
{
	struct spdk_bdev_part *part = _part;

	return spdk_get_io_channel(&part->base);
}

static void
spdk_bdev_part_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *part_io = cb_arg;
	enum spdk_bdev_io_status status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;

	spdk_bdev_io_complete(part_io, status);
	spdk_bdev_free_io(bdev_io);
}

static void
spdk_bdev_write_zeroes_split(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	uint64_t len;

	if (!success) {
		bdev_io->cb = bdev_io->stored_user_cb;
		_spdk_bdev_io_complete(bdev_io);
		return;
	}

	/* No need to repeat the parameter checks from spdk_bdev_write_zeroes_blocks();
	 *  this request already passed them on its first submission. */
	len = spdk_min(spdk_bdev_get_block_size(bdev_io->bdev) * bdev_io->split_remaining_num_blocks,
		       ZERO_BUFFER_SIZE);

	bdev_io->u.bdev.offset_blocks = bdev_io->split_current_offset_blocks;
	bdev_io->u.bdev.iov.iov_len = len;
	bdev_io->u.bdev.num_blocks = len / spdk_bdev_get_block_size(bdev_io->bdev);
	bdev_io->split_remaining_num_blocks -= bdev_io->u.bdev.num_blocks;
	bdev_io->split_current_offset_blocks += bdev_io->u.bdev.num_blocks;

	/* If this round completes the I/O, restore the original user callback
	 *  so the final completion reaches the caller. */
	if (bdev_io->split_remaining_num_blocks == 0) {
		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, bdev_io->stored_user_cb);
	} else {
		spdk_bdev_io_init(bdev_io, bdev_io->bdev, cb_arg, spdk_bdev_write_zeroes_split);
	}
	spdk_bdev_io_submit(bdev_io);
}
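
/*
 * Worked example: on a bdev with 512-byte blocks, each round zeroes at most
 * ZERO_BUFFER_SIZE / 512 blocks starting at split_current_offset_blocks,
 * then advances the offset and shrinks split_remaining_num_blocks by the
 * same amount, so the request walks the range in ZERO_BUFFER_SIZE-sized
 * strides until nothing remains and the user callback fires exactly once.
 */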

void
spdk_bdev_part_submit_request(struct spdk_bdev_part_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_part *part = ch->part;
	struct spdk_io_channel *base_ch = ch->base_ch;
	struct spdk_bdev_desc *base_desc = part->base->desc;
	uint64_t offset;
	int rc = 0;

	/* Modify the I/O to adjust for the offset within the base bdev. */
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_readv_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
					    bdev_io->u.bdev.iovcnt, offset,
					    bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
					    bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_writev_blocks(base_desc, base_ch, bdev_io->u.bdev.iovs,
					     bdev_io->u.bdev.iovcnt, offset,
					     bdev_io->u.bdev.num_blocks, spdk_bdev_part_complete_io,
					     bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_write_zeroes_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
						   spdk_bdev_part_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_unmap_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
					    spdk_bdev_part_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		offset = bdev_io->u.bdev.offset_blocks + part->offset_blocks;
		rc = spdk_bdev_flush_blocks(base_desc, base_ch, offset, bdev_io->u.bdev.num_blocks,
					    spdk_bdev_part_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_RESET:
		rc = spdk_bdev_reset(base_desc, base_ch,
				     spdk_bdev_part_complete_io, bdev_io);
		break;
	default:
		SPDK_ERRLOG("split: unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	if (rc != 0) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
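
/*
 * Worked example: for a part created with offset_blocks == 2048, a 16-block
 * read at the part's block 10 is reissued to the base bdev at block 2058;
 * only the offset is remapped, the iovecs and block count pass through
 * unchanged.
 */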

static int
spdk_bdev_part_channel_create_cb(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
	struct spdk_bdev_part_channel *ch = ctx_buf;

	ch->part = part;
	ch->base_ch = spdk_bdev_get_io_channel(part->base->desc);
	if (ch->base_ch == NULL) {
		return -1;
	}

	if (part->base->ch_create_cb) {
		return part->base->ch_create_cb(io_device, ctx_buf);
	} else {
		return 0;
	}
}

static void
spdk_bdev_part_channel_destroy_cb(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_part *part = SPDK_CONTAINEROF(io_device, struct spdk_bdev_part, base);
	struct spdk_bdev_part_channel *ch = ctx_buf;

	if (part->base->ch_destroy_cb) {
		part->base->ch_destroy_cb(io_device, ctx_buf);
	}
	spdk_put_io_channel(ch->base_ch);
}

int
spdk_bdev_part_base_construct(struct spdk_bdev_part_base *base, struct spdk_bdev *bdev,
			      spdk_bdev_remove_cb_t remove_cb, struct spdk_bdev_module_if *module,
			      struct spdk_bdev_fn_table *fn_table, struct bdev_part_tailq *tailq,
			      spdk_bdev_part_base_free_fn free_fn,
			      uint32_t channel_size, spdk_io_channel_create_cb ch_create_cb,
			      spdk_io_channel_destroy_cb ch_destroy_cb)
{
	int rc;

	fn_table->get_io_channel = spdk_bdev_part_get_io_channel;
	fn_table->io_type_supported = spdk_bdev_part_io_type_supported;

	base->bdev = bdev;
	base->desc = NULL;
	base->ref = 0;
	base->module = module;
	base->fn_table = fn_table;
	base->tailq = tailq;
	base->claimed = false;
	base->channel_size = channel_size;
	base->ch_create_cb = ch_create_cb;
	base->ch_destroy_cb = ch_destroy_cb;
	base->base_free_fn = free_fn;

	rc = spdk_bdev_open(bdev, false, remove_cb, bdev, &base->desc);
	if (rc) {
		spdk_bdev_part_base_free(base);
		SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(bdev));
		return -1;
	}

	return 0;
}

int
spdk_bdev_part_construct(struct spdk_bdev_part *part, struct spdk_bdev_part_base *base,
			 char *name, uint64_t offset_blocks, uint64_t num_blocks,
			 char *product_name)
{
	part->bdev.name = name;
	part->bdev.blocklen = base->bdev->blocklen;
	part->bdev.blockcnt = num_blocks;
	part->offset_blocks = offset_blocks;

	part->bdev.write_cache = base->bdev->write_cache;
	part->bdev.need_aligned_buffer = base->bdev->need_aligned_buffer;
	part->bdev.product_name = product_name;
	part->bdev.ctxt = part;
	part->bdev.module = base->module;
	part->bdev.fn_table = base->fn_table;

	__sync_fetch_and_add(&base->ref, 1);
	part->base = base;

	if (!base->claimed) {
		int rc;

		rc = spdk_bdev_module_claim_bdev(base->bdev, base->desc, base->module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(base->bdev));
			free(part->bdev.name);
			return -1;
		}
		base->claimed = true;
	}

	spdk_io_device_register(&part->base, spdk_bdev_part_channel_create_cb,
				spdk_bdev_part_channel_destroy_cb,
				base->channel_size);
	spdk_vbdev_register(&part->bdev, &base->bdev, 1);
	TAILQ_INSERT_TAIL(base->tailq, part, tailq);

	return 0;
}

SPDK_LOG_REGISTER_COMPONENT("bdev", SPDK_LOG_BDEV)