xref: /spdk/lib/ftl/ftl_core.c (revision a044e19470d20ae1792bedcd820e80d8ab4ad498)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/likely.h"
35 #include "spdk/stdinc.h"
36 #include "spdk/nvme.h"
37 #include "spdk/io_channel.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk/string.h"
40 #include "spdk_internal/log.h"
41 #include "spdk/ftl.h"
42 
43 #include "ftl_core.h"
44 #include "ftl_band.h"
45 #include "ftl_io.h"
46 #include "ftl_anm.h"
47 #include "ftl_rwb.h"
48 #include "ftl_debug.h"
49 #include "ftl_reloc.h"
50 
51 struct ftl_wptr {
52 	/* Owner device */
53 	struct spdk_ftl_dev		*dev;
54 
55 	/* Current PPA */
56 	struct ftl_ppa			ppa;
57 
58 	/* Band currently being written to */
59 	struct ftl_band			*band;
60 
61 	/* Current logical block's offset */
62 	uint64_t			offset;
63 
64 	/* Current erase block */
65 	struct ftl_chunk		*chunk;
66 
67 	/* Pending IO queue */
68 	TAILQ_HEAD(, ftl_io)		pending_queue;
69 
70 	/* List link */
71 	LIST_ENTRY(ftl_wptr)		list_entry;
72 
73 	/*
74 	 * If set up in direct mode, there is no offset or band state update after IO.
75 	 * The PPA is not assigned by the wptr; it is taken directly from the request instead.
76 	 */
77 	bool				direct_mode;
78 };
79 
80 struct ftl_flush {
81 	/* Owner device */
82 	struct spdk_ftl_dev		*dev;
83 
84 	/* Number of batches to wait for */
85 	size_t				num_req;
86 
87 	/* Callback */
88 	struct {
89 		spdk_ftl_fn		fn;
90 		void			*ctx;
91 	} cb;
92 
93 	/* Batch bitmap */
94 	struct spdk_bit_array		*bmap;
95 
96 	/* List link */
97 	LIST_ENTRY(ftl_flush)		list_entry;
98 };
99 
100 static int
101 ftl_rwb_flags_from_io(const struct ftl_io *io)
102 {
103 	int valid_flags = FTL_IO_INTERNAL | FTL_IO_WEAK | FTL_IO_PAD;
104 	return io->flags & valid_flags;
105 }
106 
107 static int
108 ftl_rwb_entry_weak(const struct ftl_rwb_entry *entry)
109 {
110 	return entry->flags & FTL_IO_WEAK;
111 }
112 
113 static void
114 ftl_wptr_free(struct ftl_wptr *wptr)
115 {
116 	if (!wptr) {
117 		return;
118 	}
119 
120 	free(wptr);
121 }
122 
123 static void
124 ftl_remove_wptr(struct ftl_wptr *wptr)
125 {
126 	LIST_REMOVE(wptr, list_entry);
127 	ftl_wptr_free(wptr);
128 }
129 
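/*
 * Generic NVMe completion callback for FTL-submitted commands: record any error
 * reported in the completion, trace it, drop the IO's outstanding request count
 * and complete the IO once all of its requests have finished.
 */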
130 static void
131 ftl_io_cmpl_cb(void *arg, const struct spdk_nvme_cpl *status)
132 {
133 	struct ftl_io *io = arg;
134 
135 	if (spdk_nvme_cpl_is_error(status)) {
136 		ftl_io_process_error(io, status);
137 	}
138 
139 	ftl_trace_completion(io->dev, io, FTL_TRACE_COMPLETION_DISK);
140 
141 	ftl_io_dec_req(io);
142 
143 	if (ftl_io_done(io)) {
144 		ftl_io_complete(io);
145 	}
146 }
147 
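/*
 * Handle a failed write to the given band: find the write pointer owning the band,
 * mark the band as write-failed and remove the wptr. If the band is already flagged
 * high_prio, an earlier failure has taken care of it.
 */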
148 static void
149 ftl_halt_writes(struct spdk_ftl_dev *dev, struct ftl_band *band)
150 {
151 	struct ftl_wptr *wptr = NULL;
152 
153 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
154 		if (wptr->band == band) {
155 			break;
156 		}
157 	}
158 
159 	/* If the band already has the high_prio flag set, other writes must */
160 	/* have failed earlier, so it's already taken care of. */
161 	if (band->high_prio) {
162 		assert(wptr == NULL);
163 		return;
164 	}
165 
166 	ftl_band_write_failed(band);
167 	ftl_remove_wptr(wptr);
168 }
169 
170 static struct ftl_wptr *
171 ftl_wptr_from_band(struct ftl_band *band)
172 {
173 	struct spdk_ftl_dev *dev = band->dev;
174 	struct ftl_wptr *wptr = NULL;
175 
176 	LIST_FOREACH(wptr, &dev->wptr_list, list_entry) {
177 		if (wptr->band == band) {
178 			return wptr;
179 		}
180 	}
181 
182 	return NULL;
183 }
184 
185 static void
186 ftl_md_write_fail(struct ftl_io *io, int status)
187 {
188 	struct ftl_band *band = io->band;
189 	struct ftl_wptr *wptr;
190 	char buf[128];
191 
192 	wptr = ftl_wptr_from_band(band);
193 
194 	SPDK_ERRLOG("Metadata write failed @ppa: %s, status: %d\n",
195 		    ftl_ppa2str(wptr->ppa, buf, sizeof(buf)), status);
196 
197 	ftl_halt_writes(io->dev, band);
198 }
199 
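/*
 * Completion callback for band metadata writes. On error the band's writes are
 * halted; otherwise the band advances to its next state and, once it is CLOSED,
 * the band's user blocks are returned to the non-volatile cache budget (if a cache
 * bdev is attached) and the write pointer is removed.
 */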
200 static void
201 ftl_md_write_cb(struct ftl_io *io, void *arg, int status)
202 {
203 	struct spdk_ftl_dev *dev = io->dev;
204 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
205 	struct ftl_wptr *wptr;
206 	struct spdk_bdev *bdev;
207 
208 	wptr = ftl_wptr_from_band(io->band);
209 
210 	if (status) {
211 		ftl_md_write_fail(io, status);
212 		return;
213 	}
214 
215 	ftl_band_set_next_state(io->band);
216 	if (io->band->state == FTL_BAND_STATE_CLOSED) {
217 		if (nv_cache->bdev_desc) {
218 			bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
219 
220 			pthread_spin_lock(&nv_cache->lock);
221 			nv_cache->num_available += ftl_band_user_lbks(io->band);
222 
223 			if (spdk_unlikely(nv_cache->num_available > spdk_bdev_get_num_blocks(bdev))) {
224 				nv_cache->num_available = spdk_bdev_get_num_blocks(bdev);
225 			}
226 			pthread_spin_unlock(&nv_cache->lock);
227 		}
228 
229 		ftl_remove_wptr(wptr);
230 	}
231 }
232 
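/*
 * Compute the starting PPA and the number of contiguous lbks for the next part of a
 * PPA-mode read. Metadata reads are clamped so a single request never crosses an
 * xfer_size boundary, matching the layout in which the metadata was written.
 */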
233 static int
234 ftl_ppa_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa)
235 {
236 	struct spdk_ftl_dev *dev = io->dev;
237 	size_t lbk_cnt, max_lbks;
238 
239 	assert(ftl_io_mode_ppa(io));
240 	assert(io->iov_pos < io->iov_cnt);
241 
242 	if (io->pos == 0) {
243 		*ppa = io->ppa;
244 	} else {
245 		*ppa = ftl_band_next_xfer_ppa(io->band, io->ppa, io->pos);
246 	}
247 
248 	assert(!ftl_ppa_invalid(*ppa));
249 
250 	/* Metadata has to be read in the way it's written (jumping across */
251 	/* the chunks in xfer_size increments) */
252 	if (io->flags & FTL_IO_MD) {
253 		max_lbks = dev->xfer_size - (ppa->lbk % dev->xfer_size);
254 		lbk_cnt = spdk_min(ftl_io_iovec_len_left(io), max_lbks);
255 		assert(ppa->lbk / dev->xfer_size == (ppa->lbk + lbk_cnt - 1) / dev->xfer_size);
256 	} else {
257 		lbk_cnt = ftl_io_iovec_len_left(io);
258 	}
259 
260 	return lbk_cnt;
261 }
262 
263 static int
264 ftl_wptr_close_band(struct ftl_wptr *wptr)
265 {
266 	struct ftl_band *band = wptr->band;
267 
268 	ftl_band_set_state(band, FTL_BAND_STATE_CLOSING);
269 
270 	return ftl_band_write_tail_md(band, ftl_md_write_cb);
271 }
272 
273 static int
274 ftl_wptr_open_band(struct ftl_wptr *wptr)
275 {
276 	struct ftl_band *band = wptr->band;
277 
278 	assert(ftl_band_chunk_is_first(band, wptr->chunk));
279 	assert(band->lba_map.num_vld == 0);
280 
281 	ftl_band_clear_lba_map(band);
282 
283 	assert(band->state == FTL_BAND_STATE_PREP);
284 	ftl_band_set_state(band, FTL_BAND_STATE_OPENING);
285 
286 	return ftl_band_write_head_md(band, ftl_md_write_cb);
287 }
288 
289 static int
290 ftl_submit_erase(struct ftl_io *io)
291 {
292 	struct spdk_ftl_dev *dev = io->dev;
293 	struct ftl_band *band = io->band;
294 	struct ftl_ppa ppa = io->ppa;
295 	struct ftl_chunk *chunk;
296 	uint64_t ppa_packed;
297 	int rc = 0;
298 	size_t i;
299 
300 	for (i = 0; i < io->lbk_cnt; ++i) {
301 		if (i != 0) {
302 			chunk = ftl_band_next_chunk(band, ftl_band_chunk_from_ppa(band, ppa));
303 			assert(chunk->state == FTL_CHUNK_STATE_CLOSED ||
304 			       chunk->state == FTL_CHUNK_STATE_VACANT);
305 			ppa = chunk->start_ppa;
306 		}
307 
308 		assert(ppa.lbk == 0);
309 		ppa_packed = ftl_ppa_addr_pack(dev, ppa);
310 
311 		ftl_trace_submission(dev, io, ppa, 1);
312 		rc = spdk_nvme_ocssd_ns_cmd_vector_reset(dev->ns, ftl_get_write_qpair(dev),
313 				&ppa_packed, 1, NULL, ftl_io_cmpl_cb, io);
314 		if (spdk_unlikely(rc)) {
315 			ftl_io_fail(io, rc);
316 			SPDK_ERRLOG("Vector reset failed with status: %d\n", rc);
317 			break;
318 		}
319 
320 		ftl_io_inc_req(io);
321 		ftl_io_advance(io, 1);
322 	}
323 
324 	if (ftl_io_done(io)) {
325 		ftl_io_complete(io);
326 	}
327 
328 	return rc;
329 }
330 
331 static void
332 _ftl_io_erase(void *ctx)
333 {
334 	ftl_io_erase((struct ftl_io *)ctx);
335 }
336 
337 static bool
338 ftl_check_core_thread(const struct spdk_ftl_dev *dev)
339 {
340 	return dev->core_thread.thread == spdk_get_thread();
341 }
342 
343 static bool
344 ftl_check_read_thread(const struct spdk_ftl_dev *dev)
345 {
346 	return dev->read_thread.thread == spdk_get_thread();
347 }
348 
349 int
350 ftl_io_erase(struct ftl_io *io)
351 {
352 	struct spdk_ftl_dev *dev = io->dev;
353 
354 	if (ftl_check_core_thread(dev)) {
355 		return ftl_submit_erase(io);
356 	}
357 
358 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_erase, io);
359 	return 0;
360 }
361 
362 static struct ftl_band *
363 ftl_next_write_band(struct spdk_ftl_dev *dev)
364 {
365 	struct ftl_band *band;
366 
367 	band = LIST_FIRST(&dev->free_bands);
368 	if (!band) {
369 		return NULL;
370 	}
371 	assert(band->state == FTL_BAND_STATE_FREE);
372 
373 	if (ftl_band_erase(band)) {
374 		/* TODO: handle erase failure */
375 		return NULL;
376 	}
377 
378 	return band;
379 }
380 
381 static struct ftl_band *
382 ftl_next_wptr_band(struct spdk_ftl_dev *dev)
383 {
384 	struct ftl_band *band;
385 
386 	if (!dev->next_band) {
387 		band = ftl_next_write_band(dev);
388 	} else {
389 		assert(dev->next_band->state == FTL_BAND_STATE_PREP);
390 		band = dev->next_band;
391 		dev->next_band = NULL;
392 	}
393 
394 	return band;
395 }
396 
397 static struct ftl_wptr *
398 ftl_wptr_init(struct ftl_band *band)
399 {
400 	struct spdk_ftl_dev *dev = band->dev;
401 	struct ftl_wptr *wptr;
402 
403 	wptr = calloc(1, sizeof(*wptr));
404 	if (!wptr) {
405 		return NULL;
406 	}
407 
408 	wptr->dev = dev;
409 	wptr->band = band;
410 	wptr->chunk = CIRCLEQ_FIRST(&band->chunks);
411 	wptr->ppa = wptr->chunk->start_ppa;
412 	TAILQ_INIT(&wptr->pending_queue);
413 
414 	return wptr;
415 }
416 
417 static int
418 ftl_add_direct_wptr(struct ftl_band *band)
419 {
420 	struct spdk_ftl_dev *dev = band->dev;
421 	struct ftl_wptr *wptr;
422 
423 	assert(band->state == FTL_BAND_STATE_OPEN);
424 
425 	wptr = ftl_wptr_init(band);
426 	if (!wptr) {
427 		return -1;
428 	}
429 
430 	wptr->direct_mode = true;
431 
432 	if (ftl_band_alloc_lba_map(band)) {
433 		ftl_wptr_free(wptr);
434 		return -1;
435 	}
436 
437 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
438 
439 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: direct band %u\n", band->id);
440 	ftl_trace_write_band(dev, band);
441 	return 0;
442 }
443 
444 static void
445 ftl_close_direct_wptr(struct ftl_band *band)
446 {
447 	struct ftl_wptr *wptr = ftl_wptr_from_band(band);
448 
449 	assert(wptr->direct_mode);
450 	assert(band->state == FTL_BAND_STATE_CLOSED);
451 
452 	ftl_band_release_lba_map(band);
453 
454 	ftl_remove_wptr(wptr);
455 }
456 
457 int
458 ftl_band_set_direct_access(struct ftl_band *band, bool access)
459 {
460 	if (access) {
461 		return ftl_add_direct_wptr(band);
462 	} else {
463 		ftl_close_direct_wptr(band);
464 		return 0;
465 	}
466 }
467 
468 static int
469 ftl_add_wptr(struct spdk_ftl_dev *dev)
470 {
471 	struct ftl_band *band;
472 	struct ftl_wptr *wptr;
473 
474 	band = ftl_next_wptr_band(dev);
475 	if (!band) {
476 		return -1;
477 	}
478 
479 	wptr = ftl_wptr_init(band);
480 	if (!wptr) {
481 		return -1;
482 	}
483 
484 	if (ftl_band_write_prep(band)) {
485 		ftl_wptr_free(wptr);
486 		return -1;
487 	}
488 
489 	LIST_INSERT_HEAD(&dev->wptr_list, wptr, list_entry);
490 
491 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: band %u\n", band->id);
492 	ftl_trace_write_band(dev, band);
493 	return 0;
494 }
495 
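/*
 * Advance the write pointer by xfer_size blocks (no-op in direct mode): mark the band
 * FULL once its user area is exhausted, flag the current chunk as busy, move to the
 * next operational chunk and start preparing the next band once the configured band
 * threshold has been crossed.
 */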
496 static void
497 ftl_wptr_advance(struct ftl_wptr *wptr, size_t xfer_size)
498 {
499 	struct ftl_band *band = wptr->band;
500 	struct spdk_ftl_dev *dev = wptr->dev;
501 	struct spdk_ftl_conf *conf = &dev->conf;
502 	size_t next_thld;
503 
504 	if (spdk_unlikely(wptr->direct_mode)) {
505 		return;
506 	}
507 
508 	wptr->offset += xfer_size;
509 	next_thld = (ftl_band_num_usable_lbks(band) * conf->band_thld) / 100;
510 
511 	if (ftl_band_full(band, wptr->offset)) {
512 		ftl_band_set_state(band, FTL_BAND_STATE_FULL);
513 	}
514 
515 	wptr->chunk->busy = true;
516 	wptr->ppa = ftl_band_next_xfer_ppa(band, wptr->ppa, xfer_size);
517 	wptr->chunk = ftl_band_next_operational_chunk(band, wptr->chunk);
518 
519 	assert(!ftl_ppa_invalid(wptr->ppa));
520 
521 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "wptr: grp:%d, pu:%d chunk:%d, lbk:%u\n",
522 		      wptr->ppa.grp, wptr->ppa.pu, wptr->ppa.chk, wptr->ppa.lbk);
523 
524 	if (wptr->offset >= next_thld && !dev->next_band) {
525 		dev->next_band = ftl_next_write_band(dev);
526 	}
527 }
528 
529 static size_t
530 ftl_wptr_user_lbks_left(const struct ftl_wptr *wptr)
531 {
532 	return ftl_band_user_lbks_left(wptr->band, wptr->offset);
533 }
534 
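/*
 * Check whether the write pointer can accept user data: skip unwritable chunks, wait
 * for in-progress band state changes, kick off closing of a FULL band or opening of a
 * band that isn't OPEN yet. Returns 1 only when the band is open and writable.
 */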
535 static int
536 ftl_wptr_ready(struct ftl_wptr *wptr)
537 {
538 	struct ftl_band *band = wptr->band;
539 
540 	/* TODO: add handling of empty bands */
541 
542 	if (spdk_unlikely(!ftl_chunk_is_writable(wptr->chunk))) {
543 		/* Erasing a band may fail after it has been assigned to the wptr. */
544 		if (spdk_unlikely(wptr->chunk->state == FTL_CHUNK_STATE_BAD)) {
545 			ftl_wptr_advance(wptr, wptr->dev->xfer_size);
546 		}
547 		return 0;
548 	}
549 
550 	/* If we're in the process of writing metadata, wait till it is */
551 	/* completed. */
552 	/* TODO: we should probably change bands once we're writing tail md */
553 	if (ftl_band_state_changing(band)) {
554 		return 0;
555 	}
556 
557 	if (band->state == FTL_BAND_STATE_FULL) {
558 		if (ftl_wptr_close_band(wptr)) {
559 			/* TODO: need recovery here */
560 			assert(false);
561 		}
562 		return 0;
563 	}
564 
565 	if (band->state != FTL_BAND_STATE_OPEN) {
566 		if (ftl_wptr_open_band(wptr)) {
567 			/* TODO: need recovery here */
568 			assert(false);
569 		}
570 		return 0;
571 	}
572 
573 	return 1;
574 }
575 
576 static const struct spdk_ftl_limit *
577 ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
578 {
579 	assert(type < SPDK_FTL_LIMIT_MAX);
580 	return &dev->conf.defrag.limits[type];
581 }
582 
583 static bool
584 ftl_cache_lba_valid(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
585 {
586 	struct ftl_ppa ppa;
587 
588 	/* If the LBA is invalid don't bother checking the md and l2p */
589 	if (spdk_unlikely(entry->lba == FTL_LBA_INVALID)) {
590 		return false;
591 	}
592 
593 	ppa = ftl_l2p_get(dev, entry->lba);
594 	if (!(ftl_ppa_cached(ppa) && ppa.offset == entry->pos)) {
595 		return false;
596 	}
597 
598 	return true;
599 }
600 
601 static void
602 ftl_evict_cache_entry(struct spdk_ftl_dev *dev, struct ftl_rwb_entry *entry)
603 {
604 	pthread_spin_lock(&entry->lock);
605 
606 	if (!ftl_rwb_entry_valid(entry)) {
607 		goto unlock;
608 	}
609 
610 	/* If the l2p wasn't updated and still points at the entry, fill it with the */
611 	/* on-disk PPA and clear the cache status bit. Otherwise, skip the l2p update */
612 	/* and just clear the cache status. */
613 	if (!ftl_cache_lba_valid(dev, entry)) {
614 		goto clear;
615 	}
616 
617 	ftl_l2p_set(dev, entry->lba, entry->ppa);
618 clear:
619 	ftl_rwb_entry_invalidate(entry);
620 unlock:
621 	pthread_spin_unlock(&entry->lock);
622 }
623 
624 static struct ftl_rwb_entry *
625 ftl_acquire_entry(struct spdk_ftl_dev *dev, int flags)
626 {
627 	struct ftl_rwb_entry *entry;
628 
629 	entry = ftl_rwb_acquire(dev->rwb, ftl_rwb_type_from_flags(flags));
630 	if (!entry) {
631 		return NULL;
632 	}
633 
634 	ftl_evict_cache_entry(dev, entry);
635 
636 	entry->flags = flags;
637 	return entry;
638 }
639 
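/*
 * Fill the write buffer with up to `size` zeroed padding entries (internal entries
 * with an invalid LBA/PPA) so that a partially filled batch can be pushed out.
 */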
640 static void
641 ftl_rwb_pad(struct spdk_ftl_dev *dev, size_t size)
642 {
643 	struct ftl_rwb_entry *entry;
644 	int flags = FTL_IO_PAD | FTL_IO_INTERNAL;
645 
646 	for (size_t i = 0; i < size; ++i) {
647 		entry = ftl_acquire_entry(dev, flags);
648 		if (!entry) {
649 			break;
650 		}
651 
652 		entry->lba = FTL_LBA_INVALID;
653 		entry->ppa = ftl_to_ppa(FTL_PPA_INVALID);
654 		memset(entry->data, 0, FTL_BLOCK_SIZE);
655 		ftl_rwb_push(entry);
656 	}
657 }
658 
659 static void
660 ftl_remove_free_bands(struct spdk_ftl_dev *dev)
661 {
662 	while (!LIST_EMPTY(&dev->free_bands)) {
663 		LIST_REMOVE(LIST_FIRST(&dev->free_bands), list_entry);
664 	}
665 
666 	dev->next_band = NULL;
667 }
668 
669 static void
670 ftl_wptr_process_shutdown(struct ftl_wptr *wptr)
671 {
672 	struct spdk_ftl_dev *dev = wptr->dev;
673 	size_t size = ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_INTERNAL) +
674 		      ftl_rwb_num_acquired(dev->rwb, FTL_RWB_TYPE_USER);
675 	size_t num_active = dev->xfer_size * ftl_rwb_get_active_batches(dev->rwb);
676 	size_t band_length, rwb_free_space, pad_length;
677 
678 	num_active = num_active ? num_active : dev->xfer_size;
679 	if (size >= num_active) {
680 		return;
681 	}
682 
683 	/* If we reach this point, we need to remove the free bands */
684 	/* and pad the current wptr band to the end */
685 	if (ftl_rwb_get_active_batches(dev->rwb) <= 1) {
686 		ftl_remove_free_bands(dev);
687 	}
688 
689 	band_length = ftl_wptr_user_lbks_left(wptr);
690 	rwb_free_space = ftl_rwb_size(dev->rwb) - size;
691 	pad_length = spdk_min(band_length, rwb_free_space);
692 
693 	/* Pad write buffer until band is full */
694 	ftl_rwb_pad(dev, pad_length);
695 }
696 
697 static int
698 ftl_shutdown_complete(struct spdk_ftl_dev *dev)
699 {
700 	return !__atomic_load_n(&dev->num_inflight, __ATOMIC_SEQ_CST) &&
701 	       LIST_EMPTY(&dev->wptr_list);
702 }
703 
704 void
705 ftl_apply_limits(struct spdk_ftl_dev *dev)
706 {
707 	const struct spdk_ftl_limit *limit;
708 	struct ftl_stats *stats = &dev->stats;
709 	size_t rwb_limit[FTL_RWB_TYPE_MAX];
710 	int i;
711 
712 	ftl_rwb_get_limits(dev->rwb, rwb_limit);
713 
714 	/* Clear existing limit */
715 	dev->limit = SPDK_FTL_LIMIT_MAX;
716 
717 	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
718 		limit = ftl_get_limit(dev, i);
719 
720 		if (dev->num_free <= limit->thld) {
721 			rwb_limit[FTL_RWB_TYPE_USER] =
722 				(limit->limit * ftl_rwb_entry_cnt(dev->rwb)) / 100;
723 			stats->limits[i]++;
724 			dev->limit = i;
725 			goto apply;
726 		}
727 	}
728 
729 	/* Clear the limits, since we don't need to apply them anymore */
730 	rwb_limit[FTL_RWB_TYPE_USER] = ftl_rwb_entry_cnt(dev->rwb);
731 apply:
732 	ftl_trace_limits(dev, rwb_limit, dev->num_free);
733 	ftl_rwb_set_limits(dev->rwb, rwb_limit);
734 }
735 
736 static int
737 ftl_invalidate_addr_unlocked(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
738 {
739 	struct ftl_band *band = ftl_band_from_ppa(dev, ppa);
740 	struct ftl_lba_map *lba_map = &band->lba_map;
741 	uint64_t offset;
742 
743 	offset = ftl_band_lbkoff_from_ppa(band, ppa);
744 
745 	/* The bit might already be cleared if two writes are scheduled to the */
746 	/* same LBA at the same time */
747 	if (spdk_bit_array_get(lba_map->vld, offset)) {
748 		assert(lba_map->num_vld > 0);
749 		spdk_bit_array_clear(lba_map->vld, offset);
750 		lba_map->num_vld--;
751 		return 1;
752 	}
753 
754 	return 0;
755 }
756 
757 int
758 ftl_invalidate_addr(struct spdk_ftl_dev *dev, struct ftl_ppa ppa)
759 {
760 	struct ftl_band *band;
761 	int rc;
762 
763 	assert(!ftl_ppa_cached(ppa));
764 	band = ftl_band_from_ppa(dev, ppa);
765 
766 	pthread_spin_lock(&band->lba_map.lock);
767 	rc = ftl_invalidate_addr_unlocked(dev, ppa);
768 	pthread_spin_unlock(&band->lba_map.lock);
769 
770 	return rc;
771 }
772 
773 static int
774 ftl_read_retry(int rc)
775 {
776 	return rc == -EAGAIN;
777 }
778 
779 static int
780 ftl_read_canceled(int rc)
781 {
782 	return rc == -EFAULT || rc == 0;
783 }
784 
785 static void
786 ftl_add_to_retry_queue(struct ftl_io *io)
787 {
788 	if (!(io->flags & FTL_IO_RETRY)) {
789 		io->flags |= FTL_IO_RETRY;
790 		TAILQ_INSERT_TAIL(&io->dev->retry_queue, io, retry_entry);
791 	}
792 }
793 
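/*
 * Serve a read straight from the write buffer: lock the RWB entry and re-check the
 * L2P to make sure the cached PPA is still current before copying the data out.
 * Returns -1 if the mapping changed in the meantime.
 */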
794 static int
795 ftl_ppa_cache_read(struct ftl_io *io, uint64_t lba,
796 		   struct ftl_ppa ppa, void *buf)
797 {
798 	struct ftl_rwb *rwb = io->dev->rwb;
799 	struct ftl_rwb_entry *entry;
800 	struct ftl_ppa nppa;
801 	int rc = 0;
802 
803 	entry = ftl_rwb_entry_from_offset(rwb, ppa.offset);
804 	pthread_spin_lock(&entry->lock);
805 
806 	nppa = ftl_l2p_get(io->dev, lba);
807 	if (ppa.ppa != nppa.ppa) {
808 		rc = -1;
809 		goto out;
810 	}
811 
812 	memcpy(buf, entry->data, FTL_BLOCK_SIZE);
813 out:
814 	pthread_spin_unlock(&entry->lock);
815 	return rc;
816 }
817 
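/*
 * Resolve the PPA of the IO's current LBA and count how many of the following LBAs
 * map to physically contiguous PPAs, so they can be read with a single command.
 * Returns -EFAULT for unmapped LBAs, 0 when the data was served from the write buffer
 * and -EAGAIN when the cached mapping changed and the lookup has to be retried.
 */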
818 static int
819 ftl_lba_read_next_ppa(struct ftl_io *io, struct ftl_ppa *ppa)
820 {
821 	struct spdk_ftl_dev *dev = io->dev;
822 	struct ftl_ppa next_ppa;
823 	size_t i;
824 
825 	*ppa = ftl_l2p_get(dev, ftl_io_current_lba(io));
826 
827 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Read ppa:%lx, lba:%lu\n",
828 		      ppa->ppa, ftl_io_current_lba(io));
829 
830 	/* If the PPA is invalid, skip it (the buffer should already be zeroed) */
831 	if (ftl_ppa_invalid(*ppa)) {
832 		return -EFAULT;
833 	}
834 
835 	if (ftl_ppa_cached(*ppa)) {
836 		if (!ftl_ppa_cache_read(io, ftl_io_current_lba(io), *ppa, ftl_io_iovec_addr(io))) {
837 			return 0;
838 		}
839 
840 		/* If the state changed, we have to re-read the l2p */
841 		return -EAGAIN;
842 	}
843 
844 	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
845 		next_ppa = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));
846 
847 		if (ftl_ppa_invalid(next_ppa) || ftl_ppa_cached(next_ppa)) {
848 			break;
849 		}
850 
851 		if (ftl_ppa_addr_pack(dev, *ppa) + i != ftl_ppa_addr_pack(dev, next_ppa)) {
852 			break;
853 		}
854 	}
855 
856 	return i;
857 }
858 
859 static int
860 ftl_submit_read(struct ftl_io *io)
861 {
862 	struct spdk_ftl_dev *dev = io->dev;
863 	struct ftl_ppa ppa;
864 	int rc = 0, lbk_cnt;
865 
866 	assert(LIST_EMPTY(&io->children));
867 
868 	while (io->pos < io->lbk_cnt) {
869 		if (ftl_io_mode_ppa(io)) {
870 			lbk_cnt = rc = ftl_ppa_read_next_ppa(io, &ppa);
871 		} else {
872 			lbk_cnt = rc = ftl_lba_read_next_ppa(io, &ppa);
873 		}
874 
875 		/* We might need to retry the read from scratch (e.g. */
876 		/* because a write was under way and completed before */
877 		/* we could read it from the rwb) */
878 		if (ftl_read_retry(rc)) {
879 			continue;
880 		}
881 
882 		/* No need to submit the read - the data was either read from the cache or the PPA was invalid */
883 		if (ftl_read_canceled(rc)) {
884 			ftl_io_advance(io, 1);
885 			ftl_trace_completion(io->dev, io, rc ? FTL_TRACE_COMPLETION_INVALID :
886 					     FTL_TRACE_COMPLETION_CACHE);
887 			rc = 0;
888 			continue;
889 		}
890 
891 		assert(lbk_cnt > 0);
892 
893 		ftl_trace_submission(dev, io, ppa, lbk_cnt);
894 		rc = spdk_nvme_ns_cmd_read(dev->ns, ftl_get_read_qpair(dev),
895 					   ftl_io_iovec_addr(io),
896 					   ftl_ppa_addr_pack(io->dev, ppa), lbk_cnt,
897 					   ftl_io_cmpl_cb, io, 0);
898 		if (spdk_unlikely(rc)) {
899 			if (rc == -ENOMEM) {
900 				ftl_add_to_retry_queue(io);
901 			} else {
902 				ftl_io_fail(io, rc);
903 			}
904 			break;
905 		}
906 
907 		ftl_io_inc_req(io);
908 		ftl_io_advance(io, lbk_cnt);
909 	}
910 
911 	/* If we didn't have to read anything from the device, */
912 	/* complete the request right away */
913 	if (ftl_io_done(io)) {
914 		ftl_io_complete(io);
915 	}
916 
917 	return rc;
918 }
919 
920 static void
921 ftl_complete_flush(struct ftl_flush *flush)
922 {
923 	assert(flush->num_req == 0);
924 	LIST_REMOVE(flush, list_entry);
925 
926 	flush->cb.fn(flush->cb.ctx, 0);
927 
928 	spdk_bit_array_free(&flush->bmap);
929 	free(flush);
930 }
931 
932 static void
933 ftl_process_flush(struct spdk_ftl_dev *dev, struct ftl_rwb_batch *batch)
934 {
935 	struct ftl_flush *flush, *tflush;
936 	size_t offset;
937 
938 	LIST_FOREACH_SAFE(flush, &dev->flush_list, list_entry, tflush) {
939 		offset = ftl_rwb_batch_get_offset(batch);
940 
941 		if (spdk_bit_array_get(flush->bmap, offset)) {
942 			spdk_bit_array_clear(flush->bmap, offset);
943 			if (!(--flush->num_req)) {
944 				ftl_complete_flush(flush);
945 			}
946 		}
947 	}
948 }
949 
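/*
 * Reserve up to *num_lbks blocks in the non-volatile cache, bounded by the number of
 * available blocks, the configured max_request_cnt and the end of the cache bdev.
 * The reserved count is returned through num_lbks; the return value is the starting
 * cache address, or FTL_LBA_INVALID when the cache has no free space.
 */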
950 static uint64_t
951 ftl_reserve_nv_cache(struct ftl_nv_cache *nv_cache, size_t *num_lbks)
952 {
953 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
954 	struct spdk_ftl_dev *dev = SPDK_CONTAINEROF(nv_cache, struct spdk_ftl_dev, nv_cache);
955 	uint64_t num_available, cache_size, cache_addr = FTL_LBA_INVALID;
956 
957 	cache_size = spdk_bdev_get_num_blocks(bdev);
958 
959 	pthread_spin_lock(&nv_cache->lock);
960 	if (spdk_unlikely(nv_cache->num_available == 0)) {
961 		goto out;
962 	}
963 
964 	num_available = spdk_min(nv_cache->num_available, *num_lbks);
965 	num_available = spdk_min(num_available, dev->conf.nv_cache.max_request_cnt);
966 
967 	if (spdk_unlikely(nv_cache->current_addr + num_available > cache_size)) {
968 		*num_lbks = cache_size - nv_cache->current_addr;
969 	} else {
970 		*num_lbks = num_available;
971 	}
972 
973 	cache_addr = nv_cache->current_addr;
974 	nv_cache->current_addr += *num_lbks;
975 	nv_cache->num_available -= *num_lbks;
976 
977 	if (nv_cache->current_addr == spdk_bdev_get_num_blocks(bdev)) {
978 		nv_cache->current_addr = 0;
979 	}
980 out:
981 	pthread_spin_unlock(&nv_cache->lock);
982 	return cache_addr;
983 }
984 
985 static struct ftl_io *
986 ftl_alloc_io_nv_cache(struct ftl_io *parent, size_t num_lbks)
987 {
988 	struct ftl_io_init_opts opts = {
989 		.dev		= parent->dev,
990 		.parent		= parent,
991 		.data		= ftl_io_iovec_addr(parent),
992 		.lbk_cnt	= num_lbks,
993 		.flags		= FTL_IO_CACHE,
994 	};
995 
996 	return ftl_io_init_internal(&opts);
997 }
998 
999 static void
1000 ftl_nv_cache_submit_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1001 {
1002 	struct ftl_io *io = cb_arg;
1003 	struct ftl_nv_cache *nv_cache = &io->dev->nv_cache;
1004 
1005 	if (spdk_unlikely(!success)) {
1006 		SPDK_ERRLOG("Non-volatile cache write failed at %"PRIx64"\n", io->ppa.ppa);
1007 		io->status = -EIO;
1008 	}
1009 
1010 	ftl_io_dec_req(io);
1011 	if (ftl_io_done(io)) {
1012 		spdk_mempool_put(nv_cache->md_pool, io->md);
1013 		ftl_io_complete(io);
1014 	}
1015 
1016 	spdk_bdev_free_io(bdev_io);
1017 }
1018 
1019 static void
1020 ftl_submit_nv_cache(void *ctx)
1021 {
1022 	struct ftl_io *io = ctx;
1023 	struct spdk_ftl_dev *dev = io->dev;
1024 	struct spdk_thread *thread;
1025 	struct ftl_nv_cache *nv_cache = &dev->nv_cache;
1026 	struct ftl_io_channel *ioch;
1027 	int rc;
1028 
1029 	ioch = spdk_io_channel_get_ctx(io->ioch);
1030 	thread = spdk_io_channel_get_thread(io->ioch);
1031 
1032 	rc = spdk_bdev_write_blocks_with_md(nv_cache->bdev_desc, ioch->cache_ioch,
1033 					    ftl_io_iovec_addr(io), io->md, io->ppa.ppa,
1034 					    io->lbk_cnt, ftl_nv_cache_submit_cb, io);
1035 	if (rc == -ENOMEM) {
1036 		spdk_thread_send_msg(thread, ftl_submit_nv_cache, io);
1037 		return;
1038 	} else if (rc) {
1039 		SPDK_ERRLOG("Write to persistent cache failed: %s (%"PRIu64", %"PRIu64")\n",
1040 			    spdk_strerror(-rc), io->ppa.ppa, io->lbk_cnt);
1041 		spdk_mempool_put(nv_cache->md_pool, io->md);
1042 		io->status = -EIO;
1043 		ftl_io_complete(io);
1044 		return;
1045 	}
1046 
1047 	ftl_io_advance(io, io->lbk_cnt);
1048 	ftl_io_inc_req(io);
1049 }
1050 
1051 static void
1052 ftl_nv_cache_fill_md(struct ftl_nv_cache *nv_cache, struct ftl_io *io)
1053 {
1054 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(nv_cache->bdev_desc);
1055 	void *md_buf = io->md;
1056 	size_t lbk_off;
1057 
1058 	for (lbk_off = 0; lbk_off < io->lbk_cnt; ++lbk_off) {
1059 		*(uint64_t *)md_buf = ftl_io_get_lba(io, lbk_off);
1060 		md_buf = (char *)md_buf + spdk_bdev_get_md_size(bdev);
1061 	}
1062 }
1063 
1064 static void
1065 _ftl_write_nv_cache(void *ctx)
1066 {
1067 	struct ftl_io *child, *io = ctx;
1068 	struct spdk_ftl_dev *dev = io->dev;
1069 	struct spdk_thread *thread;
1070 	uint64_t num_lbks;
1071 
1072 	thread = spdk_io_channel_get_thread(io->ioch);
1073 
1074 	while (io->pos < io->lbk_cnt) {
1075 		num_lbks = ftl_io_iovec_len_left(io);
1076 
1077 		child = ftl_alloc_io_nv_cache(io, num_lbks);
1078 		if (spdk_unlikely(!child)) {
1079 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1080 			return;
1081 		}
1082 
1083 		child->md = spdk_mempool_get(dev->nv_cache.md_pool);
1084 		if (spdk_unlikely(!child->md)) {
1085 			ftl_io_free(child);
1086 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1087 			break;
1088 		}
1089 
1090 		/* Reserve space in the non-volatile cache */
1091 		child->ppa.ppa = ftl_reserve_nv_cache(&dev->nv_cache, &num_lbks);
1092 		if (child->ppa.ppa == FTL_LBA_INVALID) {
1093 			spdk_mempool_put(dev->nv_cache.md_pool, child->md);
1094 			ftl_io_free(child);
1095 			spdk_thread_send_msg(thread, _ftl_write_nv_cache, io);
1096 			break;
1097 		}
1098 
1099 		/* Shrink the IO if there isn't enough room in the cache to fill the whole iovec */
1100 		if (spdk_unlikely(num_lbks != ftl_io_iovec_len_left(io))) {
1101 			ftl_io_shrink_iovec(child, num_lbks);
1102 		}
1103 
1104 		ftl_nv_cache_fill_md(&dev->nv_cache, child);
1105 		ftl_submit_nv_cache(child);
1106 	}
1107 
1108 	if (ftl_io_done(io)) {
1109 		ftl_io_complete(io);
1110 	}
1111 }
1112 
1113 static void
1114 ftl_write_nv_cache(struct ftl_io *parent)
1115 {
1116 	ftl_io_reset(parent);
1117 	parent->flags |= FTL_IO_CACHE;
1118 	_ftl_write_nv_cache(parent);
1119 }
1120 
1121 static void
1122 ftl_write_fail(struct ftl_io *io, int status)
1123 {
1124 	struct ftl_rwb_batch *batch = io->rwb_batch;
1125 	struct spdk_ftl_dev *dev = io->dev;
1126 	struct ftl_rwb_entry *entry;
1127 	struct ftl_band *band;
1128 	char buf[128];
1129 
1130 	entry = ftl_rwb_batch_first_entry(batch);
1131 
1132 	band = ftl_band_from_ppa(io->dev, entry->ppa);
1133 	SPDK_ERRLOG("Write failed @ppa: %s, status: %d\n",
1134 		    ftl_ppa2str(entry->ppa, buf, sizeof(buf)), status);
1135 
1136 	/* Close the band, halt its wptr and stop defrag */
1137 	ftl_halt_writes(dev, band);
1138 
1139 	ftl_rwb_foreach(entry, batch) {
1140 		/* Invalidate meta set by process_writes() */
1141 		ftl_invalidate_addr(dev, entry->ppa);
1142 	}
1143 
1144 	/* Reset the batch back to the RWB so it can be resent later */
1145 	ftl_rwb_batch_revert(batch);
1146 }
1147 
1148 static void
1149 ftl_write_cb(struct ftl_io *io, void *arg, int status)
1150 {
1151 	struct spdk_ftl_dev *dev = io->dev;
1152 	struct ftl_rwb_batch *batch = io->rwb_batch;
1153 	struct ftl_rwb_entry *entry;
1154 
1155 	if (status) {
1156 		ftl_write_fail(io, status);
1157 		return;
1158 	}
1159 
1160 	assert(io->lbk_cnt == dev->xfer_size);
1161 	ftl_rwb_foreach(entry, batch) {
1162 		if (!(io->flags & FTL_IO_MD) && !(entry->flags & FTL_IO_PAD)) {
1163 			/* Verify that the LBA is set for user lbks */
1164 			assert(entry->lba != FTL_LBA_INVALID);
1165 		}
1166 
1167 		SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lu, lba:%lu\n",
1168 			      entry->ppa.ppa, entry->lba);
1169 	}
1170 
1171 	ftl_process_flush(dev, batch);
1172 	ftl_rwb_batch_release(batch);
1173 }
1174 
1175 static void
1176 ftl_update_rwb_stats(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry)
1177 {
1178 	if (!ftl_rwb_entry_internal(entry)) {
1179 		dev->stats.write_user++;
1180 	}
1181 	dev->stats.write_total++;
1182 }
1183 
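/*
 * Update the L2P to point at the given PPA for a freshly buffered entry. Weak
 * (relocation) writes never overwrite a newer mapping; any previous cache entry or
 * on-disk address of the LBA is invalidated under the appropriate lock.
 */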
1184 static void
1185 ftl_update_l2p(struct spdk_ftl_dev *dev, const struct ftl_rwb_entry *entry,
1186 	       struct ftl_ppa ppa)
1187 {
1188 	struct ftl_ppa prev_ppa;
1189 	struct ftl_rwb_entry *prev;
1190 	struct ftl_band *band;
1191 	int valid;
1192 
1193 	prev_ppa = ftl_l2p_get(dev, entry->lba);
1194 	if (ftl_ppa_invalid(prev_ppa)) {
1195 		ftl_l2p_set(dev, entry->lba, ppa);
1196 		return;
1197 	}
1198 
1199 	/* If the L2P's PPA is different from what we expected, we don't need to */
1200 	/* do anything (someone has already overwritten our data). */
1201 	if (ftl_rwb_entry_weak(entry) && !ftl_ppa_cmp(prev_ppa, entry->ppa)) {
1202 		return;
1203 	}
1204 
1205 	if (ftl_ppa_cached(prev_ppa)) {
1206 		assert(!ftl_rwb_entry_weak(entry));
1207 		prev = ftl_rwb_entry_from_offset(dev->rwb, prev_ppa.offset);
1208 		pthread_spin_lock(&prev->lock);
1209 
1210 		/* Re-read the L2P under the lock to protect against updates */
1211 		/* to this LBA from other threads */
1212 		prev_ppa = ftl_l2p_get(dev, entry->lba);
1213 
1214 		/* If the entry is no longer in cache, another write has been */
1215 		/* scheduled in the meantime, so we have to invalidate its LBA */
1216 		if (!ftl_ppa_cached(prev_ppa)) {
1217 			ftl_invalidate_addr(dev, prev_ppa);
1218 		}
1219 
1220 		/* If previous entry is part of cache, remove and invalidate it */
1221 		if (ftl_rwb_entry_valid(prev)) {
1222 			ftl_invalidate_addr(dev, prev->ppa);
1223 			ftl_rwb_entry_invalidate(prev);
1224 		}
1225 
1226 		ftl_l2p_set(dev, entry->lba, ppa);
1227 		pthread_spin_unlock(&prev->lock);
1228 		return;
1229 	}
1230 
1231 	/* Lock the band containing the previous PPA. This ensures atomic changes to */
1232 	/* the L2P as well as the metadata. The valid bits in the metadata are used to */
1233 	/* check the validity of weak writes. */
1234 	band = ftl_band_from_ppa(dev, prev_ppa);
1235 	pthread_spin_lock(&band->lba_map.lock);
1236 
1237 	valid = ftl_invalidate_addr_unlocked(dev, prev_ppa);
1238 
1239 	/* If the address has been invalidated already, we don't want to update */
1240 	/* the L2P for weak writes, as it means the write is no longer valid. */
1241 	if (!ftl_rwb_entry_weak(entry) || valid) {
1242 		ftl_l2p_set(dev, entry->lba, ppa);
1243 	}
1244 
1245 	pthread_spin_unlock(&band->lba_map.lock);
1246 }
1247 
1248 static struct ftl_io *
1249 ftl_io_init_child_write(struct ftl_io *parent, struct ftl_ppa ppa,
1250 			void *data, void *md, ftl_io_fn cb)
1251 {
1252 	struct ftl_io *io;
1253 	struct spdk_ftl_dev *dev = parent->dev;
1254 	struct ftl_io_init_opts opts = {
1255 		.dev		= dev,
1256 		.io		= NULL,
1257 		.parent		= parent,
1258 		.rwb_batch	= NULL,
1259 		.band		= parent->band,
1260 		.size		= sizeof(struct ftl_io),
1261 		.flags		= 0,
1262 		.type		= FTL_IO_WRITE,
1263 		.lbk_cnt	= dev->xfer_size,
1264 		.cb_fn		= cb,
1265 		.data		= data,
1266 		.md		= md,
1267 	};
1268 
1269 	io = ftl_io_init_internal(&opts);
1270 	if (!io) {
1271 		return NULL;
1272 	}
1273 
1274 	io->ppa = ppa;
1275 
1276 	return io;
1277 }
1278 
1279 static void
1280 ftl_io_child_write_cb(struct ftl_io *io, void *ctx, int status)
1281 {
1282 	struct ftl_chunk *chunk;
1283 
1284 	chunk = ftl_band_chunk_from_ppa(io->band, io->ppa);
1285 	chunk->busy = false;
1286 	chunk->write_offset += io->lbk_cnt;
1287 }
1288 
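/*
 * Carve out a child IO of lbk_cnt blocks at the write pointer's PPA (or at the
 * request's own PPA in direct mode) and submit it as a single write with metadata.
 * Returns -EAGAIN when a child IO cannot be allocated and -EIO on submission failure.
 */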
1289 static int
1290 ftl_submit_child_write(struct ftl_wptr *wptr, struct ftl_io *io, int lbk_cnt)
1291 {
1292 	struct spdk_ftl_dev	*dev = io->dev;
1293 	struct ftl_io		*child;
1294 	int			rc;
1295 	struct ftl_ppa		ppa;
1296 
1297 	if (spdk_likely(!wptr->direct_mode)) {
1298 		ppa = wptr->ppa;
1299 	} else {
1300 		assert(io->flags & FTL_IO_DIRECT_ACCESS);
1301 		assert(io->ppa.chk == wptr->band->id);
1302 		ppa = io->ppa;
1303 	}
1304 
1305 	/* Split the IO into child requests and release the chunk as soon as the child completes */
1306 	child = ftl_io_init_child_write(io, ppa, ftl_io_iovec_addr(io),
1307 					ftl_io_get_md(io), ftl_io_child_write_cb);
1308 	if (!child) {
1309 		return -EAGAIN;
1310 	}
1311 
1312 	rc = spdk_nvme_ns_cmd_write_with_md(dev->ns, ftl_get_write_qpair(dev),
1313 					    ftl_io_iovec_addr(child), child->md,
1314 					    ftl_ppa_addr_pack(dev, ppa),
1315 					    lbk_cnt, ftl_io_cmpl_cb, child, 0, 0, 0);
1316 	if (rc) {
1317 		ftl_io_fail(child, rc);
1318 		ftl_io_complete(child);
1319 		SPDK_ERRLOG("spdk_nvme_ns_cmd_write failed with status:%d, ppa:%lu\n",
1320 			    rc, ppa.ppa);
1321 
1322 		return -EIO;
1323 	}
1324 
1325 	ftl_io_inc_req(child);
1326 	ftl_io_advance(child, lbk_cnt);
1327 
1328 	return 0;
1329 }
1330 
1331 static int
1332 ftl_submit_write(struct ftl_wptr *wptr, struct ftl_io *io)
1333 {
1334 	struct spdk_ftl_dev	*dev = io->dev;
1335 	int			rc = 0;
1336 
1337 	assert(io->lbk_cnt % dev->xfer_size == 0);
1338 
1339 	while (io->iov_pos < io->iov_cnt) {
1340 		/* There are no ordering guarantees for completions on the NVMe submission queue, */
1341 		/* so wait until the chunk is no longer busy before submitting another write */
1342 		if (wptr->chunk->busy) {
1343 			TAILQ_INSERT_TAIL(&wptr->pending_queue, io, retry_entry);
1344 			rc = -EAGAIN;
1345 			break;
1346 		}
1347 
1348 		rc = ftl_submit_child_write(wptr, io, dev->xfer_size);
1349 		if (spdk_unlikely(rc)) {
1350 			if (rc == -EAGAIN) {
1351 				TAILQ_INSERT_TAIL(&wptr->pending_queue, io, retry_entry);
1352 			} else {
1353 				ftl_io_fail(io, rc);
1354 			}
1355 			break;
1356 		}
1357 
1358 		ftl_trace_submission(dev, io, wptr->ppa, dev->xfer_size);
1359 		ftl_wptr_advance(wptr, dev->xfer_size);
1360 	}
1361 
1362 	if (ftl_io_done(io)) {
1363 		/* Parent IO will complete after all children are completed */
1364 		ftl_io_complete(io);
1365 	}
1366 
1367 	return rc;
1368 }
1369 
1370 static void
1371 ftl_flush_pad_batch(struct spdk_ftl_dev *dev)
1372 {
1373 	struct ftl_rwb *rwb = dev->rwb;
1374 	size_t size, num_entries;
1375 
1376 	size = ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_INTERNAL) +
1377 	       ftl_rwb_num_acquired(rwb, FTL_RWB_TYPE_USER);
1378 
1379 	/* There must be something in the RWB, otherwise the flush */
1380 	/* wouldn't be waiting for anything */
1381 	assert(size > 0);
1382 
1383 	/* Only add padding when there are fewer than xfer_size */
1384 	/* entries in the buffer. Otherwise we just have to wait */
1385 	/* for the entries to become ready. */
1386 	num_entries = ftl_rwb_get_active_batches(dev->rwb) * dev->xfer_size;
1387 	if (size < num_entries) {
1388 		ftl_rwb_pad(dev, num_entries - (size % num_entries));
1389 	}
1390 }
1391 
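/*
 * Core write path for a single write pointer: resubmit any pending IO, pop a full
 * batch from the write buffer, assign PPAs to its entries, update the band's metadata
 * for entries whose L2P mapping still points at the cache and submit the write.
 * Returns the number of blocks handed to the device, or 0 if nothing was submitted.
 */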
1392 static int
1393 ftl_wptr_process_writes(struct ftl_wptr *wptr)
1394 {
1395 	struct spdk_ftl_dev	*dev = wptr->dev;
1396 	struct ftl_rwb_batch	*batch;
1397 	struct ftl_rwb_entry	*entry;
1398 	struct ftl_io		*io;
1399 	struct ftl_ppa		ppa, prev_ppa;
1400 
1401 	if (spdk_unlikely(!TAILQ_EMPTY(&wptr->pending_queue))) {
1402 		io = TAILQ_FIRST(&wptr->pending_queue);
1403 		TAILQ_REMOVE(&wptr->pending_queue, io, retry_entry);
1404 
1405 		if (ftl_submit_write(wptr, io) == -EAGAIN) {
1406 			return 0;
1407 		}
1408 	}
1409 
1410 	/* Make sure the band is prepared for writing */
1411 	if (!ftl_wptr_ready(wptr)) {
1412 		return 0;
1413 	}
1414 
1415 	if (dev->halt) {
1416 		ftl_wptr_process_shutdown(wptr);
1417 	}
1418 
1419 	batch = ftl_rwb_pop(dev->rwb);
1420 	if (!batch) {
1421 		/* If there are queued flush requests we need to pad the RWB to */
1422 		/* force out remaining entries */
1423 		if (!LIST_EMPTY(&dev->flush_list)) {
1424 			ftl_flush_pad_batch(dev);
1425 		}
1426 
1427 		return 0;
1428 	}
1429 
1430 	io = ftl_io_rwb_init(dev, wptr->band, batch, ftl_write_cb);
1431 	if (!io) {
1432 		goto error;
1433 	}
1434 
1435 	ppa = wptr->ppa;
1436 	ftl_rwb_foreach(entry, batch) {
1437 		entry->ppa = ppa;
1438 
1439 		if (entry->lba != FTL_LBA_INVALID) {
1440 			pthread_spin_lock(&entry->lock);
1441 			prev_ppa = ftl_l2p_get(dev, entry->lba);
1442 
1443 			/* If the l2p was updated in the meantime, don't update band's metadata */
1444 			if (ftl_ppa_cached(prev_ppa) && prev_ppa.offset == entry->pos) {
1445 				/* Setting entry's cache bit needs to be done after metadata */
1446 				/* within the band is updated to make sure that writes */
1447 				/* invalidating the entry clear the metadata as well */
1448 				ftl_band_set_addr(wptr->band, entry->lba, entry->ppa);
1449 				ftl_rwb_entry_set_valid(entry);
1450 			}
1451 			pthread_spin_unlock(&entry->lock);
1452 		}
1453 
1454 		ftl_trace_rwb_pop(dev, entry);
1455 		ftl_update_rwb_stats(dev, entry);
1456 
1457 		ppa = ftl_band_next_ppa(wptr->band, ppa, 1);
1458 	}
1459 
1460 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Write ppa:%lx, %lx\n", wptr->ppa.ppa,
1461 		      ftl_ppa_addr_pack(dev, wptr->ppa));
1462 
1463 	if (ftl_submit_write(wptr, io)) {
1464 		/* TODO: we need some recovery here */
1465 		assert(0 && "Write submit failed");
1466 		if (ftl_io_done(io)) {
1467 			ftl_io_free(io);
1468 		}
1469 	}
1470 
1471 	return dev->xfer_size;
1472 error:
1473 	ftl_rwb_batch_revert(batch);
1474 	return 0;
1475 }
1476 
1477 static int
1478 ftl_process_writes(struct spdk_ftl_dev *dev)
1479 {
1480 	struct ftl_wptr *wptr, *twptr;
1481 	size_t num_active = 0;
1482 	enum ftl_band_state state;
1483 
1484 	LIST_FOREACH_SAFE(wptr, &dev->wptr_list, list_entry, twptr) {
1485 		ftl_wptr_process_writes(wptr);
1486 		state = wptr->band->state;
1487 
1488 		if (state != FTL_BAND_STATE_FULL &&
1489 		    state != FTL_BAND_STATE_CLOSING &&
1490 		    state != FTL_BAND_STATE_CLOSED) {
1491 			num_active++;
1492 		}
1493 	}
1494 
1495 	if (num_active < 1) {
1496 		ftl_add_wptr(dev);
1497 	}
1498 
1499 	return 0;
1500 }
1501 
1502 static void
1503 ftl_rwb_entry_fill(struct ftl_rwb_entry *entry, struct ftl_io *io)
1504 {
1505 	struct ftl_band *band;
1506 
1507 	memcpy(entry->data, ftl_io_iovec_addr(io), FTL_BLOCK_SIZE);
1508 
1509 	if (ftl_rwb_entry_weak(entry)) {
1510 		band = ftl_band_from_ppa(io->dev, io->ppa);
1511 		entry->ppa = ftl_band_next_ppa(band, io->ppa, io->pos);
1512 	}
1513 
1514 	entry->trace = io->trace;
1515 	entry->lba = ftl_io_current_lba(io);
1516 
1517 	if (entry->md) {
1518 		memcpy(entry->md, &entry->lba, sizeof(entry->lba));
1519 	}
1520 }
1521 
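/*
 * Copy the user IO's data into write buffer entries, point the L2P at each entry's
 * cache position and push the entries for the core thread to write out. Returns
 * -EAGAIN when no entry can be acquired; once the whole IO is buffered, the data is
 * also mirrored to the non-volatile cache if one is configured.
 */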
1522 static int
1523 ftl_rwb_fill(struct ftl_io *io)
1524 {
1525 	struct spdk_ftl_dev *dev = io->dev;
1526 	struct ftl_rwb_entry *entry;
1527 	struct ftl_ppa ppa = { .cached = 1 };
1528 	int flags = ftl_rwb_flags_from_io(io);
1529 
1530 	while (io->pos < io->lbk_cnt) {
1531 		if (ftl_io_current_lba(io) == FTL_LBA_INVALID) {
1532 			ftl_io_advance(io, 1);
1533 			continue;
1534 		}
1535 
1536 		entry = ftl_acquire_entry(dev, flags);
1537 		if (!entry) {
1538 			return -EAGAIN;
1539 		}
1540 
1541 		ftl_rwb_entry_fill(entry, io);
1542 
1543 		ppa.offset = entry->pos;
1544 
1545 		ftl_trace_rwb_fill(dev, io);
1546 		ftl_update_l2p(dev, entry, ppa);
1547 		ftl_io_advance(io, 1);
1548 
1549 		/* Needs to be done after L2P is updated to avoid race with */
1550 		/* write completion callback when it's processed faster than */
1551 		/* L2P is set in update_l2p(). */
1552 		ftl_rwb_push(entry);
1553 	}
1554 
1555 	if (ftl_io_done(io)) {
1556 		if (dev->nv_cache.bdev_desc) {
1557 			ftl_write_nv_cache(io);
1558 		} else {
1559 			ftl_io_complete(io);
1560 		}
1561 	}
1562 
1563 	return 0;
1564 }
1565 
1566 static bool
1567 ftl_dev_needs_defrag(struct spdk_ftl_dev *dev)
1568 {
1569 	const struct spdk_ftl_limit *limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);
1570 
1571 	if (ftl_reloc_is_halted(dev->reloc)) {
1572 		return false;
1573 	}
1574 
1575 	if (dev->df_band) {
1576 		return false;
1577 	}
1578 
1579 	if (dev->num_free <= limit->thld) {
1580 		return true;
1581 	}
1582 
1583 	return false;
1584 }
1585 
1586 static double
1587 ftl_band_calc_merit(struct ftl_band *band, size_t *threshold_valid)
1588 {
1589 	size_t usable, valid, invalid;
1590 	double vld_ratio;
1591 
1592 	/* If the band doesn't have any usable lbks it's of no use */
1593 	usable = ftl_band_num_usable_lbks(band);
1594 	if (usable == 0) {
1595 		return 0.0;
1596 	}
1597 
1598 	valid =  threshold_valid ? (usable - *threshold_valid) : band->lba_map.num_vld;
1599 	invalid = usable - valid;
1600 
1601 	/* Add one to avoid division by 0 */
1602 	vld_ratio = (double)invalid / (double)(valid + 1);
1603 	return vld_ratio * ftl_band_age(band);
1604 }
1605 
1606 static bool
1607 ftl_band_needs_defrag(struct ftl_band *band, struct spdk_ftl_dev *dev)
1608 {
1609 	struct spdk_ftl_conf *conf = &dev->conf;
1610 	size_t thld_vld;
1611 
1612 	/* If we're in dire need of free bands, every band is worth defragging */
1613 	if (ftl_current_limit(dev) == SPDK_FTL_LIMIT_CRIT) {
1614 		return true;
1615 	}
1616 
1617 	thld_vld = (ftl_band_num_usable_lbks(band) * conf->defrag.invalid_thld) / 100;
1618 
1619 	return band->merit > ftl_band_calc_merit(band, &thld_vld);
1620 }
1621 
1622 static struct ftl_band *
1623 ftl_select_defrag_band(struct spdk_ftl_dev *dev)
1624 {
1625 	struct ftl_band *band, *mband = NULL;
1626 	double merit = 0;
1627 
1628 	LIST_FOREACH(band, &dev->shut_bands, list_entry) {
1629 		assert(band->state == FTL_BAND_STATE_CLOSED);
1630 		band->merit = ftl_band_calc_merit(band, NULL);
1631 		if (band->merit > merit) {
1632 			merit = band->merit;
1633 			mband = band;
1634 		}
1635 	}
1636 
1637 	if (mband && !ftl_band_needs_defrag(mband, dev)) {
1638 		mband = NULL;
1639 	}
1640 
1641 	return mband;
1642 }
1643 
1644 static void
1645 ftl_process_relocs(struct spdk_ftl_dev *dev)
1646 {
1647 	struct ftl_band *band;
1648 
1649 	if (ftl_dev_needs_defrag(dev)) {
1650 		band = dev->df_band = ftl_select_defrag_band(dev);
1651 
1652 		if (band) {
1653 			ftl_reloc_add(dev->reloc, band, 0, ftl_num_band_lbks(dev), 0);
1654 			ftl_trace_defrag_band(dev, band);
1655 		}
1656 	}
1657 
1658 	ftl_reloc(dev->reloc);
1659 }
1660 
1661 int
1662 ftl_current_limit(const struct spdk_ftl_dev *dev)
1663 {
1664 	return dev->limit;
1665 }
1666 
1667 void
1668 spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs)
1669 {
1670 	attrs->uuid = dev->uuid;
1671 	attrs->lbk_cnt = dev->num_lbas;
1672 	attrs->lbk_size = FTL_BLOCK_SIZE;
1673 	attrs->range = dev->range;
1674 	attrs->cache_bdev_desc = dev->nv_cache.bdev_desc;
1675 	attrs->allow_open_bands = dev->conf.allow_open_bands;
1676 	attrs->num_chunks = dev->geo.num_chk;
1677 	attrs->chunk_size = dev->geo.clba;
1678 }
1679 
1680 static void
1681 _ftl_io_write(void *ctx)
1682 {
1683 	ftl_io_write((struct ftl_io *)ctx);
1684 }
1685 
1686 static int
1687 ftl_rwb_fill_leaf(struct ftl_io *io)
1688 {
1689 	int rc;
1690 
1691 	rc = ftl_rwb_fill(io);
1692 	if (rc == -EAGAIN) {
1693 		spdk_thread_send_msg(spdk_io_channel_get_thread(io->ioch),
1694 				     _ftl_io_write, io);
1695 		return 0;
1696 	}
1697 
1698 	return rc;
1699 }
1700 
1701 static int
1702 ftl_submit_write_leaf(struct ftl_io *io)
1703 {
1704 	int rc;
1705 
1706 	rc = ftl_submit_write(ftl_wptr_from_band(io->band), io);
1707 	if (rc == -EAGAIN) {
1708 		/* EAGAIN means that the request was put on the pending queue */
1709 		return 0;
1710 	}
1711 
1712 	return rc;
1713 }
1714 
1715 void
1716 ftl_io_write(struct ftl_io *io)
1717 {
1718 	struct spdk_ftl_dev *dev = io->dev;
1719 
1720 	/* For normal IOs we just need to copy the data onto the rwb */
1721 	if (!(io->flags & FTL_IO_MD)) {
1722 		ftl_io_call_foreach_child(io, ftl_rwb_fill_leaf);
1723 	} else {
1724 		/* Metadata has its own buffer, so it doesn't have to be copied; just */
1725 		/* send it to the core thread and schedule the write immediately */
1726 		if (ftl_check_core_thread(dev)) {
1727 			ftl_io_call_foreach_child(io, ftl_submit_write_leaf);
1728 		} else {
1729 			spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_io_write, io);
1730 		}
1731 	}
1732 }
1733 
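/*
 * Illustrative usage sketch (not part of this file; assumes `dev`, `ioch`, `buf`,
 * `lba`, `write_cb` and `cb_arg` were set up by the caller through the usual FTL
 * initialization paths):
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = 16 * FTL_BLOCK_SIZE };
 *	rc = spdk_ftl_write(dev, ioch, lba, 16, &iov, 1, write_cb, cb_arg);
 *
 * Returns -EINVAL for an empty or mismatched iovec, -EBUSY while the device is still
 * initializing and -ENOMEM when an IO descriptor cannot be allocated.
 */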
1734 int
1735 spdk_ftl_write(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1736 	       struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1737 {
1738 	struct ftl_io *io;
1739 
1740 	if (iov_cnt == 0) {
1741 		return -EINVAL;
1742 	}
1743 
1744 	if (lba_cnt == 0) {
1745 		return -EINVAL;
1746 	}
1747 
1748 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1749 		return -EINVAL;
1750 	}
1751 
1752 	if (!dev->initialized) {
1753 		return -EBUSY;
1754 	}
1755 
1756 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
1757 	if (!io) {
1758 		return -ENOMEM;
1759 	}
1760 
1761 	ftl_io_write(io);
1762 
1763 	return 0;
1764 }
1765 
1766 static int
1767 ftl_io_read_leaf(struct ftl_io *io)
1768 {
1769 	int rc;
1770 
1771 	rc = ftl_submit_read(io);
1772 	if (rc == -ENOMEM) {
1773 		/* -ENOMEM means that the request was put on the retry queue */
1774 		return 0;
1775 	}
1776 
1777 	return rc;
1778 }
1779 
1780 static void
1781 _ftl_io_read(void *arg)
1782 {
1783 	ftl_io_read((struct ftl_io *)arg);
1784 }
1785 
1786 void
1787 ftl_io_read(struct ftl_io *io)
1788 {
1789 	struct spdk_ftl_dev *dev = io->dev;
1790 
1791 	if (ftl_check_read_thread(dev)) {
1792 		ftl_io_call_foreach_child(io, ftl_io_read_leaf);
1793 	} else {
1794 		spdk_thread_send_msg(ftl_get_read_thread(dev), _ftl_io_read, io);
1795 	}
1796 }
1797 
1798 int
1799 spdk_ftl_read(struct spdk_ftl_dev *dev, struct spdk_io_channel *ch, uint64_t lba, size_t lba_cnt,
1800 	      struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
1801 {
1802 	struct ftl_io *io;
1803 
1804 	if (iov_cnt == 0) {
1805 		return -EINVAL;
1806 	}
1807 
1808 	if (lba_cnt == 0) {
1809 		return -EINVAL;
1810 	}
1811 
1812 	if (lba_cnt != ftl_iovec_num_lbks(iov, iov_cnt)) {
1813 		return -EINVAL;
1814 	}
1815 
1816 	if (!dev->initialized) {
1817 		return -EBUSY;
1818 	}
1819 
1820 	io = ftl_io_user_init(ch, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
1821 	if (!io) {
1822 		return -ENOMEM;
1823 	}
1824 
1825 	ftl_io_read(io);
1826 	return 0;
1827 }
1828 
1829 static struct ftl_flush *
1830 ftl_flush_init(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
1831 {
1832 	struct ftl_flush *flush;
1833 	struct ftl_rwb *rwb = dev->rwb;
1834 
1835 	flush = calloc(1, sizeof(*flush));
1836 	if (!flush) {
1837 		return NULL;
1838 	}
1839 
1840 	flush->bmap = spdk_bit_array_create(ftl_rwb_num_batches(rwb));
1841 	if (!flush->bmap) {
1842 		goto error;
1843 	}
1844 
1845 	flush->dev = dev;
1846 	flush->cb.fn = cb_fn;
1847 	flush->cb.ctx = cb_arg;
1848 
1849 	return flush;
1850 error:
1851 	free(flush);
1852 	return NULL;
1853 }
1854 
1855 static void
1856 _ftl_flush(void *ctx)
1857 {
1858 	struct ftl_flush *flush = ctx;
1859 	struct spdk_ftl_dev *dev = flush->dev;
1860 	struct ftl_rwb *rwb = dev->rwb;
1861 	struct ftl_rwb_batch *batch;
1862 
1863 	/* Attach flush object to all non-empty batches */
1864 	ftl_rwb_foreach_batch(batch, rwb) {
1865 		if (!ftl_rwb_batch_empty(batch)) {
1866 			spdk_bit_array_set(flush->bmap, ftl_rwb_batch_get_offset(batch));
1867 			flush->num_req++;
1868 		}
1869 	}
1870 
1871 	LIST_INSERT_HEAD(&dev->flush_list, flush, list_entry);
1872 
1873 	/* If the RWB was already empty, the flush can be completed right away */
1874 	if (!flush->num_req) {
1875 		ftl_complete_flush(flush);
1876 	}
1877 }
1878 
1879 int
1880 spdk_ftl_flush(struct spdk_ftl_dev *dev, spdk_ftl_fn cb_fn, void *cb_arg)
1881 {
1882 	struct ftl_flush *flush;
1883 
1884 	if (!dev->initialized) {
1885 		return -EBUSY;
1886 	}
1887 
1888 	flush = ftl_flush_init(dev, cb_fn, cb_arg);
1889 	if (!flush) {
1890 		return -ENOMEM;
1891 	}
1892 
1893 	spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_flush, flush);
1894 	return 0;
1895 }
1896 
1897 static void
1898 _ftl_process_anm_event(void *ctx)
1899 {
1900 	ftl_process_anm_event((struct ftl_anm_event *)ctx);
1901 }
1902 
1903 void
1904 ftl_process_anm_event(struct ftl_anm_event *event)
1905 {
1906 	struct spdk_ftl_dev *dev = event->dev;
1907 
1908 	if (!ftl_check_core_thread(dev)) {
1909 		spdk_thread_send_msg(ftl_get_core_thread(dev), _ftl_process_anm_event, event);
1910 		return;
1911 	}
1912 
1913 	SPDK_DEBUGLOG(SPDK_LOG_FTL_CORE, "Unconsumed ANM received for dev: %p...\n", event->dev);
1914 	ftl_anm_event_complete(event);
1915 }
1916 
1917 bool
1918 ftl_ppa_is_written(struct ftl_band *band, struct ftl_ppa ppa)
1919 {
1920 	struct ftl_chunk *chunk = ftl_band_chunk_from_ppa(band, ppa);
1921 
1922 	return ppa.lbk < chunk->write_offset;
1923 }
1924 
1925 static void
1926 ftl_process_retry_queue(struct spdk_ftl_dev *dev)
1927 {
1928 	struct ftl_io *io;
1929 	int rc;
1930 
1931 	while (!TAILQ_EMPTY(&dev->retry_queue)) {
1932 		io = TAILQ_FIRST(&dev->retry_queue);
1933 
1934 		/* Retry only if IO is still healthy */
1935 		if (spdk_likely(io->status == 0)) {
1936 			rc = ftl_submit_read(io);
1937 			if (rc == -ENOMEM) {
1938 				break;
1939 			}
1940 		}
1941 
1942 		io->flags &= ~FTL_IO_RETRY;
1943 		TAILQ_REMOVE(&dev->retry_queue, io, retry_entry);
1944 
1945 		if (ftl_io_done(io)) {
1946 			ftl_io_complete(io);
1947 		}
1948 	}
1949 }
1950 
1951 int
1952 ftl_task_read(void *ctx)
1953 {
1954 	struct ftl_thread *thread = ctx;
1955 	struct spdk_ftl_dev *dev = thread->dev;
1956 	struct spdk_nvme_qpair *qpair = ftl_get_read_qpair(dev);
1957 	size_t num_completed;
1958 
1959 	if (dev->halt) {
1960 		if (ftl_shutdown_complete(dev)) {
1961 			spdk_poller_unregister(&thread->poller);
1962 			return 0;
1963 		}
1964 	}
1965 
1966 	num_completed = spdk_nvme_qpair_process_completions(qpair, 0);
1967 
1968 	if (num_completed && !TAILQ_EMPTY(&dev->retry_queue)) {
1969 		ftl_process_retry_queue(dev);
1970 	}
1971 
1972 	return num_completed;
1973 }
1974 
1975 int
1976 ftl_task_core(void *ctx)
1977 {
1978 	struct ftl_thread *thread = ctx;
1979 	struct spdk_ftl_dev *dev = thread->dev;
1980 	struct spdk_nvme_qpair *qpair = ftl_get_write_qpair(dev);
1981 
1982 	if (dev->halt) {
1983 		if (ftl_shutdown_complete(dev)) {
1984 			spdk_poller_unregister(&thread->poller);
1985 			return 0;
1986 		}
1987 	}
1988 
1989 	ftl_process_writes(dev);
1990 	spdk_nvme_qpair_process_completions(qpair, 0);
1991 	ftl_process_relocs(dev);
1992 
1993 	return 0;
1994 }
1995 
1996 SPDK_LOG_REGISTER_COMPONENT("ftl_core", SPDK_LOG_FTL_CORE)
1997