xref: /spdk/module/bdev/lvol/vbdev_lvol.c (revision 927f1fd57bd004df581518466ec4c1b8083e5d23)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/blob_bdev.h"
36 #include "spdk/rpc.h"
37 #include "spdk/bdev_module.h"
38 #include "spdk/log.h"
39 #include "spdk/string.h"
40 #include "spdk/uuid.h"
41 
42 #include "vbdev_lvol.h"
43 
44 struct vbdev_lvol_io {
45 	struct spdk_blob_ext_io_opts ext_io_opts;
46 };
47 
48 static TAILQ_HEAD(, lvol_store_bdev) g_spdk_lvol_pairs = TAILQ_HEAD_INITIALIZER(
49 			g_spdk_lvol_pairs);
50 
51 static int vbdev_lvs_init(void);
52 static void vbdev_lvs_fini_start(void);
53 static int vbdev_lvs_get_ctx_size(void);
54 static void vbdev_lvs_examine(struct spdk_bdev *bdev);
55 static bool g_shutdown_started = false;
56 
57 static struct spdk_bdev_module g_lvol_if = {
58 	.name = "lvol",
59 	.module_init = vbdev_lvs_init,
60 	.fini_start = vbdev_lvs_fini_start,
61 	.async_fini_start = true,
62 	.examine_disk = vbdev_lvs_examine,
63 	.get_ctx_size = vbdev_lvs_get_ctx_size,
64 
65 };
66 
67 SPDK_BDEV_MODULE_REGISTER(lvol, &g_lvol_if)
68 
69 struct lvol_store_bdev *
70 vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig)
71 {
72 	struct spdk_lvol_store *lvs = NULL;
73 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
74 
75 	while (lvs_bdev != NULL) {
76 		lvs = lvs_bdev->lvs;
77 		if (lvs == lvs_orig) {
78 			if (lvs_bdev->req != NULL) {
79 				/* We do not allow access to lvs that are being destroyed */
80 				return NULL;
81 			} else {
82 				return lvs_bdev;
83 			}
84 		}
85 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
86 	}
87 
88 	return NULL;
89 }
90 
91 static int
92 _vbdev_lvol_change_bdev_alias(struct spdk_lvol *lvol, const char *new_lvol_name)
93 {
94 	struct spdk_bdev_alias *tmp;
95 	char *old_alias;
96 	char *alias;
97 	int rc;
98 	int alias_number = 0;
99 
100 	/* bdev representing lvols have only one alias,
101 	 * while we changed lvs name earlier, we have to iterate alias list to get one,
102 	 * and check if there is only one alias */
103 
104 	TAILQ_FOREACH(tmp, spdk_bdev_get_aliases(lvol->bdev), tailq) {
105 		if (++alias_number > 1) {
106 			SPDK_ERRLOG("There is more than 1 alias in bdev %s\n", lvol->bdev->name);
107 			return -EINVAL;
108 		}
109 
110 		old_alias = tmp->alias.name;
111 	}
112 
113 	if (alias_number == 0) {
114 		SPDK_ERRLOG("There are no aliases in bdev %s\n", lvol->bdev->name);
115 		return -EINVAL;
116 	}
117 
118 	alias = spdk_sprintf_alloc("%s/%s", lvol->lvol_store->name, new_lvol_name);
119 	if (alias == NULL) {
120 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
121 		return -ENOMEM;
122 	}
123 
124 	rc = spdk_bdev_alias_add(lvol->bdev, alias);
125 	if (rc != 0) {
126 		SPDK_ERRLOG("cannot add alias '%s'\n", alias);
127 		free(alias);
128 		return rc;
129 	}
130 	free(alias);
131 
132 	rc = spdk_bdev_alias_del(lvol->bdev, old_alias);
133 	if (rc != 0) {
134 		SPDK_ERRLOG("cannot remove alias '%s'\n", old_alias);
135 		return rc;
136 	}
137 
138 	return 0;
139 }
140 
141 static struct lvol_store_bdev *
142 vbdev_get_lvs_bdev_by_bdev(struct spdk_bdev *bdev_orig)
143 {
144 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
145 
146 	while (lvs_bdev != NULL) {
147 		if (lvs_bdev->bdev == bdev_orig) {
148 			if (lvs_bdev->req != NULL) {
149 				/* We do not allow access to lvs that are being destroyed */
150 				return NULL;
151 			} else {
152 				return lvs_bdev;
153 			}
154 		}
155 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
156 	}
157 
158 	return NULL;
159 }
160 
161 static void
162 vbdev_lvs_hotremove_cb(struct spdk_bdev *bdev)
163 {
164 	struct lvol_store_bdev *lvs_bdev;
165 
166 	lvs_bdev = vbdev_get_lvs_bdev_by_bdev(bdev);
167 	if (lvs_bdev != NULL) {
168 		vbdev_lvs_unload(lvs_bdev->lvs, NULL, NULL);
169 	}
170 }
171 
172 static void
173 vbdev_lvs_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
174 			     void *event_ctx)
175 {
176 	switch (type) {
177 	case SPDK_BDEV_EVENT_REMOVE:
178 		vbdev_lvs_hotremove_cb(bdev);
179 		break;
180 	default:
181 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
182 		break;
183 	}
184 }
185 
186 static void
187 _vbdev_lvs_create_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
188 {
189 	struct spdk_lvs_with_handle_req *req = cb_arg;
190 	struct lvol_store_bdev *lvs_bdev;
191 	struct spdk_bdev *bdev = req->base_bdev;
192 	struct spdk_bs_dev *bs_dev = req->bs_dev;
193 
194 	if (lvserrno != 0) {
195 		assert(lvs == NULL);
196 		SPDK_ERRLOG("Cannot create lvol store bdev\n");
197 		goto end;
198 	}
199 
200 	lvserrno = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
201 	if (lvserrno != 0) {
202 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
203 		req->bs_dev->destroy(req->bs_dev);
204 		goto end;
205 	}
206 
207 	assert(lvs != NULL);
208 
209 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
210 	if (!lvs_bdev) {
211 		lvserrno = -ENOMEM;
212 		goto end;
213 	}
214 	lvs_bdev->lvs = lvs;
215 	lvs_bdev->bdev = bdev;
216 	lvs_bdev->req = NULL;
217 
218 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
219 	SPDK_INFOLOG(vbdev_lvol, "Lvol store bdev inserted\n");
220 
221 end:
222 	req->cb_fn(req->cb_arg, lvs, lvserrno);
223 	free(req);
224 
225 	return;
226 }
227 
228 int
229 vbdev_lvs_create(const char *base_bdev_name, const char *name, uint32_t cluster_sz,
230 		 enum lvs_clear_method clear_method, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
231 {
232 	struct spdk_bs_dev *bs_dev;
233 	struct spdk_lvs_with_handle_req *lvs_req;
234 	struct spdk_lvs_opts opts;
235 	int rc;
236 	int len;
237 
238 	if (base_bdev_name == NULL) {
239 		SPDK_ERRLOG("missing base_bdev_name param\n");
240 		return -EINVAL;
241 	}
242 
243 	spdk_lvs_opts_init(&opts);
244 	if (cluster_sz != 0) {
245 		opts.cluster_sz = cluster_sz;
246 	}
247 
248 	if (clear_method != 0) {
249 		opts.clear_method = clear_method;
250 	}
251 
252 	if (name == NULL) {
253 		SPDK_ERRLOG("missing name param\n");
254 		return -EINVAL;
255 	}
256 
257 	len = strnlen(name, SPDK_LVS_NAME_MAX);
258 
259 	if (len == 0 || len == SPDK_LVS_NAME_MAX) {
260 		SPDK_ERRLOG("name must be between 1 and %d characters\n", SPDK_LVS_NAME_MAX - 1);
261 		return -EINVAL;
262 	}
263 	snprintf(opts.name, sizeof(opts.name), "%s", name);
264 
265 	lvs_req = calloc(1, sizeof(*lvs_req));
266 	if (!lvs_req) {
267 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
268 		return -ENOMEM;
269 	}
270 
271 	rc = spdk_bdev_create_bs_dev_ext(base_bdev_name, vbdev_lvs_base_bdev_event_cb,
272 					 NULL, &bs_dev);
273 	if (rc < 0) {
274 		SPDK_ERRLOG("Cannot create blobstore device\n");
275 		free(lvs_req);
276 		return rc;
277 	}
278 
279 	lvs_req->bs_dev = bs_dev;
280 	lvs_req->base_bdev = bs_dev->get_base_bdev(bs_dev);
281 	lvs_req->cb_fn = cb_fn;
282 	lvs_req->cb_arg = cb_arg;
283 
284 	rc = spdk_lvs_init(bs_dev, &opts, _vbdev_lvs_create_cb, lvs_req);
285 	if (rc < 0) {
286 		free(lvs_req);
287 		bs_dev->destroy(bs_dev);
288 		return rc;
289 	}
290 
291 	return 0;
292 }
293 
294 static void
295 _vbdev_lvs_rename_cb(void *cb_arg, int lvserrno)
296 {
297 	struct spdk_lvs_req *req = cb_arg;
298 	struct spdk_lvol *tmp;
299 
300 	if (lvserrno != 0) {
301 		SPDK_INFOLOG(vbdev_lvol, "Lvol store rename failed\n");
302 	} else {
303 		TAILQ_FOREACH(tmp, &req->lvol_store->lvols, link) {
304 			/* We have to pass current lvol name, since only lvs name changed */
305 			_vbdev_lvol_change_bdev_alias(tmp, tmp->name);
306 		}
307 	}
308 
309 	req->cb_fn(req->cb_arg, lvserrno);
310 	free(req);
311 }
312 
313 void
314 vbdev_lvs_rename(struct spdk_lvol_store *lvs, const char *new_lvs_name,
315 		 spdk_lvs_op_complete cb_fn, void *cb_arg)
316 {
317 	struct lvol_store_bdev *lvs_bdev;
318 
319 	struct spdk_lvs_req *req;
320 
321 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
322 	if (!lvs_bdev) {
323 		SPDK_ERRLOG("No such lvol store found\n");
324 		cb_fn(cb_arg, -ENODEV);
325 		return;
326 	}
327 
328 	req = calloc(1, sizeof(*req));
329 	if (!req) {
330 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
331 		cb_fn(cb_arg, -ENOMEM);
332 		return;
333 	}
334 	req->cb_fn = cb_fn;
335 	req->cb_arg = cb_arg;
336 	req->lvol_store = lvs;
337 
338 	spdk_lvs_rename(lvs, new_lvs_name, _vbdev_lvs_rename_cb, req);
339 }
340 
341 static void
342 _vbdev_lvs_remove_cb(void *cb_arg, int lvserrno)
343 {
344 	struct lvol_store_bdev *lvs_bdev = cb_arg;
345 	struct spdk_lvs_req *req = lvs_bdev->req;
346 
347 	if (lvserrno != 0) {
348 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
349 	}
350 
351 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
352 	free(lvs_bdev);
353 
354 	if (req->cb_fn != NULL) {
355 		req->cb_fn(req->cb_arg, lvserrno);
356 	}
357 	free(req);
358 }
359 
360 static void
361 _vbdev_lvs_remove_lvol_cb(void *cb_arg, int lvolerrno)
362 {
363 	struct lvol_store_bdev *lvs_bdev = cb_arg;
364 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
365 	struct spdk_lvol *lvol;
366 
367 	if (lvolerrno != 0) {
368 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol removed with errno %d\n", lvolerrno);
369 	}
370 
371 	if (TAILQ_EMPTY(&lvs->lvols)) {
372 		spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
373 		return;
374 	}
375 
376 	lvol = TAILQ_FIRST(&lvs->lvols);
377 	while (lvol != NULL) {
378 		if (spdk_lvol_deletable(lvol)) {
379 			vbdev_lvol_destroy(lvol, _vbdev_lvs_remove_lvol_cb, lvs_bdev);
380 			return;
381 		}
382 		lvol = TAILQ_NEXT(lvol, link);
383 	}
384 
385 	/* If no lvol is deletable, that means there is circular dependency. */
386 	SPDK_ERRLOG("Lvols left in lvs, but unable to delete.\n");
387 	assert(false);
388 }
389 
390 static bool
391 _vbdev_lvs_are_lvols_closed(struct spdk_lvol_store *lvs)
392 {
393 	struct spdk_lvol *lvol;
394 
395 	TAILQ_FOREACH(lvol, &lvs->lvols, link) {
396 		if (lvol->ref_count != 0) {
397 			return false;
398 		}
399 	}
400 	return true;
401 }
402 
403 static void
404 _vbdev_lvs_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
405 {
406 	struct lvol_store_bdev *lvs_bdev = cb_arg;
407 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
408 
409 	if (bdeverrno != 0) {
410 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
411 	}
412 
413 	/* Lvol store can be unloaded once all lvols are closed. */
414 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
415 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
416 	}
417 }
418 
419 static void
420 _vbdev_lvs_remove(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg,
421 		  bool destroy)
422 {
423 	struct spdk_lvs_req *req;
424 	struct lvol_store_bdev *lvs_bdev;
425 	struct spdk_lvol *lvol, *tmp;
426 
427 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
428 	if (!lvs_bdev) {
429 		SPDK_ERRLOG("No such lvol store found\n");
430 		if (cb_fn != NULL) {
431 			cb_fn(cb_arg, -ENODEV);
432 		}
433 		return;
434 	}
435 
436 	req = calloc(1, sizeof(*req));
437 	if (!req) {
438 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
439 		if (cb_fn != NULL) {
440 			cb_fn(cb_arg, -ENOMEM);
441 		}
442 		return;
443 	}
444 
445 	req->cb_fn = cb_fn;
446 	req->cb_arg = cb_arg;
447 	lvs_bdev->req = req;
448 
449 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
450 		if (destroy) {
451 			spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
452 		} else {
453 			spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
454 		}
455 	} else {
456 		lvs->destruct = destroy;
457 		if (destroy) {
458 			_vbdev_lvs_remove_lvol_cb(lvs_bdev, 0);
459 		} else {
460 			TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
461 				spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
462 			}
463 		}
464 	}
465 }
466 
467 void
468 vbdev_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
469 {
470 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, false);
471 }
472 
473 void
474 vbdev_lvs_destruct(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
475 {
476 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, true);
477 }
478 
479 struct lvol_store_bdev *
480 vbdev_lvol_store_first(void)
481 {
482 	struct lvol_store_bdev *lvs_bdev;
483 
484 	lvs_bdev = TAILQ_FIRST(&g_spdk_lvol_pairs);
485 	if (lvs_bdev) {
486 		SPDK_INFOLOG(vbdev_lvol, "Starting lvolstore iteration at %p\n", lvs_bdev->lvs);
487 	}
488 
489 	return lvs_bdev;
490 }
491 
492 struct lvol_store_bdev *
493 vbdev_lvol_store_next(struct lvol_store_bdev *prev)
494 {
495 	struct lvol_store_bdev *lvs_bdev;
496 
497 	if (prev == NULL) {
498 		SPDK_ERRLOG("prev argument cannot be NULL\n");
499 		return NULL;
500 	}
501 
502 	lvs_bdev = TAILQ_NEXT(prev, lvol_stores);
503 	if (lvs_bdev) {
504 		SPDK_INFOLOG(vbdev_lvol, "Continuing lvolstore iteration at %p\n", lvs_bdev->lvs);
505 	}
506 
507 	return lvs_bdev;
508 }
509 
510 static struct spdk_lvol_store *
511 _vbdev_get_lvol_store_by_uuid(const struct spdk_uuid *uuid)
512 {
513 	struct spdk_lvol_store *lvs = NULL;
514 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
515 
516 	while (lvs_bdev != NULL) {
517 		lvs = lvs_bdev->lvs;
518 		if (spdk_uuid_compare(&lvs->uuid, uuid) == 0) {
519 			return lvs;
520 		}
521 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
522 	}
523 	return NULL;
524 }
525 
526 struct spdk_lvol_store *
527 vbdev_get_lvol_store_by_uuid(const char *uuid_str)
528 {
529 	struct spdk_uuid uuid;
530 
531 	if (spdk_uuid_parse(&uuid, uuid_str)) {
532 		return NULL;
533 	}
534 
535 	return _vbdev_get_lvol_store_by_uuid(&uuid);
536 }
537 
538 struct spdk_lvol_store *
539 vbdev_get_lvol_store_by_name(const char *name)
540 {
541 	struct spdk_lvol_store *lvs = NULL;
542 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
543 
544 	while (lvs_bdev != NULL) {
545 		lvs = lvs_bdev->lvs;
546 		if (strncmp(lvs->name, name, sizeof(lvs->name)) == 0) {
547 			return lvs;
548 		}
549 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
550 	}
551 	return NULL;
552 }
553 
554 struct vbdev_lvol_destroy_ctx {
555 	struct spdk_lvol *lvol;
556 	spdk_lvol_op_complete cb_fn;
557 	void *cb_arg;
558 };
559 
560 static void
561 _vbdev_lvol_unregister_unload_lvs(void *cb_arg, int lvserrno)
562 {
563 	struct lvol_bdev *lvol_bdev = cb_arg;
564 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
565 
566 	if (lvserrno != 0) {
567 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
568 	}
569 
570 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
571 	free(lvs_bdev);
572 
573 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvserrno);
574 	free(lvol_bdev);
575 }
576 
577 static void
578 _vbdev_lvol_unregister_cb(void *ctx, int lvolerrno)
579 {
580 	struct lvol_bdev *lvol_bdev = ctx;
581 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
582 
583 	if (g_shutdown_started && _vbdev_lvs_are_lvols_closed(lvs_bdev->lvs)) {
584 		spdk_lvs_unload(lvs_bdev->lvs, _vbdev_lvol_unregister_unload_lvs, lvol_bdev);
585 		return;
586 	}
587 
588 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvolerrno);
589 	free(lvol_bdev);
590 }
591 
592 static int
593 vbdev_lvol_unregister(void *ctx)
594 {
595 	struct spdk_lvol *lvol = ctx;
596 	struct lvol_bdev *lvol_bdev;
597 
598 	assert(lvol != NULL);
599 	lvol_bdev = SPDK_CONTAINEROF(lvol->bdev, struct lvol_bdev, bdev);
600 
601 	spdk_bdev_alias_del_all(lvol->bdev);
602 	spdk_lvol_close(lvol, _vbdev_lvol_unregister_cb, lvol_bdev);
603 
604 	/* return 1 to indicate we have an operation that must finish asynchronously before the
605 	 *  lvol is closed
606 	 */
607 	return 1;
608 }
609 
610 static void
611 _vbdev_lvol_destroy_cb(void *cb_arg, int bdeverrno)
612 {
613 	struct vbdev_lvol_destroy_ctx *ctx = cb_arg;
614 	struct spdk_lvol *lvol = ctx->lvol;
615 
616 	if (bdeverrno < 0) {
617 		SPDK_INFOLOG(vbdev_lvol, "Could not unregister bdev during lvol (%s) destroy\n",
618 			     lvol->unique_id);
619 		ctx->cb_fn(ctx->cb_arg, bdeverrno);
620 		free(ctx);
621 		return;
622 	}
623 
624 	spdk_lvol_destroy(lvol, ctx->cb_fn, ctx->cb_arg);
625 	free(ctx);
626 }
627 
628 void
629 vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
630 {
631 	struct vbdev_lvol_destroy_ctx *ctx;
632 	size_t count;
633 
634 	assert(lvol != NULL);
635 	assert(cb_fn != NULL);
636 
637 	/* Check if it is possible to delete lvol */
638 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
639 	if (count > 1) {
640 		/* throw an error */
641 		SPDK_ERRLOG("Cannot delete lvol\n");
642 		cb_fn(cb_arg, -EPERM);
643 		return;
644 	}
645 
646 	ctx = calloc(1, sizeof(*ctx));
647 	if (!ctx) {
648 		cb_fn(cb_arg, -ENOMEM);
649 		return;
650 	}
651 
652 	ctx->lvol = lvol;
653 	ctx->cb_fn = cb_fn;
654 	ctx->cb_arg = cb_arg;
655 
656 	spdk_bdev_unregister(lvol->bdev, _vbdev_lvol_destroy_cb, ctx);
657 }
658 
659 static char *
660 vbdev_lvol_find_name(struct spdk_lvol *lvol, spdk_blob_id blob_id)
661 {
662 	struct spdk_lvol_store *lvs;
663 	struct spdk_lvol *_lvol;
664 
665 	assert(lvol != NULL);
666 
667 	lvs = lvol->lvol_store;
668 
669 	assert(lvs);
670 
671 	TAILQ_FOREACH(_lvol, &lvs->lvols, link) {
672 		if (_lvol->blob_id == blob_id) {
673 			return _lvol->name;
674 		}
675 	}
676 
677 	return NULL;
678 }
679 
680 static int
681 vbdev_lvol_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
682 {
683 	struct spdk_lvol *lvol = ctx;
684 	struct lvol_store_bdev *lvs_bdev;
685 	struct spdk_bdev *bdev;
686 	struct spdk_blob *blob;
687 	char lvol_store_uuid[SPDK_UUID_STRING_LEN];
688 	spdk_blob_id *ids = NULL;
689 	size_t count, i;
690 	char *name;
691 	int rc = 0;
692 
693 	spdk_json_write_named_object_begin(w, "lvol");
694 
695 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
696 	if (!lvs_bdev) {
697 		SPDK_ERRLOG("No such lvol store found\n");
698 		rc = -ENODEV;
699 		goto end;
700 	}
701 
702 	bdev = lvs_bdev->bdev;
703 
704 	spdk_uuid_fmt_lower(lvol_store_uuid, sizeof(lvol_store_uuid), &lvol->lvol_store->uuid);
705 	spdk_json_write_named_string(w, "lvol_store_uuid", lvol_store_uuid);
706 
707 	spdk_json_write_named_string(w, "base_bdev", spdk_bdev_get_name(bdev));
708 
709 	blob = lvol->blob;
710 
711 	spdk_json_write_named_bool(w, "thin_provision", spdk_blob_is_thin_provisioned(blob));
712 
713 	spdk_json_write_named_bool(w, "snapshot", spdk_blob_is_snapshot(blob));
714 
715 	spdk_json_write_named_bool(w, "clone", spdk_blob_is_clone(blob));
716 
717 	if (spdk_blob_is_clone(blob)) {
718 		spdk_blob_id snapshotid = spdk_blob_get_parent_snapshot(lvol->lvol_store->blobstore, lvol->blob_id);
719 		if (snapshotid != SPDK_BLOBID_INVALID) {
720 			name = vbdev_lvol_find_name(lvol, snapshotid);
721 			if (name != NULL) {
722 				spdk_json_write_named_string(w, "base_snapshot", name);
723 			} else {
724 				SPDK_ERRLOG("Cannot obtain snapshots name\n");
725 			}
726 		}
727 	}
728 
729 	if (spdk_blob_is_snapshot(blob)) {
730 		/* Take a number of clones */
731 		rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
732 		if (rc == -ENOMEM && count > 0) {
733 			ids = malloc(sizeof(spdk_blob_id) * count);
734 			if (ids == NULL) {
735 				SPDK_ERRLOG("Cannot allocate memory\n");
736 				rc = -ENOMEM;
737 				goto end;
738 			}
739 
740 			rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, ids, &count);
741 			if (rc == 0) {
742 				spdk_json_write_named_array_begin(w, "clones");
743 				for (i = 0; i < count; i++) {
744 					name = vbdev_lvol_find_name(lvol, ids[i]);
745 					if (name != NULL) {
746 						spdk_json_write_string(w, name);
747 					} else {
748 						SPDK_ERRLOG("Cannot obtain clone name\n");
749 					}
750 
751 				}
752 				spdk_json_write_array_end(w);
753 			}
754 			free(ids);
755 		}
756 
757 	}
758 
759 end:
760 	spdk_json_write_object_end(w);
761 
762 	return rc;
763 }
764 
765 static void
766 vbdev_lvol_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
767 {
768 	/* Nothing to dump as lvol configuration is saved on physical device. */
769 }
770 
771 static struct spdk_io_channel *
772 vbdev_lvol_get_io_channel(void *ctx)
773 {
774 	struct spdk_lvol *lvol = ctx;
775 
776 	return spdk_lvol_get_io_channel(lvol);
777 }
778 
779 static bool
780 vbdev_lvol_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
781 {
782 	struct spdk_lvol *lvol = ctx;
783 
784 	switch (io_type) {
785 	case SPDK_BDEV_IO_TYPE_WRITE:
786 	case SPDK_BDEV_IO_TYPE_UNMAP:
787 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
788 		return !spdk_blob_is_read_only(lvol->blob);
789 	case SPDK_BDEV_IO_TYPE_RESET:
790 	case SPDK_BDEV_IO_TYPE_READ:
791 		return true;
792 	default:
793 		return false;
794 	}
795 }
796 
797 static void
798 lvol_op_comp(void *cb_arg, int bserrno)
799 {
800 	struct spdk_bdev_io *bdev_io = cb_arg;
801 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_SUCCESS;
802 
803 	if (bserrno != 0) {
804 		if (bserrno == -ENOMEM) {
805 			status = SPDK_BDEV_IO_STATUS_NOMEM;
806 		} else {
807 			status = SPDK_BDEV_IO_STATUS_FAILED;
808 		}
809 	}
810 
811 	spdk_bdev_io_complete(bdev_io, status);
812 }
813 
814 static void
815 lvol_unmap(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
816 {
817 	uint64_t start_page, num_pages;
818 	struct spdk_blob *blob = lvol->blob;
819 
820 	start_page = bdev_io->u.bdev.offset_blocks;
821 	num_pages = bdev_io->u.bdev.num_blocks;
822 
823 	spdk_blob_io_unmap(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
824 }
825 
826 static void
827 lvol_write_zeroes(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
828 {
829 	uint64_t start_page, num_pages;
830 	struct spdk_blob *blob = lvol->blob;
831 
832 	start_page = bdev_io->u.bdev.offset_blocks;
833 	num_pages = bdev_io->u.bdev.num_blocks;
834 
835 	spdk_blob_io_write_zeroes(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
836 }
837 
838 static void
839 lvol_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
840 {
841 	uint64_t start_page, num_pages;
842 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
843 	struct spdk_blob *blob = lvol->blob;
844 
845 	start_page = bdev_io->u.bdev.offset_blocks;
846 	num_pages = bdev_io->u.bdev.num_blocks;
847 
848 	if (bdev_io->u.bdev.ext_opts) {
849 		struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
850 
851 		lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
852 		lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.ext_opts->memory_domain;
853 		lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.ext_opts->memory_domain_ctx;
854 		/* Save a pointer to ext_opts passed by the user, it will be used in bs_dev readv/writev_ext functions
855 		 * to restore ext_opts structure. That is done since bdev and blob extended functions use different
856 		 * extended opts structures */
857 		lvol_io->ext_io_opts.user_ctx = bdev_io->u.bdev.ext_opts;
858 
859 		spdk_blob_io_readv_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
860 				       num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
861 	} else {
862 		spdk_blob_io_readv(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
863 				   num_pages, lvol_op_comp, bdev_io);
864 	}
865 }
866 
867 static void
868 lvol_write(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
869 {
870 	uint64_t start_page, num_pages;
871 	struct spdk_blob *blob = lvol->blob;
872 
873 	start_page = bdev_io->u.bdev.offset_blocks;
874 	num_pages = bdev_io->u.bdev.num_blocks;
875 
876 	if (bdev_io->u.bdev.ext_opts) {
877 		struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
878 
879 		lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
880 		lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.ext_opts->memory_domain;
881 		lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.ext_opts->memory_domain_ctx;
882 		/* Save a pointer to ext_opts passed by the user, it will be used in bs_dev readv/writev_ext functions
883 		 * to restore ext_opts structure. That is done since bdev and blob extended functions use different
884 		 * extended opts structures */
885 		lvol_io->ext_io_opts.user_ctx = bdev_io->u.bdev.ext_opts;
886 
887 		spdk_blob_io_writev_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
888 					num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
889 	} else {
890 		spdk_blob_io_writev(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
891 				    num_pages, lvol_op_comp, bdev_io);
892 	}
893 }
894 
895 static int
896 lvol_reset(struct spdk_bdev_io *bdev_io)
897 {
898 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
899 
900 	return 0;
901 }
902 
903 static void
904 lvol_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
905 {
906 	if (!success) {
907 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
908 		return;
909 	}
910 
911 	lvol_read(ch, bdev_io);
912 }
913 
914 static void
915 vbdev_lvol_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
916 {
917 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
918 
919 	switch (bdev_io->type) {
920 	case SPDK_BDEV_IO_TYPE_READ:
921 		spdk_bdev_io_get_buf(bdev_io, lvol_get_buf_cb,
922 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
923 		break;
924 	case SPDK_BDEV_IO_TYPE_WRITE:
925 		lvol_write(lvol, ch, bdev_io);
926 		break;
927 	case SPDK_BDEV_IO_TYPE_RESET:
928 		lvol_reset(bdev_io);
929 		break;
930 	case SPDK_BDEV_IO_TYPE_UNMAP:
931 		lvol_unmap(lvol, ch, bdev_io);
932 		break;
933 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
934 		lvol_write_zeroes(lvol, ch, bdev_io);
935 		break;
936 	default:
937 		SPDK_INFOLOG(vbdev_lvol, "lvol: unsupported I/O type %d\n", bdev_io->type);
938 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
939 		return;
940 	}
941 	return;
942 }
943 
944 static int
945 vbdev_lvol_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
946 {
947 	struct spdk_lvol *lvol = ctx;
948 	struct spdk_bdev *base_bdev;
949 
950 	base_bdev = lvol->lvol_store->bs_dev->get_base_bdev(lvol->lvol_store->bs_dev);
951 
952 	return spdk_bdev_get_memory_domains(base_bdev, domains, array_size);
953 }
954 
955 static struct spdk_bdev_fn_table vbdev_lvol_fn_table = {
956 	.destruct		= vbdev_lvol_unregister,
957 	.io_type_supported	= vbdev_lvol_io_type_supported,
958 	.submit_request		= vbdev_lvol_submit_request,
959 	.get_io_channel		= vbdev_lvol_get_io_channel,
960 	.dump_info_json		= vbdev_lvol_dump_info_json,
961 	.write_config_json	= vbdev_lvol_write_config_json,
962 	.get_memory_domains	= vbdev_lvol_get_memory_domains,
963 };
964 
965 static void
966 lvol_destroy_cb(void *cb_arg, int bdeverrno)
967 {
968 }
969 
970 static void
971 _create_lvol_disk_destroy_cb(void *cb_arg, int bdeverrno)
972 {
973 	struct spdk_lvol *lvol = cb_arg;
974 
975 	if (bdeverrno < 0) {
976 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
977 			    lvol->unique_id);
978 		return;
979 	}
980 
981 	spdk_lvol_destroy(lvol, lvol_destroy_cb, NULL);
982 }
983 
984 static void
985 _create_lvol_disk_unload_cb(void *cb_arg, int bdeverrno)
986 {
987 	struct spdk_lvol *lvol = cb_arg;
988 
989 	if (bdeverrno < 0) {
990 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
991 			    lvol->unique_id);
992 		return;
993 	}
994 
995 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
996 	free(lvol);
997 }
998 
999 static int
1000 _create_lvol_disk(struct spdk_lvol *lvol, bool destroy)
1001 {
1002 	struct spdk_bdev *bdev;
1003 	struct lvol_bdev *lvol_bdev;
1004 	struct lvol_store_bdev *lvs_bdev;
1005 	uint64_t total_size;
1006 	unsigned char *alias;
1007 	int rc;
1008 
1009 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
1010 	if (lvs_bdev == NULL) {
1011 		SPDK_ERRLOG("No spdk lvs-bdev pair found for lvol %s\n", lvol->unique_id);
1012 		return -ENODEV;
1013 	}
1014 
1015 	lvol_bdev = calloc(1, sizeof(struct lvol_bdev));
1016 	if (!lvol_bdev) {
1017 		SPDK_ERRLOG("Cannot alloc memory for lvol bdev\n");
1018 		return -ENOMEM;
1019 	}
1020 
1021 	lvol_bdev->lvol = lvol;
1022 	lvol_bdev->lvs_bdev = lvs_bdev;
1023 
1024 	bdev = &lvol_bdev->bdev;
1025 	bdev->name = lvol->unique_id;
1026 	bdev->product_name = "Logical Volume";
1027 	bdev->blocklen = spdk_bs_get_io_unit_size(lvol->lvol_store->blobstore);
1028 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1029 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1030 	assert((total_size % bdev->blocklen) == 0);
1031 	bdev->blockcnt = total_size / bdev->blocklen;
1032 	bdev->uuid = lvol->uuid;
1033 	bdev->required_alignment = lvs_bdev->bdev->required_alignment;
1034 	bdev->split_on_optimal_io_boundary = true;
1035 	bdev->optimal_io_boundary = spdk_bs_get_cluster_size(lvol->lvol_store->blobstore) / bdev->blocklen;
1036 
1037 	bdev->ctxt = lvol;
1038 	bdev->fn_table = &vbdev_lvol_fn_table;
1039 	bdev->module = &g_lvol_if;
1040 
1041 	rc = spdk_bdev_register(bdev);
1042 	if (rc) {
1043 		free(lvol_bdev);
1044 		return rc;
1045 	}
1046 	lvol->bdev = bdev;
1047 
1048 	alias = spdk_sprintf_alloc("%s/%s", lvs_bdev->lvs->name, lvol->name);
1049 	if (alias == NULL) {
1050 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
1051 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1052 						  _create_lvol_disk_unload_cb), lvol);
1053 		return -ENOMEM;
1054 	}
1055 
1056 	rc = spdk_bdev_alias_add(bdev, alias);
1057 	if (rc != 0) {
1058 		SPDK_ERRLOG("Cannot add alias to lvol bdev\n");
1059 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1060 						  _create_lvol_disk_unload_cb), lvol);
1061 	}
1062 	free(alias);
1063 
1064 	return rc;
1065 }
1066 
1067 static void
1068 _vbdev_lvol_create_cb(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1069 {
1070 	struct spdk_lvol_with_handle_req *req = cb_arg;
1071 
1072 	if (lvolerrno < 0) {
1073 		goto end;
1074 	}
1075 
1076 	lvolerrno = _create_lvol_disk(lvol, true);
1077 
1078 end:
1079 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
1080 	free(req);
1081 }
1082 
1083 int
1084 vbdev_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1085 		  bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1086 		  void *cb_arg)
1087 {
1088 	struct spdk_lvol_with_handle_req *req;
1089 	int rc;
1090 
1091 	req = calloc(1, sizeof(*req));
1092 	if (req == NULL) {
1093 		return -ENOMEM;
1094 	}
1095 	req->cb_fn = cb_fn;
1096 	req->cb_arg = cb_arg;
1097 
1098 	rc = spdk_lvol_create(lvs, name, sz, thin_provision, clear_method,
1099 			      _vbdev_lvol_create_cb, req);
1100 	if (rc != 0) {
1101 		free(req);
1102 	}
1103 
1104 	return rc;
1105 }
1106 
1107 void
1108 vbdev_lvol_create_snapshot(struct spdk_lvol *lvol, const char *snapshot_name,
1109 			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1110 {
1111 	struct spdk_lvol_with_handle_req *req;
1112 
1113 	req = calloc(1, sizeof(*req));
1114 	if (req == NULL) {
1115 		cb_fn(cb_arg, NULL, -ENOMEM);
1116 		return;
1117 	}
1118 
1119 	req->cb_fn = cb_fn;
1120 	req->cb_arg = cb_arg;
1121 
1122 	spdk_lvol_create_snapshot(lvol, snapshot_name, _vbdev_lvol_create_cb, req);
1123 }
1124 
1125 void
1126 vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
1127 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1128 {
1129 	struct spdk_lvol_with_handle_req *req;
1130 
1131 	req = calloc(1, sizeof(*req));
1132 	if (req == NULL) {
1133 		cb_fn(cb_arg, NULL, -ENOMEM);
1134 		return;
1135 	}
1136 
1137 	req->cb_fn = cb_fn;
1138 	req->cb_arg = cb_arg;
1139 
1140 	spdk_lvol_create_clone(lvol, clone_name, _vbdev_lvol_create_cb, req);
1141 }
1142 
1143 static void
1144 _vbdev_lvol_rename_cb(void *cb_arg, int lvolerrno)
1145 {
1146 	struct spdk_lvol_req *req = cb_arg;
1147 
1148 	if (lvolerrno != 0) {
1149 		SPDK_ERRLOG("Renaming lvol failed\n");
1150 	}
1151 
1152 	req->cb_fn(req->cb_arg, lvolerrno);
1153 	free(req);
1154 }
1155 
1156 void
1157 vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
1158 		  spdk_lvol_op_complete cb_fn, void *cb_arg)
1159 {
1160 	struct spdk_lvol_req *req;
1161 	int rc;
1162 
1163 	rc = _vbdev_lvol_change_bdev_alias(lvol, new_lvol_name);
1164 	if (rc != 0) {
1165 		SPDK_ERRLOG("renaming lvol to '%s' does not succeed\n", new_lvol_name);
1166 		cb_fn(cb_arg, rc);
1167 		return;
1168 	}
1169 
1170 	req = calloc(1, sizeof(*req));
1171 	if (req == NULL) {
1172 		cb_fn(cb_arg, -ENOMEM);
1173 		return;
1174 	}
1175 	req->cb_fn = cb_fn;
1176 	req->cb_arg = cb_arg;
1177 
1178 	spdk_lvol_rename(lvol, new_lvol_name, _vbdev_lvol_rename_cb, req);
1179 }
1180 
1181 static void
1182 _vbdev_lvol_resize_cb(void *cb_arg, int lvolerrno)
1183 {
1184 	struct spdk_lvol_req *req = cb_arg;
1185 	struct spdk_lvol *lvol = req->lvol;
1186 	uint64_t total_size;
1187 
1188 	/* change bdev size */
1189 	if (lvolerrno != 0) {
1190 		SPDK_ERRLOG("CB function for bdev lvol %s receive error no: %d.\n", lvol->name, lvolerrno);
1191 		goto finish;
1192 	}
1193 
1194 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1195 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1196 	assert((total_size % lvol->bdev->blocklen) == 0);
1197 
1198 	lvolerrno = spdk_bdev_notify_blockcnt_change(lvol->bdev, total_size / lvol->bdev->blocklen);
1199 	if (lvolerrno != 0) {
1200 		SPDK_ERRLOG("Could not change num blocks for bdev lvol %s with error no: %d.\n",
1201 			    lvol->name, lvolerrno);
1202 	}
1203 
1204 finish:
1205 	req->cb_fn(req->cb_arg, lvolerrno);
1206 	free(req);
1207 }
1208 
1209 void
1210 vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn, void *cb_arg)
1211 {
1212 	struct spdk_lvol_req *req;
1213 
1214 	if (lvol == NULL) {
1215 		SPDK_ERRLOG("lvol does not exist\n");
1216 		cb_fn(cb_arg, -EINVAL);
1217 		return;
1218 	}
1219 
1220 	assert(lvol->bdev != NULL);
1221 
1222 	req = calloc(1, sizeof(*req));
1223 	if (req == NULL) {
1224 		cb_fn(cb_arg, -ENOMEM);
1225 		return;
1226 	}
1227 
1228 	req->cb_fn = cb_fn;
1229 	req->cb_arg = cb_arg;
1230 	req->sz = sz;
1231 	req->lvol = lvol;
1232 
1233 	spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
1234 }
1235 
1236 static void
1237 _vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1238 {
1239 	struct spdk_lvol_req *req = cb_arg;
1240 	struct spdk_lvol *lvol = req->lvol;
1241 
1242 	if (lvolerrno != 0) {
1243 		SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
1244 	}
1245 
1246 	req->cb_fn(req->cb_arg, lvolerrno);
1247 	free(req);
1248 }
1249 
1250 void
1251 vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1252 {
1253 	struct spdk_lvol_req *req;
1254 
1255 	if (lvol == NULL) {
1256 		SPDK_ERRLOG("lvol does not exist\n");
1257 		cb_fn(cb_arg, -EINVAL);
1258 		return;
1259 	}
1260 
1261 	assert(lvol->bdev != NULL);
1262 
1263 	req = calloc(1, sizeof(*req));
1264 	if (req == NULL) {
1265 		cb_fn(cb_arg, -ENOMEM);
1266 		return;
1267 	}
1268 
1269 	req->cb_fn = cb_fn;
1270 	req->cb_arg = cb_arg;
1271 	req->lvol = lvol;
1272 
1273 	spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
1274 }
1275 
1276 static int
1277 vbdev_lvs_init(void)
1278 {
1279 	return 0;
1280 }
1281 
1282 static void vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev);
1283 
1284 static void
1285 vbdev_lvs_fini_start_unload_cb(void *cb_arg, int lvserrno)
1286 {
1287 	struct lvol_store_bdev *lvs_bdev = cb_arg;
1288 	struct lvol_store_bdev *next_lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1289 
1290 	if (lvserrno != 0) {
1291 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
1292 	}
1293 
1294 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1295 	free(lvs_bdev);
1296 
1297 	vbdev_lvs_fini_start_iter(next_lvs_bdev);
1298 }
1299 
1300 static void
1301 vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev)
1302 {
1303 	struct spdk_lvol_store *lvs;
1304 
1305 	while (lvs_bdev != NULL) {
1306 		lvs = lvs_bdev->lvs;
1307 
1308 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1309 			spdk_lvs_unload(lvs, vbdev_lvs_fini_start_unload_cb, lvs_bdev);
1310 			return;
1311 		}
1312 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1313 	}
1314 
1315 	spdk_bdev_module_fini_start_done();
1316 }
1317 
1318 static void
1319 vbdev_lvs_fini_start(void)
1320 {
1321 	g_shutdown_started = true;
1322 	vbdev_lvs_fini_start_iter(vbdev_lvol_store_first());
1323 }
1324 
1325 static int
1326 vbdev_lvs_get_ctx_size(void)
1327 {
1328 	return sizeof(struct vbdev_lvol_io);
1329 }
1330 
1331 static void
1332 _vbdev_lvs_examine_failed(void *cb_arg, int lvserrno)
1333 {
1334 	spdk_bdev_module_examine_done(&g_lvol_if);
1335 }
1336 
1337 static void
1338 _vbdev_lvol_examine_close_cb(struct spdk_lvol_store *lvs)
1339 {
1340 	if (lvs->lvols_opened >= lvs->lvol_count) {
1341 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1342 		spdk_bdev_module_examine_done(&g_lvol_if);
1343 	}
1344 }
1345 
1346 static void
1347 _vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1348 {
1349 	struct spdk_lvol_store *lvs = cb_arg;
1350 
1351 	if (lvolerrno != 0) {
1352 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1353 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1354 		lvs->lvol_count--;
1355 		free(lvol);
1356 		goto end;
1357 	}
1358 
1359 	if (_create_lvol_disk(lvol, false)) {
1360 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1361 		lvs->lvol_count--;
1362 		_vbdev_lvol_examine_close_cb(lvs);
1363 		SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s failed\n", lvol->unique_id);
1364 		return;
1365 	}
1366 
1367 	lvs->lvols_opened++;
1368 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1369 
1370 end:
1371 
1372 	if (lvs->lvols_opened >= lvs->lvol_count) {
1373 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1374 		spdk_bdev_module_examine_done(&g_lvol_if);
1375 	}
1376 }
1377 
1378 static void
1379 _vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1380 {
1381 	struct lvol_store_bdev *lvs_bdev;
1382 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1383 	struct spdk_lvol *lvol, *tmp;
1384 
1385 	if (lvserrno == -EEXIST) {
1386 		SPDK_INFOLOG(vbdev_lvol,
1387 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1388 			     req->base_bdev->name);
1389 		/* On error blobstore destroys bs_dev itself */
1390 		spdk_bdev_module_examine_done(&g_lvol_if);
1391 		goto end;
1392 	} else if (lvserrno != 0) {
1393 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1394 		/* On error blobstore destroys bs_dev itself */
1395 		spdk_bdev_module_examine_done(&g_lvol_if);
1396 		goto end;
1397 	}
1398 
1399 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1400 	if (lvserrno != 0) {
1401 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1402 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1403 		goto end;
1404 	}
1405 
1406 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1407 	if (!lvs_bdev) {
1408 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1409 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1410 		goto end;
1411 	}
1412 
1413 	lvs_bdev->lvs = lvol_store;
1414 	lvs_bdev->bdev = req->base_bdev;
1415 
1416 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1417 
1418 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1419 		     req->base_bdev->name);
1420 
1421 	lvol_store->lvols_opened = 0;
1422 
1423 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1424 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1425 		spdk_bdev_module_examine_done(&g_lvol_if);
1426 	} else {
1427 		/* Open all lvols */
1428 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1429 			spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, lvol_store);
1430 		}
1431 	}
1432 
1433 end:
1434 	free(req);
1435 }
1436 
1437 static void
1438 vbdev_lvs_examine(struct spdk_bdev *bdev)
1439 {
1440 	struct spdk_bs_dev *bs_dev;
1441 	struct spdk_lvs_with_handle_req *req;
1442 	int rc;
1443 
1444 	if (spdk_bdev_get_md_size(bdev) != 0) {
1445 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n which is formatted with metadata",
1446 			     bdev->name);
1447 		spdk_bdev_module_examine_done(&g_lvol_if);
1448 		return;
1449 	}
1450 
1451 	req = calloc(1, sizeof(*req));
1452 	if (req == NULL) {
1453 		spdk_bdev_module_examine_done(&g_lvol_if);
1454 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1455 		return;
1456 	}
1457 
1458 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1459 					 NULL, &bs_dev);
1460 	if (rc < 0) {
1461 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1462 		spdk_bdev_module_examine_done(&g_lvol_if);
1463 		free(req);
1464 		return;
1465 	}
1466 
1467 	req->base_bdev = bdev;
1468 
1469 	spdk_lvs_load(bs_dev, _vbdev_lvs_examine_cb, req);
1470 }
1471 
1472 struct spdk_lvol *
1473 vbdev_lvol_get_from_bdev(struct spdk_bdev *bdev)
1474 {
1475 	if (!bdev || bdev->module != &g_lvol_if) {
1476 		return NULL;
1477 	}
1478 
1479 	if (bdev->ctxt == NULL) {
1480 		SPDK_ERRLOG("No lvol ctx assigned to bdev %s\n", bdev->name);
1481 		return NULL;
1482 	}
1483 
1484 	return (struct spdk_lvol *)bdev->ctxt;
1485 }
1486 
1487 SPDK_LOG_REGISTER_COMPONENT(vbdev_lvol)
1488