xref: /spdk/module/bdev/ocf/vbdev_ocf.c (revision f6866117acb32c78d5ea7bd76ba330284655af35)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <ocf/ocf.h>
7 #include <ocf/ocf_types.h>
8 #include <ocf/ocf_mngt.h>
9 
10 #include "ctx.h"
11 #include "data.h"
12 #include "volume.h"
13 #include "utils.h"
14 #include "vbdev_ocf.h"
15 
16 #include "spdk/bdev_module.h"
17 #include "spdk/thread.h"
18 #include "spdk/string.h"
19 #include "spdk/log.h"
20 #include "spdk/cpuset.h"
21 
/* Bdev module handle for OCF (tentative definition; the module's fields
 * are presumably filled in later in this file — not visible in this chunk). */
static struct spdk_bdev_module ocf_if;

/* Global list of all OCF vbdevs known to this module. */
static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);

/* Global list of base bdevs claimed by examine but not yet consumed;
 * see examine_start()/examine_done(). */
static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);

/* Set once module finish has begun; makes stop_vbdev() stop shared cache
 * instances unconditionally. */
bool g_fini_started = false;

/* Structure for keeping list of bdevs that are claimed but not used yet */
struct examining_bdev {
	struct spdk_bdev           *bdev;
	TAILQ_ENTRY(examining_bdev) tailq;
};
37 
/* Add bdev to list of claimed
 * One entry is queued per examine claim; examine_done() later removes the
 * matching entry and reports examine completion when it was the last one. */
static void
examine_start(struct spdk_bdev *bdev)
{
	struct examining_bdev *entry = malloc(sizeof(*entry));

	/* NOTE(review): with NDEBUG this assert compiles out and a failed
	 * malloc() would be dereferenced below; allocation failure is
	 * effectively treated as fatal here — confirm this is acceptable. */
	assert(entry);
	entry->bdev = bdev;
	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
}
48 
/* Find bdev on list of claimed bdevs, then remove it,
 * if it was the last one on list then report examine done.
 * status and vbdev are unused; the signature presumably matches a
 * management-callback type — confirm against vbdev_ocf.h. */
static void
examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
{
	struct spdk_bdev *bdev = cb_arg;
	struct examining_bdev *entry, *safe, *found = NULL;

	/* Remember the first entry matching this bdev in 'found'.  If a
	 * second match exists, other claims are still outstanding, so jump
	 * past the examine_done notification and just drop one entry. */
	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
		if (entry->bdev == bdev) {
			if (found) {
				goto remove;
			} else {
				found = entry;
			}
		}
	}

	/* Exactly one claim remained - this examine cycle is complete. */
	assert(found);
	spdk_bdev_module_examine_done(&ocf_if);

remove:
	/* On both paths exactly one entry ('found') is removed and freed. */
	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
	free(found);
}
74 
75 /* Free allocated strings and structure itself
76  * Used at shutdown only */
77 static void
78 free_vbdev(struct vbdev_ocf *vbdev)
79 {
80 	if (!vbdev) {
81 		return;
82 	}
83 
84 	free(vbdev->name);
85 	free(vbdev->cache.name);
86 	free(vbdev->core.name);
87 	free(vbdev);
88 }
89 
90 /* Get existing cache base
91  * that is attached to other vbdev */
92 static struct vbdev_ocf_base *
93 get_other_cache_base(struct vbdev_ocf_base *base)
94 {
95 	struct vbdev_ocf *vbdev;
96 
97 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
98 		if (&vbdev->cache == base || !vbdev->cache.attached) {
99 			continue;
100 		}
101 		if (!strcmp(vbdev->cache.name, base->name)) {
102 			return &vbdev->cache;
103 		}
104 	}
105 
106 	return NULL;
107 }
108 
109 static bool
110 is_ocf_cache_running(struct vbdev_ocf *vbdev)
111 {
112 	if (vbdev->cache.attached && vbdev->ocf_cache) {
113 		return ocf_cache_is_running(vbdev->ocf_cache);
114 	}
115 	return false;
116 }
117 
118 /* Get existing OCF cache instance
119  * that is started by other vbdev */
120 static ocf_cache_t
121 get_other_cache_instance(struct vbdev_ocf *vbdev)
122 {
123 	struct vbdev_ocf *cmp;
124 
125 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
126 		if (cmp->state.doing_finish || cmp == vbdev) {
127 			continue;
128 		}
129 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
130 			continue;
131 		}
132 		if (is_ocf_cache_running(cmp)) {
133 			return cmp->ocf_cache;
134 		}
135 	}
136 
137 	return NULL;
138 }
139 
/* Thread-message trampoline: close a bdev descriptor on its owning thread. */
static void
_remove_base_bdev(void *ctx)
{
	spdk_bdev_close((struct spdk_bdev_desc *)ctx);
}
147 
148 /* Close and unclaim base bdev */
149 static void
150 remove_base_bdev(struct vbdev_ocf_base *base)
151 {
152 	if (base->attached) {
153 		if (base->management_channel) {
154 			spdk_put_io_channel(base->management_channel);
155 		}
156 
157 		spdk_bdev_module_release_bdev(base->bdev);
158 		/* Close the underlying bdev on its same opened thread. */
159 		if (base->thread && base->thread != spdk_get_thread()) {
160 			spdk_thread_send_msg(base->thread, _remove_base_bdev, base->desc);
161 		} else {
162 			spdk_bdev_close(base->desc);
163 		}
164 		base->attached = false;
165 	}
166 }
167 
168 /* Finish unregister operation */
169 static void
170 unregister_finish(struct vbdev_ocf *vbdev)
171 {
172 	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);
173 
174 	if (vbdev->ocf_cache) {
175 		ocf_mngt_cache_put(vbdev->ocf_cache);
176 	}
177 
178 	if (vbdev->cache_ctx) {
179 		vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
180 	}
181 	vbdev_ocf_mngt_continue(vbdev, 0);
182 }
183 
/* Pipeline step: close and unclaim the core base bdev, then advance the
 * management pipeline. */
static void
close_core_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->core);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
190 
191 static void
192 remove_core_cmpl(void *priv, int error)
193 {
194 	struct vbdev_ocf *vbdev = priv;
195 
196 	ocf_mngt_cache_unlock(vbdev->ocf_cache);
197 	vbdev_ocf_mngt_continue(vbdev, error);
198 }
199 
200 /* Try to lock cache, then remove core */
201 static void
202 remove_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
203 {
204 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
205 
206 	if (error) {
207 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
208 			    error, vbdev->name);
209 		vbdev_ocf_mngt_continue(vbdev, error);
210 		return;
211 	}
212 
213 	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
214 }
215 
216 /* Detach core base */
217 static void
218 detach_core(struct vbdev_ocf *vbdev)
219 {
220 	if (is_ocf_cache_running(vbdev)) {
221 		ocf_mngt_cache_lock(vbdev->ocf_cache, remove_core_cache_lock_cmpl, vbdev);
222 	} else {
223 		vbdev_ocf_mngt_continue(vbdev, 0);
224 	}
225 }
226 
/* Pipeline step: close and unclaim the cache base bdev, then advance the
 * management pipeline. */
static void
close_cache_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->cache);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
233 
234 /* Detach cache base */
235 static void
236 detach_cache(struct vbdev_ocf *vbdev)
237 {
238 	vbdev->state.stop_status = vbdev->mngt_ctx.status;
239 
240 	/* If some other vbdev references this cache bdev,
241 	 * we detach this only by changing the flag, without actual close */
242 	if (get_other_cache_base(&vbdev->cache)) {
243 		vbdev->cache.attached = false;
244 	}
245 
246 	vbdev_ocf_mngt_continue(vbdev, 0);
247 }
248 
/* Completion of ocf_mngt_cache_stop().  The management queue reference is
 * dropped before the cache lock is released; then the pipeline advances. */
static void
stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
	ocf_mngt_cache_unlock(cache);

	vbdev_ocf_mngt_continue(vbdev, error);
}
259 
260 /* Try to lock cache, then stop it */
261 static void
262 stop_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
263 {
264 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
265 
266 	if (error) {
267 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
268 			    error, vbdev->name);
269 		vbdev_ocf_mngt_continue(vbdev, error);
270 		return;
271 	}
272 
273 	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
274 }
275 
276 /* Stop OCF cache object
277  * vbdev_ocf is not operational after this */
278 static void
279 stop_vbdev(struct vbdev_ocf *vbdev)
280 {
281 	if (!is_ocf_cache_running(vbdev)) {
282 		vbdev_ocf_mngt_continue(vbdev, 0);
283 		return;
284 	}
285 
286 	if (!g_fini_started && get_other_cache_instance(vbdev)) {
287 		SPDK_NOTICELOG("Not stopping cache instance '%s'"
288 			       " because it is referenced by other OCF bdev\n",
289 			       vbdev->cache.name);
290 		vbdev_ocf_mngt_continue(vbdev, 0);
291 		return;
292 	}
293 
294 	ocf_mngt_cache_lock(vbdev->ocf_cache, stop_vbdev_cache_lock_cmpl, vbdev);
295 }
296 
297 static void
298 flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
299 {
300 	struct vbdev_ocf *vbdev = priv;
301 
302 	ocf_mngt_cache_unlock(cache);
303 	vbdev_ocf_mngt_continue(vbdev, error);
304 }
305 
306 static void
307 flush_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
308 {
309 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
310 
311 	if (error) {
312 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
313 			    error, vbdev->name);
314 		vbdev_ocf_mngt_continue(vbdev, error);
315 		return;
316 	}
317 
318 	ocf_mngt_cache_flush(vbdev->ocf_cache, flush_vbdev_cmpl, vbdev);
319 }
320 
321 static void
322 flush_vbdev(struct vbdev_ocf *vbdev)
323 {
324 	if (!is_ocf_cache_running(vbdev)) {
325 		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
326 		return;
327 	}
328 
329 	ocf_mngt_cache_lock(vbdev->ocf_cache, flush_vbdev_cache_lock_cmpl, vbdev);
330 }
331 
/* Procedures called during dirty unregister.
 * Steps run in order; each calls vbdev_ocf_mngt_continue() to advance,
 * NULL terminates the list.  Cache is stopped before the core is removed
 * — per the destruct comments below, this order keeps the instance
 * recoverable in the future. */
vbdev_ocf_mngt_fn unregister_path_dirty[] = {
	flush_vbdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	detach_core,
	close_core_bdev,
	unregister_finish,
	NULL
};

/* Procedures called during clean unregister.
 * Core is removed from the running cache first, which — per the destruct
 * comments below — removes the instance permanently. */
vbdev_ocf_mngt_fn unregister_path_clean[] = {
	flush_vbdev,
	detach_core,
	close_core_bdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	unregister_finish,
	NULL
};
355 
356 /* Start asynchronous management operation using unregister_path */
357 static void
358 unregister_cb(void *opaque)
359 {
360 	struct vbdev_ocf *vbdev = opaque;
361 	vbdev_ocf_mngt_fn *unregister_path;
362 	int rc;
363 
364 	unregister_path = vbdev->state.doing_clean_delete ?
365 			  unregister_path_clean : unregister_path_dirty;
366 
367 	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
368 	if (rc) {
369 		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
370 		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
371 	}
372 }
373 
374 /* Clean remove case - remove core and then cache, this order
375  * will remove instance permanently */
376 static void
377 _vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
378 {
379 	if (vbdev->core.attached) {
380 		detach_core(vbdev);
381 		close_core_bdev(vbdev);
382 	}
383 
384 	if (vbdev->cache.attached) {
385 		detach_cache(vbdev);
386 		close_cache_bdev(vbdev);
387 	}
388 }
389 
390 /* Dirty shutdown/hot remove case - remove cache and then core, this order
391  * will allow us to recover this instance in the future */
392 static void
393 _vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
394 {
395 	if (vbdev->cache.attached) {
396 		detach_cache(vbdev);
397 		close_cache_bdev(vbdev);
398 	}
399 
400 	if (vbdev->core.attached) {
401 		detach_core(vbdev);
402 		close_core_bdev(vbdev);
403 	}
404 }
405 
406 /* Unregister io device with callback to unregister_cb
407  * This function is called during spdk_bdev_unregister */
408 static int
409 vbdev_ocf_destruct(void *opaque)
410 {
411 	struct vbdev_ocf *vbdev = opaque;
412 
413 	if (vbdev->state.doing_finish) {
414 		return -EALREADY;
415 	}
416 
417 	if (vbdev->state.starting && !vbdev->state.started) {
418 		/* Prevent before detach cache/core during register path of
419 		  this bdev */
420 		return -EBUSY;
421 	}
422 
423 	vbdev->state.doing_finish = true;
424 
425 	if (vbdev->state.started) {
426 		spdk_io_device_unregister(vbdev, unregister_cb);
427 		/* Return 1 because unregister is delayed */
428 		return 1;
429 	}
430 
431 	if (vbdev->state.doing_clean_delete) {
432 		_vbdev_ocf_destruct_clean(vbdev);
433 	} else {
434 		_vbdev_ocf_destruct_dirty(vbdev);
435 	}
436 
437 	return 0;
438 }
439 
440 /* Stop OCF cache and unregister SPDK bdev */
441 int
442 vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
443 {
444 	int rc = 0;
445 
446 	if (vbdev->state.started) {
447 		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
448 	} else {
449 		rc = vbdev_ocf_destruct(vbdev);
450 		if (rc == 0 && cb) {
451 			cb(cb_arg, 0);
452 		}
453 	}
454 
455 	return rc;
456 }
457 
458 /* Remove cores permanently and then stop OCF cache and unregister SPDK bdev */
459 int
460 vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
461 		       void *cb_arg)
462 {
463 	vbdev->state.doing_clean_delete = true;
464 
465 	return vbdev_ocf_delete(vbdev, cb, cb_arg);
466 }
467 
468 
469 /* If vbdev is online, return its object */
470 struct vbdev_ocf *
471 vbdev_ocf_get_by_name(const char *name)
472 {
473 	struct vbdev_ocf *vbdev;
474 
475 	if (name == NULL) {
476 		assert(false);
477 		return NULL;
478 	}
479 
480 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
481 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
482 			continue;
483 		}
484 		if (strcmp(vbdev->name, name) == 0) {
485 			return vbdev;
486 		}
487 	}
488 	return NULL;
489 }
490 
491 /* Return matching base if parent vbdev is online */
492 struct vbdev_ocf_base *
493 vbdev_ocf_get_base_by_name(const char *name)
494 {
495 	struct vbdev_ocf *vbdev;
496 
497 	if (name == NULL) {
498 		assert(false);
499 		return NULL;
500 	}
501 
502 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
503 		if (vbdev->state.doing_finish) {
504 			continue;
505 		}
506 
507 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
508 			return &vbdev->cache;
509 		}
510 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
511 			return &vbdev->core;
512 		}
513 	}
514 	return NULL;
515 }
516 
517 /* Execute fn for each OCF device that is online or waits for base devices */
518 void
519 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
520 {
521 	struct vbdev_ocf *vbdev;
522 
523 	assert(fn != NULL);
524 
525 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
526 		if (!vbdev->state.doing_finish) {
527 			fn(vbdev, ctx);
528 		}
529 	}
530 }
531 
532 /* Called from OCF when SPDK_IO is completed */
533 static void
534 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
535 {
536 	struct spdk_bdev_io *bdev_io = io->priv1;
537 
538 	if (error == 0) {
539 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
540 	} else if (error == -OCF_ERR_NO_MEM) {
541 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
542 	} else {
543 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
544 	}
545 
546 	ocf_io_put(io);
547 }
548 
549 /* Configure io parameters and send it to OCF */
550 static int
551 io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
552 {
553 	switch (bdev_io->type) {
554 	case SPDK_BDEV_IO_TYPE_WRITE:
555 	case SPDK_BDEV_IO_TYPE_READ:
556 		ocf_core_submit_io(io);
557 		return 0;
558 	case SPDK_BDEV_IO_TYPE_FLUSH:
559 		ocf_core_submit_flush(io);
560 		return 0;
561 	case SPDK_BDEV_IO_TYPE_UNMAP:
562 		ocf_core_submit_discard(io);
563 		return 0;
564 	case SPDK_BDEV_IO_TYPE_RESET:
565 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
566 	default:
567 		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
568 		return -EINVAL;
569 	}
570 }
571 
/* Submit SPDK-IO to OCF.
 * Builds an OCF io object for the request, attaches the data buffers and
 * completion callback, and hands it to io_submit_to_ocf().  On any setup
 * failure the bdev IO is completed with NOMEM or FAILED. */
static void
io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
	struct ocf_io *io = NULL;
	struct bdev_ocf_data *data = NULL;
	struct vbdev_ocf_qctx *qctx = spdk_io_channel_get_ctx(ch);
	/* Byte-based length/offset, converted from block units. */
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
	int dir, flags = 0;
	int err;

	/* Map the bdev IO type onto an OCF transfer direction; flush and
	 * unmap are writes from OCF's point of view. */
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		dir = OCF_READ;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		dir = OCF_WRITE;
		break;
	default:
		err = -EINVAL;
		goto fail;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		flags = OCF_WRITE_FLUSH;
	}

	io = ocf_core_new_io(vbdev->ocf_core, qctx->queue, offset, len, dir, 0, flags);
	if (!io) {
		err = -ENOMEM;
		goto fail;
	}

	/* Wrap the bdev IO's iovecs for OCF.
	 * NOTE(review): 'data' is not explicitly released on the failure
	 * path below — presumably its lifetime is tied to bdev_io/io;
	 * confirm against data.c. */
	data = vbdev_ocf_data_from_spdk_io(bdev_io);
	if (!data) {
		err = -ENOMEM;
		goto fail;
	}

	err = ocf_io_set_data(io, data, 0);
	if (err) {
		goto fail;
	}

	/* bdev_io travels in priv1 and is completed in the callback. */
	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);

	err = io_submit_to_ocf(bdev_io, io);
	if (err) {
		goto fail;
	}

	return;

fail:
	if (io) {
		ocf_io_put(io);
	}

	if (err == -ENOMEM) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
	} else {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
644 
645 static void
646 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
647 		     bool success)
648 {
649 	if (!success) {
650 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
651 		return;
652 	}
653 
654 	io_handle(ch, bdev_io);
655 }
656 
657 /* Called from bdev layer when an io to Cache vbdev is submitted */
658 static void
659 vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
660 {
661 	switch (bdev_io->type) {
662 	case SPDK_BDEV_IO_TYPE_READ:
663 		/* User does not have to allocate io vectors for the request,
664 		 * so in case they are not allocated, we allocate them here */
665 		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
666 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
667 		break;
668 	case SPDK_BDEV_IO_TYPE_WRITE:
669 	case SPDK_BDEV_IO_TYPE_FLUSH:
670 	case SPDK_BDEV_IO_TYPE_UNMAP:
671 		io_handle(ch, bdev_io);
672 		break;
673 	case SPDK_BDEV_IO_TYPE_RESET:
674 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
675 	default:
676 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
677 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
678 		break;
679 	}
680 }
681 
682 /* Called from bdev layer */
683 static bool
684 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
685 {
686 	struct vbdev_ocf *vbdev = opaque;
687 
688 	switch (io_type) {
689 	case SPDK_BDEV_IO_TYPE_READ:
690 	case SPDK_BDEV_IO_TYPE_WRITE:
691 	case SPDK_BDEV_IO_TYPE_FLUSH:
692 	case SPDK_BDEV_IO_TYPE_UNMAP:
693 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
694 	case SPDK_BDEV_IO_TYPE_RESET:
695 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
696 	default:
697 		return false;
698 	}
699 }
700 
/* Called from bdev layer.
 * The vbdev pointer itself is the io_device handle registered in
 * finish_register(). */
static struct spdk_io_channel *
vbdev_ocf_get_io_channel(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;

	return spdk_get_io_channel(vbdev);
}
709 
710 static int
711 vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
712 {
713 	struct vbdev_ocf *vbdev = opaque;
714 
715 	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
716 	spdk_json_write_named_string(w, "core_device", vbdev->core.name);
717 
718 	spdk_json_write_named_string(w, "mode",
719 				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
720 	spdk_json_write_named_uint32(w, "cache_line_size",
721 				     ocf_get_cache_line_size(vbdev->ocf_cache));
722 	spdk_json_write_named_bool(w, "metadata_volatile",
723 				   vbdev->cfg.cache.metadata_volatile);
724 
725 	return 0;
726 }
727 
728 static void
729 vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
730 {
731 	struct vbdev_ocf *vbdev = bdev->ctxt;
732 
733 	spdk_json_write_object_begin(w);
734 
735 	spdk_json_write_named_string(w, "method", "bdev_ocf_create");
736 
737 	spdk_json_write_named_object_begin(w, "params");
738 	spdk_json_write_named_string(w, "name", vbdev->name);
739 	spdk_json_write_named_string(w, "mode",
740 				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
741 	spdk_json_write_named_uint32(w, "cache_line_size",
742 				     ocf_get_cache_line_size(vbdev->ocf_cache));
743 	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
744 	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
745 	spdk_json_write_object_end(w);
746 
747 	spdk_json_write_object_end(w);
748 }
749 
/* Cache vbdev function table
 * Used by bdev layer; installed on exp_bdev in finish_register(). */
static struct spdk_bdev_fn_table cache_dev_fn_table = {
	.destruct = vbdev_ocf_destruct,
	.io_type_supported = vbdev_ocf_io_type_supported,
	.submit_request	= vbdev_ocf_submit_request,
	.get_io_channel	= vbdev_ocf_get_io_channel,
	.write_config_json = vbdev_ocf_write_json_config,
	.dump_info_json = vbdev_ocf_dump_info_json,
};
760 
761 /* Poller function for the OCF queue
762  * We execute OCF requests here synchronously */
763 static int
764 queue_poll(void *opaque)
765 {
766 	struct vbdev_ocf_qctx *qctx = opaque;
767 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
768 	int i, max = spdk_min(32, iono);
769 
770 	for (i = 0; i < max; i++) {
771 		ocf_queue_run_single(qctx->queue);
772 	}
773 
774 	if (iono > 0) {
775 		return SPDK_POLLER_BUSY;
776 	} else {
777 		return SPDK_POLLER_IDLE;
778 	}
779 }
780 
/* Called during ocf_submit_io, ocf_purge*
 * and any other requests that need to submit io.
 * Intentionally empty: the queue is serviced by the SPDK poller
 * (queue_poll), so no explicit wakeup is needed. */
static void
vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
{
}
787 
788 /* OCF queue deinitialization
789  * Called at ocf_cache_stop */
790 static void
791 vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
792 {
793 	struct vbdev_ocf_qctx *qctx = ocf_queue_get_priv(q);
794 
795 	if (qctx) {
796 		spdk_put_io_channel(qctx->cache_ch);
797 		spdk_put_io_channel(qctx->core_ch);
798 		spdk_poller_unregister(&qctx->poller);
799 		if (qctx->allocated) {
800 			free(qctx);
801 		}
802 	}
803 }
804 
/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed.
 * Kicks are no-ops because queue_poll() drives the queue. */
const struct ocf_queue_ops queue_ops = {
	.kick_sync = vbdev_ocf_ctx_queue_kick,
	.kick = vbdev_ocf_ctx_queue_kick,
	.stop = vbdev_ocf_ctx_queue_stop,
};
812 
813 /* Called on cache vbdev creation at every thread
814  * We allocate OCF queues here and SPDK poller for it */
815 static int
816 io_device_create_cb(void *io_device, void *ctx_buf)
817 {
818 	struct vbdev_ocf *vbdev = io_device;
819 	struct vbdev_ocf_qctx *qctx = ctx_buf;
820 	int rc;
821 
822 	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
823 	if (rc) {
824 		return rc;
825 	}
826 
827 	ocf_queue_set_priv(qctx->queue, qctx);
828 
829 	qctx->vbdev      = vbdev;
830 	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
831 	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
832 	qctx->poller     = SPDK_POLLER_REGISTER(queue_poll, qctx, 0);
833 
834 	return rc;
835 }
836 
/* Called per thread
 * Put OCF queue and relaunch poller with new context to finish pending requests */
static void
io_device_destroy_cb(void *io_device, void *ctx_buf)
{
	/* Making a copy of context to use it after io channel will be destroyed.
	 * The channel-owned qctx memory goes away when this callback returns,
	 * but the OCF queue may still have requests in flight; the heap copy
	 * (marked 'allocated') keeps servicing them and is freed later by
	 * vbdev_ocf_ctx_queue_stop(). */
	struct vbdev_ocf_qctx *copy = malloc(sizeof(*copy));
	struct vbdev_ocf_qctx *qctx = ctx_buf;

	if (copy) {
		/* Queue priv now points at the copy, so queue_stop frees it. */
		ocf_queue_set_priv(qctx->queue, copy);
		memcpy(copy, qctx, sizeof(*copy));
		/* Swap the poller over to the copied context. */
		spdk_poller_unregister(&qctx->poller);
		copy->poller = SPDK_POLLER_REGISTER(queue_poll, copy, 0);
		copy->allocated = true;
	} else {
		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
			    spdk_strerror(ENOMEM));
	}

	vbdev_ocf_queue_put(qctx->queue);
}
859 
860 /* OCF management queue deinitialization */
861 static void
862 vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
863 {
864 	struct spdk_poller *poller = ocf_queue_get_priv(q);
865 
866 	if (poller) {
867 		spdk_poller_unregister(&poller);
868 	}
869 }
870 
871 static int
872 mngt_queue_poll(void *opaque)
873 {
874 	ocf_queue_t q = opaque;
875 	uint32_t iono = ocf_queue_pending_io(q);
876 	int i, max = spdk_min(32, iono);
877 
878 	for (i = 0; i < max; i++) {
879 		ocf_queue_run_single(q);
880 	}
881 
882 	if (iono > 0) {
883 		return SPDK_POLLER_BUSY;
884 	} else {
885 		return SPDK_POLLER_IDLE;
886 	}
887 }
888 
/* Intentionally empty: the management queue is serviced by the
 * mngt_queue_poll() poller, so no explicit wakeup is needed. */
static void
vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
{
}
893 
/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed.
 * Management-queue variant; no synchronous kick is provided. */
const struct ocf_queue_ops mngt_queue_ops = {
	.kick_sync = NULL,
	.kick = vbdev_ocf_ctx_mngt_queue_kick,
	.stop = vbdev_ocf_ctx_mngt_queue_stop,
};
901 
902 static void
903 vbdev_ocf_mngt_exit(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_fn *rollback_path, int rc)
904 {
905 	vbdev->state.starting = false;
906 	vbdev_ocf_mngt_stop(vbdev, rollback_path, rc);
907 }
908 
909 /* Create exported spdk object */
910 static void
911 finish_register(struct vbdev_ocf *vbdev)
912 {
913 	int result;
914 
915 	/* Copy properties of the base bdev */
916 	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
917 	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
918 	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;
919 
920 	vbdev->exp_bdev.name = vbdev->name;
921 	vbdev->exp_bdev.product_name = "SPDK OCF";
922 
923 	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
924 	vbdev->exp_bdev.ctxt = vbdev;
925 	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
926 	vbdev->exp_bdev.module = &ocf_if;
927 
928 	/* Finally register vbdev in SPDK */
929 	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
930 				sizeof(struct vbdev_ocf_qctx), vbdev->name);
931 	result = spdk_bdev_register(&vbdev->exp_bdev);
932 	if (result) {
933 		SPDK_ERRLOG("Could not register exposed bdev %s\n",
934 			    vbdev->name);
935 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, result);
936 		return;
937 	} else {
938 		vbdev->state.started = true;
939 	}
940 
941 	vbdev_ocf_mngt_continue(vbdev, result);
942 }
943 
944 static void
945 add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
946 {
947 	struct vbdev_ocf *vbdev = priv;
948 
949 	ocf_mngt_cache_unlock(cache);
950 
951 	if (error) {
952 		SPDK_ERRLOG("Error %d, failed to add core device to cache instance %s,"
953 			    "starting rollback\n", error, vbdev->name);
954 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
955 		return;
956 	} else {
957 		vbdev->ocf_core = core;
958 	}
959 
960 	vbdev_ocf_mngt_continue(vbdev, error);
961 }
962 
963 /* Try to lock cache, then add core */
964 static void
965 add_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
966 {
967 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
968 
969 	if (error) {
970 		SPDK_ERRLOG("Error %d, can not lock cache instance %s,"
971 			    "starting rollback\n", error, vbdev->name);
972 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
973 	}
974 	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
975 }
976 
/* Add core for existing OCF cache instance.
 * Pipeline step: takes the cache lock first; the rest happens in
 * add_core_cache_lock_cmpl()/add_core_cmpl(). */
static void
add_core(struct vbdev_ocf *vbdev)
{
	ocf_mngt_cache_lock(vbdev->ocf_cache, add_core_cache_lock_cmpl, vbdev);
}
983 
/* Completion of ocf_mngt_cache_load()/attach(): unlock the cache, then
 * either advance the register pipeline or start rollback.  For -ENOMEM a
 * hint with the exact memory requirement is printed first. */
static void
start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;
	uint64_t mem_needed;

	ocf_mngt_cache_unlock(cache);

	if (error) {
		SPDK_ERRLOG("Error %d during start cache %s, starting rollback\n",
			    error, vbdev->name);

		if (error == -OCF_ERR_NO_MEM) {
			/* Ask OCF how much RAM this configuration needs so the
			 * user can size hugepages accordingly. */
			ocf_mngt_get_ram_needed(cache, &vbdev->cfg.device, &mem_needed);

			SPDK_NOTICELOG("Try to increase hugepage memory size or cache line size. "
				       "For your configuration:\nDevice size: %"PRIu64" bytes\n"
				       "Cache line size: %"PRIu64" bytes\nFree memory needed to start "
				       "cache: %"PRIu64" bytes\n", vbdev->cache.bdev->blockcnt *
				       vbdev->cache.bdev->blocklen, vbdev->cfg.cache.cache_line_size,
				       mem_needed);
		}

		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
		return;
	}

	vbdev_ocf_mngt_continue(vbdev, error);
}
1013 
/* Create the OCF management queue, its SPDK poller (100 us period), and
 * install both on the cache.  Returns 0 on success, negative errno on
 * failure. */
static int
create_management_queue(struct vbdev_ocf *vbdev)
{
	struct spdk_poller *mngt_poller;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		return rc;
	}

	/* NOTE(review): on poller failure the queue created above is not
	 * put here; whether the rollback path reaches it at this stage is
	 * unclear — possible leak, confirm against the mngt flow. */
	mngt_poller = SPDK_POLLER_REGISTER(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
	if (mngt_poller == NULL) {
		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
		return -ENOMEM;
	}

	/* The poller is stashed as queue priv so the queue's stop callback
	 * can unregister it. */
	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);

	return 0;
}
1037 
/* Start OCF cache, attach caching device.
 * First register-pipeline step.  Either joins an already-running cache
 * instance shared with another vbdev, or starts a new one and then
 * loads/attaches the cache device asynchronously. */
static void
start_cache(struct vbdev_ocf *vbdev)
{
	ocf_cache_t existing;
	uint32_t cache_block_size = vbdev->cache.bdev->blocklen;
	uint32_t core_block_size = vbdev->core.bdev->blocklen;
	int rc;

	if (is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_stop(vbdev, NULL, -EALREADY);
		return;
	}

	/* A cache block must fit into a core block. */
	if (cache_block_size > core_block_size) {
		SPDK_ERRLOG("Cache bdev block size (%d) is bigger then core bdev block size (%d)\n",
			    cache_block_size, core_block_size);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -EINVAL);
		return;
	}

	/* Shared-cache case: reuse the running instance and its context,
	 * taking a reference on each. */
	existing = get_other_cache_instance(vbdev);
	if (existing) {
		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
			       vbdev->name, vbdev->cache.name);
		vbdev->ocf_cache = existing;
		ocf_mngt_cache_get(vbdev->ocf_cache);
		vbdev->cache_ctx = ocf_cache_get_priv(existing);
		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	/* Fresh instance: allocate the per-cache context shared by all
	 * vbdevs that will use this cache. */
	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
	if (vbdev->cache_ctx == NULL) {
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -ENOMEM);
		return;
	}

	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);

	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache, NULL);
	if (rc) {
		SPDK_ERRLOG("Could not start cache %s: %d\n", vbdev->name, rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}
	ocf_mngt_cache_get(vbdev->ocf_cache);

	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);

	rc = create_management_queue(vbdev);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}

	/* loadq: resume from existing on-disk metadata; otherwise attach
	 * the device as a brand new cache.  Both complete asynchronously
	 * in start_cache_cmpl(). */
	if (vbdev->cfg.loadq) {
		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	} else {
		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	}
}
1103 
/* Procedures called during register operation.
 * Executed in order by the management pipeline; each step calls
 * vbdev_ocf_mngt_continue() to advance, NULL terminates the list. */
vbdev_ocf_mngt_fn register_path[] = {
	start_cache,
	add_core,
	finish_register,
	NULL
};
1111 
1112 /* Start cache instance and register OCF bdev */
1113 static void
1114 register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
1115 {
1116 	int rc;
1117 
1118 	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
1119 		cb(-EPERM, vbdev, cb_arg);
1120 		return;
1121 	}
1122 
1123 	vbdev->state.starting = true;
1124 	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
1125 	if (rc) {
1126 		cb(rc, vbdev, cb_arg);
1127 	}
1128 }
1129 
/* Init OCF configuration options
 * for core and cache devices */
static void
init_vbdev_config(struct vbdev_ocf *vbdev)
{
	struct vbdev_ocf_config *cfg = &vbdev->cfg;

	/* Initialize OCF defaults first */
	ocf_mngt_cache_device_config_set_default(&cfg->device);
	ocf_mngt_cache_config_set_default(&cfg->cache);
	ocf_mngt_core_config_set_default(&cfg->core);

	/* The OCF cache carries the vbdev's name; the core keeps its own. */
	snprintf(cfg->cache.name, sizeof(cfg->cache.name), "%s", vbdev->name);
	snprintf(cfg->core.name, sizeof(cfg->core.name), "%s", vbdev->core.name);

	/* Cores are opened by this module, not by OCF itself; skip the
	 * device test and initial discard on start. */
	cfg->device.open_cores = false;
	cfg->device.perform_test = false;
	cfg->device.discard_on_start = false;

	vbdev->cfg.cache.locked = true;

	cfg->core.volume_type = SPDK_OBJECT;
	cfg->device.volume_type = SPDK_OBJECT;

	if (vbdev->cfg.loadq) {
		/* When doing cache_load(), we need to set try_add to true,
		 * otherwise OCF will interpret this core as new
		 * instead of the inactive one */
		vbdev->cfg.core.try_add = true;
	} else {
		/* When cache is initialized as new, set force flag to true,
		 * to ignore warnings about existing metadata */
		cfg->device.force = true;
	}

	/* Serialize bdev names in OCF UUID to interpret on future loads
	 * Core UUID is a triple of (core name, vbdev name, cache name)
	 * Cache UUID is cache bdev name */
	cfg->device.uuid.size = strlen(vbdev->cache.name) + 1;
	cfg->device.uuid.data = vbdev->cache.name;

	/* The triple is first written space-separated into vbdev->uuid,
	 * then the two separator spaces are overwritten with NUL bytes so
	 * the buffer holds three consecutive C strings. */
	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s %s",
		 vbdev->core.name, vbdev->name, vbdev->cache.name);
	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
	cfg->core.uuid.data = vbdev->uuid;
	vbdev->uuid[strlen(vbdev->core.name)] = 0;
	vbdev->uuid[strlen(vbdev->core.name) + 1 + strlen(vbdev->name)] = 0;
}
1178 
/* Allocate vbdev structure object and add it to the global list
 * Checks for name collisions, duplicates the supplied names, fills the OCF
 * configuration and validates cache mode and cache line size.
 * Returns 0 on success or negative errno; on failure nothing is left on
 * the global list. */
static int
init_vbdev(const char *vbdev_name,
	   const char *cache_mode_name,
	   const uint64_t cache_line_size,
	   const char *cache_name,
	   const char *core_name,
	   bool loadq)
{
	struct vbdev_ocf *vbdev;
	int rc = 0;

	/* Reject the name if an SPDK bdev or an existing OCF vbdev already uses it. */
	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
		return -EPERM;
	}

	vbdev = calloc(1, sizeof(*vbdev));
	if (!vbdev) {
		goto error_mem;
	}

	vbdev->name = strdup(vbdev_name);
	if (!vbdev->name) {
		goto error_mem;
	}

	vbdev->cache.name = strdup(cache_name);
	if (!vbdev->cache.name) {
		goto error_mem;
	}

	vbdev->core.name = strdup(core_name);
	if (!vbdev->core.name) {
		goto error_mem;
	}

	vbdev->cache.parent = vbdev;
	vbdev->core.parent = vbdev;
	vbdev->cache.is_cache = true;
	vbdev->core.is_cache = false;
	vbdev->cfg.loadq = loadq;

	init_vbdev_config(vbdev);

	if (cache_mode_name) {
		vbdev->cfg.cache.cache_mode
			= ocf_get_cache_mode(cache_mode_name);
	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
		SPDK_ERRLOG("No cache mode specified\n");
		rc = -EINVAL;
		goto error_free;
	}
	/* ocf_get_cache_mode() presumably reports an unknown mode name as a
	 * negative value - confirm against its definition elsewhere in this file. */
	if (vbdev->cfg.cache.cache_mode < 0) {
		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
		rc = -EINVAL;
		goto error_free;
	}

	/* 0 means "use OCF default"; a user-supplied value is given in KiB. */
	ocf_cache_line_size_t set_cache_line_size = cache_line_size ?
			(ocf_cache_line_size_t)cache_line_size * KiB :
			ocf_cache_line_size_default;
	if (set_cache_line_size == 0) {
		SPDK_ERRLOG("Cache line size should be non-zero.\n");
		rc = -EINVAL;
		goto error_free;
	}
	vbdev->cfg.device.cache_line_size = set_cache_line_size;
	vbdev->cfg.cache.cache_line_size = set_cache_line_size;

	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
	return rc;

error_mem:
	rc = -ENOMEM;
error_free:
	/* NOTE(review): reached with vbdev == NULL when calloc fails, so
	 * free_vbdev() must tolerate NULL - confirm its definition. */
	free_vbdev(vbdev);
	return rc;
}
1258 
/* This deprecation is likely to be removed, and the ocf support will remain
 * in SPDK.  See deprecation.md for more details.
 * Registers the "bdev_ocf support" deprecation notice with SPDK's log
 * deprecation machinery. */
SPDK_LOG_DEPRECATION_REGISTER(bdev_ocf, "bdev_ocf support", "SPDK 23.09", 0);
1263 
1264 /* Read configuration file at the start of SPDK application
1265  * This adds vbdevs to global list if some mentioned in config */
1266 static int
1267 vbdev_ocf_init(void)
1268 {
1269 	int status;
1270 
1271 	status = vbdev_ocf_ctx_init();
1272 	if (status) {
1273 		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", status);
1274 		return status;
1275 	}
1276 
1277 	status = vbdev_ocf_volume_init();
1278 	if (status) {
1279 		vbdev_ocf_ctx_cleanup();
1280 		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", status);
1281 		return status;
1282 	}
1283 
1284 	return status;
1285 }
1286 
1287 /* Called after application shutdown started
1288  * Release memory of allocated structures here */
1289 static void
1290 vbdev_ocf_module_fini(void)
1291 {
1292 	struct vbdev_ocf *vbdev;
1293 
1294 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1295 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1296 		free_vbdev(vbdev);
1297 	}
1298 
1299 	vbdev_ocf_volume_cleanup();
1300 	vbdev_ocf_ctx_cleanup();
1301 }
1302 
1303 /* When base device gets unplugged this is called
1304  * We will unregister cache vbdev here
1305  * When cache device is removed, we delete every OCF bdev that used it */
1306 static void
1307 hotremove_cb(struct vbdev_ocf_base *base)
1308 {
1309 	struct vbdev_ocf *vbdev;
1310 
1311 	if (!base->is_cache) {
1312 		if (base->parent->state.doing_finish) {
1313 			return;
1314 		}
1315 
1316 		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
1317 			       base->parent->name, base->name);
1318 		vbdev_ocf_delete(base->parent, NULL, NULL);
1319 		return;
1320 	}
1321 
1322 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1323 		if (vbdev->state.doing_finish) {
1324 			continue;
1325 		}
1326 		if (strcmp(base->name, vbdev->cache.name) == 0) {
1327 			SPDK_NOTICELOG("Deinitializing '%s' because"
1328 				       " its cache device '%s' was removed\n",
1329 				       vbdev->name, base->name);
1330 			vbdev_ocf_delete(vbdev, NULL, NULL);
1331 		}
1332 	}
1333 }
1334 
1335 static void
1336 base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1337 		   void *event_ctx)
1338 {
1339 	switch (type) {
1340 	case SPDK_BDEV_EVENT_REMOVE:
1341 		if (event_ctx) {
1342 			hotremove_cb(event_ctx);
1343 		}
1344 		break;
1345 	default:
1346 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1347 		break;
1348 	}
1349 }
1350 
/* Open base SPDK bdev and claim it
 * Cache bases may share the descriptor and management channel of another
 * vbdev that already opened the same cache bdev.
 * Returns 0 on success, -EALREADY if already attached, or the error from
 * open/claim/get_io_channel; partial state is rolled back on failure. */
static int
attach_base(struct vbdev_ocf_base *base)
{
	int status;

	if (base->attached) {
		return -EALREADY;
	}

	/* If base cache bdev was already opened by other vbdev,
	 * we just copy its descriptor here */
	if (base->is_cache) {
		struct vbdev_ocf_base *existing = get_other_cache_base(base);
		if (existing) {
			base->desc = existing->desc;
			base->management_channel = existing->management_channel;
			base->attached = true;
			return 0;
		}
	}

	/* Open with write access; hot-remove events arrive via base_bdev_event_cb. */
	status = spdk_bdev_open_ext(base->name, true, base_bdev_event_cb, base, &base->desc);
	if (status) {
		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
		return status;
	}

	/* Claim the bdev for this module so no other module can take it. */
	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
					     &ocf_if);
	if (status) {
		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
		spdk_bdev_close(base->desc);
		return status;
	}

	base->management_channel = spdk_bdev_get_io_channel(base->desc);
	if (!base->management_channel) {
		SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
		/* Undo the claim and the open performed above. */
		spdk_bdev_module_release_bdev(base->bdev);
		spdk_bdev_close(base->desc);
		return -ENOMEM;
	}

	/* Save the thread where the base device is opened */
	base->thread = spdk_get_thread();

	base->attached = true;
	return status;
}
1401 
1402 /* Attach base bdevs */
1403 static int
1404 attach_base_bdevs(struct vbdev_ocf *vbdev,
1405 		  struct spdk_bdev *cache_bdev,
1406 		  struct spdk_bdev *core_bdev)
1407 {
1408 	int rc = 0;
1409 
1410 	if (cache_bdev) {
1411 		vbdev->cache.bdev = cache_bdev;
1412 		rc |= attach_base(&vbdev->cache);
1413 	}
1414 
1415 	if (core_bdev) {
1416 		vbdev->core.bdev = core_bdev;
1417 		rc |= attach_base(&vbdev->core);
1418 	}
1419 
1420 	return rc;
1421 }
1422 
1423 /* Init and then start vbdev if all base devices are present */
1424 void
1425 vbdev_ocf_construct(const char *vbdev_name,
1426 		    const char *cache_mode_name,
1427 		    const uint64_t cache_line_size,
1428 		    const char *cache_name,
1429 		    const char *core_name,
1430 		    bool loadq,
1431 		    void (*cb)(int, struct vbdev_ocf *, void *),
1432 		    void *cb_arg)
1433 {
1434 	int rc;
1435 	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
1436 	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
1437 	struct vbdev_ocf *vbdev;
1438 
1439 	rc = init_vbdev(vbdev_name, cache_mode_name, cache_line_size, cache_name, core_name, loadq);
1440 	if (rc) {
1441 		cb(rc, NULL, cb_arg);
1442 		return;
1443 	}
1444 
1445 	vbdev = vbdev_ocf_get_by_name(vbdev_name);
1446 	if (vbdev == NULL) {
1447 		cb(-ENODEV, NULL, cb_arg);
1448 		return;
1449 	}
1450 
1451 	if (cache_bdev == NULL) {
1452 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
1453 			       vbdev->name, cache_name);
1454 	}
1455 	if (core_bdev == NULL) {
1456 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
1457 			       vbdev->name, core_name);
1458 	}
1459 
1460 	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
1461 	if (rc) {
1462 		cb(rc, vbdev, cb_arg);
1463 		return;
1464 	}
1465 
1466 	if (core_bdev && cache_bdev) {
1467 		register_vbdev(vbdev, cb, cb_arg);
1468 	} else {
1469 		cb(0, vbdev, cb_arg);
1470 	}
1471 }
1472 
1473 /* Set new cache mode on OCF cache */
1474 void
1475 vbdev_ocf_set_cache_mode(struct vbdev_ocf *vbdev,
1476 			 const char *cache_mode_name,
1477 			 void (*cb)(int, struct vbdev_ocf *, void *),
1478 			 void *cb_arg)
1479 {
1480 	ocf_cache_t cache;
1481 	ocf_cache_mode_t cache_mode;
1482 	int rc;
1483 
1484 	cache = vbdev->ocf_cache;
1485 	cache_mode = ocf_get_cache_mode(cache_mode_name);
1486 
1487 	rc = ocf_mngt_cache_trylock(cache);
1488 	if (rc) {
1489 		cb(rc, vbdev, cb_arg);
1490 		return;
1491 	}
1492 
1493 	rc = ocf_mngt_cache_set_mode(cache, cache_mode);
1494 	ocf_mngt_cache_unlock(cache);
1495 	cb(rc, vbdev, cb_arg);
1496 }
1497 
1498 /* Set sequential cutoff parameters on OCF cache */
1499 void
1500 vbdev_ocf_set_seqcutoff(struct vbdev_ocf *vbdev, const char *policy_name, uint32_t threshold,
1501 			uint32_t promotion_count, void (*cb)(int, void *), void *cb_arg)
1502 {
1503 	ocf_cache_t cache;
1504 	ocf_seq_cutoff_policy policy;
1505 	int rc;
1506 
1507 	cache = vbdev->ocf_cache;
1508 
1509 	policy = ocf_get_seqcutoff_policy(policy_name);
1510 	if (policy == ocf_seq_cutoff_policy_max) {
1511 		cb(OCF_ERR_INVAL, cb_arg);
1512 		return;
1513 	}
1514 
1515 	rc = ocf_mngt_cache_trylock(cache);
1516 	if (rc) {
1517 		cb(rc, cb_arg);
1518 		return;
1519 	}
1520 
1521 	rc = ocf_mngt_core_set_seq_cutoff_policy_all(cache, policy);
1522 	if (rc) {
1523 		goto end;
1524 	}
1525 
1526 	if (threshold) {
1527 		threshold = threshold * KiB;
1528 
1529 		rc = ocf_mngt_core_set_seq_cutoff_threshold_all(cache, threshold);
1530 		if (rc) {
1531 			goto end;
1532 		}
1533 	}
1534 
1535 	if (promotion_count) {
1536 		rc = ocf_mngt_core_set_seq_cutoff_promotion_count_all(cache, promotion_count);
1537 	}
1538 
1539 end:
1540 	ocf_mngt_cache_unlock(cache);
1541 	cb(rc, cb_arg);
1542 }
1543 
1544 /* This called if new device is created in SPDK application
1545  * If that device named as one of base bdevs of OCF vbdev,
1546  * claim and open them */
1547 static void
1548 vbdev_ocf_examine(struct spdk_bdev *bdev)
1549 {
1550 	const char *bdev_name = spdk_bdev_get_name(bdev);
1551 	struct vbdev_ocf *vbdev;
1552 
1553 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1554 		if (vbdev->state.doing_finish) {
1555 			continue;
1556 		}
1557 
1558 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1559 			attach_base_bdevs(vbdev, bdev, NULL);
1560 			continue;
1561 		}
1562 		if (!strcmp(bdev_name, vbdev->core.name)) {
1563 			attach_base_bdevs(vbdev, NULL, bdev);
1564 			break;
1565 		}
1566 	}
1567 	spdk_bdev_module_examine_done(&ocf_if);
1568 }
1569 
/* Context of a single metadata-probe examine flow (see vbdev_ocf_examine_disk).
 * Lifetime is reference-counted; freed by examine_ctx_put() when refcnt
 * drops to zero. */
struct metadata_probe_ctx {
	struct vbdev_ocf_base base;	/* temporary base wrapping the probed bdev */
	ocf_volume_t volume;		/* temporary OCF volume opened over 'base' */

	struct ocf_volume_uuid *core_uuids;	/* core UUIDs read from cache metadata */
	unsigned int uuid_count;		/* number of entries in core_uuids */

	int result;	/* first error encountered; reported via examine_done() */
	int refcnt;	/* outstanding references on this context */
};
1580 
/* Thread-message handler: closes a bdev descriptor on its owning thread
 * (ctx is the struct spdk_bdev_desc pointer itself). */
static void
_examine_ctx_put(void *ctx)
{
	spdk_bdev_close((struct spdk_bdev_desc *)ctx);
}
1588 
/* Drop one reference to the probe context.
 * On the last reference: log any stored error, release the descriptor
 * (on its opening thread), the temporary volume and the UUID buffers,
 * report the result through examine_done() and free the context. */
static void
examine_ctx_put(struct metadata_probe_ctx *ctx)
{
	unsigned int i;

	ctx->refcnt--;
	if (ctx->refcnt > 0) {
		return;
	}

	if (ctx->result) {
		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
	}

	if (ctx->base.desc) {
		/* Close the underlying bdev on its same opened thread. */
		if (ctx->base.thread && ctx->base.thread != spdk_get_thread()) {
			spdk_thread_send_msg(ctx->base.thread, _examine_ctx_put, ctx->base.desc);
		} else {
			spdk_bdev_close(ctx->base.desc);
		}
	}

	if (ctx->volume) {
		ocf_volume_destroy(ctx->volume);
	}

	if (ctx->core_uuids) {
		/* free(NULL) is a no-op, so entries left unset by a partially
		 * completed allocation loop are safe to pass here. */
		for (i = 0; i < ctx->uuid_count; i++) {
			free(ctx->core_uuids[i].data);
		}
	}
	free(ctx->core_uuids);

	/* Balances the examine_start() taken in vbdev_ocf_examine_disk(). */
	examine_done(ctx->result, NULL, ctx->base.bdev);
	free(ctx);
}
1627 
/* Completion callback for vbdev_ocf_construct() calls issued from the
 * metadata probe: drops the per-construct reference taken in
 * metadata_probe_cores_construct(); the status itself is not inspected. */
static void
metadata_probe_construct_cb(int rc, struct vbdev_ocf *vbdev, void *vctx)
{
	examine_ctx_put((struct metadata_probe_ctx *)vctx);
}
1635 
/* This is second callback for ocf_metadata_probe_cores()
 * Here we create vbdev configurations based on UUIDs */
static void
metadata_probe_cores_construct(void *priv, int error, unsigned int num_cores)
{
	struct metadata_probe_ctx *ctx = priv;
	const char *vbdev_name;
	const char *core_name;
	const char *cache_name;
	unsigned int i;

	if (error) {
		ctx->result = error;
		examine_ctx_put(ctx);
		return;
	}

	for (i = 0; i < num_cores; i++) {
		/* Each core UUID holds three consecutive NUL-separated strings:
		 * (core name, vbdev name, cache name) - the layout written by
		 * init_vbdev_config() when the cache was created. */
		core_name = ocf_uuid_to_str(&ctx->core_uuids[i]);
		vbdev_name = core_name + strlen(core_name) + 1;
		cache_name = vbdev_name + strlen(vbdev_name) + 1;

		/* Warn if metadata refers to a different cache bdev name than the
		 * one being examined (e.g. the bdev was renamed since creation). */
		if (strcmp(ctx->base.bdev->name, cache_name)) {
			SPDK_NOTICELOG("OCF metadata found on %s belongs to bdev named '%s'\n",
				       ctx->base.bdev->name, cache_name);
		}

		/* One reference per construct call; released by
		 * metadata_probe_construct_cb(). */
		ctx->refcnt++;
		vbdev_ocf_construct(vbdev_name, NULL, 0, cache_name, core_name, true,
				    metadata_probe_construct_cb, ctx);
	}

	/* Drop this callback's own reference. */
	examine_ctx_put(ctx);
}
1670 
1671 /* This callback is called after OCF reads cores UUIDs from cache metadata
1672  * Here we allocate memory for those UUIDs and call ocf_metadata_probe_cores() again */
1673 static void
1674 metadata_probe_cores_get_num(void *priv, int error, unsigned int num_cores)
1675 {
1676 	struct metadata_probe_ctx *ctx = priv;
1677 	unsigned int i;
1678 
1679 	if (error) {
1680 		ctx->result = error;
1681 		examine_ctx_put(ctx);
1682 		return;
1683 	}
1684 
1685 	ctx->uuid_count = num_cores;
1686 	ctx->core_uuids = calloc(num_cores, sizeof(struct ocf_volume_uuid));
1687 	if (!ctx->core_uuids) {
1688 		ctx->result = -ENOMEM;
1689 		examine_ctx_put(ctx);
1690 		return;
1691 	}
1692 
1693 	for (i = 0; i < ctx->uuid_count; i++) {
1694 		ctx->core_uuids[i].size = OCF_VOLUME_UUID_MAX_SIZE;
1695 		ctx->core_uuids[i].data = malloc(OCF_VOLUME_UUID_MAX_SIZE);
1696 		if (!ctx->core_uuids[i].data) {
1697 			ctx->result = -ENOMEM;
1698 			examine_ctx_put(ctx);
1699 			return;
1700 		}
1701 	}
1702 
1703 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, ctx->core_uuids, ctx->uuid_count,
1704 				 metadata_probe_cores_construct, ctx);
1705 }
1706 
/* First callback of ocf_metadata_probe().
 * If valid cache metadata was found, continue by querying how many cores
 * are recorded in it. */
static void
metadata_probe_cb(void *priv, int rc,
		  struct ocf_metadata_probe_status *status)
{
	struct metadata_probe_ctx *ctx = priv;

	if (rc) {
		/* -OCF_ERR_NO_METADATA means device does not have cache metadata
		 * on it - the normal case for non-OCF bdevs, so it is not
		 * recorded as an error */
		if (rc != -OCF_ERR_NO_METADATA) {
			ctx->result = rc;
		}
		examine_ctx_put(ctx);
		return;
	}

	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, NULL, 0,
				 metadata_probe_cores_get_num, ctx);
}
1725 
/* This is called after vbdev_ocf_examine
 * It allows to delay application initialization
 * until all OCF bdevs get registered
 * If vbdev has all of its base devices it starts asynchronously here
 * We first check if bdev appears in configuration,
 * if not we do metadata_probe() to create its configuration from bdev metadata */
static void
vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
{
	const char *bdev_name = spdk_bdev_get_name(bdev);
	struct vbdev_ocf *vbdev;
	struct metadata_probe_ctx *ctx;
	bool created_from_config = false;
	int rc;

	/* Take the initial examine reference; every path below eventually
	 * balances it with an examine_done(). */
	examine_start(bdev);

	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
		if (vbdev->state.doing_finish || vbdev->state.started) {
			continue;
		}

		if (!strcmp(bdev_name, vbdev->cache.name)) {
			/* Extra reference for the asynchronous register;
			 * examine_done is its completion callback. */
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			created_from_config = true;
			continue;
		}
		if (!strcmp(bdev_name, vbdev->core.name)) {
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			/* A bdev can be the core of only one vbdev - release the
			 * initial reference and stop searching. */
			examine_done(0, NULL, bdev);
			return;
		}
	}

	/* If devices is discovered during config we do not check for metadata */
	if (created_from_config) {
		examine_done(0, NULL, bdev);
		return;
	}

	/* Metadata probe path
	 * We create temporary OCF volume and a temporary base structure
	 * to use them for ocf_metadata_probe() and for bottom adapter IOs
	 * Then we get UUIDs of core devices an create configurations based on them */
	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		examine_done(-ENOMEM, NULL, bdev);
		return;
	}

	ctx->base.bdev = bdev;
	ctx->refcnt = 1;

	/* NULL event ctx: no hotremove handling needed for this transient open. */
	rc = spdk_bdev_open_ext(bdev_name, true, base_bdev_event_cb, NULL, &ctx->base.desc);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_volume_open(ctx->volume, &ctx->base);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	/* Save the thread where the base device is opened */
	ctx->base.thread = spdk_get_thread();

	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
}
1807 
/* Report how much per-IO driver context the bdev layer should allocate
 * for IOs submitted to this module's bdevs. */
static int
vbdev_ocf_get_ctx_size(void)
{
	return sizeof(struct bdev_ocf_data);
}
1813 
/* Called when application shutdown begins (before module_fini).
 * Only raises the global flag; NOTE(review): its readers are outside this
 * chunk - presumably asynchronous management paths check it to abort early. */
static void
fini_start(void)
{
	g_fini_started = true;
}
1819 
/* Module-global function table
 * Does not relate to vbdev instances */
static struct spdk_bdev_module ocf_if = {
	.name = "ocf",
	.module_init = vbdev_ocf_init,		/* set up OCF ctx and SPDK volume type */
	.fini_start = fini_start,		/* raise g_fini_started at shutdown start */
	.module_fini = vbdev_ocf_module_fini,	/* free vbdev list, tear down OCF ctx */
	.get_ctx_size = vbdev_ocf_get_ctx_size,	/* per-IO ctx: struct bdev_ocf_data */
	.examine_config = vbdev_ocf_examine,	/* claim/open base bdevs named in config */
	.examine_disk   = vbdev_ocf_examine_disk,	/* start vbdevs or probe OCF metadata */
};
SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);
1832