xref: /spdk/lib/bdev/bdev.c (revision edbca2a67610cf6ebb9755c839e307d1d18375da)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (C) 2008-2012 Daisuke Aoyama <aoyama@peach.ne.jp>.
5  *   Copyright (c) Intel Corporation.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/bdev.h"
36 
37 #include <stdlib.h>
38 #include <stdio.h>
39 #include <string.h>
40 #include <unistd.h>
41 
42 #include <rte_config.h>
43 #include <rte_mempool.h>
44 #include <rte_version.h>
45 
46 #include "spdk/queue.h"
47 #include "spdk/nvme_spec.h"
48 
49 #include "spdk_internal/bdev.h"
50 #include "spdk_internal/event.h"
51 #include "spdk_internal/log.h"
52 
53 #define SPDK_BDEV_IO_POOL_SIZE	(64 * 1024)
54 #define RBUF_SMALL_POOL_SIZE	8192
55 #define RBUF_LARGE_POOL_SIZE	1024
56 
57 static struct rte_mempool *spdk_bdev_g_io_pool = NULL;
58 static struct rte_mempool *g_rbuf_small_pool = NULL;
59 static struct rte_mempool *g_rbuf_large_pool = NULL;
60 
61 typedef TAILQ_HEAD(, spdk_bdev_io) need_rbuf_tailq_t;
62 static need_rbuf_tailq_t g_need_rbuf_small[RTE_MAX_LCORE];
63 static need_rbuf_tailq_t g_need_rbuf_large[RTE_MAX_LCORE];
64 
65 static TAILQ_HEAD(, spdk_bdev_module_if) spdk_bdev_module_list =
66 	TAILQ_HEAD_INITIALIZER(spdk_bdev_module_list);
67 static TAILQ_HEAD(, spdk_bdev_module_if) spdk_vbdev_module_list =
68 	TAILQ_HEAD_INITIALIZER(spdk_vbdev_module_list);
69 
70 static TAILQ_HEAD(, spdk_bdev) spdk_bdev_list =
71 	TAILQ_HEAD_INITIALIZER(spdk_bdev_list);
72 
73 struct spdk_bdev *spdk_bdev_first(void)
74 {
75 	struct spdk_bdev *bdev;
76 
77 	bdev = TAILQ_FIRST(&spdk_bdev_list);
78 	if (bdev) {
79 		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Starting bdev iteration at %s\n", bdev->name);
80 	}
81 
82 	return bdev;
83 }
84 
85 struct spdk_bdev *spdk_bdev_next(struct spdk_bdev *prev)
86 {
87 	struct spdk_bdev *bdev;
88 
89 	bdev = TAILQ_NEXT(prev, link);
90 	if (bdev) {
91 		SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Continuing bdev iteration at %s\n", bdev->name);
92 	}
93 
94 	return bdev;
95 }
96 
97 struct spdk_bdev *spdk_bdev_get_by_name(const char *bdev_name)
98 {
99 	struct spdk_bdev *bdev = spdk_bdev_first();
100 
101 	while (bdev != NULL) {
102 		if (strncmp(bdev_name, bdev->name, sizeof(bdev->name)) == 0) {
103 			return bdev;
104 		}
105 		bdev = spdk_bdev_next(bdev);
106 	}
107 
108 	return NULL;
109 }
110 
111 static void
112 spdk_bdev_io_set_rbuf(struct spdk_bdev_io *bdev_io, void *buf)
113 {
114 	assert(bdev_io->get_rbuf_cb != NULL);
115 	assert(buf != NULL);
116 	assert(bdev_io->u.read.iovs != NULL);
117 
118 	bdev_io->u.read.buf_unaligned = buf;
119 	bdev_io->u.read.iovs[0].iov_base = (void *)((unsigned long)((char *)buf + 512) & ~511UL);
120 	bdev_io->u.read.iovs[0].iov_len = bdev_io->u.read.len;
121 	bdev_io->u.read.put_rbuf = true;
122 	bdev_io->get_rbuf_cb(bdev_io);
123 }
124 
125 static void
126 spdk_bdev_io_put_rbuf(struct spdk_bdev_io *bdev_io)
127 {
128 	struct rte_mempool *pool;
129 	struct spdk_bdev_io *tmp;
130 	void *buf;
131 	need_rbuf_tailq_t *tailq;
132 	uint64_t length;
133 
134 	assert(bdev_io->u.read.iovcnt == 1);
135 
136 	length = bdev_io->u.read.len;
137 	buf = bdev_io->u.read.buf_unaligned;
138 
139 	if (length <= SPDK_BDEV_SMALL_RBUF_MAX_SIZE) {
140 		pool = g_rbuf_small_pool;
141 		tailq = &g_need_rbuf_small[rte_lcore_id()];
142 	} else {
143 		pool = g_rbuf_large_pool;
144 		tailq = &g_need_rbuf_large[rte_lcore_id()];
145 	}
146 
147 	if (TAILQ_EMPTY(tailq)) {
148 		rte_mempool_put(pool, buf);
149 	} else {
150 		tmp = TAILQ_FIRST(tailq);
151 		TAILQ_REMOVE(tailq, tmp, rbuf_link);
152 		spdk_bdev_io_set_rbuf(tmp, buf);
153 	}
154 }
155 
156 static int spdk_initialize_rbuf_pool(void)
157 {
158 	int cache_size;
159 
160 	/**
161 	 * Ensure no more than half of the total buffers end up local caches, by
162 	 *   using spdk_event_get_active_core_count() to determine how many local caches we need
163 	 *   to account for.
164 	 */
165 	cache_size = RBUF_SMALL_POOL_SIZE / (2 * spdk_app_get_core_count());
166 	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE)
167 		cache_size = RTE_MEMPOOL_CACHE_MAX_SIZE;
168 	g_rbuf_small_pool = rte_mempool_create("rbuf_small_pool",
169 					       RBUF_SMALL_POOL_SIZE,
170 					       SPDK_BDEV_SMALL_RBUF_MAX_SIZE + 512,
171 					       cache_size, 0, NULL, NULL, NULL, NULL,
172 					       SOCKET_ID_ANY, 0);
173 	if (!g_rbuf_small_pool) {
174 		SPDK_ERRLOG("create rbuf small pool failed\n");
175 		return -1;
176 	}
177 
178 	cache_size = RBUF_LARGE_POOL_SIZE / (2 * spdk_app_get_core_count());
179 	if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE)
180 		cache_size = RTE_MEMPOOL_CACHE_MAX_SIZE;
181 	g_rbuf_large_pool = rte_mempool_create("rbuf_large_pool",
182 					       RBUF_LARGE_POOL_SIZE,
183 					       SPDK_BDEV_LARGE_RBUF_MAX_SIZE + 512,
184 					       cache_size, 0, NULL, NULL, NULL, NULL,
185 					       SOCKET_ID_ANY, 0);
186 	if (!g_rbuf_large_pool) {
187 		SPDK_ERRLOG("create rbuf large pool failed\n");
188 		return -1;
189 	}
190 
191 	return 0;
192 }
193 
194 static int
195 spdk_bdev_module_get_max_ctx_size(void)
196 {
197 	struct spdk_bdev_module_if *bdev_module;
198 	int max_bdev_module_size = 0;
199 
200 	TAILQ_FOREACH(bdev_module, &spdk_bdev_module_list, tailq) {
201 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
202 			max_bdev_module_size = bdev_module->get_ctx_size();
203 		}
204 	}
205 
206 	TAILQ_FOREACH(bdev_module, &spdk_vbdev_module_list, tailq) {
207 		if (bdev_module->get_ctx_size && bdev_module->get_ctx_size() > max_bdev_module_size) {
208 			max_bdev_module_size = bdev_module->get_ctx_size();
209 		}
210 	}
211 
212 	return max_bdev_module_size;
213 }
214 
215 static int
216 spdk_bdev_module_initialize(void)
217 {
218 	struct spdk_bdev_module_if *bdev_module;
219 	int rc = 0;
220 
221 	TAILQ_FOREACH(bdev_module, &spdk_bdev_module_list, tailq) {
222 		rc = bdev_module->module_init();
223 		if (rc)
224 			return rc;
225 	}
226 	TAILQ_FOREACH(bdev_module, &spdk_vbdev_module_list, tailq) {
227 		rc = bdev_module->module_init();
228 		if (rc)
229 			return rc;
230 	}
231 	return rc;
232 }
233 
234 static void
235 spdk_bdev_module_finish(void)
236 {
237 	struct spdk_bdev_module_if *bdev_module;
238 
239 	TAILQ_FOREACH(bdev_module, &spdk_vbdev_module_list, tailq) {
240 		if (bdev_module->module_fini) {
241 			bdev_module->module_fini();
242 		}
243 	}
244 
245 	TAILQ_FOREACH(bdev_module, &spdk_bdev_module_list, tailq) {
246 		if (bdev_module->module_fini) {
247 			bdev_module->module_fini();
248 		}
249 	}
250 }
251 
252 static void
253 spdk_bdev_config_text(FILE *fp)
254 {
255 	struct spdk_bdev_module_if *bdev_module;
256 
257 	TAILQ_FOREACH(bdev_module, &spdk_bdev_module_list, tailq) {
258 		if (bdev_module->config_text) {
259 			bdev_module->config_text(fp);
260 		}
261 	}
262 	TAILQ_FOREACH(bdev_module, &spdk_vbdev_module_list, tailq) {
263 		if (bdev_module->config_text) {
264 			bdev_module->config_text(fp);
265 		}
266 	}
267 }
268 
269 static int
270 spdk_bdev_initialize(void)
271 {
272 	int i;
273 
274 	if (spdk_bdev_module_initialize()) {
275 		SPDK_ERRLOG("bdev module initialize failed");
276 		return -1;
277 	}
278 
279 	spdk_bdev_g_io_pool = rte_mempool_create("blockdev_io",
280 			      SPDK_BDEV_IO_POOL_SIZE,
281 			      sizeof(struct spdk_bdev_io) +
282 			      spdk_bdev_module_get_max_ctx_size(),
283 			      64, 0,
284 			      NULL, NULL, NULL, NULL,
285 			      SOCKET_ID_ANY, 0);
286 
287 	if (spdk_bdev_g_io_pool == NULL) {
288 		SPDK_ERRLOG("could not allocate spdk_bdev_io pool");
289 		return -1;
290 	}
291 
292 	for (i = 0; i < RTE_MAX_LCORE; i++) {
293 		TAILQ_INIT(&g_need_rbuf_small[i]);
294 		TAILQ_INIT(&g_need_rbuf_large[i]);
295 	}
296 
297 	return spdk_initialize_rbuf_pool();
298 }
299 
300 /*
301  * Wrapper to provide rte_mempool_avail_count() on older DPDK versions.
302  * Drop this if the minimum DPDK version is raised to at least 16.07.
303  */
304 #if RTE_VERSION < RTE_VERSION_NUM(16, 7, 0, 1)
305 static unsigned rte_mempool_avail_count(const struct rte_mempool *pool)
306 {
307 	return rte_mempool_count(pool);
308 }
309 #endif
310 
311 static int
312 spdk_bdev_check_pool(struct rte_mempool *pool, uint32_t count)
313 {
314 	if (rte_mempool_avail_count(pool) != count) {
315 		SPDK_ERRLOG("rte_mempool_avail_count(%s) == %d, should be %d\n",
316 			    pool->name, rte_mempool_avail_count(pool), count);
317 		return -1;
318 	} else {
319 		return 0;
320 	}
321 }
322 
323 static int
324 spdk_bdev_finish(void)
325 {
326 	int rc = 0;
327 
328 	spdk_bdev_module_finish();
329 
330 	rc += spdk_bdev_check_pool(g_rbuf_small_pool, RBUF_SMALL_POOL_SIZE);
331 	rc += spdk_bdev_check_pool(g_rbuf_large_pool, RBUF_LARGE_POOL_SIZE);
332 
333 	return (rc != 0);
334 }
335 
336 struct spdk_bdev_io *spdk_bdev_get_io(void)
337 {
338 	struct spdk_bdev_io *bdev_io;
339 	int rc;
340 
341 	rc = rte_mempool_get(spdk_bdev_g_io_pool, (void **)&bdev_io);
342 	if (rc < 0 || !bdev_io) {
343 		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
344 		abort();
345 	}
346 
347 	memset(bdev_io, 0, sizeof(*bdev_io));
348 
349 	return bdev_io;
350 }
351 
352 static void
353 spdk_bdev_put_io(struct spdk_bdev_io *bdev_io)
354 {
355 	if (!bdev_io) {
356 		return;
357 	}
358 
359 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ && bdev_io->u.read.put_rbuf) {
360 		spdk_bdev_io_put_rbuf(bdev_io);
361 	}
362 
363 	rte_mempool_put(spdk_bdev_g_io_pool, bdev_io);
364 }
365 
366 static void
367 _spdk_bdev_io_get_rbuf(struct spdk_bdev_io *bdev_io)
368 {
369 	uint64_t len = bdev_io->u.read.len;
370 	struct rte_mempool *pool;
371 	need_rbuf_tailq_t *tailq;
372 	int rc;
373 	void *buf = NULL;
374 
375 	if (len <= SPDK_BDEV_SMALL_RBUF_MAX_SIZE) {
376 		pool = g_rbuf_small_pool;
377 		tailq = &g_need_rbuf_small[rte_lcore_id()];
378 	} else {
379 		pool = g_rbuf_large_pool;
380 		tailq = &g_need_rbuf_large[rte_lcore_id()];
381 	}
382 
383 	rc = rte_mempool_get(pool, (void **)&buf);
384 	if (rc < 0 || !buf) {
385 		TAILQ_INSERT_TAIL(tailq, bdev_io, rbuf_link);
386 	} else {
387 		spdk_bdev_io_set_rbuf(bdev_io, buf);
388 	}
389 }
390 
391 
392 static void
393 spdk_bdev_cleanup_pending_rbuf_io(struct spdk_bdev *bdev)
394 {
395 	struct spdk_bdev_io *bdev_io, *tmp;
396 
397 	TAILQ_FOREACH_SAFE(bdev_io, &g_need_rbuf_small[rte_lcore_id()], rbuf_link, tmp) {
398 		if (bdev_io->bdev == bdev) {
399 			TAILQ_REMOVE(&g_need_rbuf_small[rte_lcore_id()], bdev_io, rbuf_link);
400 			bdev_io->status = SPDK_BDEV_IO_STATUS_FAILED;
401 		}
402 	}
403 
404 	TAILQ_FOREACH_SAFE(bdev_io, &g_need_rbuf_large[rte_lcore_id()], rbuf_link, tmp) {
405 		if (bdev_io->bdev == bdev) {
406 			TAILQ_REMOVE(&g_need_rbuf_large[rte_lcore_id()], bdev_io, rbuf_link);
407 			bdev_io->status = SPDK_BDEV_IO_STATUS_FAILED;
408 		}
409 	}
410 }
411 
412 static void
413 __submit_request(struct spdk_bdev *bdev, struct spdk_bdev_io *bdev_io)
414 {
415 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
416 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
417 		spdk_bdev_cleanup_pending_rbuf_io(bdev);
418 	}
419 	bdev_io->in_submit_request = true;
420 	bdev->fn_table->submit_request(bdev_io);
421 	bdev_io->in_submit_request = false;
422 }
423 
424 static int
425 spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io)
426 {
427 	struct spdk_bdev *bdev = bdev_io->bdev;
428 
429 	__submit_request(bdev, bdev_io);
430 	return 0;
431 }
432 
433 void
434 spdk_bdev_io_resubmit(struct spdk_bdev_io *bdev_io, struct spdk_bdev *new_bdev)
435 {
436 	assert(bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING);
437 	bdev_io->bdev = new_bdev;
438 
439 	/*
440 	 * These fields are normally set during spdk_bdev_io_init(), but since bdev is
441 	 * being switched, they need to be reinitialized.
442 	 */
443 	bdev_io->gencnt = new_bdev->gencnt;
444 	bdev_io->ctx = new_bdev->ctxt;
445 
446 	__submit_request(new_bdev, bdev_io);
447 }
448 
449 static void
450 spdk_bdev_io_init(struct spdk_bdev_io *bdev_io,
451 		  struct spdk_bdev *bdev, void *cb_arg,
452 		  spdk_bdev_io_completion_cb cb)
453 {
454 	bdev_io->bdev = bdev;
455 	bdev_io->ctx = bdev->ctxt;
456 	bdev_io->caller_ctx = cb_arg;
457 	bdev_io->cb = cb;
458 	bdev_io->gencnt = bdev->gencnt;
459 	bdev_io->status = SPDK_BDEV_IO_STATUS_PENDING;
460 	bdev_io->in_submit_request = false;
461 	TAILQ_INIT(&bdev_io->child_io);
462 }
463 
464 struct spdk_bdev_io *
465 spdk_bdev_get_child_io(struct spdk_bdev_io *parent,
466 		       struct spdk_bdev *bdev,
467 		       spdk_bdev_io_completion_cb cb,
468 		       void *cb_arg)
469 {
470 	struct spdk_bdev_io *child;
471 
472 	child = spdk_bdev_get_io();
473 	if (!child) {
474 		SPDK_ERRLOG("Unable to get spdk_bdev_io\n");
475 		return NULL;
476 	}
477 
478 	if (cb_arg == NULL) {
479 		cb_arg = child;
480 	}
481 
482 	spdk_bdev_io_init(child, bdev, cb_arg, cb);
483 
484 	child->type = parent->type;
485 	memcpy(&child->u, &parent->u, sizeof(child->u));
486 	if (child->type == SPDK_BDEV_IO_TYPE_READ) {
487 		child->u.read.put_rbuf = false;
488 	}
489 	child->get_rbuf_cb = NULL;
490 	child->parent = parent;
491 
492 	TAILQ_INSERT_TAIL(&parent->child_io, child, link);
493 
494 	return child;
495 }
496 
497 bool
498 spdk_bdev_io_type_supported(struct spdk_bdev *bdev, enum spdk_bdev_io_type io_type)
499 {
500 	return bdev->fn_table->io_type_supported(bdev, io_type);
501 }
502 
503 int
504 spdk_bdev_dump_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
505 {
506 	if (bdev->fn_table->dump_config_json) {
507 		return bdev->fn_table->dump_config_json(bdev, w);
508 	}
509 
510 	return 0;
511 }
512 
513 struct spdk_io_channel *
514 spdk_bdev_get_io_channel(struct spdk_bdev *bdev, uint32_t priority)
515 {
516 	return bdev->fn_table->get_io_channel(bdev, priority);
517 }
518 
519 static int
520 spdk_bdev_io_valid(struct spdk_bdev *bdev, uint64_t offset, uint64_t nbytes)
521 {
522 	/* Return failure if nbytes is not a multiple of bdev->blocklen */
523 	if (nbytes % bdev->blocklen) {
524 		return -1;
525 	}
526 
527 	/* Return failure if offset + nbytes is less than offset; indicates there
528 	 * has been an overflow and hence the offset has been wrapped around */
529 	if (offset + nbytes < offset) {
530 		return -1;
531 	}
532 
533 	/* Return failure if offset + nbytes exceeds the size of the blockdev */
534 	if (offset + nbytes > bdev->blockcnt * bdev->blocklen) {
535 		return -1;
536 	}
537 
538 	return 0;
539 }
540 
541 struct spdk_bdev_io *
542 spdk_bdev_read(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
543 	       void *buf, uint64_t offset, uint64_t nbytes,
544 	       spdk_bdev_io_completion_cb cb, void *cb_arg)
545 {
546 	struct spdk_bdev_io *bdev_io;
547 	int rc;
548 
549 	assert(bdev->status != SPDK_BDEV_STATUS_UNCLAIMED);
550 	if (spdk_bdev_io_valid(bdev, offset, nbytes) != 0) {
551 		return NULL;
552 	}
553 
554 	bdev_io = spdk_bdev_get_io();
555 	if (!bdev_io) {
556 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n");
557 		return NULL;
558 	}
559 
560 	bdev_io->ch = ch;
561 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
562 	bdev_io->u.read.iov.iov_base = buf;
563 	bdev_io->u.read.iov.iov_len = nbytes;
564 	bdev_io->u.read.iovs = &bdev_io->u.read.iov;
565 	bdev_io->u.read.iovcnt = 1;
566 	bdev_io->u.read.len = nbytes;
567 	bdev_io->u.read.offset = offset;
568 	bdev_io->u.read.put_rbuf = false;
569 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
570 
571 	rc = spdk_bdev_io_submit(bdev_io);
572 	if (rc < 0) {
573 		spdk_bdev_put_io(bdev_io);
574 		return NULL;
575 	}
576 
577 	return bdev_io;
578 }
579 
580 struct spdk_bdev_io *
581 spdk_bdev_readv(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
582 		struct iovec *iov, int iovcnt,
583 		uint64_t offset, uint64_t nbytes,
584 		spdk_bdev_io_completion_cb cb, void *cb_arg)
585 {
586 	struct spdk_bdev_io *bdev_io;
587 	int rc;
588 
589 	assert(bdev->status != SPDK_BDEV_STATUS_UNCLAIMED);
590 	if (spdk_bdev_io_valid(bdev, offset, nbytes) != 0) {
591 		return NULL;
592 	}
593 
594 	bdev_io = spdk_bdev_get_io();
595 	if (!bdev_io) {
596 		SPDK_ERRLOG("spdk_bdev_io memory allocation failed duing read\n");
597 		return NULL;
598 	}
599 
600 	bdev_io->ch = ch;
601 	bdev_io->type = SPDK_BDEV_IO_TYPE_READ;
602 	bdev_io->u.read.iovs = iov;
603 	bdev_io->u.read.iovcnt = iovcnt;
604 	bdev_io->u.read.len = nbytes;
605 	bdev_io->u.read.offset = offset;
606 	bdev_io->u.read.put_rbuf = false;
607 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
608 
609 	rc = spdk_bdev_io_submit(bdev_io);
610 	if (rc < 0) {
611 		spdk_bdev_put_io(bdev_io);
612 		return NULL;
613 	}
614 
615 	return bdev_io;
616 }
617 
618 struct spdk_bdev_io *
619 spdk_bdev_write(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
620 		void *buf, uint64_t offset, uint64_t nbytes,
621 		spdk_bdev_io_completion_cb cb, void *cb_arg)
622 {
623 	struct spdk_bdev_io *bdev_io;
624 	int rc;
625 
626 	assert(bdev->status != SPDK_BDEV_STATUS_UNCLAIMED);
627 	if (spdk_bdev_io_valid(bdev, offset, nbytes) != 0) {
628 		return NULL;
629 	}
630 
631 	bdev_io = spdk_bdev_get_io();
632 	if (!bdev_io) {
633 		SPDK_ERRLOG("blockdev_io memory allocation failed duing write\n");
634 		return NULL;
635 	}
636 
637 	bdev_io->ch = ch;
638 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
639 	bdev_io->u.write.iov.iov_base = buf;
640 	bdev_io->u.write.iov.iov_len = nbytes;
641 	bdev_io->u.write.iovs = &bdev_io->u.write.iov;
642 	bdev_io->u.write.iovcnt = 1;
643 	bdev_io->u.write.len = nbytes;
644 	bdev_io->u.write.offset = offset;
645 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
646 
647 	rc = spdk_bdev_io_submit(bdev_io);
648 	if (rc < 0) {
649 		spdk_bdev_put_io(bdev_io);
650 		return NULL;
651 	}
652 
653 	return bdev_io;
654 }
655 
656 struct spdk_bdev_io *
657 spdk_bdev_writev(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
658 		 struct iovec *iov, int iovcnt,
659 		 uint64_t offset, uint64_t len,
660 		 spdk_bdev_io_completion_cb cb, void *cb_arg)
661 {
662 	struct spdk_bdev_io *bdev_io;
663 	int rc;
664 
665 	assert(bdev->status != SPDK_BDEV_STATUS_UNCLAIMED);
666 	if (spdk_bdev_io_valid(bdev, offset, len) != 0) {
667 		return NULL;
668 	}
669 
670 	bdev_io = spdk_bdev_get_io();
671 	if (!bdev_io) {
672 		SPDK_ERRLOG("bdev_io memory allocation failed duing writev\n");
673 		return NULL;
674 	}
675 
676 	bdev_io->ch = ch;
677 	bdev_io->type = SPDK_BDEV_IO_TYPE_WRITE;
678 	bdev_io->u.write.iovs = iov;
679 	bdev_io->u.write.iovcnt = iovcnt;
680 	bdev_io->u.write.len = len;
681 	bdev_io->u.write.offset = offset;
682 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
683 
684 	rc = spdk_bdev_io_submit(bdev_io);
685 	if (rc < 0) {
686 		spdk_bdev_put_io(bdev_io);
687 		return NULL;
688 	}
689 
690 	return bdev_io;
691 }
692 
693 struct spdk_bdev_io *
694 spdk_bdev_unmap(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
695 		struct spdk_scsi_unmap_bdesc *unmap_d,
696 		uint16_t bdesc_count,
697 		spdk_bdev_io_completion_cb cb, void *cb_arg)
698 {
699 	struct spdk_bdev_io *bdev_io;
700 	int rc;
701 
702 	assert(bdev->status != SPDK_BDEV_STATUS_UNCLAIMED);
703 	if (bdesc_count == 0) {
704 		SPDK_ERRLOG("Invalid bdesc_count 0\n");
705 		return NULL;
706 	}
707 
708 	if (bdesc_count > bdev->max_unmap_bdesc_count) {
709 		SPDK_ERRLOG("Invalid bdesc_count %u > max_unmap_bdesc_count %u\n",
710 			    bdesc_count, bdev->max_unmap_bdesc_count);
711 		return NULL;
712 	}
713 
714 	bdev_io = spdk_bdev_get_io();
715 	if (!bdev_io) {
716 		SPDK_ERRLOG("bdev_io memory allocation failed duing unmap\n");
717 		return NULL;
718 	}
719 
720 	bdev_io->ch = ch;
721 	bdev_io->type = SPDK_BDEV_IO_TYPE_UNMAP;
722 	bdev_io->u.unmap.unmap_bdesc = unmap_d;
723 	bdev_io->u.unmap.bdesc_count = bdesc_count;
724 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
725 
726 	rc = spdk_bdev_io_submit(bdev_io);
727 	if (rc < 0) {
728 		spdk_bdev_put_io(bdev_io);
729 		return NULL;
730 	}
731 
732 	return bdev_io;
733 }
734 
735 struct spdk_bdev_io *
736 spdk_bdev_flush(struct spdk_bdev *bdev, struct spdk_io_channel *ch,
737 		uint64_t offset, uint64_t length,
738 		spdk_bdev_io_completion_cb cb, void *cb_arg)
739 {
740 	struct spdk_bdev_io *bdev_io;
741 	int rc;
742 
743 	assert(bdev->status != SPDK_BDEV_STATUS_UNCLAIMED);
744 	bdev_io = spdk_bdev_get_io();
745 	if (!bdev_io) {
746 		SPDK_ERRLOG("bdev_io memory allocation failed duing flush\n");
747 		return NULL;
748 	}
749 
750 	bdev_io->ch = ch;
751 	bdev_io->type = SPDK_BDEV_IO_TYPE_FLUSH;
752 	bdev_io->u.flush.offset = offset;
753 	bdev_io->u.flush.length = length;
754 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
755 
756 	rc = spdk_bdev_io_submit(bdev_io);
757 	if (rc < 0) {
758 		spdk_bdev_put_io(bdev_io);
759 		return NULL;
760 	}
761 
762 	return bdev_io;
763 }
764 
765 int
766 spdk_bdev_reset(struct spdk_bdev *bdev, enum spdk_bdev_reset_type reset_type,
767 		spdk_bdev_io_completion_cb cb, void *cb_arg)
768 {
769 	struct spdk_bdev_io *bdev_io;
770 	int rc;
771 
772 	assert(bdev->status != SPDK_BDEV_STATUS_UNCLAIMED);
773 	bdev_io = spdk_bdev_get_io();
774 	if (!bdev_io) {
775 		SPDK_ERRLOG("bdev_io memory allocation failed duing reset\n");
776 		return -1;
777 	}
778 
779 	bdev_io->type = SPDK_BDEV_IO_TYPE_RESET;
780 	bdev_io->u.reset.type = reset_type;
781 	spdk_bdev_io_init(bdev_io, bdev, cb_arg, cb);
782 
783 	rc = spdk_bdev_io_submit(bdev_io);
784 	if (rc < 0) {
785 		spdk_bdev_put_io(bdev_io);
786 		SPDK_ERRLOG("reset failed\n");
787 	}
788 
789 	return rc;
790 }
791 
792 int
793 spdk_bdev_free_io(struct spdk_bdev_io *bdev_io)
794 {
795 	struct spdk_bdev_io *child_io, *tmp;
796 
797 	if (!bdev_io) {
798 		SPDK_ERRLOG("bdev_io is NULL\n");
799 		return -1;
800 	}
801 
802 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_PENDING) {
803 		SPDK_ERRLOG("bdev_io is in pending state\n");
804 		return -1;
805 	}
806 
807 	TAILQ_FOREACH_SAFE(child_io, &bdev_io->child_io, link, tmp) {
808 		/*
809 		 * Make sure no references to the parent I/O remain, since it is being
810 		 * returned to the free pool.
811 		 */
812 		child_io->parent = NULL;
813 		TAILQ_REMOVE(&bdev_io->child_io, child_io, link);
814 
815 		/*
816 		 * Child I/O may have an rbuf that needs to be returned to a pool
817 		 *  on a different core, so free it through the request submission
818 		 *  process rather than calling put_io directly here.
819 		 */
820 		spdk_bdev_free_io(child_io);
821 	}
822 
823 	spdk_bdev_put_io(bdev_io);
824 
825 	return 0;
826 }
827 
828 static void
829 bdev_io_deferred_completion(void *arg1, void *arg2)
830 {
831 	struct spdk_bdev_io *bdev_io = arg1;
832 	enum spdk_bdev_io_status status = (enum spdk_bdev_io_status)arg2;
833 
834 	assert(bdev_io->in_submit_request == false);
835 
836 	spdk_bdev_io_complete(bdev_io, status);
837 }
838 
839 void
840 spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
841 {
842 	if (bdev_io->in_submit_request) {
843 		/*
844 		 * Defer completion via an event to avoid potential infinite recursion if the
845 		 * user's completion callback issues a new I/O.
846 		 */
847 		spdk_event_call(spdk_event_allocate(spdk_app_get_current_core(),
848 						    bdev_io_deferred_completion,
849 						    bdev_io,
850 						    (void *)status));
851 		return;
852 	}
853 
854 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
855 		/* Successful reset */
856 		if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
857 			/* Increase the blockdev generation if it is a hard reset */
858 			if (bdev_io->u.reset.type == SPDK_BDEV_RESET_HARD) {
859 				bdev_io->bdev->gencnt++;
860 			}
861 		}
862 	} else {
863 		/*
864 		 * Check the gencnt, to see if this I/O was issued before the most
865 		 * recent reset. If the gencnt is not equal, then just free the I/O
866 		 * without calling the callback, since the caller will have already
867 		 * freed its context for this I/O.
868 		 */
869 		if (bdev_io->bdev->gencnt != bdev_io->gencnt) {
870 			spdk_bdev_put_io(bdev_io);
871 			return;
872 		}
873 	}
874 
875 	bdev_io->status = status;
876 
877 	assert(bdev_io->cb != NULL);
878 	bdev_io->cb(bdev_io, status, bdev_io->caller_ctx);
879 }
880 
881 void
882 spdk_bdev_io_set_scsi_error(struct spdk_bdev_io *bdev_io, enum spdk_scsi_status sc,
883 			    enum spdk_scsi_sense sk, uint8_t asc, uint8_t ascq)
884 {
885 	bdev_io->status = SPDK_BDEV_IO_STATUS_SCSI_ERROR;
886 	bdev_io->error.scsi.sc = sc;
887 	bdev_io->error.scsi.sk = sk;
888 	bdev_io->error.scsi.asc = asc;
889 	bdev_io->error.scsi.ascq = ascq;
890 }
891 
892 void
893 spdk_bdev_io_complete_nvme_status(struct spdk_bdev_io *bdev_io, int sct, int sc)
894 {
895 	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
896 		bdev_io->status = SPDK_BDEV_IO_STATUS_SUCCESS;
897 	} else {
898 		bdev_io->error.nvme.sct = sct;
899 		bdev_io->error.nvme.sc = sc;
900 		bdev_io->status = SPDK_BDEV_IO_STATUS_NVME_ERROR;
901 	}
902 
903 	spdk_bdev_io_complete(bdev_io, bdev_io->status);
904 }
905 
906 void
907 spdk_bdev_io_get_nvme_status(const struct spdk_bdev_io *bdev_io, int *sct, int *sc)
908 {
909 	assert(sct != NULL);
910 	assert(sc != NULL);
911 
912 	if (bdev_io->status == SPDK_BDEV_IO_STATUS_NVME_ERROR) {
913 		*sct = bdev_io->error.nvme.sct;
914 		*sc = bdev_io->error.nvme.sc;
915 	} else if (bdev_io->status == SPDK_BDEV_IO_STATUS_SUCCESS) {
916 		*sct = SPDK_NVME_SCT_GENERIC;
917 		*sc = SPDK_NVME_SC_SUCCESS;
918 	} else {
919 		*sct = SPDK_NVME_SCT_GENERIC;
920 		*sc = SPDK_NVME_SC_INTERNAL_DEVICE_ERROR;
921 	}
922 }
923 
924 void
925 spdk_bdev_register(struct spdk_bdev *bdev)
926 {
927 	/* initialize the reset generation value to zero */
928 	bdev->gencnt = 0;
929 
930 	pthread_mutex_init(&bdev->mutex, NULL);
931 	bdev->status = SPDK_BDEV_STATUS_UNCLAIMED;
932 	SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Inserting bdev %s into list\n", bdev->name);
933 	TAILQ_INSERT_TAIL(&spdk_bdev_list, bdev, link);
934 }
935 
936 void
937 spdk_bdev_unregister(struct spdk_bdev *bdev)
938 {
939 	int			rc;
940 
941 	SPDK_TRACELOG(SPDK_TRACE_DEBUG, "Removing bdev %s from list\n", bdev->name);
942 
943 	pthread_mutex_lock(&bdev->mutex);
944 	assert(bdev->status == SPDK_BDEV_STATUS_CLAIMED || bdev->status == SPDK_BDEV_STATUS_UNCLAIMED);
945 	if (bdev->status == SPDK_BDEV_STATUS_CLAIMED) {
946 		if (bdev->remove_cb) {
947 			bdev->status = SPDK_BDEV_STATUS_REMOVING;
948 			pthread_mutex_unlock(&bdev->mutex);
949 			bdev->remove_cb(bdev->remove_ctx);
950 			return;
951 		} else {
952 			bdev->status = SPDK_BDEV_STATUS_UNCLAIMED;
953 		}
954 	}
955 
956 	TAILQ_REMOVE(&spdk_bdev_list, bdev, link);
957 	pthread_mutex_unlock(&bdev->mutex);
958 
959 	pthread_mutex_destroy(&bdev->mutex);
960 
961 	rc = bdev->fn_table->destruct(bdev->ctxt);
962 	if (rc < 0) {
963 		SPDK_ERRLOG("destruct failed\n");
964 	}
965 }
966 
967 bool
968 spdk_bdev_claim(struct spdk_bdev *bdev, spdk_bdev_remove_cb_t remove_cb,
969 		void *remove_ctx)
970 {
971 	bool success;
972 
973 	pthread_mutex_lock(&bdev->mutex);
974 
975 	if (bdev->status != SPDK_BDEV_STATUS_CLAIMED) {
976 		/* Take ownership of bdev. */
977 		bdev->remove_cb = remove_cb;
978 		bdev->remove_ctx = remove_ctx;
979 		bdev->status = SPDK_BDEV_STATUS_CLAIMED;
980 		success = true;
981 	} else {
982 		/* bdev is already claimed. */
983 		success = false;
984 	}
985 
986 	pthread_mutex_unlock(&bdev->mutex);
987 
988 	return success;
989 }
990 
991 void
992 spdk_bdev_unclaim(struct spdk_bdev *bdev)
993 {
994 	bool do_unregister = false;
995 
996 	pthread_mutex_lock(&bdev->mutex);
997 	assert(bdev->status == SPDK_BDEV_STATUS_CLAIMED || bdev->status == SPDK_BDEV_STATUS_REMOVING);
998 	if (bdev->status == SPDK_BDEV_STATUS_REMOVING) {
999 		do_unregister = true;
1000 	}
1001 	bdev->remove_cb = NULL;
1002 	bdev->remove_ctx = NULL;
1003 	bdev->status = SPDK_BDEV_STATUS_UNCLAIMED;
1004 	pthread_mutex_unlock(&bdev->mutex);
1005 
1006 	if (do_unregister == true) {
1007 		spdk_bdev_unregister(bdev);
1008 	}
1009 }
1010 
1011 void
1012 spdk_bdev_io_get_rbuf(struct spdk_bdev_io *bdev_io, spdk_bdev_io_get_rbuf_cb cb)
1013 {
1014 	assert(cb != NULL);
1015 	assert(bdev_io->u.read.iovs != NULL);
1016 
1017 	if (bdev_io->u.read.iovs[0].iov_base == NULL) {
1018 		bdev_io->get_rbuf_cb = cb;
1019 		_spdk_bdev_io_get_rbuf(bdev_io);
1020 	} else {
1021 		cb(bdev_io);
1022 	}
1023 }
1024 
1025 void spdk_bdev_module_list_add(struct spdk_bdev_module_if *bdev_module)
1026 {
1027 	TAILQ_INSERT_TAIL(&spdk_bdev_module_list, bdev_module, tailq);
1028 }
1029 
1030 void spdk_vbdev_module_list_add(struct spdk_bdev_module_if *vbdev_module)
1031 {
1032 	TAILQ_INSERT_TAIL(&spdk_vbdev_module_list, vbdev_module, tailq);
1033 }
1034 SPDK_SUBSYSTEM_REGISTER(bdev, spdk_bdev_initialize, spdk_bdev_finish, spdk_bdev_config_text)
1035 SPDK_SUBSYSTEM_DEPEND(bdev, copy)
1036