/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2022 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/bdev_module.h"

#include "ftl_core.h"
#include "ftl_md.h"
#include "ftl_nv_cache_io.h"

struct ftl_md;
static void io_submit(struct ftl_md *md);
static void io_done(struct ftl_md *md);

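/* A metadata object is mirrored when its layout region defines a valid mirror
 * region type and mirroring has not been temporarily disabled (e.g. while
 * restoring content from the mirror).
 */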
static bool
has_mirror(struct ftl_md *md)
{
	if (md->region) {
		if (md->region->mirror_type != FTL_LAYOUT_REGION_TYPE_INVALID) {
			return md->mirror_enabled;
		}
	}

	return false;
}

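/* Lazily allocate the secondary (mirror) MD object. It shares the primary's
 * data and VSS buffers and differs only in the layout region it targets.
 */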
static int
setup_mirror(struct ftl_md *md)
{
	if (!md->mirror) {
		md->mirror = calloc(1, sizeof(*md->mirror));
		if (!md->mirror) {
			return -ENOMEM;
		}
		md->mirror_enabled = true;
	}

	md->mirror->dev = md->dev;
	md->mirror->data_blocks = md->data_blocks;
	md->mirror->data = md->data;
	md->mirror->vss_data = md->vss_data;

	/* Set proper region in secondary object */
	assert(md->region->mirror_type != FTL_LAYOUT_REGION_TYPE_INVALID);
	md->mirror->region = &md->dev->layout.region[md->region->mirror_type];

	return 0;
}

uint64_t
ftl_md_xfer_blocks(struct spdk_ftl_dev *dev)
{
	return 4ULL * dev->xfer_size;
}

static uint64_t
xfer_size(struct ftl_md *md)
{
	return ftl_md_xfer_blocks(md->dev) * FTL_BLOCK_SIZE;
}

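/* Heap-backed variant: a single allocation holds the data blocks followed by
 * the VSS area, so md->vss_data simply points past the data blocks.
 */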
static void
ftl_md_create_heap(struct ftl_md *md, uint64_t vss_blksz)
{
	md->shm_fd = -1;
	md->vss_data = NULL;
	md->data = calloc(md->data_blocks, FTL_BLOCK_SIZE + vss_blksz);

	if (md->data && vss_blksz) {
		md->vss_data = ((char *)md->data) + md->data_blocks * FTL_BLOCK_SIZE;
	}
}

static void
ftl_md_destroy_heap(struct ftl_md *md)
{
	if (md->data) {
		free(md->data);
		md->data = NULL;
		md->vss_data = NULL;
	}
}

static int
ftl_wrapper_open(const char *name, int of, mode_t m)
{
	return open(name, of, m);
}

static void
ftl_md_setup_obj(struct ftl_md *md, int flags,
		 const char *name)
{
	char uuid_str[SPDK_UUID_STRING_LEN];
	const char *fmt;

	if (!(flags & FTL_MD_CREATE_SHM)) {
		assert(false);
		return;
	}

	/* TODO: temporary, define a proper hugetlbfs mountpoint */
	fmt = "/dev/hugepages/ftl_%s_%s";
	md->shm_mmap_flags = MAP_SHARED;
	md->shm_open = ftl_wrapper_open;
	md->shm_unlink = unlink;

	if (name == NULL ||
	    spdk_uuid_fmt_lower(uuid_str, SPDK_UUID_STRING_LEN, &md->dev->conf.uuid) ||
	    snprintf(md->name, sizeof(md->name) / sizeof(md->name[0]),
		     fmt, uuid_str, name) <= 0) {
		md->name[0] = 0;
	}
}

static void
ftl_md_invalidate_shm(struct ftl_md *md)
{
	if (md->dev->sb_shm && md->dev->sb_shm->shm_ready) {
		md->dev->init_retry = true;
		md->dev->sb_shm->shm_ready = false;
	}
}

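/* SHM-backed variant: open (optionally recreate) the backing object, size it
 * to hold the data blocks plus the VSS area, mmap it, pin the pages with
 * mlock() and register them with the SPDK memory map for DMA. Any failure
 * invalidates the SHM state so initialization is retried from scratch.
 */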
static void
ftl_md_create_shm(struct ftl_md *md, uint64_t vss_blksz, int flags)
{
	struct stat shm_stat;
	size_t vss_blk_offs;
	void *shm_ptr;
	int open_flags = O_RDWR;
	mode_t open_mode = S_IRUSR | S_IWUSR;

	assert(md->shm_open && md->shm_unlink);
	md->data = NULL;
	md->vss_data = NULL;
	md->shm_sz = 0;

	/* Must have an object name */
	if (md->name[0] == 0) {
		assert(false);
		return;
	}

	/* If specified, unlink before creating a new SHM object */
	if (flags & FTL_MD_CREATE_SHM_NEW) {
		if (md->shm_unlink(md->name) < 0 && errno != ENOENT) {
			ftl_md_invalidate_shm(md);
			return;
		}
		open_flags += O_CREAT | O_TRUNC;
	}

	/* Open an existing or create a new SHM object, then query its props */
	md->shm_fd = md->shm_open(md->name, open_flags, open_mode);
	if (md->shm_fd < 0 || fstat(md->shm_fd, &shm_stat) < 0) {
		goto err_shm;
	}

	/* Verify the open mode hasn't changed */
	if ((shm_stat.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO)) != open_mode) {
		goto err_shm;
	}

	/* Round up the SHM obj size to the nearest blk size (i.e. page size) */
	md->shm_sz = spdk_divide_round_up(md->data_blocks * FTL_BLOCK_SIZE, shm_stat.st_blksize);

	/* Add some blks for VSS metadata */
	vss_blk_offs = md->shm_sz;

	if (vss_blksz) {
		md->shm_sz += spdk_divide_round_up(md->data_blocks * vss_blksz,
						   shm_stat.st_blksize);
	}

	/* Total SHM obj size */
	md->shm_sz *= shm_stat.st_blksize;

	/* Set or check the object size - zero-init'd in case of set (FTL_MD_CREATE_SHM_NEW) */
	if ((shm_stat.st_size == 0 && (ftruncate(md->shm_fd, md->shm_sz) < 0 ||
				       (flags & FTL_MD_CREATE_SHM_NEW) == 0))
	    || (shm_stat.st_size > 0 && (size_t)shm_stat.st_size != md->shm_sz)) {
		goto err_shm;
	}

	/* Create a virtual memory mapping for the object */
	shm_ptr = mmap(NULL, md->shm_sz, PROT_READ | PROT_WRITE, md->shm_mmap_flags,
		       md->shm_fd, 0);
	if (shm_ptr == MAP_FAILED) {
		goto err_shm;
	}

	md->data = shm_ptr;
	if (vss_blksz) {
		md->vss_data = ((char *)shm_ptr) + vss_blk_offs * shm_stat.st_blksize;
	}

	/* Lock the pages in memory (i.e. prevent the pages from being paged out) */
	if (mlock(md->data, md->shm_sz) < 0) {
		goto err_map;
	}

	if (spdk_mem_register(md->data, md->shm_sz)) {
		goto err_mlock;
	}
	md->mem_reg = true;

	return;

	/* Cleanup upon fault */
err_mlock:
	munlock(md->data, md->shm_sz);

err_map:
	munmap(md->data, md->shm_sz);
	md->data = NULL;
	md->vss_data = NULL;
	md->shm_sz = 0;

err_shm:
	if (md->shm_fd >= 0) {
		close(md->shm_fd);
		md->shm_unlink(md->name);
		md->shm_fd = -1;
	}
	ftl_md_invalidate_shm(md);
}

static void
ftl_md_destroy_shm(struct ftl_md *md, int flags)
{
	if (!md->data) {
		return;
	}

	assert(md->shm_sz > 0);
	if (md->mem_reg) {
		spdk_mem_unregister(md->data, md->shm_sz);
		md->mem_reg = false;
	}

	/* Unlock the pages in memory */
	munlock(md->data, md->shm_sz);

	/* Remove the virtual memory mapping for the object */
	munmap(md->data, md->shm_sz);

	/* Close the SHM object fd */
	close(md->shm_fd);

	md->data = NULL;
	md->vss_data = NULL;

	/* If specified, keep the object in SHM */
	if (flags & FTL_MD_DESTROY_SHM_KEEP) {
		return;
	}

	/* Otherwise destroy/unlink the object */
	assert(md->name[0] != 0 && md->shm_unlink != NULL);
	md->shm_unlink(md->name);
}

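/* Create a metadata object. Depending on the flags the buffer is backed by a
 * named SHM object (optionally freshly created) or by the heap, and is skipped
 * entirely for FTL_MD_CREATE_NO_MEM. When a layout region is given, a DMA
 * buffer used as the default VSS payload for entry persists is allocated and
 * the region is bound to the object.
 */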
struct ftl_md *ftl_md_create(struct spdk_ftl_dev *dev, uint64_t blocks,
			     uint64_t vss_blksz, const char *name, int flags,
			     const struct ftl_layout_region *region)
{
	struct ftl_md *md;

	md = calloc(1, sizeof(*md));
	if (!md) {
		return NULL;
	}
	md->dev = dev;
	md->data_blocks = blocks;
	md->mirror_enabled = true;

	if (flags != FTL_MD_CREATE_NO_MEM) {
		if (flags & FTL_MD_CREATE_SHM) {
			ftl_md_setup_obj(md, flags, name);
			ftl_md_create_shm(md, vss_blksz, flags);
		} else {
			assert((flags & FTL_MD_CREATE_HEAP) == FTL_MD_CREATE_HEAP);
			ftl_md_create_heap(md, vss_blksz);
		}

		if (!md->data) {
			free(md);
			return NULL;
		}
	}

	if (region) {
		size_t entry_vss_buf_size = vss_blksz * region->entry_size;

		if (entry_vss_buf_size) {
			md->entry_vss_dma_buf = spdk_malloc(entry_vss_buf_size, FTL_BLOCK_SIZE,
							    NULL, SPDK_ENV_LCORE_ID_ANY,
							    SPDK_MALLOC_DMA);
			if (!md->entry_vss_dma_buf) {
				goto err;
			}
		}

		if (ftl_md_set_region(md, region)) {
			goto err;
		}
	}

	return md;
err:
	ftl_md_destroy(md, ftl_md_destroy_region_flags(dev, region->type));
	return NULL;
}

int
ftl_md_unlink(struct spdk_ftl_dev *dev, const char *name, int flags)
{
	struct ftl_md md = { 0 };

	if (0 == (flags & FTL_MD_CREATE_SHM)) {
		/* Unlink can be called for shared memory only */
		return -EINVAL;
	}

	md.dev = dev;
	ftl_md_setup_obj(&md, flags, name);

	return md.shm_unlink(md.name);
}

void
ftl_md_destroy(struct ftl_md *md, int flags)
{
	if (!md) {
		return;
	}

	ftl_md_free_buf(md, flags);

	spdk_free(md->entry_vss_dma_buf);

	free(md->mirror);
	free(md);
}

void
ftl_md_free_buf(struct ftl_md *md, int flags)
{
	if (!md) {
		return;
	}

	if (md->shm_fd < 0) {
		assert(flags == 0);
		ftl_md_destroy_heap(md);
	} else {
		ftl_md_destroy_shm(md, flags);
	}
}

void *
ftl_md_get_buffer(struct ftl_md *md)
{
	return md->data;
}

uint64_t
ftl_md_get_buffer_size(struct ftl_md *md)
{
	return md->data_blocks * FTL_BLOCK_SIZE;
}

static void
ftl_md_vss_buf_init(union ftl_md_vss *buf, uint32_t count,
		    const union ftl_md_vss *vss_pattern)
{
	while (count) {
		count--;
		buf[count] = *vss_pattern;
	}
}

union ftl_md_vss *ftl_md_vss_buf_alloc(struct ftl_layout_region *region, uint32_t count)
{
	union ftl_md_vss *buf = spdk_zmalloc(count * FTL_MD_VSS_SZ, FTL_BLOCK_SIZE, NULL,
					     SPDK_ENV_LCORE_ID_ANY,
					     SPDK_MALLOC_DMA);

	if (!buf) {
		return NULL;
	}

	union ftl_md_vss vss_buf = {0};
	vss_buf.version.md_version = region->current.version;
	ftl_md_vss_buf_init(buf, count, &vss_buf);
	return buf;
}

union ftl_md_vss *ftl_md_get_vss_buffer(struct ftl_md *md)
{
	return md->vss_data;
}

static void
io_cleanup(struct ftl_md *md)
{
	spdk_dma_free(md->io.data);
	md->io.data = NULL;

	spdk_dma_free(md->io.md);
	md->io.md = NULL;
}

static void
exception(void *arg)
{
	struct ftl_md *md = arg;

	md->cb(md->dev, md, -EINVAL);
	io_cleanup(md);
}

static inline enum ftl_stats_type
get_bdev_io_ftl_stats_type(struct spdk_ftl_dev *dev, struct spdk_bdev_io *bdev_io) {
	struct spdk_bdev *nvc = spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc);

	if (bdev_io->bdev == nvc) {
		return FTL_STATS_TYPE_MD_NV_CACHE;
	} else {
		return FTL_STATS_TYPE_MD_BASE;
	}
}

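/* Debug-only audit: every VSS block in the IO staging buffer must carry the
 * MD version of the region currently being read or written.
 */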
static void
audit_md_vss_version(struct ftl_md *md, uint64_t blocks)
{
#if defined(DEBUG)
	union ftl_md_vss *vss = md->io.md;
	while (blocks) {
		blocks--;
		assert(vss[blocks].version.md_version == md->region->current.version);
	}
#endif
}

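/* Completion of one chunk of a region-wide read/write. A failed restore with
 * a mirror available is marked -EAGAIN so restore_done() can retry from the
 * mirror. On success a restore copies the chunk (and its VSS) into the
 * object's buffers; the window then advances and the next chunk is submitted.
 */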
static void
read_write_blocks_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct ftl_md *md = arg;

	ftl_stats_bdev_io_completed(md->dev, get_bdev_io_ftl_stats_type(md->dev, bdev_io), bdev_io);

	if (spdk_unlikely(!success)) {
		if (md->io.op == FTL_MD_OP_RESTORE && has_mirror(md)) {
			md->io.status = -EAGAIN;
		} else {
			md->io.status = -EIO;
		}
	} else {
		uint64_t blocks = bdev_io->u.bdev.num_blocks;
		uint64_t size = blocks * FTL_BLOCK_SIZE;

		if (md->io.op == FTL_MD_OP_RESTORE) {
			memcpy(md->data + md->io.data_offset, md->io.data, size);

			if (md->vss_data) {
				uint64_t vss_offset = md->io.data_offset / FTL_BLOCK_SIZE;
				vss_offset *= FTL_MD_VSS_SZ;
				audit_md_vss_version(md, blocks);
				memcpy(md->vss_data + vss_offset, md->io.md, blocks * FTL_MD_VSS_SZ);
			}
		}

		md->io.address += blocks;
		md->io.remaining -= blocks;
		md->io.data_offset += size;
	}

	spdk_bdev_free_io(bdev_io);

	io_submit(md);
}

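/* Dispatch helpers: IOs to the NV cache bdev must go through the FTL NV cache
 * wrappers; for other bdevs the with-md variant is used only when a separate
 * metadata buffer is supplied.
 */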
static inline int
read_blocks(struct spdk_ftl_dev *dev, struct spdk_bdev_desc *desc,
	    struct spdk_io_channel *ch,
	    void *buf, void *md_buf,
	    uint64_t offset_blocks, uint64_t num_blocks,
	    spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	if (desc == dev->nv_cache.bdev_desc) {
		return ftl_nv_cache_bdev_read_blocks_with_md(dev, desc, ch, buf, md_buf,
				offset_blocks, num_blocks,
				cb, cb_arg);
	} else if (md_buf) {
		return spdk_bdev_read_blocks_with_md(desc, ch, buf, md_buf,
						     offset_blocks, num_blocks,
						     cb, cb_arg);
	} else {
		return spdk_bdev_read_blocks(desc, ch, buf,
					     offset_blocks, num_blocks,
					     cb, cb_arg);
	}
}

static inline int
write_blocks(struct spdk_ftl_dev *dev, struct spdk_bdev_desc *desc,
	     struct spdk_io_channel *ch,
	     void *buf, void *md_buf,
	     uint64_t offset_blocks, uint64_t num_blocks,
	     spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	if (desc == dev->nv_cache.bdev_desc) {
		return ftl_nv_cache_bdev_write_blocks_with_md(dev, desc, ch, buf, md_buf,
				offset_blocks, num_blocks,
				cb, cb_arg);
	} else if (md_buf) {
		return spdk_bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks,
						      num_blocks, cb, cb_arg);
	} else {
		return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
	}
}

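/* Issue the next chunk (up to ftl_md_xfer_blocks()) of the current operation.
 * -ENOMEM from the bdev layer is handled by queueing a retry via
 * spdk_bdev_queue_io_wait(); any other error is fatal.
 */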
static void
read_write_blocks(void *_md)
{
	struct ftl_md *md = _md;
	const struct ftl_layout_region *region = md->region;
	uint64_t blocks;
	int rc = 0;

	blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));

	switch (md->io.op) {
	case FTL_MD_OP_RESTORE:
		rc = read_blocks(md->dev, region->bdev_desc, region->ioch,
				 md->io.data, md->io.md,
				 md->io.address, blocks,
				 read_write_blocks_cb, md);
		break;
	case FTL_MD_OP_PERSIST:
	case FTL_MD_OP_CLEAR:
		rc = write_blocks(md->dev, region->bdev_desc, region->ioch,
				  md->io.data, md->io.md,
				  md->io.address, blocks,
				  read_write_blocks_cb, md);
		break;
	default:
		ftl_abort();
	}

	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(region->bdev_desc);
			md->io.bdev_io_wait.bdev = bdev;
			md->io.bdev_io_wait.cb_fn = read_write_blocks;
			md->io.bdev_io_wait.cb_arg = md;
			spdk_bdev_queue_io_wait(bdev, region->ioch, &md->io.bdev_io_wait);
		} else {
			ftl_abort();
		}
	}
}

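/* Stage the next chunk. For a persist the data (and VSS, if present) is first
 * copied from the object's buffers into the DMA-able staging buffers.
 */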
static void
io_submit(struct ftl_md *md)
{
	if (!md->io.remaining || md->io.status) {
		io_done(md);
		return;
	}

	if (md->io.op == FTL_MD_OP_PERSIST) {
		uint64_t blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));

		memcpy(md->io.data, md->data + md->io.data_offset, FTL_BLOCK_SIZE * blocks);

		if (md->vss_data) {
			uint64_t vss_offset = md->io.data_offset / FTL_BLOCK_SIZE;
			vss_offset *= FTL_MD_VSS_SZ;
			assert(md->io.md);
			memcpy(md->io.md, md->vss_data + vss_offset, FTL_MD_VSS_SZ * blocks);
			audit_md_vss_version(md, blocks);
		}
	}
#if defined(DEBUG)
	if (md->io.md && md->io.op == FTL_MD_OP_CLEAR) {
		uint64_t blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));
		audit_md_vss_version(md, blocks);
	}
#endif

	read_write_blocks(md);
}

static int
io_can_start(struct ftl_md *md)
{
	assert(NULL == md->io.data);
	if (NULL != md->io.data) {
		/* Ongoing IO on metadata */
		return -EINVAL;
	}

	if (!md->region) {
		/* No device region to process data */
		return -EINVAL;
	}

	if (md->region->current.blocks > md->data_blocks) {
		/* The device region is bigger than the metadata buffer */
		FTL_ERRLOG(md->dev, "Blocks number mismatch between metadata object and "
			   "device region\n");
		return -EINVAL;
	}

	return 0;
}

static int
io_prepare(struct ftl_md *md, enum ftl_md_ops op)
{
	const struct ftl_layout_region *region = md->region;
	uint64_t data_size, meta_size = 0;

	/* Allocate a buffer for the IO */
	data_size = xfer_size(md);
	md->io.data = spdk_zmalloc(data_size, FTL_BLOCK_SIZE, NULL,
				   SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!md->io.data) {
		return -ENOMEM;
	}

	if (md->vss_data || md->region->vss_blksz) {
		meta_size = ftl_md_xfer_blocks(md->dev) * FTL_MD_VSS_SZ;
		md->io.md = spdk_zmalloc(meta_size, FTL_BLOCK_SIZE, NULL,
					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (!md->io.md) {
			spdk_dma_free(md->io.data);
			md->io.data = NULL;
			return -ENOMEM;
		}
	}

	md->io.address = region->current.offset;
	md->io.remaining = region->current.blocks;
	md->io.data_offset = 0;
	md->io.status = 0;
	md->io.op = op;

	return 0;
}

static int
io_init(struct ftl_md *md, enum ftl_md_ops op)
{
	if (io_can_start(md)) {
		return -EINVAL;
	}

	if (io_prepare(md, op)) {
		return -ENOMEM;
	}

	return 0;
}

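/* Translate an entry index into its starting LBA within the region; each
 * entry spans region->entry_size blocks.
 */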
static uint64_t
persist_entry_lba(struct ftl_md *md, uint64_t start_entry)
{
	return md->region->current.offset + start_entry * md->region->entry_size;
}

static void
persist_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct ftl_md_io_entry_ctx *ctx = cb_arg;
	struct ftl_md *md = ctx->md;

	ftl_stats_bdev_io_completed(md->dev, get_bdev_io_ftl_stats_type(md->dev, bdev_io), bdev_io);

	spdk_bdev_free_io(bdev_io);

	assert(ctx->remaining > 0);
	ctx->remaining--;

	if (!success) {
		ctx->status = -EIO;
	}

	if (!ctx->remaining) {
		ctx->cb(ctx->status, ctx->cb_arg);
	}
}

static int
ftl_md_persist_entry_write_blocks(struct ftl_md_io_entry_ctx *ctx, struct ftl_md *md,
				  spdk_bdev_io_wait_cb retry_fn)
{
	int rc;

	rc = write_blocks(md->dev, md->region->bdev_desc, md->region->ioch,
			  ctx->buffer, ctx->vss_buffer,
			  persist_entry_lba(md, ctx->start_entry), md->region->entry_size,
			  persist_entry_cb, ctx);
	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(md->region->bdev_desc);
			ctx->bdev_io_wait.bdev = bdev;
			ctx->bdev_io_wait.cb_fn = retry_fn;
			ctx->bdev_io_wait.cb_arg = ctx;
			spdk_bdev_queue_io_wait(bdev, md->region->ioch, &ctx->bdev_io_wait);
		} else {
			ftl_abort();
		}
	}

	return rc;
}

static void
ftl_md_persist_entry_mirror(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;

	ftl_md_persist_entry_write_blocks(ctx, ctx->md->mirror, ftl_md_persist_entry_mirror);
}

static void
ftl_md_persist_entry_primary(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;
	struct ftl_md *md = ctx->md;
	int rc;

	rc = ftl_md_persist_entry_write_blocks(ctx, md, ftl_md_persist_entry_primary);

	if (!rc && has_mirror(md)) {
		assert(md->region->entry_size == md->mirror->region->entry_size);

		/* The MD object has a mirror, so execute the persist on it too */
		ftl_md_persist_entry_mirror(ctx);
		ctx->remaining++;
	}
}

static void
_ftl_md_persist_entry(struct ftl_md_io_entry_ctx *ctx)
{
	ctx->status = 0;
	ctx->remaining = 1;

	/* First execute an IO to the primary region */
	ftl_md_persist_entry_primary(ctx);
}

void
ftl_md_persist_entry(struct ftl_md *md, uint64_t start_entry, void *buffer, void *vss_buffer,
		     ftl_md_io_entry_cb cb, void *cb_arg,
		     struct ftl_md_io_entry_ctx *ctx)
{
	if (spdk_unlikely(0 == md->region->entry_size)) {
		/* This MD has not been configured to support the persist entry call */
		ftl_abort();
	}

	/* Initialize the persist entry context */
	ctx->cb = cb;
	ctx->cb_arg = cb_arg;
	ctx->md = md;
	ctx->start_entry = start_entry;
	ctx->buffer = buffer;
	ctx->vss_buffer = vss_buffer ? : md->entry_vss_dma_buf;

	_ftl_md_persist_entry(ctx);
}

static void
read_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct ftl_md_io_entry_ctx *ctx = cb_arg;
	struct ftl_md *md = ctx->md;

	ftl_stats_bdev_io_completed(md->dev, get_bdev_io_ftl_stats_type(md->dev, bdev_io), bdev_io);

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		if (has_mirror(md)) {
			if (setup_mirror(md)) {
				/* Failed to set up the mirror */
				ctx->status = -EIO;
				goto finish_io;
			}

			/* Read the entry from the mirror instead */
			ftl_md_read_entry(md->mirror, ctx->start_entry, ctx->buffer, ctx->vss_buffer,
					  ctx->cb, ctx->cb_arg,
					  ctx);
			return;
		} else {
			ctx->status = -EIO;
			goto finish_io;
		}
	}

finish_io:
	ctx->cb(ctx->status, ctx->cb_arg);
}

static void
ftl_md_read_entry_read_blocks(struct ftl_md_io_entry_ctx *ctx, struct ftl_md *md,
			      spdk_bdev_io_wait_cb retry_fn)
{
	int rc;

	rc = read_blocks(md->dev, md->region->bdev_desc, md->region->ioch,
			 ctx->buffer, ctx->vss_buffer,
			 persist_entry_lba(md, ctx->start_entry), md->region->entry_size,
			 read_entry_cb, ctx);

	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(md->region->bdev_desc);
			ctx->bdev_io_wait.bdev = bdev;
			ctx->bdev_io_wait.cb_fn = retry_fn;
			ctx->bdev_io_wait.cb_arg = ctx;
			spdk_bdev_queue_io_wait(bdev, md->region->ioch, &ctx->bdev_io_wait);
		} else {
			ftl_abort();
		}
	}
}

static void
_ftl_md_read_entry(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;

	ftl_md_read_entry_read_blocks(ctx, ctx->md, _ftl_md_read_entry);
}

void
ftl_md_read_entry(struct ftl_md *md, uint64_t start_entry, void *buffer, void *vss_buffer,
		  ftl_md_io_entry_cb cb, void *cb_arg,
		  struct ftl_md_io_entry_ctx *ctx)
{
	if (spdk_unlikely(0 == md->region->entry_size)) {
		/* This MD has not been configured to support the read entry call */
		ftl_abort();
	}

	ctx->cb = cb;
	ctx->cb_arg = cb_arg;
	ctx->md = md;
	ctx->start_entry = start_entry;
	ctx->buffer = buffer;
	ctx->vss_buffer = vss_buffer;

	_ftl_md_read_entry(ctx);
}

void
ftl_md_persist_entry_retry(struct ftl_md_io_entry_ctx *ctx)
{
	_ftl_md_persist_entry(ctx);
}

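/* For mirrored objects a persist runs mirror-first: the mirror region is
 * persisted, then persist_mirror_cb() kicks off the same operation on the
 * primary region.
 */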
static void
persist_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* We got an error, stop the persist procedure immediately */
		primary->io.status = status;
		io_done(primary);
	} else {
		/* Now continue the persist procedure on the primary MD object */
		if (0 == io_init(primary, FTL_MD_OP_PERSIST)) {
			io_submit(primary);
		} else {
			spdk_thread_send_msg(spdk_get_thread(), exception, primary);
		}
	}
}

void
ftl_md_persist(struct ftl_md *md)
{
	if (has_mirror(md)) {
		if (setup_mirror(md)) {
			/* Failed to set up the mirror */
			spdk_thread_send_msg(spdk_get_thread(), exception, md);
			return;
		}

		/* Set callback and context in the mirror */
		md->mirror->cb = persist_mirror_cb;
		md->mirror->owner.private = md;

		/* First persist the mirror */
		ftl_md_persist(md->mirror);
		return;
	}

	if (0 == io_init(md, FTL_MD_OP_PERSIST)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}

static void
restore_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* Cannot restore the object from the mirror either, mark error and fail */
		primary->io.status = -EIO;
		io_done(primary);
	} else {
		/*
		 * Restoring from the mirror succeeded. Synchronize the mirror to the primary.
		 * Because we read the MD content from the mirror, we can disable it; only the
		 * primary requires persisting.
		 */
		primary->io.status = 0;
		primary->mirror_enabled = false;
		io_cleanup(primary);
		ftl_md_persist(primary);
		primary->mirror_enabled = true;
	}
}

static void
restore_sync_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* Cannot sync the object from the primary to the mirror, mark error and fail */
		primary->io.status = -EIO;
		io_done(primary);
	} else {
		primary->cb(dev, primary, primary->io.status);
		io_cleanup(primary);
	}
}

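/* Decide how a restore completes: retry from the mirror when the primary read
 * failed (-EAGAIN), resynchronize the mirror after a dirty shutdown, or simply
 * return the IO status.
 */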
static int
restore_done(struct ftl_md *md)
{
	if (-EAGAIN == md->io.status) {
		/* Failed to read the MD from the primary region, try the mirror.
		 * For now the mirror is read entirely; (TODO) in the
		 * future we could restore from the primary and mirror regions
		 * with finer granularity.
		 */

		if (has_mirror(md)) {
			if (setup_mirror(md)) {
				/* Failed to set up the mirror */
				return -EIO;
			}

			/* Set callback and context in the mirror */
			md->mirror->cb = restore_mirror_cb;
			md->mirror->owner.private = md;

			/* Restore the object from the mirror */
			ftl_md_restore(md->mirror);
			return -EAGAIN;
		} else {
			return -EIO;
		}
	} else if (0 == md->io.status && false == md->dev->sb->clean) {
		if (has_mirror(md)) {
			/* There was a dirty shutdown, synchronize the primary to the mirror */

			/* Set callback and context in the mirror */
			md->mirror->cb = restore_sync_cb;
			md->mirror->owner.private = md;

			/* Persist the mirror */
			ftl_md_persist(md->mirror);
			return -EAGAIN;
		}
	}

	return md->io.status;
}

static void
io_done(struct ftl_md *md)
{
	int status;

	if (md->io.op == FTL_MD_OP_RESTORE) {
		status = restore_done(md);
	} else {
		status = md->io.status;
	}

	if (status != -EAGAIN) {
		md->cb(md->dev, md, status);
		io_cleanup(md);
	}
}

void
ftl_md_restore(struct ftl_md *md)
{
	if (0 == io_init(md, FTL_MD_OP_RESTORE)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}

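/* Fill the staging buffers for a clear: the data buffer with the byte
 * pattern, the VSS buffer either with the supplied pattern or zeroed with the
 * region's current MD version.
 */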
static int
pattern_prepare(struct ftl_md *md,
		int data_pattern, union ftl_md_vss *vss_pattern)
{
	void *data = md->io.data;
	uint64_t data_size = xfer_size(md);

	memset(data, data_pattern, data_size);

	if (md->io.md) {
		if (vss_pattern) {
			/* store the VSS pattern... */
			ftl_md_vss_buf_init(md->io.md, ftl_md_xfer_blocks(md->dev), vss_pattern);
		} else {
			/* ...or default init VSS to 0 */
			union ftl_md_vss vss = {0};

			vss.version.md_version = md->region->current.version;
			ftl_md_vss_buf_init(md->io.md, ftl_md_xfer_blocks(md->dev), &vss);
		}
	}

	return 0;
}

static void
clear_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *secondary, int status)
{
	struct ftl_md *primary = secondary->owner.private;

	if (status) {
		/* We got an error, stop the clear procedure immediately */
		primary->io.status = status;
		io_done(primary);
	} else {
		/* Now continue the clear procedure on the primary MD object */
		if (0 == io_init(primary, FTL_MD_OP_CLEAR) &&
		    0 == pattern_prepare(primary, *(int *)secondary->io.data,
					 secondary->io.md)) {
			io_submit(primary);
		} else {
			spdk_thread_send_msg(spdk_get_thread(), exception, primary);
		}
	}
}

void
ftl_md_clear(struct ftl_md *md, int data_pattern, union ftl_md_vss *vss_pattern)
{
	if (has_mirror(md)) {
		if (setup_mirror(md)) {
			/* Failed to set up the mirror */
			spdk_thread_send_msg(spdk_get_thread(), exception, md);
			return;
		}

		/* Set callback and context in the mirror */
		md->mirror->cb = clear_mirror_cb;
		md->mirror->owner.private = md;

		/* First clear the mirror */
		ftl_md_clear(md->mirror, data_pattern, vss_pattern);
		return;
	}

	if (0 == io_init(md, FTL_MD_OP_CLEAR) && 0 == pattern_prepare(md, data_pattern, vss_pattern)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}

const struct ftl_layout_region *
ftl_md_get_region(struct ftl_md *md)
{
	return md->region;
}

int
ftl_md_set_region(struct ftl_md *md,
		  const struct ftl_layout_region *region)
{
	assert(region->current.blocks <= md->data_blocks);
	md->region = region;

	if (md->vss_data) {
		union ftl_md_vss vss = {0};
		vss.version.md_version = region->current.version;
		ftl_md_vss_buf_init(md->vss_data, md->data_blocks, &vss);
		if (region->entry_size) {
			assert(md->entry_vss_dma_buf);
			ftl_md_vss_buf_init(md->entry_vss_dma_buf, region->entry_size, &vss);
		}
	}

	if (has_mirror(md)) {
		return setup_mirror(md);
	}

	return 0;
}

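/* Pick buffer-backing flags per region type: the superblock, band MD, NVC MD,
 * valid map and trim MD live in SHM (recreated unless a fast startup/recovery
 * can reuse them); everything else is heap-allocated.
 */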
int
ftl_md_create_region_flags(struct spdk_ftl_dev *dev, int region_type)
{
	int flags = FTL_MD_CREATE_SHM;

	switch (region_type) {
	case FTL_LAYOUT_REGION_TYPE_SB:
		if (dev->conf.mode & SPDK_FTL_MODE_CREATE) {
			flags |= FTL_MD_CREATE_SHM_NEW;
		}
		break;

	case FTL_LAYOUT_REGION_TYPE_BAND_MD:
	case FTL_LAYOUT_REGION_TYPE_NVC_MD:
		if (!ftl_fast_startup(dev)) {
			flags |= FTL_MD_CREATE_SHM_NEW;
		}
		break;
	case FTL_LAYOUT_REGION_TYPE_VALID_MAP:
	case FTL_LAYOUT_REGION_TYPE_TRIM_MD:
		if (!ftl_fast_startup(dev) && !ftl_fast_recovery(dev)) {
			flags |= FTL_MD_CREATE_SHM_NEW;
		}
		break;
	default:
		return FTL_MD_CREATE_HEAP;
	}

	return flags;
}

int
ftl_md_destroy_region_flags(struct spdk_ftl_dev *dev, int region_type)
{
	switch (region_type) {
	case FTL_LAYOUT_REGION_TYPE_SB:
	case FTL_LAYOUT_REGION_TYPE_BAND_MD:
	case FTL_LAYOUT_REGION_TYPE_VALID_MAP:
	case FTL_LAYOUT_REGION_TYPE_NVC_MD:
	case FTL_LAYOUT_REGION_TYPE_TRIM_MD:
		if (dev->conf.fast_shutdown) {
			return FTL_MD_DESTROY_SHM_KEEP;
		}
		break;

	default:
		break;
	}
	return 0;
}

int
ftl_md_create_shm_flags(struct spdk_ftl_dev *dev)
{
	int flags = FTL_MD_CREATE_SHM;

	if (!ftl_fast_startup(dev) && !ftl_fast_recovery(dev)) {
		flags |= FTL_MD_CREATE_SHM_NEW;
	}
	return flags;
}

int
ftl_md_destroy_shm_flags(struct spdk_ftl_dev *dev)
{
	return (dev->conf.fast_shutdown) ? FTL_MD_DESTROY_SHM_KEEP : 0;
}
1197