xref: /spdk/lib/ftl/utils/ftl_md.c (revision 42fd001310188f0635a3953f3b0ea0b33a840902)
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright 2023 Solidigm All Rights Reserved
 *   Copyright (C) 2022 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/bdev_module.h"

#include "ftl_core.h"
#include "ftl_md.h"
#include "ftl_nv_cache_io.h"

struct ftl_md;
static void io_submit(struct ftl_md *md);
static void io_done(struct ftl_md *md);

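/*
 * Mirror helpers: a metadata object may keep a secondary copy in another layout
 * region. has_mirror() checks whether the region defines a mirror type and
 * mirroring is currently enabled; ftl_md_get_mirror() resolves the mirror's
 * ftl_md object from the device layout.
 */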
static bool
has_mirror(struct ftl_md *md)
{
	if (md->region) {
		if (md->region->mirror_type != FTL_LAYOUT_REGION_TYPE_INVALID) {
			return md->mirror_enabled;
		}
	}

	return false;
}

static struct ftl_md *
ftl_md_get_mirror(struct ftl_md *md)
{
	if (has_mirror(md)) {
		return md->dev->layout.md[md->region->mirror_type];
	}

	return NULL;
}

uint64_t
ftl_md_xfer_blocks(struct spdk_ftl_dev *dev)
{
	return 4ULL * dev->xfer_size;
}

static uint64_t
xfer_size(struct ftl_md *md)
{
	return ftl_md_xfer_blocks(md->dev) * FTL_BLOCK_SIZE;
}

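/*
 * Heap-backed variant: the data and VSS areas are carved out of a single calloc()
 * allocation - the VSS area starts right after data_blocks * FTL_BLOCK_SIZE bytes.
 */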
static void
ftl_md_create_heap(struct ftl_md *md, uint64_t vss_blksz)
{
	md->shm_fd = -1;
	md->vss_data = NULL;
	md->data = calloc(md->data_blocks, FTL_BLOCK_SIZE + vss_blksz);

	if (md->data && vss_blksz) {
		md->vss_data = ((char *)md->data) + md->data_blocks * FTL_BLOCK_SIZE;
	}
}

static void
ftl_md_destroy_heap(struct ftl_md *md)
{
	if (md->data) {
		free(md->data);
		md->data = NULL;
		md->vss_data = NULL;
	}
}

static int
ftl_wrapper_open(const char *name, int of, mode_t m)
{
	return open(name, of, m);
}

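/*
 * Derive the SHM object name (a hugetlbfs path built from the device UUID and the
 * metadata object name) and set up the open/unlink callbacks used later by
 * ftl_md_create_shm() and ftl_md_unlink(). An empty md->name marks a setup failure.
 */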
static void
ftl_md_setup_obj(struct ftl_md *md, int flags,
		 const char *name)
{
	char uuid_str[SPDK_UUID_STRING_LEN];
	const char *fmt;

	if (!(flags & FTL_MD_CREATE_SHM)) {
		assert(false);
		return;
	}

	/* TODO: temporary, define a proper hugetlbfs mountpoint */
	fmt = "/dev/hugepages/ftl_%s_%s";
	md->shm_mmap_flags = MAP_SHARED;
	md->shm_open = ftl_wrapper_open;
	md->shm_unlink = unlink;

	if (name == NULL ||
	    spdk_uuid_fmt_lower(uuid_str, SPDK_UUID_STRING_LEN, &md->dev->conf.uuid) ||
	    snprintf(md->name, sizeof(md->name) / sizeof(md->name[0]),
		     fmt, uuid_str, name) <= 0) {
		md->name[0] = 0;
	}
}

static void
ftl_md_invalidate_shm(struct ftl_md *md)
{
	if (md->dev->sb_shm && md->dev->sb_shm->shm_ready) {
		md->dev->init_retry = true;
		md->dev->sb_shm->shm_ready = false;
	}
}

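/*
 * Create or re-open the SHM-backed buffer: open the object, size it to hold the
 * data blocks plus an optional VSS area (both rounded up to st_blksize units),
 * mmap it, mlock the pages and register the mapping with the SPDK memory map so
 * it can be used for DMA. Any failure invalidates the SHM state, forcing device
 * initialization to be retried without shared memory contents.
 */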
static void
ftl_md_create_shm(struct ftl_md *md, uint64_t vss_blksz, int flags)
{
	struct stat shm_stat;
	size_t vss_blk_offs;
	void *shm_ptr;
	int open_flags = O_RDWR;
	mode_t open_mode = S_IRUSR | S_IWUSR;

	assert(md->shm_open && md->shm_unlink);
	md->data = NULL;
	md->vss_data = NULL;
	md->shm_sz = 0;

	/* Must have an object name */
	if (md->name[0] == 0) {
		assert(false);
		return;
	}

	/* If specified, unlink before creating a new SHM object */
	if (flags & FTL_MD_CREATE_SHM_NEW) {
		if (md->shm_unlink(md->name) < 0 && errno != ENOENT) {
			ftl_md_invalidate_shm(md);
			return;
		}
		open_flags |= O_CREAT | O_TRUNC;
	}

	/* Open existing or create a new SHM object, then query its props */
	md->shm_fd = md->shm_open(md->name, open_flags, open_mode);
	if (md->shm_fd < 0 || fstat(md->shm_fd, &shm_stat) < 0) {
		goto err_shm;
	}

	/* Verify open mode hasn't changed */
	if ((shm_stat.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO)) != open_mode) {
		goto err_shm;
	}

	/* Compute the data size in blksize (i.e. page size) units, rounding up */
	md->shm_sz = spdk_divide_round_up(md->data_blocks * FTL_BLOCK_SIZE, shm_stat.st_blksize);

	/* Add some blks for VSS metadata */
	vss_blk_offs = md->shm_sz;

	if (vss_blksz) {
		md->shm_sz += spdk_divide_round_up(md->data_blocks * vss_blksz,
						   shm_stat.st_blksize);
	}

	/* Total SHM obj size */
	md->shm_sz *= shm_stat.st_blksize;

	/* Set or check the object size - zero-init'd in case of set (FTL_MD_CREATE_SHM_NEW) */
	if ((shm_stat.st_size == 0 && (ftruncate(md->shm_fd, md->shm_sz) < 0 ||
				       (flags & FTL_MD_CREATE_SHM_NEW) == 0))
	    || (shm_stat.st_size > 0 && (size_t)shm_stat.st_size != md->shm_sz)) {
		goto err_shm;
	}

	/* Create a virtual memory mapping for the object */
	shm_ptr = mmap(NULL, md->shm_sz, PROT_READ | PROT_WRITE, md->shm_mmap_flags,
		       md->shm_fd, 0);
	if (shm_ptr == MAP_FAILED) {
		goto err_shm;
	}

	md->data = shm_ptr;
	if (vss_blksz) {
		md->vss_data = ((char *)shm_ptr) + vss_blk_offs * shm_stat.st_blksize;
	}

	/* Lock the pages in memory (i.e. prevent the pages from being paged out) */
	if (mlock(md->data, md->shm_sz) < 0) {
		goto err_map;
	}

	if (spdk_mem_register(md->data, md->shm_sz)) {
		goto err_mlock;
	}
	md->mem_reg = true;

	return;

	/* Cleanup upon fault */
err_mlock:
	munlock(md->data, md->shm_sz);

err_map:
	munmap(md->data, md->shm_sz);
	md->data = NULL;
	md->vss_data = NULL;
	md->shm_sz = 0;

err_shm:
	if (md->shm_fd >= 0) {
		close(md->shm_fd);
		md->shm_unlink(md->name);
		md->shm_fd = -1;
	}
	ftl_md_invalidate_shm(md);
}

static void
ftl_md_destroy_shm(struct ftl_md *md, int flags)
{
	if (!md->data) {
		return;
	}

	assert(md->shm_sz > 0);
	if (md->mem_reg) {
		spdk_mem_unregister(md->data, md->shm_sz);
		md->mem_reg = false;
	}

	/* Unlock the pages in memory */
	munlock(md->data, md->shm_sz);

	/* Remove the virtual memory mapping for the object */
	munmap(md->data, md->shm_sz);

	/* Close SHM object fd */
	close(md->shm_fd);

	md->data = NULL;
	md->vss_data = NULL;

	/* If specified, keep the object in SHM */
	if (flags & FTL_MD_DESTROY_SHM_KEEP) {
		return;
	}

	/* Otherwise destroy/unlink the object */
	assert(md->name[0] != 0 && md->shm_unlink != NULL);
	md->shm_unlink(md->name);
}

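/*
 * Allocate a metadata object and its backing buffer. Depending on the flags the
 * buffer lives on the heap (FTL_MD_CREATE_HEAP), in a shared memory object
 * (FTL_MD_CREATE_SHM, optionally recreated with FTL_MD_CREATE_SHM_NEW) or is not
 * allocated at all (FTL_MD_CREATE_NO_MEM). When a layout region is given, a DMA
 * buffer for per-entry VSS is allocated and the region is attached via
 * ftl_md_set_region().
 */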
struct ftl_md *ftl_md_create(struct spdk_ftl_dev *dev, uint64_t blocks,
			     uint64_t vss_blksz, const char *name, int flags,
			     const struct ftl_layout_region *region)
{
	struct ftl_md *md;

	md = calloc(1, sizeof(*md));
	if (!md) {
		return NULL;
	}
	md->dev = dev;
	md->data_blocks = blocks;
	md->mirror_enabled = true;

	if (flags != FTL_MD_CREATE_NO_MEM) {
		if (flags & FTL_MD_CREATE_SHM) {
			ftl_md_setup_obj(md, flags, name);
			ftl_md_create_shm(md, vss_blksz, flags);
		} else {
			assert((flags & FTL_MD_CREATE_HEAP) == FTL_MD_CREATE_HEAP);
			ftl_md_create_heap(md, vss_blksz);
		}

		if (!md->data) {
			free(md);
			return NULL;
		}
	}

	if (region) {
		size_t entry_vss_buf_size = vss_blksz * region->entry_size;

		if (entry_vss_buf_size) {
			md->entry_vss_dma_buf = spdk_malloc(entry_vss_buf_size, FTL_BLOCK_SIZE,
							    NULL, SPDK_ENV_LCORE_ID_ANY,
							    SPDK_MALLOC_DMA);
			if (!md->entry_vss_dma_buf) {
				goto err;
			}
		}

		ftl_md_set_region(md, region);
	}

	return md;
err:
	ftl_md_destroy(md, ftl_md_destroy_region_flags(dev, region->type));
	return NULL;
}

int
ftl_md_unlink(struct spdk_ftl_dev *dev, const char *name, int flags)
{
	struct ftl_md md = { 0 };

	if (0 == (flags & FTL_MD_CREATE_SHM)) {
		/* Unlink can be called for shared memory only */
		return -EINVAL;
	}

	md.dev = dev;
	ftl_md_setup_obj(&md, flags, name);

	return md.shm_unlink(md.name);
}

void
ftl_md_destroy(struct ftl_md *md, int flags)
{
	if (!md) {
		return;
	}

	if (!md->is_mirror) {
		ftl_md_free_buf(md, flags);
		spdk_free(md->entry_vss_dma_buf);
	}
	free(md);
}

void
ftl_md_free_buf(struct ftl_md *md, int flags)
{
	if (!md) {
		return;
	}

	if (md->shm_fd < 0) {
		assert(flags == 0);
		ftl_md_destroy_heap(md);
	} else {
		ftl_md_destroy_shm(md, flags);
	}
}

void *
ftl_md_get_buffer(struct ftl_md *md)
{
	return md->data;
}

uint64_t
ftl_md_get_buffer_size(struct ftl_md *md)
{
	return md->data_blocks * FTL_BLOCK_SIZE;
}

static void
ftl_md_vss_buf_init(union ftl_md_vss *buf, uint32_t count,
		    const union ftl_md_vss *vss_pattern)
{
	while (count) {
		count--;
		buf[count] = *vss_pattern;
	}
}

union ftl_md_vss *ftl_md_vss_buf_alloc(struct ftl_layout_region *region, uint32_t count)
{
	union ftl_md_vss *buf = spdk_zmalloc(count * FTL_MD_VSS_SZ, FTL_BLOCK_SIZE, NULL,
					     SPDK_ENV_LCORE_ID_ANY,
					     SPDK_MALLOC_DMA);

	if (!buf) {
		return NULL;
	}

	union ftl_md_vss vss_buf = {0};
	vss_buf.version.md_version = region->current.version;
	ftl_md_vss_buf_init(buf, count, &vss_buf);
	return buf;
}

union ftl_md_vss *ftl_md_get_vss_buffer(struct ftl_md *md)
{
	return md->vss_data;
}

static void
io_cleanup(struct ftl_md *md)
{
	spdk_dma_free(md->io.data);
	md->io.data = NULL;

	spdk_dma_free(md->io.md);
	md->io.md = NULL;
}

static void
exception(void *arg)
{
	struct ftl_md *md = arg;

	md->cb(md->dev, md, -EINVAL);
	io_cleanup(md);
}

static inline enum ftl_stats_type
get_bdev_io_ftl_stats_type(struct spdk_ftl_dev *dev, struct spdk_bdev_io *bdev_io) {
	struct spdk_bdev *nvc = spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc);

	if (bdev_io->bdev == nvc) {
		return FTL_STATS_TYPE_MD_NV_CACHE;
	} else {
		return FTL_STATS_TYPE_MD_BASE;
	}
}

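/* Debug-only audit that every VSS block carries the MD version expected for the region */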
static void
audit_md_vss_version(struct ftl_md *md, uint64_t blocks)
{
#if defined(DEBUG)
	union ftl_md_vss *vss = md->io.md;
	/* Need to load the superblock regardless of its version */
	if (md->region->type == FTL_LAYOUT_REGION_TYPE_SB) {
		return;
	}
	while (blocks) {
		blocks--;
		assert(vss[blocks].version.md_version == md->region->current.version);
	}
#endif
}

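/*
 * Completion of one chunk of a region-wide IO. On a successful restore the chunk
 * is copied from the DMA buffers into the md data/VSS buffers; the IO cursor
 * (address, remaining, data_offset) is then advanced and the next chunk is
 * submitted from io_submit(). A failed restore on a mirrored region is marked
 * -EAGAIN so that restore_done() can fall back to the mirror.
 */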
static void
read_write_blocks_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct ftl_md *md = arg;

	ftl_stats_bdev_io_completed(md->dev, get_bdev_io_ftl_stats_type(md->dev, bdev_io), bdev_io);

	if (spdk_unlikely(!success)) {
		if (md->io.op == FTL_MD_OP_RESTORE && has_mirror(md)) {
			md->io.status = -EAGAIN;
		} else {
			md->io.status = -EIO;
		}
	} else {
		uint64_t blocks = bdev_io->u.bdev.num_blocks;
		uint64_t size = blocks * FTL_BLOCK_SIZE;

		if (md->io.op == FTL_MD_OP_RESTORE) {
			memcpy(md->data + md->io.data_offset, md->io.data, size);

			if (md->vss_data) {
				uint64_t vss_offset = md->io.data_offset / FTL_BLOCK_SIZE;
				vss_offset *= FTL_MD_VSS_SZ;
				audit_md_vss_version(md, blocks);
				memcpy(md->vss_data + vss_offset, md->io.md, blocks * FTL_MD_VSS_SZ);
			}
		}

		md->io.address += blocks;
		md->io.remaining -= blocks;
		md->io.data_offset += size;
	}

	spdk_bdev_free_io(bdev_io);

	io_submit(md);
}

static inline int
read_blocks(struct spdk_ftl_dev *dev, struct spdk_bdev_desc *desc,
	    struct spdk_io_channel *ch,
	    void *buf, void *md_buf,
	    uint64_t offset_blocks, uint64_t num_blocks,
	    spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	if (desc == dev->nv_cache.bdev_desc) {
		return ftl_nv_cache_bdev_read_blocks_with_md(dev, desc, ch, buf, md_buf,
				offset_blocks, num_blocks,
				cb, cb_arg);
	} else if (md_buf) {
		return spdk_bdev_read_blocks_with_md(desc, ch, buf, md_buf,
						     offset_blocks, num_blocks,
						     cb, cb_arg);
	} else {
		return spdk_bdev_read_blocks(desc, ch, buf,
					     offset_blocks, num_blocks,
					     cb, cb_arg);
	}
}

static inline int
write_blocks(struct spdk_ftl_dev *dev, struct spdk_bdev_desc *desc,
	     struct spdk_io_channel *ch,
	     void *buf, void *md_buf,
	     uint64_t offset_blocks, uint64_t num_blocks,
	     spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	if (desc == dev->nv_cache.bdev_desc) {
		return ftl_nv_cache_bdev_write_blocks_with_md(dev, desc, ch, buf, md_buf,
				offset_blocks, num_blocks,
				cb, cb_arg);
	} else if (md_buf) {
		return spdk_bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks,
						      num_blocks, cb, cb_arg);
	} else {
		return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
	}
}

static void
read_write_blocks(void *_md)
{
	struct ftl_md *md = _md;
	const struct ftl_layout_region *region = md->region;
	uint64_t blocks;
	int rc = 0;

	blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));

	switch (md->io.op) {
	case FTL_MD_OP_RESTORE:
		rc = read_blocks(md->dev, region->bdev_desc, region->ioch,
				 md->io.data, md->io.md,
				 md->io.address, blocks,
				 read_write_blocks_cb, md);
		break;
	case FTL_MD_OP_PERSIST:
	case FTL_MD_OP_CLEAR:
		rc = write_blocks(md->dev, region->bdev_desc, region->ioch,
				  md->io.data, md->io.md,
				  md->io.address, blocks,
				  read_write_blocks_cb, md);
		break;
	default:
		ftl_abort();
	}

	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(region->bdev_desc);
			md->io.bdev_io_wait.bdev = bdev;
			md->io.bdev_io_wait.cb_fn = read_write_blocks;
			md->io.bdev_io_wait.cb_arg = md;
			spdk_bdev_queue_io_wait(bdev, region->ioch, &md->io.bdev_io_wait);
		} else {
			ftl_abort();
		}
	}
}

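/*
 * Submit the next chunk of the current region-wide operation, or finish via
 * io_done() once nothing remains or an error has been recorded. For persists the
 * chunk is staged from md->data (and md->vss_data) into the DMA buffers first.
 */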
static void
io_submit(struct ftl_md *md)
{
	if (!md->io.remaining || md->io.status) {
		io_done(md);
		return;
	}

	if (md->io.op == FTL_MD_OP_PERSIST) {
		uint64_t blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));

		memcpy(md->io.data, md->data + md->io.data_offset, FTL_BLOCK_SIZE * blocks);

		if (md->vss_data) {
			uint64_t vss_offset = md->io.data_offset / FTL_BLOCK_SIZE;
			vss_offset *= FTL_MD_VSS_SZ;
			assert(md->io.md);
			memcpy(md->io.md, md->vss_data + vss_offset, FTL_MD_VSS_SZ * blocks);
			audit_md_vss_version(md, blocks);
		}
	}
#if defined(DEBUG)
	if (md->io.md && md->io.op == FTL_MD_OP_CLEAR) {
		uint64_t blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));
		audit_md_vss_version(md, blocks);
	}
#endif

	read_write_blocks(md);
}

static int
io_can_start(struct ftl_md *md)
{
	assert(NULL == md->io.data);
	if (NULL != md->io.data) {
		/* There is already an ongoing IO on this metadata object */
		return -EINVAL;
	}

	if (!md->region) {
		/* No device region to process data */
		return -EINVAL;
	}

	if (md->region->current.blocks > md->data_blocks) {
		/* The device region is larger than the metadata object's buffer */
		FTL_ERRLOG(md->dev, "Blocks number mismatch between metadata object and "
			   "device region\n");
		return -EINVAL;
	}

	return 0;
}

static int
io_prepare(struct ftl_md *md, enum ftl_md_ops op)
{
	const struct ftl_layout_region *region = md->region;
	uint64_t data_size, meta_size = 0;

	/* Allocates buffer for IO */
	data_size = xfer_size(md);
	md->io.data = spdk_zmalloc(data_size, FTL_BLOCK_SIZE, NULL,
				   SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!md->io.data) {
		return -ENOMEM;
	}

	if (md->vss_data || md->region->vss_blksz) {
		meta_size = ftl_md_xfer_blocks(md->dev) * FTL_MD_VSS_SZ;
		md->io.md = spdk_zmalloc(meta_size, FTL_BLOCK_SIZE, NULL,
					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (!md->io.md) {
			spdk_dma_free(md->io.data);
			md->io.data = NULL;
			return -ENOMEM;
		}
	}

	md->io.address = region->current.offset;
	md->io.remaining = region->current.blocks;
	md->io.data_offset = 0;
	md->io.status = 0;
	md->io.op = op;

	return 0;
}

static int
io_init(struct ftl_md *md, enum ftl_md_ops op)
{
	if (io_can_start(md)) {
		return -EINVAL;
	}

	if (io_prepare(md, op)) {
		return -ENOMEM;
	}

	return 0;
}

static uint64_t
persist_entry_lba(struct ftl_md *md, uint64_t start_entry)
{
	return md->region->current.offset + start_entry * md->region->entry_size;
}

static void
persist_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct ftl_md_io_entry_ctx *ctx = cb_arg;
	struct ftl_md *md = ctx->md;

	ftl_stats_bdev_io_completed(md->dev, get_bdev_io_ftl_stats_type(md->dev, bdev_io), bdev_io);

	spdk_bdev_free_io(bdev_io);

	assert(ctx->remaining > 0);
	ctx->remaining--;

	if (!success) {
		ctx->status = -EIO;
	}

	if (!ctx->remaining) {
		ctx->cb(ctx->status, ctx->cb_arg);
	}
}

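/*
 * Issue the write for a single entry persist. On -ENOMEM the request is parked on
 * the bdev's io_wait queue and retried via retry_fn once a bdev_io becomes
 * available; any other error is fatal.
 */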
static int
ftl_md_persist_entry_write_blocks(struct ftl_md_io_entry_ctx *ctx, struct ftl_md *md,
				  spdk_bdev_io_wait_cb retry_fn)
{
	int rc;

	rc = write_blocks(md->dev, md->region->bdev_desc, md->region->ioch,
			  ctx->buffer, ctx->vss_buffer,
			  persist_entry_lba(md, ctx->start_entry), md->region->entry_size,
			  persist_entry_cb, ctx);
	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(md->region->bdev_desc);
			ctx->bdev_io_wait.bdev = bdev;
			ctx->bdev_io_wait.cb_fn = retry_fn;
			ctx->bdev_io_wait.cb_arg = ctx;
			spdk_bdev_queue_io_wait(bdev, md->region->ioch, &ctx->bdev_io_wait);
		} else {
			ftl_abort();
		}
	}

	return rc;
}

static void
ftl_md_persist_entry_mirror(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;
	struct ftl_md *md_mirror = ftl_md_get_mirror(ctx->md);

	ftl_md_persist_entry_write_blocks(ctx, md_mirror, ftl_md_persist_entry_mirror);
}

static void
ftl_md_persist_entry_primary(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;
	struct ftl_md *md = ctx->md;
	int rc;

	rc = ftl_md_persist_entry_write_blocks(ctx, md, ftl_md_persist_entry_primary);

	if (!rc && has_mirror(md)) {
		assert(md->region->entry_size == (ftl_md_get_mirror(md))->region->entry_size);

		/* The MD object has mirror so execute persist on it too */
		ftl_md_persist_entry_mirror(ctx);
		ctx->remaining++;
	}
}

static void
_ftl_md_persist_entry(struct ftl_md_io_entry_ctx *ctx)
{
	ctx->status = 0;
	ctx->remaining = 1;

	/* First execute an IO to the primary region */
	ftl_md_persist_entry_primary(ctx);
}

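/*
 * Example usage (sketch - entry_idx, entry_buf, entry_written_cb and cb_ctx are
 * caller-provided, illustrative names):
 *
 *	struct ftl_md_io_entry_ctx ctx;
 *
 *	ftl_md_persist_entry(md, entry_idx, entry_buf, NULL, entry_written_cb,
 *			     cb_ctx, &ctx);
 *
 * Passing NULL as vss_buffer falls back to the region's default entry VSS buffer
 * (md->entry_vss_dma_buf). The ctx must stay valid until the callback fires;
 * ftl_md_persist_entry_retry() re-issues the same ctx.
 */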
void
ftl_md_persist_entry(struct ftl_md *md, uint64_t start_entry, void *buffer, void *vss_buffer,
		     ftl_md_io_entry_cb cb, void *cb_arg,
		     struct ftl_md_io_entry_ctx *ctx)
{
	if (spdk_unlikely(0 == md->region->entry_size)) {
		/* This MD has not been configured to support persist entry call */
		ftl_abort();
	}

	/* Initialize persist entry context */
	ctx->cb = cb;
	ctx->cb_arg = cb_arg;
	ctx->md = md;
	ctx->start_entry = start_entry;
	ctx->buffer = buffer;
	ctx->vss_buffer = vss_buffer ? : md->entry_vss_dma_buf;

	_ftl_md_persist_entry(ctx);
}

static void
read_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct ftl_md_io_entry_ctx *ctx = cb_arg;
	struct ftl_md *md = ctx->md;

	ftl_stats_bdev_io_completed(md->dev, get_bdev_io_ftl_stats_type(md->dev, bdev_io), bdev_io);

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		if (has_mirror(md)) {
			md->mirror_enabled = true;

			/* Fall back to reading the entry from the mirror */
			ftl_md_read_entry(ftl_md_get_mirror(md), ctx->start_entry, ctx->buffer,
					  ctx->vss_buffer,
					  ctx->cb, ctx->cb_arg,
					  ctx);
			return;
		} else {
			ctx->status = -EIO;
			goto finish_io;
		}
	}

finish_io:
	ctx->cb(ctx->status, ctx->cb_arg);
}

static void
ftl_md_read_entry_read_blocks(struct ftl_md_io_entry_ctx *ctx, struct ftl_md *md,
			      spdk_bdev_io_wait_cb retry_fn)
{
	int rc;

	rc = read_blocks(md->dev, md->region->bdev_desc, md->region->ioch,
			 ctx->buffer, ctx->vss_buffer,
			 persist_entry_lba(md, ctx->start_entry), md->region->entry_size,
			 read_entry_cb, ctx);

	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(md->region->bdev_desc);
			ctx->bdev_io_wait.bdev = bdev;
			ctx->bdev_io_wait.cb_fn = retry_fn;
			ctx->bdev_io_wait.cb_arg = ctx;
			spdk_bdev_queue_io_wait(bdev, md->region->ioch, &ctx->bdev_io_wait);
		} else {
			ftl_abort();
		}
	}
}

static void
_ftl_md_read_entry(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;

	ftl_md_read_entry_read_blocks(ctx, ctx->md, _ftl_md_read_entry);
}

void
ftl_md_read_entry(struct ftl_md *md, uint64_t start_entry, void *buffer, void *vss_buffer,
		  ftl_md_io_entry_cb cb, void *cb_arg,
		  struct ftl_md_io_entry_ctx *ctx)
{
	if (spdk_unlikely(0 == md->region->entry_size)) {
		/* This MD has not been configured to support read entry call */
		ftl_abort();
	}

	ctx->cb = cb;
	ctx->cb_arg = cb_arg;
	ctx->md = md;
	ctx->start_entry = start_entry;
	ctx->buffer = buffer;
	ctx->vss_buffer = vss_buffer;
	/* Report 0 on a successful read - the caller-provided ctx may be uninitialized */
	ctx->status = 0;

	_ftl_md_read_entry(ctx);
}

void
ftl_md_persist_entry_retry(struct ftl_md_io_entry_ctx *ctx)
{
	_ftl_md_persist_entry(ctx);
}

static void
persist_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* We got an error, stop persist procedure immediately */
		primary->io.status = status;
		io_done(primary);
	} else {
		/* Now continue the persist procedure on the primary MD object */
		if (0 == io_init(primary, FTL_MD_OP_PERSIST)) {
			io_submit(primary);
		} else {
			spdk_thread_send_msg(spdk_get_thread(), exception, primary);
		}
	}
}

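/*
 * Persist the whole metadata region. For mirrored objects the mirror is written
 * first and persist_mirror_cb() then continues with the primary region.
 */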
void
ftl_md_persist(struct ftl_md *md)
{
	if (has_mirror(md)) {
		struct ftl_md *md_mirror = ftl_md_get_mirror(md);

		md->mirror_enabled = true;

		/* Set callback and context in mirror */
		md_mirror->cb = persist_mirror_cb;
		md_mirror->owner.private = md;

		/* First persist the mirror */
		ftl_md_persist(md_mirror);
		return;
	}

	if (0 == io_init(md, FTL_MD_OP_PERSIST)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}

static void
restore_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* Cannot restore the object from the mirror too, mark error and fail */
		primary->io.status = -EIO;
		io_done(primary);
	} else {
		/*
		 * Restoring from the mirror was successful, so synchronize the primary with it.
		 * Since the MD content was just read from the mirror, the mirror is temporarily
		 * disabled and only the primary region gets persisted.
		 */
		primary->io.status = 0;
		primary->mirror_enabled = false;
		io_cleanup(primary);
		ftl_md_persist(primary);
		primary->mirror_enabled = true;
	}
}

static void
restore_sync_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* Cannot sync the object from the primary to the mirror, mark error and fail */
		primary->io.status = -EIO;
		io_done(primary);
	} else {
		primary->cb(dev, primary, primary->io.status);
		io_cleanup(primary);
	}
}

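/*
 * Post-processing of a restore. -EAGAIN from the primary region triggers a full
 * restore from the mirror (restore_mirror_cb). After a successful restore that
 * follows a dirty shutdown, the mirror is persisted again to bring it back in
 * sync (restore_sync_cb). Returns -EAGAIN while such a follow-up IO is still in
 * flight.
 */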
static int
restore_done(struct ftl_md *md)
{
	if (-EAGAIN == md->io.status) {
		/* Failed to read the MD from the primary region, try the mirror instead.
		 * For now the mirror is read in its entirety; (TODO) in the
		 * future the restore could use the primary and mirror regions
		 * with finer granularity.
		 */

		if (has_mirror(md)) {
			struct ftl_md *md_mirror = ftl_md_get_mirror(md);

			md->mirror_enabled = true;

			/* Set callback and context in mirror */
			md_mirror->cb = restore_mirror_cb;
			md_mirror->owner.private = md;

			/* Restore from the mirror */
			ftl_md_restore(md_mirror);
			return -EAGAIN;
		} else {
			return -EIO;
		}
	} else if (0 == md->io.status && false == md->dev->sb->clean) {
		if (has_mirror(md)) {
			struct ftl_md *md_mirror = ftl_md_get_mirror(md);
			/* There was a dirty shutdown, synchronize primary to mirror */

			/* Set callback and context in the mirror */
			md_mirror->cb = restore_sync_cb;
			md_mirror->owner.private = md;

			/* First persist the mirror */
			ftl_md_persist(md_mirror);
			return -EAGAIN;
		}
	}

	return md->io.status;
}

static void
io_done(struct ftl_md *md)
{
	int status;

	if (md->io.op == FTL_MD_OP_RESTORE) {
		status = restore_done(md);
	} else {
		status = md->io.status;
	}

	if (status != -EAGAIN) {
		/* The MD instance may be destroyed in ctx of md->cb(), e.g. upon region upgrade. */
		/* Need to cleanup DMA bufs first. */
		io_cleanup(md);
		md->cb(md->dev, md, status);
	}
}

void
ftl_md_restore(struct ftl_md *md)
{
	if (0 == io_init(md, FTL_MD_OP_RESTORE)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}

static int
pattern_prepare(struct ftl_md *md,
		int data_pattern, union ftl_md_vss *vss_pattern)
{
	void *data = md->io.data;
	uint64_t data_size = xfer_size(md);

	memset(data, data_pattern, data_size);

	if (md->io.md) {
		if (vss_pattern) {
			/* store the VSS pattern... */
			ftl_md_vss_buf_init(md->io.md, ftl_md_xfer_blocks(md->dev), vss_pattern);
		} else {
			/* ...or default init VSS to 0 */
			union ftl_md_vss vss = {0};

			vss.version.md_version = md->region->current.version;
			ftl_md_vss_buf_init(md->io.md, ftl_md_xfer_blocks(md->dev), &vss);
		}
	}

	return 0;
}

static void
clear_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *secondary, int status)
{
	struct ftl_md *primary = secondary->owner.private;

	if (status) {
		/* We got an error, stop the clear procedure immediately */
		primary->io.status = status;
		io_done(primary);
	} else {
		/* Now continue the clear procedure on the primary MD object */
		io_submit(primary);
	}
}

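/*
 * Fill the region with a data/VSS pattern. For mirrored objects the IO for the
 * primary region is prepared first (the pattern buffers only live in this call),
 * the mirror is cleared, and clear_mirror_cb() then submits the prepared IO for
 * the primary region.
 */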
void
ftl_md_clear(struct ftl_md *md, int data_pattern, union ftl_md_vss *vss_pattern)
{
	if (has_mirror(md)) {
		struct ftl_md *md_mirror = ftl_md_get_mirror(md);

		md->mirror_enabled = true;

		/* Set callback and context in mirror */
		md_mirror->cb = clear_mirror_cb;
		md_mirror->owner.private = md;

		/* The pattern bufs will not be available outside of this fn context */
		/* Configure the IO for the primary region now */
		if (0 == io_init(md, FTL_MD_OP_CLEAR) && 0 == pattern_prepare(md, data_pattern, vss_pattern)) {
			/* First clear the mirror */
			ftl_md_clear(md_mirror, data_pattern, vss_pattern);
		} else {
			spdk_thread_send_msg(spdk_get_thread(), exception, md);
		}
		return;
	}

	if (0 == io_init(md, FTL_MD_OP_CLEAR) && 0 == pattern_prepare(md, data_pattern, vss_pattern)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}

const struct ftl_layout_region *
ftl_md_get_region(struct ftl_md *md)
{
	return md->region;
}

void
ftl_md_set_region(struct ftl_md *md,
		  const struct ftl_layout_region *region)
{
	assert(region->current.blocks <= md->data_blocks);
	md->region = region;

	if (md->vss_data) {
		union ftl_md_vss vss = {0};
		vss.version.md_version = region->current.version;
		ftl_md_vss_buf_init(md->vss_data, md->data_blocks, &vss);
		if (region->entry_size) {
			assert(md->entry_vss_dma_buf);
			ftl_md_vss_buf_init(md->entry_vss_dma_buf, region->entry_size, &vss);
		}
	}

	if (has_mirror(md)) {
		md->mirror_enabled = true;
	}
}

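/*
 * Region-dependent create/destroy flags: the superblock, band, NVC, valid map and
 * trim regions are SHM-backed and recreated unless a fast startup/recovery can
 * reuse their contents; all other regions fall back to heap allocations. On fast
 * shutdown the SHM objects are kept for the next startup.
 */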
int
ftl_md_create_region_flags(struct spdk_ftl_dev *dev, int region_type)
{
	int flags = FTL_MD_CREATE_SHM;

	switch (region_type) {
	case FTL_LAYOUT_REGION_TYPE_SB:
		if (dev->conf.mode & SPDK_FTL_MODE_CREATE) {
			flags |= FTL_MD_CREATE_SHM_NEW;
		}
		break;

	case FTL_LAYOUT_REGION_TYPE_BAND_MD:
	case FTL_LAYOUT_REGION_TYPE_NVC_MD:
		if (!ftl_fast_startup(dev)) {
			flags |= FTL_MD_CREATE_SHM_NEW;
		}
		break;
	case FTL_LAYOUT_REGION_TYPE_VALID_MAP:
	case FTL_LAYOUT_REGION_TYPE_TRIM_MD:
		if (!ftl_fast_startup(dev) && !ftl_fast_recovery(dev)) {
			flags |= FTL_MD_CREATE_SHM_NEW;
		}
		break;
	default:
		return FTL_MD_CREATE_HEAP;
	}

	return flags;
}

int
ftl_md_destroy_region_flags(struct spdk_ftl_dev *dev, int region_type)
{
	switch (region_type) {
	case FTL_LAYOUT_REGION_TYPE_SB:
	case FTL_LAYOUT_REGION_TYPE_BAND_MD:
	case FTL_LAYOUT_REGION_TYPE_VALID_MAP:
	case FTL_LAYOUT_REGION_TYPE_NVC_MD:
	case FTL_LAYOUT_REGION_TYPE_TRIM_MD:
		if (dev->conf.fast_shutdown) {
			return FTL_MD_DESTROY_SHM_KEEP;
		}
		break;

	default:
		break;
	}
	return 0;
}

int
ftl_md_create_shm_flags(struct spdk_ftl_dev *dev)
{
	int flags = FTL_MD_CREATE_SHM;

	if (!ftl_fast_startup(dev) && !ftl_fast_recovery(dev)) {
		flags |= FTL_MD_CREATE_SHM_NEW;
	}
	return flags;
}

int
ftl_md_destroy_shm_flags(struct spdk_ftl_dev *dev)
{
	return (dev->conf.fast_shutdown) ? FTL_MD_DESTROY_SHM_KEEP : 0;
}