xref: /spdk/lib/ftl/ftl_core.c (revision 19d5c3ed8e87dbd240c77ae0ddb5eda25ae99b5f)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/io_channel.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk/string.h"
40 #include "spdk_internal/log.h"
41 #include "spdk/ftl.h"
42 #include "spdk/crc32.h"
43 
44 #include "ftl_core.h"
45 #include "ftl_band.h"
46 #include "ftl_io.h"
47 #include "ftl_rwb.h"
48 #include "ftl_debug.h"
49 #include "ftl_reloc.h"
50 
51 struct ftl_band_flush {
52 	struct spdk_ftl_dev		*dev;
53 	/* Number of bands left to be flushed */
54 	size_t				num_bands;
55 	/* User callback */
56 	spdk_ftl_fn			cb_fn;
57 	/* Callback's argument */
58 	void				*cb_arg;
59 	/* List link */
60 	LIST_ENTRY(ftl_band_flush)	list_entry;
61 };
62 
63 struct ftl_wptr {
64 	/* Owner device */
65 	struct spdk_ftl_dev		*dev;
66 
67 	/* Current address */
68 	struct ftl_addr			addr;
69 
70 	/* Band currently being written to */
71 	struct ftl_band			*band;
72 
73 	/* Current logical block's offset */
74 	uint64_t			offset;
75 
76 	/* Current zone */
77 	struct ftl_zone			*zone;
78 
79 	/* Pending IO queue */
80 	TAILQ_HEAD(, ftl_io)		pending_queue;
81 
82 	/* List link */
83 	LIST_ENTRY(ftl_wptr)		list_entry;
84 
85 	/*
86 	 * If set up in direct mode, the offset and band state will not be updated after the IO.
87 	 * The zoned bdev address is not assigned by the wptr; it is taken directly
88 	 * from the request.
89 	 */
90 	bool				direct_mode;
91 
92 	/* Number of outstanding write requests */
93 	uint32_t			num_outstanding;
94 
95 	/* Marks that the band related to this wptr needs to be closed as soon as possible */
96 	bool				flush;
97 };
98 
99 struct ftl_flush {
100 	/* Owner device */
101 	struct spdk_ftl_dev		*dev;
102 
103 	/* Number of batches to wait for */
104 	size_t				num_req;
105 
106 	/* Callback */
107 	struct {
108 		spdk_ftl_fn		fn;
109 		void			*ctx;
110 	} cb;
111 
112 	/* Batch bitmap */
113 	struct spdk_bit_array		*bmap;
114 
115 	/* List link */
116 	LIST_ENTRY(ftl_flush)		list_entry;
117 };
118 
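/*
 * Extract the subset of ftl_io flags that is carried over to the write buffer (rwb)
 * entries created for the IO.
 */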
119 static int
120 ftl_rwb_flags_from_io(const struct ftl_io *io)
121 {
122 	int valid_flags = FTL_IO_INTERNAL | FTL_IO_WEAK | FTL_IO_PAD;
123 	return io->flags & valid_flags;
124 }
125 
126 static int
127 ftl_rwb_entry_weak(const struct ftl_rwb_entry *entry)
128 {
129 	return entry->flags & FTL_IO_WEAK;
130 }
131 
132 static void
133 ftl_wptr_free(struct ftl_wptr *wptr)
134 {
135 	if (!wptr) {
136 		return;
137 	}
138 
139 	free(wptr);
140 }
141 
142 static void
143 ftl_remove_wptr(struct ftl_wptr *wptr)
144 {
145 	struct spdk_ftl_dev *dev = wptr->dev;
146 	struct ftl_band_flush *flush, *tmp;
147 
148 	if (spdk_unlikely(wptr->flush)) {
149 		LIST_FOREACH_SAFE(flush, &dev->band_flush_list, list_entry, tmp) {
150 			assert(flush->num_bands > 0);
151 			if (--flush->num_bands == 0) {
152 				flush->cb_fn(flush->cb_arg, 0);
153 				LIST_REMOVE(flush, list_entry);
154 				free(flush);
155 			}
156 		}
157 	}
158 
159 	LIST_REMOVE(wptr, list_entry);
160 	ftl_wptr_free(wptr);
161 }
162 
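/*
 * Generic bdev completion callback for IOs submitted by the FTL core. For zone append
 * writes it propagates the append location to the parent IO, then drops the outstanding
 * request count and completes the ftl_io once all of its requests are done.
 */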
163 static void
164 ftl_io_cmpl_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
165 {
166 	struct ftl_io *io = cb_arg;
167 	struct spdk_ftl_dev *dev = io->dev;
168 
169 	if (spdk_unlikely(!success)) {
170 		io->status = -EIO;
171 	}
172 
173 	ftl_trace_completion(dev, io, FTL_TRACE_COMPLETION_DISK);
174 
175 	if (io->type == FTL_IO_WRITE && ftl_is_append_supported(dev)) {
176 		assert(io->parent);
177 		io->parent->addr.offset = spdk_bdev_io_get_append_location(bdev_io);
178 	}
179 
180 	ftl_io_dec_req(io);
181 	if (ftl_io_done(io)) {
182 		ftl_io_complete(io);
183 	}
184 
185 	spdk_bdev_free_io(bdev_io);
186 }
187 
188 static void
189 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
190 {
191 	struct ftl_wptr *wptr = NULL;
192 
193 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
194 		if (wptr->band == band) {
195 			break;
196 		}
197 	}
198 
199 	/* If the band already has the high_prio flag set, other writes must */
200 	/* have failed earlier, so it's already taken care of. */
201 	if (band->high_prio) {
202 		assert(wptr == NULL);
203 		return;
204 	}
205 
206 	ftl_band_write_failed(band);
207 	ftl_remove_wptr(wptr);
208 }
209 
210 static struct ftl_wptr *
211 ftl_wptr_from_band(struct ftl_band *band)
212 {
213 	struct spdk_ftl_dev *dev = band->dev;
214 	struct ftl_wptr *wptr = NULL;
215 
216 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
217 		if (wptr->band == band) {
218 			return wptr;
219 		}
220 	}
221 
222 	return NULL;
223 }
224 
225 static void
226 ftl_md_write_fail(struct ftl_io *io, int status)
227 {
228 	struct ftl_band *band = io->band;
229 	struct ftl_wptr *wptr;
230 	char buf[128];
231 
232 	wptr = ftl_wptr_from_band(band);
233 	assert(wptr);
234 
235 	SPDK_ERRLOG("Metadata write failed @addr: %s, status: %d\n",
236 		    ftl_addr2str(wptr->addr, buf, sizeof(buf)), status);
237 
238 	ftl_halt_writes(io->dev, band);
239 }
240 
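/*
 * Completion callback for band metadata (head/tail md) writes. Advances the band's state
 * machine and, once the band is closed, returns its user blocks to the non-volatile cache,
 * drops the relocation counters of bands whose data was moved onto it and removes the
 * write pointer.
 */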
241 static void
242 ftl_md_write_cb(struct ftl_io *io, void *arg, int status)
243 {
244 	struct spdk_ftl_dev *dev = io->dev;
245 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
246 	struct ftl_band *band = io->band;
247 	struct ftl_wptr *wptr;
248 	size_t id;
249 
250 	wptr = ftl_wptr_from_band(band);
251 	assert(wptr);
252 
253 	if (status) {
254 		ftl_md_write_fail(io, status);
255 		return;
256 	}
257 
258 	ftl_band_set_next_state(band);
259 	if (band->state == FTL_BAND_STATE_CLOSED) {
260 		if (ftl_dev_has_nv_cache(dev)) {
261 			pthread_spin_lock(&nv_cache->lock);
262 			nv_cache->num_available += ftl_band_user_blocks(band);
263 
264 			if (spdk_unlikely(nv_cache->num_available > nv_cache->num_data_blocks)) {
265 				nv_cache->num_available = nv_cache->num_data_blocks;
266 			}
267 			pthread_spin_unlock(&nv_cache->lock);
268 		}
269 
270 		/*
271 		 * Go through the reloc_bitmap, checking for all the bands that had their data moved
272 		 * onto the current band, and update their counters to allow them to be used for
273 		 * writing (once they're closed and empty).
274 		 */
275 		for (id = 0; id < ftl_get_num_bands(dev); ++id) {
276 			if (spdk_bit_array_get(band->reloc_bitmap, id)) {
277 				assert(dev->bands[id].num_reloc_bands > 0);
278 				dev->bands[id].num_reloc_bands--;
279 
280 				spdk_bit_array_clear(band->reloc_bitmap, id);
281 			}
282 		}
283 
284 		ftl_remove_wptr(wptr);
285 	}
286 }
287 
288 static int
289 ftl_read_next_physical_addr(struct ftl_io *io, struct ftl_addr *addr)
290 {
291 	struct spdk_ftl_dev *dev = io->dev;
292 	size_t num_blocks, max_blocks;
293 
294 	assert(ftl_io_mode_physical(io));
295 	assert(io->iov_pos < io->iov_cnt);
296 
297 	if (io->pos == 0) {
298 		*addr = io->addr;
299 	} else {
300 		*addr = ftl_band_next_xfer_addr(io->band, io->addr, io->pos);
301 	}
302 
303 	assert(!ftl_addr_invalid(*addr));
304 
305 	/* Metadata has to be read in the way it's written (jumping across */
306 	/* the zones in xfer_size increments) */
307 	if (io->flags & FTL_IO_MD) {
308 		max_blocks = dev->xfer_size - (addr->offset % dev->xfer_size);
309 		num_blocks = spdk_min(ftl_io_iovec_len_left(io), max_blocks);
310 		assert(addr->offset / dev->xfer_size ==
311 		       (addr->offset + num_blocks - 1) / dev->xfer_size);
312 	} else {
313 		num_blocks = ftl_io_iovec_len_left(io);
314 	}
315 
316 	return num_blocks;
317 }
318 
319 static int
320 ftl_wptr_close_band(struct ftl_wptr *wptr)
321 {
322 	struct ftl_band *band = wptr->band;
323 
324 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
325 
326 	return ftl_band_write_tail_md(band, ftl_md_write_cb);
327 }
328 
329 static int
330 ftl_wptr_open_band(struct ftl_wptr *wptr)
331 {
332 	struct ftl_band *band = wptr->band;
333 
334 	assert(ftl_band_zone_is_first(band, wptr->zone));
335 	assert(band->lba_map.num_vld == 0);
336 
337 	ftl_band_clear_lba_map(band);
338 
339 	assert(band->state == FTL_BAND_STATE_PREP);
340 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
341 
342 	return ftl_band_write_head_md(band, ftl_md_write_cb);
343 }
344 
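/*
 * Issue a zone reset for every zone covered by the erase IO (io->num_blocks zones,
 * starting at io->addr). The IO is completed once all resets are done, or on the
 * first submission failure.
 */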
345 static int
346 ftl_submit_erase(struct ftl_io *io)
347 {
348 	struct spdk_ftl_dev *dev = io->dev;
349 	struct ftl_band *band = io->band;
350 	struct ftl_addr addr = io->addr;
351 	struct ftl_io_channel *ioch;
352 	struct ftl_zone *zone;
353 	int rc = 0;
354 	size_t i;
355 
356 	ioch = spdk_io_channel_get_ctx(ftl_get_io_channel(dev));
357 
358 	for (i = 0; i < io->num_blocks; ++i) {
359 		if (i != 0) {
360 			zone = ftl_band_next_zone(band, ftl_band_zone_from_addr(band, addr));
361 			assert(zone->info.state == SPDK_BDEV_ZONE_STATE_FULL);
362 			addr.offset = zone->info.zone_id;
363 		}
364 
365 		assert(ftl_addr_get_zone_offset(dev, addr) == 0);
366 
367 		ftl_trace_submission(dev, io, addr, 1);
368 		rc = spdk_bdev_zone_management(dev->base_bdev_desc, ioch->base_ioch, addr.offset,
369 					       SPDK_BDEV_ZONE_RESET, ftl_io_cmpl_cb, io);
370 		if (spdk_unlikely(rc)) {
371 			ftl_io_fail(io, rc);
372 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
373 			break;
374 		}
375 
376 		ftl_io_inc_req(io);
377 		ftl_io_advance(io, 1);
378 	}
379 
380 	if (ftl_io_done(io)) {
381 		ftl_io_complete(io);
382 	}
383 
384 	return rc;
385 }
386 
387 static bool
388 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
389 {
390 	return dev->core_thread.thread == spdk_get_thread();
391 }
392 
393 struct spdk_io_channel *
394 ftl_get_io_channel(const struct spdk_ftl_dev *dev)
395 {
396 	if (ftl_check_core_thread(dev)) {
397 		return dev->core_thread.ioch;
398 	}
399 
400 	return NULL;
401 }
402 
403 static void
404 ftl_erase_fail(struct ftl_io *io, int status)
405 {
406 	struct ftl_zone *zone;
407 	struct ftl_band *band = io->band;
408 	char buf[128];
409 
410 	SPDK_ERRLOG("Erase failed at address: %s, status: %d\n",
411 		    ftl_addr2str(io->addr, buf, sizeof(buf)), status);
412 
413 	zone = ftl_band_zone_from_addr(band, io->addr);
414 	zone->info.state = SPDK_BDEV_ZONE_STATE_OFFLINE;
415 	ftl_band_remove_zone(band, zone);
416 	band->tail_md_addr = ftl_band_tail_md_addr(band);
417 }
418 
419 static void
420 ftl_zone_erase_cb(struct ftl_io *io, void *ctx, int status)
421 {
422 	struct ftl_zone *zone;
423 
424 	zone = ftl_band_zone_from_addr(io->band, io->addr);
425 	zone->busy = false;
426 
427 	if (spdk_unlikely(status)) {
428 		ftl_erase_fail(io, status);
429 		return;
430 	}
431 
432 	zone->info.state = SPDK_BDEV_ZONE_STATE_EMPTY;
433 	zone->info.write_pointer = zone->info.zone_id;
434 }
435 
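/*
 * Move the band into the PREP state and submit a reset for every zone that isn't
 * already empty. Individual resets are completed via ftl_zone_erase_cb().
 */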
436 static int
437 ftl_band_erase(struct ftl_band *band)
438 {
439 	struct ftl_zone *zone;
440 	struct ftl_io *io;
441 	int rc = 0;
442 
443 	assert(band->state == FTL_BAND_STATE_CLOSED ||
444 	       band->state == FTL_BAND_STATE_FREE);
445 
446 	ftl_band_set_state(band, FTL_BAND_STATE_PREP);
447 
448 	CIRCLEQ_FOREACH(zone, &band->zones, circleq) {
449 		if (zone->info.state == SPDK_BDEV_ZONE_STATE_EMPTY) {
450 			continue;
451 		}
452 
453 		io = ftl_io_erase_init(band, 1, ftl_zone_erase_cb);
454 		if (!io) {
455 			rc = -ENOMEM;
456 			break;
457 		}
458 
459 		zone->busy = true;
460 		io->addr.offset = zone->info.zone_id;
461 		rc = ftl_submit_erase(io);
462 		if (rc) {
463 			zone->busy = false;
464 			assert(0);
465 			/* TODO: change band's state back to close? */
466 			break;
467 		}
468 	}
469 
470 	return rc;
471 }
472 
473 static struct ftl_band *
474 ftl_next_write_band(struct spdk_ftl_dev *dev)
475 {
476 	struct ftl_band *band;
477 
478 	/* Find a free band that has all of its data moved onto other closed bands */
479 	LIST_FOREACH(band, &dev->free_bands, list_entry) {
480 		assert(band->state == FTL_BAND_STATE_FREE);
481 		if (band->num_reloc_bands == 0 && band->num_reloc_blocks == 0) {
482 			break;
483 		}
484 	}
485 
486 	if (spdk_unlikely(!band)) {
487 		return NULL;
488 	}
489 
490 	if (ftl_band_erase(band)) {
491 		/* TODO: handle erase failure */
492 		return NULL;
493 	}
494 
495 	return band;
496 }
497 
498 static struct ftl_band *
499 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
500 {
501 	struct ftl_band *band;
502 
503 	if (!dev->next_band) {
504 		band = ftl_next_write_band(dev);
505 	} else {
506 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
507 		band = dev->next_band;
508 		dev->next_band = NULL;
509 	}
510 
511 	return band;
512 }
513 
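/*
 * Allocate a write pointer for the given band, positioned at the beginning of its
 * first zone.
 */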
514 static struct ftl_wptr *
515 ftl_wptr_init(struct ftl_band *band)
516 {
517 	struct spdk_ftl_dev *dev = band->dev;
518 	struct ftl_wptr *wptr;
519 
520 	wptr = calloc(1, sizeof(*wptr));
521 	if (!wptr) {
522 		return NULL;
523 	}
524 
525 	wptr->dev = dev;
526 	wptr->band = band;
527 	wptr->zone = CIRCLEQ_FIRST(&band->zones);
528 	wptr->addr.offset = wptr->zone->info.zone_id;
529 	TAILQ_INIT(&wptr->pending_queue);
530 
531 	return wptr;
532 }
533 
534 static int
535 ftl_add_direct_wptr(struct ftl_band *band)
536 {
537 	struct spdk_ftl_dev *dev = band->dev;
538 	struct ftl_wptr *wptr;
539 
540 	assert(band->state == FTL_BAND_STATE_OPEN);
541 
542 	wptr = ftl_wptr_init(band);
543 	if (!wptr) {
544 		return -1;
545 	}
546 
547 	wptr->direct_mode = true;
548 
549 	if (ftl_band_alloc_lba_map(band)) {
550 		ftl_wptr_free(wptr);
551 		return -1;
552 	}
553 
554 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
555 
556 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: direct band %u\n", band->id);
557 	ftl_trace_write_band(dev, band);
558 	return 0;
559 }
560 
561 static void
562 ftl_close_direct_wptr(struct ftl_band *band)
563 {
564 	struct ftl_wptr *wptr = ftl_wptr_from_band(band);
565 
566 	assert(wptr);
567 	assert(wptr->direct_mode);
568 	assert(band->state == FTL_BAND_STATE_CLOSED);
569 
570 	ftl_band_release_lba_map(band);
571 
572 	ftl_remove_wptr(wptr);
573 }
574 
575 int
576 ftl_band_set_direct_access(struct ftl_band *band, bool access)
577 {
578 	if (access) {
579 		return ftl_add_direct_wptr(band);
580 	} else {
581 		ftl_close_direct_wptr(band);
582 		return 0;
583 	}
584 }
585 
586 static int
587 ftl_add_wptr(struct spdk_ftl_dev *dev)
588 {
589 	struct ftl_band *band;
590 	struct ftl_wptr *wptr;
591 
592 	band = ftl_next_wptr_band(dev);
593 	if (!band) {
594 		return -1;
595 	}
596 
597 	wptr = ftl_wptr_init(band);
598 	if (!wptr) {
599 		return -1;
600 	}
601 
602 	if (ftl_band_write_prep(band)) {
603 		ftl_wptr_free(wptr);
604 		return -1;
605 	}
606 
607 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
608 
609 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: band %u\n", band->id);
610 	ftl_trace_write_band(dev, band);
611 	return 0;
612 }
613 
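/*
 * Advance the write pointer by xfer_size blocks: mark the band full when its user area
 * is exhausted, move on to the next operational zone and, once the configured band
 * threshold is crossed, prepare the next band for writing.
 */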
614 static void
615 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
616 {
617 	struct ftl_band *band = wptr->band;
618 	struct spdk_ftl_dev *dev = wptr->dev;
619 	struct spdk_ftl_conf *conf = &dev->conf;
620 	size_t next_thld;
621 
622 	if (spdk_unlikely(wptr->direct_mode)) {
623 		return;
624 	}
625 
626 	wptr->offset += xfer_size;
627 	next_thld = (ftl_band_num_usable_blocks(band) * conf->band_thld) / 100;
628 
629 	if (ftl_band_full(band, wptr->offset)) {
630 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
631 	}
632 
633 	wptr->zone->busy = true;
634 	wptr->addr = ftl_band_next_xfer_addr(band, wptr->addr, xfer_size);
635 	wptr->zone = ftl_band_next_operational_zone(band, wptr->zone);
636 
637 	assert(!ftl_addr_invalid(wptr->addr));
638 
639 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: pu:%lu band:%lu, offset:%lu\n",
640 		      ftl_addr_get_punit(dev, wptr->addr),
641 		      ftl_addr_get_band(dev, wptr->addr),
642 		      wptr->addr.offset);
643 
644 	if (wptr->offset >= next_thld && !dev->next_band) {
645 		dev->next_band = ftl_next_write_band(dev);
646 	}
647 }
648 
649 static size_t
650 ftl_wptr_user_blocks_left(const struct ftl_wptr *wptr)
651 {
652 	return ftl_band_user_blocks_left(wptr->band, wptr->offset);
653 }
654 
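/*
 * Check whether the write pointer can accept the next batch of data. Takes care of
 * opening/closing the band when necessary and skips zones that went offline after
 * the band was erased.
 */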
655 static bool
656 ftl_wptr_ready(struct ftl_wptr *wptr)
657 {
658 	struct ftl_band *band = wptr->band;
659 
660 	/* TODO: add handling of empty bands */
661 
662 	if (spdk_unlikely(!ftl_zone_is_writable(wptr->zone))) {
663 		/* Erasing the band may fail after it has been assigned to the wptr. */
664 		if (spdk_unlikely(wptr->zone->info.state == SPDK_BDEV_ZONE_STATE_OFFLINE)) {
665 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
666 		}
667 		return false;
668 	}
669 
670 	/* If we're in the process of writing metadata, wait till it is */
671 	/* completed. */
672 	/* TODO: we should probably change bands once we're writing tail md */
673 	if (ftl_band_state_changing(band)) {
674 		return false;
675 	}
676 
677 	if (band->state == FTL_BAND_STATE_FULL) {
678 		if (wptr->num_outstanding == 0) {
679 			if (ftl_wptr_close_band(wptr)) {
680 				/* TODO: need recovery here */
681 				assert(false);
682 			}
683 		}
684 
685 		return false;
686 	}
687 
688 	if (band->state != FTL_BAND_STATE_OPEN) {
689 		if (ftl_wptr_open_band(wptr)) {
690 			/* TODO: need recovery here */
691 			assert(false);
692 		}
693 
694 		return false;
695 	}
696 
697 	return true;
698 }
699 
700 int
701 ftl_flush_active_bands(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
702 {
703 	struct ftl_wptr *wptr;
704 	struct ftl_band_flush *flush;
705 
706 	assert(ftl_get_core_thread(dev) == spdk_get_thread());
707 
708 	flush = calloc(1, sizeof(*flush));
709 	if (spdk_unlikely(!flush)) {
710 		return -ENOMEM;
711 	}
712 
713 	LIST_INSERT_HEAD(&dev->band_flush_list, flush, list_entry);
714 
715 	flush->cb_fn = cb_fn;
716 	flush->cb_arg = cb_arg;
717 	flush->dev = dev;
718 
719 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
720 		wptr->flush = true;
721 		flush->num_bands++;
722 	}
723 
724 	return 0;
725 }
726 
727 static const struct spdk_ftl_limit *
728 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
729 {
730 	assert(type < SPDK_FTL_LIMIT_MAX);
731 	return &dev->conf.limits[type];
732 }
733 
734 static bool
735 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
736 {
737 	struct ftl_addr addr;
738 
739 	/* If the LBA is invalid don't bother checking the md and l2p */
740 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
741 		return false;
742 	}
743 
744 	addr = ftl_l2p_get(dev, entry->lba);
745 	if (!(ftl_addr_cached(addr) && addr.cache_offset == entry->pos)) {
746 		return false;
747 	}
748 
749 	return true;
750 }
751 
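/*
 * Evict a write buffer entry: if the L2P still points at the entry, replace it with
 * the entry's on-disk address before clearing the entry's valid/cache state.
 */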
752 static void
753 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
754 {
755 	pthread_spin_lock(&entry->lock);
756 
757 	if (!ftl_rwb_entry_valid(entry)) {
758 		goto unlock;
759 	}
760 
761 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
762 	/* on-disk address and clear the cache status bit. Otherwise, skip the l2p update */
763 	/* and just clear the cache status. */
764 	if (!ftl_cache_lba_valid(dev, entry)) {
765 		goto clear;
766 	}
767 
768 	ftl_l2p_set(dev, entry->lba, entry->addr);
769 clear:
770 	ftl_rwb_entry_invalidate(entry);
771 unlock:
772 	pthread_spin_unlock(&entry->lock);
773 }
774 
775 static struct ftl_rwb_entry *
776 ftl_acquire_entry(struct spdk_ftl_dev *dev, int flags)
777 {
778 	struct ftl_rwb_entry *entry;
779 
780 	entry = ftl_rwb_acquire(dev->rwb, ftl_rwb_type_from_flags(flags));
781 	if (!entry) {
782 		return NULL;
783 	}
784 
785 	ftl_evict_cache_entry(dev, entry);
786 
787 	entry->flags = flags;
788 	return entry;
789 }
790 
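/*
 * Fill the write buffer with up to the requested number of zeroed padding entries
 * (marked with FTL_IO_PAD and an invalid LBA) to force out partially filled batches.
 */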
791 static void
792 ftl_rwb_pad(struct spdk_ftl_dev *dev, size_t size)
793 {
794 	struct ftl_rwb_entry *entry;
795 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
796 
797 	for (size_t i = 0; i < size; ++i) {
798 		entry = ftl_acquire_entry(dev, flags);
799 		if (!entry) {
800 			break;
801 		}
802 
803 		entry->lba = FTL_LBA_INVALID;
804 		entry->addr = ftl_to_addr(FTL_ADDR_INVALID);
805 		memset(entry->data, 0, FTL_BLOCK_SIZE);
806 		ftl_rwb_push(entry);
807 	}
808 }
809 
810 static void
811 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
812 {
813 	while (!LIST_EMPTY(&dev->free_bands)) {
814 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
815 	}
816 
817 	dev->next_band = NULL;
818 }
819 
820 static void
821 ftl_wptr_pad_band(struct ftl_wptr *wptr)
822 {
823 	struct spdk_ftl_dev *dev = wptr->dev;
824 	size_t size = ftl_rwb_num_pending(dev->rwb);
825 	size_t blocks_left, rwb_size, pad_size;
826 
827 	blocks_left = ftl_wptr_user_blocks_left(wptr);
828 	assert(size <= blocks_left);
829 	assert(blocks_left % dev->xfer_size == 0);
830 	rwb_size = ftl_rwb_size(dev->rwb) - size;
831 	pad_size = spdk_min(blocks_left - size, rwb_size);
832 
833 	/* Pad write buffer until band is full */
834 	ftl_rwb_pad(dev, pad_size);
835 }
836 
837 static void
838 ftl_wptr_process_shutdown(struct ftl_wptr *wptr)
839 {
840 	struct spdk_ftl_dev *dev = wptr->dev;
841 	size_t size = ftl_rwb_num_pending(dev->rwb);
842 	size_t num_active = dev->xfer_size * ftl_rwb_get_active_batches(dev->rwb);
843 
844 	num_active = num_active ? num_active : dev->xfer_size;
845 	if (size >= num_active) {
846 		return;
847 	}
848 
849 	/* If we reach this point, we need to remove the free bands */
850 	/* and pad the current wptr band to the end */
851 	if (ftl_rwb_get_active_batches(dev->rwb) <= 1) {
852 		ftl_remove_free_bands(dev);
853 	}
854 
855 	ftl_wptr_pad_band(wptr);
856 }
857 
858 static int
859 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
860 {
861 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
862 	       LIST_EMPTY(&dev->wptr_list) && TAILQ_EMPTY(&dev->retry_queue);
863 }
864 
865 void
866 ftl_apply_limits(struct spdk_ftl_dev *dev)
867 {
868 	const struct spdk_ftl_limit *limit;
869 	struct ftl_stats *stats = &dev->stats;
870 	size_t rwb_limit[FTL_RWB_TYPE_MAX];
871 	int i;
872 
873 	ftl_rwb_get_limits(dev->rwb, rwb_limit);
874 
875 	/* Clear existing limit */
876 	dev->limit = SPDK_FTL_LIMIT_MAX;
877 
878 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
879 		limit = ftl_get_limit(dev, i);
880 
881 		if (dev->num_free <= limit->thld) {
882 			rwb_limit[FTL_RWB_TYPE_USER] =
883 				(limit->limit * ftl_rwb_entry_cnt(dev->rwb)) / 100;
884 			stats->limits[i]++;
885 			dev->limit = i;
886 			goto apply;
887 		}
888 	}
889 
890 	/* Clear the limits, since we don't need to apply them anymore */
891 	rwb_limit[FTL_RWB_TYPE_USER] = ftl_rwb_entry_cnt(dev->rwb);
892 apply:
893 	ftl_trace_limits(dev, rwb_limit, dev->num_free);
894 	ftl_rwb_set_limits(dev->rwb, rwb_limit);
895 }
896 
897 static int
898 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_addr addr)
899 {
900 	struct ftl_band *band = ftl_band_from_addr(dev, addr);
901 	struct ftl_lba_map *lba_map = &band->lba_map;
902 	uint64_t offset;
903 
904 	offset = ftl_band_block_offset_from_addr(band, addr);
905 
906 	/* The bit might already be cleared if two writes are scheduled to the */
907 	/* same LBA at the same time */
908 	if (spdk_bit_array_get(lba_map->vld, offset)) {
909 		assert(lba_map->num_vld > 0);
910 		spdk_bit_array_clear(lba_map->vld, offset);
911 		lba_map->num_vld--;
912 		return 1;
913 	}
914 
915 	return 0;
916 }
917 
918 int
919 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_addr addr)
920 {
921 	struct ftl_band *band;
922 	int rc;
923 
924 	assert(!ftl_addr_cached(addr));
925 	band = ftl_band_from_addr(dev, addr);
926 
927 	pthread_spin_lock(&band->lba_map.lock);
928 	rc = ftl_invalidate_addr_unlocked(dev, addr);
929 	pthread_spin_unlock(&band->lba_map.lock);
930 
931 	return rc;
932 }
933 
934 static int
935 ftl_read_retry(int rc)
936 {
937 	return rc == -EAGAIN;
938 }
939 
940 static int
941 ftl_read_canceled(int rc)
942 {
943 	return rc == -EFAULT || rc == 0;
944 }
945 
946 static void
947 ftl_add_to_retry_queue(struct ftl_io *io)
948 {
949 	if (!(io->flags & FTL_IO_RETRY)) {
950 		io->flags |= FTL_IO_RETRY;
951 		TAILQ_INSERT_TAIL(&io->dev->retry_queue, io, retry_entry);
952 	}
953 }
954 
955 static int
956 ftl_cache_read(struct ftl_io *io, uint64_t lba,
957 	       struct ftl_addr addr, void *buf)
958 {
959 	struct ftl_rwb *rwb = io->dev->rwb;
960 	struct ftl_rwb_entry *entry;
961 	struct ftl_addr naddr;
962 	int rc = 0;
963 
964 	entry = ftl_rwb_entry_from_offset(rwb, addr.cache_offset);
965 	pthread_spin_lock(&entry->lock);
966 
967 	naddr = ftl_l2p_get(io->dev, lba);
968 	if (addr.offset != naddr.offset) {
969 		rc = -1;
970 		goto out;
971 	}
972 
973 	memcpy(buf, entry->data, FTL_BLOCK_SIZE);
974 out:
975 	pthread_spin_unlock(&entry->lock);
976 	return rc;
977 }
978 
979 static int
980 ftl_read_next_logical_addr(struct ftl_io *io, struct ftl_addr *addr)
981 {
982 	struct spdk_ftl_dev *dev = io->dev;
983 	struct ftl_addr next_addr;
984 	size_t i;
985 
986 	*addr = ftl_l2p_get(dev, ftl_io_current_lba(io));
987 
988 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Read addr:%lx, lba:%lu\n",
989 		      addr->offset, ftl_io_current_lba(io));
990 
991 	/* If the address is invalid, skip it (the buffer should already be zero'ed) */
992 	if (ftl_addr_invalid(*addr)) {
993 		return -EFAULT;
994 	}
995 
996 	if (ftl_addr_cached(*addr)) {
997 		if (!ftl_cache_read(io, ftl_io_current_lba(io), *addr, ftl_io_iovec_addr(io))) {
998 			return 0;
999 		}
1000 
1001 		/* If the state changed, we have to re-read the l2p */
1002 		return -EAGAIN;
1003 	}
1004 
1005 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
1006 		next_addr = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));
1007 
1008 		if (ftl_addr_invalid(next_addr) || ftl_addr_cached(next_addr)) {
1009 			break;
1010 		}
1011 
1012 		if (addr->offset + i != next_addr.offset) {
1013 			break;
1014 		}
1015 	}
1016 
1017 	return i;
1018 }
1019 
1020 static int
1021 ftl_submit_read(struct ftl_io *io)
1022 {
1023 	struct spdk_ftl_dev *dev = io->dev;
1024 	struct ftl_io_channel *ioch;
1025 	struct ftl_addr addr;
1026 	int rc = 0, num_blocks;
1027 
1028 	ioch = spdk_io_channel_get_ctx(io->ioch);
1029 
1030 	assert(LIST_EMPTY(&io->children));
1031 
1032 	while (io->pos < io->num_blocks) {
1033 		if (ftl_io_mode_physical(io)) {
1034 			num_blocks = rc = ftl_read_next_physical_addr(io, &addr);
1035 		} else {
1036 			num_blocks = rc = ftl_read_next_logical_addr(io, &addr);
1037 		}
1038 
1039 		/* We might need to retry the read from scratch (e.g. */
1040 		/* because a write was under way and completed before */
1041 		/* we could read it from the rwb) */
1042 		if (ftl_read_retry(rc)) {
1043 			continue;
1044 		}
1045 
1046 		/* We don't have to schedule the read, as it was read from cache */
1047 		if (ftl_read_canceled(rc)) {
1048 			ftl_io_advance(io, 1);
1049 			ftl_trace_completion(io->dev, io, rc ? FTL_TRACE_COMPLETION_INVALID :
1050 					     FTL_TRACE_COMPLETION_CACHE);
1051 			rc = 0;
1052 			continue;
1053 		}
1054 
1055 		assert(num_blocks > 0);
1056 
1057 		ftl_trace_submission(dev, io, addr, num_blocks);
1058 		rc = spdk_bdev_read_blocks(dev->base_bdev_desc, ioch->base_ioch,
1059 					   ftl_io_iovec_addr(io),
1060 					   addr.offset,
1061 					   num_blocks, ftl_io_cmpl_cb, io);
1062 		if (spdk_unlikely(rc)) {
1063 			if (rc == -ENOMEM) {
1064 				ftl_add_to_retry_queue(io);
1065 			} else {
1066 				ftl_io_fail(io, rc);
1067 			}
1068 			break;
1069 		}
1070 
1071 		ftl_io_inc_req(io);
1072 		ftl_io_advance(io, num_blocks);
1073 	}
1074 
1075 	/* If we didn't have to read anything from the device, */
1076 	/* complete the request right away */
1077 	if (ftl_io_done(io)) {
1078 		ftl_io_complete(io);
1079 	}
1080 
1081 	return rc;
1082 }
1083 
1084 static void
1085 ftl_complete_flush(struct ftl_flush *flush)
1086 {
1087 	assert(flush->num_req == 0);
1088 	LIST_REMOVE(flush, list_entry);
1089 
1090 	flush->cb.fn(flush->cb.ctx, 0);
1091 
1092 	spdk_bit_array_free(&flush->bmap);
1093 	free(flush);
1094 }
1095 
1096 static void
1097 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_rwb_batch *batch)
1098 {
1099 	struct ftl_flush *flush, *tflush;
1100 	size_t offset;
1101 
1102 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
1103 		offset = ftl_rwb_batch_get_offset(batch);
1104 
1105 		if (spdk_bit_array_get(flush->bmap, offset)) {
1106 			spdk_bit_array_clear(flush->bmap, offset);
1107 			if (!(--flush->num_req)) {
1108 				ftl_complete_flush(flush);
1109 			}
1110 		}
1111 	}
1112 }
1113 
1114 static void
1115 ftl_nv_cache_wrap_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1116 {
1117 	struct ftl_nv_cache *nv_cache = cb_arg;
1118 
1119 	if (!success) {
1120 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header\n");
1121 		/* TODO: go into read-only mode */
1122 		assert(0);
1123 	}
1124 
1125 	pthread_spin_lock(&nv_cache->lock);
1126 	nv_cache->ready = true;
1127 	pthread_spin_unlock(&nv_cache->lock);
1128 
1129 	spdk_bdev_free_io(bdev_io);
1130 }
1131 
1132 static void
1133 ftl_nv_cache_wrap(void *ctx)
1134 {
1135 	struct ftl_nv_cache *nv_cache = ctx;
1136 	int rc;
1137 
1138 	rc = ftl_nv_cache_write_header(nv_cache, false, ftl_nv_cache_wrap_cb, nv_cache);
1139 	if (spdk_unlikely(rc != 0)) {
1140 		SPDK_ERRLOG("Unable to write non-volatile cache metadata header: %s\n",
1141 			    spdk_strerror(-rc));
1142 		/* TODO: go into read-only mode */
1143 		assert(0);
1144 	}
1145 }
1146 
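/*
 * Reserve up to *num_blocks blocks of the non-volatile cache. Returns the start address
 * of the reserved range (or FTL_LBA_INVALID if the cache is full or not ready) and
 * reports the current phase. When the end of the cache bdev is reached, the cache wraps
 * around and a metadata header update is scheduled on the core thread.
 */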
1147 static uint64_t
1148 ftl_reserve_nv_cache(struct ftl_nv_cache *nv_cache, size_t *num_blocks, unsigned int *phase)
1149 {
1150 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1151 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1152 	uint64_t num_available, cache_size, cache_addr = FTL_LBA_INVALID;
1153 
1154 	cache_size = spdk_bdev_get_num_blocks(bdev);
1155 
1156 	pthread_spin_lock(&nv_cache->lock);
1157 	if (spdk_unlikely(nv_cache->num_available == 0 || !nv_cache->ready)) {
1158 		goto out;
1159 	}
1160 
1161 	num_available = spdk_min(nv_cache->num_available, *num_blocks);
1162 	num_available = spdk_min(num_available, dev->conf.nv_cache.max_request_cnt);
1163 
1164 	if (spdk_unlikely(nv_cache->current_addr + num_available > cache_size)) {
1165 		*num_blocks = cache_size - nv_cache->current_addr;
1166 	} else {
1167 		*num_blocks = num_available;
1168 	}
1169 
1170 	cache_addr = nv_cache->current_addr;
1171 	nv_cache->current_addr += *num_blocks;
1172 	nv_cache->num_available -= *num_blocks;
1173 	*phase = nv_cache->phase;
1174 
1175 	if (nv_cache->current_addr == spdk_bdev_get_num_blocks(bdev)) {
1176 		nv_cache->current_addr = FTL_NV_CACHE_DATA_OFFSET;
1177 		nv_cache->phase = ftl_nv_cache_next_phase(nv_cache->phase);
1178 		nv_cache->ready = false;
1179 		spdk_thread_send_msg(ftl_get_core_thread(dev), ftl_nv_cache_wrap, nv_cache);
1180 	}
1181 out:
1182 	pthread_spin_unlock(&nv_cache->lock);
1183 	return cache_addr;
1184 }
1185 
1186 static struct ftl_io *
1187 ftl_alloc_io_nv_cache(struct ftl_io *parent, size_t num_blocks)
1188 {
1189 	struct ftl_io_init_opts opts = {
1190 		.dev		= parent->dev,
1191 		.parent		= parent,
1192 		.data		= ftl_io_iovec_addr(parent),
1193 		.num_blocks	= num_blocks,
1194 		.flags		= parent->flags | FTL_IO_CACHE,
1195 	};
1196 
1197 	return ftl_io_init_internal(&opts);
1198 }
1199 
1200 static void
1201 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1202 {
1203 	struct ftl_io *io = cb_arg;
1204 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1205 
1206 	if (spdk_unlikely(!success)) {
1207 		SPDK_ERRLOG("Non-volatile cache write failed at %"PRIx64"\n", io->addr.offset);
1208 		io->status = -EIO;
1209 	}
1210 
1211 	ftl_io_dec_req(io);
1212 	if (ftl_io_done(io)) {
1213 		spdk_mempool_put(nv_cache->md_pool, io->md);
1214 		ftl_io_complete(io);
1215 	}
1216 
1217 	spdk_bdev_free_io(bdev_io);
1218 }
1219 
1220 static void
1221 ftl_submit_nv_cache(void *ctx)
1222 {
1223 	struct ftl_io *io = ctx;
1224 	struct spdk_ftl_dev *dev = io->dev;
1225 	struct spdk_thread *thread;
1226 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1227 	struct ftl_io_channel *ioch;
1228 	int rc;
1229 
1230 	ioch = spdk_io_channel_get_ctx(io->ioch);
1231 	thread = spdk_io_channel_get_thread(io->ioch);
1232 
1233 	rc = spdk_bdev_write_blocks_with_md(nv_cache->bdev_desc, ioch->cache_ioch,
1234 					    ftl_io_iovec_addr(io), io->md, io->addr.offset,
1235 					    io->num_blocks, ftl_nv_cache_submit_cb, io);
1236 	if (rc == -ENOMEM) {
1237 		spdk_thread_send_msg(thread, ftl_submit_nv_cache, io);
1238 		return;
1239 	} else if (rc) {
1240 		SPDK_ERRLOG("Write to persistent cache failed: %s (%"PRIu64", %"PRIu64")\n",
1241 			    spdk_strerror(-rc), io->addr.offset, io->num_blocks);
1242 		spdk_mempool_put(nv_cache->md_pool, io->md);
1243 		io->status = -EIO;
1244 		ftl_io_complete(io);
1245 		return;
1246 	}
1247 
1248 	ftl_io_advance(io, io->num_blocks);
1249 	ftl_io_inc_req(io);
1250 }
1251 
1252 static void
1253 ftl_nv_cache_fill_md(struct ftl_io *io, unsigned int phase)
1254 {
1255 	struct spdk_bdev *bdev;
1256 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1257 	uint64_t block_off, lba;
1258 	void *md_buf = io->md;
1259 
1260 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1261 
1262 	for (block_off = 0; block_off < io->num_blocks; ++block_off) {
1263 		lba = ftl_nv_cache_pack_lba(ftl_io_get_lba(io, block_off), phase);
1264 		memcpy(md_buf, &lba, sizeof(lba));
1265 		md_buf += spdk_bdev_get_md_size(bdev);
1266 	}
1267 }
1268 
1269 static void
1270 _ftl_write_nv_cache(void *ctx)
1271 {
1272 	struct ftl_io *child, *io = ctx;
1273 	struct spdk_ftl_dev *dev = io->dev;
1274 	struct spdk_thread *thread;
1275 	unsigned int phase;
1276 	uint64_t num_blocks;
1277 
1278 	thread = spdk_io_channel_get_thread(io->ioch);
1279 
1280 	while (io->pos < io->num_blocks) {
1281 		num_blocks = ftl_io_iovec_len_left(io);
1282 
1283 		child = ftl_alloc_io_nv_cache(io, num_blocks);
1284 		if (spdk_unlikely(!child)) {
1285 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1286 			return;
1287 		}
1288 
1289 		child->md = spdk_mempool_get(dev->nv_cache.md_pool);
1290 		if (spdk_unlikely(!child->md)) {
1291 			ftl_io_free(child);
1292 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1293 			break;
1294 		}
1295 
1296 		/* Reserve an area in the non-volatile cache */
1297 		child->addr.offset = ftl_reserve_nv_cache(&dev->nv_cache, &num_blocks, &phase);
1298 		if (child->addr.offset == FTL_LBA_INVALID) {
1299 			spdk_mempool_put(dev->nv_cache.md_pool, child->md);
1300 			ftl_io_free(child);
1301 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1302 			break;
1303 		}
1304 
1305 		/* Shrink the IO if there isn't enough room in the cache to fill the whole iovec */
1306 		if (spdk_unlikely(num_blocks != ftl_io_iovec_len_left(io))) {
1307 			ftl_io_shrink_iovec(child, num_blocks);
1308 		}
1309 
1310 		ftl_nv_cache_fill_md(child, phase);
1311 		ftl_submit_nv_cache(child);
1312 	}
1313 
1314 	if (ftl_io_done(io)) {
1315 		ftl_io_complete(io);
1316 	}
1317 }
1318 
1319 static void
1320 ftl_write_nv_cache(struct ftl_io *parent)
1321 {
1322 	ftl_io_reset(parent);
1323 	parent->flags |= FTL_IO_CACHE;
1324 	_ftl_write_nv_cache(parent);
1325 }
1326 
1327 int
1328 ftl_nv_cache_write_header(struct ftl_nv_cache *nv_cache, bool shutdown,
1329 			  spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1330 {
1331 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1332 	struct ftl_nv_cache_header *hdr = nv_cache->dma_buf;
1333 	struct spdk_bdev *bdev;
1334 	struct ftl_io_channel *ioch;
1335 
1336 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1337 	ioch = spdk_io_channel_get_ctx(ftl_get_io_channel(dev));
1338 
1339 	memset(hdr, 0, spdk_bdev_get_block_size(bdev));
1340 
1341 	hdr->phase = (uint8_t)nv_cache->phase;
1342 	hdr->size = spdk_bdev_get_num_blocks(bdev);
1343 	hdr->uuid = dev->uuid;
1344 	hdr->version = FTL_NV_CACHE_HEADER_VERSION;
1345 	hdr->current_addr = shutdown ? nv_cache->current_addr : FTL_LBA_INVALID;
1346 	hdr->checksum = spdk_crc32c_update(hdr, offsetof(struct ftl_nv_cache_header, checksum), 0);
1347 
1348 	return spdk_bdev_write_blocks(nv_cache->bdev_desc, ioch->cache_ioch, hdr, 0, 1,
1349 				      cb_fn, cb_arg);
1350 }
1351 
1352 int
1353 ftl_nv_cache_scrub(struct ftl_nv_cache *nv_cache, spdk_bdev_io_completion_cb cb_fn, void *cb_arg)
1354 {
1355 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
1356 	struct ftl_io_channel *ioch;
1357 	struct spdk_bdev *bdev;
1358 
1359 	ioch = spdk_io_channel_get_ctx(ftl_get_io_channel(dev));
1360 	bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1361 
1362 	return spdk_bdev_write_zeroes_blocks(nv_cache->bdev_desc, ioch->cache_ioch, 1,
1363 					     spdk_bdev_get_num_blocks(bdev) - 1,
1364 					     cb_fn, cb_arg);
1365 }
1366 
1367 static void
1368 ftl_write_fail(struct ftl_io *io, int status)
1369 {
1370 	struct ftl_rwb_batch *batch = io->rwb_batch;
1371 	struct spdk_ftl_dev *dev = io->dev;
1372 	struct ftl_rwb_entry *entry;
1373 	struct ftl_band *band;
1374 	char buf[128];
1375 
1376 	entry = ftl_rwb_batch_first_entry(batch);
1377 
1378 	band = ftl_band_from_addr(io->dev, entry->addr);
1379 	SPDK_ERRLOG("Write failed @addr: %s, status: %d\n",
1380 		    ftl_addr2str(entry->addr, buf, sizeof(buf)), status);
1381 
1382 	/* Close the band, halt the wptr and defrag */
1383 	ftl_halt_writes(dev, band);
1384 
1385 	ftl_rwb_foreach(entry, batch) {
1386 		/* Invalidate meta set by process_writes() */
1387 		ftl_invalidate_addr(dev, entry->addr);
1388 	}
1389 
1390 	/* Reset the batch back to the RWB to resend it later */
1391 	ftl_rwb_batch_revert(batch);
1392 }
1393 
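/*
 * Completion callback for a write buffer batch written to the base bdev. Assigns the
 * final physical address to each entry and, as long as the L2P still points at the
 * entry, updates the band's LBA map and marks the entry valid. Finishes by resolving
 * any flush requests waiting on the batch.
 */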
1394 static void
1395 ftl_write_cb(struct ftl_io *io, void *arg, int status)
1396 {
1397 	struct spdk_ftl_dev *dev = io->dev;
1398 	struct ftl_rwb_batch *batch = io->rwb_batch;
1399 	struct ftl_rwb_entry *entry;
1400 	struct ftl_band *band;
1401 	struct ftl_addr prev_addr, addr = io->addr;
1402 
1403 	if (status) {
1404 		ftl_write_fail(io, status);
1405 		return;
1406 	}
1407 
1408 	assert(io->num_blocks == dev->xfer_size);
1409 	assert(!(io->flags & FTL_IO_MD));
1410 
1411 	ftl_rwb_foreach(entry, batch) {
1412 		band = entry->band;
1413 		if (!(entry->flags & FTL_IO_PAD)) {
1414 			/* Verify that the LBA is set for user blocks */
1415 			assert(entry->lba != FTL_LBA_INVALID);
1416 		}
1417 
1418 		if (band != NULL) {
1419 			assert(band->num_reloc_blocks > 0);
1420 			band->num_reloc_blocks--;
1421 		}
1422 
1423 		entry->addr = addr;
1424 		if (entry->lba != FTL_LBA_INVALID) {
1425 			pthread_spin_lock(&entry->lock);
1426 			prev_addr = ftl_l2p_get(dev, entry->lba);
1427 
1428 			/* If the l2p was updated in the meantime, don't update band's metadata */
1429 			if (ftl_addr_cached(prev_addr) && prev_addr.cache_offset == entry->pos) {
1430 				/* Setting entry's cache bit needs to be done after metadata */
1431 				/* within the band is updated to make sure that writes */
1432 				/* invalidating the entry clear the metadata as well */
1433 				ftl_band_set_addr(io->band, entry->lba, entry->addr);
1434 				ftl_rwb_entry_set_valid(entry);
1435 			}
1436 			pthread_spin_unlock(&entry->lock);
1437 		}
1438 
1439 		SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write addr:%lu, lba:%lu\n",
1440 			      entry->addr.offset, entry->lba);
1441 
1442 		addr = ftl_band_next_addr(io->band, addr, 1);
1443 	}
1444 
1445 	ftl_process_flush(dev, batch);
1446 	ftl_rwb_batch_release(batch);
1447 }
1448 
1449 static void
1450 ftl_update_rwb_stats(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry)
1451 {
1452 	if (!ftl_rwb_entry_internal(entry)) {
1453 		dev->stats.write_user++;
1454 	}
1455 	dev->stats.write_total++;
1456 }
1457 
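/*
 * Point the L2P entry for the given LBA at the new (cached) address, invalidating
 * whatever it referred to before. Weak (relocation) writes only update the L2P if the
 * previous physical address is still valid, so user writes that raced with the
 * relocation are not overwritten.
 */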
1458 static void
1459 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry,
1460 	       struct ftl_addr addr)
1461 {
1462 	struct ftl_addr prev_addr;
1463 	struct ftl_rwb_entry *prev;
1464 	struct ftl_band *band;
1465 	int valid;
1466 
1467 	prev_addr = ftl_l2p_get(dev, entry->lba);
1468 	if (ftl_addr_invalid(prev_addr)) {
1469 		ftl_l2p_set(dev, entry->lba, addr);
1470 		return;
1471 	}
1472 
1473 	/* If the L2P's physical address is different from what we expected, we don't need to */
1474 	/* do anything (someone's already overwritten our data). */
1475 	if (ftl_rwb_entry_weak(entry) && !ftl_addr_cmp(prev_addr, entry->addr)) {
1476 		return;
1477 	}
1478 
1479 	if (ftl_addr_cached(prev_addr)) {
1480 		assert(!ftl_rwb_entry_weak(entry));
1481 		prev = ftl_rwb_entry_from_offset(dev->rwb, prev_addr.cache_offset);
1482 		pthread_spin_lock(&prev->lock);
1483 
1484 		/* Re-read the L2P under the lock to protect against updates */
1485 		/* to this LBA from other threads */
1486 		prev_addr = ftl_l2p_get(dev, entry->lba);
1487 
1488 		/* If the entry is no longer in cache, another write has been */
1489 		/* scheduled in the meantime, so we have to invalidate its LBA */
1490 		if (!ftl_addr_cached(prev_addr)) {
1491 			ftl_invalidate_addr(dev, prev_addr);
1492 		}
1493 
1494 		/* If previous entry is part of cache, remove and invalidate it */
1495 		if (ftl_rwb_entry_valid(prev)) {
1496 			ftl_invalidate_addr(dev, prev->addr);
1497 			ftl_rwb_entry_invalidate(prev);
1498 		}
1499 
1500 		ftl_l2p_set(dev, entry->lba, addr);
1501 		pthread_spin_unlock(&prev->lock);
1502 		return;
1503 	}
1504 
1505 	/* Lock the band containing the previous physical address. This ensures atomic changes to */
1506 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
1507 	/* check the validity of weak writes. */
1508 	band = ftl_band_from_addr(dev, prev_addr);
1509 	pthread_spin_lock(&band->lba_map.lock);
1510 
1511 	valid = ftl_invalidate_addr_unlocked(dev, prev_addr);
1512 
1513 	/* If the address has been invalidated already, we don't want to update */
1514 	/* the L2P for weak writes, as it means the write is no longer valid. */
1515 	if (!ftl_rwb_entry_weak(entry) || valid) {
1516 		ftl_l2p_set(dev, entry->lba, addr);
1517 	}
1518 
1519 	pthread_spin_unlock(&band->lba_map.lock);
1520 }
1521 
1522 static struct ftl_io *
1523 ftl_io_init_child_write(struct ftl_io *parent, struct ftl_addr addr,
1524 			void *data, void *md, ftl_io_fn cb)
1525 {
1526 	struct ftl_io *io;
1527 	struct spdk_ftl_dev *dev = parent->dev;
1528 	struct ftl_io_init_opts opts = {
1529 		.dev		= dev,
1530 		.io		= NULL,
1531 		.parent		= parent,
1532 		.rwb_batch	= NULL,
1533 		.band		= parent->band,
1534 		.size		= sizeof(struct ftl_io),
1535 		.flags		= 0,
1536 		.type		= parent->type,
1537 		.num_blocks	= dev->xfer_size,
1538 		.cb_fn		= cb,
1539 		.data		= data,
1540 		.md		= md,
1541 	};
1542 
1543 	io = ftl_io_init_internal(&opts);
1544 	if (!io) {
1545 		return NULL;
1546 	}
1547 
1548 	io->addr = addr;
1549 
1550 	return io;
1551 }
1552 
1553 static void
1554 ftl_io_child_write_cb(struct ftl_io *io, void *ctx, int status)
1555 {
1556 	struct ftl_zone *zone;
1557 	struct ftl_wptr *wptr;
1558 
1559 	zone = ftl_band_zone_from_addr(io->band, io->addr);
1560 	wptr = ftl_wptr_from_band(io->band);
1561 
1562 	zone->busy = false;
1563 	zone->info.write_pointer += io->num_blocks;
1564 
1565 	if (zone->info.write_pointer == zone->info.capacity) {
1566 		zone->info.state = SPDK_BDEV_ZONE_STATE_FULL;
1567 	}
1568 
1569 	/* If some other write on the same band failed, the write pointer would have already been freed */
1570 	if (spdk_likely(wptr)) {
1571 		wptr->num_outstanding--;
1572 	}
1573 }
1574 
1575 static int
1576 ftl_submit_child_write(struct ftl_wptr *wptr, struct ftl_io *io, int num_blocks)
1577 {
1578 	struct spdk_ftl_dev	*dev = io->dev;
1579 	struct ftl_io_channel	*ioch;
1580 	struct ftl_io		*child;
1581 	struct ftl_addr		addr;
1582 	int			rc;
1583 
1584 	ioch = spdk_io_channel_get_ctx(io->ioch);
1585 
1586 	if (spdk_likely(!wptr->direct_mode)) {
1587 		addr = wptr->addr;
1588 	} else {
1589 		assert(io->flags & FTL_IO_DIRECT_ACCESS);
1590 		assert(ftl_addr_get_band(dev, io->addr) == wptr->band->id);
1591 		addr = io->addr;
1592 	}
1593 
1594 	/* Split the IO into child requests and release the zone immediately after the child is completed */
1595 	child = ftl_io_init_child_write(io, addr, ftl_io_iovec_addr(io),
1596 					ftl_io_get_md(io), ftl_io_child_write_cb);
1597 	if (!child) {
1598 		return -EAGAIN;
1599 	}
1600 
1601 	wptr->num_outstanding++;
1602 
1603 	if (ftl_is_append_supported(dev)) {
1604 		rc = spdk_bdev_zone_append(dev->base_bdev_desc, ioch->base_ioch,
1605 					   ftl_io_iovec_addr(child),
1606 					   ftl_addr_get_zone_slba(dev, addr),
1607 					   num_blocks, ftl_io_cmpl_cb, child);
1608 	} else {
1609 		rc = spdk_bdev_write_blocks(dev->base_bdev_desc, ioch->base_ioch,
1610 					    ftl_io_iovec_addr(child),
1611 					    addr.offset,
1612 					    num_blocks, ftl_io_cmpl_cb, child);
1613 	}
1614 
1615 	if (rc) {
1616 		wptr->num_outstanding--;
1617 		ftl_io_fail(child, rc);
1618 		ftl_io_complete(child);
1619 		SPDK_ERRLOG("spdk_bdev_write_blocks_with_md failed with status:%d, addr:%lu\n",
1620 			    rc, addr.offset);
1621 		return -EIO;
1622 	}
1623 
1624 	ftl_io_inc_req(child);
1625 	ftl_io_advance(child, num_blocks);
1626 
1627 	return 0;
1628 }
1629 
1630 static int
1631 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
1632 {
1633 	struct spdk_ftl_dev	*dev = io->dev;
1634 	int			rc = 0;
1635 
1636 	assert(io->num_blocks % dev->xfer_size == 0);
1637 	/* Only one child write makes sense in the case of a user write */
1638 	assert((io->flags & FTL_IO_MD) || io->iov_cnt == 1);
1639 
1640 	while (io->iov_pos < io->iov_cnt) {
1641 		/* There are no ordering guarantees for completions on the NVMe IO submission queue, */
1642 		/* so wait until the zone is not busy before submitting another write */
1643 		if (!ftl_is_append_supported(dev) && wptr->zone->busy) {
1644 			TAILQ_INSERT_TAIL(&wptr->pending_queue, io, retry_entry);
1645 			rc = -EAGAIN;
1646 			break;
1647 		}
1648 
1649 		rc = ftl_submit_child_write(wptr, io, dev->xfer_size);
1650 		if (spdk_unlikely(rc)) {
1651 			if (rc == -EAGAIN) {
1652 				TAILQ_INSERT_TAIL(&wptr->pending_queue, io, retry_entry);
1653 			} else {
1654 				ftl_io_fail(io, rc);
1655 			}
1656 			break;
1657 		}
1658 
1659 		ftl_trace_submission(dev, io, wptr->addr, dev->xfer_size);
1660 		ftl_wptr_advance(wptr, dev->xfer_size);
1661 	}
1662 
1663 	if (ftl_io_done(io)) {
1664 		/* Parent IO will complete after all children are completed */
1665 		ftl_io_complete(io);
1666 	}
1667 
1668 	return rc;
1669 }
1670 
1671 static void
1672 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1673 {
1674 	struct ftl_rwb *rwb = dev->rwb;
1675 	size_t size, num_entries;
1676 
1677 	size = ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_INTERNAL) +
1678 	       ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_USER);
1679 
1680 	/* There must be something in the RWB, otherwise the flush */
1681 	/* wouldn't be waiting for anything */
1682 	assert(size > 0);
1683 
1684 	/* Only add padding when there are fewer than xfer_size */
1685 	/* entries in the buffer. Otherwise we just have to wait */
1686 	/* for the entries to become ready. */
1687 	num_entries = ftl_rwb_get_active_batches(dev->rwb) * dev->xfer_size;
1688 	if (size < num_entries) {
1689 		ftl_rwb_pad(dev, num_entries - (size % num_entries));
1690 	}
1691 }
1692 
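/*
 * Main per-wptr write path: resubmit any pending IO first, then, if the band is ready,
 * pop a batch from the write buffer and submit it. Also handles padding on shutdown and
 * flush. Returns the number of blocks taken from the buffer (0 if nothing was done).
 */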
1693 static int
1694 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1695 {
1696 	struct spdk_ftl_dev	*dev = wptr->dev;
1697 	struct ftl_rwb_batch	*batch;
1698 	struct ftl_rwb_entry	*entry;
1699 	struct ftl_io		*io;
1700 
1701 	if (spdk_unlikely(!TAILQ_EMPTY(&wptr->pending_queue))) {
1702 		io = TAILQ_FIRST(&wptr->pending_queue);
1703 		TAILQ_REMOVE(&wptr->pending_queue, io, retry_entry);
1704 
1705 		if (ftl_submit_write(wptr, io) == -EAGAIN) {
1706 			return 0;
1707 		}
1708 	}
1709 
1710 	/* Make sure the band is prepared for writing */
1711 	if (!ftl_wptr_ready(wptr)) {
1712 		return 0;
1713 	}
1714 
1715 	if (dev->halt) {
1716 		ftl_wptr_process_shutdown(wptr);
1717 	}
1718 
1719 	if (spdk_unlikely(wptr->flush)) {
1720 		ftl_wptr_pad_band(wptr);
1721 	}
1722 
1723 	batch = ftl_rwb_pop(dev->rwb);
1724 	if (!batch) {
1725 		/* If there are queued flush requests we need to pad the RWB to */
1726 		/* force out remaining entries */
1727 		if (!LIST_EMPTY(&dev->flush_list)) {
1728 			ftl_flush_pad_batch(dev);
1729 		}
1730 
1731 		return 0;
1732 	}
1733 
1734 	io = ftl_io_rwb_init(dev, wptr->addr, wptr->band, batch, ftl_write_cb);
1735 	if (!io) {
1736 		goto error;
1737 	}
1738 
1739 	ftl_rwb_foreach(entry, batch) {
1740 		/* Update band's relocation stats if the IO comes from reloc */
1741 		if (entry->flags & FTL_IO_WEAK) {
1742 			if (!spdk_bit_array_get(wptr->band->reloc_bitmap, entry->band->id)) {
1743 				spdk_bit_array_set(wptr->band->reloc_bitmap, entry->band->id);
1744 				entry->band->num_reloc_bands++;
1745 			}
1746 		}
1747 
1748 		ftl_trace_rwb_pop(dev, entry);
1749 		ftl_update_rwb_stats(dev, entry);
1750 	}
1751 
1752 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write addr:%lx\n", wptr->addr.offset);
1753 
1754 	if (ftl_submit_write(wptr, io)) {
1755 		/* TODO: we need some recovery here */
1756 		assert(0 && "Write submit failed");
1757 		if (ftl_io_done(io)) {
1758 			ftl_io_free(io);
1759 		}
1760 	}
1761 
1762 	return dev->xfer_size;
1763 error:
1764 	ftl_rwb_batch_revert(batch);
1765 	return 0;
1766 }
1767 
1768 static int
1769 ftl_process_writes(struct spdk_ftl_dev *dev)
1770 {
1771 	struct ftl_wptr *wptr, *twptr;
1772 	size_t num_active = 0;
1773 	enum ftl_band_state state;
1774 
1775 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1776 		ftl_wptr_process_writes(wptr);
1777 		state = wptr->band->state;
1778 
1779 		if (state != FTL_BAND_STATE_FULL &&
1780 		    state != FTL_BAND_STATE_CLOSING &&
1781 		    state != FTL_BAND_STATE_CLOSED) {
1782 			num_active++;
1783 		}
1784 	}
1785 
1786 	if (num_active < 1) {
1787 		ftl_add_wptr(dev);
1788 	}
1789 
1790 	return 0;
1791 }
1792 
1793 static void
1794 ftl_rwb_entry_fill(struct ftl_rwb_entry *entry, struct ftl_io *io)
1795 {
1796 	memcpy(entry->data, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1797 
1798 	if (ftl_rwb_entry_weak(entry)) {
1799 		entry->band = ftl_band_from_addr(io->dev, io->addr);
1800 		entry->addr = ftl_band_next_addr(entry->band, io->addr, io->pos);
1801 		entry->band->num_reloc_blocks++;
1802 	}
1803 
1804 	entry->trace = io->trace;
1805 	entry->lba = ftl_io_current_lba(io);
1806 
1807 	if (entry->md) {
1808 		memcpy(entry->md, &entry->lba, sizeof(entry->lba));
1809 	}
1810 }
1811 
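/*
 * Copy the IO's data into write buffer entries and point the L2P at the cached entries.
 * Returns -EAGAIN when no free entries are available; completed user writes are
 * additionally mirrored to the non-volatile cache if one is present.
 */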
1812 static int
1813 ftl_rwb_fill(struct ftl_io *io)
1814 {
1815 	struct spdk_ftl_dev *dev = io->dev;
1816 	struct ftl_rwb_entry *entry;
1817 	struct ftl_addr addr = { .cached = 1 };
1818 	int flags = ftl_rwb_flags_from_io(io);
1819 
1820 	while (io->pos < io->num_blocks) {
1821 		if (ftl_io_current_lba(io) == FTL_LBA_INVALID) {
1822 			ftl_io_advance(io, 1);
1823 			continue;
1824 		}
1825 
1826 		entry = ftl_acquire_entry(dev, flags);
1827 		if (!entry) {
1828 			return -EAGAIN;
1829 		}
1830 
1831 		ftl_rwb_entry_fill(entry, io);
1832 
1833 		addr.cache_offset = entry->pos;
1834 
1835 		ftl_trace_rwb_fill(dev, io);
1836 		ftl_update_l2p(dev, entry, addr);
1837 		ftl_io_advance(io, 1);
1838 
1839 		/* This needs to be done after the L2P is updated, to avoid a race with the */
1840 		/* write completion callback in case it is processed before the L2P is */
1841 		/* set in ftl_update_l2p(). */
1842 		ftl_rwb_push(entry);
1843 	}
1844 
1845 	if (ftl_io_done(io)) {
1846 		if (ftl_dev_has_nv_cache(dev) && !(io->flags & FTL_IO_BYPASS_CACHE)) {
1847 			ftl_write_nv_cache(io);
1848 		} else {
1849 			ftl_io_complete(io);
1850 		}
1851 	}
1852 
1853 	return 0;
1854 }
1855 
1856 static bool
1857 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
1858 {
1859 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
1860 
1861 	if (ftl_reloc_is_halted(dev->reloc)) {
1862 		return false;
1863 	}
1864 
1865 	if (ftl_reloc_is_defrag_active(dev->reloc)) {
1866 		return false;
1867 	}
1868 
1869 	if (dev->num_free <= limit->thld) {
1870 		return true;
1871 	}
1872 
1873 	return false;
1874 }
1875 
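/*
 * Calculate the band's defrag merit as (invalid / (valid + 1)) * age, so bands holding
 * mostly invalid, older data are preferred. If threshold_valid is given, the valid count
 * is taken as usable - *threshold_valid instead of the band's current valid block count.
 */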
1876 static double
1877 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
1878 {
1879 	size_t usable, valid, invalid;
1880 	double vld_ratio;
1881 
1882 	/* If the band doesn't have any usable blocks it's of no use */
1883 	usable = ftl_band_num_usable_blocks(band);
1884 	if (usable == 0) {
1885 		return 0.0;
1886 	}
1887 
1888 	valid =  threshold_valid ? (usable - *threshold_valid) : band->lba_map.num_vld;
1889 	invalid = usable - valid;
1890 
1891 	/* Add one to avoid division by 0 */
1892 	vld_ratio = (double)invalid / (double)(valid + 1);
1893 	return vld_ratio * ftl_band_age(band);
1894 }
1895 
1896 static bool
1897 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
1898 {
1899 	struct spdk_ftl_conf *conf = &dev->conf;
1900 	size_t thld_vld;
1901 
1902 	/* If we're in dire need of free bands, every band is worth defragging */
1903 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
1904 		return true;
1905 	}
1906 
1907 	thld_vld = (ftl_band_num_usable_blocks(band) * conf->invalid_thld) / 100;
1908 
1909 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
1910 }
1911 
1912 static struct ftl_band *
1913 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
1914 {
1915 	struct ftl_band *band, *mband = NULL;
1916 	double merit = 0;
1917 
1918 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
1919 		assert(band->state == FTL_BAND_STATE_CLOSED);
1920 		band->merit = ftl_band_calc_merit(band, NULL);
1921 		if (band->merit > merit) {
1922 			merit = band->merit;
1923 			mband = band;
1924 		}
1925 	}
1926 
1927 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
1928 		mband = NULL;
1929 	}
1930 
1931 	return mband;
1932 }
1933 
1934 static void
1935 ftl_process_relocs(struct spdk_ftl_dev *dev)
1936 {
1937 	struct ftl_band *band;
1938 
1939 	if (ftl_dev_needs_defrag(dev)) {
1940 		band = ftl_select_defrag_band(dev);
1941 		if (band) {
1942 			ftl_reloc_add(dev->reloc, band, 0, ftl_get_num_blocks_in_band(dev), 0, true);
1943 			ftl_trace_defrag_band(dev, band);
1944 		}
1945 	}
1946 
1947 	ftl_reloc(dev->reloc);
1948 }
1949 
1950 int
1951 ftl_current_limit(const struct spdk_ftl_dev *dev)
1952 {
1953 	return dev->limit;
1954 }
1955 
1956 void
1957 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
1958 {
1959 	attrs->uuid = dev->uuid;
1960 	attrs->num_blocks = dev->num_lbas;
1961 	attrs->block_size = FTL_BLOCK_SIZE;
1962 	attrs->num_zones = ftl_get_num_zones(dev);
1963 	attrs->zone_size = ftl_get_num_blocks_in_zone(dev);
1964 	attrs->conf = dev->conf;
1965 	attrs->base_bdev = spdk_bdev_get_name(spdk_bdev_desc_get_bdev(dev->base_bdev_desc));
1966 
1967 	attrs->cache_bdev = NULL;
1968 	if (dev->nv_cache.bdev_desc) {
1969 		attrs->cache_bdev = spdk_bdev_get_name(
1970 					    spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc));
1971 	}
1972 }
1973 
1974 static void
1975 _ftl_io_write(void *ctx)
1976 {
1977 	ftl_io_write((struct ftl_io *)ctx);
1978 }
1979 
1980 static int
1981 ftl_rwb_fill_leaf(struct ftl_io *io)
1982 {
1983 	int rc;
1984 
1985 	rc = ftl_rwb_fill(io);
1986 	if (rc == -EAGAIN) {
1987 		spdk_thread_send_msg(spdk_io_channel_get_thread(io->ioch),
1988 				     _ftl_io_write, io);
1989 		return 0;
1990 	}
1991 
1992 	return rc;
1993 }
1994 
1995 static int
1996 ftl_submit_write_leaf(struct ftl_io *io)
1997 {
1998 	int rc;
1999 
2000 	rc = ftl_submit_write(ftl_wptr_from_band(io->band), io);
2001 	if (rc == -EAGAIN) {
2002 		/* EAGAIN means that the request was put on the pending queue */
2003 		return 0;
2004 	}
2005 
2006 	return rc;
2007 }
2008 
2009 void
2010 ftl_io_write(struct ftl_io *io)
2011 {
2012 	struct spdk_ftl_dev *dev = io->dev;
2013 
2014 	/* For normal IOs we just need to copy the data onto the rwb */
2015 	if (!(io->flags & FTL_IO_MD)) {
2016 		ftl_io_call_foreach_child(io, ftl_rwb_fill_leaf);
2017 	} else {
2018 		/* Metadata has its own buffer, so it doesn't have to be copied. Just */
2019 		/* send it to the core thread and schedule the write immediately. */
2020 		if (ftl_check_core_thread(dev)) {
2021 			ftl_io_call_foreach_child(io, ftl_submit_write_leaf);
2022 		} else {
2023 			spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
2024 		}
2025 	}
2026 }
2027 
2028 int
2029 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2030 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2031 {
2032 	struct ftl_io *io;
2033 
2034 	if (iov_cnt == 0) {
2035 		return -EINVAL;
2036 	}
2037 
2038 	if (lba_cnt == 0) {
2039 		return -EINVAL;
2040 	}
2041 
2042 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2043 		return -EINVAL;
2044 	}
2045 
2046 	if (!dev->initialized) {
2047 		return -EBUSY;
2048 	}
2049 
2050 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
2051 	if (!io) {
2052 		return -ENOMEM;
2053 	}
2054 
2055 	ftl_io_write(io);
2056 
2057 	return 0;
2058 }
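/*
 * Illustrative usage sketch (not part of the driver): a single-iovec write of
 * 8 blocks starting at LBA 1024. The buffer, I/O channel and callback below
 * are hypothetical; the channel is assumed to be one opened on the FTL device
 * and the callback follows the spdk_ftl_fn signature.
 *
 *	static void
 *	write_done(void *cb_arg, int status)
 *	{
 *		if (status != 0) {
 *			SPDK_ERRLOG("FTL write failed: %d\n", status);
 *		}
 *	}
 *
 *	struct iovec iov = {
 *		.iov_base = buf,
 *		.iov_len = 8 * FTL_BLOCK_SIZE,
 *	};
 *
 *	rc = spdk_ftl_write(ftl_dev, ioch, 1024, 8, &iov, 1, write_done, NULL);
 *
 * A non-zero return value here maps to the early checks above (-EINVAL,
 * -EBUSY) or to -ENOMEM when no ftl_io could be allocated.
 */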
2059 
2060 static int
2061 ftl_io_read_leaf(struct ftl_io *io)
2062 {
2063 	int rc;
2064 
2065 	rc = ftl_submit_read(io);
2066 	if (rc == -ENOMEM) {
2067 		/* ENOMEM means that the request was put on a pending queue */
2068 		return 0;
2069 	}
2070 
2071 	return rc;
2072 }
2073 
2074 static void
2075 _ftl_io_read(void *arg)
2076 {
2077 	ftl_io_read((struct ftl_io *)arg);
2078 }
2079 
2080 void
2081 ftl_io_read(struct ftl_io *io)
2082 {
2083 	struct spdk_ftl_dev *dev = io->dev;
2084 
2085 	if (ftl_check_core_thread(dev)) {
2086 		ftl_io_call_foreach_child(io, ftl_io_read_leaf);
2087 	} else {
2088 		spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_read, io);
2089 	}
2090 }
2091 
2092 int
2093 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
2094 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
2095 {
2096 	struct ftl_io *io;
2097 
2098 	if (iov_cnt == 0) {
2099 		return -EINVAL;
2100 	}
2101 
2102 	if (lba_cnt == 0) {
2103 		return -EINVAL;
2104 	}
2105 
2106 	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
2107 		return -EINVAL;
2108 	}
2109 
2110 	if (!dev->initialized) {
2111 		return -EBUSY;
2112 	}
2113 
2114 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
2115 	if (!io) {
2116 		return -ENOMEM;
2117 	}
2118 
2119 	ftl_io_read(io);
2120 	return 0;
2121 }
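/*
 * Reads are submitted the same way, e.g. reading those 8 blocks back with the
 * hypothetical handles from the spdk_ftl_write() sketch above:
 *
 *	rc = spdk_ftl_read(ftl_dev, ioch, 1024, 8, &iov, 1, read_done, NULL);
 */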
2122 
2123 static struct ftl_flush *
2124 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2125 {
2126 	struct ftl_flush *flush;
2127 	struct ftl_rwb *rwb = dev->rwb;
2128 
2129 	flush = calloc(1, sizeof(*flush));
2130 	if (!flush) {
2131 		return NULL;
2132 	}
2133 
2134 	flush->bmap = spdk_bit_array_create(ftl_rwb_num_batches(rwb));
2135 	if (!flush->bmap) {
2136 		goto error;
2137 	}
2138 
2139 	flush->dev = dev;
2140 	flush->cb.fn = cb_fn;
2141 	flush->cb.ctx = cb_arg;
2142 
2143 	return flush;
2144 error:
2145 	free(flush);
2146 	return NULL;
2147 }
2148 
2149 static void
2150 _ftl_flush(void *ctx)
2151 {
2152 	struct ftl_flush *flush = ctx;
2153 	struct spdk_ftl_dev *dev = flush->dev;
2154 	struct ftl_rwb *rwb = dev->rwb;
2155 	struct ftl_rwb_batch *batch;
2156 
2157 	/* Attach flush object to all non-empty batches */
2158 	ftl_rwb_foreach_batch(batch, rwb) {
2159 		if (!ftl_rwb_batch_empty(batch)) {
2160 			spdk_bit_array_set(flush->bmap, ftl_rwb_batch_get_offset(batch));
2161 			flush->num_req++;
2162 		}
2163 	}
2164 
2165 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
2166 
2167 	/* If the RWB was already empty, the flush can be completed right away */
2168 	if (!flush->num_req) {
2169 		ftl_complete_flush(flush);
2170 	}
2171 }
2172 
2173 int
2174 ftl_flush_rwb(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2175 {
2176 	struct ftl_flush *flush;
2177 
2178 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
2179 	if (!flush) {
2180 		return -ENOMEM;
2181 	}
2182 
2183 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
2184 	return 0;
2185 }
2186 
2187 int
2188 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
2189 {
2190 	if (!dev->initialized) {
2191 		return -EBUSY;
2192 	}
2193 
2194 	return ftl_flush_rwb(dev, cb_fn, cb_arg);
2195 }
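/*
 * Illustrative usage sketch (not part of the driver): draining the write
 * buffer before tearing the device down. The callback below is hypothetical
 * and follows the spdk_ftl_fn signature; it runs once every rwb batch that
 * was non-empty at the time of the call has been written out.
 *
 *	static void
 *	flush_done(void *cb_arg, int status)
 *	{
 *		assert(status == 0);
 *	}
 *
 *	rc = spdk_ftl_flush(ftl_dev, flush_done, NULL);
 *
 * A return value of -EBUSY means the device has not finished initializing;
 * -ENOMEM means the flush object could not be allocated.
 */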
2196 
2197 bool
2198 ftl_addr_is_written(struct ftl_band *band, struct ftl_addr addr)
2199 {
2200 	struct ftl_zone *zone = ftl_band_zone_from_addr(band, addr);
2201 
2202 	return addr.offset < zone->info.write_pointer;
2203 }
2204 
2205 static void ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event);
2206 
2207 static void
2208 _ftl_process_media_event(void *ctx)
2209 {
2210 	struct ftl_media_event *event = ctx;
2211 	struct spdk_ftl_dev *dev = event->dev;
2212 
2213 	ftl_process_media_event(dev, event->event);
2214 	spdk_mempool_put(dev->media_events_pool, event);
2215 }
2216 
2217 static void
2218 ftl_process_media_event(struct spdk_ftl_dev *dev, struct spdk_bdev_media_event event)
2219 {
2220 	struct ftl_band *band;
2221 	struct ftl_addr addr = { .offset = event.offset };
2222 	size_t block_off;
2223 
2224 	if (!ftl_check_core_thread(dev)) {
2225 		struct ftl_media_event *media_event;
2226 
2227 		media_event = spdk_mempool_get(dev->media_events_pool);
2228 		if (!media_event) {
2229 			SPDK_ERRLOG("Media event lost due to lack of memory\n");
2230 			return;
2231 		}
2232 
2233 		media_event->dev = dev;
2234 		media_event->event = event;
2235 		spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_process_media_event,
2236 				     media_event);
2237 		return;
2238 	}
2239 
2240 	band = ftl_band_from_addr(dev, addr);
2241 	block_off = ftl_band_block_offset_from_addr(band, addr);
2242 
2243 	ftl_reloc_add(dev->reloc, band, block_off, event.num_blocks, 0, false);
2244 }
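/*
 * Media events are always handled on the core thread: if the notification
 * arrives on a different thread, it is copied into an object taken from the
 * media_events_pool and forwarded with spdk_thread_send_msg(). Once on the
 * core thread, the affected block range is queued for relocation so that its
 * data gets rewritten elsewhere.
 */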
2245 
2246 void
2247 ftl_get_media_events(struct spdk_ftl_dev *dev)
2248 {
2249 #define FTL_MAX_MEDIA_EVENTS 128
2250 	struct spdk_bdev_media_event events[FTL_MAX_MEDIA_EVENTS];
2251 	size_t num_events, i;
2252 
2253 	if (!dev->initialized) {
2254 		return;
2255 	}
2256 
2257 	do {
2258 		num_events = spdk_bdev_get_media_events(dev->base_bdev_desc,
2259 							events, FTL_MAX_MEDIA_EVENTS);
2260 
2261 		for (i = 0; i < num_events; ++i) {
2262 			ftl_process_media_event(dev, events[i]);
2263 		}
2264 
2265 	} while (num_events);
2266 }
2267 
2268 static void
2269 ftl_process_retry_queue(struct spdk_ftl_dev *dev)
2270 {
2271 	struct ftl_io *io;
2272 	int rc;
2273 
2274 	while (!TAILQ_EMPTY(&dev->retry_queue)) {
2275 		io = TAILQ_FIRST(&dev->retry_queue);
2276 
2277 		/* Retry only if IO is still healthy */
2278 		if (spdk_likely(io->status == 0)) {
2279 			rc = ftl_submit_read(io);
2280 			if (rc == -ENOMEM) {
2281 				break;
2282 			}
2283 		}
2284 
2285 		io->flags &= ~FTL_IO_RETRY;
2286 		TAILQ_REMOVE(&dev->retry_queue, io, retry_entry);
2287 
2288 		if (ftl_io_done(io)) {
2289 			ftl_io_complete(io);
2290 		}
2291 	}
2292 }
2293 
2294 int
2295 ftl_task_core(void *ctx)
2296 {
2297 	struct ftl_thread *thread = ctx;
2298 	struct spdk_ftl_dev *dev = thread->dev;
2299 
2300 	if (dev->halt) {
2301 		if (ftl_shutdown_complete(dev)) {
2302 			spdk_poller_unregister(&thread->poller);
2303 			return 0;
2304 		}
2305 	}
2306 
2307 	ftl_process_writes(dev);
2308 	ftl_process_relocs(dev);
2309 
2310 	if (!TAILQ_EMPTY(&dev->retry_queue)) {
2311 		ftl_process_retry_queue(dev);
2312 		return 1;
2313 	}
2314 
2315 	return 0;
2316 }
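/*
 * ftl_task_core() is the main poller of the device's core thread; it is
 * registered during device initialization, conceptually along the lines of
 * the hypothetical sketch below:
 *
 *	thread->poller = spdk_poller_register(ftl_task_core, thread, 0);
 *
 * Returning 1 while the retry queue is non-empty tells the reactor that the
 * poller did some work on this iteration.
 */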
2317 
2318 SPDK_LOG_REGISTER_COMPONENT("ftl_core", SPDK_LOG_FTL_CORE)
2319