xref: /spdk/module/bdev/lvol/vbdev_lvol.c (revision b37db06935181fd0e8f5592a96d860040abaa201)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/rpc.h"
8 #include "spdk/bdev_module.h"
9 #include "spdk/log.h"
10 #include "spdk/string.h"
11 #include "spdk/uuid.h"
12 #include "spdk/blob.h"
13 
14 #include "vbdev_lvol.h"
15 
16 struct vbdev_lvol_io {
17 	struct spdk_blob_ext_io_opts ext_io_opts;
18 };
19 
20 static TAILQ_HEAD(, lvol_store_bdev) g_spdk_lvol_pairs = TAILQ_HEAD_INITIALIZER(
21 			g_spdk_lvol_pairs);
22 
23 static int vbdev_lvs_init(void);
24 static void vbdev_lvs_fini_start(void);
25 static int vbdev_lvs_get_ctx_size(void);
26 static void vbdev_lvs_examine_config(struct spdk_bdev *bdev);
27 static void vbdev_lvs_examine_disk(struct spdk_bdev *bdev);
28 static bool g_shutdown_started = false;
29 
30 static struct spdk_bdev_module g_lvol_if = {
31 	.name = "lvol",
32 	.module_init = vbdev_lvs_init,
33 	.fini_start = vbdev_lvs_fini_start,
34 	.async_fini_start = true,
35 	.examine_config = vbdev_lvs_examine_config,
36 	.examine_disk = vbdev_lvs_examine_disk,
37 	.get_ctx_size = vbdev_lvs_get_ctx_size,
38 
39 };
40 
41 SPDK_BDEV_MODULE_REGISTER(lvol, &g_lvol_if)
42 
43 static void _vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg);
44 
45 struct lvol_store_bdev *
46 vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig)
47 {
48 	struct spdk_lvol_store *lvs = NULL;
49 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
50 
51 	while (lvs_bdev != NULL) {
52 		lvs = lvs_bdev->lvs;
53 		if (lvs == lvs_orig) {
54 			if (lvs_bdev->removal_in_progress) {
55 				/* We do not allow access to lvs that are being unloaded or
56 				 * destroyed */
57 				SPDK_DEBUGLOG(vbdev_lvol, "lvs %s: removal in progress\n",
58 					      lvs_orig->name);
59 				return NULL;
60 			} else {
61 				return lvs_bdev;
62 			}
63 		}
64 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
65 	}
66 
67 	return NULL;
68 }
69 
70 static int
71 _vbdev_lvol_change_bdev_alias(struct spdk_lvol *lvol, const char *new_lvol_name)
72 {
73 	struct spdk_bdev_alias *tmp;
74 	char *old_alias;
75 	char *alias;
76 	int rc;
77 	int alias_number = 0;
78 
79 	/* bdev representing lvols have only one alias,
80 	 * while we changed lvs name earlier, we have to iterate alias list to get one,
81 	 * and check if there is only one alias */
82 
83 	TAILQ_FOREACH(tmp, spdk_bdev_get_aliases(lvol->bdev), tailq) {
84 		if (++alias_number > 1) {
85 			SPDK_ERRLOG("There is more than 1 alias in bdev %s\n", lvol->bdev->name);
86 			return -EINVAL;
87 		}
88 
89 		old_alias = tmp->alias.name;
90 	}
91 
92 	if (alias_number == 0) {
93 		SPDK_ERRLOG("There are no aliases in bdev %s\n", lvol->bdev->name);
94 		return -EINVAL;
95 	}
96 
97 	alias = spdk_sprintf_alloc("%s/%s", lvol->lvol_store->name, new_lvol_name);
98 	if (alias == NULL) {
99 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
100 		return -ENOMEM;
101 	}
102 
103 	rc = spdk_bdev_alias_add(lvol->bdev, alias);
104 	if (rc != 0) {
105 		SPDK_ERRLOG("cannot add alias '%s'\n", alias);
106 		free(alias);
107 		return rc;
108 	}
109 	free(alias);
110 
111 	rc = spdk_bdev_alias_del(lvol->bdev, old_alias);
112 	if (rc != 0) {
113 		SPDK_ERRLOG("cannot remove alias '%s'\n", old_alias);
114 		return rc;
115 	}
116 
117 	return 0;
118 }
119 
120 static struct lvol_store_bdev *
121 vbdev_get_lvs_bdev_by_bdev(struct spdk_bdev *bdev_orig)
122 {
123 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
124 
125 	while (lvs_bdev != NULL) {
126 		if (lvs_bdev->bdev == bdev_orig) {
127 			if (lvs_bdev->removal_in_progress) {
128 				/* We do not allow access to lvs that are being unloaded or
129 				 * destroyed */
130 				SPDK_DEBUGLOG(vbdev_lvol, "lvs %s: removal in progress\n",
131 					      lvs_bdev->lvs->name);
132 				return NULL;
133 			} else {
134 				return lvs_bdev;
135 			}
136 		}
137 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
138 	}
139 
140 	return NULL;
141 }
142 
143 static void
144 vbdev_lvs_hotremove_cb(struct spdk_bdev *bdev)
145 {
146 	struct lvol_store_bdev *lvs_bdev;
147 
148 	lvs_bdev = vbdev_get_lvs_bdev_by_bdev(bdev);
149 	if (lvs_bdev != NULL) {
150 		SPDK_NOTICELOG("bdev %s being removed: closing lvstore %s\n",
151 			       spdk_bdev_get_name(bdev), lvs_bdev->lvs->name);
152 		vbdev_lvs_unload(lvs_bdev->lvs, NULL, NULL);
153 	}
154 }
155 
156 static void
157 vbdev_lvs_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
158 			     void *event_ctx)
159 {
160 	switch (type) {
161 	case SPDK_BDEV_EVENT_REMOVE:
162 		vbdev_lvs_hotremove_cb(bdev);
163 		break;
164 	default:
165 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
166 		break;
167 	}
168 }
169 
170 static void
171 _vbdev_lvs_create_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
172 {
173 	struct spdk_lvs_with_handle_req *req = cb_arg;
174 	struct lvol_store_bdev *lvs_bdev;
175 	struct spdk_bdev *bdev = req->base_bdev;
176 	struct spdk_bs_dev *bs_dev = req->bs_dev;
177 
178 	if (lvserrno != 0) {
179 		assert(lvs == NULL);
180 		SPDK_ERRLOG("Cannot create lvol store bdev\n");
181 		goto end;
182 	}
183 
184 	lvserrno = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
185 	if (lvserrno != 0) {
186 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
187 		req->bs_dev->destroy(req->bs_dev);
188 		goto end;
189 	}
190 
191 	assert(lvs != NULL);
192 
193 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
194 	if (!lvs_bdev) {
195 		lvserrno = -ENOMEM;
196 		goto end;
197 	}
198 	lvs_bdev->lvs = lvs;
199 	lvs_bdev->bdev = bdev;
200 	lvs_bdev->req = NULL;
201 
202 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
203 	SPDK_INFOLOG(vbdev_lvol, "Lvol store bdev inserted\n");
204 
205 end:
206 	req->cb_fn(req->cb_arg, lvs, lvserrno);
207 	free(req);
208 
209 	return;
210 }
211 
212 int
213 vbdev_lvs_create_ext(const char *base_bdev_name, const char *name, uint32_t cluster_sz,
214 		     enum lvs_clear_method clear_method, uint32_t num_md_pages_per_cluster_ratio,
215 		     uint32_t md_page_size, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
216 {
217 	struct spdk_bs_dev *bs_dev;
218 	struct spdk_lvs_with_handle_req *lvs_req;
219 	struct spdk_lvs_opts opts;
220 	int rc;
221 	int len;
222 
223 	if (base_bdev_name == NULL) {
224 		SPDK_ERRLOG("missing base_bdev_name param\n");
225 		return -EINVAL;
226 	}
227 
228 	spdk_lvs_opts_init(&opts);
229 	if (cluster_sz != 0) {
230 		opts.cluster_sz = cluster_sz;
231 	}
232 
233 	if (clear_method != 0) {
234 		opts.clear_method = clear_method;
235 	}
236 
237 	if (num_md_pages_per_cluster_ratio != 0) {
238 		opts.num_md_pages_per_cluster_ratio = num_md_pages_per_cluster_ratio;
239 	}
240 
241 	if (name == NULL) {
242 		SPDK_ERRLOG("missing name param\n");
243 		return -EINVAL;
244 	}
245 
246 	len = strnlen(name, SPDK_LVS_NAME_MAX);
247 
248 	if (len == 0 || len == SPDK_LVS_NAME_MAX) {
249 		SPDK_ERRLOG("name must be between 1 and %d characters\n", SPDK_LVS_NAME_MAX - 1);
250 		return -EINVAL;
251 	}
252 	snprintf(opts.name, sizeof(opts.name), "%s", name);
253 	opts.esnap_bs_dev_create = vbdev_lvol_esnap_dev_create;
254 
255 	if (md_page_size != 0 && (md_page_size < 4096 || md_page_size > 65536)) {
256 		SPDK_ERRLOG("Invalid metadata page size %" PRIu32 " (must be between 4096B and 65536B).\n",
257 			    md_page_size);
258 		return -EINVAL;
259 	}
260 
261 	if (md_page_size != 0 && (!spdk_u32_is_pow2(md_page_size))) {
262 		SPDK_ERRLOG("Invalid metadata page size %" PRIu32 " (must be a power of 2.)\n", md_page_size);
263 		return -EINVAL;
264 	}
265 
266 	lvs_req = calloc(1, sizeof(*lvs_req));
267 	if (!lvs_req) {
268 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
269 		return -ENOMEM;
270 	}
271 
272 	rc = spdk_bdev_create_bs_dev_ext(base_bdev_name, vbdev_lvs_base_bdev_event_cb,
273 					 NULL, &bs_dev);
274 	if (rc < 0) {
275 		SPDK_ERRLOG("Cannot create blobstore device\n");
276 		free(lvs_req);
277 		return rc;
278 	}
279 
280 	if (md_page_size > bs_dev->phys_blocklen) {
281 		SPDK_WARNLOG("Metadata page size is greater than physical block length\n");
282 	}
283 
284 	opts.md_page_size = md_page_size;
285 	lvs_req->bs_dev = bs_dev;
286 	lvs_req->base_bdev = bs_dev->get_base_bdev(bs_dev);
287 	lvs_req->cb_fn = cb_fn;
288 	lvs_req->cb_arg = cb_arg;
289 
290 	rc = spdk_lvs_init(bs_dev, &opts, _vbdev_lvs_create_cb, lvs_req);
291 	if (rc < 0) {
292 		free(lvs_req);
293 		bs_dev->destroy(bs_dev);
294 		return rc;
295 	}
296 
297 	return 0;
298 }
299 
300 int
301 vbdev_lvs_create(const char *base_bdev_name, const char *name, uint32_t cluster_sz,
302 		 enum lvs_clear_method clear_method, uint32_t num_md_pages_per_cluster_ratio,
303 		 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
304 {
305 	uint32_t md_page_size = 0;
306 
307 	return vbdev_lvs_create_ext(base_bdev_name, name, cluster_sz, clear_method,
308 				    num_md_pages_per_cluster_ratio, md_page_size, cb_fn, cb_arg);
309 }
310 
311 static void
312 _vbdev_lvs_rename_cb(void *cb_arg, int lvserrno)
313 {
314 	struct spdk_lvs_req *req = cb_arg;
315 	struct spdk_lvol *tmp;
316 
317 	if (lvserrno != 0) {
318 		SPDK_INFOLOG(vbdev_lvol, "Lvol store rename failed\n");
319 	} else {
320 		TAILQ_FOREACH(tmp, &req->lvol_store->lvols, link) {
321 			/* We have to pass current lvol name, since only lvs name changed */
322 			_vbdev_lvol_change_bdev_alias(tmp, tmp->name);
323 		}
324 	}
325 
326 	req->cb_fn(req->cb_arg, lvserrno);
327 	free(req);
328 }
329 
330 void
331 vbdev_lvs_rename(struct spdk_lvol_store *lvs, const char *new_lvs_name,
332 		 spdk_lvs_op_complete cb_fn, void *cb_arg)
333 {
334 	struct lvol_store_bdev *lvs_bdev;
335 
336 	struct spdk_lvs_req *req;
337 
338 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
339 	if (!lvs_bdev) {
340 		SPDK_ERRLOG("No such lvol store found\n");
341 		cb_fn(cb_arg, -ENODEV);
342 		return;
343 	}
344 
345 	req = calloc(1, sizeof(*req));
346 	if (!req) {
347 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
348 		cb_fn(cb_arg, -ENOMEM);
349 		return;
350 	}
351 	req->cb_fn = cb_fn;
352 	req->cb_arg = cb_arg;
353 	req->lvol_store = lvs;
354 
355 	spdk_lvs_rename(lvs, new_lvs_name, _vbdev_lvs_rename_cb, req);
356 }
357 
358 static void
359 _vbdev_lvs_remove_cb(void *cb_arg, int lvserrno)
360 {
361 	struct lvol_store_bdev *lvs_bdev = cb_arg;
362 	struct spdk_lvs_req *req = lvs_bdev->req;
363 
364 	if (lvserrno != 0) {
365 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
366 	}
367 
368 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
369 	free(lvs_bdev);
370 
371 	if (req->cb_fn != NULL) {
372 		req->cb_fn(req->cb_arg, lvserrno);
373 	}
374 	free(req);
375 }
376 
377 static void
378 _vbdev_lvs_remove_lvol_cb(void *cb_arg, int lvolerrno)
379 {
380 	struct lvol_store_bdev *lvs_bdev = cb_arg;
381 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
382 	struct spdk_lvol *lvol;
383 
384 	if (lvolerrno != 0) {
385 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol removed with errno %d\n", lvolerrno);
386 	}
387 
388 	if (TAILQ_EMPTY(&lvs->lvols)) {
389 		spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
390 		return;
391 	}
392 
393 	lvol = TAILQ_FIRST(&lvs->lvols);
394 	while (lvol != NULL) {
395 		if (spdk_lvol_deletable(lvol)) {
396 			_vbdev_lvol_destroy(lvol, _vbdev_lvs_remove_lvol_cb, lvs_bdev);
397 			return;
398 		}
399 		lvol = TAILQ_NEXT(lvol, link);
400 	}
401 
402 	/* If no lvol is deletable, that means there is circular dependency. */
403 	SPDK_ERRLOG("Lvols left in lvs, but unable to delete.\n");
404 	assert(false);
405 }
406 
407 static bool
408 _vbdev_lvs_are_lvols_closed(struct spdk_lvol_store *lvs)
409 {
410 	struct spdk_lvol *lvol;
411 
412 	TAILQ_FOREACH(lvol, &lvs->lvols, link) {
413 		if (lvol->ref_count != 0) {
414 			return false;
415 		}
416 	}
417 	return true;
418 }
419 
420 static void
421 _vbdev_lvs_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
422 {
423 	struct lvol_store_bdev *lvs_bdev = cb_arg;
424 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
425 
426 	if (bdeverrno != 0) {
427 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
428 	}
429 
430 	/* Lvol store can be unloaded once all lvols are closed. */
431 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
432 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
433 	}
434 }
435 
436 static void
437 _vbdev_lvs_remove(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg,
438 		  bool destroy)
439 {
440 	struct spdk_lvs_req *req;
441 	struct lvol_store_bdev *lvs_bdev;
442 	struct spdk_lvol *lvol, *tmp;
443 
444 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
445 	if (!lvs_bdev) {
446 		SPDK_ERRLOG("No such lvol store found\n");
447 		if (cb_fn != NULL) {
448 			cb_fn(cb_arg, -ENODEV);
449 		}
450 		return;
451 	}
452 
453 	req = calloc(1, sizeof(*req));
454 	if (!req) {
455 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
456 		if (cb_fn != NULL) {
457 			cb_fn(cb_arg, -ENOMEM);
458 		}
459 		return;
460 	}
461 
462 	lvs_bdev->removal_in_progress = true;
463 
464 	req->cb_fn = cb_fn;
465 	req->cb_arg = cb_arg;
466 	lvs_bdev->req = req;
467 
468 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
469 		if (destroy) {
470 			spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
471 			return;
472 		}
473 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
474 		return;
475 	}
476 	if (destroy) {
477 		_vbdev_lvs_remove_lvol_cb(lvs_bdev, 0);
478 		return;
479 	}
480 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
481 		if (lvol->bdev == NULL) {
482 			spdk_lvol_close(lvol, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
483 			continue;
484 		}
485 		spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
486 	}
487 }
488 
489 void
490 vbdev_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
491 {
492 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, false);
493 }
494 
495 void
496 vbdev_lvs_destruct(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
497 {
498 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, true);
499 }
500 
501 struct lvol_store_bdev *
502 vbdev_lvol_store_first(void)
503 {
504 	struct lvol_store_bdev *lvs_bdev;
505 
506 	lvs_bdev = TAILQ_FIRST(&g_spdk_lvol_pairs);
507 	if (lvs_bdev) {
508 		SPDK_INFOLOG(vbdev_lvol, "Starting lvolstore iteration at %p\n", lvs_bdev->lvs);
509 	}
510 
511 	return lvs_bdev;
512 }
513 
514 struct lvol_store_bdev *
515 vbdev_lvol_store_next(struct lvol_store_bdev *prev)
516 {
517 	struct lvol_store_bdev *lvs_bdev;
518 
519 	if (prev == NULL) {
520 		SPDK_ERRLOG("prev argument cannot be NULL\n");
521 		return NULL;
522 	}
523 
524 	lvs_bdev = TAILQ_NEXT(prev, lvol_stores);
525 	if (lvs_bdev) {
526 		SPDK_INFOLOG(vbdev_lvol, "Continuing lvolstore iteration at %p\n", lvs_bdev->lvs);
527 	}
528 
529 	return lvs_bdev;
530 }
531 
532 static struct spdk_lvol_store *
533 _vbdev_get_lvol_store_by_uuid(const struct spdk_uuid *uuid)
534 {
535 	struct spdk_lvol_store *lvs = NULL;
536 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
537 
538 	while (lvs_bdev != NULL) {
539 		lvs = lvs_bdev->lvs;
540 		if (spdk_uuid_compare(&lvs->uuid, uuid) == 0) {
541 			return lvs;
542 		}
543 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
544 	}
545 	return NULL;
546 }
547 
548 struct spdk_lvol_store *
549 vbdev_get_lvol_store_by_uuid(const char *uuid_str)
550 {
551 	struct spdk_uuid uuid;
552 
553 	if (spdk_uuid_parse(&uuid, uuid_str)) {
554 		return NULL;
555 	}
556 
557 	return _vbdev_get_lvol_store_by_uuid(&uuid);
558 }
559 
560 struct spdk_lvol_store *
561 vbdev_get_lvol_store_by_name(const char *name)
562 {
563 	struct spdk_lvol_store *lvs = NULL;
564 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
565 
566 	while (lvs_bdev != NULL) {
567 		lvs = lvs_bdev->lvs;
568 		if (strncmp(lvs->name, name, sizeof(lvs->name)) == 0) {
569 			return lvs;
570 		}
571 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
572 	}
573 	return NULL;
574 }
575 
576 struct vbdev_lvol_destroy_ctx {
577 	struct spdk_lvol *lvol;
578 	spdk_lvol_op_complete cb_fn;
579 	void *cb_arg;
580 };
581 
582 static void
583 _vbdev_lvol_unregister_unload_lvs(void *cb_arg, int lvserrno)
584 {
585 	struct lvol_bdev *lvol_bdev = cb_arg;
586 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
587 
588 	if (lvserrno != 0) {
589 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
590 	}
591 
592 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
593 	free(lvs_bdev);
594 
595 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvserrno);
596 	free(lvol_bdev);
597 }
598 
599 static void
600 _vbdev_lvol_unregister_cb(void *ctx, int lvolerrno)
601 {
602 	struct lvol_bdev *lvol_bdev = ctx;
603 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
604 
605 	if (g_shutdown_started && _vbdev_lvs_are_lvols_closed(lvs_bdev->lvs)) {
606 		spdk_lvs_unload(lvs_bdev->lvs, _vbdev_lvol_unregister_unload_lvs, lvol_bdev);
607 		return;
608 	}
609 
610 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvolerrno);
611 	free(lvol_bdev);
612 }
613 
614 static int
615 vbdev_lvol_unregister(void *ctx)
616 {
617 	struct spdk_lvol *lvol = ctx;
618 	struct lvol_bdev *lvol_bdev;
619 
620 	assert(lvol != NULL);
621 	lvol_bdev = SPDK_CONTAINEROF(lvol->bdev, struct lvol_bdev, bdev);
622 
623 	spdk_bdev_alias_del_all(lvol->bdev);
624 	spdk_lvol_close(lvol, _vbdev_lvol_unregister_cb, lvol_bdev);
625 
626 	/* return 1 to indicate we have an operation that must finish asynchronously before the
627 	 *  lvol is closed
628 	 */
629 	return 1;
630 }
631 
632 static void
633 _vbdev_lvol_destroy_cb(void *cb_arg, int bdeverrno)
634 {
635 	struct vbdev_lvol_destroy_ctx *ctx = cb_arg;
636 	struct spdk_lvol *lvol = ctx->lvol;
637 
638 	if (bdeverrno < 0) {
639 		SPDK_INFOLOG(vbdev_lvol, "Could not unregister bdev during lvol (%s) destroy\n",
640 			     lvol->unique_id);
641 		ctx->cb_fn(ctx->cb_arg, bdeverrno);
642 		free(ctx);
643 		return;
644 	}
645 
646 	spdk_lvol_destroy(lvol, ctx->cb_fn, ctx->cb_arg);
647 	free(ctx);
648 }
649 
650 static void
651 _vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
652 {
653 	struct vbdev_lvol_destroy_ctx *ctx;
654 	size_t count;
655 
656 	assert(lvol != NULL);
657 	assert(cb_fn != NULL);
658 
659 	/* Callers other than _vbdev_lvs_remove() must ensure the lvstore is not being removed. */
660 	assert(cb_fn == _vbdev_lvs_remove_lvol_cb ||
661 	       vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store) != NULL);
662 
663 	/* Check if it is possible to delete lvol */
664 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
665 	if (count > 1) {
666 		/* throw an error */
667 		SPDK_ERRLOG("Cannot delete lvol\n");
668 		cb_fn(cb_arg, -EPERM);
669 		return;
670 	}
671 
672 	ctx = calloc(1, sizeof(*ctx));
673 	if (!ctx) {
674 		cb_fn(cb_arg, -ENOMEM);
675 		return;
676 	}
677 
678 	ctx->lvol = lvol;
679 	ctx->cb_fn = cb_fn;
680 	ctx->cb_arg = cb_arg;
681 
682 	if (spdk_lvol_is_degraded(lvol)) {
683 		spdk_lvol_close(lvol, _vbdev_lvol_destroy_cb, ctx);
684 		return;
685 	}
686 
687 	spdk_bdev_unregister(lvol->bdev, _vbdev_lvol_destroy_cb, ctx);
688 }
689 
690 void
691 vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
692 {
693 	struct lvol_store_bdev *lvs_bdev;
694 
695 	/*
696 	 * During destruction of an lvolstore, _vbdev_lvs_unload() iterates through lvols until they
697 	 * are all deleted. There may be some IO required
698 	 */
699 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
700 	if (lvs_bdev == NULL) {
701 		SPDK_DEBUGLOG(vbdev_lvol, "lvol %s: lvolstore is being removed\n",
702 			      lvol->unique_id);
703 		cb_fn(cb_arg, -ENODEV);
704 		return;
705 	}
706 
707 	_vbdev_lvol_destroy(lvol, cb_fn, cb_arg);
708 }
709 
710 static char *
711 vbdev_lvol_find_name(struct spdk_lvol *lvol, spdk_blob_id blob_id)
712 {
713 	struct spdk_lvol_store *lvs;
714 	struct spdk_lvol *_lvol;
715 
716 	assert(lvol != NULL);
717 
718 	lvs = lvol->lvol_store;
719 
720 	assert(lvs);
721 
722 	TAILQ_FOREACH(_lvol, &lvs->lvols, link) {
723 		if (_lvol->blob_id == blob_id) {
724 			return _lvol->name;
725 		}
726 	}
727 
728 	return NULL;
729 }
730 
731 static int
732 vbdev_lvol_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
733 {
734 	struct spdk_lvol *lvol = ctx;
735 	struct lvol_store_bdev *lvs_bdev;
736 	struct spdk_bdev *bdev;
737 	struct spdk_blob *blob;
738 	spdk_blob_id *ids = NULL;
739 	size_t count, i;
740 	char *name;
741 	int rc = 0;
742 
743 	spdk_json_write_named_object_begin(w, "lvol");
744 
745 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
746 	if (!lvs_bdev) {
747 		SPDK_ERRLOG("No such lvol store found\n");
748 		rc = -ENODEV;
749 		goto end;
750 	}
751 
752 	bdev = lvs_bdev->bdev;
753 
754 	spdk_json_write_named_uuid(w, "lvol_store_uuid", &lvol->lvol_store->uuid);
755 
756 	spdk_json_write_named_string(w, "base_bdev", spdk_bdev_get_name(bdev));
757 
758 	blob = lvol->blob;
759 
760 	spdk_json_write_named_bool(w, "thin_provision", spdk_blob_is_thin_provisioned(blob));
761 
762 	spdk_json_write_named_uint64(w, "num_allocated_clusters",
763 				     spdk_blob_get_num_allocated_clusters(blob));
764 
765 	spdk_json_write_named_bool(w, "snapshot", spdk_blob_is_snapshot(blob));
766 
767 	spdk_json_write_named_bool(w, "clone", spdk_blob_is_clone(blob));
768 
769 	if (spdk_blob_is_clone(blob)) {
770 		spdk_blob_id snapshotid = spdk_blob_get_parent_snapshot(lvol->lvol_store->blobstore, lvol->blob_id);
771 		if (snapshotid != SPDK_BLOBID_INVALID) {
772 			name = vbdev_lvol_find_name(lvol, snapshotid);
773 			if (name != NULL) {
774 				spdk_json_write_named_string(w, "base_snapshot", name);
775 			} else {
776 				SPDK_ERRLOG("Cannot obtain snapshots name\n");
777 			}
778 		}
779 	}
780 
781 	if (spdk_blob_is_snapshot(blob)) {
782 		/* Take a number of clones */
783 		rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
784 		if (rc == -ENOMEM && count > 0) {
785 			ids = malloc(sizeof(spdk_blob_id) * count);
786 			if (ids == NULL) {
787 				SPDK_ERRLOG("Cannot allocate memory\n");
788 				rc = -ENOMEM;
789 				goto end;
790 			}
791 
792 			rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, ids, &count);
793 			if (rc == 0) {
794 				spdk_json_write_named_array_begin(w, "clones");
795 				for (i = 0; i < count; i++) {
796 					name = vbdev_lvol_find_name(lvol, ids[i]);
797 					if (name != NULL) {
798 						spdk_json_write_string(w, name);
799 					} else {
800 						SPDK_ERRLOG("Cannot obtain clone name\n");
801 					}
802 
803 				}
804 				spdk_json_write_array_end(w);
805 			}
806 			free(ids);
807 		}
808 
809 	}
810 
811 	spdk_json_write_named_bool(w, "esnap_clone", spdk_blob_is_esnap_clone(blob));
812 
813 	if (spdk_blob_is_esnap_clone(blob)) {
814 		const char *name;
815 		size_t name_len;
816 
817 		rc = spdk_blob_get_esnap_id(blob, (const void **)&name, &name_len);
818 		if (rc == 0 && name != NULL && strnlen(name, name_len) + 1 == name_len) {
819 			spdk_json_write_named_string(w, "external_snapshot_name", name);
820 		}
821 	}
822 
823 end:
824 	spdk_json_write_object_end(w);
825 
826 	return rc;
827 }
828 
829 static void
830 vbdev_lvol_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
831 {
832 	/* Nothing to dump as lvol configuration is saved on physical device. */
833 }
834 
835 static struct spdk_io_channel *
836 vbdev_lvol_get_io_channel(void *ctx)
837 {
838 	struct spdk_lvol *lvol = ctx;
839 
840 	return spdk_lvol_get_io_channel(lvol);
841 }
842 
843 static bool
844 vbdev_lvol_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
845 {
846 	struct spdk_lvol *lvol = ctx;
847 
848 	switch (io_type) {
849 	case SPDK_BDEV_IO_TYPE_WRITE:
850 	case SPDK_BDEV_IO_TYPE_UNMAP:
851 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
852 		return !spdk_blob_is_read_only(lvol->blob);
853 	case SPDK_BDEV_IO_TYPE_RESET:
854 	case SPDK_BDEV_IO_TYPE_READ:
855 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
856 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
857 		return true;
858 	default:
859 		return false;
860 	}
861 }
862 
863 static void
864 lvol_op_comp(void *cb_arg, int bserrno)
865 {
866 	struct spdk_bdev_io *bdev_io = cb_arg;
867 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_SUCCESS;
868 
869 	if (bserrno != 0) {
870 		if (bserrno == -ENOMEM) {
871 			status = SPDK_BDEV_IO_STATUS_NOMEM;
872 		} else {
873 			status = SPDK_BDEV_IO_STATUS_FAILED;
874 		}
875 	}
876 
877 	spdk_bdev_io_complete(bdev_io, status);
878 }
879 
880 static void
881 lvol_unmap(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
882 {
883 	uint64_t start_page, num_pages;
884 	struct spdk_blob *blob = lvol->blob;
885 
886 	start_page = bdev_io->u.bdev.offset_blocks;
887 	num_pages = bdev_io->u.bdev.num_blocks;
888 
889 	spdk_blob_io_unmap(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
890 }
891 
892 static void
893 lvol_seek_data(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
894 {
895 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_allocated_io_unit(lvol->blob,
896 				      bdev_io->u.bdev.offset_blocks);
897 
898 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
899 }
900 
901 static void
902 lvol_seek_hole(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
903 {
904 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_unallocated_io_unit(lvol->blob,
905 				      bdev_io->u.bdev.offset_blocks);
906 
907 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
908 }
909 
910 static void
911 lvol_write_zeroes(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
912 {
913 	uint64_t start_page, num_pages;
914 	struct spdk_blob *blob = lvol->blob;
915 
916 	start_page = bdev_io->u.bdev.offset_blocks;
917 	num_pages = bdev_io->u.bdev.num_blocks;
918 
919 	spdk_blob_io_write_zeroes(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
920 }
921 
922 static void
923 lvol_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
924 {
925 	uint64_t start_page, num_pages;
926 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
927 	struct spdk_blob *blob = lvol->blob;
928 	struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
929 
930 	start_page = bdev_io->u.bdev.offset_blocks;
931 	num_pages = bdev_io->u.bdev.num_blocks;
932 
933 	lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
934 	lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.memory_domain;
935 	lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.memory_domain_ctx;
936 
937 	spdk_blob_io_readv_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
938 			       num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
939 }
940 
941 static void
942 lvol_write(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
943 {
944 	uint64_t start_page, num_pages;
945 	struct spdk_blob *blob = lvol->blob;
946 	struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
947 
948 	start_page = bdev_io->u.bdev.offset_blocks;
949 	num_pages = bdev_io->u.bdev.num_blocks;
950 
951 	lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
952 	lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.memory_domain;
953 	lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.memory_domain_ctx;
954 
955 	spdk_blob_io_writev_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
956 				num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
957 }
958 
959 static int
960 lvol_reset(struct spdk_bdev_io *bdev_io)
961 {
962 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
963 
964 	return 0;
965 }
966 
967 static void
968 lvol_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
969 {
970 	if (!success) {
971 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
972 		return;
973 	}
974 
975 	lvol_read(ch, bdev_io);
976 }
977 
978 static void
979 vbdev_lvol_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
980 {
981 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
982 
983 	switch (bdev_io->type) {
984 	case SPDK_BDEV_IO_TYPE_READ:
985 		spdk_bdev_io_get_buf(bdev_io, lvol_get_buf_cb,
986 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
987 		break;
988 	case SPDK_BDEV_IO_TYPE_WRITE:
989 		lvol_write(lvol, ch, bdev_io);
990 		break;
991 	case SPDK_BDEV_IO_TYPE_RESET:
992 		lvol_reset(bdev_io);
993 		break;
994 	case SPDK_BDEV_IO_TYPE_UNMAP:
995 		lvol_unmap(lvol, ch, bdev_io);
996 		break;
997 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
998 		lvol_write_zeroes(lvol, ch, bdev_io);
999 		break;
1000 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
1001 		lvol_seek_data(lvol, bdev_io);
1002 		break;
1003 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
1004 		lvol_seek_hole(lvol, bdev_io);
1005 		break;
1006 	default:
1007 		SPDK_INFOLOG(vbdev_lvol, "lvol: unsupported I/O type %d\n", bdev_io->type);
1008 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1009 		return;
1010 	}
1011 	return;
1012 }
1013 
1014 static int
1015 vbdev_lvol_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
1016 {
1017 	struct spdk_lvol *lvol = ctx;
1018 	struct spdk_bdev *base_bdev, *esnap_bdev;
1019 	struct spdk_bs_dev *bs_dev;
1020 	struct spdk_lvol_store *lvs;
1021 	int base_cnt, esnap_cnt;
1022 
1023 	lvs = lvol->lvol_store;
1024 	base_bdev = lvs->bs_dev->get_base_bdev(lvol->lvol_store->bs_dev);
1025 
1026 	base_cnt = spdk_bdev_get_memory_domains(base_bdev, domains, array_size);
1027 	if (base_cnt < 0) {
1028 		return base_cnt;
1029 	}
1030 
1031 	if (lvol->blob == NULL) {
1032 		/*
1033 		 * This is probably called due to an open happening during blobstore load. Another
1034 		 * open will follow shortly that has lvol->blob set.
1035 		 */
1036 		return -EAGAIN;
1037 	}
1038 
1039 	if (!spdk_blob_is_esnap_clone(lvol->blob)) {
1040 		return base_cnt;
1041 	}
1042 
1043 	bs_dev = spdk_blob_get_esnap_bs_dev(lvol->blob);
1044 	if (bs_dev == NULL) {
1045 		assert(false);
1046 		SPDK_ERRLOG("lvol %s is an esnap clone but has no esnap device\n", lvol->unique_id);
1047 		return base_cnt;
1048 	}
1049 
1050 	if (bs_dev->get_base_bdev == NULL) {
1051 		/*
1052 		 * If this were a blob_bdev, we wouldn't be here. We are probably here because an
1053 		 * lvol bdev is being registered with spdk_bdev_register() before the external
1054 		 * snapshot bdev is loaded. Ideally, the load of a missing esnap would trigger an
1055 		 * event that causes the lvol bdev's memory domain information to be updated.
1056 		 */
1057 		return base_cnt;
1058 	}
1059 
1060 	esnap_bdev = bs_dev->get_base_bdev(bs_dev);
1061 	if (esnap_bdev == NULL) {
1062 		/*
1063 		 * The esnap bdev has not yet been loaded. Anyone that has opened at this point may
1064 		 * miss out on using memory domains if base_cnt is zero.
1065 		 */
1066 		SPDK_NOTICELOG("lvol %s reporting %d memory domains, not including missing esnap\n",
1067 			       lvol->unique_id, base_cnt);
1068 		return base_cnt;
1069 	}
1070 
1071 	if (base_cnt < array_size) {
1072 		array_size -= base_cnt;
1073 		domains += base_cnt;
1074 	} else {
1075 		array_size = 0;
1076 		domains = NULL;
1077 	}
1078 
1079 	esnap_cnt = spdk_bdev_get_memory_domains(esnap_bdev, domains, array_size);
1080 	if (esnap_cnt <= 0) {
1081 		return base_cnt;
1082 	}
1083 
1084 	return base_cnt + esnap_cnt;
1085 }
1086 
1087 static struct spdk_bdev_fn_table vbdev_lvol_fn_table = {
1088 	.destruct		= vbdev_lvol_unregister,
1089 	.io_type_supported	= vbdev_lvol_io_type_supported,
1090 	.submit_request		= vbdev_lvol_submit_request,
1091 	.get_io_channel		= vbdev_lvol_get_io_channel,
1092 	.dump_info_json		= vbdev_lvol_dump_info_json,
1093 	.write_config_json	= vbdev_lvol_write_config_json,
1094 	.get_memory_domains	= vbdev_lvol_get_memory_domains,
1095 };
1096 
1097 static void
1098 lvol_destroy_cb(void *cb_arg, int bdeverrno)
1099 {
1100 }
1101 
1102 static void
1103 _create_lvol_disk_destroy_cb(void *cb_arg, int bdeverrno)
1104 {
1105 	struct spdk_lvol *lvol = cb_arg;
1106 
1107 	if (bdeverrno < 0) {
1108 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
1109 			    lvol->unique_id);
1110 		return;
1111 	}
1112 
1113 	spdk_lvol_destroy(lvol, lvol_destroy_cb, NULL);
1114 }
1115 
1116 static void
1117 _create_lvol_disk_unload_cb(void *cb_arg, int bdeverrno)
1118 {
1119 	struct spdk_lvol *lvol = cb_arg;
1120 
1121 	if (bdeverrno < 0) {
1122 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
1123 			    lvol->unique_id);
1124 		return;
1125 	}
1126 
1127 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
1128 	free(lvol);
1129 }
1130 
1131 static int
1132 _create_lvol_disk(struct spdk_lvol *lvol, bool destroy)
1133 {
1134 	struct spdk_bdev *bdev;
1135 	struct lvol_bdev *lvol_bdev;
1136 	struct lvol_store_bdev *lvs_bdev;
1137 	uint64_t total_size;
1138 	unsigned char *alias;
1139 	int rc;
1140 
1141 	if (spdk_lvol_is_degraded(lvol)) {
1142 		SPDK_NOTICELOG("lvol %s: blob is degraded: deferring bdev creation\n",
1143 			       lvol->unique_id);
1144 		return 0;
1145 	}
1146 
1147 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
1148 	if (lvs_bdev == NULL) {
1149 		SPDK_ERRLOG("No spdk lvs-bdev pair found for lvol %s\n", lvol->unique_id);
1150 		assert(false);
1151 		return -ENODEV;
1152 	}
1153 
1154 	lvol_bdev = calloc(1, sizeof(struct lvol_bdev));
1155 	if (!lvol_bdev) {
1156 		SPDK_ERRLOG("Cannot alloc memory for lvol bdev\n");
1157 		return -ENOMEM;
1158 	}
1159 
1160 	lvol_bdev->lvol = lvol;
1161 	lvol_bdev->lvs_bdev = lvs_bdev;
1162 
1163 	bdev = &lvol_bdev->bdev;
1164 	bdev->name = lvol->unique_id;
1165 	bdev->product_name = "Logical Volume";
1166 	bdev->blocklen = spdk_bs_get_io_unit_size(lvol->lvol_store->blobstore);
1167 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1168 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1169 	assert((total_size % bdev->blocklen) == 0);
1170 	bdev->blockcnt = total_size / bdev->blocklen;
1171 	bdev->uuid = lvol->uuid;
1172 	bdev->required_alignment = lvs_bdev->bdev->required_alignment;
1173 	bdev->split_on_optimal_io_boundary = true;
1174 	bdev->optimal_io_boundary = spdk_bs_get_cluster_size(lvol->lvol_store->blobstore) / bdev->blocklen;
1175 
1176 	bdev->ctxt = lvol;
1177 	bdev->fn_table = &vbdev_lvol_fn_table;
1178 	bdev->module = &g_lvol_if;
1179 	bdev->phys_blocklen = lvol->lvol_store->bs_dev->phys_blocklen;
1180 
1181 	/* Set default bdev reset waiting time. This value indicates how much
1182 	 * time a reset should wait before forcing a reset down to the underlying
1183 	 * bdev module.
1184 	 * Setting this parameter is mainly to avoid "empty" resets to a shared
1185 	 * bdev that may be used by multiple lvols. */
1186 	bdev->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
1187 
1188 	rc = spdk_bdev_register(bdev);
1189 	if (rc) {
1190 		free(lvol_bdev);
1191 		return rc;
1192 	}
1193 	lvol->bdev = bdev;
1194 
1195 	alias = spdk_sprintf_alloc("%s/%s", lvs_bdev->lvs->name, lvol->name);
1196 	if (alias == NULL) {
1197 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
1198 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1199 						  _create_lvol_disk_unload_cb), lvol);
1200 		return -ENOMEM;
1201 	}
1202 
1203 	rc = spdk_bdev_alias_add(bdev, alias);
1204 	if (rc != 0) {
1205 		SPDK_ERRLOG("Cannot add alias to lvol bdev\n");
1206 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1207 						  _create_lvol_disk_unload_cb), lvol);
1208 	}
1209 	free(alias);
1210 
1211 	return rc;
1212 }
1213 
1214 static void
1215 _vbdev_lvol_create_cb(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1216 {
1217 	struct spdk_lvol_with_handle_req *req = cb_arg;
1218 
1219 	if (lvolerrno < 0) {
1220 		goto end;
1221 	}
1222 
1223 	lvolerrno = _create_lvol_disk(lvol, true);
1224 
1225 end:
1226 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
1227 	free(req);
1228 }
1229 
1230 int
1231 vbdev_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1232 		  bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1233 		  void *cb_arg)
1234 {
1235 	struct spdk_lvol_with_handle_req *req;
1236 	int rc;
1237 
1238 	req = calloc(1, sizeof(*req));
1239 	if (req == NULL) {
1240 		return -ENOMEM;
1241 	}
1242 	req->cb_fn = cb_fn;
1243 	req->cb_arg = cb_arg;
1244 
1245 	rc = spdk_lvol_create(lvs, name, sz, thin_provision, clear_method,
1246 			      _vbdev_lvol_create_cb, req);
1247 	if (rc != 0) {
1248 		free(req);
1249 	}
1250 
1251 	return rc;
1252 }
1253 
1254 void
1255 vbdev_lvol_create_snapshot(struct spdk_lvol *lvol, const char *snapshot_name,
1256 			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1257 {
1258 	struct spdk_lvol_with_handle_req *req;
1259 
1260 	req = calloc(1, sizeof(*req));
1261 	if (req == NULL) {
1262 		cb_fn(cb_arg, NULL, -ENOMEM);
1263 		return;
1264 	}
1265 
1266 	req->cb_fn = cb_fn;
1267 	req->cb_arg = cb_arg;
1268 
1269 	spdk_lvol_create_snapshot(lvol, snapshot_name, _vbdev_lvol_create_cb, req);
1270 }
1271 
1272 void
1273 vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
1274 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1275 {
1276 	struct spdk_lvol_with_handle_req *req;
1277 
1278 	req = calloc(1, sizeof(*req));
1279 	if (req == NULL) {
1280 		cb_fn(cb_arg, NULL, -ENOMEM);
1281 		return;
1282 	}
1283 
1284 	req->cb_fn = cb_fn;
1285 	req->cb_arg = cb_arg;
1286 
1287 	spdk_lvol_create_clone(lvol, clone_name, _vbdev_lvol_create_cb, req);
1288 }
1289 
1290 static void
1291 ignore_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
1292 {
1293 }
1294 
1295 void
1296 vbdev_lvol_create_bdev_clone(const char *esnap_name,
1297 			     struct spdk_lvol_store *lvs, const char *clone_name,
1298 			     spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1299 {
1300 	struct spdk_lvol_with_handle_req *req;
1301 	struct spdk_bdev_desc *desc;
1302 	struct spdk_bdev *bdev;
1303 	char bdev_uuid[SPDK_UUID_STRING_LEN];
1304 	uint64_t sz;
1305 	int rc;
1306 
1307 	if (lvs == NULL) {
1308 		SPDK_ERRLOG("lvol store not specified\n");
1309 		cb_fn(cb_arg, NULL, -EINVAL);
1310 		return;
1311 	}
1312 
1313 	rc = spdk_bdev_open_ext(esnap_name, false, ignore_bdev_event_cb, NULL, &desc);
1314 	if (rc != 0) {
1315 		SPDK_ERRLOG("bdev '%s' could not be opened: error %d\n", esnap_name, rc);
1316 		cb_fn(cb_arg, NULL, rc);
1317 		return;
1318 	}
1319 	bdev = spdk_bdev_desc_get_bdev(desc);
1320 
1321 	rc = spdk_uuid_fmt_lower(bdev_uuid, sizeof(bdev_uuid), spdk_bdev_get_uuid(bdev));
1322 	if (rc != 0) {
1323 		spdk_bdev_close(desc);
1324 		SPDK_ERRLOG("bdev %s: unable to parse UUID\n", esnap_name);
1325 		assert(false);
1326 		cb_fn(cb_arg, NULL, -ENODEV);
1327 		return;
1328 	}
1329 
1330 	req = calloc(1, sizeof(*req));
1331 	if (req == NULL) {
1332 		spdk_bdev_close(desc);
1333 		cb_fn(cb_arg, NULL, -ENOMEM);
1334 		return;
1335 	}
1336 
1337 	req->cb_fn = cb_fn;
1338 	req->cb_arg = cb_arg;
1339 
1340 	sz = spdk_bdev_get_num_blocks(bdev) * spdk_bdev_get_block_size(bdev);
1341 	rc = spdk_lvol_create_esnap_clone(bdev_uuid, sizeof(bdev_uuid), sz, lvs, clone_name,
1342 					  _vbdev_lvol_create_cb, req);
1343 	spdk_bdev_close(desc);
1344 	if (rc != 0) {
1345 		cb_fn(cb_arg, NULL, rc);
1346 		free(req);
1347 	}
1348 }
1349 
1350 static void
1351 _vbdev_lvol_rename_cb(void *cb_arg, int lvolerrno)
1352 {
1353 	struct spdk_lvol_req *req = cb_arg;
1354 
1355 	if (lvolerrno != 0) {
1356 		SPDK_ERRLOG("Renaming lvol failed\n");
1357 	}
1358 
1359 	req->cb_fn(req->cb_arg, lvolerrno);
1360 	free(req);
1361 }
1362 
1363 void
1364 vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
1365 		  spdk_lvol_op_complete cb_fn, void *cb_arg)
1366 {
1367 	struct spdk_lvol_req *req;
1368 	int rc;
1369 
1370 	rc = _vbdev_lvol_change_bdev_alias(lvol, new_lvol_name);
1371 	if (rc != 0) {
1372 		SPDK_ERRLOG("renaming lvol to '%s' does not succeed\n", new_lvol_name);
1373 		cb_fn(cb_arg, rc);
1374 		return;
1375 	}
1376 
1377 	req = calloc(1, sizeof(*req));
1378 	if (req == NULL) {
1379 		cb_fn(cb_arg, -ENOMEM);
1380 		return;
1381 	}
1382 	req->cb_fn = cb_fn;
1383 	req->cb_arg = cb_arg;
1384 
1385 	spdk_lvol_rename(lvol, new_lvol_name, _vbdev_lvol_rename_cb, req);
1386 }
1387 
1388 static void
1389 _vbdev_lvol_resize_cb(void *cb_arg, int lvolerrno)
1390 {
1391 	struct spdk_lvol_req *req = cb_arg;
1392 	struct spdk_lvol *lvol = req->lvol;
1393 	uint64_t total_size;
1394 
1395 	/* change bdev size */
1396 	if (lvolerrno != 0) {
1397 		SPDK_ERRLOG("CB function for bdev lvol %s receive error no: %d.\n", lvol->name, lvolerrno);
1398 		goto finish;
1399 	}
1400 
1401 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1402 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1403 	assert((total_size % lvol->bdev->blocklen) == 0);
1404 
1405 	lvolerrno = spdk_bdev_notify_blockcnt_change(lvol->bdev, total_size / lvol->bdev->blocklen);
1406 	if (lvolerrno != 0) {
1407 		SPDK_ERRLOG("Could not change num blocks for bdev lvol %s with error no: %d.\n",
1408 			    lvol->name, lvolerrno);
1409 	}
1410 
1411 finish:
1412 	req->cb_fn(req->cb_arg, lvolerrno);
1413 	free(req);
1414 }
1415 
1416 void
1417 vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn, void *cb_arg)
1418 {
1419 	struct spdk_lvol_req *req;
1420 
1421 	if (lvol == NULL) {
1422 		SPDK_ERRLOG("lvol does not exist\n");
1423 		cb_fn(cb_arg, -EINVAL);
1424 		return;
1425 	}
1426 
1427 	assert(lvol->bdev != NULL);
1428 
1429 	req = calloc(1, sizeof(*req));
1430 	if (req == NULL) {
1431 		cb_fn(cb_arg, -ENOMEM);
1432 		return;
1433 	}
1434 
1435 	req->cb_fn = cb_fn;
1436 	req->cb_arg = cb_arg;
1437 	req->sz = sz;
1438 	req->lvol = lvol;
1439 
1440 	spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
1441 }
1442 
1443 static void
1444 _vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1445 {
1446 	struct spdk_lvol_req *req = cb_arg;
1447 	struct spdk_lvol *lvol = req->lvol;
1448 
1449 	if (lvolerrno != 0) {
1450 		SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
1451 	}
1452 
1453 	req->cb_fn(req->cb_arg, lvolerrno);
1454 	free(req);
1455 }
1456 
1457 void
1458 vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1459 {
1460 	struct spdk_lvol_req *req;
1461 
1462 	if (lvol == NULL) {
1463 		SPDK_ERRLOG("lvol does not exist\n");
1464 		cb_fn(cb_arg, -EINVAL);
1465 		return;
1466 	}
1467 
1468 	assert(lvol->bdev != NULL);
1469 
1470 	req = calloc(1, sizeof(*req));
1471 	if (req == NULL) {
1472 		cb_fn(cb_arg, -ENOMEM);
1473 		return;
1474 	}
1475 
1476 	req->cb_fn = cb_fn;
1477 	req->cb_arg = cb_arg;
1478 	req->lvol = lvol;
1479 
1480 	spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
1481 }
1482 
1483 static int
1484 vbdev_lvs_init(void)
1485 {
1486 	return 0;
1487 }
1488 
1489 static void vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev);
1490 
1491 static void
1492 vbdev_lvs_fini_start_unload_cb(void *cb_arg, int lvserrno)
1493 {
1494 	struct lvol_store_bdev *lvs_bdev = cb_arg;
1495 	struct lvol_store_bdev *next_lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1496 
1497 	if (lvserrno != 0) {
1498 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
1499 	}
1500 
1501 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1502 	free(lvs_bdev);
1503 
1504 	vbdev_lvs_fini_start_iter(next_lvs_bdev);
1505 }
1506 
1507 static void
1508 vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev)
1509 {
1510 	struct spdk_lvol_store *lvs;
1511 
1512 	while (lvs_bdev != NULL) {
1513 		lvs = lvs_bdev->lvs;
1514 
1515 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1516 			spdk_lvs_unload(lvs, vbdev_lvs_fini_start_unload_cb, lvs_bdev);
1517 			return;
1518 		}
1519 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1520 	}
1521 
1522 	spdk_bdev_module_fini_start_done();
1523 }
1524 
1525 static void
1526 vbdev_lvs_fini_start(void)
1527 {
1528 	g_shutdown_started = true;
1529 	vbdev_lvs_fini_start_iter(vbdev_lvol_store_first());
1530 }
1531 
1532 static int
1533 vbdev_lvs_get_ctx_size(void)
1534 {
1535 	return sizeof(struct vbdev_lvol_io);
1536 }
1537 
1538 static void
1539 _vbdev_lvs_examine_done(struct spdk_lvs_req *req, int lvserrno)
1540 {
1541 	req->cb_fn(req->cb_arg, lvserrno);
1542 }
1543 
1544 static void
1545 _vbdev_lvs_examine_failed(void *cb_arg, int lvserrno)
1546 {
1547 	struct spdk_lvs_req *req = cb_arg;
1548 
1549 	_vbdev_lvs_examine_done(req, req->lvserrno);
1550 }
1551 
1552 static void
1553 _vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1554 {
1555 	struct spdk_lvs_req *req = cb_arg;
1556 	struct spdk_lvol_store *lvs = req->lvol_store;
1557 
1558 	if (lvolerrno != 0) {
1559 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1560 		if (lvolerrno == -ENOMEM) {
1561 			TAILQ_INSERT_TAIL(&lvs->retry_open_lvols, lvol, link);
1562 			return;
1563 		}
1564 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1565 		lvs->lvol_count--;
1566 		free(lvol);
1567 		goto end;
1568 	}
1569 
1570 	if (_create_lvol_disk(lvol, false)) {
1571 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1572 		lvs->lvol_count--;
1573 		goto end;
1574 	}
1575 
1576 	lvs->lvols_opened++;
1577 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1578 
1579 end:
1580 	if (!TAILQ_EMPTY(&lvs->retry_open_lvols)) {
1581 		lvol = TAILQ_FIRST(&lvs->retry_open_lvols);
1582 		TAILQ_REMOVE(&lvs->retry_open_lvols, lvol, link);
1583 		TAILQ_INSERT_HEAD(&lvs->lvols, lvol, link);
1584 		spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, req);
1585 		return;
1586 	}
1587 	if (lvs->lvols_opened >= lvs->lvol_count) {
1588 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1589 		_vbdev_lvs_examine_done(req, 0);
1590 	}
1591 }
1592 
1593 /* Walks a tree of clones that are no longer degraded to create bdevs. */
1594 static int
1595 create_esnap_clone_lvol_disks(void *ctx, struct spdk_lvol *lvol)
1596 {
1597 	struct spdk_bdev *bdev = ctx;
1598 	int rc;
1599 
1600 	rc = _create_lvol_disk(lvol, false);
1601 	if (rc != 0) {
1602 		SPDK_ERRLOG("lvol %s: failed to create bdev after esnap hotplug of %s: %d\n",
1603 			    lvol->unique_id, spdk_bdev_get_name(bdev), rc);
1604 		/* Do not prevent creation of other clones in case of one failure. */
1605 		return 0;
1606 	}
1607 
1608 	return spdk_lvol_iter_immediate_clones(lvol, create_esnap_clone_lvol_disks, ctx);
1609 }
1610 
1611 static void
1612 vbdev_lvs_hotplug(void *ctx, struct spdk_lvol *lvol, int lvolerrno)
1613 {
1614 	struct spdk_bdev *esnap_clone_bdev = ctx;
1615 
1616 	if (lvolerrno != 0) {
1617 		SPDK_ERRLOG("lvol %s: during examine of bdev %s: not creating clone bdev due to "
1618 			    "error %d\n", lvol->unique_id, spdk_bdev_get_name(esnap_clone_bdev),
1619 			    lvolerrno);
1620 		return;
1621 	}
1622 	create_esnap_clone_lvol_disks(esnap_clone_bdev, lvol);
1623 }
1624 
1625 static void
1626 vbdev_lvs_examine_config(struct spdk_bdev *bdev)
1627 {
1628 	char uuid_str[SPDK_UUID_STRING_LEN];
1629 
1630 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
1631 
1632 	if (spdk_lvs_notify_hotplug(uuid_str, sizeof(uuid_str), vbdev_lvs_hotplug, bdev)) {
1633 		SPDK_INFOLOG(vbdev_lvol, "bdev %s: claimed by one or more esnap clones\n",
1634 			     uuid_str);
1635 	}
1636 	spdk_bdev_module_examine_done(&g_lvol_if);
1637 }
1638 
1639 static void
1640 _vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1641 {
1642 	struct lvol_store_bdev *lvs_bdev;
1643 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1644 	struct spdk_lvol *lvol, *tmp;
1645 	struct spdk_lvs_req *ori_req = req->cb_arg;
1646 
1647 	if (lvserrno == -EEXIST) {
1648 		SPDK_INFOLOG(vbdev_lvol,
1649 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1650 			     req->base_bdev->name);
1651 		/* On error blobstore destroys bs_dev itself */
1652 		_vbdev_lvs_examine_done(ori_req, lvserrno);
1653 		goto end;
1654 	} else if (lvserrno != 0) {
1655 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1656 		/* On error blobstore destroys bs_dev itself */
1657 		_vbdev_lvs_examine_done(ori_req, lvserrno);
1658 		goto end;
1659 	}
1660 
1661 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1662 	if (lvserrno != 0) {
1663 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1664 		ori_req->lvserrno = lvserrno;
1665 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, ori_req);
1666 		goto end;
1667 	}
1668 
1669 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1670 	if (!lvs_bdev) {
1671 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1672 		ori_req->lvserrno = lvserrno;
1673 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, ori_req);
1674 		goto end;
1675 	}
1676 
1677 	lvs_bdev->lvs = lvol_store;
1678 	lvs_bdev->bdev = req->base_bdev;
1679 
1680 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1681 
1682 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1683 		     req->base_bdev->name);
1684 
1685 	lvol_store->lvols_opened = 0;
1686 
1687 	ori_req->lvol_store = lvol_store;
1688 
1689 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1690 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1691 		_vbdev_lvs_examine_done(ori_req, 0);
1692 	} else {
1693 		/* Open all lvols */
1694 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1695 			spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, ori_req);
1696 		}
1697 	}
1698 
1699 end:
1700 	free(req);
1701 }
1702 
1703 static void
1704 _vbdev_lvs_examine(struct spdk_bdev *bdev, struct spdk_lvs_req *ori_req,
1705 		   void (*action)(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg))
1706 {
1707 	struct spdk_bs_dev *bs_dev;
1708 	struct spdk_lvs_with_handle_req *req;
1709 	int rc;
1710 
1711 	req = calloc(1, sizeof(*req));
1712 	if (req == NULL) {
1713 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1714 		_vbdev_lvs_examine_done(ori_req, -ENOMEM);
1715 		return;
1716 	}
1717 
1718 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1719 					 NULL, &bs_dev);
1720 	if (rc < 0) {
1721 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1722 		_vbdev_lvs_examine_done(ori_req, rc);
1723 		free(req);
1724 		return;
1725 	}
1726 
1727 	req->base_bdev = bdev;
1728 	req->cb_arg = ori_req;
1729 
1730 	action(bs_dev, _vbdev_lvs_examine_cb, req);
1731 }
1732 
1733 static void
1734 vbdev_lvs_examine_done(void *arg, int lvserrno)
1735 {
1736 	struct spdk_lvs_req *req = arg;
1737 
1738 	spdk_bdev_module_examine_done(&g_lvol_if);
1739 	free(req);
1740 }
1741 
1742 static void
1743 vbdev_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
1744 {
1745 	struct spdk_lvs_opts lvs_opts;
1746 
1747 	spdk_lvs_opts_init(&lvs_opts);
1748 	lvs_opts.esnap_bs_dev_create = vbdev_lvol_esnap_dev_create;
1749 	spdk_lvs_load_ext(bs_dev, &lvs_opts, cb_fn, cb_arg);
1750 }
1751 
1752 static void
1753 vbdev_lvs_examine_disk(struct spdk_bdev *bdev)
1754 {
1755 	struct spdk_lvs_req *req;
1756 
1757 	if (spdk_bdev_get_md_size(bdev) != 0) {
1758 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n which is formatted with metadata",
1759 			     bdev->name);
1760 		spdk_bdev_module_examine_done(&g_lvol_if);
1761 		return;
1762 	}
1763 
1764 	req = calloc(1, sizeof(*req));
1765 	if (req == NULL) {
1766 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1767 		spdk_bdev_module_examine_done(&g_lvol_if);
1768 		return;
1769 	}
1770 
1771 	req->cb_fn = vbdev_lvs_examine_done;
1772 	req->cb_arg = req;
1773 
1774 	_vbdev_lvs_examine(bdev, req, vbdev_lvs_load);
1775 }
1776 
1777 struct spdk_lvol *
1778 vbdev_lvol_get_from_bdev(struct spdk_bdev *bdev)
1779 {
1780 	if (!bdev || bdev->module != &g_lvol_if) {
1781 		return NULL;
1782 	}
1783 
1784 	if (bdev->ctxt == NULL) {
1785 		SPDK_ERRLOG("No lvol ctx assigned to bdev %s\n", bdev->name);
1786 		return NULL;
1787 	}
1788 
1789 	return (struct spdk_lvol *)bdev->ctxt;
1790 }
1791 
1792 /* Begin degraded blobstore device */
1793 
1794 /*
1795  * When an external snapshot is missing, an instance of bs_dev_degraded is used as the blob's
1796  * back_bs_dev. No bdev is registered, so there should be no IO nor requests for channels. The main
1797  * purposes of this device are to prevent blobstore from hitting fatal runtime errors and to
1798  * indicate that the blob is degraded via the is_degraded() callback.
1799  */
1800 
1801 static void
1802 bs_dev_degraded_read(struct spdk_bs_dev *dev, struct spdk_io_channel *channel, void *payload,
1803 		     uint64_t lba, uint32_t lba_count, struct spdk_bs_dev_cb_args *cb_args)
1804 {
1805 	assert(false);
1806 	cb_args->cb_fn(cb_args->channel, cb_args->cb_arg, -EIO);
1807 }
1808 
1809 static void
1810 bs_dev_degraded_readv(struct spdk_bs_dev *dev, struct spdk_io_channel *channel,
1811 		      struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
1812 		      struct spdk_bs_dev_cb_args *cb_args)
1813 {
1814 	assert(false);
1815 	cb_args->cb_fn(cb_args->channel, cb_args->cb_arg, -EIO);
1816 }
1817 
1818 static void
1819 bs_dev_degraded_readv_ext(struct spdk_bs_dev *dev, struct spdk_io_channel *channel,
1820 			  struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
1821 			  struct spdk_bs_dev_cb_args *cb_args,
1822 			  struct spdk_blob_ext_io_opts *io_opts)
1823 {
1824 	assert(false);
1825 	cb_args->cb_fn(cb_args->channel, cb_args->cb_arg, -EIO);
1826 }
1827 
1828 static bool
1829 bs_dev_degraded_is_zeroes(struct spdk_bs_dev *dev, uint64_t lba, uint64_t lba_count)
1830 {
1831 	assert(false);
1832 	return false;
1833 }
1834 
1835 static bool
1836 bs_dev_degraded_is_range_valid(struct spdk_bs_dev *dev, uint64_t lba, uint64_t lba_count)
1837 {
1838 	assert(false);
1839 	return false;
1840 }
1841 
1842 static struct spdk_io_channel *
1843 bs_dev_degraded_create_channel(struct spdk_bs_dev *bs_dev)
1844 {
1845 	assert(false);
1846 	return NULL;
1847 }
1848 
1849 static void
1850 bs_dev_degraded_destroy_channel(struct spdk_bs_dev *bs_dev, struct spdk_io_channel *channel)
1851 {
1852 	assert(false);
1853 }
1854 
1855 static void
1856 bs_dev_degraded_destroy(struct spdk_bs_dev *bs_dev)
1857 {
1858 }
1859 
1860 static bool
1861 bs_dev_degraded_is_degraded(struct spdk_bs_dev *bs_dev)
1862 {
1863 	return true;
1864 }
1865 
1866 static struct spdk_bs_dev bs_dev_degraded = {
1867 	.create_channel = bs_dev_degraded_create_channel,
1868 	.destroy_channel = bs_dev_degraded_destroy_channel,
1869 	.destroy = bs_dev_degraded_destroy,
1870 	.read = bs_dev_degraded_read,
1871 	.readv = bs_dev_degraded_readv,
1872 	.readv_ext = bs_dev_degraded_readv_ext,
1873 	.is_zeroes = bs_dev_degraded_is_zeroes,
1874 	.is_range_valid = bs_dev_degraded_is_range_valid,
1875 	.is_degraded = bs_dev_degraded_is_degraded,
1876 	/* Make the device as large as possible without risk of uint64 overflow. */
1877 	.blockcnt = UINT64_MAX / 512,
1878 	/* Prevent divide by zero errors calculating LBAs that will never be read. */
1879 	.blocklen = 512,
1880 };
1881 
1882 /* End degraded blobstore device */
1883 
1884 /* Begin external snapshot support */
1885 
1886 static void
1887 vbdev_lvol_esnap_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1888 			       void *event_ctx)
1889 {
1890 	SPDK_NOTICELOG("bdev name (%s) received unsupported event type %d\n",
1891 		       spdk_bdev_get_name(bdev), type);
1892 }
1893 
1894 int
1895 vbdev_lvol_esnap_dev_create(void *bs_ctx, void *blob_ctx, struct spdk_blob *blob,
1896 			    const void *esnap_id, uint32_t id_len,
1897 			    struct spdk_bs_dev **_bs_dev)
1898 {
1899 	struct spdk_lvol_store	*lvs = bs_ctx;
1900 	struct spdk_lvol	*lvol = blob_ctx;
1901 	struct spdk_bs_dev	*bs_dev = NULL;
1902 	struct spdk_uuid	uuid;
1903 	int			rc;
1904 	char			uuid_str[SPDK_UUID_STRING_LEN] = { 0 };
1905 
1906 	if (esnap_id == NULL) {
1907 		SPDK_ERRLOG("lvol %s: NULL esnap ID\n", lvol->unique_id);
1908 		return -EINVAL;
1909 	}
1910 
1911 	/* Guard against arbitrary names and unterminated UUID strings */
1912 	if (id_len != SPDK_UUID_STRING_LEN) {
1913 		SPDK_ERRLOG("lvol %s: Invalid esnap ID length (%u)\n", lvol->unique_id, id_len);
1914 		return -EINVAL;
1915 	}
1916 
1917 	if (spdk_uuid_parse(&uuid, esnap_id)) {
1918 		SPDK_ERRLOG("lvol %s: Invalid esnap ID: not a UUID\n", lvol->unique_id);
1919 		return -EINVAL;
1920 	}
1921 
1922 	/* Format the UUID the same as it is in the bdev names tree. */
1923 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &uuid);
1924 	if (strcmp(uuid_str, esnap_id) != 0) {
1925 		SPDK_WARNLOG("lvol %s: esnap_id '%*s' does not match parsed uuid '%s'\n",
1926 			     lvol->unique_id, SPDK_UUID_STRING_LEN, (const char *)esnap_id,
1927 			     uuid_str);
1928 		assert(false);
1929 	}
1930 
1931 	rc = spdk_bdev_create_bs_dev(uuid_str, false, NULL, 0,
1932 				     vbdev_lvol_esnap_bdev_event_cb, NULL, &bs_dev);
1933 	if (rc != 0) {
1934 		goto fail;
1935 	}
1936 
1937 	rc = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
1938 	if (rc != 0) {
1939 		SPDK_ERRLOG("lvol %s: unable to claim esnap bdev '%s': %d\n", lvol->unique_id,
1940 			    uuid_str, rc);
1941 		bs_dev->destroy(bs_dev);
1942 		goto fail;
1943 	}
1944 
1945 	*_bs_dev = bs_dev;
1946 	return 0;
1947 
1948 fail:
1949 	/* Unable to open or claim the bdev. This lvol is degraded. */
1950 	bs_dev = &bs_dev_degraded;
1951 	SPDK_NOTICELOG("lvol %s: bdev %s not available: lvol is degraded\n", lvol->unique_id,
1952 		       uuid_str);
1953 
1954 	/*
1955 	 * Be sure not to call spdk_lvs_missing_add() on an lvol that is already degraded. This can
1956 	 * lead to a cycle in the degraded_lvols tailq.
1957 	 */
1958 	if (lvol->degraded_set == NULL) {
1959 		rc = spdk_lvs_esnap_missing_add(lvs, lvol, uuid_str, sizeof(uuid_str));
1960 		if (rc != 0) {
1961 			SPDK_NOTICELOG("lvol %s: unable to register missing esnap device %s: "
1962 				       "it will not be hotplugged if added later\n",
1963 				       lvol->unique_id, uuid_str);
1964 		}
1965 	}
1966 
1967 	*_bs_dev = bs_dev;
1968 	return 0;
1969 }
1970 
1971 /* End external snapshot support */
1972 
1973 static void
1974 _vbdev_lvol_shallow_copy_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1975 		void *event_ctx)
1976 {
1977 }
1978 
1979 static void
1980 _vbdev_lvol_shallow_copy_cb(void *cb_arg, int lvolerrno)
1981 {
1982 	struct spdk_lvol_copy_req *req = cb_arg;
1983 	struct spdk_lvol *lvol = req->lvol;
1984 
1985 	if (lvolerrno != 0) {
1986 		SPDK_ERRLOG("Could not make a shallow copy of lvol %s due to error: %d\n",
1987 			    lvol->name, lvolerrno);
1988 	}
1989 
1990 	req->ext_dev->destroy(req->ext_dev);
1991 	req->cb_fn(req->cb_arg, lvolerrno);
1992 	free(req);
1993 }
1994 
1995 int
1996 vbdev_lvol_shallow_copy(struct spdk_lvol *lvol, const char *bdev_name,
1997 			spdk_blob_shallow_copy_status status_cb_fn, void *status_cb_arg,
1998 			spdk_lvol_op_complete cb_fn, void *cb_arg)
1999 {
2000 	struct spdk_bs_dev *ext_dev;
2001 	struct spdk_lvol_copy_req *req;
2002 	int rc;
2003 
2004 	if (lvol == NULL) {
2005 		SPDK_ERRLOG("lvol must not be NULL\n");
2006 		return -EINVAL;
2007 	}
2008 
2009 	if (bdev_name == NULL) {
2010 		SPDK_ERRLOG("lvol %s, bdev name must not be NULL\n", lvol->name);
2011 		return -EINVAL;
2012 	}
2013 
2014 	assert(lvol->bdev != NULL);
2015 
2016 	req = calloc(1, sizeof(*req));
2017 	if (req == NULL) {
2018 		SPDK_ERRLOG("lvol %s, cannot alloc memory for lvol copy request\n", lvol->name);
2019 		return -ENOMEM;
2020 	}
2021 
2022 	rc = spdk_bdev_create_bs_dev_ext(bdev_name, _vbdev_lvol_shallow_copy_base_bdev_event_cb,
2023 					 NULL, &ext_dev);
2024 	if (rc < 0) {
2025 		SPDK_ERRLOG("lvol %s, cannot create blobstore block device from bdev %s\n", lvol->name, bdev_name);
2026 		free(req);
2027 		return rc;
2028 	}
2029 
2030 	rc = spdk_bs_bdev_claim(ext_dev, &g_lvol_if);
2031 	if (rc != 0) {
2032 		SPDK_ERRLOG("lvol %s, unable to claim bdev %s, error %d\n", lvol->name, bdev_name, rc);
2033 		ext_dev->destroy(ext_dev);
2034 		free(req);
2035 		return rc;
2036 	}
2037 
2038 	req->cb_fn = cb_fn;
2039 	req->cb_arg = cb_arg;
2040 	req->lvol = lvol;
2041 	req->ext_dev = ext_dev;
2042 
2043 	rc = spdk_lvol_shallow_copy(lvol, ext_dev, status_cb_fn, status_cb_arg, _vbdev_lvol_shallow_copy_cb,
2044 				    req);
2045 
2046 	if (rc < 0) {
2047 		ext_dev->destroy(ext_dev);
2048 		free(req);
2049 	}
2050 
2051 	return rc;
2052 }
2053 
2054 void
2055 vbdev_lvol_set_external_parent(struct spdk_lvol *lvol, const char *esnap_name,
2056 			       spdk_lvol_op_complete cb_fn, void *cb_arg)
2057 {
2058 	struct spdk_bdev_desc *desc;
2059 	struct spdk_bdev *bdev;
2060 	char bdev_uuid[SPDK_UUID_STRING_LEN];
2061 	int rc;
2062 
2063 	rc = spdk_bdev_open_ext(esnap_name, false, ignore_bdev_event_cb, NULL, &desc);
2064 	if (rc != 0) {
2065 		SPDK_ERRLOG("bdev '%s' could not be opened: error %d\n", esnap_name, rc);
2066 		cb_fn(cb_arg, -ENODEV);
2067 		return;
2068 	}
2069 	bdev = spdk_bdev_desc_get_bdev(desc);
2070 
2071 	rc = spdk_uuid_fmt_lower(bdev_uuid, sizeof(bdev_uuid), spdk_bdev_get_uuid(bdev));
2072 	if (rc != 0) {
2073 		spdk_bdev_close(desc);
2074 		SPDK_ERRLOG("bdev %s: unable to parse UUID\n", esnap_name);
2075 		assert(false);
2076 		cb_fn(cb_arg, -ENODEV);
2077 		return;
2078 	}
2079 
2080 	/*
2081 	 * If lvol store is not loaded from disk, and so vbdev_lvs_load is not called, these
2082 	 * assignments are necessary to let vbdev_lvol_esnap_dev_create be called.
2083 	 */
2084 	lvol->lvol_store->load_esnaps = true;
2085 	lvol->lvol_store->esnap_bs_dev_create = vbdev_lvol_esnap_dev_create;
2086 
2087 	spdk_lvol_set_external_parent(lvol, bdev_uuid, sizeof(bdev_uuid), cb_fn, cb_arg);
2088 
2089 	spdk_bdev_close(desc);
2090 }
2091 
2092 SPDK_LOG_REGISTER_COMPONENT(vbdev_lvol)
2093