1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 
7 #include "spdk/bdev.h"
8 #include "spdk/bdev_module.h"
9 #include "spdk/ftl.h"
10 #include "spdk/string.h"
11 
12 #include "ftl_nv_cache.h"
13 #include "ftl_nv_cache_io.h"
14 #include "ftl_core.h"
15 #include "ftl_band.h"
16 #include "utils/ftl_addr_utils.h"
17 #include "mngt/ftl_mngt.h"
18 
19 static inline uint64_t nvc_data_blocks(struct ftl_nv_cache *nv_cache) __attribute__((unused));
20 static struct ftl_nv_cache_compactor *compactor_alloc(struct spdk_ftl_dev *dev);
21 static void compactor_free(struct spdk_ftl_dev *dev, struct ftl_nv_cache_compactor *compactor);
22 static void compaction_process_ftl_done(struct ftl_rq *rq);
23 
24 static inline const struct ftl_layout_region *
25 nvc_data_region(struct ftl_nv_cache *nv_cache)
26 {
27 	struct spdk_ftl_dev *dev;
28 
29 	dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
30 	return &dev->layout.region[FTL_LAYOUT_REGION_TYPE_DATA_NVC];
31 }
32 
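/*
 * Sanity-check that a chunk metadata entry lies entirely within the buffer of the
 * NV cache metadata object; abort on any out-of-bounds pointer.
 */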
33 static inline void
34 nvc_validate_md(struct ftl_nv_cache *nv_cache,
35 		struct ftl_nv_cache_chunk_md *chunk_md)
36 {
37 	struct ftl_md *md = nv_cache->md;
38 	void *buffer = ftl_md_get_buffer(md);
39 	uint64_t size = ftl_md_get_buffer_size(md);
40 	void *ptr = chunk_md;
41 
42 	if (ptr < buffer) {
43 		ftl_abort();
44 	}
45 
46 	ptr += sizeof(*chunk_md);
47 	if (ptr > buffer + size) {
48 		ftl_abort();
49 	}
50 }
51 
52 static inline uint64_t
53 nvc_data_offset(struct ftl_nv_cache *nv_cache)
54 {
55 	return nvc_data_region(nv_cache)->current.offset;
56 }
57 
58 static inline uint64_t
59 nvc_data_blocks(struct ftl_nv_cache *nv_cache)
60 {
61 	return nvc_data_region(nv_cache)->current.blocks;
62 }
63 
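/*
 * Number of blocks needed to store a chunk's tail metadata (its P2L map): one L2P
 * address entry per data block in the chunk, rounded up to a whole number of blocks.
 * For example, assuming 4KiB blocks and 8B L2P addresses, a chunk with 1024 data
 * blocks needs spdk_divide_round_up(1024 * 8, 4096) = 2 tail metadata blocks.
 */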
64 size_t
65 ftl_nv_cache_chunk_tail_md_num_blocks(const struct ftl_nv_cache *nv_cache)
66 {
67 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache,
68 				   struct spdk_ftl_dev, nv_cache);
69 	return spdk_divide_round_up(dev->layout.nvc.chunk_data_blocks * dev->layout.l2p.addr_size,
70 				    FTL_BLOCK_SIZE);
71 }
72 
73 static size_t
74 nv_cache_p2l_map_pool_elem_size(const struct ftl_nv_cache *nv_cache)
75 {
76 	/* Map pool element holds the whole tail md */
77 	return nv_cache->tail_md_chunk_blocks * FTL_BLOCK_SIZE;
78 }
79 
80 static uint64_t
81 get_chunk_idx(struct ftl_nv_cache_chunk *chunk)
82 {
83 	struct ftl_nv_cache_chunk *first_chunk = chunk->nv_cache->chunks;
84 
85 	return (chunk->offset - first_chunk->offset) / chunk->nv_cache->chunk_blocks;
86 }
87 
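/*
 * Set up the NV cache: attach the chunk metadata object, create the per-IO metadata
 * pool, allocate all chunks and link them onto the free list, allocate the compactors
 * and create the P2L map and chunk metadata DMA pools used while opening and closing chunks.
 */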
88 int
89 ftl_nv_cache_init(struct spdk_ftl_dev *dev)
90 {
91 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
92 	struct ftl_nv_cache_chunk *chunk;
93 	struct ftl_nv_cache_chunk_md *md;
94 	struct ftl_nv_cache_compactor *compactor;
95 	uint64_t i, offset;
96 
97 	nv_cache->halt = true;
98 
99 	nv_cache->md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
100 	if (!nv_cache->md) {
101 		FTL_ERRLOG(dev, "No NV cache metadata object\n");
102 		return -1;
103 	}
104 
105 	nv_cache->md_pool = ftl_mempool_create(dev->conf.user_io_pool_size,
106 					       nv_cache->md_size * dev->xfer_size,
107 					       FTL_BLOCK_SIZE, SPDK_ENV_SOCKET_ID_ANY);
108 	if (!nv_cache->md_pool) {
109 		FTL_ERRLOG(dev, "Failed to initialize NV cache metadata pool\n");
110 		return -1;
111 	}
112 
113 	/*
114 	 * Initialize chunk info
115 	 */
116 	nv_cache->chunk_blocks = dev->layout.nvc.chunk_data_blocks;
117 	nv_cache->chunk_count = dev->layout.nvc.chunk_count;
118 	nv_cache->tail_md_chunk_blocks = ftl_nv_cache_chunk_tail_md_num_blocks(nv_cache);
119 
120 	/* Allocate chunks */
121 	nv_cache->chunks = calloc(nv_cache->chunk_count,
122 				  sizeof(nv_cache->chunks[0]));
123 	if (!nv_cache->chunks) {
124 		FTL_ERRLOG(dev, "Failed to initialize NV cache chunks\n");
125 		return -1;
126 	}
127 
128 	TAILQ_INIT(&nv_cache->chunk_free_list);
129 	TAILQ_INIT(&nv_cache->chunk_open_list);
130 	TAILQ_INIT(&nv_cache->chunk_full_list);
131 	TAILQ_INIT(&nv_cache->chunk_comp_list);
132 	TAILQ_INIT(&nv_cache->needs_free_persist_list);
133 
134 	/* First chunk metadata */
135 	md = ftl_md_get_buffer(nv_cache->md);
136 	if (!md) {
137 		FTL_ERRLOG(dev, "No NV cache metadata\n");
138 		return -1;
139 	}
140 
141 	nv_cache->chunk_free_count = nv_cache->chunk_count;
142 
143 	chunk = nv_cache->chunks;
144 	offset = nvc_data_offset(nv_cache);
145 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++, md++) {
146 		chunk->nv_cache = nv_cache;
147 		chunk->md = md;
148 		nvc_validate_md(nv_cache, md);
149 		chunk->offset = offset;
150 		offset += nv_cache->chunk_blocks;
151 		TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
152 	}
153 	assert(offset <= nvc_data_offset(nv_cache) + nvc_data_blocks(nv_cache));
154 
155 	/* Start compaction when the number of full chunks exceeds the given % of all chunks */
156 	nv_cache->chunk_compaction_threshold = nv_cache->chunk_count *
157 					       dev->conf.nv_cache.chunk_compaction_threshold / 100;
158 	TAILQ_INIT(&nv_cache->compactor_list);
159 	for (i = 0; i < FTL_NV_CACHE_NUM_COMPACTORS; i++) {
160 		compactor = compactor_alloc(dev);
161 
162 		if (!compactor) {
163 			FTL_ERRLOG(dev, "Cannot allocate compaction process\n");
164 			return -1;
165 		}
166 
167 		TAILQ_INSERT_TAIL(&nv_cache->compactor_list, compactor, entry);
168 	}
169 
170 #define FTL_MAX_OPEN_CHUNKS 2
171 	nv_cache->p2l_pool = ftl_mempool_create(FTL_MAX_OPEN_CHUNKS,
172 						nv_cache_p2l_map_pool_elem_size(nv_cache),
173 						FTL_BLOCK_SIZE,
174 						SPDK_ENV_SOCKET_ID_ANY);
175 	if (!nv_cache->p2l_pool) {
176 		return -ENOMEM;
177 	}
178 
179 	/* One entry per open chunk */
180 	nv_cache->chunk_md_pool = ftl_mempool_create(FTL_MAX_OPEN_CHUNKS,
181 				  sizeof(struct ftl_nv_cache_chunk_md),
182 				  FTL_BLOCK_SIZE,
183 				  SPDK_ENV_SOCKET_ID_ANY);
184 	if (!nv_cache->chunk_md_pool) {
185 		return -ENOMEM;
186 	}
187 
188 	/* Each compactor may be reading from a different chunk whose state it will need to switch to free
189 	 * at the end of compaction. One backup entry per compactor is added on top of that: if a backlog of
190 	 * chunks with very low (even zero) validity builds up, the compactors can drain them quickly and
191 	 * trigger a lot of free-state metadata updates at once. */
192 	nv_cache->free_chunk_md_pool = ftl_mempool_create(2 * FTL_NV_CACHE_NUM_COMPACTORS,
193 				       sizeof(struct ftl_nv_cache_chunk_md),
194 				       FTL_BLOCK_SIZE,
195 				       SPDK_ENV_SOCKET_ID_ANY);
196 	if (!nv_cache->free_chunk_md_pool) {
197 		return -ENOMEM;
198 	}
199 
200 	return 0;
201 }
202 
203 void
204 ftl_nv_cache_deinit(struct spdk_ftl_dev *dev)
205 {
206 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
207 	struct ftl_nv_cache_compactor *compactor;
208 
209 	while (!TAILQ_EMPTY(&nv_cache->compactor_list)) {
210 		compactor = TAILQ_FIRST(&nv_cache->compactor_list);
211 		TAILQ_REMOVE(&nv_cache->compactor_list, compactor, entry);
212 
213 		compactor_free(dev, compactor);
214 	}
215 
216 	ftl_mempool_destroy(nv_cache->md_pool);
217 	ftl_mempool_destroy(nv_cache->p2l_pool);
218 	ftl_mempool_destroy(nv_cache->chunk_md_pool);
219 	ftl_mempool_destroy(nv_cache->free_chunk_md_pool);
220 	nv_cache->md_pool = NULL;
221 	nv_cache->p2l_pool = NULL;
222 	nv_cache->chunk_md_pool = NULL;
223 	nv_cache->free_chunk_md_pool = NULL;
224 
225 	free(nv_cache->chunks);
226 	nv_cache->chunks = NULL;
227 }
228 
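/*
 * Number of blocks still available for user data in the chunk; the tail metadata
 * area at the end of the chunk is reserved and never counted as free space.
 */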
229 static uint64_t
230 chunk_get_free_space(struct ftl_nv_cache *nv_cache,
231 		     struct ftl_nv_cache_chunk *chunk)
232 {
233 	assert(chunk->md->write_pointer + nv_cache->tail_md_chunk_blocks <=
234 	       nv_cache->chunk_blocks);
235 	return nv_cache->chunk_blocks - chunk->md->write_pointer -
236 	       nv_cache->tail_md_chunk_blocks;
237 }
238 
239 static bool
240 chunk_is_closed(struct ftl_nv_cache_chunk *chunk)
241 {
242 	return chunk->md->write_pointer == chunk->nv_cache->chunk_blocks;
243 }
244 
245 static void ftl_chunk_close(struct ftl_nv_cache_chunk *chunk);
246 
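/*
 * Reserve space for an IO in the currently open chunk and return its offset within
 * the NV cache, or FTL_LBA_INVALID if no open chunk can accept the write right now.
 * If the current chunk cannot fit the whole IO, its remaining blocks are accounted
 * as skipped, the chunk is closed once fully written, and the next open chunk is tried.
 */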
247 static uint64_t
248 ftl_nv_cache_get_wr_buffer(struct ftl_nv_cache *nv_cache, struct ftl_io *io)
249 {
250 	uint64_t address = FTL_LBA_INVALID;
251 	uint64_t num_blocks = io->num_blocks;
252 	uint64_t free_space;
253 	struct ftl_nv_cache_chunk *chunk;
254 
255 	do {
256 		chunk = nv_cache->chunk_current;
257 		/* Chunk has been closed, so pick a new one */
258 		if (chunk && chunk_is_closed(chunk))  {
259 			chunk = NULL;
260 		}
261 
262 		if (!chunk) {
263 			chunk = TAILQ_FIRST(&nv_cache->chunk_open_list);
264 			if (chunk && chunk->md->state == FTL_CHUNK_STATE_OPEN) {
265 				TAILQ_REMOVE(&nv_cache->chunk_open_list, chunk, entry);
266 				nv_cache->chunk_current = chunk;
267 			} else {
268 				break;
269 			}
270 		}
271 
272 		free_space = chunk_get_free_space(nv_cache, chunk);
273 
274 		if (free_space >= num_blocks) {
275 			/* Enough space in chunk */
276 
277 			/* Calculate address in NV cache */
278 			address = chunk->offset + chunk->md->write_pointer;
279 
280 			/* Set chunk in IO */
281 			io->nv_cache_chunk = chunk;
282 
283 			/* Move write pointer */
284 			chunk->md->write_pointer += num_blocks;
285 			break;
286 		}
287 
288 		/* Not enough space in nv_cache_chunk */
289 		nv_cache->chunk_current = NULL;
290 
291 		if (0 == free_space) {
292 			continue;
293 		}
294 
295 		chunk->md->blocks_skipped = free_space;
296 		chunk->md->blocks_written += free_space;
297 		chunk->md->write_pointer += free_space;
298 
299 		if (chunk->md->blocks_written == chunk_tail_md_offset(nv_cache)) {
300 			ftl_chunk_close(chunk);
301 		}
302 	} while (1);
303 
304 	return address;
305 }
306 
307 void
308 ftl_nv_cache_fill_md(struct ftl_io *io)
309 {
310 	uint64_t i;
311 	union ftl_md_vss *metadata = io->md;
312 	uint64_t lba = ftl_io_get_lba(io, 0);
313 
314 	for (i = 0; i < io->num_blocks; ++i, lba++, metadata++) {
315 		metadata->nv_cache.lba = lba;
316 	}
317 }
318 
319 uint64_t
320 chunk_tail_md_offset(struct ftl_nv_cache *nv_cache)
321 {
322 	return nv_cache->chunk_blocks - nv_cache->tail_md_chunk_blocks;
323 }
324 
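/*
 * Account for blocks written to the chunk and close it once the user data area
 * (everything up to the tail metadata offset) has been completely written.
 */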
325 static void
326 chunk_advance_blocks(struct ftl_nv_cache *nv_cache, struct ftl_nv_cache_chunk *chunk,
327 		     uint64_t advanced_blocks)
328 {
329 	chunk->md->blocks_written += advanced_blocks;
330 
331 	assert(chunk->md->blocks_written <= nv_cache->chunk_blocks);
332 
333 	if (chunk->md->blocks_written == chunk_tail_md_offset(nv_cache)) {
334 		ftl_chunk_close(chunk);
335 	}
336 }
337 
338 static uint64_t
339 chunk_user_blocks_written(struct ftl_nv_cache_chunk *chunk)
340 {
341 	return chunk->md->blocks_written - chunk->md->blocks_skipped -
342 	       chunk->nv_cache->tail_md_chunk_blocks;
343 }
344 
345 static bool
346 is_chunk_compacted(struct ftl_nv_cache_chunk *chunk)
347 {
348 	assert(chunk->md->blocks_written != 0);
349 
350 	if (chunk_user_blocks_written(chunk) == chunk->md->blocks_compacted) {
351 		return true;
352 	}
353 
354 	return false;
355 }
356 
357 static int
358 ftl_chunk_alloc_md_entry(struct ftl_nv_cache_chunk *chunk)
359 {
360 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
361 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
362 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
363 	struct ftl_layout_region *region = &dev->layout.region[FTL_LAYOUT_REGION_TYPE_NVC_MD];
364 
365 	p2l_map->chunk_dma_md = ftl_mempool_get(nv_cache->chunk_md_pool);
366 
367 	if (!p2l_map->chunk_dma_md) {
368 		return -ENOMEM;
369 	}
370 
371 	memset(p2l_map->chunk_dma_md, 0, region->entry_size * FTL_BLOCK_SIZE);
372 	return 0;
373 }
374 
375 static void
376 ftl_chunk_free_md_entry(struct ftl_nv_cache_chunk *chunk)
377 {
378 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
379 
380 	ftl_mempool_put(chunk->nv_cache->chunk_md_pool, p2l_map->chunk_dma_md);
381 	p2l_map->chunk_dma_md = NULL;
382 }
383 
384 static void
385 ftl_chunk_free(struct ftl_nv_cache_chunk *chunk)
386 {
387 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
388 
389 	/* Reset chunk */
390 	memset(chunk->md, 0, sizeof(*chunk->md));
391 
392 	TAILQ_INSERT_TAIL(&nv_cache->needs_free_persist_list, chunk, entry);
393 	nv_cache->chunk_free_persist_count++;
394 }
395 
396 static int
397 ftl_chunk_alloc_chunk_free_entry(struct ftl_nv_cache_chunk *chunk)
398 {
399 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
400 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
401 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
402 	struct ftl_layout_region *region = &dev->layout.region[FTL_LAYOUT_REGION_TYPE_NVC_MD];
403 
404 	p2l_map->chunk_dma_md = ftl_mempool_get(nv_cache->free_chunk_md_pool);
405 
406 	if (!p2l_map->chunk_dma_md) {
407 		return -ENOMEM;
408 	}
409 
410 	memset(p2l_map->chunk_dma_md, 0, region->entry_size * FTL_BLOCK_SIZE);
411 	return 0;
412 }
413 
414 static void
415 ftl_chunk_free_chunk_free_entry(struct ftl_nv_cache_chunk *chunk)
416 {
417 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
418 
419 	ftl_mempool_put(chunk->nv_cache->free_chunk_md_pool, p2l_map->chunk_dma_md);
420 	p2l_map->chunk_dma_md = NULL;
421 }
422 
423 static void
424 chunk_free_cb(int status, void *ctx)
425 {
426 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
427 
428 	if (spdk_likely(!status)) {
429 		struct ftl_nv_cache *nv_cache = chunk->nv_cache;
430 
431 		nv_cache->chunk_free_persist_count--;
432 		TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
433 		nv_cache->chunk_free_count++;
434 		nv_cache->chunk_full_count--;
435 		chunk->md->state = FTL_CHUNK_STATE_FREE;
436 		ftl_chunk_free_chunk_free_entry(chunk);
437 	} else {
438 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
439 	}
440 }
441 
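/*
 * Walk the list of chunks waiting to have their FREE state persisted and, for each one,
 * write an updated metadata entry to the NV cache metadata region. chunk_free_cb()
 * puts the chunk back on the free list once the persist completes successfully.
 */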
442 static void
443 ftl_chunk_persist_free_state(struct ftl_nv_cache *nv_cache)
444 {
445 	int rc;
446 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
447 	struct ftl_p2l_map *p2l_map;
448 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
449 	struct ftl_layout_region *region = &dev->layout.region[FTL_LAYOUT_REGION_TYPE_NVC_MD];
450 	struct ftl_nv_cache_chunk *tchunk, *chunk = NULL;
451 
452 	TAILQ_FOREACH_SAFE(chunk, &nv_cache->needs_free_persist_list, entry, tchunk) {
453 		p2l_map = &chunk->p2l_map;
454 		rc = ftl_chunk_alloc_chunk_free_entry(chunk);
455 		if (rc) {
456 			break;
457 		}
458 
459 		TAILQ_REMOVE(&nv_cache->needs_free_persist_list, chunk, entry);
460 
461 		memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
462 		p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_FREE;
463 		p2l_map->chunk_dma_md->p2l_map_checksum = 0;
464 
465 		ftl_md_persist_entry(md, get_chunk_idx(chunk), p2l_map->chunk_dma_md, NULL,
466 				     chunk_free_cb, chunk, &chunk->md_persist_entry_ctx);
467 	}
468 }
469 
470 static void
471 chunk_compaction_advance(struct ftl_nv_cache_chunk *chunk, uint64_t num_blocks)
472 {
473 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
474 
475 	chunk->md->blocks_compacted += num_blocks;
476 	if (!is_chunk_compacted(chunk)) {
477 		return;
478 	}
479 
480 	/* Remove chunk from compacted list */
481 	TAILQ_REMOVE(&nv_cache->chunk_comp_list, chunk, entry);
482 	nv_cache->chunk_comp_count--;
483 
484 	ftl_chunk_free(chunk);
485 }
486 
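/*
 * Compaction is started once the number of full chunks not already claimed by an
 * active compactor reaches the configured threshold, and never while the cache is halted.
 */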
487 static bool
488 is_compaction_required(struct ftl_nv_cache *nv_cache)
489 {
490 	uint64_t full;
491 
492 	if (spdk_unlikely(nv_cache->halt)) {
493 		return false;
494 	}
495 
496 	full = nv_cache->chunk_full_count - nv_cache->compaction_active_count;
497 	if (full >= nv_cache->chunk_compaction_threshold) {
498 		return true;
499 	}
500 
501 	return false;
502 }
503 
504 static void compaction_process_finish_read(struct ftl_nv_cache_compactor *compactor);
505 static void compaction_process_pin_lba(struct ftl_nv_cache_compactor *comp);
506 
507 static void
508 _compaction_process_pin_lba(void *_comp)
509 {
510 	struct ftl_nv_cache_compactor *comp = _comp;
511 
512 	compaction_process_pin_lba(comp);
513 }
514 
515 static void
516 compaction_process_pin_lba_cb(struct spdk_ftl_dev *dev, int status, struct ftl_l2p_pin_ctx *pin_ctx)
517 {
518 	struct ftl_nv_cache_compactor *comp = pin_ctx->cb_ctx;
519 	struct ftl_rq *rq = comp->rd;
520 
521 	if (status) {
522 		rq->iter.status = status;
523 		pin_ctx->lba = FTL_LBA_INVALID;
524 	}
525 
526 	if (--rq->iter.remaining == 0) {
527 		if (rq->iter.status) {
528 			/* unpin and try again */
529 			ftl_rq_unpin(rq);
530 			spdk_thread_send_msg(spdk_get_thread(), _compaction_process_pin_lba, comp);
531 			return;
532 		}
533 
534 		compaction_process_finish_read(comp);
535 	}
536 }
537 
538 static void
539 compaction_process_pin_lba(struct ftl_nv_cache_compactor *comp)
540 {
541 	union ftl_md_vss *md;
542 	struct spdk_ftl_dev *dev = comp->rd->dev;
543 	uint64_t i;
544 	uint32_t count = comp->rd->iter.count;
545 	struct ftl_rq_entry *entry;
546 	struct ftl_l2p_pin_ctx *pin_ctx;
547 
548 	assert(comp->rd->iter.idx == 0);
549 	comp->rd->iter.remaining = count;
550 	comp->rd->iter.status = 0;
551 
552 	for (i = 0; i < count; i++) {
553 		entry = &comp->rd->entries[i];
554 		pin_ctx = &entry->l2p_pin_ctx;
555 		md = entry->io_md;
556 		if (md->nv_cache.lba == FTL_LBA_INVALID) {
557 			ftl_l2p_pin_skip(dev, compaction_process_pin_lba_cb, comp, pin_ctx);
558 		} else {
559 			ftl_l2p_pin(dev, md->nv_cache.lba, 1, compaction_process_pin_lba_cb, comp, pin_ctx);
560 		}
561 	}
562 }
563 
564 static int compaction_submit_read(struct ftl_nv_cache_compactor *compactor, ftl_addr addr,
565 				  uint64_t num_blocks);
566 
567 static void
568 compaction_retry_read(void *_compactor)
569 {
570 	struct ftl_nv_cache_compactor *compactor = _compactor;
571 	struct ftl_rq *rq = compactor->rd;
572 	struct spdk_bdev *bdev;
573 	int ret;
574 
575 	ret = compaction_submit_read(compactor, rq->io.addr, rq->iter.count);
576 
577 	if (ret == -ENOMEM) {
578 		bdev = spdk_bdev_desc_get_bdev(compactor->nv_cache->bdev_desc);
579 		compactor->bdev_io_wait.bdev = bdev;
580 		compactor->bdev_io_wait.cb_fn = compaction_retry_read;
581 		compactor->bdev_io_wait.cb_arg = compactor;
582 		spdk_bdev_queue_io_wait(bdev, compactor->nv_cache->cache_ioch, &compactor->bdev_io_wait);
583 	} else {
584 		ftl_abort();
585 	}
586 }
587 
588 static void
589 compaction_process_read_cb(struct spdk_bdev_io *bdev_io,
590 			   bool success, void *cb_arg)
591 {
592 	struct ftl_nv_cache_compactor *compactor = cb_arg;
593 
594 	spdk_bdev_free_io(bdev_io);
595 
596 	if (!success) {
597 		/* retry */
598 		spdk_thread_send_msg(spdk_get_thread(), compaction_retry_read, compactor);
599 		return;
600 	}
601 
602 	compaction_process_pin_lba(compactor);
603 }
604 
605 static bool
606 is_chunk_to_read(struct ftl_nv_cache_chunk *chunk)
607 {
608 	assert(chunk->md->blocks_written != 0);
609 
610 	if (chunk_user_blocks_written(chunk) == chunk->md->read_pointer) {
611 		return false;
612 	}
613 
614 	return true;
615 }
616 
617 static struct ftl_nv_cache_chunk *
618 get_chunk_for_compaction(struct ftl_nv_cache *nv_cache)
619 {
620 	struct ftl_nv_cache_chunk *chunk = NULL;
621 
622 	if (!TAILQ_EMPTY(&nv_cache->chunk_comp_list)) {
623 		chunk = TAILQ_FIRST(&nv_cache->chunk_comp_list);
624 		if (is_chunk_to_read(chunk)) {
625 			return chunk;
626 		}
627 	}
628 
629 	if (!TAILQ_EMPTY(&nv_cache->chunk_full_list)) {
630 		chunk = TAILQ_FIRST(&nv_cache->chunk_full_list);
631 		TAILQ_REMOVE(&nv_cache->chunk_full_list, chunk, entry);
632 
633 		assert(chunk->md->write_pointer);
634 	} else {
635 		return NULL;
636 	}
637 
638 	if (spdk_likely(chunk)) {
639 		assert(chunk->md->write_pointer != 0);
640 		TAILQ_INSERT_HEAD(&nv_cache->chunk_comp_list, chunk, entry);
641 		nv_cache->chunk_comp_count++;
642 	}
643 
644 	return chunk;
645 }
646 
647 static uint64_t
648 chunk_blocks_to_read(struct ftl_nv_cache_chunk *chunk)
649 {
650 	uint64_t blocks_written;
651 	uint64_t blocks_to_read;
652 
653 	assert(chunk->md->blocks_written >= chunk->md->blocks_skipped);
654 	blocks_written = chunk_user_blocks_written(chunk);
655 
656 	assert(blocks_written >= chunk->md->read_pointer);
657 	blocks_to_read = blocks_written - chunk->md->read_pointer;
658 
659 	return blocks_to_read;
660 }
661 
662 static void
663 compactor_deactivate(struct ftl_nv_cache_compactor *compactor)
664 {
665 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
666 
667 	nv_cache->compaction_active_count--;
668 	TAILQ_INSERT_TAIL(&nv_cache->compactor_list, compactor, entry);
669 }
670 
671 static int
672 compaction_submit_read(struct ftl_nv_cache_compactor *compactor, ftl_addr addr,
673 		       uint64_t num_blocks)
674 {
675 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
676 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
677 
678 	return ftl_nv_cache_bdev_readv_blocks_with_md(dev, nv_cache->bdev_desc,
679 			nv_cache->cache_ioch,
680 			compactor->rd->io_vec, num_blocks,
681 			compactor->rd->io_md,
682 			ftl_addr_to_nvc_offset(dev, addr), num_blocks,
683 			compaction_process_read_cb,
684 			compactor);
685 }
686 
687 static void
688 compaction_process_pad(struct ftl_nv_cache_compactor *compactor)
689 {
690 	struct ftl_rq *wr = compactor->wr;
691 	const uint64_t num_entries = wr->num_blocks;
692 	struct ftl_rq_entry *iter;
693 
694 	iter = &wr->entries[wr->iter.idx];
695 
696 	while (wr->iter.idx < num_entries) {
697 		iter->addr = FTL_ADDR_INVALID;
698 		iter->owner.priv = NULL;
699 		iter->lba = FTL_LBA_INVALID;
700 		iter++;
701 		wr->iter.idx++;
702 	}
703 }
704 
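/*
 * Main step of the compaction read path. Pick a chunk to compact, advance its read
 * pointer past leading blocks that are not marked in the valid map (they need no
 * relocation), then read the following run of blocks together with their VSS metadata
 * from the cache bdev. The read completion pins the LBAs and eventually hands the data
 * over to compaction_process_finish_read().
 */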
705 static void
706 compaction_process(struct ftl_nv_cache_compactor *compactor)
707 {
708 	struct ftl_nv_cache *nv_cache = compactor->nv_cache;
709 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache,
710 				   struct spdk_ftl_dev, nv_cache);
711 	struct ftl_nv_cache_chunk *chunk;
712 	uint64_t to_read, addr, begin, end, offset;
713 	int rc;
714 
715 	/* Check if all read blocks done */
716 	assert(compactor->rd->iter.idx <= compactor->rd->iter.count);
717 	if (compactor->rd->iter.idx < compactor->rd->iter.count) {
718 		compaction_process_finish_read(compactor);
719 		return;
720 	}
721 
722 	/*
723 	 * Get currently handled chunk
724 	 */
725 	chunk = get_chunk_for_compaction(nv_cache);
726 	if (!chunk) {
727 		/* No chunks to compact, pad this request */
728 		compaction_process_pad(compactor);
729 		ftl_writer_queue_rq(&dev->writer_user, compactor->wr);
730 		return;
731 	}
732 
733 	/*
734 	 * Get range of blocks to read
735 	 */
736 	to_read = chunk_blocks_to_read(chunk);
737 	assert(to_read > 0);
738 
739 	addr = ftl_addr_from_nvc_offset(dev, chunk->offset + chunk->md->read_pointer);
740 	begin = ftl_bitmap_find_first_set(dev->valid_map, addr, addr + to_read);
741 	if (begin != UINT64_MAX) {
742 		offset = spdk_min(begin - addr, to_read);
743 	} else {
744 		offset = to_read;
745 	}
746 
747 	if (offset) {
748 		chunk->md->read_pointer += offset;
749 		chunk_compaction_advance(chunk, offset);
750 		to_read -= offset;
751 		if (!to_read) {
752 			compactor_deactivate(compactor);
753 			return;
754 		}
755 	}
756 
757 	end = ftl_bitmap_find_first_clear(dev->valid_map, begin + 1, begin + to_read);
758 	if (end != UINT64_MAX) {
759 		to_read = end - begin;
760 	}
761 
762 	addr = begin;
763 	to_read = spdk_min(to_read, compactor->rd->num_blocks);
764 
765 	/* Read data and metadata from NV cache */
766 	rc = compaction_submit_read(compactor, addr, to_read);
767 	if (spdk_unlikely(rc)) {
768 		/* An error occurred; deactivate this compactor, it will retry
769 		 * in the next iteration
770 		 */
771 		compactor_deactivate(compactor);
772 		return;
773 	}
774 
775 	/* IO has started, initialize compaction */
776 	compactor->rd->owner.priv = chunk;
777 	compactor->rd->iter.idx = 0;
778 	compactor->rd->iter.count = to_read;
779 	compactor->rd->io.addr = addr;
780 
781 	/* Move read pointer in the chunk */
782 	chunk->md->read_pointer += to_read;
783 }
784 
785 static void
786 compaction_process_start(struct ftl_nv_cache_compactor *compactor)
787 {
788 	compactor->nv_cache->compaction_active_count++;
789 	compaction_process(compactor);
790 }
791 
792 static void
793 compaction_process_ftl_done(struct ftl_rq *rq)
794 {
795 	struct spdk_ftl_dev *dev = rq->dev;
796 	struct ftl_nv_cache_compactor *compactor = rq->owner.priv;
797 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
798 	struct ftl_band *band = rq->io.band;
799 	struct ftl_rq_entry *entry;
800 	ftl_addr addr;
801 	uint64_t i;
802 
803 	if (spdk_unlikely(false == rq->success)) {
804 		/* IO error, retry writing */
805 		ftl_writer_queue_rq(&dev->writer_user, rq);
806 		return;
807 	}
808 
809 	/* Update L2P table */
810 	addr = rq->io.addr;
811 	for (i = 0, entry = rq->entries; i < rq->num_blocks; i++, entry++) {
812 		struct ftl_nv_cache_chunk *chunk = entry->owner.priv;
813 
814 		if (entry->lba == FTL_LBA_INVALID) {
815 			assert(entry->addr == FTL_ADDR_INVALID);
816 			addr = ftl_band_next_addr(band, addr, 1);
817 			continue;
818 		}
819 
820 		ftl_l2p_update_base(dev, entry->lba, addr, entry->addr);
821 		ftl_l2p_unpin(dev, entry->lba, 1);
822 
823 		chunk_compaction_advance(chunk, 1);
824 		addr = ftl_band_next_addr(band, addr, 1);
825 	}
826 
827 	compactor->wr->iter.idx = 0;
828 
829 	if (is_compaction_required(nv_cache)) {
830 		compaction_process(compactor);
831 	} else {
832 		compactor_deactivate(compactor);
833 	}
834 }
835 
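/*
 * Move the blocks that were read from the cache into the compactor's write request.
 * For every block the L2P is consulted: if it still points at the cache address the
 * block was read from, the payload is swapped into the write request; otherwise the
 * block has been overwritten in the meantime and is only accounted as compacted.
 * A fully filled write request is queued to the user writer.
 */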
836 static void
837 compaction_process_finish_read(struct ftl_nv_cache_compactor *compactor)
838 {
839 	struct ftl_rq *wr = compactor->wr;
840 	struct ftl_rq *rd = compactor->rd;
841 	ftl_addr cache_addr = rd->io.addr;
842 	struct ftl_nv_cache_chunk *chunk = rd->owner.priv;
843 	struct spdk_ftl_dev *dev;
844 	struct ftl_rq_entry *iter;
845 	union ftl_md_vss *md;
846 	ftl_addr current_addr;
847 	const uint64_t num_entries = wr->num_blocks;
848 
849 	dev = SPDK_CONTAINEROF(compactor->nv_cache,
850 			       struct spdk_ftl_dev, nv_cache);
851 
852 	assert(wr->iter.idx < num_entries);
853 	assert(rd->iter.idx < rd->iter.count);
854 
855 	cache_addr += rd->iter.idx;
856 	iter = &wr->entries[wr->iter.idx];
857 
858 	while (wr->iter.idx < num_entries && rd->iter.idx < rd->iter.count) {
859 		/* Get metadata */
860 		md = rd->entries[rd->iter.idx].io_md;
861 		if (md->nv_cache.lba == FTL_LBA_INVALID) {
862 			cache_addr++;
863 			rd->iter.idx++;
864 			chunk_compaction_advance(chunk, 1);
865 			continue;
866 		}
867 
868 		current_addr = ftl_l2p_get(dev, md->nv_cache.lba);
869 		if (current_addr == cache_addr) {
870 			/* Swap payload */
871 			ftl_rq_swap_payload(wr, wr->iter.idx, rd, rd->iter.idx);
872 
873 			/*
874 			 * The address is still the same, so we may continue to
875 			 * compact this block back to the base device; count it as
876 			 * a valid entry within this batch
877 			 */
878 			iter->addr = current_addr;
879 			iter->owner.priv = chunk;
880 			iter->lba = md->nv_cache.lba;
881 
882 			/* Advance within batch */
883 			iter++;
884 			wr->iter.idx++;
885 		} else {
886 			/* This address has already been invalidated, so skip this block */
887 			chunk_compaction_advance(chunk, 1);
888 			ftl_l2p_unpin(dev, md->nv_cache.lba, 1);
889 		}
890 
891 		/* Advance within reader */
892 		rd->iter.idx++;
893 		cache_addr++;
894 	}
895 
896 	if (num_entries == wr->iter.idx) {
897 		/*
898 		 * Request contains data to be placed on FTL, compact it
899 		 */
900 		ftl_writer_queue_rq(&dev->writer_user, wr);
901 	} else {
902 		if (is_compaction_required(compactor->nv_cache)) {
903 			compaction_process(compactor);
904 		} else {
905 			compactor_deactivate(compactor);
906 		}
907 	}
908 }
909 
910 static void
911 compactor_free(struct spdk_ftl_dev *dev, struct ftl_nv_cache_compactor *compactor)
912 {
913 	if (!compactor) {
914 		return;
915 	}
916 
917 	ftl_rq_del(compactor->wr);
918 	ftl_rq_del(compactor->rd);
919 	free(compactor);
920 }
921 
922 static struct ftl_nv_cache_compactor *
923 compactor_alloc(struct spdk_ftl_dev *dev)
924 {
925 	struct ftl_nv_cache_compactor *compactor;
926 
927 	compactor = calloc(1, sizeof(*compactor));
928 	if (!compactor) {
929 		goto error;
930 	}
931 
932 	/* Allocate helper request for writing */
933 	compactor->wr = ftl_rq_new(dev, dev->md_size);
934 	if (!compactor->wr) {
935 		goto error;
936 	}
937 
938 	/* Allocate helper request for reading */
939 	compactor->rd = ftl_rq_new(dev, dev->nv_cache.md_size);
940 	if (!compactor->rd) {
941 		goto error;
942 	}
943 
944 	compactor->nv_cache = &dev->nv_cache;
945 	compactor->wr->owner.priv = compactor;
946 	compactor->wr->owner.cb = compaction_process_ftl_done;
947 	compactor->wr->owner.compaction = true;
948 
949 	return compactor;
950 
951 error:
952 	compactor_free(dev, compactor);
953 	return NULL;
954 }
955 
956 static void
957 ftl_nv_cache_submit_cb_done(struct ftl_io *io)
958 {
959 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
960 
961 	chunk_advance_blocks(nv_cache, io->nv_cache_chunk, io->num_blocks);
962 	io->nv_cache_chunk = NULL;
963 
964 	ftl_mempool_put(nv_cache->md_pool, io->md);
965 	ftl_io_complete(io);
966 }
967 
968 static void
969 ftl_nv_cache_l2p_update(struct ftl_io *io)
970 {
971 	struct spdk_ftl_dev *dev = io->dev;
972 	ftl_addr next_addr = io->addr;
973 	size_t i;
974 
975 	for (i = 0; i < io->num_blocks; ++i, ++next_addr) {
976 		ftl_l2p_update_cache(dev, ftl_io_get_lba(io, i), next_addr, io->map[i]);
977 	}
978 
979 	ftl_l2p_unpin(dev, io->lba, io->num_blocks);
980 	ftl_nv_cache_submit_cb_done(io);
981 }
982 
983 static void
984 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
985 {
986 	struct ftl_io *io = cb_arg;
987 
988 	spdk_bdev_free_io(bdev_io);
989 
990 	if (spdk_unlikely(!success)) {
991 		FTL_ERRLOG(io->dev, "Non-volatile cache write failed at %"PRIx64"\n",
992 			   io->addr);
993 		io->status = -EIO;
994 		ftl_nv_cache_submit_cb_done(io);
995 	} else {
996 		ftl_nv_cache_l2p_update(io);
997 	}
998 }
999 
1000 static void
1001 nv_cache_write(void *_io)
1002 {
1003 	struct ftl_io *io = _io;
1004 	struct spdk_ftl_dev *dev = io->dev;
1005 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1006 	int rc;
1007 
1008 	rc = ftl_nv_cache_bdev_writev_blocks_with_md(dev,
1009 			nv_cache->bdev_desc, nv_cache->cache_ioch,
1010 			io->iov, io->iov_cnt, io->md,
1011 			ftl_addr_to_nvc_offset(dev, io->addr), io->num_blocks,
1012 			ftl_nv_cache_submit_cb, io);
1013 	if (spdk_unlikely(rc)) {
1014 		if (rc == -ENOMEM) {
1015 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1016 			io->bdev_io_wait.bdev = bdev;
1017 			io->bdev_io_wait.cb_fn = nv_cache_write;
1018 			io->bdev_io_wait.cb_arg = io;
1019 			spdk_bdev_queue_io_wait(bdev, nv_cache->cache_ioch, &io->bdev_io_wait);
1020 		} else {
1021 			ftl_abort();
1022 		}
1023 	}
1024 }
1025 
1026 static void
1027 ftl_nv_cache_pin_cb(struct spdk_ftl_dev *dev, int status, struct ftl_l2p_pin_ctx *pin_ctx)
1028 {
1029 	struct ftl_io *io = pin_ctx->cb_ctx;
1030 	size_t i;
1031 
1032 	if (spdk_unlikely(status != 0)) {
1033 		/* Retry on the internal L2P fault */
1034 		FTL_ERRLOG(dev, "Cannot pin LBA, NV cache write failed at %"PRIx64"\n",
1035 			   io->addr);
1036 		io->status = -EAGAIN;
1037 		ftl_nv_cache_submit_cb_done(io);
1038 		return;
1039 	}
1040 
1041 	/* Remember previous l2p mapping to resolve conflicts in case of outstanding write-after-write */
1042 	for (i = 0; i < io->num_blocks; ++i) {
1043 		io->map[i] = ftl_l2p_get(dev, ftl_io_get_lba(io, i));
1044 	}
1045 
1046 	assert(io->iov_pos == 0);
1047 
1048 	nv_cache_write(io);
1049 }
1050 
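/*
 * Entry point for user writes going through the NV cache. Grabs a VSS metadata buffer,
 * reserves space in an open chunk, pins the L2P range and then submits the write to the
 * cache bdev from the pin callback. Returns false when resources are currently
 * unavailable, so the caller can retry the IO later.
 */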
1051 bool
1052 ftl_nv_cache_write(struct ftl_io *io)
1053 {
1054 	struct spdk_ftl_dev *dev = io->dev;
1055 	uint64_t cache_offset;
1056 
1057 	io->md = ftl_mempool_get(dev->nv_cache.md_pool);
1058 	if (spdk_unlikely(!io->md)) {
1059 		return false;
1060 	}
1061 
1062 	/* Reserve area on the write buffer cache */
1063 	cache_offset = ftl_nv_cache_get_wr_buffer(&dev->nv_cache, io);
1064 	if (cache_offset == FTL_LBA_INVALID) {
1065 		/* No free space in NV cache, resubmit request */
1066 		ftl_mempool_put(dev->nv_cache.md_pool, io->md);
1067 		return false;
1068 	}
1069 	io->addr = ftl_addr_from_nvc_offset(dev, cache_offset);
1070 	io->nv_cache_chunk = dev->nv_cache.chunk_current;
1071 
1072 	ftl_nv_cache_fill_md(io);
1073 	ftl_l2p_pin(io->dev, io->lba, io->num_blocks,
1074 		    ftl_nv_cache_pin_cb, io,
1075 		    &io->l2p_pin_ctx);
1076 
1077 	return true;
1078 }
1079 
1080 int
1081 ftl_nv_cache_read(struct ftl_io *io, ftl_addr addr, uint32_t num_blocks,
1082 		  spdk_bdev_io_completion_cb cb, void *cb_arg)
1083 {
1084 	int rc;
1085 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1086 
1087 	assert(ftl_addr_in_nvc(io->dev, addr));
1088 
1089 	rc = ftl_nv_cache_bdev_read_blocks_with_md(io->dev, nv_cache->bdev_desc, nv_cache->cache_ioch,
1090 			ftl_io_iovec_addr(io), NULL, ftl_addr_to_nvc_offset(io->dev, addr),
1091 			num_blocks, cb, cb_arg);
1092 
1093 	return rc;
1094 }
1095 
1096 bool
1097 ftl_nv_cache_is_halted(struct ftl_nv_cache *nv_cache)
1098 {
1099 	struct ftl_nv_cache_compactor *compactor;
1100 
1101 	if (nv_cache->compaction_active_count) {
1102 		return false;
1103 	}
1104 
1105 	TAILQ_FOREACH(compactor, &nv_cache->compactor_list, entry) {
1106 		if (compactor->rd->iter.idx != 0 || compactor->wr->iter.idx != 0) {
1107 			return false;
1108 		}
1109 	}
1110 
1111 	if (nv_cache->chunk_open_count > 0) {
1112 		return false;
1113 	}
1114 
1115 	return true;
1116 }
1117 
1118 static void
1119 ftl_nv_cache_compaction_reset(struct ftl_nv_cache_compactor *compactor)
1120 {
1121 	struct ftl_rq *rd = compactor->rd;
1122 	struct ftl_rq *wr = compactor->wr;
1123 	uint64_t lba;
1124 	uint64_t i;
1125 
1126 	for (i = rd->iter.idx; i < rd->iter.count; i++) {
1127 		lba = ((union ftl_md_vss *)rd->entries[i].io_md)->nv_cache.lba;
1128 		if (lba != FTL_LBA_INVALID) {
1129 			ftl_l2p_unpin(rd->dev, lba, 1);
1130 		}
1131 	}
1132 
1133 	rd->iter.idx = 0;
1134 	rd->iter.count = 0;
1135 
1136 	for (i = 0; i < wr->iter.idx; i++) {
1137 		lba = wr->entries[i].lba;
1138 		assert(lba != FTL_LBA_INVALID);
1139 		ftl_l2p_unpin(wr->dev, lba, 1);
1140 	}
1141 
1142 	wr->iter.idx = 0;
1143 }
1144 
1145 void
1146 ftl_chunk_map_set_lba(struct ftl_nv_cache_chunk *chunk,
1147 		      uint64_t offset, uint64_t lba)
1148 {
1149 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1150 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1151 
1152 	ftl_lba_store(dev, p2l_map->chunk_map, offset, lba);
1153 }
1154 
1155 uint64_t
1156 ftl_chunk_map_get_lba(struct ftl_nv_cache_chunk *chunk, uint64_t offset)
1157 {
1158 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1159 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1160 
1161 	return ftl_lba_load(dev, p2l_map->chunk_map, offset);
1162 }
1163 
1164 static void
1165 ftl_chunk_set_addr(struct ftl_nv_cache_chunk *chunk, uint64_t lba, ftl_addr addr)
1166 {
1167 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1168 	uint64_t cache_offset = ftl_addr_to_nvc_offset(dev, addr);
1169 	uint64_t offset;
1170 
1171 	offset = (cache_offset - chunk->offset) % chunk->nv_cache->chunk_blocks;
1172 	ftl_chunk_map_set_lba(chunk, offset, lba);
1173 }
1174 
1175 struct ftl_nv_cache_chunk *
1176 ftl_nv_cache_get_chunk_from_addr(struct spdk_ftl_dev *dev, ftl_addr addr)
1177 {
1178 	struct ftl_nv_cache_chunk *chunk = dev->nv_cache.chunks;
1179 	uint64_t chunk_idx;
1180 	uint64_t cache_offset = ftl_addr_to_nvc_offset(dev, addr);
1181 
1182 	assert(chunk != NULL);
1183 	chunk_idx = (cache_offset - chunk->offset) / chunk->nv_cache->chunk_blocks;
1184 	chunk += chunk_idx;
1185 
1186 	return chunk;
1187 }
1188 
1189 void
1190 ftl_nv_cache_set_addr(struct spdk_ftl_dev *dev, uint64_t lba, ftl_addr addr)
1191 {
1192 	struct ftl_nv_cache_chunk *chunk;
1193 
1194 	chunk = ftl_nv_cache_get_chunk_from_addr(dev, addr);
1195 
1196 	assert(lba != FTL_LBA_INVALID);
1197 
1198 	ftl_chunk_set_addr(chunk, lba, addr);
1199 	ftl_bitmap_set(dev->valid_map, addr);
1200 }
1201 
1202 static void ftl_chunk_open(struct ftl_nv_cache_chunk *chunk);
1203 
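/*
 * Background work driven by the FTL core poller: open new chunks (up to
 * FTL_MAX_OPEN_CHUNKS), start compactors when compaction is required, persist the
 * FREE state of reclaimed chunks and reset idle compactors when the cache is halted.
 */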
1204 void
1205 ftl_nv_cache_process(struct spdk_ftl_dev *dev)
1206 {
1207 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1208 
1209 	assert(dev->nv_cache.bdev_desc);
1210 
1211 	if (nv_cache->chunk_open_count < FTL_MAX_OPEN_CHUNKS && spdk_likely(!nv_cache->halt) &&
1212 	    !TAILQ_EMPTY(&nv_cache->chunk_free_list)) {
1213 		struct ftl_nv_cache_chunk *chunk = TAILQ_FIRST(&nv_cache->chunk_free_list);
1214 		TAILQ_REMOVE(&nv_cache->chunk_free_list, chunk, entry);
1215 		TAILQ_INSERT_TAIL(&nv_cache->chunk_open_list, chunk, entry);
1216 		nv_cache->chunk_free_count--;
1217 		ftl_chunk_open(chunk);
1218 	}
1219 
1220 	if (is_compaction_required(nv_cache) && !TAILQ_EMPTY(&nv_cache->compactor_list)) {
1221 		struct ftl_nv_cache_compactor *comp =
1222 			TAILQ_FIRST(&nv_cache->compactor_list);
1223 
1224 		TAILQ_REMOVE(&nv_cache->compactor_list, comp, entry);
1225 
1226 		compaction_process_start(comp);
1227 	}
1228 
1229 	ftl_chunk_persist_free_state(nv_cache);
1230 
1231 	if (spdk_unlikely(nv_cache->halt)) {
1232 		struct ftl_nv_cache_compactor *compactor;
1233 
1234 		TAILQ_FOREACH(compactor, &nv_cache->compactor_list, entry) {
1235 			ftl_nv_cache_compaction_reset(compactor);
1236 		}
1237 	}
1238 }
1239 
1240 bool
1241 ftl_nv_cache_full(struct ftl_nv_cache *nv_cache)
1242 {
1243 	if (0 == nv_cache->chunk_open_count && NULL == nv_cache->chunk_current) {
1244 		return true;
1245 	} else {
1246 		return false;
1247 	}
1248 }
1249 
1250 static void
1251 chunk_free_p2l_map(struct ftl_nv_cache_chunk *chunk)
1252 {
1253 
1254 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1255 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1256 
1257 	ftl_mempool_put(nv_cache->p2l_pool, p2l_map->chunk_map);
1258 	p2l_map->chunk_map = NULL;
1259 
1260 	ftl_chunk_free_md_entry(chunk);
1261 }
1262 
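/*
 * Verify that the chunk metadata is consistent before shutdown. Chunks caught in the
 * middle of compaction get their read pointer and compacted block count reset, so
 * compaction of those chunks starts over after the next load.
 */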
1263 int
1264 ftl_nv_cache_save_state(struct ftl_nv_cache *nv_cache)
1265 {
1266 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1267 	struct ftl_nv_cache_chunk *chunk;
1268 	int status = 0;
1269 	uint64_t i;
1270 
1271 	assert(nv_cache->chunk_open_count == 0);
1272 
1273 	if (nv_cache->compaction_active_count) {
1274 		FTL_ERRLOG(dev, "Cannot save NV cache state, compaction in progress\n");
1275 		return -EINVAL;
1276 	}
1277 
1278 	chunk = nv_cache->chunks;
1279 	if (!chunk) {
1280 		FTL_ERRLOG(dev, "Cannot save NV cache state, no NV cache metadata\n");
1281 		return -ENOMEM;
1282 	}
1283 
1284 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1285 		nvc_validate_md(nv_cache, chunk->md);
1286 
1287 		if (chunk->md->read_pointer)  {
1288 			/* Only full chunks can be compacted */
1289 			if (chunk->md->blocks_written != nv_cache->chunk_blocks) {
1290 				assert(0);
1291 				status = -EINVAL;
1292 				break;
1293 			}
1294 
1295 			/*
1296 			 * Chunk in the middle of compaction, start over after
1297 			 * load
1298 			 */
1299 			chunk->md->read_pointer = chunk->md->blocks_compacted = 0;
1300 		} else if (chunk->md->blocks_written == nv_cache->chunk_blocks) {
1301 			/* Full chunk */
1302 		} else if (0 == chunk->md->blocks_written) {
1303 			/* Empty chunk */
1304 		} else {
1305 			assert(0);
1306 			status = -EINVAL;
1307 			break;
1308 		}
1309 	}
1310 
1311 	if (status) {
1312 		FTL_ERRLOG(dev, "Cannot save NV cache state, inconsistent NV cache "
1313 			   "metadata\n");
1314 	}
1315 
1316 	return status;
1317 }
1318 
1319 static int
1320 chunk_alloc_p2l_map(struct ftl_nv_cache_chunk *chunk)
1321 {
1322 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1323 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1324 
1325 	assert(p2l_map->ref_cnt == 0);
1326 	assert(p2l_map->chunk_map == NULL);
1327 
1328 	p2l_map->chunk_map = ftl_mempool_get(nv_cache->p2l_pool);
1329 
1330 	if (!p2l_map->chunk_map) {
1331 		return -ENOMEM;
1332 	}
1333 
1334 	if (ftl_chunk_alloc_md_entry(chunk)) {
1335 		ftl_mempool_put(nv_cache->p2l_pool, p2l_map->chunk_map);
1336 		p2l_map->chunk_map = NULL;
1337 		return -ENOMEM;
1338 	}
1339 
1340 	/* Set the P2L to FTL_LBA_INVALID */
1341 	memset(p2l_map->chunk_map, -1, FTL_BLOCK_SIZE * nv_cache->tail_md_chunk_blocks);
1342 
1343 	return 0;
1344 }
1345 
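/*
 * Rebuild the runtime chunk lists from the persisted chunk metadata: full chunks go to
 * the full list, empty chunks to the free list. Any chunk with an unexpected offset or
 * a partially written state makes the load fail with -EINVAL.
 */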
1346 int
1347 ftl_nv_cache_load_state(struct ftl_nv_cache *nv_cache)
1348 {
1349 	struct ftl_nv_cache_chunk *chunk;
1350 	uint64_t chunks_number, offset, i;
1351 	int status = 0;
1352 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1353 
1354 	nv_cache->chunk_current = NULL;
1355 	TAILQ_INIT(&nv_cache->chunk_free_list);
1356 	TAILQ_INIT(&nv_cache->chunk_full_list);
1357 	nv_cache->chunk_full_count = nv_cache->chunk_free_count = 0;
1358 
1359 	assert(nv_cache->chunk_open_count == 0);
1360 	offset = nvc_data_offset(nv_cache);
1361 	chunk = nv_cache->chunks;
1362 	if (!chunk) {
1363 		FTL_ERRLOG(dev, "No NV cache metadata\n");
1364 		return -1;
1365 	}
1366 
1367 	for (i = 0; i < nv_cache->chunk_count; i++, chunk++) {
1368 		chunk->nv_cache = nv_cache;
1369 		nvc_validate_md(nv_cache, chunk->md);
1370 
1371 		if (offset != chunk->offset) {
1372 			status = -EINVAL;
1373 			goto error;
1374 		}
1375 
1376 		if (chunk->md->blocks_written == nv_cache->chunk_blocks) {
1377 			/* Chunk is full, move it to the full list */
1378 			TAILQ_INSERT_TAIL(&nv_cache->chunk_full_list, chunk, entry);
1379 			nv_cache->chunk_full_count++;
1380 		} else if (0 == chunk->md->blocks_written) {
1381 			/* Chunk is empty, move it to the free list */
1382 			TAILQ_INSERT_TAIL(&nv_cache->chunk_free_list, chunk, entry);
1383 			nv_cache->chunk_free_count++;
1384 		} else {
1385 			status = -EINVAL;
1386 			goto error;
1387 		}
1388 
1389 		offset += nv_cache->chunk_blocks;
1390 	}
1391 
1392 	chunks_number = nv_cache->chunk_free_count + nv_cache->chunk_full_count;
1393 	assert(nv_cache->chunk_current == NULL);
1394 
1395 	if (chunks_number != nv_cache->chunk_count) {
1396 		FTL_ERRLOG(dev, "Inconsistent NV cache metadata\n");
1397 		status = -EINVAL;
1398 		goto error;
1399 	}
1400 
1401 	FTL_NOTICELOG(dev, "FTL NV Cache: full chunks = %lu, empty chunks = %lu\n",
1402 		      nv_cache->chunk_full_count, nv_cache->chunk_free_count);
1403 
1404 	if (0 == status) {
1405 		FTL_NOTICELOG(dev, "FTL NV Cache: state loaded successfully\n");
1406 	} else {
1407 		FTL_ERRLOG(dev, "FTL NV Cache: loading state ERROR\n");
1408 	}
1409 
1410 error:
1411 	return status;
1412 }
1413 
1414 typedef void (*ftl_chunk_ops_cb)(struct ftl_nv_cache_chunk *chunk, void *cntx, bool status);
1415 
1416 static void
1417 write_brq_end(struct spdk_bdev_io *bdev_io, bool success, void *arg)
1418 {
1419 	struct ftl_basic_rq *brq = arg;
1420 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
1421 
1422 	brq->success = success;
1423 	if (spdk_likely(success)) {
1424 		chunk_advance_blocks(chunk->nv_cache, chunk, brq->num_blocks);
1425 	}
1426 
1427 	spdk_bdev_free_io(bdev_io);
1428 	brq->owner.cb(brq);
1429 }
1430 
1431 static void
1432 _ftl_chunk_basic_rq_write(void *_brq)
1433 {
1434 	struct ftl_basic_rq *brq = _brq;
1435 	struct ftl_nv_cache *nv_cache = brq->io.chunk->nv_cache;
1436 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1437 	int rc;
1438 
1439 	rc = ftl_nv_cache_bdev_write_blocks_with_md(dev, nv_cache->bdev_desc, nv_cache->cache_ioch,
1440 			brq->io_payload, NULL, brq->io.addr,
1441 			brq->num_blocks, write_brq_end, brq);
1442 	if (spdk_unlikely(rc)) {
1443 		if (rc == -ENOMEM) {
1444 			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1445 			brq->io.bdev_io_wait.bdev = bdev;
1446 			brq->io.bdev_io_wait.cb_fn = _ftl_chunk_basic_rq_write;
1447 			brq->io.bdev_io_wait.cb_arg = brq;
1448 			spdk_bdev_queue_io_wait(bdev, nv_cache->cache_ioch, &brq->io.bdev_io_wait);
1449 		} else {
1450 			ftl_abort();
1451 		}
1452 	}
1453 }
1454 
1455 static void
1456 ftl_chunk_basic_rq_write(struct ftl_nv_cache_chunk *chunk, struct ftl_basic_rq *brq)
1457 {
1458 	struct ftl_nv_cache *nv_cache = chunk->nv_cache;
1459 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1460 
1461 	brq->io.chunk = chunk;
1462 	brq->success = false;
1463 
1464 	_ftl_chunk_basic_rq_write(brq);
1465 
1466 	chunk->md->write_pointer += brq->num_blocks;
1467 	dev->io_activity_total += brq->num_blocks;
1468 }
1469 
1470 static void
1471 chunk_open_cb(int status, void *ctx)
1472 {
1473 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
1474 
1475 	if (spdk_unlikely(status)) {
1476 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
1477 		return;
1478 	}
1479 
1480 	chunk->md->state = FTL_CHUNK_STATE_OPEN;
1481 }
1482 
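/*
 * Open a chunk for writing: allocate its P2L map and DMA metadata buffer, bump the
 * open chunk counter and persist the OPEN state; chunk_open_cb() marks the in-memory
 * state as open once the metadata write completes.
 */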
1483 static void
1484 ftl_chunk_open(struct ftl_nv_cache_chunk *chunk)
1485 {
1486 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1487 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1488 	struct ftl_layout_region *region = &dev->layout.region[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1489 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1490 
1491 	if (chunk_alloc_p2l_map(chunk)) {
1492 		assert(0);
1493 		/*
1494 		 * The number of chunks being opened is controlled by the FTL core and must stay
1495 		 * consistent with the size of the chunk P2L map pool, so this allocation should never fail
1496 		 */
1497 		ftl_abort();
1498 		return;
1499 	}
1500 
1501 	chunk->nv_cache->chunk_open_count++;
1502 
1503 	assert(chunk->md->write_pointer == 0);
1504 	assert(chunk->md->blocks_written == 0);
1505 
1506 	memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
1507 	p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_OPEN;
1508 	p2l_map->chunk_dma_md->p2l_map_checksum = 0;
1509 
1510 	ftl_md_persist_entry(md, get_chunk_idx(chunk), p2l_map->chunk_dma_md,
1511 			     NULL, chunk_open_cb, chunk,
1512 			     &chunk->md_persist_entry_ctx);
1513 }
1514 
1515 static void
1516 chunk_close_cb(int status, void *ctx)
1517 {
1518 	struct ftl_nv_cache_chunk *chunk = (struct ftl_nv_cache_chunk *)ctx;
1519 
1520 	assert(chunk->md->write_pointer == chunk->nv_cache->chunk_blocks);
1521 
1522 	if (spdk_likely(!status)) {
1523 		chunk->md->p2l_map_checksum = chunk->p2l_map.chunk_dma_md->p2l_map_checksum;
1524 		chunk_free_p2l_map(chunk);
1525 
1526 		assert(chunk->nv_cache->chunk_open_count > 0);
1527 		chunk->nv_cache->chunk_open_count--;
1528 
1529 		/* Chunk is full, move it to the full list */
1530 		TAILQ_INSERT_TAIL(&chunk->nv_cache->chunk_full_list, chunk, entry);
1531 		chunk->nv_cache->chunk_full_count++;
1532 
1533 		chunk->md->state = FTL_CHUNK_STATE_CLOSED;
1534 	} else {
1535 		ftl_md_persist_entry_retry(&chunk->md_persist_entry_ctx);
1536 	}
1537 }
1538 
1539 static void
1540 chunk_map_write_cb(struct ftl_basic_rq *brq)
1541 {
1542 	struct ftl_nv_cache_chunk *chunk = brq->io.chunk;
1543 	struct ftl_p2l_map *p2l_map = &chunk->p2l_map;
1544 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1545 	struct ftl_layout_region *region = &dev->layout.region[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1546 	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_NVC_MD];
1547 	uint32_t chunk_map_crc;
1548 
1549 	if (spdk_likely(brq->success)) {
1550 		chunk_map_crc = spdk_crc32c_update(p2l_map->chunk_map,
1551 						   chunk->nv_cache->tail_md_chunk_blocks * FTL_BLOCK_SIZE, 0);
1552 		memcpy(p2l_map->chunk_dma_md, chunk->md, region->entry_size * FTL_BLOCK_SIZE);
1553 		p2l_map->chunk_dma_md->state = FTL_CHUNK_STATE_CLOSED;
1554 		p2l_map->chunk_dma_md->p2l_map_checksum = chunk_map_crc;
1555 		ftl_md_persist_entry(md, get_chunk_idx(chunk), chunk->p2l_map.chunk_dma_md,
1556 				     NULL, chunk_close_cb, chunk,
1557 				     &chunk->md_persist_entry_ctx);
1558 	} else {
1559 		/* retry */
1560 		chunk->md->write_pointer -= brq->num_blocks;
1561 		ftl_chunk_basic_rq_write(chunk, brq);
1562 	}
1563 }
1564 
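/*
 * Close a chunk by writing its P2L map as tail metadata at the end of the chunk.
 * The completion chain (chunk_map_write_cb -> chunk_close_cb) then computes the map
 * checksum, persists the CLOSED state and moves the chunk to the full list.
 */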
1565 static void
1566 ftl_chunk_close(struct ftl_nv_cache_chunk *chunk)
1567 {
1568 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(chunk->nv_cache, struct spdk_ftl_dev, nv_cache);
1569 	struct ftl_basic_rq *brq = &chunk->metadata_rq;
1570 	void *metadata = chunk->p2l_map.chunk_map;
1571 
1572 	ftl_basic_rq_init(dev, brq, metadata, chunk->nv_cache->tail_md_chunk_blocks);
1573 	ftl_basic_rq_set_owner(brq, chunk_map_write_cb, chunk);
1574 
1575 	assert(chunk->md->write_pointer == chunk_tail_md_offset(chunk->nv_cache));
1576 	brq->io.addr = chunk->offset + chunk->md->write_pointer;
1577 
1578 	ftl_chunk_basic_rq_write(chunk, brq);
1579 }
1580 
1581 int
1582 ftl_nv_cache_chunks_busy(struct ftl_nv_cache *nv_cache)
1583 {
1584 	/* chunk_current transitions to the closed state while shutting down; any other chunks should already
1585 	 * have been moved to the free chunk list. We also need to wait for outstanding free-state md persists */
1586 	return nv_cache->chunk_open_count == 0 && nv_cache->chunk_free_persist_count == 0;
1587 }
1588 
1589 void
1590 ftl_nv_cache_halt(struct ftl_nv_cache *nv_cache)
1591 {
1592 	struct ftl_nv_cache_chunk *chunk;
1593 	uint64_t free_space;
1594 
1595 	nv_cache->halt = true;
1596 
1597 	/* Set chunks on the open list back to the free state, since no user data has been written to them */
1598 	while (!TAILQ_EMPTY(&nv_cache->chunk_open_list)) {
1599 		chunk = TAILQ_FIRST(&nv_cache->chunk_open_list);
1600 
1601 		/* Chunks are moved between lists on metadata update submission, but state is changed
1602 		 * on completion. Breaking early in such a case to make sure all the necessary resources
1603 		 * will be freed (during next pass(es) of ftl_nv_cache_halt).
1604 		 */
1605 		if (chunk->md->state != FTL_CHUNK_STATE_OPEN) {
1606 			break;
1607 		}
1608 
1609 		TAILQ_REMOVE(&nv_cache->chunk_open_list, chunk, entry);
1610 		chunk_free_p2l_map(chunk);
1611 		memset(chunk->md, 0, sizeof(*chunk->md));
1612 		assert(nv_cache->chunk_open_count > 0);
1613 		nv_cache->chunk_open_count--;
1614 	}
1615 
1616 	/* Close the current chunk by skipping all unwritten blocks */
1617 	chunk = nv_cache->chunk_current;
1618 	if (chunk != NULL) {
1619 		nv_cache->chunk_current = NULL;
1620 		if (chunk_is_closed(chunk)) {
1621 			return;
1622 		}
1623 
1624 		free_space = chunk_get_free_space(nv_cache, chunk);
1625 		chunk->md->blocks_skipped = free_space;
1626 		chunk->md->blocks_written += free_space;
1627 		chunk->md->write_pointer += free_space;
1628 		ftl_chunk_close(chunk);
1629 	}
1630 }
1631