1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright 2023 Solidigm All Rights Reserved
3  *   Copyright (C) 2022 Intel Corporation.
4  *   All rights reserved.
5  */
6 
7 #include "spdk/env.h"
8 #include "spdk/bdev_module.h"
9 
10 #include "ftl_core.h"
11 #include "ftl_md.h"
12 #include "ftl_nv_cache_io.h"
13 
14 struct ftl_md;
15 static void io_submit(struct ftl_md *md);
16 static void io_done(struct ftl_md *md);
17 
18 static bool
19 has_mirror(struct ftl_md *md)
20 {
21 	if (md->region) {
22 		if (md->region->mirror_type != FTL_LAYOUT_REGION_TYPE_INVALID) {
23 			return md->mirror_enabled;
24 		}
25 	}
26 
27 	return false;
28 }
29 
30 static int
31 setup_mirror(struct ftl_md *md)
32 {
33 	if (!md->mirror) {
34 		md->mirror = calloc(1, sizeof(*md->mirror));
35 		if (!md->mirror) {
36 			return -ENOMEM;
37 		}
38 		md->mirror_enabled = true;
39 	}
40 
41 	md->mirror->dev = md->dev;
42 	md->mirror->data_blocks = md->data_blocks;
43 	md->mirror->data = md->data;
44 	md->mirror->vss_data = md->vss_data;
45 
46 	/* Set proper region in secondary object */
47 	assert(md->region->mirror_type != FTL_LAYOUT_REGION_TYPE_INVALID);
48 	md->mirror->region = &md->dev->layout.region[md->region->mirror_type];
49 
50 	return 0;
51 }
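
/*
 * Mirroring note: the mirror object shares the primary's data and VSS buffers and differs
 * only in the layout region it targets, so an IO issued on the mirror transfers the same
 * in-memory content to/from the secondary region. The mirror is created lazily in
 * setup_mirror() and is released together with the primary in ftl_md_destroy().
 */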
52 
53 uint64_t
54 ftl_md_xfer_blocks(struct spdk_ftl_dev *dev)
55 {
56 	return 4ULL * dev->xfer_size;
57 }
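
/*
 * Metadata is transferred in chunks of up to 4 write units (xfer_size) at a time; this value
 * bounds both the size of the DMA buffers allocated in io_prepare() and the number of blocks
 * submitted per bdev IO in read_write_blocks().
 */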
58 
59 static uint64_t
60 xfer_size(struct ftl_md *md)
61 {
62 	return ftl_md_xfer_blocks(md->dev) * FTL_BLOCK_SIZE;
63 }
64 
65 static void
66 ftl_md_create_heap(struct ftl_md *md, uint64_t vss_blksz)
67 {
68 	md->shm_fd = -1;
69 	md->vss_data = NULL;
70 	md->data = calloc(md->data_blocks, FTL_BLOCK_SIZE + vss_blksz);
71 
72 	if (md->data && vss_blksz) {
73 		md->vss_data = ((char *)md->data) + md->data_blocks * FTL_BLOCK_SIZE;
74 	}
75 }
76 
77 static void
78 ftl_md_destroy_heap(struct ftl_md *md)
79 {
80 	if (md->data) {
81 		free(md->data);
82 		md->data = NULL;
83 		md->vss_data = NULL;
84 	}
85 }
86 
87 static int
88 ftl_wrapper_open(const char *name, int of, mode_t m)
89 {
90 	return open(name, of, m);
91 }
92 
93 static void
94 ftl_md_setup_obj(struct ftl_md *md, int flags,
95 		 const char *name)
96 {
97 	char uuid_str[SPDK_UUID_STRING_LEN];
98 	const char *fmt;
99 
100 	if (!(flags & FTL_MD_CREATE_SHM)) {
101 		assert(false);
102 		return;
103 	}
104 
105 	/* TODO: temporary, define a proper hugetlbfs mountpoint */
106 	fmt = "/dev/hugepages/ftl_%s_%s";
107 	md->shm_mmap_flags = MAP_SHARED;
108 	md->shm_open = ftl_wrapper_open;
109 	md->shm_unlink = unlink;
110 
111 	if (name == NULL ||
112 	    spdk_uuid_fmt_lower(uuid_str, SPDK_UUID_STRING_LEN, &md->dev->conf.uuid) ||
113 	    snprintf(md->name, sizeof(md->name) / sizeof(md->name[0]),
114 		     fmt, uuid_str, name) <= 0) {
115 		md->name[0] = 0;
116 	}
117 }
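
/*
 * With the format above, a metadata object name and the device UUID map to a path such as
 * "/dev/hugepages/ftl_<uuid>_<name>" (illustrative only). Leaving md->name empty marks the
 * setup as failed and is caught later in ftl_md_create_shm().
 */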
118 
119 static void
120 ftl_md_invalidate_shm(struct ftl_md *md)
121 {
122 	if (md->dev->sb_shm && md->dev->sb_shm->shm_ready) {
123 		md->dev->init_retry = true;
124 		md->dev->sb_shm->shm_ready = false;
125 	}
126 }
127 
128 static void
129 ftl_md_create_shm(struct ftl_md *md, uint64_t vss_blksz, int flags)
130 {
131 	struct stat shm_stat;
132 	size_t vss_blk_offs;
133 	void *shm_ptr;
134 	int open_flags = O_RDWR;
135 	mode_t open_mode = S_IRUSR | S_IWUSR;
136 
137 	assert(md->shm_open && md->shm_unlink);
138 	md->data = NULL;
139 	md->vss_data = NULL;
140 	md->shm_sz = 0;
141 
142 	/* Must have an object name */
143 	if (md->name[0] == 0) {
144 		assert(false);
145 		return;
146 	}
147 
148 	/* If specified, unlink before creating a new SHM object */
149 	if (flags & FTL_MD_CREATE_SHM_NEW) {
150 		if (md->shm_unlink(md->name) < 0 && errno != ENOENT) {
151 			ftl_md_invalidate_shm(md);
152 			return;
153 		}
154 		open_flags += O_CREAT | O_TRUNC;
155 	}
156 
157 	/* Open existing or create a new SHM object, then query its props */
158 	md->shm_fd = md->shm_open(md->name, open_flags, open_mode);
159 	if (md->shm_fd < 0 || fstat(md->shm_fd, &shm_stat) < 0) {
160 		goto err_shm;
161 	}
162 
163 	/* Verify open mode hasn't changed */
164 	if ((shm_stat.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO)) != open_mode) {
165 		goto err_shm;
166 	}
167 
168 	/* Round up the SHM obj size to the nearest blk size (i.e. page size) */
169 	md->shm_sz = spdk_divide_round_up(md->data_blocks * FTL_BLOCK_SIZE, shm_stat.st_blksize);
170 
171 	/* Add some blks for VSS metadata */
172 	vss_blk_offs = md->shm_sz;
173 
174 	if (vss_blksz) {
175 		md->shm_sz += spdk_divide_round_up(md->data_blocks * vss_blksz,
176 						   shm_stat.st_blksize);
177 	}
178 
179 	/* Total SHM obj size */
180 	md->shm_sz *= shm_stat.st_blksize;
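	/*
	 * Example (hypothetical numbers): with 1024 data blocks of FTL_BLOCK_SIZE = 4 KiB,
	 * a 64 B VSS entry per block and a 2 MiB hugepage block size (st_blksize), the data
	 * part rounds up to 2 hugepages and the VSS part (64 KiB) to 1 hugepage, giving a
	 * total shm_sz of 3 * 2 MiB = 6 MiB.
	 */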
181 
182 	/* Set or check the object size - the object is zero-initialized when newly created (FTL_MD_CREATE_SHM_NEW) */
183 	if ((shm_stat.st_size == 0 && (ftruncate(md->shm_fd, md->shm_sz) < 0 ||
184 				       (flags & FTL_MD_CREATE_SHM_NEW) == 0))
185 	    || (shm_stat.st_size > 0 && (size_t)shm_stat.st_size != md->shm_sz)) {
186 		goto err_shm;
187 	}
188 
189 	/* Create a virtual memory mapping for the object */
190 	shm_ptr = mmap(NULL, md->shm_sz, PROT_READ | PROT_WRITE, md->shm_mmap_flags,
191 		       md->shm_fd, 0);
192 	if (shm_ptr == MAP_FAILED) {
193 		goto err_shm;
194 	}
195 
196 	md->data = shm_ptr;
197 	if (vss_blksz) {
198 		md->vss_data = ((char *)shm_ptr) + vss_blk_offs * shm_stat.st_blksize;
199 	}
200 
201 	/* Lock the pages in memory (i.e. prevent the pages from being paged out) */
202 	if (mlock(md->data, md->shm_sz) < 0) {
203 		goto err_map;
204 	}
205 
206 	if (spdk_mem_register(md->data, md->shm_sz)) {
207 		goto err_mlock;
208 	}
209 	md->mem_reg = true;
210 
211 	return;
212 
213 	/* Clean up on failure */
214 err_mlock:
215 	munlock(md->data, md->shm_sz);
216 
217 err_map:
218 	munmap(md->data, md->shm_sz);
219 	md->data = NULL;
220 	md->vss_data = NULL;
221 	md->shm_sz = 0;
222 
223 err_shm:
224 	if (md->shm_fd >= 0) {
225 		close(md->shm_fd);
226 		md->shm_unlink(md->name);
227 		md->shm_fd = -1;
228 	}
229 	ftl_md_invalidate_shm(md);
230 }
231 
232 static void
233 ftl_md_destroy_shm(struct ftl_md *md, int flags)
234 {
235 	if (!md->data) {
236 		return;
237 	}
238 
239 	assert(md->shm_sz > 0);
240 	if (md->mem_reg) {
241 		spdk_mem_unregister(md->data, md->shm_sz);
242 		md->mem_reg = false;
243 	}
244 
245 	/* Unlock the pages in memory */
246 	munlock(md->data, md->shm_sz);
247 
248 	/* Remove the virtual memory mapping for the object */
249 	munmap(md->data, md->shm_sz);
250 
251 	/* Close SHM object fd */
252 	close(md->shm_fd);
253 
254 	md->data = NULL;
255 	md->vss_data = NULL;
256 
257 	/* If specified, keep the object in SHM */
258 	if (flags & FTL_MD_DESTROY_SHM_KEEP) {
259 		return;
260 	}
261 
262 	/* Otherwise destroy/unlink the object */
263 	assert(md->name[0] != 0 && md->shm_unlink != NULL);
264 	md->shm_unlink(md->name);
265 }
266 
267 struct ftl_md *ftl_md_create(struct spdk_ftl_dev *dev, uint64_t blocks,
268 			     uint64_t vss_blksz, const char *name, int flags,
269 			     const struct ftl_layout_region *region)
270 {
271 	struct ftl_md *md;
272 
273 	md = calloc(1, sizeof(*md));
274 	if (!md) {
275 		return NULL;
276 	}
277 	md->dev = dev;
278 	md->data_blocks = blocks;
279 	md->mirror_enabled = true;
280 
281 	if (flags != FTL_MD_CREATE_NO_MEM) {
282 		if (flags & FTL_MD_CREATE_SHM) {
283 			ftl_md_setup_obj(md, flags, name);
284 			ftl_md_create_shm(md, vss_blksz, flags);
285 		} else {
286 			assert((flags & FTL_MD_CREATE_HEAP) == FTL_MD_CREATE_HEAP);
287 			ftl_md_create_heap(md, vss_blksz);
288 		}
289 
290 		if (!md->data) {
291 			free(md);
292 			return NULL;
293 		}
294 	}
295 
296 	if (region) {
297 		size_t entry_vss_buf_size = vss_blksz * region->entry_size;
298 
299 		if (entry_vss_buf_size) {
300 			md->entry_vss_dma_buf = spdk_malloc(entry_vss_buf_size, FTL_BLOCK_SIZE,
301 							    NULL, SPDK_ENV_LCORE_ID_ANY,
302 							    SPDK_MALLOC_DMA);
303 			if (!md->entry_vss_dma_buf) {
304 				goto err;
305 			}
306 		}
307 
308 		if (ftl_md_set_region(md, region)) {
309 			goto err;
310 		}
311 	}
312 
313 	return md;
314 err:
315 	ftl_md_destroy(md, ftl_md_destroy_region_flags(dev, region->type));
316 	return NULL;
317 }
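
/*
 * Illustrative usage sketch (not taken from an actual caller; "my_md", "region" and
 * "caller_done_cb" are hypothetical names):
 *
 *	struct ftl_md *md = ftl_md_create(dev, region->current.blocks, region->vss_blksz,
 *					  "my_md", ftl_md_create_region_flags(dev, region->type),
 *					  region);
 *	if (md) {
 *		md->cb = caller_done_cb;
 *		ftl_md_persist(md);
 *	}
 *
 * The callback stored in md->cb is invoked from io_done() with the final IO status once all
 * chunks have been transferred, and the object is eventually released with ftl_md_destroy().
 */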
318 
319 int
320 ftl_md_unlink(struct spdk_ftl_dev *dev, const char *name, int flags)
321 {
322 	struct ftl_md md = { 0 };
323 
324 	if (0 == (flags & FTL_MD_CREATE_SHM)) {
325 		/* Unlink can be called for shared memory only */
326 		return -EINVAL;
327 	}
328 
329 	md.dev = dev;
330 	ftl_md_setup_obj(&md, flags, name);
331 
332 	return md.shm_unlink(md.name);
333 }
334 
335 void
336 ftl_md_destroy(struct ftl_md *md, int flags)
337 {
338 	if (!md) {
339 		return;
340 	}
341 
342 	ftl_md_free_buf(md, flags);
343 
344 	spdk_free(md->entry_vss_dma_buf);
345 
346 	free(md->mirror);
347 	free(md);
348 }
349 
350 void
351 ftl_md_free_buf(struct ftl_md *md, int flags)
352 {
353 	if (!md) {
354 		return;
355 	}
356 
357 	if (md->shm_fd < 0) {
358 		assert(flags == 0);
359 		ftl_md_destroy_heap(md);
360 	} else {
361 		ftl_md_destroy_shm(md, flags);
362 	}
363 }
364 
365 void *
366 ftl_md_get_buffer(struct ftl_md *md)
367 {
368 	return md->data;
369 }
370 
371 uint64_t
372 ftl_md_get_buffer_size(struct ftl_md *md)
373 {
374 	return md->data_blocks * FTL_BLOCK_SIZE;
375 }
376 
377 static void
378 ftl_md_vss_buf_init(union ftl_md_vss *buf, uint32_t count,
379 		    const union ftl_md_vss *vss_pattern)
380 {
381 	while (count) {
382 		count--;
383 		buf[count] = *vss_pattern;
384 	}
385 }
386 
387 union ftl_md_vss *ftl_md_vss_buf_alloc(struct ftl_layout_region *region, uint32_t count)
388 {
389 	union ftl_md_vss *buf = spdk_zmalloc(count * FTL_MD_VSS_SZ, FTL_BLOCK_SIZE, NULL,
390 						     SPDK_ENV_LCORE_ID_ANY,
391 						     SPDK_MALLOC_DMA);
392 
393 	if (!buf) {
394 		return NULL;
395 	}
396 
397 	union ftl_md_vss vss_buf = {0};
398 	vss_buf.version.md_version = region->current.version;
399 	ftl_md_vss_buf_init(buf, count, &vss_buf);
400 	return buf;
401 }
402 
403 union ftl_md_vss *ftl_md_get_vss_buffer(struct ftl_md *md)
404 {
405 	return md->vss_data;
406 }
407 
408 static void
409 io_cleanup(struct ftl_md *md)
410 {
411 	spdk_dma_free(md->io.data);
412 	md->io.data = NULL;
413 
414 	spdk_dma_free(md->io.md);
415 	md->io.md = NULL;
416 }
417 
418 static void
419 exception(void *arg)
420 {
421 	struct ftl_md *md = arg;
422 
423 	md->cb(md->dev, md, -EINVAL);
424 	io_cleanup(md);
425 }
426 
427 static inline enum ftl_stats_type
428 get_bdev_io_ftl_stats_type(struct spdk_ftl_dev *dev, struct spdk_bdev_io *bdev_io) {
429 	struct spdk_bdev *nvc = spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc);
430 
431 	if (bdev_io->bdev == nvc) {
432 		return FTL_STATS_TYPE_MD_NV_CACHE;
433 	} else {
434 		return FTL_STATS_TYPE_MD_BASE;
435 	}
438 }
439 
440 static void
441 audit_md_vss_version(struct ftl_md *md, uint64_t blocks)
442 {
443 #if defined(DEBUG)
444 	union ftl_md_vss *vss = md->io.md;
445 	while (blocks) {
446 		blocks--;
447 		assert(vss[blocks].version.md_version == md->region->current.version);
448 	}
449 #endif
450 }
451 
452 static void
453 read_write_blocks_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
454 {
455 	struct ftl_md *md = arg;
456 
457 	ftl_stats_bdev_io_completed(md->dev, get_bdev_io_ftl_stats_type(md->dev, bdev_io), bdev_io);
458 
459 	if (spdk_unlikely(!success)) {
460 		if (md->io.op == FTL_MD_OP_RESTORE && has_mirror(md)) {
461 			md->io.status = -EAGAIN;
462 		} else {
463 			md->io.status = -EIO;
464 		}
465 	} else {
466 		uint64_t blocks = bdev_io->u.bdev.num_blocks;
467 		uint64_t size = blocks * FTL_BLOCK_SIZE;
468 
469 		if (md->io.op == FTL_MD_OP_RESTORE) {
470 			memcpy(md->data + md->io.data_offset, md->io.data, size);
471 
472 			if (md->vss_data) {
473 				uint64_t vss_offset = md->io.data_offset / FTL_BLOCK_SIZE;
474 				vss_offset *= FTL_MD_VSS_SZ;
475 				audit_md_vss_version(md, blocks);
476 				memcpy(md->vss_data + vss_offset, md->io.md, blocks * FTL_MD_VSS_SZ);
477 			}
478 		}
479 
480 		md->io.address += blocks;
481 		md->io.remaining -= blocks;
482 		md->io.data_offset += size;
483 	}
484 
485 	spdk_bdev_free_io(bdev_io);
486 
487 	io_submit(md);
488 }
489 
490 static inline int
491 read_blocks(struct spdk_ftl_dev *dev, struct spdk_bdev_desc *desc,
492 	    struct spdk_io_channel *ch,
493 	    void *buf, void *md_buf,
494 	    uint64_t offset_blocks, uint64_t num_blocks,
495 	    spdk_bdev_io_completion_cb cb, void *cb_arg)
496 {
497 	if (desc == dev->nv_cache.bdev_desc) {
498 		return ftl_nv_cache_bdev_read_blocks_with_md(dev, desc, ch, buf, md_buf,
499 				offset_blocks, num_blocks,
500 				cb, cb_arg);
501 	} else if (md_buf) {
502 		return spdk_bdev_read_blocks_with_md(desc, ch, buf, md_buf,
503 						     offset_blocks, num_blocks,
504 						     cb, cb_arg);
505 	} else {
506 		return spdk_bdev_read_blocks(desc, ch, buf,
507 					     offset_blocks, num_blocks,
508 					     cb, cb_arg);
509 	}
510 }
511 
512 static inline int
513 write_blocks(struct spdk_ftl_dev *dev, struct spdk_bdev_desc *desc,
514 	     struct spdk_io_channel *ch,
515 	     void *buf, void *md_buf,
516 	     uint64_t offset_blocks, uint64_t num_blocks,
517 	     spdk_bdev_io_completion_cb cb, void *cb_arg)
518 {
519 	if (desc == dev->nv_cache.bdev_desc) {
520 		return ftl_nv_cache_bdev_write_blocks_with_md(dev, desc, ch, buf, md_buf,
521 				offset_blocks, num_blocks,
522 				cb, cb_arg);
523 	} else if (md_buf) {
524 		return spdk_bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks,
525 						      num_blocks, cb, cb_arg);
526 	} else {
527 		return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
528 	}
529 }
530 
531 static void
532 read_write_blocks(void *_md)
533 {
534 	struct ftl_md *md = _md;
535 	const struct ftl_layout_region *region = md->region;
536 	uint64_t blocks;
537 	int rc = 0;
538 
539 	blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));
540 
541 	switch (md->io.op) {
542 	case FTL_MD_OP_RESTORE:
543 		rc = read_blocks(md->dev, region->bdev_desc, region->ioch,
544 				 md->io.data, md->io.md,
545 				 md->io.address, blocks,
546 				 read_write_blocks_cb, md);
547 		break;
548 	case FTL_MD_OP_PERSIST:
549 	case FTL_MD_OP_CLEAR:
550 		rc = write_blocks(md->dev, region->bdev_desc, region->ioch,
551 				  md->io.data, md->io.md,
552 				  md->io.address, blocks,
553 				  read_write_blocks_cb, md);
554 		break;
555 	default:
556 		ftl_abort();
557 	}
558 
559 	if (spdk_unlikely(rc)) {
560 		if (rc == -ENOMEM) {
561 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(region->bdev_desc);
562 			md->io.bdev_io_wait.bdev = bdev;
563 			md->io.bdev_io_wait.cb_fn = read_write_blocks;
564 			md->io.bdev_io_wait.cb_arg = md;
565 			spdk_bdev_queue_io_wait(bdev, region->ioch, &md->io.bdev_io_wait);
566 		} else {
567 			ftl_abort();
568 		}
569 	}
570 }
571 
572 static void
573 io_submit(struct ftl_md *md)
574 {
575 	if (!md->io.remaining || md->io.status) {
576 		io_done(md);
577 		return;
578 	}
579 
580 	if (md->io.op == FTL_MD_OP_PERSIST) {
581 		uint64_t blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));
582 
583 		memcpy(md->io.data, md->data + md->io.data_offset, FTL_BLOCK_SIZE * blocks);
584 
585 		if (md->vss_data) {
586 			uint64_t vss_offset = md->io.data_offset / FTL_BLOCK_SIZE;
587 			vss_offset *= FTL_MD_VSS_SZ;
588 			assert(md->io.md);
589 			memcpy(md->io.md, md->vss_data + vss_offset, FTL_MD_VSS_SZ * blocks);
590 			audit_md_vss_version(md, blocks);
591 		}
592 	}
593 #if defined(DEBUG)
594 	if (md->io.md && md->io.op == FTL_MD_OP_CLEAR) {
595 		uint64_t blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));
596 		audit_md_vss_version(md, blocks);
597 	}
598 #endif
599 
600 	read_write_blocks(md);
601 }
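
/*
 * Persist/restore/clear operations run as a simple state machine: io_prepare() seeds
 * io.address, io.remaining and io.data_offset from the layout region, io_submit() copies
 * (for persist) the next chunk into the DMA buffers and issues the bdev IO, and
 * read_write_blocks_cb() advances the cursors and re-enters io_submit() until io.remaining
 * reaches zero or an error is recorded, at which point io_done() runs the user callback.
 */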
602 
603 static int
604 io_can_start(struct ftl_md *md)
605 {
606 	assert(NULL == md->io.data);
607 	if (NULL != md->io.data) {
608 		/* Ongoing IO on metadata */
609 		return -EINVAL;
610 	}
611 
612 	if (!md->region) {
613 		/* No device region to process data */
614 		return -EINVAL;
615 	}
616 
617 	if (md->region->current.blocks > md->data_blocks) {
618 		/* The region is larger than the metadata buffer */
619 		FTL_ERRLOG(md->dev, "Block count mismatch between metadata object and "
620 			   "device region\n");
621 		return -EINVAL;
622 	}
623 
624 	return 0;
625 }
626 
627 static int
628 io_prepare(struct ftl_md *md, enum ftl_md_ops op)
629 {
630 	const struct ftl_layout_region *region = md->region;
631 	uint64_t data_size, meta_size = 0;
632 
633 	/* Allocate the buffer for IO */
634 	data_size = xfer_size(md);
635 	md->io.data = spdk_zmalloc(data_size, FTL_BLOCK_SIZE, NULL,
636 				   SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
637 	if (!md->io.data) {
638 		return -ENOMEM;
639 	}
640 
641 	if (md->vss_data || md->region->vss_blksz) {
642 		meta_size = ftl_md_xfer_blocks(md->dev) * FTL_MD_VSS_SZ;
643 		md->io.md = spdk_zmalloc(meta_size, FTL_BLOCK_SIZE, NULL,
644 					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
645 		if (!md->io.md) {
646 			spdk_dma_free(md->io.data);
647 			md->io.data = NULL;
648 			return -ENOMEM;
649 		}
650 	}
651 
652 	md->io.address = region->current.offset;
653 	md->io.remaining = region->current.blocks;
654 	md->io.data_offset = 0;
655 	md->io.status = 0;
656 	md->io.op = op;
657 
658 	return 0;
659 }
660 
661 static int
662 io_init(struct ftl_md *md, enum ftl_md_ops op)
663 {
664 	if (io_can_start(md)) {
665 		return -EINVAL;
666 	}
667 
668 	if (io_prepare(md, op)) {
669 		return -ENOMEM;
670 	}
671 
672 	return 0;
673 }
674 
675 static uint64_t
676 persist_entry_lba(struct ftl_md *md, uint64_t start_entry)
677 {
678 	return md->region->current.offset + start_entry * md->region->entry_size;
679 }
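
/*
 * Entries are laid out back to back in units of entry_size blocks, e.g. (hypothetically)
 * with a region starting at offset 1000 and an entry_size of 1 block, entry 5 maps to
 * LBA 1005 and occupies a single block.
 */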
680 
681 static void
682 persist_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
683 {
684 	struct ftl_md_io_entry_ctx *ctx = cb_arg;
685 	struct ftl_md *md = ctx->md;
686 
687 	ftl_stats_bdev_io_completed(md->dev, get_bdev_io_ftl_stats_type(md->dev, bdev_io), bdev_io);
688 
689 	spdk_bdev_free_io(bdev_io);
690 
691 	assert(ctx->remaining > 0);
692 	ctx->remaining--;
693 
694 	if (!success) {
695 		ctx->status = -EIO;
696 	}
697 
698 	if (!ctx->remaining) {
699 		ctx->cb(ctx->status, ctx->cb_arg);
700 	}
701 }
702 
703 static int
704 ftl_md_persist_entry_write_blocks(struct ftl_md_io_entry_ctx *ctx, struct ftl_md *md,
705 				  spdk_bdev_io_wait_cb retry_fn)
706 {
707 	int rc;
708 
709 	rc = write_blocks(md->dev, md->region->bdev_desc, md->region->ioch,
710 			  ctx->buffer, ctx->vss_buffer,
711 			  persist_entry_lba(md, ctx->start_entry), md->region->entry_size,
712 			  persist_entry_cb, ctx);
713 	if (spdk_unlikely(rc)) {
714 		if (rc == -ENOMEM) {
715 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(md->region->bdev_desc);
716 			ctx->bdev_io_wait.bdev = bdev;
717 			ctx->bdev_io_wait.cb_fn = retry_fn;
718 			ctx->bdev_io_wait.cb_arg = ctx;
719 			spdk_bdev_queue_io_wait(bdev, md->region->ioch, &ctx->bdev_io_wait);
720 		} else {
721 			ftl_abort();
722 		}
723 	}
724 
725 	return rc;
726 }
727 
728 static void
729 ftl_md_persist_entry_mirror(void *_ctx)
730 {
731 	struct ftl_md_io_entry_ctx *ctx = _ctx;
732 
733 	ftl_md_persist_entry_write_blocks(ctx, ctx->md->mirror, ftl_md_persist_entry_mirror);
734 }
735 
736 static void
737 ftl_md_persist_entry_primary(void *_ctx)
738 {
739 	struct ftl_md_io_entry_ctx *ctx = _ctx;
740 	struct ftl_md *md = ctx->md;
741 	int rc;
742 
743 	rc = ftl_md_persist_entry_write_blocks(ctx, md, ftl_md_persist_entry_primary);
744 
745 	if (!rc && has_mirror(md)) {
746 		assert(md->region->entry_size == md->mirror->region->entry_size);
747 
748 		/* The MD object has mirror so execute persist on it too */
749 		ftl_md_persist_entry_mirror(ctx);
750 		ctx->remaining++;
751 	}
752 }
753 
754 static void
755 _ftl_md_persist_entry(struct ftl_md_io_entry_ctx *ctx)
756 {
757 	ctx->status = 0;
758 	ctx->remaining = 1;
759 
760 	/* First execute an IO to the primary region */
761 	ftl_md_persist_entry_primary(ctx);
762 }
763 
764 void
765 ftl_md_persist_entry(struct ftl_md *md, uint64_t start_entry, void *buffer, void *vss_buffer,
766 		     ftl_md_io_entry_cb cb, void *cb_arg,
767 		     struct ftl_md_io_entry_ctx *ctx)
768 {
769 	if (spdk_unlikely(0 == md->region->entry_size)) {
770 		/* This MD has not been configured to support persist entry call */
771 		ftl_abort();
772 	}
773 
774 	/* Initialize persist entry context */
775 	ctx->cb = cb;
776 	ctx->cb_arg = cb_arg;
777 	ctx->md = md;
778 	ctx->start_entry = start_entry;
779 	ctx->buffer = buffer;
780 	ctx->vss_buffer = vss_buffer ? : md->entry_vss_dma_buf;
781 
782 	_ftl_md_persist_entry(ctx);
783 }
784 
785 static void
786 read_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
787 {
788 	struct ftl_md_io_entry_ctx *ctx = cb_arg;
789 	struct ftl_md *md = ctx->md;
790 
791 	ftl_stats_bdev_io_completed(md->dev, get_bdev_io_ftl_stats_type(md->dev, bdev_io), bdev_io);
792 
793 	spdk_bdev_free_io(bdev_io);
794 
795 	if (!success) {
796 		if (has_mirror(md)) {
797 			if (setup_mirror(md)) {
798 				/* An error occurred while setting up the mirror */
799 				ctx->status = -EIO;
800 				goto finish_io;
801 			}
802 
803 			/* Read the entry from the mirror instead */
804 			ftl_md_read_entry(md->mirror, ctx->start_entry, ctx->buffer, ctx->vss_buffer,
805 					  ctx->cb, ctx->cb_arg,
806 					  ctx);
807 			return;
808 		} else {
809 			ctx->status = -EIO;
810 			goto finish_io;
811 		}
812 	}
813 
814 finish_io:
815 	ctx->cb(ctx->status, ctx->cb_arg);
816 }
817 
818 static void
819 ftl_md_read_entry_read_blocks(struct ftl_md_io_entry_ctx *ctx, struct ftl_md *md,
820 			      spdk_bdev_io_wait_cb retry_fn)
821 {
822 	int rc;
823 
824 	rc = read_blocks(md->dev, md->region->bdev_desc, md->region->ioch,
825 			 ctx->buffer, ctx->vss_buffer,
826 			 persist_entry_lba(md, ctx->start_entry), md->region->entry_size,
827 			 read_entry_cb, ctx);
828 
829 	if (spdk_unlikely(rc)) {
830 		if (rc == -ENOMEM) {
831 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(md->region->bdev_desc);
832 			ctx->bdev_io_wait.bdev = bdev;
833 			ctx->bdev_io_wait.cb_fn = retry_fn;
834 			ctx->bdev_io_wait.cb_arg = ctx;
835 			spdk_bdev_queue_io_wait(bdev, md->region->ioch, &ctx->bdev_io_wait);
836 		} else {
837 			ftl_abort();
838 		}
839 	}
840 }
841 
842 static void
843 _ftl_md_read_entry(void *_ctx)
844 {
845 	struct ftl_md_io_entry_ctx *ctx = _ctx;
846 
847 	ftl_md_read_entry_read_blocks(ctx, ctx->md, _ftl_md_read_entry);
848 }
849 
850 void
851 ftl_md_read_entry(struct ftl_md *md, uint64_t start_entry, void *buffer, void *vss_buffer,
852 		  ftl_md_io_entry_cb cb, void *cb_arg,
853 		  struct ftl_md_io_entry_ctx *ctx)
854 {
855 	if (spdk_unlikely(0 == md->region->entry_size)) {
856 		/* This MD has not been configured to support read entry call */
857 		ftl_abort();
858 	}
859 
860 	ctx->cb = cb;
861 	ctx->cb_arg = cb_arg;
862 	ctx->md = md;
863 	ctx->start_entry = start_entry;
864 	ctx->buffer = buffer;
865 	ctx->vss_buffer = vss_buffer;
866 
867 	_ftl_md_read_entry(ctx);
868 }
869 
870 void
871 ftl_md_persist_entry_retry(struct ftl_md_io_entry_ctx *ctx)
872 {
873 	_ftl_md_persist_entry(ctx);
874 }
875 
876 static void
877 persist_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
878 {
879 	struct ftl_md *primary = md->owner.private;
880 
881 	if (status) {
882 		/* We got an error, stop persist procedure immediately */
883 		primary->io.status = status;
884 		io_done(primary);
885 	} else {
886 		/* Now continue the persist procedure on the primary MD object */
887 		if (0 == io_init(primary, FTL_MD_OP_PERSIST)) {
888 			io_submit(primary);
889 		} else {
890 			spdk_thread_send_msg(spdk_get_thread(), exception, primary);
891 		}
892 	}
893 }
894 
895 void
896 ftl_md_persist(struct ftl_md *md)
897 {
898 	if (has_mirror(md)) {
899 		if (setup_mirror(md)) {
900 			/* An error occurred while setting up the mirror */
901 			spdk_thread_send_msg(spdk_get_thread(), exception, md);
902 			return;
903 		}
904 
905 		/* Set callback and context in mirror */
906 		md->mirror->cb = persist_mirror_cb;
907 		md->mirror->owner.private = md;
908 
909 		/* First persist the mirror */
910 		ftl_md_persist(md->mirror);
911 		return;
912 	}
913 
914 	if (0 == io_init(md, FTL_MD_OP_PERSIST)) {
915 		io_submit(md);
916 	} else {
917 		spdk_thread_send_msg(spdk_get_thread(), exception, md);
918 	}
919 }
920 
921 static void
922 restore_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
923 {
924 	struct ftl_md *primary = md->owner.private;
925 
926 	if (status) {
927 		/* Cannot restore the object from the mirror either, mark error and fail */
928 		primary->io.status = -EIO;
929 		io_done(primary);
930 	} else {
931 		/*
932 		 * Restoring from the mirror succeeded. Synchronize the primary with the mirror's
933 		 * content. Because the MD content was read from the mirror, it can be disabled
934 		 * for this step; only the primary requires persisting.
935 		 */
936 		primary->io.status = 0;
937 		primary->mirror_enabled = false;
938 		io_cleanup(primary);
939 		ftl_md_persist(primary);
940 		primary->mirror_enabled = true;
941 	}
942 }
943 
944 static void
945 restore_sync_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
946 {
947 	struct ftl_md *primary = md->owner.private;
948 
949 	if (status) {
950 		/* Cannot sync the object from the primary to the mirror, mark error and fail */
951 		primary->io.status = -EIO;
952 		io_done(primary);
953 	} else {
954 		primary->cb(dev, primary, primary->io.status);
955 		io_cleanup(primary);
956 	}
957 }
958 
959 static int
960 restore_done(struct ftl_md *md)
961 {
962 	if (-EAGAIN == md->io.status) {
963 		/* Failed to read the MD from the primary region, try the mirror.
964 		 * For now the mirror is read in its entirety; (TODO) in the
965 		 * future we could restore from the primary and mirror regions
966 		 * with finer granularity.
967 		 */
968 
969 		if (has_mirror(md)) {
970 			if (setup_mirror(md)) {
971 				/* An error occurred while setting up the mirror */
972 				return -EIO;
973 			}
974 
975 			/* Set callback and context in mirror */
976 			md->mirror->cb = restore_mirror_cb;
977 			md->mirror->owner.private = md;
978 
979 			/* Restore the MD object from the mirror */
980 			ftl_md_restore(md->mirror);
981 			return -EAGAIN;
982 		} else {
983 			return -EIO;
984 		}
985 	} else if (0 == md->io.status && false == md->dev->sb->clean) {
986 		if (has_mirror(md)) {
987 			/* There was a dirty shutdown, synchronize the mirror with the primary */
988 
989 			/* Set callback and context in the mirror */
990 			md->mirror->cb = restore_sync_cb;
991 			md->mirror->owner.private = md;
992 
993 			/* Persist the mirror to synchronize it with the primary */
994 			ftl_md_persist(md->mirror);
995 			return -EAGAIN;
996 		}
997 	}
998 
999 	return md->io.status;
1000 }
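
/*
 * Restore outcome handling: -EAGAIN from the primary triggers a full restore from the mirror
 * (restore_mirror_cb then re-persists the primary), a successful restore after a dirty
 * shutdown re-persists the mirror to bring it back in sync (restore_sync_cb), and any other
 * status is reported to the user callback as-is. An -EAGAIN status keeps io_done() from
 * completing the operation while the follow-up mirror IO is still in flight.
 */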
1001 
1002 static void
1003 io_done(struct ftl_md *md)
1004 {
1005 	int status;
1006 
1007 	if (md->io.op == FTL_MD_OP_RESTORE) {
1008 		status = restore_done(md);
1009 	} else {
1010 		status = md->io.status;
1011 	}
1012 
1013 	if (status != -EAGAIN) {
1014 		md->cb(md->dev, md, status);
1015 		io_cleanup(md);
1016 	}
1017 }
1018 
1019 void
1020 ftl_md_restore(struct ftl_md *md)
1021 {
1022 	if (0 == io_init(md, FTL_MD_OP_RESTORE)) {
1023 		io_submit(md);
1024 	} else {
1025 		spdk_thread_send_msg(spdk_get_thread(), exception, md);
1026 	}
1027 }
1028 
1029 static int
1030 pattern_prepare(struct ftl_md *md,
1031 		int data_pattern, union ftl_md_vss *vss_pattern)
1032 {
1033 	void *data = md->io.data;
1034 	uint64_t data_size = xfer_size(md);
1035 
1036 	memset(data, data_pattern, data_size);
1037 
1038 	if (md->io.md) {
1039 		if (vss_pattern) {
1040 			/* store the VSS pattern... */
1041 			ftl_md_vss_buf_init(md->io.md, ftl_md_xfer_blocks(md->dev), vss_pattern);
1042 		} else {
1043 			/* ...or default init VSS to 0 */
1044 			union ftl_md_vss vss = {0};
1045 
1046 			vss.version.md_version = md->region->current.version;
1047 			ftl_md_vss_buf_init(md->io.md, ftl_md_xfer_blocks(md->dev), &vss);
1048 		}
1049 	}
1050 
1051 	return 0;
1052 }
1053 
1054 static void
1055 clear_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *secondary, int status)
1056 {
1057 	struct ftl_md *primary = secondary->owner.private;
1058 
1059 	if (status) {
1060 		/* We got an error, stop the clear procedure immediately */
1061 		primary->io.status = status;
1062 		io_done(primary);
1063 	} else {
1064 		/* Now continue the clear procedure on the primary MD object */
1065 		if (0 == io_init(primary, FTL_MD_OP_CLEAR) &&
1066 		    0 == pattern_prepare(primary, *(int *)secondary->io.data,
1067 					 secondary->io.md)) {
1068 			io_submit(primary);
1069 		} else {
1070 			spdk_thread_send_msg(spdk_get_thread(), exception, primary);
1071 		}
1072 	}
1073 }
1074 
1075 void
1076 ftl_md_clear(struct ftl_md *md, int data_pattern, union ftl_md_vss *vss_pattern)
1077 {
1078 	if (has_mirror(md)) {
1079 		if (setup_mirror(md)) {
1080 			/* An error occurred while setting up the mirror */
1081 			spdk_thread_send_msg(spdk_get_thread(), exception, md);
1082 			return;
1083 		}
1084 
1085 		/* Set callback and context in mirror */
1086 		md->mirror->cb = clear_mirror_cb;
1087 		md->mirror->owner.private = md;
1088 
1089 		/* First clear the mirror */
1090 		ftl_md_clear(md->mirror, data_pattern, vss_pattern);
1091 		return;
1092 	}
1093 
1094 	if (0 == io_init(md, FTL_MD_OP_CLEAR) && 0 == pattern_prepare(md, data_pattern, vss_pattern)) {
1095 		io_submit(md);
1096 	} else {
1097 		spdk_thread_send_msg(spdk_get_thread(), exception, md);
1098 	}
1099 }
1100 
1101 const struct ftl_layout_region *
1102 ftl_md_get_region(struct ftl_md *md)
1103 {
1104 	return md->region;
1105 }
1106 
1107 int
1108 ftl_md_set_region(struct ftl_md *md,
1109 		  const struct ftl_layout_region *region)
1110 {
1111 	assert(region->current.blocks <= md->data_blocks);
1112 	md->region = region;
1113 
1114 	if (md->vss_data) {
1115 		union ftl_md_vss vss = {0};
1116 		vss.version.md_version = region->current.version;
1117 		ftl_md_vss_buf_init(md->vss_data, md->data_blocks, &vss);
1118 		if (region->entry_size) {
1119 			assert(md->entry_vss_dma_buf);
1120 			ftl_md_vss_buf_init(md->entry_vss_dma_buf, region->entry_size, &vss);
1121 		}
1122 	}
1123 
1124 	if (has_mirror(md)) {
1125 		return setup_mirror(md);
1126 	}
1127 
1128 	return 0;
1129 }
1130 
1131 int
1132 ftl_md_create_region_flags(struct spdk_ftl_dev *dev, int region_type)
1133 {
1134 	int flags = FTL_MD_CREATE_SHM;
1135 
1136 	switch (region_type) {
1137 	case FTL_LAYOUT_REGION_TYPE_SB:
1138 		if (dev->conf.mode & SPDK_FTL_MODE_CREATE) {
1139 			flags |= FTL_MD_CREATE_SHM_NEW;
1140 		}
1141 		break;
1142 
1143 	case FTL_LAYOUT_REGION_TYPE_BAND_MD:
1144 	case FTL_LAYOUT_REGION_TYPE_NVC_MD:
1145 		if (!ftl_fast_startup(dev)) {
1146 			flags |= FTL_MD_CREATE_SHM_NEW;
1147 		}
1148 		break;
1149 	case FTL_LAYOUT_REGION_TYPE_VALID_MAP:
1150 	case FTL_LAYOUT_REGION_TYPE_TRIM_MD:
1151 		if (!ftl_fast_startup(dev) && !ftl_fast_recovery(dev)) {
1152 			flags |= FTL_MD_CREATE_SHM_NEW;
1153 		}
1154 		break;
1155 	default:
1156 		return FTL_MD_CREATE_HEAP;
1157 	}
1158 
1159 	return flags;
1160 }
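
/*
 * Summary of the policy above: the superblock gets a fresh SHM object only when the device is
 * being created, band/NVC metadata are reused from SHM only on fast startup, the valid map and
 * trim metadata are additionally reused during fast recovery, and all remaining region types
 * fall back to plain heap allocations.
 */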
1161 
1162 int
1163 ftl_md_destroy_region_flags(struct spdk_ftl_dev *dev, int region_type)
1164 {
1165 	switch (region_type) {
1166 	case FTL_LAYOUT_REGION_TYPE_SB:
1167 	case FTL_LAYOUT_REGION_TYPE_BAND_MD:
1168 	case FTL_LAYOUT_REGION_TYPE_VALID_MAP:
1169 	case FTL_LAYOUT_REGION_TYPE_NVC_MD:
1170 	case FTL_LAYOUT_REGION_TYPE_TRIM_MD:
1171 		if (dev->conf.fast_shutdown) {
1172 			return FTL_MD_DESTROY_SHM_KEEP;
1173 		}
1174 		break;
1175 
1176 	default:
1177 		break;
1178 	}
1179 	return 0;
1180 }
1181 
1182 int
1183 ftl_md_create_shm_flags(struct spdk_ftl_dev *dev)
1184 {
1185 	int flags = FTL_MD_CREATE_SHM;
1186 
1187 	if (!ftl_fast_startup(dev) && !ftl_fast_recovery(dev)) {
1188 		flags |= FTL_MD_CREATE_SHM_NEW;
1189 	}
1190 	return flags;
1191 }
1192 
1193 int
1194 ftl_md_destroy_shm_flags(struct spdk_ftl_dev *dev)
1195 {
1196 	return (dev->conf.fast_shutdown) ? FTL_MD_DESTROY_SHM_KEEP : 0;
1197 }
1198