xref: /spdk/module/bdev/lvol/vbdev_lvol.c (revision 784b9d48746955f210926648a0131f84f58de76f)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "spdk/blob_bdev.h"
8 #include "spdk/rpc.h"
9 #include "spdk/bdev_module.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/uuid.h"
13 
14 #include "vbdev_lvol.h"
15 
/*
 * Per-I/O scratch area. lvol_read() and lvol_write() cast bdev_io->driver_ctx
 * to this type and fill ext_io_opts before calling the blob *_ext I/O
 * functions.
 */
16 struct vbdev_lvol_io {
17 	struct spdk_blob_ext_io_opts ext_io_opts;
18 };
19 
/*
 * List of lvol store / base bdev pairs. Entries are appended in
 * _vbdev_lvs_create_cb() and removed in _vbdev_lvs_remove_cb() and
 * _vbdev_lvol_unregister_unload_lvs().
 */
20 static TAILQ_HEAD(, lvol_store_bdev) g_spdk_lvol_pairs = TAILQ_HEAD_INITIALIZER(
21 			g_spdk_lvol_pairs);
22 
/* Module callbacks referenced by g_lvol_if below. */
23 static int vbdev_lvs_init(void);
24 static void vbdev_lvs_fini_start(void);
25 static int vbdev_lvs_get_ctx_size(void);
26 static void vbdev_lvs_examine(struct spdk_bdev *bdev);
/*
 * Read by _vbdev_lvol_unregister_cb() to decide whether closing the last open
 * lvol should also unload its lvol store.
 * NOTE(review): presumably set by vbdev_lvs_fini_start() -- confirm.
 */
27 static bool g_shutdown_started = false;
28 
/* bdev module descriptor for the "lvol" module; registered right below. */
29 static struct spdk_bdev_module g_lvol_if = {
30 	.name = "lvol",
31 	.module_init = vbdev_lvs_init,
32 	.fini_start = vbdev_lvs_fini_start,
33 	.async_fini_start = true,
34 	.examine_disk = vbdev_lvs_examine,
35 	.get_ctx_size = vbdev_lvs_get_ctx_size,
36 
37 };
38 
39 SPDK_BDEV_MODULE_REGISTER(lvol, &g_lvol_if)
40 
41 struct lvol_store_bdev *
42 vbdev_get_lvs_bdev_by_lvs(struct spdk_lvol_store *lvs_orig)
43 {
44 	struct spdk_lvol_store *lvs = NULL;
45 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
46 
47 	while (lvs_bdev != NULL) {
48 		lvs = lvs_bdev->lvs;
49 		if (lvs == lvs_orig) {
50 			if (lvs_bdev->req != NULL) {
51 				/* We do not allow access to lvs that are being destroyed */
52 				return NULL;
53 			} else {
54 				return lvs_bdev;
55 			}
56 		}
57 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
58 	}
59 
60 	return NULL;
61 }
62 
63 static int
64 _vbdev_lvol_change_bdev_alias(struct spdk_lvol *lvol, const char *new_lvol_name)
65 {
66 	struct spdk_bdev_alias *tmp;
67 	char *old_alias;
68 	char *alias;
69 	int rc;
70 	int alias_number = 0;
71 
72 	/* bdev representing lvols have only one alias,
73 	 * while we changed lvs name earlier, we have to iterate alias list to get one,
74 	 * and check if there is only one alias */
75 
76 	TAILQ_FOREACH(tmp, spdk_bdev_get_aliases(lvol->bdev), tailq) {
77 		if (++alias_number > 1) {
78 			SPDK_ERRLOG("There is more than 1 alias in bdev %s\n", lvol->bdev->name);
79 			return -EINVAL;
80 		}
81 
82 		old_alias = tmp->alias.name;
83 	}
84 
85 	if (alias_number == 0) {
86 		SPDK_ERRLOG("There are no aliases in bdev %s\n", lvol->bdev->name);
87 		return -EINVAL;
88 	}
89 
90 	alias = spdk_sprintf_alloc("%s/%s", lvol->lvol_store->name, new_lvol_name);
91 	if (alias == NULL) {
92 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
93 		return -ENOMEM;
94 	}
95 
96 	rc = spdk_bdev_alias_add(lvol->bdev, alias);
97 	if (rc != 0) {
98 		SPDK_ERRLOG("cannot add alias '%s'\n", alias);
99 		free(alias);
100 		return rc;
101 	}
102 	free(alias);
103 
104 	rc = spdk_bdev_alias_del(lvol->bdev, old_alias);
105 	if (rc != 0) {
106 		SPDK_ERRLOG("cannot remove alias '%s'\n", old_alias);
107 		return rc;
108 	}
109 
110 	return 0;
111 }
112 
113 static struct lvol_store_bdev *
114 vbdev_get_lvs_bdev_by_bdev(struct spdk_bdev *bdev_orig)
115 {
116 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
117 
118 	while (lvs_bdev != NULL) {
119 		if (lvs_bdev->bdev == bdev_orig) {
120 			if (lvs_bdev->req != NULL) {
121 				/* We do not allow access to lvs that are being destroyed */
122 				return NULL;
123 			} else {
124 				return lvs_bdev;
125 			}
126 		}
127 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
128 	}
129 
130 	return NULL;
131 }
132 
133 static void
134 vbdev_lvs_hotremove_cb(struct spdk_bdev *bdev)
135 {
136 	struct lvol_store_bdev *lvs_bdev;
137 
138 	lvs_bdev = vbdev_get_lvs_bdev_by_bdev(bdev);
139 	if (lvs_bdev != NULL) {
140 		vbdev_lvs_unload(lvs_bdev->lvs, NULL, NULL);
141 	}
142 }
143 
144 static void
145 vbdev_lvs_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
146 			     void *event_ctx)
147 {
148 	switch (type) {
149 	case SPDK_BDEV_EVENT_REMOVE:
150 		vbdev_lvs_hotremove_cb(bdev);
151 		break;
152 	default:
153 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
154 		break;
155 	}
156 }
157 
158 static void
159 _vbdev_lvs_create_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
160 {
161 	struct spdk_lvs_with_handle_req *req = cb_arg;
162 	struct lvol_store_bdev *lvs_bdev;
163 	struct spdk_bdev *bdev = req->base_bdev;
164 	struct spdk_bs_dev *bs_dev = req->bs_dev;
165 
166 	if (lvserrno != 0) {
167 		assert(lvs == NULL);
168 		SPDK_ERRLOG("Cannot create lvol store bdev\n");
169 		goto end;
170 	}
171 
172 	lvserrno = spdk_bs_bdev_claim(bs_dev, &g_lvol_if);
173 	if (lvserrno != 0) {
174 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
175 		req->bs_dev->destroy(req->bs_dev);
176 		goto end;
177 	}
178 
179 	assert(lvs != NULL);
180 
181 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
182 	if (!lvs_bdev) {
183 		lvserrno = -ENOMEM;
184 		goto end;
185 	}
186 	lvs_bdev->lvs = lvs;
187 	lvs_bdev->bdev = bdev;
188 	lvs_bdev->req = NULL;
189 
190 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
191 	SPDK_INFOLOG(vbdev_lvol, "Lvol store bdev inserted\n");
192 
193 end:
194 	req->cb_fn(req->cb_arg, lvs, lvserrno);
195 	free(req);
196 
197 	return;
198 }
199 
200 int
201 vbdev_lvs_create(const char *base_bdev_name, const char *name, uint32_t cluster_sz,
202 		 enum lvs_clear_method clear_method, uint32_t num_md_pages_per_cluster_ratio,
203 		 spdk_lvs_op_with_handle_complete cb_fn, void *cb_arg)
204 {
205 	struct spdk_bs_dev *bs_dev;
206 	struct spdk_lvs_with_handle_req *lvs_req;
207 	struct spdk_lvs_opts opts;
208 	int rc;
209 	int len;
210 
211 	if (base_bdev_name == NULL) {
212 		SPDK_ERRLOG("missing base_bdev_name param\n");
213 		return -EINVAL;
214 	}
215 
216 	spdk_lvs_opts_init(&opts);
217 	if (cluster_sz != 0) {
218 		opts.cluster_sz = cluster_sz;
219 	}
220 
221 	if (clear_method != 0) {
222 		opts.clear_method = clear_method;
223 	}
224 
225 	if (num_md_pages_per_cluster_ratio != 0) {
226 		opts.num_md_pages_per_cluster_ratio = num_md_pages_per_cluster_ratio;
227 	}
228 
229 	if (name == NULL) {
230 		SPDK_ERRLOG("missing name param\n");
231 		return -EINVAL;
232 	}
233 
234 	len = strnlen(name, SPDK_LVS_NAME_MAX);
235 
236 	if (len == 0 || len == SPDK_LVS_NAME_MAX) {
237 		SPDK_ERRLOG("name must be between 1 and %d characters\n", SPDK_LVS_NAME_MAX - 1);
238 		return -EINVAL;
239 	}
240 	snprintf(opts.name, sizeof(opts.name), "%s", name);
241 
242 	lvs_req = calloc(1, sizeof(*lvs_req));
243 	if (!lvs_req) {
244 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
245 		return -ENOMEM;
246 	}
247 
248 	rc = spdk_bdev_create_bs_dev_ext(base_bdev_name, vbdev_lvs_base_bdev_event_cb,
249 					 NULL, &bs_dev);
250 	if (rc < 0) {
251 		SPDK_ERRLOG("Cannot create blobstore device\n");
252 		free(lvs_req);
253 		return rc;
254 	}
255 
256 	lvs_req->bs_dev = bs_dev;
257 	lvs_req->base_bdev = bs_dev->get_base_bdev(bs_dev);
258 	lvs_req->cb_fn = cb_fn;
259 	lvs_req->cb_arg = cb_arg;
260 
261 	rc = spdk_lvs_init(bs_dev, &opts, _vbdev_lvs_create_cb, lvs_req);
262 	if (rc < 0) {
263 		free(lvs_req);
264 		bs_dev->destroy(bs_dev);
265 		return rc;
266 	}
267 
268 	return 0;
269 }
270 
271 static void
272 _vbdev_lvs_rename_cb(void *cb_arg, int lvserrno)
273 {
274 	struct spdk_lvs_req *req = cb_arg;
275 	struct spdk_lvol *tmp;
276 
277 	if (lvserrno != 0) {
278 		SPDK_INFOLOG(vbdev_lvol, "Lvol store rename failed\n");
279 	} else {
280 		TAILQ_FOREACH(tmp, &req->lvol_store->lvols, link) {
281 			/* We have to pass current lvol name, since only lvs name changed */
282 			_vbdev_lvol_change_bdev_alias(tmp, tmp->name);
283 		}
284 	}
285 
286 	req->cb_fn(req->cb_arg, lvserrno);
287 	free(req);
288 }
289 
290 void
291 vbdev_lvs_rename(struct spdk_lvol_store *lvs, const char *new_lvs_name,
292 		 spdk_lvs_op_complete cb_fn, void *cb_arg)
293 {
294 	struct lvol_store_bdev *lvs_bdev;
295 
296 	struct spdk_lvs_req *req;
297 
298 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
299 	if (!lvs_bdev) {
300 		SPDK_ERRLOG("No such lvol store found\n");
301 		cb_fn(cb_arg, -ENODEV);
302 		return;
303 	}
304 
305 	req = calloc(1, sizeof(*req));
306 	if (!req) {
307 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
308 		cb_fn(cb_arg, -ENOMEM);
309 		return;
310 	}
311 	req->cb_fn = cb_fn;
312 	req->cb_arg = cb_arg;
313 	req->lvol_store = lvs;
314 
315 	spdk_lvs_rename(lvs, new_lvs_name, _vbdev_lvs_rename_cb, req);
316 }
317 
318 static void
319 _vbdev_lvs_remove_cb(void *cb_arg, int lvserrno)
320 {
321 	struct lvol_store_bdev *lvs_bdev = cb_arg;
322 	struct spdk_lvs_req *req = lvs_bdev->req;
323 
324 	if (lvserrno != 0) {
325 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
326 	}
327 
328 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
329 	free(lvs_bdev);
330 
331 	if (req->cb_fn != NULL) {
332 		req->cb_fn(req->cb_arg, lvserrno);
333 	}
334 	free(req);
335 }
336 
337 static void
338 _vbdev_lvs_remove_lvol_cb(void *cb_arg, int lvolerrno)
339 {
340 	struct lvol_store_bdev *lvs_bdev = cb_arg;
341 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
342 	struct spdk_lvol *lvol;
343 
344 	if (lvolerrno != 0) {
345 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol removed with errno %d\n", lvolerrno);
346 	}
347 
348 	if (TAILQ_EMPTY(&lvs->lvols)) {
349 		spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
350 		return;
351 	}
352 
353 	lvol = TAILQ_FIRST(&lvs->lvols);
354 	while (lvol != NULL) {
355 		if (spdk_lvol_deletable(lvol)) {
356 			vbdev_lvol_destroy(lvol, _vbdev_lvs_remove_lvol_cb, lvs_bdev);
357 			return;
358 		}
359 		lvol = TAILQ_NEXT(lvol, link);
360 	}
361 
362 	/* If no lvol is deletable, that means there is circular dependency. */
363 	SPDK_ERRLOG("Lvols left in lvs, but unable to delete.\n");
364 	assert(false);
365 }
366 
367 static bool
368 _vbdev_lvs_are_lvols_closed(struct spdk_lvol_store *lvs)
369 {
370 	struct spdk_lvol *lvol;
371 
372 	TAILQ_FOREACH(lvol, &lvs->lvols, link) {
373 		if (lvol->ref_count != 0) {
374 			return false;
375 		}
376 	}
377 	return true;
378 }
379 
380 static void
381 _vbdev_lvs_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
382 {
383 	struct lvol_store_bdev *lvs_bdev = cb_arg;
384 	struct spdk_lvol_store *lvs = lvs_bdev->lvs;
385 
386 	if (bdeverrno != 0) {
387 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
388 	}
389 
390 	/* Lvol store can be unloaded once all lvols are closed. */
391 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
392 		spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
393 	}
394 }
395 
396 static void
397 _vbdev_lvs_remove(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg,
398 		  bool destroy)
399 {
400 	struct spdk_lvs_req *req;
401 	struct lvol_store_bdev *lvs_bdev;
402 	struct spdk_lvol *lvol, *tmp;
403 
404 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
405 	if (!lvs_bdev) {
406 		SPDK_ERRLOG("No such lvol store found\n");
407 		if (cb_fn != NULL) {
408 			cb_fn(cb_arg, -ENODEV);
409 		}
410 		return;
411 	}
412 
413 	req = calloc(1, sizeof(*req));
414 	if (!req) {
415 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
416 		if (cb_fn != NULL) {
417 			cb_fn(cb_arg, -ENOMEM);
418 		}
419 		return;
420 	}
421 
422 	req->cb_fn = cb_fn;
423 	req->cb_arg = cb_arg;
424 	lvs_bdev->req = req;
425 
426 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
427 		if (destroy) {
428 			spdk_lvs_destroy(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
429 		} else {
430 			spdk_lvs_unload(lvs, _vbdev_lvs_remove_cb, lvs_bdev);
431 		}
432 	} else {
433 		if (destroy) {
434 			_vbdev_lvs_remove_lvol_cb(lvs_bdev, 0);
435 		} else {
436 			TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
437 				spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_remove_bdev_unregistered_cb, lvs_bdev);
438 			}
439 		}
440 	}
441 }
442 
443 void
444 vbdev_lvs_unload(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
445 {
446 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, false);
447 }
448 
449 void
450 vbdev_lvs_destruct(struct spdk_lvol_store *lvs, spdk_lvs_op_complete cb_fn, void *cb_arg)
451 {
452 	_vbdev_lvs_remove(lvs, cb_fn, cb_arg, true);
453 }
454 
455 struct lvol_store_bdev *
456 vbdev_lvol_store_first(void)
457 {
458 	struct lvol_store_bdev *lvs_bdev;
459 
460 	lvs_bdev = TAILQ_FIRST(&g_spdk_lvol_pairs);
461 	if (lvs_bdev) {
462 		SPDK_INFOLOG(vbdev_lvol, "Starting lvolstore iteration at %p\n", lvs_bdev->lvs);
463 	}
464 
465 	return lvs_bdev;
466 }
467 
468 struct lvol_store_bdev *
469 vbdev_lvol_store_next(struct lvol_store_bdev *prev)
470 {
471 	struct lvol_store_bdev *lvs_bdev;
472 
473 	if (prev == NULL) {
474 		SPDK_ERRLOG("prev argument cannot be NULL\n");
475 		return NULL;
476 	}
477 
478 	lvs_bdev = TAILQ_NEXT(prev, lvol_stores);
479 	if (lvs_bdev) {
480 		SPDK_INFOLOG(vbdev_lvol, "Continuing lvolstore iteration at %p\n", lvs_bdev->lvs);
481 	}
482 
483 	return lvs_bdev;
484 }
485 
486 static struct spdk_lvol_store *
487 _vbdev_get_lvol_store_by_uuid(const struct spdk_uuid *uuid)
488 {
489 	struct spdk_lvol_store *lvs = NULL;
490 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
491 
492 	while (lvs_bdev != NULL) {
493 		lvs = lvs_bdev->lvs;
494 		if (spdk_uuid_compare(&lvs->uuid, uuid) == 0) {
495 			return lvs;
496 		}
497 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
498 	}
499 	return NULL;
500 }
501 
502 struct spdk_lvol_store *
503 vbdev_get_lvol_store_by_uuid(const char *uuid_str)
504 {
505 	struct spdk_uuid uuid;
506 
507 	if (spdk_uuid_parse(&uuid, uuid_str)) {
508 		return NULL;
509 	}
510 
511 	return _vbdev_get_lvol_store_by_uuid(&uuid);
512 }
513 
514 struct spdk_lvol_store *
515 vbdev_get_lvol_store_by_name(const char *name)
516 {
517 	struct spdk_lvol_store *lvs = NULL;
518 	struct lvol_store_bdev *lvs_bdev = vbdev_lvol_store_first();
519 
520 	while (lvs_bdev != NULL) {
521 		lvs = lvs_bdev->lvs;
522 		if (strncmp(lvs->name, name, sizeof(lvs->name)) == 0) {
523 			return lvs;
524 		}
525 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
526 	}
527 	return NULL;
528 }
529 
/*
 * Context allocated by vbdev_lvol_destroy() and consumed (and freed) by
 * _vbdev_lvol_destroy_cb() once the lvol's bdev has been unregistered.
 */
530 struct vbdev_lvol_destroy_ctx {
531 	struct spdk_lvol *lvol;	/* lvol to destroy after its bdev is gone */
532 	spdk_lvol_op_complete cb_fn;	/* caller's completion callback */
533 	void *cb_arg;	/* argument passed to cb_fn */
534 };
535 
536 static void
537 _vbdev_lvol_unregister_unload_lvs(void *cb_arg, int lvserrno)
538 {
539 	struct lvol_bdev *lvol_bdev = cb_arg;
540 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
541 
542 	if (lvserrno != 0) {
543 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
544 	}
545 
546 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
547 	free(lvs_bdev);
548 
549 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvserrno);
550 	free(lvol_bdev);
551 }
552 
553 static void
554 _vbdev_lvol_unregister_cb(void *ctx, int lvolerrno)
555 {
556 	struct lvol_bdev *lvol_bdev = ctx;
557 	struct lvol_store_bdev *lvs_bdev = lvol_bdev->lvs_bdev;
558 
559 	if (g_shutdown_started && _vbdev_lvs_are_lvols_closed(lvs_bdev->lvs)) {
560 		spdk_lvs_unload(lvs_bdev->lvs, _vbdev_lvol_unregister_unload_lvs, lvol_bdev);
561 		return;
562 	}
563 
564 	spdk_bdev_destruct_done(&lvol_bdev->bdev, lvolerrno);
565 	free(lvol_bdev);
566 }
567 
568 static int
569 vbdev_lvol_unregister(void *ctx)
570 {
571 	struct spdk_lvol *lvol = ctx;
572 	struct lvol_bdev *lvol_bdev;
573 
574 	assert(lvol != NULL);
575 	lvol_bdev = SPDK_CONTAINEROF(lvol->bdev, struct lvol_bdev, bdev);
576 
577 	spdk_bdev_alias_del_all(lvol->bdev);
578 	spdk_lvol_close(lvol, _vbdev_lvol_unregister_cb, lvol_bdev);
579 
580 	/* return 1 to indicate we have an operation that must finish asynchronously before the
581 	 *  lvol is closed
582 	 */
583 	return 1;
584 }
585 
586 static void
587 _vbdev_lvol_destroy_cb(void *cb_arg, int bdeverrno)
588 {
589 	struct vbdev_lvol_destroy_ctx *ctx = cb_arg;
590 	struct spdk_lvol *lvol = ctx->lvol;
591 
592 	if (bdeverrno < 0) {
593 		SPDK_INFOLOG(vbdev_lvol, "Could not unregister bdev during lvol (%s) destroy\n",
594 			     lvol->unique_id);
595 		ctx->cb_fn(ctx->cb_arg, bdeverrno);
596 		free(ctx);
597 		return;
598 	}
599 
600 	spdk_lvol_destroy(lvol, ctx->cb_fn, ctx->cb_arg);
601 	free(ctx);
602 }
603 
604 void
605 vbdev_lvol_destroy(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
606 {
607 	struct vbdev_lvol_destroy_ctx *ctx;
608 	size_t count;
609 
610 	assert(lvol != NULL);
611 	assert(cb_fn != NULL);
612 
613 	/* Check if it is possible to delete lvol */
614 	spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
615 	if (count > 1) {
616 		/* throw an error */
617 		SPDK_ERRLOG("Cannot delete lvol\n");
618 		cb_fn(cb_arg, -EPERM);
619 		return;
620 	}
621 
622 	ctx = calloc(1, sizeof(*ctx));
623 	if (!ctx) {
624 		cb_fn(cb_arg, -ENOMEM);
625 		return;
626 	}
627 
628 	ctx->lvol = lvol;
629 	ctx->cb_fn = cb_fn;
630 	ctx->cb_arg = cb_arg;
631 
632 	spdk_bdev_unregister(lvol->bdev, _vbdev_lvol_destroy_cb, ctx);
633 }
634 
635 static char *
636 vbdev_lvol_find_name(struct spdk_lvol *lvol, spdk_blob_id blob_id)
637 {
638 	struct spdk_lvol_store *lvs;
639 	struct spdk_lvol *_lvol;
640 
641 	assert(lvol != NULL);
642 
643 	lvs = lvol->lvol_store;
644 
645 	assert(lvs);
646 
647 	TAILQ_FOREACH(_lvol, &lvs->lvols, link) {
648 		if (_lvol->blob_id == blob_id) {
649 			return _lvol->name;
650 		}
651 	}
652 
653 	return NULL;
654 }
655 
656 static int
657 vbdev_lvol_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
658 {
659 	struct spdk_lvol *lvol = ctx;
660 	struct lvol_store_bdev *lvs_bdev;
661 	struct spdk_bdev *bdev;
662 	struct spdk_blob *blob;
663 	char lvol_store_uuid[SPDK_UUID_STRING_LEN];
664 	spdk_blob_id *ids = NULL;
665 	size_t count, i;
666 	char *name;
667 	int rc = 0;
668 
669 	spdk_json_write_named_object_begin(w, "lvol");
670 
671 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
672 	if (!lvs_bdev) {
673 		SPDK_ERRLOG("No such lvol store found\n");
674 		rc = -ENODEV;
675 		goto end;
676 	}
677 
678 	bdev = lvs_bdev->bdev;
679 
680 	spdk_uuid_fmt_lower(lvol_store_uuid, sizeof(lvol_store_uuid), &lvol->lvol_store->uuid);
681 	spdk_json_write_named_string(w, "lvol_store_uuid", lvol_store_uuid);
682 
683 	spdk_json_write_named_string(w, "base_bdev", spdk_bdev_get_name(bdev));
684 
685 	blob = lvol->blob;
686 
687 	spdk_json_write_named_bool(w, "thin_provision", spdk_blob_is_thin_provisioned(blob));
688 
689 	spdk_json_write_named_bool(w, "snapshot", spdk_blob_is_snapshot(blob));
690 
691 	spdk_json_write_named_bool(w, "clone", spdk_blob_is_clone(blob));
692 
693 	if (spdk_blob_is_clone(blob)) {
694 		spdk_blob_id snapshotid = spdk_blob_get_parent_snapshot(lvol->lvol_store->blobstore, lvol->blob_id);
695 		if (snapshotid != SPDK_BLOBID_INVALID) {
696 			name = vbdev_lvol_find_name(lvol, snapshotid);
697 			if (name != NULL) {
698 				spdk_json_write_named_string(w, "base_snapshot", name);
699 			} else {
700 				SPDK_ERRLOG("Cannot obtain snapshots name\n");
701 			}
702 		}
703 	}
704 
705 	if (spdk_blob_is_snapshot(blob)) {
706 		/* Take a number of clones */
707 		rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, NULL, &count);
708 		if (rc == -ENOMEM && count > 0) {
709 			ids = malloc(sizeof(spdk_blob_id) * count);
710 			if (ids == NULL) {
711 				SPDK_ERRLOG("Cannot allocate memory\n");
712 				rc = -ENOMEM;
713 				goto end;
714 			}
715 
716 			rc = spdk_blob_get_clones(lvol->lvol_store->blobstore, lvol->blob_id, ids, &count);
717 			if (rc == 0) {
718 				spdk_json_write_named_array_begin(w, "clones");
719 				for (i = 0; i < count; i++) {
720 					name = vbdev_lvol_find_name(lvol, ids[i]);
721 					if (name != NULL) {
722 						spdk_json_write_string(w, name);
723 					} else {
724 						SPDK_ERRLOG("Cannot obtain clone name\n");
725 					}
726 
727 				}
728 				spdk_json_write_array_end(w);
729 			}
730 			free(ids);
731 		}
732 
733 	}
734 
735 end:
736 	spdk_json_write_object_end(w);
737 
738 	return rc;
739 }
740 
/*
 * bdev `write_config_json` callback. Intentionally empty: the lvol
 * configuration is stored on the physical device, so nothing is written to
 * the JSON config.
 */
static void
vbdev_lvol_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	/* No configuration to emit for lvol bdevs. */
}
746 
/*
 * bdev `get_io_channel` callback: ctx is the spdk_lvol; the channel comes
 * from spdk_lvol_get_io_channel().
 */
static struct spdk_io_channel *
vbdev_lvol_get_io_channel(void *ctx)
{
	return spdk_lvol_get_io_channel((struct spdk_lvol *)ctx);
}
754 
755 static bool
756 vbdev_lvol_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
757 {
758 	struct spdk_lvol *lvol = ctx;
759 
760 	switch (io_type) {
761 	case SPDK_BDEV_IO_TYPE_WRITE:
762 	case SPDK_BDEV_IO_TYPE_UNMAP:
763 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
764 		return !spdk_blob_is_read_only(lvol->blob);
765 	case SPDK_BDEV_IO_TYPE_RESET:
766 	case SPDK_BDEV_IO_TYPE_READ:
767 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
768 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
769 		return true;
770 	default:
771 		return false;
772 	}
773 }
774 
775 static void
776 lvol_op_comp(void *cb_arg, int bserrno)
777 {
778 	struct spdk_bdev_io *bdev_io = cb_arg;
779 	enum spdk_bdev_io_status status = SPDK_BDEV_IO_STATUS_SUCCESS;
780 
781 	if (bserrno != 0) {
782 		if (bserrno == -ENOMEM) {
783 			status = SPDK_BDEV_IO_STATUS_NOMEM;
784 		} else {
785 			status = SPDK_BDEV_IO_STATUS_FAILED;
786 		}
787 	}
788 
789 	spdk_bdev_io_complete(bdev_io, status);
790 }
791 
792 static void
793 lvol_unmap(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
794 {
795 	uint64_t start_page, num_pages;
796 	struct spdk_blob *blob = lvol->blob;
797 
798 	start_page = bdev_io->u.bdev.offset_blocks;
799 	num_pages = bdev_io->u.bdev.num_blocks;
800 
801 	spdk_blob_io_unmap(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
802 }
803 
804 static void
805 lvol_seek_data(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
806 {
807 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_allocated_io_unit(lvol->blob,
808 				      bdev_io->u.bdev.offset_blocks);
809 
810 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
811 }
812 
813 static void
814 lvol_seek_hole(struct spdk_lvol *lvol, struct spdk_bdev_io *bdev_io)
815 {
816 	bdev_io->u.bdev.seek.offset = spdk_blob_get_next_unallocated_io_unit(lvol->blob,
817 				      bdev_io->u.bdev.offset_blocks);
818 
819 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
820 }
821 
822 static void
823 lvol_write_zeroes(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
824 {
825 	uint64_t start_page, num_pages;
826 	struct spdk_blob *blob = lvol->blob;
827 
828 	start_page = bdev_io->u.bdev.offset_blocks;
829 	num_pages = bdev_io->u.bdev.num_blocks;
830 
831 	spdk_blob_io_write_zeroes(blob, ch, start_page, num_pages, lvol_op_comp, bdev_io);
832 }
833 
834 static void
835 lvol_read(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
836 {
837 	uint64_t start_page, num_pages;
838 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
839 	struct spdk_blob *blob = lvol->blob;
840 
841 	start_page = bdev_io->u.bdev.offset_blocks;
842 	num_pages = bdev_io->u.bdev.num_blocks;
843 
844 	if (bdev_io->u.bdev.ext_opts) {
845 		struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
846 
847 		lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
848 		lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.ext_opts->memory_domain;
849 		lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.ext_opts->memory_domain_ctx;
850 		/* Save a pointer to ext_opts passed by the user, it will be used in bs_dev readv/writev_ext functions
851 		 * to restore ext_opts structure. That is done since bdev and blob extended functions use different
852 		 * extended opts structures */
853 		lvol_io->ext_io_opts.user_ctx = bdev_io->u.bdev.ext_opts;
854 
855 		spdk_blob_io_readv_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
856 				       num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
857 	} else {
858 		spdk_blob_io_readv(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
859 				   num_pages, lvol_op_comp, bdev_io);
860 	}
861 }
862 
863 static void
864 lvol_write(struct spdk_lvol *lvol, struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
865 {
866 	uint64_t start_page, num_pages;
867 	struct spdk_blob *blob = lvol->blob;
868 
869 	start_page = bdev_io->u.bdev.offset_blocks;
870 	num_pages = bdev_io->u.bdev.num_blocks;
871 
872 	if (bdev_io->u.bdev.ext_opts) {
873 		struct vbdev_lvol_io *lvol_io = (struct vbdev_lvol_io *)bdev_io->driver_ctx;
874 
875 		lvol_io->ext_io_opts.size = sizeof(lvol_io->ext_io_opts);
876 		lvol_io->ext_io_opts.memory_domain = bdev_io->u.bdev.ext_opts->memory_domain;
877 		lvol_io->ext_io_opts.memory_domain_ctx = bdev_io->u.bdev.ext_opts->memory_domain_ctx;
878 		/* Save a pointer to ext_opts passed by the user, it will be used in bs_dev readv/writev_ext functions
879 		 * to restore ext_opts structure. That is done since bdev and blob extended functions use different
880 		 * extended opts structures */
881 		lvol_io->ext_io_opts.user_ctx = bdev_io->u.bdev.ext_opts;
882 
883 		spdk_blob_io_writev_ext(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
884 					num_pages, lvol_op_comp, bdev_io, &lvol_io->ext_io_opts);
885 	} else {
886 		spdk_blob_io_writev(blob, ch, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, start_page,
887 				    num_pages, lvol_op_comp, bdev_io);
888 	}
889 }
890 
891 static int
892 lvol_reset(struct spdk_bdev_io *bdev_io)
893 {
894 	spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
895 
896 	return 0;
897 }
898 
899 static void
900 lvol_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
901 {
902 	if (!success) {
903 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
904 		return;
905 	}
906 
907 	lvol_read(ch, bdev_io);
908 }
909 
910 static void
911 vbdev_lvol_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
912 {
913 	struct spdk_lvol *lvol = bdev_io->bdev->ctxt;
914 
915 	switch (bdev_io->type) {
916 	case SPDK_BDEV_IO_TYPE_READ:
917 		spdk_bdev_io_get_buf(bdev_io, lvol_get_buf_cb,
918 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
919 		break;
920 	case SPDK_BDEV_IO_TYPE_WRITE:
921 		lvol_write(lvol, ch, bdev_io);
922 		break;
923 	case SPDK_BDEV_IO_TYPE_RESET:
924 		lvol_reset(bdev_io);
925 		break;
926 	case SPDK_BDEV_IO_TYPE_UNMAP:
927 		lvol_unmap(lvol, ch, bdev_io);
928 		break;
929 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
930 		lvol_write_zeroes(lvol, ch, bdev_io);
931 		break;
932 	case SPDK_BDEV_IO_TYPE_SEEK_DATA:
933 		lvol_seek_data(lvol, bdev_io);
934 		break;
935 	case SPDK_BDEV_IO_TYPE_SEEK_HOLE:
936 		lvol_seek_hole(lvol, bdev_io);
937 		break;
938 	default:
939 		SPDK_INFOLOG(vbdev_lvol, "lvol: unsupported I/O type %d\n", bdev_io->type);
940 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
941 		return;
942 	}
943 	return;
944 }
945 
946 static int
947 vbdev_lvol_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
948 {
949 	struct spdk_lvol *lvol = ctx;
950 	struct spdk_bdev *base_bdev;
951 
952 	base_bdev = lvol->lvol_store->bs_dev->get_base_bdev(lvol->lvol_store->bs_dev);
953 
954 	return spdk_bdev_get_memory_domains(base_bdev, domains, array_size);
955 }
956 
/*
 * Function table installed on every lvol bdev by _create_lvol_disk()
 * (bdev->fn_table). The ctx passed to these callbacks is the spdk_lvol
 * stored in bdev->ctxt.
 */
957 static struct spdk_bdev_fn_table vbdev_lvol_fn_table = {
958 	.destruct		= vbdev_lvol_unregister,
959 	.io_type_supported	= vbdev_lvol_io_type_supported,
960 	.submit_request		= vbdev_lvol_submit_request,
961 	.get_io_channel		= vbdev_lvol_get_io_channel,
962 	.dump_info_json		= vbdev_lvol_dump_info_json,
963 	.write_config_json	= vbdev_lvol_write_config_json,
964 	.get_memory_domains	= vbdev_lvol_get_memory_domains,
965 };
966 
/*
 * No-op completion used by _create_lvol_disk_destroy_cb() when it destroys an
 * lvol and has nobody to report the result to.
 */
static void
lvol_destroy_cb(void *cb_arg, int bdeverrno)
{
	/* Intentionally empty. */
}
971 
972 static void
973 _create_lvol_disk_destroy_cb(void *cb_arg, int bdeverrno)
974 {
975 	struct spdk_lvol *lvol = cb_arg;
976 
977 	if (bdeverrno < 0) {
978 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
979 			    lvol->unique_id);
980 		return;
981 	}
982 
983 	spdk_lvol_destroy(lvol, lvol_destroy_cb, NULL);
984 }
985 
986 static void
987 _create_lvol_disk_unload_cb(void *cb_arg, int bdeverrno)
988 {
989 	struct spdk_lvol *lvol = cb_arg;
990 
991 	if (bdeverrno < 0) {
992 		SPDK_ERRLOG("Could not unregister bdev for lvol %s\n",
993 			    lvol->unique_id);
994 		return;
995 	}
996 
997 	TAILQ_REMOVE(&lvol->lvol_store->lvols, lvol, link);
998 	free(lvol);
999 }
1000 
1001 static int
1002 _create_lvol_disk(struct spdk_lvol *lvol, bool destroy)
1003 {
1004 	struct spdk_bdev *bdev;
1005 	struct lvol_bdev *lvol_bdev;
1006 	struct lvol_store_bdev *lvs_bdev;
1007 	uint64_t total_size;
1008 	unsigned char *alias;
1009 	int rc;
1010 
1011 	lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvol->lvol_store);
1012 	if (lvs_bdev == NULL) {
1013 		SPDK_ERRLOG("No spdk lvs-bdev pair found for lvol %s\n", lvol->unique_id);
1014 		return -ENODEV;
1015 	}
1016 
1017 	lvol_bdev = calloc(1, sizeof(struct lvol_bdev));
1018 	if (!lvol_bdev) {
1019 		SPDK_ERRLOG("Cannot alloc memory for lvol bdev\n");
1020 		return -ENOMEM;
1021 	}
1022 
1023 	lvol_bdev->lvol = lvol;
1024 	lvol_bdev->lvs_bdev = lvs_bdev;
1025 
1026 	bdev = &lvol_bdev->bdev;
1027 	bdev->name = lvol->unique_id;
1028 	bdev->product_name = "Logical Volume";
1029 	bdev->blocklen = spdk_bs_get_io_unit_size(lvol->lvol_store->blobstore);
1030 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1031 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1032 	assert((total_size % bdev->blocklen) == 0);
1033 	bdev->blockcnt = total_size / bdev->blocklen;
1034 	bdev->uuid = lvol->uuid;
1035 	bdev->required_alignment = lvs_bdev->bdev->required_alignment;
1036 	bdev->split_on_optimal_io_boundary = true;
1037 	bdev->optimal_io_boundary = spdk_bs_get_cluster_size(lvol->lvol_store->blobstore) / bdev->blocklen;
1038 
1039 	bdev->ctxt = lvol;
1040 	bdev->fn_table = &vbdev_lvol_fn_table;
1041 	bdev->module = &g_lvol_if;
1042 
1043 	/* Set default bdev reset waiting time. This value indicates how much
1044 	 * time a reset should wait before forcing a reset down to the underlying
1045 	 * bdev module.
1046 	 * Setting this parameter is mainly to avoid "empty" resets to a shared
1047 	 * bdev that may be used by multiple lvols. */
1048 	bdev->reset_io_drain_timeout = SPDK_BDEV_RESET_IO_DRAIN_RECOMMENDED_VALUE;
1049 
1050 	rc = spdk_bdev_register(bdev);
1051 	if (rc) {
1052 		free(lvol_bdev);
1053 		return rc;
1054 	}
1055 	lvol->bdev = bdev;
1056 
1057 	alias = spdk_sprintf_alloc("%s/%s", lvs_bdev->lvs->name, lvol->name);
1058 	if (alias == NULL) {
1059 		SPDK_ERRLOG("Cannot alloc memory for alias\n");
1060 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1061 						  _create_lvol_disk_unload_cb), lvol);
1062 		return -ENOMEM;
1063 	}
1064 
1065 	rc = spdk_bdev_alias_add(bdev, alias);
1066 	if (rc != 0) {
1067 		SPDK_ERRLOG("Cannot add alias to lvol bdev\n");
1068 		spdk_bdev_unregister(lvol->bdev, (destroy ? _create_lvol_disk_destroy_cb :
1069 						  _create_lvol_disk_unload_cb), lvol);
1070 	}
1071 	free(alias);
1072 
1073 	return rc;
1074 }
1075 
1076 static void
1077 _vbdev_lvol_create_cb(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1078 {
1079 	struct spdk_lvol_with_handle_req *req = cb_arg;
1080 
1081 	if (lvolerrno < 0) {
1082 		goto end;
1083 	}
1084 
1085 	lvolerrno = _create_lvol_disk(lvol, true);
1086 
1087 end:
1088 	req->cb_fn(req->cb_arg, lvol, lvolerrno);
1089 	free(req);
1090 }
1091 
1092 int
1093 vbdev_lvol_create(struct spdk_lvol_store *lvs, const char *name, uint64_t sz,
1094 		  bool thin_provision, enum lvol_clear_method clear_method, spdk_lvol_op_with_handle_complete cb_fn,
1095 		  void *cb_arg)
1096 {
1097 	struct spdk_lvol_with_handle_req *req;
1098 	int rc;
1099 
1100 	req = calloc(1, sizeof(*req));
1101 	if (req == NULL) {
1102 		return -ENOMEM;
1103 	}
1104 	req->cb_fn = cb_fn;
1105 	req->cb_arg = cb_arg;
1106 
1107 	rc = spdk_lvol_create(lvs, name, sz, thin_provision, clear_method,
1108 			      _vbdev_lvol_create_cb, req);
1109 	if (rc != 0) {
1110 		free(req);
1111 	}
1112 
1113 	return rc;
1114 }
1115 
1116 void
1117 vbdev_lvol_create_snapshot(struct spdk_lvol *lvol, const char *snapshot_name,
1118 			   spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1119 {
1120 	struct spdk_lvol_with_handle_req *req;
1121 
1122 	req = calloc(1, sizeof(*req));
1123 	if (req == NULL) {
1124 		cb_fn(cb_arg, NULL, -ENOMEM);
1125 		return;
1126 	}
1127 
1128 	req->cb_fn = cb_fn;
1129 	req->cb_arg = cb_arg;
1130 
1131 	spdk_lvol_create_snapshot(lvol, snapshot_name, _vbdev_lvol_create_cb, req);
1132 }
1133 
1134 void
1135 vbdev_lvol_create_clone(struct spdk_lvol *lvol, const char *clone_name,
1136 			spdk_lvol_op_with_handle_complete cb_fn, void *cb_arg)
1137 {
1138 	struct spdk_lvol_with_handle_req *req;
1139 
1140 	req = calloc(1, sizeof(*req));
1141 	if (req == NULL) {
1142 		cb_fn(cb_arg, NULL, -ENOMEM);
1143 		return;
1144 	}
1145 
1146 	req->cb_fn = cb_fn;
1147 	req->cb_arg = cb_arg;
1148 
1149 	spdk_lvol_create_clone(lvol, clone_name, _vbdev_lvol_create_cb, req);
1150 }
1151 
1152 static void
1153 _vbdev_lvol_rename_cb(void *cb_arg, int lvolerrno)
1154 {
1155 	struct spdk_lvol_req *req = cb_arg;
1156 
1157 	if (lvolerrno != 0) {
1158 		SPDK_ERRLOG("Renaming lvol failed\n");
1159 	}
1160 
1161 	req->cb_fn(req->cb_arg, lvolerrno);
1162 	free(req);
1163 }
1164 
1165 void
1166 vbdev_lvol_rename(struct spdk_lvol *lvol, const char *new_lvol_name,
1167 		  spdk_lvol_op_complete cb_fn, void *cb_arg)
1168 {
1169 	struct spdk_lvol_req *req;
1170 	int rc;
1171 
1172 	rc = _vbdev_lvol_change_bdev_alias(lvol, new_lvol_name);
1173 	if (rc != 0) {
1174 		SPDK_ERRLOG("renaming lvol to '%s' does not succeed\n", new_lvol_name);
1175 		cb_fn(cb_arg, rc);
1176 		return;
1177 	}
1178 
1179 	req = calloc(1, sizeof(*req));
1180 	if (req == NULL) {
1181 		cb_fn(cb_arg, -ENOMEM);
1182 		return;
1183 	}
1184 	req->cb_fn = cb_fn;
1185 	req->cb_arg = cb_arg;
1186 
1187 	spdk_lvol_rename(lvol, new_lvol_name, _vbdev_lvol_rename_cb, req);
1188 }
1189 
1190 static void
1191 _vbdev_lvol_resize_cb(void *cb_arg, int lvolerrno)
1192 {
1193 	struct spdk_lvol_req *req = cb_arg;
1194 	struct spdk_lvol *lvol = req->lvol;
1195 	uint64_t total_size;
1196 
1197 	/* change bdev size */
1198 	if (lvolerrno != 0) {
1199 		SPDK_ERRLOG("CB function for bdev lvol %s receive error no: %d.\n", lvol->name, lvolerrno);
1200 		goto finish;
1201 	}
1202 
1203 	total_size = spdk_blob_get_num_clusters(lvol->blob) *
1204 		     spdk_bs_get_cluster_size(lvol->lvol_store->blobstore);
1205 	assert((total_size % lvol->bdev->blocklen) == 0);
1206 
1207 	lvolerrno = spdk_bdev_notify_blockcnt_change(lvol->bdev, total_size / lvol->bdev->blocklen);
1208 	if (lvolerrno != 0) {
1209 		SPDK_ERRLOG("Could not change num blocks for bdev lvol %s with error no: %d.\n",
1210 			    lvol->name, lvolerrno);
1211 	}
1212 
1213 finish:
1214 	req->cb_fn(req->cb_arg, lvolerrno);
1215 	free(req);
1216 }
1217 
1218 void
1219 vbdev_lvol_resize(struct spdk_lvol *lvol, uint64_t sz, spdk_lvol_op_complete cb_fn, void *cb_arg)
1220 {
1221 	struct spdk_lvol_req *req;
1222 
1223 	if (lvol == NULL) {
1224 		SPDK_ERRLOG("lvol does not exist\n");
1225 		cb_fn(cb_arg, -EINVAL);
1226 		return;
1227 	}
1228 
1229 	assert(lvol->bdev != NULL);
1230 
1231 	req = calloc(1, sizeof(*req));
1232 	if (req == NULL) {
1233 		cb_fn(cb_arg, -ENOMEM);
1234 		return;
1235 	}
1236 
1237 	req->cb_fn = cb_fn;
1238 	req->cb_arg = cb_arg;
1239 	req->sz = sz;
1240 	req->lvol = lvol;
1241 
1242 	spdk_lvol_resize(req->lvol, req->sz, _vbdev_lvol_resize_cb, req);
1243 }
1244 
1245 static void
1246 _vbdev_lvol_set_read_only_cb(void *cb_arg, int lvolerrno)
1247 {
1248 	struct spdk_lvol_req *req = cb_arg;
1249 	struct spdk_lvol *lvol = req->lvol;
1250 
1251 	if (lvolerrno != 0) {
1252 		SPDK_ERRLOG("Could not set bdev lvol %s as read only due to error: %d.\n", lvol->name, lvolerrno);
1253 	}
1254 
1255 	req->cb_fn(req->cb_arg, lvolerrno);
1256 	free(req);
1257 }
1258 
1259 void
1260 vbdev_lvol_set_read_only(struct spdk_lvol *lvol, spdk_lvol_op_complete cb_fn, void *cb_arg)
1261 {
1262 	struct spdk_lvol_req *req;
1263 
1264 	if (lvol == NULL) {
1265 		SPDK_ERRLOG("lvol does not exist\n");
1266 		cb_fn(cb_arg, -EINVAL);
1267 		return;
1268 	}
1269 
1270 	assert(lvol->bdev != NULL);
1271 
1272 	req = calloc(1, sizeof(*req));
1273 	if (req == NULL) {
1274 		cb_fn(cb_arg, -ENOMEM);
1275 		return;
1276 	}
1277 
1278 	req->cb_fn = cb_fn;
1279 	req->cb_arg = cb_arg;
1280 	req->lvol = lvol;
1281 
1282 	spdk_lvol_set_read_only(lvol, _vbdev_lvol_set_read_only_cb, req);
1283 }
1284 
/*
 * module_init hook of g_lvol_if. Performs no work here and always
 * reports success.
 */
static int
vbdev_lvs_init(void)
{
	const int status = 0;

	return status;
}
1290 
1291 static void vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev);
1292 
1293 static void
1294 vbdev_lvs_fini_start_unload_cb(void *cb_arg, int lvserrno)
1295 {
1296 	struct lvol_store_bdev *lvs_bdev = cb_arg;
1297 	struct lvol_store_bdev *next_lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1298 
1299 	if (lvserrno != 0) {
1300 		SPDK_INFOLOG(vbdev_lvol, "Lvol store removed with error: %d.\n", lvserrno);
1301 	}
1302 
1303 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1304 	free(lvs_bdev);
1305 
1306 	vbdev_lvs_fini_start_iter(next_lvs_bdev);
1307 }
1308 
1309 static void
1310 vbdev_lvs_fini_start_iter(struct lvol_store_bdev *lvs_bdev)
1311 {
1312 	struct spdk_lvol_store *lvs;
1313 
1314 	while (lvs_bdev != NULL) {
1315 		lvs = lvs_bdev->lvs;
1316 
1317 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1318 			spdk_lvs_unload(lvs, vbdev_lvs_fini_start_unload_cb, lvs_bdev);
1319 			return;
1320 		}
1321 		lvs_bdev = vbdev_lvol_store_next(lvs_bdev);
1322 	}
1323 
1324 	spdk_bdev_module_fini_start_done();
1325 }
1326 
1327 static void
1328 vbdev_lvs_fini_start(void)
1329 {
1330 	g_shutdown_started = true;
1331 	vbdev_lvs_fini_start_iter(vbdev_lvol_store_first());
1332 }
1333 
1334 static int
1335 vbdev_lvs_get_ctx_size(void)
1336 {
1337 	return sizeof(struct vbdev_lvol_io);
1338 }
1339 
1340 static void
1341 _vbdev_lvs_examine_failed(void *cb_arg, int lvserrno)
1342 {
1343 	spdk_bdev_module_examine_done(&g_lvol_if);
1344 }
1345 
1346 static void
1347 _vbdev_lvol_examine_close_cb(struct spdk_lvol_store *lvs)
1348 {
1349 	if (lvs->lvols_opened >= lvs->lvol_count) {
1350 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1351 		spdk_bdev_module_examine_done(&g_lvol_if);
1352 	}
1353 }
1354 
1355 static void
1356 _vbdev_lvs_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1357 {
1358 	struct spdk_lvol_store *lvs = cb_arg;
1359 
1360 	if (lvolerrno != 0) {
1361 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1362 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1363 		lvs->lvol_count--;
1364 		free(lvol);
1365 		goto end;
1366 	}
1367 
1368 	if (_create_lvol_disk(lvol, false)) {
1369 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1370 		lvs->lvol_count--;
1371 		_vbdev_lvol_examine_close_cb(lvs);
1372 		SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s failed\n", lvol->unique_id);
1373 		return;
1374 	}
1375 
1376 	lvs->lvols_opened++;
1377 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1378 
1379 end:
1380 
1381 	if (lvs->lvols_opened >= lvs->lvol_count) {
1382 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1383 		spdk_bdev_module_examine_done(&g_lvol_if);
1384 	}
1385 }
1386 
1387 static void
1388 _vbdev_lvs_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1389 {
1390 	struct lvol_store_bdev *lvs_bdev;
1391 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1392 	struct spdk_lvol *lvol, *tmp;
1393 
1394 	if (lvserrno == -EEXIST) {
1395 		SPDK_INFOLOG(vbdev_lvol,
1396 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1397 			     req->base_bdev->name);
1398 		/* On error blobstore destroys bs_dev itself */
1399 		spdk_bdev_module_examine_done(&g_lvol_if);
1400 		goto end;
1401 	} else if (lvserrno != 0) {
1402 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1403 		/* On error blobstore destroys bs_dev itself */
1404 		spdk_bdev_module_examine_done(&g_lvol_if);
1405 		goto end;
1406 	}
1407 
1408 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1409 	if (lvserrno != 0) {
1410 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1411 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1412 		goto end;
1413 	}
1414 
1415 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1416 	if (!lvs_bdev) {
1417 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1418 		spdk_lvs_unload(lvol_store, _vbdev_lvs_examine_failed, NULL);
1419 		goto end;
1420 	}
1421 
1422 	lvs_bdev->lvs = lvol_store;
1423 	lvs_bdev->bdev = req->base_bdev;
1424 
1425 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1426 
1427 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1428 		     req->base_bdev->name);
1429 
1430 	lvol_store->lvols_opened = 0;
1431 
1432 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1433 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1434 		spdk_bdev_module_examine_done(&g_lvol_if);
1435 	} else {
1436 		/* Open all lvols */
1437 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1438 			spdk_lvol_open(lvol, _vbdev_lvs_examine_finish, lvol_store);
1439 		}
1440 	}
1441 
1442 end:
1443 	free(req);
1444 }
1445 
1446 static void
1447 vbdev_lvs_examine(struct spdk_bdev *bdev)
1448 {
1449 	struct spdk_bs_dev *bs_dev;
1450 	struct spdk_lvs_with_handle_req *req;
1451 	int rc;
1452 
1453 	if (spdk_bdev_get_md_size(bdev) != 0) {
1454 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n which is formatted with metadata",
1455 			     bdev->name);
1456 		spdk_bdev_module_examine_done(&g_lvol_if);
1457 		return;
1458 	}
1459 
1460 	req = calloc(1, sizeof(*req));
1461 	if (req == NULL) {
1462 		spdk_bdev_module_examine_done(&g_lvol_if);
1463 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1464 		return;
1465 	}
1466 
1467 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1468 					 NULL, &bs_dev);
1469 	if (rc < 0) {
1470 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1471 		spdk_bdev_module_examine_done(&g_lvol_if);
1472 		free(req);
1473 		return;
1474 	}
1475 
1476 	req->base_bdev = bdev;
1477 
1478 	spdk_lvs_load(bs_dev, _vbdev_lvs_examine_cb, req);
1479 }
1480 
1481 struct spdk_lvol *
1482 vbdev_lvol_get_from_bdev(struct spdk_bdev *bdev)
1483 {
1484 	if (!bdev || bdev->module != &g_lvol_if) {
1485 		return NULL;
1486 	}
1487 
1488 	if (bdev->ctxt == NULL) {
1489 		SPDK_ERRLOG("No lvol ctx assigned to bdev %s\n", bdev->name);
1490 		return NULL;
1491 	}
1492 
1493 	return (struct spdk_lvol *)bdev->ctxt;
1494 }
1495 
1496 static void
1497 _vbdev_lvs_grow_finish(void *arg, int lvserrno)
1498 {
1499 	struct spdk_lvs_grow_req *req = arg;
1500 	req->cb_fn(req->cb_arg, req->lvserrno);
1501 	free(req);
1502 }
1503 
1504 static void
1505 _vbdev_lvs_grow_examine_finish(void *cb_arg, struct spdk_lvol *lvol, int lvolerrno)
1506 {
1507 	struct spdk_lvs_grow_req *req = cb_arg;
1508 	struct spdk_lvol_store *lvs = req->lvol_store;
1509 
1510 	if (lvolerrno != 0) {
1511 		SPDK_ERRLOG("Error opening lvol %s\n", lvol->unique_id);
1512 		TAILQ_REMOVE(&lvs->lvols, lvol, link);
1513 		assert(lvs->lvol_count > 0);
1514 		lvs->lvol_count--;
1515 		free(lvol);
1516 		goto end;
1517 	}
1518 
1519 	if (_create_lvol_disk(lvol, false)) {
1520 		SPDK_ERRLOG("Cannot create bdev for lvol %s\n", lvol->unique_id);
1521 		assert(lvs->lvol_count > 0);
1522 		lvs->lvol_count--;
1523 		SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s failed\n", lvol->unique_id);
1524 		goto end;
1525 	}
1526 
1527 	lvs->lvols_opened++;
1528 	SPDK_INFOLOG(vbdev_lvol, "Opening lvol %s succeeded\n", lvol->unique_id);
1529 
1530 end:
1531 	if (lvs->lvols_opened >= lvs->lvol_count) {
1532 		SPDK_INFOLOG(vbdev_lvol, "Opening lvols finished\n");
1533 		_vbdev_lvs_grow_finish(req, 0);
1534 	}
1535 }
1536 
1537 static void
1538 _vbdev_lvs_grow_examine_cb(void *arg, struct spdk_lvol_store *lvol_store, int lvserrno)
1539 {
1540 	struct lvol_store_bdev *lvs_bdev;
1541 	struct spdk_lvs_with_handle_req *req = (struct spdk_lvs_with_handle_req *)arg;
1542 	struct spdk_lvol *lvol, *tmp;
1543 	struct spdk_lvs_grow_req *ori_req = req->cb_arg;
1544 
1545 	if (lvserrno == -EEXIST) {
1546 		SPDK_INFOLOG(vbdev_lvol,
1547 			     "Name for lvolstore on device %s conflicts with name for already loaded lvs\n",
1548 			     req->base_bdev->name);
1549 		ori_req->lvserrno = lvserrno;
1550 		_vbdev_lvs_grow_finish(ori_req, lvserrno);
1551 		goto end;
1552 	} else if (lvserrno != 0) {
1553 		SPDK_INFOLOG(vbdev_lvol, "Lvol store not found on %s\n", req->base_bdev->name);
1554 		/* On error blobstore destroys bs_dev itself */
1555 		ori_req->lvserrno = lvserrno;
1556 		_vbdev_lvs_grow_finish(ori_req, lvserrno);
1557 		goto end;
1558 	}
1559 
1560 	lvserrno = spdk_bs_bdev_claim(lvol_store->bs_dev, &g_lvol_if);
1561 	if (lvserrno != 0) {
1562 		SPDK_INFOLOG(vbdev_lvol, "Lvol store base bdev already claimed by another bdev\n");
1563 		ori_req->lvserrno = lvserrno;
1564 		spdk_lvs_unload(lvol_store, _vbdev_lvs_grow_finish, ori_req);
1565 		goto end;
1566 	}
1567 
1568 	lvs_bdev = calloc(1, sizeof(*lvs_bdev));
1569 	if (!lvs_bdev) {
1570 		SPDK_ERRLOG("Cannot alloc memory for lvs_bdev\n");
1571 		ori_req->lvserrno = -ENOMEM;
1572 		spdk_lvs_unload(lvol_store, _vbdev_lvs_grow_finish, ori_req);
1573 		goto end;
1574 	}
1575 
1576 	lvs_bdev->lvs = lvol_store;
1577 	lvs_bdev->bdev = req->base_bdev;
1578 
1579 	TAILQ_INSERT_TAIL(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1580 	SPDK_INFOLOG(vbdev_lvol, "Lvol store found on %s - begin parsing\n",
1581 		     req->base_bdev->name);
1582 
1583 	lvol_store->lvols_opened = 0;
1584 
1585 	ori_req->lvol_store = lvol_store;
1586 	ori_req->lvserrno = 0;
1587 	if (TAILQ_EMPTY(&lvol_store->lvols)) {
1588 		SPDK_INFOLOG(vbdev_lvol, "Lvol store examination done\n");
1589 		_vbdev_lvs_grow_finish(ori_req, 0);
1590 	} else {
1591 		/* Open all lvols */
1592 		TAILQ_FOREACH_SAFE(lvol, &lvol_store->lvols, link, tmp) {
1593 			spdk_lvol_open(lvol, _vbdev_lvs_grow_examine_finish, ori_req);
1594 		}
1595 	}
1596 
1597 end:
1598 	free(req);
1599 }
1600 
1601 static void
1602 _vbdev_lvs_grow_examine(struct spdk_bdev *bdev, struct spdk_lvs_grow_req *ori_req)
1603 {
1604 	struct spdk_bs_dev *bs_dev;
1605 	struct spdk_lvs_with_handle_req *req;
1606 	int rc;
1607 
1608 	req = calloc(1, sizeof(*req));
1609 	if (req == NULL) {
1610 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1611 		ori_req->lvserrno = -ENOMEM;
1612 		_vbdev_lvs_grow_finish(ori_req, -ENOMEM);
1613 		return;
1614 	}
1615 
1616 	rc = spdk_bdev_create_bs_dev_ext(bdev->name, vbdev_lvs_base_bdev_event_cb,
1617 					 NULL, &bs_dev);
1618 	if (rc < 0) {
1619 		SPDK_INFOLOG(vbdev_lvol, "Cannot create bs dev on %s\n", bdev->name);
1620 		ori_req->lvserrno = rc;
1621 		_vbdev_lvs_grow_finish(ori_req, rc);
1622 		free(req);
1623 		return;
1624 	}
1625 
1626 	req->base_bdev = bdev;
1627 	req->cb_arg = ori_req;
1628 
1629 	spdk_lvs_grow(bs_dev, _vbdev_lvs_grow_examine_cb, req);
1630 }
1631 
1632 static void
1633 _vbdev_lvs_grow_unload_cb(void *cb_arg, int lvserrno)
1634 {
1635 	struct spdk_lvs_grow_req *req = cb_arg;
1636 	struct lvol_store_bdev *lvs_bdev;
1637 	struct spdk_bdev *bdev;
1638 
1639 	if (lvserrno != 0) {
1640 		req->cb_fn(req->cb_arg, lvserrno);
1641 		free(req);
1642 		return;
1643 	}
1644 
1645 	lvs_bdev = req->lvs_bdev;
1646 	bdev = lvs_bdev->bdev;
1647 	TAILQ_REMOVE(&g_spdk_lvol_pairs, lvs_bdev, lvol_stores);
1648 	_vbdev_lvs_grow_examine(bdev, req);
1649 	free(lvs_bdev);
1650 }
1651 
1652 static void
1653 _vbdev_lvs_grow_remove_bdev_unregistered_cb(void *cb_arg, int bdeverrno)
1654 {
1655 	struct spdk_lvs_grow_req *req = cb_arg;
1656 	struct spdk_lvol_store *lvs = req->lvol_store;
1657 
1658 	if (bdeverrno != 0) {
1659 		SPDK_DEBUGLOG(vbdev_lvol, "Lvol unregistered with errno %d\n", bdeverrno);
1660 	}
1661 
1662 	req->lvol_cnt--;
1663 
1664 	if (req->lvol_cnt == 0) {
1665 		/* Lvol store can be unloaded once all lvols are closed. */
1666 		if (_vbdev_lvs_are_lvols_closed(lvs)) {
1667 			spdk_lvs_unload(lvs, _vbdev_lvs_grow_unload_cb, req);
1668 		} else {
1669 			req->cb_fn(req->cb_arg, -EINVAL);
1670 			free(req);
1671 		}
1672 	}
1673 }
1674 
1675 void
1676 vbdev_lvs_grow(struct spdk_lvol_store *lvs,
1677 	       spdk_lvs_op_complete cb_fn, void *cb_arg)
1678 {
1679 	struct spdk_lvs_grow_req *req;
1680 	struct spdk_lvol *lvol, *tmp;
1681 
1682 	req = calloc(1, sizeof(*req));
1683 	if (!req) {
1684 		SPDK_ERRLOG("Cannot alloc memory for vbdev lvol store request pointer\n");
1685 		cb_fn(cb_arg, -ENOMEM);
1686 		return;
1687 	}
1688 	req->cb_fn = cb_fn;
1689 	req->cb_arg = cb_arg;
1690 	req->lvol_store = lvs;
1691 	req->lvs_bdev = vbdev_get_lvs_bdev_by_lvs(lvs);
1692 	if (req->lvs_bdev == NULL) {
1693 		SPDK_ERRLOG("Cannot get valid lvs_bdev\n");
1694 		cb_fn(cb_arg, -EINVAL);
1695 		free(req);
1696 		return;
1697 	}
1698 
1699 	if (_vbdev_lvs_are_lvols_closed(lvs)) {
1700 		spdk_lvs_unload(lvs, _vbdev_lvs_grow_unload_cb, req);
1701 	} else {
1702 		TAILQ_FOREACH_SAFE(lvol, &lvs->lvols, link, tmp) {
1703 			req->lvol_cnt++;
1704 			spdk_bdev_unregister(lvol->bdev, _vbdev_lvs_grow_remove_bdev_unregistered_cb, req);
1705 		}
1706 		assert(req->lvol_cnt > 0);
1707 	}
1708 }
1709 
1710 SPDK_LOG_REGISTER_COMPONENT(vbdev_lvol)
1711