xref: /spdk/lib/ftl/ftl_core.c (revision ba23cec1820104cc710ad776f0127e1cf82033aa)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/thread.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk/string.h"
40 #include "spdk_internal/log.h"
41 #include "spdk/ftl.h"
42 #include "spdk/crc32.h"
43 
44 #include "ftl_core.h"
45 #include "ftl_band.h"
46 #include "ftl_io.h"
47 #include "ftl_debug.h"
48 #include "ftl_reloc.h"
49 
50 struct ftl_band_flush {
51 	struct spdk_ftl_dev		*dev;
52 	/* Number of bands left to be flushed */
53 	size_t				num_bands;
54 	/* User callback */
55 	spdk_ftl_fn			cb_fn;
56 	/* Callback's argument */
57 	void				*cb_arg;
58 	/* List link */
59 	LIST_ENTRY(ftl_band_flush)	list_entry;
60 };
61 
62 struct ftl_wptr {
63 	/* Owner device */
64 	struct spdk_ftl_dev		*dev;
65 
66 	/* Current address */
67 	struct ftl_addr			addr;
68 
69 	/* Band currently being written to */
70 	struct ftl_band			*band;
71 
72 	/* Current logical block's offset */
73 	uint64_t			offset;
74 
75 	/* Current zone */
76 	struct ftl_zone			*zone;
77 
78 	/* Pending IO queue */
79 	TAILQ_HEAD(, ftl_io)		pending_queue;
80 
81 	/* List link */
82 	LIST_ENTRY(ftl_wptr)		list_entry;
83 
84 	/*
85 	 * If set up in direct mode, there will be no offset or band state update after IO.
86 	 * The zoned bdev address is not assigned by wptr, and is instead taken directly
87 	 * from the request.
88 	 */
89 	bool				direct_mode;
90 
91 	/* Number of outstanding write requests */
92 	uint32_t			num_outstanding;
93 
94 	/* Marks that the band related to this wptr needs to be closed as soon as possible */
95 	bool				flush;
96 };
97 
98 struct ftl_flush {
99 	/* Owner device */
100 	struct spdk_ftl_dev		*dev;
101 
102 	/* Number of batches to wait for */
103 	size_t				num_req;
104 
105 	/* Callback */
106 	struct {
107 		spdk_ftl_fn		fn;
108 		void			*ctx;
109 	} cb;
110 
111 	/* Batch bitmap */
112 	struct spdk_bit_array		*bmap;
113 
114 	/* List link */
115 	LIST_ENTRY(ftl_flush)		list_entry;
116 };
117 
118 static void
119 ftl_wptr_free(struct ftl_wptr *wptr)
120 {
121 	if (!wptr) {
122 		return;
123 	}
124 
125 	free(wptr);
126 }
127 
128 static void
129 ftl_remove_wptr(struct ftl_wptr *wptr)
130 {
131 	struct spdk_ftl_dev *dev = wptr->dev;
132 	struct ftl_band_flush *flush, *tmp;
133 
134 	if (spdk_unlikely(wptr->flush)) {
135 		LIST_FOREACH_SAFE(flush, &dev->band_flush_list, list_entry, tmp) {
136 			assert(flush->num_bands > 0);
137 			if (--flush->num_bands == 0) {
138 				flush->cb_fn(flush->cb_arg, 0);
139 				LIST_REMOVE(flush, list_entry);
140 				free(flush);
141 			}
142 		}
143 	}
144 
145 	LIST_REMOVE(wptr, list_entry);
146 	ftl_wptr_free(wptr);
147 }
148 
149 static void ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry);
150 
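/*
 * Acquire a free write buffer entry from the IO channel's free queue.  For user IO
 * the per-channel qdepth_current counter is incremented first and the request is
 * rejected (NULL) once qdepth_limit is reached; internal IO (e.g. padding or
 * relocation) skips that accounting.  The entry is returned reset, with an invalid
 * address and LBA.
 */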
151 static struct ftl_wbuf_entry *
152 ftl_acquire_wbuf_entry(struct ftl_io_channel *io_channel, int io_flags)
153 {
154 	struct ftl_wbuf_entry *entry = NULL;
155 	uint32_t qdepth;
156 
157 	if (!(io_flags & FTL_IO_INTERNAL)) {
158 		qdepth = __atomic_fetch_add(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
159 		if (qdepth >= io_channel->qdepth_limit) {
160 			__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
161 			return NULL;
162 		}
163 	}
164 
165 	if (spdk_ring_dequeue(io_channel->free_queue, (void **)&entry, 1) != 1) {
166 		if (!(io_flags & FTL_IO_INTERNAL)) {
167 			__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
168 		}
169 
170 		return NULL;
171 	}
172 
173 	assert(entry != NULL);
174 
175 	ftl_evict_cache_entry(io_channel->dev, entry);
176 
177 	entry->io_flags = io_flags;
178 	entry->addr.offset = FTL_ADDR_INVALID;
179 	entry->lba = FTL_LBA_INVALID;
180 	entry->band = NULL;
181 	entry->valid = false;
182 
183 	return entry;
184 }
185 
186 static void
187 ftl_release_wbuf_entry(struct ftl_wbuf_entry *entry)
188 {
189 	struct ftl_io_channel *io_channel = entry->ioch;
190 
191 	if (!(entry->io_flags & FTL_IO_INTERNAL)) {
192 		__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
193 	}
194 
195 	spdk_ring_enqueue(io_channel->free_queue, (void **)&entry, 1, NULL);
196 }
197 
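/*
 * Assemble the next batch of xfer_size write buffer entries.  When no batch is in
 * progress, a previously pending batch is returned first; otherwise entries are
 * dequeued from the IO channels' submit queues, rotating the channel list for
 * fairness, and each block's LBA is stored in the batch metadata buffer.  Returns
 * NULL while the current batch is still incomplete.
 */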
198 static struct ftl_batch *
199 ftl_get_next_batch(struct spdk_ftl_dev *dev)
200 {
201 	struct ftl_batch *batch = dev->current_batch;
202 	struct ftl_io_channel *ioch;
203 #define FTL_DEQUEUE_ENTRIES 128
204 	struct ftl_wbuf_entry *entries[FTL_DEQUEUE_ENTRIES];
205 	TAILQ_HEAD(, ftl_io_channel) ioch_queue;
206 	size_t i, num_dequeued, num_remaining;
207 	uint64_t *metadata;
208 
209 	if (batch == NULL) {
210 		batch = TAILQ_FIRST(&dev->pending_batches);
211 		if (batch != NULL) {
212 			TAILQ_REMOVE(&dev->pending_batches, batch, tailq);
213 			return batch;
214 		}
215 
216 		batch = TAILQ_FIRST(&dev->free_batches);
217 		if (spdk_unlikely(batch == NULL)) {
218 			return NULL;
219 		}
220 
221 		assert(TAILQ_EMPTY(&batch->entries));
222 		assert(batch->num_entries == 0);
223 		TAILQ_REMOVE(&dev->free_batches, batch, tailq);
224 	}
225 
226 	/*
227 	 * Keep shifting the queue to ensure fairness in IO channel selection.  Each time
228 	 * ftl_get_next_batch() is called, we're starting to dequeue write buffer entries from a
229 	 * different IO channel.
230 	 */
231 	TAILQ_INIT(&ioch_queue);
232 	while (!TAILQ_EMPTY(&dev->ioch_queue)) {
233 		ioch = TAILQ_FIRST(&dev->ioch_queue);
234 		TAILQ_REMOVE(&dev->ioch_queue, ioch, tailq);
235 		TAILQ_INSERT_TAIL(&ioch_queue, ioch, tailq);
236 
237 		num_remaining = dev->xfer_size - batch->num_entries;
238 		while (num_remaining > 0) {
239 			num_dequeued = spdk_ring_dequeue(ioch->submit_queue, (void **)entries,
240 							 spdk_min(num_remaining,
241 									 FTL_DEQUEUE_ENTRIES));
242 			if (num_dequeued == 0) {
243 				break;
244 			}
245 
246 			for (i = 0; i < num_dequeued; ++i) {
247 				batch->iov[batch->num_entries + i].iov_base = entries[i]->payload;
248 				batch->iov[batch->num_entries + i].iov_len = FTL_BLOCK_SIZE;
249 
250 				if (batch->metadata != NULL) {
251 					metadata = (uint64_t *)((char *)batch->metadata +
252 								i * dev->md_size);
253 					*metadata = entries[i]->lba;
254 				}
255 
256 				TAILQ_INSERT_TAIL(&batch->entries, entries[i], tailq);
257 			}
258 
259 			batch->num_entries += num_dequeued;
260 			num_remaining -= num_dequeued;
261 		}
262 
263 		if (num_remaining == 0) {
264 			break;
265 		}
266 	}
267 
268 	TAILQ_CONCAT(&dev->ioch_queue, &ioch_queue, tailq);
269 
270 	if (batch->num_entries == dev->xfer_size) {
271 		dev->current_batch = NULL;
272 	} else {
273 		dev->current_batch = batch;
274 		batch = NULL;
275 	}
276 
277 	return batch;
278 }
279 
280 static void
281 ftl_release_batch(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
282 {
283 	struct ftl_wbuf_entry *entry;
284 
285 	while (!TAILQ_EMPTY(&batch->entries)) {
286 		entry = TAILQ_FIRST(&batch->entries);
287 		TAILQ_REMOVE(&batch->entries, entry, tailq);
288 		ftl_release_wbuf_entry(entry);
289 	}
290 
291 	batch->num_entries = 0;
292 	TAILQ_INSERT_TAIL(&dev->free_batches, batch, tailq);
293 }
294 
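/*
 * Translate a cached ftl_addr back to its write buffer entry.  The cache_offset
 * packs the IO channel index into the low ioch_shift bits and the entry index into
 * the remaining upper bits (see ftl_get_addr_from_entry() below).  For example
 * (illustrative values only), with ioch_shift == 5, entry 7 of channel 3 is encoded
 * as (7 << 5) | 3 == 227.
 */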
295 static struct ftl_wbuf_entry *
296 ftl_get_entry_from_addr(struct spdk_ftl_dev *dev, struct ftl_addr addr)
297 {
298 	struct ftl_io_channel *ioch;
299 	uint64_t ioch_offset, entry_offset;
300 
301 	ioch_offset = addr.cache_offset & ((1 << dev->ioch_shift) - 1);
302 	entry_offset = addr.cache_offset >> dev->ioch_shift;
303 	ioch = dev->ioch_array[ioch_offset];
304 
305 	assert(ioch_offset < dev->conf.max_io_channels);
306 	assert(entry_offset < ioch->num_entries);
307 	assert(addr.cached == 1);
308 
309 	return &ioch->wbuf_entries[entry_offset];
310 }
311 
312 static struct ftl_addr
313 ftl_get_addr_from_entry(struct ftl_wbuf_entry *entry)
314 {
315 	struct ftl_io_channel *ioch = entry->ioch;
316 	struct ftl_addr addr = {};
317 
318 	addr.cached = 1;
319 	addr.cache_offset = (uint64_t)entry->index << ioch->dev->ioch_shift | ioch->index;
320 
321 	return addr;
322 }
323 
324 static void
325 ftl_io_cmpl_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
326 {
327 	struct ftl_io *io = cb_arg;
328 	struct spdk_ftl_dev *dev = io->dev;
329 
330 	if (spdk_unlikely(!success)) {
331 		io->status = -EIO;
332 	}
333 
334 	ftl_trace_completion(dev, io, FTL_TRACE_COMPLETION_DISK);
335 
336 	if (io->type == FTL_IO_WRITE && ftl_is_append_supported(dev)) {
337 		assert(io->parent);
338 		io->parent->addr.offset = spdk_bdev_io_get_append_location(bdev_io);
339 	}
340 
341 	ftl_io_dec_req(io);
342 	if (ftl_io_done(io)) {
343 		ftl_io_complete(io);
344 	}
345 
346 	spdk_bdev_free_io(bdev_io);
347 }
348 
349 static void
350 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
351 {
352 	struct ftl_wptr *wptr = NULL;
353 
354 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
355 		if (wptr->band == band) {
356 			break;
357 		}
358 	}
359 
360 	/* If the band already has the high_prio flag set, other writes must */
361 	/* have failed earlier, so it's already taken care of. */
362 	if (band->high_prio) {
363 		assert(wptr == NULL);
364 		return;
365 	}
366 
367 	ftl_band_write_failed(band);
368 	ftl_remove_wptr(wptr);
369 }
370 
371 static struct ftl_wptr *
372 ftl_wptr_from_band(struct ftl_band *band)
373 {
374 	struct spdk_ftl_dev *dev = band->dev;
375 	struct ftl_wptr *wptr = NULL;
376 
377 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
378 		if (wptr->band == band) {
379 			return wptr;
380 		}
381 	}
382 
383 	return NULL;
384 }
385 
386 static void
387 ftl_md_write_fail(struct ftl_io *io, int status)
388 {
389 	struct ftl_band *band = io->band;
390 	struct ftl_wptr *wptr;
391 	char buf[128];
392 
393 	wptr = ftl_wptr_from_band(band);
394 	assert(wptr);
395 
396 	SPDK_ERRLOG("Metadata write failed @addr: %s, status: %d\n",
397 		    ftl_addr2str(wptr->addr, buf, sizeof(buf)), status);
398 
399 	ftl_halt_writes(io->dev, band);
400 }
401 
402 static void
403 ftl_md_write_cb(struct ftl_io *io, void *arg, int status)
404 {
405 	struct spdk_ftl_dev *dev = io->dev;
406 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
407 	struct ftl_band *band = io->band;
408 	struct ftl_wptr *wptr;
409 	size_t id;
410 
411 	wptr = ftl_wptr_from_band(band);
412 	assert(wptr);
413 
414 	if (status) {
415 		ftl_md_write_fail(io, status);
416 		return;
417 	}
418 
419 	ftl_band_set_next_state(band);
420 	if (band->state == FTL_BAND_STATE_CLOSED) {
421 		if (ftl_dev_has_nv_cache(dev)) {
422 			pthread_spin_lock(&nv_cache->lock);
423 			nv_cache->num_available += ftl_band_user_blocks(band);
424 
425 			if (spdk_unlikely(nv_cache->num_available > nv_cache->num_data_blocks)) {
426 				nv_cache->num_available = nv_cache->num_data_blocks;
427 			}
428 			pthread_spin_unlock(&nv_cache->lock);
429 		}
430 
431 		/*
432 		 * Go through the reloc_bitmap, checking for all the bands that had their data
433 		 * moved onto the current band, and update their counters to allow them to be
434 		 * used for writing (once they're closed and empty).
435 		 */
436 		for (id = 0; id < ftl_get_num_bands(dev); ++id) {
437 			if (spdk_bit_array_get(band->reloc_bitmap, id)) {
438 				assert(dev->bands[id].num_reloc_bands > 0);
439 				dev->bands[id].num_reloc_bands--;
440 
441 				spdk_bit_array_clear(band->reloc_bitmap, id);
442 			}
443 		}
444 
445 		ftl_remove_wptr(wptr);
446 	}
447 }
448 
449 static int
450 ftl_read_next_physical_addr(struct ftl_io *io, struct ftl_addr *addr)
451 {
452 	struct spdk_ftl_dev *dev = io->dev;
453 	size_t num_blocks, max_blocks;
454 
455 	assert(ftl_io_mode_physical(io));
456 	assert(io->iov_pos < io->iov_cnt);
457 
458 	if (io->pos == 0) {
459 		*addr = io->addr;
460 	} else {
461 		*addr = ftl_band_next_xfer_addr(io->band, io->addr, io->pos);
462 	}
463 
464 	assert(!ftl_addr_invalid(*addr));
465 
466 	/* Metadata has to be read in the way it's written (jumping across */
467 	/* the zones in xfer_size increments) */
468 	if (io->flags & FTL_IO_MD) {
469 		max_blocks = dev->xfer_size - (addr->offset % dev->xfer_size);
470 		num_blocks = spdk_min(ftl_io_iovec_len_left(io), max_blocks);
471 		assert(addr->offset / dev->xfer_size ==
472 		       (addr->offset + num_blocks - 1) / dev->xfer_size);
473 	} else {
474 		num_blocks = ftl_io_iovec_len_left(io);
475 	}
476 
477 	return num_blocks;
478 }
479 
480 static int
481 ftl_wptr_close_band(struct ftl_wptr *wptr)
482 {
483 	struct ftl_band *band = wptr->band;
484 
485 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
486 
487 	return ftl_band_write_tail_md(band, ftl_md_write_cb);
488 }
489 
490 static int
491 ftl_wptr_open_band(struct ftl_wptr *wptr)
492 {
493 	struct ftl_band *band = wptr->band;
494 
495 	assert(ftl_band_zone_is_first(band, wptr->zone));
496 	assert(band->lba_map.num_vld == 0);
497 
498 	ftl_band_clear_lba_map(band);
499 
500 	assert(band->state == FTL_BAND_STATE_PREP);
501 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
502 
503 	return ftl_band_write_head_md(band, ftl_md_write_cb);
504 }
505 
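/*
 * Walk the zones covered by the erase IO and issue a SPDK_BDEV_ZONE_RESET for each
 * of them, advancing the IO by one block per zone.  Submission stops on the first
 * error.
 */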
506 static int
507 ftl_submit_erase(struct ftl_io *io)
508 {
509 	struct spdk_ftl_dev *dev = io->dev;
510 	struct ftl_band *band = io->band;
511 	struct ftl_addr addr = io->addr;
512 	struct ftl_io_channel *ioch;
513 	struct ftl_zone *zone;
514 	int rc = 0;
515 	size_t i;
516 
517 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
518 
519 	for (i = 0; i < io->num_blocks; ++i) {
520 		if (i != 0) {
521 			zone = ftl_band_next_zone(band, ftl_band_zone_from_addr(band, addr));
522 			assert(zone->info.state == SPDK_BDEV_ZONE_STATE_FULL);
523 			addr.offset = zone->info.zone_id;
524 		}
525 
526 		assert(ftl_addr_get_zone_offset(dev, addr) == 0);
527 
528 		ftl_trace_submission(dev, io, addr, 1);
529 		rc = spdk_bdev_zone_management(dev->base_bdev_desc, ioch->base_ioch, addr.offset,
530 					       SPDK_BDEV_ZONE_RESET, ftl_io_cmpl_cb, io);
531 		if (spdk_unlikely(rc)) {
532 			ftl_io_fail(io, rc);
533 			SPDK_ERRLOG("Zone reset failed with status: %d\n", rc);
534 			break;
535 		}
536 
537 		ftl_io_inc_req(io);
538 		ftl_io_advance(io, 1);
539 	}
540 
541 	if (ftl_io_done(io)) {
542 		ftl_io_complete(io);
543 	}
544 
545 	return rc;
546 }
547 
548 static bool
549 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
550 {
551 	return dev->core_thread == spdk_get_thread();
552 }
553 
554 struct spdk_io_channel *
555 ftl_get_io_channel(const struct spdk_ftl_dev *dev)
556 {
557 	if (ftl_check_core_thread(dev)) {
558 		return dev->ioch;
559 	}
560 
561 	return NULL;
562 }
563 
564 static void
565 ftl_erase_fail(struct ftl_io *io, int status)
566 {
567 	struct ftl_zone *zone;
568 	struct ftl_band *band = io->band;
569 	char buf[128];
570 
571 	SPDK_ERRLOG("Erase failed at address: %s, status: %d\n",
572 		    ftl_addr2str(io->addr, buf, sizeof(buf)), status);
573 
574 	zone = ftl_band_zone_from_addr(band, io->addr);
575 	zone->info.state = SPDK_BDEV_ZONE_STATE_OFFLINE;
576 	ftl_band_remove_zone(band, zone);
577 	band->tail_md_addr = ftl_band_tail_md_addr(band);
578 }
579 
580 static void
581 ftl_zone_erase_cb(struct ftl_io *io, void *ctx, int status)
582 {
583 	struct ftl_zone *zone;
584 
585 	zone = ftl_band_zone_from_addr(io->band, io->addr);
586 	zone->busy = false;
587 
588 	if (spdk_unlikely(status)) {
589 		ftl_erase_fail(io, status);
590 		return;
591 	}
592 
593 	zone->info.state = SPDK_BDEV_ZONE_STATE_EMPTY;
594 	zone->info.write_pointer = zone->info.zone_id;
595 }
596 
597 static int
598 ftl_band_erase(struct ftl_band *band)
599 {
600 	struct ftl_zone *zone;
601 	struct ftl_io *io;
602 	int rc = 0;
603 
604 	assert(band->state == FTL_BAND_STATE_CLOSED ||
605 	       band->state == FTL_BAND_STATE_FREE);
606 
607 	ftl_band_set_state(band, FTL_BAND_STATE_PREP);
608 
609 	CIRCLEQ_FOREACH(zone, &band->zones, circleq) {
610 		if (zone->info.state == SPDK_BDEV_ZONE_STATE_EMPTY) {
611 			continue;
612 		}
613 
614 		io = ftl_io_erase_init(band, 1, ftl_zone_erase_cb);
615 		if (!io) {
616 			rc = -ENOMEM;
617 			break;
618 		}
619 
620 		zone->busy = true;
621 		io->addr.offset = zone->info.zone_id;
622 		rc = ftl_submit_erase(io);
623 		if (rc) {
624 			zone->busy = false;
625 			assert(0);
626 			/* TODO: change band's state back to close? */
627 			break;
628 		}
629 	}
630 
631 	return rc;
632 }
633 
634 static struct ftl_band *
635 ftl_next_write_band(struct spdk_ftl_dev *dev)
636 {
637 	struct ftl_band *band;
638 
639 	/* Find a free band that has all of its data moved onto other closed bands */
640 	LIST_FOREACH(band, &dev->free_bands, list_entry) {
641 		assert(band->state == FTL_BAND_STATE_FREE);
642 		if (band->num_reloc_bands == 0 && band->num_reloc_blocks == 0) {
643 			break;
644 		}
645 	}
646 
647 	if (spdk_unlikely(!band)) {
648 		return NULL;
649 	}
650 
651 	if (ftl_band_erase(band)) {
652 		/* TODO: handle erase failure */
653 		return NULL;
654 	}
655 
656 	return band;
657 }
658 
659 static struct ftl_band *
660 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
661 {
662 	struct ftl_band *band;
663 
664 	if (!dev->next_band) {
665 		band = ftl_next_write_band(dev);
666 	} else {
667 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
668 		band = dev->next_band;
669 		dev->next_band = NULL;
670 	}
671 
672 	return band;
673 }
674 
675 static struct ftl_wptr *
676 ftl_wptr_init(struct ftl_band *band)
677 {
678 	struct spdk_ftl_dev *dev = band->dev;
679 	struct ftl_wptr *wptr;
680 
681 	wptr = calloc(1, sizeof(*wptr));
682 	if (!wptr) {
683 		return NULL;
684 	}
685 
686 	wptr->dev = dev;
687 	wptr->band = band;
688 	wptr->zone = CIRCLEQ_FIRST(&band->zones);
689 	wptr->addr.offset = wptr->zone->info.zone_id;
690 	TAILQ_INIT(&wptr->pending_queue);
691 
692 	return wptr;
693 }
694 
695 static int
696 ftl_add_direct_wptr(struct ftl_band *band)
697 {
698 	struct spdk_ftl_dev *dev = band->dev;
699 	struct ftl_wptr *wptr;
700 
701 	assert(band->state == FTL_BAND_STATE_OPEN);
702 
703 	wptr = ftl_wptr_init(band);
704 	if (!wptr) {
705 		return -1;
706 	}
707 
708 	wptr->direct_mode = true;
709 
710 	if (ftl_band_alloc_lba_map(band)) {
711 		ftl_wptr_free(wptr);
712 		return -1;
713 	}
714 
715 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
716 
717 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: direct band %u\n", band->id);
718 	ftl_trace_write_band(dev, band);
719 	return 0;
720 }
721 
722 static void
723 ftl_close_direct_wptr(struct ftl_band *band)
724 {
725 	struct ftl_wptr *wptr = ftl_wptr_from_band(band);
726 
727 	assert(wptr);
728 	assert(wptr->direct_mode);
729 	assert(band->state == FTL_BAND_STATE_CLOSED);
730 
731 	ftl_band_release_lba_map(band);
732 
733 	ftl_remove_wptr(wptr);
734 }
735 
736 int
737 ftl_band_set_direct_access(struct ftl_band *band, bool access)
738 {
739 	if (access) {
740 		return ftl_add_direct_wptr(band);
741 	} else {
742 		ftl_close_direct_wptr(band);
743 		return 0;
744 	}
745 }
746 
747 static int
748 ftl_add_wptr(struct spdk_ftl_dev *dev)
749 {
750 	struct ftl_band *band;
751 	struct ftl_wptr *wptr;
752 
753 	band = ftl_next_wptr_band(dev);
754 	if (!band) {
755 		return -1;
756 	}
757 
758 	wptr = ftl_wptr_init(band);
759 	if (!wptr) {
760 		return -1;
761 	}
762 
763 	if (ftl_band_write_prep(band)) {
764 		ftl_wptr_free(wptr);
765 		return -1;
766 	}
767 
768 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
769 
770 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: band %u\n", band->id);
771 	ftl_trace_write_band(dev, band);
772 	return 0;
773 }
774 
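/*
 * Move the write pointer forward by xfer_size blocks: mark the current zone busy,
 * step to the next transfer address and operational zone, flag the band FULL once
 * all of it has been written, and start preparing the next band when the configured
 * band_thld percentage of the band has been consumed.
 */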
775 static void
776 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
777 {
778 	struct ftl_band *band = wptr->band;
779 	struct spdk_ftl_dev *dev = wptr->dev;
780 	struct spdk_ftl_conf *conf = &dev->conf;
781 	size_t next_thld;
782 
783 	if (spdk_unlikely(wptr->direct_mode)) {
784 		return;
785 	}
786 
787 	wptr->offset += xfer_size;
788 	next_thld = (ftl_band_num_usable_blocks(band) * conf->band_thld) / 100;
789 
790 	if (ftl_band_full(band, wptr->offset)) {
791 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
792 	}
793 
794 	wptr->zone->busy = true;
795 	wptr->addr = ftl_band_next_xfer_addr(band, wptr->addr, xfer_size);
796 	wptr->zone = ftl_band_next_operational_zone(band, wptr->zone);
797 
798 	assert(!ftl_addr_invalid(wptr->addr));
799 
800 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: pu:%lu band:%lu, offset:%lu\n",
801 		      ftl_addr_get_punit(dev, wptr->addr),
802 		      ftl_addr_get_band(dev, wptr->addr),
803 		      wptr->addr.offset);
804 
805 	if (wptr->offset >= next_thld && !dev->next_band) {
806 		dev->next_band = ftl_next_write_band(dev);
807 	}
808 }
809 
810 static size_t
811 ftl_wptr_user_blocks_left(const struct ftl_wptr *wptr)
812 {
813 	return ftl_band_user_blocks_left(wptr->band, wptr->offset);
814 }
815 
816 static bool
817 ftl_wptr_ready(struct ftl_wptr *wptr)
818 {
819 	struct ftl_band *band = wptr->band;
820 
821 	/* TODO: add handling of empty bands */
822 
823 	if (spdk_unlikely(!ftl_zone_is_writable(wptr->dev, wptr->zone))) {
824 		/* Erasing the band may fail after it was assigned to the wptr. */
825 		if (spdk_unlikely(wptr->zone->info.state == SPDK_BDEV_ZONE_STATE_OFFLINE)) {
826 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
827 		}
828 		return false;
829 	}
830 
831 	/* If we're in the process of writing metadata, wait till it is */
832 	/* completed. */
833 	/* TODO: we should probably change bands once we're writing tail md */
834 	if (ftl_band_state_changing(band)) {
835 		return false;
836 	}
837 
838 	if (band->state == FTL_BAND_STATE_FULL) {
839 		if (wptr->num_outstanding == 0) {
840 			if (ftl_wptr_close_band(wptr)) {
841 				/* TODO: need recovery here */
842 				assert(false);
843 			}
844 		}
845 
846 		return false;
847 	}
848 
849 	if (band->state != FTL_BAND_STATE_OPEN) {
850 		if (ftl_wptr_open_band(wptr)) {
851 			/* TODO: need recovery here */
852 			assert(false);
853 		}
854 
855 		return false;
856 	}
857 
858 	return true;
859 }
860 
861 int
862 ftl_flush_active_bands(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
863 {
864 	struct ftl_wptr *wptr;
865 	struct ftl_band_flush *flush;
866 
867 	assert(ftl_get_core_thread(dev) == spdk_get_thread());
868 
869 	flush = calloc(1, sizeof(*flush));
870 	if (spdk_unlikely(!flush)) {
871 		return -ENOMEM;
872 	}
873 
874 	LIST_INSERT_HEAD(&dev->band_flush_list, flush, list_entry);
875 
876 	flush->cb_fn = cb_fn;
877 	flush->cb_arg = cb_arg;
878 	flush->dev = dev;
879 
880 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
881 		wptr->flush = true;
882 		flush->num_bands++;
883 	}
884 
885 	return 0;
886 }
887 
888 static const struct spdk_ftl_limit *
889 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
890 {
891 	assert(type < SPDK_FTL_LIMIT_MAX);
892 	return &dev->conf.limits[type];
893 }
894 
895 static bool
896 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry)
897 {
898 	struct ftl_addr addr;
899 
900 	/* If the LBA is invalid don't bother checking the md and l2p */
901 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
902 		return false;
903 	}
904 
905 	addr = ftl_l2p_get(dev, entry->lba);
906 	if (!(ftl_addr_cached(addr) && entry == ftl_get_entry_from_addr(dev, addr))) {
907 		return false;
908 	}
909 
910 	return true;
911 }
912 
913 static void
914 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry)
915 {
916 	pthread_spin_lock(&entry->lock);
917 
918 	if (!entry->valid) {
919 		goto unlock;
920 	}
921 
922 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
923 	/* on-disk address and clear the cache status bit. Otherwise, skip the l2p update */
924 	/* and just clear the cache status. */
925 	if (!ftl_cache_lba_valid(dev, entry)) {
926 		goto clear;
927 	}
928 
929 	ftl_l2p_set(dev, entry->lba, entry->addr);
930 clear:
931 	entry->valid = false;
932 unlock:
933 	pthread_spin_unlock(&entry->lock);
934 }
935 
936 static void
937 ftl_pad_wbuf(struct spdk_ftl_dev *dev, size_t size)
938 {
939 	struct ftl_wbuf_entry *entry;
940 	struct ftl_io_channel *ioch;
941 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
942 
943 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
944 
945 	for (size_t i = 0; i < size; ++i) {
946 		entry = ftl_acquire_wbuf_entry(ioch, flags);
947 		if (!entry) {
948 			break;
949 		}
950 
951 		entry->lba = FTL_LBA_INVALID;
952 		entry->addr = ftl_to_addr(FTL_ADDR_INVALID);
953 		memset(entry->payload, 0, FTL_BLOCK_SIZE);
954 
955 		spdk_ring_enqueue(ioch->submit_queue, (void **)&entry, 1, NULL);
956 	}
957 }
958 
959 static void
960 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
961 {
962 	while (!LIST_EMPTY(&dev->free_bands)) {
963 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
964 	}
965 
966 	dev->next_band = NULL;
967 }
968 
969 static void
970 ftl_wptr_pad_band(struct ftl_wptr *wptr)
971 {
972 	struct spdk_ftl_dev *dev = wptr->dev;
973 	struct ftl_batch *batch = dev->current_batch;
974 	struct ftl_io_channel *ioch;
975 	size_t size, pad_size, blocks_left;
976 
977 	size = batch != NULL ? batch->num_entries : 0;
978 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
979 		size += spdk_ring_count(ioch->submit_queue);
980 	}
981 
982 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
983 
984 	blocks_left = ftl_wptr_user_blocks_left(wptr);
985 	assert(size <= blocks_left);
986 	assert(blocks_left % dev->xfer_size == 0);
987 	pad_size = spdk_min(blocks_left - size, spdk_ring_count(ioch->free_queue));
988 
989 	ftl_pad_wbuf(dev, pad_size);
990 }
991 
992 static void
993 ftl_wptr_process_shutdown(struct ftl_wptr *wptr)
994 {
995 	struct spdk_ftl_dev *dev = wptr->dev;
996 	struct ftl_batch *batch = dev->current_batch;
997 	struct ftl_io_channel *ioch;
998 	size_t size;
999 
1000 	size = batch != NULL ? batch->num_entries : 0;
1001 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1002 		size += spdk_ring_count(ioch->submit_queue);
1003 	}
1004 
1005 	if (size >= dev->xfer_size) {
1006 		return;
1007 	}
1008 
1009 	/* If we reach this point we need to remove free bands */
1010 	/* and pad current wptr band to the end */
1011 	ftl_remove_free_bands(dev);
1012 	ftl_wptr_pad_band(wptr);
1013 }
1014 
1015 static int
1016 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
1017 {
1018 	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(dev->ioch);
1019 
1020 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
1021 	       dev->num_io_channels == 1 && LIST_EMPTY(&dev->wptr_list) &&
1022 	       TAILQ_EMPTY(&ioch->retry_queue);
1023 }
1024 
1025 void
1026 ftl_apply_limits(struct spdk_ftl_dev *dev)
1027 {
1028 	const struct spdk_ftl_limit *limit;
1029 	struct ftl_io_channel *ioch;
1030 	struct ftl_stats *stats = &dev->stats;
1031 	uint32_t qdepth_limit = 100;
1032 	int i;
1033 
1034 	/* Clear existing limit */
1035 	dev->limit = SPDK_FTL_LIMIT_MAX;
1036 
1037 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
1038 		limit = ftl_get_limit(dev, i);
1039 
1040 		if (dev->num_free <= limit->thld) {
1041 			qdepth_limit = limit->limit;
1042 			stats->limits[i]++;
1043 			dev->limit = i;
1044 			break;
1045 		}
1046 	}
1047 
1048 	ftl_trace_limits(dev, dev->limit, dev->num_free);
1049 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1050 		__atomic_store_n(&ioch->qdepth_limit, (qdepth_limit * ioch->num_entries) / 100,
1051 				 __ATOMIC_SEQ_CST);
1052 	}
1053 }
1054 
1055 static int
1056 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_addr addr)
1057 {
1058 	struct ftl_band *band = ftl_band_from_addr(dev, addr);
1059 	struct ftl_lba_map *lba_map = &band->lba_map;
1060 	uint64_t offset;
1061 
1062 	offset = ftl_band_block_offset_from_addr(band, addr);
1063 
1064 	/* The bit might be already cleared if two writes are scheduled to the */
1065 	/* same LBA at the same time */
1066 	if (spdk_bit_array_get(lba_map->vld, offset)) {
1067 		assert(lba_map->num_vld > 0);
1068 		spdk_bit_array_clear(lba_map->vld, offset);
1069 		lba_map->num_vld--;
1070 		return 1;
1071 	}
1072 
1073 	return 0;
1074 }
1075 
1076 int
1077 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_addr addr)
1078 {
1079 	struct ftl_band *band;
1080 	int rc;
1081 
1082 	assert(!ftl_addr_cached(addr));
1083 	band = ftl_band_from_addr(dev, addr);
1084 
1085 	pthread_spin_lock(&band->lba_map.lock);
1086 	rc = ftl_invalidate_addr_unlocked(dev, addr);
1087 	pthread_spin_unlock(&band->lba_map.lock);
1088 
1089 	return rc;
1090 }
1091 
1092 static int
1093 ftl_read_retry(int rc)
1094 {
1095 	return rc == -EAGAIN;
1096 }
1097 
1098 static int
1099 ftl_read_canceled(int rc)
1100 {
1101 	return rc == -EFAULT || rc == 0;
1102 }
1103 
1104 static int
1105 ftl_cache_read(struct ftl_io *io, uint64_t lba,
1106 	       struct ftl_addr addr, void *buf)
1107 {
1108 	struct ftl_wbuf_entry *entry;
1109 	struct ftl_addr naddr;
1110 	int rc = 0;
1111 
1112 	entry = ftl_get_entry_from_addr(io->dev, addr);
1113 	pthread_spin_lock(&entry->lock);
1114 
1115 	naddr = ftl_l2p_get(io->dev, lba);
1116 	if (addr.offset != naddr.offset) {
1117 		rc = -1;
1118 		goto out;
1119 	}
1120 
1121 	memcpy(buf, entry->payload, FTL_BLOCK_SIZE);
1122 out:
1123 	pthread_spin_unlock(&entry->lock);
1124 	return rc;
1125 }
1126 
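/*
 * Resolve the physical location of the next chunk of a logical read.  Returns the
 * number of consecutive blocks that can be read with a single request, 0 when the
 * block was served from the write buffer, -EFAULT when the LBA isn't mapped (the
 * caller's buffer is assumed to be zeroed), or -EAGAIN when the cached entry
 * changed and the L2P needs to be consulted again.
 */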
1127 static int
1128 ftl_read_next_logical_addr(struct ftl_io *io, struct ftl_addr *addr)
1129 {
1130 	struct spdk_ftl_dev *dev = io->dev;
1131 	struct ftl_addr next_addr;
1132 	size_t i;
1133 
1134 	*addr = ftl_l2p_get(dev, ftl_io_current_lba(io));
1135 
1136 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Read addr:%lx, lba:%lu\n",
1137 		      addr->offset, ftl_io_current_lba(io));
1138 
1139 	/* If the address is invalid, skip it (the buffer should already be zeroed) */
1140 	if (ftl_addr_invalid(*addr)) {
1141 		return -EFAULT;
1142 	}
1143 
1144 	if (ftl_addr_cached(*addr)) {
1145 		if (!ftl_cache_read(io, ftl_io_current_lba(io), *addr, ftl_io_iovec_addr(io))) {
1146 			return 0;
1147 		}
1148 
1149 		/* If the state changed, we have to re-read the l2p */
1150 		return -EAGAIN;
1151 	}
1152 
1153 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
1154 		next_addr = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));
1155 
1156 		if (ftl_addr_invalid(next_addr) || ftl_addr_cached(next_addr)) {
1157 			break;
1158 		}
1159 
1160 		if (addr->offset + i != next_addr.offset) {
1161 			break;
1162 		}
1163 	}
1164 
1165 	return i;
1166 }
1167 
1168 static int
1169 ftl_submit_read(struct ftl_io *io)
1170 {
1171 	struct spdk_ftl_dev *dev = io->dev;
1172 	struct ftl_io_channel *ioch;
1173 	struct ftl_addr addr;
1174 	int rc = 0, num_blocks;
1175 
1176 	ioch = ftl_io_channel_get_ctx(io->ioch);
1177 
1178 	assert(LIST_EMPTY(&io->children));
1179 
1180 	while (io->pos < io->num_blocks) {
1181 		if (ftl_io_mode_physical(io)) {
1182 			num_blocks = rc = ftl_read_next_physical_addr(io, &addr);
1183 		} else {
1184 			num_blocks = rc = ftl_read_next_logical_addr(io, &addr);
1185 		}
1186 
1187 		/* We might need to retry the read from scratch (e.g. */
1188 		/* because a write was under way and completed before */
1189 		/* we could read it from the write buffer) */
1190 		if (ftl_read_retry(rc)) {
1191 			continue;
1192 		}
1193 
1194 		/* We don't have to schedule the read, as it was read from cache */
1195 		if (ftl_read_canceled(rc)) {
1196 			ftl_io_advance(io, 1);
1197 			ftl_trace_completion(io->dev, io, rc ? FTL_TRACE_COMPLETION_INVALID :
1198 					     FTL_TRACE_COMPLETION_CACHE);
1199 			rc = 0;
1200 			continue;
1201 		}
1202 
1203 		assert(num_blocks > 0);
1204 
1205 		ftl_trace_submission(dev, io, addr, num_blocks);
1206 		rc = spdk_bdev_read_blocks(dev->base_bdev_desc, ioch->base_ioch,
1207 					   ftl_io_iovec_addr(io),
1208 					   addr.offset,
1209 					   num_blocks, ftl_io_cmpl_cb, io);
1210 		if (spdk_unlikely(rc)) {
1211 			if (rc == -ENOMEM) {
1212 				TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
1213 				rc = 0;
1214 			} else {
1215 				ftl_io_fail(io, rc);
1216 			}
1217 			break;
1218 		}
1219 
1220 		ftl_io_inc_req(io);
1221 		ftl_io_advance(io, num_blocks);
1222 	}
1223 
1224 	/* If we didn't have to read anything from the device, */
1225 	/* complete the request right away */
1226 	if (ftl_io_done(io)) {
1227 		ftl_io_complete(io);
1228 	}
1229 
1230 	return rc;
1231 }
1232 
1233 static void
1234 ftl_complete_flush(struct ftl_flush *flush)
1235 {
1236 	assert(flush->num_req == 0);
1237 	LIST_REMOVE(flush, list_entry);
1238 
1239 	flush->cb.fn(flush->cb.ctx, 0);
1240 
1241 	spdk_bit_array_free(&flush->bmap);
1242 	free(flush);
1243 }
1244 
1245 static void
1246 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
1247 {
1248 	struct ftl_flush *flush, *tflush;
1249 	size_t offset;
1250 
1251 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
1252 		offset = batch->index;
1253 
1254 		if (spdk_bit_array_get(flush->bmap, offset)) {
1255 			spdk_bit_array_clear(flush->bmap, offset);
1256 			if (!(--flush->num_req)) {
1257 				ftl_complete_flush(flush);
1258 			}
1259 		}
1260 	}
1261 }
1262 
1263 static void
1264 ftl_nv_cache_wrap_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1265 {
1266 	struct ftl_nv_cache *nv_cache = cb_arg;
1267 
1268 	if (!success) {
1269 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header\n");
1270 		/* TODO: go into read-only mode */
1271 		assert(0);
1272 	}
1273 
1274 	pthread_spin_lock(&nv_cache->lock);
1275 	nv_cache->ready = true;
1276 	pthread_spin_unlock(&nv_cache->lock);
1277 
1278 	spdk_bdev_free_io(bdev_io);
1279 }
1280 
1281 static void
1282 ftl_nv_cache_wrap(void *ctx)
1283 {
1284 	struct ftl_nv_cache *nv_cache = ctx;
1285 	int rc;
1286 
1287 	rc = ftl_nv_cache_write_header(nv_cache, false, ftl_nv_cache_wrap_cb, nv_cache);
1288 	if (spdk_unlikely(rc != 0)) {
1289 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header: %s\n",
1290 			    spdk_strerror(-rc));
1291 		/* TODO: go into read-only mode */
1292 		assert(0);
1293 	}
1294 }
1295 
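/*
 * Reserve up to *num_blocks blocks of the non-volatile cache, limited by what is
 * currently available and by conf.nv_cache.max_request_cnt.  Returns the starting
 * cache address (FTL_LBA_INVALID if nothing could be reserved) and the current
 * phase.  When the end of the cache bdev is reached, the address wraps back to
 * FTL_NV_CACHE_DATA_OFFSET and a new header is written from the core thread.
 */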
1296 static uint64_t
1297 ftl_reserve_nv_cache(struct ftl_nv_cache *nv_cache, size_t *num_blocks, unsigned int *phase)
1298 {
1299 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1300 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1301 	uint64_t num_available, cache_size, cache_addr = FTL_LBA_INVALID;
1302 
1303 	cache_size = spdk_bdev_get_num_blocks(bdev);
1304 
1305 	pthread_spin_lock(&nv_cache->lock);
1306 	if (spdk_unlikely(nv_cache->num_available == 0 || !nv_cache->ready)) {
1307 		goto out;
1308 	}
1309 
1310 	num_available = spdk_min(nv_cache->num_available, *num_blocks);
1311 	num_available = spdk_min(num_available, dev->conf.nv_cache.max_request_cnt);
1312 
1313 	if (spdk_unlikely(nv_cache->current_addr + num_available > cache_size)) {
1314 		*num_blocks = cache_size - nv_cache->current_addr;
1315 	} else {
1316 		*num_blocks = num_available;
1317 	}
1318 
1319 	cache_addr = nv_cache->current_addr;
1320 	nv_cache->current_addr += *num_blocks;
1321 	nv_cache->num_available -= *num_blocks;
1322 	*phase = nv_cache->phase;
1323 
1324 	if (nv_cache->current_addr == spdk_bdev_get_num_blocks(bdev)) {
1325 		nv_cache->current_addr = FTL_NV_CACHE_DATA_OFFSET;
1326 		nv_cache->phase = ftl_nv_cache_next_phase(nv_cache->phase);
1327 		nv_cache->ready = false;
1328 		spdk_thread_send_msg(ftl_get_core_thread(dev), ftl_nv_cache_wrap, nv_cache);
1329 	}
1330 out:
1331 	pthread_spin_unlock(&nv_cache->lock);
1332 	return cache_addr;
1333 }
1334 
1335 static struct ftl_io *
1336 ftl_alloc_io_nv_cache(struct ftl_io *parent, size_t num_blocks)
1337 {
1338 	struct ftl_io_init_opts opts = {
1339 		.dev		= parent->dev,
1340 		.parent		= parent,
1341 		.iovcnt		= 0,
1342 		.num_blocks	= num_blocks,
1343 		.flags		= parent->flags | FTL_IO_CACHE,
1344 	};
1345 
1346 	return ftl_io_init_internal(&opts);
1347 }
1348 
1349 static void
1350 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1351 {
1352 	struct ftl_io *io = cb_arg;
1353 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1354 
1355 	if (spdk_unlikely(!success)) {
1356 		SPDK_ERRLOG("Non-volatile cache write failed at %"PRIx64"\n", io->addr.offset);
1357 		io->status = -EIO;
1358 	}
1359 
1360 	ftl_io_dec_req(io);
1361 	if (ftl_io_done(io)) {
1362 		spdk_mempool_put(nv_cache->md_pool, io->md);
1363 		ftl_io_complete(io);
1364 	}
1365 
1366 	spdk_bdev_free_io(bdev_io);
1367 }
1368 
1369 static void
1370 ftl_submit_nv_cache(void *ctx)
1371 {
1372 	struct ftl_io *io = ctx;
1373 	struct spdk_ftl_dev *dev = io->dev;
1374 	struct spdk_thread *thread;
1375 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1376 	struct ftl_io_channel *ioch;
1377 	int rc;
1378 
1379 	ioch = ftl_io_channel_get_ctx(io->ioch);
1380 	thread = spdk_io_channel_get_thread(io->ioch);
1381 
1382 	rc = spdk_bdev_write_blocks_with_md(nv_cache->bdev_desc, ioch->cache_ioch,
1383 					    ftl_io_iovec_addr(io), io->md, io->addr.offset,
1384 					    io->num_blocks, ftl_nv_cache_submit_cb, io);
1385 	if (rc == -ENOMEM) {
1386 		spdk_thread_send_msg(thread, ftl_submit_nv_cache, io);
1387 		return;
1388 	} else if (rc) {
1389 		SPDK_ERRLOG("Write to persistent cache failed: %s (%"PRIu64", %"PRIu64")\n",
1390 			    spdk_strerror(-rc), io->addr.offset, io->num_blocks);
1391 		spdk_mempool_put(nv_cache->md_pool, io->md);
1392 		io->status = -EIO;
1393 		ftl_io_complete(io);
1394 		return;
1395 	}
1396 
1397 	ftl_io_advance(io, io->num_blocks);
1398 	ftl_io_inc_req(io);
1399 }
1400 
1401 static void
1402 ftl_nv_cache_fill_md(struct ftl_io *io, unsigned int phase)
1403 {
1404 	struct spdk_bdev *bdev;
1405 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1406 	uint64_t block_off, lba;
1407 	void *md_buf = io->md;
1408 
1409 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1410 
1411 	for (block_off = 0; block_off < io->num_blocks; ++block_off) {
1412 		lba = ftl_nv_cache_pack_lba(ftl_io_get_lba(io, block_off), phase);
1413 		memcpy(md_buf, &lba, sizeof(lba));
1414 		md_buf += spdk_bdev_get_md_size(bdev);
1415 	}
1416 }
1417 
1418 static void
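/*
 * Mirror a user write into the non-volatile cache.  The parent IO is split into
 * child IOs, each with its own metadata buffer and reserved cache region; when any
 * of those resources is unavailable, processing of the remaining blocks is retried
 * later via a message to the owning thread.
 */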
1419 _ftl_write_nv_cache(void *ctx)
1420 {
1421 	struct ftl_io *child, *io = ctx;
1422 	struct spdk_ftl_dev *dev = io->dev;
1423 	struct spdk_thread *thread;
1424 	unsigned int phase;
1425 	uint64_t num_blocks;
1426 
1427 	thread = spdk_io_channel_get_thread(io->ioch);
1428 
1429 	while (io->pos < io->num_blocks) {
1430 		num_blocks = ftl_io_iovec_len_left(io);
1431 
1432 		child = ftl_alloc_io_nv_cache(io, num_blocks);
1433 		if (spdk_unlikely(!child)) {
1434 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1435 			return;
1436 		}
1437 
1438 		child->md = spdk_mempool_get(dev->nv_cache.md_pool);
1439 		if (spdk_unlikely(!child->md)) {
1440 			ftl_io_free(child);
1441 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1442 			break;
1443 		}
1444 
1445 		/* Reserve area on the non-volatile cache */
1446 		child->addr.offset = ftl_reserve_nv_cache(&dev->nv_cache, &num_blocks, &phase);
1447 		if (child->addr.offset == FTL_LBA_INVALID) {
1448 			spdk_mempool_put(dev->nv_cache.md_pool, child->md);
1449 			ftl_io_free(child);
1450 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1451 			break;
1452 		}
1453 
1454 		/* Shrink the IO if there isn't enough room in the cache to fill the whole iovec */
1455 		if (spdk_unlikely(num_blocks != ftl_io_iovec_len_left(io))) {
1456 			ftl_io_shrink_iovec(child, num_blocks);
1457 		}
1458 
1459 		ftl_nv_cache_fill_md(child, phase);
1460 		ftl_submit_nv_cache(child);
1461 	}
1462 
1463 	if (ftl_io_done(io)) {
1464 		ftl_io_complete(io);
1465 	}
1466 }
1467 
1468 static void
1469 ftl_write_nv_cache(struct ftl_io *parent)
1470 {
1471 	ftl_io_reset(parent);
1472 	parent->flags |= FTL_IO_CACHE;
1473 	_ftl_write_nv_cache(parent);
1474 }
1475 
1476 int
1477 ftl_nv_cache_write_header(struct ftl_nv_cache *nv_cache, bool shutdown,
1478 			  spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1479 {
1480 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1481 	struct ftl_nv_cache_header *hdr = nv_cache->dma_buf;
1482 	struct spdk_bdev *bdev;
1483 	struct ftl_io_channel *ioch;
1484 
1485 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1486 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
1487 
1488 	memset(hdr, 0, spdk_bdev_get_block_size(bdev));
1489 
1490 	hdr->phase = (uint8_t)nv_cache->phase;
1491 	hdr->size = spdk_bdev_get_num_blocks(bdev);
1492 	hdr->uuid = dev->uuid;
1493 	hdr->version = FTL_NV_CACHE_HEADER_VERSION;
1494 	hdr->current_addr = shutdown ? nv_cache->current_addr : FTL_LBA_INVALID;
1495 	hdr->checksum = spdk_crc32c_update(hdr, offsetof(struct ftl_nv_cache_header, checksum), 0);
1496 
1497 	return spdk_bdev_write_blocks(nv_cache->bdev_desc, ioch->cache_ioch, hdr, 0, 1,
1498 				      cb_fn, cb_arg);
1499 }
1500 
1501 int
1502 ftl_nv_cache_scrub(struct ftl_nv_cache *nv_cache, spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1503 {
1504 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1505 	struct ftl_io_channel *ioch;
1506 	struct spdk_bdev *bdev;
1507 
1508 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
1509 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1510 
1511 	return spdk_bdev_write_zeroes_blocks(nv_cache->bdev_desc, ioch->cache_ioch, 1,
1512 					     spdk_bdev_get_num_blocks(bdev) - 1,
1513 					     cb_fn, cb_arg);
1514 }
1515 
1516 static void
1517 ftl_write_fail(struct ftl_io *io, int status)
1518 {
1519 	struct ftl_batch *batch = io->batch;
1520 	struct spdk_ftl_dev *dev = io->dev;
1521 	struct ftl_wbuf_entry *entry;
1522 	struct ftl_band *band;
1523 	char buf[128];
1524 
1525 	entry = TAILQ_FIRST(&batch->entries);
1526 
1527 	band = ftl_band_from_addr(io->dev, entry->addr);
1528 	SPDK_ERRLOG("Write failed @addr: %s, status: %d\n",
1529 		    ftl_addr2str(entry->addr, buf, sizeof(buf)), status);
1530 
1531 	/* Close the band, halt wptr and defrag */
1532 	ftl_halt_writes(dev, band);
1533 
1534 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1535 		/* Invalidate metadata set by ftl_wptr_process_writes() */
1536 		ftl_invalidate_addr(dev, entry->addr);
1537 	}
1538 
1539 	/* Reset the batch back to the write buffer to resend it later */
1540 	TAILQ_INSERT_TAIL(&dev->pending_batches, batch, tailq);
1541 }
1542 
1543 static void
1544 ftl_write_cb(struct ftl_io *io, void *arg, int status)
1545 {
1546 	struct spdk_ftl_dev *dev = io->dev;
1547 	struct ftl_batch *batch = io->batch;
1548 	struct ftl_wbuf_entry *entry;
1549 	struct ftl_band *band;
1550 	struct ftl_addr prev_addr, addr = io->addr;
1551 
1552 	if (status) {
1553 		ftl_write_fail(io, status);
1554 		return;
1555 	}
1556 
1557 	assert(io->num_blocks == dev->xfer_size);
1558 	assert(!(io->flags & FTL_IO_MD));
1559 
1560 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1561 		band = entry->band;
1562 		if (!(entry->io_flags & FTL_IO_PAD)) {
1563 			/* Verify that the LBA is set for user blocks */
1564 			assert(entry->lba != FTL_LBA_INVALID);
1565 		}
1566 
1567 		if (band != NULL) {
1568 			assert(band->num_reloc_blocks > 0);
1569 			band->num_reloc_blocks--;
1570 		}
1571 
1572 		entry->addr = addr;
1573 		if (entry->lba != FTL_LBA_INVALID) {
1574 			pthread_spin_lock(&entry->lock);
1575 			prev_addr = ftl_l2p_get(dev, entry->lba);
1576 
1577 			/* If the l2p was updated in the meantime, don't update band's metadata */
1578 			if (ftl_addr_cached(prev_addr) &&
1579 			    entry == ftl_get_entry_from_addr(dev, prev_addr)) {
1580 				/* Setting entry's cache bit needs to be done after metadata */
1581 				/* within the band is updated to make sure that writes */
1582 				/* invalidating the entry clear the metadata as well */
1583 				ftl_band_set_addr(io->band, entry->lba, entry->addr);
1584 				entry->valid = true;
1585 			}
1586 			pthread_spin_unlock(&entry->lock);
1587 		}
1588 
1589 		SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write addr:%lu, lba:%lu\n",
1590 			      entry->addr.offset, entry->lba);
1591 
1592 		addr = ftl_band_next_addr(io->band, addr, 1);
1593 	}
1594 
1595 	ftl_process_flush(dev, batch);
1596 	ftl_release_batch(dev, batch);
1597 }
1598 
1599 static void
1600 ftl_update_stats(struct spdk_ftl_dev *dev, const struct ftl_wbuf_entry *entry)
1601 {
1602 	if (!(entry->io_flags & FTL_IO_INTERNAL)) {
1603 		dev->stats.write_user++;
1604 	}
1605 	dev->stats.write_total++;
1606 }
1607 
1608 static void
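/*
 * Point the L2P entry for the written LBA at the new address.  Weak writes
 * (relocation) must not overwrite data that the user has updated in the meantime,
 * so the previous mapping is re-checked under the cache entry's or band's lock and
 * the update is skipped when it no longer matches what the relocation expected.
 */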
1609 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_wbuf_entry *entry,
1610 	       struct ftl_addr addr)
1611 {
1612 	struct ftl_addr prev_addr;
1613 	struct ftl_wbuf_entry *prev;
1614 	struct ftl_band *band;
1615 	int valid;
1616 	bool io_weak = entry->io_flags & FTL_IO_WEAK;
1617 
1618 	prev_addr = ftl_l2p_get(dev, entry->lba);
1619 	if (ftl_addr_invalid(prev_addr)) {
1620 		ftl_l2p_set(dev, entry->lba, addr);
1621 		return;
1622 	}
1623 
1624 	if (ftl_addr_cached(prev_addr)) {
1625 		prev = ftl_get_entry_from_addr(dev, prev_addr);
1626 		pthread_spin_lock(&prev->lock);
1627 
1628 		/* Re-read the L2P under the lock to protect against updates */
1629 		/* to this LBA from other threads */
1630 		prev_addr = ftl_l2p_get(dev, entry->lba);
1631 
1632 		/* If the entry is no longer in cache, another write has been */
1633 		/* scheduled in the meantime, so we can return to evicted path */
1634 		if (!ftl_addr_cached(prev_addr)) {
1635 			pthread_spin_unlock(&prev->lock);
1636 			goto evicted;
1637 		}
1638 
1639 		/*
1640 		 * A relocated block could still reside in the cache, because write
1641 		 * buffers are independent for each IO channel and enough data
1642 		 * (a full write unit) must be collected before it is submitted to the
1643 		 * lower layer.
1644 		 * If the previous entry wasn't overwritten, invalidate the old address and entry.
1645 		 * Otherwise, skip relocating the block.
1646 		 */
1647 		if (io_weak &&
1648 		    /* Check if prev_addr was updated in the meantime */
1649 		    !(ftl_addr_cmp(prev_addr, ftl_get_addr_from_entry(prev)) &&
1650 		      /* Check if the relocated address is the same as in the previous entry */
1651 		      ftl_addr_cmp(prev->addr, entry->addr))) {
1652 			pthread_spin_unlock(&prev->lock);
1653 			return;
1654 		}
1655 
1656 		/*
1657 		 * If previous entry is part of cache and was written into disk remove
1658 		 * and invalidate it
1659 		 */
1660 		if (prev->valid) {
1661 			ftl_invalidate_addr(dev, prev->addr);
1662 			prev->valid = false;
1663 		}
1664 
1665 		ftl_l2p_set(dev, entry->lba, addr);
1666 		pthread_spin_unlock(&prev->lock);
1667 		return;
1668 	}
1669 
1670 evicted:
1671 	/*
1672 	 *  If the L2P's physical address is different than what we expected we don't need to
1673 	 *  do anything (someone's already overwritten our data).
1674 	 */
1675 	if (io_weak && !ftl_addr_cmp(prev_addr, entry->addr)) {
1676 		return;
1677 	}
1678 
1679 	/* Lock the band containing the previous physical address. This ensures atomic changes to */
1680 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
1681 	/* check the validity of weak writes. */
1682 	band = ftl_band_from_addr(dev, prev_addr);
1683 	pthread_spin_lock(&band->lba_map.lock);
1684 
1685 	valid = ftl_invalidate_addr_unlocked(dev, prev_addr);
1686 
1687 	/* If the address has been invalidated already, we don't want to update */
1688 	/* the L2P for weak writes, as it means the write is no longer valid. */
1689 	if (!io_weak || valid) {
1690 		ftl_l2p_set(dev, entry->lba, addr);
1691 	}
1692 
1693 	pthread_spin_unlock(&band->lba_map.lock);
1694 }
1695 
1696 static struct ftl_io *
1697 ftl_io_init_child_write(struct ftl_io *parent, struct ftl_addr addr, ftl_io_fn cb)
1698 {
1699 	struct ftl_io *io;
1700 	struct spdk_ftl_dev *dev = parent->dev;
1701 	struct ftl_io_init_opts opts = {
1702 		.dev		= dev,
1703 		.io		= NULL,
1704 		.parent		= parent,
1705 		.band		= parent->band,
1706 		.size		= sizeof(struct ftl_io),
1707 		.flags		= 0,
1708 		.type		= parent->type,
1709 		.num_blocks	= dev->xfer_size,
1710 		.cb_fn		= cb,
1711 		.iovcnt		= 0,
1712 	};
1713 
1714 	io = ftl_io_init_internal(&opts);
1715 	if (!io) {
1716 		return NULL;
1717 	}
1718 
1719 	io->addr = addr;
1720 
1721 	return io;
1722 }
1723 
1724 static void
1725 ftl_io_child_write_cb(struct ftl_io *io, void *ctx, int status)
1726 {
1727 	struct ftl_zone *zone;
1728 	struct ftl_wptr *wptr;
1729 
1730 	zone = ftl_band_zone_from_addr(io->band, io->addr);
1731 	wptr = ftl_wptr_from_band(io->band);
1732 
1733 	zone->busy = false;
1734 	zone->info.write_pointer += io->num_blocks;
1735 
1736 	if (zone->info.write_pointer == zone->info.zone_id + zone->info.capacity) {
1737 		zone->info.state = SPDK_BDEV_ZONE_STATE_FULL;
1738 	}
1739 
1740 	/* If some other write on the same band failed the write pointer would already be freed */
1741 	if (spdk_likely(wptr)) {
1742 		wptr->num_outstanding--;
1743 	}
1744 }
1745 
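/*
 * Carve an xfer_size child IO off the parent write and submit it at the write
 * pointer's address (or at the request's own address in direct mode), using zone
 * append when the device supports it and a regular vectored write otherwise.
 * Returns -EAGAIN if the child IO couldn't be allocated.
 */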
1746 static int
1747 ftl_submit_child_write(struct ftl_wptr *wptr, struct ftl_io *io)
1748 {
1749 	struct spdk_ftl_dev	*dev = io->dev;
1750 	struct ftl_io_channel	*ioch;
1751 	struct ftl_io		*child;
1752 	struct ftl_addr		addr;
1753 	int			rc;
1754 
1755 	ioch = ftl_io_channel_get_ctx(io->ioch);
1756 
1757 	if (spdk_likely(!wptr->direct_mode)) {
1758 		addr = wptr->addr;
1759 	} else {
1760 		assert(io->flags & FTL_IO_DIRECT_ACCESS);
1761 		assert(ftl_addr_get_band(dev, io->addr) == wptr->band->id);
1762 		addr = io->addr;
1763 	}
1764 
1765 	/* Split IO to child requests and release zone immediately after child is completed */
1766 	child = ftl_io_init_child_write(io, addr, ftl_io_child_write_cb);
1767 	if (!child) {
1768 		return -EAGAIN;
1769 	}
1770 
1771 	wptr->num_outstanding++;
1772 
1773 	if (ftl_is_append_supported(dev)) {
1774 		rc = spdk_bdev_zone_appendv(dev->base_bdev_desc, ioch->base_ioch,
1775 					    child->iov, child->iov_cnt,
1776 					    ftl_addr_get_zone_slba(dev, addr),
1777 					    dev->xfer_size, ftl_io_cmpl_cb, child);
1778 	} else {
1779 		rc = spdk_bdev_writev_blocks(dev->base_bdev_desc, ioch->base_ioch,
1780 					     child->iov, child->iov_cnt, addr.offset,
1781 					     dev->xfer_size, ftl_io_cmpl_cb, child);
1782 	}
1783 
1784 	if (rc) {
1785 		wptr->num_outstanding--;
1786 		ftl_io_fail(child, rc);
1787 		ftl_io_complete(child);
1788 		SPDK_ERRLOG("Write submission failed with status:%d, addr:%lu\n",
1789 			    rc, addr.offset);
1790 		return -EIO;
1791 	}
1792 
1793 	ftl_io_inc_req(child);
1794 	ftl_io_advance(child, dev->xfer_size);
1795 
1796 	return 0;
1797 }
1798 
1799 static int
1800 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
1801 {
1802 	struct spdk_ftl_dev	*dev = io->dev;
1803 	int			rc = 0;
1804 
1805 	assert(io->num_blocks % dev->xfer_size == 0);
1806 
1807 	while (io->iov_pos < io->iov_cnt) {
1808 		/* There are no ordering guarantees for completions on the NVMe IO submission queue, */
1809 		/* so wait until the zone is not busy before submitting another write */
1810 		if (!ftl_is_append_supported(dev) && wptr->zone->busy) {
1811 			TAILQ_INSERT_TAIL(&wptr->pending_queue, io, ioch_entry);
1812 			rc = -EAGAIN;
1813 			break;
1814 		}
1815 
1816 		rc = ftl_submit_child_write(wptr, io);
1817 		if (spdk_unlikely(rc)) {
1818 			if (rc == -EAGAIN) {
1819 				TAILQ_INSERT_TAIL(&wptr->pending_queue, io, ioch_entry);
1820 			} else {
1821 				ftl_io_fail(io, rc);
1822 			}
1823 			break;
1824 		}
1825 
1826 		ftl_trace_submission(dev, io, wptr->addr, dev->xfer_size);
1827 		ftl_wptr_advance(wptr, dev->xfer_size);
1828 	}
1829 
1830 	if (ftl_io_done(io)) {
1831 		/* Parent IO will complete after all children are completed */
1832 		ftl_io_complete(io);
1833 	}
1834 
1835 	return rc;
1836 }
1837 
1838 static void
1839 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1840 {
1841 	struct ftl_batch *batch = dev->current_batch;
1842 	struct ftl_io_channel *ioch;
1843 	size_t size = 0, num_entries = 0;
1844 
1845 	assert(batch != NULL);
1846 	assert(batch->num_entries < dev->xfer_size);
1847 
1848 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1849 		size += spdk_ring_count(ioch->submit_queue);
1850 	}
1851 
1852 	num_entries = dev->xfer_size - batch->num_entries;
1853 	if (size < num_entries) {
1854 		ftl_pad_wbuf(dev, num_entries - size);
1855 	}
1856 }
1857 
1858 static bool
1859 ftl_check_io_channel_flush(struct spdk_ftl_dev *dev)
1860 {
1861 	struct ftl_io_channel *ioch;
1862 
1863 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1864 		if (ioch->flush && spdk_ring_count(ioch->free_queue) != ioch->num_entries) {
1865 			return true;
1866 		}
1867 	}
1868 
1869 	return false;
1870 }
1871 
1872 static int
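/*
 * Per-wptr write pump: resubmit any IO parked on the pending queue, make sure the
 * band is open and writable, apply shutdown/flush padding if needed, then pop a
 * full batch from the write buffer and submit it.  Returns the number of blocks
 * sent to the device (0 if nothing could be submitted this iteration).
 */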
1873 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1874 {
1875 	struct spdk_ftl_dev	*dev = wptr->dev;
1876 	struct ftl_batch	*batch;
1877 	struct ftl_wbuf_entry	*entry;
1878 	struct ftl_io		*io;
1879 
1880 	if (spdk_unlikely(!TAILQ_EMPTY(&wptr->pending_queue))) {
1881 		io = TAILQ_FIRST(&wptr->pending_queue);
1882 		TAILQ_REMOVE(&wptr->pending_queue, io, ioch_entry);
1883 
1884 		if (ftl_submit_write(wptr, io) == -EAGAIN) {
1885 			return 0;
1886 		}
1887 	}
1888 
1889 	/* Make sure the band is prepared for writing */
1890 	if (!ftl_wptr_ready(wptr)) {
1891 		return 0;
1892 	}
1893 
1894 	if (dev->halt) {
1895 		ftl_wptr_process_shutdown(wptr);
1896 	}
1897 
1898 	if (spdk_unlikely(wptr->flush)) {
1899 		ftl_wptr_pad_band(wptr);
1900 	}
1901 
1902 	batch = ftl_get_next_batch(dev);
1903 	if (!batch) {
1904 		/* If there are queued flush requests we need to pad the write buffer to */
1905 		/* force out remaining entries */
1906 		if (!LIST_EMPTY(&dev->flush_list) || ftl_check_io_channel_flush(dev)) {
1907 			ftl_flush_pad_batch(dev);
1908 		}
1909 
1910 		return 0;
1911 	}
1912 
1913 	io = ftl_io_wbuf_init(dev, wptr->addr, wptr->band, batch, ftl_write_cb);
1914 	if (!io) {
1915 		goto error;
1916 	}
1917 
1918 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1919 		/* Update band's relocation stats if the IO comes from reloc */
1920 		if (entry->io_flags & FTL_IO_WEAK) {
1921 			if (!spdk_bit_array_get(wptr->band->reloc_bitmap, entry->band->id)) {
1922 				spdk_bit_array_set(wptr->band->reloc_bitmap, entry->band->id);
1923 				entry->band->num_reloc_bands++;
1924 			}
1925 		}
1926 
1927 		ftl_trace_wbuf_pop(dev, entry);
1928 		ftl_update_stats(dev, entry);
1929 	}
1930 
1931 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write addr:%lx\n", wptr->addr.offset);
1932 
1933 	if (ftl_submit_write(wptr, io)) {
1934 		/* TODO: we need some recovery here */
1935 		assert(0 && "Write submit failed");
1936 		if (ftl_io_done(io)) {
1937 			ftl_io_free(io);
1938 		}
1939 	}
1940 
1941 	return dev->xfer_size;
1942 error:
1943 	TAILQ_INSERT_TAIL(&dev->pending_batches, batch, tailq);
1944 	return 0;
1945 }
1946 
1947 static int
1948 ftl_process_writes(struct spdk_ftl_dev *dev)
1949 {
1950 	struct ftl_wptr *wptr, *twptr;
1951 	size_t num_active = 0;
1952 	enum ftl_band_state state;
1953 
1954 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1955 		ftl_wptr_process_writes(wptr);
1956 		state = wptr->band->state;
1957 
1958 		if (state != FTL_BAND_STATE_FULL &&
1959 		    state != FTL_BAND_STATE_CLOSING &&
1960 		    state != FTL_BAND_STATE_CLOSED) {
1961 			num_active++;
1962 		}
1963 	}
1964 
1965 	if (num_active < 1) {
1966 		ftl_add_wptr(dev);
1967 	}
1968 
1969 	return 0;
1970 }
1971 
1972 static void
1973 ftl_fill_wbuf_entry(struct ftl_wbuf_entry *entry, struct ftl_io *io)
1974 {
1975 	memcpy(entry->payload, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1976 
1977 	if (entry->io_flags & FTL_IO_WEAK) {
1978 		entry->band = ftl_band_from_addr(io->dev, io->addr);
1979 		entry->addr = ftl_band_next_addr(entry->band, io->addr, io->pos);
1980 		entry->band->num_reloc_blocks++;
1981 	}
1982 
1983 	entry->trace = io->trace;
1984 	entry->lba = ftl_io_current_lba(io);
1985 }
1986 
1987 static int
1988 ftl_wbuf_fill(struct ftl_io *io)
1989 {
1990 	struct spdk_ftl_dev *dev = io->dev;
1991 	struct ftl_io_channel *ioch;
1992 	struct ftl_wbuf_entry *entry;
1993 
1994 	ioch = ftl_io_channel_get_ctx(io->ioch);
1995 
1996 	while (io->pos < io->num_blocks) {
1997 		if (ftl_io_current_lba(io) == FTL_LBA_INVALID) {
1998 			ftl_io_advance(io, 1);
1999 			continue;
2000 		}
2001 
2002 		entry = ftl_acquire_wbuf_entry(ioch, io->flags);
2003 		if (!entry) {
2004 			TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
2005 			return 0;
2006 		}
2007 
2008 		ftl_fill_wbuf_entry(entry, io);
2009 
2010 		ftl_trace_wbuf_fill(dev, io);
2011 		ftl_update_l2p(dev, entry, ftl_get_addr_from_entry(entry));
2012 		ftl_io_advance(io, 1);
2013 
2014 		/* This needs to be done after the L2P is updated to avoid a race with the */
2015 		/* write completion callback, in case the completion is processed before */
2016 		/* ftl_update_l2p() has set the L2P entry. */
2017 		spdk_ring_enqueue(ioch->submit_queue, (void **)&entry, 1, NULL);
2018 	}
2019 
2020 	if (ftl_io_done(io)) {
2021 		if (ftl_dev_has_nv_cache(dev) && !(io->flags & FTL_IO_BYPASS_CACHE)) {
2022 			ftl_write_nv_cache(io);
2023 		} else {
2024 			TAILQ_INSERT_TAIL(&ioch->write_cmpl_queue, io, ioch_entry);
2025 		}
2026 	}
2027 
2028 	return 0;
2029 }
2030 
2031 static bool
2032 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
2033 {
2034 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
2035 
2036 	if (ftl_reloc_is_halted(dev->reloc)) {
2037 		return false;
2038 	}
2039 
2040 	if (ftl_reloc_is_defrag_active(dev->reloc)) {
2041 		return false;
2042 	}
2043 
2044 	if (dev->num_free <= limit->thld) {
2045 		return true;
2046 	}
2047 
2048 	return false;
2049 }
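/*
 * Illustrative check (made-up numbers): with limit->thld = 20 free bands and
 * dev->num_free = 15, the comparison above holds and a defrag cycle is
 * started. Once relocation frees enough bands to climb back over the
 * threshold, subsequent calls return false again until the free pool shrinks.
 */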
2050 
2051 static double
2052 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
2053 {
2054 	size_t usable, valid, invalid;
2055 	double vld_ratio;
2056 
2057 	/* A band without any usable blocks is of no use, so give it zero merit */
2058 	usable = ftl_band_num_usable_blocks(band);
2059 	if (usable == 0) {
2060 		return 0.0;
2061 	}
2062 
2063 	valid = threshold_valid ? (usable - *threshold_valid) : band->lba_map.num_vld;
2064 	invalid = usable - valid;
2065 
2066 	/* Add one to avoid division by 0 */
2067 	vld_ratio = (double)invalid / (double)(valid + 1);
2068 	return vld_ratio * ftl_band_age(band);
2069 }
2070 
2071 static bool
2072 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
2073 {
2074 	struct spdk_ftl_conf *conf = &dev->conf;
2075 	size_t thld_vld;
2076 
2077 	/* If we're in dire need of free bands, every band is worth defragging */
2078 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
2079 		return true;
2080 	}
2081 
2082 	thld_vld = (ftl_band_num_usable_blocks(band) * conf->invalid_thld) / 100;
2083 
2084 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
2085 }
2086 
2087 static struct ftl_band *
2088 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
2089 {
2090 	struct ftl_band *band, *mband = NULL;
2091 	double merit = 0;
2092 
2093 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
2094 		assert(band->state == FTL_BAND_STATE_CLOSED);
2095 		band->merit = ftl_band_calc_merit(band, NULL);
2096 		if (band->merit > merit) {
2097 			merit = band->merit;
2098 			mband = band;
2099 		}
2100 	}
2101 
2102 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
2103 		mband = NULL;
2104 	}
2105 
2106 	return mband;
2107 }
2108 
2109 static void
2110 ftl_process_relocs(struct spdk_ftl_dev *dev)
2111 {
2112 	struct ftl_band *band;
2113 
2114 	if (ftl_dev_needs_defrag(dev)) {
2115 		band = ftl_select_defrag_band(dev);
2116 		if (band) {
2117 			ftl_reloc_add(dev->reloc, band, 0, ftl_get_num_blocks_in_band(dev), 0, true);
2118 			ftl_trace_defrag_band(dev, band);
2119 		}
2120 	}
2121 
2122 	ftl_reloc(dev->reloc);
2123 }
2124 
2125 int
2126 ftl_current_limit(const struct spdk_ftl_dev *dev)
2127 {
2128 	return dev->limit;
2129 }
2130 
2131 void
2132 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
2133 {
2134 	attrs->uuid = dev->uuid;
2135 	attrs->num_blocks = dev->num_lbas;
2136 	attrs->block_size = FTL_BLOCK_SIZE;
2137 	attrs->num_zones = ftl_get_num_zones(dev);
2138 	attrs->zone_size = ftl_get_num_blocks_in_zone(dev);
2139 	attrs->conf = dev->conf;
2140 	attrs->base_bdev = spdk_bdev_get_name(spdk_bdev_desc_get_bdev(dev->base_bdev_desc));
2141 
2142 	attrs->cache_bdev = NULL;
2143 	if (dev->nv_cache.bdev_desc) {
2144 		attrs->cache_bdev = spdk_bdev_get_name(
2145 					    spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc));
2146 	}
2147 }
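/*
 * Caller-side usage sketch (not part of this driver; "dev" is assumed to be a
 * handle obtained from spdk_ftl_dev_init()):
 *
 *	struct spdk_ftl_attrs attrs;
 *	uint64_t total_bytes;
 *
 *	spdk_ftl_dev_get_attrs(dev, &attrs);
 *	total_bytes = attrs.num_blocks * attrs.block_size;
 *
 * The attrs structure is filled synchronously, so it can be read as soon as
 * the call returns.
 */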
2148 
2149 static void
2150 _ftl_io_write(void *ctx)
2151 {
2152 	ftl_io_write((struct ftl_io *)ctx);
2153 }
2154 
2155 static int
2156 ftl_submit_write_leaf(struct ftl_io *io)
2157 {
2158 	int rc;
2159 
2160 	rc = ftl_submit_write(ftl_wptr_from_band(io->band), io);
2161 	if (rc == -EAGAIN) {
2162 		/* EAGAIN means that the request was put on the pending queue */
2163 		return 0;
2164 	}
2165 
2166 	return rc;
2167 }
2168 
2169 void
2170 ftl_io_write(struct ftl_io *io)
2171 {
2172 	struct spdk_ftl_dev *dev = io->dev;
2173 	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(io->ioch);
2174 
2175 	/* Put the IO on the retry queue if the IO channel is not fully initialized yet */
2176 	if (spdk_unlikely(ioch->index == FTL_IO_CHANNEL_INDEX_INVALID)) {
2177 		TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
2178 		return;
2179 	}
2180 
2181 	/* For normal IOs we just need to copy the data onto the write buffer */
2182 	if (!(io->flags & FTL_IO_MD)) {
2183 		ftl_io_call_foreach_child(io, ftl_wbuf_fill);
2184 	} else {
2185 		/* Metadata has its own buffer, so it doesn't need to be copied; just */
2186 		/* send it to the core thread and schedule the write immediately */
2187 		if (ftl_check_core_thread(dev)) {
2188 			ftl_io_call_foreach_child(io, ftl_submit_write_leaf);
2189 		} else {
2190 			spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
2191 		}
2192 	}
2193 }
2194 
2195 int
2196 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2197 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2198 {
2199 	struct ftl_io *io;
2200 
2201 	if (iov_cnt == 0) {
2202 		return -EINVAL;
2203 	}
2204 
2205 	if (lba_cnt == 0) {
2206 		return -EINVAL;
2207 	}
2208 
2209 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2210 		return -EINVAL;
2211 	}
2212 
2213 	if (!dev->initialized) {
2214 		return -EBUSY;
2215 	}
2216 
2217 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
2218 	if (!io) {
2219 		return -ENOMEM;
2220 	}
2221 
2222 	ftl_io_write(io);
2223 
2224 	return 0;
2225 }
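/*
 * Caller-side usage sketch for the write path (illustrative only; buf, dev,
 * ch, lba and write_cb are assumed to be owned by the application):
 *
 *	static void
 *	write_cb(void *cb_arg, int status)
 *	{
 *	}
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = 8 * FTL_BLOCK_SIZE };
 *	int rc = spdk_ftl_write(dev, ch, lba, 8, &iov, 1, write_cb, NULL);
 *
 * The callback receives 0 on success or a negative errno on failure. lba_cnt
 * (8 here) must match the number of blocks described by the iovec, otherwise
 * -EINVAL is returned. spdk_ftl_read() performs the same validation, so the
 * read call looks identical apart from the function name.
 */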
2226 
2227 void
2228 ftl_io_read(struct ftl_io *io)
2229 {
2230 	ftl_io_call_foreach_child(io, ftl_submit_read);
2231 }
2232 
2233 int
2234 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2235 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2236 {
2237 	struct ftl_io *io;
2238 
2239 	if (iov_cnt == 0) {
2240 		return -EINVAL;
2241 	}
2242 
2243 	if (lba_cnt == 0) {
2244 		return -EINVAL;
2245 	}
2246 
2247 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2248 		return -EINVAL;
2249 	}
2250 
2251 	if (!dev->initialized) {
2252 		return -EBUSY;
2253 	}
2254 
2255 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
2256 	if (!io) {
2257 		return -ENOMEM;
2258 	}
2259 
2260 	ftl_io_read(io);
2261 	return 0;
2262 }
2263 
2264 static struct ftl_flush *
2265 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2266 {
2267 	struct ftl_flush *flush;
2268 
2269 	flush = calloc(1, sizeof(*flush));
2270 	if (!flush) {
2271 		return NULL;
2272 	}
2273 
2274 	flush->bmap = spdk_bit_array_create(FTL_BATCH_COUNT);
2275 	if (!flush->bmap) {
2276 		goto error;
2277 	}
2278 
2279 	flush->dev = dev;
2280 	flush->cb.fn = cb_fn;
2281 	flush->cb.ctx = cb_arg;
2282 
2283 	return flush;
2284 error:
2285 	free(flush);
2286 	return NULL;
2287 }
2288 
2289 static void
2290 _ftl_flush(void *ctx)
2291 {
2292 	struct ftl_flush *flush = ctx;
2293 	struct spdk_ftl_dev *dev = flush->dev;
2294 	uint32_t i;
2295 
2296 	/* Attach flush object to all non-empty batches */
2297 	for (i = 0; i < FTL_BATCH_COUNT; ++i) {
2298 		if (dev->batch_array[i].num_entries > 0) {
2299 			spdk_bit_array_set(flush->bmap, i);
2300 			flush->num_req++;
2301 		}
2302 	}
2303 
2304 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
2305 
2306 	/* If the write buffer was already empty, the flush can be completed right away */
2307 	if (!flush->num_req) {
2308 		ftl_complete_flush(flush);
2309 	}
2310 }
2311 
2312 int
2313 ftl_flush_wbuf(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2314 {
2315 	struct ftl_flush *flush;
2316 
2317 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
2318 	if (!flush) {
2319 		return -ENOMEM;
2320 	}
2321 
2322 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
2323 	return 0;
2324 }
2325 
2326 int
2327 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2328 {
2329 	if (!dev->initialized) {
2330 		return -EBUSY;
2331 	}
2332 
2333 	return ftl_flush_wbuf(dev, cb_fn, cb_arg);
2334 }
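/*
 * Caller-side usage sketch (illustrative only): forcing out data that may
 * still sit in the write buffer, e.g. before an orderly shutdown; flush_cb and
 * dev are assumed to be owned by the application:
 *
 *	static void
 *	flush_cb(void *cb_arg, int status)
 *	{
 *	}
 *
 *	int rc = spdk_ftl_flush(dev, flush_cb, NULL);
 *
 * The callback fires once every batch that was non-empty at the time of the
 * call has been written out (see _ftl_flush() above).
 */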
2335 
2336 bool
2337 ftl_addr_is_written(struct ftl_band *band, struct ftl_addr addr)
2338 {
2339 	struct ftl_zone *zone = ftl_band_zone_from_addr(band, addr);
2340 
2341 	return addr.offset < zone->info.write_pointer;
2342 }
2343 
2344 static void ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event);
2345 
2346 static void
2347 _ftl_process_media_event(void *ctx)
2348 {
2349 	struct ftl_media_event *event = ctx;
2350 	struct spdk_ftl_dev *dev = event->dev;
2351 
2352 	ftl_process_media_event(dev, event->event);
2353 	spdk_mempool_put(dev->media_events_pool, event);
2354 }
2355 
2356 static void
2357 ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event)
2358 {
2359 	struct ftl_band *band;
2360 	struct ftl_addr addr = { .offset = event.offset };
2361 	size_t block_off;
2362 
2363 	if (!ftl_check_core_thread(dev)) {
2364 		struct ftl_media_event *media_event;
2365 
2366 		media_event = spdk_mempool_get(dev->media_events_pool);
2367 		if (!media_event) {
2368 			SPDK_ERRLOG("Media event lost due to lack of memory\n");
2369 			return;
2370 		}
2371 
2372 		media_event->dev = dev;
2373 		media_event->event = event;
2374 		spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_process_media_event,
2375 				     media_event);
2376 		return;
2377 	}
2378 
2379 	band = ftl_band_from_addr(dev, addr);
2380 	block_off = ftl_band_block_offset_from_addr(band, addr);
2381 
2382 	ftl_reloc_add(dev->reloc, band, block_off, event.num_blocks, 0, false);
2383 }
2384 
2385 void
2386 ftl_get_media_events(struct spdk_ftl_dev *dev)
2387 {
2388 #define FTL_MAX_MEDIA_EVENTS 128
2389 	struct spdk_bdev_media_event events[FTL_MAX_MEDIA_EVENTS];
2390 	size_t num_events, i;
2391 
2392 	if (!dev->initialized) {
2393 		return;
2394 	}
2395 
2396 	do {
2397 		num_events = spdk_bdev_get_media_events(dev->base_bdev_desc,
2398 							events, FTL_MAX_MEDIA_EVENTS);
2399 
2400 		for (i = 0; i < num_events; ++i) {
2401 			ftl_process_media_event(dev, events[i]);
2402 		}
2403 
2404 	} while (num_events);
2405 }
2406 
2407 int
2408 ftl_io_channel_poll(void *arg)
2409 {
2410 	struct ftl_io_channel *ch = arg;
2411 	struct ftl_io *io;
2412 	TAILQ_HEAD(, ftl_io) retry_queue;
2413 
2414 	if (TAILQ_EMPTY(&ch->write_cmpl_queue) && TAILQ_EMPTY(&ch->retry_queue)) {
2415 		return 0;
2416 	}
2417 
2418 	while (!TAILQ_EMPTY(&ch->write_cmpl_queue)) {
2419 		io = TAILQ_FIRST(&ch->write_cmpl_queue);
2420 		TAILQ_REMOVE(&ch->write_cmpl_queue, io, ioch_entry);
2421 		ftl_io_complete(io);
2422 	}
2423 
2424 	/*
2425 	 * Swap the retry queue into a local copy to prevent infinite retries in case
2426 	 * an IO gets reinserted into the retry queue while it is being reprocessed
2427 	 */
2428 	TAILQ_INIT(&retry_queue);
2429 	TAILQ_SWAP(&ch->retry_queue, &retry_queue, ftl_io, ioch_entry);
2430 
2431 	while (!TAILQ_EMPTY(&retry_queue)) {
2432 		io = TAILQ_FIRST(&retry_queue);
2433 		TAILQ_REMOVE(&retry_queue, io, ioch_entry);
2434 		if (io->type == FTL_IO_WRITE) {
2435 			ftl_io_write(io);
2436 		} else {
2437 			ftl_io_read(io);
2438 		}
2439 	}
2440 
2441 	return 1;
2442 }
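/*
 * This poller is expected to be registered once per IO channel; the actual
 * registration lives in the channel creation path outside of this file. A
 * minimal sketch of such a registration, assuming the channel context keeps a
 * "poller" pointer:
 *
 *	ioch->poller = spdk_poller_register(ftl_io_channel_poll, ioch, 0);
 *
 * A period of 0 makes the poller run on every iteration of the owning
 * thread's poll loop; the positive return value above indicates that the
 * poller did some work.
 */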
2443 
2444 int
2445 ftl_task_core(void *ctx)
2446 {
2447 	struct spdk_ftl_dev *dev = ctx;
2448 
2449 	if (dev->halt) {
2450 		if (ftl_shutdown_complete(dev)) {
2451 			spdk_poller_unregister(&dev->core_poller);
2452 			return 0;
2453 		}
2454 	}
2455 
2456 	ftl_process_writes(dev);
2457 	ftl_process_relocs(dev);
2458 
2459 	return 0;
2460 }
2461 
2462 SPDK_LOG_REGISTER_COMPONENT("ftl_core", SPDK_LOG_FTL_CORE)
2463