xref: /spdk/lib/ftl/ftl_core.c (revision ae7b5890ef728af40bd233a5011b924c482603bf)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/io_channel.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk/string.h"
40 #include "spdk_internal/log.h"
41 #include "spdk/ftl.h"
42 #include "spdk/crc32.h"
43 
44 #include "ftl_core.h"
45 #include "ftl_band.h"
46 #include "ftl_io.h"
47 #include "ftl_anm.h"
48 #include "ftl_rwb.h"
49 #include "ftl_debug.h"
50 #include "ftl_reloc.h"
51 
52 struct ftl_band_flush {
53 	struct spdk_ftl_dev		*dev;
54 	/* Number of bands left to be flushed */
55 	size_t				num_bands;
56 	/* User callback */
57 	spdk_ftl_fn			cb_fn;
58 	/* Callback's argument */
59 	void				*cb_arg;
60 	/* List link */
61 	LIST_ENTRY(ftl_band_flush)	list_entry;
62 };
63 
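/*
 * Write pointer bookkeeping. Each band currently being written to is tracked by
 * an ftl_wptr kept on the device's wptr_list; it records the current physical
 * address and logical offset within the band and holds the queue of writes
 * waiting for a busy chunk.
 */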
64 struct ftl_wptr {
65 	/* Owner device */
66 	struct spdk_ftl_dev		*dev;
67 
68 	/* Current PPA */
69 	struct ftl_ppa			ppa;
70 
71 	/* Band currently being written to */
72 	struct ftl_band			*band;
73 
74 	/* Current logical block's offset */
75 	uint64_t			offset;
76 
77 	/* Current erase block */
78 	struct ftl_chunk		*chunk;
79 
80 	/* Pending IO queue */
81 	TAILQ_HEAD(, ftl_io)		pending_queue;
82 
83 	/* List link */
84 	LIST_ENTRY(ftl_wptr)		list_entry;
85 
86 	/*
87 	 * If set up in direct mode, there will be no offset or band state update after IO.
88 	 * The PPA is not assigned by wptr, and is instead taken directly from the request.
89 	 */
90 	bool				direct_mode;
91 
92 	/* Number of outstanding write requests */
93 	uint32_t			num_outstanding;
94 
95 	/* Marks that the band related to this wptr needs to be closed as soon as possible */
96 	bool				flush;
97 };
98 
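/*
 * Tracks a single user flush request. The bitmap marks which RWB batches were
 * non-empty when the flush was issued; the callback fires once all of those
 * batches have been written out (see _ftl_flush() and ftl_process_flush()).
 */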
99 struct ftl_flush {
100 	/* Owner device */
101 	struct spdk_ftl_dev		*dev;
102 
103 	/* Number of batches to wait for */
104 	size_t				num_req;
105 
106 	/* Callback */
107 	struct {
108 		spdk_ftl_fn		fn;
109 		void			*ctx;
110 	} cb;
111 
112 	/* Batch bitmap */
113 	struct spdk_bit_array		*bmap;
114 
115 	/* List link */
116 	LIST_ENTRY(ftl_flush)		list_entry;
117 };
118 
119 static int
120 ftl_rwb_flags_from_io(const struct ftl_io *io)
121 {
122 	int valid_flags = FTL_IO_INTERNAL | FTL_IO_WEAK | FTL_IO_PAD;
123 	return io->flags & valid_flags;
124 }
125 
126 static int
127 ftl_rwb_entry_weak(const struct ftl_rwb_entry *entry)
128 {
129 	return entry->flags & FTL_IO_WEAK;
130 }
131 
132 static void
133 ftl_wptr_free(struct ftl_wptr *wptr)
134 {
135 	if (!wptr) {
136 		return;
137 	}
138 
139 	free(wptr);
140 }
141 
142 static void
143 ftl_remove_wptr(struct ftl_wptr *wptr)
144 {
145 	struct spdk_ftl_dev *dev = wptr->dev;
146 	struct ftl_band_flush *flush, *tmp;
147 
148 	if (spdk_unlikely(wptr->flush)) {
149 		LIST_FOREACH_SAFE(flush, &dev->band_flush_list, list_entry, tmp) {
150 			assert(flush->num_bands > 0);
151 			if (--flush->num_bands == 0) {
152 				flush->cb_fn(flush->cb_arg, 0);
153 				LIST_REMOVE(flush, list_entry);
154 				free(flush);
155 			}
156 		}
157 	}
158 
159 	LIST_REMOVE(wptr, list_entry);
160 	ftl_wptr_free(wptr);
161 }
162 
163 static void
164 ftl_io_cmpl_cb(void *arg, const struct spdk_nvme_cpl *status)
165 {
166 	struct ftl_io *io = arg;
167 
168 	if (spdk_nvme_cpl_is_error(status)) {
169 		ftl_io_process_error(io, status);
170 	}
171 
172 	ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_DISK);
173 
174 	ftl_io_dec_req(io);
175 	if (ftl_io_done(io)) {
176 		ftl_io_complete(io);
177 	}
178 }
179 
180 static void
181 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
182 {
183 	struct ftl_wptr *wptr = NULL;
184 
185 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
186 		if (wptr->band == band) {
187 			break;
188 		}
189 	}
190 
191 	/* If the band already has the high_prio flag set, other writes must */
192 	/* have failed earlier, so it's already taken care of. */
193 	if (band->high_prio) {
194 		assert(wptr == NULL);
195 		return;
196 	}
197 
198 	ftl_band_write_failed(band);
199 	ftl_remove_wptr(wptr);
200 }
201 
202 static struct ftl_wptr *
203 ftl_wptr_from_band(struct ftl_band *band)
204 {
205 	struct spdk_ftl_dev *dev = band->dev;
206 	struct ftl_wptr *wptr = NULL;
207 
208 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
209 		if (wptr->band == band) {
210 			return wptr;
211 		}
212 	}
213 
214 	return NULL;
215 }
216 
217 static void
218 ftl_md_write_fail(struct ftl_io *io, int status)
219 {
220 	struct ftl_band *band = io->band;
221 	struct ftl_wptr *wptr;
222 	char buf[128];
223 
224 	wptr = ftl_wptr_from_band(band);
225 	assert(wptr);
226 
227 	SPDK_ERRLOG("Metadata write failed @ppa: %s, status: %d\n",
228 		    ftl_ppa2str(wptr->ppa, buf, sizeof(buf)), status);
229 
230 	ftl_halt_writes(io->dev, band);
231 }
232 
233 static void
234 ftl_md_write_cb(struct ftl_io *io, void *arg, int status)
235 {
236 	struct spdk_ftl_dev *dev = io->dev;
237 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
238 	struct ftl_band *band = io->band;
239 	struct ftl_wptr *wptr;
240 	size_t id;
241 
242 	wptr = ftl_wptr_from_band(band);
243 	assert(wptr);
244 
245 	if (status) {
246 		ftl_md_write_fail(io, status);
247 		return;
248 	}
249 
250 	ftl_band_set_next_state(band);
251 	if (band->state == FTL_BAND_STATE_CLOSED) {
252 		if (ftl_dev_has_nv_cache(dev)) {
253 			pthread_spin_lock(&nv_cache->lock);
254 			nv_cache->num_available += ftl_band_user_lbks(band);
255 
256 			if (spdk_unlikely(nv_cache->num_available > nv_cache->num_data_blocks)) {
257 				nv_cache->num_available = nv_cache->num_data_blocks;
258 			}
259 			pthread_spin_unlock(&nv_cache->lock);
260 		}
261 
262 		/*
263 		 * Go through the reloc_bitmap, checking for all the bands that had their data moved
264 		 * onto the current band, and update their counters to allow them to be used for writing
265 		 * (once they're closed and empty).
266 		 */
267 		for (id = 0; id < ftl_dev_num_bands(dev); ++id) {
268 			if (spdk_bit_array_get(band->reloc_bitmap, id)) {
269 				assert(dev->bands[id].num_reloc_bands > 0);
270 				dev->bands[id].num_reloc_bands--;
271 
272 				spdk_bit_array_clear(band->reloc_bitmap, id);
273 			}
274 		}
275 
276 		ftl_remove_wptr(wptr);
277 	}
278 }
279 
280 static int
281 ftl_ppa_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa)
282 {
283 	struct spdk_ftl_dev *dev = io->dev;
284 	size_t lbk_cnt, max_lbks;
285 
286 	assert(ftl_io_mode_ppa(io));
287 	assert(io->iov_pos < io->iov_cnt);
288 
289 	if (io->pos == 0) {
290 		*ppa = io->ppa;
291 	} else {
292 		*ppa = ftl_band_next_xfer_ppa(io->band, io->ppa, io->pos);
293 	}
294 
295 	assert(!ftl_ppa_invalid(*ppa));
296 
297 	/* Metadata has to be read in the way it's written (jumping across */
298 	/* the chunks in xfer_size increments) */
299 	if (io->flags & FTL_IO_MD) {
300 		max_lbks = dev->xfer_size - (ppa->lbk % dev->xfer_size);
301 		lbk_cnt = spdk_min(ftl_io_iovec_len_left(io), max_lbks);
302 		assert(ppa->lbk / dev->xfer_size == (ppa->lbk + lbk_cnt - 1) / dev->xfer_size);
303 	} else {
304 		lbk_cnt = ftl_io_iovec_len_left(io);
305 	}
306 
307 	return lbk_cnt;
308 }
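
/*
 * Note: the clamp above keeps metadata reads from straddling an xfer_size
 * boundary, mirroring the way the metadata was laid out at write time. For
 * example, with xfer_size == 16 and a request starting at lbk 20, at most
 * 12 blocks are returned, so the read stops at lbk 31.
 */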
309 
310 static int
311 ftl_wptr_close_band(struct ftl_wptr *wptr)
312 {
313 	struct ftl_band *band = wptr->band;
314 
315 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
316 
317 	return ftl_band_write_tail_md(band, ftl_md_write_cb);
318 }
319 
320 static int
321 ftl_wptr_open_band(struct ftl_wptr *wptr)
322 {
323 	struct ftl_band *band = wptr->band;
324 
325 	assert(ftl_band_chunk_is_first(band, wptr->chunk));
326 	assert(band->lba_map.num_vld == 0);
327 
328 	ftl_band_clear_lba_map(band);
329 
330 	assert(band->state == FTL_BAND_STATE_PREP);
331 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
332 
333 	return ftl_band_write_head_md(band, ftl_md_write_cb);
334 }
335 
336 static int
337 ftl_submit_erase(struct ftl_io *io)
338 {
339 	struct spdk_ftl_dev *dev = io->dev;
340 	struct ftl_band *band = io->band;
341 	struct ftl_ppa ppa = io->ppa;
342 	struct ftl_chunk *chunk;
343 	uint64_t ppa_packed;
344 	int rc = 0;
345 	size_t i;
346 
347 	for (i = 0; i < io->lbk_cnt; ++i) {
348 		if (i != 0) {
349 			chunk = ftl_band_next_chunk(band, ftl_band_chunk_from_ppa(band, ppa));
350 			assert(chunk->state == FTL_CHUNK_STATE_CLOSED ||
351 			       chunk->state == FTL_CHUNK_STATE_VACANT);
352 			ppa = chunk->start_ppa;
353 		}
354 
355 		assert(ppa.lbk == 0);
356 		ppa_packed = ftl_ppa_addr_pack(dev, ppa);
357 
358 		ftl_trace_submission(dev, io, ppa, 1);
359 		rc = spdk_nvme_ocssd_ns_cmd_vector_reset(dev->ns, ftl_get_write_qpair(dev),
360 				&ppa_packed, 1, NULL, ftl_io_cmpl_cb, io);
361 		if (spdk_unlikely(rc)) {
362 			ftl_io_fail(io, rc);
363 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
364 			break;
365 		}
366 
367 		ftl_io_inc_req(io);
368 		ftl_io_advance(io, 1);
369 	}
370 
371 	if (ftl_io_done(io)) {
372 		ftl_io_complete(io);
373 	}
374 
375 	return rc;
376 }
377 
378 static void
379 _ftl_io_erase(void *ctx)
380 {
381 	ftl_io_erase((struct ftl_io *)ctx);
382 }
383 
384 static bool
385 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
386 {
387 	return dev->core_thread.thread == spdk_get_thread();
388 }
389 
390 static bool
391 ftl_check_read_thread(const struct spdk_ftl_dev *dev)
392 {
393 	return dev->read_thread.thread == spdk_get_thread();
394 }
395 
396 int
397 ftl_io_erase(struct ftl_io *io)
398 {
399 	struct spdk_ftl_dev *dev = io->dev;
400 
401 	if (ftl_check_core_thread(dev)) {
402 		return ftl_submit_erase(io);
403 	}
404 
405 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_erase, io);
406 	return 0;
407 }
408 
409 static struct ftl_band *
410 ftl_next_write_band(struct spdk_ftl_dev *dev)
411 {
412 	struct ftl_band *band;
413 
414 	/* Find a free band that has all of its data moved onto other closed bands */
415 	LIST_FOREACH(band, &dev->free_bands, list_entry) {
416 		assert(band->state == FTL_BAND_STATE_FREE);
417 		if (band->num_reloc_bands == 0 && band->num_reloc_blocks == 0) {
418 			break;
419 		}
420 	}
421 
422 	if (spdk_unlikely(!band)) {
423 		return NULL;
424 	}
425 
426 	if (ftl_band_erase(band)) {
427 		/* TODO: handle erase failure */
428 		return NULL;
429 	}
430 
431 	return band;
432 }
433 
434 static struct ftl_band *
435 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
436 {
437 	struct ftl_band *band;
438 
439 	if (!dev->next_band) {
440 		band = ftl_next_write_band(dev);
441 	} else {
442 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
443 		band = dev->next_band;
444 		dev->next_band = NULL;
445 	}
446 
447 	return band;
448 }
449 
450 static struct ftl_wptr *
451 ftl_wptr_init(struct ftl_band *band)
452 {
453 	struct spdk_ftl_dev *dev = band->dev;
454 	struct ftl_wptr *wptr;
455 
456 	wptr = calloc(1, sizeof(*wptr));
457 	if (!wptr) {
458 		return NULL;
459 	}
460 
461 	wptr->dev = dev;
462 	wptr->band = band;
463 	wptr->chunk = CIRCLEQ_FIRST(&band->chunks);
464 	wptr->ppa = wptr->chunk->start_ppa;
465 	TAILQ_INIT(&wptr->pending_queue);
466 
467 	return wptr;
468 }
469 
470 static int
471 ftl_add_direct_wptr(struct ftl_band *band)
472 {
473 	struct spdk_ftl_dev *dev = band->dev;
474 	struct ftl_wptr *wptr;
475 
476 	assert(band->state == FTL_BAND_STATE_OPEN);
477 
478 	wptr = ftl_wptr_init(band);
479 	if (!wptr) {
480 		return -1;
481 	}
482 
483 	wptr->direct_mode = true;
484 
485 	if (ftl_band_alloc_lba_map(band)) {
486 		ftl_wptr_free(wptr);
487 		return -1;
488 	}
489 
490 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
491 
492 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: direct band %u\n", band->id);
493 	ftl_trace_write_band(dev, band);
494 	return 0;
495 }
496 
497 static void
498 ftl_close_direct_wptr(struct ftl_band *band)
499 {
500 	struct ftl_wptr *wptr = ftl_wptr_from_band(band);
501 
502 	assert(wptr);
503 	assert(wptr->direct_mode);
504 	assert(band->state == FTL_BAND_STATE_CLOSED);
505 
506 	ftl_band_release_lba_map(band);
507 
508 	ftl_remove_wptr(wptr);
509 }
510 
511 int
512 ftl_band_set_direct_access(struct ftl_band *band, bool access)
513 {
514 	if (access) {
515 		return ftl_add_direct_wptr(band);
516 	} else {
517 		ftl_close_direct_wptr(band);
518 		return 0;
519 	}
520 }
521 
522 static int
523 ftl_add_wptr(struct spdk_ftl_dev *dev)
524 {
525 	struct ftl_band *band;
526 	struct ftl_wptr *wptr;
527 
528 	band = ftl_next_wptr_band(dev);
529 	if (!band) {
530 		return -1;
531 	}
532 
533 	wptr = ftl_wptr_init(band);
534 	if (!wptr) {
535 		return -1;
536 	}
537 
538 	if (ftl_band_write_prep(band)) {
539 		ftl_wptr_free(wptr);
540 		return -1;
541 	}
542 
543 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
544 
545 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: band %u\n", band->id);
546 	ftl_trace_write_band(dev, band);
547 	return 0;
548 }
549 
550 static void
551 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
552 {
553 	struct ftl_band *band = wptr->band;
554 	struct spdk_ftl_dev *dev = wptr->dev;
555 	struct spdk_ftl_conf *conf = &dev->conf;
556 	size_t next_thld;
557 
558 	if (spdk_unlikely(wptr->direct_mode)) {
559 		return;
560 	}
561 
562 	wptr->offset += xfer_size;
563 	next_thld = (ftl_band_num_usable_lbks(band) * conf->band_thld) / 100;
564 
565 	if (ftl_band_full(band, wptr->offset)) {
566 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
567 	}
568 
569 	wptr->chunk->busy = true;
570 	wptr->ppa = ftl_band_next_xfer_ppa(band, wptr->ppa, xfer_size);
571 	wptr->chunk = ftl_band_next_operational_chunk(band, wptr->chunk);
572 
573 	assert(!ftl_ppa_invalid(wptr->ppa));
574 
575 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: grp:%d, pu:%d chunk:%d, lbk:%u\n",
576 		      wptr->ppa.grp, wptr->ppa.pu, wptr->ppa.chk, wptr->ppa.lbk);
577 
578 	if (wptr->offset >= next_thld && !dev->next_band) {
579 		dev->next_band = ftl_next_write_band(dev);
580 	}
581 }
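
/*
 * Note: next_thld above is a percentage (conf->band_thld) of the band's usable
 * blocks. Once the write pointer crosses it, the next band is picked and erased
 * ahead of time so a fresh band is ready by the time the current one fills up.
 * E.g. with band_thld == 75, the prefetch kicks in after 75% of the band is written.
 */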
582 
583 static size_t
584 ftl_wptr_user_lbks_left(const struct ftl_wptr *wptr)
585 {
586 	return ftl_band_user_lbks_left(wptr->band, wptr->offset);
587 }
588 
589 static int
590 ftl_wptr_ready(struct ftl_wptr *wptr)
591 {
592 	struct ftl_band *band = wptr->band;
593 
594 	/* TODO: add handling of empty bands */
595 
596 	if (spdk_unlikely(!ftl_chunk_is_writable(wptr->chunk))) {
597 		/* Erasing the band may fail after it has been assigned to the wptr. */
598 		if (spdk_unlikely(wptr->chunk->state == FTL_CHUNK_STATE_BAD)) {
599 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
600 		}
601 		return 0;
602 	}
603 
604 	/* If we're in the process of writing metadata, wait till it is */
605 	/* completed. */
606 	/* TODO: we should probably change bands once we're writing tail md */
607 	if (ftl_band_state_changing(band)) {
608 		return 0;
609 	}
610 
611 	if (band->state == FTL_BAND_STATE_FULL) {
612 		if (wptr->num_outstanding == 0) {
613 			if (ftl_wptr_close_band(wptr)) {
614 				/* TODO: need recovery here */
615 				assert(false);
616 			}
617 		}
618 
619 		return 0;
620 	}
621 
622 	if (band->state != FTL_BAND_STATE_OPEN) {
623 		if (ftl_wptr_open_band(wptr)) {
624 			/* TODO: need recovery here */
625 			assert(false);
626 		}
627 
628 		return 0;
629 	}
630 
631 	return 1;
632 }
633 
634 int
635 ftl_flush_active_bands(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
636 {
637 	struct ftl_wptr *wptr;
638 	struct ftl_band_flush *flush;
639 
640 	assert(ftl_get_core_thread(dev) == spdk_get_thread());
641 
642 	flush = calloc(1, sizeof(*flush));
643 	if (spdk_unlikely(!flush)) {
644 		return -ENOMEM;
645 	}
646 
647 	LIST_INSERT_HEAD(&dev->band_flush_list, flush, list_entry);
648 
649 	flush->cb_fn = cb_fn;
650 	flush->cb_arg = cb_arg;
651 	flush->dev = dev;
652 
653 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
654 		wptr->flush = true;
655 		flush->num_bands++;
656 	}
657 
658 	return 0;
659 }
660 
661 static const struct spdk_ftl_limit *
662 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
663 {
664 	assert(type < SPDK_FTL_LIMIT_MAX);
665 	return &dev->conf.limits[type];
666 }
667 
668 static bool
669 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
670 {
671 	struct ftl_ppa ppa;
672 
673 	/* If the LBA is invalid don't bother checking the md and l2p */
674 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
675 		return false;
676 	}
677 
678 	ppa = ftl_l2p_get(dev, entry->lba);
679 	if (!(ftl_ppa_cached(ppa) && ppa.offset == entry->pos)) {
680 		return false;
681 	}
682 
683 	return true;
684 }
685 
686 static void
687 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
688 {
689 	pthread_spin_lock(&entry->lock);
690 
691 	if (!ftl_rwb_entry_valid(entry)) {
692 		goto unlock;
693 	}
694 
695 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
696 	/* on-disk PPA and clear the cache status bit. Otherwise, skip the l2p update */
697 	/* and just clear the cache status. */
698 	if (!ftl_cache_lba_valid(dev, entry)) {
699 		goto clear;
700 	}
701 
702 	ftl_l2p_set(dev, entry->lba, entry->ppa);
703 clear:
704 	ftl_rwb_entry_invalidate(entry);
705 unlock:
706 	pthread_spin_unlock(&entry->lock);
707 }
708 
709 static struct ftl_rwb_entry *
710 ftl_acquire_entry(struct spdk_ftl_dev *dev, int flags)
711 {
712 	struct ftl_rwb_entry *entry;
713 
714 	entry = ftl_rwb_acquire(dev->rwb, ftl_rwb_type_from_flags(flags));
715 	if (!entry) {
716 		return NULL;
717 	}
718 
719 	ftl_evict_cache_entry(dev, entry);
720 
721 	entry->flags = flags;
722 	return entry;
723 }
724 
725 static void
726 ftl_rwb_pad(struct spdk_ftl_dev *dev, size_t size)
727 {
728 	struct ftl_rwb_entry *entry;
729 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
730 
731 	for (size_t i = 0; i < size; ++i) {
732 		entry = ftl_acquire_entry(dev, flags);
733 		if (!entry) {
734 			break;
735 		}
736 
737 		entry->lba = FTL_LBA_INVALID;
738 		entry->ppa = ftl_to_ppa(FTL_PPA_INVALID);
739 		memset(entry->data, 0, FTL_BLOCK_SIZE);
740 		ftl_rwb_push(entry);
741 	}
742 }
743 
744 static void
745 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
746 {
747 	while (!LIST_EMPTY(&dev->free_bands)) {
748 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
749 	}
750 
751 	dev->next_band = NULL;
752 }
753 
754 static void
755 ftl_wptr_pad_band(struct ftl_wptr *wptr)
756 {
757 	struct spdk_ftl_dev *dev = wptr->dev;
758 	size_t size = ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_INTERNAL) +
759 		      ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_USER);
760 	size_t blocks_left, rwb_size, pad_size;
761 
762 	blocks_left = ftl_wptr_user_lbks_left(wptr);
763 	rwb_size = ftl_rwb_size(dev->rwb) - size;
764 	pad_size = spdk_min(blocks_left, rwb_size);
765 
766 	/* Pad write buffer until band is full */
767 	ftl_rwb_pad(dev, pad_size);
768 }
769 
770 static void
771 ftl_wptr_process_shutdown(struct ftl_wptr *wptr)
772 {
773 	struct spdk_ftl_dev *dev = wptr->dev;
774 	size_t size = ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_INTERNAL) +
775 		      ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_USER);
776 	size_t num_active = dev->xfer_size * ftl_rwb_get_active_batches(dev->rwb);
777 
778 	num_active = num_active ? num_active : dev->xfer_size;
779 	if (size >= num_active) {
780 		return;
781 	}
782 
783 	/* If we reach this point, we need to remove free bands */
784 	/* and pad the current wptr band to the end */
785 	if (ftl_rwb_get_active_batches(dev->rwb) <= 1) {
786 		ftl_remove_free_bands(dev);
787 	}
788 
789 	ftl_wptr_pad_band(wptr);
790 }
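
/*
 * Note: on shutdown the leftover RWB entries are forced out by padding the
 * current band. Free bands are dropped from the free list once at most one
 * active batch remains, which keeps ftl_add_wptr() from opening another band
 * just to hold the padding.
 */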
791 
792 static int
793 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
794 {
795 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
796 	       LIST_EMPTY(&dev->wptr_list) && TAILQ_EMPTY(&dev->retry_queue);
797 }
798 
799 void
800 ftl_apply_limits(struct spdk_ftl_dev *dev)
801 {
802 	const struct spdk_ftl_limit *limit;
803 	struct ftl_stats *stats = &dev->stats;
804 	size_t rwb_limit[FTL_RWB_TYPE_MAX];
805 	int i;
806 
807 	ftl_rwb_get_limits(dev->rwb, rwb_limit);
808 
809 	/* Clear existing limit */
810 	dev->limit = SPDK_FTL_LIMIT_MAX;
811 
812 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
813 		limit = ftl_get_limit(dev, i);
814 
815 		if (dev->num_free <= limit->thld) {
816 			rwb_limit[FTL_RWB_TYPE_USER] =
817 				(limit->limit * ftl_rwb_entry_cnt(dev->rwb)) / 100;
818 			stats->limits[i]++;
819 			dev->limit = i;
820 			goto apply;
821 		}
822 	}
823 
824 	/* Clear the limits, since we don't need to apply them anymore */
825 	rwb_limit[FTL_RWB_TYPE_USER] = ftl_rwb_entry_cnt(dev->rwb);
826 apply:
827 	ftl_trace_limits(dev, rwb_limit, dev->num_free);
828 	ftl_rwb_set_limits(dev->rwb, rwb_limit);
829 }
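
/*
 * Note: the limits throttle user writes based on the number of free bands left.
 * As an example, if the matched limit level allows 20% and the RWB holds 1024
 * entries, user writes are capped at (20 * 1024) / 100 = 204 RWB entries until
 * more bands become free; only the FTL_RWB_TYPE_USER limit is adjusted here.
 */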
830 
831 static int
832 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
833 {
834 	struct ftl_band *band = ftl_band_from_ppa(dev, ppa);
835 	struct ftl_lba_map *lba_map = &band->lba_map;
836 	uint64_t offset;
837 
838 	offset = ftl_band_lbkoff_from_ppa(band, ppa);
839 
840 	/* The bit might already be cleared if two writes are scheduled to the */
841 	/* same LBA at the same time */
842 	if (spdk_bit_array_get(lba_map->vld, offset)) {
843 		assert(lba_map->num_vld > 0);
844 		spdk_bit_array_clear(lba_map->vld, offset);
845 		lba_map->num_vld--;
846 		return 1;
847 	}
848 
849 	return 0;
850 }
851 
852 int
853 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
854 {
855 	struct ftl_band *band;
856 	int rc;
857 
858 	assert(!ftl_ppa_cached(ppa));
859 	band = ftl_band_from_ppa(dev, ppa);
860 
861 	pthread_spin_lock(&band->lba_map.lock);
862 	rc = ftl_invalidate_addr_unlocked(dev, ppa);
863 	pthread_spin_unlock(&band->lba_map.lock);
864 
865 	return rc;
866 }
867 
868 static int
869 ftl_read_retry(int rc)
870 {
871 	return rc == -EAGAIN;
872 }
873 
874 static int
875 ftl_read_canceled(int rc)
876 {
877 	return rc == -EFAULT || rc == 0;
878 }
879 
880 static void
881 ftl_add_to_retry_queue(struct ftl_io *io)
882 {
883 	if (!(io->flags & FTL_IO_RETRY)) {
884 		io->flags |= FTL_IO_RETRY;
885 		TAILQ_INSERT_TAIL(&io->dev->retry_queue, io, retry_entry);
886 	}
887 }
888 
889 static int
890 ftl_ppa_cache_read(struct ftl_io *io, uint64_t lba,
891 		   struct ftl_ppa ppa, void *buf)
892 {
893 	struct ftl_rwb *rwb = io->dev->rwb;
894 	struct ftl_rwb_entry *entry;
895 	struct ftl_ppa nppa;
896 	int rc = 0;
897 
898 	entry = ftl_rwb_entry_from_offset(rwb, ppa.offset);
899 	pthread_spin_lock(&entry->lock);
900 
901 	nppa = ftl_l2p_get(io->dev, lba);
902 	if (ppa.ppa != nppa.ppa) {
903 		rc = -1;
904 		goto out;
905 	}
906 
907 	memcpy(buf, entry->data, FTL_BLOCK_SIZE);
908 out:
909 	pthread_spin_unlock(&entry->lock);
910 	return rc;
911 }
912 
913 static int
914 ftl_lba_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa)
915 {
916 	struct spdk_ftl_dev *dev = io->dev;
917 	struct ftl_ppa next_ppa;
918 	size_t i;
919 
920 	*ppa = ftl_l2p_get(dev, ftl_io_current_lba(io));
921 
922 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Read ppa:%lx, lba:%lu\n",
923 		      ppa->ppa, ftl_io_current_lba(io));
924 
925 	/* If the PPA is invalid, skip it (the buffer should already be zeroed) */
926 	if (ftl_ppa_invalid(*ppa)) {
927 		return -EFAULT;
928 	}
929 
930 	if (ftl_ppa_cached(*ppa)) {
931 		if (!ftl_ppa_cache_read(io, ftl_io_current_lba(io), *ppa, ftl_io_iovec_addr(io))) {
932 			return 0;
933 		}
934 
935 		/* If the state changed, we have to re-read the l2p */
936 		return -EAGAIN;
937 	}
938 
939 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
940 		next_ppa = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));
941 
942 		if (ftl_ppa_invalid(next_ppa) || ftl_ppa_cached(next_ppa)) {
943 			break;
944 		}
945 
946 		if (ftl_ppa_addr_pack(dev, *ppa) + i != ftl_ppa_addr_pack(dev, next_ppa)) {
947 			break;
948 		}
949 	}
950 
951 	return i;
952 }
953 
954 static int
955 ftl_submit_read(struct ftl_io *io)
956 {
957 	struct spdk_ftl_dev *dev = io->dev;
958 	struct ftl_ppa ppa;
959 	int rc = 0, lbk_cnt;
960 
961 	assert(LIST_EMPTY(&io->children));
962 
963 	while (io->pos < io->lbk_cnt) {
964 		if (ftl_io_mode_ppa(io)) {
965 			lbk_cnt = rc = ftl_ppa_read_next_ppa(io, &ppa);
966 		} else {
967 			lbk_cnt = rc = ftl_lba_read_next_ppa(io, &ppa);
968 		}
969 
970 		/* We might need to retry the read from scratch (e.g. */
971 		/* because a write was under way and completed before */
972 		/* we could read it from the rwb) */
973 		if (ftl_read_retry(rc)) {
974 			continue;
975 		}
976 
977 		/* We don't have to schedule the read, as it was read from cache */
978 		if (ftl_read_canceled(rc)) {
979 			ftl_io_advance(io, 1);
980 			ftl_trace_completion(io->dev, io, rc ? FTL_TRACE_COMPLETION_INVALID :
981 					     FTL_TRACE_COMPLETION_CACHE);
982 			rc = 0;
983 			continue;
984 		}
985 
986 		assert(lbk_cnt > 0);
987 
988 		ftl_trace_submission(dev, io, ppa, lbk_cnt);
989 		rc = spdk_nvme_ns_cmd_read(dev->ns, ftl_get_read_qpair(dev),
990 					   ftl_io_iovec_addr(io),
991 					   ftl_ppa_addr_pack(io->dev, ppa), lbk_cnt,
992 					   ftl_io_cmpl_cb, io, 0);
993 		if (spdk_unlikely(rc)) {
994 			if (rc == -ENOMEM) {
995 				ftl_add_to_retry_queue(io);
996 			} else {
997 				ftl_io_fail(io, rc);
998 			}
999 			break;
1000 		}
1001 
1002 		ftl_io_inc_req(io);
1003 		ftl_io_advance(io, lbk_cnt);
1004 	}
1005 
1006 	/* If we didn't have to read anything from the device, */
1007 	/* complete the request right away */
1008 	if (ftl_io_done(io)) {
1009 		ftl_io_complete(io);
1010 	}
1011 
1012 	return rc;
1013 }
1014 
1015 static void
1016 ftl_complete_flush(struct ftl_flush *flush)
1017 {
1018 	assert(flush->num_req == 0);
1019 	LIST_REMOVE(flush, list_entry);
1020 
1021 	flush->cb.fn(flush->cb.ctx, 0);
1022 
1023 	spdk_bit_array_free(&flush->bmap);
1024 	free(flush);
1025 }
1026 
1027 static void
1028 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_rwb_batch *batch)
1029 {
1030 	struct ftl_flush *flush, *tflush;
1031 	size_t offset;
1032 
1033 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
1034 		offset = ftl_rwb_batch_get_offset(batch);
1035 
1036 		if (spdk_bit_array_get(flush->bmap, offset)) {
1037 			spdk_bit_array_clear(flush->bmap, offset);
1038 			if (!(--flush->num_req)) {
1039 				ftl_complete_flush(flush);
1040 			}
1041 		}
1042 	}
1043 }
1044 
1045 static void
1046 ftl_nv_cache_wrap_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1047 {
1048 	struct ftl_nv_cache *nv_cache = cb_arg;
1049 
1050 	if (!success) {
1051 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header\n");
1052 		/* TODO: go into read-only mode */
1053 		assert(0);
1054 	}
1055 
1056 	pthread_spin_lock(&nv_cache->lock);
1057 	nv_cache->ready = true;
1058 	pthread_spin_unlock(&nv_cache->lock);
1059 
1060 	spdk_bdev_free_io(bdev_io);
1061 }
1062 
1063 static void
1064 ftl_nv_cache_wrap(void *ctx)
1065 {
1066 	struct ftl_nv_cache *nv_cache = ctx;
1067 	int rc;
1068 
1069 	rc = ftl_nv_cache_write_header(nv_cache, false, ftl_nv_cache_wrap_cb, nv_cache);
1070 	if (spdk_unlikely(rc != 0)) {
1071 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header: %s\n",
1072 			    spdk_strerror(-rc));
1073 		/* TODO: go into read-only mode */
1074 		assert(0);
1075 	}
1076 }
1077 
1078 static uint64_t
1079 ftl_reserve_nv_cache(struct ftl_nv_cache *nv_cache, size_t *num_lbks, unsigned int *phase)
1080 {
1081 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1082 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1083 	uint64_t num_available, cache_size, cache_addr = FTL_LBA_INVALID;
1084 
1085 	cache_size = spdk_bdev_get_num_blocks(bdev);
1086 
1087 	pthread_spin_lock(&nv_cache->lock);
1088 	if (spdk_unlikely(nv_cache->num_available == 0 || !nv_cache->ready)) {
1089 		goto out;
1090 	}
1091 
1092 	num_available = spdk_min(nv_cache->num_available, *num_lbks);
1093 	num_available = spdk_min(num_available, dev->conf.nv_cache.max_request_cnt);
1094 
1095 	if (spdk_unlikely(nv_cache->current_addr + num_available > cache_size)) {
1096 		*num_lbks = cache_size - nv_cache->current_addr;
1097 	} else {
1098 		*num_lbks = num_available;
1099 	}
1100 
1101 	cache_addr = nv_cache->current_addr;
1102 	nv_cache->current_addr += *num_lbks;
1103 	nv_cache->num_available -= *num_lbks;
1104 	*phase = nv_cache->phase;
1105 
1106 	if (nv_cache->current_addr == spdk_bdev_get_num_blocks(bdev)) {
1107 		nv_cache->current_addr = FTL_NV_CACHE_DATA_OFFSET;
1108 		nv_cache->phase = ftl_nv_cache_next_phase(nv_cache->phase);
1109 		nv_cache->ready = false;
1110 		spdk_thread_send_msg(ftl_get_core_thread(dev), ftl_nv_cache_wrap, nv_cache);
1111 	}
1112 out:
1113 	pthread_spin_unlock(&nv_cache->lock);
1114 	return cache_addr;
1115 }
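
/*
 * Note: once current_addr reaches the end of the cache bdev, the allocation
 * pointer wraps back to FTL_NV_CACHE_DATA_OFFSET, the phase is advanced and the
 * cache is marked not ready until the metadata header carrying the new phase is
 * persisted on the core thread (ftl_nv_cache_wrap()).
 */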
1116 
1117 static struct ftl_io *
1118 ftl_alloc_io_nv_cache(struct ftl_io *parent, size_t num_lbks)
1119 {
1120 	struct ftl_io_init_opts opts = {
1121 		.dev		= parent->dev,
1122 		.parent		= parent,
1123 		.data		= ftl_io_iovec_addr(parent),
1124 		.lbk_cnt	= num_lbks,
1125 		.flags		= parent->flags | FTL_IO_CACHE,
1126 	};
1127 
1128 	return ftl_io_init_internal(&opts);
1129 }
1130 
1131 static void
1132 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1133 {
1134 	struct ftl_io *io = cb_arg;
1135 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1136 
1137 	if (spdk_unlikely(!success)) {
1138 		SPDK_ERRLOG("Non-volatile cache write failed at %"PRIx64"\n", io->ppa.ppa);
1139 		io->status = -EIO;
1140 	}
1141 
1142 	ftl_io_dec_req(io);
1143 	if (ftl_io_done(io)) {
1144 		spdk_mempool_put(nv_cache->md_pool, io->md);
1145 		ftl_io_complete(io);
1146 	}
1147 
1148 	spdk_bdev_free_io(bdev_io);
1149 }
1150 
1151 static void
1152 ftl_submit_nv_cache(void *ctx)
1153 {
1154 	struct ftl_io *io = ctx;
1155 	struct spdk_ftl_dev *dev = io->dev;
1156 	struct spdk_thread *thread;
1157 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1158 	struct ftl_io_channel *ioch;
1159 	int rc;
1160 
1161 	ioch = spdk_io_channel_get_ctx(io->ioch);
1162 	thread = spdk_io_channel_get_thread(io->ioch);
1163 
1164 	rc = spdk_bdev_write_blocks_with_md(nv_cache->bdev_desc, ioch->cache_ioch,
1165 					    ftl_io_iovec_addr(io), io->md, io->ppa.ppa,
1166 					    io->lbk_cnt, ftl_nv_cache_submit_cb, io);
1167 	if (rc == -ENOMEM) {
1168 		spdk_thread_send_msg(thread, ftl_submit_nv_cache, io);
1169 		return;
1170 	} else if (rc) {
1171 		SPDK_ERRLOG("Write to persistent cache failed: %s (%"PRIu64", %"PRIu64")\n",
1172 			    spdk_strerror(-rc), io->ppa.ppa, io->lbk_cnt);
1173 		spdk_mempool_put(nv_cache->md_pool, io->md);
1174 		io->status = -EIO;
1175 		ftl_io_complete(io);
1176 		return;
1177 	}
1178 
1179 	ftl_io_advance(io, io->lbk_cnt);
1180 	ftl_io_inc_req(io);
1181 }
1182 
1183 static void
1184 ftl_nv_cache_fill_md(struct ftl_io *io, unsigned int phase)
1185 {
1186 	struct spdk_bdev *bdev;
1187 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1188 	uint64_t lbk_off, lba;
1189 	void *md_buf = io->md;
1190 
1191 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1192 
1193 	for (lbk_off = 0; lbk_off < io->lbk_cnt; ++lbk_off) {
1194 		lba = ftl_nv_cache_pack_lba(ftl_io_get_lba(io, lbk_off), phase);
1195 		memcpy(md_buf, &lba, sizeof(lba));
1196 		md_buf += spdk_bdev_get_md_size(bdev);
1197 	}
1198 }
1199 
1200 static void
1201 _ftl_write_nv_cache(void *ctx)
1202 {
1203 	struct ftl_io *child, *io = ctx;
1204 	struct spdk_ftl_dev *dev = io->dev;
1205 	struct spdk_thread *thread;
1206 	unsigned int phase;
1207 	uint64_t num_lbks;
1208 
1209 	thread = spdk_io_channel_get_thread(io->ioch);
1210 
1211 	while (io->pos < io->lbk_cnt) {
1212 		num_lbks = ftl_io_iovec_len_left(io);
1213 
1214 		child = ftl_alloc_io_nv_cache(io, num_lbks);
1215 		if (spdk_unlikely(!child)) {
1216 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1217 			return;
1218 		}
1219 
1220 		child->md = spdk_mempool_get(dev->nv_cache.md_pool);
1221 		if (spdk_unlikely(!child->md)) {
1222 			ftl_io_free(child);
1223 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1224 			break;
1225 		}
1226 
1227 		/* Reserve area in the non-volatile cache */
1228 		child->ppa.ppa = ftl_reserve_nv_cache(&dev->nv_cache, &num_lbks, &phase);
1229 		if (child->ppa.ppa == FTL_LBA_INVALID) {
1230 			spdk_mempool_put(dev->nv_cache.md_pool, child->md);
1231 			ftl_io_free(child);
1232 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1233 			break;
1234 		}
1235 
1236 		/* Shrink the IO if there isn't enough room in the cache to fill the whole iovec */
1237 		if (spdk_unlikely(num_lbks != ftl_io_iovec_len_left(io))) {
1238 			ftl_io_shrink_iovec(child, num_lbks);
1239 		}
1240 
1241 		ftl_nv_cache_fill_md(child, phase);
1242 		ftl_submit_nv_cache(child);
1243 	}
1244 
1245 	if (ftl_io_done(io)) {
1246 		ftl_io_complete(io);
1247 	}
1248 }
1249 
1250 static void
1251 ftl_write_nv_cache(struct ftl_io *parent)
1252 {
1253 	ftl_io_reset(parent);
1254 	parent->flags |= FTL_IO_CACHE;
1255 	_ftl_write_nv_cache(parent);
1256 }
1257 
1258 int
1259 ftl_nv_cache_write_header(struct ftl_nv_cache *nv_cache, bool shutdown,
1260 			  spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1261 {
1262 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1263 	struct ftl_nv_cache_header *hdr = nv_cache->dma_buf;
1264 	struct spdk_bdev *bdev;
1265 	struct ftl_io_channel *ioch;
1266 
1267 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1268 	ioch = spdk_io_channel_get_ctx(dev->ioch);
1269 
1270 	memset(hdr, 0, spdk_bdev_get_block_size(bdev));
1271 
1272 	hdr->phase = (uint8_t)nv_cache->phase;
1273 	hdr->size = spdk_bdev_get_num_blocks(bdev);
1274 	hdr->uuid = dev->uuid;
1275 	hdr->version = FTL_NV_CACHE_HEADER_VERSION;
1276 	hdr->current_addr = shutdown ? nv_cache->current_addr : FTL_LBA_INVALID;
1277 	hdr->checksum = spdk_crc32c_update(hdr, offsetof(struct ftl_nv_cache_header, checksum), 0);
1278 
1279 	return spdk_bdev_write_blocks(nv_cache->bdev_desc, ioch->cache_ioch, hdr, 0, 1,
1280 				      cb_fn, cb_arg);
1281 }
1282 
1283 int
1284 ftl_nv_cache_scrub(struct ftl_nv_cache *nv_cache, spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1285 {
1286 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1287 	struct ftl_io_channel *ioch;
1288 	struct spdk_bdev *bdev;
1289 
1290 	ioch = spdk_io_channel_get_ctx(dev->ioch);
1291 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1292 
1293 	return spdk_bdev_write_zeroes_blocks(nv_cache->bdev_desc, ioch->cache_ioch, 1,
1294 					     spdk_bdev_get_num_blocks(bdev) - 1,
1295 					     cb_fn, cb_arg);
1296 }
1297 
1298 static void
1299 ftl_write_fail(struct ftl_io *io, int status)
1300 {
1301 	struct ftl_rwb_batch *batch = io->rwb_batch;
1302 	struct spdk_ftl_dev *dev = io->dev;
1303 	struct ftl_rwb_entry *entry;
1304 	struct ftl_band *band;
1305 	char buf[128];
1306 
1307 	entry = ftl_rwb_batch_first_entry(batch);
1308 
1309 	band = ftl_band_from_ppa(io->dev, entry->ppa);
1310 	SPDK_ERRLOG("Write failed @ppa: %s, status: %d\n",
1311 		    ftl_ppa2str(entry->ppa, buf, sizeof(buf)), status);
1312 
1313 	/* Close the band, halt the wptr and defrag */
1314 	ftl_halt_writes(dev, band);
1315 
1316 	ftl_rwb_foreach(entry, batch) {
1317 		/* Invalidate meta set by process_writes() */
1318 		ftl_invalidate_addr(dev, entry->ppa);
1319 	}
1320 
1321 	/* Reset the batch back to the RWB to resend it later */
1322 	ftl_rwb_batch_revert(batch);
1323 }
1324 
1325 static void
1326 ftl_write_cb(struct ftl_io *io, void *arg, int status)
1327 {
1328 	struct spdk_ftl_dev *dev = io->dev;
1329 	struct ftl_rwb_batch *batch = io->rwb_batch;
1330 	struct ftl_rwb_entry *entry;
1331 	struct ftl_band *band;
1332 
1333 	if (status) {
1334 		ftl_write_fail(io, status);
1335 		return;
1336 	}
1337 
1338 	assert(io->lbk_cnt == dev->xfer_size);
1339 	ftl_rwb_foreach(entry, batch) {
1340 		band = entry->band;
1341 		if (!(io->flags & FTL_IO_MD) && !(entry->flags & FTL_IO_PAD)) {
1342 			/* Verify that the LBA is set for user lbks */
1343 			assert(entry->lba != FTL_LBA_INVALID);
1344 		}
1345 
1346 		if (band != NULL) {
1347 			assert(band->num_reloc_blocks > 0);
1348 			band->num_reloc_blocks--;
1349 		}
1350 
1351 		SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lu, lba:%lu\n",
1352 			      entry->ppa.ppa, entry->lba);
1353 	}
1354 
1355 	ftl_process_flush(dev, batch);
1356 	ftl_rwb_batch_release(batch);
1357 }
1358 
1359 static void
1360 ftl_update_rwb_stats(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry)
1361 {
1362 	if (!ftl_rwb_entry_internal(entry)) {
1363 		dev->stats.write_user++;
1364 	}
1365 	dev->stats.write_total++;
1366 }
1367 
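/*
 * Update the L2P for an entry that has just been placed in the write buffer.
 * Three cases are handled below: the LBA was unmapped (simply set it), the
 * previous address points into the write buffer (invalidate the older cached
 * entry under its lock), or it points at a band (invalidate the block under the
 * band's lba_map lock, skipping the update for weak writes that lost the race).
 */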
1368 static void
1369 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry,
1370 	       struct ftl_ppa ppa)
1371 {
1372 	struct ftl_ppa prev_ppa;
1373 	struct ftl_rwb_entry *prev;
1374 	struct ftl_band *band;
1375 	int valid;
1376 
1377 	prev_ppa = ftl_l2p_get(dev, entry->lba);
1378 	if (ftl_ppa_invalid(prev_ppa)) {
1379 		ftl_l2p_set(dev, entry->lba, ppa);
1380 		return;
1381 	}
1382 
1383 	/* If the L2P's PPA is different from what we expected, we don't need to */
1384 	/* do anything (someone's already overwritten our data). */
1385 	if (ftl_rwb_entry_weak(entry) && !ftl_ppa_cmp(prev_ppa, entry->ppa)) {
1386 		return;
1387 	}
1388 
1389 	if (ftl_ppa_cached(prev_ppa)) {
1390 		assert(!ftl_rwb_entry_weak(entry));
1391 		prev = ftl_rwb_entry_from_offset(dev->rwb, prev_ppa.offset);
1392 		pthread_spin_lock(&prev->lock);
1393 
1394 		/* Re-read the L2P under the lock to protect against updates */
1395 		/* to this LBA from other threads */
1396 		prev_ppa = ftl_l2p_get(dev, entry->lba);
1397 
1398 		/* If the entry is no longer in cache, another write has been */
1399 		/* scheduled in the meantime, so we have to invalidate its LBA */
1400 		if (!ftl_ppa_cached(prev_ppa)) {
1401 			ftl_invalidate_addr(dev, prev_ppa);
1402 		}
1403 
1404 		/* If previous entry is part of cache, remove and invalidate it */
1405 		if (ftl_rwb_entry_valid(prev)) {
1406 			ftl_invalidate_addr(dev, prev->ppa);
1407 			ftl_rwb_entry_invalidate(prev);
1408 		}
1409 
1410 		ftl_l2p_set(dev, entry->lba, ppa);
1411 		pthread_spin_unlock(&prev->lock);
1412 		return;
1413 	}
1414 
1415 	/* Lock the band containing the previous PPA. This ensures atomic changes to */
1416 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
1417 	/* check the validity of weak writes. */
1418 	band = ftl_band_from_ppa(dev, prev_ppa);
1419 	pthread_spin_lock(&band->lba_map.lock);
1420 
1421 	valid = ftl_invalidate_addr_unlocked(dev, prev_ppa);
1422 
1423 	/* If the address has been invalidated already, we don't want to update */
1424 	/* the L2P for weak writes, as it means the write is no longer valid. */
1425 	if (!ftl_rwb_entry_weak(entry) || valid) {
1426 		ftl_l2p_set(dev, entry->lba, ppa);
1427 	}
1428 
1429 	pthread_spin_unlock(&band->lba_map.lock);
1430 }
1431 
1432 static struct ftl_io *
1433 ftl_io_init_child_write(struct ftl_io *parent, struct ftl_ppa ppa,
1434 			void *data, void *md, ftl_io_fn cb)
1435 {
1436 	struct ftl_io *io;
1437 	struct spdk_ftl_dev *dev = parent->dev;
1438 	struct ftl_io_init_opts opts = {
1439 		.dev		= dev,
1440 		.io		= NULL,
1441 		.parent		= parent,
1442 		.rwb_batch	= NULL,
1443 		.band		= parent->band,
1444 		.size		= sizeof(struct ftl_io),
1445 		.flags		= 0,
1446 		.type		= FTL_IO_WRITE,
1447 		.lbk_cnt	= dev->xfer_size,
1448 		.cb_fn		= cb,
1449 		.data		= data,
1450 		.md		= md,
1451 	};
1452 
1453 	io = ftl_io_init_internal(&opts);
1454 	if (!io) {
1455 		return NULL;
1456 	}
1457 
1458 	io->ppa = ppa;
1459 
1460 	return io;
1461 }
1462 
1463 static void
1464 ftl_io_child_write_cb(struct ftl_io *io, void *ctx, int status)
1465 {
1466 	struct ftl_chunk *chunk;
1467 	struct ftl_wptr *wptr;
1468 
1469 	chunk = ftl_band_chunk_from_ppa(io->band, io->ppa);
1470 	wptr = ftl_wptr_from_band(io->band);
1471 
1472 	chunk->busy = false;
1473 	chunk->write_offset += io->lbk_cnt;
1474 
1475 	/* If some other write on the same band failed, the write pointer would have already been freed */
1476 	if (spdk_likely(wptr)) {
1477 		wptr->num_outstanding--;
1478 	}
1479 }
1480 
1481 static int
1482 ftl_submit_child_write(struct ftl_wptr *wptr, struct ftl_io *io, int lbk_cnt)
1483 {
1484 	struct spdk_ftl_dev	*dev = io->dev;
1485 	struct ftl_io		*child;
1486 	int			rc;
1487 	struct ftl_ppa		ppa;
1488 
1489 	if (spdk_likely(!wptr->direct_mode)) {
1490 		ppa = wptr->ppa;
1491 	} else {
1492 		assert(io->flags & FTL_IO_DIRECT_ACCESS);
1493 		assert(io->ppa.chk == wptr->band->id);
1494 		ppa = io->ppa;
1495 	}
1496 
1497 	/* Split IO to child requests and release chunk immediately after child is completed */
1498 	child = ftl_io_init_child_write(io, ppa, ftl_io_iovec_addr(io),
1499 					ftl_io_get_md(io), ftl_io_child_write_cb);
1500 	if (!child) {
1501 		return -EAGAIN;
1502 	}
1503 
1504 	wptr->num_outstanding++;
1505 	rc = spdk_nvme_ns_cmd_write_with_md(dev->ns, ftl_get_write_qpair(dev),
1506 					    ftl_io_iovec_addr(child), child->md,
1507 					    ftl_ppa_addr_pack(dev, ppa),
1508 					    lbk_cnt, ftl_io_cmpl_cb, child, 0, 0, 0);
1509 	if (rc) {
1510 		wptr->num_outstanding--;
1511 		ftl_io_fail(child, rc);
1512 		ftl_io_complete(child);
1513 		SPDK_ERRLOG("spdk_nvme_ns_cmd_write_with_md failed with status:%d, ppa:%lu\n",
1514 			    rc, ppa.ppa);
1515 		return -EIO;
1516 	}
1517 
1518 	ftl_io_inc_req(child);
1519 	ftl_io_advance(child, lbk_cnt);
1520 
1521 	return 0;
1522 }
1523 
1524 static int
1525 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
1526 {
1527 	struct spdk_ftl_dev	*dev = io->dev;
1528 	int			rc = 0;
1529 
1530 	assert(io->lbk_cnt % dev->xfer_size == 0);
1531 
1532 	while (io->iov_pos < io->iov_cnt) {
1533 		/* There are no ordering guarantees for completions on the NVMe IO submission queue, */
1534 		/* so wait until the chunk is not busy before submitting another write */
1535 		if (wptr->chunk->busy) {
1536 			TAILQ_INSERT_TAIL(&wptr->pending_queue, io, retry_entry);
1537 			rc = -EAGAIN;
1538 			break;
1539 		}
1540 
1541 		rc = ftl_submit_child_write(wptr, io, dev->xfer_size);
1542 		if (spdk_unlikely(rc)) {
1543 			if (rc == -EAGAIN) {
1544 				TAILQ_INSERT_TAIL(&wptr->pending_queue, io, retry_entry);
1545 			} else {
1546 				ftl_io_fail(io, rc);
1547 			}
1548 			break;
1549 		}
1550 
1551 		ftl_trace_submission(dev, io, wptr->ppa, dev->xfer_size);
1552 		ftl_wptr_advance(wptr, dev->xfer_size);
1553 	}
1554 
1555 	if (ftl_io_done(io)) {
1556 		/* Parent IO will complete after all children are completed */
1557 		ftl_io_complete(io);
1558 	}
1559 
1560 	return rc;
1561 }
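
/*
 * Note: writes are split into xfer_size-sized child IOs, one per call to
 * ftl_submit_child_write(). If the target chunk is still busy with a previous
 * child, the parent IO is parked on the wptr's pending_queue and resubmitted
 * from ftl_wptr_process_writes() on a later poll.
 */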
1562 
1563 static void
1564 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1565 {
1566 	struct ftl_rwb *rwb = dev->rwb;
1567 	size_t size, num_entries;
1568 
1569 	size = ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_INTERNAL) +
1570 	       ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_USER);
1571 
1572 	/* There must be something in the RWB, otherwise the flush */
1573 	/* wouldn't be waiting for anything */
1574 	assert(size > 0);
1575 
1576 	/* Only add padding when there are fewer than xfer_size */
1577 	/* entries in the buffer. Otherwise we just have to wait */
1578 	/* for the entries to become ready. */
1579 	num_entries = ftl_rwb_get_active_batches(dev->rwb) * dev->xfer_size;
1580 	if (size < num_entries) {
1581 		ftl_rwb_pad(dev, num_entries - (size % num_entries));
1582 	}
1583 }
1584 
1585 static int
1586 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1587 {
1588 	struct spdk_ftl_dev	*dev = wptr->dev;
1589 	struct ftl_rwb_batch	*batch;
1590 	struct ftl_rwb_entry	*entry;
1591 	struct ftl_io		*io;
1592 	struct ftl_ppa		ppa, prev_ppa;
1593 
1594 	if (spdk_unlikely(!TAILQ_EMPTY(&wptr->pending_queue))) {
1595 		io = TAILQ_FIRST(&wptr->pending_queue);
1596 		TAILQ_REMOVE(&wptr->pending_queue, io, retry_entry);
1597 
1598 		if (ftl_submit_write(wptr, io) == -EAGAIN) {
1599 			return 0;
1600 		}
1601 	}
1602 
1603 	/* Make sure the band is prepared for writing */
1604 	if (!ftl_wptr_ready(wptr)) {
1605 		return 0;
1606 	}
1607 
1608 	if (dev->halt) {
1609 		ftl_wptr_process_shutdown(wptr);
1610 	}
1611 
1612 	if (spdk_unlikely(wptr->flush)) {
1613 		ftl_wptr_pad_band(wptr);
1614 	}
1615 
1616 	batch = ftl_rwb_pop(dev->rwb);
1617 	if (!batch) {
1618 		/* If there are queued flush requests we need to pad the RWB to */
1619 		/* force out remaining entries */
1620 		if (!LIST_EMPTY(&dev->flush_list)) {
1621 			ftl_flush_pad_batch(dev);
1622 		}
1623 
1624 		return 0;
1625 	}
1626 
1627 	io = ftl_io_rwb_init(dev, wptr->band, batch, ftl_write_cb);
1628 	if (!io) {
1629 		goto error;
1630 	}
1631 
1632 	ppa = wptr->ppa;
1633 	ftl_rwb_foreach(entry, batch) {
1634 		/* Update band's relocation stats if the IO comes from reloc */
1635 		if (entry->flags & FTL_IO_WEAK) {
1636 			if (!spdk_bit_array_get(wptr->band->reloc_bitmap, entry->band->id)) {
1637 				spdk_bit_array_set(wptr->band->reloc_bitmap, entry->band->id);
1638 				entry->band->num_reloc_bands++;
1639 			}
1640 		}
1641 
1642 		entry->ppa = ppa;
1643 		if (entry->lba != FTL_LBA_INVALID) {
1644 			pthread_spin_lock(&entry->lock);
1645 			prev_ppa = ftl_l2p_get(dev, entry->lba);
1646 
1647 			/* If the l2p was updated in the meantime, don't update band's metadata */
1648 			if (ftl_ppa_cached(prev_ppa) && prev_ppa.offset == entry->pos) {
1649 				/* Setting entry's cache bit needs to be done after metadata */
1650 				/* within the band is updated to make sure that writes */
1651 				/* invalidating the entry clear the metadata as well */
1652 				ftl_band_set_addr(wptr->band, entry->lba, entry->ppa);
1653 				ftl_rwb_entry_set_valid(entry);
1654 			}
1655 			pthread_spin_unlock(&entry->lock);
1656 		}
1657 
1658 		ftl_trace_rwb_pop(dev, entry);
1659 		ftl_update_rwb_stats(dev, entry);
1660 
1661 		ppa = ftl_band_next_ppa(wptr->band, ppa, 1);
1662 	}
1663 
1664 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lx, %lx\n", wptr->ppa.ppa,
1665 		      ftl_ppa_addr_pack(dev, wptr->ppa));
1666 
1667 	if (ftl_submit_write(wptr, io)) {
1668 		/* TODO: we need some recovery here */
1669 		assert(0 && "Write submit failed");
1670 		if (ftl_io_done(io)) {
1671 			ftl_io_free(io);
1672 		}
1673 	}
1674 
1675 	return dev->xfer_size;
1676 error:
1677 	ftl_rwb_batch_revert(batch);
1678 	return 0;
1679 }
1680 
1681 static int
1682 ftl_process_writes(struct spdk_ftl_dev *dev)
1683 {
1684 	struct ftl_wptr *wptr, *twptr;
1685 	size_t num_active = 0;
1686 	enum ftl_band_state state;
1687 
1688 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1689 		ftl_wptr_process_writes(wptr);
1690 		state = wptr->band->state;
1691 
1692 		if (state != FTL_BAND_STATE_FULL &&
1693 		    state != FTL_BAND_STATE_CLOSING &&
1694 		    state != FTL_BAND_STATE_CLOSED) {
1695 			num_active++;
1696 		}
1697 	}
1698 
1699 	if (num_active < 1) {
1700 		ftl_add_wptr(dev);
1701 	}
1702 
1703 	return 0;
1704 }
1705 
1706 static void
1707 ftl_rwb_entry_fill(struct ftl_rwb_entry *entry, struct ftl_io *io)
1708 {
1709 	memcpy(entry->data, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1710 
1711 	if (ftl_rwb_entry_weak(entry)) {
1712 		entry->band = ftl_band_from_ppa(io->dev, io->ppa);
1713 		entry->ppa = ftl_band_next_ppa(entry->band, io->ppa, io->pos);
1714 		entry->band->num_reloc_blocks++;
1715 	}
1716 
1717 	entry->trace = io->trace;
1718 	entry->lba = ftl_io_current_lba(io);
1719 
1720 	if (entry->md) {
1721 		memcpy(entry->md, &entry->lba, sizeof(entry->lba));
1722 	}
1723 }
1724 
1725 static int
1726 ftl_rwb_fill(struct ftl_io *io)
1727 {
1728 	struct spdk_ftl_dev *dev = io->dev;
1729 	struct ftl_rwb_entry *entry;
1730 	struct ftl_ppa ppa = { .cached = 1 };
1731 	int flags = ftl_rwb_flags_from_io(io);
1732 
1733 	while (io->pos < io->lbk_cnt) {
1734 		if (ftl_io_current_lba(io) == FTL_LBA_INVALID) {
1735 			ftl_io_advance(io, 1);
1736 			continue;
1737 		}
1738 
1739 		entry = ftl_acquire_entry(dev, flags);
1740 		if (!entry) {
1741 			return -EAGAIN;
1742 		}
1743 
1744 		ftl_rwb_entry_fill(entry, io);
1745 
1746 		ppa.offset = entry->pos;
1747 
1748 		ftl_trace_rwb_fill(dev, io);
1749 		ftl_update_l2p(dev, entry, ppa);
1750 		ftl_io_advance(io, 1);
1751 
1752 		/* Needs to be done after the L2P is updated to avoid a race with the */
1753 		/* write completion callback in case it's processed faster than the */
1754 		/* L2P is set in ftl_update_l2p(). */
1755 		ftl_rwb_push(entry);
1756 	}
1757 
1758 	if (ftl_io_done(io)) {
1759 		if (ftl_dev_has_nv_cache(dev) && !(io->flags & FTL_IO_BYPASS_CACHE)) {
1760 			ftl_write_nv_cache(io);
1761 		} else {
1762 			ftl_io_complete(io);
1763 		}
1764 	}
1765 
1766 	return 0;
1767 }
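
/*
 * Note: entries staged here are published in the L2P as cached addresses
 * (ppa.cached == 1, ppa.offset == entry->pos), so reads can be served straight
 * from the write buffer via ftl_ppa_cache_read() until the entry is evicted and
 * the L2P is repointed at the on-disk PPA (ftl_evict_cache_entry()).
 */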
1768 
1769 static bool
1770 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
1771 {
1772 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
1773 
1774 	if (ftl_reloc_is_halted(dev->reloc)) {
1775 		return false;
1776 	}
1777 
1778 	if (dev->df_band) {
1779 		return false;
1780 	}
1781 
1782 	if (dev->num_free <= limit->thld) {
1783 		return true;
1784 	}
1785 
1786 	return false;
1787 }
1788 
1789 static double
1790 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
1791 {
1792 	size_t usable, valid, invalid;
1793 	double vld_ratio;
1794 
1795 	/* If the band doesn't have any usable lbks it's of no use */
1796 	usable = ftl_band_num_usable_lbks(band);
1797 	if (usable == 0) {
1798 		return 0.0;
1799 	}
1800 
1801 	valid =  threshold_valid ? (usable - *threshold_valid) : band->lba_map.num_vld;
1802 	invalid = usable - valid;
1803 
1804 	/* Add one to avoid division by 0 */
1805 	vld_ratio = (double)invalid / (double)(valid + 1);
1806 	return vld_ratio * ftl_band_age(band);
1807 }
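
/*
 * Note: the merit grows with the fraction of invalid blocks and with band age,
 * so stale, mostly-invalid bands are reclaimed first. For instance, a band with
 * 100 usable blocks of which 20 are still valid scores roughly (80 / 21) * age.
 */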
1808 
1809 static bool
1810 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
1811 {
1812 	struct spdk_ftl_conf *conf = &dev->conf;
1813 	size_t thld_vld;
1814 
1815 	/* If we're in dire need of free bands, every band is worth defragging */
1816 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
1817 		return true;
1818 	}
1819 
1820 	thld_vld = (ftl_band_num_usable_lbks(band) * conf->invalid_thld) / 100;
1821 
1822 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
1823 }
1824 
1825 static struct ftl_band *
1826 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
1827 {
1828 	struct ftl_band *band, *mband = NULL;
1829 	double merit = 0;
1830 
1831 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
1832 		assert(band->state == FTL_BAND_STATE_CLOSED);
1833 		band->merit = ftl_band_calc_merit(band, NULL);
1834 		if (band->merit > merit) {
1835 			merit = band->merit;
1836 			mband = band;
1837 		}
1838 	}
1839 
1840 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
1841 		mband = NULL;
1842 	}
1843 
1844 	return mband;
1845 }
1846 
1847 static void
1848 ftl_process_relocs(struct spdk_ftl_dev *dev)
1849 {
1850 	struct ftl_band *band;
1851 
1852 	if (ftl_dev_needs_defrag(dev)) {
1853 		band = dev->df_band = ftl_select_defrag_band(dev);
1854 
1855 		if (band) {
1856 			ftl_reloc_add(dev->reloc, band, 0, ftl_num_band_lbks(dev), 0);
1857 			ftl_trace_defrag_band(dev, band);
1858 		}
1859 	}
1860 
1861 	ftl_reloc(dev->reloc);
1862 }
1863 
1864 int
1865 ftl_current_limit(const struct spdk_ftl_dev *dev)
1866 {
1867 	return dev->limit;
1868 }
1869 
1870 void
1871 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
1872 {
1873 	attrs->uuid = dev->uuid;
1874 	attrs->lbk_cnt = dev->num_lbas;
1875 	attrs->lbk_size = FTL_BLOCK_SIZE;
1876 	attrs->range = dev->range;
1877 	attrs->cache_bdev_desc = dev->nv_cache.bdev_desc;
1878 	attrs->num_chunks = dev->geo.num_chk;
1879 	attrs->chunk_size = dev->geo.clba;
1880 	attrs->conf = dev->conf;
1881 }
1882 
1883 static void
1884 _ftl_io_write(void *ctx)
1885 {
1886 	ftl_io_write((struct ftl_io *)ctx);
1887 }
1888 
1889 static int
1890 ftl_rwb_fill_leaf(struct ftl_io *io)
1891 {
1892 	int rc;
1893 
1894 	rc = ftl_rwb_fill(io);
1895 	if (rc == -EAGAIN) {
1896 		spdk_thread_send_msg(spdk_io_channel_get_thread(io->ioch),
1897 				     _ftl_io_write, io);
1898 		return 0;
1899 	}
1900 
1901 	return rc;
1902 }
1903 
1904 static int
1905 ftl_submit_write_leaf(struct ftl_io *io)
1906 {
1907 	int rc;
1908 
1909 	rc = ftl_submit_write(ftl_wptr_from_band(io->band), io);
1910 	if (rc == -EAGAIN) {
1911 		/* EAGAIN means that the request was put on the pending queue */
1912 		return 0;
1913 	}
1914 
1915 	return rc;
1916 }
1917 
1918 void
1919 ftl_io_write(struct ftl_io *io)
1920 {
1921 	struct spdk_ftl_dev *dev = io->dev;
1922 
1923 	/* For normal IOs we just need to copy the data onto the rwb */
1924 	if (!(io->flags & FTL_IO_MD)) {
1925 		ftl_io_call_foreach_child(io, ftl_rwb_fill_leaf);
1926 	} else {
1927 		/* Metadata has its own buffer, so it doesn't have to be copied; just */
1928 		/* send it to the core thread and schedule the write immediately */
1929 		if (ftl_check_core_thread(dev)) {
1930 			ftl_io_call_foreach_child(io, ftl_submit_write_leaf);
1931 		} else {
1932 			spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
1933 		}
1934 	}
1935 }
1936 
1937 int
1938 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1939 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1940 {
1941 	struct ftl_io *io;
1942 
1943 	if (iov_cnt == 0) {
1944 		return -EINVAL;
1945 	}
1946 
1947 	if (lba_cnt == 0) {
1948 		return -EINVAL;
1949 	}
1950 
1951 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1952 		return -EINVAL;
1953 	}
1954 
1955 	if (!dev->initialized) {
1956 		return -EBUSY;
1957 	}
1958 
1959 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
1960 	if (!io) {
1961 		return -ENOMEM;
1962 	}
1963 
1964 	ftl_io_write(io);
1965 
1966 	return 0;
1967 }
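
/*
 * A minimal usage sketch (hypothetical caller code; "dev", "ch", "buf", "lba",
 * "write_done_cb" and "cb_arg" stand for an initialized FTL device, an I/O
 * channel for it, the caller's buffer and LBA, a user-provided spdk_ftl_fn and
 * its context):
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = 8 * FTL_BLOCK_SIZE };
 *	int rc = spdk_ftl_write(dev, ch, lba, 8, &iov, 1, write_done_cb, cb_arg);
 *	if (rc != 0) {
 *		// synchronous failure, e.g. -EINVAL or -ENOMEM
 *	}
 *
 * lba_cnt has to match the total iovec length expressed in blocks.
 */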
1968 
1969 static int
1970 ftl_io_read_leaf(struct ftl_io *io)
1971 {
1972 	int rc;
1973 
1974 	rc = ftl_submit_read(io);
1975 	if (rc == -ENOMEM) {
1976 		/* ENOMEM means that the request was put on the retry queue */
1977 		return 0;
1978 	}
1979 
1980 	return rc;
1981 }
1982 
1983 static void
1984 _ftl_io_read(void *arg)
1985 {
1986 	ftl_io_read((struct ftl_io *)arg);
1987 }
1988 
1989 void
1990 ftl_io_read(struct ftl_io *io)
1991 {
1992 	struct spdk_ftl_dev *dev = io->dev;
1993 
1994 	if (ftl_check_read_thread(dev)) {
1995 		ftl_io_call_foreach_child(io, ftl_io_read_leaf);
1996 	} else {
1997 		spdk_thread_send_msg(ftl_get_read_thread(dev), _ftl_io_read, io);
1998 	}
1999 }
2000 
2001 int
2002 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2003 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2004 {
2005 	struct ftl_io *io;
2006 
2007 	if (iov_cnt == 0) {
2008 		return -EINVAL;
2009 	}
2010 
2011 	if (lba_cnt == 0) {
2012 		return -EINVAL;
2013 	}
2014 
2015 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
2016 		return -EINVAL;
2017 	}
2018 
2019 	if (!dev->initialized) {
2020 		return -EBUSY;
2021 	}
2022 
2023 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
2024 	if (!io) {
2025 		return -ENOMEM;
2026 	}
2027 
2028 	ftl_io_read(io);
2029 	return 0;
2030 }
2031 
2032 static struct ftl_flush *
2033 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2034 {
2035 	struct ftl_flush *flush;
2036 	struct ftl_rwb *rwb = dev->rwb;
2037 
2038 	flush = calloc(1, sizeof(*flush));
2039 	if (!flush) {
2040 		return NULL;
2041 	}
2042 
2043 	flush->bmap = spdk_bit_array_create(ftl_rwb_num_batches(rwb));
2044 	if (!flush->bmap) {
2045 		goto error;
2046 	}
2047 
2048 	flush->dev = dev;
2049 	flush->cb.fn = cb_fn;
2050 	flush->cb.ctx = cb_arg;
2051 
2052 	return flush;
2053 error:
2054 	free(flush);
2055 	return NULL;
2056 }
2057 
2058 static void
2059 _ftl_flush(void *ctx)
2060 {
2061 	struct ftl_flush *flush = ctx;
2062 	struct spdk_ftl_dev *dev = flush->dev;
2063 	struct ftl_rwb *rwb = dev->rwb;
2064 	struct ftl_rwb_batch *batch;
2065 
2066 	/* Attach flush object to all non-empty batches */
2067 	ftl_rwb_foreach_batch(batch, rwb) {
2068 		if (!ftl_rwb_batch_empty(batch)) {
2069 			spdk_bit_array_set(flush->bmap, ftl_rwb_batch_get_offset(batch));
2070 			flush->num_req++;
2071 		}
2072 	}
2073 
2074 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
2075 
2076 	/* If the RWB was already empty, the flush can be completed right away */
2077 	if (!flush->num_req) {
2078 		ftl_complete_flush(flush);
2079 	}
2080 }
2081 
2082 int
2083 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2084 {
2085 	struct ftl_flush *flush;
2086 
2087 	if (!dev->initialized) {
2088 		return -EBUSY;
2089 	}
2090 
2091 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
2092 	if (!flush) {
2093 		return -ENOMEM;
2094 	}
2095 
2096 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
2097 	return 0;
2098 }
2099 
2100 static void
2101 _ftl_process_anm_event(void *ctx)
2102 {
2103 	ftl_process_anm_event((struct ftl_anm_event *)ctx);
2104 }
2105 
2106 void
2107 ftl_process_anm_event(struct ftl_anm_event *event)
2108 {
2109 	struct spdk_ftl_dev *dev = event->dev;
2110 	struct ftl_band *band;
2111 	size_t lbkoff;
2112 
2113 	if (!ftl_check_core_thread(dev)) {
2114 		spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_process_anm_event, event);
2115 		return;
2116 	}
2117 
2118 	band = ftl_band_from_ppa(dev, event->ppa);
2119 	lbkoff = ftl_band_lbkoff_from_ppa(band, event->ppa);
2120 
2121 	ftl_reloc_add(dev->reloc, band, lbkoff, event->num_lbks, 0);
2122 	ftl_anm_event_complete(event);
2123 }
2124 
2125 bool
2126 ftl_ppa_is_written(struct ftl_band *band, struct ftl_ppa ppa)
2127 {
2128 	struct ftl_chunk *chunk = ftl_band_chunk_from_ppa(band, ppa);
2129 
2130 	return ppa.lbk < chunk->write_offset;
2131 }
2132 
2133 static void
2134 ftl_process_retry_queue(struct spdk_ftl_dev *dev)
2135 {
2136 	struct ftl_io *io;
2137 	int rc;
2138 
2139 	while (!TAILQ_EMPTY(&dev->retry_queue)) {
2140 		io = TAILQ_FIRST(&dev->retry_queue);
2141 
2142 		/* Retry only if IO is still healthy */
2143 		if (spdk_likely(io->status == 0)) {
2144 			rc = ftl_submit_read(io);
2145 			if (rc == -ENOMEM) {
2146 				break;
2147 			}
2148 		}
2149 
2150 		io->flags &= ~FTL_IO_RETRY;
2151 		TAILQ_REMOVE(&dev->retry_queue, io, retry_entry);
2152 
2153 		if (ftl_io_done(io)) {
2154 			ftl_io_complete(io);
2155 		}
2156 	}
2157 }
2158 
2159 int
2160 ftl_task_read(void *ctx)
2161 {
2162 	struct ftl_thread *thread = ctx;
2163 	struct spdk_ftl_dev *dev = thread->dev;
2164 	struct spdk_nvme_qpair *qpair = ftl_get_read_qpair(dev);
2165 	size_t num_completed;
2166 
2167 	if (dev->halt) {
2168 		if (ftl_shutdown_complete(dev)) {
2169 			spdk_poller_unregister(&thread->poller);
2170 			return 0;
2171 		}
2172 	}
2173 
2174 	num_completed = spdk_nvme_qpair_process_completions(qpair, 0);
2175 
2176 	if (num_completed && !TAILQ_EMPTY(&dev->retry_queue)) {
2177 		ftl_process_retry_queue(dev);
2178 	}
2179 
2180 	return num_completed;
2181 }
2182 
2183 int
2184 ftl_task_core(void *ctx)
2185 {
2186 	struct ftl_thread *thread = ctx;
2187 	struct spdk_ftl_dev *dev = thread->dev;
2188 	struct spdk_nvme_qpair *qpair = ftl_get_write_qpair(dev);
2189 
2190 	if (dev->halt) {
2191 		if (ftl_shutdown_complete(dev)) {
2192 			spdk_poller_unregister(&thread->poller);
2193 			return 0;
2194 		}
2195 	}
2196 
2197 	ftl_process_writes(dev);
2198 	spdk_nvme_qpair_process_completions(qpair, 0);
2199 	ftl_process_relocs(dev);
2200 
2201 	return 0;
2202 }
2203 
2204 SPDK_LOG_REGISTER_COMPONENT("ftl_core", SPDK_LOG_FTL_CORE)
2205