/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2018 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/likely.h"
#include "spdk/stdinc.h"
#include "spdk/nvme.h"
#include "spdk/thread.h"
#include "spdk/bdev_module.h"
#include "spdk/string.h"
#include "spdk/ftl.h"
#include "spdk/crc32.h"

#include "ftl_core.h"
#include "ftl_band.h"
#include "ftl_io.h"
#include "ftl_debug.h"
#include "ftl_internal.h"
#include "mngt/ftl_mngt.h"

size_t
spdk_ftl_io_size(void)
{
	return sizeof(struct ftl_io);
}

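/*
 * Common bdev completion callback for user IO submitted by the FTL core.
 * Records statistics, propagates -EIO on failure and completes the parent
 * ftl_io once all of its outstanding requests have finished.
 */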
static void
ftl_io_cmpl_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct ftl_io *io = cb_arg;
	struct spdk_ftl_dev *dev = io->dev;

	ftl_stats_bdev_io_completed(dev, FTL_STATS_TYPE_USER, bdev_io);

	if (spdk_unlikely(!success)) {
		io->status = -EIO;
	}

	ftl_trace_completion(dev, io, FTL_TRACE_COMPLETION_DISK);

	ftl_io_dec_req(io);
	if (ftl_io_done(io)) {
		ftl_io_complete(io);
	}

	spdk_bdev_free_io(bdev_io);
}

static void
ftl_band_erase(struct ftl_band *band)
{
	assert(band->md->state == FTL_BAND_STATE_CLOSED ||
	       band->md->state == FTL_BAND_STATE_FREE);

	ftl_band_set_state(band, FTL_BAND_STATE_PREP);
}

static size_t
ftl_get_limit(const struct spdk_ftl_dev *dev, int type)
{
	assert(type < SPDK_FTL_LIMIT_MAX);
	return dev->conf.limits[type];
}

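/*
 * Checks whether the device has quiesced for shutdown. The subsystems are
 * halted in stages (NV cache, user writer, reloc, GC writer, bands, L2P);
 * each call requests the next halt and returns false until everything has
 * drained.
 */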
static bool
ftl_shutdown_complete(struct spdk_ftl_dev *dev)
{
	uint64_t i;

	if (dev->num_inflight) {
		return false;
	}

	if (!ftl_nv_cache_is_halted(&dev->nv_cache)) {
		ftl_nv_cache_halt(&dev->nv_cache);
		return false;
	}

	if (!ftl_writer_is_halted(&dev->writer_user)) {
		ftl_writer_halt(&dev->writer_user);
		return false;
	}

	if (!ftl_reloc_is_halted(dev->reloc)) {
		ftl_reloc_halt(dev->reloc);
		return false;
	}

	if (!ftl_writer_is_halted(&dev->writer_gc)) {
		ftl_writer_halt(&dev->writer_gc);
		return false;
	}

	if (!ftl_nv_cache_chunks_busy(&dev->nv_cache)) {
		return false;
	}

	for (i = 0; i < ftl_get_num_bands(dev); ++i) {
		if (dev->bands[i].queue_depth ||
		    dev->bands[i].md->state == FTL_BAND_STATE_CLOSING) {
			return false;
		}
	}

	if (!ftl_l2p_is_halted(dev)) {
		ftl_l2p_halt(dev);
		return false;
	}

	return true;
}

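/*
 * Re-evaluates the write limit level based on the number of free bands and
 * records how often each limit level has been reached.
 */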
void
ftl_apply_limits(struct spdk_ftl_dev *dev)
{
	size_t limit;
	struct ftl_stats *stats = &dev->stats;
	int i;

	/* Clear the existing limit */
	dev->limit = SPDK_FTL_LIMIT_MAX;

	for (i = SPDK_FTL_LIMIT_CRIT; i < SPDK_FTL_LIMIT_MAX; ++i) {
		limit = ftl_get_limit(dev, i);

		if (dev->num_free <= limit) {
			stats->limits[i]++;
			dev->limit = i;
			break;
		}
	}

	ftl_trace_limits(dev, dev->limit, dev->num_free);
}

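/*
 * Marks the physical address as no longer holding valid user data, updating
 * the global valid map and, for base device addresses, the owning band's
 * P2L map and valid block count.
 */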
void
ftl_invalidate_addr(struct spdk_ftl_dev *dev, ftl_addr addr)
{
	struct ftl_band *band;
	struct ftl_p2l_map *p2l_map;

	if (ftl_addr_in_nvc(dev, addr)) {
		ftl_bitmap_clear(dev->valid_map, addr);
		return;
	}

	band = ftl_band_from_addr(dev, addr);
	p2l_map = &band->p2l_map;

	/* The bit might already be cleared if two writes are scheduled to the
	 * same LBA at the same time */
	if (ftl_bitmap_get(dev->valid_map, addr)) {
		assert(p2l_map->num_valid > 0);
		ftl_bitmap_clear(dev->valid_map, addr);
		p2l_map->num_valid--;
	}

	/* Invalidate the open/full band's p2l_map entry to keep the P2L and L2P
	 * consistent when the band transitions to the closed state */
	if (FTL_BAND_STATE_OPEN == band->md->state || FTL_BAND_STATE_FULL == band->md->state) {
		p2l_map->band_map[ftl_band_block_offset_from_addr(band, addr)].lba = FTL_LBA_INVALID;
		p2l_map->band_map[ftl_band_block_offset_from_addr(band, addr)].seq_id = 0;
	}
}

static int
ftl_read_canceled(int rc)
{
	return rc == -EFAULT;
}

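/*
 * Resolves the physical address for the current LBA and extends the request
 * over as many following LBAs as remain physically contiguous on the same
 * device (base or NV cache). Returns the number of blocks that can be read
 * with a single request, or -EFAULT if the current LBA holds no valid data.
 */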
static int
ftl_get_next_read_addr(struct ftl_io *io, ftl_addr *addr)
{
	struct spdk_ftl_dev *dev = io->dev;
	ftl_addr next_addr;
	size_t i;
	bool addr_cached = false;

	*addr = ftl_l2p_get(dev, ftl_io_current_lba(io));
	io->map[io->pos] = *addr;

	/* If the address is invalid, skip it */
	if (*addr == FTL_ADDR_INVALID) {
		return -EFAULT;
	}

	addr_cached = ftl_addr_in_nvc(dev, *addr);

	for (i = 1; i < ftl_io_iovec_len_left(io); ++i) {
		next_addr = ftl_l2p_get(dev, ftl_io_get_lba(io, io->pos + i));

		if (next_addr == FTL_ADDR_INVALID) {
			break;
		}

		/* Checking for contiguity alone is not enough: if the user data spans
		 * the last block of the base device and the first block of the NV cache,
		 * the addresses are 'contiguous' but cannot be handled with one read request.
		 */
		if (addr_cached != ftl_addr_in_nvc(dev, next_addr)) {
			break;
		}

		if (*addr + i != next_addr) {
			break;
		}

		io->map[io->pos + i] = next_addr;
	}

	return i;
}

static void ftl_submit_read(struct ftl_io *io);

static void
_ftl_submit_read(void *_io)
{
	struct ftl_io *io = _io;

	ftl_submit_read(io);
}

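/*
 * Issues the read in physically contiguous chunks, zero-filling LBAs without
 * valid data. On -ENOMEM the request is parked on the bdev's io_wait queue
 * and resubmitted via _ftl_submit_read() once resources become available.
 */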
static void
ftl_submit_read(struct ftl_io *io)
{
	struct spdk_ftl_dev *dev = io->dev;
	ftl_addr addr;
	int rc = 0, num_blocks;

	while (io->pos < io->num_blocks) {
		num_blocks = ftl_get_next_read_addr(io, &addr);
		rc = num_blocks;

		/* The user LBA doesn't hold valid data (it was trimmed or never written),
		 * so fill the block with zeroes and skip it */
		if (ftl_read_canceled(rc)) {
			memset(ftl_io_iovec_addr(io), 0, FTL_BLOCK_SIZE);
			ftl_io_advance(io, 1);
			continue;
		}

		assert(num_blocks > 0);

		ftl_trace_submission(dev, io, addr, num_blocks);

		if (ftl_addr_in_nvc(dev, addr)) {
			rc = ftl_nv_cache_read(io, addr, num_blocks, ftl_io_cmpl_cb, io);
		} else {
			rc = spdk_bdev_read_blocks(dev->base_bdev_desc, dev->base_ioch,
						   ftl_io_iovec_addr(io),
						   addr, num_blocks, ftl_io_cmpl_cb, io);
		}

		if (spdk_unlikely(rc)) {
			if (rc == -ENOMEM) {
				struct spdk_bdev *bdev;
				struct spdk_io_channel *ch;

				if (ftl_addr_in_nvc(dev, addr)) {
					bdev = spdk_bdev_desc_get_bdev(dev->nv_cache.bdev_desc);
					ch = dev->nv_cache.cache_ioch;
				} else {
					bdev = spdk_bdev_desc_get_bdev(dev->base_bdev_desc);
					ch = dev->base_ioch;
				}
				io->bdev_io_wait.bdev = bdev;
				io->bdev_io_wait.cb_fn = _ftl_submit_read;
				io->bdev_io_wait.cb_arg = io;
				spdk_bdev_queue_io_wait(bdev, ch, &io->bdev_io_wait);
				return;
			} else {
				ftl_abort();
			}
		}

		ftl_io_inc_req(io);
		ftl_io_advance(io, num_blocks);
	}

	/* If we didn't have to read anything from the device,
	 * complete the request right away */
	if (ftl_io_done(io)) {
		ftl_io_complete(io);
	}
}

bool
ftl_needs_reloc(struct spdk_ftl_dev *dev)
{
	size_t limit = ftl_get_limit(dev, SPDK_FTL_LIMIT_START);

	if (dev->num_free <= limit) {
		return true;
	}

	return false;
}

void
spdk_ftl_dev_get_attrs(const struct spdk_ftl_dev *dev, struct spdk_ftl_attrs *attrs,
		       size_t attrs_size)
{
	attrs->num_blocks = dev->num_lbas;
	attrs->block_size = FTL_BLOCK_SIZE;
	attrs->optimum_io_size = dev->xfer_size;
	/* NOTE: check any new fields in attrs against attrs_size */
}

static void
ftl_io_pin_cb(struct spdk_ftl_dev *dev, int status, struct ftl_l2p_pin_ctx *pin_ctx)
{
	struct ftl_io *io = pin_ctx->cb_ctx;

	if (spdk_unlikely(status != 0)) {
		/* Retry on an internal L2P fault */
		io->status = -EAGAIN;
		ftl_io_complete(io);
		return;
	}

	io->flags |= FTL_IO_PINNED;
	ftl_submit_read(io);
}

static void
ftl_io_pin(struct ftl_io *io)
{
	if (spdk_unlikely(io->flags & FTL_IO_PINNED)) {
		/*
		 * The IO is in a retry path and has already been pinned.
		 * Continue with further processing.
		 */
		ftl_l2p_pin_skip(io->dev, ftl_io_pin_cb, io, &io->l2p_pin_ctx);
	} else {
		/* First time pinning the IO */
		ftl_l2p_pin(io->dev, io->lba, io->num_blocks,
			    ftl_io_pin_cb, io, &io->l2p_pin_ctx);
	}
}

static void
start_io(struct ftl_io *io)
{
	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(io->ioch);
	struct spdk_ftl_dev *dev = io->dev;

	io->map = ftl_mempool_get(ioch->map_pool);
	if (spdk_unlikely(!io->map)) {
		io->status = -ENOMEM;
		ftl_io_complete(io);
		return;
	}

	switch (io->type) {
	case FTL_IO_READ:
		TAILQ_INSERT_TAIL(&dev->rd_sq, io, queue_entry);
		break;
	case FTL_IO_WRITE:
		TAILQ_INSERT_TAIL(&dev->wr_sq, io, queue_entry);
		break;
	case FTL_IO_TRIM:
		TAILQ_INSERT_TAIL(&dev->trim_sq, io, queue_entry);
		break;
	default:
		io->status = -EOPNOTSUPP;
		ftl_io_complete(io);
	}
}

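/*
 * Hands the IO over from the submitting thread to the core thread by placing
 * it on the channel's submission ring. Returns -EAGAIN if the ring is full.
 */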
static int
queue_io(struct spdk_ftl_dev *dev, struct ftl_io *io)
{
	size_t result;
	struct ftl_io_channel *ioch = ftl_io_channel_get_ctx(io->ioch);

	result = spdk_ring_enqueue(ioch->sq, (void **)&io, 1, NULL);
	if (spdk_unlikely(0 == result)) {
		return -EAGAIN;
	}

	return 0;
}

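/*
 * Minimal usage sketch for the public read/write entry points (illustrative
 * only; the buffer setup, error handling and the write_done callback name are
 * assumptions, not part of this file):
 *
 *	static void write_done(void *cb_arg, int status) { ... }
 *
 *	struct ftl_io *io = malloc(spdk_ftl_io_size());
 *	struct spdk_io_channel *ch = spdk_ftl_get_io_channel(dev);
 *	struct iovec iov = { .iov_base = buf, .iov_len = lba_cnt * FTL_BLOCK_SIZE };
 *
 *	rc = spdk_ftl_writev(dev, io, ch, lba, lba_cnt, &iov, 1, write_done, NULL);
 *
 * The total iovec length must match lba_cnt blocks and the device must be
 * initialized, otherwise the call fails up front.
 */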
int
spdk_ftl_writev(struct spdk_ftl_dev *dev, struct ftl_io *io, struct spdk_io_channel *ch,
		uint64_t lba, uint64_t lba_cnt, struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn,
		void *cb_arg)
{
	int rc;

	if (iov_cnt == 0) {
		return -EINVAL;
	}

	if (lba_cnt == 0) {
		return -EINVAL;
	}

	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
		FTL_ERRLOG(dev, "Invalid IO vector to handle, device %s, LBA %"PRIu64"\n",
			   dev->conf.name, lba);
		return -EINVAL;
	}

	if (!dev->initialized) {
		return -EBUSY;
	}

	rc = ftl_io_init(ch, io, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_WRITE);
	if (rc) {
		return rc;
	}

	return queue_io(dev, io);
}

int
spdk_ftl_readv(struct spdk_ftl_dev *dev, struct ftl_io *io, struct spdk_io_channel *ch,
	       uint64_t lba, uint64_t lba_cnt, struct iovec *iov, size_t iov_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
{
	int rc;

	if (iov_cnt == 0) {
		return -EINVAL;
	}

	if (lba_cnt == 0) {
		return -EINVAL;
	}

	if (lba_cnt != ftl_iovec_num_blocks(iov, iov_cnt)) {
		FTL_ERRLOG(dev, "Invalid IO vector to handle, device %s, LBA %"PRIu64"\n",
			   dev->conf.name, lba);
		return -EINVAL;
	}

	if (!dev->initialized) {
		return -EBUSY;
	}

	rc = ftl_io_init(ch, io, lba, lba_cnt, iov, iov_cnt, cb_fn, cb_arg, FTL_IO_READ);
	if (rc) {
		return rc;
	}

	return queue_io(dev, io);
}

int
ftl_trim(struct spdk_ftl_dev *dev, struct ftl_io *io, struct spdk_io_channel *ch,
	 uint64_t lba, uint64_t lba_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
{
	int rc;

	rc = ftl_io_init(ch, io, lba, lba_cnt, NULL, 0, cb_fn, cb_arg, FTL_IO_TRIM);
	if (rc) {
		return rc;
	}

	return queue_io(dev, io);
}

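/*
 * Validates the unmap range and dispatches it either as a data-path trim IO
 * (when an ftl_io is provided) or through the management path. Requests not
 * aligned to the L2P page size are completed as no-ops on the data path and
 * rejected on the management path.
 */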
int
spdk_ftl_unmap(struct spdk_ftl_dev *dev, struct ftl_io *io, struct spdk_io_channel *ch,
	       uint64_t lba, uint64_t lba_cnt, spdk_ftl_fn cb_fn, void *cb_arg)
{
	int rc;
	uint64_t alignment = dev->layout.l2p.lbas_in_page;

	if (lba_cnt == 0) {
		return -EINVAL;
	}

	if (lba + lba_cnt < lba_cnt) {
		return -EINVAL;
	}

	if (lba + lba_cnt > dev->num_lbas) {
		return -EINVAL;
	}

	if (!dev->initialized) {
		return -EBUSY;
	}

	if (lba % alignment || lba_cnt % alignment) {
		if (!io) {
			/* This is the management/RPC path; its parameters must be aligned to 1MiB. */
			return -EINVAL;
		}

		/* Otherwise unaligned IO requests are NOPs */
		rc = ftl_io_init(ch, io, lba, lba_cnt, NULL, 0, cb_fn, cb_arg, FTL_IO_TRIM);
		if (rc) {
			return rc;
		}

		io->status = 0;
		ftl_io_complete(io);
		return 0;
	}

	if (io) {
		rc = ftl_trim(dev, io, ch, lba, lba_cnt, cb_fn, cb_arg);
	} else {
		rc = ftl_mngt_trim(dev, lba, lba_cnt, cb_fn, cb_arg);
	}

	return rc;
}

#define FTL_IO_QUEUE_BATCH 16
int
ftl_io_channel_poll(void *arg)
{
	struct ftl_io_channel *ch = arg;
	void *ios[FTL_IO_QUEUE_BATCH];
	uint64_t i, count;

	count = spdk_ring_dequeue(ch->cq, ios, FTL_IO_QUEUE_BATCH);
	if (count == 0) {
		return SPDK_POLLER_IDLE;
	}

	for (i = 0; i < count; i++) {
		struct ftl_io *io = ios[i];
		io->user_fn(io->cb_ctx, io->status);
	}

	return SPDK_POLLER_BUSY;
}

static void
ftl_process_io_channel(struct spdk_ftl_dev *dev, struct ftl_io_channel *ioch)
{
	void *ios[FTL_IO_QUEUE_BATCH];
	size_t count, i;

	count = spdk_ring_dequeue(ioch->sq, ios, FTL_IO_QUEUE_BATCH);
	if (count == 0) {
		return;
	}

	for (i = 0; i < count; i++) {
		struct ftl_io *io = ios[i];
		start_io(io);
	}
}

static void
ftl_trim_log_clear(struct spdk_ftl_dev *dev)
{
	struct ftl_trim_log *log = ftl_md_get_buffer(dev->layout.md[FTL_LAYOUT_REGION_TYPE_TRIM_LOG]);

	memset(&log->hdr, 0, sizeof(log->hdr));
}

static void
ftl_trim_finish(struct ftl_io *io, int status)
{
	io->dev->trim_qd--;

	if (spdk_unlikely(status)) {
#ifdef SPDK_FTL_RETRY_ON_ERROR
		ftl_io_clear(io);
		TAILQ_INSERT_HEAD(&io->dev->trim_sq, io, queue_entry);
		return;
#else
		io->status = status;
#endif
	}

	ftl_io_complete(io);
}

static void
ftl_trim_log_close_cb(int status, void *cb_arg)
{
	struct ftl_io *io = cb_arg;

	ftl_trim_finish(io, status);
}

static void
ftl_trim_log_persist(struct ftl_io *io, ftl_md_io_entry_cb cb)
{
	struct spdk_ftl_dev *dev = io->dev;
	struct ftl_md *trim_log = dev->layout.md[FTL_LAYOUT_REGION_TYPE_TRIM_LOG];

	ftl_md_persist_entries(trim_log, 0, 1, ftl_md_get_buffer(trim_log), NULL,
			       cb, io, &dev->trim_md_io_entry_ctx);
}

static void
ftl_trim_md_cb(int status, void *cb_arg)
{
	struct ftl_io *io = cb_arg;
	struct spdk_ftl_dev *dev = io->dev;

	if (status) {
		io->status = status;
	}
	ftl_trim_log_clear(dev);
	ftl_trim_log_persist(io, ftl_trim_log_close_cb);
}

static void
ftl_trim_log_open_cb(int status, void *cb_arg)
{
	struct ftl_io *io = cb_arg;
	struct spdk_ftl_dev *dev = io->dev;
	struct ftl_md *trim_md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_TRIM_MD];
	uint64_t first, entries;
	const uint64_t entries_in_block = FTL_BLOCK_SIZE / sizeof(uint64_t);
	char *buffer;

	if (status) {
		ftl_trim_finish(io, status);
		return;
	}

	/* Map trim space into L2P pages */
	first = io->lba / dev->layout.l2p.lbas_in_page;
	entries = io->num_blocks / dev->layout.l2p.lbas_in_page;
	/* Map pages into trim metadata location */
	first = first / entries_in_block;
	entries = spdk_divide_round_up(entries, entries_in_block);

	/* Get trim metadata buffer */
	buffer = (char *)ftl_md_get_buffer(trim_md) + (FTL_BLOCK_SIZE * first);

	/* Persist the trim metadata snippet which corresponds to the trim IO */
	ftl_md_persist_entries(trim_md, first, entries, buffer, NULL,
			       ftl_trim_md_cb, io, &dev->trim_md_io_entry_ctx);
}

void
ftl_set_trim_map(struct spdk_ftl_dev *dev, uint64_t lba, uint64_t num_blocks, uint64_t seq_id)
{
	uint64_t first_page, num_pages;
	uint64_t lbas_in_page = dev->layout.l2p.lbas_in_page;
	struct ftl_md *md = dev->layout.md[FTL_LAYOUT_REGION_TYPE_TRIM_MD];
	uint64_t *page = ftl_md_get_buffer(md);
	struct ftl_trim_log *log;
	size_t i;

	first_page = lba / lbas_in_page;
	num_pages = num_blocks / lbas_in_page;

	/* Fill trim metadata */
	for (i = first_page; i < first_page + num_pages; ++i) {
		ftl_bitmap_set(dev->trim_map, i);
		page[i] = seq_id;
	}

	/* Fill trim log */
	log = ftl_md_get_buffer(dev->layout.md[FTL_LAYOUT_REGION_TYPE_TRIM_LOG]);
	log->hdr.trim.seq_id = seq_id;
	log->hdr.trim.num_blocks = num_blocks;
	log->hdr.trim.start_lba = lba;
}

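/*
 * Starts processing of a queued trim IO. Returns false when no sequence id
 * can be acquired from the NV cache yet (e.g. no open chunk), in which case
 * the caller requeues the IO and retries later.
 */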
static bool
ftl_process_trim(struct ftl_io *io)
{
	struct spdk_ftl_dev *dev = io->dev;
	uint64_t seq_id;

	seq_id = ftl_nv_cache_acquire_trim_seq_id(&dev->nv_cache);
	if (seq_id == 0) {
		return false;
	}

	dev->trim_in_progress = true;
	dev->trim_qd++;

	dev->sb_shm->trim.start_lba = io->lba;
	dev->sb_shm->trim.num_blocks = io->num_blocks;
	dev->sb_shm->trim.seq_id = seq_id;
	dev->sb_shm->trim.in_progress = true;
	ftl_set_trim_map(dev, io->lba, io->num_blocks, seq_id);
	ftl_debug_inject_trim_error();
	dev->sb_shm->trim.in_progress = false;

	ftl_trim_log_persist(io, ftl_trim_log_open_cb);
	return true;
}

static void
ftl_process_io_queue(struct spdk_ftl_dev *dev)
{
	struct ftl_io_channel *ioch;
	struct ftl_io *io;

	/* TODO: Try to figure out a mechanism to batch more requests at the same time,
	 * while keeping enough resources (pinned pages) between reads, writes and gc/compaction
	 */
	if (!TAILQ_EMPTY(&dev->rd_sq)) {
		io = TAILQ_FIRST(&dev->rd_sq);
		TAILQ_REMOVE(&dev->rd_sq, io, queue_entry);
		assert(io->type == FTL_IO_READ);
		ftl_io_pin(io);
		ftl_add_io_activity(dev);
	}

	while (!TAILQ_EMPTY(&dev->wr_sq) && !ftl_nv_cache_throttle(dev)) {
		io = TAILQ_FIRST(&dev->wr_sq);
		TAILQ_REMOVE(&dev->wr_sq, io, queue_entry);
		assert(io->type == FTL_IO_WRITE);
		if (!ftl_nv_cache_write(io)) {
			TAILQ_INSERT_HEAD(&dev->wr_sq, io, queue_entry);
			break;
		}
		ftl_add_io_activity(dev);
	}

	if (!TAILQ_EMPTY(&dev->trim_sq) && dev->trim_qd == 0) {
		io = TAILQ_FIRST(&dev->trim_sq);
		TAILQ_REMOVE(&dev->trim_sq, io, queue_entry);
		assert(io->type == FTL_IO_TRIM);

		/*
		 * A trim operation requires generating a sequence id for itself, which it gets based on the open chunk
		 * in the nv cache. If there are no open chunks (because we're in the middle of a state transition or
		 * compaction lagged behind), then we need to wait for the nv cache to resolve the situation - it's fine
		 * to just requeue the trim and try again later.
		 */
		if (!ftl_process_trim(io)) {
			TAILQ_INSERT_HEAD(&dev->trim_sq, io, queue_entry);
		} else {
			ftl_add_io_activity(dev);
		}
	}

	TAILQ_FOREACH(ioch, &dev->ioch_queue, entry) {
		ftl_process_io_channel(dev, ioch);
	}
}

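/*
 * Main poller of the FTL core thread: drives the submission queues, both
 * writers, relocation, NV cache processing and the L2P. Unregisters itself
 * once the device is halted and shutdown has completed.
 */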
int
ftl_core_poller(void *ctx)
{
	struct spdk_ftl_dev *dev = ctx;
	uint64_t io_activity_total_old = dev->stats.io_activity_total;

	if (dev->halt && ftl_shutdown_complete(dev)) {
		spdk_poller_unregister(&dev->core_poller);
		return SPDK_POLLER_IDLE;
	}

	ftl_process_io_queue(dev);
	ftl_writer_run(&dev->writer_user);
	ftl_writer_run(&dev->writer_gc);
	ftl_reloc(dev->reloc);
	ftl_nv_cache_process(dev);
	ftl_l2p_process(dev);

	if (io_activity_total_old != dev->stats.io_activity_total) {
		return SPDK_POLLER_BUSY;
	}

	return SPDK_POLLER_IDLE;
}

struct ftl_band *
ftl_band_get_next_free(struct spdk_ftl_dev *dev)
{
	struct ftl_band *band = NULL;

	if (!TAILQ_EMPTY(&dev->free_bands)) {
		band = TAILQ_FIRST(&dev->free_bands);
		TAILQ_REMOVE(&dev->free_bands, band, queue_entry);
		ftl_band_erase(band);
	}

	return band;
}

void *g_ftl_write_buf;
void *g_ftl_read_buf;

int
spdk_ftl_init(void)
{
	g_ftl_write_buf = spdk_zmalloc(FTL_ZERO_BUFFER_SIZE, FTL_ZERO_BUFFER_SIZE, NULL,
				       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_ftl_write_buf) {
		return -ENOMEM;
	}

	g_ftl_read_buf = spdk_zmalloc(FTL_ZERO_BUFFER_SIZE, FTL_ZERO_BUFFER_SIZE, NULL,
				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (!g_ftl_read_buf) {
		spdk_free(g_ftl_write_buf);
		g_ftl_write_buf = NULL;
		return -ENOMEM;
	}
	return 0;
}

void
spdk_ftl_fini(void)
{
	spdk_free(g_ftl_write_buf);
	spdk_free(g_ftl_read_buf);
}

void
spdk_ftl_dev_set_fast_shutdown(struct spdk_ftl_dev *dev, bool fast_shutdown)
{
	assert(dev);
	dev->conf.fast_shutdown = fast_shutdown;
}

void
ftl_stats_bdev_io_completed(struct spdk_ftl_dev *dev, enum ftl_stats_type type,
			    struct spdk_bdev_io *bdev_io)
{
	struct ftl_stats_entry *stats_entry = &dev->stats.entries[type];
	struct ftl_stats_group *stats_group;
	uint32_t cdw0;
	int sct;
	int sc;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		stats_group = &stats_entry->read;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		stats_group = &stats_entry->write;
		break;
	default:
		return;
	}

	spdk_bdev_io_get_nvme_status(bdev_io, &cdw0, &sct, &sc);

	if (sct == SPDK_NVME_SCT_GENERIC && sc == SPDK_NVME_SC_SUCCESS) {
		stats_group->ios++;
		stats_group->blocks += bdev_io->u.bdev.num_blocks;
	} else if (sct == SPDK_NVME_SCT_MEDIA_ERROR) {
		stats_group->errors.media++;
	} else {
		stats_group->errors.other++;
	}
}

struct spdk_io_channel *
spdk_ftl_get_io_channel(struct spdk_ftl_dev *dev)
{
	return spdk_get_io_channel(dev);
}

void
ftl_stats_crc_error(struct spdk_ftl_dev *dev, enum ftl_stats_type type)
{
	struct ftl_stats_entry *stats_entry = &dev->stats.entries[type];
	struct ftl_stats_group *stats_group = &stats_entry->read;

	stats_group->errors.crc++;
}

struct ftl_get_stats_ctx {
	struct spdk_ftl_dev *dev;
	struct ftl_stats *stats;
	struct spdk_thread *thread;
	spdk_ftl_stats_fn cb_fn;
	void *cb_arg;
};

static void
_ftl_get_stats_cb(void *_ctx)
{
	struct ftl_get_stats_ctx *stats_ctx = _ctx;

	stats_ctx->cb_fn(stats_ctx->stats, stats_ctx->cb_arg);
	free(stats_ctx);
}

static void
_ftl_get_stats(void *_ctx)
{
	struct ftl_get_stats_ctx *stats_ctx = _ctx;

	*stats_ctx->stats = stats_ctx->dev->stats;

	if (spdk_thread_send_msg(stats_ctx->thread, _ftl_get_stats_cb, stats_ctx)) {
		ftl_abort();
	}
}

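/*
 * Copies the device statistics on the core thread (where they are updated)
 * and then invokes the user callback back on the calling thread.
 */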
int
spdk_ftl_get_stats(struct spdk_ftl_dev *dev, struct ftl_stats *stats, spdk_ftl_stats_fn cb_fn,
		   void *cb_arg)
{
	struct ftl_get_stats_ctx *stats_ctx;
	int rc;

	stats_ctx = calloc(1, sizeof(struct ftl_get_stats_ctx));
	if (!stats_ctx) {
		return -ENOMEM;
	}

	stats_ctx->dev = dev;
	stats_ctx->stats = stats;
	stats_ctx->cb_fn = cb_fn;
	stats_ctx->cb_arg = cb_arg;
	stats_ctx->thread = spdk_get_thread();

	rc = spdk_thread_send_msg(dev->core_thread, _ftl_get_stats, stats_ctx);
	if (rc) {
		goto stats_allocated;
	}

	return 0;

stats_allocated:
	free(stats_ctx);
	return rc;
}

SPDK_LOG_REGISTER_COMPONENT(ftl_core)