xref: /spdk/lib/bdev/bdev.c (revision 5177ed40c4d2f431114a707f9275da765f84d9d1)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/likely.h"
42 #include "spdk/queue.h"
43 #include "spdk/nvme_spec.h"
44 #include "spdk/scsi_spec.h"
45 
46 #include "spdk_internal/bdev.h"
47 #include "spdk_internal/log.h"
48 #include "spdk/string.h"
49 
50 #ifdef SPDK_CONFIG_VTUNE
51 #include "ittnotify.h"
52 #endif
53 
54 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
55 #define BUF_SMALL_POOL_SIZE	8192
56 #define BUF_LARGE_POOL_SIZE	1024
57 
/* Queue of bdev I/Os parked while waiting for a data buffer to become free. */
typedef TAILQ_HEAD(, spdk_bdev_io) need_buf_tailq_t;

/*
 * Global state for the bdev layer: the I/O object pool, the small/large
 * data-buffer pools, the registered module and bdev lists, and the poller
 * start/stop hooks supplied by the application framework.
 */
struct spdk_bdev_mgr {
	struct spdk_mempool *bdev_io_pool;

	struct spdk_mempool *buf_small_pool;
	struct spdk_mempool *buf_large_pool;

	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;

	TAILQ_HEAD(, spdk_bdev) bdevs;

	/* Provided by the app via spdk_bdev_initialize(); used by
	 * spdk_bdev_poller_start()/spdk_bdev_poller_stop(). */
	spdk_bdev_poller_start_cb start_poller_fn;
	spdk_bdev_poller_stop_cb stop_poller_fn;

	/* True once spdk_bdev_init_complete() has fired. */
	bool init_complete;
	/* True once all bdev module module_init() callbacks have returned. */
	bool module_init_complete;

#ifdef SPDK_CONFIG_VTUNE
	__itt_domain	*domain;
#endif
};
80 
/* Singleton manager instance; poller hooks are filled in at init time. */
static struct spdk_bdev_mgr g_bdev_mgr = {
	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
	.start_poller_fn = NULL,
	.stop_poller_fn = NULL,
	.init_complete = false,
	.module_init_complete = false,
};

/* Init-done callback and argument saved from spdk_bdev_initialize() and
 * consumed exactly once by spdk_bdev_init_complete(). */
static spdk_bdev_init_cb	g_cb_fn = NULL;
static void			*g_cb_arg = NULL;
92 
93 
/* Per-thread context for the bdev manager io_device: the queues of I/Os
 * waiting for a small or large buffer on this thread. */
struct spdk_bdev_mgmt_channel {
	need_buf_tailq_t need_buf_small;
	need_buf_tailq_t need_buf_large;
};

/* An open handle to a bdev.  'write' records whether the descriptor was
 * opened writable; write-type I/O submission checks it. */
struct spdk_bdev_desc {
	struct spdk_bdev		*bdev;
	spdk_bdev_remove_cb_t		remove_cb;
	void				*remove_ctx;
	bool				write;
	TAILQ_ENTRY(spdk_bdev_desc)	link;
};

/* Per-thread channel context for a single bdev. */
struct spdk_bdev_channel {
	struct spdk_bdev	*bdev;

	/* The channel for the underlying device */
	struct spdk_io_channel	*channel;

	/* Channel for the bdev manager */
	struct spdk_io_channel *mgmt_channel;

	struct spdk_bdev_io_stat stat;

	/*
	 * Count of I/O submitted to bdev module and waiting for completion.
	 * Incremented before submit_request() is called on an spdk_bdev_io.
	 */
	uint64_t		io_outstanding;

#ifdef SPDK_CONFIG_VTUNE
	uint64_t		start_tsc;
	uint64_t		interval_tsc;
	__itt_string_handle	*handle;
#endif

};
131 
132 struct spdk_bdev *
133 spdk_bdev_first(void)
134 {
135 	struct spdk_bdev *bdev;
136 
137 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
138 	if (bdev) {
139 		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Starting bdev iteration at %s\n", bdev->name);
140 	}
141 
142 	return bdev;
143 }
144 
145 struct spdk_bdev *
146 spdk_bdev_next(struct spdk_bdev *prev)
147 {
148 	struct spdk_bdev *bdev;
149 
150 	bdev = TAILQ_NEXT(prev, link);
151 	if (bdev) {
152 		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Continuing bdev iteration at %s\n", bdev->name);
153 	}
154 
155 	return bdev;
156 }
157 
158 static struct spdk_bdev *
159 _bdev_next_leaf(struct spdk_bdev *bdev)
160 {
161 	while (bdev != NULL) {
162 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
163 			return bdev;
164 		} else {
165 			bdev = TAILQ_NEXT(bdev, link);
166 		}
167 	}
168 
169 	return bdev;
170 }
171 
172 struct spdk_bdev *
173 spdk_bdev_first_leaf(void)
174 {
175 	struct spdk_bdev *bdev;
176 
177 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
178 
179 	if (bdev) {
180 		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Starting bdev iteration at %s\n", bdev->name);
181 	}
182 
183 	return bdev;
184 }
185 
186 struct spdk_bdev *
187 spdk_bdev_next_leaf(struct spdk_bdev *prev)
188 {
189 	struct spdk_bdev *bdev;
190 
191 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
192 
193 	if (bdev) {
194 		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Continuing bdev iteration at %s\n", bdev->name);
195 	}
196 
197 	return bdev;
198 }
199 
200 struct spdk_bdev *
201 spdk_bdev_get_by_name(const char *bdev_name)
202 {
203 	struct spdk_bdev *bdev = spdk_bdev_first();
204 
205 	while (bdev != NULL) {
206 		if (strcmp(bdev_name, bdev->name) == 0) {
207 			return bdev;
208 		}
209 		bdev = spdk_bdev_next(bdev);
210 	}
211 
212 	return NULL;
213 }
214 
/*
 * Attach a data buffer to a read I/O and invoke its get_buf callback.
 * The pool buffers are over-allocated by 512 bytes (see the pool creation
 * in spdk_bdev_initialize()), so iov_base is rounded up to the next
 * 512-byte boundary within the buffer.
 */
static void
spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
{
	assert(bdev_io->get_buf_cb != NULL);
	assert(buf != NULL);
	assert(bdev_io->u.read.iovs != NULL);

	bdev_io->buf = buf;
	/* Round (buf + 512) down to a 512-byte boundary: always within the
	 * extra 512 bytes the pool allocated, and 512-aligned. */
	bdev_io->u.read.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
	bdev_io->u.read.iovs[0].iov_len = bdev_io->u.read.len;
	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
}
227 
/*
 * Return a read I/O's data buffer.  If another I/O on this thread is
 * waiting for a buffer of the same class (small/large), hand the buffer
 * directly to it instead of returning it to the pool.
 */
static void
spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
{
	struct spdk_mempool *pool;
	struct spdk_bdev_io *tmp;
	void *buf;
	need_buf_tailq_t *tailq;
	uint64_t length;
	struct spdk_bdev_mgmt_channel *ch;

	/* Buffers are only attached by spdk_bdev_io_get_buf(), which uses a
	 * single iovec. */
	assert(bdev_io->u.read.iovcnt == 1);

	length = bdev_io->u.read.len;
	buf = bdev_io->buf;

	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);

	/* Select the pool and wait queue matching the buffer's size class. */
	if (length <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		tailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		tailq = &ch->need_buf_large;
	}

	if (TAILQ_EMPTY(tailq)) {
		spdk_mempool_put(pool, buf);
	} else {
		/* Recycle the buffer straight to the oldest waiter. */
		tmp = TAILQ_FIRST(tailq);
		TAILQ_REMOVE(tailq, tmp, buf_link);
		spdk_bdev_io_set_buf(tmp, buf);
	}
}
261 
/*
 * Ensure a read I/O has a data buffer, then call 'cb'.  If the caller
 * already supplied a buffer, 'cb' is invoked immediately.  Otherwise a
 * buffer is taken from the appropriate pool; if the pool is empty the I/O
 * is queued and 'cb' fires later when a buffer is recycled by
 * spdk_bdev_io_put_buf().
 */
void
spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb)
{
	uint64_t len = bdev_io->u.read.len;
	struct spdk_mempool *pool;
	need_buf_tailq_t *tailq;
	void *buf = NULL;
	struct spdk_bdev_mgmt_channel *ch;

	assert(cb != NULL);
	assert(bdev_io->u.read.iovs != NULL);

	if (spdk_unlikely(bdev_io->u.read.iovs[0].iov_base != NULL)) {
		/* Buffer already present */
		cb(bdev_io->ch->channel, bdev_io);
		return;
	}

	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);

	bdev_io->get_buf_cb = cb;
	/* Pick the pool/wait queue for this request's size class. */
	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
		pool = g_bdev_mgr.buf_small_pool;
		tailq = &ch->need_buf_small;
	} else {
		pool = g_bdev_mgr.buf_large_pool;
		tailq = &ch->need_buf_large;
	}

	buf = spdk_mempool_get(pool);

	if (!buf) {
		/* Pool exhausted - park the I/O until a buffer is returned. */
		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
	} else {
		spdk_bdev_io_set_buf(bdev_io, buf);
	}
}
299 
300 static int
301 spdk_bdev_module_get_max_ctx_size(void)
302 {
303 	struct spdk_bdev_module_if *bdev_module;
304 	int max_bdev_module_size = 0;
305 
306 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
307 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
308 			max_bdev_module_size = bdev_module->get_ctx_size();
309 		}
310 	}
311 
312 	return max_bdev_module_size;
313 }
314 
315 void
316 spdk_bdev_config_text(FILE *fp)
317 {
318 	struct spdk_bdev_module_if *bdev_module;
319 
320 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
321 		if (bdev_module->config_text) {
322 			bdev_module->config_text(fp);
323 		}
324 	}
325 }
326 
327 static int
328 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
329 {
330 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
331 
332 	TAILQ_INIT(&ch->need_buf_small);
333 	TAILQ_INIT(&ch->need_buf_large);
334 
335 	return 0;
336 }
337 
338 static void
339 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
340 {
341 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
342 
343 	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
344 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
345 	}
346 }
347 
/*
 * Mark bdev subsystem initialization finished and invoke the callback
 * registered with spdk_bdev_initialize().  The saved callback/arg are
 * cleared first so the callback fires exactly once.
 */
static void
spdk_bdev_init_complete(int rc)
{
	spdk_bdev_init_cb cb_fn = g_cb_fn;
	void *cb_arg = g_cb_arg;

	g_bdev_mgr.init_complete = true;
	g_cb_fn = NULL;
	g_cb_arg = NULL;

	/* NOTE(review): cb_fn is assumed non-NULL here; spdk_bdev_initialize()
	 * asserts this, but a second call into this path would crash. */
	cb_fn(cb_arg, rc);
}
360 
361 static void
362 spdk_bdev_module_init_complete(int rc)
363 {
364 	struct spdk_bdev_module_if *m;
365 
366 	g_bdev_mgr.module_init_complete = true;
367 
368 	if (rc != 0) {
369 		spdk_bdev_init_complete(rc);
370 	}
371 
372 	/*
373 	 * Check all bdev modules for an examinations in progress.  If any
374 	 * exist, return immediately since we cannot finish bdev subsystem
375 	 * initialization until all are completed.
376 	 */
377 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
378 		if (m->examine_in_progress > 0) {
379 			return;
380 		}
381 	}
382 
383 	spdk_bdev_init_complete(0);
384 }
385 
386 static int
387 spdk_bdev_modules_init(void)
388 {
389 	struct spdk_bdev_module_if *module;
390 	int rc;
391 
392 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
393 		rc = module->module_init();
394 		if (rc != 0) {
395 			return rc;
396 		}
397 	}
398 
399 	return 0;
400 }
401 
/*
 * Start a poller via the hook the application registered in
 * spdk_bdev_initialize().  Assumes initialization has already installed
 * start_poller_fn (NULL otherwise - would crash).
 */
void
spdk_bdev_poller_start(struct spdk_bdev_poller **ppoller,
		       spdk_bdev_poller_fn fn,
		       void *arg,
		       uint32_t lcore,
		       uint64_t period_microseconds)
{
	g_bdev_mgr.start_poller_fn(ppoller, fn, arg, lcore, period_microseconds);
}
411 
/* Stop a poller via the application-registered hook (see
 * spdk_bdev_poller_start()). */
void
spdk_bdev_poller_stop(struct spdk_bdev_poller **ppoller)
{
	g_bdev_mgr.stop_poller_fn(ppoller);
}
417 
418 void
419 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg,
420 		     spdk_bdev_poller_start_cb start_poller_fn,
421 		     spdk_bdev_poller_stop_cb stop_poller_fn)
422 {
423 	int cache_size;
424 	int rc = 0;
425 
426 	assert(cb_fn != NULL);
427 
428 	g_cb_fn = cb_fn;
429 	g_cb_arg = cb_arg;
430 
431 	g_bdev_mgr.start_poller_fn = start_poller_fn;
432 	g_bdev_mgr.stop_poller_fn = stop_poller_fn;
433 
434 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create("bdev_io",
435 				  SPDK_BDEV_IO_POOL_SIZE,
436 				  sizeof(struct spdk_bdev_io) +
437 				  spdk_bdev_module_get_max_ctx_size(),
438 				  64,
439 				  SPDK_ENV_SOCKET_ID_ANY);
440 
441 	if (g_bdev_mgr.bdev_io_pool == NULL) {
442 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool");
443 		spdk_bdev_module_init_complete(-1);
444 		return;
445 	}
446 
447 	/**
448 	 * Ensure no more than half of the total buffers end up local caches, by
449 	 *   using spdk_env_get_core_count() to determine how many local caches we need
450 	 *   to account for.
451 	 */
452 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
453 	g_bdev_mgr.buf_small_pool = spdk_mempool_create("buf_small_pool",
454 				    BUF_SMALL_POOL_SIZE,
455 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
456 				    cache_size,
457 				    SPDK_ENV_SOCKET_ID_ANY);
458 	if (!g_bdev_mgr.buf_small_pool) {
459 		SPDK_ERRLOG("create rbuf small pool failed\n");
460 		spdk_bdev_module_init_complete(-1);
461 		return;
462 	}
463 
464 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
465 	g_bdev_mgr.buf_large_pool = spdk_mempool_create("buf_large_pool",
466 				    BUF_LARGE_POOL_SIZE,
467 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
468 				    cache_size,
469 				    SPDK_ENV_SOCKET_ID_ANY);
470 	if (!g_bdev_mgr.buf_large_pool) {
471 		SPDK_ERRLOG("create rbuf large pool failed\n");
472 		spdk_bdev_module_init_complete(-1);
473 		return;
474 	}
475 
476 #ifdef SPDK_CONFIG_VTUNE
477 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
478 #endif
479 
480 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
481 				spdk_bdev_mgmt_channel_destroy,
482 				sizeof(struct spdk_bdev_mgmt_channel));
483 
484 	rc = spdk_bdev_modules_init();
485 	spdk_bdev_module_init_complete(rc);
486 }
487 
/*
 * Tear down the bdev layer: run module_fini() on every module, verify all
 * pool elements were returned (leaked elements are logged; buffer-pool
 * leaks also assert in debug builds), then free the pools and unregister
 * the manager io_device.  Always returns 0.
 */
int
spdk_bdev_finish(void)
{
	struct spdk_bdev_module_if *bdev_module;

	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
		if (bdev_module->module_fini) {
			bdev_module->module_fini();
		}
	}

	/* Any discrepancy means bdev_io objects leaked (or are still in flight). */
	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
			    SPDK_BDEV_IO_POOL_SIZE);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
			    BUF_SMALL_POOL_SIZE);
		assert(false);
	}

	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
			    BUF_LARGE_POOL_SIZE);
		assert(false);
	}

	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
	spdk_mempool_free(g_bdev_mgr.buf_large_pool);

	spdk_io_device_unregister(&g_bdev_mgr);

	return 0;
}
527 
/*
 * Allocate and zero a bdev I/O object from the global pool.
 * Pool exhaustion is treated as fatal (abort()), so callers never
 * actually observe a NULL return.
 */
struct spdk_bdev_io *
spdk_bdev_get_io(void)
{
	struct spdk_bdev_io *bdev_io;

	bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
	if (!bdev_io) {
		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
		abort();
	}

	memset(bdev_io, 0, sizeof(*bdev_io));

	return bdev_io;
}
543 
544 static void
545 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
546 {
547 	if (!bdev_io) {
548 		return;
549 	}
550 
551 	if (bdev_io->buf != NULL) {
552 		spdk_bdev_io_put_buf(bdev_io);
553 	}
554 
555 	spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
556 }
557 
/*
 * Hand a pending I/O to the bdev module's submit_request().  Bumps the
 * channel's outstanding count first, and brackets the call with
 * in_submit_request so completion paths can detect reentrant completion.
 *
 * NOTE(review): the "__" prefix is a reserved identifier in C; a rename
 * (e.g. to _submit_request) would be cleaner but touches all callers.
 */
static void
__submit_request(struct spdk_bdev *bdev, struct spdk_bdev_io *bdev_io)
{
	struct spdk_io_channel *ch;

	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);

	ch = bdev_io->ch->channel;

	bdev_io->ch->io_outstanding++;
	bdev_io->in_submit_request = true;
	bdev->fn_table->submit_request(ch, bdev_io);
	bdev_io->in_submit_request = false;
}
572 
573 static int
574 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
575 {
576 	struct spdk_bdev *bdev = bdev_io->bdev;
577 
578 	__submit_request(bdev, bdev_io);
579 	return 0;
580 }
581 
/*
 * Redirect an already-submitted (still pending) I/O to a different bdev,
 * e.g. after the original target failed over.  The outstanding count is
 * decremented first because __submit_request() will increment it again.
 */
void
spdk_bdev_io_resubmit(struct spdk_bdev_io *bdev_io, struct spdk_bdev_desc *new_bdev_desc)
{
	struct spdk_bdev *new_bdev = new_bdev_desc->bdev;

	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
	bdev_io->bdev = new_bdev;

	/*
	 * These fields are normally set during spdk_bdev_io_init(), but since bdev is
	 * being switched, they need to be reinitialized.
	 */
	bdev_io->gencnt = new_bdev->gencnt;

	/*
	 * This bdev_io was already submitted so decrement io_outstanding to ensure it
	 *  does not get double-counted.
	 */
	assert(bdev_io->ch->io_outstanding > 0);
	bdev_io->ch->io_outstanding--;
	__submit_request(new_bdev, bdev_io);
}
604 
605 static void
606 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
607 		  struct spdk_bdev *bdev, void *cb_arg,
608 		  spdk_bdev_io_completion_cb cb)
609 {
610 	bdev_io->bdev = bdev;
611 	bdev_io->caller_ctx = cb_arg;
612 	bdev_io->cb = cb;
613 	bdev_io->gencnt = bdev->gencnt;
614 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
615 	bdev_io->in_submit_request = false;
616 }
617 
/* Ask the bdev module whether it supports the given I/O type. */
bool
spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
{
	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
}
623 
624 int
625 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
626 {
627 	if (bdev->fn_table->dump_config_json) {
628 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
629 	}
630 
631 	return 0;
632 }
633 
634 static int
635 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
636 {
637 	struct spdk_bdev		*bdev = io_device;
638 	struct spdk_bdev_channel	*ch = ctx_buf;
639 
640 	ch->bdev = io_device;
641 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
642 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
643 	memset(&ch->stat, 0, sizeof(ch->stat));
644 	ch->io_outstanding = 0;
645 
646 #ifdef SPDK_CONFIG_VTUNE
647 	{
648 		char *name;
649 
650 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
651 		if (!name) {
652 			return -1;
653 		}
654 		ch->handle = __itt_string_handle_create(name);
655 		free(name);
656 		ch->start_tsc = spdk_get_ticks();
657 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
658 	}
659 #endif
660 
661 	return 0;
662 }
663 
/*
 * Fail every buffer-wait queued I/O that belongs to channel 'ch'.
 * Uses the _SAFE iterator because entries are removed mid-walk.
 */
static void
_spdk_bdev_abort_io(need_buf_tailq_t *queue, struct spdk_bdev_channel *ch)
{
	struct spdk_bdev_io *bdev_io, *tmp;

	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
		if (bdev_io->ch == ch) {
			TAILQ_REMOVE(queue, bdev_io, buf_link);
			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		}
	}
}
676 
/*
 * Per-thread channel destroy callback: abort any of this channel's I/Os
 * still waiting for buffers, then release the module and manager channel
 * references.  All in-flight I/O must already have completed.
 */
static void
spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_bdev_channel	*ch = ctx_buf;
	struct spdk_bdev_mgmt_channel	*mgmt_channel;

	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);

	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, ch);
	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, ch);

	spdk_put_io_channel(ch->channel);
	spdk_put_io_channel(ch->mgmt_channel);
	assert(ch->io_outstanding == 0);
}
692 
/* Get (or create) the calling thread's I/O channel for the bdev behind
 * this descriptor. */
struct spdk_io_channel *
spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
{
	return spdk_get_io_channel(desc->bdev);
}
698 
/* Accessor: the bdev's unique name. */
const char *
spdk_bdev_get_name(const struct spdk_bdev *bdev)
{
	return bdev->name;
}
704 
/* Accessor: the bdev's human-readable product name. */
const char *
spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
{
	return bdev->product_name;
}
710 
/* Accessor: the bdev's logical block size in bytes. */
uint32_t
spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
{
	return bdev->blocklen;
}
716 
/* Accessor: the bdev's capacity in logical blocks. */
uint64_t
spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
{
	return bdev->blockcnt;
}
722 
/* Accessor: maximum number of SCSI unmap descriptors per unmap request. */
uint32_t
spdk_bdev_get_max_unmap_descriptors(const struct spdk_bdev *bdev)
{
	return bdev->max_unmap_bdesc_count;
}
728 
/* Required alignment for data buffers passed to this bdev: the block size
 * when the module needs aligned buffers, otherwise 1 (no requirement). */
size_t
spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
{
	/* TODO: push this logic down to the bdev modules */
	if (bdev->need_aligned_buffer) {
		return bdev->blocklen;
	}

	return 1;
}
739 
/* Accessor: whether the bdev has a volatile write cache (flush needed
 * for durability). */
bool
spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
{
	return bdev->write_cache;
}
745 
746 static int
747 spdk_bdev_io_valid(struct spdk_bdev *bdev, uint64_t offset, uint64_t nbytes)
748 {
749 	/* Return failure if nbytes is not a multiple of bdev->blocklen */
750 	if (nbytes % bdev->blocklen) {
751 		return -1;
752 	}
753 
754 	/* Return failure if offset + nbytes is less than offset; indicates there
755 	 * has been an overflow and hence the offset has been wrapped around */
756 	if (offset + nbytes < offset) {
757 		return -1;
758 	}
759 
760 	/* Return failure if offset + nbytes exceeds the size of the bdev */
761 	if (offset + nbytes > bdev->blockcnt * bdev->blocklen) {
762 		return -1;
763 	}
764 
765 	return 0;
766 }
767 
768 int
769 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
770 	       void *buf, uint64_t offset, uint64_t nbytes,
771 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
772 {
773 	struct spdk_bdev *bdev = desc->bdev;
774 	struct spdk_bdev_io *bdev_io;
775 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
776 	int rc;
777 
778 	if (spdk_bdev_io_valid(bdev, offset, nbytes) != 0) {
779 		return -EINVAL;
780 	}
781 
782 	bdev_io = spdk_bdev_get_io();
783 	if (!bdev_io) {
784 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n");
785 		return -ENOMEM;
786 	}
787 
788 	bdev_io->ch = channel;
789 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
790 	bdev_io->u.read.iov.iov_base = buf;
791 	bdev_io->u.read.iov.iov_len = nbytes;
792 	bdev_io->u.read.iovs = &bdev_io->u.read.iov;
793 	bdev_io->u.read.iovcnt = 1;
794 	bdev_io->u.read.len = nbytes;
795 	bdev_io->u.read.offset = offset;
796 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
797 
798 	rc = spdk_bdev_io_submit(bdev_io);
799 	if (rc < 0) {
800 		spdk_bdev_put_io(bdev_io);
801 		return rc;
802 	}
803 
804 	return 0;
805 }
806 
807 int
808 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
809 		struct iovec *iov, int iovcnt,
810 		uint64_t offset, uint64_t nbytes,
811 		spdk_bdev_io_completion_cb cb, void *cb_arg)
812 {
813 	struct spdk_bdev *bdev = desc->bdev;
814 	struct spdk_bdev_io *bdev_io;
815 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
816 	int rc;
817 
818 	if (spdk_bdev_io_valid(bdev, offset, nbytes) != 0) {
819 		return -EINVAL;
820 	}
821 
822 	bdev_io = spdk_bdev_get_io();
823 	if (!bdev_io) {
824 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n");
825 		return -ENOMEM;
826 	}
827 
828 	bdev_io->ch = channel;
829 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
830 	bdev_io->u.read.iovs = iov;
831 	bdev_io->u.read.iovcnt = iovcnt;
832 	bdev_io->u.read.len = nbytes;
833 	bdev_io->u.read.offset = offset;
834 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
835 
836 	rc = spdk_bdev_io_submit(bdev_io);
837 	if (rc < 0) {
838 		spdk_bdev_put_io(bdev_io);
839 		return rc;
840 	}
841 
842 	return 0;
843 }
844 
845 int
846 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
847 		void *buf, uint64_t offset, uint64_t nbytes,
848 		spdk_bdev_io_completion_cb cb, void *cb_arg)
849 {
850 	struct spdk_bdev *bdev = desc->bdev;
851 	struct spdk_bdev_io *bdev_io;
852 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
853 	int rc;
854 
855 	if (!desc->write) {
856 		return -EBADF;
857 	}
858 
859 	if (spdk_bdev_io_valid(bdev, offset, nbytes) != 0) {
860 		return -EINVAL;
861 	}
862 
863 	bdev_io = spdk_bdev_get_io();
864 	if (!bdev_io) {
865 		SPDK_ERRLOG("bdev_io memory allocation failed duing write\n");
866 		return -ENOMEM;
867 	}
868 
869 	bdev_io->ch = channel;
870 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
871 	bdev_io->u.write.iov.iov_base = buf;
872 	bdev_io->u.write.iov.iov_len = nbytes;
873 	bdev_io->u.write.iovs = &bdev_io->u.write.iov;
874 	bdev_io->u.write.iovcnt = 1;
875 	bdev_io->u.write.len = nbytes;
876 	bdev_io->u.write.offset = offset;
877 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
878 
879 	rc = spdk_bdev_io_submit(bdev_io);
880 	if (rc < 0) {
881 		spdk_bdev_put_io(bdev_io);
882 		return rc;
883 	}
884 
885 	return 0;
886 }
887 
888 int
889 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
890 		 struct iovec *iov, int iovcnt,
891 		 uint64_t offset, uint64_t len,
892 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
893 {
894 	struct spdk_bdev *bdev = desc->bdev;
895 	struct spdk_bdev_io *bdev_io;
896 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
897 	int rc;
898 
899 	if (!desc->write) {
900 		return -EBADF;
901 	}
902 
903 	if (spdk_bdev_io_valid(bdev, offset, len) != 0) {
904 		return -EINVAL;
905 	}
906 
907 	bdev_io = spdk_bdev_get_io();
908 	if (!bdev_io) {
909 		SPDK_ERRLOG("bdev_io memory allocation failed duing writev\n");
910 		return -ENOMEM;
911 	}
912 
913 	bdev_io->ch = channel;
914 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
915 	bdev_io->u.write.iovs = iov;
916 	bdev_io->u.write.iovcnt = iovcnt;
917 	bdev_io->u.write.len = len;
918 	bdev_io->u.write.offset = offset;
919 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
920 
921 	rc = spdk_bdev_io_submit(bdev_io);
922 	if (rc < 0) {
923 		spdk_bdev_put_io(bdev_io);
924 		return rc;
925 	}
926 
927 	return 0;
928 }
929 
930 int
931 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
932 		struct spdk_scsi_unmap_bdesc *unmap_d,
933 		uint16_t bdesc_count,
934 		spdk_bdev_io_completion_cb cb, void *cb_arg)
935 {
936 	struct spdk_bdev *bdev = desc->bdev;
937 	struct spdk_bdev_io *bdev_io;
938 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
939 	int rc;
940 
941 	if (!desc->write) {
942 		return -EBADF;
943 	}
944 
945 	if (bdesc_count == 0) {
946 		SPDK_ERRLOG("Invalid bdesc_count 0\n");
947 		return -EINVAL;
948 	}
949 
950 	if (bdesc_count > bdev->max_unmap_bdesc_count) {
951 		SPDK_ERRLOG("Invalid bdesc_count %u > max_unmap_bdesc_count %u\n",
952 			    bdesc_count, bdev->max_unmap_bdesc_count);
953 		return -EINVAL;
954 	}
955 
956 	bdev_io = spdk_bdev_get_io();
957 	if (!bdev_io) {
958 		SPDK_ERRLOG("bdev_io memory allocation failed duing unmap\n");
959 		return -ENOMEM;
960 	}
961 
962 	bdev_io->ch = channel;
963 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
964 	bdev_io->u.unmap.unmap_bdesc = unmap_d;
965 	bdev_io->u.unmap.bdesc_count = bdesc_count;
966 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
967 
968 	rc = spdk_bdev_io_submit(bdev_io);
969 	if (rc < 0) {
970 		spdk_bdev_put_io(bdev_io);
971 		return rc;
972 	}
973 
974 	return 0;
975 }
976 
977 int
978 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
979 		uint64_t offset, uint64_t length,
980 		spdk_bdev_io_completion_cb cb, void *cb_arg)
981 {
982 	struct spdk_bdev *bdev = desc->bdev;
983 	struct spdk_bdev_io *bdev_io;
984 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
985 	int rc;
986 
987 	if (!desc->write) {
988 		return -EBADF;
989 	}
990 
991 	bdev_io = spdk_bdev_get_io();
992 	if (!bdev_io) {
993 		SPDK_ERRLOG("bdev_io memory allocation failed duing flush\n");
994 		return -ENOMEM;
995 	}
996 
997 	bdev_io->ch = channel;
998 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
999 	bdev_io->u.flush.offset = offset;
1000 	bdev_io->u.flush.length = length;
1001 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1002 
1003 	rc = spdk_bdev_io_submit(bdev_io);
1004 	if (rc < 0) {
1005 		spdk_bdev_put_io(bdev_io);
1006 		return rc;
1007 	}
1008 
1009 	return 0;
1010 }
1011 
/*
 * Final stage of a reset: submit the queued reset I/O to the bdev module.
 * Runs as a thread message on the thread owning the I/O's channel.
 */
static void
_spdk_bdev_reset_dev(void *io_device, void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;
	int rc;

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		/* NOTE(review): put_io returns the I/O to the pool and then
		 * io_complete touches it again - suspect ordering.  Currently
		 * unreachable since spdk_bdev_io_submit() always returns 0,
		 * but worth confirming before this path can trigger. */
		spdk_bdev_put_io(bdev_io);
		SPDK_ERRLOG("reset failed\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
1025 
/*
 * spdk_for_each_channel callback for a reset: fail all I/Os on this
 * channel still waiting for data buffers before the reset is issued.
 */
static void
_spdk_bdev_reset_abort_channel(void *io_device, struct spdk_io_channel *ch,
			       void *ctx)
{
	struct spdk_bdev_channel	*channel;
	struct spdk_bdev_mgmt_channel	*mgmt_channel;

	channel = spdk_io_channel_get_ctx(ch);
	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);

	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, channel);
	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, channel);
}
1039 
/*
 * Begin a reset: visit every channel of the bdev to abort buffer-waiting
 * I/Os, then submit the reset itself (_spdk_bdev_reset_dev) once the
 * channel walk completes.
 */
static void
_spdk_bdev_start_reset(void *ctx)
{
	struct spdk_bdev_io *bdev_io = ctx;

	spdk_for_each_channel(bdev_io->bdev, _spdk_bdev_reset_abort_channel,
			      bdev_io, _spdk_bdev_reset_dev);
}
1048 
1049 static void
1050 _spdk_bdev_start_next_reset(struct spdk_bdev *bdev)
1051 {
1052 	struct spdk_bdev_io *bdev_io;
1053 	struct spdk_thread *thread;
1054 
1055 	pthread_mutex_lock(&bdev->mutex);
1056 
1057 	if (bdev->reset_in_progress || TAILQ_EMPTY(&bdev->queued_resets)) {
1058 		pthread_mutex_unlock(&bdev->mutex);
1059 		return;
1060 	} else {
1061 		bdev_io = TAILQ_FIRST(&bdev->queued_resets);
1062 		TAILQ_REMOVE(&bdev->queued_resets, bdev_io, link);
1063 		bdev->reset_in_progress = true;
1064 		thread = spdk_io_channel_get_thread(bdev_io->ch->channel);
1065 		spdk_thread_send_msg(thread, _spdk_bdev_start_reset, bdev_io);
1066 	}
1067 
1068 	pthread_mutex_unlock(&bdev->mutex);
1069 }
1070 
1071 int
1072 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1073 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1074 {
1075 	struct spdk_bdev *bdev = desc->bdev;
1076 	struct spdk_bdev_io *bdev_io;
1077 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1078 
1079 	bdev_io = spdk_bdev_get_io();
1080 	if (!bdev_io) {
1081 		SPDK_ERRLOG("bdev_io memory allocation failed duing reset\n");
1082 		return -ENOMEM;;
1083 	}
1084 
1085 	bdev_io->ch = channel;
1086 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1087 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1088 
1089 	pthread_mutex_lock(&bdev->mutex);
1090 	TAILQ_INSERT_TAIL(&bdev->queued_resets, bdev_io, link);
1091 	pthread_mutex_unlock(&bdev->mutex);
1092 
1093 	_spdk_bdev_start_next_reset(bdev);
1094 
1095 	return 0;
1096 }
1097 
1098 void
1099 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1100 		      struct spdk_bdev_io_stat *stat)
1101 {
1102 #ifdef SPDK_CONFIG_VTUNE
1103 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1104 	memset(stat, 0, sizeof(*stat));
1105 	return;
1106 #endif
1107 
1108 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1109 
1110 	*stat = channel->stat;
1111 	memset(&channel->stat, 0, sizeof(channel->stat));
1112 }
1113 
/*
 * Submit a raw NVMe admin command to the bdev.  Requires a writable
 * descriptor (admin commands may modify device state).  'buf' is the
 * command's data buffer of 'nbytes' bytes.  Returns 0 on submission,
 * -EBADF for a read-only descriptor, -ENOMEM if no I/O object is
 * available.
 */
int
spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
			      spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	struct spdk_bdev *bdev = desc->bdev;
	struct spdk_bdev_io *bdev_io;
	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
	int rc;

	if (!desc->write) {
		return -EBADF;
	}

	bdev_io = spdk_bdev_get_io();
	if (!bdev_io) {
		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
		return -ENOMEM;
	}

	bdev_io->ch = channel;
	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
	/* Copy the command by value; the caller's cmd need not stay valid. */
	bdev_io->u.nvme_passthru.cmd = *cmd;
	bdev_io->u.nvme_passthru.buf = buf;
	bdev_io->u.nvme_passthru.nbytes = nbytes;

	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);

	rc = spdk_bdev_io_submit(bdev_io);
	if (rc < 0) {
		spdk_bdev_put_io(bdev_io);
		return rc;
	}

	return 0;
}
1150 
1151 int
1152 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1153 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1154 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1155 {
1156 	struct spdk_bdev *bdev = desc->bdev;
1157 	struct spdk_bdev_io *bdev_io;
1158 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1159 	int rc;
1160 
1161 	if (!desc->write) {
1162 		/*
1163 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1164 		 *  to easily determine if the command is a read or write, but for now just
1165 		 *  do not allow io_passthru with a read-only descriptor.
1166 		 */
1167 		return -EBADF;
1168 	}
1169 
1170 	bdev_io = spdk_bdev_get_io();
1171 	if (!bdev_io) {
1172 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1173 		return -ENOMEM;
1174 	}
1175 
1176 	bdev_io->ch = channel;
1177 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1178 	bdev_io->u.nvme_passthru.cmd = *cmd;
1179 	bdev_io->u.nvme_passthru.buf = buf;
1180 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1181 
1182 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1183 
1184 	rc = spdk_bdev_io_submit(bdev_io);
1185 	if (rc < 0) {
1186 		spdk_bdev_put_io(bdev_io);
1187 		return rc;
1188 	}
1189 
1190 	return 0;
1191 }
1192 
1193 int
1194 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1195 {
1196 	if (!bdev_io) {
1197 		SPDK_ERRLOG("bdev_io is NULL\n");
1198 		return -1;
1199 	}
1200 
1201 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1202 		SPDK_ERRLOG("bdev_io is in pending state\n");
1203 		assert(false);
1204 		return -1;
1205 	}
1206 
1207 	spdk_bdev_put_io(bdev_io);
1208 
1209 	return 0;
1210 }
1211 
1212 static void
1213 _spdk_bdev_io_complete(void *ctx)
1214 {
1215 	struct spdk_bdev_io *bdev_io = ctx;
1216 
1217 	assert(bdev_io->cb != NULL);
1218 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1219 }
1220 
/*
 * Complete a bdev I/O with the given status.
 *
 * Handles reset completions (bump the generation counter and kick the next
 * queued reset), drops I/Os issued before the most recent reset without
 * invoking their callbacks, updates per-channel read/write statistics, and
 * finally delivers the user callback - deferred via thread message when
 * completing from inside submit_request to avoid unbounded recursion.
 */
void
spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
	bdev_io->status = status;

	/* Every completed I/O must previously have been counted as outstanding. */
	assert(bdev_io->ch->io_outstanding > 0);
	bdev_io->ch->io_outstanding--;
	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
		/* Successful reset */
		if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
			/* Increase the bdev generation */
			bdev_io->bdev->gencnt++;
		}
		bdev_io->bdev->reset_in_progress = false;
		/* Resets are serialized; start the next queued one, if any. */
		_spdk_bdev_start_next_reset(bdev_io->bdev);
	} else {
		/*
		 * Check the gencnt, to see if this I/O was issued before the most
		 * recent reset. If the gencnt is not equal, then just free the I/O
		 * without calling the callback, since the caller will have already
		 * freed its context for this I/O.
		 */
		if (bdev_io->bdev->gencnt != bdev_io->gencnt) {
			spdk_bdev_put_io(bdev_io);
			return;
		}
	}

	/* Only successful reads/writes contribute to the channel statistics. */
	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
		switch (bdev_io->type) {
		case SPDK_BDEV_IO_TYPE_READ:
			bdev_io->ch->stat.bytes_read += bdev_io->u.read.len;
			bdev_io->ch->stat.num_read_ops++;
			break;
		case SPDK_BDEV_IO_TYPE_WRITE:
			bdev_io->ch->stat.bytes_written += bdev_io->u.write.len;
			bdev_io->ch->stat.num_write_ops++;
			break;
		default:
			break;
		}
	}

#ifdef SPDK_CONFIG_VTUNE
	/* Periodically flush the channel stats to VTune and reset them, so
	 * VTune sees per-interval rather than cumulative numbers. */
	uint64_t now_tsc = spdk_get_ticks();
	if (now_tsc > (bdev_io->ch->start_tsc + bdev_io->ch->interval_tsc)) {
		uint64_t data[4];

		data[0] = bdev_io->ch->stat.num_read_ops;
		data[1] = bdev_io->ch->stat.bytes_read;
		data[2] = bdev_io->ch->stat.num_write_ops;
		data[3] = bdev_io->ch->stat.bytes_written;

		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->ch->handle,
				   __itt_metadata_u64, 4, data);

		memset(&bdev_io->ch->stat, 0, sizeof(bdev_io->ch->stat));
		bdev_io->ch->start_tsc = now_tsc;
	}
#endif

	if (bdev_io->in_submit_request || bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
		/*
		 * Defer completion to avoid potential infinite recursion if the
		 * user's completion callback issues a new I/O.
		 */
		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
				     _spdk_bdev_io_complete, bdev_io);
	} else {
		_spdk_bdev_io_complete(bdev_io);
	}
}
1293 
1294 void
1295 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
1296 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
1297 {
1298 	if (sc == SPDK_SCSI_STATUS_GOOD) {
1299 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1300 	} else {
1301 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
1302 		bdev_io->error.scsi.sc = sc;
1303 		bdev_io->error.scsi.sk = sk;
1304 		bdev_io->error.scsi.asc = asc;
1305 		bdev_io->error.scsi.ascq = ascq;
1306 	}
1307 
1308 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1309 }
1310 
1311 void
1312 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
1313 			     int *sc, int *sk, int *asc, int *ascq)
1314 {
1315 	assert(sc != NULL);
1316 	assert(sk != NULL);
1317 	assert(asc != NULL);
1318 	assert(ascq != NULL);
1319 
1320 	switch (bdev_io->status) {
1321 	case SPDK_BDEV_IO_STATUS_SUCCESS:
1322 		*sc = SPDK_SCSI_STATUS_GOOD;
1323 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
1324 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1325 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1326 		break;
1327 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
1328 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
1329 		break;
1330 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
1331 		*sc = bdev_io->error.scsi.sc;
1332 		*sk = bdev_io->error.scsi.sk;
1333 		*asc = bdev_io->error.scsi.asc;
1334 		*ascq = bdev_io->error.scsi.ascq;
1335 		break;
1336 	default:
1337 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
1338 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
1339 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1340 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1341 		break;
1342 	}
1343 }
1344 
1345 void
1346 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
1347 {
1348 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
1349 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1350 	} else {
1351 		bdev_io->error.nvme.sct = sct;
1352 		bdev_io->error.nvme.sc = sc;
1353 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
1354 	}
1355 
1356 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1357 }
1358 
1359 void
1360 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
1361 {
1362 	assert(sct != NULL);
1363 	assert(sc != NULL);
1364 
1365 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
1366 		*sct = bdev_io->error.nvme.sct;
1367 		*sc = bdev_io->error.nvme.sc;
1368 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1369 		*sct = SPDK_NVME_SCT_GENERIC;
1370 		*sc = SPDK_NVME_SC_SUCCESS;
1371 	} else {
1372 		*sct = SPDK_NVME_SCT_GENERIC;
1373 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1374 	}
1375 }
1376 
/*
 * Common registration path for both physical bdevs and vbdevs: initialize
 * per-bdev state, register the bdev as an io_device, add it to the global
 * bdev list, and offer it to every module with an examine callback (e.g. so
 * logical-volume or RAID modules can claim it).
 */
static void
_spdk_bdev_register(struct spdk_bdev *bdev)
{
	struct spdk_bdev_module_if *module;

	assert(bdev->module != NULL);

	bdev->status = SPDK_BDEV_STATUS_READY;

	/* initialize the reset generation value to zero */
	bdev->gencnt = 0;
	TAILQ_INIT(&bdev->open_descs);
	bdev->bdev_opened_for_write = false;

	TAILQ_INIT(&bdev->vbdevs);
	TAILQ_INIT(&bdev->base_bdevs);

	bdev->reset_in_progress = false;
	TAILQ_INIT(&bdev->queued_resets);

	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
				sizeof(struct spdk_bdev_channel));

	pthread_mutex_init(&bdev->mutex, NULL);
	SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Inserting bdev %s into list\n", bdev->name);
	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);

	/* Let interested modules examine the new bdev; each pending examine is
	 * tracked and later retired via spdk_bdev_module_examine_done(). */
	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
		if (module->examine) {
			module->examine_in_progress++;
			module->examine(bdev);
		}
	}
}
1411 
/* Register a physical bdev (one with no base bdevs) with the bdev layer. */
void
spdk_bdev_register(struct spdk_bdev *bdev)
{
	_spdk_bdev_register(bdev);
}
1417 
1418 void
1419 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
1420 {
1421 	int i;
1422 
1423 	_spdk_bdev_register(vbdev);
1424 	for (i = 0; i < base_bdev_count; i++) {
1425 		assert(base_bdevs[i] != NULL);
1426 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
1427 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
1428 	}
1429 }
1430 
/*
 * Unregister a bdev.  Marks the bdev as REMOVING, notifies every open
 * descriptor via its remove callback, and - only once all descriptors have
 * been closed - removes the bdev from the global list and destructs it.
 * If descriptors remain open, teardown is deferred: spdk_bdev_close() calls
 * back into this function when the last descriptor goes away.
 */
void
spdk_bdev_unregister(struct spdk_bdev *bdev)
{
	struct spdk_bdev_desc	*desc, *tmp;
	int			rc;

	SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Removing bdev %s from list\n", bdev->name);

	pthread_mutex_lock(&bdev->mutex);

	bdev->status = SPDK_BDEV_STATUS_REMOVING;

	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
		if (desc->remove_cb) {
			/* Drop the lock while calling out: the callback may
			 * close the descriptor, which re-takes bdev->mutex. */
			pthread_mutex_unlock(&bdev->mutex);
			desc->remove_cb(desc->remove_ctx);
			pthread_mutex_lock(&bdev->mutex);
		}
	}

	if (!TAILQ_EMPTY(&bdev->open_descs)) {
		/* Descriptors still open - defer teardown to the last close. */
		pthread_mutex_unlock(&bdev->mutex);
		return;
	}

	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
	pthread_mutex_unlock(&bdev->mutex);

	pthread_mutex_destroy(&bdev->mutex);

	spdk_io_device_unregister(bdev);

	rc = bdev->fn_table->destruct(bdev->ctxt);
	if (rc < 0) {
		SPDK_ERRLOG("destruct failed\n");
	}
}
1468 
1469 void
1470 spdk_vbdev_unregister(struct spdk_bdev *vbdev)
1471 {
1472 	struct spdk_bdev *base_bdev;
1473 
1474 	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
1475 	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
1476 		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
1477 	}
1478 	spdk_bdev_unregister(vbdev);
1479 }
1480 
/*
 * Called by a bdev module when one of its (possibly asynchronous) examine
 * operations finishes.  Once no module has any examination outstanding, and
 * module initialization already finished, the bdev subsystem as a whole is
 * marked initialized.
 */
void
spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
{
	struct spdk_bdev_module_if *m;

	assert(module->examine_in_progress > 0);
	module->examine_in_progress--;

	/*
	 * Check all bdev modules for any examinations in progress.  If any
	 * exist, return immediately since we cannot finish bdev subsystem
	 * initialization until all are completed.
	 */
	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
		if (m->examine_in_progress > 0) {
			return;
		}
	}

	if (g_bdev_mgr.module_init_complete && !g_bdev_mgr.init_complete) {
		/*
		 * Modules already finished initialization - now that all
		 * the bdev modules have finished their asynchronous I/O
		 * processing, the entire bdev layer can be marked as complete.
		 */
		spdk_bdev_init_complete(0);
	}
}
1509 
1510 int
1511 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
1512 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
1513 {
1514 	struct spdk_bdev_desc *desc;
1515 
1516 	desc = calloc(1, sizeof(*desc));
1517 	if (desc == NULL) {
1518 		return -ENOMEM;
1519 	}
1520 
1521 	pthread_mutex_lock(&bdev->mutex);
1522 
1523 	if (write && (bdev->bdev_opened_for_write || bdev->claim_module)) {
1524 		SPDK_ERRLOG("failed, %s already opened for write or claimed\n", bdev->name);
1525 		free(desc);
1526 		pthread_mutex_unlock(&bdev->mutex);
1527 		return -EPERM;
1528 	}
1529 
1530 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
1531 
1532 	if (write) {
1533 		bdev->bdev_opened_for_write = true;
1534 	}
1535 
1536 	desc->bdev = bdev;
1537 	desc->remove_cb = remove_cb;
1538 	desc->remove_ctx = remove_ctx;
1539 	desc->write = write;
1540 	*_desc = desc;
1541 
1542 	pthread_mutex_unlock(&bdev->mutex);
1543 
1544 	return 0;
1545 }
1546 
1547 void
1548 spdk_bdev_close(struct spdk_bdev_desc *desc)
1549 {
1550 	struct spdk_bdev *bdev = desc->bdev;
1551 	bool do_unregister = false;
1552 
1553 	pthread_mutex_lock(&bdev->mutex);
1554 
1555 	if (desc->write) {
1556 		assert(bdev->bdev_opened_for_write);
1557 		bdev->bdev_opened_for_write = false;
1558 	}
1559 
1560 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
1561 	free(desc);
1562 
1563 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING) {
1564 		do_unregister = true;
1565 	}
1566 	pthread_mutex_unlock(&bdev->mutex);
1567 
1568 	if (do_unregister == true) {
1569 		spdk_bdev_unregister(bdev);
1570 	}
1571 }
1572 
1573 int
1574 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
1575 			    struct spdk_bdev_module_if *module)
1576 {
1577 	if (bdev->claim_module != NULL) {
1578 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
1579 			    bdev->claim_module->name);
1580 		return -EPERM;
1581 	}
1582 
1583 	if ((!desc || !desc->write) && bdev->bdev_opened_for_write) {
1584 		SPDK_ERRLOG("bdev %s already opened with write access\n", bdev->name);
1585 		return -EPERM;
1586 	}
1587 
1588 	if (desc && !desc->write) {
1589 		desc->write = true;
1590 	}
1591 
1592 	bdev->claim_module = module;
1593 	return 0;
1594 }
1595 
/* Release a claim previously taken with spdk_bdev_module_claim_bdev().
 * Calling this without an active claim is a bug (asserted). */
void
spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
{
	assert(bdev->claim_module != NULL);
	bdev->claim_module = NULL;
}
1602 
/* Return the bdev associated with an open descriptor. */
struct spdk_bdev *
spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
{
	return desc->bdev;
}
1608 
1609 void
1610 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
1611 {
1612 	struct iovec *iovs;
1613 	int iovcnt;
1614 
1615 	if (bdev_io == NULL) {
1616 		return;
1617 	}
1618 
1619 	switch (bdev_io->type) {
1620 	case SPDK_BDEV_IO_TYPE_READ:
1621 		iovs = bdev_io->u.read.iovs;
1622 		iovcnt = bdev_io->u.read.iovcnt;
1623 		break;
1624 	case SPDK_BDEV_IO_TYPE_WRITE:
1625 		iovs = bdev_io->u.write.iovs;
1626 		iovcnt = bdev_io->u.write.iovcnt;
1627 		break;
1628 	default:
1629 		iovs = NULL;
1630 		iovcnt = 0;
1631 		break;
1632 	}
1633 
1634 	if (iovp) {
1635 		*iovp = iovs;
1636 	}
1637 	if (iovcntp) {
1638 		*iovcntp = iovcnt;
1639 	}
1640 }
1641 
1642 void
1643 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
1644 {
1645 	/*
1646 	 * Modules with examine callbacks must be initialized first, so they are
1647 	 *  ready to handle examine callbacks from later modules that will
1648 	 *  register physical bdevs.
1649 	 */
1650 	if (bdev_module->examine != NULL) {
1651 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1652 	} else {
1653 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1654 	}
1655 }
1656