xref: /spdk/lib/lvol/lvol.c (revision 9889ab2dc80e40dae92dcef361d53dcba722043d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk_internal/lvolstore.h"
35 #include "spdk_internal/log.h"
36 #include "spdk/string.h"
37 #include "spdk/thread.h"
38 #include "spdk/blob_bdev.h"
39 #include "spdk/util.h"
40 
41 /* Default blob channel opts for lvol */
42 #define SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS 512
43 
44 #define LVOL_NAME "name"
45 
46 SPDK_LOG_REGISTER_COMPONENT("lvol", SPDK_LOG_LVOL)
47 
48 static TAILQ_HEAD(, spdk_lvol_store) g_lvol_stores = TAILQ_HEAD_INITIALIZER(g_lvol_stores);
49 static pthread_mutex_t g_lvol_stores_mutex = PTHREAD_MUTEX_INITIALIZER;
50 
51 static int
52 _spdk_add_lvs_to_list(struct spdk_lvol_store *lvs)
53 {
54 	struct spdk_lvol_store *tmp;
55 	bool name_conflict = false;
56 
57 	pthread_mutex_lock(&g_lvol_stores_mutex);
58 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
59 		if (!strncmp(lvs->name, tmp->name, SPDK_LVS_NAME_MAX)) {
60 			name_conflict = true;
61 			break;
62 		}
63 	}
64 	if (!name_conflict) {
65 		lvs->on_list = true;
66 		TAILQ_INSERT_TAIL(&g_lvol_stores, lvs, link);
67 	}
68 	pthread_mutex_unlock(&g_lvol_stores_mutex);
69 
70 	return name_conflict ? -1 : 0;
71 }
72 
73 static void
74 _spdk_lvs_free(struct spdk_lvol_store *lvs)
75 {
76 	pthread_mutex_lock(&g_lvol_stores_mutex);
77 	if (lvs->on_list) {
78 		TAILQ_REMOVE(&g_lvol_stores, lvs, link);
79 	}
80 	pthread_mutex_unlock(&g_lvol_stores_mutex);
81 
82 	free(lvs);
83 }
84 
/*
 * Release the memory of an lvol structure.
 *
 * This only frees the structure; it does not unlink the lvol from any list.
 */
static void
_spdk_lvol_free(struct spdk_lvol *lvol)
{
	free(lvol);
}
90 
91 static void
92 _spdk_lvol_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
93 {
94 	struct spdk_lvol_with_handle_req *req = cb_arg;
95 	struct spdk_lvol *lvol = req->lvol;
96 
97 	if (lvolerrno != 0) {
98 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Failed to open lvol %s\n", lvol->unique_id);
99 		goto end;
100 	}
101 
102 	lvol->ref_count++;
103 	lvol->blob = blob;
104 end:
105 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
106 	free(req);
107 }
108 
109 void
110 spdk_lvol_open(struct spdk_lvol *lvol, spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
111 {
112 	struct spdk_lvol_with_handle_req *req;
113 	struct spdk_blob_open_opts opts;
114 
115 	assert(cb_fn != NULL);
116 
117 	if (lvol == NULL) {
118 		SPDK_ERRLOG("lvol does not exist\n");
119 		cb_fn(cb_arg, NULL, -ENODEV);
120 		return;
121 	}
122 
123 	if (lvol->action_in_progress == true) {
124 		SPDK_ERRLOG("Cannot open lvol - operations on lvol pending\n");
125 		cb_fn(cb_arg, lvol, -EBUSY);
126 		return;
127 	}
128 
129 	if (lvol->ref_count > 0) {
130 		lvol->ref_count++;
131 		cb_fn(cb_arg, lvol, 0);
132 		return;
133 	}
134 
135 	req = calloc(1, sizeof(*req));
136 	if (req == NULL) {
137 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
138 		cb_fn(cb_arg, NULL, -ENOMEM);
139 		return;
140 	}
141 
142 	req->cb_fn = cb_fn;
143 	req->cb_arg = cb_arg;
144 	req->lvol = lvol;
145 
146 	spdk_blob_open_opts_init(&opts);
147 	opts.clear_method = lvol->clear_method;
148 
149 	spdk_bs_open_blob_ext(lvol->lvol_store->blobstore, lvol->blob_id, &opts, _spdk_lvol_open_cb, req);
150 }
151 
152 static void
153 _spdk_bs_unload_with_error_cb(void *cb_arg, int lvolerrno)
154 {
155 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
156 
157 	req->cb_fn(req->cb_arg, NULL, req->lvserrno);
158 	free(req);
159 }
160 
161 static void
162 _spdk_load_next_lvol(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
163 {
164 	struct spdk_lvs_with_handle_req *req = cb_arg;
165 	struct spdk_lvol_store *lvs = req->lvol_store;
166 	struct spdk_blob_store *bs = lvs->blobstore;
167 	struct spdk_lvol *lvol, *tmp;
168 	spdk_blob_id blob_id;
169 	const char *attr;
170 	const enum blob_clear_method *clear_method;
171 	size_t value_len;
172 	int rc;
173 
174 	if (lvolerrno == -ENOENT) {
175 		/* Finished iterating */
176 		req->cb_fn(req->cb_arg, lvs, 0);
177 		free(req);
178 		return;
179 	} else if (lvolerrno < 0) {
180 		SPDK_ERRLOG("Failed to fetch blobs list\n");
181 		req->lvserrno = lvolerrno;
182 		goto invalid;
183 	}
184 
185 	blob_id = spdk_blob_get_id(blob);
186 
187 	if (blob_id == lvs->super_blob_id) {
188 		SPDK_INFOLOG(SPDK_LOG_LVOL, "found superblob %"PRIu64"\n", (uint64_t)blob_id);
189 		spdk_bs_iter_next(bs, blob, _spdk_load_next_lvol, req);
190 		return;
191 	}
192 
193 	lvol = calloc(1, sizeof(*lvol));
194 	if (!lvol) {
195 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
196 		req->lvserrno = -ENOMEM;
197 		goto invalid;
198 	}
199 
200 	lvol->blob = blob;
201 	lvol->blob_id = blob_id;
202 	lvol->lvol_store = lvs;
203 
204 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
205 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0' ||
206 	    spdk_uuid_parse(&lvol->uuid, attr) != 0) {
207 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Missing or corrupt lvol uuid\n");
208 		memset(&lvol->uuid, 0, sizeof(lvol->uuid));
209 	}
210 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
211 
212 	if (!spdk_mem_all_zero(&lvol->uuid, sizeof(lvol->uuid))) {
213 		snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
214 	} else {
215 		spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->unique_id), &lvol->lvol_store->uuid);
216 		value_len = strlen(lvol->unique_id);
217 		snprintf(lvol->unique_id + value_len, sizeof(lvol->unique_id) - value_len, "_%"PRIu64,
218 			 (uint64_t)blob_id);
219 	}
220 
221 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
222 	if (rc != 0 || value_len > SPDK_LVOL_NAME_MAX) {
223 		SPDK_ERRLOG("Cannot assign lvol name\n");
224 		_spdk_lvol_free(lvol);
225 		req->lvserrno = -EINVAL;
226 		goto invalid;
227 	}
228 
229 	rc = spdk_blob_get_xattr_value(blob, "clear_method", (const void **)&clear_method, &value_len);
230 	if (rc != 0) {
231 		lvol->clear_method = BLOB_CLEAR_WITH_DEFAULT;
232 	} else {
233 		lvol->clear_method = *clear_method;
234 	}
235 
236 	snprintf(lvol->name, sizeof(lvol->name), "%s", attr);
237 
238 	TAILQ_INSERT_TAIL(&lvs->lvols, lvol, link);
239 
240 	lvs->lvol_count++;
241 
242 	SPDK_INFOLOG(SPDK_LOG_LVOL, "added lvol %s (%s)\n", lvol->unique_id, lvol->uuid_str);
243 
244 	spdk_bs_iter_next(bs, blob, _spdk_load_next_lvol, req);
245 
246 	return;
247 
248 invalid:
249 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
250 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
251 		free(lvol);
252 	}
253 
254 	_spdk_lvs_free(lvs);
255 	spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
256 }
257 
258 static void
259 _spdk_close_super_cb(void *cb_arg, int lvolerrno)
260 {
261 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
262 	struct spdk_lvol_store *lvs = req->lvol_store;
263 	struct spdk_blob_store *bs = lvs->blobstore;
264 
265 	if (lvolerrno != 0) {
266 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Could not close super blob\n");
267 		_spdk_lvs_free(lvs);
268 		req->lvserrno = -ENODEV;
269 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
270 		return;
271 	}
272 
273 	/* Start loading lvols */
274 	spdk_bs_iter_first(lvs->blobstore, _spdk_load_next_lvol, req);
275 }
276 
277 static void
278 _spdk_close_super_blob_with_error_cb(void *cb_arg, int lvolerrno)
279 {
280 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
281 	struct spdk_lvol_store *lvs = req->lvol_store;
282 	struct spdk_blob_store *bs = lvs->blobstore;
283 
284 	_spdk_lvs_free(lvs);
285 
286 	spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
287 }
288 
289 static void
290 _spdk_lvs_read_uuid(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
291 {
292 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
293 	struct spdk_lvol_store *lvs = req->lvol_store;
294 	struct spdk_blob_store *bs = lvs->blobstore;
295 	const char *attr;
296 	size_t value_len;
297 	int rc;
298 
299 	if (lvolerrno != 0) {
300 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Could not open super blob\n");
301 		_spdk_lvs_free(lvs);
302 		req->lvserrno = -ENODEV;
303 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
304 		return;
305 	}
306 
307 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
308 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0') {
309 		SPDK_INFOLOG(SPDK_LOG_LVOL, "missing or incorrect UUID\n");
310 		req->lvserrno = -EINVAL;
311 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
312 		return;
313 	}
314 
315 	if (spdk_uuid_parse(&lvs->uuid, attr)) {
316 		SPDK_INFOLOG(SPDK_LOG_LVOL, "incorrect UUID '%s'\n", attr);
317 		req->lvserrno = -EINVAL;
318 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
319 		return;
320 	}
321 
322 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
323 	if (rc != 0 || value_len > SPDK_LVS_NAME_MAX) {
324 		SPDK_INFOLOG(SPDK_LOG_LVOL, "missing or invalid name\n");
325 		req->lvserrno = -EINVAL;
326 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
327 		return;
328 	}
329 
330 	snprintf(lvs->name, sizeof(lvs->name), "%s", attr);
331 
332 	rc = _spdk_add_lvs_to_list(lvs);
333 	if (rc) {
334 		SPDK_INFOLOG(SPDK_LOG_LVOL, "lvolstore with name %s already exists\n", lvs->name);
335 		req->lvserrno = -EEXIST;
336 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
337 		return;
338 	}
339 
340 	lvs->super_blob_id = spdk_blob_get_id(blob);
341 
342 	spdk_blob_close(blob, _spdk_close_super_cb, req);
343 }
344 
345 static void
346 _spdk_lvs_open_super(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
347 {
348 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
349 	struct spdk_lvol_store *lvs = req->lvol_store;
350 	struct spdk_blob_store *bs = lvs->blobstore;
351 
352 	if (lvolerrno != 0) {
353 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Super blob not found\n");
354 		_spdk_lvs_free(lvs);
355 		req->lvserrno = -ENODEV;
356 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
357 		return;
358 	}
359 
360 	spdk_bs_open_blob(bs, blobid, _spdk_lvs_read_uuid, req);
361 }
362 
363 static void
364 _spdk_lvs_load_cb(void *cb_arg, struct spdk_blob_store *bs, int lvolerrno)
365 {
366 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
367 	struct spdk_lvol_store *lvs;
368 
369 	if (lvolerrno != 0) {
370 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
371 		free(req);
372 		return;
373 	}
374 
375 	lvs = calloc(1, sizeof(*lvs));
376 	if (lvs == NULL) {
377 		SPDK_ERRLOG("Cannot alloc memory for lvol store\n");
378 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
379 		return;
380 	}
381 
382 	lvs->blobstore = bs;
383 	lvs->bs_dev = req->bs_dev;
384 	TAILQ_INIT(&lvs->lvols);
385 	TAILQ_INIT(&lvs->pending_lvols);
386 
387 	req->lvol_store = lvs;
388 
389 	spdk_bs_get_super(bs, _spdk_lvs_open_super, req);
390 }
391 
392 static void
393 spdk_lvs_bs_opts_init(struct spdk_bs_opts *opts)
394 {
395 	spdk_bs_opts_init(opts);
396 	opts->max_channel_ops = SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS;
397 }
398 
399 void
400 spdk_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
401 {
402 	struct spdk_lvs_with_handle_req *req;
403 	struct spdk_bs_opts opts = {};
404 
405 	assert(cb_fn != NULL);
406 
407 	if (bs_dev == NULL) {
408 		SPDK_ERRLOG("Blobstore device does not exist\n");
409 		cb_fn(cb_arg, NULL, -ENODEV);
410 		return;
411 	}
412 
413 	req = calloc(1, sizeof(*req));
414 	if (req == NULL) {
415 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
416 		cb_fn(cb_arg, NULL, -ENOMEM);
417 		return;
418 	}
419 
420 	req->cb_fn = cb_fn;
421 	req->cb_arg = cb_arg;
422 	req->bs_dev = bs_dev;
423 
424 	spdk_lvs_bs_opts_init(&opts);
425 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
426 
427 	spdk_bs_load(bs_dev, &opts, _spdk_lvs_load_cb, req);
428 }
429 
/*
 * Completion callback passed to spdk_bs_destroy() on the lvol store init
 * error paths. Intentionally a no-op: both arguments are ignored.
 */
static void
_spdk_remove_bs_on_error_cb(void *cb_arg, int bserrno)
{
	/* Nothing to do. */
}
434 
435 static void
436 _spdk_super_create_close_cb(void *cb_arg, int lvolerrno)
437 {
438 	struct spdk_lvs_with_handle_req *req = cb_arg;
439 	struct spdk_lvol_store *lvs = req->lvol_store;
440 
441 	if (lvolerrno < 0) {
442 		SPDK_ERRLOG("Lvol store init failed: could not close super blob\n");
443 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
444 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
445 		_spdk_lvs_free(lvs);
446 		free(req);
447 		return;
448 	}
449 
450 	req->cb_fn(req->cb_arg, lvs, lvolerrno);
451 	free(req);
452 }
453 
454 static void
455 _spdk_super_blob_set_cb(void *cb_arg, int lvolerrno)
456 {
457 	struct spdk_lvs_with_handle_req *req = cb_arg;
458 	struct spdk_lvol_store *lvs = req->lvol_store;
459 	struct spdk_blob *blob = lvs->super_blob;
460 
461 	if (lvolerrno < 0) {
462 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
463 		SPDK_ERRLOG("Lvol store init failed: could not set uuid for super blob\n");
464 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
465 		_spdk_lvs_free(lvs);
466 		free(req);
467 		return;
468 	}
469 
470 	spdk_blob_close(blob, _spdk_super_create_close_cb, req);
471 }
472 
473 static void
474 _spdk_super_blob_init_cb(void *cb_arg, int lvolerrno)
475 {
476 	struct spdk_lvs_with_handle_req *req = cb_arg;
477 	struct spdk_lvol_store *lvs = req->lvol_store;
478 	struct spdk_blob *blob = lvs->super_blob;
479 	char uuid[SPDK_UUID_STRING_LEN];
480 
481 	if (lvolerrno < 0) {
482 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
483 		SPDK_ERRLOG("Lvol store init failed: could not set super blob\n");
484 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
485 		_spdk_lvs_free(lvs);
486 		free(req);
487 		return;
488 	}
489 
490 	spdk_uuid_fmt_lower(uuid, sizeof(uuid), &lvs->uuid);
491 
492 	spdk_blob_set_xattr(blob, "uuid", uuid, sizeof(uuid));
493 	spdk_blob_set_xattr(blob, "name", lvs->name, strnlen(lvs->name, SPDK_LVS_NAME_MAX) + 1);
494 	spdk_blob_sync_md(blob, _spdk_super_blob_set_cb, req);
495 }
496 
497 static void
498 _spdk_super_blob_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
499 {
500 	struct spdk_lvs_with_handle_req *req = cb_arg;
501 	struct spdk_lvol_store *lvs = req->lvol_store;
502 
503 	if (lvolerrno < 0) {
504 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
505 		SPDK_ERRLOG("Lvol store init failed: could not open super blob\n");
506 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
507 		_spdk_lvs_free(lvs);
508 		free(req);
509 		return;
510 	}
511 
512 	lvs->super_blob = blob;
513 	lvs->super_blob_id = spdk_blob_get_id(blob);
514 
515 	spdk_bs_set_super(lvs->blobstore, lvs->super_blob_id, _spdk_super_blob_init_cb, req);
516 }
517 
518 static void
519 _spdk_super_blob_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
520 {
521 	struct spdk_lvs_with_handle_req *req = cb_arg;
522 	struct spdk_lvol_store *lvs = req->lvol_store;
523 	struct spdk_blob_store *bs;
524 
525 	if (lvolerrno < 0) {
526 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
527 		SPDK_ERRLOG("Lvol store init failed: could not create super blob\n");
528 		spdk_bs_destroy(lvs->blobstore, _spdk_remove_bs_on_error_cb, NULL);
529 		_spdk_lvs_free(lvs);
530 		free(req);
531 		return;
532 	}
533 
534 	bs = req->lvol_store->blobstore;
535 
536 	spdk_bs_open_blob(bs, blobid, _spdk_super_blob_create_open_cb, req);
537 }
538 
539 static void
540 _spdk_lvs_init_cb(void *cb_arg, struct spdk_blob_store *bs, int lvserrno)
541 {
542 	struct spdk_lvs_with_handle_req *lvs_req = cb_arg;
543 	struct spdk_lvol_store *lvs = lvs_req->lvol_store;
544 
545 	if (lvserrno != 0) {
546 		assert(bs == NULL);
547 		lvs_req->cb_fn(lvs_req->cb_arg, NULL, lvserrno);
548 		SPDK_ERRLOG("Lvol store init failed: could not initialize blobstore\n");
549 		_spdk_lvs_free(lvs);
550 		free(lvs_req);
551 		return;
552 	}
553 
554 	assert(bs != NULL);
555 	lvs->blobstore = bs;
556 	TAILQ_INIT(&lvs->lvols);
557 	TAILQ_INIT(&lvs->pending_lvols);
558 
559 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store initialized\n");
560 
561 	/* create super blob */
562 	spdk_bs_create_blob(lvs->blobstore, _spdk_super_blob_create_cb, lvs_req);
563 }
564 
565 void
566 spdk_lvs_opts_init(struct spdk_lvs_opts *o)
567 {
568 	o->cluster_sz = SPDK_LVS_OPTS_CLUSTER_SZ;
569 	o->clear_method = LVS_CLEAR_WITH_UNMAP;
570 	memset(o->name, 0, sizeof(o->name));
571 }
572 
573 static void
574 _spdk_setup_lvs_opts(struct spdk_bs_opts *bs_opts, struct spdk_lvs_opts *o)
575 {
576 	assert(o != NULL);
577 	spdk_lvs_bs_opts_init(bs_opts);
578 	bs_opts->cluster_sz = o->cluster_sz;
579 	bs_opts->clear_method = (enum bs_clear_method)o->clear_method;
580 }
581 
582 int
583 spdk_lvs_init(struct spdk_bs_dev *bs_dev, struct spdk_lvs_opts *o,
584 	      spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
585 {
586 	struct spdk_lvol_store *lvs;
587 	struct spdk_lvs_with_handle_req *lvs_req;
588 	struct spdk_bs_opts opts = {};
589 	int rc;
590 
591 	if (bs_dev == NULL) {
592 		SPDK_ERRLOG("Blobstore device does not exist\n");
593 		return -ENODEV;
594 	}
595 
596 	if (o == NULL) {
597 		SPDK_ERRLOG("spdk_lvs_opts not specified\n");
598 		return -EINVAL;
599 	}
600 
601 	_spdk_setup_lvs_opts(&opts, o);
602 
603 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == SPDK_LVS_NAME_MAX) {
604 		SPDK_ERRLOG("Name has no null terminator.\n");
605 		return -EINVAL;
606 	}
607 
608 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == 0) {
609 		SPDK_ERRLOG("No name specified.\n");
610 		return -EINVAL;
611 	}
612 
613 	lvs = calloc(1, sizeof(*lvs));
614 	if (!lvs) {
615 		SPDK_ERRLOG("Cannot alloc memory for lvol store base pointer\n");
616 		return -ENOMEM;
617 	}
618 
619 	spdk_uuid_generate(&lvs->uuid);
620 	snprintf(lvs->name, sizeof(lvs->name), "%s", o->name);
621 
622 	rc = _spdk_add_lvs_to_list(lvs);
623 	if (rc) {
624 		SPDK_ERRLOG("lvolstore with name %s already exists\n", lvs->name);
625 		_spdk_lvs_free(lvs);
626 		return -EEXIST;
627 	}
628 
629 	lvs_req = calloc(1, sizeof(*lvs_req));
630 	if (!lvs_req) {
631 		_spdk_lvs_free(lvs);
632 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
633 		return -ENOMEM;
634 	}
635 
636 	assert(cb_fn != NULL);
637 	lvs_req->cb_fn = cb_fn;
638 	lvs_req->cb_arg = cb_arg;
639 	lvs_req->lvol_store = lvs;
640 	lvs->bs_dev = bs_dev;
641 	lvs->destruct = false;
642 
643 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
644 
645 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Initializing lvol store\n");
646 	spdk_bs_init(bs_dev, &opts, _spdk_lvs_init_cb, lvs_req);
647 
648 	return 0;
649 }
650 
651 static void
652 _spdk_lvs_rename_cb(void *cb_arg, int lvolerrno)
653 {
654 	struct spdk_lvs_req *req = cb_arg;
655 
656 	if (lvolerrno != 0) {
657 		req->lvserrno = lvolerrno;
658 	}
659 	if (req->lvserrno != 0) {
660 		SPDK_ERRLOG("Lvol store rename operation failed\n");
661 		/* Lvs renaming failed, so we should 'clear' new_name.
662 		 * Otherwise it could cause a failure on the next attepmt to change the name to 'new_name'  */
663 		snprintf(req->lvol_store->new_name,
664 			 sizeof(req->lvol_store->new_name),
665 			 "%s", req->lvol_store->name);
666 	} else {
667 		/* Update lvs name with new_name */
668 		snprintf(req->lvol_store->name,
669 			 sizeof(req->lvol_store->name),
670 			 "%s", req->lvol_store->new_name);
671 	}
672 
673 	req->cb_fn(req->cb_arg, req->lvserrno);
674 	free(req);
675 }
676 
677 static void
678 _spdk_lvs_rename_sync_cb(void *cb_arg, int lvolerrno)
679 {
680 	struct spdk_lvs_req *req = cb_arg;
681 	struct spdk_blob *blob = req->lvol_store->super_blob;
682 
683 	if (lvolerrno < 0) {
684 		req->lvserrno = lvolerrno;
685 	}
686 
687 	spdk_blob_close(blob, _spdk_lvs_rename_cb, req);
688 }
689 
690 static void
691 _spdk_lvs_rename_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
692 {
693 	struct spdk_lvs_req *req = cb_arg;
694 	int rc;
695 
696 	if (lvolerrno < 0) {
697 		_spdk_lvs_rename_cb(cb_arg, lvolerrno);
698 		return;
699 	}
700 
701 	rc = spdk_blob_set_xattr(blob, "name", req->lvol_store->new_name,
702 				 strlen(req->lvol_store->new_name) + 1);
703 	if (rc < 0) {
704 		req->lvserrno = rc;
705 		_spdk_lvs_rename_sync_cb(req, rc);
706 		return;
707 	}
708 
709 	req->lvol_store->super_blob = blob;
710 
711 	spdk_blob_sync_md(blob, _spdk_lvs_rename_sync_cb, req);
712 }
713 
714 void
715 spdk_lvs_rename(struct spdk_lvol_store *lvs, const char *new_name,
716 		spdk_lvs_op_complete cb_fn, void *cb_arg)
717 {
718 	struct spdk_lvs_req *req;
719 	struct spdk_lvol_store *tmp;
720 
721 	/* Check if new name is current lvs name.
722 	 * If so, return success immediately */
723 	if (strncmp(lvs->name, new_name, SPDK_LVS_NAME_MAX) == 0) {
724 		cb_fn(cb_arg, 0);
725 		return;
726 	}
727 
728 	/* Check if new or new_name is already used in other lvs */
729 	pthread_mutex_lock(&g_lvol_stores_mutex);
730 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
731 		if (!strncmp(new_name, tmp->name, SPDK_LVS_NAME_MAX) ||
732 		    !strncmp(new_name, tmp->new_name, SPDK_LVS_NAME_MAX)) {
733 			pthread_mutex_unlock(&g_lvol_stores_mutex);
734 			cb_fn(cb_arg, -EEXIST);
735 			return;
736 		}
737 	}
738 	pthread_mutex_unlock(&g_lvol_stores_mutex);
739 
740 	req = calloc(1, sizeof(*req));
741 	if (!req) {
742 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
743 		cb_fn(cb_arg, -ENOMEM);
744 		return;
745 	}
746 	snprintf(lvs->new_name, sizeof(lvs->new_name), "%s", new_name);
747 	req->lvol_store = lvs;
748 	req->cb_fn = cb_fn;
749 	req->cb_arg = cb_arg;
750 
751 	spdk_bs_open_blob(lvs->blobstore, lvs->super_blob_id, _spdk_lvs_rename_open_cb, req);
752 }
753 
754 static void
755 _lvs_unload_cb(void *cb_arg, int lvserrno)
756 {
757 	struct spdk_lvs_req *lvs_req = cb_arg;
758 
759 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store unloaded\n");
760 	assert(lvs_req->cb_fn != NULL);
761 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
762 	free(lvs_req);
763 }
764 
765 int
766 spdk_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
767 		void *cb_arg)
768 {
769 	struct spdk_lvs_req *lvs_req;
770 	struct spdk_lvol *lvol, *tmp;
771 
772 	if (lvs == NULL) {
773 		SPDK_ERRLOG("Lvol store is NULL\n");
774 		return -ENODEV;
775 	}
776 
777 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
778 		if (lvol->action_in_progress == true) {
779 			SPDK_ERRLOG("Cannot unload lvol store - operations on lvols pending\n");
780 			cb_fn(cb_arg, -EBUSY);
781 			return -EBUSY;
782 		} else if (lvol->ref_count != 0) {
783 			SPDK_ERRLOG("Lvols still open on lvol store\n");
784 			cb_fn(cb_arg, -EBUSY);
785 			return -EBUSY;
786 		}
787 	}
788 
789 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
790 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
791 		_spdk_lvol_free(lvol);
792 	}
793 
794 	lvs_req = calloc(1, sizeof(*lvs_req));
795 	if (!lvs_req) {
796 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
797 		return -ENOMEM;
798 	}
799 
800 	lvs_req->cb_fn = cb_fn;
801 	lvs_req->cb_arg = cb_arg;
802 
803 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Unloading lvol store\n");
804 	spdk_bs_unload(lvs->blobstore, _lvs_unload_cb, lvs_req);
805 	_spdk_lvs_free(lvs);
806 
807 	return 0;
808 }
809 
810 static void
811 _lvs_destroy_cb(void *cb_arg, int lvserrno)
812 {
813 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
814 
815 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store destroyed\n");
816 	assert(lvs_req->cb_fn != NULL);
817 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
818 	free(lvs_req);
819 }
820 
821 static void
822 _lvs_destroy_super_cb(void *cb_arg, int bserrno)
823 {
824 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
825 	struct spdk_lvol_store *lvs = lvs_req->lvs;
826 
827 	assert(lvs != NULL);
828 
829 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Destroying lvol store\n");
830 	spdk_bs_destroy(lvs->blobstore, _lvs_destroy_cb, lvs_req);
831 	_spdk_lvs_free(lvs);
832 }
833 
834 int
835 spdk_lvs_destroy(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
836 		 void *cb_arg)
837 {
838 	struct spdk_lvs_destroy_req *lvs_req;
839 	struct spdk_lvol *iter_lvol, *tmp;
840 
841 	if (lvs == NULL) {
842 		SPDK_ERRLOG("Lvol store is NULL\n");
843 		return -ENODEV;
844 	}
845 
846 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
847 		if (iter_lvol->action_in_progress == true) {
848 			SPDK_ERRLOG("Cannot destroy lvol store - operations on lvols pending\n");
849 			cb_fn(cb_arg, -EBUSY);
850 			return -EBUSY;
851 		} else if (iter_lvol->ref_count != 0) {
852 			SPDK_ERRLOG("Lvols still open on lvol store\n");
853 			cb_fn(cb_arg, -EBUSY);
854 			return -EBUSY;
855 		}
856 	}
857 
858 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
859 		free(iter_lvol);
860 	}
861 
862 	lvs_req = calloc(1, sizeof(*lvs_req));
863 	if (!lvs_req) {
864 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
865 		return -ENOMEM;
866 	}
867 
868 	lvs_req->cb_fn = cb_fn;
869 	lvs_req->cb_arg = cb_arg;
870 	lvs_req->lvs = lvs;
871 
872 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Deleting super blob\n");
873 	spdk_bs_delete_blob(lvs->blobstore, lvs->super_blob_id, _lvs_destroy_super_cb, lvs_req);
874 
875 	return 0;
876 }
877 
878 static void
879 _spdk_lvol_close_blob_cb(void *cb_arg, int lvolerrno)
880 {
881 	struct spdk_lvol_req *req = cb_arg;
882 	struct spdk_lvol *lvol = req->lvol;
883 
884 	if (lvolerrno < 0) {
885 		SPDK_ERRLOG("Could not close blob on lvol\n");
886 		_spdk_lvol_free(lvol);
887 		goto end;
888 	}
889 
890 	lvol->ref_count--;
891 	lvol->action_in_progress = false;
892 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol %s closed\n", lvol->unique_id);
893 
894 end:
895 	req->cb_fn(req->cb_arg, lvolerrno);
896 	free(req);
897 }
898 
899 bool
900 spdk_lvol_deletable(struct spdk_lvol *lvol)
901 {
902 	size_t count;
903 
904 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
905 	return (count == 0);
906 }
907 
908 static void
909 _spdk_lvol_delete_blob_cb(void *cb_arg, int lvolerrno)
910 {
911 	struct spdk_lvol_req *req = cb_arg;
912 	struct spdk_lvol *lvol = req->lvol;
913 
914 	if (lvolerrno < 0) {
915 		SPDK_ERRLOG("Could not remove blob on lvol gracefully - forced removal\n");
916 	} else {
917 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol %s deleted\n", lvol->unique_id);
918 	}
919 
920 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
921 	_spdk_lvol_free(lvol);
922 	req->cb_fn(req->cb_arg, lvolerrno);
923 	free(req);
924 }
925 
926 static void
927 _spdk_lvol_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
928 {
929 	struct spdk_lvol_with_handle_req *req = cb_arg;
930 	spdk_blob_id blob_id = spdk_blob_get_id(blob);
931 	struct spdk_lvol *lvol = req->lvol;
932 
933 	TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
934 
935 	if (lvolerrno < 0) {
936 		free(lvol);
937 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
938 		free(req);
939 		return;
940 	}
941 
942 	lvol->blob = blob;
943 	lvol->blob_id = blob_id;
944 
945 	TAILQ_INSERT_TAIL(&lvol->lvol_store->lvols, lvol, link);
946 
947 	snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
948 	lvol->ref_count++;
949 
950 	assert(req->cb_fn != NULL);
951 	req->cb_fn(req->cb_arg, req->lvol, lvolerrno);
952 	free(req);
953 }
954 
955 static void
956 _spdk_lvol_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
957 {
958 	struct spdk_lvol_with_handle_req *req = cb_arg;
959 	struct spdk_blob_store *bs;
960 	struct spdk_blob_open_opts opts;
961 
962 	if (lvolerrno < 0) {
963 		TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
964 		free(req->lvol);
965 		assert(req->cb_fn != NULL);
966 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
967 		free(req);
968 		return;
969 	}
970 
971 	spdk_blob_open_opts_init(&opts);
972 	opts.clear_method = req->lvol->clear_method;
973 	bs = req->lvol->lvol_store->blobstore;
974 
975 	spdk_bs_open_blob_ext(bs, blobid, &opts, _spdk_lvol_create_open_cb, req);
976 }
977 
978 static void
979 spdk_lvol_get_xattr_value(void *xattr_ctx, const char *name,
980 			  const void **value, size_t *value_len)
981 {
982 	struct spdk_lvol *lvol = xattr_ctx;
983 
984 	if (!strcmp(LVOL_NAME, name)) {
985 		*value = lvol->name;
986 		*value_len = SPDK_LVOL_NAME_MAX;
987 	} else if (!strcmp("uuid", name)) {
988 		*value = lvol->uuid_str;
989 		*value_len = sizeof(lvol->uuid_str);
990 	} else if (!strcmp("clear_method", name)) {
991 		*value = &lvol->clear_method;
992 		*value_len = sizeof(int);
993 	}
994 }
995 
996 static int
997 _spdk_lvs_verify_lvol_name(struct spdk_lvol_store *lvs, const char *name)
998 {
999 	struct spdk_lvol *tmp;
1000 
1001 	if (name == NULL || strnlen(name, SPDK_LVOL_NAME_MAX) == 0) {
1002 		SPDK_INFOLOG(SPDK_LOG_LVOL, "lvol name not provided.\n");
1003 		return -EINVAL;
1004 	}
1005 
1006 	if (strnlen(name, SPDK_LVOL_NAME_MAX) == SPDK_LVOL_NAME_MAX) {
1007 		SPDK_ERRLOG("Name has no null terminator.\n");
1008 		return -EINVAL;
1009 	}
1010 
1011 	TAILQ_FOREACH(tmp, &lvs->lvols, link) {
1012 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1013 			SPDK_ERRLOG("lvol with name %s already exists\n", name);
1014 			return -EEXIST;
1015 		}
1016 	}
1017 
1018 	TAILQ_FOREACH(tmp, &lvs->pending_lvols, link) {
1019 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1020 			SPDK_ERRLOG("lvol with name %s is being already created\n", name);
1021 			return -EEXIST;
1022 		}
1023 	}
1024 
1025 	return 0;
1026 }
1027 
1028 int
1029 spdk_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1030 		 bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1031 		 void *cb_arg)
1032 {
1033 	struct spdk_lvol_with_handle_req *req;
1034 	struct spdk_blob_store *bs;
1035 	struct spdk_lvol *lvol;
1036 	struct spdk_blob_opts opts;
1037 	uint64_t num_clusters;
1038 	char *xattr_names[] = {LVOL_NAME, "uuid", "clear_method"};
1039 	int rc;
1040 
1041 	if (lvs == NULL) {
1042 		SPDK_ERRLOG("lvol store does not exist\n");
1043 		return -EINVAL;
1044 	}
1045 
1046 	rc = _spdk_lvs_verify_lvol_name(lvs, name);
1047 	if (rc < 0) {
1048 		return rc;
1049 	}
1050 
1051 	bs = lvs->blobstore;
1052 
1053 	req = calloc(1, sizeof(*req));
1054 	if (!req) {
1055 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1056 		return -ENOMEM;
1057 	}
1058 	req->cb_fn = cb_fn;
1059 	req->cb_arg = cb_arg;
1060 
1061 	lvol = calloc(1, sizeof(*lvol));
1062 	if (!lvol) {
1063 		free(req);
1064 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1065 		return -ENOMEM;
1066 	}
1067 	lvol->lvol_store = lvs;
1068 	num_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(bs));
1069 	lvol->thin_provision = thin_provision;
1070 	lvol->clear_method = (enum blob_clear_method)clear_method;
1071 	snprintf(lvol->name, sizeof(lvol->name), "%s", name);
1072 	TAILQ_INSERT_TAIL(&lvol->lvol_store->pending_lvols, lvol, link);
1073 	spdk_uuid_generate(&lvol->uuid);
1074 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
1075 	req->lvol = lvol;
1076 
1077 	spdk_blob_opts_init(&opts);
1078 	opts.thin_provision = thin_provision;
1079 	opts.num_clusters = num_clusters;
1080 	opts.xattrs.count = SPDK_COUNTOF(xattr_names);
1081 	opts.xattrs.names = xattr_names;
1082 	opts.xattrs.ctx = lvol;
1083 	opts.xattrs.get_value = spdk_lvol_get_xattr_value;
1084 
1085 	spdk_bs_create_blob_ext(lvs->blobstore, &opts, _spdk_lvol_create_cb, req);
1086 
1087 	return 0;
1088 }
1089 
1090 void
1091 spdk_lvol_create_snapshot(struct spdk_lvol *origlvol, const char *snapshot_name,
1092 			  spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1093 {
1094 	struct spdk_lvol_store *lvs;
1095 	struct spdk_lvol *newlvol;
1096 	struct spdk_blob *origblob;
1097 	struct spdk_lvol_with_handle_req *req;
1098 	struct spdk_blob_xattr_opts snapshot_xattrs;
1099 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1100 	int rc;
1101 
1102 	if (origlvol == NULL) {
1103 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol not provided.\n");
1104 		cb_fn(cb_arg, NULL, -EINVAL);
1105 		return;
1106 	}
1107 
1108 	origblob = origlvol->blob;
1109 	lvs = origlvol->lvol_store;
1110 	if (lvs == NULL) {
1111 		SPDK_ERRLOG("lvol store does not exist\n");
1112 		cb_fn(cb_arg, NULL, -EINVAL);
1113 		return;
1114 	}
1115 
1116 	rc = _spdk_lvs_verify_lvol_name(lvs, snapshot_name);
1117 	if (rc < 0) {
1118 		cb_fn(cb_arg, NULL, rc);
1119 		return;
1120 	}
1121 
1122 	req = calloc(1, sizeof(*req));
1123 	if (!req) {
1124 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1125 		cb_fn(cb_arg, NULL, -ENOMEM);
1126 		return;
1127 	}
1128 
1129 	newlvol = calloc(1, sizeof(*newlvol));
1130 	if (!newlvol) {
1131 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1132 		free(req);
1133 		cb_fn(cb_arg, NULL, -ENOMEM);
1134 		return;
1135 	}
1136 
1137 	newlvol->lvol_store = origlvol->lvol_store;
1138 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", snapshot_name);
1139 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1140 	spdk_uuid_generate(&newlvol->uuid);
1141 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1142 	snapshot_xattrs.count = SPDK_COUNTOF(xattr_names);
1143 	snapshot_xattrs.ctx = newlvol;
1144 	snapshot_xattrs.names = xattr_names;
1145 	snapshot_xattrs.get_value = spdk_lvol_get_xattr_value;
1146 	req->lvol = newlvol;
1147 	req->cb_fn = cb_fn;
1148 	req->cb_arg = cb_arg;
1149 
1150 	spdk_bs_create_snapshot(lvs->blobstore, spdk_blob_get_id(origblob), &snapshot_xattrs,
1151 				_spdk_lvol_create_cb, req);
1152 }
1153 
1154 void
1155 spdk_lvol_create_clone(struct spdk_lvol *origlvol, const char *clone_name,
1156 		       spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1157 {
1158 	struct spdk_lvol *newlvol;
1159 	struct spdk_lvol_with_handle_req *req;
1160 	struct spdk_lvol_store *lvs;
1161 	struct spdk_blob *origblob;
1162 	struct spdk_blob_xattr_opts clone_xattrs;
1163 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1164 	int rc;
1165 
1166 	if (origlvol == NULL) {
1167 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol not provided.\n");
1168 		cb_fn(cb_arg, NULL, -EINVAL);
1169 		return;
1170 	}
1171 
1172 	origblob = origlvol->blob;
1173 	lvs = origlvol->lvol_store;
1174 	if (lvs == NULL) {
1175 		SPDK_ERRLOG("lvol store does not exist\n");
1176 		cb_fn(cb_arg, NULL, -EINVAL);
1177 		return;
1178 	}
1179 
1180 	rc = _spdk_lvs_verify_lvol_name(lvs, clone_name);
1181 	if (rc < 0) {
1182 		cb_fn(cb_arg, NULL, rc);
1183 		return;
1184 	}
1185 
1186 	req = calloc(1, sizeof(*req));
1187 	if (!req) {
1188 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1189 		cb_fn(cb_arg, NULL, -ENOMEM);
1190 		return;
1191 	}
1192 
1193 	newlvol = calloc(1, sizeof(*newlvol));
1194 	if (!newlvol) {
1195 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1196 		free(req);
1197 		cb_fn(cb_arg, NULL, -ENOMEM);
1198 		return;
1199 	}
1200 
1201 	newlvol->lvol_store = lvs;
1202 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", clone_name);
1203 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1204 	spdk_uuid_generate(&newlvol->uuid);
1205 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1206 	clone_xattrs.count = SPDK_COUNTOF(xattr_names);
1207 	clone_xattrs.ctx = newlvol;
1208 	clone_xattrs.names = xattr_names;
1209 	clone_xattrs.get_value = spdk_lvol_get_xattr_value;
1210 	req->lvol = newlvol;
1211 	req->cb_fn = cb_fn;
1212 	req->cb_arg = cb_arg;
1213 
1214 	spdk_bs_create_clone(lvs->blobstore, spdk_blob_get_id(origblob), &clone_xattrs,
1215 			     _spdk_lvol_create_cb,
1216 			     req);
1217 }
1218 
1219 static void
1220 _spdk_lvol_resize_done(void *cb_arg, int lvolerrno)
1221 {
1222 	struct spdk_lvol_req *req = cb_arg;
1223 
1224 	req->cb_fn(req->cb_arg,  lvolerrno);
1225 	free(req);
1226 }
1227 
1228 static void
1229 _spdk_lvol_blob_resize_cb(void *cb_arg, int bserrno)
1230 {
1231 	struct spdk_lvol_req *req = cb_arg;
1232 	struct spdk_lvol *lvol = req->lvol;
1233 
1234 	if (bserrno != 0) {
1235 		req->cb_fn(req->cb_arg, bserrno);
1236 		free(req);
1237 		return;
1238 	}
1239 
1240 	spdk_blob_sync_md(lvol->blob, _spdk_lvol_resize_done, req);
1241 }
1242 
1243 void
1244 spdk_lvol_resize(struct spdk_lvol *lvol, uint64_t sz,
1245 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1246 {
1247 	struct spdk_blob *blob = lvol->blob;
1248 	struct spdk_lvol_store *lvs = lvol->lvol_store;
1249 	struct spdk_lvol_req *req;
1250 	uint64_t new_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(lvs->blobstore));
1251 
1252 	req = calloc(1, sizeof(*req));
1253 	if (!req) {
1254 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1255 		cb_fn(cb_arg, -ENOMEM);
1256 		return;
1257 	}
1258 	req->cb_fn = cb_fn;
1259 	req->cb_arg = cb_arg;
1260 	req->lvol = lvol;
1261 
1262 	spdk_blob_resize(blob, new_clusters, _spdk_lvol_blob_resize_cb, req);
1263 }
1264 
1265 static void
1266 _spdk_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1267 {
1268 	struct spdk_lvol_req *req = cb_arg;
1269 
1270 	req->cb_fn(req->cb_arg, lvolerrno);
1271 	free(req);
1272 }
1273 
1274 void
1275 spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1276 {
1277 	struct spdk_lvol_req *req;
1278 
1279 	req = calloc(1, sizeof(*req));
1280 	if (!req) {
1281 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1282 		cb_fn(cb_arg, -ENOMEM);
1283 		return;
1284 	}
1285 	req->cb_fn = cb_fn;
1286 	req->cb_arg = cb_arg;
1287 
1288 	spdk_blob_set_read_only(lvol->blob);
1289 	spdk_blob_sync_md(lvol->blob, _spdk_lvol_set_read_only_cb, req);
1290 }
1291 
1292 static void
1293 _spdk_lvol_rename_cb(void *cb_arg, int lvolerrno)
1294 {
1295 	struct spdk_lvol_req *req = cb_arg;
1296 
1297 	if (lvolerrno != 0) {
1298 		SPDK_ERRLOG("Lvol rename operation failed\n");
1299 	} else {
1300 		snprintf(req->lvol->name, sizeof(req->lvol->name), "%s", req->name);
1301 	}
1302 
1303 	req->cb_fn(req->cb_arg, lvolerrno);
1304 	free(req);
1305 }
1306 
1307 void
1308 spdk_lvol_rename(struct spdk_lvol *lvol, const char *new_name,
1309 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1310 {
1311 	struct spdk_lvol *tmp;
1312 	struct spdk_blob *blob = lvol->blob;
1313 	struct spdk_lvol_req *req;
1314 	int rc;
1315 
1316 	/* Check if new name is current lvol name.
1317 	 * If so, return success immediately */
1318 	if (strncmp(lvol->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1319 		cb_fn(cb_arg, 0);
1320 		return;
1321 	}
1322 
1323 	/* Check if lvol with 'new_name' already exists in lvolstore */
1324 	TAILQ_FOREACH(tmp, &lvol->lvol_store->lvols, link) {
1325 		if (strncmp(tmp->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1326 			SPDK_ERRLOG("Lvol %s already exists in lvol store %s\n", new_name, lvol->lvol_store->name);
1327 			cb_fn(cb_arg, -EEXIST);
1328 			return;
1329 		}
1330 	}
1331 
1332 	req = calloc(1, sizeof(*req));
1333 	if (!req) {
1334 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1335 		cb_fn(cb_arg, -ENOMEM);
1336 		return;
1337 	}
1338 	req->cb_fn = cb_fn;
1339 	req->cb_arg = cb_arg;
1340 	req->lvol = lvol;
1341 	snprintf(req->name, sizeof(req->name), "%s", new_name);
1342 
1343 	rc = spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1344 	if (rc < 0) {
1345 		free(req);
1346 		cb_fn(cb_arg, rc);
1347 		return;
1348 	}
1349 
1350 	spdk_blob_sync_md(blob, _spdk_lvol_rename_cb, req);
1351 }
1352 
1353 void
1354 spdk_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1355 {
1356 	struct spdk_lvol_req *req;
1357 	struct spdk_blob_store *bs;
1358 
1359 	assert(cb_fn != NULL);
1360 
1361 	if (lvol == NULL) {
1362 		SPDK_ERRLOG("lvol does not exist\n");
1363 		cb_fn(cb_arg, -ENODEV);
1364 		return;
1365 	}
1366 
1367 	if (lvol->ref_count != 0) {
1368 		SPDK_ERRLOG("Cannot destroy lvol %s because it is still open\n", lvol->unique_id);
1369 		cb_fn(cb_arg, -EBUSY);
1370 		return;
1371 	}
1372 
1373 	lvol->action_in_progress = true;
1374 
1375 	req = calloc(1, sizeof(*req));
1376 	if (!req) {
1377 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1378 		cb_fn(cb_arg, -ENOMEM);
1379 		return;
1380 	}
1381 
1382 	req->cb_fn = cb_fn;
1383 	req->cb_arg = cb_arg;
1384 	req->lvol = lvol;
1385 	bs = lvol->lvol_store->blobstore;
1386 
1387 	spdk_bs_delete_blob(bs, lvol->blob_id, _spdk_lvol_delete_blob_cb, req);
1388 }
1389 
1390 void
1391 spdk_lvol_close(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1392 {
1393 	struct spdk_lvol_req *req;
1394 
1395 	assert(cb_fn != NULL);
1396 
1397 	if (lvol == NULL) {
1398 		SPDK_ERRLOG("lvol does not exist\n");
1399 		cb_fn(cb_arg, -ENODEV);
1400 		return;
1401 	}
1402 
1403 	if (lvol->ref_count > 1) {
1404 		lvol->ref_count--;
1405 		cb_fn(cb_arg, 0);
1406 		return;
1407 	} else if (lvol->ref_count == 0) {
1408 		cb_fn(cb_arg, -EINVAL);
1409 		return;
1410 	}
1411 
1412 	lvol->action_in_progress = true;
1413 
1414 	req = calloc(1, sizeof(*req));
1415 	if (!req) {
1416 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1417 		cb_fn(cb_arg, -ENOMEM);
1418 		return;
1419 	}
1420 
1421 	req->cb_fn = cb_fn;
1422 	req->cb_arg = cb_arg;
1423 	req->lvol = lvol;
1424 
1425 	spdk_blob_close(lvol->blob, _spdk_lvol_close_blob_cb, req);
1426 }
1427 
1428 struct spdk_io_channel *
1429 spdk_lvol_get_io_channel(struct spdk_lvol *lvol)
1430 {
1431 	return spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1432 }
1433 
1434 static void
1435 _spdk_lvol_inflate_cb(void *cb_arg, int lvolerrno)
1436 {
1437 	struct spdk_lvol_req *req = cb_arg;
1438 
1439 	spdk_bs_free_io_channel(req->channel);
1440 
1441 	if (lvolerrno < 0) {
1442 		SPDK_ERRLOG("Could not inflate lvol\n");
1443 	}
1444 
1445 	req->cb_fn(req->cb_arg, lvolerrno);
1446 	free(req);
1447 }
1448 
1449 void
1450 spdk_lvol_inflate(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1451 {
1452 	struct spdk_lvol_req *req;
1453 	spdk_blob_id blob_id;
1454 
1455 	assert(cb_fn != NULL);
1456 
1457 	if (lvol == NULL) {
1458 		SPDK_ERRLOG("Lvol does not exist\n");
1459 		cb_fn(cb_arg, -ENODEV);
1460 		return;
1461 	}
1462 
1463 	req = calloc(1, sizeof(*req));
1464 	if (!req) {
1465 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1466 		cb_fn(cb_arg, -ENOMEM);
1467 		return;
1468 	}
1469 
1470 	req->cb_fn = cb_fn;
1471 	req->cb_arg = cb_arg;
1472 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1473 	if (req->channel == NULL) {
1474 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1475 		free(req);
1476 		cb_fn(cb_arg, -ENOMEM);
1477 		return;
1478 	}
1479 
1480 	blob_id = spdk_blob_get_id(lvol->blob);
1481 	spdk_bs_inflate_blob(lvol->lvol_store->blobstore, req->channel, blob_id, _spdk_lvol_inflate_cb,
1482 			     req);
1483 }
1484 
1485 void
1486 spdk_lvol_decouple_parent(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1487 {
1488 	struct spdk_lvol_req *req;
1489 	spdk_blob_id blob_id;
1490 
1491 	assert(cb_fn != NULL);
1492 
1493 	if (lvol == NULL) {
1494 		SPDK_ERRLOG("Lvol does not exist\n");
1495 		cb_fn(cb_arg, -ENODEV);
1496 		return;
1497 	}
1498 
1499 	req = calloc(1, sizeof(*req));
1500 	if (!req) {
1501 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1502 		cb_fn(cb_arg, -ENOMEM);
1503 		return;
1504 	}
1505 
1506 	req->cb_fn = cb_fn;
1507 	req->cb_arg = cb_arg;
1508 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1509 	if (req->channel == NULL) {
1510 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1511 		free(req);
1512 		cb_fn(cb_arg, -ENOMEM);
1513 		return;
1514 	}
1515 
1516 	blob_id = spdk_blob_get_id(lvol->blob);
1517 	spdk_bs_blob_decouple_parent(lvol->lvol_store->blobstore, req->channel, blob_id,
1518 				     _spdk_lvol_inflate_cb, req);
1519 }
1520