/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/env.h"
#include "spdk/bdev_module.h"

#include "ftl_core.h"
#include "ftl_md.h"
#include "ftl_nv_cache_io.h"

struct ftl_md;
static void io_submit(struct ftl_md *md);
static void io_done(struct ftl_md *md);

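/*
 * Check whether this MD object has an active mirror: the region must name a
 * valid mirror type and mirroring must not have been disabled (see
 * restore_mirror_cb(), which clears mirror_enabled temporarily).
 */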
static bool
has_mirror(struct ftl_md *md)
{
	if (md->region) {
		if (md->region->mirror_type != FTL_LAYOUT_REGION_TYPE_INVALID) {
			return md->mirror_enabled;
		}
	}

	return false;
}

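/*
 * Lazily allocate the secondary MD object and point it at the mirror region.
 * The mirror shares the primary's data and VSS buffers, so only the region
 * (and thus the on-disk location) differs between the two objects.
 */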
static int
setup_mirror(struct ftl_md *md)
{
	if (!md->mirror) {
		md->mirror = calloc(1, sizeof(*md->mirror));
		if (!md->mirror) {
			return -ENOMEM;
		}
		md->mirror_enabled = true;
	}

	md->mirror->dev = md->dev;
	md->mirror->data_blocks = md->data_blocks;
	md->mirror->data = md->data;
	md->mirror->vss_data = md->vss_data;

	/* Set proper region in secondary object */
	assert(md->region->mirror_type != FTL_LAYOUT_REGION_TYPE_INVALID);
	md->mirror->region = &md->dev->layout.region[md->region->mirror_type];

	return 0;
}

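/* Maximum number of blocks moved in a single MD transfer, presumably sized as
 * a multiple of the device transfer size to keep writes transfer-aligned.
 */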
uint64_t
ftl_md_xfer_blocks(struct spdk_ftl_dev *dev)
{
	return 4ULL * dev->xfer_size;
}

static uint64_t
xfer_size(struct ftl_md *md)
{
	return ftl_md_xfer_blocks(md->dev) * FTL_BLOCK_SIZE;
}

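/*
 * Create a metadata object backed by a shadow buffer of "blocks" FTL blocks
 * (plus "vss_blksz" bytes of VSS per block, carved out of the same
 * allocation), optionally bound to a layout region. With no_mem set, the
 * object is created without a shadow buffer.
 *
 * A minimal usage sketch (the callback wiring below is illustrative only,
 * based on how md->cb is invoked from io_done() in this file):
 *
 *	struct ftl_md *md = ftl_md_create(dev, blocks, 0, "region-md", false, region);
 *
 *	if (md) {
 *		md->cb = my_md_cb;	// hypothetical callback: void (*)(dev, md, status)
 *		ftl_md_restore(md);	// result delivered through my_md_cb
 *	}
 */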
struct ftl_md *
ftl_md_create(struct spdk_ftl_dev *dev, uint64_t blocks,
	      uint64_t vss_blksz, const char *name, bool no_mem,
	      const struct ftl_layout_region *region)
{
	struct ftl_md *md;

	md = calloc(1, sizeof(*md));
	if (!md) {
		return NULL;
	}
	md->dev = dev;
	md->data_blocks = blocks;
	md->mirror_enabled = true;

	if (!no_mem) {
		size_t buf_size = md->data_blocks * (FTL_BLOCK_SIZE + vss_blksz);
		int ret;

		ret = posix_memalign((void **)&md->data, FTL_BLOCK_SIZE, buf_size);
		if (ret) {
			free(md);
			return NULL;
		}
		memset(md->data, 0, buf_size);

		if (vss_blksz) {
			md->vss_data = ((char *)md->data) + md->data_blocks * FTL_BLOCK_SIZE;
		}
	}

	if (region) {
		size_t entry_vss_buf_size = vss_blksz * region->entry_size;

		if (entry_vss_buf_size) {
			md->entry_vss_dma_buf = spdk_malloc(entry_vss_buf_size, FTL_BLOCK_SIZE,
							    NULL, SPDK_ENV_LCORE_ID_ANY,
							    SPDK_MALLOC_DMA);
			if (!md->entry_vss_dma_buf) {
				goto err;
			}
		}

		if (ftl_md_set_region(md, region)) {
			goto err;
		}
	}

	return md;
err:
	ftl_md_destroy(md);
	return NULL;
}

void
ftl_md_destroy(struct ftl_md *md)
{
	if (!md) {
		return;
	}

	ftl_md_free_buf(md);

	spdk_free(md->entry_vss_dma_buf);

	free(md->mirror);
	free(md);
}

void
ftl_md_free_buf(struct ftl_md *md)
{
	if (!md) {
		return;
	}

	if (md->data) {
		free(md->data);
		md->data = NULL;
		md->vss_data = NULL;
	}
}

void *
ftl_md_get_buffer(struct ftl_md *md)
{
	return md->data;
}

uint64_t
ftl_md_get_buffer_size(struct ftl_md *md)
{
	return md->data_blocks * FTL_BLOCK_SIZE;
}

static void
ftl_md_vss_buf_init(union ftl_md_vss *buf, uint32_t count,
		    const union ftl_md_vss *vss_pattern)
{
	while (count) {
		count--;
		buf[count] = *vss_pattern;
	}
}

union ftl_md_vss *
ftl_md_vss_buf_alloc(struct ftl_layout_region *region, uint32_t count)
{
	union ftl_md_vss *buf = spdk_zmalloc(count * FTL_MD_VSS_SZ, FTL_BLOCK_SIZE, NULL,
					     SPDK_ENV_LCORE_ID_ANY,
					     SPDK_MALLOC_DMA);

	if (!buf) {
		return NULL;
	}

	union ftl_md_vss vss_buf = {0};
	vss_buf.version.md_version = region->current.version;
	ftl_md_vss_buf_init(buf, count, &vss_buf);
	return buf;
}

union ftl_md_vss *
ftl_md_get_vss_buffer(struct ftl_md *md)
{
	return md->vss_data;
}

static void
io_cleanup(struct ftl_md *md)
{
	spdk_dma_free(md->io.data);
	md->io.data = NULL;

	spdk_dma_free(md->io.md);
	md->io.md = NULL;
}

static void
exception(void *arg)
{
	struct ftl_md *md = arg;

	md->cb(md->dev, md, -EINVAL);
	io_cleanup(md);
}

static void
audit_md_vss_version(struct ftl_md *md, uint64_t blocks)
{
#if defined(DEBUG)
	union ftl_md_vss *vss = md->io.md;

	while (blocks) {
		blocks--;
		assert(vss[blocks].version.md_version == md->region->current.version);
	}
#endif
}

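/*
 * Completion callback for a single MD transfer. On a successful restore, the
 * chunk just read is copied into the shadow buffer (and its VSS into the VSS
 * buffer); the IO cursor is then advanced and the next chunk submitted. A
 * failed restore is flagged -EAGAIN when a mirror exists, so that
 * restore_done() can retry the read from the mirror.
 */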
static void
read_write_blocks_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
{
	struct ftl_md *md = arg;

	if (spdk_unlikely(!success)) {
		if (md->io.op == FTL_MD_OP_RESTORE && has_mirror(md)) {
			md->io.status = -EAGAIN;
		} else {
			md->io.status = -EIO;
		}
	} else {
		uint64_t blocks = bdev_io->u.bdev.num_blocks;
		uint64_t size = blocks * FTL_BLOCK_SIZE;

		if (md->io.op == FTL_MD_OP_RESTORE) {
			memcpy(md->data + md->io.data_offset, md->io.data, size);

			if (md->vss_data) {
				uint64_t vss_offset = md->io.data_offset / FTL_BLOCK_SIZE;
				vss_offset *= FTL_MD_VSS_SZ;
				audit_md_vss_version(md, blocks);
				memcpy(md->vss_data + vss_offset, md->io.md, blocks * FTL_MD_VSS_SZ);
			}
		}

		md->io.address += blocks;
		md->io.remaining -= blocks;
		md->io.data_offset += size;
	}

	spdk_bdev_free_io(bdev_io);

	io_submit(md);
}

static inline int
read_blocks(struct spdk_ftl_dev *dev, struct spdk_bdev_desc *desc,
	    struct spdk_io_channel *ch,
	    void *buf, void *md_buf,
	    uint64_t offset_blocks, uint64_t num_blocks,
	    spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	if (desc == dev->cache_bdev_desc) {
		return ftl_nv_cache_bdev_read_blocks_with_md(dev, desc, ch, buf, md_buf,
				offset_blocks, num_blocks,
				cb, cb_arg);
	} else if (md_buf) {
		return spdk_bdev_read_blocks_with_md(desc, ch, buf, md_buf,
						     offset_blocks, num_blocks,
						     cb, cb_arg);
	} else {
		return spdk_bdev_read_blocks(desc, ch, buf,
					     offset_blocks, num_blocks,
					     cb, cb_arg);
	}
}

static inline int
write_blocks(struct spdk_ftl_dev *dev, struct spdk_bdev_desc *desc,
	     struct spdk_io_channel *ch,
	     void *buf, void *md_buf,
	     uint64_t offset_blocks, uint64_t num_blocks,
	     spdk_bdev_io_completion_cb cb, void *cb_arg)
{
	if (desc == dev->cache_bdev_desc) {
		return ftl_nv_cache_bdev_write_blocks_with_md(dev, desc, ch, buf, md_buf,
				offset_blocks, num_blocks,
				cb, cb_arg);
	} else if (md_buf) {
		return spdk_bdev_write_blocks_with_md(desc, ch, buf, md_buf, offset_blocks,
						      num_blocks, cb, cb_arg);
	} else {
		return spdk_bdev_write_blocks(desc, ch, buf, offset_blocks, num_blocks, cb, cb_arg);
	}
}

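/*
 * Issue the next chunk of the current MD operation to the region's bdev.
 * On -ENOMEM the request is parked on the bdev's io_wait queue and retried
 * once resources free up; any other submission error is fatal.
 */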
static void
read_write_blocks(void *_md)
{
	struct ftl_md *md = _md;
	const struct ftl_layout_region *region = md->region;
	uint64_t blocks;
	int rc = 0;

	blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));

	switch (md->io.op) {
	case FTL_MD_OP_RESTORE:
		rc = read_blocks(md->dev, region->bdev_desc, region->ioch,
				 md->io.data, md->io.md,
				 md->io.address, blocks,
				 read_write_blocks_cb, md);
		break;
	case FTL_MD_OP_PERSIST:
	case FTL_MD_OP_CLEAR:
		rc = write_blocks(md->dev, region->bdev_desc, region->ioch,
				  md->io.data, md->io.md,
				  md->io.address, blocks,
				  read_write_blocks_cb, md);
		break;
	default:
		ftl_abort();
	}

	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(region->bdev_desc);

			md->io.bdev_io_wait.bdev = bdev;
			md->io.bdev_io_wait.cb_fn = read_write_blocks;
			md->io.bdev_io_wait.cb_arg = md;
			spdk_bdev_queue_io_wait(bdev, region->ioch, &md->io.bdev_io_wait);
		} else {
			ftl_abort();
		}
	}
}

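/*
 * Drive the MD IO state machine: finish if done or errored, otherwise stage
 * the next chunk (copying data and VSS out of the shadow buffer for persists)
 * and submit it.
 */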
static void
io_submit(struct ftl_md *md)
{
	if (!md->io.remaining || md->io.status) {
		io_done(md);
		return;
	}

	if (md->io.op == FTL_MD_OP_PERSIST) {
		uint64_t blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));

		memcpy(md->io.data, md->data + md->io.data_offset, FTL_BLOCK_SIZE * blocks);

		if (md->vss_data) {
			uint64_t vss_offset = md->io.data_offset / FTL_BLOCK_SIZE;
			vss_offset *= FTL_MD_VSS_SZ;
			assert(md->io.md);
			memcpy(md->io.md, md->vss_data + vss_offset, FTL_MD_VSS_SZ * blocks);
			audit_md_vss_version(md, blocks);
		}
	}
#if defined(DEBUG)
	if (md->io.md && md->io.op == FTL_MD_OP_CLEAR) {
		uint64_t blocks = spdk_min(md->io.remaining, ftl_md_xfer_blocks(md->dev));

		audit_md_vss_version(md, blocks);
	}
#endif

	read_write_blocks(md);
}

static int
io_can_start(struct ftl_md *md)
{
	assert(NULL == md->io.data);
	if (NULL != md->io.data) {
		/* IO already in progress on this metadata object */
		return -EINVAL;
	}

	if (!md->region) {
		/* No device region to process data */
		return -EINVAL;
	}

	if (md->region->current.blocks > md->data_blocks) {
		/* The device region is bigger than the metadata buffer */
		FTL_ERRLOG(md->dev, "Block count mismatch between metadata object and "
			   "device region\n");
		return -EINVAL;
	}

	return 0;
}

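/*
 * Allocate the DMA-able transfer buffers (data, plus VSS metadata if the
 * region carries VSS) and initialize the IO cursor to cover the whole region.
 */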
static int
io_prepare(struct ftl_md *md, enum ftl_md_ops op)
{
	const struct ftl_layout_region *region = md->region;
	uint64_t data_size, meta_size = 0;

	/* Allocate a buffer for the IO */
	data_size = xfer_size(md);
	md->io.data = spdk_zmalloc(data_size, FTL_BLOCK_SIZE, NULL,
				   SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!md->io.data) {
		return -ENOMEM;
	}

	if (md->vss_data || md->region->vss_blksz) {
		meta_size = ftl_md_xfer_blocks(md->dev) * FTL_MD_VSS_SZ;
		md->io.md = spdk_zmalloc(meta_size, FTL_BLOCK_SIZE, NULL,
					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (!md->io.md) {
			spdk_dma_free(md->io.data);
			md->io.data = NULL;
			return -ENOMEM;
		}
	}

	md->io.address = region->current.offset;
	md->io.remaining = region->current.blocks;
	md->io.data_offset = 0;
	md->io.status = 0;
	md->io.op = op;

	return 0;
}

static int
io_init(struct ftl_md *md, enum ftl_md_ops op)
{
	if (io_can_start(md)) {
		return -EINVAL;
	}

	if (io_prepare(md, op)) {
		return -ENOMEM;
	}

	return 0;
}

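/* Translate an entry index into an absolute LBA within the region. */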
static uint64_t
persist_entry_lba(struct ftl_md *md, uint64_t start_entry)
{
	return md->region->current.offset + start_entry * md->region->entry_size;
}

static void
persist_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct ftl_md_io_entry_ctx *ctx = cb_arg;

	spdk_bdev_free_io(bdev_io);

	assert(ctx->remaining > 0);
	ctx->remaining--;

	if (!success) {
		ctx->status = -EIO;
	}

	if (!ctx->remaining) {
		ctx->cb(ctx->status, ctx->cb_arg);
	}
}

static int
ftl_md_persist_entry_write_blocks(struct ftl_md_io_entry_ctx *ctx, struct ftl_md *md,
				  spdk_bdev_io_wait_cb retry_fn)
{
	int rc;

	rc = write_blocks(md->dev, md->region->bdev_desc, md->region->ioch,
			  ctx->buffer, ctx->vss_buffer,
			  persist_entry_lba(md, ctx->start_entry), md->region->entry_size,
			  persist_entry_cb, ctx);
	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(md->region->bdev_desc);

			ctx->bdev_io_wait.bdev = bdev;
			ctx->bdev_io_wait.cb_fn = retry_fn;
			ctx->bdev_io_wait.cb_arg = ctx;
			spdk_bdev_queue_io_wait(bdev, md->region->ioch, &ctx->bdev_io_wait);
		} else {
			ftl_abort();
		}
	}

	return rc;
}

static void
ftl_md_persist_entry_mirror(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;

	ftl_md_persist_entry_write_blocks(ctx, ctx->md->mirror, ftl_md_persist_entry_mirror);
}

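/*
 * Persist an entry to the primary region and, if a mirror is configured,
 * fan the same write out to the mirror. Note that the shared context's
 * remaining counter is bumped only after a successful primary submission.
 */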
static void
ftl_md_persist_entry_primary(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;
	struct ftl_md *md = ctx->md;
	int rc;

	rc = ftl_md_persist_entry_write_blocks(ctx, md, ftl_md_persist_entry_primary);

	if (!rc && has_mirror(md)) {
		assert(md->region->entry_size == md->mirror->region->entry_size);

		/* The MD object has a mirror, so execute the persist on it too */
		ftl_md_persist_entry_mirror(ctx);
		ctx->remaining++;
	}
}

static void
_ftl_md_persist_entry(struct ftl_md_io_entry_ctx *ctx)
{
	ctx->status = 0;
	ctx->remaining = 1;

	/* First execute an IO to the primary region */
	ftl_md_persist_entry_primary(ctx);
}

void
ftl_md_persist_entry(struct ftl_md *md, uint64_t start_entry, void *buffer, void *vss_buffer,
		     ftl_md_io_entry_cb cb, void *cb_arg,
		     struct ftl_md_io_entry_ctx *ctx)
{
	if (spdk_unlikely(0 == md->region->entry_size)) {
		/* This MD has not been configured to support the persist entry call */
		ftl_abort();
	}

	/* Initialize the persist entry context */
	ctx->cb = cb;
	ctx->cb_arg = cb_arg;
	ctx->md = md;
	ctx->start_entry = start_entry;
	ctx->buffer = buffer;
	ctx->vss_buffer = vss_buffer ? : md->entry_vss_dma_buf;

	_ftl_md_persist_entry(ctx);
}

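/*
 * Entry-read completion. On failure the read is retried from the mirror
 * (if one exists) before the error is reported to the caller.
 */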
static void
read_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct ftl_md_io_entry_ctx *ctx = cb_arg;
	struct ftl_md *md = ctx->md;

	spdk_bdev_free_io(bdev_io);

	if (!success) {
		if (has_mirror(md)) {
			if (setup_mirror(md)) {
				/* An error occurred while setting up the mirror */
				ctx->status = -EIO;
				goto finish_io;
			}

			/* Retry the read from the mirror */
			ftl_md_read_entry(md->mirror, ctx->start_entry, ctx->buffer, ctx->vss_buffer,
					  ctx->cb, ctx->cb_arg,
					  ctx);
			return;
		} else {
			ctx->status = -EIO;
			goto finish_io;
		}
	}

finish_io:
	ctx->cb(ctx->status, ctx->cb_arg);
}

static void
ftl_md_read_entry_read_blocks(struct ftl_md_io_entry_ctx *ctx, struct ftl_md *md,
			      spdk_bdev_io_wait_cb retry_fn)
{
	int rc;

	rc = read_blocks(md->dev, md->region->bdev_desc, md->region->ioch,
			 ctx->buffer, ctx->vss_buffer,
			 persist_entry_lba(md, ctx->start_entry), md->region->entry_size,
			 read_entry_cb, ctx);

	if (spdk_unlikely(rc)) {
		if (rc == -ENOMEM) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(md->region->bdev_desc);

			ctx->bdev_io_wait.bdev = bdev;
			ctx->bdev_io_wait.cb_fn = retry_fn;
			ctx->bdev_io_wait.cb_arg = ctx;
			spdk_bdev_queue_io_wait(bdev, md->region->ioch, &ctx->bdev_io_wait);
		} else {
			ftl_abort();
		}
	}
}

static void
_ftl_md_read_entry(void *_ctx)
{
	struct ftl_md_io_entry_ctx *ctx = _ctx;

	ftl_md_read_entry_read_blocks(ctx, ctx->md, _ftl_md_read_entry);
}

void
ftl_md_read_entry(struct ftl_md *md, uint64_t start_entry, void *buffer, void *vss_buffer,
		  ftl_md_io_entry_cb cb, void *cb_arg,
		  struct ftl_md_io_entry_ctx *ctx)
{
	if (spdk_unlikely(0 == md->region->entry_size)) {
		/* This MD has not been configured to support the read entry call */
		ftl_abort();
	}

	ctx->cb = cb;
	ctx->cb_arg = cb_arg;
	ctx->md = md;
	ctx->start_entry = start_entry;
	ctx->buffer = buffer;
	ctx->vss_buffer = vss_buffer;
	/* Initialize the status so a successful read reports 0 in read_entry_cb() */
	ctx->status = 0;

	_ftl_md_read_entry(ctx);
}

void
ftl_md_persist_entry_retry(struct ftl_md_io_entry_ctx *ctx)
{
	_ftl_md_persist_entry(ctx);
}

static void
persist_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* We got an error, stop the persist procedure immediately */
		primary->io.status = status;
		io_done(primary);
	} else {
		/* Now continue the persist procedure on the primary MD object */
		if (0 == io_init(primary, FTL_MD_OP_PERSIST)) {
			io_submit(primary);
		} else {
			spdk_thread_send_msg(spdk_get_thread(), exception, primary);
		}
	}
}

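/*
 * Persist the whole MD buffer. With a mirror configured, the mirror is
 * written first; persist_mirror_cb() then kicks off the write to the
 * primary region, so the primary is only updated once the mirror is safe.
 */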
void
ftl_md_persist(struct ftl_md *md)
{
	if (has_mirror(md)) {
		if (setup_mirror(md)) {
			/* An error occurred while setting up the mirror */
			spdk_thread_send_msg(spdk_get_thread(), exception, md);
			return;
		}

		/* Set the callback and context in the mirror */
		md->mirror->cb = persist_mirror_cb;
		md->mirror->owner.private = md;

		/* First persist the mirror */
		ftl_md_persist(md->mirror);
		return;
	}

	if (0 == io_init(md, FTL_MD_OP_PERSIST)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}

static void
restore_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* Cannot restore the object from the mirror either, mark the error and fail */
		primary->io.status = -EIO;
		io_done(primary);
	} else {
		/*
		 * The restore from the mirror succeeded, so write the recovered content
		 * back to the primary. Because the MD content was just read from the
		 * mirror, only the primary requires persisting; the mirror is disabled
		 * for the duration of that persist.
		 */
		primary->io.status = 0;
		primary->mirror_enabled = false;
		io_cleanup(primary);
		ftl_md_persist(primary);
		primary->mirror_enabled = true;
	}
}

static void
restore_sync_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
{
	struct ftl_md *primary = md->owner.private;

	if (status) {
		/* Cannot sync the object from the primary to the mirror, mark the error and fail */
		primary->io.status = -EIO;
		io_done(primary);
	} else {
		primary->cb(dev, primary, primary->io.status);
		io_cleanup(primary);
	}
}

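/*
 * Decide what to do once a restore IO chain finishes. Returns -EAGAIN while
 * a follow-up IO (a mirror restore, or a primary-to-mirror resync after a
 * dirty shutdown) is still in flight, so io_done() defers the user callback.
 */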
static int
restore_done(struct ftl_md *md)
{
	if (-EAGAIN == md->io.status) {
		/* Failed to read the MD from the primary region, try the mirror.
		 * At the moment the mirror is read in its entirety; (TODO) in the
		 * future we could restore from the primary and mirror regions
		 * with finer granularity.
		 */

		if (has_mirror(md)) {
			if (setup_mirror(md)) {
				/* An error occurred while setting up the mirror */
				return -EIO;
			}

			/* Set the callback and context in the mirror */
			md->mirror->cb = restore_mirror_cb;
			md->mirror->owner.private = md;

			/* Restore the MD from the mirror */
			ftl_md_restore(md->mirror);
			return -EAGAIN;
		} else {
			return -EIO;
		}
	} else if (0 == md->io.status && false == md->dev->sb->clean) {
		if (has_mirror(md)) {
			/* There was a dirty shutdown, synchronize the primary to the mirror */

			/* Set the callback and context in the mirror */
			md->mirror->cb = restore_sync_cb;
			md->mirror->owner.private = md;

			/* Persist the just-restored primary content to the mirror */
			ftl_md_persist(md->mirror);
			return -EAGAIN;
		}
	}

	return md->io.status;
}

static void
io_done(struct ftl_md *md)
{
	int status;

	if (md->io.op == FTL_MD_OP_RESTORE) {
		status = restore_done(md);
	} else {
		status = md->io.status;
	}

	if (status != -EAGAIN) {
		md->cb(md->dev, md, status);
		io_cleanup(md);
	}
}

void
ftl_md_restore(struct ftl_md *md)
{
	if (0 == io_init(md, FTL_MD_OP_RESTORE)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}

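/*
 * Fill the transfer buffers for a clear operation: the data buffer with the
 * byte pattern, and the VSS buffer with either the caller's pattern or a
 * default VSS stamped with the region's current version.
 */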
static int
pattern_prepare(struct ftl_md *md,
		int data_pattern, union ftl_md_vss *vss_pattern)
{
	void *data = md->io.data;
	uint64_t data_size = xfer_size(md);

	memset(data, data_pattern, data_size);

	if (md->io.md) {
		if (vss_pattern) {
			/* store the VSS pattern... */
			ftl_md_vss_buf_init(md->io.md, ftl_md_xfer_blocks(md->dev), vss_pattern);
		} else {
			/* ...or default init VSS to 0 */
			union ftl_md_vss vss = {0};

			vss.version.md_version = md->region->current.version;
			ftl_md_vss_buf_init(md->io.md, ftl_md_xfer_blocks(md->dev), &vss);
		}
	}

	return 0;
}

static void
clear_mirror_cb(struct spdk_ftl_dev *dev, struct ftl_md *secondary, int status)
{
	struct ftl_md *primary = secondary->owner.private;

	if (status) {
		/* We got an error, stop the clear procedure immediately */
		primary->io.status = status;
		io_done(primary);
	} else {
		/* Now continue the clear procedure on the primary MD object */
		if (0 == io_init(primary, FTL_MD_OP_CLEAR) &&
		    0 == pattern_prepare(primary, *(int *)secondary->io.data,
					 secondary->io.md)) {
			io_submit(primary);
		} else {
			spdk_thread_send_msg(spdk_get_thread(), exception, primary);
		}
	}
}

void
ftl_md_clear(struct ftl_md *md, int data_pattern, union ftl_md_vss *vss_pattern)
{
	if (has_mirror(md)) {
		if (setup_mirror(md)) {
			/* An error occurred while setting up the mirror */
			spdk_thread_send_msg(spdk_get_thread(), exception, md);
			return;
		}

		/* Set the callback and context in the mirror */
		md->mirror->cb = clear_mirror_cb;
		md->mirror->owner.private = md;

		/* First clear the mirror */
		ftl_md_clear(md->mirror, data_pattern, vss_pattern);
		return;
	}

	if (0 == io_init(md, FTL_MD_OP_CLEAR) && 0 == pattern_prepare(md, data_pattern, vss_pattern)) {
		io_submit(md);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), exception, md);
	}
}

const struct ftl_layout_region *
ftl_md_get_region(struct ftl_md *md)
{
	return md->region;
}

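/*
 * Bind the MD object to a layout region and stamp all VSS buffers with the
 * region's current version. Re-runs mirror setup so the secondary object
 * tracks the new region.
 */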
int
ftl_md_set_region(struct ftl_md *md,
		  const struct ftl_layout_region *region)
{
	assert(region->current.blocks <= md->data_blocks);
	md->region = region;

	if (md->vss_data) {
		union ftl_md_vss vss = {0};

		vss.version.md_version = region->current.version;
		ftl_md_vss_buf_init(md->vss_data, md->data_blocks, &vss);
		if (region->entry_size) {
			assert(md->entry_vss_dma_buf);
			ftl_md_vss_buf_init(md->entry_vss_dma_buf, region->entry_size, &vss);
		}
	}

	if (has_mirror(md)) {
		return setup_mirror(md);
	}

	return 0;
}
891