xref: /spdk/lib/bdev/bdev.c (revision 053d5733e62a33b8b734f33ac5fc357fcc0513de)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/bdev.h"
38 
39 #include "spdk/env.h"
40 #include "spdk/io_channel.h"
41 #include "spdk/likely.h"
42 #include "spdk/queue.h"
43 #include "spdk/nvme_spec.h"
44 #include "spdk/scsi_spec.h"
45 
46 #include "spdk_internal/bdev.h"
47 #include "spdk_internal/log.h"
48 #include "spdk/string.h"
49 
50 #ifdef SPDK_CONFIG_VTUNE
51 #include "ittnotify.h"
52 #include "ittnotify_types.h"
53 int __itt_init_ittlib(const char *, __itt_group_id);
54 #endif
55 
56 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
57 #define BUF_SMALL_POOL_SIZE	8192
58 #define BUF_LARGE_POOL_SIZE	1024
59 
60 typedef TAILQ_HEAD(, spdk_bdev_io) need_buf_tailq_t;
61 
62 struct spdk_bdev_mgr {
63 	struct spdk_mempool *bdev_io_pool;
64 
65 	struct spdk_mempool *buf_small_pool;
66 	struct spdk_mempool *buf_large_pool;
67 
68 	TAILQ_HEAD(, spdk_bdev_module_if) bdev_modules;
69 
70 	TAILQ_HEAD(, spdk_bdev) bdevs;
71 
72 	spdk_bdev_poller_start_cb start_poller_fn;
73 	spdk_bdev_poller_stop_cb stop_poller_fn;
74 
75 	bool init_complete;
76 	bool module_init_complete;
77 
78 #ifdef SPDK_CONFIG_VTUNE
79 	__itt_domain	*domain;
80 #endif
81 };
82 
83 static struct spdk_bdev_mgr g_bdev_mgr = {
84 	.bdev_modules = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdev_modules),
85 	.bdevs = TAILQ_HEAD_INITIALIZER(g_bdev_mgr.bdevs),
86 	.start_poller_fn = NULL,
87 	.stop_poller_fn = NULL,
88 	.init_complete = false,
89 	.module_init_complete = false,
90 };
91 
92 static spdk_bdev_init_cb	g_cb_fn = NULL;
93 static void			*g_cb_arg = NULL;
94 
95 
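/*
 * Per-thread context for the bdev manager's I/O device.  Each queue holds
 * read I/O submitted on this thread that are waiting for a data buffer
 * from the corresponding global buffer pool.
 */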
96 struct spdk_bdev_mgmt_channel {
97 	need_buf_tailq_t need_buf_small;
98 	need_buf_tailq_t need_buf_large;
99 };
100 
101 struct spdk_bdev_desc {
102 	struct spdk_bdev		*bdev;
103 	spdk_bdev_remove_cb_t		remove_cb;
104 	void				*remove_ctx;
105 	bool				write;
106 	TAILQ_ENTRY(spdk_bdev_desc)	link;
107 };
108 
109 struct spdk_bdev_channel {
110 	struct spdk_bdev	*bdev;
111 
112 	/* The channel for the underlying device */
113 	struct spdk_io_channel	*channel;
114 
115 	/* Channel for the bdev manager */
116 	struct spdk_io_channel *mgmt_channel;
117 
118 	struct spdk_bdev_io_stat stat;
119 
120 	/*
121 	 * Count of I/O submitted to bdev module and waiting for completion.
122 	 * Incremented before submit_request() is called on an spdk_bdev_io.
123 	 */
124 	uint64_t		io_outstanding;
125 
126 #ifdef SPDK_CONFIG_VTUNE
127 	uint64_t		start_tsc;
128 	uint64_t		interval_tsc;
129 	__itt_string_handle	*handle;
130 #endif
131 
132 };
133 
134 struct spdk_bdev *
135 spdk_bdev_first(void)
136 {
137 	struct spdk_bdev *bdev;
138 
139 	bdev = TAILQ_FIRST(&g_bdev_mgr.bdevs);
140 	if (bdev) {
141 		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Starting bdev iteration at %s\n", bdev->name);
142 	}
143 
144 	return bdev;
145 }
146 
147 struct spdk_bdev *
148 spdk_bdev_next(struct spdk_bdev *prev)
149 {
150 	struct spdk_bdev *bdev;
151 
152 	bdev = TAILQ_NEXT(prev, link);
153 	if (bdev) {
154 		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Continuing bdev iteration at %s\n", bdev->name);
155 	}
156 
157 	return bdev;
158 }
159 
160 static struct spdk_bdev *
161 _bdev_next_leaf(struct spdk_bdev *bdev)
162 {
163 	while (bdev != NULL) {
164 		if (TAILQ_EMPTY(&bdev->vbdevs)) {
165 			return bdev;
166 		} else {
167 			bdev = TAILQ_NEXT(bdev, link);
168 		}
169 	}
170 
171 	return bdev;
172 }
173 
174 struct spdk_bdev *
175 spdk_bdev_first_leaf(void)
176 {
177 	struct spdk_bdev *bdev;
178 
179 	bdev = _bdev_next_leaf(TAILQ_FIRST(&g_bdev_mgr.bdevs));
180 
181 	if (bdev) {
182 		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Starting bdev iteration at %s\n", bdev->name);
183 	}
184 
185 	return bdev;
186 }
187 
188 struct spdk_bdev *
189 spdk_bdev_next_leaf(struct spdk_bdev *prev)
190 {
191 	struct spdk_bdev *bdev;
192 
193 	bdev = _bdev_next_leaf(TAILQ_NEXT(prev, link));
194 
195 	if (bdev) {
196 		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Continuing bdev iteration at %s\n", bdev->name);
197 	}
198 
199 	return bdev;
200 }
201 
202 struct spdk_bdev *
203 spdk_bdev_get_by_name(const char *bdev_name)
204 {
205 	struct spdk_bdev *bdev = spdk_bdev_first();
206 
207 	while (bdev != NULL) {
208 		if (strcmp(bdev_name, bdev->name) == 0) {
209 			return bdev;
210 		}
211 		bdev = spdk_bdev_next(bdev);
212 	}
213 
214 	return NULL;
215 }
216 
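/*
 * Attach a raw buffer from one of the buffer pools to a read I/O.  The pool
 * elements are allocated with 512 bytes of padding, so the start of the
 * iovec can be rounded up to a 512-byte boundary here.
 */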
217 static void
218 spdk_bdev_io_set_buf(struct spdk_bdev_io *bdev_io, void *buf)
219 {
220 	assert(bdev_io->get_buf_cb != NULL);
221 	assert(buf != NULL);
222 	assert(bdev_io->u.read.iovs != NULL);
223 
224 	bdev_io->buf = buf;
225 	bdev_io->u.read.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
226 	bdev_io->u.read.iovs[0].iov_len = bdev_io->u.read.len;
227 	bdev_io->get_buf_cb(bdev_io->ch->channel, bdev_io);
228 }
229 
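/*
 * Release the buffer attached to a completed read I/O.  If another I/O is
 * queued waiting for a buffer of this size class, hand the buffer directly
 * to it instead of returning it to the pool.
 */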
230 static void
231 spdk_bdev_io_put_buf(struct spdk_bdev_io *bdev_io)
232 {
233 	struct spdk_mempool *pool;
234 	struct spdk_bdev_io *tmp;
235 	void *buf;
236 	need_buf_tailq_t *tailq;
237 	uint64_t length;
238 	struct spdk_bdev_mgmt_channel *ch;
239 
240 	assert(bdev_io->u.read.iovcnt == 1);
241 
242 	length = bdev_io->u.read.len;
243 	buf = bdev_io->buf;
244 
245 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
246 
247 	if (length <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
248 		pool = g_bdev_mgr.buf_small_pool;
249 		tailq = &ch->need_buf_small;
250 	} else {
251 		pool = g_bdev_mgr.buf_large_pool;
252 		tailq = &ch->need_buf_large;
253 	}
254 
255 	if (TAILQ_EMPTY(tailq)) {
256 		spdk_mempool_put(pool, buf);
257 	} else {
258 		tmp = TAILQ_FIRST(tailq);
259 		TAILQ_REMOVE(tailq, tmp, buf_link);
260 		spdk_bdev_io_set_buf(tmp, buf);
261 	}
262 }
263 
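/*
 * Ensure a read I/O has a data buffer before the supplied callback runs.
 * If the pool for the request's size class is empty, the I/O is queued on
 * the management channel until spdk_bdev_io_put_buf() frees up a buffer.
 */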
264 void
265 spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_buf_cb cb)
266 {
267 	uint64_t len = bdev_io->u.read.len;
268 	struct spdk_mempool *pool;
269 	need_buf_tailq_t *tailq;
270 	void *buf = NULL;
271 	struct spdk_bdev_mgmt_channel *ch;
272 
273 	assert(cb != NULL);
274 	assert(bdev_io->u.read.iovs != NULL);
275 
276 	if (spdk_unlikely(bdev_io->u.read.iovs[0].iov_base != NULL)) {
277 		/* Buffer already present */
278 		cb(bdev_io->ch->channel, bdev_io);
279 		return;
280 	}
281 
282 	ch = spdk_io_channel_get_ctx(bdev_io->ch->mgmt_channel);
283 
284 	bdev_io->get_buf_cb = cb;
285 	if (len <= SPDK_BDEV_SMALL_BUF_MAX_SIZE) {
286 		pool = g_bdev_mgr.buf_small_pool;
287 		tailq = &ch->need_buf_small;
288 	} else {
289 		pool = g_bdev_mgr.buf_large_pool;
290 		tailq = &ch->need_buf_large;
291 	}
292 
293 	buf = spdk_mempool_get(pool);
294 
295 	if (!buf) {
296 		TAILQ_INSERT_TAIL(tailq, bdev_io, buf_link);
297 	} else {
298 		spdk_bdev_io_set_buf(bdev_io, buf);
299 	}
300 }
301 
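/*
 * Size the per-I/O driver context area to fit the largest context any
 * registered bdev module asks for, so a single bdev_io pool can serve all
 * modules.
 */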
302 static int
303 spdk_bdev_module_get_max_ctx_size(void)
304 {
305 	struct spdk_bdev_module_if *bdev_module;
306 	int max_bdev_module_size = 0;
307 
308 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
309 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
310 			max_bdev_module_size = bdev_module->get_ctx_size();
311 		}
312 	}
313 
314 	return max_bdev_module_size;
315 }
316 
317 void
318 spdk_bdev_config_text(FILE *fp)
319 {
320 	struct spdk_bdev_module_if *bdev_module;
321 
322 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
323 		if (bdev_module->config_text) {
324 			bdev_module->config_text(fp);
325 		}
326 	}
327 }
328 
329 static int
330 spdk_bdev_mgmt_channel_create(void *io_device, void *ctx_buf)
331 {
332 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
333 
334 	TAILQ_INIT(&ch->need_buf_small);
335 	TAILQ_INIT(&ch->need_buf_large);
336 
337 	return 0;
338 }
339 
340 static void
341 spdk_bdev_mgmt_channel_destroy(void *io_device, void *ctx_buf)
342 {
343 	struct spdk_bdev_mgmt_channel *ch = ctx_buf;
344 
345 	if (!TAILQ_EMPTY(&ch->need_buf_small) || !TAILQ_EMPTY(&ch->need_buf_large)) {
346 		SPDK_ERRLOG("Pending I/O list wasn't empty on channel destruction\n");
347 	}
348 }
349 
350 static void
351 spdk_bdev_init_complete(int rc)
352 {
353 	spdk_bdev_init_cb cb_fn = g_cb_fn;
354 	void *cb_arg = g_cb_arg;
355 
356 	g_bdev_mgr.init_complete = true;
357 	g_cb_fn = NULL;
358 	g_cb_arg = NULL;
359 
360 	cb_fn(cb_arg, rc);
361 }
362 
363 static void
364 spdk_bdev_module_init_complete(int rc)
365 {
366 	struct spdk_bdev_module_if *m;
367 
368 	g_bdev_mgr.module_init_complete = true;
369 
370 	if (rc != 0) {
371 		spdk_bdev_init_complete(rc);
		/* Do not fall through, or spdk_bdev_init_complete() could run twice. */
		return;
372 	}
373 
374 	/*
375 	 * Check all bdev modules for examinations in progress.  If any
376 	 * exist, return immediately since we cannot finish bdev subsystem
377 	 * initialization until all are completed.
378 	 */
379 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
380 		if (m->examine_in_progress > 0) {
381 			return;
382 		}
383 	}
384 
385 	spdk_bdev_init_complete(0);
386 }
387 
388 static int
389 spdk_bdev_modules_init(void)
390 {
391 	struct spdk_bdev_module_if *module;
392 	int rc;
393 
394 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
395 		rc = module->module_init();
396 		if (rc != 0) {
397 			return rc;
398 		}
399 	}
400 
401 	return 0;
402 }
403 
404 void
405 spdk_bdev_poller_start(struct spdk_bdev_poller **ppoller,
406 		       spdk_bdev_poller_fn fn,
407 		       void *arg,
408 		       uint32_t lcore,
409 		       uint64_t period_microseconds)
410 {
411 	g_bdev_mgr.start_poller_fn(ppoller, fn, arg, lcore, period_microseconds);
412 }
413 
414 void
415 spdk_bdev_poller_stop(struct spdk_bdev_poller **ppoller)
416 {
417 	g_bdev_mgr.stop_poller_fn(ppoller);
418 }
419 
420 void
421 spdk_bdev_initialize(spdk_bdev_init_cb cb_fn, void *cb_arg,
422 		     spdk_bdev_poller_start_cb start_poller_fn,
423 		     spdk_bdev_poller_stop_cb stop_poller_fn)
424 {
425 	int cache_size;
426 	int rc = 0;
427 	char mempool_name[32];
428 
429 	assert(cb_fn != NULL);
430 
431 	g_cb_fn = cb_fn;
432 	g_cb_arg = cb_arg;
433 
434 	g_bdev_mgr.start_poller_fn = start_poller_fn;
435 	g_bdev_mgr.stop_poller_fn = stop_poller_fn;
436 
437 	snprintf(mempool_name, sizeof(mempool_name), "bdev_io_%d", getpid());
438 
439 	g_bdev_mgr.bdev_io_pool = spdk_mempool_create(mempool_name,
440 				  SPDK_BDEV_IO_POOL_SIZE,
441 				  sizeof(struct spdk_bdev_io) +
442 				  spdk_bdev_module_get_max_ctx_size(),
443 				  64,
444 				  SPDK_ENV_SOCKET_ID_ANY);
445 
446 	if (g_bdev_mgr.bdev_io_pool == NULL) {
447 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool\n");
448 		spdk_bdev_module_init_complete(-1);
449 		return;
450 	}
451 
452 	/*
453 	 * Ensure no more than half of the total buffers end up in local caches, by
454 	 *   using spdk_env_get_core_count() to determine how many local caches we need
455 	 *   to account for.
456 	 */
457 	cache_size = BUF_SMALL_POOL_SIZE / (2 * spdk_env_get_core_count());
458 	snprintf(mempool_name, sizeof(mempool_name), "buf_small_pool_%d", getpid());
459 
460 	g_bdev_mgr.buf_small_pool = spdk_mempool_create(mempool_name,
461 				    BUF_SMALL_POOL_SIZE,
462 				    SPDK_BDEV_SMALL_BUF_MAX_SIZE + 512,
463 				    cache_size,
464 				    SPDK_ENV_SOCKET_ID_ANY);
465 	if (!g_bdev_mgr.buf_small_pool) {
466 		SPDK_ERRLOG("create buf small pool failed\n");
467 		spdk_bdev_module_init_complete(-1);
468 		return;
469 	}
470 
471 	cache_size = BUF_LARGE_POOL_SIZE / (2 * spdk_env_get_core_count());
472 	snprintf(mempool_name, sizeof(mempool_name), "buf_large_pool_%d", getpid());
473 
474 	g_bdev_mgr.buf_large_pool = spdk_mempool_create(mempool_name,
475 				    BUF_LARGE_POOL_SIZE,
476 				    SPDK_BDEV_LARGE_BUF_MAX_SIZE + 512,
477 				    cache_size,
478 				    SPDK_ENV_SOCKET_ID_ANY);
479 	if (!g_bdev_mgr.buf_large_pool) {
480 		SPDK_ERRLOG("create buf large pool failed\n");
481 		spdk_bdev_module_init_complete(-1);
482 		return;
483 	}
484 
485 #ifdef SPDK_CONFIG_VTUNE
486 	g_bdev_mgr.domain = __itt_domain_create("spdk_bdev");
487 #endif
488 
489 	spdk_io_device_register(&g_bdev_mgr, spdk_bdev_mgmt_channel_create,
490 				spdk_bdev_mgmt_channel_destroy,
491 				sizeof(struct spdk_bdev_mgmt_channel));
492 
493 	rc = spdk_bdev_modules_init();
494 	spdk_bdev_module_init_complete(rc);
495 }
496 
497 int
498 spdk_bdev_finish(void)
499 {
500 	struct spdk_bdev_module_if *bdev_module;
501 
502 	TAILQ_FOREACH(bdev_module, &g_bdev_mgr.bdev_modules, tailq) {
503 		if (bdev_module->module_fini) {
504 			bdev_module->module_fini();
505 		}
506 	}
507 
508 	if (spdk_mempool_count(g_bdev_mgr.bdev_io_pool) != SPDK_BDEV_IO_POOL_SIZE) {
509 		SPDK_ERRLOG("bdev IO pool count is %zu but should be %u\n",
510 			    spdk_mempool_count(g_bdev_mgr.bdev_io_pool),
511 			    SPDK_BDEV_IO_POOL_SIZE);
512 	}
513 
514 	if (spdk_mempool_count(g_bdev_mgr.buf_small_pool) != BUF_SMALL_POOL_SIZE) {
515 		SPDK_ERRLOG("Small buffer pool count is %zu but should be %u\n",
516 			    spdk_mempool_count(g_bdev_mgr.buf_small_pool),
517 			    BUF_SMALL_POOL_SIZE);
518 		assert(false);
519 	}
520 
521 	if (spdk_mempool_count(g_bdev_mgr.buf_large_pool) != BUF_LARGE_POOL_SIZE) {
522 		SPDK_ERRLOG("Large buffer pool count is %zu but should be %u\n",
523 			    spdk_mempool_count(g_bdev_mgr.buf_large_pool),
524 			    BUF_LARGE_POOL_SIZE);
525 		assert(false);
526 	}
527 
528 	spdk_mempool_free(g_bdev_mgr.bdev_io_pool);
529 	spdk_mempool_free(g_bdev_mgr.buf_small_pool);
530 	spdk_mempool_free(g_bdev_mgr.buf_large_pool);
531 
532 	spdk_io_device_unregister(&g_bdev_mgr, NULL);
533 
534 	return 0;
535 }
536 
537 struct spdk_bdev_io *
538 spdk_bdev_get_io(void)
539 {
540 	struct spdk_bdev_io *bdev_io;
541 
542 	bdev_io = spdk_mempool_get(g_bdev_mgr.bdev_io_pool);
543 	if (!bdev_io) {
544 		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
545 		abort();
546 	}
547 
548 	memset(bdev_io, 0, sizeof(*bdev_io));
549 
550 	return bdev_io;
551 }
552 
553 static void
554 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
555 {
556 	if (!bdev_io) {
557 		return;
558 	}
559 
560 	if (bdev_io->buf != NULL) {
561 		spdk_bdev_io_put_buf(bdev_io);
562 	}
563 
564 	spdk_mempool_put(g_bdev_mgr.bdev_io_pool, (void *)bdev_io);
565 }
566 
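/*
 * Pass an I/O to the bdev module.  io_outstanding is incremented before the
 * call, and in_submit_request is set so that a completion occurring inside
 * submit_request() can be detected and deferred (see spdk_bdev_io_complete()).
 */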
567 static void
568 __submit_request(struct spdk_bdev *bdev, struct spdk_bdev_io *bdev_io)
569 {
570 	struct spdk_io_channel *ch;
571 
572 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
573 
574 	ch = bdev_io->ch->channel;
575 
576 	bdev_io->ch->io_outstanding++;
577 	bdev_io->in_submit_request = true;
578 	bdev->fn_table->submit_request(ch, bdev_io);
579 	bdev_io->in_submit_request = false;
580 }
581 
582 static int
583 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
584 {
585 	struct spdk_bdev *bdev = bdev_io->bdev;
586 
587 	__submit_request(bdev, bdev_io);
588 	return 0;
589 }
590 
591 void
592 spdk_bdev_io_resubmit(struct spdk_bdev_io *bdev_io, struct spdk_bdev_desc *new_bdev_desc)
593 {
594 	struct spdk_bdev *new_bdev = new_bdev_desc->bdev;
595 
596 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
597 	bdev_io->bdev = new_bdev;
598 
599 	/*
600 	 * These fields are normally set during spdk_bdev_io_init(), but since bdev is
601 	 * being switched, they need to be reinitialized.
602 	 */
603 	bdev_io->gencnt = new_bdev->gencnt;
604 
605 	/*
606 	 * This bdev_io was already submitted so decrement io_outstanding to ensure it
607 	 *  does not get double-counted.
608 	 */
609 	assert(bdev_io->ch->io_outstanding > 0);
610 	bdev_io->ch->io_outstanding--;
611 	__submit_request(new_bdev, bdev_io);
612 }
613 
614 static void
615 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
616 		  struct spdk_bdev *bdev, void *cb_arg,
617 		  spdk_bdev_io_completion_cb cb)
618 {
619 	bdev_io->bdev = bdev;
620 	bdev_io->caller_ctx = cb_arg;
621 	bdev_io->cb = cb;
622 	bdev_io->gencnt = bdev->gencnt;
623 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
624 	bdev_io->in_submit_request = false;
625 }
626 
627 bool
628 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
629 {
630 	return bdev->fn_table->io_type_supported(bdev->ctxt, io_type);
631 }
632 
633 int
634 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
635 {
636 	if (bdev->fn_table->dump_config_json) {
637 		return bdev->fn_table->dump_config_json(bdev->ctxt, w);
638 	}
639 
640 	return 0;
641 }
642 
643 static int
644 spdk_bdev_channel_create(void *io_device, void *ctx_buf)
645 {
646 	struct spdk_bdev		*bdev = io_device;
647 	struct spdk_bdev_channel	*ch = ctx_buf;
648 
649 	ch->bdev = io_device;
650 	ch->channel = bdev->fn_table->get_io_channel(bdev->ctxt);
651 	ch->mgmt_channel = spdk_get_io_channel(&g_bdev_mgr);
652 	memset(&ch->stat, 0, sizeof(ch->stat));
653 	ch->io_outstanding = 0;
654 
655 #ifdef SPDK_CONFIG_VTUNE
656 	{
657 		char *name;
658 		__itt_init_ittlib(NULL, 0);
659 		name = spdk_sprintf_alloc("spdk_bdev_%s_%p", ch->bdev->name, ch);
660 		if (!name) {
661 			return -1;
662 		}
663 		ch->handle = __itt_string_handle_create(name);
664 		free(name);
665 		ch->start_tsc = spdk_get_ticks();
666 		ch->interval_tsc = spdk_get_ticks_hz() / 100;
667 	}
668 #endif
669 
670 	return 0;
671 }
672 
673 static void
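/*
 * Fail any buffer-wait queued I/O belonging to the given channel.  Used when
 * a channel is destroyed and when a reset sweeps all of a bdev's channels.
 */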
674 _spdk_bdev_abort_io(need_buf_tailq_t *queue, struct spdk_bdev_channel *ch)
675 {
676 	struct spdk_bdev_io *bdev_io, *tmp;
677 
678 	TAILQ_FOREACH_SAFE(bdev_io, queue, buf_link, tmp) {
679 		if (bdev_io->ch == ch) {
680 			TAILQ_REMOVE(queue, bdev_io, buf_link);
681 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
682 		}
683 	}
684 }
685 
686 static void
687 spdk_bdev_channel_destroy(void *io_device, void *ctx_buf)
688 {
689 	struct spdk_bdev_channel	*ch = ctx_buf;
690 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
691 
692 	mgmt_channel = spdk_io_channel_get_ctx(ch->mgmt_channel);
693 
694 	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, ch);
695 	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, ch);
696 
697 	spdk_put_io_channel(ch->channel);
698 	spdk_put_io_channel(ch->mgmt_channel);
699 	assert(ch->io_outstanding == 0);
700 }
701 
702 struct spdk_io_channel *
703 spdk_bdev_get_io_channel(struct spdk_bdev_desc *desc)
704 {
705 	return spdk_get_io_channel(desc->bdev);
706 }
707 
708 const char *
709 spdk_bdev_get_name(const struct spdk_bdev *bdev)
710 {
711 	return bdev->name;
712 }
713 
714 const char *
715 spdk_bdev_get_product_name(const struct spdk_bdev *bdev)
716 {
717 	return bdev->product_name;
718 }
719 
720 uint32_t
721 spdk_bdev_get_block_size(const struct spdk_bdev *bdev)
722 {
723 	return bdev->blocklen;
724 }
725 
726 uint64_t
727 spdk_bdev_get_num_blocks(const struct spdk_bdev *bdev)
728 {
729 	return bdev->blockcnt;
730 }
731 
732 size_t
733 spdk_bdev_get_buf_align(const struct spdk_bdev *bdev)
734 {
735 	/* TODO: push this logic down to the bdev modules */
736 	if (bdev->need_aligned_buffer) {
737 		return bdev->blocklen;
738 	}
739 
740 	return 1;
741 }
742 
743 bool
744 spdk_bdev_has_write_cache(const struct spdk_bdev *bdev)
745 {
746 	return bdev->write_cache;
747 }
748 
749 static bool
750 spdk_bdev_io_valid(struct spdk_bdev *bdev, uint64_t offset, uint64_t nbytes)
751 {
752 	/* Return failure if offset is not a multiple of bdev->blocklen */
753 	if (offset % bdev->blocklen) {
754 		return false;
755 	}
756 
757 	/* Return failure if nbytes is not a multiple of bdev->blocklen */
758 	if (nbytes % bdev->blocklen) {
759 		return false;
760 	}
761 
762 	/* Return failure if offset + nbytes is less than offset; this indicates that
763 	 * the sum has overflowed and wrapped around */
764 	if (offset + nbytes < offset) {
765 		return false;
766 	}
767 
768 	/* Return failure if offset + nbytes exceeds the size of the bdev */
769 	if (offset + nbytes > bdev->blockcnt * bdev->blocklen) {
770 		return false;
771 	}
772 
773 	return true;
774 }
775 
776 int
777 spdk_bdev_read(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
778 	       void *buf, uint64_t offset, uint64_t nbytes,
779 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
780 {
781 	struct spdk_bdev *bdev = desc->bdev;
782 	struct spdk_bdev_io *bdev_io;
783 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
784 	int rc;
785 
786 	if (!spdk_bdev_io_valid(bdev, offset, nbytes)) {
787 		return -EINVAL;
788 	}
789 
790 	bdev_io = spdk_bdev_get_io();
791 	if (!bdev_io) {
792 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during read\n");
793 		return -ENOMEM;
794 	}
795 
796 	bdev_io->ch = channel;
797 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
798 	bdev_io->u.read.iov.iov_base = buf;
799 	bdev_io->u.read.iov.iov_len = nbytes;
800 	bdev_io->u.read.iovs = &bdev_io->u.read.iov;
801 	bdev_io->u.read.iovcnt = 1;
802 	bdev_io->u.read.len = nbytes;
803 	bdev_io->u.read.offset = offset;
804 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
805 
806 	rc = spdk_bdev_io_submit(bdev_io);
807 	if (rc < 0) {
808 		spdk_bdev_put_io(bdev_io);
809 		return rc;
810 	}
811 
812 	return 0;
813 }
814 
815 int
816 spdk_bdev_readv(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
817 		struct iovec *iov, int iovcnt,
818 		uint64_t offset, uint64_t nbytes,
819 		spdk_bdev_io_completion_cb cb, void *cb_arg)
820 {
821 	struct spdk_bdev *bdev = desc->bdev;
822 	struct spdk_bdev_io *bdev_io;
823 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
824 	int rc;
825 
826 	if (!spdk_bdev_io_valid(bdev, offset, nbytes)) {
827 		return -EINVAL;
828 	}
829 
830 	bdev_io = spdk_bdev_get_io();
831 	if (!bdev_io) {
832 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed during readv\n");
833 		return -ENOMEM;
834 	}
835 
836 	bdev_io->ch = channel;
837 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
838 	bdev_io->u.read.iovs = iov;
839 	bdev_io->u.read.iovcnt = iovcnt;
840 	bdev_io->u.read.len = nbytes;
841 	bdev_io->u.read.offset = offset;
842 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
843 
844 	rc = spdk_bdev_io_submit(bdev_io);
845 	if (rc < 0) {
846 		spdk_bdev_put_io(bdev_io);
847 		return rc;
848 	}
849 
850 	return 0;
851 }
852 
853 int
854 spdk_bdev_write(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
855 		void *buf, uint64_t offset, uint64_t nbytes,
856 		spdk_bdev_io_completion_cb cb, void *cb_arg)
857 {
858 	struct spdk_bdev *bdev = desc->bdev;
859 	struct spdk_bdev_io *bdev_io;
860 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
861 	int rc;
862 
863 	if (!desc->write) {
864 		return -EBADF;
865 	}
866 
867 	if (!spdk_bdev_io_valid(bdev, offset, nbytes)) {
868 		return -EINVAL;
869 	}
870 
871 	bdev_io = spdk_bdev_get_io();
872 	if (!bdev_io) {
873 		SPDK_ERRLOG("bdev_io memory allocation failed during write\n");
874 		return -ENOMEM;
875 	}
876 
877 	bdev_io->ch = channel;
878 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
879 	bdev_io->u.write.iov.iov_base = buf;
880 	bdev_io->u.write.iov.iov_len = nbytes;
881 	bdev_io->u.write.iovs = &bdev_io->u.write.iov;
882 	bdev_io->u.write.iovcnt = 1;
883 	bdev_io->u.write.len = nbytes;
884 	bdev_io->u.write.offset = offset;
885 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
886 
887 	rc = spdk_bdev_io_submit(bdev_io);
888 	if (rc < 0) {
889 		spdk_bdev_put_io(bdev_io);
890 		return rc;
891 	}
892 
893 	return 0;
894 }
895 
896 int
897 spdk_bdev_writev(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
898 		 struct iovec *iov, int iovcnt,
899 		 uint64_t offset, uint64_t len,
900 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
901 {
902 	struct spdk_bdev *bdev = desc->bdev;
903 	struct spdk_bdev_io *bdev_io;
904 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
905 	int rc;
906 
907 	if (!desc->write) {
908 		return -EBADF;
909 	}
910 
911 	if (!spdk_bdev_io_valid(bdev, offset, len)) {
912 		return -EINVAL;
913 	}
914 
915 	bdev_io = spdk_bdev_get_io();
916 	if (!bdev_io) {
917 		SPDK_ERRLOG("bdev_io memory allocation failed during writev\n");
918 		return -ENOMEM;
919 	}
920 
921 	bdev_io->ch = channel;
922 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
923 	bdev_io->u.write.iovs = iov;
924 	bdev_io->u.write.iovcnt = iovcnt;
925 	bdev_io->u.write.len = len;
926 	bdev_io->u.write.offset = offset;
927 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
928 
929 	rc = spdk_bdev_io_submit(bdev_io);
930 	if (rc < 0) {
931 		spdk_bdev_put_io(bdev_io);
932 		return rc;
933 	}
934 
935 	return 0;
936 }
937 
938 int
939 spdk_bdev_write_zeroes(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
940 		       uint64_t offset, uint64_t len,
941 		       spdk_bdev_io_completion_cb cb, void *cb_arg)
942 {
943 	int rc;
944 	struct spdk_bdev *bdev = desc->bdev;
945 	struct spdk_bdev_io *bdev_io;
946 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
947 
	if (!desc->write) {
		return -EBADF;
	}

948 	if (!spdk_bdev_io_valid(bdev, offset, len)) {
949 		return -EINVAL;
950 	}
951 
952 	bdev_io = spdk_bdev_get_io();
953 	if (!bdev_io) {
954 		SPDK_ERRLOG("bdev_io memory allocation failed during write_zeroes\n");
955 		return -ENOMEM;
956 	}
957 
958 	bdev_io->ch = channel;
959 	bdev_io->u.write.len = len;
960 	bdev_io->u.write.offset = offset;
961 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
962 
963 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
964 
965 	rc = spdk_bdev_io_submit(bdev_io);
966 	if (rc < 0) {
967 		spdk_bdev_put_io(bdev_io);
968 		return rc;
969 	}
970 
971 	return 0;
972 }
973 
974 int
975 spdk_bdev_unmap(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
976 		uint64_t offset, uint64_t nbytes,
977 		spdk_bdev_io_completion_cb cb, void *cb_arg)
978 {
979 	struct spdk_bdev *bdev = desc->bdev;
980 	struct spdk_bdev_io *bdev_io;
981 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
982 	int rc;
983 
984 	if (!desc->write) {
985 		return -EBADF;
986 	}
987 
988 	if (!spdk_bdev_io_valid(bdev, offset, nbytes)) {
989 		return -EINVAL;
990 	}
991 
992 	if (nbytes == 0) {
993 		SPDK_ERRLOG("Can't unmap 0 bytes\n");
994 		return -EINVAL;
995 	}
996 
997 	bdev_io = spdk_bdev_get_io();
998 	if (!bdev_io) {
999 		SPDK_ERRLOG("bdev_io memory allocation failed during unmap\n");
1000 		return -ENOMEM;
1001 	}
1002 
1003 	bdev_io->ch = channel;
1004 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
1005 	bdev_io->u.unmap.offset = offset;
1006 	bdev_io->u.unmap.len = nbytes;
1007 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1008 
1009 	rc = spdk_bdev_io_submit(bdev_io);
1010 	if (rc < 0) {
1011 		spdk_bdev_put_io(bdev_io);
1012 		return rc;
1013 	}
1014 
1015 	return 0;
1016 }
1017 
1018 int
1019 spdk_bdev_flush(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1020 		uint64_t offset, uint64_t length,
1021 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1022 {
1023 	struct spdk_bdev *bdev = desc->bdev;
1024 	struct spdk_bdev_io *bdev_io;
1025 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1026 	int rc;
1027 
1028 	if (!desc->write) {
1029 		return -EBADF;
1030 	}
1031 
1032 	bdev_io = spdk_bdev_get_io();
1033 	if (!bdev_io) {
1034 		SPDK_ERRLOG("bdev_io memory allocation failed during flush\n");
1035 		return -ENOMEM;
1036 	}
1037 
1038 	bdev_io->ch = channel;
1039 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
1040 	bdev_io->u.flush.offset = offset;
1041 	bdev_io->u.flush.length = length;
1042 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1043 
1044 	rc = spdk_bdev_io_submit(bdev_io);
1045 	if (rc < 0) {
1046 		spdk_bdev_put_io(bdev_io);
1047 		return rc;
1048 	}
1049 
1050 	return 0;
1051 }
1052 
1053 static void
1054 _spdk_bdev_reset_dev(void *io_device, void *ctx)
1055 {
1056 	struct spdk_bdev_io *bdev_io = ctx;
1057 	int rc;
1058 
1059 	rc = spdk_bdev_io_submit(bdev_io);
1060 	if (rc < 0) {
1061 		spdk_bdev_put_io(bdev_io);
1062 		SPDK_ERRLOG("reset failed\n");
1063 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1064 	}
1065 }
1066 
1067 static void
1068 _spdk_bdev_reset_abort_channel(void *io_device, struct spdk_io_channel *ch,
1069 			       void *ctx)
1070 {
1071 	struct spdk_bdev_channel	*channel;
1072 	struct spdk_bdev_mgmt_channel	*mgmt_channel;
1073 
1074 	channel = spdk_io_channel_get_ctx(ch);
1075 	mgmt_channel = spdk_io_channel_get_ctx(channel->mgmt_channel);
1076 
1077 	_spdk_bdev_abort_io(&mgmt_channel->need_buf_small, channel);
1078 	_spdk_bdev_abort_io(&mgmt_channel->need_buf_large, channel);
1079 }
1080 
1081 static void
1082 _spdk_bdev_start_reset(void *ctx)
1083 {
1084 	struct spdk_bdev_io *bdev_io = ctx;
1085 
1086 	spdk_for_each_channel(bdev_io->bdev, _spdk_bdev_reset_abort_channel,
1087 			      bdev_io, _spdk_bdev_reset_dev);
1088 }
1089 
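/*
 * Resets are serialized per bdev.  If no reset is in progress, dequeue the
 * next queued reset and start it on the thread that owns the submitting
 * channel.
 */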
1090 static void
1091 _spdk_bdev_start_next_reset(struct spdk_bdev *bdev)
1092 {
1093 	struct spdk_bdev_io *bdev_io;
1094 	struct spdk_thread *thread;
1095 
1096 	pthread_mutex_lock(&bdev->mutex);
1097 
1098 	if (bdev->reset_in_progress || TAILQ_EMPTY(&bdev->queued_resets)) {
1099 		pthread_mutex_unlock(&bdev->mutex);
1100 		return;
1101 	} else {
1102 		bdev_io = TAILQ_FIRST(&bdev->queued_resets);
1103 		TAILQ_REMOVE(&bdev->queued_resets, bdev_io, link);
1104 		bdev->reset_in_progress = true;
1105 		thread = spdk_io_channel_get_thread(bdev_io->ch->channel);
1106 		spdk_thread_send_msg(thread, _spdk_bdev_start_reset, bdev_io);
1107 	}
1108 
1109 	pthread_mutex_unlock(&bdev->mutex);
1110 }
1111 
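/*
 * Queue a reset for the bdev; it will run once any earlier reset finishes.
 */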
1112 int
1113 spdk_bdev_reset(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1114 		spdk_bdev_io_completion_cb cb, void *cb_arg)
1115 {
1116 	struct spdk_bdev *bdev = desc->bdev;
1117 	struct spdk_bdev_io *bdev_io;
1118 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1119 
1120 	bdev_io = spdk_bdev_get_io();
1121 	if (!bdev_io) {
1122 		SPDK_ERRLOG("bdev_io memory allocation failed during reset\n");
1123 		return -ENOMEM;
1124 	}
1125 
1126 	bdev_io->ch = channel;
1127 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
1128 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1129 
1130 	pthread_mutex_lock(&bdev->mutex);
1131 	TAILQ_INSERT_TAIL(&bdev->queued_resets, bdev_io, link);
1132 	pthread_mutex_unlock(&bdev->mutex);
1133 
1134 	_spdk_bdev_start_next_reset(bdev);
1135 
1136 	return 0;
1137 }
1138 
1139 void
1140 spdk_bdev_get_io_stat(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
1141 		      struct spdk_bdev_io_stat *stat)
1142 {
1143 #ifdef SPDK_CONFIG_VTUNE
1144 	SPDK_ERRLOG("Calling spdk_bdev_get_io_stat is not allowed when VTune integration is enabled.\n");
1145 	memset(stat, 0, sizeof(*stat));
1146 	return;
1147 #endif
1148 
1149 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1150 
1151 	*stat = channel->stat;
1152 	memset(&channel->stat, 0, sizeof(channel->stat));
1153 }
1154 
1155 int
1156 spdk_bdev_nvme_admin_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1157 			      const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1158 			      spdk_bdev_io_completion_cb cb, void *cb_arg)
1159 {
1160 	struct spdk_bdev *bdev = desc->bdev;
1161 	struct spdk_bdev_io *bdev_io;
1162 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1163 	int rc;
1164 
1165 	if (!desc->write) {
1166 		return -EBADF;
1167 	}
1168 
1169 	bdev_io = spdk_bdev_get_io();
1170 	if (!bdev_io) {
1171 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_admin_passthru\n");
1172 		return -ENOMEM;
1173 	}
1174 
1175 	bdev_io->ch = channel;
1176 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_ADMIN;
1177 	bdev_io->u.nvme_passthru.cmd = *cmd;
1178 	bdev_io->u.nvme_passthru.buf = buf;
1179 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1180 
1181 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1182 
1183 	rc = spdk_bdev_io_submit(bdev_io);
1184 	if (rc < 0) {
1185 		spdk_bdev_put_io(bdev_io);
1186 		return rc;
1187 	}
1188 
1189 	return 0;
1190 }
1191 
1192 int
1193 spdk_bdev_nvme_io_passthru(struct spdk_bdev_desc *desc, struct spdk_io_channel *ch,
1194 			   const struct spdk_nvme_cmd *cmd, void *buf, size_t nbytes,
1195 			   spdk_bdev_io_completion_cb cb, void *cb_arg)
1196 {
1197 	struct spdk_bdev *bdev = desc->bdev;
1198 	struct spdk_bdev_io *bdev_io;
1199 	struct spdk_bdev_channel *channel = spdk_io_channel_get_ctx(ch);
1200 	int rc;
1201 
1202 	if (!desc->write) {
1203 		/*
1204 		 * Do not try to parse the NVMe command - we could maybe use bits in the opcode
1205 		 *  to easily determine if the command is a read or write, but for now just
1206 		 *  do not allow io_passthru with a read-only descriptor.
1207 		 */
1208 		return -EBADF;
1209 	}
1210 
1211 	bdev_io = spdk_bdev_get_io();
1212 	if (!bdev_io) {
1213 		SPDK_ERRLOG("bdev_io memory allocation failed during nvme_io_passthru\n");
1214 		return -ENOMEM;
1215 	}
1216 
1217 	bdev_io->ch = channel;
1218 	bdev_io->type = SPDK_BDEV_IO_TYPE_NVME_IO;
1219 	bdev_io->u.nvme_passthru.cmd = *cmd;
1220 	bdev_io->u.nvme_passthru.buf = buf;
1221 	bdev_io->u.nvme_passthru.nbytes = nbytes;
1222 
1223 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
1224 
1225 	rc = spdk_bdev_io_submit(bdev_io);
1226 	if (rc < 0) {
1227 		spdk_bdev_put_io(bdev_io);
1228 		return rc;
1229 	}
1230 
1231 	return 0;
1232 }
1233 
1234 int
1235 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
1236 {
1237 	if (!bdev_io) {
1238 		SPDK_ERRLOG("bdev_io is NULL\n");
1239 		return -1;
1240 	}
1241 
1242 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
1243 		SPDK_ERRLOG("bdev_io is in pending state\n");
1244 		assert(false);
1245 		return -1;
1246 	}
1247 
1248 	spdk_bdev_put_io(bdev_io);
1249 
1250 	return 0;
1251 }
1252 
1253 static void
1254 _spdk_bdev_io_complete(void *ctx)
1255 {
1256 	struct spdk_bdev_io *bdev_io = ctx;
1257 
1258 	assert(bdev_io->cb != NULL);
1259 	bdev_io->cb(bdev_io, bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS, bdev_io->caller_ctx);
1260 }
1261 
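/*
 * Complete an I/O back to its submitter.  A successful reset bumps the bdev
 * generation count, and any reset completion kicks off the next queued
 * reset; other I/O whose generation predates the most recent reset is freed
 * without invoking the callback (see the gencnt check below).
 */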
1262 void
1263 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
1264 {
1265 	bdev_io->status = status;
1266 
1267 	assert(bdev_io->ch->io_outstanding > 0);
1268 	bdev_io->ch->io_outstanding--;
1269 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
1270 		/* Successful reset */
1271 		if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1272 			/* Increase the bdev generation */
1273 			bdev_io->bdev->gencnt++;
1274 		}
1275 		bdev_io->bdev->reset_in_progress = false;
1276 		_spdk_bdev_start_next_reset(bdev_io->bdev);
1277 	} else {
1278 		/*
1279 		 * Check the gencnt, to see if this I/O was issued before the most
1280 		 * recent reset. If the gencnt is not equal, then just free the I/O
1281 		 * without calling the callback, since the caller will have already
1282 		 * freed its context for this I/O.
1283 		 */
1284 		if (bdev_io->bdev->gencnt != bdev_io->gencnt) {
1285 			spdk_bdev_put_io(bdev_io);
1286 			return;
1287 		}
1288 	}
1289 
1290 	if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1291 		switch (bdev_io->type) {
1292 		case SPDK_BDEV_IO_TYPE_READ:
1293 			bdev_io->ch->stat.bytes_read += bdev_io->u.read.len;
1294 			bdev_io->ch->stat.num_read_ops++;
1295 			break;
1296 		case SPDK_BDEV_IO_TYPE_WRITE:
1297 			bdev_io->ch->stat.bytes_written += bdev_io->u.write.len;
1298 			bdev_io->ch->stat.num_write_ops++;
1299 			break;
1300 		default:
1301 			break;
1302 		}
1303 	}
1304 
1305 #ifdef SPDK_CONFIG_VTUNE
1306 	uint64_t now_tsc = spdk_get_ticks();
1307 	if (now_tsc > (bdev_io->ch->start_tsc + bdev_io->ch->interval_tsc)) {
1308 		uint64_t data[5];
1309 
1310 		data[0] = bdev_io->ch->stat.num_read_ops;
1311 		data[1] = bdev_io->ch->stat.bytes_read;
1312 		data[2] = bdev_io->ch->stat.num_write_ops;
1313 		data[3] = bdev_io->ch->stat.bytes_written;
1314 		data[4] = bdev_io->bdev->fn_table->get_spin_time ?
1315 			  bdev_io->bdev->fn_table->get_spin_time(bdev_io->ch->channel) : 0;
1316 
1317 		__itt_metadata_add(g_bdev_mgr.domain, __itt_null, bdev_io->ch->handle,
1318 				   __itt_metadata_u64, 5, data);
1319 
1320 		memset(&bdev_io->ch->stat, 0, sizeof(bdev_io->ch->stat));
1321 		bdev_io->ch->start_tsc = now_tsc;
1322 	}
1323 #endif
1324 
1325 	if (bdev_io->in_submit_request || bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
1326 		/*
1327 		 * Defer completion to avoid potential infinite recursion if the
1328 		 * user's completion callback issues a new I/O.
1329 		 */
1330 		spdk_thread_send_msg(spdk_io_channel_get_thread(bdev_io->ch->channel),
1331 				     _spdk_bdev_io_complete, bdev_io);
1332 	} else {
1333 		_spdk_bdev_io_complete(bdev_io);
1334 	}
1335 }
1336 
1337 void
1338 spdk_bdev_io_complete_scsi_status(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
1339 				  enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
1340 {
1341 	if (sc == SPDK_SCSI_STATUS_GOOD) {
1342 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1343 	} else {
1344 		bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
1345 		bdev_io->error.scsi.sc = sc;
1346 		bdev_io->error.scsi.sk = sk;
1347 		bdev_io->error.scsi.asc = asc;
1348 		bdev_io->error.scsi.ascq = ascq;
1349 	}
1350 
1351 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1352 }
1353 
1354 void
1355 spdk_bdev_io_get_scsi_status(const struct spdk_bdev_io *bdev_io,
1356 			     int *sc, int *sk, int *asc, int *ascq)
1357 {
1358 	assert(sc != NULL);
1359 	assert(sk != NULL);
1360 	assert(asc != NULL);
1361 	assert(ascq != NULL);
1362 
1363 	switch (bdev_io->status) {
1364 	case SPDK_BDEV_IO_STATUS_SUCCESS:
1365 		*sc = SPDK_SCSI_STATUS_GOOD;
1366 		*sk = SPDK_SCSI_SENSE_NO_SENSE;
1367 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1368 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1369 		break;
1370 	case SPDK_BDEV_IO_STATUS_NVME_ERROR:
1371 		spdk_scsi_nvme_translate(bdev_io, sc, sk, asc, ascq);
1372 		break;
1373 	case SPDK_BDEV_IO_STATUS_SCSI_ERROR:
1374 		*sc = bdev_io->error.scsi.sc;
1375 		*sk = bdev_io->error.scsi.sk;
1376 		*asc = bdev_io->error.scsi.asc;
1377 		*ascq = bdev_io->error.scsi.ascq;
1378 		break;
1379 	default:
1380 		*sc = SPDK_SCSI_STATUS_CHECK_CONDITION;
1381 		*sk = SPDK_SCSI_SENSE_ABORTED_COMMAND;
1382 		*asc = SPDK_SCSI_ASC_NO_ADDITIONAL_SENSE;
1383 		*ascq = SPDK_SCSI_ASCQ_CAUSE_NOT_REPORTABLE;
1384 		break;
1385 	}
1386 }
1387 
1388 void
1389 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
1390 {
1391 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
1392 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
1393 	} else {
1394 		bdev_io->error.nvme.sct = sct;
1395 		bdev_io->error.nvme.sc = sc;
1396 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
1397 	}
1398 
1399 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
1400 }
1401 
1402 void
1403 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
1404 {
1405 	assert(sct != NULL);
1406 	assert(sc != NULL);
1407 
1408 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
1409 		*sct = bdev_io->error.nvme.sct;
1410 		*sc = bdev_io->error.nvme.sc;
1411 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
1412 		*sct = SPDK_NVME_SCT_GENERIC;
1413 		*sc = SPDK_NVME_SC_SUCCESS;
1414 	} else {
1415 		*sct = SPDK_NVME_SCT_GENERIC;
1416 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
1417 	}
1418 }
1419 
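/*
 * Common registration path for bdevs and vbdevs: initialize bookkeeping,
 * register the bdev as an I/O device, add it to the global list, and give
 * every module with an examine() callback a chance to claim or build on it.
 */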
1420 static void
1421 _spdk_bdev_register(struct spdk_bdev *bdev)
1422 {
1423 	struct spdk_bdev_module_if *module;
1424 
1425 	assert(bdev->module != NULL);
1426 
1427 	bdev->status = SPDK_BDEV_STATUS_READY;
1428 
1429 	/* initialize the reset generation value to zero */
1430 	bdev->gencnt = 0;
1431 	TAILQ_INIT(&bdev->open_descs);
1432 	bdev->bdev_opened_for_write = false;
1433 
1434 	TAILQ_INIT(&bdev->vbdevs);
1435 	TAILQ_INIT(&bdev->base_bdevs);
1436 
1437 	bdev->reset_in_progress = false;
1438 	TAILQ_INIT(&bdev->queued_resets);
1439 
1440 	spdk_io_device_register(bdev, spdk_bdev_channel_create, spdk_bdev_channel_destroy,
1441 				sizeof(struct spdk_bdev_channel));
1442 
1443 	pthread_mutex_init(&bdev->mutex, NULL);
1444 	SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Inserting bdev %s into list\n", bdev->name);
1445 	TAILQ_INSERT_TAIL(&g_bdev_mgr.bdevs, bdev, link);
1446 
1447 	TAILQ_FOREACH(module, &g_bdev_mgr.bdev_modules, tailq) {
1448 		if (module->examine) {
1449 			module->examine_in_progress++;
1450 			module->examine(bdev);
1451 		}
1452 	}
1453 }
1454 
1455 void
1456 spdk_bdev_register(struct spdk_bdev *bdev)
1457 {
1458 	_spdk_bdev_register(bdev);
1459 }
1460 
1461 void
1462 spdk_vbdev_register(struct spdk_bdev *vbdev, struct spdk_bdev **base_bdevs, int base_bdev_count)
1463 {
1464 	int i;
1465 
1466 	_spdk_bdev_register(vbdev);
1467 	for (i = 0; i < base_bdev_count; i++) {
1468 		assert(base_bdevs[i] != NULL);
1469 		TAILQ_INSERT_TAIL(&vbdev->base_bdevs, base_bdevs[i], base_bdev_link);
1470 		TAILQ_INSERT_TAIL(&base_bdevs[i]->vbdevs, vbdev, vbdev_link);
1471 	}
1472 }
1473 
1474 void
1475 spdk_bdev_unregister(struct spdk_bdev *bdev)
1476 {
1477 	struct spdk_bdev_desc	*desc, *tmp;
1478 	int			rc;
1479 	bool			do_destruct = true;
1480 
1481 	SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Removing bdev %s from list\n", bdev->name);
1482 
1483 	pthread_mutex_lock(&bdev->mutex);
1484 
1485 	bdev->status = SPDK_BDEV_STATUS_REMOVING;
1486 
1487 	TAILQ_FOREACH_SAFE(desc, &bdev->open_descs, link, tmp) {
1488 		if (desc->remove_cb) {
1489 			pthread_mutex_unlock(&bdev->mutex);
1490 			do_destruct = false;
1491 			desc->remove_cb(desc->remove_ctx);
1492 			pthread_mutex_lock(&bdev->mutex);
1493 		}
1494 	}
1495 
1496 	if (!do_destruct) {
1497 		pthread_mutex_unlock(&bdev->mutex);
1498 		return;
1499 	}
1500 
1501 	TAILQ_REMOVE(&g_bdev_mgr.bdevs, bdev, link);
1502 	pthread_mutex_unlock(&bdev->mutex);
1503 
1504 	pthread_mutex_destroy(&bdev->mutex);
1505 
1506 	spdk_io_device_unregister(bdev, NULL);
1507 
1508 	rc = bdev->fn_table->destruct(bdev->ctxt);
1509 	if (rc < 0) {
1510 		SPDK_ERRLOG("destruct failed\n");
1511 	}
1512 }
1513 
1514 void
1515 spdk_vbdev_unregister(struct spdk_bdev *vbdev)
1516 {
1517 	struct spdk_bdev *base_bdev;
1518 
1519 	assert(!TAILQ_EMPTY(&vbdev->base_bdevs));
1520 	TAILQ_FOREACH(base_bdev, &vbdev->base_bdevs, base_bdev_link) {
1521 		TAILQ_REMOVE(&base_bdev->vbdevs, vbdev, vbdev_link);
1522 	}
1523 	spdk_bdev_unregister(vbdev);
1524 }
1525 
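/*
 * Called by a bdev module once an examine() it started has finished.  When
 * no examinations remain anywhere and module init is done, subsystem
 * initialization completes.
 */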
1526 void
1527 spdk_bdev_module_examine_done(struct spdk_bdev_module_if *module)
1528 {
1529 	struct spdk_bdev_module_if *m;
1530 
1531 	assert(module->examine_in_progress > 0);
1532 	module->examine_in_progress--;
1533 
1534 	/*
1535 	 * Check all bdev modules for examinations in progress.  If any
1536 	 * exist, return immediately since we cannot finish bdev subsystem
1537 	 * initialization until all are completed.
1538 	 */
1539 	TAILQ_FOREACH(m, &g_bdev_mgr.bdev_modules, tailq) {
1540 		if (m->examine_in_progress > 0) {
1541 			return;
1542 		}
1543 	}
1544 
1545 	if (g_bdev_mgr.module_init_complete && !g_bdev_mgr.init_complete) {
1546 		/*
1547 		 * Modules already finished initialization - now that all
1548 		 * the bdev modules have finished their asynchronous I/O
1549 		 * processing, the entire bdev layer can be marked as complete.
1550 		 */
1551 		spdk_bdev_init_complete(0);
1552 	}
1553 }
1554 
1555 int
1556 spdk_bdev_open(struct spdk_bdev *bdev, bool write, spdk_bdev_remove_cb_t remove_cb,
1557 	       void *remove_ctx, struct spdk_bdev_desc **_desc)
1558 {
1559 	struct spdk_bdev_desc *desc;
1560 
1561 	desc = calloc(1, sizeof(*desc));
1562 	if (desc == NULL) {
1563 		return -ENOMEM;
1564 	}
1565 
1566 	pthread_mutex_lock(&bdev->mutex);
1567 
1568 	if (write && (bdev->bdev_opened_for_write || bdev->claim_module)) {
1569 		SPDK_ERRLOG("failed, %s already opened for write or claimed\n", bdev->name);
1570 		free(desc);
1571 		pthread_mutex_unlock(&bdev->mutex);
1572 		return -EPERM;
1573 	}
1574 
1575 	TAILQ_INSERT_TAIL(&bdev->open_descs, desc, link);
1576 
1577 	if (write) {
1578 		bdev->bdev_opened_for_write = true;
1579 	}
1580 
1581 	desc->bdev = bdev;
1582 	desc->remove_cb = remove_cb;
1583 	desc->remove_ctx = remove_ctx;
1584 	desc->write = write;
1585 	*_desc = desc;
1586 
1587 	pthread_mutex_unlock(&bdev->mutex);
1588 
1589 	return 0;
1590 }
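
/*
 * Illustrative usage of the descriptor API (a sketch, not part of this
 * file's code; the my_* names are hypothetical, and my_buf must satisfy
 * spdk_bdev_get_buf_align() and the block-size alignment rules above):
 *
 *	struct spdk_bdev_desc *desc;
 *	struct spdk_io_channel *ch;
 *
 *	if (spdk_bdev_open(my_bdev, false, NULL, NULL, &desc) == 0) {
 *		ch = spdk_bdev_get_io_channel(desc);
 *		spdk_bdev_read(desc, ch, my_buf, 0, 4096, my_read_done, my_ctx);
 *		... wait for my_read_done() to be invoked ...
 *		spdk_put_io_channel(ch);
 *		spdk_bdev_close(desc);
 *	}
 */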
1591 
1592 void
1593 spdk_bdev_close(struct spdk_bdev_desc *desc)
1594 {
1595 	struct spdk_bdev *bdev = desc->bdev;
1596 	bool do_unregister = false;
1597 
1598 	pthread_mutex_lock(&bdev->mutex);
1599 
1600 	if (desc->write) {
1601 		assert(bdev->bdev_opened_for_write);
1602 		bdev->bdev_opened_for_write = false;
1603 	}
1604 
1605 	TAILQ_REMOVE(&bdev->open_descs, desc, link);
1606 	free(desc);
1607 
1608 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING && TAILQ_EMPTY(&bdev->open_descs)) {
1609 		do_unregister = true;
1610 	}
1611 	pthread_mutex_unlock(&bdev->mutex);
1612 
1613 	if (do_unregister == true) {
1614 		spdk_bdev_unregister(bdev);
1615 	}
1616 }
1617 
1618 int
1619 spdk_bdev_module_claim_bdev(struct spdk_bdev *bdev, struct spdk_bdev_desc *desc,
1620 			    struct spdk_bdev_module_if *module)
1621 {
1622 	if (bdev->claim_module != NULL) {
1623 		SPDK_ERRLOG("bdev %s already claimed by module %s\n", bdev->name,
1624 			    bdev->claim_module->name);
1625 		return -EPERM;
1626 	}
1627 
1628 	if ((!desc || !desc->write) && bdev->bdev_opened_for_write) {
1629 		SPDK_ERRLOG("bdev %s already opened with write access\n", bdev->name);
1630 		return -EPERM;
1631 	}
1632 
1633 	if (desc && !desc->write) {
1634 		bdev->bdev_opened_for_write = true;
1635 		desc->write = true;
1636 	}
1637 
1638 	bdev->claim_module = module;
1639 	return 0;
1640 }
1641 
1642 void
1643 spdk_bdev_module_release_bdev(struct spdk_bdev *bdev)
1644 {
1645 	assert(bdev->claim_module != NULL);
1646 	bdev->claim_module = NULL;
1647 }
1648 
1649 struct spdk_bdev *
1650 spdk_bdev_desc_get_bdev(struct spdk_bdev_desc *desc)
1651 {
1652 	return desc->bdev;
1653 }
1654 
1655 void
1656 spdk_bdev_io_get_iovec(struct spdk_bdev_io *bdev_io, struct iovec **iovp, int *iovcntp)
1657 {
1658 	struct iovec *iovs;
1659 	int iovcnt;
1660 
1661 	if (bdev_io == NULL) {
1662 		return;
1663 	}
1664 
1665 	switch (bdev_io->type) {
1666 	case SPDK_BDEV_IO_TYPE_READ:
1667 		iovs = bdev_io->u.read.iovs;
1668 		iovcnt = bdev_io->u.read.iovcnt;
1669 		break;
1670 	case SPDK_BDEV_IO_TYPE_WRITE:
1671 		iovs = bdev_io->u.write.iovs;
1672 		iovcnt = bdev_io->u.write.iovcnt;
1673 		break;
1674 	default:
1675 		iovs = NULL;
1676 		iovcnt = 0;
1677 		break;
1678 	}
1679 
1680 	if (iovp) {
1681 		*iovp = iovs;
1682 	}
1683 	if (iovcntp) {
1684 		*iovcntp = iovcnt;
1685 	}
1686 }
1687 
1688 void
1689 spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
1690 {
1691 	/*
1692 	 * Modules with examine callbacks must be initialized first, so they are
1693 	 *  ready to handle examine callbacks from later modules that will
1694 	 *  register physical bdevs.
1695 	 */
1696 	if (bdev_module->examine != NULL) {
1697 		TAILQ_INSERT_HEAD(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1698 	} else {
1699 		TAILQ_INSERT_TAIL(&g_bdev_mgr.bdev_modules, bdev_module, tailq);
1700 	}
1701 }
1702