xref: /spdk/module/bdev/ocf/vbdev_ocf.c (revision 1f4f4cc75a522f897856e980a0b35d3c8fac24ed)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <ocf/ocf.h>
35 #include <ocf/ocf_types.h>
36 #include <ocf/ocf_mngt.h>
37 
38 #include "ctx.h"
39 #include "data.h"
40 #include "volume.h"
41 #include "utils.h"
42 #include "vbdev_ocf.h"
43 
44 #include "spdk/bdev_module.h"
45 #include "spdk/conf.h"
46 #include "spdk/thread.h"
47 #include "spdk/string.h"
48 #include "spdk_internal/log.h"
49 #include "spdk/cpuset.h"
50 
/* Forward declaration of this module's spdk_bdev_module object;
 * it is referenced below before its definition is visible */
static struct spdk_bdev_module ocf_if;

/* List of vbdev_ocf objects walked by the lookup helpers in this file
 * (vbdev_ocf_get_by_name, vbdev_ocf_get_base_by_name, vbdev_ocf_foreach, ...) */
static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);

/* List of entries appended by examine_start() and removed by examine_done() */
static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);

/* Read by stop_vbdev(): while false, a cache instance that is still
 * referenced by another vbdev is not stopped */
bool g_fini_started = false;
60 
/* Structure for keeping list of bdevs that are claimed but not used yet */
struct examining_bdev {
	/* bdev this entry was created for (compared by pointer in examine_done) */
	struct spdk_bdev           *bdev;
	/* Linkage on g_ocf_examining_bdevs_head */
	TAILQ_ENTRY(examining_bdev) tailq;
};
66 
67 /* Add bdev to list of claimed */
68 static void
69 examine_start(struct spdk_bdev *bdev)
70 {
71 	struct examining_bdev *entry = malloc(sizeof(*entry));
72 
73 	assert(entry);
74 	entry->bdev = bdev;
75 	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
76 }
77 
78 /* Find bdev on list of claimed bdevs, then remove it,
79  * if it was the last one on list then report examine done */
80 static void
81 examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
82 {
83 	struct spdk_bdev *bdev = cb_arg;
84 	struct examining_bdev *entry, *safe, *found = NULL;
85 
86 	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
87 		if (entry->bdev == bdev) {
88 			if (found) {
89 				goto remove;
90 			} else {
91 				found = entry;
92 			}
93 		}
94 	}
95 
96 	assert(found);
97 	spdk_bdev_module_examine_done(&ocf_if);
98 
99 remove:
100 	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
101 	free(found);
102 }
103 
104 /* Free allocated strings and structure itself
105  * Used at shutdown only */
106 static void
107 free_vbdev(struct vbdev_ocf *vbdev)
108 {
109 	if (!vbdev) {
110 		return;
111 	}
112 
113 	free(vbdev->name);
114 	free(vbdev->cache.name);
115 	free(vbdev->core.name);
116 	free(vbdev);
117 }
118 
119 /* Get existing cache base
120  * that is attached to other vbdev */
121 static struct vbdev_ocf_base *
122 get_other_cache_base(struct vbdev_ocf_base *base)
123 {
124 	struct vbdev_ocf *vbdev;
125 
126 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
127 		if (&vbdev->cache == base || !vbdev->cache.attached) {
128 			continue;
129 		}
130 		if (!strcmp(vbdev->cache.name, base->name)) {
131 			return &vbdev->cache;
132 		}
133 	}
134 
135 	return NULL;
136 }
137 
138 /* Get existing OCF cache instance
139  * that is started by other vbdev */
140 static ocf_cache_t
141 get_other_cache_instance(struct vbdev_ocf *vbdev)
142 {
143 	struct vbdev_ocf *cmp;
144 
145 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
146 		if (cmp->state.doing_finish || cmp == vbdev) {
147 			continue;
148 		}
149 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
150 			continue;
151 		}
152 		if (cmp->ocf_cache) {
153 			return cmp->ocf_cache;
154 		}
155 	}
156 
157 	return NULL;
158 }
159 
/* Message handler used by remove_base_bdev(): ctx is the bdev descriptor
 * to close */
static void
_remove_base_bdev(void *ctx)
{
	spdk_bdev_close(ctx);
}
167 
168 /* Close and unclaim base bdev */
169 static void
170 remove_base_bdev(struct vbdev_ocf_base *base)
171 {
172 	if (base->attached) {
173 		if (base->management_channel) {
174 			spdk_put_io_channel(base->management_channel);
175 		}
176 
177 		spdk_bdev_module_release_bdev(base->bdev);
178 		/* Close the underlying bdev on its same opened thread. */
179 		if (base->thread && base->thread != spdk_get_thread()) {
180 			spdk_thread_send_msg(base->thread, _remove_base_bdev, base->desc);
181 		} else {
182 			spdk_bdev_close(base->desc);
183 		}
184 		base->attached = false;
185 	}
186 }
187 
188 /* Finish unregister operation */
189 static void
190 unregister_finish(struct vbdev_ocf *vbdev)
191 {
192 	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);
193 	vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
194 	vbdev_ocf_mngt_continue(vbdev, 0);
195 }
196 
197 static void
198 close_core_bdev(struct vbdev_ocf *vbdev)
199 {
200 	remove_base_bdev(&vbdev->core);
201 	vbdev_ocf_mngt_continue(vbdev, 0);
202 }
203 
204 static void
205 remove_core_cmpl(void *priv, int error)
206 {
207 	struct vbdev_ocf *vbdev = priv;
208 
209 	ocf_mngt_cache_unlock(vbdev->ocf_cache);
210 	vbdev_ocf_mngt_continue(vbdev, error);
211 }
212 
213 /* Try to lock cache, then remove core */
214 static void
215 remove_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
216 {
217 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
218 
219 	if (error) {
220 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
221 			    error, vbdev->name);
222 		vbdev_ocf_mngt_continue(vbdev, error);
223 		return;
224 	}
225 
226 	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
227 }
228 
229 /* Detach core base */
230 static void
231 detach_core(struct vbdev_ocf *vbdev)
232 {
233 	if (vbdev->ocf_cache && ocf_cache_is_running(vbdev->ocf_cache)) {
234 		ocf_mngt_cache_lock(vbdev->ocf_cache, remove_core_cache_lock_cmpl, vbdev);
235 	} else {
236 		vbdev_ocf_mngt_continue(vbdev, 0);
237 	}
238 }
239 
240 static void
241 close_cache_bdev(struct vbdev_ocf *vbdev)
242 {
243 	remove_base_bdev(&vbdev->cache);
244 	vbdev_ocf_mngt_continue(vbdev, 0);
245 }
246 
247 /* Detach cache base */
248 static void
249 detach_cache(struct vbdev_ocf *vbdev)
250 {
251 	vbdev->state.stop_status = vbdev->mngt_ctx.status;
252 
253 	/* If some other vbdev references this cache bdev,
254 	 * we detach this only by changing the flag, without actual close */
255 	if (get_other_cache_base(&vbdev->cache)) {
256 		vbdev->cache.attached = false;
257 	}
258 
259 	vbdev_ocf_mngt_continue(vbdev, 0);
260 }
261 
262 static void
263 stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
264 {
265 	struct vbdev_ocf *vbdev = priv;
266 
267 	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
268 	ocf_mngt_cache_unlock(cache);
269 
270 	vbdev_ocf_mngt_continue(vbdev, error);
271 }
272 
273 /* Try to lock cache, then stop it */
274 static void
275 stop_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
276 {
277 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
278 
279 	if (error) {
280 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
281 			    error, vbdev->name);
282 		vbdev_ocf_mngt_continue(vbdev, error);
283 		return;
284 	}
285 
286 	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
287 }
288 
289 /* Stop OCF cache object
290  * vbdev_ocf is not operational after this */
291 static void
292 stop_vbdev(struct vbdev_ocf *vbdev)
293 {
294 	if (!ocf_cache_is_running(vbdev->ocf_cache)) {
295 		vbdev_ocf_mngt_continue(vbdev, 0);
296 		return;
297 	}
298 
299 	if (!g_fini_started && get_other_cache_instance(vbdev)) {
300 		SPDK_NOTICELOG("Not stopping cache instance '%s'"
301 			       " because it is referenced by other OCF bdev\n",
302 			       vbdev->cache.name);
303 		vbdev_ocf_mngt_continue(vbdev, 0);
304 		return;
305 	}
306 
307 	ocf_mngt_cache_lock(vbdev->ocf_cache, stop_vbdev_cache_lock_cmpl, vbdev);
308 }
309 
310 static void
311 flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
312 {
313 	struct vbdev_ocf *vbdev = priv;
314 
315 	ocf_mngt_cache_unlock(cache);
316 	vbdev_ocf_mngt_continue(vbdev, error);
317 }
318 
319 static void
320 flush_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
321 {
322 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
323 
324 	if (error) {
325 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
326 			    error, vbdev->name);
327 		vbdev_ocf_mngt_continue(vbdev, error);
328 		return;
329 	}
330 
331 	ocf_mngt_cache_flush(vbdev->ocf_cache, flush_vbdev_cmpl, vbdev);
332 }
333 
334 static void
335 flush_vbdev(struct vbdev_ocf *vbdev)
336 {
337 	if (!ocf_cache_is_running(vbdev->ocf_cache)) {
338 		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
339 		return;
340 	}
341 
342 	ocf_mngt_cache_lock(vbdev->ocf_cache, flush_vbdev_cache_lock_cmpl, vbdev);
343 }
344 
/* Procedures called during dirty unregister.
 * NULL-terminated list of steps: the cache is flushed, stopped and its
 * bdev closed before the core is detached and closed.
 * Also passed to vbdev_ocf_mngt_stop() as the rollback path of the
 * register procedures in this file. */
vbdev_ocf_mngt_fn unregister_path_dirty[] = {
	flush_vbdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	detach_core,
	close_core_bdev,
	unregister_finish,
	NULL
};
356 
/* Procedures called during clean unregister.
 * NULL-terminated list of steps: after the flush, the core is detached and
 * closed first, and only then the cache is stopped, detached and closed. */
vbdev_ocf_mngt_fn unregister_path_clean[] = {
	flush_vbdev,
	detach_core,
	close_core_bdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	unregister_finish,
	NULL
};
368 
369 /* Start asynchronous management operation using unregister_path */
370 static void
371 unregister_cb(void *opaque)
372 {
373 	struct vbdev_ocf *vbdev = opaque;
374 	vbdev_ocf_mngt_fn *unregister_path;
375 	int rc;
376 
377 	unregister_path = vbdev->state.doing_clean_delete ?
378 			  unregister_path_clean : unregister_path_dirty;
379 
380 	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
381 	if (rc) {
382 		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
383 		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
384 	}
385 }
386 
/* Clean remove case - remove core and then cache, this order
 * will remove instance permanently.
 * Called from vbdev_ocf_destruct() for a vbdev that was never started;
 * each base is handled only if it is currently attached. */
static void
_vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
{
	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}

	/* Checked after the core was handled, not up front */
	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}
}
402 
/* Dirty shutdown/hot remove case - remove cache and then core, this order
 * will allow us to recover this instance in the future.
 * Called from vbdev_ocf_destruct() for a vbdev that was never started;
 * each base is handled only if it is currently attached. */
static void
_vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
{
	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}

	/* Checked after the cache was handled, not up front */
	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}
}
418 
419 /* Unregister io device with callback to unregister_cb
420  * This function is called during spdk_bdev_unregister */
421 static int
422 vbdev_ocf_destruct(void *opaque)
423 {
424 	struct vbdev_ocf *vbdev = opaque;
425 
426 	if (vbdev->state.doing_finish) {
427 		return -EALREADY;
428 	}
429 
430 	if (vbdev->state.starting && !vbdev->state.started) {
431 		/* Prevent before detach cache/core during register path of
432 		  this bdev */
433 		return -EBUSY;
434 	}
435 
436 	vbdev->state.doing_finish = true;
437 
438 	if (vbdev->state.started) {
439 		spdk_io_device_unregister(vbdev, unregister_cb);
440 		/* Return 1 because unregister is delayed */
441 		return 1;
442 	}
443 
444 	if (vbdev->state.doing_clean_delete) {
445 		_vbdev_ocf_destruct_clean(vbdev);
446 	} else {
447 		_vbdev_ocf_destruct_dirty(vbdev);
448 	}
449 
450 	return 0;
451 }
452 
453 /* Stop OCF cache and unregister SPDK bdev */
454 int
455 vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
456 {
457 	int rc = 0;
458 
459 	if (vbdev->state.started) {
460 		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
461 	} else {
462 		rc = vbdev_ocf_destruct(vbdev);
463 		if (rc == 0 && cb) {
464 			cb(cb_arg, 0);
465 		}
466 	}
467 
468 	return rc;
469 }
470 
471 /* Remove cores permanently and then stop OCF cache and unregister SPDK bdev */
472 int
473 vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
474 		       void *cb_arg)
475 {
476 	vbdev->state.doing_clean_delete = true;
477 
478 	return vbdev_ocf_delete(vbdev, cb, cb_arg);
479 }
480 
481 
482 /* If vbdev is online, return its object */
483 struct vbdev_ocf *
484 vbdev_ocf_get_by_name(const char *name)
485 {
486 	struct vbdev_ocf *vbdev;
487 
488 	if (name == NULL) {
489 		assert(false);
490 		return NULL;
491 	}
492 
493 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
494 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
495 			continue;
496 		}
497 		if (strcmp(vbdev->name, name) == 0) {
498 			return vbdev;
499 		}
500 	}
501 	return NULL;
502 }
503 
504 /* Return matching base if parent vbdev is online */
505 struct vbdev_ocf_base *
506 vbdev_ocf_get_base_by_name(const char *name)
507 {
508 	struct vbdev_ocf *vbdev;
509 
510 	if (name == NULL) {
511 		assert(false);
512 		return NULL;
513 	}
514 
515 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
516 		if (vbdev->state.doing_finish) {
517 			continue;
518 		}
519 
520 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
521 			return &vbdev->cache;
522 		}
523 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
524 			return &vbdev->core;
525 		}
526 	}
527 	return NULL;
528 }
529 
530 /* Execute fn for each OCF device that is online or waits for base devices */
531 void
532 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
533 {
534 	struct vbdev_ocf *vbdev;
535 
536 	assert(fn != NULL);
537 
538 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
539 		if (!vbdev->state.doing_finish) {
540 			fn(vbdev, ctx);
541 		}
542 	}
543 }
544 
545 /* Called from OCF when SPDK_IO is completed */
546 static void
547 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
548 {
549 	struct spdk_bdev_io *bdev_io = io->priv1;
550 
551 	if (error == 0) {
552 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
553 	} else if (error == -ENOMEM) {
554 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
555 	} else {
556 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
557 	}
558 
559 	ocf_io_put(io);
560 }
561 
562 /* Configure io parameters and send it to OCF */
563 static int
564 io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
565 {
566 	switch (bdev_io->type) {
567 	case SPDK_BDEV_IO_TYPE_WRITE:
568 	case SPDK_BDEV_IO_TYPE_READ:
569 		ocf_core_submit_io(io);
570 		return 0;
571 	case SPDK_BDEV_IO_TYPE_FLUSH:
572 		ocf_core_submit_flush(io);
573 		return 0;
574 	case SPDK_BDEV_IO_TYPE_UNMAP:
575 		ocf_core_submit_discard(io);
576 		return 0;
577 	case SPDK_BDEV_IO_TYPE_RESET:
578 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
579 	default:
580 		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
581 		return -EINVAL;
582 	}
583 }
584 
585 /* Submit SPDK-IO to OCF */
586 static void
587 io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
588 {
589 	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
590 	struct ocf_io *io = NULL;
591 	struct bdev_ocf_data *data = NULL;
592 	struct vbdev_ocf_qctx *qctx = spdk_io_channel_get_ctx(ch);
593 	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
594 	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
595 	int dir, flags = 0;
596 	int err;
597 
598 	switch (bdev_io->type) {
599 	case SPDK_BDEV_IO_TYPE_READ:
600 		dir = OCF_READ;
601 		break;
602 	case SPDK_BDEV_IO_TYPE_WRITE:
603 		dir = OCF_WRITE;
604 		break;
605 	case SPDK_BDEV_IO_TYPE_FLUSH:
606 		dir = OCF_WRITE;
607 		break;
608 	case SPDK_BDEV_IO_TYPE_UNMAP:
609 		dir = OCF_WRITE;
610 		break;
611 	default:
612 		err = -EINVAL;
613 		goto fail;
614 	}
615 
616 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
617 		flags = OCF_WRITE_FLUSH;
618 	}
619 
620 	io = ocf_core_new_io(vbdev->ocf_core, qctx->queue, offset, len, dir, 0, flags);
621 	if (!io) {
622 		err = -ENOMEM;
623 		goto fail;
624 	}
625 
626 	data = vbdev_ocf_data_from_spdk_io(bdev_io);
627 	if (!data) {
628 		err = -ENOMEM;
629 		goto fail;
630 	}
631 
632 	err = ocf_io_set_data(io, data, 0);
633 	if (err) {
634 		goto fail;
635 	}
636 
637 	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);
638 
639 	err = io_submit_to_ocf(bdev_io, io);
640 	if (err) {
641 		goto fail;
642 	}
643 
644 	return;
645 
646 fail:
647 	if (io) {
648 		ocf_io_put(io);
649 	}
650 
651 	if (err == -ENOMEM) {
652 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
653 	} else {
654 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
655 	}
656 }
657 
658 static void
659 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
660 		     bool success)
661 {
662 	if (!success) {
663 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
664 		return;
665 	}
666 
667 	io_handle(ch, bdev_io);
668 }
669 
670 /* Called from bdev layer when an io to Cache vbdev is submitted */
671 static void
672 vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
673 {
674 	switch (bdev_io->type) {
675 	case SPDK_BDEV_IO_TYPE_READ:
676 		/* User does not have to allocate io vectors for the request,
677 		 * so in case they are not allocated, we allocate them here */
678 		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
679 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
680 		break;
681 	case SPDK_BDEV_IO_TYPE_WRITE:
682 	case SPDK_BDEV_IO_TYPE_FLUSH:
683 	case SPDK_BDEV_IO_TYPE_UNMAP:
684 		io_handle(ch, bdev_io);
685 		break;
686 	case SPDK_BDEV_IO_TYPE_RESET:
687 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
688 	default:
689 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
690 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
691 		break;
692 	}
693 }
694 
695 /* Called from bdev layer */
696 static bool
697 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
698 {
699 	struct vbdev_ocf *vbdev = opaque;
700 
701 	switch (io_type) {
702 	case SPDK_BDEV_IO_TYPE_READ:
703 	case SPDK_BDEV_IO_TYPE_WRITE:
704 	case SPDK_BDEV_IO_TYPE_FLUSH:
705 	case SPDK_BDEV_IO_TYPE_UNMAP:
706 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
707 	case SPDK_BDEV_IO_TYPE_RESET:
708 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
709 	default:
710 		return false;
711 	}
712 }
713 
/* get_io_channel entry of the bdev function table.
 * The vbdev pointer itself is the io device that the channel is
 * requested for. */
static struct spdk_io_channel *
vbdev_ocf_get_io_channel(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;
	struct spdk_io_channel *ch = spdk_get_io_channel(vbdev);

	return ch;
}
722 
723 static int
724 vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
725 {
726 	struct vbdev_ocf *vbdev = opaque;
727 
728 	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
729 	spdk_json_write_named_string(w, "core_device", vbdev->core.name);
730 
731 	spdk_json_write_named_string(w, "mode",
732 				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
733 	spdk_json_write_named_uint32(w, "cache_line_size",
734 				     ocf_cache_get_line_size(vbdev->ocf_cache));
735 	spdk_json_write_named_bool(w, "metadata_volatile",
736 				   vbdev->cfg.cache.metadata_volatile);
737 
738 	return 0;
739 }
740 
741 static void
742 vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
743 {
744 	struct vbdev_ocf *vbdev = bdev->ctxt;
745 
746 	spdk_json_write_object_begin(w);
747 
748 	spdk_json_write_named_string(w, "method", "bdev_ocf_create");
749 
750 	spdk_json_write_named_object_begin(w, "params");
751 	spdk_json_write_named_string(w, "name", vbdev->name);
752 	spdk_json_write_named_string(w, "mode",
753 				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
754 	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
755 	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
756 	spdk_json_write_object_end(w);
757 
758 	spdk_json_write_object_end(w);
759 }
760 
/* Cache vbdev function table
 * Used by bdev layer; installed on the exposed bdev in finish_register() */
static struct spdk_bdev_fn_table cache_dev_fn_table = {
	.destruct = vbdev_ocf_destruct,
	.io_type_supported = vbdev_ocf_io_type_supported,
	.submit_request	= vbdev_ocf_submit_request,
	.get_io_channel	= vbdev_ocf_get_io_channel,
	.write_config_json = vbdev_ocf_write_json_config,
	.dump_info_json = vbdev_ocf_dump_info_json,
};
771 
772 /* Poller function for the OCF queue
773  * We execute OCF requests here synchronously */
774 static int
775 queue_poll(void *opaque)
776 {
777 	struct vbdev_ocf_qctx *qctx = opaque;
778 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
779 	int i, max = spdk_min(32, iono);
780 
781 	for (i = 0; i < max; i++) {
782 		ocf_queue_run_single(qctx->queue);
783 	}
784 
785 	if (iono > 0) {
786 		return 1;
787 	} else {
788 		return 0;
789 	}
790 }
791 
/* Called during ocf_submit_io, ocf_purge*
 * and any other requests that need to submit io.
 * Intentionally empty: the queue is drained by the queue_poll() poller
 * registered in io_device_create_cb(). */
static void
vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
{
}
798 
799 /* OCF queue deinitialization
800  * Called at ocf_cache_stop */
801 static void
802 vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
803 {
804 	struct vbdev_ocf_qctx *qctx = ocf_queue_get_priv(q);
805 
806 	if (qctx) {
807 		spdk_put_io_channel(qctx->cache_ch);
808 		spdk_put_io_channel(qctx->core_ch);
809 		spdk_poller_unregister(&qctx->poller);
810 		if (qctx->allocated) {
811 			free(qctx);
812 		}
813 	}
814 }
815 
/* Queue ops is an interface for running queue thread;
 * the stop() operation is called just before the queue gets destroyed.
 * Used for the per-channel IO queues created in io_device_create_cb(). */
const struct ocf_queue_ops queue_ops = {
	.kick_sync = vbdev_ocf_ctx_queue_kick,
	.kick = vbdev_ocf_ctx_queue_kick,
	.stop = vbdev_ocf_ctx_queue_stop,
};
823 
824 /* Called on cache vbdev creation at every thread
825  * We allocate OCF queues here and SPDK poller for it */
826 static int
827 io_device_create_cb(void *io_device, void *ctx_buf)
828 {
829 	struct vbdev_ocf *vbdev = io_device;
830 	struct vbdev_ocf_qctx *qctx = ctx_buf;
831 	int rc;
832 
833 	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
834 	if (rc) {
835 		return rc;
836 	}
837 
838 	ocf_queue_set_priv(qctx->queue, qctx);
839 
840 	qctx->vbdev      = vbdev;
841 	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
842 	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
843 	qctx->poller     = SPDK_POLLER_REGISTER(queue_poll, qctx, 0);
844 
845 	return rc;
846 }
847 
848 /* Called per thread
849  * Put OCF queue and relaunch poller with new context to finish pending requests */
850 static void
851 io_device_destroy_cb(void *io_device, void *ctx_buf)
852 {
853 	/* Making a copy of context to use it after io channel will be destroyed */
854 	struct vbdev_ocf_qctx *copy = malloc(sizeof(*copy));
855 	struct vbdev_ocf_qctx *qctx = ctx_buf;
856 
857 	if (copy) {
858 		ocf_queue_set_priv(qctx->queue, copy);
859 		memcpy(copy, qctx, sizeof(*copy));
860 		spdk_poller_unregister(&qctx->poller);
861 		copy->poller = SPDK_POLLER_REGISTER(queue_poll, copy, 0);
862 		copy->allocated = true;
863 	} else {
864 		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
865 			    spdk_strerror(ENOMEM));
866 	}
867 
868 	vbdev_ocf_queue_put(qctx->queue);
869 }
870 
871 /* OCF management queue deinitialization */
872 static void
873 vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
874 {
875 	struct spdk_poller *poller = ocf_queue_get_priv(q);
876 
877 	if (poller) {
878 		spdk_poller_unregister(&poller);
879 	}
880 }
881 
882 static int
883 mngt_queue_poll(void *opaque)
884 {
885 	ocf_queue_t q = opaque;
886 	uint32_t iono = ocf_queue_pending_io(q);
887 	int i, max = spdk_min(32, iono);
888 
889 	for (i = 0; i < max; i++) {
890 		ocf_queue_run_single(q);
891 	}
892 
893 	if (iono > 0) {
894 		return 1;
895 	} else {
896 		return 0;
897 	}
898 }
899 
/* kick() operation of the management queue ops.
 * Intentionally empty: the queue is drained by the mngt_queue_poll()
 * poller registered in create_management_queue(). */
static void
vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
{
}
904 
/* Queue ops is an interface for running queue thread;
 * the stop() operation is called just before the queue gets destroyed.
 * Used for the management queue created in create_management_queue();
 * no kick_sync operation is provided for it. */
const struct ocf_queue_ops mngt_queue_ops = {
	.kick_sync = NULL,
	.kick = vbdev_ocf_ctx_mngt_queue_kick,
	.stop = vbdev_ocf_ctx_mngt_queue_stop,
};
912 
/* Clear the "starting" flag set by register_vbdev().
 * Called on the register error paths before rollback is started, so that
 * vbdev_ocf_destruct() no longer answers -EBUSY for this vbdev. */
static void
clear_starting_indicator_vbdev(struct vbdev_ocf *vbdev)
{
	vbdev->state.starting = false;
}
918 
919 /* Create exported spdk object */
920 static void
921 finish_register(struct vbdev_ocf *vbdev)
922 {
923 	int result;
924 
925 	/* Copy properties of the base bdev */
926 	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
927 	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
928 	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;
929 
930 	vbdev->exp_bdev.name = vbdev->name;
931 	vbdev->exp_bdev.product_name = "SPDK OCF";
932 
933 	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
934 	vbdev->exp_bdev.ctxt = vbdev;
935 	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
936 	vbdev->exp_bdev.module = &ocf_if;
937 
938 	/* Finally register vbdev in SPDK */
939 	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
940 				sizeof(struct vbdev_ocf_qctx), vbdev->name);
941 	result = spdk_bdev_register(&vbdev->exp_bdev);
942 	if (result) {
943 		SPDK_ERRLOG("Could not register exposed bdev %s\n",
944 			    vbdev->name);
945 		clear_starting_indicator_vbdev(vbdev);
946 		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, result);
947 		return;
948 	} else {
949 		vbdev->state.started = true;
950 	}
951 
952 	vbdev_ocf_mngt_continue(vbdev, result);
953 }
954 
955 static void
956 add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
957 {
958 	struct vbdev_ocf *vbdev = priv;
959 
960 	ocf_mngt_cache_unlock(cache);
961 
962 	if (error) {
963 		SPDK_ERRLOG("Error %d, failed to add core device to cache instance %s,"
964 			    "starting rollback\n", error, vbdev->name);
965 		clear_starting_indicator_vbdev(vbdev);
966 		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, error);
967 		return;
968 	} else {
969 		vbdev->ocf_core = core;
970 	}
971 
972 	vbdev_ocf_mngt_continue(vbdev, error);
973 }
974 
975 /* Try to lock cache, then add core */
976 static void
977 add_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
978 {
979 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
980 
981 	if (error) {
982 		SPDK_ERRLOG("Error %d, can not lock cache instance %s,"
983 			    "starting rollback\n", error, vbdev->name);
984 		clear_starting_indicator_vbdev(vbdev);
985 		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, error);
986 	}
987 	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
988 }
989 
/* Add core for existing OCF cache instance.
 * Register-path step: requests the cache lock; the core is added from
 * add_core_cache_lock_cmpl() once the lock request completes. */
static void
add_core(struct vbdev_ocf *vbdev)
{
	ocf_mngt_cache_lock(vbdev->ocf_cache, add_core_cache_lock_cmpl, vbdev);
}
996 
997 static void
998 start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
999 {
1000 	struct vbdev_ocf *vbdev = priv;
1001 
1002 	ocf_mngt_cache_unlock(cache);
1003 
1004 	if (error) {
1005 		SPDK_ERRLOG("Error %d during start cache %s, starting rollback\n",
1006 			    error, vbdev->name);
1007 		clear_starting_indicator_vbdev(vbdev);
1008 		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, error);
1009 		return;
1010 	}
1011 
1012 	vbdev_ocf_mngt_continue(vbdev, error);
1013 }
1014 
1015 static int
1016 create_management_queue(struct vbdev_ocf *vbdev)
1017 {
1018 	struct spdk_poller *mngt_poller;
1019 	int rc;
1020 
1021 	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
1022 	if (rc) {
1023 		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
1024 		return rc;
1025 	}
1026 
1027 	mngt_poller = SPDK_POLLER_REGISTER(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
1028 	if (mngt_poller == NULL) {
1029 		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
1030 		return -ENOMEM;
1031 	}
1032 
1033 	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
1034 	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);
1035 
1036 	return 0;
1037 }
1038 
1039 /* Start OCF cache, attach caching device */
1040 static void
1041 start_cache(struct vbdev_ocf *vbdev)
1042 {
1043 	ocf_cache_t existing;
1044 	int rc;
1045 
1046 	if (vbdev->ocf_cache) {
1047 		vbdev_ocf_mngt_stop(vbdev, NULL, -EALREADY);
1048 		return;
1049 	}
1050 
1051 	existing = get_other_cache_instance(vbdev);
1052 	if (existing) {
1053 		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
1054 			       vbdev->name, vbdev->cache.name);
1055 		vbdev->ocf_cache = existing;
1056 		vbdev->cache_ctx = ocf_cache_get_priv(existing);
1057 		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
1058 		vbdev_ocf_mngt_continue(vbdev, 0);
1059 		return;
1060 	}
1061 
1062 	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
1063 	if (vbdev->cache_ctx == NULL) {
1064 		clear_starting_indicator_vbdev(vbdev);
1065 		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, -ENOMEM);
1066 		return;
1067 	}
1068 
1069 	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
1070 	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);
1071 
1072 	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache);
1073 	if (rc) {
1074 		clear_starting_indicator_vbdev(vbdev);
1075 		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, rc);
1076 		return;
1077 	}
1078 
1079 	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);
1080 
1081 	rc = create_management_queue(vbdev);
1082 	if (rc) {
1083 		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
1084 		clear_starting_indicator_vbdev(vbdev);
1085 		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, rc);
1086 		return;
1087 	}
1088 
1089 	if (vbdev->cfg.loadq) {
1090 		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
1091 	} else {
1092 		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
1093 	}
1094 }
1095 
/* Procedures called during register operation.
 * NULL-terminated list of steps passed to vbdev_ocf_mngt_start() by
 * register_vbdev(): start (or join) the cache, add the core, then
 * register the exposed bdev. */
vbdev_ocf_mngt_fn register_path[] = {
	start_cache,
	add_core,
	finish_register,
	NULL
};
1103 
1104 /* Start cache instance and register OCF bdev */
1105 static void
1106 register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
1107 {
1108 	int rc;
1109 
1110 	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
1111 		cb(-EPERM, vbdev, cb_arg);
1112 		return;
1113 	}
1114 
1115 	vbdev->state.starting = true;
1116 	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
1117 	if (rc) {
1118 		cb(rc, vbdev, cb_arg);
1119 	}
1120 }
1121 
1122 /* Init OCF configuration options
1123  * for core and cache devices */
1124 static void
1125 init_vbdev_config(struct vbdev_ocf *vbdev)
1126 {
1127 	struct vbdev_ocf_config *cfg = &vbdev->cfg;
1128 
1129 	snprintf(cfg->cache.name, sizeof(cfg->cache.name), "%s", vbdev->name);
1130 	snprintf(cfg->core.name, sizeof(cfg->core.name), "%s", vbdev->core.name);
1131 
1132 	/* TODO [metadata]: make configurable with persistent
1133 	 * metadata support */
1134 	cfg->cache.metadata_volatile = false;
1135 
1136 	/* TODO [cache line size]: make cache line size configurable
1137 	 * Using standard 4KiB for now */
1138 	cfg->cache.cache_line_size = ocf_cache_line_size_4;
1139 
1140 	/* This are suggested values that
1141 	 * should be sufficient for most use cases */
1142 	cfg->cache.backfill.max_queue_size = 65536;
1143 	cfg->cache.backfill.queue_unblock_size = 60000;
1144 
1145 	/* TODO [cache line size] */
1146 	cfg->device.cache_line_size = ocf_cache_line_size_4;
1147 	cfg->device.force = true;
1148 	cfg->device.perform_test = false;
1149 	cfg->device.discard_on_start = false;
1150 
1151 	vbdev->cfg.cache.locked = true;
1152 
1153 	cfg->core.volume_type = SPDK_OBJECT;
1154 	cfg->device.volume_type = SPDK_OBJECT;
1155 
1156 	if (vbdev->cfg.loadq) {
1157 		/* When doing cache_load(), we need to set try_add to true,
1158 		 * otherwise OCF will interpret this core as new
1159 		 * instead of the inactive one */
1160 		vbdev->cfg.core.try_add = true;
1161 	}
1162 
1163 	/* Serialize bdev names in OCF UUID to interpret on future loads
1164 	 * Core UUID is a triple of (core name, vbdev name, cache name)
1165 	 * Cache UUID is cache bdev name */
1166 	cfg->device.uuid.size = strlen(vbdev->cache.name) + 1;
1167 	cfg->device.uuid.data = vbdev->cache.name;
1168 
1169 	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s %s",
1170 		 vbdev->core.name, vbdev->name, vbdev->cache.name);
1171 	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
1172 	cfg->core.uuid.data = vbdev->uuid;
1173 	vbdev->uuid[strlen(vbdev->core.name)] = 0;
1174 	vbdev->uuid[strlen(vbdev->core.name) + 1 + strlen(vbdev->name)] = 0;
1175 }
1176 
1177 /* Allocate vbdev structure object and add it to the global list */
1178 static int
1179 init_vbdev(const char *vbdev_name,
1180 	   const char *cache_mode_name,
1181 	   const char *cache_name,
1182 	   const char *core_name,
1183 	   bool loadq)
1184 {
1185 	struct vbdev_ocf *vbdev;
1186 	int rc = 0;
1187 
1188 	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
1189 		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
1190 		return -EPERM;
1191 	}
1192 
1193 	vbdev = calloc(1, sizeof(*vbdev));
1194 	if (!vbdev) {
1195 		goto error_mem;
1196 	}
1197 
1198 	vbdev->cache.parent = vbdev;
1199 	vbdev->core.parent = vbdev;
1200 	vbdev->cache.is_cache = true;
1201 	vbdev->core.is_cache = false;
1202 
1203 	if (cache_mode_name) {
1204 		vbdev->cfg.cache.cache_mode
1205 			= ocf_get_cache_mode(cache_mode_name);
1206 	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
1207 		SPDK_ERRLOG("No cache mode specified\n");
1208 		rc = -EINVAL;
1209 		goto error_free;
1210 	}
1211 	if (vbdev->cfg.cache.cache_mode < 0) {
1212 		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
1213 		rc = -EINVAL;
1214 		goto error_free;
1215 	}
1216 
1217 	vbdev->name = strdup(vbdev_name);
1218 	if (!vbdev->name) {
1219 		goto error_mem;
1220 	}
1221 
1222 	vbdev->cache.name = strdup(cache_name);
1223 	if (!vbdev->cache.name) {
1224 		goto error_mem;
1225 	}
1226 
1227 	vbdev->core.name = strdup(core_name);
1228 	if (!vbdev->core.name) {
1229 		goto error_mem;
1230 	}
1231 
1232 	vbdev->cfg.loadq = loadq;
1233 	init_vbdev_config(vbdev);
1234 	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
1235 	return rc;
1236 
1237 error_mem:
1238 	rc = -ENOMEM;
1239 error_free:
1240 	free_vbdev(vbdev);
1241 	return rc;
1242 }
1243 
/* Module init, run at the start of SPDK application
 * Initializes OCF context and volume type, then walks the "OCF" section
 * of the configuration file and adds a vbdev to the global list
 * for every complete "OCF" entry (name, mode, cache bdev, core bdev).
 * Incomplete entries are logged and skipped.
 * The returned value is the status of the last init_vbdev() call,
 * or 0 when no entry reached init_vbdev(). */
static int
vbdev_ocf_init(void)
{
	const char *vbdev_name, *modename, *cache_name, *core_name;
	struct spdk_conf_section *sp;
	int status;
	int entry;

	status = vbdev_ocf_ctx_init();
	if (status != 0) {
		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", status);
		return status;
	}

	status = vbdev_ocf_volume_init();
	if (status != 0) {
		/* Undo the context init done above */
		vbdev_ocf_ctx_cleanup();
		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", status);
		return status;
	}

	sp = spdk_conf_find_section(NULL, "OCF");
	if (sp == NULL) {
		return 0;
	}

	for (entry = 0; spdk_conf_section_get_nval(sp, "OCF", entry) != NULL; entry++) {
		vbdev_name = spdk_conf_section_get_nmval(sp, "OCF", entry, 0);
		if (vbdev_name == NULL) {
			SPDK_ERRLOG("No vbdev name specified\n");
			continue;
		}

		modename = spdk_conf_section_get_nmval(sp, "OCF", entry, 1);
		if (modename == NULL) {
			SPDK_ERRLOG("No modename specified for OCF vbdev '%s'\n", vbdev_name);
			continue;
		}

		cache_name = spdk_conf_section_get_nmval(sp, "OCF", entry, 2);
		if (cache_name == NULL) {
			SPDK_ERRLOG("No cache device specified for OCF vbdev '%s'\n", vbdev_name);
			continue;
		}

		core_name = spdk_conf_section_get_nmval(sp, "OCF", entry, 3);
		if (core_name == NULL) {
			SPDK_ERRLOG("No core devices specified for OCF vbdev '%s'\n", vbdev_name);
			continue;
		}

		status = init_vbdev(vbdev_name, modename, cache_name, core_name, false);
		if (status != 0) {
			SPDK_ERRLOG("Config initialization failed with code: %d\n", status);
		}
	}

	return status;
}
1308 
1309 /* Called after application shutdown started
1310  * Release memory of allocated structures here */
1311 static void
1312 vbdev_ocf_module_fini(void)
1313 {
1314 	struct vbdev_ocf *vbdev;
1315 
1316 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1317 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1318 		free_vbdev(vbdev);
1319 	}
1320 
1321 	vbdev_ocf_volume_cleanup();
1322 	vbdev_ocf_ctx_cleanup();
1323 }
1324 
1325 /* When base device gets unpluged this is called
1326  * We will unregister cache vbdev here
1327  * When cache device is removed, we delete every OCF bdev that used it */
1328 static void
1329 hotremove_cb(void *ctx)
1330 {
1331 	struct vbdev_ocf_base *base = ctx;
1332 	struct vbdev_ocf *vbdev;
1333 
1334 	if (!base->is_cache) {
1335 		if (base->parent->state.doing_finish) {
1336 			return;
1337 		}
1338 
1339 		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
1340 			       base->parent->name, base->name);
1341 		vbdev_ocf_delete(base->parent, NULL, NULL);
1342 		return;
1343 	}
1344 
1345 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1346 		if (vbdev->state.doing_finish) {
1347 			continue;
1348 		}
1349 		if (strcmp(base->name, vbdev->cache.name) == 0) {
1350 			SPDK_NOTICELOG("Deinitializing '%s' because"
1351 				       " its cache device '%s' was removed\n",
1352 				       vbdev->name, base->name);
1353 			vbdev_ocf_delete(vbdev, NULL, NULL);
1354 		}
1355 	}
1356 }
1357 
1358 /* Open base SPDK bdev and claim it */
1359 static int
1360 attach_base(struct vbdev_ocf_base *base)
1361 {
1362 	int status;
1363 
1364 	if (base->attached) {
1365 		return -EALREADY;
1366 	}
1367 
1368 	/* If base cache bdev was already opened by other vbdev,
1369 	 * we just copy its descriptor here */
1370 	if (base->is_cache) {
1371 		struct vbdev_ocf_base *existing = get_other_cache_base(base);
1372 		if (existing) {
1373 			base->desc = existing->desc;
1374 			base->management_channel = existing->management_channel;
1375 			base->attached = true;
1376 			return 0;
1377 		}
1378 	}
1379 
1380 	status = spdk_bdev_open(base->bdev, true, hotremove_cb, base, &base->desc);
1381 	if (status) {
1382 		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
1383 		return status;
1384 	}
1385 
1386 	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
1387 					     &ocf_if);
1388 	if (status) {
1389 		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
1390 		spdk_bdev_close(base->desc);
1391 		return status;
1392 	}
1393 
1394 	base->management_channel = spdk_bdev_get_io_channel(base->desc);
1395 	if (!base->management_channel) {
1396 		SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
1397 		spdk_bdev_module_release_bdev(base->bdev);
1398 		spdk_bdev_close(base->desc);
1399 		return -ENOMEM;
1400 	}
1401 
1402 	/* Save the thread where the base device is opened */
1403 	base->thread = spdk_get_thread();
1404 
1405 	base->attached = true;
1406 	return status;
1407 }
1408 
1409 /* Attach base bdevs */
1410 static int
1411 attach_base_bdevs(struct vbdev_ocf *vbdev,
1412 		  struct spdk_bdev *cache_bdev,
1413 		  struct spdk_bdev *core_bdev)
1414 {
1415 	int rc = 0;
1416 
1417 	if (cache_bdev) {
1418 		vbdev->cache.bdev = cache_bdev;
1419 		rc |= attach_base(&vbdev->cache);
1420 	}
1421 
1422 	if (core_bdev) {
1423 		vbdev->core.bdev = core_bdev;
1424 		rc |= attach_base(&vbdev->core);
1425 	}
1426 
1427 	return rc;
1428 }
1429 
1430 /* Init and then start vbdev if all base devices are present */
1431 void
1432 vbdev_ocf_construct(const char *vbdev_name,
1433 		    const char *cache_mode_name,
1434 		    const char *cache_name,
1435 		    const char *core_name,
1436 		    bool loadq,
1437 		    void (*cb)(int, struct vbdev_ocf *, void *),
1438 		    void *cb_arg)
1439 {
1440 	int rc;
1441 	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
1442 	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
1443 	struct vbdev_ocf *vbdev;
1444 
1445 	rc = init_vbdev(vbdev_name, cache_mode_name, cache_name, core_name, loadq);
1446 	if (rc) {
1447 		cb(rc, NULL, cb_arg);
1448 		return;
1449 	}
1450 
1451 	vbdev = vbdev_ocf_get_by_name(vbdev_name);
1452 	if (vbdev == NULL) {
1453 		cb(-ENODEV, NULL, cb_arg);
1454 		return;
1455 	}
1456 
1457 	if (cache_bdev == NULL) {
1458 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
1459 			       vbdev->name, cache_name);
1460 	}
1461 	if (core_bdev == NULL) {
1462 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
1463 			       vbdev->name, core_name);
1464 	}
1465 
1466 	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
1467 	if (rc) {
1468 		cb(rc, vbdev, cb_arg);
1469 		return;
1470 	}
1471 
1472 	if (core_bdev && cache_bdev) {
1473 		register_vbdev(vbdev, cb, cb_arg);
1474 	} else {
1475 		cb(0, vbdev, cb_arg);
1476 	}
1477 }
1478 
1479 /* This called if new device is created in SPDK application
1480  * If that device named as one of base bdevs of OCF vbdev,
1481  * claim and open them */
1482 static void
1483 vbdev_ocf_examine(struct spdk_bdev *bdev)
1484 {
1485 	const char *bdev_name = spdk_bdev_get_name(bdev);
1486 	struct vbdev_ocf *vbdev;
1487 
1488 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1489 		if (vbdev->state.doing_finish) {
1490 			continue;
1491 		}
1492 
1493 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1494 			attach_base_bdevs(vbdev, bdev, NULL);
1495 			continue;
1496 		}
1497 		if (!strcmp(bdev_name, vbdev->core.name)) {
1498 			attach_base_bdevs(vbdev, NULL, bdev);
1499 			break;
1500 		}
1501 	}
1502 	spdk_bdev_module_examine_done(&ocf_if);
1503 }
1504 
1505 struct metadata_probe_ctx {
1506 	struct vbdev_ocf_base base;
1507 	ocf_volume_t volume;
1508 
1509 	struct ocf_volume_uuid *core_uuids;
1510 	unsigned int uuid_count;
1511 
1512 	int result;
1513 	int refcnt;
1514 };
1515 
/* Thread message handler used by examine_ctx_put()
 * ctx is the bdev descriptor to close */
static void
_examine_ctx_put(void *ctx)
{
	spdk_bdev_close(ctx);
}
1523 
1524 static void
1525 examine_ctx_put(struct metadata_probe_ctx *ctx)
1526 {
1527 	unsigned int i;
1528 
1529 	ctx->refcnt--;
1530 	if (ctx->refcnt > 0) {
1531 		return;
1532 	}
1533 
1534 	if (ctx->result) {
1535 		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
1536 			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
1537 	}
1538 
1539 	if (ctx->base.desc) {
1540 		/* Close the underlying bdev on its same opened thread. */
1541 		if (ctx->base.thread && ctx->base.thread != spdk_get_thread()) {
1542 			spdk_thread_send_msg(ctx->base.thread, _examine_ctx_put, ctx->base.desc);
1543 		} else {
1544 			spdk_bdev_close(ctx->base.desc);
1545 		}
1546 	}
1547 
1548 	if (ctx->volume) {
1549 		ocf_volume_destroy(ctx->volume);
1550 	}
1551 
1552 	if (ctx->core_uuids) {
1553 		for (i = 0; i < ctx->uuid_count; i++) {
1554 			free(ctx->core_uuids[i].data);
1555 		}
1556 	}
1557 	free(ctx->core_uuids);
1558 
1559 	examine_done(ctx->result, NULL, ctx->base.bdev);
1560 	free(ctx);
1561 }
1562 
/* Completion callback of vbdev_ocf_construct() calls issued from
 * metadata_probe_cores_construct()
 * Releases the probe context reference taken for that call;
 * rc and vbdev are not looked at */
static void
metadata_probe_construct_cb(int rc, struct vbdev_ocf *vbdev, void *vctx)
{
	examine_ctx_put(vctx);
}
1570 
1571 /* This is second callback for ocf_metadata_probe_cores()
1572  * Here we create vbdev configurations based on UUIDs */
1573 static void
1574 metadata_probe_cores_construct(void *priv, int error, unsigned int num_cores)
1575 {
1576 	struct metadata_probe_ctx *ctx = priv;
1577 	const char *vbdev_name;
1578 	const char *core_name;
1579 	const char *cache_name;
1580 	unsigned int i;
1581 
1582 	if (error) {
1583 		ctx->result = error;
1584 		examine_ctx_put(ctx);
1585 		return;
1586 	}
1587 
1588 	for (i = 0; i < num_cores; i++) {
1589 		core_name = ocf_uuid_to_str(&ctx->core_uuids[i]);
1590 		vbdev_name = core_name + strlen(core_name) + 1;
1591 		cache_name = vbdev_name + strlen(vbdev_name) + 1;
1592 
1593 		if (strcmp(ctx->base.bdev->name, cache_name)) {
1594 			SPDK_NOTICELOG("OCF metadata found on %s belongs to bdev named '%s'\n",
1595 				       ctx->base.bdev->name, cache_name);
1596 		}
1597 
1598 		ctx->refcnt++;
1599 		vbdev_ocf_construct(vbdev_name, NULL, cache_name, core_name, true,
1600 				    metadata_probe_construct_cb, ctx);
1601 	}
1602 
1603 	examine_ctx_put(ctx);
1604 }
1605 
1606 /* This callback is called after OCF reads cores UUIDs from cache metadata
1607  * Here we allocate memory for those UUIDs and call ocf_metadata_probe_cores() again */
1608 static void
1609 metadata_probe_cores_get_num(void *priv, int error, unsigned int num_cores)
1610 {
1611 	struct metadata_probe_ctx *ctx = priv;
1612 	unsigned int i;
1613 
1614 	if (error) {
1615 		ctx->result = error;
1616 		examine_ctx_put(ctx);
1617 		return;
1618 	}
1619 
1620 	ctx->uuid_count = num_cores;
1621 	ctx->core_uuids = calloc(num_cores, sizeof(struct ocf_volume_uuid));
1622 	if (!ctx->core_uuids) {
1623 		ctx->result = -ENOMEM;
1624 		examine_ctx_put(ctx);
1625 		return;
1626 	}
1627 
1628 	for (i = 0; i < ctx->uuid_count; i++) {
1629 		ctx->core_uuids[i].size = OCF_VOLUME_UUID_MAX_SIZE;
1630 		ctx->core_uuids[i].data = malloc(OCF_VOLUME_UUID_MAX_SIZE);
1631 		if (!ctx->core_uuids[i].data) {
1632 			ctx->result = -ENOMEM;
1633 			examine_ctx_put(ctx);
1634 			return;
1635 		}
1636 	}
1637 
1638 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, ctx->core_uuids, ctx->uuid_count,
1639 				 metadata_probe_cores_construct, ctx);
1640 }
1641 
1642 static void
1643 metadata_probe_cb(void *priv, int rc,
1644 		  struct ocf_metadata_probe_status *status)
1645 {
1646 	struct metadata_probe_ctx *ctx = priv;
1647 
1648 	if (rc) {
1649 		/* -ENODATA means device does not have cache metadata on it */
1650 		if (rc != -OCF_ERR_NO_METADATA) {
1651 			ctx->result = rc;
1652 		}
1653 		examine_ctx_put(ctx);
1654 		return;
1655 	}
1656 
1657 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, NULL, 0,
1658 				 metadata_probe_cores_get_num, ctx);
1659 }
1660 
1661 /* This is called after vbdev_ocf_examine
1662  * It allows to delay application initialization
1663  * until all OCF bdevs get registered
1664  * If vbdev has all of its base devices it starts asynchronously here
1665  * We first check if bdev appears in configuration,
1666  * if not we do metadata_probe() to create its configuration from bdev metadata */
1667 static void
1668 vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
1669 {
1670 	const char *bdev_name = spdk_bdev_get_name(bdev);
1671 	struct vbdev_ocf *vbdev;
1672 	struct metadata_probe_ctx *ctx;
1673 	bool created_from_config = false;
1674 	int rc;
1675 
1676 	examine_start(bdev);
1677 
1678 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1679 		if (vbdev->state.doing_finish || vbdev->state.started) {
1680 			continue;
1681 		}
1682 
1683 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1684 			examine_start(bdev);
1685 			register_vbdev(vbdev, examine_done, bdev);
1686 			created_from_config = true;
1687 			continue;
1688 		}
1689 		if (!strcmp(bdev_name, vbdev->core.name)) {
1690 			examine_start(bdev);
1691 			register_vbdev(vbdev, examine_done, bdev);
1692 			examine_done(0, NULL, bdev);
1693 			return;
1694 		}
1695 	}
1696 
1697 	/* If devices is discovered during config we do not check for metadata */
1698 	if (created_from_config) {
1699 		examine_done(0, NULL, bdev);
1700 		return;
1701 	}
1702 
1703 	/* Metadata probe path
1704 	 * We create temporary OCF volume and a temporary base structure
1705 	 * to use them for ocf_metadata_probe() and for bottom adapter IOs
1706 	 * Then we get UUIDs of core devices an create configurations based on them */
1707 	ctx = calloc(1, sizeof(*ctx));
1708 	if (!ctx) {
1709 		examine_done(-ENOMEM, NULL, bdev);
1710 		return;
1711 	}
1712 
1713 	ctx->base.bdev = bdev;
1714 	ctx->refcnt = 1;
1715 
1716 	rc = spdk_bdev_open(ctx->base.bdev, true, NULL, NULL, &ctx->base.desc);
1717 	if (rc) {
1718 		ctx->result = rc;
1719 		examine_ctx_put(ctx);
1720 		return;
1721 	}
1722 
1723 	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
1724 	if (rc) {
1725 		ctx->result = rc;
1726 		examine_ctx_put(ctx);
1727 		return;
1728 	}
1729 
1730 	rc = ocf_volume_open(ctx->volume, &ctx->base);
1731 	if (rc) {
1732 		ctx->result = rc;
1733 		examine_ctx_put(ctx);
1734 		return;
1735 	}
1736 
1737 	/* Save the thread where the base device is opened */
1738 	ctx->base.thread = spdk_get_thread();
1739 
1740 	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
1741 }
1742 
1743 static int
1744 vbdev_ocf_get_ctx_size(void)
1745 {
1746 	return sizeof(struct bdev_ocf_data);
1747 }
1748 
1749 static void
1750 fini_start(void)
1751 {
1752 	g_fini_started = true;
1753 }
1754 
1755 /* Module-global function table
1756  * Does not relate to vbdev instances */
1757 static struct spdk_bdev_module ocf_if = {
1758 	.name = "ocf",
1759 	.module_init = vbdev_ocf_init,
1760 	.fini_start = fini_start,
1761 	.module_fini = vbdev_ocf_module_fini,
1762 	.config_text = NULL,
1763 	.get_ctx_size = vbdev_ocf_get_ctx_size,
1764 	.examine_config = vbdev_ocf_examine,
1765 	.examine_disk   = vbdev_ocf_examine_disk,
1766 };
1767 SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);
1768 
1769 SPDK_LOG_REGISTER_COMPONENT("vbdev_ocf", SPDK_TRACE_VBDEV_OCF)
1770