xref: /spdk/lib/lvol/lvol.c (revision fecffda6ecf8853b82edccde429b68252f0a62c5)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk_internal/lvolstore.h"
8 #include "spdk/log.h"
9 #include "spdk/string.h"
10 #include "spdk/thread.h"
11 #include "spdk/blob_bdev.h"
12 #include "spdk/util.h"
13 
14 /* Default blob channel opts for lvol: lvs_bs_opts_init() stores this value in spdk_bs_opts.max_channel_ops */
15 #define SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS 512
16 
17 #define LVOL_NAME "name" /* xattr key under which an lvol's name is stored (see lvol_get_xattr_value) */
18 
19 SPDK_LOG_REGISTER_COMPONENT(lvol)
20 
21 static TAILQ_HEAD(, spdk_lvol_store) g_lvol_stores = TAILQ_HEAD_INITIALIZER(g_lvol_stores); /* every lvol store whose on_list flag is set */
22 static pthread_mutex_t g_lvol_stores_mutex = PTHREAD_MUTEX_INITIALIZER; /* held while walking or modifying g_lvol_stores */
23 
24 static int
25 add_lvs_to_list(struct spdk_lvol_store *lvs)
26 {
27 	struct spdk_lvol_store *tmp;
28 	bool name_conflict = false;
29 
30 	pthread_mutex_lock(&g_lvol_stores_mutex);
31 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
32 		if (!strncmp(lvs->name, tmp->name, SPDK_LVS_NAME_MAX)) {
33 			name_conflict = true;
34 			break;
35 		}
36 	}
37 	if (!name_conflict) {
38 		lvs->on_list = true;
39 		TAILQ_INSERT_TAIL(&g_lvol_stores, lvs, link);
40 	}
41 	pthread_mutex_unlock(&g_lvol_stores_mutex);
42 
43 	return name_conflict ? -1 : 0;
44 }
45 
46 static void
47 lvs_free(struct spdk_lvol_store *lvs)
48 {
49 	pthread_mutex_lock(&g_lvol_stores_mutex);
50 	if (lvs->on_list) {
51 		TAILQ_REMOVE(&g_lvol_stores, lvs, link);
52 	}
53 	pthread_mutex_unlock(&g_lvol_stores_mutex);
54 
55 	free(lvs);
56 }
57 
/*
 * Release the memory of an lvol structure. The lvol is not unlinked from any
 * list here; callers do that before (or instead of) calling this helper.
 */
static void
lvol_free(struct spdk_lvol *lvol)
{
	free(lvol);
}
63 
64 static void
65 lvol_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
66 {
67 	struct spdk_lvol_with_handle_req *req = cb_arg;
68 	struct spdk_lvol *lvol = req->lvol;
69 
70 	if (lvolerrno != 0) {
71 		SPDK_INFOLOG(lvol, "Failed to open lvol %s\n", lvol->unique_id);
72 		goto end;
73 	}
74 
75 	lvol->ref_count++;
76 	lvol->blob = blob;
77 end:
78 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
79 	free(req);
80 }
81 
82 void
83 spdk_lvol_open(struct spdk_lvol *lvol, spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
84 {
85 	struct spdk_lvol_with_handle_req *req;
86 	struct spdk_blob_open_opts opts;
87 
88 	assert(cb_fn != NULL);
89 
90 	if (lvol == NULL) {
91 		SPDK_ERRLOG("lvol does not exist\n");
92 		cb_fn(cb_arg, NULL, -ENODEV);
93 		return;
94 	}
95 
96 	if (lvol->action_in_progress == true) {
97 		SPDK_ERRLOG("Cannot open lvol - operations on lvol pending\n");
98 		cb_fn(cb_arg, lvol, -EBUSY);
99 		return;
100 	}
101 
102 	if (lvol->ref_count > 0) {
103 		lvol->ref_count++;
104 		cb_fn(cb_arg, lvol, 0);
105 		return;
106 	}
107 
108 	req = calloc(1, sizeof(*req));
109 	if (req == NULL) {
110 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
111 		cb_fn(cb_arg, NULL, -ENOMEM);
112 		return;
113 	}
114 
115 	req->cb_fn = cb_fn;
116 	req->cb_arg = cb_arg;
117 	req->lvol = lvol;
118 
119 	spdk_blob_open_opts_init(&opts, sizeof(opts));
120 	opts.clear_method = lvol->clear_method;
121 
122 	spdk_bs_open_blob_ext(lvol->lvol_store->blobstore, lvol->blob_id, &opts, lvol_open_cb, req);
123 }
124 
125 static void
126 bs_unload_with_error_cb(void *cb_arg, int lvolerrno)
127 {
128 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
129 
130 	req->cb_fn(req->cb_arg, NULL, req->lvserrno);
131 	free(req);
132 }
133 
134 static void
135 load_next_lvol(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
136 {
137 	struct spdk_lvs_with_handle_req *req = cb_arg;
138 	struct spdk_lvol_store *lvs = req->lvol_store;
139 	struct spdk_blob_store *bs = lvs->blobstore;
140 	struct spdk_lvol *lvol, *tmp;
141 	spdk_blob_id blob_id;
142 	const char *attr;
143 	size_t value_len;
144 	int rc;
145 
146 	if (lvolerrno == -ENOENT) {
147 		/* Finished iterating */
148 		if (req->lvserrno == 0) {
149 			req->cb_fn(req->cb_arg, lvs, req->lvserrno);
150 			free(req);
151 		} else {
152 			TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
153 				TAILQ_REMOVE(&lvs->lvols, lvol, link);
154 				free(lvol);
155 			}
156 			lvs_free(lvs);
157 			spdk_bs_unload(bs, bs_unload_with_error_cb, req);
158 		}
159 		return;
160 	} else if (lvolerrno < 0) {
161 		SPDK_ERRLOG("Failed to fetch blobs list\n");
162 		req->lvserrno = lvolerrno;
163 		goto invalid;
164 	}
165 
166 	blob_id = spdk_blob_get_id(blob);
167 
168 	if (blob_id == lvs->super_blob_id) {
169 		SPDK_INFOLOG(lvol, "found superblob %"PRIu64"\n", (uint64_t)blob_id);
170 		spdk_bs_iter_next(bs, blob, load_next_lvol, req);
171 		return;
172 	}
173 
174 	lvol = calloc(1, sizeof(*lvol));
175 	if (!lvol) {
176 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
177 		req->lvserrno = -ENOMEM;
178 		goto invalid;
179 	}
180 
181 	lvol->blob = blob;
182 	lvol->blob_id = blob_id;
183 	lvol->lvol_store = lvs;
184 	lvol->thin_provision = spdk_blob_is_thin_provisioned(blob);
185 
186 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
187 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0' ||
188 	    spdk_uuid_parse(&lvol->uuid, attr) != 0) {
189 		SPDK_INFOLOG(lvol, "Missing or corrupt lvol uuid\n");
190 		memset(&lvol->uuid, 0, sizeof(lvol->uuid));
191 	}
192 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
193 
194 	if (!spdk_mem_all_zero(&lvol->uuid, sizeof(lvol->uuid))) {
195 		snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
196 	} else {
197 		spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->unique_id), &lvol->lvol_store->uuid);
198 		value_len = strlen(lvol->unique_id);
199 		snprintf(lvol->unique_id + value_len, sizeof(lvol->unique_id) - value_len, "_%"PRIu64,
200 			 (uint64_t)blob_id);
201 	}
202 
203 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
204 	if (rc != 0 || value_len > SPDK_LVOL_NAME_MAX) {
205 		SPDK_ERRLOG("Cannot assign lvol name\n");
206 		lvol_free(lvol);
207 		req->lvserrno = -EINVAL;
208 		goto invalid;
209 	}
210 
211 	snprintf(lvol->name, sizeof(lvol->name), "%s", attr);
212 
213 	TAILQ_INSERT_TAIL(&lvs->lvols, lvol, link);
214 
215 	lvs->lvol_count++;
216 
217 	SPDK_INFOLOG(lvol, "added lvol %s (%s)\n", lvol->unique_id, lvol->uuid_str);
218 
219 invalid:
220 	spdk_bs_iter_next(bs, blob, load_next_lvol, req);
221 }
222 
223 static void
224 close_super_cb(void *cb_arg, int lvolerrno)
225 {
226 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
227 	struct spdk_lvol_store *lvs = req->lvol_store;
228 	struct spdk_blob_store *bs = lvs->blobstore;
229 
230 	if (lvolerrno != 0) {
231 		SPDK_INFOLOG(lvol, "Could not close super blob\n");
232 		lvs_free(lvs);
233 		req->lvserrno = -ENODEV;
234 		spdk_bs_unload(bs, bs_unload_with_error_cb, req);
235 		return;
236 	}
237 
238 	/* Start loading lvols */
239 	spdk_bs_iter_first(lvs->blobstore, load_next_lvol, req);
240 }
241 
242 static void
243 close_super_blob_with_error_cb(void *cb_arg, int lvolerrno)
244 {
245 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
246 	struct spdk_lvol_store *lvs = req->lvol_store;
247 	struct spdk_blob_store *bs = lvs->blobstore;
248 
249 	lvs_free(lvs);
250 
251 	spdk_bs_unload(bs, bs_unload_with_error_cb, req);
252 }
253 
254 static void
255 lvs_read_uuid(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
256 {
257 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
258 	struct spdk_lvol_store *lvs = req->lvol_store;
259 	struct spdk_blob_store *bs = lvs->blobstore;
260 	const char *attr;
261 	size_t value_len;
262 	int rc;
263 
264 	if (lvolerrno != 0) {
265 		SPDK_INFOLOG(lvol, "Could not open super blob\n");
266 		lvs_free(lvs);
267 		req->lvserrno = -ENODEV;
268 		spdk_bs_unload(bs, bs_unload_with_error_cb, req);
269 		return;
270 	}
271 
272 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
273 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0') {
274 		SPDK_INFOLOG(lvol, "missing or incorrect UUID\n");
275 		req->lvserrno = -EINVAL;
276 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
277 		return;
278 	}
279 
280 	if (spdk_uuid_parse(&lvs->uuid, attr)) {
281 		SPDK_INFOLOG(lvol, "incorrect UUID '%s'\n", attr);
282 		req->lvserrno = -EINVAL;
283 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
284 		return;
285 	}
286 
287 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
288 	if (rc != 0 || value_len > SPDK_LVS_NAME_MAX) {
289 		SPDK_INFOLOG(lvol, "missing or invalid name\n");
290 		req->lvserrno = -EINVAL;
291 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
292 		return;
293 	}
294 
295 	snprintf(lvs->name, sizeof(lvs->name), "%s", attr);
296 
297 	rc = add_lvs_to_list(lvs);
298 	if (rc) {
299 		SPDK_INFOLOG(lvol, "lvolstore with name %s already exists\n", lvs->name);
300 		req->lvserrno = -EEXIST;
301 		spdk_blob_close(blob, close_super_blob_with_error_cb, req);
302 		return;
303 	}
304 
305 	lvs->super_blob_id = spdk_blob_get_id(blob);
306 
307 	spdk_blob_close(blob, close_super_cb, req);
308 }
309 
310 static void
311 lvs_open_super(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
312 {
313 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
314 	struct spdk_lvol_store *lvs = req->lvol_store;
315 	struct spdk_blob_store *bs = lvs->blobstore;
316 
317 	if (lvolerrno != 0) {
318 		SPDK_INFOLOG(lvol, "Super blob not found\n");
319 		lvs_free(lvs);
320 		req->lvserrno = -ENODEV;
321 		spdk_bs_unload(bs, bs_unload_with_error_cb, req);
322 		return;
323 	}
324 
325 	spdk_bs_open_blob(bs, blobid, lvs_read_uuid, req);
326 }
327 
328 static void
329 lvs_load_cb(void *cb_arg, struct spdk_blob_store *bs, int lvolerrno)
330 {
331 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
332 	struct spdk_lvol_store *lvs;
333 
334 	if (lvolerrno != 0) {
335 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
336 		free(req);
337 		return;
338 	}
339 
340 	lvs = calloc(1, sizeof(*lvs));
341 	if (lvs == NULL) {
342 		SPDK_ERRLOG("Cannot alloc memory for lvol store\n");
343 		spdk_bs_unload(bs, bs_unload_with_error_cb, req);
344 		return;
345 	}
346 
347 	lvs->blobstore = bs;
348 	lvs->bs_dev = req->bs_dev;
349 	TAILQ_INIT(&lvs->lvols);
350 	TAILQ_INIT(&lvs->pending_lvols);
351 
352 	req->lvol_store = lvs;
353 
354 	spdk_bs_get_super(bs, lvs_open_super, req);
355 }
356 
357 static void
358 lvs_bs_opts_init(struct spdk_bs_opts *opts)
359 {
360 	spdk_bs_opts_init(opts, sizeof(*opts));
361 	opts->max_channel_ops = SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS;
362 }
363 
364 void
365 spdk_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
366 {
367 	struct spdk_lvs_with_handle_req *req;
368 	struct spdk_bs_opts opts = {};
369 
370 	assert(cb_fn != NULL);
371 
372 	if (bs_dev == NULL) {
373 		SPDK_ERRLOG("Blobstore device does not exist\n");
374 		cb_fn(cb_arg, NULL, -ENODEV);
375 		return;
376 	}
377 
378 	req = calloc(1, sizeof(*req));
379 	if (req == NULL) {
380 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
381 		cb_fn(cb_arg, NULL, -ENOMEM);
382 		return;
383 	}
384 
385 	req->cb_fn = cb_fn;
386 	req->cb_arg = cb_arg;
387 	req->bs_dev = bs_dev;
388 
389 	lvs_bs_opts_init(&opts);
390 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
391 
392 	spdk_bs_load(bs_dev, &opts, lvs_load_cb, req);
393 }
394 
/*
 * Completion of the blobstore destroy issued by exit_error_lvs_req().
 * Intentionally empty: the user has already been notified of the failure.
 */
static void
remove_bs_on_error_cb(void *cb_arg, int bserrno)
{
}
399 
400 static void
401 exit_error_lvs_req(struct spdk_lvs_with_handle_req *req, struct spdk_lvol_store *lvs, int lvolerrno)
402 {
403 	req->cb_fn(req->cb_arg, NULL, lvolerrno);
404 	spdk_bs_destroy(lvs->blobstore, remove_bs_on_error_cb, NULL);
405 	lvs_free(lvs);
406 	free(req);
407 }
408 
409 static void
410 super_create_close_cb(void *cb_arg, int lvolerrno)
411 {
412 	struct spdk_lvs_with_handle_req *req = cb_arg;
413 	struct spdk_lvol_store *lvs = req->lvol_store;
414 
415 	if (lvolerrno < 0) {
416 		SPDK_ERRLOG("Lvol store init failed: could not close super blob\n");
417 		exit_error_lvs_req(req, lvs, lvolerrno);
418 		return;
419 	}
420 
421 	req->cb_fn(req->cb_arg, lvs, lvolerrno);
422 	free(req);
423 }
424 
425 static void
426 super_blob_set_cb(void *cb_arg, int lvolerrno)
427 {
428 	struct spdk_lvs_with_handle_req *req = cb_arg;
429 	struct spdk_lvol_store *lvs = req->lvol_store;
430 	struct spdk_blob *blob = lvs->super_blob;
431 
432 	if (lvolerrno < 0) {
433 		SPDK_ERRLOG("Lvol store init failed: could not set uuid for super blob\n");
434 		exit_error_lvs_req(req, lvs, lvolerrno);
435 		return;
436 	}
437 
438 	spdk_blob_close(blob, super_create_close_cb, req);
439 }
440 
441 static void
442 super_blob_init_cb(void *cb_arg, int lvolerrno)
443 {
444 	struct spdk_lvs_with_handle_req *req = cb_arg;
445 	struct spdk_lvol_store *lvs = req->lvol_store;
446 	struct spdk_blob *blob = lvs->super_blob;
447 	char uuid[SPDK_UUID_STRING_LEN];
448 
449 	if (lvolerrno < 0) {
450 		SPDK_ERRLOG("Lvol store init failed: could not set super blob\n");
451 		exit_error_lvs_req(req, lvs, lvolerrno);
452 		return;
453 	}
454 
455 	spdk_uuid_fmt_lower(uuid, sizeof(uuid), &lvs->uuid);
456 
457 	spdk_blob_set_xattr(blob, "uuid", uuid, sizeof(uuid));
458 	spdk_blob_set_xattr(blob, "name", lvs->name, strnlen(lvs->name, SPDK_LVS_NAME_MAX) + 1);
459 	spdk_blob_sync_md(blob, super_blob_set_cb, req);
460 }
461 
462 static void
463 super_blob_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
464 {
465 	struct spdk_lvs_with_handle_req *req = cb_arg;
466 	struct spdk_lvol_store *lvs = req->lvol_store;
467 
468 	if (lvolerrno < 0) {
469 		SPDK_ERRLOG("Lvol store init failed: could not open super blob\n");
470 		exit_error_lvs_req(req, lvs, lvolerrno);
471 		return;
472 	}
473 
474 	lvs->super_blob = blob;
475 	lvs->super_blob_id = spdk_blob_get_id(blob);
476 
477 	spdk_bs_set_super(lvs->blobstore, lvs->super_blob_id, super_blob_init_cb, req);
478 }
479 
480 static void
481 super_blob_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
482 {
483 	struct spdk_lvs_with_handle_req *req = cb_arg;
484 	struct spdk_lvol_store *lvs = req->lvol_store;
485 	struct spdk_blob_store *bs;
486 
487 	if (lvolerrno < 0) {
488 		SPDK_ERRLOG("Lvol store init failed: could not create super blob\n");
489 		exit_error_lvs_req(req, lvs, lvolerrno);
490 		return;
491 	}
492 
493 	bs = req->lvol_store->blobstore;
494 
495 	spdk_bs_open_blob(bs, blobid, super_blob_create_open_cb, req);
496 }
497 
498 static void
499 lvs_init_cb(void *cb_arg, struct spdk_blob_store *bs, int lvserrno)
500 {
501 	struct spdk_lvs_with_handle_req *lvs_req = cb_arg;
502 	struct spdk_lvol_store *lvs = lvs_req->lvol_store;
503 
504 	if (lvserrno != 0) {
505 		assert(bs == NULL);
506 		lvs_req->cb_fn(lvs_req->cb_arg, NULL, lvserrno);
507 		SPDK_ERRLOG("Lvol store init failed: could not initialize blobstore\n");
508 		lvs_free(lvs);
509 		free(lvs_req);
510 		return;
511 	}
512 
513 	assert(bs != NULL);
514 	lvs->blobstore = bs;
515 	TAILQ_INIT(&lvs->lvols);
516 	TAILQ_INIT(&lvs->pending_lvols);
517 
518 	SPDK_INFOLOG(lvol, "Lvol store initialized\n");
519 
520 	/* create super blob */
521 	spdk_bs_create_blob(lvs->blobstore, super_blob_create_cb, lvs_req);
522 }
523 
524 void
525 spdk_lvs_opts_init(struct spdk_lvs_opts *o)
526 {
527 	o->cluster_sz = SPDK_LVS_OPTS_CLUSTER_SZ;
528 	o->clear_method = LVS_CLEAR_WITH_UNMAP;
529 	o->num_md_pages_per_cluster_ratio = 100;
530 	memset(o->name, 0, sizeof(o->name));
531 }
532 
533 static void
534 setup_lvs_opts(struct spdk_bs_opts *bs_opts, struct spdk_lvs_opts *o, uint32_t total_clusters)
535 {
536 	assert(o != NULL);
537 	lvs_bs_opts_init(bs_opts);
538 	bs_opts->cluster_sz = o->cluster_sz;
539 	bs_opts->clear_method = (enum bs_clear_method)o->clear_method;
540 	bs_opts->num_md_pages = (o->num_md_pages_per_cluster_ratio * total_clusters) / 100;
541 }
542 
543 int
544 spdk_lvs_init(struct spdk_bs_dev *bs_dev, struct spdk_lvs_opts *o,
545 	      spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
546 {
547 	struct spdk_lvol_store *lvs;
548 	struct spdk_lvs_with_handle_req *lvs_req;
549 	struct spdk_bs_opts opts = {};
550 	uint32_t total_clusters;
551 	int rc;
552 
553 	if (bs_dev == NULL) {
554 		SPDK_ERRLOG("Blobstore device does not exist\n");
555 		return -ENODEV;
556 	}
557 
558 	if (o == NULL) {
559 		SPDK_ERRLOG("spdk_lvs_opts not specified\n");
560 		return -EINVAL;
561 	}
562 
563 	if (o->cluster_sz < bs_dev->blocklen) {
564 		SPDK_ERRLOG("Cluster size %" PRIu32 " is smaller than blocklen %" PRIu32 "\n",
565 			    o->cluster_sz, bs_dev->blocklen);
566 		return -EINVAL;
567 	}
568 	total_clusters = bs_dev->blockcnt / (o->cluster_sz / bs_dev->blocklen);
569 
570 	setup_lvs_opts(&opts, o, total_clusters);
571 
572 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == SPDK_LVS_NAME_MAX) {
573 		SPDK_ERRLOG("Name has no null terminator.\n");
574 		return -EINVAL;
575 	}
576 
577 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == 0) {
578 		SPDK_ERRLOG("No name specified.\n");
579 		return -EINVAL;
580 	}
581 
582 	lvs = calloc(1, sizeof(*lvs));
583 	if (!lvs) {
584 		SPDK_ERRLOG("Cannot alloc memory for lvol store base pointer\n");
585 		return -ENOMEM;
586 	}
587 
588 	spdk_uuid_generate(&lvs->uuid);
589 	snprintf(lvs->name, sizeof(lvs->name), "%s", o->name);
590 
591 	rc = add_lvs_to_list(lvs);
592 	if (rc) {
593 		SPDK_ERRLOG("lvolstore with name %s already exists\n", lvs->name);
594 		lvs_free(lvs);
595 		return -EEXIST;
596 	}
597 
598 	lvs_req = calloc(1, sizeof(*lvs_req));
599 	if (!lvs_req) {
600 		lvs_free(lvs);
601 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
602 		return -ENOMEM;
603 	}
604 
605 	assert(cb_fn != NULL);
606 	lvs_req->cb_fn = cb_fn;
607 	lvs_req->cb_arg = cb_arg;
608 	lvs_req->lvol_store = lvs;
609 	lvs->bs_dev = bs_dev;
610 
611 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
612 
613 	SPDK_INFOLOG(lvol, "Initializing lvol store\n");
614 	spdk_bs_init(bs_dev, &opts, lvs_init_cb, lvs_req);
615 
616 	return 0;
617 }
618 
619 static void
620 lvs_rename_cb(void *cb_arg, int lvolerrno)
621 {
622 	struct spdk_lvs_req *req = cb_arg;
623 
624 	if (lvolerrno != 0) {
625 		req->lvserrno = lvolerrno;
626 	}
627 	if (req->lvserrno != 0) {
628 		SPDK_ERRLOG("Lvol store rename operation failed\n");
629 		/* Lvs renaming failed, so we should 'clear' new_name.
630 		 * Otherwise it could cause a failure on the next attempt to change the name to 'new_name'  */
631 		snprintf(req->lvol_store->new_name,
632 			 sizeof(req->lvol_store->new_name),
633 			 "%s", req->lvol_store->name);
634 	} else {
635 		/* Update lvs name with new_name */
636 		snprintf(req->lvol_store->name,
637 			 sizeof(req->lvol_store->name),
638 			 "%s", req->lvol_store->new_name);
639 	}
640 
641 	req->cb_fn(req->cb_arg, req->lvserrno);
642 	free(req);
643 }
644 
645 static void
646 lvs_rename_sync_cb(void *cb_arg, int lvolerrno)
647 {
648 	struct spdk_lvs_req *req = cb_arg;
649 	struct spdk_blob *blob = req->lvol_store->super_blob;
650 
651 	if (lvolerrno < 0) {
652 		req->lvserrno = lvolerrno;
653 	}
654 
655 	spdk_blob_close(blob, lvs_rename_cb, req);
656 }
657 
658 static void
659 lvs_rename_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
660 {
661 	struct spdk_lvs_req *req = cb_arg;
662 	int rc;
663 
664 	if (lvolerrno < 0) {
665 		lvs_rename_cb(cb_arg, lvolerrno);
666 		return;
667 	}
668 
669 	rc = spdk_blob_set_xattr(blob, "name", req->lvol_store->new_name,
670 				 strlen(req->lvol_store->new_name) + 1);
671 	if (rc < 0) {
672 		req->lvserrno = rc;
673 		lvs_rename_sync_cb(req, rc);
674 		return;
675 	}
676 
677 	req->lvol_store->super_blob = blob;
678 
679 	spdk_blob_sync_md(blob, lvs_rename_sync_cb, req);
680 }
681 
682 void
683 spdk_lvs_rename(struct spdk_lvol_store *lvs, const char *new_name,
684 		spdk_lvs_op_complete cb_fn, void *cb_arg)
685 {
686 	struct spdk_lvs_req *req;
687 	struct spdk_lvol_store *tmp;
688 
689 	/* Check if new name is current lvs name.
690 	 * If so, return success immediately */
691 	if (strncmp(lvs->name, new_name, SPDK_LVS_NAME_MAX) == 0) {
692 		cb_fn(cb_arg, 0);
693 		return;
694 	}
695 
696 	/* Check if new or new_name is already used in other lvs */
697 	pthread_mutex_lock(&g_lvol_stores_mutex);
698 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
699 		if (!strncmp(new_name, tmp->name, SPDK_LVS_NAME_MAX) ||
700 		    !strncmp(new_name, tmp->new_name, SPDK_LVS_NAME_MAX)) {
701 			pthread_mutex_unlock(&g_lvol_stores_mutex);
702 			cb_fn(cb_arg, -EEXIST);
703 			return;
704 		}
705 	}
706 	pthread_mutex_unlock(&g_lvol_stores_mutex);
707 
708 	req = calloc(1, sizeof(*req));
709 	if (!req) {
710 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
711 		cb_fn(cb_arg, -ENOMEM);
712 		return;
713 	}
714 	snprintf(lvs->new_name, sizeof(lvs->new_name), "%s", new_name);
715 	req->lvol_store = lvs;
716 	req->cb_fn = cb_fn;
717 	req->cb_arg = cb_arg;
718 
719 	spdk_bs_open_blob(lvs->blobstore, lvs->super_blob_id, lvs_rename_open_cb, req);
720 }
721 
722 static void
723 _lvs_unload_cb(void *cb_arg, int lvserrno)
724 {
725 	struct spdk_lvs_req *lvs_req = cb_arg;
726 
727 	SPDK_INFOLOG(lvol, "Lvol store unloaded\n");
728 	assert(lvs_req->cb_fn != NULL);
729 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
730 	free(lvs_req);
731 }
732 
733 int
734 spdk_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
735 		void *cb_arg)
736 {
737 	struct spdk_lvs_req *lvs_req;
738 	struct spdk_lvol *lvol, *tmp;
739 
740 	if (lvs == NULL) {
741 		SPDK_ERRLOG("Lvol store is NULL\n");
742 		return -ENODEV;
743 	}
744 
745 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
746 		if (lvol->action_in_progress == true) {
747 			SPDK_ERRLOG("Cannot unload lvol store - operations on lvols pending\n");
748 			cb_fn(cb_arg, -EBUSY);
749 			return -EBUSY;
750 		} else if (lvol->ref_count != 0) {
751 			SPDK_ERRLOG("Lvols still open on lvol store\n");
752 			cb_fn(cb_arg, -EBUSY);
753 			return -EBUSY;
754 		}
755 	}
756 
757 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
758 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
759 		lvol_free(lvol);
760 	}
761 
762 	lvs_req = calloc(1, sizeof(*lvs_req));
763 	if (!lvs_req) {
764 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
765 		return -ENOMEM;
766 	}
767 
768 	lvs_req->cb_fn = cb_fn;
769 	lvs_req->cb_arg = cb_arg;
770 
771 	SPDK_INFOLOG(lvol, "Unloading lvol store\n");
772 	spdk_bs_unload(lvs->blobstore, _lvs_unload_cb, lvs_req);
773 	lvs_free(lvs);
774 
775 	return 0;
776 }
777 
778 static void
779 _lvs_destroy_cb(void *cb_arg, int lvserrno)
780 {
781 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
782 
783 	SPDK_INFOLOG(lvol, "Lvol store destroyed\n");
784 	assert(lvs_req->cb_fn != NULL);
785 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
786 	free(lvs_req);
787 }
788 
789 static void
790 _lvs_destroy_super_cb(void *cb_arg, int bserrno)
791 {
792 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
793 	struct spdk_lvol_store *lvs = lvs_req->lvs;
794 
795 	assert(lvs != NULL);
796 
797 	SPDK_INFOLOG(lvol, "Destroying lvol store\n");
798 	spdk_bs_destroy(lvs->blobstore, _lvs_destroy_cb, lvs_req);
799 	lvs_free(lvs);
800 }
801 
802 int
803 spdk_lvs_destroy(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
804 		 void *cb_arg)
805 {
806 	struct spdk_lvs_destroy_req *lvs_req;
807 	struct spdk_lvol *iter_lvol, *tmp;
808 
809 	if (lvs == NULL) {
810 		SPDK_ERRLOG("Lvol store is NULL\n");
811 		return -ENODEV;
812 	}
813 
814 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
815 		if (iter_lvol->action_in_progress == true) {
816 			SPDK_ERRLOG("Cannot destroy lvol store - operations on lvols pending\n");
817 			cb_fn(cb_arg, -EBUSY);
818 			return -EBUSY;
819 		} else if (iter_lvol->ref_count != 0) {
820 			SPDK_ERRLOG("Lvols still open on lvol store\n");
821 			cb_fn(cb_arg, -EBUSY);
822 			return -EBUSY;
823 		}
824 	}
825 
826 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
827 		free(iter_lvol);
828 	}
829 
830 	lvs_req = calloc(1, sizeof(*lvs_req));
831 	if (!lvs_req) {
832 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
833 		return -ENOMEM;
834 	}
835 
836 	lvs_req->cb_fn = cb_fn;
837 	lvs_req->cb_arg = cb_arg;
838 	lvs_req->lvs = lvs;
839 
840 	SPDK_INFOLOG(lvol, "Deleting super blob\n");
841 	spdk_bs_delete_blob(lvs->blobstore, lvs->super_blob_id, _lvs_destroy_super_cb, lvs_req);
842 
843 	return 0;
844 }
845 
846 static void
847 lvol_close_blob_cb(void *cb_arg, int lvolerrno)
848 {
849 	struct spdk_lvol_req *req = cb_arg;
850 	struct spdk_lvol *lvol = req->lvol;
851 
852 	if (lvolerrno < 0) {
853 		SPDK_ERRLOG("Could not close blob on lvol\n");
854 		lvol_free(lvol);
855 		goto end;
856 	}
857 
858 	lvol->ref_count--;
859 	lvol->action_in_progress = false;
860 	SPDK_INFOLOG(lvol, "Lvol %s closed\n", lvol->unique_id);
861 
862 end:
863 	req->cb_fn(req->cb_arg, lvolerrno);
864 	free(req);
865 }
866 
867 bool
868 spdk_lvol_deletable(struct spdk_lvol *lvol)
869 {
870 	size_t count = 0;
871 
872 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
873 	return (count == 0);
874 }
875 
876 static void
877 lvol_delete_blob_cb(void *cb_arg, int lvolerrno)
878 {
879 	struct spdk_lvol_req *req = cb_arg;
880 	struct spdk_lvol *lvol = req->lvol;
881 
882 	if (lvolerrno < 0) {
883 		SPDK_ERRLOG("Could not remove blob on lvol gracefully - forced removal\n");
884 	} else {
885 		SPDK_INFOLOG(lvol, "Lvol %s deleted\n", lvol->unique_id);
886 	}
887 
888 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
889 	lvol_free(lvol);
890 	req->cb_fn(req->cb_arg, lvolerrno);
891 	free(req);
892 }
893 
894 static void
895 lvol_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
896 {
897 	struct spdk_lvol_with_handle_req *req = cb_arg;
898 	struct spdk_lvol *lvol = req->lvol;
899 
900 	TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
901 
902 	if (lvolerrno < 0) {
903 		free(lvol);
904 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
905 		free(req);
906 		return;
907 	}
908 
909 	lvol->blob = blob;
910 	lvol->blob_id = spdk_blob_get_id(blob);
911 
912 	TAILQ_INSERT_TAIL(&lvol->lvol_store->lvols, lvol, link);
913 
914 	snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
915 	lvol->ref_count++;
916 
917 	assert(req->cb_fn != NULL);
918 	req->cb_fn(req->cb_arg, req->lvol, lvolerrno);
919 	free(req);
920 }
921 
922 static void
923 lvol_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
924 {
925 	struct spdk_lvol_with_handle_req *req = cb_arg;
926 	struct spdk_blob_store *bs;
927 	struct spdk_blob_open_opts opts;
928 
929 	if (lvolerrno < 0) {
930 		TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
931 		free(req->lvol);
932 		assert(req->cb_fn != NULL);
933 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
934 		free(req);
935 		return;
936 	}
937 
938 	spdk_blob_open_opts_init(&opts, sizeof(opts));
939 	opts.clear_method = req->lvol->clear_method;
940 	bs = req->lvol->lvol_store->blobstore;
941 
942 	spdk_bs_open_blob_ext(bs, blobid, &opts, lvol_create_open_cb, req);
943 }
944 
945 static void
946 lvol_get_xattr_value(void *xattr_ctx, const char *name,
947 		     const void **value, size_t *value_len)
948 {
949 	struct spdk_lvol *lvol = xattr_ctx;
950 
951 	if (!strcmp(LVOL_NAME, name)) {
952 		*value = lvol->name;
953 		*value_len = SPDK_LVOL_NAME_MAX;
954 		return;
955 	}
956 	if (!strcmp("uuid", name)) {
957 		*value = lvol->uuid_str;
958 		*value_len = sizeof(lvol->uuid_str);
959 		return;
960 	}
961 	*value = NULL;
962 	*value_len = 0;
963 }
964 
965 static int
966 lvs_verify_lvol_name(struct spdk_lvol_store *lvs, const char *name)
967 {
968 	struct spdk_lvol *tmp;
969 
970 	if (name == NULL || strnlen(name, SPDK_LVOL_NAME_MAX) == 0) {
971 		SPDK_INFOLOG(lvol, "lvol name not provided.\n");
972 		return -EINVAL;
973 	}
974 
975 	if (strnlen(name, SPDK_LVOL_NAME_MAX) == SPDK_LVOL_NAME_MAX) {
976 		SPDK_ERRLOG("Name has no null terminator.\n");
977 		return -EINVAL;
978 	}
979 
980 	TAILQ_FOREACH(tmp, &lvs->lvols, link) {
981 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
982 			SPDK_ERRLOG("lvol with name %s already exists\n", name);
983 			return -EEXIST;
984 		}
985 	}
986 
987 	TAILQ_FOREACH(tmp, &lvs->pending_lvols, link) {
988 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
989 			SPDK_ERRLOG("lvol with name %s is being already created\n", name);
990 			return -EEXIST;
991 		}
992 	}
993 
994 	return 0;
995 }
996 
997 int
998 spdk_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
999 		 bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1000 		 void *cb_arg)
1001 {
1002 	struct spdk_lvol_with_handle_req *req;
1003 	struct spdk_blob_store *bs;
1004 	struct spdk_lvol *lvol;
1005 	struct spdk_blob_opts opts;
1006 	uint64_t num_clusters;
1007 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1008 	int rc;
1009 
1010 	if (lvs == NULL) {
1011 		SPDK_ERRLOG("lvol store does not exist\n");
1012 		return -EINVAL;
1013 	}
1014 
1015 	rc = lvs_verify_lvol_name(lvs, name);
1016 	if (rc < 0) {
1017 		return rc;
1018 	}
1019 
1020 	bs = lvs->blobstore;
1021 
1022 	req = calloc(1, sizeof(*req));
1023 	if (!req) {
1024 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1025 		return -ENOMEM;
1026 	}
1027 	req->cb_fn = cb_fn;
1028 	req->cb_arg = cb_arg;
1029 
1030 	lvol = calloc(1, sizeof(*lvol));
1031 	if (!lvol) {
1032 		free(req);
1033 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1034 		return -ENOMEM;
1035 	}
1036 	lvol->lvol_store = lvs;
1037 	num_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(bs));
1038 	lvol->thin_provision = thin_provision;
1039 	lvol->clear_method = (enum blob_clear_method)clear_method;
1040 	snprintf(lvol->name, sizeof(lvol->name), "%s", name);
1041 	TAILQ_INSERT_TAIL(&lvol->lvol_store->pending_lvols, lvol, link);
1042 	spdk_uuid_generate(&lvol->uuid);
1043 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
1044 	req->lvol = lvol;
1045 
1046 	spdk_blob_opts_init(&opts, sizeof(opts));
1047 	opts.thin_provision = thin_provision;
1048 	opts.num_clusters = num_clusters;
1049 	opts.clear_method = lvol->clear_method;
1050 	opts.xattrs.count = SPDK_COUNTOF(xattr_names);
1051 	opts.xattrs.names = xattr_names;
1052 	opts.xattrs.ctx = lvol;
1053 	opts.xattrs.get_value = lvol_get_xattr_value;
1054 
1055 	spdk_bs_create_blob_ext(lvs->blobstore, &opts, lvol_create_cb, req);
1056 
1057 	return 0;
1058 }
1059 
1060 void
1061 spdk_lvol_create_snapshot(struct spdk_lvol *origlvol, const char *snapshot_name,
1062 			  spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1063 {
1064 	struct spdk_lvol_store *lvs;
1065 	struct spdk_lvol *newlvol;
1066 	struct spdk_blob *origblob;
1067 	struct spdk_lvol_with_handle_req *req;
1068 	struct spdk_blob_xattr_opts snapshot_xattrs;
1069 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1070 	int rc;
1071 
1072 	if (origlvol == NULL) {
1073 		SPDK_INFOLOG(lvol, "Lvol not provided.\n");
1074 		cb_fn(cb_arg, NULL, -EINVAL);
1075 		return;
1076 	}
1077 
1078 	origblob = origlvol->blob;
1079 	lvs = origlvol->lvol_store;
1080 	if (lvs == NULL) {
1081 		SPDK_ERRLOG("lvol store does not exist\n");
1082 		cb_fn(cb_arg, NULL, -EINVAL);
1083 		return;
1084 	}
1085 
1086 	rc = lvs_verify_lvol_name(lvs, snapshot_name);
1087 	if (rc < 0) {
1088 		cb_fn(cb_arg, NULL, rc);
1089 		return;
1090 	}
1091 
1092 	req = calloc(1, sizeof(*req));
1093 	if (!req) {
1094 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1095 		cb_fn(cb_arg, NULL, -ENOMEM);
1096 		return;
1097 	}
1098 
1099 	newlvol = calloc(1, sizeof(*newlvol));
1100 	if (!newlvol) {
1101 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1102 		free(req);
1103 		cb_fn(cb_arg, NULL, -ENOMEM);
1104 		return;
1105 	}
1106 
1107 	newlvol->lvol_store = origlvol->lvol_store;
1108 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", snapshot_name);
1109 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1110 	spdk_uuid_generate(&newlvol->uuid);
1111 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1112 	snapshot_xattrs.count = SPDK_COUNTOF(xattr_names);
1113 	snapshot_xattrs.ctx = newlvol;
1114 	snapshot_xattrs.names = xattr_names;
1115 	snapshot_xattrs.get_value = lvol_get_xattr_value;
1116 	req->lvol = newlvol;
1117 	req->cb_fn = cb_fn;
1118 	req->cb_arg = cb_arg;
1119 
1120 	spdk_bs_create_snapshot(lvs->blobstore, spdk_blob_get_id(origblob), &snapshot_xattrs,
1121 				lvol_create_cb, req);
1122 }
1123 
1124 void
1125 spdk_lvol_create_clone(struct spdk_lvol *origlvol, const char *clone_name,
1126 		       spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1127 {
1128 	struct spdk_lvol *newlvol;
1129 	struct spdk_lvol_with_handle_req *req;
1130 	struct spdk_lvol_store *lvs;
1131 	struct spdk_blob *origblob;
1132 	struct spdk_blob_xattr_opts clone_xattrs;
1133 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1134 	int rc;
1135 
1136 	if (origlvol == NULL) {
1137 		SPDK_INFOLOG(lvol, "Lvol not provided.\n");
1138 		cb_fn(cb_arg, NULL, -EINVAL);
1139 		return;
1140 	}
1141 
1142 	origblob = origlvol->blob;
1143 	lvs = origlvol->lvol_store;
1144 	if (lvs == NULL) {
1145 		SPDK_ERRLOG("lvol store does not exist\n");
1146 		cb_fn(cb_arg, NULL, -EINVAL);
1147 		return;
1148 	}
1149 
1150 	rc = lvs_verify_lvol_name(lvs, clone_name);
1151 	if (rc < 0) {
1152 		cb_fn(cb_arg, NULL, rc);
1153 		return;
1154 	}
1155 
1156 	req = calloc(1, sizeof(*req));
1157 	if (!req) {
1158 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1159 		cb_fn(cb_arg, NULL, -ENOMEM);
1160 		return;
1161 	}
1162 
1163 	newlvol = calloc(1, sizeof(*newlvol));
1164 	if (!newlvol) {
1165 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1166 		free(req);
1167 		cb_fn(cb_arg, NULL, -ENOMEM);
1168 		return;
1169 	}
1170 
1171 	newlvol->lvol_store = lvs;
1172 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", clone_name);
1173 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1174 	spdk_uuid_generate(&newlvol->uuid);
1175 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1176 	clone_xattrs.count = SPDK_COUNTOF(xattr_names);
1177 	clone_xattrs.ctx = newlvol;
1178 	clone_xattrs.names = xattr_names;
1179 	clone_xattrs.get_value = lvol_get_xattr_value;
1180 	req->lvol = newlvol;
1181 	req->cb_fn = cb_fn;
1182 	req->cb_arg = cb_arg;
1183 
1184 	spdk_bs_create_clone(lvs->blobstore, spdk_blob_get_id(origblob), &clone_xattrs,
1185 			     lvol_create_cb,
1186 			     req);
1187 }
1188 
1189 static void
1190 lvol_resize_done(void *cb_arg, int lvolerrno)
1191 {
1192 	struct spdk_lvol_req *req = cb_arg;
1193 
1194 	req->cb_fn(req->cb_arg,  lvolerrno);
1195 	free(req);
1196 }
1197 
1198 static void
1199 lvol_blob_resize_cb(void *cb_arg, int bserrno)
1200 {
1201 	struct spdk_lvol_req *req = cb_arg;
1202 	struct spdk_lvol *lvol = req->lvol;
1203 
1204 	if (bserrno != 0) {
1205 		req->cb_fn(req->cb_arg, bserrno);
1206 		free(req);
1207 		return;
1208 	}
1209 
1210 	spdk_blob_sync_md(lvol->blob, lvol_resize_done, req);
1211 }
1212 
1213 void
1214 spdk_lvol_resize(struct spdk_lvol *lvol, uint64_t sz,
1215 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1216 {
1217 	struct spdk_blob *blob = lvol->blob;
1218 	struct spdk_lvol_store *lvs = lvol->lvol_store;
1219 	struct spdk_lvol_req *req;
1220 	uint64_t new_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(lvs->blobstore));
1221 
1222 	req = calloc(1, sizeof(*req));
1223 	if (!req) {
1224 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1225 		cb_fn(cb_arg, -ENOMEM);
1226 		return;
1227 	}
1228 	req->cb_fn = cb_fn;
1229 	req->cb_arg = cb_arg;
1230 	req->lvol = lvol;
1231 
1232 	spdk_blob_resize(blob, new_clusters, lvol_blob_resize_cb, req);
1233 }
1234 
1235 static void
1236 lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1237 {
1238 	struct spdk_lvol_req *req = cb_arg;
1239 
1240 	req->cb_fn(req->cb_arg, lvolerrno);
1241 	free(req);
1242 }
1243 
1244 void
1245 spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1246 {
1247 	struct spdk_lvol_req *req;
1248 
1249 	req = calloc(1, sizeof(*req));
1250 	if (!req) {
1251 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1252 		cb_fn(cb_arg, -ENOMEM);
1253 		return;
1254 	}
1255 	req->cb_fn = cb_fn;
1256 	req->cb_arg = cb_arg;
1257 
1258 	spdk_blob_set_read_only(lvol->blob);
1259 	spdk_blob_sync_md(lvol->blob, lvol_set_read_only_cb, req);
1260 }
1261 
1262 static void
1263 lvol_rename_cb(void *cb_arg, int lvolerrno)
1264 {
1265 	struct spdk_lvol_req *req = cb_arg;
1266 
1267 	if (lvolerrno != 0) {
1268 		SPDK_ERRLOG("Lvol rename operation failed\n");
1269 	} else {
1270 		snprintf(req->lvol->name, sizeof(req->lvol->name), "%s", req->name);
1271 	}
1272 
1273 	req->cb_fn(req->cb_arg, lvolerrno);
1274 	free(req);
1275 }
1276 
1277 void
1278 spdk_lvol_rename(struct spdk_lvol *lvol, const char *new_name,
1279 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1280 {
1281 	struct spdk_lvol *tmp;
1282 	struct spdk_blob *blob = lvol->blob;
1283 	struct spdk_lvol_req *req;
1284 	int rc;
1285 
1286 	/* Check if new name is current lvol name.
1287 	 * If so, return success immediately */
1288 	if (strncmp(lvol->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1289 		cb_fn(cb_arg, 0);
1290 		return;
1291 	}
1292 
1293 	/* Check if lvol with 'new_name' already exists in lvolstore */
1294 	TAILQ_FOREACH(tmp, &lvol->lvol_store->lvols, link) {
1295 		if (strncmp(tmp->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1296 			SPDK_ERRLOG("Lvol %s already exists in lvol store %s\n", new_name, lvol->lvol_store->name);
1297 			cb_fn(cb_arg, -EEXIST);
1298 			return;
1299 		}
1300 	}
1301 
1302 	req = calloc(1, sizeof(*req));
1303 	if (!req) {
1304 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1305 		cb_fn(cb_arg, -ENOMEM);
1306 		return;
1307 	}
1308 	req->cb_fn = cb_fn;
1309 	req->cb_arg = cb_arg;
1310 	req->lvol = lvol;
1311 	snprintf(req->name, sizeof(req->name), "%s", new_name);
1312 
1313 	rc = spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1314 	if (rc < 0) {
1315 		free(req);
1316 		cb_fn(cb_arg, rc);
1317 		return;
1318 	}
1319 
1320 	spdk_blob_sync_md(blob, lvol_rename_cb, req);
1321 }
1322 
1323 void
1324 spdk_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1325 {
1326 	struct spdk_lvol_req *req;
1327 	struct spdk_blob_store *bs;
1328 
1329 	assert(cb_fn != NULL);
1330 
1331 	if (lvol == NULL) {
1332 		SPDK_ERRLOG("lvol does not exist\n");
1333 		cb_fn(cb_arg, -ENODEV);
1334 		return;
1335 	}
1336 
1337 	if (lvol->ref_count != 0) {
1338 		SPDK_ERRLOG("Cannot destroy lvol %s because it is still open\n", lvol->unique_id);
1339 		cb_fn(cb_arg, -EBUSY);
1340 		return;
1341 	}
1342 
1343 	lvol->action_in_progress = true;
1344 
1345 	req = calloc(1, sizeof(*req));
1346 	if (!req) {
1347 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1348 		cb_fn(cb_arg, -ENOMEM);
1349 		return;
1350 	}
1351 
1352 	req->cb_fn = cb_fn;
1353 	req->cb_arg = cb_arg;
1354 	req->lvol = lvol;
1355 	bs = lvol->lvol_store->blobstore;
1356 
1357 	spdk_bs_delete_blob(bs, lvol->blob_id, lvol_delete_blob_cb, req);
1358 }
1359 
1360 void
1361 spdk_lvol_close(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1362 {
1363 	struct spdk_lvol_req *req;
1364 
1365 	assert(cb_fn != NULL);
1366 
1367 	if (lvol == NULL) {
1368 		SPDK_ERRLOG("lvol does not exist\n");
1369 		cb_fn(cb_arg, -ENODEV);
1370 		return;
1371 	}
1372 
1373 	if (lvol->ref_count > 1) {
1374 		lvol->ref_count--;
1375 		cb_fn(cb_arg, 0);
1376 		return;
1377 	} else if (lvol->ref_count == 0) {
1378 		cb_fn(cb_arg, -EINVAL);
1379 		return;
1380 	}
1381 
1382 	lvol->action_in_progress = true;
1383 
1384 	req = calloc(1, sizeof(*req));
1385 	if (!req) {
1386 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1387 		cb_fn(cb_arg, -ENOMEM);
1388 		return;
1389 	}
1390 
1391 	req->cb_fn = cb_fn;
1392 	req->cb_arg = cb_arg;
1393 	req->lvol = lvol;
1394 
1395 	spdk_blob_close(lvol->blob, lvol_close_blob_cb, req);
1396 }
1397 
1398 struct spdk_io_channel *
1399 spdk_lvol_get_io_channel(struct spdk_lvol *lvol)
1400 {
1401 	return spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1402 }
1403 
1404 static void
1405 lvol_inflate_cb(void *cb_arg, int lvolerrno)
1406 {
1407 	struct spdk_lvol_req *req = cb_arg;
1408 
1409 	spdk_bs_free_io_channel(req->channel);
1410 
1411 	if (lvolerrno < 0) {
1412 		SPDK_ERRLOG("Could not inflate lvol\n");
1413 	}
1414 
1415 	req->cb_fn(req->cb_arg, lvolerrno);
1416 	free(req);
1417 }
1418 
1419 void
1420 spdk_lvol_inflate(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1421 {
1422 	struct spdk_lvol_req *req;
1423 	spdk_blob_id blob_id;
1424 
1425 	assert(cb_fn != NULL);
1426 
1427 	if (lvol == NULL) {
1428 		SPDK_ERRLOG("Lvol does not exist\n");
1429 		cb_fn(cb_arg, -ENODEV);
1430 		return;
1431 	}
1432 
1433 	req = calloc(1, sizeof(*req));
1434 	if (!req) {
1435 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1436 		cb_fn(cb_arg, -ENOMEM);
1437 		return;
1438 	}
1439 
1440 	req->cb_fn = cb_fn;
1441 	req->cb_arg = cb_arg;
1442 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1443 	if (req->channel == NULL) {
1444 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1445 		free(req);
1446 		cb_fn(cb_arg, -ENOMEM);
1447 		return;
1448 	}
1449 
1450 	blob_id = spdk_blob_get_id(lvol->blob);
1451 	spdk_bs_inflate_blob(lvol->lvol_store->blobstore, req->channel, blob_id, lvol_inflate_cb,
1452 			     req);
1453 }
1454 
1455 void
1456 spdk_lvol_decouple_parent(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1457 {
1458 	struct spdk_lvol_req *req;
1459 	spdk_blob_id blob_id;
1460 
1461 	assert(cb_fn != NULL);
1462 
1463 	if (lvol == NULL) {
1464 		SPDK_ERRLOG("Lvol does not exist\n");
1465 		cb_fn(cb_arg, -ENODEV);
1466 		return;
1467 	}
1468 
1469 	req = calloc(1, sizeof(*req));
1470 	if (!req) {
1471 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1472 		cb_fn(cb_arg, -ENOMEM);
1473 		return;
1474 	}
1475 
1476 	req->cb_fn = cb_fn;
1477 	req->cb_arg = cb_arg;
1478 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1479 	if (req->channel == NULL) {
1480 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1481 		free(req);
1482 		cb_fn(cb_arg, -ENOMEM);
1483 		return;
1484 	}
1485 
1486 	blob_id = spdk_blob_get_id(lvol->blob);
1487 	spdk_bs_blob_decouple_parent(lvol->lvol_store->blobstore, req->channel, blob_id,
1488 				     lvol_inflate_cb, req);
1489 }
1490 
1491 void
1492 spdk_lvs_grow(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
1493 {
1494 	struct spdk_lvs_with_handle_req *req;
1495 	struct spdk_bs_opts opts = {};
1496 
1497 	assert(cb_fn != NULL);
1498 
1499 	if (bs_dev == NULL) {
1500 		SPDK_ERRLOG("Blobstore device does not exist\n");
1501 		cb_fn(cb_arg, NULL, -ENODEV);
1502 		return;
1503 	}
1504 
1505 	req = calloc(1, sizeof(*req));
1506 	if (req == NULL) {
1507 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
1508 		cb_fn(cb_arg, NULL, -ENOMEM);
1509 		return;
1510 	}
1511 
1512 	req->cb_fn = cb_fn;
1513 	req->cb_arg = cb_arg;
1514 	req->bs_dev = bs_dev;
1515 
1516 	lvs_bs_opts_init(&opts);
1517 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
1518 
1519 	spdk_bs_grow(bs_dev, &opts, lvs_load_cb, req);
1520 }
1521