xref: /spdk/module/bdev/ocf/vbdev_ocf.c (revision 9889ab2dc80e40dae92dcef361d53dcba722043d)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <ocf/ocf.h>
35 #include <ocf/ocf_types.h>
36 #include <ocf/ocf_mngt.h>
37 
38 #include "ctx.h"
39 #include "data.h"
40 #include "volume.h"
41 #include "utils.h"
42 #include "vbdev_ocf.h"
43 
44 #include "spdk/bdev_module.h"
45 #include "spdk/conf.h"
46 #include "spdk/io_channel.h"
47 #include "spdk/string.h"
48 #include "spdk_internal/log.h"
49 #include "spdk/cpuset.h"
50 
/* Forward declaration of the module descriptor; it is passed to
 * spdk_bdev_module_examine_done() and stored in exp_bdev.module below */
static struct spdk_bdev_module ocf_if;

/* List of all vbdev_ocf objects known to this module */
static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);

/* List of base bdevs whose examination was started but not yet finished */
static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);

/* While true, stop_vbdev() stops the cache even if another vbdev still uses it.
 * NOTE(review): presumably set when module shutdown begins - the writer
 * is not visible in this part of the file */
bool g_fini_started = false;
60 
/* Structure for keeping list of bdevs that are claimed but not used yet.
 * Entries are created by examine_start() and released by examine_done() */
struct examining_bdev {
	struct spdk_bdev           *bdev;	/* bdev that is being examined */
	TAILQ_ENTRY(examining_bdev) tailq;	/* link in g_ocf_examining_bdevs_head */
};
66 
67 /* Add bdev to list of claimed */
68 static void
69 examine_start(struct spdk_bdev *bdev)
70 {
71 	struct examining_bdev *entry = malloc(sizeof(*entry));
72 
73 	assert(entry);
74 	entry->bdev = bdev;
75 	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
76 }
77 
78 /* Find bdev on list of claimed bdevs, then remove it,
79  * if it was the last one on list then report examine done */
80 static void
81 examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
82 {
83 	struct spdk_bdev *bdev = cb_arg;
84 	struct examining_bdev *entry, *safe, *found = NULL;
85 
86 	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
87 		if (entry->bdev == bdev) {
88 			if (found) {
89 				goto remove;
90 			} else {
91 				found = entry;
92 			}
93 		}
94 	}
95 
96 	assert(found);
97 	spdk_bdev_module_examine_done(&ocf_if);
98 
99 remove:
100 	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
101 	free(found);
102 }
103 
104 /* Free allocated strings and structure itself
105  * Used at shutdown only */
106 static void
107 free_vbdev(struct vbdev_ocf *vbdev)
108 {
109 	if (!vbdev) {
110 		return;
111 	}
112 
113 	free(vbdev->name);
114 	free(vbdev->cache.name);
115 	free(vbdev->core.name);
116 	free(vbdev);
117 }
118 
119 /* Get existing cache base
120  * that is attached to other vbdev */
121 static struct vbdev_ocf_base *
122 get_other_cache_base(struct vbdev_ocf_base *base)
123 {
124 	struct vbdev_ocf *vbdev;
125 
126 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
127 		if (&vbdev->cache == base || !vbdev->cache.attached) {
128 			continue;
129 		}
130 		if (!strcmp(vbdev->cache.name, base->name)) {
131 			return &vbdev->cache;
132 		}
133 	}
134 
135 	return NULL;
136 }
137 
138 /* Get existing OCF cache instance
139  * that is started by other vbdev */
140 static ocf_cache_t
141 get_other_cache_instance(struct vbdev_ocf *vbdev)
142 {
143 	struct vbdev_ocf *cmp;
144 
145 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
146 		if (cmp->state.doing_finish || cmp == vbdev) {
147 			continue;
148 		}
149 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
150 			continue;
151 		}
152 		if (cmp->ocf_cache) {
153 			return cmp->ocf_cache;
154 		}
155 	}
156 
157 	return NULL;
158 }
159 
160 /* Close and unclaim base bdev */
161 static void
162 remove_base_bdev(struct vbdev_ocf_base *base)
163 {
164 	if (base->attached) {
165 		if (base->management_channel) {
166 			spdk_put_io_channel(base->management_channel);
167 		}
168 
169 		spdk_bdev_module_release_bdev(base->bdev);
170 		spdk_bdev_close(base->desc);
171 		base->attached = false;
172 	}
173 }
174 
175 /* Finish unregister operation */
176 static void
177 unregister_finish(struct vbdev_ocf *vbdev)
178 {
179 	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);
180 	vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
181 	vbdev_ocf_mngt_continue(vbdev, 0);
182 }
183 
184 static void
185 close_core_bdev(struct vbdev_ocf *vbdev)
186 {
187 	remove_base_bdev(&vbdev->core);
188 	vbdev_ocf_mngt_continue(vbdev, 0);
189 }
190 
191 static void
192 remove_core_cmpl(void *priv, int error)
193 {
194 	struct vbdev_ocf *vbdev = priv;
195 
196 	ocf_mngt_cache_unlock(vbdev->ocf_cache);
197 	vbdev_ocf_mngt_continue(vbdev, error);
198 }
199 
200 /* Try to lock cache, then remove core */
201 static void
202 remove_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
203 {
204 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
205 
206 	if (error) {
207 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
208 			    error, vbdev->name);
209 		vbdev_ocf_mngt_continue(vbdev, error);
210 		return;
211 	}
212 
213 	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
214 }
215 
216 /* Detach core base */
217 static void
218 detach_core(struct vbdev_ocf *vbdev)
219 {
220 	if (vbdev->ocf_cache && ocf_cache_is_running(vbdev->ocf_cache)) {
221 		ocf_mngt_cache_lock(vbdev->ocf_cache, remove_core_cache_lock_cmpl, vbdev);
222 	} else {
223 		vbdev_ocf_mngt_continue(vbdev, 0);
224 	}
225 }
226 
227 static void
228 close_cache_bdev(struct vbdev_ocf *vbdev)
229 {
230 	remove_base_bdev(&vbdev->cache);
231 	vbdev_ocf_mngt_continue(vbdev, 0);
232 }
233 
234 /* Detach cache base */
235 static void
236 detach_cache(struct vbdev_ocf *vbdev)
237 {
238 	vbdev->state.stop_status = vbdev->mngt_ctx.status;
239 
240 	/* If some other vbdev references this cache bdev,
241 	 * we detach this only by changing the flag, without actual close */
242 	if (get_other_cache_base(&vbdev->cache)) {
243 		vbdev->cache.attached = false;
244 	}
245 
246 	vbdev_ocf_mngt_continue(vbdev, 0);
247 }
248 
249 static void
250 stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
251 {
252 	struct vbdev_ocf *vbdev = priv;
253 
254 	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
255 	ocf_mngt_cache_unlock(cache);
256 
257 	vbdev_ocf_mngt_continue(vbdev, error);
258 }
259 
260 /* Try to lock cache, then stop it */
261 static void
262 stop_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
263 {
264 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
265 
266 	if (error) {
267 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
268 			    error, vbdev->name);
269 		vbdev_ocf_mngt_continue(vbdev, error);
270 		return;
271 	}
272 
273 	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
274 }
275 
276 /* Stop OCF cache object
277  * vbdev_ocf is not operational after this */
278 static void
279 stop_vbdev(struct vbdev_ocf *vbdev)
280 {
281 	if (!ocf_cache_is_running(vbdev->ocf_cache)) {
282 		vbdev_ocf_mngt_continue(vbdev, 0);
283 		return;
284 	}
285 
286 	if (!g_fini_started && get_other_cache_instance(vbdev)) {
287 		SPDK_NOTICELOG("Not stopping cache instance '%s'"
288 			       " because it is referenced by other OCF bdev\n",
289 			       vbdev->cache.name);
290 		vbdev_ocf_mngt_continue(vbdev, 0);
291 		return;
292 	}
293 
294 	ocf_mngt_cache_lock(vbdev->ocf_cache, stop_vbdev_cache_lock_cmpl, vbdev);
295 }
296 
297 static void
298 flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
299 {
300 	struct vbdev_ocf *vbdev = priv;
301 
302 	ocf_mngt_cache_unlock(cache);
303 	vbdev_ocf_mngt_continue(vbdev, error);
304 }
305 
306 static void
307 flush_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
308 {
309 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
310 
311 	if (error) {
312 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
313 			    error, vbdev->name);
314 		vbdev_ocf_mngt_continue(vbdev, error);
315 		return;
316 	}
317 
318 	ocf_mngt_cache_flush(vbdev->ocf_cache, flush_vbdev_cmpl, vbdev);
319 }
320 
321 static void
322 flush_vbdev(struct vbdev_ocf *vbdev)
323 {
324 	if (!ocf_cache_is_running(vbdev->ocf_cache)) {
325 		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
326 		return;
327 	}
328 
329 	ocf_mngt_cache_lock(vbdev->ocf_cache, flush_vbdev_cache_lock_cmpl, vbdev);
330 }
331 
/* Procedures called during dirty unregister.
 * The cache side is torn down before the core is removed from it;
 * the same sequence is used as the rollback path of a failed register.
 * The array is NULL-terminated */
vbdev_ocf_mngt_fn unregister_path_dirty[] = {
	flush_vbdev,		/* flush the cache (needs a running cache) */
	stop_vbdev,		/* stop the cache unless it is still shared */
	detach_cache,		/* save stop status, handle shared cache bdev */
	close_cache_bdev,	/* close and unclaim the cache bdev */
	detach_core,		/* remove core if the cache is still running */
	close_core_bdev,	/* close and unclaim the core bdev */
	unregister_finish,	/* report destruct done to the bdev layer */
	NULL
};
343 
/* Procedures called during clean unregister.
 * Unlike the dirty path, the core is removed from the cache while the
 * cache is still running, before the cache itself is stopped.
 * The array is NULL-terminated */
vbdev_ocf_mngt_fn unregister_path_clean[] = {
	flush_vbdev,		/* flush the cache (needs a running cache) */
	detach_core,		/* remove core from the running cache */
	close_core_bdev,	/* close and unclaim the core bdev */
	stop_vbdev,		/* stop the cache unless it is still shared */
	detach_cache,		/* save stop status, handle shared cache bdev */
	close_cache_bdev,	/* close and unclaim the cache bdev */
	unregister_finish,	/* report destruct done to the bdev layer */
	NULL
};
355 
356 /* Start asynchronous management operation using unregister_path */
357 static void
358 unregister_cb(void *opaque)
359 {
360 	struct vbdev_ocf *vbdev = opaque;
361 	vbdev_ocf_mngt_fn *unregister_path;
362 	int rc;
363 
364 	unregister_path = vbdev->state.doing_clean_delete ?
365 			  unregister_path_clean : unregister_path_dirty;
366 
367 	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
368 	if (rc) {
369 		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
370 		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
371 	}
372 }
373 
374 /* Clean remove case - remove core and then cache, this order
375  * will remove instance permanently */
376 static void
377 _vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
378 {
379 	if (vbdev->core.attached) {
380 		detach_core(vbdev);
381 		close_core_bdev(vbdev);
382 	}
383 
384 	if (vbdev->cache.attached) {
385 		detach_cache(vbdev);
386 		close_cache_bdev(vbdev);
387 	}
388 }
389 
390 /* Dirty shutdown/hot remove case - remove cache and then core, this order
391  * will allow us to recover this instance in the future */
392 static void
393 _vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
394 {
395 	if (vbdev->cache.attached) {
396 		detach_cache(vbdev);
397 		close_cache_bdev(vbdev);
398 	}
399 
400 	if (vbdev->core.attached) {
401 		detach_core(vbdev);
402 		close_core_bdev(vbdev);
403 	}
404 }
405 
406 /* Unregister io device with callback to unregister_cb
407  * This function is called during spdk_bdev_unregister */
408 static int
409 vbdev_ocf_destruct(void *opaque)
410 {
411 	struct vbdev_ocf *vbdev = opaque;
412 
413 	if (vbdev->state.doing_finish) {
414 		return -EALREADY;
415 	}
416 
417 	if (vbdev->state.starting && !vbdev->state.started) {
418 		/* Prevent before detach cache/core during register path of
419 		  this bdev */
420 		return -EBUSY;
421 	}
422 
423 	vbdev->state.doing_finish = true;
424 
425 	if (vbdev->state.started) {
426 		spdk_io_device_unregister(vbdev, unregister_cb);
427 		/* Return 1 because unregister is delayed */
428 		return 1;
429 	}
430 
431 	if (vbdev->state.doing_clean_delete) {
432 		_vbdev_ocf_destruct_clean(vbdev);
433 	} else {
434 		_vbdev_ocf_destruct_dirty(vbdev);
435 	}
436 
437 	return 0;
438 }
439 
440 /* Stop OCF cache and unregister SPDK bdev */
441 int
442 vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
443 {
444 	int rc = 0;
445 
446 	if (vbdev->state.started) {
447 		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
448 	} else {
449 		rc = vbdev_ocf_destruct(vbdev);
450 		if (rc == 0 && cb) {
451 			cb(cb_arg, 0);
452 		}
453 	}
454 
455 	return rc;
456 }
457 
458 /* Remove cores permanently and then stop OCF cache and unregister SPDK bdev */
459 int
460 vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
461 		       void *cb_arg)
462 {
463 	vbdev->state.doing_clean_delete = true;
464 
465 	return vbdev_ocf_delete(vbdev, cb, cb_arg);
466 }
467 
468 
469 /* If vbdev is online, return its object */
470 struct vbdev_ocf *
471 vbdev_ocf_get_by_name(const char *name)
472 {
473 	struct vbdev_ocf *vbdev;
474 
475 	if (name == NULL) {
476 		assert(false);
477 		return NULL;
478 	}
479 
480 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
481 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
482 			continue;
483 		}
484 		if (strcmp(vbdev->name, name) == 0) {
485 			return vbdev;
486 		}
487 	}
488 	return NULL;
489 }
490 
491 /* Return matching base if parent vbdev is online */
492 struct vbdev_ocf_base *
493 vbdev_ocf_get_base_by_name(const char *name)
494 {
495 	struct vbdev_ocf *vbdev;
496 
497 	if (name == NULL) {
498 		assert(false);
499 		return NULL;
500 	}
501 
502 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
503 		if (vbdev->state.doing_finish) {
504 			continue;
505 		}
506 
507 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
508 			return &vbdev->cache;
509 		}
510 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
511 			return &vbdev->core;
512 		}
513 	}
514 	return NULL;
515 }
516 
517 /* Execute fn for each OCF device that is online or waits for base devices */
518 void
519 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
520 {
521 	struct vbdev_ocf *vbdev;
522 
523 	assert(fn != NULL);
524 
525 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
526 		if (!vbdev->state.doing_finish) {
527 			fn(vbdev, ctx);
528 		}
529 	}
530 }
531 
532 /* Called from OCF when SPDK_IO is completed */
533 static void
534 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
535 {
536 	struct spdk_bdev_io *bdev_io = io->priv1;
537 
538 	if (error == 0) {
539 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
540 	} else if (error == -ENOMEM) {
541 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
542 	} else {
543 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
544 	}
545 
546 	ocf_io_put(io);
547 }
548 
549 /* Configure io parameters and send it to OCF */
550 static int
551 io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
552 {
553 	int dir;
554 	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
555 	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
556 
557 	switch (bdev_io->type) {
558 	case SPDK_BDEV_IO_TYPE_WRITE:
559 	case SPDK_BDEV_IO_TYPE_READ:
560 		dir = OCF_READ;
561 		if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
562 			dir = OCF_WRITE;
563 		}
564 		ocf_io_configure(io, offset, len, dir, 0, 0);
565 		ocf_core_submit_io(io);
566 		return 0;
567 	case SPDK_BDEV_IO_TYPE_FLUSH:
568 		ocf_io_configure(io, offset, len, OCF_WRITE, 0, OCF_WRITE_FLUSH);
569 		ocf_core_submit_flush(io);
570 		return 0;
571 	case SPDK_BDEV_IO_TYPE_UNMAP:
572 		ocf_io_configure(io, offset, len, 0, 0, 0);
573 		ocf_core_submit_discard(io);
574 		return 0;
575 	case SPDK_BDEV_IO_TYPE_RESET:
576 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
577 	default:
578 		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
579 		return -EINVAL;
580 	}
581 }
582 
583 /* Submit SPDK-IO to OCF */
584 static void
585 io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
586 {
587 	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
588 	struct ocf_io *io = NULL;
589 	struct bdev_ocf_data *data = NULL;
590 	struct vbdev_ocf_qcxt *qctx = spdk_io_channel_get_ctx(ch);
591 	int err;
592 
593 	io = ocf_core_new_io(vbdev->ocf_core);
594 	if (!io) {
595 		err = -ENOMEM;
596 		goto fail;
597 	}
598 
599 	ocf_io_set_queue(io, qctx->queue);
600 
601 	data = vbdev_ocf_data_from_spdk_io(bdev_io);
602 	if (!data) {
603 		err = -ENOMEM;
604 		goto fail;
605 	}
606 
607 	err = ocf_io_set_data(io, data, 0);
608 	if (err) {
609 		goto fail;
610 	}
611 
612 	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);
613 
614 	err = io_submit_to_ocf(bdev_io, io);
615 	if (err) {
616 		goto fail;
617 	}
618 
619 	return;
620 
621 fail:
622 	if (io) {
623 		ocf_io_put(io);
624 	}
625 
626 	if (err == -ENOMEM) {
627 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
628 	} else {
629 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
630 	}
631 }
632 
633 static void
634 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
635 		     bool success)
636 {
637 	if (!success) {
638 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
639 		return;
640 	}
641 
642 	io_handle(ch, bdev_io);
643 }
644 
645 /* Called from bdev layer when an io to Cache vbdev is submitted */
646 static void
647 vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
648 {
649 	switch (bdev_io->type) {
650 	case SPDK_BDEV_IO_TYPE_READ:
651 		/* User does not have to allocate io vectors for the request,
652 		 * so in case they are not allocated, we allocate them here */
653 		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
654 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
655 		break;
656 	case SPDK_BDEV_IO_TYPE_WRITE:
657 	case SPDK_BDEV_IO_TYPE_FLUSH:
658 	case SPDK_BDEV_IO_TYPE_UNMAP:
659 		io_handle(ch, bdev_io);
660 		break;
661 	case SPDK_BDEV_IO_TYPE_RESET:
662 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
663 	default:
664 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
665 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
666 		break;
667 	}
668 }
669 
670 /* Called from bdev layer */
671 static bool
672 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
673 {
674 	struct vbdev_ocf *vbdev = opaque;
675 
676 	switch (io_type) {
677 	case SPDK_BDEV_IO_TYPE_READ:
678 	case SPDK_BDEV_IO_TYPE_WRITE:
679 	case SPDK_BDEV_IO_TYPE_FLUSH:
680 	case SPDK_BDEV_IO_TYPE_UNMAP:
681 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
682 	case SPDK_BDEV_IO_TYPE_RESET:
683 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
684 	default:
685 		return false;
686 	}
687 }
688 
/* get_io_channel entry of the bdev function table.
 * The vbdev object itself is the io device (see spdk_io_device_register()
 * in finish_register()), so the channel is requested for that pointer */
static struct spdk_io_channel *
vbdev_ocf_get_io_channel(void *opaque)
{
	struct vbdev_ocf *io_device = opaque;
	struct spdk_io_channel *ch;

	ch = spdk_get_io_channel(io_device);

	return ch;
}
697 
698 static int
699 vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
700 {
701 	struct vbdev_ocf *vbdev = opaque;
702 
703 	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
704 	spdk_json_write_named_string(w, "core_device", vbdev->core.name);
705 
706 	spdk_json_write_named_string(w, "mode",
707 				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
708 	spdk_json_write_named_uint32(w, "cache_line_size",
709 				     ocf_cache_get_line_size(vbdev->ocf_cache));
710 	spdk_json_write_named_bool(w, "metadata_volatile",
711 				   vbdev->cfg.cache.metadata_volatile);
712 
713 	return 0;
714 }
715 
716 static void
717 vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
718 {
719 	struct vbdev_ocf *vbdev = bdev->ctxt;
720 
721 	spdk_json_write_object_begin(w);
722 
723 	spdk_json_write_named_string(w, "method", "bdev_ocf_create");
724 
725 	spdk_json_write_named_object_begin(w, "params");
726 	spdk_json_write_named_string(w, "name", vbdev->name);
727 	spdk_json_write_named_string(w, "mode",
728 				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
729 	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
730 	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
731 	spdk_json_write_object_end(w);
732 
733 	spdk_json_write_object_end(w);
734 }
735 
/* Cache vbdev function table
 * Used by bdev layer; installed into exp_bdev.fn_table by finish_register() */
static struct spdk_bdev_fn_table cache_dev_fn_table = {
	.destruct = vbdev_ocf_destruct,			/* teardown of the exposed bdev */
	.io_type_supported = vbdev_ocf_io_type_supported,	/* delegates to the core bdev */
	.submit_request	= vbdev_ocf_submit_request,	/* IO entry point */
	.get_io_channel	= vbdev_ocf_get_io_channel,	/* channel of the vbdev io device */
	.write_config_json = vbdev_ocf_write_json_config,	/* emits bdev_ocf_create */
	.dump_info_json = vbdev_ocf_dump_info_json,	/* cache/core info dump */
};
746 
747 /* Poller function for the OCF queue
748  * We execute OCF requests here synchronously */
749 static int
750 queue_poll(void *opaque)
751 {
752 	struct vbdev_ocf_qcxt *qctx = opaque;
753 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
754 	int i, max = spdk_min(32, iono);
755 
756 	for (i = 0; i < max; i++) {
757 		ocf_queue_run_single(qctx->queue);
758 	}
759 
760 	if (iono > 0) {
761 		return 1;
762 	} else {
763 		return 0;
764 	}
765 }
766 
767 /* Called during ocf_submit_io, ocf_purge*
768  * and any other requests that need to submit io */
769 static void
770 vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
771 {
772 }
773 
774 /* OCF queue deinitialization
775  * Called at ocf_cache_stop */
776 static void
777 vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
778 {
779 	struct vbdev_ocf_qcxt *qctx = ocf_queue_get_priv(q);
780 
781 	if (qctx) {
782 		spdk_put_io_channel(qctx->cache_ch);
783 		spdk_put_io_channel(qctx->core_ch);
784 		spdk_poller_unregister(&qctx->poller);
785 		if (qctx->allocated) {
786 			free(qctx);
787 		}
788 	}
789 }
790 
/* Queue ops is an interface for running queue thread.
 * The stop() operation is called just before the queue gets destroyed.
 * Both kick variants share one empty callback, because queued requests
 * are executed from the SPDK poller (queue_poll) */
const struct ocf_queue_ops queue_ops = {
	.kick_sync = vbdev_ocf_ctx_queue_kick,
	.kick = vbdev_ocf_ctx_queue_kick,
	.stop = vbdev_ocf_ctx_queue_stop,
};
798 
799 /* Called on cache vbdev creation at every thread
800  * We allocate OCF queues here and SPDK poller for it */
801 static int
802 io_device_create_cb(void *io_device, void *ctx_buf)
803 {
804 	struct vbdev_ocf *vbdev = io_device;
805 	struct vbdev_ocf_qcxt *qctx = ctx_buf;
806 	int rc;
807 
808 	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
809 	if (rc) {
810 		return rc;
811 	}
812 
813 	ocf_queue_set_priv(qctx->queue, qctx);
814 
815 	qctx->vbdev      = vbdev;
816 	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
817 	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
818 	qctx->poller     = spdk_poller_register(queue_poll, qctx, 0);
819 
820 	return rc;
821 }
822 
823 /* Called per thread
824  * Put OCF queue and relaunch poller with new context to finish pending requests */
825 static void
826 io_device_destroy_cb(void *io_device, void *ctx_buf)
827 {
828 	/* Making a copy of context to use it after io channel will be destroyed */
829 	struct vbdev_ocf_qcxt *copy = malloc(sizeof(*copy));
830 	struct vbdev_ocf_qcxt *qctx = ctx_buf;
831 
832 	if (copy) {
833 		ocf_queue_set_priv(qctx->queue, copy);
834 		memcpy(copy, qctx, sizeof(*copy));
835 		spdk_poller_unregister(&qctx->poller);
836 		copy->poller = spdk_poller_register(queue_poll, copy, 0);
837 		copy->allocated = true;
838 	} else {
839 		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
840 			    spdk_strerror(ENOMEM));
841 	}
842 
843 	vbdev_ocf_queue_put(qctx->queue);
844 }
845 
846 /* OCF management queue deinitialization */
847 static void
848 vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
849 {
850 	struct spdk_poller *poller = ocf_queue_get_priv(q);
851 
852 	if (poller) {
853 		spdk_poller_unregister(&poller);
854 	}
855 }
856 
857 static int
858 mngt_queue_poll(void *opaque)
859 {
860 	ocf_queue_t q = opaque;
861 	uint32_t iono = ocf_queue_pending_io(q);
862 	int i, max = spdk_min(32, iono);
863 
864 	for (i = 0; i < max; i++) {
865 		ocf_queue_run_single(q);
866 	}
867 
868 	if (iono > 0) {
869 		return 1;
870 	} else {
871 		return 0;
872 	}
873 }
874 
875 static void
876 vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
877 {
878 }
879 
/* Queue ops of the management queue.
 * The stop() operation is called just before the queue gets destroyed.
 * No synchronous kick is provided for this queue */
const struct ocf_queue_ops mngt_queue_ops = {
	.kick_sync = NULL,
	.kick = vbdev_ocf_ctx_mngt_queue_kick,
	.stop = vbdev_ocf_ctx_mngt_queue_stop,
};
887 
888 static void
889 clear_starting_indicator_vbdev(struct vbdev_ocf *vbdev)
890 {
891 	vbdev->state.starting = false;
892 }
893 
894 /* Create exported spdk object */
895 static void
896 finish_register(struct vbdev_ocf *vbdev)
897 {
898 	int result;
899 
900 	/* Copy properties of the base bdev */
901 	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
902 	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
903 	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;
904 
905 	vbdev->exp_bdev.name = vbdev->name;
906 	vbdev->exp_bdev.product_name = "SPDK OCF";
907 
908 	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
909 	vbdev->exp_bdev.ctxt = vbdev;
910 	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
911 	vbdev->exp_bdev.module = &ocf_if;
912 
913 	/* Finally register vbdev in SPDK */
914 	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
915 				sizeof(struct vbdev_ocf_qcxt), vbdev->name);
916 	result = spdk_bdev_register(&vbdev->exp_bdev);
917 	if (result) {
918 		SPDK_ERRLOG("Could not register exposed bdev %s\n",
919 			    vbdev->name);
920 		clear_starting_indicator_vbdev(vbdev);
921 		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, result);
922 		return;
923 	} else {
924 		vbdev->state.started = true;
925 	}
926 
927 	vbdev_ocf_mngt_continue(vbdev, result);
928 }
929 
930 static void
931 add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
932 {
933 	struct vbdev_ocf *vbdev = priv;
934 
935 	ocf_mngt_cache_unlock(cache);
936 
937 	if (error) {
938 		SPDK_ERRLOG("Error %d, failed to add core device to cache instance %s,"
939 			    "starting rollback\n", error, vbdev->name);
940 		clear_starting_indicator_vbdev(vbdev);
941 		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, error);
942 		return;
943 	} else {
944 		vbdev->ocf_core = core;
945 		vbdev->core.id  = ocf_core_get_id(core);
946 	}
947 
948 	vbdev_ocf_mngt_continue(vbdev, error);
949 }
950 
951 /* Try to lock cache, then add core */
952 static void
953 add_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
954 {
955 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
956 
957 	if (error) {
958 		SPDK_ERRLOG("Error %d, can not lock cache instance %s,"
959 			    "starting rollback\n", error, vbdev->name);
960 		clear_starting_indicator_vbdev(vbdev);
961 		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, error);
962 	}
963 	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
964 }
965 
966 /* Add core for existing OCF cache instance */
967 static void
968 add_core(struct vbdev_ocf *vbdev)
969 {
970 	ocf_mngt_cache_lock(vbdev->ocf_cache, add_core_cache_lock_cmpl, vbdev);
971 }
972 
973 static void
974 start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
975 {
976 	struct vbdev_ocf *vbdev = priv;
977 
978 	ocf_mngt_cache_unlock(cache);
979 
980 	if (error) {
981 		SPDK_ERRLOG("Error %d during start cache %s, starting rollback\n",
982 			    error, vbdev->name);
983 		clear_starting_indicator_vbdev(vbdev);
984 		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, error);
985 		return;
986 	}
987 
988 	vbdev_ocf_mngt_continue(vbdev, error);
989 }
990 
991 static int
992 create_management_queue(struct vbdev_ocf *vbdev)
993 {
994 	struct spdk_poller *mngt_poller;
995 	int rc;
996 
997 	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
998 	if (rc) {
999 		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
1000 		return rc;
1001 	}
1002 
1003 	mngt_poller = spdk_poller_register(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
1004 	if (mngt_poller == NULL) {
1005 		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
1006 		return -ENOMEM;
1007 	}
1008 
1009 	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
1010 	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);
1011 
1012 	return 0;
1013 }
1014 
/* Start OCF cache, attach caching device
 *
 * First step of the register path. One of three things happens:
 *  - the vbdev already has a cache handle: the sequence is stopped
 *    with -EALREADY and without rollback;
 *  - another vbdev already runs a cache on the same cache bdev name:
 *    that instance and its context are shared and the step completes;
 *  - otherwise a new cache context and cache instance are created, the
 *    management queue is set up and the cache device is loaded or
 *    attached asynchronously (completion: start_cache_cmpl).
 * Every failure while creating a new instance clears the starting flag
 * and starts rollback through the dirty unregister path */
static void
start_cache(struct vbdev_ocf *vbdev)
{
	ocf_cache_t existing;
	int rc;

	if (vbdev->ocf_cache) {
		vbdev_ocf_mngt_stop(vbdev, NULL, -EALREADY);
		return;
	}

	existing = get_other_cache_instance(vbdev);
	if (existing) {
		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
			       vbdev->name, vbdev->cache.name);
		vbdev->ocf_cache = existing;
		vbdev->cache.id = ocf_cache_get_id(existing);
		/* Share the context of the running instance and take a reference on it */
		vbdev->cache_ctx = ocf_cache_get_priv(existing);
		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
	if (vbdev->cache_ctx == NULL) {
		clear_starting_indicator_vbdev(vbdev);
		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, -ENOMEM);
		return;
	}

	/* Reference held by this vbdev; dropped in unregister_finish() */
	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);

	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache);
	if (rc) {
		clear_starting_indicator_vbdev(vbdev);
		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, rc);
		return;
	}

	vbdev->cache.id = ocf_cache_get_id(vbdev->ocf_cache);
	/* Lets other vbdevs find the context through ocf_cache_get_priv() */
	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);

	rc = create_management_queue(vbdev);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		clear_starting_indicator_vbdev(vbdev);
		vbdev_ocf_mngt_stop(vbdev, unregister_path_dirty, rc);
		return;
	}

	/* loadq selects loading an existing cache over attaching the device */
	if (vbdev->cfg.loadq) {
		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	} else {
		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	}
}
1073 
/* Procedures called during register operation.
 * The array is NULL-terminated */
vbdev_ocf_mngt_fn register_path[] = {
	start_cache,		/* start a new cache or join a running one */
	add_core,		/* lock the cache and add the core to it */
	finish_register,	/* register io device and exposed bdev */
	NULL
};
1081 
1082 /* Start cache instance and register OCF bdev */
1083 static void
1084 register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
1085 {
1086 	int rc;
1087 
1088 	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
1089 		cb(-EPERM, vbdev, cb_arg);
1090 		return;
1091 	}
1092 
1093 	vbdev->state.starting = true;
1094 	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
1095 	if (rc) {
1096 		cb(rc, vbdev, cb_arg);
1097 	}
1098 }
1099 
/* Init OCF configuration options
 * for core and cache devices.
 * Reads vbdev->name, vbdev->cache.name, vbdev->core.name and
 * vbdev->cfg.loadq, which have to be set before this is called */
static void
init_vbdev_config(struct vbdev_ocf *vbdev)
{
	struct vbdev_ocf_config *cfg = &vbdev->cfg;

	/* Id 0 means OCF decides the id */
	cfg->cache.id = 0;
	cfg->cache.name = vbdev->name;

	/* TODO [metadata]: make configurable with persistent
	 * metadata support */
	cfg->cache.metadata_volatile = false;

	/* TODO [cache line size]: make cache line size configurable
	 * Using standard 4KiB for now */
	cfg->cache.cache_line_size = ocf_cache_line_size_4;

	/* These are suggested values that
	 * should be sufficient for most use cases */
	cfg->cache.backfill.max_queue_size = 65536;
	cfg->cache.backfill.queue_unblock_size = 60000;

	/* TODO [cache line size] */
	cfg->device.cache_line_size = ocf_cache_line_size_4;
	cfg->device.force = true;
	cfg->device.perform_test = false;
	cfg->device.discard_on_start = false;

	/* The cache is configured as locked; the completion callbacks of the
	 * register path call ocf_mngt_cache_unlock() */
	vbdev->cfg.cache.locked = true;

	cfg->core.volume_type = SPDK_OBJECT;
	cfg->device.volume_type = SPDK_OBJECT;
	cfg->core.core_id = OCF_CORE_MAX;

	if (vbdev->cfg.loadq) {
		/* When doing cache_load(), we need to set try_add to true,
		 * otherwise OCF will interpret this core as new
		 * instead of the inactive one */
		vbdev->cfg.core.try_add = true;
	}

	/* Serialize bdev names in OCF UUID to interpret on future loads
	 * Core UUID is pair of (core bdev name, vbdev name)
	 * Cache UUID is cache bdev name */
	cfg->device.uuid.size = strlen(vbdev->cache.name) + 1;
	cfg->device.uuid.data = vbdev->cache.name;

	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s",
		 vbdev->core.name, vbdev->name);
	/* The size is taken while the two names are still joined by a space */
	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
	cfg->core.uuid.data = vbdev->uuid;
	/* Replace the separating space with a terminator, so that the buffer
	 * holds two consecutive NUL-terminated strings.
	 * NOTE(review): if snprintf() truncated its output, this index may lie
	 * beyond the written part of the buffer - confirm that the name
	 * lengths are bounded by the caller */
	vbdev->uuid[strlen(vbdev->core.name)] = 0;
}
1155 
1156 /* Allocate vbdev structure object and add it to the global list */
1157 static int
1158 init_vbdev(const char *vbdev_name,
1159 	   const char *cache_mode_name,
1160 	   const char *cache_name,
1161 	   const char *core_name,
1162 	   bool loadq)
1163 {
1164 	struct vbdev_ocf *vbdev;
1165 	int rc = 0;
1166 
1167 	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
1168 		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
1169 		return -EPERM;
1170 	}
1171 
1172 	vbdev = calloc(1, sizeof(*vbdev));
1173 	if (!vbdev) {
1174 		goto error_mem;
1175 	}
1176 
1177 	vbdev->cache.parent = vbdev;
1178 	vbdev->core.parent = vbdev;
1179 	vbdev->cache.is_cache = true;
1180 	vbdev->core.is_cache = false;
1181 
1182 	if (cache_mode_name) {
1183 		vbdev->cfg.cache.cache_mode
1184 			= ocf_get_cache_mode(cache_mode_name);
1185 	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
1186 		SPDK_ERRLOG("No cache mode specified\n");
1187 		rc = -EINVAL;
1188 		goto error_free;
1189 	}
1190 	if (vbdev->cfg.cache.cache_mode < 0) {
1191 		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
1192 		rc = -EINVAL;
1193 		goto error_free;
1194 	}
1195 
1196 	vbdev->name = strdup(vbdev_name);
1197 	if (!vbdev->name) {
1198 		goto error_mem;
1199 	}
1200 
1201 	vbdev->cache.name = strdup(cache_name);
1202 	if (!vbdev->cache.name) {
1203 		goto error_mem;
1204 	}
1205 
1206 	vbdev->core.name = strdup(core_name);
1207 	if (!vbdev->core.name) {
1208 		goto error_mem;
1209 	}
1210 
1211 	vbdev->cfg.loadq = loadq;
1212 	init_vbdev_config(vbdev);
1213 	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
1214 	return rc;
1215 
1216 error_mem:
1217 	rc = -ENOMEM;
1218 error_free:
1219 	free_vbdev(vbdev);
1220 	return rc;
1221 }
1222 
/* Read configuration file at the start of SPDK application
 * This adds vbdevs to global list if some mentioned in config
 *
 * Each "OCF" entry of the [OCF] section is expected to carry four values:
 * vbdev name, cache mode name, cache bdev name and core bdev name.
 * Entries with a missing value are logged and skipped.
 * NOTE(review): the returned value is the status of the last
 * init_vbdev() call only; failures of earlier entries are just logged. */
static int
vbdev_ocf_init(void)
{
	const char *vbdev_name, *modename, *cache_name, *core_name;
	struct spdk_conf_section *sp;
	int status;
	int i;

	status = vbdev_ocf_ctx_init();
	if (status != 0) {
		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", status);
		return status;
	}

	status = vbdev_ocf_volume_init();
	if (status != 0) {
		vbdev_ocf_ctx_cleanup();
		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", status);
		return status;
	}

	sp = spdk_conf_find_section(NULL, "OCF");
	if (!sp) {
		return 0;
	}

	/* Walk the entries until there is no i-th "OCF" value left */
	for (i = 0; spdk_conf_section_get_nval(sp, "OCF", i) != NULL; i++) {
		vbdev_name = spdk_conf_section_get_nmval(sp, "OCF", i, 0);
		if (vbdev_name == NULL) {
			SPDK_ERRLOG("No vbdev name specified\n");
			continue;
		}

		modename = spdk_conf_section_get_nmval(sp, "OCF", i, 1);
		if (modename == NULL) {
			SPDK_ERRLOG("No modename specified for OCF vbdev '%s'\n", vbdev_name);
			continue;
		}

		cache_name = spdk_conf_section_get_nmval(sp, "OCF", i, 2);
		if (cache_name == NULL) {
			SPDK_ERRLOG("No cache device specified for OCF vbdev '%s'\n", vbdev_name);
			continue;
		}

		core_name = spdk_conf_section_get_nmval(sp, "OCF", i, 3);
		if (core_name == NULL) {
			SPDK_ERRLOG("No core devices specified for OCF vbdev '%s'\n", vbdev_name);
			continue;
		}

		status = init_vbdev(vbdev_name, modename, cache_name, core_name, false);
		if (status != 0) {
			SPDK_ERRLOG("Config initialization failed with code: %d\n", status);
		}
	}

	return status;
}
1287 
1288 /* Called after application shutdown started
1289  * Release memory of allocated structures here */
1290 static void
1291 vbdev_ocf_module_fini(void)
1292 {
1293 	struct vbdev_ocf *vbdev;
1294 
1295 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1296 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1297 		free_vbdev(vbdev);
1298 	}
1299 
1300 	vbdev_ocf_volume_cleanup();
1301 	vbdev_ocf_ctx_cleanup();
1302 }
1303 
1304 /* When base device gets unpluged this is called
1305  * We will unregister cache vbdev here
1306  * When cache device is removed, we delete every OCF bdev that used it */
1307 static void
1308 hotremove_cb(void *ctx)
1309 {
1310 	struct vbdev_ocf_base *base = ctx;
1311 	struct vbdev_ocf *vbdev;
1312 
1313 	if (!base->is_cache) {
1314 		if (base->parent->state.doing_finish) {
1315 			return;
1316 		}
1317 
1318 		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
1319 			       base->parent->name, base->name);
1320 		vbdev_ocf_delete(base->parent, NULL, NULL);
1321 		return;
1322 	}
1323 
1324 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1325 		if (vbdev->state.doing_finish) {
1326 			continue;
1327 		}
1328 		if (strcmp(base->name, vbdev->cache.name) == 0) {
1329 			SPDK_NOTICELOG("Deinitializing '%s' because"
1330 				       " its cache device '%s' was removed\n",
1331 				       vbdev->name, base->name);
1332 			vbdev_ocf_delete(vbdev, NULL, NULL);
1333 		}
1334 	}
1335 }
1336 
1337 /* Open base SPDK bdev and claim it */
1338 static int
1339 attach_base(struct vbdev_ocf_base *base)
1340 {
1341 	int status;
1342 
1343 	if (base->attached) {
1344 		return -EALREADY;
1345 	}
1346 
1347 	/* If base cache bdev was already opened by other vbdev,
1348 	 * we just copy its descriptor here */
1349 	if (base->is_cache) {
1350 		struct vbdev_ocf_base *existing = get_other_cache_base(base);
1351 		if (existing) {
1352 			base->desc = existing->desc;
1353 			base->management_channel = existing->management_channel;
1354 			base->attached = true;
1355 			return 0;
1356 		}
1357 	}
1358 
1359 	status = spdk_bdev_open(base->bdev, true, hotremove_cb, base, &base->desc);
1360 	if (status) {
1361 		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
1362 		return status;
1363 	}
1364 
1365 	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
1366 					     &ocf_if);
1367 	if (status) {
1368 		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
1369 		spdk_bdev_close(base->desc);
1370 		return status;
1371 	}
1372 
1373 	base->management_channel = spdk_bdev_get_io_channel(base->desc);
1374 	if (!base->management_channel) {
1375 		SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
1376 		spdk_bdev_module_release_bdev(base->bdev);
1377 		spdk_bdev_close(base->desc);
1378 		return -ENOMEM;
1379 	}
1380 
1381 	base->attached = true;
1382 	return status;
1383 }
1384 
1385 /* Attach base bdevs */
1386 static int
1387 attach_base_bdevs(struct vbdev_ocf *vbdev,
1388 		  struct spdk_bdev *cache_bdev,
1389 		  struct spdk_bdev *core_bdev)
1390 {
1391 	int rc = 0;
1392 
1393 	if (cache_bdev) {
1394 		vbdev->cache.bdev = cache_bdev;
1395 		rc |= attach_base(&vbdev->cache);
1396 	}
1397 
1398 	if (core_bdev) {
1399 		vbdev->core.bdev = core_bdev;
1400 		rc |= attach_base(&vbdev->core);
1401 	}
1402 
1403 	return rc;
1404 }
1405 
1406 /* Init and then start vbdev if all base devices are present */
1407 void
1408 vbdev_ocf_construct(const char *vbdev_name,
1409 		    const char *cache_mode_name,
1410 		    const char *cache_name,
1411 		    const char *core_name,
1412 		    bool loadq,
1413 		    void (*cb)(int, struct vbdev_ocf *, void *),
1414 		    void *cb_arg)
1415 {
1416 	int rc;
1417 	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
1418 	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
1419 	struct vbdev_ocf *vbdev;
1420 
1421 	rc = init_vbdev(vbdev_name, cache_mode_name, cache_name, core_name, loadq);
1422 	if (rc) {
1423 		cb(rc, NULL, cb_arg);
1424 		return;
1425 	}
1426 
1427 	vbdev = vbdev_ocf_get_by_name(vbdev_name);
1428 	if (vbdev == NULL) {
1429 		cb(-ENODEV, NULL, cb_arg);
1430 		return;
1431 	}
1432 
1433 	if (cache_bdev == NULL) {
1434 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
1435 			       vbdev->name, cache_name);
1436 	}
1437 	if (core_bdev == NULL) {
1438 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
1439 			       vbdev->name, core_name);
1440 	}
1441 
1442 	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
1443 	if (rc) {
1444 		cb(rc, vbdev, cb_arg);
1445 		return;
1446 	}
1447 
1448 	if (core_bdev && cache_bdev) {
1449 		register_vbdev(vbdev, cb, cb_arg);
1450 	} else {
1451 		cb(0, vbdev, cb_arg);
1452 	}
1453 }
1454 
1455 /* This called if new device is created in SPDK application
1456  * If that device named as one of base bdevs of OCF vbdev,
1457  * claim and open them */
1458 static void
1459 vbdev_ocf_examine(struct spdk_bdev *bdev)
1460 {
1461 	const char *bdev_name = spdk_bdev_get_name(bdev);
1462 	struct vbdev_ocf *vbdev;
1463 
1464 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1465 		if (vbdev->state.doing_finish) {
1466 			continue;
1467 		}
1468 
1469 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1470 			attach_base_bdevs(vbdev, bdev, NULL);
1471 			continue;
1472 		}
1473 		if (!strcmp(bdev_name, vbdev->core.name)) {
1474 			attach_base_bdevs(vbdev, NULL, bdev);
1475 			break;
1476 		}
1477 	}
1478 	spdk_bdev_module_examine_done(&ocf_if);
1479 }
1480 
/* State of one metadata probe started by vbdev_ocf_examine_disk()
 * Reference counted; examine_ctx_put() releases everything
 * once the last reference is dropped */
struct metadata_probe_ctx {
	/* Temporary base structure; holds the probed bdev and its descriptor */
	struct vbdev_ocf_base base;
	/* Temporary OCF volume opened on top of base */
	ocf_volume_t volume;

	/* Array of uuid_count UUID buffers, allocated
	 * in metadata_probe_cores_get_num() */
	struct ocf_volume_uuid *core_uuids;
	unsigned int uuid_count;

	/* Error code passed to examine_done(); 0 when nothing failed */
	int result;
	/* Starts at 1; one more reference is taken for every
	 * vbdev_ocf_construct() call issued from the probe */
	int refcnt;
};
1491 
1492 static void
1493 examine_ctx_put(struct metadata_probe_ctx *ctx)
1494 {
1495 	unsigned int i;
1496 
1497 	ctx->refcnt--;
1498 	if (ctx->refcnt > 0) {
1499 		return;
1500 	}
1501 
1502 	if (ctx->result) {
1503 		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
1504 			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
1505 	}
1506 
1507 	if (ctx->base.desc) {
1508 		spdk_bdev_close(ctx->base.desc);
1509 	}
1510 
1511 	if (ctx->volume) {
1512 		ocf_volume_destroy(ctx->volume);
1513 	}
1514 
1515 	if (ctx->core_uuids) {
1516 		for (i = 0; i < ctx->uuid_count; i++) {
1517 			free(ctx->core_uuids[i].data);
1518 		}
1519 	}
1520 	free(ctx->core_uuids);
1521 
1522 	examine_done(ctx->result, NULL, ctx->base.bdev);
1523 	free(ctx);
1524 }
1525 
/* Completion callback given to vbdev_ocf_construct() by the metadata probe
 * Only drops the reference taken for that construct call;
 * rc and vbdev are not looked at */
static void
metadata_probe_construct_cb(int rc, struct vbdev_ocf *vbdev, void *vctx)
{
	examine_ctx_put(vctx);
}
1533 
1534 /* This is second callback for ocf_metadata_probe_cores()
1535  * Here we create vbdev configurations based on UUIDs */
1536 static void
1537 metadata_probe_cores_construct(void *priv, int error, unsigned int num_cores)
1538 {
1539 	struct metadata_probe_ctx *ctx = priv;
1540 	const char *vbdev_name;
1541 	const char *core_name;
1542 	unsigned int i;
1543 
1544 	if (error) {
1545 		ctx->result = error;
1546 		examine_ctx_put(ctx);
1547 		return;
1548 	}
1549 
1550 	for (i = 0; i < num_cores; i++) {
1551 		core_name = ocf_uuid_to_str(&ctx->core_uuids[i]);
1552 		vbdev_name = core_name + strlen(core_name) + 1;
1553 		ctx->refcnt++;
1554 		vbdev_ocf_construct(vbdev_name, NULL, ctx->base.bdev->name, core_name, true,
1555 				    metadata_probe_construct_cb, ctx);
1556 	}
1557 
1558 	examine_ctx_put(ctx);
1559 }
1560 
1561 /* This callback is called after OCF reads cores UUIDs from cache metadata
1562  * Here we allocate memory for those UUIDs and call ocf_metadata_probe_cores() again */
1563 static void
1564 metadata_probe_cores_get_num(void *priv, int error, unsigned int num_cores)
1565 {
1566 	struct metadata_probe_ctx *ctx = priv;
1567 	unsigned int i;
1568 
1569 	if (error) {
1570 		ctx->result = error;
1571 		examine_ctx_put(ctx);
1572 		return;
1573 	}
1574 
1575 	ctx->uuid_count = num_cores;
1576 	ctx->core_uuids = calloc(num_cores, sizeof(struct ocf_volume_uuid));
1577 	if (!ctx->core_uuids) {
1578 		ctx->result = -ENOMEM;
1579 		examine_ctx_put(ctx);
1580 		return;
1581 	}
1582 
1583 	for (i = 0; i < ctx->uuid_count; i++) {
1584 		ctx->core_uuids[i].size = OCF_VOLUME_UUID_MAX_SIZE;
1585 		ctx->core_uuids[i].data = malloc(OCF_VOLUME_UUID_MAX_SIZE);
1586 		if (!ctx->core_uuids[i].data) {
1587 			ctx->result = -ENOMEM;
1588 			examine_ctx_put(ctx);
1589 			return;
1590 		}
1591 	}
1592 
1593 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, ctx->core_uuids, ctx->uuid_count,
1594 				 metadata_probe_cores_construct, ctx);
1595 }
1596 
1597 static void
1598 metadata_probe_cb(void *priv, int rc,
1599 		  struct ocf_metadata_probe_status *status)
1600 {
1601 	struct metadata_probe_ctx *ctx = priv;
1602 
1603 	if (rc) {
1604 		/* -ENODATA means device does not have cache metadata on it */
1605 		if (rc != -OCF_ERR_NO_METADATA) {
1606 			ctx->result = rc;
1607 		}
1608 		examine_ctx_put(ctx);
1609 		return;
1610 	}
1611 
1612 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, NULL, 0,
1613 				 metadata_probe_cores_get_num, ctx);
1614 }
1615 
1616 /* This is called after vbdev_ocf_examine
1617  * It allows to delay application initialization
1618  * until all OCF bdevs get registered
1619  * If vbdev has all of its base devices it starts asynchronously here
1620  * We first check if bdev appears in configuration,
1621  * if not we do metadata_probe() to create its configuration from bdev metadata */
1622 static void
1623 vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
1624 {
1625 	const char *bdev_name = spdk_bdev_get_name(bdev);
1626 	struct vbdev_ocf *vbdev;
1627 	struct metadata_probe_ctx *ctx;
1628 	bool created_from_config = false;
1629 	int rc;
1630 
1631 	examine_start(bdev);
1632 
1633 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1634 		if (vbdev->state.doing_finish || vbdev->state.started) {
1635 			continue;
1636 		}
1637 
1638 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1639 			examine_start(bdev);
1640 			register_vbdev(vbdev, examine_done, bdev);
1641 			created_from_config = true;
1642 			continue;
1643 		}
1644 		if (!strcmp(bdev_name, vbdev->core.name)) {
1645 			examine_start(bdev);
1646 			register_vbdev(vbdev, examine_done, bdev);
1647 			examine_done(0, NULL, bdev);
1648 			return;
1649 		}
1650 	}
1651 
1652 	/* If devices is discovered during config we do not check for metadata */
1653 	if (created_from_config) {
1654 		examine_done(0, NULL, bdev);
1655 		return;
1656 	}
1657 
1658 	/* Metadata probe path
1659 	 * We create temporary OCF volume and a temporary base structure
1660 	 * to use them for ocf_metadata_probe() and for bottom adapter IOs
1661 	 * Then we get UUIDs of core devices an create configurations based on them */
1662 	ctx = calloc(1, sizeof(*ctx));
1663 	if (!ctx) {
1664 		examine_done(-ENOMEM, NULL, bdev);
1665 		return;
1666 	}
1667 
1668 	ctx->base.bdev = bdev;
1669 	ctx->refcnt = 1;
1670 
1671 	rc = spdk_bdev_open(ctx->base.bdev, true, NULL, NULL, &ctx->base.desc);
1672 	if (rc) {
1673 		ctx->result = rc;
1674 		examine_ctx_put(ctx);
1675 		return;
1676 	}
1677 
1678 	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
1679 	if (rc) {
1680 		ctx->result = rc;
1681 		examine_ctx_put(ctx);
1682 		return;
1683 	}
1684 
1685 	rc = ocf_volume_open(ctx->volume, &ctx->base);
1686 	if (rc) {
1687 		ctx->result = rc;
1688 		examine_ctx_put(ctx);
1689 		return;
1690 	}
1691 
1692 	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
1693 }
1694 
1695 static int
1696 vbdev_ocf_get_ctx_size(void)
1697 {
1698 	return sizeof(struct bdev_ocf_data);
1699 }
1700 
1701 static void
1702 fini_start(void)
1703 {
1704 	g_fini_started = true;
1705 }
1706 
/* Module-global function table
 * Does not relate to vbdev instances
 * Wires module init/fini, the per-IO context size and both examine
 * hooks (examine_config claims base bdevs, examine_disk starts vbdevs
 * or probes metadata) into the SPDK bdev layer */
static struct spdk_bdev_module ocf_if = {
	.name = "ocf",
	.module_init = vbdev_ocf_init,
	.fini_start = fini_start,
	.module_fini = vbdev_ocf_module_fini,
	.config_text = NULL,
	.get_ctx_size = vbdev_ocf_get_ctx_size,
	.examine_config = vbdev_ocf_examine,
	.examine_disk   = vbdev_ocf_examine_disk,
};
SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);

/* Log component used by this file */
SPDK_LOG_REGISTER_COMPONENT("vbdev_ocf", SPDK_TRACE_VBDEV_OCF)
1722