1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/io_channel.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk_internal/log.h"
40 #include "spdk/ftl.h"
41 
42 #include "ftl_core.h"
43 #include "ftl_band.h"
44 #include "ftl_io.h"
45 #include "ftl_anm.h"
46 #include "ftl_rwb.h"
47 #include "ftl_debug.h"
48 #include "ftl_reloc.h"
49 
50 /* Max number of iovecs */
51 #define FTL_MAX_IOV 1024
52 
53 struct ftl_wptr {
54 	/* Owner device */
55 	struct spdk_ftl_dev		*dev;
56 
57 	/* Current PPA */
58 	struct ftl_ppa			ppa;
59 
60 	/* Band currently being written to */
61 	struct ftl_band			*band;
62 
63 	/* Current logical block's offset */
64 	uint64_t			offset;
65 
66 	/* Current erase block */
67 	struct ftl_chunk		*chunk;
68 
69 	/* Metadata DMA buffer */
70 	void				*md_buf;
71 
72 	/* List link */
73 	LIST_ENTRY(ftl_wptr)		list_entry;
74 };
75 
76 struct ftl_flush {
77 	/* Owner device */
78 	struct spdk_ftl_dev		*dev;
79 
80 	/* Number of batches to wait for */
81 	size_t				num_req;
82 
83 	/* Callback */
84 	struct ftl_cb			cb;
85 
86 	/* Batch bitmap */
87 	struct spdk_bit_array		*bmap;
88 
89 	/* List link */
90 	LIST_ENTRY(ftl_flush)		list_entry;
91 };
92 
93 typedef int (*ftl_next_ppa_fn)(struct ftl_io *, struct ftl_ppa *, size_t, void *);
94 static void _ftl_read(void *);
95 static void _ftl_write(void *);
96 
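/* Only the flags that matter to the write buffer (internal, weak and padding */
/* writes) are carried over from the I/O to its RWB entries. */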
97 static int
98 ftl_rwb_flags_from_io(const struct ftl_io *io)
99 {
100 	int valid_flags = FTL_IO_INTERNAL | FTL_IO_WEAK | FTL_IO_PAD;
101 	return io->flags & valid_flags;
102 }
103 
104 static int
105 ftl_rwb_entry_weak(const struct ftl_rwb_entry *entry)
106 {
107 	return entry->flags & FTL_IO_WEAK;
108 }
109 
110 static void
111 ftl_wptr_free(struct ftl_wptr *wptr)
112 {
113 	if (!wptr) {
114 		return;
115 	}
116 
117 	spdk_dma_free(wptr->md_buf);
118 	free(wptr);
119 }
120 
121 static void
122 ftl_remove_wptr(struct ftl_wptr *wptr)
123 {
124 	LIST_REMOVE(wptr, list_entry);
125 	ftl_wptr_free(wptr);
126 }
127 
128 static void
129 ftl_io_cmpl_cb(void *arg, const struct spdk_nvme_cpl *status)
130 {
131 	struct ftl_io *io = arg;
132 
133 	if (spdk_nvme_cpl_is_error(status)) {
134 		ftl_io_process_error(io, status);
135 	}
136 
137 	ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_DISK);
138 
139 	ftl_io_dec_req(io);
140 
141 	if (ftl_io_done(io)) {
142 		ftl_io_complete(io);
143 	}
144 }
145 
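/* Halt writes to a band after a write failure: mark the band as failed and */
/* drop its write pointer so no further data is submitted to it. Bands already */
/* marked high_prio were handled by an earlier failure. */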
146 static void
147 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
148 {
149 	struct ftl_wptr *wptr = NULL;
150 
151 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
152 		if (wptr->band == band) {
153 			break;
154 		}
155 	}
156 
157 	/* If the band already has the high_prio flag set, other writes must */
158 	/* have failed earlier, so it's already taken care of. */
159 	if (band->high_prio) {
160 		assert(wptr == NULL);
161 		return;
162 	}
163 
164 	ftl_band_write_failed(band);
165 	ftl_remove_wptr(wptr);
166 }
167 
168 static struct ftl_wptr *
169 ftl_wptr_from_band(struct ftl_band *band)
170 {
171 	struct spdk_ftl_dev *dev = band->dev;
172 	struct ftl_wptr *wptr = NULL;
173 
174 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
175 		if (wptr->band == band) {
176 			return wptr;
177 		}
178 	}
179 
180 	return NULL;
181 }
182 
183 static void
184 ftl_md_write_fail(struct ftl_io *io, int status)
185 {
186 	struct ftl_band *band = io->band;
187 	struct ftl_wptr *wptr;
188 	char buf[128];
189 
190 	wptr = ftl_wptr_from_band(band);
191 
192 	SPDK_ERRLOG("Metadata write failed @ppa: %s, status: %d\n",
193 		    ftl_ppa2str(wptr->ppa, buf, sizeof(buf)), status);
194 
195 	ftl_halt_writes(io->dev, band);
196 }
197 
198 static void
199 ftl_md_write_cb(void *arg, int status)
200 {
201 	struct ftl_io *io = arg;
202 	struct ftl_wptr *wptr;
203 
204 	wptr = ftl_wptr_from_band(io->band);
205 
206 	if (status) {
207 		ftl_md_write_fail(io, status);
208 		return;
209 	}
210 
211 	ftl_band_set_next_state(io->band);
212 	if (io->band->state == FTL_BAND_STATE_CLOSED) {
213 		ftl_remove_wptr(wptr);
214 	}
215 }
216 
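/* PPA-mode read helper: compute the next physical address and the number of */
/* blocks that can be read in one command. Metadata reads are capped so they */
/* never cross an xfer_size boundary, mirroring the way they were written. */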
217 static int
218 ftl_ppa_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa,
219 		      size_t lbk, void *ctx)
220 {
221 	struct spdk_ftl_dev *dev = io->dev;
222 	size_t lbk_cnt, max_lbks;
223 
224 	assert(ftl_io_mode_ppa(io));
225 	assert(io->iov_pos < io->iov_cnt);
226 
227 	if (lbk == 0) {
228 		*ppa = io->ppa;
229 	} else {
230 		*ppa = ftl_band_next_xfer_ppa(io->band, io->ppa, lbk);
231 	}
232 
233 	assert(!ftl_ppa_invalid(*ppa));
234 
235 	/* Metadata has to be read in the way it's written (jumping across */
236 	/* the chunks in xfer_size increments) */
237 	if (io->flags & FTL_IO_MD) {
238 		max_lbks = dev->xfer_size - (ppa->lbk % dev->xfer_size);
239 		lbk_cnt = spdk_min(ftl_io_iovec_len_left(io), max_lbks);
240 		assert(ppa->lbk / dev->xfer_size == (ppa->lbk + lbk_cnt - 1) / dev->xfer_size);
241 	} else {
242 		lbk_cnt = ftl_io_iovec_len_left(io);
243 	}
244 
245 	return lbk_cnt;
246 }
247 
248 static int
249 ftl_wptr_close_band(struct ftl_wptr *wptr)
250 {
251 	struct ftl_band *band = wptr->band;
252 
253 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
254 	band->tail_md_ppa = wptr->ppa;
255 
256 	return ftl_band_write_tail_md(band, wptr->md_buf, ftl_md_write_cb);
257 }
258 
259 static int
260 ftl_wptr_open_band(struct ftl_wptr *wptr)
261 {
262 	struct ftl_band *band = wptr->band;
263 
264 	assert(ftl_band_chunk_is_first(band, wptr->chunk));
265 	assert(band->md.num_vld == 0);
266 
267 	ftl_band_clear_md(band);
268 
269 	assert(band->state == FTL_BAND_STATE_PREP);
270 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
271 
272 	return ftl_band_write_head_md(band, wptr->md_buf, ftl_md_write_cb);
273 }
274 
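/* Submit an erase I/O as a series of vector resets, one per chunk, walking */
/* the band's chunks starting from io->ppa. */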
275 static int
276 ftl_submit_erase(struct ftl_io *io)
277 {
278 	struct spdk_ftl_dev *dev = io->dev;
279 	struct ftl_band *band = io->band;
280 	struct ftl_ppa ppa = io->ppa;
281 	struct ftl_chunk *chunk;
282 	uint64_t ppa_packed;
283 	int rc = 0;
284 	size_t i;
285 
286 	for (i = 0; i < io->lbk_cnt; ++i) {
287 		if (i != 0) {
288 			chunk = ftl_band_next_chunk(band, ftl_band_chunk_from_ppa(band, ppa));
289 			assert(chunk->state == FTL_CHUNK_STATE_CLOSED ||
290 			       chunk->state == FTL_CHUNK_STATE_VACANT);
291 			ppa = chunk->start_ppa;
292 		}
293 
294 		assert(ppa.lbk == 0);
295 		ppa_packed = ftl_ppa_addr_pack(dev, ppa);
296 
297 		ftl_io_inc_req(io);
298 
299 		ftl_trace_submission(dev, io, ppa, 1);
300 		rc = spdk_nvme_ocssd_ns_cmd_vector_reset(dev->ns, ftl_get_write_qpair(dev),
301 				&ppa_packed, 1, NULL, ftl_io_cmpl_cb, io);
302 		if (rc) {
303 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
304 			ftl_io_dec_req(io);
305 			break;
306 		}
307 	}
308 
309 	if (ftl_io_done(io)) {
310 		ftl_io_complete(io);
311 	}
312 
313 	return rc;
314 }
315 
316 static void
317 _ftl_io_erase(void *ctx)
318 {
319 	ftl_io_erase((struct ftl_io *)ctx);
320 }
321 
322 static bool
323 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
324 {
325 	return dev->core_thread.thread == spdk_get_thread();
326 }
327 
328 static bool
329 ftl_check_read_thread(const struct spdk_ftl_dev *dev)
330 {
331 	return dev->read_thread.thread == spdk_get_thread();
332 }
333 
334 int
335 ftl_io_erase(struct ftl_io *io)
336 {
337 	struct spdk_ftl_dev *dev = io->dev;
338 
339 	if (ftl_check_core_thread(dev)) {
340 		return ftl_submit_erase(io);
341 	}
342 
343 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_erase, io);
344 	return 0;
345 }
346 
347 static struct ftl_band *
348 ftl_next_write_band(struct spdk_ftl_dev *dev)
349 {
350 	struct ftl_band *band;
351 
352 	band = LIST_FIRST(&dev->free_bands);
353 	if (!band) {
354 		return NULL;
355 	}
356 	assert(band->state == FTL_BAND_STATE_FREE);
357 
358 	if (ftl_band_erase(band)) {
359 		/* TODO: handle erase failure */
360 		return NULL;
361 	}
362 
363 	return band;
364 }
365 
366 static struct ftl_band *
367 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
368 {
369 	struct ftl_band *band;
370 
371 	if (!dev->next_band) {
372 		band = ftl_next_write_band(dev);
373 	} else {
374 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
375 		band = dev->next_band;
376 		dev->next_band = NULL;
377 	}
378 
379 	return band;
380 }
381 
382 static struct ftl_wptr *
383 ftl_wptr_init(struct ftl_band *band)
384 {
385 	struct spdk_ftl_dev *dev = band->dev;
386 	struct ftl_wptr *wptr;
387 
388 	wptr = calloc(1, sizeof(*wptr));
389 	if (!wptr) {
390 		return NULL;
391 	}
392 
393 	wptr->md_buf = spdk_dma_zmalloc(ftl_tail_md_num_lbks(dev) * FTL_BLOCK_SIZE,
394 					FTL_BLOCK_SIZE, NULL);
395 	if (!wptr->md_buf) {
396 		ftl_wptr_free(wptr);
397 		return NULL;
398 	}
399 
400 	wptr->dev = dev;
401 	wptr->band = band;
402 	wptr->chunk = CIRCLEQ_FIRST(&band->chunks);
403 	wptr->ppa = wptr->chunk->start_ppa;
404 
405 	return wptr;
406 }
407 
408 static int
409 ftl_add_wptr(struct spdk_ftl_dev *dev)
410 {
411 	struct ftl_band *band;
412 	struct ftl_wptr *wptr;
413 
414 	band = ftl_next_wptr_band(dev);
415 	if (!band) {
416 		return -1;
417 	}
418 
419 	wptr = ftl_wptr_init(band);
420 	if (!wptr) {
421 		return -1;
422 	}
423 
424 	if (ftl_band_write_prep(band)) {
425 		ftl_wptr_free(wptr);
426 		return -1;
427 	}
428 
429 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
430 
431 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: band %u\n", band->id);
432 	ftl_trace_write_band(dev, band);
433 	return 0;
434 }
435 
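/* Advance the write pointer by xfer_size blocks: mark the band full once its */
/* end is reached and, past the configured band threshold, prepare the next */
/* band so it is ready when this one fills up. */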
436 static void
437 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
438 {
439 	struct ftl_band *band = wptr->band;
440 	struct spdk_ftl_dev *dev = wptr->dev;
441 	struct spdk_ftl_conf *conf = &dev->conf;
442 	size_t next_thld;
443 
444 	wptr->offset += xfer_size;
445 	next_thld = (ftl_band_num_usable_lbks(band) * conf->band_thld) / 100;
446 
447 	if (ftl_band_full(band, wptr->offset)) {
448 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
449 	}
450 
451 	wptr->ppa = ftl_band_next_xfer_ppa(band, wptr->ppa, xfer_size);
452 	wptr->chunk = ftl_band_next_operational_chunk(band, wptr->chunk);
453 
454 	assert(!ftl_ppa_invalid(wptr->ppa));
455 
456 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: grp:%d, pu:%d chunk:%d, lbk:%u\n",
457 		      wptr->ppa.grp, wptr->ppa.pu, wptr->ppa.chk, wptr->ppa.lbk);
458 
459 	if (wptr->offset >= next_thld && !dev->next_band) {
460 		dev->next_band = ftl_next_write_band(dev);
461 	}
462 }
463 
464 static int
465 ftl_wptr_ready(struct ftl_wptr *wptr)
466 {
467 	struct ftl_band *band = wptr->band;
468 
469 	/* TODO: add handling of empty bands */
470 
471 	if (spdk_unlikely(!ftl_chunk_is_writable(wptr->chunk))) {
472 		/* Erasing the band may fail after it was assigned to the wptr. */
473 		if (spdk_unlikely(wptr->chunk->state == FTL_CHUNK_STATE_BAD)) {
474 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
475 		}
476 		return 0;
477 	}
478 
479 	/* If we're in the process of writing metadata, wait till it is */
480 	/* completed. */
481 	/* TODO: we should probably change bands once we're writing tail md */
482 	if (ftl_band_state_changing(band)) {
483 		return 0;
484 	}
485 
486 	if (band->state == FTL_BAND_STATE_FULL) {
487 		if (ftl_wptr_close_band(wptr)) {
488 			/* TODO: need recovery here */
489 			assert(false);
490 		}
491 		return 0;
492 	}
493 
494 	if (band->state != FTL_BAND_STATE_OPEN) {
495 		if (ftl_wptr_open_band(wptr)) {
496 			/* TODO: need recovery here */
497 			assert(false);
498 		}
499 		return 0;
500 	}
501 
502 	return 1;
503 }
504 
505 static const struct spdk_ftl_limit *
506 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
507 {
508 	assert(type < SPDK_FTL_LIMIT_MAX);
509 	return &dev->conf.defrag.limits[type];
510 }
511 
512 static bool
513 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
514 {
515 	struct ftl_ppa ppa;
516 
517 	/* If the LBA is invalid don't bother checking the md and l2p */
518 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
519 		return false;
520 	}
521 
522 	ppa = ftl_l2p_get(dev, entry->lba);
523 	if (!(ftl_ppa_cached(ppa) && ppa.offset == entry->pos)) {
524 		return false;
525 	}
526 
527 	return true;
528 }
529 
530 static void
531 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
532 {
533 	pthread_spin_lock(&entry->lock);
534 
535 	if (!ftl_rwb_entry_valid(entry)) {
536 		goto unlock;
537 	}
538 
539 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
540 	/* on-disk PPA and clear the cache status bit. Otherwise, skip the l2p update */
541 	/* and just clear the cache status. */
542 	if (!ftl_cache_lba_valid(dev, entry)) {
543 		goto clear;
544 	}
545 
546 	ftl_l2p_set(dev, entry->lba, entry->ppa);
547 clear:
548 	ftl_rwb_entry_invalidate(entry);
549 unlock:
550 	pthread_spin_unlock(&entry->lock);
551 }
552 
553 static struct ftl_rwb_entry *
554 ftl_acquire_entry(struct spdk_ftl_dev *dev, int flags)
555 {
556 	struct ftl_rwb_entry *entry;
557 
558 	entry = ftl_rwb_acquire(dev->rwb, ftl_rwb_type_from_flags(flags));
559 	if (!entry) {
560 		return NULL;
561 	}
562 
563 	ftl_evict_cache_entry(dev, entry);
564 
565 	entry->flags = flags;
566 	return entry;
567 }
568 
569 static void
570 ftl_rwb_pad(struct spdk_ftl_dev *dev, size_t size)
571 {
572 	struct ftl_rwb_entry *entry;
573 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
574 
575 	for (size_t i = 0; i < size; ++i) {
576 		entry = ftl_acquire_entry(dev, flags);
577 		if (!entry) {
578 			break;
579 		}
580 
581 		entry->lba = FTL_LBA_INVALID;
582 		entry->ppa = ftl_to_ppa(FTL_PPA_INVALID);
583 		memset(entry->data, 0, FTL_BLOCK_SIZE);
584 		ftl_rwb_push(entry);
585 	}
586 }
587 
588 static void
589 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
590 {
591 	while (!LIST_EMPTY(&dev->free_bands)) {
592 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
593 	}
594 
595 	dev->next_band = NULL;
596 }
597 
598 static void
599 ftl_process_shutdown(struct spdk_ftl_dev *dev)
600 {
601 	size_t size = ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_INTERNAL) +
602 		      ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_USER);
603 
604 	if (size >= dev->xfer_size) {
605 		return;
606 	}
607 
608 	/* If we reach this point, we need to remove the free bands */
609 	/* and pad the current wptr band to the end */
610 	ftl_remove_free_bands(dev);
611 
612 	/* Pad write buffer until band is full */
613 	ftl_rwb_pad(dev, dev->xfer_size - size);
614 }
615 
616 static int
617 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
618 {
619 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
620 	       LIST_EMPTY(&dev->wptr_list);
621 }
622 
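/* Select the most severe defrag limit whose free-band threshold has been */
/* reached and throttle user writes in the RWB accordingly. If no limit */
/* applies, user entries may occupy the entire buffer. */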
623 void
624 ftl_apply_limits(struct spdk_ftl_dev *dev)
625 {
626 	const struct spdk_ftl_limit *limit;
627 	struct ftl_stats *stats = &dev->stats;
628 	size_t rwb_limit[FTL_RWB_TYPE_MAX];
629 	int i;
630 
631 	ftl_rwb_get_limits(dev->rwb, rwb_limit);
632 
633 	/* Clear existing limit */
634 	dev->limit = SPDK_FTL_LIMIT_MAX;
635 
636 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
637 		limit = ftl_get_limit(dev, i);
638 
639 		if (dev->num_free <= limit->thld) {
640 			rwb_limit[FTL_RWB_TYPE_USER] =
641 				(limit->limit * ftl_rwb_entry_cnt(dev->rwb)) / 100;
642 			stats->limits[i]++;
643 			dev->limit = i;
644 			goto apply;
645 		}
646 	}
647 
648 	/* Clear the limits, since we don't need to apply them anymore */
649 	rwb_limit[FTL_RWB_TYPE_USER] = ftl_rwb_entry_cnt(dev->rwb);
650 apply:
651 	ftl_trace_limits(dev, rwb_limit, dev->num_free);
652 	ftl_rwb_set_limits(dev->rwb, rwb_limit);
653 }
654 
655 static int
656 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
657 {
658 	struct ftl_band *band = ftl_band_from_ppa(dev, ppa);
659 	struct ftl_md *md = &band->md;
660 	uint64_t offset;
661 
662 	offset = ftl_band_lbkoff_from_ppa(band, ppa);
663 
664 	/* The bit might be already cleared if two writes are scheduled to the */
665 	/* same LBA at the same time */
666 	if (spdk_bit_array_get(md->vld_map, offset)) {
667 		assert(md->num_vld > 0);
668 		spdk_bit_array_clear(md->vld_map, offset);
669 		md->num_vld--;
670 		return 1;
671 	}
672 
673 	return 0;
674 }
675 
676 int
677 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
678 {
679 	struct ftl_band *band;
680 	int rc;
681 
682 	assert(!ftl_ppa_cached(ppa));
683 	band = ftl_band_from_ppa(dev, ppa);
684 
685 	pthread_spin_lock(&band->md.lock);
686 	rc = ftl_invalidate_addr_unlocked(dev, ppa);
687 	pthread_spin_unlock(&band->md.lock);
688 
689 	return rc;
690 }
691 
692 static int
693 ftl_read_retry(int rc)
694 {
695 	return rc == -EAGAIN;
696 }
697 
698 static int
699 ftl_read_canceled(int rc)
700 {
701 	return rc == 0;
702 }
703 
704 static void
705 ftl_add_to_retry_queue(struct ftl_io *io)
706 {
707 	if (!(io->flags & FTL_IO_RETRY)) {
708 		io->flags |= FTL_IO_RETRY;
709 		TAILQ_INSERT_TAIL(&io->dev->retry_queue, io, retry_entry);
710 	}
711 }
712 
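/* Generic read loop. The next_ppa callback translates the current position */
/* into a physical address and block count: a positive return is submitted to */
/* the device, 0 means the data was served from the cache and -EAGAIN means */
/* the lookup has to be retried. */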
713 static int
714 ftl_submit_read(struct ftl_io *io, ftl_next_ppa_fn next_ppa,
715 		void *ctx)
716 {
717 	struct spdk_ftl_dev *dev = io->dev;
718 	struct ftl_ppa ppa;
719 	int rc = 0, lbk_cnt;
720 
721 	while (io->pos < io->lbk_cnt) {
722 		/* We might hit the cache here, if so, skip the read */
723 		lbk_cnt = rc = next_ppa(io, &ppa, io->pos, ctx);
724 
725 		/* We might need to retry the read from scratch (e.g. because */
726 		/* a write was under way and completed before we could read */
727 		/* it from the rwb) */
728 		if (ftl_read_retry(rc)) {
729 			continue;
730 		}
731 
732 		/* We don't have to schedule the read, as it was read from cache */
733 		if (ftl_read_canceled(rc)) {
734 			ftl_io_update_iovec(io, 1);
735 			continue;
736 		}
737 
738 		assert(lbk_cnt > 0);
739 
740 		ftl_trace_submission(dev, io, ppa, lbk_cnt);
741 		rc = spdk_nvme_ns_cmd_read(dev->ns, ftl_get_read_qpair(dev),
742 					   ftl_io_iovec_addr(io),
743 					   ftl_ppa_addr_pack(io->dev, ppa), lbk_cnt,
744 					   ftl_io_cmpl_cb, io, 0);
745 		if (rc == -ENOMEM) {
746 			ftl_add_to_retry_queue(io);
747 			break;
748 		} else if (rc) {
749 			io->status = rc;
750 			break;
751 		}
752 
753 		ftl_io_update_iovec(io, lbk_cnt);
754 		ftl_io_inc_req(io);
755 	}
756 
757 	/* If we didn't have to read anything from the device, */
758 	/* complete the request right away */
759 	if (ftl_io_done(io)) {
760 		ftl_io_complete(io);
761 	}
762 
763 	return rc;
764 }
765 
766 static int
767 ftl_ppa_cache_read(struct ftl_io *io, uint64_t lba,
768 		   struct ftl_ppa ppa, void *buf)
769 {
770 	struct ftl_rwb *rwb = io->dev->rwb;
771 	struct ftl_rwb_entry *entry;
772 	struct ftl_ppa nppa;
773 	int rc = 0;
774 
775 	entry = ftl_rwb_entry_from_offset(rwb, ppa.offset);
776 	pthread_spin_lock(&entry->lock);
777 
778 	nppa = ftl_l2p_get(io->dev, lba);
779 	if (ppa.ppa != nppa.ppa) {
780 		rc = -1;
781 		goto out;
782 	}
783 
784 	memcpy(buf, entry->data, FTL_BLOCK_SIZE);
785 out:
786 	pthread_spin_unlock(&entry->lock);
787 	return rc;
788 }
789 
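/* LBA-mode read helper: resolve the PPA through the L2P and extend the */
/* request over physically contiguous blocks so they can be read with a */
/* single command. */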
790 static int
791 ftl_lba_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa,
792 		      size_t lbk, void *ctx)
793 {
794 	struct spdk_ftl_dev *dev = io->dev;
795 	struct ftl_ppa next_ppa;
796 	size_t i;
797 
798 	*ppa = ftl_l2p_get(dev, io->lba + lbk);
799 
800 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Read ppa:%lx, lba:%lu\n", ppa->ppa, io->lba);
801 
802 	/* If the PPA is invalid, skip it (the buffer should already be zeroed) */
803 	if (ftl_ppa_invalid(*ppa)) {
804 		ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_INVALID);
805 		return 0;
806 	}
807 
808 	if (ftl_ppa_cached(*ppa)) {
809 		if (!ftl_ppa_cache_read(io, io->lba + lbk, *ppa, ftl_io_iovec_addr(io))) {
810 			ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_CACHE);
811 			return 0;
812 		}
813 
814 		/* If the state changed, we have to re-read the l2p */
815 		return -EAGAIN;
816 	}
817 
818 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
819 		next_ppa = ftl_l2p_get(dev, io->lba + lbk + i);
820 
821 		if (ftl_ppa_invalid(next_ppa) || ftl_ppa_cached(next_ppa)) {
822 			break;
823 		}
824 
825 		if (ftl_ppa_addr_pack(dev, *ppa) + i != ftl_ppa_addr_pack(dev, next_ppa)) {
826 			break;
827 		}
828 	}
829 
830 	return i;
831 }
832 
833 static void
834 ftl_complete_flush(struct ftl_flush *flush)
835 {
836 	assert(flush->num_req == 0);
837 	LIST_REMOVE(flush, list_entry);
838 
839 	flush->cb.fn(flush->cb.ctx, 0);
840 
841 	spdk_bit_array_free(&flush->bmap);
842 	free(flush);
843 }
844 
845 static void
846 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_rwb_batch *batch)
847 {
848 	struct ftl_flush *flush, *tflush;
849 	size_t offset;
850 
851 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
852 		offset = ftl_rwb_batch_get_offset(batch);
853 
854 		if (spdk_bit_array_get(flush->bmap, offset)) {
855 			spdk_bit_array_clear(flush->bmap, offset);
856 			if (!(--flush->num_req)) {
857 				ftl_complete_flush(flush);
858 			}
859 		}
860 	}
861 }
862 
863 static void
864 ftl_write_fail(struct ftl_io *io, int status)
865 {
866 	struct ftl_rwb_batch *batch = io->rwb_batch;
867 	struct spdk_ftl_dev *dev = io->dev;
868 	struct ftl_rwb_entry *entry;
869 	struct ftl_band *band;
870 	char buf[128];
871 
872 	entry = ftl_rwb_batch_first_entry(batch);
873 
874 	band = ftl_band_from_ppa(io->dev, entry->ppa);
875 	SPDK_ERRLOG("Write failed @ppa: %s, status: %d\n",
876 		    ftl_ppa2str(entry->ppa, buf, sizeof(buf)), status);
877 
878 	/* Close the band, halt the wptr and defrag */
879 	ftl_halt_writes(dev, band);
880 
881 	ftl_rwb_foreach(entry, batch) {
882 		/* Invalidate meta set by process_writes() */
883 		ftl_invalidate_addr(dev, entry->ppa);
884 	}
885 
886 	/* Reset the batch back to the RWB so it can be resent later */
887 	ftl_rwb_batch_revert(batch);
888 }
889 
890 static void
891 ftl_write_cb(void *arg, int status)
892 {
893 	struct ftl_io *io = arg;
894 	struct spdk_ftl_dev *dev = io->dev;
895 	struct ftl_rwb_batch *batch = io->rwb_batch;
896 	struct ftl_rwb_entry *entry;
897 
898 	if (status) {
899 		ftl_write_fail(io, status);
900 		return;
901 	}
902 
903 	assert(io->lbk_cnt == dev->xfer_size);
904 	ftl_rwb_foreach(entry, batch) {
905 		if (!(io->flags & FTL_IO_MD) && !(entry->flags & FTL_IO_PAD)) {
906 			/* Verify that the LBA is set for user lbks */
907 			assert(entry->lba != FTL_LBA_INVALID);
908 		}
909 
910 		SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lu, lba:%lu\n",
911 			      entry->ppa.ppa, entry->lba);
912 	}
913 
914 	ftl_process_flush(dev, batch);
915 	ftl_rwb_batch_release(batch);
916 }
917 
918 static void
919 ftl_update_rwb_stats(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry)
920 {
921 	if (!ftl_rwb_entry_internal(entry)) {
922 		dev->stats.write_user++;
923 	}
924 	dev->stats.write_total++;
925 }
926 
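/* Update the L2P with the entry's new PPA. Three cases are handled for the */
/* previous mapping: no valid mapping, a mapping still pointing into the */
/* write buffer, and a mapping pointing at blocks on the media. */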
927 static void
928 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry,
929 	       struct ftl_ppa ppa)
930 {
931 	struct ftl_ppa prev_ppa;
932 	struct ftl_rwb_entry *prev;
933 	struct ftl_band *band;
934 	int valid;
935 
936 	prev_ppa = ftl_l2p_get(dev, entry->lba);
937 	if (ftl_ppa_invalid(prev_ppa)) {
938 		ftl_l2p_set(dev, entry->lba, ppa);
939 		return;
940 	}
941 
942 	/* If the L2P's PPA is different than what we expected we don't need to */
943 	/* do anything (someone's already overwritten our data). */
944 	if (ftl_rwb_entry_weak(entry) && !ftl_ppa_cmp(prev_ppa, entry->ppa)) {
945 		return;
946 	}
947 
948 	if (ftl_ppa_cached(prev_ppa)) {
949 		assert(!ftl_rwb_entry_weak(entry));
950 		prev = ftl_rwb_entry_from_offset(dev->rwb, prev_ppa.offset);
951 		pthread_spin_lock(&prev->lock);
952 
953 		/* Re-read the L2P under the lock to protect against updates */
954 		/* to this LBA from other threads */
955 		prev_ppa = ftl_l2p_get(dev, entry->lba);
956 
957 		/* If the entry is no longer in cache, another write has been */
958 		/* scheduled in the meantime, so we have to invalidate its LBA */
959 		if (!ftl_ppa_cached(prev_ppa)) {
960 			ftl_invalidate_addr(dev, prev_ppa);
961 		}
962 
963 		/* If previous entry is part of cache, remove and invalidate it */
964 		if (ftl_rwb_entry_valid(prev)) {
965 			ftl_invalidate_addr(dev, prev->ppa);
966 			ftl_rwb_entry_invalidate(prev);
967 		}
968 
969 		ftl_l2p_set(dev, entry->lba, ppa);
970 		pthread_spin_unlock(&prev->lock);
971 		return;
972 	}
973 
974 	/* Lock the band containing the previous PPA. This ensures atomic updates */
975 	/* to both the L2P and the metadata. The valid bits in the metadata are */
976 	/* used to check the validity of weak writes. */
977 	band = ftl_band_from_ppa(dev, prev_ppa);
978 	pthread_spin_lock(&band->md.lock);
979 
980 	valid = ftl_invalidate_addr_unlocked(dev, prev_ppa);
981 
982 	/* If the address has been invalidated already, we don't want to update */
983 	/* the L2P for weak writes, as it means the write is no longer valid. */
984 	if (!ftl_rwb_entry_weak(entry) || valid) {
985 		ftl_l2p_set(dev, entry->lba, ppa);
986 	}
987 
988 	pthread_spin_unlock(&band->md.lock);
989 }
990 
991 static int
992 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
993 {
994 	struct spdk_ftl_dev	*dev = io->dev;
995 	struct iovec		*iov = ftl_io_iovec(io);
996 	int			rc = 0;
997 	size_t			i, lbk_cnt;
998 
999 	for (i = 0; i < io->iov_cnt; ++i) {
1000 		lbk_cnt = iov[i].iov_len / PAGE_SIZE;
1001 		assert(iov[i].iov_len > 0);
1002 		assert(lbk_cnt == dev->xfer_size);
1003 
1004 		ftl_trace_submission(dev, io, wptr->ppa, iov[i].iov_len / PAGE_SIZE);
1005 		rc = spdk_nvme_ns_cmd_write_with_md(dev->ns, ftl_get_write_qpair(dev),
1006 						    iov[i].iov_base, ftl_io_get_md(io),
1007 						    ftl_ppa_addr_pack(dev, wptr->ppa),
1008 						    lbk_cnt, ftl_io_cmpl_cb, io, 0, 0, 0);
1009 		if (rc) {
1010 			SPDK_ERRLOG("spdk_nvme_ns_cmd_write_with_md failed with status:%d, ppa:%lu\n",
1011 				    rc, wptr->ppa.ppa);
1012 			io->status = -EIO;
1013 			break;
1014 		}
1015 
1016 		ftl_io_update_iovec(io, lbk_cnt);
1017 		ftl_io_inc_req(io);
1018 		ftl_wptr_advance(wptr, lbk_cnt);
1019 	}
1020 
1021 	if (ftl_io_done(io)) {
1022 		ftl_io_complete(io);
1023 	}
1024 
1025 	return rc;
1026 }
1027 
1028 static void
1029 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1030 {
1031 	struct ftl_rwb *rwb = dev->rwb;
1032 	size_t size;
1033 
1034 	size = ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_INTERNAL) +
1035 	       ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_USER);
1036 
1037 	/* There must be something in the RWB, otherwise the flush */
1038 	/* wouldn't be waiting for anything */
1039 	assert(size > 0);
1040 
1041 	/* Only add padding when there are fewer than xfer_size */
1042 	/* entries in the buffer. Otherwise we just have to wait */
1043 	/* for the entries to become ready. */
1044 	if (size < dev->xfer_size) {
1045 		ftl_rwb_pad(dev, dev->xfer_size - (size % dev->xfer_size));
1046 	}
1047 }
1048 
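/* Pop a batch from the write buffer and submit it at the write pointer, */
/* updating the band's metadata for entries whose L2P mapping still points */
/* at the cache. */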
1049 static int
1050 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1051 {
1052 	struct spdk_ftl_dev	*dev = wptr->dev;
1053 	struct ftl_rwb_batch	*batch;
1054 	struct ftl_rwb_entry	*entry;
1055 	struct ftl_io		*io;
1056 	struct ftl_ppa		ppa, prev_ppa;
1057 
1058 	/* Make sure the band is prepared for writing */
1059 	if (!ftl_wptr_ready(wptr)) {
1060 		return 0;
1061 	}
1062 
1063 	if (dev->halt) {
1064 		ftl_process_shutdown(dev);
1065 	}
1066 
1067 	batch = ftl_rwb_pop(dev->rwb);
1068 	if (!batch) {
1069 		/* If there are queued flush requests we need to pad the RWB to */
1070 		/* force out remaining entries */
1071 		if (!LIST_EMPTY(&dev->flush_list)) {
1072 			ftl_flush_pad_batch(dev);
1073 		}
1074 
1075 		return 0;
1076 	}
1077 
1078 	io = ftl_io_rwb_init(dev, wptr->band, batch, ftl_write_cb);
1079 	if (!io) {
1080 		goto error;
1081 	}
1082 
1083 	ppa = wptr->ppa;
1084 	ftl_rwb_foreach(entry, batch) {
1085 		entry->ppa = ppa;
1086 
1087 		if (entry->lba != FTL_LBA_INVALID) {
1088 			pthread_spin_lock(&entry->lock);
1089 			prev_ppa = ftl_l2p_get(dev, entry->lba);
1090 
1091 			/* If the l2p was updated in the meantime, don't update band's metadata */
1092 			if (ftl_ppa_cached(prev_ppa) && prev_ppa.offset == entry->pos) {
1093 				/* Setting entry's cache bit needs to be done after metadata */
1094 				/* within the band is updated to make sure that writes */
1095 				/* invalidating the entry clear the metadata as well */
1096 				ftl_band_set_addr(wptr->band, entry->lba, entry->ppa);
1097 				ftl_rwb_entry_set_valid(entry);
1098 			}
1099 			pthread_spin_unlock(&entry->lock);
1100 		}
1101 
1102 		ftl_trace_rwb_pop(dev, entry);
1103 		ftl_update_rwb_stats(dev, entry);
1104 
1105 		ppa = ftl_band_next_ppa(wptr->band, ppa, 1);
1106 	}
1107 
1108 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lx, %lx\n", wptr->ppa.ppa,
1109 		      ftl_ppa_addr_pack(dev, wptr->ppa));
1110 
1111 	if (ftl_submit_write(wptr, io)) {
1112 		/* TODO: we need some recovery here */
1113 		assert(0 && "Write submit failed");
1114 		if (ftl_io_done(io)) {
1115 			ftl_io_free(io);
1116 		}
1117 	}
1118 
1119 	return dev->xfer_size;
1120 error:
1121 	ftl_rwb_batch_revert(batch);
1122 	return 0;
1123 }
1124 
1125 static int
1126 ftl_process_writes(struct spdk_ftl_dev *dev)
1127 {
1128 	struct ftl_wptr *wptr, *twptr;
1129 	size_t num_active = 0;
1130 	enum ftl_band_state state;
1131 
1132 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1133 		ftl_wptr_process_writes(wptr);
1134 		state = wptr->band->state;
1135 
1136 		if (state != FTL_BAND_STATE_FULL &&
1137 		    state != FTL_BAND_STATE_CLOSING &&
1138 		    state != FTL_BAND_STATE_CLOSED) {
1139 			num_active++;
1140 		}
1141 	}
1142 
1143 	if (num_active < 1) {
1144 		ftl_add_wptr(dev);
1145 	}
1146 
1147 	return 0;
1148 }
1149 
1150 static void
1151 ftl_rwb_entry_fill(struct ftl_rwb_entry *entry, struct ftl_io *io)
1152 {
1153 	struct ftl_band *band;
1154 
1155 	memcpy(entry->data, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1156 
1157 	if (ftl_rwb_entry_weak(entry)) {
1158 		band = ftl_band_from_ppa(io->dev, io->ppa);
1159 		entry->ppa = ftl_band_next_ppa(band, io->ppa, io->pos);
1160 	}
1161 
1162 	entry->trace = io->trace;
1163 
1164 	if (entry->md) {
1165 		memcpy(entry->md, &entry->lba, sizeof(io->lba));
1166 	}
1167 }
1168 
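/* User data write path: copy the payload into write buffer entries, point */
/* the L2P at the cached copies and push the entries to be written out later. */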
1169 static int
1170 ftl_rwb_fill(struct ftl_io *io)
1171 {
1172 	struct spdk_ftl_dev *dev = io->dev;
1173 	struct ftl_rwb_entry *entry;
1174 	struct ftl_ppa ppa = { .cached = 1 };
1175 	int flags = ftl_rwb_flags_from_io(io);
1176 	uint64_t lba;
1177 
1178 	while (io->pos < io->lbk_cnt) {
1179 		lba = ftl_io_current_lba(io);
1180 		if (lba == FTL_LBA_INVALID) {
1181 			ftl_io_update_iovec(io, 1);
1182 			continue;
1183 		}
1184 
1185 		entry = ftl_acquire_entry(dev, flags);
1186 		if (!entry) {
1187 			return -EAGAIN;
1188 		}
1189 
1190 		entry->lba = lba;
1191 		ftl_rwb_entry_fill(entry, io);
1192 
1193 		ppa.offset = entry->pos;
1194 
1195 		ftl_io_update_iovec(io, 1);
1196 		ftl_update_l2p(dev, entry, ppa);
1197 
1198 		/* Needs to be done after L2P is updated to avoid race with */
1199 		/* write completion callback when it's processed faster than */
1200 		/* L2P is set in update_l2p(). */
1201 		ftl_rwb_push(entry);
1202 		ftl_trace_rwb_fill(dev, io);
1203 	}
1204 
1205 	ftl_io_complete(io);
1206 	return 0;
1207 }
1208 
1209 static bool
1210 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
1211 {
1212 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
1213 
1214 	if (ftl_reloc_is_halted(dev->reloc)) {
1215 		return false;
1216 	}
1217 
1218 	if (dev->df_band) {
1219 		return false;
1220 	}
1221 
1222 	if (dev->num_free <= limit->thld) {
1223 		return true;
1224 	}
1225 
1226 	return false;
1227 }
1228 
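/* Defrag merit of a band: the ratio of invalid to valid blocks scaled by the */
/* band's age, so bands holding mostly invalid and older data score higher. */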
1229 static double
1230 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
1231 {
1232 	size_t usable, valid, invalid;
1233 	double vld_ratio;
1234 
1235 	/* If the band doesn't have any usable lbks it's of no use */
1236 	usable = ftl_band_num_usable_lbks(band);
1237 	if (usable == 0) {
1238 		return 0.0;
1239 	}
1240 
1241 	valid = threshold_valid ? (usable - *threshold_valid) : band->md.num_vld;
1242 	invalid = usable - valid;
1243 
1244 	/* Add one to avoid division by 0 */
1245 	vld_ratio = (double)invalid / (double)(valid + 1);
1246 	return vld_ratio * ftl_band_age(band);
1247 }
1248 
1249 static bool
1250 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
1251 {
1252 	struct spdk_ftl_conf *conf = &dev->conf;
1253 	size_t thld_vld;
1254 
1255 	/* If we're in dire need of free bands, every band is worth defragging */
1256 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
1257 		return true;
1258 	}
1259 
1260 	thld_vld = (ftl_band_num_usable_lbks(band) * conf->defrag.invalid_thld) / 100;
1261 
1262 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
1263 }
1264 
1265 static struct ftl_band *
1266 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
1267 {
1268 	struct ftl_band *band, *mband = NULL;
1269 	double merit = 0;
1270 
1271 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
1272 		assert(band->state == FTL_BAND_STATE_CLOSED);
1273 		band->merit = ftl_band_calc_merit(band, NULL);
1274 		if (band->merit > merit) {
1275 			merit = band->merit;
1276 			mband = band;
1277 		}
1278 	}
1279 
1280 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
1281 		mband = NULL;
1282 	}
1283 
1284 	return mband;
1285 }
1286 
1287 static void
1288 ftl_process_relocs(struct spdk_ftl_dev *dev)
1289 {
1290 	if (ftl_dev_needs_defrag(dev)) {
1291 		dev->df_band = ftl_select_defrag_band(dev);
1292 		if (dev->df_band) {
1293 			ftl_reloc_add(dev->reloc, dev->df_band, 0, ftl_num_band_lbks(dev), 0);
1294 		}
1295 	}
1296 
1297 	ftl_reloc(dev->reloc);
1298 }
1299 
1300 int
1301 ftl_current_limit(const struct spdk_ftl_dev *dev)
1302 {
1303 	return dev->limit;
1304 }
1305 
1306 void
1307 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
1308 {
1309 	attrs->uuid = dev->uuid;
1310 	attrs->lbk_cnt = dev->num_lbas;
1311 	attrs->lbk_size = FTL_BLOCK_SIZE;
1312 	attrs->range = dev->range;
1313 	attrs->cache_bdev_desc = dev->cache_bdev_desc;
1314 }
1315 
1316 static void
1317 _ftl_io_write(void *ctx)
1318 {
1319 	ftl_io_write((struct ftl_io *)ctx);
1320 }
1321 
1322 int
1323 ftl_io_write(struct ftl_io *io)
1324 {
1325 	struct spdk_ftl_dev *dev = io->dev;
1326 
1327 	/* For normal IOs we just need to copy the data onto the rwb */
1328 	if (!(io->flags & FTL_IO_MD)) {
1329 		return ftl_rwb_fill(io);
1330 	}
1331 
1332 	/* Metadata has its own buffer, so it doesn't have to be copied; just */
1333 	/* send it to the core thread and schedule the write immediately */
1334 	if (ftl_check_core_thread(dev)) {
1335 		return ftl_submit_write(ftl_wptr_from_band(io->band), io);
1336 	}
1337 
1338 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
1339 
1340 	return 0;
1341 }
1342 
1343 static int
1344 _spdk_ftl_write(struct ftl_io *io)
1345 {
1346 	int rc;
1347 
1348 	rc = ftl_io_write(io);
1349 	if (rc == -EAGAIN) {
1350 		spdk_thread_send_msg(spdk_io_channel_get_thread(io->ch),
1351 				     _ftl_write, io);
1352 		return 0;
1353 	}
1354 
1355 	if (rc) {
1356 		ftl_io_free(io);
1357 	}
1358 
1359 	return rc;
1360 }
1361 
1362 static void
1363 _ftl_write(void *ctx)
1364 {
1365 	_spdk_ftl_write(ctx);
1366 }
1367 
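/*
 * Illustrative usage sketch (not part of the driver): assuming the caller has
 * initialized the device and obtained an spdk_io_channel for it, a single-iovec
 * write of lba_cnt blocks could look roughly like this. buf, lba, lba_cnt,
 * write_cb and cb_arg are caller-supplied placeholders.
 *
 *	struct iovec iov = {
 *		.iov_base = buf,
 *		.iov_len  = lba_cnt * FTL_BLOCK_SIZE,
 *	};
 *
 *	rc = spdk_ftl_write(dev, ch, lba, lba_cnt, &iov, 1, write_cb, cb_arg);
 *	if (rc != 0) {
 *		// -EINVAL, -EBUSY or -ENOMEM, see the checks below
 *	}
 */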
1368 int
1369 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1370 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1371 {
1372 	struct ftl_io *io;
1373 
1374 	if (iov_cnt == 0 || iov_cnt > FTL_MAX_IOV) {
1375 		return -EINVAL;
1376 	}
1377 
1378 	if (lba_cnt == 0) {
1379 		return -EINVAL;
1380 	}
1381 
1382 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1383 		return -EINVAL;
1384 	}
1385 
1386 	if (!dev->initialized) {
1387 		return -EBUSY;
1388 	}
1389 
1390 	io = ftl_io_alloc(ch);
1391 	if (!io) {
1392 		return -ENOMEM;
1393 	}
1394 
1395 	ftl_io_user_init(dev, io, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
1396 	return _spdk_ftl_write(io);
1397 }
1398 
1399 int
1400 ftl_io_read(struct ftl_io *io)
1401 {
1402 	struct spdk_ftl_dev *dev = io->dev;
1403 	ftl_next_ppa_fn	next_ppa;
1404 
1405 	if (ftl_check_read_thread(dev)) {
1406 		if (ftl_io_mode_ppa(io)) {
1407 			next_ppa = ftl_ppa_read_next_ppa;
1408 		} else {
1409 			next_ppa = ftl_lba_read_next_ppa;
1410 		}
1411 
1412 		return ftl_submit_read(io, next_ppa, NULL);
1413 	}
1414 
1415 	spdk_thread_send_msg(ftl_get_read_thread(dev), _ftl_read, io);
1416 	return 0;
1417 }
1418 
1419 static void
1420 _ftl_read(void *arg)
1421 {
1422 	ftl_io_read((struct ftl_io *)arg);
1423 }
1424 
1425 int
1426 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1427 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1428 {
1429 	struct ftl_io *io;
1430 
1431 	if (iov_cnt == 0 || iov_cnt > FTL_MAX_IOV) {
1432 		return -EINVAL;
1433 	}
1434 
1435 	if (lba_cnt == 0) {
1436 		return -EINVAL;
1437 	}
1438 
1439 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1440 		return -EINVAL;
1441 	}
1442 
1443 	if (!dev->initialized) {
1444 		return -EBUSY;
1445 	}
1446 
1447 	io = ftl_io_alloc(ch);
1448 	if (!io) {
1449 		return -ENOMEM;
1450 	}
1451 
1452 	ftl_io_user_init(dev, io, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
1453 	ftl_io_read(io);
1454 	return 0;
1455 }
1456 
1457 static struct ftl_flush *
1458 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
1459 {
1460 	struct ftl_flush *flush;
1461 	struct ftl_rwb *rwb = dev->rwb;
1462 
1463 	flush = calloc(1, sizeof(*flush));
1464 	if (!flush) {
1465 		return NULL;
1466 	}
1467 
1468 	flush->bmap = spdk_bit_array_create(ftl_rwb_num_batches(rwb));
1469 	if (!flush->bmap) {
1470 		goto error;
1471 	}
1472 
1473 	flush->dev = dev;
1474 	flush->cb.fn = cb_fn;
1475 	flush->cb.ctx = cb_arg;
1476 
1477 	return flush;
1478 error:
1479 	free(flush);
1480 	return NULL;
1481 }
1482 
1483 static void
1484 _ftl_flush(void *ctx)
1485 {
1486 	struct ftl_flush *flush = ctx;
1487 	struct spdk_ftl_dev *dev = flush->dev;
1488 	struct ftl_rwb *rwb = dev->rwb;
1489 	struct ftl_rwb_batch *batch;
1490 
1491 	/* Attach flush object to all non-empty batches */
1492 	ftl_rwb_foreach_batch(batch, rwb) {
1493 		if (!ftl_rwb_batch_empty(batch)) {
1494 			spdk_bit_array_set(flush->bmap, ftl_rwb_batch_get_offset(batch));
1495 			flush->num_req++;
1496 		}
1497 	}
1498 
1499 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
1500 
1501 	/* If the RWB was already empty, the flush can be completed right away */
1502 	if (!flush->num_req) {
1503 		ftl_complete_flush(flush);
1504 	}
1505 }
1506 
1507 int
1508 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
1509 {
1510 	struct ftl_flush *flush;
1511 
1512 	if (!dev->initialized) {
1513 		return -EBUSY;
1514 	}
1515 
1516 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
1517 	if (!flush) {
1518 		return -ENOMEM;
1519 	}
1520 
1521 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
1522 	return 0;
1523 }
1524 
1525 void
1526 ftl_process_anm_event(struct ftl_anm_event *event)
1527 {
1528 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Unconsumed ANM received for dev: %p...\n", event->dev);
1529 	ftl_anm_event_complete(event);
1530 }
1531 
1532 static void
1533 ftl_process_retry_queue(struct spdk_ftl_dev *dev)
1534 {
1535 	struct ftl_io *io;
1536 	int rc;
1537 
1538 	while (!TAILQ_EMPTY(&dev->retry_queue)) {
1539 		io = TAILQ_FIRST(&dev->retry_queue);
1540 
1541 		/* Retry only if IO is still healthy */
1542 		if (spdk_likely(io->status == 0)) {
1543 			rc = ftl_io_read(io);
1544 			if (rc == -ENOMEM) {
1545 				break;
1546 			}
1547 		}
1548 
1549 		io->flags &= ~FTL_IO_RETRY;
1550 		TAILQ_REMOVE(&dev->retry_queue, io, retry_entry);
1551 
1552 		if (ftl_io_done(io)) {
1553 			ftl_io_complete(io);
1554 		}
1555 	}
1556 }
1557 
1558 int
1559 ftl_task_read(void *ctx)
1560 {
1561 	struct ftl_thread *thread = ctx;
1562 	struct spdk_ftl_dev *dev = thread->dev;
1563 	struct spdk_nvme_qpair *qpair = ftl_get_read_qpair(dev);
1564 	size_t num_completed;
1565 
1566 	if (dev->halt) {
1567 		if (ftl_shutdown_complete(dev)) {
1568 			spdk_poller_unregister(&thread->poller);
1569 			return 0;
1570 		}
1571 	}
1572 
1573 	num_completed = spdk_nvme_qpair_process_completions(qpair, 0);
1574 
1575 	if (num_completed && !TAILQ_EMPTY(&dev->retry_queue)) {
1576 		ftl_process_retry_queue(dev);
1577 	}
1578 
1579 	return num_completed;
1580 }
1581 
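/* Main poller of the core thread: drives the write pointers, reaps write */
/* and erase completions and kicks off relocation/defrag work. */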
1582 int
1583 ftl_task_core(void *ctx)
1584 {
1585 	struct ftl_thread *thread = ctx;
1586 	struct spdk_ftl_dev *dev = thread->dev;
1587 	struct spdk_nvme_qpair *qpair = ftl_get_write_qpair(dev);
1588 
1589 	if (dev->halt) {
1590 		if (ftl_shutdown_complete(dev)) {
1591 			spdk_poller_unregister(&thread->poller);
1592 			return 0;
1593 		}
1594 	}
1595 
1596 	ftl_process_writes(dev);
1597 	spdk_nvme_qpair_process_completions(qpair, 0);
1598 	ftl_process_relocs(dev);
1599 
1600 	return 0;
1601 }
1602 
1603 SPDK_LOG_REGISTER_COMPONENT("ftl_core", SPDK_LOG_FTL_CORE)
1604