xref: /spdk/lib/bdev/bdev.c (revision d92f0f75caf311608f5f0e19d4b3db349609b4e8)
/*-
 *   BSD LICENSE
 *
 *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"

#include "spdk/env.h"
#include "spdk/io_channel.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"

#include "spdk_internal/bdev.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#include "ittnotify_types.h"
int __itt_init_ittlib(const char *, __itt_group_id);
#endif

#define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
#define BUF_SMALL_POOL_SIZE	8192
#define BUF_LARGE_POOL_SIZE	1024

typedef TAILQ_HEAD(, spdk_bdev_io) need_buf_tailq_t;

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;

	TAILQ_HEAD(, spdk_bdev) bdevs;

	spdk_bdev_poller_start_cb start_poller_fn;
	spdk_bdev_poller_stop_cb stop_poller_fn;

	bool init_complete;
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain	*domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.start_poller_fn = NULL,
	.stop_poller_fn = NULL,
	.init_complete = false,
	.module_init_complete = false,
};

static spdk_bdev_init_cb	g_cb_fn = NULL;
static void			*g_cb_arg = NULL;


struct spdk_bdev_mgmt_channel {
	need_buf_tailq_t need_buf_small;
	need_buf_tailq_t need_buf_large;
};

struct spdk_bdev_desc {
	struct spdk_bdev		*bdev;
	spdk_bdev_remove_cb_t		remove_cb;
	void				*remove_ctx;
	bool				write;
	TAILQ_ENTRY(spdk_bdev_desc)	link;
};

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Channel for the bdev manager */
	struct spdk_io_channel *mgmt_channel;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
#endif

};

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_TRACE_DEBUG, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, link);
	if (bdev) {
		SPDK_DEBUGLOG(SPDK_TRACE_DEBUG, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (TAILQ_EMPTY(&bdev->vbdevs)) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_TRACE_DEBUG, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));

	if (bdev) {
		SPDK_DEBUGLOG(SPDK_TRACE_DEBUG, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}
		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

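/*
 * Lookup sketch (caller code, not part of this file): find a bdev by name,
 * listing whatever is registered when the lookup fails. "Malloc0" is just a
 * placeholder name for illustration.
 */
static struct spdk_bdev *
example_find_bdev(void)
{
	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Malloc0");

	if (bdev == NULL) {
		SPDK_ERRLOG("Malloc0 not found; registered bdevs are:\n");
		for (bdev = spdk_bdev_first(); bdev != NULL; bdev = spdk_bdev_next(bdev)) {
			SPDK_ERRLOG("  %s\n", spdk_bdev_get_name(bdev));
		}
		return NULL;
	}

	return bdev;
}
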
static void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
{
	assert(bdev_io->get_buf_cb != NULL);
	assert(buf != NULL);
	assert(bdev_io->u.read.iovs != NULL);

	bdev_io->buf = buf;
	/*
	 * Pool buffers are allocated with 512 bytes of slack, so the data
	 * pointer can be rounded up to the next 512-byte boundary here.
	 */
	bdev_io->u.read.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
	bdev_io->u.read.iovs[0].iov_len = bdev_io->u.read.len;
	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf;
	need_buf_tailq_t *tailq;
	uint64_t length;
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io->u.read.iovcnt == 1);

	length = bdev_io->u.read.len;
	buf = bdev_io->buf;

	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);

	if (length <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		tailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		tailq = &ch->need_buf_large;
	}

	if (TAILQ_EMPTY(tailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = TAILQ_FIRST(tailq);
		TAILQ_REMOVE(tailq, tmp, buf_link);
		spdk_bdev_io_set_buf(tmp, buf);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb)
{
	uint64_t len = bdev_io->u.read.len;
	struct spdk_mempool *pool;
	need_buf_tailq_t *tailq;
	void *buf = NULL;
	struct spdk_bdev_mgmt_channel *ch;

	assert(cb != NULL);
	assert(bdev_io->u.read.iovs != NULL);

	if (spdk_unlikely(bdev_io->u.read.iovs[0].iov_base != NULL)) {
		/* Buffer already present */
		cb(bdev_io->ch->channel, bdev_io);
		return;
	}

	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);

	bdev_io->get_buf_cb = cb;
	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		tailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		tailq = &ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);

	if (!buf) {
		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
	} else {
		spdk_bdev_io_set_buf(bdev_io, buf);
	}
}

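/*
 * Usage sketch (bdev module code, not part of this file): a module that
 * needs a data buffer before servicing a read asks the manager for one and
 * continues in the callback once iovs[0] has been filled in. If the pool is
 * empty, the I/O is queued and the callback fires when a buffer is returned.
 * example_do_read is a hypothetical module-internal helper.
 */
static void example_do_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);

static void
example_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	/* bdev_io->u.read.iovs[0] now points at a buffer of u.read.len bytes */
	example_do_read(ch, bdev_io);
}

static void
example_submit_read(struct spdk_bdev_io *bdev_io)
{
	spdk_bdev_io_get_buf(bdev_io, example_get_buf_cb);
}
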
static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module_if *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module_if *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;

	TAILQ_INIT(&ch->need_buf_small);
	TAILQ_INIT(&ch->need_buf_large);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;

	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
	}
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_cb_fn;
	void *cb_arg = g_cb_arg;

	g_bdev_mgr.init_complete = true;
	g_cb_fn = NULL;
	g_cb_arg = NULL;

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_init_complete(int rc)
{
	struct spdk_bdev_module_if *m;

	g_bdev_mgr.module_init_complete = true;

	if (rc != 0) {
		spdk_bdev_init_complete(rc);
		/* Return here so the user's callback is not invoked a second time below. */
		return;
	}

	/*
	 * Check all bdev modules for examinations in progress.  If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
		if (m->examine_in_progress > 0) {
			return;
		}
	}

	spdk_bdev_init_complete(0);
}

static int
spdk_bdev_modules_init(void)
{
	struct spdk_bdev_module_if *module;
	int rc;

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
		rc = module->module_init();
		if (rc != 0) {
			return rc;
		}
	}

	return 0;
}

void
spdk_bdev_poller_start(struct spdk_bdev_poller **ppoller,
		       spdk_bdev_poller_fn fn,
		       void *arg,
		       uint32_t lcore,
		       uint64_t period_microseconds)
{
	g_bdev_mgr.start_poller_fn(ppoller, fn, arg, lcore, period_microseconds);
}

void
spdk_bdev_poller_stop(struct spdk_bdev_poller **ppoller)
{
	g_bdev_mgr.stop_poller_fn(ppoller);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg,
		     spdk_bdev_poller_start_cb start_poller_fn,
		     spdk_bdev_poller_stop_cb stop_poller_fn)
{
	int cache_size;
	int rc = 0;
	char mempool_name[32];

	assert(cb_fn != NULL);

	g_cb_fn = cb_fn;
	g_cb_arg = cb_arg;

	g_bdev_mgr.start_poller_fn = start_poller_fn;
	g_bdev_mgr.stop_poller_fn = stop_poller_fn;

	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
				  SPDK_BDEV_IO_POOL_SIZE,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  64,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		spdk_bdev_module_init_complete(-1);
		return;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches, by
	 *   using spdk_env_get_core_count() to determine how many local caches we need
	 *   to account for.
	 */
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());

	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("could not allocate small buffer pool\n");
		spdk_bdev_module_init_complete(-1);
		return;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());

	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("could not allocate large buffer pool\n");
		spdk_bdev_module_init_complete(-1);
		return;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel));

	rc = spdk_bdev_modules_init();
	spdk_bdev_module_init_complete(rc);
}

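/*
 * Initialization sketch (application code, not part of this file): the
 * poller hooks are normally wired up by the application's event framework.
 * my_start_poller and my_stop_poller are hypothetical placeholders with the
 * required signatures.
 */
extern void my_start_poller(struct spdk_bdev_poller **ppoller, spdk_bdev_poller_fn fn,
			    void *arg, uint32_t lcore, uint64_t period_microseconds);
extern void my_stop_poller(struct spdk_bdev_poller **ppoller);

static void
example_init_done(void *cb_arg, int rc)
{
	if (rc != 0) {
		SPDK_ERRLOG("bdev subsystem initialization failed: %d\n", rc);
	}
}

static void
example_start_bdev_layer(void)
{
	/* example_init_done fires once all modules and examinations finish */
	spdk_bdev_initialize(example_init_done, NULL, my_start_poller, my_stop_poller);
}
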
int
spdk_bdev_finish(void)
{
	struct spdk_bdev_module_if *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}
	}

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    SPDK_BDEV_IO_POOL_SIZE);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);

	spdk_io_device_unregister(&g_bdev_mgr, NULL);

	return 0;
}

struct spdk_bdev_io *
spdk_bdev_get_io(void)
{
	struct spdk_bdev_io *bdev_io;

	bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	if (!bdev_io) {
		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
		abort();
	}

	memset(bdev_io, 0, sizeof(*bdev_io));

	return bdev_io;
}

static void
spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
{
	if (!bdev_io) {
		return;
	}

	if (bdev_io->buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
}

static void
__submit_request(struct spdk_bdev *bdev, struct spdk_bdev_io *bdev_io)
{
	struct spdk_io_channel *ch;

	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->ch->channel;

	bdev_io->ch->io_outstanding++;
	bdev_io->in_submit_request = true;
	bdev->fn_table->submit_request(ch, bdev_io);
	bdev_io->in_submit_request = false;
}

static int
spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	__submit_request(bdev, bdev_io);
	return 0;
}

void
spdk_bdev_io_resubmit(struct spdk_bdev_io *bdev_io, struct spdk_bdev_desc *new_bdev_desc)
{
	struct spdk_bdev *new_bdev = new_bdev_desc->bdev;

	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
	bdev_io->bdev = new_bdev;

	/*
	 * This bdev_io was already submitted so decrement io_outstanding to ensure it
	 *  does not get double-counted.
	 */
	assert(bdev_io->ch->io_outstanding > 0);
	bdev_io->ch->io_outstanding--;
	__submit_request(new_bdev, bdev_io);
}

static void
spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
		  struct spdk_bdev *bdev, void *cb_arg,
		  spdk_bdev_io_completion_cb cb)
{
	bdev_io->bdev = bdev;
	bdev_io->caller_ctx = cb_arg;
	bdev_io->cb = cb;
	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
	bdev_io->in_submit_request = false;
}

bool
spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
}

int
spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	if (bdev->fn_table->dump_config_json) {
		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
	}

	return 0;
}

static int
spdk_bdev_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev		*bdev = io_device;
	struct spdk_bdev_channel	*ch = ctx_buf;

	ch->bdev = io_device;
	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
	memset(&ch->stat, 0, sizeof(ch->stat));
	ch->io_outstanding = 0;

#ifdef SPDK_CONFIG_VTUNE
	{
		char *name;
		__itt_init_ittlib(NULL, 0);
		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
		if (!name) {
			return -1;
		}
		ch->handle = __itt_string_handle_create(name);
		free(name);
		ch->start_tsc = spdk_get_ticks();
		ch->interval_tsc = spdk_get_ticks_hz() / 100;
	}
#endif

	return 0;
}

static void
_spdk_bdev_abort_io(need_buf_tailq_t *queue, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io, *tmp;

	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
		if (bdev_io->ch == ch) {
			TAILQ_REMOVE(queue, bdev_io, buf_link);
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		}
	}
}

static void
spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel	*ch = ctx_buf;
	struct spdk_bdev_mgmt_channel	*mgmt_channel;

	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);

	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, ch);
	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, ch);

	spdk_put_io_channel(ch->channel);
	spdk_put_io_channel(ch->mgmt_channel);
	assert(ch->io_outstanding == 0);
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(desc->bdev);
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
{
	return bdev->product_name;
}

uint32_t
spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
{
	return bdev->blocklen;
}

uint64_t
spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
{
	return bdev->blockcnt;
}

size_t
spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
{
	/* TODO: push this logic down to the bdev modules */
	if (bdev->need_aligned_buffer) {
		return bdev->blocklen;
	}

	return 1;
}

uint32_t
spdk_bdev_get_optimal_io_boundary(const struct spdk_bdev *bdev)
{
	return bdev->optimal_io_boundary;
}

bool
spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
{
	return bdev->write_cache;
}

static bool
spdk_bdev_io_valid(struct spdk_bdev *bdev, uint64_t offset, uint64_t nbytes)
{
	/* Return failure if offset is not a multiple of bdev->blocklen */
	if (offset % bdev->blocklen) {
		return false;
	}

	/* Return failure if nbytes is not a multiple of bdev->blocklen */
	if (nbytes % bdev->blocklen) {
		return false;
	}

	/* Return failure if offset + nbytes is less than offset; this
	 * indicates the addition overflowed and the value wrapped around */
	if (offset + nbytes < offset) {
		return false;
	}

	/* Return failure if offset + nbytes exceeds the size of the bdev */
	if (offset + nbytes > bdev->blockcnt * bdev->blocklen) {
		return false;
	}

	return true;
}

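/*
 * Worked example (illustrative, assuming blocklen = 512 and blockcnt = 8,
 * i.e. a 4096-byte bdev): offset = 1024, nbytes = 512 is valid; offset = 100
 * fails the alignment check; offset = 3584, nbytes = 1024 fails the size
 * check because 3584 + 1024 = 4608 > 4096.
 */
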
int
spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
	       void *buf, uint64_t offset, uint64_t nbytes,
	       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (!spdk_bdev_io_valid(bdev, offset, nbytes)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.read.iov.iov_base = buf;
	bdev_io->u.read.iov.iov_len = nbytes;
	bdev_io->u.read.iovs = &bdev_io->u.read.iov;
	bdev_io->u.read.iovcnt = 1;
	bdev_io->u.read.len = nbytes;
	bdev_io->u.read.offset = offset;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

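/*
 * Read sketch (caller code, not part of this file): issue a one-block read
 * through an open descriptor and free the spdk_bdev_io in the completion
 * callback. The 512-byte transfer assumes a 512-byte block size.
 */
static void
example_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	if (!success) {
		SPDK_ERRLOG("read failed\n");
	}
	spdk_bdev_free_io(bdev_io);
}

static int
example_issue_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch, void *buf)
{
	/* offset and nbytes must both be multiples of the block size */
	return spdk_bdev_read(desc, ch, buf, 0, 512, example_read_done, NULL);
}
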
int
spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		struct iovec *iov, int iovcnt,
		uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (!spdk_bdev_io_valid(bdev, offset, nbytes)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.read.iovs = iov;
	bdev_io->u.read.iovcnt = iovcnt;
	bdev_io->u.read.len = nbytes;
	bdev_io->u.read.offset = offset;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		void *buf, uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid(bdev, offset, nbytes)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
	bdev_io->u.write.iov.iov_base = buf;
	bdev_io->u.write.iov.iov_len = nbytes;
	bdev_io->u.write.iovs = &bdev_io->u.write.iov;
	bdev_io->u.write.iovcnt = 1;
	bdev_io->u.write.len = nbytes;
	bdev_io->u.write.offset = offset;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		 struct iovec *iov, int iovcnt,
		 uint64_t offset, uint64_t len,
		 spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid(bdev, offset, len)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
	bdev_io->u.write.iovs = iov;
	bdev_io->u.write.iovcnt = iovcnt;
	bdev_io->u.write.len = len;
	bdev_io->u.write.offset = offset;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

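/*
 * Vectored write sketch (caller code, not part of this file): gather two
 * 256-byte buffers into a single 512-byte write, assuming a 512-byte block
 * size. The descriptor must have been opened with write access.
 */
static void
example_write_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	spdk_bdev_free_io(bdev_io);
}

static int
example_issue_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		     void *lo, void *hi)
{
	struct iovec iov[2];

	iov[0].iov_base = lo;
	iov[0].iov_len = 256;
	iov[1].iov_base = hi;
	iov[1].iov_len = 256;

	/* the combined length must be a multiple of the block size */
	return spdk_bdev_writev(desc, ch, iov, 2, 0, 512, example_write_done, NULL);
}
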
int
spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		       uint64_t offset, uint64_t len,
		       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	int rc;
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	/* Write zeroes modifies the device, so require write access like the other write paths. */
	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid(bdev, offset, len)) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->u.write.len = len;
	bdev_io->u.write.offset = offset;
	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;

	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (!desc->write) {
		return -EBADF;
	}

	if (!spdk_bdev_io_valid(bdev, offset, nbytes)) {
		return -EINVAL;
	}

	if (nbytes == 0) {
		SPDK_ERRLOG("Can't unmap 0 bytes\n");
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
	bdev_io->u.unmap.offset = offset;
	bdev_io->u.unmap.len = nbytes;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		uint64_t offset, uint64_t length,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (!desc->write) {
		return -EBADF;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
	bdev_io->u.flush.offset = offset;
	bdev_io->u.flush.len = length;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

static void
_spdk_bdev_reset_dev(void *io_device, void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	int rc;

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		SPDK_ERRLOG("reset failed\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static void
_spdk_bdev_reset_abort_channel(void *io_device, struct spdk_io_channel *ch,
			       void *ctx)
{
	struct spdk_bdev_channel	*channel;
	struct spdk_bdev_mgmt_channel	*mgmt_channel;

	channel = spdk_io_channel_get_ctx(ch);
	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);

	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, channel);
	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, channel);
}

static void
_spdk_bdev_start_reset(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;

	spdk_for_each_channel(bdev_io->bdev, _spdk_bdev_reset_abort_channel,
			      bdev_io, _spdk_bdev_reset_dev);
}

static void
_spdk_bdev_start_next_reset(struct spdk_bdev *bdev)
{
	struct spdk_bdev_io *bdev_io;
	struct spdk_thread *thread;

	pthread_mutex_lock(&bdev->mutex);

	if (bdev->reset_in_progress || TAILQ_EMPTY(&bdev->queued_resets)) {
		pthread_mutex_unlock(&bdev->mutex);
		return;
	} else {
		bdev_io = TAILQ_FIRST(&bdev->queued_resets);
		TAILQ_REMOVE(&bdev->queued_resets, bdev_io, link);
		bdev->reset_in_progress = true;
		thread = spdk_io_channel_get_thread(bdev_io->ch->channel);
		spdk_thread_send_msg(thread, _spdk_bdev_start_reset, bdev_io);
	}

	pthread_mutex_unlock(&bdev->mutex);
}

int
spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	pthread_mutex_lock(&bdev->mutex);
	TAILQ_INSERT_TAIL(&bdev->queued_resets, bdev_io, link);
	pthread_mutex_unlock(&bdev->mutex);

	_spdk_bdev_start_next_reset(bdev);

	return 0;
}

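/*
 * Reset sketch (caller code, not part of this file): resets are serialized
 * per bdev; a reset submitted while another is in progress waits on the
 * queued_resets list until the completion path starts the next one.
 */
static void
example_reset_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	spdk_bdev_free_io(bdev_io);
}

static int
example_issue_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch)
{
	return spdk_bdev_reset(desc, ch, example_reset_done, NULL);
}
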
void
spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
		      struct spdk_bdev_io_stat *stat)
{
#ifdef SPDK_CONFIG_VTUNE
	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
	memset(stat, 0, sizeof(*stat));
	return;
#endif

	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	*stat = channel->stat;
	memset(&channel->stat, 0, sizeof(channel->stat));
}

int
spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
			      spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (!desc->write) {
		return -EBADF;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
	bdev_io->u.nvme_passthru.cmd = *cmd;
	bdev_io->u.nvme_passthru.buf = buf;
	bdev_io->u.nvme_passthru.nbytes = nbytes;

	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

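/*
 * Admin passthru sketch (caller code, not part of this file): send an NVMe
 * Identify Controller command to an NVMe-backed bdev. The opcode constant is
 * from spdk/nvme_spec.h; the CNS value in cdw10 and the 4096-byte identify
 * payload size follow the NVMe specification.
 */
static void
example_admin_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	spdk_bdev_free_io(bdev_io);
}

static int
example_identify_ctrlr(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
		       void *payload)
{
	struct spdk_nvme_cmd cmd = {0};

	cmd.opc = SPDK_NVME_OPC_IDENTIFY;
	cmd.cdw10 = 1; /* CNS = 1: identify controller */

	return spdk_bdev_nvme_admin_passthru(desc, ch, &cmd, payload, 4096,
					     example_admin_done, NULL);
}
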
int
spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
			   spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (!desc->write) {
		/*
		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
		 *  to easily determine if the command is a read or write, but for now just
		 *  do not allow io_passthru with a read-only descriptor.
		 */
		return -EBADF;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
	bdev_io->u.nvme_passthru.cmd = *cmd;
	bdev_io->u.nvme_passthru.buf = buf;
	bdev_io->u.nvme_passthru.nbytes = nbytes;

	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io is NULL\n");
		return -1;
	}

	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
		SPDK_ERRLOG("bdev_io is in pending state\n");
		assert(false);
		return -1;
	}

	spdk_bdev_put_io(bdev_io);

	return 0;
}

static void
_spdk_bdev_io_complete(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;

	assert(bdev_io->cb != NULL);
	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
}

void
spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	bdev_io->status = status;

	assert(bdev_io->ch->io_outstanding > 0);
	bdev_io->ch->io_outstanding--;
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
		bdev_io->bdev->reset_in_progress = false;
		_spdk_bdev_start_next_reset(bdev_io->bdev);
	}

	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		switch (bdev_io->type) {
		case SPDK_BDEV_IO_TYPE_READ:
			bdev_io->ch->stat.bytes_read += bdev_io->u.read.len;
			bdev_io->ch->stat.num_read_ops++;
			break;
		case SPDK_BDEV_IO_TYPE_WRITE:
			bdev_io->ch->stat.bytes_written += bdev_io->u.write.len;
			bdev_io->ch->stat.num_write_ops++;
			break;
		default:
			break;
		}
	}

#ifdef SPDK_CONFIG_VTUNE
	uint64_t now_tsc = spdk_get_ticks();
	if (now_tsc > (bdev_io->ch->start_tsc + bdev_io->ch->interval_tsc)) {
		uint64_t data[5];

		data[0] = bdev_io->ch->stat.num_read_ops;
		data[1] = bdev_io->ch->stat.bytes_read;
		data[2] = bdev_io->ch->stat.num_write_ops;
		data[3] = bdev_io->ch->stat.bytes_written;
		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->ch->channel) : 0;

		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->ch->handle,
				   __itt_metadata_u64, 5, data);

		memset(&bdev_io->ch->stat, 0, sizeof(bdev_io->ch->stat));
		bdev_io->ch->start_tsc = now_tsc;
	}
#endif

	if (bdev_io->in_submit_request || bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
		/*
		 * Defer completion to avoid potential infinite recursion if the
		 * user's completion callback issues a new I/O.
		 */
		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
				     _spdk_bdev_io_complete, bdev_io);
	} else {
		_spdk_bdev_io_complete(bdev_io);
	}
}

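/*
 * Completion sketch (bdev module code, not part of this file): a module
 * calls spdk_bdev_io_complete() exactly once per submitted I/O, from the
 * thread that owns the channel, once the backing device finishes.
 */
static void
example_backing_io_done(void *ctx, int backing_rc)
{
	struct spdk_bdev_io *bdev_io = ctx;

	spdk_bdev_io_complete(bdev_io, backing_rc == 0 ?
			      SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED);
}
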
void
spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
{
	if (sc == SPDK_SCSI_STATUS_GOOD) {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
	} else {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
		bdev_io->error.scsi.sc = sc;
		bdev_io->error.scsi.sk = sk;
		bdev_io->error.scsi.asc = asc;
		bdev_io->error.scsi.ascq = ascq;
	}

	spdk_bdev_io_complete(bdev_io, bdev_io->status);
}

void
spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
			     int *sc, int *sk, int *asc, int *ascq)
{
	assert(sc != NULL);
	assert(sk != NULL);
	assert(asc != NULL);
	assert(ascq != NULL);

	switch (bdev_io->status) {
	case SPDK_BDEV_IO_STATUS_SUCCESS:
		*sc = SPDK_SCSI_STATUS_GOOD;
		*sk = SPDK_SCSI_SENSE_NO_SENSE;
		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
		break;
	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
		break;
	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
		*sc = bdev_io->error.scsi.sc;
		*sk = bdev_io->error.scsi.sk;
		*asc = bdev_io->error.scsi.asc;
		*ascq = bdev_io->error.scsi.ascq;
		break;
	default:
		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
		break;
	}
}

void
spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
{
	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
	} else {
		bdev_io->error.nvme.sct = sct;
		bdev_io->error.nvme.sc = sc;
		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
	}

	spdk_bdev_io_complete(bdev_io, bdev_io->status);
}

void
spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
{
	assert(sct != NULL);
	assert(sc != NULL);

	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
		*sct = bdev_io->error.nvme.sct;
		*sc = bdev_io->error.nvme.sc;
	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_SUCCESS;
	} else {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
	}
}

static void
_spdk_bdev_register(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module_if *module;

	assert(bdev->module != NULL);

	bdev->status = SPDK_BDEV_STATUS_READY;

	TAILQ_INIT(&bdev->open_descs);
	bdev->bdev_opened_for_write = false;

	TAILQ_INIT(&bdev->vbdevs);
	TAILQ_INIT(&bdev->base_bdevs);

	bdev->reset_in_progress = false;
	TAILQ_INIT(&bdev->queued_resets);

	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
				sizeof(struct spdk_bdev_channel));

	pthread_mutex_init(&bdev->mutex, NULL);
	SPDK_DEBUGLOG(SPDK_TRACE_DEBUG, "Inserting bdev %s into list\n", bdev->name);
	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);

	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
		if (module->examine) {
			module->examine_in_progress++;
			module->examine(bdev);
		}
	}
}

void
spdk_bdev_register(struct spdk_bdev *bdev)
{
	_spdk_bdev_register(bdev);
}

void
spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
{
	int i;

	_spdk_bdev_register(vbdev);
	for (i = 0; i < base_bdev_count; i++) {
		assert(base_bdevs[i] != NULL);
		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
	}
}

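/*
 * Examine sketch (virtual bdev module code, not part of this file): a
 * module's examine() callback claims the base bdev, layers a vbdev on top,
 * and signals completion so subsystem init can make progress. example_if
 * and example_create_vbdev are hypothetical.
 */
static struct spdk_bdev_module_if example_if;
extern struct spdk_bdev *example_create_vbdev(struct spdk_bdev *base);

static void
example_examine(struct spdk_bdev *base)
{
	struct spdk_bdev *vbdev;

	if (spdk_bdev_module_claim_bdev(base, NULL, &example_if) == 0) {
		vbdev = example_create_vbdev(base);
		spdk_vbdev_register(vbdev, &base, 1);
	}

	/* Always report done, even when the module passes on this bdev. */
	spdk_bdev_module_examine_done(&example_if);
}
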
void
spdk_bdev_unregister(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc	*desc, *tmp;
	int			rc;
	bool			do_destruct = true;

	SPDK_DEBUGLOG(SPDK_TRACE_DEBUG, "Removing bdev %s from list\n", bdev->name);

	pthread_mutex_lock(&bdev->mutex);

	bdev->status = SPDK_BDEV_STATUS_REMOVING;

	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
		if (desc->remove_cb) {
			pthread_mutex_unlock(&bdev->mutex);
			do_destruct = false;
			desc->remove_cb(desc->remove_ctx);
			pthread_mutex_lock(&bdev->mutex);
		}
	}

	if (!do_destruct) {
		pthread_mutex_unlock(&bdev->mutex);
		return;
	}

	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
	pthread_mutex_unlock(&bdev->mutex);

	pthread_mutex_destroy(&bdev->mutex);

	spdk_io_device_unregister(bdev, NULL);

	rc = bdev->fn_table->destruct(bdev->ctxt);
	if (rc < 0) {
		SPDK_ERRLOG("destruct failed\n");
	}
}

void
spdk_vbdev_unregister(struct spdk_bdev *vbdev)
{
	struct spdk_bdev *base_bdev;

	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
	}
	spdk_bdev_unregister(vbdev);
}

void
spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
{
	struct spdk_bdev_module_if *m;

	assert(module->examine_in_progress > 0);
	module->examine_in_progress--;

	/*
	 * Check all bdev modules for examinations in progress.  If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
		if (m->examine_in_progress > 0) {
			return;
		}
	}

	if (g_bdev_mgr.module_init_complete && !g_bdev_mgr.init_complete) {
		/*
		 * Modules already finished initialization - now that all
		 * the bdev modules have finished their asynchronous I/O
		 * processing, the entire bdev layer can be marked as complete.
		 */
		spdk_bdev_init_complete(0);
	}
}

int
spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
	       void *remove_ctx, struct spdk_bdev_desc **_desc)
{
	struct spdk_bdev_desc *desc;

	desc = calloc(1, sizeof(*desc));
	if (desc == NULL) {
		return -ENOMEM;
	}

	pthread_mutex_lock(&bdev->mutex);

	if (write && (bdev->bdev_opened_for_write || bdev->claim_module)) {
		SPDK_ERRLOG("failed, %s already opened for write or claimed\n", bdev->name);
		free(desc);
		pthread_mutex_unlock(&bdev->mutex);
		return -EPERM;
	}

	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);

	if (write) {
		bdev->bdev_opened_for_write = true;
	}

	desc->bdev = bdev;
	desc->remove_cb = remove_cb;
	desc->remove_ctx = remove_ctx;
	desc->write = write;
	*_desc = desc;

	pthread_mutex_unlock(&bdev->mutex);

	return 0;
}

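/*
 * Open/close sketch (caller code, not part of this file): open a bdev for
 * write, remembering that only one writer is allowed at a time. The
 * hot-remove callback is a placeholder; pass NULL if removal notifications
 * are not needed.
 */
static void
example_bdev_removed(void *remove_ctx)
{
	SPDK_ERRLOG("bdev was hot-removed\n");
}

static int
example_open_for_write(struct spdk_bdev *bdev, struct spdk_bdev_desc **desc)
{
	int rc;

	rc = spdk_bdev_open(bdev, true, example_bdev_removed, NULL, desc);
	if (rc != 0) {
		/* -EPERM: already opened for write or claimed by a module */
		return rc;
	}

	return 0;
}
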
void
spdk_bdev_close(struct spdk_bdev_desc *desc)
{
	struct spdk_bdev *bdev = desc->bdev;
	bool do_unregister = false;

	pthread_mutex_lock(&bdev->mutex);

	if (desc->write) {
		assert(bdev->bdev_opened_for_write);
		bdev->bdev_opened_for_write = false;
	}

	TAILQ_REMOVE(&bdev->open_descs, desc, link);
	free(desc);

	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
		do_unregister = true;
	}
	pthread_mutex_unlock(&bdev->mutex);

	if (do_unregister == true) {
		spdk_bdev_unregister(bdev);
	}
}

int
spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
			    struct spdk_bdev_module_if *module)
{
	if (bdev->claim_module != NULL) {
		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
			    bdev->claim_module->name);
		return -EPERM;
	}

	if ((!desc || !desc->write) && bdev->bdev_opened_for_write) {
		SPDK_ERRLOG("bdev %s already opened with write access\n", bdev->name);
		return -EPERM;
	}

	if (desc && !desc->write) {
		bdev->bdev_opened_for_write = true;
		desc->write = true;
	}

	bdev->claim_module = module;
	return 0;
}

void
spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
{
	assert(bdev->claim_module != NULL);
	bdev->claim_module = NULL;
}

struct spdk_bdev *
spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
{
	return desc->bdev;
}

void
spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
{
	struct iovec *iovs;
	int iovcnt;

	if (bdev_io == NULL) {
		return;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		iovs = bdev_io->u.read.iovs;
		iovcnt = bdev_io->u.read.iovcnt;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		iovs = bdev_io->u.write.iovs;
		iovcnt = bdev_io->u.write.iovcnt;
		break;
	default:
		iovs = NULL;
		iovcnt = 0;
		break;
	}

	if (iovp) {
		*iovp = iovs;
	}
	if (iovcntp) {
		*iovcntp = iovcnt;
	}
}

void
spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
{
	/*
	 * Modules with examine callbacks must be initialized first, so they are
	 *  ready to handle examine callbacks from later modules that will
	 *  register physical bdevs.
	 */
	if (bdev_module->examine != NULL) {
		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
	} else {
		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
	}
}
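
/*
 * Registration sketch (module code, not part of this file): modules normally
 * register via a constructor that runs at load time, so the module is on the
 * list before spdk_bdev_initialize() walks it. The fields shown are the ones
 * this file consumes; example_module and its init hook are hypothetical.
 */
static int
example_module_init(void)
{
	return 0;
}

static struct spdk_bdev_module_if example_module = {
	.module_init	= example_module_init,
	.examine	= NULL, /* NULL: appended to the tail of the module list */
};

__attribute__((constructor)) static void
example_module_register(void)
{
	spdk_bdev_module_list_add(&example_module);
}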
1683