xref: /spdk/lib/ftl/ftl_core.c (revision bb488d2829a9b7863daab45917dd2174905cc0ae)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/io_channel.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk_internal/log.h"
40 #include "spdk/ftl.h"
41 
42 #include "ftl_core.h"
43 #include "ftl_band.h"
44 #include "ftl_io.h"
45 #include "ftl_anm.h"
46 #include "ftl_rwb.h"
47 #include "ftl_debug.h"
48 #include "ftl_reloc.h"
49 
50 /* Max number of iovecs */
51 #define FTL_MAX_IOV 1024
52 
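/*
 * Write pointer: tracks the current write position (band, chunk, PPA and
 * logical offset) for a single band that is being written to. One wptr is
 * kept on dev->wptr_list per such band.
 */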
53 struct ftl_wptr {
54 	/* Owner device */
55 	struct spdk_ftl_dev		*dev;
56 
57 	/* Current PPA */
58 	struct ftl_ppa			ppa;
59 
60 	/* Band currently being written to */
61 	struct ftl_band			*band;
62 
63 	/* Current logical block's offset */
64 	uint64_t			offset;
65 
66 	/* Current erase block */
67 	struct ftl_chunk		*chunk;
68 
69 	/* List link */
70 	LIST_ENTRY(ftl_wptr)		list_entry;
71 };
72 
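/*
 * Flush request: remembers which RWB batches were outstanding when
 * spdk_ftl_flush() was called and completes the user callback once all of
 * them have been written out.
 */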
73 struct ftl_flush {
74 	/* Owner device */
75 	struct spdk_ftl_dev		*dev;
76 
77 	/* Number of batches to wait for */
78 	size_t				num_req;
79 
80 	/* Callback */
81 	struct ftl_cb			cb;
82 
83 	/* Batch bitmap */
84 	struct spdk_bit_array		*bmap;
85 
86 	/* List link */
87 	LIST_ENTRY(ftl_flush)		list_entry;
88 };
89 
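/*
 * Callback used by ftl_submit_read() to resolve the PPA for the next block
 * of an IO. Returns the number of contiguous lbks that can be read in one
 * command, 0 if the block was served from the write buffer cache, or
 * -EAGAIN if the L2P has to be re-read.
 */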
90 typedef int (*ftl_next_ppa_fn)(struct ftl_io *, struct ftl_ppa *, size_t, void *);
91 static void _ftl_read(void *);
92 static void _ftl_write(void *);
93 
94 static int
95 ftl_rwb_flags_from_io(const struct ftl_io *io)
96 {
97 	int valid_flags = FTL_IO_INTERNAL | FTL_IO_WEAK | FTL_IO_PAD;
98 	return io->flags & valid_flags;
99 }
100 
101 static int
102 ftl_rwb_entry_weak(const struct ftl_rwb_entry *entry)
103 {
104 	return entry->flags & FTL_IO_WEAK;
105 }
106 
107 static void
108 ftl_wptr_free(struct ftl_wptr *wptr)
109 {
110 	if (!wptr) {
111 		return;
112 	}
113 
114 	free(wptr);
115 }
116 
117 static void
118 ftl_remove_wptr(struct ftl_wptr *wptr)
119 {
120 	LIST_REMOVE(wptr, list_entry);
121 	ftl_wptr_free(wptr);
122 }
123 
124 static void
125 ftl_io_cmpl_cb(void *arg, const struct spdk_nvme_cpl *status)
126 {
127 	struct ftl_io *io = arg;
128 
129 	if (spdk_nvme_cpl_is_error(status)) {
130 		ftl_io_process_error(io, status);
131 	}
132 
133 	ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_DISK);
134 
135 	ftl_io_dec_req(io);
136 
137 	if (ftl_io_done(io)) {
138 		ftl_io_complete(io);
139 	}
140 }
141 
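/*
 * Stop writing to a band after a failed write: mark the band as failed and
 * drop its write pointer so no further data is placed on it.
 */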
142 static void
143 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
144 {
145 	struct ftl_wptr *wptr = NULL;
146 
147 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
148 		if (wptr->band == band) {
149 			break;
150 		}
151 	}
152 
153 	/* If the band already has the high_prio flag set, other writes must */
154 	/* have failed earlier, so it's already taken care of. */
155 	if (band->high_prio) {
156 		assert(wptr == NULL);
157 		return;
158 	}
159 
160 	ftl_band_write_failed(band);
161 	ftl_remove_wptr(wptr);
162 }
163 
164 static struct ftl_wptr *
165 ftl_wptr_from_band(struct ftl_band *band)
166 {
167 	struct spdk_ftl_dev *dev = band->dev;
168 	struct ftl_wptr *wptr = NULL;
169 
170 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
171 		if (wptr->band == band) {
172 			return wptr;
173 		}
174 	}
175 
176 	return NULL;
177 }
178 
179 static void
180 ftl_md_write_fail(struct ftl_io *io, int status)
181 {
182 	struct ftl_band *band = io->band;
183 	struct ftl_wptr *wptr;
184 	char buf[128];
185 
186 	wptr = ftl_wptr_from_band(band);
187 
188 	SPDK_ERRLOG("Metadata write failed @ppa: %s, status: %d\n",
189 		    ftl_ppa2str(wptr->ppa, buf, sizeof(buf)), status);
190 
191 	ftl_halt_writes(io->dev, band);
192 }
193 
194 static void
195 ftl_md_write_cb(void *arg, int status)
196 {
197 	struct ftl_io *io = arg;
198 	struct ftl_wptr *wptr;
199 
200 	wptr = ftl_wptr_from_band(io->band);
201 
202 	if (status) {
203 		ftl_md_write_fail(io, status);
204 		return;
205 	}
206 
207 	ftl_band_set_next_state(io->band);
208 	if (io->band->state == FTL_BAND_STATE_CLOSED) {
209 		ftl_remove_wptr(wptr);
210 	}
211 }
212 
213 static int
214 ftl_ppa_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa,
215 		      size_t lbk, void *ctx)
216 {
217 	struct spdk_ftl_dev *dev = io->dev;
218 	size_t lbk_cnt, max_lbks;
219 
220 	assert(ftl_io_mode_ppa(io));
221 	assert(io->iov_pos < io->iov_cnt);
222 
223 	if (lbk == 0) {
224 		*ppa = io->ppa;
225 	} else {
226 		*ppa = ftl_band_next_xfer_ppa(io->band, io->ppa, lbk);
227 	}
228 
229 	assert(!ftl_ppa_invalid(*ppa));
230 
231 	/* Metadata has to be read in the way it's written (jumping across */
232 	/* the chunks in xfer_size increments) */
233 	if (io->flags & FTL_IO_MD) {
234 		max_lbks = dev->xfer_size - (ppa->lbk % dev->xfer_size);
235 		lbk_cnt = spdk_min(ftl_io_iovec_len_left(io), max_lbks);
236 		assert(ppa->lbk / dev->xfer_size == (ppa->lbk + lbk_cnt - 1) / dev->xfer_size);
237 	} else {
238 		lbk_cnt = ftl_io_iovec_len_left(io);
239 	}
240 
241 	return lbk_cnt;
242 }
243 
244 static int
245 ftl_wptr_close_band(struct ftl_wptr *wptr)
246 {
247 	struct ftl_band *band = wptr->band;
248 
249 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
250 	band->tail_md_ppa = wptr->ppa;
251 
252 	return ftl_band_write_tail_md(band, band->md.dma_buf, ftl_md_write_cb);
253 }
254 
255 static int
256 ftl_wptr_open_band(struct ftl_wptr *wptr)
257 {
258 	struct ftl_band *band = wptr->band;
259 
260 	assert(ftl_band_chunk_is_first(band, wptr->chunk));
261 	assert(band->md.num_vld == 0);
262 
263 	ftl_band_clear_md(band);
264 
265 	assert(band->state == FTL_BAND_STATE_PREP);
266 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
267 
268 	return ftl_band_write_head_md(band, band->md.dma_buf, ftl_md_write_cb);
269 }
270 
271 static int
272 ftl_submit_erase(struct ftl_io *io)
273 {
274 	struct spdk_ftl_dev *dev = io->dev;
275 	struct ftl_band *band = io->band;
276 	struct ftl_ppa ppa = io->ppa;
277 	struct ftl_chunk *chunk;
278 	uint64_t ppa_packed;
279 	int rc = 0;
280 	size_t i;
281 
282 	for (i = 0; i < io->lbk_cnt; ++i) {
283 		if (i != 0) {
284 			chunk = ftl_band_next_chunk(band, ftl_band_chunk_from_ppa(band, ppa));
285 			assert(chunk->state == FTL_CHUNK_STATE_CLOSED ||
286 			       chunk->state == FTL_CHUNK_STATE_VACANT);
287 			ppa = chunk->start_ppa;
288 		}
289 
290 		assert(ppa.lbk == 0);
291 		ppa_packed = ftl_ppa_addr_pack(dev, ppa);
292 
293 		ftl_io_inc_req(io);
294 
295 		ftl_trace_submission(dev, io, ppa, 1);
296 		rc = spdk_nvme_ocssd_ns_cmd_vector_reset(dev->ns, ftl_get_write_qpair(dev),
297 				&ppa_packed, 1, NULL, ftl_io_cmpl_cb, io);
298 		if (rc) {
299 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
300 			ftl_io_dec_req(io);
301 			break;
302 		}
303 	}
304 
305 	if (ftl_io_done(io)) {
306 		ftl_io_complete(io);
307 	}
308 
309 	return rc;
310 }
311 
312 static void
313 _ftl_io_erase(void *ctx)
314 {
315 	ftl_io_erase((struct ftl_io *)ctx);
316 }
317 
318 static bool
319 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
320 {
321 	return dev->core_thread.thread == spdk_get_thread();
322 }
323 
324 static bool
325 ftl_check_read_thread(const struct spdk_ftl_dev *dev)
326 {
327 	return dev->read_thread.thread == spdk_get_thread();
328 }
329 
330 int
331 ftl_io_erase(struct ftl_io *io)
332 {
333 	struct spdk_ftl_dev *dev = io->dev;
334 
335 	if (ftl_check_core_thread(dev)) {
336 		return ftl_submit_erase(io);
337 	}
338 
339 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_erase, io);
340 	return 0;
341 }
342 
343 static struct ftl_band *
344 ftl_next_write_band(struct spdk_ftl_dev *dev)
345 {
346 	struct ftl_band *band;
347 
348 	band = LIST_FIRST(&dev->free_bands);
349 	if (!band) {
350 		return NULL;
351 	}
352 	assert(band->state == FTL_BAND_STATE_FREE);
353 
354 	if (ftl_band_erase(band)) {
355 		/* TODO: handle erase failure */
356 		return NULL;
357 	}
358 
359 	return band;
360 }
361 
362 static struct ftl_band *
363 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
364 {
365 	struct ftl_band *band;
366 
367 	if (!dev->next_band) {
368 		band = ftl_next_write_band(dev);
369 	} else {
370 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
371 		band = dev->next_band;
372 		dev->next_band = NULL;
373 	}
374 
375 	return band;
376 }
377 
378 static struct ftl_wptr *
379 ftl_wptr_init(struct ftl_band *band)
380 {
381 	struct spdk_ftl_dev *dev = band->dev;
382 	struct ftl_wptr *wptr;
383 
384 	wptr = calloc(1, sizeof(*wptr));
385 	if (!wptr) {
386 		return NULL;
387 	}
388 
389 	wptr->dev = dev;
390 	wptr->band = band;
391 	wptr->chunk = CIRCLEQ_FIRST(&band->chunks);
392 	wptr->ppa = wptr->chunk->start_ppa;
393 
394 	return wptr;
395 }
396 
397 static int
398 ftl_add_wptr(struct spdk_ftl_dev *dev)
399 {
400 	struct ftl_band *band;
401 	struct ftl_wptr *wptr;
402 
403 	band = ftl_next_wptr_band(dev);
404 	if (!band) {
405 		return -1;
406 	}
407 
408 	wptr = ftl_wptr_init(band);
409 	if (!wptr) {
410 		return -1;
411 	}
412 
413 	if (ftl_band_write_prep(band)) {
414 		ftl_wptr_free(wptr);
415 		return -1;
416 	}
417 
418 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
419 
420 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: band %u\n", band->id);
421 	ftl_trace_write_band(dev, band);
422 	return 0;
423 }
424 
425 static void
426 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
427 {
428 	struct ftl_band *band = wptr->band;
429 	struct spdk_ftl_dev *dev = wptr->dev;
430 	struct spdk_ftl_conf *conf = &dev->conf;
431 	size_t next_thld;
432 
433 	wptr->offset += xfer_size;
434 	next_thld = (ftl_band_num_usable_lbks(band) * conf->band_thld) / 100;
435 
436 	if (ftl_band_full(band, wptr->offset)) {
437 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
438 	}
439 
440 	wptr->ppa = ftl_band_next_xfer_ppa(band, wptr->ppa, xfer_size);
441 	wptr->chunk = ftl_band_next_operational_chunk(band, wptr->chunk);
442 
443 	assert(!ftl_ppa_invalid(wptr->ppa));
444 
445 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: grp:%d, pu:%d chunk:%d, lbk:%u\n",
446 		      wptr->ppa.grp, wptr->ppa.pu, wptr->ppa.chk, wptr->ppa.lbk);
447 
448 	if (wptr->offset >= next_thld && !dev->next_band) {
449 		dev->next_band = ftl_next_write_band(dev);
450 	}
451 }
452 
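/*
 * Returns 1 only when the wptr's band is open and its current chunk is
 * writable; otherwise triggers the required band state transition
 * (opening, closing, skipping bad chunks) and returns 0.
 */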
453 static int
454 ftl_wptr_ready(struct ftl_wptr *wptr)
455 {
456 	struct ftl_band *band = wptr->band;
457 
458 	/* TODO: add handling of empty bands */
459 
460 	if (spdk_unlikely(!ftl_chunk_is_writable(wptr->chunk))) {
461 		/* Erasing band may fail after it was assigned to wptr. */
462 		if (spdk_unlikely(wptr->chunk->state == FTL_CHUNK_STATE_BAD)) {
463 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
464 		}
465 		return 0;
466 	}
467 
468 	/* If we're in the process of writing metadata, wait till it is */
469 	/* completed. */
470 	/* TODO: we should probably change bands once we're writing tail md */
471 	if (ftl_band_state_changing(band)) {
472 		return 0;
473 	}
474 
475 	if (band->state == FTL_BAND_STATE_FULL) {
476 		if (ftl_wptr_close_band(wptr)) {
477 			/* TODO: need recovery here */
478 			assert(false);
479 		}
480 		return 0;
481 	}
482 
483 	if (band->state != FTL_BAND_STATE_OPEN) {
484 		if (ftl_wptr_open_band(wptr)) {
485 			/* TODO: need recovery here */
486 			assert(false);
487 		}
488 		return 0;
489 	}
490 
491 	return 1;
492 }
493 
494 static const struct spdk_ftl_limit *
495 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
496 {
497 	assert(type < SPDK_FTL_LIMIT_MAX);
498 	return &dev->conf.defrag.limits[type];
499 }
500 
501 static bool
502 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
503 {
504 	struct ftl_ppa ppa;
505 
506 	/* If the LBA is invalid don't bother checking the md and l2p */
507 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
508 		return false;
509 	}
510 
511 	ppa = ftl_l2p_get(dev, entry->lba);
512 	if (!(ftl_ppa_cached(ppa) && ppa.offset == entry->pos)) {
513 		return false;
514 	}
515 
516 	return true;
517 }
518 
519 static void
520 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
521 {
522 	pthread_spin_lock(&entry->lock);
523 
524 	if (!ftl_rwb_entry_valid(entry)) {
525 		goto unlock;
526 	}
527 
528 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
529 	/* on-disk PPA and clear the cache status bit. Otherwise, skip the l2p update */
530 	/* and just clear the cache status. */
531 	if (!ftl_cache_lba_valid(dev, entry)) {
532 		goto clear;
533 	}
534 
535 	ftl_l2p_set(dev, entry->lba, entry->ppa);
536 clear:
537 	ftl_rwb_entry_invalidate(entry);
538 unlock:
539 	pthread_spin_unlock(&entry->lock);
540 }
541 
542 static struct ftl_rwb_entry *
543 ftl_acquire_entry(struct spdk_ftl_dev *dev, int flags)
544 {
545 	struct ftl_rwb_entry *entry;
546 
547 	entry = ftl_rwb_acquire(dev->rwb, ftl_rwb_type_from_flags(flags));
548 	if (!entry) {
549 		return NULL;
550 	}
551 
552 	ftl_evict_cache_entry(dev, entry);
553 
554 	entry->flags = flags;
555 	return entry;
556 }
557 
558 static void
559 ftl_rwb_pad(struct spdk_ftl_dev *dev, size_t size)
560 {
561 	struct ftl_rwb_entry *entry;
562 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
563 
564 	for (size_t i = 0; i < size; ++i) {
565 		entry = ftl_acquire_entry(dev, flags);
566 		if (!entry) {
567 			break;
568 		}
569 
570 		entry->lba = FTL_LBA_INVALID;
571 		entry->ppa = ftl_to_ppa(FTL_PPA_INVALID);
572 		memset(entry->data, 0, FTL_BLOCK_SIZE);
573 		ftl_rwb_push(entry);
574 	}
575 }
576 
577 static void
578 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
579 {
580 	while (!LIST_EMPTY(&dev->free_bands)) {
581 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
582 	}
583 
584 	dev->next_band = NULL;
585 }
586 
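/*
 * Called while the device is halting: once less than one xfer_size worth of
 * entries remains acquired in the write buffer, drop the free band list and
 * pad the buffer so the currently written band can be closed.
 */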
587 static void
588 ftl_process_shutdown(struct spdk_ftl_dev *dev)
589 {
590 	size_t size = ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_INTERNAL) +
591 		      ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_USER);
592 
593 	if (size >= dev->xfer_size) {
594 		return;
595 	}
596 
597 	/* If we reach this point, we need to remove the free bands */
598 	/* and pad the current wptr band to the end */
599 	ftl_remove_free_bands(dev);
600 
601 	/* Pad write buffer until band is full */
602 	ftl_rwb_pad(dev, dev->xfer_size - size);
603 }
604 
605 static int
606 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
607 {
608 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
609 	       LIST_EMPTY(&dev->wptr_list);
610 }
611 
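/*
 * Re-evaluate the defrag limits against the current number of free bands and
 * throttle the user portion of the write buffer accordingly.
 */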
612 void
613 ftl_apply_limits(struct spdk_ftl_dev *dev)
614 {
615 	const struct spdk_ftl_limit *limit;
616 	struct ftl_stats *stats = &dev->stats;
617 	size_t rwb_limit[FTL_RWB_TYPE_MAX];
618 	int i;
619 
620 	ftl_rwb_get_limits(dev->rwb, rwb_limit);
621 
622 	/* Clear existing limit */
623 	dev->limit = SPDK_FTL_LIMIT_MAX;
624 
625 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
626 		limit = ftl_get_limit(dev, i);
627 
628 		if (dev->num_free <= limit->thld) {
629 			rwb_limit[FTL_RWB_TYPE_USER] =
630 				(limit->limit * ftl_rwb_entry_cnt(dev->rwb)) / 100;
631 			stats->limits[i]++;
632 			dev->limit = i;
633 			goto apply;
634 		}
635 	}
636 
637 	/* Clear the limits, since we don't need to apply them anymore */
638 	rwb_limit[FTL_RWB_TYPE_USER] = ftl_rwb_entry_cnt(dev->rwb);
639 apply:
640 	ftl_trace_limits(dev, rwb_limit, dev->num_free);
641 	ftl_rwb_set_limits(dev->rwb, rwb_limit);
642 }
643 
644 static int
645 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
646 {
647 	struct ftl_band *band = ftl_band_from_ppa(dev, ppa);
648 	struct ftl_md *md = &band->md;
649 	uint64_t offset;
650 
651 	offset = ftl_band_lbkoff_from_ppa(band, ppa);
652 
653 	/* The bit might already be cleared if two writes are scheduled to the */
654 	/* same LBA at the same time */
655 	if (spdk_bit_array_get(md->vld_map, offset)) {
656 		assert(md->num_vld > 0);
657 		spdk_bit_array_clear(md->vld_map, offset);
658 		md->num_vld--;
659 		return 1;
660 	}
661 
662 	return 0;
663 }
664 
665 int
666 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
667 {
668 	struct ftl_band *band;
669 	int rc;
670 
671 	assert(!ftl_ppa_cached(ppa));
672 	band = ftl_band_from_ppa(dev, ppa);
673 
674 	pthread_spin_lock(&band->md.lock);
675 	rc = ftl_invalidate_addr_unlocked(dev, ppa);
676 	pthread_spin_unlock(&band->md.lock);
677 
678 	return rc;
679 }
680 
681 static int
682 ftl_read_retry(int rc)
683 {
684 	return rc == -EAGAIN;
685 }
686 
687 static int
688 ftl_read_canceled(int rc)
689 {
690 	return rc == 0;
691 }
692 
693 static void
694 ftl_add_to_retry_queue(struct ftl_io *io)
695 {
696 	if (!(io->flags & FTL_IO_RETRY)) {
697 		io->flags |= FTL_IO_RETRY;
698 		TAILQ_INSERT_TAIL(&io->dev->retry_queue, io, retry_entry);
699 	}
700 }
701 
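/*
 * Resolve physical addresses through the next_ppa callback and submit a read
 * for each contiguous range. Blocks served from the write buffer cache are
 * skipped; -ENOMEM from the NVMe layer puts the IO on the retry queue.
 */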
702 static int
703 ftl_submit_read(struct ftl_io *io, ftl_next_ppa_fn next_ppa,
704 		void *ctx)
705 {
706 	struct spdk_ftl_dev *dev = io->dev;
707 	struct ftl_ppa ppa;
708 	int rc = 0, lbk_cnt;
709 
710 	while (io->pos < io->lbk_cnt) {
711 		/* We might hit the cache here; if so, skip the read */
712 		lbk_cnt = rc = next_ppa(io, &ppa, io->pos, ctx);
713 
714 		/* We might need to retry the read from scratch (e.g. */
715 		/* because a write was under way and completed before */
716 		/* we could read it from the rwb) */
717 		if (ftl_read_retry(rc)) {
718 			continue;
719 		}
720 
721 		/* We don't have to schedule the read, as it was read from cache */
722 		if (ftl_read_canceled(rc)) {
723 			ftl_io_update_iovec(io, 1);
724 			continue;
725 		}
726 
727 		assert(lbk_cnt > 0);
728 
729 		ftl_trace_submission(dev, io, ppa, lbk_cnt);
730 		rc = spdk_nvme_ns_cmd_read(dev->ns, ftl_get_read_qpair(dev),
731 					   ftl_io_iovec_addr(io),
732 					   ftl_ppa_addr_pack(io->dev, ppa), lbk_cnt,
733 					   ftl_io_cmpl_cb, io, 0);
734 		if (rc == -ENOMEM) {
735 			ftl_add_to_retry_queue(io);
736 			break;
737 		} else if (rc) {
738 			io->status = rc;
739 			break;
740 		}
741 
742 		ftl_io_update_iovec(io, lbk_cnt);
743 		ftl_io_inc_req(io);
744 	}
745 
746 	/* If we didn't have to read anything from the device, */
747 	/* complete the request right away */
748 	if (ftl_io_done(io)) {
749 		ftl_io_complete(io);
750 	}
751 
752 	return rc;
753 }
754 
755 static int
756 ftl_ppa_cache_read(struct ftl_io *io, uint64_t lba,
757 		   struct ftl_ppa ppa, void *buf)
758 {
759 	struct ftl_rwb *rwb = io->dev->rwb;
760 	struct ftl_rwb_entry *entry;
761 	struct ftl_ppa nppa;
762 	int rc = 0;
763 
764 	entry = ftl_rwb_entry_from_offset(rwb, ppa.offset);
765 	pthread_spin_lock(&entry->lock);
766 
767 	nppa = ftl_l2p_get(io->dev, lba);
768 	if (ppa.ppa != nppa.ppa) {
769 		rc = -1;
770 		goto out;
771 	}
772 
773 	memcpy(buf, entry->data, FTL_BLOCK_SIZE);
774 out:
775 	pthread_spin_unlock(&entry->lock);
776 	return rc;
777 }
778 
779 static int
780 ftl_lba_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa,
781 		      size_t lbk, void *ctx)
782 {
783 	struct spdk_ftl_dev *dev = io->dev;
784 	struct ftl_ppa next_ppa;
785 	size_t i;
786 
787 	*ppa = ftl_l2p_get(dev, io->lba + lbk);
788 
789 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Read ppa:%lx, lba:%lu\n", ppa->ppa, io->lba);
790 
791 	/* If the PPA is invalid, skip it (the buffer should already be zeroed) */
792 	if (ftl_ppa_invalid(*ppa)) {
793 		ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_INVALID);
794 		return 0;
795 	}
796 
797 	if (ftl_ppa_cached(*ppa)) {
798 		if (!ftl_ppa_cache_read(io, io->lba + lbk, *ppa, ftl_io_iovec_addr(io))) {
799 			ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_CACHE);
800 			return 0;
801 		}
802 
803 		/* If the state changed, we have to re-read the l2p */
804 		return -EAGAIN;
805 	}
806 
807 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
808 		next_ppa = ftl_l2p_get(dev, io->lba + lbk + i);
809 
810 		if (ftl_ppa_invalid(next_ppa) || ftl_ppa_cached(next_ppa)) {
811 			break;
812 		}
813 
814 		if (ftl_ppa_addr_pack(dev, *ppa) + i != ftl_ppa_addr_pack(dev, next_ppa)) {
815 			break;
816 		}
817 	}
818 
819 	return i;
820 }
821 
822 static void
823 ftl_complete_flush(struct ftl_flush *flush)
824 {
825 	assert(flush->num_req == 0);
826 	LIST_REMOVE(flush, list_entry);
827 
828 	flush->cb.fn(flush->cb.ctx, 0);
829 
830 	spdk_bit_array_free(&flush->bmap);
831 	free(flush);
832 }
833 
834 static void
835 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_rwb_batch *batch)
836 {
837 	struct ftl_flush *flush, *tflush;
838 	size_t offset;
839 
840 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
841 		offset = ftl_rwb_batch_get_offset(batch);
842 
843 		if (spdk_bit_array_get(flush->bmap, offset)) {
844 			spdk_bit_array_clear(flush->bmap, offset);
845 			if (!(--flush->num_req)) {
846 				ftl_complete_flush(flush);
847 			}
848 		}
849 	}
850 }
851 
852 static void
853 ftl_write_fail(struct ftl_io *io, int status)
854 {
855 	struct ftl_rwb_batch *batch = io->rwb_batch;
856 	struct spdk_ftl_dev *dev = io->dev;
857 	struct ftl_rwb_entry *entry;
858 	struct ftl_band *band;
859 	char buf[128];
860 
861 	entry = ftl_rwb_batch_first_entry(batch);
862 
863 	band = ftl_band_from_ppa(io->dev, entry->ppa);
864 	SPDK_ERRLOG("Write failed @ppa: %s, status: %d\n",
865 		    ftl_ppa2str(entry->ppa, buf, sizeof(buf)), status);
866 
867 	/* Close the band, halt the wptr and defrag */
868 	ftl_halt_writes(dev, band);
869 
870 	ftl_rwb_foreach(entry, batch) {
871 		/* Invalidate meta set by process_writes() */
872 		ftl_invalidate_addr(dev, entry->ppa);
873 	}
874 
875 	/* Reset the batch back to the RWB to resend it later */
876 	ftl_rwb_batch_revert(batch);
877 }
878 
879 static void
880 ftl_write_cb(void *arg, int status)
881 {
882 	struct ftl_io *io = arg;
883 	struct spdk_ftl_dev *dev = io->dev;
884 	struct ftl_rwb_batch *batch = io->rwb_batch;
885 	struct ftl_rwb_entry *entry;
886 
887 	if (status) {
888 		ftl_write_fail(io, status);
889 		return;
890 	}
891 
892 	assert(io->lbk_cnt == dev->xfer_size);
893 	ftl_rwb_foreach(entry, batch) {
894 		if (!(io->flags & FTL_IO_MD) && !(entry->flags & FTL_IO_PAD)) {
895 			/* Verify that the LBA is set for user lbks */
896 			assert(entry->lba != FTL_LBA_INVALID);
897 		}
898 
899 		SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lu, lba:%lu\n",
900 			      entry->ppa.ppa, entry->lba);
901 	}
902 
903 	ftl_process_flush(dev, batch);
904 	ftl_rwb_batch_release(batch);
905 }
906 
907 static void
908 ftl_update_rwb_stats(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry)
909 {
910 	if (!ftl_rwb_entry_internal(entry)) {
911 		dev->stats.write_user++;
912 	}
913 	dev->stats.write_total++;
914 }
915 
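/*
 * Point the L2P entry for entry->lba at the new PPA while handling races
 * with other writes to the same LBA. Weak writes (used for relocated data)
 * only update the L2P if the address they relocated is still valid.
 */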
916 static void
917 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry,
918 	       struct ftl_ppa ppa)
919 {
920 	struct ftl_ppa prev_ppa;
921 	struct ftl_rwb_entry *prev;
922 	struct ftl_band *band;
923 	int valid;
924 
925 	prev_ppa = ftl_l2p_get(dev, entry->lba);
926 	if (ftl_ppa_invalid(prev_ppa)) {
927 		ftl_l2p_set(dev, entry->lba, ppa);
928 		return;
929 	}
930 
931 	/* If the L2P's PPA is different from what we expected, we don't need to */
932 	/* do anything (someone's already overwritten our data). */
933 	if (ftl_rwb_entry_weak(entry) && !ftl_ppa_cmp(prev_ppa, entry->ppa)) {
934 		return;
935 	}
936 
937 	if (ftl_ppa_cached(prev_ppa)) {
938 		assert(!ftl_rwb_entry_weak(entry));
939 		prev = ftl_rwb_entry_from_offset(dev->rwb, prev_ppa.offset);
940 		pthread_spin_lock(&prev->lock);
941 
942 		/* Re-read the L2P under the lock to protect against updates */
943 		/* to this LBA from other threads */
944 		prev_ppa = ftl_l2p_get(dev, entry->lba);
945 
946 		/* If the entry is no longer in cache, another write has been */
947 		/* scheduled in the meantime, so we have to invalidate its LBA */
948 		if (!ftl_ppa_cached(prev_ppa)) {
949 			ftl_invalidate_addr(dev, prev_ppa);
950 		}
951 
952 		/* If previous entry is part of cache, remove and invalidate it */
953 		if (ftl_rwb_entry_valid(prev)) {
954 			ftl_invalidate_addr(dev, prev->ppa);
955 			ftl_rwb_entry_invalidate(prev);
956 		}
957 
958 		ftl_l2p_set(dev, entry->lba, ppa);
959 		pthread_spin_unlock(&prev->lock);
960 		return;
961 	}
962 
963 	/* Lock the band containing the previous PPA. This ensures atomic changes to */
964 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
965 	/* check the validity of weak writes. */
966 	band = ftl_band_from_ppa(dev, prev_ppa);
967 	pthread_spin_lock(&band->md.lock);
968 
969 	valid = ftl_invalidate_addr_unlocked(dev, prev_ppa);
970 
971 	/* If the address has been invalidated already, we don't want to update */
972 	/* the L2P for weak writes, as it means the write is no longer valid. */
973 	if (!ftl_rwb_entry_weak(entry) || valid) {
974 		ftl_l2p_set(dev, entry->lba, ppa);
975 	}
976 
977 	pthread_spin_unlock(&band->md.lock);
978 }
979 
980 static int
981 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
982 {
983 	struct spdk_ftl_dev	*dev = io->dev;
984 	struct iovec		*iov = ftl_io_iovec(io);
985 	int			rc = 0;
986 	size_t			i, lbk_cnt;
987 
988 	for (i = 0; i < io->iov_cnt; ++i) {
989 		lbk_cnt = iov[i].iov_len / PAGE_SIZE;
990 		assert(iov[i].iov_len > 0);
991 		assert(lbk_cnt == dev->xfer_size);
992 
993 		ftl_trace_submission(dev, io, wptr->ppa, iov[i].iov_len / PAGE_SIZE);
994 		rc = spdk_nvme_ns_cmd_write_with_md(dev->ns, ftl_get_write_qpair(dev),
995 						    iov[i].iov_base, ftl_io_get_md(io),
996 						    ftl_ppa_addr_pack(dev, wptr->ppa),
997 						    lbk_cnt, ftl_io_cmpl_cb, io, 0, 0, 0);
998 		if (rc) {
999 			SPDK_ERRLOG("spdk_nvme_ns_cmd_write_with_md failed with status:%d, ppa:%lu\n",
1000 				    rc, wptr->ppa.ppa);
1001 			io->status = -EIO;
1002 			break;
1003 		}
1004 
1005 		ftl_io_update_iovec(io, lbk_cnt);
1006 		ftl_io_inc_req(io);
1007 		ftl_wptr_advance(wptr, lbk_cnt);
1008 	}
1009 
1010 	if (ftl_io_done(io)) {
1011 		ftl_io_complete(io);
1012 	}
1013 
1014 	return rc;
1015 }
1016 
1017 static void
1018 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1019 {
1020 	struct ftl_rwb *rwb = dev->rwb;
1021 	size_t size;
1022 
1023 	size = ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_INTERNAL) +
1024 	       ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_USER);
1025 
1026 	/* There must be something in the RWB; otherwise the flush */
1027 	/* wouldn't be waiting for anything */
1028 	assert(size > 0);
1029 
1030 	/* Only add padding when there are fewer than xfer_size */
1031 	/* entries in the buffer. Otherwise we just have to wait */
1032 	/* for the entries to become ready. */
1033 	if (size < dev->xfer_size) {
1034 		ftl_rwb_pad(dev, dev->xfer_size - (size % dev->xfer_size));
1035 	}
1036 }
1037 
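/*
 * Pop a full batch from the write buffer, bind each entry to a physical
 * address, update the band's LBA map and submit a single xfer_size write.
 * Returns the number of lbks submitted (0 if nothing was written).
 */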
1038 static int
1039 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1040 {
1041 	struct spdk_ftl_dev	*dev = wptr->dev;
1042 	struct ftl_rwb_batch	*batch;
1043 	struct ftl_rwb_entry	*entry;
1044 	struct ftl_io		*io;
1045 	struct ftl_ppa		ppa, prev_ppa;
1046 
1047 	/* Make sure the band is prepared for writing */
1048 	if (!ftl_wptr_ready(wptr)) {
1049 		return 0;
1050 	}
1051 
1052 	if (dev->halt) {
1053 		ftl_process_shutdown(dev);
1054 	}
1055 
1056 	batch = ftl_rwb_pop(dev->rwb);
1057 	if (!batch) {
1058 		/* If there are queued flush requests we need to pad the RWB to */
1059 		/* force out remaining entries */
1060 		if (!LIST_EMPTY(&dev->flush_list)) {
1061 			ftl_flush_pad_batch(dev);
1062 		}
1063 
1064 		return 0;
1065 	}
1066 
1067 	io = ftl_io_rwb_init(dev, wptr->band, batch, ftl_write_cb);
1068 	if (!io) {
1069 		goto error;
1070 	}
1071 
1072 	ppa = wptr->ppa;
1073 	ftl_rwb_foreach(entry, batch) {
1074 		entry->ppa = ppa;
1075 
1076 		if (entry->lba != FTL_LBA_INVALID) {
1077 			pthread_spin_lock(&entry->lock);
1078 			prev_ppa = ftl_l2p_get(dev, entry->lba);
1079 
1080 			/* If the l2p was updated in the meantime, don't update band's metadata */
1081 			if (ftl_ppa_cached(prev_ppa) && prev_ppa.offset == entry->pos) {
1082 				/* Setting entry's cache bit needs to be done after metadata */
1083 				/* within the band is updated to make sure that writes */
1084 				/* invalidating the entry clear the metadata as well */
1085 				ftl_band_set_addr(wptr->band, entry->lba, entry->ppa);
1086 				ftl_rwb_entry_set_valid(entry);
1087 			}
1088 			pthread_spin_unlock(&entry->lock);
1089 		}
1090 
1091 		ftl_trace_rwb_pop(dev, entry);
1092 		ftl_update_rwb_stats(dev, entry);
1093 
1094 		ppa = ftl_band_next_ppa(wptr->band, ppa, 1);
1095 	}
1096 
1097 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lx, %lx\n", wptr->ppa.ppa,
1098 		      ftl_ppa_addr_pack(dev, wptr->ppa));
1099 
1100 	if (ftl_submit_write(wptr, io)) {
1101 		/* TODO: we need some recovery here */
1102 		assert(0 && "Write submit failed");
1103 		if (ftl_io_done(io)) {
1104 			ftl_io_free(io);
1105 		}
1106 	}
1107 
1108 	return dev->xfer_size;
1109 error:
1110 	ftl_rwb_batch_revert(batch);
1111 	return 0;
1112 }
1113 
1114 static int
1115 ftl_process_writes(struct spdk_ftl_dev *dev)
1116 {
1117 	struct ftl_wptr *wptr, *twptr;
1118 	size_t num_active = 0;
1119 	enum ftl_band_state state;
1120 
1121 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1122 		ftl_wptr_process_writes(wptr);
1123 		state = wptr->band->state;
1124 
1125 		if (state != FTL_BAND_STATE_FULL &&
1126 		    state != FTL_BAND_STATE_CLOSING &&
1127 		    state != FTL_BAND_STATE_CLOSED) {
1128 			num_active++;
1129 		}
1130 	}
1131 
1132 	if (num_active < 1) {
1133 		ftl_add_wptr(dev);
1134 	}
1135 
1136 	return 0;
1137 }
1138 
1139 static void
1140 ftl_rwb_entry_fill(struct ftl_rwb_entry *entry, struct ftl_io *io)
1141 {
1142 	struct ftl_band *band;
1143 
1144 	memcpy(entry->data, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1145 
1146 	if (ftl_rwb_entry_weak(entry)) {
1147 		band = ftl_band_from_ppa(io->dev, io->ppa);
1148 		entry->ppa = ftl_band_next_ppa(band, io->ppa, io->pos);
1149 	}
1150 
1151 	entry->trace = io->trace;
1152 
1153 	if (entry->md) {
1154 		memcpy(entry->md, &entry->lba, sizeof(io->lba));
1155 	}
1156 }
1157 
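/*
 * Copy the user data into write buffer entries and point the L2P at the
 * cached addresses. Returns -EAGAIN when no entry can be acquired (e.g. due
 * to the currently applied limits), in which case the caller resubmits the IO.
 */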
1158 static int
1159 ftl_rwb_fill(struct ftl_io *io)
1160 {
1161 	struct spdk_ftl_dev *dev = io->dev;
1162 	struct ftl_rwb_entry *entry;
1163 	struct ftl_ppa ppa = { .cached = 1 };
1164 	int flags = ftl_rwb_flags_from_io(io);
1165 	uint64_t lba;
1166 
1167 	while (io->pos < io->lbk_cnt) {
1168 		lba = ftl_io_current_lba(io);
1169 		if (lba == FTL_LBA_INVALID) {
1170 			ftl_io_update_iovec(io, 1);
1171 			continue;
1172 		}
1173 
1174 		entry = ftl_acquire_entry(dev, flags);
1175 		if (!entry) {
1176 			return -EAGAIN;
1177 		}
1178 
1179 		entry->lba = lba;
1180 		ftl_rwb_entry_fill(entry, io);
1181 
1182 		ppa.offset = entry->pos;
1183 
1184 		ftl_io_update_iovec(io, 1);
1185 		ftl_update_l2p(dev, entry, ppa);
1186 
1187 		/* Needs to be done after the L2P is updated to avoid a race with the */
1188 		/* write completion callback in case it's processed before the */
1189 		/* L2P is set in ftl_update_l2p(). */
1190 		ftl_rwb_push(entry);
1191 		ftl_trace_rwb_fill(dev, io);
1192 	}
1193 
1194 	ftl_io_complete(io);
1195 	return 0;
1196 }
1197 
1198 static bool
1199 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
1200 {
1201 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
1202 
1203 	if (ftl_reloc_is_halted(dev->reloc)) {
1204 		return false;
1205 	}
1206 
1207 	if (dev->df_band) {
1208 		return false;
1209 	}
1210 
1211 	if (dev->num_free <= limit->thld) {
1212 		return true;
1213 	}
1214 
1215 	return false;
1216 }
1217 
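/*
 * Defrag merit of a band: the invalid-to-valid ratio, (invalid / (valid + 1)),
 * scaled by the band's age. Bands holding mostly invalidated data score
 * higher and are preferred for relocation.
 */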
1218 static double
1219 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
1220 {
1221 	size_t usable, valid, invalid;
1222 	double vld_ratio;
1223 
1224 	/* If the band doesn't have any usable lbks it's of no use */
1225 	usable = ftl_band_num_usable_lbks(band);
1226 	if (usable == 0) {
1227 		return 0.0;
1228 	}
1229 
1230 	valid =  threshold_valid ? (usable - *threshold_valid) : band->md.num_vld;
1231 	invalid = usable - valid;
1232 
1233 	/* Add one to avoid division by 0 */
1234 	vld_ratio = (double)invalid / (double)(valid + 1);
1235 	return vld_ratio * ftl_band_age(band);
1236 }
1237 
1238 static bool
1239 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
1240 {
1241 	struct spdk_ftl_conf *conf = &dev->conf;
1242 	size_t thld_vld;
1243 
1244 	/* If we're in dire need of free bands, every band is worth defragging */
1245 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
1246 		return true;
1247 	}
1248 
1249 	thld_vld = (ftl_band_num_usable_lbks(band) * conf->defrag.invalid_thld) / 100;
1250 
1251 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
1252 }
1253 
1254 static struct ftl_band *
1255 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
1256 {
1257 	struct ftl_band *band, *mband = NULL;
1258 	double merit = 0;
1259 
1260 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
1261 		assert(band->state == FTL_BAND_STATE_CLOSED);
1262 		band->merit = ftl_band_calc_merit(band, NULL);
1263 		if (band->merit > merit) {
1264 			merit = band->merit;
1265 			mband = band;
1266 		}
1267 	}
1268 
1269 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
1270 		mband = NULL;
1271 	}
1272 
1273 	return mband;
1274 }
1275 
1276 static void
1277 ftl_process_relocs(struct spdk_ftl_dev *dev)
1278 {
1279 	struct ftl_band *band;
1280 
1281 	if (ftl_dev_needs_defrag(dev)) {
1282 		band = dev->df_band = ftl_select_defrag_band(dev);
1283 
1284 		if (band) {
1285 			ftl_reloc_add(dev->reloc, band, 0, ftl_num_band_lbks(dev), 0);
1286 			ftl_trace_defrag_band(dev, band);
1287 		}
1288 	}
1289 
1290 	ftl_reloc(dev->reloc);
1291 }
1292 
1293 int
1294 ftl_current_limit(const struct spdk_ftl_dev *dev)
1295 {
1296 	return dev->limit;
1297 }
1298 
1299 void
1300 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
1301 {
1302 	attrs->uuid = dev->uuid;
1303 	attrs->lbk_cnt = dev->num_lbas;
1304 	attrs->lbk_size = FTL_BLOCK_SIZE;
1305 	attrs->range = dev->range;
1306 	attrs->cache_bdev_desc = dev->cache_bdev_desc;
1307 }
1308 
1309 static void
1310 _ftl_io_write(void *ctx)
1311 {
1312 	ftl_io_write((struct ftl_io *)ctx);
1313 }
1314 
1315 int
1316 ftl_io_write(struct ftl_io *io)
1317 {
1318 	struct spdk_ftl_dev *dev = io->dev;
1319 
1320 	/* For normal IOs we just need to copy the data onto the rwb */
1321 	if (!(io->flags & FTL_IO_MD)) {
1322 		return ftl_rwb_fill(io);
1323 	}
1324 
1325 	/* Metadata has its own buffer, so it doesn't have to be copied; just */
1326 	/* send it to the core thread and schedule the write immediately */
1327 	if (ftl_check_core_thread(dev)) {
1328 		return ftl_submit_write(ftl_wptr_from_band(io->band), io);
1329 	}
1330 
1331 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
1332 
1333 	return 0;
1334 }
1335 
1336 static int
1337 _spdk_ftl_write(struct ftl_io *io)
1338 {
1339 	int rc;
1340 
1341 	rc = ftl_io_write(io);
1342 	if (rc == -EAGAIN) {
1343 		spdk_thread_send_msg(spdk_io_channel_get_thread(io->ch),
1344 				     _ftl_write, io);
1345 		return 0;
1346 	}
1347 
1348 	if (rc) {
1349 		ftl_io_free(io);
1350 	}
1351 
1352 	return rc;
1353 }
1354 
1355 static void
1356 _ftl_write(void *ctx)
1357 {
1358 	_spdk_ftl_write(ctx);
1359 }
1360 
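/*
 * Example usage (illustrative sketch, not part of this file): writing a
 * single block. Assumes 'dev' has been fully initialized (e.g. via
 * spdk_ftl_dev_init()) and 'ch' is an spdk_io_channel associated with it;
 * buffer allocation and error handling are omitted.
 *
 *	static void write_done(void *ctx, int status)
 *	{
 *		SPDK_NOTICELOG("write completed with status %d\n", status);
 *	}
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = FTL_BLOCK_SIZE };
 *	spdk_ftl_write(dev, ch, lba, 1, &iov, 1, write_done, NULL);
 */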
1361 int
1362 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1363 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1364 {
1365 	struct ftl_io *io;
1366 
1367 	if (iov_cnt == 0 || iov_cnt > FTL_MAX_IOV) {
1368 		return -EINVAL;
1369 	}
1370 
1371 	if (lba_cnt == 0) {
1372 		return -EINVAL;
1373 	}
1374 
1375 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1376 		return -EINVAL;
1377 	}
1378 
1379 	if (!dev->initialized) {
1380 		return -EBUSY;
1381 	}
1382 
1383 	io = ftl_io_alloc(ch);
1384 	if (!io) {
1385 		return -ENOMEM;
1386 	}
1387 
1388 	ftl_io_user_init(dev, io, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
1389 	return _spdk_ftl_write(io);
1390 }
1391 
1392 int
1393 ftl_io_read(struct ftl_io *io)
1394 {
1395 	struct spdk_ftl_dev *dev = io->dev;
1396 	ftl_next_ppa_fn	next_ppa;
1397 
1398 	if (ftl_check_read_thread(dev)) {
1399 		if (ftl_io_mode_ppa(io)) {
1400 			next_ppa = ftl_ppa_read_next_ppa;
1401 		} else {
1402 			next_ppa = ftl_lba_read_next_ppa;
1403 		}
1404 
1405 		return ftl_submit_read(io, next_ppa, NULL);
1406 	}
1407 
1408 	spdk_thread_send_msg(ftl_get_read_thread(dev), _ftl_read, io);
1409 	return 0;
1410 }
1411 
1412 static void
1413 _ftl_read(void *arg)
1414 {
1415 	ftl_io_read((struct ftl_io *)arg);
1416 }
1417 
1418 int
1419 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1420 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1421 {
1422 	struct ftl_io *io;
1423 
1424 	if (iov_cnt == 0 || iov_cnt > FTL_MAX_IOV) {
1425 		return -EINVAL;
1426 	}
1427 
1428 	if (lba_cnt == 0) {
1429 		return -EINVAL;
1430 	}
1431 
1432 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1433 		return -EINVAL;
1434 	}
1435 
1436 	if (!dev->initialized) {
1437 		return -EBUSY;
1438 	}
1439 
1440 	io = ftl_io_alloc(ch);
1441 	if (!io) {
1442 		return -ENOMEM;
1443 	}
1444 
1445 	ftl_io_user_init(dev, io, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
1446 	ftl_io_read(io);
1447 	return 0;
1448 }
1449 
1450 static struct ftl_flush *
1451 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
1452 {
1453 	struct ftl_flush *flush;
1454 	struct ftl_rwb *rwb = dev->rwb;
1455 
1456 	flush = calloc(1, sizeof(*flush));
1457 	if (!flush) {
1458 		return NULL;
1459 	}
1460 
1461 	flush->bmap = spdk_bit_array_create(ftl_rwb_num_batches(rwb));
1462 	if (!flush->bmap) {
1463 		goto error;
1464 	}
1465 
1466 	flush->dev = dev;
1467 	flush->cb.fn = cb_fn;
1468 	flush->cb.ctx = cb_arg;
1469 
1470 	return flush;
1471 error:
1472 	free(flush);
1473 	return NULL;
1474 }
1475 
1476 static void
1477 _ftl_flush(void *ctx)
1478 {
1479 	struct ftl_flush *flush = ctx;
1480 	struct spdk_ftl_dev *dev = flush->dev;
1481 	struct ftl_rwb *rwb = dev->rwb;
1482 	struct ftl_rwb_batch *batch;
1483 
1484 	/* Attach flush object to all non-empty batches */
1485 	ftl_rwb_foreach_batch(batch, rwb) {
1486 		if (!ftl_rwb_batch_empty(batch)) {
1487 			spdk_bit_array_set(flush->bmap, ftl_rwb_batch_get_offset(batch));
1488 			flush->num_req++;
1489 		}
1490 	}
1491 
1492 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
1493 
1494 	/* If the RWB was already empty, the flush can be completed right away */
1495 	if (!flush->num_req) {
1496 		ftl_complete_flush(flush);
1497 	}
1498 }
1499 
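/*
 * Force out all data currently held in the write buffer. cb_fn is invoked
 * once every batch that was non-empty at the time of the call has been
 * written to the media. A minimal sketch of the call (flush_done being the
 * caller's spdk_ftl_fn callback):
 *
 *	spdk_ftl_flush(dev, flush_done, NULL);
 */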
1500 int
1501 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
1502 {
1503 	struct ftl_flush *flush;
1504 
1505 	if (!dev->initialized) {
1506 		return -EBUSY;
1507 	}
1508 
1509 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
1510 	if (!flush) {
1511 		return -ENOMEM;
1512 	}
1513 
1514 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
1515 	return 0;
1516 }
1517 
1518 void
1519 ftl_process_anm_event(struct ftl_anm_event *event)
1520 {
1521 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Unconsumed ANM received for dev: %p...\n", event->dev);
1522 	ftl_anm_event_complete(event);
1523 }
1524 
1525 static void
1526 ftl_process_retry_queue(struct spdk_ftl_dev *dev)
1527 {
1528 	struct ftl_io *io;
1529 	int rc;
1530 
1531 	while (!TAILQ_EMPTY(&dev->retry_queue)) {
1532 		io = TAILQ_FIRST(&dev->retry_queue);
1533 
1534 		/* Retry only if IO is still healthy */
1535 		if (spdk_likely(io->status == 0)) {
1536 			rc = ftl_io_read(io);
1537 			if (rc == -ENOMEM) {
1538 				break;
1539 			}
1540 		}
1541 
1542 		io->flags &= ~FTL_IO_RETRY;
1543 		TAILQ_REMOVE(&dev->retry_queue, io, retry_entry);
1544 
1545 		if (ftl_io_done(io)) {
1546 			ftl_io_complete(io);
1547 		}
1548 	}
1549 }
1550 
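/*
 * Poller running on the read thread: reaps read completions and retries any
 * reads previously queued due to -ENOMEM.
 */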
1551 int
1552 ftl_task_read(void *ctx)
1553 {
1554 	struct ftl_thread *thread = ctx;
1555 	struct spdk_ftl_dev *dev = thread->dev;
1556 	struct spdk_nvme_qpair *qpair = ftl_get_read_qpair(dev);
1557 	size_t num_completed;
1558 
1559 	if (dev->halt) {
1560 		if (ftl_shutdown_complete(dev)) {
1561 			spdk_poller_unregister(&thread->poller);
1562 			return 0;
1563 		}
1564 	}
1565 
1566 	num_completed = spdk_nvme_qpair_process_completions(qpair, 0);
1567 
1568 	if (num_completed && !TAILQ_EMPTY(&dev->retry_queue)) {
1569 		ftl_process_retry_queue(dev);
1570 	}
1571 
1572 	return num_completed;
1573 }
1574 
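/*
 * Main poller running on the core thread: drives writes out of the write
 * buffer, reaps write/erase completions and schedules relocation work.
 */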
1575 int
1576 ftl_task_core(void *ctx)
1577 {
1578 	struct ftl_thread *thread = ctx;
1579 	struct spdk_ftl_dev *dev = thread->dev;
1580 	struct spdk_nvme_qpair *qpair = ftl_get_write_qpair(dev);
1581 
1582 	if (dev->halt) {
1583 		if (ftl_shutdown_complete(dev)) {
1584 			spdk_poller_unregister(&thread->poller);
1585 			return 0;
1586 		}
1587 	}
1588 
1589 	ftl_process_writes(dev);
1590 	spdk_nvme_qpair_process_completions(qpair, 0);
1591 	ftl_process_relocs(dev);
1592 
1593 	return 0;
1594 }
1595 
1596 SPDK_LOG_REGISTER_COMPONENT("ftl_core", SPDK_LOG_FTL_CORE)
1597