1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <ocf/ocf.h>
35 #include <ocf/ocf_types.h>
36 #include <ocf/ocf_mngt.h>
37 
38 #include "ctx.h"
39 #include "data.h"
40 #include "volume.h"
41 #include "utils.h"
42 #include "vbdev_ocf.h"
43 
44 #include "spdk/bdev_module.h"
45 #include "spdk/conf.h"
46 #include "spdk/thread.h"
47 #include "spdk/string.h"
48 #include "spdk_internal/log.h"
49 #include "spdk/cpuset.h"
50 
51 static struct spdk_bdev_module ocf_if;
52 
53 static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
54 	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);
55 
56 static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
57 	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);
58 
59 bool g_fini_started = false;
60 
61 /* Structure for keeping list of bdevs that are claimed but not used yet */
62 struct examining_bdev {
63 	struct spdk_bdev           *bdev;
64 	TAILQ_ENTRY(examining_bdev) tailq;
65 };
66 
67 /* Add bdev to the list of claimed bdevs */
68 static void
69 examine_start(struct spdk_bdev *bdev)
70 {
71 	struct examining_bdev *entry = malloc(sizeof(*entry));
72 
73 	assert(entry);
74 	entry->bdev = bdev;
75 	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
76 }
77 
78 /* Find bdev on the list of claimed bdevs and remove it;
79  * if it was the last one on the list, report examine done */
80 static void
81 examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
82 {
83 	struct spdk_bdev *bdev = cb_arg;
84 	struct examining_bdev *entry, *safe, *found = NULL;
85 
86 	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
87 		if (entry->bdev == bdev) {
88 			if (found) {
89 				goto remove;
90 			} else {
91 				found = entry;
92 			}
93 		}
94 	}
95 
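	/* Exactly one entry matched, so this examine was the last outstanding
	 * one for this bdev - report examine done to the bdev layer */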
96 	assert(found);
97 	spdk_bdev_module_examine_done(&ocf_if);
98 
99 remove:
100 	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
101 	free(found);
102 }
103 
104 /* Free allocated strings and structure itself
105  * Used at shutdown only */
106 static void
107 free_vbdev(struct vbdev_ocf *vbdev)
108 {
109 	if (!vbdev) {
110 		return;
111 	}
112 
113 	free(vbdev->name);
114 	free(vbdev->cache.name);
115 	free(vbdev->core.name);
116 	free(vbdev);
117 }
118 
119 /* Get an existing cache base
120  * that is attached to another vbdev */
121 static struct vbdev_ocf_base *
122 get_other_cache_base(struct vbdev_ocf_base *base)
123 {
124 	struct vbdev_ocf *vbdev;
125 
126 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
127 		if (&vbdev->cache == base || !vbdev->cache.attached) {
128 			continue;
129 		}
130 		if (!strcmp(vbdev->cache.name, base->name)) {
131 			return &vbdev->cache;
132 		}
133 	}
134 
135 	return NULL;
136 }
137 
138 static bool
139 is_ocf_cache_running(struct vbdev_ocf *vbdev)
140 {
141 	if (vbdev->cache.attached && vbdev->ocf_cache) {
142 		return ocf_cache_is_running(vbdev->ocf_cache);
143 	}
144 	return false;
145 }
146 
147 /* Get an existing OCF cache instance
148  * that was started by another vbdev */
149 static ocf_cache_t
150 get_other_cache_instance(struct vbdev_ocf *vbdev)
151 {
152 	struct vbdev_ocf *cmp;
153 
154 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
155 		if (cmp->state.doing_finish || cmp == vbdev) {
156 			continue;
157 		}
158 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
159 			continue;
160 		}
161 		if (is_ocf_cache_running(cmp)) {
162 			return cmp->ocf_cache;
163 		}
164 	}
165 
166 	return NULL;
167 }
168 
169 static void
170 _remove_base_bdev(void *ctx)
171 {
172 	struct spdk_bdev_desc *desc = ctx;
173 
174 	spdk_bdev_close(desc);
175 }
176 
177 /* Close and unclaim base bdev */
178 static void
179 remove_base_bdev(struct vbdev_ocf_base *base)
180 {
181 	if (base->attached) {
182 		if (base->management_channel) {
183 			spdk_put_io_channel(base->management_channel);
184 		}
185 
186 		spdk_bdev_module_release_bdev(base->bdev);
187 		/* Close the underlying bdev on the same thread on which it was opened. */
188 		if (base->thread && base->thread != spdk_get_thread()) {
189 			spdk_thread_send_msg(base->thread, _remove_base_bdev, base->desc);
190 		} else {
191 			spdk_bdev_close(base->desc);
192 		}
193 		base->attached = false;
194 	}
195 }
196 
197 /* Finish unregister operation */
198 static void
199 unregister_finish(struct vbdev_ocf *vbdev)
200 {
201 	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);
202 	ocf_mngt_cache_put(vbdev->ocf_cache);
203 	vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
204 	vbdev_ocf_mngt_continue(vbdev, 0);
205 }
206 
207 static void
208 close_core_bdev(struct vbdev_ocf *vbdev)
209 {
210 	remove_base_bdev(&vbdev->core);
211 	vbdev_ocf_mngt_continue(vbdev, 0);
212 }
213 
214 static void
215 remove_core_cmpl(void *priv, int error)
216 {
217 	struct vbdev_ocf *vbdev = priv;
218 
219 	ocf_mngt_cache_unlock(vbdev->ocf_cache);
220 	vbdev_ocf_mngt_continue(vbdev, error);
221 }
222 
223 /* Try to lock cache, then remove core */
224 static void
225 remove_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
226 {
227 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
228 
229 	if (error) {
230 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
231 			    error, vbdev->name);
232 		vbdev_ocf_mngt_continue(vbdev, error);
233 		return;
234 	}
235 
236 	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
237 }
238 
239 /* Detach core base */
240 static void
241 detach_core(struct vbdev_ocf *vbdev)
242 {
243 	if (is_ocf_cache_running(vbdev)) {
244 		ocf_mngt_cache_lock(vbdev->ocf_cache, remove_core_cache_lock_cmpl, vbdev);
245 	} else {
246 		vbdev_ocf_mngt_continue(vbdev, 0);
247 	}
248 }
249 
250 static void
251 close_cache_bdev(struct vbdev_ocf *vbdev)
252 {
253 	remove_base_bdev(&vbdev->cache);
254 	vbdev_ocf_mngt_continue(vbdev, 0);
255 }
256 
257 /* Detach cache base */
258 static void
259 detach_cache(struct vbdev_ocf *vbdev)
260 {
261 	vbdev->state.stop_status = vbdev->mngt_ctx.status;
262 
263 	/* If some other vbdev references this cache bdev,
264 	 * detach it only by clearing the flag, without actually closing it */
265 	if (get_other_cache_base(&vbdev->cache)) {
266 		vbdev->cache.attached = false;
267 	}
268 
269 	vbdev_ocf_mngt_continue(vbdev, 0);
270 }
271 
272 static void
273 stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
274 {
275 	struct vbdev_ocf *vbdev = priv;
276 
277 	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
278 	ocf_mngt_cache_unlock(cache);
279 
280 	vbdev_ocf_mngt_continue(vbdev, error);
281 }
282 
283 /* Try to lock cache, then stop it */
284 static void
285 stop_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
286 {
287 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
288 
289 	if (error) {
290 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
291 			    error, vbdev->name);
292 		vbdev_ocf_mngt_continue(vbdev, error);
293 		return;
294 	}
295 
296 	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
297 }
298 
299 /* Stop OCF cache object
300  * vbdev_ocf is not operational after this */
301 static void
302 stop_vbdev(struct vbdev_ocf *vbdev)
303 {
304 	if (!is_ocf_cache_running(vbdev)) {
305 		vbdev_ocf_mngt_continue(vbdev, 0);
306 		return;
307 	}
308 
309 	if (!g_fini_started && get_other_cache_instance(vbdev)) {
310 		SPDK_NOTICELOG("Not stopping cache instance '%s'"
311 			       " because it is referenced by other OCF bdev\n",
312 			       vbdev->cache.name);
313 		vbdev_ocf_mngt_continue(vbdev, 0);
314 		return;
315 	}
316 
317 	ocf_mngt_cache_lock(vbdev->ocf_cache, stop_vbdev_cache_lock_cmpl, vbdev);
318 }
319 
320 static void
321 flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
322 {
323 	struct vbdev_ocf *vbdev = priv;
324 
325 	ocf_mngt_cache_unlock(cache);
326 	vbdev_ocf_mngt_continue(vbdev, error);
327 }
328 
329 static void
330 flush_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
331 {
332 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
333 
334 	if (error) {
335 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
336 			    error, vbdev->name);
337 		vbdev_ocf_mngt_continue(vbdev, error);
338 		return;
339 	}
340 
341 	ocf_mngt_cache_flush(vbdev->ocf_cache, flush_vbdev_cmpl, vbdev);
342 }
343 
344 static void
345 flush_vbdev(struct vbdev_ocf *vbdev)
346 {
347 	if (!is_ocf_cache_running(vbdev)) {
348 		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
349 		return;
350 	}
351 
352 	ocf_mngt_cache_lock(vbdev->ocf_cache, flush_vbdev_cache_lock_cmpl, vbdev);
353 }
354 
355 /* Procedures called during dirty unregister */
356 vbdev_ocf_mngt_fn unregister_path_dirty[] = {
357 	flush_vbdev,
358 	stop_vbdev,
359 	detach_cache,
360 	close_cache_bdev,
361 	detach_core,
362 	close_core_bdev,
363 	unregister_finish,
364 	NULL
365 };
366 
367 /* Procedures called during clean unregister */
368 vbdev_ocf_mngt_fn unregister_path_clean[] = {
369 	flush_vbdev,
370 	detach_core,
371 	close_core_bdev,
372 	stop_vbdev,
373 	detach_cache,
374 	close_cache_bdev,
375 	unregister_finish,
376 	NULL
377 };
378 
379 /* Start asynchronous management operation using unregister_path */
380 static void
381 unregister_cb(void *opaque)
382 {
383 	struct vbdev_ocf *vbdev = opaque;
384 	vbdev_ocf_mngt_fn *unregister_path;
385 	int rc;
386 
387 	unregister_path = vbdev->state.doing_clean_delete ?
388 			  unregister_path_clean : unregister_path_dirty;
389 
390 	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
391 	if (rc) {
392 		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
393 		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
394 	}
395 }
396 
397 /* Clean remove case - remove core and then cache; this order
398  * removes the instance permanently */
399 static void
400 _vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
401 {
402 	if (vbdev->core.attached) {
403 		detach_core(vbdev);
404 		close_core_bdev(vbdev);
405 	}
406 
407 	if (vbdev->cache.attached) {
408 		detach_cache(vbdev);
409 		close_cache_bdev(vbdev);
410 	}
411 }
412 
413 /* Dirty shutdown/hot remove case - remove cache and then core; this order
414  * allows us to recover this instance in the future */
415 static void
416 _vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
417 {
418 	if (vbdev->cache.attached) {
419 		detach_cache(vbdev);
420 		close_cache_bdev(vbdev);
421 	}
422 
423 	if (vbdev->core.attached) {
424 		detach_core(vbdev);
425 		close_core_bdev(vbdev);
426 	}
427 }
428 
429 /* Unregister io device with callback to unregister_cb
430  * This function is called during spdk_bdev_unregister */
431 static int
432 vbdev_ocf_destruct(void *opaque)
433 {
434 	struct vbdev_ocf *vbdev = opaque;
435 
436 	if (vbdev->state.doing_finish) {
437 		return -EALREADY;
438 	}
439 
440 	if (vbdev->state.starting && !vbdev->state.started) {
441 		/* Prevent detaching the cache/core while the register path
442 		 * of this bdev is still in progress */
443 		return -EBUSY;
444 	}
445 
446 	vbdev->state.doing_finish = true;
447 
448 	if (vbdev->state.started) {
449 		spdk_io_device_unregister(vbdev, unregister_cb);
450 		/* Return 1 because unregister is delayed */
451 		return 1;
452 	}
453 
454 	if (vbdev->state.doing_clean_delete) {
455 		_vbdev_ocf_destruct_clean(vbdev);
456 	} else {
457 		_vbdev_ocf_destruct_dirty(vbdev);
458 	}
459 
460 	return 0;
461 }
462 
463 /* Stop OCF cache and unregister SPDK bdev */
464 int
465 vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
466 {
467 	int rc = 0;
468 
469 	if (vbdev->state.started) {
470 		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
471 	} else {
472 		rc = vbdev_ocf_destruct(vbdev);
473 		if (rc == 0 && cb) {
474 			cb(cb_arg, 0);
475 		}
476 	}
477 
478 	return rc;
479 }
480 
481 /* Remove cores permanently and then stop OCF cache and unregister SPDK bdev */
482 int
483 vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
484 		       void *cb_arg)
485 {
486 	vbdev->state.doing_clean_delete = true;
487 
488 	return vbdev_ocf_delete(vbdev, cb, cb_arg);
489 }
490 
491 
492 /* If vbdev is online, return its object */
493 struct vbdev_ocf *
494 vbdev_ocf_get_by_name(const char *name)
495 {
496 	struct vbdev_ocf *vbdev;
497 
498 	if (name == NULL) {
499 		assert(false);
500 		return NULL;
501 	}
502 
503 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
504 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
505 			continue;
506 		}
507 		if (strcmp(vbdev->name, name) == 0) {
508 			return vbdev;
509 		}
510 	}
511 	return NULL;
512 }
513 
514 /* Return matching base if parent vbdev is online */
515 struct vbdev_ocf_base *
516 vbdev_ocf_get_base_by_name(const char *name)
517 {
518 	struct vbdev_ocf *vbdev;
519 
520 	if (name == NULL) {
521 		assert(false);
522 		return NULL;
523 	}
524 
525 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
526 		if (vbdev->state.doing_finish) {
527 			continue;
528 		}
529 
530 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
531 			return &vbdev->cache;
532 		}
533 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
534 			return &vbdev->core;
535 		}
536 	}
537 	return NULL;
538 }
539 
540 /* Execute fn for each OCF device that is online or is waiting for its base devices */
541 void
542 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
543 {
544 	struct vbdev_ocf *vbdev;
545 
546 	assert(fn != NULL);
547 
548 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
549 		if (!vbdev->state.doing_finish) {
550 			fn(vbdev, ctx);
551 		}
552 	}
553 }
554 
555 /* Called from OCF when an SPDK IO is completed */
556 static void
557 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
558 {
559 	struct spdk_bdev_io *bdev_io = io->priv1;
560 
561 	if (error == 0) {
562 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
563 	} else if (error == -ENOMEM) {
564 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
565 	} else {
566 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
567 	}
568 
569 	ocf_io_put(io);
570 }
571 
572 /* Configure io parameters and send it to OCF */
573 static int
574 io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
575 {
576 	switch (bdev_io->type) {
577 	case SPDK_BDEV_IO_TYPE_WRITE:
578 	case SPDK_BDEV_IO_TYPE_READ:
579 		ocf_core_submit_io(io);
580 		return 0;
581 	case SPDK_BDEV_IO_TYPE_FLUSH:
582 		ocf_core_submit_flush(io);
583 		return 0;
584 	case SPDK_BDEV_IO_TYPE_UNMAP:
585 		ocf_core_submit_discard(io);
586 		return 0;
587 	case SPDK_BDEV_IO_TYPE_RESET:
588 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
589 	default:
590 		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
591 		return -EINVAL;
592 	}
593 }
594 
595 /* Submit SPDK-IO to OCF */
596 static void
597 io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
598 {
599 	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
600 	struct ocf_io *io = NULL;
601 	struct bdev_ocf_data *data = NULL;
602 	struct vbdev_ocf_qctx *qctx = spdk_io_channel_get_ctx(ch);
603 	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
604 	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
605 	int dir, flags = 0;
606 	int err;
607 
608 	switch (bdev_io->type) {
609 	case SPDK_BDEV_IO_TYPE_READ:
610 		dir = OCF_READ;
611 		break;
612 	case SPDK_BDEV_IO_TYPE_WRITE:
613 		dir = OCF_WRITE;
614 		break;
615 	case SPDK_BDEV_IO_TYPE_FLUSH:
616 		dir = OCF_WRITE;
617 		break;
618 	case SPDK_BDEV_IO_TYPE_UNMAP:
619 		dir = OCF_WRITE;
620 		break;
621 	default:
622 		err = -EINVAL;
623 		goto fail;
624 	}
625 
626 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
627 		flags = OCF_WRITE_FLUSH;
628 	}
629 
630 	io = ocf_core_new_io(vbdev->ocf_core, qctx->queue, offset, len, dir, 0, flags);
631 	if (!io) {
632 		err = -ENOMEM;
633 		goto fail;
634 	}
635 
636 	data = vbdev_ocf_data_from_spdk_io(bdev_io);
637 	if (!data) {
638 		err = -ENOMEM;
639 		goto fail;
640 	}
641 
642 	err = ocf_io_set_data(io, data, 0);
643 	if (err) {
644 		goto fail;
645 	}
646 
647 	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);
648 
649 	err = io_submit_to_ocf(bdev_io, io);
650 	if (err) {
651 		goto fail;
652 	}
653 
654 	return;
655 
656 fail:
657 	if (io) {
658 		ocf_io_put(io);
659 	}
660 
661 	if (err == -ENOMEM) {
662 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
663 	} else {
664 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
665 	}
666 }
667 
668 static void
669 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
670 		     bool success)
671 {
672 	if (!success) {
673 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
674 		return;
675 	}
676 
677 	io_handle(ch, bdev_io);
678 }
679 
680 /* Called from bdev layer when an io to Cache vbdev is submitted */
681 static void
682 vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
683 {
684 	switch (bdev_io->type) {
685 	case SPDK_BDEV_IO_TYPE_READ:
686 		/* User does not have to allocate io vectors for the request,
687 		 * so in case they are not allocated, we allocate them here */
688 		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
689 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
690 		break;
691 	case SPDK_BDEV_IO_TYPE_WRITE:
692 	case SPDK_BDEV_IO_TYPE_FLUSH:
693 	case SPDK_BDEV_IO_TYPE_UNMAP:
694 		io_handle(ch, bdev_io);
695 		break;
696 	case SPDK_BDEV_IO_TYPE_RESET:
697 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
698 	default:
699 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
700 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
701 		break;
702 	}
703 }
704 
705 /* Called from bdev layer */
706 static bool
707 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
708 {
709 	struct vbdev_ocf *vbdev = opaque;
710 
711 	switch (io_type) {
712 	case SPDK_BDEV_IO_TYPE_READ:
713 	case SPDK_BDEV_IO_TYPE_WRITE:
714 	case SPDK_BDEV_IO_TYPE_FLUSH:
715 	case SPDK_BDEV_IO_TYPE_UNMAP:
716 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
717 	case SPDK_BDEV_IO_TYPE_RESET:
718 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
719 	default:
720 		return false;
721 	}
722 }
723 
724 /* Called from bdev layer */
725 static struct spdk_io_channel *
726 vbdev_ocf_get_io_channel(void *opaque)
727 {
728 	struct vbdev_ocf *bdev = opaque;
729 
730 	return spdk_get_io_channel(bdev);
731 }
732 
733 static int
734 vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
735 {
736 	struct vbdev_ocf *vbdev = opaque;
737 
738 	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
739 	spdk_json_write_named_string(w, "core_device", vbdev->core.name);
740 
741 	spdk_json_write_named_string(w, "mode",
742 				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
743 	spdk_json_write_named_uint32(w, "cache_line_size",
744 				     ocf_cache_get_line_size(vbdev->ocf_cache));
745 	spdk_json_write_named_bool(w, "metadata_volatile",
746 				   vbdev->cfg.cache.metadata_volatile);
747 
748 	return 0;
749 }
750 
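/* Emits a "bdev_ocf_create" JSON-RPC config entry of roughly this shape
 * (values are illustrative):
 *   { "method": "bdev_ocf_create",
 *     "params": { "name": ..., "mode": ..., "cache_line_size": ...,
 *                 "cache_bdev_name": ..., "core_bdev_name": ... } }
 */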
751 static void
752 vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
753 {
754 	struct vbdev_ocf *vbdev = bdev->ctxt;
755 
756 	spdk_json_write_object_begin(w);
757 
758 	spdk_json_write_named_string(w, "method", "bdev_ocf_create");
759 
760 	spdk_json_write_named_object_begin(w, "params");
761 	spdk_json_write_named_string(w, "name", vbdev->name);
762 	spdk_json_write_named_string(w, "mode",
763 				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
764 	spdk_json_write_named_uint32(w, "cache_line_size",
765 				     ocf_cache_get_line_size(vbdev->ocf_cache));
766 	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
767 	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
768 	spdk_json_write_object_end(w);
769 
770 	spdk_json_write_object_end(w);
771 }
772 
773 /* Cache vbdev function table
774  * Used by bdev layer */
775 static struct spdk_bdev_fn_table cache_dev_fn_table = {
776 	.destruct = vbdev_ocf_destruct,
777 	.io_type_supported = vbdev_ocf_io_type_supported,
778 	.submit_request	= vbdev_ocf_submit_request,
779 	.get_io_channel	= vbdev_ocf_get_io_channel,
780 	.write_config_json = vbdev_ocf_write_json_config,
781 	.dump_info_json = vbdev_ocf_dump_info_json,
782 };
783 
784 /* Poller function for the OCF queue
785  * We execute OCF requests here synchronously */
786 static int
787 queue_poll(void *opaque)
788 {
789 	struct vbdev_ocf_qctx *qctx = opaque;
790 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
791 	int i, max = spdk_min(32, iono);
792 
793 	for (i = 0; i < max; i++) {
794 		ocf_queue_run_single(qctx->queue);
795 	}
796 
797 	if (iono > 0) {
798 		return SPDK_POLLER_BUSY;
799 	} else {
800 		return SPDK_POLLER_IDLE;
801 	}
802 }
803 
804 /* Called during ocf_submit_io, ocf_purge*
805  * and any other requests that need to submit io */
806 static void
807 vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
808 {
809 }
810 
811 /* OCF queue deinitialization
812  * Called at ocf_cache_stop */
813 static void
814 vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
815 {
816 	struct vbdev_ocf_qctx *qctx = ocf_queue_get_priv(q);
817 
818 	if (qctx) {
819 		spdk_put_io_channel(qctx->cache_ch);
820 		spdk_put_io_channel(qctx->core_ch);
821 		spdk_poller_unregister(&qctx->poller);
822 		if (qctx->allocated) {
823 			free(qctx);
824 		}
825 	}
826 }
827 
828 /* Queue ops is an interface for running the queue thread;
829  * the stop() operation is called just before the queue gets destroyed */
830 const struct ocf_queue_ops queue_ops = {
831 	.kick_sync = vbdev_ocf_ctx_queue_kick,
832 	.kick = vbdev_ocf_ctx_queue_kick,
833 	.stop = vbdev_ocf_ctx_queue_stop,
834 };
835 
836 /* Called on each thread that gets an io channel for the cache vbdev.
837  * We allocate an OCF queue and an SPDK poller for it here */
838 static int
839 io_device_create_cb(void *io_device, void *ctx_buf)
840 {
841 	struct vbdev_ocf *vbdev = io_device;
842 	struct vbdev_ocf_qctx *qctx = ctx_buf;
843 	int rc;
844 
845 	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
846 	if (rc) {
847 		return rc;
848 	}
849 
850 	ocf_queue_set_priv(qctx->queue, qctx);
851 
852 	qctx->vbdev      = vbdev;
853 	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
854 	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
855 	qctx->poller     = SPDK_POLLER_REGISTER(queue_poll, qctx, 0);
856 
857 	return rc;
858 }
859 
860 /* Called per thread
861  * Put OCF queue and relaunch poller with new context to finish pending requests */
862 static void
863 io_device_destroy_cb(void *io_device, void *ctx_buf)
864 {
865 	/* Make a copy of the context to use after the io channel is destroyed */
866 	struct vbdev_ocf_qctx *copy = malloc(sizeof(*copy));
867 	struct vbdev_ocf_qctx *qctx = ctx_buf;
868 
869 	if (copy) {
870 		ocf_queue_set_priv(qctx->queue, copy);
871 		memcpy(copy, qctx, sizeof(*copy));
872 		spdk_poller_unregister(&qctx->poller);
873 		copy->poller = SPDK_POLLER_REGISTER(queue_poll, copy, 0);
874 		copy->allocated = true;
875 	} else {
876 		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
877 			    spdk_strerror(ENOMEM));
878 	}
879 
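	/* Drop this channel's queue reference; once the last reference is gone,
	 * vbdev_ocf_ctx_queue_stop() unregisters the relaunched poller and frees the copy */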
880 	vbdev_ocf_queue_put(qctx->queue);
881 }
882 
883 /* OCF management queue deinitialization */
884 static void
885 vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
886 {
887 	struct spdk_poller *poller = ocf_queue_get_priv(q);
888 
889 	if (poller) {
890 		spdk_poller_unregister(&poller);
891 	}
892 }
893 
894 static int
895 mngt_queue_poll(void *opaque)
896 {
897 	ocf_queue_t q = opaque;
898 	uint32_t iono = ocf_queue_pending_io(q);
899 	int i, max = spdk_min(32, iono);
900 
901 	for (i = 0; i < max; i++) {
902 		ocf_queue_run_single(q);
903 	}
904 
905 	if (iono > 0) {
906 		return SPDK_POLLER_BUSY;
907 	} else {
908 		return SPDK_POLLER_IDLE;
909 	}
910 }
911 
912 static void
913 vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
914 {
915 }
916 
917 /* Queue ops is an interface for running the queue thread;
918  * the stop() operation is called just before the queue gets destroyed */
919 const struct ocf_queue_ops mngt_queue_ops = {
920 	.kick_sync = NULL,
921 	.kick = vbdev_ocf_ctx_mngt_queue_kick,
922 	.stop = vbdev_ocf_ctx_mngt_queue_stop,
923 };
924 
925 static void
926 vbdev_ocf_mngt_exit(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_fn *rollback_path, int rc)
927 {
928 	vbdev->state.starting = false;
929 	vbdev_ocf_mngt_stop(vbdev, rollback_path, rc);
930 }
931 
932 /* Create exported spdk object */
933 static void
934 finish_register(struct vbdev_ocf *vbdev)
935 {
936 	int result;
937 
938 	/* Copy properties of the base bdev */
939 	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
940 	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
941 	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;
942 
943 	vbdev->exp_bdev.name = vbdev->name;
944 	vbdev->exp_bdev.product_name = "SPDK OCF";
945 
946 	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
947 	vbdev->exp_bdev.ctxt = vbdev;
948 	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
949 	vbdev->exp_bdev.module = &ocf_if;
950 
951 	/* Finally register vbdev in SPDK */
952 	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
953 				sizeof(struct vbdev_ocf_qctx), vbdev->name);
954 	result = spdk_bdev_register(&vbdev->exp_bdev);
955 	if (result) {
956 		SPDK_ERRLOG("Could not register exposed bdev %s\n",
957 			    vbdev->name);
958 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, result);
959 		return;
960 	} else {
961 		vbdev->state.started = true;
962 	}
963 
964 	vbdev_ocf_mngt_continue(vbdev, result);
965 }
966 
967 static void
968 add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
969 {
970 	struct vbdev_ocf *vbdev = priv;
971 
972 	ocf_mngt_cache_unlock(cache);
973 
974 	if (error) {
975 		SPDK_ERRLOG("Error %d, failed to add core device to cache instance %s, "
976 			    "starting rollback\n", error, vbdev->name);
977 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
978 		return;
979 	} else {
980 		vbdev->ocf_core = core;
981 	}
982 
983 	vbdev_ocf_mngt_continue(vbdev, error);
984 }
985 
986 /* Try to lock cache, then add core */
987 static void
988 add_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
989 {
990 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
991 
992 	if (error) {
993 		SPDK_ERRLOG("Error %d, can not lock cache instance %s, "
994 			    "starting rollback\n", error, vbdev->name);
995 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
		return;
996 	}
997 	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
998 }
999 
1000 /* Add core for existing OCF cache instance */
1001 static void
1002 add_core(struct vbdev_ocf *vbdev)
1003 {
1004 	ocf_mngt_cache_lock(vbdev->ocf_cache, add_core_cache_lock_cmpl, vbdev);
1005 }
1006 
1007 static void
1008 start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
1009 {
1010 	struct vbdev_ocf *vbdev = priv;
1011 
1012 	ocf_mngt_cache_unlock(cache);
1013 
1014 	if (error) {
1015 		SPDK_ERRLOG("Error %d during start cache %s, starting rollback\n",
1016 			    error, vbdev->name);
1017 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
1018 		return;
1019 	}
1020 
1021 	vbdev_ocf_mngt_continue(vbdev, error);
1022 }
1023 
1024 static int
1025 create_management_queue(struct vbdev_ocf *vbdev)
1026 {
1027 	struct spdk_poller *mngt_poller;
1028 	int rc;
1029 
1030 	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
1031 	if (rc) {
1032 		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
1033 		return rc;
1034 	}
1035 
1036 	mngt_poller = SPDK_POLLER_REGISTER(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
1037 	if (mngt_poller == NULL) {
1038 		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
1039 		return -ENOMEM;
1040 	}
1041 
1042 	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
1043 	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);
1044 
1045 	return 0;
1046 }
1047 
1048 /* Start OCF cache, attach caching device */
1049 static void
1050 start_cache(struct vbdev_ocf *vbdev)
1051 {
1052 	ocf_cache_t existing;
1053 	int rc;
1054 
1055 	if (is_ocf_cache_running(vbdev)) {
1056 		vbdev_ocf_mngt_stop(vbdev, NULL, -EALREADY);
1057 		return;
1058 	}
1059 
1060 	existing = get_other_cache_instance(vbdev);
1061 	if (existing) {
1062 		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
1063 			       vbdev->name, vbdev->cache.name);
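		/* Reuse the already-running cache instance; take references on it and
		 * its context, which are dropped later in unregister_finish() */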
1064 		vbdev->ocf_cache = existing;
1065 		ocf_mngt_cache_get(vbdev->ocf_cache);
1066 		vbdev->cache_ctx = ocf_cache_get_priv(existing);
1067 		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
1068 		vbdev_ocf_mngt_continue(vbdev, 0);
1069 		return;
1070 	}
1071 
1072 	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
1073 	if (vbdev->cache_ctx == NULL) {
1074 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -ENOMEM);
1075 		return;
1076 	}
1077 
1078 	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
1079 	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);
1080 
1081 	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache);
1082 	if (rc) {
1083 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
1084 		return;
1085 	}
1086 	ocf_mngt_cache_get(vbdev->ocf_cache);
1087 
1088 	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);
1089 
1090 	rc = create_management_queue(vbdev);
1091 	if (rc) {
1092 		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
1093 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
1094 		return;
1095 	}
1096 
1097 	if (vbdev->cfg.loadq) {
1098 		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
1099 	} else {
1100 		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
1101 	}
1102 }
1103 
1104 /* Procedures called during register operation */
1105 vbdev_ocf_mngt_fn register_path[] = {
1106 	start_cache,
1107 	add_core,
1108 	finish_register,
1109 	NULL
1110 };
1111 
1112 /* Start cache instance and register OCF bdev */
1113 static void
1114 register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
1115 {
1116 	int rc;
1117 
1118 	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
1119 		cb(-EPERM, vbdev, cb_arg);
1120 		return;
1121 	}
1122 
1123 	vbdev->state.starting = true;
1124 	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
1125 	if (rc) {
1126 		cb(rc, vbdev, cb_arg);
1127 	}
1128 }
1129 
1130 /* Init OCF configuration options
1131  * for core and cache devices */
1132 static void
1133 init_vbdev_config(struct vbdev_ocf *vbdev)
1134 {
1135 	struct vbdev_ocf_config *cfg = &vbdev->cfg;
1136 
1137 	snprintf(cfg->cache.name, sizeof(cfg->cache.name), "%s", vbdev->name);
1138 	snprintf(cfg->core.name, sizeof(cfg->core.name), "%s", vbdev->core.name);
1139 
1140 	/* TODO [metadata]: make configurable with persistent
1141 	 * metadata support */
1142 	cfg->cache.metadata_volatile = false;
1143 
1144 	/* These are suggested values that
1145 	 * should be sufficient for most use cases */
1146 	cfg->cache.backfill.max_queue_size = 65536;
1147 	cfg->cache.backfill.queue_unblock_size = 60000;
1148 
1149 	cfg->device.force = true;
1150 	cfg->device.perform_test = false;
1151 	cfg->device.discard_on_start = false;
1152 
1153 	vbdev->cfg.cache.locked = true;
1154 
1155 	cfg->core.volume_type = SPDK_OBJECT;
1156 	cfg->device.volume_type = SPDK_OBJECT;
1157 
1158 	if (vbdev->cfg.loadq) {
1159 		/* When doing cache_load(), we need to set try_add to true,
1160 		 * otherwise OCF will interpret this core as new
1161 		 * instead of the inactive one */
1162 		vbdev->cfg.core.try_add = true;
1163 	}
1164 
1165 	/* Serialize bdev names in the OCF UUIDs so they can be interpreted on future loads.
1166 	 * The core UUID is a triple of (core name, vbdev name, cache name);
1167 	 * the cache UUID is the cache bdev name */
1168 	cfg->device.uuid.size = strlen(vbdev->cache.name) + 1;
1169 	cfg->device.uuid.data = vbdev->cache.name;
1170 
1171 	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s %s",
1172 		 vbdev->core.name, vbdev->name, vbdev->cache.name);
1173 	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
1174 	cfg->core.uuid.data = vbdev->uuid;
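	/* snprintf() above joined the three names with spaces; overwrite those spaces
	 * with NUL bytes so the core UUID holds three consecutive C strings */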
1175 	vbdev->uuid[strlen(vbdev->core.name)] = 0;
1176 	vbdev->uuid[strlen(vbdev->core.name) + 1 + strlen(vbdev->name)] = 0;
1177 }
1178 
1179 /* Allocate vbdev structure object and add it to the global list */
1180 static int
1181 init_vbdev(const char *vbdev_name,
1182 	   const char *cache_mode_name,
1183 	   const uint64_t cache_line_size,
1184 	   const char *cache_name,
1185 	   const char *core_name,
1186 	   bool loadq)
1187 {
1188 	struct vbdev_ocf *vbdev;
1189 	int rc = 0;
1190 
1191 	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
1192 		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
1193 		return -EPERM;
1194 	}
1195 
1196 	vbdev = calloc(1, sizeof(*vbdev));
1197 	if (!vbdev) {
1198 		goto error_mem;
1199 	}
1200 
1201 	vbdev->cache.parent = vbdev;
1202 	vbdev->core.parent = vbdev;
1203 	vbdev->cache.is_cache = true;
1204 	vbdev->core.is_cache = false;
1205 
1206 	if (cache_mode_name) {
1207 		vbdev->cfg.cache.cache_mode
1208 			= ocf_get_cache_mode(cache_mode_name);
1209 	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
1210 		SPDK_ERRLOG("No cache mode specified\n");
1211 		rc = -EINVAL;
1212 		goto error_free;
1213 	}
1214 	if (vbdev->cfg.cache.cache_mode < 0) {
1215 		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
1216 		rc = -EINVAL;
1217 		goto error_free;
1218 	}
1219 
1220 	ocf_cache_line_size_t set_cache_line_size = cache_line_size ?
1221 			(ocf_cache_line_size_t)cache_line_size * KiB :
1222 			ocf_cache_line_size_default;
1223 	if (set_cache_line_size == 0) {
1224 		SPDK_ERRLOG("Cache line size should be non-zero.\n");
1225 		rc = -EINVAL;
1226 		goto error_free;
1227 	}
1228 	vbdev->cfg.device.cache_line_size = set_cache_line_size;
1229 	vbdev->cfg.cache.cache_line_size = set_cache_line_size;
1230 
1231 	vbdev->name = strdup(vbdev_name);
1232 	if (!vbdev->name) {
1233 		goto error_mem;
1234 	}
1235 
1236 	vbdev->cache.name = strdup(cache_name);
1237 	if (!vbdev->cache.name) {
1238 		goto error_mem;
1239 	}
1240 
1241 	vbdev->core.name = strdup(core_name);
1242 	if (!vbdev->core.name) {
1243 		goto error_mem;
1244 	}
1245 
1246 	vbdev->cfg.loadq = loadq;
1247 	init_vbdev_config(vbdev);
1248 	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
1249 	return rc;
1250 
1251 error_mem:
1252 	rc = -ENOMEM;
1253 error_free:
1254 	free_vbdev(vbdev);
1255 	return rc;
1256 }
1257 
1258 /* Read the configuration file at the start of the SPDK application.
1259  * This adds vbdevs to the global list if any are mentioned in the config */
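/* An [OCF] section entry is expected to look roughly like this
 * (bdev names are illustrative):
 *   [OCF]
 *     OCF vbdev0 wt 4 Malloc0 Nvme0n1
 * fields: vbdev name, cache mode, cache line size in KiB, cache bdev, core bdev */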
1260 static int
1261 vbdev_ocf_init(void)
1262 {
1263 	const char *vbdev_name, *modename, *cache_line_size, *cache_name, *core_name;
1264 	struct spdk_conf_section *sp;
1265 	int status;
1266 	uint64_t cache_line_size_uint64;
1267 
1268 	status = vbdev_ocf_ctx_init();
1269 	if (status) {
1270 		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", status);
1271 		return status;
1272 	}
1273 
1274 	status = vbdev_ocf_volume_init();
1275 	if (status) {
1276 		vbdev_ocf_ctx_cleanup();
1277 		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", status);
1278 		return status;
1279 	}
1280 
1281 	sp = spdk_conf_find_section(NULL, "OCF");
1282 	if (sp == NULL) {
1283 		return 0;
1284 	}
1285 
1286 	for (int i = 0; ; i++) {
1287 		if (!spdk_conf_section_get_nval(sp, "OCF", i)) {
1288 			break;
1289 		}
1290 
1291 		vbdev_name = spdk_conf_section_get_nmval(sp, "OCF", i, 0);
1292 		if (!vbdev_name) {
1293 			SPDK_ERRLOG("No vbdev name specified\n");
1294 			continue;
1295 		}
1296 
1297 		modename = spdk_conf_section_get_nmval(sp, "OCF", i, 1);
1298 		if (!modename) {
1299 			SPDK_ERRLOG("No modename specified for OCF vbdev '%s'\n", vbdev_name);
1300 			continue;
1301 		}
1302 
1303 		cache_line_size = spdk_conf_section_get_nmval(sp, "OCF", i, 2);
1304 		if (!cache_line_size) {
1305 			SPDK_ERRLOG("No cache line size specified for OCF vbdev '%s'\n", vbdev_name);
1306 			continue;
1307 		}
1308 		cache_line_size_uint64 = strtoull(cache_line_size, NULL, 10);
1309 
1310 		cache_name = spdk_conf_section_get_nmval(sp, "OCF", i, 3);
1311 		if (!cache_name) {
1312 			SPDK_ERRLOG("No cache device specified for OCF vbdev '%s'\n", vbdev_name);
1313 			continue;
1314 		}
1315 
1316 		core_name = spdk_conf_section_get_nmval(sp, "OCF", i, 4);
1317 		if (!core_name) {
1318 			SPDK_ERRLOG("No core devices specified for OCF vbdev '%s'\n", vbdev_name);
1319 			continue;
1320 		}
1321 
1322 		status = init_vbdev(vbdev_name, modename, cache_line_size_uint64, cache_name, core_name, false);
1323 		if (status) {
1324 			SPDK_ERRLOG("Config initialization failed with code: %d\n", status);
1325 		}
1326 	}
1327 
1328 	return status;
1329 }
1330 
1331 /* Called after application shutdown has started.
1332  * Release memory of allocated structures here */
1333 static void
1334 vbdev_ocf_module_fini(void)
1335 {
1336 	struct vbdev_ocf *vbdev;
1337 
1338 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1339 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1340 		free_vbdev(vbdev);
1341 	}
1342 
1343 	vbdev_ocf_volume_cleanup();
1344 	vbdev_ocf_ctx_cleanup();
1345 }
1346 
1347 /* Called when a base device gets unplugged.
1348  * We unregister the cache vbdev here.
1349  * When a cache device is removed, we delete every OCF bdev that used it */
1350 static void
1351 hotremove_cb(void *ctx)
1352 {
1353 	struct vbdev_ocf_base *base = ctx;
1354 	struct vbdev_ocf *vbdev;
1355 
1356 	if (!base->is_cache) {
1357 		if (base->parent->state.doing_finish) {
1358 			return;
1359 		}
1360 
1361 		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
1362 			       base->parent->name, base->name);
1363 		vbdev_ocf_delete(base->parent, NULL, NULL);
1364 		return;
1365 	}
1366 
1367 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1368 		if (vbdev->state.doing_finish) {
1369 			continue;
1370 		}
1371 		if (strcmp(base->name, vbdev->cache.name) == 0) {
1372 			SPDK_NOTICELOG("Deinitializing '%s' because"
1373 				       " its cache device '%s' was removed\n",
1374 				       vbdev->name, base->name);
1375 			vbdev_ocf_delete(vbdev, NULL, NULL);
1376 		}
1377 	}
1378 }
1379 
1380 /* Open base SPDK bdev and claim it */
1381 static int
1382 attach_base(struct vbdev_ocf_base *base)
1383 {
1384 	int status;
1385 
1386 	if (base->attached) {
1387 		return -EALREADY;
1388 	}
1389 
1390 	/* If the base cache bdev was already opened by another vbdev,
1391 	 * we just copy its descriptor here */
1392 	if (base->is_cache) {
1393 		struct vbdev_ocf_base *existing = get_other_cache_base(base);
1394 		if (existing) {
1395 			base->desc = existing->desc;
1396 			base->management_channel = existing->management_channel;
1397 			base->attached = true;
1398 			return 0;
1399 		}
1400 	}
1401 
1402 	status = spdk_bdev_open(base->bdev, true, hotremove_cb, base, &base->desc);
1403 	if (status) {
1404 		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
1405 		return status;
1406 	}
1407 
1408 	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
1409 					     &ocf_if);
1410 	if (status) {
1411 		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
1412 		spdk_bdev_close(base->desc);
1413 		return status;
1414 	}
1415 
1416 	base->management_channel = spdk_bdev_get_io_channel(base->desc);
1417 	if (!base->management_channel) {
1418 		SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
1419 		spdk_bdev_module_release_bdev(base->bdev);
1420 		spdk_bdev_close(base->desc);
1421 		return -ENOMEM;
1422 	}
1423 
1424 	/* Save the thread where the base device is opened */
1425 	base->thread = spdk_get_thread();
1426 
1427 	base->attached = true;
1428 	return status;
1429 }
1430 
1431 /* Attach base bdevs */
1432 static int
1433 attach_base_bdevs(struct vbdev_ocf *vbdev,
1434 		  struct spdk_bdev *cache_bdev,
1435 		  struct spdk_bdev *core_bdev)
1436 {
1437 	int rc = 0;
1438 
1439 	if (cache_bdev) {
1440 		vbdev->cache.bdev = cache_bdev;
1441 		rc |= attach_base(&vbdev->cache);
1442 	}
1443 
1444 	if (core_bdev) {
1445 		vbdev->core.bdev = core_bdev;
1446 		rc |= attach_base(&vbdev->core);
1447 	}
1448 
1449 	return rc;
1450 }
1451 
1452 /* Init and then start vbdev if all base devices are present */
1453 void
1454 vbdev_ocf_construct(const char *vbdev_name,
1455 		    const char *cache_mode_name,
1456 		    const uint64_t cache_line_size,
1457 		    const char *cache_name,
1458 		    const char *core_name,
1459 		    bool loadq,
1460 		    void (*cb)(int, struct vbdev_ocf *, void *),
1461 		    void *cb_arg)
1462 {
1463 	int rc;
1464 	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
1465 	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
1466 	struct vbdev_ocf *vbdev;
1467 
1468 	rc = init_vbdev(vbdev_name, cache_mode_name, cache_line_size, cache_name, core_name, loadq);
1469 	if (rc) {
1470 		cb(rc, NULL, cb_arg);
1471 		return;
1472 	}
1473 
1474 	vbdev = vbdev_ocf_get_by_name(vbdev_name);
1475 	if (vbdev == NULL) {
1476 		cb(-ENODEV, NULL, cb_arg);
1477 		return;
1478 	}
1479 
1480 	if (cache_bdev == NULL) {
1481 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
1482 			       vbdev->name, cache_name);
1483 	}
1484 	if (core_bdev == NULL) {
1485 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
1486 			       vbdev->name, core_name);
1487 	}
1488 
1489 	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
1490 	if (rc) {
1491 		cb(rc, vbdev, cb_arg);
1492 		return;
1493 	}
1494 
1495 	if (core_bdev && cache_bdev) {
1496 		register_vbdev(vbdev, cb, cb_arg);
1497 	} else {
1498 		cb(0, vbdev, cb_arg);
1499 	}
1500 }
1501 
1502 /* This is called when a new device is registered in the SPDK application.
1503  * If that device is named as one of the base bdevs of an OCF vbdev,
1504  * claim and open it */
1505 static void
1506 vbdev_ocf_examine(struct spdk_bdev *bdev)
1507 {
1508 	const char *bdev_name = spdk_bdev_get_name(bdev);
1509 	struct vbdev_ocf *vbdev;
1510 
1511 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1512 		if (vbdev->state.doing_finish) {
1513 			continue;
1514 		}
1515 
1516 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1517 			attach_base_bdevs(vbdev, bdev, NULL);
1518 			continue;
1519 		}
1520 		if (!strcmp(bdev_name, vbdev->core.name)) {
1521 			attach_base_bdevs(vbdev, NULL, bdev);
1522 			break;
1523 		}
1524 	}
1525 	spdk_bdev_module_examine_done(&ocf_if);
1526 }
1527 
1528 struct metadata_probe_ctx {
1529 	struct vbdev_ocf_base base;
1530 	ocf_volume_t volume;
1531 
1532 	struct ocf_volume_uuid *core_uuids;
1533 	unsigned int uuid_count;
1534 
1535 	int result;
1536 	int refcnt;
1537 };
1538 
1539 static void
1540 _examine_ctx_put(void *ctx)
1541 {
1542 	struct spdk_bdev_desc *desc = ctx;
1543 
1544 	spdk_bdev_close(desc);
1545 }
1546 
1547 static void
1548 examine_ctx_put(struct metadata_probe_ctx *ctx)
1549 {
1550 	unsigned int i;
1551 
1552 	ctx->refcnt--;
1553 	if (ctx->refcnt > 0) {
1554 		return;
1555 	}
1556 
1557 	if (ctx->result) {
1558 		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
1559 			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
1560 	}
1561 
1562 	if (ctx->base.desc) {
1563 		/* Close the underlying bdev on the same thread on which it was opened. */
1564 		if (ctx->base.thread && ctx->base.thread != spdk_get_thread()) {
1565 			spdk_thread_send_msg(ctx->base.thread, _examine_ctx_put, ctx->base.desc);
1566 		} else {
1567 			spdk_bdev_close(ctx->base.desc);
1568 		}
1569 	}
1570 
1571 	if (ctx->volume) {
1572 		ocf_volume_destroy(ctx->volume);
1573 	}
1574 
1575 	if (ctx->core_uuids) {
1576 		for (i = 0; i < ctx->uuid_count; i++) {
1577 			free(ctx->core_uuids[i].data);
1578 		}
1579 	}
1580 	free(ctx->core_uuids);
1581 
1582 	examine_done(ctx->result, NULL, ctx->base.bdev);
1583 	free(ctx);
1584 }
1585 
1586 static void
1587 metadata_probe_construct_cb(int rc, struct vbdev_ocf *vbdev, void *vctx)
1588 {
1589 	struct metadata_probe_ctx *ctx = vctx;
1590 
1591 	examine_ctx_put(ctx);
1592 }
1593 
1594 /* This is the second callback for ocf_metadata_probe_cores().
1595  * Here we create vbdev configurations based on the UUIDs */
1596 static void
1597 metadata_probe_cores_construct(void *priv, int error, unsigned int num_cores)
1598 {
1599 	struct metadata_probe_ctx *ctx = priv;
1600 	const char *vbdev_name;
1601 	const char *core_name;
1602 	const char *cache_name;
1603 	unsigned int i;
1604 
1605 	if (error) {
1606 		ctx->result = error;
1607 		examine_ctx_put(ctx);
1608 		return;
1609 	}
1610 
1611 	for (i = 0; i < num_cores; i++) {
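		/* Each core UUID was packed by init_vbdev_config() as three consecutive
		 * NUL-terminated strings: core name, vbdev name, cache name */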
1612 		core_name = ocf_uuid_to_str(&ctx->core_uuids[i]);
1613 		vbdev_name = core_name + strlen(core_name) + 1;
1614 		cache_name = vbdev_name + strlen(vbdev_name) + 1;
1615 
1616 		if (strcmp(ctx->base.bdev->name, cache_name)) {
1617 			SPDK_NOTICELOG("OCF metadata found on %s belongs to bdev named '%s'\n",
1618 				       ctx->base.bdev->name, cache_name);
1619 		}
1620 
1621 		ctx->refcnt++;
1622 		vbdev_ocf_construct(vbdev_name, NULL, 0, cache_name, core_name, true,
1623 				    metadata_probe_construct_cb, ctx);
1624 	}
1625 
1626 	examine_ctx_put(ctx);
1627 }
1628 
1629 /* This callback is called after OCF reads the core UUIDs from the cache metadata.
1630  * Here we allocate memory for those UUIDs and call ocf_metadata_probe_cores() again */
1631 static void
1632 metadata_probe_cores_get_num(void *priv, int error, unsigned int num_cores)
1633 {
1634 	struct metadata_probe_ctx *ctx = priv;
1635 	unsigned int i;
1636 
1637 	if (error) {
1638 		ctx->result = error;
1639 		examine_ctx_put(ctx);
1640 		return;
1641 	}
1642 
1643 	ctx->uuid_count = num_cores;
1644 	ctx->core_uuids = calloc(num_cores, sizeof(struct ocf_volume_uuid));
1645 	if (!ctx->core_uuids) {
1646 		ctx->result = -ENOMEM;
1647 		examine_ctx_put(ctx);
1648 		return;
1649 	}
1650 
1651 	for (i = 0; i < ctx->uuid_count; i++) {
1652 		ctx->core_uuids[i].size = OCF_VOLUME_UUID_MAX_SIZE;
1653 		ctx->core_uuids[i].data = malloc(OCF_VOLUME_UUID_MAX_SIZE);
1654 		if (!ctx->core_uuids[i].data) {
1655 			ctx->result = -ENOMEM;
1656 			examine_ctx_put(ctx);
1657 			return;
1658 		}
1659 	}
1660 
1661 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, ctx->core_uuids, ctx->uuid_count,
1662 				 metadata_probe_cores_construct, ctx);
1663 }
1664 
1665 static void
1666 metadata_probe_cb(void *priv, int rc,
1667 		  struct ocf_metadata_probe_status *status)
1668 {
1669 	struct metadata_probe_ctx *ctx = priv;
1670 
1671 	if (rc) {
1672 		/* -OCF_ERR_NO_METADATA means the device does not have cache metadata on it */
1673 		if (rc != -OCF_ERR_NO_METADATA) {
1674 			ctx->result = rc;
1675 		}
1676 		examine_ctx_put(ctx);
1677 		return;
1678 	}
1679 
1680 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, NULL, 0,
1681 				 metadata_probe_cores_get_num, ctx);
1682 }
1683 
1684 /* This is called after vbdev_ocf_examine.
1685  * It allows application initialization to be delayed
1686  * until all OCF bdevs get registered.
1687  * If the vbdev has all of its base devices, it starts asynchronously here.
1688  * We first check whether the bdev appears in the configuration;
1689  * if not, we do metadata_probe() to create its configuration from the bdev metadata */
1690 static void
1691 vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
1692 {
1693 	const char *bdev_name = spdk_bdev_get_name(bdev);
1694 	struct vbdev_ocf *vbdev;
1695 	struct metadata_probe_ctx *ctx;
1696 	bool created_from_config = false;
1697 	int rc;
1698 
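	/* Take an initial entry on the claimed list; every exit path below
	 * balances it with a final examine_done() call for this bdev */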
1699 	examine_start(bdev);
1700 
1701 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1702 		if (vbdev->state.doing_finish || vbdev->state.started) {
1703 			continue;
1704 		}
1705 
1706 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1707 			examine_start(bdev);
1708 			register_vbdev(vbdev, examine_done, bdev);
1709 			created_from_config = true;
1710 			continue;
1711 		}
1712 		if (!strcmp(bdev_name, vbdev->core.name)) {
1713 			examine_start(bdev);
1714 			register_vbdev(vbdev, examine_done, bdev);
1715 			examine_done(0, NULL, bdev);
1716 			return;
1717 		}
1718 	}
1719 
1720 	/* If the device was discovered via the config, we do not check for metadata */
1721 	if (created_from_config) {
1722 		examine_done(0, NULL, bdev);
1723 		return;
1724 	}
1725 
1726 	/* Metadata probe path.
1727 	 * We create a temporary OCF volume and a temporary base structure
1728 	 * to use for ocf_metadata_probe() and for bottom adapter IOs.
1729 	 * Then we get the UUIDs of the core devices and create configurations based on them */
1730 	ctx = calloc(1, sizeof(*ctx));
1731 	if (!ctx) {
1732 		examine_done(-ENOMEM, NULL, bdev);
1733 		return;
1734 	}
1735 
1736 	ctx->base.bdev = bdev;
1737 	ctx->refcnt = 1;
1738 
1739 	rc = spdk_bdev_open(ctx->base.bdev, true, NULL, NULL, &ctx->base.desc);
1740 	if (rc) {
1741 		ctx->result = rc;
1742 		examine_ctx_put(ctx);
1743 		return;
1744 	}
1745 
1746 	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
1747 	if (rc) {
1748 		ctx->result = rc;
1749 		examine_ctx_put(ctx);
1750 		return;
1751 	}
1752 
1753 	rc = ocf_volume_open(ctx->volume, &ctx->base);
1754 	if (rc) {
1755 		ctx->result = rc;
1756 		examine_ctx_put(ctx);
1757 		return;
1758 	}
1759 
1760 	/* Save the thread where the base device is opened */
1761 	ctx->base.thread = spdk_get_thread();
1762 
1763 	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
1764 }
1765 
1766 static int
1767 vbdev_ocf_get_ctx_size(void)
1768 {
1769 	return sizeof(struct bdev_ocf_data);
1770 }
1771 
1772 static void
1773 fini_start(void)
1774 {
1775 	g_fini_started = true;
1776 }
1777 
1778 /* Module-global function table
1779  * Does not relate to vbdev instances */
1780 static struct spdk_bdev_module ocf_if = {
1781 	.name = "ocf",
1782 	.module_init = vbdev_ocf_init,
1783 	.fini_start = fini_start,
1784 	.module_fini = vbdev_ocf_module_fini,
1785 	.config_text = NULL,
1786 	.get_ctx_size = vbdev_ocf_get_ctx_size,
1787 	.examine_config = vbdev_ocf_examine,
1788 	.examine_disk   = vbdev_ocf_examine_disk,
1789 };
1790 SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);
1791 
1792 SPDK_LOG_REGISTER_COMPONENT("vbdev_ocf", SPDK_TRACE_VBDEV_OCF)
1793