xref: /spdk/lib/ftl/ftl_nv_cache.c (revision f869197b76ff6981e901b6d9a05789e1b993494a)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 
7 #include "spdk/bdev.h"
8 #include "spdk/bdev_module.h"
9 #include "spdk/ftl.h"
10 #include "spdk/string.h"
11 
12 #include "ftl_nv_cache.h"
13 #include "ftl_nv_cache_io.h"
14 #include "ftl_core.h"
15 #include "ftl_band.h"
16 #include "utils/ftl_addr_utils.h"
17 #include "mngt/ftl_mngt.h"
18 
19 static inline uint64_t nvc_data_blocks(struct ftl_nv_cache *nv_cache) __attribute__((unused));
20 static struct ftl_nv_cache_compactor *compactor_alloc(struct spdk_ftl_dev *dev);
21 static void compactor_free(struct spdk_ftl_dev *dev, struct ftl_nv_cache_compactor *compactor);
22 static void compaction_process_ftl_done(struct ftl_rq *rq);
23 
24 static inline const struct ftl_layout_region *
25 nvc_data_region(struct ftl_nv_cache *nv_cache)
26 {
27 	struct spdk_ftl_dev *dev;
28 
29 	dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
30 	return &dev->layout.region[FTL_LAYOUT_REGION_TYPE_DATA_NVC];
31 }
32 
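/*
 * Sanity-check that a chunk metadata pointer lies within the NV cache metadata
 * buffer; the device is aborted if the entry falls outside the buffer bounds.
 */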
33 static inline void
34 nvc_validate_md(struct ftl_nv_cache *nv_cache,
35 		struct ftl_nv_cache_chunk_md *chunk_md)
36 {
37 	struct ftl_md *md = nv_cache->md;
38 	void *buffer = ftl_md_get_buffer(md);
39 	uint64_t size = ftl_md_get_buffer_size(md);
40 	void *ptr = chunk_md;
41 
42 	if (ptr < buffer) {
43 		ftl_abort();
44 	}
45 
46 	ptr += sizeof(*chunk_md);
47 	if (ptr > buffer + size) {
48 		ftl_abort();
49 	}
50 }
51 
52 static inline uint64_t
53 nvc_data_offset(struct ftl_nv_cache *nv_cache)
54 {
55 	return nvc_data_region(nv_cache)->current.offset;
56 }
57 
58 static inline uint64_t
59 nvc_data_blocks(struct ftl_nv_cache *nv_cache)
60 {
61 	return nvc_data_region(nv_cache)->current.blocks;
62 }
63 
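/*
 * Number of blocks reserved at the end of each chunk for its tail metadata
 * (the chunk's P2L map): one L2P address entry per data block, rounded up to a
 * whole number of FTL blocks. Illustrative arithmetic only (the real values
 * come from the layout): with 4096-byte blocks, 8-byte addresses and 16384
 * data blocks per chunk, the tail metadata would take 16384 * 8 / 4096 = 32
 * blocks.
 */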
64 size_t
65 ftl_nv_cache_chunk_tail_md_num_blocks(const struct ftl_nv_cache *nv_cache)
66 {
67 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache,
68 				    struct spdk_ftl_dev, nv_cache);
69 	return spdk_divide_round_up(dev->layout.nvc.chunk_data_blocks * dev->layout.l2p.addr_size,
70 				    FTL_BLOCK_SIZE);
71 }
72 
73 static size_t
74 nv_cache_p2l_map_pool_elem_size(const struct ftl_nv_cache *nv_cache)
75 {
76 	/* Map pool element holds the whole tail md */
77 	return nv_cache->tail_md_chunk_blocks * FTL_BLOCK_SIZE;
78 }
79 
80 static uint64_t
81 get_chunk_idx(struct ftl_nv_cache_chunk *chunk)
82 {
83 	struct ftl_nv_cache_chunk *first_chunk = chunk->nv_cache->chunks;
84 
85 	return (chunk->offset - first_chunk->offset) / chunk->nv_cache->chunk_blocks;
86 }
87 
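/*
 * One-time NV cache initialization: attach the NVC metadata region, create the
 * per-IO VSS metadata pool, allocate the chunk descriptors and link them onto
 * the free list, compute the compaction threshold, and create the compactors
 * plus the P2L and chunk-metadata DMA pools used while chunks are open or
 * being freed.
 */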
88 int
89 ftl_nv_cache_init(struct spdk_ftl_dev *dev)
90 {
91 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
92 	struct ftl_nv_cache_chunk *chunk;
93 	struct ftl_nv_cache_chunk_md *md;
94 	struct ftl_nv_cache_compactor *compactor;
95 	uint64_t i, offset;
96 
97 	nv_cache->halt = true;
98 
99 	nv_cache->md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
100 	if (!nv_cache->md) {
101 		FTL_ERRLOG(dev, "No NV cache metadata object\n");
102 		return -1;
103 	}
104 
105 	nv_cache->md_pool = ftl_mempool_create(dev->conf.user_io_pool_size,
106 					       nv_cache->md_size * dev->xfer_size,
107 					       FTL_BLOCK_SIZE, SPDK_ENV_SOCKET_ID_ANY);
108 	if (!nv_cache->md_pool) {
109 		FTL_ERRLOG(dev, "Failed to initialize NV cache metadata pool\n");
110 		return -1;
111 	}
112 
113 	/*
114 	 * Initialize chunk info
115 	 */
116 	nv_cache->chunk_blocks = dev->layout.nvc.chunk_data_blocks;
117 	nv_cache->chunk_count = dev->layout.nvc.chunk_count;
118 	nv_cache->tail_md_chunk_blocks = ftl_nv_cache_chunk_tail_md_num_blocks(nv_cache);
119 
120 	/* Allocate chunks */
121 	nv_cache->chunks = calloc(nv_cache->chunk_count,
122 				  sizeof(nv_cache->chunks[0]));
123 	if (!nv_cache->chunks) {
124 		FTL_ERRLOG(dev, "Failed to initialize NV cache chunks\n");
125 		return -1;
126 	}
127 
128 	TAILQ_INIT(&nv_cache->chunk_free_list);
129 	TAILQ_INIT(&nv_cache->chunk_open_list);
130 	TAILQ_INIT(&nv_cache->chunk_full_list);
131 	TAILQ_INIT(&nv_cache->chunk_comp_list);
132 	TAILQ_INIT(&nv_cache->needs_free_persist_list);
133 
134 	/* First chunk metadata */
135 	md = ftl_md_get_buffer(nv_cache->md);
136 	if (!md) {
137 		FTL_ERRLOG(dev, "No NV cache metadata\n");
138 		return -1;
139 	}
140 
141 	nv_cache->chunk_free_count = nv_cache->chunk_count;
142 
143 	chunk = nv_cache->chunks;
144 	offset = nvc_data_offset(nv_cache);
145 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++, md++) {
146 		chunk->nv_cache = nv_cache;
147 		chunk->md = md;
148 		nvc_validate_md(nv_cache, md);
149 		chunk->offset = offset;
150 		offset += nv_cache->chunk_blocks;
151 		TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
152 	}
153 	assert(offset <= nvc_data_offset(nv_cache) + nvc_data_blocks(nv_cache));
154 
155 	/* Start compaction when the number of full chunks exceeds the configured percentage of all chunks */
156 	nv_cache->chunk_compaction_threshold = nv_cache->chunk_count *
157 					       dev->conf.nv_cache.chunk_compaction_threshold / 100;
158 	TAILQ_INIT(&nv_cache->compactor_list);
159 	for (i = 0; i < FTL_NV_CACHE_NUM_COMPACTORS; i++) {
160 		compactor = compactor_alloc(dev);
161 
162 		if (!compactor) {
163 			FTL_ERRLOG(dev, "Cannot allocate compaction process\n");
164 			return -1;
165 		}
166 
167 		TAILQ_INSERT_TAIL(&nv_cache->compactor_list, compactor, entry);
168 	}
169 
170 #define FTL_MAX_OPEN_CHUNKS 2
171 	nv_cache->p2l_pool = ftl_mempool_create(FTL_MAX_OPEN_CHUNKS,
172 						nv_cache_p2l_map_pool_elem_size(nv_cache),
173 						FTL_BLOCK_SIZE,
174 						SPDK_ENV_SOCKET_ID_ANY);
175 	if (!nv_cache->p2l_pool) {
176 		return -ENOMEM;
177 	}
178 
179 	/* One entry per open chunk */
180 	nv_cache->chunk_md_pool = ftl_mempool_create(FTL_MAX_OPEN_CHUNKS,
181 				  sizeof(struct ftl_nv_cache_chunk_md),
182 				  FTL_BLOCK_SIZE,
183 				  SPDK_ENV_SOCKET_ID_ANY);
184 	if (!nv_cache->chunk_md_pool) {
185 		return -ENOMEM;
186 	}
187 
188 	/* Each compactor can be reading a different chunk, which it must transition to the free state when done,
189 	 * plus one spare per compactor for processing high-invalidity chunks (a backlog of chunks with extremely
190 	 * small, even zero, validity can be worked through quickly by the compactors and trigger a burst of
191 	 * free-state updates at once). */
192 	nv_cache->free_chunk_md_pool = ftl_mempool_create(2 * FTL_NV_CACHE_NUM_COMPACTORS,
193 				       sizeof(struct ftl_nv_cache_chunk_md),
194 				       FTL_BLOCK_SIZE,
195 				       SPDK_ENV_SOCKET_ID_ANY);
196 	if (!nv_cache->free_chunk_md_pool) {
197 		return -ENOMEM;
198 	}
199 
200 	return 0;
201 }
202 
203 void
204 ftl_nv_cache_deinit(struct spdk_ftl_dev *dev)
205 {
206 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
207 	struct ftl_nv_cache_compactor *compactor;
208 
209 	while (!TAILQ_EMPTY(&nv_cache->compactor_list)) {
210 		compactor = TAILQ_FIRST(&nv_cache->compactor_list);
211 		TAILQ_REMOVE(&nv_cache->compactor_list, compactor, entry);
212 
213 		compactor_free(dev, compactor);
214 	}
215 
216 	ftl_mempool_destroy(nv_cache->md_pool);
217 	ftl_mempool_destroy(nv_cache->p2l_pool);
218 	ftl_mempool_destroy(nv_cache->chunk_md_pool);
219 	ftl_mempool_destroy(nv_cache->free_chunk_md_pool);
220 	nv_cache->md_pool = NULL;
221 	nv_cache->p2l_pool = NULL;
222 	nv_cache->chunk_md_pool = NULL;
223 	nv_cache->free_chunk_md_pool = NULL;
224 
225 	free(nv_cache->chunks);
226 	nv_cache->chunks = NULL;
227 }
228 
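/* Remaining user-data space in a chunk: total chunk blocks minus the current
 * write pointer and the blocks reserved for the tail P2L metadata. */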
229 static uint64_t
230 chunk_get_free_space(struct ftl_nv_cache *nv_cache,
231 		     struct ftl_nv_cache_chunk *chunk)
232 {
233 	assert(chunk->md->write_pointer + nv_cache->tail_md_chunk_blocks <=
234 	       nv_cache->chunk_blocks);
235 	return nv_cache->chunk_blocks - chunk->md->write_pointer -
236 	       nv_cache->tail_md_chunk_blocks;
237 }
238 
239 static bool
240 chunk_is_closed(struct ftl_nv_cache_chunk *chunk)
241 {
242 	return chunk->md->write_pointer == chunk->nv_cache->chunk_blocks;
243 }
244 
245 static void ftl_chunk_close(struct ftl_nv_cache_chunk *chunk);
246 
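/*
 * Reserve io->num_blocks of user-data space in the currently open chunk and
 * return the reservation's offset within the NV cache, or FTL_LBA_INVALID if
 * no open chunk can take the request right now. A chunk too small to fit the
 * request has its remaining space skipped (and is closed once fully written),
 * after which the next open chunk is tried.
 */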
247 static uint64_t
248 ftl_nv_cache_get_wr_buffer(struct ftl_nv_cache *nv_cache, struct ftl_io *io)
249 {
250 	uint64_t address = FTL_LBA_INVALID;
251 	uint64_t num_blocks = io->num_blocks;
252 	uint64_t free_space;
253 	struct ftl_nv_cache_chunk *chunk;
254 
255 	do {
256 		chunk = nv_cache->chunk_current;
257 		/* Chunk has been closed, so pick a new one */
258 		if (chunk && chunk_is_closed(chunk)) {
259 			chunk = NULL;
260 		}
261 
262 		if (!chunk) {
263 			chunk = TAILQ_FIRST(&nv_cache->chunk_open_list);
264 			if (chunk && chunk->md->state == FTL_CHUNK_STATE_OPEN) {
265 				TAILQ_REMOVE(&nv_cache->chunk_open_list, chunk, entry);
266 				nv_cache->chunk_current = chunk;
267 			} else {
268 				break;
269 			}
270 		}
271 
272 		free_space = chunk_get_free_space(nv_cache, chunk);
273 
274 		if (free_space >= num_blocks) {
275 			/* Enough space in chunk */
276 
277 			/* Calculate address in NV cache */
278 			address = chunk->offset + chunk->md->write_pointer;
279 
280 			/* Set chunk in IO */
281 			io->nv_cache_chunk = chunk;
282 
283 			/* Move write pointer */
284 			chunk->md->write_pointer += num_blocks;
285 			break;
286 		}
287 
288 		/* Not enough space in nv_cache_chunk */
289 		nv_cache->chunk_current = NULL;
290 
291 		if (0 == free_space) {
292 			continue;
293 		}
294 
295 		chunk->md->blocks_skipped = free_space;
296 		chunk->md->blocks_written += free_space;
297 		chunk->md->write_pointer += free_space;
298 
299 		if (chunk->md->blocks_written == chunk_tail_md_offset(nv_cache)) {
300 			ftl_chunk_close(chunk);
301 		}
302 	} while (1);
303 
304 	return address;
305 }
306 
307 void
308 ftl_nv_cache_fill_md(struct ftl_io *io)
309 {
310 	struct ftl_nv_cache_chunk *chunk = io->nv_cache_chunk;
311 	uint64_t i;
312 	union ftl_md_vss *metadata = io->md;
313 	uint64_t lba = ftl_io_get_lba(io, 0);
314 
315 	for (i = 0; i < io->num_blocks; ++i, lba++, metadata++) {
316 		metadata->nv_cache.lba = lba;
317 		metadata->nv_cache.seq_id = chunk->md->seq_id;
318 	}
319 }
320 
321 uint64_t
322 chunk_tail_md_offset(struct ftl_nv_cache *nv_cache)
323 {
324 	return nv_cache->chunk_blocks - nv_cache->tail_md_chunk_blocks;
325 }
326 
327 static void
328 chunk_advance_blocks(struct ftl_nv_cache *nv_cache, struct ftl_nv_cache_chunk *chunk,
329 		     uint64_t advanced_blocks)
330 {
331 	chunk->md->blocks_written += advanced_blocks;
332 
333 	assert(chunk->md->blocks_written <= nv_cache->chunk_blocks);
334 
335 	if (chunk->md->blocks_written == chunk_tail_md_offset(nv_cache)) {
336 		ftl_chunk_close(chunk);
337 	}
338 }
339 
340 static uint64_t
341 chunk_user_blocks_written(struct ftl_nv_cache_chunk *chunk)
342 {
343 	return chunk->md->blocks_written - chunk->md->blocks_skipped -
344 	       chunk->nv_cache->tail_md_chunk_blocks;
345 }
346 
347 static bool
348 is_chunk_compacted(struct ftl_nv_cache_chunk *chunk)
349 {
350 	assert(chunk->md->blocks_written != 0);
351 
352 	if (chunk_user_blocks_written(chunk) == chunk->md->blocks_compacted) {
353 		return true;
354 	}
355 
356 	return false;
357 }
358 
359 static int
360 ftl_chunk_alloc_md_entry(struct ftl_nv_cache_chunk *chunk)
361 {
362 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
363 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
364 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
365 	struct ftl_layout_region *region = &dev->layout.region[FTL_LAYOUT_REGION_TYPE_NVC_MD];
366 
367 	p2l_map->chunk_dma_md = ftl_mempool_get(nv_cache->chunk_md_pool);
368 
369 	if (!p2l_map->chunk_dma_md) {
370 		return -ENOMEM;
371 	}
372 
373 	memset(p2l_map->chunk_dma_md, 0, region->entry_size * FTL_BLOCK_SIZE);
374 	return 0;
375 }
376 
377 static void
378 ftl_chunk_free_md_entry(struct ftl_nv_cache_chunk *chunk)
379 {
380 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
381 
382 	ftl_mempool_put(chunk->nv_cache->chunk_md_pool, p2l_map->chunk_dma_md);
383 	p2l_map->chunk_dma_md = NULL;
384 }
385 
386 static void
387 ftl_chunk_free(struct ftl_nv_cache_chunk *chunk)
388 {
389 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
390 
391 	/* Reset chunk */
392 	memset(chunk->md, 0, sizeof(*chunk->md));
393 
394 	TAILQ_INSERT_TAIL(&nv_cache->needs_free_persist_list, chunk, entry);
395 	nv_cache->chunk_free_persist_count++;
396 }
397 
398 static int
399 ftl_chunk_alloc_chunk_free_entry(struct ftl_nv_cache_chunk *chunk)
400 {
401 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
402 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
403 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
404 	struct ftl_layout_region *region = &dev->layout.region[FTL_LAYOUT_REGION_TYPE_NVC_MD];
405 
406 	p2l_map->chunk_dma_md = ftl_mempool_get(nv_cache->free_chunk_md_pool);
407 
408 	if (!p2l_map->chunk_dma_md) {
409 		return -ENOMEM;
410 	}
411 
412 	memset(p2l_map->chunk_dma_md, 0, region->entry_size * FTL_BLOCK_SIZE);
413 	return 0;
414 }
415 
416 static void
417 ftl_chunk_free_chunk_free_entry(struct ftl_nv_cache_chunk *chunk)
418 {
419 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
420 
421 	ftl_mempool_put(chunk->nv_cache->free_chunk_md_pool, p2l_map->chunk_dma_md);
422 	p2l_map->chunk_dma_md = NULL;
423 }
424 
425 static void
426 chunk_free_cb(int status, void *ctx)
427 {
428 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
429 
430 	if (spdk_likely(!status)) {
431 		struct ftl_nv_cache *nv_cache = chunk->nv_cache;
432 
433 		nv_cache->chunk_free_persist_count--;
434 		TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
435 		nv_cache->chunk_free_count++;
436 		nv_cache->chunk_full_count--;
437 		chunk->md->state = FTL_CHUNK_STATE_FREE;
438 		chunk->md->close_seq_id = 0;
439 		ftl_chunk_free_chunk_free_entry(chunk);
440 	} else {
441 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
442 	}
443 }
444 
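/*
 * Walk the needs_free_persist_list and persist a FREE state entry for each
 * chunk in the NVC metadata region; chunk_free_cb() finishes the transition
 * back onto the free list once the persist completes.
 */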
445 static void
446 ftl_chunk_persist_free_state(struct ftl_nv_cache *nv_cache)
447 {
448 	int rc;
449 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
450 	struct ftl_p2l_map *p2l_map;
451 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
452 	struct ftl_layout_region *region = &dev->layout.region[FTL_LAYOUT_REGION_TYPE_NVC_MD];
453 	struct ftl_nv_cache_chunk *tchunk, *chunk = NULL;
454 
455 	TAILQ_FOREACH_SAFE(chunk, &nv_cache->needs_free_persist_list, entry, tchunk) {
456 		p2l_map = &chunk->p2l_map;
457 		rc = ftl_chunk_alloc_chunk_free_entry(chunk);
458 		if (rc) {
459 			break;
460 		}
461 
462 		TAILQ_REMOVE(&nv_cache->needs_free_persist_list, chunk, entry);
463 
464 		memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
465 		p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_FREE;
466 		p2l_map->chunk_dma_md->close_seq_id = 0;
467 		p2l_map->chunk_dma_md->p2l_map_checksum = 0;
468 
469 		ftl_md_persist_entry(md, get_chunk_idx(chunk), p2l_map->chunk_dma_md, NULL,
470 				     chunk_free_cb, chunk, &chunk->md_persist_entry_ctx);
471 	}
472 }
473 
474 static void
475 chunk_compaction_advance(struct ftl_nv_cache_chunk *chunk, uint64_t num_blocks)
476 {
477 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
478 
479 	chunk->md->blocks_compacted += num_blocks;
480 	if (!is_chunk_compacted(chunk)) {
481 		return;
482 	}
483 
484 	/* Remove chunk from the compaction list */
485 	TAILQ_REMOVE(&nv_cache->chunk_comp_list, chunk, entry);
486 	nv_cache->chunk_comp_count--;
487 
488 	ftl_chunk_free(chunk);
489 }
490 
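/* Compaction is started once the number of full chunks not already being
 * compacted reaches the configured threshold (and the cache is not halted). */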
491 static bool
492 is_compaction_required(struct ftl_nv_cache *nv_cache)
493 {
494 	uint64_t full;
495 
496 	if (spdk_unlikely(nv_cache->halt)) {
497 		return false;
498 	}
499 
500 	full = nv_cache->chunk_full_count - nv_cache->compaction_active_count;
501 	if (full >= nv_cache->chunk_compaction_threshold) {
502 		return true;
503 	}
504 
505 	return false;
506 }
507 
508 static void compaction_process_finish_read(struct ftl_nv_cache_compactor *compactor);
509 static void compaction_process_pin_lba(struct ftl_nv_cache_compactor *comp);
510 
511 static void
512 _compaction_process_pin_lba(void *_comp)
513 {
514 	struct ftl_nv_cache_compactor *comp = _comp;
515 
516 	compaction_process_pin_lba(comp);
517 }
518 
519 static void
520 compaction_process_pin_lba_cb(struct spdk_ftl_dev *dev, int status, struct ftl_l2p_pin_ctx *pin_ctx)
521 {
522 	struct ftl_nv_cache_compactor *comp = pin_ctx->cb_ctx;
523 	struct ftl_rq *rq = comp->rd;
524 
525 	if (status) {
526 		rq->iter.status = status;
527 		pin_ctx->lba = FTL_LBA_INVALID;
528 	}
529 
530 	if (--rq->iter.remaining == 0) {
531 		if (rq->iter.status) {
532 			/* unpin and try again */
533 			ftl_rq_unpin(rq);
534 			spdk_thread_send_msg(spdk_get_thread(), _compaction_process_pin_lba, comp);
535 			return;
536 		}
537 
538 		compaction_process_finish_read(comp);
539 	}
540 }
541 
542 static void
543 compaction_process_pin_lba(struct ftl_nv_cache_compactor *comp)
544 {
545 	union ftl_md_vss *md;
546 	struct ftl_nv_cache_chunk *chunk = comp->rd->owner.priv;
547 	struct spdk_ftl_dev *dev = comp->rd->dev;
548 	uint64_t i;
549 	uint32_t count = comp->rd->iter.count;
550 	struct ftl_rq_entry *entry;
551 	struct ftl_l2p_pin_ctx *pin_ctx;
552 
553 	assert(comp->rd->iter.idx == 0);
554 	comp->rd->iter.remaining = count;
555 	comp->rd->iter.status = 0;
556 
557 	for (i = 0; i < count; i++) {
558 		entry = &comp->rd->entries[i];
559 		pin_ctx = &entry->l2p_pin_ctx;
560 		md = entry->io_md;
561 		if (md->nv_cache.lba == FTL_LBA_INVALID || md->nv_cache.seq_id != chunk->md->seq_id) {
562 			ftl_l2p_pin_skip(dev, compaction_process_pin_lba_cb, comp, pin_ctx);
563 		} else {
564 			ftl_l2p_pin(dev, md->nv_cache.lba, 1, compaction_process_pin_lba_cb, comp, pin_ctx);
565 		}
566 	}
567 }
568 
569 static int compaction_submit_read(struct ftl_nv_cache_compactor *compactor, ftl_addr addr,
570 				  uint64_t num_blocks);
571 
572 static void
573 compaction_retry_read(void *_compactor)
574 {
575 	struct ftl_nv_cache_compactor *compactor = _compactor;
576 	struct ftl_rq *rq = compactor->rd;
577 	struct spdk_bdev *bdev;
578 	int ret;
579 
580 	ret = compaction_submit_read(compactor, rq->io.addr, rq->iter.count);
581 
582 	if (ret == -ENOMEM) {
583 		bdev = spdk_bdev_desc_get_bdev(compactor->nv_cache->bdev_desc);
584 		compactor->bdev_io_wait.bdev = bdev;
585 		compactor->bdev_io_wait.cb_fn = compaction_retry_read;
586 		compactor->bdev_io_wait.cb_arg = compactor;
587 		spdk_bdev_queue_io_wait(bdev, compactor->nv_cache->cache_ioch, &compactor->bdev_io_wait);
588 	} else if (ret) {
589 		ftl_abort();
590 	}
591 }
592 
593 static void
594 compaction_process_read_cb(struct spdk_bdev_io *bdev_io,
595 			   bool success, void *cb_arg)
596 {
597 	struct ftl_nv_cache_compactor *compactor = cb_arg;
598 
599 	spdk_bdev_free_io(bdev_io);
600 
601 	if (!success) {
602 		/* retry */
603 		spdk_thread_send_msg(spdk_get_thread(), compaction_retry_read, compactor);
604 		return;
605 	}
606 
607 	compaction_process_pin_lba(compactor);
608 }
609 
610 static bool
611 is_chunk_to_read(struct ftl_nv_cache_chunk *chunk)
612 {
613 	assert(chunk->md->blocks_written != 0);
614 
615 	if (chunk_user_blocks_written(chunk) == chunk->md->read_pointer) {
616 		return false;
617 	}
618 
619 	return true;
620 }
621 
622 static struct ftl_nv_cache_chunk *
623 get_chunk_for_compaction(struct ftl_nv_cache *nv_cache)
624 {
625 	struct ftl_nv_cache_chunk *chunk = NULL;
626 
627 	if (!TAILQ_EMPTY(&nv_cache->chunk_comp_list)) {
628 		chunk = TAILQ_FIRST(&nv_cache->chunk_comp_list);
629 		if (is_chunk_to_read(chunk)) {
630 			return chunk;
631 		}
632 	}
633 
634 	if (!TAILQ_EMPTY(&nv_cache->chunk_full_list)) {
635 		chunk = TAILQ_FIRST(&nv_cache->chunk_full_list);
636 		TAILQ_REMOVE(&nv_cache->chunk_full_list, chunk, entry);
637 
638 		assert(chunk->md->write_pointer);
639 	} else {
640 		return NULL;
641 	}
642 
643 	if (spdk_likely(chunk)) {
644 		assert(chunk->md->write_pointer != 0);
645 		TAILQ_INSERT_HEAD(&nv_cache->chunk_comp_list, chunk, entry);
646 		nv_cache->chunk_comp_count++;
647 	}
648 
649 	return chunk;
650 }
651 
652 static uint64_t
653 chunk_blocks_to_read(struct ftl_nv_cache_chunk *chunk)
654 {
655 	uint64_t blocks_written;
656 	uint64_t blocks_to_read;
657 
658 	assert(chunk->md->blocks_written >= chunk->md->blocks_skipped);
659 	blocks_written = chunk_user_blocks_written(chunk);
660 
661 	assert(blocks_written >= chunk->md->read_pointer);
662 	blocks_to_read = blocks_written - chunk->md->read_pointer;
663 
664 	return blocks_to_read;
665 }
666 
667 static void
668 compactor_deactivate(struct ftl_nv_cache_compactor *compactor)
669 {
670 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
671 
672 	nv_cache->compaction_active_count--;
673 	TAILQ_INSERT_TAIL(&nv_cache->compactor_list, compactor, entry);
674 }
675 
676 static int
677 compaction_submit_read(struct ftl_nv_cache_compactor *compactor, ftl_addr addr,
678 		       uint64_t num_blocks)
679 {
680 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
681 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
682 
683 	return ftl_nv_cache_bdev_readv_blocks_with_md(dev, nv_cache->bdev_desc,
684 			nv_cache->cache_ioch,
685 			compactor->rd->io_vec, num_blocks,
686 			compactor->rd->io_md,
687 			ftl_addr_to_nvc_offset(dev, addr), num_blocks,
688 			compaction_process_read_cb,
689 			compactor);
690 }
691 
692 static void
693 compaction_process_pad(struct ftl_nv_cache_compactor *compactor)
694 {
695 	struct ftl_rq *wr = compactor->wr;
696 	const uint64_t num_entries = wr->num_blocks;
697 	struct ftl_rq_entry *iter;
698 
699 	iter = &wr->entries[wr->iter.idx];
700 
701 	while (wr->iter.idx < num_entries) {
702 		iter->addr = FTL_ADDR_INVALID;
703 		iter->owner.priv = NULL;
704 		iter->lba = FTL_LBA_INVALID;
705 		iter->seq_id = 0;
706 		iter++;
707 		wr->iter.idx++;
708 	}
709 }
710 
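/*
 * Main compaction read step: pick a chunk to compact, skip over any leading
 * blocks that have already been invalidated (advancing compaction progress for
 * them), then submit a read for a contiguous run of still-valid blocks along
 * with their VSS metadata. If nothing is left to compact, the partially filled
 * write request is padded and handed to the user writer.
 */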
711 static void
712 compaction_process(struct ftl_nv_cache_compactor *compactor)
713 {
714 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
715 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache,
716 				   struct spdk_ftl_dev, nv_cache);
717 	struct ftl_nv_cache_chunk *chunk;
718 	uint64_t to_read, addr, begin, end, offset;
719 	int rc;
720 
721 	/* Check if all read blocks done */
722 	assert(compactor->rd->iter.idx <= compactor->rd->iter.count);
723 	if (compactor->rd->iter.idx < compactor->rd->iter.count) {
724 		compaction_process_finish_read(compactor);
725 		return;
726 	}
727 
728 	/*
729 	 * Get currently handled chunk
730 	 */
731 	chunk = get_chunk_for_compaction(nv_cache);
732 	if (!chunk) {
733 		/* No chunks to compact, pad this request */
734 		compaction_process_pad(compactor);
735 		ftl_writer_queue_rq(&dev->writer_user, compactor->wr);
736 		return;
737 	}
738 
739 	/*
740 	 * Get range of blocks to read
741 	 */
742 	to_read = chunk_blocks_to_read(chunk);
743 	assert(to_read > 0);
744 
745 	addr = ftl_addr_from_nvc_offset(dev, chunk->offset + chunk->md->read_pointer);
746 	begin = ftl_bitmap_find_first_set(dev->valid_map, addr, addr + to_read);
747 	if (begin != UINT64_MAX) {
748 		offset = spdk_min(begin - addr, to_read);
749 	} else {
750 		offset = to_read;
751 	}
752 
753 	if (offset) {
754 		chunk->md->read_pointer += offset;
755 		chunk_compaction_advance(chunk, offset);
756 		to_read -= offset;
757 		if (!to_read) {
758 			compactor_deactivate(compactor);
759 			return;
760 		}
761 	}
762 
763 	end = ftl_bitmap_find_first_clear(dev->valid_map, begin + 1, begin + to_read);
764 	if (end != UINT64_MAX) {
765 		to_read = end - begin;
766 	}
767 
768 	addr = begin;
769 	to_read = spdk_min(to_read, compactor->rd->num_blocks);
770 
771 	/* Read data and metadata from NV cache */
772 	rc = compaction_submit_read(compactor, addr, to_read);
773 	if (spdk_unlikely(rc)) {
774 		/* An error occurred; deactivate this compactor, it will retry
775 		 * in the next iteration
776 		 */
777 		compactor_deactivate(compactor);
778 		return;
779 	}
780 
781 	/* IO has started, initialize compaction */
782 	compactor->rd->owner.priv = chunk;
783 	compactor->rd->iter.idx = 0;
784 	compactor->rd->iter.count = to_read;
785 	compactor->rd->io.addr = addr;
786 
787 	/* Move read pointer in the chunk */
788 	chunk->md->read_pointer += to_read;
789 }
790 
791 static void
792 compaction_process_start(struct ftl_nv_cache_compactor *compactor)
793 {
794 	compactor->nv_cache->compaction_active_count++;
795 	compaction_process(compactor);
796 }
797 
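/*
 * Completion callback for the write request that moved compacted data to the
 * base device. On success, update the L2P from the old cache addresses to the
 * new band addresses, unpin the LBAs and advance per-chunk compaction
 * progress; on failure the request is simply re-queued to the writer.
 */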
798 static void
799 compaction_process_ftl_done(struct ftl_rq *rq)
800 {
801 	struct spdk_ftl_dev *dev = rq->dev;
802 	struct ftl_nv_cache_compactor *compactor = rq->owner.priv;
803 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
804 	struct ftl_band *band = rq->io.band;
805 	struct ftl_rq_entry *entry;
806 	ftl_addr addr;
807 	uint64_t i;
808 
809 	if (spdk_unlikely(false == rq->success)) {
810 		/* IO error, retry writing */
811 		ftl_writer_queue_rq(&dev->writer_user, rq);
812 		return;
813 	}
814 
815 	/* Update L2P table */
816 	addr = rq->io.addr;
817 	for (i = 0, entry = rq->entries; i < rq->num_blocks; i++, entry++) {
818 		struct ftl_nv_cache_chunk *chunk = entry->owner.priv;
819 
820 		if (entry->lba == FTL_LBA_INVALID) {
821 			assert(entry->addr == FTL_ADDR_INVALID);
822 			addr = ftl_band_next_addr(band, addr, 1);
823 			continue;
824 		}
825 
826 		ftl_l2p_update_base(dev, entry->lba, addr, entry->addr);
827 		ftl_l2p_unpin(dev, entry->lba, 1);
828 
829 		chunk_compaction_advance(chunk, 1);
830 		addr = ftl_band_next_addr(band, addr, 1);
831 	}
832 
833 	compactor->wr->iter.idx = 0;
834 
835 	if (is_compaction_required(nv_cache)) {
836 		compaction_process(compactor);
837 	} else {
838 		compactor_deactivate(compactor);
839 	}
840 }
841 
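/*
 * Called once the LBAs of a read batch are pinned (and again to drain a
 * partially consumed batch): blocks that are still current (the L2P still
 * points at their cache address and the sequence id matches the chunk) are
 * moved into the write request, the rest are dropped as already invalidated.
 * A full write request is queued to the user writer; otherwise another read
 * iteration is started.
 */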
842 static void
843 compaction_process_finish_read(struct ftl_nv_cache_compactor *compactor)
844 {
845 	struct ftl_rq *wr = compactor->wr;
846 	struct ftl_rq *rd = compactor->rd;
847 	ftl_addr cache_addr = rd->io.addr;
848 	struct ftl_nv_cache_chunk *chunk = rd->owner.priv;
849 	struct spdk_ftl_dev *dev;
850 	struct ftl_rq_entry *iter;
851 	union ftl_md_vss *md;
852 	ftl_addr current_addr;
853 	const uint64_t num_entries = wr->num_blocks;
854 
855 	dev = SPDK_CONTAINEROF(compactor->nv_cache,
856 			       struct spdk_ftl_dev, nv_cache);
857 
858 	assert(wr->iter.idx < num_entries);
859 	assert(rd->iter.idx < rd->iter.count);
860 
861 	cache_addr += rd->iter.idx;
862 	iter = &wr->entries[wr->iter.idx];
863 
864 	while (wr->iter.idx < num_entries && rd->iter.idx < rd->iter.count) {
865 		/* Get metadata */
866 		md = rd->entries[rd->iter.idx].io_md;
867 		if (md->nv_cache.lba == FTL_LBA_INVALID || md->nv_cache.seq_id != chunk->md->seq_id) {
868 			cache_addr++;
869 			rd->iter.idx++;
870 			chunk_compaction_advance(chunk, 1);
871 			continue;
872 		}
873 
874 		current_addr = ftl_l2p_get(dev, md->nv_cache.lba);
875 		if (current_addr == cache_addr) {
876 			/* Swap payload */
877 			ftl_rq_swap_payload(wr, wr->iter.idx, rd, rd->iter.idx);
878 
879 			/*
880 			 * Address is still the same, so we can continue compacting it
881 			 * back to the base FTL device; record it as a valid entry
882 			 * within this batch
883 			 */
884 			iter->addr = current_addr;
885 			iter->owner.priv = chunk;
886 			iter->lba = md->nv_cache.lba;
887 			iter->seq_id = chunk->md->seq_id;
888 
889 			/* Advance within batch */
890 			iter++;
891 			wr->iter.idx++;
892 		} else {
893 			/* This address has already been invalidated, so skip this block */
894 			chunk_compaction_advance(chunk, 1);
895 			ftl_l2p_unpin(dev, md->nv_cache.lba, 1);
896 		}
897 
898 		/* Advance within reader */
899 		rd->iter.idx++;
900 		cache_addr++;
901 	}
902 
903 	if (num_entries == wr->iter.idx) {
904 		/*
905 		 * Request contains data to be placed on FTL, compact it
906 		 */
907 		ftl_writer_queue_rq(&dev->writer_user, wr);
908 	} else {
909 		if (is_compaction_required(compactor->nv_cache)) {
910 			compaction_process(compactor);
911 		} else {
912 			compactor_deactivate(compactor);
913 		}
914 	}
915 }
916 
917 static void
918 compactor_free(struct spdk_ftl_dev *dev, struct ftl_nv_cache_compactor *compactor)
919 {
920 	if (!compactor) {
921 		return;
922 	}
923 
924 	ftl_rq_del(compactor->wr);
925 	ftl_rq_del(compactor->rd);
926 	free(compactor);
927 }
928 
929 static struct ftl_nv_cache_compactor *
930 compactor_alloc(struct spdk_ftl_dev *dev)
931 {
932 	struct ftl_nv_cache_compactor *compactor;
933 
934 	compactor = calloc(1, sizeof(*compactor));
935 	if (!compactor) {
936 		goto error;
937 	}
938 
939 	/* Allocate helper request for writing */
940 	compactor->wr = ftl_rq_new(dev, dev->md_size);
941 	if (!compactor->wr) {
942 		goto error;
943 	}
944 
945 	/* Allocate helper request for reading */
946 	compactor->rd = ftl_rq_new(dev, dev->nv_cache.md_size);
947 	if (!compactor->rd) {
948 		goto error;
949 	}
950 
951 	compactor->nv_cache = &dev->nv_cache;
952 	compactor->wr->owner.priv = compactor;
953 	compactor->wr->owner.cb = compaction_process_ftl_done;
954 	compactor->wr->owner.compaction = true;
955 
956 	return compactor;
957 
958 error:
959 	compactor_free(dev, compactor);
960 	return NULL;
961 }
962 
963 static void
964 ftl_nv_cache_submit_cb_done(struct ftl_io *io)
965 {
966 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
967 
968 	chunk_advance_blocks(nv_cache, io->nv_cache_chunk, io->num_blocks);
969 	io->nv_cache_chunk = NULL;
970 
971 	ftl_mempool_put(nv_cache->md_pool, io->md);
972 	ftl_io_complete(io);
973 }
974 
975 static void
976 ftl_nv_cache_l2p_update(struct ftl_io *io)
977 {
978 	struct spdk_ftl_dev *dev = io->dev;
979 	ftl_addr next_addr = io->addr;
980 	size_t i;
981 
982 	for (i = 0; i < io->num_blocks; ++i, ++next_addr) {
983 		ftl_l2p_update_cache(dev, ftl_io_get_lba(io, i), next_addr, io->map[i]);
984 	}
985 
986 	ftl_l2p_unpin(dev, io->lba, io->num_blocks);
987 	ftl_nv_cache_submit_cb_done(io);
988 }
989 
990 static void
991 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
992 {
993 	struct ftl_io *io = cb_arg;
994 
995 	spdk_bdev_free_io(bdev_io);
996 
997 	if (spdk_unlikely(!success)) {
998 		FTL_ERRLOG(io->dev, "Non-volatile cache write failed at %"PRIx64"\n",
999 			   io->addr);
1000 		io->status = -EIO;
1001 		ftl_nv_cache_submit_cb_done(io);
1002 	} else {
1003 		ftl_nv_cache_l2p_update(io);
1004 	}
1005 }
1006 
1007 static void
1008 nv_cache_write(void *_io)
1009 {
1010 	struct ftl_io *io = _io;
1011 	struct spdk_ftl_dev *dev = io->dev;
1012 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1013 	int rc;
1014 
1015 	rc = ftl_nv_cache_bdev_writev_blocks_with_md(dev,
1016 			nv_cache->bdev_desc, nv_cache->cache_ioch,
1017 			io->iov, io->iov_cnt, io->md,
1018 			ftl_addr_to_nvc_offset(dev, io->addr), io->num_blocks,
1019 			ftl_nv_cache_submit_cb, io);
1020 	if (spdk_unlikely(rc)) {
1021 		if (rc == -ENOMEM) {
1022 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1023 			io->bdev_io_wait.bdev = bdev;
1024 			io->bdev_io_wait.cb_fn = nv_cache_write;
1025 			io->bdev_io_wait.cb_arg = io;
1026 			spdk_bdev_queue_io_wait(bdev, nv_cache->cache_ioch, &io->bdev_io_wait);
1027 		} else {
1028 			ftl_abort();
1029 		}
1030 	}
1031 }
1032 
1033 static void
1034 ftl_nv_cache_pin_cb(struct spdk_ftl_dev *dev, int status, struct ftl_l2p_pin_ctx *pin_ctx)
1035 {
1036 	struct ftl_io *io = pin_ctx->cb_ctx;
1037 	size_t i;
1038 
1039 	if (spdk_unlikely(status != 0)) {
1040 		/* Return -EAGAIN so the request is retried on an internal L2P fault */
1041 		FTL_ERRLOG(dev, "L2P pin for NV cache write failed at %"PRIx64"\n",
1042 			   io->addr);
1043 		io->status = -EAGAIN;
1044 		ftl_nv_cache_submit_cb_done(io);
1045 		return;
1046 	}
1047 
1048 	/* Remember previous l2p mapping to resolve conflicts in case of outstanding write-after-write */
1049 	for (i = 0; i < io->num_blocks; ++i) {
1050 		io->map[i] = ftl_l2p_get(dev, ftl_io_get_lba(io, i));
1051 	}
1052 
1053 	assert(io->iov_pos == 0);
1054 
1055 	nv_cache_write(io);
1056 }
1057 
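/*
 * Entry point for writing user data into the NV cache: grab a VSS metadata
 * buffer, reserve space in an open chunk, fill the per-block metadata and pin
 * the L2P range; the actual bdev write is issued from the pin callback.
 * Returns false when resources are exhausted so the caller can retry later.
 */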
1058 bool
1059 ftl_nv_cache_write(struct ftl_io *io)
1060 {
1061 	struct spdk_ftl_dev *dev = io->dev;
1062 	uint64_t cache_offset;
1063 
1064 	io->md = ftl_mempool_get(dev->nv_cache.md_pool);
1065 	if (spdk_unlikely(!io->md)) {
1066 		return false;
1067 	}
1068 
1069 	/* Reserve area on the write buffer cache */
1070 	cache_offset = ftl_nv_cache_get_wr_buffer(&dev->nv_cache, io);
1071 	if (cache_offset == FTL_LBA_INVALID) {
1072 		/* No free space in NV cache, resubmit request */
1073 		ftl_mempool_put(dev->nv_cache.md_pool, io->md);
1074 		return false;
1075 	}
1076 	io->addr = ftl_addr_from_nvc_offset(dev, cache_offset);
1077 	io->nv_cache_chunk = dev->nv_cache.chunk_current;
1078 
1079 	ftl_nv_cache_fill_md(io);
1080 	ftl_l2p_pin(io->dev, io->lba, io->num_blocks,
1081 		    ftl_nv_cache_pin_cb, io,
1082 		    &io->l2p_pin_ctx);
1083 
1084 	return true;
1085 }
1086 
1087 int
1088 ftl_nv_cache_read(struct ftl_io *io, ftl_addr addr, uint32_t num_blocks,
1089 		  spdk_bdev_io_completion_cb cb, void *cb_arg)
1090 {
1091 	int rc;
1092 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1093 
1094 	assert(ftl_addr_in_nvc(io->dev, addr));
1095 
1096 	rc = ftl_nv_cache_bdev_read_blocks_with_md(io->dev, nv_cache->bdev_desc, nv_cache->cache_ioch,
1097 			ftl_io_iovec_addr(io), NULL, ftl_addr_to_nvc_offset(io->dev, addr),
1098 			num_blocks, cb, cb_arg);
1099 
1100 	return rc;
1101 }
1102 
1103 bool
1104 ftl_nv_cache_is_halted(struct ftl_nv_cache *nv_cache)
1105 {
1106 	struct ftl_nv_cache_compactor *compactor;
1107 
1108 	if (nv_cache->compaction_active_count) {
1109 		return false;
1110 	}
1111 
1112 	TAILQ_FOREACH(compactor, &nv_cache->compactor_list, entry) {
1113 		if (compactor->rd->iter.idx != 0 || compactor->wr->iter.idx != 0) {
1114 			return false;
1115 		}
1116 	}
1117 
1118 	if (nv_cache->chunk_open_count > 0) {
1119 		return false;
1120 	}
1121 
1122 	return true;
1123 }
1124 
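/*
 * Roll back a compactor that was stopped mid-cycle (on halt): unpin the LBAs
 * of any entries still held by its read and write requests and reset both
 * iterators so the compactor can be reused cleanly.
 */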
1125 static void
1126 ftl_nv_cache_compaction_reset(struct ftl_nv_cache_compactor *compactor)
1127 {
1128 	struct ftl_rq *rd = compactor->rd;
1129 	struct ftl_rq *wr = compactor->wr;
1130 	uint64_t lba;
1131 	uint64_t i;
1132 
1133 	for (i = rd->iter.idx; i < rd->iter.count; i++) {
1134 		lba = ((union ftl_md_vss *)rd->entries[i].io_md)->nv_cache.lba;
1135 		if (lba != FTL_LBA_INVALID) {
1136 			ftl_l2p_unpin(rd->dev, lba, 1);
1137 		}
1138 	}
1139 
1140 	rd->iter.idx = 0;
1141 	rd->iter.count = 0;
1142 
1143 	for (i = 0; i < wr->iter.idx; i++) {
1144 		lba = wr->entries[i].lba;
1145 		assert(lba != FTL_LBA_INVALID);
1146 		ftl_l2p_unpin(wr->dev, lba, 1);
1147 	}
1148 
1149 	wr->iter.idx = 0;
1150 }
1151 
1152 void
1153 ftl_chunk_map_set_lba(struct ftl_nv_cache_chunk *chunk,
1154 		      uint64_t offset, uint64_t lba)
1155 {
1156 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1157 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1158 
1159 	ftl_lba_store(dev, p2l_map->chunk_map, offset, lba);
1160 }
1161 
1162 uint64_t
1163 ftl_chunk_map_get_lba(struct ftl_nv_cache_chunk *chunk, uint64_t offset)
1164 {
1165 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1166 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1167 
1168 	return ftl_lba_load(dev, p2l_map->chunk_map, offset);
1169 }
1170 
1171 static void
1172 ftl_chunk_set_addr(struct ftl_nv_cache_chunk *chunk, uint64_t lba, ftl_addr addr)
1173 {
1174 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1175 	uint64_t cache_offset = ftl_addr_to_nvc_offset(dev, addr);
1176 	uint64_t offset;
1177 
1178 	offset = (cache_offset - chunk->offset) % chunk->nv_cache->chunk_blocks;
1179 	ftl_chunk_map_set_lba(chunk, offset, lba);
1180 }
1181 
1182 struct ftl_nv_cache_chunk *
1183 ftl_nv_cache_get_chunk_from_addr(struct spdk_ftl_dev *dev, ftl_addr addr)
1184 {
1185 	struct ftl_nv_cache_chunk *chunk = dev->nv_cache.chunks;
1186 	uint64_t chunk_idx;
1187 	uint64_t cache_offset = ftl_addr_to_nvc_offset(dev, addr);
1188 
1189 	assert(chunk != NULL);
1190 	chunk_idx = (cache_offset - chunk->offset) / chunk->nv_cache->chunk_blocks;
1191 	chunk += chunk_idx;
1192 
1193 	return chunk;
1194 }
1195 
1196 void
1197 ftl_nv_cache_set_addr(struct spdk_ftl_dev *dev, uint64_t lba, ftl_addr addr)
1198 {
1199 	struct ftl_nv_cache_chunk *chunk;
1200 
1201 	chunk = ftl_nv_cache_get_chunk_from_addr(dev, addr);
1202 
1203 	assert(lba != FTL_LBA_INVALID);
1204 
1205 	ftl_chunk_set_addr(chunk, lba, addr);
1206 	ftl_bitmap_set(dev->valid_map, addr);
1207 }
1208 
1209 static void ftl_chunk_open(struct ftl_nv_cache_chunk *chunk);
1210 
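/*
 * Periodic NV cache housekeeping, driven by the core poller: open a new chunk
 * while fewer than FTL_MAX_OPEN_CHUNKS are open, start a compactor when the
 * full-chunk threshold is reached, persist pending FREE chunk states, and
 * reset idle compactors while the cache is halting.
 */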
1211 void
1212 ftl_nv_cache_process(struct spdk_ftl_dev *dev)
1213 {
1214 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1215 
1216 	assert(dev->nv_cache.bdev_desc);
1217 
1218 	if (nv_cache->chunk_open_count < FTL_MAX_OPEN_CHUNKS && spdk_likely(!nv_cache->halt) &&
1219 	    !TAILQ_EMPTY(&nv_cache->chunk_free_list)) {
1220 		struct ftl_nv_cache_chunk *chunk = TAILQ_FIRST(&nv_cache->chunk_free_list);
1221 		TAILQ_REMOVE(&nv_cache->chunk_free_list, chunk, entry);
1222 		TAILQ_INSERT_TAIL(&nv_cache->chunk_open_list, chunk, entry);
1223 		nv_cache->chunk_free_count--;
1224 		chunk->md->seq_id = ftl_get_next_seq_id(dev);
1225 		ftl_chunk_open(chunk);
1226 	}
1227 
1228 	if (is_compaction_required(nv_cache) && !TAILQ_EMPTY(&nv_cache->compactor_list)) {
1229 		struct ftl_nv_cache_compactor *comp =
1230 			TAILQ_FIRST(&nv_cache->compactor_list);
1231 
1232 		TAILQ_REMOVE(&nv_cache->compactor_list, comp, entry);
1233 
1234 		compaction_process_start(comp);
1235 	}
1236 
1237 	ftl_chunk_persist_free_state(nv_cache);
1238 
1239 	if (spdk_unlikely(nv_cache->halt)) {
1240 		struct ftl_nv_cache_compactor *compactor;
1241 
1242 		TAILQ_FOREACH(compactor, &nv_cache->compactor_list, entry) {
1243 			ftl_nv_cache_compaction_reset(compactor);
1244 		}
1245 	}
1246 }
1247 
1248 bool
1249 ftl_nv_cache_full(struct ftl_nv_cache *nv_cache)
1250 {
1251 	if (0 == nv_cache->chunk_open_count && NULL == nv_cache->chunk_current) {
1252 		return true;
1253 	} else {
1254 		return false;
1255 	}
1256 }
1257 
1258 static void
1259 chunk_free_p2l_map(struct ftl_nv_cache_chunk *chunk)
1260 {
1261 
1262 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1263 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1264 
1265 	ftl_mempool_put(nv_cache->p2l_pool, p2l_map->chunk_map);
1266 	p2l_map->chunk_map = NULL;
1267 
1268 	ftl_chunk_free_md_entry(chunk);
1269 }
1270 
1271 int
1272 ftl_nv_cache_save_state(struct ftl_nv_cache *nv_cache)
1273 {
1274 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1275 	struct ftl_nv_cache_chunk *chunk;
1276 	int status = 0;
1277 	uint64_t i;
1278 
1279 	assert(nv_cache->chunk_open_count == 0);
1280 
1281 	if (nv_cache->compaction_active_count) {
1282 		FTL_ERRLOG(dev, "Cannot save NV cache state, compaction in progress\n");
1283 		return -EINVAL;
1284 	}
1285 
1286 	chunk = nv_cache->chunks;
1287 	if (!chunk) {
1288 		FTL_ERRLOG(dev, "Cannot save NV cache state, no NV cache metadata\n");
1289 		return -ENOMEM;
1290 	}
1291 
1292 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1293 		nvc_validate_md(nv_cache, chunk->md);
1294 
1295 		if (chunk->md->read_pointer) {
1296 			/* Only full chunks can be compacted */
1297 			if (chunk->md->blocks_written != nv_cache->chunk_blocks) {
1298 				assert(0);
1299 				status = -EINVAL;
1300 				break;
1301 			}
1302 
1303 			/*
1304 			 * Chunk in the middle of compaction, start over after
1305 			 * load
1306 			 */
1307 			chunk->md->read_pointer = chunk->md->blocks_compacted = 0;
1308 		} else if (chunk->md->blocks_written == nv_cache->chunk_blocks) {
1309 			/* Full chunk */
1310 		} else if (0 == chunk->md->blocks_written) {
1311 			/* Empty chunk */
1312 		} else {
1313 			assert(0);
1314 			status = -EINVAL;
1315 			break;
1316 		}
1317 	}
1318 
1319 	if (status) {
1320 		FTL_ERRLOG(dev, "Cannot save NV cache state, inconsistent NV cache "
1321 			   "metadata\n");
1322 	}
1323 
1324 	return status;
1325 }
1326 
1327 static int
1328 sort_chunks_cmp(const void *a, const void *b)
1329 {
1330 	struct ftl_nv_cache_chunk *a_chunk = *(struct ftl_nv_cache_chunk **)a;
1331 	struct ftl_nv_cache_chunk *b_chunk = *(struct ftl_nv_cache_chunk **)b;
1332 
1333 	return (a_chunk->md->seq_id > b_chunk->md->seq_id) - (a_chunk->md->seq_id < b_chunk->md->seq_id);
1334 }
1335 
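/* After load, order the full-chunk list by open sequence id so that the oldest
 * chunks end up at the head of the list and are compacted first. */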
1336 static int
1337 sort_chunks(struct ftl_nv_cache *nv_cache)
1338 {
1339 	struct ftl_nv_cache_chunk **chunks_list;
1340 	struct ftl_nv_cache_chunk *chunk;
1341 	uint32_t i;
1342 
1343 	if (TAILQ_EMPTY(&nv_cache->chunk_full_list)) {
1344 		return 0;
1345 	}
1346 
1347 	chunks_list = calloc(nv_cache->chunk_full_count,
1348 			     sizeof(chunks_list[0]));
1349 	if (!chunks_list) {
1350 		return -ENOMEM;
1351 	}
1352 
1353 	i = 0;
1354 	TAILQ_FOREACH(chunk, &nv_cache->chunk_full_list, entry) {
1355 		chunks_list[i] = chunk;
1356 		i++;
1357 	}
1358 	assert(i == nv_cache->chunk_full_count);
1359 
1360 	qsort(chunks_list, nv_cache->chunk_full_count, sizeof(chunks_list[0]),
1361 	      sort_chunks_cmp);
1362 
1363 	TAILQ_INIT(&nv_cache->chunk_full_list);
1364 	for (i = 0; i < nv_cache->chunk_full_count; i++) {
1365 		chunk = chunks_list[i];
1366 		TAILQ_INSERT_TAIL(&nv_cache->chunk_full_list, chunk, entry);
1367 	}
1368 
1369 	free(chunks_list);
1370 	return 0;
1371 }
1372 
1373 static int
1374 chunk_alloc_p2l_map(struct ftl_nv_cache_chunk *chunk)
1375 {
1376 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1377 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1378 
1379 	assert(p2l_map->ref_cnt == 0);
1380 	assert(p2l_map->chunk_map == NULL);
1381 
1382 	p2l_map->chunk_map = ftl_mempool_get(nv_cache->p2l_pool);
1383 
1384 	if (!p2l_map->chunk_map) {
1385 		return -ENOMEM;
1386 	}
1387 
1388 	if (ftl_chunk_alloc_md_entry(chunk)) {
1389 		ftl_mempool_put(nv_cache->p2l_pool, p2l_map->chunk_map);
1390 		p2l_map->chunk_map = NULL;
1391 		return -ENOMEM;
1392 	}
1393 
1394 	/* Initialize the P2L map so every entry reads as FTL_LBA_INVALID */
1395 	memset(p2l_map->chunk_map, -1, FTL_BLOCK_SIZE * nv_cache->tail_md_chunk_blocks);
1396 
1397 	return 0;
1398 }
1399 
1400 int
1401 ftl_nv_cache_load_state(struct ftl_nv_cache *nv_cache)
1402 {
1403 	struct ftl_nv_cache_chunk *chunk;
1404 	uint64_t chunks_number, offset, i;
1405 	int status = 0;
1406 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1407 
1408 	nv_cache->chunk_current = NULL;
1409 	TAILQ_INIT(&nv_cache->chunk_free_list);
1410 	TAILQ_INIT(&nv_cache->chunk_full_list);
1411 	nv_cache->chunk_full_count = nv_cache->chunk_free_count = 0;
1412 
1413 	assert(nv_cache->chunk_open_count == 0);
1414 	offset = nvc_data_offset(nv_cache);
1415 	chunk = nv_cache->chunks;
1416 	if (!chunk) {
1417 		FTL_ERRLOG(dev, "No NV cache metadata\n");
1418 		return -1;
1419 	}
1420 
1421 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1422 		chunk->nv_cache = nv_cache;
1423 		nvc_validate_md(nv_cache, chunk->md);
1424 
1425 		if (offset != chunk->offset) {
1426 			status = -EINVAL;
1427 			goto error;
1428 		}
1429 
1430 		if (chunk->md->blocks_written == nv_cache->chunk_blocks) {
1431 			/* Chunk is full, move it to the full list */
1432 			TAILQ_INSERT_TAIL(&nv_cache->chunk_full_list, chunk, entry);
1433 			nv_cache->chunk_full_count++;
1434 		} else if (0 == chunk->md->blocks_written) {
1435 			/* Chunk is empty, move it to the free list */
1436 			TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
1437 			nv_cache->chunk_free_count++;
1438 		} else {
1439 			status = -EINVAL;
1440 			goto error;
1441 		}
1442 
1443 		offset += nv_cache->chunk_blocks;
1444 	}
1445 
1446 	chunks_number = nv_cache->chunk_free_count + nv_cache->chunk_full_count;
1447 	assert(nv_cache->chunk_current == NULL);
1448 
1449 	if (chunks_number != nv_cache->chunk_count) {
1450 		FTL_ERRLOG(dev, "Inconsistent NV cache metadata\n");
1451 		status = -EINVAL;
1452 		goto error;
1453 	}
1454 
1455 	status = sort_chunks(nv_cache);
1456 	if (status) {
1457 		FTL_ERRLOG(dev, "FTL NV Cache: sorting chunks ERROR\n");
1458 	}
1459 
1460 	FTL_NOTICELOG(dev, "FTL NV Cache: full chunks = %lu, empty chunks = %lu\n",
1461 		      nv_cache->chunk_full_count, nv_cache->chunk_free_count);
1462 
1463 	if (0 == status) {
1464 		FTL_NOTICELOG(dev, "FTL NV Cache: state loaded successfully\n");
1465 	} else {
1466 		FTL_ERRLOG(dev, "FTL NV Cache: loading state ERROR\n");
1467 	}
1468 
1469 error:
1470 	return status;
1471 }
1472 
1473 void
1474 ftl_nv_cache_get_max_seq_id(struct ftl_nv_cache *nv_cache, uint64_t *open_seq_id,
1475 			    uint64_t *close_seq_id)
1476 {
1477 	uint64_t i, o_seq_id = 0, c_seq_id = 0;
1478 	struct ftl_nv_cache_chunk *chunk;
1479 
1480 	chunk = nv_cache->chunks;
1481 	assert(chunk);
1482 
1483 	/* Iterate over chunks and get their max open and close seq id */
1484 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1485 		o_seq_id = spdk_max(o_seq_id, chunk->md->seq_id);
1486 		c_seq_id = spdk_max(c_seq_id, chunk->md->close_seq_id);
1487 	}
1488 
1489 	*open_seq_id = o_seq_id;
1490 	*close_seq_id = c_seq_id;
1491 }
1492 
1493 typedef void (*ftl_chunk_ops_cb)(struct ftl_nv_cache_chunk *chunk, void *cntx, bool status);
1494 
1495 static void
1496 write_brq_end(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1497 {
1498 	struct ftl_basic_rq *brq = arg;
1499 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
1500 
1501 	brq->success = success;
1502 	if (spdk_likely(success)) {
1503 		chunk_advance_blocks(chunk->nv_cache, chunk, brq->num_blocks);
1504 	}
1505 
1506 	spdk_bdev_free_io(bdev_io);
1507 	brq->owner.cb(brq);
1508 }
1509 
1510 static void
1511 _ftl_chunk_basic_rq_write(void *_brq)
1512 {
1513 	struct ftl_basic_rq *brq = _brq;
1514 	struct ftl_nv_cache *nv_cache = brq->io.chunk->nv_cache;
1515 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1516 	int rc;
1517 
1518 	rc = ftl_nv_cache_bdev_write_blocks_with_md(dev, nv_cache->bdev_desc, nv_cache->cache_ioch,
1519 			brq->io_payload, NULL, brq->io.addr,
1520 			brq->num_blocks, write_brq_end, brq);
1521 	if (spdk_unlikely(rc)) {
1522 		if (rc == -ENOMEM) {
1523 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1524 			brq->io.bdev_io_wait.bdev = bdev;
1525 			brq->io.bdev_io_wait.cb_fn = _ftl_chunk_basic_rq_write;
1526 			brq->io.bdev_io_wait.cb_arg = brq;
1527 			spdk_bdev_queue_io_wait(bdev, nv_cache->cache_ioch, &brq->io.bdev_io_wait);
1528 		} else {
1529 			ftl_abort();
1530 		}
1531 	}
1532 }
1533 
1534 static void
1535 ftl_chunk_basic_rq_write(struct ftl_nv_cache_chunk *chunk, struct ftl_basic_rq *brq)
1536 {
1537 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1538 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1539 
1540 	brq->io.chunk = chunk;
1541 	brq->success = false;
1542 
1543 	_ftl_chunk_basic_rq_write(brq);
1544 
1545 	chunk->md->write_pointer += brq->num_blocks;
1546 	dev->io_activity_total += brq->num_blocks;
1547 }
1548 
1549 static void
1550 chunk_open_cb(int status, void *ctx)
1551 {
1552 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
1553 
1554 	if (spdk_unlikely(status)) {
1555 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
1556 		return;
1557 	}
1558 
1559 	chunk->md->state = FTL_CHUNK_STATE_OPEN;
1560 }
1561 
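/*
 * Transition a chunk to the OPEN state: allocate its P2L map and DMA metadata
 * buffer, then persist the OPEN state entry in the NVC metadata region;
 * chunk_open_cb() updates the in-memory state once the persist completes.
 */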
1562 static void
1563 ftl_chunk_open(struct ftl_nv_cache_chunk *chunk)
1564 {
1565 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1566 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1567 	struct ftl_layout_region *region = &dev->layout.region[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1568 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1569 
1570 	if (chunk_alloc_p2l_map(chunk)) {
1571 		assert(0);
1572 		/*
1573 		 * We control the number of open chunks, so it must stay consistent with the size
1574 		 * of the chunk P2L map pool
1575 		 */
1576 		ftl_abort();
1577 		return;
1578 	}
1579 
1580 	chunk->nv_cache->chunk_open_count++;
1581 
1582 	assert(chunk->md->write_pointer == 0);
1583 	assert(chunk->md->blocks_written == 0);
1584 
1585 	memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
1586 	p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_OPEN;
1587 	p2l_map->chunk_dma_md->p2l_map_checksum = 0;
1588 
1589 	ftl_md_persist_entry(md, get_chunk_idx(chunk), p2l_map->chunk_dma_md,
1590 			     NULL, chunk_open_cb, chunk,
1591 			     &chunk->md_persist_entry_ctx);
1592 }
1593 
1594 static void
1595 chunk_close_cb(int status, void *ctx)
1596 {
1597 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
1598 
1599 	assert(chunk->md->write_pointer == chunk->nv_cache->chunk_blocks);
1600 
1601 	if (spdk_likely(!status)) {
1602 		chunk->md->p2l_map_checksum = chunk->p2l_map.chunk_dma_md->p2l_map_checksum;
1603 		chunk_free_p2l_map(chunk);
1604 
1605 		assert(chunk->nv_cache->chunk_open_count > 0);
1606 		chunk->nv_cache->chunk_open_count--;
1607 
1608 		/* Chunk is full, move it to the full list */
1609 		TAILQ_INSERT_TAIL(&chunk->nv_cache->chunk_full_list, chunk, entry);
1610 		chunk->nv_cache->chunk_full_count++;
1611 
1612 		chunk->nv_cache->last_seq_id = chunk->md->close_seq_id;
1613 
1614 		chunk->md->state = FTL_CHUNK_STATE_CLOSED;
1615 	} else {
1616 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
1617 	}
1618 }
1619 
1620 static void
1621 chunk_map_write_cb(struct ftl_basic_rq *brq)
1622 {
1623 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
1624 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1625 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1626 	struct ftl_layout_region *region = &dev->layout.region[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1627 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1628 	uint32_t chunk_map_crc;
1629 
1630 	if (spdk_likely(brq->success)) {
1631 		chunk_map_crc = spdk_crc32c_update(p2l_map->chunk_map,
1632 						   chunk->nv_cache->tail_md_chunk_blocks * FTL_BLOCK_SIZE, 0);
1633 		memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
1634 		p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_CLOSED;
1635 		p2l_map->chunk_dma_md->p2l_map_checksum = chunk_map_crc;
1636 		ftl_md_persist_entry(md, get_chunk_idx(chunk), chunk->p2l_map.chunk_dma_md,
1637 				     NULL, chunk_close_cb, chunk,
1638 				     &chunk->md_persist_entry_ctx);
1639 	} else {
1640 		/* retry */
1641 		chunk->md->write_pointer -= brq->num_blocks;
1642 		ftl_chunk_basic_rq_write(chunk, brq);
1643 	}
1644 }
1645 
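/*
 * Close a fully written chunk: assign a close sequence id, write the P2L map
 * as the chunk's tail metadata, then (from chunk_map_write_cb) persist the
 * CLOSED state together with the P2L map checksum.
 */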
1646 static void
1647 ftl_chunk_close(struct ftl_nv_cache_chunk *chunk)
1648 {
1649 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1650 	struct ftl_basic_rq *brq = &chunk->metadata_rq;
1651 	void *metadata = chunk->p2l_map.chunk_map;
1652 
1653 	chunk->md->close_seq_id = ftl_get_next_seq_id(dev);
1654 	ftl_basic_rq_init(dev, brq, metadata, chunk->nv_cache->tail_md_chunk_blocks);
1655 	ftl_basic_rq_set_owner(brq, chunk_map_write_cb, chunk);
1656 
1657 	assert(chunk->md->write_pointer == chunk_tail_md_offset(chunk->nv_cache));
1658 	brq->io.addr = chunk->offset + chunk->md->write_pointer;
1659 
1660 	ftl_chunk_basic_rq_write(chunk, brq);
1661 }
1662 
1663 int
1664 ftl_nv_cache_chunks_busy(struct ftl_nv_cache *nv_cache)
1665 {
1666 	/* chunk_current migrates to the closed state while closing; any other chunks should already
1667 	 * have been moved to the free chunk list. We also need to wait for outstanding free-state metadata persists */
1668 	return nv_cache->chunk_open_count == 0 && nv_cache->chunk_free_persist_count == 0;
1669 }
1670 
1671 void
1672 ftl_nv_cache_halt(struct ftl_nv_cache *nv_cache)
1673 {
1674 	struct ftl_nv_cache_chunk *chunk;
1675 	uint64_t free_space;
1676 
1677 	nv_cache->halt = true;
1678 
1679 	/* Set chunks on the open list back to the free state, since no user data has been written to them */
1680 	while (!TAILQ_EMPTY(&nv_cache->chunk_open_list)) {
1681 		chunk = TAILQ_FIRST(&nv_cache->chunk_open_list);
1682 
1683 		/* Chunks are moved between lists on metadata update submission, but state is changed
1684 		 * on completion. Breaking early in such a case to make sure all the necessary resources
1685 		 * will be freed (during next pass(es) of ftl_nv_cache_halt).
1686 		 */
1687 		if (chunk->md->state != FTL_CHUNK_STATE_OPEN) {
1688 			break;
1689 		}
1690 
1691 		TAILQ_REMOVE(&nv_cache->chunk_open_list, chunk, entry);
1692 		chunk_free_p2l_map(chunk);
1693 		memset(chunk->md, 0, sizeof(*chunk->md));
1694 		assert(nv_cache->chunk_open_count > 0);
1695 		nv_cache->chunk_open_count--;
1696 	}
1697 
1698 	/* Close the current chunk by skipping all unwritten blocks */
1699 	chunk = nv_cache->chunk_current;
1700 	if (chunk != NULL) {
1701 		nv_cache->chunk_current = NULL;
1702 		if (chunk_is_closed(chunk)) {
1703 			return;
1704 		}
1705 
1706 		free_space = chunk_get_free_space(nv_cache, chunk);
1707 		chunk->md->blocks_skipped = free_space;
1708 		chunk->md->blocks_written += free_space;
1709 		chunk->md->write_pointer += free_space;
1710 		ftl_chunk_close(chunk);
1711 	}
1712 }
1713