xref: /spdk/lib/ftl/ftl_nv_cache.c (revision b02581a89058ebaebe03bd0e16e3b58adfe406c1)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 
7 #include "spdk/bdev.h"
8 #include "spdk/bdev_module.h"
9 #include "spdk/ftl.h"
10 #include "spdk/string.h"
11 
12 #include "ftl_nv_cache.h"
13 #include "ftl_nv_cache_io.h"
14 #include "ftl_core.h"
15 #include "ftl_band.h"
16 #include "utils/ftl_addr_utils.h"
17 #include "mngt/ftl_mngt.h"
18 
19 static inline uint64_t nvc_data_blocks(struct ftl_nv_cache *nv_cache) __attribute__((unused));
20 static struct ftl_nv_cache_compactor *compactor_alloc(struct spdk_ftl_dev *dev);
21 static void compactor_free(struct spdk_ftl_dev *dev, struct ftl_nv_cache_compactor *compactor);
22 static void compaction_process_ftl_done(struct ftl_rq *rq);
23 static void compaction_process_read_entry(void *arg);
24 static void ftl_property_dump_cache_dev(struct spdk_ftl_dev *dev,
25 					const struct ftl_property *property,
26 					struct spdk_json_write_ctx *w);
27 
28 static inline const struct ftl_layout_region *
29 nvc_data_region(struct ftl_nv_cache *nv_cache)
30 {
31 	struct spdk_ftl_dev *dev;
32 
33 	dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
34 	return ftl_layout_region_get(dev, FTL_LAYOUT_REGION_TYPE_DATA_NVC);
35 }
36 
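/* Sanity check that a chunk metadata entry lies within the NV cache metadata buffer */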
37 static inline void
38 nvc_validate_md(struct ftl_nv_cache *nv_cache,
39 		struct ftl_nv_cache_chunk_md *chunk_md)
40 {
41 	struct ftl_md *md = nv_cache->md;
42 	void *buffer = ftl_md_get_buffer(md);
43 	uint64_t size = ftl_md_get_buffer_size(md);
44 	void *ptr = chunk_md;
45 
46 	if (ptr < buffer) {
47 		ftl_abort();
48 	}
49 
50 	ptr += sizeof(*chunk_md);
51 	if (ptr > buffer + size) {
52 		ftl_abort();
53 	}
54 }
55 
56 static inline uint64_t
57 nvc_data_offset(struct ftl_nv_cache *nv_cache)
58 {
59 	return nvc_data_region(nv_cache)->current.offset;
60 }
61 
62 static inline uint64_t
63 nvc_data_blocks(struct ftl_nv_cache *nv_cache)
64 {
65 	return nvc_data_region(nv_cache)->current.blocks;
66 }
67 
68 size_t
69 ftl_nv_cache_chunk_tail_md_num_blocks(const struct ftl_nv_cache *nv_cache)
70 {
71 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache,
72 				    struct spdk_ftl_dev, nv_cache);
73 	return spdk_divide_round_up(dev->layout.nvc.chunk_data_blocks * dev->layout.l2p.addr_size,
74 				    FTL_BLOCK_SIZE);
75 }
76 
77 static size_t
78 nv_cache_p2l_map_pool_elem_size(const struct ftl_nv_cache *nv_cache)
79 {
80 	/* Map pool element holds the whole tail md */
81 	return nv_cache->tail_md_chunk_blocks * FTL_BLOCK_SIZE;
82 }
83 
84 static uint64_t
85 get_chunk_idx(struct ftl_nv_cache_chunk *chunk)
86 {
87 	struct ftl_nv_cache_chunk *first_chunk = chunk->nv_cache->chunks;
88 
89 	return (chunk->offset - first_chunk->offset) / chunk->nv_cache->chunk_blocks;
90 }
91 
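/*
 * Set up the NV cache: metadata buffer and pools, per-chunk bookkeeping lists,
 * compactors and write throttling state.
 */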
92 int
93 ftl_nv_cache_init(struct spdk_ftl_dev *dev)
94 {
95 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
96 	struct ftl_nv_cache_chunk *chunk;
97 	struct ftl_nv_cache_chunk_md *md;
98 	struct ftl_nv_cache_compactor *compactor;
99 	uint64_t i, offset;
100 
101 	nv_cache->halt = true;
102 
103 	nv_cache->md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
104 	if (!nv_cache->md) {
105 		FTL_ERRLOG(dev, "No NV cache metadata object\n");
106 		return -1;
107 	}
108 
109 	nv_cache->md_pool = ftl_mempool_create(dev->conf.user_io_pool_size,
110 					       nv_cache->md_size * dev->xfer_size,
111 					       FTL_BLOCK_SIZE, SPDK_ENV_SOCKET_ID_ANY);
112 	if (!nv_cache->md_pool) {
113 		FTL_ERRLOG(dev, "Failed to initialize NV cache metadata pool\n");
114 		return -1;
115 	}
116 
117 	/*
118 	 * Initialize chunk info
119 	 */
120 	nv_cache->chunk_blocks = dev->layout.nvc.chunk_data_blocks;
121 	nv_cache->chunk_count = dev->layout.nvc.chunk_count;
122 	nv_cache->tail_md_chunk_blocks = ftl_nv_cache_chunk_tail_md_num_blocks(nv_cache);
123 
124 	/* Allocate chunks */
125 	nv_cache->chunks = calloc(nv_cache->chunk_count,
126 				  sizeof(nv_cache->chunks[0]));
127 	if (!nv_cache->chunks) {
128 		FTL_ERRLOG(dev, "Failed to initialize NV cache chunks\n");
129 		return -1;
130 	}
131 
132 	TAILQ_INIT(&nv_cache->chunk_free_list);
133 	TAILQ_INIT(&nv_cache->chunk_open_list);
134 	TAILQ_INIT(&nv_cache->chunk_full_list);
135 	TAILQ_INIT(&nv_cache->chunk_comp_list);
136 	TAILQ_INIT(&nv_cache->needs_free_persist_list);
137 
138 	/* First chunk metadata */
139 	md = ftl_md_get_buffer(nv_cache->md);
140 	if (!md) {
141 		FTL_ERRLOG(dev, "No NV cache metadata\n");
142 		return -1;
143 	}
144 
145 	nv_cache->chunk_free_count = nv_cache->chunk_count;
146 
147 	chunk = nv_cache->chunks;
148 	offset = nvc_data_offset(nv_cache);
149 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++, md++) {
150 		chunk->nv_cache = nv_cache;
151 		chunk->md = md;
152 		nvc_validate_md(nv_cache, md);
153 		chunk->offset = offset;
154 		offset += nv_cache->chunk_blocks;
155 		TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
156 	}
157 	assert(offset <= nvc_data_offset(nv_cache) + nvc_data_blocks(nv_cache));
158 
159 	/* Start compaction when the number of full chunks exceeds the given % of all chunks */
160 	nv_cache->chunk_compaction_threshold = nv_cache->chunk_count *
161 					       dev->conf.nv_cache.chunk_compaction_threshold / 100;
162 	TAILQ_INIT(&nv_cache->compactor_list);
163 	for (i = 0; i < FTL_NV_CACHE_NUM_COMPACTORS; i++) {
164 		compactor = compactor_alloc(dev);
165 
166 		if (!compactor) {
167 			FTL_ERRLOG(dev, "Cannot allocate compaction process\n");
168 			return -1;
169 		}
170 
171 		TAILQ_INSERT_TAIL(&nv_cache->compactor_list, compactor, entry);
172 	}
173 
174 #define FTL_MAX_OPEN_CHUNKS 2
175 	nv_cache->p2l_pool = ftl_mempool_create(FTL_MAX_OPEN_CHUNKS,
176 						nv_cache_p2l_map_pool_elem_size(nv_cache),
177 						FTL_BLOCK_SIZE,
178 						SPDK_ENV_SOCKET_ID_ANY);
179 	if (!nv_cache->p2l_pool) {
180 		return -ENOMEM;
181 	}
182 
183 	/* One entry per open chunk */
184 	nv_cache->chunk_md_pool = ftl_mempool_create(FTL_MAX_OPEN_CHUNKS,
185 				  sizeof(struct ftl_nv_cache_chunk_md),
186 				  FTL_BLOCK_SIZE,
187 				  SPDK_ENV_SOCKET_ID_ANY);
188 	if (!nv_cache->chunk_md_pool) {
189 		return -ENOMEM;
190 	}
191 
192 	/* Each compactor can be reading a different chunk, which it will need to switch to the free state at the
193 	 * end, plus one backup entry each for processing high-invalidity chunks (if there is a backlog of chunks
194 	 * with very low, even zero, validity, the compactors can process them quickly and trigger a lot of
195 	 * transitions to the free state at once) */
196 	nv_cache->free_chunk_md_pool = ftl_mempool_create(2 * FTL_NV_CACHE_NUM_COMPACTORS,
197 				       sizeof(struct ftl_nv_cache_chunk_md),
198 				       FTL_BLOCK_SIZE,
199 				       SPDK_ENV_SOCKET_ID_ANY);
200 	if (!nv_cache->free_chunk_md_pool) {
201 		return -ENOMEM;
202 	}
203 
204 	nv_cache->throttle.interval_tsc = FTL_NV_CACHE_THROTTLE_INTERVAL_MS *
205 					  (spdk_get_ticks_hz() / 1000);
206 	nv_cache->chunk_free_target = spdk_divide_round_up(nv_cache->chunk_count *
207 				      dev->conf.nv_cache.chunk_free_target,
208 				      100);
209 
210 	ftl_property_register(dev, "cache_device", NULL, 0, NULL, NULL, ftl_property_dump_cache_dev, NULL,
211 			      NULL, true);
212 	return 0;
213 }
214 
215 void
216 ftl_nv_cache_deinit(struct spdk_ftl_dev *dev)
217 {
218 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
219 	struct ftl_nv_cache_compactor *compactor;
220 
221 	while (!TAILQ_EMPTY(&nv_cache->compactor_list)) {
222 		compactor = TAILQ_FIRST(&nv_cache->compactor_list);
223 		TAILQ_REMOVE(&nv_cache->compactor_list, compactor, entry);
224 
225 		compactor_free(dev, compactor);
226 	}
227 
228 	ftl_mempool_destroy(nv_cache->md_pool);
229 	ftl_mempool_destroy(nv_cache->p2l_pool);
230 	ftl_mempool_destroy(nv_cache->chunk_md_pool);
231 	ftl_mempool_destroy(nv_cache->free_chunk_md_pool);
232 	nv_cache->md_pool = NULL;
233 	nv_cache->p2l_pool = NULL;
234 	nv_cache->chunk_md_pool = NULL;
235 	nv_cache->free_chunk_md_pool = NULL;
236 
237 	free(nv_cache->chunks);
238 	nv_cache->chunks = NULL;
239 }
240 
241 static uint64_t
242 chunk_get_free_space(struct ftl_nv_cache *nv_cache,
243 		     struct ftl_nv_cache_chunk *chunk)
244 {
245 	assert(chunk->md->write_pointer + nv_cache->tail_md_chunk_blocks <=
246 	       nv_cache->chunk_blocks);
247 	return nv_cache->chunk_blocks - chunk->md->write_pointer -
248 	       nv_cache->tail_md_chunk_blocks;
249 }
250 
251 static bool
252 chunk_is_closed(struct ftl_nv_cache_chunk *chunk)
253 {
254 	return chunk->md->write_pointer == chunk->nv_cache->chunk_blocks;
255 }
256 
257 static void ftl_chunk_close(struct ftl_nv_cache_chunk *chunk);
258 
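/*
 * Reserve space for the IO in the currently open chunk and return its NV cache offset,
 * or FTL_LBA_INVALID if no open chunk can take the request right now.
 */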
259 static uint64_t
260 ftl_nv_cache_get_wr_buffer(struct ftl_nv_cache *nv_cache, struct ftl_io *io)
261 {
262 	uint64_t address = FTL_LBA_INVALID;
263 	uint64_t num_blocks = io->num_blocks;
264 	uint64_t free_space;
265 	struct ftl_nv_cache_chunk *chunk;
266 
267 	do {
268 		chunk = nv_cache->chunk_current;
269 		/* Chunk has been closed, so pick a new one */
270 		if (chunk && chunk_is_closed(chunk)) {
271 			chunk = NULL;
272 		}
273 
274 		if (!chunk) {
275 			chunk = TAILQ_FIRST(&nv_cache->chunk_open_list);
276 			if (chunk && chunk->md->state == FTL_CHUNK_STATE_OPEN) {
277 				TAILQ_REMOVE(&nv_cache->chunk_open_list, chunk, entry);
278 				nv_cache->chunk_current = chunk;
279 			} else {
280 				break;
281 			}
282 		}
283 
284 		free_space = chunk_get_free_space(nv_cache, chunk);
285 
286 		if (free_space >= num_blocks) {
287 			/* Enough space in chunk */
288 
289 			/* Calculate address in NV cache */
290 			address = chunk->offset + chunk->md->write_pointer;
291 
292 			/* Set chunk in IO */
293 			io->nv_cache_chunk = chunk;
294 
295 			/* Move write pointer */
296 			chunk->md->write_pointer += num_blocks;
297 			break;
298 		}
299 
300 		/* Not enough space in nv_cache_chunk */
301 		nv_cache->chunk_current = NULL;
302 
303 		if (0 == free_space) {
304 			continue;
305 		}
306 
307 		chunk->md->blocks_skipped = free_space;
308 		chunk->md->blocks_written += free_space;
309 		chunk->md->write_pointer += free_space;
310 
311 		if (chunk->md->blocks_written == chunk_tail_md_offset(nv_cache)) {
312 			ftl_chunk_close(chunk);
313 		}
314 	} while (1);
315 
316 	return address;
317 }
318 
319 void
320 ftl_nv_cache_fill_md(struct ftl_io *io)
321 {
322 	struct ftl_nv_cache_chunk *chunk = io->nv_cache_chunk;
323 	uint64_t i;
324 	union ftl_md_vss *metadata = io->md;
325 	uint64_t lba = ftl_io_get_lba(io, 0);
326 
327 	for (i = 0; i < io->num_blocks; ++i, lba++, metadata++) {
328 		metadata->nv_cache.lba = lba;
329 		metadata->nv_cache.seq_id = chunk->md->seq_id;
330 	}
331 }
332 
333 uint64_t
334 chunk_tail_md_offset(struct ftl_nv_cache *nv_cache)
335 {
336 	return nv_cache->chunk_blocks - nv_cache->tail_md_chunk_blocks;
337 }
338 
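/* Account for blocks persisted to the chunk and close it once its data area is fully written */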
339 static void
340 chunk_advance_blocks(struct ftl_nv_cache *nv_cache, struct ftl_nv_cache_chunk *chunk,
341 		     uint64_t advanced_blocks)
342 {
343 	chunk->md->blocks_written += advanced_blocks;
344 
345 	assert(chunk->md->blocks_written <= nv_cache->chunk_blocks);
346 
347 	if (chunk->md->blocks_written == chunk_tail_md_offset(nv_cache)) {
348 		ftl_chunk_close(chunk);
349 	}
350 }
351 
352 static uint64_t
353 chunk_user_blocks_written(struct ftl_nv_cache_chunk *chunk)
354 {
355 	return chunk->md->blocks_written - chunk->md->blocks_skipped -
356 	       chunk->nv_cache->tail_md_chunk_blocks;
357 }
358 
359 static bool
360 is_chunk_compacted(struct ftl_nv_cache_chunk *chunk)
361 {
362 	assert(chunk->md->blocks_written != 0);
363 
364 	if (chunk_user_blocks_written(chunk) == chunk->md->blocks_compacted) {
365 		return true;
366 	}
367 
368 	return false;
369 }
370 
371 static int
372 ftl_chunk_alloc_md_entry(struct ftl_nv_cache_chunk *chunk)
373 {
374 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
375 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
376 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
377 	struct ftl_layout_region *region = ftl_layout_region_get(dev, FTL_LAYOUT_REGION_TYPE_NVC_MD);
378 
379 	p2l_map->chunk_dma_md = ftl_mempool_get(nv_cache->chunk_md_pool);
380 
381 	if (!p2l_map->chunk_dma_md) {
382 		return -ENOMEM;
383 	}
384 
385 	memset(p2l_map->chunk_dma_md, 0, region->entry_size * FTL_BLOCK_SIZE);
386 	return 0;
387 }
388 
389 static void
390 ftl_chunk_free_md_entry(struct ftl_nv_cache_chunk *chunk)
391 {
392 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
393 
394 	ftl_mempool_put(chunk->nv_cache->chunk_md_pool, p2l_map->chunk_dma_md);
395 	p2l_map->chunk_dma_md = NULL;
396 }
397 
398 static void
399 ftl_chunk_free(struct ftl_nv_cache_chunk *chunk)
400 {
401 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
402 
403 	/* Reset chunk */
404 	memset(chunk->md, 0, sizeof(*chunk->md));
405 
406 	TAILQ_INSERT_TAIL(&nv_cache->needs_free_persist_list, chunk, entry);
407 	nv_cache->chunk_free_persist_count++;
408 }
409 
410 static int
411 ftl_chunk_alloc_chunk_free_entry(struct ftl_nv_cache_chunk *chunk)
412 {
413 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
414 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
415 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
416 	struct ftl_layout_region *region = ftl_layout_region_get(dev, FTL_LAYOUT_REGION_TYPE_NVC_MD);
417 
418 	p2l_map->chunk_dma_md = ftl_mempool_get(nv_cache->free_chunk_md_pool);
419 
420 	if (!p2l_map->chunk_dma_md) {
421 		return -ENOMEM;
422 	}
423 
424 	memset(p2l_map->chunk_dma_md, 0, region->entry_size * FTL_BLOCK_SIZE);
425 	return 0;
426 }
427 
428 static void
429 ftl_chunk_free_chunk_free_entry(struct ftl_nv_cache_chunk *chunk)
430 {
431 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
432 
433 	ftl_mempool_put(chunk->nv_cache->free_chunk_md_pool, p2l_map->chunk_dma_md);
434 	p2l_map->chunk_dma_md = NULL;
435 }
436 
437 static void
438 chunk_free_cb(int status, void *ctx)
439 {
440 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
441 
442 	if (spdk_likely(!status)) {
443 		struct ftl_nv_cache *nv_cache = chunk->nv_cache;
444 
445 		nv_cache->chunk_free_persist_count--;
446 		TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
447 		nv_cache->chunk_free_count++;
448 		nv_cache->chunk_full_count--;
449 		chunk->md->state = FTL_CHUNK_STATE_FREE;
450 		chunk->md->close_seq_id = 0;
451 		ftl_chunk_free_chunk_free_entry(chunk);
452 	} else {
453 #ifdef SPDK_FTL_RETRY_ON_ERROR
454 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
455 #else
456 		ftl_abort();
457 #endif
458 	}
459 }
460 
461 static void
462 ftl_chunk_persist_free_state(struct ftl_nv_cache *nv_cache)
463 {
464 	int rc;
465 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
466 	struct ftl_p2l_map *p2l_map;
467 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
468 	struct ftl_layout_region *region = ftl_layout_region_get(dev, FTL_LAYOUT_REGION_TYPE_NVC_MD);
469 	struct ftl_nv_cache_chunk *tchunk, *chunk = NULL;
470 
471 	TAILQ_FOREACH_SAFE(chunk, &nv_cache->needs_free_persist_list, entry, tchunk) {
472 		p2l_map = &chunk->p2l_map;
473 		rc = ftl_chunk_alloc_chunk_free_entry(chunk);
474 		if (rc) {
475 			break;
476 		}
477 
478 		TAILQ_REMOVE(&nv_cache->needs_free_persist_list, chunk, entry);
479 
480 		memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
481 		p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_FREE;
482 		p2l_map->chunk_dma_md->close_seq_id = 0;
483 		p2l_map->chunk_dma_md->p2l_map_checksum = 0;
484 
485 		ftl_md_persist_entry(md, get_chunk_idx(chunk), p2l_map->chunk_dma_md, NULL,
486 				     chunk_free_cb, chunk, &chunk->md_persist_entry_ctx);
487 	}
488 }
489 
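/* Fold this chunk's result into the simple moving average of recent compaction bandwidth */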
490 static void
491 compaction_stats_update(struct ftl_nv_cache_chunk *chunk)
492 {
493 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
494 	struct compaction_bw_stats *compaction_bw = &nv_cache->compaction_recent_bw;
495 	double *ptr;
496 
497 	if (spdk_unlikely(chunk->compaction_length_tsc == 0)) {
498 		return;
499 	}
500 
501 	if (spdk_likely(compaction_bw->count == FTL_NV_CACHE_COMPACTION_SMA_N)) {
502 		ptr = compaction_bw->buf + compaction_bw->first;
503 		compaction_bw->first++;
504 		if (compaction_bw->first == FTL_NV_CACHE_COMPACTION_SMA_N) {
505 			compaction_bw->first = 0;
506 		}
507 		compaction_bw->sum -= *ptr;
508 	} else {
509 		ptr = compaction_bw->buf + compaction_bw->count;
510 		compaction_bw->count++;
511 	}
512 
513 	*ptr = (double)chunk->md->blocks_compacted * FTL_BLOCK_SIZE / chunk->compaction_length_tsc;
514 	chunk->compaction_length_tsc = 0;
515 
516 	compaction_bw->sum += *ptr;
517 	nv_cache->compaction_sma = compaction_bw->sum / compaction_bw->count;
518 }
519 
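/*
 * Account for compacted blocks (and the time spent compacting them); once the whole chunk
 * has been compacted, take it off the compaction list and release it.
 */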
520 static void
521 chunk_compaction_advance(struct ftl_nv_cache_chunk *chunk, uint64_t num_blocks)
522 {
523 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
524 	uint64_t tsc = spdk_thread_get_last_tsc(spdk_get_thread());
525 
526 	chunk->compaction_length_tsc += tsc - chunk->compaction_start_tsc;
527 	chunk->compaction_start_tsc = tsc;
528 
529 	chunk->md->blocks_compacted += num_blocks;
530 	assert(chunk->md->blocks_compacted <= chunk_user_blocks_written(chunk));
531 	if (!is_chunk_compacted(chunk)) {
532 		return;
533 	}
534 
535 	/* Remove the chunk from the compaction list */
536 	TAILQ_REMOVE(&nv_cache->chunk_comp_list, chunk, entry);
537 	nv_cache->chunk_comp_count--;
538 
539 	compaction_stats_update(chunk);
540 
541 	ftl_chunk_free(chunk);
542 }
543 
544 static bool
545 is_compaction_required_for_upgrade(struct ftl_nv_cache *nv_cache)
546 {
547 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
548 
549 	if (dev->conf.prep_upgrade_on_shutdown) {
550 		if (nv_cache->chunk_full_count || nv_cache->chunk_open_count) {
551 			return true;
552 		}
553 	}
554 
555 	return false;
556 }
557 
558 static bool
559 is_compaction_required(struct ftl_nv_cache *nv_cache)
560 {
561 	if (spdk_unlikely(nv_cache->halt)) {
562 		return is_compaction_required_for_upgrade(nv_cache);
563 	}
564 
565 	if (nv_cache->chunk_full_count >= nv_cache->chunk_compaction_threshold) {
566 		return true;
567 	}
568 
569 	return false;
570 }
571 
572 static void compaction_process_finish_read(struct ftl_nv_cache_compactor *compactor);
573 static void compaction_process_pin_lba(struct ftl_nv_cache_compactor *comp);
574 
575 static void
576 _compaction_process_pin_lba(void *_comp)
577 {
578 	struct ftl_nv_cache_compactor *comp = _comp;
579 
580 	compaction_process_pin_lba(comp);
581 }
582 
583 static void
584 compaction_process_pin_lba_cb(struct spdk_ftl_dev *dev, int status, struct ftl_l2p_pin_ctx *pin_ctx)
585 {
586 	struct ftl_nv_cache_compactor *comp = pin_ctx->cb_ctx;
587 	struct ftl_rq *rq = comp->rq;
588 
589 	if (status) {
590 		rq->iter.status = status;
591 		pin_ctx->lba = FTL_LBA_INVALID;
592 	}
593 
594 	if (--rq->iter.remaining == 0) {
595 		if (rq->iter.status) {
596 			/* unpin and try again */
597 			ftl_rq_unpin(rq);
598 			spdk_thread_send_msg(spdk_get_thread(), _compaction_process_pin_lba, comp);
599 			return;
600 		}
601 
602 		compaction_process_finish_read(comp);
603 	}
604 }
605 
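/* Pin the L2P pages for all read entries; stale or invalid entries are skipped */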
606 static void
607 compaction_process_pin_lba(struct ftl_nv_cache_compactor *comp)
608 {
609 	struct ftl_rq *rq = comp->rq;
610 	struct spdk_ftl_dev *dev = rq->dev;
611 	struct ftl_rq_entry *entry;
612 
613 	assert(rq->iter.count);
614 	rq->iter.remaining = rq->iter.count;
615 	rq->iter.status = 0;
616 
617 	FTL_RQ_ENTRY_LOOP(rq, entry, rq->iter.count) {
618 		struct ftl_nv_cache_chunk *chunk = entry->owner.priv;
619 		struct ftl_l2p_pin_ctx *pin_ctx = &entry->l2p_pin_ctx;
620 		union ftl_md_vss *md = entry->io_md;
621 
622 		if (md->nv_cache.lba == FTL_LBA_INVALID || md->nv_cache.seq_id != chunk->md->seq_id) {
623 			ftl_l2p_pin_skip(dev, compaction_process_pin_lba_cb, comp, pin_ctx);
624 		} else {
625 			ftl_l2p_pin(dev, md->nv_cache.lba, 1, compaction_process_pin_lba_cb, comp, pin_ctx);
626 		}
627 	}
628 }
629 
630 static void
631 compaction_process_read_entry_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
632 {
633 	struct ftl_rq_entry *entry = arg;
634 	struct ftl_rq *rq = ftl_rq_from_entry(entry);
635 	struct spdk_ftl_dev *dev = rq->dev;
636 	struct ftl_nv_cache_compactor *compactor = rq->owner.priv;
637 
638 	ftl_stats_bdev_io_completed(dev, FTL_STATS_TYPE_CMP, bdev_io);
639 
640 	spdk_bdev_free_io(bdev_io);
641 
642 	if (!success) {
643 		/* retry */
644 		spdk_thread_send_msg(spdk_get_thread(), compaction_process_read_entry, entry);
645 		return;
646 	}
647 
648 	assert(rq->iter.remaining >= entry->bdev_io.num_blocks);
649 	rq->iter.remaining -= entry->bdev_io.num_blocks;
650 	if (0 == rq->iter.remaining) {
651 		/* All IOs processed, go to the next phase - pinning */
652 		compaction_process_pin_lba(compactor);
653 	}
654 }
655 
656 static void
657 compaction_process_read_entry(void *arg)
658 {
659 	struct ftl_rq_entry *entry = arg;
660 	struct ftl_rq *rq = ftl_rq_from_entry(entry);
661 	struct spdk_ftl_dev *dev = rq->dev;
662 
663 	int rc = ftl_nv_cache_bdev_read_blocks_with_md(dev, dev->nv_cache.bdev_desc,
664 			dev->nv_cache.cache_ioch, entry->io_payload, entry->io_md,
665 			entry->bdev_io.offset_blocks, entry->bdev_io.num_blocks,
666 			compaction_process_read_entry_cb, entry);
667 
668 	if (spdk_unlikely(rc)) {
669 		if (rc == -ENOMEM) {
670 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc);
671 			entry->bdev_io.wait_entry.bdev = bdev;
672 			entry->bdev_io.wait_entry.cb_fn = compaction_process_read_entry;
673 			entry->bdev_io.wait_entry.cb_arg = entry;
674 			spdk_bdev_queue_io_wait(bdev, dev->nv_cache.cache_ioch, &entry->bdev_io.wait_entry);
675 		} else {
676 			ftl_abort();
677 		}
678 	}
679 
680 	dev->stats.io_activity_total += entry->bdev_io.num_blocks;
681 }
682 
683 static bool
684 is_chunk_to_read(struct ftl_nv_cache_chunk *chunk)
685 {
686 	assert(chunk->md->blocks_written != 0);
687 
688 	if (chunk_user_blocks_written(chunk) == chunk->md->read_pointer) {
689 		return false;
690 	}
691 
692 	return true;
693 }
694 
695 static struct ftl_nv_cache_chunk *
696 get_chunk_for_compaction(struct ftl_nv_cache *nv_cache)
697 {
698 	struct ftl_nv_cache_chunk *chunk = NULL;
699 
700 	if (!TAILQ_EMPTY(&nv_cache->chunk_comp_list)) {
701 		chunk = TAILQ_FIRST(&nv_cache->chunk_comp_list);
702 		if (is_chunk_to_read(chunk)) {
703 			return chunk;
704 		}
705 	}
706 
707 	if (!TAILQ_EMPTY(&nv_cache->chunk_full_list)) {
708 		chunk = TAILQ_FIRST(&nv_cache->chunk_full_list);
709 		TAILQ_REMOVE(&nv_cache->chunk_full_list, chunk, entry);
710 
711 		assert(chunk->md->write_pointer);
712 	} else {
713 		return NULL;
714 	}
715 
716 	if (spdk_likely(chunk)) {
717 		assert(chunk->md->write_pointer != 0);
718 		TAILQ_INSERT_HEAD(&nv_cache->chunk_comp_list, chunk, entry);
719 		nv_cache->chunk_comp_count++;
720 	}
721 
722 	return chunk;
723 }
724 
725 static uint64_t
726 chunk_blocks_to_read(struct ftl_nv_cache_chunk *chunk)
727 {
728 	uint64_t blocks_written;
729 	uint64_t blocks_to_read;
730 
731 	assert(chunk->md->blocks_written >= chunk->md->blocks_skipped);
732 	blocks_written = chunk_user_blocks_written(chunk);
733 
734 	assert(blocks_written >= chunk->md->read_pointer);
735 	blocks_to_read = blocks_written - chunk->md->read_pointer;
736 
737 	return blocks_to_read;
738 }
739 
740 static void
741 compactor_deactivate(struct ftl_nv_cache_compactor *compactor)
742 {
743 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
744 
745 	compactor->rq->iter.count = 0;
746 	assert(nv_cache->compaction_active_count);
747 	nv_cache->compaction_active_count--;
748 	TAILQ_INSERT_TAIL(&nv_cache->compactor_list, compactor, entry);
749 }
750 
751 static void
752 compaction_process_invalidate_entry(struct ftl_rq_entry *entry)
753 {
754 	entry->addr = FTL_ADDR_INVALID;
755 	entry->lba = FTL_LBA_INVALID;
756 	entry->seq_id = 0;
757 	entry->owner.priv = NULL;
758 }
759 
760 static void
761 compaction_process_pad(struct ftl_nv_cache_compactor *compactor, uint64_t idx)
762 {
763 	struct ftl_rq *rq = compactor->rq;
764 	struct ftl_rq_entry *entry;
765 
766 	assert(idx < rq->num_blocks);
767 	FTL_RQ_ENTRY_LOOP_FROM(rq, &rq->entries[idx], entry, rq->num_blocks) {
768 		compaction_process_invalidate_entry(entry);
769 	}
770 }
771 
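/* Coalesce physically contiguous entries into as few bdev reads as possible and submit them */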
772 static void
773 compaction_process_read(struct ftl_nv_cache_compactor *compactor)
774 {
775 	struct ftl_rq *rq = compactor->rq;
776 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
777 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
778 	struct ftl_rq_entry *entry, *io;
779 
780 	assert(rq->iter.count);
781 	rq->iter.remaining = rq->iter.count;
782 
783 	io = rq->entries;
784 	io->bdev_io.num_blocks = 1;
785 	io->bdev_io.offset_blocks = ftl_addr_to_nvc_offset(dev, io->addr);
786 	FTL_RQ_ENTRY_LOOP_FROM(rq, &rq->entries[1], entry, rq->iter.count) {
787 		if (entry->addr == io->addr + io->bdev_io.num_blocks) {
788 			io->bdev_io.num_blocks++;
789 		} else {
790 			compaction_process_read_entry(io);
791 			io = entry;
792 			io->bdev_io.num_blocks = 1;
793 			io->bdev_io.offset_blocks = ftl_addr_to_nvc_offset(dev, io->addr);
794 		}
795 	}
796 	compaction_process_read_entry(io);
797 }
798 
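/*
 * Find the next valid block to compact in the chunk, advancing the read pointer past
 * invalid ones; returns FTL_ADDR_INVALID when there is nothing left to read.
 */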
799 static ftl_addr
800 compaction_chunk_read_pos(struct spdk_ftl_dev *dev, struct ftl_nv_cache_chunk *chunk)
801 {
802 	ftl_addr start, pos;
803 	uint64_t skip, to_read = chunk_blocks_to_read(chunk);
804 
805 	if (0 == to_read) {
806 		return FTL_ADDR_INVALID;
807 	}
808 
809 	start = ftl_addr_from_nvc_offset(dev, chunk->offset + chunk->md->read_pointer);
810 	pos = ftl_bitmap_find_first_set(dev->valid_map, start, start + to_read - 1);
811 
812 	if (pos == UINT64_MAX) {
813 		chunk->md->read_pointer += to_read;
814 		chunk_compaction_advance(chunk, to_read);
815 		return FTL_ADDR_INVALID;
816 	}
817 
818 	assert(pos >= start);
819 	skip = pos - start;
820 	if (skip) {
821 		chunk->md->read_pointer += skip;
822 		chunk_compaction_advance(chunk, skip);
823 	}
824 
825 	return pos;
826 }
827 
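/* Assign the next valid NV cache address (and its owning chunk) to a single request entry */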
828 static bool
829 compaction_entry_read_pos(struct ftl_nv_cache *nv_cache, struct ftl_rq_entry *entry)
830 {
831 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
832 	struct ftl_nv_cache_chunk *chunk = NULL;
833 	ftl_addr addr = FTL_ADDR_INVALID;
834 
835 	while (!chunk) {
836 		/* Get currently handled chunk */
837 		chunk = get_chunk_for_compaction(nv_cache);
838 		if (!chunk) {
839 			return false;
840 		}
841 		chunk->compaction_start_tsc = spdk_thread_get_last_tsc(spdk_get_thread());
842 
843 		/* Get next read position in chunk */
844 		addr = compaction_chunk_read_pos(dev, chunk);
845 		if (FTL_ADDR_INVALID == addr) {
846 			chunk = NULL;
847 		}
848 	}
849 
850 	assert(FTL_ADDR_INVALID != addr);
851 
852 	/* Set entry address info and chunk */
853 	entry->addr = addr;
854 	entry->owner.priv = chunk;
855 
856 	/* Move read pointer in the chunk */
857 	chunk->md->read_pointer++;
858 
859 	return true;
860 }
861 
862 static void
863 compaction_process_start(struct ftl_nv_cache_compactor *compactor)
864 {
865 	struct ftl_rq *rq = compactor->rq;
866 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
867 	struct ftl_rq_entry *entry;
868 
869 	assert(0 == compactor->rq->iter.count);
870 	FTL_RQ_ENTRY_LOOP(rq, entry, rq->num_blocks) {
871 		if (!compaction_entry_read_pos(nv_cache, entry)) {
872 			compaction_process_pad(compactor, entry->index);
873 			break;
874 		}
875 		rq->iter.count++;
876 	}
877 
878 	if (rq->iter.count) {
879 		/* Schedule Read IOs */
880 		compaction_process_read(compactor);
881 	} else {
882 		compactor_deactivate(compactor);
883 	}
884 }
885 
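/* Start one compactor if compaction is required and a free compactor is available */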
886 static void
887 compaction_process(struct ftl_nv_cache *nv_cache)
888 {
889 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
890 	struct ftl_nv_cache_compactor *compactor;
891 
892 	if (!is_compaction_required(nv_cache)) {
893 		return;
894 	}
895 
896 	compactor = TAILQ_FIRST(&nv_cache->compactor_list);
897 	if (!compactor) {
898 		return;
899 	}
900 
901 	TAILQ_REMOVE(&nv_cache->compactor_list, compactor, entry);
902 	compactor->nv_cache->compaction_active_count++;
903 	compaction_process_start(compactor);
904 	ftl_add_io_activity(dev);
905 }
906 
907 static void
908 compaction_process_ftl_done(struct ftl_rq *rq)
909 {
910 	struct spdk_ftl_dev *dev = rq->dev;
911 	struct ftl_nv_cache_compactor *compactor = rq->owner.priv;
912 	struct ftl_band *band = rq->io.band;
913 	struct ftl_rq_entry *entry;
914 	ftl_addr addr;
915 
916 	if (spdk_unlikely(false == rq->success)) {
917 		/* IO error, retry writing */
918 #ifdef SPDK_FTL_RETRY_ON_ERROR
919 		ftl_writer_queue_rq(&dev->writer_user, rq);
920 		return;
921 #else
922 		ftl_abort();
923 #endif
924 	}
925 
926 	assert(rq->iter.count);
927 
928 	/* Update L2P table */
929 	addr = rq->io.addr;
930 	FTL_RQ_ENTRY_LOOP(rq, entry, rq->iter.count) {
931 		struct ftl_nv_cache_chunk *chunk = entry->owner.priv;
932 
933 		if (entry->lba != FTL_LBA_INVALID) {
934 			ftl_l2p_update_base(dev, entry->lba, addr, entry->addr);
935 			ftl_l2p_unpin(dev, entry->lba, 1);
936 			chunk_compaction_advance(chunk, 1);
937 		} else {
938 			assert(entry->addr == FTL_ADDR_INVALID);
939 		}
940 
941 		addr = ftl_band_next_addr(band, addr, 1);
942 		compaction_process_invalidate_entry(entry);
943 	}
944 
945 	compactor_deactivate(compactor);
946 }
947 
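/*
 * After the read and pin phases: drop entries whose data is no longer valid (metadata mismatch
 * or the L2P no longer points at them) and hand the remaining ones over to the user writer.
 */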
948 static void
949 compaction_process_finish_read(struct ftl_nv_cache_compactor *compactor)
950 {
951 	struct ftl_rq *rq = compactor->rq;
952 	struct spdk_ftl_dev *dev = rq->dev;
953 	struct ftl_rq_entry *entry;
954 	ftl_addr current_addr;
955 	uint64_t skip = 0;
956 
957 	FTL_RQ_ENTRY_LOOP(rq, entry, rq->iter.count) {
958 		struct ftl_nv_cache_chunk *chunk = entry->owner.priv;
959 		union ftl_md_vss *md = entry->io_md;
960 
961 		if (md->nv_cache.lba == FTL_LBA_INVALID || md->nv_cache.seq_id != chunk->md->seq_id) {
962 			skip++;
963 			compaction_process_invalidate_entry(entry);
964 			chunk_compaction_advance(chunk, 1);
965 			continue;
966 		}
967 
968 		current_addr = ftl_l2p_get(dev, md->nv_cache.lba);
969 		if (current_addr == entry->addr) {
970 			entry->lba = md->nv_cache.lba;
971 			entry->seq_id = chunk->md->seq_id;
972 		} else {
973 			/* This address has already been invalidated, just skip this block */
974 			chunk_compaction_advance(chunk, 1);
975 			ftl_l2p_unpin(dev, md->nv_cache.lba, 1);
976 			compaction_process_invalidate_entry(entry);
977 			skip++;
978 		}
979 	}
980 
981 	if (skip < rq->iter.count) {
982 		/*
983 		 * Request contains data that is still valid, queue it to the writer to finish compaction
984 		 */
985 		ftl_writer_queue_rq(&dev->writer_user, rq);
986 	} else {
987 		compactor_deactivate(compactor);
988 	}
989 }
990 
991 static void
992 compactor_free(struct spdk_ftl_dev *dev, struct ftl_nv_cache_compactor *compactor)
993 {
994 	if (!compactor) {
995 		return;
996 	}
997 
998 	ftl_rq_del(compactor->rq);
999 	free(compactor);
1000 }
1001 
1002 static struct ftl_nv_cache_compactor *
1003 compactor_alloc(struct spdk_ftl_dev *dev)
1004 {
1005 	struct ftl_nv_cache_compactor *compactor;
1006 	struct ftl_rq_entry *entry;
1007 
1008 	compactor = calloc(1, sizeof(*compactor));
1009 	if (!compactor) {
1010 		goto error;
1011 	}
1012 
1013 	/* Allocate a helper request for reading */
1014 	compactor->rq = ftl_rq_new(dev, dev->nv_cache.md_size);
1015 	if (!compactor->rq) {
1016 		goto error;
1017 	}
1018 
1019 	compactor->nv_cache = &dev->nv_cache;
1020 	compactor->rq->owner.priv = compactor;
1021 	compactor->rq->owner.cb = compaction_process_ftl_done;
1022 	compactor->rq->owner.compaction = true;
1023 
1024 	FTL_RQ_ENTRY_LOOP(compactor->rq, entry, compactor->rq->num_blocks) {
1025 		compaction_process_invalidate_entry(entry);
1026 	}
1027 
1028 	return compactor;
1029 
1030 error:
1031 	compactor_free(dev, compactor);
1032 	return NULL;
1033 }
1034 
1035 static void
1036 ftl_nv_cache_submit_cb_done(struct ftl_io *io)
1037 {
1038 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1039 
1040 	chunk_advance_blocks(nv_cache, io->nv_cache_chunk, io->num_blocks);
1041 	io->nv_cache_chunk = NULL;
1042 
1043 	ftl_mempool_put(nv_cache->md_pool, io->md);
1044 	ftl_io_complete(io);
1045 }
1046 
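/* Point the L2P at the new NV cache addresses of the written blocks, then unpin and complete the IO */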
1047 static void
1048 ftl_nv_cache_l2p_update(struct ftl_io *io)
1049 {
1050 	struct spdk_ftl_dev *dev = io->dev;
1051 	ftl_addr next_addr = io->addr;
1052 	size_t i;
1053 
1054 	for (i = 0; i < io->num_blocks; ++i, ++next_addr) {
1055 		ftl_l2p_update_cache(dev, ftl_io_get_lba(io, i), next_addr, io->map[i]);
1056 	}
1057 
1058 	ftl_l2p_unpin(dev, io->lba, io->num_blocks);
1059 	ftl_nv_cache_submit_cb_done(io);
1060 }
1061 
1062 static void
1063 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1064 {
1065 	struct ftl_io *io = cb_arg;
1066 
1067 	ftl_stats_bdev_io_completed(io->dev, FTL_STATS_TYPE_USER, bdev_io);
1068 
1069 	spdk_bdev_free_io(bdev_io);
1070 
1071 	if (spdk_unlikely(!success)) {
1072 		FTL_ERRLOG(io->dev, "Non-volatile cache write failed at %"PRIx64"\n",
1073 			   io->addr);
1074 		io->status = -EIO;
1075 		ftl_nv_cache_submit_cb_done(io);
1076 	} else {
1077 		ftl_nv_cache_l2p_update(io);
1078 	}
1079 }
1080 
1081 static void
1082 nv_cache_write(void *_io)
1083 {
1084 	struct ftl_io *io = _io;
1085 	struct spdk_ftl_dev *dev = io->dev;
1086 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1087 	int rc;
1088 
1089 	rc = spdk_bdev_writev_blocks_with_md(nv_cache->bdev_desc, nv_cache->cache_ioch,
1090 					     io->iov, io->iov_cnt, io->md,
1091 					     ftl_addr_to_nvc_offset(dev, io->addr), io->num_blocks,
1092 					     ftl_nv_cache_submit_cb, io);
1093 	if (spdk_unlikely(rc)) {
1094 		if (rc == -ENOMEM) {
1095 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1096 			io->bdev_io_wait.bdev = bdev;
1097 			io->bdev_io_wait.cb_fn = nv_cache_write;
1098 			io->bdev_io_wait.cb_arg = io;
1099 			spdk_bdev_queue_io_wait(bdev, nv_cache->cache_ioch, &io->bdev_io_wait);
1100 		} else {
1101 			ftl_abort();
1102 		}
1103 	}
1104 }
1105 
1106 static void
1107 ftl_nv_cache_pin_cb(struct spdk_ftl_dev *dev, int status, struct ftl_l2p_pin_ctx *pin_ctx)
1108 {
1109 	struct ftl_io *io = pin_ctx->cb_ctx;
1110 	size_t i;
1111 
1112 	if (spdk_unlikely(status != 0)) {
1113 		/* Retry on the internal L2P fault */
1114 		FTL_ERRLOG(dev, "Pinning LBA for NV cache write failed at %"PRIx64"\n",
1115 			   io->addr);
1116 		io->status = -EAGAIN;
1117 		ftl_nv_cache_submit_cb_done(io);
1118 		return;
1119 	}
1120 
1121 	/* Remember previous l2p mapping to resolve conflicts in case of outstanding write-after-write */
1122 	for (i = 0; i < io->num_blocks; ++i) {
1123 		io->map[i] = ftl_l2p_get(dev, ftl_io_get_lba(io, i));
1124 	}
1125 
1126 	assert(io->iov_pos == 0);
1127 
1128 	ftl_trace_submission(io->dev, io, io->addr, io->num_blocks);
1129 
1130 	nv_cache_write(io);
1131 }
1132 
1133 bool
1134 ftl_nv_cache_write(struct ftl_io *io)
1135 {
1136 	struct spdk_ftl_dev *dev = io->dev;
1137 	uint64_t cache_offset;
1138 
1139 	io->md = ftl_mempool_get(dev->nv_cache.md_pool);
1140 	if (spdk_unlikely(!io->md)) {
1141 		return false;
1142 	}
1143 
1144 	/* Reserve area on the write buffer cache */
1145 	cache_offset = ftl_nv_cache_get_wr_buffer(&dev->nv_cache, io);
1146 	if (cache_offset == FTL_LBA_INVALID) {
1147 		/* No free space in NV cache, resubmit request */
1148 		ftl_mempool_put(dev->nv_cache.md_pool, io->md);
1149 		return false;
1150 	}
1151 	io->addr = ftl_addr_from_nvc_offset(dev, cache_offset);
1152 	io->nv_cache_chunk = dev->nv_cache.chunk_current;
1153 
1154 	ftl_nv_cache_fill_md(io);
1155 	ftl_l2p_pin(io->dev, io->lba, io->num_blocks,
1156 		    ftl_nv_cache_pin_cb, io,
1157 		    &io->l2p_pin_ctx);
1158 
1159 	dev->nv_cache.throttle.blocks_submitted += io->num_blocks;
1160 
1161 	return true;
1162 }
1163 
1164 int
1165 ftl_nv_cache_read(struct ftl_io *io, ftl_addr addr, uint32_t num_blocks,
1166 		  spdk_bdev_io_completion_cb cb, void *cb_arg)
1167 {
1168 	int rc;
1169 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1170 
1171 	assert(ftl_addr_in_nvc(io->dev, addr));
1172 
1173 	rc = ftl_nv_cache_bdev_read_blocks_with_md(io->dev, nv_cache->bdev_desc, nv_cache->cache_ioch,
1174 			ftl_io_iovec_addr(io), NULL, ftl_addr_to_nvc_offset(io->dev, addr),
1175 			num_blocks, cb, cb_arg);
1176 
1177 	return rc;
1178 }
1179 
1180 bool
1181 ftl_nv_cache_is_halted(struct ftl_nv_cache *nv_cache)
1182 {
1183 	if (nv_cache->compaction_active_count) {
1184 		return false;
1185 	}
1186 
1187 	if (nv_cache->chunk_open_count > 0) {
1188 		return false;
1189 	}
1190 
1191 	if (is_compaction_required_for_upgrade(nv_cache)) {
1192 		return false;
1193 	}
1194 
1195 	return true;
1196 }
1197 
1198 void
1199 ftl_chunk_map_set_lba(struct ftl_nv_cache_chunk *chunk,
1200 		      uint64_t offset, uint64_t lba)
1201 {
1202 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1203 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1204 
1205 	ftl_lba_store(dev, p2l_map->chunk_map, offset, lba);
1206 }
1207 
1208 uint64_t
1209 ftl_chunk_map_get_lba(struct ftl_nv_cache_chunk *chunk, uint64_t offset)
1210 {
1211 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1212 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1213 
1214 	return ftl_lba_load(dev, p2l_map->chunk_map, offset);
1215 }
1216 
1217 static void
1218 ftl_chunk_set_addr(struct ftl_nv_cache_chunk *chunk, uint64_t lba, ftl_addr addr)
1219 {
1220 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1221 	uint64_t cache_offset = ftl_addr_to_nvc_offset(dev, addr);
1222 	uint64_t offset;
1223 
1224 	offset = (cache_offset - chunk->offset) % chunk->nv_cache->chunk_blocks;
1225 	ftl_chunk_map_set_lba(chunk, offset, lba);
1226 }
1227 
1228 struct ftl_nv_cache_chunk *
1229 ftl_nv_cache_get_chunk_from_addr(struct spdk_ftl_dev *dev, ftl_addr addr)
1230 {
1231 	struct ftl_nv_cache_chunk *chunk = dev->nv_cache.chunks;
1232 	uint64_t chunk_idx;
1233 	uint64_t cache_offset = ftl_addr_to_nvc_offset(dev, addr);
1234 
1235 	assert(chunk != NULL);
1236 	chunk_idx = (cache_offset - chunk->offset) / chunk->nv_cache->chunk_blocks;
1237 	chunk += chunk_idx;
1238 
1239 	return chunk;
1240 }
1241 
1242 void
1243 ftl_nv_cache_set_addr(struct spdk_ftl_dev *dev, uint64_t lba, ftl_addr addr)
1244 {
1245 	struct ftl_nv_cache_chunk *chunk;
1246 
1247 	chunk = ftl_nv_cache_get_chunk_from_addr(dev, addr);
1248 
1249 	assert(lba != FTL_LBA_INVALID);
1250 
1251 	ftl_chunk_set_addr(chunk, lba, addr);
1252 	ftl_bitmap_set(dev->valid_map, addr);
1253 }
1254 
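/*
 * Proportional throttling: scale the per-interval write limit around the recent compaction
 * bandwidth, depending on how far the free chunk count is from its target.
 */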
1255 static void
1256 ftl_nv_cache_throttle_update(struct ftl_nv_cache *nv_cache)
1257 {
1258 	double err;
1259 	double modifier;
1260 
1261 	err = ((double)nv_cache->chunk_free_count - nv_cache->chunk_free_target) / nv_cache->chunk_count;
1262 	modifier = FTL_NV_CACHE_THROTTLE_MODIFIER_KP * err;
1263 
1264 	if (modifier < FTL_NV_CACHE_THROTTLE_MODIFIER_MIN) {
1265 		modifier = FTL_NV_CACHE_THROTTLE_MODIFIER_MIN;
1266 	} else if (modifier > FTL_NV_CACHE_THROTTLE_MODIFIER_MAX) {
1267 		modifier = FTL_NV_CACHE_THROTTLE_MODIFIER_MAX;
1268 	}
1269 
1270 	if (spdk_unlikely(nv_cache->compaction_sma == 0 || nv_cache->compaction_active_count == 0)) {
1271 		nv_cache->throttle.blocks_submitted_limit = UINT64_MAX;
1272 	} else {
1273 		double blocks_per_interval = nv_cache->compaction_sma * nv_cache->throttle.interval_tsc /
1274 					     FTL_BLOCK_SIZE;
1275 		nv_cache->throttle.blocks_submitted_limit = blocks_per_interval * (1.0 + modifier);
1276 	}
1277 }
1278 
1279 static void
1280 ftl_nv_cache_process_throttle(struct ftl_nv_cache *nv_cache)
1281 {
1282 	uint64_t tsc = spdk_thread_get_last_tsc(spdk_get_thread());
1283 
1284 	if (spdk_unlikely(!nv_cache->throttle.start_tsc)) {
1285 		nv_cache->throttle.start_tsc = tsc;
1286 	} else if (tsc - nv_cache->throttle.start_tsc >= nv_cache->throttle.interval_tsc) {
1287 		ftl_nv_cache_throttle_update(nv_cache);
1288 		nv_cache->throttle.start_tsc = tsc;
1289 		nv_cache->throttle.blocks_submitted = 0;
1290 	}
1291 }
1292 
1293 static void ftl_chunk_open(struct ftl_nv_cache_chunk *chunk);
1294 
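/* Main NV cache poller step: open chunks as needed, drive compaction, persist freed chunks, update throttling */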
1295 void
1296 ftl_nv_cache_process(struct spdk_ftl_dev *dev)
1297 {
1298 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1299 
1300 	assert(dev->nv_cache.bdev_desc);
1301 
1302 	if (nv_cache->chunk_open_count < FTL_MAX_OPEN_CHUNKS && spdk_likely(!nv_cache->halt) &&
1303 	    !TAILQ_EMPTY(&nv_cache->chunk_free_list)) {
1304 		struct ftl_nv_cache_chunk *chunk = TAILQ_FIRST(&nv_cache->chunk_free_list);
1305 		TAILQ_REMOVE(&nv_cache->chunk_free_list, chunk, entry);
1306 		TAILQ_INSERT_TAIL(&nv_cache->chunk_open_list, chunk, entry);
1307 		nv_cache->chunk_free_count--;
1308 		chunk->md->seq_id = ftl_get_next_seq_id(dev);
1309 		ftl_chunk_open(chunk);
1310 		ftl_add_io_activity(dev);
1311 	}
1312 
1313 	compaction_process(nv_cache);
1314 	ftl_chunk_persist_free_state(nv_cache);
1315 	ftl_nv_cache_process_throttle(nv_cache);
1316 }
1317 
1318 static bool
1319 ftl_nv_cache_full(struct ftl_nv_cache *nv_cache)
1320 {
1321 	if (0 == nv_cache->chunk_open_count && NULL == nv_cache->chunk_current) {
1322 		return true;
1323 	} else {
1324 		return false;
1325 	}
1326 }
1327 
1328 bool
1329 ftl_nv_cache_throttle(struct spdk_ftl_dev *dev)
1330 {
1331 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1332 
1333 	if (dev->nv_cache.throttle.blocks_submitted >= nv_cache->throttle.blocks_submitted_limit ||
1334 	    ftl_nv_cache_full(nv_cache)) {
1335 		return true;
1336 	}
1337 
1338 	return false;
1339 }
1340 
1341 static void
1342 chunk_free_p2l_map(struct ftl_nv_cache_chunk *chunk)
1343 {
1344 
1345 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1346 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1347 
1348 	ftl_mempool_put(nv_cache->p2l_pool, p2l_map->chunk_map);
1349 	p2l_map->chunk_map = NULL;
1350 
1351 	ftl_chunk_free_md_entry(chunk);
1352 }
1353 
1354 int
1355 ftl_nv_cache_save_state(struct ftl_nv_cache *nv_cache)
1356 {
1357 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1358 	struct ftl_nv_cache_chunk *chunk;
1359 	int status = 0;
1360 	uint64_t i;
1361 
1362 	assert(nv_cache->chunk_open_count == 0);
1363 
1364 	if (nv_cache->compaction_active_count) {
1365 		FTL_ERRLOG(dev, "Cannot save NV cache state, compaction in progress\n");
1366 		return -EINVAL;
1367 	}
1368 
1369 	chunk = nv_cache->chunks;
1370 	if (!chunk) {
1371 		FTL_ERRLOG(dev, "Cannot save NV cache state, no NV cache metadata\n");
1372 		return -ENOMEM;
1373 	}
1374 
1375 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1376 		nvc_validate_md(nv_cache, chunk->md);
1377 
1378 		if (chunk->md->read_pointer) {
1379 			/* Only full chunks can be compacted */
1380 			if (chunk->md->blocks_written != nv_cache->chunk_blocks) {
1381 				assert(0);
1382 				status = -EINVAL;
1383 				break;
1384 			}
1385 
1386 			/*
1387 			 * Chunk in the middle of compaction, start over after
1388 			 * load
1389 			 */
1390 			chunk->md->read_pointer = chunk->md->blocks_compacted = 0;
1391 		} else if (chunk->md->blocks_written == nv_cache->chunk_blocks) {
1392 			/* Full chunk */
1393 		} else if (0 == chunk->md->blocks_written) {
1394 			/* Empty chunk */
1395 		} else {
1396 			assert(0);
1397 			status = -EINVAL;
1398 			break;
1399 		}
1400 	}
1401 
1402 	if (status) {
1403 		FTL_ERRLOG(dev, "Cannot save NV cache state, inconsistent NV cache "
1404 			   "metadata\n");
1405 	}
1406 
1407 	return status;
1408 }
1409 
1410 static int
1411 sort_chunks_cmp(const void *a, const void *b)
1412 {
1413 	struct ftl_nv_cache_chunk *a_chunk = *(struct ftl_nv_cache_chunk **)a;
1414 	struct ftl_nv_cache_chunk *b_chunk = *(struct ftl_nv_cache_chunk **)b;
1415 
1416 	return a_chunk->md->seq_id - b_chunk->md->seq_id;
1417 }
1418 
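/* Sort the full chunk list by sequence id so that compaction processes chunks in write order */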
1419 static int
1420 sort_chunks(struct ftl_nv_cache *nv_cache)
1421 {
1422 	struct ftl_nv_cache_chunk **chunks_list;
1423 	struct ftl_nv_cache_chunk *chunk;
1424 	uint32_t i;
1425 
1426 	if (TAILQ_EMPTY(&nv_cache->chunk_full_list)) {
1427 		return 0;
1428 	}
1429 
1430 	chunks_list = calloc(nv_cache->chunk_full_count,
1431 			     sizeof(chunks_list[0]));
1432 	if (!chunks_list) {
1433 		return -ENOMEM;
1434 	}
1435 
1436 	i = 0;
1437 	TAILQ_FOREACH(chunk, &nv_cache->chunk_full_list, entry) {
1438 		chunks_list[i] = chunk;
1439 		i++;
1440 	}
1441 	assert(i == nv_cache->chunk_full_count);
1442 
1443 	qsort(chunks_list, nv_cache->chunk_full_count, sizeof(chunks_list[0]),
1444 	      sort_chunks_cmp);
1445 
1446 	TAILQ_INIT(&nv_cache->chunk_full_list);
1447 	for (i = 0; i < nv_cache->chunk_full_count; i++) {
1448 		chunk = chunks_list[i];
1449 		TAILQ_INSERT_TAIL(&nv_cache->chunk_full_list, chunk, entry);
1450 	}
1451 
1452 	free(chunks_list);
1453 	return 0;
1454 }
1455 
1456 static int
1457 chunk_alloc_p2l_map(struct ftl_nv_cache_chunk *chunk)
1458 {
1459 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1460 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1461 
1462 	assert(p2l_map->ref_cnt == 0);
1463 	assert(p2l_map->chunk_map == NULL);
1464 
1465 	p2l_map->chunk_map = ftl_mempool_get(nv_cache->p2l_pool);
1466 
1467 	if (!p2l_map->chunk_map) {
1468 		return -ENOMEM;
1469 	}
1470 
1471 	if (ftl_chunk_alloc_md_entry(chunk)) {
1472 		ftl_mempool_put(nv_cache->p2l_pool, p2l_map->chunk_map);
1473 		p2l_map->chunk_map = NULL;
1474 		return -ENOMEM;
1475 	}
1476 
1477 	/* Set the P2L to FTL_LBA_INVALID */
1478 	memset(p2l_map->chunk_map, -1, FTL_BLOCK_SIZE * nv_cache->tail_md_chunk_blocks);
1479 
1480 	return 0;
1481 }
1482 
1483 int
1484 ftl_nv_cache_load_state(struct ftl_nv_cache *nv_cache)
1485 {
1486 	struct ftl_nv_cache_chunk *chunk;
1487 	uint64_t chunks_number, offset, i;
1488 	int status = 0;
1489 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1490 
1491 	nv_cache->chunk_current = NULL;
1492 	TAILQ_INIT(&nv_cache->chunk_free_list);
1493 	TAILQ_INIT(&nv_cache->chunk_full_list);
1494 	nv_cache->chunk_full_count = nv_cache->chunk_free_count = 0;
1495 
1496 	assert(nv_cache->chunk_open_count == 0);
1497 	offset = nvc_data_offset(nv_cache);
1498 	chunk = nv_cache->chunks;
1499 	if (!chunk) {
1500 		FTL_ERRLOG(dev, "No NV cache metadata\n");
1501 		return -1;
1502 	}
1503 
1504 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1505 		chunk->nv_cache = nv_cache;
1506 		nvc_validate_md(nv_cache, chunk->md);
1507 
1508 		if (offset != chunk->offset) {
1509 			status = -EINVAL;
1510 			goto error;
1511 		}
1512 
1513 		if (chunk->md->blocks_written == nv_cache->chunk_blocks) {
1514 			/* Chunk is full, move it to the full list */
1515 			TAILQ_INSERT_TAIL(&nv_cache->chunk_full_list, chunk, entry);
1516 			nv_cache->chunk_full_count++;
1517 		} else if (0 == chunk->md->blocks_written) {
1518 			/* Chunk is empty, move it to the free list */
1519 			TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
1520 			nv_cache->chunk_free_count++;
1521 		} else {
1522 			status = -EINVAL;
1523 			goto error;
1524 		}
1525 
1526 		offset += nv_cache->chunk_blocks;
1527 	}
1528 
1529 	chunks_number = nv_cache->chunk_free_count + nv_cache->chunk_full_count;
1530 	assert(nv_cache->chunk_current == NULL);
1531 
1532 	if (chunks_number != nv_cache->chunk_count) {
1533 		FTL_ERRLOG(dev, "Inconsistent NV cache metadata\n");
1534 		status = -EINVAL;
1535 		goto error;
1536 	}
1537 
1538 	status = sort_chunks(nv_cache);
1539 	if (status) {
1540 		FTL_ERRLOG(dev, "FTL NV Cache: sorting chunks ERROR\n");
1541 	}
1542 
1543 	FTL_NOTICELOG(dev, "FTL NV Cache: full chunks = %lu, empty chunks = %lu\n",
1544 		      nv_cache->chunk_full_count, nv_cache->chunk_free_count);
1545 
1546 	if (0 == status) {
1547 		FTL_NOTICELOG(dev, "FTL NV Cache: state loaded successfully\n");
1548 	} else {
1549 		FTL_ERRLOG(dev, "FTL NV Cache: loading state ERROR\n");
1550 	}
1551 
1552 error:
1553 	return status;
1554 }
1555 
1556 void
1557 ftl_nv_cache_get_max_seq_id(struct ftl_nv_cache *nv_cache, uint64_t *open_seq_id,
1558 			    uint64_t *close_seq_id)
1559 {
1560 	uint64_t i, o_seq_id = 0, c_seq_id = 0;
1561 	struct ftl_nv_cache_chunk *chunk;
1562 
1563 	chunk = nv_cache->chunks;
1564 	assert(chunk);
1565 
1566 	/* Iterate over chunks and get their max open and close seq id */
1567 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1568 		o_seq_id = spdk_max(o_seq_id, chunk->md->seq_id);
1569 		c_seq_id = spdk_max(c_seq_id, chunk->md->close_seq_id);
1570 	}
1571 
1572 	*open_seq_id = o_seq_id;
1573 	*close_seq_id = c_seq_id;
1574 }
1575 
1576 typedef void (*ftl_chunk_ops_cb)(struct ftl_nv_cache_chunk *chunk, void *cntx, bool status);
1577 
1578 static void
1579 write_brq_end(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1580 {
1581 	struct ftl_basic_rq *brq = arg;
1582 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
1583 
1584 	ftl_stats_bdev_io_completed(brq->dev, FTL_STATS_TYPE_MD_NV_CACHE, bdev_io);
1585 
1586 	brq->success = success;
1587 	if (spdk_likely(success)) {
1588 		chunk_advance_blocks(chunk->nv_cache, chunk, brq->num_blocks);
1589 	}
1590 
1591 	spdk_bdev_free_io(bdev_io);
1592 	brq->owner.cb(brq);
1593 }
1594 
1595 static void
1596 _ftl_chunk_basic_rq_write(void *_brq)
1597 {
1598 	struct ftl_basic_rq *brq = _brq;
1599 	struct ftl_nv_cache *nv_cache = brq->io.chunk->nv_cache;
1600 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1601 	int rc;
1602 
1603 	rc = ftl_nv_cache_bdev_write_blocks_with_md(dev, nv_cache->bdev_desc, nv_cache->cache_ioch,
1604 			brq->io_payload, NULL, brq->io.addr,
1605 			brq->num_blocks, write_brq_end, brq);
1606 	if (spdk_unlikely(rc)) {
1607 		if (rc == -ENOMEM) {
1608 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1609 			brq->io.bdev_io_wait.bdev = bdev;
1610 			brq->io.bdev_io_wait.cb_fn = _ftl_chunk_basic_rq_write;
1611 			brq->io.bdev_io_wait.cb_arg = brq;
1612 			spdk_bdev_queue_io_wait(bdev, nv_cache->cache_ioch, &brq->io.bdev_io_wait);
1613 		} else {
1614 			ftl_abort();
1615 		}
1616 	}
1617 }
1618 
1619 static void
1620 ftl_chunk_basic_rq_write(struct ftl_nv_cache_chunk *chunk, struct ftl_basic_rq *brq)
1621 {
1622 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1623 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1624 
1625 	brq->io.chunk = chunk;
1626 	brq->success = false;
1627 
1628 	_ftl_chunk_basic_rq_write(brq);
1629 
1630 	chunk->md->write_pointer += brq->num_blocks;
1631 	dev->stats.io_activity_total += brq->num_blocks;
1632 }
1633 
1634 static void
1635 read_brq_end(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1636 {
1637 	struct ftl_basic_rq *brq = arg;
1638 
1639 	ftl_stats_bdev_io_completed(brq->dev, FTL_STATS_TYPE_MD_NV_CACHE, bdev_io);
1640 
1641 	brq->success = success;
1642 
1643 	brq->owner.cb(brq);
1644 	spdk_bdev_free_io(bdev_io);
1645 }
1646 
1647 static int
1648 ftl_chunk_basic_rq_read(struct ftl_nv_cache_chunk *chunk, struct ftl_basic_rq *brq)
1649 {
1650 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1651 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1652 	int rc;
1653 
1654 	brq->io.chunk = chunk;
1655 	brq->success = false;
1656 
1657 	rc = ftl_nv_cache_bdev_read_blocks_with_md(dev, nv_cache->bdev_desc, nv_cache->cache_ioch,
1658 			brq->io_payload, NULL, brq->io.addr, brq->num_blocks, read_brq_end, brq);
1659 
1660 	if (spdk_likely(!rc)) {
1661 		dev->stats.io_activity_total += brq->num_blocks;
1662 	}
1663 
1664 	return rc;
1665 }
1666 
1667 static void
1668 chunk_open_cb(int status, void *ctx)
1669 {
1670 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
1671 
1672 	if (spdk_unlikely(status)) {
1673 #ifdef SPDK_FTL_RETRY_ON_ERROR
1674 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
1675 		return;
1676 #else
1677 		ftl_abort();
1678 #endif
1679 	}
1680 
1681 	chunk->md->state = FTL_CHUNK_STATE_OPEN;
1682 }
1683 
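/* Allocate the chunk's P2L map and persist its metadata with the OPEN state */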
1684 static void
1685 ftl_chunk_open(struct ftl_nv_cache_chunk *chunk)
1686 {
1687 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1688 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1689 	struct ftl_layout_region *region = ftl_layout_region_get(dev, FTL_LAYOUT_REGION_TYPE_NVC_MD);
1690 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1691 
1692 	if (chunk_alloc_p2l_map(chunk)) {
1693 		assert(0);
1694 		/*
1695 		 * We control the number of open chunks and it must be consistent with the size of the
1696 		 * chunk P2L map pool
1697 		 */
1698 		ftl_abort();
1699 		return;
1700 	}
1701 
1702 	chunk->nv_cache->chunk_open_count++;
1703 
1704 	assert(chunk->md->write_pointer == 0);
1705 	assert(chunk->md->blocks_written == 0);
1706 
1707 	memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
1708 	p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_OPEN;
1709 	p2l_map->chunk_dma_md->p2l_map_checksum = 0;
1710 
1711 	ftl_md_persist_entry(md, get_chunk_idx(chunk), p2l_map->chunk_dma_md,
1712 			     NULL, chunk_open_cb, chunk,
1713 			     &chunk->md_persist_entry_ctx);
1714 }
1715 
1716 static void
1717 chunk_close_cb(int status, void *ctx)
1718 {
1719 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
1720 
1721 	assert(chunk->md->write_pointer == chunk->nv_cache->chunk_blocks);
1722 
1723 	if (spdk_likely(!status)) {
1724 		chunk->md->p2l_map_checksum = chunk->p2l_map.chunk_dma_md->p2l_map_checksum;
1725 		chunk_free_p2l_map(chunk);
1726 
1727 		assert(chunk->nv_cache->chunk_open_count > 0);
1728 		chunk->nv_cache->chunk_open_count--;
1729 
1730 		/* Chunk is full, move it to the full list */
1731 		TAILQ_INSERT_TAIL(&chunk->nv_cache->chunk_full_list, chunk, entry);
1732 		chunk->nv_cache->chunk_full_count++;
1733 
1734 		chunk->nv_cache->last_seq_id = chunk->md->close_seq_id;
1735 
1736 		chunk->md->state = FTL_CHUNK_STATE_CLOSED;
1737 	} else {
1738 #ifdef SPDK_FTL_RETRY_ON_ERROR
1739 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
1740 #else
1741 		ftl_abort();
1742 #endif
1743 	}
1744 }
1745 
1746 static void
1747 chunk_map_write_cb(struct ftl_basic_rq *brq)
1748 {
1749 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
1750 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1751 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1752 	struct ftl_layout_region *region = ftl_layout_region_get(dev, FTL_LAYOUT_REGION_TYPE_NVC_MD);
1753 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1754 	uint32_t chunk_map_crc;
1755 
1756 	if (spdk_likely(brq->success)) {
1757 		chunk_map_crc = spdk_crc32c_update(p2l_map->chunk_map,
1758 						   chunk->nv_cache->tail_md_chunk_blocks * FTL_BLOCK_SIZE, 0);
1759 		memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
1760 		p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_CLOSED;
1761 		p2l_map->chunk_dma_md->p2l_map_checksum = chunk_map_crc;
1762 		ftl_md_persist_entry(md, get_chunk_idx(chunk), chunk->p2l_map.chunk_dma_md,
1763 				     NULL, chunk_close_cb, chunk,
1764 				     &chunk->md_persist_entry_ctx);
1765 	} else {
1766 #ifdef SPDK_FTL_RETRY_ON_ERROR
1767 		/* retry */
1768 		chunk->md->write_pointer -= brq->num_blocks;
1769 		ftl_chunk_basic_rq_write(chunk, brq);
1770 #else
1771 		ftl_abort();
1772 #endif
1773 	}
1774 }
1775 
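/* Write the chunk's tail P2L map; its completion callback persists the chunk metadata as CLOSED */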
1776 static void
1777 ftl_chunk_close(struct ftl_nv_cache_chunk *chunk)
1778 {
1779 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1780 	struct ftl_basic_rq *brq = &chunk->metadata_rq;
1781 	void *metadata = chunk->p2l_map.chunk_map;
1782 
1783 	chunk->md->close_seq_id = ftl_get_next_seq_id(dev);
1784 	ftl_basic_rq_init(dev, brq, metadata, chunk->nv_cache->tail_md_chunk_blocks);
1785 	ftl_basic_rq_set_owner(brq, chunk_map_write_cb, chunk);
1786 
1787 	assert(chunk->md->write_pointer == chunk_tail_md_offset(chunk->nv_cache));
1788 	brq->io.addr = chunk->offset + chunk->md->write_pointer;
1789 
1790 	ftl_chunk_basic_rq_write(chunk, brq);
1791 }
1792 
1793 static int ftl_chunk_read_tail_md(struct ftl_nv_cache_chunk *chunk, struct ftl_basic_rq *brq,
1794 				  void (*cb)(struct ftl_basic_rq *brq), void *cb_ctx);
1795 static void read_tail_md_cb(struct ftl_basic_rq *brq);
1796 static void recover_open_chunk_cb(struct ftl_basic_rq *brq);
1797 
1798 static void
1799 restore_chunk_close_cb(int status, void *ctx)
1800 {
1801 	struct ftl_basic_rq *parent = (struct ftl_basic_rq *)ctx;
1802 	struct ftl_nv_cache_chunk *chunk = parent->io.chunk;
1803 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1804 
1805 	if (spdk_unlikely(status)) {
1806 		parent->success = false;
1807 	} else {
1808 		chunk->md->p2l_map_checksum = p2l_map->chunk_dma_md->p2l_map_checksum;
1809 		chunk->md->state = FTL_CHUNK_STATE_CLOSED;
1810 	}
1811 
1812 	read_tail_md_cb(parent);
1813 }
1814 
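/*
 * Called once the rebuilt tail metadata of a recovered open chunk has been written.
 * Restores the original owner callback, then (on success) computes the P2L map checksum
 * and persists the chunk metadata as CLOSED and fully written.
 */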
1815 static void
1816 restore_fill_p2l_map_cb(struct ftl_basic_rq *parent)
1817 {
1818 	struct ftl_nv_cache_chunk *chunk = parent->io.chunk;
1819 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1820 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1821 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1822 	struct ftl_layout_region *region = ftl_layout_region_get(dev, FTL_LAYOUT_REGION_TYPE_NVC_MD);
1823 	uint32_t chunk_map_crc;
1824 
1825 	/* Set original callback */
1826 	ftl_basic_rq_set_owner(parent, recover_open_chunk_cb, parent->owner.priv);
1827 
1828 	if (spdk_unlikely(!parent->success)) {
1829 		read_tail_md_cb(parent);
1830 		return;
1831 	}
1832 
1833 	chunk_map_crc = spdk_crc32c_update(p2l_map->chunk_map,
1834 					   chunk->nv_cache->tail_md_chunk_blocks * FTL_BLOCK_SIZE, 0);
1835 	memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
1836 	p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_CLOSED;
1837 	p2l_map->chunk_dma_md->write_pointer = chunk->nv_cache->chunk_blocks;
1838 	p2l_map->chunk_dma_md->blocks_written = chunk->nv_cache->chunk_blocks;
1839 	p2l_map->chunk_dma_md->p2l_map_checksum = chunk_map_crc;
1840 
1841 	ftl_md_persist_entry(md, get_chunk_idx(chunk), p2l_map->chunk_dma_md, NULL,
1842 			     restore_chunk_close_cb, parent, &chunk->md_persist_entry_ctx);
1843 }
1844 
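/*
 * Write the rebuilt P2L map as the chunk's tail metadata; restore_fill_p2l_map_cb()
 * continues the recovery once the write completes.
 */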
1845 static void
1846 restore_fill_tail_md(struct ftl_basic_rq *parent, struct ftl_nv_cache_chunk *chunk)
1847 {
1848 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1849 	void *metadata;
1850 
1851 	chunk->md->close_seq_id = ftl_get_next_seq_id(dev);
1852 
1853 	metadata = chunk->p2l_map.chunk_map;
1854 	ftl_basic_rq_init(dev, parent, metadata, chunk->nv_cache->tail_md_chunk_blocks);
1855 	ftl_basic_rq_set_owner(parent, restore_fill_p2l_map_cb, parent->owner.priv);
1856 
1857 	parent->io.addr = chunk->offset + chunk_tail_md_offset(chunk->nv_cache);
1858 	parent->io.chunk = chunk;
1859 
1860 	ftl_chunk_basic_rq_write(chunk, parent);
1861 }
1862 
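/*
 * Completion of a single batch read of user data from an open chunk under recovery.
 * The P2L map is rebuilt from the per-block VSS metadata (blocks whose sequence id does
 * not match the chunk are invalidated), then either the next batch is read or, once the
 * whole data area has been scanned, the rebuilt tail metadata gets written out.
 */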
1863 static void
1864 read_open_chunk_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1865 {
1866 	struct ftl_rq *rq = (struct ftl_rq *)cb_arg;
1867 	struct ftl_basic_rq *parent = (struct ftl_basic_rq *)rq->owner.priv;
1868 	struct ftl_nv_cache_chunk *chunk = parent->io.chunk;
1869 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1870 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1871 	union ftl_md_vss *md;
1872 	uint64_t cache_offset = bdev_io->u.bdev.offset_blocks;
1873 	uint64_t len = bdev_io->u.bdev.num_blocks;
1874 	ftl_addr addr = ftl_addr_from_nvc_offset(dev, cache_offset);
1875 	int rc;
1876 
1877 	ftl_stats_bdev_io_completed(dev, FTL_STATS_TYPE_USER, bdev_io);
1878 
1879 	spdk_bdev_free_io(bdev_io);
1880 
1881 	if (!success) {
1882 		parent->success = false;
1883 		read_tail_md_cb(parent);
1884 		return;
1885 	}
1886 
1887 	while (rq->iter.idx < rq->iter.count) {
1888 		/* Get metadata */
1889 		md = rq->entries[rq->iter.idx].io_md;
1890 		if (md->nv_cache.seq_id != chunk->md->seq_id) {
1891 			md->nv_cache.lba = FTL_LBA_INVALID;
1892 		}
1893 		/*
1894 		 * The P2L map contains effectively random data at this point (it holds arbitrary blocks
1895 		 * from a tail MD that may not even have been filled), so even LBA_INVALID needs to be set explicitly.
1896 		 */
1897 
1898 		ftl_chunk_set_addr(chunk, md->nv_cache.lba, addr + rq->iter.idx);
1899 		rq->iter.idx++;
1900 	}
1901 
1902 	if (cache_offset + len < chunk->offset + chunk_tail_md_offset(nv_cache)) {
1903 		cache_offset += len;
1904 		len = spdk_min(dev->xfer_size, chunk->offset + chunk_tail_md_offset(nv_cache) - cache_offset);
1905 		rq->iter.idx = 0;
1906 		rq->iter.count = len;
1907 
1908 		rc = ftl_nv_cache_bdev_read_blocks_with_md(dev, nv_cache->bdev_desc,
1909 				nv_cache->cache_ioch,
1910 				rq->io_payload,
1911 				rq->io_md,
1912 				cache_offset, len,
1913 				read_open_chunk_cb,
1914 				rq);
1915 
1916 		if (rc) {
1917 			ftl_rq_del(rq);
1918 			parent->success = false;
1919 			read_tail_md_cb(parent);
1920 			return;
1921 		}
1922 	} else {
1923 		ftl_rq_del(rq);
1924 		restore_fill_tail_md(parent, chunk);
1925 	}
1926 }
1927 
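/*
 * Recover the P2L map of an open chunk by reading back its user data in xfer_size batches
 * and rebuilding the map from the per-block metadata. This issues the first read;
 * read_open_chunk_cb() drives the remaining batches and the final tail metadata write.
 */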
1928 static void
1929 restore_open_chunk(struct ftl_nv_cache_chunk *chunk, struct ftl_basic_rq *parent)
1930 {
1931 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1932 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1933 	struct ftl_rq *rq;
1934 	uint64_t addr;
1935 	uint64_t len = dev->xfer_size;
1936 	int rc;
1937 
1938 	/*
1939 	 * Prefill the P2L map with INVALID LBA.
1940 	 * TODO: this is needed because the tail MD blocks (the P2L map itself) are also represented in the P2L map, instead of just the user data region.
1941 	 */
1942 	memset(chunk->p2l_map.chunk_map, -1, FTL_BLOCK_SIZE * nv_cache->tail_md_chunk_blocks);
1943 
1944 	/* Need to read the user data, recalculate the chunk's P2L map and write the tail MD with it */
1945 	rq = ftl_rq_new(dev, dev->nv_cache.md_size);
1946 	if (!rq) {
1947 		parent->success = false;
1948 		read_tail_md_cb(parent);
1949 		return;
1950 	}
1951 
1952 	rq->owner.priv = parent;
1953 	rq->iter.idx = 0;
1954 	rq->iter.count = len;
1955 
1956 	addr = chunk->offset;
1957 
1958 	len = spdk_min(dev->xfer_size, chunk->offset + chunk_tail_md_offset(nv_cache) - addr);
1959 
1960 	rc = ftl_nv_cache_bdev_read_blocks_with_md(dev, nv_cache->bdev_desc,
1961 			nv_cache->cache_ioch,
1962 			rq->io_payload,
1963 			rq->io_md,
1964 			addr, len,
1965 			read_open_chunk_cb,
1966 			rq);
1967 
1968 	if (rc) {
1969 		ftl_rq_del(rq);
1970 		parent->success = false;
1971 		read_tail_md_cb(parent);
1972 	}
1973 }
1974 
1975 static void
1976 read_tail_md_cb(struct ftl_basic_rq *brq)
1977 {
1978 	brq->owner.cb(brq);
1979 }
1980 
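/*
 * Read the chunk's tail metadata (P2L map) into the chunk's P2L map buffer and invoke
 * the given callback on completion.
 */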
1981 static int
1982 ftl_chunk_read_tail_md(struct ftl_nv_cache_chunk *chunk, struct ftl_basic_rq *brq,
1983 		       void (*cb)(struct ftl_basic_rq *brq), void *cb_ctx)
1984 {
1985 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1986 	void *metadata;
1987 	int rc;
1988 
1989 	metadata = chunk->p2l_map.chunk_map;
1990 	ftl_basic_rq_init(dev, brq, metadata, chunk->nv_cache->tail_md_chunk_blocks);
1991 	ftl_basic_rq_set_owner(brq, cb, cb_ctx);
1992 
1993 	brq->io.addr = chunk->offset + chunk_tail_md_offset(chunk->nv_cache);
1994 	rc = ftl_chunk_basic_rq_read(chunk, brq);
1995 
1996 	return rc;
1997 }
1998 
1999 struct restore_chunk_md_ctx {
2000 	ftl_chunk_md_cb cb;
2001 	void *cb_ctx;
2002 	int status;
2003 	uint64_t qd;
2004 	uint64_t id;
2005 };
2006 
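/* Sanity check that the per-state chunk counters add up to the total number of chunks */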
2007 static inline bool
2008 is_chunk_count_valid(struct ftl_nv_cache *nv_cache)
2009 {
2010 	uint64_t chunk_count = 0;
2011 
2012 	chunk_count += nv_cache->chunk_open_count;
2013 	chunk_count += nv_cache->chunk_free_count;
2014 	chunk_count += nv_cache->chunk_full_count;
2015 	chunk_count += nv_cache->chunk_comp_count;
2016 
2017 	return chunk_count == nv_cache->chunk_count;
2018 }
2019 
2020 static void
2021 walk_tail_md_cb(struct ftl_basic_rq *brq)
2022 {
2023 	struct ftl_mngt_process *mngt = brq->owner.priv;
2024 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
2025 	struct restore_chunk_md_ctx *ctx = ftl_mngt_get_step_ctx(mngt);
2026 	int rc = 0;
2027 
2028 	if (brq->success) {
2029 		rc = ctx->cb(chunk, ctx->cb_ctx);
2030 	} else {
2031 		rc = -EIO;
2032 	}
2033 
2034 	if (rc) {
2035 		ctx->status = rc;
2036 	}
2037 	ctx->qd--;
2038 	chunk_free_p2l_map(chunk);
2039 	ftl_mngt_continue_step(mngt);
2040 }
2041 
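/*
 * Walk all chunks marked for recovery, reading each chunk's tail P2L map (as many in
 * parallel as the P2L map pool allows) and invoking the supplied callback per chunk.
 * Chunks not marked for recovery, or already closed at or before the given seq_id, are
 * skipped. Completions re-enter this step via ftl_mngt_continue_step() until every chunk
 * has been processed; the step fails if any chunk reported an error.
 */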
2042 static void
2043 ftl_mngt_nv_cache_walk_tail_md(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt,
2044 			       uint64_t seq_id, ftl_chunk_md_cb cb, void *cb_ctx)
2045 {
2046 	struct ftl_nv_cache *nvc = &dev->nv_cache;
2047 	struct restore_chunk_md_ctx *ctx;
2048 
2049 	ctx = ftl_mngt_get_step_ctx(mngt);
2050 	if (!ctx) {
2051 		if (ftl_mngt_alloc_step_ctx(mngt, sizeof(*ctx))) {
2052 			ftl_mngt_fail_step(mngt);
2053 			return;
2054 		}
2055 		ctx = ftl_mngt_get_step_ctx(mngt);
2056 		assert(ctx);
2057 
2058 		ctx->cb = cb;
2059 		ctx->cb_ctx = cb_ctx;
2060 	}
2061 
2062 	/*
2063 	 * This function generates a high queue depth and will utilize ftl_mngt_continue_step during completions to make sure all chunks
2064 	 * are processed before returning an error (if any were found) or continuing on.
2065 	 */
2066 	if (0 == ctx->qd && ctx->id == nvc->chunk_count) {
2067 		if (!is_chunk_count_valid(nvc)) {
2068 			FTL_ERRLOG(dev, "Recovery ERROR, invalid number of chunks\n");
2069 			assert(false);
2070 			ctx->status = -EINVAL;
2071 		}
2072 
2073 		if (ctx->status) {
2074 			ftl_mngt_fail_step(mngt);
2075 		} else {
2076 			ftl_mngt_next_step(mngt);
2077 		}
2078 		return;
2079 	}
2080 
2081 	while (ctx->id < nvc->chunk_count) {
2082 		struct ftl_nv_cache_chunk *chunk = &nvc->chunks[ctx->id];
2083 		int rc;
2084 
2085 		if (!chunk->recovery) {
2086 			/* This chunk is empty and not used in recovery */
2087 			ctx->id++;
2088 			continue;
2089 		}
2090 
2091 		if (seq_id && (chunk->md->close_seq_id <= seq_id)) {
2092 			ctx->id++;
2093 			continue;
2094 		}
2095 
2096 		if (chunk_alloc_p2l_map(chunk)) {
2097 			/* No free P2L maps left, break and continue later */
2098 			break;
2099 		}
2100 		ctx->id++;
2101 
2102 		rc = ftl_chunk_read_tail_md(chunk, &chunk->metadata_rq, walk_tail_md_cb, mngt);
2103 
2104 		if (0 == rc) {
2105 			ctx->qd++;
2106 		} else {
2107 			chunk_free_p2l_map(chunk);
2108 			ctx->status = rc;
2109 		}
2110 	}
2111 
2112 	if (0 == ctx->qd) {
2113 		/*
2114 		 * A QD of zero can happen when all leftover chunks are in the free state.
2115 		 * Additionally, ftl_chunk_read_tail_md could fail starting with the first IO in a given batch.
2116 		 * To streamline all potential error handling (since many chunks are reading their P2L maps at the same time),
2117 		 * we're using ftl_mngt_continue_step to arrive at the same spot that checks for mngt step end (see the beginning of the function).
2118 		 */
2119 		ftl_mngt_continue_step(mngt);
2120 	}
2121 
2122 }
2123 
2124 void
2125 ftl_mngt_nv_cache_restore_l2p(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt,
2126 			      ftl_chunk_md_cb cb, void *cb_ctx)
2127 {
2128 	ftl_mngt_nv_cache_walk_tail_md(dev, mngt, dev->sb->ckpt_seq_id, cb, cb_ctx);
2129 }
2130 
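/*
 * Completion of restoring the chunk metadata region. Distributes chunks to the open/full
 * lists based on their persisted state, marks non-free chunks for recovery and fails the
 * step on an unexpected state.
 */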
2131 static void
2132 restore_chunk_state_cb(struct spdk_ftl_dev *dev, struct ftl_md *md, int status)
2133 {
2134 	struct ftl_mngt_process *mngt = md->owner.cb_ctx;
2135 	struct ftl_nv_cache *nvc = &dev->nv_cache;
2136 	struct ftl_nv_cache_chunk *chunk;
2137 	uint64_t i;
2138 
2139 	if (status) {
2140 		/* Restore error, end step */
2141 		ftl_mngt_fail_step(mngt);
2142 		return;
2143 	}
2144 
2145 	for (i = 0; i < nvc->chunk_count; i++) {
2146 		chunk = &nvc->chunks[i];
2147 
2148 		switch (chunk->md->state) {
2149 		case FTL_CHUNK_STATE_FREE:
2150 			break;
2151 		case FTL_CHUNK_STATE_OPEN:
2152 			TAILQ_REMOVE(&nvc->chunk_free_list, chunk, entry);
2153 			nvc->chunk_free_count--;
2154 
2155 			TAILQ_INSERT_TAIL(&nvc->chunk_open_list, chunk, entry);
2156 			nvc->chunk_open_count++;
2157 
2158 			/* Chunk is not empty, mark it to be recovered */
2159 			chunk->recovery = true;
2160 			break;
2161 		case FTL_CHUNK_STATE_CLOSED:
2162 			TAILQ_REMOVE(&nvc->chunk_free_list, chunk, entry);
2163 			nvc->chunk_free_count--;
2164 
2165 			TAILQ_INSERT_TAIL(&nvc->chunk_full_list, chunk, entry);
2166 			nvc->chunk_full_count++;
2167 
2168 			/* Chunk is not empty, mark it to be recovered */
2169 			chunk->recovery = true;
2170 			break;
2171 		default:
2172 			status = -EINVAL;
2173 		}
2174 	}
2175 
2176 	if (status) {
2177 		ftl_mngt_fail_step(mngt);
2178 	} else {
2179 		ftl_mngt_next_step(mngt);
2180 	}
2181 }
2182 
2183 void
2184 ftl_mngt_nv_cache_restore_chunk_state(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt)
2185 {
2186 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
2187 
2188 	md->owner.cb_ctx = mngt;
2189 	md->cb = restore_chunk_state_cb;
2190 	ftl_md_restore(md);
2191 }
2192 
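/*
 * Final callback of recovering a single open chunk: its P2L map has been rebuilt and its
 * metadata persisted as closed. Move the chunk from the open to the full list, update the
 * write pointer and blocks written, and continue the step to pick up the next open chunk.
 */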
2193 static void
2194 recover_open_chunk_cb(struct ftl_basic_rq *brq)
2195 {
2196 	struct ftl_mngt_process *mngt = brq->owner.priv;
2197 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
2198 	struct ftl_nv_cache *nvc = chunk->nv_cache;
2199 	struct spdk_ftl_dev *dev = ftl_mngt_get_dev(mngt);
2200 
2201 	chunk_free_p2l_map(chunk);
2202 
2203 	if (!brq->success) {
2204 		FTL_ERRLOG(dev, "Recovery chunk ERROR, offset = %"PRIu64", seq id %"PRIu64"\n", chunk->offset,
2205 			   chunk->md->seq_id);
2206 		ftl_mngt_fail_step(mngt);
2207 		return;
2208 	}
2209 
2210 	FTL_NOTICELOG(dev, "Recovered chunk, offset = %"PRIu64", seq id %"PRIu64"\n", chunk->offset,
2211 		      chunk->md->seq_id);
2212 
2213 	TAILQ_REMOVE(&nvc->chunk_open_list, chunk, entry);
2214 	nvc->chunk_open_count--;
2215 
2216 	TAILQ_INSERT_TAIL(&nvc->chunk_full_list, chunk, entry);
2217 	nvc->chunk_full_count++;
2218 
2219 	/* This is now a closed chunk */
2220 	chunk->md->write_pointer = nvc->chunk_blocks;
2221 	chunk->md->blocks_written = nvc->chunk_blocks;
2222 
2223 	ftl_mngt_continue_step(mngt);
2224 }
2225 
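/*
 * Management step recovering open chunks one at a time; each recovered chunk re-enters the
 * step via ftl_mngt_continue_step(). Once the open list is empty the chunk counters are
 * validated and the final NV cache state is loaded.
 */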
2226 void
2227 ftl_mngt_nv_cache_recover_open_chunk(struct spdk_ftl_dev *dev, struct ftl_mngt_process *mngt)
2228 {
2229 	struct ftl_nv_cache *nvc = &dev->nv_cache;
2230 	struct ftl_nv_cache_chunk *chunk;
2231 	struct ftl_basic_rq *brq = ftl_mngt_get_step_ctx(mngt);
2232 
2233 	if (!brq) {
2234 		if (TAILQ_EMPTY(&nvc->chunk_open_list)) {
2235 			FTL_NOTICELOG(dev, "No open chunks to recover P2L\n");
2236 			ftl_mngt_next_step(mngt);
2237 			return;
2238 		}
2239 
2240 		if (ftl_mngt_alloc_step_ctx(mngt, sizeof(*brq))) {
2241 			ftl_mngt_fail_step(mngt);
2242 			return;
2243 		}
2244 		brq = ftl_mngt_get_step_ctx(mngt);
2245 		ftl_basic_rq_set_owner(brq, recover_open_chunk_cb, mngt);
2246 	}
2247 
2248 	if (TAILQ_EMPTY(&nvc->chunk_open_list)) {
2249 		if (!is_chunk_count_valid(nvc)) {
2250 			FTL_ERRLOG(dev, "Recovery ERROR, invalid number of chunks\n");
2251 			ftl_mngt_fail_step(mngt);
2252 			return;
2253 		}
2254 
2255 		/*
2256 		 * Now that all chunks are loaded and closed, do the final step of
2257 		 * restoring the chunks' state
2258 		 */
2259 		if (ftl_nv_cache_load_state(nvc)) {
2260 			ftl_mngt_fail_step(mngt);
2261 		} else {
2262 			ftl_mngt_next_step(mngt);
2263 		}
2264 	} else {
2265 		chunk = TAILQ_FIRST(&nvc->chunk_open_list);
2266 		if (chunk_alloc_p2l_map(chunk)) {
2267 			ftl_mngt_fail_step(mngt);
2268 			return;
2269 		}
2270 
2271 		brq->io.chunk = chunk;
2272 
2273 		FTL_NOTICELOG(dev, "Start recovery open chunk, offset = %"PRIu64", seq id %"PRIu64"\n",
2274 			      chunk->offset, chunk->md->seq_id);
2275 		restore_open_chunk(chunk, brq);
2276 	}
2277 }
2278 
2279 int
2280 ftl_nv_cache_chunks_busy(struct ftl_nv_cache *nv_cache)
2281 {
2282 	/* chunk_current migrates to the closed state when closing; any others should already have been
2283 	 * moved to the free chunk list. Also need to wait for outstanding free MD requests. */
2284 	return nv_cache->chunk_open_count == 0 && nv_cache->chunk_free_persist_count == 0;
2285 }
2286 
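/*
 * Halt the NV cache: chunks on the open list which have not received any user data are
 * returned to the free state, and the currently written chunk is closed by skipping its
 * remaining blocks.
 */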
2287 void
2288 ftl_nv_cache_halt(struct ftl_nv_cache *nv_cache)
2289 {
2290 	struct ftl_nv_cache_chunk *chunk;
2291 	uint64_t free_space;
2292 
2293 	nv_cache->halt = true;
2294 
2295 	/* Set chunks on the open list back to the free state since no user data has been written to them */
2296 	while (!TAILQ_EMPTY(&nv_cache->chunk_open_list)) {
2297 		chunk = TAILQ_FIRST(&nv_cache->chunk_open_list);
2298 
2299 		/* Chunks are moved between lists on metadata update submission, but state is changed
2300 		 * on completion. Breaking early in such a case to make sure all the necessary resources
2301 		 * will be freed (during next pass(es) of ftl_nv_cache_halt).
2302 		 */
2303 		if (chunk->md->state != FTL_CHUNK_STATE_OPEN) {
2304 			break;
2305 		}
2306 
2307 		TAILQ_REMOVE(&nv_cache->chunk_open_list, chunk, entry);
2308 		chunk_free_p2l_map(chunk);
2309 		memset(chunk->md, 0, sizeof(*chunk->md));
2310 		assert(nv_cache->chunk_open_count > 0);
2311 		nv_cache->chunk_open_count--;
2312 	}
2313 
2314 	/* Close the current chunk by skipping all unwritten blocks */
2315 	chunk = nv_cache->chunk_current;
2316 	if (chunk != NULL) {
2317 		nv_cache->chunk_current = NULL;
2318 		if (chunk_is_closed(chunk)) {
2319 			return;
2320 		}
2321 
2322 		free_space = chunk_get_free_space(nv_cache, chunk);
2323 		chunk->md->blocks_skipped = free_space;
2324 		chunk->md->blocks_written += free_space;
2325 		chunk->md->write_pointer += free_space;
2326 		ftl_chunk_close(chunk);
2327 	}
2328 }
2329 
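/*
 * Acquire a sequence id ordered after all data in the current chunk. With no current chunk
 * the first open chunk's seq id (or 0) is returned; otherwise the current chunk is padded
 * out (and closed if full) and its seq id + 1 is returned.
 */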
2330 uint64_t
2331 ftl_nv_cache_acquire_trim_seq_id(struct ftl_nv_cache *nv_cache)
2332 {
2333 	struct ftl_nv_cache_chunk *chunk = nv_cache->chunk_current;
2334 	uint64_t seq_id, free_space;
2335 
2336 	if (!chunk) {
2337 		chunk = TAILQ_FIRST(&nv_cache->chunk_open_list);
2338 		if (chunk && chunk->md->state == FTL_CHUNK_STATE_OPEN) {
2339 			return chunk->md->seq_id;
2340 		} else {
2341 			return 0;
2342 		}
2343 	}
2344 
2345 	if (chunk_is_closed(chunk)) {
2346 		return 0;
2347 	}
2348 
2349 	seq_id = nv_cache->chunk_current->md->seq_id;
2350 	free_space = chunk_get_free_space(nv_cache, chunk);
2351 
2352 	chunk->md->blocks_skipped = free_space;
2353 	chunk->md->blocks_written += free_space;
2354 	chunk->md->write_pointer += free_space;
2355 	if (chunk->md->blocks_written == chunk_tail_md_offset(nv_cache)) {
2356 		ftl_chunk_close(chunk);
2357 	}
2358 	nv_cache->chunk_current = NULL;
2359 
2360 	seq_id++;
2361 	return seq_id;
2362 }
2363 
2364 static double
2365 ftl_nv_cache_get_chunk_utilization(struct ftl_nv_cache *nv_cache,
2366 				   struct ftl_nv_cache_chunk *chunk)
2367 {
2368 	double capacity = nv_cache->chunk_blocks;
2369 	double used = chunk->md->blocks_written + chunk->md->blocks_skipped;
2370 
2371 	return used / capacity;
2372 }
2373 
2374 static const char *
2375 ftl_nv_cache_get_chunk_state_name(struct ftl_nv_cache_chunk *chunk)
2376 {
2377 	static const char *names[] = {
2378 		"FREE", "OPEN", "CLOSED",
2379 	};
2380 
2381 	assert(chunk->md->state < SPDK_COUNTOF(names));
2382 	if (chunk->md->state < SPDK_COUNTOF(names)) {
2383 		return names[chunk->md->state];
2384 	} else {
2385 		assert(false);
2386 		return "?";
2387 	}
2388 }
2389 
2390 static void
2391 ftl_property_dump_cache_dev(struct spdk_ftl_dev *dev, const struct ftl_property *property,
2392 			    struct spdk_json_write_ctx *w)
2393 {
2394 	uint64_t i;
2395 	struct ftl_nv_cache_chunk *chunk;
2396 
2397 	spdk_json_write_named_string(w, "type", dev->nv_cache.nvc_desc->name);
2398 	spdk_json_write_named_array_begin(w, "chunks");
2399 	for (i = 0, chunk = dev->nv_cache.chunks; i < dev->nv_cache.chunk_count; i++, chunk++) {
2400 		spdk_json_write_object_begin(w);
2401 		spdk_json_write_named_uint64(w, "id", i);
2402 		spdk_json_write_named_string(w, "state", ftl_nv_cache_get_chunk_state_name(chunk));
2403 		spdk_json_write_named_double(w, "utilization",
2404 					     ftl_nv_cache_get_chunk_utilization(&dev->nv_cache, chunk));
2405 		spdk_json_write_object_end(w);
2406 	}
2407 	spdk_json_write_array_end(w);
2408 }
2409