xref: /spdk/lib/ftl/ftl_core.c (revision 9889ab2dc80e40dae92dcef361d53dcba722043d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/io_channel.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk/string.h"
40 #include "spdk_internal/log.h"
41 #include "spdk/ftl.h"
42 #include "spdk/crc32.h"
43 
44 #include "ftl_core.h"
45 #include "ftl_band.h"
46 #include "ftl_io.h"
47 #include "ftl_anm.h"
48 #include "ftl_rwb.h"
49 #include "ftl_debug.h"
50 #include "ftl_reloc.h"
51 
52 struct ftl_band_flush {
53 	struct spdk_ftl_dev		*dev;
54 	/* Number of bands left to be flushed */
55 	size_t				num_bands;
56 	/* User callback */
57 	spdk_ftl_fn			cb_fn;
58 	/* Callback's argument */
59 	void				*cb_arg;
60 	/* List link */
61 	LIST_ENTRY(ftl_band_flush)	list_entry;
62 };
63 
64 struct ftl_wptr {
65 	/* Owner device */
66 	struct spdk_ftl_dev		*dev;
67 
68 	/* Current PPA */
69 	struct ftl_ppa			ppa;
70 
71 	/* Band currently being written to */
72 	struct ftl_band			*band;
73 
74 	/* Current logical block's offset */
75 	uint64_t			offset;
76 
77 	/* Current erase block */
78 	struct ftl_chunk		*chunk;
79 
80 	/* Pending IO queue */
81 	TAILQ_HEAD(, ftl_io)		pending_queue;
82 
83 	/* List link */
84 	LIST_ENTRY(ftl_wptr)		list_entry;
85 
86 	/*
87 	 * If set up in direct mode, there will be no offset or band state update after IO.
88 	 * The PPA is not assigned by wptr, and is instead taken directly from the request.
89 	 */
90 	bool				direct_mode;
91 
92 	/* Number of outstanding write requests */
93 	uint32_t			num_outstanding;
94 
95 	/* Marks that the band related to this wptr needs to be closed as soon as possible */
96 	bool				flush;
97 };
98 
99 struct ftl_flush {
100 	/* Owner device */
101 	struct spdk_ftl_dev		*dev;
102 
103 	/* Number of batches to wait for */
104 	size_t				num_req;
105 
106 	/* Callback */
107 	struct {
108 		spdk_ftl_fn		fn;
109 		void			*ctx;
110 	} cb;
111 
112 	/* Batch bitmap */
113 	struct spdk_bit_array		*bmap;
114 
115 	/* List link */
116 	LIST_ENTRY(ftl_flush)		list_entry;
117 };
118 
119 static int
120 ftl_rwb_flags_from_io(const struct ftl_io *io)
121 {
122 	int valid_flags = FTL_IO_INTERNAL | FTL_IO_WEAK | FTL_IO_PAD;
123 	return io->flags & valid_flags;
124 }
125 
126 static int
127 ftl_rwb_entry_weak(const struct ftl_rwb_entry *entry)
128 {
129 	return entry->flags & FTL_IO_WEAK;
130 }
131 
132 static void
133 ftl_wptr_free(struct ftl_wptr *wptr)
134 {
135 	if (!wptr) {
136 		return;
137 	}
138 
139 	free(wptr);
140 }
141 
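/*
 * Remove a write pointer from the device's wptr list. If the band was being
 * flushed, decrement the outstanding band count of every pending flush request
 * and complete the ones that have no bands left.
 */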
142 static void
143 ftl_remove_wptr(struct ftl_wptr *wptr)
144 {
145 	struct spdk_ftl_dev *dev = wptr->dev;
146 	struct ftl_band_flush *flush, *tmp;
147 
148 	if (spdk_unlikely(wptr->flush)) {
149 		LIST_FOREACH_SAFE(flush, &dev->band_flush_list, list_entry, tmp) {
150 			assert(flush->num_bands > 0);
151 			if (--flush->num_bands == 0) {
152 				flush->cb_fn(flush->cb_arg, 0);
153 				LIST_REMOVE(flush, list_entry);
154 				free(flush);
155 			}
156 		}
157 	}
158 
159 	LIST_REMOVE(wptr, list_entry);
160 	ftl_wptr_free(wptr);
161 }
162 
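/*
 * Generic NVMe completion callback shared by the read, write and erase paths.
 * Records any error status, traces the completion and completes the ftl_io
 * once all of its outstanding requests have finished.
 */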
163 static void
164 ftl_io_cmpl_cb(void *arg, const struct spdk_nvme_cpl *status)
165 {
166 	struct ftl_io *io = arg;
167 
168 	if (spdk_nvme_cpl_is_error(status)) {
169 		ftl_io_process_error(io, status);
170 	}
171 
172 	ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_DISK);
173 
174 	ftl_io_dec_req(io);
175 	if (ftl_io_done(io)) {
176 		ftl_io_complete(io);
177 	}
178 }
179 
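/*
 * Stop writing to a band after a failed write. Unless an earlier failure
 * already took care of it (high_prio flag set), mark the band as failed and
 * remove its write pointer.
 */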
180 static void
181 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
182 {
183 	struct ftl_wptr *wptr = NULL;
184 
185 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
186 		if (wptr->band == band) {
187 			break;
188 		}
189 	}
190 
191 	/* If the band already has the high_prio flag set, other writes must */
192 	/* have failed earlier, so it's already taken care of. */
193 	if (band->high_prio) {
194 		assert(wptr == NULL);
195 		return;
196 	}
197 
198 	ftl_band_write_failed(band);
199 	ftl_remove_wptr(wptr);
200 }
201 
202 static struct ftl_wptr *
203 ftl_wptr_from_band(struct ftl_band *band)
204 {
205 	struct spdk_ftl_dev *dev = band->dev;
206 	struct ftl_wptr *wptr = NULL;
207 
208 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
209 		if (wptr->band == band) {
210 			return wptr;
211 		}
212 	}
213 
214 	return NULL;
215 }
216 
217 static void
218 ftl_md_write_fail(struct ftl_io *io, int status)
219 {
220 	struct ftl_band *band = io->band;
221 	struct ftl_wptr *wptr;
222 	char buf[128];
223 
224 	wptr = ftl_wptr_from_band(band);
225 	assert(wptr);
226 
227 	SPDK_ERRLOG("Metadata write failed @ppa: %s, status: %d\n",
228 		    ftl_ppa2str(wptr->ppa, buf, sizeof(buf)), status);
229 
230 	ftl_halt_writes(io->dev, band);
231 }
232 
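/*
 * Completion callback for band head/tail metadata writes. On success the band
 * moves to its next state; once it's closed, its user blocks are returned to
 * the non-volatile cache budget (if one is present), relocation references held
 * against other bands are dropped and the write pointer is removed.
 */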
233 static void
234 ftl_md_write_cb(struct ftl_io *io, void *arg, int status)
235 {
236 	struct spdk_ftl_dev *dev = io->dev;
237 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
238 	struct ftl_band *band = io->band;
239 	struct ftl_wptr *wptr;
240 	size_t id;
241 
242 	wptr = ftl_wptr_from_band(band);
243 	assert(wptr);
244 
245 	if (status) {
246 		ftl_md_write_fail(io, status);
247 		return;
248 	}
249 
250 	ftl_band_set_next_state(band);
251 	if (band->state == FTL_BAND_STATE_CLOSED) {
252 		if (ftl_dev_has_nv_cache(dev)) {
253 			pthread_spin_lock(&nv_cache->lock);
254 			nv_cache->num_available += ftl_band_user_lbks(band);
255 
256 			if (spdk_unlikely(nv_cache->num_available > nv_cache->num_data_blocks)) {
257 				nv_cache->num_available = nv_cache->num_data_blocks;
258 			}
259 			pthread_spin_unlock(&nv_cache->lock);
260 		}
261 
262 		/*
263 		 * Go through the reloc_bitmap, checking for all the bands that had their data moved
264 		 * onto the current band, and update their counters to allow them to be used for writing
265 		 * (once they're closed and empty).
266 		 */
267 		for (id = 0; id < ftl_dev_num_bands(dev); ++id) {
268 			if (spdk_bit_array_get(band->reloc_bitmap, id)) {
269 				assert(dev->bands[id].num_reloc_bands > 0);
270 				dev->bands[id].num_reloc_bands--;
271 
272 				spdk_bit_array_clear(band->reloc_bitmap, id);
273 			}
274 		}
275 
276 		ftl_remove_wptr(wptr);
277 	}
278 }
279 
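/*
 * Calculate the starting PPA for the next part of a PPA-addressed read and
 * return the number of contiguous blocks that can be read in a single command.
 * Metadata reads are capped so that they never cross an xfer_size boundary,
 * matching the way the metadata was written.
 */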
280 static int
281 ftl_ppa_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa)
282 {
283 	struct spdk_ftl_dev *dev = io->dev;
284 	size_t lbk_cnt, max_lbks;
285 
286 	assert(ftl_io_mode_ppa(io));
287 	assert(io->iov_pos < io->iov_cnt);
288 
289 	if (io->pos == 0) {
290 		*ppa = io->ppa;
291 	} else {
292 		*ppa = ftl_band_next_xfer_ppa(io->band, io->ppa, io->pos);
293 	}
294 
295 	assert(!ftl_ppa_invalid(*ppa));
296 
297 	/* Metadata has to be read in the way it's written (jumping across */
298 	/* the chunks in xfer_size increments) */
299 	if (io->flags & FTL_IO_MD) {
300 		max_lbks = dev->xfer_size - (ppa->lbk % dev->xfer_size);
301 		lbk_cnt = spdk_min(ftl_io_iovec_len_left(io), max_lbks);
302 		assert(ppa->lbk / dev->xfer_size == (ppa->lbk + lbk_cnt - 1) / dev->xfer_size);
303 	} else {
304 		lbk_cnt = ftl_io_iovec_len_left(io);
305 	}
306 
307 	return lbk_cnt;
308 }
309 
310 static int
311 ftl_wptr_close_band(struct ftl_wptr *wptr)
312 {
313 	struct ftl_band *band = wptr->band;
314 
315 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
316 
317 	return ftl_band_write_tail_md(band, ftl_md_write_cb);
318 }
319 
320 static int
321 ftl_wptr_open_band(struct ftl_wptr *wptr)
322 {
323 	struct ftl_band *band = wptr->band;
324 
325 	assert(ftl_band_chunk_is_first(band, wptr->chunk));
326 	assert(band->lba_map.num_vld == 0);
327 
328 	ftl_band_clear_lba_map(band);
329 
330 	assert(band->state == FTL_BAND_STATE_PREP);
331 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
332 
333 	return ftl_band_write_head_md(band, ftl_md_write_cb);
334 }
335 
336 static int
337 ftl_submit_erase(struct ftl_io *io)
338 {
339 	struct spdk_ftl_dev *dev = io->dev;
340 	struct ftl_band *band = io->band;
341 	struct ftl_ppa ppa = io->ppa;
342 	struct ftl_chunk *chunk;
343 	uint64_t ppa_packed;
344 	int rc = 0;
345 	size_t i;
346 
347 	for (i = 0; i < io->lbk_cnt; ++i) {
348 		if (i != 0) {
349 			chunk = ftl_band_next_chunk(band, ftl_band_chunk_from_ppa(band, ppa));
350 			assert(chunk->state == FTL_CHUNK_STATE_CLOSED ||
351 			       chunk->state == FTL_CHUNK_STATE_VACANT);
352 			ppa = chunk->start_ppa;
353 		}
354 
355 		assert(ppa.lbk == 0);
356 		ppa_packed = ftl_ppa_addr_pack(dev, ppa);
357 
358 		ftl_trace_submission(dev, io, ppa, 1);
359 		rc = spdk_nvme_ocssd_ns_cmd_vector_reset(dev->ns, ftl_get_write_qpair(dev),
360 				&ppa_packed, 1, NULL, ftl_io_cmpl_cb, io);
361 		if (spdk_unlikely(rc)) {
362 			ftl_io_fail(io, rc);
363 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
364 			break;
365 		}
366 
367 		ftl_io_inc_req(io);
368 		ftl_io_advance(io, 1);
369 	}
370 
371 	if (ftl_io_done(io)) {
372 		ftl_io_complete(io);
373 	}
374 
375 	return rc;
376 }
377 
378 static void
379 _ftl_io_erase(void *ctx)
380 {
381 	ftl_io_erase((struct ftl_io *)ctx);
382 }
383 
384 static bool
385 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
386 {
387 	return dev->core_thread.thread == spdk_get_thread();
388 }
389 
390 static bool
391 ftl_check_read_thread(const struct spdk_ftl_dev *dev)
392 {
393 	return dev->read_thread.thread == spdk_get_thread();
394 }
395 
396 struct spdk_io_channel *
397 ftl_get_io_channel(const struct spdk_ftl_dev *dev)
398 {
399 	if (ftl_check_core_thread(dev)) {
400 		return dev->core_thread.ioch;
401 	}
402 	if (ftl_check_read_thread(dev)) {
403 		return dev->read_thread.ioch;
404 	}
405 
406 	assert(0);
407 	return NULL;
408 }
409 
410 int
411 ftl_io_erase(struct ftl_io *io)
412 {
413 	struct spdk_ftl_dev *dev = io->dev;
414 
415 	if (ftl_check_core_thread(dev)) {
416 		return ftl_submit_erase(io);
417 	}
418 
419 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_erase, io);
420 	return 0;
421 }
422 
423 static struct ftl_band *
424 ftl_next_write_band(struct spdk_ftl_dev *dev)
425 {
426 	struct ftl_band *band;
427 
428 	/* Find a free band that has all of its data moved onto other closed bands */
429 	LIST_FOREACH(band, &dev->free_bands, list_entry) {
430 		assert(band->state == FTL_BAND_STATE_FREE);
431 		if (band->num_reloc_bands == 0 && band->num_reloc_blocks == 0) {
432 			break;
433 		}
434 	}
435 
436 	if (spdk_unlikely(!band)) {
437 		return NULL;
438 	}
439 
440 	if (ftl_band_erase(band)) {
441 		/* TODO: handle erase failure */
442 		return NULL;
443 	}
444 
445 	return band;
446 }
447 
448 static struct ftl_band *
449 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
450 {
451 	struct ftl_band *band;
452 
453 	if (!dev->next_band) {
454 		band = ftl_next_write_band(dev);
455 	} else {
456 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
457 		band = dev->next_band;
458 		dev->next_band = NULL;
459 	}
460 
461 	return band;
462 }
463 
464 static struct ftl_wptr *
465 ftl_wptr_init(struct ftl_band *band)
466 {
467 	struct spdk_ftl_dev *dev = band->dev;
468 	struct ftl_wptr *wptr;
469 
470 	wptr = calloc(1, sizeof(*wptr));
471 	if (!wptr) {
472 		return NULL;
473 	}
474 
475 	wptr->dev = dev;
476 	wptr->band = band;
477 	wptr->chunk = CIRCLEQ_FIRST(&band->chunks);
478 	wptr->ppa = wptr->chunk->start_ppa;
479 	TAILQ_INIT(&wptr->pending_queue);
480 
481 	return wptr;
482 }
483 
484 static int
485 ftl_add_direct_wptr(struct ftl_band *band)
486 {
487 	struct spdk_ftl_dev *dev = band->dev;
488 	struct ftl_wptr *wptr;
489 
490 	assert(band->state == FTL_BAND_STATE_OPEN);
491 
492 	wptr = ftl_wptr_init(band);
493 	if (!wptr) {
494 		return -1;
495 	}
496 
497 	wptr->direct_mode = true;
498 
499 	if (ftl_band_alloc_lba_map(band)) {
500 		ftl_wptr_free(wptr);
501 		return -1;
502 	}
503 
504 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
505 
506 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: direct band %u\n", band->id);
507 	ftl_trace_write_band(dev, band);
508 	return 0;
509 }
510 
511 static void
512 ftl_close_direct_wptr(struct ftl_band *band)
513 {
514 	struct ftl_wptr *wptr = ftl_wptr_from_band(band);
515 
516 	assert(wptr);
517 	assert(wptr->direct_mode);
518 	assert(band->state == FTL_BAND_STATE_CLOSED);
519 
520 	ftl_band_release_lba_map(band);
521 
522 	ftl_remove_wptr(wptr);
523 }
524 
525 int
526 ftl_band_set_direct_access(struct ftl_band *band, bool access)
527 {
528 	if (access) {
529 		return ftl_add_direct_wptr(band);
530 	} else {
531 		ftl_close_direct_wptr(band);
532 		return 0;
533 	}
534 }
535 
536 static int
537 ftl_add_wptr(struct spdk_ftl_dev *dev)
538 {
539 	struct ftl_band *band;
540 	struct ftl_wptr *wptr;
541 
542 	band = ftl_next_wptr_band(dev);
543 	if (!band) {
544 		return -1;
545 	}
546 
547 	wptr = ftl_wptr_init(band);
548 	if (!wptr) {
549 		return -1;
550 	}
551 
552 	if (ftl_band_write_prep(band)) {
553 		ftl_wptr_free(wptr);
554 		return -1;
555 	}
556 
557 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
558 
559 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: band %u\n", band->id);
560 	ftl_trace_write_band(dev, band);
561 	return 0;
562 }
563 
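/*
 * Advance the write pointer by xfer_size blocks: update the band offset
 * (marking the band FULL once it's been completely written), mark the current
 * chunk busy and move on to the next operational chunk. Once the band fills up
 * past the configured threshold, the next write band is prepared ahead of time.
 */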
564 static void
565 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
566 {
567 	struct ftl_band *band = wptr->band;
568 	struct spdk_ftl_dev *dev = wptr->dev;
569 	struct spdk_ftl_conf *conf = &dev->conf;
570 	size_t next_thld;
571 
572 	if (spdk_unlikely(wptr->direct_mode)) {
573 		return;
574 	}
575 
576 	wptr->offset += xfer_size;
577 	next_thld = (ftl_band_num_usable_lbks(band) * conf->band_thld) / 100;
578 
579 	if (ftl_band_full(band, wptr->offset)) {
580 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
581 	}
582 
583 	wptr->chunk->busy = true;
584 	wptr->ppa = ftl_band_next_xfer_ppa(band, wptr->ppa, xfer_size);
585 	wptr->chunk = ftl_band_next_operational_chunk(band, wptr->chunk);
586 
587 	assert(!ftl_ppa_invalid(wptr->ppa));
588 
589 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: grp:%d, pu:%d chunk:%d, lbk:%u\n",
590 		      wptr->ppa.grp, wptr->ppa.pu, wptr->ppa.chk, wptr->ppa.lbk);
591 
592 	if (wptr->offset >= next_thld && !dev->next_band) {
593 		dev->next_band = ftl_next_write_band(dev);
594 	}
595 }
596 
597 static size_t
598 ftl_wptr_user_lbks_left(const struct ftl_wptr *wptr)
599 {
600 	return ftl_band_user_lbks_left(wptr->band, wptr->offset);
601 }
602 
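/*
 * Check whether the write pointer can accept user data. Returns 0 while the
 * current chunk isn't writable (bad chunks are skipped), the band is changing
 * state, or the band still has to be opened or closed, in which case the
 * appropriate metadata write is scheduled from here.
 */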
603 static int
604 ftl_wptr_ready(struct ftl_wptr *wptr)
605 {
606 	struct ftl_band *band = wptr->band;
607 
608 	/* TODO: add handling of empty bands */
609 
610 	if (spdk_unlikely(!ftl_chunk_is_writable(wptr->chunk))) {
611 		/* Erasing a band may fail after it has been assigned to the wptr. */
612 		if (spdk_unlikely(wptr->chunk->state == FTL_CHUNK_STATE_BAD)) {
613 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
614 		}
615 		return 0;
616 	}
617 
618 	/* If we're in the process of writing metadata, wait till it is */
619 	/* completed. */
620 	/* TODO: we should probably change bands once we're writing tail md */
621 	if (ftl_band_state_changing(band)) {
622 		return 0;
623 	}
624 
625 	if (band->state == FTL_BAND_STATE_FULL) {
626 		if (wptr->num_outstanding == 0) {
627 			if (ftl_wptr_close_band(wptr)) {
628 				/* TODO: need recovery here */
629 				assert(false);
630 			}
631 		}
632 
633 		return 0;
634 	}
635 
636 	if (band->state != FTL_BAND_STATE_OPEN) {
637 		if (ftl_wptr_open_band(wptr)) {
638 			/* TODO: need recovery here */
639 			assert(false);
640 		}
641 
642 		return 0;
643 	}
644 
645 	return 1;
646 }
647 
648 int
649 ftl_flush_active_bands(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
650 {
651 	struct ftl_wptr *wptr;
652 	struct ftl_band_flush *flush;
653 
654 	assert(ftl_get_core_thread(dev) == spdk_get_thread());
655 
656 	flush = calloc(1, sizeof(*flush));
657 	if (spdk_unlikely(!flush)) {
658 		return -ENOMEM;
659 	}
660 
661 	LIST_INSERT_HEAD(&dev->band_flush_list, flush, list_entry);
662 
663 	flush->cb_fn = cb_fn;
664 	flush->cb_arg = cb_arg;
665 	flush->dev = dev;
666 
667 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
668 		wptr->flush = true;
669 		flush->num_bands++;
670 	}
671 
672 	return 0;
673 }
674 
675 static const struct spdk_ftl_limit *
676 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
677 {
678 	assert(type < SPDK_FTL_LIMIT_MAX);
679 	return &dev->conf.limits[type];
680 }
681 
682 static bool
683 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
684 {
685 	struct ftl_ppa ppa;
686 
687 	/* If the LBA is invalid don't bother checking the md and l2p */
688 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
689 		return false;
690 	}
691 
692 	ppa = ftl_l2p_get(dev, entry->lba);
693 	if (!(ftl_ppa_cached(ppa) && ppa.offset == entry->pos)) {
694 		return false;
695 	}
696 
697 	return true;
698 }
699 
700 static void
701 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
702 {
703 	pthread_spin_lock(&entry->lock);
704 
705 	if (!ftl_rwb_entry_valid(entry)) {
706 		goto unlock;
707 	}
708 
709 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
710 	/* on-disk PPA and clear the cache status bit. Otherwise, skip the l2p update */
711 	/* and just clear the cache status. */
712 	if (!ftl_cache_lba_valid(dev, entry)) {
713 		goto clear;
714 	}
715 
716 	ftl_l2p_set(dev, entry->lba, entry->ppa);
717 clear:
718 	ftl_rwb_entry_invalidate(entry);
719 unlock:
720 	pthread_spin_unlock(&entry->lock);
721 }
722 
723 static struct ftl_rwb_entry *
724 ftl_acquire_entry(struct spdk_ftl_dev *dev, int flags)
725 {
726 	struct ftl_rwb_entry *entry;
727 
728 	entry = ftl_rwb_acquire(dev->rwb, ftl_rwb_type_from_flags(flags));
729 	if (!entry) {
730 		return NULL;
731 	}
732 
733 	ftl_evict_cache_entry(dev, entry);
734 
735 	entry->flags = flags;
736 	return entry;
737 }
738 
739 static void
740 ftl_rwb_pad(struct spdk_ftl_dev *dev, size_t size)
741 {
742 	struct ftl_rwb_entry *entry;
743 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
744 
745 	for (size_t i = 0; i < size; ++i) {
746 		entry = ftl_acquire_entry(dev, flags);
747 		if (!entry) {
748 			break;
749 		}
750 
751 		entry->lba = FTL_LBA_INVALID;
752 		entry->ppa = ftl_to_ppa(FTL_PPA_INVALID);
753 		memset(entry->data, 0, FTL_BLOCK_SIZE);
754 		ftl_rwb_push(entry);
755 	}
756 }
757 
758 static void
759 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
760 {
761 	while (!LIST_EMPTY(&dev->free_bands)) {
762 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
763 	}
764 
765 	dev->next_band = NULL;
766 }
767 
768 static void
769 ftl_wptr_pad_band(struct ftl_wptr *wptr)
770 {
771 	struct spdk_ftl_dev *dev = wptr->dev;
772 	size_t size = ftl_rwb_num_pending(dev->rwb);
773 	size_t blocks_left, rwb_size, pad_size;
774 
775 	blocks_left = ftl_wptr_user_lbks_left(wptr);
776 	assert(size <= blocks_left);
777 	assert(blocks_left % dev->xfer_size == 0);
778 	rwb_size = ftl_rwb_size(dev->rwb) - size;
779 	pad_size = spdk_min(blocks_left - size, rwb_size);
780 
781 	/* Pad write buffer until band is full */
782 	ftl_rwb_pad(dev, pad_size);
783 }
784 
785 static void
786 ftl_wptr_process_shutdown(struct ftl_wptr *wptr)
787 {
788 	struct spdk_ftl_dev *dev = wptr->dev;
789 	size_t size = ftl_rwb_num_pending(dev->rwb);
790 	size_t num_active = dev->xfer_size * ftl_rwb_get_active_batches(dev->rwb);
791 
792 	num_active = num_active ? num_active : dev->xfer_size;
793 	if (size >= num_active) {
794 		return;
795 	}
796 
797 	/* If we reach this point we need to remove free bands */
798 	/* and pad current wptr band to the end */
799 	if (ftl_rwb_get_active_batches(dev->rwb) <= 1) {
800 		ftl_remove_free_bands(dev);
801 	}
802 
803 	ftl_wptr_pad_band(wptr);
804 }
805 
806 static int
807 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
808 {
809 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
810 	       LIST_EMPTY(&dev->wptr_list) && TAILQ_EMPTY(&dev->retry_queue);
811 }
812 
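/*
 * Recalculate the write limits based on the number of free bands: pick the
 * strictest limit whose threshold has been crossed and scale the user's share
 * of the write buffer accordingly, or clear the limits if none apply.
 */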
813 void
814 ftl_apply_limits(struct spdk_ftl_dev *dev)
815 {
816 	const struct spdk_ftl_limit *limit;
817 	struct ftl_stats *stats = &dev->stats;
818 	size_t rwb_limit[FTL_RWB_TYPE_MAX];
819 	int i;
820 
821 	ftl_rwb_get_limits(dev->rwb, rwb_limit);
822 
823 	/* Clear existing limit */
824 	dev->limit = SPDK_FTL_LIMIT_MAX;
825 
826 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
827 		limit = ftl_get_limit(dev, i);
828 
829 		if (dev->num_free <= limit->thld) {
830 			rwb_limit[FTL_RWB_TYPE_USER] =
831 				(limit->limit * ftl_rwb_entry_cnt(dev->rwb)) / 100;
832 			stats->limits[i]++;
833 			dev->limit = i;
834 			goto apply;
835 		}
836 	}
837 
838 	/* Clear the limits, since we don't need to apply them anymore */
839 	rwb_limit[FTL_RWB_TYPE_USER] = ftl_rwb_entry_cnt(dev->rwb);
840 apply:
841 	ftl_trace_limits(dev, rwb_limit, dev->num_free);
842 	ftl_rwb_set_limits(dev->rwb, rwb_limit);
843 }
844 
845 static int
846 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
847 {
848 	struct ftl_band *band = ftl_band_from_ppa(dev, ppa);
849 	struct ftl_lba_map *lba_map = &band->lba_map;
850 	uint64_t offset;
851 
852 	offset = ftl_band_lbkoff_from_ppa(band, ppa);
853 
854 	/* The bit might already be cleared if two writes are scheduled to the */
855 	/* same LBA at the same time */
856 	if (spdk_bit_array_get(lba_map->vld, offset)) {
857 		assert(lba_map->num_vld > 0);
858 		spdk_bit_array_clear(lba_map->vld, offset);
859 		lba_map->num_vld--;
860 		return 1;
861 	}
862 
863 	return 0;
864 }
865 
866 int
867 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
868 {
869 	struct ftl_band *band;
870 	int rc;
871 
872 	assert(!ftl_ppa_cached(ppa));
873 	band = ftl_band_from_ppa(dev, ppa);
874 
875 	pthread_spin_lock(&band->lba_map.lock);
876 	rc = ftl_invalidate_addr_unlocked(dev, ppa);
877 	pthread_spin_unlock(&band->lba_map.lock);
878 
879 	return rc;
880 }
881 
882 static int
883 ftl_read_retry(int rc)
884 {
885 	return rc == -EAGAIN;
886 }
887 
888 static int
889 ftl_read_canceled(int rc)
890 {
891 	return rc == -EFAULT || rc == 0;
892 }
893 
894 static void
895 ftl_add_to_retry_queue(struct ftl_io *io)
896 {
897 	if (!(io->flags & FTL_IO_RETRY)) {
898 		io->flags |= FTL_IO_RETRY;
899 		TAILQ_INSERT_TAIL(&io->dev->retry_queue, io, retry_entry);
900 	}
901 }
902 
903 static int
904 ftl_ppa_cache_read(struct ftl_io *io, uint64_t lba,
905 		   struct ftl_ppa ppa, void *buf)
906 {
907 	struct ftl_rwb *rwb = io->dev->rwb;
908 	struct ftl_rwb_entry *entry;
909 	struct ftl_ppa nppa;
910 	int rc = 0;
911 
912 	entry = ftl_rwb_entry_from_offset(rwb, ppa.offset);
913 	pthread_spin_lock(&entry->lock);
914 
915 	nppa = ftl_l2p_get(io->dev, lba);
916 	if (ppa.ppa != nppa.ppa) {
917 		rc = -1;
918 		goto out;
919 	}
920 
921 	memcpy(buf, entry->data, FTL_BLOCK_SIZE);
922 out:
923 	pthread_spin_unlock(&entry->lock);
924 	return rc;
925 }
926 
927 static int
928 ftl_lba_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa)
929 {
930 	struct spdk_ftl_dev *dev = io->dev;
931 	struct ftl_ppa next_ppa;
932 	size_t i;
933 
934 	*ppa = ftl_l2p_get(dev, ftl_io_current_lba(io));
935 
936 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Read ppa:%lx, lba:%lu\n",
937 		      ppa->ppa, ftl_io_current_lba(io));
938 
939 	/* If the PPA is invalid, skip it (the buffer should already be zeroed) */
940 	if (ftl_ppa_invalid(*ppa)) {
941 		return -EFAULT;
942 	}
943 
944 	if (ftl_ppa_cached(*ppa)) {
945 		if (!ftl_ppa_cache_read(io, ftl_io_current_lba(io), *ppa, ftl_io_iovec_addr(io))) {
946 			return 0;
947 		}
948 
949 		/* If the state changed, we have to re-read the l2p */
950 		return -EAGAIN;
951 	}
952 
953 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
954 		next_ppa = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));
955 
956 		if (ftl_ppa_invalid(next_ppa) || ftl_ppa_cached(next_ppa)) {
957 			break;
958 		}
959 
960 		if (ftl_ppa_addr_pack(dev, *ppa) + i != ftl_ppa_addr_pack(dev, next_ppa)) {
961 			break;
962 		}
963 	}
964 
965 	return i;
966 }
967 
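/*
 * Translate the IO's logical (or physical) addresses into contiguous runs of
 * blocks and issue an NVMe read for each run. Blocks served from the write
 * buffer or pointing at invalid addresses are completed without touching the
 * drive, and -ENOMEM from the driver puts the IO on the retry queue.
 */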
968 static int
969 ftl_submit_read(struct ftl_io *io)
970 {
971 	struct spdk_ftl_dev *dev = io->dev;
972 	struct ftl_ppa ppa;
973 	int rc = 0, lbk_cnt;
974 
975 	assert(LIST_EMPTY(&io->children));
976 
977 	while (io->pos < io->lbk_cnt) {
978 		if (ftl_io_mode_ppa(io)) {
979 			lbk_cnt = rc = ftl_ppa_read_next_ppa(io, &ppa);
980 		} else {
981 			lbk_cnt = rc = ftl_lba_read_next_ppa(io, &ppa);
982 		}
983 
984 		/* We might need to retry the read from scratch (e.g. */
985 		/* because a write was under way and completed before */
986 		/* we could read it from the rwb) */
987 		if (ftl_read_retry(rc)) {
988 			continue;
989 		}
990 
991 		/* We don't have to schedule the read, as it was read from cache */
992 		if (ftl_read_canceled(rc)) {
993 			ftl_io_advance(io, 1);
994 			ftl_trace_completion(io->dev, io, rc ? FTL_TRACE_COMPLETION_INVALID :
995 					     FTL_TRACE_COMPLETION_CACHE);
996 			rc = 0;
997 			continue;
998 		}
999 
1000 		assert(lbk_cnt > 0);
1001 
1002 		ftl_trace_submission(dev, io, ppa, lbk_cnt);
1003 		rc = spdk_nvme_ns_cmd_read(dev->ns, ftl_get_read_qpair(dev),
1004 					   ftl_io_iovec_addr(io),
1005 					   ftl_ppa_addr_pack(io->dev, ppa), lbk_cnt,
1006 					   ftl_io_cmpl_cb, io, 0);
1007 		if (spdk_unlikely(rc)) {
1008 			if (rc == -ENOMEM) {
1009 				ftl_add_to_retry_queue(io);
1010 			} else {
1011 				ftl_io_fail(io, rc);
1012 			}
1013 			break;
1014 		}
1015 
1016 		ftl_io_inc_req(io);
1017 		ftl_io_advance(io, lbk_cnt);
1018 	}
1019 
1020 	/* If we didn't have to read anything from the device, */
1021 	/* complete the request right away */
1022 	if (ftl_io_done(io)) {
1023 		ftl_io_complete(io);
1024 	}
1025 
1026 	return rc;
1027 }
1028 
1029 static void
1030 ftl_complete_flush(struct ftl_flush *flush)
1031 {
1032 	assert(flush->num_req == 0);
1033 	LIST_REMOVE(flush, list_entry);
1034 
1035 	flush->cb.fn(flush->cb.ctx, 0);
1036 
1037 	spdk_bit_array_free(&flush->bmap);
1038 	free(flush);
1039 }
1040 
1041 static void
1042 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_rwb_batch *batch)
1043 {
1044 	struct ftl_flush *flush, *tflush;
1045 	size_t offset;
1046 
1047 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
1048 		offset = ftl_rwb_batch_get_offset(batch);
1049 
1050 		if (spdk_bit_array_get(flush->bmap, offset)) {
1051 			spdk_bit_array_clear(flush->bmap, offset);
1052 			if (!(--flush->num_req)) {
1053 				ftl_complete_flush(flush);
1054 			}
1055 		}
1056 	}
1057 }
1058 
1059 static void
1060 ftl_nv_cache_wrap_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1061 {
1062 	struct ftl_nv_cache *nv_cache = cb_arg;
1063 
1064 	if (!success) {
1065 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header\n");
1066 		/* TODO: go into read-only mode */
1067 		assert(0);
1068 	}
1069 
1070 	pthread_spin_lock(&nv_cache->lock);
1071 	nv_cache->ready = true;
1072 	pthread_spin_unlock(&nv_cache->lock);
1073 
1074 	spdk_bdev_free_io(bdev_io);
1075 }
1076 
1077 static void
1078 ftl_nv_cache_wrap(void *ctx)
1079 {
1080 	struct ftl_nv_cache *nv_cache = ctx;
1081 	int rc;
1082 
1083 	rc = ftl_nv_cache_write_header(nv_cache, false, ftl_nv_cache_wrap_cb, nv_cache);
1084 	if (spdk_unlikely(rc != 0)) {
1085 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header: %s\n",
1086 			    spdk_strerror(-rc));
1087 		/* TODO: go into read-only mode */
1088 		assert(0);
1089 	}
1090 }
1091 
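/*
 * Reserve up to *num_lbks blocks in the non-volatile cache, clamping the
 * request to the available space, the per-request maximum and the end of the
 * cache bdev. Returns the starting cache address (FTL_LBA_INVALID if nothing
 * could be reserved) and the current phase. When the end of the cache is
 * reached, the phase is advanced and a new header is written before the cache
 * is reused.
 */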
1092 static uint64_t
1093 ftl_reserve_nv_cache(struct ftl_nv_cache *nv_cache, size_t *num_lbks, unsigned int *phase)
1094 {
1095 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1096 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1097 	uint64_t num_available, cache_size, cache_addr = FTL_LBA_INVALID;
1098 
1099 	cache_size = spdk_bdev_get_num_blocks(bdev);
1100 
1101 	pthread_spin_lock(&nv_cache->lock);
1102 	if (spdk_unlikely(nv_cache->num_available == 0 || !nv_cache->ready)) {
1103 		goto out;
1104 	}
1105 
1106 	num_available = spdk_min(nv_cache->num_available, *num_lbks);
1107 	num_available = spdk_min(num_available, dev->conf.nv_cache.max_request_cnt);
1108 
1109 	if (spdk_unlikely(nv_cache->current_addr + num_available > cache_size)) {
1110 		*num_lbks = cache_size - nv_cache->current_addr;
1111 	} else {
1112 		*num_lbks = num_available;
1113 	}
1114 
1115 	cache_addr = nv_cache->current_addr;
1116 	nv_cache->current_addr += *num_lbks;
1117 	nv_cache->num_available -= *num_lbks;
1118 	*phase = nv_cache->phase;
1119 
1120 	if (nv_cache->current_addr == spdk_bdev_get_num_blocks(bdev)) {
1121 		nv_cache->current_addr = FTL_NV_CACHE_DATA_OFFSET;
1122 		nv_cache->phase = ftl_nv_cache_next_phase(nv_cache->phase);
1123 		nv_cache->ready = false;
1124 		spdk_thread_send_msg(ftl_get_core_thread(dev), ftl_nv_cache_wrap, nv_cache);
1125 	}
1126 out:
1127 	pthread_spin_unlock(&nv_cache->lock);
1128 	return cache_addr;
1129 }
1130 
1131 static struct ftl_io *
1132 ftl_alloc_io_nv_cache(struct ftl_io *parent, size_t num_lbks)
1133 {
1134 	struct ftl_io_init_opts opts = {
1135 		.dev		= parent->dev,
1136 		.parent		= parent,
1137 		.data		= ftl_io_iovec_addr(parent),
1138 		.lbk_cnt	= num_lbks,
1139 		.flags		= parent->flags | FTL_IO_CACHE,
1140 	};
1141 
1142 	return ftl_io_init_internal(&opts);
1143 }
1144 
1145 static void
1146 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1147 {
1148 	struct ftl_io *io = cb_arg;
1149 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1150 
1151 	if (spdk_unlikely(!success)) {
1152 		SPDK_ERRLOG("Non-volatile cache write failed at %"PRIx64"\n", io->ppa.ppa);
1153 		io->status = -EIO;
1154 	}
1155 
1156 	ftl_io_dec_req(io);
1157 	if (ftl_io_done(io)) {
1158 		spdk_mempool_put(nv_cache->md_pool, io->md);
1159 		ftl_io_complete(io);
1160 	}
1161 
1162 	spdk_bdev_free_io(bdev_io);
1163 }
1164 
1165 static void
1166 ftl_submit_nv_cache(void *ctx)
1167 {
1168 	struct ftl_io *io = ctx;
1169 	struct spdk_ftl_dev *dev = io->dev;
1170 	struct spdk_thread *thread;
1171 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1172 	struct ftl_io_channel *ioch;
1173 	int rc;
1174 
1175 	ioch = spdk_io_channel_get_ctx(io->ioch);
1176 	thread = spdk_io_channel_get_thread(io->ioch);
1177 
1178 	rc = spdk_bdev_write_blocks_with_md(nv_cache->bdev_desc, ioch->cache_ioch,
1179 					    ftl_io_iovec_addr(io), io->md, io->ppa.ppa,
1180 					    io->lbk_cnt, ftl_nv_cache_submit_cb, io);
1181 	if (rc == -ENOMEM) {
1182 		spdk_thread_send_msg(thread, ftl_submit_nv_cache, io);
1183 		return;
1184 	} else if (rc) {
1185 		SPDK_ERRLOG("Write to persistent cache failed: %s (%"PRIu64", %"PRIu64")\n",
1186 			    spdk_strerror(-rc), io->ppa.ppa, io->lbk_cnt);
1187 		spdk_mempool_put(nv_cache->md_pool, io->md);
1188 		io->status = -EIO;
1189 		ftl_io_complete(io);
1190 		return;
1191 	}
1192 
1193 	ftl_io_advance(io, io->lbk_cnt);
1194 	ftl_io_inc_req(io);
1195 }
1196 
1197 static void
1198 ftl_nv_cache_fill_md(struct ftl_io *io, unsigned int phase)
1199 {
1200 	struct spdk_bdev *bdev;
1201 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1202 	uint64_t lbk_off, lba;
1203 	void *md_buf = io->md;
1204 
1205 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1206 
1207 	for (lbk_off = 0; lbk_off < io->lbk_cnt; ++lbk_off) {
1208 		lba = ftl_nv_cache_pack_lba(ftl_io_get_lba(io, lbk_off), phase);
1209 		memcpy(md_buf, &lba, sizeof(lba));
1210 		md_buf += spdk_bdev_get_md_size(bdev);
1211 	}
1212 }
1213 
1214 static void
1215 _ftl_write_nv_cache(void *ctx)
1216 {
1217 	struct ftl_io *child, *io = ctx;
1218 	struct spdk_ftl_dev *dev = io->dev;
1219 	struct spdk_thread *thread;
1220 	unsigned int phase;
1221 	uint64_t num_lbks;
1222 
1223 	thread = spdk_io_channel_get_thread(io->ioch);
1224 
1225 	while (io->pos < io->lbk_cnt) {
1226 		num_lbks = ftl_io_iovec_len_left(io);
1227 
1228 		child = ftl_alloc_io_nv_cache(io, num_lbks);
1229 		if (spdk_unlikely(!child)) {
1230 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1231 			return;
1232 		}
1233 
1234 		child->md = spdk_mempool_get(dev->nv_cache.md_pool);
1235 		if (spdk_unlikely(!child->md)) {
1236 			ftl_io_free(child);
1237 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1238 			break;
1239 		}
1240 
1241 		/* Reserve area in the non-volatile cache */
1242 		child->ppa.ppa = ftl_reserve_nv_cache(&dev->nv_cache, &num_lbks, &phase);
1243 		if (child->ppa.ppa == FTL_LBA_INVALID) {
1244 			spdk_mempool_put(dev->nv_cache.md_pool, child->md);
1245 			ftl_io_free(child);
1246 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1247 			break;
1248 		}
1249 
1250 		/* Shrink the IO if there isn't enough room in the cache to fill the whole iovec */
1251 		if (spdk_unlikely(num_lbks != ftl_io_iovec_len_left(io))) {
1252 			ftl_io_shrink_iovec(child, num_lbks);
1253 		}
1254 
1255 		ftl_nv_cache_fill_md(child, phase);
1256 		ftl_submit_nv_cache(child);
1257 	}
1258 
1259 	if (ftl_io_done(io)) {
1260 		ftl_io_complete(io);
1261 	}
1262 }
1263 
1264 static void
1265 ftl_write_nv_cache(struct ftl_io *parent)
1266 {
1267 	ftl_io_reset(parent);
1268 	parent->flags |= FTL_IO_CACHE;
1269 	_ftl_write_nv_cache(parent);
1270 }
1271 
1272 int
1273 ftl_nv_cache_write_header(struct ftl_nv_cache *nv_cache, bool shutdown,
1274 			  spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1275 {
1276 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1277 	struct ftl_nv_cache_header *hdr = nv_cache->dma_buf;
1278 	struct spdk_bdev *bdev;
1279 	struct ftl_io_channel *ioch;
1280 
1281 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1282 	ioch = spdk_io_channel_get_ctx(ftl_get_io_channel(dev));
1283 
1284 	memset(hdr, 0, spdk_bdev_get_block_size(bdev));
1285 
1286 	hdr->phase = (uint8_t)nv_cache->phase;
1287 	hdr->size = spdk_bdev_get_num_blocks(bdev);
1288 	hdr->uuid = dev->uuid;
1289 	hdr->version = FTL_NV_CACHE_HEADER_VERSION;
1290 	hdr->current_addr = shutdown ? nv_cache->current_addr : FTL_LBA_INVALID;
1291 	hdr->checksum = spdk_crc32c_update(hdr, offsetof(struct ftl_nv_cache_header, checksum), 0);
1292 
1293 	return spdk_bdev_write_blocks(nv_cache->bdev_desc, ioch->cache_ioch, hdr, 0, 1,
1294 				      cb_fn, cb_arg);
1295 }
1296 
1297 int
1298 ftl_nv_cache_scrub(struct ftl_nv_cache *nv_cache, spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1299 {
1300 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1301 	struct ftl_io_channel *ioch;
1302 	struct spdk_bdev *bdev;
1303 
1304 	ioch = spdk_io_channel_get_ctx(ftl_get_io_channel(dev));
1305 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1306 
1307 	return spdk_bdev_write_zeroes_blocks(nv_cache->bdev_desc, ioch->cache_ioch, 1,
1308 					     spdk_bdev_get_num_blocks(bdev) - 1,
1309 					     cb_fn, cb_arg);
1310 }
1311 
1312 static void
1313 ftl_write_fail(struct ftl_io *io, int status)
1314 {
1315 	struct ftl_rwb_batch *batch = io->rwb_batch;
1316 	struct spdk_ftl_dev *dev = io->dev;
1317 	struct ftl_rwb_entry *entry;
1318 	struct ftl_band *band;
1319 	char buf[128];
1320 
1321 	entry = ftl_rwb_batch_first_entry(batch);
1322 
1323 	band = ftl_band_from_ppa(io->dev, entry->ppa);
1324 	SPDK_ERRLOG("Write failed @ppa: %s, status: %d\n",
1325 		    ftl_ppa2str(entry->ppa, buf, sizeof(buf)), status);
1326 
1327 	/* Close the band, halt the wptr and defrag */
1328 	ftl_halt_writes(dev, band);
1329 
1330 	ftl_rwb_foreach(entry, batch) {
1331 		/* Invalidate the metadata set by ftl_wptr_process_writes() */
1332 		ftl_invalidate_addr(dev, entry->ppa);
1333 	}
1334 
1335 	/* Reset the batch back to the RWB to resend it later */
1336 	ftl_rwb_batch_revert(batch);
1337 }
1338 
1339 static void
1340 ftl_write_cb(struct ftl_io *io, void *arg, int status)
1341 {
1342 	struct spdk_ftl_dev *dev = io->dev;
1343 	struct ftl_rwb_batch *batch = io->rwb_batch;
1344 	struct ftl_rwb_entry *entry;
1345 	struct ftl_band *band;
1346 
1347 	if (status) {
1348 		ftl_write_fail(io, status);
1349 		return;
1350 	}
1351 
1352 	assert(io->lbk_cnt == dev->xfer_size);
1353 	ftl_rwb_foreach(entry, batch) {
1354 		band = entry->band;
1355 		if (!(io->flags & FTL_IO_MD) && !(entry->flags & FTL_IO_PAD)) {
1356 			/* Verify that the LBA is set for user lbks */
1357 			assert(entry->lba != FTL_LBA_INVALID);
1358 		}
1359 
1360 		if (band != NULL) {
1361 			assert(band->num_reloc_blocks > 0);
1362 			band->num_reloc_blocks--;
1363 		}
1364 
1365 		SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lu, lba:%lu\n",
1366 			      entry->ppa.ppa, entry->lba);
1367 	}
1368 
1369 	ftl_process_flush(dev, batch);
1370 	ftl_rwb_batch_release(batch);
1371 }
1372 
1373 static void
1374 ftl_update_rwb_stats(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry)
1375 {
1376 	if (!ftl_rwb_entry_internal(entry)) {
1377 		dev->stats.write_user++;
1378 	}
1379 	dev->stats.write_total++;
1380 }
1381 
1382 static void
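/*
 * Update the L2P for a write buffer entry. Weak (relocation) writes only update
 * the L2P if it still points at the data being moved. For regular writes the
 * previous location (either a cache entry or an on-disk PPA) is invalidated
 * under the appropriate lock before the new address is stored.
 */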
1383 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry,
1384 	       struct ftl_ppa ppa)
1385 {
1386 	struct ftl_ppa prev_ppa;
1387 	struct ftl_rwb_entry *prev;
1388 	struct ftl_band *band;
1389 	int valid;
1390 
1391 	prev_ppa = ftl_l2p_get(dev, entry->lba);
1392 	if (ftl_ppa_invalid(prev_ppa)) {
1393 		ftl_l2p_set(dev, entry->lba, ppa);
1394 		return;
1395 	}
1396 
1397 	/* If the L2P's PPA is different from what we expected, we don't need to */
1398 	/* do anything (someone's already overwritten our data). */
1399 	if (ftl_rwb_entry_weak(entry) && !ftl_ppa_cmp(prev_ppa, entry->ppa)) {
1400 		return;
1401 	}
1402 
1403 	if (ftl_ppa_cached(prev_ppa)) {
1404 		assert(!ftl_rwb_entry_weak(entry));
1405 		prev = ftl_rwb_entry_from_offset(dev->rwb, prev_ppa.offset);
1406 		pthread_spin_lock(&prev->lock);
1407 
1408 		/* Re-read the L2P under the lock to protect against updates */
1409 		/* to this LBA from other threads */
1410 		prev_ppa = ftl_l2p_get(dev, entry->lba);
1411 
1412 		/* If the entry is no longer in cache, another write has been */
1413 		/* scheduled in the meantime, so we have to invalidate its LBA */
1414 		if (!ftl_ppa_cached(prev_ppa)) {
1415 			ftl_invalidate_addr(dev, prev_ppa);
1416 		}
1417 
1418 		/* If previous entry is part of cache, remove and invalidate it */
1419 		if (ftl_rwb_entry_valid(prev)) {
1420 			ftl_invalidate_addr(dev, prev->ppa);
1421 			ftl_rwb_entry_invalidate(prev);
1422 		}
1423 
1424 		ftl_l2p_set(dev, entry->lba, ppa);
1425 		pthread_spin_unlock(&prev->lock);
1426 		return;
1427 	}
1428 
1429 	/* Lock the band containing the previous PPA. This ensures atomic changes to */
1430 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
1431 	/* check the validity of weak writes. */
1432 	band = ftl_band_from_ppa(dev, prev_ppa);
1433 	pthread_spin_lock(&band->lba_map.lock);
1434 
1435 	valid = ftl_invalidate_addr_unlocked(dev, prev_ppa);
1436 
1437 	/* If the address has been invalidated already, we don't want to update */
1438 	/* the L2P for weak writes, as it means the write is no longer valid. */
1439 	if (!ftl_rwb_entry_weak(entry) || valid) {
1440 		ftl_l2p_set(dev, entry->lba, ppa);
1441 	}
1442 
1443 	pthread_spin_unlock(&band->lba_map.lock);
1444 }
1445 
1446 static struct ftl_io *
1447 ftl_io_init_child_write(struct ftl_io *parent, struct ftl_ppa ppa,
1448 			void *data, void *md, ftl_io_fn cb)
1449 {
1450 	struct ftl_io *io;
1451 	struct spdk_ftl_dev *dev = parent->dev;
1452 	struct ftl_io_init_opts opts = {
1453 		.dev		= dev,
1454 		.io		= NULL,
1455 		.parent		= parent,
1456 		.rwb_batch	= NULL,
1457 		.band		= parent->band,
1458 		.size		= sizeof(struct ftl_io),
1459 		.flags		= 0,
1460 		.type		= FTL_IO_WRITE,
1461 		.lbk_cnt	= dev->xfer_size,
1462 		.cb_fn		= cb,
1463 		.data		= data,
1464 		.md		= md,
1465 	};
1466 
1467 	io = ftl_io_init_internal(&opts);
1468 	if (!io) {
1469 		return NULL;
1470 	}
1471 
1472 	io->ppa = ppa;
1473 
1474 	return io;
1475 }
1476 
1477 static void
1478 ftl_io_child_write_cb(struct ftl_io *io, void *ctx, int status)
1479 {
1480 	struct ftl_chunk *chunk;
1481 	struct ftl_wptr *wptr;
1482 
1483 	chunk = ftl_band_chunk_from_ppa(io->band, io->ppa);
1484 	wptr = ftl_wptr_from_band(io->band);
1485 
1486 	chunk->busy = false;
1487 	chunk->write_offset += io->lbk_cnt;
1488 
1489 	/* If some other write on the same band failed, the write pointer will have already been freed */
1490 	if (spdk_likely(wptr)) {
1491 		wptr->num_outstanding--;
1492 	}
1493 }
1494 
1495 static int
1496 ftl_submit_child_write(struct ftl_wptr *wptr, struct ftl_io *io, int lbk_cnt)
1497 {
1498 	struct spdk_ftl_dev	*dev = io->dev;
1499 	struct ftl_io		*child;
1500 	int			rc;
1501 	struct ftl_ppa		ppa;
1502 
1503 	if (spdk_likely(!wptr->direct_mode)) {
1504 		ppa = wptr->ppa;
1505 	} else {
1506 		assert(io->flags & FTL_IO_DIRECT_ACCESS);
1507 		assert(io->ppa.chk == wptr->band->id);
1508 		ppa = io->ppa;
1509 	}
1510 
1511 	/* Split the IO into child requests and release the chunk immediately after the child is completed */
1512 	child = ftl_io_init_child_write(io, ppa, ftl_io_iovec_addr(io),
1513 					ftl_io_get_md(io), ftl_io_child_write_cb);
1514 	if (!child) {
1515 		return -EAGAIN;
1516 	}
1517 
1518 	wptr->num_outstanding++;
1519 	rc = spdk_nvme_ns_cmd_write_with_md(dev->ns, ftl_get_write_qpair(dev),
1520 					    ftl_io_iovec_addr(child), child->md,
1521 					    ftl_ppa_addr_pack(dev, ppa),
1522 					    lbk_cnt, ftl_io_cmpl_cb, child, 0, 0, 0);
1523 	if (rc) {
1524 		wptr->num_outstanding--;
1525 		ftl_io_fail(child, rc);
1526 		ftl_io_complete(child);
1527 		SPDK_ERRLOG("spdk_nvme_ns_cmd_write_with_md failed with status:%d, ppa:%lu\n",
1528 			    rc, ppa.ppa);
1529 		return -EIO;
1530 	}
1531 
1532 	ftl_io_inc_req(child);
1533 	ftl_io_advance(child, lbk_cnt);
1534 
1535 	return 0;
1536 }
1537 
1538 static int
1539 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
1540 {
1541 	struct spdk_ftl_dev	*dev = io->dev;
1542 	int			rc = 0;
1543 
1544 	assert(io->lbk_cnt % dev->xfer_size == 0);
1545 
1546 	while (io->iov_pos < io->iov_cnt) {
1547 		/* There are no ordering guarantees for completions on an NVMe IO submission queue, */
1548 		/* so wait until the chunk is no longer busy before submitting another write */
1549 		if (wptr->chunk->busy) {
1550 			TAILQ_INSERT_TAIL(&wptr->pending_queue, io, retry_entry);
1551 			rc = -EAGAIN;
1552 			break;
1553 		}
1554 
1555 		rc = ftl_submit_child_write(wptr, io, dev->xfer_size);
1556 		if (spdk_unlikely(rc)) {
1557 			if (rc == -EAGAIN) {
1558 				TAILQ_INSERT_TAIL(&wptr->pending_queue, io, retry_entry);
1559 			} else {
1560 				ftl_io_fail(io, rc);
1561 			}
1562 			break;
1563 		}
1564 
1565 		ftl_trace_submission(dev, io, wptr->ppa, dev->xfer_size);
1566 		ftl_wptr_advance(wptr, dev->xfer_size);
1567 	}
1568 
1569 	if (ftl_io_done(io)) {
1570 		/* Parent IO will complete after all children are completed */
1571 		ftl_io_complete(io);
1572 	}
1573 
1574 	return rc;
1575 }
1576 
1577 static void
1578 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1579 {
1580 	struct ftl_rwb *rwb = dev->rwb;
1581 	size_t size, num_entries;
1582 
1583 	size = ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_INTERNAL) +
1584 	       ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_USER);
1585 
1586 	/* There must be something in the RWB, otherwise the flush */
1587 	/* wouldn't be waiting for anything */
1588 	assert(size > 0);
1589 
1590 	/* Only add padding when there are fewer than xfer_size */
1591 	/* entries in the buffer. Otherwise we just have to wait */
1592 	/* for the entries to become ready. */
1593 	num_entries = ftl_rwb_get_active_batches(dev->rwb) * dev->xfer_size;
1594 	if (size < num_entries) {
1595 		ftl_rwb_pad(dev, num_entries - (size % num_entries));
1596 	}
1597 }
1598 
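/*
 * Main write path for a single write pointer: resubmit any pending IO, handle
 * shutdown/flush padding, pop a full batch from the write buffer, assign
 * physical addresses to its entries (updating band metadata and relocation
 * counters) and submit the batch to the drive. Returns the number of blocks
 * submitted, or 0 if nothing was done.
 */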
1599 static int
1600 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1601 {
1602 	struct spdk_ftl_dev	*dev = wptr->dev;
1603 	struct ftl_rwb_batch	*batch;
1604 	struct ftl_rwb_entry	*entry;
1605 	struct ftl_io		*io;
1606 	struct ftl_ppa		ppa, prev_ppa;
1607 
1608 	if (spdk_unlikely(!TAILQ_EMPTY(&wptr->pending_queue))) {
1609 		io = TAILQ_FIRST(&wptr->pending_queue);
1610 		TAILQ_REMOVE(&wptr->pending_queue, io, retry_entry);
1611 
1612 		if (ftl_submit_write(wptr, io) == -EAGAIN) {
1613 			return 0;
1614 		}
1615 	}
1616 
1617 	/* Make sure the band is prepared for writing */
1618 	if (!ftl_wptr_ready(wptr)) {
1619 		return 0;
1620 	}
1621 
1622 	if (dev->halt) {
1623 		ftl_wptr_process_shutdown(wptr);
1624 	}
1625 
1626 	if (spdk_unlikely(wptr->flush)) {
1627 		ftl_wptr_pad_band(wptr);
1628 	}
1629 
1630 	batch = ftl_rwb_pop(dev->rwb);
1631 	if (!batch) {
1632 		/* If there are queued flush requests we need to pad the RWB to */
1633 		/* force out remaining entries */
1634 		if (!LIST_EMPTY(&dev->flush_list)) {
1635 			ftl_flush_pad_batch(dev);
1636 		}
1637 
1638 		return 0;
1639 	}
1640 
1641 	io = ftl_io_rwb_init(dev, wptr->band, batch, ftl_write_cb);
1642 	if (!io) {
1643 		goto error;
1644 	}
1645 
1646 	ppa = wptr->ppa;
1647 	ftl_rwb_foreach(entry, batch) {
1648 		/* Update band's relocation stats if the IO comes from reloc */
1649 		if (entry->flags & FTL_IO_WEAK) {
1650 			if (!spdk_bit_array_get(wptr->band->reloc_bitmap, entry->band->id)) {
1651 				spdk_bit_array_set(wptr->band->reloc_bitmap, entry->band->id);
1652 				entry->band->num_reloc_bands++;
1653 			}
1654 		}
1655 
1656 		entry->ppa = ppa;
1657 		if (entry->lba != FTL_LBA_INVALID) {
1658 			pthread_spin_lock(&entry->lock);
1659 			prev_ppa = ftl_l2p_get(dev, entry->lba);
1660 
1661 			/* If the l2p was updated in the meantime, don't update band's metadata */
1662 			if (ftl_ppa_cached(prev_ppa) && prev_ppa.offset == entry->pos) {
1663 				/* Setting entry's cache bit needs to be done after metadata */
1664 				/* within the band is updated to make sure that writes */
1665 				/* invalidating the entry clear the metadata as well */
1666 				ftl_band_set_addr(wptr->band, entry->lba, entry->ppa);
1667 				ftl_rwb_entry_set_valid(entry);
1668 			}
1669 			pthread_spin_unlock(&entry->lock);
1670 		}
1671 
1672 		ftl_trace_rwb_pop(dev, entry);
1673 		ftl_update_rwb_stats(dev, entry);
1674 
1675 		ppa = ftl_band_next_ppa(wptr->band, ppa, 1);
1676 	}
1677 
1678 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lx, %lx\n", wptr->ppa.ppa,
1679 		      ftl_ppa_addr_pack(dev, wptr->ppa));
1680 
1681 	if (ftl_submit_write(wptr, io)) {
1682 		/* TODO: we need some recovery here */
1683 		assert(0 && "Write submit failed");
1684 		if (ftl_io_done(io)) {
1685 			ftl_io_free(io);
1686 		}
1687 	}
1688 
1689 	return dev->xfer_size;
1690 error:
1691 	ftl_rwb_batch_revert(batch);
1692 	return 0;
1693 }
1694 
1695 static int
1696 ftl_process_writes(struct spdk_ftl_dev *dev)
1697 {
1698 	struct ftl_wptr *wptr, *twptr;
1699 	size_t num_active = 0;
1700 	enum ftl_band_state state;
1701 
1702 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1703 		ftl_wptr_process_writes(wptr);
1704 		state = wptr->band->state;
1705 
1706 		if (state != FTL_BAND_STATE_FULL &&
1707 		    state != FTL_BAND_STATE_CLOSING &&
1708 		    state != FTL_BAND_STATE_CLOSED) {
1709 			num_active++;
1710 		}
1711 	}
1712 
1713 	if (num_active < 1) {
1714 		ftl_add_wptr(dev);
1715 	}
1716 
1717 	return 0;
1718 }
1719 
1720 static void
1721 ftl_rwb_entry_fill(struct ftl_rwb_entry *entry, struct ftl_io *io)
1722 {
1723 	memcpy(entry->data, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1724 
1725 	if (ftl_rwb_entry_weak(entry)) {
1726 		entry->band = ftl_band_from_ppa(io->dev, io->ppa);
1727 		entry->ppa = ftl_band_next_ppa(entry->band, io->ppa, io->pos);
1728 		entry->band->num_reloc_blocks++;
1729 	}
1730 
1731 	entry->trace = io->trace;
1732 	entry->lba = ftl_io_current_lba(io);
1733 
1734 	if (entry->md) {
1735 		memcpy(entry->md, &entry->lba, sizeof(entry->lba));
1736 	}
1737 }
1738 
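/*
 * Copy the IO's data into write buffer entries one block at a time, point the
 * L2P at the cached copies and push the entries for the core thread to write
 * out. Returns -EAGAIN when no free entries are available. Once the whole IO is
 * buffered, the data is also mirrored to the non-volatile cache if one is
 * present (and the cache isn't bypassed).
 */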
1739 static int
1740 ftl_rwb_fill(struct ftl_io *io)
1741 {
1742 	struct spdk_ftl_dev *dev = io->dev;
1743 	struct ftl_rwb_entry *entry;
1744 	struct ftl_ppa ppa = { .cached = 1 };
1745 	int flags = ftl_rwb_flags_from_io(io);
1746 
1747 	while (io->pos < io->lbk_cnt) {
1748 		if (ftl_io_current_lba(io) == FTL_LBA_INVALID) {
1749 			ftl_io_advance(io, 1);
1750 			continue;
1751 		}
1752 
1753 		entry = ftl_acquire_entry(dev, flags);
1754 		if (!entry) {
1755 			return -EAGAIN;
1756 		}
1757 
1758 		ftl_rwb_entry_fill(entry, io);
1759 
1760 		ppa.offset = entry->pos;
1761 
1762 		ftl_trace_rwb_fill(dev, io);
1763 		ftl_update_l2p(dev, entry, ppa);
1764 		ftl_io_advance(io, 1);
1765 
1766 		/* Pushing the entry needs to be done after the L2P is updated to avoid */
1767 		/* a race with the write completion callback in case it's processed */
1768 		/* before the L2P is set in ftl_update_l2p(). */
1769 		ftl_rwb_push(entry);
1770 	}
1771 
1772 	if (ftl_io_done(io)) {
1773 		if (ftl_dev_has_nv_cache(dev) && !(io->flags & FTL_IO_BYPASS_CACHE)) {
1774 			ftl_write_nv_cache(io);
1775 		} else {
1776 			ftl_io_complete(io);
1777 		}
1778 	}
1779 
1780 	return 0;
1781 }
1782 
1783 static bool
1784 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
1785 {
1786 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
1787 
1788 	if (ftl_reloc_is_halted(dev->reloc)) {
1789 		return false;
1790 	}
1791 
1792 	if (ftl_reloc_is_defrag_active(dev->reloc)) {
1793 		return false;
1794 	}
1795 
1796 	if (dev->num_free <= limit->thld) {
1797 		return true;
1798 	}
1799 
1800 	return false;
1801 }
1802 
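/*
 * Defragmentation merit of a band: the ratio of invalid to valid blocks scaled
 * by the band's age, i.e. merit = age * invalid / (valid + 1); the +1 avoids
 * division by zero. A band with no usable blocks has a merit of 0.
 */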
1803 static double
1804 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
1805 {
1806 	size_t usable, valid, invalid;
1807 	double vld_ratio;
1808 
1809 	/* If the band doesn't have any usable lbks it's of no use */
1810 	usable = ftl_band_num_usable_lbks(band);
1811 	if (usable == 0) {
1812 		return 0.0;
1813 	}
1814 
1815 	valid =  threshold_valid ? (usable - *threshold_valid) : band->lba_map.num_vld;
1816 	invalid = usable - valid;
1817 
1818 	/* Add one to avoid division by 0 */
1819 	vld_ratio = (double)invalid / (double)(valid + 1);
1820 	return vld_ratio * ftl_band_age(band);
1821 }
1822 
1823 static bool
1824 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
1825 {
1826 	struct spdk_ftl_conf *conf = &dev->conf;
1827 	size_t thld_vld;
1828 
1829 	/* If we're in dire need of free bands, every band is worth defragging */
1830 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
1831 		return true;
1832 	}
1833 
1834 	thld_vld = (ftl_band_num_usable_lbks(band) * conf->invalid_thld) / 100;
1835 
1836 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
1837 }
1838 
1839 static struct ftl_band *
1840 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
1841 {
1842 	struct ftl_band *band, *mband = NULL;
1843 	double merit = 0;
1844 
1845 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
1846 		assert(band->state == FTL_BAND_STATE_CLOSED);
1847 		band->merit = ftl_band_calc_merit(band, NULL);
1848 		if (band->merit > merit) {
1849 			merit = band->merit;
1850 			mband = band;
1851 		}
1852 	}
1853 
1854 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
1855 		mband = NULL;
1856 	}
1857 
1858 	return mband;
1859 }
1860 
1861 static void
1862 ftl_process_relocs(struct spdk_ftl_dev *dev)
1863 {
1864 	struct ftl_band *band;
1865 
1866 	if (ftl_dev_needs_defrag(dev)) {
1867 		band = ftl_select_defrag_band(dev);
1868 		if (band) {
1869 			ftl_reloc_add(dev->reloc, band, 0, ftl_num_band_lbks(dev), 0, true);
1870 			ftl_trace_defrag_band(dev, band);
1871 		}
1872 	}
1873 
1874 	ftl_reloc(dev->reloc);
1875 }
1876 
1877 int
1878 ftl_current_limit(const struct spdk_ftl_dev *dev)
1879 {
1880 	return dev->limit;
1881 }
1882 
1883 void
1884 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
1885 {
1886 	attrs->uuid = dev->uuid;
1887 	attrs->lbk_cnt = dev->num_lbas;
1888 	attrs->lbk_size = FTL_BLOCK_SIZE;
1889 	attrs->range = dev->range;
1890 	attrs->cache_bdev_desc = dev->nv_cache.bdev_desc;
1891 	attrs->num_chunks = dev->geo.num_chk;
1892 	attrs->chunk_size = dev->geo.clba;
1893 	attrs->conf = dev->conf;
1894 }
1895 
1896 static void
1897 _ftl_io_write(void *ctx)
1898 {
1899 	ftl_io_write((struct ftl_io *)ctx);
1900 }
1901 
1902 static int
1903 ftl_rwb_fill_leaf(struct ftl_io *io)
1904 {
1905 	int rc;
1906 
1907 	rc = ftl_rwb_fill(io);
1908 	if (rc == -EAGAIN) {
1909 		spdk_thread_send_msg(spdk_io_channel_get_thread(io->ioch),
1910 				     _ftl_io_write, io);
1911 		return 0;
1912 	}
1913 
1914 	return rc;
1915 }
1916 
1917 static int
1918 ftl_submit_write_leaf(struct ftl_io *io)
1919 {
1920 	int rc;
1921 
1922 	rc = ftl_submit_write(ftl_wptr_from_band(io->band), io);
1923 	if (rc == -EAGAIN) {
1924 		/* EAGAIN means that the request was put on the pending queue */
1925 		return 0;
1926 	}
1927 
1928 	return rc;
1929 }
1930 
1931 void
1932 ftl_io_write(struct ftl_io *io)
1933 {
1934 	struct spdk_ftl_dev *dev = io->dev;
1935 
1936 	/* For normal IOs we just need to copy the data onto the rwb */
1937 	if (!(io->flags & FTL_IO_MD)) {
1938 		ftl_io_call_foreach_child(io, ftl_rwb_fill_leaf);
1939 	} else {
1940 		/* Metadata has its own buffer, so it doesn't have to be copied; just */
1941 		/* send it to the core thread and schedule the write immediately */
1942 		if (ftl_check_core_thread(dev)) {
1943 			ftl_io_call_foreach_child(io, ftl_submit_write_leaf);
1944 		} else {
1945 			spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
1946 		}
1947 	}
1948 }
1949 
1950 int
1951 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1952 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1953 {
1954 	struct ftl_io *io;
1955 
1956 	if (iov_cnt == 0) {
1957 		return -EINVAL;
1958 	}
1959 
1960 	if (lba_cnt == 0) {
1961 		return -EINVAL;
1962 	}
1963 
1964 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1965 		return -EINVAL;
1966 	}
1967 
1968 	if (!dev->initialized) {
1969 		return -EBUSY;
1970 	}
1971 
1972 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
1973 	if (!io) {
1974 		return -ENOMEM;
1975 	}
1976 
1977 	ftl_io_write(io);
1978 
1979 	return 0;
1980 }
1981 
1982 static int
1983 ftl_io_read_leaf(struct ftl_io *io)
1984 {
1985 	int rc;
1986 
1987 	rc = ftl_submit_read(io);
1988 	if (rc == -ENOMEM) {
1989 		/* ENOMEM means that the request was put on a pending queue */
1990 		return 0;
1991 	}
1992 
1993 	return rc;
1994 }
1995 
1996 static void
1997 _ftl_io_read(void *arg)
1998 {
1999 	ftl_io_read((struct ftl_io *)arg);
2000 }
2001 
2002 void
2003 ftl_io_read(struct ftl_io *io)
2004 {
2005 	struct spdk_ftl_dev *dev = io->dev;
2006 
2007 	if (ftl_check_read_thread(dev)) {
2008 		ftl_io_call_foreach_child(io, ftl_io_read_leaf);
2009 	} else {
2010 		spdk_thread_send_msg(ftl_get_read_thread(dev), _ftl_io_read, io);
2011 	}
2012 }
2013 
2014 int
2015 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2016 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2017 {
2018 	struct ftl_io *io;
2019 
2020 	if (iov_cnt == 0) {
2021 		return -EINVAL;
2022 	}
2023 
2024 	if (lba_cnt == 0) {
2025 		return -EINVAL;
2026 	}
2027 
2028 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
2029 		return -EINVAL;
2030 	}
2031 
2032 	if (!dev->initialized) {
2033 		return -EBUSY;
2034 	}
2035 
2036 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
2037 	if (!io) {
2038 		return -ENOMEM;
2039 	}
2040 
2041 	ftl_io_read(io);
2042 	return 0;
2043 }
2044 
2045 static struct ftl_flush *
2046 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2047 {
2048 	struct ftl_flush *flush;
2049 	struct ftl_rwb *rwb = dev->rwb;
2050 
2051 	flush = calloc(1, sizeof(*flush));
2052 	if (!flush) {
2053 		return NULL;
2054 	}
2055 
2056 	flush->bmap = spdk_bit_array_create(ftl_rwb_num_batches(rwb));
2057 	if (!flush->bmap) {
2058 		goto error;
2059 	}
2060 
2061 	flush->dev = dev;
2062 	flush->cb.fn = cb_fn;
2063 	flush->cb.ctx = cb_arg;
2064 
2065 	return flush;
2066 error:
2067 	free(flush);
2068 	return NULL;
2069 }
2070 
2071 static void
2072 _ftl_flush(void *ctx)
2073 {
2074 	struct ftl_flush *flush = ctx;
2075 	struct spdk_ftl_dev *dev = flush->dev;
2076 	struct ftl_rwb *rwb = dev->rwb;
2077 	struct ftl_rwb_batch *batch;
2078 
2079 	/* Attach flush object to all non-empty batches */
2080 	ftl_rwb_foreach_batch(batch, rwb) {
2081 		if (!ftl_rwb_batch_empty(batch)) {
2082 			spdk_bit_array_set(flush->bmap, ftl_rwb_batch_get_offset(batch));
2083 			flush->num_req++;
2084 		}
2085 	}
2086 
2087 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
2088 
2089 	/* If the RWB was already empty, the flush can be completed right away */
2090 	if (!flush->num_req) {
2091 		ftl_complete_flush(flush);
2092 	}
2093 }
2094 
2095 int
2096 ftl_flush_rwb(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2097 {
2098 	struct ftl_flush *flush;
2099 
2100 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
2101 	if (!flush) {
2102 		return -ENOMEM;
2103 	}
2104 
2105 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
2106 	return 0;
2107 }
2108 
2109 int
2110 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2111 {
2112 	if (!dev->initialized) {
2113 		return -EBUSY;
2114 	}
2115 
2116 	return ftl_flush_rwb(dev, cb_fn, cb_arg);
2117 }
2118 
2119 static void
2120 _ftl_process_anm_event(void *ctx)
2121 {
2122 	ftl_process_anm_event((struct ftl_anm_event *)ctx);
2123 }
2124 
2125 void
2126 ftl_process_anm_event(struct ftl_anm_event *event)
2127 {
2128 	struct spdk_ftl_dev *dev = event->dev;
2129 	struct ftl_band *band;
2130 	size_t lbkoff;
2131 
2132 	/* Drop any ANM requests until the device is initialized */
2133 	if (!dev->initialized) {
2134 		ftl_anm_event_complete(event);
2135 		return;
2136 	}
2137 
2138 	if (!ftl_check_core_thread(dev)) {
2139 		spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_process_anm_event, event);
2140 		return;
2141 	}
2142 
2143 	band = ftl_band_from_ppa(dev, event->ppa);
2144 	lbkoff = ftl_band_lbkoff_from_ppa(band, event->ppa);
2145 
2146 	ftl_reloc_add(dev->reloc, band, lbkoff, event->num_lbks, 0, false);
2147 	ftl_anm_event_complete(event);
2148 }
2149 
2150 bool
2151 ftl_ppa_is_written(struct ftl_band *band, struct ftl_ppa ppa)
2152 {
2153 	struct ftl_chunk *chunk = ftl_band_chunk_from_ppa(band, ppa);
2154 
2155 	return ppa.lbk < chunk->write_offset;
2156 }
2157 
2158 static void
2159 ftl_process_retry_queue(struct spdk_ftl_dev *dev)
2160 {
2161 	struct ftl_io *io;
2162 	int rc;
2163 
2164 	while (!TAILQ_EMPTY(&dev->retry_queue)) {
2165 		io = TAILQ_FIRST(&dev->retry_queue);
2166 
2167 		/* Retry only if IO is still healthy */
2168 		if (spdk_likely(io->status == 0)) {
2169 			rc = ftl_submit_read(io);
2170 			if (rc == -ENOMEM) {
2171 				break;
2172 			}
2173 		}
2174 
2175 		io->flags &= ~FTL_IO_RETRY;
2176 		TAILQ_REMOVE(&dev->retry_queue, io, retry_entry);
2177 
2178 		if (ftl_io_done(io)) {
2179 			ftl_io_complete(io);
2180 		}
2181 	}
2182 }
2183 
2184 int
2185 ftl_task_read(void *ctx)
2186 {
2187 	struct ftl_thread *thread = ctx;
2188 	struct spdk_ftl_dev *dev = thread->dev;
2189 	struct spdk_nvme_qpair *qpair = ftl_get_read_qpair(dev);
2190 	size_t num_completed;
2191 
2192 	if (dev->halt) {
2193 		if (ftl_shutdown_complete(dev)) {
2194 			spdk_poller_unregister(&thread->poller);
2195 			return 0;
2196 		}
2197 	}
2198 
2199 	num_completed = spdk_nvme_qpair_process_completions(qpair, 0);
2200 
2201 	if (num_completed && !TAILQ_EMPTY(&dev->retry_queue)) {
2202 		ftl_process_retry_queue(dev);
2203 	}
2204 
2205 	return num_completed;
2206 }
2207 
2208 int
2209 ftl_task_core(void *ctx)
2210 {
2211 	struct ftl_thread *thread = ctx;
2212 	struct spdk_ftl_dev *dev = thread->dev;
2213 	struct spdk_nvme_qpair *qpair = ftl_get_write_qpair(dev);
2214 
2215 	if (dev->halt) {
2216 		if (ftl_shutdown_complete(dev)) {
2217 			spdk_poller_unregister(&thread->poller);
2218 			return 0;
2219 		}
2220 	}
2221 
2222 	ftl_process_writes(dev);
2223 	spdk_nvme_qpair_process_completions(qpair, 0);
2224 	ftl_process_relocs(dev);
2225 
2226 	return 0;
2227 }
2228 
2229 SPDK_LOG_REGISTER_COMPONENT("ftl_core", SPDK_LOG_FTL_CORE)
2230