xref: /spdk/module/bdev/lvol/vbdev_lvol.c (revision b217432fa83bdca50a1723d18a32b9fac1a7d8f9)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/rpc.h"
8 #include "spdk/bdev_module.h"
9 #include "spdk/log.h"
10 #include "spdk/string.h"
11 #include "spdk/uuid.h"
12 #include "spdk/blob.h"
13 
14 #include "vbdev_lvol.h"
15 
16 struct vbdev_lvol_io {
17 	struct spdk_blob_ext_io_opts ext_io_opts;
18 };
19 
20 static TAILQ_HEAD(, lvol_store_bdev) g_spdk_lvol_pairs = TAILQ_HEAD_INITIALIZER(
21 			g_spdk_lvol_pairs);
22 
23 static int vbdev_lvs_init(void);
24 static void vbdev_lvs_fini_start(void);
25 static int vbdev_lvs_get_ctx_size(void);
26 static void vbdev_lvs_examine_config(struct spdk_bdev *bdev);
27 static void vbdev_lvs_examine_disk(struct spdk_bdev *bdev);
28 static bool g_shutdown_started = false;
29 
30 struct spdk_bdev_module g_lvol_if = {
31 	.name = "lvol",
32 	.module_init = vbdev_lvs_init,
33 	.fini_start = vbdev_lvs_fini_start,
34 	.async_fini_start = true,
35 	.examine_config = vbdev_lvs_examine_config,
36 	.examine_disk = vbdev_lvs_examine_disk,
37 	.get_ctx_size = vbdev_lvs_get_ctx_size,
38 
39 };
40 
41 SPDK_BDEV_MODULE_REGISTER(lvol, &g_lvol_if)
42 
43 static void _vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg);
44 
45 struct lvol_store_bdev *
46 vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig)
47 {
48 	struct spdk_lvol_store *lvs = NULL;
49 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
50 
51 	while (lvs_bdev != NULL) {
52 		lvs = lvs_bdev->lvs;
53 		if (lvs == lvs_orig) {
54 			if (lvs_bdev->removal_in_progress) {
55 				/* We do not allow access to lvs that are being unloaded or
56 				 * destroyed */
57 				SPDK_DEBUGLOG(vbdev_lvol, "lvs %s: removal in progress\n",
58 					      lvs_orig->name);
59 				return NULL;
60 			} else {
61 				return lvs_bdev;
62 			}
63 		}
64 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
65 	}
66 
67 	return NULL;
68 }
69 
70 static int
71 _vbdev_lvol_change_bdev_alias(struct spdk_lvol *lvol, const char *new_lvol_name)
72 {
73 	struct spdk_bdev_alias *tmp;
74 	char *old_alias;
75 	char *alias;
76 	int rc;
77 	int alias_number = 0;
78 
79 	/* bdev representing lvols have only one alias,
80 	 * while we changed lvs name earlier, we have to iterate alias list to get one,
81 	 * and check if there is only one alias */
82 
83 	TAILQ_FOREACH(tmp, spdk_bdev_get_aliases(lvol->bdev), tailq) {
84 		if (++alias_number > 1) {
85 			SPDK_ERRLOG("There is more than 1 alias in bdev %s\n", lvol->bdev->name);
86 			return -EINVAL;
87 		}
88 
89 		old_alias = tmp->alias.name;
90 	}
91 
92 	if (alias_number == 0) {
93 		SPDK_ERRLOG("There are no aliases in bdev %s\n", lvol->bdev->name);
94 		return -EINVAL;
95 	}
96 
97 	alias = spdk_sprintf_alloc("%s/%s", lvol->lvol_store->name, new_lvol_name);
98 	if (alias == NULL) {
99 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
100 		return -ENOMEM;
101 	}
102 
103 	rc = spdk_bdev_alias_add(lvol->bdev, alias);
104 	if (rc != 0) {
105 		SPDK_ERRLOG("cannot add alias '%s'\n", alias);
106 		free(alias);
107 		return rc;
108 	}
109 	free(alias);
110 
111 	rc = spdk_bdev_alias_del(lvol->bdev, old_alias);
112 	if (rc != 0) {
113 		SPDK_ERRLOG("cannot remove alias '%s'\n", old_alias);
114 		return rc;
115 	}
116 
117 	return 0;
118 }
119 
120 static struct lvol_store_bdev *
121 vbdev_get_lvs_bdev_by_bdev(struct spdk_bdev *bdev_orig)
122 {
123 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
124 
125 	while (lvs_bdev != NULL) {
126 		if (lvs_bdev->bdev == bdev_orig) {
127 			if (lvs_bdev->removal_in_progress) {
128 				/* We do not allow access to lvs that are being unloaded or
129 				 * destroyed */
130 				SPDK_DEBUGLOG(vbdev_lvol, "lvs %s: removal in progress\n",
131 					      lvs_bdev->lvs->name);
132 				return NULL;
133 			} else {
134 				return lvs_bdev;
135 			}
136 		}
137 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
138 	}
139 
140 	return NULL;
141 }
142 
143 static void
144 vbdev_lvs_hotremove_cb(struct spdk_bdev *bdev)
145 {
146 	struct lvol_store_bdev *lvs_bdev;
147 
148 	lvs_bdev = vbdev_get_lvs_bdev_by_bdev(bdev);
149 	if (lvs_bdev != NULL) {
150 		SPDK_NOTICELOG("bdev %s being removed: closing lvstore %s\n",
151 			       spdk_bdev_get_name(bdev), lvs_bdev->lvs->name);
152 		vbdev_lvs_unload(lvs_bdev->lvs, NULL, NULL);
153 	}
154 }
155 
156 static void
157 vbdev_lvs_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
158 			     void *event_ctx)
159 {
160 	switch (type) {
161 	case SPDK_BDEV_EVENT_REMOVE:
162 		vbdev_lvs_hotremove_cb(bdev);
163 		break;
164 	default:
165 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
166 		break;
167 	}
168 }
169 
170 static void
171 _vbdev_lvs_create_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
172 {
173 	struct spdk_lvs_with_handle_req *req = cb_arg;
174 	struct lvol_store_bdev *lvs_bdev;
175 	struct spdk_bdev *bdev = req->base_bdev;
176 	struct spdk_bs_dev *bs_dev = req->bs_dev;
177 
178 	if (lvserrno != 0) {
179 		assert(lvs == NULL);
180 		SPDK_ERRLOG("Cannot create lvol store bdev\n");
181 		goto end;
182 	}
183 
184 	lvserrno = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
185 	if (lvserrno != 0) {
186 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
187 		req->bs_dev->destroy(req->bs_dev);
188 		goto end;
189 	}
190 
191 	assert(lvs != NULL);
192 
193 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
194 	if (!lvs_bdev) {
195 		lvserrno = -ENOMEM;
196 		goto end;
197 	}
198 	lvs_bdev->lvs = lvs;
199 	lvs_bdev->bdev = bdev;
200 	lvs_bdev->req = NULL;
201 
202 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
203 	SPDK_INFOLOG(vbdev_lvol, "Lvol store bdev inserted\n");
204 
205 end:
206 	req->cb_fn(req->cb_arg, lvs, lvserrno);
207 	free(req);
208 
209 	return;
210 }
211 
212 int
213 vbdev_lvs_create(const char *base_bdev_name, const char *name, uint32_t cluster_sz,
214 		 enum lvs_clear_method clear_method, uint32_t num_md_pages_per_cluster_ratio,
215 		 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
216 {
217 	struct spdk_bs_dev *bs_dev;
218 	struct spdk_lvs_with_handle_req *lvs_req;
219 	struct spdk_lvs_opts opts;
220 	int rc;
221 	int len;
222 
223 	if (base_bdev_name == NULL) {
224 		SPDK_ERRLOG("missing base_bdev_name param\n");
225 		return -EINVAL;
226 	}
227 
228 	spdk_lvs_opts_init(&opts);
229 	if (cluster_sz != 0) {
230 		opts.cluster_sz = cluster_sz;
231 	}
232 
233 	if (clear_method != 0) {
234 		opts.clear_method = clear_method;
235 	}
236 
237 	if (num_md_pages_per_cluster_ratio != 0) {
238 		opts.num_md_pages_per_cluster_ratio = num_md_pages_per_cluster_ratio;
239 	}
240 
241 	if (name == NULL) {
242 		SPDK_ERRLOG("missing name param\n");
243 		return -EINVAL;
244 	}
245 
246 	len = strnlen(name, SPDK_LVS_NAME_MAX);
247 
248 	if (len == 0 || len == SPDK_LVS_NAME_MAX) {
249 		SPDK_ERRLOG("name must be between 1 and %d characters\n", SPDK_LVS_NAME_MAX - 1);
250 		return -EINVAL;
251 	}
252 	snprintf(opts.name, sizeof(opts.name), "%s", name);
253 	opts.esnap_bs_dev_create = vbdev_lvol_esnap_dev_create;
254 
255 	lvs_req = calloc(1, sizeof(*lvs_req));
256 	if (!lvs_req) {
257 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
258 		return -ENOMEM;
259 	}
260 
261 	rc = spdk_bdev_create_bs_dev_ext(base_bdev_name, vbdev_lvs_base_bdev_event_cb,
262 					 NULL, &bs_dev);
263 	if (rc < 0) {
264 		SPDK_ERRLOG("Cannot create blobstore device\n");
265 		free(lvs_req);
266 		return rc;
267 	}
268 
269 	lvs_req->bs_dev = bs_dev;
270 	lvs_req->base_bdev = bs_dev->get_base_bdev(bs_dev);
271 	lvs_req->cb_fn = cb_fn;
272 	lvs_req->cb_arg = cb_arg;
273 
274 	rc = spdk_lvs_init(bs_dev, &opts, _vbdev_lvs_create_cb, lvs_req);
275 	if (rc < 0) {
276 		free(lvs_req);
277 		bs_dev->destroy(bs_dev);
278 		return rc;
279 	}
280 
281 	return 0;
282 }
283 
284 static void
285 _vbdev_lvs_rename_cb(void *cb_arg, int lvserrno)
286 {
287 	struct spdk_lvs_req *req = cb_arg;
288 	struct spdk_lvol *tmp;
289 
290 	if (lvserrno != 0) {
291 		SPDK_INFOLOG(vbdev_lvol, "Lvol store rename failed\n");
292 	} else {
293 		TAILQ_FOREACH(tmp, &req->lvol_store->lvols, link) {
294 			/* We have to pass current lvol name, since only lvs name changed */
295 			_vbdev_lvol_change_bdev_alias(tmp, tmp->name);
296 		}
297 	}
298 
299 	req->cb_fn(req->cb_arg, lvserrno);
300 	free(req);
301 }
302 
303 void
304 vbdev_lvs_rename(struct spdk_lvol_store *lvs, const char *new_lvs_name,
305 		 spdk_lvs_op_complete cb_fn, void *cb_arg)
306 {
307 	struct lvol_store_bdev *lvs_bdev;
308 
309 	struct spdk_lvs_req *req;
310 
311 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
312 	if (!lvs_bdev) {
313 		SPDK_ERRLOG("No such lvol store found\n");
314 		cb_fn(cb_arg, -ENODEV);
315 		return;
316 	}
317 
318 	req = calloc(1, sizeof(*req));
319 	if (!req) {
320 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
321 		cb_fn(cb_arg, -ENOMEM);
322 		return;
323 	}
324 	req->cb_fn = cb_fn;
325 	req->cb_arg = cb_arg;
326 	req->lvol_store = lvs;
327 
328 	spdk_lvs_rename(lvs, new_lvs_name, _vbdev_lvs_rename_cb, req);
329 }
330 
331 static void
332 _vbdev_lvs_remove_cb(void *cb_arg, int lvserrno)
333 {
334 	struct lvol_store_bdev *lvs_bdev = cb_arg;
335 	struct spdk_lvs_req *req = lvs_bdev->req;
336 
337 	if (lvserrno != 0) {
338 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
339 	}
340 
341 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
342 	free(lvs_bdev);
343 
344 	if (req->cb_fn != NULL) {
345 		req->cb_fn(req->cb_arg, lvserrno);
346 	}
347 	free(req);
348 }
349 
350 static void
351 _vbdev_lvs_remove_lvol_cb(void *cb_arg, int lvolerrno)
352 {
353 	struct lvol_store_bdev *lvs_bdev = cb_arg;
354 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
355 	struct spdk_lvol *lvol;
356 
357 	if (lvolerrno != 0) {
358 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol removed with errno %d\n", lvolerrno);
359 	}
360 
361 	if (TAILQ_EMPTY(&lvs->lvols)) {
362 		spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
363 		return;
364 	}
365 
366 	lvol = TAILQ_FIRST(&lvs->lvols);
367 	while (lvol != NULL) {
368 		if (spdk_lvol_deletable(lvol)) {
369 			_vbdev_lvol_destroy(lvol, _vbdev_lvs_remove_lvol_cb, lvs_bdev);
370 			return;
371 		}
372 		lvol = TAILQ_NEXT(lvol, link);
373 	}
374 
375 	/* If no lvol is deletable, that means there is circular dependency. */
376 	SPDK_ERRLOG("Lvols left in lvs, but unable to delete.\n");
377 	assert(false);
378 }
379 
380 static bool
381 _vbdev_lvs_are_lvols_closed(struct spdk_lvol_store *lvs)
382 {
383 	struct spdk_lvol *lvol;
384 
385 	TAILQ_FOREACH(lvol, &lvs->lvols, link) {
386 		if (lvol->ref_count != 0) {
387 			return false;
388 		}
389 	}
390 	return true;
391 }
392 
393 static void
394 _vbdev_lvs_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
395 {
396 	struct lvol_store_bdev *lvs_bdev = cb_arg;
397 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
398 
399 	if (bdeverrno != 0) {
400 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
401 	}
402 
403 	/* Lvol store can be unloaded once all lvols are closed. */
404 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
405 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
406 	}
407 }
408 
409 static void
410 _vbdev_lvs_remove(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg,
411 		  bool destroy)
412 {
413 	struct spdk_lvs_req *req;
414 	struct lvol_store_bdev *lvs_bdev;
415 	struct spdk_lvol *lvol, *tmp;
416 
417 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
418 	if (!lvs_bdev) {
419 		SPDK_ERRLOG("No such lvol store found\n");
420 		if (cb_fn != NULL) {
421 			cb_fn(cb_arg, -ENODEV);
422 		}
423 		return;
424 	}
425 
426 	req = calloc(1, sizeof(*req));
427 	if (!req) {
428 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
429 		if (cb_fn != NULL) {
430 			cb_fn(cb_arg, -ENOMEM);
431 		}
432 		return;
433 	}
434 
435 	lvs_bdev->removal_in_progress = true;
436 
437 	req->cb_fn = cb_fn;
438 	req->cb_arg = cb_arg;
439 	lvs_bdev->req = req;
440 
441 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
442 		if (destroy) {
443 			spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
444 			return;
445 		}
446 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
447 		return;
448 	}
449 	if (destroy) {
450 		_vbdev_lvs_remove_lvol_cb(lvs_bdev, 0);
451 		return;
452 	}
453 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
454 		if (lvol->bdev == NULL) {
455 			spdk_lvol_close(lvol, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
456 			continue;
457 		}
458 		spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
459 	}
460 }
461 
462 void
463 vbdev_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
464 {
465 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, false);
466 }
467 
468 void
469 vbdev_lvs_destruct(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
470 {
471 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, true);
472 }
473 
474 struct lvol_store_bdev *
475 vbdev_lvol_store_first(void)
476 {
477 	struct lvol_store_bdev *lvs_bdev;
478 
479 	lvs_bdev = TAILQ_FIRST(&g_spdk_lvol_pairs);
480 	if (lvs_bdev) {
481 		SPDK_INFOLOG(vbdev_lvol, "Starting lvolstore iteration at %p\n", lvs_bdev->lvs);
482 	}
483 
484 	return lvs_bdev;
485 }
486 
487 struct lvol_store_bdev *
488 vbdev_lvol_store_next(struct lvol_store_bdev *prev)
489 {
490 	struct lvol_store_bdev *lvs_bdev;
491 
492 	if (prev == NULL) {
493 		SPDK_ERRLOG("prev argument cannot be NULL\n");
494 		return NULL;
495 	}
496 
497 	lvs_bdev = TAILQ_NEXT(prev, lvol_stores);
498 	if (lvs_bdev) {
499 		SPDK_INFOLOG(vbdev_lvol, "Continuing lvolstore iteration at %p\n", lvs_bdev->lvs);
500 	}
501 
502 	return lvs_bdev;
503 }
504 
505 static struct spdk_lvol_store *
506 _vbdev_get_lvol_store_by_uuid(const struct spdk_uuid *uuid)
507 {
508 	struct spdk_lvol_store *lvs = NULL;
509 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
510 
511 	while (lvs_bdev != NULL) {
512 		lvs = lvs_bdev->lvs;
513 		if (spdk_uuid_compare(&lvs->uuid, uuid) == 0) {
514 			return lvs;
515 		}
516 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
517 	}
518 	return NULL;
519 }
520 
521 struct spdk_lvol_store *
522 vbdev_get_lvol_store_by_uuid(const char *uuid_str)
523 {
524 	struct spdk_uuid uuid;
525 
526 	if (spdk_uuid_parse(&uuid, uuid_str)) {
527 		return NULL;
528 	}
529 
530 	return _vbdev_get_lvol_store_by_uuid(&uuid);
531 }
532 
533 struct spdk_lvol_store *
534 vbdev_get_lvol_store_by_name(const char *name)
535 {
536 	struct spdk_lvol_store *lvs = NULL;
537 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
538 
539 	while (lvs_bdev != NULL) {
540 		lvs = lvs_bdev->lvs;
541 		if (strncmp(lvs->name, name, sizeof(lvs->name)) == 0) {
542 			return lvs;
543 		}
544 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
545 	}
546 	return NULL;
547 }
548 
549 struct vbdev_lvol_destroy_ctx {
550 	struct spdk_lvol *lvol;
551 	spdk_lvol_op_complete cb_fn;
552 	void *cb_arg;
553 };
554 
555 static void
556 _vbdev_lvol_unregister_unload_lvs(void *cb_arg, int lvserrno)
557 {
558 	struct lvol_bdev *lvol_bdev = cb_arg;
559 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
560 
561 	if (lvserrno != 0) {
562 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
563 	}
564 
565 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
566 	free(lvs_bdev);
567 
568 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvserrno);
569 	free(lvol_bdev);
570 }
571 
572 static void
573 _vbdev_lvol_unregister_cb(void *ctx, int lvolerrno)
574 {
575 	struct lvol_bdev *lvol_bdev = ctx;
576 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
577 
578 	if (g_shutdown_started && _vbdev_lvs_are_lvols_closed(lvs_bdev->lvs)) {
579 		spdk_lvs_unload(lvs_bdev->lvs, _vbdev_lvol_unregister_unload_lvs, lvol_bdev);
580 		return;
581 	}
582 
583 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvolerrno);
584 	free(lvol_bdev);
585 }
586 
587 static int
588 vbdev_lvol_unregister(void *ctx)
589 {
590 	struct spdk_lvol *lvol = ctx;
591 	struct lvol_bdev *lvol_bdev;
592 
593 	assert(lvol != NULL);
594 	lvol_bdev = SPDK_CONTAINEROF(lvol->bdev, struct lvol_bdev, bdev);
595 
596 	spdk_bdev_alias_del_all(lvol->bdev);
597 	spdk_lvol_close(lvol, _vbdev_lvol_unregister_cb, lvol_bdev);
598 
599 	/* return 1 to indicate we have an operation that must finish asynchronously before the
600 	 *  lvol is closed
601 	 */
602 	return 1;
603 }
604 
605 static void
606 _vbdev_lvol_destroy_cb(void *cb_arg, int bdeverrno)
607 {
608 	struct vbdev_lvol_destroy_ctx *ctx = cb_arg;
609 	struct spdk_lvol *lvol = ctx->lvol;
610 
611 	if (bdeverrno < 0) {
612 		SPDK_INFOLOG(vbdev_lvol, "Could not unregister bdev during lvol (%s) destroy\n",
613 			     lvol->unique_id);
614 		ctx->cb_fn(ctx->cb_arg, bdeverrno);
615 		free(ctx);
616 		return;
617 	}
618 
619 	spdk_lvol_destroy(lvol, ctx->cb_fn, ctx->cb_arg);
620 	free(ctx);
621 }
622 
623 static void
624 _vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
625 {
626 	struct vbdev_lvol_destroy_ctx *ctx;
627 	size_t count;
628 
629 	assert(lvol != NULL);
630 	assert(cb_fn != NULL);
631 
632 	/* Callers other than _vbdev_lvs_remove() must ensure the lvstore is not being removed. */
633 	assert(cb_fn == _vbdev_lvs_remove_lvol_cb ||
634 	       vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store) != NULL);
635 
636 	/* Check if it is possible to delete lvol */
637 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
638 	if (count > 1) {
639 		/* throw an error */
640 		SPDK_ERRLOG("Cannot delete lvol\n");
641 		cb_fn(cb_arg, -EPERM);
642 		return;
643 	}
644 
645 	ctx = calloc(1, sizeof(*ctx));
646 	if (!ctx) {
647 		cb_fn(cb_arg, -ENOMEM);
648 		return;
649 	}
650 
651 	ctx->lvol = lvol;
652 	ctx->cb_fn = cb_fn;
653 	ctx->cb_arg = cb_arg;
654 
655 	if (spdk_lvol_is_degraded(lvol)) {
656 		spdk_lvol_close(lvol, _vbdev_lvol_destroy_cb, ctx);
657 		return;
658 	}
659 
660 	spdk_bdev_unregister(lvol->bdev, _vbdev_lvol_destroy_cb, ctx);
661 }
662 
663 void
664 vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
665 {
666 	struct lvol_store_bdev *lvs_bdev;
667 
668 	/*
669 	 * During destruction of an lvolstore, _vbdev_lvs_unload() iterates through lvols until they
670 	 * are all deleted. There may be some IO required
671 	 */
672 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
673 	if (lvs_bdev == NULL) {
674 		SPDK_DEBUGLOG(vbdev_lvol, "lvol %s: lvolstore is being removed\n",
675 			      lvol->unique_id);
676 		cb_fn(cb_arg, -ENODEV);
677 		return;
678 	}
679 
680 	_vbdev_lvol_destroy(lvol, cb_fn, cb_arg);
681 }
682 
683 static char *
684 vbdev_lvol_find_name(struct spdk_lvol *lvol, spdk_blob_id blob_id)
685 {
686 	struct spdk_lvol_store *lvs;
687 	struct spdk_lvol *_lvol;
688 
689 	assert(lvol != NULL);
690 
691 	lvs = lvol->lvol_store;
692 
693 	assert(lvs);
694 
695 	TAILQ_FOREACH(_lvol, &lvs->lvols, link) {
696 		if (_lvol->blob_id == blob_id) {
697 			return _lvol->name;
698 		}
699 	}
700 
701 	return NULL;
702 }
703 
704 static int
705 vbdev_lvol_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
706 {
707 	struct spdk_lvol *lvol = ctx;
708 	struct lvol_store_bdev *lvs_bdev;
709 	struct spdk_bdev *bdev;
710 	struct spdk_blob *blob;
711 	spdk_blob_id *ids = NULL;
712 	size_t count, i;
713 	char *name;
714 	int rc = 0;
715 
716 	spdk_json_write_named_object_begin(w, "lvol");
717 
718 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
719 	if (!lvs_bdev) {
720 		SPDK_ERRLOG("No such lvol store found\n");
721 		rc = -ENODEV;
722 		goto end;
723 	}
724 
725 	bdev = lvs_bdev->bdev;
726 
727 	spdk_json_write_named_uuid(w, "lvol_store_uuid", &lvol->lvol_store->uuid);
728 
729 	spdk_json_write_named_string(w, "base_bdev", spdk_bdev_get_name(bdev));
730 
731 	blob = lvol->blob;
732 
733 	spdk_json_write_named_bool(w, "thin_provision", spdk_blob_is_thin_provisioned(blob));
734 
735 	spdk_json_write_named_bool(w, "snapshot", spdk_blob_is_snapshot(blob));
736 
737 	spdk_json_write_named_bool(w, "clone", spdk_blob_is_clone(blob));
738 
739 	if (spdk_blob_is_clone(blob)) {
740 		spdk_blob_id snapshotid = spdk_blob_get_parent_snapshot(lvol->lvol_store->blobstore, lvol->blob_id);
741 		if (snapshotid != SPDK_BLOBID_INVALID) {
742 			name = vbdev_lvol_find_name(lvol, snapshotid);
743 			if (name != NULL) {
744 				spdk_json_write_named_string(w, "base_snapshot", name);
745 			} else {
746 				SPDK_ERRLOG("Cannot obtain snapshots name\n");
747 			}
748 		}
749 	}
750 
751 	if (spdk_blob_is_snapshot(blob)) {
752 		/* Take a number of clones */
753 		rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
754 		if (rc == -ENOMEM && count > 0) {
755 			ids = malloc(sizeof(spdk_blob_id) * count);
756 			if (ids == NULL) {
757 				SPDK_ERRLOG("Cannot allocate memory\n");
758 				rc = -ENOMEM;
759 				goto end;
760 			}
761 
762 			rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, ids, &count);
763 			if (rc == 0) {
764 				spdk_json_write_named_array_begin(w, "clones");
765 				for (i = 0; i < count; i++) {
766 					name = vbdev_lvol_find_name(lvol, ids[i]);
767 					if (name != NULL) {
768 						spdk_json_write_string(w, name);
769 					} else {
770 						SPDK_ERRLOG("Cannot obtain clone name\n");
771 					}
772 
773 				}
774 				spdk_json_write_array_end(w);
775 			}
776 			free(ids);
777 		}
778 
779 	}
780 
781 	spdk_json_write_named_bool(w, "esnap_clone", spdk_blob_is_esnap_clone(blob));
782 
783 	if (spdk_blob_is_esnap_clone(blob)) {
784 		const char *name;
785 		size_t name_len;
786 
787 		rc = spdk_blob_get_esnap_id(blob, (const void **)&name, &name_len);
788 		if (rc == 0 && name != NULL && strnlen(name, name_len) + 1 == name_len) {
789 			spdk_json_write_named_string(w, "external_snapshot_name", name);
790 		}
791 	}
792 
793 end:
794 	spdk_json_write_object_end(w);
795 
796 	return rc;
797 }
798 
799 static void
800 vbdev_lvol_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
801 {
802 	/* Nothing to dump as lvol configuration is saved on physical device. */
803 }
804 
805 static struct spdk_io_channel *
806 vbdev_lvol_get_io_channel(void *ctx)
807 {
808 	struct spdk_lvol *lvol = ctx;
809 
810 	return spdk_lvol_get_io_channel(lvol);
811 }
812 
813 static bool
814 vbdev_lvol_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
815 {
816 	struct spdk_lvol *lvol = ctx;
817 
818 	switch (io_type) {
819 	case SPDK_BDEV_IO_TYPE_WRITE:
820 	case SPDK_BDEV_IO_TYPE_UNMAP:
821 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
822 		return !spdk_blob_is_read_only(lvol->blob);
823 	case SPDK_BDEV_IO_TYPE_RESET:
824 	case SPDK_BDEV_IO_TYPE_READ:
825 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
826 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
827 		return true;
828 	default:
829 		return false;
830 	}
831 }
832 
833 static void
834 lvol_op_comp(void *cb_arg, int bserrno)
835 {
836 	struct spdk_bdev_io *bdev_io = cb_arg;
837 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_SUCCESS;
838 
839 	if (bserrno != 0) {
840 		if (bserrno == -ENOMEM) {
841 			status = SPDK_BDEV_IO_STATUS_NOMEM;
842 		} else {
843 			status = SPDK_BDEV_IO_STATUS_FAILED;
844 		}
845 	}
846 
847 	spdk_bdev_io_complete(bdev_io, status);
848 }
849 
850 static void
851 lvol_unmap(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
852 {
853 	uint64_t start_page, num_pages;
854 	struct spdk_blob *blob = lvol->blob;
855 
856 	start_page = bdev_io->u.bdev.offset_blocks;
857 	num_pages = bdev_io->u.bdev.num_blocks;
858 
859 	spdk_blob_io_unmap(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
860 }
861 
862 static void
863 lvol_seek_data(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
864 {
865 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_allocated_io_unit(lvol->blob,
866 				      bdev_io->u.bdev.offset_blocks);
867 
868 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
869 }
870 
871 static void
872 lvol_seek_hole(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
873 {
874 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_unallocated_io_unit(lvol->blob,
875 				      bdev_io->u.bdev.offset_blocks);
876 
877 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
878 }
879 
880 static void
881 lvol_write_zeroes(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
882 {
883 	uint64_t start_page, num_pages;
884 	struct spdk_blob *blob = lvol->blob;
885 
886 	start_page = bdev_io->u.bdev.offset_blocks;
887 	num_pages = bdev_io->u.bdev.num_blocks;
888 
889 	spdk_blob_io_write_zeroes(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
890 }
891 
892 static void
893 lvol_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
894 {
895 	uint64_t start_page, num_pages;
896 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
897 	struct spdk_blob *blob = lvol->blob;
898 	struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
899 
900 	start_page = bdev_io->u.bdev.offset_blocks;
901 	num_pages = bdev_io->u.bdev.num_blocks;
902 
903 	lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
904 	lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.memory_domain;
905 	lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.memory_domain_ctx;
906 
907 	spdk_blob_io_readv_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
908 			       num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
909 }
910 
911 static void
912 lvol_write(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
913 {
914 	uint64_t start_page, num_pages;
915 	struct spdk_blob *blob = lvol->blob;
916 	struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
917 
918 	start_page = bdev_io->u.bdev.offset_blocks;
919 	num_pages = bdev_io->u.bdev.num_blocks;
920 
921 	lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
922 	lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.memory_domain;
923 	lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.memory_domain_ctx;
924 
925 	spdk_blob_io_writev_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
926 				num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
927 }
928 
929 static int
930 lvol_reset(struct spdk_bdev_io *bdev_io)
931 {
932 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
933 
934 	return 0;
935 }
936 
937 static void
938 lvol_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
939 {
940 	if (!success) {
941 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
942 		return;
943 	}
944 
945 	lvol_read(ch, bdev_io);
946 }
947 
948 static void
949 vbdev_lvol_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
950 {
951 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
952 
953 	switch (bdev_io->type) {
954 	case SPDK_BDEV_IO_TYPE_READ:
955 		spdk_bdev_io_get_buf(bdev_io, lvol_get_buf_cb,
956 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
957 		break;
958 	case SPDK_BDEV_IO_TYPE_WRITE:
959 		lvol_write(lvol, ch, bdev_io);
960 		break;
961 	case SPDK_BDEV_IO_TYPE_RESET:
962 		lvol_reset(bdev_io);
963 		break;
964 	case SPDK_BDEV_IO_TYPE_UNMAP:
965 		lvol_unmap(lvol, ch, bdev_io);
966 		break;
967 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
968 		lvol_write_zeroes(lvol, ch, bdev_io);
969 		break;
970 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
971 		lvol_seek_data(lvol, bdev_io);
972 		break;
973 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
974 		lvol_seek_hole(lvol, bdev_io);
975 		break;
976 	default:
977 		SPDK_INFOLOG(vbdev_lvol, "lvol: unsupported I/O type %d\n", bdev_io->type);
978 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
979 		return;
980 	}
981 	return;
982 }
983 
984 static int
985 vbdev_lvol_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
986 {
987 	struct spdk_lvol *lvol = ctx;
988 	struct spdk_bdev *base_bdev, *esnap_bdev;
989 	struct spdk_bs_dev *bs_dev;
990 	struct spdk_lvol_store *lvs;
991 	int base_cnt, esnap_cnt;
992 
993 	lvs = lvol->lvol_store;
994 	base_bdev = lvs->bs_dev->get_base_bdev(lvol->lvol_store->bs_dev);
995 
996 	base_cnt = spdk_bdev_get_memory_domains(base_bdev, domains, array_size);
997 	if (base_cnt < 0) {
998 		return base_cnt;
999 	}
1000 
1001 	if (lvol->blob == NULL) {
1002 		/*
1003 		 * This is probably called due to an open happening during blobstore load. Another
1004 		 * open will follow shortly that has lvol->blob set.
1005 		 */
1006 		return -EAGAIN;
1007 	}
1008 
1009 	if (!spdk_blob_is_esnap_clone(lvol->blob)) {
1010 		return base_cnt;
1011 	}
1012 
1013 	bs_dev = spdk_blob_get_esnap_bs_dev(lvol->blob);
1014 	if (bs_dev == NULL) {
1015 		assert(false);
1016 		SPDK_ERRLOG("lvol %s is an esnap clone but has no esnap device\n", lvol->unique_id);
1017 		return base_cnt;
1018 	}
1019 
1020 	if (bs_dev->get_base_bdev == NULL) {
1021 		/*
1022 		 * If this were a blob_bdev, we wouldn't be here. We are probably here because an
1023 		 * lvol bdev is being registered with spdk_bdev_register() before the external
1024 		 * snapshot bdev is loaded. Ideally, the load of a missing esnap would trigger an
1025 		 * event that causes the lvol bdev's memory domain information to be updated.
1026 		 */
1027 		return base_cnt;
1028 	}
1029 
1030 	esnap_bdev = bs_dev->get_base_bdev(bs_dev);
1031 	if (esnap_bdev == NULL) {
1032 		/*
1033 		 * The esnap bdev has not yet been loaded. Anyone that has opened at this point may
1034 		 * miss out on using memory domains if base_cnt is zero.
1035 		 */
1036 		SPDK_NOTICELOG("lvol %s reporting %d memory domains, not including missing esnap\n",
1037 			       lvol->unique_id, base_cnt);
1038 		return base_cnt;
1039 	}
1040 
1041 	if (base_cnt < array_size) {
1042 		array_size -= base_cnt;
1043 		domains += base_cnt;
1044 	} else {
1045 		array_size = 0;
1046 		domains = NULL;
1047 	}
1048 
1049 	esnap_cnt = spdk_bdev_get_memory_domains(esnap_bdev, domains, array_size);
1050 	if (esnap_cnt <= 0) {
1051 		return base_cnt;
1052 	}
1053 
1054 	return base_cnt + esnap_cnt;
1055 }
1056 
1057 static struct spdk_bdev_fn_table vbdev_lvol_fn_table = {
1058 	.destruct		= vbdev_lvol_unregister,
1059 	.io_type_supported	= vbdev_lvol_io_type_supported,
1060 	.submit_request		= vbdev_lvol_submit_request,
1061 	.get_io_channel		= vbdev_lvol_get_io_channel,
1062 	.dump_info_json		= vbdev_lvol_dump_info_json,
1063 	.write_config_json	= vbdev_lvol_write_config_json,
1064 	.get_memory_domains	= vbdev_lvol_get_memory_domains,
1065 };
1066 
1067 static void
1068 lvol_destroy_cb(void *cb_arg, int bdeverrno)
1069 {
1070 }
1071 
1072 static void
1073 _create_lvol_disk_destroy_cb(void *cb_arg, int bdeverrno)
1074 {
1075 	struct spdk_lvol *lvol = cb_arg;
1076 
1077 	if (bdeverrno < 0) {
1078 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
1079 			    lvol->unique_id);
1080 		return;
1081 	}
1082 
1083 	spdk_lvol_destroy(lvol, lvol_destroy_cb, NULL);
1084 }
1085 
1086 static void
1087 _create_lvol_disk_unload_cb(void *cb_arg, int bdeverrno)
1088 {
1089 	struct spdk_lvol *lvol = cb_arg;
1090 
1091 	if (bdeverrno < 0) {
1092 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
1093 			    lvol->unique_id);
1094 		return;
1095 	}
1096 
1097 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
1098 	free(lvol);
1099 }
1100 
1101 static int
1102 _create_lvol_disk(struct spdk_lvol *lvol, bool destroy)
1103 {
1104 	struct spdk_bdev *bdev;
1105 	struct lvol_bdev *lvol_bdev;
1106 	struct lvol_store_bdev *lvs_bdev;
1107 	uint64_t total_size;
1108 	unsigned char *alias;
1109 	int rc;
1110 
1111 	if (spdk_lvol_is_degraded(lvol)) {
1112 		SPDK_NOTICELOG("lvol %s: blob is degraded: deferring bdev creation\n",
1113 			       lvol->unique_id);
1114 		return 0;
1115 	}
1116 
1117 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
1118 	if (lvs_bdev == NULL) {
1119 		SPDK_ERRLOG("No spdk lvs-bdev pair found for lvol %s\n", lvol->unique_id);
1120 		assert(false);
1121 		return -ENODEV;
1122 	}
1123 
1124 	lvol_bdev = calloc(1, sizeof(struct lvol_bdev));
1125 	if (!lvol_bdev) {
1126 		SPDK_ERRLOG("Cannot alloc memory for lvol bdev\n");
1127 		return -ENOMEM;
1128 	}
1129 
1130 	lvol_bdev->lvol = lvol;
1131 	lvol_bdev->lvs_bdev = lvs_bdev;
1132 
1133 	bdev = &lvol_bdev->bdev;
1134 	bdev->name = lvol->unique_id;
1135 	bdev->product_name = "Logical Volume";
1136 	bdev->blocklen = spdk_bs_get_io_unit_size(lvol->lvol_store->blobstore);
1137 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1138 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1139 	assert((total_size % bdev->blocklen) == 0);
1140 	bdev->blockcnt = total_size / bdev->blocklen;
1141 	bdev->uuid = lvol->uuid;
1142 	bdev->required_alignment = lvs_bdev->bdev->required_alignment;
1143 	bdev->split_on_optimal_io_boundary = true;
1144 	bdev->optimal_io_boundary = spdk_bs_get_cluster_size(lvol->lvol_store->blobstore) / bdev->blocklen;
1145 
1146 	bdev->ctxt = lvol;
1147 	bdev->fn_table = &vbdev_lvol_fn_table;
1148 	bdev->module = &g_lvol_if;
1149 
1150 	/* Set default bdev reset waiting time. This value indicates how much
1151 	 * time a reset should wait before forcing a reset down to the underlying
1152 	 * bdev module.
1153 	 * Setting this parameter is mainly to avoid "empty" resets to a shared
1154 	 * bdev that may be used by multiple lvols. */
1155 	bdev->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
1156 
1157 	rc = spdk_bdev_register(bdev);
1158 	if (rc) {
1159 		free(lvol_bdev);
1160 		return rc;
1161 	}
1162 	lvol->bdev = bdev;
1163 
1164 	alias = spdk_sprintf_alloc("%s/%s", lvs_bdev->lvs->name, lvol->name);
1165 	if (alias == NULL) {
1166 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
1167 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1168 						  _create_lvol_disk_unload_cb), lvol);
1169 		return -ENOMEM;
1170 	}
1171 
1172 	rc = spdk_bdev_alias_add(bdev, alias);
1173 	if (rc != 0) {
1174 		SPDK_ERRLOG("Cannot add alias to lvol bdev\n");
1175 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1176 						  _create_lvol_disk_unload_cb), lvol);
1177 	}
1178 	free(alias);
1179 
1180 	return rc;
1181 }
1182 
1183 static void
1184 _vbdev_lvol_create_cb(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1185 {
1186 	struct spdk_lvol_with_handle_req *req = cb_arg;
1187 
1188 	if (lvolerrno < 0) {
1189 		goto end;
1190 	}
1191 
1192 	lvolerrno = _create_lvol_disk(lvol, true);
1193 
1194 end:
1195 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
1196 	free(req);
1197 }
1198 
1199 int
1200 vbdev_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1201 		  bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1202 		  void *cb_arg)
1203 {
1204 	struct spdk_lvol_with_handle_req *req;
1205 	int rc;
1206 
1207 	req = calloc(1, sizeof(*req));
1208 	if (req == NULL) {
1209 		return -ENOMEM;
1210 	}
1211 	req->cb_fn = cb_fn;
1212 	req->cb_arg = cb_arg;
1213 
1214 	rc = spdk_lvol_create(lvs, name, sz, thin_provision, clear_method,
1215 			      _vbdev_lvol_create_cb, req);
1216 	if (rc != 0) {
1217 		free(req);
1218 	}
1219 
1220 	return rc;
1221 }
1222 
1223 void
1224 vbdev_lvol_create_snapshot(struct spdk_lvol *lvol, const char *snapshot_name,
1225 			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1226 {
1227 	struct spdk_lvol_with_handle_req *req;
1228 
1229 	req = calloc(1, sizeof(*req));
1230 	if (req == NULL) {
1231 		cb_fn(cb_arg, NULL, -ENOMEM);
1232 		return;
1233 	}
1234 
1235 	req->cb_fn = cb_fn;
1236 	req->cb_arg = cb_arg;
1237 
1238 	spdk_lvol_create_snapshot(lvol, snapshot_name, _vbdev_lvol_create_cb, req);
1239 }
1240 
1241 void
1242 vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
1243 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1244 {
1245 	struct spdk_lvol_with_handle_req *req;
1246 
1247 	req = calloc(1, sizeof(*req));
1248 	if (req == NULL) {
1249 		cb_fn(cb_arg, NULL, -ENOMEM);
1250 		return;
1251 	}
1252 
1253 	req->cb_fn = cb_fn;
1254 	req->cb_arg = cb_arg;
1255 
1256 	spdk_lvol_create_clone(lvol, clone_name, _vbdev_lvol_create_cb, req);
1257 }
1258 
1259 static void
1260 ignore_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
1261 {
1262 }
1263 
1264 void
1265 vbdev_lvol_create_bdev_clone(const char *esnap_name,
1266 			     struct spdk_lvol_store *lvs, const char *clone_name,
1267 			     spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1268 {
1269 	struct spdk_lvol_with_handle_req *req;
1270 	struct spdk_bdev_desc *desc;
1271 	struct spdk_bdev *bdev;
1272 	char bdev_uuid[SPDK_UUID_STRING_LEN];
1273 	uint64_t sz;
1274 	int rc;
1275 
1276 	if (lvs == NULL) {
1277 		SPDK_ERRLOG("lvol store not specified\n");
1278 		cb_fn(cb_arg, NULL, -EINVAL);
1279 		return;
1280 	}
1281 
1282 	rc = spdk_bdev_open_ext(esnap_name, false, ignore_bdev_event_cb, NULL, &desc);
1283 	if (rc != 0) {
1284 		SPDK_ERRLOG("bdev '%s' could not be opened: error %d\n", esnap_name, rc);
1285 		cb_fn(cb_arg, NULL, rc);
1286 		return;
1287 	}
1288 	bdev = spdk_bdev_desc_get_bdev(desc);
1289 
1290 	rc = spdk_uuid_fmt_lower(bdev_uuid, sizeof(bdev_uuid), spdk_bdev_get_uuid(bdev));
1291 	if (rc != 0) {
1292 		spdk_bdev_close(desc);
1293 		SPDK_ERRLOG("bdev %s: unable to parse UUID\n", esnap_name);
1294 		assert(false);
1295 		cb_fn(cb_arg, NULL, -ENODEV);
1296 		return;
1297 	}
1298 
1299 	req = calloc(1, sizeof(*req));
1300 	if (req == NULL) {
1301 		spdk_bdev_close(desc);
1302 		cb_fn(cb_arg, NULL, -ENOMEM);
1303 		return;
1304 	}
1305 
1306 	req->cb_fn = cb_fn;
1307 	req->cb_arg = cb_arg;
1308 
1309 	sz = spdk_bdev_get_num_blocks(bdev) * spdk_bdev_get_block_size(bdev);
1310 	rc = spdk_lvol_create_esnap_clone(bdev_uuid, sizeof(bdev_uuid), sz, lvs, clone_name,
1311 					  _vbdev_lvol_create_cb, req);
1312 	spdk_bdev_close(desc);
1313 	if (rc != 0) {
1314 		cb_fn(cb_arg, NULL, rc);
1315 		free(req);
1316 	}
1317 }
1318 
1319 static void
1320 _vbdev_lvol_rename_cb(void *cb_arg, int lvolerrno)
1321 {
1322 	struct spdk_lvol_req *req = cb_arg;
1323 
1324 	if (lvolerrno != 0) {
1325 		SPDK_ERRLOG("Renaming lvol failed\n");
1326 	}
1327 
1328 	req->cb_fn(req->cb_arg, lvolerrno);
1329 	free(req);
1330 }
1331 
1332 void
1333 vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
1334 		  spdk_lvol_op_complete cb_fn, void *cb_arg)
1335 {
1336 	struct spdk_lvol_req *req;
1337 	int rc;
1338 
1339 	rc = _vbdev_lvol_change_bdev_alias(lvol, new_lvol_name);
1340 	if (rc != 0) {
1341 		SPDK_ERRLOG("renaming lvol to '%s' does not succeed\n", new_lvol_name);
1342 		cb_fn(cb_arg, rc);
1343 		return;
1344 	}
1345 
1346 	req = calloc(1, sizeof(*req));
1347 	if (req == NULL) {
1348 		cb_fn(cb_arg, -ENOMEM);
1349 		return;
1350 	}
1351 	req->cb_fn = cb_fn;
1352 	req->cb_arg = cb_arg;
1353 
1354 	spdk_lvol_rename(lvol, new_lvol_name, _vbdev_lvol_rename_cb, req);
1355 }
1356 
1357 static void
1358 _vbdev_lvol_resize_cb(void *cb_arg, int lvolerrno)
1359 {
1360 	struct spdk_lvol_req *req = cb_arg;
1361 	struct spdk_lvol *lvol = req->lvol;
1362 	uint64_t total_size;
1363 
1364 	/* change bdev size */
1365 	if (lvolerrno != 0) {
1366 		SPDK_ERRLOG("CB function for bdev lvol %s receive error no: %d.\n", lvol->name, lvolerrno);
1367 		goto finish;
1368 	}
1369 
1370 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1371 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1372 	assert((total_size % lvol->bdev->blocklen) == 0);
1373 
1374 	lvolerrno = spdk_bdev_notify_blockcnt_change(lvol->bdev, total_size / lvol->bdev->blocklen);
1375 	if (lvolerrno != 0) {
1376 		SPDK_ERRLOG("Could not change num blocks for bdev lvol %s with error no: %d.\n",
1377 			    lvol->name, lvolerrno);
1378 	}
1379 
1380 finish:
1381 	req->cb_fn(req->cb_arg, lvolerrno);
1382 	free(req);
1383 }
1384 
1385 void
1386 vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn, void *cb_arg)
1387 {
1388 	struct spdk_lvol_req *req;
1389 
1390 	if (lvol == NULL) {
1391 		SPDK_ERRLOG("lvol does not exist\n");
1392 		cb_fn(cb_arg, -EINVAL);
1393 		return;
1394 	}
1395 
1396 	assert(lvol->bdev != NULL);
1397 
1398 	req = calloc(1, sizeof(*req));
1399 	if (req == NULL) {
1400 		cb_fn(cb_arg, -ENOMEM);
1401 		return;
1402 	}
1403 
1404 	req->cb_fn = cb_fn;
1405 	req->cb_arg = cb_arg;
1406 	req->sz = sz;
1407 	req->lvol = lvol;
1408 
1409 	spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
1410 }
1411 
1412 static void
1413 _vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1414 {
1415 	struct spdk_lvol_req *req = cb_arg;
1416 	struct spdk_lvol *lvol = req->lvol;
1417 
1418 	if (lvolerrno != 0) {
1419 		SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
1420 	}
1421 
1422 	req->cb_fn(req->cb_arg, lvolerrno);
1423 	free(req);
1424 }
1425 
1426 void
1427 vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1428 {
1429 	struct spdk_lvol_req *req;
1430 
1431 	if (lvol == NULL) {
1432 		SPDK_ERRLOG("lvol does not exist\n");
1433 		cb_fn(cb_arg, -EINVAL);
1434 		return;
1435 	}
1436 
1437 	assert(lvol->bdev != NULL);
1438 
1439 	req = calloc(1, sizeof(*req));
1440 	if (req == NULL) {
1441 		cb_fn(cb_arg, -ENOMEM);
1442 		return;
1443 	}
1444 
1445 	req->cb_fn = cb_fn;
1446 	req->cb_arg = cb_arg;
1447 	req->lvol = lvol;
1448 
1449 	spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
1450 }
1451 
1452 static int
1453 vbdev_lvs_init(void)
1454 {
1455 	return 0;
1456 }
1457 
1458 static void vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev);
1459 
1460 static void
1461 vbdev_lvs_fini_start_unload_cb(void *cb_arg, int lvserrno)
1462 {
1463 	struct lvol_store_bdev *lvs_bdev = cb_arg;
1464 	struct lvol_store_bdev *next_lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1465 
1466 	if (lvserrno != 0) {
1467 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
1468 	}
1469 
1470 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1471 	free(lvs_bdev);
1472 
1473 	vbdev_lvs_fini_start_iter(next_lvs_bdev);
1474 }
1475 
1476 static void
1477 vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev)
1478 {
1479 	struct spdk_lvol_store *lvs;
1480 
1481 	while (lvs_bdev != NULL) {
1482 		lvs = lvs_bdev->lvs;
1483 
1484 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1485 			spdk_lvs_unload(lvs, vbdev_lvs_fini_start_unload_cb, lvs_bdev);
1486 			return;
1487 		}
1488 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1489 	}
1490 
1491 	spdk_bdev_module_fini_start_done();
1492 }
1493 
1494 static void
1495 vbdev_lvs_fini_start(void)
1496 {
1497 	g_shutdown_started = true;
1498 	vbdev_lvs_fini_start_iter(vbdev_lvol_store_first());
1499 }
1500 
1501 static int
1502 vbdev_lvs_get_ctx_size(void)
1503 {
1504 	return sizeof(struct vbdev_lvol_io);
1505 }
1506 
1507 static void
1508 _vbdev_lvs_examine_done(struct spdk_lvs_req *req, int lvserrno)
1509 {
1510 	req->cb_fn(req->cb_arg, lvserrno);
1511 }
1512 
1513 static void
1514 _vbdev_lvs_examine_failed(void *cb_arg, int lvserrno)
1515 {
1516 	struct spdk_lvs_req *req = cb_arg;
1517 
1518 	_vbdev_lvs_examine_done(req, req->lvserrno);
1519 }
1520 
1521 static void
1522 _vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1523 {
1524 	struct spdk_lvs_req *req = cb_arg;
1525 	struct spdk_lvol_store *lvs = req->lvol_store;
1526 
1527 	if (lvolerrno != 0) {
1528 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1529 		if (lvolerrno == -ENOMEM) {
1530 			TAILQ_INSERT_TAIL(&lvs->retry_open_lvols, lvol, link);
1531 			return;
1532 		}
1533 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1534 		lvs->lvol_count--;
1535 		free(lvol);
1536 		goto end;
1537 	}
1538 
1539 	if (_create_lvol_disk(lvol, false)) {
1540 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1541 		lvs->lvol_count--;
1542 		goto end;
1543 	}
1544 
1545 	lvs->lvols_opened++;
1546 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1547 
1548 end:
1549 	if (!TAILQ_EMPTY(&lvs->retry_open_lvols)) {
1550 		lvol = TAILQ_FIRST(&lvs->retry_open_lvols);
1551 		TAILQ_REMOVE(&lvs->retry_open_lvols, lvol, link);
1552 		TAILQ_INSERT_HEAD(&lvs->lvols, lvol, link);
1553 		spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, req);
1554 		return;
1555 	}
1556 	if (lvs->lvols_opened >= lvs->lvol_count) {
1557 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1558 		_vbdev_lvs_examine_done(req, 0);
1559 	}
1560 }
1561 
1562 /* Walks a tree of clones that are no longer degraded to create bdevs. */
1563 static int
1564 create_esnap_clone_lvol_disks(void *ctx, struct spdk_lvol *lvol)
1565 {
1566 	struct spdk_bdev *bdev = ctx;
1567 	int rc;
1568 
1569 	rc = _create_lvol_disk(lvol, false);
1570 	if (rc != 0) {
1571 		SPDK_ERRLOG("lvol %s: failed to create bdev after esnap hotplug of %s: %d\n",
1572 			    lvol->unique_id, spdk_bdev_get_name(bdev), rc);
1573 		/* Do not prevent creation of other clones in case of one failure. */
1574 		return 0;
1575 	}
1576 
1577 	return spdk_lvol_iter_immediate_clones(lvol, create_esnap_clone_lvol_disks, ctx);
1578 }
1579 
1580 static void
1581 vbdev_lvs_hotplug(void *ctx, struct spdk_lvol *lvol, int lvolerrno)
1582 {
1583 	struct spdk_bdev *esnap_clone_bdev = ctx;
1584 
1585 	if (lvolerrno != 0) {
1586 		SPDK_ERRLOG("lvol %s: during examine of bdev %s: not creating clone bdev due to "
1587 			    "error %d\n", lvol->unique_id, spdk_bdev_get_name(esnap_clone_bdev),
1588 			    lvolerrno);
1589 		return;
1590 	}
1591 	create_esnap_clone_lvol_disks(esnap_clone_bdev, lvol);
1592 }
1593 
1594 static void
1595 vbdev_lvs_examine_config(struct spdk_bdev *bdev)
1596 {
1597 	char uuid_str[SPDK_UUID_STRING_LEN];
1598 
1599 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
1600 
1601 	if (spdk_lvs_notify_hotplug(uuid_str, sizeof(uuid_str), vbdev_lvs_hotplug, bdev)) {
1602 		SPDK_INFOLOG(vbdev_lvol, "bdev %s: claimed by one ore more esnap clones\n",
1603 			     uuid_str);
1604 	}
1605 	spdk_bdev_module_examine_done(&g_lvol_if);
1606 }
1607 
1608 static void
1609 _vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1610 {
1611 	struct lvol_store_bdev *lvs_bdev;
1612 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1613 	struct spdk_lvol *lvol, *tmp;
1614 	struct spdk_lvs_req *ori_req = req->cb_arg;
1615 
1616 	if (lvserrno == -EEXIST) {
1617 		SPDK_INFOLOG(vbdev_lvol,
1618 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1619 			     req->base_bdev->name);
1620 		/* On error blobstore destroys bs_dev itself */
1621 		_vbdev_lvs_examine_done(ori_req, lvserrno);
1622 		goto end;
1623 	} else if (lvserrno != 0) {
1624 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1625 		/* On error blobstore destroys bs_dev itself */
1626 		_vbdev_lvs_examine_done(ori_req, lvserrno);
1627 		goto end;
1628 	}
1629 
1630 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1631 	if (lvserrno != 0) {
1632 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1633 		ori_req->lvserrno = lvserrno;
1634 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, ori_req);
1635 		goto end;
1636 	}
1637 
1638 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1639 	if (!lvs_bdev) {
1640 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1641 		ori_req->lvserrno = lvserrno;
1642 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, ori_req);
1643 		goto end;
1644 	}
1645 
1646 	lvs_bdev->lvs = lvol_store;
1647 	lvs_bdev->bdev = req->base_bdev;
1648 
1649 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1650 
1651 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1652 		     req->base_bdev->name);
1653 
1654 	lvol_store->lvols_opened = 0;
1655 
1656 	ori_req->lvol_store = lvol_store;
1657 
1658 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1659 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1660 		_vbdev_lvs_examine_done(ori_req, 0);
1661 	} else {
1662 		/* Open all lvols */
1663 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1664 			spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, ori_req);
1665 		}
1666 	}
1667 
1668 end:
1669 	free(req);
1670 }
1671 
1672 static void
1673 _vbdev_lvs_examine(struct spdk_bdev *bdev, struct spdk_lvs_req *ori_req,
1674 		   void (*action)(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg))
1675 {
1676 	struct spdk_bs_dev *bs_dev;
1677 	struct spdk_lvs_with_handle_req *req;
1678 	int rc;
1679 
1680 	req = calloc(1, sizeof(*req));
1681 	if (req == NULL) {
1682 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1683 		_vbdev_lvs_examine_done(ori_req, -ENOMEM);
1684 		return;
1685 	}
1686 
1687 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1688 					 NULL, &bs_dev);
1689 	if (rc < 0) {
1690 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1691 		_vbdev_lvs_examine_done(ori_req, rc);
1692 		free(req);
1693 		return;
1694 	}
1695 
1696 	req->base_bdev = bdev;
1697 	req->cb_arg = ori_req;
1698 
1699 	action(bs_dev, _vbdev_lvs_examine_cb, req);
1700 }
1701 
1702 static void
1703 vbdev_lvs_examine_done(void *arg, int lvserrno)
1704 {
1705 	struct spdk_lvs_req *req = arg;
1706 
1707 	spdk_bdev_module_examine_done(&g_lvol_if);
1708 	free(req);
1709 }
1710 
1711 static void
1712 vbdev_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
1713 {
1714 	struct spdk_lvs_opts lvs_opts;
1715 
1716 	spdk_lvs_opts_init(&lvs_opts);
1717 	lvs_opts.esnap_bs_dev_create = vbdev_lvol_esnap_dev_create;
1718 	spdk_lvs_load_ext(bs_dev, &lvs_opts, cb_fn, cb_arg);
1719 }
1720 
1721 static void
1722 vbdev_lvs_examine_disk(struct spdk_bdev *bdev)
1723 {
1724 	struct spdk_lvs_req *req;
1725 
1726 	if (spdk_bdev_get_md_size(bdev) != 0) {
1727 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n which is formatted with metadata",
1728 			     bdev->name);
1729 		spdk_bdev_module_examine_done(&g_lvol_if);
1730 		return;
1731 	}
1732 
1733 	req = calloc(1, sizeof(*req));
1734 	if (req == NULL) {
1735 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1736 		spdk_bdev_module_examine_done(&g_lvol_if);
1737 		return;
1738 	}
1739 
1740 	req->cb_fn = vbdev_lvs_examine_done;
1741 	req->cb_arg = req;
1742 
1743 	_vbdev_lvs_examine(bdev, req, vbdev_lvs_load);
1744 }
1745 
1746 struct spdk_lvol *
1747 vbdev_lvol_get_from_bdev(struct spdk_bdev *bdev)
1748 {
1749 	if (!bdev || bdev->module != &g_lvol_if) {
1750 		return NULL;
1751 	}
1752 
1753 	if (bdev->ctxt == NULL) {
1754 		SPDK_ERRLOG("No lvol ctx assigned to bdev %s\n", bdev->name);
1755 		return NULL;
1756 	}
1757 
1758 	return (struct spdk_lvol *)bdev->ctxt;
1759 }
1760 
1761 /* Begin degraded blobstore device */
1762 
1763 /*
1764  * When an external snapshot is missing, an instance of bs_dev_degraded is used as the blob's
1765  * back_bs_dev. No bdev is registered, so there should be no IO nor requests for channels. The main
1766  * purposes of this device are to prevent blobstore from hitting fatal runtime errors and to
1767  * indicate that the blob is degraded via the is_degraded() callback.
1768  */
1769 
1770 static void
1771 bs_dev_degraded_read(struct spdk_bs_dev *dev, struct spdk_io_channel *channel, void *payload,
1772 		     uint64_t lba, uint32_t lba_count, struct spdk_bs_dev_cb_args *cb_args)
1773 {
1774 	assert(false);
1775 	cb_args->cb_fn(cb_args->channel, cb_args->cb_arg, -EIO);
1776 }
1777 
1778 static void
1779 bs_dev_degraded_readv(struct spdk_bs_dev *dev, struct spdk_io_channel *channel,
1780 		      struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
1781 		      struct spdk_bs_dev_cb_args *cb_args)
1782 {
1783 	assert(false);
1784 	cb_args->cb_fn(cb_args->channel, cb_args->cb_arg, -EIO);
1785 }
1786 
1787 static void
1788 bs_dev_degraded_readv_ext(struct spdk_bs_dev *dev, struct spdk_io_channel *channel,
1789 			  struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
1790 			  struct spdk_bs_dev_cb_args *cb_args,
1791 			  struct spdk_blob_ext_io_opts *io_opts)
1792 {
1793 	assert(false);
1794 	cb_args->cb_fn(cb_args->channel, cb_args->cb_arg, -EIO);
1795 }
1796 
1797 static bool
1798 bs_dev_degraded_is_zeroes(struct spdk_bs_dev *dev, uint64_t lba, uint64_t lba_count)
1799 {
1800 	assert(false);
1801 	return false;
1802 }
1803 
1804 static bool
1805 bs_dev_degraded_is_range_valid(struct spdk_bs_dev *dev, uint64_t lba, uint64_t lba_count)
1806 {
1807 	assert(false);
1808 	return false;
1809 }
1810 
1811 static struct spdk_io_channel *
1812 bs_dev_degraded_create_channel(struct spdk_bs_dev *bs_dev)
1813 {
1814 	assert(false);
1815 	return NULL;
1816 }
1817 
1818 static void
1819 bs_dev_degraded_destroy_channel(struct spdk_bs_dev *bs_dev, struct spdk_io_channel *channel)
1820 {
1821 	assert(false);
1822 }
1823 
1824 static void
1825 bs_dev_degraded_destroy(struct spdk_bs_dev *bs_dev)
1826 {
1827 }
1828 
1829 static bool
1830 bs_dev_degraded_is_degraded(struct spdk_bs_dev *bs_dev)
1831 {
1832 	return true;
1833 }
1834 
1835 static struct spdk_bs_dev bs_dev_degraded = {
1836 	.create_channel = bs_dev_degraded_create_channel,
1837 	.destroy_channel = bs_dev_degraded_destroy_channel,
1838 	.destroy = bs_dev_degraded_destroy,
1839 	.read = bs_dev_degraded_read,
1840 	.readv = bs_dev_degraded_readv,
1841 	.readv_ext = bs_dev_degraded_readv_ext,
1842 	.is_zeroes = bs_dev_degraded_is_zeroes,
1843 	.is_range_valid = bs_dev_degraded_is_range_valid,
1844 	.is_degraded = bs_dev_degraded_is_degraded,
1845 	/* Make the device as large as possible without risk of uint64 overflow. */
1846 	.blockcnt = UINT64_MAX / 512,
1847 	/* Prevent divide by zero errors calculating LBAs that will never be read. */
1848 	.blocklen = 512,
1849 };
1850 
1851 /* End degraded blobstore device */
1852 
1853 /* Begin external snapshot support */
1854 
1855 static void
1856 vbdev_lvol_esnap_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1857 			       void *event_ctx)
1858 {
1859 	SPDK_NOTICELOG("bdev name (%s) received unsupported event type %d\n",
1860 		       spdk_bdev_get_name(bdev), type);
1861 }
1862 
1863 int
1864 vbdev_lvol_esnap_dev_create(void *bs_ctx, void *blob_ctx, struct spdk_blob *blob,
1865 			    const void *esnap_id, uint32_t id_len,
1866 			    struct spdk_bs_dev **_bs_dev)
1867 {
1868 	struct spdk_lvol_store	*lvs = bs_ctx;
1869 	struct spdk_lvol	*lvol = blob_ctx;
1870 	struct spdk_bs_dev	*bs_dev = NULL;
1871 	struct spdk_uuid	uuid;
1872 	int			rc;
1873 	char			uuid_str[SPDK_UUID_STRING_LEN] = { 0 };
1874 
1875 	if (esnap_id == NULL) {
1876 		SPDK_ERRLOG("lvol %s: NULL esnap ID\n", lvol->unique_id);
1877 		return -EINVAL;
1878 	}
1879 
1880 	/* Guard against arbitrary names and unterminated UUID strings */
1881 	if (id_len != SPDK_UUID_STRING_LEN) {
1882 		SPDK_ERRLOG("lvol %s: Invalid esnap ID length (%u)\n", lvol->unique_id, id_len);
1883 		return -EINVAL;
1884 	}
1885 
1886 	if (spdk_uuid_parse(&uuid, esnap_id)) {
1887 		SPDK_ERRLOG("lvol %s: Invalid esnap ID: not a UUID\n", lvol->unique_id);
1888 		return -EINVAL;
1889 	}
1890 
1891 	/* Format the UUID the same as it is in the bdev names tree. */
1892 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &uuid);
1893 	if (strcmp(uuid_str, esnap_id) != 0) {
1894 		SPDK_WARNLOG("lvol %s: esnap_id '%*s' does not match parsed uuid '%s'\n",
1895 			     lvol->unique_id, SPDK_UUID_STRING_LEN, (const char *)esnap_id,
1896 			     uuid_str);
1897 		assert(false);
1898 	}
1899 
1900 	rc = spdk_bdev_create_bs_dev(uuid_str, false, NULL, 0,
1901 				     vbdev_lvol_esnap_bdev_event_cb, NULL, &bs_dev);
1902 	if (rc != 0) {
1903 		goto fail;
1904 	}
1905 
1906 	rc = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
1907 	if (rc != 0) {
1908 		SPDK_ERRLOG("lvol %s: unable to claim esnap bdev '%s': %d\n", lvol->unique_id,
1909 			    uuid_str, rc);
1910 		bs_dev->destroy(bs_dev);
1911 		goto fail;
1912 	}
1913 
1914 	*_bs_dev = bs_dev;
1915 	return 0;
1916 
1917 fail:
1918 	/* Unable to open or claim the bdev. This lvol is degraded. */
1919 	bs_dev = &bs_dev_degraded;
1920 	SPDK_NOTICELOG("lvol %s: bdev %s not available: lvol is degraded\n", lvol->unique_id,
1921 		       uuid_str);
1922 
1923 	/*
1924 	 * Be sure not to call spdk_lvs_missing_add() on an lvol that is already degraded. This can
1925 	 * lead to a cycle in the degraded_lvols tailq.
1926 	 */
1927 	if (lvol->degraded_set == NULL) {
1928 		rc = spdk_lvs_esnap_missing_add(lvs, lvol, uuid_str, sizeof(uuid_str));
1929 		if (rc != 0) {
1930 			SPDK_NOTICELOG("lvol %s: unable to register missing esnap device %s: "
1931 				       "it will not be hotplugged if added later\n",
1932 				       lvol->unique_id, uuid_str);
1933 		}
1934 	}
1935 
1936 	*_bs_dev = bs_dev;
1937 	return 0;
1938 }
1939 
1940 /* End external snapshot support */
1941 
1942 SPDK_LOG_REGISTER_COMPONENT(vbdev_lvol)
1943