xref: /spdk/module/bdev/ocf/vbdev_ocf.c (revision 2f5c602574a98ede645991abe279a96e19c50196)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <ocf/ocf.h>
35 #include <ocf/ocf_types.h>
36 #include <ocf/ocf_mngt.h>
37 
38 #include "ctx.h"
39 #include "data.h"
40 #include "volume.h"
41 #include "utils.h"
42 #include "vbdev_ocf.h"
43 
44 #include "spdk/bdev_module.h"
45 #include "spdk/thread.h"
46 #include "spdk/string.h"
47 #include "spdk/log.h"
48 #include "spdk/cpuset.h"
49 
static struct spdk_bdev_module ocf_if;

/* All OCF vbdevs created by this module (including ones still starting or stopping) */
static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);

/* Base bdevs currently under examination (claimed but not yet in use) */
static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);

/* Set once module finish begins; consulted by stop_vbdev() to decide
 * whether a shared cache instance may still be kept alive */
bool g_fini_started = false;

/* Structure for keeping list of bdevs that are claimed but not used yet */
struct examining_bdev {
	struct spdk_bdev           *bdev;   /* bdev being examined */
	TAILQ_ENTRY(examining_bdev) tailq;  /* link in g_ocf_examining_bdevs_head */
};
65 
/* Add bdev to the list of claimed (examined) bdevs.
 * NOTE(review): the malloc() result is only validated with assert(), which
 * compiles out under NDEBUG and would then allow a NULL dereference — confirm
 * whether OOM here is considered fatal by module policy. */
static void
examine_start(struct spdk_bdev *bdev)
{
	struct examining_bdev *entry = malloc(sizeof(*entry));

	assert(entry);
	entry->bdev = bdev;
	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
}
76 
/* Find bdev on list of claimed bdevs, then remove it;
 * if it was the last entry for that bdev, report examine done.
 * status and vbdev are unused; cb_arg carries the examined bdev. */
static void
examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
{
	struct spdk_bdev *bdev = cb_arg;
	struct examining_bdev *entry, *safe, *found = NULL;

	/* If a second entry for the same bdev exists, another examine is still
	 * pending: drop the first match without reporting examine done */
	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
		if (entry->bdev == bdev) {
			if (found) {
				goto remove;
			} else {
				found = entry;
			}
		}
	}

	/* Exactly one entry remained for this bdev - examine is finished */
	assert(found);
	spdk_bdev_module_examine_done(&ocf_if);

remove:
	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
	free(found);
}
102 
103 /* Free allocated strings and structure itself
104  * Used at shutdown only */
105 static void
106 free_vbdev(struct vbdev_ocf *vbdev)
107 {
108 	if (!vbdev) {
109 		return;
110 	}
111 
112 	free(vbdev->name);
113 	free(vbdev->cache.name);
114 	free(vbdev->core.name);
115 	free(vbdev);
116 }
117 
118 /* Get existing cache base
119  * that is attached to other vbdev */
120 static struct vbdev_ocf_base *
121 get_other_cache_base(struct vbdev_ocf_base *base)
122 {
123 	struct vbdev_ocf *vbdev;
124 
125 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
126 		if (&vbdev->cache == base || !vbdev->cache.attached) {
127 			continue;
128 		}
129 		if (!strcmp(vbdev->cache.name, base->name)) {
130 			return &vbdev->cache;
131 		}
132 	}
133 
134 	return NULL;
135 }
136 
137 static bool
138 is_ocf_cache_running(struct vbdev_ocf *vbdev)
139 {
140 	if (vbdev->cache.attached && vbdev->ocf_cache) {
141 		return ocf_cache_is_running(vbdev->ocf_cache);
142 	}
143 	return false;
144 }
145 
146 /* Get existing OCF cache instance
147  * that is started by other vbdev */
148 static ocf_cache_t
149 get_other_cache_instance(struct vbdev_ocf *vbdev)
150 {
151 	struct vbdev_ocf *cmp;
152 
153 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
154 		if (cmp->state.doing_finish || cmp == vbdev) {
155 			continue;
156 		}
157 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
158 			continue;
159 		}
160 		if (is_ocf_cache_running(cmp)) {
161 			return cmp->ocf_cache;
162 		}
163 	}
164 
165 	return NULL;
166 }
167 
/* Thread-message handler: close a base bdev descriptor on the thread
 * that originally opened it (see remove_base_bdev). */
static void
_remove_base_bdev(void *ctx)
{
	spdk_bdev_close((struct spdk_bdev_desc *)ctx);
}
175 
/* Close and unclaim base bdev.
 * Releases the management channel (if any), drops this module's claim,
 * then closes the descriptor - on the thread that opened it, since SPDK
 * descriptors must be closed on their opening thread. */
static void
remove_base_bdev(struct vbdev_ocf_base *base)
{
	if (base->attached) {
		if (base->management_channel) {
			spdk_put_io_channel(base->management_channel);
		}

		spdk_bdev_module_release_bdev(base->bdev);
		/* Close the underlying bdev on its same opened thread. */
		if (base->thread && base->thread != spdk_get_thread()) {
			spdk_thread_send_msg(base->thread, _remove_base_bdev, base->desc);
		} else {
			spdk_bdev_close(base->desc);
		}
		base->attached = false;
	}
}
195 
/* Finish unregister operation: signal destruct completion to the bdev layer
 * with the status captured in detach_cache(), then drop the OCF cache
 * reference and the shared cache context. */
static void
unregister_finish(struct vbdev_ocf *vbdev)
{
	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);

	if (vbdev->ocf_cache) {
		ocf_mngt_cache_put(vbdev->ocf_cache);
	}

	vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
209 
/* Management-path step: close the core base bdev, then continue the path */
static void
close_core_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->core);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
216 
/* OCF completion for core removal: release the cache lock taken in
 * remove_core_cache_lock_cmpl() and continue with OCF's status */
static void
remove_core_cmpl(void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(vbdev->ocf_cache);
	vbdev_ocf_mngt_continue(vbdev, error);
}
225 
226 /* Try to lock cache, then remove core */
227 static void
228 remove_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
229 {
230 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
231 
232 	if (error) {
233 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
234 			    error, vbdev->name);
235 		vbdev_ocf_mngt_continue(vbdev, error);
236 		return;
237 	}
238 
239 	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
240 }
241 
242 /* Detach core base */
243 static void
244 detach_core(struct vbdev_ocf *vbdev)
245 {
246 	if (is_ocf_cache_running(vbdev)) {
247 		ocf_mngt_cache_lock(vbdev->ocf_cache, remove_core_cache_lock_cmpl, vbdev);
248 	} else {
249 		vbdev_ocf_mngt_continue(vbdev, 0);
250 	}
251 }
252 
/* Management-path step: close the cache base bdev, then continue the path */
static void
close_cache_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->cache);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
259 
/* Detach cache base */
static void
detach_cache(struct vbdev_ocf *vbdev)
{
	/* Capture the management-path status; unregister_finish() reports it
	 * to the bdev layer as the destruct result */
	vbdev->state.stop_status = vbdev->mngt_ctx.status;

	/* If some other vbdev references this cache bdev,
	 * we detach this only by changing the flag, without actual close */
	if (get_other_cache_base(&vbdev->cache)) {
		vbdev->cache.attached = false;
	}

	vbdev_ocf_mngt_continue(vbdev, 0);
}
274 
/* Completion of ocf_mngt_cache_stop(): drop the management queue reference,
 * release the cache lock taken in stop_vbdev_cache_lock_cmpl(), continue */
static void
stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
	ocf_mngt_cache_unlock(cache);

	vbdev_ocf_mngt_continue(vbdev, error);
}
285 
286 /* Try to lock cache, then stop it */
287 static void
288 stop_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
289 {
290 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
291 
292 	if (error) {
293 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
294 			    error, vbdev->name);
295 		vbdev_ocf_mngt_continue(vbdev, error);
296 		return;
297 	}
298 
299 	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
300 }
301 
/* Stop OCF cache object
 * vbdev_ocf is not operational after this */
static void
stop_vbdev(struct vbdev_ocf *vbdev)
{
	if (!is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	/* During normal operation a shared cache instance is kept alive while
	 * another vbdev still uses it; once module finish has started
	 * (g_fini_started) the instance is stopped unconditionally */
	if (!g_fini_started && get_other_cache_instance(vbdev)) {
		SPDK_NOTICELOG("Not stopping cache instance '%s'"
			       " because it is referenced by other OCF bdev\n",
			       vbdev->cache.name);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	ocf_mngt_cache_lock(vbdev->ocf_cache, stop_vbdev_cache_lock_cmpl, vbdev);
}
322 
/* Completion of ocf_mngt_cache_flush(): release the cache lock and continue */
static void
flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(cache);
	vbdev_ocf_mngt_continue(vbdev, error);
}
331 
332 static void
333 flush_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
334 {
335 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
336 
337 	if (error) {
338 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
339 			    error, vbdev->name);
340 		vbdev_ocf_mngt_continue(vbdev, error);
341 		return;
342 	}
343 
344 	ocf_mngt_cache_flush(vbdev->ocf_cache, flush_vbdev_cmpl, vbdev);
345 }
346 
347 static void
348 flush_vbdev(struct vbdev_ocf *vbdev)
349 {
350 	if (!is_ocf_cache_running(vbdev)) {
351 		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
352 		return;
353 	}
354 
355 	ocf_mngt_cache_lock(vbdev->ocf_cache, flush_vbdev_cache_lock_cmpl, vbdev);
356 }
357 
/* Procedures called during dirty unregister.
 * The cache is stopped before the core is removed, i.e. the core entry is
 * left behind; _vbdev_ocf_destruct_dirty() notes this ordering allows
 * recovering the instance later. */
vbdev_ocf_mngt_fn unregister_path_dirty[] = {
	flush_vbdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	detach_core,
	close_core_bdev,
	unregister_finish,
	NULL
};
369 
/* Procedures called during clean unregister.
 * The core is removed from the cache before the cache is stopped;
 * _vbdev_ocf_destruct_clean() notes this ordering removes the instance
 * permanently. */
vbdev_ocf_mngt_fn unregister_path_clean[] = {
	flush_vbdev,
	detach_core,
	close_core_bdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	unregister_finish,
	NULL
};
381 
382 /* Start asynchronous management operation using unregister_path */
383 static void
384 unregister_cb(void *opaque)
385 {
386 	struct vbdev_ocf *vbdev = opaque;
387 	vbdev_ocf_mngt_fn *unregister_path;
388 	int rc;
389 
390 	unregister_path = vbdev->state.doing_clean_delete ?
391 			  unregister_path_clean : unregister_path_dirty;
392 
393 	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
394 	if (rc) {
395 		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
396 		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
397 	}
398 }
399 
/* Clean remove case - remove core and then cache, this order
 * will remove instance permanently.
 * Synchronous variant used when the vbdev never reached the started state. */
static void
_vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
{
	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}

	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}
}
415 
/* Dirty shutdown/hot remove case - remove cache and then core, this order
 * will allow us to recover this instance in the future.
 * Synchronous variant used when the vbdev never reached the started state. */
static void
_vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
{
	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}

	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}
}
431 
/* Unregister io device with callback to unregister_cb
 * This function is called during spdk_bdev_unregister.
 * Returns 0 on synchronous completion, 1 when teardown is deferred to
 * unregister_cb, or a negative errno on refusal. */
static int
vbdev_ocf_destruct(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;

	if (vbdev->state.doing_finish) {
		return -EALREADY;
	}

	if (vbdev->state.starting && !vbdev->state.started) {
		/* Refuse to detach cache/core while the register path of this
		 * bdev is still in progress */
		return -EBUSY;
	}

	vbdev->state.doing_finish = true;

	if (vbdev->state.started) {
		spdk_io_device_unregister(vbdev, unregister_cb);
		/* Return 1 because unregister is delayed */
		return 1;
	}

	/* Not started: tear the bases down synchronously */
	if (vbdev->state.doing_clean_delete) {
		_vbdev_ocf_destruct_clean(vbdev);
	} else {
		_vbdev_ocf_destruct_dirty(vbdev);
	}

	return 0;
}
465 
466 /* Stop OCF cache and unregister SPDK bdev */
467 int
468 vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
469 {
470 	int rc = 0;
471 
472 	if (vbdev->state.started) {
473 		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
474 	} else {
475 		rc = vbdev_ocf_destruct(vbdev);
476 		if (rc == 0 && cb) {
477 			cb(cb_arg, 0);
478 		}
479 	}
480 
481 	return rc;
482 }
483 
/* Remove cores permanently and then stop OCF cache and unregister SPDK bdev.
 * Setting doing_clean_delete makes the unregister machinery pick
 * unregister_path_clean / _vbdev_ocf_destruct_clean. */
int
vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
		       void *cb_arg)
{
	vbdev->state.doing_clean_delete = true;

	return vbdev_ocf_delete(vbdev, cb, cb_arg);
}
493 
494 
495 /* If vbdev is online, return its object */
496 struct vbdev_ocf *
497 vbdev_ocf_get_by_name(const char *name)
498 {
499 	struct vbdev_ocf *vbdev;
500 
501 	if (name == NULL) {
502 		assert(false);
503 		return NULL;
504 	}
505 
506 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
507 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
508 			continue;
509 		}
510 		if (strcmp(vbdev->name, name) == 0) {
511 			return vbdev;
512 		}
513 	}
514 	return NULL;
515 }
516 
517 /* Return matching base if parent vbdev is online */
518 struct vbdev_ocf_base *
519 vbdev_ocf_get_base_by_name(const char *name)
520 {
521 	struct vbdev_ocf *vbdev;
522 
523 	if (name == NULL) {
524 		assert(false);
525 		return NULL;
526 	}
527 
528 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
529 		if (vbdev->state.doing_finish) {
530 			continue;
531 		}
532 
533 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
534 			return &vbdev->cache;
535 		}
536 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
537 			return &vbdev->core;
538 		}
539 	}
540 	return NULL;
541 }
542 
543 /* Execute fn for each OCF device that is online or waits for base devices */
544 void
545 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
546 {
547 	struct vbdev_ocf *vbdev;
548 
549 	assert(fn != NULL);
550 
551 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
552 		if (!vbdev->state.doing_finish) {
553 			fn(vbdev, ctx);
554 		}
555 	}
556 }
557 
558 /* Called from OCF when SPDK_IO is completed */
559 static void
560 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
561 {
562 	struct spdk_bdev_io *bdev_io = io->priv1;
563 
564 	if (error == 0) {
565 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
566 	} else if (error == -ENOMEM) {
567 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
568 	} else {
569 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
570 	}
571 
572 	ocf_io_put(io);
573 }
574 
575 /* Configure io parameters and send it to OCF */
576 static int
577 io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
578 {
579 	switch (bdev_io->type) {
580 	case SPDK_BDEV_IO_TYPE_WRITE:
581 	case SPDK_BDEV_IO_TYPE_READ:
582 		ocf_core_submit_io(io);
583 		return 0;
584 	case SPDK_BDEV_IO_TYPE_FLUSH:
585 		ocf_core_submit_flush(io);
586 		return 0;
587 	case SPDK_BDEV_IO_TYPE_UNMAP:
588 		ocf_core_submit_discard(io);
589 		return 0;
590 	case SPDK_BDEV_IO_TYPE_RESET:
591 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
592 	default:
593 		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
594 		return -EINVAL;
595 	}
596 }
597 
/* Submit SPDK-IO to OCF.
 * Translates the bdev_io (block units) into an OCF io (byte units), attaches
 * the data buffers and completion callback, and submits it. On any failure
 * the bdev_io is completed here with NOMEM or FAILED. */
static void
io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
	struct ocf_io *io = NULL;
	struct bdev_ocf_data *data = NULL;
	struct vbdev_ocf_qctx *qctx = spdk_io_channel_get_ctx(ch);
	/* OCF addresses in bytes; bdev_io addresses in blocks */
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
	int dir, flags = 0;
	int err;

	/* FLUSH and UNMAP are treated as write-direction requests by OCF */
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		dir = OCF_READ;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		dir = OCF_WRITE;
		break;
	default:
		err = -EINVAL;
		goto fail;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		flags = OCF_WRITE_FLUSH;
	}

	io = ocf_core_new_io(vbdev->ocf_core, qctx->queue, offset, len, dir, 0, flags);
	if (!io) {
		err = -ENOMEM;
		goto fail;
	}

	data = vbdev_ocf_data_from_spdk_io(bdev_io);
	if (!data) {
		err = -ENOMEM;
		goto fail;
	}

	err = ocf_io_set_data(io, data, 0);
	if (err) {
		goto fail;
	}

	/* bdev_io is recovered in vbdev_ocf_io_submit_cb via io->priv1 */
	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);

	err = io_submit_to_ocf(bdev_io, io);
	if (err) {
		goto fail;
	}

	return;

fail:
	if (io) {
		ocf_io_put(io);
	}

	if (err == -ENOMEM) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
	} else {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
670 
671 static void
672 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
673 		     bool success)
674 {
675 	if (!success) {
676 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
677 		return;
678 	}
679 
680 	io_handle(ch, bdev_io);
681 }
682 
/* Called from bdev layer when an io to Cache vbdev is submitted */
static void
vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* User does not have to allocate io vectors for the request,
		 * so in case they are not allocated, we allocate them here */
		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Buffers already provided by the submitter - go straight to OCF */
		io_handle(ch, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		/* Mirrors vbdev_ocf_io_type_supported(): these types are rejected */
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
707 
708 /* Called from bdev layer */
709 static bool
710 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
711 {
712 	struct vbdev_ocf *vbdev = opaque;
713 
714 	switch (io_type) {
715 	case SPDK_BDEV_IO_TYPE_READ:
716 	case SPDK_BDEV_IO_TYPE_WRITE:
717 	case SPDK_BDEV_IO_TYPE_FLUSH:
718 	case SPDK_BDEV_IO_TYPE_UNMAP:
719 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
720 	case SPDK_BDEV_IO_TYPE_RESET:
721 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
722 	default:
723 		return false;
724 	}
725 }
726 
/* Called from bdev layer: the vbdev pointer itself is the io_device key
 * registered in finish_register(). */
static struct spdk_io_channel *
vbdev_ocf_get_io_channel(void *opaque)
{
	return spdk_get_io_channel(opaque);
}
735 
/* Fill the "driver specific" info section for this vbdev (bdev_get_bdevs).
 * Always returns 0. */
static int
vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = opaque;

	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_device", vbdev->core.name);

	/* Mode and cache line size are queried live from the OCF instance */
	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_get_cache_line_size(vbdev->ocf_cache));
	spdk_json_write_named_bool(w, "metadata_volatile",
				   vbdev->cfg.cache.metadata_volatile);

	return 0;
}
753 
/* Emit the RPC call ("bdev_ocf_create" with current parameters) that would
 * recreate this vbdev, for save_config output. */
static void
vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_ocf_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", vbdev->name);
	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_get_cache_line_size(vbdev->ocf_cache));
	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
775 
/* Cache vbdev function table
 * Used by bdev layer; installed on exp_bdev in finish_register() */
static struct spdk_bdev_fn_table cache_dev_fn_table = {
	.destruct = vbdev_ocf_destruct,
	.io_type_supported = vbdev_ocf_io_type_supported,
	.submit_request	= vbdev_ocf_submit_request,
	.get_io_channel	= vbdev_ocf_get_io_channel,
	.write_config_json = vbdev_ocf_write_json_config,
	.dump_info_json = vbdev_ocf_dump_info_json,
};
786 
787 /* Poller function for the OCF queue
788  * We execute OCF requests here synchronously */
789 static int
790 queue_poll(void *opaque)
791 {
792 	struct vbdev_ocf_qctx *qctx = opaque;
793 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
794 	int i, max = spdk_min(32, iono);
795 
796 	for (i = 0; i < max; i++) {
797 		ocf_queue_run_single(qctx->queue);
798 	}
799 
800 	if (iono > 0) {
801 		return SPDK_POLLER_BUSY;
802 	} else {
803 		return SPDK_POLLER_IDLE;
804 	}
805 }
806 
/* Called during ocf_submit_io, ocf_purge*
 * and any other requests that need to submit io.
 * Intentionally empty: the queue is drained by the SPDK poller
 * (queue_poll), so no explicit wakeup is needed. */
static void
vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
{
}
813 
/* OCF queue deinitialization
 * Called at ocf_cache_stop.
 * Releases the channels and poller held by the queue context; the context
 * itself is freed only when it was heap-allocated (the copies made in
 * io_device_destroy_cb set qctx->allocated, channel contexts do not). */
static void
vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
{
	struct vbdev_ocf_qctx *qctx = ocf_queue_get_priv(q);

	if (qctx) {
		spdk_put_io_channel(qctx->cache_ch);
		spdk_put_io_channel(qctx->core_ch);
		spdk_poller_unregister(&qctx->poller);
		if (qctx->allocated) {
			free(qctx);
		}
	}
}
830 
/* Queue ops is an interface for running queue thread.
 * stop() operation is called just before queue gets destroyed;
 * kick callbacks are no-ops because queue_poll() drains the queue. */
const struct ocf_queue_ops queue_ops = {
	.kick_sync = vbdev_ocf_ctx_queue_kick,
	.kick = vbdev_ocf_ctx_queue_kick,
	.stop = vbdev_ocf_ctx_queue_stop,
};
838 
/* Called on cache vbdev creation at every thread.
 * Allocates the per-channel OCF queue, grabs channels to both base bdevs,
 * and registers a 0-period poller that drains the queue.
 * Returns 0 on success or the vbdev_ocf_queue_create() error. */
static int
io_device_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_ocf *vbdev = io_device;
	struct vbdev_ocf_qctx *qctx = ctx_buf;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
	if (rc) {
		return rc;
	}

	/* Back-pointer so queue_ops callbacks can recover this context */
	ocf_queue_set_priv(qctx->queue, qctx);

	qctx->vbdev      = vbdev;
	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
	qctx->poller     = SPDK_POLLER_REGISTER(queue_poll, qctx, 0);

	return rc;
}
862 
/* Called per thread.
 * The channel context is about to be freed by SPDK, but the OCF queue may
 * still hold pending requests, so the context is duplicated onto the heap
 * (with allocated = true so vbdev_ocf_ctx_queue_stop() frees it) and a new
 * poller is started on the copy to finish the remaining work. */
static void
io_device_destroy_cb(void *io_device, void *ctx_buf)
{
	/* Making a copy of context to use it after io channel will be destroyed */
	struct vbdev_ocf_qctx *copy = malloc(sizeof(*copy));
	struct vbdev_ocf_qctx *qctx = ctx_buf;

	if (copy) {
		/* Repoint the queue's priv at the copy before handing work over */
		ocf_queue_set_priv(qctx->queue, copy);
		memcpy(copy, qctx, sizeof(*copy));
		spdk_poller_unregister(&qctx->poller);
		copy->poller = SPDK_POLLER_REGISTER(queue_poll, copy, 0);
		copy->allocated = true;
	} else {
		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
			    spdk_strerror(ENOMEM));
	}

	/* Drop this channel's queue reference; the last put triggers stop() */
	vbdev_ocf_queue_put(qctx->queue);
}
885 
886 /* OCF management queue deinitialization */
887 static void
888 vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
889 {
890 	struct spdk_poller *poller = ocf_queue_get_priv(q);
891 
892 	if (poller) {
893 		spdk_poller_unregister(&poller);
894 	}
895 }
896 
897 static int
898 mngt_queue_poll(void *opaque)
899 {
900 	ocf_queue_t q = opaque;
901 	uint32_t iono = ocf_queue_pending_io(q);
902 	int i, max = spdk_min(32, iono);
903 
904 	for (i = 0; i < max; i++) {
905 		ocf_queue_run_single(q);
906 	}
907 
908 	if (iono > 0) {
909 		return SPDK_POLLER_BUSY;
910 	} else {
911 		return SPDK_POLLER_IDLE;
912 	}
913 }
914 
/* Management queue kick: intentionally empty, the queue is drained by
 * the mngt_queue_poll() poller instead of explicit wakeups. */
static void
vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
{
}
919 
/* Queue ops is an interface for running queue thread.
 * stop() operation is called just before queue gets destroyed;
 * the management queue is polled, so kick is a no-op. */
const struct ocf_queue_ops mngt_queue_ops = {
	.kick_sync = NULL,
	.kick = vbdev_ocf_ctx_mngt_queue_kick,
	.stop = vbdev_ocf_ctx_mngt_queue_stop,
};
927 
/* Abort the running management path: clear the starting flag and switch
 * to the given rollback path with status rc. */
static void
vbdev_ocf_mngt_exit(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_fn *rollback_path, int rc)
{
	vbdev->state.starting = false;
	vbdev_ocf_mngt_stop(vbdev, rollback_path, rc);
}
934 
/* Create exported spdk object.
 * Final register-path step: copy the core bdev's geometry onto the exported
 * bdev, register the io_device (per-channel queues via io_device_create_cb),
 * and register the bdev itself. On registration failure the dirty rollback
 * path is started. */
static void
finish_register(struct vbdev_ocf *vbdev)
{
	int result;

	/* Copy properties of the base bdev */
	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;

	vbdev->exp_bdev.name = vbdev->name;
	vbdev->exp_bdev.product_name = "SPDK OCF";

	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
	vbdev->exp_bdev.ctxt = vbdev;
	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
	vbdev->exp_bdev.module = &ocf_if;

	/* Finally register vbdev in SPDK */
	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
				sizeof(struct vbdev_ocf_qctx), vbdev->name);
	result = spdk_bdev_register(&vbdev->exp_bdev);
	if (result) {
		SPDK_ERRLOG("Could not register exposed bdev %s\n",
			    vbdev->name);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, result);
		return;
	} else {
		vbdev->state.started = true;
	}

	vbdev_ocf_mngt_continue(vbdev, result);
}
969 
970 static void
971 add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
972 {
973 	struct vbdev_ocf *vbdev = priv;
974 
975 	ocf_mngt_cache_unlock(cache);
976 
977 	if (error) {
978 		SPDK_ERRLOG("Error %d, failed to add core device to cache instance %s,"
979 			    "starting rollback\n", error, vbdev->name);
980 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
981 		return;
982 	} else {
983 		vbdev->ocf_core = core;
984 	}
985 
986 	vbdev_ocf_mngt_continue(vbdev, error);
987 }
988 
989 /* Try to lock cache, then add core */
990 static void
991 add_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
992 {
993 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
994 
995 	if (error) {
996 		SPDK_ERRLOG("Error %d, can not lock cache instance %s,"
997 			    "starting rollback\n", error, vbdev->name);
998 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
999 	}
1000 	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
1001 }
1002 
/* Add core for existing OCF cache instance.
 * Register-path step: acquires the cache management lock asynchronously;
 * the actual add happens in add_core_cache_lock_cmpl(). */
static void
add_core(struct vbdev_ocf *vbdev)
{
	ocf_mngt_cache_lock(vbdev->ocf_cache, add_core_cache_lock_cmpl, vbdev);
}
1009 
/* Completion of cache load/attach started in start_cache().
 * Releases the cache lock; on -OCF_ERR_NO_MEM prints a sizing hint based on
 * ocf_mngt_get_ram_needed(); any error starts the dirty rollback path. */
static void
start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;
	uint64_t mem_needed;

	ocf_mngt_cache_unlock(cache);

	if (error) {
		SPDK_ERRLOG("Error %d during start cache %s, starting rollback\n",
			    error, vbdev->name);

		if (error == -OCF_ERR_NO_MEM) {
			ocf_mngt_get_ram_needed(cache, &vbdev->cfg.device, &mem_needed);

			SPDK_NOTICELOG("Try to increase hugepage memory size or cache line size. "
				       "For your configuration:\nDevice size: %"PRIu64" bytes\n"
				       "Cache line size: %"PRIu64" bytes\nFree memory needed to start "
				       "cache: %"PRIu64" bytes\n", vbdev->cache.bdev->blockcnt *
				       vbdev->cache.bdev->blocklen, vbdev->cfg.cache.cache_line_size,
				       mem_needed);
		}

		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
		return;
	}

	vbdev_ocf_mngt_continue(vbdev, error);
}
1039 
/* Create the OCF management queue, its poller (100 us period), and wire the
 * queue into the cache. The poller is stored as the queue's priv so
 * vbdev_ocf_ctx_mngt_queue_stop() can unregister it.
 * Returns 0 on success, negative errno on failure.
 * NOTE(review): when poller registration fails, the freshly created queue
 * is not released here - looks like a leak on this error path; confirm
 * whether the rollback path covers it. */
static int
create_management_queue(struct vbdev_ocf *vbdev)
{
	struct spdk_poller *mngt_poller;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		return rc;
	}

	mngt_poller = SPDK_POLLER_REGISTER(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
	if (mngt_poller == NULL) {
		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
		return -ENOMEM;
	}

	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);

	return 0;
}
1063 
1064 /* Start OCF cache, attach caching device */
/* Start OCF cache, attach caching device.
 * First register-path step. If another vbdev already runs a cache with the
 * same name, that instance and its context are shared instead of starting a
 * new one. Otherwise a new cache is started and its device is loaded
 * (loadq set - metadata recovered) or attached (fresh start). */
static void
start_cache(struct vbdev_ocf *vbdev)
{
	ocf_cache_t existing;
	int rc;

	if (is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_stop(vbdev, NULL, -EALREADY);
		return;
	}

	/* Share an already-running instance with the same cache name */
	existing = get_other_cache_instance(vbdev);
	if (existing) {
		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
			       vbdev->name, vbdev->cache.name);
		vbdev->ocf_cache = existing;
		ocf_mngt_cache_get(vbdev->ocf_cache);
		vbdev->cache_ctx = ocf_cache_get_priv(existing);
		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
	if (vbdev->cache_ctx == NULL) {
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -ENOMEM);
		return;
	}

	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);

	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache);
	if (rc) {
		SPDK_ERRLOG("Could not start cache %s: %d\n", vbdev->name, rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}
	/* Extra reference dropped in unregister_finish() */
	ocf_mngt_cache_get(vbdev->ocf_cache);

	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);

	rc = create_management_queue(vbdev);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}

	if (vbdev->cfg.loadq) {
		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	} else {
		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	}
}
1120 
1121 /* Procedures called during register operation */
/* Stages executed in order by vbdev_ocf_mngt_start() (see register_vbdev);
 * the NULL entry terminates the path. */
vbdev_ocf_mngt_fn register_path[] = {
	start_cache,	/* start (or reuse) the OCF cache instance */
	add_core,	/* presumably adds the core device to the cache -- defined elsewhere */
	finish_register,	/* final registration step -- defined elsewhere */
	NULL		/* sentinel */
};
1128 
1129 /* Start cache instance and register OCF bdev */
1130 static void
1131 register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
1132 {
1133 	int rc;
1134 
1135 	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
1136 		cb(-EPERM, vbdev, cb_arg);
1137 		return;
1138 	}
1139 
1140 	vbdev->state.starting = true;
1141 	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
1142 	if (rc) {
1143 		cb(rc, vbdev, cb_arg);
1144 	}
1145 }
1146 
1147 /* Init OCF configuration options
1148  * for core and cache devices */
static void
init_vbdev_config(struct vbdev_ocf *vbdev)
{
	struct vbdev_ocf_config *cfg = &vbdev->cfg;

	/* Cache instance is named after the vbdev; core keeps its bdev name. */
	snprintf(cfg->cache.name, sizeof(cfg->cache.name), "%s", vbdev->name);
	snprintf(cfg->core.name, sizeof(cfg->core.name), "%s", vbdev->core.name);

	/* TODO [metadata]: make configurable with persistent
	 * metadata support */
	cfg->cache.metadata_volatile = false;

	/* This are suggested values that
	 * should be sufficient for most use cases */
	cfg->cache.backfill.max_queue_size = 65536;
	cfg->cache.backfill.queue_unblock_size = 60000;

	cfg->device.perform_test = false;
	cfg->device.discard_on_start = false;

	vbdev->cfg.cache.locked = true;

	cfg->core.volume_type = SPDK_OBJECT;
	cfg->device.volume_type = SPDK_OBJECT;

	if (vbdev->cfg.loadq) {
		/* When doing cache_load(), we need to set try_add to true,
		 * otherwise OCF will interpret this core as new
		 * instead of the inactive one */
		vbdev->cfg.core.try_add = true;
	} else {
		/* When cache is initialized as new, set force flag to true,
		 * to ignore warnings about existing metadata */
		cfg->device.force = true;
	}

	/* Serialize bdev names in OCF UUID to interpret on future loads
	 * Core UUID is a triple of (core name, vbdev name, cache name)
	 * Cache UUID is cache bdev name */
	cfg->device.uuid.size = strlen(vbdev->cache.name) + 1;
	cfg->device.uuid.data = vbdev->cache.name;

	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s %s",
		 vbdev->core.name, vbdev->name, vbdev->cache.name);
	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
	cfg->core.uuid.data = vbdev->uuid;
	/* The triple was written space-separated so its total length could be
	 * measured above; now overwrite the two separators with NUL bytes,
	 * turning the buffer into three consecutive NUL-terminated strings
	 * (decoded again in metadata_probe_cores_construct()).  uuid.size set
	 * above still spans all three strings. */
	vbdev->uuid[strlen(vbdev->core.name)] = 0;
	vbdev->uuid[strlen(vbdev->core.name) + 1 + strlen(vbdev->name)] = 0;
}
1198 
1199 /* Allocate vbdev structure object and add it to the global list */
/* Allocate and configure a vbdev_ocf object for the given names and append
 * it to g_ocf_vbdev_head.  Returns 0 on success, -EPERM if the name is
 * taken, -EINVAL on bad cache mode / line size, -ENOMEM on allocation
 * failure.  On any error the partially-built object is released via
 * free_vbdev(). */
static int
init_vbdev(const char *vbdev_name,
	   const char *cache_mode_name,
	   const uint64_t cache_line_size,
	   const char *cache_name,
	   const char *core_name,
	   bool loadq)
{
	struct vbdev_ocf *vbdev;
	int rc = 0;

	/* The name must be unique among both plain bdevs and OCF vbdevs. */
	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
		return -EPERM;
	}

	vbdev = calloc(1, sizeof(*vbdev));
	if (!vbdev) {
		goto error_mem;
	}

	vbdev->cache.parent = vbdev;
	vbdev->core.parent = vbdev;
	vbdev->cache.is_cache = true;
	vbdev->core.is_cache = false;

	if (cache_mode_name) {
		vbdev->cfg.cache.cache_mode
			= ocf_get_cache_mode(cache_mode_name);
	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
		SPDK_ERRLOG("No cache mode specified\n");
		rc = -EINVAL;
		goto error_free;
	}
	/* ocf_get_cache_mode() presumably returns a negative value for an
	 * unrecognized name -- TODO confirm against OCF headers. */
	if (vbdev->cfg.cache.cache_mode < 0) {
		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
		rc = -EINVAL;
		goto error_free;
	}

	/* 0 means "use OCF's default"; a caller-supplied value is in KiB. */
	ocf_cache_line_size_t set_cache_line_size = cache_line_size ?
			(ocf_cache_line_size_t)cache_line_size * KiB :
			ocf_cache_line_size_default;
	if (set_cache_line_size == 0) {
		SPDK_ERRLOG("Cache line size should be non-zero.\n");
		rc = -EINVAL;
		goto error_free;
	}
	vbdev->cfg.device.cache_line_size = set_cache_line_size;
	vbdev->cfg.cache.cache_line_size = set_cache_line_size;

	vbdev->name = strdup(vbdev_name);
	if (!vbdev->name) {
		goto error_mem;
	}

	vbdev->cache.name = strdup(cache_name);
	if (!vbdev->cache.name) {
		goto error_mem;
	}

	vbdev->core.name = strdup(core_name);
	if (!vbdev->core.name) {
		goto error_mem;
	}

	vbdev->cfg.loadq = loadq;
	init_vbdev_config(vbdev);
	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
	return rc;

error_mem:
	rc = -ENOMEM;
error_free:
	free_vbdev(vbdev);
	return rc;
}
1277 
1278 /* Read configuration file at the start of SPDK application
1279  * This adds vbdevs to global list if some mentioned in config */
/* Module init: bring up the OCF context first, then the volume layer.
 * On volume failure the already-initialized context is torn down. */
static int
vbdev_ocf_init(void)
{
	int rc;

	rc = vbdev_ocf_ctx_init();
	if (rc != 0) {
		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", rc);
		return rc;
	}

	rc = vbdev_ocf_volume_init();
	if (rc != 0) {
		vbdev_ocf_ctx_cleanup();
		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", rc);
		return rc;
	}

	return 0;
}
1300 
1301 /* Called after application shutdown started
1302  * Release memory of allocated structures here */
1303 static void
1304 vbdev_ocf_module_fini(void)
1305 {
1306 	struct vbdev_ocf *vbdev;
1307 
1308 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1309 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1310 		free_vbdev(vbdev);
1311 	}
1312 
1313 	vbdev_ocf_volume_cleanup();
1314 	vbdev_ocf_ctx_cleanup();
1315 }
1316 
1317 /* When base device gets unpluged this is called
1318  * We will unregister cache vbdev here
1319  * When cache device is removed, we delete every OCF bdev that used it */
1320 static void
1321 hotremove_cb(struct vbdev_ocf_base *base)
1322 {
1323 	struct vbdev_ocf *vbdev;
1324 
1325 	if (!base->is_cache) {
1326 		if (base->parent->state.doing_finish) {
1327 			return;
1328 		}
1329 
1330 		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
1331 			       base->parent->name, base->name);
1332 		vbdev_ocf_delete(base->parent, NULL, NULL);
1333 		return;
1334 	}
1335 
1336 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1337 		if (vbdev->state.doing_finish) {
1338 			continue;
1339 		}
1340 		if (strcmp(base->name, vbdev->cache.name) == 0) {
1341 			SPDK_NOTICELOG("Deinitializing '%s' because"
1342 				       " its cache device '%s' was removed\n",
1343 				       vbdev->name, base->name);
1344 			vbdev_ocf_delete(vbdev, NULL, NULL);
1345 		}
1346 	}
1347 }
1348 
1349 static void
1350 base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1351 		   void *event_ctx)
1352 {
1353 	switch (type) {
1354 	case SPDK_BDEV_EVENT_REMOVE:
1355 		if (event_ctx) {
1356 			hotremove_cb(event_ctx);
1357 		}
1358 		break;
1359 	default:
1360 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1361 		break;
1362 	}
1363 }
1364 
1365 /* Open base SPDK bdev and claim it */
/* Open the base SPDK bdev, claim it for this module and grab a management
 * I/O channel.  Returns 0 on success, -EALREADY if already attached, or a
 * negative errno on failure; partial acquisitions are unwound in reverse
 * order before returning. */
static int
attach_base(struct vbdev_ocf_base *base)
{
	int status;

	if (base->attached) {
		return -EALREADY;
	}

	/* If base cache bdev was already opened by other vbdev,
	 * we just copy its descriptor here */
	if (base->is_cache) {
		struct vbdev_ocf_base *existing = get_other_cache_base(base);
		if (existing) {
			base->desc = existing->desc;
			base->management_channel = existing->management_channel;
			base->attached = true;
			return 0;
		}
	}

	status = spdk_bdev_open_ext(base->name, true, base_bdev_event_cb, base, &base->desc);
	if (status) {
		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
		return status;
	}

	/* Claim the bdev for the OCF module so no other consumer can take
	 * it; on failure close the descriptor opened above. */
	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
					     &ocf_if);
	if (status) {
		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
		spdk_bdev_close(base->desc);
		return status;
	}

	base->management_channel = spdk_bdev_get_io_channel(base->desc);
	if (!base->management_channel) {
		SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
		/* Unwind both the claim and the open taken above. */
		spdk_bdev_module_release_bdev(base->bdev);
		spdk_bdev_close(base->desc);
		return -ENOMEM;
	}

	/* Save the thread where the base device is opened */
	base->thread = spdk_get_thread();

	base->attached = true;
	return status;
}
1415 
1416 /* Attach base bdevs */
1417 static int
1418 attach_base_bdevs(struct vbdev_ocf *vbdev,
1419 		  struct spdk_bdev *cache_bdev,
1420 		  struct spdk_bdev *core_bdev)
1421 {
1422 	int rc = 0;
1423 
1424 	if (cache_bdev) {
1425 		vbdev->cache.bdev = cache_bdev;
1426 		rc |= attach_base(&vbdev->cache);
1427 	}
1428 
1429 	if (core_bdev) {
1430 		vbdev->core.bdev = core_bdev;
1431 		rc |= attach_base(&vbdev->core);
1432 	}
1433 
1434 	return rc;
1435 }
1436 
1437 /* Init and then start vbdev if all base devices are present */
1438 void
1439 vbdev_ocf_construct(const char *vbdev_name,
1440 		    const char *cache_mode_name,
1441 		    const uint64_t cache_line_size,
1442 		    const char *cache_name,
1443 		    const char *core_name,
1444 		    bool loadq,
1445 		    void (*cb)(int, struct vbdev_ocf *, void *),
1446 		    void *cb_arg)
1447 {
1448 	int rc;
1449 	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
1450 	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
1451 	struct vbdev_ocf *vbdev;
1452 
1453 	rc = init_vbdev(vbdev_name, cache_mode_name, cache_line_size, cache_name, core_name, loadq);
1454 	if (rc) {
1455 		cb(rc, NULL, cb_arg);
1456 		return;
1457 	}
1458 
1459 	vbdev = vbdev_ocf_get_by_name(vbdev_name);
1460 	if (vbdev == NULL) {
1461 		cb(-ENODEV, NULL, cb_arg);
1462 		return;
1463 	}
1464 
1465 	if (cache_bdev == NULL) {
1466 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
1467 			       vbdev->name, cache_name);
1468 	}
1469 	if (core_bdev == NULL) {
1470 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
1471 			       vbdev->name, core_name);
1472 	}
1473 
1474 	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
1475 	if (rc) {
1476 		cb(rc, vbdev, cb_arg);
1477 		return;
1478 	}
1479 
1480 	if (core_bdev && cache_bdev) {
1481 		register_vbdev(vbdev, cb, cb_arg);
1482 	} else {
1483 		cb(0, vbdev, cb_arg);
1484 	}
1485 }
1486 
1487 /* Set new cache mode on OCF cache */
1488 void
1489 vbdev_ocf_set_cache_mode(struct vbdev_ocf *vbdev,
1490 			 const char *cache_mode_name,
1491 			 void (*cb)(int, struct vbdev_ocf *, void *),
1492 			 void *cb_arg)
1493 {
1494 	ocf_cache_t cache;
1495 	ocf_cache_mode_t cache_mode;
1496 	int rc;
1497 
1498 	cache = vbdev->ocf_cache;
1499 	cache_mode = ocf_get_cache_mode(cache_mode_name);
1500 
1501 	rc = ocf_mngt_cache_trylock(cache);
1502 	if (rc) {
1503 		cb(rc, vbdev, cb_arg);
1504 		return;
1505 	}
1506 
1507 	rc = ocf_mngt_cache_set_mode(cache, cache_mode);
1508 	ocf_mngt_cache_unlock(cache);
1509 	cb(rc, vbdev, cb_arg);
1510 }
1511 
1512 /* This called if new device is created in SPDK application
1513  * If that device named as one of base bdevs of OCF vbdev,
1514  * claim and open them */
static void
vbdev_ocf_examine(struct spdk_bdev *bdev)
{
	const char *bdev_name = spdk_bdev_get_name(bdev);
	struct vbdev_ocf *vbdev;

	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
		if (vbdev->state.doing_finish) {
			continue;
		}

		/* Several vbdevs may share one cache bdev, so keep scanning
		 * after a cache-name match... */
		if (!strcmp(bdev_name, vbdev->cache.name)) {
			attach_base_bdevs(vbdev, bdev, NULL);
			continue;
		}
		/* ...but a core bdev matches at most one vbdev, so stop. */
		if (!strcmp(bdev_name, vbdev->core.name)) {
			attach_base_bdevs(vbdev, NULL, bdev);
			break;
		}
	}
	/* examine_config completes synchronously on every path. */
	spdk_bdev_module_examine_done(&ocf_if);
}
1537 
/* State shared by the asynchronous metadata-probe steps in
 * vbdev_ocf_examine_disk(); freed by examine_ctx_put() when the last
 * reference drops. */
struct metadata_probe_ctx {
	struct vbdev_ocf_base base;	/* temporary base binding for probe I/O */
	ocf_volume_t volume;		/* temporary OCF volume opened on 'base' */

	/* Core UUIDs read from cache metadata; 'uuid_count' entries, each
	 * with individually malloc()-ed 'data'. */
	struct ocf_volume_uuid *core_uuids;
	unsigned int uuid_count;

	int result;	/* first error encountered; reported on teardown */
	int refcnt;	/* outstanding async operations holding this ctx */
};
1548 
/* Thread-message trampoline: closes a bdev descriptor on the thread that
 * originally opened it (see examine_ctx_put()). */
static void
_examine_ctx_put(void *ctx)
{
	spdk_bdev_close((struct spdk_bdev_desc *)ctx);
}
1556 
/* Drop one reference on the probe context; the final reference performs
 * the whole teardown and signals examine completion. */
static void
examine_ctx_put(struct metadata_probe_ctx *ctx)
{
	unsigned int i;

	ctx->refcnt--;
	if (ctx->refcnt > 0) {
		return;
	}

	if (ctx->result) {
		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
	}

	if (ctx->base.desc) {
		/* Close the underlying bdev on its same opened thread. */
		if (ctx->base.thread && ctx->base.thread != spdk_get_thread()) {
			spdk_thread_send_msg(ctx->base.thread, _examine_ctx_put, ctx->base.desc);
		} else {
			spdk_bdev_close(ctx->base.desc);
		}
	}

	if (ctx->volume) {
		ocf_volume_destroy(ctx->volume);
	}

	/* UUID payloads were malloc()-ed one by one in
	 * metadata_probe_cores_get_num(); free(NULL) below is a no-op when
	 * the array was never allocated. */
	if (ctx->core_uuids) {
		for (i = 0; i < ctx->uuid_count; i++) {
			free(ctx->core_uuids[i].data);
		}
	}
	free(ctx->core_uuids);

	/* Report the probe outcome for this bdev, then release the ctx. */
	examine_done(ctx->result, NULL, ctx->base.bdev);
	free(ctx);
}
1595 
/* Completion callback for vbdev_ocf_construct() issued from the metadata
 * probe path: the construction result is ignored here -- we only release
 * the reference taken before the construct call. */
static void
metadata_probe_construct_cb(int rc, struct vbdev_ocf *vbdev, void *vctx)
{
	examine_ctx_put((struct metadata_probe_ctx *)vctx);
}
1603 
1604 /* This is second callback for ocf_metadata_probe_cores()
1605  * Here we create vbdev configurations based on UUIDs */
static void
metadata_probe_cores_construct(void *priv, int error, unsigned int num_cores)
{
	struct metadata_probe_ctx *ctx = priv;
	const char *vbdev_name;
	const char *core_name;
	const char *cache_name;
	unsigned int i;

	if (error) {
		ctx->result = error;
		examine_ctx_put(ctx);
		return;
	}

	for (i = 0; i < num_cores; i++) {
		/* Each core UUID holds three consecutive NUL-terminated
		 * strings -- core name, vbdev name, cache name -- as packed
		 * by init_vbdev_config(). */
		core_name = ocf_uuid_to_str(&ctx->core_uuids[i]);
		vbdev_name = core_name + strlen(core_name) + 1;
		cache_name = vbdev_name + strlen(vbdev_name) + 1;

		/* Warn when the probed bdev's name differs from the cache
		 * name recorded in the metadata (e.g. bdev was renamed). */
		if (strcmp(ctx->base.bdev->name, cache_name)) {
			SPDK_NOTICELOG("OCF metadata found on %s belongs to bdev named '%s'\n",
				       ctx->base.bdev->name, cache_name);
		}

		/* Each async construct holds its own ctx reference, released
		 * in metadata_probe_construct_cb(). */
		ctx->refcnt++;
		vbdev_ocf_construct(vbdev_name, NULL, 0, cache_name, core_name, true,
				    metadata_probe_construct_cb, ctx);
	}

	/* Drop the reference owned by this callback itself. */
	examine_ctx_put(ctx);
}
1638 
1639 /* This callback is called after OCF reads cores UUIDs from cache metadata
1640  * Here we allocate memory for those UUIDs and call ocf_metadata_probe_cores() again */
static void
metadata_probe_cores_get_num(void *priv, int error, unsigned int num_cores)
{
	struct metadata_probe_ctx *ctx = priv;
	unsigned int i;

	if (error) {
		ctx->result = error;
		examine_ctx_put(ctx);
		return;
	}

	/* Allocate one UUID slot per core; calloc zeroes the array, so on a
	 * mid-loop malloc failure examine_ctx_put() can free every entry
	 * safely (free(NULL) is a no-op for the untouched tail). */
	ctx->uuid_count = num_cores;
	ctx->core_uuids = calloc(num_cores, sizeof(struct ocf_volume_uuid));
	if (!ctx->core_uuids) {
		ctx->result = -ENOMEM;
		examine_ctx_put(ctx);
		return;
	}

	for (i = 0; i < ctx->uuid_count; i++) {
		ctx->core_uuids[i].size = OCF_VOLUME_UUID_MAX_SIZE;
		ctx->core_uuids[i].data = malloc(OCF_VOLUME_UUID_MAX_SIZE);
		if (!ctx->core_uuids[i].data) {
			ctx->result = -ENOMEM;
			examine_ctx_put(ctx);
			return;
		}
	}

	/* Second probe pass: actually read the UUIDs, then build vbdev
	 * configurations in metadata_probe_cores_construct(). */
	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, ctx->core_uuids, ctx->uuid_count,
				 metadata_probe_cores_construct, ctx);
}
1674 
1675 static void
1676 metadata_probe_cb(void *priv, int rc,
1677 		  struct ocf_metadata_probe_status *status)
1678 {
1679 	struct metadata_probe_ctx *ctx = priv;
1680 
1681 	if (rc) {
1682 		/* -ENODATA means device does not have cache metadata on it */
1683 		if (rc != -OCF_ERR_NO_METADATA) {
1684 			ctx->result = rc;
1685 		}
1686 		examine_ctx_put(ctx);
1687 		return;
1688 	}
1689 
1690 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, NULL, 0,
1691 				 metadata_probe_cores_get_num, ctx);
1692 }
1693 
1694 /* This is called after vbdev_ocf_examine
1695  * It allows to delay application initialization
1696  * until all OCF bdevs get registered
1697  * If vbdev has all of its base devices it starts asynchronously here
1698  * We first check if bdev appears in configuration,
1699  * if not we do metadata_probe() to create its configuration from bdev metadata */
static void
vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
{
	const char *bdev_name = spdk_bdev_get_name(bdev);
	struct vbdev_ocf *vbdev;
	struct metadata_probe_ctx *ctx;
	bool created_from_config = false;
	int rc;

	/* Open the examine window for this bdev; every path below balances
	 * it with a matching examine_done(). */
	examine_start(bdev);

	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
		if (vbdev->state.doing_finish || vbdev->state.started) {
			continue;
		}

		/* The extra examine_start() here is balanced by the
		 * examine_done passed to register_vbdev() as its callback;
		 * keep scanning since one cache bdev may serve many vbdevs. */
		if (!strcmp(bdev_name, vbdev->cache.name)) {
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			created_from_config = true;
			continue;
		}
		/* A core bdev matches one vbdev only: start it and finish
		 * this examine (the trailing examine_done balances the
		 * examine_start at the top of the function). */
		if (!strcmp(bdev_name, vbdev->core.name)) {
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			examine_done(0, NULL, bdev);
			return;
		}
	}

	/* If devices is discovered during config we do not check for metadata */
	if (created_from_config) {
		examine_done(0, NULL, bdev);
		return;
	}

	/* Metadata probe path
	 * We create temporary OCF volume and a temporary base structure
	 * to use them for ocf_metadata_probe() and for bottom adapter IOs
	 * Then we get UUIDs of core devices an create configurations based on them */
	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		examine_done(-ENOMEM, NULL, bdev);
		return;
	}

	/* From here on, the top-level examine_start() is balanced by the
	 * examine_done() inside examine_ctx_put() when refcnt hits zero. */
	ctx->base.bdev = bdev;
	ctx->refcnt = 1;

	rc = spdk_bdev_open_ext(bdev_name, true, base_bdev_event_cb, NULL, &ctx->base.desc);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_volume_open(ctx->volume, &ctx->base);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	/* Save the thread where the base device is opened */
	ctx->base.thread = spdk_get_thread();

	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
}
1775 
1776 static int
1777 vbdev_ocf_get_ctx_size(void)
1778 {
1779 	return sizeof(struct bdev_ocf_data);
1780 }
1781 
/* First stage of module shutdown (registered as .fini_start in ocf_if):
 * raise the global flag so other paths can tell finishing has begun. */
static void
fini_start(void)
{
	g_fini_started = true;
}
1787 
1788 /* Module-global function table
1789  * Does not relate to vbdev instances */
static struct spdk_bdev_module ocf_if = {
	.name = "ocf",
	.module_init = vbdev_ocf_init,		/* OCF ctx + volume layer setup */
	.fini_start = fini_start,		/* sets g_fini_started */
	.module_fini = vbdev_ocf_module_fini,	/* frees all vbdevs, cleans up OCF */
	.get_ctx_size = vbdev_ocf_get_ctx_size,	/* per-I/O context size */
	.examine_config = vbdev_ocf_examine,	/* sync: attach known base bdevs */
	.examine_disk   = vbdev_ocf_examine_disk,	/* async: start / metadata probe */
};
SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);
1800