xref: /spdk/module/bdev/lvol/vbdev_lvol.c (revision f8410506e19c71a2a8979946bd5ca0314d2146d4)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/blob_bdev.h"
8 #include "spdk/rpc.h"
9 #include "spdk/bdev_module.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/uuid.h"
13 #include "spdk/blob.h"
14 
15 #include "vbdev_lvol.h"
16 
/* Per-I/O context of an lvol bdev; lvol_read()/lvol_write() obtain it from bdev_io->driver_ctx. */
struct vbdev_lvol_io {
	/* Extended blob I/O options (size and memory domain) filled in for each read/write. */
	struct spdk_blob_ext_io_opts ext_io_opts;
};
20 
/* List of every lvol store / base bdev pair currently tracked by this module. */
static TAILQ_HEAD(, lvol_store_bdev) g_spdk_lvol_pairs = TAILQ_HEAD_INITIALIZER(
			g_spdk_lvol_pairs);

/* Forward declarations of the module callbacks wired into g_lvol_if below. */
static int vbdev_lvs_init(void);
static void vbdev_lvs_fini_start(void);
static int vbdev_lvs_get_ctx_size(void);
static void vbdev_lvs_examine_config(struct spdk_bdev *bdev);
static void vbdev_lvs_examine_disk(struct spdk_bdev *bdev);
/* Read by _vbdev_lvol_unregister_cb() to decide whether closing an lvol should also unload
 * its lvol store. */
static bool g_shutdown_started = false;

/* Descriptor of the "lvol" bdev module. */
struct spdk_bdev_module g_lvol_if = {
	.name = "lvol",
	.module_init = vbdev_lvs_init,
	.fini_start = vbdev_lvs_fini_start,
	.async_fini_start = true,
	.examine_config = vbdev_lvs_examine_config,
	.examine_disk = vbdev_lvs_examine_disk,
	.get_ctx_size = vbdev_lvs_get_ctx_size,

};

SPDK_BDEV_MODULE_REGISTER(lvol, &g_lvol_if)

/* Forward declaration: _vbdev_lvs_remove_lvol_cb() calls this before its definition. */
static void _vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg);
45 
46 struct lvol_store_bdev *
47 vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig)
48 {
49 	struct spdk_lvol_store *lvs = NULL;
50 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
51 
52 	while (lvs_bdev != NULL) {
53 		lvs = lvs_bdev->lvs;
54 		if (lvs == lvs_orig) {
55 			if (lvs_bdev->removal_in_progress) {
56 				/* We do not allow access to lvs that are being unloaded or
57 				 * destroyed */
58 				SPDK_DEBUGLOG(vbdev_lvol, "lvs %s: removal in progress\n",
59 					      lvs_orig->name);
60 				return NULL;
61 			} else {
62 				return lvs_bdev;
63 			}
64 		}
65 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
66 	}
67 
68 	return NULL;
69 }
70 
71 static int
72 _vbdev_lvol_change_bdev_alias(struct spdk_lvol *lvol, const char *new_lvol_name)
73 {
74 	struct spdk_bdev_alias *tmp;
75 	char *old_alias;
76 	char *alias;
77 	int rc;
78 	int alias_number = 0;
79 
80 	/* bdev representing lvols have only one alias,
81 	 * while we changed lvs name earlier, we have to iterate alias list to get one,
82 	 * and check if there is only one alias */
83 
84 	TAILQ_FOREACH(tmp, spdk_bdev_get_aliases(lvol->bdev), tailq) {
85 		if (++alias_number > 1) {
86 			SPDK_ERRLOG("There is more than 1 alias in bdev %s\n", lvol->bdev->name);
87 			return -EINVAL;
88 		}
89 
90 		old_alias = tmp->alias.name;
91 	}
92 
93 	if (alias_number == 0) {
94 		SPDK_ERRLOG("There are no aliases in bdev %s\n", lvol->bdev->name);
95 		return -EINVAL;
96 	}
97 
98 	alias = spdk_sprintf_alloc("%s/%s", lvol->lvol_store->name, new_lvol_name);
99 	if (alias == NULL) {
100 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
101 		return -ENOMEM;
102 	}
103 
104 	rc = spdk_bdev_alias_add(lvol->bdev, alias);
105 	if (rc != 0) {
106 		SPDK_ERRLOG("cannot add alias '%s'\n", alias);
107 		free(alias);
108 		return rc;
109 	}
110 	free(alias);
111 
112 	rc = spdk_bdev_alias_del(lvol->bdev, old_alias);
113 	if (rc != 0) {
114 		SPDK_ERRLOG("cannot remove alias '%s'\n", old_alias);
115 		return rc;
116 	}
117 
118 	return 0;
119 }
120 
121 static struct lvol_store_bdev *
122 vbdev_get_lvs_bdev_by_bdev(struct spdk_bdev *bdev_orig)
123 {
124 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
125 
126 	while (lvs_bdev != NULL) {
127 		if (lvs_bdev->bdev == bdev_orig) {
128 			if (lvs_bdev->removal_in_progress) {
129 				/* We do not allow access to lvs that are being unloaded or
130 				 * destroyed */
131 				SPDK_DEBUGLOG(vbdev_lvol, "lvs %s: removal in progress\n",
132 					      lvs_bdev->lvs->name);
133 				return NULL;
134 			} else {
135 				return lvs_bdev;
136 			}
137 		}
138 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
139 	}
140 
141 	return NULL;
142 }
143 
144 static void
145 vbdev_lvs_hotremove_cb(struct spdk_bdev *bdev)
146 {
147 	struct lvol_store_bdev *lvs_bdev;
148 
149 	lvs_bdev = vbdev_get_lvs_bdev_by_bdev(bdev);
150 	if (lvs_bdev != NULL) {
151 		SPDK_NOTICELOG("bdev %s being removed: closing lvstore %s\n",
152 			       spdk_bdev_get_name(bdev), lvs_bdev->lvs->name);
153 		vbdev_lvs_unload(lvs_bdev->lvs, NULL, NULL);
154 	}
155 }
156 
157 static void
158 vbdev_lvs_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
159 			     void *event_ctx)
160 {
161 	switch (type) {
162 	case SPDK_BDEV_EVENT_REMOVE:
163 		vbdev_lvs_hotremove_cb(bdev);
164 		break;
165 	default:
166 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
167 		break;
168 	}
169 }
170 
171 static void
172 _vbdev_lvs_create_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
173 {
174 	struct spdk_lvs_with_handle_req *req = cb_arg;
175 	struct lvol_store_bdev *lvs_bdev;
176 	struct spdk_bdev *bdev = req->base_bdev;
177 	struct spdk_bs_dev *bs_dev = req->bs_dev;
178 
179 	if (lvserrno != 0) {
180 		assert(lvs == NULL);
181 		SPDK_ERRLOG("Cannot create lvol store bdev\n");
182 		goto end;
183 	}
184 
185 	lvserrno = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
186 	if (lvserrno != 0) {
187 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
188 		req->bs_dev->destroy(req->bs_dev);
189 		goto end;
190 	}
191 
192 	assert(lvs != NULL);
193 
194 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
195 	if (!lvs_bdev) {
196 		lvserrno = -ENOMEM;
197 		goto end;
198 	}
199 	lvs_bdev->lvs = lvs;
200 	lvs_bdev->bdev = bdev;
201 	lvs_bdev->req = NULL;
202 
203 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
204 	SPDK_INFOLOG(vbdev_lvol, "Lvol store bdev inserted\n");
205 
206 end:
207 	req->cb_fn(req->cb_arg, lvs, lvserrno);
208 	free(req);
209 
210 	return;
211 }
212 
213 int
214 vbdev_lvs_create(const char *base_bdev_name, const char *name, uint32_t cluster_sz,
215 		 enum lvs_clear_method clear_method, uint32_t num_md_pages_per_cluster_ratio,
216 		 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
217 {
218 	struct spdk_bs_dev *bs_dev;
219 	struct spdk_lvs_with_handle_req *lvs_req;
220 	struct spdk_lvs_opts opts;
221 	int rc;
222 	int len;
223 
224 	if (base_bdev_name == NULL) {
225 		SPDK_ERRLOG("missing base_bdev_name param\n");
226 		return -EINVAL;
227 	}
228 
229 	spdk_lvs_opts_init(&opts);
230 	if (cluster_sz != 0) {
231 		opts.cluster_sz = cluster_sz;
232 	}
233 
234 	if (clear_method != 0) {
235 		opts.clear_method = clear_method;
236 	}
237 
238 	if (num_md_pages_per_cluster_ratio != 0) {
239 		opts.num_md_pages_per_cluster_ratio = num_md_pages_per_cluster_ratio;
240 	}
241 
242 	if (name == NULL) {
243 		SPDK_ERRLOG("missing name param\n");
244 		return -EINVAL;
245 	}
246 
247 	len = strnlen(name, SPDK_LVS_NAME_MAX);
248 
249 	if (len == 0 || len == SPDK_LVS_NAME_MAX) {
250 		SPDK_ERRLOG("name must be between 1 and %d characters\n", SPDK_LVS_NAME_MAX - 1);
251 		return -EINVAL;
252 	}
253 	snprintf(opts.name, sizeof(opts.name), "%s", name);
254 	opts.esnap_bs_dev_create = vbdev_lvol_esnap_dev_create;
255 
256 	lvs_req = calloc(1, sizeof(*lvs_req));
257 	if (!lvs_req) {
258 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
259 		return -ENOMEM;
260 	}
261 
262 	rc = spdk_bdev_create_bs_dev_ext(base_bdev_name, vbdev_lvs_base_bdev_event_cb,
263 					 NULL, &bs_dev);
264 	if (rc < 0) {
265 		SPDK_ERRLOG("Cannot create blobstore device\n");
266 		free(lvs_req);
267 		return rc;
268 	}
269 
270 	lvs_req->bs_dev = bs_dev;
271 	lvs_req->base_bdev = bs_dev->get_base_bdev(bs_dev);
272 	lvs_req->cb_fn = cb_fn;
273 	lvs_req->cb_arg = cb_arg;
274 
275 	rc = spdk_lvs_init(bs_dev, &opts, _vbdev_lvs_create_cb, lvs_req);
276 	if (rc < 0) {
277 		free(lvs_req);
278 		bs_dev->destroy(bs_dev);
279 		return rc;
280 	}
281 
282 	return 0;
283 }
284 
285 static void
286 _vbdev_lvs_rename_cb(void *cb_arg, int lvserrno)
287 {
288 	struct spdk_lvs_req *req = cb_arg;
289 	struct spdk_lvol *tmp;
290 
291 	if (lvserrno != 0) {
292 		SPDK_INFOLOG(vbdev_lvol, "Lvol store rename failed\n");
293 	} else {
294 		TAILQ_FOREACH(tmp, &req->lvol_store->lvols, link) {
295 			/* We have to pass current lvol name, since only lvs name changed */
296 			_vbdev_lvol_change_bdev_alias(tmp, tmp->name);
297 		}
298 	}
299 
300 	req->cb_fn(req->cb_arg, lvserrno);
301 	free(req);
302 }
303 
304 void
305 vbdev_lvs_rename(struct spdk_lvol_store *lvs, const char *new_lvs_name,
306 		 spdk_lvs_op_complete cb_fn, void *cb_arg)
307 {
308 	struct lvol_store_bdev *lvs_bdev;
309 
310 	struct spdk_lvs_req *req;
311 
312 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
313 	if (!lvs_bdev) {
314 		SPDK_ERRLOG("No such lvol store found\n");
315 		cb_fn(cb_arg, -ENODEV);
316 		return;
317 	}
318 
319 	req = calloc(1, sizeof(*req));
320 	if (!req) {
321 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
322 		cb_fn(cb_arg, -ENOMEM);
323 		return;
324 	}
325 	req->cb_fn = cb_fn;
326 	req->cb_arg = cb_arg;
327 	req->lvol_store = lvs;
328 
329 	spdk_lvs_rename(lvs, new_lvs_name, _vbdev_lvs_rename_cb, req);
330 }
331 
332 static void
333 _vbdev_lvs_remove_cb(void *cb_arg, int lvserrno)
334 {
335 	struct lvol_store_bdev *lvs_bdev = cb_arg;
336 	struct spdk_lvs_req *req = lvs_bdev->req;
337 
338 	if (lvserrno != 0) {
339 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
340 	}
341 
342 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
343 	free(lvs_bdev);
344 
345 	if (req->cb_fn != NULL) {
346 		req->cb_fn(req->cb_arg, lvserrno);
347 	}
348 	free(req);
349 }
350 
351 static void
352 _vbdev_lvs_remove_lvol_cb(void *cb_arg, int lvolerrno)
353 {
354 	struct lvol_store_bdev *lvs_bdev = cb_arg;
355 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
356 	struct spdk_lvol *lvol;
357 
358 	if (lvolerrno != 0) {
359 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol removed with errno %d\n", lvolerrno);
360 	}
361 
362 	if (TAILQ_EMPTY(&lvs->lvols)) {
363 		spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
364 		return;
365 	}
366 
367 	lvol = TAILQ_FIRST(&lvs->lvols);
368 	while (lvol != NULL) {
369 		if (spdk_lvol_deletable(lvol)) {
370 			_vbdev_lvol_destroy(lvol, _vbdev_lvs_remove_lvol_cb, lvs_bdev);
371 			return;
372 		}
373 		lvol = TAILQ_NEXT(lvol, link);
374 	}
375 
376 	/* If no lvol is deletable, that means there is circular dependency. */
377 	SPDK_ERRLOG("Lvols left in lvs, but unable to delete.\n");
378 	assert(false);
379 }
380 
381 static bool
382 _vbdev_lvs_are_lvols_closed(struct spdk_lvol_store *lvs)
383 {
384 	struct spdk_lvol *lvol;
385 
386 	TAILQ_FOREACH(lvol, &lvs->lvols, link) {
387 		if (lvol->ref_count != 0) {
388 			return false;
389 		}
390 	}
391 	return true;
392 }
393 
394 static void
395 _vbdev_lvs_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
396 {
397 	struct lvol_store_bdev *lvs_bdev = cb_arg;
398 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
399 
400 	if (bdeverrno != 0) {
401 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
402 	}
403 
404 	/* Lvol store can be unloaded once all lvols are closed. */
405 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
406 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
407 	}
408 }
409 
410 static void
411 _vbdev_lvs_remove(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg,
412 		  bool destroy)
413 {
414 	struct spdk_lvs_req *req;
415 	struct lvol_store_bdev *lvs_bdev;
416 	struct spdk_lvol *lvol, *tmp;
417 
418 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
419 	if (!lvs_bdev) {
420 		SPDK_ERRLOG("No such lvol store found\n");
421 		if (cb_fn != NULL) {
422 			cb_fn(cb_arg, -ENODEV);
423 		}
424 		return;
425 	}
426 
427 	req = calloc(1, sizeof(*req));
428 	if (!req) {
429 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
430 		if (cb_fn != NULL) {
431 			cb_fn(cb_arg, -ENOMEM);
432 		}
433 		return;
434 	}
435 
436 	lvs_bdev->removal_in_progress = true;
437 
438 	req->cb_fn = cb_fn;
439 	req->cb_arg = cb_arg;
440 	lvs_bdev->req = req;
441 
442 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
443 		if (destroy) {
444 			spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
445 			return;
446 		}
447 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
448 		return;
449 	}
450 	if (destroy) {
451 		_vbdev_lvs_remove_lvol_cb(lvs_bdev, 0);
452 		return;
453 	}
454 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
455 		if (lvol->bdev == NULL) {
456 			spdk_lvol_close(lvol, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
457 			continue;
458 		}
459 		spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
460 	}
461 }
462 
463 void
464 vbdev_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
465 {
466 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, false);
467 }
468 
469 void
470 vbdev_lvs_destruct(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
471 {
472 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, true);
473 }
474 
475 struct lvol_store_bdev *
476 vbdev_lvol_store_first(void)
477 {
478 	struct lvol_store_bdev *lvs_bdev;
479 
480 	lvs_bdev = TAILQ_FIRST(&g_spdk_lvol_pairs);
481 	if (lvs_bdev) {
482 		SPDK_INFOLOG(vbdev_lvol, "Starting lvolstore iteration at %p\n", lvs_bdev->lvs);
483 	}
484 
485 	return lvs_bdev;
486 }
487 
488 struct lvol_store_bdev *
489 vbdev_lvol_store_next(struct lvol_store_bdev *prev)
490 {
491 	struct lvol_store_bdev *lvs_bdev;
492 
493 	if (prev == NULL) {
494 		SPDK_ERRLOG("prev argument cannot be NULL\n");
495 		return NULL;
496 	}
497 
498 	lvs_bdev = TAILQ_NEXT(prev, lvol_stores);
499 	if (lvs_bdev) {
500 		SPDK_INFOLOG(vbdev_lvol, "Continuing lvolstore iteration at %p\n", lvs_bdev->lvs);
501 	}
502 
503 	return lvs_bdev;
504 }
505 
506 static struct spdk_lvol_store *
507 _vbdev_get_lvol_store_by_uuid(const struct spdk_uuid *uuid)
508 {
509 	struct spdk_lvol_store *lvs = NULL;
510 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
511 
512 	while (lvs_bdev != NULL) {
513 		lvs = lvs_bdev->lvs;
514 		if (spdk_uuid_compare(&lvs->uuid, uuid) == 0) {
515 			return lvs;
516 		}
517 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
518 	}
519 	return NULL;
520 }
521 
522 struct spdk_lvol_store *
523 vbdev_get_lvol_store_by_uuid(const char *uuid_str)
524 {
525 	struct spdk_uuid uuid;
526 
527 	if (spdk_uuid_parse(&uuid, uuid_str)) {
528 		return NULL;
529 	}
530 
531 	return _vbdev_get_lvol_store_by_uuid(&uuid);
532 }
533 
534 struct spdk_lvol_store *
535 vbdev_get_lvol_store_by_name(const char *name)
536 {
537 	struct spdk_lvol_store *lvs = NULL;
538 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
539 
540 	while (lvs_bdev != NULL) {
541 		lvs = lvs_bdev->lvs;
542 		if (strncmp(lvs->name, name, sizeof(lvs->name)) == 0) {
543 			return lvs;
544 		}
545 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
546 	}
547 	return NULL;
548 }
549 
/* Context carried from _vbdev_lvol_destroy() to _vbdev_lvol_destroy_cb(). */
struct vbdev_lvol_destroy_ctx {
	/* Lvol that is being destroyed. */
	struct spdk_lvol *lvol;
	/* Caller's completion callback and its argument. */
	spdk_lvol_op_complete cb_fn;
	void *cb_arg;
};
555 
556 static void
557 _vbdev_lvol_unregister_unload_lvs(void *cb_arg, int lvserrno)
558 {
559 	struct lvol_bdev *lvol_bdev = cb_arg;
560 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
561 
562 	if (lvserrno != 0) {
563 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
564 	}
565 
566 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
567 	free(lvs_bdev);
568 
569 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvserrno);
570 	free(lvol_bdev);
571 }
572 
573 static void
574 _vbdev_lvol_unregister_cb(void *ctx, int lvolerrno)
575 {
576 	struct lvol_bdev *lvol_bdev = ctx;
577 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
578 
579 	if (g_shutdown_started && _vbdev_lvs_are_lvols_closed(lvs_bdev->lvs)) {
580 		spdk_lvs_unload(lvs_bdev->lvs, _vbdev_lvol_unregister_unload_lvs, lvol_bdev);
581 		return;
582 	}
583 
584 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvolerrno);
585 	free(lvol_bdev);
586 }
587 
588 static int
589 vbdev_lvol_unregister(void *ctx)
590 {
591 	struct spdk_lvol *lvol = ctx;
592 	struct lvol_bdev *lvol_bdev;
593 
594 	assert(lvol != NULL);
595 	lvol_bdev = SPDK_CONTAINEROF(lvol->bdev, struct lvol_bdev, bdev);
596 
597 	spdk_bdev_alias_del_all(lvol->bdev);
598 	spdk_lvol_close(lvol, _vbdev_lvol_unregister_cb, lvol_bdev);
599 
600 	/* return 1 to indicate we have an operation that must finish asynchronously before the
601 	 *  lvol is closed
602 	 */
603 	return 1;
604 }
605 
606 static void
607 _vbdev_lvol_destroy_cb(void *cb_arg, int bdeverrno)
608 {
609 	struct vbdev_lvol_destroy_ctx *ctx = cb_arg;
610 	struct spdk_lvol *lvol = ctx->lvol;
611 
612 	if (bdeverrno < 0) {
613 		SPDK_INFOLOG(vbdev_lvol, "Could not unregister bdev during lvol (%s) destroy\n",
614 			     lvol->unique_id);
615 		ctx->cb_fn(ctx->cb_arg, bdeverrno);
616 		free(ctx);
617 		return;
618 	}
619 
620 	spdk_lvol_destroy(lvol, ctx->cb_fn, ctx->cb_arg);
621 	free(ctx);
622 }
623 
624 static void
625 _vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
626 {
627 	struct vbdev_lvol_destroy_ctx *ctx;
628 	size_t count;
629 
630 	assert(lvol != NULL);
631 	assert(cb_fn != NULL);
632 
633 	/* Callers other than _vbdev_lvs_remove() must ensure the lvstore is not being removed. */
634 	assert(cb_fn == _vbdev_lvs_remove_lvol_cb ||
635 	       vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store) != NULL);
636 
637 	/* Check if it is possible to delete lvol */
638 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
639 	if (count > 1) {
640 		/* throw an error */
641 		SPDK_ERRLOG("Cannot delete lvol\n");
642 		cb_fn(cb_arg, -EPERM);
643 		return;
644 	}
645 
646 	ctx = calloc(1, sizeof(*ctx));
647 	if (!ctx) {
648 		cb_fn(cb_arg, -ENOMEM);
649 		return;
650 	}
651 
652 	ctx->lvol = lvol;
653 	ctx->cb_fn = cb_fn;
654 	ctx->cb_arg = cb_arg;
655 
656 	if (spdk_lvol_is_degraded(lvol)) {
657 		spdk_lvol_close(lvol, _vbdev_lvol_destroy_cb, ctx);
658 		return;
659 	}
660 
661 	spdk_bdev_unregister(lvol->bdev, _vbdev_lvol_destroy_cb, ctx);
662 }
663 
664 void
665 vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
666 {
667 	struct lvol_store_bdev *lvs_bdev;
668 
669 	/*
670 	 * During destruction of an lvolstore, _vbdev_lvs_unload() iterates through lvols until they
671 	 * are all deleted. There may be some IO required
672 	 */
673 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
674 	if (lvs_bdev == NULL) {
675 		SPDK_DEBUGLOG(vbdev_lvol, "lvol %s: lvolstore is being removed\n",
676 			      lvol->unique_id);
677 		cb_fn(cb_arg, -ENODEV);
678 		return;
679 	}
680 
681 	_vbdev_lvol_destroy(lvol, cb_fn, cb_arg);
682 }
683 
684 static char *
685 vbdev_lvol_find_name(struct spdk_lvol *lvol, spdk_blob_id blob_id)
686 {
687 	struct spdk_lvol_store *lvs;
688 	struct spdk_lvol *_lvol;
689 
690 	assert(lvol != NULL);
691 
692 	lvs = lvol->lvol_store;
693 
694 	assert(lvs);
695 
696 	TAILQ_FOREACH(_lvol, &lvs->lvols, link) {
697 		if (_lvol->blob_id == blob_id) {
698 			return _lvol->name;
699 		}
700 	}
701 
702 	return NULL;
703 }
704 
705 static int
706 vbdev_lvol_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
707 {
708 	struct spdk_lvol *lvol = ctx;
709 	struct lvol_store_bdev *lvs_bdev;
710 	struct spdk_bdev *bdev;
711 	struct spdk_blob *blob;
712 	spdk_blob_id *ids = NULL;
713 	size_t count, i;
714 	char *name;
715 	int rc = 0;
716 
717 	spdk_json_write_named_object_begin(w, "lvol");
718 
719 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
720 	if (!lvs_bdev) {
721 		SPDK_ERRLOG("No such lvol store found\n");
722 		rc = -ENODEV;
723 		goto end;
724 	}
725 
726 	bdev = lvs_bdev->bdev;
727 
728 	spdk_json_write_named_uuid(w, "lvol_store_uuid", &lvol->lvol_store->uuid);
729 
730 	spdk_json_write_named_string(w, "base_bdev", spdk_bdev_get_name(bdev));
731 
732 	blob = lvol->blob;
733 
734 	spdk_json_write_named_bool(w, "thin_provision", spdk_blob_is_thin_provisioned(blob));
735 
736 	spdk_json_write_named_bool(w, "snapshot", spdk_blob_is_snapshot(blob));
737 
738 	spdk_json_write_named_bool(w, "clone", spdk_blob_is_clone(blob));
739 
740 	if (spdk_blob_is_clone(blob)) {
741 		spdk_blob_id snapshotid = spdk_blob_get_parent_snapshot(lvol->lvol_store->blobstore, lvol->blob_id);
742 		if (snapshotid != SPDK_BLOBID_INVALID) {
743 			name = vbdev_lvol_find_name(lvol, snapshotid);
744 			if (name != NULL) {
745 				spdk_json_write_named_string(w, "base_snapshot", name);
746 			} else {
747 				SPDK_ERRLOG("Cannot obtain snapshots name\n");
748 			}
749 		}
750 	}
751 
752 	if (spdk_blob_is_snapshot(blob)) {
753 		/* Take a number of clones */
754 		rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
755 		if (rc == -ENOMEM && count > 0) {
756 			ids = malloc(sizeof(spdk_blob_id) * count);
757 			if (ids == NULL) {
758 				SPDK_ERRLOG("Cannot allocate memory\n");
759 				rc = -ENOMEM;
760 				goto end;
761 			}
762 
763 			rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, ids, &count);
764 			if (rc == 0) {
765 				spdk_json_write_named_array_begin(w, "clones");
766 				for (i = 0; i < count; i++) {
767 					name = vbdev_lvol_find_name(lvol, ids[i]);
768 					if (name != NULL) {
769 						spdk_json_write_string(w, name);
770 					} else {
771 						SPDK_ERRLOG("Cannot obtain clone name\n");
772 					}
773 
774 				}
775 				spdk_json_write_array_end(w);
776 			}
777 			free(ids);
778 		}
779 
780 	}
781 
782 	spdk_json_write_named_bool(w, "esnap_clone", spdk_blob_is_esnap_clone(blob));
783 
784 	if (spdk_blob_is_esnap_clone(blob)) {
785 		const char *name;
786 		size_t name_len;
787 
788 		rc = spdk_blob_get_esnap_id(blob, (const void **)&name, &name_len);
789 		if (rc == 0 && name != NULL && strnlen(name, name_len) + 1 == name_len) {
790 			spdk_json_write_named_string(w, "external_snapshot_name", name);
791 		}
792 	}
793 
794 end:
795 	spdk_json_write_object_end(w);
796 
797 	return rc;
798 }
799 
/*
 * write_config_json callback (see vbdev_lvol_fn_table). Intentionally empty:
 * nothing is written to w for an lvol bdev.
 */
static void
vbdev_lvol_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	/* Nothing to dump as lvol configuration is saved on physical device. */
}
805 
/*
 * get_io_channel callback (see vbdev_lvol_fn_table): ctx is the lvol, and the
 * channel is whatever spdk_lvol_get_io_channel() returns for it.
 */
static struct spdk_io_channel *
vbdev_lvol_get_io_channel(void *ctx)
{
	return spdk_lvol_get_io_channel((struct spdk_lvol *)ctx);
}
813 
814 static bool
815 vbdev_lvol_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
816 {
817 	struct spdk_lvol *lvol = ctx;
818 
819 	switch (io_type) {
820 	case SPDK_BDEV_IO_TYPE_WRITE:
821 	case SPDK_BDEV_IO_TYPE_UNMAP:
822 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
823 		return !spdk_blob_is_read_only(lvol->blob);
824 	case SPDK_BDEV_IO_TYPE_RESET:
825 	case SPDK_BDEV_IO_TYPE_READ:
826 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
827 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
828 		return true;
829 	default:
830 		return false;
831 	}
832 }
833 
834 static void
835 lvol_op_comp(void *cb_arg, int bserrno)
836 {
837 	struct spdk_bdev_io *bdev_io = cb_arg;
838 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_SUCCESS;
839 
840 	if (bserrno != 0) {
841 		if (bserrno == -ENOMEM) {
842 			status = SPDK_BDEV_IO_STATUS_NOMEM;
843 		} else {
844 			status = SPDK_BDEV_IO_STATUS_FAILED;
845 		}
846 	}
847 
848 	spdk_bdev_io_complete(bdev_io, status);
849 }
850 
851 static void
852 lvol_unmap(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
853 {
854 	uint64_t start_page, num_pages;
855 	struct spdk_blob *blob = lvol->blob;
856 
857 	start_page = bdev_io->u.bdev.offset_blocks;
858 	num_pages = bdev_io->u.bdev.num_blocks;
859 
860 	spdk_blob_io_unmap(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
861 }
862 
863 static void
864 lvol_seek_data(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
865 {
866 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_allocated_io_unit(lvol->blob,
867 				      bdev_io->u.bdev.offset_blocks);
868 
869 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
870 }
871 
872 static void
873 lvol_seek_hole(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
874 {
875 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_unallocated_io_unit(lvol->blob,
876 				      bdev_io->u.bdev.offset_blocks);
877 
878 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
879 }
880 
881 static void
882 lvol_write_zeroes(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
883 {
884 	uint64_t start_page, num_pages;
885 	struct spdk_blob *blob = lvol->blob;
886 
887 	start_page = bdev_io->u.bdev.offset_blocks;
888 	num_pages = bdev_io->u.bdev.num_blocks;
889 
890 	spdk_blob_io_write_zeroes(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
891 }
892 
893 static void
894 lvol_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
895 {
896 	uint64_t start_page, num_pages;
897 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
898 	struct spdk_blob *blob = lvol->blob;
899 	struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
900 
901 	start_page = bdev_io->u.bdev.offset_blocks;
902 	num_pages = bdev_io->u.bdev.num_blocks;
903 
904 	lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
905 	lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.memory_domain;
906 	lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.memory_domain_ctx;
907 
908 	spdk_blob_io_readv_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
909 			       num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
910 }
911 
912 static void
913 lvol_write(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
914 {
915 	uint64_t start_page, num_pages;
916 	struct spdk_blob *blob = lvol->blob;
917 	struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
918 
919 	start_page = bdev_io->u.bdev.offset_blocks;
920 	num_pages = bdev_io->u.bdev.num_blocks;
921 
922 	lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
923 	lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.memory_domain;
924 	lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.memory_domain_ctx;
925 
926 	spdk_blob_io_writev_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
927 				num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
928 }
929 
930 static int
931 lvol_reset(struct spdk_bdev_io *bdev_io)
932 {
933 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
934 
935 	return 0;
936 }
937 
938 static void
939 lvol_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
940 {
941 	if (!success) {
942 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
943 		return;
944 	}
945 
946 	lvol_read(ch, bdev_io);
947 }
948 
949 static void
950 vbdev_lvol_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
951 {
952 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
953 
954 	switch (bdev_io->type) {
955 	case SPDK_BDEV_IO_TYPE_READ:
956 		spdk_bdev_io_get_buf(bdev_io, lvol_get_buf_cb,
957 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
958 		break;
959 	case SPDK_BDEV_IO_TYPE_WRITE:
960 		lvol_write(lvol, ch, bdev_io);
961 		break;
962 	case SPDK_BDEV_IO_TYPE_RESET:
963 		lvol_reset(bdev_io);
964 		break;
965 	case SPDK_BDEV_IO_TYPE_UNMAP:
966 		lvol_unmap(lvol, ch, bdev_io);
967 		break;
968 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
969 		lvol_write_zeroes(lvol, ch, bdev_io);
970 		break;
971 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
972 		lvol_seek_data(lvol, bdev_io);
973 		break;
974 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
975 		lvol_seek_hole(lvol, bdev_io);
976 		break;
977 	default:
978 		SPDK_INFOLOG(vbdev_lvol, "lvol: unsupported I/O type %d\n", bdev_io->type);
979 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
980 		return;
981 	}
982 	return;
983 }
984 
985 static int
986 vbdev_lvol_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
987 {
988 	struct spdk_lvol *lvol = ctx;
989 	struct spdk_bdev *base_bdev, *esnap_bdev;
990 	struct spdk_bs_dev *bs_dev;
991 	struct spdk_lvol_store *lvs;
992 	int base_cnt, esnap_cnt;
993 
994 	lvs = lvol->lvol_store;
995 	base_bdev = lvs->bs_dev->get_base_bdev(lvol->lvol_store->bs_dev);
996 
997 	base_cnt = spdk_bdev_get_memory_domains(base_bdev, domains, array_size);
998 	if (base_cnt < 0) {
999 		return base_cnt;
1000 	}
1001 
1002 	if (lvol->blob == NULL) {
1003 		/*
1004 		 * This is probably called due to an open happening during blobstore load. Another
1005 		 * open will follow shortly that has lvol->blob set.
1006 		 */
1007 		return -EAGAIN;
1008 	}
1009 
1010 	if (!spdk_blob_is_esnap_clone(lvol->blob)) {
1011 		return base_cnt;
1012 	}
1013 
1014 	bs_dev = spdk_blob_get_esnap_bs_dev(lvol->blob);
1015 	if (bs_dev == NULL) {
1016 		assert(false);
1017 		SPDK_ERRLOG("lvol %s is an esnap clone but has no esnap device\n", lvol->unique_id);
1018 		return base_cnt;
1019 	}
1020 
1021 	if (bs_dev->get_base_bdev == NULL) {
1022 		/*
1023 		 * If this were a blob_bdev, we wouldn't be here. We are probably here because an
1024 		 * lvol bdev is being registered with spdk_bdev_register() before the external
1025 		 * snapshot bdev is loaded. Ideally, the load of a missing esnap would trigger an
1026 		 * event that causes the lvol bdev's memory domain information to be updated.
1027 		 */
1028 		return base_cnt;
1029 	}
1030 
1031 	esnap_bdev = bs_dev->get_base_bdev(bs_dev);
1032 	if (esnap_bdev == NULL) {
1033 		/*
1034 		 * The esnap bdev has not yet been loaded. Anyone that has opened at this point may
1035 		 * miss out on using memory domains if base_cnt is zero.
1036 		 */
1037 		SPDK_NOTICELOG("lvol %s reporting %d memory domains, not including missing esnap\n",
1038 			       lvol->unique_id, base_cnt);
1039 		return base_cnt;
1040 	}
1041 
1042 	if (base_cnt < array_size) {
1043 		array_size -= base_cnt;
1044 		domains += base_cnt;
1045 	} else {
1046 		array_size = 0;
1047 		domains = NULL;
1048 	}
1049 
1050 	esnap_cnt = spdk_bdev_get_memory_domains(esnap_bdev, domains, array_size);
1051 	if (esnap_cnt <= 0) {
1052 		return base_cnt;
1053 	}
1054 
1055 	return base_cnt + esnap_cnt;
1056 }
1057 
/* Function table installed on every lvol bdev (assigned to bdev->fn_table in _create_lvol_disk()). */
static struct spdk_bdev_fn_table vbdev_lvol_fn_table = {
	.destruct		= vbdev_lvol_unregister,
	.io_type_supported	= vbdev_lvol_io_type_supported,
	.submit_request		= vbdev_lvol_submit_request,
	.get_io_channel		= vbdev_lvol_get_io_channel,
	.dump_info_json		= vbdev_lvol_dump_info_json,
	.write_config_json	= vbdev_lvol_write_config_json,
	.get_memory_domains	= vbdev_lvol_get_memory_domains,
};
1067 
/*
 * Intentionally empty completion callback: _create_lvol_disk_destroy_cb() passes it to
 * spdk_lvol_destroy() and the result of that destroy is not acted upon.
 */
static void
lvol_destroy_cb(void *cb_arg, int bdeverrno)
{
}
1072 
1073 static void
1074 _create_lvol_disk_destroy_cb(void *cb_arg, int bdeverrno)
1075 {
1076 	struct spdk_lvol *lvol = cb_arg;
1077 
1078 	if (bdeverrno < 0) {
1079 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
1080 			    lvol->unique_id);
1081 		return;
1082 	}
1083 
1084 	spdk_lvol_destroy(lvol, lvol_destroy_cb, NULL);
1085 }
1086 
1087 static void
1088 _create_lvol_disk_unload_cb(void *cb_arg, int bdeverrno)
1089 {
1090 	struct spdk_lvol *lvol = cb_arg;
1091 
1092 	if (bdeverrno < 0) {
1093 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
1094 			    lvol->unique_id);
1095 		return;
1096 	}
1097 
1098 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
1099 	free(lvol);
1100 }
1101 
1102 static int
1103 _create_lvol_disk(struct spdk_lvol *lvol, bool destroy)
1104 {
1105 	struct spdk_bdev *bdev;
1106 	struct lvol_bdev *lvol_bdev;
1107 	struct lvol_store_bdev *lvs_bdev;
1108 	uint64_t total_size;
1109 	unsigned char *alias;
1110 	int rc;
1111 
1112 	if (spdk_lvol_is_degraded(lvol)) {
1113 		SPDK_NOTICELOG("lvol %s: blob is degraded: deferring bdev creation\n",
1114 			       lvol->unique_id);
1115 		return 0;
1116 	}
1117 
1118 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
1119 	if (lvs_bdev == NULL) {
1120 		SPDK_ERRLOG("No spdk lvs-bdev pair found for lvol %s\n", lvol->unique_id);
1121 		assert(false);
1122 		return -ENODEV;
1123 	}
1124 
1125 	lvol_bdev = calloc(1, sizeof(struct lvol_bdev));
1126 	if (!lvol_bdev) {
1127 		SPDK_ERRLOG("Cannot alloc memory for lvol bdev\n");
1128 		return -ENOMEM;
1129 	}
1130 
1131 	lvol_bdev->lvol = lvol;
1132 	lvol_bdev->lvs_bdev = lvs_bdev;
1133 
1134 	bdev = &lvol_bdev->bdev;
1135 	bdev->name = lvol->unique_id;
1136 	bdev->product_name = "Logical Volume";
1137 	bdev->blocklen = spdk_bs_get_io_unit_size(lvol->lvol_store->blobstore);
1138 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1139 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1140 	assert((total_size % bdev->blocklen) == 0);
1141 	bdev->blockcnt = total_size / bdev->blocklen;
1142 	bdev->uuid = lvol->uuid;
1143 	bdev->required_alignment = lvs_bdev->bdev->required_alignment;
1144 	bdev->split_on_optimal_io_boundary = true;
1145 	bdev->optimal_io_boundary = spdk_bs_get_cluster_size(lvol->lvol_store->blobstore) / bdev->blocklen;
1146 
1147 	bdev->ctxt = lvol;
1148 	bdev->fn_table = &vbdev_lvol_fn_table;
1149 	bdev->module = &g_lvol_if;
1150 
1151 	/* Set default bdev reset waiting time. This value indicates how much
1152 	 * time a reset should wait before forcing a reset down to the underlying
1153 	 * bdev module.
1154 	 * Setting this parameter is mainly to avoid "empty" resets to a shared
1155 	 * bdev that may be used by multiple lvols. */
1156 	bdev->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
1157 
1158 	rc = spdk_bdev_register(bdev);
1159 	if (rc) {
1160 		free(lvol_bdev);
1161 		return rc;
1162 	}
1163 	lvol->bdev = bdev;
1164 
1165 	alias = spdk_sprintf_alloc("%s/%s", lvs_bdev->lvs->name, lvol->name);
1166 	if (alias == NULL) {
1167 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
1168 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1169 						  _create_lvol_disk_unload_cb), lvol);
1170 		return -ENOMEM;
1171 	}
1172 
1173 	rc = spdk_bdev_alias_add(bdev, alias);
1174 	if (rc != 0) {
1175 		SPDK_ERRLOG("Cannot add alias to lvol bdev\n");
1176 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1177 						  _create_lvol_disk_unload_cb), lvol);
1178 	}
1179 	free(alias);
1180 
1181 	return rc;
1182 }
1183 
1184 static void
1185 _vbdev_lvol_create_cb(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1186 {
1187 	struct spdk_lvol_with_handle_req *req = cb_arg;
1188 
1189 	if (lvolerrno < 0) {
1190 		goto end;
1191 	}
1192 
1193 	lvolerrno = _create_lvol_disk(lvol, true);
1194 
1195 end:
1196 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
1197 	free(req);
1198 }
1199 
1200 int
1201 vbdev_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1202 		  bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1203 		  void *cb_arg)
1204 {
1205 	struct spdk_lvol_with_handle_req *req;
1206 	int rc;
1207 
1208 	req = calloc(1, sizeof(*req));
1209 	if (req == NULL) {
1210 		return -ENOMEM;
1211 	}
1212 	req->cb_fn = cb_fn;
1213 	req->cb_arg = cb_arg;
1214 
1215 	rc = spdk_lvol_create(lvs, name, sz, thin_provision, clear_method,
1216 			      _vbdev_lvol_create_cb, req);
1217 	if (rc != 0) {
1218 		free(req);
1219 	}
1220 
1221 	return rc;
1222 }
1223 
1224 void
1225 vbdev_lvol_create_snapshot(struct spdk_lvol *lvol, const char *snapshot_name,
1226 			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1227 {
1228 	struct spdk_lvol_with_handle_req *req;
1229 
1230 	req = calloc(1, sizeof(*req));
1231 	if (req == NULL) {
1232 		cb_fn(cb_arg, NULL, -ENOMEM);
1233 		return;
1234 	}
1235 
1236 	req->cb_fn = cb_fn;
1237 	req->cb_arg = cb_arg;
1238 
1239 	spdk_lvol_create_snapshot(lvol, snapshot_name, _vbdev_lvol_create_cb, req);
1240 }
1241 
1242 void
1243 vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
1244 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1245 {
1246 	struct spdk_lvol_with_handle_req *req;
1247 
1248 	req = calloc(1, sizeof(*req));
1249 	if (req == NULL) {
1250 		cb_fn(cb_arg, NULL, -ENOMEM);
1251 		return;
1252 	}
1253 
1254 	req->cb_fn = cb_fn;
1255 	req->cb_arg = cb_arg;
1256 
1257 	spdk_lvol_create_clone(lvol, clone_name, _vbdev_lvol_create_cb, req);
1258 }
1259 
1260 static void
1261 ignore_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
1262 {
1263 }
1264 
1265 void
1266 vbdev_lvol_create_bdev_clone(const char *esnap_name,
1267 			     struct spdk_lvol_store *lvs, const char *clone_name,
1268 			     spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1269 {
1270 	struct spdk_lvol_with_handle_req *req;
1271 	struct spdk_bdev_desc *desc;
1272 	struct spdk_bdev *bdev;
1273 	char bdev_uuid[SPDK_UUID_STRING_LEN];
1274 	uint64_t sz;
1275 	int rc;
1276 
1277 	if (lvs == NULL) {
1278 		SPDK_ERRLOG("lvol store not specified\n");
1279 		cb_fn(cb_arg, NULL, -EINVAL);
1280 		return;
1281 	}
1282 
1283 	rc = spdk_bdev_open_ext(esnap_name, false, ignore_bdev_event_cb, NULL, &desc);
1284 	if (rc != 0) {
1285 		SPDK_ERRLOG("bdev '%s' could not be opened: error %d\n", esnap_name, rc);
1286 		cb_fn(cb_arg, NULL, rc);
1287 		return;
1288 	}
1289 	bdev = spdk_bdev_desc_get_bdev(desc);
1290 
1291 	rc = spdk_uuid_fmt_lower(bdev_uuid, sizeof(bdev_uuid), spdk_bdev_get_uuid(bdev));
1292 	if (rc != 0) {
1293 		spdk_bdev_close(desc);
1294 		SPDK_ERRLOG("bdev %s: unable to parse UUID\n", esnap_name);
1295 		assert(false);
1296 		cb_fn(cb_arg, NULL, -ENODEV);
1297 		return;
1298 	}
1299 
1300 	req = calloc(1, sizeof(*req));
1301 	if (req == NULL) {
1302 		spdk_bdev_close(desc);
1303 		cb_fn(cb_arg, NULL, -ENOMEM);
1304 		return;
1305 	}
1306 
1307 	req->cb_fn = cb_fn;
1308 	req->cb_arg = cb_arg;
1309 
1310 	sz = spdk_bdev_get_num_blocks(bdev) * spdk_bdev_get_block_size(bdev);
1311 	rc = spdk_lvol_create_esnap_clone(bdev_uuid, sizeof(bdev_uuid), sz, lvs, clone_name,
1312 					  _vbdev_lvol_create_cb, req);
1313 	spdk_bdev_close(desc);
1314 	if (rc != 0) {
1315 		cb_fn(cb_arg, NULL, rc);
1316 		free(req);
1317 	}
1318 }
1319 
1320 static void
1321 _vbdev_lvol_rename_cb(void *cb_arg, int lvolerrno)
1322 {
1323 	struct spdk_lvol_req *req = cb_arg;
1324 
1325 	if (lvolerrno != 0) {
1326 		SPDK_ERRLOG("Renaming lvol failed\n");
1327 	}
1328 
1329 	req->cb_fn(req->cb_arg, lvolerrno);
1330 	free(req);
1331 }
1332 
1333 void
1334 vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
1335 		  spdk_lvol_op_complete cb_fn, void *cb_arg)
1336 {
1337 	struct spdk_lvol_req *req;
1338 	int rc;
1339 
1340 	rc = _vbdev_lvol_change_bdev_alias(lvol, new_lvol_name);
1341 	if (rc != 0) {
1342 		SPDK_ERRLOG("renaming lvol to '%s' does not succeed\n", new_lvol_name);
1343 		cb_fn(cb_arg, rc);
1344 		return;
1345 	}
1346 
1347 	req = calloc(1, sizeof(*req));
1348 	if (req == NULL) {
1349 		cb_fn(cb_arg, -ENOMEM);
1350 		return;
1351 	}
1352 	req->cb_fn = cb_fn;
1353 	req->cb_arg = cb_arg;
1354 
1355 	spdk_lvol_rename(lvol, new_lvol_name, _vbdev_lvol_rename_cb, req);
1356 }
1357 
1358 static void
1359 _vbdev_lvol_resize_cb(void *cb_arg, int lvolerrno)
1360 {
1361 	struct spdk_lvol_req *req = cb_arg;
1362 	struct spdk_lvol *lvol = req->lvol;
1363 	uint64_t total_size;
1364 
1365 	/* change bdev size */
1366 	if (lvolerrno != 0) {
1367 		SPDK_ERRLOG("CB function for bdev lvol %s receive error no: %d.\n", lvol->name, lvolerrno);
1368 		goto finish;
1369 	}
1370 
1371 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1372 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1373 	assert((total_size % lvol->bdev->blocklen) == 0);
1374 
1375 	lvolerrno = spdk_bdev_notify_blockcnt_change(lvol->bdev, total_size / lvol->bdev->blocklen);
1376 	if (lvolerrno != 0) {
1377 		SPDK_ERRLOG("Could not change num blocks for bdev lvol %s with error no: %d.\n",
1378 			    lvol->name, lvolerrno);
1379 	}
1380 
1381 finish:
1382 	req->cb_fn(req->cb_arg, lvolerrno);
1383 	free(req);
1384 }
1385 
1386 void
1387 vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn, void *cb_arg)
1388 {
1389 	struct spdk_lvol_req *req;
1390 
1391 	if (lvol == NULL) {
1392 		SPDK_ERRLOG("lvol does not exist\n");
1393 		cb_fn(cb_arg, -EINVAL);
1394 		return;
1395 	}
1396 
1397 	assert(lvol->bdev != NULL);
1398 
1399 	req = calloc(1, sizeof(*req));
1400 	if (req == NULL) {
1401 		cb_fn(cb_arg, -ENOMEM);
1402 		return;
1403 	}
1404 
1405 	req->cb_fn = cb_fn;
1406 	req->cb_arg = cb_arg;
1407 	req->sz = sz;
1408 	req->lvol = lvol;
1409 
1410 	spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
1411 }
1412 
1413 static void
1414 _vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1415 {
1416 	struct spdk_lvol_req *req = cb_arg;
1417 	struct spdk_lvol *lvol = req->lvol;
1418 
1419 	if (lvolerrno != 0) {
1420 		SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
1421 	}
1422 
1423 	req->cb_fn(req->cb_arg, lvolerrno);
1424 	free(req);
1425 }
1426 
1427 void
1428 vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1429 {
1430 	struct spdk_lvol_req *req;
1431 
1432 	if (lvol == NULL) {
1433 		SPDK_ERRLOG("lvol does not exist\n");
1434 		cb_fn(cb_arg, -EINVAL);
1435 		return;
1436 	}
1437 
1438 	assert(lvol->bdev != NULL);
1439 
1440 	req = calloc(1, sizeof(*req));
1441 	if (req == NULL) {
1442 		cb_fn(cb_arg, -ENOMEM);
1443 		return;
1444 	}
1445 
1446 	req->cb_fn = cb_fn;
1447 	req->cb_arg = cb_arg;
1448 	req->lvol = lvol;
1449 
1450 	spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
1451 }
1452 
/* module_init hook of g_lvol_if: nothing to set up, always reports success. */
static int
vbdev_lvs_init(void)
{
	const int rc = 0;

	return rc;
}
1458 
1459 static void vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev);
1460 
1461 static void
1462 vbdev_lvs_fini_start_unload_cb(void *cb_arg, int lvserrno)
1463 {
1464 	struct lvol_store_bdev *lvs_bdev = cb_arg;
1465 	struct lvol_store_bdev *next_lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1466 
1467 	if (lvserrno != 0) {
1468 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
1469 	}
1470 
1471 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1472 	free(lvs_bdev);
1473 
1474 	vbdev_lvs_fini_start_iter(next_lvs_bdev);
1475 }
1476 
1477 static void
1478 vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev)
1479 {
1480 	struct spdk_lvol_store *lvs;
1481 
1482 	while (lvs_bdev != NULL) {
1483 		lvs = lvs_bdev->lvs;
1484 
1485 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1486 			spdk_lvs_unload(lvs, vbdev_lvs_fini_start_unload_cb, lvs_bdev);
1487 			return;
1488 		}
1489 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1490 	}
1491 
1492 	spdk_bdev_module_fini_start_done();
1493 }
1494 
1495 static void
1496 vbdev_lvs_fini_start(void)
1497 {
1498 	g_shutdown_started = true;
1499 	vbdev_lvs_fini_start_iter(vbdev_lvol_store_first());
1500 }
1501 
1502 static int
1503 vbdev_lvs_get_ctx_size(void)
1504 {
1505 	return sizeof(struct vbdev_lvol_io);
1506 }
1507 
1508 static void
1509 _vbdev_lvs_examine_done(struct spdk_lvs_req *req, int lvserrno)
1510 {
1511 	req->cb_fn(req->cb_arg, lvserrno);
1512 }
1513 
1514 static void
1515 _vbdev_lvs_examine_failed(void *cb_arg, int lvserrno)
1516 {
1517 	struct spdk_lvs_req *req = cb_arg;
1518 
1519 	_vbdev_lvs_examine_done(req, req->lvserrno);
1520 }
1521 
1522 static void
1523 _vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1524 {
1525 	struct spdk_lvs_req *req = cb_arg;
1526 	struct spdk_lvol_store *lvs = req->lvol_store;
1527 
1528 	if (lvolerrno != 0) {
1529 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1530 		if (lvolerrno == -ENOMEM) {
1531 			TAILQ_INSERT_TAIL(&lvs->retry_open_lvols, lvol, link);
1532 			return;
1533 		}
1534 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1535 		lvs->lvol_count--;
1536 		free(lvol);
1537 		goto end;
1538 	}
1539 
1540 	if (_create_lvol_disk(lvol, false)) {
1541 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1542 		lvs->lvol_count--;
1543 		goto end;
1544 	}
1545 
1546 	lvs->lvols_opened++;
1547 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1548 
1549 end:
1550 	if (!TAILQ_EMPTY(&lvs->retry_open_lvols)) {
1551 		lvol = TAILQ_FIRST(&lvs->retry_open_lvols);
1552 		TAILQ_REMOVE(&lvs->retry_open_lvols, lvol, link);
1553 		TAILQ_INSERT_HEAD(&lvs->lvols, lvol, link);
1554 		spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, req);
1555 		return;
1556 	}
1557 	if (lvs->lvols_opened >= lvs->lvol_count) {
1558 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1559 		_vbdev_lvs_examine_done(req, 0);
1560 	}
1561 }
1562 
1563 /* Walks a tree of clones that are no longer degraded to create bdevs. */
1564 static int
1565 create_esnap_clone_lvol_disks(void *ctx, struct spdk_lvol *lvol)
1566 {
1567 	struct spdk_bdev *bdev = ctx;
1568 	int rc;
1569 
1570 	rc = _create_lvol_disk(lvol, false);
1571 	if (rc != 0) {
1572 		SPDK_ERRLOG("lvol %s: failed to create bdev after esnap hotplug of %s: %d\n",
1573 			    lvol->unique_id, spdk_bdev_get_name(bdev), rc);
1574 		/* Do not prevent creation of other clones in case of one failure. */
1575 		return 0;
1576 	}
1577 
1578 	return spdk_lvol_iter_immediate_clones(lvol, create_esnap_clone_lvol_disks, ctx);
1579 }
1580 
1581 static void
1582 vbdev_lvs_hotplug(void *ctx, struct spdk_lvol *lvol, int lvolerrno)
1583 {
1584 	struct spdk_bdev *esnap_clone_bdev = ctx;
1585 
1586 	if (lvolerrno != 0) {
1587 		SPDK_ERRLOG("lvol %s: during examine of bdev %s: not creating clone bdev due to "
1588 			    "error %d\n", lvol->unique_id, spdk_bdev_get_name(esnap_clone_bdev),
1589 			    lvolerrno);
1590 		return;
1591 	}
1592 	create_esnap_clone_lvol_disks(esnap_clone_bdev, lvol);
1593 }
1594 
1595 static void
1596 vbdev_lvs_examine_config(struct spdk_bdev *bdev)
1597 {
1598 	char uuid_str[SPDK_UUID_STRING_LEN];
1599 
1600 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
1601 
1602 	if (spdk_lvs_notify_hotplug(uuid_str, sizeof(uuid_str), vbdev_lvs_hotplug, bdev)) {
1603 		SPDK_INFOLOG(vbdev_lvol, "bdev %s: claimed by one ore more esnap clones\n",
1604 			     uuid_str);
1605 	}
1606 	spdk_bdev_module_examine_done(&g_lvol_if);
1607 }
1608 
1609 static void
1610 _vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1611 {
1612 	struct lvol_store_bdev *lvs_bdev;
1613 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1614 	struct spdk_lvol *lvol, *tmp;
1615 	struct spdk_lvs_req *ori_req = req->cb_arg;
1616 
1617 	if (lvserrno == -EEXIST) {
1618 		SPDK_INFOLOG(vbdev_lvol,
1619 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1620 			     req->base_bdev->name);
1621 		/* On error blobstore destroys bs_dev itself */
1622 		_vbdev_lvs_examine_done(ori_req, lvserrno);
1623 		goto end;
1624 	} else if (lvserrno != 0) {
1625 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1626 		/* On error blobstore destroys bs_dev itself */
1627 		_vbdev_lvs_examine_done(ori_req, lvserrno);
1628 		goto end;
1629 	}
1630 
1631 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1632 	if (lvserrno != 0) {
1633 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1634 		ori_req->lvserrno = lvserrno;
1635 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, ori_req);
1636 		goto end;
1637 	}
1638 
1639 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1640 	if (!lvs_bdev) {
1641 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1642 		ori_req->lvserrno = lvserrno;
1643 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, ori_req);
1644 		goto end;
1645 	}
1646 
1647 	lvs_bdev->lvs = lvol_store;
1648 	lvs_bdev->bdev = req->base_bdev;
1649 
1650 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1651 
1652 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1653 		     req->base_bdev->name);
1654 
1655 	lvol_store->lvols_opened = 0;
1656 
1657 	ori_req->lvol_store = lvol_store;
1658 
1659 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1660 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1661 		_vbdev_lvs_examine_done(ori_req, 0);
1662 	} else {
1663 		/* Open all lvols */
1664 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1665 			spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, ori_req);
1666 		}
1667 	}
1668 
1669 end:
1670 	free(req);
1671 }
1672 
1673 static void
1674 _vbdev_lvs_examine(struct spdk_bdev *bdev, struct spdk_lvs_req *ori_req,
1675 		   void (*action)(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg))
1676 {
1677 	struct spdk_bs_dev *bs_dev;
1678 	struct spdk_lvs_with_handle_req *req;
1679 	int rc;
1680 
1681 	req = calloc(1, sizeof(*req));
1682 	if (req == NULL) {
1683 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1684 		_vbdev_lvs_examine_done(ori_req, -ENOMEM);
1685 		return;
1686 	}
1687 
1688 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1689 					 NULL, &bs_dev);
1690 	if (rc < 0) {
1691 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1692 		_vbdev_lvs_examine_done(ori_req, rc);
1693 		free(req);
1694 		return;
1695 	}
1696 
1697 	req->base_bdev = bdev;
1698 	req->cb_arg = ori_req;
1699 
1700 	action(bs_dev, _vbdev_lvs_examine_cb, req);
1701 }
1702 
1703 static void
1704 vbdev_lvs_examine_done(void *arg, int lvserrno)
1705 {
1706 	struct spdk_lvs_req *req = arg;
1707 
1708 	spdk_bdev_module_examine_done(&g_lvol_if);
1709 	free(req);
1710 }
1711 
1712 static void
1713 vbdev_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
1714 {
1715 	struct spdk_lvs_opts lvs_opts;
1716 
1717 	spdk_lvs_opts_init(&lvs_opts);
1718 	lvs_opts.esnap_bs_dev_create = vbdev_lvol_esnap_dev_create;
1719 	spdk_lvs_load_ext(bs_dev, &lvs_opts, cb_fn, cb_arg);
1720 }
1721 
1722 static void
1723 vbdev_lvs_examine_disk(struct spdk_bdev *bdev)
1724 {
1725 	struct spdk_lvs_req *req;
1726 
1727 	if (spdk_bdev_get_md_size(bdev) != 0) {
1728 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n which is formatted with metadata",
1729 			     bdev->name);
1730 		spdk_bdev_module_examine_done(&g_lvol_if);
1731 		return;
1732 	}
1733 
1734 	req = calloc(1, sizeof(*req));
1735 	if (req == NULL) {
1736 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1737 		spdk_bdev_module_examine_done(&g_lvol_if);
1738 		return;
1739 	}
1740 
1741 	req->cb_fn = vbdev_lvs_examine_done;
1742 	req->cb_arg = req;
1743 
1744 	_vbdev_lvs_examine(bdev, req, vbdev_lvs_load);
1745 }
1746 
1747 struct spdk_lvol *
1748 vbdev_lvol_get_from_bdev(struct spdk_bdev *bdev)
1749 {
1750 	if (!bdev || bdev->module != &g_lvol_if) {
1751 		return NULL;
1752 	}
1753 
1754 	if (bdev->ctxt == NULL) {
1755 		SPDK_ERRLOG("No lvol ctx assigned to bdev %s\n", bdev->name);
1756 		return NULL;
1757 	}
1758 
1759 	return (struct spdk_lvol *)bdev->ctxt;
1760 }
1761 
1762 static void
1763 _vbdev_lvs_grow_finish(void *arg, int lvserrno)
1764 {
1765 	struct spdk_lvs_grow_req *req = arg;
1766 
1767 	req->cb_fn(req->cb_arg, lvserrno);
1768 	free(req);
1769 }
1770 
1771 static void
1772 _vbdev_lvs_grow_unload_cb(void *cb_arg, int lvserrno)
1773 {
1774 	struct spdk_lvs_grow_req *req = cb_arg;
1775 	struct lvol_store_bdev *lvs_bdev;
1776 	struct spdk_bdev *bdev;
1777 
1778 	if (lvserrno != 0) {
1779 		_vbdev_lvs_grow_finish(req, lvserrno);
1780 		return;
1781 	}
1782 
1783 	lvs_bdev = req->lvs_bdev;
1784 	bdev = lvs_bdev->bdev;
1785 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1786 	req->base.cb_fn = _vbdev_lvs_grow_finish;
1787 	req->base.cb_arg = req;
1788 	_vbdev_lvs_examine(bdev, &req->base, spdk_lvs_grow);
1789 	free(lvs_bdev);
1790 }
1791 
1792 static void
1793 _vbdev_lvs_grow_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
1794 {
1795 	struct spdk_lvs_grow_req *req = cb_arg;
1796 	struct spdk_lvol_store *lvs = req->base.lvol_store;
1797 
1798 	if (bdeverrno != 0) {
1799 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
1800 	}
1801 
1802 	req->lvol_cnt--;
1803 
1804 	if (req->lvol_cnt == 0) {
1805 		/* Lvol store can be unloaded once all lvols are closed. */
1806 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1807 			spdk_lvs_unload(lvs, _vbdev_lvs_grow_unload_cb, req);
1808 		} else {
1809 			_vbdev_lvs_grow_finish(req, -EINVAL);
1810 		}
1811 	}
1812 }
1813 
1814 void
1815 vbdev_lvs_grow(struct spdk_lvol_store *lvs,
1816 	       spdk_lvs_op_complete cb_fn, void *cb_arg)
1817 {
1818 	struct spdk_lvs_grow_req *req;
1819 	struct spdk_lvol *lvol, *tmp;
1820 
1821 	req = calloc(1, sizeof(*req));
1822 	if (!req) {
1823 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1824 		cb_fn(cb_arg, -ENOMEM);
1825 		return;
1826 	}
1827 	req->cb_fn = cb_fn;
1828 	req->cb_arg = cb_arg;
1829 	req->base.lvol_store = lvs;
1830 	req->lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
1831 	if (req->lvs_bdev == NULL) {
1832 		SPDK_ERRLOG("Cannot get valid lvs_bdev\n");
1833 		_vbdev_lvs_grow_finish(req, -EINVAL);
1834 		return;
1835 	}
1836 
1837 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
1838 		spdk_lvs_unload(lvs, _vbdev_lvs_grow_unload_cb, req);
1839 	} else {
1840 		TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
1841 			req->lvol_cnt++;
1842 			spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_grow_remove_bdev_unregistered_cb, req);
1843 		}
1844 		assert(req->lvol_cnt > 0);
1845 	}
1846 }
1847 
1848 /* Begin degraded blobstore device */
1849 
1850 /*
1851  * When an external snapshot is missing, an instance of bs_dev_degraded is used as the blob's
1852  * back_bs_dev. No bdev is registered, so there should be no IO nor requests for channels. The main
1853  * purposes of this device are to prevent blobstore from hitting fatal runtime errors and to
1854  * indicate that the blob is degraded via the is_degraded() callback.
1855  */
1856 
1857 static void
1858 bs_dev_degraded_read(struct spdk_bs_dev *dev, struct spdk_io_channel *channel, void *payload,
1859 		     uint64_t lba, uint32_t lba_count, struct spdk_bs_dev_cb_args *cb_args)
1860 {
1861 	assert(false);
1862 	cb_args->cb_fn(cb_args->channel, cb_args->cb_arg, -EIO);
1863 }
1864 
1865 static void
1866 bs_dev_degraded_readv(struct spdk_bs_dev *dev, struct spdk_io_channel *channel,
1867 		      struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
1868 		      struct spdk_bs_dev_cb_args *cb_args)
1869 {
1870 	assert(false);
1871 	cb_args->cb_fn(cb_args->channel, cb_args->cb_arg, -EIO);
1872 }
1873 
1874 static void
1875 bs_dev_degraded_readv_ext(struct spdk_bs_dev *dev, struct spdk_io_channel *channel,
1876 			  struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
1877 			  struct spdk_bs_dev_cb_args *cb_args,
1878 			  struct spdk_blob_ext_io_opts *io_opts)
1879 {
1880 	assert(false);
1881 	cb_args->cb_fn(cb_args->channel, cb_args->cb_arg, -EIO);
1882 }
1883 
/* Degraded device is_zeroes query: never expected to be called; answers false. */
static bool
bs_dev_degraded_is_zeroes(struct spdk_bs_dev *dev, uint64_t lba, uint64_t lba_count)
{
	const bool all_zeroes = false;

	assert(false);
	return all_zeroes;
}
1890 
/* Degraded device channel creation: never expected to be called; yields no channel. */
static struct spdk_io_channel *
bs_dev_degraded_create_channel(struct spdk_bs_dev *bs_dev)
{
	struct spdk_io_channel *none = NULL;

	assert(false);
	return none;
}
1897 
/*
 * Degraded device channel teardown: never expected to be called, since
 * bs_dev_degraded_create_channel() never hands out a channel.
 */
static void
bs_dev_degraded_destroy_channel(struct spdk_bs_dev *bs_dev, struct spdk_io_channel *channel)
{
	(void)bs_dev;
	(void)channel;
	assert(false);
}
1903 
/* Degraded device destroy: nothing to release, bs_dev_degraded is a static object. */
static void
bs_dev_degraded_destroy(struct spdk_bs_dev *bs_dev)
{
	(void)bs_dev;
}
1908 
/* Degraded device is_degraded query: this device always reports itself degraded. */
static bool
bs_dev_degraded_is_degraded(struct spdk_bs_dev *bs_dev)
{
	(void)bs_dev;

	return true;
}
1914 
1915 static struct spdk_bs_dev bs_dev_degraded = {
1916 	.create_channel = bs_dev_degraded_create_channel,
1917 	.destroy_channel = bs_dev_degraded_destroy_channel,
1918 	.destroy = bs_dev_degraded_destroy,
1919 	.read = bs_dev_degraded_read,
1920 	.readv = bs_dev_degraded_readv,
1921 	.readv_ext = bs_dev_degraded_readv_ext,
1922 	.is_zeroes = bs_dev_degraded_is_zeroes,
1923 	.is_degraded = bs_dev_degraded_is_degraded,
1924 	/* Make the device as large as possible without risk of uint64 overflow. */
1925 	.blockcnt = UINT64_MAX / 512,
1926 	/* Prevent divide by zero errors calculating LBAs that will never be read. */
1927 	.blocklen = 512,
1928 };
1929 
1930 /* End degraded blobstore device */
1931 
1932 /* Begin external snapshot support */
1933 
1934 static void
1935 vbdev_lvol_esnap_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1936 			       void *event_ctx)
1937 {
1938 	SPDK_NOTICELOG("bdev name (%s) received unsupported event type %d\n",
1939 		       spdk_bdev_get_name(bdev), type);
1940 }
1941 
1942 int
1943 vbdev_lvol_esnap_dev_create(void *bs_ctx, void *blob_ctx, struct spdk_blob *blob,
1944 			    const void *esnap_id, uint32_t id_len,
1945 			    struct spdk_bs_dev **_bs_dev)
1946 {
1947 	struct spdk_lvol_store	*lvs = bs_ctx;
1948 	struct spdk_lvol	*lvol = blob_ctx;
1949 	struct spdk_bs_dev	*bs_dev = NULL;
1950 	struct spdk_uuid	uuid;
1951 	int			rc;
1952 	char			uuid_str[SPDK_UUID_STRING_LEN] = { 0 };
1953 
1954 	if (esnap_id == NULL) {
1955 		SPDK_ERRLOG("lvol %s: NULL esnap ID\n", lvol->unique_id);
1956 		return -EINVAL;
1957 	}
1958 
1959 	/* Guard against arbitrary names and unterminated UUID strings */
1960 	if (id_len != SPDK_UUID_STRING_LEN) {
1961 		SPDK_ERRLOG("lvol %s: Invalid esnap ID length (%u)\n", lvol->unique_id, id_len);
1962 		return -EINVAL;
1963 	}
1964 
1965 	if (spdk_uuid_parse(&uuid, esnap_id)) {
1966 		SPDK_ERRLOG("lvol %s: Invalid esnap ID: not a UUID\n", lvol->unique_id);
1967 		return -EINVAL;
1968 	}
1969 
1970 	/* Format the UUID the same as it is in the bdev names tree. */
1971 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &uuid);
1972 	if (strcmp(uuid_str, esnap_id) != 0) {
1973 		SPDK_WARNLOG("lvol %s: esnap_id '%*s' does not match parsed uuid '%s'\n",
1974 			     lvol->unique_id, SPDK_UUID_STRING_LEN, (const char *)esnap_id,
1975 			     uuid_str);
1976 		assert(false);
1977 	}
1978 
1979 	rc = spdk_bdev_create_bs_dev(uuid_str, false, NULL, 0,
1980 				     vbdev_lvol_esnap_bdev_event_cb, NULL, &bs_dev);
1981 	if (rc != 0) {
1982 		goto fail;
1983 	}
1984 
1985 	rc = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
1986 	if (rc != 0) {
1987 		SPDK_ERRLOG("lvol %s: unable to claim esnap bdev '%s': %d\n", lvol->unique_id,
1988 			    uuid_str, rc);
1989 		bs_dev->destroy(bs_dev);
1990 		goto fail;
1991 	}
1992 
1993 	*_bs_dev = bs_dev;
1994 	return 0;
1995 
1996 fail:
1997 	/* Unable to open or claim the bdev. This lvol is degraded. */
1998 	bs_dev = &bs_dev_degraded;
1999 	SPDK_NOTICELOG("lvol %s: bdev %s not available: lvol is degraded\n", lvol->unique_id,
2000 		       uuid_str);
2001 
2002 	/*
2003 	 * Be sure not to call spdk_lvs_missing_add() on an lvol that is already degraded. This can
2004 	 * lead to a cycle in the degraded_lvols tailq.
2005 	 */
2006 	if (lvol->degraded_set == NULL) {
2007 		rc = spdk_lvs_esnap_missing_add(lvs, lvol, uuid_str, sizeof(uuid_str));
2008 		if (rc != 0) {
2009 			SPDK_NOTICELOG("lvol %s: unable to register missing esnap device %s: "
2010 				       "it will not be hotplugged if added later\n",
2011 				       lvol->unique_id, uuid_str);
2012 		}
2013 	}
2014 
2015 	*_bs_dev = bs_dev;
2016 	return 0;
2017 }
2018 
2019 /* End external snapshot support */
2020 
/* Log component named by the SPDK_INFOLOG/SPDK_DEBUGLOG(vbdev_lvol, ...) calls in this file. */
SPDK_LOG_REGISTER_COMPONENT(vbdev_lvol)
2022