xref: /spdk/lib/lvol/lvol.c (revision fa2d95b3fe66e7f5c543eaef89fa00d4eaa0e6e7)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk_internal/lvolstore.h"
35 #include "spdk_internal/log.h"
36 #include "spdk/string.h"
37 #include "spdk/thread.h"
38 #include "spdk/blob_bdev.h"
39 #include "spdk/util.h"
40 
41 /* Default blob channel opts for lvol */
42 #define SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS 512
43 
44 #define LVOL_NAME "name"
45 
46 SPDK_LOG_REGISTER_COMPONENT("lvol", SPDK_LOG_LVOL)
47 
48 static TAILQ_HEAD(, spdk_lvol_store) g_lvol_stores = TAILQ_HEAD_INITIALIZER(g_lvol_stores);
49 static pthread_mutex_t g_lvol_stores_mutex = PTHREAD_MUTEX_INITIALIZER;
50 
51 static int
52 _spdk_add_lvs_to_list(struct spdk_lvol_store *lvs)
53 {
54 	struct spdk_lvol_store *tmp;
55 	bool name_conflict = false;
56 
57 	pthread_mutex_lock(&g_lvol_stores_mutex);
58 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
59 		if (!strncmp(lvs->name, tmp->name, SPDK_LVS_NAME_MAX)) {
60 			name_conflict = true;
61 			break;
62 		}
63 	}
64 	if (!name_conflict) {
65 		lvs->on_list = true;
66 		TAILQ_INSERT_TAIL(&g_lvol_stores, lvs, link);
67 	}
68 	pthread_mutex_unlock(&g_lvol_stores_mutex);
69 
70 	return name_conflict ? -1 : 0;
71 }
72 
73 static void
74 _spdk_lvs_free(struct spdk_lvol_store *lvs)
75 {
76 	pthread_mutex_lock(&g_lvol_stores_mutex);
77 	if (lvs->on_list) {
78 		TAILQ_REMOVE(&g_lvol_stores, lvs, link);
79 	}
80 	pthread_mutex_unlock(&g_lvol_stores_mutex);
81 
82 	free(lvs);
83 }
84 
85 static void
86 _spdk_lvol_free(struct spdk_lvol *lvol)
87 {
88 	free(lvol);
89 }
90 
91 static void
92 _spdk_lvol_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
93 {
94 	struct spdk_lvol_with_handle_req *req = cb_arg;
95 	struct spdk_lvol *lvol = req->lvol;
96 
97 	if (lvolerrno != 0) {
98 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Failed to open lvol %s\n", lvol->unique_id);
99 		goto end;
100 	}
101 
102 	lvol->ref_count++;
103 	lvol->blob = blob;
104 end:
105 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
106 	free(req);
107 }
108 
109 void
110 spdk_lvol_open(struct spdk_lvol *lvol, spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
111 {
112 	struct spdk_lvol_with_handle_req *req;
113 	struct spdk_blob_open_opts opts;
114 
115 	assert(cb_fn != NULL);
116 
117 	if (lvol == NULL) {
118 		SPDK_ERRLOG("lvol does not exist\n");
119 		cb_fn(cb_arg, NULL, -ENODEV);
120 		return;
121 	}
122 
123 	if (lvol->action_in_progress == true) {
124 		SPDK_ERRLOG("Cannot open lvol - operations on lvol pending\n");
125 		cb_fn(cb_arg, lvol, -EBUSY);
126 		return;
127 	}
128 
129 	if (lvol->ref_count > 0) {
130 		lvol->ref_count++;
131 		cb_fn(cb_arg, lvol, 0);
132 		return;
133 	}
134 
135 	req = calloc(1, sizeof(*req));
136 	if (req == NULL) {
137 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
138 		cb_fn(cb_arg, NULL, -ENOMEM);
139 		return;
140 	}
141 
142 	req->cb_fn = cb_fn;
143 	req->cb_arg = cb_arg;
144 	req->lvol = lvol;
145 
146 	spdk_blob_open_opts_init(&opts);
147 	opts.clear_method = lvol->clear_method;
148 
149 	spdk_bs_open_blob_ext(lvol->lvol_store->blobstore, lvol->blob_id, &opts, _spdk_lvol_open_cb, req);
150 }
151 
152 static void
153 _spdk_bs_unload_with_error_cb(void *cb_arg, int lvolerrno)
154 {
155 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
156 
157 	req->cb_fn(req->cb_arg, NULL, req->lvserrno);
158 	free(req);
159 }
160 
161 static void
162 _spdk_load_next_lvol(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
163 {
164 	struct spdk_lvs_with_handle_req *req = cb_arg;
165 	struct spdk_lvol_store *lvs = req->lvol_store;
166 	struct spdk_blob_store *bs = lvs->blobstore;
167 	struct spdk_lvol *lvol, *tmp;
168 	spdk_blob_id blob_id;
169 	const char *attr;
170 	const enum blob_clear_method *clear_method;
171 	size_t value_len;
172 	int rc;
173 
174 	if (lvolerrno == -ENOENT) {
175 		/* Finished iterating */
176 		req->cb_fn(req->cb_arg, lvs, 0);
177 		free(req);
178 		return;
179 	} else if (lvolerrno < 0) {
180 		SPDK_ERRLOG("Failed to fetch blobs list\n");
181 		req->lvserrno = lvolerrno;
182 		goto invalid;
183 	}
184 
185 	blob_id = spdk_blob_get_id(blob);
186 
187 	if (blob_id == lvs->super_blob_id) {
188 		SPDK_INFOLOG(SPDK_LOG_LVOL, "found superblob %"PRIu64"\n", (uint64_t)blob_id);
189 		spdk_bs_iter_next(bs, blob, _spdk_load_next_lvol, req);
190 		return;
191 	}
192 
193 	lvol = calloc(1, sizeof(*lvol));
194 	if (!lvol) {
195 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
196 		req->lvserrno = -ENOMEM;
197 		goto invalid;
198 	}
199 
200 	lvol->blob = blob;
201 	lvol->blob_id = blob_id;
202 	lvol->lvol_store = lvs;
203 
204 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
205 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0' ||
206 	    spdk_uuid_parse(&lvol->uuid, attr) != 0) {
207 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Missing or corrupt lvol uuid\n");
208 		memset(&lvol->uuid, 0, sizeof(lvol->uuid));
209 	}
210 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
211 
212 	if (!spdk_mem_all_zero(&lvol->uuid, sizeof(lvol->uuid))) {
213 		snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
214 	} else {
215 		spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->unique_id), &lvol->lvol_store->uuid);
216 		value_len = strlen(lvol->unique_id);
217 		snprintf(lvol->unique_id + value_len, sizeof(lvol->unique_id) - value_len, "_%"PRIu64,
218 			 (uint64_t)blob_id);
219 	}
220 
221 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
222 	if (rc != 0 || value_len > SPDK_LVOL_NAME_MAX) {
223 		SPDK_ERRLOG("Cannot assign lvol name\n");
224 		_spdk_lvol_free(lvol);
225 		req->lvserrno = -EINVAL;
226 		goto invalid;
227 	}
228 
229 	rc = spdk_blob_get_xattr_value(blob, "clear_method", (const void **)&clear_method, &value_len);
230 	if (rc != 0) {
231 		lvol->clear_method = BLOB_CLEAR_WITH_DEFAULT;
232 	} else {
233 		lvol->clear_method = *clear_method;
234 	}
235 
236 	snprintf(lvol->name, sizeof(lvol->name), "%s", attr);
237 
238 	TAILQ_INSERT_TAIL(&lvs->lvols, lvol, link);
239 
240 	lvs->lvol_count++;
241 
242 	SPDK_INFOLOG(SPDK_LOG_LVOL, "added lvol %s (%s)\n", lvol->unique_id, lvol->uuid_str);
243 
244 	spdk_bs_iter_next(bs, blob, _spdk_load_next_lvol, req);
245 
246 	return;
247 
248 invalid:
249 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
250 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
251 		free(lvol);
252 	}
253 
254 	_spdk_lvs_free(lvs);
255 	spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
256 }
257 
258 static void
259 _spdk_close_super_cb(void *cb_arg, int lvolerrno)
260 {
261 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
262 	struct spdk_lvol_store *lvs = req->lvol_store;
263 	struct spdk_blob_store *bs = lvs->blobstore;
264 
265 	if (lvolerrno != 0) {
266 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Could not close super blob\n");
267 		_spdk_lvs_free(lvs);
268 		req->lvserrno = -ENODEV;
269 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
270 		return;
271 	}
272 
273 	/* Start loading lvols */
274 	spdk_bs_iter_first(lvs->blobstore, _spdk_load_next_lvol, req);
275 }
276 
277 static void
278 _spdk_close_super_blob_with_error_cb(void *cb_arg, int lvolerrno)
279 {
280 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
281 	struct spdk_lvol_store *lvs = req->lvol_store;
282 	struct spdk_blob_store *bs = lvs->blobstore;
283 
284 	_spdk_lvs_free(lvs);
285 
286 	spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
287 }
288 
289 static void
290 _spdk_lvs_read_uuid(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
291 {
292 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
293 	struct spdk_lvol_store *lvs = req->lvol_store;
294 	struct spdk_blob_store *bs = lvs->blobstore;
295 	const char *attr;
296 	size_t value_len;
297 	int rc;
298 
299 	if (lvolerrno != 0) {
300 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Could not open super blob\n");
301 		_spdk_lvs_free(lvs);
302 		req->lvserrno = -ENODEV;
303 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
304 		return;
305 	}
306 
307 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
308 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0') {
309 		SPDK_INFOLOG(SPDK_LOG_LVOL, "missing or incorrect UUID\n");
310 		req->lvserrno = -EINVAL;
311 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
312 		return;
313 	}
314 
315 	if (spdk_uuid_parse(&lvs->uuid, attr)) {
316 		SPDK_INFOLOG(SPDK_LOG_LVOL, "incorrect UUID '%s'\n", attr);
317 		req->lvserrno = -EINVAL;
318 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
319 		return;
320 	}
321 
322 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
323 	if (rc != 0 || value_len > SPDK_LVS_NAME_MAX) {
324 		SPDK_INFOLOG(SPDK_LOG_LVOL, "missing or invalid name\n");
325 		req->lvserrno = -EINVAL;
326 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
327 		return;
328 	}
329 
330 	snprintf(lvs->name, sizeof(lvs->name), "%s", attr);
331 
332 	rc = _spdk_add_lvs_to_list(lvs);
333 	if (rc) {
334 		SPDK_INFOLOG(SPDK_LOG_LVOL, "lvolstore with name %s already exists\n", lvs->name);
335 		req->lvserrno = -EEXIST;
336 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
337 		return;
338 	}
339 
340 	lvs->super_blob_id = spdk_blob_get_id(blob);
341 
342 	spdk_blob_close(blob, _spdk_close_super_cb, req);
343 }
344 
345 static void
346 _spdk_lvs_open_super(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
347 {
348 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
349 	struct spdk_lvol_store *lvs = req->lvol_store;
350 	struct spdk_blob_store *bs = lvs->blobstore;
351 
352 	if (lvolerrno != 0) {
353 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Super blob not found\n");
354 		_spdk_lvs_free(lvs);
355 		req->lvserrno = -ENODEV;
356 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
357 		return;
358 	}
359 
360 	spdk_bs_open_blob(bs, blobid, _spdk_lvs_read_uuid, req);
361 }
362 
363 static void
364 _spdk_lvs_load_cb(void *cb_arg, struct spdk_blob_store *bs, int lvolerrno)
365 {
366 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
367 	struct spdk_lvol_store *lvs;
368 
369 	if (lvolerrno != 0) {
370 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
371 		free(req);
372 		return;
373 	}
374 
375 	lvs = calloc(1, sizeof(*lvs));
376 	if (lvs == NULL) {
377 		SPDK_ERRLOG("Cannot alloc memory for lvol store\n");
378 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
379 		return;
380 	}
381 
382 	lvs->blobstore = bs;
383 	lvs->bs_dev = req->bs_dev;
384 	TAILQ_INIT(&lvs->lvols);
385 	TAILQ_INIT(&lvs->pending_lvols);
386 
387 	req->lvol_store = lvs;
388 
389 	spdk_bs_get_super(bs, _spdk_lvs_open_super, req);
390 }
391 
392 static void
393 spdk_lvs_bs_opts_init(struct spdk_bs_opts *opts)
394 {
395 	spdk_bs_opts_init(opts);
396 	opts->max_channel_ops = SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS;
397 }
398 
399 void
400 spdk_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
401 {
402 	struct spdk_lvs_with_handle_req *req;
403 	struct spdk_bs_opts opts = {};
404 
405 	assert(cb_fn != NULL);
406 
407 	if (bs_dev == NULL) {
408 		SPDK_ERRLOG("Blobstore device does not exist\n");
409 		cb_fn(cb_arg, NULL, -ENODEV);
410 		return;
411 	}
412 
413 	req = calloc(1, sizeof(*req));
414 	if (req == NULL) {
415 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
416 		cb_fn(cb_arg, NULL, -ENOMEM);
417 		return;
418 	}
419 
420 	req->cb_fn = cb_fn;
421 	req->cb_arg = cb_arg;
422 	req->bs_dev = bs_dev;
423 
424 	spdk_lvs_bs_opts_init(&opts);
425 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
426 
427 	spdk_bs_load(bs_dev, &opts, _spdk_lvs_load_cb, req);
428 }
429 
430 static void
431 _spdk_super_create_close_cb(void *cb_arg, int lvolerrno)
432 {
433 	struct spdk_lvs_with_handle_req *req = cb_arg;
434 	struct spdk_lvol_store *lvs = req->lvol_store;
435 
436 	if (lvolerrno < 0) {
437 		SPDK_ERRLOG("Lvol store init failed: could not close super blob\n");
438 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
439 		_spdk_lvs_free(lvs);
440 		free(req);
441 		return;
442 	}
443 
444 	req->cb_fn(req->cb_arg, lvs, lvolerrno);
445 	free(req);
446 }
447 
448 static void
449 _spdk_super_blob_set_cb(void *cb_arg, int lvolerrno)
450 {
451 	struct spdk_lvs_with_handle_req *req = cb_arg;
452 	struct spdk_lvol_store *lvs = req->lvol_store;
453 	struct spdk_blob *blob = lvs->super_blob;
454 
455 	if (lvolerrno < 0) {
456 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
457 		SPDK_ERRLOG("Lvol store init failed: could not set uuid for super blob\n");
458 		_spdk_lvs_free(lvs);
459 		free(req);
460 		return;
461 	}
462 
463 	spdk_blob_close(blob, _spdk_super_create_close_cb, req);
464 }
465 
466 static void
467 _spdk_super_blob_init_cb(void *cb_arg, int lvolerrno)
468 {
469 	struct spdk_lvs_with_handle_req *req = cb_arg;
470 	struct spdk_lvol_store *lvs = req->lvol_store;
471 	struct spdk_blob *blob = lvs->super_blob;
472 	char uuid[SPDK_UUID_STRING_LEN];
473 
474 	if (lvolerrno < 0) {
475 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
476 		SPDK_ERRLOG("Lvol store init failed: could not set super blob\n");
477 		_spdk_lvs_free(lvs);
478 		free(req);
479 		return;
480 	}
481 
482 	spdk_uuid_fmt_lower(uuid, sizeof(uuid), &lvs->uuid);
483 
484 	spdk_blob_set_xattr(blob, "uuid", uuid, sizeof(uuid));
485 	spdk_blob_set_xattr(blob, "name", lvs->name, strnlen(lvs->name, SPDK_LVS_NAME_MAX) + 1);
486 	spdk_blob_sync_md(blob, _spdk_super_blob_set_cb, req);
487 }
488 
489 static void
490 _spdk_super_blob_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
491 {
492 	struct spdk_lvs_with_handle_req *req = cb_arg;
493 	struct spdk_lvol_store *lvs = req->lvol_store;
494 
495 	if (lvolerrno < 0) {
496 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
497 		SPDK_ERRLOG("Lvol store init failed: could not open super blob\n");
498 		_spdk_lvs_free(lvs);
499 		free(req);
500 		return;
501 	}
502 
503 	lvs->super_blob = blob;
504 	lvs->super_blob_id = spdk_blob_get_id(blob);
505 
506 	spdk_bs_set_super(lvs->blobstore, lvs->super_blob_id, _spdk_super_blob_init_cb, req);
507 }
508 
509 static void
510 _spdk_super_blob_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
511 {
512 	struct spdk_lvs_with_handle_req *req = cb_arg;
513 	struct spdk_lvol_store *lvs = req->lvol_store;
514 	struct spdk_blob_store *bs;
515 
516 	if (lvolerrno < 0) {
517 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
518 		SPDK_ERRLOG("Lvol store init failed: could not create super blob\n");
519 		_spdk_lvs_free(lvs);
520 		free(req);
521 		return;
522 	}
523 
524 	bs = req->lvol_store->blobstore;
525 
526 	spdk_bs_open_blob(bs, blobid, _spdk_super_blob_create_open_cb, req);
527 }
528 
529 static void
530 _spdk_lvs_init_cb(void *cb_arg, struct spdk_blob_store *bs, int lvserrno)
531 {
532 	struct spdk_lvs_with_handle_req *lvs_req = cb_arg;
533 	struct spdk_lvol_store *lvs = lvs_req->lvol_store;
534 
535 	if (lvserrno != 0) {
536 		assert(bs == NULL);
537 		lvs_req->cb_fn(lvs_req->cb_arg, NULL, lvserrno);
538 		SPDK_ERRLOG("Lvol store init failed: could not initialize blobstore\n");
539 		_spdk_lvs_free(lvs);
540 		free(lvs_req);
541 		return;
542 	}
543 
544 	assert(bs != NULL);
545 	lvs->blobstore = bs;
546 	TAILQ_INIT(&lvs->lvols);
547 	TAILQ_INIT(&lvs->pending_lvols);
548 
549 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store initialized\n");
550 
551 	/* create super blob */
552 	spdk_bs_create_blob(lvs->blobstore, _spdk_super_blob_create_cb, lvs_req);
553 }
554 
555 void
556 spdk_lvs_opts_init(struct spdk_lvs_opts *o)
557 {
558 	o->cluster_sz = SPDK_LVS_OPTS_CLUSTER_SZ;
559 	o->clear_method = LVS_CLEAR_WITH_UNMAP;
560 	memset(o->name, 0, sizeof(o->name));
561 }
562 
563 static void
564 _spdk_setup_lvs_opts(struct spdk_bs_opts *bs_opts, struct spdk_lvs_opts *o)
565 {
566 	assert(o != NULL);
567 	spdk_lvs_bs_opts_init(bs_opts);
568 	bs_opts->cluster_sz = o->cluster_sz;
569 	bs_opts->clear_method = (enum bs_clear_method)o->clear_method;
570 }
571 
572 int
573 spdk_lvs_init(struct spdk_bs_dev *bs_dev, struct spdk_lvs_opts *o,
574 	      spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
575 {
576 	struct spdk_lvol_store *lvs;
577 	struct spdk_lvs_with_handle_req *lvs_req;
578 	struct spdk_bs_opts opts = {};
579 	int rc;
580 
581 	if (bs_dev == NULL) {
582 		SPDK_ERRLOG("Blobstore device does not exist\n");
583 		return -ENODEV;
584 	}
585 
586 	if (o == NULL) {
587 		SPDK_ERRLOG("spdk_lvs_opts not specified\n");
588 		return -EINVAL;
589 	}
590 
591 	_spdk_setup_lvs_opts(&opts, o);
592 
593 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == SPDK_LVS_NAME_MAX) {
594 		SPDK_ERRLOG("Name has no null terminator.\n");
595 		return -EINVAL;
596 	}
597 
598 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == 0) {
599 		SPDK_ERRLOG("No name specified.\n");
600 		return -EINVAL;
601 	}
602 
603 	lvs = calloc(1, sizeof(*lvs));
604 	if (!lvs) {
605 		SPDK_ERRLOG("Cannot alloc memory for lvol store base pointer\n");
606 		return -ENOMEM;
607 	}
608 
609 	spdk_uuid_generate(&lvs->uuid);
610 	snprintf(lvs->name, sizeof(lvs->name), "%s", o->name);
611 
612 	rc = _spdk_add_lvs_to_list(lvs);
613 	if (rc) {
614 		SPDK_ERRLOG("lvolstore with name %s already exists\n", lvs->name);
615 		_spdk_lvs_free(lvs);
616 		return -EEXIST;
617 	}
618 
619 	lvs_req = calloc(1, sizeof(*lvs_req));
620 	if (!lvs_req) {
621 		_spdk_lvs_free(lvs);
622 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
623 		return -ENOMEM;
624 	}
625 
626 	assert(cb_fn != NULL);
627 	lvs_req->cb_fn = cb_fn;
628 	lvs_req->cb_arg = cb_arg;
629 	lvs_req->lvol_store = lvs;
630 	lvs->bs_dev = bs_dev;
631 	lvs->destruct = false;
632 
633 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
634 
635 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Initializing lvol store\n");
636 	spdk_bs_init(bs_dev, &opts, _spdk_lvs_init_cb, lvs_req);
637 
638 	return 0;
639 }
640 
641 static void
642 _spdk_lvs_rename_cb(void *cb_arg, int lvolerrno)
643 {
644 	struct spdk_lvs_req *req = cb_arg;
645 
646 	if (lvolerrno != 0) {
647 		req->lvserrno = lvolerrno;
648 	}
649 	if (req->lvserrno != 0) {
650 		SPDK_ERRLOG("Lvol store rename operation failed\n");
651 		/* Lvs renaming failed, so we should 'clear' new_name.
652 		 * Otherwise it could cause a failure on the next attepmt to change the name to 'new_name'  */
653 		snprintf(req->lvol_store->new_name,
654 			 sizeof(req->lvol_store->new_name),
655 			 "%s", req->lvol_store->name);
656 	} else {
657 		/* Update lvs name with new_name */
658 		snprintf(req->lvol_store->name,
659 			 sizeof(req->lvol_store->name),
660 			 "%s", req->lvol_store->new_name);
661 	}
662 
663 	req->cb_fn(req->cb_arg, req->lvserrno);
664 	free(req);
665 }
666 
667 static void
668 _spdk_lvs_rename_sync_cb(void *cb_arg, int lvolerrno)
669 {
670 	struct spdk_lvs_req *req = cb_arg;
671 	struct spdk_blob *blob = req->lvol_store->super_blob;
672 
673 	if (lvolerrno < 0) {
674 		req->lvserrno = lvolerrno;
675 	}
676 
677 	spdk_blob_close(blob, _spdk_lvs_rename_cb, req);
678 }
679 
680 static void
681 _spdk_lvs_rename_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
682 {
683 	struct spdk_lvs_req *req = cb_arg;
684 	int rc;
685 
686 	if (lvolerrno < 0) {
687 		_spdk_lvs_rename_cb(cb_arg, lvolerrno);
688 		return;
689 	}
690 
691 	rc = spdk_blob_set_xattr(blob, "name", req->lvol_store->new_name,
692 				 strlen(req->lvol_store->new_name) + 1);
693 	if (rc < 0) {
694 		req->lvserrno = rc;
695 		_spdk_lvs_rename_sync_cb(req, rc);
696 		return;
697 	}
698 
699 	req->lvol_store->super_blob = blob;
700 
701 	spdk_blob_sync_md(blob, _spdk_lvs_rename_sync_cb, req);
702 }
703 
704 void
705 spdk_lvs_rename(struct spdk_lvol_store *lvs, const char *new_name,
706 		spdk_lvs_op_complete cb_fn, void *cb_arg)
707 {
708 	struct spdk_lvs_req *req;
709 	struct spdk_lvol_store *tmp;
710 
711 	/* Check if new name is current lvs name.
712 	 * If so, return success immediately */
713 	if (strncmp(lvs->name, new_name, SPDK_LVS_NAME_MAX) == 0) {
714 		cb_fn(cb_arg, 0);
715 		return;
716 	}
717 
718 	/* Check if new or new_name is already used in other lvs */
719 	pthread_mutex_lock(&g_lvol_stores_mutex);
720 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
721 		if (!strncmp(new_name, tmp->name, SPDK_LVS_NAME_MAX) ||
722 		    !strncmp(new_name, tmp->new_name, SPDK_LVS_NAME_MAX)) {
723 			pthread_mutex_unlock(&g_lvol_stores_mutex);
724 			cb_fn(cb_arg, -EEXIST);
725 			return;
726 		}
727 	}
728 	pthread_mutex_unlock(&g_lvol_stores_mutex);
729 
730 	req = calloc(1, sizeof(*req));
731 	if (!req) {
732 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
733 		cb_fn(cb_arg, -ENOMEM);
734 		return;
735 	}
736 	snprintf(lvs->new_name, sizeof(lvs->new_name), "%s", new_name);
737 	req->lvol_store = lvs;
738 	req->cb_fn = cb_fn;
739 	req->cb_arg = cb_arg;
740 
741 	spdk_bs_open_blob(lvs->blobstore, lvs->super_blob_id, _spdk_lvs_rename_open_cb, req);
742 }
743 
744 static void
745 _lvs_unload_cb(void *cb_arg, int lvserrno)
746 {
747 	struct spdk_lvs_req *lvs_req = cb_arg;
748 
749 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store unloaded\n");
750 	assert(lvs_req->cb_fn != NULL);
751 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
752 	free(lvs_req);
753 }
754 
755 int
756 spdk_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
757 		void *cb_arg)
758 {
759 	struct spdk_lvs_req *lvs_req;
760 	struct spdk_lvol *lvol, *tmp;
761 
762 	if (lvs == NULL) {
763 		SPDK_ERRLOG("Lvol store is NULL\n");
764 		return -ENODEV;
765 	}
766 
767 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
768 		if (lvol->action_in_progress == true) {
769 			SPDK_ERRLOG("Cannot unload lvol store - operations on lvols pending\n");
770 			cb_fn(cb_arg, -EBUSY);
771 			return -EBUSY;
772 		} else if (lvol->ref_count != 0) {
773 			SPDK_ERRLOG("Lvols still open on lvol store\n");
774 			cb_fn(cb_arg, -EBUSY);
775 			return -EBUSY;
776 		}
777 	}
778 
779 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
780 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
781 		_spdk_lvol_free(lvol);
782 	}
783 
784 	lvs_req = calloc(1, sizeof(*lvs_req));
785 	if (!lvs_req) {
786 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
787 		return -ENOMEM;
788 	}
789 
790 	lvs_req->cb_fn = cb_fn;
791 	lvs_req->cb_arg = cb_arg;
792 
793 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Unloading lvol store\n");
794 	spdk_bs_unload(lvs->blobstore, _lvs_unload_cb, lvs_req);
795 	_spdk_lvs_free(lvs);
796 
797 	return 0;
798 }
799 
800 static void
801 _lvs_destroy_cb(void *cb_arg, int lvserrno)
802 {
803 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
804 
805 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store destroyed\n");
806 	assert(lvs_req->cb_fn != NULL);
807 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
808 	free(lvs_req);
809 }
810 
811 static void
812 _lvs_destroy_super_cb(void *cb_arg, int bserrno)
813 {
814 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
815 	struct spdk_lvol_store *lvs = lvs_req->lvs;
816 
817 	assert(lvs != NULL);
818 
819 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Destroying lvol store\n");
820 	spdk_bs_destroy(lvs->blobstore, _lvs_destroy_cb, lvs_req);
821 	_spdk_lvs_free(lvs);
822 }
823 
824 int
825 spdk_lvs_destroy(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
826 		 void *cb_arg)
827 {
828 	struct spdk_lvs_destroy_req *lvs_req;
829 	struct spdk_lvol *iter_lvol, *tmp;
830 
831 	if (lvs == NULL) {
832 		SPDK_ERRLOG("Lvol store is NULL\n");
833 		return -ENODEV;
834 	}
835 
836 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
837 		if (iter_lvol->action_in_progress == true) {
838 			SPDK_ERRLOG("Cannot destroy lvol store - operations on lvols pending\n");
839 			cb_fn(cb_arg, -EBUSY);
840 			return -EBUSY;
841 		} else if (iter_lvol->ref_count != 0) {
842 			SPDK_ERRLOG("Lvols still open on lvol store\n");
843 			cb_fn(cb_arg, -EBUSY);
844 			return -EBUSY;
845 		}
846 	}
847 
848 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
849 		free(iter_lvol);
850 	}
851 
852 	lvs_req = calloc(1, sizeof(*lvs_req));
853 	if (!lvs_req) {
854 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
855 		return -ENOMEM;
856 	}
857 
858 	lvs_req->cb_fn = cb_fn;
859 	lvs_req->cb_arg = cb_arg;
860 	lvs_req->lvs = lvs;
861 
862 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Deleting super blob\n");
863 	spdk_bs_delete_blob(lvs->blobstore, lvs->super_blob_id, _lvs_destroy_super_cb, lvs_req);
864 
865 	return 0;
866 }
867 
868 static void
869 _spdk_lvol_close_blob_cb(void *cb_arg, int lvolerrno)
870 {
871 	struct spdk_lvol_req *req = cb_arg;
872 	struct spdk_lvol *lvol = req->lvol;
873 
874 	if (lvolerrno < 0) {
875 		SPDK_ERRLOG("Could not close blob on lvol\n");
876 		_spdk_lvol_free(lvol);
877 		goto end;
878 	}
879 
880 	lvol->ref_count--;
881 	lvol->action_in_progress = false;
882 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol %s closed\n", lvol->unique_id);
883 
884 end:
885 	req->cb_fn(req->cb_arg, lvolerrno);
886 	free(req);
887 }
888 
889 bool
890 spdk_lvol_deletable(struct spdk_lvol *lvol)
891 {
892 	size_t count;
893 
894 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
895 	return (count == 0);
896 }
897 
898 static void
899 _spdk_lvol_delete_blob_cb(void *cb_arg, int lvolerrno)
900 {
901 	struct spdk_lvol_req *req = cb_arg;
902 	struct spdk_lvol *lvol = req->lvol;
903 
904 	if (lvolerrno < 0) {
905 		SPDK_ERRLOG("Could not delete blob on lvol\n");
906 		goto end;
907 	}
908 
909 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
910 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol %s deleted\n", lvol->unique_id);
911 
912 end:
913 	_spdk_lvol_free(lvol);
914 	req->cb_fn(req->cb_arg, lvolerrno);
915 	free(req);
916 }
917 
918 static void
919 _spdk_lvol_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
920 {
921 	struct spdk_lvol_with_handle_req *req = cb_arg;
922 	spdk_blob_id blob_id = spdk_blob_get_id(blob);
923 	struct spdk_lvol *lvol = req->lvol;
924 
925 	TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
926 
927 	if (lvolerrno < 0) {
928 		free(lvol);
929 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
930 		free(req);
931 		return;
932 	}
933 
934 	lvol->blob = blob;
935 	lvol->blob_id = blob_id;
936 
937 	TAILQ_INSERT_TAIL(&lvol->lvol_store->lvols, lvol, link);
938 
939 	snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
940 	lvol->ref_count++;
941 
942 	assert(req->cb_fn != NULL);
943 	req->cb_fn(req->cb_arg, req->lvol, lvolerrno);
944 	free(req);
945 }
946 
947 static void
948 _spdk_lvol_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
949 {
950 	struct spdk_lvol_with_handle_req *req = cb_arg;
951 	struct spdk_blob_store *bs;
952 	struct spdk_blob_open_opts opts;
953 
954 	if (lvolerrno < 0) {
955 		TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
956 		free(req->lvol);
957 		assert(req->cb_fn != NULL);
958 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
959 		free(req);
960 		return;
961 	}
962 
963 	spdk_blob_open_opts_init(&opts);
964 	opts.clear_method = req->lvol->clear_method;
965 	bs = req->lvol->lvol_store->blobstore;
966 
967 	spdk_bs_open_blob_ext(bs, blobid, &opts, _spdk_lvol_create_open_cb, req);
968 }
969 
970 static void
971 spdk_lvol_get_xattr_value(void *xattr_ctx, const char *name,
972 			  const void **value, size_t *value_len)
973 {
974 	struct spdk_lvol *lvol = xattr_ctx;
975 
976 	if (!strcmp(LVOL_NAME, name)) {
977 		*value = lvol->name;
978 		*value_len = SPDK_LVOL_NAME_MAX;
979 	} else if (!strcmp("uuid", name)) {
980 		*value = lvol->uuid_str;
981 		*value_len = sizeof(lvol->uuid_str);
982 	} else if (!strcmp("clear_method", name)) {
983 		*value = &lvol->clear_method;
984 		*value_len = sizeof(int);
985 	}
986 }
987 
988 static int
989 _spdk_lvs_verify_lvol_name(struct spdk_lvol_store *lvs, const char *name)
990 {
991 	struct spdk_lvol *tmp;
992 
993 	if (name == NULL || strnlen(name, SPDK_LVOL_NAME_MAX) == 0) {
994 		SPDK_INFOLOG(SPDK_LOG_LVOL, "lvol name not provided.\n");
995 		return -EINVAL;
996 	}
997 
998 	if (strnlen(name, SPDK_LVOL_NAME_MAX) == SPDK_LVOL_NAME_MAX) {
999 		SPDK_ERRLOG("Name has no null terminator.\n");
1000 		return -EINVAL;
1001 	}
1002 
1003 	TAILQ_FOREACH(tmp, &lvs->lvols, link) {
1004 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1005 			SPDK_ERRLOG("lvol with name %s already exists\n", name);
1006 			return -EEXIST;
1007 		}
1008 	}
1009 
1010 	TAILQ_FOREACH(tmp, &lvs->pending_lvols, link) {
1011 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1012 			SPDK_ERRLOG("lvol with name %s is being already created\n", name);
1013 			return -EEXIST;
1014 		}
1015 	}
1016 
1017 	return 0;
1018 }
1019 
1020 int
1021 spdk_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1022 		 bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1023 		 void *cb_arg)
1024 {
1025 	struct spdk_lvol_with_handle_req *req;
1026 	struct spdk_blob_store *bs;
1027 	struct spdk_lvol *lvol;
1028 	struct spdk_blob_opts opts;
1029 	uint64_t num_clusters;
1030 	char *xattr_names[] = {LVOL_NAME, "uuid", "clear_method"};
1031 	int rc;
1032 
1033 	if (lvs == NULL) {
1034 		SPDK_ERRLOG("lvol store does not exist\n");
1035 		return -EINVAL;
1036 	}
1037 
1038 	rc = _spdk_lvs_verify_lvol_name(lvs, name);
1039 	if (rc < 0) {
1040 		return rc;
1041 	}
1042 
1043 	bs = lvs->blobstore;
1044 
1045 	req = calloc(1, sizeof(*req));
1046 	if (!req) {
1047 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1048 		return -ENOMEM;
1049 	}
1050 	req->cb_fn = cb_fn;
1051 	req->cb_arg = cb_arg;
1052 
1053 	lvol = calloc(1, sizeof(*lvol));
1054 	if (!lvol) {
1055 		free(req);
1056 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1057 		return -ENOMEM;
1058 	}
1059 	lvol->lvol_store = lvs;
1060 	num_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(bs));
1061 	lvol->thin_provision = thin_provision;
1062 	lvol->clear_method = (enum blob_clear_method)clear_method;
1063 	snprintf(lvol->name, sizeof(lvol->name), "%s", name);
1064 	TAILQ_INSERT_TAIL(&lvol->lvol_store->pending_lvols, lvol, link);
1065 	spdk_uuid_generate(&lvol->uuid);
1066 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
1067 	req->lvol = lvol;
1068 
1069 	spdk_blob_opts_init(&opts);
1070 	opts.thin_provision = thin_provision;
1071 	opts.num_clusters = num_clusters;
1072 	opts.xattrs.count = SPDK_COUNTOF(xattr_names);
1073 	opts.xattrs.names = xattr_names;
1074 	opts.xattrs.ctx = lvol;
1075 	opts.xattrs.get_value = spdk_lvol_get_xattr_value;
1076 
1077 	spdk_bs_create_blob_ext(lvs->blobstore, &opts, _spdk_lvol_create_cb, req);
1078 
1079 	return 0;
1080 }
1081 
1082 void
1083 spdk_lvol_create_snapshot(struct spdk_lvol *origlvol, const char *snapshot_name,
1084 			  spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1085 {
1086 	struct spdk_lvol_store *lvs;
1087 	struct spdk_lvol *newlvol;
1088 	struct spdk_blob *origblob;
1089 	struct spdk_lvol_with_handle_req *req;
1090 	struct spdk_blob_xattr_opts snapshot_xattrs;
1091 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1092 	int rc;
1093 
1094 	if (origlvol == NULL) {
1095 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol not provided.\n");
1096 		cb_fn(cb_arg, NULL, -EINVAL);
1097 		return;
1098 	}
1099 
1100 	origblob = origlvol->blob;
1101 	lvs = origlvol->lvol_store;
1102 	if (lvs == NULL) {
1103 		SPDK_ERRLOG("lvol store does not exist\n");
1104 		cb_fn(cb_arg, NULL, -EINVAL);
1105 		return;
1106 	}
1107 
1108 	rc = _spdk_lvs_verify_lvol_name(lvs, snapshot_name);
1109 	if (rc < 0) {
1110 		cb_fn(cb_arg, NULL, rc);
1111 		return;
1112 	}
1113 
1114 	req = calloc(1, sizeof(*req));
1115 	if (!req) {
1116 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1117 		cb_fn(cb_arg, NULL, -ENOMEM);
1118 		return;
1119 	}
1120 
1121 	newlvol = calloc(1, sizeof(*newlvol));
1122 	if (!newlvol) {
1123 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1124 		free(req);
1125 		cb_fn(cb_arg, NULL, -ENOMEM);
1126 		return;
1127 	}
1128 
1129 	newlvol->lvol_store = origlvol->lvol_store;
1130 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", snapshot_name);
1131 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1132 	spdk_uuid_generate(&newlvol->uuid);
1133 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1134 	snapshot_xattrs.count = SPDK_COUNTOF(xattr_names);
1135 	snapshot_xattrs.ctx = newlvol;
1136 	snapshot_xattrs.names = xattr_names;
1137 	snapshot_xattrs.get_value = spdk_lvol_get_xattr_value;
1138 	req->lvol = newlvol;
1139 	req->cb_fn = cb_fn;
1140 	req->cb_arg = cb_arg;
1141 
1142 	spdk_bs_create_snapshot(lvs->blobstore, spdk_blob_get_id(origblob), &snapshot_xattrs,
1143 				_spdk_lvol_create_cb, req);
1144 }
1145 
1146 void
1147 spdk_lvol_create_clone(struct spdk_lvol *origlvol, const char *clone_name,
1148 		       spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1149 {
1150 	struct spdk_lvol *newlvol;
1151 	struct spdk_lvol_with_handle_req *req;
1152 	struct spdk_lvol_store *lvs;
1153 	struct spdk_blob *origblob;
1154 	struct spdk_blob_xattr_opts clone_xattrs;
1155 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1156 	int rc;
1157 
1158 	if (origlvol == NULL) {
1159 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol not provided.\n");
1160 		cb_fn(cb_arg, NULL, -EINVAL);
1161 		return;
1162 	}
1163 
1164 	origblob = origlvol->blob;
1165 	lvs = origlvol->lvol_store;
1166 	if (lvs == NULL) {
1167 		SPDK_ERRLOG("lvol store does not exist\n");
1168 		cb_fn(cb_arg, NULL, -EINVAL);
1169 		return;
1170 	}
1171 
1172 	rc = _spdk_lvs_verify_lvol_name(lvs, clone_name);
1173 	if (rc < 0) {
1174 		cb_fn(cb_arg, NULL, rc);
1175 		return;
1176 	}
1177 
1178 	req = calloc(1, sizeof(*req));
1179 	if (!req) {
1180 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1181 		cb_fn(cb_arg, NULL, -ENOMEM);
1182 		return;
1183 	}
1184 
1185 	newlvol = calloc(1, sizeof(*newlvol));
1186 	if (!newlvol) {
1187 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1188 		free(req);
1189 		cb_fn(cb_arg, NULL, -ENOMEM);
1190 		return;
1191 	}
1192 
1193 	newlvol->lvol_store = lvs;
1194 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", clone_name);
1195 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1196 	spdk_uuid_generate(&newlvol->uuid);
1197 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1198 	clone_xattrs.count = SPDK_COUNTOF(xattr_names);
1199 	clone_xattrs.ctx = newlvol;
1200 	clone_xattrs.names = xattr_names;
1201 	clone_xattrs.get_value = spdk_lvol_get_xattr_value;
1202 	req->lvol = newlvol;
1203 	req->cb_fn = cb_fn;
1204 	req->cb_arg = cb_arg;
1205 
1206 	spdk_bs_create_clone(lvs->blobstore, spdk_blob_get_id(origblob), &clone_xattrs,
1207 			     _spdk_lvol_create_cb,
1208 			     req);
1209 }
1210 
1211 static void
1212 _spdk_lvol_resize_done(void *cb_arg, int lvolerrno)
1213 {
1214 	struct spdk_lvol_req *req = cb_arg;
1215 
1216 	req->cb_fn(req->cb_arg,  lvolerrno);
1217 	free(req);
1218 }
1219 
1220 static void
1221 _spdk_lvol_blob_resize_cb(void *cb_arg, int bserrno)
1222 {
1223 	struct spdk_lvol_req *req = cb_arg;
1224 	struct spdk_lvol *lvol = req->lvol;
1225 
1226 	if (bserrno != 0) {
1227 		req->cb_fn(req->cb_arg, bserrno);
1228 		free(req);
1229 		return;
1230 	}
1231 
1232 	spdk_blob_sync_md(lvol->blob, _spdk_lvol_resize_done, req);
1233 }
1234 
1235 void
1236 spdk_lvol_resize(struct spdk_lvol *lvol, uint64_t sz,
1237 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1238 {
1239 	struct spdk_blob *blob = lvol->blob;
1240 	struct spdk_lvol_store *lvs = lvol->lvol_store;
1241 	struct spdk_lvol_req *req;
1242 	uint64_t new_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(lvs->blobstore));
1243 
1244 	req = calloc(1, sizeof(*req));
1245 	if (!req) {
1246 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1247 		cb_fn(cb_arg, -ENOMEM);
1248 		return;
1249 	}
1250 	req->cb_fn = cb_fn;
1251 	req->cb_arg = cb_arg;
1252 	req->lvol = lvol;
1253 
1254 	spdk_blob_resize(blob, new_clusters, _spdk_lvol_blob_resize_cb, req);
1255 }
1256 
1257 static void
1258 _spdk_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1259 {
1260 	struct spdk_lvol_req *req = cb_arg;
1261 
1262 	req->cb_fn(req->cb_arg, lvolerrno);
1263 	free(req);
1264 }
1265 
1266 void
1267 spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1268 {
1269 	struct spdk_lvol_req *req;
1270 
1271 	req = calloc(1, sizeof(*req));
1272 	if (!req) {
1273 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1274 		cb_fn(cb_arg, -ENOMEM);
1275 		return;
1276 	}
1277 	req->cb_fn = cb_fn;
1278 	req->cb_arg = cb_arg;
1279 
1280 	spdk_blob_set_read_only(lvol->blob);
1281 	spdk_blob_sync_md(lvol->blob, _spdk_lvol_set_read_only_cb, req);
1282 }
1283 
1284 static void
1285 _spdk_lvol_rename_cb(void *cb_arg, int lvolerrno)
1286 {
1287 	struct spdk_lvol_req *req = cb_arg;
1288 
1289 	if (lvolerrno != 0) {
1290 		SPDK_ERRLOG("Lvol rename operation failed\n");
1291 	} else {
1292 		snprintf(req->lvol->name, sizeof(req->lvol->name), "%s", req->name);
1293 	}
1294 
1295 	req->cb_fn(req->cb_arg, lvolerrno);
1296 	free(req);
1297 }
1298 
1299 void
1300 spdk_lvol_rename(struct spdk_lvol *lvol, const char *new_name,
1301 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1302 {
1303 	struct spdk_lvol *tmp;
1304 	struct spdk_blob *blob = lvol->blob;
1305 	struct spdk_lvol_req *req;
1306 	int rc;
1307 
1308 	/* Check if new name is current lvol name.
1309 	 * If so, return success immediately */
1310 	if (strncmp(lvol->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1311 		cb_fn(cb_arg, 0);
1312 		return;
1313 	}
1314 
1315 	/* Check if lvol with 'new_name' already exists in lvolstore */
1316 	TAILQ_FOREACH(tmp, &lvol->lvol_store->lvols, link) {
1317 		if (strncmp(tmp->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1318 			SPDK_ERRLOG("Lvol %s already exists in lvol store %s\n", new_name, lvol->lvol_store->name);
1319 			cb_fn(cb_arg, -EEXIST);
1320 			return;
1321 		}
1322 	}
1323 
1324 	req = calloc(1, sizeof(*req));
1325 	if (!req) {
1326 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1327 		cb_fn(cb_arg, -ENOMEM);
1328 		return;
1329 	}
1330 	req->cb_fn = cb_fn;
1331 	req->cb_arg = cb_arg;
1332 	req->lvol = lvol;
1333 	snprintf(req->name, sizeof(req->name), "%s", new_name);
1334 
1335 	rc = spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1336 	if (rc < 0) {
1337 		free(req);
1338 		cb_fn(cb_arg, rc);
1339 		return;
1340 	}
1341 
1342 	spdk_blob_sync_md(blob, _spdk_lvol_rename_cb, req);
1343 }
1344 
1345 void
1346 spdk_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1347 {
1348 	struct spdk_lvol_req *req;
1349 	struct spdk_blob_store *bs;
1350 
1351 	assert(cb_fn != NULL);
1352 
1353 	if (lvol == NULL) {
1354 		SPDK_ERRLOG("lvol does not exist\n");
1355 		cb_fn(cb_arg, -ENODEV);
1356 		return;
1357 	}
1358 
1359 	if (lvol->ref_count != 0) {
1360 		SPDK_ERRLOG("Cannot destroy lvol %s because it is still open\n", lvol->unique_id);
1361 		cb_fn(cb_arg, -EBUSY);
1362 		return;
1363 	}
1364 
1365 	lvol->action_in_progress = true;
1366 
1367 	req = calloc(1, sizeof(*req));
1368 	if (!req) {
1369 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1370 		cb_fn(cb_arg, -ENOMEM);
1371 		return;
1372 	}
1373 
1374 	req->cb_fn = cb_fn;
1375 	req->cb_arg = cb_arg;
1376 	req->lvol = lvol;
1377 	bs = lvol->lvol_store->blobstore;
1378 
1379 	spdk_bs_delete_blob(bs, lvol->blob_id, _spdk_lvol_delete_blob_cb, req);
1380 }
1381 
1382 void
1383 spdk_lvol_close(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1384 {
1385 	struct spdk_lvol_req *req;
1386 
1387 	assert(cb_fn != NULL);
1388 
1389 	if (lvol == NULL) {
1390 		SPDK_ERRLOG("lvol does not exist\n");
1391 		cb_fn(cb_arg, -ENODEV);
1392 		return;
1393 	}
1394 
1395 	if (lvol->ref_count > 1) {
1396 		lvol->ref_count--;
1397 		cb_fn(cb_arg, 0);
1398 		return;
1399 	} else if (lvol->ref_count == 0) {
1400 		cb_fn(cb_arg, -EINVAL);
1401 		return;
1402 	}
1403 
1404 	lvol->action_in_progress = true;
1405 
1406 	req = calloc(1, sizeof(*req));
1407 	if (!req) {
1408 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1409 		cb_fn(cb_arg, -ENOMEM);
1410 		return;
1411 	}
1412 
1413 	req->cb_fn = cb_fn;
1414 	req->cb_arg = cb_arg;
1415 	req->lvol = lvol;
1416 
1417 	spdk_blob_close(lvol->blob, _spdk_lvol_close_blob_cb, req);
1418 }
1419 
1420 struct spdk_io_channel *
1421 spdk_lvol_get_io_channel(struct spdk_lvol *lvol)
1422 {
1423 	return spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1424 }
1425 
1426 static void
1427 _spdk_lvol_inflate_cb(void *cb_arg, int lvolerrno)
1428 {
1429 	struct spdk_lvol_req *req = cb_arg;
1430 
1431 	spdk_bs_free_io_channel(req->channel);
1432 
1433 	if (lvolerrno < 0) {
1434 		SPDK_ERRLOG("Could not inflate lvol\n");
1435 	}
1436 
1437 	req->cb_fn(req->cb_arg, lvolerrno);
1438 	free(req);
1439 }
1440 
1441 void
1442 spdk_lvol_inflate(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1443 {
1444 	struct spdk_lvol_req *req;
1445 	spdk_blob_id blob_id;
1446 
1447 	assert(cb_fn != NULL);
1448 
1449 	if (lvol == NULL) {
1450 		SPDK_ERRLOG("Lvol does not exist\n");
1451 		cb_fn(cb_arg, -ENODEV);
1452 		return;
1453 	}
1454 
1455 	req = calloc(1, sizeof(*req));
1456 	if (!req) {
1457 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1458 		cb_fn(cb_arg, -ENOMEM);
1459 		return;
1460 	}
1461 
1462 	req->cb_fn = cb_fn;
1463 	req->cb_arg = cb_arg;
1464 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1465 	if (req->channel == NULL) {
1466 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1467 		free(req);
1468 		cb_fn(cb_arg, -ENOMEM);
1469 		return;
1470 	}
1471 
1472 	blob_id = spdk_blob_get_id(lvol->blob);
1473 	spdk_bs_inflate_blob(lvol->lvol_store->blobstore, req->channel, blob_id, _spdk_lvol_inflate_cb,
1474 			     req);
1475 }
1476 
1477 void
1478 spdk_lvol_decouple_parent(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1479 {
1480 	struct spdk_lvol_req *req;
1481 	spdk_blob_id blob_id;
1482 
1483 	assert(cb_fn != NULL);
1484 
1485 	if (lvol == NULL) {
1486 		SPDK_ERRLOG("Lvol does not exist\n");
1487 		cb_fn(cb_arg, -ENODEV);
1488 		return;
1489 	}
1490 
1491 	req = calloc(1, sizeof(*req));
1492 	if (!req) {
1493 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1494 		cb_fn(cb_arg, -ENOMEM);
1495 		return;
1496 	}
1497 
1498 	req->cb_fn = cb_fn;
1499 	req->cb_arg = cb_arg;
1500 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1501 	if (req->channel == NULL) {
1502 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1503 		free(req);
1504 		cb_fn(cb_arg, -ENOMEM);
1505 		return;
1506 	}
1507 
1508 	blob_id = spdk_blob_get_id(lvol->blob);
1509 	spdk_bs_blob_decouple_parent(lvol->lvol_store->blobstore, req->channel, blob_id,
1510 				     _spdk_lvol_inflate_cb, req);
1511 }
1512