1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   Copyright 2023 Solidigm All Rights Reserved
4  *   All rights reserved.
5  */
6 
7 #include "spdk/bdev.h"
8 #include "spdk/bdev_module.h"
9 #include "spdk/ftl.h"
10 #include "spdk/string.h"
11 
12 #include "ftl_nv_cache.h"
13 #include "ftl_nv_cache_io.h"
14 #include "ftl_core.h"
15 #include "ftl_band.h"
16 #include "utils/ftl_addr_utils.h"
17 #include "utils/ftl_defs.h"
18 #include "mngt/ftl_mngt.h"
19 
20 static inline uint64_t nvc_data_blocks(struct ftl_nv_cache *nv_cache) __attribute__((unused));
21 static struct ftl_nv_cache_compactor *compactor_alloc(struct spdk_ftl_dev *dev);
22 static void compactor_free(struct spdk_ftl_dev *dev, struct ftl_nv_cache_compactor *compactor);
23 static void compaction_process_ftl_done(struct ftl_rq *rq);
24 static void compaction_process_read_entry(void *arg);
25 static void ftl_property_dump_cache_dev(struct spdk_ftl_dev *dev,
26 					const struct ftl_property *property,
27 					struct spdk_json_write_ctx *w);
28 
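/*
 * Descriptive note: sanity-check that a chunk metadata entry lies entirely
 * within the NV cache metadata buffer; an out-of-bounds pointer indicates
 * corruption and aborts.
 */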
29 static inline void
30 nvc_validate_md(struct ftl_nv_cache *nv_cache,
31 		struct ftl_nv_cache_chunk_md *chunk_md)
32 {
33 	struct ftl_md *md = nv_cache->md;
34 	void *buffer = ftl_md_get_buffer(md);
35 	uint64_t size = ftl_md_get_buffer_size(md);
36 	void *ptr = chunk_md;
37 
38 	if (ptr < buffer) {
39 		ftl_abort();
40 	}
41 
42 	ptr += sizeof(*chunk_md);
43 	if (ptr > buffer + size) {
44 		ftl_abort();
45 	}
46 }
47 
48 static inline uint64_t
49 nvc_data_offset(struct ftl_nv_cache *nv_cache)
50 {
51 	return 0;
52 }
53 
54 static inline uint64_t
55 nvc_data_blocks(struct ftl_nv_cache *nv_cache)
56 {
57 	return nv_cache->chunk_blocks * nv_cache->chunk_count;
58 }
59 
60 size_t
61 ftl_nv_cache_chunk_tail_md_num_blocks(const struct ftl_nv_cache *nv_cache)
62 {
63 	struct spdk_ftl_dev *dev =  SPDK_CONTAINEROF(nv_cache,
64 				    struct spdk_ftl_dev, nv_cache);
65 	return spdk_divide_round_up(dev->layout.nvc.chunk_data_blocks * dev->layout.l2p.addr_size,
66 				    FTL_BLOCK_SIZE);
67 }
68 
69 static size_t
70 nv_cache_p2l_map_pool_elem_size(const struct ftl_nv_cache *nv_cache)
71 {
72 	/* Map pool element holds the whole tail md */
73 	return nv_cache->tail_md_chunk_blocks * FTL_BLOCK_SIZE;
74 }
75 
76 static uint64_t
77 get_chunk_idx(struct ftl_nv_cache_chunk *chunk)
78 {
79 	struct ftl_nv_cache_chunk *first_chunk = chunk->nv_cache->chunks;
80 
81 	return (chunk->offset - first_chunk->offset) / chunk->nv_cache->chunk_blocks;
82 }
83 
84 static void
85 ftl_nv_cache_init_update_limits(struct spdk_ftl_dev *dev)
86 {
87 	struct ftl_nv_cache *nvc = &dev->nv_cache;
88 	uint64_t usable_chunks = nvc->chunk_count - nvc->chunk_inactive_count;
89 
90 	/* Start compaction when the number of full chunks exceeds the given % of all active chunks */
91 	nvc->chunk_compaction_threshold = usable_chunks *
92 					  dev->conf.nv_cache.chunk_compaction_threshold /
93 					  100;
94 
95 	nvc->throttle.interval_tsc = FTL_NV_CACHE_THROTTLE_INTERVAL_MS *
96 				     (spdk_get_ticks_hz() / 1000);
97 
98 	nvc->chunk_free_target = spdk_divide_round_up(usable_chunks *
99 				 dev->conf.nv_cache.chunk_free_target,
100 				 100);
101 }
102 
103 struct nvc_scrub_ctx {
104 	uint64_t chunk_no;
105 	nvc_scrub_cb cb;
106 	void *cb_ctx;
107 
108 	struct ftl_layout_region reg_chunk;
109 	struct ftl_md *md_chunk;
110 };
111 
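/*
 * Descriptive note: walk the chunks starting at scrub_ctx->chunk_no and stop
 * at the first active one, sliding the dummy region to that chunk's data
 * offset. Returns -ENOENT when no active chunk remains to be scrubbed.
 */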
112 static int
113 nvc_scrub_find_next_chunk(struct spdk_ftl_dev *dev, struct nvc_scrub_ctx *scrub_ctx)
114 {
115 	while (scrub_ctx->chunk_no < dev->layout.nvc.chunk_count) {
116 		if (dev->nv_cache.nvc_type->ops.is_chunk_active(dev, scrub_ctx->reg_chunk.current.offset)) {
117 			return 0;
118 		}
119 
120 		/* Move the dummy region along with the active chunk */
121 		scrub_ctx->reg_chunk.current.offset += dev->layout.nvc.chunk_data_blocks;
122 		scrub_ctx->chunk_no++;
123 	}
124 	return -ENOENT;
125 }
126 
127 static void
128 nvc_scrub_clear_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
129 {
130 	struct nvc_scrub_ctx *scrub_ctx = md->owner.cb_ctx;
131 	union ftl_md_vss vss;
132 
133 	/* Move to the next chunk */
134 	scrub_ctx->chunk_no++;
135 	scrub_ctx->reg_chunk.current.offset += dev->layout.nvc.chunk_data_blocks;
136 
137 	FTL_DEBUGLOG(dev, "Scrub progress: %"PRIu64"/%"PRIu64" chunks\n",
138 		     scrub_ctx->chunk_no, dev->layout.nvc.chunk_count);
139 
140 	if (status || nvc_scrub_find_next_chunk(dev, scrub_ctx)) {
141 		/* IO error or no more active chunks found. Scrubbing finished. */
142 		scrub_ctx->cb(dev, scrub_ctx->cb_ctx, status);
143 		ftl_md_destroy(scrub_ctx->md_chunk, 0);
144 		free(scrub_ctx);
145 		return;
146 	}
147 
148 	/* Scrub the next chunk */
149 	vss.version.md_version = 0;
150 	vss.nv_cache.lba = FTL_ADDR_INVALID;
151 
152 	scrub_ctx->md_chunk->cb = nvc_scrub_clear_cb;
153 	scrub_ctx->md_chunk->owner.cb_ctx = scrub_ctx;
154 
155 	ftl_md_clear(scrub_ctx->md_chunk, 0, &vss);
156 }
157 
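/*
 * Descriptive note: scrub works on one chunk at a time. A dummy, chunk-sized
 * layout region and a matching MD object are set up, then each active chunk's
 * data is cleared with VSS marking the blocks' LBA as invalid
 * (lba = FTL_ADDR_INVALID); nvc_scrub_clear_cb advances to the next active
 * chunk until all of them are done.
 */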
158 void
159 ftl_nv_cache_scrub(struct spdk_ftl_dev *dev, nvc_scrub_cb cb, void *cb_ctx)
160 {
161 	struct nvc_scrub_ctx *scrub_ctx = calloc(1, sizeof(*scrub_ctx));
162 	union ftl_md_vss vss;
163 
164 	if (!scrub_ctx) {
165 		cb(dev, cb_ctx, -ENOMEM);
166 		return;
167 	}
168 
169 	scrub_ctx->cb = cb;
170 	scrub_ctx->cb_ctx = cb_ctx;
171 
172 	/* Setup a dummy region for the first chunk */
173 	scrub_ctx->reg_chunk.name = ftl_md_region_name(FTL_LAYOUT_REGION_TYPE_DATA_NVC);
174 	scrub_ctx->reg_chunk.type = FTL_LAYOUT_REGION_TYPE_DATA_NVC;
175 	scrub_ctx->reg_chunk.mirror_type = FTL_LAYOUT_REGION_TYPE_INVALID;
176 	scrub_ctx->reg_chunk.current.version = 0;
177 	scrub_ctx->reg_chunk.current.offset = 0;
178 	scrub_ctx->reg_chunk.current.blocks = dev->layout.nvc.chunk_data_blocks;
179 	scrub_ctx->reg_chunk.entry_size = FTL_BLOCK_SIZE;
180 	scrub_ctx->reg_chunk.num_entries = dev->layout.nvc.chunk_data_blocks;
181 	scrub_ctx->reg_chunk.vss_blksz = dev->nv_cache.md_size;
182 	scrub_ctx->reg_chunk.bdev_desc = dev->nv_cache.bdev_desc;
183 	scrub_ctx->reg_chunk.ioch = dev->nv_cache.cache_ioch;
184 
185 	/* Setup an MD object for the region */
186 	scrub_ctx->md_chunk = ftl_md_create(dev, scrub_ctx->reg_chunk.current.blocks,
187 					    scrub_ctx->reg_chunk.vss_blksz, scrub_ctx->reg_chunk.name, FTL_MD_CREATE_NO_MEM,
188 					    &scrub_ctx->reg_chunk);
189 
190 	if (!scrub_ctx->md_chunk) {
191 		free(scrub_ctx);
192 		cb(dev, cb_ctx, -ENOMEM);
193 		return;
194 	}
195 
196 	if (nvc_scrub_find_next_chunk(dev, scrub_ctx)) {
197 		/* No active chunks found */
198 		ftl_md_destroy(scrub_ctx->md_chunk, 0);
199 		free(scrub_ctx);
200 		cb(dev, cb_ctx, -ENOENT);
201 		return;
202 	}
203 
204 	/* Scrub the first chunk */
205 	vss.version.md_version = 0;
206 	vss.nv_cache.lba = FTL_ADDR_INVALID;
207 
208 	scrub_ctx->md_chunk->cb = nvc_scrub_clear_cb;
209 	scrub_ctx->md_chunk->owner.cb_ctx = scrub_ctx;
210 
211 	ftl_md_clear(scrub_ctx->md_chunk, 0, &vss);
212 	return;
213 }
214 
215 int
216 ftl_nv_cache_init(struct spdk_ftl_dev *dev)
217 {
218 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
219 	struct ftl_nv_cache_chunk *chunk;
220 	struct ftl_nv_cache_chunk_md *md;
221 	struct ftl_nv_cache_compactor *compactor;
222 	uint64_t i, offset;
223 
224 	nv_cache->halt = true;
225 
226 	nv_cache->md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
227 	if (!nv_cache->md) {
228 		FTL_ERRLOG(dev, "No NV cache metadata object\n");
229 		return -1;
230 	}
231 
232 	nv_cache->md_pool = ftl_mempool_create(dev->conf.user_io_pool_size,
233 					       nv_cache->md_size * dev->xfer_size,
234 					       FTL_BLOCK_SIZE, SPDK_ENV_NUMA_ID_ANY);
235 	if (!nv_cache->md_pool) {
236 		FTL_ERRLOG(dev, "Failed to initialize NV cache metadata pool\n");
237 		return -1;
238 	}
239 
240 	/*
241 	 * Initialize chunk info
242 	 */
243 	nv_cache->chunk_blocks = dev->layout.nvc.chunk_data_blocks;
244 	nv_cache->chunk_count = dev->layout.nvc.chunk_count;
245 	nv_cache->tail_md_chunk_blocks = ftl_nv_cache_chunk_tail_md_num_blocks(nv_cache);
246 
247 	/* Allocate chunks */
248 	nv_cache->chunks = calloc(nv_cache->chunk_count,
249 				  sizeof(nv_cache->chunks[0]));
250 	if (!nv_cache->chunks) {
251 		FTL_ERRLOG(dev, "Failed to initialize NV cache chunks\n");
252 		return -1;
253 	}
254 
255 	TAILQ_INIT(&nv_cache->chunk_free_list);
256 	TAILQ_INIT(&nv_cache->chunk_open_list);
257 	TAILQ_INIT(&nv_cache->chunk_full_list);
258 	TAILQ_INIT(&nv_cache->chunk_comp_list);
259 	TAILQ_INIT(&nv_cache->chunk_inactive_list);
260 	TAILQ_INIT(&nv_cache->needs_free_persist_list);
261 
262 	/* First chunk metadata */
263 	md = ftl_md_get_buffer(nv_cache->md);
264 	if (!md) {
265 		FTL_ERRLOG(dev, "No NV cache metadata\n");
266 		return -1;
267 	}
268 
269 	chunk = nv_cache->chunks;
270 	offset = nvc_data_offset(nv_cache);
271 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++, md++) {
272 		chunk->nv_cache = nv_cache;
273 		chunk->md = md;
274 		chunk->md->version = FTL_NVC_VERSION_CURRENT;
275 		nvc_validate_md(nv_cache, md);
276 		chunk->offset = offset;
277 		offset += nv_cache->chunk_blocks;
278 
279 		if (nv_cache->nvc_type->ops.is_chunk_active(dev, chunk->offset)) {
280 			nv_cache->chunk_free_count++;
281 			TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
282 		} else {
283 			chunk->md->state = FTL_CHUNK_STATE_INACTIVE;
284 			nv_cache->chunk_inactive_count++;
285 			TAILQ_INSERT_TAIL(&nv_cache->chunk_inactive_list, chunk, entry);
286 		}
287 	}
288 	assert(nv_cache->chunk_free_count + nv_cache->chunk_inactive_count == nv_cache->chunk_count);
289 	assert(offset <= nvc_data_offset(nv_cache) + nvc_data_blocks(nv_cache));
290 
291 	TAILQ_INIT(&nv_cache->compactor_list);
292 	for (i = 0; i < FTL_NV_CACHE_NUM_COMPACTORS; i++) {
293 		compactor = compactor_alloc(dev);
294 
295 		if (!compactor) {
296 			FTL_ERRLOG(dev, "Cannot allocate compaction process\n");
297 			return -1;
298 		}
299 
300 		TAILQ_INSERT_TAIL(&nv_cache->compactor_list, compactor, entry);
301 	}
302 
303 #define FTL_MAX_OPEN_CHUNKS 2
304 #define FTL_MAX_COMPACTED_CHUNKS 2
305 	nv_cache->p2l_pool = ftl_mempool_create(FTL_MAX_OPEN_CHUNKS + FTL_MAX_COMPACTED_CHUNKS,
306 						nv_cache_p2l_map_pool_elem_size(nv_cache),
307 						FTL_BLOCK_SIZE,
308 						SPDK_ENV_NUMA_ID_ANY);
309 	if (!nv_cache->p2l_pool) {
310 		return -ENOMEM;
311 	}
312 
313 	/* One entry per open or compacted chunk */
314 	nv_cache->chunk_md_pool = ftl_mempool_create(FTL_MAX_OPEN_CHUNKS + FTL_MAX_COMPACTED_CHUNKS,
315 				  sizeof(struct ftl_nv_cache_chunk_md),
316 				  FTL_BLOCK_SIZE,
317 				  SPDK_ENV_NUMA_ID_ANY);
318 	if (!nv_cache->chunk_md_pool) {
319 		return -ENOMEM;
320 	}
321 
322 	/* Each compactor can be reading from a different chunk, which it needs to switch to the free state at the end,
323 	 * plus one backup entry each for processing high-invalidity chunks (if there's a backlog of chunks with
324 	 * extremely small, even 0, validity, they can be processed by the compactors quickly and trigger a lot of
325 	 * transitions to the free state at once) */
326 	nv_cache->free_chunk_md_pool = ftl_mempool_create(2 * FTL_NV_CACHE_NUM_COMPACTORS,
327 				       sizeof(struct ftl_nv_cache_chunk_md),
328 				       FTL_BLOCK_SIZE,
329 				       SPDK_ENV_NUMA_ID_ANY);
330 	if (!nv_cache->free_chunk_md_pool) {
331 		return -ENOMEM;
332 	}
333 
334 	ftl_nv_cache_init_update_limits(dev);
335 	ftl_property_register(dev, "cache_device", NULL, 0, NULL, NULL, ftl_property_dump_cache_dev, NULL,
336 			      NULL, true);
337 
338 	nv_cache->throttle.interval_tsc = FTL_NV_CACHE_THROTTLE_INTERVAL_MS *
339 					  (spdk_get_ticks_hz() / 1000);
340 	nv_cache->chunk_free_target = spdk_divide_round_up(nv_cache->chunk_count *
341 				      dev->conf.nv_cache.chunk_free_target,
342 				      100);
343 
344 	if (nv_cache->nvc_type->ops.init) {
345 		return nv_cache->nvc_type->ops.init(dev);
346 	} else {
347 		return 0;
348 	}
349 }
350 
351 void
352 ftl_nv_cache_deinit(struct spdk_ftl_dev *dev)
353 {
354 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
355 	struct ftl_nv_cache_compactor *compactor;
356 
357 	if (nv_cache->nvc_type->ops.deinit) {
358 		nv_cache->nvc_type->ops.deinit(dev);
359 	}
360 
361 	while (!TAILQ_EMPTY(&nv_cache->compactor_list)) {
362 		compactor = TAILQ_FIRST(&nv_cache->compactor_list);
363 		TAILQ_REMOVE(&nv_cache->compactor_list, compactor, entry);
364 
365 		compactor_free(dev, compactor);
366 	}
367 
368 	ftl_mempool_destroy(nv_cache->md_pool);
369 	ftl_mempool_destroy(nv_cache->p2l_pool);
370 	ftl_mempool_destroy(nv_cache->chunk_md_pool);
371 	ftl_mempool_destroy(nv_cache->free_chunk_md_pool);
372 	nv_cache->md_pool = NULL;
373 	nv_cache->p2l_pool = NULL;
374 	nv_cache->chunk_md_pool = NULL;
375 	nv_cache->free_chunk_md_pool = NULL;
376 
377 	free(nv_cache->chunks);
378 	nv_cache->chunks = NULL;
379 }
380 
381 static uint64_t
382 chunk_get_free_space(struct ftl_nv_cache *nv_cache,
383 		     struct ftl_nv_cache_chunk *chunk)
384 {
385 	assert(chunk->md->write_pointer + nv_cache->tail_md_chunk_blocks <=
386 	       nv_cache->chunk_blocks);
387 	return nv_cache->chunk_blocks - chunk->md->write_pointer -
388 	       nv_cache->tail_md_chunk_blocks;
389 }
390 
391 static bool
392 chunk_is_closed(struct ftl_nv_cache_chunk *chunk)
393 {
394 	return chunk->md->write_pointer == chunk->nv_cache->chunk_blocks;
395 }
396 
397 static void ftl_chunk_close(struct ftl_nv_cache_chunk *chunk);
398 
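/*
 * Descriptive note: reserve up to io->num_blocks of write buffer space in the
 * currently open chunk. If the chunk cannot fit the whole request, the
 * remaining blocks are accounted as skipped, the chunk is closed once it
 * reaches its tail metadata offset, and the next open chunk is tried.
 * Returns FTL_LBA_INVALID when no open chunk is available.
 */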
399 static uint64_t
400 ftl_nv_cache_get_wr_buffer(struct ftl_nv_cache *nv_cache, struct ftl_io *io)
401 {
402 	uint64_t address = FTL_LBA_INVALID;
403 	uint64_t num_blocks = io->num_blocks;
404 	uint64_t free_space;
405 	struct ftl_nv_cache_chunk *chunk;
406 
407 	do {
408 		chunk = nv_cache->chunk_current;
409 		/* Chunk has been closed, so pick a new one */
410 		if (chunk && chunk_is_closed(chunk))  {
411 			chunk = NULL;
412 		}
413 
414 		if (!chunk) {
415 			chunk = TAILQ_FIRST(&nv_cache->chunk_open_list);
416 			if (chunk && chunk->md->state == FTL_CHUNK_STATE_OPEN) {
417 				TAILQ_REMOVE(&nv_cache->chunk_open_list, chunk, entry);
418 				nv_cache->chunk_current = chunk;
419 			} else {
420 				break;
421 			}
422 		}
423 
424 		free_space = chunk_get_free_space(nv_cache, chunk);
425 
426 		if (free_space >= num_blocks) {
427 			/* Enough space in chunk */
428 
429 			/* Calculate address in NV cache */
430 			address = chunk->offset + chunk->md->write_pointer;
431 
432 			/* Set chunk in IO */
433 			io->nv_cache_chunk = chunk;
434 
435 			/* Move write pointer */
436 			chunk->md->write_pointer += num_blocks;
437 
438 			if (free_space == num_blocks) {
439 				nv_cache->chunk_current = NULL;
440 			}
441 			break;
442 		}
443 
444 		/* Not enough space in nv_cache_chunk */
445 		nv_cache->chunk_current = NULL;
446 
447 		if (0 == free_space) {
448 			continue;
449 		}
450 
451 		chunk->md->blocks_skipped = free_space;
452 		chunk->md->blocks_written += free_space;
453 		chunk->md->write_pointer += free_space;
454 
455 		if (chunk->md->blocks_written == chunk_tail_md_offset(nv_cache)) {
456 			ftl_chunk_close(chunk);
457 		}
458 	} while (1);
459 
460 	return address;
461 }
462 
463 void
464 ftl_nv_cache_fill_md(struct ftl_io *io)
465 {
466 	struct ftl_nv_cache_chunk *chunk = io->nv_cache_chunk;
467 	uint64_t i;
468 	union ftl_md_vss *metadata = io->md;
469 	uint64_t lba = ftl_io_get_lba(io, 0);
470 
471 	for (i = 0; i < io->num_blocks; ++i, lba++, metadata++) {
472 		metadata->nv_cache.lba = lba;
473 		metadata->nv_cache.seq_id = chunk->md->seq_id;
474 	}
475 }
476 
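/*
 * Descriptive note: each chunk stores user data first and its tail metadata
 * (the P2L map) in the last tail_md_chunk_blocks blocks, so the tail metadata
 * starts at chunk_blocks - tail_md_chunk_blocks.
 */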
477 uint64_t
478 chunk_tail_md_offset(struct ftl_nv_cache *nv_cache)
479 {
480 	return nv_cache->chunk_blocks - nv_cache->tail_md_chunk_blocks;
481 }
482 
483 static void
484 chunk_advance_blocks(struct ftl_nv_cache *nv_cache, struct ftl_nv_cache_chunk *chunk,
485 		     uint64_t advanced_blocks)
486 {
487 	chunk->md->blocks_written += advanced_blocks;
488 
489 	assert(chunk->md->blocks_written <= nv_cache->chunk_blocks);
490 
491 	if (chunk->md->blocks_written == chunk_tail_md_offset(nv_cache)) {
492 		ftl_chunk_close(chunk);
493 	}
494 }
495 
496 static uint64_t
497 chunk_user_blocks_written(struct ftl_nv_cache_chunk *chunk)
498 {
499 	return chunk->md->blocks_written - chunk->md->blocks_skipped -
500 	       chunk->nv_cache->tail_md_chunk_blocks;
501 }
502 
503 static bool
504 is_chunk_compacted(struct ftl_nv_cache_chunk *chunk)
505 {
506 	assert(chunk->md->blocks_written != 0);
507 
508 	if (chunk_user_blocks_written(chunk) == chunk->md->blocks_compacted) {
509 		return true;
510 	}
511 
512 	return false;
513 }
514 
515 static int
516 ftl_chunk_alloc_md_entry(struct ftl_nv_cache_chunk *chunk)
517 {
518 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
519 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
520 
521 	p2l_map->chunk_dma_md = ftl_mempool_get(nv_cache->chunk_md_pool);
522 
523 	if (!p2l_map->chunk_dma_md) {
524 		return -ENOMEM;
525 	}
526 
527 	ftl_nv_cache_chunk_md_initialize(p2l_map->chunk_dma_md);
528 	return 0;
529 }
530 
531 static void
532 ftl_chunk_free_md_entry(struct ftl_nv_cache_chunk *chunk)
533 {
534 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
535 
536 	ftl_mempool_put(chunk->nv_cache->chunk_md_pool, p2l_map->chunk_dma_md);
537 	p2l_map->chunk_dma_md = NULL;
538 }
539 
540 static void chunk_free_p2l_map(struct ftl_nv_cache_chunk *chunk);
541 
542 static void
543 ftl_chunk_free(struct ftl_nv_cache_chunk *chunk)
544 {
545 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
546 
547 	/* Reset chunk */
548 	ftl_nv_cache_chunk_md_initialize(chunk->md);
549 
550 	TAILQ_INSERT_TAIL(&nv_cache->needs_free_persist_list, chunk, entry);
551 	nv_cache->chunk_free_persist_count++;
552 }
553 
554 static int
555 ftl_chunk_alloc_chunk_free_entry(struct ftl_nv_cache_chunk *chunk)
556 {
557 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
558 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
559 
560 	p2l_map->chunk_dma_md = ftl_mempool_get(nv_cache->free_chunk_md_pool);
561 	if (!p2l_map->chunk_dma_md) {
562 		return -ENOMEM;
563 	}
564 
565 	ftl_nv_cache_chunk_md_initialize(p2l_map->chunk_dma_md);
566 	return 0;
567 }
568 
569 static void
570 ftl_chunk_free_chunk_free_entry(struct ftl_nv_cache_chunk *chunk)
571 {
572 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
573 
574 	ftl_mempool_put(chunk->nv_cache->free_chunk_md_pool, p2l_map->chunk_dma_md);
575 	p2l_map->chunk_dma_md = NULL;
576 }
577 
578 static void
579 chunk_free_cb(int status, void *ctx)
580 {
581 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
582 
583 	if (spdk_likely(!status)) {
584 		struct ftl_nv_cache *nv_cache = chunk->nv_cache;
585 
586 		nv_cache->chunk_free_persist_count--;
587 		TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
588 		nv_cache->chunk_free_count++;
589 		nv_cache->chunk_full_count--;
590 		chunk->md->state = FTL_CHUNK_STATE_FREE;
591 		chunk->md->close_seq_id = 0;
592 		ftl_chunk_free_chunk_free_entry(chunk);
593 	} else {
594 #ifdef SPDK_FTL_RETRY_ON_ERROR
595 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
596 #else
597 		ftl_abort();
598 #endif
599 	}
600 }
601 
602 static void
603 ftl_chunk_persist_free_state(struct ftl_nv_cache *nv_cache)
604 {
605 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
606 	struct ftl_p2l_map *p2l_map;
607 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
608 	struct ftl_layout_region *region = ftl_layout_region_get(dev, FTL_LAYOUT_REGION_TYPE_NVC_MD);
609 	struct ftl_nv_cache_chunk *tchunk, *chunk = NULL;
610 	int rc;
611 
612 	TAILQ_FOREACH_SAFE(chunk, &nv_cache->needs_free_persist_list, entry, tchunk) {
613 		p2l_map = &chunk->p2l_map;
614 		rc = ftl_chunk_alloc_chunk_free_entry(chunk);
615 		if (rc) {
616 			break;
617 		}
618 
619 		TAILQ_REMOVE(&nv_cache->needs_free_persist_list, chunk, entry);
620 
621 		memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
622 		p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_FREE;
623 		p2l_map->chunk_dma_md->close_seq_id = 0;
624 		p2l_map->chunk_dma_md->p2l_map_checksum = 0;
625 
626 		ftl_md_persist_entries(md, get_chunk_idx(chunk), 1, p2l_map->chunk_dma_md, NULL,
627 				       chunk_free_cb, chunk, &chunk->md_persist_entry_ctx);
628 	}
629 }
630 
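/*
 * Descriptive note: track recent compaction bandwidth with a simple moving
 * average. The ring buffer in compaction_recent_bw keeps the last
 * FTL_NV_CACHE_COMPACTION_SMA_N per-chunk samples (bytes compacted per TSC
 * tick) and the running sum yields compaction_sma, which the write throttle
 * uses later on.
 */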
631 static void
632 compaction_stats_update(struct ftl_nv_cache_chunk *chunk)
633 {
634 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
635 	struct compaction_bw_stats *compaction_bw = &nv_cache->compaction_recent_bw;
636 	double *ptr;
637 
638 	if (spdk_unlikely(chunk->compaction_length_tsc == 0)) {
639 		return;
640 	}
641 
642 	if (spdk_likely(compaction_bw->count == FTL_NV_CACHE_COMPACTION_SMA_N)) {
643 		ptr = compaction_bw->buf + compaction_bw->first;
644 		compaction_bw->first++;
645 		if (compaction_bw->first == FTL_NV_CACHE_COMPACTION_SMA_N) {
646 			compaction_bw->first = 0;
647 		}
648 		compaction_bw->sum -= *ptr;
649 	} else {
650 		ptr = compaction_bw->buf + compaction_bw->count;
651 		compaction_bw->count++;
652 	}
653 
654 	*ptr = (double)chunk->md->blocks_compacted * FTL_BLOCK_SIZE / chunk->compaction_length_tsc;
655 	chunk->compaction_length_tsc = 0;
656 
657 	compaction_bw->sum += *ptr;
658 	nv_cache->compaction_sma = compaction_bw->sum / compaction_bw->count;
659 }
660 
661 static void
662 chunk_compaction_advance(struct ftl_nv_cache_chunk *chunk, uint64_t num_blocks)
663 {
664 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
665 	uint64_t tsc = spdk_thread_get_last_tsc(spdk_get_thread());
666 
667 	chunk->compaction_length_tsc += tsc - chunk->compaction_start_tsc;
668 	chunk->compaction_start_tsc = tsc;
669 
670 	chunk->md->blocks_compacted += num_blocks;
671 	assert(chunk->md->blocks_compacted <= chunk_user_blocks_written(chunk));
672 	if (!is_chunk_compacted(chunk)) {
673 		return;
674 	}
675 
676 	/* Remove chunk from compacted list */
677 	TAILQ_REMOVE(&nv_cache->chunk_comp_list, chunk, entry);
678 	nv_cache->chunk_comp_count--;
679 
680 	compaction_stats_update(chunk);
681 
682 	chunk_free_p2l_map(chunk);
683 
684 	ftl_chunk_free(chunk);
685 }
686 
687 static bool
688 is_compaction_required_for_upgrade(struct ftl_nv_cache *nv_cache)
689 {
690 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
691 
692 	if (dev->conf.prep_upgrade_on_shutdown) {
693 		if (nv_cache->chunk_full_count || nv_cache->chunk_open_count) {
694 			return true;
695 		}
696 	}
697 
698 	return false;
699 }
700 
701 static bool
702 is_compaction_required(struct ftl_nv_cache *nv_cache)
703 {
704 	if (spdk_unlikely(nv_cache->halt)) {
705 		return is_compaction_required_for_upgrade(nv_cache);
706 	}
707 
708 	if (nv_cache->chunk_full_count >= nv_cache->chunk_compaction_threshold) {
709 		return true;
710 	}
711 
712 	return false;
713 }
714 
715 static void compaction_process_finish_read(struct ftl_nv_cache_compactor *compactor);
716 static void compaction_process_pin_lba(struct ftl_nv_cache_compactor *comp);
717 
718 static void
719 _compaction_process_pin_lba(void *_comp)
720 {
721 	struct ftl_nv_cache_compactor *comp = _comp;
722 
723 	compaction_process_pin_lba(comp);
724 }
725 
726 static void
727 compaction_process_pin_lba_cb(struct spdk_ftl_dev *dev, int status, struct ftl_l2p_pin_ctx *pin_ctx)
728 {
729 	struct ftl_nv_cache_compactor *comp = pin_ctx->cb_ctx;
730 	struct ftl_rq *rq = comp->rq;
731 
732 	if (status) {
733 		rq->iter.status = status;
734 		pin_ctx->lba = FTL_LBA_INVALID;
735 	}
736 
737 	if (--rq->iter.remaining == 0) {
738 		if (rq->iter.status) {
739 			/* unpin and try again */
740 			ftl_rq_unpin(rq);
741 			spdk_thread_send_msg(spdk_get_thread(), _compaction_process_pin_lba, comp);
742 			return;
743 		}
744 
745 		compaction_process_finish_read(comp);
746 	}
747 }
748 
749 static void
750 compaction_process_pin_lba(struct ftl_nv_cache_compactor *comp)
751 {
752 	struct ftl_rq *rq = comp->rq;
753 	struct spdk_ftl_dev *dev = rq->dev;
754 	struct ftl_rq_entry *entry;
755 
756 	assert(rq->iter.count);
757 	rq->iter.remaining = rq->iter.count;
758 	rq->iter.status = 0;
759 
760 	FTL_RQ_ENTRY_LOOP(rq, entry, rq->iter.count) {
761 		struct ftl_l2p_pin_ctx *pin_ctx = &entry->l2p_pin_ctx;
762 
763 		if (entry->lba == FTL_LBA_INVALID) {
764 			ftl_l2p_pin_skip(dev, compaction_process_pin_lba_cb, comp, pin_ctx);
765 		} else {
766 			ftl_l2p_pin(dev, entry->lba, 1, compaction_process_pin_lba_cb, comp, pin_ctx);
767 		}
768 	}
769 }
770 
771 static void
772 compaction_process_read_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
773 {
774 	struct ftl_rq_entry *entry = arg;
775 	struct ftl_rq *rq = ftl_rq_from_entry(entry);
776 	struct spdk_ftl_dev *dev = rq->dev;
777 	struct ftl_nv_cache_compactor *compactor = rq->owner.priv;
778 
779 	ftl_stats_bdev_io_completed(dev, FTL_STATS_TYPE_CMP, bdev_io);
780 
781 	spdk_bdev_free_io(bdev_io);
782 
783 	if (!success) {
784 		/* retry */
785 		spdk_thread_send_msg(spdk_get_thread(), compaction_process_read_entry, entry);
786 		return;
787 	}
788 
789 	assert(rq->iter.remaining >= entry->bdev_io.num_blocks);
790 	rq->iter.remaining -= entry->bdev_io.num_blocks;
791 	if (0 == rq->iter.remaining) {
792 		/* All IOs processed, go to the next phase - pinning */
793 		compaction_process_pin_lba(compactor);
794 	}
795 }
796 
797 static void
798 compaction_process_read_entry(void *arg)
799 {
800 	struct ftl_rq_entry *entry = arg;
801 	struct ftl_rq *rq = ftl_rq_from_entry(entry);
802 	struct spdk_ftl_dev *dev = rq->dev;
803 	int rc;
804 
805 	rc = spdk_bdev_read_blocks(dev->nv_cache.bdev_desc, dev->nv_cache.cache_ioch,
806 				   entry->io_payload, entry->bdev_io.offset_blocks, entry->bdev_io.num_blocks,
807 				   compaction_process_read_entry_cb, entry);
808 
809 	if (spdk_unlikely(rc)) {
810 		if (rc == -ENOMEM) {
811 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc);
812 			entry->bdev_io.wait_entry.bdev = bdev;
813 			entry->bdev_io.wait_entry.cb_fn = compaction_process_read_entry;
814 			entry->bdev_io.wait_entry.cb_arg = entry;
815 			spdk_bdev_queue_io_wait(bdev, dev->nv_cache.cache_ioch, &entry->bdev_io.wait_entry);
816 		} else {
817 			ftl_abort();
818 		}
819 	}
820 
821 	dev->stats.io_activity_total += entry->bdev_io.num_blocks;
822 }
823 
824 static bool
825 is_chunk_to_read(struct ftl_nv_cache_chunk *chunk)
826 {
827 	assert(chunk->md->blocks_written != 0);
828 
829 	if (chunk_user_blocks_written(chunk) == chunk->md->read_pointer) {
830 		return false;
831 	}
832 
833 	return true;
834 }
835 
836 static void
837 read_chunk_p2l_map_cb(struct ftl_basic_rq *brq)
838 {
839 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
840 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
841 
842 	if (!brq->success) {
843 #ifdef SPDK_FTL_RETRY_ON_ERROR
844 		read_chunk_p2l_map(chunk);
845 #else
846 		ftl_abort();
847 #endif
848 	}
849 
850 	TAILQ_INSERT_TAIL(&nv_cache->chunk_comp_list, chunk, entry);
851 }
852 
853 static int chunk_alloc_p2l_map(struct ftl_nv_cache_chunk *chunk);
854 static int ftl_chunk_read_tail_md(struct ftl_nv_cache_chunk *chunk, struct ftl_basic_rq *brq,
855 				  void (*cb)(struct ftl_basic_rq *brq), void *cb_ctx);
856 
857 static void
858 read_chunk_p2l_map(void *arg)
859 {
860 	struct ftl_nv_cache_chunk *chunk = arg;
861 	int rc;
862 
863 	if (chunk_alloc_p2l_map(chunk)) {
864 		ftl_abort();
865 	}
866 
867 	rc = ftl_chunk_read_tail_md(chunk, &chunk->metadata_rq, read_chunk_p2l_map_cb, NULL);
868 	if (rc) {
869 		if (rc == -ENOMEM) {
870 			struct ftl_nv_cache *nv_cache = chunk->nv_cache;
871 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
872 			struct spdk_bdev_io_wait_entry *wait_entry = &chunk->metadata_rq.io.bdev_io_wait;
873 
874 			wait_entry->bdev = bdev;
875 			wait_entry->cb_fn = read_chunk_p2l_map;
876 			wait_entry->cb_arg = chunk;
877 			spdk_bdev_queue_io_wait(bdev, nv_cache->cache_ioch, wait_entry);
878 		} else {
879 			ftl_abort();
880 		}
881 	}
882 }
883 
884 static void
885 prepare_chunk_for_compaction(struct ftl_nv_cache *nv_cache)
886 {
887 	struct ftl_nv_cache_chunk *chunk = NULL;
888 
889 	if (TAILQ_EMPTY(&nv_cache->chunk_full_list)) {
890 		return;
891 	}
892 
893 	chunk = TAILQ_FIRST(&nv_cache->chunk_full_list);
894 	TAILQ_REMOVE(&nv_cache->chunk_full_list, chunk, entry);
895 	assert(chunk->md->write_pointer);
896 
897 	nv_cache->chunk_comp_count++;
898 	read_chunk_p2l_map(chunk);
899 }
900 
901 
902 static struct ftl_nv_cache_chunk *
903 get_chunk_for_compaction(struct ftl_nv_cache *nv_cache)
904 {
905 	struct ftl_nv_cache_chunk *chunk = NULL;
906 
907 	if (TAILQ_EMPTY(&nv_cache->chunk_comp_list)) {
908 		return NULL;
909 	}
910 
911 	chunk = TAILQ_FIRST(&nv_cache->chunk_comp_list);
912 	if (!is_chunk_to_read(chunk)) {
913 		return NULL;
914 	}
915 
916 	return chunk;
917 }
918 
919 static uint64_t
920 chunk_blocks_to_read(struct ftl_nv_cache_chunk *chunk)
921 {
922 	uint64_t blocks_written;
923 	uint64_t blocks_to_read;
924 
925 	assert(chunk->md->blocks_written >= chunk->md->blocks_skipped);
926 	blocks_written = chunk_user_blocks_written(chunk);
927 
928 	assert(blocks_written >= chunk->md->read_pointer);
929 	blocks_to_read = blocks_written - chunk->md->read_pointer;
930 
931 	return blocks_to_read;
932 }
933 
934 static void
935 compactor_deactivate(struct ftl_nv_cache_compactor *compactor)
936 {
937 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
938 
939 	compactor->rq->iter.count = 0;
940 	assert(nv_cache->compaction_active_count);
941 	nv_cache->compaction_active_count--;
942 	TAILQ_INSERT_TAIL(&nv_cache->compactor_list, compactor, entry);
943 }
944 
945 static void
946 compaction_process_invalidate_entry(struct ftl_rq_entry *entry)
947 {
948 	entry->addr = FTL_ADDR_INVALID;
949 	entry->lba = FTL_LBA_INVALID;
950 	entry->seq_id = 0;
951 	entry->owner.priv = NULL;
952 }
953 
954 static void
955 compaction_process_pad(struct ftl_nv_cache_compactor *compactor, uint64_t idx)
956 {
957 	struct ftl_rq *rq = compactor->rq;
958 	struct ftl_rq_entry *entry;
959 
960 	assert(idx < rq->num_blocks);
961 	FTL_RQ_ENTRY_LOOP_FROM(rq, &rq->entries[idx], entry, rq->num_blocks) {
962 		compaction_process_invalidate_entry(entry);
963 	}
964 }
965 
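/*
 * Descriptive note: issue the read phase for a compactor request. Physically
 * contiguous entries are coalesced into as few bdev reads as possible before
 * being submitted via compaction_process_read_entry().
 */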
966 static void
967 compaction_process_read(struct ftl_nv_cache_compactor *compactor)
968 {
969 	struct ftl_rq *rq = compactor->rq;
970 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
971 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
972 	struct ftl_rq_entry *entry, *io;
973 
974 	assert(rq->iter.count);
975 	rq->iter.remaining = rq->iter.count;
976 
977 	io = rq->entries;
978 	io->bdev_io.num_blocks = 1;
979 	io->bdev_io.offset_blocks = ftl_addr_to_nvc_offset(dev, io->addr);
980 	FTL_RQ_ENTRY_LOOP_FROM(rq, &rq->entries[1], entry,  rq->iter.count) {
981 		if (entry->addr == io->addr + io->bdev_io.num_blocks) {
982 			io->bdev_io.num_blocks++;
983 		} else {
984 			compaction_process_read_entry(io);
985 			io = entry;
986 			io->bdev_io.num_blocks = 1;
987 			io->bdev_io.offset_blocks = ftl_addr_to_nvc_offset(dev, io->addr);
988 		}
989 	}
990 	compaction_process_read_entry(io);
991 }
992 
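/*
 * Descriptive note: find the next valid block to compact in the chunk,
 * starting at its read pointer. Invalid blocks in front of it (according to
 * the valid map) are skipped and immediately accounted as compacted; a fully
 * invalid remainder returns FTL_ADDR_INVALID.
 */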
993 static ftl_addr
994 compaction_chunk_read_pos(struct spdk_ftl_dev *dev, struct ftl_nv_cache_chunk *chunk)
995 {
996 	ftl_addr start, pos;
997 	uint64_t skip, to_read = chunk_blocks_to_read(chunk);
998 
999 	if (0 == to_read) {
1000 		return FTL_ADDR_INVALID;
1001 	}
1002 
1003 	start = ftl_addr_from_nvc_offset(dev, chunk->offset + chunk->md->read_pointer);
1004 	pos = ftl_bitmap_find_first_set(dev->valid_map, start, start + to_read - 1);
1005 
1006 	if (pos == UINT64_MAX) {
1007 		chunk->md->read_pointer += to_read;
1008 		chunk_compaction_advance(chunk, to_read);
1009 		return FTL_ADDR_INVALID;
1010 	}
1011 
1012 	assert(pos >= start);
1013 	skip = pos - start;
1014 	if (skip) {
1015 		chunk->md->read_pointer += skip;
1016 		chunk_compaction_advance(chunk, skip);
1017 	}
1018 
1019 	return pos;
1020 }
1021 
1022 static bool
1023 compaction_entry_read_pos(struct ftl_nv_cache *nv_cache, struct ftl_rq_entry *entry)
1024 {
1025 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1026 	struct ftl_nv_cache_chunk *chunk = NULL;
1027 	ftl_addr addr = FTL_ADDR_INVALID;
1028 
1029 	while (!chunk) {
1030 		/* Get currently handled chunk */
1031 		chunk = get_chunk_for_compaction(nv_cache);
1032 		if (!chunk) {
1033 			return false;
1034 		}
1035 		chunk->compaction_start_tsc = spdk_thread_get_last_tsc(spdk_get_thread());
1036 
1037 		/* Get next read position in chunk */
1038 		addr = compaction_chunk_read_pos(dev, chunk);
1039 		if (FTL_ADDR_INVALID == addr) {
1040 			chunk = NULL;
1041 		}
1042 	}
1043 
1044 	assert(FTL_ADDR_INVALID != addr);
1045 
1046 	/* Set entry address info and chunk */
1047 	entry->addr = addr;
1048 	entry->owner.priv = chunk;
1049 	entry->lba = ftl_chunk_map_get_lba(chunk, chunk->md->read_pointer);
1050 
1051 	/* Move read pointer in the chunk */
1052 	chunk->md->read_pointer++;
1053 
1054 	return true;
1055 }
1056 
1057 static void
1058 compaction_process_start(struct ftl_nv_cache_compactor *compactor)
1059 {
1060 	struct ftl_rq *rq = compactor->rq;
1061 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
1062 	struct ftl_rq_entry *entry;
1063 
1064 	assert(0 == compactor->rq->iter.count);
1065 	FTL_RQ_ENTRY_LOOP(rq, entry, rq->num_blocks) {
1066 		if (!compaction_entry_read_pos(nv_cache, entry)) {
1067 			compaction_process_pad(compactor, entry->index);
1068 			break;
1069 		}
1070 		rq->iter.count++;
1071 	}
1072 
1073 	if (rq->iter.count) {
1074 		/* Schedule Read IOs */
1075 		compaction_process_read(compactor);
1076 	} else {
1077 		compactor_deactivate(compactor);
1078 	}
1079 }
1080 
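/*
 * Descriptive note: top-level compaction step, called from the NV cache
 * poller. Keep up to FTL_MAX_COMPACTED_CHUNKS chunks prepared (P2L map read
 * in), then hand one request's worth of work to an idle compactor, if any is
 * available.
 */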
1081 static void
1082 compaction_process(struct ftl_nv_cache *nv_cache)
1083 {
1084 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1085 	struct ftl_nv_cache_compactor *compactor;
1086 
1087 	if (!is_compaction_required(nv_cache)) {
1088 		return;
1089 	}
1090 
1091 	if (nv_cache->chunk_comp_count < FTL_MAX_COMPACTED_CHUNKS) {
1092 		prepare_chunk_for_compaction(nv_cache);
1093 	}
1094 
1095 	if (TAILQ_EMPTY(&nv_cache->chunk_comp_list)) {
1096 		return;
1097 	}
1098 
1099 	compactor = TAILQ_FIRST(&nv_cache->compactor_list);
1100 	if (!compactor) {
1101 		return;
1102 	}
1103 
1104 	TAILQ_REMOVE(&nv_cache->compactor_list, compactor, entry);
1105 	compactor->nv_cache->compaction_active_count++;
1106 	compaction_process_start(compactor);
1107 	ftl_add_io_activity(dev);
1108 }
1109 
1110 static void
1111 compaction_process_ftl_done(struct ftl_rq *rq)
1112 {
1113 	struct spdk_ftl_dev *dev = rq->dev;
1114 	struct ftl_nv_cache_compactor *compactor = rq->owner.priv;
1115 	struct ftl_band *band = rq->io.band;
1116 	struct ftl_rq_entry *entry;
1117 	ftl_addr addr;
1118 
1119 	if (spdk_unlikely(false == rq->success)) {
1120 		/* IO error, retry writing */
1121 #ifdef SPDK_FTL_RETRY_ON_ERROR
1122 		ftl_writer_queue_rq(&dev->writer_user, rq);
1123 		return;
1124 #else
1125 		ftl_abort();
1126 #endif
1127 	}
1128 
1129 	assert(rq->iter.count);
1130 
1131 	/* Update L2P table */
1132 	addr = rq->io.addr;
1133 	FTL_RQ_ENTRY_LOOP(rq, entry, rq->iter.count) {
1134 		struct ftl_nv_cache_chunk *chunk = entry->owner.priv;
1135 
1136 		if (entry->lba != FTL_LBA_INVALID) {
1137 			ftl_l2p_update_base(dev, entry->lba, addr, entry->addr);
1138 			ftl_l2p_unpin(dev, entry->lba, 1);
1139 			chunk_compaction_advance(chunk, 1);
1140 		} else {
1141 			assert(entry->addr == FTL_ADDR_INVALID);
1142 		}
1143 
1144 		addr = ftl_band_next_addr(band, addr, 1);
1145 		compaction_process_invalidate_entry(entry);
1146 	}
1147 
1148 	compactor_deactivate(compactor);
1149 }
1150 
1151 static void
1152 compaction_process_finish_read(struct ftl_nv_cache_compactor *compactor)
1153 {
1154 	struct ftl_rq *rq = compactor->rq;
1155 	struct spdk_ftl_dev *dev = rq->dev;
1156 	struct ftl_rq_entry *entry;
1157 	ftl_addr current_addr;
1158 	uint64_t skip = 0;
1159 
1160 	FTL_RQ_ENTRY_LOOP(rq, entry, rq->iter.count) {
1161 		struct ftl_nv_cache_chunk *chunk = entry->owner.priv;
1162 		uint64_t lba = entry->lba;
1163 
1164 		if (lba == FTL_LBA_INVALID) {
1165 			skip++;
1166 			compaction_process_invalidate_entry(entry);
1167 			chunk_compaction_advance(chunk, 1);
1168 			continue;
1169 		}
1170 
1171 		current_addr = ftl_l2p_get(dev, lba);
1172 		if (current_addr == entry->addr) {
1173 			entry->seq_id = chunk->md->seq_id;
1174 		} else {
1175 			/* This address has already been invalidated, just omit this block */
1176 			skip++;
1177 			ftl_l2p_unpin(dev, lba, 1);
1178 			compaction_process_invalidate_entry(entry);
1179 			chunk_compaction_advance(chunk, 1);
1180 		}
1181 	}
1182 
1183 	if (skip < rq->iter.count) {
1184 		/*
1185 		 * Request contains data to be placed on FTL, compact it
1186 		 */
1187 		ftl_writer_queue_rq(&dev->writer_user, rq);
1188 	} else {
1189 		compactor_deactivate(compactor);
1190 	}
1191 }
1192 
1193 static void
1194 compactor_free(struct spdk_ftl_dev *dev, struct ftl_nv_cache_compactor *compactor)
1195 {
1196 	if (!compactor) {
1197 		return;
1198 	}
1199 
1200 	ftl_rq_del(compactor->rq);
1201 	free(compactor);
1202 }
1203 
1204 static struct ftl_nv_cache_compactor *
1205 compactor_alloc(struct spdk_ftl_dev *dev)
1206 {
1207 	struct ftl_nv_cache_compactor *compactor;
1208 	struct ftl_rq_entry *entry;
1209 
1210 	compactor = calloc(1, sizeof(*compactor));
1211 	if (!compactor) {
1212 		goto error;
1213 	}
1214 
1215 	/* Allocate a helper request for reading */
1216 	compactor->rq = ftl_rq_new(dev, dev->nv_cache.md_size);
1217 	if (!compactor->rq) {
1218 		goto error;
1219 	}
1220 
1221 	compactor->nv_cache = &dev->nv_cache;
1222 	compactor->rq->owner.priv = compactor;
1223 	compactor->rq->owner.cb = compaction_process_ftl_done;
1224 	compactor->rq->owner.compaction = true;
1225 
1226 	FTL_RQ_ENTRY_LOOP(compactor->rq, entry, compactor->rq->num_blocks) {
1227 		compaction_process_invalidate_entry(entry);
1228 	}
1229 
1230 	return compactor;
1231 
1232 error:
1233 	compactor_free(dev, compactor);
1234 	return NULL;
1235 }
1236 
1237 static void
1238 ftl_nv_cache_submit_cb_done(struct ftl_io *io)
1239 {
1240 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1241 
1242 	chunk_advance_blocks(nv_cache, io->nv_cache_chunk, io->num_blocks);
1243 	io->nv_cache_chunk = NULL;
1244 
1245 	ftl_io_complete(io);
1246 }
1247 
1248 static void
1249 ftl_nv_cache_l2p_update(struct ftl_io *io)
1250 {
1251 	struct spdk_ftl_dev *dev = io->dev;
1252 	ftl_addr next_addr = io->addr;
1253 	size_t i;
1254 
1255 	for (i = 0; i < io->num_blocks; ++i, ++next_addr) {
1256 		ftl_l2p_update_cache(dev, ftl_io_get_lba(io, i), next_addr, io->map[i]);
1257 	}
1258 
1259 	ftl_l2p_unpin(dev, io->lba, io->num_blocks);
1260 	ftl_nv_cache_submit_cb_done(io);
1261 }
1262 
1263 static void
1264 ftl_nv_cache_pin_cb(struct spdk_ftl_dev *dev, int status, struct ftl_l2p_pin_ctx *pin_ctx)
1265 {
1266 	struct ftl_io *io = pin_ctx->cb_ctx;
1267 	size_t i;
1268 
1269 	if (spdk_unlikely(status != 0)) {
1270 		/* Retry on an internal L2P fault */
1271 		FTL_ERRLOG(dev, "Cannot pin LBA for NV cache write at %"PRIx64"\n",
1272 			   io->addr);
1273 		io->status = -EAGAIN;
1274 		ftl_nv_cache_submit_cb_done(io);
1275 		return;
1276 	}
1277 
1278 	/* Remember previous l2p mapping to resolve conflicts in case of outstanding write-after-write */
1279 	for (i = 0; i < io->num_blocks; ++i) {
1280 		io->map[i] = ftl_l2p_get(dev, ftl_io_get_lba(io, i));
1281 	}
1282 
1283 	assert(io->iov_pos == 0);
1284 
1285 	ftl_trace_submission(io->dev, io, io->addr, io->num_blocks);
1286 
1287 	dev->nv_cache.nvc_type->ops.write(io);
1288 }
1289 
1290 void
1291 ftl_nv_cache_write_complete(struct ftl_io *io, bool success)
1292 {
1293 	if (spdk_unlikely(!success)) {
1294 		FTL_ERRLOG(io->dev, "Non-volatile cache write failed at %"PRIx64"\n",
1295 			   io->addr);
1296 		io->status = -EIO;
1297 		ftl_l2p_unpin(io->dev, io->lba, io->num_blocks);
1298 		ftl_nv_cache_submit_cb_done(io);
1299 		return;
1300 	}
1301 
1302 	ftl_nv_cache_l2p_update(io);
1303 }
1304 
1305 bool
1306 ftl_nv_cache_write(struct ftl_io *io)
1307 {
1308 	struct spdk_ftl_dev *dev = io->dev;
1309 	uint64_t cache_offset;
1310 
1311 	/* Reserve area on the write buffer cache */
1312 	cache_offset = ftl_nv_cache_get_wr_buffer(&dev->nv_cache, io);
1313 	if (cache_offset == FTL_LBA_INVALID) {
1314 		/* No free space in NV cache, resubmit request */
1315 		return false;
1316 	}
1317 	io->addr = ftl_addr_from_nvc_offset(dev, cache_offset);
1318 
1319 	ftl_l2p_pin(io->dev, io->lba, io->num_blocks,
1320 		    ftl_nv_cache_pin_cb, io,
1321 		    &io->l2p_pin_ctx);
1322 
1323 	dev->nv_cache.throttle.blocks_submitted += io->num_blocks;
1324 
1325 	return true;
1326 }
1327 
1328 int
1329 ftl_nv_cache_read(struct ftl_io *io, ftl_addr addr, uint32_t num_blocks,
1330 		  spdk_bdev_io_completion_cb cb, void *cb_arg)
1331 {
1332 	int rc;
1333 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1334 
1335 	assert(ftl_addr_in_nvc(io->dev, addr));
1336 
1337 	rc = ftl_nv_cache_bdev_read_blocks_with_md(io->dev, nv_cache->bdev_desc, nv_cache->cache_ioch,
1338 			ftl_io_iovec_addr(io), NULL, ftl_addr_to_nvc_offset(io->dev, addr),
1339 			num_blocks, cb, cb_arg);
1340 
1341 	return rc;
1342 }
1343 
1344 bool
1345 ftl_nv_cache_is_halted(struct ftl_nv_cache *nv_cache)
1346 {
1347 	if (nv_cache->compaction_active_count) {
1348 		return false;
1349 	}
1350 
1351 	if (nv_cache->chunk_open_count > 0) {
1352 		return false;
1353 	}
1354 
1355 	if (is_compaction_required_for_upgrade(nv_cache)) {
1356 		return false;
1357 	}
1358 
1359 	return true;
1360 }
1361 
1362 void
1363 ftl_chunk_map_set_lba(struct ftl_nv_cache_chunk *chunk,
1364 		      uint64_t offset, uint64_t lba)
1365 {
1366 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1367 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1368 
1369 	ftl_lba_store(dev, p2l_map->chunk_map, offset, lba);
1370 }
1371 
1372 uint64_t
1373 ftl_chunk_map_get_lba(struct ftl_nv_cache_chunk *chunk, uint64_t offset)
1374 {
1375 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1376 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1377 
1378 	return ftl_lba_load(dev, p2l_map->chunk_map, offset);
1379 }
1380 
1381 void
1382 ftl_nv_cache_chunk_set_addr(struct ftl_nv_cache_chunk *chunk, uint64_t lba, ftl_addr addr)
1383 {
1384 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1385 	uint64_t cache_offset = ftl_addr_to_nvc_offset(dev, addr);
1386 	uint64_t offset;
1387 
1388 	offset = (cache_offset - chunk->offset) % chunk->nv_cache->chunk_blocks;
1389 	ftl_chunk_map_set_lba(chunk, offset, lba);
1390 }
1391 
1392 struct ftl_nv_cache_chunk *
1393 ftl_nv_cache_get_chunk_from_addr(struct spdk_ftl_dev *dev, ftl_addr addr)
1394 {
1395 	struct ftl_nv_cache_chunk *chunk = dev->nv_cache.chunks;
1396 	uint64_t chunk_idx;
1397 	uint64_t cache_offset = ftl_addr_to_nvc_offset(dev, addr);
1398 
1399 	assert(chunk != NULL);
1400 	chunk_idx = (cache_offset - chunk->offset) / chunk->nv_cache->chunk_blocks;
1401 	chunk += chunk_idx;
1402 
1403 	return chunk;
1404 }
1405 
1406 void
1407 ftl_nv_cache_set_addr(struct spdk_ftl_dev *dev, uint64_t lba, ftl_addr addr)
1408 {
1409 	struct ftl_nv_cache_chunk *chunk;
1410 
1411 	chunk = ftl_nv_cache_get_chunk_from_addr(dev, addr);
1412 
1413 	assert(lba != FTL_LBA_INVALID);
1414 
1415 	ftl_nv_cache_chunk_set_addr(chunk, lba, addr);
1416 	ftl_bitmap_set(dev->valid_map, addr);
1417 }
1418 
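/*
 * Descriptive note: the throttle acts as a simple proportional controller;
 * roughly (summarizing the code below):
 *
 *   err      = (chunk_free_count - chunk_free_target) / chunk_count
 *   modifier = clamp(KP * err, MODIFIER_MIN, MODIFIER_MAX)
 *   limit    = compaction_sma * interval_tsc / FTL_BLOCK_SIZE * (1.0 + modifier)
 *
 * i.e. user writes per interval are tied to the recent compaction bandwidth,
 * scaled up when free chunks exceed the target and down when they fall short.
 */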
1419 static void
1420 ftl_nv_cache_throttle_update(struct ftl_nv_cache *nv_cache)
1421 {
1422 	double err;
1423 	double modifier;
1424 
1425 	err = ((double)nv_cache->chunk_free_count - nv_cache->chunk_free_target) / nv_cache->chunk_count;
1426 	modifier = FTL_NV_CACHE_THROTTLE_MODIFIER_KP * err;
1427 
1428 	if (modifier < FTL_NV_CACHE_THROTTLE_MODIFIER_MIN) {
1429 		modifier = FTL_NV_CACHE_THROTTLE_MODIFIER_MIN;
1430 	} else if (modifier > FTL_NV_CACHE_THROTTLE_MODIFIER_MAX) {
1431 		modifier = FTL_NV_CACHE_THROTTLE_MODIFIER_MAX;
1432 	}
1433 
1434 	if (spdk_unlikely(nv_cache->compaction_sma == 0 || nv_cache->compaction_active_count == 0)) {
1435 		nv_cache->throttle.blocks_submitted_limit = UINT64_MAX;
1436 	} else {
1437 		double blocks_per_interval = nv_cache->compaction_sma * nv_cache->throttle.interval_tsc /
1438 					     FTL_BLOCK_SIZE;
1439 		nv_cache->throttle.blocks_submitted_limit = blocks_per_interval * (1.0 + modifier);
1440 	}
1441 }
1442 
1443 static void
1444 ftl_nv_cache_process_throttle(struct ftl_nv_cache *nv_cache)
1445 {
1446 	uint64_t tsc = spdk_thread_get_last_tsc(spdk_get_thread());
1447 
1448 	if (spdk_unlikely(!nv_cache->throttle.start_tsc)) {
1449 		nv_cache->throttle.start_tsc = tsc;
1450 	} else if (tsc - nv_cache->throttle.start_tsc >= nv_cache->throttle.interval_tsc) {
1451 		ftl_nv_cache_throttle_update(nv_cache);
1452 		nv_cache->throttle.start_tsc = tsc;
1453 		nv_cache->throttle.blocks_submitted = 0;
1454 	}
1455 }
1456 
1457 static void ftl_chunk_open(struct ftl_nv_cache_chunk *chunk);
1458 
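/*
 * Descriptive note: main NV cache poller. Open a new chunk when below
 * FTL_MAX_OPEN_CHUNKS and a free chunk exists, run compaction, persist chunks
 * transitioning to the free state, update the write throttle and finally let
 * the optional nvc_type process hook run.
 */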
1459 void
1460 ftl_nv_cache_process(struct spdk_ftl_dev *dev)
1461 {
1462 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1463 
1464 	assert(dev->nv_cache.bdev_desc);
1465 
1466 	if (nv_cache->chunk_open_count < FTL_MAX_OPEN_CHUNKS && spdk_likely(!nv_cache->halt) &&
1467 	    !TAILQ_EMPTY(&nv_cache->chunk_free_list)) {
1468 		struct ftl_nv_cache_chunk *chunk = TAILQ_FIRST(&nv_cache->chunk_free_list);
1469 		TAILQ_REMOVE(&nv_cache->chunk_free_list, chunk, entry);
1470 		TAILQ_INSERT_TAIL(&nv_cache->chunk_open_list, chunk, entry);
1471 		nv_cache->chunk_free_count--;
1472 		chunk->md->seq_id = ftl_get_next_seq_id(dev);
1473 		ftl_chunk_open(chunk);
1474 		ftl_add_io_activity(dev);
1475 	}
1476 
1477 	compaction_process(nv_cache);
1478 	ftl_chunk_persist_free_state(nv_cache);
1479 	ftl_nv_cache_process_throttle(nv_cache);
1480 
1481 	if (nv_cache->nvc_type->ops.process) {
1482 		nv_cache->nvc_type->ops.process(dev);
1483 	}
1484 }
1485 
1486 static bool
1487 ftl_nv_cache_full(struct ftl_nv_cache *nv_cache)
1488 {
1489 	if (0 == nv_cache->chunk_open_count && NULL == nv_cache->chunk_current) {
1490 		return true;
1491 	} else {
1492 		return false;
1493 	}
1494 }
1495 
1496 bool
1497 ftl_nv_cache_throttle(struct spdk_ftl_dev *dev)
1498 {
1499 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1500 
1501 	if (dev->nv_cache.throttle.blocks_submitted >= nv_cache->throttle.blocks_submitted_limit ||
1502 	    ftl_nv_cache_full(nv_cache)) {
1503 		return true;
1504 	}
1505 
1506 	return false;
1507 }
1508 
1509 static void
1510 chunk_free_p2l_map(struct ftl_nv_cache_chunk *chunk)
1511 {
1512 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1513 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1514 
1515 	ftl_mempool_put(nv_cache->p2l_pool, p2l_map->chunk_map);
1516 	p2l_map->chunk_map = NULL;
1517 
1518 	ftl_chunk_free_md_entry(chunk);
1519 }
1520 
1521 int
1522 ftl_nv_cache_save_state(struct ftl_nv_cache *nv_cache)
1523 {
1524 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1525 	struct ftl_nv_cache_chunk *chunk;
1526 	int status = 0;
1527 	uint64_t i;
1528 
1529 	assert(nv_cache->chunk_open_count == 0);
1530 
1531 	if (nv_cache->compaction_active_count) {
1532 		FTL_ERRLOG(dev, "Cannot save NV cache state, compaction in progress\n");
1533 		return -EINVAL;
1534 	}
1535 
1536 	chunk = nv_cache->chunks;
1537 	if (!chunk) {
1538 		FTL_ERRLOG(dev, "Cannot save NV cache state, no NV cache metadata\n");
1539 		return -ENOMEM;
1540 	}
1541 
1542 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1543 		nvc_validate_md(nv_cache, chunk->md);
1544 
1545 		if (chunk->md->read_pointer)  {
1546 			/* Only full chunks can be compacted */
1547 			if (chunk->md->blocks_written != nv_cache->chunk_blocks) {
1548 				assert(0);
1549 				status = -EINVAL;
1550 				break;
1551 			}
1552 
1553 			/*
1554 			 * Chunk in the middle of compaction, start over after
1555 			 * load
1556 			 */
1557 			chunk->md->read_pointer = chunk->md->blocks_compacted = 0;
1558 		} else if (chunk->md->blocks_written == nv_cache->chunk_blocks) {
1559 			/* Full chunk */
1560 		} else if (0 == chunk->md->blocks_written) {
1561 			/* Empty chunk */
1562 		} else {
1563 			assert(0);
1564 			status = -EINVAL;
1565 			break;
1566 		}
1567 	}
1568 
1569 	if (status) {
1570 		FTL_ERRLOG(dev, "Cannot save NV cache state, inconsistent NV cache "
1571 			   "metadata\n");
1572 	}
1573 
1574 	return status;
1575 }
1576 
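/*
 * Descriptive note: after loading state, the full chunk list is rebuilt in
 * ascending seq_id order, presumably so that compaction consumes the oldest
 * data first.
 */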
1577 static int
1578 sort_chunks_cmp(const void *a, const void *b)
1579 {
1580 	struct ftl_nv_cache_chunk *a_chunk = *(struct ftl_nv_cache_chunk **)a;
1581 	struct ftl_nv_cache_chunk *b_chunk = *(struct ftl_nv_cache_chunk **)b;
1582 
1583 	return a_chunk->md->seq_id - b_chunk->md->seq_id;
1584 }
1585 
1586 static int
1587 sort_chunks(struct ftl_nv_cache *nv_cache)
1588 {
1589 	struct ftl_nv_cache_chunk **chunks_list;
1590 	struct ftl_nv_cache_chunk *chunk;
1591 	uint32_t i;
1592 
1593 	if (TAILQ_EMPTY(&nv_cache->chunk_full_list)) {
1594 		return 0;
1595 	}
1596 
1597 	chunks_list = calloc(nv_cache->chunk_full_count,
1598 			     sizeof(chunks_list[0]));
1599 	if (!chunks_list) {
1600 		return -ENOMEM;
1601 	}
1602 
1603 	i = 0;
1604 	TAILQ_FOREACH(chunk, &nv_cache->chunk_full_list, entry) {
1605 		chunks_list[i] = chunk;
1606 		i++;
1607 	}
1608 	assert(i == nv_cache->chunk_full_count);
1609 
1610 	qsort(chunks_list, nv_cache->chunk_full_count, sizeof(chunks_list[0]),
1611 	      sort_chunks_cmp);
1612 
1613 	TAILQ_INIT(&nv_cache->chunk_full_list);
1614 	for (i = 0; i < nv_cache->chunk_full_count; i++) {
1615 		chunk = chunks_list[i];
1616 		TAILQ_INSERT_TAIL(&nv_cache->chunk_full_list, chunk, entry);
1617 	}
1618 
1619 	free(chunks_list);
1620 	return 0;
1621 }
1622 
1623 static int
1624 chunk_alloc_p2l_map(struct ftl_nv_cache_chunk *chunk)
1625 {
1626 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1627 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1628 
1629 	assert(p2l_map->ref_cnt == 0);
1630 	assert(p2l_map->chunk_map == NULL);
1631 
1632 	p2l_map->chunk_map = ftl_mempool_get(nv_cache->p2l_pool);
1633 
1634 	if (!p2l_map->chunk_map) {
1635 		return -ENOMEM;
1636 	}
1637 
1638 	if (ftl_chunk_alloc_md_entry(chunk)) {
1639 		ftl_mempool_put(nv_cache->p2l_pool, p2l_map->chunk_map);
1640 		p2l_map->chunk_map = NULL;
1641 		return -ENOMEM;
1642 	}
1643 
1644 	/* Set the P2L to FTL_LBA_INVALID */
1645 	memset(p2l_map->chunk_map, -1, FTL_BLOCK_SIZE * nv_cache->tail_md_chunk_blocks);
1646 
1647 	return 0;
1648 }
1649 
1650 int
1651 ftl_nv_cache_load_state(struct ftl_nv_cache *nv_cache)
1652 {
1653 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1654 	struct ftl_nv_cache_chunk *chunk;
1655 	uint64_t chunks_number, offset, i;
1656 	int status = 0;
1657 	bool active;
1658 
1659 	nv_cache->chunk_current = NULL;
1660 	TAILQ_INIT(&nv_cache->chunk_free_list);
1661 	TAILQ_INIT(&nv_cache->chunk_full_list);
1662 	TAILQ_INIT(&nv_cache->chunk_inactive_list);
1663 	nv_cache->chunk_full_count = 0;
1664 	nv_cache->chunk_free_count = 0;
1665 	nv_cache->chunk_inactive_count = 0;
1666 
1667 	assert(nv_cache->chunk_open_count == 0);
1668 	offset = nvc_data_offset(nv_cache);
1669 	if (!nv_cache->chunks) {
1670 		FTL_ERRLOG(dev, "No NV cache metadata\n");
1671 		return -1;
1672 	}
1673 
1674 	if (dev->sb->upgrade_ready) {
1675 		/*
1676 		 * During upgrade some transitions are allowed:
1677 		 *
1678 		 * 1. FREE -> INACTIVE
1679 		 * 2. INACTIVE -> FREE
1680 		 */
1681 		chunk = nv_cache->chunks;
1682 		for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1683 			active = nv_cache->nvc_type->ops.is_chunk_active(dev, chunk->offset);
1684 
1685 			if (chunk->md->state == FTL_CHUNK_STATE_FREE) {
1686 				if (!active) {
1687 					chunk->md->state = FTL_CHUNK_STATE_INACTIVE;
1688 				}
1689 			} else if (chunk->md->state == FTL_CHUNK_STATE_INACTIVE) {
1690 				if (active) {
1691 					chunk->md->state = FTL_CHUNK_STATE_FREE;
1692 				}
1693 			}
1694 		}
1695 	}
1696 
1697 	chunk = nv_cache->chunks;
1698 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1699 		chunk->nv_cache = nv_cache;
1700 		nvc_validate_md(nv_cache, chunk->md);
1701 
1702 		if (offset != chunk->offset) {
1703 			status = -EINVAL;
1704 			goto error;
1705 		}
1706 
1707 		if (chunk->md->version != FTL_NVC_VERSION_CURRENT) {
1708 			status = -EINVAL;
1709 			goto error;
1710 		}
1711 
1712 		active = nv_cache->nvc_type->ops.is_chunk_active(dev, chunk->offset);
1713 		if (false == active) {
1714 			if (chunk->md->state != FTL_CHUNK_STATE_INACTIVE) {
1715 				status = -EINVAL;
1716 				goto error;
1717 			}
1718 		}
1719 
1720 		switch (chunk->md->state) {
1721 		case FTL_CHUNK_STATE_FREE:
1722 			if (chunk->md->blocks_written || chunk->md->write_pointer) {
1723 				status = -EINVAL;
1724 				goto error;
1725 			}
1726 			/* Chunk is empty, move it to the free list */
1727 			TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
1728 			nv_cache->chunk_free_count++;
1729 			break;
1730 		case FTL_CHUNK_STATE_OPEN:
1731 			/* All chunks need to be closed at this point */
1732 			status = -EINVAL;
1733 			goto error;
1734 			break;
1735 		case FTL_CHUNK_STATE_CLOSED:
1736 			if (chunk->md->blocks_written != nv_cache->chunk_blocks) {
1737 				status = -EINVAL;
1738 				goto error;
1739 			}
1740 			/* Chunk is full, move it to the full list */
1741 			TAILQ_INSERT_TAIL(&nv_cache->chunk_full_list, chunk, entry);
1742 			nv_cache->chunk_full_count++;
1743 			break;
1744 		case FTL_CHUNK_STATE_INACTIVE:
1745 			TAILQ_INSERT_TAIL(&nv_cache->chunk_inactive_list, chunk, entry);
1746 			nv_cache->chunk_inactive_count++;
1747 			break;
1748 		default:
1749 			status = -EINVAL;
1750 			FTL_ERRLOG(dev, "Invalid chunk state\n");
1751 			goto error;
1752 		}
1753 
1754 		offset += nv_cache->chunk_blocks;
1755 	}
1756 
1757 	chunks_number = nv_cache->chunk_free_count + nv_cache->chunk_full_count +
1758 			nv_cache->chunk_inactive_count;
1759 	assert(nv_cache->chunk_current == NULL);
1760 
1761 	if (chunks_number != nv_cache->chunk_count) {
1762 		FTL_ERRLOG(dev, "Inconsistent NV cache metadata\n");
1763 		status = -EINVAL;
1764 		goto error;
1765 	}
1766 
1767 	status = sort_chunks(nv_cache);
1768 	if (status) {
1769 		FTL_ERRLOG(dev, "FTL NV Cache: sorting chunks ERROR\n");
1770 	}
1771 
1772 	FTL_NOTICELOG(dev, "FTL NV Cache: full chunks = %lu, empty chunks = %lu\n",
1773 		      nv_cache->chunk_full_count, nv_cache->chunk_free_count);
1774 
1775 	if (0 == status) {
1776 		FTL_NOTICELOG(dev, "FTL NV Cache: state loaded successfully\n");
1777 	} else {
1778 		FTL_ERRLOG(dev, "FTL NV Cache: loading state ERROR\n");
1779 	}
1780 
1781 	/* The number of active/inactive chunks calculated at initialization can change at this point due to metadata
1782 	 * upgrade. Recalculate the thresholds that depend on active chunk count.
1783 	 */
1784 	ftl_nv_cache_init_update_limits(dev);
1785 error:
1786 	return status;
1787 }
1788 
1789 void
1790 ftl_nv_cache_get_max_seq_id(struct ftl_nv_cache *nv_cache, uint64_t *open_seq_id,
1791 			    uint64_t *close_seq_id)
1792 {
1793 	uint64_t i, o_seq_id = 0, c_seq_id = 0;
1794 	struct ftl_nv_cache_chunk *chunk;
1795 
1796 	chunk = nv_cache->chunks;
1797 	assert(chunk);
1798 
1799 	/* Iterate over chunks and get their max open and close seq id */
1800 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1801 		o_seq_id = spdk_max(o_seq_id, chunk->md->seq_id);
1802 		c_seq_id = spdk_max(c_seq_id, chunk->md->close_seq_id);
1803 	}
1804 
1805 	*open_seq_id = o_seq_id;
1806 	*close_seq_id = c_seq_id;
1807 }
1808 
1809 typedef void (*ftl_chunk_ops_cb)(struct ftl_nv_cache_chunk *chunk, void *cntx, bool status);
1810 
1811 static void
1812 write_brq_end(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1813 {
1814 	struct ftl_basic_rq *brq = arg;
1815 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
1816 
1817 	ftl_stats_bdev_io_completed(brq->dev, FTL_STATS_TYPE_MD_NV_CACHE, bdev_io);
1818 
1819 	brq->success = success;
1820 	if (spdk_likely(success)) {
1821 		chunk_advance_blocks(chunk->nv_cache, chunk, brq->num_blocks);
1822 	}
1823 
1824 	spdk_bdev_free_io(bdev_io);
1825 	brq->owner.cb(brq);
1826 }
1827 
1828 static void
1829 _ftl_chunk_basic_rq_write(void *_brq)
1830 {
1831 	struct ftl_basic_rq *brq = _brq;
1832 	struct ftl_nv_cache *nv_cache = brq->io.chunk->nv_cache;
1833 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1834 	int rc;
1835 
1836 	rc = ftl_nv_cache_bdev_write_blocks_with_md(dev, nv_cache->bdev_desc, nv_cache->cache_ioch,
1837 			brq->io_payload, NULL, brq->io.addr,
1838 			brq->num_blocks, write_brq_end, brq);
1839 	if (spdk_unlikely(rc)) {
1840 		if (rc == -ENOMEM) {
1841 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1842 			brq->io.bdev_io_wait.bdev = bdev;
1843 			brq->io.bdev_io_wait.cb_fn = _ftl_chunk_basic_rq_write;
1844 			brq->io.bdev_io_wait.cb_arg = brq;
1845 			spdk_bdev_queue_io_wait(bdev, nv_cache->cache_ioch, &brq->io.bdev_io_wait);
1846 		} else {
1847 			ftl_abort();
1848 		}
1849 	}
1850 }
1851 
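/*
 * Start a basic rq write to the given chunk and account for it in the chunk's
 * write pointer and the device's IO activity counter.
 */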
1852 static void
1853 ftl_chunk_basic_rq_write(struct ftl_nv_cache_chunk *chunk, struct ftl_basic_rq *brq)
1854 {
1855 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1856 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1857 
1858 	brq->io.chunk = chunk;
1859 	brq->success = false;
1860 
1861 	_ftl_chunk_basic_rq_write(brq);
1862 
1863 	chunk->md->write_pointer += brq->num_blocks;
1864 	dev->stats.io_activity_total += brq->num_blocks;
1865 }
1866 
1867 static void
1868 read_brq_end(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1869 {
1870 	struct ftl_basic_rq *brq = arg;
1871 
1872 	ftl_stats_bdev_io_completed(brq->dev, FTL_STATS_TYPE_MD_NV_CACHE, bdev_io);
1873 
1874 	brq->success = success;
1875 
1876 	brq->owner.cb(brq);
1877 	spdk_bdev_free_io(bdev_io);
1878 }
1879 
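/*
 * Submit a basic rq read from the given chunk. The device's IO activity counter
 * is only updated when the submission succeeds.
 */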
1880 static int
1881 ftl_chunk_basic_rq_read(struct ftl_nv_cache_chunk *chunk, struct ftl_basic_rq *brq)
1882 {
1883 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1884 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1885 	int rc;
1886 
1887 	brq->io.chunk = chunk;
1888 	brq->success = false;
1889 
1890 	rc = ftl_nv_cache_bdev_read_blocks_with_md(dev, nv_cache->bdev_desc, nv_cache->cache_ioch,
1891 			brq->io_payload, NULL, brq->io.addr, brq->num_blocks, read_brq_end, brq);
1892 
1893 	if (spdk_likely(!rc)) {
1894 		dev->stats.io_activity_total += brq->num_blocks;
1895 	}
1896 
1897 	return rc;
1898 }
1899 
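/*
 * Completion of persisting the OPEN chunk metadata entry. On success the in-memory
 * chunk state becomes OPEN; on error the persist is retried or the device aborts,
 * depending on SPDK_FTL_RETRY_ON_ERROR.
 */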
1900 static void
1901 chunk_open_cb(int status, void *ctx)
1902 {
1903 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
1904 
1905 	if (spdk_unlikely(status)) {
1906 #ifdef SPDK_FTL_RETRY_ON_ERROR
1907 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
1908 		return;
1909 #else
1910 		ftl_abort();
1911 #endif
1912 	}
1913 
1914 	chunk->md->state = FTL_CHUNK_STATE_OPEN;
1915 }
1916 
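/*
 * Open a chunk for writing: allocate its P2L map (the pool is sized for the maximum
 * number of open chunks, so allocation failure is fatal), call the nvc backend's
 * on_chunk_open hook and persist the OPEN state in the NVC metadata region.
 */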
1917 static void
1918 ftl_chunk_open(struct ftl_nv_cache_chunk *chunk)
1919 {
1920 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1921 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1922 	struct ftl_layout_region *region = ftl_layout_region_get(dev, FTL_LAYOUT_REGION_TYPE_NVC_MD);
1923 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1924 
1925 	if (chunk_alloc_p2l_map(chunk)) {
1926 		assert(0);
1927 		/*
1928 		 * The number of open chunks is controlled and shall be consistent with the
1929 		 * size of the chunk P2L map pool
1930 		 */
1931 		ftl_abort();
1932 		return;
1933 	}
1934 
1935 	chunk->nv_cache->chunk_open_count++;
1936 
1937 	assert(chunk->md->write_pointer == 0);
1938 	assert(chunk->md->blocks_written == 0);
1939 
1940 	if (dev->nv_cache.nvc_type->ops.on_chunk_open) {
1941 		dev->nv_cache.nvc_type->ops.on_chunk_open(dev, chunk);
1942 	}
1943 
1944 	memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
1945 	p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_OPEN;
1946 	p2l_map->chunk_dma_md->p2l_map_checksum = 0;
1947 
1948 	ftl_md_persist_entries(md, get_chunk_idx(chunk), 1, p2l_map->chunk_dma_md,
1949 			       NULL, chunk_open_cb, chunk,
1950 			       &chunk->md_persist_entry_ctx);
1951 }
1952 
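/*
 * Completion of persisting the CLOSED chunk metadata entry. On success the P2L map
 * checksum is stored, the P2L map is released, the open chunk count is decremented,
 * the chunk is moved to the full list and the nvc backend's on_chunk_closed hook is
 * called; on error the persist is retried or the device aborts.
 */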
1953 static void
1954 chunk_close_cb(int status, void *ctx)
1955 {
1956 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
1957 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1958 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1959 
1960 	assert(chunk->md->write_pointer == chunk->nv_cache->chunk_blocks);
1961 
1962 	if (spdk_likely(!status)) {
1963 		chunk->md->p2l_map_checksum = chunk->p2l_map.chunk_dma_md->p2l_map_checksum;
1964 		chunk_free_p2l_map(chunk);
1965 
1966 		assert(chunk->nv_cache->chunk_open_count > 0);
1967 		chunk->nv_cache->chunk_open_count--;
1968 
1969 		/* Chunk is full, move it to the full list */
1970 		TAILQ_INSERT_TAIL(&chunk->nv_cache->chunk_full_list, chunk, entry);
1971 		chunk->nv_cache->chunk_full_count++;
1972 
1973 		chunk->nv_cache->last_seq_id = chunk->md->close_seq_id;
1974 
1975 		chunk->md->state = FTL_CHUNK_STATE_CLOSED;
1976 		if (nv_cache->nvc_type->ops.on_chunk_closed) {
1977 			nv_cache->nvc_type->ops.on_chunk_closed(dev, chunk);
1978 		}
1979 	} else {
1980 #ifdef SPDK_FTL_RETRY_ON_ERROR
1981 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
1982 #else
1983 		ftl_abort();
1984 #endif
1985 	}
1986 }
1987 
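/*
 * Completion of writing the chunk's tail P2L map. On success the map checksum is
 * calculated and the CLOSED chunk metadata entry is persisted (finished in
 * chunk_close_cb); on failure the write is retried or the device aborts.
 */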
1988 static void
1989 chunk_map_write_cb(struct ftl_basic_rq *brq)
1990 {
1991 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
1992 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1993 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1994 	struct ftl_layout_region *region = ftl_layout_region_get(dev, FTL_LAYOUT_REGION_TYPE_NVC_MD);
1995 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1996 	uint32_t chunk_map_crc;
1997 
1998 	if (spdk_likely(brq->success)) {
1999 		chunk_map_crc = spdk_crc32c_update(p2l_map->chunk_map,
2000 						   chunk->nv_cache->tail_md_chunk_blocks * FTL_BLOCK_SIZE, 0);
2001 		memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
2002 		p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_CLOSED;
2003 		p2l_map->chunk_dma_md->p2l_map_checksum = chunk_map_crc;
2004 		ftl_md_persist_entries(md, get_chunk_idx(chunk), 1, chunk->p2l_map.chunk_dma_md,
2005 				       NULL, chunk_close_cb, chunk,
2006 				       &chunk->md_persist_entry_ctx);
2007 	} else {
2008 #ifdef SPDK_FTL_RETRY_ON_ERROR
2009 		/* retry */
2010 		chunk->md->write_pointer -= brq->num_blocks;
2011 		ftl_chunk_basic_rq_write(chunk, brq);
2012 #else
2013 		ftl_abort();
2014 #endif
2015 	}
2016 }
2017 
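/*
 * Close a chunk: assign the close sequence id and write the tail P2L map right after
 * the chunk's data. The remaining close steps are driven by chunk_map_write_cb and
 * chunk_close_cb.
 */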
2018 static void
2019 ftl_chunk_close(struct ftl_nv_cache_chunk *chunk)
2020 {
2021 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
2022 	struct ftl_basic_rq *brq = &chunk->metadata_rq;
2023 	void *metadata = chunk->p2l_map.chunk_map;
2024 
2025 	chunk->md->close_seq_id = ftl_get_next_seq_id(dev);
2026 	ftl_basic_rq_init(dev, brq, metadata, chunk->nv_cache->tail_md_chunk_blocks);
2027 	ftl_basic_rq_set_owner(brq, chunk_map_write_cb, chunk);
2028 
2029 	assert(chunk->md->write_pointer == chunk_tail_md_offset(chunk->nv_cache));
2030 	brq->io.addr = chunk->offset + chunk->md->write_pointer;
2031 
2032 	ftl_chunk_basic_rq_write(chunk, brq);
2033 }
2034 
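/*
 * Read the chunk's tail P2L map from the end of the chunk into its in-memory P2L map
 * buffer and invoke the given callback on completion.
 */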
2035 static int
2036 ftl_chunk_read_tail_md(struct ftl_nv_cache_chunk *chunk, struct ftl_basic_rq *brq,
2037 		       void (*cb)(struct ftl_basic_rq *brq), void *cb_ctx)
2038 {
2039 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
2040 	void *metadata;
2041 	int rc;
2042 
2043 	metadata = chunk->p2l_map.chunk_map;
2044 	ftl_basic_rq_init(dev, brq, metadata, chunk->nv_cache->tail_md_chunk_blocks);
2045 	ftl_basic_rq_set_owner(brq, cb, cb_ctx);
2046 
2047 	brq->io.addr = chunk->offset + chunk_tail_md_offset(chunk->nv_cache);
2048 	rc = ftl_chunk_basic_rq_read(chunk, brq);
2049 
2050 	return rc;
2051 }
2052 
2053 struct restore_chunk_md_ctx {
2054 	ftl_chunk_md_cb cb;
2055 	void *cb_ctx;
2056 	int status;
2057 	uint64_t qd;
2058 	uint64_t id;
2059 };
2060 
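/*
 * Sanity check that the per-state chunk counters (open, free, full, under compaction
 * and inactive) sum up to the total chunk count.
 */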
2061 static inline bool
2062 is_chunk_count_valid(struct ftl_nv_cache *nv_cache)
2063 {
2064 	uint64_t chunk_count = 0;
2065 
2066 	chunk_count += nv_cache->chunk_open_count;
2067 	chunk_count += nv_cache->chunk_free_count;
2068 	chunk_count += nv_cache->chunk_full_count;
2069 	chunk_count += nv_cache->chunk_comp_count;
2070 	chunk_count += nv_cache->chunk_inactive_count;
2071 
2072 	return chunk_count == nv_cache->chunk_count;
2073 }
2074 
2075 static void
2076 walk_tail_md_cb(struct ftl_basic_rq *brq)
2077 {
2078 	struct ftl_mngt_process *mngt = brq->owner.priv;
2079 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
2080 	struct restore_chunk_md_ctx *ctx = ftl_mngt_get_step_ctx(mngt);
2081 	int rc = 0;
2082 
2083 	if (brq->success) {
2084 		rc = ctx->cb(chunk, ctx->cb_ctx);
2085 	} else {
2086 		rc = -EIO;
2087 	}
2088 
2089 	if (rc) {
2090 		ctx->status = rc;
2091 	}
2092 	ctx->qd--;
2093 	chunk_free_p2l_map(chunk);
2094 	ftl_mngt_continue_step(mngt);
2095 }
2096 
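/*
 * Walk all chunks marked for recovery, read each chunk's tail P2L map (queue depth is
 * bounded by the P2L map pool) and invoke cb for every chunk. Chunks closed at or
 * before seq_id are skipped. The mngt step completes (or fails) once all chunks have
 * been processed and the queue depth drains.
 */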
2097 static void
2098 ftl_mngt_nv_cache_walk_tail_md(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt,
2099 			       uint64_t seq_id, ftl_chunk_md_cb cb, void *cb_ctx)
2100 {
2101 	struct ftl_nv_cache *nvc = &dev->nv_cache;
2102 	struct restore_chunk_md_ctx *ctx;
2103 
2104 	ctx = ftl_mngt_get_step_ctx(mngt);
2105 	if (!ctx) {
2106 		if (ftl_mngt_alloc_step_ctx(mngt, sizeof(*ctx))) {
2107 			ftl_mngt_fail_step(mngt);
2108 			return;
2109 		}
2110 		ctx = ftl_mngt_get_step_ctx(mngt);
2111 		assert(ctx);
2112 
2113 		ctx->cb = cb;
2114 		ctx->cb_ctx = cb_ctx;
2115 	}
2116 
2117 	/*
2118 	 * This function generates a high queue depth and uses ftl_mngt_continue_step during completions
2119 	 * to make sure all chunks are processed before returning an error (if any were found) or continuing on.
2120 	 */
2121 	if (0 == ctx->qd && ctx->id == nvc->chunk_count) {
2122 		if (!is_chunk_count_valid(nvc)) {
2123 			FTL_ERRLOG(dev, "Recovery ERROR, invalid number of chunks\n");
2124 			assert(false);
2125 			ctx->status = -EINVAL;
2126 		}
2127 
2128 		if (ctx->status) {
2129 			ftl_mngt_fail_step(mngt);
2130 		} else {
2131 			ftl_mngt_next_step(mngt);
2132 		}
2133 		return;
2134 	}
2135 
2136 	while (ctx->id < nvc->chunk_count) {
2137 		struct ftl_nv_cache_chunk *chunk = &nvc->chunks[ctx->id];
2138 		int rc;
2139 
2140 		if (!chunk->recovery) {
2141 			/* This chunk is inactive or empty and not used in recovery */
2142 			ctx->id++;
2143 			continue;
2144 		}
2145 
2146 		if (seq_id && (chunk->md->close_seq_id <= seq_id)) {
2147 			ctx->id++;
2148 			continue;
2149 		}
2150 
2151 		if (chunk_alloc_p2l_map(chunk)) {
2152 			/* No more free P2L maps, break and continue later */
2153 			break;
2154 		}
2155 		ctx->id++;
2156 
2157 		rc = ftl_chunk_read_tail_md(chunk, &chunk->metadata_rq, walk_tail_md_cb, mngt);
2158 
2159 		if (0 == rc) {
2160 			ctx->qd++;
2161 		} else {
2162 			chunk_free_p2l_map(chunk);
2163 			ctx->status = rc;
2164 		}
2165 	}
2166 
2167 	if (0 == ctx->qd) {
2168 		/*
2169 		 * A queue depth of zero can happen when all leftover chunks are in the free state.
2170 		 * Additionally, ftl_chunk_read_tail_md could fail starting with the first IO in a given batch.
2171 		 * To streamline all potential error handling (since many chunks are reading their P2L at the same time),
2172 		 * ftl_mngt_continue_step is used to arrive at the same spot where the end of the mngt step is checked (see beginning of function).
2173 		 */
2174 		ftl_mngt_continue_step(mngt);
2175 	}
2176 
2177 }
2178 
2179 void
2180 ftl_mngt_nv_cache_restore_l2p(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt,
2181 			      ftl_chunk_md_cb cb, void *cb_ctx)
2182 {
2183 	ftl_mngt_nv_cache_walk_tail_md(dev, mngt, dev->sb->ckpt_seq_id, cb, cb_ctx);
2184 }
2185 
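/*
 * Completion of restoring the NVC metadata region. Validates the chunk metadata
 * version and the active/inactive layout reported by the nvc backend, then moves
 * OPEN and CLOSED chunks from the free list to the open and full lists respectively,
 * marking them for recovery.
 */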
2186 static void
2187 restore_chunk_state_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
2188 {
2189 	struct ftl_mngt_process *mngt = md->owner.cb_ctx;
2190 	struct ftl_nv_cache *nvc = &dev->nv_cache;
2191 	struct ftl_nv_cache_chunk *chunk;
2192 	uint64_t i;
2193 
2194 	if (status) {
2195 		/* Restore error, end step */
2196 		ftl_mngt_fail_step(mngt);
2197 		return;
2198 	}
2199 
2200 	for (i = 0; i < nvc->chunk_count; i++) {
2201 		chunk = &nvc->chunks[i];
2202 
2203 		if (false == nvc->nvc_type->ops.is_chunk_active(dev, chunk->offset) &&
2204 		    chunk->md->state != FTL_CHUNK_STATE_INACTIVE) {
2205 			status = -EINVAL;
2206 			break;
2207 		}
2208 
2209 		if (chunk->md->version != FTL_NVC_VERSION_CURRENT) {
2210 			status = -EINVAL;
2211 			break;
2212 		}
2213 
2214 		switch (chunk->md->state) {
2215 		case FTL_CHUNK_STATE_FREE:
2216 			break;
2217 		case FTL_CHUNK_STATE_OPEN:
2218 			TAILQ_REMOVE(&nvc->chunk_free_list, chunk, entry);
2219 			nvc->chunk_free_count--;
2220 
2221 			TAILQ_INSERT_TAIL(&nvc->chunk_open_list, chunk, entry);
2222 			nvc->chunk_open_count++;
2223 
2224 			/* Chunk is not empty, mark it to be recovered */
2225 			chunk->recovery = true;
2226 			break;
2227 		case FTL_CHUNK_STATE_CLOSED:
2228 			TAILQ_REMOVE(&nvc->chunk_free_list, chunk, entry);
2229 			nvc->chunk_free_count--;
2230 
2231 			TAILQ_INSERT_TAIL(&nvc->chunk_full_list, chunk, entry);
2232 			nvc->chunk_full_count++;
2233 
2234 			/* Chunk is not empty, mark it to be recovered */
2235 			chunk->recovery = true;
2236 			break;
2237 		case FTL_CHUNK_STATE_INACTIVE:
2238 			break;
2239 		default:
2240 			status = -EINVAL;
2241 		}
2242 	}
2243 
2244 	if (status) {
2245 		ftl_mngt_fail_step(mngt);
2246 	} else {
2247 		ftl_mngt_next_step(mngt);
2248 	}
2249 }
2250 
2251 void
2252 ftl_mngt_nv_cache_restore_chunk_state(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt)
2253 {
2254 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
2255 
2256 	md->owner.cb_ctx = mngt;
2257 	md->cb = restore_chunk_state_cb;
2258 	ftl_md_restore(md);
2259 }
2260 
2261 struct recover_open_chunk_ctx {
2262 	struct ftl_nv_cache_chunk *chunk;
2263 };
2264 
2265 static void
2266 recover_open_chunk_prepare(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt)
2267 {
2268 	struct ftl_nv_cache *nvc = &dev->nv_cache;
2269 	struct recover_open_chunk_ctx *ctx = ftl_mngt_get_process_ctx(mngt);
2270 
2271 	ftl_bug(TAILQ_EMPTY(&nvc->chunk_open_list));
2272 	ctx->chunk = TAILQ_FIRST(&nvc->chunk_open_list);
2273 
2274 	FTL_NOTICELOG(dev, "Start recovery of open chunk, offset = %"PRIu64", seq id %"PRIu64"\n",
2275 		      ctx->chunk->offset, ctx->chunk->md->seq_id);
2276 
2277 	if (chunk_alloc_p2l_map(ctx->chunk)) {
2278 		ftl_mngt_fail_step(mngt);
2279 	} else {
2280 		ftl_mngt_next_step(mngt);
2281 	}
2282 }
2283 
2284 static void
2285 recover_open_chunk_persist_p2l_map_cb(struct ftl_basic_rq *rq)
2286 {
2287 	struct ftl_mngt_process *mngt = rq->owner.priv;
2288 
2289 	if (rq->success) {
2290 		ftl_mngt_next_step(mngt);
2291 	} else {
2292 		ftl_mngt_fail_step(mngt);
2293 	}
2294 }
2295 
2296 static void
2297 recover_open_chunk_persist_p2l_map(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt)
2298 {
2299 	struct recover_open_chunk_ctx *ctx = ftl_mngt_get_process_ctx(mngt);
2300 	struct ftl_nv_cache_chunk *chunk = ctx->chunk;
2301 	struct ftl_basic_rq *rq = ftl_mngt_get_step_ctx(mngt);
2302 	void *p2l_map = chunk->p2l_map.chunk_map;
2303 
2304 	ftl_basic_rq_init(dev, rq, p2l_map, chunk->nv_cache->tail_md_chunk_blocks);
2305 	ftl_basic_rq_set_owner(rq, recover_open_chunk_persist_p2l_map_cb, mngt);
2306 
2307 	rq->io.addr = chunk->offset + chunk_tail_md_offset(chunk->nv_cache);
2308 	ftl_chunk_basic_rq_write(chunk, rq);
2309 }
2310 
2311 static void
2312 recover_open_chunk_close_chunk_cb(int status, void *cb_arg)
2313 {
2314 	struct ftl_mngt_process *mngt = cb_arg;
2315 	struct recover_open_chunk_ctx *ctx = ftl_mngt_get_process_ctx(mngt);
2316 	struct ftl_nv_cache_chunk *chunk = ctx->chunk;
2317 	struct ftl_nv_cache *nvc = chunk->nv_cache;
2318 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nvc, struct spdk_ftl_dev, nv_cache);
2319 
2320 	if (0 == status) {
2321 		*chunk->md = *chunk->p2l_map.chunk_dma_md;
2322 
2323 		FTL_NOTICELOG(dev, "Recovered chunk, offset = %"PRIu64", seq id %"PRIu64"\n", chunk->offset,
2324 			      chunk->md->seq_id);
2325 
2326 		TAILQ_REMOVE(&nvc->chunk_open_list, chunk, entry);
2327 		nvc->chunk_open_count--;
2328 
2329 		TAILQ_INSERT_TAIL(&nvc->chunk_full_list, chunk, entry);
2330 		nvc->chunk_full_count++;
2331 
2332 		ftl_mngt_next_step(mngt);
2333 	} else {
2334 		ftl_mngt_fail_step(mngt);
2335 	}
2336 }
2337 
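/*
 * Build a CLOSED metadata entry for the recovered chunk (fully written, with the P2L
 * map checksum recalculated from the in-memory map) and persist it; the in-memory
 * chunk metadata is updated in recover_open_chunk_close_chunk_cb.
 */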
2338 static void
2339 recover_open_chunk_close_chunk(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt)
2340 {
2341 	struct recover_open_chunk_ctx *ctx = ftl_mngt_get_process_ctx(mngt);
2342 	struct ftl_nv_cache_chunk *chunk = ctx->chunk;
2343 	struct ftl_nv_cache_chunk_md *chunk_md = chunk->p2l_map.chunk_dma_md;
2344 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
2345 
2346 	*chunk_md = *chunk->md;
2347 
2348 	chunk_md->state = FTL_CHUNK_STATE_CLOSED;
2349 	chunk_md->write_pointer = chunk->nv_cache->chunk_blocks;
2350 	chunk_md->blocks_written = chunk->nv_cache->chunk_blocks;
2351 	chunk_md->p2l_map_checksum = spdk_crc32c_update(chunk->p2l_map.chunk_map,
2352 				     chunk->nv_cache->tail_md_chunk_blocks * FTL_BLOCK_SIZE, 0);
2353 
2354 	ftl_md_persist_entries(md, get_chunk_idx(chunk), 1, chunk_md, NULL,
2355 			       recover_open_chunk_close_chunk_cb, mngt,
2356 			       &chunk->md_persist_entry_ctx);
2357 }
2358 
2359 static void
2360 recover_open_chunk_execute(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt)
2361 {
2362 	struct recover_open_chunk_ctx *ctx = ftl_mngt_get_process_ctx(mngt);
2363 	struct ftl_nv_cache *nvc = ctx->chunk->nv_cache;
2364 
2365 	nvc->nvc_type->ops.recover_open_chunk(dev, mngt, ctx->chunk);
2366 }
2367 
2368 static void
2369 recover_open_chunk_cleanup(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt)
2370 {
2371 	struct recover_open_chunk_ctx *ctx = ftl_mngt_get_process_ctx(mngt);
2372 	struct ftl_nv_cache_chunk *chunk = ctx->chunk;
2373 
2374 	if (chunk->p2l_map.chunk_map) {
2375 		chunk_free_p2l_map(ctx->chunk);
2376 	}
2377 	ftl_mngt_next_step(mngt);
2378 }
2379 
2380 static const struct ftl_mngt_process_desc desc_recover_open_chunk = {
2381 	.name = "Recover open chunk",
2382 	.ctx_size = sizeof(struct recover_open_chunk_ctx),
2383 	.steps = {
2384 		{
2385 			.name = "Chunk recovery, prepare",
2386 			.action = recover_open_chunk_prepare,
2387 			.cleanup = recover_open_chunk_cleanup
2388 		},
2389 		{
2390 			.name = "Chunk recovery, execute",
2391 			.action = recover_open_chunk_execute,
2392 		},
2393 		{
2394 			.name = "Chunk recovery, persist P2L map",
2395 			.ctx_size = sizeof(struct ftl_basic_rq),
2396 			.action = recover_open_chunk_persist_p2l_map,
2397 		},
2398 		{
2399 			.name = "Chunk recovery, close chunk",
2400 			.action = recover_open_chunk_close_chunk,
2401 		},
2402 		{
2403 			.name = "Chunk recovery, cleanup",
2404 			.action = recover_open_chunk_cleanup,
2405 		},
2406 		{}
2407 	}
2408 };
2409 
2410 static void
2411 ftl_mngt_nv_cache_recover_open_chunk_cb(struct spdk_ftl_dev *dev, void *ctx, int status)
2412 {
2413 	struct ftl_mngt_process *mngt = ctx;
2414 
2415 	if (status) {
2416 		ftl_mngt_fail_step(mngt);
2417 	} else {
2418 		ftl_mngt_continue_step(mngt);
2419 	}
2420 }
2421 
2422 void
2423 ftl_mngt_nv_cache_recover_open_chunk(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt)
2424 {
2425 	struct ftl_nv_cache *nvc = &dev->nv_cache;
2426 
2427 	if (TAILQ_EMPTY(&nvc->chunk_open_list)) {
2428 		if (!is_chunk_count_valid(nvc)) {
2429 			FTL_ERRLOG(dev, "Recovery ERROR, invalid number of chunks\n");
2430 			ftl_mngt_fail_step(mngt);
2431 			return;
2432 		}
2433 
2434 		/*
2435 		 * Now all chunks are loaded and closed, do the final step of
2436 		 * restoring the chunk state
2437 		 */
2438 		if (ftl_nv_cache_load_state(nvc)) {
2439 			ftl_mngt_fail_step(mngt);
2440 		} else {
2441 			ftl_mngt_next_step(mngt);
2442 		}
2443 	} else {
2444 		if (ftl_mngt_process_execute(dev, &desc_recover_open_chunk,
2445 					     ftl_mngt_nv_cache_recover_open_chunk_cb, mngt)) {
2446 			ftl_mngt_fail_step(mngt);
2447 		}
2448 	}
2449 }
2450 
2451 int
2452 ftl_nv_cache_chunks_busy(struct ftl_nv_cache *nv_cache)
2453 {
2454 	/* chunk_current migrates to the closed state when closing, any others should already have been
2455 	 * moved to the free chunk list. Also need to wait for outstanding free chunk md persists */
2456 	return nv_cache->chunk_open_count == 0 && nv_cache->chunk_free_persist_count == 0;
2457 }
2458 
2459 void
2460 ftl_nv_cache_halt(struct ftl_nv_cache *nv_cache)
2461 {
2462 	struct ftl_nv_cache_chunk *chunk;
2463 	uint64_t free_space;
2464 
2465 	nv_cache->halt = true;
2466 
2467 	/* Set chunks on the open list back to the free state since no user data has been written to them */
2468 	while (!TAILQ_EMPTY(&nv_cache->chunk_open_list)) {
2469 		chunk = TAILQ_FIRST(&nv_cache->chunk_open_list);
2470 
2471 		/* Chunks are moved between lists on metadata update submission, but state is changed
2472 		 * on completion. Break early in such a case to make sure all the necessary resources
2473 		 * are freed (during the next pass(es) of ftl_nv_cache_halt).
2474 		 */
2475 		if (chunk->md->state != FTL_CHUNK_STATE_OPEN) {
2476 			break;
2477 		}
2478 
2479 		TAILQ_REMOVE(&nv_cache->chunk_open_list, chunk, entry);
2480 		chunk_free_p2l_map(chunk);
2481 		ftl_nv_cache_chunk_md_initialize(chunk->md);
2482 		assert(nv_cache->chunk_open_count > 0);
2483 		nv_cache->chunk_open_count--;
2484 	}
2485 
2486 	/* Close the current chunk by skipping all unwritten blocks */
2487 	chunk = nv_cache->chunk_current;
2488 	if (chunk != NULL) {
2489 		nv_cache->chunk_current = NULL;
2490 		if (chunk_is_closed(chunk)) {
2491 			return;
2492 		}
2493 
2494 		free_space = chunk_get_free_space(nv_cache, chunk);
2495 		chunk->md->blocks_skipped = free_space;
2496 		chunk->md->blocks_written += free_space;
2497 		chunk->md->write_pointer += free_space;
2498 		ftl_chunk_close(chunk);
2499 	}
2500 }
2501 
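/*
 * Acquire a sequence id for a trim operation. If there is no current chunk, the seq id
 * of the first open chunk is returned (or 0 if none is usable). Otherwise the current
 * chunk's remaining space is skipped, the chunk is closed once fully written, and a
 * seq id newer than its data is returned.
 */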
2502 uint64_t
2503 ftl_nv_cache_acquire_trim_seq_id(struct ftl_nv_cache *nv_cache)
2504 {
2505 	struct ftl_nv_cache_chunk *chunk = nv_cache->chunk_current;
2506 	uint64_t seq_id, free_space;
2507 
2508 	if (!chunk) {
2509 		chunk = TAILQ_FIRST(&nv_cache->chunk_open_list);
2510 		if (chunk && chunk->md->state == FTL_CHUNK_STATE_OPEN) {
2511 			return chunk->md->seq_id;
2512 		} else {
2513 			return 0;
2514 		}
2515 	}
2516 
2517 	if (chunk_is_closed(chunk)) {
2518 		return 0;
2519 	}
2520 
2521 	seq_id = nv_cache->chunk_current->md->seq_id;
2522 	free_space = chunk_get_free_space(nv_cache, chunk);
2523 
2524 	chunk->md->blocks_skipped = free_space;
2525 	chunk->md->blocks_written += free_space;
2526 	chunk->md->write_pointer += free_space;
2527 	if (chunk->md->blocks_written == chunk_tail_md_offset(nv_cache)) {
2528 		ftl_chunk_close(chunk);
2529 	}
2530 	nv_cache->chunk_current = NULL;
2531 
2532 	seq_id++;
2533 	return seq_id;
2534 }
2535 
2536 static double
2537 ftl_nv_cache_get_chunk_utilization(struct ftl_nv_cache *nv_cache,
2538 				   struct ftl_nv_cache_chunk *chunk)
2539 {
2540 	double capacity = nv_cache->chunk_blocks;
2541 	double used = chunk->md->blocks_written + chunk->md->blocks_skipped;
2542 
2543 	return used / capacity;
2544 }
2545 
2546 static const char *
2547 ftl_nv_cache_get_chunk_state_name(struct ftl_nv_cache_chunk *chunk)
2548 {
2549 	static const char *names[] = {
2550 		"FREE", "OPEN", "CLOSED", "INACTIVE"
2551 	};
2552 
2553 	assert(chunk->md->state < SPDK_COUNTOF(names));
2554 	if (chunk->md->state < SPDK_COUNTOF(names)) {
2555 		return names[chunk->md->state];
2556 	} else {
2557 		assert(false);
2558 		return "?";
2559 	}
2560 }
2561 
2562 static void
2563 ftl_property_dump_cache_dev(struct spdk_ftl_dev *dev, const struct ftl_property *property,
2564 			    struct spdk_json_write_ctx *w)
2565 {
2566 	uint64_t i;
2567 	struct ftl_nv_cache_chunk *chunk;
2568 
2569 	spdk_json_write_named_string(w, "type", dev->nv_cache.nvc_type->name);
2570 	spdk_json_write_named_array_begin(w, "chunks");
2571 	for (i = 0, chunk = dev->nv_cache.chunks; i < dev->nv_cache.chunk_count; i++, chunk++) {
2572 		spdk_json_write_object_begin(w);
2573 		spdk_json_write_named_uint64(w, "id", i);
2574 		spdk_json_write_named_string(w, "state", ftl_nv_cache_get_chunk_state_name(chunk));
2575 		spdk_json_write_named_double(w, "utilization",
2576 					     ftl_nv_cache_get_chunk_utilization(&dev->nv_cache, chunk));
2577 		spdk_json_write_object_end(w);
2578 	}
2579 	spdk_json_write_array_end(w);
2580 }
2581 
2582 void
2583 ftl_nv_cache_chunk_md_initialize(struct ftl_nv_cache_chunk_md *md)
2584 {
2585 	memset(md, 0, sizeof(*md));
2586 	md->version = FTL_NVC_VERSION_CURRENT;
2587 }
2588