xref: /spdk/test/lvol/esnap/esnap.c (revision b3bec07939ebe2ea2e0c43931705d32aa9e06719)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3  */
4 
5 #include "spdk/stdinc.h"
6 #include "spdk_internal/cunit.h"
7 #include "spdk/string.h"
8 #include "spdk/init.h"
9 
10 #include "common/lib/ut_multithread.c"
11 
12 #include "bdev/bdev.c"
13 #include "lvol/lvol.c"
14 #include "bdev/malloc/bdev_malloc.c"
15 #include "bdev/lvol/vbdev_lvol.c"
16 #include "accel/accel_sw.c"
17 #include "bdev/part.c"
18 #include "blob/blobstore.h"
19 #include "bdev/aio/bdev_aio.h"
20 
21 #include "unit/lib/json_mock.c"
22 
/*
 * Stubs for the pmem_* functions, compiled only when SPDK_CONFIG_PMDK is defined.
 * NOTE(review): presumably these satisfy references from the directly included sources so the
 * test links without libpmem - confirm against the build configuration.
 */
#ifdef SPDK_CONFIG_PMDK
DEFINE_STUB(pmem_msync, int, (const void *addr, size_t len), 0);
DEFINE_STUB(pmem_memcpy_persist, void *, (void *pmemdest, const void *src, size_t len), NULL);
DEFINE_STUB(pmem_is_pmem, int, (const void *addr, size_t len), 0);
DEFINE_STUB(pmem_memset_persist, void *, (void *pmemdest, int c, size_t len), NULL);
#endif
29 
/* Directory in which test files are created; written by set_testdir(), read by make_test_file(). */
char g_testdir[PATH_MAX];
31 
32 static void
33 set_testdir(const char *path)
34 {
35 	char *tmp;
36 
37 	tmp = realpath(path, NULL);
38 	snprintf(g_testdir, sizeof(g_testdir), "%s", tmp ? dirname(tmp) : ".");
39 	free(tmp);
40 }
41 
42 static int
43 make_test_file(size_t size, char *path, size_t len, const char *name)
44 {
45 	int fd;
46 	int rc;
47 
48 	CU_ASSERT(len <= INT32_MAX);
49 	if (snprintf(path, len, "%s/%s", g_testdir, name) >= (int)len) {
50 		return -ENAMETOOLONG;
51 	}
52 	unlink(path);
53 	fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600);
54 	if (fd < 0) {
55 		return -errno;
56 	}
57 	rc = ftruncate(fd, size);
58 	if (rc != 0) {
59 		rc = -errno;
60 		unlink(path);
61 	}
62 	close(fd);
63 	return rc;
64 }
65 
/*
 * Completion callback for bdev deletion: stores `bdeverrno` in the int that `ctx` points to.
 * A NULL `ctx` is accepted and means the caller does not want the result.
 */
static void
unregister_cb(void *ctx, int bdeverrno)
{
	int *status = ctx;

	if (status == NULL) {
		return;
	}

	*status = bdeverrno;
}
75 
/*
 * Completion context filled in by lvs_op_with_handle_cb() and lvol_op_with_handle_cb().
 */
struct op_with_handle_data {
	/* Handle delivered to the callback; which member is meaningful depends on which callback ran. */
	union {
		struct spdk_lvol_store *lvs;
		struct spdk_lvol *lvol;
	} u;
	/* Error code delivered to the callback; clear_owh() presets it to 0xbad ("not called yet"). */
	int lvserrno;
};
83 
84 static struct op_with_handle_data *
85 clear_owh(struct op_with_handle_data *owh)
86 {
87 	memset(owh, 0, sizeof(*owh));
88 	owh->lvserrno = 0xbad;
89 
90 	return owh;
91 }
92 
/*
 * Keep calling poll_threads() until `*error` no longer holds the 0xbad sentinel.
 *
 * spdk_poll_threads() doesn't have visibility into uncompleted aio operations, so a single
 * call is not enough to know that a callback has run.
 */
static void
poll_error_updated(int *error)
{
	for (;;) {
		if (*error != 0xbad) {
			break;
		}
		poll_threads();
	}
}
101 
102 static void
103 lvs_op_with_handle_cb(void *cb_arg, struct spdk_lvol_store *lvs, int lvserrno)
104 {
105 	struct op_with_handle_data *data = cb_arg;
106 
107 	data->u.lvs = lvs;
108 	data->lvserrno = lvserrno;
109 }
110 
111 static void
112 lvol_op_with_handle_cb(void *cb_arg, struct spdk_lvol *lvol, int lvserrno)
113 {
114 	struct op_with_handle_data *data = cb_arg;
115 
116 	data->u.lvol = lvol;
117 	data->lvserrno = lvserrno;
118 }
119 
/*
 * Completion callback for operations that only report a status: stores `lvolerrno` in the int
 * that `cb_arg` points to. `cb_arg` must not be NULL.
 */
static void
lvol_op_complete_cb(void *cb_arg, int lvolerrno)
{
	*(int *)cb_arg = lvolerrno;
}
127 
128 static void
129 ut_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
130 {
131 }
132 
/*
 * Bdev I/O completion callback used by scribble() and _verify().
 *
 * Releases the I/O, fails the test fatally if the I/O was not successful, and otherwise
 * stores 0 in the int that `cb_arg` points to.
 */
static void
io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	/* Free first: the fatal assertion below does not return on failure. */
	spdk_bdev_free_io(bdev_io);
	SPDK_CU_ASSERT_FATAL(success);

	*(int *)cb_arg = 0;
}
143 
/*
 * Fill `buf` (of `bufsz` bytes) with the expected content of one block: the string
 * "<uuid_str> <block>" with the block number right-aligned in a field of at least 8
 * characters, followed by zero bytes up to the end of the buffer.
 */
static void
prepare_block(char *buf, size_t bufsz, const char *uuid_str, uint64_t block)
{
	/* Zero everything first so the bytes after the text are deterministic. */
	memset(buf, 0, bufsz);
	snprintf(buf, bufsz, "%s %8" PRIu64, uuid_str, block);
}
150 
151 static void
152 scribble(struct spdk_bdev_desc *desc, uint64_t start, uint64_t count)
153 {
154 	struct spdk_bdev *bdev = desc->bdev;
155 	const uint32_t blocklen = desc->bdev->blocklen;
156 	struct spdk_io_channel *ch = spdk_bdev_get_io_channel(desc);
157 	char uuid_str[SPDK_UUID_STRING_LEN];
158 	char buf[count][blocklen];
159 	int err = 0xbad;
160 	uint64_t i;
161 
162 	SPDK_CU_ASSERT_FATAL(count > 0 && count < INT32_MAX);
163 	SPDK_CU_ASSERT_FATAL(ch != NULL);
164 
165 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
166 
167 	for (i = 0; i < count; i++) {
168 		prepare_block(buf[i], sizeof(buf[i]), uuid_str, start + i);
169 	}
170 
171 	spdk_bdev_write(desc, ch, buf, start * blocklen, sizeof(buf), io_done, &err);
172 	poll_threads();
173 	SPDK_CU_ASSERT_FATAL(err == 0);
174 	spdk_put_io_channel(ch);
175 	poll_threads();
176 }
177 
178 #define verify(desc, bdev, start, count) _verify(desc, bdev, start, count, __FILE__, __LINE__)
179 
180 static bool
181 _verify(struct spdk_bdev_desc *desc, struct spdk_bdev *bdev, uint64_t start, uint64_t count,
182 	const char *file, int line)
183 {
184 	struct spdk_io_channel *ch = spdk_bdev_get_io_channel(desc);
185 	const uint32_t blocklen = desc->bdev->blocklen;
186 	char uuid_str[SPDK_UUID_STRING_LEN];
187 	char buf[blocklen];
188 	char expect[blocklen];
189 	int err = 0xbad;
190 	bool ret = true;
191 	uint64_t i;
192 
193 	SPDK_CU_ASSERT_FATAL(count > 0 && count < INT32_MAX);
194 	SPDK_CU_ASSERT_FATAL(ch != NULL);
195 
196 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &bdev->uuid);
197 
198 	for (i = 0; i < count; i++) {
199 		uint64_t block = start + i;
200 
201 		spdk_bdev_read(desc, ch, buf, block * blocklen, sizeof(buf), io_done, &err);
202 		poll_threads();
203 		SPDK_CU_ASSERT_FATAL(err == 0);
204 		prepare_block(expect, sizeof(expect), uuid_str, block);
205 		if (memcmp(expect, buf, blocklen) != 0) {
206 			printf("%s:%d: ERROR: expected '%s' got '%s'\n", file, line,
207 			       expect, buf);
208 			ret = false;
209 		}
210 	}
211 
212 	spdk_put_io_channel(ch);
213 	poll_threads();
214 
215 	return ret;
216 }
217 
218 static bool
219 cluster_is_allocated(struct spdk_blob *blob, uint32_t cluster)
220 {
221 	return bs_io_unit_is_allocated(blob, cluster * blob->bs->pages_per_cluster);
222 }
223 
/*
 * Test: reads and writes through an esnap (external snapshot) clone.
 *
 * Builds an lvstore on one malloc bdev and a 4-cluster external snapshot on another, fills the
 * external snapshot with a per-block pattern, then creates an esnap clone and checks that:
 *  - the clone initially has no clusters allocated and reads back the external snapshot's data,
 *  - writing to the clone allocates only the clusters that were touched,
 *  - unwritten blocks of an allocated cluster still show the external snapshot's data,
 *  - the external snapshot itself is never modified.
 */
static void
esnap_clone_io(void)
{
	struct spdk_lvol_store *lvs = NULL;
	struct spdk_bdev *bs_bdev = NULL;
	struct spdk_bdev *esnap_bdev = NULL;
	struct spdk_bdev *lvol_bdev = NULL;
	struct spdk_bdev_desc *esnap_desc = NULL;
	struct spdk_bdev_desc *lvol_desc = NULL;
	const char bs_malloc_uuid[SPDK_UUID_STRING_LEN] = "11110049-cf29-4681-ab4b-5dd16de6cd81";
	const char esnap_uuid[SPDK_UUID_STRING_LEN] = "222251be-1ece-434d-8513-6944d5c93a53";
	struct malloc_bdev_opts malloc_opts = { 0 };
	const uint32_t bs_size_bytes = 10 * 1024 * 1024;
	const uint32_t bs_block_size = 4096;
	const uint32_t cluster_size = 32 * 1024;
	/* 32 KiB clusters of 4 KiB blocks: 8 blocks per cluster. */
	const uint32_t blocks_per_cluster = cluster_size / bs_block_size;
	const uint32_t esnap_size_bytes = 4 * cluster_size;
	struct op_with_handle_data owh_data = { 0 };
	struct lvol_bdev *_lvol_bdev;
	struct spdk_blob *blob;
	int rc;

	g_bdev_opts.bdev_auto_examine = false;

	/* Create device for lvstore */
	spdk_uuid_parse(&malloc_opts.uuid, bs_malloc_uuid);
	malloc_opts.name = "bs_malloc";
	malloc_opts.num_blocks = bs_size_bytes / bs_block_size;
	malloc_opts.block_size = bs_block_size;
	rc = create_malloc_disk(&bs_bdev, &malloc_opts);
	SPDK_CU_ASSERT_FATAL(rc == 0);

	/* Create lvstore */
	rc = vbdev_lvs_create("bs_malloc", "lvs1", cluster_size, 0, 0,
			      lvs_op_with_handle_cb, clear_owh(&owh_data));
	SPDK_CU_ASSERT_FATAL(rc == 0);
	poll_error_updated(&owh_data.lvserrno);
	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
	SPDK_CU_ASSERT_FATAL(owh_data.u.lvs != NULL);
	lvs = owh_data.u.lvs;

	/* Create esnap device */
	memset(&malloc_opts, 0, sizeof(malloc_opts));
	spdk_uuid_parse(&malloc_opts.uuid, esnap_uuid);
	malloc_opts.name = "esnap_malloc";
	malloc_opts.num_blocks = esnap_size_bytes / bs_block_size;
	malloc_opts.block_size = bs_block_size;
	rc = create_malloc_disk(&esnap_bdev, &malloc_opts);
	SPDK_CU_ASSERT_FATAL(rc == 0);

	/* Fill esnap device with pattern */
	rc = spdk_bdev_open_ext(esnap_uuid, true, ut_event_cb, NULL, &esnap_desc);
	SPDK_CU_ASSERT_FATAL(rc == 0);
	scribble(esnap_desc, 0, esnap_bdev->blockcnt);

	/* Reopen the external snapshot read-only for verification later */
	spdk_bdev_close(esnap_desc);
	poll_threads();
	rc = spdk_bdev_open_ext(esnap_uuid, false, ut_event_cb, NULL, &esnap_desc);
	SPDK_CU_ASSERT_FATAL(rc == 0);

	/* Create esnap clone */
	vbdev_lvol_create_bdev_clone(esnap_uuid, lvs, "clone1",
				     lvol_op_with_handle_cb, clear_owh(&owh_data));
	poll_error_updated(&owh_data.lvserrno);
	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
	SPDK_CU_ASSERT_FATAL(owh_data.u.lvol != NULL);

	/* Open the esnap clone */
	rc = spdk_bdev_open_ext("lvs1/clone1", true, ut_event_cb, NULL, &lvol_desc);
	SPDK_CU_ASSERT_FATAL(rc == 0);
	lvol_bdev = lvol_desc->bdev;
	/* The cast reaches the lvol's blob so cluster allocation can be inspected directly. */
	_lvol_bdev = (struct lvol_bdev *)lvol_bdev;
	blob = _lvol_bdev->lvol->blob;
	CU_ASSERT(blob->active.num_clusters == 4);
	CU_ASSERT(!cluster_is_allocated(blob, 0));
	CU_ASSERT(!cluster_is_allocated(blob, 1));
	CU_ASSERT(!cluster_is_allocated(blob, 2));
	CU_ASSERT(!cluster_is_allocated(blob, 3));

	/* Be sure the esnap and the clone see the same content. */
	CU_ASSERT(verify(esnap_desc, esnap_bdev, 0, esnap_bdev->blockcnt));
	CU_ASSERT(verify(lvol_desc, esnap_bdev, 0, esnap_bdev->blockcnt));

	/* Overwrite the second block of the first cluster then verify the whole first cluster */
	scribble(lvol_desc, 1, 1);
	CU_ASSERT(cluster_is_allocated(blob, 0));
	CU_ASSERT(!cluster_is_allocated(blob, 1));
	CU_ASSERT(!cluster_is_allocated(blob, 2));
	CU_ASSERT(!cluster_is_allocated(blob, 3));
	/* Block 0 and blocks 2.. carry the esnap's pattern; block 1 carries the clone's own. */
	CU_ASSERT(verify(lvol_desc, esnap_bdev, 0, 1));
	CU_ASSERT(verify(lvol_desc, lvol_bdev, 1, 1));
	CU_ASSERT(verify(lvol_desc, esnap_bdev, 2, blocks_per_cluster - 2));
	/* And be sure no writes made it to the external snapshot */
	CU_ASSERT(verify(esnap_desc, esnap_bdev, 0, esnap_bdev->blockcnt));

	/* Overwrite the two blocks that span the end of the first cluster and the start of the
	 * second cluster
	 */
	scribble(lvol_desc, blocks_per_cluster - 1, 2);
	/* The first part of the first cluster was written previously - it should be the same. */
	CU_ASSERT(cluster_is_allocated(blob, 0));
	CU_ASSERT(cluster_is_allocated(blob, 1));
	CU_ASSERT(!cluster_is_allocated(blob, 2));
	CU_ASSERT(!cluster_is_allocated(blob, 3));
	CU_ASSERT(verify(lvol_desc, esnap_bdev, 0, 1));
	CU_ASSERT(verify(lvol_desc, lvol_bdev, 1, 1));
	CU_ASSERT(verify(lvol_desc, esnap_bdev, 2, blocks_per_cluster - 2 - 1));
	/* Check the newly written area spanning the first two clusters. */
	CU_ASSERT(verify(lvol_desc, lvol_bdev, blocks_per_cluster - 1, 2));
	/* The rest should not have changed. */
	CU_ASSERT(verify(lvol_desc, esnap_bdev, blocks_per_cluster + 1,
			 esnap_bdev->blockcnt - blocks_per_cluster - 1));
	/* And be sure no writes made it to the external snapshot */
	CU_ASSERT(verify(esnap_desc, esnap_bdev, 0, esnap_bdev->blockcnt));

	/* Clean up */
	bdev_close(lvol_bdev, lvol_desc);
	bdev_close(esnap_bdev, esnap_desc);
	delete_malloc_disk("esnap_malloc", NULL, 0);
	/* This triggers spdk_lvs_unload() */
	delete_malloc_disk("bs_malloc", NULL, 0);
	poll_threads();
}
348 
/*
 * Callback for spdk_bdev_wait_for_examine(): sets the int that `ctx` points to to 0 so that a
 * caller waiting in poll_error_updated() sees the 0xbad sentinel replaced.
 */
static void
esnap_wait_for_examine(void *ctx)
{
	*(int *)ctx = 0;
}
356 
357 static void
358 esnap_hotplug(void)
359 {
360 	const char *uuid_esnap = "22218fb6-6743-483d-88b1-de643dc7c0bc";
361 	struct malloc_bdev_opts malloc_opts = { 0 };
362 	const uint32_t bs_size_bytes = 10 * 1024 * 1024;
363 	const uint32_t bs_block_size = 4096;
364 	const uint32_t cluster_size = 32 * 1024;
365 	const uint32_t esnap_size_bytes = 2 * cluster_size;
366 	struct op_with_handle_data owh_data = { 0 };
367 	struct spdk_bdev *malloc_bdev = NULL, *bdev;
368 	struct spdk_lvol_store *lvs;
369 	struct spdk_lvol *lvol;
370 	char aiopath[PATH_MAX];
371 	int rc, rc2;
372 
373 	g_bdev_opts.bdev_auto_examine = true;
374 
375 	/* Create aio device to hold the lvstore. */
376 	rc = make_test_file(bs_size_bytes, aiopath, sizeof(aiopath), "esnap_hotplug.aio");
377 	SPDK_CU_ASSERT_FATAL(rc == 0);
378 	rc = create_aio_bdev("aio1", aiopath, bs_block_size, false);
379 	SPDK_CU_ASSERT_FATAL(rc == 0);
380 	poll_threads();
381 
382 	rc = vbdev_lvs_create("aio1", "lvs1", cluster_size, 0, 0,
383 			      lvs_op_with_handle_cb, clear_owh(&owh_data));
384 	SPDK_CU_ASSERT_FATAL(rc == 0);
385 	poll_error_updated(&owh_data.lvserrno);
386 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
387 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvs != NULL);
388 	lvs = owh_data.u.lvs;
389 
390 	/* Create esnap device */
391 	spdk_uuid_parse(&malloc_opts.uuid, uuid_esnap);
392 	malloc_opts.name = "esnap_malloc";
393 	malloc_opts.num_blocks = esnap_size_bytes / bs_block_size;
394 	malloc_opts.block_size = bs_block_size;
395 	rc = create_malloc_disk(&malloc_bdev, &malloc_opts);
396 	SPDK_CU_ASSERT_FATAL(rc == 0);
397 
398 	/* Create esnap clone */
399 	vbdev_lvol_create_bdev_clone(uuid_esnap, lvs, "clone1",
400 				     lvol_op_with_handle_cb, clear_owh(&owh_data));
401 	poll_error_updated(&owh_data.lvserrno);
402 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
403 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvol != NULL);
404 
405 	/* Verify that lvol bdev exists */
406 	bdev = spdk_bdev_get_by_name("lvs1/clone1");
407 	SPDK_CU_ASSERT_FATAL(bdev != NULL);
408 
409 	/* Unload the lvstore and verify the bdev is gone. */
410 	rc = rc2 = 0xbad;
411 	bdev_aio_delete("aio1", unregister_cb, &rc);
412 	CU_ASSERT(spdk_bdev_get_by_name(uuid_esnap) != NULL)
413 	delete_malloc_disk(malloc_bdev->name, unregister_cb, &rc2);
414 	malloc_bdev = NULL;
415 	poll_error_updated(&rc);
416 	poll_error_updated(&rc2);
417 	SPDK_CU_ASSERT_FATAL(rc == 0);
418 	SPDK_CU_ASSERT_FATAL(rc2 == 0);
419 	SPDK_CU_ASSERT_FATAL(spdk_bdev_get_by_name("lvs1/clone1") == NULL);
420 	SPDK_CU_ASSERT_FATAL(spdk_bdev_get_by_name(uuid_esnap) == NULL);
421 
422 	/* Trigger the reload of the lvstore */
423 	rc = create_aio_bdev("aio1", aiopath, bs_block_size, false);
424 	SPDK_CU_ASSERT_FATAL(rc == 0);
425 	rc = 0xbad;
426 	spdk_bdev_wait_for_examine(esnap_wait_for_examine, &rc);
427 	poll_error_updated(&rc);
428 
429 	/* Verify the lvol is loaded without creating a bdev. */
430 	lvol = spdk_lvol_get_by_names("lvs1", "clone1");
431 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/clone1") == NULL)
432 	SPDK_CU_ASSERT_FATAL(lvol != NULL);
433 	SPDK_CU_ASSERT_FATAL(lvol->degraded_set != NULL);
434 
435 	/* Create the esnap device and verify that the bdev is created. */
436 	rc = create_malloc_disk(&malloc_bdev, &malloc_opts);
437 	SPDK_CU_ASSERT_FATAL(rc == 0);
438 	poll_threads();
439 	CU_ASSERT(malloc_bdev != NULL);
440 	CU_ASSERT(lvol->degraded_set == NULL);
441 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/clone1") != NULL);
442 
443 	/* Clean up */
444 	rc = rc2 = 0xbad;
445 	bdev_aio_delete("aio1", unregister_cb, &rc);
446 	poll_error_updated(&rc);
447 	CU_ASSERT(rc == 0);
448 	if (malloc_bdev != NULL) {
449 		delete_malloc_disk(malloc_bdev->name, unregister_cb, &rc2);
450 		poll_threads();
451 		CU_ASSERT(rc2 == 0);
452 	}
453 	rc = unlink(aiopath);
454 	CU_ASSERT(rc == 0);
455 }
456 
457 static void
458 esnap_remove_degraded(void)
459 {
460 	const char *uuid_esnap = "33358eb9-3dcf-4275-b089-0becc126fc3d";
461 	struct malloc_bdev_opts malloc_opts = { 0 };
462 	const uint32_t bs_size_bytes = 10 * 1024 * 1024;
463 	const uint32_t bs_block_size = 4096;
464 	const uint32_t cluster_size = 32 * 1024;
465 	const uint32_t esnap_size_bytes = 2 * cluster_size;
466 	struct op_with_handle_data owh_data = { 0 };
467 	struct spdk_lvol_store *lvs;
468 	struct spdk_bdev *malloc_bdev = NULL;
469 	struct spdk_lvol *vol1, *vol2, *vol3;
470 	char aiopath[PATH_MAX];
471 	int rc, rc2;
472 
473 	g_bdev_opts.bdev_auto_examine = true;
474 
475 	/* Create aio device to hold the lvstore. */
476 	rc = make_test_file(bs_size_bytes, aiopath, sizeof(aiopath), "remove_degraded.aio");
477 	SPDK_CU_ASSERT_FATAL(rc == 0);
478 	rc = create_aio_bdev("aio1", aiopath, bs_block_size, false);
479 	SPDK_CU_ASSERT_FATAL(rc == 0);
480 	poll_threads();
481 
482 	rc = vbdev_lvs_create("aio1", "lvs1", cluster_size, 0, 0,
483 			      lvs_op_with_handle_cb, clear_owh(&owh_data));
484 	SPDK_CU_ASSERT_FATAL(rc == 0);
485 	poll_error_updated(&owh_data.lvserrno);
486 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
487 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvs != NULL);
488 	lvs = owh_data.u.lvs;
489 
490 	/* Create esnap device */
491 	spdk_uuid_parse(&malloc_opts.uuid, uuid_esnap);
492 	malloc_opts.name = "esnap";
493 	malloc_opts.num_blocks = esnap_size_bytes / bs_block_size;
494 	malloc_opts.block_size = bs_block_size;
495 	rc = create_malloc_disk(&malloc_bdev, &malloc_opts);
496 	SPDK_CU_ASSERT_FATAL(rc == 0);
497 
498 	/* Create a snapshot of vol1.
499 	 * State:
500 	 *   esnap <-- vol1
501 	 */
502 	vbdev_lvol_create_bdev_clone(uuid_esnap, lvs, "vol1",
503 				     lvol_op_with_handle_cb, clear_owh(&owh_data));
504 	poll_error_updated(&owh_data.lvserrno);
505 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
506 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvol != NULL);
507 	vol1 = owh_data.u.lvol;
508 
509 	/* Create a snapshot of vol1.
510 	 * State:
511 	 *   esnap <-- vol2 <-- vol1
512 	 */
513 	vbdev_lvol_create_snapshot(vol1, "vol2", lvol_op_with_handle_cb, clear_owh(&owh_data));
514 	poll_error_updated(&owh_data.lvserrno);
515 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
516 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvol != NULL);
517 	vol2 = owh_data.u.lvol;
518 
519 	/* Create a clone of vol2.
520 	 * State:
521 	 *   esnap <-- vol2 <-- vol1
522 	 *                `---- vol3
523 	 */
524 	vbdev_lvol_create_clone(vol2, "vol3", lvol_op_with_handle_cb, clear_owh(&owh_data));
525 	poll_error_updated(&owh_data.lvserrno);
526 	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
527 	SPDK_CU_ASSERT_FATAL(owh_data.u.lvol != NULL);
528 	vol3 = owh_data.u.lvol;
529 
530 	/* Unload the lvstore and delete esnap */
531 	rc = rc2 = 0xbad;
532 	bdev_aio_delete("aio1", unregister_cb, &rc);
533 	CU_ASSERT(spdk_bdev_get_by_name(uuid_esnap) != NULL)
534 	delete_malloc_disk(malloc_bdev->name, unregister_cb, &rc2);
535 	malloc_bdev = NULL;
536 	poll_error_updated(&rc);
537 	poll_error_updated(&rc2);
538 	SPDK_CU_ASSERT_FATAL(rc == 0);
539 	SPDK_CU_ASSERT_FATAL(rc2 == 0);
540 
541 	/* Trigger the reload of the lvstore.
542 	 * State:
543 	 *   (missing) <-- vol2 <-- vol1
544 	 *                    `---- vol3
545 	 */
546 	rc = create_aio_bdev("aio1", aiopath, bs_block_size, false);
547 	SPDK_CU_ASSERT_FATAL(rc == 0);
548 	rc = 0xbad;
549 	spdk_bdev_wait_for_examine(esnap_wait_for_examine, &rc);
550 	poll_error_updated(&rc);
551 
552 	/* Verify vol1 is as described in diagram above */
553 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol1") == NULL);
554 	vol1 = spdk_lvol_get_by_names("lvs1", "vol1");
555 	SPDK_CU_ASSERT_FATAL(vol1 != NULL);
556 	lvs = vol1->lvol_store;
557 	CU_ASSERT(spdk_blob_is_clone(vol1->blob));
558 	CU_ASSERT(!spdk_blob_is_esnap_clone(vol1->blob));
559 	CU_ASSERT(!spdk_blob_is_snapshot(vol1->blob));
560 	CU_ASSERT(vol1->degraded_set == NULL);
561 
562 	/* Verify vol2 is as described in diagram above */
563 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol2") == NULL);
564 	vol2 = spdk_lvol_get_by_names("lvs1", "vol2");
565 	SPDK_CU_ASSERT_FATAL(vol2 != NULL);
566 	CU_ASSERT(!spdk_blob_is_clone(vol2->blob));
567 	CU_ASSERT(spdk_blob_is_esnap_clone(vol2->blob));
568 	CU_ASSERT(spdk_blob_is_snapshot(vol2->blob));
569 	CU_ASSERT(RB_MIN(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree) == vol2->degraded_set);
570 	CU_ASSERT(TAILQ_FIRST(&vol2->degraded_set->lvols) == vol2);
571 
572 	/* Verify vol3 is as described in diagram above */
573 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol3") == NULL);
574 	vol3 = spdk_lvol_get_by_names("lvs1", "vol3");
575 	SPDK_CU_ASSERT_FATAL(vol3 != NULL);
576 	CU_ASSERT(spdk_blob_is_clone(vol3->blob));
577 	CU_ASSERT(!spdk_blob_is_esnap_clone(vol3->blob));
578 	CU_ASSERT(!spdk_blob_is_snapshot(vol3->blob));
579 	CU_ASSERT(vol3->degraded_set == NULL);
580 
581 	/* Try to delete vol2. Should fail because it has multiple clones. */
582 	rc = 0xbad;
583 	vbdev_lvol_destroy(vol2, lvol_op_complete_cb, &rc);
584 	poll_error_updated(&rc);
585 	CU_ASSERT(rc == -EPERM);
586 
587 	/* Delete vol1
588 	 * New state:
589 	 *   (missing) <-- vol2 <-- vol3
590 	 */
591 	rc = 0xbad;
592 	vbdev_lvol_destroy(vol1, lvol_op_complete_cb, &rc);
593 	poll_error_updated(&rc);
594 	CU_ASSERT(rc == 0);
595 
596 	/* Verify vol1 is gone */
597 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol1") == NULL);
598 	vol1 = spdk_lvol_get_by_names("lvs1", "vol1");
599 	CU_ASSERT(vol1 == NULL);
600 
601 	/* Verify vol2 is as described in diagram above */
602 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol2") == NULL);
603 	vol2 = spdk_lvol_get_by_names("lvs1", "vol2");
604 	SPDK_CU_ASSERT_FATAL(vol2 != NULL);
605 	CU_ASSERT(!spdk_blob_is_clone(vol2->blob));
606 	CU_ASSERT(spdk_blob_is_esnap_clone(vol2->blob));
607 	CU_ASSERT(spdk_blob_is_snapshot(vol2->blob));
608 	CU_ASSERT(RB_MIN(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree) == vol2->degraded_set);
609 	CU_ASSERT(TAILQ_FIRST(&vol2->degraded_set->lvols) == vol2);
610 
611 	/* Verify vol3 is as described in diagram above */
612 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol3") == NULL);
613 	vol3 = spdk_lvol_get_by_names("lvs1", "vol3");
614 	SPDK_CU_ASSERT_FATAL(vol3 != NULL);
615 	CU_ASSERT(spdk_blob_is_clone(vol3->blob));
616 	CU_ASSERT(!spdk_blob_is_esnap_clone(vol3->blob));
617 	CU_ASSERT(!spdk_blob_is_snapshot(vol3->blob));
618 	CU_ASSERT(vol3->degraded_set == NULL);
619 
620 	/* Delete vol2
621 	 * New state:
622 	 *   (missing) <-- vol3
623 	 */
624 	rc = 0xbad;
625 	vbdev_lvol_destroy(vol2, lvol_op_complete_cb, &rc);
626 	poll_error_updated(&rc);
627 	CU_ASSERT(rc == 0);
628 
629 	/* Verify vol2 is gone */
630 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol2") == NULL);
631 	vol2 = spdk_lvol_get_by_names("lvs1", "vol2");
632 	SPDK_CU_ASSERT_FATAL(vol2 == NULL);
633 
634 	/* Verify vol3 is as described in diagram above */
635 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol3") == NULL);
636 	vol3 = spdk_lvol_get_by_names("lvs1", "vol3");
637 	SPDK_CU_ASSERT_FATAL(vol3 != NULL);
638 	CU_ASSERT(!spdk_blob_is_clone(vol3->blob));
639 	CU_ASSERT(spdk_blob_is_esnap_clone(vol3->blob));
640 	CU_ASSERT(!spdk_blob_is_snapshot(vol3->blob));
641 	CU_ASSERT(RB_MIN(degraded_lvol_sets_tree, &lvs->degraded_lvol_sets_tree) == vol3->degraded_set);
642 	CU_ASSERT(TAILQ_FIRST(&vol3->degraded_set->lvols) == vol3);
643 
644 	/* Delete vol3
645 	 * New state:
646 	 *   (nothing)
647 	 */
648 	rc = 0xbad;
649 	vbdev_lvol_destroy(vol3, lvol_op_complete_cb, &rc);
650 	poll_error_updated(&rc);
651 	CU_ASSERT(rc == 0);
652 
653 	/* Verify vol3 is gone */
654 	CU_ASSERT(spdk_bdev_get_by_name("lvs1/vol3") == NULL);
655 	vol3 = spdk_lvol_get_by_names("lvs1", "vol3");
656 	SPDK_CU_ASSERT_FATAL(vol3 == NULL);
657 
658 	/* Nothing depends on the missing bdev, so it is no longer missing. */
659 	CU_ASSERT(RB_EMPTY(&lvs->degraded_lvol_sets_tree));
660 
661 	/* Clean up */
662 	rc = rc2 = 0xbad;
663 	bdev_aio_delete("aio1", unregister_cb, &rc);
664 	poll_error_updated(&rc);
665 	CU_ASSERT(rc == 0);
666 	if (malloc_bdev != NULL) {
667 		delete_malloc_disk(malloc_bdev->name, unregister_cb, &rc2);
668 		poll_threads();
669 		CU_ASSERT(rc2 == 0);
670 	}
671 	rc = unlink(aiopath);
672 	CU_ASSERT(rc == 0);
673 }
674 
/*
 * Test: attempts to delete an lvol while its lvstore is being destroyed fail with -ENODEV.
 *
 * Creates an lvstore with one lvol on an aio bdev, starts vbdev_lvs_destruct(), and then, between
 * single polls of thread 0, keeps looking the lvol up both as a bdev and by lvstore/lvol name.
 * Every time a lookup succeeds, vbdev_lvol_destroy() must complete immediately with -ENODEV.
 * The test requires that the by-name lookup succeeded at least once (count != 0), i.e. that the
 * window was actually hit.
 */
static void
late_delete(void)
{
	char aiopath[PATH_MAX];
	struct spdk_lvol_store *lvs = NULL;
	const uint32_t bs_size_bytes = 10 * 1024 * 1024;
	const uint32_t bs_block_size = 4096;
	const uint32_t cluster_size = 32 * 1024;
	/* Not initialized here; clear_owh() fully initializes it before each use. */
	struct op_with_handle_data owh_data;
	struct lvol_bdev *lvol_bdev;
	struct spdk_lvol *lvol;
	int rc, rc2;
	int count;

	/* Create device for lvstore. This cannot be a malloc bdev because we need to sneak a
	 * deletion in while blob_persist() is in progress.
	 */
	rc = make_test_file(bs_size_bytes, aiopath, sizeof(aiopath), "late_delete.aio");
	SPDK_CU_ASSERT_FATAL(rc == 0);
	rc = create_aio_bdev("aio1", aiopath, bs_block_size, false);
	SPDK_CU_ASSERT_FATAL(rc == 0);
	poll_threads();

	/* Create lvstore */
	rc = vbdev_lvs_create("aio1", "lvs1", cluster_size, 0, 0,
			      lvs_op_with_handle_cb, clear_owh(&owh_data));
	SPDK_CU_ASSERT_FATAL(rc == 0);
	poll_error_updated(&owh_data.lvserrno);
	SPDK_CU_ASSERT_FATAL(owh_data.lvserrno == 0);
	SPDK_CU_ASSERT_FATAL(owh_data.u.lvs != NULL);
	lvs = owh_data.u.lvs;

	/* Create an lvol */
	vbdev_lvol_create(lvs, "lvol", 1, true, LVOL_CLEAR_WITH_DEFAULT,
			  lvol_op_with_handle_cb, clear_owh(&owh_data));
	poll_error_updated(&owh_data.lvserrno);
	CU_ASSERT(owh_data.lvserrno == 0);
	CU_ASSERT(owh_data.u.lvol != NULL);

	/* Verify the lvol can be found both ways */
	CU_ASSERT(spdk_bdev_get_by_name("lvs1/lvol") != NULL);
	CU_ASSERT(spdk_lvol_get_by_names("lvs1", "lvol") != NULL);

	/*
	 * Once the lvolstore deletion starts, it will briefly be possible to look up lvol bdevs and
	 * degraded lvols in that lvolstore but any attempt to delete them will fail with -ENODEV.
	 */
	rc = 0xbad;
	count = 0;
	vbdev_lvs_destruct(lvs, lvol_op_complete_cb, &rc);
	do {
		lvol_bdev = (struct lvol_bdev *)spdk_bdev_get_by_name("lvs1/lvol");
		if (lvol_bdev != NULL) {
			rc2 = 0xbad;
			vbdev_lvol_destroy(lvol_bdev->lvol, lvol_op_complete_cb, &rc2);
			/* No polling in between: the failure must be reported synchronously. */
			CU_ASSERT(rc2 == -ENODEV);
		}
		lvol = spdk_lvol_get_by_names("lvs1", "lvol");
		if (lvol != NULL) {
			/* If we are here, we are likely reproducing #2998 */
			rc2 = 0xbad;
			vbdev_lvol_destroy(lvol, lvol_op_complete_cb, &rc2);
			CU_ASSERT(rc2 == -ENODEV);
			count++;
		}

		/* Poll thread 0 once per iteration so the lookups above run between steps of the
		 * destruct rather than after it has finished.
		 */
		spdk_thread_poll(g_ut_threads[0].thread, 0, 0);
	} while (rc == 0xbad);

	/* Ensure that the test exercised the race */
	CU_ASSERT(count != 0);

	/* The lvol is now gone */
	CU_ASSERT(spdk_bdev_get_by_name("lvs1/lvol") == NULL);
	CU_ASSERT(spdk_lvol_get_by_names("lvs1", "lvol") == NULL);

	/* Clean up */
	rc = 0xbad;
	bdev_aio_delete("aio1", unregister_cb, &rc);
	poll_error_updated(&rc);
	CU_ASSERT(rc == 0);
	rc = unlink(aiopath);
	CU_ASSERT(rc == 0);
}
759 
/*
 * Completion callback for spdk_bdev_initialize(): asserts that initialization succeeded.
 * `arg` is unused.
 */
static void
bdev_init_cb(void *arg, int rc)
{
	(void)arg;

	assert(rc == 0);
}
765 
/*
 * Completion callback for spdk_subsystem_init(): asserts that initialization succeeded.
 * `ctx` is unused.
 */
static void
subsystem_init_cb(int rc, void *ctx)
{
	(void)ctx;

	assert(rc == 0);
}
771 
/*
 * No-op completion callback passed to the spdk_*_finish() calls in main().
 */
static void
bdev_fini_cb(void *arg)
{
	(void)arg;
}
776 
/*
 * Test entry point.
 *
 * Records the directory of the executable (from argv[0]) for the aio backing files, registers
 * the four tests in the "esnap_io" suite, brings up the unit-test threads and the SPDK
 * libraries the tests need, runs the tests, and tears everything down again.
 *
 * Returns the number of failures reported by spdk_ut_run_tests().
 */
int
main(int argc, char **argv)
{
	CU_pSuite	suite = NULL;
	unsigned int	num_failures;
	int rc;

	set_testdir(argv[0]);

	CU_initialize_registry();

	suite = CU_add_suite("esnap_io", NULL, NULL);

	CU_ADD_TEST(suite, esnap_clone_io);
	CU_ADD_TEST(suite, esnap_hotplug);
	CU_ADD_TEST(suite, esnap_remove_degraded);
	CU_ADD_TEST(suite, late_delete);

	/* Two unit-test threads are allocated; thread 0 is selected as the current one. */
	allocate_threads(2);
	set_thread(0);

	/*
	 * This is a non-standard way of initializing libraries. It works for this test but
	 * shouldn't be used as an example elsewhere, except for maybe other tests.
	 */
	spdk_subsystem_init(subsystem_init_cb, NULL);
	rc = spdk_iobuf_initialize();
	if (rc != 0) {
		SPDK_ERRLOG("Failed to initialize iobuf\n");
		abort();
	}
	rc = spdk_accel_initialize();
	if (rc != 0) {
		SPDK_ERRLOG("Failed to initialize accel\n");
		abort();
	}
	spdk_bdev_initialize(bdev_init_cb, NULL);

	num_failures = spdk_ut_run_tests(argc, argv, NULL);
	CU_cleanup_registry();

	/* Tear down in the reverse order of initialization: bdev, accel, iobuf. */
	spdk_bdev_finish(bdev_fini_cb, NULL);
	spdk_accel_finish(bdev_fini_cb, NULL);
	spdk_iobuf_finish(bdev_fini_cb, NULL);

	free_threads();

	return num_failures;
}
826