xref: /spdk/module/bdev/lvol/vbdev_lvol.c (revision 8bb0ded3e55c182cea67af1f6790f8de5f38c05f)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/blob_bdev.h"
35 #include "spdk/rpc.h"
36 #include "spdk/bdev_module.h"
37 #include "spdk/log.h"
38 #include "spdk/string.h"
39 #include "spdk/uuid.h"
40 
41 #include "vbdev_lvol.h"
42 
43 static TAILQ_HEAD(, lvol_store_bdev) g_spdk_lvol_pairs = TAILQ_HEAD_INITIALIZER(
44 			g_spdk_lvol_pairs);
45 
46 static int vbdev_lvs_init(void);
47 static int vbdev_lvs_get_ctx_size(void);
48 static void vbdev_lvs_examine(struct spdk_bdev *bdev);
49 
50 static struct spdk_bdev_module g_lvol_if = {
51 	.name = "lvol",
52 	.module_init = vbdev_lvs_init,
53 	.examine_disk = vbdev_lvs_examine,
54 	.get_ctx_size = vbdev_lvs_get_ctx_size,
55 
56 };
57 
58 SPDK_BDEV_MODULE_REGISTER(lvol, &g_lvol_if)
59 
60 struct lvol_store_bdev *
61 vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig)
62 {
63 	struct spdk_lvol_store *lvs = NULL;
64 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
65 
66 	while (lvs_bdev != NULL) {
67 		lvs = lvs_bdev->lvs;
68 		if (lvs == lvs_orig) {
69 			if (lvs_bdev->req != NULL) {
70 				/* We do not allow access to lvs that are being destroyed */
71 				return NULL;
72 			} else {
73 				return lvs_bdev;
74 			}
75 		}
76 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
77 	}
78 
79 	return NULL;
80 }
81 
82 static int
83 _vbdev_lvol_change_bdev_alias(struct spdk_lvol *lvol, const char *new_lvol_name)
84 {
85 	struct spdk_bdev_alias *tmp;
86 	char *old_alias;
87 	char *alias;
88 	int rc;
89 	int alias_number = 0;
90 
91 	/* bdev representing lvols have only one alias,
92 	 * while we changed lvs name earlier, we have to iterate alias list to get one,
93 	 * and check if there is only one alias */
94 
95 	TAILQ_FOREACH(tmp, &lvol->bdev->aliases, tailq) {
96 		if (++alias_number > 1) {
97 			SPDK_ERRLOG("There is more than 1 alias in bdev %s\n", lvol->bdev->name);
98 			return -EINVAL;
99 		}
100 
101 		old_alias = tmp->alias;
102 	}
103 
104 	if (alias_number == 0) {
105 		SPDK_ERRLOG("There are no aliases in bdev %s\n", lvol->bdev->name);
106 		return -EINVAL;
107 	}
108 
109 	alias = spdk_sprintf_alloc("%s/%s", lvol->lvol_store->name, new_lvol_name);
110 	if (alias == NULL) {
111 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
112 		return -ENOMEM;
113 	}
114 
115 	rc = spdk_bdev_alias_add(lvol->bdev, alias);
116 	if (rc != 0) {
117 		SPDK_ERRLOG("cannot add alias '%s'\n", alias);
118 		free(alias);
119 		return rc;
120 	}
121 	free(alias);
122 
123 	rc = spdk_bdev_alias_del(lvol->bdev, old_alias);
124 	if (rc != 0) {
125 		SPDK_ERRLOG("cannot remove alias '%s'\n", old_alias);
126 		return rc;
127 	}
128 
129 	return 0;
130 }
131 
132 static struct lvol_store_bdev *
133 vbdev_get_lvs_bdev_by_bdev(struct spdk_bdev *bdev_orig)
134 {
135 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
136 
137 	while (lvs_bdev != NULL) {
138 		if (lvs_bdev->bdev == bdev_orig) {
139 			if (lvs_bdev->req != NULL) {
140 				/* We do not allow access to lvs that are being destroyed */
141 				return NULL;
142 			} else {
143 				return lvs_bdev;
144 			}
145 		}
146 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
147 	}
148 
149 	return NULL;
150 }
151 
152 static void
153 vbdev_lvs_hotremove_cb(struct spdk_bdev *bdev)
154 {
155 	struct lvol_store_bdev *lvs_bdev;
156 
157 	lvs_bdev = vbdev_get_lvs_bdev_by_bdev(bdev);
158 	if (lvs_bdev != NULL) {
159 		vbdev_lvs_unload(lvs_bdev->lvs, NULL, NULL);
160 	}
161 }
162 
163 static void
164 vbdev_lvs_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
165 			     void *event_ctx)
166 {
167 	switch (type) {
168 	case SPDK_BDEV_EVENT_REMOVE:
169 		vbdev_lvs_hotremove_cb(bdev);
170 		break;
171 	default:
172 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
173 		break;
174 	}
175 }
176 
177 static void
178 _vbdev_lvs_create_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
179 {
180 	struct spdk_lvs_with_handle_req *req = cb_arg;
181 	struct lvol_store_bdev *lvs_bdev;
182 	struct spdk_bdev *bdev = req->base_bdev;
183 	struct spdk_bs_dev *bs_dev = req->bs_dev;
184 
185 	if (lvserrno != 0) {
186 		assert(lvs == NULL);
187 		SPDK_ERRLOG("Cannot create lvol store bdev\n");
188 		goto end;
189 	}
190 
191 	lvserrno = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
192 	if (lvserrno != 0) {
193 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
194 		req->bs_dev->destroy(req->bs_dev);
195 		goto end;
196 	}
197 
198 	assert(lvs != NULL);
199 
200 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
201 	if (!lvs_bdev) {
202 		lvserrno = -ENOMEM;
203 		goto end;
204 	}
205 	lvs_bdev->lvs = lvs;
206 	lvs_bdev->bdev = bdev;
207 	lvs_bdev->req = NULL;
208 
209 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
210 	SPDK_INFOLOG(vbdev_lvol, "Lvol store bdev inserted\n");
211 
212 end:
213 	req->cb_fn(req->cb_arg, lvs, lvserrno);
214 	free(req);
215 
216 	return;
217 }
218 
219 int
220 vbdev_lvs_create(const char *base_bdev_name, const char *name, uint32_t cluster_sz,
221 		 enum lvs_clear_method clear_method, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
222 {
223 	struct spdk_bs_dev *bs_dev;
224 	struct spdk_lvs_with_handle_req *lvs_req;
225 	struct spdk_lvs_opts opts;
226 	int rc;
227 	int len;
228 
229 	if (base_bdev_name == NULL) {
230 		SPDK_ERRLOG("missing base_bdev_name param\n");
231 		return -EINVAL;
232 	}
233 
234 	spdk_lvs_opts_init(&opts);
235 	if (cluster_sz != 0) {
236 		opts.cluster_sz = cluster_sz;
237 	}
238 
239 	if (clear_method != 0) {
240 		opts.clear_method = clear_method;
241 	}
242 
243 	if (name == NULL) {
244 		SPDK_ERRLOG("missing name param\n");
245 		return -EINVAL;
246 	}
247 
248 	len = strnlen(name, SPDK_LVS_NAME_MAX);
249 
250 	if (len == 0 || len == SPDK_LVS_NAME_MAX) {
251 		SPDK_ERRLOG("name must be between 1 and %d characters\n", SPDK_LVS_NAME_MAX - 1);
252 		return -EINVAL;
253 	}
254 	snprintf(opts.name, sizeof(opts.name), "%s", name);
255 
256 	lvs_req = calloc(1, sizeof(*lvs_req));
257 	if (!lvs_req) {
258 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
259 		return -ENOMEM;
260 	}
261 
262 	rc = spdk_bdev_create_bs_dev_ext(base_bdev_name, vbdev_lvs_base_bdev_event_cb,
263 					 NULL, &bs_dev);
264 	if (rc < 0) {
265 		SPDK_ERRLOG("Cannot create blobstore device\n");
266 		free(lvs_req);
267 		return rc;
268 	}
269 
270 	lvs_req->bs_dev = bs_dev;
271 	lvs_req->base_bdev = bs_dev->get_base_bdev(bs_dev);
272 	lvs_req->cb_fn = cb_fn;
273 	lvs_req->cb_arg = cb_arg;
274 
275 	rc = spdk_lvs_init(bs_dev, &opts, _vbdev_lvs_create_cb, lvs_req);
276 	if (rc < 0) {
277 		free(lvs_req);
278 		bs_dev->destroy(bs_dev);
279 		return rc;
280 	}
281 
282 	return 0;
283 }
284 
285 static void
286 _vbdev_lvs_rename_cb(void *cb_arg, int lvserrno)
287 {
288 	struct spdk_lvs_req *req = cb_arg;
289 	struct spdk_lvol *tmp;
290 
291 	if (lvserrno != 0) {
292 		SPDK_INFOLOG(vbdev_lvol, "Lvol store rename failed\n");
293 	} else {
294 		TAILQ_FOREACH(tmp, &req->lvol_store->lvols, link) {
295 			/* We have to pass current lvol name, since only lvs name changed */
296 			_vbdev_lvol_change_bdev_alias(tmp, tmp->name);
297 		}
298 	}
299 
300 	req->cb_fn(req->cb_arg, lvserrno);
301 	free(req);
302 }
303 
304 void
305 vbdev_lvs_rename(struct spdk_lvol_store *lvs, const char *new_lvs_name,
306 		 spdk_lvs_op_complete cb_fn, void *cb_arg)
307 {
308 	struct lvol_store_bdev *lvs_bdev;
309 
310 	struct spdk_lvs_req *req;
311 
312 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
313 	if (!lvs_bdev) {
314 		SPDK_ERRLOG("No such lvol store found\n");
315 		cb_fn(cb_arg, -ENODEV);
316 		return;
317 	}
318 
319 	req = calloc(1, sizeof(*req));
320 	if (!req) {
321 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
322 		cb_fn(cb_arg, -ENOMEM);
323 		return;
324 	}
325 	req->cb_fn = cb_fn;
326 	req->cb_arg = cb_arg;
327 	req->lvol_store = lvs;
328 
329 	spdk_lvs_rename(lvs, new_lvs_name, _vbdev_lvs_rename_cb, req);
330 }
331 
332 static void
333 _vbdev_lvs_remove_cb(void *cb_arg, int lvserrno)
334 {
335 	struct lvol_store_bdev *lvs_bdev = cb_arg;
336 	struct spdk_lvs_req *req = lvs_bdev->req;
337 
338 	if (lvserrno != 0) {
339 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
340 	}
341 
342 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
343 	free(lvs_bdev);
344 
345 	if (req->cb_fn != NULL) {
346 		req->cb_fn(req->cb_arg, lvserrno);
347 	}
348 	free(req);
349 }
350 
351 static void
352 _vbdev_lvs_remove_lvol_cb(void *cb_arg, int lvolerrno)
353 {
354 	struct lvol_store_bdev *lvs_bdev = cb_arg;
355 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
356 	struct spdk_lvol *lvol;
357 
358 	if (lvolerrno != 0) {
359 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol removed with errno %d\n", lvolerrno);
360 	}
361 
362 	if (TAILQ_EMPTY(&lvs->lvols)) {
363 		spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
364 		return;
365 	}
366 
367 	lvol = TAILQ_FIRST(&lvs->lvols);
368 	while (lvol != NULL) {
369 		if (spdk_lvol_deletable(lvol)) {
370 			vbdev_lvol_destroy(lvol, _vbdev_lvs_remove_lvol_cb, lvs_bdev);
371 			return;
372 		}
373 		lvol = TAILQ_NEXT(lvol, link);
374 	}
375 
376 	/* If no lvol is deletable, that means there is circular dependency. */
377 	SPDK_ERRLOG("Lvols left in lvs, but unable to delete.\n");
378 	assert(false);
379 }
380 
381 static void
382 _vbdev_lvs_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
383 {
384 	struct lvol_store_bdev *lvs_bdev = cb_arg;
385 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
386 	struct spdk_lvol *lvol, *tmp;
387 
388 	if (bdeverrno != 0) {
389 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
390 	}
391 
392 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
393 		if (lvol->ref_count != 0) {
394 			/* An lvol is still open, don't unload whole lvol store. */
395 			return;
396 		}
397 	}
398 	spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
399 }
400 
401 static void
402 _vbdev_lvs_remove(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg,
403 		  bool destroy)
404 {
405 	struct spdk_lvs_req *req;
406 	struct lvol_store_bdev *lvs_bdev;
407 	struct spdk_lvol *lvol, *tmp;
408 	bool all_lvols_closed = true;
409 
410 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
411 	if (!lvs_bdev) {
412 		SPDK_ERRLOG("No such lvol store found\n");
413 		if (cb_fn != NULL) {
414 			cb_fn(cb_arg, -ENODEV);
415 		}
416 		return;
417 	}
418 
419 	req = calloc(1, sizeof(*req));
420 	if (!req) {
421 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
422 		if (cb_fn != NULL) {
423 			cb_fn(cb_arg, -ENOMEM);
424 		}
425 		return;
426 	}
427 
428 	req->cb_fn = cb_fn;
429 	req->cb_arg = cb_arg;
430 	lvs_bdev->req = req;
431 
432 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
433 		if (lvol->ref_count != 0) {
434 			all_lvols_closed = false;
435 		}
436 	}
437 
438 	if (all_lvols_closed == true) {
439 		if (destroy) {
440 			spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
441 		} else {
442 			spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
443 		}
444 	} else {
445 		lvs->destruct = destroy;
446 		if (destroy) {
447 			_vbdev_lvs_remove_lvol_cb(lvs_bdev, 0);
448 		} else {
449 			TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
450 				spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
451 			}
452 		}
453 	}
454 }
455 
456 void
457 vbdev_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
458 {
459 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, false);
460 }
461 
462 void
463 vbdev_lvs_destruct(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
464 {
465 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, true);
466 }
467 
468 struct lvol_store_bdev *
469 vbdev_lvol_store_first(void)
470 {
471 	struct lvol_store_bdev *lvs_bdev;
472 
473 	lvs_bdev = TAILQ_FIRST(&g_spdk_lvol_pairs);
474 	if (lvs_bdev) {
475 		SPDK_INFOLOG(vbdev_lvol, "Starting lvolstore iteration at %p\n", lvs_bdev->lvs);
476 	}
477 
478 	return lvs_bdev;
479 }
480 
481 struct lvol_store_bdev *
482 vbdev_lvol_store_next(struct lvol_store_bdev *prev)
483 {
484 	struct lvol_store_bdev *lvs_bdev;
485 
486 	if (prev == NULL) {
487 		SPDK_ERRLOG("prev argument cannot be NULL\n");
488 		return NULL;
489 	}
490 
491 	lvs_bdev = TAILQ_NEXT(prev, lvol_stores);
492 	if (lvs_bdev) {
493 		SPDK_INFOLOG(vbdev_lvol, "Continuing lvolstore iteration at %p\n", lvs_bdev->lvs);
494 	}
495 
496 	return lvs_bdev;
497 }
498 
499 static struct spdk_lvol_store *
500 _vbdev_get_lvol_store_by_uuid(const struct spdk_uuid *uuid)
501 {
502 	struct spdk_lvol_store *lvs = NULL;
503 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
504 
505 	while (lvs_bdev != NULL) {
506 		lvs = lvs_bdev->lvs;
507 		if (spdk_uuid_compare(&lvs->uuid, uuid) == 0) {
508 			return lvs;
509 		}
510 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
511 	}
512 	return NULL;
513 }
514 
515 struct spdk_lvol_store *
516 vbdev_get_lvol_store_by_uuid(const char *uuid_str)
517 {
518 	struct spdk_uuid uuid;
519 
520 	if (spdk_uuid_parse(&uuid, uuid_str)) {
521 		return NULL;
522 	}
523 
524 	return _vbdev_get_lvol_store_by_uuid(&uuid);
525 }
526 
527 struct spdk_lvol_store *
528 vbdev_get_lvol_store_by_name(const char *name)
529 {
530 	struct spdk_lvol_store *lvs = NULL;
531 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
532 
533 	while (lvs_bdev != NULL) {
534 		lvs = lvs_bdev->lvs;
535 		if (strncmp(lvs->name, name, sizeof(lvs->name)) == 0) {
536 			return lvs;
537 		}
538 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
539 	}
540 	return NULL;
541 }
542 
543 struct vbdev_lvol_destroy_ctx {
544 	struct spdk_lvol *lvol;
545 	spdk_lvol_op_complete cb_fn;
546 	void *cb_arg;
547 };
548 
549 static void
550 _vbdev_lvol_unregister_cb(void *ctx, int lvolerrno)
551 {
552 	struct spdk_bdev *bdev = ctx;
553 
554 	spdk_bdev_destruct_done(bdev, lvolerrno);
555 	free(bdev);
556 }
557 
558 static int
559 vbdev_lvol_unregister(void *ctx)
560 {
561 	struct spdk_lvol *lvol = ctx;
562 
563 	assert(lvol != NULL);
564 
565 	spdk_bdev_alias_del_all(lvol->bdev);
566 	spdk_lvol_close(lvol, _vbdev_lvol_unregister_cb, lvol->bdev);
567 
568 	/* return 1 to indicate we have an operation that must finish asynchronously before the
569 	 *  lvol is closed
570 	 */
571 	return 1;
572 }
573 
574 static void
575 _vbdev_lvol_destroy_cb(void *cb_arg, int bdeverrno)
576 {
577 	struct vbdev_lvol_destroy_ctx *ctx = cb_arg;
578 	struct spdk_lvol *lvol = ctx->lvol;
579 
580 	if (bdeverrno < 0) {
581 		SPDK_INFOLOG(vbdev_lvol, "Could not unregister bdev during lvol (%s) destroy\n",
582 			     lvol->unique_id);
583 		ctx->cb_fn(ctx->cb_arg, bdeverrno);
584 		free(ctx);
585 		return;
586 	}
587 
588 	spdk_lvol_destroy(lvol, ctx->cb_fn, ctx->cb_arg);
589 	free(ctx);
590 }
591 
592 void
593 vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
594 {
595 	struct vbdev_lvol_destroy_ctx *ctx;
596 	size_t count;
597 
598 	assert(lvol != NULL);
599 	assert(cb_fn != NULL);
600 
601 	/* Check if it is possible to delete lvol */
602 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
603 	if (count > 1) {
604 		/* throw an error */
605 		SPDK_ERRLOG("Cannot delete lvol\n");
606 		cb_fn(cb_arg, -EPERM);
607 		return;
608 	}
609 
610 	ctx = calloc(1, sizeof(*ctx));
611 	if (!ctx) {
612 		cb_fn(cb_arg, -ENOMEM);
613 		return;
614 	}
615 
616 	ctx->lvol = lvol;
617 	ctx->cb_fn = cb_fn;
618 	ctx->cb_arg = cb_arg;
619 
620 	spdk_bdev_unregister(lvol->bdev, _vbdev_lvol_destroy_cb, ctx);
621 }
622 
623 static char *
624 vbdev_lvol_find_name(struct spdk_lvol *lvol, spdk_blob_id blob_id)
625 {
626 	struct spdk_lvol_store *lvs;
627 	struct spdk_lvol *_lvol;
628 
629 	assert(lvol != NULL);
630 
631 	lvs = lvol->lvol_store;
632 
633 	assert(lvs);
634 
635 	TAILQ_FOREACH(_lvol, &lvs->lvols, link) {
636 		if (_lvol->blob_id == blob_id) {
637 			return _lvol->name;
638 		}
639 	}
640 
641 	return NULL;
642 }
643 
644 static int
645 vbdev_lvol_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
646 {
647 	struct spdk_lvol *lvol = ctx;
648 	struct lvol_store_bdev *lvs_bdev;
649 	struct spdk_bdev *bdev;
650 	struct spdk_blob *blob;
651 	char lvol_store_uuid[SPDK_UUID_STRING_LEN];
652 	spdk_blob_id *ids = NULL;
653 	size_t count, i;
654 	char *name;
655 	int rc = 0;
656 
657 	spdk_json_write_named_object_begin(w, "lvol");
658 
659 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
660 	if (!lvs_bdev) {
661 		SPDK_ERRLOG("No such lvol store found\n");
662 		rc = -ENODEV;
663 		goto end;
664 	}
665 
666 	bdev = lvs_bdev->bdev;
667 
668 	spdk_uuid_fmt_lower(lvol_store_uuid, sizeof(lvol_store_uuid), &lvol->lvol_store->uuid);
669 	spdk_json_write_named_string(w, "lvol_store_uuid", lvol_store_uuid);
670 
671 	spdk_json_write_named_string(w, "base_bdev", spdk_bdev_get_name(bdev));
672 
673 	blob = lvol->blob;
674 
675 	spdk_json_write_named_bool(w, "thin_provision", spdk_blob_is_thin_provisioned(blob));
676 
677 	spdk_json_write_named_bool(w, "snapshot", spdk_blob_is_snapshot(blob));
678 
679 	spdk_json_write_named_bool(w, "clone", spdk_blob_is_clone(blob));
680 
681 	if (spdk_blob_is_clone(blob)) {
682 		spdk_blob_id snapshotid = spdk_blob_get_parent_snapshot(lvol->lvol_store->blobstore, lvol->blob_id);
683 		if (snapshotid != SPDK_BLOBID_INVALID) {
684 			name = vbdev_lvol_find_name(lvol, snapshotid);
685 			if (name != NULL) {
686 				spdk_json_write_named_string(w, "base_snapshot", name);
687 			} else {
688 				SPDK_ERRLOG("Cannot obtain snapshots name\n");
689 			}
690 		}
691 	}
692 
693 	if (spdk_blob_is_snapshot(blob)) {
694 		/* Take a number of clones */
695 		rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
696 		if (rc == -ENOMEM && count > 0) {
697 			ids = malloc(sizeof(spdk_blob_id) * count);
698 			if (ids == NULL) {
699 				SPDK_ERRLOG("Cannot allocate memory\n");
700 				rc = -ENOMEM;
701 				goto end;
702 			}
703 
704 			rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, ids, &count);
705 			if (rc == 0) {
706 				spdk_json_write_named_array_begin(w, "clones");
707 				for (i = 0; i < count; i++) {
708 					name = vbdev_lvol_find_name(lvol, ids[i]);
709 					if (name != NULL) {
710 						spdk_json_write_string(w, name);
711 					} else {
712 						SPDK_ERRLOG("Cannot obtain clone name\n");
713 					}
714 
715 				}
716 				spdk_json_write_array_end(w);
717 			}
718 			free(ids);
719 		}
720 
721 	}
722 
723 end:
724 	spdk_json_write_object_end(w);
725 
726 	return rc;
727 }
728 
729 static void
730 vbdev_lvol_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
731 {
732 	/* Nothing to dump as lvol configuration is saved on physical device. */
733 }
734 
735 static struct spdk_io_channel *
736 vbdev_lvol_get_io_channel(void *ctx)
737 {
738 	struct spdk_lvol *lvol = ctx;
739 
740 	return spdk_lvol_get_io_channel(lvol);
741 }
742 
743 static bool
744 vbdev_lvol_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
745 {
746 	struct spdk_lvol *lvol = ctx;
747 
748 	switch (io_type) {
749 	case SPDK_BDEV_IO_TYPE_WRITE:
750 	case SPDK_BDEV_IO_TYPE_UNMAP:
751 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
752 		return !spdk_blob_is_read_only(lvol->blob);
753 	case SPDK_BDEV_IO_TYPE_RESET:
754 	case SPDK_BDEV_IO_TYPE_READ:
755 		return true;
756 	default:
757 		return false;
758 	}
759 }
760 
761 static void
762 lvol_op_comp(void *cb_arg, int bserrno)
763 {
764 	struct spdk_bdev_io *bdev_io = cb_arg;
765 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_SUCCESS;
766 
767 	if (bserrno != 0) {
768 		if (bserrno == -ENOMEM) {
769 			status = SPDK_BDEV_IO_STATUS_NOMEM;
770 		} else {
771 			status = SPDK_BDEV_IO_STATUS_FAILED;
772 		}
773 	}
774 
775 	spdk_bdev_io_complete(bdev_io, status);
776 }
777 
778 static void
779 lvol_unmap(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
780 {
781 	uint64_t start_page, num_pages;
782 	struct spdk_blob *blob = lvol->blob;
783 
784 	start_page = bdev_io->u.bdev.offset_blocks;
785 	num_pages = bdev_io->u.bdev.num_blocks;
786 
787 	spdk_blob_io_unmap(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
788 }
789 
790 static void
791 lvol_write_zeroes(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
792 {
793 	uint64_t start_page, num_pages;
794 	struct spdk_blob *blob = lvol->blob;
795 
796 	start_page = bdev_io->u.bdev.offset_blocks;
797 	num_pages = bdev_io->u.bdev.num_blocks;
798 
799 	spdk_blob_io_write_zeroes(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
800 }
801 
802 static void
803 lvol_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
804 {
805 	uint64_t start_page, num_pages;
806 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
807 	struct spdk_blob *blob = lvol->blob;
808 
809 	start_page = bdev_io->u.bdev.offset_blocks;
810 	num_pages = bdev_io->u.bdev.num_blocks;
811 
812 	spdk_blob_io_readv(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
813 			   num_pages, lvol_op_comp, bdev_io);
814 }
815 
816 static void
817 lvol_write(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
818 {
819 	uint64_t start_page, num_pages;
820 	struct spdk_blob *blob = lvol->blob;
821 
822 	start_page = bdev_io->u.bdev.offset_blocks;
823 	num_pages = bdev_io->u.bdev.num_blocks;
824 
825 	spdk_blob_io_writev(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
826 			    num_pages, lvol_op_comp, bdev_io);
827 }
828 
829 static int
830 lvol_reset(struct spdk_bdev_io *bdev_io)
831 {
832 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
833 
834 	return 0;
835 }
836 
837 static void
838 lvol_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
839 {
840 	if (!success) {
841 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
842 		return;
843 	}
844 
845 	lvol_read(ch, bdev_io);
846 }
847 
848 static void
849 vbdev_lvol_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
850 {
851 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
852 
853 	switch (bdev_io->type) {
854 	case SPDK_BDEV_IO_TYPE_READ:
855 		spdk_bdev_io_get_buf(bdev_io, lvol_get_buf_cb,
856 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
857 		break;
858 	case SPDK_BDEV_IO_TYPE_WRITE:
859 		lvol_write(lvol, ch, bdev_io);
860 		break;
861 	case SPDK_BDEV_IO_TYPE_RESET:
862 		lvol_reset(bdev_io);
863 		break;
864 	case SPDK_BDEV_IO_TYPE_UNMAP:
865 		lvol_unmap(lvol, ch, bdev_io);
866 		break;
867 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
868 		lvol_write_zeroes(lvol, ch, bdev_io);
869 		break;
870 	default:
871 		SPDK_INFOLOG(vbdev_lvol, "lvol: unsupported I/O type %d\n", bdev_io->type);
872 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
873 		return;
874 	}
875 	return;
876 }
877 
878 static struct spdk_bdev_fn_table vbdev_lvol_fn_table = {
879 	.destruct		= vbdev_lvol_unregister,
880 	.io_type_supported	= vbdev_lvol_io_type_supported,
881 	.submit_request		= vbdev_lvol_submit_request,
882 	.get_io_channel		= vbdev_lvol_get_io_channel,
883 	.dump_info_json		= vbdev_lvol_dump_info_json,
884 	.write_config_json	= vbdev_lvol_write_config_json,
885 };
886 
887 static void
888 lvol_destroy_cb(void *cb_arg, int bdeverrno)
889 {
890 }
891 
892 static void
893 _create_lvol_disk_destroy_cb(void *cb_arg, int bdeverrno)
894 {
895 	struct spdk_lvol *lvol = cb_arg;
896 
897 	if (bdeverrno < 0) {
898 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
899 			    lvol->unique_id);
900 		return;
901 	}
902 
903 	spdk_lvol_destroy(lvol, lvol_destroy_cb, NULL);
904 }
905 
906 static void
907 _create_lvol_disk_unload_cb(void *cb_arg, int bdeverrno)
908 {
909 	struct spdk_lvol *lvol = cb_arg;
910 
911 	if (bdeverrno < 0) {
912 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
913 			    lvol->unique_id);
914 		return;
915 	}
916 
917 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
918 	free(lvol);
919 }
920 
921 static int
922 _create_lvol_disk(struct spdk_lvol *lvol, bool destroy)
923 {
924 	struct spdk_bdev *bdev;
925 	struct lvol_store_bdev *lvs_bdev;
926 	uint64_t total_size;
927 	unsigned char *alias;
928 	int rc;
929 
930 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
931 	if (lvs_bdev == NULL) {
932 		SPDK_ERRLOG("No spdk lvs-bdev pair found for lvol %s\n", lvol->unique_id);
933 		return -ENODEV;
934 	}
935 
936 	bdev = calloc(1, sizeof(struct spdk_bdev));
937 	if (!bdev) {
938 		SPDK_ERRLOG("Cannot alloc memory for lvol bdev\n");
939 		return -ENOMEM;
940 	}
941 
942 	bdev->name = lvol->unique_id;
943 	bdev->product_name = "Logical Volume";
944 	bdev->blocklen = spdk_bs_get_io_unit_size(lvol->lvol_store->blobstore);
945 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
946 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
947 	assert((total_size % bdev->blocklen) == 0);
948 	bdev->blockcnt = total_size / bdev->blocklen;
949 	bdev->uuid = lvol->uuid;
950 	bdev->required_alignment = lvs_bdev->bdev->required_alignment;
951 	bdev->split_on_optimal_io_boundary = true;
952 	bdev->optimal_io_boundary = spdk_bs_get_cluster_size(lvol->lvol_store->blobstore) / bdev->blocklen;
953 
954 	bdev->ctxt = lvol;
955 	bdev->fn_table = &vbdev_lvol_fn_table;
956 	bdev->module = &g_lvol_if;
957 
958 	rc = spdk_bdev_register(bdev);
959 	if (rc) {
960 		free(bdev);
961 		return rc;
962 	}
963 	lvol->bdev = bdev;
964 
965 	alias = spdk_sprintf_alloc("%s/%s", lvs_bdev->lvs->name, lvol->name);
966 	if (alias == NULL) {
967 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
968 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
969 						  _create_lvol_disk_unload_cb), lvol);
970 		return -ENOMEM;
971 	}
972 
973 	rc = spdk_bdev_alias_add(bdev, alias);
974 	if (rc != 0) {
975 		SPDK_ERRLOG("Cannot add alias to lvol bdev\n");
976 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
977 						  _create_lvol_disk_unload_cb), lvol);
978 	}
979 	free(alias);
980 
981 	return rc;
982 }
983 
984 static void
985 _vbdev_lvol_create_cb(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
986 {
987 	struct spdk_lvol_with_handle_req *req = cb_arg;
988 
989 	if (lvolerrno < 0) {
990 		goto end;
991 	}
992 
993 	lvolerrno = _create_lvol_disk(lvol, true);
994 
995 end:
996 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
997 	free(req);
998 }
999 
1000 int
1001 vbdev_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1002 		  bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1003 		  void *cb_arg)
1004 {
1005 	struct spdk_lvol_with_handle_req *req;
1006 	int rc;
1007 
1008 	req = calloc(1, sizeof(*req));
1009 	if (req == NULL) {
1010 		return -ENOMEM;
1011 	}
1012 	req->cb_fn = cb_fn;
1013 	req->cb_arg = cb_arg;
1014 
1015 	rc = spdk_lvol_create(lvs, name, sz, thin_provision, clear_method,
1016 			      _vbdev_lvol_create_cb, req);
1017 	if (rc != 0) {
1018 		free(req);
1019 	}
1020 
1021 	return rc;
1022 }
1023 
1024 void
1025 vbdev_lvol_create_snapshot(struct spdk_lvol *lvol, const char *snapshot_name,
1026 			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1027 {
1028 	struct spdk_lvol_with_handle_req *req;
1029 
1030 	req = calloc(1, sizeof(*req));
1031 	if (req == NULL) {
1032 		cb_fn(cb_arg, NULL, -ENOMEM);
1033 		return;
1034 	}
1035 
1036 	req->cb_fn = cb_fn;
1037 	req->cb_arg = cb_arg;
1038 
1039 	spdk_lvol_create_snapshot(lvol, snapshot_name, _vbdev_lvol_create_cb, req);
1040 }
1041 
1042 void
1043 vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
1044 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1045 {
1046 	struct spdk_lvol_with_handle_req *req;
1047 
1048 	req = calloc(1, sizeof(*req));
1049 	if (req == NULL) {
1050 		cb_fn(cb_arg, NULL, -ENOMEM);
1051 		return;
1052 	}
1053 
1054 	req->cb_fn = cb_fn;
1055 	req->cb_arg = cb_arg;
1056 
1057 	spdk_lvol_create_clone(lvol, clone_name, _vbdev_lvol_create_cb, req);
1058 }
1059 
1060 static void
1061 _vbdev_lvol_rename_cb(void *cb_arg, int lvolerrno)
1062 {
1063 	struct spdk_lvol_req *req = cb_arg;
1064 
1065 	if (lvolerrno != 0) {
1066 		SPDK_ERRLOG("Renaming lvol failed\n");
1067 	}
1068 
1069 	req->cb_fn(req->cb_arg, lvolerrno);
1070 	free(req);
1071 }
1072 
1073 void
1074 vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
1075 		  spdk_lvol_op_complete cb_fn, void *cb_arg)
1076 {
1077 	struct spdk_lvol_req *req;
1078 	int rc;
1079 
1080 	rc = _vbdev_lvol_change_bdev_alias(lvol, new_lvol_name);
1081 	if (rc != 0) {
1082 		SPDK_ERRLOG("renaming lvol to '%s' does not succeed\n", new_lvol_name);
1083 		cb_fn(cb_arg, rc);
1084 		return;
1085 	}
1086 
1087 	req = calloc(1, sizeof(*req));
1088 	if (req == NULL) {
1089 		cb_fn(cb_arg, -ENOMEM);
1090 		return;
1091 	}
1092 	req->cb_fn = cb_fn;
1093 	req->cb_arg = cb_arg;
1094 
1095 	spdk_lvol_rename(lvol, new_lvol_name, _vbdev_lvol_rename_cb, req);
1096 }
1097 
1098 static void
1099 _vbdev_lvol_resize_cb(void *cb_arg, int lvolerrno)
1100 {
1101 	struct spdk_lvol_req *req = cb_arg;
1102 	struct spdk_lvol *lvol = req->lvol;
1103 	uint64_t total_size;
1104 
1105 	/* change bdev size */
1106 	if (lvolerrno != 0) {
1107 		SPDK_ERRLOG("CB function for bdev lvol %s receive error no: %d.\n", lvol->name, lvolerrno);
1108 		goto finish;
1109 	}
1110 
1111 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1112 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1113 	assert((total_size % lvol->bdev->blocklen) == 0);
1114 
1115 	lvolerrno = spdk_bdev_notify_blockcnt_change(lvol->bdev, total_size / lvol->bdev->blocklen);
1116 	if (lvolerrno != 0) {
1117 		SPDK_ERRLOG("Could not change num blocks for bdev lvol %s with error no: %d.\n",
1118 			    lvol->name, lvolerrno);
1119 	}
1120 
1121 finish:
1122 	req->cb_fn(req->cb_arg, lvolerrno);
1123 	free(req);
1124 }
1125 
1126 void
1127 vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn, void *cb_arg)
1128 {
1129 	struct spdk_lvol_req *req;
1130 
1131 	if (lvol == NULL) {
1132 		SPDK_ERRLOG("lvol does not exist\n");
1133 		cb_fn(cb_arg, -EINVAL);
1134 		return;
1135 	}
1136 
1137 	assert(lvol->bdev != NULL);
1138 
1139 	req = calloc(1, sizeof(*req));
1140 	if (req == NULL) {
1141 		cb_fn(cb_arg, -ENOMEM);
1142 		return;
1143 	}
1144 
1145 	req->cb_fn = cb_fn;
1146 	req->cb_arg = cb_arg;
1147 	req->sz = sz;
1148 	req->lvol = lvol;
1149 
1150 	spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
1151 }
1152 
1153 static void
1154 _vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1155 {
1156 	struct spdk_lvol_req *req = cb_arg;
1157 	struct spdk_lvol *lvol = req->lvol;
1158 
1159 	if (lvolerrno != 0) {
1160 		SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
1161 	}
1162 
1163 	req->cb_fn(req->cb_arg, lvolerrno);
1164 	free(req);
1165 }
1166 
1167 void
1168 vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1169 {
1170 	struct spdk_lvol_req *req;
1171 
1172 	if (lvol == NULL) {
1173 		SPDK_ERRLOG("lvol does not exist\n");
1174 		cb_fn(cb_arg, -EINVAL);
1175 		return;
1176 	}
1177 
1178 	assert(lvol->bdev != NULL);
1179 
1180 	req = calloc(1, sizeof(*req));
1181 	if (req == NULL) {
1182 		cb_fn(cb_arg, -ENOMEM);
1183 		return;
1184 	}
1185 
1186 	req->cb_fn = cb_fn;
1187 	req->cb_arg = cb_arg;
1188 	req->lvol = lvol;
1189 
1190 	spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
1191 }
1192 
1193 static int
1194 vbdev_lvs_init(void)
1195 {
1196 	return 0;
1197 }
1198 
1199 static int
1200 vbdev_lvs_get_ctx_size(void)
1201 {
1202 	return 0;
1203 }
1204 
1205 static void
1206 _vbdev_lvs_examine_failed(void *cb_arg, int lvserrno)
1207 {
1208 	spdk_bdev_module_examine_done(&g_lvol_if);
1209 }
1210 
1211 static void
1212 _vbdev_lvol_examine_close_cb(struct spdk_lvol_store *lvs)
1213 {
1214 	if (lvs->lvols_opened >= lvs->lvol_count) {
1215 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1216 		spdk_bdev_module_examine_done(&g_lvol_if);
1217 	}
1218 }
1219 
1220 static void
1221 _vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1222 {
1223 	struct spdk_lvol_store *lvs = cb_arg;
1224 
1225 	if (lvolerrno != 0) {
1226 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1227 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1228 		lvs->lvol_count--;
1229 		free(lvol);
1230 		goto end;
1231 	}
1232 
1233 	if (_create_lvol_disk(lvol, false)) {
1234 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1235 		lvs->lvol_count--;
1236 		_vbdev_lvol_examine_close_cb(lvs);
1237 		SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s failed\n", lvol->unique_id);
1238 		return;
1239 	}
1240 
1241 	lvs->lvols_opened++;
1242 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1243 
1244 end:
1245 
1246 	if (lvs->lvols_opened >= lvs->lvol_count) {
1247 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1248 		spdk_bdev_module_examine_done(&g_lvol_if);
1249 	}
1250 }
1251 
1252 static void
1253 _vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1254 {
1255 	struct lvol_store_bdev *lvs_bdev;
1256 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1257 	struct spdk_lvol *lvol, *tmp;
1258 
1259 	if (lvserrno == -EEXIST) {
1260 		SPDK_INFOLOG(vbdev_lvol,
1261 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1262 			     req->base_bdev->name);
1263 		/* On error blobstore destroys bs_dev itself */
1264 		spdk_bdev_module_examine_done(&g_lvol_if);
1265 		goto end;
1266 	} else if (lvserrno != 0) {
1267 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1268 		/* On error blobstore destroys bs_dev itself */
1269 		spdk_bdev_module_examine_done(&g_lvol_if);
1270 		goto end;
1271 	}
1272 
1273 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1274 	if (lvserrno != 0) {
1275 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1276 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1277 		goto end;
1278 	}
1279 
1280 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1281 	if (!lvs_bdev) {
1282 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1283 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1284 		goto end;
1285 	}
1286 
1287 	lvs_bdev->lvs = lvol_store;
1288 	lvs_bdev->bdev = req->base_bdev;
1289 
1290 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1291 
1292 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1293 		     req->base_bdev->name);
1294 
1295 	lvol_store->lvols_opened = 0;
1296 
1297 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1298 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1299 		spdk_bdev_module_examine_done(&g_lvol_if);
1300 	} else {
1301 		/* Open all lvols */
1302 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1303 			spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, lvol_store);
1304 		}
1305 	}
1306 
1307 end:
1308 	free(req);
1309 }
1310 
1311 static void
1312 vbdev_lvs_examine(struct spdk_bdev *bdev)
1313 {
1314 	struct spdk_bs_dev *bs_dev;
1315 	struct spdk_lvs_with_handle_req *req;
1316 	int rc;
1317 
1318 	req = calloc(1, sizeof(*req));
1319 	if (req == NULL) {
1320 		spdk_bdev_module_examine_done(&g_lvol_if);
1321 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1322 		return;
1323 	}
1324 
1325 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1326 					 NULL, &bs_dev);
1327 	if (rc < 0) {
1328 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1329 		spdk_bdev_module_examine_done(&g_lvol_if);
1330 		free(req);
1331 		return;
1332 	}
1333 
1334 	req->base_bdev = bdev;
1335 
1336 	spdk_lvs_load(bs_dev, _vbdev_lvs_examine_cb, req);
1337 }
1338 
1339 struct spdk_lvol *
1340 vbdev_lvol_get_from_bdev(struct spdk_bdev *bdev)
1341 {
1342 	if (!bdev || bdev->module != &g_lvol_if) {
1343 		return NULL;
1344 	}
1345 
1346 	if (bdev->ctxt == NULL) {
1347 		SPDK_ERRLOG("No lvol ctx assigned to bdev %s\n", bdev->name);
1348 		return NULL;
1349 	}
1350 
1351 	return (struct spdk_lvol *)bdev->ctxt;
1352 }
1353 
1354 SPDK_LOG_REGISTER_COMPONENT(vbdev_lvol)
1355