xref: /spdk/module/bdev/lvol/vbdev_lvol.c (revision ee8167e3e1671162f2e832d6b8798fb04ee76f3c)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/blob_bdev.h"
8 #include "spdk/rpc.h"
9 #include "spdk/bdev_module.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/uuid.h"
13 
14 #include "vbdev_lvol.h"
15 
/* Per-I/O context of this module. lvol_read() and lvol_write() obtain it from
 * bdev_io->driver_ctx and fill ext_io_opts from the bdev_io's ext_opts before
 * calling the blobstore's extended readv/writev functions. */
16 struct vbdev_lvol_io {
17 	struct spdk_blob_ext_io_opts ext_io_opts;
18 };
19 
/* Module-wide list of lvol store / base bdev pairs (struct lvol_store_bdev).
 * Iterated through vbdev_lvol_store_first() and vbdev_lvol_store_next(). */
20 static TAILQ_HEAD(, lvol_store_bdev) g_spdk_lvol_pairs = TAILQ_HEAD_INITIALIZER(
21 			g_spdk_lvol_pairs);
22 
/* Forward declarations of the module callbacks referenced by g_lvol_if. */
23 static int vbdev_lvs_init(void);
24 static void vbdev_lvs_fini_start(void);
25 static int vbdev_lvs_get_ctx_size(void);
26 static void vbdev_lvs_examine(struct spdk_bdev *bdev);
/* Read by _vbdev_lvol_unregister_cb() to decide whether the lvol store should be
 * unloaded once its lvols are closed. Presumably set by vbdev_lvs_fini_start() -
 * TODO confirm, the writer is not visible in this part of the file. */
27 static bool g_shutdown_started = false;
28 
29 static struct spdk_bdev_module g_lvol_if = {
30 	.name = "lvol",
31 	.module_init = vbdev_lvs_init,
32 	.fini_start = vbdev_lvs_fini_start,
33 	.async_fini_start = true,
34 	.examine_disk = vbdev_lvs_examine,
35 	.get_ctx_size = vbdev_lvs_get_ctx_size,
36 
37 };
38 
/* Hand the module descriptor g_lvol_if to the bdev module registration macro. */
39 SPDK_BDEV_MODULE_REGISTER(lvol, &g_lvol_if)
40 
41 struct lvol_store_bdev *
42 vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig)
43 {
44 	struct spdk_lvol_store *lvs = NULL;
45 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
46 
47 	while (lvs_bdev != NULL) {
48 		lvs = lvs_bdev->lvs;
49 		if (lvs == lvs_orig) {
50 			if (lvs_bdev->req != NULL) {
51 				/* We do not allow access to lvs that are being destroyed */
52 				return NULL;
53 			} else {
54 				return lvs_bdev;
55 			}
56 		}
57 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
58 	}
59 
60 	return NULL;
61 }
62 
63 static int
64 _vbdev_lvol_change_bdev_alias(struct spdk_lvol *lvol, const char *new_lvol_name)
65 {
66 	struct spdk_bdev_alias *tmp;
67 	char *old_alias;
68 	char *alias;
69 	int rc;
70 	int alias_number = 0;
71 
72 	/* bdev representing lvols have only one alias,
73 	 * while we changed lvs name earlier, we have to iterate alias list to get one,
74 	 * and check if there is only one alias */
75 
76 	TAILQ_FOREACH(tmp, spdk_bdev_get_aliases(lvol->bdev), tailq) {
77 		if (++alias_number > 1) {
78 			SPDK_ERRLOG("There is more than 1 alias in bdev %s\n", lvol->bdev->name);
79 			return -EINVAL;
80 		}
81 
82 		old_alias = tmp->alias.name;
83 	}
84 
85 	if (alias_number == 0) {
86 		SPDK_ERRLOG("There are no aliases in bdev %s\n", lvol->bdev->name);
87 		return -EINVAL;
88 	}
89 
90 	alias = spdk_sprintf_alloc("%s/%s", lvol->lvol_store->name, new_lvol_name);
91 	if (alias == NULL) {
92 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
93 		return -ENOMEM;
94 	}
95 
96 	rc = spdk_bdev_alias_add(lvol->bdev, alias);
97 	if (rc != 0) {
98 		SPDK_ERRLOG("cannot add alias '%s'\n", alias);
99 		free(alias);
100 		return rc;
101 	}
102 	free(alias);
103 
104 	rc = spdk_bdev_alias_del(lvol->bdev, old_alias);
105 	if (rc != 0) {
106 		SPDK_ERRLOG("cannot remove alias '%s'\n", old_alias);
107 		return rc;
108 	}
109 
110 	return 0;
111 }
112 
113 static struct lvol_store_bdev *
114 vbdev_get_lvs_bdev_by_bdev(struct spdk_bdev *bdev_orig)
115 {
116 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
117 
118 	while (lvs_bdev != NULL) {
119 		if (lvs_bdev->bdev == bdev_orig) {
120 			if (lvs_bdev->req != NULL) {
121 				/* We do not allow access to lvs that are being destroyed */
122 				return NULL;
123 			} else {
124 				return lvs_bdev;
125 			}
126 		}
127 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
128 	}
129 
130 	return NULL;
131 }
132 
133 static void
134 vbdev_lvs_hotremove_cb(struct spdk_bdev *bdev)
135 {
136 	struct lvol_store_bdev *lvs_bdev;
137 
138 	lvs_bdev = vbdev_get_lvs_bdev_by_bdev(bdev);
139 	if (lvs_bdev != NULL) {
140 		vbdev_lvs_unload(lvs_bdev->lvs, NULL, NULL);
141 	}
142 }
143 
144 static void
145 vbdev_lvs_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
146 			     void *event_ctx)
147 {
148 	switch (type) {
149 	case SPDK_BDEV_EVENT_REMOVE:
150 		vbdev_lvs_hotremove_cb(bdev);
151 		break;
152 	default:
153 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
154 		break;
155 	}
156 }
157 
158 static void
159 _vbdev_lvs_create_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
160 {
161 	struct spdk_lvs_with_handle_req *req = cb_arg;
162 	struct lvol_store_bdev *lvs_bdev;
163 	struct spdk_bdev *bdev = req->base_bdev;
164 	struct spdk_bs_dev *bs_dev = req->bs_dev;
165 
166 	if (lvserrno != 0) {
167 		assert(lvs == NULL);
168 		SPDK_ERRLOG("Cannot create lvol store bdev\n");
169 		goto end;
170 	}
171 
172 	lvserrno = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
173 	if (lvserrno != 0) {
174 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
175 		req->bs_dev->destroy(req->bs_dev);
176 		goto end;
177 	}
178 
179 	assert(lvs != NULL);
180 
181 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
182 	if (!lvs_bdev) {
183 		lvserrno = -ENOMEM;
184 		goto end;
185 	}
186 	lvs_bdev->lvs = lvs;
187 	lvs_bdev->bdev = bdev;
188 	lvs_bdev->req = NULL;
189 
190 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
191 	SPDK_INFOLOG(vbdev_lvol, "Lvol store bdev inserted\n");
192 
193 end:
194 	req->cb_fn(req->cb_arg, lvs, lvserrno);
195 	free(req);
196 
197 	return;
198 }
199 
200 int
201 vbdev_lvs_create(const char *base_bdev_name, const char *name, uint32_t cluster_sz,
202 		 enum lvs_clear_method clear_method, uint32_t num_md_pages_per_cluster_ratio,
203 		 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
204 {
205 	struct spdk_bs_dev *bs_dev;
206 	struct spdk_lvs_with_handle_req *lvs_req;
207 	struct spdk_lvs_opts opts;
208 	int rc;
209 	int len;
210 
211 	if (base_bdev_name == NULL) {
212 		SPDK_ERRLOG("missing base_bdev_name param\n");
213 		return -EINVAL;
214 	}
215 
216 	spdk_lvs_opts_init(&opts);
217 	if (cluster_sz != 0) {
218 		opts.cluster_sz = cluster_sz;
219 	}
220 
221 	if (clear_method != 0) {
222 		opts.clear_method = clear_method;
223 	}
224 
225 	if (num_md_pages_per_cluster_ratio != 0) {
226 		opts.num_md_pages_per_cluster_ratio = num_md_pages_per_cluster_ratio;
227 	}
228 
229 	if (name == NULL) {
230 		SPDK_ERRLOG("missing name param\n");
231 		return -EINVAL;
232 	}
233 
234 	len = strnlen(name, SPDK_LVS_NAME_MAX);
235 
236 	if (len == 0 || len == SPDK_LVS_NAME_MAX) {
237 		SPDK_ERRLOG("name must be between 1 and %d characters\n", SPDK_LVS_NAME_MAX - 1);
238 		return -EINVAL;
239 	}
240 	snprintf(opts.name, sizeof(opts.name), "%s", name);
241 
242 	lvs_req = calloc(1, sizeof(*lvs_req));
243 	if (!lvs_req) {
244 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
245 		return -ENOMEM;
246 	}
247 
248 	rc = spdk_bdev_create_bs_dev_ext(base_bdev_name, vbdev_lvs_base_bdev_event_cb,
249 					 NULL, &bs_dev);
250 	if (rc < 0) {
251 		SPDK_ERRLOG("Cannot create blobstore device\n");
252 		free(lvs_req);
253 		return rc;
254 	}
255 
256 	lvs_req->bs_dev = bs_dev;
257 	lvs_req->base_bdev = bs_dev->get_base_bdev(bs_dev);
258 	lvs_req->cb_fn = cb_fn;
259 	lvs_req->cb_arg = cb_arg;
260 
261 	rc = spdk_lvs_init(bs_dev, &opts, _vbdev_lvs_create_cb, lvs_req);
262 	if (rc < 0) {
263 		free(lvs_req);
264 		bs_dev->destroy(bs_dev);
265 		return rc;
266 	}
267 
268 	return 0;
269 }
270 
271 static void
272 _vbdev_lvs_rename_cb(void *cb_arg, int lvserrno)
273 {
274 	struct spdk_lvs_req *req = cb_arg;
275 	struct spdk_lvol *tmp;
276 
277 	if (lvserrno != 0) {
278 		SPDK_INFOLOG(vbdev_lvol, "Lvol store rename failed\n");
279 	} else {
280 		TAILQ_FOREACH(tmp, &req->lvol_store->lvols, link) {
281 			/* We have to pass current lvol name, since only lvs name changed */
282 			_vbdev_lvol_change_bdev_alias(tmp, tmp->name);
283 		}
284 	}
285 
286 	req->cb_fn(req->cb_arg, lvserrno);
287 	free(req);
288 }
289 
290 void
291 vbdev_lvs_rename(struct spdk_lvol_store *lvs, const char *new_lvs_name,
292 		 spdk_lvs_op_complete cb_fn, void *cb_arg)
293 {
294 	struct lvol_store_bdev *lvs_bdev;
295 
296 	struct spdk_lvs_req *req;
297 
298 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
299 	if (!lvs_bdev) {
300 		SPDK_ERRLOG("No such lvol store found\n");
301 		cb_fn(cb_arg, -ENODEV);
302 		return;
303 	}
304 
305 	req = calloc(1, sizeof(*req));
306 	if (!req) {
307 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
308 		cb_fn(cb_arg, -ENOMEM);
309 		return;
310 	}
311 	req->cb_fn = cb_fn;
312 	req->cb_arg = cb_arg;
313 	req->lvol_store = lvs;
314 
315 	spdk_lvs_rename(lvs, new_lvs_name, _vbdev_lvs_rename_cb, req);
316 }
317 
318 static void
319 _vbdev_lvs_remove_cb(void *cb_arg, int lvserrno)
320 {
321 	struct lvol_store_bdev *lvs_bdev = cb_arg;
322 	struct spdk_lvs_req *req = lvs_bdev->req;
323 
324 	if (lvserrno != 0) {
325 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
326 	}
327 
328 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
329 	free(lvs_bdev);
330 
331 	if (req->cb_fn != NULL) {
332 		req->cb_fn(req->cb_arg, lvserrno);
333 	}
334 	free(req);
335 }
336 
337 static void
338 _vbdev_lvs_remove_lvol_cb(void *cb_arg, int lvolerrno)
339 {
340 	struct lvol_store_bdev *lvs_bdev = cb_arg;
341 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
342 	struct spdk_lvol *lvol;
343 
344 	if (lvolerrno != 0) {
345 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol removed with errno %d\n", lvolerrno);
346 	}
347 
348 	if (TAILQ_EMPTY(&lvs->lvols)) {
349 		spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
350 		return;
351 	}
352 
353 	lvol = TAILQ_FIRST(&lvs->lvols);
354 	while (lvol != NULL) {
355 		if (spdk_lvol_deletable(lvol)) {
356 			vbdev_lvol_destroy(lvol, _vbdev_lvs_remove_lvol_cb, lvs_bdev);
357 			return;
358 		}
359 		lvol = TAILQ_NEXT(lvol, link);
360 	}
361 
362 	/* If no lvol is deletable, that means there is circular dependency. */
363 	SPDK_ERRLOG("Lvols left in lvs, but unable to delete.\n");
364 	assert(false);
365 }
366 
367 static bool
368 _vbdev_lvs_are_lvols_closed(struct spdk_lvol_store *lvs)
369 {
370 	struct spdk_lvol *lvol;
371 
372 	TAILQ_FOREACH(lvol, &lvs->lvols, link) {
373 		if (lvol->ref_count != 0) {
374 			return false;
375 		}
376 	}
377 	return true;
378 }
379 
380 static void
381 _vbdev_lvs_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
382 {
383 	struct lvol_store_bdev *lvs_bdev = cb_arg;
384 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
385 
386 	if (bdeverrno != 0) {
387 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
388 	}
389 
390 	/* Lvol store can be unloaded once all lvols are closed. */
391 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
392 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
393 	}
394 }
395 
396 static void
397 _vbdev_lvs_remove(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg,
398 		  bool destroy)
399 {
400 	struct spdk_lvs_req *req;
401 	struct lvol_store_bdev *lvs_bdev;
402 	struct spdk_lvol *lvol, *tmp;
403 
404 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
405 	if (!lvs_bdev) {
406 		SPDK_ERRLOG("No such lvol store found\n");
407 		if (cb_fn != NULL) {
408 			cb_fn(cb_arg, -ENODEV);
409 		}
410 		return;
411 	}
412 
413 	req = calloc(1, sizeof(*req));
414 	if (!req) {
415 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
416 		if (cb_fn != NULL) {
417 			cb_fn(cb_arg, -ENOMEM);
418 		}
419 		return;
420 	}
421 
422 	req->cb_fn = cb_fn;
423 	req->cb_arg = cb_arg;
424 	lvs_bdev->req = req;
425 
426 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
427 		if (destroy) {
428 			spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
429 		} else {
430 			spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
431 		}
432 	} else {
433 		lvs->destruct = destroy;
434 		if (destroy) {
435 			_vbdev_lvs_remove_lvol_cb(lvs_bdev, 0);
436 		} else {
437 			TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
438 				spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
439 			}
440 		}
441 	}
442 }
443 
444 void
445 vbdev_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
446 {
447 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, false);
448 }
449 
450 void
451 vbdev_lvs_destruct(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
452 {
453 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, true);
454 }
455 
456 struct lvol_store_bdev *
457 vbdev_lvol_store_first(void)
458 {
459 	struct lvol_store_bdev *lvs_bdev;
460 
461 	lvs_bdev = TAILQ_FIRST(&g_spdk_lvol_pairs);
462 	if (lvs_bdev) {
463 		SPDK_INFOLOG(vbdev_lvol, "Starting lvolstore iteration at %p\n", lvs_bdev->lvs);
464 	}
465 
466 	return lvs_bdev;
467 }
468 
469 struct lvol_store_bdev *
470 vbdev_lvol_store_next(struct lvol_store_bdev *prev)
471 {
472 	struct lvol_store_bdev *lvs_bdev;
473 
474 	if (prev == NULL) {
475 		SPDK_ERRLOG("prev argument cannot be NULL\n");
476 		return NULL;
477 	}
478 
479 	lvs_bdev = TAILQ_NEXT(prev, lvol_stores);
480 	if (lvs_bdev) {
481 		SPDK_INFOLOG(vbdev_lvol, "Continuing lvolstore iteration at %p\n", lvs_bdev->lvs);
482 	}
483 
484 	return lvs_bdev;
485 }
486 
487 static struct spdk_lvol_store *
488 _vbdev_get_lvol_store_by_uuid(const struct spdk_uuid *uuid)
489 {
490 	struct spdk_lvol_store *lvs = NULL;
491 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
492 
493 	while (lvs_bdev != NULL) {
494 		lvs = lvs_bdev->lvs;
495 		if (spdk_uuid_compare(&lvs->uuid, uuid) == 0) {
496 			return lvs;
497 		}
498 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
499 	}
500 	return NULL;
501 }
502 
503 struct spdk_lvol_store *
504 vbdev_get_lvol_store_by_uuid(const char *uuid_str)
505 {
506 	struct spdk_uuid uuid;
507 
508 	if (spdk_uuid_parse(&uuid, uuid_str)) {
509 		return NULL;
510 	}
511 
512 	return _vbdev_get_lvol_store_by_uuid(&uuid);
513 }
514 
515 struct spdk_lvol_store *
516 vbdev_get_lvol_store_by_name(const char *name)
517 {
518 	struct spdk_lvol_store *lvs = NULL;
519 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
520 
521 	while (lvs_bdev != NULL) {
522 		lvs = lvs_bdev->lvs;
523 		if (strncmp(lvs->name, name, sizeof(lvs->name)) == 0) {
524 			return lvs;
525 		}
526 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
527 	}
528 	return NULL;
529 }
530 
/* Context passed by vbdev_lvol_destroy() through spdk_bdev_unregister() to
 * _vbdev_lvol_destroy_cb(): the lvol to destroy once its bdev is unregistered
 * and the caller's completion callback with its argument. */
531 struct vbdev_lvol_destroy_ctx {
532 	struct spdk_lvol *lvol;
533 	spdk_lvol_op_complete cb_fn;
534 	void *cb_arg;
535 };
536 
537 static void
538 _vbdev_lvol_unregister_unload_lvs(void *cb_arg, int lvserrno)
539 {
540 	struct lvol_bdev *lvol_bdev = cb_arg;
541 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
542 
543 	if (lvserrno != 0) {
544 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
545 	}
546 
547 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
548 	free(lvs_bdev);
549 
550 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvserrno);
551 	free(lvol_bdev);
552 }
553 
554 static void
555 _vbdev_lvol_unregister_cb(void *ctx, int lvolerrno)
556 {
557 	struct lvol_bdev *lvol_bdev = ctx;
558 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
559 
560 	if (g_shutdown_started && _vbdev_lvs_are_lvols_closed(lvs_bdev->lvs)) {
561 		spdk_lvs_unload(lvs_bdev->lvs, _vbdev_lvol_unregister_unload_lvs, lvol_bdev);
562 		return;
563 	}
564 
565 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvolerrno);
566 	free(lvol_bdev);
567 }
568 
569 static int
570 vbdev_lvol_unregister(void *ctx)
571 {
572 	struct spdk_lvol *lvol = ctx;
573 	struct lvol_bdev *lvol_bdev;
574 
575 	assert(lvol != NULL);
576 	lvol_bdev = SPDK_CONTAINEROF(lvol->bdev, struct lvol_bdev, bdev);
577 
578 	spdk_bdev_alias_del_all(lvol->bdev);
579 	spdk_lvol_close(lvol, _vbdev_lvol_unregister_cb, lvol_bdev);
580 
581 	/* return 1 to indicate we have an operation that must finish asynchronously before the
582 	 *  lvol is closed
583 	 */
584 	return 1;
585 }
586 
587 static void
588 _vbdev_lvol_destroy_cb(void *cb_arg, int bdeverrno)
589 {
590 	struct vbdev_lvol_destroy_ctx *ctx = cb_arg;
591 	struct spdk_lvol *lvol = ctx->lvol;
592 
593 	if (bdeverrno < 0) {
594 		SPDK_INFOLOG(vbdev_lvol, "Could not unregister bdev during lvol (%s) destroy\n",
595 			     lvol->unique_id);
596 		ctx->cb_fn(ctx->cb_arg, bdeverrno);
597 		free(ctx);
598 		return;
599 	}
600 
601 	spdk_lvol_destroy(lvol, ctx->cb_fn, ctx->cb_arg);
602 	free(ctx);
603 }
604 
605 void
606 vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
607 {
608 	struct vbdev_lvol_destroy_ctx *ctx;
609 	size_t count;
610 
611 	assert(lvol != NULL);
612 	assert(cb_fn != NULL);
613 
614 	/* Check if it is possible to delete lvol */
615 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
616 	if (count > 1) {
617 		/* throw an error */
618 		SPDK_ERRLOG("Cannot delete lvol\n");
619 		cb_fn(cb_arg, -EPERM);
620 		return;
621 	}
622 
623 	ctx = calloc(1, sizeof(*ctx));
624 	if (!ctx) {
625 		cb_fn(cb_arg, -ENOMEM);
626 		return;
627 	}
628 
629 	ctx->lvol = lvol;
630 	ctx->cb_fn = cb_fn;
631 	ctx->cb_arg = cb_arg;
632 
633 	spdk_bdev_unregister(lvol->bdev, _vbdev_lvol_destroy_cb, ctx);
634 }
635 
636 static char *
637 vbdev_lvol_find_name(struct spdk_lvol *lvol, spdk_blob_id blob_id)
638 {
639 	struct spdk_lvol_store *lvs;
640 	struct spdk_lvol *_lvol;
641 
642 	assert(lvol != NULL);
643 
644 	lvs = lvol->lvol_store;
645 
646 	assert(lvs);
647 
648 	TAILQ_FOREACH(_lvol, &lvs->lvols, link) {
649 		if (_lvol->blob_id == blob_id) {
650 			return _lvol->name;
651 		}
652 	}
653 
654 	return NULL;
655 }
656 
657 static int
658 vbdev_lvol_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
659 {
660 	struct spdk_lvol *lvol = ctx;
661 	struct lvol_store_bdev *lvs_bdev;
662 	struct spdk_bdev *bdev;
663 	struct spdk_blob *blob;
664 	char lvol_store_uuid[SPDK_UUID_STRING_LEN];
665 	spdk_blob_id *ids = NULL;
666 	size_t count, i;
667 	char *name;
668 	int rc = 0;
669 
670 	spdk_json_write_named_object_begin(w, "lvol");
671 
672 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
673 	if (!lvs_bdev) {
674 		SPDK_ERRLOG("No such lvol store found\n");
675 		rc = -ENODEV;
676 		goto end;
677 	}
678 
679 	bdev = lvs_bdev->bdev;
680 
681 	spdk_uuid_fmt_lower(lvol_store_uuid, sizeof(lvol_store_uuid), &lvol->lvol_store->uuid);
682 	spdk_json_write_named_string(w, "lvol_store_uuid", lvol_store_uuid);
683 
684 	spdk_json_write_named_string(w, "base_bdev", spdk_bdev_get_name(bdev));
685 
686 	blob = lvol->blob;
687 
688 	spdk_json_write_named_bool(w, "thin_provision", spdk_blob_is_thin_provisioned(blob));
689 
690 	spdk_json_write_named_bool(w, "snapshot", spdk_blob_is_snapshot(blob));
691 
692 	spdk_json_write_named_bool(w, "clone", spdk_blob_is_clone(blob));
693 
694 	if (spdk_blob_is_clone(blob)) {
695 		spdk_blob_id snapshotid = spdk_blob_get_parent_snapshot(lvol->lvol_store->blobstore, lvol->blob_id);
696 		if (snapshotid != SPDK_BLOBID_INVALID) {
697 			name = vbdev_lvol_find_name(lvol, snapshotid);
698 			if (name != NULL) {
699 				spdk_json_write_named_string(w, "base_snapshot", name);
700 			} else {
701 				SPDK_ERRLOG("Cannot obtain snapshots name\n");
702 			}
703 		}
704 	}
705 
706 	if (spdk_blob_is_snapshot(blob)) {
707 		/* Take a number of clones */
708 		rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
709 		if (rc == -ENOMEM && count > 0) {
710 			ids = malloc(sizeof(spdk_blob_id) * count);
711 			if (ids == NULL) {
712 				SPDK_ERRLOG("Cannot allocate memory\n");
713 				rc = -ENOMEM;
714 				goto end;
715 			}
716 
717 			rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, ids, &count);
718 			if (rc == 0) {
719 				spdk_json_write_named_array_begin(w, "clones");
720 				for (i = 0; i < count; i++) {
721 					name = vbdev_lvol_find_name(lvol, ids[i]);
722 					if (name != NULL) {
723 						spdk_json_write_string(w, name);
724 					} else {
725 						SPDK_ERRLOG("Cannot obtain clone name\n");
726 					}
727 
728 				}
729 				spdk_json_write_array_end(w);
730 			}
731 			free(ids);
732 		}
733 
734 	}
735 
736 end:
737 	spdk_json_write_object_end(w);
738 
739 	return rc;
740 }
741 
/* `write_config_json` entry of the lvol bdev function table; intentionally a
 * no-op. */
static void
vbdev_lvol_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	/* Nothing to dump as lvol configuration is saved on physical device. */
	(void)bdev;
	(void)w;
}
747 
/* `get_io_channel` entry of the lvol bdev function table: ctx is the lvol, and
 * the channel comes straight from spdk_lvol_get_io_channel(). */
static struct spdk_io_channel *
vbdev_lvol_get_io_channel(void *ctx)
{
	return spdk_lvol_get_io_channel(ctx);
}
755 
756 static bool
757 vbdev_lvol_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
758 {
759 	struct spdk_lvol *lvol = ctx;
760 
761 	switch (io_type) {
762 	case SPDK_BDEV_IO_TYPE_WRITE:
763 	case SPDK_BDEV_IO_TYPE_UNMAP:
764 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
765 		return !spdk_blob_is_read_only(lvol->blob);
766 	case SPDK_BDEV_IO_TYPE_RESET:
767 	case SPDK_BDEV_IO_TYPE_READ:
768 		return true;
769 	default:
770 		return false;
771 	}
772 }
773 
774 static void
775 lvol_op_comp(void *cb_arg, int bserrno)
776 {
777 	struct spdk_bdev_io *bdev_io = cb_arg;
778 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_SUCCESS;
779 
780 	if (bserrno != 0) {
781 		if (bserrno == -ENOMEM) {
782 			status = SPDK_BDEV_IO_STATUS_NOMEM;
783 		} else {
784 			status = SPDK_BDEV_IO_STATUS_FAILED;
785 		}
786 	}
787 
788 	spdk_bdev_io_complete(bdev_io, status);
789 }
790 
791 static void
792 lvol_unmap(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
793 {
794 	uint64_t start_page, num_pages;
795 	struct spdk_blob *blob = lvol->blob;
796 
797 	start_page = bdev_io->u.bdev.offset_blocks;
798 	num_pages = bdev_io->u.bdev.num_blocks;
799 
800 	spdk_blob_io_unmap(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
801 }
802 
803 static void
804 lvol_write_zeroes(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
805 {
806 	uint64_t start_page, num_pages;
807 	struct spdk_blob *blob = lvol->blob;
808 
809 	start_page = bdev_io->u.bdev.offset_blocks;
810 	num_pages = bdev_io->u.bdev.num_blocks;
811 
812 	spdk_blob_io_write_zeroes(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
813 }
814 
815 static void
816 lvol_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
817 {
818 	uint64_t start_page, num_pages;
819 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
820 	struct spdk_blob *blob = lvol->blob;
821 
822 	start_page = bdev_io->u.bdev.offset_blocks;
823 	num_pages = bdev_io->u.bdev.num_blocks;
824 
825 	if (bdev_io->u.bdev.ext_opts) {
826 		struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
827 
828 		lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
829 		lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.ext_opts->memory_domain;
830 		lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.ext_opts->memory_domain_ctx;
831 		/* Save a pointer to ext_opts passed by the user, it will be used in bs_dev readv/writev_ext functions
832 		 * to restore ext_opts structure. That is done since bdev and blob extended functions use different
833 		 * extended opts structures */
834 		lvol_io->ext_io_opts.user_ctx = bdev_io->u.bdev.ext_opts;
835 
836 		spdk_blob_io_readv_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
837 				       num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
838 	} else {
839 		spdk_blob_io_readv(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
840 				   num_pages, lvol_op_comp, bdev_io);
841 	}
842 }
843 
844 static void
845 lvol_write(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
846 {
847 	uint64_t start_page, num_pages;
848 	struct spdk_blob *blob = lvol->blob;
849 
850 	start_page = bdev_io->u.bdev.offset_blocks;
851 	num_pages = bdev_io->u.bdev.num_blocks;
852 
853 	if (bdev_io->u.bdev.ext_opts) {
854 		struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
855 
856 		lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
857 		lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.ext_opts->memory_domain;
858 		lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.ext_opts->memory_domain_ctx;
859 		/* Save a pointer to ext_opts passed by the user, it will be used in bs_dev readv/writev_ext functions
860 		 * to restore ext_opts structure. That is done since bdev and blob extended functions use different
861 		 * extended opts structures */
862 		lvol_io->ext_io_opts.user_ctx = bdev_io->u.bdev.ext_opts;
863 
864 		spdk_blob_io_writev_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
865 					num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
866 	} else {
867 		spdk_blob_io_writev(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
868 				    num_pages, lvol_op_comp, bdev_io);
869 	}
870 }
871 
872 static int
873 lvol_reset(struct spdk_bdev_io *bdev_io)
874 {
875 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
876 
877 	return 0;
878 }
879 
880 static void
881 lvol_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
882 {
883 	if (!success) {
884 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
885 		return;
886 	}
887 
888 	lvol_read(ch, bdev_io);
889 }
890 
891 static void
892 vbdev_lvol_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
893 {
894 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
895 
896 	switch (bdev_io->type) {
897 	case SPDK_BDEV_IO_TYPE_READ:
898 		spdk_bdev_io_get_buf(bdev_io, lvol_get_buf_cb,
899 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
900 		break;
901 	case SPDK_BDEV_IO_TYPE_WRITE:
902 		lvol_write(lvol, ch, bdev_io);
903 		break;
904 	case SPDK_BDEV_IO_TYPE_RESET:
905 		lvol_reset(bdev_io);
906 		break;
907 	case SPDK_BDEV_IO_TYPE_UNMAP:
908 		lvol_unmap(lvol, ch, bdev_io);
909 		break;
910 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
911 		lvol_write_zeroes(lvol, ch, bdev_io);
912 		break;
913 	default:
914 		SPDK_INFOLOG(vbdev_lvol, "lvol: unsupported I/O type %d\n", bdev_io->type);
915 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
916 		return;
917 	}
918 	return;
919 }
920 
921 static int
922 vbdev_lvol_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
923 {
924 	struct spdk_lvol *lvol = ctx;
925 	struct spdk_bdev *base_bdev;
926 
927 	base_bdev = lvol->lvol_store->bs_dev->get_base_bdev(lvol->lvol_store->bs_dev);
928 
929 	return spdk_bdev_get_memory_domains(base_bdev, domains, array_size);
930 }
931 
932 static struct spdk_bdev_fn_table vbdev_lvol_fn_table = {
933 	.destruct		= vbdev_lvol_unregister,
934 	.io_type_supported	= vbdev_lvol_io_type_supported,
935 	.submit_request		= vbdev_lvol_submit_request,
936 	.get_io_channel		= vbdev_lvol_get_io_channel,
937 	.dump_info_json		= vbdev_lvol_dump_info_json,
938 	.write_config_json	= vbdev_lvol_write_config_json,
939 	.get_memory_domains	= vbdev_lvol_get_memory_domains,
940 };
941 
/* Completion passed to spdk_lvol_destroy() by _create_lvol_disk_destroy_cb();
 * the result is deliberately ignored. */
static void
lvol_destroy_cb(void *cb_arg, int bdeverrno)
{
	(void)cb_arg;
	(void)bdeverrno;
}
946 
947 static void
948 _create_lvol_disk_destroy_cb(void *cb_arg, int bdeverrno)
949 {
950 	struct spdk_lvol *lvol = cb_arg;
951 
952 	if (bdeverrno < 0) {
953 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
954 			    lvol->unique_id);
955 		return;
956 	}
957 
958 	spdk_lvol_destroy(lvol, lvol_destroy_cb, NULL);
959 }
960 
961 static void
962 _create_lvol_disk_unload_cb(void *cb_arg, int bdeverrno)
963 {
964 	struct spdk_lvol *lvol = cb_arg;
965 
966 	if (bdeverrno < 0) {
967 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
968 			    lvol->unique_id);
969 		return;
970 	}
971 
972 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
973 	free(lvol);
974 }
975 
976 static int
977 _create_lvol_disk(struct spdk_lvol *lvol, bool destroy)
978 {
979 	struct spdk_bdev *bdev;
980 	struct lvol_bdev *lvol_bdev;
981 	struct lvol_store_bdev *lvs_bdev;
982 	uint64_t total_size;
983 	unsigned char *alias;
984 	int rc;
985 
986 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
987 	if (lvs_bdev == NULL) {
988 		SPDK_ERRLOG("No spdk lvs-bdev pair found for lvol %s\n", lvol->unique_id);
989 		return -ENODEV;
990 	}
991 
992 	lvol_bdev = calloc(1, sizeof(struct lvol_bdev));
993 	if (!lvol_bdev) {
994 		SPDK_ERRLOG("Cannot alloc memory for lvol bdev\n");
995 		return -ENOMEM;
996 	}
997 
998 	lvol_bdev->lvol = lvol;
999 	lvol_bdev->lvs_bdev = lvs_bdev;
1000 
1001 	bdev = &lvol_bdev->bdev;
1002 	bdev->name = lvol->unique_id;
1003 	bdev->product_name = "Logical Volume";
1004 	bdev->blocklen = spdk_bs_get_io_unit_size(lvol->lvol_store->blobstore);
1005 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1006 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1007 	assert((total_size % bdev->blocklen) == 0);
1008 	bdev->blockcnt = total_size / bdev->blocklen;
1009 	bdev->uuid = lvol->uuid;
1010 	bdev->required_alignment = lvs_bdev->bdev->required_alignment;
1011 	bdev->split_on_optimal_io_boundary = true;
1012 	bdev->optimal_io_boundary = spdk_bs_get_cluster_size(lvol->lvol_store->blobstore) / bdev->blocklen;
1013 
1014 	bdev->ctxt = lvol;
1015 	bdev->fn_table = &vbdev_lvol_fn_table;
1016 	bdev->module = &g_lvol_if;
1017 
1018 	rc = spdk_bdev_register(bdev);
1019 	if (rc) {
1020 		free(lvol_bdev);
1021 		return rc;
1022 	}
1023 	lvol->bdev = bdev;
1024 
1025 	alias = spdk_sprintf_alloc("%s/%s", lvs_bdev->lvs->name, lvol->name);
1026 	if (alias == NULL) {
1027 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
1028 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1029 						  _create_lvol_disk_unload_cb), lvol);
1030 		return -ENOMEM;
1031 	}
1032 
1033 	rc = spdk_bdev_alias_add(bdev, alias);
1034 	if (rc != 0) {
1035 		SPDK_ERRLOG("Cannot add alias to lvol bdev\n");
1036 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1037 						  _create_lvol_disk_unload_cb), lvol);
1038 	}
1039 	free(alias);
1040 
1041 	return rc;
1042 }
1043 
1044 static void
1045 _vbdev_lvol_create_cb(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1046 {
1047 	struct spdk_lvol_with_handle_req *req = cb_arg;
1048 
1049 	if (lvolerrno < 0) {
1050 		goto end;
1051 	}
1052 
1053 	lvolerrno = _create_lvol_disk(lvol, true);
1054 
1055 end:
1056 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
1057 	free(req);
1058 }
1059 
1060 int
1061 vbdev_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1062 		  bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1063 		  void *cb_arg)
1064 {
1065 	struct spdk_lvol_with_handle_req *req;
1066 	int rc;
1067 
1068 	req = calloc(1, sizeof(*req));
1069 	if (req == NULL) {
1070 		return -ENOMEM;
1071 	}
1072 	req->cb_fn = cb_fn;
1073 	req->cb_arg = cb_arg;
1074 
1075 	rc = spdk_lvol_create(lvs, name, sz, thin_provision, clear_method,
1076 			      _vbdev_lvol_create_cb, req);
1077 	if (rc != 0) {
1078 		free(req);
1079 	}
1080 
1081 	return rc;
1082 }
1083 
1084 void
1085 vbdev_lvol_create_snapshot(struct spdk_lvol *lvol, const char *snapshot_name,
1086 			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1087 {
1088 	struct spdk_lvol_with_handle_req *req;
1089 
1090 	req = calloc(1, sizeof(*req));
1091 	if (req == NULL) {
1092 		cb_fn(cb_arg, NULL, -ENOMEM);
1093 		return;
1094 	}
1095 
1096 	req->cb_fn = cb_fn;
1097 	req->cb_arg = cb_arg;
1098 
1099 	spdk_lvol_create_snapshot(lvol, snapshot_name, _vbdev_lvol_create_cb, req);
1100 }
1101 
1102 void
1103 vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
1104 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1105 {
1106 	struct spdk_lvol_with_handle_req *req;
1107 
1108 	req = calloc(1, sizeof(*req));
1109 	if (req == NULL) {
1110 		cb_fn(cb_arg, NULL, -ENOMEM);
1111 		return;
1112 	}
1113 
1114 	req->cb_fn = cb_fn;
1115 	req->cb_arg = cb_arg;
1116 
1117 	spdk_lvol_create_clone(lvol, clone_name, _vbdev_lvol_create_cb, req);
1118 }
1119 
1120 static void
1121 _vbdev_lvol_rename_cb(void *cb_arg, int lvolerrno)
1122 {
1123 	struct spdk_lvol_req *req = cb_arg;
1124 
1125 	if (lvolerrno != 0) {
1126 		SPDK_ERRLOG("Renaming lvol failed\n");
1127 	}
1128 
1129 	req->cb_fn(req->cb_arg, lvolerrno);
1130 	free(req);
1131 }
1132 
1133 void
1134 vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
1135 		  spdk_lvol_op_complete cb_fn, void *cb_arg)
1136 {
1137 	struct spdk_lvol_req *req;
1138 	int rc;
1139 
1140 	rc = _vbdev_lvol_change_bdev_alias(lvol, new_lvol_name);
1141 	if (rc != 0) {
1142 		SPDK_ERRLOG("renaming lvol to '%s' does not succeed\n", new_lvol_name);
1143 		cb_fn(cb_arg, rc);
1144 		return;
1145 	}
1146 
1147 	req = calloc(1, sizeof(*req));
1148 	if (req == NULL) {
1149 		cb_fn(cb_arg, -ENOMEM);
1150 		return;
1151 	}
1152 	req->cb_fn = cb_fn;
1153 	req->cb_arg = cb_arg;
1154 
1155 	spdk_lvol_rename(lvol, new_lvol_name, _vbdev_lvol_rename_cb, req);
1156 }
1157 
1158 static void
1159 _vbdev_lvol_resize_cb(void *cb_arg, int lvolerrno)
1160 {
1161 	struct spdk_lvol_req *req = cb_arg;
1162 	struct spdk_lvol *lvol = req->lvol;
1163 	uint64_t total_size;
1164 
1165 	/* change bdev size */
1166 	if (lvolerrno != 0) {
1167 		SPDK_ERRLOG("CB function for bdev lvol %s receive error no: %d.\n", lvol->name, lvolerrno);
1168 		goto finish;
1169 	}
1170 
1171 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1172 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1173 	assert((total_size % lvol->bdev->blocklen) == 0);
1174 
1175 	lvolerrno = spdk_bdev_notify_blockcnt_change(lvol->bdev, total_size / lvol->bdev->blocklen);
1176 	if (lvolerrno != 0) {
1177 		SPDK_ERRLOG("Could not change num blocks for bdev lvol %s with error no: %d.\n",
1178 			    lvol->name, lvolerrno);
1179 	}
1180 
1181 finish:
1182 	req->cb_fn(req->cb_arg, lvolerrno);
1183 	free(req);
1184 }
1185 
1186 void
1187 vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn, void *cb_arg)
1188 {
1189 	struct spdk_lvol_req *req;
1190 
1191 	if (lvol == NULL) {
1192 		SPDK_ERRLOG("lvol does not exist\n");
1193 		cb_fn(cb_arg, -EINVAL);
1194 		return;
1195 	}
1196 
1197 	assert(lvol->bdev != NULL);
1198 
1199 	req = calloc(1, sizeof(*req));
1200 	if (req == NULL) {
1201 		cb_fn(cb_arg, -ENOMEM);
1202 		return;
1203 	}
1204 
1205 	req->cb_fn = cb_fn;
1206 	req->cb_arg = cb_arg;
1207 	req->sz = sz;
1208 	req->lvol = lvol;
1209 
1210 	spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
1211 }
1212 
1213 static void
1214 _vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1215 {
1216 	struct spdk_lvol_req *req = cb_arg;
1217 	struct spdk_lvol *lvol = req->lvol;
1218 
1219 	if (lvolerrno != 0) {
1220 		SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
1221 	}
1222 
1223 	req->cb_fn(req->cb_arg, lvolerrno);
1224 	free(req);
1225 }
1226 
1227 void
1228 vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1229 {
1230 	struct spdk_lvol_req *req;
1231 
1232 	if (lvol == NULL) {
1233 		SPDK_ERRLOG("lvol does not exist\n");
1234 		cb_fn(cb_arg, -EINVAL);
1235 		return;
1236 	}
1237 
1238 	assert(lvol->bdev != NULL);
1239 
1240 	req = calloc(1, sizeof(*req));
1241 	if (req == NULL) {
1242 		cb_fn(cb_arg, -ENOMEM);
1243 		return;
1244 	}
1245 
1246 	req->cb_fn = cb_fn;
1247 	req->cb_arg = cb_arg;
1248 	req->lvol = lvol;
1249 
1250 	spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
1251 }
1252 
/*
 * module_init hook of g_lvol_if. There is nothing to set up here, so it
 * unconditionally reports success.
 */
static int
vbdev_lvs_init(void)
{
	return 0;
}
1258 
1259 static void vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev);
1260 
1261 static void
1262 vbdev_lvs_fini_start_unload_cb(void *cb_arg, int lvserrno)
1263 {
1264 	struct lvol_store_bdev *lvs_bdev = cb_arg;
1265 	struct lvol_store_bdev *next_lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1266 
1267 	if (lvserrno != 0) {
1268 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
1269 	}
1270 
1271 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1272 	free(lvs_bdev);
1273 
1274 	vbdev_lvs_fini_start_iter(next_lvs_bdev);
1275 }
1276 
1277 static void
1278 vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev)
1279 {
1280 	struct spdk_lvol_store *lvs;
1281 
1282 	while (lvs_bdev != NULL) {
1283 		lvs = lvs_bdev->lvs;
1284 
1285 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1286 			spdk_lvs_unload(lvs, vbdev_lvs_fini_start_unload_cb, lvs_bdev);
1287 			return;
1288 		}
1289 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1290 	}
1291 
1292 	spdk_bdev_module_fini_start_done();
1293 }
1294 
1295 static void
1296 vbdev_lvs_fini_start(void)
1297 {
1298 	g_shutdown_started = true;
1299 	vbdev_lvs_fini_start_iter(vbdev_lvol_store_first());
1300 }
1301 
1302 static int
1303 vbdev_lvs_get_ctx_size(void)
1304 {
1305 	return sizeof(struct vbdev_lvol_io);
1306 }
1307 
1308 static void
1309 _vbdev_lvs_examine_failed(void *cb_arg, int lvserrno)
1310 {
1311 	spdk_bdev_module_examine_done(&g_lvol_if);
1312 }
1313 
1314 static void
1315 _vbdev_lvol_examine_close_cb(struct spdk_lvol_store *lvs)
1316 {
1317 	if (lvs->lvols_opened >= lvs->lvol_count) {
1318 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1319 		spdk_bdev_module_examine_done(&g_lvol_if);
1320 	}
1321 }
1322 
1323 static void
1324 _vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1325 {
1326 	struct spdk_lvol_store *lvs = cb_arg;
1327 
1328 	if (lvolerrno != 0) {
1329 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1330 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1331 		lvs->lvol_count--;
1332 		free(lvol);
1333 		goto end;
1334 	}
1335 
1336 	if (_create_lvol_disk(lvol, false)) {
1337 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1338 		lvs->lvol_count--;
1339 		_vbdev_lvol_examine_close_cb(lvs);
1340 		SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s failed\n", lvol->unique_id);
1341 		return;
1342 	}
1343 
1344 	lvs->lvols_opened++;
1345 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1346 
1347 end:
1348 
1349 	if (lvs->lvols_opened >= lvs->lvol_count) {
1350 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1351 		spdk_bdev_module_examine_done(&g_lvol_if);
1352 	}
1353 }
1354 
1355 static void
1356 _vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1357 {
1358 	struct lvol_store_bdev *lvs_bdev;
1359 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1360 	struct spdk_lvol *lvol, *tmp;
1361 
1362 	if (lvserrno == -EEXIST) {
1363 		SPDK_INFOLOG(vbdev_lvol,
1364 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1365 			     req->base_bdev->name);
1366 		/* On error blobstore destroys bs_dev itself */
1367 		spdk_bdev_module_examine_done(&g_lvol_if);
1368 		goto end;
1369 	} else if (lvserrno != 0) {
1370 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1371 		/* On error blobstore destroys bs_dev itself */
1372 		spdk_bdev_module_examine_done(&g_lvol_if);
1373 		goto end;
1374 	}
1375 
1376 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1377 	if (lvserrno != 0) {
1378 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1379 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1380 		goto end;
1381 	}
1382 
1383 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1384 	if (!lvs_bdev) {
1385 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1386 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1387 		goto end;
1388 	}
1389 
1390 	lvs_bdev->lvs = lvol_store;
1391 	lvs_bdev->bdev = req->base_bdev;
1392 
1393 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1394 
1395 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1396 		     req->base_bdev->name);
1397 
1398 	lvol_store->lvols_opened = 0;
1399 
1400 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1401 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1402 		spdk_bdev_module_examine_done(&g_lvol_if);
1403 	} else {
1404 		/* Open all lvols */
1405 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1406 			spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, lvol_store);
1407 		}
1408 	}
1409 
1410 end:
1411 	free(req);
1412 }
1413 
1414 static void
1415 vbdev_lvs_examine(struct spdk_bdev *bdev)
1416 {
1417 	struct spdk_bs_dev *bs_dev;
1418 	struct spdk_lvs_with_handle_req *req;
1419 	int rc;
1420 
1421 	if (spdk_bdev_get_md_size(bdev) != 0) {
1422 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n which is formatted with metadata",
1423 			     bdev->name);
1424 		spdk_bdev_module_examine_done(&g_lvol_if);
1425 		return;
1426 	}
1427 
1428 	req = calloc(1, sizeof(*req));
1429 	if (req == NULL) {
1430 		spdk_bdev_module_examine_done(&g_lvol_if);
1431 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1432 		return;
1433 	}
1434 
1435 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1436 					 NULL, &bs_dev);
1437 	if (rc < 0) {
1438 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1439 		spdk_bdev_module_examine_done(&g_lvol_if);
1440 		free(req);
1441 		return;
1442 	}
1443 
1444 	req->base_bdev = bdev;
1445 
1446 	spdk_lvs_load(bs_dev, _vbdev_lvs_examine_cb, req);
1447 }
1448 
1449 struct spdk_lvol *
1450 vbdev_lvol_get_from_bdev(struct spdk_bdev *bdev)
1451 {
1452 	if (!bdev || bdev->module != &g_lvol_if) {
1453 		return NULL;
1454 	}
1455 
1456 	if (bdev->ctxt == NULL) {
1457 		SPDK_ERRLOG("No lvol ctx assigned to bdev %s\n", bdev->name);
1458 		return NULL;
1459 	}
1460 
1461 	return (struct spdk_lvol *)bdev->ctxt;
1462 }
1463 
1464 static void
1465 _vbdev_lvs_grow_finish(void *arg, int lvserrno)
1466 {
1467 	struct spdk_lvs_grow_req *req = arg;
1468 	req->cb_fn(req->cb_arg, req->lvserrno);
1469 	free(req);
1470 }
1471 
1472 static void
1473 _vbdev_lvs_grow_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1474 {
1475 	struct spdk_lvs_grow_req *req = cb_arg;
1476 	struct spdk_lvol_store *lvs = req->lvol_store;
1477 
1478 	if (lvolerrno != 0) {
1479 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1480 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1481 		assert(lvs->lvol_count > 0);
1482 		lvs->lvol_count--;
1483 		free(lvol);
1484 		goto end;
1485 	}
1486 
1487 	if (_create_lvol_disk(lvol, false)) {
1488 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1489 		assert(lvs->lvol_count > 0);
1490 		lvs->lvol_count--;
1491 		SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s failed\n", lvol->unique_id);
1492 		goto end;
1493 	}
1494 
1495 	lvs->lvols_opened++;
1496 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1497 
1498 end:
1499 	if (lvs->lvols_opened >= lvs->lvol_count) {
1500 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1501 		_vbdev_lvs_grow_finish(req, 0);
1502 	}
1503 }
1504 
1505 static void
1506 _vbdev_lvs_grow_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1507 {
1508 	struct lvol_store_bdev *lvs_bdev;
1509 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1510 	struct spdk_lvol *lvol, *tmp;
1511 	struct spdk_lvs_grow_req *ori_req = req->cb_arg;
1512 
1513 	if (lvserrno == -EEXIST) {
1514 		SPDK_INFOLOG(vbdev_lvol,
1515 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1516 			     req->base_bdev->name);
1517 		ori_req->lvserrno = lvserrno;
1518 		_vbdev_lvs_grow_finish(ori_req, lvserrno);
1519 		goto end;
1520 	} else if (lvserrno != 0) {
1521 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1522 		/* On error blobstore destroys bs_dev itself */
1523 		ori_req->lvserrno = lvserrno;
1524 		_vbdev_lvs_grow_finish(ori_req, lvserrno);
1525 		goto end;
1526 	}
1527 
1528 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1529 	if (lvserrno != 0) {
1530 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1531 		ori_req->lvserrno = lvserrno;
1532 		spdk_lvs_unload(lvol_store, _vbdev_lvs_grow_finish, ori_req);
1533 		goto end;
1534 	}
1535 
1536 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1537 	if (!lvs_bdev) {
1538 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1539 		ori_req->lvserrno = -ENOMEM;
1540 		spdk_lvs_unload(lvol_store, _vbdev_lvs_grow_finish, ori_req);
1541 		goto end;
1542 	}
1543 
1544 	lvs_bdev->lvs = lvol_store;
1545 	lvs_bdev->bdev = req->base_bdev;
1546 
1547 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1548 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1549 		     req->base_bdev->name);
1550 
1551 	lvol_store->lvols_opened = 0;
1552 
1553 	ori_req->lvol_store = lvol_store;
1554 	ori_req->lvserrno = 0;
1555 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1556 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1557 		_vbdev_lvs_grow_finish(ori_req, 0);
1558 	} else {
1559 		/* Open all lvols */
1560 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1561 			spdk_lvol_open(lvol, _vbdev_lvs_grow_examine_finish, ori_req);
1562 		}
1563 	}
1564 
1565 end:
1566 	free(req);
1567 }
1568 
1569 static void
1570 _vbdev_lvs_grow_examine(struct spdk_bdev *bdev, struct spdk_lvs_grow_req *ori_req)
1571 {
1572 	struct spdk_bs_dev *bs_dev;
1573 	struct spdk_lvs_with_handle_req *req;
1574 	int rc;
1575 
1576 	req = calloc(1, sizeof(*req));
1577 	if (req == NULL) {
1578 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1579 		ori_req->lvserrno = -ENOMEM;
1580 		_vbdev_lvs_grow_finish(ori_req, -ENOMEM);
1581 		return;
1582 	}
1583 
1584 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1585 					 NULL, &bs_dev);
1586 	if (rc < 0) {
1587 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1588 		ori_req->lvserrno = rc;
1589 		_vbdev_lvs_grow_finish(ori_req, rc);
1590 		free(req);
1591 		return;
1592 	}
1593 
1594 	req->base_bdev = bdev;
1595 	req->cb_arg = ori_req;
1596 
1597 	spdk_lvs_grow(bs_dev, _vbdev_lvs_grow_examine_cb, req);
1598 }
1599 
1600 static void
1601 _vbdev_lvs_grow_unload_cb(void *cb_arg, int lvserrno)
1602 {
1603 	struct spdk_lvs_grow_req *req = cb_arg;
1604 	struct lvol_store_bdev *lvs_bdev;
1605 	struct spdk_bdev *bdev;
1606 
1607 	if (lvserrno != 0) {
1608 		req->cb_fn(req->cb_arg, lvserrno);
1609 		free(req);
1610 		return;
1611 	}
1612 
1613 	lvs_bdev = req->lvs_bdev;
1614 	bdev = lvs_bdev->bdev;
1615 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1616 	_vbdev_lvs_grow_examine(bdev, req);
1617 	free(lvs_bdev);
1618 }
1619 
1620 static void
1621 _vbdev_lvs_grow_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
1622 {
1623 	struct spdk_lvs_grow_req *req = cb_arg;
1624 	struct spdk_lvol_store *lvs = req->lvol_store;
1625 
1626 	if (bdeverrno != 0) {
1627 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
1628 	}
1629 
1630 	req->lvol_cnt--;
1631 
1632 	if (req->lvol_cnt == 0) {
1633 		/* Lvol store can be unloaded once all lvols are closed. */
1634 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1635 			spdk_lvs_unload(lvs, _vbdev_lvs_grow_unload_cb, req);
1636 		} else {
1637 			req->cb_fn(req->cb_arg, -EINVAL);
1638 			free(req);
1639 		}
1640 	}
1641 }
1642 
1643 void
1644 vbdev_lvs_grow(struct spdk_lvol_store *lvs,
1645 	       spdk_lvs_op_complete cb_fn, void *cb_arg)
1646 {
1647 	struct spdk_lvs_grow_req *req;
1648 	struct spdk_lvol *lvol, *tmp;
1649 
1650 	req = calloc(1, sizeof(*req));
1651 	if (!req) {
1652 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1653 		cb_fn(cb_arg, -ENOMEM);
1654 		return;
1655 	}
1656 	req->cb_fn = cb_fn;
1657 	req->cb_arg = cb_arg;
1658 	req->lvol_store = lvs;
1659 	req->lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
1660 
1661 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
1662 		spdk_lvs_unload(lvs, _vbdev_lvs_grow_unload_cb, req);
1663 	} else {
1664 		lvs->destruct = false;
1665 		TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
1666 			req->lvol_cnt++;
1667 			spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_grow_remove_bdev_unregistered_cb, req);
1668 		}
1669 		assert(req->lvol_cnt > 0);
1670 	}
1671 }
1672 
/* Registers the "vbdev_lvol" log component named in the SPDK_INFOLOG/SPDK_DEBUGLOG calls of this file. */
SPDK_LOG_REGISTER_COMPONENT(vbdev_lvol)
1674