xref: /spdk/lib/lvol/lvol.c (revision 91a594ad8b3c0d00f25c9a20dfc8f1f2dfce81ce)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk_internal/lvolstore.h"
35 #include "spdk_internal/log.h"
36 #include "spdk/string.h"
37 #include "spdk/thread.h"
38 #include "spdk/blob_bdev.h"
39 #include "spdk/util.h"
40 
/* Default blob channel opts for lvol: value assigned to spdk_bs_opts.max_channel_ops */
#define SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS 512

/* Name of the blob xattr that carries an lvol's name */
#define LVOL_NAME "name"

SPDK_LOG_REGISTER_COMPONENT("lvol", SPDK_LOG_LVOL)

/* List of lvol stores registered by this file; walked and modified under g_lvol_stores_mutex */
static TAILQ_HEAD(, spdk_lvol_store) g_lvol_stores = TAILQ_HEAD_INITIALIZER(g_lvol_stores);
static pthread_mutex_t g_lvol_stores_mutex = PTHREAD_MUTEX_INITIALIZER;
50 
51 static int
52 _spdk_add_lvs_to_list(struct spdk_lvol_store *lvs)
53 {
54 	struct spdk_lvol_store *tmp;
55 	bool name_conflict = false;
56 
57 	pthread_mutex_lock(&g_lvol_stores_mutex);
58 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
59 		if (!strncmp(lvs->name, tmp->name, SPDK_LVS_NAME_MAX)) {
60 			name_conflict = true;
61 			break;
62 		}
63 	}
64 	if (!name_conflict) {
65 		lvs->on_list = true;
66 		TAILQ_INSERT_TAIL(&g_lvol_stores, lvs, link);
67 	}
68 	pthread_mutex_unlock(&g_lvol_stores_mutex);
69 
70 	return name_conflict ? -1 : 0;
71 }
72 
73 static void
74 _spdk_lvs_free(struct spdk_lvol_store *lvs)
75 {
76 	pthread_mutex_lock(&g_lvol_stores_mutex);
77 	if (lvs->on_list) {
78 		TAILQ_REMOVE(&g_lvol_stores, lvs, link);
79 	}
80 	pthread_mutex_unlock(&g_lvol_stores_mutex);
81 
82 	free(lvs);
83 }
84 
/*
 * Free the memory of an lvol object.
 * Nothing else is done here: the lvol is not unlinked from any list.
 */
static void
_spdk_lvol_free(struct spdk_lvol *lvol)
{
	free(lvol);
}
90 
91 static void
92 _spdk_lvol_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
93 {
94 	struct spdk_lvol_with_handle_req *req = cb_arg;
95 	struct spdk_lvol *lvol = req->lvol;
96 
97 	if (lvolerrno != 0) {
98 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Failed to open lvol %s\n", lvol->unique_id);
99 		goto end;
100 	}
101 
102 	lvol->ref_count++;
103 	lvol->blob = blob;
104 end:
105 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
106 	free(req);
107 }
108 
109 void
110 spdk_lvol_open(struct spdk_lvol *lvol, spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
111 {
112 	struct spdk_lvol_with_handle_req *req;
113 	struct spdk_blob_open_opts opts;
114 
115 	assert(cb_fn != NULL);
116 
117 	if (lvol == NULL) {
118 		SPDK_ERRLOG("lvol does not exist\n");
119 		cb_fn(cb_arg, NULL, -ENODEV);
120 		return;
121 	}
122 
123 	if (lvol->action_in_progress == true) {
124 		SPDK_ERRLOG("Cannot open lvol - operations on lvol pending\n");
125 		cb_fn(cb_arg, lvol, -EBUSY);
126 		return;
127 	}
128 
129 	if (lvol->ref_count > 0) {
130 		lvol->ref_count++;
131 		cb_fn(cb_arg, lvol, 0);
132 		return;
133 	}
134 
135 	req = calloc(1, sizeof(*req));
136 	if (req == NULL) {
137 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
138 		cb_fn(cb_arg, NULL, -ENOMEM);
139 		return;
140 	}
141 
142 	req->cb_fn = cb_fn;
143 	req->cb_arg = cb_arg;
144 	req->lvol = lvol;
145 
146 	spdk_blob_open_opts_init(&opts);
147 	opts.clear_method = lvol->clear_method;
148 
149 	spdk_bs_open_blob_ext(lvol->lvol_store->blobstore, lvol->blob_id, &opts, _spdk_lvol_open_cb, req);
150 }
151 
152 static void
153 _spdk_bs_unload_with_error_cb(void *cb_arg, int lvolerrno)
154 {
155 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
156 
157 	req->cb_fn(req->cb_arg, NULL, req->lvserrno);
158 	free(req);
159 }
160 
161 static void
162 _spdk_load_next_lvol(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
163 {
164 	struct spdk_lvs_with_handle_req *req = cb_arg;
165 	struct spdk_lvol_store *lvs = req->lvol_store;
166 	struct spdk_blob_store *bs = lvs->blobstore;
167 	struct spdk_lvol *lvol, *tmp;
168 	spdk_blob_id blob_id;
169 	const char *attr;
170 	size_t value_len;
171 	int rc;
172 
173 	if (lvolerrno == -ENOENT) {
174 		/* Finished iterating */
175 		req->cb_fn(req->cb_arg, lvs, 0);
176 		free(req);
177 		return;
178 	} else if (lvolerrno < 0) {
179 		SPDK_ERRLOG("Failed to fetch blobs list\n");
180 		req->lvserrno = lvolerrno;
181 		goto invalid;
182 	}
183 
184 	blob_id = spdk_blob_get_id(blob);
185 
186 	if (blob_id == lvs->super_blob_id) {
187 		SPDK_INFOLOG(SPDK_LOG_LVOL, "found superblob %"PRIu64"\n", (uint64_t)blob_id);
188 		spdk_bs_iter_next(bs, blob, _spdk_load_next_lvol, req);
189 		return;
190 	}
191 
192 	lvol = calloc(1, sizeof(*lvol));
193 	if (!lvol) {
194 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
195 		req->lvserrno = -ENOMEM;
196 		goto invalid;
197 	}
198 
199 	lvol->blob = blob;
200 	lvol->blob_id = blob_id;
201 	lvol->lvol_store = lvs;
202 
203 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
204 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0' ||
205 	    spdk_uuid_parse(&lvol->uuid, attr) != 0) {
206 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Missing or corrupt lvol uuid\n");
207 		memset(&lvol->uuid, 0, sizeof(lvol->uuid));
208 	}
209 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
210 
211 	if (!spdk_mem_all_zero(&lvol->uuid, sizeof(lvol->uuid))) {
212 		snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
213 	} else {
214 		spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->unique_id), &lvol->lvol_store->uuid);
215 		value_len = strlen(lvol->unique_id);
216 		snprintf(lvol->unique_id + value_len, sizeof(lvol->unique_id) - value_len, "_%"PRIu64,
217 			 (uint64_t)blob_id);
218 	}
219 
220 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
221 	if (rc != 0 || value_len > SPDK_LVOL_NAME_MAX) {
222 		SPDK_ERRLOG("Cannot assign lvol name\n");
223 		_spdk_lvol_free(lvol);
224 		req->lvserrno = -EINVAL;
225 		goto invalid;
226 	}
227 
228 	snprintf(lvol->name, sizeof(lvol->name), "%s", attr);
229 
230 	TAILQ_INSERT_TAIL(&lvs->lvols, lvol, link);
231 
232 	lvs->lvol_count++;
233 
234 	SPDK_INFOLOG(SPDK_LOG_LVOL, "added lvol %s (%s)\n", lvol->unique_id, lvol->uuid_str);
235 
236 	spdk_bs_iter_next(bs, blob, _spdk_load_next_lvol, req);
237 
238 	return;
239 
240 invalid:
241 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
242 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
243 		free(lvol);
244 	}
245 
246 	_spdk_lvs_free(lvs);
247 	spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
248 }
249 
250 static void
251 _spdk_close_super_cb(void *cb_arg, int lvolerrno)
252 {
253 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
254 	struct spdk_lvol_store *lvs = req->lvol_store;
255 	struct spdk_blob_store *bs = lvs->blobstore;
256 
257 	if (lvolerrno != 0) {
258 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Could not close super blob\n");
259 		_spdk_lvs_free(lvs);
260 		req->lvserrno = -ENODEV;
261 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
262 		return;
263 	}
264 
265 	/* Start loading lvols */
266 	spdk_bs_iter_first(lvs->blobstore, _spdk_load_next_lvol, req);
267 }
268 
269 static void
270 _spdk_close_super_blob_with_error_cb(void *cb_arg, int lvolerrno)
271 {
272 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
273 	struct spdk_lvol_store *lvs = req->lvol_store;
274 	struct spdk_blob_store *bs = lvs->blobstore;
275 
276 	_spdk_lvs_free(lvs);
277 
278 	spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
279 }
280 
281 static void
282 _spdk_lvs_read_uuid(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
283 {
284 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
285 	struct spdk_lvol_store *lvs = req->lvol_store;
286 	struct spdk_blob_store *bs = lvs->blobstore;
287 	const char *attr;
288 	size_t value_len;
289 	int rc;
290 
291 	if (lvolerrno != 0) {
292 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Could not open super blob\n");
293 		_spdk_lvs_free(lvs);
294 		req->lvserrno = -ENODEV;
295 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
296 		return;
297 	}
298 
299 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
300 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0') {
301 		SPDK_INFOLOG(SPDK_LOG_LVOL, "missing or incorrect UUID\n");
302 		req->lvserrno = -EINVAL;
303 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
304 		return;
305 	}
306 
307 	if (spdk_uuid_parse(&lvs->uuid, attr)) {
308 		SPDK_INFOLOG(SPDK_LOG_LVOL, "incorrect UUID '%s'\n", attr);
309 		req->lvserrno = -EINVAL;
310 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
311 		return;
312 	}
313 
314 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
315 	if (rc != 0 || value_len > SPDK_LVS_NAME_MAX) {
316 		SPDK_INFOLOG(SPDK_LOG_LVOL, "missing or invalid name\n");
317 		req->lvserrno = -EINVAL;
318 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
319 		return;
320 	}
321 
322 	snprintf(lvs->name, sizeof(lvs->name), "%s", attr);
323 
324 	rc = _spdk_add_lvs_to_list(lvs);
325 	if (rc) {
326 		SPDK_INFOLOG(SPDK_LOG_LVOL, "lvolstore with name %s already exists\n", lvs->name);
327 		req->lvserrno = -EEXIST;
328 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
329 		return;
330 	}
331 
332 	lvs->super_blob_id = spdk_blob_get_id(blob);
333 
334 	spdk_blob_close(blob, _spdk_close_super_cb, req);
335 }
336 
337 static void
338 _spdk_lvs_open_super(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
339 {
340 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
341 	struct spdk_lvol_store *lvs = req->lvol_store;
342 	struct spdk_blob_store *bs = lvs->blobstore;
343 
344 	if (lvolerrno != 0) {
345 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Super blob not found\n");
346 		_spdk_lvs_free(lvs);
347 		req->lvserrno = -ENODEV;
348 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
349 		return;
350 	}
351 
352 	spdk_bs_open_blob(bs, blobid, _spdk_lvs_read_uuid, req);
353 }
354 
355 static void
356 _spdk_lvs_load_cb(void *cb_arg, struct spdk_blob_store *bs, int lvolerrno)
357 {
358 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
359 	struct spdk_lvol_store *lvs;
360 
361 	if (lvolerrno != 0) {
362 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
363 		free(req);
364 		return;
365 	}
366 
367 	lvs = calloc(1, sizeof(*lvs));
368 	if (lvs == NULL) {
369 		SPDK_ERRLOG("Cannot alloc memory for lvol store\n");
370 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
371 		return;
372 	}
373 
374 	lvs->blobstore = bs;
375 	lvs->bs_dev = req->bs_dev;
376 	TAILQ_INIT(&lvs->lvols);
377 	TAILQ_INIT(&lvs->pending_lvols);
378 
379 	req->lvol_store = lvs;
380 
381 	spdk_bs_get_super(bs, _spdk_lvs_open_super, req);
382 }
383 
384 static void
385 spdk_lvs_bs_opts_init(struct spdk_bs_opts *opts)
386 {
387 	spdk_bs_opts_init(opts);
388 	opts->max_channel_ops = SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS;
389 }
390 
391 void
392 spdk_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
393 {
394 	struct spdk_lvs_with_handle_req *req;
395 	struct spdk_bs_opts opts = {};
396 
397 	assert(cb_fn != NULL);
398 
399 	if (bs_dev == NULL) {
400 		SPDK_ERRLOG("Blobstore device does not exist\n");
401 		cb_fn(cb_arg, NULL, -ENODEV);
402 		return;
403 	}
404 
405 	req = calloc(1, sizeof(*req));
406 	if (req == NULL) {
407 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
408 		cb_fn(cb_arg, NULL, -ENOMEM);
409 		return;
410 	}
411 
412 	req->cb_fn = cb_fn;
413 	req->cb_arg = cb_arg;
414 	req->bs_dev = bs_dev;
415 
416 	spdk_lvs_bs_opts_init(&opts);
417 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
418 
419 	spdk_bs_load(bs_dev, &opts, _spdk_lvs_load_cb, req);
420 }
421 
/*
 * Completion callback handed to spdk_bs_destroy() on lvol store init failures.
 * Intentionally a no-op: both the context and the status are ignored.
 */
static void
_spdk_remove_bs_on_error_cb(void *cb_arg, int bserrno)
{
	(void)cb_arg;
	(void)bserrno;
}
426 
427 static void
428 _spdk_super_create_close_cb(void *cb_arg, int lvolerrno)
429 {
430 	struct spdk_lvs_with_handle_req *req = cb_arg;
431 	struct spdk_lvol_store *lvs = req->lvol_store;
432 
433 	if (lvolerrno < 0) {
434 		SPDK_ERRLOG("Lvol store init failed: could not close super blob\n");
435 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
436 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
437 		_spdk_lvs_free(lvs);
438 		free(req);
439 		return;
440 	}
441 
442 	req->cb_fn(req->cb_arg, lvs, lvolerrno);
443 	free(req);
444 }
445 
446 static void
447 _spdk_super_blob_set_cb(void *cb_arg, int lvolerrno)
448 {
449 	struct spdk_lvs_with_handle_req *req = cb_arg;
450 	struct spdk_lvol_store *lvs = req->lvol_store;
451 	struct spdk_blob *blob = lvs->super_blob;
452 
453 	if (lvolerrno < 0) {
454 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
455 		SPDK_ERRLOG("Lvol store init failed: could not set uuid for super blob\n");
456 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
457 		_spdk_lvs_free(lvs);
458 		free(req);
459 		return;
460 	}
461 
462 	spdk_blob_close(blob, _spdk_super_create_close_cb, req);
463 }
464 
465 static void
466 _spdk_super_blob_init_cb(void *cb_arg, int lvolerrno)
467 {
468 	struct spdk_lvs_with_handle_req *req = cb_arg;
469 	struct spdk_lvol_store *lvs = req->lvol_store;
470 	struct spdk_blob *blob = lvs->super_blob;
471 	char uuid[SPDK_UUID_STRING_LEN];
472 
473 	if (lvolerrno < 0) {
474 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
475 		SPDK_ERRLOG("Lvol store init failed: could not set super blob\n");
476 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
477 		_spdk_lvs_free(lvs);
478 		free(req);
479 		return;
480 	}
481 
482 	spdk_uuid_fmt_lower(uuid, sizeof(uuid), &lvs->uuid);
483 
484 	spdk_blob_set_xattr(blob, "uuid", uuid, sizeof(uuid));
485 	spdk_blob_set_xattr(blob, "name", lvs->name, strnlen(lvs->name, SPDK_LVS_NAME_MAX) + 1);
486 	spdk_blob_sync_md(blob, _spdk_super_blob_set_cb, req);
487 }
488 
489 static void
490 _spdk_super_blob_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
491 {
492 	struct spdk_lvs_with_handle_req *req = cb_arg;
493 	struct spdk_lvol_store *lvs = req->lvol_store;
494 
495 	if (lvolerrno < 0) {
496 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
497 		SPDK_ERRLOG("Lvol store init failed: could not open super blob\n");
498 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
499 		_spdk_lvs_free(lvs);
500 		free(req);
501 		return;
502 	}
503 
504 	lvs->super_blob = blob;
505 	lvs->super_blob_id = spdk_blob_get_id(blob);
506 
507 	spdk_bs_set_super(lvs->blobstore, lvs->super_blob_id, _spdk_super_blob_init_cb, req);
508 }
509 
510 static void
511 _spdk_super_blob_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
512 {
513 	struct spdk_lvs_with_handle_req *req = cb_arg;
514 	struct spdk_lvol_store *lvs = req->lvol_store;
515 	struct spdk_blob_store *bs;
516 
517 	if (lvolerrno < 0) {
518 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
519 		SPDK_ERRLOG("Lvol store init failed: could not create super blob\n");
520 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
521 		_spdk_lvs_free(lvs);
522 		free(req);
523 		return;
524 	}
525 
526 	bs = req->lvol_store->blobstore;
527 
528 	spdk_bs_open_blob(bs, blobid, _spdk_super_blob_create_open_cb, req);
529 }
530 
531 static void
532 _spdk_lvs_init_cb(void *cb_arg, struct spdk_blob_store *bs, int lvserrno)
533 {
534 	struct spdk_lvs_with_handle_req *lvs_req = cb_arg;
535 	struct spdk_lvol_store *lvs = lvs_req->lvol_store;
536 
537 	if (lvserrno != 0) {
538 		assert(bs == NULL);
539 		lvs_req->cb_fn(lvs_req->cb_arg, NULL, lvserrno);
540 		SPDK_ERRLOG("Lvol store init failed: could not initialize blobstore\n");
541 		_spdk_lvs_free(lvs);
542 		free(lvs_req);
543 		return;
544 	}
545 
546 	assert(bs != NULL);
547 	lvs->blobstore = bs;
548 	TAILQ_INIT(&lvs->lvols);
549 	TAILQ_INIT(&lvs->pending_lvols);
550 
551 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store initialized\n");
552 
553 	/* create super blob */
554 	spdk_bs_create_blob(lvs->blobstore, _spdk_super_blob_create_cb, lvs_req);
555 }
556 
557 void
558 spdk_lvs_opts_init(struct spdk_lvs_opts *o)
559 {
560 	o->cluster_sz = SPDK_LVS_OPTS_CLUSTER_SZ;
561 	o->clear_method = LVS_CLEAR_WITH_UNMAP;
562 	memset(o->name, 0, sizeof(o->name));
563 }
564 
565 static void
566 _spdk_setup_lvs_opts(struct spdk_bs_opts *bs_opts, struct spdk_lvs_opts *o)
567 {
568 	assert(o != NULL);
569 	spdk_lvs_bs_opts_init(bs_opts);
570 	bs_opts->cluster_sz = o->cluster_sz;
571 	bs_opts->clear_method = (enum bs_clear_method)o->clear_method;
572 }
573 
574 int
575 spdk_lvs_init(struct spdk_bs_dev *bs_dev, struct spdk_lvs_opts *o,
576 	      spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
577 {
578 	struct spdk_lvol_store *lvs;
579 	struct spdk_lvs_with_handle_req *lvs_req;
580 	struct spdk_bs_opts opts = {};
581 	int rc;
582 
583 	if (bs_dev == NULL) {
584 		SPDK_ERRLOG("Blobstore device does not exist\n");
585 		return -ENODEV;
586 	}
587 
588 	if (o == NULL) {
589 		SPDK_ERRLOG("spdk_lvs_opts not specified\n");
590 		return -EINVAL;
591 	}
592 
593 	_spdk_setup_lvs_opts(&opts, o);
594 
595 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == SPDK_LVS_NAME_MAX) {
596 		SPDK_ERRLOG("Name has no null terminator.\n");
597 		return -EINVAL;
598 	}
599 
600 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == 0) {
601 		SPDK_ERRLOG("No name specified.\n");
602 		return -EINVAL;
603 	}
604 
605 	lvs = calloc(1, sizeof(*lvs));
606 	if (!lvs) {
607 		SPDK_ERRLOG("Cannot alloc memory for lvol store base pointer\n");
608 		return -ENOMEM;
609 	}
610 
611 	spdk_uuid_generate(&lvs->uuid);
612 	snprintf(lvs->name, sizeof(lvs->name), "%s", o->name);
613 
614 	rc = _spdk_add_lvs_to_list(lvs);
615 	if (rc) {
616 		SPDK_ERRLOG("lvolstore with name %s already exists\n", lvs->name);
617 		_spdk_lvs_free(lvs);
618 		return -EEXIST;
619 	}
620 
621 	lvs_req = calloc(1, sizeof(*lvs_req));
622 	if (!lvs_req) {
623 		_spdk_lvs_free(lvs);
624 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
625 		return -ENOMEM;
626 	}
627 
628 	assert(cb_fn != NULL);
629 	lvs_req->cb_fn = cb_fn;
630 	lvs_req->cb_arg = cb_arg;
631 	lvs_req->lvol_store = lvs;
632 	lvs->bs_dev = bs_dev;
633 	lvs->destruct = false;
634 
635 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
636 
637 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Initializing lvol store\n");
638 	spdk_bs_init(bs_dev, &opts, _spdk_lvs_init_cb, lvs_req);
639 
640 	return 0;
641 }
642 
643 static void
644 _spdk_lvs_rename_cb(void *cb_arg, int lvolerrno)
645 {
646 	struct spdk_lvs_req *req = cb_arg;
647 
648 	if (lvolerrno != 0) {
649 		req->lvserrno = lvolerrno;
650 	}
651 	if (req->lvserrno != 0) {
652 		SPDK_ERRLOG("Lvol store rename operation failed\n");
653 		/* Lvs renaming failed, so we should 'clear' new_name.
654 		 * Otherwise it could cause a failure on the next attepmt to change the name to 'new_name'  */
655 		snprintf(req->lvol_store->new_name,
656 			 sizeof(req->lvol_store->new_name),
657 			 "%s", req->lvol_store->name);
658 	} else {
659 		/* Update lvs name with new_name */
660 		snprintf(req->lvol_store->name,
661 			 sizeof(req->lvol_store->name),
662 			 "%s", req->lvol_store->new_name);
663 	}
664 
665 	req->cb_fn(req->cb_arg, req->lvserrno);
666 	free(req);
667 }
668 
669 static void
670 _spdk_lvs_rename_sync_cb(void *cb_arg, int lvolerrno)
671 {
672 	struct spdk_lvs_req *req = cb_arg;
673 	struct spdk_blob *blob = req->lvol_store->super_blob;
674 
675 	if (lvolerrno < 0) {
676 		req->lvserrno = lvolerrno;
677 	}
678 
679 	spdk_blob_close(blob, _spdk_lvs_rename_cb, req);
680 }
681 
682 static void
683 _spdk_lvs_rename_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
684 {
685 	struct spdk_lvs_req *req = cb_arg;
686 	int rc;
687 
688 	if (lvolerrno < 0) {
689 		_spdk_lvs_rename_cb(cb_arg, lvolerrno);
690 		return;
691 	}
692 
693 	rc = spdk_blob_set_xattr(blob, "name", req->lvol_store->new_name,
694 				 strlen(req->lvol_store->new_name) + 1);
695 	if (rc < 0) {
696 		req->lvserrno = rc;
697 		_spdk_lvs_rename_sync_cb(req, rc);
698 		return;
699 	}
700 
701 	req->lvol_store->super_blob = blob;
702 
703 	spdk_blob_sync_md(blob, _spdk_lvs_rename_sync_cb, req);
704 }
705 
706 void
707 spdk_lvs_rename(struct spdk_lvol_store *lvs, const char *new_name,
708 		spdk_lvs_op_complete cb_fn, void *cb_arg)
709 {
710 	struct spdk_lvs_req *req;
711 	struct spdk_lvol_store *tmp;
712 
713 	/* Check if new name is current lvs name.
714 	 * If so, return success immediately */
715 	if (strncmp(lvs->name, new_name, SPDK_LVS_NAME_MAX) == 0) {
716 		cb_fn(cb_arg, 0);
717 		return;
718 	}
719 
720 	/* Check if new or new_name is already used in other lvs */
721 	pthread_mutex_lock(&g_lvol_stores_mutex);
722 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
723 		if (!strncmp(new_name, tmp->name, SPDK_LVS_NAME_MAX) ||
724 		    !strncmp(new_name, tmp->new_name, SPDK_LVS_NAME_MAX)) {
725 			pthread_mutex_unlock(&g_lvol_stores_mutex);
726 			cb_fn(cb_arg, -EEXIST);
727 			return;
728 		}
729 	}
730 	pthread_mutex_unlock(&g_lvol_stores_mutex);
731 
732 	req = calloc(1, sizeof(*req));
733 	if (!req) {
734 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
735 		cb_fn(cb_arg, -ENOMEM);
736 		return;
737 	}
738 	snprintf(lvs->new_name, sizeof(lvs->new_name), "%s", new_name);
739 	req->lvol_store = lvs;
740 	req->cb_fn = cb_fn;
741 	req->cb_arg = cb_arg;
742 
743 	spdk_bs_open_blob(lvs->blobstore, lvs->super_blob_id, _spdk_lvs_rename_open_cb, req);
744 }
745 
746 static void
747 _lvs_unload_cb(void *cb_arg, int lvserrno)
748 {
749 	struct spdk_lvs_req *lvs_req = cb_arg;
750 
751 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store unloaded\n");
752 	assert(lvs_req->cb_fn != NULL);
753 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
754 	free(lvs_req);
755 }
756 
757 int
758 spdk_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
759 		void *cb_arg)
760 {
761 	struct spdk_lvs_req *lvs_req;
762 	struct spdk_lvol *lvol, *tmp;
763 
764 	if (lvs == NULL) {
765 		SPDK_ERRLOG("Lvol store is NULL\n");
766 		return -ENODEV;
767 	}
768 
769 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
770 		if (lvol->action_in_progress == true) {
771 			SPDK_ERRLOG("Cannot unload lvol store - operations on lvols pending\n");
772 			cb_fn(cb_arg, -EBUSY);
773 			return -EBUSY;
774 		} else if (lvol->ref_count != 0) {
775 			SPDK_ERRLOG("Lvols still open on lvol store\n");
776 			cb_fn(cb_arg, -EBUSY);
777 			return -EBUSY;
778 		}
779 	}
780 
781 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
782 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
783 		_spdk_lvol_free(lvol);
784 	}
785 
786 	lvs_req = calloc(1, sizeof(*lvs_req));
787 	if (!lvs_req) {
788 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
789 		return -ENOMEM;
790 	}
791 
792 	lvs_req->cb_fn = cb_fn;
793 	lvs_req->cb_arg = cb_arg;
794 
795 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Unloading lvol store\n");
796 	spdk_bs_unload(lvs->blobstore, _lvs_unload_cb, lvs_req);
797 	_spdk_lvs_free(lvs);
798 
799 	return 0;
800 }
801 
802 static void
803 _lvs_destroy_cb(void *cb_arg, int lvserrno)
804 {
805 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
806 
807 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store destroyed\n");
808 	assert(lvs_req->cb_fn != NULL);
809 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
810 	free(lvs_req);
811 }
812 
813 static void
814 _lvs_destroy_super_cb(void *cb_arg, int bserrno)
815 {
816 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
817 	struct spdk_lvol_store *lvs = lvs_req->lvs;
818 
819 	assert(lvs != NULL);
820 
821 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Destroying lvol store\n");
822 	spdk_bs_destroy(lvs->blobstore, _lvs_destroy_cb, lvs_req);
823 	_spdk_lvs_free(lvs);
824 }
825 
826 int
827 spdk_lvs_destroy(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
828 		 void *cb_arg)
829 {
830 	struct spdk_lvs_destroy_req *lvs_req;
831 	struct spdk_lvol *iter_lvol, *tmp;
832 
833 	if (lvs == NULL) {
834 		SPDK_ERRLOG("Lvol store is NULL\n");
835 		return -ENODEV;
836 	}
837 
838 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
839 		if (iter_lvol->action_in_progress == true) {
840 			SPDK_ERRLOG("Cannot destroy lvol store - operations on lvols pending\n");
841 			cb_fn(cb_arg, -EBUSY);
842 			return -EBUSY;
843 		} else if (iter_lvol->ref_count != 0) {
844 			SPDK_ERRLOG("Lvols still open on lvol store\n");
845 			cb_fn(cb_arg, -EBUSY);
846 			return -EBUSY;
847 		}
848 	}
849 
850 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
851 		free(iter_lvol);
852 	}
853 
854 	lvs_req = calloc(1, sizeof(*lvs_req));
855 	if (!lvs_req) {
856 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
857 		return -ENOMEM;
858 	}
859 
860 	lvs_req->cb_fn = cb_fn;
861 	lvs_req->cb_arg = cb_arg;
862 	lvs_req->lvs = lvs;
863 
864 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Deleting super blob\n");
865 	spdk_bs_delete_blob(lvs->blobstore, lvs->super_blob_id, _lvs_destroy_super_cb, lvs_req);
866 
867 	return 0;
868 }
869 
870 static void
871 _spdk_lvol_close_blob_cb(void *cb_arg, int lvolerrno)
872 {
873 	struct spdk_lvol_req *req = cb_arg;
874 	struct spdk_lvol *lvol = req->lvol;
875 
876 	if (lvolerrno < 0) {
877 		SPDK_ERRLOG("Could not close blob on lvol\n");
878 		_spdk_lvol_free(lvol);
879 		goto end;
880 	}
881 
882 	lvol->ref_count--;
883 	lvol->action_in_progress = false;
884 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol %s closed\n", lvol->unique_id);
885 
886 end:
887 	req->cb_fn(req->cb_arg, lvolerrno);
888 	free(req);
889 }
890 
891 bool
892 spdk_lvol_deletable(struct spdk_lvol *lvol)
893 {
894 	size_t count;
895 
896 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
897 	return (count == 0);
898 }
899 
900 static void
901 _spdk_lvol_delete_blob_cb(void *cb_arg, int lvolerrno)
902 {
903 	struct spdk_lvol_req *req = cb_arg;
904 	struct spdk_lvol *lvol = req->lvol;
905 
906 	if (lvolerrno < 0) {
907 		SPDK_ERRLOG("Could not remove blob on lvol gracefully - forced removal\n");
908 	} else {
909 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol %s deleted\n", lvol->unique_id);
910 	}
911 
912 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
913 	_spdk_lvol_free(lvol);
914 	req->cb_fn(req->cb_arg, lvolerrno);
915 	free(req);
916 }
917 
918 static void
919 _spdk_lvol_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
920 {
921 	struct spdk_lvol_with_handle_req *req = cb_arg;
922 	struct spdk_lvol *lvol = req->lvol;
923 
924 	TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
925 
926 	if (lvolerrno < 0) {
927 		free(lvol);
928 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
929 		free(req);
930 		return;
931 	}
932 
933 	lvol->blob = blob;
934 	lvol->blob_id = spdk_blob_get_id(blob);
935 
936 	TAILQ_INSERT_TAIL(&lvol->lvol_store->lvols, lvol, link);
937 
938 	snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
939 	lvol->ref_count++;
940 
941 	assert(req->cb_fn != NULL);
942 	req->cb_fn(req->cb_arg, req->lvol, lvolerrno);
943 	free(req);
944 }
945 
946 static void
947 _spdk_lvol_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
948 {
949 	struct spdk_lvol_with_handle_req *req = cb_arg;
950 	struct spdk_blob_store *bs;
951 	struct spdk_blob_open_opts opts;
952 
953 	if (lvolerrno < 0) {
954 		TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
955 		free(req->lvol);
956 		assert(req->cb_fn != NULL);
957 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
958 		free(req);
959 		return;
960 	}
961 
962 	spdk_blob_open_opts_init(&opts);
963 	opts.clear_method = req->lvol->clear_method;
964 	bs = req->lvol->lvol_store->blobstore;
965 
966 	spdk_bs_open_blob_ext(bs, blobid, &opts, _spdk_lvol_create_open_cb, req);
967 }
968 
969 static void
970 spdk_lvol_get_xattr_value(void *xattr_ctx, const char *name,
971 			  const void **value, size_t *value_len)
972 {
973 	struct spdk_lvol *lvol = xattr_ctx;
974 
975 	if (!strcmp(LVOL_NAME, name)) {
976 		*value = lvol->name;
977 		*value_len = SPDK_LVOL_NAME_MAX;
978 	} else if (!strcmp("uuid", name)) {
979 		*value = lvol->uuid_str;
980 		*value_len = sizeof(lvol->uuid_str);
981 	}
982 }
983 
984 static int
985 _spdk_lvs_verify_lvol_name(struct spdk_lvol_store *lvs, const char *name)
986 {
987 	struct spdk_lvol *tmp;
988 
989 	if (name == NULL || strnlen(name, SPDK_LVOL_NAME_MAX) == 0) {
990 		SPDK_INFOLOG(SPDK_LOG_LVOL, "lvol name not provided.\n");
991 		return -EINVAL;
992 	}
993 
994 	if (strnlen(name, SPDK_LVOL_NAME_MAX) == SPDK_LVOL_NAME_MAX) {
995 		SPDK_ERRLOG("Name has no null terminator.\n");
996 		return -EINVAL;
997 	}
998 
999 	TAILQ_FOREACH(tmp, &lvs->lvols, link) {
1000 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1001 			SPDK_ERRLOG("lvol with name %s already exists\n", name);
1002 			return -EEXIST;
1003 		}
1004 	}
1005 
1006 	TAILQ_FOREACH(tmp, &lvs->pending_lvols, link) {
1007 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1008 			SPDK_ERRLOG("lvol with name %s is being already created\n", name);
1009 			return -EEXIST;
1010 		}
1011 	}
1012 
1013 	return 0;
1014 }
1015 
1016 int
1017 spdk_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1018 		 bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1019 		 void *cb_arg)
1020 {
1021 	struct spdk_lvol_with_handle_req *req;
1022 	struct spdk_blob_store *bs;
1023 	struct spdk_lvol *lvol;
1024 	struct spdk_blob_opts opts;
1025 	uint64_t num_clusters;
1026 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1027 	int rc;
1028 
1029 	if (lvs == NULL) {
1030 		SPDK_ERRLOG("lvol store does not exist\n");
1031 		return -EINVAL;
1032 	}
1033 
1034 	rc = _spdk_lvs_verify_lvol_name(lvs, name);
1035 	if (rc < 0) {
1036 		return rc;
1037 	}
1038 
1039 	bs = lvs->blobstore;
1040 
1041 	req = calloc(1, sizeof(*req));
1042 	if (!req) {
1043 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1044 		return -ENOMEM;
1045 	}
1046 	req->cb_fn = cb_fn;
1047 	req->cb_arg = cb_arg;
1048 
1049 	lvol = calloc(1, sizeof(*lvol));
1050 	if (!lvol) {
1051 		free(req);
1052 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1053 		return -ENOMEM;
1054 	}
1055 	lvol->lvol_store = lvs;
1056 	num_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(bs));
1057 	lvol->thin_provision = thin_provision;
1058 	lvol->clear_method = (enum blob_clear_method)clear_method;
1059 	snprintf(lvol->name, sizeof(lvol->name), "%s", name);
1060 	TAILQ_INSERT_TAIL(&lvol->lvol_store->pending_lvols, lvol, link);
1061 	spdk_uuid_generate(&lvol->uuid);
1062 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
1063 	req->lvol = lvol;
1064 
1065 	spdk_blob_opts_init(&opts);
1066 	opts.thin_provision = thin_provision;
1067 	opts.num_clusters = num_clusters;
1068 	opts.clear_method = lvol->clear_method;
1069 	opts.xattrs.count = SPDK_COUNTOF(xattr_names);
1070 	opts.xattrs.names = xattr_names;
1071 	opts.xattrs.ctx = lvol;
1072 	opts.xattrs.get_value = spdk_lvol_get_xattr_value;
1073 
1074 	spdk_bs_create_blob_ext(lvs->blobstore, &opts, _spdk_lvol_create_cb, req);
1075 
1076 	return 0;
1077 }
1078 
1079 void
1080 spdk_lvol_create_snapshot(struct spdk_lvol *origlvol, const char *snapshot_name,
1081 			  spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1082 {
1083 	struct spdk_lvol_store *lvs;
1084 	struct spdk_lvol *newlvol;
1085 	struct spdk_blob *origblob;
1086 	struct spdk_lvol_with_handle_req *req;
1087 	struct spdk_blob_xattr_opts snapshot_xattrs;
1088 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1089 	int rc;
1090 
1091 	if (origlvol == NULL) {
1092 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol not provided.\n");
1093 		cb_fn(cb_arg, NULL, -EINVAL);
1094 		return;
1095 	}
1096 
1097 	origblob = origlvol->blob;
1098 	lvs = origlvol->lvol_store;
1099 	if (lvs == NULL) {
1100 		SPDK_ERRLOG("lvol store does not exist\n");
1101 		cb_fn(cb_arg, NULL, -EINVAL);
1102 		return;
1103 	}
1104 
1105 	rc = _spdk_lvs_verify_lvol_name(lvs, snapshot_name);
1106 	if (rc < 0) {
1107 		cb_fn(cb_arg, NULL, rc);
1108 		return;
1109 	}
1110 
1111 	req = calloc(1, sizeof(*req));
1112 	if (!req) {
1113 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1114 		cb_fn(cb_arg, NULL, -ENOMEM);
1115 		return;
1116 	}
1117 
1118 	newlvol = calloc(1, sizeof(*newlvol));
1119 	if (!newlvol) {
1120 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1121 		free(req);
1122 		cb_fn(cb_arg, NULL, -ENOMEM);
1123 		return;
1124 	}
1125 
1126 	newlvol->lvol_store = origlvol->lvol_store;
1127 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", snapshot_name);
1128 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1129 	spdk_uuid_generate(&newlvol->uuid);
1130 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1131 	snapshot_xattrs.count = SPDK_COUNTOF(xattr_names);
1132 	snapshot_xattrs.ctx = newlvol;
1133 	snapshot_xattrs.names = xattr_names;
1134 	snapshot_xattrs.get_value = spdk_lvol_get_xattr_value;
1135 	req->lvol = newlvol;
1136 	req->cb_fn = cb_fn;
1137 	req->cb_arg = cb_arg;
1138 
1139 	spdk_bs_create_snapshot(lvs->blobstore, spdk_blob_get_id(origblob), &snapshot_xattrs,
1140 				_spdk_lvol_create_cb, req);
1141 }
1142 
1143 void
1144 spdk_lvol_create_clone(struct spdk_lvol *origlvol, const char *clone_name,
1145 		       spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1146 {
1147 	struct spdk_lvol *newlvol;
1148 	struct spdk_lvol_with_handle_req *req;
1149 	struct spdk_lvol_store *lvs;
1150 	struct spdk_blob *origblob;
1151 	struct spdk_blob_xattr_opts clone_xattrs;
1152 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1153 	int rc;
1154 
1155 	if (origlvol == NULL) {
1156 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol not provided.\n");
1157 		cb_fn(cb_arg, NULL, -EINVAL);
1158 		return;
1159 	}
1160 
1161 	origblob = origlvol->blob;
1162 	lvs = origlvol->lvol_store;
1163 	if (lvs == NULL) {
1164 		SPDK_ERRLOG("lvol store does not exist\n");
1165 		cb_fn(cb_arg, NULL, -EINVAL);
1166 		return;
1167 	}
1168 
1169 	rc = _spdk_lvs_verify_lvol_name(lvs, clone_name);
1170 	if (rc < 0) {
1171 		cb_fn(cb_arg, NULL, rc);
1172 		return;
1173 	}
1174 
1175 	req = calloc(1, sizeof(*req));
1176 	if (!req) {
1177 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1178 		cb_fn(cb_arg, NULL, -ENOMEM);
1179 		return;
1180 	}
1181 
1182 	newlvol = calloc(1, sizeof(*newlvol));
1183 	if (!newlvol) {
1184 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1185 		free(req);
1186 		cb_fn(cb_arg, NULL, -ENOMEM);
1187 		return;
1188 	}
1189 
1190 	newlvol->lvol_store = lvs;
1191 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", clone_name);
1192 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1193 	spdk_uuid_generate(&newlvol->uuid);
1194 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1195 	clone_xattrs.count = SPDK_COUNTOF(xattr_names);
1196 	clone_xattrs.ctx = newlvol;
1197 	clone_xattrs.names = xattr_names;
1198 	clone_xattrs.get_value = spdk_lvol_get_xattr_value;
1199 	req->lvol = newlvol;
1200 	req->cb_fn = cb_fn;
1201 	req->cb_arg = cb_arg;
1202 
1203 	spdk_bs_create_clone(lvs->blobstore, spdk_blob_get_id(origblob), &clone_xattrs,
1204 			     _spdk_lvol_create_cb,
1205 			     req);
1206 }
1207 
1208 static void
1209 _spdk_lvol_resize_done(void *cb_arg, int lvolerrno)
1210 {
1211 	struct spdk_lvol_req *req = cb_arg;
1212 
1213 	req->cb_fn(req->cb_arg,  lvolerrno);
1214 	free(req);
1215 }
1216 
1217 static void
1218 _spdk_lvol_blob_resize_cb(void *cb_arg, int bserrno)
1219 {
1220 	struct spdk_lvol_req *req = cb_arg;
1221 	struct spdk_lvol *lvol = req->lvol;
1222 
1223 	if (bserrno != 0) {
1224 		req->cb_fn(req->cb_arg, bserrno);
1225 		free(req);
1226 		return;
1227 	}
1228 
1229 	spdk_blob_sync_md(lvol->blob, _spdk_lvol_resize_done, req);
1230 }
1231 
1232 void
1233 spdk_lvol_resize(struct spdk_lvol *lvol, uint64_t sz,
1234 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1235 {
1236 	struct spdk_blob *blob = lvol->blob;
1237 	struct spdk_lvol_store *lvs = lvol->lvol_store;
1238 	struct spdk_lvol_req *req;
1239 	uint64_t new_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(lvs->blobstore));
1240 
1241 	req = calloc(1, sizeof(*req));
1242 	if (!req) {
1243 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1244 		cb_fn(cb_arg, -ENOMEM);
1245 		return;
1246 	}
1247 	req->cb_fn = cb_fn;
1248 	req->cb_arg = cb_arg;
1249 	req->lvol = lvol;
1250 
1251 	spdk_blob_resize(blob, new_clusters, _spdk_lvol_blob_resize_cb, req);
1252 }
1253 
1254 static void
1255 _spdk_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1256 {
1257 	struct spdk_lvol_req *req = cb_arg;
1258 
1259 	req->cb_fn(req->cb_arg, lvolerrno);
1260 	free(req);
1261 }
1262 
1263 void
1264 spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1265 {
1266 	struct spdk_lvol_req *req;
1267 
1268 	req = calloc(1, sizeof(*req));
1269 	if (!req) {
1270 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1271 		cb_fn(cb_arg, -ENOMEM);
1272 		return;
1273 	}
1274 	req->cb_fn = cb_fn;
1275 	req->cb_arg = cb_arg;
1276 
1277 	spdk_blob_set_read_only(lvol->blob);
1278 	spdk_blob_sync_md(lvol->blob, _spdk_lvol_set_read_only_cb, req);
1279 }
1280 
1281 static void
1282 _spdk_lvol_rename_cb(void *cb_arg, int lvolerrno)
1283 {
1284 	struct spdk_lvol_req *req = cb_arg;
1285 
1286 	if (lvolerrno != 0) {
1287 		SPDK_ERRLOG("Lvol rename operation failed\n");
1288 	} else {
1289 		snprintf(req->lvol->name, sizeof(req->lvol->name), "%s", req->name);
1290 	}
1291 
1292 	req->cb_fn(req->cb_arg, lvolerrno);
1293 	free(req);
1294 }
1295 
1296 void
1297 spdk_lvol_rename(struct spdk_lvol *lvol, const char *new_name,
1298 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1299 {
1300 	struct spdk_lvol *tmp;
1301 	struct spdk_blob *blob = lvol->blob;
1302 	struct spdk_lvol_req *req;
1303 	int rc;
1304 
1305 	/* Check if new name is current lvol name.
1306 	 * If so, return success immediately */
1307 	if (strncmp(lvol->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1308 		cb_fn(cb_arg, 0);
1309 		return;
1310 	}
1311 
1312 	/* Check if lvol with 'new_name' already exists in lvolstore */
1313 	TAILQ_FOREACH(tmp, &lvol->lvol_store->lvols, link) {
1314 		if (strncmp(tmp->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1315 			SPDK_ERRLOG("Lvol %s already exists in lvol store %s\n", new_name, lvol->lvol_store->name);
1316 			cb_fn(cb_arg, -EEXIST);
1317 			return;
1318 		}
1319 	}
1320 
1321 	req = calloc(1, sizeof(*req));
1322 	if (!req) {
1323 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1324 		cb_fn(cb_arg, -ENOMEM);
1325 		return;
1326 	}
1327 	req->cb_fn = cb_fn;
1328 	req->cb_arg = cb_arg;
1329 	req->lvol = lvol;
1330 	snprintf(req->name, sizeof(req->name), "%s", new_name);
1331 
1332 	rc = spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1333 	if (rc < 0) {
1334 		free(req);
1335 		cb_fn(cb_arg, rc);
1336 		return;
1337 	}
1338 
1339 	spdk_blob_sync_md(blob, _spdk_lvol_rename_cb, req);
1340 }
1341 
1342 void
1343 spdk_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1344 {
1345 	struct spdk_lvol_req *req;
1346 	struct spdk_blob_store *bs;
1347 
1348 	assert(cb_fn != NULL);
1349 
1350 	if (lvol == NULL) {
1351 		SPDK_ERRLOG("lvol does not exist\n");
1352 		cb_fn(cb_arg, -ENODEV);
1353 		return;
1354 	}
1355 
1356 	if (lvol->ref_count != 0) {
1357 		SPDK_ERRLOG("Cannot destroy lvol %s because it is still open\n", lvol->unique_id);
1358 		cb_fn(cb_arg, -EBUSY);
1359 		return;
1360 	}
1361 
1362 	lvol->action_in_progress = true;
1363 
1364 	req = calloc(1, sizeof(*req));
1365 	if (!req) {
1366 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1367 		cb_fn(cb_arg, -ENOMEM);
1368 		return;
1369 	}
1370 
1371 	req->cb_fn = cb_fn;
1372 	req->cb_arg = cb_arg;
1373 	req->lvol = lvol;
1374 	bs = lvol->lvol_store->blobstore;
1375 
1376 	spdk_bs_delete_blob(bs, lvol->blob_id, _spdk_lvol_delete_blob_cb, req);
1377 }
1378 
1379 void
1380 spdk_lvol_close(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1381 {
1382 	struct spdk_lvol_req *req;
1383 
1384 	assert(cb_fn != NULL);
1385 
1386 	if (lvol == NULL) {
1387 		SPDK_ERRLOG("lvol does not exist\n");
1388 		cb_fn(cb_arg, -ENODEV);
1389 		return;
1390 	}
1391 
1392 	if (lvol->ref_count > 1) {
1393 		lvol->ref_count--;
1394 		cb_fn(cb_arg, 0);
1395 		return;
1396 	} else if (lvol->ref_count == 0) {
1397 		cb_fn(cb_arg, -EINVAL);
1398 		return;
1399 	}
1400 
1401 	lvol->action_in_progress = true;
1402 
1403 	req = calloc(1, sizeof(*req));
1404 	if (!req) {
1405 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1406 		cb_fn(cb_arg, -ENOMEM);
1407 		return;
1408 	}
1409 
1410 	req->cb_fn = cb_fn;
1411 	req->cb_arg = cb_arg;
1412 	req->lvol = lvol;
1413 
1414 	spdk_blob_close(lvol->blob, _spdk_lvol_close_blob_cb, req);
1415 }
1416 
1417 struct spdk_io_channel *
1418 spdk_lvol_get_io_channel(struct spdk_lvol *lvol)
1419 {
1420 	return spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1421 }
1422 
1423 static void
1424 _spdk_lvol_inflate_cb(void *cb_arg, int lvolerrno)
1425 {
1426 	struct spdk_lvol_req *req = cb_arg;
1427 
1428 	spdk_bs_free_io_channel(req->channel);
1429 
1430 	if (lvolerrno < 0) {
1431 		SPDK_ERRLOG("Could not inflate lvol\n");
1432 	}
1433 
1434 	req->cb_fn(req->cb_arg, lvolerrno);
1435 	free(req);
1436 }
1437 
1438 void
1439 spdk_lvol_inflate(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1440 {
1441 	struct spdk_lvol_req *req;
1442 	spdk_blob_id blob_id;
1443 
1444 	assert(cb_fn != NULL);
1445 
1446 	if (lvol == NULL) {
1447 		SPDK_ERRLOG("Lvol does not exist\n");
1448 		cb_fn(cb_arg, -ENODEV);
1449 		return;
1450 	}
1451 
1452 	req = calloc(1, sizeof(*req));
1453 	if (!req) {
1454 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1455 		cb_fn(cb_arg, -ENOMEM);
1456 		return;
1457 	}
1458 
1459 	req->cb_fn = cb_fn;
1460 	req->cb_arg = cb_arg;
1461 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1462 	if (req->channel == NULL) {
1463 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1464 		free(req);
1465 		cb_fn(cb_arg, -ENOMEM);
1466 		return;
1467 	}
1468 
1469 	blob_id = spdk_blob_get_id(lvol->blob);
1470 	spdk_bs_inflate_blob(lvol->lvol_store->blobstore, req->channel, blob_id, _spdk_lvol_inflate_cb,
1471 			     req);
1472 }
1473 
1474 void
1475 spdk_lvol_decouple_parent(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1476 {
1477 	struct spdk_lvol_req *req;
1478 	spdk_blob_id blob_id;
1479 
1480 	assert(cb_fn != NULL);
1481 
1482 	if (lvol == NULL) {
1483 		SPDK_ERRLOG("Lvol does not exist\n");
1484 		cb_fn(cb_arg, -ENODEV);
1485 		return;
1486 	}
1487 
1488 	req = calloc(1, sizeof(*req));
1489 	if (!req) {
1490 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1491 		cb_fn(cb_arg, -ENOMEM);
1492 		return;
1493 	}
1494 
1495 	req->cb_fn = cb_fn;
1496 	req->cb_arg = cb_arg;
1497 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1498 	if (req->channel == NULL) {
1499 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1500 		free(req);
1501 		cb_fn(cb_arg, -ENOMEM);
1502 		return;
1503 	}
1504 
1505 	blob_id = spdk_blob_get_id(lvol->blob);
1506 	spdk_bs_blob_decouple_parent(lvol->lvol_store->blobstore, req->channel, blob_id,
1507 				     _spdk_lvol_inflate_cb, req);
1508 }
1509