xref: /spdk/lib/ftl/ftl_core.c (revision d0d19eb82e3ba677162ae5c1930d9ddcf728bcbf)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/io_channel.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk/string.h"
40 #include "spdk_internal/log.h"
41 #include "spdk/ftl.h"
42 
43 #include "ftl_core.h"
44 #include "ftl_band.h"
45 #include "ftl_io.h"
46 #include "ftl_anm.h"
47 #include "ftl_rwb.h"
48 #include "ftl_debug.h"
49 #include "ftl_reloc.h"
50 
51 /* Max number of iovecs */
52 #define FTL_MAX_IOV 1024
53 
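/*
 * Write pointer: tracks the current write position (PPA) within a band that is
 * being filled. One write pointer exists per open band; they are kept on the
 * device's wptr_list.
 */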
54 struct ftl_wptr {
55 	/* Owner device */
56 	struct spdk_ftl_dev		*dev;
57 
58 	/* Current PPA */
59 	struct ftl_ppa			ppa;
60 
61 	/* Band currently being written to */
62 	struct ftl_band			*band;
63 
64 	/* Current logical block's offset */
65 	uint64_t			offset;
66 
67 	/* Current erase block */
68 	struct ftl_chunk		*chunk;
69 
70 	/* IO that is currently processed */
71 	struct ftl_io			*current_io;
72 
73 	/* List link */
74 	LIST_ENTRY(ftl_wptr)		list_entry;
75 };
76 
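/*
 * Flush request: remembers which RWB batches were outstanding when the flush
 * was issued. The user callback fires once all of those batches have been
 * written out.
 */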
77 struct ftl_flush {
78 	/* Owner device */
79 	struct spdk_ftl_dev		*dev;
80 
81 	/* Number of batches to wait for */
82 	size_t				num_req;
83 
84 	/* Callback */
85 	struct ftl_cb			cb;
86 
87 	/* Batch bitmap */
88 	struct spdk_bit_array		*bmap;
89 
90 	/* List link */
91 	LIST_ENTRY(ftl_flush)		list_entry;
92 };
93 
94 typedef int (*ftl_next_ppa_fn)(struct ftl_io *, struct ftl_ppa *);
95 static void _ftl_read(void *);
96 static void _ftl_write(void *);
97 
98 static int
99 ftl_rwb_flags_from_io(const struct ftl_io *io)
100 {
101 	int valid_flags = FTL_IO_INTERNAL | FTL_IO_WEAK | FTL_IO_PAD;
102 	return io->flags & valid_flags;
103 }
104 
105 static int
106 ftl_rwb_entry_weak(const struct ftl_rwb_entry *entry)
107 {
108 	return entry->flags & FTL_IO_WEAK;
109 }
110 
111 static void
112 ftl_wptr_free(struct ftl_wptr *wptr)
113 {
114 	if (!wptr) {
115 		return;
116 	}
117 
118 	free(wptr);
119 }
120 
121 static void
122 ftl_remove_wptr(struct ftl_wptr *wptr)
123 {
124 	LIST_REMOVE(wptr, list_entry);
125 	ftl_wptr_free(wptr);
126 }
127 
128 static void
129 ftl_io_cmpl_cb(void *arg, const struct spdk_nvme_cpl *status)
130 {
131 	struct ftl_io *io = arg;
132 
133 	if (spdk_nvme_cpl_is_error(status)) {
134 		ftl_io_process_error(io, status);
135 	}
136 
137 	ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_DISK);
138 
139 	ftl_io_dec_req(io);
140 
141 	if (ftl_io_done(io)) {
142 		ftl_io_complete(io);
143 	}
144 }
145 
146 static void
147 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
148 {
149 	struct ftl_wptr *wptr = NULL;
150 
151 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
152 		if (wptr->band == band) {
153 			break;
154 		}
155 	}
156 
157 	/* If the band already has the high_prio flag set, other writes must */
158 	/* have failed earlier, so it's already taken care of. */
159 	if (band->high_prio) {
160 		assert(wptr == NULL);
161 		return;
162 	}
163 
164 	ftl_band_write_failed(band);
165 	ftl_remove_wptr(wptr);
166 }
167 
168 static struct ftl_wptr *
169 ftl_wptr_from_band(struct ftl_band *band)
170 {
171 	struct spdk_ftl_dev *dev = band->dev;
172 	struct ftl_wptr *wptr = NULL;
173 
174 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
175 		if (wptr->band == band) {
176 			return wptr;
177 		}
178 	}
179 
180 	return NULL;
181 }
182 
183 static void
184 ftl_md_write_fail(struct ftl_io *io, int status)
185 {
186 	struct ftl_band *band = io->band;
187 	struct ftl_wptr *wptr;
188 	char buf[128];
189 
190 	wptr = ftl_wptr_from_band(band);
191 
192 	SPDK_ERRLOG("Metadata write failed @ppa: %s, status: %d\n",
193 		    ftl_ppa2str(wptr->ppa, buf, sizeof(buf)), status);
194 
195 	ftl_halt_writes(io->dev, band);
196 }
197 
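/*
 * Completion callback for band head/tail metadata writes. A successful tail
 * metadata write moves the band to the CLOSED state; at that point the band's
 * user blocks are credited back to the non-volatile cache (if one is attached)
 * and the band's write pointer is released.
 */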
198 static void
199 ftl_md_write_cb(void *arg, int status)
200 {
201 	struct ftl_io *io = arg;
202 	struct spdk_ftl_dev *dev = io->dev;
203 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
204 	struct ftl_wptr *wptr;
205 	struct spdk_bdev *bdev;
206 
207 	wptr = ftl_wptr_from_band(io->band);
208 
209 	if (status) {
210 		ftl_md_write_fail(io, status);
211 		return;
212 	}
213 
214 	ftl_band_set_next_state(io->band);
215 	if (io->band->state == FTL_BAND_STATE_CLOSED) {
216 		if (nv_cache->bdev_desc) {
217 			bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
218 
219 			pthread_spin_lock(&nv_cache->lock);
220 			nv_cache->num_available += ftl_band_user_lbks(io->band);
221 
222 			if (spdk_unlikely(nv_cache->num_available > spdk_bdev_get_num_blocks(bdev))) {
223 				nv_cache->num_available = spdk_bdev_get_num_blocks(bdev);
224 			}
225 			pthread_spin_unlock(&nv_cache->lock);
226 		}
227 
228 		ftl_remove_wptr(wptr);
229 	}
230 }
231 
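/*
 * Compute the next physical address for a PPA-mode read and return the number
 * of contiguous blocks that can be read from it. Metadata reads are clamped so
 * that they never cross an xfer_size boundary, mirroring the way metadata is
 * laid out when it is written.
 */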
232 static int
233 ftl_ppa_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa)
234 {
235 	struct spdk_ftl_dev *dev = io->dev;
236 	size_t lbk_cnt, max_lbks;
237 
238 	assert(ftl_io_mode_ppa(io));
239 	assert(io->iov_pos < io->iov_cnt);
240 
241 	if (io->pos == 0) {
242 		*ppa = io->ppa;
243 	} else {
244 		*ppa = ftl_band_next_xfer_ppa(io->band, io->ppa, io->pos);
245 	}
246 
247 	assert(!ftl_ppa_invalid(*ppa));
248 
249 	/* Metadata has to be read in the way it's written (jumping across */
250 	/* the chunks in xfer_size increments) */
251 	if (io->flags & FTL_IO_MD) {
252 		max_lbks = dev->xfer_size - (ppa->lbk % dev->xfer_size);
253 		lbk_cnt = spdk_min(ftl_io_iovec_len_left(io), max_lbks);
254 		assert(ppa->lbk / dev->xfer_size == (ppa->lbk + lbk_cnt - 1) / dev->xfer_size);
255 	} else {
256 		lbk_cnt = ftl_io_iovec_len_left(io);
257 	}
258 
259 	return lbk_cnt;
260 }
261 
262 static int
263 ftl_wptr_close_band(struct ftl_wptr *wptr)
264 {
265 	struct ftl_band *band = wptr->band;
266 
267 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
268 	band->tail_md_ppa = wptr->ppa;
269 
270 	return ftl_band_write_tail_md(band, band->md.dma_buf, ftl_md_write_cb);
271 }
272 
273 static int
274 ftl_wptr_open_band(struct ftl_wptr *wptr)
275 {
276 	struct ftl_band *band = wptr->band;
277 
278 	assert(ftl_band_chunk_is_first(band, wptr->chunk));
279 	assert(band->md.num_vld == 0);
280 
281 	ftl_band_clear_md(band);
282 
283 	assert(band->state == FTL_BAND_STATE_PREP);
284 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
285 
286 	return ftl_band_write_head_md(band, band->md.dma_buf, ftl_md_write_cb);
287 }
288 
289 static int
290 ftl_submit_erase(struct ftl_io *io)
291 {
292 	struct spdk_ftl_dev *dev = io->dev;
293 	struct ftl_band *band = io->band;
294 	struct ftl_ppa ppa = io->ppa;
295 	struct ftl_chunk *chunk;
296 	uint64_t ppa_packed;
297 	int rc = 0;
298 	size_t i;
299 
300 	for (i = 0; i < io->lbk_cnt; ++i) {
301 		if (i != 0) {
302 			chunk = ftl_band_next_chunk(band, ftl_band_chunk_from_ppa(band, ppa));
303 			assert(chunk->state == FTL_CHUNK_STATE_CLOSED ||
304 			       chunk->state == FTL_CHUNK_STATE_VACANT);
305 			ppa = chunk->start_ppa;
306 		}
307 
308 		assert(ppa.lbk == 0);
309 		ppa_packed = ftl_ppa_addr_pack(dev, ppa);
310 
311 		ftl_trace_submission(dev, io, ppa, 1);
312 		rc = spdk_nvme_ocssd_ns_cmd_vector_reset(dev->ns, ftl_get_write_qpair(dev),
313 				&ppa_packed, 1, NULL, ftl_io_cmpl_cb, io);
314 		if (rc) {
315 			ftl_io_fail(io, rc);
316 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
317 			break;
318 		}
319 
320 		ftl_io_inc_req(io);
321 		ftl_io_advance(io, 1);
322 	}
323 
324 	if (ftl_io_done(io)) {
325 		ftl_io_complete(io);
326 	}
327 
328 	return rc;
329 }
330 
331 static void
332 _ftl_io_erase(void *ctx)
333 {
334 	ftl_io_erase((struct ftl_io *)ctx);
335 }
336 
337 static bool
338 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
339 {
340 	return dev->core_thread.thread == spdk_get_thread();
341 }
342 
343 static bool
344 ftl_check_read_thread(const struct spdk_ftl_dev *dev)
345 {
346 	return dev->read_thread.thread == spdk_get_thread();
347 }
348 
349 int
350 ftl_io_erase(struct ftl_io *io)
351 {
352 	struct spdk_ftl_dev *dev = io->dev;
353 
354 	if (ftl_check_core_thread(dev)) {
355 		return ftl_submit_erase(io);
356 	}
357 
358 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_erase, io);
359 	return 0;
360 }
361 
362 static struct ftl_band *
363 ftl_next_write_band(struct spdk_ftl_dev *dev)
364 {
365 	struct ftl_band *band;
366 
367 	band = LIST_FIRST(&dev->free_bands);
368 	if (!band) {
369 		return NULL;
370 	}
371 	assert(band->state == FTL_BAND_STATE_FREE);
372 
373 	if (ftl_band_erase(band)) {
374 		/* TODO: handle erase failure */
375 		return NULL;
376 	}
377 
378 	return band;
379 }
380 
381 static struct ftl_band *
382 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
383 {
384 	struct ftl_band *band;
385 
386 	if (!dev->next_band) {
387 		band = ftl_next_write_band(dev);
388 	} else {
389 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
390 		band = dev->next_band;
391 		dev->next_band = NULL;
392 	}
393 
394 	return band;
395 }
396 
397 static struct ftl_wptr *
398 ftl_wptr_init(struct ftl_band *band)
399 {
400 	struct spdk_ftl_dev *dev = band->dev;
401 	struct ftl_wptr *wptr;
402 
403 	wptr = calloc(1, sizeof(*wptr));
404 	if (!wptr) {
405 		return NULL;
406 	}
407 
408 	wptr->dev = dev;
409 	wptr->band = band;
410 	wptr->chunk = CIRCLEQ_FIRST(&band->chunks);
411 	wptr->ppa = wptr->chunk->start_ppa;
412 
413 	return wptr;
414 }
415 
416 static int
417 ftl_add_wptr(struct spdk_ftl_dev *dev)
418 {
419 	struct ftl_band *band;
420 	struct ftl_wptr *wptr;
421 
422 	band = ftl_next_wptr_band(dev);
423 	if (!band) {
424 		return -1;
425 	}
426 
427 	wptr = ftl_wptr_init(band);
428 	if (!wptr) {
429 		return -1;
430 	}
431 
432 	if (ftl_band_write_prep(band)) {
433 		ftl_wptr_free(wptr);
434 		return -1;
435 	}
436 
437 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
438 
439 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: band %u\n", band->id);
440 	ftl_trace_write_band(dev, band);
441 	return 0;
442 }
443 
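/*
 * Advance the write pointer by xfer_size blocks: mark the current chunk busy,
 * move the PPA forward and switch to the next operational chunk. Once the band
 * is filled past the configured threshold, the next band is prepared (erased)
 * ahead of time.
 */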
444 static void
445 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
446 {
447 	struct ftl_band *band = wptr->band;
448 	struct spdk_ftl_dev *dev = wptr->dev;
449 	struct spdk_ftl_conf *conf = &dev->conf;
450 	size_t next_thld;
451 
452 	wptr->offset += xfer_size;
453 	next_thld = (ftl_band_num_usable_lbks(band) * conf->band_thld) / 100;
454 
455 	if (ftl_band_full(band, wptr->offset)) {
456 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
457 	}
458 
459 	wptr->chunk->busy = true;
460 	wptr->ppa = ftl_band_next_xfer_ppa(band, wptr->ppa, xfer_size);
461 	wptr->chunk = ftl_band_next_operational_chunk(band, wptr->chunk);
462 
463 	assert(!ftl_ppa_invalid(wptr->ppa));
464 
465 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: grp:%d, pu:%d chunk:%d, lbk:%u\n",
466 		      wptr->ppa.grp, wptr->ppa.pu, wptr->ppa.chk, wptr->ppa.lbk);
467 
468 	if (wptr->offset >= next_thld && !dev->next_band) {
469 		dev->next_band = ftl_next_write_band(dev);
470 	}
471 }
472 
473 static int
474 ftl_wptr_ready(struct ftl_wptr *wptr)
475 {
476 	struct ftl_band *band = wptr->band;
477 
478 	/* TODO: add handling of empty bands */
479 
480 	if (spdk_unlikely(!ftl_chunk_is_writable(wptr->chunk))) {
481 		/* Erasing a band may fail after it has been assigned to the wptr. */
482 		if (spdk_unlikely(wptr->chunk->state == FTL_CHUNK_STATE_BAD)) {
483 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
484 		}
485 		return 0;
486 	}
487 
488 	/* If we're in the process of writing metadata, wait till it is */
489 	/* completed. */
490 	/* TODO: we should probably change bands once we're writing tail md */
491 	if (ftl_band_state_changing(band)) {
492 		return 0;
493 	}
494 
495 	if (band->state == FTL_BAND_STATE_FULL) {
496 		if (ftl_wptr_close_band(wptr)) {
497 			/* TODO: need recovery here */
498 			assert(false);
499 		}
500 		return 0;
501 	}
502 
503 	if (band->state != FTL_BAND_STATE_OPEN) {
504 		if (ftl_wptr_open_band(wptr)) {
505 			/* TODO: need recovery here */
506 			assert(false);
507 		}
508 		return 0;
509 	}
510 
511 	return 1;
512 }
513 
514 static const struct spdk_ftl_limit *
515 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
516 {
517 	assert(type < SPDK_FTL_LIMIT_MAX);
518 	return &dev->conf.defrag.limits[type];
519 }
520 
521 static bool
522 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
523 {
524 	struct ftl_ppa ppa;
525 
526 	/* If the LBA is invalid don't bother checking the md and l2p */
527 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
528 		return false;
529 	}
530 
531 	ppa = ftl_l2p_get(dev, entry->lba);
532 	if (!(ftl_ppa_cached(ppa) && ppa.offset == entry->pos)) {
533 		return false;
534 	}
535 
536 	return true;
537 }
538 
539 static void
540 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
541 {
542 	pthread_spin_lock(&entry->lock);
543 
544 	if (!ftl_rwb_entry_valid(entry)) {
545 		goto unlock;
546 	}
547 
548 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
549 	/* on-disk PPA and clear the cache status bit. Otherwise, skip the l2p update */
550 	/* and just clear the cache status. */
551 	if (!ftl_cache_lba_valid(dev, entry)) {
552 		goto clear;
553 	}
554 
555 	ftl_l2p_set(dev, entry->lba, entry->ppa);
556 clear:
557 	ftl_rwb_entry_invalidate(entry);
558 unlock:
559 	pthread_spin_unlock(&entry->lock);
560 }
561 
562 static struct ftl_rwb_entry *
563 ftl_acquire_entry(struct spdk_ftl_dev *dev, int flags)
564 {
565 	struct ftl_rwb_entry *entry;
566 
567 	entry = ftl_rwb_acquire(dev->rwb, ftl_rwb_type_from_flags(flags));
568 	if (!entry) {
569 		return NULL;
570 	}
571 
572 	ftl_evict_cache_entry(dev, entry);
573 
574 	entry->flags = flags;
575 	return entry;
576 }
577 
578 static void
579 ftl_rwb_pad(struct spdk_ftl_dev *dev, size_t size)
580 {
581 	struct ftl_rwb_entry *entry;
582 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
583 
584 	for (size_t i = 0; i < size; ++i) {
585 		entry = ftl_acquire_entry(dev, flags);
586 		if (!entry) {
587 			break;
588 		}
589 
590 		entry->lba = FTL_LBA_INVALID;
591 		entry->ppa = ftl_to_ppa(FTL_PPA_INVALID);
592 		memset(entry->data, 0, FTL_BLOCK_SIZE);
593 		ftl_rwb_push(entry);
594 	}
595 }
596 
597 static void
598 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
599 {
600 	while (!LIST_EMPTY(&dev->free_bands)) {
601 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
602 	}
603 
604 	dev->next_band = NULL;
605 }
606 
607 static void
608 ftl_process_shutdown(struct spdk_ftl_dev *dev)
609 {
610 	size_t size = ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_INTERNAL) +
611 		      ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_USER);
612 
613 	if (size >= dev->xfer_size) {
614 		return;
615 	}
616 
617 	/* If we reach this point, we need to remove the free bands */
618 	/* and pad the current wptr band to the end */
619 	ftl_remove_free_bands(dev);
620 
621 	/* Pad write buffer until band is full */
622 	ftl_rwb_pad(dev, dev->xfer_size - size);
623 }
624 
625 static int
626 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
627 {
628 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
629 	       LIST_EMPTY(&dev->wptr_list);
630 }
631 
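/*
 * Apply write throttling based on the number of free bands. The defrag limits
 * are scanned from the most to the least critical one; the first limit whose
 * threshold is crossed caps the user-writable portion of the RWB to the
 * configured percentage of its entries. If no limit applies, the whole RWB is
 * made available to user writes again. For example (illustrative values only):
 * with thld == 2 and limit == 20, once two or fewer free bands remain, user
 * writes may occupy at most 20% of the RWB entries.
 */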
632 void
633 ftl_apply_limits(struct spdk_ftl_dev *dev)
634 {
635 	const struct spdk_ftl_limit *limit;
636 	struct ftl_stats *stats = &dev->stats;
637 	size_t rwb_limit[FTL_RWB_TYPE_MAX];
638 	int i;
639 
640 	ftl_rwb_get_limits(dev->rwb, rwb_limit);
641 
642 	/* Clear existing limit */
643 	dev->limit = SPDK_FTL_LIMIT_MAX;
644 
645 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
646 		limit = ftl_get_limit(dev, i);
647 
648 		if (dev->num_free <= limit->thld) {
649 			rwb_limit[FTL_RWB_TYPE_USER] =
650 				(limit->limit * ftl_rwb_entry_cnt(dev->rwb)) / 100;
651 			stats->limits[i]++;
652 			dev->limit = i;
653 			goto apply;
654 		}
655 	}
656 
657 	/* Clear the limits, since we don't need to apply them anymore */
658 	rwb_limit[FTL_RWB_TYPE_USER] = ftl_rwb_entry_cnt(dev->rwb);
659 apply:
660 	ftl_trace_limits(dev, rwb_limit, dev->num_free);
661 	ftl_rwb_set_limits(dev->rwb, rwb_limit);
662 }
663 
664 static int
665 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
666 {
667 	struct ftl_band *band = ftl_band_from_ppa(dev, ppa);
668 	struct ftl_md *md = &band->md;
669 	uint64_t offset;
670 
671 	offset = ftl_band_lbkoff_from_ppa(band, ppa);
672 
673 	/* The bit might already be cleared if two writes are scheduled to the */
674 	/* same LBA at the same time */
675 	if (spdk_bit_array_get(md->vld_map, offset)) {
676 		assert(md->num_vld > 0);
677 		spdk_bit_array_clear(md->vld_map, offset);
678 		md->num_vld--;
679 		return 1;
680 	}
681 
682 	return 0;
683 }
684 
685 int
686 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
687 {
688 	struct ftl_band *band;
689 	int rc;
690 
691 	assert(!ftl_ppa_cached(ppa));
692 	band = ftl_band_from_ppa(dev, ppa);
693 
694 	pthread_spin_lock(&band->md.lock);
695 	rc = ftl_invalidate_addr_unlocked(dev, ppa);
696 	pthread_spin_unlock(&band->md.lock);
697 
698 	return rc;
699 }
700 
701 static int
702 ftl_read_retry(int rc)
703 {
704 	return rc == -EAGAIN;
705 }
706 
707 static int
708 ftl_read_canceled(int rc)
709 {
710 	return rc == -EFAULT || rc == 0;
711 }
712 
713 static void
714 ftl_add_to_retry_queue(struct ftl_io *io)
715 {
716 	if (!(io->flags & FTL_IO_RETRY)) {
717 		io->flags |= FTL_IO_RETRY;
718 		TAILQ_INSERT_TAIL(&io->dev->retry_queue, io, retry_entry);
719 	}
720 }
721 
722 static int
723 ftl_submit_read(struct ftl_io *io, ftl_next_ppa_fn next_ppa)
724 {
725 	struct spdk_ftl_dev *dev = io->dev;
726 	struct ftl_ppa ppa;
727 	int rc = 0, lbk_cnt;
728 
729 	while (io->pos < io->lbk_cnt) {
730 		/* We might hit the cache here; if so, skip the read */
731 		lbk_cnt = rc = next_ppa(io, &ppa);
732 
733 		/* We might need to retry the read from scratch (e.g. */
734 		/* because a write was under way and completed before */
735 		/* we could read it from the rwb) */
736 		if (ftl_read_retry(rc)) {
737 			continue;
738 		}
739 
740 		/* We don't have to schedule the read, as it was read from cache */
741 		if (ftl_read_canceled(rc)) {
742 			ftl_io_advance(io, 1);
743 			ftl_trace_completion(io->dev, io, rc ? FTL_TRACE_COMPLETION_INVALID :
744 					     FTL_TRACE_COMPLETION_CACHE);
745 			rc = 0;
746 			continue;
747 		}
748 
749 		assert(lbk_cnt > 0);
750 
751 		ftl_trace_submission(dev, io, ppa, lbk_cnt);
752 		rc = spdk_nvme_ns_cmd_read(dev->ns, ftl_get_read_qpair(dev),
753 					   ftl_io_iovec_addr(io),
754 					   ftl_ppa_addr_pack(io->dev, ppa), lbk_cnt,
755 					   ftl_io_cmpl_cb, io, 0);
756 		if (rc == -ENOMEM) {
757 			ftl_add_to_retry_queue(io);
758 			break;
759 		} else if (rc) {
760 			ftl_io_fail(io, rc);
761 			break;
762 		}
763 
764 		ftl_io_inc_req(io);
765 		ftl_io_advance(io, lbk_cnt);
766 	}
767 
768 	/* If we didn't have to read anything from the device, */
769 	/* complete the request right away */
770 	if (ftl_io_done(io)) {
771 		ftl_io_complete(io);
772 	}
773 
774 	return rc;
775 }
776 
777 static int
778 ftl_ppa_cache_read(struct ftl_io *io, uint64_t lba,
779 		   struct ftl_ppa ppa, void *buf)
780 {
781 	struct ftl_rwb *rwb = io->dev->rwb;
782 	struct ftl_rwb_entry *entry;
783 	struct ftl_ppa nppa;
784 	int rc = 0;
785 
786 	entry = ftl_rwb_entry_from_offset(rwb, ppa.offset);
787 	pthread_spin_lock(&entry->lock);
788 
789 	nppa = ftl_l2p_get(io->dev, lba);
790 	if (ppa.ppa != nppa.ppa) {
791 		rc = -1;
792 		goto out;
793 	}
794 
795 	memcpy(buf, entry->data, FTL_BLOCK_SIZE);
796 out:
797 	pthread_spin_unlock(&entry->lock);
798 	return rc;
799 }
800 
801 static int
802 ftl_lba_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa)
803 {
804 	struct spdk_ftl_dev *dev = io->dev;
805 	struct ftl_ppa next_ppa;
806 	size_t i;
807 
808 	*ppa = ftl_l2p_get(dev, ftl_io_current_lba(io));
809 
810 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Read ppa:%lx, lba:%lu\n",
811 		      ppa->ppa, ftl_io_current_lba(io));
812 
813 	/* If the PPA is invalid, skip it (the buffer should already be zeroed) */
814 	if (ftl_ppa_invalid(*ppa)) {
815 		return -EFAULT;
816 	}
817 
818 	if (ftl_ppa_cached(*ppa)) {
819 		if (!ftl_ppa_cache_read(io, ftl_io_current_lba(io), *ppa, ftl_io_iovec_addr(io))) {
820 			return 0;
821 		}
822 
823 		/* If the state changed, we have to re-read the l2p */
824 		return -EAGAIN;
825 	}
826 
827 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
828 		next_ppa = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));
829 
830 		if (ftl_ppa_invalid(next_ppa) || ftl_ppa_cached(next_ppa)) {
831 			break;
832 		}
833 
834 		if (ftl_ppa_addr_pack(dev, *ppa) + i != ftl_ppa_addr_pack(dev, next_ppa)) {
835 			break;
836 		}
837 	}
838 
839 	return i;
840 }
841 
842 static void
843 ftl_complete_flush(struct ftl_flush *flush)
844 {
845 	assert(flush->num_req == 0);
846 	LIST_REMOVE(flush, list_entry);
847 
848 	flush->cb.fn(flush->cb.ctx, 0);
849 
850 	spdk_bit_array_free(&flush->bmap);
851 	free(flush);
852 }
853 
854 static void
855 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_rwb_batch *batch)
856 {
857 	struct ftl_flush *flush, *tflush;
858 	size_t offset;
859 
860 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
861 		offset = ftl_rwb_batch_get_offset(batch);
862 
863 		if (spdk_bit_array_get(flush->bmap, offset)) {
864 			spdk_bit_array_clear(flush->bmap, offset);
865 			if (!(--flush->num_req)) {
866 				ftl_complete_flush(flush);
867 			}
868 		}
869 	}
870 }
871 
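/*
 * Reserve up to *num_lbks blocks in the non-volatile cache bdev and return the
 * starting block address (or FTL_LBA_INVALID if the cache is full). The cache
 * is consumed ring-style: a reservation is trimmed at the end of the bdev and
 * current_addr wraps back to zero once it reaches the last block.
 */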
872 static uint64_t
873 ftl_reserve_nv_cache(struct ftl_nv_cache *nv_cache, size_t *num_lbks)
874 {
875 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
876 	uint64_t num_available, cache_size, cache_addr = FTL_LBA_INVALID;
877 
878 	cache_size = spdk_bdev_get_num_blocks(bdev);
879 
880 	pthread_spin_lock(&nv_cache->lock);
881 	if (spdk_unlikely(nv_cache->num_available == 0)) {
882 		goto out;
883 	}
884 
885 	num_available = spdk_min(nv_cache->num_available, *num_lbks);
886 	if (spdk_unlikely(nv_cache->current_addr + num_available > cache_size)) {
887 		*num_lbks = cache_size - nv_cache->current_addr;
888 	} else {
889 		*num_lbks = num_available;
890 	}
891 
892 	cache_addr = nv_cache->current_addr;
893 	nv_cache->current_addr += *num_lbks;
894 	nv_cache->num_available -= *num_lbks;
895 
896 	if (nv_cache->current_addr == spdk_bdev_get_num_blocks(bdev)) {
897 		nv_cache->current_addr = 0;
898 	}
899 out:
900 	pthread_spin_unlock(&nv_cache->lock);
901 	return cache_addr;
902 }
903 
904 static struct ftl_io *
905 ftl_alloc_io_nv_cache(struct ftl_io *parent, size_t num_lbks)
906 {
907 	struct ftl_io_init_opts opts = {
908 		.dev		= parent->dev,
909 		.parent		= parent,
910 		.iov_cnt	= 1,
911 		.data		= ftl_io_iovec_addr(parent),
912 		.req_size	= num_lbks,
913 		.flags		= FTL_IO_CACHE,
914 	};
915 
916 	return ftl_io_init_internal(&opts);
917 }
918 
919 static void
920 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
921 {
922 	struct ftl_io *io = cb_arg;
923 
924 	if (spdk_unlikely(!success)) {
925 		SPDK_ERRLOG("Non-volatile cache write failed at %"PRIx64"\n", io->ppa.ppa);
926 		io->status = -EIO;
927 	}
928 
929 	ftl_io_dec_req(io);
930 	if (ftl_io_done(io)) {
931 		ftl_io_complete(io);
932 	}
933 
934 	spdk_bdev_free_io(bdev_io);
935 }
936 
937 static void
938 ftl_submit_nv_cache(void *ctx)
939 {
940 	struct ftl_io *io = ctx;
941 	struct spdk_ftl_dev *dev = io->dev;
942 	struct spdk_thread *thread;
943 	struct ftl_io_channel *ioch;
944 	int rc;
945 
946 	ioch = spdk_io_channel_get_ctx(io->ioch);
947 	thread = spdk_io_channel_get_thread(io->ioch);
948 
949 	rc = spdk_bdev_write_blocks(dev->nv_cache.bdev_desc, ioch->cache_ioch,
950 				    ftl_io_iovec_addr(io), io->ppa.ppa, io->lbk_cnt,
951 				    ftl_nv_cache_submit_cb, io);
952 	if (rc == -ENOMEM) {
953 		spdk_thread_send_msg(thread, ftl_submit_nv_cache, io);
954 		return;
955 	} else if (rc) {
956 		SPDK_ERRLOG("Write to persistent cache failed: %s (%"PRIu64", %"PRIu64")\n",
957 			    spdk_strerror(-rc), io->ppa.ppa, io->lbk_cnt);
958 		io->status = -EIO;
959 		ftl_io_complete(io);
960 		return;
961 	}
962 
963 	ftl_io_advance(io, io->lbk_cnt);
964 	ftl_io_inc_req(io);
965 }
966 
967 static void
968 _ftl_write_nv_cache(void *ctx)
969 {
970 	struct ftl_io *child, *io = ctx;
971 	struct spdk_ftl_dev *dev = io->dev;
972 	struct spdk_thread *thread;
973 	uint64_t num_lbks;
974 
975 	thread = spdk_io_channel_get_thread(io->ioch);
976 
977 	while (io->pos < io->lbk_cnt) {
978 		num_lbks = ftl_io_iovec_len_left(io);
979 
980 		child = ftl_alloc_io_nv_cache(io, num_lbks);
981 		if (spdk_unlikely(!child)) {
982 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
983 			return;
984 		}
985 
986 		/* Reserve space in the non-volatile cache */
987 		child->ppa.ppa = ftl_reserve_nv_cache(&dev->nv_cache, &num_lbks);
988 		if (child->ppa.ppa == FTL_LBA_INVALID) {
989 			ftl_io_free(child);
990 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
991 			break;
992 		}
993 
994 		/* Shrink the IO if there isn't enough room in the cache to fill the whole iovec */
995 		if (spdk_unlikely(num_lbks != ftl_io_iovec_len_left(io))) {
996 			ftl_io_shrink_iovec(child, ftl_io_iovec_addr(child), 1, num_lbks);
997 		}
998 
999 		ftl_submit_nv_cache(child);
1000 		ftl_io_advance(io, num_lbks);
1001 	}
1002 
1003 	if (ftl_io_done(io)) {
1004 		ftl_io_complete(io);
1005 	}
1006 }
1007 
1008 static void
1009 ftl_write_nv_cache(struct ftl_io *parent)
1010 {
1011 	ftl_io_reset(parent);
1012 	parent->flags |= FTL_IO_CACHE;
1013 	_ftl_write_nv_cache(parent);
1014 }
1015 
1016 static void
1017 ftl_write_fail(struct ftl_io *io, int status)
1018 {
1019 	struct ftl_rwb_batch *batch = io->rwb_batch;
1020 	struct spdk_ftl_dev *dev = io->dev;
1021 	struct ftl_rwb_entry *entry;
1022 	struct ftl_band *band;
1023 	char buf[128];
1024 
1025 	entry = ftl_rwb_batch_first_entry(batch);
1026 
1027 	band = ftl_band_from_ppa(io->dev, entry->ppa);
1028 	SPDK_ERRLOG("Write failed @ppa: %s, status: %d\n",
1029 		    ftl_ppa2str(entry->ppa, buf, sizeof(buf)), status);
1030 
1031 	/* Close the band and halt the wptr and defrag */
1032 	ftl_halt_writes(dev, band);
1033 
1034 	ftl_rwb_foreach(entry, batch) {
1035 		/* Invalidate the metadata set by ftl_wptr_process_writes() */
1036 		ftl_invalidate_addr(dev, entry->ppa);
1037 	}
1038 
1039 	/* Reset the batch back to the RWB to resend it later */
1040 	ftl_rwb_batch_revert(batch);
1041 }
1042 
1043 static void
1044 ftl_write_cb(void *arg, int status)
1045 {
1046 	struct ftl_io *io = arg;
1047 	struct spdk_ftl_dev *dev = io->dev;
1048 	struct ftl_rwb_batch *batch = io->rwb_batch;
1049 	struct ftl_rwb_entry *entry;
1050 
1051 	if (status) {
1052 		ftl_write_fail(io, status);
1053 		return;
1054 	}
1055 
1056 	assert(io->lbk_cnt == dev->xfer_size);
1057 	ftl_rwb_foreach(entry, batch) {
1058 		if (!(io->flags & FTL_IO_MD) && !(entry->flags & FTL_IO_PAD)) {
1059 			/* Verify that the LBA is set for user lbks */
1060 			assert(entry->lba != FTL_LBA_INVALID);
1061 		}
1062 
1063 		SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lu, lba:%lu\n",
1064 			      entry->ppa.ppa, entry->lba);
1065 	}
1066 
1067 	ftl_process_flush(dev, batch);
1068 	ftl_rwb_batch_release(batch);
1069 }
1070 
1071 static void
1072 ftl_update_rwb_stats(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry)
1073 {
1074 	if (!ftl_rwb_entry_internal(entry)) {
1075 		dev->stats.write_user++;
1076 	}
1077 	dev->stats.write_total++;
1078 }
1079 
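/*
 * Update the L2P entry for a freshly filled RWB entry. Three cases are handled:
 * the LBA had no prior mapping (set it directly), the prior mapping pointed at
 * another cached RWB entry (invalidate that entry under its lock), or the prior
 * mapping pointed at the media (invalidate the old block under the band's
 * metadata lock). Weak writes only update the L2P if the old mapping was still
 * valid, since an already-invalidated address means the data was overwritten.
 */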
1080 static void
1081 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry,
1082 	       struct ftl_ppa ppa)
1083 {
1084 	struct ftl_ppa prev_ppa;
1085 	struct ftl_rwb_entry *prev;
1086 	struct ftl_band *band;
1087 	int valid;
1088 
1089 	prev_ppa = ftl_l2p_get(dev, entry->lba);
1090 	if (ftl_ppa_invalid(prev_ppa)) {
1091 		ftl_l2p_set(dev, entry->lba, ppa);
1092 		return;
1093 	}
1094 
1095 	/* If the L2P's PPA is different from what we expected, we don't need to */
1096 	/* do anything (someone's already overwritten our data). */
1097 	if (ftl_rwb_entry_weak(entry) && !ftl_ppa_cmp(prev_ppa, entry->ppa)) {
1098 		return;
1099 	}
1100 
1101 	if (ftl_ppa_cached(prev_ppa)) {
1102 		assert(!ftl_rwb_entry_weak(entry));
1103 		prev = ftl_rwb_entry_from_offset(dev->rwb, prev_ppa.offset);
1104 		pthread_spin_lock(&prev->lock);
1105 
1106 		/* Re-read the L2P under the lock to protect against updates */
1107 		/* to this LBA from other threads */
1108 		prev_ppa = ftl_l2p_get(dev, entry->lba);
1109 
1110 		/* If the entry is no longer in the cache, another write has been */
1111 		/* scheduled in the meantime, so we have to invalidate the address it now points to */
1112 		if (!ftl_ppa_cached(prev_ppa)) {
1113 			ftl_invalidate_addr(dev, prev_ppa);
1114 		}
1115 
1116 		/* If previous entry is part of cache, remove and invalidate it */
1117 		if (ftl_rwb_entry_valid(prev)) {
1118 			ftl_invalidate_addr(dev, prev->ppa);
1119 			ftl_rwb_entry_invalidate(prev);
1120 		}
1121 
1122 		ftl_l2p_set(dev, entry->lba, ppa);
1123 		pthread_spin_unlock(&prev->lock);
1124 		return;
1125 	}
1126 
1127 	/* Lock the band containing the previous PPA. This ensures atomic updates to */
1128 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
1129 	/* check the validity of weak writes. */
1130 	band = ftl_band_from_ppa(dev, prev_ppa);
1131 	pthread_spin_lock(&band->md.lock);
1132 
1133 	valid = ftl_invalidate_addr_unlocked(dev, prev_ppa);
1134 
1135 	/* If the address has been invalidated already, we don't want to update */
1136 	/* the L2P for weak writes, as it means the write is no longer valid. */
1137 	if (!ftl_rwb_entry_weak(entry) || valid) {
1138 		ftl_l2p_set(dev, entry->lba, ppa);
1139 	}
1140 
1141 	pthread_spin_unlock(&band->md.lock);
1142 }
1143 
1144 static struct ftl_io *
1145 ftl_io_init_child_write(struct ftl_io *parent, struct ftl_ppa ppa,
1146 			void *data, void *md, spdk_ftl_fn cb)
1147 {
1148 	struct ftl_io *io;
1149 	struct spdk_ftl_dev *dev = parent->dev;
1150 	struct ftl_io_init_opts opts = {
1151 		.dev		= dev,
1152 		.io		= NULL,
1153 		.parent		= parent,
1154 		.rwb_batch	= NULL,
1155 		.band		= parent->band,
1156 		.size		= sizeof(struct ftl_io),
1157 		.flags		= 0,
1158 		.type		= FTL_IO_WRITE,
1159 		.iov_cnt	= 1,
1160 		.req_size	= dev->xfer_size,
1161 		.fn		= cb,
1162 		.data		= data,
1163 		.md		= md,
1164 	};
1165 
1166 	io = ftl_io_init_internal(&opts);
1167 	if (!io) {
1168 		return NULL;
1169 	}
1170 
1171 	io->ppa = ppa;
1172 
1173 	return io;
1174 }
1175 
1176 static void
1177 ftl_io_child_write_cb(void *ctx, int status)
1178 {
1179 	struct ftl_chunk *chunk;
1180 	struct ftl_io *io = ctx;
1181 
1182 	chunk = ftl_band_chunk_from_ppa(io->band, io->ppa);
1183 	chunk->busy = false;
1184 }
1185 
1186 static int
1187 ftl_submit_child_write(struct ftl_wptr *wptr, struct ftl_io *io, int lbk_cnt)
1188 {
1189 	struct spdk_ftl_dev	*dev = io->dev;
1190 	struct ftl_io		*child;
1191 	struct iovec		*iov = ftl_io_iovec(io);
1192 	int			rc;
1193 
1194 	/* Split the IO into child requests and release the chunk immediately after the child completes */
1195 	child = ftl_io_init_child_write(io, wptr->ppa, iov[io->iov_pos].iov_base,
1196 					ftl_io_get_md(io), ftl_io_child_write_cb);
1197 	if (!child) {
1198 		return -EAGAIN;
1199 	}
1200 
1201 	rc = spdk_nvme_ns_cmd_write_with_md(dev->ns, ftl_get_write_qpair(dev),
1202 					    ftl_io_iovec_addr(child), child->md,
1203 					    ftl_ppa_addr_pack(dev, wptr->ppa),
1204 					    lbk_cnt, ftl_io_cmpl_cb, child, 0, 0, 0);
1205 	if (rc) {
1206 		ftl_io_fail(child, rc);
1207 		ftl_io_complete(child);
1208 		SPDK_ERRLOG("spdk_nvme_ns_cmd_write failed with status:%d, ppa:%lu\n",
1209 			    rc, wptr->ppa.ppa);
1210 
1211 		return -EIO;
1212 	}
1213 
1214 	ftl_io_inc_req(child);
1215 	ftl_io_advance(child, lbk_cnt);
1216 
1217 	return 0;
1218 }
1219 
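/*
 * Submit a write as xfer_size-sized child IOs, one per iovec entry. If the
 * target chunk is still busy with a previous write, the IO is parked on the
 * write pointer (wptr->current_io) and retried on a later poller iteration.
 */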
1220 static int
1221 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
1222 {
1223 	struct spdk_ftl_dev	*dev = io->dev;
1224 	struct iovec		*iov = ftl_io_iovec(io);
1225 	int			rc = 0;
1226 	size_t			lbk_cnt;
1227 
1228 	while (io->iov_pos < io->iov_cnt) {
1229 		lbk_cnt = iov[io->iov_pos].iov_len / PAGE_SIZE;
1230 		assert(iov[io->iov_pos].iov_len > 0);
1231 		assert(lbk_cnt == dev->xfer_size);
1232 
1233 		/* There are no ordering guarantees for completions on the NVMe IO submission queue, */
1234 		/* so wait until the chunk is no longer busy before submitting another write */
1235 		if (wptr->chunk->busy) {
1236 			wptr->current_io = io;
1237 			rc = -EAGAIN;
1238 			break;
1239 		}
1240 
1241 		rc = ftl_submit_child_write(wptr, io, lbk_cnt);
1242 
1243 		if (rc == -EAGAIN) {
1244 			wptr->current_io = io;
1245 			break;
1246 		} else if (rc) {
1247 			ftl_io_fail(io, rc);
1248 			break;
1249 		}
1250 
1251 		ftl_trace_submission(dev, io, wptr->ppa, lbk_cnt);
1252 
1253 		/* Update parent iovec */
1254 		ftl_io_advance(io, lbk_cnt);
1255 
1256 		ftl_wptr_advance(wptr, lbk_cnt);
1257 	}
1258 
1259 	if (ftl_io_done(io)) {
1260 		/* Parent IO will complete after all children are completed */
1261 		ftl_io_complete(io);
1262 	}
1263 
1264 	return rc;
1265 }
1266 
1267 static void
1268 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1269 {
1270 	struct ftl_rwb *rwb = dev->rwb;
1271 	size_t size;
1272 
1273 	size = ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_INTERNAL) +
1274 	       ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_USER);
1275 
1276 	/* There must be something in the RWB, otherwise the flush */
1277 	/* wouldn't be waiting for anything */
1278 	assert(size > 0);
1279 
1280 	/* Only add padding when there are fewer than xfer_size */
1281 	/* entries in the buffer. Otherwise we just have to wait */
1282 	/* for the entries to become ready. */
1283 	if (size < dev->xfer_size) {
1284 		ftl_rwb_pad(dev, dev->xfer_size - (size % dev->xfer_size));
1285 	}
1286 }
1287 
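/*
 * Drain one batch of entries from the RWB and write it to the current band.
 * Each entry is assigned the physical address it will land on, the band's
 * valid map is updated (unless the L2P has moved on in the meantime), and the
 * batch is submitted as a single xfer_size write at the write pointer's
 * position.
 */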
1288 static int
1289 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1290 {
1291 	struct spdk_ftl_dev	*dev = wptr->dev;
1292 	struct ftl_rwb_batch	*batch;
1293 	struct ftl_rwb_entry	*entry;
1294 	struct ftl_io		*io;
1295 	struct ftl_ppa		ppa, prev_ppa;
1296 
1297 	if (wptr->current_io) {
1298 		if (ftl_submit_write(wptr, wptr->current_io) == -EAGAIN) {
1299 			return 0;
1300 		}
1301 		wptr->current_io = NULL;
1302 	}
1303 
1304 	/* Make sure the band is prepared for writing */
1305 	if (!ftl_wptr_ready(wptr)) {
1306 		return 0;
1307 	}
1308 
1309 	if (dev->halt) {
1310 		ftl_process_shutdown(dev);
1311 	}
1312 
1313 	batch = ftl_rwb_pop(dev->rwb);
1314 	if (!batch) {
1315 		/* If there are queued flush requests, we need to pad the RWB to */
1316 		/* force out the remaining entries */
1317 		if (!LIST_EMPTY(&dev->flush_list)) {
1318 			ftl_flush_pad_batch(dev);
1319 		}
1320 
1321 		return 0;
1322 	}
1323 
1324 	io = ftl_io_rwb_init(dev, wptr->band, batch, ftl_write_cb);
1325 	if (!io) {
1326 		goto error;
1327 	}
1328 
1329 	ppa = wptr->ppa;
1330 	ftl_rwb_foreach(entry, batch) {
1331 		entry->ppa = ppa;
1332 
1333 		if (entry->lba != FTL_LBA_INVALID) {
1334 			pthread_spin_lock(&entry->lock);
1335 			prev_ppa = ftl_l2p_get(dev, entry->lba);
1336 
1337 			/* If the l2p was updated in the meantime, don't update band's metadata */
1338 			if (ftl_ppa_cached(prev_ppa) && prev_ppa.offset == entry->pos) {
1339 				/* Setting entry's cache bit needs to be done after metadata */
1340 				/* within the band is updated to make sure that writes */
1341 				/* invalidating the entry clear the metadata as well */
1342 				ftl_band_set_addr(wptr->band, entry->lba, entry->ppa);
1343 				ftl_rwb_entry_set_valid(entry);
1344 			}
1345 			pthread_spin_unlock(&entry->lock);
1346 		}
1347 
1348 		ftl_trace_rwb_pop(dev, entry);
1349 		ftl_update_rwb_stats(dev, entry);
1350 
1351 		ppa = ftl_band_next_ppa(wptr->band, ppa, 1);
1352 	}
1353 
1354 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lx, %lx\n", wptr->ppa.ppa,
1355 		      ftl_ppa_addr_pack(dev, wptr->ppa));
1356 
1357 	if (ftl_submit_write(wptr, io)) {
1358 		/* TODO: we need some recovery here */
1359 		assert(0 && "Write submit failed");
1360 		if (ftl_io_done(io)) {
1361 			ftl_io_free(io);
1362 		}
1363 	}
1364 
1365 	return dev->xfer_size;
1366 error:
1367 	ftl_rwb_batch_revert(batch);
1368 	return 0;
1369 }
1370 
1371 static int
1372 ftl_process_writes(struct spdk_ftl_dev *dev)
1373 {
1374 	struct ftl_wptr *wptr, *twptr;
1375 	size_t num_active = 0;
1376 	enum ftl_band_state state;
1377 
1378 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1379 		ftl_wptr_process_writes(wptr);
1380 		state = wptr->band->state;
1381 
1382 		if (state != FTL_BAND_STATE_FULL &&
1383 		    state != FTL_BAND_STATE_CLOSING &&
1384 		    state != FTL_BAND_STATE_CLOSED) {
1385 			num_active++;
1386 		}
1387 	}
1388 
1389 	if (num_active < 1) {
1390 		ftl_add_wptr(dev);
1391 	}
1392 
1393 	return 0;
1394 }
1395 
1396 static void
1397 ftl_rwb_entry_fill(struct ftl_rwb_entry *entry, struct ftl_io *io)
1398 {
1399 	struct ftl_band *band;
1400 
1401 	memcpy(entry->data, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1402 
1403 	if (ftl_rwb_entry_weak(entry)) {
1404 		band = ftl_band_from_ppa(io->dev, io->ppa);
1405 		entry->ppa = ftl_band_next_ppa(band, io->ppa, io->pos);
1406 	}
1407 
1408 	entry->trace = io->trace;
1409 	entry->lba = ftl_io_current_lba(io);
1410 
1411 	if (entry->md) {
1412 		memcpy(entry->md, &entry->lba, sizeof(entry->lba));
1413 	}
1414 }
1415 
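/*
 * User data write path: copy each block into an RWB entry, point the L2P at
 * the cached entry and push it to the write buffer. The actual write to the
 * media happens later from the core thread. If a non-volatile cache is
 * configured, the data is additionally mirrored there once all blocks have
 * been buffered.
 */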
1416 static int
1417 ftl_rwb_fill(struct ftl_io *io)
1418 {
1419 	struct spdk_ftl_dev *dev = io->dev;
1420 	struct ftl_rwb_entry *entry;
1421 	struct ftl_ppa ppa = { .cached = 1 };
1422 	int flags = ftl_rwb_flags_from_io(io);
1423 
1424 	while (io->pos < io->lbk_cnt) {
1425 		if (ftl_io_current_lba(io) == FTL_LBA_INVALID) {
1426 			ftl_io_advance(io, 1);
1427 			continue;
1428 		}
1429 
1430 		entry = ftl_acquire_entry(dev, flags);
1431 		if (!entry) {
1432 			return -EAGAIN;
1433 		}
1434 
1435 		ftl_rwb_entry_fill(entry, io);
1436 
1437 		ppa.offset = entry->pos;
1438 
1439 		ftl_trace_rwb_fill(dev, io);
1440 		ftl_update_l2p(dev, entry, ppa);
1441 		ftl_io_advance(io, 1);
1442 
1443 		/* Needs to be done after the L2P is updated to avoid a race with the */
1444 		/* write completion callback in case it's processed before the */
1445 		/* L2P is set in ftl_update_l2p(). */
1446 		ftl_rwb_push(entry);
1447 	}
1448 
1449 	if (ftl_io_done(io)) {
1450 		if (dev->nv_cache.bdev_desc) {
1451 			ftl_write_nv_cache(io);
1452 		} else {
1453 			ftl_io_complete(io);
1454 		}
1455 	}
1456 
1457 	return 0;
1458 }
1459 
1460 static bool
1461 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
1462 {
1463 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
1464 
1465 	if (ftl_reloc_is_halted(dev->reloc)) {
1466 		return false;
1467 	}
1468 
1469 	if (dev->df_band) {
1470 		return false;
1471 	}
1472 
1473 	if (dev->num_free <= limit->thld) {
1474 		return true;
1475 	}
1476 
1477 	return false;
1478 }
1479 
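/*
 * Defrag merit heuristic: merit = (invalid / (valid + 1)) * band age, so bands
 * with little valid data and a higher age are preferred as relocation victims.
 * For example (illustrative numbers only): a band with 90% invalid blocks
 * scores roughly eight times higher than one with 50% invalid, before the age
 * factor is applied.
 */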
1480 static double
1481 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
1482 {
1483 	size_t usable, valid, invalid;
1484 	double vld_ratio;
1485 
1486 	/* If the band doesn't have any usable lbks it's of no use */
1487 	usable = ftl_band_num_usable_lbks(band);
1488 	if (usable == 0) {
1489 		return 0.0;
1490 	}
1491 
1492 	valid =  threshold_valid ? (usable - *threshold_valid) : band->md.num_vld;
1493 	invalid = usable - valid;
1494 
1495 	/* Add one to avoid division by 0 */
1496 	vld_ratio = (double)invalid / (double)(valid + 1);
1497 	return vld_ratio * ftl_band_age(band);
1498 }
1499 
1500 static bool
1501 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
1502 {
1503 	struct spdk_ftl_conf *conf = &dev->conf;
1504 	size_t thld_vld;
1505 
1506 	/* If we're in dire need of free bands, every band is worth defragging */
1507 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
1508 		return true;
1509 	}
1510 
1511 	thld_vld = (ftl_band_num_usable_lbks(band) * conf->defrag.invalid_thld) / 100;
1512 
1513 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
1514 }
1515 
1516 static struct ftl_band *
1517 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
1518 {
1519 	struct ftl_band *band, *mband = NULL;
1520 	double merit = 0;
1521 
1522 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
1523 		assert(band->state == FTL_BAND_STATE_CLOSED);
1524 		band->merit = ftl_band_calc_merit(band, NULL);
1525 		if (band->merit > merit) {
1526 			merit = band->merit;
1527 			mband = band;
1528 		}
1529 	}
1530 
1531 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
1532 		mband = NULL;
1533 	}
1534 
1535 	return mband;
1536 }
1537 
1538 static void
1539 ftl_process_relocs(struct spdk_ftl_dev *dev)
1540 {
1541 	struct ftl_band *band;
1542 
1543 	if (ftl_dev_needs_defrag(dev)) {
1544 		band = dev->df_band = ftl_select_defrag_band(dev);
1545 
1546 		if (band) {
1547 			ftl_reloc_add(dev->reloc, band, 0, ftl_num_band_lbks(dev), 0);
1548 			ftl_trace_defrag_band(dev, band);
1549 		}
1550 	}
1551 
1552 	ftl_reloc(dev->reloc);
1553 }
1554 
1555 int
1556 ftl_current_limit(const struct spdk_ftl_dev *dev)
1557 {
1558 	return dev->limit;
1559 }
1560 
1561 void
1562 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
1563 {
1564 	attrs->uuid = dev->uuid;
1565 	attrs->lbk_cnt = dev->num_lbas;
1566 	attrs->lbk_size = FTL_BLOCK_SIZE;
1567 	attrs->range = dev->range;
1568 	attrs->cache_bdev_desc = dev->nv_cache.bdev_desc;
1569 }
1570 
1571 static void
1572 _ftl_io_write(void *ctx)
1573 {
1574 	ftl_io_write((struct ftl_io *)ctx);
1575 }
1576 
1577 int
1578 ftl_io_write(struct ftl_io *io)
1579 {
1580 	struct spdk_ftl_dev *dev = io->dev;
1581 
1582 	/* For normal IOs we just need to copy the data onto the rwb */
1583 	if (!(io->flags & FTL_IO_MD)) {
1584 		return ftl_rwb_fill(io);
1585 	}
1586 
1587 	/* Metadata has its own buffer, so it doesn't have to be copied; just */
1588 	/* send it to the core thread and schedule the write immediately */
1589 	if (ftl_check_core_thread(dev)) {
1590 		return ftl_submit_write(ftl_wptr_from_band(io->band), io);
1591 	}
1592 
1593 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
1594 
1595 	return 0;
1596 }
1597 
1598 static int
1599 _spdk_ftl_write(struct ftl_io *io)
1600 {
1601 	int rc;
1602 
1603 	rc = ftl_io_write(io);
1604 	if (rc == -EAGAIN) {
1605 		spdk_thread_send_msg(spdk_io_channel_get_thread(io->ioch),
1606 				     _ftl_write, io);
1607 		return 0;
1608 	}
1609 
1610 	if (rc) {
1611 		ftl_io_free(io);
1612 	}
1613 
1614 	return rc;
1615 }
1616 
1617 static void
1618 _ftl_write(void *ctx)
1619 {
1620 	_spdk_ftl_write(ctx);
1621 }
1622 
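/*
 * Illustrative usage sketch (hypothetical caller code, not part of this file):
 *
 *   struct iovec iov = { .iov_base = buf, .iov_len = 8 * FTL_BLOCK_SIZE };
 *
 *   rc = spdk_ftl_write(dev, ioch, lba, 8, &iov, 1, write_done_cb, cb_arg);
 *
 * The total iovec length must match lba_cnt blocks exactly, the device must be
 * fully initialized and iov_cnt must not exceed FTL_MAX_IOV; otherwise the
 * call fails with -EINVAL/-EBUSY before any IO is allocated.
 */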
1623 int
1624 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1625 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1626 {
1627 	struct ftl_io *io;
1628 
1629 	if (iov_cnt == 0 || iov_cnt > FTL_MAX_IOV) {
1630 		return -EINVAL;
1631 	}
1632 
1633 	if (lba_cnt == 0) {
1634 		return -EINVAL;
1635 	}
1636 
1637 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1638 		return -EINVAL;
1639 	}
1640 
1641 	if (!dev->initialized) {
1642 		return -EBUSY;
1643 	}
1644 
1645 	io = ftl_io_alloc(ch);
1646 	if (!io) {
1647 		return -ENOMEM;
1648 	}
1649 
1650 	ftl_io_user_init(dev, io, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
1651 	return _spdk_ftl_write(io);
1652 }
1653 
1654 int
1655 ftl_io_read(struct ftl_io *io)
1656 {
1657 	struct spdk_ftl_dev *dev = io->dev;
1658 	ftl_next_ppa_fn	next_ppa;
1659 
1660 	if (ftl_check_read_thread(dev)) {
1661 		if (ftl_io_mode_ppa(io)) {
1662 			next_ppa = ftl_ppa_read_next_ppa;
1663 		} else {
1664 			next_ppa = ftl_lba_read_next_ppa;
1665 		}
1666 
1667 		return ftl_submit_read(io, next_ppa);
1668 	}
1669 
1670 	spdk_thread_send_msg(ftl_get_read_thread(dev), _ftl_read, io);
1671 	return 0;
1672 }
1673 
1674 static void
1675 _ftl_read(void *arg)
1676 {
1677 	ftl_io_read((struct ftl_io *)arg);
1678 }
1679 
1680 int
1681 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1682 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1683 {
1684 	struct ftl_io *io;
1685 
1686 	if (iov_cnt == 0 || iov_cnt > FTL_MAX_IOV) {
1687 		return -EINVAL;
1688 	}
1689 
1690 	if (lba_cnt == 0) {
1691 		return -EINVAL;
1692 	}
1693 
1694 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1695 		return -EINVAL;
1696 	}
1697 
1698 	if (!dev->initialized) {
1699 		return -EBUSY;
1700 	}
1701 
1702 	io = ftl_io_alloc(ch);
1703 	if (!io) {
1704 		return -ENOMEM;
1705 	}
1706 
1707 	ftl_io_user_init(dev, io, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
1708 	ftl_io_read(io);
1709 	return 0;
1710 }
1711 
1712 static struct ftl_flush *
1713 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
1714 {
1715 	struct ftl_flush *flush;
1716 	struct ftl_rwb *rwb = dev->rwb;
1717 
1718 	flush = calloc(1, sizeof(*flush));
1719 	if (!flush) {
1720 		return NULL;
1721 	}
1722 
1723 	flush->bmap = spdk_bit_array_create(ftl_rwb_num_batches(rwb));
1724 	if (!flush->bmap) {
1725 		goto error;
1726 	}
1727 
1728 	flush->dev = dev;
1729 	flush->cb.fn = cb_fn;
1730 	flush->cb.ctx = cb_arg;
1731 
1732 	return flush;
1733 error:
1734 	free(flush);
1735 	return NULL;
1736 }
1737 
1738 static void
1739 _ftl_flush(void *ctx)
1740 {
1741 	struct ftl_flush *flush = ctx;
1742 	struct spdk_ftl_dev *dev = flush->dev;
1743 	struct ftl_rwb *rwb = dev->rwb;
1744 	struct ftl_rwb_batch *batch;
1745 
1746 	/* Attach flush object to all non-empty batches */
1747 	ftl_rwb_foreach_batch(batch, rwb) {
1748 		if (!ftl_rwb_batch_empty(batch)) {
1749 			spdk_bit_array_set(flush->bmap, ftl_rwb_batch_get_offset(batch));
1750 			flush->num_req++;
1751 		}
1752 	}
1753 
1754 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
1755 
1756 	/* If the RWB was already empty, the flush can be completed right away */
1757 	if (!flush->num_req) {
1758 		ftl_complete_flush(flush);
1759 	}
1760 }
1761 
1762 int
1763 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
1764 {
1765 	struct ftl_flush *flush;
1766 
1767 	if (!dev->initialized) {
1768 		return -EBUSY;
1769 	}
1770 
1771 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
1772 	if (!flush) {
1773 		return -ENOMEM;
1774 	}
1775 
1776 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
1777 	return 0;
1778 }
1779 
1780 void
1781 ftl_process_anm_event(struct ftl_anm_event *event)
1782 {
1783 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Unconsumed ANM received for dev: %p...\n", event->dev);
1784 	ftl_anm_event_complete(event);
1785 }
1786 
1787 static void
1788 ftl_process_retry_queue(struct spdk_ftl_dev *dev)
1789 {
1790 	struct ftl_io *io;
1791 	int rc;
1792 
1793 	while (!TAILQ_EMPTY(&dev->retry_queue)) {
1794 		io = TAILQ_FIRST(&dev->retry_queue);
1795 
1796 		/* Retry only if IO is still healthy */
1797 		if (spdk_likely(io->status == 0)) {
1798 			rc = ftl_io_read(io);
1799 			if (rc == -ENOMEM) {
1800 				break;
1801 			}
1802 		}
1803 
1804 		io->flags &= ~FTL_IO_RETRY;
1805 		TAILQ_REMOVE(&dev->retry_queue, io, retry_entry);
1806 
1807 		if (ftl_io_done(io)) {
1808 			ftl_io_complete(io);
1809 		}
1810 	}
1811 }
1812 
1813 int
1814 ftl_task_read(void *ctx)
1815 {
1816 	struct ftl_thread *thread = ctx;
1817 	struct spdk_ftl_dev *dev = thread->dev;
1818 	struct spdk_nvme_qpair *qpair = ftl_get_read_qpair(dev);
1819 	size_t num_completed;
1820 
1821 	if (dev->halt) {
1822 		if (ftl_shutdown_complete(dev)) {
1823 			spdk_poller_unregister(&thread->poller);
1824 			return 0;
1825 		}
1826 	}
1827 
1828 	num_completed = spdk_nvme_qpair_process_completions(qpair, 0);
1829 
1830 	if (num_completed && !TAILQ_EMPTY(&dev->retry_queue)) {
1831 		ftl_process_retry_queue(dev);
1832 	}
1833 
1834 	return num_completed;
1835 }
1836 
1837 int
1838 ftl_task_core(void *ctx)
1839 {
1840 	struct ftl_thread *thread = ctx;
1841 	struct spdk_ftl_dev *dev = thread->dev;
1842 	struct spdk_nvme_qpair *qpair = ftl_get_write_qpair(dev);
1843 
1844 	if (dev->halt) {
1845 		if (ftl_shutdown_complete(dev)) {
1846 			spdk_poller_unregister(&thread->poller);
1847 			return 0;
1848 		}
1849 	}
1850 
1851 	ftl_process_writes(dev);
1852 	spdk_nvme_qpair_process_completions(qpair, 0);
1853 	ftl_process_relocs(dev);
1854 
1855 	return 0;
1856 }
1857 
1858 SPDK_LOG_REGISTER_COMPONENT("ftl_core", SPDK_LOG_FTL_CORE)
1859