xref: /spdk/module/bdev/ocf/vbdev_ocf.c (revision 8efa583f13dddb1adc12dcac27022d5cf1160d90)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <ocf/ocf.h>
35 #include <ocf/ocf_types.h>
36 #include <ocf/ocf_mngt.h>
37 
38 #include "ctx.h"
39 #include "data.h"
40 #include "volume.h"
41 #include "utils.h"
42 #include "vbdev_ocf.h"
43 
44 #include "spdk/bdev_module.h"
45 #include "spdk/conf.h"
46 #include "spdk/io_channel.h"
47 #include "spdk/string.h"
48 #include "spdk_internal/log.h"
49 #include "spdk/cpuset.h"
50 
/* Module descriptor, defined near the bottom of this file (forward declared
 * here so exported bdevs and claims can reference it). */
static struct spdk_bdev_module ocf_if;

/* List of all OCF vbdevs created by this module */
static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);

/* List of base bdevs that are claimed during examine but not exposed yet */
static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);

/* Set when module shutdown begins; checked in stop_vbdev_poll() to decide
 * whether shared cache instances may be stopped. */
bool g_fini_started = false;

/* Structure for keeping list of bdevs that are claimed but not used yet */
struct examining_bdev {
	struct spdk_bdev           *bdev;   /* the claimed base bdev */
	TAILQ_ENTRY(examining_bdev) tailq;  /* link in g_ocf_examining_bdevs_head */
};
66 
/* Add bdev to list of claimed */
static void
examine_start(struct spdk_bdev *bdev)
{
	struct examining_bdev *entry = malloc(sizeof(*entry));

	/* NOTE(review): allocation failure is only caught in debug builds;
	 * with NDEBUG a NULL entry would be dereferenced below - consider
	 * an explicit error path. */
	assert(entry);
	entry->bdev = bdev;
	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
}
77 
/* Find bdev on list of claimed bdevs, then remove it,
 * if it was the last one on list then report examine done */
static void
examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
{
	struct spdk_bdev *bdev = cb_arg;
	struct examining_bdev *entry, *safe, *found = NULL;

	/* Remember the first entry matching this bdev in 'found'. If a second
	 * match exists, the bdev is still referenced by another examination,
	 * so jump straight to removing 'found' without reporting examine done. */
	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
		if (entry->bdev == bdev) {
			if (found) {
				goto remove;
			} else {
				found = entry;
			}
		}
	}

	/* Exactly one match: this was the last claim, tell the bdev layer */
	assert(found);
	spdk_bdev_module_examine_done(&ocf_if);

remove:
	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
	free(found);
}
103 
104 /* Free allocated strings and structure itself
105  * Used at shutdown only */
106 static void
107 free_vbdev(struct vbdev_ocf *vbdev)
108 {
109 	if (!vbdev) {
110 		return;
111 	}
112 
113 	free(vbdev->name);
114 	free(vbdev->cache.name);
115 	free(vbdev->core.name);
116 	free(vbdev);
117 }
118 
119 /* Get existing cache base
120  * that is attached to other vbdev */
121 static struct vbdev_ocf_base *
122 get_other_cache_base(struct vbdev_ocf_base *base)
123 {
124 	struct vbdev_ocf *vbdev;
125 
126 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
127 		if (&vbdev->cache == base || !vbdev->cache.attached) {
128 			continue;
129 		}
130 		if (!strcmp(vbdev->cache.name, base->name)) {
131 			return &vbdev->cache;
132 		}
133 	}
134 
135 	return NULL;
136 }
137 
138 /* Get existing OCF cache instance
139  * that is started by other vbdev */
140 static ocf_cache_t
141 get_other_cache_instance(struct vbdev_ocf *vbdev)
142 {
143 	struct vbdev_ocf *cmp;
144 
145 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
146 		if (cmp->state.doing_finish || cmp == vbdev) {
147 			continue;
148 		}
149 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
150 			continue;
151 		}
152 		if (cmp->ocf_cache) {
153 			return cmp->ocf_cache;
154 		}
155 	}
156 
157 	return NULL;
158 }
159 
/* Close and unclaim base bdev */
static void
remove_base_bdev(struct vbdev_ocf_base *base)
{
	if (base->attached) {
		spdk_bdev_module_release_bdev(base->bdev);
		spdk_bdev_close(base->desc);
		base->attached = false;

		/* Only the core's management channel is put here; the cache's
		 * management channel is owned by the shared cache context and
		 * is put in stop_vbdev_cmpl() instead. */
		if (base->management_channel && !base->is_cache) {
			spdk_put_io_channel(base->management_channel);
		}
	}
}
174 
/* Finish unregister operation: report the final stop status to the bdev
 * layer, drop this vbdev's reference on the shared cache context and run
 * the next (final) step of the management path. */
static void
unregister_finish(struct vbdev_ocf *vbdev)
{
	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);
	vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
183 
/* Management step: close and unclaim the core base bdev, then continue */
static void
close_core_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->core);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
190 
/* Completion callback for ocf_mngt_cache_remove_core(): releases the cache
 * lock taken in remove_core_poll() and continues the management path. */
static void
remove_core_cmpl(void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(vbdev->ocf_cache);
	vbdev_ocf_mngt_continue(vbdev, error);
}
199 
/* Try to lock cache, then remove core */
static void
remove_core_poll(struct vbdev_ocf *vbdev)
{
	int rc;

	/* Returning without continuing keeps this poll function retrying
	 * until the cache lock is acquired */
	rc = ocf_mngt_cache_trylock(vbdev->ocf_cache);
	if (rc) {
		return;
	}

	/* Lock acquired: stop polling and issue the async core removal */
	vbdev_ocf_mngt_poll(vbdev, NULL);
	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
}
214 
215 /* Detach core base */
216 static void
217 detach_core(struct vbdev_ocf *vbdev)
218 {
219 	if (vbdev->ocf_cache && ocf_cache_is_running(vbdev->ocf_cache)) {
220 		vbdev_ocf_mngt_poll(vbdev, remove_core_poll);
221 	} else {
222 		vbdev_ocf_mngt_continue(vbdev, 0);
223 	}
224 }
225 
/* Management step: close and unclaim the cache base bdev, then continue */
static void
close_cache_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->cache);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
232 
/* Detach cache base */
static void
detach_cache(struct vbdev_ocf *vbdev)
{
	/* Record the status accumulated so far as this vbdev's stop status;
	 * it is reported later by unregister_finish() */
	vbdev->state.stop_status = vbdev->mngt_ctx.status;

	/* If some other vbdev references this cache bdev,
	 * we detach this only by changing the flag, without actual close */
	if (get_other_cache_base(&vbdev->cache)) {
		vbdev->cache.attached = false;
	}

	vbdev_ocf_mngt_continue(vbdev, 0);
}
247 
/* Completion callback for ocf_mngt_cache_stop(): drops the management queue
 * reference, releases the cache lock taken in stop_vbdev_poll() and puts the
 * cache management channel acquired in start_cache(). */
static void
stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
	ocf_mngt_cache_unlock(cache);
	spdk_put_io_channel(vbdev->cache.management_channel);

	vbdev_ocf_mngt_continue(vbdev, error);
}
259 
/* Try to lock cache, then stop it */
static void
stop_vbdev_poll(struct vbdev_ocf *vbdev)
{
	/* Nothing to stop when the cache instance is not running */
	if (!ocf_cache_is_running(vbdev->ocf_cache)) {
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	/* Outside of module shutdown, a cache instance shared with another
	 * OCF bdev must stay running */
	if (!g_fini_started && get_other_cache_instance(vbdev)) {
		SPDK_NOTICELOG("Not stopping cache instance '%s'"
			       " because it is referenced by other OCF bdev\n",
			       vbdev->cache.name);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	/* Retry on the next poll until the cache lock is acquired */
	if (ocf_mngt_cache_trylock(vbdev->ocf_cache)) {
		return;
	}

	/* Lock acquired: stop polling and issue the async cache stop */
	vbdev_ocf_mngt_poll(vbdev, NULL);
	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
}
284 
/* Stop OCF cache object
 * vbdev_ocf is not operational after this.
 * Installs stop_vbdev_poll as the polled management step. */
static void
stop_vbdev(struct vbdev_ocf *vbdev)
{
	vbdev_ocf_mngt_poll(vbdev, stop_vbdev_poll);
}
292 
293 /* Wait for all OCF requests to finish */
294 static void
295 wait_for_requests_poll(struct vbdev_ocf *vbdev)
296 {
297 	if (ocf_cache_has_pending_requests(vbdev->ocf_cache)) {
298 		return;
299 	}
300 
301 	vbdev_ocf_mngt_continue(vbdev, 0);
302 }
303 
/* Start waiting for OCF requests to finish
 * Installs wait_for_requests_poll as the polled management step. */
static void
wait_for_requests(struct vbdev_ocf *vbdev)
{
	vbdev_ocf_mngt_poll(vbdev, wait_for_requests_poll);
}
310 
/* Completion callback for ocf_mngt_cache_flush(): releases the cache lock
 * taken in flush_vbdev_poll() and continues the management path. */
static void
flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(cache);
	vbdev_ocf_mngt_continue(vbdev, error);
}
319 
/* Polled management step: flush the cache once its lock can be taken.
 * Fails the step with -EINVAL when the cache is not running. */
static void
flush_vbdev_poll(struct vbdev_ocf *vbdev)
{
	if (!ocf_cache_is_running(vbdev->ocf_cache)) {
		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
		return;
	}

	/* Retry on the next poll until the cache lock is acquired */
	if (ocf_mngt_cache_trylock(vbdev->ocf_cache)) {
		return;
	}

	/* Lock acquired: stop polling and issue the async flush */
	vbdev_ocf_mngt_poll(vbdev, NULL);
	ocf_mngt_cache_flush(vbdev->ocf_cache, false, flush_vbdev_cmpl, vbdev);
}
335 
/* Management step: flush the OCF cache.
 * Installs flush_vbdev_poll as the polled management step. */
static void
flush_vbdev(struct vbdev_ocf *vbdev)
{
	vbdev_ocf_mngt_poll(vbdev, flush_vbdev_poll);
}
341 
/* Procedures called during dirty unregister (shutdown / hot remove).
 * Cache is detached before the core so the instance can be recovered later.
 * NULL terminates the path. */
vbdev_ocf_mngt_fn unregister_path_dirty[] = {
	flush_vbdev,
	wait_for_requests,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	detach_core,
	close_core_bdev,
	unregister_finish,
	NULL
};
354 
/* Procedures called during clean unregister (explicit delete).
 * Core is removed before the cache is stopped, which deletes the instance
 * permanently. NULL terminates the path. */
vbdev_ocf_mngt_fn unregister_path_clean[] = {
	flush_vbdev,
	wait_for_requests,
	detach_core,
	close_core_bdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	unregister_finish,
	NULL
};
367 
/* Start asynchronous management operation using unregister_path.
 * Invoked by spdk_io_device_unregister() once all io channels are gone.
 * Chooses the clean or dirty path based on how the delete was requested. */
static void
unregister_cb(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;
	vbdev_ocf_mngt_fn *unregister_path;
	int rc;

	unregister_path = vbdev->state.doing_clean_delete ?
			  unregister_path_clean : unregister_path_dirty;

	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
	if (rc) {
		/* Could not start the path - report failure to the bdev layer
		 * immediately instead of via unregister_finish() */
		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
	}
}
385 
/* Clean remove case - remove core and then cache, this order
 * will remove instance permanently.
 * Used only when the vbdev never reached the started state, so the
 * management steps are called synchronously here. */
static void
_vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
{
	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}

	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}
}
401 
/* Dirty shutdown/hot remove case - remove cache and then core, this order
 * will allow us to recover this instance in the future.
 * Used only when the vbdev never reached the started state, so the
 * management steps are called synchronously here. */
static void
_vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
{
	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}

	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}
}
417 
/* Unregister io device with callback to unregister_cb
 * This function is called during spdk_bdev_unregister.
 * Returns 0 on synchronous completion, 1 when completion is deferred to
 * unregister_cb(), or a negative errno when destruct cannot proceed. */
static int
vbdev_ocf_destruct(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;

	/* Already being torn down */
	if (vbdev->state.doing_finish) {
		return -EALREADY;
	}

	if (vbdev->state.starting && !vbdev->state.started) {
		/* Prevent before detach cache/core during register path of
		  this bdev */
		return -EBUSY;
	}

	vbdev->state.doing_finish = true;

	if (vbdev->state.started) {
		spdk_io_device_unregister(vbdev, unregister_cb);
		/* Return 1 because unregister is delayed */
		return 1;
	}

	/* Vbdev never started - tear down bases synchronously */
	if (vbdev->state.doing_clean_delete) {
		_vbdev_ocf_destruct_clean(vbdev);
	} else {
		_vbdev_ocf_destruct_dirty(vbdev);
	}

	return 0;
}
451 
/* Stop OCF cache and unregister SPDK bdev.
 * For a started vbdev the delete is asynchronous and 'cb' is invoked by the
 * bdev layer; otherwise destruct runs synchronously and 'cb' (if given) is
 * called here on success. Returns 0 or a negative errno. */
int
vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
{
	int rc = 0;

	if (vbdev->state.started) {
		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
	} else {
		rc = vbdev_ocf_destruct(vbdev);
		if (rc == 0 && cb) {
			cb(cb_arg, 0);
		}
	}

	return rc;
}
469 
/* Remove cores permanently and then stop OCF cache and unregister SPDK bdev.
 * Marks the vbdev for the clean unregister path, then delegates to
 * vbdev_ocf_delete(). */
int
vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
		       void *cb_arg)
{
	vbdev->state.doing_clean_delete = true;

	return vbdev_ocf_delete(vbdev, cb, cb_arg);
}
479 
480 
481 /* If vbdev is online, return its object */
482 struct vbdev_ocf *
483 vbdev_ocf_get_by_name(const char *name)
484 {
485 	struct vbdev_ocf *vbdev;
486 
487 	if (name == NULL) {
488 		assert(false);
489 		return NULL;
490 	}
491 
492 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
493 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
494 			continue;
495 		}
496 		if (strcmp(vbdev->name, name) == 0) {
497 			return vbdev;
498 		}
499 	}
500 	return NULL;
501 }
502 
503 /* Return matching base if parent vbdev is online */
504 struct vbdev_ocf_base *
505 vbdev_ocf_get_base_by_name(const char *name)
506 {
507 	struct vbdev_ocf *vbdev;
508 
509 	if (name == NULL) {
510 		assert(false);
511 		return NULL;
512 	}
513 
514 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
515 		if (vbdev->state.doing_finish) {
516 			continue;
517 		}
518 
519 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
520 			return &vbdev->cache;
521 		}
522 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
523 			return &vbdev->core;
524 		}
525 	}
526 	return NULL;
527 }
528 
529 /* Execute fn for each OCF device that is online or waits for base devices */
530 void
531 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
532 {
533 	struct vbdev_ocf *vbdev;
534 
535 	assert(fn != NULL);
536 
537 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
538 		if (!vbdev->state.doing_finish) {
539 			fn(vbdev, ctx);
540 		}
541 	}
542 }
543 
544 /* Called from OCF when SPDK_IO is completed */
545 static void
546 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
547 {
548 	struct spdk_bdev_io *bdev_io = io->priv1;
549 
550 	if (error == 0) {
551 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
552 	} else if (error == -ENOMEM) {
553 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
554 	} else {
555 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
556 	}
557 
558 	ocf_io_put(io);
559 }
560 
/* Configure io parameters and send it to OCF.
 * Converts the bdev_io's block-based range to bytes and dispatches by io
 * type. Returns 0 on successful submission, -EINVAL for unsupported types
 * (completion is then reported asynchronously via vbdev_ocf_io_submit_cb). */
static int
io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
{
	int dir;
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_READ:
		dir = OCF_READ;
		if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
			dir = OCF_WRITE;
		}
		ocf_io_configure(io, offset, len, dir, 0, 0);
		ocf_core_submit_io(io);
		return 0;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		/* Flush is submitted as a write with the OCF flush flag set */
		ocf_io_configure(io, offset, len, OCF_WRITE, 0, OCF_WRITE_FLUSH);
		ocf_core_submit_flush(io);
		return 0;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		ocf_io_configure(io, offset, len, 0, 0, 0);
		ocf_core_submit_discard(io);
		return 0;
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
		return -EINVAL;
	}
}
594 
/* Submit SPDK-IO to OCF.
 * Allocates an OCF io, binds it to this channel's OCF queue, wraps the
 * bdev_io's buffers as OCF data and submits. On any failure the bdev_io is
 * completed here with NOMEM (retryable) or FAILED. */
static void
io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
	struct ocf_io *io = NULL;
	struct bdev_ocf_data *data = NULL;
	struct vbdev_ocf_qcxt *qctx = spdk_io_channel_get_ctx(ch);
	int err;

	io = ocf_core_new_io(vbdev->ocf_core);
	if (!io) {
		err = -ENOMEM;
		goto fail;
	}

	/* Route this io through the per-channel OCF queue */
	ocf_io_set_queue(io, qctx->queue);

	data = vbdev_ocf_data_from_spdk_io(bdev_io);
	if (!data) {
		err = -ENOMEM;
		goto fail;
	}

	err = ocf_io_set_data(io, data, 0);
	if (err) {
		goto fail;
	}

	/* On OCF completion, vbdev_ocf_io_submit_cb finishes the bdev_io
	 * and puts the OCF io */
	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);

	err = io_submit_to_ocf(bdev_io, io);
	if (err) {
		goto fail;
	}

	return;

fail:
	if (io) {
		ocf_io_put(io);
	}

	if (err == -ENOMEM) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
	} else {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
644 
645 static void
646 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
647 		     bool success)
648 {
649 	if (!success) {
650 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
651 		return;
652 	}
653 
654 	io_handle(ch, bdev_io);
655 }
656 
/* Called from bdev layer when an io to Cache vbdev is submitted */
static void
vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* User does not have to allocate io vectors for the request,
		 * so in case they are not allocated, we allocate them here */
		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Writes always carry their buffers - submit directly */
		io_handle(ch, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
681 
682 /* Called from bdev layer */
683 static bool
684 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
685 {
686 	struct vbdev_ocf *vbdev = opaque;
687 
688 	switch (io_type) {
689 	case SPDK_BDEV_IO_TYPE_READ:
690 	case SPDK_BDEV_IO_TYPE_WRITE:
691 	case SPDK_BDEV_IO_TYPE_FLUSH:
692 	case SPDK_BDEV_IO_TYPE_UNMAP:
693 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
694 	case SPDK_BDEV_IO_TYPE_RESET:
695 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
696 	default:
697 		return false;
698 	}
699 }
700 
/* Called from bdev layer to obtain a per-thread io channel for this vbdev.
 * The channel context (struct vbdev_ocf_qcxt) is created by
 * io_device_create_cb(). */
static struct spdk_io_channel *
vbdev_ocf_get_io_channel(void *opaque)
{
	struct vbdev_ocf *bdev = opaque;

	return spdk_get_io_channel(bdev);
}
709 
/* Dump runtime information about this vbdev into the "get_bdevs"-style
 * JSON output. Always returns 0. */
static int
vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = opaque;

	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_device", vbdev->core.name);

	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_cache_get_line_size(vbdev->ocf_cache));
	spdk_json_write_named_bool(w, "metadata_volatile",
				   vbdev->cfg.cache.metadata_volatile);

	return 0;
}
727 
/* Emit the RPC call ("bdev_ocf_create" with current parameters) needed to
 * recreate this vbdev when the configuration is saved as JSON. */
static void
vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_ocf_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", vbdev->name);
	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
747 
/* Cache vbdev function table
 * Used by bdev layer; installed on the exported bdev in finish_register() */
static struct spdk_bdev_fn_table cache_dev_fn_table = {
	.destruct = vbdev_ocf_destruct,
	.io_type_supported = vbdev_ocf_io_type_supported,
	.submit_request	= vbdev_ocf_submit_request,
	.get_io_channel	= vbdev_ocf_get_io_channel,
	.write_config_json = vbdev_ocf_write_json_config,
	.dump_info_json = vbdev_ocf_dump_info_json,
};
758 
759 /* Poller function for the OCF queue
760  * We execute OCF requests here synchronously */
761 static int
762 queue_poll(void *opaque)
763 {
764 	struct vbdev_ocf_qcxt *qctx = opaque;
765 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
766 	int i, max = spdk_min(32, iono);
767 
768 	for (i = 0; i < max; i++) {
769 		ocf_queue_run_single(qctx->queue);
770 	}
771 
772 	if (iono > 0) {
773 		return 1;
774 	} else {
775 		return 0;
776 	}
777 }
778 
/* Called during ocf_submit_io, ocf_purge*
 * and any other requests that need to submit io.
 * Intentionally empty: requests are drained by the registered SPDK poller
 * (queue_poll), so no explicit wakeup is required. */
static void
vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
{
}
785 
/* OCF queue deinitialization
 * Called at ocf_cache_stop.
 * Releases the per-queue channel references and poller; frees the context
 * only when it was heap-allocated by io_device_destroy_cb() (allocated flag). */
static void
vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
{
	struct vbdev_ocf_qcxt *qctx = ocf_queue_get_priv(q);

	if (qctx) {
		spdk_put_io_channel(qctx->cache_ch);
		spdk_put_io_channel(qctx->core_ch);
		spdk_poller_unregister(&qctx->poller);
		if (qctx->allocated) {
			free(qctx);
		}
	}
}
802 
/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed.
 * Both kick variants are no-ops because queue_poll drains requests. */
const struct ocf_queue_ops queue_ops = {
	.kick_sync = vbdev_ocf_ctx_queue_kick,
	.kick = vbdev_ocf_ctx_queue_kick,
	.stop = vbdev_ocf_ctx_queue_stop,
};
810 
/* Called on cache vbdev creation at every thread
 * We allocate OCF queues here and SPDK poller for it.
 * Returns 0 on success or the OCF queue-creation error code. */
static int
io_device_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_ocf *vbdev = io_device;
	struct vbdev_ocf_qcxt *qctx = ctx_buf;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
	if (rc) {
		return rc;
	}

	/* Let queue_ops callbacks find this context from the queue */
	ocf_queue_set_priv(qctx->queue, qctx);

	qctx->vbdev      = vbdev;
	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
	qctx->poller     = spdk_poller_register(queue_poll, qctx, 0);

	return rc;
}
834 
/* Called per thread
 * Put OCF queue and relaunch poller with new context to finish pending requests.
 * The channel's qctx memory is owned by the io channel and disappears after
 * this callback, so a heap copy takes over as the queue's private data;
 * its 'allocated' flag makes vbdev_ocf_ctx_queue_stop() free it later. */
static void
io_device_destroy_cb(void *io_device, void *ctx_buf)
{
	/* Making a copy of context to use it after io channel will be destroyed */
	struct vbdev_ocf_qcxt *copy = malloc(sizeof(*copy));
	struct vbdev_ocf_qcxt *qctx = ctx_buf;

	if (copy) {
		ocf_queue_set_priv(qctx->queue, copy);
		memcpy(copy, qctx, sizeof(*copy));
		/* Old poller is stopped before memcpy'd pointer is replaced
		 * with a fresh poller driving the copied context */
		spdk_poller_unregister(&qctx->poller);
		copy->poller = spdk_poller_register(queue_poll, copy, 0);
		copy->allocated = true;
	} else {
		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
			    spdk_strerror(ENOMEM));
	}

	/* Drop this channel's reference; queue stop runs when it hits zero */
	vbdev_ocf_queue_put(qctx->queue);
}
857 
858 /* OCF management queue deinitialization */
859 static void
860 vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
861 {
862 	struct spdk_poller *poller = ocf_queue_get_priv(q);
863 
864 	if (poller) {
865 		spdk_poller_unregister(&poller);
866 	}
867 }
868 
869 static int
870 mngt_queue_poll(void *opaque)
871 {
872 	ocf_queue_t q = opaque;
873 	uint32_t iono = ocf_queue_pending_io(q);
874 	int i, max = spdk_min(32, iono);
875 
876 	for (i = 0; i < max; i++) {
877 		ocf_queue_run_single(q);
878 	}
879 
880 	if (iono > 0) {
881 		return 1;
882 	} else {
883 		return 0;
884 	}
885 }
886 
/* Kick for the management queue - intentionally empty, the registered
 * mngt_queue_poll poller drains the queue. */
static void
vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
{
}
891 
/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed.
 * Used for the management queue only (see create_management_queue). */
const struct ocf_queue_ops mngt_queue_ops = {
	.kick_sync = NULL,
	.kick = vbdev_ocf_ctx_mngt_queue_kick,
	.stop = vbdev_ocf_ctx_mngt_queue_stop,
};
899 
/* Create exported spdk object
 * Final step of the register path: copies base bdev properties onto the
 * exported bdev, registers it as an io device and with the bdev layer. */
static void
finish_register(struct vbdev_ocf *vbdev)
{
	int result;

	vbdev->cache.management_channel = vbdev->cache_ctx->management_channel;

	/* Copy properties of the base bdev */
	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;

	vbdev->exp_bdev.name = vbdev->name;
	vbdev->exp_bdev.product_name = "SPDK OCF";

	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
	vbdev->exp_bdev.ctxt = vbdev;
	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
	vbdev->exp_bdev.module = &ocf_if;

	/* Finally register vbdev in SPDK */
	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
				sizeof(struct vbdev_ocf_qcxt), vbdev->name);
	result = spdk_bdev_register(&vbdev->exp_bdev);
	if (result) {
		/* NOTE(review): the io device registered above is not
		 * unregistered on this failure path - verify intended. */
		SPDK_ERRLOG("Could not register exposed bdev\n");
	} else {
		vbdev->state.started = true;
	}

	vbdev_ocf_mngt_continue(vbdev, result);
}
933 
/* Completion callback for ocf_mngt_cache_add_core(): releases the cache
 * lock taken in add_core_poll(), records the new core handle on success
 * and continues the management path. */
static void
add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(cache);

	if (error) {
		SPDK_ERRLOG("Failed to add core device to cache instance\n");
	} else {
		vbdev->ocf_core = core;
		vbdev->core.id  = ocf_core_get_id(core);
	}

	/* Acquired even on error; released later by remove_base_bdev() */
	vbdev->core.management_channel = spdk_bdev_get_io_channel(vbdev->core.desc);
	vbdev_ocf_mngt_continue(vbdev, error);
}
951 
/* Try to lock cache, then add core */
static void
add_core_poll(struct vbdev_ocf *vbdev)
{
	/* Retry on the next poll until the cache lock is acquired */
	if (ocf_mngt_cache_trylock(vbdev->ocf_cache)) {
		return;
	}

	/* Lock acquired: stop polling and issue the async core add */
	vbdev_ocf_mngt_poll(vbdev, NULL);
	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
}
963 
/* Add core for existing OCF cache instance
 * Installs add_core_poll as the polled management step. */
static void
add_core(struct vbdev_ocf *vbdev)
{
	vbdev_ocf_mngt_poll(vbdev, add_core_poll);
}
970 
/* Completion callback for cache attach/load started in start_cache():
 * releases the cache lock and continues the management path. */
static void
start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(cache);

	vbdev_ocf_mngt_continue(vbdev, error);
}
980 
981 static int
982 create_management_queue(struct vbdev_ocf *vbdev)
983 {
984 	struct spdk_poller *mngt_poller;
985 	int rc;
986 
987 	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
988 	if (rc) {
989 		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
990 		return rc;
991 	}
992 
993 	mngt_poller = spdk_poller_register(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
994 	if (mngt_poller == NULL) {
995 		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
996 		return -ENOMEM;
997 	}
998 
999 	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
1000 	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);
1001 
1002 	return 0;
1003 }
1004 
/* Start OCF cache, attach caching device.
 * First step of the register path. If another vbdev already runs a cache
 * instance on the same cache bdev, that instance is shared; otherwise a new
 * cache context and instance are created and the cache device is attached
 * (or loaded, when recovering a previously-written configuration). */
static void
start_cache(struct vbdev_ocf *vbdev)
{
	ocf_cache_t existing;
	int rc;

	if (vbdev->ocf_cache) {
		vbdev->mngt_ctx.status = -EALREADY;
		vbdev_ocf_mngt_stop(vbdev);
		return;
	}

	/* Reuse a running cache instance from another vbdev when possible */
	existing = get_other_cache_instance(vbdev);
	if (existing) {
		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
			       vbdev->name, vbdev->cache.name);
		vbdev->ocf_cache = existing;
		vbdev->cache.id = ocf_cache_get_id(existing);
		vbdev->cache_ctx = ocf_cache_get_priv(existing);
		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
	if (vbdev->cache_ctx == NULL) {
		vbdev->mngt_ctx.status = -ENOMEM;
		vbdev_ocf_mngt_stop(vbdev);
		return;
	}

	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);

	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache);
	if (rc) {
		vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
		vbdev->mngt_ctx.status = rc;
		vbdev_ocf_mngt_stop(vbdev);
		return;
	}

	vbdev->cache.id = ocf_cache_get_id(vbdev->ocf_cache);
	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);

	rc = create_management_queue(vbdev);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
		vbdev->mngt_ctx.status = rc;
		vbdev_ocf_mngt_stop(vbdev);
		return;
	}

	/* Channel is shared via the cache context; put in stop_vbdev_cmpl() */
	vbdev->cache_ctx->management_channel = spdk_bdev_get_io_channel(vbdev->cache.desc);
	vbdev->cache.management_channel = vbdev->cache_ctx->management_channel;

	/* loadq: recover a previous instance; otherwise attach fresh device.
	 * Completion continues the register path via start_cache_cmpl(). */
	if (vbdev->cfg.loadq) {
		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	} else {
		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	}
}
1069 
/* Procedures called during register operation, executed in order by the
 * management framework. NULL terminates the path. */
vbdev_ocf_mngt_fn register_path[] = {
	start_cache,
	add_core,
	finish_register,
	NULL
};
1077 
/* Start cache instance and register OCF bdev.
 * Requires both bases attached and the vbdev not yet started; 'cb' is
 * invoked with the result either by the management path or directly on
 * immediate failure. */
static void
register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
{
	int rc;

	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
		cb(-EPERM, vbdev, cb_arg);
		return;
	}

	vbdev->state.starting = true;
	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
	if (rc) {
		cb(rc, vbdev, cb_arg);
	}
}
1095 
/* Init OCF configuration options
 * for core and cache devices */
static void
init_vbdev_config(struct vbdev_ocf *vbdev)
{
	struct vbdev_ocf_config *cfg = &vbdev->cfg;

	/* Id 0 means OCF decides the id */
	cfg->cache.id = 0;
	cfg->cache.name = vbdev->name;

	/* TODO [metadata]: make configurable with persistent
	 * metadata support */
	cfg->cache.metadata_volatile = false;

	/* TODO [cache line size]: make cache line size configurable
	 * Using standard 4KiB for now */
	cfg->cache.cache_line_size = ocf_cache_line_size_4;

	/* This are suggested values that
	 * should be sufficient for most use cases */
	cfg->cache.backfill.max_queue_size = 65536;
	cfg->cache.backfill.queue_unblock_size = 60000;

	/* TODO [cache line size] */
	cfg->device.cache_line_size = ocf_cache_line_size_4;
	cfg->device.force = true;
	cfg->device.min_free_ram = 0;
	cfg->device.perform_test = false;
	cfg->device.discard_on_start = false;

	vbdev->cfg.cache.locked = true;

	cfg->core.volume_type = SPDK_OBJECT;
	cfg->device.volume_type = SPDK_OBJECT;
	cfg->core.core_id = OCF_CORE_MAX;

	if (vbdev->cfg.loadq) {
		/* When doing cache_load(), we need to set try_add to true,
		 * otherwise OCF will interpret this core as new
		 * instead of the inactive one */
		vbdev->cfg.core.try_add = true;
	}

	/* Serialize bdev names in OCF UUID to interpret on future loads
	 * Core UUID is pair of (core bdev name, cache bdev name)
	 * Cache UUID is cache bdev name */
	cfg->device.uuid.size = strlen(vbdev->cache.name) + 1;
	cfg->device.uuid.data = vbdev->cache.name;

	/* NOTE(review): the comment above says (core bdev name, cache bdev
	 * name) but the second field written is vbdev->name - confirm. */
	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s",
		 vbdev->core.name, vbdev->name);
	/* uuid.size is taken before the separator is overwritten, so it
	 * covers both strings plus the trailing NUL */
	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
	cfg->core.uuid.data = vbdev->uuid;
	/* Replace the space separator with NUL: the buffer now holds two
	 * back-to-back C strings ("core\0second-name") inside one UUID blob */
	vbdev->uuid[strlen(vbdev->core.name)] = 0;
}
1152 
1153 /* Allocate vbdev structure object and add it to the global list */
1154 static int
1155 init_vbdev(const char *vbdev_name,
1156 	   const char *cache_mode_name,
1157 	   const char *cache_name,
1158 	   const char *core_name,
1159 	   bool loadq)
1160 {
1161 	struct vbdev_ocf *vbdev;
1162 	int rc = 0;
1163 
1164 	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
1165 		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
1166 		return -EPERM;
1167 	}
1168 
1169 	vbdev = calloc(1, sizeof(*vbdev));
1170 	if (!vbdev) {
1171 		goto error_mem;
1172 	}
1173 
1174 	vbdev->cache.parent = vbdev;
1175 	vbdev->core.parent = vbdev;
1176 	vbdev->cache.is_cache = true;
1177 	vbdev->core.is_cache = false;
1178 
1179 	if (cache_mode_name) {
1180 		vbdev->cfg.cache.cache_mode
1181 			= ocf_get_cache_mode(cache_mode_name);
1182 	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
1183 		SPDK_ERRLOG("No cache mode specified\n");
1184 		rc = -EINVAL;
1185 		goto error_free;
1186 	}
1187 	if (vbdev->cfg.cache.cache_mode < 0) {
1188 		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
1189 		rc = -EINVAL;
1190 		goto error_free;
1191 	}
1192 
1193 	vbdev->name = strdup(vbdev_name);
1194 	if (!vbdev->name) {
1195 		goto error_mem;
1196 	}
1197 
1198 	vbdev->cache.name = strdup(cache_name);
1199 	if (!vbdev->cache.name) {
1200 		goto error_mem;
1201 	}
1202 
1203 	vbdev->core.name = strdup(core_name);
1204 	if (!vbdev->core.name) {
1205 		goto error_mem;
1206 	}
1207 
1208 	vbdev->cfg.loadq = loadq;
1209 	init_vbdev_config(vbdev);
1210 	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
1211 	return rc;
1212 
1213 error_mem:
1214 	rc = -ENOMEM;
1215 error_free:
1216 	free_vbdev(vbdev);
1217 	return rc;
1218 }
1219 
/* Read configuration file at the start of SPDK application
 * This adds vbdevs to global list if some mentioned in config
 * Returns 0 on success or a negative error code */
static int
vbdev_ocf_init(void)
{
	const char *vbdev_name, *modename, *cache_name, *core_name;
	struct spdk_conf_section *sp;
	int status;

	/* Global OCF context must be initialized before any volume work */
	status = vbdev_ocf_ctx_init();
	if (status) {
		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", status);
		return status;
	}

	status = vbdev_ocf_volume_init();
	if (status) {
		vbdev_ocf_ctx_cleanup();
		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", status);
		return status;
	}

	/* No [OCF] config section means no statically configured vbdevs */
	sp = spdk_conf_find_section(NULL, "OCF");
	if (sp == NULL) {
		return 0;
	}

	/* Each "OCF" config line holds:
	 * <vbdev name> <cache mode> <cache bdev> <core bdev> */
	for (int i = 0; ; i++) {
		if (!spdk_conf_section_get_nval(sp, "OCF", i)) {
			break;
		}

		vbdev_name = spdk_conf_section_get_nmval(sp, "OCF", i, 0);
		if (!vbdev_name) {
			SPDK_ERRLOG("No vbdev name specified\n");
			continue;
		}

		modename = spdk_conf_section_get_nmval(sp, "OCF", i, 1);
		if (!modename) {
			SPDK_ERRLOG("No modename specified for OCF vbdev '%s'\n", vbdev_name);
			continue;
		}

		cache_name = spdk_conf_section_get_nmval(sp, "OCF", i, 2);
		if (!cache_name) {
			SPDK_ERRLOG("No cache device specified for OCF vbdev '%s'\n", vbdev_name);
			continue;
		}

		core_name = spdk_conf_section_get_nmval(sp, "OCF", i, 3);
		if (!core_name) {
			SPDK_ERRLOG("No core devices specified for OCF vbdev '%s'\n", vbdev_name);
			continue;
		}

		/* NOTE(review): only the status of the LAST config entry is
		 * returned from this function, so an earlier init_vbdev()
		 * failure can be masked by a later success - confirm whether
		 * this is intended */
		status = init_vbdev(vbdev_name, modename, cache_name, core_name, false);
		if (status) {
			SPDK_ERRLOG("Config initialization failed with code: %d\n", status);
		}
	}

	return status;
}
1284 
1285 /* Called after application shutdown started
1286  * Release memory of allocated structures here */
1287 static void
1288 vbdev_ocf_module_fini(void)
1289 {
1290 	struct vbdev_ocf *vbdev;
1291 
1292 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1293 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1294 		free_vbdev(vbdev);
1295 	}
1296 
1297 	vbdev_ocf_volume_cleanup();
1298 	vbdev_ocf_ctx_cleanup();
1299 }
1300 
/* When base device gets unpluged this is called
 * We will unregister cache vbdev here
 * When cache device is removed, we delete every OCF bdev that used it */
static void
hotremove_cb(void *ctx)
{
	struct vbdev_ocf_base *base = ctx;
	struct vbdev_ocf *vbdev;

	/* A core device belongs to exactly one vbdev - delete just that one */
	if (!base->is_cache) {
		if (base->parent->state.doing_finish) {
			return;
		}

		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
			       base->parent->name, base->name);
		vbdev_ocf_delete(base->parent, NULL, NULL);
		return;
	}

	/* A cache bdev may back multiple vbdevs (see attach_base(), which
	 * shares an open descriptor between them) - delete all that match.
	 * NOTE(review): vbdev_ocf_delete() is called inside TAILQ_FOREACH;
	 * if deletion can unlink entries from g_ocf_vbdev_head synchronously,
	 * TAILQ_FOREACH_SAFE would be required - confirm */
	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
		if (vbdev->state.doing_finish) {
			continue;
		}
		if (strcmp(base->name, vbdev->cache.name) == 0) {
			SPDK_NOTICELOG("Deinitializing '%s' because"
				       " its cache device '%s' was removed\n",
				       vbdev->name, base->name);
			vbdev_ocf_delete(vbdev, NULL, NULL);
		}
	}
}
1333 
1334 /* Open base SPDK bdev and claim it */
1335 static int
1336 attach_base(struct vbdev_ocf_base *base)
1337 {
1338 	int status;
1339 
1340 	if (base->attached) {
1341 		return -EALREADY;
1342 	}
1343 
1344 	/* If base cache bdev was already opened by other vbdev,
1345 	 * we just copy its descriptor here */
1346 	if (base->is_cache) {
1347 		struct vbdev_ocf_base *existing = get_other_cache_base(base);
1348 		if (existing) {
1349 			base->desc = existing->desc;
1350 			base->attached = true;
1351 			return 0;
1352 		}
1353 	}
1354 
1355 	status = spdk_bdev_open(base->bdev, true, hotremove_cb, base, &base->desc);
1356 	if (status) {
1357 		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
1358 		return status;
1359 	}
1360 
1361 	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
1362 					     &ocf_if);
1363 	if (status) {
1364 		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
1365 		spdk_bdev_close(base->desc);
1366 		return status;
1367 	}
1368 
1369 	base->attached = true;
1370 	return status;
1371 }
1372 
1373 /* Attach base bdevs */
1374 static int
1375 attach_base_bdevs(struct vbdev_ocf *vbdev,
1376 		  struct spdk_bdev *cache_bdev,
1377 		  struct spdk_bdev *core_bdev)
1378 {
1379 	int rc = 0;
1380 
1381 	if (cache_bdev) {
1382 		vbdev->cache.bdev = cache_bdev;
1383 		rc |= attach_base(&vbdev->cache);
1384 	}
1385 
1386 	if (core_bdev) {
1387 		vbdev->core.bdev = core_bdev;
1388 		rc |= attach_base(&vbdev->core);
1389 	}
1390 
1391 	return rc;
1392 }
1393 
/* Init and then start vbdev if all base devices are present
 * cb is always invoked exactly once: with an error and NULL vbdev if
 * initialization failed, or with the (possibly not yet started) vbdev
 * otherwise.  If a base bdev is missing, the vbdev stays on the global
 * list waiting for it to appear (see vbdev_ocf_examine()). */
void
vbdev_ocf_construct(const char *vbdev_name,
		    const char *cache_mode_name,
		    const char *cache_name,
		    const char *core_name,
		    bool loadq,
		    void (*cb)(int, struct vbdev_ocf *, void *),
		    void *cb_arg)
{
	int rc;
	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
	struct vbdev_ocf *vbdev;

	/* Allocate the vbdev, build its config and put it on the global list */
	rc = init_vbdev(vbdev_name, cache_mode_name, cache_name, core_name, loadq);
	if (rc) {
		cb(rc, NULL, cb_arg);
		return;
	}

	/* init_vbdev() inserted it into the list; look it up by name */
	vbdev = vbdev_ocf_get_by_name(vbdev_name);
	if (vbdev == NULL) {
		cb(-ENODEV, NULL, cb_arg);
		return;
	}

	if (cache_bdev == NULL) {
		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
			       vbdev->name, cache_name);
	}
	if (core_bdev == NULL) {
		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
			       vbdev->name, core_name);
	}

	/* Attach whichever base bdevs already exist */
	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
	if (rc) {
		cb(rc, vbdev, cb_arg);
		return;
	}

	/* Start asynchronously only when both bases are present;
	 * otherwise report success and wait for hotplug via examine */
	if (core_bdev && cache_bdev) {
		register_vbdev(vbdev, cb, cb_arg);
	} else {
		cb(0, vbdev, cb_arg);
	}
}
1442 
/* This called if new device is created in SPDK application
 * If that device named as one of base bdevs of OCF vbdev,
 * claim and open them
 * This is the examine_config stage - it only attaches/claims bases;
 * actual vbdev start happens later in vbdev_ocf_examine_disk() */
static void
vbdev_ocf_examine(struct spdk_bdev *bdev)
{
	const char *bdev_name = spdk_bdev_get_name(bdev);
	struct vbdev_ocf *vbdev;

	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
		if (vbdev->state.doing_finish) {
			continue;
		}

		/* continue: the same bdev may serve as cache for several
		 * vbdevs, so keep scanning after a cache match */
		if (!strcmp(bdev_name, vbdev->cache.name)) {
			attach_base_bdevs(vbdev, bdev, NULL);
			continue;
		}
		/* break: presumably a core bdev belongs to exactly one
		 * vbdev - NOTE(review): confirm this invariant */
		if (!strcmp(bdev_name, vbdev->core.name)) {
			attach_base_bdevs(vbdev, NULL, bdev);
			break;
		}
	}
	/* examine_config must always signal completion to the bdev layer */
	spdk_bdev_module_examine_done(&ocf_if);
}
1468 
/* Context of the metadata-probe path of examine_disk (see
 * vbdev_ocf_examine_disk()).  Reference-counted: released by
 * examine_ctx_put() when the last reference is dropped. */
struct metadata_probe_ctx {
	/* Temporary base structure used for bottom-adapter IOs */
	struct vbdev_ocf_base base;
	/* Temporary OCF volume opened on top of 'base' */
	ocf_volume_t volume;

	/* Core-device UUIDs read back from cache metadata */
	struct ocf_volume_uuid *core_uuids;
	unsigned int uuid_count;

	/* First error encountered during the probe (0 = success) */
	int result;
	/* Outstanding references; cleanup runs when it reaches 0 */
	int refcnt;
};
1479 
1480 static void
1481 examine_ctx_put(struct metadata_probe_ctx *ctx)
1482 {
1483 	unsigned int i;
1484 
1485 	ctx->refcnt--;
1486 	if (ctx->refcnt > 0) {
1487 		return;
1488 	}
1489 
1490 	if (ctx->result) {
1491 		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
1492 			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
1493 	}
1494 
1495 	if (ctx->base.desc) {
1496 		spdk_bdev_close(ctx->base.desc);
1497 	}
1498 
1499 	if (ctx->volume) {
1500 		ocf_volume_destroy(ctx->volume);
1501 	}
1502 
1503 	if (ctx->core_uuids) {
1504 		for (i = 0; i < ctx->uuid_count; i++) {
1505 			free(ctx->core_uuids[i].data);
1506 		}
1507 	}
1508 	free(ctx->core_uuids);
1509 
1510 	examine_done(ctx->result, NULL, ctx->base.bdev);
1511 	free(ctx);
1512 }
1513 
/* Completion callback of vbdev_ocf_construct() in the metadata probe path.
 * The constructed vbdev (if any) lives on the global list; here we only
 * drop the per-core reference taken in metadata_probe_cores_construct(). */
static void
metadata_probe_construct_cb(int rc, struct vbdev_ocf *vbdev, void *vctx)
{
	struct metadata_probe_ctx *ctx = vctx;

	examine_ctx_put(ctx);
}
1521 
/* This is second callback for ocf_metadata_probe_cores()
 * Here we create vbdev configurations based on UUIDs */
static void
metadata_probe_cores_construct(void *priv, int error, unsigned int num_cores)
{
	struct metadata_probe_ctx *ctx = priv;
	const char *vbdev_name;
	const char *core_name;
	unsigned int i;

	if (error) {
		ctx->result = error;
		examine_ctx_put(ctx);
		return;
	}

	for (i = 0; i < num_cores; i++) {
		/* The core UUID was serialized by init_vbdev_config() as two
		 * consecutive NUL-terminated strings - core bdev name followed
		 * by vbdev name - so skip past the first to get the second */
		core_name = ocf_uuid_to_str(&ctx->core_uuids[i]);
		vbdev_name = core_name + strlen(core_name) + 1;
		/* One reference per construct; released in
		 * metadata_probe_construct_cb() */
		ctx->refcnt++;
		vbdev_ocf_construct(vbdev_name, NULL, ctx->base.bdev->name, core_name, true,
				    metadata_probe_construct_cb, ctx);
	}

	/* Drop the reference held across this callback */
	examine_ctx_put(ctx);
}
1548 
/* This callback is called after OCF reads cores UUIDs from cache metadata
 * Here we allocate memory for those UUIDs and call ocf_metadata_probe_cores() again */
static void
metadata_probe_cores_get_num(void *priv, int error, unsigned int num_cores)
{
	struct metadata_probe_ctx *ctx = priv;
	unsigned int i;

	if (error) {
		ctx->result = error;
		examine_ctx_put(ctx);
		return;
	}

	ctx->uuid_count = num_cores;
	/* calloc zero-fills, so unset .data pointers stay NULL -
	 * examine_ctx_put() relies on that when freeing after a
	 * partial allocation failure */
	ctx->core_uuids = calloc(num_cores, sizeof(struct ocf_volume_uuid));
	if (!ctx->core_uuids) {
		ctx->result = -ENOMEM;
		examine_ctx_put(ctx);
		return;
	}

	/* Pre-allocate a maximum-size buffer for each core UUID */
	for (i = 0; i < ctx->uuid_count; i++) {
		ctx->core_uuids[i].size = OCF_VOLUME_UUID_MAX_SIZE;
		ctx->core_uuids[i].data = malloc(OCF_VOLUME_UUID_MAX_SIZE);
		if (!ctx->core_uuids[i].data) {
			ctx->result = -ENOMEM;
			examine_ctx_put(ctx);
			return;
		}
	}

	/* Second probe pass - this time with buffers to receive the UUIDs */
	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, ctx->core_uuids, ctx->uuid_count,
				 metadata_probe_cores_construct, ctx);
}
1584 
1585 static void
1586 metadata_probe_cb(void *priv, int rc,
1587 		  struct ocf_metadata_probe_status *status)
1588 {
1589 	struct metadata_probe_ctx *ctx = priv;
1590 
1591 	if (rc) {
1592 		/* -ENODATA means device does not have cache metadata on it */
1593 		if (rc != -ENODATA) {
1594 			ctx->result = rc;
1595 		}
1596 		examine_ctx_put(ctx);
1597 		return;
1598 	}
1599 
1600 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, NULL, 0,
1601 				 metadata_probe_cores_get_num, ctx);
1602 }
1603 
/* This is called after vbdev_ocf_examine
 * It allows to delay application initialization
 * until all OCF bdevs get registered
 * If vbdev has all of its base devices it starts asynchronously here
 * We first check if bdev appears in configuration,
 * if not we do metadata_probe() to create its configuration from bdev metadata */
static void
vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
{
	const char *bdev_name = spdk_bdev_get_name(bdev);
	struct vbdev_ocf *vbdev;
	struct metadata_probe_ctx *ctx;
	bool created_from_config = false;
	int rc;

	/* Initial examine reference; every path below must balance it
	 * with an examine_done() call - NOTE(review): examine_start()/
	 * examine_done() are defined elsewhere in this file, presumably
	 * reference counting the in-progress examine - confirm */
	examine_start(bdev);

	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
		if (vbdev->state.doing_finish || vbdev->state.started) {
			continue;
		}

		if (!strcmp(bdev_name, vbdev->cache.name)) {
			/* Extra reference per started vbdev; released when
			 * register_vbdev() completes via examine_done */
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			created_from_config = true;
			continue;
		}
		if (!strcmp(bdev_name, vbdev->core.name)) {
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			/* Release the initial reference taken above */
			examine_done(0, NULL, bdev);
			return;
		}
	}

	/* If devices is discovered during config we do not check for metadata */
	if (created_from_config) {
		examine_done(0, NULL, bdev);
		return;
	}

	/* Metadata probe path
	 * We create temporary OCF volume and a temporary base structure
	 * to use them for ocf_metadata_probe() and for bottom adapter IOs
	 * Then we get UUIDs of core devices an create configurations based on them */
	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		examine_done(-ENOMEM, NULL, bdev);
		return;
	}

	ctx->base.bdev = bdev;
	ctx->refcnt = 1;

	/* Open without a hotremove callback - this descriptor is temporary
	 * and closed by examine_ctx_put() */
	rc = spdk_bdev_open(ctx->base.bdev, true, NULL, NULL, &ctx->base.desc);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_volume_open(ctx->volume, &ctx->base);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	/* Continues asynchronously in metadata_probe_cb() */
	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
}
1682 
1683 static int
1684 vbdev_ocf_get_ctx_size(void)
1685 {
1686 	return sizeof(struct bdev_ocf_data);
1687 }
1688 
/* Called when SPDK application shutdown begins;
 * records that fact in the module-global g_fini_started flag
 * (declared elsewhere in this file) */
static void
fini_start(void)
{
	g_fini_started = true;
}
1694 
/* Module-global function table
 * Does not relate to vbdev instances */
static struct spdk_bdev_module ocf_if = {
	.name = "ocf",
	.module_init = vbdev_ocf_init,			/* config parsing + OCF ctx/volume init */
	.fini_start = fini_start,			/* flags that shutdown has begun */
	.module_fini = vbdev_ocf_module_fini,		/* frees vbdev list, cleans up OCF */
	.config_text = NULL,
	.get_ctx_size = vbdev_ocf_get_ctx_size,		/* per-IO context size */
	.examine_config = vbdev_ocf_examine,		/* claims base bdevs named in config */
	.examine_disk   = vbdev_ocf_examine_disk,	/* starts vbdevs / probes metadata */
};
1707 SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);
1708 
1709 SPDK_LOG_REGISTER_COMPONENT("vbdev_ocf", SPDK_TRACE_VBDEV_OCF)
1710