xref: /spdk/module/bdev/ocf/vbdev_ocf.c (revision 7a9ecf8284accbb14e99cf8533e561d5f9820f76)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <ocf/ocf.h>
35 #include <ocf/ocf_types.h>
36 #include <ocf/ocf_mngt.h>
37 
38 #include "ctx.h"
39 #include "data.h"
40 #include "volume.h"
41 #include "utils.h"
42 #include "vbdev_ocf.h"
43 
44 #include "spdk/bdev_module.h"
45 #include "spdk/conf.h"
46 #include "spdk/io_channel.h"
47 #include "spdk/string.h"
48 #include "spdk_internal/log.h"
49 #include "spdk/cpuset.h"
50 
51 static struct spdk_bdev_module ocf_if;
52 
53 static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
54 	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);
55 
56 static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
57 	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);
58 
59 bool g_fini_started = false;
60 
61 /* Structure for keeping a list of bdevs that are claimed but not used yet */
62 struct examining_bdev {
63 	struct spdk_bdev           *bdev;
64 	TAILQ_ENTRY(examining_bdev) tailq;
65 };
66 
67 /* Add bdev to the list of claimed bdevs */
68 static void
69 examine_start(struct spdk_bdev *bdev)
70 {
71 	struct examining_bdev *entry = malloc(sizeof(*entry));
72 
73 	assert(entry);
74 	entry->bdev = bdev;
75 	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
76 }
77 
78 /* Find bdev in the list of claimed bdevs and remove it;
79  * if it was the last entry for that bdev, report examine done */
80 static void
81 examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
82 {
83 	struct spdk_bdev *bdev = cb_arg;
84 	struct examining_bdev *entry, *safe, *found = NULL;
85 
86 	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
87 		if (entry->bdev == bdev) {
88 			if (found) {
89 				goto remove;
90 			} else {
91 				found = entry;
92 			}
93 		}
94 	}
95 
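	/* If this was the only entry for the bdev, every examine operation
	 * that claimed it has finished, so report examine done to the bdev
	 * layer; if a duplicate exists, just drop the entry found first. */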
96 	assert(found);
97 	spdk_bdev_module_examine_done(&ocf_if);
98 
99 remove:
100 	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
101 	free(found);
102 }
103 
104 /* Free allocated strings and structure itself
105  * Used at shutdown only */
106 static void
107 free_vbdev(struct vbdev_ocf *vbdev)
108 {
109 	if (!vbdev) {
110 		return;
111 	}
112 
113 	free(vbdev->name);
114 	free(vbdev->cache.name);
115 	free(vbdev->core.name);
116 	free(vbdev);
117 }
118 
119 /* Get existing cache base
120  * that is attached to another vbdev */
121 static struct vbdev_ocf_base *
122 get_other_cache_base(struct vbdev_ocf_base *base)
123 {
124 	struct vbdev_ocf *vbdev;
125 
126 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
127 		if (&vbdev->cache == base || !vbdev->cache.attached) {
128 			continue;
129 		}
130 		if (!strcmp(vbdev->cache.name, base->name)) {
131 			return &vbdev->cache;
132 		}
133 	}
134 
135 	return NULL;
136 }
137 
138 /* Get existing OCF cache instance
139  * that is started by another vbdev */
140 static ocf_cache_t
141 get_other_cache_instance(struct vbdev_ocf *vbdev)
142 {
143 	struct vbdev_ocf *cmp;
144 
145 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
146 		if (cmp->state.doing_finish || cmp == vbdev) {
147 			continue;
148 		}
149 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
150 			continue;
151 		}
152 		if (cmp->ocf_cache) {
153 			return cmp->ocf_cache;
154 		}
155 	}
156 
157 	return NULL;
158 }
159 
160 /* Close and unclaim base bdev */
161 static void
162 remove_base_bdev(struct vbdev_ocf_base *base)
163 {
164 	if (base->attached) {
165 		if (base->management_channel) {
166 			spdk_put_io_channel(base->management_channel);
167 		}
168 
169 		spdk_bdev_module_release_bdev(base->bdev);
170 		spdk_bdev_close(base->desc);
171 		base->attached = false;
172 	}
173 }
174 
175 /* Finish unregister operation */
176 static void
177 unregister_finish(struct vbdev_ocf *vbdev)
178 {
179 	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);
180 	vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
181 	vbdev_ocf_mngt_continue(vbdev, 0);
182 }
183 
184 static void
185 close_core_bdev(struct vbdev_ocf *vbdev)
186 {
187 	remove_base_bdev(&vbdev->core);
188 	vbdev_ocf_mngt_continue(vbdev, 0);
189 }
190 
191 static void
192 remove_core_cmpl(void *priv, int error)
193 {
194 	struct vbdev_ocf *vbdev = priv;
195 
196 	ocf_mngt_cache_unlock(vbdev->ocf_cache);
197 	vbdev_ocf_mngt_continue(vbdev, error);
198 }
199 
200 /* Try to lock cache, then remove core */
201 static void
202 remove_core_poll(struct vbdev_ocf *vbdev)
203 {
204 	int rc;
205 
206 	rc = ocf_mngt_cache_trylock(vbdev->ocf_cache);
207 	if (rc) {
208 		return;
209 	}
210 
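	/* Lock acquired: stop polling this step and issue the asynchronous
	 * core removal; remove_core_cmpl() unlocks the cache and resumes
	 * the management pipeline. */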
211 	vbdev_ocf_mngt_poll(vbdev, NULL);
212 	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
213 }
214 
215 /* Detach core base */
216 static void
217 detach_core(struct vbdev_ocf *vbdev)
218 {
219 	if (vbdev->ocf_cache && ocf_cache_is_running(vbdev->ocf_cache)) {
220 		vbdev_ocf_mngt_poll(vbdev, remove_core_poll);
221 	} else {
222 		vbdev_ocf_mngt_continue(vbdev, 0);
223 	}
224 }
225 
226 static void
227 close_cache_bdev(struct vbdev_ocf *vbdev)
228 {
229 	remove_base_bdev(&vbdev->cache);
230 	vbdev_ocf_mngt_continue(vbdev, 0);
231 }
232 
233 /* Detach cache base */
234 static void
235 detach_cache(struct vbdev_ocf *vbdev)
236 {
237 	vbdev->state.stop_status = vbdev->mngt_ctx.status;
238 
239 	/* If some other vbdev references this cache bdev,
240 	 * we detach it only by changing the flag, without an actual close */
241 	if (get_other_cache_base(&vbdev->cache)) {
242 		vbdev->cache.attached = false;
243 	}
244 
245 	vbdev_ocf_mngt_continue(vbdev, 0);
246 }
247 
248 static void
249 stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
250 {
251 	struct vbdev_ocf *vbdev = priv;
252 
253 	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
254 	ocf_mngt_cache_unlock(cache);
255 
256 	vbdev_ocf_mngt_continue(vbdev, error);
257 }
258 
259 /* Try to lock cache, then stop it */
260 static void
261 stop_vbdev_poll(struct vbdev_ocf *vbdev)
262 {
263 	if (!ocf_cache_is_running(vbdev->ocf_cache)) {
264 		vbdev_ocf_mngt_continue(vbdev, 0);
265 		return;
266 	}
267 
268 	if (!g_fini_started && get_other_cache_instance(vbdev)) {
269 		SPDK_NOTICELOG("Not stopping cache instance '%s'"
270 			       " because it is referenced by other OCF bdev\n",
271 			       vbdev->cache.name);
272 		vbdev_ocf_mngt_continue(vbdev, 0);
273 		return;
274 	}
275 
276 	if (ocf_mngt_cache_trylock(vbdev->ocf_cache)) {
277 		return;
278 	}
279 
280 	vbdev_ocf_mngt_poll(vbdev, NULL);
281 	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
282 }
283 
284 /* Stop OCF cache object
285  * vbdev_ocf is not operational after this */
286 static void
287 stop_vbdev(struct vbdev_ocf *vbdev)
288 {
289 	vbdev_ocf_mngt_poll(vbdev, stop_vbdev_poll);
290 }
291 
292 /* Wait for all OCF requests to finish */
293 static void
294 wait_for_requests_poll(struct vbdev_ocf *vbdev)
295 {
296 	if (ocf_cache_has_pending_requests(vbdev->ocf_cache)) {
297 		return;
298 	}
299 
300 	vbdev_ocf_mngt_continue(vbdev, 0);
301 }
302 
303 /* Start waiting for OCF requests to finish */
304 static void
305 wait_for_requests(struct vbdev_ocf *vbdev)
306 {
307 	vbdev_ocf_mngt_poll(vbdev, wait_for_requests_poll);
308 }
309 
310 static void
311 flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
312 {
313 	struct vbdev_ocf *vbdev = priv;
314 
315 	ocf_mngt_cache_unlock(cache);
316 	vbdev_ocf_mngt_continue(vbdev, error);
317 }
318 
319 static void
320 flush_vbdev_poll(struct vbdev_ocf *vbdev)
321 {
322 	if (!ocf_cache_is_running(vbdev->ocf_cache)) {
323 		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
324 		return;
325 	}
326 
327 	if (ocf_mngt_cache_trylock(vbdev->ocf_cache)) {
328 		return;
329 	}
330 
331 	vbdev_ocf_mngt_poll(vbdev, NULL);
332 	ocf_mngt_cache_flush(vbdev->ocf_cache, false, flush_vbdev_cmpl, vbdev);
333 }
334 
335 static void
336 flush_vbdev(struct vbdev_ocf *vbdev)
337 {
338 	vbdev_ocf_mngt_poll(vbdev, flush_vbdev_poll);
339 }
340 
341 /* Procedures called during dirty unregister */
342 vbdev_ocf_mngt_fn unregister_path_dirty[] = {
343 	flush_vbdev,
344 	wait_for_requests,
345 	stop_vbdev,
346 	detach_cache,
347 	close_cache_bdev,
348 	detach_core,
349 	close_core_bdev,
350 	unregister_finish,
351 	NULL
352 };
353 
354 /* Procedures called during clean unregister */
355 vbdev_ocf_mngt_fn unregister_path_clean[] = {
356 	flush_vbdev,
357 	wait_for_requests,
358 	detach_core,
359 	close_core_bdev,
360 	stop_vbdev,
361 	detach_cache,
362 	close_cache_bdev,
363 	unregister_finish,
364 	NULL
365 };
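
/* The NULL-terminated arrays above are management pipelines: each step either
 * calls vbdev_ocf_mngt_continue() to advance to the next function or registers
 * a poll function with vbdev_ocf_mngt_poll() to retry until an asynchronous
 * condition (for example a cache lock) is satisfied. */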
366 
367 /* Start asynchronous management operation using unregister_path */
368 static void
369 unregister_cb(void *opaque)
370 {
371 	struct vbdev_ocf *vbdev = opaque;
372 	vbdev_ocf_mngt_fn *unregister_path;
373 	int rc;
374 
375 	unregister_path = vbdev->state.doing_clean_delete ?
376 			  unregister_path_clean : unregister_path_dirty;
377 
378 	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
379 	if (rc) {
380 		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
381 		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
382 	}
383 }
384 
385 /* Clean remove case - remove core and then cache; this order
386  * removes the instance permanently */
387 static void
388 _vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
389 {
390 	if (vbdev->core.attached) {
391 		detach_core(vbdev);
392 		close_core_bdev(vbdev);
393 	}
394 
395 	if (vbdev->cache.attached) {
396 		detach_cache(vbdev);
397 		close_cache_bdev(vbdev);
398 	}
399 }
400 
401 /* Dirty shutdown/hot remove case - remove cache and then core; this order
402  * allows us to recover this instance in the future */
403 static void
404 _vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
405 {
406 	if (vbdev->cache.attached) {
407 		detach_cache(vbdev);
408 		close_cache_bdev(vbdev);
409 	}
410 
411 	if (vbdev->core.attached) {
412 		detach_core(vbdev);
413 		close_core_bdev(vbdev);
414 	}
415 }
416 
417 /* Unregister io device with callback to unregister_cb
418  * This function is called during spdk_bdev_unregister */
419 static int
420 vbdev_ocf_destruct(void *opaque)
421 {
422 	struct vbdev_ocf *vbdev = opaque;
423 
424 	if (vbdev->state.doing_finish) {
425 		return -EALREADY;
426 	}
427 
428 	if (vbdev->state.starting && !vbdev->state.started) {
429 		/* Prevent detaching cache/core during the register path
430 		 * of this bdev */
431 		return -EBUSY;
432 	}
433 
434 	vbdev->state.doing_finish = true;
435 
436 	if (vbdev->state.started) {
437 		spdk_io_device_unregister(vbdev, unregister_cb);
438 		/* Return 1 because unregister is delayed */
439 		return 1;
440 	}
441 
442 	if (vbdev->state.doing_clean_delete) {
443 		_vbdev_ocf_destruct_clean(vbdev);
444 	} else {
445 		_vbdev_ocf_destruct_dirty(vbdev);
446 	}
447 
448 	return 0;
449 }
450 
451 /* Stop OCF cache and unregister SPDK bdev */
452 int
453 vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
454 {
455 	int rc = 0;
456 
457 	if (vbdev->state.started) {
458 		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
459 	} else {
460 		rc = vbdev_ocf_destruct(vbdev);
461 		if (rc == 0 && cb) {
462 			cb(cb_arg, 0);
463 		}
464 	}
465 
466 	return rc;
467 }
468 
469 /* Remove cores permanently and then stop OCF cache and unregister SPDK bdev */
470 int
471 vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
472 		       void *cb_arg)
473 {
474 	vbdev->state.doing_clean_delete = true;
475 
476 	return vbdev_ocf_delete(vbdev, cb, cb_arg);
477 }
478 
479 
480 /* If vbdev is online, return its object */
481 struct vbdev_ocf *
482 vbdev_ocf_get_by_name(const char *name)
483 {
484 	struct vbdev_ocf *vbdev;
485 
486 	if (name == NULL) {
487 		assert(false);
488 		return NULL;
489 	}
490 
491 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
492 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
493 			continue;
494 		}
495 		if (strcmp(vbdev->name, name) == 0) {
496 			return vbdev;
497 		}
498 	}
499 	return NULL;
500 }
501 
502 /* Return matching base if parent vbdev is online */
503 struct vbdev_ocf_base *
504 vbdev_ocf_get_base_by_name(const char *name)
505 {
506 	struct vbdev_ocf *vbdev;
507 
508 	if (name == NULL) {
509 		assert(false);
510 		return NULL;
511 	}
512 
513 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
514 		if (vbdev->state.doing_finish) {
515 			continue;
516 		}
517 
518 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
519 			return &vbdev->cache;
520 		}
521 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
522 			return &vbdev->core;
523 		}
524 	}
525 	return NULL;
526 }
527 
528 /* Execute fn for each OCF device that is online or is waiting for its base devices */
529 void
530 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
531 {
532 	struct vbdev_ocf *vbdev;
533 
534 	assert(fn != NULL);
535 
536 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
537 		if (!vbdev->state.doing_finish) {
538 			fn(vbdev, ctx);
539 		}
540 	}
541 }
542 
543 /* Called from OCF when an SPDK IO is completed */
544 static void
545 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
546 {
547 	struct spdk_bdev_io *bdev_io = io->priv1;
548 
549 	if (error == 0) {
550 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
551 	} else if (error == -ENOMEM) {
552 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
553 	} else {
554 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
555 	}
556 
557 	ocf_io_put(io);
558 }
559 
560 /* Configure IO parameters and submit the IO to OCF */
561 static int
562 io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
563 {
564 	int dir;
565 	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
566 	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
567 
568 	switch (bdev_io->type) {
569 	case SPDK_BDEV_IO_TYPE_WRITE:
570 	case SPDK_BDEV_IO_TYPE_READ:
571 		dir = OCF_READ;
572 		if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
573 			dir = OCF_WRITE;
574 		}
575 		ocf_io_configure(io, offset, len, dir, 0, 0);
576 		ocf_core_submit_io(io);
577 		return 0;
578 	case SPDK_BDEV_IO_TYPE_FLUSH:
579 		ocf_io_configure(io, offset, len, OCF_WRITE, 0, OCF_WRITE_FLUSH);
580 		ocf_core_submit_flush(io);
581 		return 0;
582 	case SPDK_BDEV_IO_TYPE_UNMAP:
583 		ocf_io_configure(io, offset, len, 0, 0, 0);
584 		ocf_core_submit_discard(io);
585 		return 0;
586 	case SPDK_BDEV_IO_TYPE_RESET:
587 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
588 	default:
589 		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
590 		return -EINVAL;
591 	}
592 }
593 
594 /* Submit SPDK-IO to OCF */
595 static void
596 io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
597 {
598 	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
599 	struct ocf_io *io = NULL;
600 	struct bdev_ocf_data *data = NULL;
601 	struct vbdev_ocf_qcxt *qctx = spdk_io_channel_get_ctx(ch);
602 	int err;
603 
604 	io = ocf_core_new_io(vbdev->ocf_core);
605 	if (!io) {
606 		err = -ENOMEM;
607 		goto fail;
608 	}
609 
610 	ocf_io_set_queue(io, qctx->queue);
611 
612 	data = vbdev_ocf_data_from_spdk_io(bdev_io);
613 	if (!data) {
614 		err = -ENOMEM;
615 		goto fail;
616 	}
617 
618 	err = ocf_io_set_data(io, data, 0);
619 	if (err) {
620 		goto fail;
621 	}
622 
623 	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);
624 
625 	err = io_submit_to_ocf(bdev_io, io);
626 	if (err) {
627 		goto fail;
628 	}
629 
630 	return;
631 
632 fail:
633 	if (io) {
634 		ocf_io_put(io);
635 	}
636 
637 	if (err == -ENOMEM) {
638 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
639 	} else {
640 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
641 	}
642 }
643 
644 static void
645 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
646 		     bool success)
647 {
648 	if (!success) {
649 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
650 		return;
651 	}
652 
653 	io_handle(ch, bdev_io);
654 }
655 
656 /* Called from bdev layer when an io to Cache vbdev is submitted */
657 static void
658 vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
659 {
660 	switch (bdev_io->type) {
661 	case SPDK_BDEV_IO_TYPE_READ:
662 		/* User does not have to allocate io vectors for the request,
663 		 * so in case they are not allocated, we allocate them here */
664 		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
665 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
666 		break;
667 	case SPDK_BDEV_IO_TYPE_WRITE:
668 	case SPDK_BDEV_IO_TYPE_FLUSH:
669 	case SPDK_BDEV_IO_TYPE_UNMAP:
670 		io_handle(ch, bdev_io);
671 		break;
672 	case SPDK_BDEV_IO_TYPE_RESET:
673 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
674 	default:
675 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
676 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
677 		break;
678 	}
679 }
680 
681 /* Called from bdev layer */
682 static bool
683 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
684 {
685 	struct vbdev_ocf *vbdev = opaque;
686 
687 	switch (io_type) {
688 	case SPDK_BDEV_IO_TYPE_READ:
689 	case SPDK_BDEV_IO_TYPE_WRITE:
690 	case SPDK_BDEV_IO_TYPE_FLUSH:
691 	case SPDK_BDEV_IO_TYPE_UNMAP:
692 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
693 	case SPDK_BDEV_IO_TYPE_RESET:
694 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
695 	default:
696 		return false;
697 	}
698 }
699 
700 /* Called from bdev layer */
701 static struct spdk_io_channel *
702 vbdev_ocf_get_io_channel(void *opaque)
703 {
704 	struct vbdev_ocf *bdev = opaque;
705 
706 	return spdk_get_io_channel(bdev);
707 }
708 
709 static int
710 vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
711 {
712 	struct vbdev_ocf *vbdev = opaque;
713 
714 	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
715 	spdk_json_write_named_string(w, "core_device", vbdev->core.name);
716 
717 	spdk_json_write_named_string(w, "mode",
718 				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
719 	spdk_json_write_named_uint32(w, "cache_line_size",
720 				     ocf_cache_get_line_size(vbdev->ocf_cache));
721 	spdk_json_write_named_bool(w, "metadata_volatile",
722 				   vbdev->cfg.cache.metadata_volatile);
723 
724 	return 0;
725 }
726 
727 static void
728 vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
729 {
730 	struct vbdev_ocf *vbdev = bdev->ctxt;
731 
732 	spdk_json_write_object_begin(w);
733 
734 	spdk_json_write_named_string(w, "method", "bdev_ocf_create");
735 
736 	spdk_json_write_named_object_begin(w, "params");
737 	spdk_json_write_named_string(w, "name", vbdev->name);
738 	spdk_json_write_named_string(w, "mode",
739 				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
740 	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
741 	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
742 	spdk_json_write_object_end(w);
743 
744 	spdk_json_write_object_end(w);
745 }
746 
747 /* Cache vbdev function table
748  * Used by bdev layer */
749 static struct spdk_bdev_fn_table cache_dev_fn_table = {
750 	.destruct = vbdev_ocf_destruct,
751 	.io_type_supported = vbdev_ocf_io_type_supported,
752 	.submit_request	= vbdev_ocf_submit_request,
753 	.get_io_channel	= vbdev_ocf_get_io_channel,
754 	.write_config_json = vbdev_ocf_write_json_config,
755 	.dump_info_json = vbdev_ocf_dump_info_json,
756 };
757 
758 /* Poller function for the OCF queue
759  * We execute OCF requests here synchronously */
760 static int
761 queue_poll(void *opaque)
762 {
763 	struct vbdev_ocf_qcxt *qctx = opaque;
764 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
765 	int i, max = spdk_min(32, iono);
766 
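	/* Drain up to 32 pending OCF requests per poll; report whether any
	 * IO was pending when the poller ran. */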
767 	for (i = 0; i < max; i++) {
768 		ocf_queue_run_single(qctx->queue);
769 	}
770 
771 	if (iono > 0) {
772 		return 1;
773 	} else {
774 		return 0;
775 	}
776 }
777 
778 /* Called during ocf_submit_io, ocf_purge*
779  * and any other requests that need to submit io */
780 static void
781 vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
782 {
783 }
784 
785 /* OCF queue deinitialization
786  * Called at ocf_cache_stop */
787 static void
788 vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
789 {
790 	struct vbdev_ocf_qcxt *qctx = ocf_queue_get_priv(q);
791 
792 	if (qctx) {
793 		spdk_put_io_channel(qctx->cache_ch);
794 		spdk_put_io_channel(qctx->core_ch);
795 		spdk_poller_unregister(&qctx->poller);
796 		if (qctx->allocated) {
797 			free(qctx);
798 		}
799 	}
800 }
801 
802 /* Queue ops is an interface for running the queue thread;
803  * the stop() operation is called just before the queue gets destroyed */
804 const struct ocf_queue_ops queue_ops = {
805 	.kick_sync = vbdev_ocf_ctx_queue_kick,
806 	.kick = vbdev_ocf_ctx_queue_kick,
807 	.stop = vbdev_ocf_ctx_queue_stop,
808 };
809 
810 /* Called on cache vbdev creation on every thread
811  * We allocate an OCF queue and an SPDK poller for it here */
812 static int
813 io_device_create_cb(void *io_device, void *ctx_buf)
814 {
815 	struct vbdev_ocf *vbdev = io_device;
816 	struct vbdev_ocf_qcxt *qctx = ctx_buf;
817 	int rc;
818 
819 	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
820 	if (rc) {
821 		return rc;
822 	}
823 
824 	ocf_queue_set_priv(qctx->queue, qctx);
825 
826 	qctx->vbdev      = vbdev;
827 	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
828 	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
829 	qctx->poller     = spdk_poller_register(queue_poll, qctx, 0);
830 
831 	return rc;
832 }
833 
834 /* Called per thread
835  * Put OCF queue and relaunch poller with new context to finish pending requests */
836 static void
837 io_device_destroy_cb(void *io_device, void *ctx_buf)
838 {
839 	/* Make a copy of the context to use after the io channel is destroyed */
840 	struct vbdev_ocf_qcxt *copy = malloc(sizeof(*copy));
841 	struct vbdev_ocf_qcxt *qctx = ctx_buf;
842 
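	/* qctx is owned by the io channel and is freed by SPDK after this
	 * callback returns, so the heap copy keeps a live queue context until
	 * vbdev_ocf_ctx_queue_stop() releases it together with the queue. */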
843 	if (copy) {
844 		ocf_queue_set_priv(qctx->queue, copy);
845 		memcpy(copy, qctx, sizeof(*copy));
846 		spdk_poller_unregister(&qctx->poller);
847 		copy->poller = spdk_poller_register(queue_poll, copy, 0);
848 		copy->allocated = true;
849 	} else {
850 		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
851 			    spdk_strerror(ENOMEM));
852 	}
853 
854 	vbdev_ocf_queue_put(qctx->queue);
855 }
856 
857 /* OCF management queue deinitialization */
858 static void
859 vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
860 {
861 	struct spdk_poller *poller = ocf_queue_get_priv(q);
862 
863 	if (poller) {
864 		spdk_poller_unregister(&poller);
865 	}
866 }
867 
868 static int
869 mngt_queue_poll(void *opaque)
870 {
871 	ocf_queue_t q = opaque;
872 	uint32_t iono = ocf_queue_pending_io(q);
873 	int i, max = spdk_min(32, iono);
874 
875 	for (i = 0; i < max; i++) {
876 		ocf_queue_run_single(q);
877 	}
878 
879 	if (iono > 0) {
880 		return 1;
881 	} else {
882 		return 0;
883 	}
884 }
885 
886 static void
887 vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
888 {
889 }
890 
891 /* Queue ops is an interface for running the queue thread;
892  * the stop() operation is called just before the queue gets destroyed */
893 const struct ocf_queue_ops mngt_queue_ops = {
894 	.kick_sync = NULL,
895 	.kick = vbdev_ocf_ctx_mngt_queue_kick,
896 	.stop = vbdev_ocf_ctx_mngt_queue_stop,
897 };
898 
899 /* Create exported spdk object */
900 static void
901 finish_register(struct vbdev_ocf *vbdev)
902 {
903 	int result;
904 
905 	/* Copy properties of the base bdev */
906 	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
907 	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
908 	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;
909 
910 	vbdev->exp_bdev.name = vbdev->name;
911 	vbdev->exp_bdev.product_name = "SPDK OCF";
912 
913 	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
914 	vbdev->exp_bdev.ctxt = vbdev;
915 	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
916 	vbdev->exp_bdev.module = &ocf_if;
917 
918 	/* Finally register vbdev in SPDK */
919 	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
920 				sizeof(struct vbdev_ocf_qcxt), vbdev->name);
921 	result = spdk_bdev_register(&vbdev->exp_bdev);
922 	if (result) {
923 		SPDK_ERRLOG("Could not register exposed bdev\n");
924 	} else {
925 		vbdev->state.started = true;
926 	}
927 
928 	vbdev_ocf_mngt_continue(vbdev, result);
929 }
930 
931 static void
932 add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
933 {
934 	struct vbdev_ocf *vbdev = priv;
935 
936 	ocf_mngt_cache_unlock(cache);
937 
938 	if (error) {
939 		SPDK_ERRLOG("Failed to add core device to cache instance\n");
940 	} else {
941 		vbdev->ocf_core = core;
942 		vbdev->core.id  = ocf_core_get_id(core);
943 	}
944 
945 	vbdev_ocf_mngt_continue(vbdev, error);
946 }
947 
948 /* Try to lock cache, then add core */
949 static void
950 add_core_poll(struct vbdev_ocf *vbdev)
951 {
952 	if (ocf_mngt_cache_trylock(vbdev->ocf_cache)) {
953 		return;
954 	}
955 
956 	vbdev_ocf_mngt_poll(vbdev, NULL);
957 	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
958 }
959 
960 /* Add core for existing OCF cache instance */
961 static void
962 add_core(struct vbdev_ocf *vbdev)
963 {
964 	vbdev_ocf_mngt_poll(vbdev, add_core_poll);
965 }
966 
967 static void
968 start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
969 {
970 	struct vbdev_ocf *vbdev = priv;
971 
972 	ocf_mngt_cache_unlock(cache);
973 
974 	vbdev_ocf_mngt_continue(vbdev, error);
975 }
976 
977 static int
978 create_management_queue(struct vbdev_ocf *vbdev)
979 {
980 	struct spdk_poller *mngt_poller;
981 	int rc;
982 
983 	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
984 	if (rc) {
985 		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
986 		return rc;
987 	}
988 
989 	mngt_poller = spdk_poller_register(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
990 	if (mngt_poller == NULL) {
991 		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
992 		return -ENOMEM;
993 	}
994 
995 	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
996 	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);
997 
998 	return 0;
999 }
1000 
1001 /* Start OCF cache, attach caching device */
1002 static void
1003 start_cache(struct vbdev_ocf *vbdev)
1004 {
1005 	ocf_cache_t existing;
1006 	int rc;
1007 
1008 	if (vbdev->ocf_cache) {
1009 		vbdev->mngt_ctx.status = -EALREADY;
1010 		vbdev_ocf_mngt_stop(vbdev);
1011 		return;
1012 	}
1013 
1014 	existing = get_other_cache_instance(vbdev);
1015 	if (existing) {
1016 		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
1017 			       vbdev->name, vbdev->cache.name);
1018 		vbdev->ocf_cache = existing;
1019 		vbdev->cache.id = ocf_cache_get_id(existing);
1020 		vbdev->cache_ctx = ocf_cache_get_priv(existing);
1021 		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
1022 		vbdev_ocf_mngt_continue(vbdev, 0);
1023 		return;
1024 	}
1025 
1026 	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
1027 	if (vbdev->cache_ctx == NULL) {
1028 		vbdev->mngt_ctx.status = -ENOMEM;
1029 		vbdev_ocf_mngt_stop(vbdev);
1030 		return;
1031 	}
1032 
1033 	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
1034 	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);
1035 
1036 	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache);
1037 	if (rc) {
1038 		vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
1039 		vbdev->mngt_ctx.status = rc;
1040 		vbdev_ocf_mngt_stop(vbdev);
1041 		return;
1042 	}
1043 
1044 	vbdev->cache.id = ocf_cache_get_id(vbdev->ocf_cache);
1045 	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);
1046 
1047 	rc = create_management_queue(vbdev);
1048 	if (rc) {
1049 		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
1050 		vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
1051 		vbdev->mngt_ctx.status = rc;
1052 		vbdev_ocf_mngt_stop(vbdev);
1053 		return;
1054 	}
1055 
1056 	if (vbdev->cfg.loadq) {
1057 		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
1058 	} else {
1059 		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
1060 	}
1061 }
1062 
1063 /* Procedures called during register operation */
1064 vbdev_ocf_mngt_fn register_path[] = {
1065 	start_cache,
1066 	add_core,
1067 	finish_register,
1068 	NULL
1069 };
1070 
1071 /* Start cache instance and register OCF bdev */
1072 static void
1073 register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
1074 {
1075 	int rc;
1076 
1077 	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
1078 		cb(-EPERM, vbdev, cb_arg);
1079 		return;
1080 	}
1081 
1082 	vbdev->state.starting = true;
1083 	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
1084 	if (rc) {
1085 		cb(rc, vbdev, cb_arg);
1086 	}
1087 }
1088 
1089 /* Init OCF configuration options
1090  * for core and cache devices */
1091 static void
1092 init_vbdev_config(struct vbdev_ocf *vbdev)
1093 {
1094 	struct vbdev_ocf_config *cfg = &vbdev->cfg;
1095 
1096 	/* Id 0 means OCF decides the id */
1097 	cfg->cache.id = 0;
1098 	cfg->cache.name = vbdev->name;
1099 
1100 	/* TODO [metadata]: make configurable with persistent
1101 	 * metadata support */
1102 	cfg->cache.metadata_volatile = false;
1103 
1104 	/* TODO [cache line size]: make cache line size configurable
1105 	 * Using standard 4KiB for now */
1106 	cfg->cache.cache_line_size = ocf_cache_line_size_4;
1107 
1108 	/* These are suggested values that
1109 	 * should be sufficient for most use cases */
1110 	cfg->cache.backfill.max_queue_size = 65536;
1111 	cfg->cache.backfill.queue_unblock_size = 60000;
1112 
1113 	/* TODO [cache line size] */
1114 	cfg->device.cache_line_size = ocf_cache_line_size_4;
1115 	cfg->device.force = true;
1116 	cfg->device.min_free_ram = 0;
1117 	cfg->device.perform_test = false;
1118 	cfg->device.discard_on_start = false;
1119 
1120 	vbdev->cfg.cache.locked = true;
1121 
1122 	cfg->core.volume_type = SPDK_OBJECT;
1123 	cfg->device.volume_type = SPDK_OBJECT;
1124 	cfg->core.core_id = OCF_CORE_MAX;
1125 
1126 	if (vbdev->cfg.loadq) {
1127 		/* When doing cache_load(), we need to set try_add to true,
1128 		 * otherwise OCF will interpret this core as new
1129 		 * instead of the inactive one */
1130 		vbdev->cfg.core.try_add = true;
1131 	}
1132 
1133 	/* Serialize bdev names in the OCF UUIDs so they can be interpreted on future loads
1134 	 * Core UUID is a pair of (core bdev name, vbdev name)
1135 	 * Cache UUID is the cache bdev name */
1136 	cfg->device.uuid.size = strlen(vbdev->cache.name) + 1;
1137 	cfg->device.uuid.data = vbdev->cache.name;
1138 
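
	/* Pack "<core bdev name> <vbdev name>" into the core UUID buffer, then
	 * overwrite the separating space with a NUL so both strings sit back to
	 * back; metadata_probe_cores_construct() later recovers the vbdev name
	 * as core_name + strlen(core_name) + 1. */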
1139 	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s",
1140 		 vbdev->core.name, vbdev->name);
1141 	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
1142 	cfg->core.uuid.data = vbdev->uuid;
1143 	vbdev->uuid[strlen(vbdev->core.name)] = 0;
1144 }
1145 
1146 /* Allocate vbdev structure object and add it to the global list */
1147 static int
1148 init_vbdev(const char *vbdev_name,
1149 	   const char *cache_mode_name,
1150 	   const char *cache_name,
1151 	   const char *core_name,
1152 	   bool loadq)
1153 {
1154 	struct vbdev_ocf *vbdev;
1155 	int rc = 0;
1156 
1157 	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
1158 		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
1159 		return -EPERM;
1160 	}
1161 
1162 	vbdev = calloc(1, sizeof(*vbdev));
1163 	if (!vbdev) {
1164 		goto error_mem;
1165 	}
1166 
1167 	vbdev->cache.parent = vbdev;
1168 	vbdev->core.parent = vbdev;
1169 	vbdev->cache.is_cache = true;
1170 	vbdev->core.is_cache = false;
1171 
1172 	if (cache_mode_name) {
1173 		vbdev->cfg.cache.cache_mode
1174 			= ocf_get_cache_mode(cache_mode_name);
1175 	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
1176 		SPDK_ERRLOG("No cache mode specified\n");
1177 		rc = -EINVAL;
1178 		goto error_free;
1179 	}
1180 	if (vbdev->cfg.cache.cache_mode < 0) {
1181 		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
1182 		rc = -EINVAL;
1183 		goto error_free;
1184 	}
1185 
1186 	vbdev->name = strdup(vbdev_name);
1187 	if (!vbdev->name) {
1188 		goto error_mem;
1189 	}
1190 
1191 	vbdev->cache.name = strdup(cache_name);
1192 	if (!vbdev->cache.name) {
1193 		goto error_mem;
1194 	}
1195 
1196 	vbdev->core.name = strdup(core_name);
1197 	if (!vbdev->core.name) {
1198 		goto error_mem;
1199 	}
1200 
1201 	vbdev->cfg.loadq = loadq;
1202 	init_vbdev_config(vbdev);
1203 	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
1204 	return rc;
1205 
1206 error_mem:
1207 	rc = -ENOMEM;
1208 error_free:
1209 	free_vbdev(vbdev);
1210 	return rc;
1211 }
1212 
1213 /* Read the configuration file at the start of the SPDK application
1214  * This adds vbdevs to the global list if any are mentioned in the config */
1215 static int
1216 vbdev_ocf_init(void)
1217 {
1218 	const char *vbdev_name, *modename, *cache_name, *core_name;
1219 	struct spdk_conf_section *sp;
1220 	int status;
1221 
1222 	status = vbdev_ocf_ctx_init();
1223 	if (status) {
1224 		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", status);
1225 		return status;
1226 	}
1227 
1228 	status = vbdev_ocf_volume_init();
1229 	if (status) {
1230 		vbdev_ocf_ctx_cleanup();
1231 		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", status);
1232 		return status;
1233 	}
1234 
1235 	sp = spdk_conf_find_section(NULL, "OCF");
1236 	if (sp == NULL) {
1237 		return 0;
1238 	}
1239 
1240 	for (int i = 0; ; i++) {
1241 		if (!spdk_conf_section_get_nval(sp, "OCF", i)) {
1242 			break;
1243 		}
1244 
1245 		vbdev_name = spdk_conf_section_get_nmval(sp, "OCF", i, 0);
1246 		if (!vbdev_name) {
1247 			SPDK_ERRLOG("No vbdev name specified\n");
1248 			continue;
1249 		}
1250 
1251 		modename = spdk_conf_section_get_nmval(sp, "OCF", i, 1);
1252 		if (!modename) {
1253 			SPDK_ERRLOG("No modename specified for OCF vbdev '%s'\n", vbdev_name);
1254 			continue;
1255 		}
1256 
1257 		cache_name = spdk_conf_section_get_nmval(sp, "OCF", i, 2);
1258 		if (!cache_name) {
1259 			SPDK_ERRLOG("No cache device specified for OCF vbdev '%s'\n", vbdev_name);
1260 			continue;
1261 		}
1262 
1263 		core_name = spdk_conf_section_get_nmval(sp, "OCF", i, 3);
1264 		if (!core_name) {
1265 			SPDK_ERRLOG("No core devices specified for OCF vbdev '%s'\n", vbdev_name);
1266 			continue;
1267 		}
1268 
1269 		status = init_vbdev(vbdev_name, modename, cache_name, core_name, false);
1270 		if (status) {
1271 			SPDK_ERRLOG("Config initialization failed with code: %d\n", status);
1272 		}
1273 	}
1274 
1275 	return status;
1276 }
1277 
1278 /* Called after application shutdown has started
1279  * Release memory of allocated structures here */
1280 static void
1281 vbdev_ocf_module_fini(void)
1282 {
1283 	struct vbdev_ocf *vbdev;
1284 
1285 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1286 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1287 		free_vbdev(vbdev);
1288 	}
1289 
1290 	vbdev_ocf_volume_cleanup();
1291 	vbdev_ocf_ctx_cleanup();
1292 }
1293 
1294 /* Called when a base device gets unplugged
1295  * We unregister the cache vbdev here
1296  * When a cache device is removed, we delete every OCF bdev that used it */
1297 static void
1298 hotremove_cb(void *ctx)
1299 {
1300 	struct vbdev_ocf_base *base = ctx;
1301 	struct vbdev_ocf *vbdev;
1302 
1303 	if (!base->is_cache) {
1304 		if (base->parent->state.doing_finish) {
1305 			return;
1306 		}
1307 
1308 		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
1309 			       base->parent->name, base->name);
1310 		vbdev_ocf_delete(base->parent, NULL, NULL);
1311 		return;
1312 	}
1313 
1314 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1315 		if (vbdev->state.doing_finish) {
1316 			continue;
1317 		}
1318 		if (strcmp(base->name, vbdev->cache.name) == 0) {
1319 			SPDK_NOTICELOG("Deinitializing '%s' because"
1320 				       " its cache device '%s' was removed\n",
1321 				       vbdev->name, base->name);
1322 			vbdev_ocf_delete(vbdev, NULL, NULL);
1323 		}
1324 	}
1325 }
1326 
1327 /* Open base SPDK bdev and claim it */
1328 static int
1329 attach_base(struct vbdev_ocf_base *base)
1330 {
1331 	int status;
1332 
1333 	if (base->attached) {
1334 		return -EALREADY;
1335 	}
1336 
1337 	/* If the base cache bdev was already opened by another vbdev,
1338 	 * we just copy its descriptor here */
1339 	if (base->is_cache) {
1340 		struct vbdev_ocf_base *existing = get_other_cache_base(base);
1341 		if (existing) {
1342 			base->desc = existing->desc;
1343 			base->management_channel = existing->management_channel;
1344 			base->attached = true;
1345 			return 0;
1346 		}
1347 	}
1348 
1349 	status = spdk_bdev_open(base->bdev, true, hotremove_cb, base, &base->desc);
1350 	if (status) {
1351 		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
1352 		return status;
1353 	}
1354 
1355 	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
1356 					     &ocf_if);
1357 	if (status) {
1358 		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
1359 		spdk_bdev_close(base->desc);
1360 		return status;
1361 	}
1362 
1363 	base->management_channel = spdk_bdev_get_io_channel(base->desc);
1364 	if (!base->management_channel) {
1365 		SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
1366 		spdk_bdev_module_release_bdev(base->bdev);
1367 		spdk_bdev_close(base->desc);
1368 		return -ENOMEM;
1369 	}
1370 
1371 	base->attached = true;
1372 	return status;
1373 }
1374 
1375 /* Attach base bdevs */
1376 static int
1377 attach_base_bdevs(struct vbdev_ocf *vbdev,
1378 		  struct spdk_bdev *cache_bdev,
1379 		  struct spdk_bdev *core_bdev)
1380 {
1381 	int rc = 0;
1382 
1383 	if (cache_bdev) {
1384 		vbdev->cache.bdev = cache_bdev;
1385 		rc |= attach_base(&vbdev->cache);
1386 	}
1387 
1388 	if (core_bdev) {
1389 		vbdev->core.bdev = core_bdev;
1390 		rc |= attach_base(&vbdev->core);
1391 	}
1392 
1393 	return rc;
1394 }
1395 
1396 /* Init and then start vbdev if all base devices are present */
1397 void
1398 vbdev_ocf_construct(const char *vbdev_name,
1399 		    const char *cache_mode_name,
1400 		    const char *cache_name,
1401 		    const char *core_name,
1402 		    bool loadq,
1403 		    void (*cb)(int, struct vbdev_ocf *, void *),
1404 		    void *cb_arg)
1405 {
1406 	int rc;
1407 	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
1408 	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
1409 	struct vbdev_ocf *vbdev;
1410 
1411 	rc = init_vbdev(vbdev_name, cache_mode_name, cache_name, core_name, loadq);
1412 	if (rc) {
1413 		cb(rc, NULL, cb_arg);
1414 		return;
1415 	}
1416 
1417 	vbdev = vbdev_ocf_get_by_name(vbdev_name);
1418 	if (vbdev == NULL) {
1419 		cb(-ENODEV, NULL, cb_arg);
1420 		return;
1421 	}
1422 
1423 	if (cache_bdev == NULL) {
1424 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
1425 			       vbdev->name, cache_name);
1426 	}
1427 	if (core_bdev == NULL) {
1428 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
1429 			       vbdev->name, core_name);
1430 	}
1431 
1432 	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
1433 	if (rc) {
1434 		cb(rc, vbdev, cb_arg);
1435 		return;
1436 	}
1437 
1438 	if (core_bdev && cache_bdev) {
1439 		register_vbdev(vbdev, cb, cb_arg);
1440 	} else {
1441 		cb(0, vbdev, cb_arg);
1442 	}
1443 }
1444 
1445 /* This is called when a new device is created in the SPDK application
1446  * If that device is named as one of the base bdevs of an OCF vbdev,
1447  * claim and open it */
1448 static void
1449 vbdev_ocf_examine(struct spdk_bdev *bdev)
1450 {
1451 	const char *bdev_name = spdk_bdev_get_name(bdev);
1452 	struct vbdev_ocf *vbdev;
1453 
1454 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1455 		if (vbdev->state.doing_finish) {
1456 			continue;
1457 		}
1458 
1459 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1460 			attach_base_bdevs(vbdev, bdev, NULL);
1461 			continue;
1462 		}
1463 		if (!strcmp(bdev_name, vbdev->core.name)) {
1464 			attach_base_bdevs(vbdev, NULL, bdev);
1465 			break;
1466 		}
1467 	}
1468 	spdk_bdev_module_examine_done(&ocf_if);
1469 }
1470 
1471 struct metadata_probe_ctx {
1472 	struct vbdev_ocf_base base;
1473 	ocf_volume_t volume;
1474 
1475 	struct ocf_volume_uuid *core_uuids;
1476 	unsigned int uuid_count;
1477 
1478 	int result;
1479 	int refcnt;
1480 };
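
/* ctx->refcnt counts outstanding asynchronous users of this context: the
 * metadata probe itself plus every vbdev_ocf_construct() call issued from
 * metadata_probe_cores_construct(). examine_ctx_put() frees the context and
 * reports examine completion once the count drops to zero. */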
1481 
1482 static void
1483 examine_ctx_put(struct metadata_probe_ctx *ctx)
1484 {
1485 	unsigned int i;
1486 
1487 	ctx->refcnt--;
1488 	if (ctx->refcnt > 0) {
1489 		return;
1490 	}
1491 
1492 	if (ctx->result) {
1493 		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
1494 			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
1495 	}
1496 
1497 	if (ctx->base.desc) {
1498 		spdk_bdev_close(ctx->base.desc);
1499 	}
1500 
1501 	if (ctx->volume) {
1502 		ocf_volume_destroy(ctx->volume);
1503 	}
1504 
1505 	if (ctx->core_uuids) {
1506 		for (i = 0; i < ctx->uuid_count; i++) {
1507 			free(ctx->core_uuids[i].data);
1508 		}
1509 	}
1510 	free(ctx->core_uuids);
1511 
1512 	examine_done(ctx->result, NULL, ctx->base.bdev);
1513 	free(ctx);
1514 }
1515 
1516 static void
1517 metadata_probe_construct_cb(int rc, struct vbdev_ocf *vbdev, void *vctx)
1518 {
1519 	struct metadata_probe_ctx *ctx = vctx;
1520 
1521 	examine_ctx_put(ctx);
1522 }
1523 
1524 /* This is the second callback for ocf_metadata_probe_cores()
1525  * Here we create vbdev configurations based on the UUIDs */
1526 static void
1527 metadata_probe_cores_construct(void *priv, int error, unsigned int num_cores)
1528 {
1529 	struct metadata_probe_ctx *ctx = priv;
1530 	const char *vbdev_name;
1531 	const char *core_name;
1532 	unsigned int i;
1533 
1534 	if (error) {
1535 		ctx->result = error;
1536 		examine_ctx_put(ctx);
1537 		return;
1538 	}
1539 
1540 	for (i = 0; i < num_cores; i++) {
1541 		core_name = ocf_uuid_to_str(&ctx->core_uuids[i]);
1542 		vbdev_name = core_name + strlen(core_name) + 1;
1543 		ctx->refcnt++;
1544 		vbdev_ocf_construct(vbdev_name, NULL, ctx->base.bdev->name, core_name, true,
1545 				    metadata_probe_construct_cb, ctx);
1546 	}
1547 
1548 	examine_ctx_put(ctx);
1549 }
1550 
1551 /* This callback is called after OCF reads the core UUIDs from cache metadata
1552  * Here we allocate memory for those UUIDs and call ocf_metadata_probe_cores() again */
1553 static void
1554 metadata_probe_cores_get_num(void *priv, int error, unsigned int num_cores)
1555 {
1556 	struct metadata_probe_ctx *ctx = priv;
1557 	unsigned int i;
1558 
1559 	if (error) {
1560 		ctx->result = error;
1561 		examine_ctx_put(ctx);
1562 		return;
1563 	}
1564 
1565 	ctx->uuid_count = num_cores;
1566 	ctx->core_uuids = calloc(num_cores, sizeof(struct ocf_volume_uuid));
1567 	if (!ctx->core_uuids) {
1568 		ctx->result = -ENOMEM;
1569 		examine_ctx_put(ctx);
1570 		return;
1571 	}
1572 
1573 	for (i = 0; i < ctx->uuid_count; i++) {
1574 		ctx->core_uuids[i].size = OCF_VOLUME_UUID_MAX_SIZE;
1575 		ctx->core_uuids[i].data = malloc(OCF_VOLUME_UUID_MAX_SIZE);
1576 		if (!ctx->core_uuids[i].data) {
1577 			ctx->result = -ENOMEM;
1578 			examine_ctx_put(ctx);
1579 			return;
1580 		}
1581 	}
1582 
1583 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, ctx->core_uuids, ctx->uuid_count,
1584 				 metadata_probe_cores_construct, ctx);
1585 }
1586 
1587 static void
1588 metadata_probe_cb(void *priv, int rc,
1589 		  struct ocf_metadata_probe_status *status)
1590 {
1591 	struct metadata_probe_ctx *ctx = priv;
1592 
1593 	if (rc) {
1594 		/* -ENODATA means device does not have cache metadata on it */
1595 		if (rc != -ENODATA) {
1596 			ctx->result = rc;
1597 		}
1598 		examine_ctx_put(ctx);
1599 		return;
1600 	}
1601 
1602 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, NULL, 0,
1603 				 metadata_probe_cores_get_num, ctx);
1604 }
1605 
1606 /* This is called after vbdev_ocf_examine
1607  * It allows delaying application initialization
1608  * until all OCF bdevs get registered
1609  * If a vbdev has all of its base devices, it starts asynchronously here
1610  * We first check whether the bdev appears in the configuration;
1611  * if not, we do metadata_probe() to create its configuration from the bdev metadata */
1612 static void
1613 vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
1614 {
1615 	const char *bdev_name = spdk_bdev_get_name(bdev);
1616 	struct vbdev_ocf *vbdev;
1617 	struct metadata_probe_ctx *ctx;
1618 	bool created_from_config = false;
1619 	int rc;
1620 
1621 	examine_start(bdev);
1622 
1623 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1624 		if (vbdev->state.doing_finish || vbdev->state.started) {
1625 			continue;
1626 		}
1627 
1628 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1629 			examine_start(bdev);
1630 			register_vbdev(vbdev, examine_done, bdev);
1631 			created_from_config = true;
1632 			continue;
1633 		}
1634 		if (!strcmp(bdev_name, vbdev->core.name)) {
1635 			examine_start(bdev);
1636 			register_vbdev(vbdev, examine_done, bdev);
1637 			examine_done(0, NULL, bdev);
1638 			return;
1639 		}
1640 	}
1641 
1642 	/* If the device was discovered via config we do not check for metadata */
1643 	if (created_from_config) {
1644 		examine_done(0, NULL, bdev);
1645 		return;
1646 	}
1647 
1648 	/* Metadata probe path
1649 	 * We create a temporary OCF volume and a temporary base structure
1650 	 * to use them for ocf_metadata_probe() and for bottom adapter IOs
1651 	 * Then we get the UUIDs of the core devices and create configurations based on them */
1652 	ctx = calloc(1, sizeof(*ctx));
1653 	if (!ctx) {
1654 		examine_done(-ENOMEM, NULL, bdev);
1655 		return;
1656 	}
1657 
1658 	ctx->base.bdev = bdev;
1659 	ctx->refcnt = 1;
1660 
1661 	rc = spdk_bdev_open(ctx->base.bdev, true, NULL, NULL, &ctx->base.desc);
1662 	if (rc) {
1663 		ctx->result = rc;
1664 		examine_ctx_put(ctx);
1665 		return;
1666 	}
1667 
1668 	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
1669 	if (rc) {
1670 		ctx->result = rc;
1671 		examine_ctx_put(ctx);
1672 		return;
1673 	}
1674 
1675 	rc = ocf_volume_open(ctx->volume, &ctx->base);
1676 	if (rc) {
1677 		ctx->result = rc;
1678 		examine_ctx_put(ctx);
1679 		return;
1680 	}
1681 
1682 	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
1683 }
1684 
1685 static int
1686 vbdev_ocf_get_ctx_size(void)
1687 {
1688 	return sizeof(struct bdev_ocf_data);
1689 }
1690 
1691 static void
1692 fini_start(void)
1693 {
1694 	g_fini_started = true;
1695 }
1696 
1697 /* Module-global function table
1698  * Does not relate to vbdev instances */
1699 static struct spdk_bdev_module ocf_if = {
1700 	.name = "ocf",
1701 	.module_init = vbdev_ocf_init,
1702 	.fini_start = fini_start,
1703 	.module_fini = vbdev_ocf_module_fini,
1704 	.config_text = NULL,
1705 	.get_ctx_size = vbdev_ocf_get_ctx_size,
1706 	.examine_config = vbdev_ocf_examine,
1707 	.examine_disk   = vbdev_ocf_examine_disk,
1708 };
1709 SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);
1710 
1711 SPDK_LOG_REGISTER_COMPONENT("vbdev_ocf", SPDK_TRACE_VBDEV_OCF)
1712