xref: /spdk/lib/ftl/ftl_core.c (revision a15dcb0bf07debe26957eaf30ec392942910ea99)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/io_channel.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk/string.h"
40 #include "spdk_internal/log.h"
41 #include "spdk/ftl.h"
42 #include "spdk/crc32.h"
43 
44 #include "ftl_core.h"
45 #include "ftl_band.h"
46 #include "ftl_io.h"
47 #include "ftl_anm.h"
48 #include "ftl_rwb.h"
49 #include "ftl_debug.h"
50 #include "ftl_reloc.h"
51 
52 struct ftl_band_flush {
53 	struct spdk_ftl_dev		*dev;
54 	/* Number of bands left to be flushed */
55 	size_t				num_bands;
56 	/* User callback */
57 	spdk_ftl_fn			cb_fn;
58 	/* Callback's argument */
59 	void				*cb_arg;
60 	/* List link */
61 	LIST_ENTRY(ftl_band_flush)	list_entry;
62 };
63 
64 struct ftl_wptr {
65 	/* Owner device */
66 	struct spdk_ftl_dev		*dev;
67 
68 	/* Current PPA */
69 	struct ftl_ppa			ppa;
70 
71 	/* Band currently being written to */
72 	struct ftl_band			*band;
73 
74 	/* Current logical block's offset */
75 	uint64_t			offset;
76 
77 	/* Current erase block */
78 	struct ftl_chunk		*chunk;
79 
80 	/* Pending IO queue */
81 	TAILQ_HEAD(, ftl_io)		pending_queue;
82 
83 	/* List link */
84 	LIST_ENTRY(ftl_wptr)		list_entry;
85 
86 	/*
87 	 * If set up in direct mode, there will be no offset or band state update after IO.
88 	 * The PPA is not assigned by the wptr, but is instead taken directly from the request.
89 	 */
90 	bool				direct_mode;
91 
92 	/* Number of outstanding write requests */
93 	uint32_t			num_outstanding;
94 
95 	/* Marks that the band related to this wptr needs to be closed as soon as possible */
96 	bool				flush;
97 };
98 
99 struct ftl_flush {
100 	/* Owner device */
101 	struct spdk_ftl_dev		*dev;
102 
103 	/* Number of batches to wait for */
104 	size_t				num_req;
105 
106 	/* Callback */
107 	struct {
108 		spdk_ftl_fn		fn;
109 		void			*ctx;
110 	} cb;
111 
112 	/* Batch bitmap */
113 	struct spdk_bit_array		*bmap;
114 
115 	/* List link */
116 	LIST_ENTRY(ftl_flush)		list_entry;
117 };
118 
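/* Extract the subset of IO flags that is relevant to write buffer entries */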
119 static int
120 ftl_rwb_flags_from_io(const struct ftl_io *io)
121 {
122 	int valid_flags = FTL_IO_INTERNAL | FTL_IO_WEAK | FTL_IO_PAD;
123 	return io->flags & valid_flags;
124 }
125 
126 static int
127 ftl_rwb_entry_weak(const struct ftl_rwb_entry *entry)
128 {
129 	return entry->flags & FTL_IO_WEAK;
130 }
131 
132 static void
133 ftl_wptr_free(struct ftl_wptr *wptr)
134 {
135 	if (!wptr) {
136 		return;
137 	}
138 
139 	free(wptr);
140 }
141 
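/*
 * Remove the write pointer from the device's wptr list and free it. If the band was being
 * flushed, pending band flush requests are completed once their last band is done.
 */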
142 static void
143 ftl_remove_wptr(struct ftl_wptr *wptr)
144 {
145 	struct spdk_ftl_dev *dev = wptr->dev;
146 	struct ftl_band_flush *flush, *tmp;
147 
148 	if (spdk_unlikely(wptr->flush)) {
149 		LIST_FOREACH_SAFE(flush, &dev->band_flush_list, list_entry, tmp) {
150 			assert(flush->num_bands > 0);
151 			if (--flush->num_bands == 0) {
152 				flush->cb_fn(flush->cb_arg, 0);
153 				LIST_REMOVE(flush, list_entry);
154 				free(flush);
155 			}
156 		}
157 	}
158 
159 	LIST_REMOVE(wptr, list_entry);
160 	ftl_wptr_free(wptr);
161 }
162 
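/* Generic NVMe completion callback for FTL IOs submitted to the underlying device */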
163 static void
164 ftl_io_cmpl_cb(void *arg, const struct spdk_nvme_cpl *status)
165 {
166 	struct ftl_io *io = arg;
167 
168 	if (spdk_nvme_cpl_is_error(status)) {
169 		ftl_io_process_error(io, status);
170 	}
171 
172 	ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_DISK);
173 
174 	ftl_io_dec_req(io);
175 	if (ftl_io_done(io)) {
176 		ftl_io_complete(io);
177 	}
178 }
179 
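/*
 * Halt writes to a band after a write failure: mark the band as failed and remove its
 * write pointer (unless the failure has already been handled).
 */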
180 static void
181 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
182 {
183 	struct ftl_wptr *wptr = NULL;
184 
185 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
186 		if (wptr->band == band) {
187 			break;
188 		}
189 	}
190 
191 	/* If the band already has the high_prio flag set, other writes must */
192 	/* have failed earlier, so it's already taken care of. */
193 	if (band->high_prio) {
194 		assert(wptr == NULL);
195 		return;
196 	}
197 
198 	ftl_band_write_failed(band);
199 	ftl_remove_wptr(wptr);
200 }
201 
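/* Return the write pointer assigned to the given band, or NULL if there isn't one */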
202 static struct ftl_wptr *
203 ftl_wptr_from_band(struct ftl_band *band)
204 {
205 	struct spdk_ftl_dev *dev = band->dev;
206 	struct ftl_wptr *wptr = NULL;
207 
208 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
209 		if (wptr->band == band) {
210 			return wptr;
211 		}
212 	}
213 
214 	return NULL;
215 }
216 
217 static void
218 ftl_md_write_fail(struct ftl_io *io, int status)
219 {
220 	struct ftl_band *band = io->band;
221 	struct ftl_wptr *wptr;
222 	char buf[128];
223 
224 	wptr = ftl_wptr_from_band(band);
225 
226 	SPDK_ERRLOG("Metadata write failed @ppa: %s, status: %d\n",
227 		    ftl_ppa2str(wptr->ppa, buf, sizeof(buf)), status);
228 
229 	ftl_halt_writes(io->dev, band);
230 }
231 
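/*
 * Completion callback for band metadata (head/tail) writes. Once the band is closed,
 * blocks are returned to the non-volatile cache (if present), relocation counters of the
 * bands tracked in reloc_bitmap are decremented and the write pointer is removed.
 */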
232 static void
233 ftl_md_write_cb(struct ftl_io *io, void *arg, int status)
234 {
235 	struct spdk_ftl_dev *dev = io->dev;
236 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
237 	struct ftl_band *band = io->band;
238 	struct ftl_wptr *wptr;
239 	size_t id;
240 
241 	wptr = ftl_wptr_from_band(band);
242 
243 	if (status) {
244 		ftl_md_write_fail(io, status);
245 		return;
246 	}
247 
248 	ftl_band_set_next_state(band);
249 	if (band->state == FTL_BAND_STATE_CLOSED) {
250 		if (ftl_dev_has_nv_cache(dev)) {
251 			pthread_spin_lock(&nv_cache->lock);
252 			nv_cache->num_available += ftl_band_user_lbks(band);
253 
254 			if (spdk_unlikely(nv_cache->num_available > nv_cache->num_data_blocks)) {
255 				nv_cache->num_available = nv_cache->num_data_blocks;
256 			}
257 			pthread_spin_unlock(&nv_cache->lock);
258 		}
259 
260 		/*
261 		 * Go through the reloc_bitmap, checking for all the bands that had their data moved
262 		 * onto the current band, and update their counters to allow them to be used for writing
263 		 * (once they're closed and empty).
264 		 */
265 		for (id = 0; id < ftl_dev_num_bands(dev); ++id) {
266 			if (spdk_bit_array_get(band->reloc_bitmap, id)) {
267 				assert(dev->bands[id].num_reloc_bands > 0);
268 				dev->bands[id].num_reloc_bands--;
269 
270 				spdk_bit_array_clear(band->reloc_bitmap, id);
271 			}
272 		}
273 
274 		ftl_remove_wptr(wptr);
275 	}
276 }
277 
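/*
 * Calculate the starting PPA and the number of logical blocks for the next part of a
 * PPA-mode read. Returns the number of blocks that can be read with a single request.
 */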
278 static int
279 ftl_ppa_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa)
280 {
281 	struct spdk_ftl_dev *dev = io->dev;
282 	size_t lbk_cnt, max_lbks;
283 
284 	assert(ftl_io_mode_ppa(io));
285 	assert(io->iov_pos < io->iov_cnt);
286 
287 	if (io->pos == 0) {
288 		*ppa = io->ppa;
289 	} else {
290 		*ppa = ftl_band_next_xfer_ppa(io->band, io->ppa, io->pos);
291 	}
292 
293 	assert(!ftl_ppa_invalid(*ppa));
294 
295 	/* Metadata has to be read in the way it's written (jumping across */
296 	/* the chunks in xfer_size increments) */
297 	if (io->flags & FTL_IO_MD) {
298 		max_lbks = dev->xfer_size - (ppa->lbk % dev->xfer_size);
299 		lbk_cnt = spdk_min(ftl_io_iovec_len_left(io), max_lbks);
300 		assert(ppa->lbk / dev->xfer_size == (ppa->lbk + lbk_cnt - 1) / dev->xfer_size);
301 	} else {
302 		lbk_cnt = ftl_io_iovec_len_left(io);
303 	}
304 
305 	return lbk_cnt;
306 }
307 
308 static int
309 ftl_wptr_close_band(struct ftl_wptr *wptr)
310 {
311 	struct ftl_band *band = wptr->band;
312 
313 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
314 
315 	return ftl_band_write_tail_md(band, ftl_md_write_cb);
316 }
317 
318 static int
319 ftl_wptr_open_band(struct ftl_wptr *wptr)
320 {
321 	struct ftl_band *band = wptr->band;
322 
323 	assert(ftl_band_chunk_is_first(band, wptr->chunk));
324 	assert(band->lba_map.num_vld == 0);
325 
326 	ftl_band_clear_lba_map(band);
327 
328 	assert(band->state == FTL_BAND_STATE_PREP);
329 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
330 
331 	return ftl_band_write_head_md(band, ftl_md_write_cb);
332 }
333 
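/* Submit vector reset (erase) requests for consecutive chunks starting at the IO's PPA */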
334 static int
335 ftl_submit_erase(struct ftl_io *io)
336 {
337 	struct spdk_ftl_dev *dev = io->dev;
338 	struct ftl_band *band = io->band;
339 	struct ftl_ppa ppa = io->ppa;
340 	struct ftl_chunk *chunk;
341 	uint64_t ppa_packed;
342 	int rc = 0;
343 	size_t i;
344 
345 	for (i = 0; i < io->lbk_cnt; ++i) {
346 		if (i != 0) {
347 			chunk = ftl_band_next_chunk(band, ftl_band_chunk_from_ppa(band, ppa));
348 			assert(chunk->state == FTL_CHUNK_STATE_CLOSED ||
349 			       chunk->state == FTL_CHUNK_STATE_VACANT);
350 			ppa = chunk->start_ppa;
351 		}
352 
353 		assert(ppa.lbk == 0);
354 		ppa_packed = ftl_ppa_addr_pack(dev, ppa);
355 
356 		ftl_trace_submission(dev, io, ppa, 1);
357 		rc = spdk_nvme_ocssd_ns_cmd_vector_reset(dev->ns, ftl_get_write_qpair(dev),
358 				&ppa_packed, 1, NULL, ftl_io_cmpl_cb, io);
359 		if (spdk_unlikely(rc)) {
360 			ftl_io_fail(io, rc);
361 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
362 			break;
363 		}
364 
365 		ftl_io_inc_req(io);
366 		ftl_io_advance(io, 1);
367 	}
368 
369 	if (ftl_io_done(io)) {
370 		ftl_io_complete(io);
371 	}
372 
373 	return rc;
374 }
375 
376 static void
377 _ftl_io_erase(void *ctx)
378 {
379 	ftl_io_erase((struct ftl_io *)ctx);
380 }
381 
382 static bool
383 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
384 {
385 	return dev->core_thread.thread == spdk_get_thread();
386 }
387 
388 static bool
389 ftl_check_read_thread(const struct spdk_ftl_dev *dev)
390 {
391 	return dev->read_thread.thread == spdk_get_thread();
392 }
393 
394 int
395 ftl_io_erase(struct ftl_io *io)
396 {
397 	struct spdk_ftl_dev *dev = io->dev;
398 
399 	if (ftl_check_core_thread(dev)) {
400 		return ftl_submit_erase(io);
401 	}
402 
403 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_erase, io);
404 	return 0;
405 }
406 
407 static struct ftl_band *
408 ftl_next_write_band(struct spdk_ftl_dev *dev)
409 {
410 	struct ftl_band *band;
411 
412 	/* Find a free band that has all of its data moved onto other closed bands */
413 	LIST_FOREACH(band, &dev->free_bands, list_entry) {
414 		assert(band->state == FTL_BAND_STATE_FREE);
415 		if (band->num_reloc_bands == 0 && band->num_reloc_blocks == 0) {
416 			break;
417 		}
418 	}
419 
420 	if (spdk_unlikely(!band)) {
421 		return NULL;
422 	}
423 
424 	if (ftl_band_erase(band)) {
425 		/* TODO: handle erase failure */
426 		return NULL;
427 	}
428 
429 	return band;
430 }
431 
432 static struct ftl_band *
433 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
434 {
435 	struct ftl_band *band;
436 
437 	if (!dev->next_band) {
438 		band = ftl_next_write_band(dev);
439 	} else {
440 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
441 		band = dev->next_band;
442 		dev->next_band = NULL;
443 	}
444 
445 	return band;
446 }
447 
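/* Allocate a write pointer and position it at the first chunk of the given band */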
448 static struct ftl_wptr *
449 ftl_wptr_init(struct ftl_band *band)
450 {
451 	struct spdk_ftl_dev *dev = band->dev;
452 	struct ftl_wptr *wptr;
453 
454 	wptr = calloc(1, sizeof(*wptr));
455 	if (!wptr) {
456 		return NULL;
457 	}
458 
459 	wptr->dev = dev;
460 	wptr->band = band;
461 	wptr->chunk = CIRCLEQ_FIRST(&band->chunks);
462 	wptr->ppa = wptr->chunk->start_ppa;
463 	TAILQ_INIT(&wptr->pending_queue);
464 
465 	return wptr;
466 }
467 
468 static int
469 ftl_add_direct_wptr(struct ftl_band *band)
470 {
471 	struct spdk_ftl_dev *dev = band->dev;
472 	struct ftl_wptr *wptr;
473 
474 	assert(band->state == FTL_BAND_STATE_OPEN);
475 
476 	wptr = ftl_wptr_init(band);
477 	if (!wptr) {
478 		return -1;
479 	}
480 
481 	wptr->direct_mode = true;
482 
483 	if (ftl_band_alloc_lba_map(band)) {
484 		ftl_wptr_free(wptr);
485 		return -1;
486 	}
487 
488 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
489 
490 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: direct band %u\n", band->id);
491 	ftl_trace_write_band(dev, band);
492 	return 0;
493 }
494 
495 static void
496 ftl_close_direct_wptr(struct ftl_band *band)
497 {
498 	struct ftl_wptr *wptr = ftl_wptr_from_band(band);
499 
500 	assert(wptr->direct_mode);
501 	assert(band->state == FTL_BAND_STATE_CLOSED);
502 
503 	ftl_band_release_lba_map(band);
504 
505 	ftl_remove_wptr(wptr);
506 }
507 
508 int
509 ftl_band_set_direct_access(struct ftl_band *band, bool access)
510 {
511 	if (access) {
512 		return ftl_add_direct_wptr(band);
513 	} else {
514 		ftl_close_direct_wptr(band);
515 		return 0;
516 	}
517 }
518 
519 static int
520 ftl_add_wptr(struct spdk_ftl_dev *dev)
521 {
522 	struct ftl_band *band;
523 	struct ftl_wptr *wptr;
524 
525 	band = ftl_next_wptr_band(dev);
526 	if (!band) {
527 		return -1;
528 	}
529 
530 	wptr = ftl_wptr_init(band);
531 	if (!wptr) {
532 		return -1;
533 	}
534 
535 	if (ftl_band_write_prep(band)) {
536 		ftl_wptr_free(wptr);
537 		return -1;
538 	}
539 
540 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
541 
542 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: band %u\n", band->id);
543 	ftl_trace_write_band(dev, band);
544 	return 0;
545 }
546 
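/*
 * Move the write pointer forward by xfer_size blocks onto the next operational chunk.
 * Marks the band as full when its end is reached and prepares the next band for writing
 * once the configured band threshold is crossed.
 */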
547 static void
548 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
549 {
550 	struct ftl_band *band = wptr->band;
551 	struct spdk_ftl_dev *dev = wptr->dev;
552 	struct spdk_ftl_conf *conf = &dev->conf;
553 	size_t next_thld;
554 
555 	if (spdk_unlikely(wptr->direct_mode)) {
556 		return;
557 	}
558 
559 	wptr->offset += xfer_size;
560 	next_thld = (ftl_band_num_usable_lbks(band) * conf->band_thld) / 100;
561 
562 	if (ftl_band_full(band, wptr->offset)) {
563 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
564 	}
565 
566 	wptr->chunk->busy = true;
567 	wptr->ppa = ftl_band_next_xfer_ppa(band, wptr->ppa, xfer_size);
568 	wptr->chunk = ftl_band_next_operational_chunk(band, wptr->chunk);
569 
570 	assert(!ftl_ppa_invalid(wptr->ppa));
571 
572 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: grp:%d, pu:%d chunk:%d, lbk:%u\n",
573 		      wptr->ppa.grp, wptr->ppa.pu, wptr->ppa.chk, wptr->ppa.lbk);
574 
575 	if (wptr->offset >= next_thld && !dev->next_band) {
576 		dev->next_band = ftl_next_write_band(dev);
577 	}
578 }
579 
580 static size_t
581 ftl_wptr_user_lbks_left(const struct ftl_wptr *wptr)
582 {
583 	return ftl_band_user_lbks_left(wptr->band, wptr->offset);
584 }
585 
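/* Check whether the write pointer's band is ready for data writes, opening or closing the band if needed */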
586 static int
587 ftl_wptr_ready(struct ftl_wptr *wptr)
588 {
589 	struct ftl_band *band = wptr->band;
590 
591 	/* TODO: add handling of empty bands */
592 
593 	if (spdk_unlikely(!ftl_chunk_is_writable(wptr->chunk))) {
594 		/* Erasing band may fail after it was assigned to wptr. */
595 		if (spdk_unlikely(wptr->chunk->state == FTL_CHUNK_STATE_BAD)) {
596 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
597 		}
598 		return 0;
599 	}
600 
601 	/* If we're in the process of writing metadata, wait till it is */
602 	/* completed. */
603 	/* TODO: we should probably change bands once we're writing tail md */
604 	if (ftl_band_state_changing(band)) {
605 		return 0;
606 	}
607 
608 	if (band->state == FTL_BAND_STATE_FULL) {
609 		if (wptr->num_outstanding == 0) {
610 			if (ftl_wptr_close_band(wptr)) {
611 				/* TODO: need recovery here */
612 				assert(false);
613 			}
614 		}
615 
616 		return 0;
617 	}
618 
619 	if (band->state != FTL_BAND_STATE_OPEN) {
620 		if (ftl_wptr_open_band(wptr)) {
621 			/* TODO: need recovery here */
622 			assert(false);
623 		}
624 
625 		return 0;
626 	}
627 
628 	return 1;
629 }
630 
631 int
632 ftl_flush_active_bands(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
633 {
634 	struct ftl_wptr *wptr;
635 	struct ftl_band_flush *flush;
636 
637 	assert(ftl_get_core_thread(dev) == spdk_get_thread());
638 
639 	flush = calloc(1, sizeof(*flush));
640 	if (spdk_unlikely(!flush)) {
641 		return -ENOMEM;
642 	}
643 
644 	LIST_INSERT_HEAD(&dev->band_flush_list, flush, list_entry);
645 
646 	flush->cb_fn = cb_fn;
647 	flush->cb_arg = cb_arg;
648 	flush->dev = dev;
649 
650 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
651 		wptr->flush = true;
652 		flush->num_bands++;
653 	}
654 
655 	return 0;
656 }
657 
658 static const struct spdk_ftl_limit *
659 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
660 {
661 	assert(type < SPDK_FTL_LIMIT_MAX);
662 	return &dev->conf.defrag.limits[type];
663 }
664 
665 static bool
666 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
667 {
668 	struct ftl_ppa ppa;
669 
670 	/* If the LBA is invalid don't bother checking the md and l2p */
671 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
672 		return false;
673 	}
674 
675 	ppa = ftl_l2p_get(dev, entry->lba);
676 	if (!(ftl_ppa_cached(ppa) && ppa.offset == entry->pos)) {
677 		return false;
678 	}
679 
680 	return true;
681 }
682 
683 static void
684 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
685 {
686 	pthread_spin_lock(&entry->lock);
687 
688 	if (!ftl_rwb_entry_valid(entry)) {
689 		goto unlock;
690 	}
691 
692 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
693 	/* on-disk PPA and clear the cache status bit. Otherwise, skip the l2p update */
694 	/* and just clear the cache status. */
695 	if (!ftl_cache_lba_valid(dev, entry)) {
696 		goto clear;
697 	}
698 
699 	ftl_l2p_set(dev, entry->lba, entry->ppa);
700 clear:
701 	ftl_rwb_entry_invalidate(entry);
702 unlock:
703 	pthread_spin_unlock(&entry->lock);
704 }
705 
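/* Acquire a write buffer entry of the type matching the flags, evicting its previous contents if necessary */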
706 static struct ftl_rwb_entry *
707 ftl_acquire_entry(struct spdk_ftl_dev *dev, int flags)
708 {
709 	struct ftl_rwb_entry *entry;
710 
711 	entry = ftl_rwb_acquire(dev->rwb, ftl_rwb_type_from_flags(flags));
712 	if (!entry) {
713 		return NULL;
714 	}
715 
716 	ftl_evict_cache_entry(dev, entry);
717 
718 	entry->flags = flags;
719 	return entry;
720 }
721 
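/* Fill the write buffer with up to 'size' zeroed padding entries */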
722 static void
723 ftl_rwb_pad(struct spdk_ftl_dev *dev, size_t size)
724 {
725 	struct ftl_rwb_entry *entry;
726 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
727 
728 	for (size_t i = 0; i < size; ++i) {
729 		entry = ftl_acquire_entry(dev, flags);
730 		if (!entry) {
731 			break;
732 		}
733 
734 		entry->lba = FTL_LBA_INVALID;
735 		entry->ppa = ftl_to_ppa(FTL_PPA_INVALID);
736 		memset(entry->data, 0, FTL_BLOCK_SIZE);
737 		ftl_rwb_push(entry);
738 	}
739 }
740 
741 static void
742 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
743 {
744 	while (!LIST_EMPTY(&dev->free_bands)) {
745 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
746 	}
747 
748 	dev->next_band = NULL;
749 }
750 
751 static void
752 ftl_wptr_pad_band(struct ftl_wptr *wptr)
753 {
754 	struct spdk_ftl_dev *dev = wptr->dev;
755 	size_t size = ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_INTERNAL) +
756 		      ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_USER);
757 	size_t blocks_left, rwb_size, pad_size;
758 
759 	blocks_left = ftl_wptr_user_lbks_left(wptr);
760 	rwb_size = ftl_rwb_size(dev->rwb) - size;
761 	pad_size = spdk_min(blocks_left, rwb_size);
762 
763 	/* Pad write buffer until band is full */
764 	ftl_rwb_pad(dev, pad_size);
765 }
766 
767 static void
768 ftl_wptr_process_shutdown(struct ftl_wptr *wptr)
769 {
770 	struct spdk_ftl_dev *dev = wptr->dev;
771 	size_t size = ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_INTERNAL) +
772 		      ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_USER);
773 	size_t num_active = dev->xfer_size * ftl_rwb_get_active_batches(dev->rwb);
774 
775 	num_active = num_active ? num_active : dev->xfer_size;
776 	if (size >= num_active) {
777 		return;
778 	}
779 
780 	/* If we reach this point we need to remove free bands */
781 	/* and pad current wptr band to the end */
782 	if (ftl_rwb_get_active_batches(dev->rwb) <= 1) {
783 		ftl_remove_free_bands(dev);
784 	}
785 
786 	ftl_wptr_pad_band(wptr);
787 }
788 
789 static int
790 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
791 {
792 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
793 	       LIST_EMPTY(&dev->wptr_list);
794 }
795 
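/*
 * Apply write limits based on the number of free bands left: the fewer free bands, the
 * smaller the share of the write buffer that's available to user writes.
 */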
796 void
797 ftl_apply_limits(struct spdk_ftl_dev *dev)
798 {
799 	const struct spdk_ftl_limit *limit;
800 	struct ftl_stats *stats = &dev->stats;
801 	size_t rwb_limit[FTL_RWB_TYPE_MAX];
802 	int i;
803 
804 	ftl_rwb_get_limits(dev->rwb, rwb_limit);
805 
806 	/* Clear existing limit */
807 	dev->limit = SPDK_FTL_LIMIT_MAX;
808 
809 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
810 		limit = ftl_get_limit(dev, i);
811 
812 		if (dev->num_free <= limit->thld) {
813 			rwb_limit[FTL_RWB_TYPE_USER] =
814 				(limit->limit * ftl_rwb_entry_cnt(dev->rwb)) / 100;
815 			stats->limits[i]++;
816 			dev->limit = i;
817 			goto apply;
818 		}
819 	}
820 
821 	/* Clear the limits, since we don't need to apply them anymore */
822 	rwb_limit[FTL_RWB_TYPE_USER] = ftl_rwb_entry_cnt(dev->rwb);
823 apply:
824 	ftl_trace_limits(dev, rwb_limit, dev->num_free);
825 	ftl_rwb_set_limits(dev->rwb, rwb_limit);
826 }
827 
828 static int
829 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
830 {
831 	struct ftl_band *band = ftl_band_from_ppa(dev, ppa);
832 	struct ftl_lba_map *lba_map = &band->lba_map;
833 	uint64_t offset;
834 
835 	offset = ftl_band_lbkoff_from_ppa(band, ppa);
836 
837 	/* The bit might already be cleared if two writes are scheduled to the */
838 	/* same LBA at the same time */
839 	if (spdk_bit_array_get(lba_map->vld, offset)) {
840 		assert(lba_map->num_vld > 0);
841 		spdk_bit_array_clear(lba_map->vld, offset);
842 		lba_map->num_vld--;
843 		return 1;
844 	}
845 
846 	return 0;
847 }
848 
849 int
850 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
851 {
852 	struct ftl_band *band;
853 	int rc;
854 
855 	assert(!ftl_ppa_cached(ppa));
856 	band = ftl_band_from_ppa(dev, ppa);
857 
858 	pthread_spin_lock(&band->lba_map.lock);
859 	rc = ftl_invalidate_addr_unlocked(dev, ppa);
860 	pthread_spin_unlock(&band->lba_map.lock);
861 
862 	return rc;
863 }
864 
865 static int
866 ftl_read_retry(int rc)
867 {
868 	return rc == -EAGAIN;
869 }
870 
871 static int
872 ftl_read_canceled(int rc)
873 {
874 	return rc == -EFAULT || rc == 0;
875 }
876 
877 static void
878 ftl_add_to_retry_queue(struct ftl_io *io)
879 {
880 	if (!(io->flags & FTL_IO_RETRY)) {
881 		io->flags |= FTL_IO_RETRY;
882 		TAILQ_INSERT_TAIL(&io->dev->retry_queue, io, retry_entry);
883 	}
884 }
885 
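/*
 * Copy a block out of the write buffer. Fails if the L2P no longer points at the given
 * cached PPA, which means the entry has been overwritten in the meantime.
 */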
886 static int
887 ftl_ppa_cache_read(struct ftl_io *io, uint64_t lba,
888 		   struct ftl_ppa ppa, void *buf)
889 {
890 	struct ftl_rwb *rwb = io->dev->rwb;
891 	struct ftl_rwb_entry *entry;
892 	struct ftl_ppa nppa;
893 	int rc = 0;
894 
895 	entry = ftl_rwb_entry_from_offset(rwb, ppa.offset);
896 	pthread_spin_lock(&entry->lock);
897 
898 	nppa = ftl_l2p_get(io->dev, lba);
899 	if (ppa.ppa != nppa.ppa) {
900 		rc = -1;
901 		goto out;
902 	}
903 
904 	memcpy(buf, entry->data, FTL_BLOCK_SIZE);
905 out:
906 	pthread_spin_unlock(&entry->lock);
907 	return rc;
908 }
909 
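/*
 * Translate the IO's current LBA into a PPA and count how many of the following LBAs map
 * to physically contiguous blocks, so that they can be read with a single request.
 */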
910 static int
911 ftl_lba_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa)
912 {
913 	struct spdk_ftl_dev *dev = io->dev;
914 	struct ftl_ppa next_ppa;
915 	size_t i;
916 
917 	*ppa = ftl_l2p_get(dev, ftl_io_current_lba(io));
918 
919 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Read ppa:%lx, lba:%lu\n",
920 		      ppa->ppa, ftl_io_current_lba(io));
921 
922 	/* If the PPA is invalid, skip it (the buffer should already be zeroed) */
923 	if (ftl_ppa_invalid(*ppa)) {
924 		return -EFAULT;
925 	}
926 
927 	if (ftl_ppa_cached(*ppa)) {
928 		if (!ftl_ppa_cache_read(io, ftl_io_current_lba(io), *ppa, ftl_io_iovec_addr(io))) {
929 			return 0;
930 		}
931 
932 		/* If the state changed, we have to re-read the l2p */
933 		return -EAGAIN;
934 	}
935 
936 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
937 		next_ppa = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));
938 
939 		if (ftl_ppa_invalid(next_ppa) || ftl_ppa_cached(next_ppa)) {
940 			break;
941 		}
942 
943 		if (ftl_ppa_addr_pack(dev, *ppa) + i != ftl_ppa_addr_pack(dev, next_ppa)) {
944 			break;
945 		}
946 	}
947 
948 	return i;
949 }
950 
951 static int
952 ftl_submit_read(struct ftl_io *io)
953 {
954 	struct spdk_ftl_dev *dev = io->dev;
955 	struct ftl_ppa ppa;
956 	int rc = 0, lbk_cnt;
957 
958 	assert(LIST_EMPTY(&io->children));
959 
960 	while (io->pos < io->lbk_cnt) {
961 		if (ftl_io_mode_ppa(io)) {
962 			lbk_cnt = rc = ftl_ppa_read_next_ppa(io, &ppa);
963 		} else {
964 			lbk_cnt = rc = ftl_lba_read_next_ppa(io, &ppa);
965 		}
966 
967 		/* We might need to retry the read from scratch (e.g. */
968 		/* because a write was under way and completed before */
969 		/* we could read it from the rwb) */
970 		if (ftl_read_retry(rc)) {
971 			continue;
972 		}
973 
974 		/* We don't have to schedule the read, as it was read from cache */
975 		if (ftl_read_canceled(rc)) {
976 			ftl_io_advance(io, 1);
977 			ftl_trace_completion(io->dev, io, rc ? FTL_TRACE_COMPLETION_INVALID :
978 					     FTL_TRACE_COMPLETION_CACHE);
979 			rc = 0;
980 			continue;
981 		}
982 
983 		assert(lbk_cnt > 0);
984 
985 		ftl_trace_submission(dev, io, ppa, lbk_cnt);
986 		rc = spdk_nvme_ns_cmd_read(dev->ns, ftl_get_read_qpair(dev),
987 					   ftl_io_iovec_addr(io),
988 					   ftl_ppa_addr_pack(io->dev, ppa), lbk_cnt,
989 					   ftl_io_cmpl_cb, io, 0);
990 		if (spdk_unlikely(rc)) {
991 			if (rc == -ENOMEM) {
992 				ftl_add_to_retry_queue(io);
993 			} else {
994 				ftl_io_fail(io, rc);
995 			}
996 			break;
997 		}
998 
999 		ftl_io_inc_req(io);
1000 		ftl_io_advance(io, lbk_cnt);
1001 	}
1002 
1003 	/* If we didn't have to read anything from the device, */
1004 	/* complete the request right away */
1005 	if (ftl_io_done(io)) {
1006 		ftl_io_complete(io);
1007 	}
1008 
1009 	return rc;
1010 }
1011 
1012 static void
1013 ftl_complete_flush(struct ftl_flush *flush)
1014 {
1015 	assert(flush->num_req == 0);
1016 	LIST_REMOVE(flush, list_entry);
1017 
1018 	flush->cb.fn(flush->cb.ctx, 0);
1019 
1020 	spdk_bit_array_free(&flush->bmap);
1021 	free(flush);
1022 }
1023 
1024 static void
1025 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_rwb_batch *batch)
1026 {
1027 	struct ftl_flush *flush, *tflush;
1028 	size_t offset;
1029 
1030 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
1031 		offset = ftl_rwb_batch_get_offset(batch);
1032 
1033 		if (spdk_bit_array_get(flush->bmap, offset)) {
1034 			spdk_bit_array_clear(flush->bmap, offset);
1035 			if (!(--flush->num_req)) {
1036 				ftl_complete_flush(flush);
1037 			}
1038 		}
1039 	}
1040 }
1041 
1042 static void
1043 ftl_nv_cache_wrap_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1044 {
1045 	struct ftl_nv_cache *nv_cache = cb_arg;
1046 
1047 	if (!success) {
1048 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header\n");
1049 		/* TODO: go into read-only mode */
1050 		assert(0);
1051 	}
1052 
1053 	pthread_spin_lock(&nv_cache->lock);
1054 	nv_cache->ready = true;
1055 	pthread_spin_unlock(&nv_cache->lock);
1056 
1057 	spdk_bdev_free_io(bdev_io);
1058 }
1059 
1060 static void
1061 ftl_nv_cache_wrap(void *ctx)
1062 {
1063 	struct ftl_nv_cache *nv_cache = ctx;
1064 	int rc;
1065 
1066 	rc = ftl_nv_cache_write_header(nv_cache, ftl_nv_cache_wrap_cb, nv_cache);
1067 	if (spdk_unlikely(rc != 0)) {
1068 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header: %s\n",
1069 			    spdk_strerror(-rc));
1070 		/* TODO: go into read-only mode */
1071 		assert(0);
1072 	}
1073 }
1074 
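/*
 * Reserve up to *num_lbks blocks on the non-volatile cache, returning the starting cache
 * address (FTL_LBA_INVALID if no space is available). When the end of the cache is
 * reached, the current address wraps around and the phase is advanced.
 */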
1075 static uint64_t
1076 ftl_reserve_nv_cache(struct ftl_nv_cache *nv_cache, size_t *num_lbks, unsigned int *phase)
1077 {
1078 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1079 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1080 	uint64_t num_available, cache_size, cache_addr = FTL_LBA_INVALID;
1081 
1082 	cache_size = spdk_bdev_get_num_blocks(bdev);
1083 
1084 	pthread_spin_lock(&nv_cache->lock);
1085 	if (spdk_unlikely(nv_cache->num_available == 0 || !nv_cache->ready)) {
1086 		goto out;
1087 	}
1088 
1089 	num_available = spdk_min(nv_cache->num_available, *num_lbks);
1090 	num_available = spdk_min(num_available, dev->conf.nv_cache.max_request_cnt);
1091 
1092 	if (spdk_unlikely(nv_cache->current_addr + num_available > cache_size)) {
1093 		*num_lbks = cache_size - nv_cache->current_addr;
1094 	} else {
1095 		*num_lbks = num_available;
1096 	}
1097 
1098 	cache_addr = nv_cache->current_addr;
1099 	nv_cache->current_addr += *num_lbks;
1100 	nv_cache->num_available -= *num_lbks;
1101 	*phase = nv_cache->phase;
1102 
1103 	if (nv_cache->current_addr == spdk_bdev_get_num_blocks(bdev)) {
1104 		nv_cache->current_addr = FTL_NV_CACHE_DATA_OFFSET;
1105 		nv_cache->phase = ftl_nv_cache_next_phase(nv_cache->phase);
1106 		nv_cache->ready = false;
1107 		spdk_thread_send_msg(ftl_get_core_thread(dev), ftl_nv_cache_wrap, nv_cache);
1108 	}
1109 out:
1110 	pthread_spin_unlock(&nv_cache->lock);
1111 	return cache_addr;
1112 }
1113 
1114 static struct ftl_io *
1115 ftl_alloc_io_nv_cache(struct ftl_io *parent, size_t num_lbks)
1116 {
1117 	struct ftl_io_init_opts opts = {
1118 		.dev		= parent->dev,
1119 		.parent		= parent,
1120 		.data		= ftl_io_iovec_addr(parent),
1121 		.lbk_cnt	= num_lbks,
1122 		.flags		= parent->flags | FTL_IO_CACHE,
1123 	};
1124 
1125 	return ftl_io_init_internal(&opts);
1126 }
1127 
1128 static void
1129 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1130 {
1131 	struct ftl_io *io = cb_arg;
1132 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1133 
1134 	if (spdk_unlikely(!success)) {
1135 		SPDK_ERRLOG("Non-volatile cache write failed at %"PRIx64"\n", io->ppa.ppa);
1136 		io->status = -EIO;
1137 	}
1138 
1139 	ftl_io_dec_req(io);
1140 	if (ftl_io_done(io)) {
1141 		spdk_mempool_put(nv_cache->md_pool, io->md);
1142 		ftl_io_complete(io);
1143 	}
1144 
1145 	spdk_bdev_free_io(bdev_io);
1146 }
1147 
1148 static void
1149 ftl_submit_nv_cache(void *ctx)
1150 {
1151 	struct ftl_io *io = ctx;
1152 	struct spdk_ftl_dev *dev = io->dev;
1153 	struct spdk_thread *thread;
1154 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1155 	struct ftl_io_channel *ioch;
1156 	int rc;
1157 
1158 	ioch = spdk_io_channel_get_ctx(io->ioch);
1159 	thread = spdk_io_channel_get_thread(io->ioch);
1160 
1161 	rc = spdk_bdev_write_blocks_with_md(nv_cache->bdev_desc, ioch->cache_ioch,
1162 					    ftl_io_iovec_addr(io), io->md, io->ppa.ppa,
1163 					    io->lbk_cnt, ftl_nv_cache_submit_cb, io);
1164 	if (rc == -ENOMEM) {
1165 		spdk_thread_send_msg(thread, ftl_submit_nv_cache, io);
1166 		return;
1167 	} else if (rc) {
1168 		SPDK_ERRLOG("Write to persistent cache failed: %s (%"PRIu64", %"PRIu64")\n",
1169 			    spdk_strerror(-rc), io->ppa.ppa, io->lbk_cnt);
1170 		spdk_mempool_put(nv_cache->md_pool, io->md);
1171 		io->status = -EIO;
1172 		ftl_io_complete(io);
1173 		return;
1174 	}
1175 
1176 	ftl_io_advance(io, io->lbk_cnt);
1177 	ftl_io_inc_req(io);
1178 }
1179 
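/* Fill the IO's metadata buffer with the LBA of each block packed together with the current phase */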
1180 static void
1181 ftl_nv_cache_fill_md(struct ftl_io *io, unsigned int phase)
1182 {
1183 	struct spdk_bdev *bdev;
1184 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1185 	uint64_t lbk_off, lba;
1186 	void *md_buf = io->md;
1187 
1188 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1189 
1190 	for (lbk_off = 0; lbk_off < io->lbk_cnt; ++lbk_off) {
1191 		lba = ftl_nv_cache_pack_lba(ftl_io_get_lba(io, lbk_off), phase);
1192 		memcpy(md_buf, &lba, sizeof(lba));
1193 		md_buf += spdk_bdev_get_md_size(bdev);
1194 	}
1195 }
1196 
1197 static void
1198 _ftl_write_nv_cache(void *ctx)
1199 {
1200 	struct ftl_io *child, *io = ctx;
1201 	struct spdk_ftl_dev *dev = io->dev;
1202 	struct spdk_thread *thread;
1203 	unsigned int phase;
1204 	uint64_t num_lbks;
1205 
1206 	thread = spdk_io_channel_get_thread(io->ioch);
1207 
1208 	while (io->pos < io->lbk_cnt) {
1209 		num_lbks = ftl_io_iovec_len_left(io);
1210 
1211 		child = ftl_alloc_io_nv_cache(io, num_lbks);
1212 		if (spdk_unlikely(!child)) {
1213 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1214 			return;
1215 		}
1216 
1217 		child->md = spdk_mempool_get(dev->nv_cache.md_pool);
1218 		if (spdk_unlikely(!child->md)) {
1219 			ftl_io_free(child);
1220 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1221 			break;
1222 		}
1223 
1224 		/* Reserve area on the non-volatile cache */
1225 		child->ppa.ppa = ftl_reserve_nv_cache(&dev->nv_cache, &num_lbks, &phase);
1226 		if (child->ppa.ppa == FTL_LBA_INVALID) {
1227 			spdk_mempool_put(dev->nv_cache.md_pool, child->md);
1228 			ftl_io_free(child);
1229 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1230 			break;
1231 		}
1232 
1233 		/* Shrink the IO if there isn't enough room in the cache to fill the whole iovec */
1234 		if (spdk_unlikely(num_lbks != ftl_io_iovec_len_left(io))) {
1235 			ftl_io_shrink_iovec(child, num_lbks);
1236 		}
1237 
1238 		ftl_nv_cache_fill_md(child, phase);
1239 		ftl_submit_nv_cache(child);
1240 	}
1241 
1242 	if (ftl_io_done(io)) {
1243 		ftl_io_complete(io);
1244 	}
1245 }
1246 
1247 static void
1248 ftl_write_nv_cache(struct ftl_io *parent)
1249 {
1250 	ftl_io_reset(parent);
1251 	parent->flags |= FTL_IO_CACHE;
1252 	_ftl_write_nv_cache(parent);
1253 }
1254 
1255 int
1256 ftl_nv_cache_write_header(struct ftl_nv_cache *nv_cache, spdk_bdev_io_completion_cb cb_fn,
1257 			  void *cb_arg)
1258 {
1259 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1260 	struct ftl_nv_cache_header *hdr = nv_cache->dma_buf;
1261 	struct spdk_bdev *bdev;
1262 	struct ftl_io_channel *ioch;
1263 
1264 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1265 	ioch = spdk_io_channel_get_ctx(dev->ioch);
1266 
1267 	memset(hdr, 0, spdk_bdev_get_block_size(bdev));
1268 
1269 	hdr->phase = (uint8_t)nv_cache->phase;
1270 	hdr->size = spdk_bdev_get_num_blocks(bdev);
1271 	hdr->uuid = dev->uuid;
1272 	hdr->version = FTL_NV_CACHE_HEADER_VERSION;
1273 	hdr->checksum = spdk_crc32c_update(hdr, offsetof(struct ftl_nv_cache_header, checksum), 0);
1274 
1275 	return spdk_bdev_write_blocks(nv_cache->bdev_desc, ioch->cache_ioch, hdr, 0, 1,
1276 				      cb_fn, cb_arg);
1277 }
1278 
1279 int
1280 ftl_nv_cache_scrub(struct ftl_nv_cache *nv_cache, spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1281 {
1282 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1283 	struct ftl_io_channel *ioch;
1284 	struct spdk_bdev *bdev;
1285 
1286 	ioch = spdk_io_channel_get_ctx(dev->ioch);
1287 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1288 
1289 	return spdk_bdev_write_zeroes_blocks(nv_cache->bdev_desc, ioch->cache_ioch, 1,
1290 					     spdk_bdev_get_num_blocks(bdev) - 1,
1291 					     cb_fn, cb_arg);
1292 }
1293 
1294 static void
1295 ftl_write_fail(struct ftl_io *io, int status)
1296 {
1297 	struct ftl_rwb_batch *batch = io->rwb_batch;
1298 	struct spdk_ftl_dev *dev = io->dev;
1299 	struct ftl_rwb_entry *entry;
1300 	struct ftl_band *band;
1301 	char buf[128];
1302 
1303 	entry = ftl_rwb_batch_first_entry(batch);
1304 
1305 	band = ftl_band_from_ppa(io->dev, entry->ppa);
1306 	SPDK_ERRLOG("Write failed @ppa: %s, status: %d\n",
1307 		    ftl_ppa2str(entry->ppa, buf, sizeof(buf)), status);
1308 
1309 	/* Close the band and halt the wptr and defrag */
1310 	ftl_halt_writes(dev, band);
1311 
1312 	ftl_rwb_foreach(entry, batch) {
1313 		/* Invalidate meta set by process_writes() */
1314 		ftl_invalidate_addr(dev, entry->ppa);
1315 	}
1316 
1317 	/* Reset the batch back to the RWB to resend it later */
1318 	ftl_rwb_batch_revert(batch);
1319 }
1320 
1321 static void
1322 ftl_write_cb(struct ftl_io *io, void *arg, int status)
1323 {
1324 	struct spdk_ftl_dev *dev = io->dev;
1325 	struct ftl_rwb_batch *batch = io->rwb_batch;
1326 	struct ftl_rwb_entry *entry;
1327 	struct ftl_band *band;
1328 
1329 	if (status) {
1330 		ftl_write_fail(io, status);
1331 		return;
1332 	}
1333 
1334 	assert(io->lbk_cnt == dev->xfer_size);
1335 	ftl_rwb_foreach(entry, batch) {
1336 		band = entry->band;
1337 		if (!(io->flags & FTL_IO_MD) && !(entry->flags & FTL_IO_PAD)) {
1338 			/* Verify that the LBA is set for user lbks */
1339 			assert(entry->lba != FTL_LBA_INVALID);
1340 		}
1341 
1342 		if (band != NULL) {
1343 			assert(band->num_reloc_blocks > 0);
1344 			band->num_reloc_blocks--;
1345 		}
1346 
1347 		SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lu, lba:%lu\n",
1348 			      entry->ppa.ppa, entry->lba);
1349 	}
1350 
1351 	ftl_process_flush(dev, batch);
1352 	ftl_rwb_batch_release(batch);
1353 }
1354 
1355 static void
1356 ftl_update_rwb_stats(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry)
1357 {
1358 	if (!ftl_rwb_entry_internal(entry)) {
1359 		dev->stats.write_user++;
1360 	}
1361 	dev->stats.write_total++;
1362 }
1363 
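/*
 * Point the L2P at the entry's new cached location, invalidating the previous physical
 * address. Weak (relocation) writes only update the L2P if the data hasn't already been
 * overwritten by a newer write.
 */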
1364 static void
1365 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry,
1366 	       struct ftl_ppa ppa)
1367 {
1368 	struct ftl_ppa prev_ppa;
1369 	struct ftl_rwb_entry *prev;
1370 	struct ftl_band *band;
1371 	int valid;
1372 
1373 	prev_ppa = ftl_l2p_get(dev, entry->lba);
1374 	if (ftl_ppa_invalid(prev_ppa)) {
1375 		ftl_l2p_set(dev, entry->lba, ppa);
1376 		return;
1377 	}
1378 
1379 	/* If the L2P's PPA is different from what we expected, we don't need to */
1380 	/* do anything (someone's already overwritten our data). */
1381 	if (ftl_rwb_entry_weak(entry) && !ftl_ppa_cmp(prev_ppa, entry->ppa)) {
1382 		return;
1383 	}
1384 
1385 	if (ftl_ppa_cached(prev_ppa)) {
1386 		assert(!ftl_rwb_entry_weak(entry));
1387 		prev = ftl_rwb_entry_from_offset(dev->rwb, prev_ppa.offset);
1388 		pthread_spin_lock(&prev->lock);
1389 
1390 		/* Re-read the L2P under the lock to protect against updates */
1391 		/* to this LBA from other threads */
1392 		prev_ppa = ftl_l2p_get(dev, entry->lba);
1393 
1394 		/* If the entry is no longer in cache, another write has been */
1395 		/* scheduled in the meantime, so we have to invalidate its LBA */
1396 		if (!ftl_ppa_cached(prev_ppa)) {
1397 			ftl_invalidate_addr(dev, prev_ppa);
1398 		}
1399 
1400 		/* If previous entry is part of cache, remove and invalidate it */
1401 		if (ftl_rwb_entry_valid(prev)) {
1402 			ftl_invalidate_addr(dev, prev->ppa);
1403 			ftl_rwb_entry_invalidate(prev);
1404 		}
1405 
1406 		ftl_l2p_set(dev, entry->lba, ppa);
1407 		pthread_spin_unlock(&prev->lock);
1408 		return;
1409 	}
1410 
1411 	/* Lock the band containing the previous PPA. This ensures atomic changes to */
1412 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
1413 	/* check the validity of weak writes. */
1414 	band = ftl_band_from_ppa(dev, prev_ppa);
1415 	pthread_spin_lock(&band->lba_map.lock);
1416 
1417 	valid = ftl_invalidate_addr_unlocked(dev, prev_ppa);
1418 
1419 	/* If the address has been invalidated already, we don't want to update */
1420 	/* the L2P for weak writes, as it means the write is no longer valid. */
1421 	if (!ftl_rwb_entry_weak(entry) || valid) {
1422 		ftl_l2p_set(dev, entry->lba, ppa);
1423 	}
1424 
1425 	pthread_spin_unlock(&band->lba_map.lock);
1426 }
1427 
1428 static struct ftl_io *
1429 ftl_io_init_child_write(struct ftl_io *parent, struct ftl_ppa ppa,
1430 			void *data, void *md, ftl_io_fn cb)
1431 {
1432 	struct ftl_io *io;
1433 	struct spdk_ftl_dev *dev = parent->dev;
1434 	struct ftl_io_init_opts opts = {
1435 		.dev		= dev,
1436 		.io		= NULL,
1437 		.parent		= parent,
1438 		.rwb_batch	= NULL,
1439 		.band		= parent->band,
1440 		.size		= sizeof(struct ftl_io),
1441 		.flags		= 0,
1442 		.type		= FTL_IO_WRITE,
1443 		.lbk_cnt	= dev->xfer_size,
1444 		.cb_fn		= cb,
1445 		.data		= data,
1446 		.md		= md,
1447 	};
1448 
1449 	io = ftl_io_init_internal(&opts);
1450 	if (!io) {
1451 		return NULL;
1452 	}
1453 
1454 	io->ppa = ppa;
1455 
1456 	return io;
1457 }
1458 
1459 static void
1460 ftl_io_child_write_cb(struct ftl_io *io, void *ctx, int status)
1461 {
1462 	struct ftl_chunk *chunk;
1463 	struct ftl_wptr *wptr;
1464 
1465 	chunk = ftl_band_chunk_from_ppa(io->band, io->ppa);
1466 	wptr = ftl_wptr_from_band(io->band);
1467 
1468 	chunk->busy = false;
1469 	chunk->write_offset += io->lbk_cnt;
1470 	wptr->num_outstanding--;
1471 }
1472 
1473 static int
1474 ftl_submit_child_write(struct ftl_wptr *wptr, struct ftl_io *io, int lbk_cnt)
1475 {
1476 	struct spdk_ftl_dev	*dev = io->dev;
1477 	struct ftl_io		*child;
1478 	int			rc;
1479 	struct ftl_ppa		ppa;
1480 
1481 	if (spdk_likely(!wptr->direct_mode)) {
1482 		ppa = wptr->ppa;
1483 	} else {
1484 		assert(io->flags & FTL_IO_DIRECT_ACCESS);
1485 		assert(io->ppa.chk == wptr->band->id);
1486 		ppa = io->ppa;
1487 	}
1488 
1489 	/* Split the IO into child requests and release the chunk immediately after the child is completed */
1490 	child = ftl_io_init_child_write(io, ppa, ftl_io_iovec_addr(io),
1491 					ftl_io_get_md(io), ftl_io_child_write_cb);
1492 	if (!child) {
1493 		return -EAGAIN;
1494 	}
1495 
1496 	wptr->num_outstanding++;
1497 	rc = spdk_nvme_ns_cmd_write_with_md(dev->ns, ftl_get_write_qpair(dev),
1498 					    ftl_io_iovec_addr(child), child->md,
1499 					    ftl_ppa_addr_pack(dev, ppa),
1500 					    lbk_cnt, ftl_io_cmpl_cb, child, 0, 0, 0);
1501 	if (rc) {
1502 		wptr->num_outstanding--;
1503 		ftl_io_fail(child, rc);
1504 		ftl_io_complete(child);
1505 		SPDK_ERRLOG("spdk_nvme_ns_cmd_write_with_md failed with status:%d, ppa:%lu\n",
1506 			    rc, ppa.ppa);
1507 		return -EIO;
1508 	}
1509 
1510 	ftl_io_inc_req(child);
1511 	ftl_io_advance(child, lbk_cnt);
1512 
1513 	return 0;
1514 }
1515 
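/* Split the IO into xfer_size-sized child writes, advancing the write pointer after each successful submission */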
1516 static int
1517 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
1518 {
1519 	struct spdk_ftl_dev	*dev = io->dev;
1520 	int			rc = 0;
1521 
1522 	assert(io->lbk_cnt % dev->xfer_size == 0);
1523 
1524 	while (io->iov_pos < io->iov_cnt) {
1525 		/* There are no ordering guarantees for completions of the NVMe IO submission queue, */
1526 		/* so wait until the chunk is no longer busy before submitting another write */
1527 		if (wptr->chunk->busy) {
1528 			TAILQ_INSERT_TAIL(&wptr->pending_queue, io, retry_entry);
1529 			rc = -EAGAIN;
1530 			break;
1531 		}
1532 
1533 		rc = ftl_submit_child_write(wptr, io, dev->xfer_size);
1534 		if (spdk_unlikely(rc)) {
1535 			if (rc == -EAGAIN) {
1536 				TAILQ_INSERT_TAIL(&wptr->pending_queue, io, retry_entry);
1537 			} else {
1538 				ftl_io_fail(io, rc);
1539 			}
1540 			break;
1541 		}
1542 
1543 		ftl_trace_submission(dev, io, wptr->ppa, dev->xfer_size);
1544 		ftl_wptr_advance(wptr, dev->xfer_size);
1545 	}
1546 
1547 	if (ftl_io_done(io)) {
1548 		/* Parent IO will complete after all children are completed */
1549 		ftl_io_complete(io);
1550 	}
1551 
1552 	return rc;
1553 }
1554 
1555 static void
1556 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1557 {
1558 	struct ftl_rwb *rwb = dev->rwb;
1559 	size_t size, num_entries;
1560 
1561 	size = ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_INTERNAL) +
1562 	       ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_USER);
1563 
1564 	/* There must be something in the RWB, otherwise the flush */
1565 	/* wouldn't be waiting for anything */
1566 	assert(size > 0);
1567 
1568 	/* Only add padding when there are fewer than xfer_size */
1569 	/* entries in the buffer. Otherwise we just have to wait */
1570 	/* for the entries to become ready. */
1571 	num_entries = ftl_rwb_get_active_batches(dev->rwb) * dev->xfer_size;
1572 	if (size < num_entries) {
1573 		ftl_rwb_pad(dev, num_entries - (size % num_entries));
1574 	}
1575 }
1576 
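/*
 * Main write path: pop a batch from the write buffer, assign PPAs to its entries, update
 * the band's LBA map and submit the batch to the device. Returns the number of blocks
 * scheduled for writing.
 */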
1577 static int
1578 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1579 {
1580 	struct spdk_ftl_dev	*dev = wptr->dev;
1581 	struct ftl_rwb_batch	*batch;
1582 	struct ftl_rwb_entry	*entry;
1583 	struct ftl_io		*io;
1584 	struct ftl_ppa		ppa, prev_ppa;
1585 
1586 	if (spdk_unlikely(!TAILQ_EMPTY(&wptr->pending_queue))) {
1587 		io = TAILQ_FIRST(&wptr->pending_queue);
1588 		TAILQ_REMOVE(&wptr->pending_queue, io, retry_entry);
1589 
1590 		if (ftl_submit_write(wptr, io) == -EAGAIN) {
1591 			return 0;
1592 		}
1593 	}
1594 
1595 	/* Make sure the band is prepared for writing */
1596 	if (!ftl_wptr_ready(wptr)) {
1597 		return 0;
1598 	}
1599 
1600 	if (dev->halt) {
1601 		ftl_wptr_process_shutdown(wptr);
1602 	}
1603 
1604 	if (spdk_unlikely(wptr->flush)) {
1605 		ftl_wptr_pad_band(wptr);
1606 	}
1607 
1608 	batch = ftl_rwb_pop(dev->rwb);
1609 	if (!batch) {
1610 		/* If there are queued flush requests we need to pad the RWB to */
1611 		/* force out remaining entries */
1612 		if (!LIST_EMPTY(&dev->flush_list)) {
1613 			ftl_flush_pad_batch(dev);
1614 		}
1615 
1616 		return 0;
1617 	}
1618 
1619 	io = ftl_io_rwb_init(dev, wptr->band, batch, ftl_write_cb);
1620 	if (!io) {
1621 		goto error;
1622 	}
1623 
1624 	ppa = wptr->ppa;
1625 	ftl_rwb_foreach(entry, batch) {
1626 		/* Update band's relocation stats if the IO comes from reloc */
1627 		if (entry->flags & FTL_IO_WEAK) {
1628 			if (!spdk_bit_array_get(wptr->band->reloc_bitmap, entry->band->id)) {
1629 				spdk_bit_array_set(wptr->band->reloc_bitmap, entry->band->id);
1630 				entry->band->num_reloc_bands++;
1631 			}
1632 		}
1633 
1634 		entry->ppa = ppa;
1635 		if (entry->lba != FTL_LBA_INVALID) {
1636 			pthread_spin_lock(&entry->lock);
1637 			prev_ppa = ftl_l2p_get(dev, entry->lba);
1638 
1639 			/* If the l2p was updated in the meantime, don't update band's metadata */
1640 			if (ftl_ppa_cached(prev_ppa) && prev_ppa.offset == entry->pos) {
1641 				/* Setting entry's cache bit needs to be done after metadata */
1642 				/* within the band is updated to make sure that writes */
1643 				/* invalidating the entry clear the metadata as well */
1644 				ftl_band_set_addr(wptr->band, entry->lba, entry->ppa);
1645 				ftl_rwb_entry_set_valid(entry);
1646 			}
1647 			pthread_spin_unlock(&entry->lock);
1648 		}
1649 
1650 		ftl_trace_rwb_pop(dev, entry);
1651 		ftl_update_rwb_stats(dev, entry);
1652 
1653 		ppa = ftl_band_next_ppa(wptr->band, ppa, 1);
1654 	}
1655 
1656 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lx, %lx\n", wptr->ppa.ppa,
1657 		      ftl_ppa_addr_pack(dev, wptr->ppa));
1658 
1659 	if (ftl_submit_write(wptr, io)) {
1660 		/* TODO: we need some recovery here */
1661 		assert(0 && "Write submit failed");
1662 		if (ftl_io_done(io)) {
1663 			ftl_io_free(io);
1664 		}
1665 	}
1666 
1667 	return dev->xfer_size;
1668 error:
1669 	ftl_rwb_batch_revert(batch);
1670 	return 0;
1671 }
1672 
1673 static int
1674 ftl_process_writes(struct spdk_ftl_dev *dev)
1675 {
1676 	struct ftl_wptr *wptr, *twptr;
1677 	size_t num_active = 0;
1678 	enum ftl_band_state state;
1679 
1680 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1681 		ftl_wptr_process_writes(wptr);
1682 		state = wptr->band->state;
1683 
1684 		if (state != FTL_BAND_STATE_FULL &&
1685 		    state != FTL_BAND_STATE_CLOSING &&
1686 		    state != FTL_BAND_STATE_CLOSED) {
1687 			num_active++;
1688 		}
1689 	}
1690 
1691 	if (num_active < 1) {
1692 		ftl_add_wptr(dev);
1693 	}
1694 
1695 	return 0;
1696 }
1697 
1698 static void
1699 ftl_rwb_entry_fill(struct ftl_rwb_entry *entry, struct ftl_io *io)
1700 {
1701 	memcpy(entry->data, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1702 
1703 	if (ftl_rwb_entry_weak(entry)) {
1704 		entry->band = ftl_band_from_ppa(io->dev, io->ppa);
1705 		entry->ppa = ftl_band_next_ppa(entry->band, io->ppa, io->pos);
1706 		entry->band->num_reloc_blocks++;
1707 	}
1708 
1709 	entry->trace = io->trace;
1710 	entry->lba = ftl_io_current_lba(io);
1711 
1712 	if (entry->md) {
1713 		memcpy(entry->md, &entry->lba, sizeof(entry->lba));
1714 	}
1715 }
1716 
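/*
 * Copy the IO's data into write buffer entries and update the L2P to point at the cached
 * copies. Once fully buffered, the data is also mirrored to the non-volatile cache if one
 * is present (unless the cache is explicitly bypassed).
 */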
1717 static int
1718 ftl_rwb_fill(struct ftl_io *io)
1719 {
1720 	struct spdk_ftl_dev *dev = io->dev;
1721 	struct ftl_rwb_entry *entry;
1722 	struct ftl_ppa ppa = { .cached = 1 };
1723 	int flags = ftl_rwb_flags_from_io(io);
1724 
1725 	while (io->pos < io->lbk_cnt) {
1726 		if (ftl_io_current_lba(io) == FTL_LBA_INVALID) {
1727 			ftl_io_advance(io, 1);
1728 			continue;
1729 		}
1730 
1731 		entry = ftl_acquire_entry(dev, flags);
1732 		if (!entry) {
1733 			return -EAGAIN;
1734 		}
1735 
1736 		ftl_rwb_entry_fill(entry, io);
1737 
1738 		ppa.offset = entry->pos;
1739 
1740 		ftl_trace_rwb_fill(dev, io);
1741 		ftl_update_l2p(dev, entry, ppa);
1742 		ftl_io_advance(io, 1);
1743 
1744 		/* This needs to be done after the L2P is updated to avoid a race with the */
1745 		/* write completion callback in case it's processed before the */
1746 		/* L2P is set in ftl_update_l2p(). */
1747 		ftl_rwb_push(entry);
1748 	}
1749 
1750 	if (ftl_io_done(io)) {
1751 		if (ftl_dev_has_nv_cache(dev) && !(io->flags & FTL_IO_BYPASS_CACHE)) {
1752 			ftl_write_nv_cache(io);
1753 		} else {
1754 			ftl_io_complete(io);
1755 		}
1756 	}
1757 
1758 	return 0;
1759 }
1760 
1761 static bool
1762 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
1763 {
1764 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
1765 
1766 	if (ftl_reloc_is_halted(dev->reloc)) {
1767 		return false;
1768 	}
1769 
1770 	if (dev->df_band) {
1771 		return false;
1772 	}
1773 
1774 	if (dev->num_free <= limit->thld) {
1775 		return true;
1776 	}
1777 
1778 	return false;
1779 }
1780 
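/* Calculate the band's defrag merit: the more invalid and the older the data, the higher the merit */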
1781 static double
1782 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
1783 {
1784 	size_t usable, valid, invalid;
1785 	double vld_ratio;
1786 
1787 	/* If the band doesn't have any usable lbks it's of no use */
1788 	usable = ftl_band_num_usable_lbks(band);
1789 	if (usable == 0) {
1790 		return 0.0;
1791 	}
1792 
1793 	valid =  threshold_valid ? (usable - *threshold_valid) : band->lba_map.num_vld;
1794 	invalid = usable - valid;
1795 
1796 	/* Add one to avoid division by 0 */
1797 	vld_ratio = (double)invalid / (double)(valid + 1);
1798 	return vld_ratio * ftl_band_age(band);
1799 }
1800 
1801 static bool
1802 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
1803 {
1804 	struct spdk_ftl_conf *conf = &dev->conf;
1805 	size_t thld_vld;
1806 
1807 	/* If we're in dire need of free bands, every band is worth defragging */
1808 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
1809 		return true;
1810 	}
1811 
1812 	thld_vld = (ftl_band_num_usable_lbks(band) * conf->defrag.invalid_thld) / 100;
1813 
1814 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
1815 }
1816 
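/* Select the closed band with the highest defrag merit, provided it actually warrants defragmentation */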
1817 static struct ftl_band *
1818 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
1819 {
1820 	struct ftl_band *band, *mband = NULL;
1821 	double merit = 0;
1822 
1823 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
1824 		assert(band->state == FTL_BAND_STATE_CLOSED);
1825 		band->merit = ftl_band_calc_merit(band, NULL);
1826 		if (band->merit > merit) {
1827 			merit = band->merit;
1828 			mband = band;
1829 		}
1830 	}
1831 
1832 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
1833 		mband = NULL;
1834 	}
1835 
1836 	return mband;
1837 }
1838 
1839 static void
1840 ftl_process_relocs(struct spdk_ftl_dev *dev)
1841 {
1842 	struct ftl_band *band;
1843 
1844 	if (ftl_dev_needs_defrag(dev)) {
1845 		band = dev->df_band = ftl_select_defrag_band(dev);
1846 
1847 		if (band) {
1848 			ftl_reloc_add(dev->reloc, band, 0, ftl_num_band_lbks(dev), 0);
1849 			ftl_trace_defrag_band(dev, band);
1850 		}
1851 	}
1852 
1853 	ftl_reloc(dev->reloc);
1854 }
1855 
1856 int
1857 ftl_current_limit(const struct spdk_ftl_dev *dev)
1858 {
1859 	return dev->limit;
1860 }
1861 
1862 void
1863 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
1864 {
1865 	attrs->uuid = dev->uuid;
1866 	attrs->lbk_cnt = dev->num_lbas;
1867 	attrs->lbk_size = FTL_BLOCK_SIZE;
1868 	attrs->range = dev->range;
1869 	attrs->cache_bdev_desc = dev->nv_cache.bdev_desc;
1870 	attrs->allow_open_bands = dev->conf.allow_open_bands;
1871 	attrs->num_chunks = dev->geo.num_chk;
1872 	attrs->chunk_size = dev->geo.clba;
1873 }
1874 
1875 static void
1876 _ftl_io_write(void *ctx)
1877 {
1878 	ftl_io_write((struct ftl_io *)ctx);
1879 }
1880 
1881 static int
1882 ftl_rwb_fill_leaf(struct ftl_io *io)
1883 {
1884 	int rc;
1885 
1886 	rc = ftl_rwb_fill(io);
1887 	if (rc == -EAGAIN) {
1888 		spdk_thread_send_msg(spdk_io_channel_get_thread(io->ioch),
1889 				     _ftl_io_write, io);
1890 		return 0;
1891 	}
1892 
1893 	return rc;
1894 }
1895 
1896 static int
1897 ftl_submit_write_leaf(struct ftl_io *io)
1898 {
1899 	int rc;
1900 
1901 	rc = ftl_submit_write(ftl_wptr_from_band(io->band), io);
1902 	if (rc == -EAGAIN) {
1903 		/* EAGAIN means that the request was put on the pending queue */
1904 		return 0;
1905 	}
1906 
1907 	return rc;
1908 }
1909 
1910 void
1911 ftl_io_write(struct ftl_io *io)
1912 {
1913 	struct spdk_ftl_dev *dev = io->dev;
1914 
1915 	/* For normal IOs we just need to copy the data onto the rwb */
1916 	if (!(io->flags & FTL_IO_MD)) {
1917 		ftl_io_call_foreach_child(io, ftl_rwb_fill_leaf);
1918 	} else {
1919 		/* Metadata has its own buffer, so it doesn't have to be copied; just */
1920 		/* send it to the core thread and schedule the write immediately */
1921 		if (ftl_check_core_thread(dev)) {
1922 			ftl_io_call_foreach_child(io, ftl_submit_write_leaf);
1923 		} else {
1924 			spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
1925 		}
1926 	}
1927 }
1928 
1929 int
1930 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1931 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1932 {
1933 	struct ftl_io *io;
1934 
1935 	if (iov_cnt == 0) {
1936 		return -EINVAL;
1937 	}
1938 
1939 	if (lba_cnt == 0) {
1940 		return -EINVAL;
1941 	}
1942 
1943 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1944 		return -EINVAL;
1945 	}
1946 
1947 	if (!dev->initialized) {
1948 		return -EBUSY;
1949 	}
1950 
1951 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
1952 	if (!io) {
1953 		return -ENOMEM;
1954 	}
1955 
1956 	ftl_io_write(io);
1957 
1958 	return 0;
1959 }
1960 
1961 static int
1962 ftl_io_read_leaf(struct ftl_io *io)
1963 {
1964 	int rc;
1965 
1966 	rc = ftl_submit_read(io);
1967 	if (rc == -ENOMEM) {
1968 		/* ENOMEM means that the request was put on the retry queue */
1969 		return 0;
1970 	}
1971 
1972 	return rc;
1973 }
1974 
1975 static void
1976 _ftl_io_read(void *arg)
1977 {
1978 	ftl_io_read((struct ftl_io *)arg);
1979 }
1980 
1981 void
1982 ftl_io_read(struct ftl_io *io)
1983 {
1984 	struct spdk_ftl_dev *dev = io->dev;
1985 
1986 	if (ftl_check_read_thread(dev)) {
1987 		ftl_io_call_foreach_child(io, ftl_io_read_leaf);
1988 	} else {
1989 		spdk_thread_send_msg(ftl_get_read_thread(dev), _ftl_io_read, io);
1990 	}
1991 }
1992 
1993 int
1994 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1995 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1996 {
1997 	struct ftl_io *io;
1998 
1999 	if (iov_cnt == 0) {
2000 		return -EINVAL;
2001 	}
2002 
2003 	if (lba_cnt == 0) {
2004 		return -EINVAL;
2005 	}
2006 
2007 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
2008 		return -EINVAL;
2009 	}
2010 
2011 	if (!dev->initialized) {
2012 		return -EBUSY;
2013 	}
2014 
2015 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
2016 	if (!io) {
2017 		return -ENOMEM;
2018 	}
2019 
2020 	ftl_io_read(io);
2021 	return 0;
2022 }
2023 
2024 static struct ftl_flush *
2025 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2026 {
2027 	struct ftl_flush *flush;
2028 	struct ftl_rwb *rwb = dev->rwb;
2029 
2030 	flush = calloc(1, sizeof(*flush));
2031 	if (!flush) {
2032 		return NULL;
2033 	}
2034 
2035 	flush->bmap = spdk_bit_array_create(ftl_rwb_num_batches(rwb));
2036 	if (!flush->bmap) {
2037 		goto error;
2038 	}
2039 
2040 	flush->dev = dev;
2041 	flush->cb.fn = cb_fn;
2042 	flush->cb.ctx = cb_arg;
2043 
2044 	return flush;
2045 error:
2046 	free(flush);
2047 	return NULL;
2048 }
2049 
2050 static void
2051 _ftl_flush(void *ctx)
2052 {
2053 	struct ftl_flush *flush = ctx;
2054 	struct spdk_ftl_dev *dev = flush->dev;
2055 	struct ftl_rwb *rwb = dev->rwb;
2056 	struct ftl_rwb_batch *batch;
2057 
2058 	/* Attach flush object to all non-empty batches */
2059 	ftl_rwb_foreach_batch(batch, rwb) {
2060 		if (!ftl_rwb_batch_empty(batch)) {
2061 			spdk_bit_array_set(flush->bmap, ftl_rwb_batch_get_offset(batch));
2062 			flush->num_req++;
2063 		}
2064 	}
2065 
2066 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
2067 
2068 	/* If the RWB was already empty, the flush can be completed right away */
2069 	if (!flush->num_req) {
2070 		ftl_complete_flush(flush);
2071 	}
2072 }
2073 
2074 int
2075 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2076 {
2077 	struct ftl_flush *flush;
2078 
2079 	if (!dev->initialized) {
2080 		return -EBUSY;
2081 	}
2082 
2083 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
2084 	if (!flush) {
2085 		return -ENOMEM;
2086 	}
2087 
2088 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
2089 	return 0;
2090 }
2091 
2092 static void
2093 _ftl_process_anm_event(void *ctx)
2094 {
2095 	ftl_process_anm_event((struct ftl_anm_event *)ctx);
2096 }
2097 
2098 void
2099 ftl_process_anm_event(struct ftl_anm_event *event)
2100 {
2101 	struct spdk_ftl_dev *dev = event->dev;
2102 	struct ftl_band *band;
2103 	size_t lbkoff;
2104 
2105 	if (!ftl_check_core_thread(dev)) {
2106 		spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_process_anm_event, event);
2107 		return;
2108 	}
2109 
2110 	band = ftl_band_from_ppa(dev, event->ppa);
2111 	lbkoff = ftl_band_lbkoff_from_ppa(band, event->ppa);
2112 
2113 	ftl_reloc_add(dev->reloc, band, lbkoff, event->num_lbks, 0);
2114 	ftl_anm_event_complete(event);
2115 }
2116 
2117 bool
2118 ftl_ppa_is_written(struct ftl_band *band, struct ftl_ppa ppa)
2119 {
2120 	struct ftl_chunk *chunk = ftl_band_chunk_from_ppa(band, ppa);
2121 
2122 	return ppa.lbk < chunk->write_offset;
2123 }
2124 
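/* Resubmit reads from the retry queue; stop early if the device still can't accept them */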
2125 static void
2126 ftl_process_retry_queue(struct spdk_ftl_dev *dev)
2127 {
2128 	struct ftl_io *io;
2129 	int rc;
2130 
2131 	while (!TAILQ_EMPTY(&dev->retry_queue)) {
2132 		io = TAILQ_FIRST(&dev->retry_queue);
2133 
2134 		/* Retry only if IO is still healthy */
2135 		if (spdk_likely(io->status == 0)) {
2136 			rc = ftl_submit_read(io);
2137 			if (rc == -ENOMEM) {
2138 				break;
2139 			}
2140 		}
2141 
2142 		io->flags &= ~FTL_IO_RETRY;
2143 		TAILQ_REMOVE(&dev->retry_queue, io, retry_entry);
2144 
2145 		if (ftl_io_done(io)) {
2146 			ftl_io_complete(io);
2147 		}
2148 	}
2149 }
2150 
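/* Poller run on the read thread: processes read completions and the retry queue */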
2151 int
2152 ftl_task_read(void *ctx)
2153 {
2154 	struct ftl_thread *thread = ctx;
2155 	struct spdk_ftl_dev *dev = thread->dev;
2156 	struct spdk_nvme_qpair *qpair = ftl_get_read_qpair(dev);
2157 	size_t num_completed;
2158 
2159 	if (dev->halt) {
2160 		if (ftl_shutdown_complete(dev)) {
2161 			spdk_poller_unregister(&thread->poller);
2162 			return 0;
2163 		}
2164 	}
2165 
2166 	num_completed = spdk_nvme_qpair_process_completions(qpair, 0);
2167 
2168 	if (num_completed && !TAILQ_EMPTY(&dev->retry_queue)) {
2169 		ftl_process_retry_queue(dev);
2170 	}
2171 
2172 	return num_completed;
2173 }
2174 
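/* Poller run on the core thread: processes writes, their completions and relocation */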
2175 int
2176 ftl_task_core(void *ctx)
2177 {
2178 	struct ftl_thread *thread = ctx;
2179 	struct spdk_ftl_dev *dev = thread->dev;
2180 	struct spdk_nvme_qpair *qpair = ftl_get_write_qpair(dev);
2181 
2182 	if (dev->halt) {
2183 		if (ftl_shutdown_complete(dev)) {
2184 			spdk_poller_unregister(&thread->poller);
2185 			return 0;
2186 		}
2187 	}
2188 
2189 	ftl_process_writes(dev);
2190 	spdk_nvme_qpair_process_completions(qpair, 0);
2191 	ftl_process_relocs(dev);
2192 
2193 	return 0;
2194 }
2195 
2196 SPDK_LOG_REGISTER_COMPONENT("ftl_core", SPDK_LOG_FTL_CORE)
2197