xref: /spdk/lib/ftl/ftl_core.c (revision 94a84ae98590bea46939eb1dcd7a9876bd393b54)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/thread.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk/string.h"
40 #include "spdk_internal/log.h"
41 #include "spdk/ftl.h"
42 #include "spdk/crc32.h"
43 
44 #include "ftl_core.h"
45 #include "ftl_band.h"
46 #include "ftl_io.h"
47 #include "ftl_debug.h"
48 #include "ftl_reloc.h"
49 
50 struct ftl_band_flush {
51 	struct spdk_ftl_dev		*dev;
52 	/* Number of bands left to be flushed */
53 	size_t				num_bands;
54 	/* User callback */
55 	spdk_ftl_fn			cb_fn;
56 	/* Callback's argument */
57 	void				*cb_arg;
58 	/* List link */
59 	LIST_ENTRY(ftl_band_flush)	list_entry;
60 };
61 
62 struct ftl_wptr {
63 	/* Owner device */
64 	struct spdk_ftl_dev		*dev;
65 
66 	/* Current address */
67 	struct ftl_addr			addr;
68 
69 	/* Band currently being written to */
70 	struct ftl_band			*band;
71 
72 	/* Current logical block's offset */
73 	uint64_t			offset;
74 
75 	/* Current zone */
76 	struct ftl_zone			*zone;
77 
78 	/* Pending IO queue */
79 	TAILQ_HEAD(, ftl_io)		pending_queue;
80 
81 	/* List link */
82 	LIST_ENTRY(ftl_wptr)		list_entry;
83 
84 	/*
85 	 * If setup in direct mode, there will be no offset or band state update after IO.
86 	 * The zoned bdev address is not assigned by wptr, and is instead taken directly
87 	 * from the request.
88 	 */
89 	bool				direct_mode;
90 
91 	/* Number of outstanding write requests */
92 	uint32_t			num_outstanding;
93 
94 	/* Marks that the band related to this wptr needs to be closed as soon as possible */
95 	bool				flush;
96 };
97 
98 struct ftl_flush {
99 	/* Owner device */
100 	struct spdk_ftl_dev		*dev;
101 
102 	/* Number of batches to wait for */
103 	size_t				num_req;
104 
105 	/* Callback */
106 	struct {
107 		spdk_ftl_fn		fn;
108 		void			*ctx;
109 	} cb;
110 
111 	/* Batch bitmap */
112 	struct spdk_bit_array		*bmap;
113 
114 	/* List link */
115 	LIST_ENTRY(ftl_flush)		list_entry;
116 };
117 
118 static void
119 ftl_wptr_free(struct ftl_wptr *wptr)
120 {
121 	if (!wptr) {
122 		return;
123 	}
124 
125 	free(wptr);
126 }
127 
128 static void
129 ftl_remove_wptr(struct ftl_wptr *wptr)
130 {
131 	struct spdk_ftl_dev *dev = wptr->dev;
132 	struct ftl_band_flush *flush, *tmp;
133 
134 	if (spdk_unlikely(wptr->flush)) {
135 		LIST_FOREACH_SAFE(flush, &dev->band_flush_list, list_entry, tmp) {
136 			assert(flush->num_bands > 0);
137 			if (--flush->num_bands == 0) {
138 				flush->cb_fn(flush->cb_arg, 0);
139 				LIST_REMOVE(flush, list_entry);
140 				free(flush);
141 			}
142 		}
143 	}
144 
145 	LIST_REMOVE(wptr, list_entry);
146 	ftl_wptr_free(wptr);
147 }
148 
149 static void ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry);
150 
151 static struct ftl_wbuf_entry *
152 ftl_acquire_wbuf_entry(struct ftl_io_channel *io_channel, int io_flags)
153 {
154 	struct ftl_wbuf_entry *entry;
155 	uint32_t qdepth;
156 
157 	if (!(io_flags & FTL_IO_INTERNAL)) {
158 		qdepth = __atomic_fetch_add(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
159 		if (qdepth >= io_channel->qdepth_limit) {
160 			__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
161 			return NULL;
162 		}
163 	}
164 
165 	if (spdk_ring_dequeue(io_channel->free_queue, (void **)&entry, 1) != 1) {
166 		if (!(io_flags & FTL_IO_INTERNAL)) {
167 			__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
168 		}
169 
170 		return NULL;
171 	}
172 
173 	ftl_evict_cache_entry(io_channel->dev, entry);
174 
175 	entry->io_flags = io_flags;
176 	entry->addr.offset = FTL_ADDR_INVALID;
177 	entry->lba = FTL_LBA_INVALID;
178 	entry->band = NULL;
179 	entry->valid = false;
180 
181 	return entry;
182 }
183 
184 static void
185 ftl_release_wbuf_entry(struct ftl_wbuf_entry *entry)
186 {
187 	struct ftl_io_channel *io_channel = entry->ioch;
188 
189 	if (!(entry->io_flags & FTL_IO_INTERNAL)) {
190 		__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
191 	}
192 
193 	spdk_ring_enqueue(io_channel->free_queue, (void **)&entry, 1, NULL);
194 }
195 
196 static struct ftl_batch *
197 ftl_get_next_batch(struct spdk_ftl_dev *dev)
198 {
199 	struct ftl_batch *batch = dev->current_batch;
200 	struct ftl_io_channel *ioch;
201 #define FTL_DEQUEUE_ENTRIES 128
202 	struct ftl_wbuf_entry *entries[FTL_DEQUEUE_ENTRIES];
203 	TAILQ_HEAD(, ftl_io_channel) ioch_queue;
204 	size_t i, num_dequeued, num_remaining;
205 	uint64_t *metadata;
206 
207 	if (batch == NULL) {
208 		batch = TAILQ_FIRST(&dev->pending_batches);
209 		if (batch != NULL) {
210 			TAILQ_REMOVE(&dev->pending_batches, batch, tailq);
211 			return batch;
212 		}
213 
214 		batch = TAILQ_FIRST(&dev->free_batches);
215 		if (spdk_unlikely(batch == NULL)) {
216 			return NULL;
217 		}
218 
219 		assert(TAILQ_EMPTY(&batch->entries));
220 		assert(batch->num_entries == 0);
221 		TAILQ_REMOVE(&dev->free_batches, batch, tailq);
222 	}
223 
224 	/*
225 	 * Keep shifting the queue to ensure fairness in IO channel selection.  Each time
226 	 * ftl_get_next_batch() is called, we're starting to dequeue write buffer entries from a
227 	 * different IO channel.
228 	 */
229 	TAILQ_INIT(&ioch_queue);
230 	while (!TAILQ_EMPTY(&dev->ioch_queue)) {
231 		ioch = TAILQ_FIRST(&dev->ioch_queue);
232 		TAILQ_REMOVE(&dev->ioch_queue, ioch, tailq);
233 		TAILQ_INSERT_TAIL(&ioch_queue, ioch, tailq);
234 
235 		num_remaining = dev->xfer_size - batch->num_entries;
236 		while (num_remaining > 0) {
237 			num_dequeued = spdk_ring_dequeue(ioch->submit_queue, (void **)entries,
238 							 spdk_min(num_remaining,
239 									 FTL_DEQUEUE_ENTRIES));
240 			if (num_dequeued == 0) {
241 				break;
242 			}
243 
244 			for (i = 0; i < num_dequeued; ++i) {
245 				batch->iov[batch->num_entries + i].iov_base = entries[i]->payload;
246 				batch->iov[batch->num_entries + i].iov_len = FTL_BLOCK_SIZE;
247 
248 				if (batch->metadata != NULL) {
249 					metadata = (uint64_t *)((char *)batch->metadata +
250 								i * dev->md_size);
251 					*metadata = entries[i]->lba;
252 				}
253 
254 				TAILQ_INSERT_TAIL(&batch->entries, entries[i], tailq);
255 			}
256 
257 			batch->num_entries += num_dequeued;
258 			num_remaining -= num_dequeued;
259 		}
260 
261 		if (num_remaining == 0) {
262 			break;
263 		}
264 	}
265 
266 	TAILQ_CONCAT(&dev->ioch_queue, &ioch_queue, tailq);
267 
268 	if (batch->num_entries == dev->xfer_size) {
269 		dev->current_batch = NULL;
270 	} else {
271 		dev->current_batch = batch;
272 		batch = NULL;
273 	}
274 
275 	return batch;
276 }
277 
278 static void
279 ftl_release_batch(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
280 {
281 	struct ftl_wbuf_entry *entry;
282 
283 	while (!TAILQ_EMPTY(&batch->entries)) {
284 		entry = TAILQ_FIRST(&batch->entries);
285 		TAILQ_REMOVE(&batch->entries, entry, tailq);
286 		ftl_release_wbuf_entry(entry);
287 	}
288 
289 	batch->num_entries = 0;
290 	TAILQ_INSERT_TAIL(&dev->free_batches, batch, tailq);
291 }
292 
293 static struct ftl_wbuf_entry *
294 ftl_get_entry_from_addr(struct spdk_ftl_dev *dev, struct ftl_addr addr)
295 {
296 	struct ftl_io_channel *ioch;
297 	uint64_t ioch_offset, entry_offset;
298 
299 	ioch_offset = addr.cache_offset & ((1 << dev->ioch_shift) - 1);
300 	entry_offset = addr.cache_offset >> dev->ioch_shift;
301 	ioch = dev->ioch_array[ioch_offset];
302 
303 	assert(ioch_offset < dev->conf.max_io_channels);
304 	assert(entry_offset < ioch->num_entries);
305 	assert(addr.cached == 1);
306 
307 	return &ioch->wbuf_entries[entry_offset];
308 }
309 
310 static struct ftl_addr
311 ftl_get_addr_from_entry(struct ftl_wbuf_entry *entry)
312 {
313 	struct ftl_io_channel *ioch = entry->ioch;
314 	struct ftl_addr addr = {};
315 
316 	addr.cached = 1;
317 	addr.cache_offset = (uint64_t)entry->index << ioch->dev->ioch_shift | ioch->index;
318 
319 	return addr;
320 }
321 
322 static void
323 ftl_io_cmpl_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
324 {
325 	struct ftl_io *io = cb_arg;
326 	struct spdk_ftl_dev *dev = io->dev;
327 
328 	if (spdk_unlikely(!success)) {
329 		io->status = -EIO;
330 	}
331 
332 	ftl_trace_completion(dev, io, FTL_TRACE_COMPLETION_DISK);
333 
334 	if (io->type == FTL_IO_WRITE && ftl_is_append_supported(dev)) {
335 		assert(io->parent);
336 		io->parent->addr.offset = spdk_bdev_io_get_append_location(bdev_io);
337 	}
338 
339 	ftl_io_dec_req(io);
340 	if (ftl_io_done(io)) {
341 		ftl_io_complete(io);
342 	}
343 
344 	spdk_bdev_free_io(bdev_io);
345 }
346 
347 static void
348 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
349 {
350 	struct ftl_wptr *wptr = NULL;
351 
352 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
353 		if (wptr->band == band) {
354 			break;
355 		}
356 	}
357 
358 	/* If the band already has the high_prio flag set, other writes must */
359 	/* have failed earlier, so it's already taken care of. */
360 	if (band->high_prio) {
361 		assert(wptr == NULL);
362 		return;
363 	}
364 
365 	ftl_band_write_failed(band);
366 	ftl_remove_wptr(wptr);
367 }
368 
369 static struct ftl_wptr *
370 ftl_wptr_from_band(struct ftl_band *band)
371 {
372 	struct spdk_ftl_dev *dev = band->dev;
373 	struct ftl_wptr *wptr = NULL;
374 
375 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
376 		if (wptr->band == band) {
377 			return wptr;
378 		}
379 	}
380 
381 	return NULL;
382 }
383 
384 static void
385 ftl_md_write_fail(struct ftl_io *io, int status)
386 {
387 	struct ftl_band *band = io->band;
388 	struct ftl_wptr *wptr;
389 	char buf[128];
390 
391 	wptr = ftl_wptr_from_band(band);
392 	assert(wptr);
393 
394 	SPDK_ERRLOG("Metadata write failed @addr: %s, status: %d\n",
395 		    ftl_addr2str(wptr->addr, buf, sizeof(buf)), status);
396 
397 	ftl_halt_writes(io->dev, band);
398 }
399 
400 static void
401 ftl_md_write_cb(struct ftl_io *io, void *arg, int status)
402 {
403 	struct spdk_ftl_dev *dev = io->dev;
404 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
405 	struct ftl_band *band = io->band;
406 	struct ftl_wptr *wptr;
407 	size_t id;
408 
409 	wptr = ftl_wptr_from_band(band);
410 	assert(wptr);
411 
412 	if (status) {
413 		ftl_md_write_fail(io, status);
414 		return;
415 	}
416 
417 	ftl_band_set_next_state(band);
418 	if (band->state == FTL_BAND_STATE_CLOSED) {
419 		if (ftl_dev_has_nv_cache(dev)) {
420 			pthread_spin_lock(&nv_cache->lock);
421 			nv_cache->num_available += ftl_band_user_blocks(band);
422 
423 			if (spdk_unlikely(nv_cache->num_available > nv_cache->num_data_blocks)) {
424 				nv_cache->num_available = nv_cache->num_data_blocks;
425 			}
426 			pthread_spin_unlock(&nv_cache->lock);
427 		}
428 
429 		/*
430 		 * Go through the reloc_bitmap, checking for all the bands that had its data moved
431 		 * onto current band and update their counters to allow them to be used for writing
432 		 * (once they're closed and empty).
433 		 */
434 		for (id = 0; id < ftl_get_num_bands(dev); ++id) {
435 			if (spdk_bit_array_get(band->reloc_bitmap, id)) {
436 				assert(dev->bands[id].num_reloc_bands > 0);
437 				dev->bands[id].num_reloc_bands--;
438 
439 				spdk_bit_array_clear(band->reloc_bitmap, id);
440 			}
441 		}
442 
443 		ftl_remove_wptr(wptr);
444 	}
445 }
446 
447 static int
448 ftl_read_next_physical_addr(struct ftl_io *io, struct ftl_addr *addr)
449 {
450 	struct spdk_ftl_dev *dev = io->dev;
451 	size_t num_blocks, max_blocks;
452 
453 	assert(ftl_io_mode_physical(io));
454 	assert(io->iov_pos < io->iov_cnt);
455 
456 	if (io->pos == 0) {
457 		*addr = io->addr;
458 	} else {
459 		*addr = ftl_band_next_xfer_addr(io->band, io->addr, io->pos);
460 	}
461 
462 	assert(!ftl_addr_invalid(*addr));
463 
464 	/* Metadata has to be read in the way it's written (jumping across */
465 	/* the zones in xfer_size increments) */
466 	if (io->flags & FTL_IO_MD) {
467 		max_blocks = dev->xfer_size - (addr->offset % dev->xfer_size);
468 		num_blocks = spdk_min(ftl_io_iovec_len_left(io), max_blocks);
469 		assert(addr->offset / dev->xfer_size ==
470 		       (addr->offset + num_blocks - 1) / dev->xfer_size);
471 	} else {
472 		num_blocks = ftl_io_iovec_len_left(io);
473 	}
474 
475 	return num_blocks;
476 }
477 
478 static int
479 ftl_wptr_close_band(struct ftl_wptr *wptr)
480 {
481 	struct ftl_band *band = wptr->band;
482 
483 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
484 
485 	return ftl_band_write_tail_md(band, ftl_md_write_cb);
486 }
487 
488 static int
489 ftl_wptr_open_band(struct ftl_wptr *wptr)
490 {
491 	struct ftl_band *band = wptr->band;
492 
493 	assert(ftl_band_zone_is_first(band, wptr->zone));
494 	assert(band->lba_map.num_vld == 0);
495 
496 	ftl_band_clear_lba_map(band);
497 
498 	assert(band->state == FTL_BAND_STATE_PREP);
499 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
500 
501 	return ftl_band_write_head_md(band, ftl_md_write_cb);
502 }
503 
504 static int
505 ftl_submit_erase(struct ftl_io *io)
506 {
507 	struct spdk_ftl_dev *dev = io->dev;
508 	struct ftl_band *band = io->band;
509 	struct ftl_addr addr = io->addr;
510 	struct ftl_io_channel *ioch;
511 	struct ftl_zone *zone;
512 	int rc = 0;
513 	size_t i;
514 
515 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
516 
517 	for (i = 0; i < io->num_blocks; ++i) {
518 		if (i != 0) {
519 			zone = ftl_band_next_zone(band, ftl_band_zone_from_addr(band, addr));
520 			assert(zone->info.state == SPDK_BDEV_ZONE_STATE_FULL);
521 			addr.offset = zone->info.zone_id;
522 		}
523 
524 		assert(ftl_addr_get_zone_offset(dev, addr) == 0);
525 
526 		ftl_trace_submission(dev, io, addr, 1);
527 		rc = spdk_bdev_zone_management(dev->base_bdev_desc, ioch->base_ioch, addr.offset,
528 					       SPDK_BDEV_ZONE_RESET, ftl_io_cmpl_cb, io);
529 		if (spdk_unlikely(rc)) {
530 			ftl_io_fail(io, rc);
531 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
532 			break;
533 		}
534 
535 		ftl_io_inc_req(io);
536 		ftl_io_advance(io, 1);
537 	}
538 
539 	if (ftl_io_done(io)) {
540 		ftl_io_complete(io);
541 	}
542 
543 	return rc;
544 }
545 
546 static bool
547 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
548 {
549 	return dev->core_thread == spdk_get_thread();
550 }
551 
552 struct spdk_io_channel *
553 ftl_get_io_channel(const struct spdk_ftl_dev *dev)
554 {
555 	if (ftl_check_core_thread(dev)) {
556 		return dev->ioch;
557 	}
558 
559 	return NULL;
560 }
561 
562 static void
563 ftl_erase_fail(struct ftl_io *io, int status)
564 {
565 	struct ftl_zone *zone;
566 	struct ftl_band *band = io->band;
567 	char buf[128];
568 
569 	SPDK_ERRLOG("Erase failed at address: %s, status: %d\n",
570 		    ftl_addr2str(io->addr, buf, sizeof(buf)), status);
571 
572 	zone = ftl_band_zone_from_addr(band, io->addr);
573 	zone->info.state = SPDK_BDEV_ZONE_STATE_OFFLINE;
574 	ftl_band_remove_zone(band, zone);
575 	band->tail_md_addr = ftl_band_tail_md_addr(band);
576 }
577 
578 static void
579 ftl_zone_erase_cb(struct ftl_io *io, void *ctx, int status)
580 {
581 	struct ftl_zone *zone;
582 
583 	zone = ftl_band_zone_from_addr(io->band, io->addr);
584 	zone->busy = false;
585 
586 	if (spdk_unlikely(status)) {
587 		ftl_erase_fail(io, status);
588 		return;
589 	}
590 
591 	zone->info.state = SPDK_BDEV_ZONE_STATE_EMPTY;
592 	zone->info.write_pointer = zone->info.zone_id;
593 }
594 
595 static int
596 ftl_band_erase(struct ftl_band *band)
597 {
598 	struct ftl_zone *zone;
599 	struct ftl_io *io;
600 	int rc = 0;
601 
602 	assert(band->state == FTL_BAND_STATE_CLOSED ||
603 	       band->state == FTL_BAND_STATE_FREE);
604 
605 	ftl_band_set_state(band, FTL_BAND_STATE_PREP);
606 
607 	CIRCLEQ_FOREACH(zone, &band->zones, circleq) {
608 		if (zone->info.state == SPDK_BDEV_ZONE_STATE_EMPTY) {
609 			continue;
610 		}
611 
612 		io = ftl_io_erase_init(band, 1, ftl_zone_erase_cb);
613 		if (!io) {
614 			rc = -ENOMEM;
615 			break;
616 		}
617 
618 		zone->busy = true;
619 		io->addr.offset = zone->info.zone_id;
620 		rc = ftl_submit_erase(io);
621 		if (rc) {
622 			zone->busy = false;
623 			assert(0);
624 			/* TODO: change band's state back to close? */
625 			break;
626 		}
627 	}
628 
629 	return rc;
630 }
631 
632 static struct ftl_band *
633 ftl_next_write_band(struct spdk_ftl_dev *dev)
634 {
635 	struct ftl_band *band;
636 
637 	/* Find a free band that has all of its data moved onto other closed bands */
638 	LIST_FOREACH(band, &dev->free_bands, list_entry) {
639 		assert(band->state == FTL_BAND_STATE_FREE);
640 		if (band->num_reloc_bands == 0 && band->num_reloc_blocks == 0) {
641 			break;
642 		}
643 	}
644 
645 	if (spdk_unlikely(!band)) {
646 		return NULL;
647 	}
648 
649 	if (ftl_band_erase(band)) {
650 		/* TODO: handle erase failure */
651 		return NULL;
652 	}
653 
654 	return band;
655 }
656 
657 static struct ftl_band *
658 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
659 {
660 	struct ftl_band *band;
661 
662 	if (!dev->next_band) {
663 		band = ftl_next_write_band(dev);
664 	} else {
665 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
666 		band = dev->next_band;
667 		dev->next_band = NULL;
668 	}
669 
670 	return band;
671 }
672 
673 static struct ftl_wptr *
674 ftl_wptr_init(struct ftl_band *band)
675 {
676 	struct spdk_ftl_dev *dev = band->dev;
677 	struct ftl_wptr *wptr;
678 
679 	wptr = calloc(1, sizeof(*wptr));
680 	if (!wptr) {
681 		return NULL;
682 	}
683 
684 	wptr->dev = dev;
685 	wptr->band = band;
686 	wptr->zone = CIRCLEQ_FIRST(&band->zones);
687 	wptr->addr.offset = wptr->zone->info.zone_id;
688 	TAILQ_INIT(&wptr->pending_queue);
689 
690 	return wptr;
691 }
692 
693 static int
694 ftl_add_direct_wptr(struct ftl_band *band)
695 {
696 	struct spdk_ftl_dev *dev = band->dev;
697 	struct ftl_wptr *wptr;
698 
699 	assert(band->state == FTL_BAND_STATE_OPEN);
700 
701 	wptr = ftl_wptr_init(band);
702 	if (!wptr) {
703 		return -1;
704 	}
705 
706 	wptr->direct_mode = true;
707 
708 	if (ftl_band_alloc_lba_map(band)) {
709 		ftl_wptr_free(wptr);
710 		return -1;
711 	}
712 
713 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
714 
715 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: direct band %u\n", band->id);
716 	ftl_trace_write_band(dev, band);
717 	return 0;
718 }
719 
720 static void
721 ftl_close_direct_wptr(struct ftl_band *band)
722 {
723 	struct ftl_wptr *wptr = ftl_wptr_from_band(band);
724 
725 	assert(wptr);
726 	assert(wptr->direct_mode);
727 	assert(band->state == FTL_BAND_STATE_CLOSED);
728 
729 	ftl_band_release_lba_map(band);
730 
731 	ftl_remove_wptr(wptr);
732 }
733 
734 int
735 ftl_band_set_direct_access(struct ftl_band *band, bool access)
736 {
737 	if (access) {
738 		return ftl_add_direct_wptr(band);
739 	} else {
740 		ftl_close_direct_wptr(band);
741 		return 0;
742 	}
743 }
744 
745 static int
746 ftl_add_wptr(struct spdk_ftl_dev *dev)
747 {
748 	struct ftl_band *band;
749 	struct ftl_wptr *wptr;
750 
751 	band = ftl_next_wptr_band(dev);
752 	if (!band) {
753 		return -1;
754 	}
755 
756 	wptr = ftl_wptr_init(band);
757 	if (!wptr) {
758 		return -1;
759 	}
760 
761 	if (ftl_band_write_prep(band)) {
762 		ftl_wptr_free(wptr);
763 		return -1;
764 	}
765 
766 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
767 
768 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: band %u\n", band->id);
769 	ftl_trace_write_band(dev, band);
770 	return 0;
771 }
772 
773 static void
774 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
775 {
776 	struct ftl_band *band = wptr->band;
777 	struct spdk_ftl_dev *dev = wptr->dev;
778 	struct spdk_ftl_conf *conf = &dev->conf;
779 	size_t next_thld;
780 
781 	if (spdk_unlikely(wptr->direct_mode)) {
782 		return;
783 	}
784 
785 	wptr->offset += xfer_size;
786 	next_thld = (ftl_band_num_usable_blocks(band) * conf->band_thld) / 100;
787 
788 	if (ftl_band_full(band, wptr->offset)) {
789 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
790 	}
791 
792 	wptr->zone->busy = true;
793 	wptr->addr = ftl_band_next_xfer_addr(band, wptr->addr, xfer_size);
794 	wptr->zone = ftl_band_next_operational_zone(band, wptr->zone);
795 
796 	assert(!ftl_addr_invalid(wptr->addr));
797 
798 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: pu:%lu band:%lu, offset:%lu\n",
799 		      ftl_addr_get_punit(dev, wptr->addr),
800 		      ftl_addr_get_band(dev, wptr->addr),
801 		      wptr->addr.offset);
802 
803 	if (wptr->offset >= next_thld && !dev->next_band) {
804 		dev->next_band = ftl_next_write_band(dev);
805 	}
806 }
807 
808 static size_t
809 ftl_wptr_user_blocks_left(const struct ftl_wptr *wptr)
810 {
811 	return ftl_band_user_blocks_left(wptr->band, wptr->offset);
812 }
813 
814 static bool
815 ftl_wptr_ready(struct ftl_wptr *wptr)
816 {
817 	struct ftl_band *band = wptr->band;
818 
819 	/* TODO: add handling of empty bands */
820 
821 	if (spdk_unlikely(!ftl_zone_is_writable(wptr->zone))) {
822 		/* Erasing band may fail after it was assigned to wptr. */
823 		if (spdk_unlikely(wptr->zone->info.state == SPDK_BDEV_ZONE_STATE_OFFLINE)) {
824 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
825 		}
826 		return false;
827 	}
828 
829 	/* If we're in the process of writing metadata, wait till it is */
830 	/* completed. */
831 	/* TODO: we should probably change bands once we're writing tail md */
832 	if (ftl_band_state_changing(band)) {
833 		return false;
834 	}
835 
836 	if (band->state == FTL_BAND_STATE_FULL) {
837 		if (wptr->num_outstanding == 0) {
838 			if (ftl_wptr_close_band(wptr)) {
839 				/* TODO: need recovery here */
840 				assert(false);
841 			}
842 		}
843 
844 		return false;
845 	}
846 
847 	if (band->state != FTL_BAND_STATE_OPEN) {
848 		if (ftl_wptr_open_band(wptr)) {
849 			/* TODO: need recovery here */
850 			assert(false);
851 		}
852 
853 		return false;
854 	}
855 
856 	return true;
857 }
858 
859 int
860 ftl_flush_active_bands(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
861 {
862 	struct ftl_wptr *wptr;
863 	struct ftl_band_flush *flush;
864 
865 	assert(ftl_get_core_thread(dev) == spdk_get_thread());
866 
867 	flush = calloc(1, sizeof(*flush));
868 	if (spdk_unlikely(!flush)) {
869 		return -ENOMEM;
870 	}
871 
872 	LIST_INSERT_HEAD(&dev->band_flush_list, flush, list_entry);
873 
874 	flush->cb_fn = cb_fn;
875 	flush->cb_arg = cb_arg;
876 	flush->dev = dev;
877 
878 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
879 		wptr->flush = true;
880 		flush->num_bands++;
881 	}
882 
883 	return 0;
884 }
885 
886 static const struct spdk_ftl_limit *
887 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
888 {
889 	assert(type < SPDK_FTL_LIMIT_MAX);
890 	return &dev->conf.limits[type];
891 }
892 
893 static bool
894 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry)
895 {
896 	struct ftl_addr addr;
897 
898 	/* If the LBA is invalid don't bother checking the md and l2p */
899 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
900 		return false;
901 	}
902 
903 	addr = ftl_l2p_get(dev, entry->lba);
904 	if (!(ftl_addr_cached(addr) && entry == ftl_get_entry_from_addr(dev, addr))) {
905 		return false;
906 	}
907 
908 	return true;
909 }
910 
911 static void
912 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry)
913 {
914 	pthread_spin_lock(&entry->lock);
915 
916 	if (!entry->valid) {
917 		goto unlock;
918 	}
919 
920 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
921 	/* on-disk address and clear the cache status bit. Otherwise, skip the l2p update */
922 	/* and just clear the cache status. */
923 	if (!ftl_cache_lba_valid(dev, entry)) {
924 		goto clear;
925 	}
926 
927 	ftl_l2p_set(dev, entry->lba, entry->addr);
928 clear:
929 	entry->valid = false;
930 unlock:
931 	pthread_spin_unlock(&entry->lock);
932 }
933 
934 static void
935 ftl_pad_wbuf(struct spdk_ftl_dev *dev, size_t size)
936 {
937 	struct ftl_wbuf_entry *entry;
938 	struct ftl_io_channel *ioch;
939 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
940 
941 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
942 
943 	for (size_t i = 0; i < size; ++i) {
944 		entry = ftl_acquire_wbuf_entry(ioch, flags);
945 		if (!entry) {
946 			break;
947 		}
948 
949 		entry->lba = FTL_LBA_INVALID;
950 		entry->addr = ftl_to_addr(FTL_ADDR_INVALID);
951 		memset(entry->payload, 0, FTL_BLOCK_SIZE);
952 
953 		spdk_ring_enqueue(ioch->submit_queue, (void **)&entry, 1, NULL);
954 	}
955 }
956 
957 static void
958 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
959 {
960 	while (!LIST_EMPTY(&dev->free_bands)) {
961 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
962 	}
963 
964 	dev->next_band = NULL;
965 }
966 
967 static void
968 ftl_wptr_pad_band(struct ftl_wptr *wptr)
969 {
970 	struct spdk_ftl_dev *dev = wptr->dev;
971 	struct ftl_batch *batch = dev->current_batch;
972 	struct ftl_io_channel *ioch;
973 	size_t size, pad_size, blocks_left;
974 
975 	size = batch != NULL ? batch->num_entries : 0;
976 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
977 		size += spdk_ring_count(ioch->submit_queue);
978 	}
979 
980 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
981 
982 	blocks_left = ftl_wptr_user_blocks_left(wptr);
983 	assert(size <= blocks_left);
984 	assert(blocks_left % dev->xfer_size == 0);
985 	pad_size = spdk_min(blocks_left - size, spdk_ring_count(ioch->free_queue));
986 
987 	ftl_pad_wbuf(dev, pad_size);
988 }
989 
990 static void
991 ftl_wptr_process_shutdown(struct ftl_wptr *wptr)
992 {
993 	struct spdk_ftl_dev *dev = wptr->dev;
994 	struct ftl_batch *batch = dev->current_batch;
995 	struct ftl_io_channel *ioch;
996 	size_t size;
997 
998 	size = batch != NULL ? batch->num_entries : 0;
999 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1000 		size += spdk_ring_count(ioch->submit_queue);
1001 	}
1002 
1003 	if (size >= dev->xfer_size) {
1004 		return;
1005 	}
1006 
1007 	/* If we reach this point we need to remove free bands */
1008 	/* and pad current wptr band to the end */
1009 	ftl_remove_free_bands(dev);
1010 	ftl_wptr_pad_band(wptr);
1011 }
1012 
1013 static int
1014 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
1015 {
1016 	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(dev->ioch);
1017 
1018 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
1019 	       dev->num_io_channels == 1 && LIST_EMPTY(&dev->wptr_list) &&
1020 	       TAILQ_EMPTY(&ioch->retry_queue);
1021 }
1022 
1023 void
1024 ftl_apply_limits(struct spdk_ftl_dev *dev)
1025 {
1026 	const struct spdk_ftl_limit *limit;
1027 	struct ftl_io_channel *ioch;
1028 	struct ftl_stats *stats = &dev->stats;
1029 	uint32_t qdepth_limit = 100;
1030 	int i;
1031 
1032 	/* Clear existing limit */
1033 	dev->limit = SPDK_FTL_LIMIT_MAX;
1034 
1035 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
1036 		limit = ftl_get_limit(dev, i);
1037 
1038 		if (dev->num_free <= limit->thld) {
1039 			qdepth_limit = limit->limit;
1040 			stats->limits[i]++;
1041 			dev->limit = i;
1042 			break;
1043 		}
1044 	}
1045 
1046 	ftl_trace_limits(dev, dev->limit, dev->num_free);
1047 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1048 		__atomic_store_n(&ioch->qdepth_limit, (qdepth_limit * ioch->num_entries) / 100,
1049 				 __ATOMIC_SEQ_CST);
1050 	}
1051 }
1052 
1053 static int
1054 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_addr addr)
1055 {
1056 	struct ftl_band *band = ftl_band_from_addr(dev, addr);
1057 	struct ftl_lba_map *lba_map = &band->lba_map;
1058 	uint64_t offset;
1059 
1060 	offset = ftl_band_block_offset_from_addr(band, addr);
1061 
1062 	/* The bit might be already cleared if two writes are scheduled to the */
1063 	/* same LBA at the same time */
1064 	if (spdk_bit_array_get(lba_map->vld, offset)) {
1065 		assert(lba_map->num_vld > 0);
1066 		spdk_bit_array_clear(lba_map->vld, offset);
1067 		lba_map->num_vld--;
1068 		return 1;
1069 	}
1070 
1071 	return 0;
1072 }
1073 
1074 int
1075 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_addr addr)
1076 {
1077 	struct ftl_band *band;
1078 	int rc;
1079 
1080 	assert(!ftl_addr_cached(addr));
1081 	band = ftl_band_from_addr(dev, addr);
1082 
1083 	pthread_spin_lock(&band->lba_map.lock);
1084 	rc = ftl_invalidate_addr_unlocked(dev, addr);
1085 	pthread_spin_unlock(&band->lba_map.lock);
1086 
1087 	return rc;
1088 }
1089 
1090 static int
1091 ftl_read_retry(int rc)
1092 {
1093 	return rc == -EAGAIN;
1094 }
1095 
1096 static int
1097 ftl_read_canceled(int rc)
1098 {
1099 	return rc == -EFAULT || rc == 0;
1100 }
1101 
1102 static int
1103 ftl_cache_read(struct ftl_io *io, uint64_t lba,
1104 	       struct ftl_addr addr, void *buf)
1105 {
1106 	struct ftl_wbuf_entry *entry;
1107 	struct ftl_addr naddr;
1108 	int rc = 0;
1109 
1110 	entry = ftl_get_entry_from_addr(io->dev, addr);
1111 	pthread_spin_lock(&entry->lock);
1112 
1113 	naddr = ftl_l2p_get(io->dev, lba);
1114 	if (addr.offset != naddr.offset) {
1115 		rc = -1;
1116 		goto out;
1117 	}
1118 
1119 	memcpy(buf, entry->payload, FTL_BLOCK_SIZE);
1120 out:
1121 	pthread_spin_unlock(&entry->lock);
1122 	return rc;
1123 }
1124 
1125 static int
1126 ftl_read_next_logical_addr(struct ftl_io *io, struct ftl_addr *addr)
1127 {
1128 	struct spdk_ftl_dev *dev = io->dev;
1129 	struct ftl_addr next_addr;
1130 	size_t i;
1131 
1132 	*addr = ftl_l2p_get(dev, ftl_io_current_lba(io));
1133 
1134 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Read addr:%lx, lba:%lu\n",
1135 		      addr->offset, ftl_io_current_lba(io));
1136 
1137 	/* If the address is invalid, skip it (the buffer should already be zero'ed) */
1138 	if (ftl_addr_invalid(*addr)) {
1139 		return -EFAULT;
1140 	}
1141 
1142 	if (ftl_addr_cached(*addr)) {
1143 		if (!ftl_cache_read(io, ftl_io_current_lba(io), *addr, ftl_io_iovec_addr(io))) {
1144 			return 0;
1145 		}
1146 
1147 		/* If the state changed, we have to re-read the l2p */
1148 		return -EAGAIN;
1149 	}
1150 
1151 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
1152 		next_addr = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));
1153 
1154 		if (ftl_addr_invalid(next_addr) || ftl_addr_cached(next_addr)) {
1155 			break;
1156 		}
1157 
1158 		if (addr->offset + i != next_addr.offset) {
1159 			break;
1160 		}
1161 	}
1162 
1163 	return i;
1164 }
1165 
1166 static int
1167 ftl_submit_read(struct ftl_io *io)
1168 {
1169 	struct spdk_ftl_dev *dev = io->dev;
1170 	struct ftl_io_channel *ioch;
1171 	struct ftl_addr addr;
1172 	int rc = 0, num_blocks;
1173 
1174 	ioch = ftl_io_channel_get_ctx(io->ioch);
1175 
1176 	assert(LIST_EMPTY(&io->children));
1177 
1178 	while (io->pos < io->num_blocks) {
1179 		if (ftl_io_mode_physical(io)) {
1180 			num_blocks = rc = ftl_read_next_physical_addr(io, &addr);
1181 		} else {
1182 			num_blocks = rc = ftl_read_next_logical_addr(io, &addr);
1183 		}
1184 
1185 		/* We might need to retry the read from scratch (e.g. */
1186 		/* because write was under way and completed before */
1187 		/* we could read it from the write buffer */
1188 		if (ftl_read_retry(rc)) {
1189 			continue;
1190 		}
1191 
1192 		/* We don't have to schedule the read, as it was read from cache */
1193 		if (ftl_read_canceled(rc)) {
1194 			ftl_io_advance(io, 1);
1195 			ftl_trace_completion(io->dev, io, rc ? FTL_TRACE_COMPLETION_INVALID :
1196 					     FTL_TRACE_COMPLETION_CACHE);
1197 			rc = 0;
1198 			continue;
1199 		}
1200 
1201 		assert(num_blocks > 0);
1202 
1203 		ftl_trace_submission(dev, io, addr, num_blocks);
1204 		rc = spdk_bdev_read_blocks(dev->base_bdev_desc, ioch->base_ioch,
1205 					   ftl_io_iovec_addr(io),
1206 					   addr.offset,
1207 					   num_blocks, ftl_io_cmpl_cb, io);
1208 		if (spdk_unlikely(rc)) {
1209 			if (rc == -ENOMEM) {
1210 				TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
1211 				rc = 0;
1212 			} else {
1213 				ftl_io_fail(io, rc);
1214 			}
1215 			break;
1216 		}
1217 
1218 		ftl_io_inc_req(io);
1219 		ftl_io_advance(io, num_blocks);
1220 	}
1221 
1222 	/* If we didn't have to read anything from the device, */
1223 	/* complete the request right away */
1224 	if (ftl_io_done(io)) {
1225 		ftl_io_complete(io);
1226 	}
1227 
1228 	return rc;
1229 }
1230 
1231 static void
1232 ftl_complete_flush(struct ftl_flush *flush)
1233 {
1234 	assert(flush->num_req == 0);
1235 	LIST_REMOVE(flush, list_entry);
1236 
1237 	flush->cb.fn(flush->cb.ctx, 0);
1238 
1239 	spdk_bit_array_free(&flush->bmap);
1240 	free(flush);
1241 }
1242 
1243 static void
1244 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
1245 {
1246 	struct ftl_flush *flush, *tflush;
1247 	size_t offset;
1248 
1249 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
1250 		offset = batch->index;
1251 
1252 		if (spdk_bit_array_get(flush->bmap, offset)) {
1253 			spdk_bit_array_clear(flush->bmap, offset);
1254 			if (!(--flush->num_req)) {
1255 				ftl_complete_flush(flush);
1256 			}
1257 		}
1258 	}
1259 }
1260 
1261 static void
1262 ftl_nv_cache_wrap_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1263 {
1264 	struct ftl_nv_cache *nv_cache = cb_arg;
1265 
1266 	if (!success) {
1267 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header\n");
1268 		/* TODO: go into read-only mode */
1269 		assert(0);
1270 	}
1271 
1272 	pthread_spin_lock(&nv_cache->lock);
1273 	nv_cache->ready = true;
1274 	pthread_spin_unlock(&nv_cache->lock);
1275 
1276 	spdk_bdev_free_io(bdev_io);
1277 }
1278 
1279 static void
1280 ftl_nv_cache_wrap(void *ctx)
1281 {
1282 	struct ftl_nv_cache *nv_cache = ctx;
1283 	int rc;
1284 
1285 	rc = ftl_nv_cache_write_header(nv_cache, false, ftl_nv_cache_wrap_cb, nv_cache);
1286 	if (spdk_unlikely(rc != 0)) {
1287 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header: %s\n",
1288 			    spdk_strerror(-rc));
1289 		/* TODO: go into read-only mode */
1290 		assert(0);
1291 	}
1292 }
1293 
1294 static uint64_t
1295 ftl_reserve_nv_cache(struct ftl_nv_cache *nv_cache, size_t *num_blocks, unsigned int *phase)
1296 {
1297 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1298 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1299 	uint64_t num_available, cache_size, cache_addr = FTL_LBA_INVALID;
1300 
1301 	cache_size = spdk_bdev_get_num_blocks(bdev);
1302 
1303 	pthread_spin_lock(&nv_cache->lock);
1304 	if (spdk_unlikely(nv_cache->num_available == 0 || !nv_cache->ready)) {
1305 		goto out;
1306 	}
1307 
1308 	num_available = spdk_min(nv_cache->num_available, *num_blocks);
1309 	num_available = spdk_min(num_available, dev->conf.nv_cache.max_request_cnt);
1310 
1311 	if (spdk_unlikely(nv_cache->current_addr + num_available > cache_size)) {
1312 		*num_blocks = cache_size - nv_cache->current_addr;
1313 	} else {
1314 		*num_blocks = num_available;
1315 	}
1316 
1317 	cache_addr = nv_cache->current_addr;
1318 	nv_cache->current_addr += *num_blocks;
1319 	nv_cache->num_available -= *num_blocks;
1320 	*phase = nv_cache->phase;
1321 
1322 	if (nv_cache->current_addr == spdk_bdev_get_num_blocks(bdev)) {
1323 		nv_cache->current_addr = FTL_NV_CACHE_DATA_OFFSET;
1324 		nv_cache->phase = ftl_nv_cache_next_phase(nv_cache->phase);
1325 		nv_cache->ready = false;
1326 		spdk_thread_send_msg(ftl_get_core_thread(dev), ftl_nv_cache_wrap, nv_cache);
1327 	}
1328 out:
1329 	pthread_spin_unlock(&nv_cache->lock);
1330 	return cache_addr;
1331 }
1332 
1333 static struct ftl_io *
1334 ftl_alloc_io_nv_cache(struct ftl_io *parent, size_t num_blocks)
1335 {
1336 	struct ftl_io_init_opts opts = {
1337 		.dev		= parent->dev,
1338 		.parent		= parent,
1339 		.iovcnt		= 0,
1340 		.num_blocks	= num_blocks,
1341 		.flags		= parent->flags | FTL_IO_CACHE,
1342 	};
1343 
1344 	return ftl_io_init_internal(&opts);
1345 }
1346 
1347 static void
1348 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1349 {
1350 	struct ftl_io *io = cb_arg;
1351 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1352 
1353 	if (spdk_unlikely(!success)) {
1354 		SPDK_ERRLOG("Non-volatile cache write failed at %"PRIx64"\n", io->addr.offset);
1355 		io->status = -EIO;
1356 	}
1357 
1358 	ftl_io_dec_req(io);
1359 	if (ftl_io_done(io)) {
1360 		spdk_mempool_put(nv_cache->md_pool, io->md);
1361 		ftl_io_complete(io);
1362 	}
1363 
1364 	spdk_bdev_free_io(bdev_io);
1365 }
1366 
1367 static void
1368 ftl_submit_nv_cache(void *ctx)
1369 {
1370 	struct ftl_io *io = ctx;
1371 	struct spdk_ftl_dev *dev = io->dev;
1372 	struct spdk_thread *thread;
1373 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1374 	struct ftl_io_channel *ioch;
1375 	int rc;
1376 
1377 	ioch = ftl_io_channel_get_ctx(io->ioch);
1378 	thread = spdk_io_channel_get_thread(io->ioch);
1379 
1380 	rc = spdk_bdev_write_blocks_with_md(nv_cache->bdev_desc, ioch->cache_ioch,
1381 					    ftl_io_iovec_addr(io), io->md, io->addr.offset,
1382 					    io->num_blocks, ftl_nv_cache_submit_cb, io);
1383 	if (rc == -ENOMEM) {
1384 		spdk_thread_send_msg(thread, ftl_submit_nv_cache, io);
1385 		return;
1386 	} else if (rc) {
1387 		SPDK_ERRLOG("Write to persistent cache failed: %s (%"PRIu64", %"PRIu64")\n",
1388 			    spdk_strerror(-rc), io->addr.offset, io->num_blocks);
1389 		spdk_mempool_put(nv_cache->md_pool, io->md);
1390 		io->status = -EIO;
1391 		ftl_io_complete(io);
1392 		return;
1393 	}
1394 
1395 	ftl_io_advance(io, io->num_blocks);
1396 	ftl_io_inc_req(io);
1397 }
1398 
1399 static void
1400 ftl_nv_cache_fill_md(struct ftl_io *io, unsigned int phase)
1401 {
1402 	struct spdk_bdev *bdev;
1403 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1404 	uint64_t block_off, lba;
1405 	void *md_buf = io->md;
1406 
1407 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1408 
1409 	for (block_off = 0; block_off < io->num_blocks; ++block_off) {
1410 		lba = ftl_nv_cache_pack_lba(ftl_io_get_lba(io, block_off), phase);
1411 		memcpy(md_buf, &lba, sizeof(lba));
1412 		md_buf += spdk_bdev_get_md_size(bdev);
1413 	}
1414 }
1415 
1416 static void
1417 _ftl_write_nv_cache(void *ctx)
1418 {
1419 	struct ftl_io *child, *io = ctx;
1420 	struct spdk_ftl_dev *dev = io->dev;
1421 	struct spdk_thread *thread;
1422 	unsigned int phase;
1423 	uint64_t num_blocks;
1424 
1425 	thread = spdk_io_channel_get_thread(io->ioch);
1426 
1427 	while (io->pos < io->num_blocks) {
1428 		num_blocks = ftl_io_iovec_len_left(io);
1429 
1430 		child = ftl_alloc_io_nv_cache(io, num_blocks);
1431 		if (spdk_unlikely(!child)) {
1432 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1433 			return;
1434 		}
1435 
1436 		child->md = spdk_mempool_get(dev->nv_cache.md_pool);
1437 		if (spdk_unlikely(!child->md)) {
1438 			ftl_io_free(child);
1439 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1440 			break;
1441 		}
1442 
1443 		/* Reserve area on the write buffer cache */
1444 		child->addr.offset = ftl_reserve_nv_cache(&dev->nv_cache, &num_blocks, &phase);
1445 		if (child->addr.offset == FTL_LBA_INVALID) {
1446 			spdk_mempool_put(dev->nv_cache.md_pool, child->md);
1447 			ftl_io_free(child);
1448 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1449 			break;
1450 		}
1451 
1452 		/* Shrink the IO if there isn't enough room in the cache to fill the whole iovec */
1453 		if (spdk_unlikely(num_blocks != ftl_io_iovec_len_left(io))) {
1454 			ftl_io_shrink_iovec(child, num_blocks);
1455 		}
1456 
1457 		ftl_nv_cache_fill_md(child, phase);
1458 		ftl_submit_nv_cache(child);
1459 	}
1460 
1461 	if (ftl_io_done(io)) {
1462 		ftl_io_complete(io);
1463 	}
1464 }
1465 
1466 static void
1467 ftl_write_nv_cache(struct ftl_io *parent)
1468 {
1469 	ftl_io_reset(parent);
1470 	parent->flags |= FTL_IO_CACHE;
1471 	_ftl_write_nv_cache(parent);
1472 }
1473 
1474 int
1475 ftl_nv_cache_write_header(struct ftl_nv_cache *nv_cache, bool shutdown,
1476 			  spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1477 {
1478 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1479 	struct ftl_nv_cache_header *hdr = nv_cache->dma_buf;
1480 	struct spdk_bdev *bdev;
1481 	struct ftl_io_channel *ioch;
1482 
1483 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1484 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
1485 
1486 	memset(hdr, 0, spdk_bdev_get_block_size(bdev));
1487 
1488 	hdr->phase = (uint8_t)nv_cache->phase;
1489 	hdr->size = spdk_bdev_get_num_blocks(bdev);
1490 	hdr->uuid = dev->uuid;
1491 	hdr->version = FTL_NV_CACHE_HEADER_VERSION;
1492 	hdr->current_addr = shutdown ? nv_cache->current_addr : FTL_LBA_INVALID;
1493 	hdr->checksum = spdk_crc32c_update(hdr, offsetof(struct ftl_nv_cache_header, checksum), 0);
1494 
1495 	return spdk_bdev_write_blocks(nv_cache->bdev_desc, ioch->cache_ioch, hdr, 0, 1,
1496 				      cb_fn, cb_arg);
1497 }
1498 
1499 int
1500 ftl_nv_cache_scrub(struct ftl_nv_cache *nv_cache, spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1501 {
1502 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1503 	struct ftl_io_channel *ioch;
1504 	struct spdk_bdev *bdev;
1505 
1506 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
1507 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1508 
1509 	return spdk_bdev_write_zeroes_blocks(nv_cache->bdev_desc, ioch->cache_ioch, 1,
1510 					     spdk_bdev_get_num_blocks(bdev) - 1,
1511 					     cb_fn, cb_arg);
1512 }
1513 
1514 static void
1515 ftl_write_fail(struct ftl_io *io, int status)
1516 {
1517 	struct ftl_batch *batch = io->batch;
1518 	struct spdk_ftl_dev *dev = io->dev;
1519 	struct ftl_wbuf_entry *entry;
1520 	struct ftl_band *band;
1521 	char buf[128];
1522 
1523 	entry = TAILQ_FIRST(&batch->entries);
1524 
1525 	band = ftl_band_from_addr(io->dev, entry->addr);
1526 	SPDK_ERRLOG("Write failed @addr: %s, status: %d\n",
1527 		    ftl_addr2str(entry->addr, buf, sizeof(buf)), status);
1528 
1529 	/* Close the band and, halt wptr and defrag */
1530 	ftl_halt_writes(dev, band);
1531 
1532 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1533 		/* Invalidate meta set by process_writes() */
1534 		ftl_invalidate_addr(dev, entry->addr);
1535 	}
1536 
1537 	/* Reset the batch back to the write buffer to resend it later */
1538 	TAILQ_INSERT_TAIL(&dev->pending_batches, batch, tailq);
1539 }
1540 
1541 static void
1542 ftl_write_cb(struct ftl_io *io, void *arg, int status)
1543 {
1544 	struct spdk_ftl_dev *dev = io->dev;
1545 	struct ftl_batch *batch = io->batch;
1546 	struct ftl_wbuf_entry *entry;
1547 	struct ftl_band *band;
1548 	struct ftl_addr prev_addr, addr = io->addr;
1549 
1550 	if (status) {
1551 		ftl_write_fail(io, status);
1552 		return;
1553 	}
1554 
1555 	assert(io->num_blocks == dev->xfer_size);
1556 	assert(!(io->flags & FTL_IO_MD));
1557 
1558 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1559 		band = entry->band;
1560 		if (!(entry->io_flags & FTL_IO_PAD)) {
1561 			/* Verify that the LBA is set for user blocks */
1562 			assert(entry->lba != FTL_LBA_INVALID);
1563 		}
1564 
1565 		if (band != NULL) {
1566 			assert(band->num_reloc_blocks > 0);
1567 			band->num_reloc_blocks--;
1568 		}
1569 
1570 		entry->addr = addr;
1571 		if (entry->lba != FTL_LBA_INVALID) {
1572 			pthread_spin_lock(&entry->lock);
1573 			prev_addr = ftl_l2p_get(dev, entry->lba);
1574 
1575 			/* If the l2p was updated in the meantime, don't update band's metadata */
1576 			if (ftl_addr_cached(prev_addr) &&
1577 			    entry == ftl_get_entry_from_addr(dev, prev_addr)) {
1578 				/* Setting entry's cache bit needs to be done after metadata */
1579 				/* within the band is updated to make sure that writes */
1580 				/* invalidating the entry clear the metadata as well */
1581 				ftl_band_set_addr(io->band, entry->lba, entry->addr);
1582 				entry->valid = true;
1583 			}
1584 			pthread_spin_unlock(&entry->lock);
1585 		}
1586 
1587 		SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write addr:%lu, lba:%lu\n",
1588 			      entry->addr.offset, entry->lba);
1589 
1590 		addr = ftl_band_next_addr(io->band, addr, 1);
1591 	}
1592 
1593 	ftl_process_flush(dev, batch);
1594 	ftl_release_batch(dev, batch);
1595 }
1596 
1597 static void
1598 ftl_update_stats(struct spdk_ftl_dev *dev, const struct ftl_wbuf_entry *entry)
1599 {
1600 	if (entry->io_flags & FTL_IO_INTERNAL) {
1601 		dev->stats.write_user++;
1602 	}
1603 	dev->stats.write_total++;
1604 }
1605 
1606 static void
1607 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_wbuf_entry *entry,
1608 	       struct ftl_addr addr)
1609 {
1610 	struct ftl_addr prev_addr;
1611 	struct ftl_wbuf_entry *prev;
1612 	struct ftl_band *band;
1613 	int valid;
1614 
1615 	prev_addr = ftl_l2p_get(dev, entry->lba);
1616 	if (ftl_addr_invalid(prev_addr)) {
1617 		ftl_l2p_set(dev, entry->lba, addr);
1618 		return;
1619 	}
1620 
1621 	/* If the L2P's physical address is different than what we expected we don't need to */
1622 	/* do anything (someone's already overwritten our data). */
1623 	if ((entry->io_flags & FTL_IO_WEAK) && !ftl_addr_cmp(prev_addr, entry->addr)) {
1624 		return;
1625 	}
1626 
1627 	if (ftl_addr_cached(prev_addr)) {
1628 		assert(!(entry->io_flags & FTL_IO_WEAK));
1629 		prev = ftl_get_entry_from_addr(dev, prev_addr);
1630 		pthread_spin_lock(&prev->lock);
1631 
1632 		/* Re-read the L2P under the lock to protect against updates */
1633 		/* to this LBA from other threads */
1634 		prev_addr = ftl_l2p_get(dev, entry->lba);
1635 
1636 		/* If the entry is no longer in cache, another write has been */
1637 		/* scheduled in the meantime, so we have to invalidate its LBA */
1638 		if (!ftl_addr_cached(prev_addr)) {
1639 			ftl_invalidate_addr(dev, prev_addr);
1640 		}
1641 
1642 		/* If previous entry is part of cache, remove and invalidate it */
1643 		if (prev->valid) {
1644 			ftl_invalidate_addr(dev, prev->addr);
1645 			prev->valid = false;
1646 		}
1647 
1648 		ftl_l2p_set(dev, entry->lba, addr);
1649 		pthread_spin_unlock(&prev->lock);
1650 		return;
1651 	}
1652 
1653 	/* Lock the band containing previous physical address. This assures atomic changes to */
1654 	/* the L2P as wall as metadata. The valid bits in metadata are used to */
1655 	/* check weak writes validity. */
1656 	band = ftl_band_from_addr(dev, prev_addr);
1657 	pthread_spin_lock(&band->lba_map.lock);
1658 
1659 	valid = ftl_invalidate_addr_unlocked(dev, prev_addr);
1660 
1661 	/* If the address has been invalidated already, we don't want to update */
1662 	/* the L2P for weak writes, as it means the write is no longer valid. */
1663 	if (!(entry->io_flags & FTL_IO_WEAK) || valid) {
1664 		ftl_l2p_set(dev, entry->lba, addr);
1665 	}
1666 
1667 	pthread_spin_unlock(&band->lba_map.lock);
1668 }
1669 
1670 static struct ftl_io *
1671 ftl_io_init_child_write(struct ftl_io *parent, struct ftl_addr addr, ftl_io_fn cb)
1672 {
1673 	struct ftl_io *io;
1674 	struct spdk_ftl_dev *dev = parent->dev;
1675 	struct ftl_io_init_opts opts = {
1676 		.dev		= dev,
1677 		.io		= NULL,
1678 		.parent		= parent,
1679 		.band		= parent->band,
1680 		.size		= sizeof(struct ftl_io),
1681 		.flags		= 0,
1682 		.type		= parent->type,
1683 		.num_blocks	= dev->xfer_size,
1684 		.cb_fn		= cb,
1685 		.iovcnt		= 0,
1686 	};
1687 
1688 	io = ftl_io_init_internal(&opts);
1689 	if (!io) {
1690 		return NULL;
1691 	}
1692 
1693 	io->addr = addr;
1694 
1695 	return io;
1696 }
1697 
1698 static void
1699 ftl_io_child_write_cb(struct ftl_io *io, void *ctx, int status)
1700 {
1701 	struct ftl_zone *zone;
1702 	struct ftl_wptr *wptr;
1703 
1704 	zone = ftl_band_zone_from_addr(io->band, io->addr);
1705 	wptr = ftl_wptr_from_band(io->band);
1706 
1707 	zone->busy = false;
1708 	zone->info.write_pointer += io->num_blocks;
1709 
1710 	if (zone->info.write_pointer == zone->info.zone_id + zone->info.capacity) {
1711 		zone->info.state = SPDK_BDEV_ZONE_STATE_FULL;
1712 	}
1713 
1714 	/* If some other write on the same band failed the write pointer would already be freed */
1715 	if (spdk_likely(wptr)) {
1716 		wptr->num_outstanding--;
1717 	}
1718 }
1719 
1720 static int
1721 ftl_submit_child_write(struct ftl_wptr *wptr, struct ftl_io *io)
1722 {
1723 	struct spdk_ftl_dev	*dev = io->dev;
1724 	struct ftl_io_channel	*ioch;
1725 	struct ftl_io		*child;
1726 	struct ftl_addr		addr;
1727 	int			rc;
1728 
1729 	ioch = ftl_io_channel_get_ctx(io->ioch);
1730 
1731 	if (spdk_likely(!wptr->direct_mode)) {
1732 		addr = wptr->addr;
1733 	} else {
1734 		assert(io->flags & FTL_IO_DIRECT_ACCESS);
1735 		assert(ftl_addr_get_band(dev, io->addr) == wptr->band->id);
1736 		addr = io->addr;
1737 	}
1738 
1739 	/* Split IO to child requests and release zone immediately after child is completed */
1740 	child = ftl_io_init_child_write(io, addr, ftl_io_child_write_cb);
1741 	if (!child) {
1742 		return -EAGAIN;
1743 	}
1744 
1745 	wptr->num_outstanding++;
1746 
1747 	if (ftl_is_append_supported(dev)) {
1748 		rc = spdk_bdev_zone_appendv(dev->base_bdev_desc, ioch->base_ioch,
1749 					    child->iov, child->iov_cnt,
1750 					    ftl_addr_get_zone_slba(dev, addr),
1751 					    dev->xfer_size, ftl_io_cmpl_cb, child);
1752 	} else {
1753 		rc = spdk_bdev_writev_blocks(dev->base_bdev_desc, ioch->base_ioch,
1754 					     child->iov, child->iov_cnt, addr.offset,
1755 					     dev->xfer_size, ftl_io_cmpl_cb, child);
1756 	}
1757 
1758 	if (rc) {
1759 		wptr->num_outstanding--;
1760 		ftl_io_fail(child, rc);
1761 		ftl_io_complete(child);
1762 		SPDK_ERRLOG("spdk_bdev_write_blocks_with_md failed with status:%d, addr:%lu\n",
1763 			    rc, addr.offset);
1764 		return -EIO;
1765 	}
1766 
1767 	ftl_io_inc_req(child);
1768 	ftl_io_advance(child, dev->xfer_size);
1769 
1770 	return 0;
1771 }
1772 
1773 static int
1774 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
1775 {
1776 	struct spdk_ftl_dev	*dev = io->dev;
1777 	int			rc = 0;
1778 
1779 	assert(io->num_blocks % dev->xfer_size == 0);
1780 
1781 	while (io->iov_pos < io->iov_cnt) {
1782 		/* There are no guarantees of the order of completion of NVMe IO submission queue */
1783 		/* so wait until zone is not busy before submitting another write */
1784 		if (!ftl_is_append_supported(dev) && wptr->zone->busy) {
1785 			TAILQ_INSERT_TAIL(&wptr->pending_queue, io, ioch_entry);
1786 			rc = -EAGAIN;
1787 			break;
1788 		}
1789 
1790 		rc = ftl_submit_child_write(wptr, io);
1791 		if (spdk_unlikely(rc)) {
1792 			if (rc == -EAGAIN) {
1793 				TAILQ_INSERT_TAIL(&wptr->pending_queue, io, ioch_entry);
1794 			} else {
1795 				ftl_io_fail(io, rc);
1796 			}
1797 			break;
1798 		}
1799 
1800 		ftl_trace_submission(dev, io, wptr->addr, dev->xfer_size);
1801 		ftl_wptr_advance(wptr, dev->xfer_size);
1802 	}
1803 
1804 	if (ftl_io_done(io)) {
1805 		/* Parent IO will complete after all children are completed */
1806 		ftl_io_complete(io);
1807 	}
1808 
1809 	return rc;
1810 }
1811 
1812 static void
1813 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1814 {
1815 	struct ftl_batch *batch = dev->current_batch;
1816 	struct ftl_io_channel *ioch;
1817 	size_t size = 0, num_entries = 0;
1818 
1819 	assert(batch != NULL);
1820 	assert(batch->num_entries < dev->xfer_size);
1821 
1822 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1823 		size += spdk_ring_count(ioch->submit_queue);
1824 	}
1825 
1826 	num_entries = dev->xfer_size - batch->num_entries;
1827 	if (size < num_entries) {
1828 		ftl_pad_wbuf(dev, num_entries - size);
1829 	}
1830 }
1831 
1832 static bool
1833 ftl_check_io_channel_flush(struct spdk_ftl_dev *dev)
1834 {
1835 	struct ftl_io_channel *ioch;
1836 
1837 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1838 		if (ioch->flush && spdk_ring_count(ioch->free_queue) != ioch->num_entries) {
1839 			return true;
1840 		}
1841 	}
1842 
1843 	return false;
1844 }
1845 
1846 static int
1847 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1848 {
1849 	struct spdk_ftl_dev	*dev = wptr->dev;
1850 	struct ftl_batch	*batch;
1851 	struct ftl_wbuf_entry	*entry;
1852 	struct ftl_io		*io;
1853 
1854 	if (spdk_unlikely(!TAILQ_EMPTY(&wptr->pending_queue))) {
1855 		io = TAILQ_FIRST(&wptr->pending_queue);
1856 		TAILQ_REMOVE(&wptr->pending_queue, io, ioch_entry);
1857 
1858 		if (ftl_submit_write(wptr, io) == -EAGAIN) {
1859 			return 0;
1860 		}
1861 	}
1862 
1863 	/* Make sure the band is prepared for writing */
1864 	if (!ftl_wptr_ready(wptr)) {
1865 		return 0;
1866 	}
1867 
1868 	if (dev->halt) {
1869 		ftl_wptr_process_shutdown(wptr);
1870 	}
1871 
1872 	if (spdk_unlikely(wptr->flush)) {
1873 		ftl_wptr_pad_band(wptr);
1874 	}
1875 
1876 	batch = ftl_get_next_batch(dev);
1877 	if (!batch) {
1878 		/* If there are queued flush requests we need to pad the write buffer to */
1879 		/* force out remaining entries */
1880 		if (!LIST_EMPTY(&dev->flush_list) || ftl_check_io_channel_flush(dev)) {
1881 			ftl_flush_pad_batch(dev);
1882 		}
1883 
1884 		return 0;
1885 	}
1886 
1887 	io = ftl_io_wbuf_init(dev, wptr->addr, wptr->band, batch, ftl_write_cb);
1888 	if (!io) {
1889 		goto error;
1890 	}
1891 
1892 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1893 		/* Update band's relocation stats if the IO comes from reloc */
1894 		if (entry->io_flags & FTL_IO_WEAK) {
1895 			if (!spdk_bit_array_get(wptr->band->reloc_bitmap, entry->band->id)) {
1896 				spdk_bit_array_set(wptr->band->reloc_bitmap, entry->band->id);
1897 				entry->band->num_reloc_bands++;
1898 			}
1899 		}
1900 
1901 		ftl_trace_wbuf_pop(dev, entry);
1902 		ftl_update_stats(dev, entry);
1903 	}
1904 
1905 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write addr:%lx\n", wptr->addr.offset);
1906 
1907 	if (ftl_submit_write(wptr, io)) {
1908 		/* TODO: we need some recovery here */
1909 		assert(0 && "Write submit failed");
1910 		if (ftl_io_done(io)) {
1911 			ftl_io_free(io);
1912 		}
1913 	}
1914 
1915 	return dev->xfer_size;
1916 error:
1917 	TAILQ_INSERT_TAIL(&dev->pending_batches, batch, tailq);
1918 	return 0;
1919 }
1920 
1921 static int
1922 ftl_process_writes(struct spdk_ftl_dev *dev)
1923 {
1924 	struct ftl_wptr *wptr, *twptr;
1925 	size_t num_active = 0;
1926 	enum ftl_band_state state;
1927 
1928 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1929 		ftl_wptr_process_writes(wptr);
1930 		state = wptr->band->state;
1931 
1932 		if (state != FTL_BAND_STATE_FULL &&
1933 		    state != FTL_BAND_STATE_CLOSING &&
1934 		    state != FTL_BAND_STATE_CLOSED) {
1935 			num_active++;
1936 		}
1937 	}
1938 
1939 	if (num_active < 1) {
1940 		ftl_add_wptr(dev);
1941 	}
1942 
1943 	return 0;
1944 }
1945 
1946 static void
1947 ftl_fill_wbuf_entry(struct ftl_wbuf_entry *entry, struct ftl_io *io)
1948 {
1949 	memcpy(entry->payload, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1950 
1951 	if (entry->io_flags & FTL_IO_WEAK) {
1952 		entry->band = ftl_band_from_addr(io->dev, io->addr);
1953 		entry->addr = ftl_band_next_addr(entry->band, io->addr, io->pos);
1954 		entry->band->num_reloc_blocks++;
1955 	}
1956 
1957 	entry->trace = io->trace;
1958 	entry->lba = ftl_io_current_lba(io);
1959 }
1960 
1961 static int
1962 ftl_wbuf_fill(struct ftl_io *io)
1963 {
1964 	struct spdk_ftl_dev *dev = io->dev;
1965 	struct ftl_io_channel *ioch;
1966 	struct ftl_wbuf_entry *entry;
1967 
1968 	ioch = ftl_io_channel_get_ctx(io->ioch);
1969 
1970 	while (io->pos < io->num_blocks) {
1971 		if (ftl_io_current_lba(io) == FTL_LBA_INVALID) {
1972 			ftl_io_advance(io, 1);
1973 			continue;
1974 		}
1975 
1976 		entry = ftl_acquire_wbuf_entry(ioch, io->flags);
1977 		if (!entry) {
1978 			TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
1979 			return 0;
1980 		}
1981 
1982 		ftl_fill_wbuf_entry(entry, io);
1983 
1984 		ftl_trace_wbuf_fill(dev, io);
1985 		ftl_update_l2p(dev, entry, ftl_get_addr_from_entry(entry));
1986 		ftl_io_advance(io, 1);
1987 
1988 		/* Needs to be done after L2P is updated to avoid race with */
1989 		/* write completion callback when it's processed faster than */
1990 		/* L2P is set in update_l2p(). */
1991 		spdk_ring_enqueue(ioch->submit_queue, (void **)&entry, 1, NULL);
1992 	}
1993 
1994 	if (ftl_io_done(io)) {
1995 		if (ftl_dev_has_nv_cache(dev) && !(io->flags & FTL_IO_BYPASS_CACHE)) {
1996 			ftl_write_nv_cache(io);
1997 		} else {
1998 			TAILQ_INSERT_TAIL(&ioch->write_cmpl_queue, io, ioch_entry);
1999 		}
2000 	}
2001 
2002 	return 0;
2003 }
2004 
2005 static bool
2006 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
2007 {
2008 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
2009 
2010 	if (ftl_reloc_is_halted(dev->reloc)) {
2011 		return false;
2012 	}
2013 
2014 	if (ftl_reloc_is_defrag_active(dev->reloc)) {
2015 		return false;
2016 	}
2017 
2018 	if (dev->num_free <= limit->thld) {
2019 		return true;
2020 	}
2021 
2022 	return false;
2023 }
2024 
2025 static double
2026 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
2027 {
2028 	size_t usable, valid, invalid;
2029 	double vld_ratio;
2030 
2031 	/* If the band doesn't have any usable blocks it's of no use */
2032 	usable = ftl_band_num_usable_blocks(band);
2033 	if (usable == 0) {
2034 		return 0.0;
2035 	}
2036 
2037 	valid =  threshold_valid ? (usable - *threshold_valid) : band->lba_map.num_vld;
2038 	invalid = usable - valid;
2039 
2040 	/* Add one to avoid division by 0 */
2041 	vld_ratio = (double)invalid / (double)(valid + 1);
2042 	return vld_ratio * ftl_band_age(band);
2043 }
2044 
2045 static bool
2046 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
2047 {
2048 	struct spdk_ftl_conf *conf = &dev->conf;
2049 	size_t thld_vld;
2050 
2051 	/* If we're in dire need of free bands, every band is worth defragging */
2052 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
2053 		return true;
2054 	}
2055 
2056 	thld_vld = (ftl_band_num_usable_blocks(band) * conf->invalid_thld) / 100;
2057 
2058 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
2059 }
2060 
2061 static struct ftl_band *
2062 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
2063 {
2064 	struct ftl_band *band, *mband = NULL;
2065 	double merit = 0;
2066 
2067 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
2068 		assert(band->state == FTL_BAND_STATE_CLOSED);
2069 		band->merit = ftl_band_calc_merit(band, NULL);
2070 		if (band->merit > merit) {
2071 			merit = band->merit;
2072 			mband = band;
2073 		}
2074 	}
2075 
2076 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
2077 		mband = NULL;
2078 	}
2079 
2080 	return mband;
2081 }
2082 
2083 static void
2084 ftl_process_relocs(struct spdk_ftl_dev *dev)
2085 {
2086 	struct ftl_band *band;
2087 
2088 	if (ftl_dev_needs_defrag(dev)) {
2089 		band = ftl_select_defrag_band(dev);
2090 		if (band) {
2091 			ftl_reloc_add(dev->reloc, band, 0, ftl_get_num_blocks_in_band(dev), 0, true);
2092 			ftl_trace_defrag_band(dev, band);
2093 		}
2094 	}
2095 
2096 	ftl_reloc(dev->reloc);
2097 }
2098 
2099 int
2100 ftl_current_limit(const struct spdk_ftl_dev *dev)
2101 {
2102 	return dev->limit;
2103 }
2104 
2105 void
2106 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
2107 {
2108 	attrs->uuid = dev->uuid;
2109 	attrs->num_blocks = dev->num_lbas;
2110 	attrs->block_size = FTL_BLOCK_SIZE;
2111 	attrs->num_zones = ftl_get_num_zones(dev);
2112 	attrs->zone_size = ftl_get_num_blocks_in_zone(dev);
2113 	attrs->conf = dev->conf;
2114 	attrs->base_bdev = spdk_bdev_get_name(spdk_bdev_desc_get_bdev(dev->base_bdev_desc));
2115 
2116 	attrs->cache_bdev = NULL;
2117 	if (dev->nv_cache.bdev_desc) {
2118 		attrs->cache_bdev = spdk_bdev_get_name(
2119 					    spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc));
2120 	}
2121 }
2122 
2123 static void
2124 _ftl_io_write(void *ctx)
2125 {
2126 	ftl_io_write((struct ftl_io *)ctx);
2127 }
2128 
2129 static int
2130 ftl_submit_write_leaf(struct ftl_io *io)
2131 {
2132 	int rc;
2133 
2134 	rc = ftl_submit_write(ftl_wptr_from_band(io->band), io);
2135 	if (rc == -EAGAIN) {
2136 		/* EAGAIN means that the request was put on the pending queue */
2137 		return 0;
2138 	}
2139 
2140 	return rc;
2141 }
2142 
2143 void
2144 ftl_io_write(struct ftl_io *io)
2145 {
2146 	struct spdk_ftl_dev *dev = io->dev;
2147 	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(io->ioch);
2148 
2149 	/* Put the IO on retry queue in case IO channel is not initialized */
2150 	if (spdk_unlikely(ioch->index == FTL_IO_CHANNEL_INDEX_INVALID)) {
2151 		TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
2152 		return;
2153 	}
2154 
2155 	/* For normal IOs we just need to copy the data onto the write buffer */
2156 	if (!(io->flags & FTL_IO_MD)) {
2157 		ftl_io_call_foreach_child(io, ftl_wbuf_fill);
2158 	} else {
2159 		/* Metadata has its own buffer, so it doesn't have to be copied, so just */
2160 		/* send it the the core thread and schedule the write immediately */
2161 		if (ftl_check_core_thread(dev)) {
2162 			ftl_io_call_foreach_child(io, ftl_submit_write_leaf);
2163 		} else {
2164 			spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
2165 		}
2166 	}
2167 }
2168 
2169 int
2170 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2171 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2172 {
2173 	struct ftl_io *io;
2174 
2175 	if (iov_cnt == 0) {
2176 		return -EINVAL;
2177 	}
2178 
2179 	if (lba_cnt == 0) {
2180 		return -EINVAL;
2181 	}
2182 
2183 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2184 		return -EINVAL;
2185 	}
2186 
2187 	if (!dev->initialized) {
2188 		return -EBUSY;
2189 	}
2190 
2191 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
2192 	if (!io) {
2193 		return -ENOMEM;
2194 	}
2195 
2196 	ftl_io_write(io);
2197 
2198 	return 0;
2199 }
2200 
2201 void
2202 ftl_io_read(struct ftl_io *io)
2203 {
2204 	ftl_io_call_foreach_child(io, ftl_submit_read);
2205 }
2206 
2207 int
2208 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2209 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2210 {
2211 	struct ftl_io *io;
2212 
2213 	if (iov_cnt == 0) {
2214 		return -EINVAL;
2215 	}
2216 
2217 	if (lba_cnt == 0) {
2218 		return -EINVAL;
2219 	}
2220 
2221 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2222 		return -EINVAL;
2223 	}
2224 
2225 	if (!dev->initialized) {
2226 		return -EBUSY;
2227 	}
2228 
2229 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
2230 	if (!io) {
2231 		return -ENOMEM;
2232 	}
2233 
2234 	ftl_io_read(io);
2235 	return 0;
2236 }
2237 
2238 static struct ftl_flush *
2239 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2240 {
2241 	struct ftl_flush *flush;
2242 
2243 	flush = calloc(1, sizeof(*flush));
2244 	if (!flush) {
2245 		return NULL;
2246 	}
2247 
2248 	flush->bmap = spdk_bit_array_create(FTL_BATCH_COUNT);
2249 	if (!flush->bmap) {
2250 		goto error;
2251 	}
2252 
2253 	flush->dev = dev;
2254 	flush->cb.fn = cb_fn;
2255 	flush->cb.ctx = cb_arg;
2256 
2257 	return flush;
2258 error:
2259 	free(flush);
2260 	return NULL;
2261 }
2262 
2263 static void
2264 _ftl_flush(void *ctx)
2265 {
2266 	struct ftl_flush *flush = ctx;
2267 	struct spdk_ftl_dev *dev = flush->dev;
2268 	uint32_t i;
2269 
2270 	/* Attach flush object to all non-empty batches */
2271 	for (i = 0; i < FTL_BATCH_COUNT; ++i) {
2272 		if (dev->batch_array[i].num_entries > 0) {
2273 			spdk_bit_array_set(flush->bmap, i);
2274 			flush->num_req++;
2275 		}
2276 	}
2277 
2278 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
2279 
2280 	/* If the write buffer was already empty, the flush can be completed right away */
2281 	if (!flush->num_req) {
2282 		ftl_complete_flush(flush);
2283 	}
2284 }
2285 
2286 int
2287 ftl_flush_wbuf(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2288 {
2289 	struct ftl_flush *flush;
2290 
2291 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
2292 	if (!flush) {
2293 		return -ENOMEM;
2294 	}
2295 
2296 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
2297 	return 0;
2298 }
2299 
2300 int
2301 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2302 {
2303 	if (!dev->initialized) {
2304 		return -EBUSY;
2305 	}
2306 
2307 	return ftl_flush_wbuf(dev, cb_fn, cb_arg);
2308 }
2309 
2310 bool
2311 ftl_addr_is_written(struct ftl_band *band, struct ftl_addr addr)
2312 {
2313 	struct ftl_zone *zone = ftl_band_zone_from_addr(band, addr);
2314 
2315 	return addr.offset < zone->info.write_pointer;
2316 }
2317 
2318 static void ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event);
2319 
2320 static void
2321 _ftl_process_media_event(void *ctx)
2322 {
2323 	struct ftl_media_event *event = ctx;
2324 	struct spdk_ftl_dev *dev = event->dev;
2325 
2326 	ftl_process_media_event(dev, event->event);
2327 	spdk_mempool_put(dev->media_events_pool, event);
2328 }
2329 
2330 static void
2331 ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event)
2332 {
2333 	struct ftl_band *band;
2334 	struct ftl_addr addr = { .offset = event.offset };
2335 	size_t block_off;
2336 
2337 	if (!ftl_check_core_thread(dev)) {
2338 		struct ftl_media_event *media_event;
2339 
2340 		media_event = spdk_mempool_get(dev->media_events_pool);
2341 		if (!media_event) {
2342 			SPDK_ERRLOG("Media event lost due to lack of memory");
2343 			return;
2344 		}
2345 
2346 		media_event->dev = dev;
2347 		media_event->event = event;
2348 		spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_process_media_event,
2349 				     media_event);
2350 		return;
2351 	}
2352 
2353 	band = ftl_band_from_addr(dev, addr);
2354 	block_off = ftl_band_block_offset_from_addr(band, addr);
2355 
2356 	ftl_reloc_add(dev->reloc, band, block_off, event.num_blocks, 0, false);
2357 }
2358 
2359 void
2360 ftl_get_media_events(struct spdk_ftl_dev *dev)
2361 {
2362 #define FTL_MAX_MEDIA_EVENTS 128
2363 	struct spdk_bdev_media_event events[FTL_MAX_MEDIA_EVENTS];
2364 	size_t num_events, i;
2365 
2366 	if (!dev->initialized) {
2367 		return;
2368 	}
2369 
2370 	do {
2371 		num_events = spdk_bdev_get_media_events(dev->base_bdev_desc,
2372 							events, FTL_MAX_MEDIA_EVENTS);
2373 
2374 		for (i = 0; i < num_events; ++i) {
2375 			ftl_process_media_event(dev, events[i]);
2376 		}
2377 
2378 	} while (num_events);
2379 }
2380 
2381 int
2382 ftl_io_channel_poll(void *arg)
2383 {
2384 	struct ftl_io_channel *ch = arg;
2385 	struct ftl_io *io;
2386 	TAILQ_HEAD(, ftl_io) retry_queue;
2387 
2388 	if (TAILQ_EMPTY(&ch->write_cmpl_queue) && TAILQ_EMPTY(&ch->retry_queue)) {
2389 		return 0;
2390 	}
2391 
2392 	while (!TAILQ_EMPTY(&ch->write_cmpl_queue)) {
2393 		io = TAILQ_FIRST(&ch->write_cmpl_queue);
2394 		TAILQ_REMOVE(&ch->write_cmpl_queue, io, ioch_entry);
2395 		ftl_io_complete(io);
2396 	}
2397 
2398 	/*
2399 	 * Create local copy of the retry queue to prevent from infinite retrying if IO will be
2400 	 * inserted to the retry queue again
2401 	 */
2402 	TAILQ_INIT(&retry_queue);
2403 	TAILQ_SWAP(&ch->retry_queue, &retry_queue, ftl_io, ioch_entry);
2404 
2405 	while (!TAILQ_EMPTY(&retry_queue)) {
2406 		io = TAILQ_FIRST(&retry_queue);
2407 		TAILQ_REMOVE(&retry_queue, io, ioch_entry);
2408 		if (io->type == FTL_IO_WRITE) {
2409 			ftl_io_write(io);
2410 		} else {
2411 			ftl_io_read(io);
2412 		}
2413 	}
2414 
2415 	return 1;
2416 }
2417 
2418 int
2419 ftl_task_core(void *ctx)
2420 {
2421 	struct spdk_ftl_dev *dev = ctx;
2422 
2423 	if (dev->halt) {
2424 		if (ftl_shutdown_complete(dev)) {
2425 			spdk_poller_unregister(&dev->core_poller);
2426 			return 0;
2427 		}
2428 	}
2429 
2430 	ftl_process_writes(dev);
2431 	ftl_process_relocs(dev);
2432 
2433 	return 0;
2434 }
2435 
2436 SPDK_LOG_REGISTER_COMPONENT("ftl_core", SPDK_LOG_FTL_CORE)
2437