xref: /spdk/lib/lvol/lvol.c (revision f2dbb50516db0ce1bb06e37f77fef5b437e98c6a)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk_internal/lvolstore.h"
8 #include "spdk/log.h"
9 #include "spdk/string.h"
10 #include "spdk/thread.h"
11 #include "spdk/blob_bdev.h"
12 #include "spdk/tree.h"
13 #include "spdk/util.h"
14 
/*
 * Default blob channel opts for lvol.
 * lvs_bs_opts_init() assigns this value to spdk_bs_opts.max_channel_ops.
 */
#define SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS 512

/*
 * NOTE(review): not referenced in the part of the file reviewed here, where the
 * "name" xattr key is spelled as a string literal - confirm remaining users.
 */
#define LVOL_NAME "name"

/* Registers the "lvol" log component named in the SPDK_INFOLOG(lvol, ...) calls below. */
SPDK_LOG_REGISTER_COMPONENT(lvol)
21 
/*
 * A set of lvols that share an external snapshot identifier. Sets are linked
 * into a red-black tree via 'node'; each lvol store owns such a tree
 * (degraded_lvol_sets_tree, initialized in lvs_alloc()).
 */
struct spdk_lvs_degraded_lvol_set {
	/* Lvol store this set is associated with. */
	struct spdk_lvol_store			*lvol_store;
	/* External snapshot identifier; presumably the tree key - TODO confirm. */
	const void				*esnap_id;
	/* Length in bytes of esnap_id. */
	uint32_t				id_len;
	/* Lvols that belong to this set. */
	TAILQ_HEAD(degraded_lvols, spdk_lvol)	lvols;
	/* Red-black tree linkage. */
	RB_ENTRY(spdk_lvs_degraded_lvol_set)	node;
};
29 
/* Global list of lvol stores; entries are added by add_lvs_to_list() and removed by lvs_free(). */
static TAILQ_HEAD(, spdk_lvol_store) g_lvol_stores = TAILQ_HEAD_INITIALIZER(g_lvol_stores);
/* Held around every traversal or modification of g_lvol_stores in this file. */
static pthread_mutex_t g_lvol_stores_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Forward declarations for helpers that are used before their definitions. */
static inline int lvs_opts_copy(const struct spdk_lvs_opts *src, struct spdk_lvs_opts *dst);
static int lvs_esnap_bs_dev_create(void *bs_ctx, void *blob_ctx, struct spdk_blob *blob,
				   const void *esnap_id, uint32_t id_len,
				   struct spdk_bs_dev **_bs_dev);
static struct spdk_lvol *lvs_get_lvol_by_blob_id(struct spdk_lvol_store *lvs, spdk_blob_id blob_id);
static void lvs_degraded_lvol_set_add(struct spdk_lvs_degraded_lvol_set *degraded_set,
				      struct spdk_lvol *lvol);
static void lvs_degraded_lvol_set_remove(struct spdk_lvs_degraded_lvol_set *degraded_set,
		struct spdk_lvol *lvol);
42 
43 static int
44 add_lvs_to_list(struct spdk_lvol_store *lvs)
45 {
46 	struct spdk_lvol_store *tmp;
47 	bool name_conflict = false;
48 
49 	pthread_mutex_lock(&g_lvol_stores_mutex);
50 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
51 		if (!strncmp(lvs->name, tmp->name, SPDK_LVS_NAME_MAX)) {
52 			name_conflict = true;
53 			break;
54 		}
55 	}
56 	if (!name_conflict) {
57 		lvs->on_list = true;
58 		TAILQ_INSERT_TAIL(&g_lvol_stores, lvs, link);
59 	}
60 	pthread_mutex_unlock(&g_lvol_stores_mutex);
61 
62 	return name_conflict ? -1 : 0;
63 }
64 
65 static struct spdk_lvol_store *
66 lvs_alloc(void)
67 {
68 	struct spdk_lvol_store *lvs;
69 
70 	lvs = calloc(1, sizeof(*lvs));
71 	if (lvs == NULL) {
72 		return NULL;
73 	}
74 
75 	TAILQ_INIT(&lvs->lvols);
76 	TAILQ_INIT(&lvs->pending_lvols);
77 	TAILQ_INIT(&lvs->retry_open_lvols);
78 
79 	lvs->load_esnaps = false;
80 	RB_INIT(&lvs->degraded_lvol_sets_tree);
81 	lvs->thread = spdk_get_thread();
82 
83 	return lvs;
84 }
85 
86 static void
87 lvs_free(struct spdk_lvol_store *lvs)
88 {
89 	pthread_mutex_lock(&g_lvol_stores_mutex);
90 	if (lvs->on_list) {
91 		TAILQ_REMOVE(&g_lvol_stores, lvs, link);
92 	}
93 	pthread_mutex_unlock(&g_lvol_stores_mutex);
94 
95 	assert(RB_EMPTY(&lvs->degraded_lvol_sets_tree));
96 
97 	free(lvs);
98 }
99 
100 static struct spdk_lvol *
101 lvol_alloc(struct spdk_lvol_store *lvs, const char *name, bool thin_provision,
102 	   enum lvol_clear_method clear_method)
103 {
104 	struct spdk_lvol *lvol;
105 
106 	lvol = calloc(1, sizeof(*lvol));
107 	if (lvol == NULL) {
108 		return NULL;
109 	}
110 
111 	lvol->lvol_store = lvs;
112 	lvol->clear_method = (enum blob_clear_method)clear_method;
113 	snprintf(lvol->name, sizeof(lvol->name), "%s", name);
114 	spdk_uuid_generate(&lvol->uuid);
115 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
116 	spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->uuid_str), &lvol->uuid);
117 
118 	TAILQ_INSERT_TAIL(&lvs->pending_lvols, lvol, link);
119 
120 	return lvol;
121 }
122 
/*
 * Release the memory of an lvol object. The lvol is not unlinked from any
 * list here; callers do that themselves where needed.
 */
static void
lvol_free(struct spdk_lvol *lvol)
{
	free(lvol);
}
128 
129 static void
130 lvol_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
131 {
132 	struct spdk_lvol_with_handle_req *req = cb_arg;
133 	struct spdk_lvol *lvol = req->lvol;
134 
135 	if (lvolerrno != 0) {
136 		SPDK_INFOLOG(lvol, "Failed to open lvol %s\n", lvol->unique_id);
137 		goto end;
138 	}
139 
140 	lvol->ref_count++;
141 	lvol->blob = blob;
142 end:
143 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
144 	free(req);
145 }
146 
147 void
148 spdk_lvol_open(struct spdk_lvol *lvol, spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
149 {
150 	struct spdk_lvol_with_handle_req *req;
151 	struct spdk_blob_open_opts opts;
152 
153 	assert(cb_fn != NULL);
154 
155 	if (lvol == NULL) {
156 		SPDK_ERRLOG("lvol does not exist\n");
157 		cb_fn(cb_arg, NULL, -ENODEV);
158 		return;
159 	}
160 
161 	if (lvol->action_in_progress == true) {
162 		SPDK_ERRLOG("Cannot open lvol - operations on lvol pending\n");
163 		cb_fn(cb_arg, lvol, -EBUSY);
164 		return;
165 	}
166 
167 	if (lvol->ref_count > 0) {
168 		lvol->ref_count++;
169 		cb_fn(cb_arg, lvol, 0);
170 		return;
171 	}
172 
173 	req = calloc(1, sizeof(*req));
174 	if (req == NULL) {
175 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
176 		cb_fn(cb_arg, NULL, -ENOMEM);
177 		return;
178 	}
179 
180 	req->cb_fn = cb_fn;
181 	req->cb_arg = cb_arg;
182 	req->lvol = lvol;
183 
184 	spdk_blob_open_opts_init(&opts, sizeof(opts));
185 	opts.clear_method = lvol->clear_method;
186 
187 	spdk_bs_open_blob_ext(lvol->lvol_store->blobstore, lvol->blob_id, &opts, lvol_open_cb, req);
188 }
189 
190 static void
191 bs_unload_with_error_cb(void *cb_arg, int lvolerrno)
192 {
193 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
194 
195 	req->cb_fn(req->cb_arg, NULL, req->lvserrno);
196 	free(req);
197 }
198 
199 static void
200 load_next_lvol(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
201 {
202 	struct spdk_lvs_with_handle_req *req = cb_arg;
203 	struct spdk_lvol_store *lvs = req->lvol_store;
204 	struct spdk_blob_store *bs = lvs->blobstore;
205 	struct spdk_lvol *lvol, *tmp;
206 	spdk_blob_id blob_id;
207 	const char *attr;
208 	size_t value_len;
209 	int rc;
210 
211 	if (lvolerrno == -ENOENT) {
212 		/* Finished iterating */
213 		if (req->lvserrno == 0) {
214 			lvs->load_esnaps = true;
215 			req->cb_fn(req->cb_arg, lvs, req->lvserrno);
216 			free(req);
217 		} else {
218 			TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
219 				TAILQ_REMOVE(&lvs->lvols, lvol, link);
220 				free(lvol);
221 			}
222 			lvs_free(lvs);
223 			spdk_bs_unload(bs, bs_unload_with_error_cb, req);
224 		}
225 		return;
226 	} else if (lvolerrno < 0) {
227 		SPDK_ERRLOG("Failed to fetch blobs list\n");
228 		req->lvserrno = lvolerrno;
229 		goto invalid;
230 	}
231 
232 	blob_id = spdk_blob_get_id(blob);
233 
234 	if (blob_id == lvs->super_blob_id) {
235 		SPDK_INFOLOG(lvol, "found superblob %"PRIu64"\n", (uint64_t)blob_id);
236 		spdk_bs_iter_next(bs, blob, load_next_lvol, req);
237 		return;
238 	}
239 
240 	lvol = calloc(1, sizeof(*lvol));
241 	if (!lvol) {
242 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
243 		req->lvserrno = -ENOMEM;
244 		goto invalid;
245 	}
246 
247 	/*
248 	 * Do not store a reference to blob now because spdk_bs_iter_next() will close it.
249 	 * Storing blob_id for future lookups is fine.
250 	 */
251 	lvol->blob_id = blob_id;
252 	lvol->lvol_store = lvs;
253 
254 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
255 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0' ||
256 	    spdk_uuid_parse(&lvol->uuid, attr) != 0) {
257 		SPDK_INFOLOG(lvol, "Missing or corrupt lvol uuid\n");
258 		memset(&lvol->uuid, 0, sizeof(lvol->uuid));
259 	}
260 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
261 
262 	if (!spdk_mem_all_zero(&lvol->uuid, sizeof(lvol->uuid))) {
263 		snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
264 	} else {
265 		spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->unique_id), &lvol->lvol_store->uuid);
266 		value_len = strlen(lvol->unique_id);
267 		snprintf(lvol->unique_id + value_len, sizeof(lvol->unique_id) - value_len, "_%"PRIu64,
268 			 (uint64_t)blob_id);
269 	}
270 
271 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
272 	if (rc != 0 || value_len > SPDK_LVOL_NAME_MAX) {
273 		SPDK_ERRLOG("Cannot assign lvol name\n");
274 		lvol_free(lvol);
275 		req->lvserrno = -EINVAL;
276 		goto invalid;
277 	}
278 
279 	snprintf(lvol->name, sizeof(lvol->name), "%s", attr);
280 
281 	TAILQ_INSERT_TAIL(&lvs->lvols, lvol, link);
282 
283 	lvs->lvol_count++;
284 
285 	SPDK_INFOLOG(lvol, "added lvol %s (%s)\n", lvol->unique_id, lvol->uuid_str);
286 
287 invalid:
288 	spdk_bs_iter_next(bs, blob, load_next_lvol, req);
289 }
290 
291 static void
292 close_super_cb(void *cb_arg, int lvolerrno)
293 {
294 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
295 	struct spdk_lvol_store *lvs = req->lvol_store;
296 	struct spdk_blob_store *bs = lvs->blobstore;
297 
298 	if (lvolerrno != 0) {
299 		SPDK_INFOLOG(lvol, "Could not close super blob\n");
300 		lvs_free(lvs);
301 		req->lvserrno = -ENODEV;
302 		spdk_bs_unload(bs, bs_unload_with_error_cb, req);
303 		return;
304 	}
305 
306 	/* Start loading lvols */
307 	spdk_bs_iter_first(lvs->blobstore, load_next_lvol, req);
308 }
309 
310 static void
311 close_super_blob_with_error_cb(void *cb_arg, int lvolerrno)
312 {
313 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
314 	struct spdk_lvol_store *lvs = req->lvol_store;
315 	struct spdk_blob_store *bs = lvs->blobstore;
316 
317 	lvs_free(lvs);
318 
319 	spdk_bs_unload(bs, bs_unload_with_error_cb, req);
320 }
321 
322 static void
323 lvs_read_uuid(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
324 {
325 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
326 	struct spdk_lvol_store *lvs = req->lvol_store;
327 	struct spdk_blob_store *bs = lvs->blobstore;
328 	const char *attr;
329 	size_t value_len;
330 	int rc;
331 
332 	if (lvolerrno != 0) {
333 		SPDK_INFOLOG(lvol, "Could not open super blob\n");
334 		lvs_free(lvs);
335 		req->lvserrno = -ENODEV;
336 		spdk_bs_unload(bs, bs_unload_with_error_cb, req);
337 		return;
338 	}
339 
340 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
341 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0') {
342 		SPDK_INFOLOG(lvol, "degraded_set or incorrect UUID\n");
343 		req->lvserrno = -EINVAL;
344 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
345 		return;
346 	}
347 
348 	if (spdk_uuid_parse(&lvs->uuid, attr)) {
349 		SPDK_INFOLOG(lvol, "incorrect UUID '%s'\n", attr);
350 		req->lvserrno = -EINVAL;
351 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
352 		return;
353 	}
354 
355 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
356 	if (rc != 0 || value_len > SPDK_LVS_NAME_MAX) {
357 		SPDK_INFOLOG(lvol, "degraded_set or invalid name\n");
358 		req->lvserrno = -EINVAL;
359 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
360 		return;
361 	}
362 
363 	snprintf(lvs->name, sizeof(lvs->name), "%s", attr);
364 
365 	rc = add_lvs_to_list(lvs);
366 	if (rc) {
367 		SPDK_INFOLOG(lvol, "lvolstore with name %s already exists\n", lvs->name);
368 		req->lvserrno = -EEXIST;
369 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
370 		return;
371 	}
372 
373 	lvs->super_blob_id = spdk_blob_get_id(blob);
374 
375 	spdk_blob_close(blob, close_super_cb, req);
376 }
377 
378 static void
379 lvs_open_super(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
380 {
381 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
382 	struct spdk_lvol_store *lvs = req->lvol_store;
383 	struct spdk_blob_store *bs = lvs->blobstore;
384 
385 	if (lvolerrno != 0) {
386 		SPDK_INFOLOG(lvol, "Super blob not found\n");
387 		lvs_free(lvs);
388 		req->lvserrno = -ENODEV;
389 		spdk_bs_unload(bs, bs_unload_with_error_cb, req);
390 		return;
391 	}
392 
393 	spdk_bs_open_blob(bs, blobid, lvs_read_uuid, req);
394 }
395 
396 static void
397 lvs_load_cb(void *cb_arg, struct spdk_blob_store *bs, int lvolerrno)
398 {
399 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
400 	struct spdk_lvol_store *lvs = req->lvol_store;
401 
402 	if (lvolerrno != 0) {
403 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
404 		lvs_free(lvs);
405 		free(req);
406 		return;
407 	}
408 
409 	lvs->blobstore = bs;
410 	lvs->bs_dev = req->bs_dev;
411 
412 	spdk_bs_get_super(bs, lvs_open_super, req);
413 }
414 
415 static void
416 lvs_bs_opts_init(struct spdk_bs_opts *opts)
417 {
418 	spdk_bs_opts_init(opts, sizeof(*opts));
419 	opts->max_channel_ops = SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS;
420 }
421 
422 static void
423 lvs_load(struct spdk_bs_dev *bs_dev, const struct spdk_lvs_opts *_lvs_opts,
424 	 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
425 {
426 	struct spdk_lvs_with_handle_req *req;
427 	struct spdk_bs_opts bs_opts = {};
428 	struct spdk_lvs_opts lvs_opts;
429 
430 	assert(cb_fn != NULL);
431 
432 	if (bs_dev == NULL) {
433 		SPDK_ERRLOG("Blobstore device does not exist\n");
434 		cb_fn(cb_arg, NULL, -ENODEV);
435 		return;
436 	}
437 
438 	spdk_lvs_opts_init(&lvs_opts);
439 	if (_lvs_opts != NULL) {
440 		if (lvs_opts_copy(_lvs_opts, &lvs_opts) != 0) {
441 			SPDK_ERRLOG("Invalid options\n");
442 			cb_fn(cb_arg, NULL, -EINVAL);
443 			return;
444 		}
445 	}
446 
447 	req = calloc(1, sizeof(*req));
448 	if (req == NULL) {
449 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
450 		cb_fn(cb_arg, NULL, -ENOMEM);
451 		return;
452 	}
453 
454 	req->lvol_store = lvs_alloc();
455 	if (req->lvol_store == NULL) {
456 		SPDK_ERRLOG("Cannot alloc memory for lvol store\n");
457 		free(req);
458 		cb_fn(cb_arg, NULL, -ENOMEM);
459 		return;
460 	}
461 	req->cb_fn = cb_fn;
462 	req->cb_arg = cb_arg;
463 	req->bs_dev = bs_dev;
464 
465 	lvs_bs_opts_init(&bs_opts);
466 	snprintf(bs_opts.bstype.bstype, sizeof(bs_opts.bstype.bstype), "LVOLSTORE");
467 
468 	if (lvs_opts.esnap_bs_dev_create != NULL) {
469 		req->lvol_store->esnap_bs_dev_create = lvs_opts.esnap_bs_dev_create;
470 		bs_opts.esnap_bs_dev_create = lvs_esnap_bs_dev_create;
471 		bs_opts.esnap_ctx = req->lvol_store;
472 	}
473 
474 	spdk_bs_load(bs_dev, &bs_opts, lvs_load_cb, req);
475 }
476 
477 void
478 spdk_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
479 {
480 	lvs_load(bs_dev, NULL, cb_fn, cb_arg);
481 }
482 
483 void
484 spdk_lvs_load_ext(struct spdk_bs_dev *bs_dev, const struct spdk_lvs_opts *opts,
485 		  spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
486 {
487 	lvs_load(bs_dev, opts, cb_fn, cb_arg);
488 }
489 
/*
 * Completion callback for the blobstore destroy issued by
 * exit_error_lvs_req(). Intentionally empty: the user was already notified
 * of the original error and the destroy status is not reported.
 */
static void
remove_bs_on_error_cb(void *cb_arg, int bserrno)
{
	/* Nothing to do. */
}
494 
495 static void
496 exit_error_lvs_req(struct spdk_lvs_with_handle_req *req, struct spdk_lvol_store *lvs, int lvolerrno)
497 {
498 	req->cb_fn(req->cb_arg, NULL, lvolerrno);
499 	spdk_bs_destroy(lvs->blobstore, remove_bs_on_error_cb, NULL);
500 	lvs_free(lvs);
501 	free(req);
502 }
503 
504 static void
505 super_create_close_cb(void *cb_arg, int lvolerrno)
506 {
507 	struct spdk_lvs_with_handle_req *req = cb_arg;
508 	struct spdk_lvol_store *lvs = req->lvol_store;
509 
510 	if (lvolerrno < 0) {
511 		SPDK_ERRLOG("Lvol store init failed: could not close super blob\n");
512 		exit_error_lvs_req(req, lvs, lvolerrno);
513 		return;
514 	}
515 
516 	req->cb_fn(req->cb_arg, lvs, lvolerrno);
517 	free(req);
518 }
519 
520 static void
521 super_blob_set_cb(void *cb_arg, int lvolerrno)
522 {
523 	struct spdk_lvs_with_handle_req *req = cb_arg;
524 	struct spdk_lvol_store *lvs = req->lvol_store;
525 	struct spdk_blob *blob = lvs->super_blob;
526 
527 	if (lvolerrno < 0) {
528 		SPDK_ERRLOG("Lvol store init failed: could not set uuid for super blob\n");
529 		exit_error_lvs_req(req, lvs, lvolerrno);
530 		return;
531 	}
532 
533 	spdk_blob_close(blob, super_create_close_cb, req);
534 }
535 
536 static void
537 super_blob_init_cb(void *cb_arg, int lvolerrno)
538 {
539 	struct spdk_lvs_with_handle_req *req = cb_arg;
540 	struct spdk_lvol_store *lvs = req->lvol_store;
541 	struct spdk_blob *blob = lvs->super_blob;
542 	char uuid[SPDK_UUID_STRING_LEN];
543 
544 	if (lvolerrno < 0) {
545 		SPDK_ERRLOG("Lvol store init failed: could not set super blob\n");
546 		exit_error_lvs_req(req, lvs, lvolerrno);
547 		return;
548 	}
549 
550 	spdk_uuid_fmt_lower(uuid, sizeof(uuid), &lvs->uuid);
551 
552 	spdk_blob_set_xattr(blob, "uuid", uuid, sizeof(uuid));
553 	spdk_blob_set_xattr(blob, "name", lvs->name, strnlen(lvs->name, SPDK_LVS_NAME_MAX) + 1);
554 	spdk_blob_sync_md(blob, super_blob_set_cb, req);
555 }
556 
557 static void
558 super_blob_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
559 {
560 	struct spdk_lvs_with_handle_req *req = cb_arg;
561 	struct spdk_lvol_store *lvs = req->lvol_store;
562 
563 	if (lvolerrno < 0) {
564 		SPDK_ERRLOG("Lvol store init failed: could not open super blob\n");
565 		exit_error_lvs_req(req, lvs, lvolerrno);
566 		return;
567 	}
568 
569 	lvs->super_blob = blob;
570 	lvs->super_blob_id = spdk_blob_get_id(blob);
571 
572 	spdk_bs_set_super(lvs->blobstore, lvs->super_blob_id, super_blob_init_cb, req);
573 }
574 
575 static void
576 super_blob_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
577 {
578 	struct spdk_lvs_with_handle_req *req = cb_arg;
579 	struct spdk_lvol_store *lvs = req->lvol_store;
580 	struct spdk_blob_store *bs;
581 
582 	if (lvolerrno < 0) {
583 		SPDK_ERRLOG("Lvol store init failed: could not create super blob\n");
584 		exit_error_lvs_req(req, lvs, lvolerrno);
585 		return;
586 	}
587 
588 	bs = req->lvol_store->blobstore;
589 
590 	spdk_bs_open_blob(bs, blobid, super_blob_create_open_cb, req);
591 }
592 
593 static void
594 lvs_init_cb(void *cb_arg, struct spdk_blob_store *bs, int lvserrno)
595 {
596 	struct spdk_lvs_with_handle_req *lvs_req = cb_arg;
597 	struct spdk_lvol_store *lvs = lvs_req->lvol_store;
598 
599 	if (lvserrno != 0) {
600 		assert(bs == NULL);
601 		lvs_req->cb_fn(lvs_req->cb_arg, NULL, lvserrno);
602 		SPDK_ERRLOG("Lvol store init failed: could not initialize blobstore\n");
603 		lvs_free(lvs);
604 		free(lvs_req);
605 		return;
606 	}
607 
608 	assert(bs != NULL);
609 	lvs->blobstore = bs;
610 
611 	SPDK_INFOLOG(lvol, "Lvol store initialized\n");
612 
613 	/* create super blob */
614 	spdk_bs_create_blob(lvs->blobstore, super_blob_create_cb, lvs_req);
615 }
616 
617 void
618 spdk_lvs_opts_init(struct spdk_lvs_opts *o)
619 {
620 	memset(o, 0, sizeof(*o));
621 	o->cluster_sz = SPDK_LVS_OPTS_CLUSTER_SZ;
622 	o->clear_method = LVS_CLEAR_WITH_UNMAP;
623 	o->num_md_pages_per_cluster_ratio = 100;
624 	o->opts_size = sizeof(*o);
625 }
626 
627 static inline int
628 lvs_opts_copy(const struct spdk_lvs_opts *src, struct spdk_lvs_opts *dst)
629 {
630 	if (src->opts_size == 0) {
631 		SPDK_ERRLOG("opts_size should not be zero value\n");
632 		return -1;
633 	}
634 #define FIELD_OK(field) \
635         offsetof(struct spdk_lvs_opts, field) + sizeof(src->field) <= src->opts_size
636 
637 #define SET_FIELD(field) \
638         if (FIELD_OK(field)) { \
639                 dst->field = src->field; \
640         } \
641 
642 	SET_FIELD(cluster_sz);
643 	SET_FIELD(clear_method);
644 	if (FIELD_OK(name)) {
645 		memcpy(&dst->name, &src->name, sizeof(dst->name));
646 	}
647 	SET_FIELD(num_md_pages_per_cluster_ratio);
648 	SET_FIELD(opts_size);
649 	SET_FIELD(esnap_bs_dev_create);
650 
651 	dst->opts_size = src->opts_size;
652 
653 	/* You should not remove this statement, but need to update the assert statement
654 	 * if you add a new field, and also add a corresponding SET_FIELD statement */
655 	SPDK_STATIC_ASSERT(sizeof(struct spdk_lvs_opts) == 88, "Incorrect size");
656 
657 #undef FIELD_OK
658 #undef SET_FIELD
659 
660 	return 0;
661 }
662 
663 static void
664 setup_lvs_opts(struct spdk_bs_opts *bs_opts, struct spdk_lvs_opts *o, uint32_t total_clusters,
665 	       void *esnap_ctx)
666 {
667 	assert(o != NULL);
668 	lvs_bs_opts_init(bs_opts);
669 	bs_opts->cluster_sz = o->cluster_sz;
670 	bs_opts->clear_method = (enum bs_clear_method)o->clear_method;
671 	bs_opts->num_md_pages = (o->num_md_pages_per_cluster_ratio * total_clusters) / 100;
672 	bs_opts->esnap_bs_dev_create = o->esnap_bs_dev_create;
673 	bs_opts->esnap_ctx = esnap_ctx;
674 	snprintf(bs_opts->bstype.bstype, sizeof(bs_opts->bstype.bstype), "LVOLSTORE");
675 }
676 
677 int
678 spdk_lvs_init(struct spdk_bs_dev *bs_dev, struct spdk_lvs_opts *o,
679 	      spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
680 {
681 	struct spdk_lvol_store *lvs;
682 	struct spdk_lvs_with_handle_req *lvs_req;
683 	struct spdk_bs_opts opts = {};
684 	struct spdk_lvs_opts lvs_opts;
685 	uint32_t total_clusters;
686 	int rc;
687 
688 	if (bs_dev == NULL) {
689 		SPDK_ERRLOG("Blobstore device does not exist\n");
690 		return -ENODEV;
691 	}
692 
693 	if (o == NULL) {
694 		SPDK_ERRLOG("spdk_lvs_opts not specified\n");
695 		return -EINVAL;
696 	}
697 
698 	spdk_lvs_opts_init(&lvs_opts);
699 	if (lvs_opts_copy(o, &lvs_opts) != 0) {
700 		SPDK_ERRLOG("spdk_lvs_opts invalid\n");
701 		return -EINVAL;
702 	}
703 
704 	if (lvs_opts.cluster_sz < bs_dev->blocklen) {
705 		SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than blocklen %" PRIu32 "\n",
706 			    lvs_opts.cluster_sz, bs_dev->blocklen);
707 		return -EINVAL;
708 	}
709 	total_clusters = bs_dev->blockcnt / (lvs_opts.cluster_sz / bs_dev->blocklen);
710 
711 	lvs = lvs_alloc();
712 	if (!lvs) {
713 		SPDK_ERRLOG("Cannot alloc memory for lvol store base pointer\n");
714 		return -ENOMEM;
715 	}
716 
717 	setup_lvs_opts(&opts, o, total_clusters, lvs);
718 
719 	if (strnlen(lvs_opts.name, SPDK_LVS_NAME_MAX) == SPDK_LVS_NAME_MAX) {
720 		SPDK_ERRLOG("Name has no null terminator.\n");
721 		lvs_free(lvs);
722 		return -EINVAL;
723 	}
724 
725 	if (strnlen(lvs_opts.name, SPDK_LVS_NAME_MAX) == 0) {
726 		SPDK_ERRLOG("No name specified.\n");
727 		lvs_free(lvs);
728 		return -EINVAL;
729 	}
730 
731 	spdk_uuid_generate(&lvs->uuid);
732 	snprintf(lvs->name, sizeof(lvs->name), "%s", lvs_opts.name);
733 
734 	rc = add_lvs_to_list(lvs);
735 	if (rc) {
736 		SPDK_ERRLOG("lvolstore with name %s already exists\n", lvs->name);
737 		lvs_free(lvs);
738 		return -EEXIST;
739 	}
740 
741 	lvs_req = calloc(1, sizeof(*lvs_req));
742 	if (!lvs_req) {
743 		lvs_free(lvs);
744 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
745 		return -ENOMEM;
746 	}
747 
748 	assert(cb_fn != NULL);
749 	lvs_req->cb_fn = cb_fn;
750 	lvs_req->cb_arg = cb_arg;
751 	lvs_req->lvol_store = lvs;
752 	lvs->bs_dev = bs_dev;
753 
754 	SPDK_INFOLOG(lvol, "Initializing lvol store\n");
755 	spdk_bs_init(bs_dev, &opts, lvs_init_cb, lvs_req);
756 
757 	return 0;
758 }
759 
760 static void
761 lvs_rename_cb(void *cb_arg, int lvolerrno)
762 {
763 	struct spdk_lvs_req *req = cb_arg;
764 
765 	if (lvolerrno != 0) {
766 		req->lvserrno = lvolerrno;
767 	}
768 	if (req->lvserrno != 0) {
769 		SPDK_ERRLOG("Lvol store rename operation failed\n");
770 		/* Lvs renaming failed, so we should 'clear' new_name.
771 		 * Otherwise it could cause a failure on the next attempt to change the name to 'new_name'  */
772 		snprintf(req->lvol_store->new_name,
773 			 sizeof(req->lvol_store->new_name),
774 			 "%s", req->lvol_store->name);
775 	} else {
776 		/* Update lvs name with new_name */
777 		snprintf(req->lvol_store->name,
778 			 sizeof(req->lvol_store->name),
779 			 "%s", req->lvol_store->new_name);
780 	}
781 
782 	req->cb_fn(req->cb_arg, req->lvserrno);
783 	free(req);
784 }
785 
786 static void
787 lvs_rename_sync_cb(void *cb_arg, int lvolerrno)
788 {
789 	struct spdk_lvs_req *req = cb_arg;
790 	struct spdk_blob *blob = req->lvol_store->super_blob;
791 
792 	if (lvolerrno < 0) {
793 		req->lvserrno = lvolerrno;
794 	}
795 
796 	spdk_blob_close(blob, lvs_rename_cb, req);
797 }
798 
799 static void
800 lvs_rename_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
801 {
802 	struct spdk_lvs_req *req = cb_arg;
803 	int rc;
804 
805 	if (lvolerrno < 0) {
806 		lvs_rename_cb(cb_arg, lvolerrno);
807 		return;
808 	}
809 
810 	rc = spdk_blob_set_xattr(blob, "name", req->lvol_store->new_name,
811 				 strlen(req->lvol_store->new_name) + 1);
812 	if (rc < 0) {
813 		req->lvserrno = rc;
814 		lvs_rename_sync_cb(req, rc);
815 		return;
816 	}
817 
818 	req->lvol_store->super_blob = blob;
819 
820 	spdk_blob_sync_md(blob, lvs_rename_sync_cb, req);
821 }
822 
823 void
824 spdk_lvs_rename(struct spdk_lvol_store *lvs, const char *new_name,
825 		spdk_lvs_op_complete cb_fn, void *cb_arg)
826 {
827 	struct spdk_lvs_req *req;
828 	struct spdk_lvol_store *tmp;
829 
830 	/* Check if new name is current lvs name.
831 	 * If so, return success immediately */
832 	if (strncmp(lvs->name, new_name, SPDK_LVS_NAME_MAX) == 0) {
833 		cb_fn(cb_arg, 0);
834 		return;
835 	}
836 
837 	/* Check if new or new_name is already used in other lvs */
838 	pthread_mutex_lock(&g_lvol_stores_mutex);
839 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
840 		if (!strncmp(new_name, tmp->name, SPDK_LVS_NAME_MAX) ||
841 		    !strncmp(new_name, tmp->new_name, SPDK_LVS_NAME_MAX)) {
842 			pthread_mutex_unlock(&g_lvol_stores_mutex);
843 			cb_fn(cb_arg, -EEXIST);
844 			return;
845 		}
846 	}
847 	pthread_mutex_unlock(&g_lvol_stores_mutex);
848 
849 	req = calloc(1, sizeof(*req));
850 	if (!req) {
851 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
852 		cb_fn(cb_arg, -ENOMEM);
853 		return;
854 	}
855 	snprintf(lvs->new_name, sizeof(lvs->new_name), "%s", new_name);
856 	req->lvol_store = lvs;
857 	req->cb_fn = cb_fn;
858 	req->cb_arg = cb_arg;
859 
860 	spdk_bs_open_blob(lvs->blobstore, lvs->super_blob_id, lvs_rename_open_cb, req);
861 }
862 
863 static void
864 _lvs_unload_cb(void *cb_arg, int lvserrno)
865 {
866 	struct spdk_lvs_req *lvs_req = cb_arg;
867 
868 	SPDK_INFOLOG(lvol, "Lvol store unloaded\n");
869 	assert(lvs_req->cb_fn != NULL);
870 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
871 	free(lvs_req);
872 }
873 
874 int
875 spdk_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
876 		void *cb_arg)
877 {
878 	struct spdk_lvs_req *lvs_req;
879 	struct spdk_lvol *lvol, *tmp;
880 
881 	if (lvs == NULL) {
882 		SPDK_ERRLOG("Lvol store is NULL\n");
883 		return -ENODEV;
884 	}
885 
886 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
887 		if (lvol->action_in_progress == true) {
888 			SPDK_ERRLOG("Cannot unload lvol store - operations on lvols pending\n");
889 			cb_fn(cb_arg, -EBUSY);
890 			return -EBUSY;
891 		} else if (lvol->ref_count != 0) {
892 			SPDK_ERRLOG("Lvols still open on lvol store\n");
893 			cb_fn(cb_arg, -EBUSY);
894 			return -EBUSY;
895 		}
896 	}
897 
898 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
899 		spdk_lvs_esnap_missing_remove(lvol);
900 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
901 		lvol_free(lvol);
902 	}
903 
904 	lvs_req = calloc(1, sizeof(*lvs_req));
905 	if (!lvs_req) {
906 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
907 		return -ENOMEM;
908 	}
909 
910 	lvs_req->cb_fn = cb_fn;
911 	lvs_req->cb_arg = cb_arg;
912 
913 	SPDK_INFOLOG(lvol, "Unloading lvol store\n");
914 	spdk_bs_unload(lvs->blobstore, _lvs_unload_cb, lvs_req);
915 	lvs_free(lvs);
916 
917 	return 0;
918 }
919 
920 static void
921 _lvs_destroy_cb(void *cb_arg, int lvserrno)
922 {
923 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
924 
925 	SPDK_INFOLOG(lvol, "Lvol store destroyed\n");
926 	assert(lvs_req->cb_fn != NULL);
927 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
928 	free(lvs_req);
929 }
930 
931 static void
932 _lvs_destroy_super_cb(void *cb_arg, int bserrno)
933 {
934 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
935 	struct spdk_lvol_store *lvs = lvs_req->lvs;
936 
937 	assert(lvs != NULL);
938 
939 	SPDK_INFOLOG(lvol, "Destroying lvol store\n");
940 	spdk_bs_destroy(lvs->blobstore, _lvs_destroy_cb, lvs_req);
941 	lvs_free(lvs);
942 }
943 
944 int
945 spdk_lvs_destroy(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
946 		 void *cb_arg)
947 {
948 	struct spdk_lvs_destroy_req *lvs_req;
949 	struct spdk_lvol *iter_lvol, *tmp;
950 
951 	if (lvs == NULL) {
952 		SPDK_ERRLOG("Lvol store is NULL\n");
953 		return -ENODEV;
954 	}
955 
956 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
957 		if (iter_lvol->action_in_progress == true) {
958 			SPDK_ERRLOG("Cannot destroy lvol store - operations on lvols pending\n");
959 			cb_fn(cb_arg, -EBUSY);
960 			return -EBUSY;
961 		} else if (iter_lvol->ref_count != 0) {
962 			SPDK_ERRLOG("Lvols still open on lvol store\n");
963 			cb_fn(cb_arg, -EBUSY);
964 			return -EBUSY;
965 		}
966 	}
967 
968 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
969 		free(iter_lvol);
970 	}
971 
972 	lvs_req = calloc(1, sizeof(*lvs_req));
973 	if (!lvs_req) {
974 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
975 		return -ENOMEM;
976 	}
977 
978 	lvs_req->cb_fn = cb_fn;
979 	lvs_req->cb_arg = cb_arg;
980 	lvs_req->lvs = lvs;
981 
982 	SPDK_INFOLOG(lvol, "Deleting super blob\n");
983 	spdk_bs_delete_blob(lvs->blobstore, lvs->super_blob_id, _lvs_destroy_super_cb, lvs_req);
984 
985 	return 0;
986 }
987 
988 static void
989 lvol_close_blob_cb(void *cb_arg, int lvolerrno)
990 {
991 	struct spdk_lvol_req *req = cb_arg;
992 	struct spdk_lvol *lvol = req->lvol;
993 
994 	if (lvolerrno < 0) {
995 		SPDK_ERRLOG("Could not close blob on lvol\n");
996 		lvol_free(lvol);
997 		goto end;
998 	}
999 
1000 	lvol->ref_count--;
1001 	lvol->action_in_progress = false;
1002 	lvol->blob = NULL;
1003 	SPDK_INFOLOG(lvol, "Lvol %s closed\n", lvol->unique_id);
1004 
1005 end:
1006 	req->cb_fn(req->cb_arg, lvolerrno);
1007 	free(req);
1008 }
1009 
1010 bool
1011 spdk_lvol_deletable(struct spdk_lvol *lvol)
1012 {
1013 	size_t count = 0;
1014 
1015 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
1016 	return (count == 0);
1017 }
1018 
1019 static void
1020 lvol_delete_blob_cb(void *cb_arg, int lvolerrno)
1021 {
1022 	struct spdk_lvol_req *req = cb_arg;
1023 	struct spdk_lvol *lvol = req->lvol;
1024 	struct spdk_lvol *clone_lvol = req->clone_lvol;
1025 
1026 	if (lvolerrno < 0) {
1027 		SPDK_ERRLOG("Could not remove blob on lvol gracefully - forced removal\n");
1028 	} else {
1029 		SPDK_INFOLOG(lvol, "Lvol %s deleted\n", lvol->unique_id);
1030 	}
1031 
1032 	if (lvol->degraded_set != NULL) {
1033 		if (clone_lvol != NULL) {
1034 			/*
1035 			 * A degraded esnap clone that has a blob clone has been deleted. clone_lvol
1036 			 * becomes an esnap clone and needs to be associated with the
1037 			 * spdk_lvs_degraded_lvol_set.
1038 			 */
1039 			struct spdk_lvs_degraded_lvol_set *degraded_set = lvol->degraded_set;
1040 
1041 			lvs_degraded_lvol_set_remove(degraded_set, lvol);
1042 			lvs_degraded_lvol_set_add(degraded_set, clone_lvol);
1043 		} else {
1044 			spdk_lvs_esnap_missing_remove(lvol);
1045 		}
1046 	}
1047 
1048 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
1049 	lvol_free(lvol);
1050 	req->cb_fn(req->cb_arg, lvolerrno);
1051 	free(req);
1052 }
1053 
1054 static void
1055 lvol_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
1056 {
1057 	struct spdk_lvol_with_handle_req *req = cb_arg;
1058 	struct spdk_lvol *lvol = req->lvol;
1059 
1060 	TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
1061 
1062 	if (lvolerrno < 0) {
1063 		free(lvol);
1064 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
1065 		free(req);
1066 		return;
1067 	}
1068 
1069 	lvol->blob = blob;
1070 	lvol->blob_id = spdk_blob_get_id(blob);
1071 
1072 	TAILQ_INSERT_TAIL(&lvol->lvol_store->lvols, lvol, link);
1073 
1074 	lvol->ref_count++;
1075 
1076 	assert(req->cb_fn != NULL);
1077 	req->cb_fn(req->cb_arg, req->lvol, lvolerrno);
1078 	free(req);
1079 }
1080 
1081 static void
1082 lvol_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
1083 {
1084 	struct spdk_lvol_with_handle_req *req = cb_arg;
1085 	struct spdk_blob_store *bs;
1086 	struct spdk_blob_open_opts opts;
1087 
1088 	if (lvolerrno < 0) {
1089 		TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
1090 		free(req->lvol);
1091 		assert(req->cb_fn != NULL);
1092 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
1093 		free(req);
1094 		return;
1095 	}
1096 
1097 	spdk_blob_open_opts_init(&opts, sizeof(opts));
1098 	opts.clear_method = req->lvol->clear_method;
1099 	/*
1100 	 * If the lvol that is being created is an esnap clone, the blobstore needs to be able to
1101 	 * pass the lvol to the esnap_bs_dev_create callback. In order for that to happen, we need
1102 	 * to pass it here.
1103 	 *
1104 	 * This does set ensap_ctx in cases where it's not needed, but we don't know that it's not
1105 	 * needed until after the blob is open. When the blob is not an esnap clone, a reference to
1106 	 * the value stored in opts.esnap_ctx is not retained by the blobstore.
1107 	 */
1108 	opts.esnap_ctx = req->lvol;
1109 	bs = req->lvol->lvol_store->blobstore;
1110 
1111 	if (req->origlvol != NULL && req->origlvol->degraded_set != NULL) {
1112 		/*
1113 		 * A snapshot was created from a degraded esnap clone. The new snapshot is now a
1114 		 * degraded esnap clone. The previous clone is now a regular clone of a blob. Update
1115 		 * the set of directly-related clones to the missing external snapshot.
1116 		 */
1117 		struct spdk_lvs_degraded_lvol_set *degraded_set = req->origlvol->degraded_set;
1118 
1119 		lvs_degraded_lvol_set_remove(degraded_set, req->origlvol);
1120 		lvs_degraded_lvol_set_add(degraded_set, req->lvol);
1121 	}
1122 
1123 	spdk_bs_open_blob_ext(bs, blobid, &opts, lvol_create_open_cb, req);
1124 }
1125 
1126 static void
1127 lvol_get_xattr_value(void *xattr_ctx, const char *name,
1128 		     const void **value, size_t *value_len)
1129 {
1130 	struct spdk_lvol *lvol = xattr_ctx;
1131 
1132 	if (!strcmp(LVOL_NAME, name)) {
1133 		*value = lvol->name;
1134 		*value_len = SPDK_LVOL_NAME_MAX;
1135 		return;
1136 	}
1137 	if (!strcmp("uuid", name)) {
1138 		*value = lvol->uuid_str;
1139 		*value_len = sizeof(lvol->uuid_str);
1140 		return;
1141 	}
1142 	*value = NULL;
1143 	*value_len = 0;
1144 }
1145 
1146 static int
1147 lvs_verify_lvol_name(struct spdk_lvol_store *lvs, const char *name)
1148 {
1149 	struct spdk_lvol *tmp;
1150 
1151 	if (name == NULL || strnlen(name, SPDK_LVOL_NAME_MAX) == 0) {
1152 		SPDK_INFOLOG(lvol, "lvol name not provided.\n");
1153 		return -EINVAL;
1154 	}
1155 
1156 	if (strnlen(name, SPDK_LVOL_NAME_MAX) == SPDK_LVOL_NAME_MAX) {
1157 		SPDK_ERRLOG("Name has no null terminator.\n");
1158 		return -EINVAL;
1159 	}
1160 
1161 	TAILQ_FOREACH(tmp, &lvs->lvols, link) {
1162 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1163 			SPDK_ERRLOG("lvol with name %s already exists\n", name);
1164 			return -EEXIST;
1165 		}
1166 	}
1167 
1168 	TAILQ_FOREACH(tmp, &lvs->pending_lvols, link) {
1169 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1170 			SPDK_ERRLOG("lvol with name %s is being already created\n", name);
1171 			return -EEXIST;
1172 		}
1173 	}
1174 
1175 	return 0;
1176 }
1177 
1178 int
1179 spdk_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1180 		 bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1181 		 void *cb_arg)
1182 {
1183 	struct spdk_lvol_with_handle_req *req;
1184 	struct spdk_blob_store *bs;
1185 	struct spdk_lvol *lvol;
1186 	struct spdk_blob_opts opts;
1187 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1188 	int rc;
1189 
1190 	if (lvs == NULL) {
1191 		SPDK_ERRLOG("lvol store does not exist\n");
1192 		return -EINVAL;
1193 	}
1194 
1195 	rc = lvs_verify_lvol_name(lvs, name);
1196 	if (rc < 0) {
1197 		return rc;
1198 	}
1199 
1200 	bs = lvs->blobstore;
1201 
1202 	req = calloc(1, sizeof(*req));
1203 	if (!req) {
1204 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1205 		return -ENOMEM;
1206 	}
1207 	req->cb_fn = cb_fn;
1208 	req->cb_arg = cb_arg;
1209 
1210 	lvol = lvol_alloc(lvs, name, thin_provision, clear_method);
1211 	if (!lvol) {
1212 		free(req);
1213 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1214 		return -ENOMEM;
1215 	}
1216 
1217 	req->lvol = lvol;
1218 	spdk_blob_opts_init(&opts, sizeof(opts));
1219 	opts.thin_provision = thin_provision;
1220 	opts.num_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(bs));
1221 	opts.clear_method = lvol->clear_method;
1222 	opts.xattrs.count = SPDK_COUNTOF(xattr_names);
1223 	opts.xattrs.names = xattr_names;
1224 	opts.xattrs.ctx = lvol;
1225 	opts.xattrs.get_value = lvol_get_xattr_value;
1226 
1227 	spdk_bs_create_blob_ext(lvs->blobstore, &opts, lvol_create_cb, req);
1228 
1229 	return 0;
1230 }
1231 
1232 int
1233 spdk_lvol_create_esnap_clone(const void *esnap_id, uint32_t id_len, uint64_t size_bytes,
1234 			     struct spdk_lvol_store *lvs, const char *clone_name,
1235 			     spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1236 {
1237 	struct spdk_lvol_with_handle_req *req;
1238 	struct spdk_blob_store *bs;
1239 	struct spdk_lvol *lvol;
1240 	struct spdk_blob_opts opts;
1241 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1242 	int rc;
1243 
1244 	if (lvs == NULL) {
1245 		SPDK_ERRLOG("lvol store does not exist\n");
1246 		return -EINVAL;
1247 	}
1248 
1249 	rc = lvs_verify_lvol_name(lvs, clone_name);
1250 	if (rc < 0) {
1251 		return rc;
1252 	}
1253 
1254 	bs = lvs->blobstore;
1255 
1256 	req = calloc(1, sizeof(*req));
1257 	if (!req) {
1258 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1259 		return -ENOMEM;
1260 	}
1261 	req->cb_fn = cb_fn;
1262 	req->cb_arg = cb_arg;
1263 
1264 	lvol = lvol_alloc(lvs, clone_name, true, LVOL_CLEAR_WITH_DEFAULT);
1265 	if (!lvol) {
1266 		free(req);
1267 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1268 		return -ENOMEM;
1269 	}
1270 	req->lvol = lvol;
1271 
1272 	spdk_blob_opts_init(&opts, sizeof(opts));
1273 	opts.esnap_id = esnap_id;
1274 	opts.esnap_id_len = id_len;
1275 	opts.thin_provision = true;
1276 	opts.num_clusters = spdk_divide_round_up(size_bytes, spdk_bs_get_cluster_size(bs));
1277 	opts.clear_method = lvol->clear_method;
1278 	opts.xattrs.count = SPDK_COUNTOF(xattr_names);
1279 	opts.xattrs.names = xattr_names;
1280 	opts.xattrs.ctx = lvol;
1281 	opts.xattrs.get_value = lvol_get_xattr_value;
1282 
1283 	spdk_bs_create_blob_ext(lvs->blobstore, &opts, lvol_create_cb, req);
1284 
1285 	return 0;
1286 }
1287 
1288 void
1289 spdk_lvol_create_snapshot(struct spdk_lvol *origlvol, const char *snapshot_name,
1290 			  spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1291 {
1292 	struct spdk_lvol_store *lvs;
1293 	struct spdk_lvol *newlvol;
1294 	struct spdk_blob *origblob;
1295 	struct spdk_lvol_with_handle_req *req;
1296 	struct spdk_blob_xattr_opts snapshot_xattrs;
1297 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1298 	int rc;
1299 
1300 	if (origlvol == NULL) {
1301 		SPDK_INFOLOG(lvol, "Lvol not provided.\n");
1302 		cb_fn(cb_arg, NULL, -EINVAL);
1303 		return;
1304 	}
1305 
1306 	origblob = origlvol->blob;
1307 	lvs = origlvol->lvol_store;
1308 	if (lvs == NULL) {
1309 		SPDK_ERRLOG("lvol store does not exist\n");
1310 		cb_fn(cb_arg, NULL, -EINVAL);
1311 		return;
1312 	}
1313 
1314 	rc = lvs_verify_lvol_name(lvs, snapshot_name);
1315 	if (rc < 0) {
1316 		cb_fn(cb_arg, NULL, rc);
1317 		return;
1318 	}
1319 
1320 	req = calloc(1, sizeof(*req));
1321 	if (!req) {
1322 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1323 		cb_fn(cb_arg, NULL, -ENOMEM);
1324 		return;
1325 	}
1326 
1327 	newlvol = lvol_alloc(origlvol->lvol_store, snapshot_name, true,
1328 			     (enum lvol_clear_method)origlvol->clear_method);
1329 	if (!newlvol) {
1330 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1331 		free(req);
1332 		cb_fn(cb_arg, NULL, -ENOMEM);
1333 		return;
1334 	}
1335 
1336 	snapshot_xattrs.count = SPDK_COUNTOF(xattr_names);
1337 	snapshot_xattrs.ctx = newlvol;
1338 	snapshot_xattrs.names = xattr_names;
1339 	snapshot_xattrs.get_value = lvol_get_xattr_value;
1340 	req->lvol = newlvol;
1341 	req->origlvol = origlvol;
1342 	req->cb_fn = cb_fn;
1343 	req->cb_arg = cb_arg;
1344 
1345 	spdk_bs_create_snapshot(lvs->blobstore, spdk_blob_get_id(origblob), &snapshot_xattrs,
1346 				lvol_create_cb, req);
1347 }
1348 
1349 void
1350 spdk_lvol_create_clone(struct spdk_lvol *origlvol, const char *clone_name,
1351 		       spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1352 {
1353 	struct spdk_lvol *newlvol;
1354 	struct spdk_lvol_with_handle_req *req;
1355 	struct spdk_lvol_store *lvs;
1356 	struct spdk_blob *origblob;
1357 	struct spdk_blob_xattr_opts clone_xattrs;
1358 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1359 	int rc;
1360 
1361 	if (origlvol == NULL) {
1362 		SPDK_INFOLOG(lvol, "Lvol not provided.\n");
1363 		cb_fn(cb_arg, NULL, -EINVAL);
1364 		return;
1365 	}
1366 
1367 	origblob = origlvol->blob;
1368 	lvs = origlvol->lvol_store;
1369 	if (lvs == NULL) {
1370 		SPDK_ERRLOG("lvol store does not exist\n");
1371 		cb_fn(cb_arg, NULL, -EINVAL);
1372 		return;
1373 	}
1374 
1375 	rc = lvs_verify_lvol_name(lvs, clone_name);
1376 	if (rc < 0) {
1377 		cb_fn(cb_arg, NULL, rc);
1378 		return;
1379 	}
1380 
1381 	req = calloc(1, sizeof(*req));
1382 	if (!req) {
1383 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1384 		cb_fn(cb_arg, NULL, -ENOMEM);
1385 		return;
1386 	}
1387 
1388 	newlvol = lvol_alloc(lvs, clone_name, true, (enum lvol_clear_method)origlvol->clear_method);
1389 	if (!newlvol) {
1390 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1391 		free(req);
1392 		cb_fn(cb_arg, NULL, -ENOMEM);
1393 		return;
1394 	}
1395 
1396 	clone_xattrs.count = SPDK_COUNTOF(xattr_names);
1397 	clone_xattrs.ctx = newlvol;
1398 	clone_xattrs.names = xattr_names;
1399 	clone_xattrs.get_value = lvol_get_xattr_value;
1400 	req->lvol = newlvol;
1401 	req->cb_fn = cb_fn;
1402 	req->cb_arg = cb_arg;
1403 
1404 	spdk_bs_create_clone(lvs->blobstore, spdk_blob_get_id(origblob), &clone_xattrs,
1405 			     lvol_create_cb,
1406 			     req);
1407 }
1408 
1409 static void
1410 lvol_resize_done(void *cb_arg, int lvolerrno)
1411 {
1412 	struct spdk_lvol_req *req = cb_arg;
1413 
1414 	req->cb_fn(req->cb_arg,  lvolerrno);
1415 	free(req);
1416 }
1417 
1418 static void
1419 lvol_blob_resize_cb(void *cb_arg, int bserrno)
1420 {
1421 	struct spdk_lvol_req *req = cb_arg;
1422 	struct spdk_lvol *lvol = req->lvol;
1423 
1424 	if (bserrno != 0) {
1425 		req->cb_fn(req->cb_arg, bserrno);
1426 		free(req);
1427 		return;
1428 	}
1429 
1430 	spdk_blob_sync_md(lvol->blob, lvol_resize_done, req);
1431 }
1432 
1433 void
1434 spdk_lvol_resize(struct spdk_lvol *lvol, uint64_t sz,
1435 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1436 {
1437 	struct spdk_blob *blob = lvol->blob;
1438 	struct spdk_lvol_store *lvs = lvol->lvol_store;
1439 	struct spdk_lvol_req *req;
1440 	uint64_t new_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(lvs->blobstore));
1441 
1442 	req = calloc(1, sizeof(*req));
1443 	if (!req) {
1444 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1445 		cb_fn(cb_arg, -ENOMEM);
1446 		return;
1447 	}
1448 	req->cb_fn = cb_fn;
1449 	req->cb_arg = cb_arg;
1450 	req->lvol = lvol;
1451 
1452 	spdk_blob_resize(blob, new_clusters, lvol_blob_resize_cb, req);
1453 }
1454 
1455 static void
1456 lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1457 {
1458 	struct spdk_lvol_req *req = cb_arg;
1459 
1460 	req->cb_fn(req->cb_arg, lvolerrno);
1461 	free(req);
1462 }
1463 
1464 void
1465 spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1466 {
1467 	struct spdk_lvol_req *req;
1468 
1469 	req = calloc(1, sizeof(*req));
1470 	if (!req) {
1471 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1472 		cb_fn(cb_arg, -ENOMEM);
1473 		return;
1474 	}
1475 	req->cb_fn = cb_fn;
1476 	req->cb_arg = cb_arg;
1477 
1478 	spdk_blob_set_read_only(lvol->blob);
1479 	spdk_blob_sync_md(lvol->blob, lvol_set_read_only_cb, req);
1480 }
1481 
1482 static void
1483 lvol_rename_cb(void *cb_arg, int lvolerrno)
1484 {
1485 	struct spdk_lvol_req *req = cb_arg;
1486 
1487 	if (lvolerrno != 0) {
1488 		SPDK_ERRLOG("Lvol rename operation failed\n");
1489 	} else {
1490 		snprintf(req->lvol->name, sizeof(req->lvol->name), "%s", req->name);
1491 	}
1492 
1493 	req->cb_fn(req->cb_arg, lvolerrno);
1494 	free(req);
1495 }
1496 
1497 void
1498 spdk_lvol_rename(struct spdk_lvol *lvol, const char *new_name,
1499 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1500 {
1501 	struct spdk_lvol *tmp;
1502 	struct spdk_blob *blob = lvol->blob;
1503 	struct spdk_lvol_req *req;
1504 	int rc;
1505 
1506 	/* Check if new name is current lvol name.
1507 	 * If so, return success immediately */
1508 	if (strncmp(lvol->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1509 		cb_fn(cb_arg, 0);
1510 		return;
1511 	}
1512 
1513 	/* Check if lvol with 'new_name' already exists in lvolstore */
1514 	TAILQ_FOREACH(tmp, &lvol->lvol_store->lvols, link) {
1515 		if (strncmp(tmp->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1516 			SPDK_ERRLOG("Lvol %s already exists in lvol store %s\n", new_name, lvol->lvol_store->name);
1517 			cb_fn(cb_arg, -EEXIST);
1518 			return;
1519 		}
1520 	}
1521 
1522 	req = calloc(1, sizeof(*req));
1523 	if (!req) {
1524 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1525 		cb_fn(cb_arg, -ENOMEM);
1526 		return;
1527 	}
1528 	req->cb_fn = cb_fn;
1529 	req->cb_arg = cb_arg;
1530 	req->lvol = lvol;
1531 	snprintf(req->name, sizeof(req->name), "%s", new_name);
1532 
1533 	rc = spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1534 	if (rc < 0) {
1535 		free(req);
1536 		cb_fn(cb_arg, rc);
1537 		return;
1538 	}
1539 
1540 	spdk_blob_sync_md(blob, lvol_rename_cb, req);
1541 }
1542 
1543 void
1544 spdk_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1545 {
1546 	struct spdk_lvol_req *req;
1547 	struct spdk_blob_store *bs;
1548 	struct spdk_lvol_store	*lvs = lvol->lvol_store;
1549 	spdk_blob_id	clone_id;
1550 	size_t		count = 1;
1551 	int		rc;
1552 
1553 	assert(cb_fn != NULL);
1554 
1555 	if (lvol == NULL) {
1556 		SPDK_ERRLOG("lvol does not exist\n");
1557 		cb_fn(cb_arg, -ENODEV);
1558 		return;
1559 	}
1560 
1561 	if (lvol->ref_count != 0) {
1562 		SPDK_ERRLOG("Cannot destroy lvol %s because it is still open\n", lvol->unique_id);
1563 		cb_fn(cb_arg, -EBUSY);
1564 		return;
1565 	}
1566 
1567 	lvol->action_in_progress = true;
1568 
1569 	req = calloc(1, sizeof(*req));
1570 	if (!req) {
1571 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1572 		cb_fn(cb_arg, -ENOMEM);
1573 		return;
1574 	}
1575 
1576 	req->cb_fn = cb_fn;
1577 	req->cb_arg = cb_arg;
1578 	req->lvol = lvol;
1579 	bs = lvol->lvol_store->blobstore;
1580 
1581 	rc = spdk_blob_get_clones(lvs->blobstore, lvol->blob_id, &clone_id, &count);
1582 	if (rc == 0 && count == 1) {
1583 		req->clone_lvol = lvs_get_lvol_by_blob_id(lvs, clone_id);
1584 	} else if (rc == -ENOMEM) {
1585 		SPDK_INFOLOG(lvol, "lvol %s: cannot destroy: has %" PRIu64 " clones\n",
1586 			     lvol->unique_id, count);
1587 		free(req);
1588 		assert(count > 1);
1589 		cb_fn(cb_arg, -EBUSY);
1590 		return;
1591 	}
1592 
1593 	spdk_bs_delete_blob(bs, lvol->blob_id, lvol_delete_blob_cb, req);
1594 }
1595 
1596 void
1597 spdk_lvol_close(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1598 {
1599 	struct spdk_lvol_req *req;
1600 
1601 	assert(cb_fn != NULL);
1602 
1603 	if (lvol == NULL) {
1604 		SPDK_ERRLOG("lvol does not exist\n");
1605 		cb_fn(cb_arg, -ENODEV);
1606 		return;
1607 	}
1608 
1609 	if (lvol->ref_count > 1) {
1610 		lvol->ref_count--;
1611 		cb_fn(cb_arg, 0);
1612 		return;
1613 	} else if (lvol->ref_count == 0) {
1614 		cb_fn(cb_arg, -EINVAL);
1615 		return;
1616 	}
1617 
1618 	lvol->action_in_progress = true;
1619 
1620 	req = calloc(1, sizeof(*req));
1621 	if (!req) {
1622 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1623 		cb_fn(cb_arg, -ENOMEM);
1624 		return;
1625 	}
1626 
1627 	req->cb_fn = cb_fn;
1628 	req->cb_arg = cb_arg;
1629 	req->lvol = lvol;
1630 
1631 	spdk_blob_close(lvol->blob, lvol_close_blob_cb, req);
1632 }
1633 
1634 struct spdk_io_channel *
1635 spdk_lvol_get_io_channel(struct spdk_lvol *lvol)
1636 {
1637 	return spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1638 }
1639 
1640 static void
1641 lvol_inflate_cb(void *cb_arg, int lvolerrno)
1642 {
1643 	struct spdk_lvol_req *req = cb_arg;
1644 
1645 	spdk_bs_free_io_channel(req->channel);
1646 
1647 	if (lvolerrno < 0) {
1648 		SPDK_ERRLOG("Could not inflate lvol\n");
1649 	}
1650 
1651 	req->cb_fn(req->cb_arg, lvolerrno);
1652 	free(req);
1653 }
1654 
1655 void
1656 spdk_lvol_inflate(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1657 {
1658 	struct spdk_lvol_req *req;
1659 	spdk_blob_id blob_id;
1660 
1661 	assert(cb_fn != NULL);
1662 
1663 	if (lvol == NULL) {
1664 		SPDK_ERRLOG("Lvol does not exist\n");
1665 		cb_fn(cb_arg, -ENODEV);
1666 		return;
1667 	}
1668 
1669 	req = calloc(1, sizeof(*req));
1670 	if (!req) {
1671 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1672 		cb_fn(cb_arg, -ENOMEM);
1673 		return;
1674 	}
1675 
1676 	req->cb_fn = cb_fn;
1677 	req->cb_arg = cb_arg;
1678 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1679 	if (req->channel == NULL) {
1680 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1681 		free(req);
1682 		cb_fn(cb_arg, -ENOMEM);
1683 		return;
1684 	}
1685 
1686 	blob_id = spdk_blob_get_id(lvol->blob);
1687 	spdk_bs_inflate_blob(lvol->lvol_store->blobstore, req->channel, blob_id, lvol_inflate_cb,
1688 			     req);
1689 }
1690 
1691 void
1692 spdk_lvol_decouple_parent(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1693 {
1694 	struct spdk_lvol_req *req;
1695 	spdk_blob_id blob_id;
1696 
1697 	assert(cb_fn != NULL);
1698 
1699 	if (lvol == NULL) {
1700 		SPDK_ERRLOG("Lvol does not exist\n");
1701 		cb_fn(cb_arg, -ENODEV);
1702 		return;
1703 	}
1704 
1705 	req = calloc(1, sizeof(*req));
1706 	if (!req) {
1707 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1708 		cb_fn(cb_arg, -ENOMEM);
1709 		return;
1710 	}
1711 
1712 	req->cb_fn = cb_fn;
1713 	req->cb_arg = cb_arg;
1714 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1715 	if (req->channel == NULL) {
1716 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1717 		free(req);
1718 		cb_fn(cb_arg, -ENOMEM);
1719 		return;
1720 	}
1721 
1722 	blob_id = spdk_blob_get_id(lvol->blob);
1723 	spdk_bs_blob_decouple_parent(lvol->lvol_store->blobstore, req->channel, blob_id,
1724 				     lvol_inflate_cb, req);
1725 }
1726 
1727 void
1728 spdk_lvs_grow(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
1729 {
1730 	struct spdk_lvs_with_handle_req *req;
1731 	struct spdk_bs_opts opts = {};
1732 
1733 	assert(cb_fn != NULL);
1734 
1735 	if (bs_dev == NULL) {
1736 		SPDK_ERRLOG("Blobstore device does not exist\n");
1737 		cb_fn(cb_arg, NULL, -ENODEV);
1738 		return;
1739 	}
1740 
1741 	req = calloc(1, sizeof(*req));
1742 	if (req == NULL) {
1743 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
1744 		cb_fn(cb_arg, NULL, -ENOMEM);
1745 		return;
1746 	}
1747 
1748 	req->lvol_store = lvs_alloc();
1749 	if (req->lvol_store == NULL) {
1750 		SPDK_ERRLOG("Cannot alloc memory for lvol store\n");
1751 		free(req);
1752 		cb_fn(cb_arg, NULL, -ENOMEM);
1753 		return;
1754 	}
1755 	req->cb_fn = cb_fn;
1756 	req->cb_arg = cb_arg;
1757 	req->bs_dev = bs_dev;
1758 
1759 	lvs_bs_opts_init(&opts);
1760 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
1761 
1762 	spdk_bs_grow(bs_dev, &opts, lvs_load_cb, req);
1763 }
1764 
1765 static struct spdk_lvol *
1766 lvs_get_lvol_by_blob_id(struct spdk_lvol_store *lvs, spdk_blob_id blob_id)
1767 {
1768 	struct spdk_lvol *lvol;
1769 
1770 	TAILQ_FOREACH(lvol, &lvs->lvols, link) {
1771 		if (lvol->blob_id == blob_id) {
1772 			return lvol;
1773 		}
1774 	}
1775 	return NULL;
1776 }
1777 
1778 static int
1779 lvs_esnap_bs_dev_create(void *bs_ctx, void *blob_ctx, struct spdk_blob *blob,
1780 			const void *esnap_id, uint32_t id_len,
1781 			struct spdk_bs_dev **bs_dev)
1782 {
1783 	struct spdk_lvol_store	*lvs = bs_ctx;
1784 	struct spdk_lvol	*lvol = blob_ctx;
1785 	spdk_blob_id		blob_id = spdk_blob_get_id(blob);
1786 
1787 	if (lvs == NULL) {
1788 		if (lvol == NULL) {
1789 			SPDK_ERRLOG("Blob 0x%" PRIx64 ": no lvs context nor lvol context\n",
1790 				    blob_id);
1791 			return -EINVAL;
1792 		}
1793 		lvs = lvol->lvol_store;
1794 	}
1795 
1796 	/*
1797 	 * When spdk_lvs_load() is called, it iterates through all blobs in its blobstore building
1798 	 * up a list of lvols (lvs->lvols). During this initial iteration, each blob is opened,
1799 	 * passed to load_next_lvol(), then closed. There is no need to open the external snapshot
1800 	 * during this phase. Once the blobstore is loaded, lvs->load_esnaps is set to true so that
1801 	 * future lvol opens cause the external snapshot to be loaded.
1802 	 */
1803 	if (!lvs->load_esnaps) {
1804 		*bs_dev = NULL;
1805 		return 0;
1806 	}
1807 
1808 	if (lvol == NULL) {
1809 		spdk_blob_id blob_id = spdk_blob_get_id(blob);
1810 
1811 		/*
1812 		 * If spdk_bs_blob_open() is used instead of spdk_bs_blob_open_ext() the lvol will
1813 		 * not have been passed in. The same is true if the open happens spontaneously due
1814 		 * to blobstore activity.
1815 		 */
1816 		lvol = lvs_get_lvol_by_blob_id(lvs, blob_id);
1817 		if (lvol == NULL) {
1818 			SPDK_ERRLOG("lvstore %s: no lvol for blob 0x%" PRIx64 "\n",
1819 				    lvs->name, blob_id);
1820 			return -ENODEV;
1821 		}
1822 	}
1823 
1824 	return lvs->esnap_bs_dev_create(lvs, lvol, blob, esnap_id, id_len, bs_dev);
1825 }
1826 
/*
 * The theory of missing external snapshots
 *
 * The lvs->esnap_bs_dev_create() callback may be unable to create an external snapshot bs_dev when
 * it is called. This can happen, for instance, when the device containing the lvolstore is
 * examined before spdk_bdev_register() is called on the bdev that acts as an external snapshot.
 * In such a case, the esnap_bs_dev_create() callback will call spdk_lvs_esnap_missing_add().
 *
 * Missing external snapshots are tracked in a per-lvolstore tree, lvs->degraded_lvol_sets_tree.
 * Each tree node (struct spdk_lvs_degraded_lvol_set) contains a tailq of lvols that are missing
 * that particular external snapshot.
 */
1839 static int
1840 lvs_esnap_name_cmp(struct spdk_lvs_degraded_lvol_set *m1, struct spdk_lvs_degraded_lvol_set *m2)
1841 {
1842 	if (m1->id_len == m2->id_len) {
1843 		return memcmp(m1->esnap_id, m2->esnap_id, m1->id_len);
1844 	}
1845 	return (m1->id_len > m2->id_len) ? 1 : -1;
1846 }
1847 
/*
 * Generate the static red-black tree functions for degraded_lvol_sets_tree. Nodes are
 * struct spdk_lvs_degraded_lvol_set linked through their "node" member and ordered by
 * lvs_esnap_name_cmp().
 */
RB_GENERATE_STATIC(degraded_lvol_sets_tree, spdk_lvs_degraded_lvol_set, node, lvs_esnap_name_cmp)
1849 
1850 static void
1851 lvs_degraded_lvol_set_add(struct spdk_lvs_degraded_lvol_set *degraded_set, struct spdk_lvol *lvol)
1852 {
1853 	assert(lvol->lvol_store->thread == spdk_get_thread());
1854 
1855 	lvol->degraded_set = degraded_set;
1856 	TAILQ_INSERT_TAIL(&degraded_set->lvols, lvol, degraded_link);
1857 }
1858 
1859 static void
1860 lvs_degraded_lvol_set_remove(struct spdk_lvs_degraded_lvol_set *degraded_set,
1861 			     struct spdk_lvol *lvol)
1862 {
1863 	assert(lvol->lvol_store->thread == spdk_get_thread());
1864 
1865 	lvol->degraded_set = NULL;
1866 	TAILQ_REMOVE(&degraded_set->lvols, lvol, degraded_link);
1867 	/* degraded_set->lvols may be empty. Caller should check if not immediately adding a new
1868 	 * lvol. */
1869 }
1870 
1871 /*
1872  * Record in lvs->degraded_lvol_sets_tree that a bdev of the specified name is needed by the
1873  * specified lvol.
1874  */
1875 int
1876 spdk_lvs_esnap_missing_add(struct spdk_lvol_store *lvs, struct spdk_lvol *lvol,
1877 			   const void *esnap_id, uint32_t id_len)
1878 {
1879 	struct spdk_lvs_degraded_lvol_set find, *degraded_set;
1880 
1881 	assert(lvs->thread == spdk_get_thread());
1882 
1883 	find.esnap_id = esnap_id;
1884 	find.id_len = id_len;
1885 	degraded_set = RB_FIND(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree, &find);
1886 	if (degraded_set == NULL) {
1887 		degraded_set = calloc(1, sizeof(*degraded_set));
1888 		if (degraded_set == NULL) {
1889 			SPDK_ERRLOG("lvol %s: cannot create degraded_set node: out of memory\n",
1890 				    lvol->unique_id);
1891 			return -ENOMEM;
1892 		}
1893 		degraded_set->esnap_id = calloc(1, id_len);
1894 		if (degraded_set->esnap_id == NULL) {
1895 			free(degraded_set);
1896 			SPDK_ERRLOG("lvol %s: cannot create degraded_set node: out of memory\n",
1897 				    lvol->unique_id);
1898 			return -ENOMEM;
1899 		}
1900 		memcpy((void *)degraded_set->esnap_id, esnap_id, id_len);
1901 		degraded_set->id_len = id_len;
1902 		degraded_set->lvol_store = lvs;
1903 		TAILQ_INIT(&degraded_set->lvols);
1904 		RB_INSERT(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree, degraded_set);
1905 	}
1906 
1907 	lvs_degraded_lvol_set_add(degraded_set, lvol);
1908 
1909 	return 0;
1910 }
1911 
1912 /*
1913  * Remove the record of the specified lvol needing a degraded_set bdev.
1914  */
1915 void
1916 spdk_lvs_esnap_missing_remove(struct spdk_lvol *lvol)
1917 {
1918 	struct spdk_lvol_store		*lvs = lvol->lvol_store;
1919 	struct spdk_lvs_degraded_lvol_set	*degraded_set = lvol->degraded_set;
1920 
1921 	assert(lvs->thread == spdk_get_thread());
1922 
1923 	if (degraded_set == NULL) {
1924 		return;
1925 	}
1926 
1927 	lvs_degraded_lvol_set_remove(degraded_set, lvol);
1928 
1929 	if (!TAILQ_EMPTY(&degraded_set->lvols)) {
1930 		return;
1931 	}
1932 
1933 	RB_REMOVE(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree, degraded_set);
1934 
1935 	free((char *)degraded_set->esnap_id);
1936 	free(degraded_set);
1937 }
1938