xref: /spdk/lib/ftl/ftl_core.c (revision 2f5c602574a98ede645991abe279a96e19c50196)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/thread.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk/string.h"
40 #include "spdk/log.h"
41 #include "spdk/ftl.h"
42 #include "spdk/crc32.h"
43 
44 #include "ftl_core.h"
45 #include "ftl_band.h"
46 #include "ftl_io.h"
47 #include "ftl_debug.h"
48 #include "ftl_reloc.h"
49 
50 struct ftl_band_flush {
51 	struct spdk_ftl_dev		*dev;
52 	/* Number of bands left to be flushed */
53 	size_t				num_bands;
54 	/* User callback */
55 	spdk_ftl_fn			cb_fn;
56 	/* Callback's argument */
57 	void				*cb_arg;
58 	/* List link */
59 	LIST_ENTRY(ftl_band_flush)	list_entry;
60 };
61 
62 struct ftl_wptr {
63 	/* Owner device */
64 	struct spdk_ftl_dev		*dev;
65 
66 	/* Current address */
67 	struct ftl_addr			addr;
68 
69 	/* Band currently being written to */
70 	struct ftl_band			*band;
71 
72 	/* Current logical block's offset */
73 	uint64_t			offset;
74 
75 	/* Current zone */
76 	struct ftl_zone			*zone;
77 
78 	/* Pending IO queue */
79 	TAILQ_HEAD(, ftl_io)		pending_queue;
80 
81 	/* List link */
82 	LIST_ENTRY(ftl_wptr)		list_entry;
83 
84 	/*
85 	 * If set up in direct mode, there will be no offset or band state update after IO.
86 	 * The zoned bdev address is not assigned by wptr, and is instead taken directly
87 	 * from the request.
88 	 */
89 	bool				direct_mode;
90 
91 	/* Number of outstanding write requests */
92 	uint32_t			num_outstanding;
93 
94 	/* Marks that the band related to this wptr needs to be closed as soon as possible */
95 	bool				flush;
96 };
97 
98 struct ftl_flush {
99 	/* Owner device */
100 	struct spdk_ftl_dev		*dev;
101 
102 	/* Number of batches to wait for */
103 	size_t				num_req;
104 
105 	/* Callback */
106 	struct {
107 		spdk_ftl_fn		fn;
108 		void			*ctx;
109 	} cb;
110 
111 	/* Batch bitmap */
112 	struct spdk_bit_array		*bmap;
113 
114 	/* List link */
115 	LIST_ENTRY(ftl_flush)		list_entry;
116 };
117 
118 static void
119 ftl_wptr_free(struct ftl_wptr *wptr)
120 {
121 	if (!wptr) {
122 		return;
123 	}
124 
125 	free(wptr);
126 }
127 
128 static void
129 ftl_remove_wptr(struct ftl_wptr *wptr)
130 {
131 	struct spdk_ftl_dev *dev = wptr->dev;
132 	struct ftl_band_flush *flush, *tmp;
133 
134 	if (spdk_unlikely(wptr->flush)) {
135 		LIST_FOREACH_SAFE(flush, &dev->band_flush_list, list_entry, tmp) {
136 			assert(flush->num_bands > 0);
137 			if (--flush->num_bands == 0) {
138 				flush->cb_fn(flush->cb_arg, 0);
139 				LIST_REMOVE(flush, list_entry);
140 				free(flush);
141 			}
142 		}
143 	}
144 
145 	LIST_REMOVE(wptr, list_entry);
146 	ftl_wptr_free(wptr);
147 }
148 
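/*
 * Acquire a free write buffer entry from the IO channel's free queue.  For user
 * (non-FTL_IO_INTERNAL) requests the per-channel queue depth counter is incremented
 * first and the request is rejected once qdepth_limit is reached.  The entry is
 * evicted from the cache and reset before being handed out; NULL means the caller
 * should retry later.
 */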
149 static struct ftl_wbuf_entry *
150 ftl_acquire_wbuf_entry(struct ftl_io_channel *io_channel, int io_flags)
151 {
152 	struct ftl_wbuf_entry *entry = NULL;
153 	uint32_t qdepth;
154 
155 	if (!(io_flags & FTL_IO_INTERNAL)) {
156 		qdepth = __atomic_fetch_add(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
157 		if (qdepth >= io_channel->qdepth_limit) {
158 			__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
159 			return NULL;
160 		}
161 	}
162 
163 	if (spdk_ring_dequeue(io_channel->free_queue, (void **)&entry, 1) != 1) {
164 		if (!(io_flags & FTL_IO_INTERNAL)) {
165 			__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
166 		}
167 
168 		return NULL;
169 	}
170 
171 	assert(entry != NULL);
172 
173 	ftl_evict_cache_entry(io_channel->dev, entry);
174 
175 	entry->io_flags = io_flags;
176 	entry->addr.offset = FTL_ADDR_INVALID;
177 	entry->lba = FTL_LBA_INVALID;
178 	entry->band = NULL;
179 	entry->valid = false;
180 
181 	return entry;
182 }
183 
184 static void
185 ftl_release_wbuf_entry(struct ftl_wbuf_entry *entry)
186 {
187 	struct ftl_io_channel *io_channel = entry->ioch;
188 
189 	if (!(entry->io_flags & FTL_IO_INTERNAL)) {
190 		__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
191 	}
192 
193 	spdk_ring_enqueue(io_channel->free_queue, (void **)&entry, 1, NULL);
194 }
195 
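/*
 * Return a batch of write buffer entries ready for submission: either a batch from the
 * pending queue (e.g. one re-queued after a write failure) or a free batch filled with up
 * to xfer_size entries dequeued round-robin from the IO channels' submit queues.  A batch
 * that cannot be filled completely is stashed in dev->current_batch and NULL is returned,
 * so entries can keep accumulating until the next call.
 */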
196 static struct ftl_batch *
197 ftl_get_next_batch(struct spdk_ftl_dev *dev)
198 {
199 	struct ftl_batch *batch = dev->current_batch;
200 	struct ftl_io_channel *ioch;
201 #define FTL_DEQUEUE_ENTRIES 128
202 	struct ftl_wbuf_entry *entries[FTL_DEQUEUE_ENTRIES];
203 	TAILQ_HEAD(, ftl_io_channel) ioch_queue;
204 	size_t i, num_dequeued, num_remaining;
205 	uint64_t *metadata;
206 
207 	if (batch == NULL) {
208 		batch = TAILQ_FIRST(&dev->pending_batches);
209 		if (batch != NULL) {
210 			TAILQ_REMOVE(&dev->pending_batches, batch, tailq);
211 			return batch;
212 		}
213 
214 		batch = TAILQ_FIRST(&dev->free_batches);
215 		if (spdk_unlikely(batch == NULL)) {
216 			return NULL;
217 		}
218 
219 		assert(TAILQ_EMPTY(&batch->entries));
220 		assert(batch->num_entries == 0);
221 		TAILQ_REMOVE(&dev->free_batches, batch, tailq);
222 	}
223 
224 	/*
225 	 * Keep shifting the queue to ensure fairness in IO channel selection.  Each time
226 	 * ftl_get_next_batch() is called, we're starting to dequeue write buffer entries from a
227 	 * different IO channel.
228 	 */
229 	TAILQ_INIT(&ioch_queue);
230 	while (!TAILQ_EMPTY(&dev->ioch_queue)) {
231 		ioch = TAILQ_FIRST(&dev->ioch_queue);
232 		TAILQ_REMOVE(&dev->ioch_queue, ioch, tailq);
233 		TAILQ_INSERT_TAIL(&ioch_queue, ioch, tailq);
234 
235 		num_remaining = dev->xfer_size - batch->num_entries;
236 		while (num_remaining > 0) {
237 			num_dequeued = spdk_ring_dequeue(ioch->submit_queue, (void **)entries,
238 							 spdk_min(num_remaining,
239 									 FTL_DEQUEUE_ENTRIES));
240 			if (num_dequeued == 0) {
241 				break;
242 			}
243 
244 			for (i = 0; i < num_dequeued; ++i) {
245 				batch->iov[batch->num_entries + i].iov_base = entries[i]->payload;
246 				batch->iov[batch->num_entries + i].iov_len = FTL_BLOCK_SIZE;
247 
248 				if (batch->metadata != NULL) {
249 					metadata = (uint64_t *)((char *)batch->metadata +
250 								(batch->num_entries + i) * dev->md_size);
251 					*metadata = entries[i]->lba;
252 				}
253 
254 				TAILQ_INSERT_TAIL(&batch->entries, entries[i], tailq);
255 			}
256 
257 			batch->num_entries += num_dequeued;
258 			num_remaining -= num_dequeued;
259 		}
260 
261 		if (num_remaining == 0) {
262 			break;
263 		}
264 	}
265 
266 	TAILQ_CONCAT(&dev->ioch_queue, &ioch_queue, tailq);
267 
268 	if (batch->num_entries == dev->xfer_size) {
269 		dev->current_batch = NULL;
270 	} else {
271 		dev->current_batch = batch;
272 		batch = NULL;
273 	}
274 
275 	return batch;
276 }
277 
278 static void
279 ftl_release_batch(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
280 {
281 	struct ftl_wbuf_entry *entry;
282 
283 	while (!TAILQ_EMPTY(&batch->entries)) {
284 		entry = TAILQ_FIRST(&batch->entries);
285 		TAILQ_REMOVE(&batch->entries, entry, tailq);
286 		ftl_release_wbuf_entry(entry);
287 	}
288 
289 	batch->num_entries = 0;
290 	TAILQ_INSERT_TAIL(&dev->free_batches, batch, tailq);
291 }
292 
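/*
 * A cached ftl_addr encodes the owning IO channel in the low ioch_shift bits of
 * cache_offset and the write buffer entry index in the remaining upper bits, i.e.
 * cache_offset = (entry->index << ioch_shift) | ioch->index.  The two helpers below
 * translate between a write buffer entry and its cache address.
 */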
293 static struct ftl_wbuf_entry *
294 ftl_get_entry_from_addr(struct spdk_ftl_dev *dev, struct ftl_addr addr)
295 {
296 	struct ftl_io_channel *ioch;
297 	uint64_t ioch_offset, entry_offset;
298 
299 	ioch_offset = addr.cache_offset & ((1 << dev->ioch_shift) - 1);
300 	entry_offset = addr.cache_offset >> dev->ioch_shift;
301 	ioch = dev->ioch_array[ioch_offset];
302 
303 	assert(ioch_offset < dev->conf.max_io_channels);
304 	assert(entry_offset < ioch->num_entries);
305 	assert(addr.cached == 1);
306 
307 	return &ioch->wbuf_entries[entry_offset];
308 }
309 
310 static struct ftl_addr
311 ftl_get_addr_from_entry(struct ftl_wbuf_entry *entry)
312 {
313 	struct ftl_io_channel *ioch = entry->ioch;
314 	struct ftl_addr addr = {};
315 
316 	addr.cached = 1;
317 	addr.cache_offset = (uint64_t)entry->index << ioch->dev->ioch_shift | ioch->index;
318 
319 	return addr;
320 }
321 
322 static void
323 ftl_io_cmpl_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
324 {
325 	struct ftl_io *io = cb_arg;
326 	struct spdk_ftl_dev *dev = io->dev;
327 
328 	if (spdk_unlikely(!success)) {
329 		io->status = -EIO;
330 	}
331 
332 	ftl_trace_completion(dev, io, FTL_TRACE_COMPLETION_DISK);
333 
334 	if (io->type == FTL_IO_WRITE && ftl_is_append_supported(dev)) {
335 		assert(io->parent);
336 		io->parent->addr.offset = spdk_bdev_io_get_append_location(bdev_io);
337 	}
338 
339 	ftl_io_dec_req(io);
340 	if (ftl_io_done(io)) {
341 		ftl_io_complete(io);
342 	}
343 
344 	spdk_bdev_free_io(bdev_io);
345 }
346 
347 static void
348 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
349 {
350 	struct ftl_wptr *wptr = NULL;
351 
352 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
353 		if (wptr->band == band) {
354 			break;
355 		}
356 	}
357 
358 	/* If the band already has the high_prio flag set, other writes must */
359 	/* have failed earlier, so it's already taken care of. */
360 	if (band->high_prio) {
361 		assert(wptr == NULL);
362 		return;
363 	}
364 
365 	ftl_band_write_failed(band);
366 	ftl_remove_wptr(wptr);
367 }
368 
369 static struct ftl_wptr *
370 ftl_wptr_from_band(struct ftl_band *band)
371 {
372 	struct spdk_ftl_dev *dev = band->dev;
373 	struct ftl_wptr *wptr = NULL;
374 
375 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
376 		if (wptr->band == band) {
377 			return wptr;
378 		}
379 	}
380 	assert(false);
381 	return NULL;
382 }
383 
384 static void
385 ftl_md_write_fail(struct ftl_io *io, int status)
386 {
387 	struct ftl_band *band = io->band;
388 	struct ftl_wptr *wptr;
389 	char buf[128];
390 
391 	wptr = ftl_wptr_from_band(band);
392 	SPDK_ERRLOG("Metadata write failed @addr: %s, status: %d\n",
393 		    ftl_addr2str(wptr->addr, buf, sizeof(buf)), status);
394 
395 	ftl_halt_writes(io->dev, band);
396 }
397 
398 static void
399 ftl_md_write_cb(struct ftl_io *io, void *arg, int status)
400 {
401 	struct spdk_ftl_dev *dev = io->dev;
402 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
403 	struct ftl_band *band = io->band;
404 	struct ftl_wptr *wptr;
405 	size_t id;
406 
407 	wptr = ftl_wptr_from_band(band);
408 
409 	if (status) {
410 		ftl_md_write_fail(io, status);
411 		return;
412 	}
413 
414 	ftl_band_set_next_state(band);
415 	if (band->state == FTL_BAND_STATE_CLOSED) {
416 		if (ftl_dev_has_nv_cache(dev)) {
417 			pthread_spin_lock(&nv_cache->lock);
418 			nv_cache->num_available += ftl_band_user_blocks(band);
419 
420 			if (spdk_unlikely(nv_cache->num_available > nv_cache->num_data_blocks)) {
421 				nv_cache->num_available = nv_cache->num_data_blocks;
422 			}
423 			pthread_spin_unlock(&nv_cache->lock);
424 		}
425 
426 		/*
427 		 * Go through the reloc_bitmap, checking for all the bands that had their data moved
428 		 * onto the current band, and update their counters to allow them to be used for writing
429 		 * (once they're closed and empty).
430 		 */
431 		for (id = 0; id < ftl_get_num_bands(dev); ++id) {
432 			if (spdk_bit_array_get(band->reloc_bitmap, id)) {
433 				assert(dev->bands[id].num_reloc_bands > 0);
434 				dev->bands[id].num_reloc_bands--;
435 
436 				spdk_bit_array_clear(band->reloc_bitmap, id);
437 			}
438 		}
439 
440 		ftl_remove_wptr(wptr);
441 	}
442 }
443 
444 static int
445 ftl_read_next_physical_addr(struct ftl_io *io, struct ftl_addr *addr)
446 {
447 	struct spdk_ftl_dev *dev = io->dev;
448 	size_t num_blocks, max_blocks;
449 
450 	assert(ftl_io_mode_physical(io));
451 	assert(io->iov_pos < io->iov_cnt);
452 
453 	if (io->pos == 0) {
454 		*addr = io->addr;
455 	} else {
456 		*addr = ftl_band_next_xfer_addr(io->band, io->addr, io->pos);
457 	}
458 
459 	assert(!ftl_addr_invalid(*addr));
460 
461 	/* Metadata has to be read in the way it's written (jumping across */
462 	/* the zones in xfer_size increments) */
463 	if (io->flags & FTL_IO_MD) {
464 		max_blocks = dev->xfer_size - (addr->offset % dev->xfer_size);
465 		num_blocks = spdk_min(ftl_io_iovec_len_left(io), max_blocks);
466 		assert(addr->offset / dev->xfer_size ==
467 		       (addr->offset + num_blocks - 1) / dev->xfer_size);
468 	} else {
469 		num_blocks = ftl_io_iovec_len_left(io);
470 	}
471 
472 	return num_blocks;
473 }
474 
475 static int
476 ftl_wptr_close_band(struct ftl_wptr *wptr)
477 {
478 	struct ftl_band *band = wptr->band;
479 
480 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
481 
482 	return ftl_band_write_tail_md(band, ftl_md_write_cb);
483 }
484 
485 static int
486 ftl_wptr_open_band(struct ftl_wptr *wptr)
487 {
488 	struct ftl_band *band = wptr->band;
489 
490 	assert(ftl_band_zone_is_first(band, wptr->zone));
491 	assert(band->lba_map.num_vld == 0);
492 
493 	ftl_band_clear_lba_map(band);
494 
495 	assert(band->state == FTL_BAND_STATE_PREP);
496 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
497 
498 	return ftl_band_write_head_md(band, ftl_md_write_cb);
499 }
500 
501 static int
502 ftl_submit_erase(struct ftl_io *io)
503 {
504 	struct spdk_ftl_dev *dev = io->dev;
505 	struct ftl_band *band = io->band;
506 	struct ftl_addr addr = io->addr;
507 	struct ftl_io_channel *ioch;
508 	struct ftl_zone *zone;
509 	int rc = 0;
510 	size_t i;
511 
512 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
513 
514 	for (i = 0; i < io->num_blocks; ++i) {
515 		if (i != 0) {
516 			zone = ftl_band_next_zone(band, ftl_band_zone_from_addr(band, addr));
517 			assert(zone->info.state == SPDK_BDEV_ZONE_STATE_FULL);
518 			addr.offset = zone->info.zone_id;
519 		}
520 
521 		assert(ftl_addr_get_zone_offset(dev, addr) == 0);
522 
523 		ftl_trace_submission(dev, io, addr, 1);
524 		rc = spdk_bdev_zone_management(dev->base_bdev_desc, ioch->base_ioch, addr.offset,
525 					       SPDK_BDEV_ZONE_RESET, ftl_io_cmpl_cb, io);
526 		if (spdk_unlikely(rc)) {
527 			ftl_io_fail(io, rc);
528 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
529 			break;
530 		}
531 
532 		ftl_io_inc_req(io);
533 		ftl_io_advance(io, 1);
534 	}
535 
536 	if (ftl_io_done(io)) {
537 		ftl_io_complete(io);
538 	}
539 
540 	return rc;
541 }
542 
543 static bool
544 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
545 {
546 	return dev->core_thread == spdk_get_thread();
547 }
548 
549 struct spdk_io_channel *
550 ftl_get_io_channel(const struct spdk_ftl_dev *dev)
551 {
552 	if (ftl_check_core_thread(dev)) {
553 		return dev->ioch;
554 	}
555 
556 	return NULL;
557 }
558 
559 static void
560 ftl_erase_fail(struct ftl_io *io, int status)
561 {
562 	struct ftl_zone *zone;
563 	struct ftl_band *band = io->band;
564 	char buf[128];
565 
566 	SPDK_ERRLOG("Erase failed at address: %s, status: %d\n",
567 		    ftl_addr2str(io->addr, buf, sizeof(buf)), status);
568 
569 	zone = ftl_band_zone_from_addr(band, io->addr);
570 	zone->info.state = SPDK_BDEV_ZONE_STATE_OFFLINE;
571 	ftl_band_remove_zone(band, zone);
572 	band->tail_md_addr = ftl_band_tail_md_addr(band);
573 }
574 
575 static void
576 ftl_zone_erase_cb(struct ftl_io *io, void *ctx, int status)
577 {
578 	struct ftl_zone *zone;
579 
580 	zone = ftl_band_zone_from_addr(io->band, io->addr);
581 	zone->busy = false;
582 
583 	if (spdk_unlikely(status)) {
584 		ftl_erase_fail(io, status);
585 		return;
586 	}
587 
588 	zone->info.state = SPDK_BDEV_ZONE_STATE_EMPTY;
589 	zone->info.write_pointer = zone->info.zone_id;
590 }
591 
592 static int
593 ftl_band_erase(struct ftl_band *band)
594 {
595 	struct ftl_zone *zone;
596 	struct ftl_io *io;
597 	int rc = 0;
598 
599 	assert(band->state == FTL_BAND_STATE_CLOSED ||
600 	       band->state == FTL_BAND_STATE_FREE);
601 
602 	ftl_band_set_state(band, FTL_BAND_STATE_PREP);
603 
604 	CIRCLEQ_FOREACH(zone, &band->zones, circleq) {
605 		if (zone->info.state == SPDK_BDEV_ZONE_STATE_EMPTY) {
606 			continue;
607 		}
608 
609 		io = ftl_io_erase_init(band, 1, ftl_zone_erase_cb);
610 		if (!io) {
611 			rc = -ENOMEM;
612 			break;
613 		}
614 
615 		zone->busy = true;
616 		io->addr.offset = zone->info.zone_id;
617 		rc = ftl_submit_erase(io);
618 		if (rc) {
619 			zone->busy = false;
620 			assert(0);
621 			/* TODO: change band's state back to close? */
622 			break;
623 		}
624 	}
625 
626 	return rc;
627 }
628 
629 static struct ftl_band *
630 ftl_next_write_band(struct spdk_ftl_dev *dev)
631 {
632 	struct ftl_band *band;
633 
634 	/* Find a free band that has all of its data moved onto other closed bands */
635 	LIST_FOREACH(band, &dev->free_bands, list_entry) {
636 		assert(band->state == FTL_BAND_STATE_FREE);
637 		if (band->num_reloc_bands == 0 && band->num_reloc_blocks == 0) {
638 			break;
639 		}
640 	}
641 
642 	if (spdk_unlikely(!band)) {
643 		return NULL;
644 	}
645 
646 	if (ftl_band_erase(band)) {
647 		/* TODO: handle erase failure */
648 		return NULL;
649 	}
650 
651 	return band;
652 }
653 
654 static struct ftl_band *
655 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
656 {
657 	struct ftl_band *band;
658 
659 	if (!dev->next_band) {
660 		band = ftl_next_write_band(dev);
661 	} else {
662 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
663 		band = dev->next_band;
664 		dev->next_band = NULL;
665 	}
666 
667 	return band;
668 }
669 
670 static struct ftl_wptr *
671 ftl_wptr_init(struct ftl_band *band)
672 {
673 	struct spdk_ftl_dev *dev = band->dev;
674 	struct ftl_wptr *wptr;
675 
676 	wptr = calloc(1, sizeof(*wptr));
677 	if (!wptr) {
678 		return NULL;
679 	}
680 
681 	wptr->dev = dev;
682 	wptr->band = band;
683 	wptr->zone = CIRCLEQ_FIRST(&band->zones);
684 	wptr->addr.offset = wptr->zone->info.zone_id;
685 	TAILQ_INIT(&wptr->pending_queue);
686 
687 	return wptr;
688 }
689 
690 static int
691 ftl_add_direct_wptr(struct ftl_band *band)
692 {
693 	struct spdk_ftl_dev *dev = band->dev;
694 	struct ftl_wptr *wptr;
695 
696 	assert(band->state == FTL_BAND_STATE_OPEN);
697 
698 	wptr = ftl_wptr_init(band);
699 	if (!wptr) {
700 		return -1;
701 	}
702 
703 	wptr->direct_mode = true;
704 
705 	if (ftl_band_alloc_lba_map(band)) {
706 		ftl_wptr_free(wptr);
707 		return -1;
708 	}
709 
710 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
711 
712 	SPDK_DEBUGLOG(ftl_core, "wptr: direct band %u\n", band->id);
713 	ftl_trace_write_band(dev, band);
714 	return 0;
715 }
716 
717 static void
718 ftl_close_direct_wptr(struct ftl_band *band)
719 {
720 	struct ftl_wptr *wptr = ftl_wptr_from_band(band);
721 
722 	assert(wptr->direct_mode);
723 	assert(band->state == FTL_BAND_STATE_CLOSED);
724 
725 	ftl_band_release_lba_map(band);
726 
727 	ftl_remove_wptr(wptr);
728 }
729 
730 int
731 ftl_band_set_direct_access(struct ftl_band *band, bool access)
732 {
733 	if (access) {
734 		return ftl_add_direct_wptr(band);
735 	} else {
736 		ftl_close_direct_wptr(band);
737 		return 0;
738 	}
739 }
740 
741 static int
742 ftl_add_wptr(struct spdk_ftl_dev *dev)
743 {
744 	struct ftl_band *band;
745 	struct ftl_wptr *wptr;
746 
747 	band = ftl_next_wptr_band(dev);
748 	if (!band) {
749 		return -1;
750 	}
751 
752 	wptr = ftl_wptr_init(band);
753 	if (!wptr) {
754 		return -1;
755 	}
756 
757 	if (ftl_band_write_prep(band)) {
758 		ftl_wptr_free(wptr);
759 		return -1;
760 	}
761 
762 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
763 
764 	SPDK_DEBUGLOG(ftl_core, "wptr: band %u\n", band->id);
765 	ftl_trace_write_band(dev, band);
766 	return 0;
767 }
768 
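/*
 * Advance the write pointer by xfer_size blocks: mark the band FULL once its user area is
 * exhausted, mark the current zone busy, and move on to the next transfer address and the
 * next operational zone.  Once the band crosses the band_thld percentage of usable blocks,
 * the next write band is prepared ahead of time.  Direct-mode write pointers are not
 * advanced at all.
 */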
769 static void
770 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
771 {
772 	struct ftl_band *band = wptr->band;
773 	struct spdk_ftl_dev *dev = wptr->dev;
774 	struct spdk_ftl_conf *conf = &dev->conf;
775 	size_t next_thld;
776 
777 	if (spdk_unlikely(wptr->direct_mode)) {
778 		return;
779 	}
780 
781 	wptr->offset += xfer_size;
782 	next_thld = (ftl_band_num_usable_blocks(band) * conf->band_thld) / 100;
783 
784 	if (ftl_band_full(band, wptr->offset)) {
785 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
786 	}
787 
788 	wptr->zone->busy = true;
789 	wptr->addr = ftl_band_next_xfer_addr(band, wptr->addr, xfer_size);
790 	wptr->zone = ftl_band_next_operational_zone(band, wptr->zone);
791 
792 	assert(!ftl_addr_invalid(wptr->addr));
793 
794 	SPDK_DEBUGLOG(ftl_core, "wptr: pu:%lu band:%lu, offset:%lu\n",
795 		      ftl_addr_get_punit(dev, wptr->addr),
796 		      ftl_addr_get_band(dev, wptr->addr),
797 		      wptr->addr.offset);
798 
799 	if (wptr->offset >= next_thld && !dev->next_band) {
800 		dev->next_band = ftl_next_write_band(dev);
801 	}
802 }
803 
804 static size_t
805 ftl_wptr_user_blocks_left(const struct ftl_wptr *wptr)
806 {
807 	return ftl_band_user_blocks_left(wptr->band, wptr->offset);
808 }
809 
810 static bool
811 ftl_wptr_ready(struct ftl_wptr *wptr)
812 {
813 	struct ftl_band *band = wptr->band;
814 
815 	/* TODO: add handling of empty bands */
816 
817 	if (spdk_unlikely(!ftl_zone_is_writable(wptr->dev, wptr->zone))) {
818 		/* Erasing band may fail after it was assigned to wptr. */
819 		if (spdk_unlikely(wptr->zone->info.state == SPDK_BDEV_ZONE_STATE_OFFLINE)) {
820 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
821 		}
822 		return false;
823 	}
824 
825 	/* If we're in the process of writing metadata, wait till it is */
826 	/* completed. */
827 	/* TODO: we should probably change bands once we're writing tail md */
828 	if (ftl_band_state_changing(band)) {
829 		return false;
830 	}
831 
832 	if (band->state == FTL_BAND_STATE_FULL) {
833 		if (wptr->num_outstanding == 0) {
834 			if (ftl_wptr_close_band(wptr)) {
835 				/* TODO: need recovery here */
836 				assert(false);
837 			}
838 		}
839 
840 		return false;
841 	}
842 
843 	if (band->state != FTL_BAND_STATE_OPEN) {
844 		if (ftl_wptr_open_band(wptr)) {
845 			/* TODO: need recovery here */
846 			assert(false);
847 		}
848 
849 		return false;
850 	}
851 
852 	return true;
853 }
854 
855 int
856 ftl_flush_active_bands(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
857 {
858 	struct ftl_wptr *wptr;
859 	struct ftl_band_flush *flush;
860 
861 	assert(ftl_get_core_thread(dev) == spdk_get_thread());
862 
863 	flush = calloc(1, sizeof(*flush));
864 	if (spdk_unlikely(!flush)) {
865 		return -ENOMEM;
866 	}
867 
868 	LIST_INSERT_HEAD(&dev->band_flush_list, flush, list_entry);
869 
870 	flush->cb_fn = cb_fn;
871 	flush->cb_arg = cb_arg;
872 	flush->dev = dev;
873 
874 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
875 		wptr->flush = true;
876 		flush->num_bands++;
877 	}
878 
879 	return 0;
880 }
881 
882 static const struct spdk_ftl_limit *
883 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
884 {
885 	assert(type < SPDK_FTL_LIMIT_MAX);
886 	return &dev->conf.limits[type];
887 }
888 
889 static bool
890 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry)
891 {
892 	struct ftl_addr addr;
893 
894 	/* If the LBA is invalid don't bother checking the md and l2p */
895 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
896 		return false;
897 	}
898 
899 	addr = ftl_l2p_get(dev, entry->lba);
900 	if (!(ftl_addr_cached(addr) && entry == ftl_get_entry_from_addr(dev, addr))) {
901 		return false;
902 	}
903 
904 	return true;
905 }
906 
907 void
908 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry)
909 {
910 	pthread_spin_lock(&entry->lock);
911 
912 	if (!entry->valid) {
913 		goto unlock;
914 	}
915 
916 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
917 	/* on-disk address and clear the cache status bit. Otherwise, skip the l2p update */
918 	/* and just clear the cache status. */
919 	if (!ftl_cache_lba_valid(dev, entry)) {
920 		goto clear;
921 	}
922 
923 	ftl_l2p_set(dev, entry->lba, entry->addr);
924 clear:
925 	entry->valid = false;
926 unlock:
927 	pthread_spin_unlock(&entry->lock);
928 }
929 
930 static void
931 ftl_pad_wbuf(struct spdk_ftl_dev *dev, size_t size)
932 {
933 	struct ftl_wbuf_entry *entry;
934 	struct ftl_io_channel *ioch;
935 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
936 
937 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
938 
939 	for (size_t i = 0; i < size; ++i) {
940 		entry = ftl_acquire_wbuf_entry(ioch, flags);
941 		if (!entry) {
942 			break;
943 		}
944 
945 		entry->lba = FTL_LBA_INVALID;
946 		entry->addr = ftl_to_addr(FTL_ADDR_INVALID);
947 		memset(entry->payload, 0, FTL_BLOCK_SIZE);
948 
949 		spdk_ring_enqueue(ioch->submit_queue, (void **)&entry, 1, NULL);
950 	}
951 }
952 
953 static void
954 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
955 {
956 	while (!LIST_EMPTY(&dev->free_bands)) {
957 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
958 	}
959 
960 	dev->next_band = NULL;
961 }
962 
963 static void
964 ftl_wptr_pad_band(struct ftl_wptr *wptr)
965 {
966 	struct spdk_ftl_dev *dev = wptr->dev;
967 	struct ftl_batch *batch = dev->current_batch;
968 	struct ftl_io_channel *ioch;
969 	struct ftl_io *io;
970 	size_t size, pad_size, blocks_left;
971 
972 	size = batch != NULL ? batch->num_entries : 0;
973 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
974 		size += spdk_ring_count(ioch->submit_queue);
975 
976 		TAILQ_FOREACH(io, &ioch->retry_queue, ioch_entry) {
977 			if (io->type == FTL_IO_WRITE) {
978 				size += io->num_blocks - io->pos;
979 			}
980 		}
981 	}
982 
983 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
984 
985 	blocks_left = ftl_wptr_user_blocks_left(wptr);
986 	assert(size <= blocks_left);
987 	assert(blocks_left % dev->xfer_size == 0);
988 	pad_size = spdk_min(blocks_left - size, spdk_ring_count(ioch->free_queue));
989 
990 	ftl_pad_wbuf(dev, pad_size);
991 }
992 
993 static void
994 ftl_wptr_process_shutdown(struct ftl_wptr *wptr)
995 {
996 	struct spdk_ftl_dev *dev = wptr->dev;
997 	struct ftl_batch *batch = dev->current_batch;
998 	struct ftl_io_channel *ioch;
999 	struct ftl_io *io;
1000 	size_t size;
1001 
1002 	size = batch != NULL ? batch->num_entries : 0;
1003 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1004 		size += spdk_ring_count(ioch->submit_queue);
1005 
1006 		TAILQ_FOREACH(io, &ioch->retry_queue, ioch_entry) {
1007 			if (io->type == FTL_IO_WRITE) {
1008 				size += io->num_blocks - io->pos;
1009 			}
1010 		}
1011 	}
1012 
1013 	if (size >= dev->xfer_size) {
1014 		return;
1015 	}
1016 
1017 	/* If we reach this point we need to remove free bands */
1018 	/* and pad current wptr band to the end */
1019 	ftl_remove_free_bands(dev);
1020 	ftl_wptr_pad_band(wptr);
1021 }
1022 
1023 static int
1024 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
1025 {
1026 	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(dev->ioch);
1027 
1028 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
1029 	       dev->num_io_channels == 1 && LIST_EMPTY(&dev->wptr_list) &&
1030 	       TAILQ_EMPTY(&ioch->retry_queue);
1031 }
1032 
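/*
 * Translate the number of free bands into a write throttling level: pick the first (most
 * severe) limit whose free-band threshold has been reached and scale every IO channel's
 * queue depth limit to that percentage of its write buffer entries (100% when no limit
 * applies).
 */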
1033 void
1034 ftl_apply_limits(struct spdk_ftl_dev *dev)
1035 {
1036 	const struct spdk_ftl_limit *limit;
1037 	struct ftl_io_channel *ioch;
1038 	struct ftl_stats *stats = &dev->stats;
1039 	uint32_t qdepth_limit = 100;
1040 	int i;
1041 
1042 	/* Clear existing limit */
1043 	dev->limit = SPDK_FTL_LIMIT_MAX;
1044 
1045 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
1046 		limit = ftl_get_limit(dev, i);
1047 
1048 		if (dev->num_free <= limit->thld) {
1049 			qdepth_limit = limit->limit;
1050 			stats->limits[i]++;
1051 			dev->limit = i;
1052 			break;
1053 		}
1054 	}
1055 
1056 	ftl_trace_limits(dev, dev->limit, dev->num_free);
1057 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1058 		__atomic_store_n(&ioch->qdepth_limit, (qdepth_limit * ioch->num_entries) / 100,
1059 				 __ATOMIC_SEQ_CST);
1060 	}
1061 }
1062 
1063 static int
1064 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_addr addr)
1065 {
1066 	struct ftl_band *band = ftl_band_from_addr(dev, addr);
1067 	struct ftl_lba_map *lba_map = &band->lba_map;
1068 	uint64_t offset;
1069 
1070 	offset = ftl_band_block_offset_from_addr(band, addr);
1071 
1072 	/* The bit might already be cleared if two writes are scheduled to the */
1073 	/* same LBA at the same time */
1074 	if (spdk_bit_array_get(lba_map->vld, offset)) {
1075 		assert(lba_map->num_vld > 0);
1076 		spdk_bit_array_clear(lba_map->vld, offset);
1077 		lba_map->num_vld--;
1078 		return 1;
1079 	}
1080 
1081 	return 0;
1082 }
1083 
1084 int
1085 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_addr addr)
1086 {
1087 	struct ftl_band *band;
1088 	int rc;
1089 
1090 	assert(!ftl_addr_cached(addr));
1091 	band = ftl_band_from_addr(dev, addr);
1092 
1093 	pthread_spin_lock(&band->lba_map.lock);
1094 	rc = ftl_invalidate_addr_unlocked(dev, addr);
1095 	pthread_spin_unlock(&band->lba_map.lock);
1096 
1097 	return rc;
1098 }
1099 
1100 static int
1101 ftl_read_retry(int rc)
1102 {
1103 	return rc == -EAGAIN;
1104 }
1105 
1106 static int
1107 ftl_read_canceled(int rc)
1108 {
1109 	return rc == -EFAULT || rc == 0;
1110 }
1111 
1112 static int
1113 ftl_cache_read(struct ftl_io *io, uint64_t lba,
1114 	       struct ftl_addr addr, void *buf)
1115 {
1116 	struct ftl_wbuf_entry *entry;
1117 	struct ftl_addr naddr;
1118 	int rc = 0;
1119 
1120 	entry = ftl_get_entry_from_addr(io->dev, addr);
1121 	pthread_spin_lock(&entry->lock);
1122 
1123 	naddr = ftl_l2p_get(io->dev, lba);
1124 	if (addr.offset != naddr.offset) {
1125 		rc = -1;
1126 		goto out;
1127 	}
1128 
1129 	memcpy(buf, entry->payload, FTL_BLOCK_SIZE);
1130 out:
1131 	pthread_spin_unlock(&entry->lock);
1132 	return rc;
1133 }
1134 
1135 static int
1136 ftl_read_next_logical_addr(struct ftl_io *io, struct ftl_addr *addr)
1137 {
1138 	struct spdk_ftl_dev *dev = io->dev;
1139 	struct ftl_addr next_addr;
1140 	size_t i;
1141 
1142 	*addr = ftl_l2p_get(dev, ftl_io_current_lba(io));
1143 
1144 	SPDK_DEBUGLOG(ftl_core, "Read addr:%lx, lba:%lu\n",
1145 		      addr->offset, ftl_io_current_lba(io));
1146 
1147 	/* If the address is invalid, skip it (the buffer should already be zeroed) */
1148 	if (ftl_addr_invalid(*addr)) {
1149 		return -EFAULT;
1150 	}
1151 
1152 	if (ftl_addr_cached(*addr)) {
1153 		if (!ftl_cache_read(io, ftl_io_current_lba(io), *addr, ftl_io_iovec_addr(io))) {
1154 			return 0;
1155 		}
1156 
1157 		/* If the state changed, we have to re-read the l2p */
1158 		return -EAGAIN;
1159 	}
1160 
1161 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
1162 		next_addr = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));
1163 
1164 		if (ftl_addr_invalid(next_addr) || ftl_addr_cached(next_addr)) {
1165 			break;
1166 		}
1167 
1168 		if (addr->offset + i != next_addr.offset) {
1169 			break;
1170 		}
1171 	}
1172 
1173 	return i;
1174 }
1175 
1176 static int
1177 ftl_submit_read(struct ftl_io *io)
1178 {
1179 	struct spdk_ftl_dev *dev = io->dev;
1180 	struct ftl_io_channel *ioch;
1181 	struct ftl_addr addr;
1182 	int rc = 0, num_blocks;
1183 
1184 	ioch = ftl_io_channel_get_ctx(io->ioch);
1185 
1186 	assert(LIST_EMPTY(&io->children));
1187 
1188 	while (io->pos < io->num_blocks) {
1189 		if (ftl_io_mode_physical(io)) {
1190 			num_blocks = rc = ftl_read_next_physical_addr(io, &addr);
1191 		} else {
1192 			num_blocks = rc = ftl_read_next_logical_addr(io, &addr);
1193 		}
1194 
1195 		/* We might need to retry the read from scratch (e.g. */
1196 		/* because a write was under way and completed before */
1197 		/* we could read it from the write buffer) */
1198 		if (ftl_read_retry(rc)) {
1199 			continue;
1200 		}
1201 
1202 		/* We don't have to schedule the read, as it was read from cache */
1203 		if (ftl_read_canceled(rc)) {
1204 			ftl_io_advance(io, 1);
1205 			ftl_trace_completion(io->dev, io, rc ? FTL_TRACE_COMPLETION_INVALID :
1206 					     FTL_TRACE_COMPLETION_CACHE);
1207 			rc = 0;
1208 			continue;
1209 		}
1210 
1211 		assert(num_blocks > 0);
1212 
1213 		ftl_trace_submission(dev, io, addr, num_blocks);
1214 		rc = spdk_bdev_read_blocks(dev->base_bdev_desc, ioch->base_ioch,
1215 					   ftl_io_iovec_addr(io),
1216 					   addr.offset,
1217 					   num_blocks, ftl_io_cmpl_cb, io);
1218 		if (spdk_unlikely(rc)) {
1219 			if (rc == -ENOMEM) {
1220 				TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
1221 				rc = 0;
1222 			} else {
1223 				ftl_io_fail(io, rc);
1224 			}
1225 			break;
1226 		}
1227 
1228 		ftl_io_inc_req(io);
1229 		ftl_io_advance(io, num_blocks);
1230 	}
1231 
1232 	/* If we didn't have to read anything from the device, */
1233 	/* complete the request right away */
1234 	if (ftl_io_done(io)) {
1235 		ftl_io_complete(io);
1236 	}
1237 
1238 	return rc;
1239 }
1240 
1241 static void
1242 ftl_complete_flush(struct ftl_flush *flush)
1243 {
1244 	assert(flush->num_req == 0);
1245 	LIST_REMOVE(flush, list_entry);
1246 
1247 	flush->cb.fn(flush->cb.ctx, 0);
1248 
1249 	spdk_bit_array_free(&flush->bmap);
1250 	free(flush);
1251 }
1252 
1253 static void
1254 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
1255 {
1256 	struct ftl_flush *flush, *tflush;
1257 	size_t offset;
1258 
1259 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
1260 		offset = batch->index;
1261 
1262 		if (spdk_bit_array_get(flush->bmap, offset)) {
1263 			spdk_bit_array_clear(flush->bmap, offset);
1264 			if (!(--flush->num_req)) {
1265 				ftl_complete_flush(flush);
1266 			}
1267 		}
1268 	}
1269 }
1270 
1271 static void
1272 ftl_nv_cache_wrap_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1273 {
1274 	struct ftl_nv_cache *nv_cache = cb_arg;
1275 
1276 	if (!success) {
1277 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header\n");
1278 		/* TODO: go into read-only mode */
1279 		assert(0);
1280 	}
1281 
1282 	pthread_spin_lock(&nv_cache->lock);
1283 	nv_cache->ready = true;
1284 	pthread_spin_unlock(&nv_cache->lock);
1285 
1286 	spdk_bdev_free_io(bdev_io);
1287 }
1288 
1289 static void
1290 ftl_nv_cache_wrap(void *ctx)
1291 {
1292 	struct ftl_nv_cache *nv_cache = ctx;
1293 	int rc;
1294 
1295 	rc = ftl_nv_cache_write_header(nv_cache, false, ftl_nv_cache_wrap_cb, nv_cache);
1296 	if (spdk_unlikely(rc != 0)) {
1297 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header: %s\n",
1298 			    spdk_strerror(-rc));
1299 		/* TODO: go into read-only mode */
1300 		assert(0);
1301 	}
1302 }
1303 
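/*
 * Reserve a contiguous range of blocks on the non-volatile cache bdev.  *num_blocks is
 * trimmed to what is currently available, the per-request cap and the end of the device,
 * and *phase reports the cache's current phase.  Returns the first reserved block or
 * FTL_LBA_INVALID if the cache is full or not ready.  When the write pointer hits the end
 * of the bdev it wraps back to FTL_NV_CACHE_DATA_OFFSET, advances the phase and rewrites
 * the cache header before the cache becomes ready again.
 */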
1304 static uint64_t
1305 ftl_reserve_nv_cache(struct ftl_nv_cache *nv_cache, size_t *num_blocks, unsigned int *phase)
1306 {
1307 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1308 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1309 	uint64_t num_available, cache_size, cache_addr = FTL_LBA_INVALID;
1310 
1311 	cache_size = spdk_bdev_get_num_blocks(bdev);
1312 
1313 	pthread_spin_lock(&nv_cache->lock);
1314 	if (spdk_unlikely(nv_cache->num_available == 0 || !nv_cache->ready)) {
1315 		goto out;
1316 	}
1317 
1318 	num_available = spdk_min(nv_cache->num_available, *num_blocks);
1319 	num_available = spdk_min(num_available, dev->conf.nv_cache.max_request_cnt);
1320 
1321 	if (spdk_unlikely(nv_cache->current_addr + num_available > cache_size)) {
1322 		*num_blocks = cache_size - nv_cache->current_addr;
1323 	} else {
1324 		*num_blocks = num_available;
1325 	}
1326 
1327 	cache_addr = nv_cache->current_addr;
1328 	nv_cache->current_addr += *num_blocks;
1329 	nv_cache->num_available -= *num_blocks;
1330 	*phase = nv_cache->phase;
1331 
1332 	if (nv_cache->current_addr == spdk_bdev_get_num_blocks(bdev)) {
1333 		nv_cache->current_addr = FTL_NV_CACHE_DATA_OFFSET;
1334 		nv_cache->phase = ftl_nv_cache_next_phase(nv_cache->phase);
1335 		nv_cache->ready = false;
1336 		spdk_thread_send_msg(ftl_get_core_thread(dev), ftl_nv_cache_wrap, nv_cache);
1337 	}
1338 out:
1339 	pthread_spin_unlock(&nv_cache->lock);
1340 	return cache_addr;
1341 }
1342 
1343 static struct ftl_io *
1344 ftl_alloc_io_nv_cache(struct ftl_io *parent, size_t num_blocks)
1345 {
1346 	struct ftl_io_init_opts opts = {
1347 		.dev		= parent->dev,
1348 		.parent		= parent,
1349 		.iovcnt		= 0,
1350 		.num_blocks	= num_blocks,
1351 		.flags		= parent->flags | FTL_IO_CACHE,
1352 	};
1353 
1354 	return ftl_io_init_internal(&opts);
1355 }
1356 
1357 static void
1358 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1359 {
1360 	struct ftl_io *io = cb_arg;
1361 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1362 
1363 	if (spdk_unlikely(!success)) {
1364 		SPDK_ERRLOG("Non-volatile cache write failed at %"PRIx64"\n", io->addr.offset);
1365 		io->status = -EIO;
1366 	}
1367 
1368 	ftl_io_dec_req(io);
1369 	if (ftl_io_done(io)) {
1370 		spdk_mempool_put(nv_cache->md_pool, io->md);
1371 		ftl_io_complete(io);
1372 	}
1373 
1374 	spdk_bdev_free_io(bdev_io);
1375 }
1376 
1377 static void
1378 ftl_submit_nv_cache(void *ctx)
1379 {
1380 	struct ftl_io *io = ctx;
1381 	struct spdk_ftl_dev *dev = io->dev;
1382 	struct spdk_thread *thread;
1383 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1384 	struct ftl_io_channel *ioch;
1385 	int rc;
1386 
1387 	ioch = ftl_io_channel_get_ctx(io->ioch);
1388 	thread = spdk_io_channel_get_thread(io->ioch);
1389 
1390 	rc = spdk_bdev_write_blocks_with_md(nv_cache->bdev_desc, ioch->cache_ioch,
1391 					    ftl_io_iovec_addr(io), io->md, io->addr.offset,
1392 					    io->num_blocks, ftl_nv_cache_submit_cb, io);
1393 	if (rc == -ENOMEM) {
1394 		spdk_thread_send_msg(thread, ftl_submit_nv_cache, io);
1395 		return;
1396 	} else if (rc) {
1397 		SPDK_ERRLOG("Write to persistent cache failed: %s (%"PRIu64", %"PRIu64")\n",
1398 			    spdk_strerror(-rc), io->addr.offset, io->num_blocks);
1399 		spdk_mempool_put(nv_cache->md_pool, io->md);
1400 		io->status = -EIO;
1401 		ftl_io_complete(io);
1402 		return;
1403 	}
1404 
1405 	ftl_io_advance(io, io->num_blocks);
1406 	ftl_io_inc_req(io);
1407 }
1408 
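/*
 * Fill the per-block metadata buffer for a non-volatile cache write: each block's metadata
 * carries its LBA packed together with the current phase (ftl_nv_cache_pack_lba), which
 * presumably lets recovery tell data from different passes over the cache apart.
 */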
1409 static void
1410 ftl_nv_cache_fill_md(struct ftl_io *io, unsigned int phase)
1411 {
1412 	struct spdk_bdev *bdev;
1413 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1414 	uint64_t block_off, lba;
1415 	void *md_buf = io->md;
1416 
1417 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1418 
1419 	for (block_off = 0; block_off < io->num_blocks; ++block_off) {
1420 		lba = ftl_nv_cache_pack_lba(ftl_io_get_lba(io, block_off), phase);
1421 		memcpy(md_buf, &lba, sizeof(lba));
1422 		md_buf += spdk_bdev_get_md_size(bdev);
1423 	}
1424 }
1425 
1426 static void
1427 _ftl_write_nv_cache(void *ctx)
1428 {
1429 	struct ftl_io *child, *io = ctx;
1430 	struct spdk_ftl_dev *dev = io->dev;
1431 	struct spdk_thread *thread;
1432 	unsigned int phase;
1433 	uint64_t num_blocks;
1434 
1435 	thread = spdk_io_channel_get_thread(io->ioch);
1436 
1437 	while (io->pos < io->num_blocks) {
1438 		num_blocks = ftl_io_iovec_len_left(io);
1439 
1440 		child = ftl_alloc_io_nv_cache(io, num_blocks);
1441 		if (spdk_unlikely(!child)) {
1442 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1443 			return;
1444 		}
1445 
1446 		child->md = spdk_mempool_get(dev->nv_cache.md_pool);
1447 		if (spdk_unlikely(!child->md)) {
1448 			ftl_io_free(child);
1449 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1450 			break;
1451 		}
1452 
1453 		/* Reserve area on the write buffer cache */
1454 		child->addr.offset = ftl_reserve_nv_cache(&dev->nv_cache, &num_blocks, &phase);
1455 		if (child->addr.offset == FTL_LBA_INVALID) {
1456 			spdk_mempool_put(dev->nv_cache.md_pool, child->md);
1457 			ftl_io_free(child);
1458 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1459 			break;
1460 		}
1461 
1462 		/* Shrink the IO if there isn't enough room in the cache to fill the whole iovec */
1463 		if (spdk_unlikely(num_blocks != ftl_io_iovec_len_left(io))) {
1464 			ftl_io_shrink_iovec(child, num_blocks);
1465 		}
1466 
1467 		ftl_nv_cache_fill_md(child, phase);
1468 		ftl_submit_nv_cache(child);
1469 	}
1470 
1471 	if (ftl_io_done(io)) {
1472 		ftl_io_complete(io);
1473 	}
1474 }
1475 
1476 static void
1477 ftl_write_nv_cache(struct ftl_io *parent)
1478 {
1479 	ftl_io_reset(parent);
1480 	parent->flags |= FTL_IO_CACHE;
1481 	_ftl_write_nv_cache(parent);
1482 }
1483 
1484 int
1485 ftl_nv_cache_write_header(struct ftl_nv_cache *nv_cache, bool shutdown,
1486 			  spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1487 {
1488 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1489 	struct ftl_nv_cache_header *hdr = nv_cache->dma_buf;
1490 	struct spdk_bdev *bdev;
1491 	struct ftl_io_channel *ioch;
1492 
1493 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1494 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
1495 
1496 	memset(hdr, 0, spdk_bdev_get_block_size(bdev));
1497 
1498 	hdr->phase = (uint8_t)nv_cache->phase;
1499 	hdr->size = spdk_bdev_get_num_blocks(bdev);
1500 	hdr->uuid = dev->uuid;
1501 	hdr->version = FTL_NV_CACHE_HEADER_VERSION;
1502 	hdr->current_addr = shutdown ? nv_cache->current_addr : FTL_LBA_INVALID;
1503 	hdr->checksum = spdk_crc32c_update(hdr, offsetof(struct ftl_nv_cache_header, checksum), 0);
1504 
1505 	return spdk_bdev_write_blocks(nv_cache->bdev_desc, ioch->cache_ioch, hdr, 0, 1,
1506 				      cb_fn, cb_arg);
1507 }
1508 
1509 int
1510 ftl_nv_cache_scrub(struct ftl_nv_cache *nv_cache, spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1511 {
1512 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1513 	struct ftl_io_channel *ioch;
1514 	struct spdk_bdev *bdev;
1515 
1516 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
1517 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1518 
1519 	return spdk_bdev_write_zeroes_blocks(nv_cache->bdev_desc, ioch->cache_ioch, 1,
1520 					     spdk_bdev_get_num_blocks(bdev) - 1,
1521 					     cb_fn, cb_arg);
1522 }
1523 
1524 static void
1525 ftl_write_fail(struct ftl_io *io, int status)
1526 {
1527 	struct ftl_batch *batch = io->batch;
1528 	struct spdk_ftl_dev *dev = io->dev;
1529 	struct ftl_wbuf_entry *entry;
1530 	struct ftl_band *band;
1531 	char buf[128];
1532 
1533 	entry = TAILQ_FIRST(&batch->entries);
1534 
1535 	band = ftl_band_from_addr(io->dev, entry->addr);
1536 	SPDK_ERRLOG("Write failed @addr: %s, status: %d\n",
1537 		    ftl_addr2str(entry->addr, buf, sizeof(buf)), status);
1538 
1539 	/* Close the band, halt wptr and defrag */
1540 	ftl_halt_writes(dev, band);
1541 
1542 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1543 		/* Invalidate meta set by process_writes() */
1544 		ftl_invalidate_addr(dev, entry->addr);
1545 	}
1546 
1547 	/* Reset the batch back to the write buffer to resend it later */
1548 	TAILQ_INSERT_TAIL(&dev->pending_batches, batch, tailq);
1549 }
1550 
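/*
 * Completion callback for a write buffer batch written to the base bdev.  For every entry
 * it decrements the source band's relocation counter, records the physical address and,
 * if the L2P still points at this cache entry, updates the band's LBA map and marks the
 * entry valid.  Finally any matching flush requests are processed and the batch is
 * returned to the free pool.
 */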
1551 static void
1552 ftl_write_cb(struct ftl_io *io, void *arg, int status)
1553 {
1554 	struct spdk_ftl_dev *dev = io->dev;
1555 	struct ftl_batch *batch = io->batch;
1556 	struct ftl_wbuf_entry *entry;
1557 	struct ftl_band *band;
1558 	struct ftl_addr prev_addr, addr = io->addr;
1559 
1560 	if (status) {
1561 		ftl_write_fail(io, status);
1562 		return;
1563 	}
1564 
1565 	assert(io->num_blocks == dev->xfer_size);
1566 	assert(!(io->flags & FTL_IO_MD));
1567 
1568 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1569 		band = entry->band;
1570 		if (!(entry->io_flags & FTL_IO_PAD)) {
1571 			/* Verify that the LBA is set for user blocks */
1572 			assert(entry->lba != FTL_LBA_INVALID);
1573 		}
1574 
1575 		if (band != NULL) {
1576 			assert(band->num_reloc_blocks > 0);
1577 			band->num_reloc_blocks--;
1578 		}
1579 
1580 		entry->addr = addr;
1581 		if (entry->lba != FTL_LBA_INVALID) {
1582 			pthread_spin_lock(&entry->lock);
1583 			prev_addr = ftl_l2p_get(dev, entry->lba);
1584 
1585 			/* If the l2p was updated in the meantime, don't update band's metadata */
1586 			if (ftl_addr_cached(prev_addr) &&
1587 			    entry == ftl_get_entry_from_addr(dev, prev_addr)) {
1588 				/* Setting entry's cache bit needs to be done after metadata */
1589 				/* within the band is updated to make sure that writes */
1590 				/* invalidating the entry clear the metadata as well */
1591 				ftl_band_set_addr(io->band, entry->lba, entry->addr);
1592 				entry->valid = true;
1593 			}
1594 			pthread_spin_unlock(&entry->lock);
1595 		}
1596 
1597 		SPDK_DEBUGLOG(ftl_core, "Write addr:%lu, lba:%lu\n",
1598 			      entry->addr.offset, entry->lba);
1599 
1600 		addr = ftl_band_next_addr(io->band, addr, 1);
1601 	}
1602 
1603 	ftl_process_flush(dev, batch);
1604 	ftl_release_batch(dev, batch);
1605 }
1606 
1607 static void
1608 ftl_update_stats(struct spdk_ftl_dev *dev, const struct ftl_wbuf_entry *entry)
1609 {
1610 	if (!(entry->io_flags & FTL_IO_INTERNAL)) {
1611 		dev->stats.write_user++;
1612 	}
1613 	dev->stats.write_total++;
1614 }
1615 
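/*
 * Point the L2P at a freshly filled write buffer entry (addr is the entry's cache
 * address).  Three cases are handled: no previous mapping (set it directly), previous
 * mapping still in the write buffer (resolve against the owning entry under its lock,
 * honoring weak/relocation writes), and previous mapping on disk (invalidate the old
 * physical address under the band's LBA map lock; weak writes only update the L2P if the
 * old address was still valid).
 */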
1616 static void
1617 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_wbuf_entry *entry,
1618 	       struct ftl_addr addr)
1619 {
1620 	struct ftl_addr prev_addr;
1621 	struct ftl_wbuf_entry *prev;
1622 	struct ftl_band *band;
1623 	int valid;
1624 	bool io_weak = entry->io_flags & FTL_IO_WEAK;
1625 
1626 	prev_addr = ftl_l2p_get(dev, entry->lba);
1627 	if (ftl_addr_invalid(prev_addr)) {
1628 		ftl_l2p_set(dev, entry->lba, addr);
1629 		return;
1630 	}
1631 
1632 	if (ftl_addr_cached(prev_addr)) {
1633 		prev = ftl_get_entry_from_addr(dev, prev_addr);
1634 		pthread_spin_lock(&prev->lock);
1635 
1636 		/* Re-read the L2P under the lock to protect against updates */
1637 		/* to this LBA from other threads */
1638 		prev_addr = ftl_l2p_get(dev, entry->lba);
1639 
1640 		/* If the entry is no longer in cache, another write has been */
1641 		/* scheduled in the meantime, so we can return to evicted path */
1642 		if (!ftl_addr_cached(prev_addr)) {
1643 			pthread_spin_unlock(&prev->lock);
1644 			goto evicted;
1645 		}
1646 
1647 		/*
1648 		 * The block being relocated could still reside in the cache, since the write
1649 		 * buffers are independent for each IO channel and enough data
1650 		 * (a write unit) must be collected before it's submitted to the lower
1651 		 * layer.
1652 		 * If the previous entry wasn't overwritten, invalidate the old address and entry.
1653 		 * Otherwise skip the relocated block.
1654 		 */
1655 		if (io_weak &&
1656 		    /* Check if prev_addr was updated in the meantime */
1657 		    !(ftl_addr_cmp(prev_addr, ftl_get_addr_from_entry(prev)) &&
1658 		      /* Check if the relocated address is the same as in the previous entry */
1659 		      ftl_addr_cmp(prev->addr, entry->addr))) {
1660 			pthread_spin_unlock(&prev->lock);
1661 			return;
1662 		}
1663 
1664 		/*
1665 		 * If the previous entry is part of the cache and was written to disk, remove
1666 		 * and invalidate it
1667 		 */
1668 		if (prev->valid) {
1669 			ftl_invalidate_addr(dev, prev->addr);
1670 			prev->valid = false;
1671 		}
1672 
1673 		ftl_l2p_set(dev, entry->lba, addr);
1674 		pthread_spin_unlock(&prev->lock);
1675 		return;
1676 	}
1677 
1678 evicted:
1679 	/*
1680 	 *  If the L2P's physical address is different from what we expected, we don't need to
1681 	 *  do anything (someone's already overwritten our data).
1682 	 */
1683 	if (io_weak && !ftl_addr_cmp(prev_addr, entry->addr)) {
1684 		return;
1685 	}
1686 
1687 	/* Lock the band containing the previous physical address. This ensures atomic changes to */
1688 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
1689 	/* check the validity of weak writes. */
1690 	band = ftl_band_from_addr(dev, prev_addr);
1691 	pthread_spin_lock(&band->lba_map.lock);
1692 
1693 	valid = ftl_invalidate_addr_unlocked(dev, prev_addr);
1694 
1695 	/* If the address has been invalidated already, we don't want to update */
1696 	/* the L2P for weak writes, as it means the write is no longer valid. */
1697 	if (!io_weak || valid) {
1698 		ftl_l2p_set(dev, entry->lba, addr);
1699 	}
1700 
1701 	pthread_spin_unlock(&band->lba_map.lock);
1702 }
1703 
1704 static struct ftl_io *
1705 ftl_io_init_child_write(struct ftl_io *parent, struct ftl_addr addr, ftl_io_fn cb)
1706 {
1707 	struct ftl_io *io;
1708 	struct spdk_ftl_dev *dev = parent->dev;
1709 	struct ftl_io_init_opts opts = {
1710 		.dev		= dev,
1711 		.io		= NULL,
1712 		.parent		= parent,
1713 		.band		= parent->band,
1714 		.size		= sizeof(struct ftl_io),
1715 		.flags		= 0,
1716 		.type		= parent->type,
1717 		.num_blocks	= dev->xfer_size,
1718 		.cb_fn		= cb,
1719 		.iovcnt		= 0,
1720 	};
1721 
1722 	io = ftl_io_init_internal(&opts);
1723 	if (!io) {
1724 		return NULL;
1725 	}
1726 
1727 	io->addr = addr;
1728 
1729 	return io;
1730 }
1731 
1732 static void
1733 ftl_io_child_write_cb(struct ftl_io *io, void *ctx, int status)
1734 {
1735 	struct ftl_zone *zone;
1736 	struct ftl_wptr *wptr;
1737 
1738 	zone = ftl_band_zone_from_addr(io->band, io->addr);
1739 	wptr = ftl_wptr_from_band(io->band);
1740 
1741 	zone->busy = false;
1742 	zone->info.write_pointer += io->num_blocks;
1743 
1744 	if (zone->info.write_pointer == zone->info.zone_id + zone->info.capacity) {
1745 		zone->info.state = SPDK_BDEV_ZONE_STATE_FULL;
1746 	}
1747 
1748 	/* If some other write on the same band failed, the write pointer would already be freed */
1749 	if (spdk_likely(wptr)) {
1750 		wptr->num_outstanding--;
1751 	}
1752 }
1753 
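/*
 * Carve a single xfer_size child request off the parent IO and submit it to the base
 * bdev, either as a zone append (when supported) or as a regular write at the wptr's
 * current address (or at the IO's own address in direct mode).  Returns -EAGAIN when no
 * child IO could be allocated so the caller can retry later.
 */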
1754 static int
1755 ftl_submit_child_write(struct ftl_wptr *wptr, struct ftl_io *io)
1756 {
1757 	struct spdk_ftl_dev	*dev = io->dev;
1758 	struct ftl_io_channel	*ioch;
1759 	struct ftl_io		*child;
1760 	struct ftl_addr		addr;
1761 	int			rc;
1762 
1763 	ioch = ftl_io_channel_get_ctx(io->ioch);
1764 
1765 	if (spdk_likely(!wptr->direct_mode)) {
1766 		addr = wptr->addr;
1767 	} else {
1768 		assert(io->flags & FTL_IO_DIRECT_ACCESS);
1769 		assert(ftl_addr_get_band(dev, io->addr) == wptr->band->id);
1770 		addr = io->addr;
1771 	}
1772 
1773 	/* Split the IO into child requests and release the zone immediately after each child completes */
1774 	child = ftl_io_init_child_write(io, addr, ftl_io_child_write_cb);
1775 	if (!child) {
1776 		return -EAGAIN;
1777 	}
1778 
1779 	wptr->num_outstanding++;
1780 
1781 	if (ftl_is_append_supported(dev)) {
1782 		rc = spdk_bdev_zone_appendv(dev->base_bdev_desc, ioch->base_ioch,
1783 					    child->iov, child->iov_cnt,
1784 					    ftl_addr_get_zone_slba(dev, addr),
1785 					    dev->xfer_size, ftl_io_cmpl_cb, child);
1786 	} else {
1787 		rc = spdk_bdev_writev_blocks(dev->base_bdev_desc, ioch->base_ioch,
1788 					     child->iov, child->iov_cnt, addr.offset,
1789 					     dev->xfer_size, ftl_io_cmpl_cb, child);
1790 	}
1791 
1792 	if (rc) {
1793 		wptr->num_outstanding--;
1794 		ftl_io_fail(child, rc);
1795 		ftl_io_complete(child);
1796 		SPDK_ERRLOG("Write to base bdev failed with status:%d, addr:%lu\n",
1797 			    rc, addr.offset);
1798 		return -EIO;
1799 	}
1800 
1801 	ftl_io_inc_req(child);
1802 	ftl_io_advance(child, dev->xfer_size);
1803 
1804 	return 0;
1805 }
1806 
1807 static int
1808 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
1809 {
1810 	struct spdk_ftl_dev	*dev = io->dev;
1811 	int			rc = 0;
1812 
1813 	assert(io->num_blocks % dev->xfer_size == 0);
1814 
1815 	while (io->iov_pos < io->iov_cnt) {
1816 		/* There are no guarantees on the order of completion within the NVMe submission queue, */
1817 		/* so wait until the zone is not busy before submitting another write */
1818 		if (!ftl_is_append_supported(dev) && wptr->zone->busy) {
1819 			TAILQ_INSERT_TAIL(&wptr->pending_queue, io, ioch_entry);
1820 			rc = -EAGAIN;
1821 			break;
1822 		}
1823 
1824 		rc = ftl_submit_child_write(wptr, io);
1825 		if (spdk_unlikely(rc)) {
1826 			if (rc == -EAGAIN) {
1827 				TAILQ_INSERT_TAIL(&wptr->pending_queue, io, ioch_entry);
1828 			} else {
1829 				ftl_io_fail(io, rc);
1830 			}
1831 			break;
1832 		}
1833 
1834 		ftl_trace_submission(dev, io, wptr->addr, dev->xfer_size);
1835 		ftl_wptr_advance(wptr, dev->xfer_size);
1836 	}
1837 
1838 	if (ftl_io_done(io)) {
1839 		/* Parent IO will complete after all children are completed */
1840 		ftl_io_complete(io);
1841 	}
1842 
1843 	return rc;
1844 }
1845 
1846 static void
1847 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1848 {
1849 	struct ftl_batch *batch = dev->current_batch;
1850 	struct ftl_io_channel *ioch;
1851 	size_t size = 0, num_entries = 0;
1852 
1853 	assert(batch != NULL);
1854 	assert(batch->num_entries < dev->xfer_size);
1855 
1856 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1857 		size += spdk_ring_count(ioch->submit_queue);
1858 	}
1859 
1860 	num_entries = dev->xfer_size - batch->num_entries;
1861 	if (size < num_entries) {
1862 		ftl_pad_wbuf(dev, num_entries - size);
1863 	}
1864 }
1865 
1866 static bool
1867 ftl_check_io_channel_flush(struct spdk_ftl_dev *dev)
1868 {
1869 	struct ftl_io_channel *ioch;
1870 
1871 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1872 		if (ioch->flush && spdk_ring_count(ioch->free_queue) != ioch->num_entries) {
1873 			return true;
1874 		}
1875 	}
1876 
1877 	return false;
1878 }
1879 
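/*
 * Drive writes for a single write pointer: retry any pending IO first, make sure the band
 * is ready for writing, handle shutdown and flush padding, then take the next full batch
 * from the write buffers and submit it at the current wptr address.  Returns the number of
 * blocks submitted (0 when nothing could be written).
 */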
1880 static int
1881 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1882 {
1883 	struct spdk_ftl_dev	*dev = wptr->dev;
1884 	struct ftl_batch	*batch;
1885 	struct ftl_wbuf_entry	*entry;
1886 	struct ftl_io		*io;
1887 
1888 	if (spdk_unlikely(!TAILQ_EMPTY(&wptr->pending_queue))) {
1889 		io = TAILQ_FIRST(&wptr->pending_queue);
1890 		TAILQ_REMOVE(&wptr->pending_queue, io, ioch_entry);
1891 
1892 		if (ftl_submit_write(wptr, io) == -EAGAIN) {
1893 			return 0;
1894 		}
1895 	}
1896 
1897 	/* Make sure the band is prepared for writing */
1898 	if (!ftl_wptr_ready(wptr)) {
1899 		return 0;
1900 	}
1901 
1902 	if (dev->halt) {
1903 		ftl_wptr_process_shutdown(wptr);
1904 	}
1905 
1906 	if (spdk_unlikely(wptr->flush)) {
1907 		ftl_wptr_pad_band(wptr);
1908 	}
1909 
1910 	batch = ftl_get_next_batch(dev);
1911 	if (!batch) {
1912 		/* If there are queued flush requests we need to pad the write buffer to */
1913 		/* force out remaining entries */
1914 		if (!LIST_EMPTY(&dev->flush_list) || ftl_check_io_channel_flush(dev)) {
1915 			ftl_flush_pad_batch(dev);
1916 		}
1917 
1918 		return 0;
1919 	}
1920 
1921 	io = ftl_io_wbuf_init(dev, wptr->addr, wptr->band, batch, ftl_write_cb);
1922 	if (!io) {
1923 		goto error;
1924 	}
1925 
1926 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1927 		/* Update band's relocation stats if the IO comes from reloc */
1928 		if (entry->io_flags & FTL_IO_WEAK) {
1929 			if (!spdk_bit_array_get(wptr->band->reloc_bitmap, entry->band->id)) {
1930 				spdk_bit_array_set(wptr->band->reloc_bitmap, entry->band->id);
1931 				entry->band->num_reloc_bands++;
1932 			}
1933 		}
1934 
1935 		ftl_trace_wbuf_pop(dev, entry);
1936 		ftl_update_stats(dev, entry);
1937 	}
1938 
1939 	SPDK_DEBUGLOG(ftl_core, "Write addr:%lx\n", wptr->addr.offset);
1940 
1941 	if (ftl_submit_write(wptr, io)) {
1942 		/* TODO: we need some recovery here */
1943 		assert(0 && "Write submit failed");
1944 		if (ftl_io_done(io)) {
1945 			ftl_io_free(io);
1946 		}
1947 	}
1948 
1949 	return dev->xfer_size;
1950 error:
1951 	TAILQ_INSERT_TAIL(&dev->pending_batches, batch, tailq);
1952 	return 0;
1953 }
1954 
1955 static bool
1956 ftl_process_writes(struct spdk_ftl_dev *dev)
1957 {
1958 	struct ftl_wptr *wptr, *twptr;
1959 	size_t num_active = 0, num_writes = 0;
1960 	enum ftl_band_state state;
1961 
1962 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1963 		num_writes += ftl_wptr_process_writes(wptr);
1964 		state = wptr->band->state;
1965 
1966 		if (state != FTL_BAND_STATE_FULL &&
1967 		    state != FTL_BAND_STATE_CLOSING &&
1968 		    state != FTL_BAND_STATE_CLOSED) {
1969 			num_active++;
1970 		}
1971 	}
1972 
1973 	if (num_active < 1) {
1974 		ftl_add_wptr(dev);
1975 	}
1976 
1977 	return num_writes != 0;
1978 }
1979 
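/*
 * Copy the IO's current block into a write buffer entry. For relocation (weak) writes, the
 * source band and the block's address within it are recorded and the band's relocated block
 * counter is bumped. The entry also inherits the IO's trace context and current LBA.
 */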
1980 static void
1981 ftl_fill_wbuf_entry(struct ftl_wbuf_entry *entry, struct ftl_io *io)
1982 {
1983 	memcpy(entry->payload, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1984 
1985 	if (entry->io_flags & FTL_IO_WEAK) {
1986 		entry->band = ftl_band_from_addr(io->dev, io->addr);
1987 		entry->addr = ftl_band_next_addr(entry->band, io->addr, io->pos);
1988 		entry->band->num_reloc_blocks++;
1989 	}
1990 
1991 	entry->trace = io->trace;
1992 	entry->lba = ftl_io_current_lba(io);
1993 }
1994 
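/*
 * Fill the write buffer with the IO's data, one block at a time. Blocks with an invalid LBA
 * are skipped. If no free entry is available, the IO is put on the channel's retry queue.
 * The L2P is updated before an entry is placed on the submit queue. Once the whole IO has
 * been consumed, it's either written to the non-volatile cache or queued for completion.
 */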
1995 static int
1996 ftl_wbuf_fill(struct ftl_io *io)
1997 {
1998 	struct spdk_ftl_dev *dev = io->dev;
1999 	struct ftl_io_channel *ioch;
2000 	struct ftl_wbuf_entry *entry;
2001 
2002 	ioch = ftl_io_channel_get_ctx(io->ioch);
2003 
2004 	while (io->pos < io->num_blocks) {
2005 		if (ftl_io_current_lba(io) == FTL_LBA_INVALID) {
2006 			ftl_io_advance(io, 1);
2007 			continue;
2008 		}
2009 
2010 		entry = ftl_acquire_wbuf_entry(ioch, io->flags);
2011 		if (!entry) {
2012 			TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
2013 			return 0;
2014 		}
2015 
2016 		ftl_fill_wbuf_entry(entry, io);
2017 
2018 		ftl_trace_wbuf_fill(dev, io);
2019 		ftl_update_l2p(dev, entry, ftl_get_addr_from_entry(entry));
2020 		ftl_io_advance(io, 1);
2021 
2022 		/* The entry must be enqueued only after the L2P is updated to avoid a race */
2023 		/* with the write completion callback, which could otherwise be processed */
2024 		/* before ftl_update_l2p() has set the L2P entry. */
2025 		spdk_ring_enqueue(ioch->submit_queue, (void **)&entry, 1, NULL);
2026 	}
2027 
2028 	if (ftl_io_done(io)) {
2029 		if (ftl_dev_has_nv_cache(dev) && !(io->flags & FTL_IO_BYPASS_CACHE)) {
2030 			ftl_write_nv_cache(io);
2031 		} else {
2032 			TAILQ_INSERT_TAIL(&ioch->write_cmpl_queue, io, ioch_entry);
2033 		}
2034 	}
2035 
2036 	return 0;
2037 }
2038 
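/*
 * Defrag is needed once the number of free bands drops to the start limit's threshold,
 * unless relocation is halted or a defrag is already in progress.
 */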
2039 static bool
2040 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
2041 {
2042 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
2043 
2044 	if (ftl_reloc_is_halted(dev->reloc)) {
2045 		return false;
2046 	}
2047 
2048 	if (ftl_reloc_is_defrag_active(dev->reloc)) {
2049 		return false;
2050 	}
2051 
2052 	if (dev->num_free <= limit->thld) {
2053 		return true;
2054 	}
2055 
2056 	return false;
2057 }
2058 
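/*
 * Calculate a band's defrag merit: the invalid-to-valid block ratio (plus one to avoid
 * division by zero) weighted by the band's age. Bands without usable blocks get 0. If
 * threshold_valid is provided, usable - *threshold_valid is used as the valid block count
 * instead of the band's actual count.
 */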
2059 static double
2060 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
2061 {
2062 	size_t usable, valid, invalid;
2063 	double vld_ratio;
2064 
2065 	/* If the band doesn't have any usable blocks, it's of no use */
2066 	usable = ftl_band_num_usable_blocks(band);
2067 	if (usable == 0) {
2068 		return 0.0;
2069 	}
2070 
2071 	valid = threshold_valid ? (usable - *threshold_valid) : band->lba_map.num_vld;
2072 	invalid = usable - valid;
2073 
2074 	/* Add one to avoid division by 0 */
2075 	vld_ratio = (double)invalid / (double)(valid + 1);
2076 	return vld_ratio * ftl_band_age(band);
2077 }
2078 
2079 static bool
2080 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
2081 {
2082 	struct spdk_ftl_conf *conf = &dev->conf;
2083 	size_t thld_vld;
2084 
2085 	/* If we're in dire need of free bands, every band is worth defragging */
2086 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
2087 		return true;
2088 	}
2089 
2090 	thld_vld = (ftl_band_num_usable_blocks(band) * conf->invalid_thld) / 100;
2091 
2092 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
2093 }
2094 
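/*
 * Select the closed band with the highest merit as the defrag candidate. Returns NULL if
 * even the best candidate doesn't need defragging yet.
 */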
2095 static struct ftl_band *
2096 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
2097 {
2098 	struct ftl_band *band, *mband = NULL;
2099 	double merit = 0;
2100 
2101 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
2102 		assert(band->state == FTL_BAND_STATE_CLOSED);
2103 		band->merit = ftl_band_calc_merit(band, NULL);
2104 		if (band->merit > merit) {
2105 			merit = band->merit;
2106 			mband = band;
2107 		}
2108 	}
2109 
2110 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
2111 		mband = NULL;
2112 	}
2113 
2114 	return mband;
2115 }
2116 
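/*
 * If the device is running low on free bands, queue the best defrag candidate for
 * relocation, then let the relocator make progress. Returns the relocator's busy status.
 */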
2117 static bool
2118 ftl_process_relocs(struct spdk_ftl_dev *dev)
2119 {
2120 	struct ftl_band *band;
2121 
2122 	if (ftl_dev_needs_defrag(dev)) {
2123 		band = ftl_select_defrag_band(dev);
2124 		if (band) {
2125 			ftl_reloc_add(dev->reloc, band, 0, ftl_get_num_blocks_in_band(dev), 0, true);
2126 			ftl_trace_defrag_band(dev, band);
2127 		}
2128 	}
2129 
2130 	return ftl_reloc(dev->reloc);
2131 }
2132 
2133 int
2134 ftl_current_limit(const struct spdk_ftl_dev *dev)
2135 {
2136 	return dev->limit;
2137 }
2138 
2139 void
2140 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
2141 {
2142 	attrs->uuid = dev->uuid;
2143 	attrs->num_blocks = dev->num_lbas;
2144 	attrs->block_size = FTL_BLOCK_SIZE;
2145 	attrs->num_zones = ftl_get_num_zones(dev);
2146 	attrs->zone_size = ftl_get_num_blocks_in_zone(dev);
2147 	attrs->conf = dev->conf;
2148 	attrs->base_bdev = spdk_bdev_get_name(spdk_bdev_desc_get_bdev(dev->base_bdev_desc));
2149 
2150 	attrs->cache_bdev = NULL;
2151 	if (dev->nv_cache.bdev_desc) {
2152 		attrs->cache_bdev = spdk_bdev_get_name(
2153 					    spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc));
2154 	}
2155 }
2156 
2157 static void
2158 _ftl_io_write(void *ctx)
2159 {
2160 	ftl_io_write((struct ftl_io *)ctx);
2161 }
2162 
2163 static int
2164 ftl_submit_write_leaf(struct ftl_io *io)
2165 {
2166 	int rc;
2167 
2168 	rc = ftl_submit_write(ftl_wptr_from_band(io->band), io);
2169 	if (rc == -EAGAIN) {
2170 		/* EAGAIN means that the request was put on the pending queue */
2171 		return 0;
2172 	}
2173 
2174 	return rc;
2175 }
2176 
2177 void
2178 ftl_io_write(struct ftl_io *io)
2179 {
2180 	struct spdk_ftl_dev *dev = io->dev;
2181 	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(io->ioch);
2182 
2183 	/* Put the IO on the retry queue if the IO channel hasn't been initialized yet */
2184 	if (spdk_unlikely(ioch->index == FTL_IO_CHANNEL_INDEX_INVALID)) {
2185 		TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
2186 		return;
2187 	}
2188 
2189 	/* For normal IOs we just need to copy the data onto the write buffer */
2190 	if (!(io->flags & FTL_IO_MD)) {
2191 		ftl_io_call_foreach_child(io, ftl_wbuf_fill);
2192 	} else {
2193 		/* Metadata has its own buffer, so it doesn't have to be copied; just */
2194 		/* send it to the core thread and schedule the write immediately */
2195 		if (ftl_check_core_thread(dev)) {
2196 			ftl_io_call_foreach_child(io, ftl_submit_write_leaf);
2197 		} else {
2198 			spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
2199 		}
2200 	}
2201 }
2202 
2203 int
2204 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2205 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2206 {
2207 	struct ftl_io *io;
2208 
2209 	if (iov_cnt == 0) {
2210 		return -EINVAL;
2211 	}
2212 
2213 	if (lba_cnt == 0) {
2214 		return -EINVAL;
2215 	}
2216 
2217 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2218 		return -EINVAL;
2219 	}
2220 
2221 	if (!dev->initialized) {
2222 		return -EBUSY;
2223 	}
2224 
2225 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
2226 	if (!io) {
2227 		return -ENOMEM;
2228 	}
2229 
2230 	ftl_io_write(io);
2231 
2232 	return 0;
2233 }
2234 
2235 void
2236 ftl_io_read(struct ftl_io *io)
2237 {
2238 	ftl_io_call_foreach_child(io, ftl_submit_read);
2239 }
2240 
2241 int
2242 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2243 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2244 {
2245 	struct ftl_io *io;
2246 
2247 	if (iov_cnt == 0) {
2248 		return -EINVAL;
2249 	}
2250 
2251 	if (lba_cnt == 0) {
2252 		return -EINVAL;
2253 	}
2254 
2255 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2256 		return -EINVAL;
2257 	}
2258 
2259 	if (!dev->initialized) {
2260 		return -EBUSY;
2261 	}
2262 
2263 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
2264 	if (!io) {
2265 		return -ENOMEM;
2266 	}
2267 
2268 	ftl_io_read(io);
2269 	return 0;
2270 }
2271 
2272 static struct ftl_flush *
2273 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2274 {
2275 	struct ftl_flush *flush;
2276 
2277 	flush = calloc(1, sizeof(*flush));
2278 	if (!flush) {
2279 		return NULL;
2280 	}
2281 
2282 	flush->bmap = spdk_bit_array_create(FTL_BATCH_COUNT);
2283 	if (!flush->bmap) {
2284 		goto error;
2285 	}
2286 
2287 	flush->dev = dev;
2288 	flush->cb.fn = cb_fn;
2289 	flush->cb.ctx = cb_arg;
2290 
2291 	return flush;
2292 error:
2293 	free(flush);
2294 	return NULL;
2295 }
2296 
2297 static void
2298 _ftl_flush(void *ctx)
2299 {
2300 	struct ftl_flush *flush = ctx;
2301 	struct spdk_ftl_dev *dev = flush->dev;
2302 	uint32_t i;
2303 
2304 	/* Attach flush object to all non-empty batches */
2305 	for (i = 0; i < FTL_BATCH_COUNT; ++i) {
2306 		if (dev->batch_array[i].num_entries > 0) {
2307 			spdk_bit_array_set(flush->bmap, i);
2308 			flush->num_req++;
2309 		}
2310 	}
2311 
2312 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
2313 
2314 	/* If the write buffer was already empty, the flush can be completed right away */
2315 	if (!flush->num_req) {
2316 		ftl_complete_flush(flush);
2317 	}
2318 }
2319 
2320 int
2321 ftl_flush_wbuf(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2322 {
2323 	struct ftl_flush *flush;
2324 
2325 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
2326 	if (!flush) {
2327 		return -ENOMEM;
2328 	}
2329 
2330 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
2331 	return 0;
2332 }
2333 
2334 int
2335 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2336 {
2337 	if (!dev->initialized) {
2338 		return -EBUSY;
2339 	}
2340 
2341 	return ftl_flush_wbuf(dev, cb_fn, cb_arg);
2342 }
2343 
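/*
 * Check whether the given address has already been written, i.e. it lies below its zone's
 * write pointer.
 */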
2344 bool
2345 ftl_addr_is_written(struct ftl_band *band, struct ftl_addr addr)
2346 {
2347 	struct ftl_zone *zone = ftl_band_zone_from_addr(band, addr);
2348 
2349 	return addr.offset < zone->info.write_pointer;
2350 }
2351 
2352 static void ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event);
2353 
2354 static void
2355 _ftl_process_media_event(void *ctx)
2356 {
2357 	struct ftl_media_event *event = ctx;
2358 	struct spdk_ftl_dev *dev = event->dev;
2359 
2360 	ftl_process_media_event(dev, event->event);
2361 	spdk_mempool_put(dev->media_events_pool, event);
2362 }
2363 
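/*
 * Handle a media event reported by the base bdev. If called from outside the core thread,
 * the event is copied into a pool object and forwarded to the core thread. Otherwise the
 * affected blocks are queued for relocation.
 */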
2364 static void
2365 ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event)
2366 {
2367 	struct ftl_band *band;
2368 	struct ftl_addr addr = { .offset = event.offset };
2369 	size_t block_off;
2370 
2371 	if (!ftl_check_core_thread(dev)) {
2372 		struct ftl_media_event *media_event;
2373 
2374 		media_event = spdk_mempool_get(dev->media_events_pool);
2375 		if (!media_event) {
2376 			SPDK_ERRLOG("Media event lost due to lack of memory\n");
2377 			return;
2378 		}
2379 
2380 		media_event->dev = dev;
2381 		media_event->event = event;
2382 		spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_process_media_event,
2383 				     media_event);
2384 		return;
2385 	}
2386 
2387 	band = ftl_band_from_addr(dev, addr);
2388 	block_off = ftl_band_block_offset_from_addr(band, addr);
2389 
2390 	ftl_reloc_add(dev->reloc, band, block_off, event.num_blocks, 0, false);
2391 }
2392 
2393 void
2394 ftl_get_media_events(struct spdk_ftl_dev *dev)
2395 {
2396 #define FTL_MAX_MEDIA_EVENTS 128
2397 	struct spdk_bdev_media_event events[FTL_MAX_MEDIA_EVENTS];
2398 	size_t num_events, i;
2399 
2400 	if (!dev->initialized) {
2401 		return;
2402 	}
2403 
2404 	do {
2405 		num_events = spdk_bdev_get_media_events(dev->base_bdev_desc,
2406 							events, FTL_MAX_MEDIA_EVENTS);
2407 
2408 		for (i = 0; i < num_events; ++i) {
2409 			ftl_process_media_event(dev, events[i]);
2410 		}
2411 
2412 	} while (num_events);
2413 }
2414 
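/*
 * IO channel poller: completes IOs from the write completion queue and resubmits IOs from
 * the retry queue. Returns SPDK_POLLER_BUSY if there was anything to process.
 */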
2415 int
2416 ftl_io_channel_poll(void *arg)
2417 {
2418 	struct ftl_io_channel *ch = arg;
2419 	struct ftl_io *io;
2420 	TAILQ_HEAD(, ftl_io) retry_queue;
2421 
2422 	if (TAILQ_EMPTY(&ch->write_cmpl_queue) && TAILQ_EMPTY(&ch->retry_queue)) {
2423 		return SPDK_POLLER_IDLE;
2424 	}
2425 
2426 	while (!TAILQ_EMPTY(&ch->write_cmpl_queue)) {
2427 		io = TAILQ_FIRST(&ch->write_cmpl_queue);
2428 		TAILQ_REMOVE(&ch->write_cmpl_queue, io, ioch_entry);
2429 		ftl_io_complete(io);
2430 	}
2431 
2432 	/*
2433 	 * Create a local copy of the retry queue to prevent infinite retries in case an IO
2434 	 * gets reinserted into the retry queue during processing
2435 	 */
2436 	TAILQ_INIT(&retry_queue);
2437 	TAILQ_SWAP(&ch->retry_queue, &retry_queue, ftl_io, ioch_entry);
2438 
2439 	while (!TAILQ_EMPTY(&retry_queue)) {
2440 		io = TAILQ_FIRST(&retry_queue);
2441 		TAILQ_REMOVE(&retry_queue, io, ioch_entry);
2442 		if (io->type == FTL_IO_WRITE) {
2443 			ftl_io_write(io);
2444 		} else {
2445 			ftl_io_read(io);
2446 		}
2447 	}
2448 
2449 	return SPDK_POLLER_BUSY;
2450 }
2451 
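/*
 * Main poller of the core thread. During shutdown it unregisters itself once all outstanding
 * work has completed; otherwise it drives the write and relocation paths.
 */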
2452 int
2453 ftl_task_core(void *ctx)
2454 {
2455 	struct spdk_ftl_dev *dev = ctx;
2456 	bool busy;
2457 
2458 	if (dev->halt) {
2459 		if (ftl_shutdown_complete(dev)) {
2460 			spdk_poller_unregister(&dev->core_poller);
2461 			return SPDK_POLLER_IDLE;
2462 		}
2463 	}
2464 
2465 	busy = ftl_process_writes(dev) || ftl_process_relocs(dev);
2466 
2467 	return busy ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
2468 }
2469 
2470 SPDK_LOG_REGISTER_COMPONENT(ftl_core)
2471