xref: /spdk/lib/lvol/lvol.c (revision 6c35d974ebb8d0e67a58261f46c0ebdd86baf054)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk_internal/lvolstore.h"
8 #include "spdk/log.h"
9 #include "spdk/string.h"
10 #include "spdk/thread.h"
11 #include "spdk/blob_bdev.h"
12 #include "spdk/tree.h"
13 #include "spdk/util.h"
14 
15 /* Default blob channel opts for lvol */
16 #define SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS 512
17 
18 #define LVOL_NAME "name"
19 
20 SPDK_LOG_REGISTER_COMPONENT(lvol)
21 
22 struct spdk_lvs_degraded_lvol_set {	/* A group of lvols tracked together under one (esnap_id, id_len) key. */
23 	struct spdk_lvol_store			*lvol_store;	/* Lvol store associated with this set */
24 	const void				*esnap_id;	/* NOTE(review): presumably the external snapshot ID - confirm */
25 	uint32_t				id_len;	/* NOTE(review): presumably the length of esnap_id in bytes - confirm */
26 	TAILQ_HEAD(degraded_lvols, spdk_lvol)	lvols;	/* Lvols in this set; see lvs_degraded_lvol_set_add()/_remove() */
27 	RB_ENTRY(spdk_lvs_degraded_lvol_set)	node;	/* Red-black tree linkage for this set */
28 };
29 
30 static TAILQ_HEAD(, spdk_lvol_store) g_lvol_stores = TAILQ_HEAD_INITIALIZER(g_lvol_stores);
31 static pthread_mutex_t g_lvol_stores_mutex = PTHREAD_MUTEX_INITIALIZER;
32 
33 static inline int lvs_opts_copy(const struct spdk_lvs_opts *src, struct spdk_lvs_opts *dst);
34 static int lvs_esnap_bs_dev_create(void *bs_ctx, void *blob_ctx, struct spdk_blob *blob,
35 				   const void *esnap_id, uint32_t id_len,
36 				   struct spdk_bs_dev **_bs_dev);
37 static struct spdk_lvol *lvs_get_lvol_by_blob_id(struct spdk_lvol_store *lvs, spdk_blob_id blob_id);
38 static void lvs_degraded_lvol_set_add(struct spdk_lvs_degraded_lvol_set *degraded_set,
39 				      struct spdk_lvol *lvol);
40 static void lvs_degraded_lvol_set_remove(struct spdk_lvs_degraded_lvol_set *degraded_set,
41 		struct spdk_lvol *lvol);
42 
43 static int
44 add_lvs_to_list(struct spdk_lvol_store *lvs)
45 {
46 	struct spdk_lvol_store *tmp;
47 	bool name_conflict = false;
48 
49 	pthread_mutex_lock(&g_lvol_stores_mutex);
50 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
51 		if (!strncmp(lvs->name, tmp->name, SPDK_LVS_NAME_MAX)) {
52 			name_conflict = true;
53 			break;
54 		}
55 	}
56 	if (!name_conflict) {
57 		lvs->on_list = true;
58 		TAILQ_INSERT_TAIL(&g_lvol_stores, lvs, link);
59 	}
60 	pthread_mutex_unlock(&g_lvol_stores_mutex);
61 
62 	return name_conflict ? -1 : 0;
63 }
64 
65 static struct spdk_lvol_store *
66 lvs_alloc(void)
67 {
68 	struct spdk_lvol_store *lvs;
69 
70 	lvs = calloc(1, sizeof(*lvs));
71 	if (lvs == NULL) {
72 		return NULL;
73 	}
74 
75 	TAILQ_INIT(&lvs->lvols);
76 	TAILQ_INIT(&lvs->pending_lvols);
77 	TAILQ_INIT(&lvs->retry_open_lvols);
78 
79 	lvs->load_esnaps = false;
80 	RB_INIT(&lvs->degraded_lvol_sets_tree);
81 	lvs->thread = spdk_get_thread();
82 
83 	return lvs;
84 }
85 
86 static void
87 lvs_free(struct spdk_lvol_store *lvs)
88 {
89 	pthread_mutex_lock(&g_lvol_stores_mutex);
90 	if (lvs->on_list) {
91 		TAILQ_REMOVE(&g_lvol_stores, lvs, link);
92 	}
93 	pthread_mutex_unlock(&g_lvol_stores_mutex);
94 
95 	assert(RB_EMPTY(&lvs->degraded_lvol_sets_tree));
96 
97 	free(lvs);
98 }
99 
100 static struct spdk_lvol *
101 lvol_alloc(struct spdk_lvol_store *lvs, const char *name, bool thin_provision,
102 	   enum lvol_clear_method clear_method)
103 {
104 	struct spdk_lvol *lvol;
105 
106 	lvol = calloc(1, sizeof(*lvol));
107 	if (lvol == NULL) {
108 		return NULL;
109 	}
110 
111 	lvol->lvol_store = lvs;
112 	lvol->clear_method = (enum blob_clear_method)clear_method;
113 	snprintf(lvol->name, sizeof(lvol->name), "%s", name);
114 	spdk_uuid_generate(&lvol->uuid);
115 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
116 	spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->uuid_str), &lvol->uuid);
117 
118 	TAILQ_INSERT_TAIL(&lvs->pending_lvols, lvol, link);
119 
120 	return lvol;
121 }
122 
/* Release the memory of an lvol; it does not unlink the lvol from any list. */
static void
lvol_free(struct spdk_lvol *lvol)
{
	free(lvol);
}
128 
129 static void
130 lvol_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
131 {
132 	struct spdk_lvol_with_handle_req *req = cb_arg;
133 	struct spdk_lvol *lvol = req->lvol;
134 
135 	if (lvolerrno != 0) {
136 		SPDK_INFOLOG(lvol, "Failed to open lvol %s\n", lvol->unique_id);
137 		goto end;
138 	}
139 
140 	lvol->ref_count++;
141 	lvol->blob = blob;
142 end:
143 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
144 	free(req);
145 }
146 
147 void
148 spdk_lvol_open(struct spdk_lvol *lvol, spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
149 {
150 	struct spdk_lvol_with_handle_req *req;
151 	struct spdk_blob_open_opts opts;
152 
153 	assert(cb_fn != NULL);
154 
155 	if (lvol == NULL) {
156 		SPDK_ERRLOG("lvol does not exist\n");
157 		cb_fn(cb_arg, NULL, -ENODEV);
158 		return;
159 	}
160 
161 	if (lvol->action_in_progress == true) {
162 		SPDK_ERRLOG("Cannot open lvol - operations on lvol pending\n");
163 		cb_fn(cb_arg, lvol, -EBUSY);
164 		return;
165 	}
166 
167 	if (lvol->ref_count > 0) {
168 		lvol->ref_count++;
169 		cb_fn(cb_arg, lvol, 0);
170 		return;
171 	}
172 
173 	req = calloc(1, sizeof(*req));
174 	if (req == NULL) {
175 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
176 		cb_fn(cb_arg, NULL, -ENOMEM);
177 		return;
178 	}
179 
180 	req->cb_fn = cb_fn;
181 	req->cb_arg = cb_arg;
182 	req->lvol = lvol;
183 
184 	spdk_blob_open_opts_init(&opts, sizeof(opts));
185 	opts.clear_method = lvol->clear_method;
186 
187 	spdk_bs_open_blob_ext(lvol->lvol_store->blobstore, lvol->blob_id, &opts, lvol_open_cb, req);
188 }
189 
190 static void
191 bs_unload_with_error_cb(void *cb_arg, int lvolerrno)
192 {
193 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
194 
195 	req->cb_fn(req->cb_arg, NULL, req->lvserrno);
196 	free(req);
197 }
198 
199 static void
200 load_next_lvol(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
201 {
202 	struct spdk_lvs_with_handle_req *req = cb_arg;
203 	struct spdk_lvol_store *lvs = req->lvol_store;
204 	struct spdk_blob_store *bs = lvs->blobstore;
205 	struct spdk_lvol *lvol, *tmp;
206 	spdk_blob_id blob_id;
207 	const char *attr;
208 	size_t value_len;
209 	int rc;
210 
211 	if (lvolerrno == -ENOENT) {
212 		/* Finished iterating */
213 		if (req->lvserrno == 0) {
214 			lvs->load_esnaps = true;
215 			req->cb_fn(req->cb_arg, lvs, req->lvserrno);
216 			free(req);
217 		} else {
218 			TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
219 				TAILQ_REMOVE(&lvs->lvols, lvol, link);
220 				lvol_free(lvol);
221 			}
222 			lvs_free(lvs);
223 			spdk_bs_unload(bs, bs_unload_with_error_cb, req);
224 		}
225 		return;
226 	} else if (lvolerrno < 0) {
227 		SPDK_ERRLOG("Failed to fetch blobs list\n");
228 		req->lvserrno = lvolerrno;
229 		goto invalid;
230 	}
231 
232 	blob_id = spdk_blob_get_id(blob);
233 
234 	if (blob_id == lvs->super_blob_id) {
235 		SPDK_INFOLOG(lvol, "found superblob %"PRIu64"\n", (uint64_t)blob_id);
236 		spdk_bs_iter_next(bs, blob, load_next_lvol, req);
237 		return;
238 	}
239 
240 	lvol = calloc(1, sizeof(*lvol));
241 	if (!lvol) {
242 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
243 		req->lvserrno = -ENOMEM;
244 		goto invalid;
245 	}
246 
247 	/*
248 	 * Do not store a reference to blob now because spdk_bs_iter_next() will close it.
249 	 * Storing blob_id for future lookups is fine.
250 	 */
251 	lvol->blob_id = blob_id;
252 	lvol->lvol_store = lvs;
253 
254 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
255 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0' ||
256 	    spdk_uuid_parse(&lvol->uuid, attr) != 0) {
257 		SPDK_INFOLOG(lvol, "Missing or corrupt lvol uuid\n");
258 		spdk_uuid_set_null(&lvol->uuid);
259 	}
260 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
261 
262 	if (!spdk_uuid_is_null(&lvol->uuid)) {
263 		snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
264 	} else {
265 		spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->unique_id), &lvol->lvol_store->uuid);
266 		value_len = strlen(lvol->unique_id);
267 		snprintf(lvol->unique_id + value_len, sizeof(lvol->unique_id) - value_len, "_%"PRIu64,
268 			 (uint64_t)blob_id);
269 	}
270 
271 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
272 	if (rc != 0 || value_len > SPDK_LVOL_NAME_MAX) {
273 		SPDK_ERRLOG("Cannot assign lvol name\n");
274 		lvol_free(lvol);
275 		req->lvserrno = -EINVAL;
276 		goto invalid;
277 	}
278 
279 	snprintf(lvol->name, sizeof(lvol->name), "%s", attr);
280 
281 	TAILQ_INSERT_TAIL(&lvs->lvols, lvol, link);
282 
283 	lvs->lvol_count++;
284 
285 	SPDK_INFOLOG(lvol, "added lvol %s (%s)\n", lvol->unique_id, lvol->uuid_str);
286 
287 invalid:
288 	spdk_bs_iter_next(bs, blob, load_next_lvol, req);
289 }
290 
291 static void
292 close_super_cb(void *cb_arg, int lvolerrno)
293 {
294 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
295 	struct spdk_lvol_store *lvs = req->lvol_store;
296 	struct spdk_blob_store *bs = lvs->blobstore;
297 
298 	if (lvolerrno != 0) {
299 		SPDK_INFOLOG(lvol, "Could not close super blob\n");
300 		lvs_free(lvs);
301 		req->lvserrno = -ENODEV;
302 		spdk_bs_unload(bs, bs_unload_with_error_cb, req);
303 		return;
304 	}
305 
306 	/* Start loading lvols */
307 	spdk_bs_iter_first(lvs->blobstore, load_next_lvol, req);
308 }
309 
310 static void
311 close_super_blob_with_error_cb(void *cb_arg, int lvolerrno)
312 {
313 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
314 	struct spdk_lvol_store *lvs = req->lvol_store;
315 	struct spdk_blob_store *bs = lvs->blobstore;
316 
317 	lvs_free(lvs);
318 
319 	spdk_bs_unload(bs, bs_unload_with_error_cb, req);
320 }
321 
322 static void
323 lvs_read_uuid(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
324 {
325 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
326 	struct spdk_lvol_store *lvs = req->lvol_store;
327 	struct spdk_blob_store *bs = lvs->blobstore;
328 	const char *attr;
329 	size_t value_len;
330 	int rc;
331 
332 	if (lvolerrno != 0) {
333 		SPDK_INFOLOG(lvol, "Could not open super blob\n");
334 		lvs_free(lvs);
335 		req->lvserrno = -ENODEV;
336 		spdk_bs_unload(bs, bs_unload_with_error_cb, req);
337 		return;
338 	}
339 
340 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
341 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0') {
342 		SPDK_INFOLOG(lvol, "degraded_set or incorrect UUID\n");
343 		req->lvserrno = -EINVAL;
344 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
345 		return;
346 	}
347 
348 	if (spdk_uuid_parse(&lvs->uuid, attr)) {
349 		SPDK_INFOLOG(lvol, "incorrect UUID '%s'\n", attr);
350 		req->lvserrno = -EINVAL;
351 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
352 		return;
353 	}
354 
355 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
356 	if (rc != 0 || value_len > SPDK_LVS_NAME_MAX) {
357 		SPDK_INFOLOG(lvol, "degraded_set or invalid name\n");
358 		req->lvserrno = -EINVAL;
359 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
360 		return;
361 	}
362 
363 	snprintf(lvs->name, sizeof(lvs->name), "%s", attr);
364 
365 	rc = add_lvs_to_list(lvs);
366 	if (rc) {
367 		SPDK_INFOLOG(lvol, "lvolstore with name %s already exists\n", lvs->name);
368 		req->lvserrno = -EEXIST;
369 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
370 		return;
371 	}
372 
373 	lvs->super_blob_id = spdk_blob_get_id(blob);
374 
375 	spdk_blob_close(blob, close_super_cb, req);
376 }
377 
378 static void
379 lvs_open_super(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
380 {
381 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
382 	struct spdk_lvol_store *lvs = req->lvol_store;
383 	struct spdk_blob_store *bs = lvs->blobstore;
384 
385 	if (lvolerrno != 0) {
386 		SPDK_INFOLOG(lvol, "Super blob not found\n");
387 		lvs_free(lvs);
388 		req->lvserrno = -ENODEV;
389 		spdk_bs_unload(bs, bs_unload_with_error_cb, req);
390 		return;
391 	}
392 
393 	spdk_bs_open_blob(bs, blobid, lvs_read_uuid, req);
394 }
395 
396 static void
397 lvs_load_cb(void *cb_arg, struct spdk_blob_store *bs, int lvolerrno)
398 {
399 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
400 	struct spdk_lvol_store *lvs = req->lvol_store;
401 
402 	if (lvolerrno != 0) {
403 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
404 		lvs_free(lvs);
405 		free(req);
406 		return;
407 	}
408 
409 	lvs->blobstore = bs;
410 	lvs->bs_dev = req->bs_dev;
411 
412 	spdk_bs_get_super(bs, lvs_open_super, req);
413 }
414 
415 static void
416 lvs_bs_opts_init(struct spdk_bs_opts *opts)
417 {
418 	spdk_bs_opts_init(opts, sizeof(*opts));
419 	opts->max_channel_ops = SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS;
420 }
421 
422 static void
423 lvs_load(struct spdk_bs_dev *bs_dev, const struct spdk_lvs_opts *_lvs_opts,
424 	 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
425 {
426 	struct spdk_lvs_with_handle_req *req;
427 	struct spdk_bs_opts bs_opts = {};
428 	struct spdk_lvs_opts lvs_opts;
429 
430 	assert(cb_fn != NULL);
431 
432 	if (bs_dev == NULL) {
433 		SPDK_ERRLOG("Blobstore device does not exist\n");
434 		cb_fn(cb_arg, NULL, -ENODEV);
435 		return;
436 	}
437 
438 	spdk_lvs_opts_init(&lvs_opts);
439 	if (_lvs_opts != NULL) {
440 		if (lvs_opts_copy(_lvs_opts, &lvs_opts) != 0) {
441 			SPDK_ERRLOG("Invalid options\n");
442 			cb_fn(cb_arg, NULL, -EINVAL);
443 			return;
444 		}
445 	}
446 
447 	req = calloc(1, sizeof(*req));
448 	if (req == NULL) {
449 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
450 		cb_fn(cb_arg, NULL, -ENOMEM);
451 		return;
452 	}
453 
454 	req->lvol_store = lvs_alloc();
455 	if (req->lvol_store == NULL) {
456 		SPDK_ERRLOG("Cannot alloc memory for lvol store\n");
457 		free(req);
458 		cb_fn(cb_arg, NULL, -ENOMEM);
459 		return;
460 	}
461 	req->cb_fn = cb_fn;
462 	req->cb_arg = cb_arg;
463 	req->bs_dev = bs_dev;
464 
465 	lvs_bs_opts_init(&bs_opts);
466 	snprintf(bs_opts.bstype.bstype, sizeof(bs_opts.bstype.bstype), "LVOLSTORE");
467 
468 	if (lvs_opts.esnap_bs_dev_create != NULL) {
469 		req->lvol_store->esnap_bs_dev_create = lvs_opts.esnap_bs_dev_create;
470 		bs_opts.esnap_bs_dev_create = lvs_esnap_bs_dev_create;
471 		bs_opts.esnap_ctx = req->lvol_store;
472 	}
473 
474 	spdk_bs_load(bs_dev, &bs_opts, lvs_load_cb, req);
475 }
476 
477 void
478 spdk_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
479 {
480 	lvs_load(bs_dev, NULL, cb_fn, cb_arg);
481 }
482 
483 void
484 spdk_lvs_load_ext(struct spdk_bs_dev *bs_dev, const struct spdk_lvs_opts *opts,
485 		  spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
486 {
487 	lvs_load(bs_dev, opts, cb_fn, cb_arg);
488 }
489 
/*
 * Completion for the best-effort spdk_bs_destroy() issued by
 * exit_error_lvs_req(). Intentionally does nothing: the original error has
 * already been reported to the user by then.
 */
static void
remove_bs_on_error_cb(void *cb_arg, int bserrno)
{
}
494 
495 static void
496 exit_error_lvs_req(struct spdk_lvs_with_handle_req *req, struct spdk_lvol_store *lvs, int lvolerrno)
497 {
498 	req->cb_fn(req->cb_arg, NULL, lvolerrno);
499 	spdk_bs_destroy(lvs->blobstore, remove_bs_on_error_cb, NULL);
500 	lvs_free(lvs);
501 	free(req);
502 }
503 
504 static void
505 super_create_close_cb(void *cb_arg, int lvolerrno)
506 {
507 	struct spdk_lvs_with_handle_req *req = cb_arg;
508 	struct spdk_lvol_store *lvs = req->lvol_store;
509 
510 	if (lvolerrno < 0) {
511 		SPDK_ERRLOG("Lvol store init failed: could not close super blob\n");
512 		exit_error_lvs_req(req, lvs, lvolerrno);
513 		return;
514 	}
515 
516 	req->cb_fn(req->cb_arg, lvs, lvolerrno);
517 	free(req);
518 }
519 
520 static void
521 super_blob_set_cb(void *cb_arg, int lvolerrno)
522 {
523 	struct spdk_lvs_with_handle_req *req = cb_arg;
524 	struct spdk_lvol_store *lvs = req->lvol_store;
525 	struct spdk_blob *blob = lvs->super_blob;
526 
527 	if (lvolerrno < 0) {
528 		SPDK_ERRLOG("Lvol store init failed: could not set uuid for super blob\n");
529 		exit_error_lvs_req(req, lvs, lvolerrno);
530 		return;
531 	}
532 
533 	spdk_blob_close(blob, super_create_close_cb, req);
534 }
535 
536 static void
537 super_blob_init_cb(void *cb_arg, int lvolerrno)
538 {
539 	struct spdk_lvs_with_handle_req *req = cb_arg;
540 	struct spdk_lvol_store *lvs = req->lvol_store;
541 	struct spdk_blob *blob = lvs->super_blob;
542 	char uuid[SPDK_UUID_STRING_LEN];
543 
544 	if (lvolerrno < 0) {
545 		SPDK_ERRLOG("Lvol store init failed: could not set super blob\n");
546 		exit_error_lvs_req(req, lvs, lvolerrno);
547 		return;
548 	}
549 
550 	spdk_uuid_fmt_lower(uuid, sizeof(uuid), &lvs->uuid);
551 
552 	spdk_blob_set_xattr(blob, "uuid", uuid, sizeof(uuid));
553 	spdk_blob_set_xattr(blob, "name", lvs->name, strnlen(lvs->name, SPDK_LVS_NAME_MAX) + 1);
554 	spdk_blob_sync_md(blob, super_blob_set_cb, req);
555 }
556 
557 static void
558 super_blob_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
559 {
560 	struct spdk_lvs_with_handle_req *req = cb_arg;
561 	struct spdk_lvol_store *lvs = req->lvol_store;
562 
563 	if (lvolerrno < 0) {
564 		SPDK_ERRLOG("Lvol store init failed: could not open super blob\n");
565 		exit_error_lvs_req(req, lvs, lvolerrno);
566 		return;
567 	}
568 
569 	lvs->super_blob = blob;
570 	lvs->super_blob_id = spdk_blob_get_id(blob);
571 
572 	spdk_bs_set_super(lvs->blobstore, lvs->super_blob_id, super_blob_init_cb, req);
573 }
574 
575 static void
576 super_blob_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
577 {
578 	struct spdk_lvs_with_handle_req *req = cb_arg;
579 	struct spdk_lvol_store *lvs = req->lvol_store;
580 	struct spdk_blob_store *bs;
581 
582 	if (lvolerrno < 0) {
583 		SPDK_ERRLOG("Lvol store init failed: could not create super blob\n");
584 		exit_error_lvs_req(req, lvs, lvolerrno);
585 		return;
586 	}
587 
588 	bs = req->lvol_store->blobstore;
589 
590 	spdk_bs_open_blob(bs, blobid, super_blob_create_open_cb, req);
591 }
592 
593 static void
594 lvs_init_cb(void *cb_arg, struct spdk_blob_store *bs, int lvserrno)
595 {
596 	struct spdk_lvs_with_handle_req *lvs_req = cb_arg;
597 	struct spdk_lvol_store *lvs = lvs_req->lvol_store;
598 
599 	if (lvserrno != 0) {
600 		assert(bs == NULL);
601 		lvs_req->cb_fn(lvs_req->cb_arg, NULL, lvserrno);
602 		SPDK_ERRLOG("Lvol store init failed: could not initialize blobstore\n");
603 		lvs_free(lvs);
604 		free(lvs_req);
605 		return;
606 	}
607 
608 	assert(bs != NULL);
609 	lvs->blobstore = bs;
610 
611 	SPDK_INFOLOG(lvol, "Lvol store initialized\n");
612 
613 	/* create super blob */
614 	spdk_bs_create_blob(lvs->blobstore, super_blob_create_cb, lvs_req);
615 }
616 
617 void
618 spdk_lvs_opts_init(struct spdk_lvs_opts *o)
619 {
620 	memset(o, 0, sizeof(*o));
621 	o->cluster_sz = SPDK_LVS_OPTS_CLUSTER_SZ;
622 	o->clear_method = LVS_CLEAR_WITH_UNMAP;
623 	o->num_md_pages_per_cluster_ratio = 100;
624 	o->opts_size = sizeof(*o);
625 }
626 
627 static inline int
628 lvs_opts_copy(const struct spdk_lvs_opts *src, struct spdk_lvs_opts *dst)
629 {
630 	if (src->opts_size == 0) {
631 		SPDK_ERRLOG("opts_size should not be zero value\n");
632 		return -1;
633 	}
634 #define FIELD_OK(field) \
635         offsetof(struct spdk_lvs_opts, field) + sizeof(src->field) <= src->opts_size
636 
637 #define SET_FIELD(field) \
638         if (FIELD_OK(field)) { \
639                 dst->field = src->field; \
640         } \
641 
642 	SET_FIELD(cluster_sz);
643 	SET_FIELD(clear_method);
644 	if (FIELD_OK(name)) {
645 		memcpy(&dst->name, &src->name, sizeof(dst->name));
646 	}
647 	SET_FIELD(num_md_pages_per_cluster_ratio);
648 	SET_FIELD(opts_size);
649 	SET_FIELD(esnap_bs_dev_create);
650 	SET_FIELD(md_page_size);
651 
652 	dst->opts_size = src->opts_size;
653 
654 	/* You should not remove this statement, but need to update the assert statement
655 	 * if you add a new field, and also add a corresponding SET_FIELD statement */
656 	SPDK_STATIC_ASSERT(sizeof(struct spdk_lvs_opts) == 92, "Incorrect size");
657 
658 #undef FIELD_OK
659 #undef SET_FIELD
660 
661 	return 0;
662 }
663 
664 static void
665 setup_lvs_opts(struct spdk_bs_opts *bs_opts, struct spdk_lvs_opts *o, uint32_t total_clusters,
666 	       void *esnap_ctx)
667 {
668 	assert(o != NULL);
669 	lvs_bs_opts_init(bs_opts);
670 	bs_opts->cluster_sz = o->cluster_sz;
671 	bs_opts->clear_method = (enum bs_clear_method)o->clear_method;
672 	bs_opts->num_md_pages = (o->num_md_pages_per_cluster_ratio * total_clusters) / 100;
673 	bs_opts->md_page_size = o->md_page_size;
674 	bs_opts->esnap_bs_dev_create = o->esnap_bs_dev_create;
675 	bs_opts->esnap_ctx = esnap_ctx;
676 	snprintf(bs_opts->bstype.bstype, sizeof(bs_opts->bstype.bstype), "LVOLSTORE");
677 }
678 
679 int
680 spdk_lvs_init(struct spdk_bs_dev *bs_dev, struct spdk_lvs_opts *o,
681 	      spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
682 {
683 	struct spdk_lvol_store *lvs;
684 	struct spdk_lvs_with_handle_req *lvs_req;
685 	struct spdk_bs_opts opts = {};
686 	struct spdk_lvs_opts lvs_opts;
687 	uint32_t total_clusters;
688 	int rc;
689 
690 	if (bs_dev == NULL) {
691 		SPDK_ERRLOG("Blobstore device does not exist\n");
692 		return -ENODEV;
693 	}
694 
695 	if (o == NULL) {
696 		SPDK_ERRLOG("spdk_lvs_opts not specified\n");
697 		return -EINVAL;
698 	}
699 
700 	spdk_lvs_opts_init(&lvs_opts);
701 	if (lvs_opts_copy(o, &lvs_opts) != 0) {
702 		SPDK_ERRLOG("spdk_lvs_opts invalid\n");
703 		return -EINVAL;
704 	}
705 
706 	if (lvs_opts.cluster_sz < bs_dev->blocklen || (lvs_opts.cluster_sz % bs_dev->blocklen) != 0) {
707 		SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than blocklen %" PRIu32
708 			    "Or not an integral multiple\n", lvs_opts.cluster_sz, bs_dev->blocklen);
709 		return -EINVAL;
710 	}
711 	total_clusters = bs_dev->blockcnt / (lvs_opts.cluster_sz / bs_dev->blocklen);
712 
713 	lvs = lvs_alloc();
714 	if (!lvs) {
715 		SPDK_ERRLOG("Cannot alloc memory for lvol store base pointer\n");
716 		return -ENOMEM;
717 	}
718 
719 	setup_lvs_opts(&opts, o, total_clusters, lvs);
720 
721 	if (strnlen(lvs_opts.name, SPDK_LVS_NAME_MAX) == SPDK_LVS_NAME_MAX) {
722 		SPDK_ERRLOG("Name has no null terminator.\n");
723 		lvs_free(lvs);
724 		return -EINVAL;
725 	}
726 
727 	if (strnlen(lvs_opts.name, SPDK_LVS_NAME_MAX) == 0) {
728 		SPDK_ERRLOG("No name specified.\n");
729 		lvs_free(lvs);
730 		return -EINVAL;
731 	}
732 
733 	spdk_uuid_generate(&lvs->uuid);
734 	snprintf(lvs->name, sizeof(lvs->name), "%s", lvs_opts.name);
735 
736 	rc = add_lvs_to_list(lvs);
737 	if (rc) {
738 		SPDK_ERRLOG("lvolstore with name %s already exists\n", lvs->name);
739 		lvs_free(lvs);
740 		return -EEXIST;
741 	}
742 
743 	lvs_req = calloc(1, sizeof(*lvs_req));
744 	if (!lvs_req) {
745 		lvs_free(lvs);
746 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
747 		return -ENOMEM;
748 	}
749 
750 	assert(cb_fn != NULL);
751 	lvs_req->cb_fn = cb_fn;
752 	lvs_req->cb_arg = cb_arg;
753 	lvs_req->lvol_store = lvs;
754 	lvs->bs_dev = bs_dev;
755 
756 	SPDK_INFOLOG(lvol, "Initializing lvol store\n");
757 	spdk_bs_init(bs_dev, &opts, lvs_init_cb, lvs_req);
758 
759 	return 0;
760 }
761 
762 static void
763 lvs_rename_cb(void *cb_arg, int lvolerrno)
764 {
765 	struct spdk_lvs_req *req = cb_arg;
766 
767 	if (lvolerrno != 0) {
768 		req->lvserrno = lvolerrno;
769 	}
770 	if (req->lvserrno != 0) {
771 		SPDK_ERRLOG("Lvol store rename operation failed\n");
772 		/* Lvs renaming failed, so we should 'clear' new_name.
773 		 * Otherwise it could cause a failure on the next attempt to change the name to 'new_name'  */
774 		snprintf(req->lvol_store->new_name,
775 			 sizeof(req->lvol_store->new_name),
776 			 "%s", req->lvol_store->name);
777 	} else {
778 		/* Update lvs name with new_name */
779 		snprintf(req->lvol_store->name,
780 			 sizeof(req->lvol_store->name),
781 			 "%s", req->lvol_store->new_name);
782 	}
783 
784 	req->cb_fn(req->cb_arg, req->lvserrno);
785 	free(req);
786 }
787 
788 static void
789 lvs_rename_sync_cb(void *cb_arg, int lvolerrno)
790 {
791 	struct spdk_lvs_req *req = cb_arg;
792 	struct spdk_blob *blob = req->lvol_store->super_blob;
793 
794 	if (lvolerrno < 0) {
795 		req->lvserrno = lvolerrno;
796 	}
797 
798 	spdk_blob_close(blob, lvs_rename_cb, req);
799 }
800 
801 static void
802 lvs_rename_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
803 {
804 	struct spdk_lvs_req *req = cb_arg;
805 	int rc;
806 
807 	if (lvolerrno < 0) {
808 		lvs_rename_cb(cb_arg, lvolerrno);
809 		return;
810 	}
811 
812 	rc = spdk_blob_set_xattr(blob, "name", req->lvol_store->new_name,
813 				 strlen(req->lvol_store->new_name) + 1);
814 	if (rc < 0) {
815 		req->lvserrno = rc;
816 		lvs_rename_sync_cb(req, rc);
817 		return;
818 	}
819 
820 	req->lvol_store->super_blob = blob;
821 
822 	spdk_blob_sync_md(blob, lvs_rename_sync_cb, req);
823 }
824 
825 void
826 spdk_lvs_rename(struct spdk_lvol_store *lvs, const char *new_name,
827 		spdk_lvs_op_complete cb_fn, void *cb_arg)
828 {
829 	struct spdk_lvs_req *req;
830 	struct spdk_lvol_store *tmp;
831 
832 	/* Check if new name is current lvs name.
833 	 * If so, return success immediately */
834 	if (strncmp(lvs->name, new_name, SPDK_LVS_NAME_MAX) == 0) {
835 		cb_fn(cb_arg, 0);
836 		return;
837 	}
838 
839 	/* Check if new or new_name is already used in other lvs */
840 	pthread_mutex_lock(&g_lvol_stores_mutex);
841 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
842 		if (!strncmp(new_name, tmp->name, SPDK_LVS_NAME_MAX) ||
843 		    !strncmp(new_name, tmp->new_name, SPDK_LVS_NAME_MAX)) {
844 			pthread_mutex_unlock(&g_lvol_stores_mutex);
845 			cb_fn(cb_arg, -EEXIST);
846 			return;
847 		}
848 	}
849 	pthread_mutex_unlock(&g_lvol_stores_mutex);
850 
851 	req = calloc(1, sizeof(*req));
852 	if (!req) {
853 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
854 		cb_fn(cb_arg, -ENOMEM);
855 		return;
856 	}
857 	snprintf(lvs->new_name, sizeof(lvs->new_name), "%s", new_name);
858 	req->lvol_store = lvs;
859 	req->cb_fn = cb_fn;
860 	req->cb_arg = cb_arg;
861 
862 	spdk_bs_open_blob(lvs->blobstore, lvs->super_blob_id, lvs_rename_open_cb, req);
863 }
864 
865 static void
866 _lvs_unload_cb(void *cb_arg, int lvserrno)
867 {
868 	struct spdk_lvs_req *lvs_req = cb_arg;
869 
870 	SPDK_INFOLOG(lvol, "Lvol store unloaded\n");
871 	assert(lvs_req->cb_fn != NULL);
872 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
873 	free(lvs_req);
874 }
875 
876 int
877 spdk_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
878 		void *cb_arg)
879 {
880 	struct spdk_lvs_req *lvs_req;
881 	struct spdk_lvol *lvol, *tmp;
882 
883 	if (lvs == NULL) {
884 		SPDK_ERRLOG("Lvol store is NULL\n");
885 		return -ENODEV;
886 	}
887 
888 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
889 		if (lvol->action_in_progress == true) {
890 			SPDK_ERRLOG("Cannot unload lvol store - operations on lvols pending\n");
891 			cb_fn(cb_arg, -EBUSY);
892 			return -EBUSY;
893 		} else if (lvol->ref_count != 0) {
894 			SPDK_ERRLOG("Lvols still open on lvol store\n");
895 			cb_fn(cb_arg, -EBUSY);
896 			return -EBUSY;
897 		}
898 	}
899 
900 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
901 		spdk_lvs_esnap_missing_remove(lvol);
902 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
903 		lvol_free(lvol);
904 	}
905 
906 	lvs_req = calloc(1, sizeof(*lvs_req));
907 	if (!lvs_req) {
908 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
909 		return -ENOMEM;
910 	}
911 
912 	lvs_req->cb_fn = cb_fn;
913 	lvs_req->cb_arg = cb_arg;
914 
915 	SPDK_INFOLOG(lvol, "Unloading lvol store\n");
916 	spdk_bs_unload(lvs->blobstore, _lvs_unload_cb, lvs_req);
917 	lvs_free(lvs);
918 
919 	return 0;
920 }
921 
922 static void
923 _lvs_destroy_cb(void *cb_arg, int lvserrno)
924 {
925 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
926 
927 	SPDK_INFOLOG(lvol, "Lvol store destroyed\n");
928 	assert(lvs_req->cb_fn != NULL);
929 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
930 	free(lvs_req);
931 }
932 
933 static void
934 _lvs_destroy_super_cb(void *cb_arg, int bserrno)
935 {
936 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
937 	struct spdk_lvol_store *lvs = lvs_req->lvs;
938 
939 	assert(lvs != NULL);
940 
941 	SPDK_INFOLOG(lvol, "Destroying lvol store\n");
942 	spdk_bs_destroy(lvs->blobstore, _lvs_destroy_cb, lvs_req);
943 	lvs_free(lvs);
944 }
945 
946 int
947 spdk_lvs_destroy(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
948 		 void *cb_arg)
949 {
950 	struct spdk_lvs_destroy_req *lvs_req;
951 	struct spdk_lvol *iter_lvol, *tmp;
952 
953 	if (lvs == NULL) {
954 		SPDK_ERRLOG("Lvol store is NULL\n");
955 		return -ENODEV;
956 	}
957 
958 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
959 		if (iter_lvol->action_in_progress == true) {
960 			SPDK_ERRLOG("Cannot destroy lvol store - operations on lvols pending\n");
961 			cb_fn(cb_arg, -EBUSY);
962 			return -EBUSY;
963 		} else if (iter_lvol->ref_count != 0) {
964 			SPDK_ERRLOG("Lvols still open on lvol store\n");
965 			cb_fn(cb_arg, -EBUSY);
966 			return -EBUSY;
967 		}
968 	}
969 
970 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
971 		free(iter_lvol);
972 	}
973 
974 	lvs_req = calloc(1, sizeof(*lvs_req));
975 	if (!lvs_req) {
976 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
977 		return -ENOMEM;
978 	}
979 
980 	lvs_req->cb_fn = cb_fn;
981 	lvs_req->cb_arg = cb_arg;
982 	lvs_req->lvs = lvs;
983 
984 	SPDK_INFOLOG(lvol, "Deleting super blob\n");
985 	spdk_bs_delete_blob(lvs->blobstore, lvs->super_blob_id, _lvs_destroy_super_cb, lvs_req);
986 
987 	return 0;
988 }
989 
990 static void
991 lvol_close_blob_cb(void *cb_arg, int lvolerrno)
992 {
993 	struct spdk_lvol_req *req = cb_arg;
994 	struct spdk_lvol *lvol = req->lvol;
995 
996 	if (lvolerrno < 0) {
997 		SPDK_ERRLOG("Could not close blob on lvol\n");
998 		goto end;
999 	}
1000 
1001 	lvol->ref_count--;
1002 	lvol->blob = NULL;
1003 	SPDK_INFOLOG(lvol, "Lvol %s closed\n", lvol->unique_id);
1004 
1005 end:
1006 	lvol->action_in_progress = false;
1007 	req->cb_fn(req->cb_arg, lvolerrno);
1008 	free(req);
1009 }
1010 
1011 bool
1012 spdk_lvol_deletable(struct spdk_lvol *lvol)
1013 {
1014 	size_t count = 0;
1015 
1016 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
1017 	return (count == 0);
1018 }
1019 
1020 static void
1021 lvol_delete_blob_cb(void *cb_arg, int lvolerrno)
1022 {
1023 	struct spdk_lvol_req *req = cb_arg;
1024 	struct spdk_lvol *lvol = req->lvol;
1025 	struct spdk_lvol *clone_lvol = req->clone_lvol;
1026 
1027 	if (lvolerrno < 0) {
1028 		SPDK_ERRLOG("Could not remove blob on lvol gracefully - forced removal\n");
1029 	} else {
1030 		SPDK_INFOLOG(lvol, "Lvol %s deleted\n", lvol->unique_id);
1031 	}
1032 
1033 	if (lvol->degraded_set != NULL) {
1034 		if (clone_lvol != NULL) {
1035 			/*
1036 			 * A degraded esnap clone that has a blob clone has been deleted. clone_lvol
1037 			 * becomes an esnap clone and needs to be associated with the
1038 			 * spdk_lvs_degraded_lvol_set.
1039 			 */
1040 			struct spdk_lvs_degraded_lvol_set *degraded_set = lvol->degraded_set;
1041 
1042 			lvs_degraded_lvol_set_remove(degraded_set, lvol);
1043 			lvs_degraded_lvol_set_add(degraded_set, clone_lvol);
1044 		} else {
1045 			spdk_lvs_esnap_missing_remove(lvol);
1046 		}
1047 	}
1048 
1049 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
1050 	lvol_free(lvol);
1051 	req->cb_fn(req->cb_arg, lvolerrno);
1052 	free(req);
1053 }
1054 
1055 static void
1056 lvol_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
1057 {
1058 	struct spdk_lvol_with_handle_req *req = cb_arg;
1059 	struct spdk_lvol *lvol = req->lvol;
1060 
1061 	TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
1062 
1063 	if (lvolerrno < 0) {
1064 		lvol_free(lvol);
1065 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
1066 		free(req);
1067 		return;
1068 	}
1069 
1070 	lvol->blob = blob;
1071 	lvol->blob_id = spdk_blob_get_id(blob);
1072 
1073 	TAILQ_INSERT_TAIL(&lvol->lvol_store->lvols, lvol, link);
1074 
1075 	lvol->ref_count++;
1076 
1077 	assert(req->cb_fn != NULL);
1078 	req->cb_fn(req->cb_arg, req->lvol, lvolerrno);
1079 	free(req);
1080 }
1081 
1082 static void
1083 lvol_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
1084 {
1085 	struct spdk_lvol_with_handle_req *req = cb_arg;
1086 	struct spdk_blob_store *bs;
1087 	struct spdk_blob_open_opts opts;
1088 
1089 	if (lvolerrno < 0) {
1090 		TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
1091 		lvol_free(req->lvol);
1092 		assert(req->cb_fn != NULL);
1093 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
1094 		free(req);
1095 		return;
1096 	}
1097 
1098 	spdk_blob_open_opts_init(&opts, sizeof(opts));
1099 	opts.clear_method = req->lvol->clear_method;
1100 	/*
1101 	 * If the lvol that is being created is an esnap clone, the blobstore needs to be able to
1102 	 * pass the lvol to the esnap_bs_dev_create callback. In order for that to happen, we need
1103 	 * to pass it here.
1104 	 *
1105 	 * This does set ensap_ctx in cases where it's not needed, but we don't know that it's not
1106 	 * needed until after the blob is open. When the blob is not an esnap clone, a reference to
1107 	 * the value stored in opts.esnap_ctx is not retained by the blobstore.
1108 	 */
1109 	opts.esnap_ctx = req->lvol;
1110 	bs = req->lvol->lvol_store->blobstore;
1111 
1112 	if (req->origlvol != NULL && req->origlvol->degraded_set != NULL) {
1113 		/*
1114 		 * A snapshot was created from a degraded esnap clone. The new snapshot is now a
1115 		 * degraded esnap clone. The previous clone is now a regular clone of a blob. Update
1116 		 * the set of directly-related clones to the missing external snapshot.
1117 		 */
1118 		struct spdk_lvs_degraded_lvol_set *degraded_set = req->origlvol->degraded_set;
1119 
1120 		lvs_degraded_lvol_set_remove(degraded_set, req->origlvol);
1121 		lvs_degraded_lvol_set_add(degraded_set, req->lvol);
1122 	}
1123 
1124 	spdk_bs_open_blob_ext(bs, blobid, &opts, lvol_create_open_cb, req);
1125 }
1126 
1127 static void
1128 lvol_get_xattr_value(void *xattr_ctx, const char *name,
1129 		     const void **value, size_t *value_len)
1130 {
1131 	struct spdk_lvol *lvol = xattr_ctx;
1132 
1133 	if (!strcmp(LVOL_NAME, name)) {
1134 		*value = lvol->name;
1135 		*value_len = SPDK_LVOL_NAME_MAX;
1136 		return;
1137 	}
1138 	if (!strcmp("uuid", name)) {
1139 		*value = lvol->uuid_str;
1140 		*value_len = sizeof(lvol->uuid_str);
1141 		return;
1142 	}
1143 	*value = NULL;
1144 	*value_len = 0;
1145 }
1146 
1147 static int
1148 lvs_verify_lvol_name(struct spdk_lvol_store *lvs, const char *name)
1149 {
1150 	struct spdk_lvol *tmp;
1151 
1152 	if (name == NULL || strnlen(name, SPDK_LVOL_NAME_MAX) == 0) {
1153 		SPDK_INFOLOG(lvol, "lvol name not provided.\n");
1154 		return -EINVAL;
1155 	}
1156 
1157 	if (strnlen(name, SPDK_LVOL_NAME_MAX) == SPDK_LVOL_NAME_MAX) {
1158 		SPDK_ERRLOG("Name has no null terminator.\n");
1159 		return -EINVAL;
1160 	}
1161 
1162 	TAILQ_FOREACH(tmp, &lvs->lvols, link) {
1163 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1164 			SPDK_ERRLOG("lvol with name %s already exists\n", name);
1165 			return -EEXIST;
1166 		}
1167 	}
1168 
1169 	TAILQ_FOREACH(tmp, &lvs->pending_lvols, link) {
1170 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1171 			SPDK_ERRLOG("lvol with name %s is being already created\n", name);
1172 			return -EEXIST;
1173 		}
1174 	}
1175 
1176 	return 0;
1177 }
1178 
1179 int
1180 spdk_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1181 		 bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1182 		 void *cb_arg)
1183 {
1184 	struct spdk_lvol_with_handle_req *req;
1185 	struct spdk_blob_store *bs;
1186 	struct spdk_lvol *lvol;
1187 	struct spdk_blob_opts opts;
1188 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1189 	int rc;
1190 
1191 	if (lvs == NULL) {
1192 		SPDK_ERRLOG("lvol store does not exist\n");
1193 		return -EINVAL;
1194 	}
1195 
1196 	rc = lvs_verify_lvol_name(lvs, name);
1197 	if (rc < 0) {
1198 		return rc;
1199 	}
1200 
1201 	bs = lvs->blobstore;
1202 
1203 	req = calloc(1, sizeof(*req));
1204 	if (!req) {
1205 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1206 		return -ENOMEM;
1207 	}
1208 	req->cb_fn = cb_fn;
1209 	req->cb_arg = cb_arg;
1210 
1211 	lvol = lvol_alloc(lvs, name, thin_provision, clear_method);
1212 	if (!lvol) {
1213 		free(req);
1214 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1215 		return -ENOMEM;
1216 	}
1217 
1218 	req->lvol = lvol;
1219 	spdk_blob_opts_init(&opts, sizeof(opts));
1220 	opts.thin_provision = thin_provision;
1221 	opts.num_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(bs));
1222 	opts.clear_method = lvol->clear_method;
1223 	opts.xattrs.count = SPDK_COUNTOF(xattr_names);
1224 	opts.xattrs.names = xattr_names;
1225 	opts.xattrs.ctx = lvol;
1226 	opts.xattrs.get_value = lvol_get_xattr_value;
1227 
1228 	spdk_bs_create_blob_ext(lvs->blobstore, &opts, lvol_create_cb, req);
1229 
1230 	return 0;
1231 }
1232 
1233 int
1234 spdk_lvol_create_esnap_clone(const void *esnap_id, uint32_t id_len, uint64_t size_bytes,
1235 			     struct spdk_lvol_store *lvs, const char *clone_name,
1236 			     spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1237 {
1238 	struct spdk_lvol_with_handle_req *req;
1239 	struct spdk_blob_store *bs;
1240 	struct spdk_lvol *lvol;
1241 	struct spdk_blob_opts opts;
1242 	uint64_t cluster_sz;
1243 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1244 	int rc;
1245 
1246 	if (lvs == NULL) {
1247 		SPDK_ERRLOG("lvol store does not exist\n");
1248 		return -EINVAL;
1249 	}
1250 
1251 	rc = lvs_verify_lvol_name(lvs, clone_name);
1252 	if (rc < 0) {
1253 		return rc;
1254 	}
1255 
1256 	bs = lvs->blobstore;
1257 
1258 	cluster_sz = spdk_bs_get_cluster_size(bs);
1259 	if ((size_bytes % cluster_sz) != 0) {
1260 		SPDK_ERRLOG("Cannot create '%s/%s': size %" PRIu64 " is not an integer multiple of "
1261 			    "cluster size %" PRIu64 "\n", lvs->name, clone_name, size_bytes,
1262 			    cluster_sz);
1263 		return -EINVAL;
1264 	}
1265 
1266 	req = calloc(1, sizeof(*req));
1267 	if (!req) {
1268 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1269 		return -ENOMEM;
1270 	}
1271 	req->cb_fn = cb_fn;
1272 	req->cb_arg = cb_arg;
1273 
1274 	lvol = lvol_alloc(lvs, clone_name, true, LVOL_CLEAR_WITH_DEFAULT);
1275 	if (!lvol) {
1276 		free(req);
1277 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1278 		return -ENOMEM;
1279 	}
1280 	req->lvol = lvol;
1281 
1282 	spdk_blob_opts_init(&opts, sizeof(opts));
1283 	opts.esnap_id = esnap_id;
1284 	opts.esnap_id_len = id_len;
1285 	opts.thin_provision = true;
1286 	opts.num_clusters = spdk_divide_round_up(size_bytes, cluster_sz);
1287 	opts.clear_method = lvol->clear_method;
1288 	opts.xattrs.count = SPDK_COUNTOF(xattr_names);
1289 	opts.xattrs.names = xattr_names;
1290 	opts.xattrs.ctx = lvol;
1291 	opts.xattrs.get_value = lvol_get_xattr_value;
1292 
1293 	spdk_bs_create_blob_ext(lvs->blobstore, &opts, lvol_create_cb, req);
1294 
1295 	return 0;
1296 }
1297 
1298 void
1299 spdk_lvol_create_snapshot(struct spdk_lvol *origlvol, const char *snapshot_name,
1300 			  spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1301 {
1302 	struct spdk_lvol_store *lvs;
1303 	struct spdk_lvol *newlvol;
1304 	struct spdk_blob *origblob;
1305 	struct spdk_lvol_with_handle_req *req;
1306 	struct spdk_blob_xattr_opts snapshot_xattrs;
1307 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1308 	int rc;
1309 
1310 	if (origlvol == NULL) {
1311 		SPDK_INFOLOG(lvol, "Lvol not provided.\n");
1312 		cb_fn(cb_arg, NULL, -EINVAL);
1313 		return;
1314 	}
1315 
1316 	origblob = origlvol->blob;
1317 	lvs = origlvol->lvol_store;
1318 	if (lvs == NULL) {
1319 		SPDK_ERRLOG("lvol store does not exist\n");
1320 		cb_fn(cb_arg, NULL, -EINVAL);
1321 		return;
1322 	}
1323 
1324 	rc = lvs_verify_lvol_name(lvs, snapshot_name);
1325 	if (rc < 0) {
1326 		cb_fn(cb_arg, NULL, rc);
1327 		return;
1328 	}
1329 
1330 	req = calloc(1, sizeof(*req));
1331 	if (!req) {
1332 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1333 		cb_fn(cb_arg, NULL, -ENOMEM);
1334 		return;
1335 	}
1336 
1337 	newlvol = lvol_alloc(origlvol->lvol_store, snapshot_name, true,
1338 			     (enum lvol_clear_method)origlvol->clear_method);
1339 	if (!newlvol) {
1340 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1341 		free(req);
1342 		cb_fn(cb_arg, NULL, -ENOMEM);
1343 		return;
1344 	}
1345 
1346 	snapshot_xattrs.count = SPDK_COUNTOF(xattr_names);
1347 	snapshot_xattrs.ctx = newlvol;
1348 	snapshot_xattrs.names = xattr_names;
1349 	snapshot_xattrs.get_value = lvol_get_xattr_value;
1350 	req->lvol = newlvol;
1351 	req->origlvol = origlvol;
1352 	req->cb_fn = cb_fn;
1353 	req->cb_arg = cb_arg;
1354 
1355 	spdk_bs_create_snapshot(lvs->blobstore, spdk_blob_get_id(origblob), &snapshot_xattrs,
1356 				lvol_create_cb, req);
1357 }
1358 
1359 void
1360 spdk_lvol_create_clone(struct spdk_lvol *origlvol, const char *clone_name,
1361 		       spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1362 {
1363 	struct spdk_lvol *newlvol;
1364 	struct spdk_lvol_with_handle_req *req;
1365 	struct spdk_lvol_store *lvs;
1366 	struct spdk_blob *origblob;
1367 	struct spdk_blob_xattr_opts clone_xattrs;
1368 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1369 	int rc;
1370 
1371 	if (origlvol == NULL) {
1372 		SPDK_INFOLOG(lvol, "Lvol not provided.\n");
1373 		cb_fn(cb_arg, NULL, -EINVAL);
1374 		return;
1375 	}
1376 
1377 	origblob = origlvol->blob;
1378 	lvs = origlvol->lvol_store;
1379 	if (lvs == NULL) {
1380 		SPDK_ERRLOG("lvol store does not exist\n");
1381 		cb_fn(cb_arg, NULL, -EINVAL);
1382 		return;
1383 	}
1384 
1385 	rc = lvs_verify_lvol_name(lvs, clone_name);
1386 	if (rc < 0) {
1387 		cb_fn(cb_arg, NULL, rc);
1388 		return;
1389 	}
1390 
1391 	req = calloc(1, sizeof(*req));
1392 	if (!req) {
1393 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1394 		cb_fn(cb_arg, NULL, -ENOMEM);
1395 		return;
1396 	}
1397 
1398 	newlvol = lvol_alloc(lvs, clone_name, true, (enum lvol_clear_method)origlvol->clear_method);
1399 	if (!newlvol) {
1400 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1401 		free(req);
1402 		cb_fn(cb_arg, NULL, -ENOMEM);
1403 		return;
1404 	}
1405 
1406 	clone_xattrs.count = SPDK_COUNTOF(xattr_names);
1407 	clone_xattrs.ctx = newlvol;
1408 	clone_xattrs.names = xattr_names;
1409 	clone_xattrs.get_value = lvol_get_xattr_value;
1410 	req->lvol = newlvol;
1411 	req->cb_fn = cb_fn;
1412 	req->cb_arg = cb_arg;
1413 
1414 	spdk_bs_create_clone(lvs->blobstore, spdk_blob_get_id(origblob), &clone_xattrs,
1415 			     lvol_create_cb,
1416 			     req);
1417 }
1418 
1419 static void
1420 lvol_resize_done(void *cb_arg, int lvolerrno)
1421 {
1422 	struct spdk_lvol_req *req = cb_arg;
1423 
1424 	req->cb_fn(req->cb_arg,  lvolerrno);
1425 	free(req);
1426 }
1427 
1428 static void
1429 lvol_blob_resize_cb(void *cb_arg, int bserrno)
1430 {
1431 	struct spdk_lvol_req *req = cb_arg;
1432 	struct spdk_lvol *lvol = req->lvol;
1433 
1434 	if (bserrno != 0) {
1435 		req->cb_fn(req->cb_arg, bserrno);
1436 		free(req);
1437 		return;
1438 	}
1439 
1440 	spdk_blob_sync_md(lvol->blob, lvol_resize_done, req);
1441 }
1442 
1443 void
1444 spdk_lvol_resize(struct spdk_lvol *lvol, uint64_t sz,
1445 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1446 {
1447 	struct spdk_blob *blob = lvol->blob;
1448 	struct spdk_lvol_store *lvs = lvol->lvol_store;
1449 	struct spdk_lvol_req *req;
1450 	uint64_t new_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(lvs->blobstore));
1451 
1452 	req = calloc(1, sizeof(*req));
1453 	if (!req) {
1454 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1455 		cb_fn(cb_arg, -ENOMEM);
1456 		return;
1457 	}
1458 	req->cb_fn = cb_fn;
1459 	req->cb_arg = cb_arg;
1460 	req->lvol = lvol;
1461 
1462 	spdk_blob_resize(blob, new_clusters, lvol_blob_resize_cb, req);
1463 }
1464 
1465 static void
1466 lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1467 {
1468 	struct spdk_lvol_req *req = cb_arg;
1469 
1470 	req->cb_fn(req->cb_arg, lvolerrno);
1471 	free(req);
1472 }
1473 
1474 void
1475 spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1476 {
1477 	struct spdk_lvol_req *req;
1478 
1479 	req = calloc(1, sizeof(*req));
1480 	if (!req) {
1481 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1482 		cb_fn(cb_arg, -ENOMEM);
1483 		return;
1484 	}
1485 	req->cb_fn = cb_fn;
1486 	req->cb_arg = cb_arg;
1487 
1488 	spdk_blob_set_read_only(lvol->blob);
1489 	spdk_blob_sync_md(lvol->blob, lvol_set_read_only_cb, req);
1490 }
1491 
1492 static void
1493 lvol_rename_cb(void *cb_arg, int lvolerrno)
1494 {
1495 	struct spdk_lvol_req *req = cb_arg;
1496 
1497 	if (lvolerrno != 0) {
1498 		SPDK_ERRLOG("Lvol rename operation failed\n");
1499 	} else {
1500 		snprintf(req->lvol->name, sizeof(req->lvol->name), "%s", req->name);
1501 	}
1502 
1503 	req->cb_fn(req->cb_arg, lvolerrno);
1504 	free(req);
1505 }
1506 
1507 void
1508 spdk_lvol_rename(struct spdk_lvol *lvol, const char *new_name,
1509 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1510 {
1511 	struct spdk_lvol *tmp;
1512 	struct spdk_blob *blob = lvol->blob;
1513 	struct spdk_lvol_req *req;
1514 	int rc;
1515 
1516 	/* Check if new name is current lvol name.
1517 	 * If so, return success immediately */
1518 	if (strncmp(lvol->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1519 		cb_fn(cb_arg, 0);
1520 		return;
1521 	}
1522 
1523 	/* Check if lvol with 'new_name' already exists in lvolstore */
1524 	TAILQ_FOREACH(tmp, &lvol->lvol_store->lvols, link) {
1525 		if (strncmp(tmp->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1526 			SPDK_ERRLOG("Lvol %s already exists in lvol store %s\n", new_name, lvol->lvol_store->name);
1527 			cb_fn(cb_arg, -EEXIST);
1528 			return;
1529 		}
1530 	}
1531 
1532 	req = calloc(1, sizeof(*req));
1533 	if (!req) {
1534 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1535 		cb_fn(cb_arg, -ENOMEM);
1536 		return;
1537 	}
1538 	req->cb_fn = cb_fn;
1539 	req->cb_arg = cb_arg;
1540 	req->lvol = lvol;
1541 	snprintf(req->name, sizeof(req->name), "%s", new_name);
1542 
1543 	rc = spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1544 	if (rc < 0) {
1545 		free(req);
1546 		cb_fn(cb_arg, rc);
1547 		return;
1548 	}
1549 
1550 	spdk_blob_sync_md(blob, lvol_rename_cb, req);
1551 }
1552 
1553 void
1554 spdk_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1555 {
1556 	struct spdk_lvol_req *req;
1557 	struct spdk_blob_store *bs;
1558 	struct spdk_lvol_store	*lvs;
1559 	spdk_blob_id	clone_id;
1560 	size_t		count = 1;
1561 	int		rc;
1562 
1563 	assert(cb_fn != NULL);
1564 
1565 	if (lvol == NULL) {
1566 		SPDK_ERRLOG("lvol does not exist\n");
1567 		cb_fn(cb_arg, -ENODEV);
1568 		return;
1569 	}
1570 
1571 	lvs = lvol->lvol_store;
1572 
1573 	if (lvol->ref_count != 0) {
1574 		SPDK_ERRLOG("Cannot destroy lvol %s because it is still open\n", lvol->unique_id);
1575 		cb_fn(cb_arg, -EBUSY);
1576 		return;
1577 	}
1578 
1579 	req = calloc(1, sizeof(*req));
1580 	if (!req) {
1581 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1582 		cb_fn(cb_arg, -ENOMEM);
1583 		return;
1584 	}
1585 
1586 	req->cb_fn = cb_fn;
1587 	req->cb_arg = cb_arg;
1588 	req->lvol = lvol;
1589 	bs = lvol->lvol_store->blobstore;
1590 
1591 	rc = spdk_blob_get_clones(lvs->blobstore, lvol->blob_id, &clone_id, &count);
1592 	if (rc == 0 && count == 1) {
1593 		req->clone_lvol = lvs_get_lvol_by_blob_id(lvs, clone_id);
1594 	} else if (rc == -ENOMEM) {
1595 		SPDK_INFOLOG(lvol, "lvol %s: cannot destroy: has %" PRIu64 " clones\n",
1596 			     lvol->unique_id, count);
1597 		free(req);
1598 		assert(count > 1);
1599 		cb_fn(cb_arg, -EBUSY);
1600 		return;
1601 	}
1602 
1603 	lvol->action_in_progress = true;
1604 
1605 	spdk_bs_delete_blob(bs, lvol->blob_id, lvol_delete_blob_cb, req);
1606 }
1607 
1608 void
1609 spdk_lvol_close(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1610 {
1611 	struct spdk_lvol_req *req;
1612 
1613 	assert(cb_fn != NULL);
1614 
1615 	if (lvol == NULL) {
1616 		SPDK_ERRLOG("lvol does not exist\n");
1617 		cb_fn(cb_arg, -ENODEV);
1618 		return;
1619 	}
1620 
1621 	if (lvol->ref_count > 1) {
1622 		lvol->ref_count--;
1623 		cb_fn(cb_arg, 0);
1624 		return;
1625 	} else if (lvol->ref_count == 0) {
1626 		cb_fn(cb_arg, -EINVAL);
1627 		return;
1628 	}
1629 
1630 	req = calloc(1, sizeof(*req));
1631 	if (!req) {
1632 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1633 		cb_fn(cb_arg, -ENOMEM);
1634 		return;
1635 	}
1636 
1637 	req->cb_fn = cb_fn;
1638 	req->cb_arg = cb_arg;
1639 	req->lvol = lvol;
1640 
1641 	lvol->action_in_progress = true;
1642 
1643 	spdk_blob_close(lvol->blob, lvol_close_blob_cb, req);
1644 }
1645 
1646 struct spdk_io_channel *
1647 spdk_lvol_get_io_channel(struct spdk_lvol *lvol)
1648 {
1649 	return spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1650 }
1651 
1652 static void
1653 lvol_inflate_cb(void *cb_arg, int lvolerrno)
1654 {
1655 	struct spdk_lvol_req *req = cb_arg;
1656 
1657 	spdk_bs_free_io_channel(req->channel);
1658 
1659 	if (lvolerrno < 0) {
1660 		SPDK_ERRLOG("Could not inflate lvol\n");
1661 	}
1662 
1663 	req->cb_fn(req->cb_arg, lvolerrno);
1664 	free(req);
1665 }
1666 
1667 void
1668 spdk_lvol_inflate(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1669 {
1670 	struct spdk_lvol_req *req;
1671 	spdk_blob_id blob_id;
1672 
1673 	assert(cb_fn != NULL);
1674 
1675 	if (lvol == NULL) {
1676 		SPDK_ERRLOG("Lvol does not exist\n");
1677 		cb_fn(cb_arg, -ENODEV);
1678 		return;
1679 	}
1680 
1681 	req = calloc(1, sizeof(*req));
1682 	if (!req) {
1683 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1684 		cb_fn(cb_arg, -ENOMEM);
1685 		return;
1686 	}
1687 
1688 	req->cb_fn = cb_fn;
1689 	req->cb_arg = cb_arg;
1690 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1691 	if (req->channel == NULL) {
1692 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1693 		free(req);
1694 		cb_fn(cb_arg, -ENOMEM);
1695 		return;
1696 	}
1697 
1698 	blob_id = spdk_blob_get_id(lvol->blob);
1699 	spdk_bs_inflate_blob(lvol->lvol_store->blobstore, req->channel, blob_id, lvol_inflate_cb,
1700 			     req);
1701 }
1702 
1703 void
1704 spdk_lvol_decouple_parent(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1705 {
1706 	struct spdk_lvol_req *req;
1707 	spdk_blob_id blob_id;
1708 
1709 	assert(cb_fn != NULL);
1710 
1711 	if (lvol == NULL) {
1712 		SPDK_ERRLOG("Lvol does not exist\n");
1713 		cb_fn(cb_arg, -ENODEV);
1714 		return;
1715 	}
1716 
1717 	req = calloc(1, sizeof(*req));
1718 	if (!req) {
1719 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1720 		cb_fn(cb_arg, -ENOMEM);
1721 		return;
1722 	}
1723 
1724 	req->cb_fn = cb_fn;
1725 	req->cb_arg = cb_arg;
1726 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1727 	if (req->channel == NULL) {
1728 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1729 		free(req);
1730 		cb_fn(cb_arg, -ENOMEM);
1731 		return;
1732 	}
1733 
1734 	blob_id = spdk_blob_get_id(lvol->blob);
1735 	spdk_bs_blob_decouple_parent(lvol->lvol_store->blobstore, req->channel, blob_id,
1736 				     lvol_inflate_cb, req);
1737 }
1738 
1739 static void
1740 lvs_grow_live_cb(void *cb_arg, int lvolerrno)
1741 {
1742 	struct spdk_lvs_req *req = (struct spdk_lvs_req *)cb_arg;
1743 
1744 	if (req->cb_fn) {
1745 		req->cb_fn(req->cb_arg, lvolerrno);
1746 	}
1747 	free(req);
1748 	return;
1749 }
1750 
1751 void
1752 spdk_lvs_grow_live(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
1753 {
1754 	struct spdk_lvs_req *req;
1755 
1756 	req = calloc(1, sizeof(*req));
1757 	if (req == NULL) {
1758 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
1759 		if (cb_fn) {
1760 			cb_fn(cb_arg, -ENOMEM);
1761 		}
1762 		return;
1763 	}
1764 
1765 	req->cb_fn = cb_fn;
1766 	req->cb_arg = cb_arg;
1767 	req->lvol_store = lvs;
1768 
1769 	spdk_bs_grow_live(lvs->blobstore, lvs_grow_live_cb, req);
1770 }
1771 
1772 void
1773 spdk_lvs_grow(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
1774 {
1775 	struct spdk_lvs_with_handle_req *req;
1776 	struct spdk_bs_opts opts = {};
1777 
1778 	assert(cb_fn != NULL);
1779 
1780 	if (bs_dev == NULL) {
1781 		SPDK_ERRLOG("Blobstore device does not exist\n");
1782 		cb_fn(cb_arg, NULL, -ENODEV);
1783 		return;
1784 	}
1785 
1786 	req = calloc(1, sizeof(*req));
1787 	if (req == NULL) {
1788 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
1789 		cb_fn(cb_arg, NULL, -ENOMEM);
1790 		return;
1791 	}
1792 
1793 	req->lvol_store = lvs_alloc();
1794 	if (req->lvol_store == NULL) {
1795 		SPDK_ERRLOG("Cannot alloc memory for lvol store\n");
1796 		free(req);
1797 		cb_fn(cb_arg, NULL, -ENOMEM);
1798 		return;
1799 	}
1800 	req->cb_fn = cb_fn;
1801 	req->cb_arg = cb_arg;
1802 	req->bs_dev = bs_dev;
1803 
1804 	lvs_bs_opts_init(&opts);
1805 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
1806 
1807 	spdk_bs_grow(bs_dev, &opts, lvs_load_cb, req);
1808 }
1809 
1810 static struct spdk_lvol *
1811 lvs_get_lvol_by_blob_id(struct spdk_lvol_store *lvs, spdk_blob_id blob_id)
1812 {
1813 	struct spdk_lvol *lvol;
1814 
1815 	TAILQ_FOREACH(lvol, &lvs->lvols, link) {
1816 		if (lvol->blob_id == blob_id) {
1817 			return lvol;
1818 		}
1819 	}
1820 	return NULL;
1821 }
1822 
1823 static int
1824 lvs_esnap_bs_dev_create(void *bs_ctx, void *blob_ctx, struct spdk_blob *blob,
1825 			const void *esnap_id, uint32_t id_len,
1826 			struct spdk_bs_dev **bs_dev)
1827 {
1828 	struct spdk_lvol_store	*lvs = bs_ctx;
1829 	struct spdk_lvol	*lvol = blob_ctx;
1830 	spdk_blob_id		blob_id = spdk_blob_get_id(blob);
1831 
1832 	if (lvs == NULL) {
1833 		if (lvol == NULL || lvol->lvol_store == NULL) {
1834 			SPDK_ERRLOG("Blob 0x%" PRIx64 ": no lvs context nor lvol context\n",
1835 				    blob_id);
1836 			return -EINVAL;
1837 		}
1838 		lvs = lvol->lvol_store;
1839 	}
1840 
1841 	/*
1842 	 * When spdk_lvs_load() is called, it iterates through all blobs in its blobstore building
1843 	 * up a list of lvols (lvs->lvols). During this initial iteration, each blob is opened,
1844 	 * passed to load_next_lvol(), then closed. There is no need to open the external snapshot
1845 	 * during this phase. Once the blobstore is loaded, lvs->load_esnaps is set to true so that
1846 	 * future lvol opens cause the external snapshot to be loaded.
1847 	 */
1848 	if (!lvs->load_esnaps) {
1849 		*bs_dev = NULL;
1850 		return 0;
1851 	}
1852 
1853 	if (lvol == NULL) {
1854 		spdk_blob_id blob_id = spdk_blob_get_id(blob);
1855 
1856 		/*
1857 		 * If spdk_bs_blob_open() is used instead of spdk_bs_blob_open_ext() the lvol will
1858 		 * not have been passed in. The same is true if the open happens spontaneously due
1859 		 * to blobstore activity.
1860 		 */
1861 		lvol = lvs_get_lvol_by_blob_id(lvs, blob_id);
1862 		if (lvol == NULL) {
1863 			SPDK_ERRLOG("lvstore %s: no lvol for blob 0x%" PRIx64 "\n",
1864 				    lvs->name, blob_id);
1865 			return -ENODEV;
1866 		}
1867 	}
1868 
1869 	return lvs->esnap_bs_dev_create(lvs, lvol, blob, esnap_id, id_len, bs_dev);
1870 }
1871 
1872 /*
1873  * The theory of missing external snapshots
1874  *
1875  * The lvs->esnap_bs_dev_create() callback may be unable to create an external snapshot bs_dev when
1876  * it is called. This can happen, for instance, when the device containing the lvolstore is
1877  * examined prior to spdk_bdev_register() being called on a bdev that acts as an external snapshot.
1878  * In such a case, the esnap_bs_dev_create() callback will call spdk_lvs_esnap_missing_add().
1879  *
1880  * Missing external snapshots are tracked in a per-lvolstore tree, lvs->degraded_lvol_sets_tree.
1881  * Each tree node (struct spdk_lvs_degraded_lvol_set) contains a tailq of lvols that are missing
1882  * that particular external snapshot.
1883  *
1884  * When a potential missing snapshot becomes available, spdk_lvs_notify_hotplug() may be called to
1885  * notify this library that it is available. It will then iterate through the active lvolstores and
1886  * search each lvs->degraded_lvol_sets_tree for a set of degraded lvols that are missing an external
1887  * snapshot matching the id passed in the notification. The lvols in the tailq on each matching tree
1888  * node are then asked to create an external snapshot bs_dev using the esnap_bs_dev_create()
1889  * callback that the consumer registered with the lvolstore. If lvs->esnap_bs_dev_create() returns
1890  * 0, the lvol is removed from the spdk_lvs_degraded_lvol_set's lvol tailq. When this tailq becomes
1891  * empty, the degraded lvol set node for this missing external snapshot is removed.
1892  */
1893 static int
1894 lvs_esnap_name_cmp(struct spdk_lvs_degraded_lvol_set *m1, struct spdk_lvs_degraded_lvol_set *m2)
1895 {
1896 	if (m1->id_len == m2->id_len) {
1897 		return memcmp(m1->esnap_id, m2->esnap_id, m1->id_len);
1898 	}
1899 	return (m1->id_len > m2->id_len) ? 1 : -1;
1900 }
1901 
/*
 * Generate the file-local tree functions for degraded_lvol_sets_tree: nodes are
 * struct spdk_lvs_degraded_lvol_set linked through their 'node' member and ordered by
 * lvs_esnap_name_cmp().
 */
RB_GENERATE_STATIC(degraded_lvol_sets_tree, spdk_lvs_degraded_lvol_set, node, lvs_esnap_name_cmp)
1903 
1904 static void
1905 lvs_degraded_lvol_set_add(struct spdk_lvs_degraded_lvol_set *degraded_set, struct spdk_lvol *lvol)
1906 {
1907 	assert(lvol->lvol_store->thread == spdk_get_thread());
1908 
1909 	lvol->degraded_set = degraded_set;
1910 	TAILQ_INSERT_TAIL(&degraded_set->lvols, lvol, degraded_link);
1911 }
1912 
1913 static void
1914 lvs_degraded_lvol_set_remove(struct spdk_lvs_degraded_lvol_set *degraded_set,
1915 			     struct spdk_lvol *lvol)
1916 {
1917 	assert(lvol->lvol_store->thread == spdk_get_thread());
1918 
1919 	lvol->degraded_set = NULL;
1920 	TAILQ_REMOVE(&degraded_set->lvols, lvol, degraded_link);
1921 	/* degraded_set->lvols may be empty. Caller should check if not immediately adding a new
1922 	 * lvol. */
1923 }
1924 
1925 /*
1926  * Record in lvs->degraded_lvol_sets_tree that a bdev of the specified name is needed by the
1927  * specified lvol.
1928  */
1929 int
1930 spdk_lvs_esnap_missing_add(struct spdk_lvol_store *lvs, struct spdk_lvol *lvol,
1931 			   const void *esnap_id, uint32_t id_len)
1932 {
1933 	struct spdk_lvs_degraded_lvol_set find, *degraded_set;
1934 
1935 	assert(lvs->thread == spdk_get_thread());
1936 
1937 	find.esnap_id = esnap_id;
1938 	find.id_len = id_len;
1939 	degraded_set = RB_FIND(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree, &find);
1940 	if (degraded_set == NULL) {
1941 		degraded_set = calloc(1, sizeof(*degraded_set));
1942 		if (degraded_set == NULL) {
1943 			SPDK_ERRLOG("lvol %s: cannot create degraded_set node: out of memory\n",
1944 				    lvol->unique_id);
1945 			return -ENOMEM;
1946 		}
1947 		degraded_set->esnap_id = calloc(1, id_len);
1948 		if (degraded_set->esnap_id == NULL) {
1949 			free(degraded_set);
1950 			SPDK_ERRLOG("lvol %s: cannot create degraded_set node: out of memory\n",
1951 				    lvol->unique_id);
1952 			return -ENOMEM;
1953 		}
1954 		memcpy((void *)degraded_set->esnap_id, esnap_id, id_len);
1955 		degraded_set->id_len = id_len;
1956 		degraded_set->lvol_store = lvs;
1957 		TAILQ_INIT(&degraded_set->lvols);
1958 		RB_INSERT(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree, degraded_set);
1959 	}
1960 
1961 	lvs_degraded_lvol_set_add(degraded_set, lvol);
1962 
1963 	return 0;
1964 }
1965 
1966 /*
1967  * Remove the record of the specified lvol needing a degraded_set bdev.
1968  */
1969 void
1970 spdk_lvs_esnap_missing_remove(struct spdk_lvol *lvol)
1971 {
1972 	struct spdk_lvol_store		*lvs = lvol->lvol_store;
1973 	struct spdk_lvs_degraded_lvol_set	*degraded_set = lvol->degraded_set;
1974 
1975 	assert(lvs->thread == spdk_get_thread());
1976 
1977 	if (degraded_set == NULL) {
1978 		return;
1979 	}
1980 
1981 	lvs_degraded_lvol_set_remove(degraded_set, lvol);
1982 
1983 	if (!TAILQ_EMPTY(&degraded_set->lvols)) {
1984 		return;
1985 	}
1986 
1987 	RB_REMOVE(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree, degraded_set);
1988 
1989 	free((char *)degraded_set->esnap_id);
1990 	free(degraded_set);
1991 }
1992 
/*
 * Context handed to spdk_blob_set_esnap_bs_dev() by lvs_esnap_degraded_hotplug() and consumed
 * by lvs_esnap_hotplug_done().
 */
struct lvs_esnap_hotplug_req {
	/* The lvol whose external snapshot bs_dev is being attached. */
	struct spdk_lvol			*lvol;
	/* Callback invoked with the lvol and the result of the hotplug. */
	spdk_lvol_op_with_handle_complete	cb_fn;
	/* Opaque argument passed through to cb_fn. */
	void					*cb_arg;
};
1998 
1999 static void
2000 lvs_esnap_hotplug_done(void *cb_arg, int bserrno)
2001 {
2002 	struct lvs_esnap_hotplug_req *req = cb_arg;
2003 	struct spdk_lvol	*lvol = req->lvol;
2004 	struct spdk_lvol_store	*lvs = lvol->lvol_store;
2005 
2006 	if (bserrno != 0) {
2007 		SPDK_ERRLOG("lvol %s/%s: failed to hotplug blob_bdev due to error %d\n",
2008 			    lvs->name, lvol->name, bserrno);
2009 	}
2010 	req->cb_fn(req->cb_arg, lvol, bserrno);
2011 	free(req);
2012 }
2013 
/*
 * Try to attach the external snapshot to every lvol in degraded_set.
 *
 * For each lvol that was on the set's list when this function was entered, the lvolstore's
 * esnap_bs_dev_create() callback is asked for a bs_dev:
 *   - on success the bs_dev is handed to spdk_blob_set_esnap_bs_dev(), and cb_fn is called from
 *     lvs_esnap_hotplug_done() with the result;
 *   - on failure the lvol is put back at the tail of the list and cb_fn is called with the error;
 *   - if the per-lvol request cannot be allocated, the lvol stays on the list and cb_fn is called
 *     with -ENOMEM.
 *
 * If the list is empty afterwards, degraded_set is removed from the lvolstore's tree and freed,
 * so the caller must not use it after this function returns.
 */
static void
lvs_esnap_degraded_hotplug(struct spdk_lvs_degraded_lvol_set *degraded_set,
			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_lvol_store	*lvs = degraded_set->lvol_store;
	struct spdk_lvol	*lvol, *tmp, *last_missing;
	struct spdk_bs_dev	*bs_dev;
	const void		*esnap_id = degraded_set->esnap_id;
	uint32_t		id_len = degraded_set->id_len;
	struct lvs_esnap_hotplug_req *req;
	int			rc;

	assert(lvs->thread == spdk_get_thread());

	/*
	 * When lvs->esnap_bs_dev_create() tries to load an external snapshot, it can encounter
	 * errors that lead it to call spdk_lvs_esnap_missing_add(). This function needs to be
	 * sure that such modifications do not corrupt the degraded_set->lvols tailq or leave
	 * references to memory that this function will free.
	 *
	 * While this function is running, no other thread can add items to degraded_set->lvols. If
	 * the list is mutated, it must have been done by this function or something in its call
	 * graph running on this thread.
	 */

	/* Remember the last lvol on the list. Iteration will stop once it has been processed. */
	last_missing = TAILQ_LAST(&degraded_set->lvols, degraded_lvols);

	TAILQ_FOREACH_SAFE(lvol, &degraded_set->lvols, degraded_link, tmp) {
		req = calloc(1, sizeof(*req));
		if (req == NULL) {
			SPDK_ERRLOG("lvol %s: failed to create esnap bs_dev: out of memory\n",
				    lvol->unique_id);
			cb_fn(cb_arg, lvol, -ENOMEM);
			/* The next one likely won't succeed either, but keep going so that all the
			 * failed hotplugs are logged.
			 */
			goto next;
		}

		/*
		 * Remove the lvol from the tailq so that tailq corruption is avoided if
		 * lvs->esnap_bs_dev_create() calls spdk_lvs_esnap_missing_add(lvol).
		 */
		TAILQ_REMOVE(&degraded_set->lvols, lvol, degraded_link);
		lvol->degraded_set = NULL;

		bs_dev = NULL;
		rc = lvs->esnap_bs_dev_create(lvs, lvol, lvol->blob, esnap_id, id_len, &bs_dev);
		if (rc != 0) {
			SPDK_ERRLOG("lvol %s: failed to create esnap bs_dev: error %d\n",
				    lvol->unique_id, rc);
			/* Still degraded: restore membership, now at the tail of the list. */
			lvol->degraded_set = degraded_set;
			TAILQ_INSERT_TAIL(&degraded_set->lvols, lvol, degraded_link);
			cb_fn(cb_arg, lvol, rc);
			free(req);
			goto next;
		}

		/* Ownership of req passes to lvs_esnap_hotplug_done(), which frees it. */
		req->lvol = lvol;
		req->cb_fn = cb_fn;
		req->cb_arg = cb_arg;
		spdk_blob_set_esnap_bs_dev(lvol->blob, bs_dev, lvs_esnap_hotplug_done, req);

next:
		if (lvol == last_missing) {
			/*
			 * Anything after last_missing was added due to some problem encountered
			 * while trying to create the esnap bs_dev.
			 */
			break;
		}
	}

	/* Every lvol was taken off the list: the set itself is no longer needed. */
	if (TAILQ_EMPTY(&degraded_set->lvols)) {
		RB_REMOVE(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree, degraded_set);
		free((void *)degraded_set->esnap_id);
		free(degraded_set);
	}
}
2094 
2095 /*
2096  * Notify each lvstore created on this thread that is missing a bdev by the specified name or uuid
2097  * that the bdev now exists.
2098  */
2099 bool
2100 spdk_lvs_notify_hotplug(const void *esnap_id, uint32_t id_len,
2101 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
2102 {
2103 	struct spdk_lvs_degraded_lvol_set *found;
2104 	struct spdk_lvs_degraded_lvol_set find = { 0 };
2105 	struct spdk_lvol_store	*lvs;
2106 	struct spdk_thread	*thread = spdk_get_thread();
2107 	bool			ret = false;
2108 
2109 	find.esnap_id = esnap_id;
2110 	find.id_len = id_len;
2111 
2112 	pthread_mutex_lock(&g_lvol_stores_mutex);
2113 	TAILQ_FOREACH(lvs, &g_lvol_stores, link) {
2114 		if (thread != lvs->thread) {
2115 			/*
2116 			 * It is expected that this is called from vbdev_lvol's examine_config()
2117 			 * callback. The lvstore was likely loaded do a creation happening as a
2118 			 * result of an RPC call or opening of an existing lvstore via
2119 			 * examine_disk() callback. RPC calls, examine_disk(), and examine_config()
2120 			 * should all be happening only on the app thread. The "wrong thread"
2121 			 * condition will only happen when an application is doing something weird.
2122 			 */
2123 			SPDK_NOTICELOG("Discarded examine for lvstore %s: wrong thread\n",
2124 				       lvs->name);
2125 			continue;
2126 		}
2127 
2128 		found = RB_FIND(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree, &find);
2129 		if (found == NULL) {
2130 			continue;
2131 		}
2132 
2133 		ret = true;
2134 		lvs_esnap_degraded_hotplug(found, cb_fn, cb_arg);
2135 	}
2136 	pthread_mutex_unlock(&g_lvol_stores_mutex);
2137 
2138 	return ret;
2139 }
2140 
2141 int
2142 spdk_lvol_iter_immediate_clones(struct spdk_lvol *lvol, spdk_lvol_iter_cb cb_fn, void *cb_arg)
2143 {
2144 	struct spdk_lvol_store *lvs = lvol->lvol_store;
2145 	struct spdk_blob_store *bs = lvs->blobstore;
2146 	struct spdk_lvol *clone;
2147 	spdk_blob_id *ids;
2148 	size_t id_cnt = 0;
2149 	size_t i;
2150 	int rc;
2151 
2152 	rc = spdk_blob_get_clones(bs, lvol->blob_id, NULL, &id_cnt);
2153 	if (rc != -ENOMEM) {
2154 		/* -ENOMEM says id_cnt is valid, no other errors should be returned. */
2155 		assert(rc == 0);
2156 		return rc;
2157 	}
2158 
2159 	ids = calloc(id_cnt, sizeof(*ids));
2160 	if (ids == NULL) {
2161 		SPDK_ERRLOG("lvol %s: out of memory while iterating clones\n", lvol->unique_id);
2162 		return -ENOMEM;
2163 	}
2164 
2165 	rc = spdk_blob_get_clones(bs, lvol->blob_id, ids, &id_cnt);
2166 	if (rc != 0) {
2167 		SPDK_ERRLOG("lvol %s: unable to get clone blob IDs: %d\n", lvol->unique_id, rc);
2168 		free(ids);
2169 		return rc;
2170 	}
2171 
2172 	for (i = 0; i < id_cnt; i++) {
2173 		clone = lvs_get_lvol_by_blob_id(lvs, ids[i]);
2174 		if (clone == NULL) {
2175 			SPDK_NOTICELOG("lvol %s: unable to find clone lvol with blob id 0x%"
2176 				       PRIx64 "\n", lvol->unique_id, ids[i]);
2177 			continue;
2178 		}
2179 		rc = cb_fn(cb_arg, clone);
2180 		if (rc != 0) {
2181 			SPDK_DEBUGLOG(lvol, "lvol %s: iteration stopped when lvol %s (blob 0x%"
2182 				      PRIx64 ") returned %d\n", lvol->unique_id, clone->unique_id,
2183 				      ids[i], rc);
2184 			break;
2185 		}
2186 	}
2187 
2188 	free(ids);
2189 	return rc;
2190 }
2191 
2192 struct spdk_lvol *
2193 spdk_lvol_get_by_uuid(const struct spdk_uuid *uuid)
2194 {
2195 	struct spdk_lvol_store *lvs;
2196 	struct spdk_lvol *lvol;
2197 
2198 	pthread_mutex_lock(&g_lvol_stores_mutex);
2199 
2200 	TAILQ_FOREACH(lvs, &g_lvol_stores, link) {
2201 		TAILQ_FOREACH(lvol, &lvs->lvols, link) {
2202 			if (spdk_uuid_compare(uuid, &lvol->uuid) == 0) {
2203 				pthread_mutex_unlock(&g_lvol_stores_mutex);
2204 				return lvol;
2205 			}
2206 		}
2207 	}
2208 
2209 	pthread_mutex_unlock(&g_lvol_stores_mutex);
2210 	return NULL;
2211 }
2212 
2213 struct spdk_lvol *
2214 spdk_lvol_get_by_names(const char *lvs_name, const char *lvol_name)
2215 {
2216 	struct spdk_lvol_store *lvs;
2217 	struct spdk_lvol *lvol;
2218 
2219 	pthread_mutex_lock(&g_lvol_stores_mutex);
2220 
2221 	TAILQ_FOREACH(lvs, &g_lvol_stores, link) {
2222 		if (strcmp(lvs_name, lvs->name) != 0) {
2223 			continue;
2224 		}
2225 		TAILQ_FOREACH(lvol, &lvs->lvols, link) {
2226 			if (strcmp(lvol_name, lvol->name) == 0) {
2227 				pthread_mutex_unlock(&g_lvol_stores_mutex);
2228 				return lvol;
2229 			}
2230 		}
2231 	}
2232 
2233 	pthread_mutex_unlock(&g_lvol_stores_mutex);
2234 	return NULL;
2235 }
2236 
2237 bool
2238 spdk_lvol_is_degraded(const struct spdk_lvol *lvol)
2239 {
2240 	struct spdk_blob *blob = lvol->blob;
2241 
2242 	if (blob == NULL) {
2243 		return true;
2244 	}
2245 	return spdk_blob_is_degraded(blob);
2246 }
2247 
2248 static void
2249 lvol_shallow_copy_cb(void *cb_arg, int lvolerrno)
2250 {
2251 	struct spdk_lvol_copy_req *req = cb_arg;
2252 	struct spdk_lvol *lvol = req->lvol;
2253 
2254 	spdk_bs_free_io_channel(req->channel);
2255 
2256 	if (lvolerrno < 0) {
2257 		SPDK_ERRLOG("Could not make a shallow copy of lvol %s, error %d\n", lvol->unique_id, lvolerrno);
2258 	}
2259 
2260 	req->cb_fn(req->cb_arg, lvolerrno);
2261 	free(req);
2262 }
2263 
2264 int
2265 spdk_lvol_shallow_copy(struct spdk_lvol *lvol, struct spdk_bs_dev *ext_dev,
2266 		       spdk_blob_shallow_copy_status status_cb_fn, void *status_cb_arg,
2267 		       spdk_lvol_op_complete cb_fn, void *cb_arg)
2268 {
2269 	struct spdk_lvol_copy_req *req;
2270 	spdk_blob_id blob_id;
2271 	int rc;
2272 
2273 	assert(cb_fn != NULL);
2274 
2275 	if (lvol == NULL) {
2276 		SPDK_ERRLOG("lvol must not be NULL\n");
2277 		return -EINVAL;
2278 	}
2279 
2280 	assert(lvol->lvol_store->thread == spdk_get_thread());
2281 
2282 	if (ext_dev == NULL) {
2283 		SPDK_ERRLOG("lvol %s shallow copy, ext_dev must not be NULL\n", lvol->unique_id);
2284 		return -EINVAL;
2285 	}
2286 
2287 	req = calloc(1, sizeof(*req));
2288 	if (!req) {
2289 		SPDK_ERRLOG("lvol %s shallow copy, cannot alloc memory for lvol request\n", lvol->unique_id);
2290 		return -ENOMEM;
2291 	}
2292 
2293 	req->lvol = lvol;
2294 	req->cb_fn = cb_fn;
2295 	req->cb_arg = cb_arg;
2296 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
2297 	if (req->channel == NULL) {
2298 		SPDK_ERRLOG("lvol %s shallow copy, cannot alloc io channel for lvol request\n", lvol->unique_id);
2299 		free(req);
2300 		return -ENOMEM;
2301 	}
2302 
2303 	blob_id = spdk_blob_get_id(lvol->blob);
2304 
2305 	rc = spdk_bs_blob_shallow_copy(lvol->lvol_store->blobstore, req->channel, blob_id, ext_dev,
2306 				       status_cb_fn, status_cb_arg, lvol_shallow_copy_cb, req);
2307 
2308 	if (rc < 0) {
2309 		SPDK_ERRLOG("Could not make a shallow copy of lvol %s\n", lvol->unique_id);
2310 		spdk_bs_free_io_channel(req->channel);
2311 		free(req);
2312 	}
2313 
2314 	return rc;
2315 }
2316 
2317 static void
2318 lvol_set_parent_cb(void *cb_arg, int lvolerrno)
2319 {
2320 	struct spdk_lvol_req *req = cb_arg;
2321 
2322 	if (lvolerrno < 0) {
2323 		SPDK_ERRLOG("could not set parent of lvol %s, error %d\n", req->lvol->name, lvolerrno);
2324 	}
2325 
2326 	req->cb_fn(req->cb_arg, lvolerrno);
2327 	free(req);
2328 }
2329 
2330 void
2331 spdk_lvol_set_parent(struct spdk_lvol *lvol, struct spdk_lvol *snapshot,
2332 		     spdk_lvol_op_complete cb_fn, void *cb_arg)
2333 {
2334 	struct spdk_lvol_req *req;
2335 	spdk_blob_id blob_id, snapshot_id;
2336 
2337 	assert(cb_fn != NULL);
2338 
2339 	if (lvol == NULL) {
2340 		SPDK_ERRLOG("lvol must not be NULL\n");
2341 		cb_fn(cb_arg, -EINVAL);
2342 		return;
2343 	}
2344 
2345 	if (snapshot == NULL) {
2346 		SPDK_ERRLOG("snapshot must not be NULL\n");
2347 		cb_fn(cb_arg, -EINVAL);
2348 		return;
2349 	}
2350 
2351 	req = calloc(1, sizeof(*req));
2352 	if (!req) {
2353 		SPDK_ERRLOG("cannot alloc memory for lvol request pointer\n");
2354 		cb_fn(cb_arg, -ENOMEM);
2355 		return;
2356 	}
2357 
2358 	req->lvol = lvol;
2359 	req->cb_fn = cb_fn;
2360 	req->cb_arg = cb_arg;
2361 
2362 	blob_id = spdk_blob_get_id(lvol->blob);
2363 	snapshot_id = spdk_blob_get_id(snapshot->blob);
2364 
2365 	spdk_bs_blob_set_parent(lvol->lvol_store->blobstore, blob_id, snapshot_id,
2366 				lvol_set_parent_cb, req);
2367 }
2368 
2369 static void
2370 lvol_set_external_parent_cb(void *cb_arg, int lvolerrno)
2371 {
2372 	struct spdk_lvol_bs_dev_req *req = cb_arg;
2373 
2374 	if (lvolerrno < 0) {
2375 		SPDK_ERRLOG("could not set external parent of lvol %s, error %d\n", req->lvol->name, lvolerrno);
2376 		req->bs_dev->destroy(req->bs_dev);
2377 	}
2378 
2379 	req->cb_fn(req->cb_arg, lvolerrno);
2380 	free(req);
2381 }
2382 
2383 void
2384 spdk_lvol_set_external_parent(struct spdk_lvol *lvol, const void *esnap_id, uint32_t esnap_id_len,
2385 			      spdk_lvol_op_complete cb_fn, void *cb_arg)
2386 {
2387 	struct spdk_lvol_bs_dev_req *req;
2388 	struct spdk_bs_dev *bs_dev;
2389 	spdk_blob_id blob_id;
2390 	int rc;
2391 
2392 	assert(cb_fn != NULL);
2393 
2394 	if (lvol == NULL) {
2395 		SPDK_ERRLOG("lvol must not be NULL\n");
2396 		cb_fn(cb_arg, -EINVAL);
2397 		return;
2398 	}
2399 
2400 	if (esnap_id == NULL) {
2401 		SPDK_ERRLOG("snapshot must not be NULL\n");
2402 		cb_fn(cb_arg, -EINVAL);
2403 		return;
2404 	}
2405 
2406 	if (esnap_id_len == sizeof(lvol->uuid_str) &&
2407 	    memcmp(esnap_id, lvol->uuid_str, esnap_id_len) == 0) {
2408 		SPDK_ERRLOG("lvol %s and esnap have the same UUID\n", lvol->name);
2409 		cb_fn(cb_arg, -EINVAL);
2410 		return;
2411 	}
2412 
2413 	rc = lvs_esnap_bs_dev_create(lvol->lvol_store, lvol, lvol->blob, esnap_id, esnap_id_len, &bs_dev);
2414 	if (rc < 0) {
2415 		cb_fn(cb_arg, rc);
2416 		return;
2417 	}
2418 
2419 	req = calloc(1, sizeof(*req));
2420 	if (!req) {
2421 		SPDK_ERRLOG("cannot alloc memory for lvol request pointer\n");
2422 		cb_fn(cb_arg, -ENOMEM);
2423 		return;
2424 	}
2425 
2426 	req->lvol = lvol;
2427 	req->bs_dev = bs_dev;
2428 	req->cb_fn = cb_fn;
2429 	req->cb_arg = cb_arg;
2430 
2431 	blob_id = spdk_blob_get_id(lvol->blob);
2432 
2433 	spdk_bs_blob_set_external_parent(lvol->lvol_store->blobstore, blob_id, bs_dev, esnap_id,
2434 					 esnap_id_len, lvol_set_external_parent_cb, req);
2435 }
2436