xref: /spdk/lib/lvol/lvol.c (revision 1a9ed697f0c1696ba6b5819e27e68a3fbbf3b223)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk_internal/lvolstore.h"
35 #include "spdk_internal/log.h"
36 #include "spdk/string.h"
37 #include "spdk/thread.h"
38 #include "spdk/blob_bdev.h"
39 #include "spdk/util.h"
40 
/* Default blob channel opts for lvol; lvs_bs_opts_init() uses this as max_channel_ops */
#define SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS 512

/* Name of the blob xattr under which an lvol's name is stored */
#define LVOL_NAME "name"

SPDK_LOG_REGISTER_COMPONENT("lvol", SPDK_LOG_LVOL)

/*
 * List of registered lvol stores. Every access to the list in this file is
 * made while holding g_lvol_stores_mutex.
 */
static TAILQ_HEAD(, spdk_lvol_store) g_lvol_stores = TAILQ_HEAD_INITIALIZER(g_lvol_stores);
static pthread_mutex_t g_lvol_stores_mutex = PTHREAD_MUTEX_INITIALIZER;
50 
51 static int
52 _spdk_add_lvs_to_list(struct spdk_lvol_store *lvs)
53 {
54 	struct spdk_lvol_store *tmp;
55 	bool name_conflict = false;
56 
57 	pthread_mutex_lock(&g_lvol_stores_mutex);
58 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
59 		if (!strncmp(lvs->name, tmp->name, SPDK_LVS_NAME_MAX)) {
60 			name_conflict = true;
61 			break;
62 		}
63 	}
64 	if (!name_conflict) {
65 		lvs->on_list = true;
66 		TAILQ_INSERT_TAIL(&g_lvol_stores, lvs, link);
67 	}
68 	pthread_mutex_unlock(&g_lvol_stores_mutex);
69 
70 	return name_conflict ? -1 : 0;
71 }
72 
73 static void
74 _spdk_lvs_free(struct spdk_lvol_store *lvs)
75 {
76 	pthread_mutex_lock(&g_lvol_stores_mutex);
77 	if (lvs->on_list) {
78 		TAILQ_REMOVE(&g_lvol_stores, lvs, link);
79 	}
80 	pthread_mutex_unlock(&g_lvol_stores_mutex);
81 
82 	free(lvs);
83 }
84 
/*
 * Release the memory of an lvol structure. The lvol is not unlinked from any
 * list here; callers do that themselves.
 */
static void
_spdk_lvol_free(struct spdk_lvol *lvol)
{
	free(lvol);
}
90 
91 static void
92 _spdk_lvol_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
93 {
94 	struct spdk_lvol_with_handle_req *req = cb_arg;
95 	struct spdk_lvol *lvol = req->lvol;
96 
97 	if (lvolerrno != 0) {
98 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Failed to open lvol %s\n", lvol->unique_id);
99 		goto end;
100 	}
101 
102 	lvol->ref_count++;
103 	lvol->blob = blob;
104 end:
105 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
106 	free(req);
107 }
108 
109 void
110 spdk_lvol_open(struct spdk_lvol *lvol, spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
111 {
112 	struct spdk_lvol_with_handle_req *req;
113 	struct spdk_blob_open_opts opts;
114 
115 	assert(cb_fn != NULL);
116 
117 	if (lvol == NULL) {
118 		SPDK_ERRLOG("lvol does not exist\n");
119 		cb_fn(cb_arg, NULL, -ENODEV);
120 		return;
121 	}
122 
123 	if (lvol->action_in_progress == true) {
124 		SPDK_ERRLOG("Cannot open lvol - operations on lvol pending\n");
125 		cb_fn(cb_arg, lvol, -EBUSY);
126 		return;
127 	}
128 
129 	if (lvol->ref_count > 0) {
130 		lvol->ref_count++;
131 		cb_fn(cb_arg, lvol, 0);
132 		return;
133 	}
134 
135 	req = calloc(1, sizeof(*req));
136 	if (req == NULL) {
137 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
138 		cb_fn(cb_arg, NULL, -ENOMEM);
139 		return;
140 	}
141 
142 	req->cb_fn = cb_fn;
143 	req->cb_arg = cb_arg;
144 	req->lvol = lvol;
145 
146 	spdk_blob_open_opts_init(&opts);
147 	opts.clear_method = lvol->clear_method;
148 
149 	spdk_bs_open_blob_ext(lvol->lvol_store->blobstore, lvol->blob_id, &opts, _spdk_lvol_open_cb, req);
150 }
151 
152 static void
153 _spdk_bs_unload_with_error_cb(void *cb_arg, int lvolerrno)
154 {
155 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
156 
157 	req->cb_fn(req->cb_arg, NULL, req->lvserrno);
158 	free(req);
159 }
160 
161 static void
162 _spdk_load_next_lvol(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
163 {
164 	struct spdk_lvs_with_handle_req *req = cb_arg;
165 	struct spdk_lvol_store *lvs = req->lvol_store;
166 	struct spdk_blob_store *bs = lvs->blobstore;
167 	struct spdk_lvol *lvol, *tmp;
168 	spdk_blob_id blob_id;
169 	const char *attr;
170 	size_t value_len;
171 	int rc;
172 
173 	if (lvolerrno == -ENOENT) {
174 		/* Finished iterating */
175 		req->cb_fn(req->cb_arg, lvs, 0);
176 		free(req);
177 		return;
178 	} else if (lvolerrno < 0) {
179 		SPDK_ERRLOG("Failed to fetch blobs list\n");
180 		req->lvserrno = lvolerrno;
181 		goto invalid;
182 	}
183 
184 	blob_id = spdk_blob_get_id(blob);
185 
186 	if (blob_id == lvs->super_blob_id) {
187 		SPDK_INFOLOG(SPDK_LOG_LVOL, "found superblob %"PRIu64"\n", (uint64_t)blob_id);
188 		spdk_bs_iter_next(bs, blob, _spdk_load_next_lvol, req);
189 		return;
190 	}
191 
192 	lvol = calloc(1, sizeof(*lvol));
193 	if (!lvol) {
194 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
195 		req->lvserrno = -ENOMEM;
196 		goto invalid;
197 	}
198 
199 	lvol->blob = blob;
200 	lvol->blob_id = blob_id;
201 	lvol->lvol_store = lvs;
202 	lvol->thin_provision = spdk_blob_is_thin_provisioned(blob);
203 
204 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
205 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0' ||
206 	    spdk_uuid_parse(&lvol->uuid, attr) != 0) {
207 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Missing or corrupt lvol uuid\n");
208 		memset(&lvol->uuid, 0, sizeof(lvol->uuid));
209 	}
210 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
211 
212 	if (!spdk_mem_all_zero(&lvol->uuid, sizeof(lvol->uuid))) {
213 		snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
214 	} else {
215 		spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->unique_id), &lvol->lvol_store->uuid);
216 		value_len = strlen(lvol->unique_id);
217 		snprintf(lvol->unique_id + value_len, sizeof(lvol->unique_id) - value_len, "_%"PRIu64,
218 			 (uint64_t)blob_id);
219 	}
220 
221 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
222 	if (rc != 0 || value_len > SPDK_LVOL_NAME_MAX) {
223 		SPDK_ERRLOG("Cannot assign lvol name\n");
224 		_spdk_lvol_free(lvol);
225 		req->lvserrno = -EINVAL;
226 		goto invalid;
227 	}
228 
229 	snprintf(lvol->name, sizeof(lvol->name), "%s", attr);
230 
231 	TAILQ_INSERT_TAIL(&lvs->lvols, lvol, link);
232 
233 	lvs->lvol_count++;
234 
235 	SPDK_INFOLOG(SPDK_LOG_LVOL, "added lvol %s (%s)\n", lvol->unique_id, lvol->uuid_str);
236 
237 	spdk_bs_iter_next(bs, blob, _spdk_load_next_lvol, req);
238 
239 	return;
240 
241 invalid:
242 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
243 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
244 		free(lvol);
245 	}
246 
247 	_spdk_lvs_free(lvs);
248 	spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
249 }
250 
251 static void
252 _spdk_close_super_cb(void *cb_arg, int lvolerrno)
253 {
254 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
255 	struct spdk_lvol_store *lvs = req->lvol_store;
256 	struct spdk_blob_store *bs = lvs->blobstore;
257 
258 	if (lvolerrno != 0) {
259 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Could not close super blob\n");
260 		_spdk_lvs_free(lvs);
261 		req->lvserrno = -ENODEV;
262 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
263 		return;
264 	}
265 
266 	/* Start loading lvols */
267 	spdk_bs_iter_first(lvs->blobstore, _spdk_load_next_lvol, req);
268 }
269 
270 static void
271 _spdk_close_super_blob_with_error_cb(void *cb_arg, int lvolerrno)
272 {
273 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
274 	struct spdk_lvol_store *lvs = req->lvol_store;
275 	struct spdk_blob_store *bs = lvs->blobstore;
276 
277 	_spdk_lvs_free(lvs);
278 
279 	spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
280 }
281 
282 static void
283 _spdk_lvs_read_uuid(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
284 {
285 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
286 	struct spdk_lvol_store *lvs = req->lvol_store;
287 	struct spdk_blob_store *bs = lvs->blobstore;
288 	const char *attr;
289 	size_t value_len;
290 	int rc;
291 
292 	if (lvolerrno != 0) {
293 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Could not open super blob\n");
294 		_spdk_lvs_free(lvs);
295 		req->lvserrno = -ENODEV;
296 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
297 		return;
298 	}
299 
300 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
301 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0') {
302 		SPDK_INFOLOG(SPDK_LOG_LVOL, "missing or incorrect UUID\n");
303 		req->lvserrno = -EINVAL;
304 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
305 		return;
306 	}
307 
308 	if (spdk_uuid_parse(&lvs->uuid, attr)) {
309 		SPDK_INFOLOG(SPDK_LOG_LVOL, "incorrect UUID '%s'\n", attr);
310 		req->lvserrno = -EINVAL;
311 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
312 		return;
313 	}
314 
315 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
316 	if (rc != 0 || value_len > SPDK_LVS_NAME_MAX) {
317 		SPDK_INFOLOG(SPDK_LOG_LVOL, "missing or invalid name\n");
318 		req->lvserrno = -EINVAL;
319 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
320 		return;
321 	}
322 
323 	snprintf(lvs->name, sizeof(lvs->name), "%s", attr);
324 
325 	rc = _spdk_add_lvs_to_list(lvs);
326 	if (rc) {
327 		SPDK_INFOLOG(SPDK_LOG_LVOL, "lvolstore with name %s already exists\n", lvs->name);
328 		req->lvserrno = -EEXIST;
329 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
330 		return;
331 	}
332 
333 	lvs->super_blob_id = spdk_blob_get_id(blob);
334 
335 	spdk_blob_close(blob, _spdk_close_super_cb, req);
336 }
337 
338 static void
339 _spdk_lvs_open_super(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
340 {
341 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
342 	struct spdk_lvol_store *lvs = req->lvol_store;
343 	struct spdk_blob_store *bs = lvs->blobstore;
344 
345 	if (lvolerrno != 0) {
346 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Super blob not found\n");
347 		_spdk_lvs_free(lvs);
348 		req->lvserrno = -ENODEV;
349 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
350 		return;
351 	}
352 
353 	spdk_bs_open_blob(bs, blobid, _spdk_lvs_read_uuid, req);
354 }
355 
356 static void
357 _spdk_lvs_load_cb(void *cb_arg, struct spdk_blob_store *bs, int lvolerrno)
358 {
359 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
360 	struct spdk_lvol_store *lvs;
361 
362 	if (lvolerrno != 0) {
363 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
364 		free(req);
365 		return;
366 	}
367 
368 	lvs = calloc(1, sizeof(*lvs));
369 	if (lvs == NULL) {
370 		SPDK_ERRLOG("Cannot alloc memory for lvol store\n");
371 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
372 		return;
373 	}
374 
375 	lvs->blobstore = bs;
376 	lvs->bs_dev = req->bs_dev;
377 	TAILQ_INIT(&lvs->lvols);
378 	TAILQ_INIT(&lvs->pending_lvols);
379 
380 	req->lvol_store = lvs;
381 
382 	spdk_bs_get_super(bs, _spdk_lvs_open_super, req);
383 }
384 
385 static void
386 lvs_bs_opts_init(struct spdk_bs_opts *opts)
387 {
388 	spdk_bs_opts_init(opts);
389 	opts->max_channel_ops = SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS;
390 }
391 
392 void
393 spdk_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
394 {
395 	struct spdk_lvs_with_handle_req *req;
396 	struct spdk_bs_opts opts = {};
397 
398 	assert(cb_fn != NULL);
399 
400 	if (bs_dev == NULL) {
401 		SPDK_ERRLOG("Blobstore device does not exist\n");
402 		cb_fn(cb_arg, NULL, -ENODEV);
403 		return;
404 	}
405 
406 	req = calloc(1, sizeof(*req));
407 	if (req == NULL) {
408 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
409 		cb_fn(cb_arg, NULL, -ENOMEM);
410 		return;
411 	}
412 
413 	req->cb_fn = cb_fn;
414 	req->cb_arg = cb_arg;
415 	req->bs_dev = bs_dev;
416 
417 	lvs_bs_opts_init(&opts);
418 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
419 
420 	spdk_bs_load(bs_dev, &opts, _spdk_lvs_load_cb, req);
421 }
422 
/*
 * Completion for the spdk_bs_destroy() calls issued on lvol store init
 * failures. Intentionally does nothing: the failure has already been
 * reported to the user by the caller.
 */
static void
_spdk_remove_bs_on_error_cb(void *cb_arg, int bserrno)
{
	/* Nothing to do. */
}
427 
428 static void
429 _spdk_super_create_close_cb(void *cb_arg, int lvolerrno)
430 {
431 	struct spdk_lvs_with_handle_req *req = cb_arg;
432 	struct spdk_lvol_store *lvs = req->lvol_store;
433 
434 	if (lvolerrno < 0) {
435 		SPDK_ERRLOG("Lvol store init failed: could not close super blob\n");
436 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
437 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
438 		_spdk_lvs_free(lvs);
439 		free(req);
440 		return;
441 	}
442 
443 	req->cb_fn(req->cb_arg, lvs, lvolerrno);
444 	free(req);
445 }
446 
447 static void
448 _spdk_super_blob_set_cb(void *cb_arg, int lvolerrno)
449 {
450 	struct spdk_lvs_with_handle_req *req = cb_arg;
451 	struct spdk_lvol_store *lvs = req->lvol_store;
452 	struct spdk_blob *blob = lvs->super_blob;
453 
454 	if (lvolerrno < 0) {
455 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
456 		SPDK_ERRLOG("Lvol store init failed: could not set uuid for super blob\n");
457 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
458 		_spdk_lvs_free(lvs);
459 		free(req);
460 		return;
461 	}
462 
463 	spdk_blob_close(blob, _spdk_super_create_close_cb, req);
464 }
465 
466 static void
467 _spdk_super_blob_init_cb(void *cb_arg, int lvolerrno)
468 {
469 	struct spdk_lvs_with_handle_req *req = cb_arg;
470 	struct spdk_lvol_store *lvs = req->lvol_store;
471 	struct spdk_blob *blob = lvs->super_blob;
472 	char uuid[SPDK_UUID_STRING_LEN];
473 
474 	if (lvolerrno < 0) {
475 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
476 		SPDK_ERRLOG("Lvol store init failed: could not set super blob\n");
477 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
478 		_spdk_lvs_free(lvs);
479 		free(req);
480 		return;
481 	}
482 
483 	spdk_uuid_fmt_lower(uuid, sizeof(uuid), &lvs->uuid);
484 
485 	spdk_blob_set_xattr(blob, "uuid", uuid, sizeof(uuid));
486 	spdk_blob_set_xattr(blob, "name", lvs->name, strnlen(lvs->name, SPDK_LVS_NAME_MAX) + 1);
487 	spdk_blob_sync_md(blob, _spdk_super_blob_set_cb, req);
488 }
489 
490 static void
491 _spdk_super_blob_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
492 {
493 	struct spdk_lvs_with_handle_req *req = cb_arg;
494 	struct spdk_lvol_store *lvs = req->lvol_store;
495 
496 	if (lvolerrno < 0) {
497 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
498 		SPDK_ERRLOG("Lvol store init failed: could not open super blob\n");
499 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
500 		_spdk_lvs_free(lvs);
501 		free(req);
502 		return;
503 	}
504 
505 	lvs->super_blob = blob;
506 	lvs->super_blob_id = spdk_blob_get_id(blob);
507 
508 	spdk_bs_set_super(lvs->blobstore, lvs->super_blob_id, _spdk_super_blob_init_cb, req);
509 }
510 
511 static void
512 _spdk_super_blob_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
513 {
514 	struct spdk_lvs_with_handle_req *req = cb_arg;
515 	struct spdk_lvol_store *lvs = req->lvol_store;
516 	struct spdk_blob_store *bs;
517 
518 	if (lvolerrno < 0) {
519 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
520 		SPDK_ERRLOG("Lvol store init failed: could not create super blob\n");
521 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
522 		_spdk_lvs_free(lvs);
523 		free(req);
524 		return;
525 	}
526 
527 	bs = req->lvol_store->blobstore;
528 
529 	spdk_bs_open_blob(bs, blobid, _spdk_super_blob_create_open_cb, req);
530 }
531 
532 static void
533 _spdk_lvs_init_cb(void *cb_arg, struct spdk_blob_store *bs, int lvserrno)
534 {
535 	struct spdk_lvs_with_handle_req *lvs_req = cb_arg;
536 	struct spdk_lvol_store *lvs = lvs_req->lvol_store;
537 
538 	if (lvserrno != 0) {
539 		assert(bs == NULL);
540 		lvs_req->cb_fn(lvs_req->cb_arg, NULL, lvserrno);
541 		SPDK_ERRLOG("Lvol store init failed: could not initialize blobstore\n");
542 		_spdk_lvs_free(lvs);
543 		free(lvs_req);
544 		return;
545 	}
546 
547 	assert(bs != NULL);
548 	lvs->blobstore = bs;
549 	TAILQ_INIT(&lvs->lvols);
550 	TAILQ_INIT(&lvs->pending_lvols);
551 
552 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store initialized\n");
553 
554 	/* create super blob */
555 	spdk_bs_create_blob(lvs->blobstore, _spdk_super_blob_create_cb, lvs_req);
556 }
557 
558 void
559 spdk_lvs_opts_init(struct spdk_lvs_opts *o)
560 {
561 	o->cluster_sz = SPDK_LVS_OPTS_CLUSTER_SZ;
562 	o->clear_method = LVS_CLEAR_WITH_UNMAP;
563 	memset(o->name, 0, sizeof(o->name));
564 }
565 
566 static void
567 _spdk_setup_lvs_opts(struct spdk_bs_opts *bs_opts, struct spdk_lvs_opts *o)
568 {
569 	assert(o != NULL);
570 	lvs_bs_opts_init(bs_opts);
571 	bs_opts->cluster_sz = o->cluster_sz;
572 	bs_opts->clear_method = (enum bs_clear_method)o->clear_method;
573 }
574 
575 int
576 spdk_lvs_init(struct spdk_bs_dev *bs_dev, struct spdk_lvs_opts *o,
577 	      spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
578 {
579 	struct spdk_lvol_store *lvs;
580 	struct spdk_lvs_with_handle_req *lvs_req;
581 	struct spdk_bs_opts opts = {};
582 	int rc;
583 
584 	if (bs_dev == NULL) {
585 		SPDK_ERRLOG("Blobstore device does not exist\n");
586 		return -ENODEV;
587 	}
588 
589 	if (o == NULL) {
590 		SPDK_ERRLOG("spdk_lvs_opts not specified\n");
591 		return -EINVAL;
592 	}
593 
594 	_spdk_setup_lvs_opts(&opts, o);
595 
596 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == SPDK_LVS_NAME_MAX) {
597 		SPDK_ERRLOG("Name has no null terminator.\n");
598 		return -EINVAL;
599 	}
600 
601 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == 0) {
602 		SPDK_ERRLOG("No name specified.\n");
603 		return -EINVAL;
604 	}
605 
606 	lvs = calloc(1, sizeof(*lvs));
607 	if (!lvs) {
608 		SPDK_ERRLOG("Cannot alloc memory for lvol store base pointer\n");
609 		return -ENOMEM;
610 	}
611 
612 	spdk_uuid_generate(&lvs->uuid);
613 	snprintf(lvs->name, sizeof(lvs->name), "%s", o->name);
614 
615 	rc = _spdk_add_lvs_to_list(lvs);
616 	if (rc) {
617 		SPDK_ERRLOG("lvolstore with name %s already exists\n", lvs->name);
618 		_spdk_lvs_free(lvs);
619 		return -EEXIST;
620 	}
621 
622 	lvs_req = calloc(1, sizeof(*lvs_req));
623 	if (!lvs_req) {
624 		_spdk_lvs_free(lvs);
625 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
626 		return -ENOMEM;
627 	}
628 
629 	assert(cb_fn != NULL);
630 	lvs_req->cb_fn = cb_fn;
631 	lvs_req->cb_arg = cb_arg;
632 	lvs_req->lvol_store = lvs;
633 	lvs->bs_dev = bs_dev;
634 	lvs->destruct = false;
635 
636 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
637 
638 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Initializing lvol store\n");
639 	spdk_bs_init(bs_dev, &opts, _spdk_lvs_init_cb, lvs_req);
640 
641 	return 0;
642 }
643 
644 static void
645 _spdk_lvs_rename_cb(void *cb_arg, int lvolerrno)
646 {
647 	struct spdk_lvs_req *req = cb_arg;
648 
649 	if (lvolerrno != 0) {
650 		req->lvserrno = lvolerrno;
651 	}
652 	if (req->lvserrno != 0) {
653 		SPDK_ERRLOG("Lvol store rename operation failed\n");
654 		/* Lvs renaming failed, so we should 'clear' new_name.
655 		 * Otherwise it could cause a failure on the next attepmt to change the name to 'new_name'  */
656 		snprintf(req->lvol_store->new_name,
657 			 sizeof(req->lvol_store->new_name),
658 			 "%s", req->lvol_store->name);
659 	} else {
660 		/* Update lvs name with new_name */
661 		snprintf(req->lvol_store->name,
662 			 sizeof(req->lvol_store->name),
663 			 "%s", req->lvol_store->new_name);
664 	}
665 
666 	req->cb_fn(req->cb_arg, req->lvserrno);
667 	free(req);
668 }
669 
670 static void
671 _spdk_lvs_rename_sync_cb(void *cb_arg, int lvolerrno)
672 {
673 	struct spdk_lvs_req *req = cb_arg;
674 	struct spdk_blob *blob = req->lvol_store->super_blob;
675 
676 	if (lvolerrno < 0) {
677 		req->lvserrno = lvolerrno;
678 	}
679 
680 	spdk_blob_close(blob, _spdk_lvs_rename_cb, req);
681 }
682 
683 static void
684 _spdk_lvs_rename_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
685 {
686 	struct spdk_lvs_req *req = cb_arg;
687 	int rc;
688 
689 	if (lvolerrno < 0) {
690 		_spdk_lvs_rename_cb(cb_arg, lvolerrno);
691 		return;
692 	}
693 
694 	rc = spdk_blob_set_xattr(blob, "name", req->lvol_store->new_name,
695 				 strlen(req->lvol_store->new_name) + 1);
696 	if (rc < 0) {
697 		req->lvserrno = rc;
698 		_spdk_lvs_rename_sync_cb(req, rc);
699 		return;
700 	}
701 
702 	req->lvol_store->super_blob = blob;
703 
704 	spdk_blob_sync_md(blob, _spdk_lvs_rename_sync_cb, req);
705 }
706 
707 void
708 spdk_lvs_rename(struct spdk_lvol_store *lvs, const char *new_name,
709 		spdk_lvs_op_complete cb_fn, void *cb_arg)
710 {
711 	struct spdk_lvs_req *req;
712 	struct spdk_lvol_store *tmp;
713 
714 	/* Check if new name is current lvs name.
715 	 * If so, return success immediately */
716 	if (strncmp(lvs->name, new_name, SPDK_LVS_NAME_MAX) == 0) {
717 		cb_fn(cb_arg, 0);
718 		return;
719 	}
720 
721 	/* Check if new or new_name is already used in other lvs */
722 	pthread_mutex_lock(&g_lvol_stores_mutex);
723 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
724 		if (!strncmp(new_name, tmp->name, SPDK_LVS_NAME_MAX) ||
725 		    !strncmp(new_name, tmp->new_name, SPDK_LVS_NAME_MAX)) {
726 			pthread_mutex_unlock(&g_lvol_stores_mutex);
727 			cb_fn(cb_arg, -EEXIST);
728 			return;
729 		}
730 	}
731 	pthread_mutex_unlock(&g_lvol_stores_mutex);
732 
733 	req = calloc(1, sizeof(*req));
734 	if (!req) {
735 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
736 		cb_fn(cb_arg, -ENOMEM);
737 		return;
738 	}
739 	snprintf(lvs->new_name, sizeof(lvs->new_name), "%s", new_name);
740 	req->lvol_store = lvs;
741 	req->cb_fn = cb_fn;
742 	req->cb_arg = cb_arg;
743 
744 	spdk_bs_open_blob(lvs->blobstore, lvs->super_blob_id, _spdk_lvs_rename_open_cb, req);
745 }
746 
747 static void
748 _lvs_unload_cb(void *cb_arg, int lvserrno)
749 {
750 	struct spdk_lvs_req *lvs_req = cb_arg;
751 
752 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store unloaded\n");
753 	assert(lvs_req->cb_fn != NULL);
754 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
755 	free(lvs_req);
756 }
757 
758 int
759 spdk_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
760 		void *cb_arg)
761 {
762 	struct spdk_lvs_req *lvs_req;
763 	struct spdk_lvol *lvol, *tmp;
764 
765 	if (lvs == NULL) {
766 		SPDK_ERRLOG("Lvol store is NULL\n");
767 		return -ENODEV;
768 	}
769 
770 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
771 		if (lvol->action_in_progress == true) {
772 			SPDK_ERRLOG("Cannot unload lvol store - operations on lvols pending\n");
773 			cb_fn(cb_arg, -EBUSY);
774 			return -EBUSY;
775 		} else if (lvol->ref_count != 0) {
776 			SPDK_ERRLOG("Lvols still open on lvol store\n");
777 			cb_fn(cb_arg, -EBUSY);
778 			return -EBUSY;
779 		}
780 	}
781 
782 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
783 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
784 		_spdk_lvol_free(lvol);
785 	}
786 
787 	lvs_req = calloc(1, sizeof(*lvs_req));
788 	if (!lvs_req) {
789 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
790 		return -ENOMEM;
791 	}
792 
793 	lvs_req->cb_fn = cb_fn;
794 	lvs_req->cb_arg = cb_arg;
795 
796 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Unloading lvol store\n");
797 	spdk_bs_unload(lvs->blobstore, _lvs_unload_cb, lvs_req);
798 	_spdk_lvs_free(lvs);
799 
800 	return 0;
801 }
802 
803 static void
804 _lvs_destroy_cb(void *cb_arg, int lvserrno)
805 {
806 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
807 
808 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store destroyed\n");
809 	assert(lvs_req->cb_fn != NULL);
810 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
811 	free(lvs_req);
812 }
813 
814 static void
815 _lvs_destroy_super_cb(void *cb_arg, int bserrno)
816 {
817 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
818 	struct spdk_lvol_store *lvs = lvs_req->lvs;
819 
820 	assert(lvs != NULL);
821 
822 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Destroying lvol store\n");
823 	spdk_bs_destroy(lvs->blobstore, _lvs_destroy_cb, lvs_req);
824 	_spdk_lvs_free(lvs);
825 }
826 
827 int
828 spdk_lvs_destroy(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
829 		 void *cb_arg)
830 {
831 	struct spdk_lvs_destroy_req *lvs_req;
832 	struct spdk_lvol *iter_lvol, *tmp;
833 
834 	if (lvs == NULL) {
835 		SPDK_ERRLOG("Lvol store is NULL\n");
836 		return -ENODEV;
837 	}
838 
839 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
840 		if (iter_lvol->action_in_progress == true) {
841 			SPDK_ERRLOG("Cannot destroy lvol store - operations on lvols pending\n");
842 			cb_fn(cb_arg, -EBUSY);
843 			return -EBUSY;
844 		} else if (iter_lvol->ref_count != 0) {
845 			SPDK_ERRLOG("Lvols still open on lvol store\n");
846 			cb_fn(cb_arg, -EBUSY);
847 			return -EBUSY;
848 		}
849 	}
850 
851 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
852 		free(iter_lvol);
853 	}
854 
855 	lvs_req = calloc(1, sizeof(*lvs_req));
856 	if (!lvs_req) {
857 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
858 		return -ENOMEM;
859 	}
860 
861 	lvs_req->cb_fn = cb_fn;
862 	lvs_req->cb_arg = cb_arg;
863 	lvs_req->lvs = lvs;
864 
865 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Deleting super blob\n");
866 	spdk_bs_delete_blob(lvs->blobstore, lvs->super_blob_id, _lvs_destroy_super_cb, lvs_req);
867 
868 	return 0;
869 }
870 
871 static void
872 _spdk_lvol_close_blob_cb(void *cb_arg, int lvolerrno)
873 {
874 	struct spdk_lvol_req *req = cb_arg;
875 	struct spdk_lvol *lvol = req->lvol;
876 
877 	if (lvolerrno < 0) {
878 		SPDK_ERRLOG("Could not close blob on lvol\n");
879 		_spdk_lvol_free(lvol);
880 		goto end;
881 	}
882 
883 	lvol->ref_count--;
884 	lvol->action_in_progress = false;
885 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol %s closed\n", lvol->unique_id);
886 
887 end:
888 	req->cb_fn(req->cb_arg, lvolerrno);
889 	free(req);
890 }
891 
892 bool
893 spdk_lvol_deletable(struct spdk_lvol *lvol)
894 {
895 	size_t count;
896 
897 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
898 	return (count == 0);
899 }
900 
901 static void
902 _spdk_lvol_delete_blob_cb(void *cb_arg, int lvolerrno)
903 {
904 	struct spdk_lvol_req *req = cb_arg;
905 	struct spdk_lvol *lvol = req->lvol;
906 
907 	if (lvolerrno < 0) {
908 		SPDK_ERRLOG("Could not remove blob on lvol gracefully - forced removal\n");
909 	} else {
910 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol %s deleted\n", lvol->unique_id);
911 	}
912 
913 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
914 	_spdk_lvol_free(lvol);
915 	req->cb_fn(req->cb_arg, lvolerrno);
916 	free(req);
917 }
918 
919 static void
920 _spdk_lvol_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
921 {
922 	struct spdk_lvol_with_handle_req *req = cb_arg;
923 	struct spdk_lvol *lvol = req->lvol;
924 
925 	TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
926 
927 	if (lvolerrno < 0) {
928 		free(lvol);
929 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
930 		free(req);
931 		return;
932 	}
933 
934 	lvol->blob = blob;
935 	lvol->blob_id = spdk_blob_get_id(blob);
936 
937 	TAILQ_INSERT_TAIL(&lvol->lvol_store->lvols, lvol, link);
938 
939 	snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
940 	lvol->ref_count++;
941 
942 	assert(req->cb_fn != NULL);
943 	req->cb_fn(req->cb_arg, req->lvol, lvolerrno);
944 	free(req);
945 }
946 
947 static void
948 _spdk_lvol_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
949 {
950 	struct spdk_lvol_with_handle_req *req = cb_arg;
951 	struct spdk_blob_store *bs;
952 	struct spdk_blob_open_opts opts;
953 
954 	if (lvolerrno < 0) {
955 		TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
956 		free(req->lvol);
957 		assert(req->cb_fn != NULL);
958 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
959 		free(req);
960 		return;
961 	}
962 
963 	spdk_blob_open_opts_init(&opts);
964 	opts.clear_method = req->lvol->clear_method;
965 	bs = req->lvol->lvol_store->blobstore;
966 
967 	spdk_bs_open_blob_ext(bs, blobid, &opts, _spdk_lvol_create_open_cb, req);
968 }
969 
970 static void
971 lvol_get_xattr_value(void *xattr_ctx, const char *name,
972 		     const void **value, size_t *value_len)
973 {
974 	struct spdk_lvol *lvol = xattr_ctx;
975 
976 	if (!strcmp(LVOL_NAME, name)) {
977 		*value = lvol->name;
978 		*value_len = SPDK_LVOL_NAME_MAX;
979 	} else if (!strcmp("uuid", name)) {
980 		*value = lvol->uuid_str;
981 		*value_len = sizeof(lvol->uuid_str);
982 	}
983 }
984 
985 static int
986 _spdk_lvs_verify_lvol_name(struct spdk_lvol_store *lvs, const char *name)
987 {
988 	struct spdk_lvol *tmp;
989 
990 	if (name == NULL || strnlen(name, SPDK_LVOL_NAME_MAX) == 0) {
991 		SPDK_INFOLOG(SPDK_LOG_LVOL, "lvol name not provided.\n");
992 		return -EINVAL;
993 	}
994 
995 	if (strnlen(name, SPDK_LVOL_NAME_MAX) == SPDK_LVOL_NAME_MAX) {
996 		SPDK_ERRLOG("Name has no null terminator.\n");
997 		return -EINVAL;
998 	}
999 
1000 	TAILQ_FOREACH(tmp, &lvs->lvols, link) {
1001 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1002 			SPDK_ERRLOG("lvol with name %s already exists\n", name);
1003 			return -EEXIST;
1004 		}
1005 	}
1006 
1007 	TAILQ_FOREACH(tmp, &lvs->pending_lvols, link) {
1008 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1009 			SPDK_ERRLOG("lvol with name %s is being already created\n", name);
1010 			return -EEXIST;
1011 		}
1012 	}
1013 
1014 	return 0;
1015 }
1016 
1017 int
1018 spdk_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1019 		 bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1020 		 void *cb_arg)
1021 {
1022 	struct spdk_lvol_with_handle_req *req;
1023 	struct spdk_blob_store *bs;
1024 	struct spdk_lvol *lvol;
1025 	struct spdk_blob_opts opts;
1026 	uint64_t num_clusters;
1027 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1028 	int rc;
1029 
1030 	if (lvs == NULL) {
1031 		SPDK_ERRLOG("lvol store does not exist\n");
1032 		return -EINVAL;
1033 	}
1034 
1035 	rc = _spdk_lvs_verify_lvol_name(lvs, name);
1036 	if (rc < 0) {
1037 		return rc;
1038 	}
1039 
1040 	bs = lvs->blobstore;
1041 
1042 	req = calloc(1, sizeof(*req));
1043 	if (!req) {
1044 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1045 		return -ENOMEM;
1046 	}
1047 	req->cb_fn = cb_fn;
1048 	req->cb_arg = cb_arg;
1049 
1050 	lvol = calloc(1, sizeof(*lvol));
1051 	if (!lvol) {
1052 		free(req);
1053 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1054 		return -ENOMEM;
1055 	}
1056 	lvol->lvol_store = lvs;
1057 	num_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(bs));
1058 	lvol->thin_provision = thin_provision;
1059 	lvol->clear_method = (enum blob_clear_method)clear_method;
1060 	snprintf(lvol->name, sizeof(lvol->name), "%s", name);
1061 	TAILQ_INSERT_TAIL(&lvol->lvol_store->pending_lvols, lvol, link);
1062 	spdk_uuid_generate(&lvol->uuid);
1063 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
1064 	req->lvol = lvol;
1065 
1066 	spdk_blob_opts_init(&opts);
1067 	opts.thin_provision = thin_provision;
1068 	opts.num_clusters = num_clusters;
1069 	opts.clear_method = lvol->clear_method;
1070 	opts.xattrs.count = SPDK_COUNTOF(xattr_names);
1071 	opts.xattrs.names = xattr_names;
1072 	opts.xattrs.ctx = lvol;
1073 	opts.xattrs.get_value = lvol_get_xattr_value;
1074 
1075 	spdk_bs_create_blob_ext(lvs->blobstore, &opts, _spdk_lvol_create_cb, req);
1076 
1077 	return 0;
1078 }
1079 
1080 void
1081 spdk_lvol_create_snapshot(struct spdk_lvol *origlvol, const char *snapshot_name,
1082 			  spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1083 {
1084 	struct spdk_lvol_store *lvs;
1085 	struct spdk_lvol *newlvol;
1086 	struct spdk_blob *origblob;
1087 	struct spdk_lvol_with_handle_req *req;
1088 	struct spdk_blob_xattr_opts snapshot_xattrs;
1089 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1090 	int rc;
1091 
1092 	if (origlvol == NULL) {
1093 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol not provided.\n");
1094 		cb_fn(cb_arg, NULL, -EINVAL);
1095 		return;
1096 	}
1097 
1098 	origblob = origlvol->blob;
1099 	lvs = origlvol->lvol_store;
1100 	if (lvs == NULL) {
1101 		SPDK_ERRLOG("lvol store does not exist\n");
1102 		cb_fn(cb_arg, NULL, -EINVAL);
1103 		return;
1104 	}
1105 
1106 	rc = _spdk_lvs_verify_lvol_name(lvs, snapshot_name);
1107 	if (rc < 0) {
1108 		cb_fn(cb_arg, NULL, rc);
1109 		return;
1110 	}
1111 
1112 	req = calloc(1, sizeof(*req));
1113 	if (!req) {
1114 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1115 		cb_fn(cb_arg, NULL, -ENOMEM);
1116 		return;
1117 	}
1118 
1119 	newlvol = calloc(1, sizeof(*newlvol));
1120 	if (!newlvol) {
1121 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1122 		free(req);
1123 		cb_fn(cb_arg, NULL, -ENOMEM);
1124 		return;
1125 	}
1126 
1127 	newlvol->lvol_store = origlvol->lvol_store;
1128 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", snapshot_name);
1129 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1130 	spdk_uuid_generate(&newlvol->uuid);
1131 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1132 	snapshot_xattrs.count = SPDK_COUNTOF(xattr_names);
1133 	snapshot_xattrs.ctx = newlvol;
1134 	snapshot_xattrs.names = xattr_names;
1135 	snapshot_xattrs.get_value = lvol_get_xattr_value;
1136 	req->lvol = newlvol;
1137 	req->cb_fn = cb_fn;
1138 	req->cb_arg = cb_arg;
1139 
1140 	spdk_bs_create_snapshot(lvs->blobstore, spdk_blob_get_id(origblob), &snapshot_xattrs,
1141 				_spdk_lvol_create_cb, req);
1142 }
1143 
1144 void
1145 spdk_lvol_create_clone(struct spdk_lvol *origlvol, const char *clone_name,
1146 		       spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1147 {
1148 	struct spdk_lvol *newlvol;
1149 	struct spdk_lvol_with_handle_req *req;
1150 	struct spdk_lvol_store *lvs;
1151 	struct spdk_blob *origblob;
1152 	struct spdk_blob_xattr_opts clone_xattrs;
1153 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1154 	int rc;
1155 
1156 	if (origlvol == NULL) {
1157 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol not provided.\n");
1158 		cb_fn(cb_arg, NULL, -EINVAL);
1159 		return;
1160 	}
1161 
1162 	origblob = origlvol->blob;
1163 	lvs = origlvol->lvol_store;
1164 	if (lvs == NULL) {
1165 		SPDK_ERRLOG("lvol store does not exist\n");
1166 		cb_fn(cb_arg, NULL, -EINVAL);
1167 		return;
1168 	}
1169 
1170 	rc = _spdk_lvs_verify_lvol_name(lvs, clone_name);
1171 	if (rc < 0) {
1172 		cb_fn(cb_arg, NULL, rc);
1173 		return;
1174 	}
1175 
1176 	req = calloc(1, sizeof(*req));
1177 	if (!req) {
1178 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1179 		cb_fn(cb_arg, NULL, -ENOMEM);
1180 		return;
1181 	}
1182 
1183 	newlvol = calloc(1, sizeof(*newlvol));
1184 	if (!newlvol) {
1185 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1186 		free(req);
1187 		cb_fn(cb_arg, NULL, -ENOMEM);
1188 		return;
1189 	}
1190 
1191 	newlvol->lvol_store = lvs;
1192 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", clone_name);
1193 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1194 	spdk_uuid_generate(&newlvol->uuid);
1195 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1196 	clone_xattrs.count = SPDK_COUNTOF(xattr_names);
1197 	clone_xattrs.ctx = newlvol;
1198 	clone_xattrs.names = xattr_names;
1199 	clone_xattrs.get_value = lvol_get_xattr_value;
1200 	req->lvol = newlvol;
1201 	req->cb_fn = cb_fn;
1202 	req->cb_arg = cb_arg;
1203 
1204 	spdk_bs_create_clone(lvs->blobstore, spdk_blob_get_id(origblob), &clone_xattrs,
1205 			     _spdk_lvol_create_cb,
1206 			     req);
1207 }
1208 
1209 static void
1210 _spdk_lvol_resize_done(void *cb_arg, int lvolerrno)
1211 {
1212 	struct spdk_lvol_req *req = cb_arg;
1213 
1214 	req->cb_fn(req->cb_arg,  lvolerrno);
1215 	free(req);
1216 }
1217 
1218 static void
1219 _spdk_lvol_blob_resize_cb(void *cb_arg, int bserrno)
1220 {
1221 	struct spdk_lvol_req *req = cb_arg;
1222 	struct spdk_lvol *lvol = req->lvol;
1223 
1224 	if (bserrno != 0) {
1225 		req->cb_fn(req->cb_arg, bserrno);
1226 		free(req);
1227 		return;
1228 	}
1229 
1230 	spdk_blob_sync_md(lvol->blob, _spdk_lvol_resize_done, req);
1231 }
1232 
1233 void
1234 spdk_lvol_resize(struct spdk_lvol *lvol, uint64_t sz,
1235 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1236 {
1237 	struct spdk_blob *blob = lvol->blob;
1238 	struct spdk_lvol_store *lvs = lvol->lvol_store;
1239 	struct spdk_lvol_req *req;
1240 	uint64_t new_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(lvs->blobstore));
1241 
1242 	req = calloc(1, sizeof(*req));
1243 	if (!req) {
1244 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1245 		cb_fn(cb_arg, -ENOMEM);
1246 		return;
1247 	}
1248 	req->cb_fn = cb_fn;
1249 	req->cb_arg = cb_arg;
1250 	req->lvol = lvol;
1251 
1252 	spdk_blob_resize(blob, new_clusters, _spdk_lvol_blob_resize_cb, req);
1253 }
1254 
1255 static void
1256 _spdk_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1257 {
1258 	struct spdk_lvol_req *req = cb_arg;
1259 
1260 	req->cb_fn(req->cb_arg, lvolerrno);
1261 	free(req);
1262 }
1263 
1264 void
1265 spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1266 {
1267 	struct spdk_lvol_req *req;
1268 
1269 	req = calloc(1, sizeof(*req));
1270 	if (!req) {
1271 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1272 		cb_fn(cb_arg, -ENOMEM);
1273 		return;
1274 	}
1275 	req->cb_fn = cb_fn;
1276 	req->cb_arg = cb_arg;
1277 
1278 	spdk_blob_set_read_only(lvol->blob);
1279 	spdk_blob_sync_md(lvol->blob, _spdk_lvol_set_read_only_cb, req);
1280 }
1281 
1282 static void
1283 _spdk_lvol_rename_cb(void *cb_arg, int lvolerrno)
1284 {
1285 	struct spdk_lvol_req *req = cb_arg;
1286 
1287 	if (lvolerrno != 0) {
1288 		SPDK_ERRLOG("Lvol rename operation failed\n");
1289 	} else {
1290 		snprintf(req->lvol->name, sizeof(req->lvol->name), "%s", req->name);
1291 	}
1292 
1293 	req->cb_fn(req->cb_arg, lvolerrno);
1294 	free(req);
1295 }
1296 
1297 void
1298 spdk_lvol_rename(struct spdk_lvol *lvol, const char *new_name,
1299 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1300 {
1301 	struct spdk_lvol *tmp;
1302 	struct spdk_blob *blob = lvol->blob;
1303 	struct spdk_lvol_req *req;
1304 	int rc;
1305 
1306 	/* Check if new name is current lvol name.
1307 	 * If so, return success immediately */
1308 	if (strncmp(lvol->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1309 		cb_fn(cb_arg, 0);
1310 		return;
1311 	}
1312 
1313 	/* Check if lvol with 'new_name' already exists in lvolstore */
1314 	TAILQ_FOREACH(tmp, &lvol->lvol_store->lvols, link) {
1315 		if (strncmp(tmp->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1316 			SPDK_ERRLOG("Lvol %s already exists in lvol store %s\n", new_name, lvol->lvol_store->name);
1317 			cb_fn(cb_arg, -EEXIST);
1318 			return;
1319 		}
1320 	}
1321 
1322 	req = calloc(1, sizeof(*req));
1323 	if (!req) {
1324 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1325 		cb_fn(cb_arg, -ENOMEM);
1326 		return;
1327 	}
1328 	req->cb_fn = cb_fn;
1329 	req->cb_arg = cb_arg;
1330 	req->lvol = lvol;
1331 	snprintf(req->name, sizeof(req->name), "%s", new_name);
1332 
1333 	rc = spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1334 	if (rc < 0) {
1335 		free(req);
1336 		cb_fn(cb_arg, rc);
1337 		return;
1338 	}
1339 
1340 	spdk_blob_sync_md(blob, _spdk_lvol_rename_cb, req);
1341 }
1342 
1343 void
1344 spdk_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1345 {
1346 	struct spdk_lvol_req *req;
1347 	struct spdk_blob_store *bs;
1348 
1349 	assert(cb_fn != NULL);
1350 
1351 	if (lvol == NULL) {
1352 		SPDK_ERRLOG("lvol does not exist\n");
1353 		cb_fn(cb_arg, -ENODEV);
1354 		return;
1355 	}
1356 
1357 	if (lvol->ref_count != 0) {
1358 		SPDK_ERRLOG("Cannot destroy lvol %s because it is still open\n", lvol->unique_id);
1359 		cb_fn(cb_arg, -EBUSY);
1360 		return;
1361 	}
1362 
1363 	lvol->action_in_progress = true;
1364 
1365 	req = calloc(1, sizeof(*req));
1366 	if (!req) {
1367 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1368 		cb_fn(cb_arg, -ENOMEM);
1369 		return;
1370 	}
1371 
1372 	req->cb_fn = cb_fn;
1373 	req->cb_arg = cb_arg;
1374 	req->lvol = lvol;
1375 	bs = lvol->lvol_store->blobstore;
1376 
1377 	spdk_bs_delete_blob(bs, lvol->blob_id, _spdk_lvol_delete_blob_cb, req);
1378 }
1379 
1380 void
1381 spdk_lvol_close(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1382 {
1383 	struct spdk_lvol_req *req;
1384 
1385 	assert(cb_fn != NULL);
1386 
1387 	if (lvol == NULL) {
1388 		SPDK_ERRLOG("lvol does not exist\n");
1389 		cb_fn(cb_arg, -ENODEV);
1390 		return;
1391 	}
1392 
1393 	if (lvol->ref_count > 1) {
1394 		lvol->ref_count--;
1395 		cb_fn(cb_arg, 0);
1396 		return;
1397 	} else if (lvol->ref_count == 0) {
1398 		cb_fn(cb_arg, -EINVAL);
1399 		return;
1400 	}
1401 
1402 	lvol->action_in_progress = true;
1403 
1404 	req = calloc(1, sizeof(*req));
1405 	if (!req) {
1406 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1407 		cb_fn(cb_arg, -ENOMEM);
1408 		return;
1409 	}
1410 
1411 	req->cb_fn = cb_fn;
1412 	req->cb_arg = cb_arg;
1413 	req->lvol = lvol;
1414 
1415 	spdk_blob_close(lvol->blob, _spdk_lvol_close_blob_cb, req);
1416 }
1417 
1418 struct spdk_io_channel *
1419 spdk_lvol_get_io_channel(struct spdk_lvol *lvol)
1420 {
1421 	return spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1422 }
1423 
1424 static void
1425 _spdk_lvol_inflate_cb(void *cb_arg, int lvolerrno)
1426 {
1427 	struct spdk_lvol_req *req = cb_arg;
1428 
1429 	spdk_bs_free_io_channel(req->channel);
1430 
1431 	if (lvolerrno < 0) {
1432 		SPDK_ERRLOG("Could not inflate lvol\n");
1433 	}
1434 
1435 	req->cb_fn(req->cb_arg, lvolerrno);
1436 	free(req);
1437 }
1438 
1439 void
1440 spdk_lvol_inflate(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1441 {
1442 	struct spdk_lvol_req *req;
1443 	spdk_blob_id blob_id;
1444 
1445 	assert(cb_fn != NULL);
1446 
1447 	if (lvol == NULL) {
1448 		SPDK_ERRLOG("Lvol does not exist\n");
1449 		cb_fn(cb_arg, -ENODEV);
1450 		return;
1451 	}
1452 
1453 	req = calloc(1, sizeof(*req));
1454 	if (!req) {
1455 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1456 		cb_fn(cb_arg, -ENOMEM);
1457 		return;
1458 	}
1459 
1460 	req->cb_fn = cb_fn;
1461 	req->cb_arg = cb_arg;
1462 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1463 	if (req->channel == NULL) {
1464 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1465 		free(req);
1466 		cb_fn(cb_arg, -ENOMEM);
1467 		return;
1468 	}
1469 
1470 	blob_id = spdk_blob_get_id(lvol->blob);
1471 	spdk_bs_inflate_blob(lvol->lvol_store->blobstore, req->channel, blob_id, _spdk_lvol_inflate_cb,
1472 			     req);
1473 }
1474 
1475 void
1476 spdk_lvol_decouple_parent(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1477 {
1478 	struct spdk_lvol_req *req;
1479 	spdk_blob_id blob_id;
1480 
1481 	assert(cb_fn != NULL);
1482 
1483 	if (lvol == NULL) {
1484 		SPDK_ERRLOG("Lvol does not exist\n");
1485 		cb_fn(cb_arg, -ENODEV);
1486 		return;
1487 	}
1488 
1489 	req = calloc(1, sizeof(*req));
1490 	if (!req) {
1491 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1492 		cb_fn(cb_arg, -ENOMEM);
1493 		return;
1494 	}
1495 
1496 	req->cb_fn = cb_fn;
1497 	req->cb_arg = cb_arg;
1498 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1499 	if (req->channel == NULL) {
1500 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1501 		free(req);
1502 		cb_fn(cb_arg, -ENOMEM);
1503 		return;
1504 	}
1505 
1506 	blob_id = spdk_blob_get_id(lvol->blob);
1507 	spdk_bs_blob_decouple_parent(lvol->lvol_store->blobstore, req->channel, blob_id,
1508 				     _spdk_lvol_inflate_cb, req);
1509 }
1510