/*-
 *   BSD LICENSE
 *
 *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"

#include "spdk/env.h"
#include "spdk/io_channel.h"
#include "spdk/likely.h"
#include "spdk/queue.h"
#include "spdk/nvme_spec.h"
#include "spdk/scsi_spec.h"

#include "spdk_internal/bdev.h"
#include "spdk_internal/log.h"
#include "spdk/string.h"

#ifdef SPDK_CONFIG_VTUNE
#include "ittnotify.h"
#endif

#define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
#define BUF_SMALL_POOL_SIZE	8192
#define BUF_LARGE_POOL_SIZE	1024

typedef TAILQ_HEAD(, spdk_bdev_io) need_buf_tailq_t;

struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
	TAILQ_HEAD(, spdk_bdev_module_if) vbdev_modules;

	TAILQ_HEAD(, spdk_bdev) bdevs;

	spdk_bdev_poller_start_cb start_poller_fn;
	spdk_bdev_poller_stop_cb stop_poller_fn;

	bool init_complete;
	bool module_init_complete;
	int module_init_rc;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain	*domain;
#endif
};

static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.vbdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.vbdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.start_poller_fn = NULL,
	.stop_poller_fn = NULL,
	.init_complete = false,
	.module_init_complete = false,
	.module_init_rc = 0,
};

static struct spdk_bdev_module_if *g_next_bdev_module;
static struct spdk_bdev_module_if *g_next_vbdev_module;
static spdk_bdev_init_cb	g_cb_fn = NULL;
static void			*g_cb_arg = NULL;

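/* Per-thread bdev manager context, holding I/O requests that are waiting for
 * a data buffer from one of the shared buffer pools.
 */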
struct spdk_bdev_mgmt_channel {
	need_buf_tailq_t need_buf_small;
	need_buf_tailq_t need_buf_large;
};

struct spdk_bdev_desc {
	struct spdk_bdev		*bdev;
	spdk_bdev_remove_cb_t		remove_cb;
	void				*remove_ctx;
	bool				write;
	TAILQ_ENTRY(spdk_bdev_desc)	link;
};

struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Channel for the bdev manager */
	struct spdk_io_channel *mgmt_channel;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
#endif

};

struct spdk_bdev *
spdk_bdev_first(void)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
	if (bdev) {
		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = TAILQ_NEXT(prev, link);
	if (bdev) {
		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

static struct spdk_bdev *
_bdev_next_leaf(struct spdk_bdev *bdev)
{
	while (bdev != NULL) {
		if (TAILQ_EMPTY(&bdev->vbdevs)) {
			return bdev;
		} else {
			bdev = TAILQ_NEXT(bdev, link);
		}
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_first_leaf(void)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));

	if (bdev) {
		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Starting bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_next_leaf(struct spdk_bdev *prev)
{
	struct spdk_bdev *bdev;

	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));

	if (bdev) {
		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Continuing bdev iteration at %s\n", bdev->name);
	}

	return bdev;
}

struct spdk_bdev *
spdk_bdev_get_by_name(const char *bdev_name)
{
	struct spdk_bdev *bdev = spdk_bdev_first();

	while (bdev != NULL) {
		if (strcmp(bdev_name, bdev->name) == 0) {
			return bdev;
		}
		bdev = spdk_bdev_next(bdev);
	}

	return NULL;
}

static void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
{
	assert(bdev_io->get_buf_cb != NULL);
	assert(buf != NULL);
	assert(bdev_io->u.read.iovs != NULL);

	bdev_io->buf = buf;
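	/* Round the pointer up to the next 512-byte boundary; the buffer pools
	 *  allocate each element with 512 bytes of headroom, so the aligned
	 *  pointer still leaves room for a full-sized buffer. */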
	bdev_io->u.read.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
	bdev_io->u.read.iovs[0].iov_len = bdev_io->u.read.len;
	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
}

static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf;
	need_buf_tailq_t *tailq;
	uint64_t length;
	struct spdk_bdev_mgmt_channel *ch;

	assert(bdev_io->u.read.iovcnt == 1);

	length = bdev_io->u.read.len;
	buf = bdev_io->buf;

	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);

	if (length <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		tailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		tailq = &ch->need_buf_large;
	}

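	/* If an I/O on this thread is already waiting for a buffer of this size,
	 *  hand the buffer directly to it instead of returning it to the pool. */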
	if (TAILQ_EMPTY(tailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		tmp = TAILQ_FIRST(tailq);
		TAILQ_REMOVE(tailq, tmp, buf_link);
		spdk_bdev_io_set_buf(tmp, buf);
	}
}

void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb)
{
	uint64_t len = bdev_io->u.read.len;
	struct spdk_mempool *pool;
	need_buf_tailq_t *tailq;
	void *buf = NULL;
	struct spdk_bdev_mgmt_channel *ch;

	assert(cb != NULL);
	assert(bdev_io->u.read.iovs != NULL);

	if (spdk_unlikely(bdev_io->u.read.iovs[0].iov_base != NULL)) {
		/* Buffer already present */
		cb(bdev_io->ch->channel, bdev_io);
		return;
	}

	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);

	bdev_io->get_buf_cb = cb;
	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		tailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		tailq = &ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);

	if (!buf) {
		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
	} else {
		spdk_bdev_io_set_buf(bdev_io, buf);
	}
}

static int
spdk_bdev_module_get_max_ctx_size(void)
{
	struct spdk_bdev_module_if *bdev_module;
	int max_bdev_module_size = 0;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.vbdev_modules, tailq) {
		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
			max_bdev_module_size = bdev_module->get_ctx_size();
		}
	}

	return max_bdev_module_size;
}

void
spdk_bdev_config_text(FILE *fp)
{
	struct spdk_bdev_module_if *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.vbdev_modules, tailq) {
		if (bdev_module->config_text) {
			bdev_module->config_text(fp);
		}
	}
}

static int
spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;

	TAILQ_INIT(&ch->need_buf_small);
	TAILQ_INIT(&ch->need_buf_large);

	return 0;
}

static void
spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_mgmt_channel *ch = ctx_buf;

	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
	}
}

static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_cb_fn;
	void *cb_arg = g_cb_arg;

	g_bdev_mgr.init_complete = true;
	g_cb_fn = NULL;
	g_cb_arg = NULL;

	cb_fn(cb_arg, rc);
}

static void
spdk_bdev_module_init_complete(int rc)
{
	struct spdk_bdev_module_if *m;

	g_bdev_mgr.module_init_complete = true;
	g_bdev_mgr.module_init_rc = rc;

	/*
	 * Check all vbdev modules for examinations in progress.  If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.vbdev_modules, tailq) {
		if (m->examine_in_progress > 0) {
			return;
		}
	}

	spdk_bdev_init_complete(rc);
}

void
spdk_bdev_module_init_next(int rc)
{
	if (rc) {
		assert(g_next_bdev_module != NULL);
		SPDK_ERRLOG("Failed to init bdev module: %s\n", g_next_bdev_module->name);
		spdk_bdev_module_init_complete(rc);
		return;
	}

	if (!g_next_bdev_module) {
		g_next_bdev_module = TAILQ_FIRST(&g_bdev_mgr.bdev_modules);
	} else {
		g_next_bdev_module = TAILQ_NEXT(g_next_bdev_module, tailq);
	}

	if (g_next_bdev_module) {
		g_next_bdev_module->module_init();
	} else {
		spdk_bdev_module_init_complete(rc);
	}
}

void
spdk_vbdev_module_init_next(int rc)
{
	if (rc) {
		assert(g_next_vbdev_module != NULL);
		SPDK_ERRLOG("Failed to init vbdev module: %s\n", g_next_vbdev_module->name);
		spdk_bdev_module_init_complete(rc);
		return;
	}

	if (!g_next_vbdev_module) {
		g_next_vbdev_module = TAILQ_FIRST(&g_bdev_mgr.vbdev_modules);
	} else {
		g_next_vbdev_module = TAILQ_NEXT(g_next_vbdev_module, tailq);
	}

	if (g_next_vbdev_module) {
		g_next_vbdev_module->module_init();
	} else {
		spdk_bdev_module_init_next(0);
	}
}

void
spdk_bdev_poller_start(struct spdk_bdev_poller **ppoller,
		       spdk_bdev_poller_fn fn,
		       void *arg,
		       uint32_t lcore,
		       uint64_t period_microseconds)
{
	g_bdev_mgr.start_poller_fn(ppoller, fn, arg, lcore, period_microseconds);
}

void
spdk_bdev_poller_stop(struct spdk_bdev_poller **ppoller)
{
	g_bdev_mgr.stop_poller_fn(ppoller);
}

void
spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg,
		     spdk_bdev_poller_start_cb start_poller_fn,
		     spdk_bdev_poller_stop_cb stop_poller_fn)
{
	int cache_size;
	int rc = 0;

	assert(cb_fn != NULL);

	g_cb_fn = cb_fn;
	g_cb_arg = cb_arg;

	g_bdev_mgr.start_poller_fn = start_poller_fn;
	g_bdev_mgr.stop_poller_fn = stop_poller_fn;

	g_bdev_mgr.bdev_io_pool = spdk_mempool_create("blockdev_io",
				  SPDK_BDEV_IO_POOL_SIZE,
				  sizeof(struct spdk_bdev_io) +
				  spdk_bdev_module_get_max_ctx_size(),
				  64,
				  SPDK_ENV_SOCKET_ID_ANY);

	if (g_bdev_mgr.bdev_io_pool == NULL) {
		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
		rc = -1;
		goto end;
	}

	/**
	 * Ensure no more than half of the total buffers end up in local caches,
	 *   using spdk_env_get_core_count() to determine how many local caches we
	 *   need to account for.
	 */
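	/* For example, with 4 cores: 8192 / (2 * 4) = 1024 small buffers cached
	 *  per core, leaving at least half of the pool in the shared ring. */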
	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
	g_bdev_mgr.buf_small_pool = spdk_mempool_create("buf_small_pool",
				    BUF_SMALL_POOL_SIZE,
				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_small_pool) {
		SPDK_ERRLOG("create buf_small_pool failed\n");
		rc = -1;
		goto end;
	}

	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
	g_bdev_mgr.buf_large_pool = spdk_mempool_create("buf_large_pool",
				    BUF_LARGE_POOL_SIZE,
				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
				    cache_size,
				    SPDK_ENV_SOCKET_ID_ANY);
	if (!g_bdev_mgr.buf_large_pool) {
		SPDK_ERRLOG("create buf_large_pool failed\n");
		rc = -1;
		goto end;
	}

#ifdef SPDK_CONFIG_VTUNE
	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
#endif

	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
				spdk_bdev_mgmt_channel_destroy,
				sizeof(struct spdk_bdev_mgmt_channel));

end:
	spdk_vbdev_module_init_next(rc);
}

int
spdk_bdev_finish(void)
{
	struct spdk_bdev_module_if *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.vbdev_modules, tailq) {
		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}
	}

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}
	}

	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    SPDK_BDEV_IO_POOL_SIZE);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);

	spdk_io_device_unregister(&g_bdev_mgr);

	return 0;
}

struct spdk_bdev_io *
spdk_bdev_get_io(void)
{
	struct spdk_bdev_io *bdev_io;

	bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	if (!bdev_io) {
		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
		abort();
	}

	memset(bdev_io, 0, sizeof(*bdev_io));

	return bdev_io;
}

static void
spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
{
	if (!bdev_io) {
		return;
	}

	if (bdev_io->buf != NULL) {
		spdk_bdev_io_put_buf(bdev_io);
	}

	spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
}

static void
__submit_request(struct spdk_bdev *bdev, struct spdk_bdev_io *bdev_io)
{
	struct spdk_io_channel *ch;

	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->ch->channel;

	bdev_io->ch->io_outstanding++;
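	/* Flag that we are inside submit_request(), so a completion that happens
	 *  synchronously during the call is deferred rather than invoking the
	 *  user callback recursively (see spdk_bdev_io_complete()). */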
	bdev_io->in_submit_request = true;
	bdev->fn_table->submit_request(ch, bdev_io);
	bdev_io->in_submit_request = false;
}

static int
spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev *bdev = bdev_io->bdev;

	__submit_request(bdev, bdev_io);
	return 0;
}

void
spdk_bdev_io_resubmit(struct spdk_bdev_io *bdev_io, struct spdk_bdev *new_bdev)
{
	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
	bdev_io->bdev = new_bdev;

	/*
	 * These fields are normally set during spdk_bdev_io_init(), but since bdev is
	 * being switched, they need to be reinitialized.
	 */
	bdev_io->gencnt = new_bdev->gencnt;

	/*
	 * This bdev_io was already submitted so decrement io_outstanding to ensure it
	 *  does not get double-counted.
	 */
	assert(bdev_io->ch->io_outstanding > 0);
	bdev_io->ch->io_outstanding--;
	__submit_request(new_bdev, bdev_io);
}

static void
spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
		  struct spdk_bdev *bdev, void *cb_arg,
		  spdk_bdev_io_completion_cb cb)
{
	bdev_io->bdev = bdev;
	bdev_io->caller_ctx = cb_arg;
	bdev_io->cb = cb;
	bdev_io->gencnt = bdev->gencnt;
	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
	bdev_io->in_submit_request = false;
}

bool
spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
}

int
spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	if (bdev->fn_table->dump_config_json) {
		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
	}

	return 0;
}

static int
spdk_bdev_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_bdev		*bdev = io_device;
	struct spdk_bdev_channel	*ch = ctx_buf;

	ch->bdev = io_device;
	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
	memset(&ch->stat, 0, sizeof(ch->stat));
	ch->io_outstanding = 0;

#ifdef SPDK_CONFIG_VTUNE
	{
		char *name;

		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
		if (!name) {
			return -1;
		}
		ch->handle = __itt_string_handle_create(name);
		free(name);
		ch->start_tsc = spdk_get_ticks();
		ch->interval_tsc = spdk_get_ticks_hz() / 100;
	}
#endif

	return 0;
}

static void
_spdk_bdev_abort_io(need_buf_tailq_t *queue, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io, *tmp;

	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
		if (bdev_io->ch == ch) {
			TAILQ_REMOVE(queue, bdev_io, buf_link);
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		}
	}
}

static void
spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel	*ch = ctx_buf;
	struct spdk_bdev_mgmt_channel	*mgmt_channel;

	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);

	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, ch);
	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, ch);

	spdk_put_io_channel(ch->channel);
	spdk_put_io_channel(ch->mgmt_channel);
	assert(ch->io_outstanding == 0);
}

struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(desc->bdev);
}

const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}

const char *
spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
{
	return bdev->product_name;
}

uint32_t
spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
{
	return bdev->blocklen;
}

uint64_t
spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
{
	return bdev->blockcnt;
}

uint32_t
spdk_bdev_get_max_unmap_descriptors(const struct spdk_bdev *bdev)
{
	return bdev->max_unmap_bdesc_count;
}

size_t
spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
{
	/* TODO: push this logic down to the bdev modules */
	if (bdev->need_aligned_buffer) {
		return bdev->blocklen;
	}

	return 1;
}

bool
spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
{
	return bdev->write_cache;
}

static int
spdk_bdev_io_valid(struct spdk_bdev *bdev, uint64_t offset, uint64_t nbytes)
{
	/* Return failure if nbytes is not a multiple of bdev->blocklen */
	if (nbytes % bdev->blocklen) {
		return -1;
	}

	/* Return failure if offset + nbytes is less than offset; this indicates
	 * an overflow, i.e. the offset wrapped around */
	if (offset + nbytes < offset) {
		return -1;
	}

	/* Return failure if offset + nbytes exceeds the size of the blockdev */
	if (offset + nbytes > bdev->blockcnt * bdev->blocklen) {
		return -1;
	}

	return 0;
}

int
spdk_bdev_read(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
	       void *buf, uint64_t offset, uint64_t nbytes,
	       spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (spdk_bdev_io_valid(bdev, offset, nbytes) != 0) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.read.iov.iov_base = buf;
	bdev_io->u.read.iov.iov_len = nbytes;
	bdev_io->u.read.iovs = &bdev_io->u.read.iov;
	bdev_io->u.read.iovcnt = 1;
	bdev_io->u.read.len = nbytes;
	bdev_io->u.read.offset = offset;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_readv(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
		struct iovec *iov, int iovcnt,
		uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (spdk_bdev_io_valid(bdev, offset, nbytes) != 0) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
	bdev_io->u.read.iovs = iov;
	bdev_io->u.read.iovcnt = iovcnt;
	bdev_io->u.read.len = nbytes;
	bdev_io->u.read.offset = offset;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_write(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
		void *buf, uint64_t offset, uint64_t nbytes,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (spdk_bdev_io_valid(bdev, offset, nbytes) != 0) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
	bdev_io->u.write.iov.iov_base = buf;
	bdev_io->u.write.iov.iov_len = nbytes;
	bdev_io->u.write.iovs = &bdev_io->u.write.iov;
	bdev_io->u.write.iovcnt = 1;
	bdev_io->u.write.len = nbytes;
	bdev_io->u.write.offset = offset;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_writev(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
		 struct iovec *iov, int iovcnt,
		 uint64_t offset, uint64_t len,
		 spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (spdk_bdev_io_valid(bdev, offset, len) != 0) {
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
	bdev_io->u.write.iovs = iov;
	bdev_io->u.write.iovcnt = iovcnt;
	bdev_io->u.write.len = len;
	bdev_io->u.write.offset = offset;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_unmap(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
		struct spdk_scsi_unmap_bdesc *unmap_d,
		uint16_t bdesc_count,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (bdesc_count == 0) {
		SPDK_ERRLOG("Invalid bdesc_count 0\n");
		return -EINVAL;
	}

	if (bdesc_count > bdev->max_unmap_bdesc_count) {
		SPDK_ERRLOG("Invalid bdesc_count %u > max_unmap_bdesc_count %u\n",
			    bdesc_count, bdev->max_unmap_bdesc_count);
		return -EINVAL;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
	bdev_io->u.unmap.unmap_bdesc = unmap_d;
	bdev_io->u.unmap.bdesc_count = bdesc_count;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_flush(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
		uint64_t offset, uint64_t length,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
	bdev_io->u.flush.offset = offset;
	bdev_io->u.flush.length = length;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

static void
_spdk_bdev_reset_dev(void *io_device, void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	int rc;

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		SPDK_ERRLOG("reset failed\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static void
_spdk_bdev_reset_abort_channel(void *io_device, struct spdk_io_channel *ch,
			       void *ctx)
{
	struct spdk_bdev_channel	*channel;
	struct spdk_bdev_mgmt_channel	*mgmt_channel;

	channel = spdk_io_channel_get_ctx(ch);
	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);

	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, channel);
	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, channel);
}

static void
_spdk_bdev_start_reset(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;

	spdk_for_each_channel(bdev_io->bdev, _spdk_bdev_reset_abort_channel,
			      bdev_io, _spdk_bdev_reset_dev);
}

static void
_spdk_bdev_start_next_reset(struct spdk_bdev *bdev)
{
	struct spdk_bdev_io *bdev_io;
	struct spdk_thread *thread;

	pthread_mutex_lock(&bdev->mutex);

	if (bdev->reset_in_progress || TAILQ_EMPTY(&bdev->queued_resets)) {
		pthread_mutex_unlock(&bdev->mutex);
		return;
	} else {
		bdev_io = TAILQ_FIRST(&bdev->queued_resets);
		TAILQ_REMOVE(&bdev->queued_resets, bdev_io, link);
		bdev->reset_in_progress = true;
		thread = spdk_io_channel_get_thread(bdev_io->ch->channel);
		spdk_thread_send_msg(thread, _spdk_bdev_start_reset, bdev_io);
	}

	pthread_mutex_unlock(&bdev->mutex);
}

int
spdk_bdev_reset(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
		spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	pthread_mutex_lock(&bdev->mutex);
	TAILQ_INSERT_TAIL(&bdev->queued_resets, bdev_io, link);
	pthread_mutex_unlock(&bdev->mutex);

	_spdk_bdev_start_next_reset(bdev);

	return 0;
}

void
spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
		      struct spdk_bdev_io_stat *stat)
{
#ifdef SPDK_CONFIG_VTUNE
	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
	memset(stat, 0, sizeof(*stat));
	return;
#endif

	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);

	*stat = channel->stat;
	memset(&channel->stat, 0, sizeof(channel->stat));
}

int
spdk_bdev_nvme_admin_passthru(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
			      spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
	bdev_io->u.nvme_passthru.cmd = *cmd;
	bdev_io->u.nvme_passthru.buf = buf;
	bdev_io->u.nvme_passthru.nbytes = nbytes;

	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_nvme_io_passthru(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
			   spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
	bdev_io->u.nvme_passthru.cmd = *cmd;
	bdev_io->u.nvme_passthru.buf = buf;
	bdev_io->u.nvme_passthru.nbytes = nbytes;

	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}

int
spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
{
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io is NULL\n");
		return -1;
	}

	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
		SPDK_ERRLOG("bdev_io is in pending state\n");
		assert(false);
		return -1;
	}

	spdk_bdev_put_io(bdev_io);

	return 0;
}

static void
_spdk_bdev_io_complete(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;

	assert(bdev_io->cb != NULL);
	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
}

void
spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	bdev_io->status = status;

	assert(bdev_io->ch->io_outstanding > 0);
	bdev_io->ch->io_outstanding--;
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
		/* Successful reset */
		if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			/* Increase the blockdev generation */
			bdev_io->bdev->gencnt++;
		}
		bdev_io->bdev->reset_in_progress = false;
		_spdk_bdev_start_next_reset(bdev_io->bdev);
	} else {
		/*
		 * Check the gencnt, to see if this I/O was issued before the most
		 * recent reset. If the gencnt is not equal, then just free the I/O
		 * without calling the callback, since the caller will have already
		 * freed its context for this I/O.
		 */
		if (bdev_io->bdev->gencnt != bdev_io->gencnt) {
			spdk_bdev_put_io(bdev_io);
			return;
		}
	}

	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		switch (bdev_io->type) {
		case SPDK_BDEV_IO_TYPE_READ:
			bdev_io->ch->stat.bytes_read += bdev_io->u.read.len;
			bdev_io->ch->stat.num_read_ops++;
			break;
		case SPDK_BDEV_IO_TYPE_WRITE:
			bdev_io->ch->stat.bytes_written += bdev_io->u.write.len;
			bdev_io->ch->stat.num_write_ops++;
			break;
		default:
			break;
		}
	}

#ifdef SPDK_CONFIG_VTUNE
	uint64_t now_tsc = spdk_get_ticks();
	if (now_tsc > (bdev_io->ch->start_tsc + bdev_io->ch->interval_tsc)) {
		uint64_t data[4];

		data[0] = bdev_io->ch->stat.num_read_ops;
		data[1] = bdev_io->ch->stat.bytes_read;
		data[2] = bdev_io->ch->stat.num_write_ops;
		data[3] = bdev_io->ch->stat.bytes_written;

		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->ch->handle,
				   __itt_metadata_u64, 4, data);

		memset(&bdev_io->ch->stat, 0, sizeof(bdev_io->ch->stat));
		bdev_io->ch->start_tsc = now_tsc;
	}
#endif

	if (bdev_io->in_submit_request || bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
		/*
		 * Defer completion to avoid potential infinite recursion if the
		 * user's completion callback issues a new I/O.
		 */
		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
				     _spdk_bdev_io_complete, bdev_io);
	} else {
		_spdk_bdev_io_complete(bdev_io);
	}
}

void
spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
{
	if (sc == SPDK_SCSI_STATUS_GOOD) {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
	} else {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
		bdev_io->error.scsi.sc = sc;
		bdev_io->error.scsi.sk = sk;
		bdev_io->error.scsi.asc = asc;
		bdev_io->error.scsi.ascq = ascq;
	}

	spdk_bdev_io_complete(bdev_io, bdev_io->status);
}

void
spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
			     int *sc, int *sk, int *asc, int *ascq)
{
	assert(sc != NULL);
	assert(sk != NULL);
	assert(asc != NULL);
	assert(ascq != NULL);

	switch (bdev_io->status) {
	case SPDK_BDEV_IO_STATUS_SUCCESS:
		*sc = SPDK_SCSI_STATUS_GOOD;
		*sk = SPDK_SCSI_SENSE_NO_SENSE;
		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
		break;
	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
		break;
	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
		*sc = bdev_io->error.scsi.sc;
		*sk = bdev_io->error.scsi.sk;
		*asc = bdev_io->error.scsi.asc;
		*ascq = bdev_io->error.scsi.ascq;
		break;
	default:
		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
		break;
	}
}

void
spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
{
	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
	} else {
		bdev_io->error.nvme.sct = sct;
		bdev_io->error.nvme.sc = sc;
		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
	}

	spdk_bdev_io_complete(bdev_io, bdev_io->status);
}

void
spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
{
	assert(sct != NULL);
	assert(sc != NULL);

	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
		*sct = bdev_io->error.nvme.sct;
		*sc = bdev_io->error.nvme.sc;
	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_SUCCESS;
	} else {
		*sct = SPDK_NVME_SCT_GENERIC;
		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
	}
}

static void
_spdk_bdev_register(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module_if *vbdev_module;

	assert(bdev->module != NULL);

	bdev->status = SPDK_BDEV_STATUS_READY;

	/* initialize the reset generation value to zero */
	bdev->gencnt = 0;
	TAILQ_INIT(&bdev->open_descs);
	bdev->bdev_opened_for_write = false;
	bdev->vbdevs_opened_for_write = 0;

	TAILQ_INIT(&bdev->vbdevs);
	TAILQ_INIT(&bdev->base_bdevs);

	bdev->reset_in_progress = false;
	TAILQ_INIT(&bdev->queued_resets);

	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
				sizeof(struct spdk_bdev_channel));

	pthread_mutex_init(&bdev->mutex, NULL);
	SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Inserting bdev %s into list\n", bdev->name);
	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);

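	/* Offer the newly registered bdev to every vbdev module for examination;
	 *  each module signals completion via spdk_vbdev_module_examine_done(). */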
	TAILQ_FOREACH(vbdev_module, &g_bdev_mgr.vbdev_modules, tailq) {
		vbdev_module->examine_in_progress++;
		vbdev_module->examine(bdev);
	}
}

void
spdk_bdev_register(struct spdk_bdev *bdev)
{
	_spdk_bdev_register(bdev);
}

void
spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
{
	int i;

	_spdk_bdev_register(vbdev);
	for (i = 0; i < base_bdev_count; i++) {
		assert(base_bdevs[i] != NULL);
		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
	}
}

void
spdk_bdev_unregister(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc	*desc, *tmp;
	int			rc;

	SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Removing bdev %s from list\n", bdev->name);

	pthread_mutex_lock(&bdev->mutex);

	bdev->status = SPDK_BDEV_STATUS_REMOVING;

	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
		if (desc->remove_cb) {
			pthread_mutex_unlock(&bdev->mutex);
			desc->remove_cb(desc->remove_ctx);
			pthread_mutex_lock(&bdev->mutex);
		}
	}

	if (!TAILQ_EMPTY(&bdev->open_descs)) {
		pthread_mutex_unlock(&bdev->mutex);
		return;
	}

	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
	pthread_mutex_unlock(&bdev->mutex);

	pthread_mutex_destroy(&bdev->mutex);

	spdk_io_device_unregister(bdev);

	rc = bdev->fn_table->destruct(bdev->ctxt);
	if (rc < 0) {
		SPDK_ERRLOG("destruct failed\n");
	}
}

void
spdk_vbdev_unregister(struct spdk_bdev *vbdev)
{
	struct spdk_bdev *base_bdev;

	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
	}
	spdk_bdev_unregister(vbdev);
}

void
spdk_vbdev_module_examine_done(struct spdk_bdev_module_if *module)
{
	struct spdk_bdev_module_if *m;

	assert(module->examine_in_progress > 0);
	module->examine_in_progress--;

	/*
	 * Check all vbdev modules for examinations in progress.  If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.vbdev_modules, tailq) {
		if (m->examine_in_progress > 0) {
			return;
		}
	}

	if (g_bdev_mgr.module_init_complete && !g_bdev_mgr.init_complete) {
		/*
		 * Modules already finished initialization - now that all
		 * the vbdev modules have finished their asynchronous examinations,
		 * the entire bdev layer can be marked as complete.
		 */
		spdk_bdev_init_complete(g_bdev_mgr.module_init_rc);
	}
}

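/* Recursively check whether this bdev, or any of the base bdevs it is built
 * on, is currently open for writing.
 */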
static bool
__is_bdev_opened_for_write(struct spdk_bdev *bdev)
{
	struct spdk_bdev *base;

	if (bdev->bdev_opened_for_write) {
		return true;
	}

	TAILQ_FOREACH(base, &bdev->base_bdevs, base_bdev_link) {
		if (__is_bdev_opened_for_write(base)) {
			return true;
		}
	}

	return false;
}

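/* Apply a delta (+1 on open, -1 on close) to vbdevs_opened_for_write on every
 * base bdev beneath this bdev.
 */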
static void
__modify_write_counts(struct spdk_bdev *bdev, int mod)
{
	struct spdk_bdev *base;

	TAILQ_FOREACH(base, &bdev->base_bdevs, base_bdev_link) {
		base->vbdevs_opened_for_write += mod;
		__modify_write_counts(base, mod);
	}
}

int
spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
	       void *remove_ctx, struct spdk_bdev_desc **_desc)
{
	struct spdk_bdev_desc *desc;

	desc = calloc(1, sizeof(*desc));
	if (desc == NULL) {
		return -ENOMEM;
	}

	pthread_mutex_lock(&bdev->mutex);

	if (write && (__is_bdev_opened_for_write(bdev) || bdev->vbdevs_opened_for_write > 0)) {
		SPDK_ERRLOG("failed, %s (or one of its virtual bdevs) already opened for write\n", bdev->name);
		free(desc);
		pthread_mutex_unlock(&bdev->mutex);
		return -EPERM;
	}

	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);

	if (write) {
		bdev->bdev_opened_for_write = true;
		__modify_write_counts(bdev, 1);
	}

	desc->bdev = bdev;
	desc->remove_cb = remove_cb;
	desc->remove_ctx = remove_ctx;
	desc->write = write;
	*_desc = desc;

	pthread_mutex_unlock(&bdev->mutex);

	return 0;
}
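
/*
 * A minimal usage sketch for the descriptor API (hypothetical names; error
 * handling and callback definitions elided):
 *
 *	struct spdk_bdev *bdev = spdk_bdev_get_by_name("Nvme0n1");
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *ch;
 *
 *	spdk_bdev_open(bdev, true, NULL, NULL, &desc);
 *	ch = spdk_bdev_get_io_channel(desc);
 *	spdk_bdev_read(bdev, ch, buf, 0, 4096, read_done_cb, NULL);
 *	...
 *	spdk_put_io_channel(ch);
 *	spdk_bdev_close(desc);
 */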

void
spdk_bdev_close(struct spdk_bdev_desc *desc)
{
	struct spdk_bdev *bdev = desc->bdev;
	bool do_unregister = false;

	pthread_mutex_lock(&bdev->mutex);

	if (desc->write) {
		assert(bdev->bdev_opened_for_write);
		bdev->bdev_opened_for_write = false;
		__modify_write_counts(bdev, -1);
	}

	TAILQ_REMOVE(&bdev->open_descs, desc, link);
	free(desc);

	if (bdev->status == SPDK_BDEV_STATUS_REMOVING) {
		do_unregister = true;
	}
	pthread_mutex_unlock(&bdev->mutex);

	if (do_unregister == true) {
		spdk_bdev_unregister(bdev);
	}
}

void
spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
{
	struct iovec *iovs;
	int iovcnt;

	if (bdev_io == NULL) {
		return;
	}

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		iovs = bdev_io->u.read.iovs;
		iovcnt = bdev_io->u.read.iovcnt;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		iovs = bdev_io->u.write.iovs;
		iovcnt = bdev_io->u.write.iovcnt;
		break;
	default:
		iovs = NULL;
		iovcnt = 0;
		break;
	}

	if (iovp) {
		*iovp = iovs;
	}
	if (iovcntp) {
		*iovcntp = iovcnt;
	}
}

void
spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
{
	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
}

void
spdk_vbdev_module_list_add(struct spdk_bdev_module_if *vbdev_module)
{
	assert(vbdev_module->examine != NULL);
	TAILQ_INSERT_TAIL(&g_bdev_mgr.vbdev_modules, vbdev_module, tailq);
}
1658