xref: /spdk/module/bdev/lvol/vbdev_lvol.c (revision 9889ab2dc80e40dae92dcef361d53dcba722043d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/blob_bdev.h"
35 #include "spdk/rpc.h"
36 #include "spdk/bdev_module.h"
37 #include "spdk_internal/log.h"
38 #include "spdk/string.h"
39 #include "spdk/uuid.h"
40 
41 #include "vbdev_lvol.h"
42 
43 static TAILQ_HEAD(, lvol_store_bdev) g_spdk_lvol_pairs = TAILQ_HEAD_INITIALIZER(
44 			g_spdk_lvol_pairs);
45 
46 static int vbdev_lvs_init(void);
47 static int vbdev_lvs_get_ctx_size(void);
48 static void vbdev_lvs_examine(struct spdk_bdev *bdev);
49 
50 static struct spdk_bdev_module g_lvol_if = {
51 	.name = "lvol",
52 	.module_init = vbdev_lvs_init,
53 	.examine_disk = vbdev_lvs_examine,
54 	.get_ctx_size = vbdev_lvs_get_ctx_size,
55 
56 };
57 
58 SPDK_BDEV_MODULE_REGISTER(lvol, &g_lvol_if)
59 
60 struct lvol_store_bdev *
61 vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig)
62 {
63 	struct spdk_lvol_store *lvs = NULL;
64 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
65 
66 	while (lvs_bdev != NULL) {
67 		lvs = lvs_bdev->lvs;
68 		if (lvs == lvs_orig) {
69 			if (lvs_bdev->req != NULL) {
70 				/* We do not allow access to lvs that are being destroyed */
71 				return NULL;
72 			} else {
73 				return lvs_bdev;
74 			}
75 		}
76 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
77 	}
78 
79 	return NULL;
80 }
81 
82 static int
83 _vbdev_lvol_change_bdev_alias(struct spdk_lvol *lvol, const char *new_lvol_name)
84 {
85 	struct spdk_bdev_alias *tmp;
86 	char *old_alias;
87 	char *alias;
88 	int rc;
89 	int alias_number = 0;
90 
91 	/* bdev representing lvols have only one alias,
92 	 * while we changed lvs name earlier, we have to iterate alias list to get one,
93 	 * and check if there is only one alias */
94 
95 	TAILQ_FOREACH(tmp, &lvol->bdev->aliases, tailq) {
96 		if (++alias_number > 1) {
97 			SPDK_ERRLOG("There is more than 1 alias in bdev %s\n", lvol->bdev->name);
98 			return -EINVAL;
99 		}
100 
101 		old_alias = tmp->alias;
102 	}
103 
104 	if (alias_number == 0) {
105 		SPDK_ERRLOG("There are no aliases in bdev %s\n", lvol->bdev->name);
106 		return -EINVAL;
107 	}
108 
109 	alias = spdk_sprintf_alloc("%s/%s", lvol->lvol_store->name, new_lvol_name);
110 	if (alias == NULL) {
111 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
112 		return -ENOMEM;
113 	}
114 
115 	rc = spdk_bdev_alias_add(lvol->bdev, alias);
116 	if (rc != 0) {
117 		SPDK_ERRLOG("cannot add alias '%s'\n", alias);
118 		free(alias);
119 		return rc;
120 	}
121 	free(alias);
122 
123 	rc = spdk_bdev_alias_del(lvol->bdev, old_alias);
124 	if (rc != 0) {
125 		SPDK_ERRLOG("cannot remove alias '%s'\n", old_alias);
126 		return rc;
127 	}
128 
129 	return 0;
130 }
131 
132 static struct lvol_store_bdev *
133 vbdev_get_lvs_bdev_by_bdev(struct spdk_bdev *bdev_orig)
134 {
135 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
136 
137 	while (lvs_bdev != NULL) {
138 		if (lvs_bdev->bdev == bdev_orig) {
139 			if (lvs_bdev->req != NULL) {
140 				/* We do not allow access to lvs that are being destroyed */
141 				return NULL;
142 			} else {
143 				return lvs_bdev;
144 			}
145 		}
146 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
147 	}
148 
149 	return NULL;
150 }
151 
152 static void
153 vbdev_lvs_hotremove_cb(void *ctx)
154 {
155 	struct spdk_bdev *bdev = ctx;
156 	struct lvol_store_bdev *lvs_bdev;
157 
158 	lvs_bdev = vbdev_get_lvs_bdev_by_bdev(bdev);
159 	if (lvs_bdev != NULL) {
160 		vbdev_lvs_unload(lvs_bdev->lvs, NULL, NULL);
161 	}
162 }
163 
164 static void
165 _vbdev_lvs_create_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
166 {
167 	struct spdk_lvs_with_handle_req *req = cb_arg;
168 	struct lvol_store_bdev *lvs_bdev;
169 	struct spdk_bdev *bdev = req->base_bdev;
170 	struct spdk_bs_dev *bs_dev = req->bs_dev;
171 
172 	if (lvserrno != 0) {
173 		assert(lvs == NULL);
174 		SPDK_ERRLOG("Cannot create lvol store bdev\n");
175 		goto end;
176 	}
177 
178 	lvserrno = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
179 	if (lvserrno != 0) {
180 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Lvol store base bdev already claimed by another bdev\n");
181 		req->bs_dev->destroy(req->bs_dev);
182 		goto end;
183 	}
184 
185 	assert(lvs != NULL);
186 
187 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
188 	if (!lvs_bdev) {
189 		lvserrno = -ENOMEM;
190 		goto end;
191 	}
192 	lvs_bdev->lvs = lvs;
193 	lvs_bdev->bdev = bdev;
194 	lvs_bdev->req = NULL;
195 
196 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
197 	SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Lvol store bdev inserted\n");
198 
199 end:
200 	req->cb_fn(req->cb_arg, lvs, lvserrno);
201 	free(req);
202 
203 	return;
204 }
205 
206 int
207 vbdev_lvs_create(struct spdk_bdev *base_bdev, const char *name, uint32_t cluster_sz,
208 		 enum lvs_clear_method clear_method, spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
209 {
210 	struct spdk_bs_dev *bs_dev;
211 	struct spdk_lvs_with_handle_req *lvs_req;
212 	struct spdk_lvs_opts opts;
213 	int rc;
214 	int len;
215 
216 	if (base_bdev == NULL) {
217 		SPDK_ERRLOG("Bdev does not exist\n");
218 		return -ENODEV;
219 	}
220 
221 	spdk_lvs_opts_init(&opts);
222 	if (cluster_sz != 0) {
223 		opts.cluster_sz = cluster_sz;
224 	}
225 
226 	if (clear_method != 0) {
227 		opts.clear_method = clear_method;
228 	}
229 
230 	if (name == NULL) {
231 		SPDK_ERRLOG("missing name param\n");
232 		return -EINVAL;
233 	}
234 
235 	len = strnlen(name, SPDK_LVS_NAME_MAX);
236 
237 	if (len == 0 || len == SPDK_LVS_NAME_MAX) {
238 		SPDK_ERRLOG("name must be between 1 and %d characters\n", SPDK_LVS_NAME_MAX - 1);
239 		return -EINVAL;
240 	}
241 	snprintf(opts.name, sizeof(opts.name), "%s", name);
242 
243 	lvs_req = calloc(1, sizeof(*lvs_req));
244 	if (!lvs_req) {
245 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
246 		return -ENOMEM;
247 	}
248 
249 	bs_dev = spdk_bdev_create_bs_dev(base_bdev, vbdev_lvs_hotremove_cb, base_bdev);
250 	if (!bs_dev) {
251 		SPDK_ERRLOG("Cannot create blobstore device\n");
252 		free(lvs_req);
253 		return -ENODEV;
254 	}
255 
256 	lvs_req->bs_dev = bs_dev;
257 	lvs_req->base_bdev = base_bdev;
258 	lvs_req->cb_fn = cb_fn;
259 	lvs_req->cb_arg = cb_arg;
260 
261 	rc = spdk_lvs_init(bs_dev, &opts, _vbdev_lvs_create_cb, lvs_req);
262 	if (rc < 0) {
263 		free(lvs_req);
264 		bs_dev->destroy(bs_dev);
265 		return rc;
266 	}
267 
268 	return 0;
269 }
270 
271 static void
272 _vbdev_lvs_rename_cb(void *cb_arg, int lvserrno)
273 {
274 	struct spdk_lvs_req *req = cb_arg;
275 	struct spdk_lvol *tmp;
276 
277 	if (lvserrno != 0) {
278 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Lvol store rename failed\n");
279 	} else {
280 		TAILQ_FOREACH(tmp, &req->lvol_store->lvols, link) {
281 			/* We have to pass current lvol name, since only lvs name changed */
282 			_vbdev_lvol_change_bdev_alias(tmp, tmp->name);
283 		}
284 	}
285 
286 	req->cb_fn(req->cb_arg, lvserrno);
287 	free(req);
288 }
289 
290 void
291 vbdev_lvs_rename(struct spdk_lvol_store *lvs, const char *new_lvs_name,
292 		 spdk_lvs_op_complete cb_fn, void *cb_arg)
293 {
294 	struct lvol_store_bdev *lvs_bdev;
295 
296 	struct spdk_lvs_req *req;
297 
298 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
299 	if (!lvs_bdev) {
300 		SPDK_ERRLOG("No such lvol store found\n");
301 		cb_fn(cb_arg, -ENODEV);
302 		return;
303 	}
304 
305 	req = calloc(1, sizeof(*req));
306 	if (!req) {
307 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
308 		cb_fn(cb_arg, -ENOMEM);
309 		return;
310 	}
311 	req->cb_fn = cb_fn;
312 	req->cb_arg = cb_arg;
313 	req->lvol_store = lvs;
314 
315 	spdk_lvs_rename(lvs, new_lvs_name, _vbdev_lvs_rename_cb, req);
316 }
317 
318 static void
319 _vbdev_lvs_remove_cb(void *cb_arg, int lvserrno)
320 {
321 	struct lvol_store_bdev *lvs_bdev = cb_arg;
322 	struct spdk_lvs_req *req = lvs_bdev->req;
323 
324 	if (lvserrno != 0) {
325 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Lvol store removed with error: %d.\n", lvserrno);
326 	}
327 
328 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
329 	free(lvs_bdev);
330 
331 	if (req->cb_fn != NULL) {
332 		req->cb_fn(req->cb_arg, lvserrno);
333 	}
334 	free(req);
335 }
336 
337 static void
338 _vbdev_lvs_remove_lvol_cb(void *cb_arg, int lvolerrno)
339 {
340 	struct lvol_store_bdev *lvs_bdev = cb_arg;
341 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
342 	struct spdk_lvol *lvol;
343 
344 	if (lvolerrno != 0) {
345 		SPDK_DEBUGLOG(SPDK_LOG_VBDEV_LVOL, "Lvol removed with errno %d\n", lvolerrno);
346 	}
347 
348 	if (TAILQ_EMPTY(&lvs->lvols)) {
349 		spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
350 		return;
351 	}
352 
353 	lvol = TAILQ_FIRST(&lvs->lvols);
354 	while (lvol != NULL) {
355 		if (spdk_lvol_deletable(lvol)) {
356 			vbdev_lvol_destroy(lvol, _vbdev_lvs_remove_lvol_cb, lvs_bdev);
357 			return;
358 		}
359 		lvol = TAILQ_NEXT(lvol, link);
360 	}
361 
362 	/* If no lvol is deletable, that means there is circular dependency. */
363 	SPDK_ERRLOG("Lvols left in lvs, but unable to delete.\n");
364 	assert(false);
365 }
366 
367 static void
368 _vbdev_lvs_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
369 {
370 	struct lvol_store_bdev *lvs_bdev = cb_arg;
371 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
372 	struct spdk_lvol *lvol, *tmp;
373 
374 	if (bdeverrno != 0) {
375 		SPDK_DEBUGLOG(SPDK_LOG_VBDEV_LVOL, "Lvol unregistered with errno %d\n", bdeverrno);
376 	}
377 
378 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
379 		if (lvol->ref_count != 0) {
380 			/* An lvol is still open, don't unload whole lvol store. */
381 			return;
382 		}
383 	}
384 	spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
385 }
386 
387 static void
388 _vbdev_lvs_remove(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg,
389 		  bool destroy)
390 {
391 	struct spdk_lvs_req *req;
392 	struct lvol_store_bdev *lvs_bdev;
393 	struct spdk_lvol *lvol, *tmp;
394 	bool all_lvols_closed = true;
395 
396 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
397 	if (!lvs_bdev) {
398 		SPDK_ERRLOG("No such lvol store found\n");
399 		if (cb_fn != NULL) {
400 			cb_fn(cb_arg, -ENODEV);
401 		}
402 		return;
403 	}
404 
405 	req = calloc(1, sizeof(*req));
406 	if (!req) {
407 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
408 		if (cb_fn != NULL) {
409 			cb_fn(cb_arg, -ENOMEM);
410 		}
411 		return;
412 	}
413 
414 	req->cb_fn = cb_fn;
415 	req->cb_arg = cb_arg;
416 	lvs_bdev->req = req;
417 
418 	TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
419 		if (lvol->ref_count != 0) {
420 			all_lvols_closed = false;
421 		}
422 	}
423 
424 	if (all_lvols_closed == true) {
425 		if (destroy) {
426 			spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
427 		} else {
428 			spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
429 		}
430 	} else {
431 		lvs->destruct = destroy;
432 		if (destroy) {
433 			_vbdev_lvs_remove_lvol_cb(lvs_bdev, 0);
434 		} else {
435 			TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
436 				spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
437 			}
438 		}
439 	}
440 }
441 
442 void
443 vbdev_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
444 {
445 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, false);
446 }
447 
448 void
449 vbdev_lvs_destruct(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
450 {
451 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, true);
452 }
453 
454 struct lvol_store_bdev *
455 vbdev_lvol_store_first(void)
456 {
457 	struct lvol_store_bdev *lvs_bdev;
458 
459 	lvs_bdev = TAILQ_FIRST(&g_spdk_lvol_pairs);
460 	if (lvs_bdev) {
461 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Starting lvolstore iteration at %p\n", lvs_bdev->lvs);
462 	}
463 
464 	return lvs_bdev;
465 }
466 
467 struct lvol_store_bdev *
468 vbdev_lvol_store_next(struct lvol_store_bdev *prev)
469 {
470 	struct lvol_store_bdev *lvs_bdev;
471 
472 	if (prev == NULL) {
473 		SPDK_ERRLOG("prev argument cannot be NULL\n");
474 		return NULL;
475 	}
476 
477 	lvs_bdev = TAILQ_NEXT(prev, lvol_stores);
478 	if (lvs_bdev) {
479 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Continuing lvolstore iteration at %p\n", lvs_bdev->lvs);
480 	}
481 
482 	return lvs_bdev;
483 }
484 
485 static struct spdk_lvol_store *
486 _vbdev_get_lvol_store_by_uuid(const struct spdk_uuid *uuid)
487 {
488 	struct spdk_lvol_store *lvs = NULL;
489 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
490 
491 	while (lvs_bdev != NULL) {
492 		lvs = lvs_bdev->lvs;
493 		if (spdk_uuid_compare(&lvs->uuid, uuid) == 0) {
494 			return lvs;
495 		}
496 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
497 	}
498 	return NULL;
499 }
500 
501 struct spdk_lvol_store *
502 vbdev_get_lvol_store_by_uuid(const char *uuid_str)
503 {
504 	struct spdk_uuid uuid;
505 
506 	if (spdk_uuid_parse(&uuid, uuid_str)) {
507 		return NULL;
508 	}
509 
510 	return _vbdev_get_lvol_store_by_uuid(&uuid);
511 }
512 
513 struct spdk_lvol_store *
514 vbdev_get_lvol_store_by_name(const char *name)
515 {
516 	struct spdk_lvol_store *lvs = NULL;
517 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
518 
519 	while (lvs_bdev != NULL) {
520 		lvs = lvs_bdev->lvs;
521 		if (strncmp(lvs->name, name, sizeof(lvs->name)) == 0) {
522 			return lvs;
523 		}
524 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
525 	}
526 	return NULL;
527 }
528 
529 struct vbdev_lvol_destroy_ctx {
530 	struct spdk_lvol *lvol;
531 	spdk_lvol_op_complete cb_fn;
532 	void *cb_arg;
533 };
534 
/*
 * Completion of spdk_lvol_close() started by vbdev_lvol_unregister():
 * report the destruct as finished and free the bdev structure (ctx).
 */
static void
_vbdev_lvol_unregister_cb(void *ctx, int lvolerrno)
{
	struct spdk_bdev *closed_bdev = ctx;

	spdk_bdev_destruct_done(closed_bdev, lvolerrno);
	free(closed_bdev);
}
543 
544 static int
545 vbdev_lvol_unregister(void *ctx)
546 {
547 	struct spdk_lvol *lvol = ctx;
548 
549 	assert(lvol != NULL);
550 
551 	spdk_bdev_alias_del_all(lvol->bdev);
552 	spdk_lvol_close(lvol, _vbdev_lvol_unregister_cb, lvol->bdev);
553 
554 	/* return 1 to indicate we have an operation that must finish asynchronously before the
555 	 *  lvol is closed
556 	 */
557 	return 1;
558 }
559 
560 static void
561 _vbdev_lvol_destroy_cb(void *cb_arg, int bdeverrno)
562 {
563 	struct vbdev_lvol_destroy_ctx *ctx = cb_arg;
564 	struct spdk_lvol *lvol = ctx->lvol;
565 
566 	if (bdeverrno < 0) {
567 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Could not unregister bdev during lvol (%s) destroy\n",
568 			     lvol->unique_id);
569 		ctx->cb_fn(ctx->cb_arg, bdeverrno);
570 		free(ctx);
571 		return;
572 	}
573 
574 	spdk_lvol_destroy(lvol, ctx->cb_fn, ctx->cb_arg);
575 	free(ctx);
576 }
577 
578 void
579 vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
580 {
581 	struct vbdev_lvol_destroy_ctx *ctx;
582 	size_t count;
583 
584 	assert(lvol != NULL);
585 	assert(cb_fn != NULL);
586 
587 	/* Check if it is possible to delete lvol */
588 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
589 	if (count > 1) {
590 		/* throw an error */
591 		SPDK_ERRLOG("Cannot delete lvol\n");
592 		cb_fn(cb_arg, -EPERM);
593 		return;
594 	}
595 
596 	ctx = calloc(1, sizeof(*ctx));
597 	if (!ctx) {
598 		cb_fn(cb_arg, -ENOMEM);
599 		return;
600 	}
601 
602 	ctx->lvol = lvol;
603 	ctx->cb_fn = cb_fn;
604 	ctx->cb_arg = cb_arg;
605 
606 	spdk_bdev_unregister(lvol->bdev, _vbdev_lvol_destroy_cb, ctx);
607 }
608 
609 static char *
610 vbdev_lvol_find_name(struct spdk_lvol *lvol, spdk_blob_id blob_id)
611 {
612 	struct spdk_lvol_store *lvs;
613 	struct spdk_lvol *_lvol;
614 
615 	assert(lvol != NULL);
616 
617 	lvs = lvol->lvol_store;
618 
619 	assert(lvs);
620 
621 	TAILQ_FOREACH(_lvol, &lvs->lvols, link) {
622 		if (_lvol->blob_id == blob_id) {
623 			return _lvol->name;
624 		}
625 	}
626 
627 	return NULL;
628 }
629 
630 static int
631 vbdev_lvol_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
632 {
633 	struct spdk_lvol *lvol = ctx;
634 	struct lvol_store_bdev *lvs_bdev;
635 	struct spdk_bdev *bdev;
636 	struct spdk_blob *blob;
637 	char lvol_store_uuid[SPDK_UUID_STRING_LEN];
638 	spdk_blob_id *ids = NULL;
639 	size_t count, i;
640 	char *name;
641 	int rc = 0;
642 
643 	spdk_json_write_named_object_begin(w, "lvol");
644 
645 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
646 	if (!lvs_bdev) {
647 		SPDK_ERRLOG("No such lvol store found\n");
648 		rc = -ENODEV;
649 		goto end;
650 	}
651 
652 	bdev = lvs_bdev->bdev;
653 
654 	spdk_uuid_fmt_lower(lvol_store_uuid, sizeof(lvol_store_uuid), &lvol->lvol_store->uuid);
655 	spdk_json_write_named_string(w, "lvol_store_uuid", lvol_store_uuid);
656 
657 	spdk_json_write_named_string(w, "base_bdev", spdk_bdev_get_name(bdev));
658 
659 	blob = lvol->blob;
660 
661 	spdk_json_write_named_bool(w, "thin_provision", spdk_blob_is_thin_provisioned(blob));
662 
663 	spdk_json_write_named_bool(w, "snapshot", spdk_blob_is_snapshot(blob));
664 
665 	spdk_json_write_named_bool(w, "clone", spdk_blob_is_clone(blob));
666 
667 	if (spdk_blob_is_clone(blob)) {
668 		spdk_blob_id snapshotid = spdk_blob_get_parent_snapshot(lvol->lvol_store->blobstore, lvol->blob_id);
669 		if (snapshotid != SPDK_BLOBID_INVALID) {
670 			name = vbdev_lvol_find_name(lvol, snapshotid);
671 			if (name != NULL) {
672 				spdk_json_write_named_string(w, "base_snapshot", name);
673 			} else {
674 				SPDK_ERRLOG("Cannot obtain snapshots name\n");
675 			}
676 		}
677 	}
678 
679 	if (spdk_blob_is_snapshot(blob)) {
680 		/* Take a number of clones */
681 		rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
682 		if (rc == -ENOMEM && count > 0) {
683 			ids = malloc(sizeof(spdk_blob_id) * count);
684 			if (ids == NULL) {
685 				SPDK_ERRLOG("Cannot allocate memory\n");
686 				rc = -ENOMEM;
687 				goto end;
688 			}
689 
690 			rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, ids, &count);
691 			if (rc == 0) {
692 				spdk_json_write_named_array_begin(w, "clones");
693 				for (i = 0; i < count; i++) {
694 					name = vbdev_lvol_find_name(lvol, ids[i]);
695 					if (name != NULL) {
696 						spdk_json_write_string(w, name);
697 					} else {
698 						SPDK_ERRLOG("Cannot obtain clone name\n");
699 					}
700 
701 				}
702 				spdk_json_write_array_end(w);
703 			}
704 			free(ids);
705 		}
706 
707 	}
708 
709 end:
710 	spdk_json_write_object_end(w);
711 
712 	return rc;
713 }
714 
/*
 * `write_config_json` entry of the lvol bdev function table.
 * Intentionally empty: lvol configuration is saved on the physical device,
 * so there is nothing to dump here.
 */
static void
vbdev_lvol_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
}
720 
/* `get_io_channel` entry of the lvol bdev function table (ctx is the lvol). */
static struct spdk_io_channel *
vbdev_lvol_get_io_channel(void *ctx)
{
	struct spdk_lvol *vol = ctx;
	struct spdk_io_channel *channel = spdk_lvol_get_io_channel(vol);

	return channel;
}
728 
729 static bool
730 vbdev_lvol_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
731 {
732 	struct spdk_lvol *lvol = ctx;
733 
734 	switch (io_type) {
735 	case SPDK_BDEV_IO_TYPE_WRITE:
736 	case SPDK_BDEV_IO_TYPE_UNMAP:
737 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
738 		return !spdk_blob_is_read_only(lvol->blob);
739 	case SPDK_BDEV_IO_TYPE_RESET:
740 	case SPDK_BDEV_IO_TYPE_READ:
741 		return true;
742 	default:
743 		return false;
744 	}
745 }
746 
747 static void
748 lvol_op_comp(void *cb_arg, int bserrno)
749 {
750 	struct lvol_task *task = cb_arg;
751 	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(task);
752 
753 	if (bserrno != 0) {
754 		if (bserrno == -ENOMEM) {
755 			task->status = SPDK_BDEV_IO_STATUS_NOMEM;
756 		} else {
757 			task->status = SPDK_BDEV_IO_STATUS_FAILED;
758 		}
759 	}
760 
761 	SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Vbdev processing callback on device %s with type %d\n",
762 		     bdev_io->bdev->name, bdev_io->type);
763 	spdk_bdev_io_complete(bdev_io, task->status);
764 }
765 
766 static void
767 lvol_unmap(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
768 {
769 	uint64_t start_page, num_pages;
770 	struct spdk_blob *blob = lvol->blob;
771 	struct lvol_task *task = (struct lvol_task *)bdev_io->driver_ctx;
772 
773 	start_page = bdev_io->u.bdev.offset_blocks;
774 	num_pages = bdev_io->u.bdev.num_blocks;
775 
776 	task->status = SPDK_BDEV_IO_STATUS_SUCCESS;
777 
778 	SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL,
779 		     "Vbdev doing unmap at offset %" PRIu64 " using %" PRIu64 " pages on device %s\n", start_page,
780 		     num_pages, bdev_io->bdev->name);
781 	spdk_blob_io_unmap(blob, ch, start_page, num_pages, lvol_op_comp, task);
782 }
783 
784 static void
785 lvol_write_zeroes(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
786 {
787 	uint64_t start_page, num_pages;
788 	struct spdk_blob *blob = lvol->blob;
789 	struct lvol_task *task = (struct lvol_task *)bdev_io->driver_ctx;
790 
791 	start_page = bdev_io->u.bdev.offset_blocks;
792 	num_pages = bdev_io->u.bdev.num_blocks;
793 
794 	task->status = SPDK_BDEV_IO_STATUS_SUCCESS;
795 
796 	SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL,
797 		     "Vbdev doing write zeros at offset %" PRIu64 " using %" PRIu64 " pages on device %s\n", start_page,
798 		     num_pages, bdev_io->bdev->name);
799 	spdk_blob_io_write_zeroes(blob, ch, start_page, num_pages, lvol_op_comp, task);
800 }
801 
802 static void
803 lvol_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
804 {
805 	uint64_t start_page, num_pages;
806 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
807 	struct spdk_blob *blob = lvol->blob;
808 	struct lvol_task *task = (struct lvol_task *)bdev_io->driver_ctx;
809 
810 	start_page = bdev_io->u.bdev.offset_blocks;
811 	num_pages = bdev_io->u.bdev.num_blocks;
812 
813 	task->status = SPDK_BDEV_IO_STATUS_SUCCESS;
814 
815 	SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL,
816 		     "Vbdev doing read at offset %" PRIu64 " using %" PRIu64 " pages on device %s\n", start_page,
817 		     num_pages, bdev_io->bdev->name);
818 	spdk_blob_io_readv(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
819 			   num_pages, lvol_op_comp, task);
820 }
821 
822 static void
823 lvol_write(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
824 {
825 	uint64_t start_page, num_pages;
826 	struct spdk_blob *blob = lvol->blob;
827 	struct lvol_task *task = (struct lvol_task *)bdev_io->driver_ctx;
828 
829 	start_page = bdev_io->u.bdev.offset_blocks;
830 	num_pages = bdev_io->u.bdev.num_blocks;
831 
832 	task->status = SPDK_BDEV_IO_STATUS_SUCCESS;
833 
834 	SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL,
835 		     "Vbdev doing write at offset %" PRIu64 " using %" PRIu64 " pages on device %s\n", start_page,
836 		     num_pages, bdev_io->bdev->name);
837 	spdk_blob_io_writev(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
838 			    num_pages, lvol_op_comp, task);
839 }
840 
841 static int
842 lvol_reset(struct spdk_bdev_io *bdev_io)
843 {
844 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
845 
846 	return 0;
847 }
848 
849 static void
850 lvol_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
851 {
852 	if (!success) {
853 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
854 		return;
855 	}
856 
857 	lvol_read(ch, bdev_io);
858 }
859 
860 static void
861 vbdev_lvol_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
862 {
863 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
864 
865 	SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Vbdev request type %d submitted\n", bdev_io->type);
866 
867 	switch (bdev_io->type) {
868 	case SPDK_BDEV_IO_TYPE_READ:
869 		spdk_bdev_io_get_buf(bdev_io, lvol_get_buf_cb,
870 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
871 		break;
872 	case SPDK_BDEV_IO_TYPE_WRITE:
873 		lvol_write(lvol, ch, bdev_io);
874 		break;
875 	case SPDK_BDEV_IO_TYPE_RESET:
876 		lvol_reset(bdev_io);
877 		break;
878 	case SPDK_BDEV_IO_TYPE_UNMAP:
879 		lvol_unmap(lvol, ch, bdev_io);
880 		break;
881 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
882 		lvol_write_zeroes(lvol, ch, bdev_io);
883 		break;
884 	default:
885 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "lvol: unsupported I/O type %d\n", bdev_io->type);
886 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
887 		return;
888 	}
889 	return;
890 }
891 
892 static struct spdk_bdev_fn_table vbdev_lvol_fn_table = {
893 	.destruct		= vbdev_lvol_unregister,
894 	.io_type_supported	= vbdev_lvol_io_type_supported,
895 	.submit_request		= vbdev_lvol_submit_request,
896 	.get_io_channel		= vbdev_lvol_get_io_channel,
897 	.dump_info_json		= vbdev_lvol_dump_info_json,
898 	.write_config_json	= vbdev_lvol_write_config_json,
899 };
900 
/*
 * Completion used when an lvol is destroyed as cleanup after a failed bdev
 * setup (see _create_lvol_disk_destroy_cb). Deliberately does nothing.
 */
static void
_spdk_lvol_destroy_cb(void *cb_arg, int bdeverrno)
{
}
905 
906 static void
907 _create_lvol_disk_destroy_cb(void *cb_arg, int bdeverrno)
908 {
909 	struct spdk_lvol *lvol = cb_arg;
910 
911 	if (bdeverrno < 0) {
912 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
913 			    lvol->unique_id);
914 		return;
915 	}
916 
917 	spdk_lvol_destroy(lvol, _spdk_lvol_destroy_cb, NULL);
918 }
919 
920 static void
921 _create_lvol_disk_unload_cb(void *cb_arg, int bdeverrno)
922 {
923 	struct spdk_lvol *lvol = cb_arg;
924 
925 	if (bdeverrno < 0) {
926 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
927 			    lvol->unique_id);
928 		return;
929 	}
930 
931 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
932 	free(lvol);
933 }
934 
935 static int
936 _create_lvol_disk(struct spdk_lvol *lvol, bool destroy)
937 {
938 	struct spdk_bdev *bdev;
939 	struct lvol_store_bdev *lvs_bdev;
940 	uint64_t total_size;
941 	unsigned char *alias;
942 	int rc;
943 
944 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
945 	if (lvs_bdev == NULL) {
946 		SPDK_ERRLOG("No spdk lvs-bdev pair found for lvol %s\n", lvol->unique_id);
947 		return -ENODEV;
948 	}
949 
950 	bdev = calloc(1, sizeof(struct spdk_bdev));
951 	if (!bdev) {
952 		SPDK_ERRLOG("Cannot alloc memory for lvol bdev\n");
953 		return -ENOMEM;
954 	}
955 
956 	bdev->name = lvol->unique_id;
957 	bdev->product_name = "Logical Volume";
958 	bdev->blocklen = spdk_bs_get_io_unit_size(lvol->lvol_store->blobstore);
959 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
960 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
961 	assert((total_size % bdev->blocklen) == 0);
962 	bdev->blockcnt = total_size / bdev->blocklen;
963 	bdev->uuid = lvol->uuid;
964 	bdev->required_alignment = lvs_bdev->bdev->required_alignment;
965 	bdev->split_on_optimal_io_boundary = true;
966 	bdev->optimal_io_boundary = spdk_bs_get_cluster_size(lvol->lvol_store->blobstore) / bdev->blocklen;
967 
968 	bdev->ctxt = lvol;
969 	bdev->fn_table = &vbdev_lvol_fn_table;
970 	bdev->module = &g_lvol_if;
971 
972 	rc = spdk_bdev_register(bdev);
973 	if (rc) {
974 		free(bdev);
975 		return rc;
976 	}
977 	lvol->bdev = bdev;
978 
979 	alias = spdk_sprintf_alloc("%s/%s", lvs_bdev->lvs->name, lvol->name);
980 	if (alias == NULL) {
981 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
982 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
983 						  _create_lvol_disk_unload_cb), lvol);
984 		return -ENOMEM;
985 	}
986 
987 	rc = spdk_bdev_alias_add(bdev, alias);
988 	if (rc != 0) {
989 		SPDK_ERRLOG("Cannot add alias to lvol bdev\n");
990 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
991 						  _create_lvol_disk_unload_cb), lvol);
992 	}
993 	free(alias);
994 
995 	return rc;
996 }
997 
998 static void
999 _vbdev_lvol_create_cb(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1000 {
1001 	struct spdk_lvol_with_handle_req *req = cb_arg;
1002 
1003 	if (lvolerrno < 0) {
1004 		goto end;
1005 	}
1006 
1007 	lvolerrno = _create_lvol_disk(lvol, true);
1008 
1009 end:
1010 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
1011 	free(req);
1012 }
1013 
1014 int
1015 vbdev_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1016 		  bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1017 		  void *cb_arg)
1018 {
1019 	struct spdk_lvol_with_handle_req *req;
1020 	int rc;
1021 
1022 	req = calloc(1, sizeof(*req));
1023 	if (req == NULL) {
1024 		return -ENOMEM;
1025 	}
1026 	req->cb_fn = cb_fn;
1027 	req->cb_arg = cb_arg;
1028 
1029 	rc = spdk_lvol_create(lvs, name, sz, thin_provision, clear_method,
1030 			      _vbdev_lvol_create_cb, req);
1031 	if (rc != 0) {
1032 		free(req);
1033 	}
1034 
1035 	return rc;
1036 }
1037 
1038 void
1039 vbdev_lvol_create_snapshot(struct spdk_lvol *lvol, const char *snapshot_name,
1040 			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1041 {
1042 	struct spdk_lvol_with_handle_req *req;
1043 
1044 	req = calloc(1, sizeof(*req));
1045 	if (req == NULL) {
1046 		cb_fn(cb_arg, NULL, -ENOMEM);
1047 		return;
1048 	}
1049 
1050 	req->cb_fn = cb_fn;
1051 	req->cb_arg = cb_arg;
1052 
1053 	spdk_lvol_create_snapshot(lvol, snapshot_name, _vbdev_lvol_create_cb, req);
1054 }
1055 
1056 void
1057 vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
1058 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1059 {
1060 	struct spdk_lvol_with_handle_req *req;
1061 
1062 	req = calloc(1, sizeof(*req));
1063 	if (req == NULL) {
1064 		cb_fn(cb_arg, NULL, -ENOMEM);
1065 		return;
1066 	}
1067 
1068 	req->cb_fn = cb_fn;
1069 	req->cb_arg = cb_arg;
1070 
1071 	spdk_lvol_create_clone(lvol, clone_name, _vbdev_lvol_create_cb, req);
1072 }
1073 
1074 static void
1075 _vbdev_lvol_rename_cb(void *cb_arg, int lvolerrno)
1076 {
1077 	struct spdk_lvol_req *req = cb_arg;
1078 
1079 	if (lvolerrno != 0) {
1080 		SPDK_ERRLOG("Renaming lvol failed\n");
1081 	}
1082 
1083 	req->cb_fn(req->cb_arg, lvolerrno);
1084 	free(req);
1085 }
1086 
1087 void
1088 vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
1089 		  spdk_lvol_op_complete cb_fn, void *cb_arg)
1090 {
1091 	struct spdk_lvol_req *req;
1092 	int rc;
1093 
1094 	rc = _vbdev_lvol_change_bdev_alias(lvol, new_lvol_name);
1095 	if (rc != 0) {
1096 		SPDK_ERRLOG("renaming lvol to '%s' does not succeed\n", new_lvol_name);
1097 		cb_fn(cb_arg, rc);
1098 		return;
1099 	}
1100 
1101 	req = calloc(1, sizeof(*req));
1102 	if (req == NULL) {
1103 		cb_fn(cb_arg, -ENOMEM);
1104 		return;
1105 	}
1106 	req->cb_fn = cb_fn;
1107 	req->cb_arg = cb_arg;
1108 
1109 	spdk_lvol_rename(lvol, new_lvol_name, _vbdev_lvol_rename_cb, req);
1110 }
1111 
1112 static void
1113 _vbdev_lvol_resize_cb(void *cb_arg, int lvolerrno)
1114 {
1115 	struct spdk_lvol_req *req = cb_arg;
1116 	struct spdk_lvol *lvol = req->lvol;
1117 	uint64_t total_size;
1118 
1119 	/* change bdev size */
1120 	if (lvolerrno != 0) {
1121 		SPDK_ERRLOG("CB function for bdev lvol %s receive error no: %d.\n", lvol->name, lvolerrno);
1122 		goto finish;
1123 	}
1124 
1125 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1126 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1127 	assert((total_size % lvol->bdev->blocklen) == 0);
1128 
1129 	lvolerrno = spdk_bdev_notify_blockcnt_change(lvol->bdev, total_size / lvol->bdev->blocklen);
1130 	if (lvolerrno != 0) {
1131 		SPDK_ERRLOG("Could not change num blocks for bdev lvol %s with error no: %d.\n",
1132 			    lvol->name, lvolerrno);
1133 	}
1134 
1135 finish:
1136 	req->cb_fn(req->cb_arg, lvolerrno);
1137 	free(req);
1138 }
1139 
1140 void
1141 vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn, void *cb_arg)
1142 {
1143 	struct spdk_lvol_req *req;
1144 
1145 	if (lvol == NULL) {
1146 		SPDK_ERRLOG("lvol does not exist\n");
1147 		cb_fn(cb_arg, -EINVAL);
1148 		return;
1149 	}
1150 
1151 	assert(lvol->bdev != NULL);
1152 
1153 	req = calloc(1, sizeof(*req));
1154 	if (req == NULL) {
1155 		cb_fn(cb_arg, -ENOMEM);
1156 		return;
1157 	}
1158 
1159 	req->cb_fn = cb_fn;
1160 	req->cb_arg = cb_arg;
1161 	req->sz = sz;
1162 	req->lvol = lvol;
1163 
1164 	spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
1165 }
1166 
1167 static void
1168 _vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1169 {
1170 	struct spdk_lvol_req *req = cb_arg;
1171 	struct spdk_lvol *lvol = req->lvol;
1172 
1173 	if (lvolerrno != 0) {
1174 		SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
1175 	}
1176 
1177 	req->cb_fn(req->cb_arg, lvolerrno);
1178 	free(req);
1179 }
1180 
1181 void
1182 vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1183 {
1184 	struct spdk_lvol_req *req;
1185 
1186 	if (lvol == NULL) {
1187 		SPDK_ERRLOG("lvol does not exist\n");
1188 		cb_fn(cb_arg, -EINVAL);
1189 		return;
1190 	}
1191 
1192 	assert(lvol->bdev != NULL);
1193 
1194 	req = calloc(1, sizeof(*req));
1195 	if (req == NULL) {
1196 		cb_fn(cb_arg, -ENOMEM);
1197 		return;
1198 	}
1199 
1200 	req->cb_fn = cb_fn;
1201 	req->cb_arg = cb_arg;
1202 	req->lvol = lvol;
1203 
1204 	spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
1205 }
1206 
/*
 * Initialization routine for the lvol vbdev code.
 *
 * Performs no work and always returns 0.
 */
static int
vbdev_lvs_init(void)
{
	return 0;
}
1212 
1213 static int
1214 vbdev_lvs_get_ctx_size(void)
1215 {
1216 	return sizeof(struct lvol_task);
1217 }
1218 
1219 static void
1220 _vbdev_lvs_examine_failed(void *cb_arg, int lvserrno)
1221 {
1222 	spdk_bdev_module_examine_done(&g_lvol_if);
1223 }
1224 
1225 static void
1226 _vbdev_lvol_examine_close_cb(struct spdk_lvol_store *lvs)
1227 {
1228 	if (lvs->lvols_opened >= lvs->lvol_count) {
1229 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Opening lvols finished\n");
1230 		spdk_bdev_module_examine_done(&g_lvol_if);
1231 	}
1232 }
1233 
1234 static void
1235 _vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1236 {
1237 	struct spdk_lvol_store *lvs = cb_arg;
1238 
1239 	if (lvolerrno != 0) {
1240 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1241 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1242 		lvs->lvol_count--;
1243 		free(lvol);
1244 		goto end;
1245 	}
1246 
1247 	if (_create_lvol_disk(lvol, false)) {
1248 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1249 		lvs->lvol_count--;
1250 		_vbdev_lvol_examine_close_cb(lvs);
1251 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Opening lvol %s failed\n", lvol->unique_id);
1252 		return;
1253 	}
1254 
1255 	lvs->lvols_opened++;
1256 	SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Opening lvol %s succeeded\n", lvol->unique_id);
1257 
1258 end:
1259 
1260 	if (lvs->lvols_opened >= lvs->lvol_count) {
1261 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Opening lvols finished\n");
1262 		spdk_bdev_module_examine_done(&g_lvol_if);
1263 	}
1264 }
1265 
1266 static void
1267 _vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1268 {
1269 	struct lvol_store_bdev *lvs_bdev;
1270 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1271 	struct spdk_lvol *lvol, *tmp;
1272 
1273 	if (lvserrno == -EEXIST) {
1274 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL,
1275 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1276 			     req->base_bdev->name);
1277 		/* On error blobstore destroys bs_dev itself */
1278 		spdk_bdev_module_examine_done(&g_lvol_if);
1279 		goto end;
1280 	} else if (lvserrno != 0) {
1281 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Lvol store not found on %s\n", req->base_bdev->name);
1282 		/* On error blobstore destroys bs_dev itself */
1283 		spdk_bdev_module_examine_done(&g_lvol_if);
1284 		goto end;
1285 	}
1286 
1287 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1288 	if (lvserrno != 0) {
1289 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Lvol store base bdev already claimed by another bdev\n");
1290 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1291 		goto end;
1292 	}
1293 
1294 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1295 	if (!lvs_bdev) {
1296 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1297 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1298 		goto end;
1299 	}
1300 
1301 	lvs_bdev->lvs = lvol_store;
1302 	lvs_bdev->bdev = req->base_bdev;
1303 
1304 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1305 
1306 	SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Lvol store found on %s - begin parsing\n",
1307 		     req->base_bdev->name);
1308 
1309 	lvol_store->lvols_opened = 0;
1310 
1311 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1312 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Lvol store examination done\n");
1313 		spdk_bdev_module_examine_done(&g_lvol_if);
1314 	} else {
1315 		/* Open all lvols */
1316 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1317 			spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, lvol_store);
1318 		}
1319 	}
1320 
1321 end:
1322 	free(req);
1323 }
1324 
1325 static void
1326 vbdev_lvs_examine(struct spdk_bdev *bdev)
1327 {
1328 	struct spdk_bs_dev *bs_dev;
1329 	struct spdk_lvs_with_handle_req *req;
1330 
1331 	req = calloc(1, sizeof(*req));
1332 	if (req == NULL) {
1333 		spdk_bdev_module_examine_done(&g_lvol_if);
1334 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1335 		return;
1336 	}
1337 
1338 	bs_dev = spdk_bdev_create_bs_dev(bdev, vbdev_lvs_hotremove_cb, bdev);
1339 	if (!bs_dev) {
1340 		SPDK_INFOLOG(SPDK_LOG_VBDEV_LVOL, "Cannot create bs dev on %s\n", bdev->name);
1341 		spdk_bdev_module_examine_done(&g_lvol_if);
1342 		free(req);
1343 		return;
1344 	}
1345 
1346 	req->base_bdev = bdev;
1347 
1348 	spdk_lvs_load(bs_dev, _vbdev_lvs_examine_cb, req);
1349 }
1350 
1351 struct spdk_lvol *
1352 vbdev_lvol_get_from_bdev(struct spdk_bdev *bdev)
1353 {
1354 	if (!bdev || bdev->module != &g_lvol_if) {
1355 		return NULL;
1356 	}
1357 
1358 	if (bdev->ctxt == NULL) {
1359 		SPDK_ERRLOG("No lvol ctx assigned to bdev %s\n", bdev->name);
1360 		return NULL;
1361 	}
1362 
1363 	return (struct spdk_lvol *)bdev->ctxt;
1364 }
1365 
/* Register the "vbdev_lvol" log component for the SPDK_LOG_VBDEV_LVOL flag passed to SPDK_INFOLOG in this file. */
SPDK_LOG_REGISTER_COMPONENT("vbdev_lvol", SPDK_LOG_VBDEV_LVOL);
1367