xref: /spdk/module/bdev/lvol/vbdev_lvol.c (revision a6dbe3721eb3b5990707fc3e378c95e505dd8ab5)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/blob_bdev.h"
8 #include "spdk/rpc.h"
9 #include "spdk/bdev_module.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/uuid.h"
13 
14 #include "vbdev_lvol.h"
15 
16 struct vbdev_lvol_io {
17 	struct spdk_blob_ext_io_opts ext_io_opts;
18 };
19 
20 static TAILQ_HEAD(, lvol_store_bdev) g_spdk_lvol_pairs = TAILQ_HEAD_INITIALIZER(
21 			g_spdk_lvol_pairs);
22 
23 static int vbdev_lvs_init(void);
24 static void vbdev_lvs_fini_start(void);
25 static int vbdev_lvs_get_ctx_size(void);
26 static void vbdev_lvs_examine(struct spdk_bdev *bdev);
27 static bool g_shutdown_started = false;
28 
29 static struct spdk_bdev_module g_lvol_if = {
30 	.name = "lvol",
31 	.module_init = vbdev_lvs_init,
32 	.fini_start = vbdev_lvs_fini_start,
33 	.async_fini_start = true,
34 	.examine_disk = vbdev_lvs_examine,
35 	.get_ctx_size = vbdev_lvs_get_ctx_size,
36 
37 };
38 
39 SPDK_BDEV_MODULE_REGISTER(lvol, &g_lvol_if)
40 
41 struct lvol_store_bdev *
42 vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig)
43 {
44 	struct spdk_lvol_store *lvs = NULL;
45 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
46 
47 	while (lvs_bdev != NULL) {
48 		lvs = lvs_bdev->lvs;
49 		if (lvs == lvs_orig) {
50 			if (lvs_bdev->req != NULL) {
51 				/* We do not allow access to lvs that are being destroyed */
52 				return NULL;
53 			} else {
54 				return lvs_bdev;
55 			}
56 		}
57 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
58 	}
59 
60 	return NULL;
61 }
62 
63 static int
64 _vbdev_lvol_change_bdev_alias(struct spdk_lvol *lvol, const char *new_lvol_name)
65 {
66 	struct spdk_bdev_alias *tmp;
67 	char *old_alias;
68 	char *alias;
69 	int rc;
70 	int alias_number = 0;
71 
72 	/* bdev representing lvols have only one alias,
73 	 * while we changed lvs name earlier, we have to iterate alias list to get one,
74 	 * and check if there is only one alias */
75 
76 	TAILQ_FOREACH(tmp, spdk_bdev_get_aliases(lvol->bdev), tailq) {
77 		if (++alias_number > 1) {
78 			SPDK_ERRLOG("There is more than 1 alias in bdev %s\n", lvol->bdev->name);
79 			return -EINVAL;
80 		}
81 
82 		old_alias = tmp->alias.name;
83 	}
84 
85 	if (alias_number == 0) {
86 		SPDK_ERRLOG("There are no aliases in bdev %s\n", lvol->bdev->name);
87 		return -EINVAL;
88 	}
89 
90 	alias = spdk_sprintf_alloc("%s/%s", lvol->lvol_store->name, new_lvol_name);
91 	if (alias == NULL) {
92 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
93 		return -ENOMEM;
94 	}
95 
96 	rc = spdk_bdev_alias_add(lvol->bdev, alias);
97 	if (rc != 0) {
98 		SPDK_ERRLOG("cannot add alias '%s'\n", alias);
99 		free(alias);
100 		return rc;
101 	}
102 	free(alias);
103 
104 	rc = spdk_bdev_alias_del(lvol->bdev, old_alias);
105 	if (rc != 0) {
106 		SPDK_ERRLOG("cannot remove alias '%s'\n", old_alias);
107 		return rc;
108 	}
109 
110 	return 0;
111 }
112 
113 static struct lvol_store_bdev *
114 vbdev_get_lvs_bdev_by_bdev(struct spdk_bdev *bdev_orig)
115 {
116 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
117 
118 	while (lvs_bdev != NULL) {
119 		if (lvs_bdev->bdev == bdev_orig) {
120 			if (lvs_bdev->req != NULL) {
121 				/* We do not allow access to lvs that are being destroyed */
122 				return NULL;
123 			} else {
124 				return lvs_bdev;
125 			}
126 		}
127 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
128 	}
129 
130 	return NULL;
131 }
132 
133 static void
134 vbdev_lvs_hotremove_cb(struct spdk_bdev *bdev)
135 {
136 	struct lvol_store_bdev *lvs_bdev;
137 
138 	lvs_bdev = vbdev_get_lvs_bdev_by_bdev(bdev);
139 	if (lvs_bdev != NULL) {
140 		vbdev_lvs_unload(lvs_bdev->lvs, NULL, NULL);
141 	}
142 }
143 
144 static void
145 vbdev_lvs_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
146 			     void *event_ctx)
147 {
148 	switch (type) {
149 	case SPDK_BDEV_EVENT_REMOVE:
150 		vbdev_lvs_hotremove_cb(bdev);
151 		break;
152 	default:
153 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
154 		break;
155 	}
156 }
157 
158 static void
159 _vbdev_lvs_create_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
160 {
161 	struct spdk_lvs_with_handle_req *req = cb_arg;
162 	struct lvol_store_bdev *lvs_bdev;
163 	struct spdk_bdev *bdev = req->base_bdev;
164 	struct spdk_bs_dev *bs_dev = req->bs_dev;
165 
166 	if (lvserrno != 0) {
167 		assert(lvs == NULL);
168 		SPDK_ERRLOG("Cannot create lvol store bdev\n");
169 		goto end;
170 	}
171 
172 	lvserrno = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
173 	if (lvserrno != 0) {
174 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
175 		req->bs_dev->destroy(req->bs_dev);
176 		goto end;
177 	}
178 
179 	assert(lvs != NULL);
180 
181 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
182 	if (!lvs_bdev) {
183 		lvserrno = -ENOMEM;
184 		goto end;
185 	}
186 	lvs_bdev->lvs = lvs;
187 	lvs_bdev->bdev = bdev;
188 	lvs_bdev->req = NULL;
189 
190 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
191 	SPDK_INFOLOG(vbdev_lvol, "Lvol store bdev inserted\n");
192 
193 end:
194 	req->cb_fn(req->cb_arg, lvs, lvserrno);
195 	free(req);
196 
197 	return;
198 }
199 
200 int
201 vbdev_lvs_create(const char *base_bdev_name, const char *name, uint32_t cluster_sz,
202 		 enum lvs_clear_method clear_method, uint32_t num_md_pages_per_cluster_ratio,
203 		 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
204 {
205 	struct spdk_bs_dev *bs_dev;
206 	struct spdk_lvs_with_handle_req *lvs_req;
207 	struct spdk_lvs_opts opts;
208 	int rc;
209 	int len;
210 
211 	if (base_bdev_name == NULL) {
212 		SPDK_ERRLOG("missing base_bdev_name param\n");
213 		return -EINVAL;
214 	}
215 
216 	spdk_lvs_opts_init(&opts);
217 	if (cluster_sz != 0) {
218 		opts.cluster_sz = cluster_sz;
219 	}
220 
221 	if (clear_method != 0) {
222 		opts.clear_method = clear_method;
223 	}
224 
225 	if (num_md_pages_per_cluster_ratio != 0) {
226 		opts.num_md_pages_per_cluster_ratio = num_md_pages_per_cluster_ratio;
227 	}
228 
229 	if (name == NULL) {
230 		SPDK_ERRLOG("missing name param\n");
231 		return -EINVAL;
232 	}
233 
234 	len = strnlen(name, SPDK_LVS_NAME_MAX);
235 
236 	if (len == 0 || len == SPDK_LVS_NAME_MAX) {
237 		SPDK_ERRLOG("name must be between 1 and %d characters\n", SPDK_LVS_NAME_MAX - 1);
238 		return -EINVAL;
239 	}
240 	snprintf(opts.name, sizeof(opts.name), "%s", name);
241 
242 	lvs_req = calloc(1, sizeof(*lvs_req));
243 	if (!lvs_req) {
244 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
245 		return -ENOMEM;
246 	}
247 
248 	rc = spdk_bdev_create_bs_dev_ext(base_bdev_name, vbdev_lvs_base_bdev_event_cb,
249 					 NULL, &bs_dev);
250 	if (rc < 0) {
251 		SPDK_ERRLOG("Cannot create blobstore device\n");
252 		free(lvs_req);
253 		return rc;
254 	}
255 
256 	lvs_req->bs_dev = bs_dev;
257 	lvs_req->base_bdev = bs_dev->get_base_bdev(bs_dev);
258 	lvs_req->cb_fn = cb_fn;
259 	lvs_req->cb_arg = cb_arg;
260 
261 	rc = spdk_lvs_init(bs_dev, &opts, _vbdev_lvs_create_cb, lvs_req);
262 	if (rc < 0) {
263 		free(lvs_req);
264 		bs_dev->destroy(bs_dev);
265 		return rc;
266 	}
267 
268 	return 0;
269 }
270 
271 static void
272 _vbdev_lvs_rename_cb(void *cb_arg, int lvserrno)
273 {
274 	struct spdk_lvs_req *req = cb_arg;
275 	struct spdk_lvol *tmp;
276 
277 	if (lvserrno != 0) {
278 		SPDK_INFOLOG(vbdev_lvol, "Lvol store rename failed\n");
279 	} else {
280 		TAILQ_FOREACH(tmp, &req->lvol_store->lvols, link) {
281 			/* We have to pass current lvol name, since only lvs name changed */
282 			_vbdev_lvol_change_bdev_alias(tmp, tmp->name);
283 		}
284 	}
285 
286 	req->cb_fn(req->cb_arg, lvserrno);
287 	free(req);
288 }
289 
290 void
291 vbdev_lvs_rename(struct spdk_lvol_store *lvs, const char *new_lvs_name,
292 		 spdk_lvs_op_complete cb_fn, void *cb_arg)
293 {
294 	struct lvol_store_bdev *lvs_bdev;
295 
296 	struct spdk_lvs_req *req;
297 
298 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
299 	if (!lvs_bdev) {
300 		SPDK_ERRLOG("No such lvol store found\n");
301 		cb_fn(cb_arg, -ENODEV);
302 		return;
303 	}
304 
305 	req = calloc(1, sizeof(*req));
306 	if (!req) {
307 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
308 		cb_fn(cb_arg, -ENOMEM);
309 		return;
310 	}
311 	req->cb_fn = cb_fn;
312 	req->cb_arg = cb_arg;
313 	req->lvol_store = lvs;
314 
315 	spdk_lvs_rename(lvs, new_lvs_name, _vbdev_lvs_rename_cb, req);
316 }
317 
318 static void
319 _vbdev_lvs_remove_cb(void *cb_arg, int lvserrno)
320 {
321 	struct lvol_store_bdev *lvs_bdev = cb_arg;
322 	struct spdk_lvs_req *req = lvs_bdev->req;
323 
324 	if (lvserrno != 0) {
325 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
326 	}
327 
328 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
329 	free(lvs_bdev);
330 
331 	if (req->cb_fn != NULL) {
332 		req->cb_fn(req->cb_arg, lvserrno);
333 	}
334 	free(req);
335 }
336 
337 static void
338 _vbdev_lvs_remove_lvol_cb(void *cb_arg, int lvolerrno)
339 {
340 	struct lvol_store_bdev *lvs_bdev = cb_arg;
341 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
342 	struct spdk_lvol *lvol;
343 
344 	if (lvolerrno != 0) {
345 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol removed with errno %d\n", lvolerrno);
346 	}
347 
348 	if (TAILQ_EMPTY(&lvs->lvols)) {
349 		spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
350 		return;
351 	}
352 
353 	lvol = TAILQ_FIRST(&lvs->lvols);
354 	while (lvol != NULL) {
355 		if (spdk_lvol_deletable(lvol)) {
356 			vbdev_lvol_destroy(lvol, _vbdev_lvs_remove_lvol_cb, lvs_bdev);
357 			return;
358 		}
359 		lvol = TAILQ_NEXT(lvol, link);
360 	}
361 
362 	/* If no lvol is deletable, that means there is circular dependency. */
363 	SPDK_ERRLOG("Lvols left in lvs, but unable to delete.\n");
364 	assert(false);
365 }
366 
367 static bool
368 _vbdev_lvs_are_lvols_closed(struct spdk_lvol_store *lvs)
369 {
370 	struct spdk_lvol *lvol;
371 
372 	TAILQ_FOREACH(lvol, &lvs->lvols, link) {
373 		if (lvol->ref_count != 0) {
374 			return false;
375 		}
376 	}
377 	return true;
378 }
379 
380 static void
381 _vbdev_lvs_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
382 {
383 	struct lvol_store_bdev *lvs_bdev = cb_arg;
384 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
385 
386 	if (bdeverrno != 0) {
387 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
388 	}
389 
390 	/* Lvol store can be unloaded once all lvols are closed. */
391 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
392 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
393 	}
394 }
395 
396 static void
397 _vbdev_lvs_remove(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg,
398 		  bool destroy)
399 {
400 	struct spdk_lvs_req *req;
401 	struct lvol_store_bdev *lvs_bdev;
402 	struct spdk_lvol *lvol, *tmp;
403 
404 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
405 	if (!lvs_bdev) {
406 		SPDK_ERRLOG("No such lvol store found\n");
407 		if (cb_fn != NULL) {
408 			cb_fn(cb_arg, -ENODEV);
409 		}
410 		return;
411 	}
412 
413 	req = calloc(1, sizeof(*req));
414 	if (!req) {
415 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
416 		if (cb_fn != NULL) {
417 			cb_fn(cb_arg, -ENOMEM);
418 		}
419 		return;
420 	}
421 
422 	req->cb_fn = cb_fn;
423 	req->cb_arg = cb_arg;
424 	lvs_bdev->req = req;
425 
426 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
427 		if (destroy) {
428 			spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
429 		} else {
430 			spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
431 		}
432 	} else {
433 		lvs->destruct = destroy;
434 		if (destroy) {
435 			_vbdev_lvs_remove_lvol_cb(lvs_bdev, 0);
436 		} else {
437 			TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
438 				spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
439 			}
440 		}
441 	}
442 }
443 
444 void
445 vbdev_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
446 {
447 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, false);
448 }
449 
450 void
451 vbdev_lvs_destruct(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
452 {
453 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, true);
454 }
455 
456 struct lvol_store_bdev *
457 vbdev_lvol_store_first(void)
458 {
459 	struct lvol_store_bdev *lvs_bdev;
460 
461 	lvs_bdev = TAILQ_FIRST(&g_spdk_lvol_pairs);
462 	if (lvs_bdev) {
463 		SPDK_INFOLOG(vbdev_lvol, "Starting lvolstore iteration at %p\n", lvs_bdev->lvs);
464 	}
465 
466 	return lvs_bdev;
467 }
468 
469 struct lvol_store_bdev *
470 vbdev_lvol_store_next(struct lvol_store_bdev *prev)
471 {
472 	struct lvol_store_bdev *lvs_bdev;
473 
474 	if (prev == NULL) {
475 		SPDK_ERRLOG("prev argument cannot be NULL\n");
476 		return NULL;
477 	}
478 
479 	lvs_bdev = TAILQ_NEXT(prev, lvol_stores);
480 	if (lvs_bdev) {
481 		SPDK_INFOLOG(vbdev_lvol, "Continuing lvolstore iteration at %p\n", lvs_bdev->lvs);
482 	}
483 
484 	return lvs_bdev;
485 }
486 
487 static struct spdk_lvol_store *
488 _vbdev_get_lvol_store_by_uuid(const struct spdk_uuid *uuid)
489 {
490 	struct spdk_lvol_store *lvs = NULL;
491 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
492 
493 	while (lvs_bdev != NULL) {
494 		lvs = lvs_bdev->lvs;
495 		if (spdk_uuid_compare(&lvs->uuid, uuid) == 0) {
496 			return lvs;
497 		}
498 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
499 	}
500 	return NULL;
501 }
502 
503 struct spdk_lvol_store *
504 vbdev_get_lvol_store_by_uuid(const char *uuid_str)
505 {
506 	struct spdk_uuid uuid;
507 
508 	if (spdk_uuid_parse(&uuid, uuid_str)) {
509 		return NULL;
510 	}
511 
512 	return _vbdev_get_lvol_store_by_uuid(&uuid);
513 }
514 
515 struct spdk_lvol_store *
516 vbdev_get_lvol_store_by_name(const char *name)
517 {
518 	struct spdk_lvol_store *lvs = NULL;
519 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
520 
521 	while (lvs_bdev != NULL) {
522 		lvs = lvs_bdev->lvs;
523 		if (strncmp(lvs->name, name, sizeof(lvs->name)) == 0) {
524 			return lvs;
525 		}
526 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
527 	}
528 	return NULL;
529 }
530 
531 struct vbdev_lvol_destroy_ctx {
532 	struct spdk_lvol *lvol;
533 	spdk_lvol_op_complete cb_fn;
534 	void *cb_arg;
535 };
536 
537 static void
538 _vbdev_lvol_unregister_unload_lvs(void *cb_arg, int lvserrno)
539 {
540 	struct lvol_bdev *lvol_bdev = cb_arg;
541 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
542 
543 	if (lvserrno != 0) {
544 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
545 	}
546 
547 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
548 	free(lvs_bdev);
549 
550 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvserrno);
551 	free(lvol_bdev);
552 }
553 
554 static void
555 _vbdev_lvol_unregister_cb(void *ctx, int lvolerrno)
556 {
557 	struct lvol_bdev *lvol_bdev = ctx;
558 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
559 
560 	if (g_shutdown_started && _vbdev_lvs_are_lvols_closed(lvs_bdev->lvs)) {
561 		spdk_lvs_unload(lvs_bdev->lvs, _vbdev_lvol_unregister_unload_lvs, lvol_bdev);
562 		return;
563 	}
564 
565 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvolerrno);
566 	free(lvol_bdev);
567 }
568 
569 static int
570 vbdev_lvol_unregister(void *ctx)
571 {
572 	struct spdk_lvol *lvol = ctx;
573 	struct lvol_bdev *lvol_bdev;
574 
575 	assert(lvol != NULL);
576 	lvol_bdev = SPDK_CONTAINEROF(lvol->bdev, struct lvol_bdev, bdev);
577 
578 	spdk_bdev_alias_del_all(lvol->bdev);
579 	spdk_lvol_close(lvol, _vbdev_lvol_unregister_cb, lvol_bdev);
580 
581 	/* return 1 to indicate we have an operation that must finish asynchronously before the
582 	 *  lvol is closed
583 	 */
584 	return 1;
585 }
586 
587 static void
588 _vbdev_lvol_destroy_cb(void *cb_arg, int bdeverrno)
589 {
590 	struct vbdev_lvol_destroy_ctx *ctx = cb_arg;
591 	struct spdk_lvol *lvol = ctx->lvol;
592 
593 	if (bdeverrno < 0) {
594 		SPDK_INFOLOG(vbdev_lvol, "Could not unregister bdev during lvol (%s) destroy\n",
595 			     lvol->unique_id);
596 		ctx->cb_fn(ctx->cb_arg, bdeverrno);
597 		free(ctx);
598 		return;
599 	}
600 
601 	spdk_lvol_destroy(lvol, ctx->cb_fn, ctx->cb_arg);
602 	free(ctx);
603 }
604 
605 void
606 vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
607 {
608 	struct vbdev_lvol_destroy_ctx *ctx;
609 	size_t count;
610 
611 	assert(lvol != NULL);
612 	assert(cb_fn != NULL);
613 
614 	/* Check if it is possible to delete lvol */
615 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
616 	if (count > 1) {
617 		/* throw an error */
618 		SPDK_ERRLOG("Cannot delete lvol\n");
619 		cb_fn(cb_arg, -EPERM);
620 		return;
621 	}
622 
623 	ctx = calloc(1, sizeof(*ctx));
624 	if (!ctx) {
625 		cb_fn(cb_arg, -ENOMEM);
626 		return;
627 	}
628 
629 	ctx->lvol = lvol;
630 	ctx->cb_fn = cb_fn;
631 	ctx->cb_arg = cb_arg;
632 
633 	spdk_bdev_unregister(lvol->bdev, _vbdev_lvol_destroy_cb, ctx);
634 }
635 
636 static char *
637 vbdev_lvol_find_name(struct spdk_lvol *lvol, spdk_blob_id blob_id)
638 {
639 	struct spdk_lvol_store *lvs;
640 	struct spdk_lvol *_lvol;
641 
642 	assert(lvol != NULL);
643 
644 	lvs = lvol->lvol_store;
645 
646 	assert(lvs);
647 
648 	TAILQ_FOREACH(_lvol, &lvs->lvols, link) {
649 		if (_lvol->blob_id == blob_id) {
650 			return _lvol->name;
651 		}
652 	}
653 
654 	return NULL;
655 }
656 
657 static int
658 vbdev_lvol_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
659 {
660 	struct spdk_lvol *lvol = ctx;
661 	struct lvol_store_bdev *lvs_bdev;
662 	struct spdk_bdev *bdev;
663 	struct spdk_blob *blob;
664 	char lvol_store_uuid[SPDK_UUID_STRING_LEN];
665 	spdk_blob_id *ids = NULL;
666 	size_t count, i;
667 	char *name;
668 	int rc = 0;
669 
670 	spdk_json_write_named_object_begin(w, "lvol");
671 
672 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
673 	if (!lvs_bdev) {
674 		SPDK_ERRLOG("No such lvol store found\n");
675 		rc = -ENODEV;
676 		goto end;
677 	}
678 
679 	bdev = lvs_bdev->bdev;
680 
681 	spdk_uuid_fmt_lower(lvol_store_uuid, sizeof(lvol_store_uuid), &lvol->lvol_store->uuid);
682 	spdk_json_write_named_string(w, "lvol_store_uuid", lvol_store_uuid);
683 
684 	spdk_json_write_named_string(w, "base_bdev", spdk_bdev_get_name(bdev));
685 
686 	blob = lvol->blob;
687 
688 	spdk_json_write_named_bool(w, "thin_provision", spdk_blob_is_thin_provisioned(blob));
689 
690 	spdk_json_write_named_bool(w, "snapshot", spdk_blob_is_snapshot(blob));
691 
692 	spdk_json_write_named_bool(w, "clone", spdk_blob_is_clone(blob));
693 
694 	if (spdk_blob_is_clone(blob)) {
695 		spdk_blob_id snapshotid = spdk_blob_get_parent_snapshot(lvol->lvol_store->blobstore, lvol->blob_id);
696 		if (snapshotid != SPDK_BLOBID_INVALID) {
697 			name = vbdev_lvol_find_name(lvol, snapshotid);
698 			if (name != NULL) {
699 				spdk_json_write_named_string(w, "base_snapshot", name);
700 			} else {
701 				SPDK_ERRLOG("Cannot obtain snapshots name\n");
702 			}
703 		}
704 	}
705 
706 	if (spdk_blob_is_snapshot(blob)) {
707 		/* Take a number of clones */
708 		rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
709 		if (rc == -ENOMEM && count > 0) {
710 			ids = malloc(sizeof(spdk_blob_id) * count);
711 			if (ids == NULL) {
712 				SPDK_ERRLOG("Cannot allocate memory\n");
713 				rc = -ENOMEM;
714 				goto end;
715 			}
716 
717 			rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, ids, &count);
718 			if (rc == 0) {
719 				spdk_json_write_named_array_begin(w, "clones");
720 				for (i = 0; i < count; i++) {
721 					name = vbdev_lvol_find_name(lvol, ids[i]);
722 					if (name != NULL) {
723 						spdk_json_write_string(w, name);
724 					} else {
725 						SPDK_ERRLOG("Cannot obtain clone name\n");
726 					}
727 
728 				}
729 				spdk_json_write_array_end(w);
730 			}
731 			free(ids);
732 		}
733 
734 	}
735 
736 end:
737 	spdk_json_write_object_end(w);
738 
739 	return rc;
740 }
741 
742 static void
743 vbdev_lvol_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
744 {
745 	/* Nothing to dump as lvol configuration is saved on physical device. */
746 }
747 
748 static struct spdk_io_channel *
749 vbdev_lvol_get_io_channel(void *ctx)
750 {
751 	struct spdk_lvol *lvol = ctx;
752 
753 	return spdk_lvol_get_io_channel(lvol);
754 }
755 
756 static bool
757 vbdev_lvol_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
758 {
759 	struct spdk_lvol *lvol = ctx;
760 
761 	switch (io_type) {
762 	case SPDK_BDEV_IO_TYPE_WRITE:
763 	case SPDK_BDEV_IO_TYPE_UNMAP:
764 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
765 		return !spdk_blob_is_read_only(lvol->blob);
766 	case SPDK_BDEV_IO_TYPE_RESET:
767 	case SPDK_BDEV_IO_TYPE_READ:
768 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
769 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
770 		return true;
771 	default:
772 		return false;
773 	}
774 }
775 
776 static void
777 lvol_op_comp(void *cb_arg, int bserrno)
778 {
779 	struct spdk_bdev_io *bdev_io = cb_arg;
780 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_SUCCESS;
781 
782 	if (bserrno != 0) {
783 		if (bserrno == -ENOMEM) {
784 			status = SPDK_BDEV_IO_STATUS_NOMEM;
785 		} else {
786 			status = SPDK_BDEV_IO_STATUS_FAILED;
787 		}
788 	}
789 
790 	spdk_bdev_io_complete(bdev_io, status);
791 }
792 
793 static void
794 lvol_unmap(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
795 {
796 	uint64_t start_page, num_pages;
797 	struct spdk_blob *blob = lvol->blob;
798 
799 	start_page = bdev_io->u.bdev.offset_blocks;
800 	num_pages = bdev_io->u.bdev.num_blocks;
801 
802 	spdk_blob_io_unmap(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
803 }
804 
805 static void
806 lvol_seek_data(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
807 {
808 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_allocated_io_unit(lvol->blob,
809 				      bdev_io->u.bdev.offset_blocks);
810 
811 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
812 }
813 
814 static void
815 lvol_seek_hole(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
816 {
817 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_unallocated_io_unit(lvol->blob,
818 				      bdev_io->u.bdev.offset_blocks);
819 
820 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
821 }
822 
823 static void
824 lvol_write_zeroes(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
825 {
826 	uint64_t start_page, num_pages;
827 	struct spdk_blob *blob = lvol->blob;
828 
829 	start_page = bdev_io->u.bdev.offset_blocks;
830 	num_pages = bdev_io->u.bdev.num_blocks;
831 
832 	spdk_blob_io_write_zeroes(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
833 }
834 
835 static void
836 lvol_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
837 {
838 	uint64_t start_page, num_pages;
839 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
840 	struct spdk_blob *blob = lvol->blob;
841 
842 	start_page = bdev_io->u.bdev.offset_blocks;
843 	num_pages = bdev_io->u.bdev.num_blocks;
844 
845 	if (bdev_io->u.bdev.ext_opts) {
846 		struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
847 
848 		lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
849 		lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.ext_opts->memory_domain;
850 		lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.ext_opts->memory_domain_ctx;
851 		/* Save a pointer to ext_opts passed by the user, it will be used in bs_dev readv/writev_ext functions
852 		 * to restore ext_opts structure. That is done since bdev and blob extended functions use different
853 		 * extended opts structures */
854 		lvol_io->ext_io_opts.user_ctx = bdev_io->u.bdev.ext_opts;
855 
856 		spdk_blob_io_readv_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
857 				       num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
858 	} else {
859 		spdk_blob_io_readv(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
860 				   num_pages, lvol_op_comp, bdev_io);
861 	}
862 }
863 
864 static void
865 lvol_write(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
866 {
867 	uint64_t start_page, num_pages;
868 	struct spdk_blob *blob = lvol->blob;
869 
870 	start_page = bdev_io->u.bdev.offset_blocks;
871 	num_pages = bdev_io->u.bdev.num_blocks;
872 
873 	if (bdev_io->u.bdev.ext_opts) {
874 		struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
875 
876 		lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
877 		lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.ext_opts->memory_domain;
878 		lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.ext_opts->memory_domain_ctx;
879 		/* Save a pointer to ext_opts passed by the user, it will be used in bs_dev readv/writev_ext functions
880 		 * to restore ext_opts structure. That is done since bdev and blob extended functions use different
881 		 * extended opts structures */
882 		lvol_io->ext_io_opts.user_ctx = bdev_io->u.bdev.ext_opts;
883 
884 		spdk_blob_io_writev_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
885 					num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
886 	} else {
887 		spdk_blob_io_writev(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
888 				    num_pages, lvol_op_comp, bdev_io);
889 	}
890 }
891 
892 static int
893 lvol_reset(struct spdk_bdev_io *bdev_io)
894 {
895 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
896 
897 	return 0;
898 }
899 
900 static void
901 lvol_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
902 {
903 	if (!success) {
904 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
905 		return;
906 	}
907 
908 	lvol_read(ch, bdev_io);
909 }
910 
911 static void
912 vbdev_lvol_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
913 {
914 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
915 
916 	switch (bdev_io->type) {
917 	case SPDK_BDEV_IO_TYPE_READ:
918 		spdk_bdev_io_get_buf(bdev_io, lvol_get_buf_cb,
919 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
920 		break;
921 	case SPDK_BDEV_IO_TYPE_WRITE:
922 		lvol_write(lvol, ch, bdev_io);
923 		break;
924 	case SPDK_BDEV_IO_TYPE_RESET:
925 		lvol_reset(bdev_io);
926 		break;
927 	case SPDK_BDEV_IO_TYPE_UNMAP:
928 		lvol_unmap(lvol, ch, bdev_io);
929 		break;
930 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
931 		lvol_write_zeroes(lvol, ch, bdev_io);
932 		break;
933 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
934 		lvol_seek_data(lvol, bdev_io);
935 		break;
936 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
937 		lvol_seek_hole(lvol, bdev_io);
938 		break;
939 	default:
940 		SPDK_INFOLOG(vbdev_lvol, "lvol: unsupported I/O type %d\n", bdev_io->type);
941 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
942 		return;
943 	}
944 	return;
945 }
946 
947 static int
948 vbdev_lvol_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
949 {
950 	struct spdk_lvol *lvol = ctx;
951 	struct spdk_bdev *base_bdev;
952 
953 	base_bdev = lvol->lvol_store->bs_dev->get_base_bdev(lvol->lvol_store->bs_dev);
954 
955 	return spdk_bdev_get_memory_domains(base_bdev, domains, array_size);
956 }
957 
958 static struct spdk_bdev_fn_table vbdev_lvol_fn_table = {
959 	.destruct		= vbdev_lvol_unregister,
960 	.io_type_supported	= vbdev_lvol_io_type_supported,
961 	.submit_request		= vbdev_lvol_submit_request,
962 	.get_io_channel		= vbdev_lvol_get_io_channel,
963 	.dump_info_json		= vbdev_lvol_dump_info_json,
964 	.write_config_json	= vbdev_lvol_write_config_json,
965 	.get_memory_domains	= vbdev_lvol_get_memory_domains,
966 };
967 
968 static void
969 lvol_destroy_cb(void *cb_arg, int bdeverrno)
970 {
971 }
972 
973 static void
974 _create_lvol_disk_destroy_cb(void *cb_arg, int bdeverrno)
975 {
976 	struct spdk_lvol *lvol = cb_arg;
977 
978 	if (bdeverrno < 0) {
979 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
980 			    lvol->unique_id);
981 		return;
982 	}
983 
984 	spdk_lvol_destroy(lvol, lvol_destroy_cb, NULL);
985 }
986 
987 static void
988 _create_lvol_disk_unload_cb(void *cb_arg, int bdeverrno)
989 {
990 	struct spdk_lvol *lvol = cb_arg;
991 
992 	if (bdeverrno < 0) {
993 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
994 			    lvol->unique_id);
995 		return;
996 	}
997 
998 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
999 	free(lvol);
1000 }
1001 
1002 static int
1003 _create_lvol_disk(struct spdk_lvol *lvol, bool destroy)
1004 {
1005 	struct spdk_bdev *bdev;
1006 	struct lvol_bdev *lvol_bdev;
1007 	struct lvol_store_bdev *lvs_bdev;
1008 	uint64_t total_size;
1009 	unsigned char *alias;
1010 	int rc;
1011 
1012 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
1013 	if (lvs_bdev == NULL) {
1014 		SPDK_ERRLOG("No spdk lvs-bdev pair found for lvol %s\n", lvol->unique_id);
1015 		return -ENODEV;
1016 	}
1017 
1018 	lvol_bdev = calloc(1, sizeof(struct lvol_bdev));
1019 	if (!lvol_bdev) {
1020 		SPDK_ERRLOG("Cannot alloc memory for lvol bdev\n");
1021 		return -ENOMEM;
1022 	}
1023 
1024 	lvol_bdev->lvol = lvol;
1025 	lvol_bdev->lvs_bdev = lvs_bdev;
1026 
1027 	bdev = &lvol_bdev->bdev;
1028 	bdev->name = lvol->unique_id;
1029 	bdev->product_name = "Logical Volume";
1030 	bdev->blocklen = spdk_bs_get_io_unit_size(lvol->lvol_store->blobstore);
1031 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1032 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1033 	assert((total_size % bdev->blocklen) == 0);
1034 	bdev->blockcnt = total_size / bdev->blocklen;
1035 	bdev->uuid = lvol->uuid;
1036 	bdev->required_alignment = lvs_bdev->bdev->required_alignment;
1037 	bdev->split_on_optimal_io_boundary = true;
1038 	bdev->optimal_io_boundary = spdk_bs_get_cluster_size(lvol->lvol_store->blobstore) / bdev->blocklen;
1039 
1040 	bdev->ctxt = lvol;
1041 	bdev->fn_table = &vbdev_lvol_fn_table;
1042 	bdev->module = &g_lvol_if;
1043 
1044 	/* Set default bdev reset waiting time. This value indicates how much
1045 	 * time a reset should wait before forcing a reset down to the underlying
1046 	 * bdev module.
1047 	 * Setting this parameter is mainly to avoid "empty" resets to a shared
1048 	 * bdev that may be used by multiple lvols. */
1049 	bdev->reset_io_drain_timeout = BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
1050 
1051 	rc = spdk_bdev_register(bdev);
1052 	if (rc) {
1053 		free(lvol_bdev);
1054 		return rc;
1055 	}
1056 	lvol->bdev = bdev;
1057 
1058 	alias = spdk_sprintf_alloc("%s/%s", lvs_bdev->lvs->name, lvol->name);
1059 	if (alias == NULL) {
1060 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
1061 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1062 						  _create_lvol_disk_unload_cb), lvol);
1063 		return -ENOMEM;
1064 	}
1065 
1066 	rc = spdk_bdev_alias_add(bdev, alias);
1067 	if (rc != 0) {
1068 		SPDK_ERRLOG("Cannot add alias to lvol bdev\n");
1069 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1070 						  _create_lvol_disk_unload_cb), lvol);
1071 	}
1072 	free(alias);
1073 
1074 	return rc;
1075 }
1076 
1077 static void
1078 _vbdev_lvol_create_cb(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1079 {
1080 	struct spdk_lvol_with_handle_req *req = cb_arg;
1081 
1082 	if (lvolerrno < 0) {
1083 		goto end;
1084 	}
1085 
1086 	lvolerrno = _create_lvol_disk(lvol, true);
1087 
1088 end:
1089 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
1090 	free(req);
1091 }
1092 
1093 int
1094 vbdev_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1095 		  bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1096 		  void *cb_arg)
1097 {
1098 	struct spdk_lvol_with_handle_req *req;
1099 	int rc;
1100 
1101 	req = calloc(1, sizeof(*req));
1102 	if (req == NULL) {
1103 		return -ENOMEM;
1104 	}
1105 	req->cb_fn = cb_fn;
1106 	req->cb_arg = cb_arg;
1107 
1108 	rc = spdk_lvol_create(lvs, name, sz, thin_provision, clear_method,
1109 			      _vbdev_lvol_create_cb, req);
1110 	if (rc != 0) {
1111 		free(req);
1112 	}
1113 
1114 	return rc;
1115 }
1116 
1117 void
1118 vbdev_lvol_create_snapshot(struct spdk_lvol *lvol, const char *snapshot_name,
1119 			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1120 {
1121 	struct spdk_lvol_with_handle_req *req;
1122 
1123 	req = calloc(1, sizeof(*req));
1124 	if (req == NULL) {
1125 		cb_fn(cb_arg, NULL, -ENOMEM);
1126 		return;
1127 	}
1128 
1129 	req->cb_fn = cb_fn;
1130 	req->cb_arg = cb_arg;
1131 
1132 	spdk_lvol_create_snapshot(lvol, snapshot_name, _vbdev_lvol_create_cb, req);
1133 }
1134 
1135 void
1136 vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
1137 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1138 {
1139 	struct spdk_lvol_with_handle_req *req;
1140 
1141 	req = calloc(1, sizeof(*req));
1142 	if (req == NULL) {
1143 		cb_fn(cb_arg, NULL, -ENOMEM);
1144 		return;
1145 	}
1146 
1147 	req->cb_fn = cb_fn;
1148 	req->cb_arg = cb_arg;
1149 
1150 	spdk_lvol_create_clone(lvol, clone_name, _vbdev_lvol_create_cb, req);
1151 }
1152 
1153 static void
1154 _vbdev_lvol_rename_cb(void *cb_arg, int lvolerrno)
1155 {
1156 	struct spdk_lvol_req *req = cb_arg;
1157 
1158 	if (lvolerrno != 0) {
1159 		SPDK_ERRLOG("Renaming lvol failed\n");
1160 	}
1161 
1162 	req->cb_fn(req->cb_arg, lvolerrno);
1163 	free(req);
1164 }
1165 
1166 void
1167 vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
1168 		  spdk_lvol_op_complete cb_fn, void *cb_arg)
1169 {
1170 	struct spdk_lvol_req *req;
1171 	int rc;
1172 
1173 	rc = _vbdev_lvol_change_bdev_alias(lvol, new_lvol_name);
1174 	if (rc != 0) {
1175 		SPDK_ERRLOG("renaming lvol to '%s' does not succeed\n", new_lvol_name);
1176 		cb_fn(cb_arg, rc);
1177 		return;
1178 	}
1179 
1180 	req = calloc(1, sizeof(*req));
1181 	if (req == NULL) {
1182 		cb_fn(cb_arg, -ENOMEM);
1183 		return;
1184 	}
1185 	req->cb_fn = cb_fn;
1186 	req->cb_arg = cb_arg;
1187 
1188 	spdk_lvol_rename(lvol, new_lvol_name, _vbdev_lvol_rename_cb, req);
1189 }
1190 
1191 static void
1192 _vbdev_lvol_resize_cb(void *cb_arg, int lvolerrno)
1193 {
1194 	struct spdk_lvol_req *req = cb_arg;
1195 	struct spdk_lvol *lvol = req->lvol;
1196 	uint64_t total_size;
1197 
1198 	/* change bdev size */
1199 	if (lvolerrno != 0) {
1200 		SPDK_ERRLOG("CB function for bdev lvol %s receive error no: %d.\n", lvol->name, lvolerrno);
1201 		goto finish;
1202 	}
1203 
1204 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1205 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1206 	assert((total_size % lvol->bdev->blocklen) == 0);
1207 
1208 	lvolerrno = spdk_bdev_notify_blockcnt_change(lvol->bdev, total_size / lvol->bdev->blocklen);
1209 	if (lvolerrno != 0) {
1210 		SPDK_ERRLOG("Could not change num blocks for bdev lvol %s with error no: %d.\n",
1211 			    lvol->name, lvolerrno);
1212 	}
1213 
1214 finish:
1215 	req->cb_fn(req->cb_arg, lvolerrno);
1216 	free(req);
1217 }
1218 
1219 void
1220 vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn, void *cb_arg)
1221 {
1222 	struct spdk_lvol_req *req;
1223 
1224 	if (lvol == NULL) {
1225 		SPDK_ERRLOG("lvol does not exist\n");
1226 		cb_fn(cb_arg, -EINVAL);
1227 		return;
1228 	}
1229 
1230 	assert(lvol->bdev != NULL);
1231 
1232 	req = calloc(1, sizeof(*req));
1233 	if (req == NULL) {
1234 		cb_fn(cb_arg, -ENOMEM);
1235 		return;
1236 	}
1237 
1238 	req->cb_fn = cb_fn;
1239 	req->cb_arg = cb_arg;
1240 	req->sz = sz;
1241 	req->lvol = lvol;
1242 
1243 	spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
1244 }
1245 
1246 static void
1247 _vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1248 {
1249 	struct spdk_lvol_req *req = cb_arg;
1250 	struct spdk_lvol *lvol = req->lvol;
1251 
1252 	if (lvolerrno != 0) {
1253 		SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
1254 	}
1255 
1256 	req->cb_fn(req->cb_arg, lvolerrno);
1257 	free(req);
1258 }
1259 
1260 void
1261 vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1262 {
1263 	struct spdk_lvol_req *req;
1264 
1265 	if (lvol == NULL) {
1266 		SPDK_ERRLOG("lvol does not exist\n");
1267 		cb_fn(cb_arg, -EINVAL);
1268 		return;
1269 	}
1270 
1271 	assert(lvol->bdev != NULL);
1272 
1273 	req = calloc(1, sizeof(*req));
1274 	if (req == NULL) {
1275 		cb_fn(cb_arg, -ENOMEM);
1276 		return;
1277 	}
1278 
1279 	req->cb_fn = cb_fn;
1280 	req->cb_arg = cb_arg;
1281 	req->lvol = lvol;
1282 
1283 	spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
1284 }
1285 
1286 static int
1287 vbdev_lvs_init(void)
1288 {
1289 	return 0;
1290 }
1291 
1292 static void vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev);
1293 
1294 static void
1295 vbdev_lvs_fini_start_unload_cb(void *cb_arg, int lvserrno)
1296 {
1297 	struct lvol_store_bdev *lvs_bdev = cb_arg;
1298 	struct lvol_store_bdev *next_lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1299 
1300 	if (lvserrno != 0) {
1301 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
1302 	}
1303 
1304 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1305 	free(lvs_bdev);
1306 
1307 	vbdev_lvs_fini_start_iter(next_lvs_bdev);
1308 }
1309 
1310 static void
1311 vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev)
1312 {
1313 	struct spdk_lvol_store *lvs;
1314 
1315 	while (lvs_bdev != NULL) {
1316 		lvs = lvs_bdev->lvs;
1317 
1318 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1319 			spdk_lvs_unload(lvs, vbdev_lvs_fini_start_unload_cb, lvs_bdev);
1320 			return;
1321 		}
1322 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1323 	}
1324 
1325 	spdk_bdev_module_fini_start_done();
1326 }
1327 
1328 static void
1329 vbdev_lvs_fini_start(void)
1330 {
1331 	g_shutdown_started = true;
1332 	vbdev_lvs_fini_start_iter(vbdev_lvol_store_first());
1333 }
1334 
1335 static int
1336 vbdev_lvs_get_ctx_size(void)
1337 {
1338 	return sizeof(struct vbdev_lvol_io);
1339 }
1340 
1341 static void
1342 _vbdev_lvs_examine_failed(void *cb_arg, int lvserrno)
1343 {
1344 	spdk_bdev_module_examine_done(&g_lvol_if);
1345 }
1346 
1347 static void
1348 _vbdev_lvol_examine_close_cb(struct spdk_lvol_store *lvs)
1349 {
1350 	if (lvs->lvols_opened >= lvs->lvol_count) {
1351 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1352 		spdk_bdev_module_examine_done(&g_lvol_if);
1353 	}
1354 }
1355 
1356 static void
1357 _vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1358 {
1359 	struct spdk_lvol_store *lvs = cb_arg;
1360 
1361 	if (lvolerrno != 0) {
1362 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1363 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1364 		lvs->lvol_count--;
1365 		free(lvol);
1366 		goto end;
1367 	}
1368 
1369 	if (_create_lvol_disk(lvol, false)) {
1370 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1371 		lvs->lvol_count--;
1372 		_vbdev_lvol_examine_close_cb(lvs);
1373 		SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s failed\n", lvol->unique_id);
1374 		return;
1375 	}
1376 
1377 	lvs->lvols_opened++;
1378 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1379 
1380 end:
1381 
1382 	if (lvs->lvols_opened >= lvs->lvol_count) {
1383 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1384 		spdk_bdev_module_examine_done(&g_lvol_if);
1385 	}
1386 }
1387 
1388 static void
1389 _vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1390 {
1391 	struct lvol_store_bdev *lvs_bdev;
1392 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1393 	struct spdk_lvol *lvol, *tmp;
1394 
1395 	if (lvserrno == -EEXIST) {
1396 		SPDK_INFOLOG(vbdev_lvol,
1397 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1398 			     req->base_bdev->name);
1399 		/* On error blobstore destroys bs_dev itself */
1400 		spdk_bdev_module_examine_done(&g_lvol_if);
1401 		goto end;
1402 	} else if (lvserrno != 0) {
1403 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1404 		/* On error blobstore destroys bs_dev itself */
1405 		spdk_bdev_module_examine_done(&g_lvol_if);
1406 		goto end;
1407 	}
1408 
1409 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1410 	if (lvserrno != 0) {
1411 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1412 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1413 		goto end;
1414 	}
1415 
1416 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1417 	if (!lvs_bdev) {
1418 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1419 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1420 		goto end;
1421 	}
1422 
1423 	lvs_bdev->lvs = lvol_store;
1424 	lvs_bdev->bdev = req->base_bdev;
1425 
1426 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1427 
1428 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1429 		     req->base_bdev->name);
1430 
1431 	lvol_store->lvols_opened = 0;
1432 
1433 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1434 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1435 		spdk_bdev_module_examine_done(&g_lvol_if);
1436 	} else {
1437 		/* Open all lvols */
1438 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1439 			spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, lvol_store);
1440 		}
1441 	}
1442 
1443 end:
1444 	free(req);
1445 }
1446 
1447 static void
1448 vbdev_lvs_examine(struct spdk_bdev *bdev)
1449 {
1450 	struct spdk_bs_dev *bs_dev;
1451 	struct spdk_lvs_with_handle_req *req;
1452 	int rc;
1453 
1454 	if (spdk_bdev_get_md_size(bdev) != 0) {
1455 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n which is formatted with metadata",
1456 			     bdev->name);
1457 		spdk_bdev_module_examine_done(&g_lvol_if);
1458 		return;
1459 	}
1460 
1461 	req = calloc(1, sizeof(*req));
1462 	if (req == NULL) {
1463 		spdk_bdev_module_examine_done(&g_lvol_if);
1464 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1465 		return;
1466 	}
1467 
1468 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1469 					 NULL, &bs_dev);
1470 	if (rc < 0) {
1471 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1472 		spdk_bdev_module_examine_done(&g_lvol_if);
1473 		free(req);
1474 		return;
1475 	}
1476 
1477 	req->base_bdev = bdev;
1478 
1479 	spdk_lvs_load(bs_dev, _vbdev_lvs_examine_cb, req);
1480 }
1481 
1482 struct spdk_lvol *
1483 vbdev_lvol_get_from_bdev(struct spdk_bdev *bdev)
1484 {
1485 	if (!bdev || bdev->module != &g_lvol_if) {
1486 		return NULL;
1487 	}
1488 
1489 	if (bdev->ctxt == NULL) {
1490 		SPDK_ERRLOG("No lvol ctx assigned to bdev %s\n", bdev->name);
1491 		return NULL;
1492 	}
1493 
1494 	return (struct spdk_lvol *)bdev->ctxt;
1495 }
1496 
1497 static void
1498 _vbdev_lvs_grow_finish(void *arg, int lvserrno)
1499 {
1500 	struct spdk_lvs_grow_req *req = arg;
1501 	req->cb_fn(req->cb_arg, req->lvserrno);
1502 	free(req);
1503 }
1504 
1505 static void
1506 _vbdev_lvs_grow_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1507 {
1508 	struct spdk_lvs_grow_req *req = cb_arg;
1509 	struct spdk_lvol_store *lvs = req->lvol_store;
1510 
1511 	if (lvolerrno != 0) {
1512 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1513 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1514 		assert(lvs->lvol_count > 0);
1515 		lvs->lvol_count--;
1516 		free(lvol);
1517 		goto end;
1518 	}
1519 
1520 	if (_create_lvol_disk(lvol, false)) {
1521 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1522 		assert(lvs->lvol_count > 0);
1523 		lvs->lvol_count--;
1524 		SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s failed\n", lvol->unique_id);
1525 		goto end;
1526 	}
1527 
1528 	lvs->lvols_opened++;
1529 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1530 
1531 end:
1532 	if (lvs->lvols_opened >= lvs->lvol_count) {
1533 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1534 		_vbdev_lvs_grow_finish(req, 0);
1535 	}
1536 }
1537 
1538 static void
1539 _vbdev_lvs_grow_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1540 {
1541 	struct lvol_store_bdev *lvs_bdev;
1542 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1543 	struct spdk_lvol *lvol, *tmp;
1544 	struct spdk_lvs_grow_req *ori_req = req->cb_arg;
1545 
1546 	if (lvserrno == -EEXIST) {
1547 		SPDK_INFOLOG(vbdev_lvol,
1548 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1549 			     req->base_bdev->name);
1550 		ori_req->lvserrno = lvserrno;
1551 		_vbdev_lvs_grow_finish(ori_req, lvserrno);
1552 		goto end;
1553 	} else if (lvserrno != 0) {
1554 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1555 		/* On error blobstore destroys bs_dev itself */
1556 		ori_req->lvserrno = lvserrno;
1557 		_vbdev_lvs_grow_finish(ori_req, lvserrno);
1558 		goto end;
1559 	}
1560 
1561 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1562 	if (lvserrno != 0) {
1563 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1564 		ori_req->lvserrno = lvserrno;
1565 		spdk_lvs_unload(lvol_store, _vbdev_lvs_grow_finish, ori_req);
1566 		goto end;
1567 	}
1568 
1569 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1570 	if (!lvs_bdev) {
1571 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1572 		ori_req->lvserrno = -ENOMEM;
1573 		spdk_lvs_unload(lvol_store, _vbdev_lvs_grow_finish, ori_req);
1574 		goto end;
1575 	}
1576 
1577 	lvs_bdev->lvs = lvol_store;
1578 	lvs_bdev->bdev = req->base_bdev;
1579 
1580 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1581 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1582 		     req->base_bdev->name);
1583 
1584 	lvol_store->lvols_opened = 0;
1585 
1586 	ori_req->lvol_store = lvol_store;
1587 	ori_req->lvserrno = 0;
1588 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1589 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1590 		_vbdev_lvs_grow_finish(ori_req, 0);
1591 	} else {
1592 		/* Open all lvols */
1593 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1594 			spdk_lvol_open(lvol, _vbdev_lvs_grow_examine_finish, ori_req);
1595 		}
1596 	}
1597 
1598 end:
1599 	free(req);
1600 }
1601 
1602 static void
1603 _vbdev_lvs_grow_examine(struct spdk_bdev *bdev, struct spdk_lvs_grow_req *ori_req)
1604 {
1605 	struct spdk_bs_dev *bs_dev;
1606 	struct spdk_lvs_with_handle_req *req;
1607 	int rc;
1608 
1609 	req = calloc(1, sizeof(*req));
1610 	if (req == NULL) {
1611 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1612 		ori_req->lvserrno = -ENOMEM;
1613 		_vbdev_lvs_grow_finish(ori_req, -ENOMEM);
1614 		return;
1615 	}
1616 
1617 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1618 					 NULL, &bs_dev);
1619 	if (rc < 0) {
1620 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1621 		ori_req->lvserrno = rc;
1622 		_vbdev_lvs_grow_finish(ori_req, rc);
1623 		free(req);
1624 		return;
1625 	}
1626 
1627 	req->base_bdev = bdev;
1628 	req->cb_arg = ori_req;
1629 
1630 	spdk_lvs_grow(bs_dev, _vbdev_lvs_grow_examine_cb, req);
1631 }
1632 
1633 static void
1634 _vbdev_lvs_grow_unload_cb(void *cb_arg, int lvserrno)
1635 {
1636 	struct spdk_lvs_grow_req *req = cb_arg;
1637 	struct lvol_store_bdev *lvs_bdev;
1638 	struct spdk_bdev *bdev;
1639 
1640 	if (lvserrno != 0) {
1641 		req->cb_fn(req->cb_arg, lvserrno);
1642 		free(req);
1643 		return;
1644 	}
1645 
1646 	lvs_bdev = req->lvs_bdev;
1647 	bdev = lvs_bdev->bdev;
1648 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1649 	_vbdev_lvs_grow_examine(bdev, req);
1650 	free(lvs_bdev);
1651 }
1652 
1653 static void
1654 _vbdev_lvs_grow_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
1655 {
1656 	struct spdk_lvs_grow_req *req = cb_arg;
1657 	struct spdk_lvol_store *lvs = req->lvol_store;
1658 
1659 	if (bdeverrno != 0) {
1660 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
1661 	}
1662 
1663 	req->lvol_cnt--;
1664 
1665 	if (req->lvol_cnt == 0) {
1666 		/* Lvol store can be unloaded once all lvols are closed. */
1667 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1668 			spdk_lvs_unload(lvs, _vbdev_lvs_grow_unload_cb, req);
1669 		} else {
1670 			req->cb_fn(req->cb_arg, -EINVAL);
1671 			free(req);
1672 		}
1673 	}
1674 }
1675 
1676 void
1677 vbdev_lvs_grow(struct spdk_lvol_store *lvs,
1678 	       spdk_lvs_op_complete cb_fn, void *cb_arg)
1679 {
1680 	struct spdk_lvs_grow_req *req;
1681 	struct spdk_lvol *lvol, *tmp;
1682 
1683 	req = calloc(1, sizeof(*req));
1684 	if (!req) {
1685 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1686 		cb_fn(cb_arg, -ENOMEM);
1687 		return;
1688 	}
1689 	req->cb_fn = cb_fn;
1690 	req->cb_arg = cb_arg;
1691 	req->lvol_store = lvs;
1692 	req->lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
1693 
1694 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
1695 		spdk_lvs_unload(lvs, _vbdev_lvs_grow_unload_cb, req);
1696 	} else {
1697 		lvs->destruct = false;
1698 		TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
1699 			req->lvol_cnt++;
1700 			spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_grow_remove_bdev_unregistered_cb, req);
1701 		}
1702 		assert(req->lvol_cnt > 0);
1703 	}
1704 }
1705 
1706 SPDK_LOG_REGISTER_COMPONENT(vbdev_lvol)
1707