xref: /spdk/lib/ftl/ftl_core.c (revision 88e3ffd7b6c5ec1ea1a660354d25f02c766092e1)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/thread.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk/string.h"
40 #include "spdk/log.h"
41 #include "spdk/ftl.h"
42 #include "spdk/crc32.h"
43 
44 #include "ftl_core.h"
45 #include "ftl_band.h"
46 #include "ftl_io.h"
47 #include "ftl_debug.h"
48 #include "ftl_reloc.h"
49 
50 struct ftl_band_flush {
51 	struct spdk_ftl_dev		*dev;
52 	/* Number of bands left to be flushed */
53 	size_t				num_bands;
54 	/* User callback */
55 	spdk_ftl_fn			cb_fn;
56 	/* Callback's argument */
57 	void				*cb_arg;
58 	/* List link */
59 	LIST_ENTRY(ftl_band_flush)	list_entry;
60 };
61 
62 struct ftl_wptr {
63 	/* Owner device */
64 	struct spdk_ftl_dev		*dev;
65 
66 	/* Current address */
67 	struct ftl_addr			addr;
68 
69 	/* Band currently being written to */
70 	struct ftl_band			*band;
71 
72 	/* Current logical block's offset */
73 	uint64_t			offset;
74 
75 	/* Current zone */
76 	struct ftl_zone			*zone;
77 
78 	/* Pending IO queue */
79 	TAILQ_HEAD(, ftl_io)		pending_queue;
80 
81 	/* List link */
82 	LIST_ENTRY(ftl_wptr)		list_entry;
83 
84 	/*
85 	 * If set up in direct mode, there will be no offset or band state update after IO.
86 	 * The zoned bdev address is not assigned by wptr, and is instead taken directly
87 	 * from the request.
88 	 */
89 	bool				direct_mode;
90 
91 	/* Number of outstanding write requests */
92 	uint32_t			num_outstanding;
93 
94 	/* Marks that the band related to this wptr needs to be closed as soon as possible */
95 	bool				flush;
96 };
97 
98 struct ftl_flush {
99 	/* Owner device */
100 	struct spdk_ftl_dev		*dev;
101 
102 	/* Number of batches to wait for */
103 	size_t				num_req;
104 
105 	/* Callback */
106 	struct {
107 		spdk_ftl_fn		fn;
108 		void			*ctx;
109 	} cb;
110 
111 	/* Batch bitmap */
112 	struct spdk_bit_array		*bmap;
113 
114 	/* List link */
115 	LIST_ENTRY(ftl_flush)		list_entry;
116 };
117 
118 static void
119 ftl_wptr_free(struct ftl_wptr *wptr)
120 {
121 	if (!wptr) {
122 		return;
123 	}
124 
125 	free(wptr);
126 }
127 
128 static void
129 ftl_remove_wptr(struct ftl_wptr *wptr)
130 {
131 	struct spdk_ftl_dev *dev = wptr->dev;
132 	struct ftl_band_flush *flush, *tmp;
133 
134 	if (spdk_unlikely(wptr->flush)) {
135 		LIST_FOREACH_SAFE(flush, &dev->band_flush_list, list_entry, tmp) {
136 			assert(flush->num_bands > 0);
137 			if (--flush->num_bands == 0) {
138 				flush->cb_fn(flush->cb_arg, 0);
139 				LIST_REMOVE(flush, list_entry);
140 				free(flush);
141 			}
142 		}
143 	}
144 
145 	LIST_REMOVE(wptr, list_entry);
146 	ftl_wptr_free(wptr);
147 }
148 
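/*
 * Acquire a free write buffer entry from the IO channel. User (non-internal) IOs are
 * counted against the channel's current queue depth and are rejected once the qdepth
 * limit is reached; internal IOs (e.g. padding entries) bypass that limit.
 */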
149 static struct ftl_wbuf_entry *
150 ftl_acquire_wbuf_entry(struct ftl_io_channel *io_channel, int io_flags)
151 {
152 	struct ftl_wbuf_entry *entry = NULL;
153 	uint32_t qdepth;
154 
155 	if (!(io_flags & FTL_IO_INTERNAL)) {
156 		qdepth = __atomic_fetch_add(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
157 		if (qdepth >= io_channel->qdepth_limit) {
158 			__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
159 			return NULL;
160 		}
161 	}
162 
163 	if (spdk_ring_dequeue(io_channel->free_queue, (void **)&entry, 1) != 1) {
164 		if (!(io_flags & FTL_IO_INTERNAL)) {
165 			__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
166 		}
167 
168 		return NULL;
169 	}
170 
171 	assert(entry != NULL);
172 
173 	ftl_evict_cache_entry(io_channel->dev, entry);
174 
175 	entry->io_flags = io_flags;
176 	entry->addr.offset = FTL_ADDR_INVALID;
177 	entry->lba = FTL_LBA_INVALID;
178 	entry->band = NULL;
179 	entry->valid = false;
180 
181 	return entry;
182 }
183 
184 static void
185 ftl_release_wbuf_entry(struct ftl_wbuf_entry *entry)
186 {
187 	struct ftl_io_channel *io_channel = entry->ioch;
188 
189 	if (!(entry->io_flags & FTL_IO_INTERNAL)) {
190 		__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
191 	}
192 
193 	spdk_ring_enqueue(io_channel->free_queue, (void **)&entry, 1, NULL);
194 }
195 
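/*
 * Return a batch ready for submission. Pending (previously failed) batches take
 * priority; otherwise a free batch is filled by dequeuing write buffer entries from
 * the IO channels in round-robin order. A batch is only returned once it holds exactly
 * xfer_size entries; a partially filled batch is kept in dev->current_batch.
 */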
196 static struct ftl_batch *
197 ftl_get_next_batch(struct spdk_ftl_dev *dev)
198 {
199 	struct ftl_batch *batch = dev->current_batch;
200 	struct ftl_io_channel *ioch;
201 #define FTL_DEQUEUE_ENTRIES 128
202 	struct ftl_wbuf_entry *entries[FTL_DEQUEUE_ENTRIES];
203 	TAILQ_HEAD(, ftl_io_channel) ioch_queue;
204 	size_t i, num_dequeued, num_remaining;
205 	uint64_t *metadata;
206 
207 	if (batch == NULL) {
208 		batch = TAILQ_FIRST(&dev->pending_batches);
209 		if (batch != NULL) {
210 			TAILQ_REMOVE(&dev->pending_batches, batch, tailq);
211 			return batch;
212 		}
213 
214 		batch = TAILQ_FIRST(&dev->free_batches);
215 		if (spdk_unlikely(batch == NULL)) {
216 			return NULL;
217 		}
218 
219 		assert(TAILQ_EMPTY(&batch->entries));
220 		assert(batch->num_entries == 0);
221 		TAILQ_REMOVE(&dev->free_batches, batch, tailq);
222 	}
223 
224 	/*
225 	 * Keep shifting the queue to ensure fairness in IO channel selection.  Each time
226 	 * ftl_get_next_batch() is called, we're starting to dequeue write buffer entries from a
227 	 * different IO channel.
228 	 */
229 	TAILQ_INIT(&ioch_queue);
230 	while (!TAILQ_EMPTY(&dev->ioch_queue)) {
231 		ioch = TAILQ_FIRST(&dev->ioch_queue);
232 		TAILQ_REMOVE(&dev->ioch_queue, ioch, tailq);
233 		TAILQ_INSERT_TAIL(&ioch_queue, ioch, tailq);
234 
235 		num_remaining = dev->xfer_size - batch->num_entries;
236 		while (num_remaining > 0) {
237 			num_dequeued = spdk_ring_dequeue(ioch->submit_queue, (void **)entries,
238 							 spdk_min(num_remaining,
239 									 FTL_DEQUEUE_ENTRIES));
240 			if (num_dequeued == 0) {
241 				break;
242 			}
243 
244 			for (i = 0; i < num_dequeued; ++i) {
245 				batch->iov[batch->num_entries + i].iov_base = entries[i]->payload;
246 				batch->iov[batch->num_entries + i].iov_len = FTL_BLOCK_SIZE;
247 
248 				if (batch->metadata != NULL) {
249 					metadata = (uint64_t *)((char *)batch->metadata +
250 								i * dev->md_size);
251 					*metadata = entries[i]->lba;
252 				}
253 
254 				TAILQ_INSERT_TAIL(&batch->entries, entries[i], tailq);
255 			}
256 
257 			batch->num_entries += num_dequeued;
258 			num_remaining -= num_dequeued;
259 		}
260 
261 		if (num_remaining == 0) {
262 			break;
263 		}
264 	}
265 
266 	TAILQ_CONCAT(&dev->ioch_queue, &ioch_queue, tailq);
267 
268 	if (batch->num_entries == dev->xfer_size) {
269 		dev->current_batch = NULL;
270 	} else {
271 		dev->current_batch = batch;
272 		batch = NULL;
273 	}
274 
275 	return batch;
276 }
277 
278 static void
279 ftl_release_batch(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
280 {
281 	struct ftl_wbuf_entry *entry;
282 
283 	while (!TAILQ_EMPTY(&batch->entries)) {
284 		entry = TAILQ_FIRST(&batch->entries);
285 		TAILQ_REMOVE(&batch->entries, entry, tailq);
286 		ftl_release_wbuf_entry(entry);
287 	}
288 
289 	batch->num_entries = 0;
290 	TAILQ_INSERT_TAIL(&dev->free_batches, batch, tailq);
291 }
292 
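/*
 * Cached addresses encode both the owning IO channel and the entry within it: the low
 * ioch_shift bits of cache_offset hold the IO channel's index, while the remaining
 * bits hold the entry's index (see ftl_get_addr_from_entry() below).
 */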
293 static struct ftl_wbuf_entry *
294 ftl_get_entry_from_addr(struct spdk_ftl_dev *dev, struct ftl_addr addr)
295 {
296 	struct ftl_io_channel *ioch;
297 	uint64_t ioch_offset, entry_offset;
298 
299 	ioch_offset = addr.cache_offset & ((1 << dev->ioch_shift) - 1);
300 	entry_offset = addr.cache_offset >> dev->ioch_shift;
301 	ioch = dev->ioch_array[ioch_offset];
302 
303 	assert(ioch_offset < dev->conf.max_io_channels);
304 	assert(entry_offset < ioch->num_entries);
305 	assert(addr.cached == 1);
306 
307 	return &ioch->wbuf_entries[entry_offset];
308 }
309 
310 static struct ftl_addr
311 ftl_get_addr_from_entry(struct ftl_wbuf_entry *entry)
312 {
313 	struct ftl_io_channel *ioch = entry->ioch;
314 	struct ftl_addr addr = {};
315 
316 	addr.cached = 1;
317 	addr.cache_offset = (uint64_t)entry->index << ioch->dev->ioch_shift | ioch->index;
318 
319 	return addr;
320 }
321 
322 static void
323 ftl_io_cmpl_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
324 {
325 	struct ftl_io *io = cb_arg;
326 	struct spdk_ftl_dev *dev = io->dev;
327 
328 	if (spdk_unlikely(!success)) {
329 		io->status = -EIO;
330 	}
331 
332 	ftl_trace_completion(dev, io, FTL_TRACE_COMPLETION_DISK);
333 
334 	if (io->type == FTL_IO_WRITE && ftl_is_append_supported(dev)) {
335 		assert(io->parent);
336 		io->parent->addr.offset = spdk_bdev_io_get_append_location(bdev_io);
337 	}
338 
339 	ftl_io_dec_req(io);
340 	if (ftl_io_done(io)) {
341 		ftl_io_complete(io);
342 	}
343 
344 	spdk_bdev_free_io(bdev_io);
345 }
346 
347 static void
348 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
349 {
350 	struct ftl_wptr *wptr = NULL;
351 
352 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
353 		if (wptr->band == band) {
354 			break;
355 		}
356 	}
357 
358 	/* If the band already has the high_prio flag set, other writes must */
359 	/* have failed earlier, so it's already taken care of. */
360 	if (band->high_prio) {
361 		assert(wptr == NULL);
362 		return;
363 	}
364 
365 	ftl_band_write_failed(band);
366 	ftl_remove_wptr(wptr);
367 }
368 
369 static struct ftl_wptr *
370 ftl_wptr_from_band(struct ftl_band *band)
371 {
372 	struct spdk_ftl_dev *dev = band->dev;
373 	struct ftl_wptr *wptr = NULL;
374 
375 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
376 		if (wptr->band == band) {
377 			return wptr;
378 		}
379 	}
380 	assert(false);
381 	return NULL;
382 }
383 
384 static void
385 ftl_md_write_fail(struct ftl_io *io, int status)
386 {
387 	struct ftl_band *band = io->band;
388 	struct ftl_wptr *wptr;
389 	char buf[128];
390 
391 	wptr = ftl_wptr_from_band(band);
392 	SPDK_ERRLOG("Metadata write failed @addr: %s, status: %d\n",
393 		    ftl_addr2str(wptr->addr, buf, sizeof(buf)), status);
394 
395 	ftl_halt_writes(io->dev, band);
396 }
397 
398 static void
399 ftl_md_write_cb(struct ftl_io *io, void *arg, int status)
400 {
401 	struct spdk_ftl_dev *dev = io->dev;
402 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
403 	struct ftl_band *band = io->band;
404 	struct ftl_wptr *wptr;
405 	size_t id;
406 
407 	wptr = ftl_wptr_from_band(band);
408 
409 	if (status) {
410 		ftl_md_write_fail(io, status);
411 		return;
412 	}
413 
414 	ftl_band_set_next_state(band);
415 	if (band->state == FTL_BAND_STATE_CLOSED) {
416 		if (ftl_dev_has_nv_cache(dev)) {
417 			pthread_spin_lock(&nv_cache->lock);
418 			nv_cache->num_available += ftl_band_user_blocks(band);
419 
420 			if (spdk_unlikely(nv_cache->num_available > nv_cache->num_data_blocks)) {
421 				nv_cache->num_available = nv_cache->num_data_blocks;
422 			}
423 			pthread_spin_unlock(&nv_cache->lock);
424 		}
425 
426 		/*
427 		 * Go through the reloc_bitmap, checking for all the bands that had their data moved
428 		 * onto the current band, and update their counters to allow them to be used for writing
429 		 * (once they're closed and empty).
430 		 */
431 		for (id = 0; id < ftl_get_num_bands(dev); ++id) {
432 			if (spdk_bit_array_get(band->reloc_bitmap, id)) {
433 				assert(dev->bands[id].num_reloc_bands > 0);
434 				dev->bands[id].num_reloc_bands--;
435 
436 				spdk_bit_array_clear(band->reloc_bitmap, id);
437 			}
438 		}
439 
440 		ftl_remove_wptr(wptr);
441 	}
442 }
443 
444 static int
445 ftl_read_next_physical_addr(struct ftl_io *io, struct ftl_addr *addr)
446 {
447 	struct spdk_ftl_dev *dev = io->dev;
448 	size_t num_blocks, max_blocks;
449 
450 	assert(ftl_io_mode_physical(io));
451 	assert(io->iov_pos < io->iov_cnt);
452 
453 	if (io->pos == 0) {
454 		*addr = io->addr;
455 	} else {
456 		*addr = ftl_band_next_xfer_addr(io->band, io->addr, io->pos);
457 	}
458 
459 	assert(!ftl_addr_invalid(*addr));
460 
461 	/* Metadata has to be read in the way it's written (jumping across */
462 	/* the zones in xfer_size increments) */
463 	if (io->flags & FTL_IO_MD) {
464 		max_blocks = dev->xfer_size - (addr->offset % dev->xfer_size);
465 		num_blocks = spdk_min(ftl_io_iovec_len_left(io), max_blocks);
466 		assert(addr->offset / dev->xfer_size ==
467 		       (addr->offset + num_blocks - 1) / dev->xfer_size);
468 	} else {
469 		num_blocks = ftl_io_iovec_len_left(io);
470 	}
471 
472 	return num_blocks;
473 }
474 
475 static int
476 ftl_wptr_close_band(struct ftl_wptr *wptr)
477 {
478 	struct ftl_band *band = wptr->band;
479 
480 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
481 
482 	return ftl_band_write_tail_md(band, ftl_md_write_cb);
483 }
484 
485 static int
486 ftl_wptr_open_band(struct ftl_wptr *wptr)
487 {
488 	struct ftl_band *band = wptr->band;
489 
490 	assert(ftl_band_zone_is_first(band, wptr->zone));
491 	assert(band->lba_map.num_vld == 0);
492 
493 	ftl_band_clear_lba_map(band);
494 
495 	assert(band->state == FTL_BAND_STATE_PREP);
496 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
497 
498 	return ftl_band_write_head_md(band, ftl_md_write_cb);
499 }
500 
501 static int
502 ftl_submit_erase(struct ftl_io *io)
503 {
504 	struct spdk_ftl_dev *dev = io->dev;
505 	struct ftl_band *band = io->band;
506 	struct ftl_addr addr = io->addr;
507 	struct ftl_io_channel *ioch;
508 	struct ftl_zone *zone;
509 	int rc = 0;
510 	size_t i;
511 
512 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
513 
514 	for (i = 0; i < io->num_blocks; ++i) {
515 		if (i != 0) {
516 			zone = ftl_band_next_zone(band, ftl_band_zone_from_addr(band, addr));
517 			assert(zone->info.state == SPDK_BDEV_ZONE_STATE_FULL);
518 			addr.offset = zone->info.zone_id;
519 		}
520 
521 		assert(ftl_addr_get_zone_offset(dev, addr) == 0);
522 
523 		ftl_trace_submission(dev, io, addr, 1);
524 		rc = spdk_bdev_zone_management(dev->base_bdev_desc, ioch->base_ioch, addr.offset,
525 					       SPDK_BDEV_ZONE_RESET, ftl_io_cmpl_cb, io);
526 		if (spdk_unlikely(rc)) {
527 			ftl_io_fail(io, rc);
528 			SPDK_ERRLOG("Zone reset failed with status: %d\n", rc);
529 			break;
530 		}
531 
532 		ftl_io_inc_req(io);
533 		ftl_io_advance(io, 1);
534 	}
535 
536 	if (ftl_io_done(io)) {
537 		ftl_io_complete(io);
538 	}
539 
540 	return rc;
541 }
542 
543 static bool
544 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
545 {
546 	return dev->core_thread == spdk_get_thread();
547 }
548 
549 struct spdk_io_channel *
550 ftl_get_io_channel(const struct spdk_ftl_dev *dev)
551 {
552 	if (ftl_check_core_thread(dev)) {
553 		return dev->ioch;
554 	}
555 
556 	return NULL;
557 }
558 
559 static void
560 ftl_erase_fail(struct ftl_io *io, int status)
561 {
562 	struct ftl_zone *zone;
563 	struct ftl_band *band = io->band;
564 	char buf[128];
565 
566 	SPDK_ERRLOG("Erase failed at address: %s, status: %d\n",
567 		    ftl_addr2str(io->addr, buf, sizeof(buf)), status);
568 
569 	zone = ftl_band_zone_from_addr(band, io->addr);
570 	zone->info.state = SPDK_BDEV_ZONE_STATE_OFFLINE;
571 	ftl_band_remove_zone(band, zone);
572 	band->tail_md_addr = ftl_band_tail_md_addr(band);
573 }
574 
575 static void
576 ftl_zone_erase_cb(struct ftl_io *io, void *ctx, int status)
577 {
578 	struct ftl_zone *zone;
579 
580 	zone = ftl_band_zone_from_addr(io->band, io->addr);
581 	zone->busy = false;
582 
583 	if (spdk_unlikely(status)) {
584 		ftl_erase_fail(io, status);
585 		return;
586 	}
587 
588 	zone->info.state = SPDK_BDEV_ZONE_STATE_EMPTY;
589 	zone->info.write_pointer = zone->info.zone_id;
590 }
591 
592 static int
593 ftl_band_erase(struct ftl_band *band)
594 {
595 	struct ftl_zone *zone;
596 	struct ftl_io *io;
597 	int rc = 0;
598 
599 	assert(band->state == FTL_BAND_STATE_CLOSED ||
600 	       band->state == FTL_BAND_STATE_FREE);
601 
602 	ftl_band_set_state(band, FTL_BAND_STATE_PREP);
603 
604 	CIRCLEQ_FOREACH(zone, &band->zones, circleq) {
605 		if (zone->info.state == SPDK_BDEV_ZONE_STATE_EMPTY) {
606 			continue;
607 		}
608 
609 		io = ftl_io_erase_init(band, 1, ftl_zone_erase_cb);
610 		if (!io) {
611 			rc = -ENOMEM;
612 			break;
613 		}
614 
615 		zone->busy = true;
616 		io->addr.offset = zone->info.zone_id;
617 		rc = ftl_submit_erase(io);
618 		if (rc) {
619 			zone->busy = false;
620 			assert(0);
621 			/* TODO: change band's state back to close? */
622 			break;
623 		}
624 	}
625 
626 	return rc;
627 }
628 
629 static struct ftl_band *
630 ftl_next_write_band(struct spdk_ftl_dev *dev)
631 {
632 	struct ftl_band *band;
633 
634 	/* Find a free band that has all of its data moved onto other closed bands */
635 	LIST_FOREACH(band, &dev->free_bands, list_entry) {
636 		assert(band->state == FTL_BAND_STATE_FREE);
637 		if (band->num_reloc_bands == 0 && band->num_reloc_blocks == 0) {
638 			break;
639 		}
640 	}
641 
642 	if (spdk_unlikely(!band)) {
643 		return NULL;
644 	}
645 
646 	if (ftl_band_erase(band)) {
647 		/* TODO: handle erase failure */
648 		return NULL;
649 	}
650 
651 	return band;
652 }
653 
654 static struct ftl_band *
655 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
656 {
657 	struct ftl_band *band;
658 
659 	if (!dev->next_band) {
660 		band = ftl_next_write_band(dev);
661 	} else {
662 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
663 		band = dev->next_band;
664 		dev->next_band = NULL;
665 	}
666 
667 	return band;
668 }
669 
670 static struct ftl_wptr *
671 ftl_wptr_init(struct ftl_band *band)
672 {
673 	struct spdk_ftl_dev *dev = band->dev;
674 	struct ftl_wptr *wptr;
675 
676 	wptr = calloc(1, sizeof(*wptr));
677 	if (!wptr) {
678 		return NULL;
679 	}
680 
681 	wptr->dev = dev;
682 	wptr->band = band;
683 	wptr->zone = CIRCLEQ_FIRST(&band->zones);
684 	wptr->addr.offset = wptr->zone->info.zone_id;
685 	TAILQ_INIT(&wptr->pending_queue);
686 
687 	return wptr;
688 }
689 
690 static int
691 ftl_add_direct_wptr(struct ftl_band *band)
692 {
693 	struct spdk_ftl_dev *dev = band->dev;
694 	struct ftl_wptr *wptr;
695 
696 	assert(band->state == FTL_BAND_STATE_OPEN);
697 
698 	wptr = ftl_wptr_init(band);
699 	if (!wptr) {
700 		return -1;
701 	}
702 
703 	wptr->direct_mode = true;
704 
705 	if (ftl_band_alloc_lba_map(band)) {
706 		ftl_wptr_free(wptr);
707 		return -1;
708 	}
709 
710 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
711 
712 	SPDK_DEBUGLOG(ftl_core, "wptr: direct band %u\n", band->id);
713 	ftl_trace_write_band(dev, band);
714 	return 0;
715 }
716 
717 static void
718 ftl_close_direct_wptr(struct ftl_band *band)
719 {
720 	struct ftl_wptr *wptr = ftl_wptr_from_band(band);
721 
722 	assert(wptr->direct_mode);
723 	assert(band->state == FTL_BAND_STATE_CLOSED);
724 
725 	ftl_band_release_lba_map(band);
726 
727 	ftl_remove_wptr(wptr);
728 }
729 
730 int
731 ftl_band_set_direct_access(struct ftl_band *band, bool access)
732 {
733 	if (access) {
734 		return ftl_add_direct_wptr(band);
735 	} else {
736 		ftl_close_direct_wptr(band);
737 		return 0;
738 	}
739 }
740 
741 static int
742 ftl_add_wptr(struct spdk_ftl_dev *dev)
743 {
744 	struct ftl_band *band;
745 	struct ftl_wptr *wptr;
746 
747 	band = ftl_next_wptr_band(dev);
748 	if (!band) {
749 		return -1;
750 	}
751 
752 	wptr = ftl_wptr_init(band);
753 	if (!wptr) {
754 		return -1;
755 	}
756 
757 	if (ftl_band_write_prep(band)) {
758 		ftl_wptr_free(wptr);
759 		return -1;
760 	}
761 
762 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
763 
764 	SPDK_DEBUGLOG(ftl_core, "wptr: band %u\n", band->id);
765 	ftl_trace_write_band(dev, band);
766 	return 0;
767 }
768 
769 static void
770 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
771 {
772 	struct ftl_band *band = wptr->band;
773 	struct spdk_ftl_dev *dev = wptr->dev;
774 	struct spdk_ftl_conf *conf = &dev->conf;
775 	size_t next_thld;
776 
777 	if (spdk_unlikely(wptr->direct_mode)) {
778 		return;
779 	}
780 
781 	wptr->offset += xfer_size;
782 	next_thld = (ftl_band_num_usable_blocks(band) * conf->band_thld) / 100;
783 
784 	if (ftl_band_full(band, wptr->offset)) {
785 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
786 	}
787 
788 	wptr->zone->busy = true;
789 	wptr->addr = ftl_band_next_xfer_addr(band, wptr->addr, xfer_size);
790 	wptr->zone = ftl_band_next_operational_zone(band, wptr->zone);
791 
792 	assert(!ftl_addr_invalid(wptr->addr));
793 
794 	SPDK_DEBUGLOG(ftl_core, "wptr: pu:%lu band:%lu, offset:%lu\n",
795 		      ftl_addr_get_punit(dev, wptr->addr),
796 		      ftl_addr_get_band(dev, wptr->addr),
797 		      wptr->addr.offset);
798 
799 	if (wptr->offset >= next_thld && !dev->next_band) {
800 		dev->next_band = ftl_next_write_band(dev);
801 	}
802 }
803 
804 static size_t
805 ftl_wptr_user_blocks_left(const struct ftl_wptr *wptr)
806 {
807 	return ftl_band_user_blocks_left(wptr->band, wptr->offset);
808 }
809 
810 static bool
811 ftl_wptr_ready(struct ftl_wptr *wptr)
812 {
813 	struct ftl_band *band = wptr->band;
814 
815 	/* TODO: add handling of empty bands */
816 
817 	if (spdk_unlikely(!ftl_zone_is_writable(wptr->dev, wptr->zone))) {
818 		/* Erasing band may fail after it was assigned to wptr. */
819 		if (spdk_unlikely(wptr->zone->info.state == SPDK_BDEV_ZONE_STATE_OFFLINE)) {
820 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
821 		}
822 		return false;
823 	}
824 
825 	/* If we're in the process of writing metadata, wait till it is */
826 	/* completed. */
827 	/* TODO: we should probably change bands once we're writing tail md */
828 	if (ftl_band_state_changing(band)) {
829 		return false;
830 	}
831 
832 	if (band->state == FTL_BAND_STATE_FULL) {
833 		if (wptr->num_outstanding == 0) {
834 			if (ftl_wptr_close_band(wptr)) {
835 				/* TODO: need recovery here */
836 				assert(false);
837 			}
838 		}
839 
840 		return false;
841 	}
842 
843 	if (band->state != FTL_BAND_STATE_OPEN) {
844 		if (ftl_wptr_open_band(wptr)) {
845 			/* TODO: need recovery here */
846 			assert(false);
847 		}
848 
849 		return false;
850 	}
851 
852 	return true;
853 }
854 
855 int
856 ftl_flush_active_bands(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
857 {
858 	struct ftl_wptr *wptr;
859 	struct ftl_band_flush *flush;
860 
861 	assert(ftl_get_core_thread(dev) == spdk_get_thread());
862 
863 	flush = calloc(1, sizeof(*flush));
864 	if (spdk_unlikely(!flush)) {
865 		return -ENOMEM;
866 	}
867 
868 	LIST_INSERT_HEAD(&dev->band_flush_list, flush, list_entry);
869 
870 	flush->cb_fn = cb_fn;
871 	flush->cb_arg = cb_arg;
872 	flush->dev = dev;
873 
874 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
875 		wptr->flush = true;
876 		flush->num_bands++;
877 	}
878 
879 	return 0;
880 }
881 
882 static const struct spdk_ftl_limit *
883 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
884 {
885 	assert(type < SPDK_FTL_LIMIT_MAX);
886 	return &dev->conf.limits[type];
887 }
888 
889 static bool
890 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry)
891 {
892 	struct ftl_addr addr;
893 
894 	/* If the LBA is invalid don't bother checking the md and l2p */
895 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
896 		return false;
897 	}
898 
899 	addr = ftl_l2p_get(dev, entry->lba);
900 	if (!(ftl_addr_cached(addr) && entry == ftl_get_entry_from_addr(dev, addr))) {
901 		return false;
902 	}
903 
904 	return true;
905 }
906 
907 void
908 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry)
909 {
910 	pthread_spin_lock(&entry->lock);
911 
912 	if (!entry->valid) {
913 		goto unlock;
914 	}
915 
916 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
917 	/* on-disk address and clear the cache status bit. Otherwise, skip the l2p update */
918 	/* and just clear the cache status. */
919 	if (!ftl_cache_lba_valid(dev, entry)) {
920 		goto clear;
921 	}
922 
923 	ftl_l2p_set(dev, entry->lba, entry->addr);
924 clear:
925 	entry->valid = false;
926 unlock:
927 	pthread_spin_unlock(&entry->lock);
928 }
929 
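/*
 * Queue up to `size` zeroed padding entries (FTL_IO_PAD | FTL_IO_INTERNAL) on the core
 * IO channel's submit queue so that a partially filled batch can be flushed out as a
 * full transfer unit.
 */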
930 static void
931 ftl_pad_wbuf(struct spdk_ftl_dev *dev, size_t size)
932 {
933 	struct ftl_wbuf_entry *entry;
934 	struct ftl_io_channel *ioch;
935 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
936 
937 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
938 
939 	for (size_t i = 0; i < size; ++i) {
940 		entry = ftl_acquire_wbuf_entry(ioch, flags);
941 		if (!entry) {
942 			break;
943 		}
944 
945 		entry->lba = FTL_LBA_INVALID;
946 		entry->addr = ftl_to_addr(FTL_ADDR_INVALID);
947 		memset(entry->payload, 0, FTL_BLOCK_SIZE);
948 
949 		spdk_ring_enqueue(ioch->submit_queue, (void **)&entry, 1, NULL);
950 	}
951 }
952 
953 static void
954 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
955 {
956 	while (!LIST_EMPTY(&dev->free_bands)) {
957 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
958 	}
959 
960 	dev->next_band = NULL;
961 }
962 
963 static void
964 ftl_wptr_pad_band(struct ftl_wptr *wptr)
965 {
966 	struct spdk_ftl_dev *dev = wptr->dev;
967 	struct ftl_batch *batch = dev->current_batch;
968 	struct ftl_io_channel *ioch;
969 	size_t size, pad_size, blocks_left;
970 
971 	size = batch != NULL ? batch->num_entries : 0;
972 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
973 		size += spdk_ring_count(ioch->submit_queue);
974 	}
975 
976 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
977 
978 	blocks_left = ftl_wptr_user_blocks_left(wptr);
979 	assert(size <= blocks_left);
980 	assert(blocks_left % dev->xfer_size == 0);
981 	pad_size = spdk_min(blocks_left - size, spdk_ring_count(ioch->free_queue));
982 
983 	ftl_pad_wbuf(dev, pad_size);
984 }
985 
986 static void
987 ftl_wptr_process_shutdown(struct ftl_wptr *wptr)
988 {
989 	struct spdk_ftl_dev *dev = wptr->dev;
990 	struct ftl_batch *batch = dev->current_batch;
991 	struct ftl_io_channel *ioch;
992 	size_t size;
993 
994 	size = batch != NULL ? batch->num_entries : 0;
995 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
996 		size += spdk_ring_count(ioch->submit_queue);
997 	}
998 
999 	if (size >= dev->xfer_size) {
1000 		return;
1001 	}
1002 
1003 	/* If we reach this point we need to remove free bands */
1004 	/* and pad current wptr band to the end */
1005 	ftl_remove_free_bands(dev);
1006 	ftl_wptr_pad_band(wptr);
1007 }
1008 
1009 static int
1010 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
1011 {
1012 	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(dev->ioch);
1013 
1014 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
1015 	       dev->num_io_channels == 1 && LIST_EMPTY(&dev->wptr_list) &&
1016 	       TAILQ_EMPTY(&ioch->retry_queue);
1017 }
1018 
1019 void
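/*
 * Pick the most severe limit whose free-band threshold has been crossed and scale each
 * IO channel's write buffer queue depth down to the corresponding percentage of its
 * entries.
 */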
1020 ftl_apply_limits(struct spdk_ftl_dev *dev)
1021 {
1022 	const struct spdk_ftl_limit *limit;
1023 	struct ftl_io_channel *ioch;
1024 	struct ftl_stats *stats = &dev->stats;
1025 	uint32_t qdepth_limit = 100;
1026 	int i;
1027 
1028 	/* Clear existing limit */
1029 	dev->limit = SPDK_FTL_LIMIT_MAX;
1030 
1031 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
1032 		limit = ftl_get_limit(dev, i);
1033 
1034 		if (dev->num_free <= limit->thld) {
1035 			qdepth_limit = limit->limit;
1036 			stats->limits[i]++;
1037 			dev->limit = i;
1038 			break;
1039 		}
1040 	}
1041 
1042 	ftl_trace_limits(dev, dev->limit, dev->num_free);
1043 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1044 		__atomic_store_n(&ioch->qdepth_limit, (qdepth_limit * ioch->num_entries) / 100,
1045 				 __ATOMIC_SEQ_CST);
1046 	}
1047 }
1048 
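/*
 * Clear the valid bit of the block in the owning band's LBA map (the map lock is
 * expected to be held by the caller). Returns 1 if the block was still valid, 0 if it
 * had already been invalidated, e.g. by a concurrent write to the same LBA.
 */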
1049 static int
1050 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_addr addr)
1051 {
1052 	struct ftl_band *band = ftl_band_from_addr(dev, addr);
1053 	struct ftl_lba_map *lba_map = &band->lba_map;
1054 	uint64_t offset;
1055 
1056 	offset = ftl_band_block_offset_from_addr(band, addr);
1057 
1058 	/* The bit might be already cleared if two writes are scheduled to the */
1059 	/* same LBA at the same time */
1060 	if (spdk_bit_array_get(lba_map->vld, offset)) {
1061 		assert(lba_map->num_vld > 0);
1062 		spdk_bit_array_clear(lba_map->vld, offset);
1063 		lba_map->num_vld--;
1064 		return 1;
1065 	}
1066 
1067 	return 0;
1068 }
1069 
1070 int
1071 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_addr addr)
1072 {
1073 	struct ftl_band *band;
1074 	int rc;
1075 
1076 	assert(!ftl_addr_cached(addr));
1077 	band = ftl_band_from_addr(dev, addr);
1078 
1079 	pthread_spin_lock(&band->lba_map.lock);
1080 	rc = ftl_invalidate_addr_unlocked(dev, addr);
1081 	pthread_spin_unlock(&band->lba_map.lock);
1082 
1083 	return rc;
1084 }
1085 
1086 static int
1087 ftl_read_retry(int rc)
1088 {
1089 	return rc == -EAGAIN;
1090 }
1091 
1092 static int
1093 ftl_read_canceled(int rc)
1094 {
1095 	return rc == -EFAULT || rc == 0;
1096 }
1097 
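/*
 * Copy a block straight from a write buffer entry. The L2P is re-read under the
 * entry's lock; if it no longer points at the given cache address, the entry has been
 * reused and the caller needs to retry the read using the current L2P mapping.
 */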
1098 static int
1099 ftl_cache_read(struct ftl_io *io, uint64_t lba,
1100 	       struct ftl_addr addr, void *buf)
1101 {
1102 	struct ftl_wbuf_entry *entry;
1103 	struct ftl_addr naddr;
1104 	int rc = 0;
1105 
1106 	entry = ftl_get_entry_from_addr(io->dev, addr);
1107 	pthread_spin_lock(&entry->lock);
1108 
1109 	naddr = ftl_l2p_get(io->dev, lba);
1110 	if (addr.offset != naddr.offset) {
1111 		rc = -1;
1112 		goto out;
1113 	}
1114 
1115 	memcpy(buf, entry->payload, FTL_BLOCK_SIZE);
1116 out:
1117 	pthread_spin_unlock(&entry->lock);
1118 	return rc;
1119 }
1120 
1121 static int
1122 ftl_read_next_logical_addr(struct ftl_io *io, struct ftl_addr *addr)
1123 {
1124 	struct spdk_ftl_dev *dev = io->dev;
1125 	struct ftl_addr next_addr;
1126 	size_t i;
1127 
1128 	*addr = ftl_l2p_get(dev, ftl_io_current_lba(io));
1129 
1130 	SPDK_DEBUGLOG(ftl_core, "Read addr:%lx, lba:%lu\n",
1131 		      addr->offset, ftl_io_current_lba(io));
1132 
1133 	/* If the address is invalid, skip it (the buffer should already be zeroed) */
1134 	if (ftl_addr_invalid(*addr)) {
1135 		return -EFAULT;
1136 	}
1137 
1138 	if (ftl_addr_cached(*addr)) {
1139 		if (!ftl_cache_read(io, ftl_io_current_lba(io), *addr, ftl_io_iovec_addr(io))) {
1140 			return 0;
1141 		}
1142 
1143 		/* If the state changed, we have to re-read the l2p */
1144 		return -EAGAIN;
1145 	}
1146 
1147 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
1148 		next_addr = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));
1149 
1150 		if (ftl_addr_invalid(next_addr) || ftl_addr_cached(next_addr)) {
1151 			break;
1152 		}
1153 
1154 		if (addr->offset + i != next_addr.offset) {
1155 			break;
1156 		}
1157 	}
1158 
1159 	return i;
1160 }
1161 
1162 static int
1163 ftl_submit_read(struct ftl_io *io)
1164 {
1165 	struct spdk_ftl_dev *dev = io->dev;
1166 	struct ftl_io_channel *ioch;
1167 	struct ftl_addr addr;
1168 	int rc = 0, num_blocks;
1169 
1170 	ioch = ftl_io_channel_get_ctx(io->ioch);
1171 
1172 	assert(LIST_EMPTY(&io->children));
1173 
1174 	while (io->pos < io->num_blocks) {
1175 		if (ftl_io_mode_physical(io)) {
1176 			num_blocks = rc = ftl_read_next_physical_addr(io, &addr);
1177 		} else {
1178 			num_blocks = rc = ftl_read_next_logical_addr(io, &addr);
1179 		}
1180 
1181 		/* We might need to retry the read from scratch (e.g. */
1182 		/* because a write was under way and completed before */
1183 		/* we could read it from the write buffer) */
1184 		if (ftl_read_retry(rc)) {
1185 			continue;
1186 		}
1187 
1188 		/* We don't have to schedule the read, as it was read from cache */
1189 		if (ftl_read_canceled(rc)) {
1190 			ftl_io_advance(io, 1);
1191 			ftl_trace_completion(io->dev, io, rc ? FTL_TRACE_COMPLETION_INVALID :
1192 					     FTL_TRACE_COMPLETION_CACHE);
1193 			rc = 0;
1194 			continue;
1195 		}
1196 
1197 		assert(num_blocks > 0);
1198 
1199 		ftl_trace_submission(dev, io, addr, num_blocks);
1200 		rc = spdk_bdev_read_blocks(dev->base_bdev_desc, ioch->base_ioch,
1201 					   ftl_io_iovec_addr(io),
1202 					   addr.offset,
1203 					   num_blocks, ftl_io_cmpl_cb, io);
1204 		if (spdk_unlikely(rc)) {
1205 			if (rc == -ENOMEM) {
1206 				TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
1207 				rc = 0;
1208 			} else {
1209 				ftl_io_fail(io, rc);
1210 			}
1211 			break;
1212 		}
1213 
1214 		ftl_io_inc_req(io);
1215 		ftl_io_advance(io, num_blocks);
1216 	}
1217 
1218 	/* If we didn't have to read anything from the device, */
1219 	/* complete the request right away */
1220 	if (ftl_io_done(io)) {
1221 		ftl_io_complete(io);
1222 	}
1223 
1224 	return rc;
1225 }
1226 
1227 static void
1228 ftl_complete_flush(struct ftl_flush *flush)
1229 {
1230 	assert(flush->num_req == 0);
1231 	LIST_REMOVE(flush, list_entry);
1232 
1233 	flush->cb.fn(flush->cb.ctx, 0);
1234 
1235 	spdk_bit_array_free(&flush->bmap);
1236 	free(flush);
1237 }
1238 
1239 static void
1240 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
1241 {
1242 	struct ftl_flush *flush, *tflush;
1243 	size_t offset;
1244 
1245 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
1246 		offset = batch->index;
1247 
1248 		if (spdk_bit_array_get(flush->bmap, offset)) {
1249 			spdk_bit_array_clear(flush->bmap, offset);
1250 			if (!(--flush->num_req)) {
1251 				ftl_complete_flush(flush);
1252 			}
1253 		}
1254 	}
1255 }
1256 
1257 static void
1258 ftl_nv_cache_wrap_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1259 {
1260 	struct ftl_nv_cache *nv_cache = cb_arg;
1261 
1262 	if (!success) {
1263 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header\n");
1264 		/* TODO: go into read-only mode */
1265 		assert(0);
1266 	}
1267 
1268 	pthread_spin_lock(&nv_cache->lock);
1269 	nv_cache->ready = true;
1270 	pthread_spin_unlock(&nv_cache->lock);
1271 
1272 	spdk_bdev_free_io(bdev_io);
1273 }
1274 
1275 static void
1276 ftl_nv_cache_wrap(void *ctx)
1277 {
1278 	struct ftl_nv_cache *nv_cache = ctx;
1279 	int rc;
1280 
1281 	rc = ftl_nv_cache_write_header(nv_cache, false, ftl_nv_cache_wrap_cb, nv_cache);
1282 	if (spdk_unlikely(rc != 0)) {
1283 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header: %s\n",
1284 			    spdk_strerror(-rc));
1285 		/* TODO: go into read-only mode */
1286 		assert(0);
1287 	}
1288 }
1289 
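/*
 * Reserve a contiguous range of blocks in the non-volatile cache. The reservation is
 * capped by the number of available blocks, the configured max_request_cnt, and the
 * end of the cache bdev. When the current address reaches the end of the device, the
 * cache wraps around: the phase is advanced and the metadata header is rewritten on
 * the core thread before the cache is marked ready again.
 */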
1290 static uint64_t
1291 ftl_reserve_nv_cache(struct ftl_nv_cache *nv_cache, size_t *num_blocks, unsigned int *phase)
1292 {
1293 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1294 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1295 	uint64_t num_available, cache_size, cache_addr = FTL_LBA_INVALID;
1296 
1297 	cache_size = spdk_bdev_get_num_blocks(bdev);
1298 
1299 	pthread_spin_lock(&nv_cache->lock);
1300 	if (spdk_unlikely(nv_cache->num_available == 0 || !nv_cache->ready)) {
1301 		goto out;
1302 	}
1303 
1304 	num_available = spdk_min(nv_cache->num_available, *num_blocks);
1305 	num_available = spdk_min(num_available, dev->conf.nv_cache.max_request_cnt);
1306 
1307 	if (spdk_unlikely(nv_cache->current_addr + num_available > cache_size)) {
1308 		*num_blocks = cache_size - nv_cache->current_addr;
1309 	} else {
1310 		*num_blocks = num_available;
1311 	}
1312 
1313 	cache_addr = nv_cache->current_addr;
1314 	nv_cache->current_addr += *num_blocks;
1315 	nv_cache->num_available -= *num_blocks;
1316 	*phase = nv_cache->phase;
1317 
1318 	if (nv_cache->current_addr == spdk_bdev_get_num_blocks(bdev)) {
1319 		nv_cache->current_addr = FTL_NV_CACHE_DATA_OFFSET;
1320 		nv_cache->phase = ftl_nv_cache_next_phase(nv_cache->phase);
1321 		nv_cache->ready = false;
1322 		spdk_thread_send_msg(ftl_get_core_thread(dev), ftl_nv_cache_wrap, nv_cache);
1323 	}
1324 out:
1325 	pthread_spin_unlock(&nv_cache->lock);
1326 	return cache_addr;
1327 }
1328 
1329 static struct ftl_io *
1330 ftl_alloc_io_nv_cache(struct ftl_io *parent, size_t num_blocks)
1331 {
1332 	struct ftl_io_init_opts opts = {
1333 		.dev		= parent->dev,
1334 		.parent		= parent,
1335 		.iovcnt		= 0,
1336 		.num_blocks	= num_blocks,
1337 		.flags		= parent->flags | FTL_IO_CACHE,
1338 	};
1339 
1340 	return ftl_io_init_internal(&opts);
1341 }
1342 
1343 static void
1344 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1345 {
1346 	struct ftl_io *io = cb_arg;
1347 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1348 
1349 	if (spdk_unlikely(!success)) {
1350 		SPDK_ERRLOG("Non-volatile cache write failed at %"PRIx64"\n", io->addr.offset);
1351 		io->status = -EIO;
1352 	}
1353 
1354 	ftl_io_dec_req(io);
1355 	if (ftl_io_done(io)) {
1356 		spdk_mempool_put(nv_cache->md_pool, io->md);
1357 		ftl_io_complete(io);
1358 	}
1359 
1360 	spdk_bdev_free_io(bdev_io);
1361 }
1362 
1363 static void
1364 ftl_submit_nv_cache(void *ctx)
1365 {
1366 	struct ftl_io *io = ctx;
1367 	struct spdk_ftl_dev *dev = io->dev;
1368 	struct spdk_thread *thread;
1369 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1370 	struct ftl_io_channel *ioch;
1371 	int rc;
1372 
1373 	ioch = ftl_io_channel_get_ctx(io->ioch);
1374 	thread = spdk_io_channel_get_thread(io->ioch);
1375 
1376 	rc = spdk_bdev_write_blocks_with_md(nv_cache->bdev_desc, ioch->cache_ioch,
1377 					    ftl_io_iovec_addr(io), io->md, io->addr.offset,
1378 					    io->num_blocks, ftl_nv_cache_submit_cb, io);
1379 	if (rc == -ENOMEM) {
1380 		spdk_thread_send_msg(thread, ftl_submit_nv_cache, io);
1381 		return;
1382 	} else if (rc) {
1383 		SPDK_ERRLOG("Write to persistent cache failed: %s (%"PRIu64", %"PRIu64")\n",
1384 			    spdk_strerror(-rc), io->addr.offset, io->num_blocks);
1385 		spdk_mempool_put(nv_cache->md_pool, io->md);
1386 		io->status = -EIO;
1387 		ftl_io_complete(io);
1388 		return;
1389 	}
1390 
1391 	ftl_io_advance(io, io->num_blocks);
1392 	ftl_io_inc_req(io);
1393 }
1394 
1395 static void
1396 ftl_nv_cache_fill_md(struct ftl_io *io, unsigned int phase)
1397 {
1398 	struct spdk_bdev *bdev;
1399 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1400 	uint64_t block_off, lba;
1401 	void *md_buf = io->md;
1402 
1403 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1404 
1405 	for (block_off = 0; block_off < io->num_blocks; ++block_off) {
1406 		lba = ftl_nv_cache_pack_lba(ftl_io_get_lba(io, block_off), phase);
1407 		memcpy(md_buf, &lba, sizeof(lba));
1408 		md_buf += spdk_bdev_get_md_size(bdev);
1409 	}
1410 }
1411 
1412 static void
1413 _ftl_write_nv_cache(void *ctx)
1414 {
1415 	struct ftl_io *child, *io = ctx;
1416 	struct spdk_ftl_dev *dev = io->dev;
1417 	struct spdk_thread *thread;
1418 	unsigned int phase;
1419 	uint64_t num_blocks;
1420 
1421 	thread = spdk_io_channel_get_thread(io->ioch);
1422 
1423 	while (io->pos < io->num_blocks) {
1424 		num_blocks = ftl_io_iovec_len_left(io);
1425 
1426 		child = ftl_alloc_io_nv_cache(io, num_blocks);
1427 		if (spdk_unlikely(!child)) {
1428 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1429 			return;
1430 		}
1431 
1432 		child->md = spdk_mempool_get(dev->nv_cache.md_pool);
1433 		if (spdk_unlikely(!child->md)) {
1434 			ftl_io_free(child);
1435 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1436 			break;
1437 		}
1438 
1439 		/* Reserve area in the non-volatile cache */
1440 		child->addr.offset = ftl_reserve_nv_cache(&dev->nv_cache, &num_blocks, &phase);
1441 		if (child->addr.offset == FTL_LBA_INVALID) {
1442 			spdk_mempool_put(dev->nv_cache.md_pool, child->md);
1443 			ftl_io_free(child);
1444 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1445 			break;
1446 		}
1447 
1448 		/* Shrink the IO if there isn't enough room in the cache to fill the whole iovec */
1449 		if (spdk_unlikely(num_blocks != ftl_io_iovec_len_left(io))) {
1450 			ftl_io_shrink_iovec(child, num_blocks);
1451 		}
1452 
1453 		ftl_nv_cache_fill_md(child, phase);
1454 		ftl_submit_nv_cache(child);
1455 	}
1456 
1457 	if (ftl_io_done(io)) {
1458 		ftl_io_complete(io);
1459 	}
1460 }
1461 
1462 static void
1463 ftl_write_nv_cache(struct ftl_io *parent)
1464 {
1465 	ftl_io_reset(parent);
1466 	parent->flags |= FTL_IO_CACHE;
1467 	_ftl_write_nv_cache(parent);
1468 }
1469 
1470 int
1471 ftl_nv_cache_write_header(struct ftl_nv_cache *nv_cache, bool shutdown,
1472 			  spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1473 {
1474 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1475 	struct ftl_nv_cache_header *hdr = nv_cache->dma_buf;
1476 	struct spdk_bdev *bdev;
1477 	struct ftl_io_channel *ioch;
1478 
1479 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1480 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
1481 
1482 	memset(hdr, 0, spdk_bdev_get_block_size(bdev));
1483 
1484 	hdr->phase = (uint8_t)nv_cache->phase;
1485 	hdr->size = spdk_bdev_get_num_blocks(bdev);
1486 	hdr->uuid = dev->uuid;
1487 	hdr->version = FTL_NV_CACHE_HEADER_VERSION;
1488 	hdr->current_addr = shutdown ? nv_cache->current_addr : FTL_LBA_INVALID;
1489 	hdr->checksum = spdk_crc32c_update(hdr, offsetof(struct ftl_nv_cache_header, checksum), 0);
1490 
1491 	return spdk_bdev_write_blocks(nv_cache->bdev_desc, ioch->cache_ioch, hdr, 0, 1,
1492 				      cb_fn, cb_arg);
1493 }
1494 
1495 int
1496 ftl_nv_cache_scrub(struct ftl_nv_cache *nv_cache, spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1497 {
1498 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1499 	struct ftl_io_channel *ioch;
1500 	struct spdk_bdev *bdev;
1501 
1502 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
1503 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1504 
1505 	return spdk_bdev_write_zeroes_blocks(nv_cache->bdev_desc, ioch->cache_ioch, 1,
1506 					     spdk_bdev_get_num_blocks(bdev) - 1,
1507 					     cb_fn, cb_arg);
1508 }
1509 
1510 static void
1511 ftl_write_fail(struct ftl_io *io, int status)
1512 {
1513 	struct ftl_batch *batch = io->batch;
1514 	struct spdk_ftl_dev *dev = io->dev;
1515 	struct ftl_wbuf_entry *entry;
1516 	struct ftl_band *band;
1517 	char buf[128];
1518 
1519 	entry = TAILQ_FIRST(&batch->entries);
1520 
1521 	band = ftl_band_from_addr(io->dev, entry->addr);
1522 	SPDK_ERRLOG("Write failed @addr: %s, status: %d\n",
1523 		    ftl_addr2str(entry->addr, buf, sizeof(buf)), status);
1524 
1525 	/* Close the band and halt the wptr and defrag */
1526 	ftl_halt_writes(dev, band);
1527 
1528 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1529 		/* Invalidate meta set by process_writes() */
1530 		ftl_invalidate_addr(dev, entry->addr);
1531 	}
1532 
1533 	/* Reset the batch back to the write buffer to resend it later */
1534 	TAILQ_INSERT_TAIL(&dev->pending_batches, batch, tailq);
1535 }
1536 
1537 static void
1538 ftl_write_cb(struct ftl_io *io, void *arg, int status)
1539 {
1540 	struct spdk_ftl_dev *dev = io->dev;
1541 	struct ftl_batch *batch = io->batch;
1542 	struct ftl_wbuf_entry *entry;
1543 	struct ftl_band *band;
1544 	struct ftl_addr prev_addr, addr = io->addr;
1545 
1546 	if (status) {
1547 		ftl_write_fail(io, status);
1548 		return;
1549 	}
1550 
1551 	assert(io->num_blocks == dev->xfer_size);
1552 	assert(!(io->flags & FTL_IO_MD));
1553 
1554 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1555 		band = entry->band;
1556 		if (!(entry->io_flags & FTL_IO_PAD)) {
1557 			/* Verify that the LBA is set for user blocks */
1558 			assert(entry->lba != FTL_LBA_INVALID);
1559 		}
1560 
1561 		if (band != NULL) {
1562 			assert(band->num_reloc_blocks > 0);
1563 			band->num_reloc_blocks--;
1564 		}
1565 
1566 		entry->addr = addr;
1567 		if (entry->lba != FTL_LBA_INVALID) {
1568 			pthread_spin_lock(&entry->lock);
1569 			prev_addr = ftl_l2p_get(dev, entry->lba);
1570 
1571 			/* If the l2p was updated in the meantime, don't update band's metadata */
1572 			if (ftl_addr_cached(prev_addr) &&
1573 			    entry == ftl_get_entry_from_addr(dev, prev_addr)) {
1574 				/* Setting entry's cache bit needs to be done after metadata */
1575 				/* within the band is updated to make sure that writes */
1576 				/* invalidating the entry clear the metadata as well */
1577 				ftl_band_set_addr(io->band, entry->lba, entry->addr);
1578 				entry->valid = true;
1579 			}
1580 			pthread_spin_unlock(&entry->lock);
1581 		}
1582 
1583 		SPDK_DEBUGLOG(ftl_core, "Write addr:%lu, lba:%lu\n",
1584 			      entry->addr.offset, entry->lba);
1585 
1586 		addr = ftl_band_next_addr(io->band, addr, 1);
1587 	}
1588 
1589 	ftl_process_flush(dev, batch);
1590 	ftl_release_batch(dev, batch);
1591 }
1592 
1593 static void
1594 ftl_update_stats(struct spdk_ftl_dev *dev, const struct ftl_wbuf_entry *entry)
1595 {
1596 	if (!(entry->io_flags & FTL_IO_INTERNAL)) {
1597 		dev->stats.write_user++;
1598 	}
1599 	dev->stats.write_total++;
1600 }
1601 
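/*
 * Point the L2P at the freshly filled write buffer entry. Three cases are handled: no
 * previous mapping (just set it), a previous mapping still residing in the cache
 * (invalidate the previous entry under its lock), and a previous mapping on disk
 * (invalidate the old block under the band's LBA map lock). Weak (relocation) writes
 * only update the L2P if the data they carry hasn't been superseded by a user write in
 * the meantime.
 */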
1602 static void
1603 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_wbuf_entry *entry,
1604 	       struct ftl_addr addr)
1605 {
1606 	struct ftl_addr prev_addr;
1607 	struct ftl_wbuf_entry *prev;
1608 	struct ftl_band *band;
1609 	int valid;
1610 	bool io_weak = entry->io_flags & FTL_IO_WEAK;
1611 
1612 	prev_addr = ftl_l2p_get(dev, entry->lba);
1613 	if (ftl_addr_invalid(prev_addr)) {
1614 		ftl_l2p_set(dev, entry->lba, addr);
1615 		return;
1616 	}
1617 
1618 	if (ftl_addr_cached(prev_addr)) {
1619 		prev = ftl_get_entry_from_addr(dev, prev_addr);
1620 		pthread_spin_lock(&prev->lock);
1621 
1622 		/* Re-read the L2P under the lock to protect against updates */
1623 		/* to this LBA from other threads */
1624 		prev_addr = ftl_l2p_get(dev, entry->lba);
1625 
1626 		/* If the entry is no longer in cache, another write has been */
1627 		/* scheduled in the meantime, so we can return to evicted path */
1628 		if (!ftl_addr_cached(prev_addr)) {
1629 			pthread_spin_unlock(&prev->lock);
1630 			goto evicted;
1631 		}
1632 
1633 		/*
1634 		 * The block being relocated could still reside in the cache because write
1635 		 * buffers are independent for each IO channel and enough data (a full write
1636 		 * unit) must be collected before it is submitted to the lower layer.
1637 		 * If the previous entry wasn't overwritten, invalidate the old address and
1638 		 * the entry.
1639 		 * Otherwise skip relocating the block.
1640 		 */
1641 		if (io_weak &&
1642 		    /* Check if prev_addr was updated in the meantime */
1643 		    !(ftl_addr_cmp(prev_addr, ftl_get_addr_from_entry(prev)) &&
1644 		      /* Check if the address being relocated is the same as in the previous entry */
1645 		      ftl_addr_cmp(prev->addr, entry->addr))) {
1646 			pthread_spin_unlock(&prev->lock);
1647 			return;
1648 		}
1649 
1650 		/*
1651 		 * If previous entry is part of cache and was written into disk remove
1652 		 * and invalidate it
1653 		 */
1654 		if (prev->valid) {
1655 			ftl_invalidate_addr(dev, prev->addr);
1656 			prev->valid = false;
1657 		}
1658 
1659 		ftl_l2p_set(dev, entry->lba, addr);
1660 		pthread_spin_unlock(&prev->lock);
1661 		return;
1662 	}
1663 
1664 evicted:
1665 	/*
1666 	 *  If the L2P's physical address is different than what we expected we don't need to
1667 	 *  do anything (someone's already overwritten our data).
1668 	 */
1669 	if (io_weak && !ftl_addr_cmp(prev_addr, entry->addr)) {
1670 		return;
1671 	}
1672 
1673 	/* Lock the band containing the previous physical address. This ensures atomic changes to */
1674 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
1675 	/* check the validity of weak writes. */
1676 	band = ftl_band_from_addr(dev, prev_addr);
1677 	pthread_spin_lock(&band->lba_map.lock);
1678 
1679 	valid = ftl_invalidate_addr_unlocked(dev, prev_addr);
1680 
1681 	/* If the address has been invalidated already, we don't want to update */
1682 	/* the L2P for weak writes, as it means the write is no longer valid. */
1683 	if (!io_weak || valid) {
1684 		ftl_l2p_set(dev, entry->lba, addr);
1685 	}
1686 
1687 	pthread_spin_unlock(&band->lba_map.lock);
1688 }
1689 
1690 static struct ftl_io *
1691 ftl_io_init_child_write(struct ftl_io *parent, struct ftl_addr addr, ftl_io_fn cb)
1692 {
1693 	struct ftl_io *io;
1694 	struct spdk_ftl_dev *dev = parent->dev;
1695 	struct ftl_io_init_opts opts = {
1696 		.dev		= dev,
1697 		.io		= NULL,
1698 		.parent		= parent,
1699 		.band		= parent->band,
1700 		.size		= sizeof(struct ftl_io),
1701 		.flags		= 0,
1702 		.type		= parent->type,
1703 		.num_blocks	= dev->xfer_size,
1704 		.cb_fn		= cb,
1705 		.iovcnt		= 0,
1706 	};
1707 
1708 	io = ftl_io_init_internal(&opts);
1709 	if (!io) {
1710 		return NULL;
1711 	}
1712 
1713 	io->addr = addr;
1714 
1715 	return io;
1716 }
1717 
1718 static void
1719 ftl_io_child_write_cb(struct ftl_io *io, void *ctx, int status)
1720 {
1721 	struct ftl_zone *zone;
1722 	struct ftl_wptr *wptr;
1723 
1724 	zone = ftl_band_zone_from_addr(io->band, io->addr);
1725 	wptr = ftl_wptr_from_band(io->band);
1726 
1727 	zone->busy = false;
1728 	zone->info.write_pointer += io->num_blocks;
1729 
1730 	if (zone->info.write_pointer == zone->info.zone_id + zone->info.capacity) {
1731 		zone->info.state = SPDK_BDEV_ZONE_STATE_FULL;
1732 	}
1733 
1734 	/* If some other write on the same band failed the write pointer would already be freed */
1735 	if (spdk_likely(wptr)) {
1736 		wptr->num_outstanding--;
1737 	}
1738 }
1739 
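/*
 * Carve out a single xfer_size child IO at the write pointer's current address (or at
 * the request's own address in direct mode) and submit it to the base bdev, using zone
 * append when the device supports it and a regular vectored write otherwise.
 */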
1740 static int
1741 ftl_submit_child_write(struct ftl_wptr *wptr, struct ftl_io *io)
1742 {
1743 	struct spdk_ftl_dev	*dev = io->dev;
1744 	struct ftl_io_channel	*ioch;
1745 	struct ftl_io		*child;
1746 	struct ftl_addr		addr;
1747 	int			rc;
1748 
1749 	ioch = ftl_io_channel_get_ctx(io->ioch);
1750 
1751 	if (spdk_likely(!wptr->direct_mode)) {
1752 		addr = wptr->addr;
1753 	} else {
1754 		assert(io->flags & FTL_IO_DIRECT_ACCESS);
1755 		assert(ftl_addr_get_band(dev, io->addr) == wptr->band->id);
1756 		addr = io->addr;
1757 	}
1758 
1759 	/* Split the IO into child requests and release the zone as soon as each child completes */
1760 	child = ftl_io_init_child_write(io, addr, ftl_io_child_write_cb);
1761 	if (!child) {
1762 		return -EAGAIN;
1763 	}
1764 
1765 	wptr->num_outstanding++;
1766 
1767 	if (ftl_is_append_supported(dev)) {
1768 		rc = spdk_bdev_zone_appendv(dev->base_bdev_desc, ioch->base_ioch,
1769 					    child->iov, child->iov_cnt,
1770 					    ftl_addr_get_zone_slba(dev, addr),
1771 					    dev->xfer_size, ftl_io_cmpl_cb, child);
1772 	} else {
1773 		rc = spdk_bdev_writev_blocks(dev->base_bdev_desc, ioch->base_ioch,
1774 					     child->iov, child->iov_cnt, addr.offset,
1775 					     dev->xfer_size, ftl_io_cmpl_cb, child);
1776 	}
1777 
1778 	if (rc) {
1779 		wptr->num_outstanding--;
1780 		ftl_io_fail(child, rc);
1781 		ftl_io_complete(child);
1782 		SPDK_ERRLOG("Submission of the child write failed with status:%d, addr:%lu\n",
1783 			    rc, addr.offset);
1784 		return -EIO;
1785 	}
1786 
1787 	ftl_io_inc_req(child);
1788 	ftl_io_advance(child, dev->xfer_size);
1789 
1790 	return 0;
1791 }
1792 
1793 static int
1794 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
1795 {
1796 	struct spdk_ftl_dev	*dev = io->dev;
1797 	int			rc = 0;
1798 
1799 	assert(io->num_blocks % dev->xfer_size == 0);
1800 
1801 	while (io->iov_pos < io->iov_cnt) {
1802 		/* There are no ordering guarantees for completions within an NVMe submission queue, */
1803 		/* so wait until the zone is no longer busy before submitting another write */
1804 		if (!ftl_is_append_supported(dev) && wptr->zone->busy) {
1805 			TAILQ_INSERT_TAIL(&wptr->pending_queue, io, ioch_entry);
1806 			rc = -EAGAIN;
1807 			break;
1808 		}
1809 
1810 		rc = ftl_submit_child_write(wptr, io);
1811 		if (spdk_unlikely(rc)) {
1812 			if (rc == -EAGAIN) {
1813 				TAILQ_INSERT_TAIL(&wptr->pending_queue, io, ioch_entry);
1814 			} else {
1815 				ftl_io_fail(io, rc);
1816 			}
1817 			break;
1818 		}
1819 
1820 		ftl_trace_submission(dev, io, wptr->addr, dev->xfer_size);
1821 		ftl_wptr_advance(wptr, dev->xfer_size);
1822 	}
1823 
1824 	if (ftl_io_done(io)) {
1825 		/* Parent IO will complete after all children are completed */
1826 		ftl_io_complete(io);
1827 	}
1828 
1829 	return rc;
1830 }
1831 
1832 static void
1833 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1834 {
1835 	struct ftl_batch *batch = dev->current_batch;
1836 	struct ftl_io_channel *ioch;
1837 	size_t size = 0, num_entries = 0;
1838 
1839 	assert(batch != NULL);
1840 	assert(batch->num_entries < dev->xfer_size);
1841 
1842 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1843 		size += spdk_ring_count(ioch->submit_queue);
1844 	}
1845 
1846 	num_entries = dev->xfer_size - batch->num_entries;
1847 	if (size < num_entries) {
1848 		ftl_pad_wbuf(dev, num_entries - size);
1849 	}
1850 }
1851 
1852 static bool
1853 ftl_check_io_channel_flush(struct spdk_ftl_dev *dev)
1854 {
1855 	struct ftl_io_channel *ioch;
1856 
1857 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1858 		if (ioch->flush && spdk_ring_count(ioch->free_queue) != ioch->num_entries) {
1859 			return true;
1860 		}
1861 	}
1862 
1863 	return false;
1864 }
1865 
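/*
 * Main per-write-pointer write path: retry any previously deferred IO, make sure the
 * band is ready for writing, pad the buffer on shutdown/flush, then grab a full batch
 * of write buffer entries and submit it. Returns the number of blocks submitted (0 if
 * nothing was written).
 */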
1866 static int
1867 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1868 {
1869 	struct spdk_ftl_dev	*dev = wptr->dev;
1870 	struct ftl_batch	*batch;
1871 	struct ftl_wbuf_entry	*entry;
1872 	struct ftl_io		*io;
1873 
1874 	if (spdk_unlikely(!TAILQ_EMPTY(&wptr->pending_queue))) {
1875 		io = TAILQ_FIRST(&wptr->pending_queue);
1876 		TAILQ_REMOVE(&wptr->pending_queue, io, ioch_entry);
1877 
1878 		if (ftl_submit_write(wptr, io) == -EAGAIN) {
1879 			return 0;
1880 		}
1881 	}
1882 
1883 	/* Make sure the band is prepared for writing */
1884 	if (!ftl_wptr_ready(wptr)) {
1885 		return 0;
1886 	}
1887 
1888 	if (dev->halt) {
1889 		ftl_wptr_process_shutdown(wptr);
1890 	}
1891 
1892 	if (spdk_unlikely(wptr->flush)) {
1893 		ftl_wptr_pad_band(wptr);
1894 	}
1895 
1896 	batch = ftl_get_next_batch(dev);
1897 	if (!batch) {
1898 		/* If there are queued flush requests we need to pad the write buffer to */
1899 		/* force out remaining entries */
1900 		if (!LIST_EMPTY(&dev->flush_list) || ftl_check_io_channel_flush(dev)) {
1901 			ftl_flush_pad_batch(dev);
1902 		}
1903 
1904 		return 0;
1905 	}
1906 
1907 	io = ftl_io_wbuf_init(dev, wptr->addr, wptr->band, batch, ftl_write_cb);
1908 	if (!io) {
1909 		goto error;
1910 	}
1911 
1912 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1913 		/* Update band's relocation stats if the IO comes from reloc */
1914 		if (entry->io_flags & FTL_IO_WEAK) {
1915 			if (!spdk_bit_array_get(wptr->band->reloc_bitmap, entry->band->id)) {
1916 				spdk_bit_array_set(wptr->band->reloc_bitmap, entry->band->id);
1917 				entry->band->num_reloc_bands++;
1918 			}
1919 		}
1920 
1921 		ftl_trace_wbuf_pop(dev, entry);
1922 		ftl_update_stats(dev, entry);
1923 	}
1924 
1925 	SPDK_DEBUGLOG(ftl_core, "Write addr:%lx\n", wptr->addr.offset);
1926 
1927 	if (ftl_submit_write(wptr, io)) {
1928 		/* TODO: we need some recovery here */
1929 		assert(0 && "Write submit failed");
1930 		if (ftl_io_done(io)) {
1931 			ftl_io_free(io);
1932 		}
1933 	}
1934 
1935 	return dev->xfer_size;
1936 error:
1937 	TAILQ_INSERT_TAIL(&dev->pending_batches, batch, tailq);
1938 	return 0;
1939 }
1940 
1941 static bool
1942 ftl_process_writes(struct spdk_ftl_dev *dev)
1943 {
1944 	struct ftl_wptr *wptr, *twptr;
1945 	size_t num_active = 0, num_writes = 0;
1946 	enum ftl_band_state state;
1947 
1948 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1949 		num_writes += ftl_wptr_process_writes(wptr);
1950 		state = wptr->band->state;
1951 
1952 		if (state != FTL_BAND_STATE_FULL &&
1953 		    state != FTL_BAND_STATE_CLOSING &&
1954 		    state != FTL_BAND_STATE_CLOSED) {
1955 			num_active++;
1956 		}
1957 	}
1958 
1959 	if (num_active < 1) {
1960 		ftl_add_wptr(dev);
1961 	}
1962 
1963 	return num_writes != 0;
1964 }
1965 
1966 static void
1967 ftl_fill_wbuf_entry(struct ftl_wbuf_entry *entry, struct ftl_io *io)
1968 {
1969 	memcpy(entry->payload, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1970 
1971 	if (entry->io_flags & FTL_IO_WEAK) {
1972 		entry->band = ftl_band_from_addr(io->dev, io->addr);
1973 		entry->addr = ftl_band_next_addr(entry->band, io->addr, io->pos);
1974 		entry->band->num_reloc_blocks++;
1975 	}
1976 
1977 	entry->trace = io->trace;
1978 	entry->lba = ftl_io_current_lba(io);
1979 }
1980 
1981 static int
1982 ftl_wbuf_fill(struct ftl_io *io)
1983 {
1984 	struct spdk_ftl_dev *dev = io->dev;
1985 	struct ftl_io_channel *ioch;
1986 	struct ftl_wbuf_entry *entry;
1987 
1988 	ioch = ftl_io_channel_get_ctx(io->ioch);
1989 
1990 	while (io->pos < io->num_blocks) {
1991 		if (ftl_io_current_lba(io) == FTL_LBA_INVALID) {
1992 			ftl_io_advance(io, 1);
1993 			continue;
1994 		}
1995 
1996 		entry = ftl_acquire_wbuf_entry(ioch, io->flags);
1997 		if (!entry) {
1998 			TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
1999 			return 0;
2000 		}
2001 
2002 		ftl_fill_wbuf_entry(entry, io);
2003 
2004 		ftl_trace_wbuf_fill(dev, io);
2005 		ftl_update_l2p(dev, entry, ftl_get_addr_from_entry(entry));
2006 		ftl_io_advance(io, 1);
2007 
2008 		/* The entry must be enqueued after the L2P is updated to avoid a race */
2009 		/* with the write completion callback in case it is processed before */
2010 		/* ftl_update_l2p() has set the mapping. */
2011 		spdk_ring_enqueue(ioch->submit_queue, (void **)&entry, 1, NULL);
2012 	}
2013 
2014 	if (ftl_io_done(io)) {
2015 		if (ftl_dev_has_nv_cache(dev) && !(io->flags & FTL_IO_BYPASS_CACHE)) {
2016 			ftl_write_nv_cache(io);
2017 		} else {
2018 			TAILQ_INSERT_TAIL(&ioch->write_cmpl_queue, io, ioch_entry);
2019 		}
2020 	}
2021 
2022 	return 0;
2023 }
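/*
 * Editor's summary of the fill path above: each valid LBA of the IO gets a
 * write buffer entry (payload copy + L2P update) and is pushed onto the
 * channel's submit_queue; if no entry is available, the IO is parked on the
 * retry queue.  Once the whole IO has been consumed, it is either routed to
 * the non-volatile cache or queued for user completion.
 */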
2024 
2025 static bool
2026 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
2027 {
2028 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
2029 
2030 	if (ftl_reloc_is_halted(dev->reloc)) {
2031 		return false;
2032 	}
2033 
2034 	if (ftl_reloc_is_defrag_active(dev->reloc)) {
2035 		return false;
2036 	}
2037 
2038 	if (dev->num_free <= limit->thld) {
2039 		return true;
2040 	}
2041 
2042 	return false;
2043 }
2044 
2045 static double
2046 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
2047 {
2048 	size_t usable, valid, invalid;
2049 	double vld_ratio;
2050 
2051 	/* If the band doesn't have any usable blocks, it's of no use as a defrag candidate */
2052 	usable = ftl_band_num_usable_blocks(band);
2053 	if (usable == 0) {
2054 		return 0.0;
2055 	}
2056 
2057 	valid = threshold_valid ? (usable - *threshold_valid) : band->lba_map.num_vld;
2058 	invalid = usable - valid;
2059 
2060 	/* Add one to avoid division by 0 */
2061 	vld_ratio = (double)invalid / (double)(valid + 1);
2062 	return vld_ratio * ftl_band_age(band);
2063 }
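/*
 * Illustrative note (editor's sketch, numbers are hypothetical): for a band
 * with 1000 usable blocks of which 200 are still valid, invalid = 800 and
 * vld_ratio = 800 / (200 + 1) ~= 3.98, so the returned merit is roughly
 * 3.98 * ftl_band_age(band).  Bands with many invalid blocks and a high age
 * therefore score highest and are preferred as defrag victims.
 */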
2064 
2065 static bool
2066 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
2067 {
2068 	struct spdk_ftl_conf *conf = &dev->conf;
2069 	size_t thld_vld;
2070 
2071 	/* If we're in dire need of free bands, every band is worth defragging */
2072 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
2073 		return true;
2074 	}
2075 
2076 	thld_vld = (ftl_band_num_usable_blocks(band) * conf->invalid_thld) / 100;
2077 
2078 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
2079 }
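/*
 * Editor's note: thld_vld above is the number of blocks corresponding to
 * conf->invalid_thld percent of the band's usable blocks.  Passing it to
 * ftl_band_calc_merit() yields the merit the band would have if exactly that
 * many blocks were invalid; the band is considered worth defragging only when
 * its actual merit exceeds that reference value.
 */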
2080 
2081 static struct ftl_band *
2082 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
2083 {
2084 	struct ftl_band *band, *mband = NULL;
2085 	double merit = 0;
2086 
2087 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
2088 		assert(band->state == FTL_BAND_STATE_CLOSED);
2089 		band->merit = ftl_band_calc_merit(band, NULL);
2090 		if (band->merit > merit) {
2091 			merit = band->merit;
2092 			mband = band;
2093 		}
2094 	}
2095 
2096 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
2097 		mband = NULL;
2098 	}
2099 
2100 	return mband;
2101 }
2102 
2103 static bool
2104 ftl_process_relocs(struct spdk_ftl_dev *dev)
2105 {
2106 	struct ftl_band *band;
2107 
2108 	if (ftl_dev_needs_defrag(dev)) {
2109 		band = ftl_select_defrag_band(dev);
2110 		if (band) {
2111 			ftl_reloc_add(dev->reloc, band, 0, ftl_get_num_blocks_in_band(dev), 0, true);
2112 			ftl_trace_defrag_band(dev, band);
2113 		}
2114 	}
2115 
2116 	return ftl_reloc(dev->reloc);
2117 }
2118 
2119 int
2120 ftl_current_limit(const struct spdk_ftl_dev *dev)
2121 {
2122 	return dev->limit;
2123 }
2124 
2125 void
2126 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
2127 {
2128 	attrs->uuid = dev->uuid;
2129 	attrs->num_blocks = dev->num_lbas;
2130 	attrs->block_size = FTL_BLOCK_SIZE;
2131 	attrs->num_zones = ftl_get_num_zones(dev);
2132 	attrs->zone_size = ftl_get_num_blocks_in_zone(dev);
2133 	attrs->conf = dev->conf;
2134 	attrs->base_bdev = spdk_bdev_get_name(spdk_bdev_desc_get_bdev(dev->base_bdev_desc));
2135 
2136 	attrs->cache_bdev = NULL;
2137 	if (dev->nv_cache.bdev_desc) {
2138 		attrs->cache_bdev = spdk_bdev_get_name(
2139 					    spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc));
2140 	}
2141 }
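/*
 * Minimal usage sketch (caller-side, names are illustrative): the attrs
 * structure is filled in place and can be read directly afterwards.
 *
 *	struct spdk_ftl_attrs attrs;
 *
 *	spdk_ftl_dev_get_attrs(dev, &attrs);
 *	total_bytes = attrs.num_blocks * attrs.block_size;
 */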
2142 
2143 static void
2144 _ftl_io_write(void *ctx)
2145 {
2146 	ftl_io_write((struct ftl_io *)ctx);
2147 }
2148 
2149 static int
2150 ftl_submit_write_leaf(struct ftl_io *io)
2151 {
2152 	int rc;
2153 
2154 	rc = ftl_submit_write(ftl_wptr_from_band(io->band), io);
2155 	if (rc == -EAGAIN) {
2156 		/* EAGAIN means that the request was put on the pending queue */
2157 		return 0;
2158 	}
2159 
2160 	return rc;
2161 }
2162 
2163 void
2164 ftl_io_write(struct ftl_io *io)
2165 {
2166 	struct spdk_ftl_dev *dev = io->dev;
2167 	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(io->ioch);
2168 
2169 	/* Put the IO on the retry queue if the IO channel hasn't been initialized yet */
2170 	if (spdk_unlikely(ioch->index == FTL_IO_CHANNEL_INDEX_INVALID)) {
2171 		TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
2172 		return;
2173 	}
2174 
2175 	/* For normal IOs we just need to copy the data onto the write buffer */
2176 	if (!(io->flags & FTL_IO_MD)) {
2177 		ftl_io_call_foreach_child(io, ftl_wbuf_fill);
2178 	} else {
2179 		/* Metadata has its own buffer, so it doesn't have to be copied, so just */
2180 		/* Metadata has its own buffer, so it doesn't need to be copied; just */
2181 		/* send it to the core thread and schedule the write immediately. */
2182 			ftl_io_call_foreach_child(io, ftl_submit_write_leaf);
2183 		} else {
2184 			spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
2185 		}
2186 	}
2187 }
2188 
2189 int
2190 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2191 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2192 {
2193 	struct ftl_io *io;
2194 
2195 	if (iov_cnt == 0) {
2196 		return -EINVAL;
2197 	}
2198 
2199 	if (lba_cnt == 0) {
2200 		return -EINVAL;
2201 	}
2202 
2203 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2204 		return -EINVAL;
2205 	}
2206 
2207 	if (!dev->initialized) {
2208 		return -EBUSY;
2209 	}
2210 
2211 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
2212 	if (!io) {
2213 		return -ENOMEM;
2214 	}
2215 
2216 	ftl_io_write(io);
2217 
2218 	return 0;
2219 }
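/*
 * Usage sketch (illustrative only; "buf", "ch" and "write_cb" are hypothetical
 * caller-side names).  lba_cnt must match the number of blocks described by
 * the iovec, otherwise -EINVAL is returned:
 *
 *	struct iovec iov = {
 *		.iov_base = buf,
 *		.iov_len = 16 * FTL_BLOCK_SIZE,
 *	};
 *
 *	rc = spdk_ftl_write(dev, ch, lba, 16, &iov, 1, write_cb, cb_arg);
 */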
2220 
2221 void
2222 ftl_io_read(struct ftl_io *io)
2223 {
2224 	ftl_io_call_foreach_child(io, ftl_submit_read);
2225 }
2226 
2227 int
2228 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2229 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2230 {
2231 	struct ftl_io *io;
2232 
2233 	if (iov_cnt == 0) {
2234 		return -EINVAL;
2235 	}
2236 
2237 	if (lba_cnt == 0) {
2238 		return -EINVAL;
2239 	}
2240 
2241 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2242 		return -EINVAL;
2243 	}
2244 
2245 	if (!dev->initialized) {
2246 		return -EBUSY;
2247 	}
2248 
2249 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
2250 	if (!io) {
2251 		return -ENOMEM;
2252 	}
2253 
2254 	ftl_io_read(io);
2255 	return 0;
2256 }
2257 
2258 static struct ftl_flush *
2259 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2260 {
2261 	struct ftl_flush *flush;
2262 
2263 	flush = calloc(1, sizeof(*flush));
2264 	if (!flush) {
2265 		return NULL;
2266 	}
2267 
2268 	flush->bmap = spdk_bit_array_create(FTL_BATCH_COUNT);
2269 	if (!flush->bmap) {
2270 		goto error;
2271 	}
2272 
2273 	flush->dev = dev;
2274 	flush->cb.fn = cb_fn;
2275 	flush->cb.ctx = cb_arg;
2276 
2277 	return flush;
2278 error:
2279 	free(flush);
2280 	return NULL;
2281 }
2282 
2283 static void
2284 _ftl_flush(void *ctx)
2285 {
2286 	struct ftl_flush *flush = ctx;
2287 	struct spdk_ftl_dev *dev = flush->dev;
2288 	uint32_t i;
2289 
2290 	/* Attach flush object to all non-empty batches */
2291 	for (i = 0; i < FTL_BATCH_COUNT; ++i) {
2292 		if (dev->batch_array[i].num_entries > 0) {
2293 			spdk_bit_array_set(flush->bmap, i);
2294 			flush->num_req++;
2295 		}
2296 	}
2297 
2298 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
2299 
2300 	/* If the write buffer was already empty, the flush can be completed right away */
2301 	if (!flush->num_req) {
2302 		ftl_complete_flush(flush);
2303 	}
2304 }
2305 
2306 int
2307 ftl_flush_wbuf(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2308 {
2309 	struct ftl_flush *flush;
2310 
2311 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
2312 	if (!flush) {
2313 		return -ENOMEM;
2314 	}
2315 
2316 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
2317 	return 0;
2318 }
2319 
2320 int
2321 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2322 {
2323 	if (!dev->initialized) {
2324 		return -EBUSY;
2325 	}
2326 
2327 	return ftl_flush_wbuf(dev, cb_fn, cb_arg);
2328 }
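/*
 * Usage sketch (illustrative; assumes the spdk_ftl_fn callback signature of
 * (void *cb_arg, int status) and a hypothetical "flush_done" callback):
 *
 *	static void
 *	flush_done(void *cb_arg, int status)
 *	{
 *		assert(status == 0);
 *	}
 *
 *	rc = spdk_ftl_flush(dev, flush_done, cb_arg);
 */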
2329 
2330 bool
2331 ftl_addr_is_written(struct ftl_band *band, struct ftl_addr addr)
2332 {
2333 	struct ftl_zone *zone = ftl_band_zone_from_addr(band, addr);
2334 
2335 	return addr.offset < zone->info.write_pointer;
2336 }
2337 
2338 static void ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event);
2339 
2340 static void
2341 _ftl_process_media_event(void *ctx)
2342 {
2343 	struct ftl_media_event *event = ctx;
2344 	struct spdk_ftl_dev *dev = event->dev;
2345 
2346 	ftl_process_media_event(dev, event->event);
2347 	spdk_mempool_put(dev->media_events_pool, event);
2348 }
2349 
2350 static void
2351 ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event)
2352 {
2353 	struct ftl_band *band;
2354 	struct ftl_addr addr = { .offset = event.offset };
2355 	size_t block_off;
2356 
2357 	if (!ftl_check_core_thread(dev)) {
2358 		struct ftl_media_event *media_event;
2359 
2360 		media_event = spdk_mempool_get(dev->media_events_pool);
2361 		if (!media_event) {
2362 			SPDK_ERRLOG("Media event lost due to lack of memory\n");
2363 			return;
2364 		}
2365 
2366 		media_event->dev = dev;
2367 		media_event->event = event;
2368 		spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_process_media_event,
2369 				     media_event);
2370 		return;
2371 	}
2372 
2373 	band = ftl_band_from_addr(dev, addr);
2374 	block_off = ftl_band_block_offset_from_addr(band, addr);
2375 
2376 	ftl_reloc_add(dev->reloc, band, block_off, event.num_blocks, 0, false);
2377 }
2378 
2379 void
2380 ftl_get_media_events(struct spdk_ftl_dev *dev)
2381 {
2382 #define FTL_MAX_MEDIA_EVENTS 128
2383 	struct spdk_bdev_media_event events[FTL_MAX_MEDIA_EVENTS];
2384 	size_t num_events, i;
2385 
2386 	if (!dev->initialized) {
2387 		return;
2388 	}
2389 
2390 	do {
2391 		num_events = spdk_bdev_get_media_events(dev->base_bdev_desc,
2392 							events, FTL_MAX_MEDIA_EVENTS);
2393 
2394 		for (i = 0; i < num_events; ++i) {
2395 			ftl_process_media_event(dev, events[i]);
2396 		}
2397 
2398 	} while (num_events);
2399 }
2400 
2401 int
2402 ftl_io_channel_poll(void *arg)
2403 {
2404 	struct ftl_io_channel *ch = arg;
2405 	struct ftl_io *io;
2406 	TAILQ_HEAD(, ftl_io) retry_queue;
2407 
2408 	if (TAILQ_EMPTY(&ch->write_cmpl_queue) && TAILQ_EMPTY(&ch->retry_queue)) {
2409 		return SPDK_POLLER_IDLE;
2410 	}
2411 
2412 	while (!TAILQ_EMPTY(&ch->write_cmpl_queue)) {
2413 		io = TAILQ_FIRST(&ch->write_cmpl_queue);
2414 		TAILQ_REMOVE(&ch->write_cmpl_queue, io, ioch_entry);
2415 		ftl_io_complete(io);
2416 	}
2417 
2418 	/*
2419 	 * Process a local copy of the retry queue to prevent infinite retries in case
2420 	 * an IO is inserted back into the retry queue while being processed.
2421 	 */
2422 	TAILQ_INIT(&retry_queue);
2423 	TAILQ_SWAP(&ch->retry_queue, &retry_queue, ftl_io, ioch_entry);
2424 
2425 	while (!TAILQ_EMPTY(&retry_queue)) {
2426 		io = TAILQ_FIRST(&retry_queue);
2427 		TAILQ_REMOVE(&retry_queue, io, ioch_entry);
2428 		if (io->type == FTL_IO_WRITE) {
2429 			ftl_io_write(io);
2430 		} else {
2431 			ftl_io_read(io);
2432 		}
2433 	}
2434 
2435 	return SPDK_POLLER_BUSY;
2436 }
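/*
 * Editor's note: this function is meant to run as an SPDK poller on the I/O
 * channel's thread.  A registration along these lines (sketch; the actual
 * registration lives in the I/O channel creation path) keeps the completion
 * and retry queues drained:
 *
 *	ioch->poller = SPDK_POLLER_REGISTER(ftl_io_channel_poll, ioch, 0);
 */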
2437 
2438 int
2439 ftl_task_core(void *ctx)
2440 {
2441 	struct spdk_ftl_dev *dev = ctx;
2442 	bool busy;
2443 
2444 	if (dev->halt) {
2445 		if (ftl_shutdown_complete(dev)) {
2446 			spdk_poller_unregister(&dev->core_poller);
2447 			return SPDK_POLLER_IDLE;
2448 		}
2449 	}
2450 
2451 	busy = ftl_process_writes(dev) || ftl_process_relocs(dev);
2452 
2453 	return busy ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
2454 }
2455 
2456 SPDK_LOG_REGISTER_COMPONENT(ftl_core)
2457