/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/bdev_module.h"

#include "ftl_core.h"
#include "ftl_md.h"
#include "ftl_nv_cache_io.h"

struct ftl_md;
static void io_submit(struct ftl_md *md);
static void io_done(struct ftl_md *md);

static bool
has_mirror(struct ftl_md *md)
{
	if (md->region) {
		if (md->region->mirror_type != FTL_LAYOUT_REGION_TYPE_INVALID) {
			return md->mirror_enabled;
		}
	}

	return false;
}

static int
setup_mirror(struct ftl_md *md)
{
	if (!md->mirror) {
		md->mirror = calloc(1, sizeof(*md->mirror));
		if (!md->mirror) {
			return -ENOMEM;
		}
		md->mirror_enabled = true;
	}

	md->mirror->dev = md->dev;
	md->mirror->data_blocks = md->data_blocks;
	md->mirror->data = md->data;
	md->mirror->vss_data = md->vss_data;

	/* Set proper region in secondary object */
	assert(md->region->mirror_type != FTL_LAYOUT_REGION_TYPE_INVALID);
	md->mirror->region = &md->dev->layout.region[md->region->mirror_type];

	return 0;
}

uint64_t
ftl_md_xfer_blocks(struct spdk_ftl_dev *dev)
{
	return 4ULL * dev->xfer_size;
}

static uint64_t
xfer_size(struct ftl_md *md)
{
	return ftl_md_xfer_blocks(md->dev) * FTL_BLOCK_SIZE;
}

static void
ftl_md_create_heap(struct ftl_md *md, uint64_t vss_blksz)
{
	md->shm_fd = -1;
	md->vss_data = NULL;
	md->data = calloc(md->data_blocks, FTL_BLOCK_SIZE + vss_blksz);

	if (md->data && vss_blksz) {
		md->vss_data = ((char *)md->data) + md->data_blocks * FTL_BLOCK_SIZE;
	}
}

static void
ftl_md_destroy_heap(struct ftl_md *md)
{
	if (md->data) {
		free(md->data);
		md->data = NULL;
		md->vss_data = NULL;
	}
}

static int
ftl_wrapper_open(const char *name, int of, mode_t m)
{
	return open(name, of, m);
}

static void
ftl_md_setup_obj(struct ftl_md *md, int flags,
		 const char *name)
{
	char uuid_str[SPDK_UUID_STRING_LEN];
	const char *fmt;

	if (!(flags & FTL_MD_CREATE_SHM)) {
		assert(false);
		return;
	}

	/* TODO: temporary, define a proper hugetlbfs mountpoint */
	fmt = "/dev/hugepages/ftl_%s_%s";
	md->shm_mmap_flags = MAP_SHARED;
	md->shm_open = ftl_wrapper_open;
	md->shm_unlink = unlink;

	if (name == NULL ||
	    spdk_uuid_fmt_lower(uuid_str, SPDK_UUID_STRING_LEN, &md->dev->conf.uuid) ||
	    snprintf(md->name, sizeof(md->name) / sizeof(md->name[0]),
		     fmt, uuid_str, name) <= 0) {
		md->name[0] = 0;
	}
}
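
/*
 * For illustration: given the format string above, a metadata object named
 * "sb" on a device whose UUID formats to "01234567-89ab-cdef-0123-456789abcdef"
 * would map to the (hypothetical) SHM file:
 *
 *	/dev/hugepages/ftl_01234567-89ab-cdef-0123-456789abcdef_sb
 *
 * The UUID and object name are made-up examples; only the
 * "/dev/hugepages/ftl_%s_%s" pattern comes from the code above.
 */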

static void
ftl_md_invalidate_shm(struct ftl_md *md)
{
	if (md->dev->sb_shm && md->dev->sb_shm->shm_ready) {
		md->dev->init_retry = true;
		md->dev->sb_shm->shm_ready = false;
	}
}

static void
ftl_md_create_shm(struct ftl_md *md, uint64_t vss_blksz, int flags)
{
	struct stat shm_stat;
	size_t vss_blk_offs;
	void *shm_ptr;
	int open_flags = O_RDWR;
	mode_t open_mode = S_IRUSR | S_IWUSR;

	assert(md->shm_open && md->shm_unlink);
	md->data = NULL;
	md->vss_data = NULL;
	md->shm_sz = 0;

	/* Must have an object name */
	if (md->name[0] == 0) {
		assert(false);
		return;
	}

	/* If requested, unlink before creating a new SHM object */
	if (flags & FTL_MD_CREATE_SHM_NEW) {
		if (md->shm_unlink(md->name) < 0 && errno != ENOENT) {
			ftl_md_invalidate_shm(md);
			return;
		}
		open_flags |= O_CREAT | O_TRUNC;
	}

	/* Open an existing or create a new SHM object, then query its props */
	md->shm_fd = md->shm_open(md->name, open_flags, open_mode);
	if (md->shm_fd < 0 || fstat(md->shm_fd, &shm_stat) < 0) {
		goto err_shm;
	}

	/* Verify the open mode hasn't changed */
	if ((shm_stat.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO)) != open_mode) {
		goto err_shm;
	}

	/* Compute the SHM obj size in blks (i.e. pages), rounding up */
	md->shm_sz = spdk_divide_round_up(md->data_blocks * FTL_BLOCK_SIZE, shm_stat.st_blksize);

	/* Add some blks for VSS metadata */
	vss_blk_offs = md->shm_sz;

	if (vss_blksz) {
		md->shm_sz += spdk_divide_round_up(md->data_blocks * vss_blksz,
						   shm_stat.st_blksize);
	}

	/* Total SHM obj size */
	md->shm_sz *= shm_stat.st_blksize;

	/* Set or check the object size - zero-initialized when set (FTL_MD_CREATE_SHM_NEW) */
	if ((shm_stat.st_size == 0 && (ftruncate(md->shm_fd, md->shm_sz) < 0 ||
				       (flags & FTL_MD_CREATE_SHM_NEW) == 0))
	    || (shm_stat.st_size > 0 && (size_t)shm_stat.st_size != md->shm_sz)) {
		goto err_shm;
	}

	/* Create a virtual memory mapping for the object */
	shm_ptr = mmap(NULL, md->shm_sz, PROT_READ | PROT_WRITE, md->shm_mmap_flags,
		       md->shm_fd, 0);
	if (shm_ptr == MAP_FAILED) {
		goto err_shm;
	}

	md->data = shm_ptr;
	if (vss_blksz) {
		md->vss_data = ((char *)shm_ptr) + vss_blk_offs * shm_stat.st_blksize;
	}

	/* Lock the pages in memory (i.e. prevent them from being paged out) */
	if (mlock(md->data, md->shm_sz) < 0) {
		goto err_map;
	}

	if (spdk_mem_register(md->data, md->shm_sz)) {
		goto err_mlock;
	}
	md->mem_reg = true;

	return;

	/* Cleanup upon fault */
err_mlock:
	munlock(md->data, md->shm_sz);

err_map:
	munmap(md->data, md->shm_sz);
	md->data = NULL;
	md->vss_data = NULL;
	md->shm_sz = 0;

err_shm:
	if (md->shm_fd >= 0) {
		close(md->shm_fd);
		md->shm_unlink(md->name);
		md->shm_fd = -1;
	}
	ftl_md_invalidate_shm(md);
}

static void
ftl_md_destroy_shm(struct ftl_md *md)
{
	if (!md->data) {
		return;
	}

	assert(md->shm_sz > 0);
	if (md->mem_reg) {
		spdk_mem_unregister(md->data, md->shm_sz);
		md->mem_reg = false;
	}

	/* Unlock the pages in memory */
	munlock(md->data, md->shm_sz);

	/* Remove the virtual memory mapping for the object */
	munmap(md->data, md->shm_sz);

	/* Close the SHM object fd */
	close(md->shm_fd);

	md->data = NULL;
	md->vss_data = NULL;

	/* Finally, destroy/unlink the object */
	assert(md->name[0] != 0 && md->shm_unlink != NULL);
	md->shm_unlink(md->name);
}

struct ftl_md *ftl_md_create(struct spdk_ftl_dev *dev, uint64_t blocks,
			     uint64_t vss_blksz, const char *name, int flags,
			     const struct ftl_layout_region *region)
{
	struct ftl_md *md;

	md = calloc(1, sizeof(*md));
	if (!md) {
		return NULL;
	}
	md->dev = dev;
	md->data_blocks = blocks;
	md->mirror_enabled = true;

	if (flags != FTL_MD_CREATE_NO_MEM) {
		if (flags & FTL_MD_CREATE_SHM) {
			ftl_md_setup_obj(md, flags, name);
			ftl_md_create_shm(md, vss_blksz, flags);
		} else {
			assert((flags & FTL_MD_CREATE_HEAP) == FTL_MD_CREATE_HEAP);
			ftl_md_create_heap(md, vss_blksz);
		}

		if (!md->data) {
			free(md);
			return NULL;
		}
	}

	if (region) {
		size_t entry_vss_buf_size = vss_blksz * region->entry_size;

		if (entry_vss_buf_size) {
			md->entry_vss_dma_buf = spdk_malloc(entry_vss_buf_size, FTL_BLOCK_SIZE,
							    NULL, SPDK_ENV_LCORE_ID_ANY,
							    SPDK_MALLOC_DMA);
			if (!md->entry_vss_dma_buf) {
				goto err;
			}
		}

		if (ftl_md_set_region(md, region)) {
			goto err;
		}
	}

	return md;
err:
	ftl_md_destroy(md);
	return NULL;
}
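
/*
 * A minimal usage sketch (hypothetical caller code, not part of this file;
 * "dev", "region" and "caller_cb" are placeholders assumed to be provided by
 * the FTL layout/management code):
 *
 *	struct ftl_md *md;
 *
 *	md = ftl_md_create(dev, region->current.blocks, region->vss_blksz,
 *			   "my_md", FTL_MD_CREATE_HEAP, region);
 *	if (!md) {
 *		return -ENOMEM;
 *	}
 *	md->cb = caller_cb;	... invoked from io_done()/exception() ...
 *	ftl_md_persist(md);
 *	...
 *	ftl_md_destroy(md);
 */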

int
ftl_md_unlink(struct spdk_ftl_dev *dev, const char *name, int flags)
{
	struct ftl_md md = { 0 };

	if (0 == (flags & FTL_MD_CREATE_SHM)) {
		/* Unlink can be called for shared memory only */
		return -EINVAL;
	}

	md.dev = dev;
	ftl_md_setup_obj(&md, flags, name);

	return md.shm_unlink(md.name);
}

void
ftl_md_destroy(struct ftl_md *md)
{
	if (!md) {
		return;
	}

	ftl_md_free_buf(md);

	spdk_free(md->entry_vss_dma_buf);

	free(md->mirror);
	free(md);
}

void
ftl_md_free_buf(struct ftl_md *md)
{
	if (!md) {
		return;
	}

	if (md->shm_fd < 0) {
		ftl_md_destroy_heap(md);
	} else {
		ftl_md_destroy_shm(md);
	}
}

void *
ftl_md_get_buffer(struct ftl_md *md)
{
	return md->data;
}

uint64_t
ftl_md_get_buffer_size(struct ftl_md *md)
{
	return md->data_blocks * FTL_BLOCK_SIZE;
}

static void
ftl_md_vss_buf_init(union ftl_md_vss *buf, uint32_t count,
		    const union ftl_md_vss *vss_pattern)
{
	while (count) {
		count--;
		buf[count] = *vss_pattern;
	}
}

union ftl_md_vss *ftl_md_vss_buf_alloc(struct ftl_layout_region *region, uint32_t count)
{
	union ftl_md_vss *buf = spdk_zmalloc(count * FTL_MD_VSS_SZ, FTL_BLOCK_SIZE, NULL,
					     SPDK_ENV_LCORE_ID_ANY,
					     SPDK_MALLOC_DMA);

	if (!buf) {
		return NULL;
	}

	union ftl_md_vss vss_buf = {0};
	vss_buf.version.md_version = region->current.version;
	ftl_md_vss_buf_init(buf, count, &vss_buf);
	return buf;
}

union ftl_md_vss *ftl_md_get_vss_buffer(struct ftl_md *md)
{
	return md->vss_data;
}

static void
io_cleanup(struct ftl_md *md)
{
	spdk_dma_free(md->io.data);
	md->io.data = NULL;

	spdk_dma_free(md->io.md);
	md->io.md = NULL;
}

static void
exception(void *arg)
{
	struct ftl_md *md = arg;

	md->cb(md->dev, md, -EINVAL);
	io_cleanup(md);
}

static void
audit_md_vss_version(struct ftl_md *md, uint64_t blocks)
{
#if defined(DEBUG)
	union ftl_md_vss *vss = md->io.md;

	while (blocks) {
		blocks--;
		assert(vss[blocks].version.md_version == md->region->current.version);
	}
#endif
}

static void
read_write_blocks_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct ftl_md *md = arg;

	if (spdk_unlikely(!success)) {
		if (md->io.op == FTL_MD_OP_RESTORE && has_mirror(md)) {
			md->io.status = -EAGAIN;
		} else {
			md->io.status = -EIO;
		}
	} else {
		uint64_t blocks = bdev_io->u.bdev.num_blocks;
		uint64_t size = blocks * FTL_BLOCK_SIZE;

		if (md->io.op == FTL_MD_OP_RESTORE) {
			memcpy(md->data + md->io.data_offset, md->io.data, size);

			if (md->vss_data) {
				uint64_t vss_offset = md->io.data_offset / FTL_BLOCK_SIZE;
				vss_offset *= FTL_MD_VSS_SZ;
				audit_md_vss_version(md, blocks);
				memcpy(md->vss_data + vss_offset, md->io.md, blocks * FTL_MD_VSS_SZ);
			}
		}

		md->io.address += blocks;
		md->io.remaining -= blocks;
		md->io.data_offset += size;
	}

	spdk_bdev_free_io(bdev_io);

	io_submit(md);
}

static inline int
read_blocks(struct spdk_ftl_dev *dev, struct spdk_bdev_desc *desc,
	    struct spdk_io_channel *ch,
	    void *buf, void *md_buf,
	    uint64_t offset_blocks, uint64_t num_blocks,
	    spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	if (desc == dev->nv_cache.bdev_desc) {
		return ftl_nv_cache_bdev_read_blocks_with_md(dev, desc, ch, buf, md_buf,
				offset_blocks, num_blocks,
				cb, cb_arg);
	} else if (md_buf) {
		return spdk_bdev_read_blocks_with_md(desc, ch, buf, md_buf,
						     offset_blocks, num_blocks,
						     cb, cb_arg);
	} else {
		return spdk_bdev_read_blocks(desc, ch, buf,
					     offset_blocks, num_blocks,
					     cb, cb_arg);
	}
}

static inline int
write_blocks(struct spdk_ftl_dev *dev, struct spdk_bdev_desc *desc,
	     struct spdk_io_channel *ch,
	     void *buf, void *md_buf,
	     uint64_t offset_blocks, uint64_t num_blocks,
	     spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	if (desc == dev->nv_cache.bdev_desc) {
		return ftl_nv_cache_bdev_write_blocks_with_md(dev, desc, ch, buf, md_buf,
				offset_blocks, num_blocks,
				cb, cb_arg);
	} else if (md_buf) {
		return spdk_bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks,
						      num_blocks, cb, cb_arg);
	} else {
		return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
	}
}

static void
read_write_blocks(void *_md)
{
	struct ftl_md *md = _md;
	const struct ftl_layout_region *region = md->region;
	uint64_t blocks;
	int rc = 0;

	blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));

	switch (md->io.op) {
	case FTL_MD_OP_RESTORE:
		rc = read_blocks(md->dev, region->bdev_desc, region->ioch,
				 md->io.data, md->io.md,
				 md->io.address, blocks,
				 read_write_blocks_cb, md);
		break;
	case FTL_MD_OP_PERSIST:
	case FTL_MD_OP_CLEAR:
		rc = write_blocks(md->dev, region->bdev_desc, region->ioch,
				  md->io.data, md->io.md,
				  md->io.address, blocks,
				  read_write_blocks_cb, md);
		break;
	default:
		ftl_abort();
	}

	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(region->bdev_desc);

			md->io.bdev_io_wait.bdev = bdev;
			md->io.bdev_io_wait.cb_fn = read_write_blocks;
			md->io.bdev_io_wait.cb_arg = md;
			spdk_bdev_queue_io_wait(bdev, region->ioch, &md->io.bdev_io_wait);
		} else {
			ftl_abort();
		}
	}
}
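
/*
 * Note on the -ENOMEM path above: spdk_bdev_queue_io_wait() is the standard
 * bdev-layer idiom for "no spdk_bdev_io is available right now". The wait
 * entry stores read_write_blocks() itself as the retry callback, so once the
 * bdev frees an IO the chunk submission is simply re-driven with the same
 * md->io state (address, remaining and data_offset are untouched on this
 * path).
 */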

static void
io_submit(struct ftl_md *md)
{
	if (!md->io.remaining || md->io.status) {
		io_done(md);
		return;
	}

	if (md->io.op == FTL_MD_OP_PERSIST) {
		uint64_t blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));

		memcpy(md->io.data, md->data + md->io.data_offset, FTL_BLOCK_SIZE * blocks);

		if (md->vss_data) {
			uint64_t vss_offset = md->io.data_offset / FTL_BLOCK_SIZE;
			vss_offset *= FTL_MD_VSS_SZ;
			assert(md->io.md);
			memcpy(md->io.md, md->vss_data + vss_offset, FTL_MD_VSS_SZ * blocks);
			audit_md_vss_version(md, blocks);
		}
	}
#if defined(DEBUG)
	if (md->io.md && md->io.op == FTL_MD_OP_CLEAR) {
		uint64_t blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));

		audit_md_vss_version(md, blocks);
	}
#endif

	read_write_blocks(md);
}

static int
io_can_start(struct ftl_md *md)
{
	assert(NULL == md->io.data);
	if (NULL != md->io.data) {
		/* An IO is already in progress on this metadata object */
		return -EINVAL;
	}

	if (!md->region) {
		/* No device region to process data */
		return -EINVAL;
	}

	if (md->region->current.blocks > md->data_blocks) {
		/* The device region is bigger than the metadata buffer */
		FTL_ERRLOG(md->dev, "Block count mismatch between metadata object and "
			   "device region\n");
		return -EINVAL;
	}

	return 0;
}

static int
io_prepare(struct ftl_md *md, enum ftl_md_ops op)
{
	const struct ftl_layout_region *region = md->region;
	uint64_t data_size, meta_size = 0;

	/* Allocate a data buffer for the IO */
	data_size = xfer_size(md);
	md->io.data = spdk_zmalloc(data_size, FTL_BLOCK_SIZE, NULL,
				   SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!md->io.data) {
		return -ENOMEM;
	}

	if (md->vss_data || md->region->vss_blksz) {
		meta_size = ftl_md_xfer_blocks(md->dev) * FTL_MD_VSS_SZ;
		md->io.md = spdk_zmalloc(meta_size, FTL_BLOCK_SIZE, NULL,
					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (!md->io.md) {
			spdk_dma_free(md->io.data);
			md->io.data = NULL;
			return -ENOMEM;
		}
	}

	md->io.address = region->current.offset;
	md->io.remaining = region->current.blocks;
	md->io.data_offset = 0;
	md->io.status = 0;
	md->io.op = op;

	return 0;
}

static int
io_init(struct ftl_md *md, enum ftl_md_ops op)
{
	if (io_can_start(md)) {
		return -EINVAL;
	}

	if (io_prepare(md, op)) {
		return -ENOMEM;
	}

	return 0;
}

static uint64_t
persist_entry_lba(struct ftl_md *md, uint64_t start_entry)
{
	return md->region->current.offset + start_entry * md->region->entry_size;
}

static void
persist_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct ftl_md_io_entry_ctx *ctx = cb_arg;

	spdk_bdev_free_io(bdev_io);

	assert(ctx->remaining > 0);
	ctx->remaining--;

	if (!success) {
		ctx->status = -EIO;
	}

	if (!ctx->remaining) {
		ctx->cb(ctx->status, ctx->cb_arg);
	}
}

static int
ftl_md_persist_entry_write_blocks(struct ftl_md_io_entry_ctx *ctx, struct ftl_md *md,
				  spdk_bdev_io_wait_cb retry_fn)
{
	int rc;

	rc = write_blocks(md->dev, md->region->bdev_desc, md->region->ioch,
			  ctx->buffer, ctx->vss_buffer,
			  persist_entry_lba(md, ctx->start_entry), md->region->entry_size,
			  persist_entry_cb, ctx);
	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(md->region->bdev_desc);

			ctx->bdev_io_wait.bdev = bdev;
			ctx->bdev_io_wait.cb_fn = retry_fn;
			ctx->bdev_io_wait.cb_arg = ctx;
			spdk_bdev_queue_io_wait(bdev, md->region->ioch, &ctx->bdev_io_wait);
		} else {
			ftl_abort();
		}
	}

	return rc;
}

static void
ftl_md_persist_entry_mirror(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;

	ftl_md_persist_entry_write_blocks(ctx, ctx->md->mirror, ftl_md_persist_entry_mirror);
}

static void
ftl_md_persist_entry_primary(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;
	struct ftl_md *md = ctx->md;
	int rc;

	rc = ftl_md_persist_entry_write_blocks(ctx, md, ftl_md_persist_entry_primary);

	if (!rc && has_mirror(md)) {
		assert(md->region->entry_size == md->mirror->region->entry_size);

		/* The MD object has a mirror, so execute the persist on it too */
		ftl_md_persist_entry_mirror(ctx);
		ctx->remaining++;
	}
}

static void
_ftl_md_persist_entry(struct ftl_md_io_entry_ctx *ctx)
{
	ctx->status = 0;
	ctx->remaining = 1;

	/* First execute an IO to the primary region */
	ftl_md_persist_entry_primary(ctx);
}

void
ftl_md_persist_entry(struct ftl_md *md, uint64_t start_entry, void *buffer, void *vss_buffer,
		     ftl_md_io_entry_cb cb, void *cb_arg,
		     struct ftl_md_io_entry_ctx *ctx)
{
	if (spdk_unlikely(0 == md->region->entry_size)) {
		/* This MD has not been configured to support the persist entry call */
		ftl_abort();
	}

	/* Initialize the persist entry context */
	ctx->cb = cb;
	ctx->cb_arg = cb_arg;
	ctx->md = md;
	ctx->start_entry = start_entry;
	ctx->buffer = buffer;
	ctx->vss_buffer = vss_buffer ? : md->entry_vss_dma_buf;

	_ftl_md_persist_entry(ctx);
}
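
/*
 * A minimal persist-entry sketch (hypothetical caller code; "md", "entry_idx",
 * "entry_buf" and "caller_ctx" are placeholders - entry_buf must be DMA-able
 * and cover md->region->entry_size blocks):
 *
 *	static void
 *	entry_persisted(int status, void *cb_arg)
 *	{
 *		... status is 0, or -EIO if the primary or mirror write failed ...
 *	}
 *
 *	ftl_md_persist_entry(md, entry_idx, entry_buf, NULL,
 *			     entry_persisted, caller_ctx, &caller_ctx->md_ctx);
 *
 * Passing NULL as vss_buffer falls back to md->entry_vss_dma_buf (the GNU
 * "?:" above). The ctx must remain valid until the callback fires, since it
 * also backs the -ENOMEM retry via ctx->bdev_io_wait.
 */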

static void
read_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct ftl_md_io_entry_ctx *ctx = cb_arg;
	struct ftl_md *md = ctx->md;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		if (has_mirror(md)) {
			if (setup_mirror(md)) {
				/* An error occurred when setting up the mirror */
				ctx->status = -EIO;
				goto finish_io;
			}

			/* Retry the read from the mirror */
			ftl_md_read_entry(md->mirror, ctx->start_entry, ctx->buffer, ctx->vss_buffer,
					  ctx->cb, ctx->cb_arg,
					  ctx);
			return;
		} else {
			ctx->status = -EIO;
			goto finish_io;
		}
	}

finish_io:
	ctx->cb(ctx->status, ctx->cb_arg);
}

static void
ftl_md_read_entry_read_blocks(struct ftl_md_io_entry_ctx *ctx, struct ftl_md *md,
			      spdk_bdev_io_wait_cb retry_fn)
{
	int rc;

	rc = read_blocks(md->dev, md->region->bdev_desc, md->region->ioch,
			 ctx->buffer, ctx->vss_buffer,
			 persist_entry_lba(md, ctx->start_entry), md->region->entry_size,
			 read_entry_cb, ctx);

	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(md->region->bdev_desc);

			ctx->bdev_io_wait.bdev = bdev;
			ctx->bdev_io_wait.cb_fn = retry_fn;
			ctx->bdev_io_wait.cb_arg = ctx;
			spdk_bdev_queue_io_wait(bdev, md->region->ioch, &ctx->bdev_io_wait);
		} else {
			ftl_abort();
		}
	}
}

static void
_ftl_md_read_entry(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;

	ftl_md_read_entry_read_blocks(ctx, ctx->md, _ftl_md_read_entry);
}

void
ftl_md_read_entry(struct ftl_md *md, uint64_t start_entry, void *buffer, void *vss_buffer,
		  ftl_md_io_entry_cb cb, void *cb_arg,
		  struct ftl_md_io_entry_ctx *ctx)
{
	if (spdk_unlikely(0 == md->region->entry_size)) {
		/* This MD has not been configured to support the read entry call */
		ftl_abort();
	}

	/* Initialize the read entry context. Clear the status up front so the
	 * success path in read_entry_cb() reports 0 to the caller.
	 */
	ctx->status = 0;
	ctx->cb = cb;
	ctx->cb_arg = cb_arg;
	ctx->md = md;
	ctx->start_entry = start_entry;
	ctx->buffer = buffer;
	ctx->vss_buffer = vss_buffer;

	_ftl_md_read_entry(ctx);
}

void
ftl_md_persist_entry_retry(struct ftl_md_io_entry_ctx *ctx)
{
	_ftl_md_persist_entry(ctx);
}

static void
persist_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* We got an error, stop the persist procedure immediately */
		primary->io.status = status;
		io_done(primary);
	} else {
		/* Now continue the persist procedure on the primary MD object */
		if (0 == io_init(primary, FTL_MD_OP_PERSIST)) {
			io_submit(primary);
		} else {
			spdk_thread_send_msg(spdk_get_thread(), exception, primary);
		}
	}
}

void
ftl_md_persist(struct ftl_md *md)
{
	if (has_mirror(md)) {
		if (setup_mirror(md)) {
			/* An error occurred when setting up the mirror */
			spdk_thread_send_msg(spdk_get_thread(), exception, md);
			return;
		}

		/* Set the callback and context in the mirror */
		md->mirror->cb = persist_mirror_cb;
		md->mirror->owner.private = md;

		/* First persist the mirror */
		ftl_md_persist(md->mirror);
		return;
	}

	if (0 == io_init(md, FTL_MD_OP_PERSIST)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}
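
/*
 * Ordering note: when a mirror is configured, ftl_md_persist() writes the
 * mirror region first and only starts the primary IO from persist_mirror_cb()
 * after the mirror completes. If persisting is interrupted midway, the
 * primary region therefore still holds the previous consistent copy - the
 * same copy that restore_done() below prefers (primary first, mirror as
 * fallback).
 */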

static void
restore_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* Cannot restore the object from the mirror either, mark the error and fail */
		primary->io.status = -EIO;
		io_done(primary);
	} else {
		/*
		 * Restoring from the mirror was successful. Synchronize the mirror to the primary.
		 * Because we read the MD content from the mirror, we can disable it; only the
		 * primary requires persisting.
		 */
		primary->io.status = 0;
		primary->mirror_enabled = false;
		io_cleanup(primary);
		ftl_md_persist(primary);
		primary->mirror_enabled = true;
	}
}

static void
restore_sync_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* Cannot sync the object from the primary to the mirror, mark the error and fail */
		primary->io.status = -EIO;
		io_done(primary);
	} else {
		primary->cb(dev, primary, primary->io.status);
		io_cleanup(primary);
	}
}

static int
restore_done(struct ftl_md *md)
{
	if (-EAGAIN == md->io.status) {
		/* Failed to read the MD from the primary region, try it from the mirror.
		 * At the moment the mirror is read entirely; in the future (TODO) we can
		 * restore from the primary and mirror regions with finer granularity.
		 */

		if (has_mirror(md)) {
			if (setup_mirror(md)) {
				/* An error occurred when setting up the mirror */
				return -EIO;
			}

			/* Set the callback and context in the mirror */
			md->mirror->cb = restore_mirror_cb;
			md->mirror->owner.private = md;

			/* Restore the object from the mirror */
			ftl_md_restore(md->mirror);
			return -EAGAIN;
		} else {
			return -EIO;
		}
	} else if (0 == md->io.status && false == md->dev->sb->clean) {
		if (has_mirror(md)) {
			/* There was a dirty shutdown, synchronize the primary to the mirror */

			/* Set the callback and context in the mirror */
			md->mirror->cb = restore_sync_cb;
			md->mirror->owner.private = md;

			/* First persist the mirror */
			ftl_md_persist(md->mirror);
			return -EAGAIN;
		}
	}

	return md->io.status;
}
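
/*
 * Restore flow summary (describing the code above, not adding behavior):
 *
 *	ftl_md_restore(md)        - read the primary region
 *	  -EAGAIN (read failed)   - retry via ftl_md_restore(md->mirror);
 *	                            on success re-persist the primary,
 *	                            on failure report -EIO to md->cb
 *	  success + dirty SB      - ftl_md_persist(md->mirror) to resync it
 *
 * Returning -EAGAIN here keeps io_done() from invoking md->cb while a
 * follow-up mirror IO is still in flight.
 */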

static void
io_done(struct ftl_md *md)
{
	int status;

	if (md->io.op == FTL_MD_OP_RESTORE) {
		status = restore_done(md);
	} else {
		status = md->io.status;
	}

	if (status != -EAGAIN) {
		md->cb(md->dev, md, status);
		io_cleanup(md);
	}
}

void
ftl_md_restore(struct ftl_md *md)
{
	if (0 == io_init(md, FTL_MD_OP_RESTORE)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}

static int
pattern_prepare(struct ftl_md *md,
		int data_pattern, union ftl_md_vss *vss_pattern)
{
	void *data = md->io.data;
	uint64_t data_size = xfer_size(md);

	memset(data, data_pattern, data_size);

	if (md->io.md) {
		if (vss_pattern) {
			/* store the VSS pattern... */
			ftl_md_vss_buf_init(md->io.md, ftl_md_xfer_blocks(md->dev), vss_pattern);
		} else {
			/* ...or default init VSS to 0 */
			union ftl_md_vss vss = {0};

			vss.version.md_version = md->region->current.version;
			ftl_md_vss_buf_init(md->io.md, ftl_md_xfer_blocks(md->dev), &vss);
		}
	}

	return 0;
}

static void
clear_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *secondary, int status)
{
	struct ftl_md *primary = secondary->owner.private;

	if (status) {
		/* We got an error, stop the clear procedure immediately */
		primary->io.status = status;
		io_done(primary);
	} else {
		/* Now continue the clear procedure on the primary MD object */
		if (0 == io_init(primary, FTL_MD_OP_CLEAR) &&
		    0 == pattern_prepare(primary, *(int *)secondary->io.data,
					 secondary->io.md)) {
			io_submit(primary);
		} else {
			spdk_thread_send_msg(spdk_get_thread(), exception, primary);
		}
	}
}

void
ftl_md_clear(struct ftl_md *md, int data_pattern, union ftl_md_vss *vss_pattern)
{
	if (has_mirror(md)) {
		if (setup_mirror(md)) {
			/* An error occurred when setting up the mirror */
			spdk_thread_send_msg(spdk_get_thread(), exception, md);
			return;
		}

		/* Set the callback and context in the mirror */
		md->mirror->cb = clear_mirror_cb;
		md->mirror->owner.private = md;

		/* First clear the mirror */
		ftl_md_clear(md->mirror, data_pattern, vss_pattern);
		return;
	}

	if (0 == io_init(md, FTL_MD_OP_CLEAR) && 0 == pattern_prepare(md, data_pattern, vss_pattern)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}

const struct ftl_layout_region *
ftl_md_get_region(struct ftl_md *md)
{
	return md->region;
}

int
ftl_md_set_region(struct ftl_md *md,
		  const struct ftl_layout_region *region)
{
	assert(region->current.blocks <= md->data_blocks);
	md->region = region;

	if (md->vss_data) {
		union ftl_md_vss vss = {0};

		vss.version.md_version = region->current.version;
		ftl_md_vss_buf_init(md->vss_data, md->data_blocks, &vss);
		if (region->entry_size) {
			assert(md->entry_vss_dma_buf);
			ftl_md_vss_buf_init(md->entry_vss_dma_buf, region->entry_size, &vss);
		}
	}

	if (has_mirror(md)) {
		return setup_mirror(md);
	}

	return 0;
}