xref: /spdk/lib/lvol/lvol.c (revision adb39585efa6b2bcdf2227606563b1e5947d145a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk_internal/lvolstore.h"
35 #include "spdk_internal/log.h"
36 #include "spdk/string.h"
37 #include "spdk/thread.h"
38 #include "spdk/blob_bdev.h"
39 #include "spdk/util.h"
40 
/* Default blob channel opts for lvol: the value spdk_lvs_bs_opts_init() stores
 * in spdk_bs_opts.max_channel_ops. */
#define SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS 512

/* Key of the blob xattr that carries an lvol's name. */
#define LVOL_NAME "name"

SPDK_LOG_REGISTER_COMPONENT("lvol", SPDK_LOG_LVOL)

/* List of registered lvol stores. Every access to it in this file is made
 * with g_lvol_stores_mutex held. */
static TAILQ_HEAD(, spdk_lvol_store) g_lvol_stores = TAILQ_HEAD_INITIALIZER(g_lvol_stores);
static pthread_mutex_t g_lvol_stores_mutex = PTHREAD_MUTEX_INITIALIZER;
50 
51 static int
52 _spdk_add_lvs_to_list(struct spdk_lvol_store *lvs)
53 {
54 	struct spdk_lvol_store *tmp;
55 	bool name_conflict = false;
56 
57 	pthread_mutex_lock(&g_lvol_stores_mutex);
58 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
59 		if (!strncmp(lvs->name, tmp->name, SPDK_LVS_NAME_MAX)) {
60 			name_conflict = true;
61 			break;
62 		}
63 	}
64 	if (!name_conflict) {
65 		lvs->on_list = true;
66 		TAILQ_INSERT_TAIL(&g_lvol_stores, lvs, link);
67 	}
68 	pthread_mutex_unlock(&g_lvol_stores_mutex);
69 
70 	return name_conflict ? -1 : 0;
71 }
72 
73 static void
74 _spdk_lvs_free(struct spdk_lvol_store *lvs)
75 {
76 	pthread_mutex_lock(&g_lvol_stores_mutex);
77 	if (lvs->on_list) {
78 		TAILQ_REMOVE(&g_lvol_stores, lvs, link);
79 	}
80 	pthread_mutex_unlock(&g_lvol_stores_mutex);
81 
82 	free(lvs);
83 }
84 
/*
 * Release the memory of an lvol structure. The lvol is not unlinked from any
 * list here; callers do that themselves where needed.
 */
static void
_spdk_lvol_free(struct spdk_lvol *lvol)
{
	free(lvol);
}
90 
91 static void
92 _spdk_lvol_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
93 {
94 	struct spdk_lvol_with_handle_req *req = cb_arg;
95 	struct spdk_lvol *lvol = req->lvol;
96 
97 	if (lvolerrno != 0) {
98 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Failed to open lvol %s\n", lvol->unique_id);
99 		goto end;
100 	}
101 
102 	lvol->ref_count++;
103 	lvol->blob = blob;
104 end:
105 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
106 	free(req);
107 }
108 
109 void
110 spdk_lvol_open(struct spdk_lvol *lvol, spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
111 {
112 	struct spdk_lvol_with_handle_req *req;
113 	struct spdk_blob_open_opts opts;
114 
115 	assert(cb_fn != NULL);
116 
117 	if (lvol == NULL) {
118 		SPDK_ERRLOG("lvol does not exist\n");
119 		cb_fn(cb_arg, NULL, -ENODEV);
120 		return;
121 	}
122 
123 	if (lvol->action_in_progress == true) {
124 		SPDK_ERRLOG("Cannot open lvol - operations on lvol pending\n");
125 		cb_fn(cb_arg, lvol, -EBUSY);
126 		return;
127 	}
128 
129 	if (lvol->ref_count > 0) {
130 		lvol->ref_count++;
131 		cb_fn(cb_arg, lvol, 0);
132 		return;
133 	}
134 
135 	req = calloc(1, sizeof(*req));
136 	if (req == NULL) {
137 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
138 		cb_fn(cb_arg, NULL, -ENOMEM);
139 		return;
140 	}
141 
142 	req->cb_fn = cb_fn;
143 	req->cb_arg = cb_arg;
144 	req->lvol = lvol;
145 
146 	spdk_blob_open_opts_init(&opts);
147 	opts.clear_method = lvol->clear_method;
148 
149 	spdk_bs_open_blob_ext(lvol->lvol_store->blobstore, lvol->blob_id, &opts, _spdk_lvol_open_cb, req);
150 }
151 
152 static void
153 _spdk_bs_unload_with_error_cb(void *cb_arg, int lvolerrno)
154 {
155 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
156 
157 	req->cb_fn(req->cb_arg, NULL, req->lvserrno);
158 	free(req);
159 }
160 
161 static void
162 _spdk_load_next_lvol(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
163 {
164 	struct spdk_lvs_with_handle_req *req = cb_arg;
165 	struct spdk_lvol_store *lvs = req->lvol_store;
166 	struct spdk_blob_store *bs = lvs->blobstore;
167 	struct spdk_lvol *lvol, *tmp;
168 	spdk_blob_id blob_id;
169 	const char *attr;
170 	const enum blob_clear_method *clear_method;
171 	size_t value_len;
172 	int rc;
173 
174 	if (lvolerrno == -ENOENT) {
175 		/* Finished iterating */
176 		req->cb_fn(req->cb_arg, lvs, 0);
177 		free(req);
178 		return;
179 	} else if (lvolerrno < 0) {
180 		SPDK_ERRLOG("Failed to fetch blobs list\n");
181 		req->lvserrno = lvolerrno;
182 		goto invalid;
183 	}
184 
185 	blob_id = spdk_blob_get_id(blob);
186 
187 	if (blob_id == lvs->super_blob_id) {
188 		SPDK_INFOLOG(SPDK_LOG_LVOL, "found superblob %"PRIu64"\n", (uint64_t)blob_id);
189 		spdk_bs_iter_next(bs, blob, _spdk_load_next_lvol, req);
190 		return;
191 	}
192 
193 	lvol = calloc(1, sizeof(*lvol));
194 	if (!lvol) {
195 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
196 		req->lvserrno = -ENOMEM;
197 		goto invalid;
198 	}
199 
200 	lvol->blob = blob;
201 	lvol->blob_id = blob_id;
202 	lvol->lvol_store = lvs;
203 
204 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
205 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0' ||
206 	    spdk_uuid_parse(&lvol->uuid, attr) != 0) {
207 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Missing or corrupt lvol uuid\n");
208 		memset(&lvol->uuid, 0, sizeof(lvol->uuid));
209 	}
210 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
211 
212 	if (!spdk_mem_all_zero(&lvol->uuid, sizeof(lvol->uuid))) {
213 		snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
214 	} else {
215 		spdk_uuid_fmt_lower(lvol->unique_id, sizeof(lvol->unique_id), &lvol->lvol_store->uuid);
216 		value_len = strlen(lvol->unique_id);
217 		snprintf(lvol->unique_id + value_len, sizeof(lvol->unique_id) - value_len, "_%"PRIu64,
218 			 (uint64_t)blob_id);
219 	}
220 
221 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
222 	if (rc != 0 || value_len > SPDK_LVOL_NAME_MAX) {
223 		SPDK_ERRLOG("Cannot assign lvol name\n");
224 		_spdk_lvol_free(lvol);
225 		req->lvserrno = -EINVAL;
226 		goto invalid;
227 	}
228 
229 	rc = spdk_blob_get_xattr_value(blob, "clear_method", (const void **)&clear_method, &value_len);
230 	if (rc != 0) {
231 		lvol->clear_method = BLOB_CLEAR_WITH_DEFAULT;
232 	} else {
233 		lvol->clear_method = *clear_method;
234 	}
235 
236 	snprintf(lvol->name, sizeof(lvol->name), "%s", attr);
237 
238 	TAILQ_INSERT_TAIL(&lvs->lvols, lvol, link);
239 
240 	lvs->lvol_count++;
241 
242 	SPDK_INFOLOG(SPDK_LOG_LVOL, "added lvol %s (%s)\n", lvol->unique_id, lvol->uuid_str);
243 
244 	spdk_bs_iter_next(bs, blob, _spdk_load_next_lvol, req);
245 
246 	return;
247 
248 invalid:
249 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
250 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
251 		free(lvol);
252 	}
253 
254 	_spdk_lvs_free(lvs);
255 	spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
256 }
257 
258 static void
259 _spdk_close_super_cb(void *cb_arg, int lvolerrno)
260 {
261 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
262 	struct spdk_lvol_store *lvs = req->lvol_store;
263 	struct spdk_blob_store *bs = lvs->blobstore;
264 
265 	if (lvolerrno != 0) {
266 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Could not close super blob\n");
267 		_spdk_lvs_free(lvs);
268 		req->lvserrno = -ENODEV;
269 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
270 		return;
271 	}
272 
273 	/* Start loading lvols */
274 	spdk_bs_iter_first(lvs->blobstore, _spdk_load_next_lvol, req);
275 }
276 
277 static void
278 _spdk_close_super_blob_with_error_cb(void *cb_arg, int lvolerrno)
279 {
280 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
281 	struct spdk_lvol_store *lvs = req->lvol_store;
282 	struct spdk_blob_store *bs = lvs->blobstore;
283 
284 	_spdk_lvs_free(lvs);
285 
286 	spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
287 }
288 
289 static void
290 _spdk_lvs_read_uuid(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
291 {
292 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
293 	struct spdk_lvol_store *lvs = req->lvol_store;
294 	struct spdk_blob_store *bs = lvs->blobstore;
295 	const char *attr;
296 	size_t value_len;
297 	int rc;
298 
299 	if (lvolerrno != 0) {
300 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Could not open super blob\n");
301 		_spdk_lvs_free(lvs);
302 		req->lvserrno = -ENODEV;
303 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
304 		return;
305 	}
306 
307 	rc = spdk_blob_get_xattr_value(blob, "uuid", (const void **)&attr, &value_len);
308 	if (rc != 0 || value_len != SPDK_UUID_STRING_LEN || attr[SPDK_UUID_STRING_LEN - 1] != '\0') {
309 		SPDK_INFOLOG(SPDK_LOG_LVOL, "missing or incorrect UUID\n");
310 		req->lvserrno = -EINVAL;
311 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
312 		return;
313 	}
314 
315 	if (spdk_uuid_parse(&lvs->uuid, attr)) {
316 		SPDK_INFOLOG(SPDK_LOG_LVOL, "incorrect UUID '%s'\n", attr);
317 		req->lvserrno = -EINVAL;
318 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
319 		return;
320 	}
321 
322 	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&attr, &value_len);
323 	if (rc != 0 || value_len > SPDK_LVS_NAME_MAX) {
324 		SPDK_INFOLOG(SPDK_LOG_LVOL, "missing or invalid name\n");
325 		req->lvserrno = -EINVAL;
326 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
327 		return;
328 	}
329 
330 	snprintf(lvs->name, sizeof(lvs->name), "%s", attr);
331 
332 	rc = _spdk_add_lvs_to_list(lvs);
333 	if (rc) {
334 		SPDK_INFOLOG(SPDK_LOG_LVOL, "lvolstore with name %s already exists\n", lvs->name);
335 		req->lvserrno = -EEXIST;
336 		spdk_blob_close(blob, _spdk_close_super_blob_with_error_cb, req);
337 		return;
338 	}
339 
340 	lvs->super_blob_id = spdk_blob_get_id(blob);
341 
342 	spdk_blob_close(blob, _spdk_close_super_cb, req);
343 }
344 
345 static void
346 _spdk_lvs_open_super(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
347 {
348 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
349 	struct spdk_lvol_store *lvs = req->lvol_store;
350 	struct spdk_blob_store *bs = lvs->blobstore;
351 
352 	if (lvolerrno != 0) {
353 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Super blob not found\n");
354 		_spdk_lvs_free(lvs);
355 		req->lvserrno = -ENODEV;
356 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
357 		return;
358 	}
359 
360 	spdk_bs_open_blob(bs, blobid, _spdk_lvs_read_uuid, req);
361 }
362 
363 static void
364 _spdk_lvs_load_cb(void *cb_arg, struct spdk_blob_store *bs, int lvolerrno)
365 {
366 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)cb_arg;
367 	struct spdk_lvol_store *lvs;
368 
369 	if (lvolerrno != 0) {
370 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
371 		free(req);
372 		return;
373 	}
374 
375 	lvs = calloc(1, sizeof(*lvs));
376 	if (lvs == NULL) {
377 		SPDK_ERRLOG("Cannot alloc memory for lvol store\n");
378 		spdk_bs_unload(bs, _spdk_bs_unload_with_error_cb, req);
379 		return;
380 	}
381 
382 	lvs->blobstore = bs;
383 	lvs->bs_dev = req->bs_dev;
384 	TAILQ_INIT(&lvs->lvols);
385 	TAILQ_INIT(&lvs->pending_lvols);
386 
387 	req->lvol_store = lvs;
388 
389 	spdk_bs_get_super(bs, _spdk_lvs_open_super, req);
390 }
391 
392 static void
393 spdk_lvs_bs_opts_init(struct spdk_bs_opts *opts)
394 {
395 	spdk_bs_opts_init(opts);
396 	opts->max_channel_ops = SPDK_LVOL_BLOB_OPTS_CHANNEL_OPS;
397 }
398 
399 void
400 spdk_lvs_load(struct spdk_bs_dev *bs_dev, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
401 {
402 	struct spdk_lvs_with_handle_req *req;
403 	struct spdk_bs_opts opts = {};
404 
405 	assert(cb_fn != NULL);
406 
407 	if (bs_dev == NULL) {
408 		SPDK_ERRLOG("Blobstore device does not exist\n");
409 		cb_fn(cb_arg, NULL, -ENODEV);
410 		return;
411 	}
412 
413 	req = calloc(1, sizeof(*req));
414 	if (req == NULL) {
415 		SPDK_ERRLOG("Cannot alloc memory for request structure\n");
416 		cb_fn(cb_arg, NULL, -ENOMEM);
417 		return;
418 	}
419 
420 	req->cb_fn = cb_fn;
421 	req->cb_arg = cb_arg;
422 	req->bs_dev = bs_dev;
423 
424 	spdk_lvs_bs_opts_init(&opts);
425 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
426 
427 	spdk_bs_load(bs_dev, &opts, _spdk_lvs_load_cb, req);
428 }
429 
430 static void
431 _spdk_super_create_close_cb(void *cb_arg, int lvolerrno)
432 {
433 	struct spdk_lvs_with_handle_req *req = cb_arg;
434 	struct spdk_lvol_store *lvs = req->lvol_store;
435 
436 	if (lvolerrno < 0) {
437 		SPDK_ERRLOG("Lvol store init failed: could not close super blob\n");
438 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
439 		_spdk_lvs_free(lvs);
440 		free(req);
441 		return;
442 	}
443 
444 	req->cb_fn(req->cb_arg, lvs, lvolerrno);
445 	free(req);
446 }
447 
448 static void
449 _spdk_super_blob_set_cb(void *cb_arg, int lvolerrno)
450 {
451 	struct spdk_lvs_with_handle_req *req = cb_arg;
452 	struct spdk_lvol_store *lvs = req->lvol_store;
453 	struct spdk_blob *blob = lvs->super_blob;
454 
455 	if (lvolerrno < 0) {
456 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
457 		SPDK_ERRLOG("Lvol store init failed: could not set uuid for super blob\n");
458 		_spdk_lvs_free(lvs);
459 		free(req);
460 		return;
461 	}
462 
463 	spdk_blob_close(blob, _spdk_super_create_close_cb, req);
464 }
465 
466 static void
467 _spdk_super_blob_init_cb(void *cb_arg, int lvolerrno)
468 {
469 	struct spdk_lvs_with_handle_req *req = cb_arg;
470 	struct spdk_lvol_store *lvs = req->lvol_store;
471 	struct spdk_blob *blob = lvs->super_blob;
472 	char uuid[SPDK_UUID_STRING_LEN];
473 
474 	if (lvolerrno < 0) {
475 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
476 		SPDK_ERRLOG("Lvol store init failed: could not set super blob\n");
477 		_spdk_lvs_free(lvs);
478 		free(req);
479 		return;
480 	}
481 
482 	spdk_uuid_fmt_lower(uuid, sizeof(uuid), &lvs->uuid);
483 
484 	spdk_blob_set_xattr(blob, "uuid", uuid, sizeof(uuid));
485 	spdk_blob_set_xattr(blob, "name", lvs->name, strnlen(lvs->name, SPDK_LVS_NAME_MAX) + 1);
486 	spdk_blob_sync_md(blob, _spdk_super_blob_set_cb, req);
487 }
488 
489 static void
490 _spdk_super_blob_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
491 {
492 	struct spdk_lvs_with_handle_req *req = cb_arg;
493 	struct spdk_lvol_store *lvs = req->lvol_store;
494 
495 	if (lvolerrno < 0) {
496 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
497 		SPDK_ERRLOG("Lvol store init failed: could not open super blob\n");
498 		_spdk_lvs_free(lvs);
499 		free(req);
500 		return;
501 	}
502 
503 	lvs->super_blob = blob;
504 	lvs->super_blob_id = spdk_blob_get_id(blob);
505 
506 	spdk_bs_set_super(lvs->blobstore, lvs->super_blob_id, _spdk_super_blob_init_cb, req);
507 }
508 
509 static void
510 _spdk_super_blob_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
511 {
512 	struct spdk_lvs_with_handle_req *req = cb_arg;
513 	struct spdk_lvol_store *lvs = req->lvol_store;
514 	struct spdk_blob_store *bs;
515 
516 	if (lvolerrno < 0) {
517 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
518 		SPDK_ERRLOG("Lvol store init failed: could not create super blob\n");
519 		_spdk_lvs_free(lvs);
520 		free(req);
521 		return;
522 	}
523 
524 	bs = req->lvol_store->blobstore;
525 
526 	spdk_bs_open_blob(bs, blobid, _spdk_super_blob_create_open_cb, req);
527 }
528 
529 static void
530 _spdk_lvs_init_cb(void *cb_arg, struct spdk_blob_store *bs, int lvserrno)
531 {
532 	struct spdk_lvs_with_handle_req *lvs_req = cb_arg;
533 	struct spdk_lvol_store *lvs = lvs_req->lvol_store;
534 
535 	if (lvserrno != 0) {
536 		assert(bs == NULL);
537 		lvs_req->cb_fn(lvs_req->cb_arg, NULL, lvserrno);
538 		SPDK_ERRLOG("Lvol store init failed: could not initialize blobstore\n");
539 		_spdk_lvs_free(lvs);
540 		free(lvs_req);
541 		return;
542 	}
543 
544 	assert(bs != NULL);
545 	lvs->blobstore = bs;
546 	TAILQ_INIT(&lvs->lvols);
547 	TAILQ_INIT(&lvs->pending_lvols);
548 
549 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store initialized\n");
550 
551 	/* create super blob */
552 	spdk_bs_create_blob(lvs->blobstore, _spdk_super_blob_create_cb, lvs_req);
553 }
554 
555 void
556 spdk_lvs_opts_init(struct spdk_lvs_opts *o)
557 {
558 	o->cluster_sz = SPDK_LVS_OPTS_CLUSTER_SZ;
559 	memset(o->name, 0, sizeof(o->name));
560 }
561 
562 static void
563 _spdk_setup_lvs_opts(struct spdk_bs_opts *bs_opts, struct spdk_lvs_opts *o)
564 {
565 	assert(o != NULL);
566 	spdk_lvs_bs_opts_init(bs_opts);
567 	bs_opts->cluster_sz = o->cluster_sz;
568 }
569 
570 int
571 spdk_lvs_init(struct spdk_bs_dev *bs_dev, struct spdk_lvs_opts *o,
572 	      spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
573 {
574 	struct spdk_lvol_store *lvs;
575 	struct spdk_lvs_with_handle_req *lvs_req;
576 	struct spdk_bs_opts opts = {};
577 	int rc;
578 
579 	if (bs_dev == NULL) {
580 		SPDK_ERRLOG("Blobstore device does not exist\n");
581 		return -ENODEV;
582 	}
583 
584 	if (o == NULL) {
585 		SPDK_ERRLOG("spdk_lvs_opts not specified\n");
586 		return -EINVAL;
587 	}
588 
589 	_spdk_setup_lvs_opts(&opts, o);
590 
591 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == SPDK_LVS_NAME_MAX) {
592 		SPDK_ERRLOG("Name has no null terminator.\n");
593 		return -EINVAL;
594 	}
595 
596 	if (strnlen(o->name, SPDK_LVS_NAME_MAX) == 0) {
597 		SPDK_ERRLOG("No name specified.\n");
598 		return -EINVAL;
599 	}
600 
601 	lvs = calloc(1, sizeof(*lvs));
602 	if (!lvs) {
603 		SPDK_ERRLOG("Cannot alloc memory for lvol store base pointer\n");
604 		return -ENOMEM;
605 	}
606 
607 	spdk_uuid_generate(&lvs->uuid);
608 	snprintf(lvs->name, sizeof(lvs->name), "%s", o->name);
609 
610 	rc = _spdk_add_lvs_to_list(lvs);
611 	if (rc) {
612 		SPDK_ERRLOG("lvolstore with name %s already exists\n", lvs->name);
613 		_spdk_lvs_free(lvs);
614 		return -EEXIST;
615 	}
616 
617 	lvs_req = calloc(1, sizeof(*lvs_req));
618 	if (!lvs_req) {
619 		_spdk_lvs_free(lvs);
620 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
621 		return -ENOMEM;
622 	}
623 
624 	assert(cb_fn != NULL);
625 	lvs_req->cb_fn = cb_fn;
626 	lvs_req->cb_arg = cb_arg;
627 	lvs_req->lvol_store = lvs;
628 	lvs->bs_dev = bs_dev;
629 	lvs->destruct = false;
630 
631 	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "LVOLSTORE");
632 
633 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Initializing lvol store\n");
634 	spdk_bs_init(bs_dev, &opts, _spdk_lvs_init_cb, lvs_req);
635 
636 	return 0;
637 }
638 
639 static void
640 _spdk_lvs_rename_cb(void *cb_arg, int lvolerrno)
641 {
642 	struct spdk_lvs_req *req = cb_arg;
643 
644 	if (lvolerrno != 0) {
645 		req->lvserrno = lvolerrno;
646 	}
647 	if (req->lvserrno != 0) {
648 		SPDK_ERRLOG("Lvol store rename operation failed\n");
649 		/* Lvs renaming failed, so we should 'clear' new_name.
650 		 * Otherwise it could cause a failure on the next attepmt to change the name to 'new_name'  */
651 		snprintf(req->lvol_store->new_name,
652 			 sizeof(req->lvol_store->new_name),
653 			 "%s", req->lvol_store->name);
654 	} else {
655 		/* Update lvs name with new_name */
656 		snprintf(req->lvol_store->name,
657 			 sizeof(req->lvol_store->name),
658 			 "%s", req->lvol_store->new_name);
659 	}
660 
661 	req->cb_fn(req->cb_arg, req->lvserrno);
662 	free(req);
663 }
664 
665 static void
666 _spdk_lvs_rename_sync_cb(void *cb_arg, int lvolerrno)
667 {
668 	struct spdk_lvs_req *req = cb_arg;
669 	struct spdk_blob *blob = req->lvol_store->super_blob;
670 
671 	if (lvolerrno < 0) {
672 		req->lvserrno = lvolerrno;
673 	}
674 
675 	spdk_blob_close(blob, _spdk_lvs_rename_cb, req);
676 }
677 
678 static void
679 _spdk_lvs_rename_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
680 {
681 	struct spdk_lvs_req *req = cb_arg;
682 	int rc;
683 
684 	if (lvolerrno < 0) {
685 		_spdk_lvs_rename_cb(cb_arg, lvolerrno);
686 		return;
687 	}
688 
689 	rc = spdk_blob_set_xattr(blob, "name", req->lvol_store->new_name,
690 				 strlen(req->lvol_store->new_name) + 1);
691 	if (rc < 0) {
692 		req->lvserrno = rc;
693 		_spdk_lvs_rename_sync_cb(req, rc);
694 		return;
695 	}
696 
697 	req->lvol_store->super_blob = blob;
698 
699 	spdk_blob_sync_md(blob, _spdk_lvs_rename_sync_cb, req);
700 }
701 
702 void
703 spdk_lvs_rename(struct spdk_lvol_store *lvs, const char *new_name,
704 		spdk_lvs_op_complete cb_fn, void *cb_arg)
705 {
706 	struct spdk_lvs_req *req;
707 	struct spdk_lvol_store *tmp;
708 
709 	/* Check if new name is current lvs name.
710 	 * If so, return success immediately */
711 	if (strncmp(lvs->name, new_name, SPDK_LVS_NAME_MAX) == 0) {
712 		cb_fn(cb_arg, 0);
713 		return;
714 	}
715 
716 	/* Check if new or new_name is already used in other lvs */
717 	pthread_mutex_lock(&g_lvol_stores_mutex);
718 	TAILQ_FOREACH(tmp, &g_lvol_stores, link) {
719 		if (!strncmp(new_name, tmp->name, SPDK_LVS_NAME_MAX) ||
720 		    !strncmp(new_name, tmp->new_name, SPDK_LVS_NAME_MAX)) {
721 			pthread_mutex_unlock(&g_lvol_stores_mutex);
722 			cb_fn(cb_arg, -EEXIST);
723 			return;
724 		}
725 	}
726 	pthread_mutex_unlock(&g_lvol_stores_mutex);
727 
728 	req = calloc(1, sizeof(*req));
729 	if (!req) {
730 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
731 		cb_fn(cb_arg, -ENOMEM);
732 		return;
733 	}
734 	snprintf(lvs->new_name, sizeof(lvs->new_name), "%s", new_name);
735 	req->lvol_store = lvs;
736 	req->cb_fn = cb_fn;
737 	req->cb_arg = cb_arg;
738 
739 	spdk_bs_open_blob(lvs->blobstore, lvs->super_blob_id, _spdk_lvs_rename_open_cb, req);
740 }
741 
742 static void
743 _lvs_unload_cb(void *cb_arg, int lvserrno)
744 {
745 	struct spdk_lvs_req *lvs_req = cb_arg;
746 
747 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store unloaded\n");
748 	assert(lvs_req->cb_fn != NULL);
749 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
750 	free(lvs_req);
751 }
752 
753 int
754 spdk_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
755 		void *cb_arg)
756 {
757 	struct spdk_lvs_req *lvs_req;
758 	struct spdk_lvol *lvol, *tmp;
759 
760 	if (lvs == NULL) {
761 		SPDK_ERRLOG("Lvol store is NULL\n");
762 		return -ENODEV;
763 	}
764 
765 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
766 		if (lvol->action_in_progress == true) {
767 			SPDK_ERRLOG("Cannot unload lvol store - operations on lvols pending\n");
768 			cb_fn(cb_arg, -EBUSY);
769 			return -EBUSY;
770 		} else if (lvol->ref_count != 0) {
771 			SPDK_ERRLOG("Lvols still open on lvol store\n");
772 			cb_fn(cb_arg, -EBUSY);
773 			return -EBUSY;
774 		}
775 	}
776 
777 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
778 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
779 		_spdk_lvol_free(lvol);
780 	}
781 
782 	lvs_req = calloc(1, sizeof(*lvs_req));
783 	if (!lvs_req) {
784 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
785 		return -ENOMEM;
786 	}
787 
788 	lvs_req->cb_fn = cb_fn;
789 	lvs_req->cb_arg = cb_arg;
790 
791 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Unloading lvol store\n");
792 	spdk_bs_unload(lvs->blobstore, _lvs_unload_cb, lvs_req);
793 	_spdk_lvs_free(lvs);
794 
795 	return 0;
796 }
797 
798 static void
799 _lvs_destroy_cb(void *cb_arg, int lvserrno)
800 {
801 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
802 
803 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol store destroyed\n");
804 	assert(lvs_req->cb_fn != NULL);
805 	lvs_req->cb_fn(lvs_req->cb_arg, lvserrno);
806 	free(lvs_req);
807 }
808 
809 static void
810 _lvs_destroy_super_cb(void *cb_arg, int bserrno)
811 {
812 	struct spdk_lvs_destroy_req *lvs_req = cb_arg;
813 	struct spdk_lvol_store *lvs = lvs_req->lvs;
814 
815 	assert(lvs != NULL);
816 
817 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Destroying lvol store\n");
818 	spdk_bs_destroy(lvs->blobstore, _lvs_destroy_cb, lvs_req);
819 	_spdk_lvs_free(lvs);
820 }
821 
822 int
823 spdk_lvs_destroy(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn,
824 		 void *cb_arg)
825 {
826 	struct spdk_lvs_destroy_req *lvs_req;
827 	struct spdk_lvol *iter_lvol, *tmp;
828 
829 	if (lvs == NULL) {
830 		SPDK_ERRLOG("Lvol store is NULL\n");
831 		return -ENODEV;
832 	}
833 
834 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
835 		if (iter_lvol->action_in_progress == true) {
836 			SPDK_ERRLOG("Cannot destroy lvol store - operations on lvols pending\n");
837 			cb_fn(cb_arg, -EBUSY);
838 			return -EBUSY;
839 		} else if (iter_lvol->ref_count != 0) {
840 			SPDK_ERRLOG("Lvols still open on lvol store\n");
841 			cb_fn(cb_arg, -EBUSY);
842 			return -EBUSY;
843 		}
844 	}
845 
846 	TAILQ_FOREACH_SAFE(iter_lvol, &lvs->lvols, link, tmp) {
847 		free(iter_lvol);
848 	}
849 
850 	lvs_req = calloc(1, sizeof(*lvs_req));
851 	if (!lvs_req) {
852 		SPDK_ERRLOG("Cannot alloc memory for lvol store request pointer\n");
853 		return -ENOMEM;
854 	}
855 
856 	lvs_req->cb_fn = cb_fn;
857 	lvs_req->cb_arg = cb_arg;
858 	lvs_req->lvs = lvs;
859 
860 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Deleting super blob\n");
861 	spdk_bs_delete_blob(lvs->blobstore, lvs->super_blob_id, _lvs_destroy_super_cb, lvs_req);
862 
863 	return 0;
864 }
865 
866 static void
867 _spdk_lvol_close_blob_cb(void *cb_arg, int lvolerrno)
868 {
869 	struct spdk_lvol_req *req = cb_arg;
870 	struct spdk_lvol *lvol = req->lvol;
871 
872 	if (lvolerrno < 0) {
873 		SPDK_ERRLOG("Could not close blob on lvol\n");
874 		_spdk_lvol_free(lvol);
875 		goto end;
876 	}
877 
878 	lvol->ref_count--;
879 	lvol->action_in_progress = false;
880 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol %s closed\n", lvol->unique_id);
881 
882 end:
883 	req->cb_fn(req->cb_arg, lvolerrno);
884 	free(req);
885 }
886 
887 bool
888 spdk_lvol_deletable(struct spdk_lvol *lvol)
889 {
890 	size_t count;
891 
892 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
893 	return (count == 0);
894 }
895 
896 static void
897 _spdk_lvol_delete_blob_cb(void *cb_arg, int lvolerrno)
898 {
899 	struct spdk_lvol_req *req = cb_arg;
900 	struct spdk_lvol *lvol = req->lvol;
901 
902 	if (lvolerrno < 0) {
903 		SPDK_ERRLOG("Could not delete blob on lvol\n");
904 		goto end;
905 	}
906 
907 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
908 	SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol %s deleted\n", lvol->unique_id);
909 
910 end:
911 	_spdk_lvol_free(lvol);
912 	req->cb_fn(req->cb_arg, lvolerrno);
913 	free(req);
914 }
915 
916 static void
917 _spdk_lvol_create_open_cb(void *cb_arg, struct spdk_blob *blob, int lvolerrno)
918 {
919 	struct spdk_lvol_with_handle_req *req = cb_arg;
920 	spdk_blob_id blob_id = spdk_blob_get_id(blob);
921 	struct spdk_lvol *lvol = req->lvol;
922 
923 	TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
924 
925 	if (lvolerrno < 0) {
926 		free(lvol);
927 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
928 		free(req);
929 		return;
930 	}
931 
932 	lvol->blob = blob;
933 	lvol->blob_id = blob_id;
934 
935 	TAILQ_INSERT_TAIL(&lvol->lvol_store->lvols, lvol, link);
936 
937 	snprintf(lvol->unique_id, sizeof(lvol->unique_id), "%s", lvol->uuid_str);
938 	lvol->ref_count++;
939 
940 	assert(req->cb_fn != NULL);
941 	req->cb_fn(req->cb_arg, req->lvol, lvolerrno);
942 	free(req);
943 }
944 
945 static void
946 _spdk_lvol_create_cb(void *cb_arg, spdk_blob_id blobid, int lvolerrno)
947 {
948 	struct spdk_lvol_with_handle_req *req = cb_arg;
949 	struct spdk_blob_store *bs;
950 	struct spdk_blob_open_opts opts;
951 
952 	if (lvolerrno < 0) {
953 		TAILQ_REMOVE(&req->lvol->lvol_store->pending_lvols, req->lvol, link);
954 		free(req->lvol);
955 		assert(req->cb_fn != NULL);
956 		req->cb_fn(req->cb_arg, NULL, lvolerrno);
957 		free(req);
958 		return;
959 	}
960 
961 	spdk_blob_open_opts_init(&opts);
962 	opts.clear_method = req->lvol->clear_method;
963 	bs = req->lvol->lvol_store->blobstore;
964 
965 	spdk_bs_open_blob_ext(bs, blobid, &opts, _spdk_lvol_create_open_cb, req);
966 }
967 
968 static void
969 spdk_lvol_get_xattr_value(void *xattr_ctx, const char *name,
970 			  const void **value, size_t *value_len)
971 {
972 	struct spdk_lvol *lvol = xattr_ctx;
973 
974 	if (!strcmp(LVOL_NAME, name)) {
975 		*value = lvol->name;
976 		*value_len = SPDK_LVOL_NAME_MAX;
977 	} else if (!strcmp("uuid", name)) {
978 		*value = lvol->uuid_str;
979 		*value_len = sizeof(lvol->uuid_str);
980 	} else if (!strcmp("clear_method", name)) {
981 		*value = &lvol->clear_method;
982 		*value_len = sizeof(int);
983 	}
984 }
985 
986 static int
987 _spdk_lvs_verify_lvol_name(struct spdk_lvol_store *lvs, const char *name)
988 {
989 	struct spdk_lvol *tmp;
990 
991 	if (name == NULL || strnlen(name, SPDK_LVOL_NAME_MAX) == 0) {
992 		SPDK_INFOLOG(SPDK_LOG_LVOL, "lvol name not provided.\n");
993 		return -EINVAL;
994 	}
995 
996 	if (strnlen(name, SPDK_LVOL_NAME_MAX) == SPDK_LVOL_NAME_MAX) {
997 		SPDK_ERRLOG("Name has no null terminator.\n");
998 		return -EINVAL;
999 	}
1000 
1001 	TAILQ_FOREACH(tmp, &lvs->lvols, link) {
1002 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1003 			SPDK_ERRLOG("lvol with name %s already exists\n", name);
1004 			return -EEXIST;
1005 		}
1006 	}
1007 
1008 	TAILQ_FOREACH(tmp, &lvs->pending_lvols, link) {
1009 		if (!strncmp(name, tmp->name, SPDK_LVOL_NAME_MAX)) {
1010 			SPDK_ERRLOG("lvol with name %s is being already created\n", name);
1011 			return -EEXIST;
1012 		}
1013 	}
1014 
1015 	return 0;
1016 }
1017 
1018 int
1019 spdk_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1020 		 bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1021 		 void *cb_arg)
1022 {
1023 	struct spdk_lvol_with_handle_req *req;
1024 	struct spdk_blob_store *bs;
1025 	struct spdk_lvol *lvol;
1026 	struct spdk_blob_opts opts;
1027 	uint64_t num_clusters;
1028 	char *xattr_names[] = {LVOL_NAME, "uuid", "clear_method"};
1029 	int rc;
1030 
1031 	if (lvs == NULL) {
1032 		SPDK_ERRLOG("lvol store does not exist\n");
1033 		return -EINVAL;
1034 	}
1035 
1036 	rc = _spdk_lvs_verify_lvol_name(lvs, name);
1037 	if (rc < 0) {
1038 		return rc;
1039 	}
1040 
1041 	bs = lvs->blobstore;
1042 
1043 	req = calloc(1, sizeof(*req));
1044 	if (!req) {
1045 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1046 		return -ENOMEM;
1047 	}
1048 	req->cb_fn = cb_fn;
1049 	req->cb_arg = cb_arg;
1050 
1051 	lvol = calloc(1, sizeof(*lvol));
1052 	if (!lvol) {
1053 		free(req);
1054 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1055 		return -ENOMEM;
1056 	}
1057 	lvol->lvol_store = lvs;
1058 	num_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(bs));
1059 	lvol->thin_provision = thin_provision;
1060 	lvol->clear_method = (enum blob_clear_method)clear_method;
1061 	snprintf(lvol->name, sizeof(lvol->name), "%s", name);
1062 	TAILQ_INSERT_TAIL(&lvol->lvol_store->pending_lvols, lvol, link);
1063 	spdk_uuid_generate(&lvol->uuid);
1064 	spdk_uuid_fmt_lower(lvol->uuid_str, sizeof(lvol->uuid_str), &lvol->uuid);
1065 	req->lvol = lvol;
1066 
1067 	spdk_blob_opts_init(&opts);
1068 	opts.thin_provision = thin_provision;
1069 	opts.num_clusters = num_clusters;
1070 	opts.xattrs.count = SPDK_COUNTOF(xattr_names);
1071 	opts.xattrs.names = xattr_names;
1072 	opts.xattrs.ctx = lvol;
1073 	opts.xattrs.get_value = spdk_lvol_get_xattr_value;
1074 
1075 	spdk_bs_create_blob_ext(lvs->blobstore, &opts, _spdk_lvol_create_cb, req);
1076 
1077 	return 0;
1078 }
1079 
1080 void
1081 spdk_lvol_create_snapshot(struct spdk_lvol *origlvol, const char *snapshot_name,
1082 			  spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1083 {
1084 	struct spdk_lvol_store *lvs;
1085 	struct spdk_lvol *newlvol;
1086 	struct spdk_blob *origblob;
1087 	struct spdk_lvol_with_handle_req *req;
1088 	struct spdk_blob_xattr_opts snapshot_xattrs;
1089 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1090 	int rc;
1091 
1092 	if (origlvol == NULL) {
1093 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol not provided.\n");
1094 		cb_fn(cb_arg, NULL, -EINVAL);
1095 		return;
1096 	}
1097 
1098 	origblob = origlvol->blob;
1099 	lvs = origlvol->lvol_store;
1100 	if (lvs == NULL) {
1101 		SPDK_ERRLOG("lvol store does not exist\n");
1102 		cb_fn(cb_arg, NULL, -EINVAL);
1103 		return;
1104 	}
1105 
1106 	rc = _spdk_lvs_verify_lvol_name(lvs, snapshot_name);
1107 	if (rc < 0) {
1108 		cb_fn(cb_arg, NULL, rc);
1109 		return;
1110 	}
1111 
1112 	req = calloc(1, sizeof(*req));
1113 	if (!req) {
1114 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1115 		cb_fn(cb_arg, NULL, -ENOMEM);
1116 		return;
1117 	}
1118 
1119 	newlvol = calloc(1, sizeof(*newlvol));
1120 	if (!newlvol) {
1121 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1122 		free(req);
1123 		cb_fn(cb_arg, NULL, -ENOMEM);
1124 		return;
1125 	}
1126 
1127 	newlvol->lvol_store = origlvol->lvol_store;
1128 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", snapshot_name);
1129 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1130 	spdk_uuid_generate(&newlvol->uuid);
1131 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1132 	snapshot_xattrs.count = SPDK_COUNTOF(xattr_names);
1133 	snapshot_xattrs.ctx = newlvol;
1134 	snapshot_xattrs.names = xattr_names;
1135 	snapshot_xattrs.get_value = spdk_lvol_get_xattr_value;
1136 	req->lvol = newlvol;
1137 	req->cb_fn = cb_fn;
1138 	req->cb_arg = cb_arg;
1139 
1140 	spdk_bs_create_snapshot(lvs->blobstore, spdk_blob_get_id(origblob), &snapshot_xattrs,
1141 				_spdk_lvol_create_cb, req);
1142 }
1143 
1144 void
1145 spdk_lvol_create_clone(struct spdk_lvol *origlvol, const char *clone_name,
1146 		       spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1147 {
1148 	struct spdk_lvol *newlvol;
1149 	struct spdk_lvol_with_handle_req *req;
1150 	struct spdk_lvol_store *lvs;
1151 	struct spdk_blob *origblob;
1152 	struct spdk_blob_xattr_opts clone_xattrs;
1153 	char *xattr_names[] = {LVOL_NAME, "uuid"};
1154 	int rc;
1155 
1156 	if (origlvol == NULL) {
1157 		SPDK_INFOLOG(SPDK_LOG_LVOL, "Lvol not provided.\n");
1158 		cb_fn(cb_arg, NULL, -EINVAL);
1159 		return;
1160 	}
1161 
1162 	origblob = origlvol->blob;
1163 	lvs = origlvol->lvol_store;
1164 	if (lvs == NULL) {
1165 		SPDK_ERRLOG("lvol store does not exist\n");
1166 		cb_fn(cb_arg, NULL, -EINVAL);
1167 		return;
1168 	}
1169 
1170 	rc = _spdk_lvs_verify_lvol_name(lvs, clone_name);
1171 	if (rc < 0) {
1172 		cb_fn(cb_arg, NULL, rc);
1173 		return;
1174 	}
1175 
1176 	req = calloc(1, sizeof(*req));
1177 	if (!req) {
1178 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1179 		cb_fn(cb_arg, NULL, -ENOMEM);
1180 		return;
1181 	}
1182 
1183 	newlvol = calloc(1, sizeof(*newlvol));
1184 	if (!newlvol) {
1185 		SPDK_ERRLOG("Cannot alloc memory for lvol base pointer\n");
1186 		free(req);
1187 		cb_fn(cb_arg, NULL, -ENOMEM);
1188 		return;
1189 	}
1190 
1191 	newlvol->lvol_store = lvs;
1192 	snprintf(newlvol->name, sizeof(newlvol->name), "%s", clone_name);
1193 	TAILQ_INSERT_TAIL(&newlvol->lvol_store->pending_lvols, newlvol, link);
1194 	spdk_uuid_generate(&newlvol->uuid);
1195 	spdk_uuid_fmt_lower(newlvol->uuid_str, sizeof(newlvol->uuid_str), &newlvol->uuid);
1196 	clone_xattrs.count = SPDK_COUNTOF(xattr_names);
1197 	clone_xattrs.ctx = newlvol;
1198 	clone_xattrs.names = xattr_names;
1199 	clone_xattrs.get_value = spdk_lvol_get_xattr_value;
1200 	req->lvol = newlvol;
1201 	req->cb_fn = cb_fn;
1202 	req->cb_arg = cb_arg;
1203 
1204 	spdk_bs_create_clone(lvs->blobstore, spdk_blob_get_id(origblob), &clone_xattrs,
1205 			     _spdk_lvol_create_cb,
1206 			     req);
1207 }
1208 
1209 static void
1210 _spdk_lvol_resize_done(void *cb_arg, int lvolerrno)
1211 {
1212 	struct spdk_lvol_req *req = cb_arg;
1213 
1214 	req->cb_fn(req->cb_arg,  lvolerrno);
1215 	free(req);
1216 }
1217 
1218 static void
1219 _spdk_lvol_blob_resize_cb(void *cb_arg, int bserrno)
1220 {
1221 	struct spdk_lvol_req *req = cb_arg;
1222 	struct spdk_lvol *lvol = req->lvol;
1223 
1224 	if (bserrno != 0) {
1225 		req->cb_fn(req->cb_arg, bserrno);
1226 		free(req);
1227 		return;
1228 	}
1229 
1230 	spdk_blob_sync_md(lvol->blob, _spdk_lvol_resize_done, req);
1231 }
1232 
1233 void
1234 spdk_lvol_resize(struct spdk_lvol *lvol, uint64_t sz,
1235 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1236 {
1237 	struct spdk_blob *blob = lvol->blob;
1238 	struct spdk_lvol_store *lvs = lvol->lvol_store;
1239 	struct spdk_lvol_req *req;
1240 	uint64_t new_clusters = spdk_divide_round_up(sz, spdk_bs_get_cluster_size(lvs->blobstore));
1241 
1242 	req = calloc(1, sizeof(*req));
1243 	if (!req) {
1244 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1245 		cb_fn(cb_arg, -ENOMEM);
1246 		return;
1247 	}
1248 	req->cb_fn = cb_fn;
1249 	req->cb_arg = cb_arg;
1250 	req->lvol = lvol;
1251 
1252 	spdk_blob_resize(blob, new_clusters, _spdk_lvol_blob_resize_cb, req);
1253 }
1254 
1255 static void
1256 _spdk_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1257 {
1258 	struct spdk_lvol_req *req = cb_arg;
1259 
1260 	req->cb_fn(req->cb_arg, lvolerrno);
1261 	free(req);
1262 }
1263 
1264 void
1265 spdk_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1266 {
1267 	struct spdk_lvol_req *req;
1268 
1269 	req = calloc(1, sizeof(*req));
1270 	if (!req) {
1271 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1272 		cb_fn(cb_arg, -ENOMEM);
1273 		return;
1274 	}
1275 	req->cb_fn = cb_fn;
1276 	req->cb_arg = cb_arg;
1277 
1278 	spdk_blob_set_read_only(lvol->blob);
1279 	spdk_blob_sync_md(lvol->blob, _spdk_lvol_set_read_only_cb, req);
1280 }
1281 
1282 static void
1283 _spdk_lvol_rename_cb(void *cb_arg, int lvolerrno)
1284 {
1285 	struct spdk_lvol_req *req = cb_arg;
1286 
1287 	if (lvolerrno != 0) {
1288 		SPDK_ERRLOG("Lvol rename operation failed\n");
1289 	} else {
1290 		snprintf(req->lvol->name, sizeof(req->lvol->name), "%s", req->name);
1291 	}
1292 
1293 	req->cb_fn(req->cb_arg, lvolerrno);
1294 	free(req);
1295 }
1296 
1297 void
1298 spdk_lvol_rename(struct spdk_lvol *lvol, const char *new_name,
1299 		 spdk_lvol_op_complete cb_fn, void *cb_arg)
1300 {
1301 	struct spdk_lvol *tmp;
1302 	struct spdk_blob *blob = lvol->blob;
1303 	struct spdk_lvol_req *req;
1304 	int rc;
1305 
1306 	/* Check if new name is current lvol name.
1307 	 * If so, return success immediately */
1308 	if (strncmp(lvol->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1309 		cb_fn(cb_arg, 0);
1310 		return;
1311 	}
1312 
1313 	/* Check if lvol with 'new_name' already exists in lvolstore */
1314 	TAILQ_FOREACH(tmp, &lvol->lvol_store->lvols, link) {
1315 		if (strncmp(tmp->name, new_name, SPDK_LVOL_NAME_MAX) == 0) {
1316 			SPDK_ERRLOG("Lvol %s already exists in lvol store %s\n", new_name, lvol->lvol_store->name);
1317 			cb_fn(cb_arg, -EEXIST);
1318 			return;
1319 		}
1320 	}
1321 
1322 	req = calloc(1, sizeof(*req));
1323 	if (!req) {
1324 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1325 		cb_fn(cb_arg, -ENOMEM);
1326 		return;
1327 	}
1328 	req->cb_fn = cb_fn;
1329 	req->cb_arg = cb_arg;
1330 	req->lvol = lvol;
1331 	snprintf(req->name, sizeof(req->name), "%s", new_name);
1332 
1333 	rc = spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
1334 	if (rc < 0) {
1335 		free(req);
1336 		cb_fn(cb_arg, rc);
1337 		return;
1338 	}
1339 
1340 	spdk_blob_sync_md(blob, _spdk_lvol_rename_cb, req);
1341 }
1342 
1343 void
1344 spdk_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1345 {
1346 	struct spdk_lvol_req *req;
1347 	struct spdk_blob_store *bs;
1348 
1349 	assert(cb_fn != NULL);
1350 
1351 	if (lvol == NULL) {
1352 		SPDK_ERRLOG("lvol does not exist\n");
1353 		cb_fn(cb_arg, -ENODEV);
1354 		return;
1355 	}
1356 
1357 	if (lvol->ref_count != 0) {
1358 		SPDK_ERRLOG("Cannot destroy lvol %s because it is still open\n", lvol->unique_id);
1359 		cb_fn(cb_arg, -EBUSY);
1360 		return;
1361 	}
1362 
1363 	lvol->action_in_progress = true;
1364 
1365 	req = calloc(1, sizeof(*req));
1366 	if (!req) {
1367 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1368 		cb_fn(cb_arg, -ENOMEM);
1369 		return;
1370 	}
1371 
1372 	req->cb_fn = cb_fn;
1373 	req->cb_arg = cb_arg;
1374 	req->lvol = lvol;
1375 	bs = lvol->lvol_store->blobstore;
1376 
1377 	spdk_bs_delete_blob(bs, lvol->blob_id, _spdk_lvol_delete_blob_cb, req);
1378 }
1379 
1380 void
1381 spdk_lvol_close(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1382 {
1383 	struct spdk_lvol_req *req;
1384 
1385 	assert(cb_fn != NULL);
1386 
1387 	if (lvol == NULL) {
1388 		SPDK_ERRLOG("lvol does not exist\n");
1389 		cb_fn(cb_arg, -ENODEV);
1390 		return;
1391 	}
1392 
1393 	if (lvol->ref_count > 1) {
1394 		lvol->ref_count--;
1395 		cb_fn(cb_arg, 0);
1396 		return;
1397 	} else if (lvol->ref_count == 0) {
1398 		cb_fn(cb_arg, -EINVAL);
1399 		return;
1400 	}
1401 
1402 	lvol->action_in_progress = true;
1403 
1404 	req = calloc(1, sizeof(*req));
1405 	if (!req) {
1406 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1407 		cb_fn(cb_arg, -ENOMEM);
1408 		return;
1409 	}
1410 
1411 	req->cb_fn = cb_fn;
1412 	req->cb_arg = cb_arg;
1413 	req->lvol = lvol;
1414 
1415 	spdk_blob_close(lvol->blob, _spdk_lvol_close_blob_cb, req);
1416 }
1417 
1418 struct spdk_io_channel *
1419 spdk_lvol_get_io_channel(struct spdk_lvol *lvol)
1420 {
1421 	return spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1422 }
1423 
1424 static void
1425 _spdk_lvol_inflate_cb(void *cb_arg, int lvolerrno)
1426 {
1427 	struct spdk_lvol_req *req = cb_arg;
1428 
1429 	spdk_bs_free_io_channel(req->channel);
1430 
1431 	if (lvolerrno < 0) {
1432 		SPDK_ERRLOG("Could not inflate lvol\n");
1433 	}
1434 
1435 	req->cb_fn(req->cb_arg, lvolerrno);
1436 	free(req);
1437 }
1438 
1439 void
1440 spdk_lvol_inflate(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1441 {
1442 	struct spdk_lvol_req *req;
1443 	spdk_blob_id blob_id;
1444 
1445 	assert(cb_fn != NULL);
1446 
1447 	if (lvol == NULL) {
1448 		SPDK_ERRLOG("Lvol does not exist\n");
1449 		cb_fn(cb_arg, -ENODEV);
1450 		return;
1451 	}
1452 
1453 	req = calloc(1, sizeof(*req));
1454 	if (!req) {
1455 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1456 		cb_fn(cb_arg, -ENOMEM);
1457 		return;
1458 	}
1459 
1460 	req->cb_fn = cb_fn;
1461 	req->cb_arg = cb_arg;
1462 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1463 	if (req->channel == NULL) {
1464 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1465 		free(req);
1466 		cb_fn(cb_arg, -ENOMEM);
1467 		return;
1468 	}
1469 
1470 	blob_id = spdk_blob_get_id(lvol->blob);
1471 	spdk_bs_inflate_blob(lvol->lvol_store->blobstore, req->channel, blob_id, _spdk_lvol_inflate_cb,
1472 			     req);
1473 }
1474 
1475 void
1476 spdk_lvol_decouple_parent(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1477 {
1478 	struct spdk_lvol_req *req;
1479 	spdk_blob_id blob_id;
1480 
1481 	assert(cb_fn != NULL);
1482 
1483 	if (lvol == NULL) {
1484 		SPDK_ERRLOG("Lvol does not exist\n");
1485 		cb_fn(cb_arg, -ENODEV);
1486 		return;
1487 	}
1488 
1489 	req = calloc(1, sizeof(*req));
1490 	if (!req) {
1491 		SPDK_ERRLOG("Cannot alloc memory for lvol request pointer\n");
1492 		cb_fn(cb_arg, -ENOMEM);
1493 		return;
1494 	}
1495 
1496 	req->cb_fn = cb_fn;
1497 	req->cb_arg = cb_arg;
1498 	req->channel = spdk_bs_alloc_io_channel(lvol->lvol_store->blobstore);
1499 	if (req->channel == NULL) {
1500 		SPDK_ERRLOG("Cannot alloc io channel for lvol inflate request\n");
1501 		free(req);
1502 		cb_fn(cb_arg, -ENOMEM);
1503 		return;
1504 	}
1505 
1506 	blob_id = spdk_blob_get_id(lvol->blob);
1507 	spdk_bs_blob_decouple_parent(lvol->lvol_store->blobstore, req->channel, blob_id,
1508 				     _spdk_lvol_inflate_cb, req);
1509 }
1510