xref: /spdk/module/bdev/lvol/vbdev_lvol.c (revision f6866117acb32c78d5ea7bd76ba330284655af35)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/blob_bdev.h"
8 #include "spdk/rpc.h"
9 #include "spdk/bdev_module.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/uuid.h"
13 
14 #include "vbdev_lvol.h"
15 
16 struct vbdev_lvol_io {
17 	struct spdk_blob_ext_io_opts ext_io_opts;
18 };
19 
20 static TAILQ_HEAD(, lvol_store_bdev) g_spdk_lvol_pairs = TAILQ_HEAD_INITIALIZER(
21 			g_spdk_lvol_pairs);
22 
23 static int vbdev_lvs_init(void);
24 static void vbdev_lvs_fini_start(void);
25 static int vbdev_lvs_get_ctx_size(void);
26 static void vbdev_lvs_examine(struct spdk_bdev *bdev);
27 static bool g_shutdown_started = false;
28 
29 static struct spdk_bdev_module g_lvol_if = {
30 	.name = "lvol",
31 	.module_init = vbdev_lvs_init,
32 	.fini_start = vbdev_lvs_fini_start,
33 	.async_fini_start = true,
34 	.examine_disk = vbdev_lvs_examine,
35 	.get_ctx_size = vbdev_lvs_get_ctx_size,
36 
37 };
38 
39 SPDK_BDEV_MODULE_REGISTER(lvol, &g_lvol_if)
40 
41 struct lvol_store_bdev *
42 vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig)
43 {
44 	struct spdk_lvol_store *lvs = NULL;
45 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
46 
47 	while (lvs_bdev != NULL) {
48 		lvs = lvs_bdev->lvs;
49 		if (lvs == lvs_orig) {
50 			if (lvs_bdev->req != NULL) {
51 				/* We do not allow access to lvs that are being destroyed */
52 				return NULL;
53 			} else {
54 				return lvs_bdev;
55 			}
56 		}
57 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
58 	}
59 
60 	return NULL;
61 }
62 
63 static int
64 _vbdev_lvol_change_bdev_alias(struct spdk_lvol *lvol, const char *new_lvol_name)
65 {
66 	struct spdk_bdev_alias *tmp;
67 	char *old_alias;
68 	char *alias;
69 	int rc;
70 	int alias_number = 0;
71 
72 	/* bdev representing lvols have only one alias,
73 	 * while we changed lvs name earlier, we have to iterate alias list to get one,
74 	 * and check if there is only one alias */
75 
76 	TAILQ_FOREACH(tmp, spdk_bdev_get_aliases(lvol->bdev), tailq) {
77 		if (++alias_number > 1) {
78 			SPDK_ERRLOG("There is more than 1 alias in bdev %s\n", lvol->bdev->name);
79 			return -EINVAL;
80 		}
81 
82 		old_alias = tmp->alias.name;
83 	}
84 
85 	if (alias_number == 0) {
86 		SPDK_ERRLOG("There are no aliases in bdev %s\n", lvol->bdev->name);
87 		return -EINVAL;
88 	}
89 
90 	alias = spdk_sprintf_alloc("%s/%s", lvol->lvol_store->name, new_lvol_name);
91 	if (alias == NULL) {
92 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
93 		return -ENOMEM;
94 	}
95 
96 	rc = spdk_bdev_alias_add(lvol->bdev, alias);
97 	if (rc != 0) {
98 		SPDK_ERRLOG("cannot add alias '%s'\n", alias);
99 		free(alias);
100 		return rc;
101 	}
102 	free(alias);
103 
104 	rc = spdk_bdev_alias_del(lvol->bdev, old_alias);
105 	if (rc != 0) {
106 		SPDK_ERRLOG("cannot remove alias '%s'\n", old_alias);
107 		return rc;
108 	}
109 
110 	return 0;
111 }
112 
113 static struct lvol_store_bdev *
114 vbdev_get_lvs_bdev_by_bdev(struct spdk_bdev *bdev_orig)
115 {
116 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
117 
118 	while (lvs_bdev != NULL) {
119 		if (lvs_bdev->bdev == bdev_orig) {
120 			if (lvs_bdev->req != NULL) {
121 				/* We do not allow access to lvs that are being destroyed */
122 				return NULL;
123 			} else {
124 				return lvs_bdev;
125 			}
126 		}
127 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
128 	}
129 
130 	return NULL;
131 }
132 
133 static void
134 vbdev_lvs_hotremove_cb(struct spdk_bdev *bdev)
135 {
136 	struct lvol_store_bdev *lvs_bdev;
137 
138 	lvs_bdev = vbdev_get_lvs_bdev_by_bdev(bdev);
139 	if (lvs_bdev != NULL) {
140 		SPDK_NOTICELOG("bdev %s being removed: closing lvstore %s\n",
141 			       spdk_bdev_get_name(bdev), lvs_bdev->lvs->name);
142 		vbdev_lvs_unload(lvs_bdev->lvs, NULL, NULL);
143 	}
144 }
145 
146 static void
147 vbdev_lvs_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
148 			     void *event_ctx)
149 {
150 	switch (type) {
151 	case SPDK_BDEV_EVENT_REMOVE:
152 		vbdev_lvs_hotremove_cb(bdev);
153 		break;
154 	default:
155 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
156 		break;
157 	}
158 }
159 
160 static void
161 _vbdev_lvs_create_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
162 {
163 	struct spdk_lvs_with_handle_req *req = cb_arg;
164 	struct lvol_store_bdev *lvs_bdev;
165 	struct spdk_bdev *bdev = req->base_bdev;
166 	struct spdk_bs_dev *bs_dev = req->bs_dev;
167 
168 	if (lvserrno != 0) {
169 		assert(lvs == NULL);
170 		SPDK_ERRLOG("Cannot create lvol store bdev\n");
171 		goto end;
172 	}
173 
174 	lvserrno = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
175 	if (lvserrno != 0) {
176 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
177 		req->bs_dev->destroy(req->bs_dev);
178 		goto end;
179 	}
180 
181 	assert(lvs != NULL);
182 
183 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
184 	if (!lvs_bdev) {
185 		lvserrno = -ENOMEM;
186 		goto end;
187 	}
188 	lvs_bdev->lvs = lvs;
189 	lvs_bdev->bdev = bdev;
190 	lvs_bdev->req = NULL;
191 
192 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
193 	SPDK_INFOLOG(vbdev_lvol, "Lvol store bdev inserted\n");
194 
195 end:
196 	req->cb_fn(req->cb_arg, lvs, lvserrno);
197 	free(req);
198 
199 	return;
200 }
201 
202 int
203 vbdev_lvs_create(const char *base_bdev_name, const char *name, uint32_t cluster_sz,
204 		 enum lvs_clear_method clear_method, uint32_t num_md_pages_per_cluster_ratio,
205 		 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
206 {
207 	struct spdk_bs_dev *bs_dev;
208 	struct spdk_lvs_with_handle_req *lvs_req;
209 	struct spdk_lvs_opts opts;
210 	int rc;
211 	int len;
212 
213 	if (base_bdev_name == NULL) {
214 		SPDK_ERRLOG("missing base_bdev_name param\n");
215 		return -EINVAL;
216 	}
217 
218 	spdk_lvs_opts_init(&opts);
219 	if (cluster_sz != 0) {
220 		opts.cluster_sz = cluster_sz;
221 	}
222 
223 	if (clear_method != 0) {
224 		opts.clear_method = clear_method;
225 	}
226 
227 	if (num_md_pages_per_cluster_ratio != 0) {
228 		opts.num_md_pages_per_cluster_ratio = num_md_pages_per_cluster_ratio;
229 	}
230 
231 	if (name == NULL) {
232 		SPDK_ERRLOG("missing name param\n");
233 		return -EINVAL;
234 	}
235 
236 	len = strnlen(name, SPDK_LVS_NAME_MAX);
237 
238 	if (len == 0 || len == SPDK_LVS_NAME_MAX) {
239 		SPDK_ERRLOG("name must be between 1 and %d characters\n", SPDK_LVS_NAME_MAX - 1);
240 		return -EINVAL;
241 	}
242 	snprintf(opts.name, sizeof(opts.name), "%s", name);
243 
244 	lvs_req = calloc(1, sizeof(*lvs_req));
245 	if (!lvs_req) {
246 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
247 		return -ENOMEM;
248 	}
249 
250 	rc = spdk_bdev_create_bs_dev_ext(base_bdev_name, vbdev_lvs_base_bdev_event_cb,
251 					 NULL, &bs_dev);
252 	if (rc < 0) {
253 		SPDK_ERRLOG("Cannot create blobstore device\n");
254 		free(lvs_req);
255 		return rc;
256 	}
257 
258 	lvs_req->bs_dev = bs_dev;
259 	lvs_req->base_bdev = bs_dev->get_base_bdev(bs_dev);
260 	lvs_req->cb_fn = cb_fn;
261 	lvs_req->cb_arg = cb_arg;
262 
263 	rc = spdk_lvs_init(bs_dev, &opts, _vbdev_lvs_create_cb, lvs_req);
264 	if (rc < 0) {
265 		free(lvs_req);
266 		bs_dev->destroy(bs_dev);
267 		return rc;
268 	}
269 
270 	return 0;
271 }
272 
273 static void
274 _vbdev_lvs_rename_cb(void *cb_arg, int lvserrno)
275 {
276 	struct spdk_lvs_req *req = cb_arg;
277 	struct spdk_lvol *tmp;
278 
279 	if (lvserrno != 0) {
280 		SPDK_INFOLOG(vbdev_lvol, "Lvol store rename failed\n");
281 	} else {
282 		TAILQ_FOREACH(tmp, &req->lvol_store->lvols, link) {
283 			/* We have to pass current lvol name, since only lvs name changed */
284 			_vbdev_lvol_change_bdev_alias(tmp, tmp->name);
285 		}
286 	}
287 
288 	req->cb_fn(req->cb_arg, lvserrno);
289 	free(req);
290 }
291 
292 void
293 vbdev_lvs_rename(struct spdk_lvol_store *lvs, const char *new_lvs_name,
294 		 spdk_lvs_op_complete cb_fn, void *cb_arg)
295 {
296 	struct lvol_store_bdev *lvs_bdev;
297 
298 	struct spdk_lvs_req *req;
299 
300 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
301 	if (!lvs_bdev) {
302 		SPDK_ERRLOG("No such lvol store found\n");
303 		cb_fn(cb_arg, -ENODEV);
304 		return;
305 	}
306 
307 	req = calloc(1, sizeof(*req));
308 	if (!req) {
309 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
310 		cb_fn(cb_arg, -ENOMEM);
311 		return;
312 	}
313 	req->cb_fn = cb_fn;
314 	req->cb_arg = cb_arg;
315 	req->lvol_store = lvs;
316 
317 	spdk_lvs_rename(lvs, new_lvs_name, _vbdev_lvs_rename_cb, req);
318 }
319 
320 static void
321 _vbdev_lvs_remove_cb(void *cb_arg, int lvserrno)
322 {
323 	struct lvol_store_bdev *lvs_bdev = cb_arg;
324 	struct spdk_lvs_req *req = lvs_bdev->req;
325 
326 	if (lvserrno != 0) {
327 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
328 	}
329 
330 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
331 	free(lvs_bdev);
332 
333 	if (req->cb_fn != NULL) {
334 		req->cb_fn(req->cb_arg, lvserrno);
335 	}
336 	free(req);
337 }
338 
339 static void
340 _vbdev_lvs_remove_lvol_cb(void *cb_arg, int lvolerrno)
341 {
342 	struct lvol_store_bdev *lvs_bdev = cb_arg;
343 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
344 	struct spdk_lvol *lvol;
345 
346 	if (lvolerrno != 0) {
347 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol removed with errno %d\n", lvolerrno);
348 	}
349 
350 	if (TAILQ_EMPTY(&lvs->lvols)) {
351 		spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
352 		return;
353 	}
354 
355 	lvol = TAILQ_FIRST(&lvs->lvols);
356 	while (lvol != NULL) {
357 		if (spdk_lvol_deletable(lvol)) {
358 			vbdev_lvol_destroy(lvol, _vbdev_lvs_remove_lvol_cb, lvs_bdev);
359 			return;
360 		}
361 		lvol = TAILQ_NEXT(lvol, link);
362 	}
363 
364 	/* If no lvol is deletable, that means there is circular dependency. */
365 	SPDK_ERRLOG("Lvols left in lvs, but unable to delete.\n");
366 	assert(false);
367 }
368 
369 static bool
370 _vbdev_lvs_are_lvols_closed(struct spdk_lvol_store *lvs)
371 {
372 	struct spdk_lvol *lvol;
373 
374 	TAILQ_FOREACH(lvol, &lvs->lvols, link) {
375 		if (lvol->ref_count != 0) {
376 			return false;
377 		}
378 	}
379 	return true;
380 }
381 
382 static void
383 _vbdev_lvs_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
384 {
385 	struct lvol_store_bdev *lvs_bdev = cb_arg;
386 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
387 
388 	if (bdeverrno != 0) {
389 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
390 	}
391 
392 	/* Lvol store can be unloaded once all lvols are closed. */
393 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
394 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
395 	}
396 }
397 
398 static void
399 _vbdev_lvs_remove(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg,
400 		  bool destroy)
401 {
402 	struct spdk_lvs_req *req;
403 	struct lvol_store_bdev *lvs_bdev;
404 	struct spdk_lvol *lvol, *tmp;
405 
406 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
407 	if (!lvs_bdev) {
408 		SPDK_ERRLOG("No such lvol store found\n");
409 		if (cb_fn != NULL) {
410 			cb_fn(cb_arg, -ENODEV);
411 		}
412 		return;
413 	}
414 
415 	req = calloc(1, sizeof(*req));
416 	if (!req) {
417 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
418 		if (cb_fn != NULL) {
419 			cb_fn(cb_arg, -ENOMEM);
420 		}
421 		return;
422 	}
423 
424 	req->cb_fn = cb_fn;
425 	req->cb_arg = cb_arg;
426 	lvs_bdev->req = req;
427 
428 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
429 		if (destroy) {
430 			spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
431 		} else {
432 			spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
433 		}
434 	} else {
435 		if (destroy) {
436 			_vbdev_lvs_remove_lvol_cb(lvs_bdev, 0);
437 		} else {
438 			TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
439 				spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
440 			}
441 		}
442 	}
443 }
444 
445 void
446 vbdev_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
447 {
448 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, false);
449 }
450 
451 void
452 vbdev_lvs_destruct(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
453 {
454 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, true);
455 }
456 
457 struct lvol_store_bdev *
458 vbdev_lvol_store_first(void)
459 {
460 	struct lvol_store_bdev *lvs_bdev;
461 
462 	lvs_bdev = TAILQ_FIRST(&g_spdk_lvol_pairs);
463 	if (lvs_bdev) {
464 		SPDK_INFOLOG(vbdev_lvol, "Starting lvolstore iteration at %p\n", lvs_bdev->lvs);
465 	}
466 
467 	return lvs_bdev;
468 }
469 
470 struct lvol_store_bdev *
471 vbdev_lvol_store_next(struct lvol_store_bdev *prev)
472 {
473 	struct lvol_store_bdev *lvs_bdev;
474 
475 	if (prev == NULL) {
476 		SPDK_ERRLOG("prev argument cannot be NULL\n");
477 		return NULL;
478 	}
479 
480 	lvs_bdev = TAILQ_NEXT(prev, lvol_stores);
481 	if (lvs_bdev) {
482 		SPDK_INFOLOG(vbdev_lvol, "Continuing lvolstore iteration at %p\n", lvs_bdev->lvs);
483 	}
484 
485 	return lvs_bdev;
486 }
487 
488 static struct spdk_lvol_store *
489 _vbdev_get_lvol_store_by_uuid(const struct spdk_uuid *uuid)
490 {
491 	struct spdk_lvol_store *lvs = NULL;
492 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
493 
494 	while (lvs_bdev != NULL) {
495 		lvs = lvs_bdev->lvs;
496 		if (spdk_uuid_compare(&lvs->uuid, uuid) == 0) {
497 			return lvs;
498 		}
499 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
500 	}
501 	return NULL;
502 }
503 
504 struct spdk_lvol_store *
505 vbdev_get_lvol_store_by_uuid(const char *uuid_str)
506 {
507 	struct spdk_uuid uuid;
508 
509 	if (spdk_uuid_parse(&uuid, uuid_str)) {
510 		return NULL;
511 	}
512 
513 	return _vbdev_get_lvol_store_by_uuid(&uuid);
514 }
515 
516 struct spdk_lvol_store *
517 vbdev_get_lvol_store_by_name(const char *name)
518 {
519 	struct spdk_lvol_store *lvs = NULL;
520 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
521 
522 	while (lvs_bdev != NULL) {
523 		lvs = lvs_bdev->lvs;
524 		if (strncmp(lvs->name, name, sizeof(lvs->name)) == 0) {
525 			return lvs;
526 		}
527 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
528 	}
529 	return NULL;
530 }
531 
532 struct vbdev_lvol_destroy_ctx {
533 	struct spdk_lvol *lvol;
534 	spdk_lvol_op_complete cb_fn;
535 	void *cb_arg;
536 };
537 
538 static void
539 _vbdev_lvol_unregister_unload_lvs(void *cb_arg, int lvserrno)
540 {
541 	struct lvol_bdev *lvol_bdev = cb_arg;
542 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
543 
544 	if (lvserrno != 0) {
545 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
546 	}
547 
548 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
549 	free(lvs_bdev);
550 
551 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvserrno);
552 	free(lvol_bdev);
553 }
554 
555 static void
556 _vbdev_lvol_unregister_cb(void *ctx, int lvolerrno)
557 {
558 	struct lvol_bdev *lvol_bdev = ctx;
559 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
560 
561 	if (g_shutdown_started && _vbdev_lvs_are_lvols_closed(lvs_bdev->lvs)) {
562 		spdk_lvs_unload(lvs_bdev->lvs, _vbdev_lvol_unregister_unload_lvs, lvol_bdev);
563 		return;
564 	}
565 
566 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvolerrno);
567 	free(lvol_bdev);
568 }
569 
570 static int
571 vbdev_lvol_unregister(void *ctx)
572 {
573 	struct spdk_lvol *lvol = ctx;
574 	struct lvol_bdev *lvol_bdev;
575 
576 	assert(lvol != NULL);
577 	lvol_bdev = SPDK_CONTAINEROF(lvol->bdev, struct lvol_bdev, bdev);
578 
579 	spdk_bdev_alias_del_all(lvol->bdev);
580 	spdk_lvol_close(lvol, _vbdev_lvol_unregister_cb, lvol_bdev);
581 
582 	/* return 1 to indicate we have an operation that must finish asynchronously before the
583 	 *  lvol is closed
584 	 */
585 	return 1;
586 }
587 
588 static void
589 _vbdev_lvol_destroy_cb(void *cb_arg, int bdeverrno)
590 {
591 	struct vbdev_lvol_destroy_ctx *ctx = cb_arg;
592 	struct spdk_lvol *lvol = ctx->lvol;
593 
594 	if (bdeverrno < 0) {
595 		SPDK_INFOLOG(vbdev_lvol, "Could not unregister bdev during lvol (%s) destroy\n",
596 			     lvol->unique_id);
597 		ctx->cb_fn(ctx->cb_arg, bdeverrno);
598 		free(ctx);
599 		return;
600 	}
601 
602 	spdk_lvol_destroy(lvol, ctx->cb_fn, ctx->cb_arg);
603 	free(ctx);
604 }
605 
606 void
607 vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
608 {
609 	struct vbdev_lvol_destroy_ctx *ctx;
610 	size_t count;
611 
612 	assert(lvol != NULL);
613 	assert(cb_fn != NULL);
614 
615 	/* Check if it is possible to delete lvol */
616 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
617 	if (count > 1) {
618 		/* throw an error */
619 		SPDK_ERRLOG("Cannot delete lvol\n");
620 		cb_fn(cb_arg, -EPERM);
621 		return;
622 	}
623 
624 	ctx = calloc(1, sizeof(*ctx));
625 	if (!ctx) {
626 		cb_fn(cb_arg, -ENOMEM);
627 		return;
628 	}
629 
630 	ctx->lvol = lvol;
631 	ctx->cb_fn = cb_fn;
632 	ctx->cb_arg = cb_arg;
633 
634 	spdk_bdev_unregister(lvol->bdev, _vbdev_lvol_destroy_cb, ctx);
635 }
636 
637 static char *
638 vbdev_lvol_find_name(struct spdk_lvol *lvol, spdk_blob_id blob_id)
639 {
640 	struct spdk_lvol_store *lvs;
641 	struct spdk_lvol *_lvol;
642 
643 	assert(lvol != NULL);
644 
645 	lvs = lvol->lvol_store;
646 
647 	assert(lvs);
648 
649 	TAILQ_FOREACH(_lvol, &lvs->lvols, link) {
650 		if (_lvol->blob_id == blob_id) {
651 			return _lvol->name;
652 		}
653 	}
654 
655 	return NULL;
656 }
657 
658 static int
659 vbdev_lvol_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
660 {
661 	struct spdk_lvol *lvol = ctx;
662 	struct lvol_store_bdev *lvs_bdev;
663 	struct spdk_bdev *bdev;
664 	struct spdk_blob *blob;
665 	char lvol_store_uuid[SPDK_UUID_STRING_LEN];
666 	spdk_blob_id *ids = NULL;
667 	size_t count, i;
668 	char *name;
669 	int rc = 0;
670 
671 	spdk_json_write_named_object_begin(w, "lvol");
672 
673 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
674 	if (!lvs_bdev) {
675 		SPDK_ERRLOG("No such lvol store found\n");
676 		rc = -ENODEV;
677 		goto end;
678 	}
679 
680 	bdev = lvs_bdev->bdev;
681 
682 	spdk_uuid_fmt_lower(lvol_store_uuid, sizeof(lvol_store_uuid), &lvol->lvol_store->uuid);
683 	spdk_json_write_named_string(w, "lvol_store_uuid", lvol_store_uuid);
684 
685 	spdk_json_write_named_string(w, "base_bdev", spdk_bdev_get_name(bdev));
686 
687 	blob = lvol->blob;
688 
689 	spdk_json_write_named_bool(w, "thin_provision", spdk_blob_is_thin_provisioned(blob));
690 
691 	spdk_json_write_named_bool(w, "snapshot", spdk_blob_is_snapshot(blob));
692 
693 	spdk_json_write_named_bool(w, "clone", spdk_blob_is_clone(blob));
694 
695 	if (spdk_blob_is_clone(blob)) {
696 		spdk_blob_id snapshotid = spdk_blob_get_parent_snapshot(lvol->lvol_store->blobstore, lvol->blob_id);
697 		if (snapshotid != SPDK_BLOBID_INVALID) {
698 			name = vbdev_lvol_find_name(lvol, snapshotid);
699 			if (name != NULL) {
700 				spdk_json_write_named_string(w, "base_snapshot", name);
701 			} else {
702 				SPDK_ERRLOG("Cannot obtain snapshots name\n");
703 			}
704 		}
705 	}
706 
707 	if (spdk_blob_is_snapshot(blob)) {
708 		/* Take a number of clones */
709 		rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
710 		if (rc == -ENOMEM && count > 0) {
711 			ids = malloc(sizeof(spdk_blob_id) * count);
712 			if (ids == NULL) {
713 				SPDK_ERRLOG("Cannot allocate memory\n");
714 				rc = -ENOMEM;
715 				goto end;
716 			}
717 
718 			rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, ids, &count);
719 			if (rc == 0) {
720 				spdk_json_write_named_array_begin(w, "clones");
721 				for (i = 0; i < count; i++) {
722 					name = vbdev_lvol_find_name(lvol, ids[i]);
723 					if (name != NULL) {
724 						spdk_json_write_string(w, name);
725 					} else {
726 						SPDK_ERRLOG("Cannot obtain clone name\n");
727 					}
728 
729 				}
730 				spdk_json_write_array_end(w);
731 			}
732 			free(ids);
733 		}
734 
735 	}
736 
737 end:
738 	spdk_json_write_object_end(w);
739 
740 	return rc;
741 }
742 
743 static void
744 vbdev_lvol_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
745 {
746 	/* Nothing to dump as lvol configuration is saved on physical device. */
747 }
748 
749 static struct spdk_io_channel *
750 vbdev_lvol_get_io_channel(void *ctx)
751 {
752 	struct spdk_lvol *lvol = ctx;
753 
754 	return spdk_lvol_get_io_channel(lvol);
755 }
756 
757 static bool
758 vbdev_lvol_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
759 {
760 	struct spdk_lvol *lvol = ctx;
761 
762 	switch (io_type) {
763 	case SPDK_BDEV_IO_TYPE_WRITE:
764 	case SPDK_BDEV_IO_TYPE_UNMAP:
765 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
766 		return !spdk_blob_is_read_only(lvol->blob);
767 	case SPDK_BDEV_IO_TYPE_RESET:
768 	case SPDK_BDEV_IO_TYPE_READ:
769 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
770 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
771 		return true;
772 	default:
773 		return false;
774 	}
775 }
776 
777 static void
778 lvol_op_comp(void *cb_arg, int bserrno)
779 {
780 	struct spdk_bdev_io *bdev_io = cb_arg;
781 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_SUCCESS;
782 
783 	if (bserrno != 0) {
784 		if (bserrno == -ENOMEM) {
785 			status = SPDK_BDEV_IO_STATUS_NOMEM;
786 		} else {
787 			status = SPDK_BDEV_IO_STATUS_FAILED;
788 		}
789 	}
790 
791 	spdk_bdev_io_complete(bdev_io, status);
792 }
793 
794 static void
795 lvol_unmap(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
796 {
797 	uint64_t start_page, num_pages;
798 	struct spdk_blob *blob = lvol->blob;
799 
800 	start_page = bdev_io->u.bdev.offset_blocks;
801 	num_pages = bdev_io->u.bdev.num_blocks;
802 
803 	spdk_blob_io_unmap(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
804 }
805 
806 static void
807 lvol_seek_data(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
808 {
809 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_allocated_io_unit(lvol->blob,
810 				      bdev_io->u.bdev.offset_blocks);
811 
812 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
813 }
814 
815 static void
816 lvol_seek_hole(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
817 {
818 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_unallocated_io_unit(lvol->blob,
819 				      bdev_io->u.bdev.offset_blocks);
820 
821 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
822 }
823 
824 static void
825 lvol_write_zeroes(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
826 {
827 	uint64_t start_page, num_pages;
828 	struct spdk_blob *blob = lvol->blob;
829 
830 	start_page = bdev_io->u.bdev.offset_blocks;
831 	num_pages = bdev_io->u.bdev.num_blocks;
832 
833 	spdk_blob_io_write_zeroes(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
834 }
835 
836 static void
837 lvol_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
838 {
839 	uint64_t start_page, num_pages;
840 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
841 	struct spdk_blob *blob = lvol->blob;
842 	struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
843 
844 	start_page = bdev_io->u.bdev.offset_blocks;
845 	num_pages = bdev_io->u.bdev.num_blocks;
846 
847 	lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
848 	lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.memory_domain;
849 	lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.memory_domain_ctx;
850 
851 	spdk_blob_io_readv_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
852 			       num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
853 }
854 
855 static void
856 lvol_write(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
857 {
858 	uint64_t start_page, num_pages;
859 	struct spdk_blob *blob = lvol->blob;
860 	struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
861 
862 	start_page = bdev_io->u.bdev.offset_blocks;
863 	num_pages = bdev_io->u.bdev.num_blocks;
864 
865 	lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
866 	lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.memory_domain;
867 	lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.memory_domain_ctx;
868 
869 	spdk_blob_io_writev_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
870 				num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
871 }
872 
873 static int
874 lvol_reset(struct spdk_bdev_io *bdev_io)
875 {
876 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
877 
878 	return 0;
879 }
880 
881 static void
882 lvol_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
883 {
884 	if (!success) {
885 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
886 		return;
887 	}
888 
889 	lvol_read(ch, bdev_io);
890 }
891 
892 static void
893 vbdev_lvol_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
894 {
895 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
896 
897 	switch (bdev_io->type) {
898 	case SPDK_BDEV_IO_TYPE_READ:
899 		spdk_bdev_io_get_buf(bdev_io, lvol_get_buf_cb,
900 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
901 		break;
902 	case SPDK_BDEV_IO_TYPE_WRITE:
903 		lvol_write(lvol, ch, bdev_io);
904 		break;
905 	case SPDK_BDEV_IO_TYPE_RESET:
906 		lvol_reset(bdev_io);
907 		break;
908 	case SPDK_BDEV_IO_TYPE_UNMAP:
909 		lvol_unmap(lvol, ch, bdev_io);
910 		break;
911 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
912 		lvol_write_zeroes(lvol, ch, bdev_io);
913 		break;
914 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
915 		lvol_seek_data(lvol, bdev_io);
916 		break;
917 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
918 		lvol_seek_hole(lvol, bdev_io);
919 		break;
920 	default:
921 		SPDK_INFOLOG(vbdev_lvol, "lvol: unsupported I/O type %d\n", bdev_io->type);
922 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
923 		return;
924 	}
925 	return;
926 }
927 
928 static int
929 vbdev_lvol_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
930 {
931 	struct spdk_lvol *lvol = ctx;
932 	struct spdk_bdev *base_bdev;
933 
934 	base_bdev = lvol->lvol_store->bs_dev->get_base_bdev(lvol->lvol_store->bs_dev);
935 
936 	return spdk_bdev_get_memory_domains(base_bdev, domains, array_size);
937 }
938 
939 static struct spdk_bdev_fn_table vbdev_lvol_fn_table = {
940 	.destruct		= vbdev_lvol_unregister,
941 	.io_type_supported	= vbdev_lvol_io_type_supported,
942 	.submit_request		= vbdev_lvol_submit_request,
943 	.get_io_channel		= vbdev_lvol_get_io_channel,
944 	.dump_info_json		= vbdev_lvol_dump_info_json,
945 	.write_config_json	= vbdev_lvol_write_config_json,
946 	.get_memory_domains	= vbdev_lvol_get_memory_domains,
947 };
948 
949 static void
950 lvol_destroy_cb(void *cb_arg, int bdeverrno)
951 {
952 }
953 
954 static void
955 _create_lvol_disk_destroy_cb(void *cb_arg, int bdeverrno)
956 {
957 	struct spdk_lvol *lvol = cb_arg;
958 
959 	if (bdeverrno < 0) {
960 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
961 			    lvol->unique_id);
962 		return;
963 	}
964 
965 	spdk_lvol_destroy(lvol, lvol_destroy_cb, NULL);
966 }
967 
968 static void
969 _create_lvol_disk_unload_cb(void *cb_arg, int bdeverrno)
970 {
971 	struct spdk_lvol *lvol = cb_arg;
972 
973 	if (bdeverrno < 0) {
974 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
975 			    lvol->unique_id);
976 		return;
977 	}
978 
979 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
980 	free(lvol);
981 }
982 
983 static int
984 _create_lvol_disk(struct spdk_lvol *lvol, bool destroy)
985 {
986 	struct spdk_bdev *bdev;
987 	struct lvol_bdev *lvol_bdev;
988 	struct lvol_store_bdev *lvs_bdev;
989 	uint64_t total_size;
990 	unsigned char *alias;
991 	int rc;
992 
993 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
994 	if (lvs_bdev == NULL) {
995 		SPDK_ERRLOG("No spdk lvs-bdev pair found for lvol %s\n", lvol->unique_id);
996 		return -ENODEV;
997 	}
998 
999 	lvol_bdev = calloc(1, sizeof(struct lvol_bdev));
1000 	if (!lvol_bdev) {
1001 		SPDK_ERRLOG("Cannot alloc memory for lvol bdev\n");
1002 		return -ENOMEM;
1003 	}
1004 
1005 	lvol_bdev->lvol = lvol;
1006 	lvol_bdev->lvs_bdev = lvs_bdev;
1007 
1008 	bdev = &lvol_bdev->bdev;
1009 	bdev->name = lvol->unique_id;
1010 	bdev->product_name = "Logical Volume";
1011 	bdev->blocklen = spdk_bs_get_io_unit_size(lvol->lvol_store->blobstore);
1012 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1013 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1014 	assert((total_size % bdev->blocklen) == 0);
1015 	bdev->blockcnt = total_size / bdev->blocklen;
1016 	bdev->uuid = lvol->uuid;
1017 	bdev->required_alignment = lvs_bdev->bdev->required_alignment;
1018 	bdev->split_on_optimal_io_boundary = true;
1019 	bdev->optimal_io_boundary = spdk_bs_get_cluster_size(lvol->lvol_store->blobstore) / bdev->blocklen;
1020 
1021 	bdev->ctxt = lvol;
1022 	bdev->fn_table = &vbdev_lvol_fn_table;
1023 	bdev->module = &g_lvol_if;
1024 
1025 	/* Set default bdev reset waiting time. This value indicates how much
1026 	 * time a reset should wait before forcing a reset down to the underlying
1027 	 * bdev module.
1028 	 * Setting this parameter is mainly to avoid "empty" resets to a shared
1029 	 * bdev that may be used by multiple lvols. */
1030 	bdev->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
1031 
1032 	rc = spdk_bdev_register(bdev);
1033 	if (rc) {
1034 		free(lvol_bdev);
1035 		return rc;
1036 	}
1037 	lvol->bdev = bdev;
1038 
1039 	alias = spdk_sprintf_alloc("%s/%s", lvs_bdev->lvs->name, lvol->name);
1040 	if (alias == NULL) {
1041 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
1042 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1043 						  _create_lvol_disk_unload_cb), lvol);
1044 		return -ENOMEM;
1045 	}
1046 
1047 	rc = spdk_bdev_alias_add(bdev, alias);
1048 	if (rc != 0) {
1049 		SPDK_ERRLOG("Cannot add alias to lvol bdev\n");
1050 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1051 						  _create_lvol_disk_unload_cb), lvol);
1052 	}
1053 	free(alias);
1054 
1055 	return rc;
1056 }
1057 
1058 static void
1059 _vbdev_lvol_create_cb(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1060 {
1061 	struct spdk_lvol_with_handle_req *req = cb_arg;
1062 
1063 	if (lvolerrno < 0) {
1064 		goto end;
1065 	}
1066 
1067 	lvolerrno = _create_lvol_disk(lvol, true);
1068 
1069 end:
1070 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
1071 	free(req);
1072 }
1073 
1074 int
1075 vbdev_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1076 		  bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1077 		  void *cb_arg)
1078 {
1079 	struct spdk_lvol_with_handle_req *req;
1080 	int rc;
1081 
1082 	req = calloc(1, sizeof(*req));
1083 	if (req == NULL) {
1084 		return -ENOMEM;
1085 	}
1086 	req->cb_fn = cb_fn;
1087 	req->cb_arg = cb_arg;
1088 
1089 	rc = spdk_lvol_create(lvs, name, sz, thin_provision, clear_method,
1090 			      _vbdev_lvol_create_cb, req);
1091 	if (rc != 0) {
1092 		free(req);
1093 	}
1094 
1095 	return rc;
1096 }
1097 
1098 void
1099 vbdev_lvol_create_snapshot(struct spdk_lvol *lvol, const char *snapshot_name,
1100 			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1101 {
1102 	struct spdk_lvol_with_handle_req *req;
1103 
1104 	req = calloc(1, sizeof(*req));
1105 	if (req == NULL) {
1106 		cb_fn(cb_arg, NULL, -ENOMEM);
1107 		return;
1108 	}
1109 
1110 	req->cb_fn = cb_fn;
1111 	req->cb_arg = cb_arg;
1112 
1113 	spdk_lvol_create_snapshot(lvol, snapshot_name, _vbdev_lvol_create_cb, req);
1114 }
1115 
1116 void
1117 vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
1118 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1119 {
1120 	struct spdk_lvol_with_handle_req *req;
1121 
1122 	req = calloc(1, sizeof(*req));
1123 	if (req == NULL) {
1124 		cb_fn(cb_arg, NULL, -ENOMEM);
1125 		return;
1126 	}
1127 
1128 	req->cb_fn = cb_fn;
1129 	req->cb_arg = cb_arg;
1130 
1131 	spdk_lvol_create_clone(lvol, clone_name, _vbdev_lvol_create_cb, req);
1132 }
1133 
1134 static void
1135 _vbdev_lvol_rename_cb(void *cb_arg, int lvolerrno)
1136 {
1137 	struct spdk_lvol_req *req = cb_arg;
1138 
1139 	if (lvolerrno != 0) {
1140 		SPDK_ERRLOG("Renaming lvol failed\n");
1141 	}
1142 
1143 	req->cb_fn(req->cb_arg, lvolerrno);
1144 	free(req);
1145 }
1146 
1147 void
1148 vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
1149 		  spdk_lvol_op_complete cb_fn, void *cb_arg)
1150 {
1151 	struct spdk_lvol_req *req;
1152 	int rc;
1153 
1154 	rc = _vbdev_lvol_change_bdev_alias(lvol, new_lvol_name);
1155 	if (rc != 0) {
1156 		SPDK_ERRLOG("renaming lvol to '%s' does not succeed\n", new_lvol_name);
1157 		cb_fn(cb_arg, rc);
1158 		return;
1159 	}
1160 
1161 	req = calloc(1, sizeof(*req));
1162 	if (req == NULL) {
1163 		cb_fn(cb_arg, -ENOMEM);
1164 		return;
1165 	}
1166 	req->cb_fn = cb_fn;
1167 	req->cb_arg = cb_arg;
1168 
1169 	spdk_lvol_rename(lvol, new_lvol_name, _vbdev_lvol_rename_cb, req);
1170 }
1171 
1172 static void
1173 _vbdev_lvol_resize_cb(void *cb_arg, int lvolerrno)
1174 {
1175 	struct spdk_lvol_req *req = cb_arg;
1176 	struct spdk_lvol *lvol = req->lvol;
1177 	uint64_t total_size;
1178 
1179 	/* change bdev size */
1180 	if (lvolerrno != 0) {
1181 		SPDK_ERRLOG("CB function for bdev lvol %s receive error no: %d.\n", lvol->name, lvolerrno);
1182 		goto finish;
1183 	}
1184 
1185 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1186 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1187 	assert((total_size % lvol->bdev->blocklen) == 0);
1188 
1189 	lvolerrno = spdk_bdev_notify_blockcnt_change(lvol->bdev, total_size / lvol->bdev->blocklen);
1190 	if (lvolerrno != 0) {
1191 		SPDK_ERRLOG("Could not change num blocks for bdev lvol %s with error no: %d.\n",
1192 			    lvol->name, lvolerrno);
1193 	}
1194 
1195 finish:
1196 	req->cb_fn(req->cb_arg, lvolerrno);
1197 	free(req);
1198 }
1199 
1200 void
1201 vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn, void *cb_arg)
1202 {
1203 	struct spdk_lvol_req *req;
1204 
1205 	if (lvol == NULL) {
1206 		SPDK_ERRLOG("lvol does not exist\n");
1207 		cb_fn(cb_arg, -EINVAL);
1208 		return;
1209 	}
1210 
1211 	assert(lvol->bdev != NULL);
1212 
1213 	req = calloc(1, sizeof(*req));
1214 	if (req == NULL) {
1215 		cb_fn(cb_arg, -ENOMEM);
1216 		return;
1217 	}
1218 
1219 	req->cb_fn = cb_fn;
1220 	req->cb_arg = cb_arg;
1221 	req->sz = sz;
1222 	req->lvol = lvol;
1223 
1224 	spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
1225 }
1226 
1227 static void
1228 _vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1229 {
1230 	struct spdk_lvol_req *req = cb_arg;
1231 	struct spdk_lvol *lvol = req->lvol;
1232 
1233 	if (lvolerrno != 0) {
1234 		SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
1235 	}
1236 
1237 	req->cb_fn(req->cb_arg, lvolerrno);
1238 	free(req);
1239 }
1240 
1241 void
1242 vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1243 {
1244 	struct spdk_lvol_req *req;
1245 
1246 	if (lvol == NULL) {
1247 		SPDK_ERRLOG("lvol does not exist\n");
1248 		cb_fn(cb_arg, -EINVAL);
1249 		return;
1250 	}
1251 
1252 	assert(lvol->bdev != NULL);
1253 
1254 	req = calloc(1, sizeof(*req));
1255 	if (req == NULL) {
1256 		cb_fn(cb_arg, -ENOMEM);
1257 		return;
1258 	}
1259 
1260 	req->cb_fn = cb_fn;
1261 	req->cb_arg = cb_arg;
1262 	req->lvol = lvol;
1263 
1264 	spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
1265 }
1266 
1267 static int
1268 vbdev_lvs_init(void)
1269 {
1270 	return 0;
1271 }
1272 
1273 static void vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev);
1274 
1275 static void
1276 vbdev_lvs_fini_start_unload_cb(void *cb_arg, int lvserrno)
1277 {
1278 	struct lvol_store_bdev *lvs_bdev = cb_arg;
1279 	struct lvol_store_bdev *next_lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1280 
1281 	if (lvserrno != 0) {
1282 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
1283 	}
1284 
1285 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1286 	free(lvs_bdev);
1287 
1288 	vbdev_lvs_fini_start_iter(next_lvs_bdev);
1289 }
1290 
1291 static void
1292 vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev)
1293 {
1294 	struct spdk_lvol_store *lvs;
1295 
1296 	while (lvs_bdev != NULL) {
1297 		lvs = lvs_bdev->lvs;
1298 
1299 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1300 			spdk_lvs_unload(lvs, vbdev_lvs_fini_start_unload_cb, lvs_bdev);
1301 			return;
1302 		}
1303 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1304 	}
1305 
1306 	spdk_bdev_module_fini_start_done();
1307 }
1308 
1309 static void
1310 vbdev_lvs_fini_start(void)
1311 {
1312 	g_shutdown_started = true;
1313 	vbdev_lvs_fini_start_iter(vbdev_lvol_store_first());
1314 }
1315 
1316 static int
1317 vbdev_lvs_get_ctx_size(void)
1318 {
1319 	return sizeof(struct vbdev_lvol_io);
1320 }
1321 
1322 static void
1323 _vbdev_lvs_examine_done(struct spdk_lvs_req *req, int lvserrno)
1324 {
1325 	req->cb_fn(req->cb_arg, lvserrno);
1326 }
1327 
1328 static void
1329 _vbdev_lvs_examine_failed(void *cb_arg, int lvserrno)
1330 {
1331 	struct spdk_lvs_req *req = cb_arg;
1332 
1333 	_vbdev_lvs_examine_done(req, req->lvserrno);
1334 }
1335 
1336 static void
1337 _vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1338 {
1339 	struct spdk_lvs_req *req = cb_arg;
1340 	struct spdk_lvol_store *lvs = req->lvol_store;
1341 
1342 	if (lvolerrno != 0) {
1343 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1344 		if (lvolerrno == -ENOMEM) {
1345 			TAILQ_INSERT_TAIL(&lvs->retry_open_lvols, lvol, link);
1346 			return;
1347 		}
1348 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1349 		lvs->lvol_count--;
1350 		free(lvol);
1351 		goto end;
1352 	}
1353 
1354 	if (_create_lvol_disk(lvol, false)) {
1355 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1356 		lvs->lvol_count--;
1357 		goto end;
1358 	}
1359 
1360 	lvs->lvols_opened++;
1361 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1362 
1363 end:
1364 	if (!TAILQ_EMPTY(&lvs->retry_open_lvols)) {
1365 		lvol = TAILQ_FIRST(&lvs->retry_open_lvols);
1366 		TAILQ_REMOVE(&lvs->retry_open_lvols, lvol, link);
1367 		TAILQ_INSERT_HEAD(&lvs->lvols, lvol, link);
1368 		spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, req);
1369 		return;
1370 	}
1371 	if (lvs->lvols_opened >= lvs->lvol_count) {
1372 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1373 		_vbdev_lvs_examine_done(req, 0);
1374 	}
1375 }
1376 
1377 static void
1378 _vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1379 {
1380 	struct lvol_store_bdev *lvs_bdev;
1381 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1382 	struct spdk_lvol *lvol, *tmp;
1383 	struct spdk_lvs_req *ori_req = req->cb_arg;
1384 
1385 	if (lvserrno == -EEXIST) {
1386 		SPDK_INFOLOG(vbdev_lvol,
1387 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1388 			     req->base_bdev->name);
1389 		/* On error blobstore destroys bs_dev itself */
1390 		_vbdev_lvs_examine_done(ori_req, lvserrno);
1391 		goto end;
1392 	} else if (lvserrno != 0) {
1393 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1394 		/* On error blobstore destroys bs_dev itself */
1395 		_vbdev_lvs_examine_done(ori_req, lvserrno);
1396 		goto end;
1397 	}
1398 
1399 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1400 	if (lvserrno != 0) {
1401 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1402 		ori_req->lvserrno = lvserrno;
1403 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, ori_req);
1404 		goto end;
1405 	}
1406 
1407 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1408 	if (!lvs_bdev) {
1409 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1410 		ori_req->lvserrno = lvserrno;
1411 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, ori_req);
1412 		goto end;
1413 	}
1414 
1415 	lvs_bdev->lvs = lvol_store;
1416 	lvs_bdev->bdev = req->base_bdev;
1417 
1418 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1419 
1420 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1421 		     req->base_bdev->name);
1422 
1423 	lvol_store->lvols_opened = 0;
1424 
1425 	ori_req->lvol_store = lvol_store;
1426 
1427 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1428 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1429 		_vbdev_lvs_examine_done(ori_req, 0);
1430 	} else {
1431 		/* Open all lvols */
1432 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1433 			spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, ori_req);
1434 		}
1435 	}
1436 
1437 end:
1438 	free(req);
1439 }
1440 
1441 static void
1442 _vbdev_lvs_examine(struct spdk_bdev *bdev, struct spdk_lvs_req *ori_req,
1443 		   void (*action)(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg))
1444 {
1445 	struct spdk_bs_dev *bs_dev;
1446 	struct spdk_lvs_with_handle_req *req;
1447 	int rc;
1448 
1449 	req = calloc(1, sizeof(*req));
1450 	if (req == NULL) {
1451 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1452 		_vbdev_lvs_examine_done(ori_req, -ENOMEM);
1453 		return;
1454 	}
1455 
1456 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1457 					 NULL, &bs_dev);
1458 	if (rc < 0) {
1459 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1460 		_vbdev_lvs_examine_done(ori_req, rc);
1461 		free(req);
1462 		return;
1463 	}
1464 
1465 	req->base_bdev = bdev;
1466 	req->cb_arg = ori_req;
1467 
1468 	action(bs_dev, _vbdev_lvs_examine_cb, req);
1469 }
1470 
1471 static void
1472 vbdev_lvs_examine_done(void *arg, int lvserrno)
1473 {
1474 	struct spdk_lvs_req *req = arg;
1475 
1476 	spdk_bdev_module_examine_done(&g_lvol_if);
1477 	free(req);
1478 }
1479 
1480 static void
1481 vbdev_lvs_examine(struct spdk_bdev *bdev)
1482 {
1483 	struct spdk_lvs_req *req;
1484 
1485 	if (spdk_bdev_get_md_size(bdev) != 0) {
1486 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n which is formatted with metadata",
1487 			     bdev->name);
1488 		spdk_bdev_module_examine_done(&g_lvol_if);
1489 		return;
1490 	}
1491 
1492 	req = calloc(1, sizeof(*req));
1493 	if (req == NULL) {
1494 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1495 		spdk_bdev_module_examine_done(&g_lvol_if);
1496 		return;
1497 	}
1498 
1499 	req->cb_fn = vbdev_lvs_examine_done;
1500 	req->cb_arg = req;
1501 
1502 	_vbdev_lvs_examine(bdev, req, spdk_lvs_load);
1503 }
1504 
1505 struct spdk_lvol *
1506 vbdev_lvol_get_from_bdev(struct spdk_bdev *bdev)
1507 {
1508 	if (!bdev || bdev->module != &g_lvol_if) {
1509 		return NULL;
1510 	}
1511 
1512 	if (bdev->ctxt == NULL) {
1513 		SPDK_ERRLOG("No lvol ctx assigned to bdev %s\n", bdev->name);
1514 		return NULL;
1515 	}
1516 
1517 	return (struct spdk_lvol *)bdev->ctxt;
1518 }
1519 
1520 static void
1521 _vbdev_lvs_grow_finish(void *arg, int lvserrno)
1522 {
1523 	struct spdk_lvs_grow_req *req = arg;
1524 
1525 	req->cb_fn(req->cb_arg, lvserrno);
1526 	free(req);
1527 }
1528 
1529 static void
1530 _vbdev_lvs_grow_unload_cb(void *cb_arg, int lvserrno)
1531 {
1532 	struct spdk_lvs_grow_req *req = cb_arg;
1533 	struct lvol_store_bdev *lvs_bdev;
1534 	struct spdk_bdev *bdev;
1535 
1536 	if (lvserrno != 0) {
1537 		_vbdev_lvs_grow_finish(req, lvserrno);
1538 		return;
1539 	}
1540 
1541 	lvs_bdev = req->lvs_bdev;
1542 	bdev = lvs_bdev->bdev;
1543 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1544 	req->base.cb_fn = _vbdev_lvs_grow_finish;
1545 	req->base.cb_arg = req;
1546 	_vbdev_lvs_examine(bdev, &req->base, spdk_lvs_grow);
1547 	free(lvs_bdev);
1548 }
1549 
1550 static void
1551 _vbdev_lvs_grow_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
1552 {
1553 	struct spdk_lvs_grow_req *req = cb_arg;
1554 	struct spdk_lvol_store *lvs = req->base.lvol_store;
1555 
1556 	if (bdeverrno != 0) {
1557 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
1558 	}
1559 
1560 	req->lvol_cnt--;
1561 
1562 	if (req->lvol_cnt == 0) {
1563 		/* Lvol store can be unloaded once all lvols are closed. */
1564 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1565 			spdk_lvs_unload(lvs, _vbdev_lvs_grow_unload_cb, req);
1566 		} else {
1567 			_vbdev_lvs_grow_finish(req, -EINVAL);
1568 		}
1569 	}
1570 }
1571 
1572 void
1573 vbdev_lvs_grow(struct spdk_lvol_store *lvs,
1574 	       spdk_lvs_op_complete cb_fn, void *cb_arg)
1575 {
1576 	struct spdk_lvs_grow_req *req;
1577 	struct spdk_lvol *lvol, *tmp;
1578 
1579 	req = calloc(1, sizeof(*req));
1580 	if (!req) {
1581 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1582 		cb_fn(cb_arg, -ENOMEM);
1583 		return;
1584 	}
1585 	req->cb_fn = cb_fn;
1586 	req->cb_arg = cb_arg;
1587 	req->base.lvol_store = lvs;
1588 	req->lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
1589 	if (req->lvs_bdev == NULL) {
1590 		SPDK_ERRLOG("Cannot get valid lvs_bdev\n");
1591 		_vbdev_lvs_grow_finish(req, -EINVAL);
1592 		return;
1593 	}
1594 
1595 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
1596 		spdk_lvs_unload(lvs, _vbdev_lvs_grow_unload_cb, req);
1597 	} else {
1598 		TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
1599 			req->lvol_cnt++;
1600 			spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_grow_remove_bdev_unregistered_cb, req);
1601 		}
1602 		assert(req->lvol_cnt > 0);
1603 	}
1604 }
1605 
1606 SPDK_LOG_REGISTER_COMPONENT(vbdev_lvol)
1607