xref: /spdk/module/bdev/lvol/vbdev_lvol.c (revision d919a197d60e407aa1137d7512f8b0af92f3d593)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/blob_bdev.h"
35 #include "spdk/rpc.h"
36 #include "spdk/bdev_module.h"
37 #include "spdk/log.h"
38 #include "spdk/string.h"
39 #include "spdk/uuid.h"
40 
41 #include "vbdev_lvol.h"
42 
43 static TAILQ_HEAD(, lvol_store_bdev) g_spdk_lvol_pairs = TAILQ_HEAD_INITIALIZER(
44 			g_spdk_lvol_pairs);
45 
46 static int vbdev_lvs_init(void);
47 static void vbdev_lvs_fini_start(void);
48 static int vbdev_lvs_get_ctx_size(void);
49 static void vbdev_lvs_examine(struct spdk_bdev *bdev);
50 static bool g_shutdown_started = false;
51 
52 static struct spdk_bdev_module g_lvol_if = {
53 	.name = "lvol",
54 	.module_init = vbdev_lvs_init,
55 	.fini_start = vbdev_lvs_fini_start,
56 	.async_fini_start = true,
57 	.examine_disk = vbdev_lvs_examine,
58 	.get_ctx_size = vbdev_lvs_get_ctx_size,
59 
60 };
61 
62 SPDK_BDEV_MODULE_REGISTER(lvol, &g_lvol_if)
63 
64 struct lvol_store_bdev *
65 vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig)
66 {
67 	struct spdk_lvol_store *lvs = NULL;
68 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
69 
70 	while (lvs_bdev != NULL) {
71 		lvs = lvs_bdev->lvs;
72 		if (lvs == lvs_orig) {
73 			if (lvs_bdev->req != NULL) {
74 				/* We do not allow access to lvs that are being destroyed */
75 				return NULL;
76 			} else {
77 				return lvs_bdev;
78 			}
79 		}
80 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
81 	}
82 
83 	return NULL;
84 }
85 
86 static int
87 _vbdev_lvol_change_bdev_alias(struct spdk_lvol *lvol, const char *new_lvol_name)
88 {
89 	struct spdk_bdev_alias *tmp;
90 	char *old_alias;
91 	char *alias;
92 	int rc;
93 	int alias_number = 0;
94 
95 	/* bdev representing lvols have only one alias,
96 	 * while we changed lvs name earlier, we have to iterate alias list to get one,
97 	 * and check if there is only one alias */
98 
99 	TAILQ_FOREACH(tmp, spdk_bdev_get_aliases(lvol->bdev), tailq) {
100 		if (++alias_number > 1) {
101 			SPDK_ERRLOG("There is more than 1 alias in bdev %s\n", lvol->bdev->name);
102 			return -EINVAL;
103 		}
104 
105 		old_alias = tmp->alias.name;
106 	}
107 
108 	if (alias_number == 0) {
109 		SPDK_ERRLOG("There are no aliases in bdev %s\n", lvol->bdev->name);
110 		return -EINVAL;
111 	}
112 
113 	alias = spdk_sprintf_alloc("%s/%s", lvol->lvol_store->name, new_lvol_name);
114 	if (alias == NULL) {
115 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
116 		return -ENOMEM;
117 	}
118 
119 	rc = spdk_bdev_alias_add(lvol->bdev, alias);
120 	if (rc != 0) {
121 		SPDK_ERRLOG("cannot add alias '%s'\n", alias);
122 		free(alias);
123 		return rc;
124 	}
125 	free(alias);
126 
127 	rc = spdk_bdev_alias_del(lvol->bdev, old_alias);
128 	if (rc != 0) {
129 		SPDK_ERRLOG("cannot remove alias '%s'\n", old_alias);
130 		return rc;
131 	}
132 
133 	return 0;
134 }
135 
136 static struct lvol_store_bdev *
137 vbdev_get_lvs_bdev_by_bdev(struct spdk_bdev *bdev_orig)
138 {
139 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
140 
141 	while (lvs_bdev != NULL) {
142 		if (lvs_bdev->bdev == bdev_orig) {
143 			if (lvs_bdev->req != NULL) {
144 				/* We do not allow access to lvs that are being destroyed */
145 				return NULL;
146 			} else {
147 				return lvs_bdev;
148 			}
149 		}
150 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
151 	}
152 
153 	return NULL;
154 }
155 
156 static void
157 vbdev_lvs_hotremove_cb(struct spdk_bdev *bdev)
158 {
159 	struct lvol_store_bdev *lvs_bdev;
160 
161 	lvs_bdev = vbdev_get_lvs_bdev_by_bdev(bdev);
162 	if (lvs_bdev != NULL) {
163 		vbdev_lvs_unload(lvs_bdev->lvs, NULL, NULL);
164 	}
165 }
166 
167 static void
168 vbdev_lvs_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
169 			     void *event_ctx)
170 {
171 	switch (type) {
172 	case SPDK_BDEV_EVENT_REMOVE:
173 		vbdev_lvs_hotremove_cb(bdev);
174 		break;
175 	default:
176 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
177 		break;
178 	}
179 }
180 
181 static void
182 _vbdev_lvs_create_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
183 {
184 	struct spdk_lvs_with_handle_req *req = cb_arg;
185 	struct lvol_store_bdev *lvs_bdev;
186 	struct spdk_bdev *bdev = req->base_bdev;
187 	struct spdk_bs_dev *bs_dev = req->bs_dev;
188 
189 	if (lvserrno != 0) {
190 		assert(lvs == NULL);
191 		SPDK_ERRLOG("Cannot create lvol store bdev\n");
192 		goto end;
193 	}
194 
195 	lvserrno = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
196 	if (lvserrno != 0) {
197 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
198 		req->bs_dev->destroy(req->bs_dev);
199 		goto end;
200 	}
201 
202 	assert(lvs != NULL);
203 
204 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
205 	if (!lvs_bdev) {
206 		lvserrno = -ENOMEM;
207 		goto end;
208 	}
209 	lvs_bdev->lvs = lvs;
210 	lvs_bdev->bdev = bdev;
211 	lvs_bdev->req = NULL;
212 
213 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
214 	SPDK_INFOLOG(vbdev_lvol, "Lvol store bdev inserted\n");
215 
216 end:
217 	req->cb_fn(req->cb_arg, lvs, lvserrno);
218 	free(req);
219 
220 	return;
221 }
222 
223 int
224 vbdev_lvs_create(const char *base_bdev_name, const char *name, uint32_t cluster_sz,
225 		 enum lvs_clear_method clear_method, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
226 {
227 	struct spdk_bs_dev *bs_dev;
228 	struct spdk_lvs_with_handle_req *lvs_req;
229 	struct spdk_lvs_opts opts;
230 	int rc;
231 	int len;
232 
233 	if (base_bdev_name == NULL) {
234 		SPDK_ERRLOG("missing base_bdev_name param\n");
235 		return -EINVAL;
236 	}
237 
238 	spdk_lvs_opts_init(&opts);
239 	if (cluster_sz != 0) {
240 		opts.cluster_sz = cluster_sz;
241 	}
242 
243 	if (clear_method != 0) {
244 		opts.clear_method = clear_method;
245 	}
246 
247 	if (name == NULL) {
248 		SPDK_ERRLOG("missing name param\n");
249 		return -EINVAL;
250 	}
251 
252 	len = strnlen(name, SPDK_LVS_NAME_MAX);
253 
254 	if (len == 0 || len == SPDK_LVS_NAME_MAX) {
255 		SPDK_ERRLOG("name must be between 1 and %d characters\n", SPDK_LVS_NAME_MAX - 1);
256 		return -EINVAL;
257 	}
258 	snprintf(opts.name, sizeof(opts.name), "%s", name);
259 
260 	lvs_req = calloc(1, sizeof(*lvs_req));
261 	if (!lvs_req) {
262 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
263 		return -ENOMEM;
264 	}
265 
266 	rc = spdk_bdev_create_bs_dev_ext(base_bdev_name, vbdev_lvs_base_bdev_event_cb,
267 					 NULL, &bs_dev);
268 	if (rc < 0) {
269 		SPDK_ERRLOG("Cannot create blobstore device\n");
270 		free(lvs_req);
271 		return rc;
272 	}
273 
274 	lvs_req->bs_dev = bs_dev;
275 	lvs_req->base_bdev = bs_dev->get_base_bdev(bs_dev);
276 	lvs_req->cb_fn = cb_fn;
277 	lvs_req->cb_arg = cb_arg;
278 
279 	rc = spdk_lvs_init(bs_dev, &opts, _vbdev_lvs_create_cb, lvs_req);
280 	if (rc < 0) {
281 		free(lvs_req);
282 		bs_dev->destroy(bs_dev);
283 		return rc;
284 	}
285 
286 	return 0;
287 }
288 
289 static void
290 _vbdev_lvs_rename_cb(void *cb_arg, int lvserrno)
291 {
292 	struct spdk_lvs_req *req = cb_arg;
293 	struct spdk_lvol *tmp;
294 
295 	if (lvserrno != 0) {
296 		SPDK_INFOLOG(vbdev_lvol, "Lvol store rename failed\n");
297 	} else {
298 		TAILQ_FOREACH(tmp, &req->lvol_store->lvols, link) {
299 			/* We have to pass current lvol name, since only lvs name changed */
300 			_vbdev_lvol_change_bdev_alias(tmp, tmp->name);
301 		}
302 	}
303 
304 	req->cb_fn(req->cb_arg, lvserrno);
305 	free(req);
306 }
307 
308 void
309 vbdev_lvs_rename(struct spdk_lvol_store *lvs, const char *new_lvs_name,
310 		 spdk_lvs_op_complete cb_fn, void *cb_arg)
311 {
312 	struct lvol_store_bdev *lvs_bdev;
313 
314 	struct spdk_lvs_req *req;
315 
316 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
317 	if (!lvs_bdev) {
318 		SPDK_ERRLOG("No such lvol store found\n");
319 		cb_fn(cb_arg, -ENODEV);
320 		return;
321 	}
322 
323 	req = calloc(1, sizeof(*req));
324 	if (!req) {
325 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
326 		cb_fn(cb_arg, -ENOMEM);
327 		return;
328 	}
329 	req->cb_fn = cb_fn;
330 	req->cb_arg = cb_arg;
331 	req->lvol_store = lvs;
332 
333 	spdk_lvs_rename(lvs, new_lvs_name, _vbdev_lvs_rename_cb, req);
334 }
335 
336 static void
337 _vbdev_lvs_remove_cb(void *cb_arg, int lvserrno)
338 {
339 	struct lvol_store_bdev *lvs_bdev = cb_arg;
340 	struct spdk_lvs_req *req = lvs_bdev->req;
341 
342 	if (lvserrno != 0) {
343 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
344 	}
345 
346 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
347 	free(lvs_bdev);
348 
349 	if (req->cb_fn != NULL) {
350 		req->cb_fn(req->cb_arg, lvserrno);
351 	}
352 	free(req);
353 }
354 
355 static void
356 _vbdev_lvs_remove_lvol_cb(void *cb_arg, int lvolerrno)
357 {
358 	struct lvol_store_bdev *lvs_bdev = cb_arg;
359 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
360 	struct spdk_lvol *lvol;
361 
362 	if (lvolerrno != 0) {
363 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol removed with errno %d\n", lvolerrno);
364 	}
365 
366 	if (TAILQ_EMPTY(&lvs->lvols)) {
367 		spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
368 		return;
369 	}
370 
371 	lvol = TAILQ_FIRST(&lvs->lvols);
372 	while (lvol != NULL) {
373 		if (spdk_lvol_deletable(lvol)) {
374 			vbdev_lvol_destroy(lvol, _vbdev_lvs_remove_lvol_cb, lvs_bdev);
375 			return;
376 		}
377 		lvol = TAILQ_NEXT(lvol, link);
378 	}
379 
380 	/* If no lvol is deletable, that means there is circular dependency. */
381 	SPDK_ERRLOG("Lvols left in lvs, but unable to delete.\n");
382 	assert(false);
383 }
384 
385 static bool
386 _vbdev_lvs_are_lvols_closed(struct spdk_lvol_store *lvs)
387 {
388 	struct spdk_lvol *lvol;
389 
390 	TAILQ_FOREACH(lvol, &lvs->lvols, link) {
391 		if (lvol->ref_count != 0) {
392 			return false;
393 		}
394 	}
395 	return true;
396 }
397 
398 static void
399 _vbdev_lvs_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
400 {
401 	struct lvol_store_bdev *lvs_bdev = cb_arg;
402 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
403 
404 	if (bdeverrno != 0) {
405 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
406 	}
407 
408 	/* Lvol store can be unloaded once all lvols are closed. */
409 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
410 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
411 	}
412 }
413 
414 static void
415 _vbdev_lvs_remove(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg,
416 		  bool destroy)
417 {
418 	struct spdk_lvs_req *req;
419 	struct lvol_store_bdev *lvs_bdev;
420 	struct spdk_lvol *lvol, *tmp;
421 
422 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
423 	if (!lvs_bdev) {
424 		SPDK_ERRLOG("No such lvol store found\n");
425 		if (cb_fn != NULL) {
426 			cb_fn(cb_arg, -ENODEV);
427 		}
428 		return;
429 	}
430 
431 	req = calloc(1, sizeof(*req));
432 	if (!req) {
433 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
434 		if (cb_fn != NULL) {
435 			cb_fn(cb_arg, -ENOMEM);
436 		}
437 		return;
438 	}
439 
440 	req->cb_fn = cb_fn;
441 	req->cb_arg = cb_arg;
442 	lvs_bdev->req = req;
443 
444 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
445 		if (destroy) {
446 			spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
447 		} else {
448 			spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
449 		}
450 	} else {
451 		lvs->destruct = destroy;
452 		if (destroy) {
453 			_vbdev_lvs_remove_lvol_cb(lvs_bdev, 0);
454 		} else {
455 			TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
456 				spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
457 			}
458 		}
459 	}
460 }
461 
462 void
463 vbdev_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
464 {
465 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, false);
466 }
467 
468 void
469 vbdev_lvs_destruct(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
470 {
471 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, true);
472 }
473 
474 struct lvol_store_bdev *
475 vbdev_lvol_store_first(void)
476 {
477 	struct lvol_store_bdev *lvs_bdev;
478 
479 	lvs_bdev = TAILQ_FIRST(&g_spdk_lvol_pairs);
480 	if (lvs_bdev) {
481 		SPDK_INFOLOG(vbdev_lvol, "Starting lvolstore iteration at %p\n", lvs_bdev->lvs);
482 	}
483 
484 	return lvs_bdev;
485 }
486 
487 struct lvol_store_bdev *
488 vbdev_lvol_store_next(struct lvol_store_bdev *prev)
489 {
490 	struct lvol_store_bdev *lvs_bdev;
491 
492 	if (prev == NULL) {
493 		SPDK_ERRLOG("prev argument cannot be NULL\n");
494 		return NULL;
495 	}
496 
497 	lvs_bdev = TAILQ_NEXT(prev, lvol_stores);
498 	if (lvs_bdev) {
499 		SPDK_INFOLOG(vbdev_lvol, "Continuing lvolstore iteration at %p\n", lvs_bdev->lvs);
500 	}
501 
502 	return lvs_bdev;
503 }
504 
505 static struct spdk_lvol_store *
506 _vbdev_get_lvol_store_by_uuid(const struct spdk_uuid *uuid)
507 {
508 	struct spdk_lvol_store *lvs = NULL;
509 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
510 
511 	while (lvs_bdev != NULL) {
512 		lvs = lvs_bdev->lvs;
513 		if (spdk_uuid_compare(&lvs->uuid, uuid) == 0) {
514 			return lvs;
515 		}
516 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
517 	}
518 	return NULL;
519 }
520 
521 struct spdk_lvol_store *
522 vbdev_get_lvol_store_by_uuid(const char *uuid_str)
523 {
524 	struct spdk_uuid uuid;
525 
526 	if (spdk_uuid_parse(&uuid, uuid_str)) {
527 		return NULL;
528 	}
529 
530 	return _vbdev_get_lvol_store_by_uuid(&uuid);
531 }
532 
533 struct spdk_lvol_store *
534 vbdev_get_lvol_store_by_name(const char *name)
535 {
536 	struct spdk_lvol_store *lvs = NULL;
537 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
538 
539 	while (lvs_bdev != NULL) {
540 		lvs = lvs_bdev->lvs;
541 		if (strncmp(lvs->name, name, sizeof(lvs->name)) == 0) {
542 			return lvs;
543 		}
544 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
545 	}
546 	return NULL;
547 }
548 
549 struct vbdev_lvol_destroy_ctx {
550 	struct spdk_lvol *lvol;
551 	spdk_lvol_op_complete cb_fn;
552 	void *cb_arg;
553 };
554 
555 static void
556 _vbdev_lvol_unregister_unload_lvs(void *cb_arg, int lvserrno)
557 {
558 	struct lvol_bdev *lvol_bdev = cb_arg;
559 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
560 
561 	if (lvserrno != 0) {
562 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
563 	}
564 
565 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
566 	free(lvs_bdev);
567 
568 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvserrno);
569 	free(lvol_bdev);
570 }
571 
572 static void
573 _vbdev_lvol_unregister_cb(void *ctx, int lvolerrno)
574 {
575 	struct lvol_bdev *lvol_bdev = ctx;
576 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
577 
578 	if (g_shutdown_started && _vbdev_lvs_are_lvols_closed(lvs_bdev->lvs)) {
579 		spdk_lvs_unload(lvs_bdev->lvs, _vbdev_lvol_unregister_unload_lvs, lvol_bdev);
580 		return;
581 	}
582 
583 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvolerrno);
584 	free(lvol_bdev);
585 }
586 
587 static int
588 vbdev_lvol_unregister(void *ctx)
589 {
590 	struct spdk_lvol *lvol = ctx;
591 	struct lvol_bdev *lvol_bdev;
592 
593 	assert(lvol != NULL);
594 	lvol_bdev = SPDK_CONTAINEROF(lvol->bdev, struct lvol_bdev, bdev);
595 
596 	spdk_bdev_alias_del_all(lvol->bdev);
597 	spdk_lvol_close(lvol, _vbdev_lvol_unregister_cb, lvol_bdev);
598 
599 	/* return 1 to indicate we have an operation that must finish asynchronously before the
600 	 *  lvol is closed
601 	 */
602 	return 1;
603 }
604 
605 static void
606 _vbdev_lvol_destroy_cb(void *cb_arg, int bdeverrno)
607 {
608 	struct vbdev_lvol_destroy_ctx *ctx = cb_arg;
609 	struct spdk_lvol *lvol = ctx->lvol;
610 
611 	if (bdeverrno < 0) {
612 		SPDK_INFOLOG(vbdev_lvol, "Could not unregister bdev during lvol (%s) destroy\n",
613 			     lvol->unique_id);
614 		ctx->cb_fn(ctx->cb_arg, bdeverrno);
615 		free(ctx);
616 		return;
617 	}
618 
619 	spdk_lvol_destroy(lvol, ctx->cb_fn, ctx->cb_arg);
620 	free(ctx);
621 }
622 
623 void
624 vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
625 {
626 	struct vbdev_lvol_destroy_ctx *ctx;
627 	size_t count;
628 
629 	assert(lvol != NULL);
630 	assert(cb_fn != NULL);
631 
632 	/* Check if it is possible to delete lvol */
633 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
634 	if (count > 1) {
635 		/* throw an error */
636 		SPDK_ERRLOG("Cannot delete lvol\n");
637 		cb_fn(cb_arg, -EPERM);
638 		return;
639 	}
640 
641 	ctx = calloc(1, sizeof(*ctx));
642 	if (!ctx) {
643 		cb_fn(cb_arg, -ENOMEM);
644 		return;
645 	}
646 
647 	ctx->lvol = lvol;
648 	ctx->cb_fn = cb_fn;
649 	ctx->cb_arg = cb_arg;
650 
651 	spdk_bdev_unregister(lvol->bdev, _vbdev_lvol_destroy_cb, ctx);
652 }
653 
654 static char *
655 vbdev_lvol_find_name(struct spdk_lvol *lvol, spdk_blob_id blob_id)
656 {
657 	struct spdk_lvol_store *lvs;
658 	struct spdk_lvol *_lvol;
659 
660 	assert(lvol != NULL);
661 
662 	lvs = lvol->lvol_store;
663 
664 	assert(lvs);
665 
666 	TAILQ_FOREACH(_lvol, &lvs->lvols, link) {
667 		if (_lvol->blob_id == blob_id) {
668 			return _lvol->name;
669 		}
670 	}
671 
672 	return NULL;
673 }
674 
675 static int
676 vbdev_lvol_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
677 {
678 	struct spdk_lvol *lvol = ctx;
679 	struct lvol_store_bdev *lvs_bdev;
680 	struct spdk_bdev *bdev;
681 	struct spdk_blob *blob;
682 	char lvol_store_uuid[SPDK_UUID_STRING_LEN];
683 	spdk_blob_id *ids = NULL;
684 	size_t count, i;
685 	char *name;
686 	int rc = 0;
687 
688 	spdk_json_write_named_object_begin(w, "lvol");
689 
690 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
691 	if (!lvs_bdev) {
692 		SPDK_ERRLOG("No such lvol store found\n");
693 		rc = -ENODEV;
694 		goto end;
695 	}
696 
697 	bdev = lvs_bdev->bdev;
698 
699 	spdk_uuid_fmt_lower(lvol_store_uuid, sizeof(lvol_store_uuid), &lvol->lvol_store->uuid);
700 	spdk_json_write_named_string(w, "lvol_store_uuid", lvol_store_uuid);
701 
702 	spdk_json_write_named_string(w, "base_bdev", spdk_bdev_get_name(bdev));
703 
704 	blob = lvol->blob;
705 
706 	spdk_json_write_named_bool(w, "thin_provision", spdk_blob_is_thin_provisioned(blob));
707 
708 	spdk_json_write_named_bool(w, "snapshot", spdk_blob_is_snapshot(blob));
709 
710 	spdk_json_write_named_bool(w, "clone", spdk_blob_is_clone(blob));
711 
712 	if (spdk_blob_is_clone(blob)) {
713 		spdk_blob_id snapshotid = spdk_blob_get_parent_snapshot(lvol->lvol_store->blobstore, lvol->blob_id);
714 		if (snapshotid != SPDK_BLOBID_INVALID) {
715 			name = vbdev_lvol_find_name(lvol, snapshotid);
716 			if (name != NULL) {
717 				spdk_json_write_named_string(w, "base_snapshot", name);
718 			} else {
719 				SPDK_ERRLOG("Cannot obtain snapshots name\n");
720 			}
721 		}
722 	}
723 
724 	if (spdk_blob_is_snapshot(blob)) {
725 		/* Take a number of clones */
726 		rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
727 		if (rc == -ENOMEM && count > 0) {
728 			ids = malloc(sizeof(spdk_blob_id) * count);
729 			if (ids == NULL) {
730 				SPDK_ERRLOG("Cannot allocate memory\n");
731 				rc = -ENOMEM;
732 				goto end;
733 			}
734 
735 			rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, ids, &count);
736 			if (rc == 0) {
737 				spdk_json_write_named_array_begin(w, "clones");
738 				for (i = 0; i < count; i++) {
739 					name = vbdev_lvol_find_name(lvol, ids[i]);
740 					if (name != NULL) {
741 						spdk_json_write_string(w, name);
742 					} else {
743 						SPDK_ERRLOG("Cannot obtain clone name\n");
744 					}
745 
746 				}
747 				spdk_json_write_array_end(w);
748 			}
749 			free(ids);
750 		}
751 
752 	}
753 
754 end:
755 	spdk_json_write_object_end(w);
756 
757 	return rc;
758 }
759 
760 static void
761 vbdev_lvol_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
762 {
763 	/* Nothing to dump as lvol configuration is saved on physical device. */
764 }
765 
766 static struct spdk_io_channel *
767 vbdev_lvol_get_io_channel(void *ctx)
768 {
769 	struct spdk_lvol *lvol = ctx;
770 
771 	return spdk_lvol_get_io_channel(lvol);
772 }
773 
774 static bool
775 vbdev_lvol_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
776 {
777 	struct spdk_lvol *lvol = ctx;
778 
779 	switch (io_type) {
780 	case SPDK_BDEV_IO_TYPE_WRITE:
781 	case SPDK_BDEV_IO_TYPE_UNMAP:
782 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
783 		return !spdk_blob_is_read_only(lvol->blob);
784 	case SPDK_BDEV_IO_TYPE_RESET:
785 	case SPDK_BDEV_IO_TYPE_READ:
786 		return true;
787 	default:
788 		return false;
789 	}
790 }
791 
792 static void
793 lvol_op_comp(void *cb_arg, int bserrno)
794 {
795 	struct spdk_bdev_io *bdev_io = cb_arg;
796 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_SUCCESS;
797 
798 	if (bserrno != 0) {
799 		if (bserrno == -ENOMEM) {
800 			status = SPDK_BDEV_IO_STATUS_NOMEM;
801 		} else {
802 			status = SPDK_BDEV_IO_STATUS_FAILED;
803 		}
804 	}
805 
806 	spdk_bdev_io_complete(bdev_io, status);
807 }
808 
809 static void
810 lvol_unmap(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
811 {
812 	uint64_t start_page, num_pages;
813 	struct spdk_blob *blob = lvol->blob;
814 
815 	start_page = bdev_io->u.bdev.offset_blocks;
816 	num_pages = bdev_io->u.bdev.num_blocks;
817 
818 	spdk_blob_io_unmap(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
819 }
820 
821 static void
822 lvol_write_zeroes(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
823 {
824 	uint64_t start_page, num_pages;
825 	struct spdk_blob *blob = lvol->blob;
826 
827 	start_page = bdev_io->u.bdev.offset_blocks;
828 	num_pages = bdev_io->u.bdev.num_blocks;
829 
830 	spdk_blob_io_write_zeroes(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
831 }
832 
833 static void
834 lvol_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
835 {
836 	uint64_t start_page, num_pages;
837 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
838 	struct spdk_blob *blob = lvol->blob;
839 
840 	start_page = bdev_io->u.bdev.offset_blocks;
841 	num_pages = bdev_io->u.bdev.num_blocks;
842 
843 	spdk_blob_io_readv(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
844 			   num_pages, lvol_op_comp, bdev_io);
845 }
846 
847 static void
848 lvol_write(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
849 {
850 	uint64_t start_page, num_pages;
851 	struct spdk_blob *blob = lvol->blob;
852 
853 	start_page = bdev_io->u.bdev.offset_blocks;
854 	num_pages = bdev_io->u.bdev.num_blocks;
855 
856 	spdk_blob_io_writev(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
857 			    num_pages, lvol_op_comp, bdev_io);
858 }
859 
860 static int
861 lvol_reset(struct spdk_bdev_io *bdev_io)
862 {
863 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
864 
865 	return 0;
866 }
867 
868 static void
869 lvol_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
870 {
871 	if (!success) {
872 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
873 		return;
874 	}
875 
876 	lvol_read(ch, bdev_io);
877 }
878 
879 static void
880 vbdev_lvol_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
881 {
882 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
883 
884 	switch (bdev_io->type) {
885 	case SPDK_BDEV_IO_TYPE_READ:
886 		spdk_bdev_io_get_buf(bdev_io, lvol_get_buf_cb,
887 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
888 		break;
889 	case SPDK_BDEV_IO_TYPE_WRITE:
890 		lvol_write(lvol, ch, bdev_io);
891 		break;
892 	case SPDK_BDEV_IO_TYPE_RESET:
893 		lvol_reset(bdev_io);
894 		break;
895 	case SPDK_BDEV_IO_TYPE_UNMAP:
896 		lvol_unmap(lvol, ch, bdev_io);
897 		break;
898 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
899 		lvol_write_zeroes(lvol, ch, bdev_io);
900 		break;
901 	default:
902 		SPDK_INFOLOG(vbdev_lvol, "lvol: unsupported I/O type %d\n", bdev_io->type);
903 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
904 		return;
905 	}
906 	return;
907 }
908 
909 static struct spdk_bdev_fn_table vbdev_lvol_fn_table = {
910 	.destruct		= vbdev_lvol_unregister,
911 	.io_type_supported	= vbdev_lvol_io_type_supported,
912 	.submit_request		= vbdev_lvol_submit_request,
913 	.get_io_channel		= vbdev_lvol_get_io_channel,
914 	.dump_info_json		= vbdev_lvol_dump_info_json,
915 	.write_config_json	= vbdev_lvol_write_config_json,
916 };
917 
918 static void
919 lvol_destroy_cb(void *cb_arg, int bdeverrno)
920 {
921 }
922 
923 static void
924 _create_lvol_disk_destroy_cb(void *cb_arg, int bdeverrno)
925 {
926 	struct spdk_lvol *lvol = cb_arg;
927 
928 	if (bdeverrno < 0) {
929 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
930 			    lvol->unique_id);
931 		return;
932 	}
933 
934 	spdk_lvol_destroy(lvol, lvol_destroy_cb, NULL);
935 }
936 
937 static void
938 _create_lvol_disk_unload_cb(void *cb_arg, int bdeverrno)
939 {
940 	struct spdk_lvol *lvol = cb_arg;
941 
942 	if (bdeverrno < 0) {
943 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
944 			    lvol->unique_id);
945 		return;
946 	}
947 
948 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
949 	free(lvol);
950 }
951 
952 static int
953 _create_lvol_disk(struct spdk_lvol *lvol, bool destroy)
954 {
955 	struct spdk_bdev *bdev;
956 	struct lvol_bdev *lvol_bdev;
957 	struct lvol_store_bdev *lvs_bdev;
958 	uint64_t total_size;
959 	unsigned char *alias;
960 	int rc;
961 
962 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
963 	if (lvs_bdev == NULL) {
964 		SPDK_ERRLOG("No spdk lvs-bdev pair found for lvol %s\n", lvol->unique_id);
965 		return -ENODEV;
966 	}
967 
968 	lvol_bdev = calloc(1, sizeof(struct lvol_bdev));
969 	if (!lvol_bdev) {
970 		SPDK_ERRLOG("Cannot alloc memory for lvol bdev\n");
971 		return -ENOMEM;
972 	}
973 
974 	lvol_bdev->lvol = lvol;
975 	lvol_bdev->lvs_bdev = lvs_bdev;
976 
977 	bdev = &lvol_bdev->bdev;
978 	bdev->name = lvol->unique_id;
979 	bdev->product_name = "Logical Volume";
980 	bdev->blocklen = spdk_bs_get_io_unit_size(lvol->lvol_store->blobstore);
981 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
982 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
983 	assert((total_size % bdev->blocklen) == 0);
984 	bdev->blockcnt = total_size / bdev->blocklen;
985 	bdev->uuid = lvol->uuid;
986 	bdev->required_alignment = lvs_bdev->bdev->required_alignment;
987 	bdev->split_on_optimal_io_boundary = true;
988 	bdev->optimal_io_boundary = spdk_bs_get_cluster_size(lvol->lvol_store->blobstore) / bdev->blocklen;
989 
990 	bdev->ctxt = lvol;
991 	bdev->fn_table = &vbdev_lvol_fn_table;
992 	bdev->module = &g_lvol_if;
993 
994 	rc = spdk_bdev_register(bdev);
995 	if (rc) {
996 		free(lvol_bdev);
997 		return rc;
998 	}
999 	lvol->bdev = bdev;
1000 
1001 	alias = spdk_sprintf_alloc("%s/%s", lvs_bdev->lvs->name, lvol->name);
1002 	if (alias == NULL) {
1003 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
1004 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1005 						  _create_lvol_disk_unload_cb), lvol);
1006 		return -ENOMEM;
1007 	}
1008 
1009 	rc = spdk_bdev_alias_add(bdev, alias);
1010 	if (rc != 0) {
1011 		SPDK_ERRLOG("Cannot add alias to lvol bdev\n");
1012 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1013 						  _create_lvol_disk_unload_cb), lvol);
1014 	}
1015 	free(alias);
1016 
1017 	return rc;
1018 }
1019 
1020 static void
1021 _vbdev_lvol_create_cb(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1022 {
1023 	struct spdk_lvol_with_handle_req *req = cb_arg;
1024 
1025 	if (lvolerrno < 0) {
1026 		goto end;
1027 	}
1028 
1029 	lvolerrno = _create_lvol_disk(lvol, true);
1030 
1031 end:
1032 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
1033 	free(req);
1034 }
1035 
1036 int
1037 vbdev_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1038 		  bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1039 		  void *cb_arg)
1040 {
1041 	struct spdk_lvol_with_handle_req *req;
1042 	int rc;
1043 
1044 	req = calloc(1, sizeof(*req));
1045 	if (req == NULL) {
1046 		return -ENOMEM;
1047 	}
1048 	req->cb_fn = cb_fn;
1049 	req->cb_arg = cb_arg;
1050 
1051 	rc = spdk_lvol_create(lvs, name, sz, thin_provision, clear_method,
1052 			      _vbdev_lvol_create_cb, req);
1053 	if (rc != 0) {
1054 		free(req);
1055 	}
1056 
1057 	return rc;
1058 }
1059 
1060 void
1061 vbdev_lvol_create_snapshot(struct spdk_lvol *lvol, const char *snapshot_name,
1062 			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1063 {
1064 	struct spdk_lvol_with_handle_req *req;
1065 
1066 	req = calloc(1, sizeof(*req));
1067 	if (req == NULL) {
1068 		cb_fn(cb_arg, NULL, -ENOMEM);
1069 		return;
1070 	}
1071 
1072 	req->cb_fn = cb_fn;
1073 	req->cb_arg = cb_arg;
1074 
1075 	spdk_lvol_create_snapshot(lvol, snapshot_name, _vbdev_lvol_create_cb, req);
1076 }
1077 
1078 void
1079 vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
1080 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1081 {
1082 	struct spdk_lvol_with_handle_req *req;
1083 
1084 	req = calloc(1, sizeof(*req));
1085 	if (req == NULL) {
1086 		cb_fn(cb_arg, NULL, -ENOMEM);
1087 		return;
1088 	}
1089 
1090 	req->cb_fn = cb_fn;
1091 	req->cb_arg = cb_arg;
1092 
1093 	spdk_lvol_create_clone(lvol, clone_name, _vbdev_lvol_create_cb, req);
1094 }
1095 
1096 static void
1097 _vbdev_lvol_rename_cb(void *cb_arg, int lvolerrno)
1098 {
1099 	struct spdk_lvol_req *req = cb_arg;
1100 
1101 	if (lvolerrno != 0) {
1102 		SPDK_ERRLOG("Renaming lvol failed\n");
1103 	}
1104 
1105 	req->cb_fn(req->cb_arg, lvolerrno);
1106 	free(req);
1107 }
1108 
1109 void
1110 vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
1111 		  spdk_lvol_op_complete cb_fn, void *cb_arg)
1112 {
1113 	struct spdk_lvol_req *req;
1114 	int rc;
1115 
1116 	rc = _vbdev_lvol_change_bdev_alias(lvol, new_lvol_name);
1117 	if (rc != 0) {
1118 		SPDK_ERRLOG("renaming lvol to '%s' does not succeed\n", new_lvol_name);
1119 		cb_fn(cb_arg, rc);
1120 		return;
1121 	}
1122 
1123 	req = calloc(1, sizeof(*req));
1124 	if (req == NULL) {
1125 		cb_fn(cb_arg, -ENOMEM);
1126 		return;
1127 	}
1128 	req->cb_fn = cb_fn;
1129 	req->cb_arg = cb_arg;
1130 
1131 	spdk_lvol_rename(lvol, new_lvol_name, _vbdev_lvol_rename_cb, req);
1132 }
1133 
1134 static void
1135 _vbdev_lvol_resize_cb(void *cb_arg, int lvolerrno)
1136 {
1137 	struct spdk_lvol_req *req = cb_arg;
1138 	struct spdk_lvol *lvol = req->lvol;
1139 	uint64_t total_size;
1140 
1141 	/* change bdev size */
1142 	if (lvolerrno != 0) {
1143 		SPDK_ERRLOG("CB function for bdev lvol %s receive error no: %d.\n", lvol->name, lvolerrno);
1144 		goto finish;
1145 	}
1146 
1147 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1148 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1149 	assert((total_size % lvol->bdev->blocklen) == 0);
1150 
1151 	lvolerrno = spdk_bdev_notify_blockcnt_change(lvol->bdev, total_size / lvol->bdev->blocklen);
1152 	if (lvolerrno != 0) {
1153 		SPDK_ERRLOG("Could not change num blocks for bdev lvol %s with error no: %d.\n",
1154 			    lvol->name, lvolerrno);
1155 	}
1156 
1157 finish:
1158 	req->cb_fn(req->cb_arg, lvolerrno);
1159 	free(req);
1160 }
1161 
1162 void
1163 vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn, void *cb_arg)
1164 {
1165 	struct spdk_lvol_req *req;
1166 
1167 	if (lvol == NULL) {
1168 		SPDK_ERRLOG("lvol does not exist\n");
1169 		cb_fn(cb_arg, -EINVAL);
1170 		return;
1171 	}
1172 
1173 	assert(lvol->bdev != NULL);
1174 
1175 	req = calloc(1, sizeof(*req));
1176 	if (req == NULL) {
1177 		cb_fn(cb_arg, -ENOMEM);
1178 		return;
1179 	}
1180 
1181 	req->cb_fn = cb_fn;
1182 	req->cb_arg = cb_arg;
1183 	req->sz = sz;
1184 	req->lvol = lvol;
1185 
1186 	spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
1187 }
1188 
1189 static void
1190 _vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1191 {
1192 	struct spdk_lvol_req *req = cb_arg;
1193 	struct spdk_lvol *lvol = req->lvol;
1194 
1195 	if (lvolerrno != 0) {
1196 		SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
1197 	}
1198 
1199 	req->cb_fn(req->cb_arg, lvolerrno);
1200 	free(req);
1201 }
1202 
1203 void
1204 vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1205 {
1206 	struct spdk_lvol_req *req;
1207 
1208 	if (lvol == NULL) {
1209 		SPDK_ERRLOG("lvol does not exist\n");
1210 		cb_fn(cb_arg, -EINVAL);
1211 		return;
1212 	}
1213 
1214 	assert(lvol->bdev != NULL);
1215 
1216 	req = calloc(1, sizeof(*req));
1217 	if (req == NULL) {
1218 		cb_fn(cb_arg, -ENOMEM);
1219 		return;
1220 	}
1221 
1222 	req->cb_fn = cb_fn;
1223 	req->cb_arg = cb_arg;
1224 	req->lvol = lvol;
1225 
1226 	spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
1227 }
1228 
1229 static int
1230 vbdev_lvs_init(void)
1231 {
1232 	return 0;
1233 }
1234 
1235 static void vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev);
1236 
1237 static void
1238 vbdev_lvs_fini_start_unload_cb(void *cb_arg, int lvserrno)
1239 {
1240 	struct lvol_store_bdev *lvs_bdev = cb_arg;
1241 	struct lvol_store_bdev *next_lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1242 
1243 	if (lvserrno != 0) {
1244 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
1245 	}
1246 
1247 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1248 	free(lvs_bdev);
1249 
1250 	vbdev_lvs_fini_start_iter(next_lvs_bdev);
1251 }
1252 
1253 static void
1254 vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev)
1255 {
1256 	struct spdk_lvol_store *lvs;
1257 
1258 	while (lvs_bdev != NULL) {
1259 		lvs = lvs_bdev->lvs;
1260 
1261 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1262 			spdk_lvs_unload(lvs, vbdev_lvs_fini_start_unload_cb, lvs_bdev);
1263 			return;
1264 		}
1265 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1266 	}
1267 
1268 	spdk_bdev_module_fini_start_done();
1269 }
1270 
1271 static void
1272 vbdev_lvs_fini_start(void)
1273 {
1274 	g_shutdown_started = true;
1275 	vbdev_lvs_fini_start_iter(vbdev_lvol_store_first());
1276 }
1277 
1278 static int
1279 vbdev_lvs_get_ctx_size(void)
1280 {
1281 	return 0;
1282 }
1283 
1284 static void
1285 _vbdev_lvs_examine_failed(void *cb_arg, int lvserrno)
1286 {
1287 	spdk_bdev_module_examine_done(&g_lvol_if);
1288 }
1289 
1290 static void
1291 _vbdev_lvol_examine_close_cb(struct spdk_lvol_store *lvs)
1292 {
1293 	if (lvs->lvols_opened >= lvs->lvol_count) {
1294 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1295 		spdk_bdev_module_examine_done(&g_lvol_if);
1296 	}
1297 }
1298 
1299 static void
1300 _vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1301 {
1302 	struct spdk_lvol_store *lvs = cb_arg;
1303 
1304 	if (lvolerrno != 0) {
1305 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1306 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1307 		lvs->lvol_count--;
1308 		free(lvol);
1309 		goto end;
1310 	}
1311 
1312 	if (_create_lvol_disk(lvol, false)) {
1313 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1314 		lvs->lvol_count--;
1315 		_vbdev_lvol_examine_close_cb(lvs);
1316 		SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s failed\n", lvol->unique_id);
1317 		return;
1318 	}
1319 
1320 	lvs->lvols_opened++;
1321 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1322 
1323 end:
1324 
1325 	if (lvs->lvols_opened >= lvs->lvol_count) {
1326 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1327 		spdk_bdev_module_examine_done(&g_lvol_if);
1328 	}
1329 }
1330 
1331 static void
1332 _vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1333 {
1334 	struct lvol_store_bdev *lvs_bdev;
1335 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1336 	struct spdk_lvol *lvol, *tmp;
1337 
1338 	if (lvserrno == -EEXIST) {
1339 		SPDK_INFOLOG(vbdev_lvol,
1340 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1341 			     req->base_bdev->name);
1342 		/* On error blobstore destroys bs_dev itself */
1343 		spdk_bdev_module_examine_done(&g_lvol_if);
1344 		goto end;
1345 	} else if (lvserrno != 0) {
1346 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1347 		/* On error blobstore destroys bs_dev itself */
1348 		spdk_bdev_module_examine_done(&g_lvol_if);
1349 		goto end;
1350 	}
1351 
1352 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1353 	if (lvserrno != 0) {
1354 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1355 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1356 		goto end;
1357 	}
1358 
1359 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1360 	if (!lvs_bdev) {
1361 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1362 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1363 		goto end;
1364 	}
1365 
1366 	lvs_bdev->lvs = lvol_store;
1367 	lvs_bdev->bdev = req->base_bdev;
1368 
1369 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1370 
1371 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1372 		     req->base_bdev->name);
1373 
1374 	lvol_store->lvols_opened = 0;
1375 
1376 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1377 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1378 		spdk_bdev_module_examine_done(&g_lvol_if);
1379 	} else {
1380 		/* Open all lvols */
1381 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1382 			spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, lvol_store);
1383 		}
1384 	}
1385 
1386 end:
1387 	free(req);
1388 }
1389 
1390 static void
1391 vbdev_lvs_examine(struct spdk_bdev *bdev)
1392 {
1393 	struct spdk_bs_dev *bs_dev;
1394 	struct spdk_lvs_with_handle_req *req;
1395 	int rc;
1396 
1397 	req = calloc(1, sizeof(*req));
1398 	if (req == NULL) {
1399 		spdk_bdev_module_examine_done(&g_lvol_if);
1400 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1401 		return;
1402 	}
1403 
1404 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1405 					 NULL, &bs_dev);
1406 	if (rc < 0) {
1407 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1408 		spdk_bdev_module_examine_done(&g_lvol_if);
1409 		free(req);
1410 		return;
1411 	}
1412 
1413 	req->base_bdev = bdev;
1414 
1415 	spdk_lvs_load(bs_dev, _vbdev_lvs_examine_cb, req);
1416 }
1417 
1418 struct spdk_lvol *
1419 vbdev_lvol_get_from_bdev(struct spdk_bdev *bdev)
1420 {
1421 	if (!bdev || bdev->module != &g_lvol_if) {
1422 		return NULL;
1423 	}
1424 
1425 	if (bdev->ctxt == NULL) {
1426 		SPDK_ERRLOG("No lvol ctx assigned to bdev %s\n", bdev->name);
1427 		return NULL;
1428 	}
1429 
1430 	return (struct spdk_lvol *)bdev->ctxt;
1431 }
1432 
1433 SPDK_LOG_REGISTER_COMPONENT(vbdev_lvol)
1434