xref: /spdk/lib/ftl/ftl_core.c (revision 488570ebd418ba07c9e69e65106dcc964f3bb41b)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/likely.h"
7 #include "spdk/stdinc.h"
8 #include "spdk/nvme.h"
9 #include "spdk/thread.h"
10 #include "spdk/bdev_module.h"
11 #include "spdk/string.h"
12 #include "spdk/log.h"
13 #include "spdk/ftl.h"
14 #include "spdk/crc32.h"
15 
16 #include "ftl_core.h"
17 #include "ftl_band.h"
18 #include "ftl_io.h"
19 #include "ftl_debug.h"
20 #include "ftl_reloc.h"
21 
22 struct ftl_band_flush {
23 	struct spdk_ftl_dev		*dev;
24 	/* Number of bands left to be flushed */
25 	size_t				num_bands;
26 	/* User callback */
27 	spdk_ftl_fn			cb_fn;
28 	/* Callback's argument */
29 	void				*cb_arg;
30 	/* List link */
31 	LIST_ENTRY(ftl_band_flush)	list_entry;
32 };
33 
34 struct ftl_wptr {
35 	/* Owner device */
36 	struct spdk_ftl_dev		*dev;
37 
38 	/* Current address */
39 	struct ftl_addr			addr;
40 
41 	/* Band currently being written to */
42 	struct ftl_band			*band;
43 
44 	/* Current logical block's offset */
45 	uint64_t			offset;
46 
47 	/* Current zone */
48 	struct ftl_zone			*zone;
49 
50 	/* Pending IO queue */
51 	TAILQ_HEAD(, ftl_io)		pending_queue;
52 
53 	/* List link */
54 	LIST_ENTRY(ftl_wptr)		list_entry;
55 
56 	/*
57 	 * If set up in direct mode, there will be no offset or band state update after IO.
58 	 * The zoned bdev address is not assigned by wptr, and is instead taken directly
59 	 * from the request.
60 	 */
61 	bool				direct_mode;
62 
63 	/* Number of outstanding write requests */
64 	uint32_t			num_outstanding;
65 
66 	/* Marks that the band related to this wptr needs to be closed as soon as possible */
67 	bool				flush;
68 };
69 
70 struct ftl_flush {
71 	/* Owner device */
72 	struct spdk_ftl_dev		*dev;
73 
74 	/* Number of batches to wait for */
75 	size_t				num_req;
76 
77 	/* Callback */
78 	struct {
79 		spdk_ftl_fn		fn;
80 		void			*ctx;
81 	} cb;
82 
83 	/* Batch bitmap */
84 	struct spdk_bit_array		*bmap;
85 
86 	/* List link */
87 	LIST_ENTRY(ftl_flush)		list_entry;
88 };
89 
90 static void
91 ftl_wptr_free(struct ftl_wptr *wptr)
92 {
93 	if (!wptr) {
94 		return;
95 	}
96 
97 	free(wptr);
98 }
99 
100 static void
101 ftl_remove_wptr(struct ftl_wptr *wptr)
102 {
103 	struct spdk_ftl_dev *dev = wptr->dev;
104 	struct ftl_band_flush *flush, *tmp;
105 
106 	if (spdk_unlikely(wptr->flush)) {
107 		LIST_FOREACH_SAFE(flush, &dev->band_flush_list, list_entry, tmp) {
108 			assert(flush->num_bands > 0);
109 			if (--flush->num_bands == 0) {
110 				flush->cb_fn(flush->cb_arg, 0);
111 				LIST_REMOVE(flush, list_entry);
112 				free(flush);
113 			}
114 		}
115 	}
116 
117 	LIST_REMOVE(wptr, list_entry);
118 	ftl_wptr_free(wptr);
119 }
120 
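/*
 * Acquire a write buffer entry from the IO channel's free queue.  User IO
 * (i.e. IO without FTL_IO_INTERNAL set) is first charged against the channel's
 * queue depth limit; if the limit has been reached or the free queue is empty,
 * the reservation is rolled back and NULL is returned so the caller can retry
 * later.  The entry is evicted from the cache and reset before being handed out.
 */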
121 static struct ftl_wbuf_entry *
122 ftl_acquire_wbuf_entry(struct ftl_io_channel *io_channel, int io_flags)
123 {
124 	struct ftl_wbuf_entry *entry = NULL;
125 	uint32_t qdepth;
126 
127 	if (!(io_flags & FTL_IO_INTERNAL)) {
128 		qdepth = __atomic_fetch_add(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
129 		if (qdepth >= io_channel->qdepth_limit) {
130 			__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
131 			return NULL;
132 		}
133 	}
134 
135 	if (spdk_ring_dequeue(io_channel->free_queue, (void **)&entry, 1) != 1) {
136 		if (!(io_flags & FTL_IO_INTERNAL)) {
137 			__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
138 		}
139 
140 		return NULL;
141 	}
142 
143 	assert(entry != NULL);
144 
145 	ftl_evict_cache_entry(io_channel->dev, entry);
146 
147 	entry->io_flags = io_flags;
148 	entry->addr.offset = FTL_ADDR_INVALID;
149 	entry->lba = FTL_LBA_INVALID;
150 	entry->band = NULL;
151 	entry->valid = false;
152 
153 	return entry;
154 }
155 
156 static void
157 ftl_release_wbuf_entry(struct ftl_wbuf_entry *entry)
158 {
159 	struct ftl_io_channel *io_channel = entry->ioch;
160 
161 	if (!(entry->io_flags & FTL_IO_INTERNAL)) {
162 		__atomic_fetch_sub(&io_channel->qdepth_current, 1, __ATOMIC_SEQ_CST);
163 	}
164 
165 	spdk_ring_enqueue(io_channel->free_queue, (void **)&entry, 1, NULL);
166 }
167 
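/*
 * Assemble the next batch of write buffer entries to be written to the base
 * bdev.  A batch that was requeued earlier (e.g. after a write failure) is
 * returned as-is; otherwise a free batch (or the partially filled
 * dev->current_batch) is topped up with entries dequeued from the per-channel
 * submit queues until it holds xfer_size blocks.  If not enough entries are
 * available yet, the partial batch is remembered in dev->current_batch and
 * NULL is returned.
 */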
168 static struct ftl_batch *
169 ftl_get_next_batch(struct spdk_ftl_dev *dev)
170 {
171 	struct ftl_batch *batch = dev->current_batch;
172 	struct ftl_io_channel *ioch;
173 #define FTL_DEQUEUE_ENTRIES 128
174 	struct ftl_wbuf_entry *entries[FTL_DEQUEUE_ENTRIES];
175 	TAILQ_HEAD(, ftl_io_channel) ioch_queue;
176 	size_t i, num_dequeued, num_remaining;
177 	uint64_t *metadata;
178 
179 	if (batch == NULL) {
180 		batch = TAILQ_FIRST(&dev->pending_batches);
181 		if (batch != NULL) {
182 			TAILQ_REMOVE(&dev->pending_batches, batch, tailq);
183 			return batch;
184 		}
185 
186 		batch = TAILQ_FIRST(&dev->free_batches);
187 		if (spdk_unlikely(batch == NULL)) {
188 			return NULL;
189 		}
190 
191 		assert(TAILQ_EMPTY(&batch->entries));
192 		assert(batch->num_entries == 0);
193 		TAILQ_REMOVE(&dev->free_batches, batch, tailq);
194 	}
195 
196 	/*
197 	 * Keep shifting the queue to ensure fairness in IO channel selection.  Each time
198 	 * ftl_get_next_batch() is called, we're starting to dequeue write buffer entries from a
199 	 * different IO channel.
200 	 */
201 	TAILQ_INIT(&ioch_queue);
202 	while (!TAILQ_EMPTY(&dev->ioch_queue)) {
203 		ioch = TAILQ_FIRST(&dev->ioch_queue);
204 		TAILQ_REMOVE(&dev->ioch_queue, ioch, tailq);
205 		TAILQ_INSERT_TAIL(&ioch_queue, ioch, tailq);
206 
207 		num_remaining = dev->xfer_size - batch->num_entries;
208 		while (num_remaining > 0) {
209 			num_dequeued = spdk_ring_dequeue(ioch->submit_queue, (void **)entries,
210 							 spdk_min(num_remaining,
211 									 FTL_DEQUEUE_ENTRIES));
212 			if (num_dequeued == 0) {
213 				break;
214 			}
215 
216 			for (i = 0; i < num_dequeued; ++i) {
217 				batch->iov[batch->num_entries + i].iov_base = entries[i]->payload;
218 				batch->iov[batch->num_entries + i].iov_len = FTL_BLOCK_SIZE;
219 
220 				if (batch->metadata != NULL) {
221 					metadata = (uint64_t *)((char *)batch->metadata +
222 								i * dev->md_size);
223 					*metadata = entries[i]->lba;
224 				}
225 
226 				TAILQ_INSERT_TAIL(&batch->entries, entries[i], tailq);
227 			}
228 
229 			batch->num_entries += num_dequeued;
230 			num_remaining -= num_dequeued;
231 		}
232 
233 		if (num_remaining == 0) {
234 			break;
235 		}
236 	}
237 
238 	TAILQ_CONCAT(&dev->ioch_queue, &ioch_queue, tailq);
239 
240 	if (batch->num_entries == dev->xfer_size) {
241 		dev->current_batch = NULL;
242 	} else {
243 		dev->current_batch = batch;
244 		batch = NULL;
245 	}
246 
247 	return batch;
248 }
249 
250 static void
251 ftl_release_batch(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
252 {
253 	struct ftl_wbuf_entry *entry;
254 
255 	while (!TAILQ_EMPTY(&batch->entries)) {
256 		entry = TAILQ_FIRST(&batch->entries);
257 		TAILQ_REMOVE(&batch->entries, entry, tailq);
258 		ftl_release_wbuf_entry(entry);
259 	}
260 
261 	batch->num_entries = 0;
262 	TAILQ_INSERT_TAIL(&dev->free_batches, batch, tailq);
263 }
264 
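/*
 * A cached ftl_addr encodes the owning IO channel in the low ioch_shift bits
 * of cache_offset and the index of the write buffer entry within that channel
 * in the remaining upper bits (ftl_get_addr_from_entry() below performs the
 * reverse mapping).  For example, with an illustrative ioch_shift of 4, entry
 * 10 of channel 2 is encoded as (10 << 4) | 2 == 0xa2.
 */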
265 static struct ftl_wbuf_entry *
266 ftl_get_entry_from_addr(struct spdk_ftl_dev *dev, struct ftl_addr addr)
267 {
268 	struct ftl_io_channel *ioch;
269 	uint64_t ioch_offset, entry_offset;
270 
271 	ioch_offset = addr.cache_offset & ((1 << dev->ioch_shift) - 1);
272 	entry_offset = addr.cache_offset >> dev->ioch_shift;
273 	ioch = dev->ioch_array[ioch_offset];
274 
275 	assert(ioch_offset < dev->conf.max_io_channels);
276 	assert(entry_offset < ioch->num_entries);
277 	assert(addr.cached == 1);
278 
279 	return &ioch->wbuf_entries[entry_offset];
280 }
281 
282 static struct ftl_addr
283 ftl_get_addr_from_entry(struct ftl_wbuf_entry *entry)
284 {
285 	struct ftl_io_channel *ioch = entry->ioch;
286 	struct ftl_addr addr = {};
287 
288 	addr.cached = 1;
289 	addr.cache_offset = (uint64_t)entry->index << ioch->dev->ioch_shift | ioch->index;
290 
291 	return addr;
292 }
293 
294 static void
295 ftl_io_cmpl_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
296 {
297 	struct ftl_io *io = cb_arg;
298 	struct spdk_ftl_dev *dev = io->dev;
299 
300 	if (spdk_unlikely(!success)) {
301 		io->status = -EIO;
302 	}
303 
304 	ftl_trace_completion(dev, io, FTL_TRACE_COMPLETION_DISK);
305 
306 	if (io->type == FTL_IO_WRITE && ftl_is_append_supported(dev)) {
307 		assert(io->parent);
308 		io->parent->addr.offset = spdk_bdev_io_get_append_location(bdev_io);
309 	}
310 
311 	ftl_io_dec_req(io);
312 	if (ftl_io_done(io)) {
313 		ftl_io_complete(io);
314 	}
315 
316 	spdk_bdev_free_io(bdev_io);
317 }
318 
319 static void
320 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
321 {
322 	struct ftl_wptr *wptr = NULL;
323 
324 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
325 		if (wptr->band == band) {
326 			break;
327 		}
328 	}
329 
330 	/* If the band already has the high_prio flag set, other writes must */
331 	/* have failed earlier, so it's already taken care of. */
332 	if (band->high_prio) {
333 		assert(wptr == NULL);
334 		return;
335 	}
336 
337 	ftl_band_write_failed(band);
338 	ftl_remove_wptr(wptr);
339 }
340 
341 static struct ftl_wptr *
342 ftl_wptr_from_band(struct ftl_band *band)
343 {
344 	struct spdk_ftl_dev *dev = band->dev;
345 	struct ftl_wptr *wptr = NULL;
346 
347 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
348 		if (wptr->band == band) {
349 			return wptr;
350 		}
351 	}
352 	assert(false);
353 	return NULL;
354 }
355 
356 static void
357 ftl_md_write_fail(struct ftl_io *io, int status)
358 {
359 	struct ftl_band *band = io->band;
360 	struct ftl_wptr *wptr;
361 	char buf[128];
362 
363 	wptr = ftl_wptr_from_band(band);
364 	SPDK_ERRLOG("Metadata write failed @addr: %s, status: %d\n",
365 		    ftl_addr2str(wptr->addr, buf, sizeof(buf)), status);
366 
367 	ftl_halt_writes(io->dev, band);
368 }
369 
370 static void
371 ftl_md_write_cb(struct ftl_io *io, void *arg, int status)
372 {
373 	struct spdk_ftl_dev *dev = io->dev;
374 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
375 	struct ftl_band *band = io->band;
376 	struct ftl_wptr *wptr;
377 	size_t id;
378 
379 	wptr = ftl_wptr_from_band(band);
380 
381 	if (status) {
382 		ftl_md_write_fail(io, status);
383 		return;
384 	}
385 
386 	ftl_band_set_next_state(band);
387 	if (band->state == FTL_BAND_STATE_CLOSED) {
388 		if (ftl_dev_has_nv_cache(dev)) {
389 			pthread_spin_lock(&nv_cache->lock);
390 			nv_cache->num_available += ftl_band_user_blocks(band);
391 
392 			if (spdk_unlikely(nv_cache->num_available > nv_cache->num_data_blocks)) {
393 				nv_cache->num_available = nv_cache->num_data_blocks;
394 			}
395 			pthread_spin_unlock(&nv_cache->lock);
396 		}
397 
398 		/*
399 		 * Go through the reloc_bitmap, checking for all the bands that had their data moved
400 		 * onto the current band, and update their counters to allow them to be used for writing
401 		 * (once they're closed and empty).
402 		 */
403 		for (id = 0; id < ftl_get_num_bands(dev); ++id) {
404 			if (spdk_bit_array_get(band->reloc_bitmap, id)) {
405 				assert(dev->bands[id].num_reloc_bands > 0);
406 				dev->bands[id].num_reloc_bands--;
407 
408 				spdk_bit_array_clear(band->reloc_bitmap, id);
409 			}
410 		}
411 
412 		ftl_remove_wptr(wptr);
413 	}
414 }
415 
416 static int
417 ftl_read_next_physical_addr(struct ftl_io *io, struct ftl_addr *addr)
418 {
419 	struct spdk_ftl_dev *dev = io->dev;
420 	size_t num_blocks, max_blocks;
421 
422 	assert(ftl_io_mode_physical(io));
423 	assert(io->iov_pos < io->iov_cnt);
424 
425 	if (io->pos == 0) {
426 		*addr = io->addr;
427 	} else {
428 		*addr = ftl_band_next_xfer_addr(io->band, io->addr, io->pos);
429 	}
430 
431 	assert(!ftl_addr_invalid(*addr));
432 
433 	/* Metadata has to be read in the way it's written (jumping across */
434 	/* the zones in xfer_size increments) */
435 	if (io->flags & FTL_IO_MD) {
436 		max_blocks = dev->xfer_size - (addr->offset % dev->xfer_size);
437 		num_blocks = spdk_min(ftl_io_iovec_len_left(io), max_blocks);
438 		assert(addr->offset / dev->xfer_size ==
439 		       (addr->offset + num_blocks - 1) / dev->xfer_size);
440 	} else {
441 		num_blocks = ftl_io_iovec_len_left(io);
442 	}
443 
444 	return num_blocks;
445 }
446 
447 static int
448 ftl_wptr_close_band(struct ftl_wptr *wptr)
449 {
450 	struct ftl_band *band = wptr->band;
451 
452 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
453 
454 	return ftl_band_write_tail_md(band, ftl_md_write_cb);
455 }
456 
457 static int
458 ftl_wptr_open_band(struct ftl_wptr *wptr)
459 {
460 	struct ftl_band *band = wptr->band;
461 
462 	assert(ftl_band_zone_is_first(band, wptr->zone));
463 	assert(band->lba_map.num_vld == 0);
464 
465 	ftl_band_clear_lba_map(band);
466 
467 	assert(band->state == FTL_BAND_STATE_PREP);
468 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
469 
470 	return ftl_band_write_head_md(band, ftl_md_write_cb);
471 }
472 
473 static int
474 ftl_submit_erase(struct ftl_io *io)
475 {
476 	struct spdk_ftl_dev *dev = io->dev;
477 	struct ftl_band *band = io->band;
478 	struct ftl_addr addr = io->addr;
479 	struct ftl_io_channel *ioch;
480 	struct ftl_zone *zone;
481 	int rc = 0;
482 	size_t i;
483 
484 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
485 
486 	for (i = 0; i < io->num_blocks; ++i) {
487 		if (i != 0) {
488 			zone = ftl_band_next_zone(band, ftl_band_zone_from_addr(band, addr));
489 			assert(zone->info.state == SPDK_BDEV_ZONE_STATE_FULL);
490 			addr.offset = zone->info.zone_id;
491 		}
492 
493 		assert(ftl_addr_get_zone_offset(dev, addr) == 0);
494 
495 		ftl_trace_submission(dev, io, addr, 1);
496 		rc = spdk_bdev_zone_management(dev->base_bdev_desc, ioch->base_ioch, addr.offset,
497 					       SPDK_BDEV_ZONE_RESET, ftl_io_cmpl_cb, io);
498 		if (spdk_unlikely(rc)) {
499 			ftl_io_fail(io, rc);
500 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
501 			break;
502 		}
503 
504 		ftl_io_inc_req(io);
505 		ftl_io_advance(io, 1);
506 	}
507 
508 	if (ftl_io_done(io)) {
509 		ftl_io_complete(io);
510 	}
511 
512 	return rc;
513 }
514 
515 static bool
516 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
517 {
518 	return dev->core_thread == spdk_get_thread();
519 }
520 
521 struct spdk_io_channel *
522 ftl_get_io_channel(const struct spdk_ftl_dev *dev)
523 {
524 	if (ftl_check_core_thread(dev)) {
525 		return dev->ioch;
526 	}
527 
528 	return NULL;
529 }
530 
531 static void
532 ftl_erase_fail(struct ftl_io *io, int status)
533 {
534 	struct ftl_zone *zone;
535 	struct ftl_band *band = io->band;
536 	char buf[128];
537 
538 	SPDK_ERRLOG("Erase failed at address: %s, status: %d\n",
539 		    ftl_addr2str(io->addr, buf, sizeof(buf)), status);
540 
541 	zone = ftl_band_zone_from_addr(band, io->addr);
542 	zone->info.state = SPDK_BDEV_ZONE_STATE_OFFLINE;
543 	ftl_band_remove_zone(band, zone);
544 	band->tail_md_addr = ftl_band_tail_md_addr(band);
545 }
546 
547 static void
548 ftl_zone_erase_cb(struct ftl_io *io, void *ctx, int status)
549 {
550 	struct ftl_zone *zone;
551 
552 	zone = ftl_band_zone_from_addr(io->band, io->addr);
553 	zone->busy = false;
554 
555 	if (spdk_unlikely(status)) {
556 		ftl_erase_fail(io, status);
557 		return;
558 	}
559 
560 	zone->info.state = SPDK_BDEV_ZONE_STATE_EMPTY;
561 	zone->info.write_pointer = zone->info.zone_id;
562 }
563 
564 static int
565 ftl_band_erase(struct ftl_band *band)
566 {
567 	struct ftl_zone *zone;
568 	struct ftl_io *io;
569 	int rc = 0;
570 
571 	assert(band->state == FTL_BAND_STATE_CLOSED ||
572 	       band->state == FTL_BAND_STATE_FREE);
573 
574 	ftl_band_set_state(band, FTL_BAND_STATE_PREP);
575 
576 	CIRCLEQ_FOREACH(zone, &band->zones, circleq) {
577 		if (zone->info.state == SPDK_BDEV_ZONE_STATE_EMPTY) {
578 			continue;
579 		}
580 
581 		io = ftl_io_erase_init(band, 1, ftl_zone_erase_cb);
582 		if (!io) {
583 			rc = -ENOMEM;
584 			break;
585 		}
586 
587 		zone->busy = true;
588 		io->addr.offset = zone->info.zone_id;
589 		rc = ftl_submit_erase(io);
590 		if (rc) {
591 			zone->busy = false;
592 			assert(0);
593 			/* TODO: change the band's state back to closed? */
594 			break;
595 		}
596 	}
597 
598 	return rc;
599 }
600 
601 static struct ftl_band *
602 ftl_next_write_band(struct spdk_ftl_dev *dev)
603 {
604 	struct ftl_band *band;
605 
606 	/* Find a free band that has all of its data moved onto other closed bands */
607 	LIST_FOREACH(band, &dev->free_bands, list_entry) {
608 		assert(band->state == FTL_BAND_STATE_FREE);
609 		if (band->num_reloc_bands == 0 && band->num_reloc_blocks == 0) {
610 			break;
611 		}
612 	}
613 
614 	if (spdk_unlikely(!band)) {
615 		return NULL;
616 	}
617 
618 	if (ftl_band_erase(band)) {
619 		/* TODO: handle erase failure */
620 		return NULL;
621 	}
622 
623 	return band;
624 }
625 
626 static struct ftl_band *
627 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
628 {
629 	struct ftl_band *band;
630 
631 	if (!dev->next_band) {
632 		band = ftl_next_write_band(dev);
633 	} else {
634 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
635 		band = dev->next_band;
636 		dev->next_band = NULL;
637 	}
638 
639 	return band;
640 }
641 
642 static struct ftl_wptr *
643 ftl_wptr_init(struct ftl_band *band)
644 {
645 	struct spdk_ftl_dev *dev = band->dev;
646 	struct ftl_wptr *wptr;
647 
648 	wptr = calloc(1, sizeof(*wptr));
649 	if (!wptr) {
650 		return NULL;
651 	}
652 
653 	wptr->dev = dev;
654 	wptr->band = band;
655 	wptr->zone = CIRCLEQ_FIRST(&band->zones);
656 	wptr->addr.offset = wptr->zone->info.zone_id;
657 	TAILQ_INIT(&wptr->pending_queue);
658 
659 	return wptr;
660 }
661 
662 static int
663 ftl_add_direct_wptr(struct ftl_band *band)
664 {
665 	struct spdk_ftl_dev *dev = band->dev;
666 	struct ftl_wptr *wptr;
667 
668 	assert(band->state == FTL_BAND_STATE_OPEN);
669 
670 	wptr = ftl_wptr_init(band);
671 	if (!wptr) {
672 		return -1;
673 	}
674 
675 	wptr->direct_mode = true;
676 
677 	if (ftl_band_alloc_lba_map(band)) {
678 		ftl_wptr_free(wptr);
679 		return -1;
680 	}
681 
682 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
683 
684 	SPDK_DEBUGLOG(ftl_core, "wptr: direct band %u\n", band->id);
685 	ftl_trace_write_band(dev, band);
686 	return 0;
687 }
688 
689 static void
690 ftl_close_direct_wptr(struct ftl_band *band)
691 {
692 	struct ftl_wptr *wptr = ftl_wptr_from_band(band);
693 
694 	assert(wptr->direct_mode);
695 	assert(band->state == FTL_BAND_STATE_CLOSED);
696 
697 	ftl_band_release_lba_map(band);
698 
699 	ftl_remove_wptr(wptr);
700 }
701 
702 int
703 ftl_band_set_direct_access(struct ftl_band *band, bool access)
704 {
705 	if (access) {
706 		return ftl_add_direct_wptr(band);
707 	} else {
708 		ftl_close_direct_wptr(band);
709 		return 0;
710 	}
711 }
712 
713 static int
714 ftl_add_wptr(struct spdk_ftl_dev *dev)
715 {
716 	struct ftl_band *band;
717 	struct ftl_wptr *wptr;
718 
719 	band = ftl_next_wptr_band(dev);
720 	if (!band) {
721 		return -1;
722 	}
723 
724 	wptr = ftl_wptr_init(band);
725 	if (!wptr) {
726 		return -1;
727 	}
728 
729 	if (ftl_band_write_prep(band)) {
730 		ftl_wptr_free(wptr);
731 		return -1;
732 	}
733 
734 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
735 
736 	SPDK_DEBUGLOG(ftl_core, "wptr: band %u\n", band->id);
737 	ftl_trace_write_band(dev, band);
738 	return 0;
739 }
740 
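/*
 * Move the write pointer forward by xfer_size blocks after a submission.  The
 * band is marked FULL once all of its usable blocks have been written, the
 * current zone is marked busy and the pointer jumps to the next operational
 * zone.  Once the band crosses the configured band_thld percentage, the next
 * write band is prepared ahead of time.  Direct-mode write pointers are left
 * untouched.
 */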
741 static void
742 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
743 {
744 	struct ftl_band *band = wptr->band;
745 	struct spdk_ftl_dev *dev = wptr->dev;
746 	struct spdk_ftl_conf *conf = &dev->conf;
747 	size_t next_thld;
748 
749 	if (spdk_unlikely(wptr->direct_mode)) {
750 		return;
751 	}
752 
753 	wptr->offset += xfer_size;
754 	next_thld = (ftl_band_num_usable_blocks(band) * conf->band_thld) / 100;
755 
756 	if (ftl_band_full(band, wptr->offset)) {
757 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
758 	}
759 
760 	wptr->zone->busy = true;
761 	wptr->addr = ftl_band_next_xfer_addr(band, wptr->addr, xfer_size);
762 	wptr->zone = ftl_band_next_operational_zone(band, wptr->zone);
763 
764 	assert(!ftl_addr_invalid(wptr->addr));
765 
766 	SPDK_DEBUGLOG(ftl_core, "wptr: pu:%lu band:%lu, offset:%lu\n",
767 		      ftl_addr_get_punit(dev, wptr->addr),
768 		      ftl_addr_get_band(dev, wptr->addr),
769 		      wptr->addr.offset);
770 
771 	if (wptr->offset >= next_thld && !dev->next_band) {
772 		dev->next_band = ftl_next_write_band(dev);
773 	}
774 }
775 
776 static size_t
777 ftl_wptr_user_blocks_left(const struct ftl_wptr *wptr)
778 {
779 	return ftl_band_user_blocks_left(wptr->band, wptr->offset);
780 }
781 
782 static bool
783 ftl_wptr_ready(struct ftl_wptr *wptr)
784 {
785 	struct ftl_band *band = wptr->band;
786 
787 	/* TODO: add handling of empty bands */
788 
789 	if (spdk_unlikely(!ftl_zone_is_writable(wptr->dev, wptr->zone))) {
790 		/* Erasing a band may fail after it has been assigned to a wptr. */
791 		if (spdk_unlikely(wptr->zone->info.state == SPDK_BDEV_ZONE_STATE_OFFLINE)) {
792 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
793 		}
794 		return false;
795 	}
796 
797 	/* If we're in the process of writing metadata, wait till it is */
798 	/* completed. */
799 	/* TODO: we should probably change bands once we're writing tail md */
800 	if (ftl_band_state_changing(band)) {
801 		return false;
802 	}
803 
804 	if (band->state == FTL_BAND_STATE_FULL) {
805 		if (wptr->num_outstanding == 0) {
806 			if (ftl_wptr_close_band(wptr)) {
807 				/* TODO: need recovery here */
808 				assert(false);
809 			}
810 		}
811 
812 		return false;
813 	}
814 
815 	if (band->state != FTL_BAND_STATE_OPEN) {
816 		if (ftl_wptr_open_band(wptr)) {
817 			/* TODO: need recovery here */
818 			assert(false);
819 		}
820 
821 		return false;
822 	}
823 
824 	return true;
825 }
826 
827 int
828 ftl_flush_active_bands(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
829 {
830 	struct ftl_wptr *wptr;
831 	struct ftl_band_flush *flush;
832 
833 	assert(ftl_get_core_thread(dev) == spdk_get_thread());
834 
835 	flush = calloc(1, sizeof(*flush));
836 	if (spdk_unlikely(!flush)) {
837 		return -ENOMEM;
838 	}
839 
840 	LIST_INSERT_HEAD(&dev->band_flush_list, flush, list_entry);
841 
842 	flush->cb_fn = cb_fn;
843 	flush->cb_arg = cb_arg;
844 	flush->dev = dev;
845 
846 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
847 		wptr->flush = true;
848 		flush->num_bands++;
849 	}
850 
851 	return 0;
852 }
853 
854 static const struct spdk_ftl_limit *
855 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
856 {
857 	assert(type < SPDK_FTL_LIMIT_MAX);
858 	return &dev->conf.limits[type];
859 }
860 
861 static bool
862 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry)
863 {
864 	struct ftl_addr addr;
865 
866 	/* If the LBA is invalid don't bother checking the md and l2p */
867 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
868 		return false;
869 	}
870 
871 	addr = ftl_l2p_get(dev, entry->lba);
872 	if (!(ftl_addr_cached(addr) && entry == ftl_get_entry_from_addr(dev, addr))) {
873 		return false;
874 	}
875 
876 	return true;
877 }
878 
879 void
880 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_wbuf_entry *entry)
881 {
882 	pthread_spin_lock(&entry->lock);
883 
884 	if (!entry->valid) {
885 		goto unlock;
886 	}
887 
888 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
889 	/* on-disk address and clear the cache status bit. Otherwise, skip the l2p update */
890 	/* and just clear the cache status. */
891 	if (!ftl_cache_lba_valid(dev, entry)) {
892 		goto clear;
893 	}
894 
895 	ftl_l2p_set(dev, entry->lba, entry->addr);
896 clear:
897 	entry->valid = false;
898 unlock:
899 	pthread_spin_unlock(&entry->lock);
900 }
901 
902 static void
903 ftl_pad_wbuf(struct spdk_ftl_dev *dev, size_t size)
904 {
905 	struct ftl_wbuf_entry *entry;
906 	struct ftl_io_channel *ioch;
907 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
908 
909 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
910 
911 	for (size_t i = 0; i < size; ++i) {
912 		entry = ftl_acquire_wbuf_entry(ioch, flags);
913 		if (!entry) {
914 			break;
915 		}
916 
917 		entry->lba = FTL_LBA_INVALID;
918 		entry->addr = ftl_to_addr(FTL_ADDR_INVALID);
919 		memset(entry->payload, 0, FTL_BLOCK_SIZE);
920 
921 		spdk_ring_enqueue(ioch->submit_queue, (void **)&entry, 1, NULL);
922 	}
923 }
924 
925 static void
926 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
927 {
928 	while (!LIST_EMPTY(&dev->free_bands)) {
929 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
930 	}
931 
932 	dev->next_band = NULL;
933 }
934 
935 static void
936 ftl_wptr_pad_band(struct ftl_wptr *wptr)
937 {
938 	struct spdk_ftl_dev *dev = wptr->dev;
939 	struct ftl_batch *batch = dev->current_batch;
940 	struct ftl_io_channel *ioch;
941 	struct ftl_io *io;
942 	size_t size, pad_size, blocks_left;
943 
944 	size = batch != NULL ? batch->num_entries : 0;
945 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
946 		size += spdk_ring_count(ioch->submit_queue);
947 
948 		TAILQ_FOREACH(io, &ioch->retry_queue, ioch_entry) {
949 			if (io->type == FTL_IO_WRITE) {
950 				size += io->num_blocks - io->pos;
951 			}
952 		}
953 	}
954 
955 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
956 
957 	blocks_left = ftl_wptr_user_blocks_left(wptr);
958 	assert(size <= blocks_left);
959 	assert(blocks_left % dev->xfer_size == 0);
960 	pad_size = spdk_min(blocks_left - size, spdk_ring_count(ioch->free_queue));
961 
962 	ftl_pad_wbuf(dev, pad_size);
963 }
964 
965 static void
966 ftl_wptr_process_shutdown(struct ftl_wptr *wptr)
967 {
968 	struct spdk_ftl_dev *dev = wptr->dev;
969 	struct ftl_batch *batch = dev->current_batch;
970 	struct ftl_io_channel *ioch;
971 	struct ftl_io *io;
972 	size_t size;
973 
974 	size = batch != NULL ? batch->num_entries : 0;
975 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
976 		size += spdk_ring_count(ioch->submit_queue);
977 
978 		TAILQ_FOREACH(io, &ioch->retry_queue, ioch_entry) {
979 			if (io->type == FTL_IO_WRITE) {
980 				size += io->num_blocks - io->pos;
981 			}
982 		}
983 	}
984 
985 	if (size >= dev->xfer_size) {
986 		return;
987 	}
988 
989 	/* If we reach this point, we need to remove the free bands */
990 	/* and pad the current wptr's band to the end */
991 	ftl_remove_free_bands(dev);
992 	ftl_wptr_pad_band(wptr);
993 }
994 
995 static int
996 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
997 {
998 	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(dev->ioch);
999 
1000 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
1001 	       dev->num_io_channels == 1 && LIST_EMPTY(&dev->wptr_list) &&
1002 	       TAILQ_EMPTY(&ioch->retry_queue);
1003 }
1004 
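/*
 * Walk the defrag limits from most to least critical and apply the first one
 * whose free band threshold has been crossed, throttling user writes by
 * scaling each IO channel's queue depth down to limit->limit percent of its
 * write buffer entries.  For example (with purely illustrative numbers), a 60%
 * limit on a channel with 128 entries allows at most 76 user writes in flight.
 */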
1005 void
1006 ftl_apply_limits(struct spdk_ftl_dev *dev)
1007 {
1008 	const struct spdk_ftl_limit *limit;
1009 	struct ftl_io_channel *ioch;
1010 	struct ftl_stats *stats = &dev->stats;
1011 	uint32_t qdepth_limit = 100;
1012 	int i;
1013 
1014 	/* Clear existing limit */
1015 	dev->limit = SPDK_FTL_LIMIT_MAX;
1016 
1017 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
1018 		limit = ftl_get_limit(dev, i);
1019 
1020 		if (dev->num_free <= limit->thld) {
1021 			qdepth_limit = limit->limit;
1022 			stats->limits[i]++;
1023 			dev->limit = i;
1024 			break;
1025 		}
1026 	}
1027 
1028 	ftl_trace_limits(dev, dev->limit, dev->num_free);
1029 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1030 		__atomic_store_n(&ioch->qdepth_limit, (qdepth_limit * ioch->num_entries) / 100,
1031 				 __ATOMIC_SEQ_CST);
1032 	}
1033 }
1034 
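/*
 * Clear the valid bit of the block backing the given physical address in its
 * band's LBA map.  Returns 1 if the bit was still set (i.e. this call did the
 * invalidation), 0 if the block had already been invalidated.  The caller must
 * hold the band's lba_map lock.
 */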
1035 static int
1036 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_addr addr)
1037 {
1038 	struct ftl_band *band = ftl_band_from_addr(dev, addr);
1039 	struct ftl_lba_map *lba_map = &band->lba_map;
1040 	uint64_t offset;
1041 
1042 	offset = ftl_band_block_offset_from_addr(band, addr);
1043 
1044 	/* The bit might already be cleared if two writes to the same LBA */
1045 	/* are scheduled at the same time */
1046 	if (spdk_bit_array_get(lba_map->vld, offset)) {
1047 		assert(lba_map->num_vld > 0);
1048 		spdk_bit_array_clear(lba_map->vld, offset);
1049 		lba_map->num_vld--;
1050 		return 1;
1051 	}
1052 
1053 	return 0;
1054 }
1055 
1056 int
1057 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_addr addr)
1058 {
1059 	struct ftl_band *band;
1060 	int rc;
1061 
1062 	assert(!ftl_addr_cached(addr));
1063 	band = ftl_band_from_addr(dev, addr);
1064 
1065 	pthread_spin_lock(&band->lba_map.lock);
1066 	rc = ftl_invalidate_addr_unlocked(dev, addr);
1067 	pthread_spin_unlock(&band->lba_map.lock);
1068 
1069 	return rc;
1070 }
1071 
1072 static int
1073 ftl_read_retry(int rc)
1074 {
1075 	return rc == -EAGAIN;
1076 }
1077 
1078 static int
1079 ftl_read_canceled(int rc)
1080 {
1081 	return rc == -EFAULT || rc == 0;
1082 }
1083 
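/*
 * Serve a read directly from a write buffer entry.  The entry is looked up
 * from the cached address and its payload is copied under the entry lock, but
 * only after re-checking that the L2P still maps the LBA to this entry;
 * otherwise the caller needs to re-resolve the address and retry.
 */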
1084 static int
1085 ftl_cache_read(struct ftl_io *io, uint64_t lba,
1086 	       struct ftl_addr addr, void *buf)
1087 {
1088 	struct ftl_wbuf_entry *entry;
1089 	struct ftl_addr naddr;
1090 	int rc = 0;
1091 
1092 	entry = ftl_get_entry_from_addr(io->dev, addr);
1093 	pthread_spin_lock(&entry->lock);
1094 
1095 	naddr = ftl_l2p_get(io->dev, lba);
1096 	if (addr.offset != naddr.offset) {
1097 		rc = -1;
1098 		goto out;
1099 	}
1100 
1101 	memcpy(buf, entry->payload, FTL_BLOCK_SIZE);
1102 out:
1103 	pthread_spin_unlock(&entry->lock);
1104 	return rc;
1105 }
1106 
1107 static int
1108 ftl_read_next_logical_addr(struct ftl_io *io, struct ftl_addr *addr)
1109 {
1110 	struct spdk_ftl_dev *dev = io->dev;
1111 	struct ftl_addr next_addr;
1112 	size_t i;
1113 
1114 	*addr = ftl_l2p_get(dev, ftl_io_current_lba(io));
1115 
1116 	SPDK_DEBUGLOG(ftl_core, "Read addr:%lx, lba:%lu\n",
1117 		      addr->offset, ftl_io_current_lba(io));
1118 
1119 	/* If the address is invalid, skip it (the buffer should already be zeroed) */
1120 	if (ftl_addr_invalid(*addr)) {
1121 		return -EFAULT;
1122 	}
1123 
1124 	if (ftl_addr_cached(*addr)) {
1125 		if (!ftl_cache_read(io, ftl_io_current_lba(io), *addr, ftl_io_iovec_addr(io))) {
1126 			return 0;
1127 		}
1128 
1129 		/* If the state changed, we have to re-read the l2p */
1130 		return -EAGAIN;
1131 	}
1132 
1133 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
1134 		next_addr = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));
1135 
1136 		if (ftl_addr_invalid(next_addr) || ftl_addr_cached(next_addr)) {
1137 			break;
1138 		}
1139 
1140 		if (addr->offset + i != next_addr.offset) {
1141 			break;
1142 		}
1143 	}
1144 
1145 	return i;
1146 }
1147 
1148 static int
1149 ftl_submit_read(struct ftl_io *io)
1150 {
1151 	struct spdk_ftl_dev *dev = io->dev;
1152 	struct ftl_io_channel *ioch;
1153 	struct ftl_addr addr;
1154 	int rc = 0, num_blocks;
1155 
1156 	ioch = ftl_io_channel_get_ctx(io->ioch);
1157 
1158 	assert(LIST_EMPTY(&io->children));
1159 
1160 	while (io->pos < io->num_blocks) {
1161 		if (ftl_io_mode_physical(io)) {
1162 			num_blocks = rc = ftl_read_next_physical_addr(io, &addr);
1163 		} else {
1164 			num_blocks = rc = ftl_read_next_logical_addr(io, &addr);
1165 		}
1166 
1167 		/* We might need to retry the read from scratch (e.g. */
1168 		/* because a write was under way and completed before */
1169 		/* we could read it from the write buffer) */
1170 		if (ftl_read_retry(rc)) {
1171 			continue;
1172 		}
1173 
1174 		/* We don't have to schedule the read, as it was read from cache */
1175 		if (ftl_read_canceled(rc)) {
1176 			ftl_io_advance(io, 1);
1177 			ftl_trace_completion(io->dev, io, rc ? FTL_TRACE_COMPLETION_INVALID :
1178 					     FTL_TRACE_COMPLETION_CACHE);
1179 			rc = 0;
1180 			continue;
1181 		}
1182 
1183 		assert(num_blocks > 0);
1184 
1185 		ftl_trace_submission(dev, io, addr, num_blocks);
1186 		rc = spdk_bdev_read_blocks(dev->base_bdev_desc, ioch->base_ioch,
1187 					   ftl_io_iovec_addr(io),
1188 					   addr.offset,
1189 					   num_blocks, ftl_io_cmpl_cb, io);
1190 		if (spdk_unlikely(rc)) {
1191 			if (rc == -ENOMEM) {
1192 				TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
1193 				rc = 0;
1194 			} else {
1195 				ftl_io_fail(io, rc);
1196 			}
1197 			break;
1198 		}
1199 
1200 		ftl_io_inc_req(io);
1201 		ftl_io_advance(io, num_blocks);
1202 	}
1203 
1204 	/* If we didn't have to read anything from the device, */
1205 	/* complete the request right away */
1206 	if (ftl_io_done(io)) {
1207 		ftl_io_complete(io);
1208 	}
1209 
1210 	return rc;
1211 }
1212 
1213 static void
1214 ftl_complete_flush(struct ftl_flush *flush)
1215 {
1216 	assert(flush->num_req == 0);
1217 	LIST_REMOVE(flush, list_entry);
1218 
1219 	flush->cb.fn(flush->cb.ctx, 0);
1220 
1221 	spdk_bit_array_free(&flush->bmap);
1222 	free(flush);
1223 }
1224 
1225 static void
1226 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_batch *batch)
1227 {
1228 	struct ftl_flush *flush, *tflush;
1229 	size_t offset;
1230 
1231 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
1232 		offset = batch->index;
1233 
1234 		if (spdk_bit_array_get(flush->bmap, offset)) {
1235 			spdk_bit_array_clear(flush->bmap, offset);
1236 			if (!(--flush->num_req)) {
1237 				ftl_complete_flush(flush);
1238 			}
1239 		}
1240 	}
1241 }
1242 
1243 static void
1244 ftl_nv_cache_wrap_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1245 {
1246 	struct ftl_nv_cache *nv_cache = cb_arg;
1247 
1248 	if (!success) {
1249 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header\n");
1250 		/* TODO: go into read-only mode */
1251 		assert(0);
1252 	}
1253 
1254 	pthread_spin_lock(&nv_cache->lock);
1255 	nv_cache->ready = true;
1256 	pthread_spin_unlock(&nv_cache->lock);
1257 
1258 	spdk_bdev_free_io(bdev_io);
1259 }
1260 
1261 static void
1262 ftl_nv_cache_wrap(void *ctx)
1263 {
1264 	struct ftl_nv_cache *nv_cache = ctx;
1265 	int rc;
1266 
1267 	rc = ftl_nv_cache_write_header(nv_cache, false, ftl_nv_cache_wrap_cb, nv_cache);
1268 	if (spdk_unlikely(rc != 0)) {
1269 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header: %s\n",
1270 			    spdk_strerror(-rc));
1271 		/* TODO: go into read-only mode */
1272 		assert(0);
1273 	}
1274 }
1275 
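/*
 * Reserve up to *num_blocks blocks of the non-volatile cache, clamped by the
 * number of currently available blocks and by conf.nv_cache.max_request_cnt.
 * Returns the starting block address of the reservation (updating *num_blocks
 * and *phase), or FTL_LBA_INVALID if the cache is exhausted or not ready.
 * When the current address reaches the end of the cache bdev, the cache wraps
 * around: the phase is advanced and the header is rewritten asynchronously,
 * during which the cache stays unavailable.
 */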
1276 static uint64_t
1277 ftl_reserve_nv_cache(struct ftl_nv_cache *nv_cache, size_t *num_blocks, unsigned int *phase)
1278 {
1279 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1280 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1281 	uint64_t num_available, cache_size, cache_addr = FTL_LBA_INVALID;
1282 
1283 	cache_size = spdk_bdev_get_num_blocks(bdev);
1284 
1285 	pthread_spin_lock(&nv_cache->lock);
1286 	if (spdk_unlikely(nv_cache->num_available == 0 || !nv_cache->ready)) {
1287 		goto out;
1288 	}
1289 
1290 	num_available = spdk_min(nv_cache->num_available, *num_blocks);
1291 	num_available = spdk_min(num_available, dev->conf.nv_cache.max_request_cnt);
1292 
1293 	if (spdk_unlikely(nv_cache->current_addr + num_available > cache_size)) {
1294 		*num_blocks = cache_size - nv_cache->current_addr;
1295 	} else {
1296 		*num_blocks = num_available;
1297 	}
1298 
1299 	cache_addr = nv_cache->current_addr;
1300 	nv_cache->current_addr += *num_blocks;
1301 	nv_cache->num_available -= *num_blocks;
1302 	*phase = nv_cache->phase;
1303 
1304 	if (nv_cache->current_addr == spdk_bdev_get_num_blocks(bdev)) {
1305 		nv_cache->current_addr = FTL_NV_CACHE_DATA_OFFSET;
1306 		nv_cache->phase = ftl_nv_cache_next_phase(nv_cache->phase);
1307 		nv_cache->ready = false;
1308 		spdk_thread_send_msg(ftl_get_core_thread(dev), ftl_nv_cache_wrap, nv_cache);
1309 	}
1310 out:
1311 	pthread_spin_unlock(&nv_cache->lock);
1312 	return cache_addr;
1313 }
1314 
1315 static struct ftl_io *
1316 ftl_alloc_io_nv_cache(struct ftl_io *parent, size_t num_blocks)
1317 {
1318 	struct ftl_io_init_opts opts = {
1319 		.dev		= parent->dev,
1320 		.parent		= parent,
1321 		.iovcnt		= 0,
1322 		.num_blocks	= num_blocks,
1323 		.flags		= parent->flags | FTL_IO_CACHE,
1324 	};
1325 
1326 	return ftl_io_init_internal(&opts);
1327 }
1328 
1329 static void
1330 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1331 {
1332 	struct ftl_io *io = cb_arg;
1333 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1334 
1335 	if (spdk_unlikely(!success)) {
1336 		SPDK_ERRLOG("Non-volatile cache write failed at %"PRIx64"\n", io->addr.offset);
1337 		io->status = -EIO;
1338 	}
1339 
1340 	ftl_io_dec_req(io);
1341 	if (ftl_io_done(io)) {
1342 		spdk_mempool_put(nv_cache->md_pool, io->md);
1343 		ftl_io_complete(io);
1344 	}
1345 
1346 	spdk_bdev_free_io(bdev_io);
1347 }
1348 
1349 static void
1350 ftl_submit_nv_cache(void *ctx)
1351 {
1352 	struct ftl_io *io = ctx;
1353 	struct spdk_ftl_dev *dev = io->dev;
1354 	struct spdk_thread *thread;
1355 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1356 	struct ftl_io_channel *ioch;
1357 	int rc;
1358 
1359 	ioch = ftl_io_channel_get_ctx(io->ioch);
1360 	thread = spdk_io_channel_get_thread(io->ioch);
1361 
1362 	rc = spdk_bdev_write_blocks_with_md(nv_cache->bdev_desc, ioch->cache_ioch,
1363 					    ftl_io_iovec_addr(io), io->md, io->addr.offset,
1364 					    io->num_blocks, ftl_nv_cache_submit_cb, io);
1365 	if (rc == -ENOMEM) {
1366 		spdk_thread_send_msg(thread, ftl_submit_nv_cache, io);
1367 		return;
1368 	} else if (rc) {
1369 		SPDK_ERRLOG("Write to persistent cache failed: %s (%"PRIu64", %"PRIu64")\n",
1370 			    spdk_strerror(-rc), io->addr.offset, io->num_blocks);
1371 		spdk_mempool_put(nv_cache->md_pool, io->md);
1372 		io->status = -EIO;
1373 		ftl_io_complete(io);
1374 		return;
1375 	}
1376 
1377 	ftl_io_advance(io, io->num_blocks);
1378 	ftl_io_inc_req(io);
1379 }
1380 
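/*
 * Fill the per-block metadata of a non-volatile cache write: each block's
 * metadata carries its LBA packed together with the current cache phase (see
 * ftl_nv_cache_pack_lba()), allowing stale data to be told apart from current
 * data after the cache wraps around.
 */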
1381 static void
1382 ftl_nv_cache_fill_md(struct ftl_io *io, unsigned int phase)
1383 {
1384 	struct spdk_bdev *bdev;
1385 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1386 	uint64_t block_off, lba;
1387 	void *md_buf = io->md;
1388 
1389 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1390 
1391 	for (block_off = 0; block_off < io->num_blocks; ++block_off) {
1392 		lba = ftl_nv_cache_pack_lba(ftl_io_get_lba(io, block_off), phase);
1393 		memcpy(md_buf, &lba, sizeof(lba));
1394 		md_buf += spdk_bdev_get_md_size(bdev);
1395 	}
1396 }
1397 
1398 static void
1399 _ftl_write_nv_cache(void *ctx)
1400 {
1401 	struct ftl_io *child, *io = ctx;
1402 	struct spdk_ftl_dev *dev = io->dev;
1403 	struct spdk_thread *thread;
1404 	unsigned int phase;
1405 	uint64_t num_blocks;
1406 
1407 	thread = spdk_io_channel_get_thread(io->ioch);
1408 
1409 	while (io->pos < io->num_blocks) {
1410 		num_blocks = ftl_io_iovec_len_left(io);
1411 
1412 		child = ftl_alloc_io_nv_cache(io, num_blocks);
1413 		if (spdk_unlikely(!child)) {
1414 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1415 			return;
1416 		}
1417 
1418 		child->md = spdk_mempool_get(dev->nv_cache.md_pool);
1419 		if (spdk_unlikely(!child->md)) {
1420 			ftl_io_free(child);
1421 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1422 			break;
1423 		}
1424 
1425 		/* Reserve an area in the non-volatile cache */
1426 		child->addr.offset = ftl_reserve_nv_cache(&dev->nv_cache, &num_blocks, &phase);
1427 		if (child->addr.offset == FTL_LBA_INVALID) {
1428 			spdk_mempool_put(dev->nv_cache.md_pool, child->md);
1429 			ftl_io_free(child);
1430 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1431 			break;
1432 		}
1433 
1434 		/* Shrink the IO if there isn't enough room in the cache to fill the whole iovec */
1435 		if (spdk_unlikely(num_blocks != ftl_io_iovec_len_left(io))) {
1436 			ftl_io_shrink_iovec(child, num_blocks);
1437 		}
1438 
1439 		ftl_nv_cache_fill_md(child, phase);
1440 		ftl_submit_nv_cache(child);
1441 	}
1442 
1443 	if (ftl_io_done(io)) {
1444 		ftl_io_complete(io);
1445 	}
1446 }
1447 
1448 static void
1449 ftl_write_nv_cache(struct ftl_io *parent)
1450 {
1451 	ftl_io_reset(parent);
1452 	parent->flags |= FTL_IO_CACHE;
1453 	_ftl_write_nv_cache(parent);
1454 }
1455 
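/*
 * Write the non-volatile cache metadata header to block 0 of the cache bdev.
 * The header records the current phase, the cache size, the device UUID, the
 * header version and - on clean shutdown only - the current append address;
 * its checksum covers all fields preceding the checksum member.
 */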
1456 int
1457 ftl_nv_cache_write_header(struct ftl_nv_cache *nv_cache, bool shutdown,
1458 			  spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1459 {
1460 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1461 	struct ftl_nv_cache_header *hdr = nv_cache->dma_buf;
1462 	struct spdk_bdev *bdev;
1463 	struct ftl_io_channel *ioch;
1464 
1465 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1466 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
1467 
1468 	memset(hdr, 0, spdk_bdev_get_block_size(bdev));
1469 
1470 	hdr->phase = (uint8_t)nv_cache->phase;
1471 	hdr->size = spdk_bdev_get_num_blocks(bdev);
1472 	hdr->uuid = dev->uuid;
1473 	hdr->version = FTL_NV_CACHE_HEADER_VERSION;
1474 	hdr->current_addr = shutdown ? nv_cache->current_addr : FTL_LBA_INVALID;
1475 	hdr->checksum = spdk_crc32c_update(hdr, offsetof(struct ftl_nv_cache_header, checksum), 0);
1476 
1477 	return spdk_bdev_write_blocks(nv_cache->bdev_desc, ioch->cache_ioch, hdr, 0, 1,
1478 				      cb_fn, cb_arg);
1479 }
1480 
1481 int
1482 ftl_nv_cache_scrub(struct ftl_nv_cache *nv_cache, spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1483 {
1484 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1485 	struct ftl_io_channel *ioch;
1486 	struct spdk_bdev *bdev;
1487 
1488 	ioch = ftl_io_channel_get_ctx(ftl_get_io_channel(dev));
1489 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1490 
1491 	return spdk_bdev_write_zeroes_blocks(nv_cache->bdev_desc, ioch->cache_ioch, 1,
1492 					     spdk_bdev_get_num_blocks(bdev) - 1,
1493 					     cb_fn, cb_arg);
1494 }
1495 
1496 static void
1497 ftl_write_fail(struct ftl_io *io, int status)
1498 {
1499 	struct ftl_batch *batch = io->batch;
1500 	struct spdk_ftl_dev *dev = io->dev;
1501 	struct ftl_wbuf_entry *entry;
1502 	struct ftl_band *band;
1503 	char buf[128];
1504 
1505 	entry = TAILQ_FIRST(&batch->entries);
1506 
1507 	band = ftl_band_from_addr(io->dev, entry->addr);
1508 	SPDK_ERRLOG("Write failed @addr: %s, status: %d\n",
1509 		    ftl_addr2str(entry->addr, buf, sizeof(buf)), status);
1510 
1511 	/* Close the band and halt the wptr and defrag */
1512 	ftl_halt_writes(dev, band);
1513 
1514 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1515 		/* Invalidate meta set by process_writes() */
1516 		ftl_invalidate_addr(dev, entry->addr);
1517 	}
1518 
1519 	/* Reset the batch back to the write buffer to resend it later */
1520 	TAILQ_INSERT_TAIL(&dev->pending_batches, batch, tailq);
1521 }
1522 
1523 static void
1524 ftl_write_cb(struct ftl_io *io, void *arg, int status)
1525 {
1526 	struct spdk_ftl_dev *dev = io->dev;
1527 	struct ftl_batch *batch = io->batch;
1528 	struct ftl_wbuf_entry *entry;
1529 	struct ftl_band *band;
1530 	struct ftl_addr prev_addr, addr = io->addr;
1531 
1532 	if (status) {
1533 		ftl_write_fail(io, status);
1534 		return;
1535 	}
1536 
1537 	assert(io->num_blocks == dev->xfer_size);
1538 	assert(!(io->flags & FTL_IO_MD));
1539 
1540 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1541 		band = entry->band;
1542 		if (!(entry->io_flags & FTL_IO_PAD)) {
1543 			/* Verify that the LBA is set for user blocks */
1544 			assert(entry->lba != FTL_LBA_INVALID);
1545 		}
1546 
1547 		if (band != NULL) {
1548 			assert(band->num_reloc_blocks > 0);
1549 			band->num_reloc_blocks--;
1550 		}
1551 
1552 		entry->addr = addr;
1553 		if (entry->lba != FTL_LBA_INVALID) {
1554 			pthread_spin_lock(&entry->lock);
1555 			prev_addr = ftl_l2p_get(dev, entry->lba);
1556 
1557 			/* If the l2p was updated in the meantime, don't update band's metadata */
1558 			if (ftl_addr_cached(prev_addr) &&
1559 			    entry == ftl_get_entry_from_addr(dev, prev_addr)) {
1560 				/* Setting entry's cache bit needs to be done after metadata */
1561 				/* within the band is updated to make sure that writes */
1562 				/* invalidating the entry clear the metadata as well */
1563 				ftl_band_set_addr(io->band, entry->lba, entry->addr);
1564 				entry->valid = true;
1565 			}
1566 			pthread_spin_unlock(&entry->lock);
1567 		}
1568 
1569 		SPDK_DEBUGLOG(ftl_core, "Write addr:%lu, lba:%lu\n",
1570 			      entry->addr.offset, entry->lba);
1571 
1572 		addr = ftl_band_next_addr(io->band, addr, 1);
1573 	}
1574 
1575 	ftl_process_flush(dev, batch);
1576 	ftl_release_batch(dev, batch);
1577 }
1578 
1579 static void
1580 ftl_update_stats(struct spdk_ftl_dev *dev, const struct ftl_wbuf_entry *entry)
1581 {
1582 	if (!(entry->io_flags & FTL_IO_INTERNAL)) {
1583 		dev->stats.write_user++;
1584 	}
1585 	dev->stats.write_total++;
1586 }
1587 
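/*
 * Point the L2P entry of a freshly filled write buffer entry at its cache
 * address.  Regular writes always win; weak writes (issued by relocation) only
 * update the L2P if the block they are moving hasn't been overwritten in the
 * meantime.  Depending on whether the previous address points at the cache or
 * at disk, the old cache entry or the old band metadata is invalidated under
 * the appropriate lock.
 */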
1588 static void
1589 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_wbuf_entry *entry,
1590 	       struct ftl_addr addr)
1591 {
1592 	struct ftl_addr prev_addr;
1593 	struct ftl_wbuf_entry *prev;
1594 	struct ftl_band *band;
1595 	int valid;
1596 	bool io_weak = entry->io_flags & FTL_IO_WEAK;
1597 
1598 	prev_addr = ftl_l2p_get(dev, entry->lba);
1599 	if (ftl_addr_invalid(prev_addr)) {
1600 		ftl_l2p_set(dev, entry->lba, addr);
1601 		return;
1602 	}
1603 
1604 	if (ftl_addr_cached(prev_addr)) {
1605 		prev = ftl_get_entry_from_addr(dev, prev_addr);
1606 		pthread_spin_lock(&prev->lock);
1607 
1608 		/* Re-read the L2P under the lock to protect against updates */
1609 		/* to this LBA from other threads */
1610 		prev_addr = ftl_l2p_get(dev, entry->lba);
1611 
1612 		/* If the entry is no longer in cache, another write has been */
1613 		/* scheduled in the meantime, so we can return to evicted path */
1614 		if (!ftl_addr_cached(prev_addr)) {
1615 			pthread_spin_unlock(&prev->lock);
1616 			goto evicted;
1617 		}
1618 
1619 		/*
1620 		 * The block being relocated could still reside in the cache, since write
1621 		 * buffers are independent for each IO channel and enough data (one write
1622 		 * unit) must be collected before it is submitted to the lower layer.
1623 		 * If the previous entry wasn't overwritten, invalidate the old address
1624 		 * and the entry.
1625 		 * Otherwise, skip relocating the block.
1626 		 */
1627 		if (io_weak &&
1628 		    /* Check if prev_addr was updated in the meantime */
1629 		    !(ftl_addr_cmp(prev_addr, ftl_get_addr_from_entry(prev)) &&
1630 		      /* Check if the relocated address is the same as in the previous entry */
1631 		      ftl_addr_cmp(prev->addr, entry->addr))) {
1632 			pthread_spin_unlock(&prev->lock);
1633 			return;
1634 		}
1635 
1636 		/*
1637 		 * If the previous entry is part of the cache and has already been written
1638 		 * to disk, remove and invalidate it
1639 		 */
1640 		if (prev->valid) {
1641 			ftl_invalidate_addr(dev, prev->addr);
1642 			prev->valid = false;
1643 		}
1644 
1645 		ftl_l2p_set(dev, entry->lba, addr);
1646 		pthread_spin_unlock(&prev->lock);
1647 		return;
1648 	}
1649 
1650 evicted:
1651 	/*
1652 	 *  If the L2P's physical address is different from what we expected, we don't need to
1653 	 *  do anything (someone's already overwritten our data).
1654 	 */
1655 	if (io_weak && !ftl_addr_cmp(prev_addr, entry->addr)) {
1656 		return;
1657 	}
1658 
1659 	/* Lock the band containing the previous physical address. This ensures atomic changes to */
1660 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
1661 	/* check the validity of weak writes. */
1662 	band = ftl_band_from_addr(dev, prev_addr);
1663 	pthread_spin_lock(&band->lba_map.lock);
1664 
1665 	valid = ftl_invalidate_addr_unlocked(dev, prev_addr);
1666 
1667 	/* If the address has been invalidated already, we don't want to update */
1668 	/* the L2P for weak writes, as it means the write is no longer valid. */
1669 	if (!io_weak || valid) {
1670 		ftl_l2p_set(dev, entry->lba, addr);
1671 	}
1672 
1673 	pthread_spin_unlock(&band->lba_map.lock);
1674 }
1675 
1676 static struct ftl_io *
1677 ftl_io_init_child_write(struct ftl_io *parent, struct ftl_addr addr, ftl_io_fn cb)
1678 {
1679 	struct ftl_io *io;
1680 	struct spdk_ftl_dev *dev = parent->dev;
1681 	struct ftl_io_init_opts opts = {
1682 		.dev		= dev,
1683 		.io		= NULL,
1684 		.parent		= parent,
1685 		.band		= parent->band,
1686 		.size		= sizeof(struct ftl_io),
1687 		.flags		= 0,
1688 		.type		= parent->type,
1689 		.num_blocks	= dev->xfer_size,
1690 		.cb_fn		= cb,
1691 		.iovcnt		= 0,
1692 	};
1693 
1694 	io = ftl_io_init_internal(&opts);
1695 	if (!io) {
1696 		return NULL;
1697 	}
1698 
1699 	io->addr = addr;
1700 
1701 	return io;
1702 }
1703 
1704 static void
1705 ftl_io_child_write_cb(struct ftl_io *io, void *ctx, int status)
1706 {
1707 	struct ftl_zone *zone;
1708 	struct ftl_wptr *wptr;
1709 
1710 	zone = ftl_band_zone_from_addr(io->band, io->addr);
1711 	wptr = ftl_wptr_from_band(io->band);
1712 
1713 	zone->busy = false;
1714 	zone->info.write_pointer += io->num_blocks;
1715 
1716 	if (zone->info.write_pointer == zone->info.zone_id + zone->info.capacity) {
1717 		zone->info.state = SPDK_BDEV_ZONE_STATE_FULL;
1718 	}
1719 
1720 	/* If some other write on the same band failed, the write pointer will already have been freed */
1721 	if (spdk_likely(wptr)) {
1722 		wptr->num_outstanding--;
1723 	}
1724 }
1725 
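/*
 * Carve a single xfer_size child IO out of the parent write and submit it to
 * the base bdev, either as a zone append (when supported) or as a regular
 * write at the write pointer's current address (or at the request's own
 * address in direct mode).  Returns -EAGAIN when no child IO could be
 * allocated, so the caller can queue the parent for retry, and -EIO when the
 * bdev submission itself fails.
 */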
1726 static int
1727 ftl_submit_child_write(struct ftl_wptr *wptr, struct ftl_io *io)
1728 {
1729 	struct spdk_ftl_dev	*dev = io->dev;
1730 	struct ftl_io_channel	*ioch;
1731 	struct ftl_io		*child;
1732 	struct ftl_addr		addr;
1733 	int			rc;
1734 
1735 	ioch = ftl_io_channel_get_ctx(io->ioch);
1736 
1737 	if (spdk_likely(!wptr->direct_mode)) {
1738 		addr = wptr->addr;
1739 	} else {
1740 		assert(io->flags & FTL_IO_DIRECT_ACCESS);
1741 		assert(ftl_addr_get_band(dev, io->addr) == wptr->band->id);
1742 		addr = io->addr;
1743 	}
1744 
1745 	/* Split the IO into child requests and release the zone immediately after each child completes */
1746 	child = ftl_io_init_child_write(io, addr, ftl_io_child_write_cb);
1747 	if (!child) {
1748 		return -EAGAIN;
1749 	}
1750 
1751 	wptr->num_outstanding++;
1752 
1753 	if (ftl_is_append_supported(dev)) {
1754 		rc = spdk_bdev_zone_appendv(dev->base_bdev_desc, ioch->base_ioch,
1755 					    child->iov, child->iov_cnt,
1756 					    ftl_addr_get_zone_slba(dev, addr),
1757 					    dev->xfer_size, ftl_io_cmpl_cb, child);
1758 	} else {
1759 		rc = spdk_bdev_writev_blocks(dev->base_bdev_desc, ioch->base_ioch,
1760 					     child->iov, child->iov_cnt, addr.offset,
1761 					     dev->xfer_size, ftl_io_cmpl_cb, child);
1762 	}
1763 
1764 	if (rc) {
1765 		wptr->num_outstanding--;
1766 		ftl_io_fail(child, rc);
1767 		ftl_io_complete(child);
1768 		SPDK_ERRLOG("Write submission to base bdev failed with status:%d, addr:%lu\n",
1769 			    rc, addr.offset);
1770 		return -EIO;
1771 	}
1772 
1773 	ftl_io_inc_req(child);
1774 	ftl_io_advance(child, dev->xfer_size);
1775 
1776 	return 0;
1777 }
1778 
1779 static int
1780 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
1781 {
1782 	struct spdk_ftl_dev	*dev = io->dev;
1783 	int			rc = 0;
1784 
1785 	assert(io->num_blocks % dev->xfer_size == 0);
1786 
1787 	while (io->iov_pos < io->iov_cnt) {
1788 		/* There are no ordering guarantees for completions on the NVMe submission queue, */
1789 		/* so wait until the zone is not busy before submitting another write */
1790 		if (!ftl_is_append_supported(dev) && wptr->zone->busy) {
1791 			TAILQ_INSERT_TAIL(&wptr->pending_queue, io, ioch_entry);
1792 			rc = -EAGAIN;
1793 			break;
1794 		}
1795 
1796 		rc = ftl_submit_child_write(wptr, io);
1797 		if (spdk_unlikely(rc)) {
1798 			if (rc == -EAGAIN) {
1799 				TAILQ_INSERT_TAIL(&wptr->pending_queue, io, ioch_entry);
1800 			} else {
1801 				ftl_io_fail(io, rc);
1802 			}
1803 			break;
1804 		}
1805 
1806 		ftl_trace_submission(dev, io, wptr->addr, dev->xfer_size);
1807 		ftl_wptr_advance(wptr, dev->xfer_size);
1808 	}
1809 
1810 	if (ftl_io_done(io)) {
1811 		/* Parent IO will complete after all children are completed */
1812 		ftl_io_complete(io);
1813 	}
1814 
1815 	return rc;
1816 }
1817 
1818 static void
1819 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1820 {
1821 	struct ftl_batch *batch = dev->current_batch;
1822 	struct ftl_io_channel *ioch;
1823 	size_t size = 0, num_entries = 0;
1824 
1825 	assert(batch != NULL);
1826 	assert(batch->num_entries < dev->xfer_size);
1827 
1828 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1829 		size += spdk_ring_count(ioch->submit_queue);
1830 	}
1831 
1832 	num_entries = dev->xfer_size - batch->num_entries;
1833 	if (size < num_entries) {
1834 		ftl_pad_wbuf(dev, num_entries - size);
1835 	}
1836 }
1837 
1838 static bool
1839 ftl_check_io_channel_flush(struct spdk_ftl_dev *dev)
1840 {
1841 	struct ftl_io_channel *ioch;
1842 
1843 	TAILQ_FOREACH(ioch, &dev->ioch_queue, tailq) {
1844 		if (ioch->flush && spdk_ring_count(ioch->free_queue) != ioch->num_entries) {
1845 			return true;
1846 		}
1847 	}
1848 
1849 	return false;
1850 }
1851 
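/*
 * Main write path of a single write pointer: retry any pending IO first, make
 * sure the band is open and writable, handle shutdown/flush padding, then grab
 * a full batch of write buffer entries and submit it to the band.  Returns
 * dev->xfer_size when a batch was picked up and 0 otherwise, which the caller
 * uses to tell whether any forward progress was made.
 */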
1852 static int
1853 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1854 {
1855 	struct spdk_ftl_dev	*dev = wptr->dev;
1856 	struct ftl_batch	*batch;
1857 	struct ftl_wbuf_entry	*entry;
1858 	struct ftl_io		*io;
1859 
1860 	if (spdk_unlikely(!TAILQ_EMPTY(&wptr->pending_queue))) {
1861 		io = TAILQ_FIRST(&wptr->pending_queue);
1862 		TAILQ_REMOVE(&wptr->pending_queue, io, ioch_entry);
1863 
1864 		if (ftl_submit_write(wptr, io) == -EAGAIN) {
1865 			return 0;
1866 		}
1867 	}
1868 
1869 	/* Make sure the band is prepared for writing */
1870 	if (!ftl_wptr_ready(wptr)) {
1871 		return 0;
1872 	}
1873 
1874 	if (dev->halt) {
1875 		ftl_wptr_process_shutdown(wptr);
1876 	}
1877 
1878 	if (spdk_unlikely(wptr->flush)) {
1879 		ftl_wptr_pad_band(wptr);
1880 	}
1881 
1882 	batch = ftl_get_next_batch(dev);
1883 	if (!batch) {
1884 		/* If there are queued flush requests we need to pad the write buffer to */
1885 		/* force out remaining entries */
1886 		if (!LIST_EMPTY(&dev->flush_list) || ftl_check_io_channel_flush(dev)) {
1887 			ftl_flush_pad_batch(dev);
1888 		}
1889 
1890 		return 0;
1891 	}
1892 
1893 	io = ftl_io_wbuf_init(dev, wptr->addr, wptr->band, batch, ftl_write_cb);
1894 	if (!io) {
1895 		goto error;
1896 	}
1897 
1898 	TAILQ_FOREACH(entry, &batch->entries, tailq) {
1899 		/* Update band's relocation stats if the IO comes from reloc */
1900 		if (entry->io_flags & FTL_IO_WEAK) {
1901 			if (!spdk_bit_array_get(wptr->band->reloc_bitmap, entry->band->id)) {
1902 				spdk_bit_array_set(wptr->band->reloc_bitmap, entry->band->id);
1903 				entry->band->num_reloc_bands++;
1904 			}
1905 		}
1906 
1907 		ftl_trace_wbuf_pop(dev, entry);
1908 		ftl_update_stats(dev, entry);
1909 	}
1910 
1911 	SPDK_DEBUGLOG(ftl_core, "Write addr:%lx\n", wptr->addr.offset);
1912 
1913 	if (ftl_submit_write(wptr, io)) {
1914 		/* TODO: we need some recovery here */
1915 		assert(0 && "Write submit failed");
1916 		if (ftl_io_done(io)) {
1917 			ftl_io_free(io);
1918 		}
1919 	}
1920 
1921 	return dev->xfer_size;
1922 error:
1923 	TAILQ_INSERT_TAIL(&dev->pending_batches, batch, tailq);
1924 	return 0;
1925 }
1926 
1927 static bool
1928 ftl_process_writes(struct spdk_ftl_dev *dev)
1929 {
1930 	struct ftl_wptr *wptr, *twptr;
1931 	size_t num_active = 0, num_writes = 0;
1932 	enum ftl_band_state state;
1933 
1934 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1935 		num_writes += ftl_wptr_process_writes(wptr);
1936 		state = wptr->band->state;
1937 
1938 		if (state != FTL_BAND_STATE_FULL &&
1939 		    state != FTL_BAND_STATE_CLOSING &&
1940 		    state != FTL_BAND_STATE_CLOSED) {
1941 			num_active++;
1942 		}
1943 	}
1944 
1945 	if (num_active < 1) {
1946 		ftl_add_wptr(dev);
1947 	}
1948 
1949 	return num_writes != 0;
1950 }
1951 
1952 static void
1953 ftl_fill_wbuf_entry(struct ftl_wbuf_entry *entry, struct ftl_io *io)
1954 {
1955 	memcpy(entry->payload, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1956 
1957 	if (entry->io_flags & FTL_IO_WEAK) {
1958 		entry->band = ftl_band_from_addr(io->dev, io->addr);
1959 		entry->addr = ftl_band_next_addr(entry->band, io->addr, io->pos);
1960 		entry->band->num_reloc_blocks++;
1961 	}
1962 
1963 	entry->trace = io->trace;
1964 	entry->lba = ftl_io_current_lba(io);
1965 }
1966 
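/*
 * Copy the user payload into write buffer entries, one block at a time.  Each
 * entry is tagged with its LBA, the L2P is pointed at the entry's cache
 * address and the entry is pushed onto the channel's submit queue, where
 * ftl_get_next_batch() will pick it up.  If no entry can be acquired, the IO
 * is parked on the channel's retry queue.  Once the whole IO has been
 * buffered, it is either mirrored to the non-volatile cache or queued for
 * user completion.
 */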
1967 static int
1968 ftl_wbuf_fill(struct ftl_io *io)
1969 {
1970 	struct spdk_ftl_dev *dev = io->dev;
1971 	struct ftl_io_channel *ioch;
1972 	struct ftl_wbuf_entry *entry;
1973 
1974 	ioch = ftl_io_channel_get_ctx(io->ioch);
1975 
1976 	while (io->pos < io->num_blocks) {
1977 		if (ftl_io_current_lba(io) == FTL_LBA_INVALID) {
1978 			ftl_io_advance(io, 1);
1979 			continue;
1980 		}
1981 
1982 		entry = ftl_acquire_wbuf_entry(ioch, io->flags);
1983 		if (!entry) {
1984 			TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
1985 			return 0;
1986 		}
1987 
1988 		ftl_fill_wbuf_entry(entry, io);
1989 
1990 		ftl_trace_wbuf_fill(dev, io);
1991 		ftl_update_l2p(dev, entry, ftl_get_addr_from_entry(entry));
1992 		ftl_io_advance(io, 1);
1993 
1994 		/* This needs to be done after the L2P is updated to avoid a race with the */
1995 		/* write completion callback, which could otherwise be processed before */
1996 		/* the L2P is set in ftl_update_l2p(). */
1997 		spdk_ring_enqueue(ioch->submit_queue, (void **)&entry, 1, NULL);
1998 	}
1999 
2000 	if (ftl_io_done(io)) {
2001 		if (ftl_dev_has_nv_cache(dev) && !(io->flags & FTL_IO_BYPASS_CACHE)) {
2002 			ftl_write_nv_cache(io);
2003 		} else {
2004 			TAILQ_INSERT_TAIL(&ioch->write_cmpl_queue, io, ioch_entry);
2005 		}
2006 	}
2007 
2008 	return 0;
2009 }
2010 
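/*
 * Defrag is needed once the number of free bands drops to the start threshold,
 * unless relocation is halted or a defrag is already in progress.
 */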
2011 static bool
2012 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
2013 {
2014 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
2015 
2016 	if (ftl_reloc_is_halted(dev->reloc)) {
2017 		return false;
2018 	}
2019 
2020 	if (ftl_reloc_is_defrag_active(dev->reloc)) {
2021 		return false;
2022 	}
2023 
2024 	if (dev->num_free <= limit->thld) {
2025 		return true;
2026 	}
2027 
2028 	return false;
2029 }
2030 
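/*
 * Calculate a band's defrag merit: the more invalid blocks it holds and the
 * older it is, the higher the merit. When threshold_valid is provided, the
 * merit is computed for a hypothetical band with (usable - *threshold_valid)
 * valid blocks instead of the band's actual valid count.
 */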
2031 static double
2032 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
2033 {
2034 	size_t usable, valid, invalid;
2035 	double vld_ratio;
2036 
2037 	/* If the band doesn't have any usable blocks it's of no use */
2038 	usable = ftl_band_num_usable_blocks(band);
2039 	if (usable == 0) {
2040 		return 0.0;
2041 	}
2042 
2043 	valid = threshold_valid ? (usable - *threshold_valid) : band->lba_map.num_vld;
2044 	invalid = usable - valid;
2045 
2046 	/* Add one to avoid division by 0 */
2047 	vld_ratio = (double)invalid / (double)(valid + 1);
2048 	return vld_ratio * ftl_band_age(band);
2049 }
2050 
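/*
 * A band is worth defragging when free bands are critically low, or when its
 * merit exceeds the merit it would have at the configured invalid block
 * threshold (conf->invalid_thld).
 */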
2051 static bool
2052 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
2053 {
2054 	struct spdk_ftl_conf *conf = &dev->conf;
2055 	size_t thld_vld;
2056 
2057 	/* If we're in dire need of free bands, every band is worth defragging */
2058 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
2059 		return true;
2060 	}
2061 
2062 	thld_vld = (ftl_band_num_usable_blocks(band) * conf->invalid_thld) / 100;
2063 
2064 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
2065 }
2066 
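/*
 * Pick the closed band with the highest merit as the defrag candidate. Returns
 * NULL if even the best candidate doesn't meet the defrag criteria.
 */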
2067 static struct ftl_band *
2068 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
2069 {
2070 	struct ftl_band *band, *mband = NULL;
2071 	double merit = 0;
2072 
2073 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
2074 		assert(band->state == FTL_BAND_STATE_CLOSED);
2075 		band->merit = ftl_band_calc_merit(band, NULL);
2076 		if (band->merit > merit) {
2077 			merit = band->merit;
2078 			mband = band;
2079 		}
2080 	}
2081 
2082 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
2083 		mband = NULL;
2084 	}
2085 
2086 	return mband;
2087 }
2088 
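/*
 * Schedule relocation of the most suitable band when the device runs low on
 * free bands and advance the relocator. Returns whether the relocator reported
 * any activity.
 */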
2089 static bool
2090 ftl_process_relocs(struct spdk_ftl_dev *dev)
2091 {
2092 	struct ftl_band *band;
2093 
2094 	if (ftl_dev_needs_defrag(dev)) {
2095 		band = ftl_select_defrag_band(dev);
2096 		if (band) {
2097 			ftl_reloc_add(dev->reloc, band, 0, ftl_get_num_blocks_in_band(dev), 0, true);
2098 			ftl_trace_defrag_band(dev, band);
2099 		}
2100 	}
2101 
2102 	return ftl_reloc(dev->reloc);
2103 }
2104 
2105 int
2106 ftl_current_limit(const struct spdk_ftl_dev *dev)
2107 {
2108 	return dev->limit;
2109 }
2110 
2111 void
2112 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
2113 {
2114 	attrs->uuid = dev->uuid;
2115 	attrs->num_blocks = dev->num_lbas;
2116 	attrs->block_size = FTL_BLOCK_SIZE;
2117 	attrs->num_zones = ftl_get_num_zones(dev);
2118 	attrs->zone_size = ftl_get_num_blocks_in_zone(dev);
2119 	attrs->conf = dev->conf;
2120 	attrs->base_bdev = spdk_bdev_get_name(spdk_bdev_desc_get_bdev(dev->base_bdev_desc));
2121 
2122 	attrs->cache_bdev = NULL;
2123 	if (dev->nv_cache.bdev_desc) {
2124 		attrs->cache_bdev = spdk_bdev_get_name(
2125 					    spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc));
2126 	}
2127 }
2128 
2129 static void
2130 _ftl_io_write(void *ctx)
2131 {
2132 	ftl_io_write((struct ftl_io *)ctx);
2133 }
2134 
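/*
 * Submit a single child of a metadata write through the band's write pointer.
 */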
2135 static int
2136 ftl_submit_write_leaf(struct ftl_io *io)
2137 {
2138 	int rc;
2139 
2140 	rc = ftl_submit_write(ftl_wptr_from_band(io->band), io);
2141 	if (rc == -EAGAIN) {
2142 		/* EAGAIN means that the request was put on the pending queue */
2143 		return 0;
2144 	}
2145 
2146 	return rc;
2147 }
2148 
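/*
 * Write entry point. IOs arriving before the IO channel is fully registered are
 * retried later. Data writes are copied into the write buffer per child IO,
 * while metadata writes (FTL_IO_MD) are submitted directly on the core thread.
 */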
2149 void
2150 ftl_io_write(struct ftl_io *io)
2151 {
2152 	struct spdk_ftl_dev *dev = io->dev;
2153 	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(io->ioch);
2154 
2155 	/* Put the IO on the retry queue if the IO channel hasn't been initialized yet */
2156 	if (spdk_unlikely(ioch->index == FTL_IO_CHANNEL_INDEX_INVALID)) {
2157 		TAILQ_INSERT_TAIL(&ioch->retry_queue, io, ioch_entry);
2158 		return;
2159 	}
2160 
2161 	/* For normal IOs we just need to copy the data onto the write buffer */
2162 	if (!(io->flags & FTL_IO_MD)) {
2163 		ftl_io_call_foreach_child(io, ftl_wbuf_fill);
2164 	} else {
2165 		/* Metadata has its own buffer, so it doesn't have to be copied; just */
2166 		/* send it to the core thread and schedule the write immediately */
2167 		if (ftl_check_core_thread(dev)) {
2168 			ftl_io_call_foreach_child(io, ftl_submit_write_leaf);
2169 		} else {
2170 			spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
2171 		}
2172 	}
2173 }
2174 
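/*
 * Example (sketch, not part of this file): writing a single block from a
 * caller-owned buffer. "buf", "lba", "ch" and "write_cb" are assumed to be set
 * up by the application; write_cb matches spdk_ftl_fn, i.e. it takes
 * (void *cb_arg, int status).
 *
 *	struct iovec iov = {
 *		.iov_base = buf,
 *		.iov_len = FTL_BLOCK_SIZE,
 *	};
 *
 *	rc = spdk_ftl_write(dev, ch, lba, 1, &iov, 1, write_cb, NULL);
 *	if (rc != 0) {
 *		... handle -EINVAL, -EBUSY or -ENOMEM ...
 *	}
 */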
2175 int
2176 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2177 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2178 {
2179 	struct ftl_io *io;
2180 
2181 	if (iov_cnt == 0) {
2182 		return -EINVAL;
2183 	}
2184 
2185 	if (lba_cnt == 0) {
2186 		return -EINVAL;
2187 	}
2188 
2189 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2190 		return -EINVAL;
2191 	}
2192 
2193 	if (!dev->initialized) {
2194 		return -EBUSY;
2195 	}
2196 
2197 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
2198 	if (!io) {
2199 		return -ENOMEM;
2200 	}
2201 
2202 	ftl_io_write(io);
2203 
2204 	return 0;
2205 }
2206 
2207 void
2208 ftl_io_read(struct ftl_io *io)
2209 {
2210 	ftl_io_call_foreach_child(io, ftl_submit_read);
2211 }
2212 
2213 int
2214 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2215 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2216 {
2217 	struct ftl_io *io;
2218 
2219 	if (iov_cnt == 0) {
2220 		return -EINVAL;
2221 	}
2222 
2223 	if (lba_cnt == 0) {
2224 		return -EINVAL;
2225 	}
2226 
2227 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2228 		return -EINVAL;
2229 	}
2230 
2231 	if (!dev->initialized) {
2232 		return -EBUSY;
2233 	}
2234 
2235 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
2236 	if (!io) {
2237 		return -ENOMEM;
2238 	}
2239 
2240 	ftl_io_read(io);
2241 	return 0;
2242 }
2243 
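/*
 * Allocate a flush request. The bitmap tracks which write buffer batches still
 * need to be persisted before the user callback can be invoked.
 */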
2244 static struct ftl_flush *
2245 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2246 {
2247 	struct ftl_flush *flush;
2248 
2249 	flush = calloc(1, sizeof(*flush));
2250 	if (!flush) {
2251 		return NULL;
2252 	}
2253 
2254 	flush->bmap = spdk_bit_array_create(FTL_BATCH_COUNT);
2255 	if (!flush->bmap) {
2256 		goto error;
2257 	}
2258 
2259 	flush->dev = dev;
2260 	flush->cb.fn = cb_fn;
2261 	flush->cb.ctx = cb_arg;
2262 
2263 	return flush;
2264 error:
2265 	free(flush);
2266 	return NULL;
2267 }
2268 
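/*
 * Executed on the core thread: mark all currently non-empty batches in the
 * flush's bitmap and complete the flush right away if the write buffer is
 * already empty.
 */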
2269 static void
2270 _ftl_flush(void *ctx)
2271 {
2272 	struct ftl_flush *flush = ctx;
2273 	struct spdk_ftl_dev *dev = flush->dev;
2274 	uint32_t i;
2275 
2276 	/* Attach flush object to all non-empty batches */
2277 	for (i = 0; i < FTL_BATCH_COUNT; ++i) {
2278 		if (dev->batch_array[i].num_entries > 0) {
2279 			spdk_bit_array_set(flush->bmap, i);
2280 			flush->num_req++;
2281 		}
2282 	}
2283 
2284 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
2285 
2286 	/* If the write buffer was already empty, the flush can be completed right away */
2287 	if (!flush->num_req) {
2288 		ftl_complete_flush(flush);
2289 	}
2290 }
2291 
2292 int
2293 ftl_flush_wbuf(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2294 {
2295 	struct ftl_flush *flush;
2296 
2297 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
2298 	if (!flush) {
2299 		return -ENOMEM;
2300 	}
2301 
2302 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
2303 	return 0;
2304 }
2305 
2306 int
2307 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2308 {
2309 	if (!dev->initialized) {
2310 		return -EBUSY;
2311 	}
2312 
2313 	return ftl_flush_wbuf(dev, cb_fn, cb_arg);
2314 }
2315 
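/*
 * An address is considered written if it lies below the current write pointer
 * of its zone.
 */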
2316 bool
2317 ftl_addr_is_written(struct ftl_band *band, struct ftl_addr addr)
2318 {
2319 	struct ftl_zone *zone = ftl_band_zone_from_addr(band, addr);
2320 
2321 	return addr.offset < zone->info.write_pointer;
2322 }
2323 
2324 static void ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event);
2325 
2326 static void
2327 _ftl_process_media_event(void *ctx)
2328 {
2329 	struct ftl_media_event *event = ctx;
2330 	struct spdk_ftl_dev *dev = event->dev;
2331 
2332 	ftl_process_media_event(dev, event->event);
2333 	spdk_mempool_put(dev->media_events_pool, event);
2334 }
2335 
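/*
 * Handle a media management event from the base bdev. Events received outside
 * the core thread are forwarded to it; the affected blocks are then scheduled
 * for relocation.
 */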
2336 static void
2337 ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event)
2338 {
2339 	struct ftl_band *band;
2340 	struct ftl_addr addr = { .offset = event.offset };
2341 	size_t block_off;
2342 
2343 	if (!ftl_check_core_thread(dev)) {
2344 		struct ftl_media_event *media_event;
2345 
2346 		media_event = spdk_mempool_get(dev->media_events_pool);
2347 		if (!media_event) {
2348 			SPDK_ERRLOG("Media event lost due to lack of memory\n");
2349 			return;
2350 		}
2351 
2352 		media_event->dev = dev;
2353 		media_event->event = event;
2354 		spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_process_media_event,
2355 				     media_event);
2356 		return;
2357 	}
2358 
2359 	band = ftl_band_from_addr(dev, addr);
2360 	block_off = ftl_band_block_offset_from_addr(band, addr);
2361 
2362 	ftl_reloc_add(dev->reloc, band, block_off, event.num_blocks, 0, false);
2363 }
2364 
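/*
 * Drain and process all pending media management events from the base bdev.
 */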
2365 void
2366 ftl_get_media_events(struct spdk_ftl_dev *dev)
2367 {
2368 #define FTL_MAX_MEDIA_EVENTS 128
2369 	struct spdk_bdev_media_event events[FTL_MAX_MEDIA_EVENTS];
2370 	size_t num_events, i;
2371 
2372 	if (!dev->initialized) {
2373 		return;
2374 	}
2375 
2376 	do {
2377 		num_events = spdk_bdev_get_media_events(dev->base_bdev_desc,
2378 							events, FTL_MAX_MEDIA_EVENTS);
2379 
2380 		for (i = 0; i < num_events; ++i) {
2381 			ftl_process_media_event(dev, events[i]);
2382 		}
2383 
2384 	} while (num_events);
2385 }
2386 
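/*
 * Per-channel poller: completes write IOs finished by the core thread and
 * resubmits IOs from the retry queue.
 */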
2387 int
2388 ftl_io_channel_poll(void *arg)
2389 {
2390 	struct ftl_io_channel *ch = arg;
2391 	struct ftl_io *io;
2392 	TAILQ_HEAD(, ftl_io) retry_queue;
2393 
2394 	if (TAILQ_EMPTY(&ch->write_cmpl_queue) && TAILQ_EMPTY(&ch->retry_queue)) {
2395 		return SPDK_POLLER_IDLE;
2396 	}
2397 
2398 	while (!TAILQ_EMPTY(&ch->write_cmpl_queue)) {
2399 		io = TAILQ_FIRST(&ch->write_cmpl_queue);
2400 		TAILQ_REMOVE(&ch->write_cmpl_queue, io, ioch_entry);
2401 		ftl_io_complete(io);
2402 	}
2403 
2404 	/*
2405 	 * Take a local copy of the retry queue to prevent infinite retries in case an IO
2406 	 * gets inserted back onto the retry queue while it is being processed
2407 	 */
2408 	TAILQ_INIT(&retry_queue);
2409 	TAILQ_SWAP(&ch->retry_queue, &retry_queue, ftl_io, ioch_entry);
2410 
2411 	while (!TAILQ_EMPTY(&retry_queue)) {
2412 		io = TAILQ_FIRST(&retry_queue);
2413 		TAILQ_REMOVE(&retry_queue, io, ioch_entry);
2414 		if (io->type == FTL_IO_WRITE) {
2415 			ftl_io_write(io);
2416 		} else {
2417 			ftl_io_read(io);
2418 		}
2419 	}
2420 
2421 	return SPDK_POLLER_BUSY;
2422 }
2423 
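/*
 * Main poller of the core thread. Once the device is halted and shutdown has
 * completed, the poller unregisters itself; otherwise it processes writes and
 * relocations.
 */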
2424 int
2425 ftl_task_core(void *ctx)
2426 {
2427 	struct spdk_ftl_dev *dev = ctx;
2428 	bool busy;
2429 
2430 	if (dev->halt) {
2431 		if (ftl_shutdown_complete(dev)) {
2432 			spdk_poller_unregister(&dev->core_poller);
2433 			return SPDK_POLLER_IDLE;
2434 		}
2435 	}
2436 
2437 	busy = ftl_process_writes(dev) || ftl_process_relocs(dev);
2438 
2439 	return busy ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
2440 }
2441 
2442 SPDK_LOG_REGISTER_COMPONENT(ftl_core)
2443