xref: /spdk/test/lvol/esnap/esnap.c (revision e5693d682a9872b3bb3a84b3245a099af77992d6)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3  */
4 
5 #include "spdk/stdinc.h"
6 #include "spdk_internal/cunit.h"
7 #include "spdk/string.h"
8 #include "spdk/init.h"
9 
10 #include "common/lib/ut_multithread.c"
11 
12 #include "bdev/bdev.c"
13 #include "lvol/lvol.c"
14 #include "bdev/malloc/bdev_malloc.c"
15 #include "bdev/lvol/vbdev_lvol.c"
16 #include "accel/accel_sw.c"
17 #include "bdev/part.c"
18 #include "blob/blobstore.h"
19 #include "bdev/aio/bdev_aio.h"
20 
21 #include "unit/lib/json_mock.c"
22 
23 #ifdef SPDK_CONFIG_PMDK
24 DEFINE_STUB(pmem_msync, int, (const void *addr, size_t len), 0);
25 DEFINE_STUB(pmem_memcpy_persist, void *, (void *pmemdest, const void *src, size_t len), NULL);
26 DEFINE_STUB(pmem_is_pmem, int, (const void *addr, size_t len), 0);
27 DEFINE_STUB(pmem_memset_persist, void *, (void *pmemdest, int c, size_t len), NULL);
28 #endif
29 
30 char g_testdir[PATH_MAX];
31 
32 static void
33 set_testdir(const char *path)
34 {
35 	char *tmp;
36 
37 	tmp = realpath(path, NULL);
38 	snprintf(g_testdir, sizeof(g_testdir), "%s", tmp ? dirname(tmp) : ".");
39 	free(tmp);
40 }
41 
42 static int
43 make_test_file(size_t size, char *path, size_t len, const char *name)
44 {
45 	int fd;
46 	int rc;
47 
48 	CU_ASSERT(len <= INT32_MAX);
49 	if (snprintf(path, len, "%s/%s", g_testdir, name) >= (int)len) {
50 		return -ENAMETOOLONG;
51 	}
52 	unlink(path);
53 	fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600);
54 	if (fd < 0) {
55 		return -errno;
56 	}
57 	rc = ftruncate(fd, size);
58 	if (rc != 0) {
59 		rc = -errno;
60 		unlink(path);
61 	}
62 	close(fd);
63 	return rc;
64 }
65 
/*
 * Completion callback for bdev deletion: copy the status into the int that ctx
 * points at. A NULL ctx means the caller does not want the status.
 */
static void
unregister_cb(void *ctx, int bdeverrno)
{
	int *status = ctx;

	if (status == NULL) {
		return;
	}

	*status = bdeverrno;
}
75 
/* Result slot filled in by lvs_op_with_handle_cb() and lvol_op_with_handle_cb(). */
struct op_with_handle_data {
	/* Handle reported by the callback; the member in use depends on which callback ran. */
	union {
		struct spdk_lvol_store *lvs;
		struct spdk_lvol *lvol;
	} u;
	/* Status reported by the callback; clear_owh() presets it to the 0xbad sentinel. */
	int lvserrno;
};

/*
 * Reset owh before it is handed to an asynchronous operation: zero the handle and
 * mark the status as "not yet updated" (0xbad). Returns owh so the call can be used
 * directly as a callback argument.
 */
static struct op_with_handle_data *
clear_owh(struct op_with_handle_data *owh)
{
	memset(owh, 0, sizeof(struct op_with_handle_data));
	owh->lvserrno = 0xbad;
	return owh;
}
92 
/*
 * Keep polling until *error no longer holds the 0xbad sentinel.
 * spdk_poll_threads() doesn't have visibility into uncompleted aio operations, so a
 * single pass may return before the completion callback has run.
 */
static void
poll_error_updated(int *error)
{
	for (;;) {
		if (*error != 0xbad) {
			break;
		}
		poll_threads();
	}
}
101 
102 static void
103 lvs_op_with_handle_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
104 {
105 	struct op_with_handle_data *data = cb_arg;
106 
107 	data->u.lvs = lvs;
108 	data->lvserrno = lvserrno;
109 }
110 
111 static void
112 lvol_op_with_handle_cb(void *cb_arg, struct spdk_lvol *lvol, int lvserrno)
113 {
114 	struct op_with_handle_data *data = cb_arg;
115 
116 	data->u.lvol = lvol;
117 	data->lvserrno = lvserrno;
118 }
119 
/* Completion callback that stores the status in the int that cb_arg points at. */
static void
lvol_op_complete_cb(void *cb_arg, int lvolerrno)
{
	int *status = cb_arg;

	status[0] = lvolerrno;
}
127 
128 static void
129 ut_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
130 {
131 }
132 
/*
 * I/O completion callback: release the bdev_io, require that the I/O succeeded, then
 * signal completion by storing 0 in the int that cb_arg points at.
 */
static void
io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	int *status = cb_arg;

	/* Free first so the bdev_io is released even when the assertion below fires. */
	spdk_bdev_free_io(bdev_io);
	SPDK_CU_ASSERT_FATAL(success);

	status[0] = 0;
}
143 
/*
 * Fill buf (bufsz bytes) with the expected pattern for one block: the text
 * "<uuid_str> <block>", with the block number right-aligned in a field of at least
 * 8 characters, followed by zero bytes up to the end of the buffer.
 */
static void
prepare_block(char *buf, size_t bufsz, const char *uuid_str, uint64_t block)
{
	/* Zero everything first so bytes past the formatted text compare deterministically. */
	memset(buf, 0, bufsz);
	(void)snprintf(buf, bufsz, "%s %8" PRIu64, uuid_str, block);
}
150 
151 static void
152 scribble(struct spdk_bdev_desc *desc, uint64_t start, uint64_t count)
153 {
154 	struct spdk_bdev *bdev = desc->bdev;
155 	const uint32_t blocklen = desc->bdev->blocklen;
156 	struct spdk_io_channel *ch = spdk_bdev_get_io_channel(desc);
157 	char uuid_str[SPDK_UUID_STRING_LEN];
158 	char buf[count][blocklen];
159 	int err = 0xbad;
160 	uint64_t i;
161 
162 	SPDK_CU_ASSERT_FATAL(count > 0 && count < INT32_MAX);
163 	SPDK_CU_ASSERT_FATAL(ch != NULL);
164 
165 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
166 
167 	for (i = 0; i < count; i++) {
168 		prepare_block(buf[i], sizeof(buf[i]), uuid_str, start + i);
169 	}
170 
171 	spdk_bdev_write(desc, ch, buf, start * blocklen, sizeof(buf), io_done, &err);
172 	poll_threads();
173 	SPDK_CU_ASSERT_FATAL(err == 0);
174 	spdk_put_io_channel(ch);
175 	poll_threads();
176 }
177 
178 #define verify(desc, bdev, start, count) _verify(desc, bdev, start, count, __FILE__, __LINE__)
179 
180 static bool
181 _verify(struct spdk_bdev_desc *desc, struct spdk_bdev *bdev, uint64_t start, uint64_t count,
182 	const char *file, int line)
183 {
184 	struct spdk_io_channel *ch = spdk_bdev_get_io_channel(desc);
185 	const uint32_t blocklen = desc->bdev->blocklen;
186 	char uuid_str[SPDK_UUID_STRING_LEN];
187 	char buf[blocklen];
188 	char expect[blocklen];
189 	int err = 0xbad;
190 	bool ret = true;
191 	uint64_t i;
192 
193 	SPDK_CU_ASSERT_FATAL(count > 0 && count < INT32_MAX);
194 	SPDK_CU_ASSERT_FATAL(ch != NULL);
195 
196 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
197 
198 	for (i = 0; i < count; i++) {
199 		uint64_t block = start + i;
200 
201 		spdk_bdev_read(desc, ch, buf, block * blocklen, sizeof(buf), io_done, &err);
202 		poll_threads();
203 		SPDK_CU_ASSERT_FATAL(err == 0);
204 		prepare_block(expect, sizeof(expect), uuid_str, block);
205 		if (memcmp(expect, buf, blocklen) != 0) {
206 			printf("%s:%d: ERROR: expected '%s' got '%s'\n", file, line,
207 			       expect, buf);
208 			ret = false;
209 		}
210 	}
211 
212 	spdk_put_io_channel(ch);
213 	poll_threads();
214 
215 	return ret;
216 }
217 
218 static bool
219 cluster_is_allocated(struct spdk_blob *blob, uint32_t cluster)
220 {
221 	return bs_io_unit_is_allocated(blob, cluster * blob->bs->pages_per_cluster);
222 }
223 
224 static void
225 esnap_clone_io(void)
226 {
227 	struct spdk_lvol_store *lvs = NULL;
228 	struct spdk_bdev *bs_bdev = NULL;
229 	struct spdk_bdev *esnap_bdev = NULL;
230 	struct spdk_bdev *lvol_bdev = NULL;
231 	struct spdk_bdev_desc *esnap_desc = NULL;
232 	struct spdk_bdev_desc *lvol_desc = NULL;
233 	const char bs_malloc_uuid[SPDK_UUID_STRING_LEN] = "11110049-cf29-4681-ab4b-5dd16de6cd81";
234 	const char esnap_uuid[SPDK_UUID_STRING_LEN] = "222251be-1ece-434d-8513-6944d5c93a53";
235 	struct malloc_bdev_opts malloc_opts = { 0 };
236 	const uint32_t bs_size_bytes = 10 * 1024 * 1024;
237 	const uint32_t bs_block_size = 4096;
238 	const uint32_t cluster_size = 32 * 1024;
239 	const uint32_t blocks_per_cluster = cluster_size / bs_block_size;
240 	const uint32_t esnap_size_bytes = 4 * cluster_size;
241 	struct op_with_handle_data owh_data = { 0 };
242 	struct lvol_bdev *_lvol_bdev;
243 	struct spdk_blob *blob;
244 	int rc;
245 
246 	g_bdev_opts.bdev_auto_examine = false;
247 
248 	/* Create device for lvstore */
249 	spdk_uuid_parse(&malloc_opts.uuid, bs_malloc_uuid);
250 	malloc_opts.name = "bs_malloc";
251 	malloc_opts.num_blocks = bs_size_bytes / bs_block_size;
252 	malloc_opts.block_size = bs_block_size;
253 	rc = create_malloc_disk(&bs_bdev, &malloc_opts);
254 	SPDK_CU_ASSERT_FATAL(rc == 0);
255 
256 	/* Create lvstore */
257 	rc = vbdev_lvs_create("bs_malloc", "lvs1", cluster_size, 0, 0,
258 			      lvs_op_with_handle_cb, clear_owh(&owh_data));
259 	SPDK_CU_ASSERT_FATAL(rc == 0);
260 	poll_error_updated(&owh_data.lvserrno);
261 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
262 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvs != NULL);
263 	lvs = owh_data.u.lvs;
264 
265 	/* Create esnap device */
266 	memset(&malloc_opts, 0, sizeof(malloc_opts));
267 	spdk_uuid_parse(&malloc_opts.uuid, esnap_uuid);
268 	malloc_opts.name = "esnap_malloc";
269 	malloc_opts.num_blocks = esnap_size_bytes / bs_block_size;
270 	malloc_opts.block_size = bs_block_size;
271 	rc = create_malloc_disk(&esnap_bdev, &malloc_opts);
272 	SPDK_CU_ASSERT_FATAL(rc == 0);
273 
274 	/* Fill esnap device with pattern */
275 	rc = spdk_bdev_open_ext(esnap_uuid, true, ut_event_cb, NULL, &esnap_desc);
276 	SPDK_CU_ASSERT_FATAL(rc == 0);
277 	scribble(esnap_desc, 0, esnap_bdev->blockcnt);
278 
279 	/* Reopen the external snapshot read-only for verification later */
280 	spdk_bdev_close(esnap_desc);
281 	poll_threads();
282 	rc = spdk_bdev_open_ext(esnap_uuid, false, ut_event_cb, NULL, &esnap_desc);
283 	SPDK_CU_ASSERT_FATAL(rc == 0);
284 
285 	/* Create esnap clone */
286 	vbdev_lvol_create_bdev_clone(esnap_uuid, lvs, "clone1",
287 				     lvol_op_with_handle_cb, clear_owh(&owh_data));
288 	poll_error_updated(&owh_data.lvserrno);
289 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
290 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvol != NULL);
291 
292 	/* Open the esnap clone */
293 	rc = spdk_bdev_open_ext("lvs1/clone1", true, ut_event_cb, NULL, &lvol_desc);
294 	SPDK_CU_ASSERT_FATAL(rc == 0);
295 	lvol_bdev = lvol_desc->bdev;
296 	_lvol_bdev = (struct lvol_bdev *)lvol_bdev;
297 	blob = _lvol_bdev->lvol->blob;
298 	CU_ASSERT(blob->active.num_clusters == 4);
299 	CU_ASSERT(!cluster_is_allocated(blob, 0));
300 	CU_ASSERT(!cluster_is_allocated(blob, 1));
301 	CU_ASSERT(!cluster_is_allocated(blob, 2));
302 	CU_ASSERT(!cluster_is_allocated(blob, 3));
303 
304 	/* Be sure the esnap and the clone see the same content. */
305 	CU_ASSERT(verify(esnap_desc, esnap_bdev, 0, esnap_bdev->blockcnt));
306 	CU_ASSERT(verify(lvol_desc, esnap_bdev, 0, esnap_bdev->blockcnt));
307 
308 	/* Overwrite the second block of the first cluster then verify the whole first cluster */
309 	scribble(lvol_desc, 1, 1);
310 	CU_ASSERT(cluster_is_allocated(blob, 0));
311 	CU_ASSERT(!cluster_is_allocated(blob, 1));
312 	CU_ASSERT(!cluster_is_allocated(blob, 2));
313 	CU_ASSERT(!cluster_is_allocated(blob, 3));
314 	CU_ASSERT(verify(lvol_desc, esnap_bdev, 0, 1));
315 	CU_ASSERT(verify(lvol_desc, lvol_bdev, 1, 1));
316 	CU_ASSERT(verify(lvol_desc, esnap_bdev, 2, blocks_per_cluster - 2));
317 	/* And be sure no writes made it to the external snapshot */
318 	CU_ASSERT(verify(esnap_desc, esnap_bdev, 0, esnap_bdev->blockcnt));
319 
320 	/* Overwrite the two blocks that span the end of the first cluster and the start of the
321 	 * second cluster
322 	 */
323 	scribble(lvol_desc, blocks_per_cluster - 1, 2);
324 	/* The first part of the first cluster was written previously - it should be the same. */
325 	CU_ASSERT(cluster_is_allocated(blob, 0));
326 	CU_ASSERT(cluster_is_allocated(blob, 1));
327 	CU_ASSERT(!cluster_is_allocated(blob, 2));
328 	CU_ASSERT(!cluster_is_allocated(blob, 3));
329 	CU_ASSERT(verify(lvol_desc, esnap_bdev, 0, 1));
330 	CU_ASSERT(verify(lvol_desc, lvol_bdev, 1, 1));
331 	CU_ASSERT(verify(lvol_desc, esnap_bdev, 2, blocks_per_cluster - 2 - 1));
332 	/* Check the newly written area spanning the first two clusters. */
333 	CU_ASSERT(verify(lvol_desc, lvol_bdev, blocks_per_cluster - 1, 2));
334 	/* The rest should not have changed. */
335 	CU_ASSERT(verify(lvol_desc, esnap_bdev, blocks_per_cluster + 1,
336 			 esnap_bdev->blockcnt - blocks_per_cluster - 1));
337 	/* And be sure no writes made it to the external snapshot */
338 	CU_ASSERT(verify(esnap_desc, esnap_bdev, 0, esnap_bdev->blockcnt));
339 
340 	/* Clean up */
341 	bdev_close(lvol_bdev, lvol_desc);
342 	bdev_close(esnap_bdev, esnap_desc);
343 	delete_malloc_disk("esnap_malloc", NULL, 0);
344 	/* This triggers spdk_lvs_unload() */
345 	delete_malloc_disk("bs_malloc", NULL, 0);
346 	poll_threads();
347 }
348 
/*
 * Callback for spdk_bdev_wait_for_examine(): store 0 in the int that ctx points at
 * so that a caller polling on the 0xbad sentinel can proceed.
 */
static void
esnap_wait_for_examine(void *ctx)
{
	int *done = ctx;

	done[0] = 0;
}
356 
357 static void
358 esnap_hotplug(void)
359 {
360 	const char *uuid_esnap = "22218fb6-6743-483d-88b1-de643dc7c0bc";
361 	struct malloc_bdev_opts malloc_opts = { 0 };
362 	const uint32_t bs_size_bytes = 10 * 1024 * 1024;
363 	const uint32_t bs_block_size = 4096;
364 	const uint32_t cluster_size = 32 * 1024;
365 	const uint32_t esnap_size_bytes = 2 * cluster_size;
366 	struct op_with_handle_data owh_data = { 0 };
367 	struct spdk_bdev *malloc_bdev = NULL, *bdev;
368 	struct spdk_lvol_store *lvs;
369 	struct spdk_lvol *lvol;
370 	struct spdk_uuid uuid = { 0 };
371 	char aiopath[PATH_MAX];
372 	int rc, rc2;
373 
374 	g_bdev_opts.bdev_auto_examine = true;
375 
376 	/* Create aio device to hold the lvstore. */
377 	rc = make_test_file(bs_size_bytes, aiopath, sizeof(aiopath), "esnap_hotplug.aio");
378 	SPDK_CU_ASSERT_FATAL(rc == 0);
379 	rc = create_aio_bdev("aio1", aiopath, bs_block_size, false, false, &uuid);
380 	SPDK_CU_ASSERT_FATAL(rc == 0);
381 	poll_threads();
382 
383 	rc = vbdev_lvs_create("aio1", "lvs1", cluster_size, 0, 0,
384 			      lvs_op_with_handle_cb, clear_owh(&owh_data));
385 	SPDK_CU_ASSERT_FATAL(rc == 0);
386 	poll_error_updated(&owh_data.lvserrno);
387 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
388 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvs != NULL);
389 	lvs = owh_data.u.lvs;
390 
391 	/* Create esnap device */
392 	spdk_uuid_parse(&malloc_opts.uuid, uuid_esnap);
393 	malloc_opts.name = "esnap_malloc";
394 	malloc_opts.num_blocks = esnap_size_bytes / bs_block_size;
395 	malloc_opts.block_size = bs_block_size;
396 	rc = create_malloc_disk(&malloc_bdev, &malloc_opts);
397 	SPDK_CU_ASSERT_FATAL(rc == 0);
398 
399 	/* Create esnap clone */
400 	vbdev_lvol_create_bdev_clone(uuid_esnap, lvs, "clone1",
401 				     lvol_op_with_handle_cb, clear_owh(&owh_data));
402 	poll_error_updated(&owh_data.lvserrno);
403 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
404 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvol != NULL);
405 
406 	/* Verify that lvol bdev exists */
407 	bdev = spdk_bdev_get_by_name("lvs1/clone1");
408 	SPDK_CU_ASSERT_FATAL(bdev != NULL);
409 
410 	/* Unload the lvstore and verify the bdev is gone. */
411 	rc = rc2 = 0xbad;
412 	bdev_aio_delete("aio1", unregister_cb, &rc);
413 	CU_ASSERT(spdk_bdev_get_by_name(uuid_esnap) != NULL)
414 	delete_malloc_disk(malloc_bdev->name, unregister_cb, &rc2);
415 	malloc_bdev = NULL;
416 	poll_error_updated(&rc);
417 	poll_error_updated(&rc2);
418 	SPDK_CU_ASSERT_FATAL(rc == 0);
419 	SPDK_CU_ASSERT_FATAL(rc2 == 0);
420 	SPDK_CU_ASSERT_FATAL(spdk_bdev_get_by_name("lvs1/clone1") == NULL);
421 	SPDK_CU_ASSERT_FATAL(spdk_bdev_get_by_name(uuid_esnap) == NULL);
422 
423 	/* Trigger the reload of the lvstore */
424 	rc = create_aio_bdev("aio1", aiopath, bs_block_size, false, false, &uuid);
425 	SPDK_CU_ASSERT_FATAL(rc == 0);
426 	rc = 0xbad;
427 	spdk_bdev_wait_for_examine(esnap_wait_for_examine, &rc);
428 	poll_error_updated(&rc);
429 
430 	/* Verify the lvol is loaded without creating a bdev. */
431 	lvol = spdk_lvol_get_by_names("lvs1", "clone1");
432 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/clone1") == NULL)
433 	SPDK_CU_ASSERT_FATAL(lvol != NULL);
434 	SPDK_CU_ASSERT_FATAL(lvol->degraded_set != NULL);
435 
436 	/* Create the esnap device and verify that the bdev is created. */
437 	rc = create_malloc_disk(&malloc_bdev, &malloc_opts);
438 	SPDK_CU_ASSERT_FATAL(rc == 0);
439 	poll_threads();
440 	CU_ASSERT(malloc_bdev != NULL);
441 	CU_ASSERT(lvol->degraded_set == NULL);
442 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/clone1") != NULL);
443 
444 	/* Clean up */
445 	rc = rc2 = 0xbad;
446 	bdev_aio_delete("aio1", unregister_cb, &rc);
447 	poll_error_updated(&rc);
448 	CU_ASSERT(rc == 0);
449 	if (malloc_bdev != NULL) {
450 		delete_malloc_disk(malloc_bdev->name, unregister_cb, &rc2);
451 		poll_threads();
452 		CU_ASSERT(rc2 == 0);
453 	}
454 	rc = unlink(aiopath);
455 	CU_ASSERT(rc == 0);
456 }
457 
458 static void
459 esnap_remove_degraded(void)
460 {
461 	const char *uuid_esnap = "33358eb9-3dcf-4275-b089-0becc126fc3d";
462 	struct malloc_bdev_opts malloc_opts = { 0 };
463 	const uint32_t bs_size_bytes = 10 * 1024 * 1024;
464 	const uint32_t bs_block_size = 4096;
465 	const uint32_t cluster_size = 32 * 1024;
466 	const uint32_t esnap_size_bytes = 2 * cluster_size;
467 	struct op_with_handle_data owh_data = { 0 };
468 	struct spdk_lvol_store *lvs;
469 	struct spdk_bdev *malloc_bdev = NULL;
470 	struct spdk_uuid uuid = { 0 };
471 	struct spdk_lvol *vol1, *vol2, *vol3;
472 	char aiopath[PATH_MAX];
473 	int rc, rc2;
474 
475 	g_bdev_opts.bdev_auto_examine = true;
476 
477 	/* Create aio device to hold the lvstore. */
478 	rc = make_test_file(bs_size_bytes, aiopath, sizeof(aiopath), "remove_degraded.aio");
479 	SPDK_CU_ASSERT_FATAL(rc == 0);
480 	rc = create_aio_bdev("aio1", aiopath, bs_block_size, false, false, &uuid);
481 	SPDK_CU_ASSERT_FATAL(rc == 0);
482 	poll_threads();
483 
484 	rc = vbdev_lvs_create("aio1", "lvs1", cluster_size, 0, 0,
485 			      lvs_op_with_handle_cb, clear_owh(&owh_data));
486 	SPDK_CU_ASSERT_FATAL(rc == 0);
487 	poll_error_updated(&owh_data.lvserrno);
488 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
489 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvs != NULL);
490 	lvs = owh_data.u.lvs;
491 
492 	/* Create esnap device */
493 	spdk_uuid_parse(&malloc_opts.uuid, uuid_esnap);
494 	malloc_opts.name = "esnap";
495 	malloc_opts.num_blocks = esnap_size_bytes / bs_block_size;
496 	malloc_opts.block_size = bs_block_size;
497 	rc = create_malloc_disk(&malloc_bdev, &malloc_opts);
498 	SPDK_CU_ASSERT_FATAL(rc == 0);
499 
500 	/* Create a snapshot of vol1.
501 	 * State:
502 	 *   esnap <-- vol1
503 	 */
504 	vbdev_lvol_create_bdev_clone(uuid_esnap, lvs, "vol1",
505 				     lvol_op_with_handle_cb, clear_owh(&owh_data));
506 	poll_error_updated(&owh_data.lvserrno);
507 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
508 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvol != NULL);
509 	vol1 = owh_data.u.lvol;
510 
511 	/* Create a snapshot of vol1.
512 	 * State:
513 	 *   esnap <-- vol2 <-- vol1
514 	 */
515 	vbdev_lvol_create_snapshot(vol1, "vol2", lvol_op_with_handle_cb, clear_owh(&owh_data));
516 	poll_error_updated(&owh_data.lvserrno);
517 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
518 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvol != NULL);
519 	vol2 = owh_data.u.lvol;
520 
521 	/* Create a clone of vol2.
522 	 * State:
523 	 *   esnap <-- vol2 <-- vol1
524 	 *                `---- vol3
525 	 */
526 	vbdev_lvol_create_clone(vol2, "vol3", lvol_op_with_handle_cb, clear_owh(&owh_data));
527 	poll_error_updated(&owh_data.lvserrno);
528 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
529 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvol != NULL);
530 	vol3 = owh_data.u.lvol;
531 
532 	/* Unload the lvstore and delete esnap */
533 	rc = rc2 = 0xbad;
534 	bdev_aio_delete("aio1", unregister_cb, &rc);
535 	CU_ASSERT(spdk_bdev_get_by_name(uuid_esnap) != NULL)
536 	delete_malloc_disk(malloc_bdev->name, unregister_cb, &rc2);
537 	malloc_bdev = NULL;
538 	poll_error_updated(&rc);
539 	poll_error_updated(&rc2);
540 	SPDK_CU_ASSERT_FATAL(rc == 0);
541 	SPDK_CU_ASSERT_FATAL(rc2 == 0);
542 
543 	/* Trigger the reload of the lvstore.
544 	 * State:
545 	 *   (missing) <-- vol2 <-- vol1
546 	 *                    `---- vol3
547 	 */
548 	rc = create_aio_bdev("aio1", aiopath, bs_block_size, false, false, &uuid);
549 	SPDK_CU_ASSERT_FATAL(rc == 0);
550 	rc = 0xbad;
551 	spdk_bdev_wait_for_examine(esnap_wait_for_examine, &rc);
552 	poll_error_updated(&rc);
553 
554 	/* Verify vol1 is as described in diagram above */
555 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol1") == NULL);
556 	vol1 = spdk_lvol_get_by_names("lvs1", "vol1");
557 	SPDK_CU_ASSERT_FATAL(vol1 != NULL);
558 	lvs = vol1->lvol_store;
559 	CU_ASSERT(spdk_blob_is_clone(vol1->blob));
560 	CU_ASSERT(!spdk_blob_is_esnap_clone(vol1->blob));
561 	CU_ASSERT(!spdk_blob_is_snapshot(vol1->blob));
562 	CU_ASSERT(vol1->degraded_set == NULL);
563 
564 	/* Verify vol2 is as described in diagram above */
565 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol2") == NULL);
566 	vol2 = spdk_lvol_get_by_names("lvs1", "vol2");
567 	SPDK_CU_ASSERT_FATAL(vol2 != NULL);
568 	CU_ASSERT(!spdk_blob_is_clone(vol2->blob));
569 	CU_ASSERT(spdk_blob_is_esnap_clone(vol2->blob));
570 	CU_ASSERT(spdk_blob_is_snapshot(vol2->blob));
571 	CU_ASSERT(RB_MIN(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree) == vol2->degraded_set);
572 	CU_ASSERT(TAILQ_FIRST(&vol2->degraded_set->lvols) == vol2);
573 
574 	/* Verify vol3 is as described in diagram above */
575 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol3") == NULL);
576 	vol3 = spdk_lvol_get_by_names("lvs1", "vol3");
577 	SPDK_CU_ASSERT_FATAL(vol3 != NULL);
578 	CU_ASSERT(spdk_blob_is_clone(vol3->blob));
579 	CU_ASSERT(!spdk_blob_is_esnap_clone(vol3->blob));
580 	CU_ASSERT(!spdk_blob_is_snapshot(vol3->blob));
581 	CU_ASSERT(vol3->degraded_set == NULL);
582 
583 	/* Try to delete vol2. Should fail because it has multiple clones. */
584 	rc = 0xbad;
585 	vbdev_lvol_destroy(vol2, lvol_op_complete_cb, &rc);
586 	poll_error_updated(&rc);
587 	CU_ASSERT(rc == -EPERM);
588 
589 	/* Delete vol1
590 	 * New state:
591 	 *   (missing) <-- vol2 <-- vol3
592 	 */
593 	rc = 0xbad;
594 	vbdev_lvol_destroy(vol1, lvol_op_complete_cb, &rc);
595 	poll_error_updated(&rc);
596 	CU_ASSERT(rc == 0);
597 
598 	/* Verify vol1 is gone */
599 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol1") == NULL);
600 	vol1 = spdk_lvol_get_by_names("lvs1", "vol1");
601 	CU_ASSERT(vol1 == NULL);
602 
603 	/* Verify vol2 is as described in diagram above */
604 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol2") == NULL);
605 	vol2 = spdk_lvol_get_by_names("lvs1", "vol2");
606 	SPDK_CU_ASSERT_FATAL(vol2 != NULL);
607 	CU_ASSERT(!spdk_blob_is_clone(vol2->blob));
608 	CU_ASSERT(spdk_blob_is_esnap_clone(vol2->blob));
609 	CU_ASSERT(spdk_blob_is_snapshot(vol2->blob));
610 	CU_ASSERT(RB_MIN(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree) == vol2->degraded_set);
611 	CU_ASSERT(TAILQ_FIRST(&vol2->degraded_set->lvols) == vol2);
612 
613 	/* Verify vol3 is as described in diagram above */
614 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol3") == NULL);
615 	vol3 = spdk_lvol_get_by_names("lvs1", "vol3");
616 	SPDK_CU_ASSERT_FATAL(vol3 != NULL);
617 	CU_ASSERT(spdk_blob_is_clone(vol3->blob));
618 	CU_ASSERT(!spdk_blob_is_esnap_clone(vol3->blob));
619 	CU_ASSERT(!spdk_blob_is_snapshot(vol3->blob));
620 	CU_ASSERT(vol3->degraded_set == NULL);
621 
622 	/* Delete vol2
623 	 * New state:
624 	 *   (missing) <-- vol3
625 	 */
626 	rc = 0xbad;
627 	vbdev_lvol_destroy(vol2, lvol_op_complete_cb, &rc);
628 	poll_error_updated(&rc);
629 	CU_ASSERT(rc == 0);
630 
631 	/* Verify vol2 is gone */
632 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol2") == NULL);
633 	vol2 = spdk_lvol_get_by_names("lvs1", "vol2");
634 	SPDK_CU_ASSERT_FATAL(vol2 == NULL);
635 
636 	/* Verify vol3 is as described in diagram above */
637 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol3") == NULL);
638 	vol3 = spdk_lvol_get_by_names("lvs1", "vol3");
639 	SPDK_CU_ASSERT_FATAL(vol3 != NULL);
640 	CU_ASSERT(!spdk_blob_is_clone(vol3->blob));
641 	CU_ASSERT(spdk_blob_is_esnap_clone(vol3->blob));
642 	CU_ASSERT(!spdk_blob_is_snapshot(vol3->blob));
643 	CU_ASSERT(RB_MIN(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree) == vol3->degraded_set);
644 	CU_ASSERT(TAILQ_FIRST(&vol3->degraded_set->lvols) == vol3);
645 
646 	/* Delete vol3
647 	 * New state:
648 	 *   (nothing)
649 	 */
650 	rc = 0xbad;
651 	vbdev_lvol_destroy(vol3, lvol_op_complete_cb, &rc);
652 	poll_error_updated(&rc);
653 	CU_ASSERT(rc == 0);
654 
655 	/* Verify vol3 is gone */
656 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol3") == NULL);
657 	vol3 = spdk_lvol_get_by_names("lvs1", "vol3");
658 	SPDK_CU_ASSERT_FATAL(vol3 == NULL);
659 
660 	/* Nothing depends on the missing bdev, so it is no longer missing. */
661 	CU_ASSERT(RB_EMPTY(&lvs->degraded_lvol_sets_tree));
662 
663 	/* Clean up */
664 	rc = rc2 = 0xbad;
665 	bdev_aio_delete("aio1", unregister_cb, &rc);
666 	poll_error_updated(&rc);
667 	CU_ASSERT(rc == 0);
668 	if (malloc_bdev != NULL) {
669 		delete_malloc_disk(malloc_bdev->name, unregister_cb, &rc2);
670 		poll_threads();
671 		CU_ASSERT(rc2 == 0);
672 	}
673 	rc = unlink(aiopath);
674 	CU_ASSERT(rc == 0);
675 }
676 
677 static void
678 late_delete(void)
679 {
680 	char aiopath[PATH_MAX];
681 	struct spdk_lvol_store *lvs = NULL;
682 	const uint32_t bs_size_bytes = 10 * 1024 * 1024;
683 	const uint32_t bs_block_size = 4096;
684 	const uint32_t cluster_size = 32 * 1024;
685 	struct op_with_handle_data owh_data;
686 	struct lvol_bdev *lvol_bdev;
687 	struct spdk_lvol *lvol;
688 	struct spdk_uuid uuid = { 0 };
689 	int rc, rc2;
690 	int count;
691 
692 	/* Create device for lvstore. This cannot be a malloc bdev because we need to sneak a
693 	 * deletion in while blob_persist() is in progress.
694 	 */
695 	rc = make_test_file(bs_size_bytes, aiopath, sizeof(aiopath), "late_delete.aio");
696 	SPDK_CU_ASSERT_FATAL(rc == 0);
697 	rc = create_aio_bdev("aio1", aiopath, bs_block_size, false, false, &uuid);
698 	SPDK_CU_ASSERT_FATAL(rc == 0);
699 	poll_threads();
700 
701 	/* Create lvstore */
702 	rc = vbdev_lvs_create("aio1", "lvs1", cluster_size, 0, 0,
703 			      lvs_op_with_handle_cb, clear_owh(&owh_data));
704 	SPDK_CU_ASSERT_FATAL(rc == 0);
705 	poll_error_updated(&owh_data.lvserrno);
706 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
707 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvs != NULL);
708 	lvs = owh_data.u.lvs;
709 
710 	/* Create an lvol */
711 	vbdev_lvol_create(lvs, "lvol", 1, true, LVOL_CLEAR_WITH_DEFAULT,
712 			  lvol_op_with_handle_cb, clear_owh(&owh_data));
713 	poll_error_updated(&owh_data.lvserrno);
714 	CU_ASSERT(owh_data.lvserrno == 0);
715 	CU_ASSERT(owh_data.u.lvol != NULL);
716 
717 	/* Verify the lvol can be found both ways */
718 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/lvol") != NULL);
719 	CU_ASSERT(spdk_lvol_get_by_names("lvs1", "lvol") != NULL);
720 
721 	/*
722 	 * Once the lvolstore deletion starts, it will briefly be possible to look up lvol bdevs and
723 	 * degraded lvols in that lvolstore but any attempt to delete them will fail with -ENODEV.
724 	 */
725 	rc = 0xbad;
726 	count = 0;
727 	vbdev_lvs_destruct(lvs, lvol_op_complete_cb, &rc);
728 	do {
729 		lvol_bdev = (struct lvol_bdev *)spdk_bdev_get_by_name("lvs1/lvol");
730 		if (lvol_bdev != NULL) {
731 			rc2 = 0xbad;
732 			vbdev_lvol_destroy(lvol_bdev->lvol, lvol_op_complete_cb, &rc2);
733 			CU_ASSERT(rc2 == -ENODEV);
734 		}
735 		lvol = spdk_lvol_get_by_names("lvs1", "lvol");
736 		if (lvol != NULL) {
737 			/* If we are here, we are likely reproducing #2998 */
738 			rc2 = 0xbad;
739 			vbdev_lvol_destroy(lvol, lvol_op_complete_cb, &rc2);
740 			CU_ASSERT(rc2 == -ENODEV);
741 			count++;
742 		}
743 
744 		spdk_thread_poll(g_ut_threads[0].thread, 0, 0);
745 	} while (rc == 0xbad);
746 
747 	/* Ensure that the test exercised the race */
748 	CU_ASSERT(count != 0);
749 
750 	/* The lvol is now gone */
751 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/lvol") == NULL);
752 	CU_ASSERT(spdk_lvol_get_by_names("lvs1", "lvol") == NULL);
753 
754 	/* Clean up */
755 	rc = 0xbad;
756 	bdev_aio_delete("aio1", unregister_cb, &rc);
757 	poll_error_updated(&rc);
758 	CU_ASSERT(rc == 0);
759 	rc = unlink(aiopath);
760 	CU_ASSERT(rc == 0);
761 }
762 
/* Completion callback for spdk_bdev_initialize(): initialization is expected to succeed. */
static void
bdev_init_cb(void *arg, int rc)
{
	(void)arg;
	assert(rc == 0);
}
768 
/* Completion callback for spdk_subsystem_init(): initialization is expected to succeed. */
static void
subsystem_init_cb(int rc, void *ctx)
{
	(void)ctx;
	assert(rc == 0);
}
774 
/* Completion callback shared by the bdev, accel and iobuf finish calls; nothing to do. */
static void
bdev_fini_cb(void *arg)
{
	(void)arg;
}
779 
780 int
781 main(int argc, char **argv)
782 {
783 	CU_pSuite	suite = NULL;
784 	unsigned int	num_failures;
785 	int rc;
786 
787 	set_testdir(argv[0]);
788 
789 	CU_initialize_registry();
790 
791 	suite = CU_add_suite("esnap_io", NULL, NULL);
792 
793 	CU_ADD_TEST(suite, esnap_clone_io);
794 	CU_ADD_TEST(suite, esnap_hotplug);
795 	CU_ADD_TEST(suite, esnap_remove_degraded);
796 	CU_ADD_TEST(suite, late_delete);
797 
798 	allocate_threads(2);
799 	set_thread(0);
800 
801 	/*
802 	 * This is a non-standard way of initializing libraries. It works for this test but
803 	 * shouldn't be used as an example elsewhere, except for maybe other tests.
804 	 */
805 	spdk_subsystem_init(subsystem_init_cb, NULL);
806 	rc = spdk_iobuf_initialize();
807 	if (rc != 0) {
808 		SPDK_ERRLOG("Failed to initialize iobuf\n");
809 		abort();
810 	}
811 	rc = spdk_accel_initialize();
812 	if (rc != 0) {
813 		SPDK_ERRLOG("Failed to initialize accel\n");
814 		abort();
815 	}
816 	spdk_bdev_initialize(bdev_init_cb, NULL);
817 
818 	num_failures = spdk_ut_run_tests(argc, argv, NULL);
819 	CU_cleanup_registry();
820 
821 	spdk_bdev_finish(bdev_fini_cb, NULL);
822 	spdk_accel_finish(bdev_fini_cb, NULL);
823 	spdk_iobuf_finish(bdev_fini_cb, NULL);
824 
825 	free_threads();
826 
827 	return num_failures;
828 }
829