xref: /spdk/lib/ftl/ftl_core.c (revision 42dba6047b054d9db99f2a78f1b7100a9b3162a1)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/io_channel.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk_internal/log.h"
40 #include "spdk/ftl.h"
41 
42 #include "ftl_core.h"
43 #include "ftl_band.h"
44 #include "ftl_io.h"
45 #include "ftl_anm.h"
46 #include "ftl_rwb.h"
47 #include "ftl_debug.h"
48 #include "ftl_reloc.h"
49 
50 /* Max number of iovecs */
51 #define FTL_MAX_IOV 1024
52 
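/* Write pointer - tracks the current write position within an open band. */
/* Data is appended at wptr->ppa in xfer_size increments and the band is */
/* closed (its tail metadata written) once it fills up. One write pointer */
/* is kept per active band on dev->wptr_list. */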
53 struct ftl_wptr {
54 	/* Owner device */
55 	struct spdk_ftl_dev		*dev;
56 
57 	/* Current PPA */
58 	struct ftl_ppa			ppa;
59 
60 	/* Band currently being written to */
61 	struct ftl_band			*band;
62 
63 	/* Current logical block's offset */
64 	uint64_t			offset;
65 
66 	/* Current erase block */
67 	struct ftl_chunk		*chunk;
68 
69 	/* Metadata DMA buffer */
70 	void				*md_buf;
71 
72 	/* List link */
73 	LIST_ENTRY(ftl_wptr)		list_entry;
74 };
75 
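/* Flush request - snapshots which RWB batches were non-empty when the flush */
/* was issued (one bit per batch in bmap) and completes once all of them */
/* have been written out, i.e. num_req drops to zero. */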
76 struct ftl_flush {
77 	/* Owner device */
78 	struct spdk_ftl_dev		*dev;
79 
80 	/* Number of batches to wait for */
81 	size_t				num_req;
82 
83 	/* Callback */
84 	struct ftl_cb			cb;
85 
86 	/* Batch bitmap */
87 	struct spdk_bit_array		*bmap;
88 
89 	/* List link */
90 	LIST_ENTRY(ftl_flush)		list_entry;
91 };
92 
93 typedef int (*ftl_next_ppa_fn)(struct ftl_io *, struct ftl_ppa *, size_t, void *);
94 static void _ftl_read(void *);
95 static void _ftl_write(void *);
96 
97 static int
98 ftl_rwb_flags_from_io(const struct ftl_io *io)
99 {
100 	int valid_flags = FTL_IO_INTERNAL | FTL_IO_WEAK | FTL_IO_PAD;
101 	return io->flags & valid_flags;
102 }
103 
104 static int
105 ftl_rwb_entry_weak(const struct ftl_rwb_entry *entry)
106 {
107 	return entry->flags & FTL_IO_WEAK;
108 }
109 
110 static void
111 ftl_wptr_free(struct ftl_wptr *wptr)
112 {
113 	if (!wptr) {
114 		return;
115 	}
116 
117 	spdk_dma_free(wptr->md_buf);
118 	free(wptr);
119 }
120 
121 static void
122 ftl_remove_wptr(struct ftl_wptr *wptr)
123 {
124 	LIST_REMOVE(wptr, list_entry);
125 	ftl_wptr_free(wptr);
126 }
127 
128 static void
129 ftl_io_cmpl_cb(void *arg, const struct spdk_nvme_cpl *status)
130 {
131 	struct ftl_io *io = arg;
132 
133 	if (spdk_nvme_cpl_is_error(status)) {
134 		ftl_io_process_error(io, status);
135 	}
136 
137 	ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_DISK);
138 
139 	if (!ftl_io_dec_req(io)) {
140 		ftl_io_complete(io);
141 	}
142 }
143 
144 static void
145 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
146 {
147 	struct ftl_wptr *wptr = NULL;
148 
149 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
150 		if (wptr->band == band) {
151 			break;
152 		}
153 	}
154 
155 	/* If the band already has the high_prio flag set, other writes must */
156 	/* have failed earlier, so it's already taken care of. */
157 	if (band->high_prio) {
158 		assert(wptr == NULL);
159 		return;
160 	}
161 
162 	ftl_band_write_failed(band);
163 	ftl_remove_wptr(wptr);
164 }
165 
166 static struct ftl_wptr *
167 ftl_wptr_from_band(struct ftl_band *band)
168 {
169 	struct spdk_ftl_dev *dev = band->dev;
170 	struct ftl_wptr *wptr = NULL;
171 
172 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
173 		if (wptr->band == band) {
174 			return wptr;
175 		}
176 	}
177 
178 	return NULL;
179 }
180 
181 static void
182 ftl_md_write_fail(struct ftl_io *io, int status)
183 {
184 	struct ftl_band *band = io->band;
185 	struct ftl_wptr *wptr;
186 	char buf[128];
187 
188 	wptr = ftl_wptr_from_band(band);
189 
190 	SPDK_ERRLOG("Metadata write failed @ppa: %s, status: %d\n",
191 		    ftl_ppa2str(wptr->ppa, buf, sizeof(buf)), status);
192 
193 	ftl_halt_writes(io->dev, band);
194 }
195 
196 static void
197 ftl_md_write_cb(void *arg, int status)
198 {
199 	struct ftl_io *io = arg;
200 	struct ftl_wptr *wptr;
201 
202 	wptr = ftl_wptr_from_band(io->band);
203 
204 	if (status) {
205 		ftl_md_write_fail(io, status);
206 		return;
207 	}
208 
209 	ftl_band_set_next_state(io->band);
210 	if (io->band->state == FTL_BAND_STATE_CLOSED) {
211 		ftl_remove_wptr(wptr);
212 	}
213 }
214 
215 static int
216 ftl_ppa_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa,
217 		      size_t lbk, void *ctx)
218 {
219 	struct spdk_ftl_dev *dev = io->dev;
220 	size_t lbk_cnt, max_lbks;
221 
222 	assert(ftl_io_mode_ppa(io));
223 	assert(io->iov_pos < io->iov_cnt);
224 
225 	if (lbk == 0) {
226 		*ppa = io->ppa;
227 	} else {
228 		*ppa = ftl_band_next_xfer_ppa(io->band, io->ppa, lbk);
229 	}
230 
231 	assert(!ftl_ppa_invalid(*ppa));
232 
233 	/* Metadata has to be read in the way it's written (jumping across */
234 	/* the chunks in xfer_size increments) */
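	/* (e.g. with xfer_size = 16 and ppa->lbk = 14, at most 2 lbks are read */
	/* before jumping to the next chunk) */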
235 	if (io->flags & FTL_IO_MD) {
236 		max_lbks = dev->xfer_size - (ppa->lbk % dev->xfer_size);
237 		lbk_cnt = spdk_min(ftl_io_iovec_len_left(io), max_lbks);
238 		assert(ppa->lbk / dev->xfer_size == (ppa->lbk + lbk_cnt - 1) / dev->xfer_size);
239 	} else {
240 		lbk_cnt = ftl_io_iovec_len_left(io);
241 	}
242 
243 	return lbk_cnt;
244 }
245 
246 static int
247 ftl_wptr_close_band(struct ftl_wptr *wptr)
248 {
249 	struct ftl_band *band = wptr->band;
250 
251 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
252 	band->tail_md_ppa = wptr->ppa;
253 
254 	return ftl_band_write_tail_md(band, wptr->md_buf, ftl_md_write_cb);
255 }
256 
257 static int
258 ftl_wptr_open_band(struct ftl_wptr *wptr)
259 {
260 	struct ftl_band *band = wptr->band;
261 
262 	assert(ftl_band_chunk_is_first(band, wptr->chunk));
263 	assert(band->md.num_vld == 0);
264 
265 	ftl_band_clear_md(band);
266 
267 	assert(band->state == FTL_BAND_STATE_PREP);
268 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
269 
270 	return ftl_band_write_head_md(band, wptr->md_buf, ftl_md_write_cb);
271 }
272 
273 static int
274 ftl_submit_erase(struct ftl_io *io)
275 {
276 	struct spdk_ftl_dev *dev = io->dev;
277 	struct ftl_band *band = io->band;
278 	struct ftl_ppa ppa = io->ppa;
279 	struct ftl_chunk *chunk;
280 	uint64_t ppa_packed;
281 	int rc = 0;
282 	size_t i;
283 
284 	for (i = 0; i < io->lbk_cnt; ++i) {
285 		if (i != 0) {
286 			chunk = ftl_band_next_chunk(band, ftl_band_chunk_from_ppa(band, ppa));
287 			assert(chunk->state == FTL_CHUNK_STATE_CLOSED ||
288 			       chunk->state == FTL_CHUNK_STATE_VACANT);
289 			ppa = chunk->start_ppa;
290 		}
291 
292 		assert(ppa.lbk == 0);
293 		ppa_packed = ftl_ppa_addr_pack(dev, ppa);
294 
295 		ftl_io_inc_req(io);
296 
297 		ftl_trace_submission(dev, io, ppa, 1);
298 		rc = spdk_nvme_ocssd_ns_cmd_vector_reset(dev->ns, ftl_get_write_qpair(dev),
299 				&ppa_packed, 1, NULL, ftl_io_cmpl_cb, io);
300 		if (rc) {
301 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
302 			ftl_io_dec_req(io);
303 			break;
304 		}
305 	}
306 
307 	if (ftl_io_done(io)) {
308 		ftl_io_complete(io);
309 	}
310 
311 	return rc;
312 }
313 
314 static void
315 _ftl_io_erase(void *ctx)
316 {
317 	ftl_io_erase((struct ftl_io *)ctx);
318 }
319 
320 static bool
321 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
322 {
323 	return dev->core_thread.thread == spdk_get_thread();
324 }
325 
326 static bool
327 ftl_check_read_thread(const struct spdk_ftl_dev *dev)
328 {
329 	return dev->read_thread.thread == spdk_get_thread();
330 }
331 
332 int
333 ftl_io_erase(struct ftl_io *io)
334 {
335 	struct spdk_ftl_dev *dev = io->dev;
336 
337 	if (ftl_check_core_thread(dev)) {
338 		return ftl_submit_erase(io);
339 	}
340 
341 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_erase, io);
342 	return 0;
343 }
344 
345 static struct ftl_band *
346 ftl_next_write_band(struct spdk_ftl_dev *dev)
347 {
348 	struct ftl_band *band;
349 
350 	band = LIST_FIRST(&dev->free_bands);
351 	if (!band) {
352 		return NULL;
353 	}
354 	assert(band->state == FTL_BAND_STATE_FREE);
355 
356 	if (ftl_band_erase(band)) {
357 		/* TODO: handle erase failure */
358 		return NULL;
359 	}
360 
361 	return band;
362 }
363 
364 static struct ftl_band *
365 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
366 {
367 	struct ftl_band *band;
368 
369 	if (!dev->next_band) {
370 		band = ftl_next_write_band(dev);
371 	} else {
372 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
373 		band = dev->next_band;
374 		dev->next_band = NULL;
375 	}
376 
377 	return band;
378 }
379 
380 static struct ftl_wptr *
381 ftl_wptr_init(struct ftl_band *band)
382 {
383 	struct spdk_ftl_dev *dev = band->dev;
384 	struct ftl_wptr *wptr;
385 
386 	wptr = calloc(1, sizeof(*wptr));
387 	if (!wptr) {
388 		return NULL;
389 	}
390 
391 	wptr->md_buf = spdk_dma_zmalloc(ftl_tail_md_num_lbks(dev) * FTL_BLOCK_SIZE,
392 					FTL_BLOCK_SIZE, NULL);
393 	if (!wptr->md_buf) {
394 		ftl_wptr_free(wptr);
395 		return NULL;
396 	}
397 
398 	wptr->dev = dev;
399 	wptr->band = band;
400 	wptr->chunk = CIRCLEQ_FIRST(&band->chunks);
401 	wptr->ppa = wptr->chunk->start_ppa;
402 
403 	return wptr;
404 }
405 
406 static int
407 ftl_add_wptr(struct spdk_ftl_dev *dev)
408 {
409 	struct ftl_band *band;
410 	struct ftl_wptr *wptr;
411 
412 	band = ftl_next_wptr_band(dev);
413 	if (!band) {
414 		return -1;
415 	}
416 
417 	wptr = ftl_wptr_init(band);
418 	if (!wptr) {
419 		return -1;
420 	}
421 
422 	if (ftl_band_write_prep(band)) {
423 		ftl_wptr_free(wptr);
424 		return -1;
425 	}
426 
427 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
428 
429 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: band %u\n", band->id);
430 	ftl_trace_write_band(dev, band);
431 	return 0;
432 }
433 
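/* Move the write pointer forward by xfer_size lbks. The band is marked FULL */
/* once its usable lbks are exhausted and, after conf->band_thld percent of */
/* the band has been written, the next band is prepared (erased) ahead of */
/* time so that it's ready once this one fills up. */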
434 static void
435 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
436 {
437 	struct ftl_band *band = wptr->band;
438 	struct spdk_ftl_dev *dev = wptr->dev;
439 	struct spdk_ftl_conf *conf = &dev->conf;
440 	size_t next_thld;
441 
442 	wptr->offset += xfer_size;
443 	next_thld = (ftl_band_num_usable_lbks(band) * conf->band_thld) / 100;
444 
445 	if (ftl_band_full(band, wptr->offset)) {
446 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
447 	}
448 
449 	wptr->ppa = ftl_band_next_xfer_ppa(band, wptr->ppa, xfer_size);
450 	wptr->chunk = ftl_band_next_operational_chunk(band, wptr->chunk);
451 
452 	assert(!ftl_ppa_invalid(wptr->ppa));
453 
454 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: grp:%d, pu:%d chunk:%d, lbk:%u\n",
455 		      wptr->ppa.grp, wptr->ppa.pu, wptr->ppa.chk, wptr->ppa.lbk);
456 
457 	if (wptr->offset >= next_thld && !dev->next_band) {
458 		dev->next_band = ftl_next_write_band(dev);
459 	}
460 }
461 
462 static int
463 ftl_wptr_ready(struct ftl_wptr *wptr)
464 {
465 	struct ftl_band *band = wptr->band;
466 
467 	/* TODO: add handling of empty bands */
468 
469 	if (spdk_unlikely(!ftl_chunk_is_writable(wptr->chunk))) {
470 		/* Erasing band may fail after it was assigned to wptr. */
471 		if (spdk_unlikely(wptr->chunk->state == FTL_CHUNK_STATE_BAD)) {
472 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
473 		}
474 		return 0;
475 	}
476 
477 	/* If we're in the process of writing metadata, wait till it is */
478 	/* completed. */
479 	/* TODO: we should probably change bands once we're writing tail md */
480 	if (ftl_band_state_changing(band)) {
481 		return 0;
482 	}
483 
484 	if (band->state == FTL_BAND_STATE_FULL) {
485 		if (ftl_wptr_close_band(wptr)) {
486 			/* TODO: need recovery here */
487 			assert(false);
488 		}
489 		return 0;
490 	}
491 
492 	if (band->state != FTL_BAND_STATE_OPEN) {
493 		if (ftl_wptr_open_band(wptr)) {
494 			/* TODO: need recovery here */
495 			assert(false);
496 		}
497 		return 0;
498 	}
499 
500 	return 1;
501 }
502 
503 static const struct spdk_ftl_limit *
504 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
505 {
506 	assert(type < SPDK_FTL_LIMIT_MAX);
507 	return &dev->conf.defrag.limits[type];
508 }
509 
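/* Check whether the L2P still points at this cache entry. Returns 0 if it */
/* does, 1 if the LBA is invalid or was overwritten in the meantime (in the */
/* latter case the entry's on-disk PPA is invalidated here). */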
510 static int
511 ftl_update_md_entry(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
512 {
513 	struct ftl_ppa ppa;
514 
515 	/* If the LBA is invalid don't bother checking the md and l2p */
516 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
517 		return 1;
518 	}
519 
520 	ppa = ftl_l2p_get(dev, entry->lba);
521 	if (!(ftl_ppa_cached(ppa) && ppa.offset == entry->pos)) {
522 		ftl_invalidate_addr(dev, entry->ppa);
523 		return 1;
524 	}
525 
526 	return 0;
527 }
528 
529 static void
530 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
531 {
532 	pthread_spin_lock(&entry->lock);
533 
534 	if (!ftl_rwb_entry_valid(entry)) {
535 		goto unlock;
536 	}
537 
538 	/* Make sure the metadata is in sync with l2p. If the l2p still contains */
539 	/* the entry, fill it with the on-disk PPA and clear the cache status */
540 	/* bit. Otherwise, skip the l2p update and just clear the cache status. */
541 	/* This can happen when a write comes in while the l2p contains */
542 	/* the entry, but the entry hasn't been assigned a PPA yet (and therefore */
543 	/* does not have the cache bit set). */
544 	if (ftl_update_md_entry(dev, entry)) {
545 		goto clear;
546 	}
547 
548 	ftl_l2p_set(dev, entry->lba, entry->ppa);
549 clear:
550 	ftl_rwb_entry_invalidate(entry);
551 unlock:
552 	pthread_spin_unlock(&entry->lock);
553 }
554 
555 static struct ftl_rwb_entry *
556 ftl_acquire_entry(struct spdk_ftl_dev *dev, int flags)
557 {
558 	struct ftl_rwb_entry *entry;
559 
560 	entry = ftl_rwb_acquire(dev->rwb, ftl_rwb_type_from_flags(flags));
561 	if (!entry) {
562 		return NULL;
563 	}
564 
565 	ftl_evict_cache_entry(dev, entry);
566 
567 	entry->flags = flags;
568 	return entry;
569 }
570 
571 static void
572 ftl_rwb_pad(struct spdk_ftl_dev *dev, size_t size)
573 {
574 	struct ftl_rwb_entry *entry;
575 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
576 
577 	for (size_t i = 0; i < size; ++i) {
578 		entry = ftl_acquire_entry(dev, flags);
579 		if (!entry) {
580 			break;
581 		}
582 
583 		entry->lba = FTL_LBA_INVALID;
584 		entry->ppa = ftl_to_ppa(FTL_PPA_INVALID);
585 		memset(entry->data, 0, FTL_BLOCK_SIZE);
586 		ftl_rwb_push(entry);
587 	}
588 }
589 
590 static void
591 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
592 {
593 	while (!LIST_EMPTY(&dev->free_bands)) {
594 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
595 	}
596 
597 	dev->next_band = NULL;
598 }
599 
600 static void
601 ftl_process_shutdown(struct spdk_ftl_dev *dev)
602 {
603 	size_t size = ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_INTERNAL) +
604 		      ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_USER);
605 
606 	if (size >= dev->xfer_size) {
607 		return;
608 	}
609 
610 	/* If we reach this point we need to remove free bands */
611 	/* and pad current wptr band to the end */
612 	ftl_remove_free_bands(dev);
613 
614 	/* Pad write buffer until band is full */
615 	ftl_rwb_pad(dev, dev->xfer_size - size);
616 }
617 
618 static int
619 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
620 {
621 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
622 	       LIST_EMPTY(&dev->wptr_list);
623 }
624 
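/* Throttle user writes based on the number of free bands left. Each limit */
/* level defines a threshold of free bands and the percentage of RWB entries */
/* that user writes may occupy once below it, e.g. a limit of 80 caps user */
/* writes at 80% of the write buffer, while internal writes are unaffected. */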
625 void
626 ftl_apply_limits(struct spdk_ftl_dev *dev)
627 {
628 	const struct spdk_ftl_limit *limit;
629 	struct ftl_stats *stats = &dev->stats;
630 	size_t rwb_limit[FTL_RWB_TYPE_MAX];
631 	int i;
632 
633 	ftl_rwb_get_limits(dev->rwb, rwb_limit);
634 
635 	/* Clear existing limit */
636 	dev->limit = SPDK_FTL_LIMIT_MAX;
637 
638 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
639 		limit = ftl_get_limit(dev, i);
640 
641 		if (dev->num_free <= limit->thld) {
642 			rwb_limit[FTL_RWB_TYPE_USER] =
643 				(limit->limit * ftl_rwb_entry_cnt(dev->rwb)) / 100;
644 			stats->limits[i]++;
645 			dev->limit = i;
646 			goto apply;
647 		}
648 	}
649 
650 	/* Clear the limits, since we don't need to apply them anymore */
651 	rwb_limit[FTL_RWB_TYPE_USER] = ftl_rwb_entry_cnt(dev->rwb);
652 apply:
653 	ftl_trace_limits(dev, rwb_limit, dev->num_free);
654 	ftl_rwb_set_limits(dev->rwb, rwb_limit);
655 }
656 
657 static int
658 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
659 {
660 	struct ftl_band *band = ftl_band_from_ppa(dev, ppa);
661 	struct ftl_md *md = &band->md;
662 	uint64_t offset;
663 
664 	offset = ftl_band_lbkoff_from_ppa(band, ppa);
665 
666 	/* The bit might be already cleared if two writes are scheduled to the */
667 	/* same LBA at the same time */
668 	if (spdk_bit_array_get(md->vld_map, offset)) {
669 		assert(md->num_vld > 0);
670 		spdk_bit_array_clear(md->vld_map, offset);
671 		md->num_vld--;
672 		return 1;
673 	}
674 
675 	return 0;
676 }
677 
678 int
679 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
680 {
681 	struct ftl_band *band;
682 	int rc;
683 
684 	assert(!ftl_ppa_cached(ppa));
685 	band = ftl_band_from_ppa(dev, ppa);
686 
687 	pthread_spin_lock(&band->md.lock);
688 	rc = ftl_invalidate_addr_unlocked(dev, ppa);
689 	pthread_spin_unlock(&band->md.lock);
690 
691 	return rc;
692 }
693 
694 static int
695 ftl_read_retry(int rc)
696 {
697 	return rc == -EAGAIN;
698 }
699 
700 static int
701 ftl_read_canceled(int rc)
702 {
703 	return rc == 0;
704 }
705 
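/* Generic read path. The next_ppa callback resolves the next physical */
/* address and the number of contiguous lbks to read from it. A return value */
/* of 0 means no device read is needed for that lbk (cache hit or unmapped */
/* LBA), while -EAGAIN means the lookup has to be retried from scratch. */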
706 static int
707 ftl_submit_read(struct ftl_io *io, ftl_next_ppa_fn next_ppa,
708 		void *ctx)
709 {
710 	struct spdk_ftl_dev *dev = io->dev;
711 	struct ftl_ppa ppa;
712 	size_t lbk = 0;
713 	int rc = 0, lbk_cnt;
714 
715 	while (lbk < io->lbk_cnt) {
716 		/* We might hit the cache here, if so, skip the read */
717 		lbk_cnt = rc = next_ppa(io, &ppa, lbk, ctx);
718 
719 		/* We might need to retry the read from scratch (e.g. because */
720 		/* a write was under way and completed before we could read */
721 		/* it from the rwb) */
722 		if (ftl_read_retry(rc)) {
723 			continue;
724 		}
725 
726 		/* We don't have to schedule the read, as it was read from cache */
727 		if (ftl_read_canceled(rc)) {
728 			ftl_io_update_iovec(io, 1);
729 			lbk++;
730 			continue;
731 		}
732 
733 		assert(lbk_cnt > 0);
734 
735 		ftl_trace_submission(dev, io, ppa, lbk_cnt);
736 		rc = spdk_nvme_ns_cmd_read(dev->ns, ftl_get_read_qpair(dev),
737 					   ftl_io_iovec_addr(io),
738 					   ftl_ppa_addr_pack(io->dev, ppa), lbk_cnt,
739 					   ftl_io_cmpl_cb, io, 0);
740 		if (rc) {
741 			SPDK_ERRLOG("spdk_nvme_ns_cmd_read failed with status: %d\n", rc);
742 			io->status = -EIO;
743 			break;
744 		}
745 
746 		ftl_io_update_iovec(io, lbk_cnt);
747 		ftl_io_inc_req(io);
748 		lbk += lbk_cnt;
749 	}
750 
751 	/* If we didn't have to read anything from the device, */
752 	/* complete the request right away */
753 	if (ftl_io_done(io)) {
754 		ftl_io_complete(io);
755 	}
756 
757 	return rc;
758 }
759 
760 static int
761 ftl_ppa_cache_read(struct ftl_io *io, uint64_t lba,
762 		   struct ftl_ppa ppa, void *buf)
763 {
764 	struct ftl_rwb *rwb = io->dev->rwb;
765 	struct ftl_rwb_entry *entry;
766 	struct ftl_ppa nppa;
767 	int rc = 0;
768 
769 	entry = ftl_rwb_entry_from_offset(rwb, ppa.offset);
770 	pthread_spin_lock(&entry->lock);
771 
772 	nppa = ftl_l2p_get(io->dev, lba);
773 	if (ppa.ppa != nppa.ppa) {
774 		rc = -1;
775 		goto out;
776 	}
777 
778 	memcpy(buf, entry->data, FTL_BLOCK_SIZE);
779 out:
780 	pthread_spin_unlock(&entry->lock);
781 	return rc;
782 }
783 
784 static int
785 ftl_lba_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa,
786 		      size_t lbk, void *ctx)
787 {
788 	struct spdk_ftl_dev *dev = io->dev;
789 	*ppa = ftl_l2p_get(dev, io->lba + lbk);
790 
791 	(void) ctx;
792 
793 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Read ppa:%lx, lba:%lu\n", ppa->ppa, io->lba);
794 
795 	/* If the PPA is invalid, skip it (the buffer should already be zeroed) */
796 	if (ftl_ppa_invalid(*ppa)) {
797 		ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_INVALID);
798 		return 0;
799 	}
800 
801 	if (ftl_ppa_cached(*ppa)) {
802 		if (!ftl_ppa_cache_read(io, io->lba + lbk, *ppa, ftl_io_iovec_addr(io))) {
803 			ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_CACHE);
804 			return 0;
805 		}
806 
807 		/* If the state changed, we have to re-read the l2p */
808 		return -EAGAIN;
809 	}
810 
811 	/* We want to read one lbk at a time */
812 	return 1;
813 }
814 
815 static void
816 ftl_complete_flush(struct ftl_flush *flush)
817 {
818 	assert(flush->num_req == 0);
819 	LIST_REMOVE(flush, list_entry);
820 
821 	flush->cb.fn(flush->cb.ctx, 0);
822 
823 	spdk_bit_array_free(&flush->bmap);
824 	free(flush);
825 }
826 
827 static void
828 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_rwb_batch *batch)
829 {
830 	struct ftl_flush *flush, *tflush;
831 	size_t offset;
832 
833 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
834 		offset = ftl_rwb_batch_get_offset(batch);
835 
836 		if (spdk_bit_array_get(flush->bmap, offset)) {
837 			spdk_bit_array_clear(flush->bmap, offset);
838 			if (!(--flush->num_req)) {
839 				ftl_complete_flush(flush);
840 			}
841 		}
842 	}
843 }
844 
845 static void
846 ftl_write_fail(struct ftl_io *io, int status)
847 {
848 	struct ftl_rwb_batch *batch = io->rwb_batch;
849 	struct spdk_ftl_dev *dev = io->dev;
850 	struct ftl_rwb_entry *entry;
851 	struct ftl_band *band;
852 	char buf[128];
853 
854 	entry = ftl_rwb_batch_first_entry(batch);
855 
856 	band = ftl_band_from_ppa(io->dev, entry->ppa);
857 	SPDK_ERRLOG("Write failed @ppa: %s, status: %d\n",
858 		    ftl_ppa2str(entry->ppa, buf, sizeof(buf)), status);
859 
860 	/* Close the band, halt the wptr and defrag */
861 	ftl_halt_writes(dev, band);
862 
863 	ftl_rwb_foreach(entry, batch) {
864 		/* Invalidate meta set by process_writes() */
865 		ftl_invalidate_addr(dev, entry->ppa);
866 	}
867 
868 	/* Reset the batch back to the RWB to resend it later */
869 	ftl_rwb_batch_revert(batch);
870 }
871 
872 static void
873 ftl_write_cb(void *arg, int status)
874 {
875 	struct ftl_io *io = arg;
876 	struct spdk_ftl_dev *dev = io->dev;
877 	struct ftl_rwb_batch *batch = io->rwb_batch;
878 	struct ftl_rwb_entry *entry;
879 
880 	if (status) {
881 		ftl_write_fail(io, status);
882 		return;
883 	}
884 
885 	assert(io->lbk_cnt == dev->xfer_size);
886 	ftl_rwb_foreach(entry, batch) {
887 		if (!(io->flags & FTL_IO_MD) && !(entry->flags & FTL_IO_PAD)) {
888 			/* Verify that the LBA is set for user lbks */
889 			assert(entry->lba != FTL_LBA_INVALID);
890 		}
891 
892 		SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lu, lba:%lu\n",
893 			      entry->ppa.ppa, entry->lba);
894 
895 		if (ftl_update_md_entry(dev, entry)) {
896 			ftl_rwb_entry_invalidate(entry);
897 		}
898 	}
899 
900 	ftl_process_flush(dev, batch);
901 	ftl_rwb_batch_release(batch);
902 }
903 
904 static void
905 ftl_update_rwb_stats(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry)
906 {
907 	if (!ftl_rwb_entry_internal(entry)) {
908 		dev->stats.write_user++;
909 	}
910 	dev->stats.write_total++;
911 }
912 
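/* Update the L2P with the entry's new PPA. Three cases are handled below: */
/* the LBA wasn't mapped at all, the previous mapping points at another RWB */
/* entry, or the previous mapping points at an on-disk PPA that has to be */
/* invalidated under the owning band's metadata lock. */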
913 static void
914 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry,
915 	       struct ftl_ppa ppa)
916 {
917 	struct ftl_ppa prev_ppa;
918 	struct ftl_rwb_entry *prev;
919 	struct ftl_band *band;
920 	int valid;
921 
922 	prev_ppa = ftl_l2p_get(dev, entry->lba);
923 	if (ftl_ppa_invalid(prev_ppa)) {
924 		ftl_l2p_set(dev, entry->lba, ppa);
925 		return;
926 	}
927 
928 	/* If the L2P's PPA is different than what we expected we don't need to */
929 	/* do anything (someone's already overwritten our data). */
930 	if (ftl_rwb_entry_weak(entry) && !ftl_ppa_cmp(prev_ppa, entry->ppa)) {
931 		return;
932 	}
933 
934 	if (ftl_ppa_cached(prev_ppa)) {
935 		assert(!ftl_rwb_entry_weak(entry));
936 		prev = ftl_rwb_entry_from_offset(dev->rwb, prev_ppa.offset);
937 		pthread_spin_lock(&prev->lock);
938 
939 		/* Re-read the L2P under the lock to protect against updates */
940 		/* to this LBA from other threads */
941 		prev_ppa = ftl_l2p_get(dev, entry->lba);
942 
943 		/* If the entry is no longer in cache, another write has been */
944 		/* scheduled in the meantime, so we have to invalidate its LBA */
945 		if (!ftl_ppa_cached(prev_ppa)) {
946 			ftl_invalidate_addr(dev, prev_ppa);
947 		}
948 
949 		/* If previous entry is part of cache, remove and invalidate it */
950 		if (ftl_rwb_entry_valid(prev)) {
951 			ftl_invalidate_addr(dev, prev->ppa);
952 			ftl_rwb_entry_invalidate(prev);
953 		}
954 
955 		ftl_l2p_set(dev, entry->lba, ppa);
956 		pthread_spin_unlock(&prev->lock);
957 		return;
958 	}
959 
960 	/* Lock the band containing the previous PPA. This ensures atomic changes to */
961 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
962 	/* check the validity of weak writes. */
963 	band = ftl_band_from_ppa(dev, prev_ppa);
964 	pthread_spin_lock(&band->md.lock);
965 
966 	valid = ftl_invalidate_addr_unlocked(dev, prev_ppa);
967 
968 	/* If the address has been invalidated already, we don't want to update */
969 	/* the L2P for weak writes, as it means the write is no longer valid. */
970 	if (!ftl_rwb_entry_weak(entry) || valid) {
971 		ftl_l2p_set(dev, entry->lba, ppa);
972 	}
973 
974 	pthread_spin_unlock(&band->md.lock);
975 }
976 
977 static int
978 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
979 {
980 	struct spdk_ftl_dev	*dev = io->dev;
981 	struct iovec		*iov = ftl_io_iovec(io);
982 	int			rc = 0;
983 	size_t			i;
984 
985 	for (i = 0; i < io->iov_cnt; ++i) {
986 		assert(iov[i].iov_len > 0);
987 		assert(iov[i].iov_len / PAGE_SIZE == dev->xfer_size);
988 
989 		ftl_trace_submission(dev, io, wptr->ppa, iov[i].iov_len / PAGE_SIZE);
990 		rc = spdk_nvme_ns_cmd_write_with_md(dev->ns, ftl_get_write_qpair(dev),
991 						    iov[i].iov_base, ftl_io_get_md(io),
992 						    ftl_ppa_addr_pack(dev, wptr->ppa),
993 						    iov[i].iov_len / PAGE_SIZE,
994 						    ftl_io_cmpl_cb, io, 0, 0, 0);
995 		if (rc) {
996 			SPDK_ERRLOG("spdk_nvme_ns_cmd_write failed with status:%d, ppa:%lu\n",
997 				    rc, wptr->ppa.ppa);
998 			io->status = -EIO;
999 			break;
1000 		}
1001 
1002 		io->pos += iov[i].iov_len / PAGE_SIZE;
1003 		ftl_io_inc_req(io);
1004 		ftl_wptr_advance(wptr, iov[i].iov_len / PAGE_SIZE);
1005 	}
1006 
1007 	if (ftl_io_done(io)) {
1008 		ftl_io_complete(io);
1009 	}
1010 
1011 	return rc;
1012 }
1013 
1014 static void
1015 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1016 {
1017 	struct ftl_rwb *rwb = dev->rwb;
1018 	size_t size;
1019 
1020 	size = ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_INTERNAL) +
1021 	       ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_USER);
1022 
1023 	/* There must be something in the RWB, otherwise the flush */
1024 	/* wouldn't be waiting for anything */
1025 	assert(size > 0);
1026 
1027 	/* Only add padding when there's less than xfer size */
1028 	/* entries in the buffer. Otherwise we just have to wait */
1029 	/* for the entries to become ready. */
1030 	if (size < dev->xfer_size) {
1031 		ftl_rwb_pad(dev, dev->xfer_size - (size % dev->xfer_size));
1032 	}
1033 }
1034 
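/* Pop a full batch of entries from the RWB, assign each entry a consecutive */
/* PPA starting at the write pointer, record the LBAs in the band's metadata */
/* and submit the write. Returns the number of lbks submitted (xfer_size) or */
/* 0 if nothing could be written. */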
1035 static int
1036 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1037 {
1038 	struct spdk_ftl_dev	*dev = wptr->dev;
1039 	struct ftl_rwb_batch	*batch;
1040 	struct ftl_rwb_entry	*entry;
1041 	struct ftl_io		*io;
1042 	struct ftl_ppa		ppa;
1043 
1044 	/* Make sure the band is prepared for writing */
1045 	if (!ftl_wptr_ready(wptr)) {
1046 		return 0;
1047 	}
1048 
1049 	if (dev->halt) {
1050 		ftl_process_shutdown(dev);
1051 	}
1052 
1053 	batch = ftl_rwb_pop(dev->rwb);
1054 	if (!batch) {
1055 		/* If there are queued flush requests we need to pad the RWB to */
1056 		/* force out remaining entries */
1057 		if (!LIST_EMPTY(&dev->flush_list)) {
1058 			ftl_flush_pad_batch(dev);
1059 		}
1060 
1061 		return 0;
1062 	}
1063 
1064 	io = ftl_io_rwb_init(dev, wptr->band, batch, ftl_write_cb);
1065 	if (!io) {
1066 		goto error;
1067 	}
1068 
1069 	ppa = wptr->ppa;
1070 	ftl_rwb_foreach(entry, batch) {
1071 		entry->ppa = ppa;
1072 		/* Setting entry's cache bit needs to be done after metadata */
1073 		/* within the band is updated to make sure that writes */
1074 		/* invalidating the entry clear the metadata as well */
1075 		if (entry->lba != FTL_LBA_INVALID) {
1076 			ftl_band_set_addr(wptr->band, entry->lba, entry->ppa);
1077 		}
1078 
1079 		ftl_rwb_entry_set_valid(entry);
1080 
1081 		ftl_trace_rwb_pop(dev, entry);
1082 		ftl_update_rwb_stats(dev, entry);
1083 
1084 		ppa = ftl_band_next_ppa(wptr->band, ppa, 1);
1085 	}
1086 
1087 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lx, %lx\n", wptr->ppa.ppa,
1088 		      ftl_ppa_addr_pack(dev, wptr->ppa));
1089 
1090 	if (ftl_submit_write(wptr, io)) {
1091 		/* TODO: we need some recovery here */
1092 		assert(0 && "Write submit failed");
1093 		if (ftl_io_done(io)) {
1094 			ftl_io_free(io);
1095 		}
1096 	}
1097 
1098 	return dev->xfer_size;
1099 error:
1100 	ftl_rwb_batch_revert(batch);
1101 	return 0;
1102 }
1103 
1104 static int
1105 ftl_process_writes(struct spdk_ftl_dev *dev)
1106 {
1107 	struct ftl_wptr *wptr, *twptr;
1108 	size_t num_active = 0;
1109 	enum ftl_band_state state;
1110 
1111 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1112 		ftl_wptr_process_writes(wptr);
1113 		state = wptr->band->state;
1114 
1115 		if (state != FTL_BAND_STATE_FULL &&
1116 		    state != FTL_BAND_STATE_CLOSING &&
1117 		    state != FTL_BAND_STATE_CLOSED) {
1118 			num_active++;
1119 		}
1120 	}
1121 
1122 	if (num_active < 1) {
1123 		ftl_add_wptr(dev);
1124 	}
1125 
1126 	return 0;
1127 }
1128 
1129 static void
1130 ftl_rwb_entry_fill(struct ftl_rwb_entry *entry, struct ftl_io *io)
1131 {
1132 	struct ftl_band *band;
1133 
1134 	memcpy(entry->data, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1135 
1136 	if (ftl_rwb_entry_weak(entry)) {
1137 		band = ftl_band_from_ppa(io->dev, io->ppa);
1138 		entry->ppa = ftl_band_next_ppa(band, io->ppa, io->pos);
1139 	}
1140 
1141 	entry->trace = io->trace;
1142 
1143 	if (entry->md) {
1144 		memcpy(entry->md, &entry->lba, sizeof(io->lba));
1145 	}
1146 }
1147 
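/* Buffered (user) write path: copy each lbk into an RWB entry, point the */
/* L2P at the cache entry (a PPA with the cached bit set and offset equal to */
/* the entry's position) and push the entry so the core thread can write it */
/* out in the background. */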
1148 static int
1149 ftl_rwb_fill(struct ftl_io *io)
1150 {
1151 	struct spdk_ftl_dev *dev = io->dev;
1152 	struct ftl_rwb_entry *entry;
1153 	struct ftl_ppa ppa = { .cached = 1 };
1154 	int flags = ftl_rwb_flags_from_io(io);
1155 	uint64_t lba;
1156 
1157 	for (; io->pos < io->lbk_cnt; ++io->pos) {
1158 		lba = ftl_io_current_lba(io);
1159 		if (lba == FTL_LBA_INVALID) {
1160 			ftl_io_update_iovec(io, 1);
1161 			continue;
1162 		}
1163 
1164 		entry = ftl_acquire_entry(dev, flags);
1165 		if (!entry) {
1166 			return -EAGAIN;
1167 		}
1168 
1169 		entry->lba = lba;
1170 		ftl_rwb_entry_fill(entry, io);
1171 
1172 		ppa.offset = entry->pos;
1173 
1174 		ftl_io_update_iovec(io, 1);
1175 		ftl_update_l2p(dev, entry, ppa);
1176 
1177 		/* Needs to be done after the L2P is updated to avoid a race with the */
1178 		/* write completion callback, in case it's processed before the */
1179 		/* L2P is set in ftl_update_l2p(). */
1180 		ftl_rwb_push(entry);
1181 		ftl_trace_rwb_fill(dev, io);
1182 	}
1183 
1184 	ftl_io_complete(io);
1185 	return 0;
1186 }
1187 
1188 static bool
1189 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
1190 {
1191 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
1192 
1193 	if (ftl_reloc_is_halted(dev->reloc)) {
1194 		return false;
1195 	}
1196 
1197 	if (dev->df_band) {
1198 		return false;
1199 	}
1200 
1201 	if (dev->num_free <= limit->thld) {
1202 		return true;
1203 	}
1204 
1205 	return false;
1206 }
1207 
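/* Defrag merit: ratio of invalid to valid lbks scaled by the band's age, so */
/* that bands holding mostly invalid data are reclaimed first. E.g. a band */
/* with 1000 usable lbks of which 200 are valid yields 800 / 201 ~= 3.98, */
/* which is then multiplied by ftl_band_age(). */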
1208 static double
1209 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
1210 {
1211 	size_t usable, valid, invalid;
1212 	double vld_ratio;
1213 
1214 	/* If the band doesn't have any usable lbks it's of no use */
1215 	usable = ftl_band_num_usable_lbks(band);
1216 	if (usable == 0) {
1217 		return 0.0;
1218 	}
1219 
1220 	valid = threshold_valid ? (usable - *threshold_valid) : band->md.num_vld;
1221 	invalid = usable - valid;
1222 
1223 	/* Add one to avoid division by 0 */
1224 	vld_ratio = (double)invalid / (double)(valid + 1);
1225 	return vld_ratio * ftl_band_age(band);
1226 }
1227 
1228 static bool
1229 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
1230 {
1231 	struct spdk_ftl_conf *conf = &dev->conf;
1232 	size_t thld_vld;
1233 
1234 	/* If we're in dire need of free bands, every band is worth defragging */
1235 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
1236 		return true;
1237 	}
1238 
1239 	thld_vld = (ftl_band_num_usable_lbks(band) * conf->defrag.invalid_thld) / 100;
1240 
1241 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
1242 }
1243 
1244 static struct ftl_band *
1245 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
1246 {
1247 	struct ftl_band *band, *mband = NULL;
1248 	double merit = 0;
1249 
1250 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
1251 		assert(band->state == FTL_BAND_STATE_CLOSED);
1252 		band->merit = ftl_band_calc_merit(band, NULL);
1253 		if (band->merit > merit) {
1254 			merit = band->merit;
1255 			mband = band;
1256 		}
1257 	}
1258 
1259 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
1260 		mband = NULL;
1261 	}
1262 
1263 	return mband;
1264 }
1265 
1266 static void
1267 ftl_process_relocs(struct spdk_ftl_dev *dev)
1268 {
1269 	if (ftl_dev_needs_defrag(dev)) {
1270 		dev->df_band = ftl_select_defrag_band(dev);
1271 		if (dev->df_band) {
1272 			ftl_reloc_add(dev->reloc, dev->df_band, 0, ftl_num_band_lbks(dev), 0);
1273 		}
1274 	}
1275 
1276 	ftl_reloc(dev->reloc);
1277 }
1278 
1279 int
1280 ftl_current_limit(const struct spdk_ftl_dev *dev)
1281 {
1282 	return dev->limit;
1283 }
1284 
1285 int
1286 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
1287 {
1288 	if (!dev || !attrs) {
1289 		return -EINVAL;
1290 	}
1291 
1292 	attrs->uuid = dev->uuid;
1293 	attrs->lbk_cnt = dev->num_lbas;
1294 	attrs->lbk_size = FTL_BLOCK_SIZE;
1295 	attrs->range = dev->range;
1296 
1297 	return 0;
1298 }
1299 
1300 static void
1301 _ftl_io_write(void *ctx)
1302 {
1303 	ftl_io_write((struct ftl_io *)ctx);
1304 }
1305 
1306 int
1307 ftl_io_write(struct ftl_io *io)
1308 {
1309 	struct spdk_ftl_dev *dev = io->dev;
1310 
1311 	/* For normal IOs we just need to copy the data onto the rwb */
1312 	if (!(io->flags & FTL_IO_MD)) {
1313 		return ftl_rwb_fill(io);
1314 	}
1315 
1316 	/* Metadata has its own buffer, so it doesn't have to be copied; just */
1317 	/* send it to the core thread and schedule the write immediately */
1318 	if (ftl_check_core_thread(dev)) {
1319 		return ftl_submit_write(ftl_wptr_from_band(io->band), io);
1320 	}
1321 
1322 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
1323 
1324 	return 0;
1325 }
1326 
1327 
1329 static int
1330 _spdk_ftl_write(struct ftl_io *io)
1331 {
1332 	int rc;
1333 
1334 	rc = ftl_io_write(io);
1335 	if (rc == -EAGAIN) {
1336 		spdk_thread_send_msg(spdk_io_channel_get_thread(io->ch),
1337 				     _ftl_write, io);
1338 		return 0;
1339 	}
1340 
1341 	if (rc) {
1342 		ftl_io_free(io);
1343 	}
1344 
1345 	return rc;
1346 }
1347 
1348 static void
1349 _ftl_write(void *ctx)
1350 {
1351 	_spdk_ftl_write(ctx);
1352 }
1353 
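/* Public write entry point. The iovecs must add up to exactly lba_cnt lbks */
/* of FTL_BLOCK_SIZE bytes (e.g. one iovec of lba_cnt * FTL_BLOCK_SIZE bytes). */
/* Note that cb_fn fires once the data is accepted into the write buffer, not */
/* once it reaches the media - use spdk_ftl_flush() for that. */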
1354 int
1355 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1356 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1357 {
1358 	struct ftl_io *io;
1359 
1360 	if (!iov || !cb_fn || !dev) {
1361 		return -EINVAL;
1362 	}
1363 
1364 	if (iov_cnt == 0 || iov_cnt > FTL_MAX_IOV) {
1365 		return -EINVAL;
1366 	}
1367 
1368 	if (lba_cnt == 0) {
1369 		return -EINVAL;
1370 	}
1371 
1372 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1373 		return -EINVAL;
1374 	}
1375 
1376 	if (!dev->initialized) {
1377 		return -EBUSY;
1378 	}
1379 
1380 	io = ftl_io_alloc(ch);
1381 	if (!io) {
1382 		return -ENOMEM;
1383 	}
1384 
1385 	ftl_io_user_init(dev, io, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
1386 	return _spdk_ftl_write(io);
1387 }
1388 
1389 int
1390 ftl_io_read(struct ftl_io *io)
1391 {
1392 	struct spdk_ftl_dev *dev = io->dev;
1393 	ftl_next_ppa_fn	next_ppa;
1394 
1395 	if (ftl_check_read_thread(dev)) {
1396 		if (ftl_io_mode_ppa(io)) {
1397 			next_ppa = ftl_ppa_read_next_ppa;
1398 		} else {
1399 			next_ppa = ftl_lba_read_next_ppa;
1400 		}
1401 
1402 		return ftl_submit_read(io, next_ppa, NULL);
1403 	}
1404 
1405 	spdk_thread_send_msg(ftl_get_read_thread(dev), _ftl_read, io);
1406 	return 0;
1407 }
1408 
1409 static void
1410 _ftl_read(void *arg)
1411 {
1412 	ftl_io_read((struct ftl_io *)arg);
1413 }
1414 
1415 int
1416 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1417 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1418 {
1419 	struct ftl_io *io;
1420 
1421 	if (!iov || !cb_fn || !dev) {
1422 		return -EINVAL;
1423 	}
1424 
1425 	if (iov_cnt == 0 || iov_cnt > FTL_MAX_IOV) {
1426 		return -EINVAL;
1427 	}
1428 
1429 	if (lba_cnt == 0) {
1430 		return -EINVAL;
1431 	}
1432 
1433 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1434 		return -EINVAL;
1435 	}
1436 
1437 	if (!dev->initialized) {
1438 		return -EBUSY;
1439 	}
1440 
1441 	io = ftl_io_alloc(ch);
1442 	if (!io) {
1443 		return -ENOMEM;
1444 	}
1445 
1446 	ftl_io_user_init(dev, io, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
1447 	return ftl_io_read(io);
1448 }
1449 
1450 static struct ftl_flush *
1451 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
1452 {
1453 	struct ftl_flush *flush;
1454 	struct ftl_rwb *rwb = dev->rwb;
1455 
1456 	flush = calloc(1, sizeof(*flush));
1457 	if (!flush) {
1458 		return NULL;
1459 	}
1460 
1461 	flush->bmap = spdk_bit_array_create(ftl_rwb_num_batches(rwb));
1462 	if (!flush->bmap) {
1463 		goto error;
1464 	}
1465 
1466 	flush->dev = dev;
1467 	flush->cb.fn = cb_fn;
1468 	flush->cb.ctx = cb_arg;
1469 
1470 	return flush;
1471 error:
1472 	free(flush);
1473 	return NULL;
1474 }
1475 
1476 static void
1477 _ftl_flush(void *ctx)
1478 {
1479 	struct ftl_flush *flush = ctx;
1480 	struct spdk_ftl_dev *dev = flush->dev;
1481 	struct ftl_rwb *rwb = dev->rwb;
1482 	struct ftl_rwb_batch *batch;
1483 
1484 	/* Attach flush object to all non-empty batches */
1485 	ftl_rwb_foreach_batch(batch, rwb) {
1486 		if (!ftl_rwb_batch_empty(batch)) {
1487 			spdk_bit_array_set(flush->bmap, ftl_rwb_batch_get_offset(batch));
1488 			flush->num_req++;
1489 		}
1490 	}
1491 
1492 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
1493 
1494 	/* If the RWB was already empty, the flush can be completed right away */
1495 	if (!flush->num_req) {
1496 		ftl_complete_flush(flush);
1497 	}
1498 }
1499 
1500 int
1501 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
1502 {
1503 	struct ftl_flush *flush;
1504 
1505 	if (!dev || !cb_fn) {
1506 		return -EINVAL;
1507 	}
1508 
1509 	if (!dev->initialized) {
1510 		return -EBUSY;
1511 	}
1512 
1513 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
1514 	if (!flush) {
1515 		return -ENOMEM;
1516 	}
1517 
1518 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
1519 	return 0;
1520 }
1521 
1522 void
1523 ftl_process_anm_event(struct ftl_anm_event *event)
1524 {
1525 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Unconsumed ANM received for dev: %p...\n", event->dev);
1526 	ftl_anm_event_complete(event);
1527 }
1528 
1529 int
1530 ftl_task_read(void *ctx)
1531 {
1532 	struct ftl_thread *thread = ctx;
1533 	struct spdk_ftl_dev *dev = thread->dev;
1534 	struct spdk_nvme_qpair *qpair = ftl_get_read_qpair(dev);
1535 
1536 	if (dev->halt) {
1537 		if (ftl_shutdown_complete(dev)) {
1538 			spdk_poller_unregister(&thread->poller);
1539 			return 0;
1540 		}
1541 	}
1542 
1543 	return spdk_nvme_qpair_process_completions(qpair, 1);
1544 }
1545 
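/* Core thread poller: drives the write path (RWB flushing, band management), */
/* processes write/erase completions on the write qpair and kicks off */
/* relocation/defrag. Reads are handled by a separate poller (ftl_task_read) */
/* running on the read thread. */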
1546 int
1547 ftl_task_core(void *ctx)
1548 {
1549 	struct ftl_thread *thread = ctx;
1550 	struct spdk_ftl_dev *dev = thread->dev;
1551 	struct spdk_nvme_qpair *qpair = ftl_get_write_qpair(dev);
1552 
1553 	if (dev->halt) {
1554 		if (ftl_shutdown_complete(dev)) {
1555 			spdk_poller_unregister(&thread->poller);
1556 			return 0;
1557 		}
1558 	}
1559 
1560 	ftl_process_writes(dev);
1561 	spdk_nvme_qpair_process_completions(qpair, 1);
1562 	ftl_process_relocs(dev);
1563 
1564 	return 0;
1565 }
1566 
1567 SPDK_LOG_REGISTER_COMPONENT("ftl_core", SPDK_LOG_FTL_CORE)
1568