xref: /spdk/module/bdev/ocf/vbdev_ocf.c (revision de21d8f4e45b732c13ce5c7aa1872f73bffd38aa)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <ocf/ocf.h>
35 #include <ocf/ocf_types.h>
36 #include <ocf/ocf_mngt.h>
37 
38 #include "ctx.h"
39 #include "data.h"
40 #include "volume.h"
41 #include "utils.h"
42 #include "vbdev_ocf.h"
43 
44 #include "spdk/bdev_module.h"
45 #include "spdk/thread.h"
46 #include "spdk/string.h"
47 #include "spdk/log.h"
48 #include "spdk/cpuset.h"
49 
/* Tentative definition; the module structure is filled in elsewhere in this file */
static struct spdk_bdev_module ocf_if;

/* Global list of all OCF vbdevs (online, starting, or waiting for base bdevs) */
static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);

/* Bdevs claimed during examine but whose examine is not yet reported done */
static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);

/* Set once module shutdown begins; stop_vbdev() then stops shared caches too */
bool g_fini_started = false;

/* Structure for keeping list of bdevs that are claimed but not used yet */
struct examining_bdev {
	struct spdk_bdev           *bdev;
	TAILQ_ENTRY(examining_bdev) tailq;
};
65 
/* Add bdev to list of claimed
 * Each call must eventually be balanced by an examine_done() for the
 * same bdev, which reports examine completion to the bdev layer. */
static void
examine_start(struct spdk_bdev *bdev)
{
	struct examining_bdev *entry = malloc(sizeof(*entry));

	/* NOTE(review): allocation failure is only caught by assert(), which
	 * compiles out under NDEBUG and would lead to a NULL dereference
	 * below — confirm whether an explicit error path is wanted here. */
	assert(entry);
	entry->bdev = bdev;
	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
}
76 
/* Find bdev on list of claimed bdevs, then remove it,
 * if it was the last one on list then report examine done */
static void
examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
{
	struct spdk_bdev *bdev = cb_arg;
	struct examining_bdev *entry, *safe, *found = NULL;

	/* Scan for entries matching this bdev; `found` holds the first match.
	 * Hitting a second match means other claims are still outstanding,
	 * so we jump straight to removal without reporting examine done. */
	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
		if (entry->bdev == bdev) {
			if (found) {
				goto remove;
			} else {
				found = entry;
			}
		}
	}

	/* Exactly one entry matched - this was the last claim, report done.
	 * An examine_done() without a prior examine_start() would leave
	 * found == NULL here (caught by the assert in debug builds). */
	assert(found);
	spdk_bdev_module_examine_done(&ocf_if);

remove:
	/* In either case remove (only) the first matching entry */
	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
	free(found);
}
102 
103 /* Free allocated strings and structure itself
104  * Used at shutdown only */
105 static void
106 free_vbdev(struct vbdev_ocf *vbdev)
107 {
108 	if (!vbdev) {
109 		return;
110 	}
111 
112 	free(vbdev->name);
113 	free(vbdev->cache.name);
114 	free(vbdev->core.name);
115 	free(vbdev);
116 }
117 
118 /* Get existing cache base
119  * that is attached to other vbdev */
120 static struct vbdev_ocf_base *
121 get_other_cache_base(struct vbdev_ocf_base *base)
122 {
123 	struct vbdev_ocf *vbdev;
124 
125 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
126 		if (&vbdev->cache == base || !vbdev->cache.attached) {
127 			continue;
128 		}
129 		if (!strcmp(vbdev->cache.name, base->name)) {
130 			return &vbdev->cache;
131 		}
132 	}
133 
134 	return NULL;
135 }
136 
137 static bool
138 is_ocf_cache_running(struct vbdev_ocf *vbdev)
139 {
140 	if (vbdev->cache.attached && vbdev->ocf_cache) {
141 		return ocf_cache_is_running(vbdev->ocf_cache);
142 	}
143 	return false;
144 }
145 
146 /* Get existing OCF cache instance
147  * that is started by other vbdev */
148 static ocf_cache_t
149 get_other_cache_instance(struct vbdev_ocf *vbdev)
150 {
151 	struct vbdev_ocf *cmp;
152 
153 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
154 		if (cmp->state.doing_finish || cmp == vbdev) {
155 			continue;
156 		}
157 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
158 			continue;
159 		}
160 		if (is_ocf_cache_running(cmp)) {
161 			return cmp->ocf_cache;
162 		}
163 	}
164 
165 	return NULL;
166 }
167 
/* Thread-message handler: close a base bdev descriptor on the thread
 * that originally opened it (ctx is the spdk_bdev_desc to close). */
static void
_remove_base_bdev(void *ctx)
{
	struct spdk_bdev_desc *desc = ctx;

	spdk_bdev_close(desc);
}
175 
/* Close and unclaim base bdev
 * No-op if the base is not attached. Releases the module claim and the
 * management channel (if any), then closes the descriptor. */
static void
remove_base_bdev(struct vbdev_ocf_base *base)
{
	if (base->attached) {
		if (base->management_channel) {
			spdk_put_io_channel(base->management_channel);
		}

		spdk_bdev_module_release_bdev(base->bdev);
		/* Close the underlying bdev on its same opened thread. */
		if (base->thread && base->thread != spdk_get_thread()) {
			spdk_thread_send_msg(base->thread, _remove_base_bdev, base->desc);
		} else {
			spdk_bdev_close(base->desc);
		}
		base->attached = false;
	}
}
195 
/* Finish unregister operation
 * Last step of the unregister paths: report the stop status saved in
 * detach_cache(), then drop the cache and cache-context references taken
 * when the cache was started or shared. */
static void
unregister_finish(struct vbdev_ocf *vbdev)
{
	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);
	ocf_mngt_cache_put(vbdev->ocf_cache);
	vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
205 
/* Management pipeline step: close and unclaim the core base bdev */
static void
close_core_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->core);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
212 
/* Completion of ocf_mngt_cache_remove_core(): release the cache lock
 * taken in detach_core() and resume the management pipeline. */
static void
remove_core_cmpl(void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(vbdev->ocf_cache);
	vbdev_ocf_mngt_continue(vbdev, error);
}
221 
222 /* Try to lock cache, then remove core */
223 static void
224 remove_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
225 {
226 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
227 
228 	if (error) {
229 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
230 			    error, vbdev->name);
231 		vbdev_ocf_mngt_continue(vbdev, error);
232 		return;
233 	}
234 
235 	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
236 }
237 
238 /* Detach core base */
239 static void
240 detach_core(struct vbdev_ocf *vbdev)
241 {
242 	if (is_ocf_cache_running(vbdev)) {
243 		ocf_mngt_cache_lock(vbdev->ocf_cache, remove_core_cache_lock_cmpl, vbdev);
244 	} else {
245 		vbdev_ocf_mngt_continue(vbdev, 0);
246 	}
247 }
248 
/* Management pipeline step: close and unclaim the cache base bdev */
static void
close_cache_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->cache);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
255 
/* Detach cache base */
static void
detach_cache(struct vbdev_ocf *vbdev)
{
	/* Remember the pipeline status here so unregister_finish()
	 * can report it via spdk_bdev_destruct_done() */
	vbdev->state.stop_status = vbdev->mngt_ctx.status;

	/* If some other vbdev references this cache bdev,
	 * we detach this only by changing the flag, without actual close */
	if (get_other_cache_base(&vbdev->cache)) {
		vbdev->cache.attached = false;
	}

	vbdev_ocf_mngt_continue(vbdev, 0);
}
270 
/* Completion of ocf_mngt_cache_stop(): drop the management queue
 * reference, release the cache lock taken in stop_vbdev(), and resume
 * the pipeline with the stop status. */
static void
stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
	ocf_mngt_cache_unlock(cache);

	vbdev_ocf_mngt_continue(vbdev, error);
}
281 
282 /* Try to lock cache, then stop it */
283 static void
284 stop_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
285 {
286 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
287 
288 	if (error) {
289 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
290 			    error, vbdev->name);
291 		vbdev_ocf_mngt_continue(vbdev, error);
292 		return;
293 	}
294 
295 	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
296 }
297 
298 /* Stop OCF cache object
299  * vbdev_ocf is not operational after this */
300 static void
301 stop_vbdev(struct vbdev_ocf *vbdev)
302 {
303 	if (!is_ocf_cache_running(vbdev)) {
304 		vbdev_ocf_mngt_continue(vbdev, 0);
305 		return;
306 	}
307 
308 	if (!g_fini_started && get_other_cache_instance(vbdev)) {
309 		SPDK_NOTICELOG("Not stopping cache instance '%s'"
310 			       " because it is referenced by other OCF bdev\n",
311 			       vbdev->cache.name);
312 		vbdev_ocf_mngt_continue(vbdev, 0);
313 		return;
314 	}
315 
316 	ocf_mngt_cache_lock(vbdev->ocf_cache, stop_vbdev_cache_lock_cmpl, vbdev);
317 }
318 
/* Completion of ocf_mngt_cache_flush(): release the cache lock taken in
 * flush_vbdev() and resume the pipeline with the flush status. */
static void
flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(cache);
	vbdev_ocf_mngt_continue(vbdev, error);
}
327 
328 static void
329 flush_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
330 {
331 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
332 
333 	if (error) {
334 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
335 			    error, vbdev->name);
336 		vbdev_ocf_mngt_continue(vbdev, error);
337 		return;
338 	}
339 
340 	ocf_mngt_cache_flush(vbdev->ocf_cache, flush_vbdev_cmpl, vbdev);
341 }
342 
343 static void
344 flush_vbdev(struct vbdev_ocf *vbdev)
345 {
346 	if (!is_ocf_cache_running(vbdev)) {
347 		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
348 		return;
349 	}
350 
351 	ocf_mngt_cache_lock(vbdev->ocf_cache, flush_vbdev_cache_lock_cmpl, vbdev);
352 }
353 
/* Procedures called during dirty unregister
 * The cache is stopped before the core is detached, so the core remains
 * in cache metadata and the instance can be recovered later (see
 * _vbdev_ocf_destruct_dirty for the same ordering rationale). */
vbdev_ocf_mngt_fn unregister_path_dirty[] = {
	flush_vbdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	detach_core,
	close_core_bdev,
	unregister_finish,
	NULL
};
365 
/* Procedures called during clean unregister
 * The core is removed from the running cache before the cache is
 * stopped, which deletes the instance permanently (compare
 * unregister_path_dirty above). */
vbdev_ocf_mngt_fn unregister_path_clean[] = {
	flush_vbdev,
	detach_core,
	close_core_bdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	unregister_finish,
	NULL
};
377 
378 /* Start asynchronous management operation using unregister_path */
379 static void
380 unregister_cb(void *opaque)
381 {
382 	struct vbdev_ocf *vbdev = opaque;
383 	vbdev_ocf_mngt_fn *unregister_path;
384 	int rc;
385 
386 	unregister_path = vbdev->state.doing_clean_delete ?
387 			  unregister_path_clean : unregister_path_dirty;
388 
389 	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
390 	if (rc) {
391 		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
392 		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
393 	}
394 }
395 
/* Clean remove case - remove core and then cache, this order
 * will remove instance permanently
 * Only used from vbdev_ocf_destruct() for vbdevs that never reached the
 * started state, so the steps are called directly instead of through
 * the asynchronous management pipeline. */
static void
_vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
{
	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}

	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}
}
411 
/* Dirty shutdown/hot remove case - remove cache and then core, this order
 * will allow us to recover this instance in the future
 * Only used from vbdev_ocf_destruct() for vbdevs that never reached the
 * started state (mirror image of _vbdev_ocf_destruct_clean). */
static void
_vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
{
	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}

	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}
}
427 
/* Unregister io device with callback to unregister_cb
 * This function is called during spdk_bdev_unregister
 * Returns 0 on synchronous completion, 1 when teardown continues
 * asynchronously in unregister_cb, or a negative errno. */
static int
vbdev_ocf_destruct(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;

	/* A previous destruct already set doing_finish - nothing to do */
	if (vbdev->state.doing_finish) {
		return -EALREADY;
	}

	if (vbdev->state.starting && !vbdev->state.started) {
		/* Prevent before detach cache/core during register path of
		  this bdev */
		return -EBUSY;
	}

	vbdev->state.doing_finish = true;

	if (vbdev->state.started) {
		/* Fully started vbdev: tear down asynchronously once all
		 * io channels are released */
		spdk_io_device_unregister(vbdev, unregister_cb);
		/* Return 1 because unregister is delayed */
		return 1;
	}

	/* Not started: detach bases synchronously, order depending on
	 * whether a clean (permanent) delete was requested */
	if (vbdev->state.doing_clean_delete) {
		_vbdev_ocf_destruct_clean(vbdev);
	} else {
		_vbdev_ocf_destruct_dirty(vbdev);
	}

	return 0;
}
461 
462 /* Stop OCF cache and unregister SPDK bdev */
463 int
464 vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
465 {
466 	int rc = 0;
467 
468 	if (vbdev->state.started) {
469 		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
470 	} else {
471 		rc = vbdev_ocf_destruct(vbdev);
472 		if (rc == 0 && cb) {
473 			cb(cb_arg, 0);
474 		}
475 	}
476 
477 	return rc;
478 }
479 
/* Remove cores permanently and then stop OCF cache and unregister SPDK bdev */
int
vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
		       void *cb_arg)
{
	/* This flag makes destruct/unregister take the "clean" path, which
	 * removes the core from cache metadata permanently */
	vbdev->state.doing_clean_delete = true;

	return vbdev_ocf_delete(vbdev, cb, cb_arg);
}
489 
490 
491 /* If vbdev is online, return its object */
492 struct vbdev_ocf *
493 vbdev_ocf_get_by_name(const char *name)
494 {
495 	struct vbdev_ocf *vbdev;
496 
497 	if (name == NULL) {
498 		assert(false);
499 		return NULL;
500 	}
501 
502 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
503 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
504 			continue;
505 		}
506 		if (strcmp(vbdev->name, name) == 0) {
507 			return vbdev;
508 		}
509 	}
510 	return NULL;
511 }
512 
513 /* Return matching base if parent vbdev is online */
514 struct vbdev_ocf_base *
515 vbdev_ocf_get_base_by_name(const char *name)
516 {
517 	struct vbdev_ocf *vbdev;
518 
519 	if (name == NULL) {
520 		assert(false);
521 		return NULL;
522 	}
523 
524 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
525 		if (vbdev->state.doing_finish) {
526 			continue;
527 		}
528 
529 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
530 			return &vbdev->cache;
531 		}
532 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
533 			return &vbdev->core;
534 		}
535 	}
536 	return NULL;
537 }
538 
539 /* Execute fn for each OCF device that is online or waits for base devices */
540 void
541 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
542 {
543 	struct vbdev_ocf *vbdev;
544 
545 	assert(fn != NULL);
546 
547 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
548 		if (!vbdev->state.doing_finish) {
549 			fn(vbdev, ctx);
550 		}
551 	}
552 }
553 
554 /* Called from OCF when SPDK_IO is completed */
555 static void
556 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
557 {
558 	struct spdk_bdev_io *bdev_io = io->priv1;
559 
560 	if (error == 0) {
561 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
562 	} else if (error == -ENOMEM) {
563 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
564 	} else {
565 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
566 	}
567 
568 	ocf_io_put(io);
569 }
570 
571 /* Configure io parameters and send it to OCF */
572 static int
573 io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
574 {
575 	switch (bdev_io->type) {
576 	case SPDK_BDEV_IO_TYPE_WRITE:
577 	case SPDK_BDEV_IO_TYPE_READ:
578 		ocf_core_submit_io(io);
579 		return 0;
580 	case SPDK_BDEV_IO_TYPE_FLUSH:
581 		ocf_core_submit_flush(io);
582 		return 0;
583 	case SPDK_BDEV_IO_TYPE_UNMAP:
584 		ocf_core_submit_discard(io);
585 		return 0;
586 	case SPDK_BDEV_IO_TYPE_RESET:
587 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
588 	default:
589 		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
590 		return -EINVAL;
591 	}
592 }
593 
/* Submit SPDK-IO to OCF
 * Translates the bdev io into an OCF io on this channel's queue, attaches
 * the data buffers and completion callback, and submits it. On any
 * failure the bdev io is completed here with NOMEM or FAILED. */
static void
io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
	struct ocf_io *io = NULL;
	struct bdev_ocf_data *data = NULL;
	struct vbdev_ocf_qctx *qctx = spdk_io_channel_get_ctx(ch);
	/* OCF works in bytes; convert from blocks */
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
	int dir, flags = 0;
	int err;

	/* Flush and unmap are writes from OCF's point of view */
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		dir = OCF_READ;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		dir = OCF_WRITE;
		break;
	default:
		err = -EINVAL;
		goto fail;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		flags = OCF_WRITE_FLUSH;
	}

	io = ocf_core_new_io(vbdev->ocf_core, qctx->queue, offset, len, dir, 0, flags);
	if (!io) {
		err = -ENOMEM;
		goto fail;
	}

	/* Wrap the bdev io's iovecs for OCF without copying */
	data = vbdev_ocf_data_from_spdk_io(bdev_io);
	if (!data) {
		err = -ENOMEM;
		goto fail;
	}

	err = ocf_io_set_data(io, data, 0);
	if (err) {
		goto fail;
	}

	/* vbdev_ocf_io_submit_cb completes bdev_io and puts the OCF io */
	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);

	err = io_submit_to_ocf(bdev_io, io);
	if (err) {
		goto fail;
	}

	return;

fail:
	/* Release the OCF io if it was allocated; data is owned by it */
	if (io) {
		ocf_io_put(io);
	}

	if (err == -ENOMEM) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
	} else {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
666 
667 static void
668 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
669 		     bool success)
670 {
671 	if (!success) {
672 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
673 		return;
674 	}
675 
676 	io_handle(ch, bdev_io);
677 }
678 
/* Called from bdev layer when an io to Cache vbdev is submitted
 * Routes supported io types into io_handle(); anything else is failed. */
static void
vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* User does not have to allocate io vectors for the request,
		 * so in case they are not allocated, we allocate them here */
		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		io_handle(ch, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
703 
704 /* Called from bdev layer */
705 static bool
706 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
707 {
708 	struct vbdev_ocf *vbdev = opaque;
709 
710 	switch (io_type) {
711 	case SPDK_BDEV_IO_TYPE_READ:
712 	case SPDK_BDEV_IO_TYPE_WRITE:
713 	case SPDK_BDEV_IO_TYPE_FLUSH:
714 	case SPDK_BDEV_IO_TYPE_UNMAP:
715 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
716 	case SPDK_BDEV_IO_TYPE_RESET:
717 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
718 	default:
719 		return false;
720 	}
721 }
722 
/* Called from bdev layer
 * Returns this thread's io channel for the vbdev io device; per-channel
 * context is created by io_device_create_cb(). */
static struct spdk_io_channel *
vbdev_ocf_get_io_channel(void *opaque)
{
	struct vbdev_ocf *bdev = opaque;

	return spdk_get_io_channel(bdev);
}
731 
/* Write vbdev properties (bases, cache mode, line size) into an already
 * open JSON object for `bdev_get_bdevs`-style info dumps. */
static int
vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = opaque;

	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_device", vbdev->core.name);

	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_cache_get_line_size(vbdev->ocf_cache));
	spdk_json_write_named_bool(w, "metadata_volatile",
				   vbdev->cfg.cache.metadata_volatile);

	return 0;
}
749 
/* Emit a `bdev_ocf_create` RPC call that would recreate this vbdev,
 * used when saving the current configuration as JSON. */
static void
vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_ocf_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", vbdev->name);
	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_cache_get_line_size(vbdev->ocf_cache));
	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
771 
/* Cache vbdev function table
 * Used by bdev layer; installed on exp_bdev in finish_register() */
static struct spdk_bdev_fn_table cache_dev_fn_table = {
	.destruct = vbdev_ocf_destruct,
	.io_type_supported = vbdev_ocf_io_type_supported,
	.submit_request	= vbdev_ocf_submit_request,
	.get_io_channel	= vbdev_ocf_get_io_channel,
	.write_config_json = vbdev_ocf_write_json_config,
	.dump_info_json = vbdev_ocf_dump_info_json,
};
782 
783 /* Poller function for the OCF queue
784  * We execute OCF requests here synchronously */
785 static int
786 queue_poll(void *opaque)
787 {
788 	struct vbdev_ocf_qctx *qctx = opaque;
789 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
790 	int i, max = spdk_min(32, iono);
791 
792 	for (i = 0; i < max; i++) {
793 		ocf_queue_run_single(qctx->queue);
794 	}
795 
796 	if (iono > 0) {
797 		return SPDK_POLLER_BUSY;
798 	} else {
799 		return SPDK_POLLER_IDLE;
800 	}
801 }
802 
/* Called during ocf_submit_io, ocf_purge*
 * and any other requests that need to submit io.
 * Intentionally a no-op: the queue is drained by the queue_poll()
 * poller rather than being kicked. */
static void
vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
{
}
809 
/* OCF queue deinitialization
 * Called at ocf_cache_stop
 * Releases the io channels and poller held by the queue context. The
 * `allocated` flag is set only on the heap copy made in
 * io_device_destroy_cb(); the original context lives in the io channel
 * buffer and must not be freed here. */
static void
vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
{
	struct vbdev_ocf_qctx *qctx = ocf_queue_get_priv(q);

	if (qctx) {
		spdk_put_io_channel(qctx->cache_ch);
		spdk_put_io_channel(qctx->core_ch);
		spdk_poller_unregister(&qctx->poller);
		if (qctx->allocated) {
			free(qctx);
		}
	}
}
826 
/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed */
const struct ocf_queue_ops queue_ops = {
	.kick_sync = vbdev_ocf_ctx_queue_kick,
	.kick = vbdev_ocf_ctx_queue_kick,
	.stop = vbdev_ocf_ctx_queue_stop,
};
834 
/* Called on cache vbdev creation at every thread
 * We allocate OCF queues here and SPDK poller for it
 * ctx_buf is the per-channel vbdev_ocf_qctx; returns 0 or the queue
 * creation error. */
static int
io_device_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_ocf *vbdev = io_device;
	struct vbdev_ocf_qctx *qctx = ctx_buf;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
	if (rc) {
		return rc;
	}

	/* Let queue callbacks (kick/stop) find this context */
	ocf_queue_set_priv(qctx->queue, qctx);

	/* NOTE(review): the channel and poller results are not checked for
	 * NULL here - confirm whether failures are possible at this point. */
	qctx->vbdev      = vbdev;
	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
	qctx->poller     = SPDK_POLLER_REGISTER(queue_poll, qctx, 0);

	return rc;
}
858 
/* Called per thread
 * Put OCF queue and relaunch poller with new context to finish pending requests
 * The channel's ctx_buf is about to be freed, but the OCF queue may still
 * have work; the context is copied to the heap (allocated = true so
 * vbdev_ocf_ctx_queue_stop() frees it) and a new poller drains the queue
 * until the final queue reference is dropped. */
static void
io_device_destroy_cb(void *io_device, void *ctx_buf)
{
	/* Making a copy of context to use it after io channel will be destroyed */
	struct vbdev_ocf_qctx *copy = malloc(sizeof(*copy));
	struct vbdev_ocf_qctx *qctx = ctx_buf;

	if (copy) {
		ocf_queue_set_priv(qctx->queue, copy);
		memcpy(copy, qctx, sizeof(*copy));
		/* The old poller belongs to the dying channel; replace it */
		spdk_poller_unregister(&qctx->poller);
		copy->poller = SPDK_POLLER_REGISTER(queue_poll, copy, 0);
		copy->allocated = true;
	} else {
		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
			    spdk_strerror(ENOMEM));
	}

	vbdev_ocf_queue_put(qctx->queue);
}
881 
/* OCF management queue deinitialization
 * The queue's priv holds the poller registered in
 * create_management_queue(); unregister it if present. */
static void
vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
{
	struct spdk_poller *poller = ocf_queue_get_priv(q);

	if (poller) {
		spdk_poller_unregister(&poller);
	}
}
892 
893 static int
894 mngt_queue_poll(void *opaque)
895 {
896 	ocf_queue_t q = opaque;
897 	uint32_t iono = ocf_queue_pending_io(q);
898 	int i, max = spdk_min(32, iono);
899 
900 	for (i = 0; i < max; i++) {
901 		ocf_queue_run_single(q);
902 	}
903 
904 	if (iono > 0) {
905 		return SPDK_POLLER_BUSY;
906 	} else {
907 		return SPDK_POLLER_IDLE;
908 	}
909 }
910 
/* Management queue kick - intentionally a no-op, the queue is drained
 * by the mngt_queue_poll() poller instead. */
static void
vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
{
}
915 
/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed */
const struct ocf_queue_ops mngt_queue_ops = {
	.kick_sync = NULL,
	.kick = vbdev_ocf_ctx_mngt_queue_kick,
	.stop = vbdev_ocf_ctx_mngt_queue_stop,
};
923 
/* Abort the current management pipeline: clear the starting flag and run
 * rollback_path to undo the steps performed so far, finishing with rc. */
static void
vbdev_ocf_mngt_exit(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_fn *rollback_path, int rc)
{
	vbdev->state.starting = false;
	vbdev_ocf_mngt_stop(vbdev, rollback_path, rc);
}
930 
/* Create exported spdk object
 * Final step of register_path: shape the exported bdev after the core
 * base, register the io device, and publish the bdev. On registration
 * failure the dirty rollback path is started. */
static void
finish_register(struct vbdev_ocf *vbdev)
{
	int result;

	/* Copy properties of the base bdev */
	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;

	vbdev->exp_bdev.name = vbdev->name;
	vbdev->exp_bdev.product_name = "SPDK OCF";

	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
	vbdev->exp_bdev.ctxt = vbdev;
	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
	vbdev->exp_bdev.module = &ocf_if;

	/* Finally register vbdev in SPDK */
	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
				sizeof(struct vbdev_ocf_qctx), vbdev->name);
	result = spdk_bdev_register(&vbdev->exp_bdev);
	if (result) {
		SPDK_ERRLOG("Could not register exposed bdev %s\n",
			    vbdev->name);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, result);
		return;
	} else {
		vbdev->state.started = true;
	}

	vbdev_ocf_mngt_continue(vbdev, result);
}
965 
/* Completion of ocf_mngt_cache_add_core(): release the cache lock taken
 * in add_core_cache_lock_cmpl(); remember the core handle on success or
 * start dirty rollback on failure. */
static void
add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(cache);

	if (error) {
		SPDK_ERRLOG("Error %d, failed to add core device to cache instance %s,"
			    "starting rollback\n", error, vbdev->name);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
		return;
	} else {
		vbdev->ocf_core = core;
	}

	vbdev_ocf_mngt_continue(vbdev, error);
}
984 
985 /* Try to lock cache, then add core */
986 static void
987 add_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
988 {
989 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
990 
991 	if (error) {
992 		SPDK_ERRLOG("Error %d, can not lock cache instance %s,"
993 			    "starting rollback\n", error, vbdev->name);
994 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
995 	}
996 	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
997 }
998 
/* Add core for existing OCF cache instance
 * register_path step: take the cache lock, then add the core in the
 * lock completion callback. */
static void
add_core(struct vbdev_ocf *vbdev)
{
	ocf_mngt_cache_lock(vbdev->ocf_cache, add_core_cache_lock_cmpl, vbdev);
}
1005 
/* Completion of ocf_mngt_cache_load()/attach(): release the cache lock;
 * on failure log a sizing hint for -OCF_ERR_NO_MEM and start the dirty
 * rollback path. */
static void
start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;
	uint64_t mem_needed;

	ocf_mngt_cache_unlock(cache);

	if (error) {
		SPDK_ERRLOG("Error %d during start cache %s, starting rollback\n",
			    error, vbdev->name);

		if (error == -OCF_ERR_NO_MEM) {
			/* Ask OCF how much RAM this configuration would need
			 * so the operator can fix hugepage/line-size settings */
			ocf_mngt_get_ram_needed(cache, &vbdev->cfg.device, &mem_needed);

			SPDK_NOTICELOG("Try to increase hugepage memory size or cache line size. "
				       "For your configuration:\nDevice size: %"PRIu64" bytes\n"
				       "Cache line size: %"PRIu64" bytes\nFree memory needed to start "
				       "cache: %"PRIu64" bytes\n", vbdev->cache.bdev->blockcnt *
				       vbdev->cache.bdev->blocklen, vbdev->cfg.cache.cache_line_size,
				       mem_needed);
		}

		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
		return;
	}

	vbdev_ocf_mngt_continue(vbdev, error);
}
1035 
/* Create the OCF management queue, its draining poller (100 us period),
 * and install it on the cache. Returns 0 or a negative errno. */
static int
create_management_queue(struct vbdev_ocf *vbdev)
{
	struct spdk_poller *mngt_poller;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		return rc;
	}

	mngt_poller = SPDK_POLLER_REGISTER(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
	if (mngt_poller == NULL) {
		/* NOTE(review): the queue created above is not put on this
		 * path - confirm whether it leaks or is released by the
		 * caller's rollback. */
		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
		return -ENOMEM;
	}

	/* The poller is stashed as queue priv so that
	 * vbdev_ocf_ctx_mngt_queue_stop() can unregister it */
	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);

	return 0;
}
1059 
/* Start OCF cache, attach caching device
 * First register_path step. If another vbdev already runs a cache with
 * the same name, that instance and its context are shared (referenced);
 * otherwise a new cache is started and loaded/attached asynchronously,
 * continuing in start_cache_cmpl(). Any failure starts dirty rollback. */
static void
start_cache(struct vbdev_ocf *vbdev)
{
	ocf_cache_t existing;
	int rc;

	if (is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_stop(vbdev, NULL, -EALREADY);
		return;
	}

	existing = get_other_cache_instance(vbdev);
	if (existing) {
		/* Share the running instance: take a cache reference and a
		 * cache-context reference, both dropped in unregister_finish() */
		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
			       vbdev->name, vbdev->cache.name);
		vbdev->ocf_cache = existing;
		ocf_mngt_cache_get(vbdev->ocf_cache);
		vbdev->cache_ctx = ocf_cache_get_priv(existing);
		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
	if (vbdev->cache_ctx == NULL) {
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -ENOMEM);
		return;
	}

	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);

	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache);
	if (rc) {
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}
	/* Extra reference dropped in unregister_finish() */
	ocf_mngt_cache_get(vbdev->ocf_cache);

	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);

	rc = create_management_queue(vbdev);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}

	/* loadq: recover an existing cache from disk metadata;
	 * otherwise attach the caching device as a fresh cache */
	if (vbdev->cfg.loadq) {
		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	} else {
		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	}
}
1115 
/* Procedures called during register operation
 * The management pipeline runs these steps in order;
 * the NULL entry terminates the list */
vbdev_ocf_mngt_fn register_path[] = {
	start_cache,
	add_core,
	finish_register,
	NULL
};
1123 
1124 /* Start cache instance and register OCF bdev */
1125 static void
1126 register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
1127 {
1128 	int rc;
1129 
1130 	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
1131 		cb(-EPERM, vbdev, cb_arg);
1132 		return;
1133 	}
1134 
1135 	vbdev->state.starting = true;
1136 	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
1137 	if (rc) {
1138 		cb(rc, vbdev, cb_arg);
1139 	}
1140 }
1141 
1142 /* Init OCF configuration options
1143  * for core and cache devices */
1144 static void
1145 init_vbdev_config(struct vbdev_ocf *vbdev)
1146 {
1147 	struct vbdev_ocf_config *cfg = &vbdev->cfg;
1148 
1149 	snprintf(cfg->cache.name, sizeof(cfg->cache.name), "%s", vbdev->name);
1150 	snprintf(cfg->core.name, sizeof(cfg->core.name), "%s", vbdev->core.name);
1151 
1152 	/* TODO [metadata]: make configurable with persistent
1153 	 * metadata support */
1154 	cfg->cache.metadata_volatile = false;
1155 
1156 	/* This are suggested values that
1157 	 * should be sufficient for most use cases */
1158 	cfg->cache.backfill.max_queue_size = 65536;
1159 	cfg->cache.backfill.queue_unblock_size = 60000;
1160 
1161 	cfg->device.perform_test = false;
1162 	cfg->device.discard_on_start = false;
1163 
1164 	vbdev->cfg.cache.locked = true;
1165 
1166 	cfg->core.volume_type = SPDK_OBJECT;
1167 	cfg->device.volume_type = SPDK_OBJECT;
1168 
1169 	if (vbdev->cfg.loadq) {
1170 		/* When doing cache_load(), we need to set try_add to true,
1171 		 * otherwise OCF will interpret this core as new
1172 		 * instead of the inactive one */
1173 		vbdev->cfg.core.try_add = true;
1174 	} else {
1175 		/* When cache is initialized as new, set force flag to true,
1176 		 * to ignore warnings about existing metadata */
1177 		cfg->device.force = true;
1178 	}
1179 
1180 	/* Serialize bdev names in OCF UUID to interpret on future loads
1181 	 * Core UUID is a triple of (core name, vbdev name, cache name)
1182 	 * Cache UUID is cache bdev name */
1183 	cfg->device.uuid.size = strlen(vbdev->cache.name) + 1;
1184 	cfg->device.uuid.data = vbdev->cache.name;
1185 
1186 	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s %s",
1187 		 vbdev->core.name, vbdev->name, vbdev->cache.name);
1188 	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
1189 	cfg->core.uuid.data = vbdev->uuid;
1190 	vbdev->uuid[strlen(vbdev->core.name)] = 0;
1191 	vbdev->uuid[strlen(vbdev->core.name) + 1 + strlen(vbdev->name)] = 0;
1192 }
1193 
1194 /* Allocate vbdev structure object and add it to the global list */
1195 static int
1196 init_vbdev(const char *vbdev_name,
1197 	   const char *cache_mode_name,
1198 	   const uint64_t cache_line_size,
1199 	   const char *cache_name,
1200 	   const char *core_name,
1201 	   bool loadq)
1202 {
1203 	struct vbdev_ocf *vbdev;
1204 	int rc = 0;
1205 
1206 	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
1207 		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
1208 		return -EPERM;
1209 	}
1210 
1211 	vbdev = calloc(1, sizeof(*vbdev));
1212 	if (!vbdev) {
1213 		goto error_mem;
1214 	}
1215 
1216 	vbdev->cache.parent = vbdev;
1217 	vbdev->core.parent = vbdev;
1218 	vbdev->cache.is_cache = true;
1219 	vbdev->core.is_cache = false;
1220 
1221 	if (cache_mode_name) {
1222 		vbdev->cfg.cache.cache_mode
1223 			= ocf_get_cache_mode(cache_mode_name);
1224 	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
1225 		SPDK_ERRLOG("No cache mode specified\n");
1226 		rc = -EINVAL;
1227 		goto error_free;
1228 	}
1229 	if (vbdev->cfg.cache.cache_mode < 0) {
1230 		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
1231 		rc = -EINVAL;
1232 		goto error_free;
1233 	}
1234 
1235 	ocf_cache_line_size_t set_cache_line_size = cache_line_size ?
1236 			(ocf_cache_line_size_t)cache_line_size * KiB :
1237 			ocf_cache_line_size_default;
1238 	if (set_cache_line_size == 0) {
1239 		SPDK_ERRLOG("Cache line size should be non-zero.\n");
1240 		rc = -EINVAL;
1241 		goto error_free;
1242 	}
1243 	vbdev->cfg.device.cache_line_size = set_cache_line_size;
1244 	vbdev->cfg.cache.cache_line_size = set_cache_line_size;
1245 
1246 	vbdev->name = strdup(vbdev_name);
1247 	if (!vbdev->name) {
1248 		goto error_mem;
1249 	}
1250 
1251 	vbdev->cache.name = strdup(cache_name);
1252 	if (!vbdev->cache.name) {
1253 		goto error_mem;
1254 	}
1255 
1256 	vbdev->core.name = strdup(core_name);
1257 	if (!vbdev->core.name) {
1258 		goto error_mem;
1259 	}
1260 
1261 	vbdev->cfg.loadq = loadq;
1262 	init_vbdev_config(vbdev);
1263 	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
1264 	return rc;
1265 
1266 error_mem:
1267 	rc = -ENOMEM;
1268 error_free:
1269 	free_vbdev(vbdev);
1270 	return rc;
1271 }
1272 
/* Read configuration file at the start of SPDK application
 * This adds vbdevs to global list if some mentioned in config
 *
 * Module init hook: brings up the OCF context and the volume type.
 * Returns 0 on success or the first failing step's error code. */
static int
vbdev_ocf_init(void)
{
	int rc;

	rc = vbdev_ocf_ctx_init();
	if (rc) {
		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", rc);
		return rc;
	}

	rc = vbdev_ocf_volume_init();
	if (rc) {
		/* Roll back the ctx init so the module leaves no state behind */
		vbdev_ocf_ctx_cleanup();
		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", rc);
		return rc;
	}

	return 0;
}
1295 
1296 /* Called after application shutdown started
1297  * Release memory of allocated structures here */
1298 static void
1299 vbdev_ocf_module_fini(void)
1300 {
1301 	struct vbdev_ocf *vbdev;
1302 
1303 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1304 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1305 		free_vbdev(vbdev);
1306 	}
1307 
1308 	vbdev_ocf_volume_cleanup();
1309 	vbdev_ocf_ctx_cleanup();
1310 }
1311 
1312 /* When base device gets unpluged this is called
1313  * We will unregister cache vbdev here
1314  * When cache device is removed, we delete every OCF bdev that used it */
1315 static void
1316 hotremove_cb(struct vbdev_ocf_base *base)
1317 {
1318 	struct vbdev_ocf *vbdev;
1319 
1320 	if (!base->is_cache) {
1321 		if (base->parent->state.doing_finish) {
1322 			return;
1323 		}
1324 
1325 		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
1326 			       base->parent->name, base->name);
1327 		vbdev_ocf_delete(base->parent, NULL, NULL);
1328 		return;
1329 	}
1330 
1331 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1332 		if (vbdev->state.doing_finish) {
1333 			continue;
1334 		}
1335 		if (strcmp(base->name, vbdev->cache.name) == 0) {
1336 			SPDK_NOTICELOG("Deinitializing '%s' because"
1337 				       " its cache device '%s' was removed\n",
1338 				       vbdev->name, base->name);
1339 			vbdev_ocf_delete(vbdev, NULL, NULL);
1340 		}
1341 	}
1342 }
1343 
1344 static void
1345 base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1346 		   void *event_ctx)
1347 {
1348 	switch (type) {
1349 	case SPDK_BDEV_EVENT_REMOVE:
1350 		if (event_ctx) {
1351 			hotremove_cb(event_ctx);
1352 		}
1353 		break;
1354 	default:
1355 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1356 		break;
1357 	}
1358 }
1359 
/* Open base SPDK bdev and claim it
 *
 * Acquires, in order: an open descriptor, a module claim, and a management
 * IO channel. On failure each step unwinds the ones before it. Returns 0
 * on success, -EALREADY if already attached, or a negative errno. */
static int
attach_base(struct vbdev_ocf_base *base)
{
	int status;

	if (base->attached) {
		return -EALREADY;
	}

	/* If base cache bdev was already opened by other vbdev,
	 * we just copy its descriptor here */
	if (base->is_cache) {
		struct vbdev_ocf_base *existing = get_other_cache_base(base);
		if (existing) {
			/* Shared descriptor/channel - the original opener owns them */
			base->desc = existing->desc;
			base->management_channel = existing->management_channel;
			base->attached = true;
			return 0;
		}
	}

	/* Open for writing; event_ctx = base so hotremove_cb() can find us */
	status = spdk_bdev_open_ext(base->name, true, base_bdev_event_cb, base, &base->desc);
	if (status) {
		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
		return status;
	}

	/* Claim the bdev exclusively for the OCF module */
	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
					     &ocf_if);
	if (status) {
		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
		spdk_bdev_close(base->desc);
		return status;
	}

	base->management_channel = spdk_bdev_get_io_channel(base->desc);
	if (!base->management_channel) {
		SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
		/* Unwind in reverse order: release claim, then close descriptor */
		spdk_bdev_module_release_bdev(base->bdev);
		spdk_bdev_close(base->desc);
		return -ENOMEM;
	}

	/* Save the thread where the base device is opened */
	base->thread = spdk_get_thread();

	base->attached = true;
	return status;
}
1410 
1411 /* Attach base bdevs */
1412 static int
1413 attach_base_bdevs(struct vbdev_ocf *vbdev,
1414 		  struct spdk_bdev *cache_bdev,
1415 		  struct spdk_bdev *core_bdev)
1416 {
1417 	int rc = 0;
1418 
1419 	if (cache_bdev) {
1420 		vbdev->cache.bdev = cache_bdev;
1421 		rc |= attach_base(&vbdev->cache);
1422 	}
1423 
1424 	if (core_bdev) {
1425 		vbdev->core.bdev = core_bdev;
1426 		rc |= attach_base(&vbdev->core);
1427 	}
1428 
1429 	return rc;
1430 }
1431 
1432 /* Init and then start vbdev if all base devices are present */
1433 void
1434 vbdev_ocf_construct(const char *vbdev_name,
1435 		    const char *cache_mode_name,
1436 		    const uint64_t cache_line_size,
1437 		    const char *cache_name,
1438 		    const char *core_name,
1439 		    bool loadq,
1440 		    void (*cb)(int, struct vbdev_ocf *, void *),
1441 		    void *cb_arg)
1442 {
1443 	int rc;
1444 	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
1445 	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
1446 	struct vbdev_ocf *vbdev;
1447 
1448 	rc = init_vbdev(vbdev_name, cache_mode_name, cache_line_size, cache_name, core_name, loadq);
1449 	if (rc) {
1450 		cb(rc, NULL, cb_arg);
1451 		return;
1452 	}
1453 
1454 	vbdev = vbdev_ocf_get_by_name(vbdev_name);
1455 	if (vbdev == NULL) {
1456 		cb(-ENODEV, NULL, cb_arg);
1457 		return;
1458 	}
1459 
1460 	if (cache_bdev == NULL) {
1461 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
1462 			       vbdev->name, cache_name);
1463 	}
1464 	if (core_bdev == NULL) {
1465 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
1466 			       vbdev->name, core_name);
1467 	}
1468 
1469 	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
1470 	if (rc) {
1471 		cb(rc, vbdev, cb_arg);
1472 		return;
1473 	}
1474 
1475 	if (core_bdev && cache_bdev) {
1476 		register_vbdev(vbdev, cb, cb_arg);
1477 	} else {
1478 		cb(0, vbdev, cb_arg);
1479 	}
1480 }
1481 
1482 /* Set new cache mode on OCF cache */
1483 void
1484 vbdev_ocf_set_cache_mode(struct vbdev_ocf *vbdev,
1485 			 const char *cache_mode_name,
1486 			 void (*cb)(int, struct vbdev_ocf *, void *),
1487 			 void *cb_arg)
1488 {
1489 	ocf_cache_t cache;
1490 	ocf_cache_mode_t cache_mode;
1491 	int rc;
1492 
1493 	cache = vbdev->ocf_cache;
1494 	cache_mode = ocf_get_cache_mode(cache_mode_name);
1495 
1496 	rc = ocf_mngt_cache_trylock(cache);
1497 	if (rc) {
1498 		cb(rc, vbdev, cb_arg);
1499 		return;
1500 	}
1501 
1502 	rc = ocf_mngt_cache_set_mode(cache, cache_mode);
1503 	ocf_mngt_cache_unlock(cache);
1504 	cb(rc, vbdev, cb_arg);
1505 }
1506 
1507 /* This called if new device is created in SPDK application
1508  * If that device named as one of base bdevs of OCF vbdev,
1509  * claim and open them */
1510 static void
1511 vbdev_ocf_examine(struct spdk_bdev *bdev)
1512 {
1513 	const char *bdev_name = spdk_bdev_get_name(bdev);
1514 	struct vbdev_ocf *vbdev;
1515 
1516 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1517 		if (vbdev->state.doing_finish) {
1518 			continue;
1519 		}
1520 
1521 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1522 			attach_base_bdevs(vbdev, bdev, NULL);
1523 			continue;
1524 		}
1525 		if (!strcmp(bdev_name, vbdev->core.name)) {
1526 			attach_base_bdevs(vbdev, NULL, bdev);
1527 			break;
1528 		}
1529 	}
1530 	spdk_bdev_module_examine_done(&ocf_if);
1531 }
1532 
/* State shared by the asynchronous OCF metadata probe callbacks
 * during examine of a potential cache bdev */
struct metadata_probe_ctx {
	/* Temporary base structure used for bottom adapter IO */
	struct vbdev_ocf_base base;
	/* Temporary OCF volume wrapping the examined bdev */
	ocf_volume_t volume;

	/* Core UUIDs read from cache metadata; uuid_count entries */
	struct ocf_volume_uuid *core_uuids;
	unsigned int uuid_count;

	/* First error encountered during the probe; 0 on success */
	int result;
	/* Outstanding async references; context is torn down when it hits 0 */
	int refcnt;
};
1543 
/* Thread-hop helper: closes a bdev descriptor on the thread that opened it
 * (scheduled via spdk_thread_send_msg from examine_ctx_put) */
static void
_examine_ctx_put(void *ctx)
{
	spdk_bdev_close((struct spdk_bdev_desc *)ctx);
}
1551 
/* Drop a reference on the metadata probe context.
 * When the last reference is released, closes the descriptor (on its
 * opening thread if needed), destroys the temporary volume, frees the
 * core UUID buffers, signals examine completion and frees the context. */
static void
examine_ctx_put(struct metadata_probe_ctx *ctx)
{
	unsigned int i;

	ctx->refcnt--;
	if (ctx->refcnt > 0) {
		return;
	}

	if (ctx->result) {
		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
	}

	if (ctx->base.desc) {
		/* Close the underlying bdev on its same opened thread. */
		if (ctx->base.thread && ctx->base.thread != spdk_get_thread()) {
			spdk_thread_send_msg(ctx->base.thread, _examine_ctx_put, ctx->base.desc);
		} else {
			spdk_bdev_close(ctx->base.desc);
		}
	}

	if (ctx->volume) {
		ocf_volume_destroy(ctx->volume);
	}

	/* NULL guard is load-bearing: uuid_count may be non-zero while
	 * core_uuids is NULL if the calloc in
	 * metadata_probe_cores_get_num() failed */
	if (ctx->core_uuids) {
		for (i = 0; i < ctx->uuid_count; i++) {
			free(ctx->core_uuids[i].data);
		}
	}
	free(ctx->core_uuids);

	examine_done(ctx->result, NULL, ctx->base.bdev);
	free(ctx);
}
1590 
/* Completion of vbdev_ocf_construct() launched from the metadata probe;
 * releases the context reference taken before the construct call */
static void
metadata_probe_construct_cb(int rc, struct vbdev_ocf *vbdev, void *vctx)
{
	examine_ctx_put((struct metadata_probe_ctx *)vctx);
}
1598 
/* This is second callback for ocf_metadata_probe_cores()
 * Here we create vbdev configurations based on UUIDs */
static void
metadata_probe_cores_construct(void *priv, int error, unsigned int num_cores)
{
	struct metadata_probe_ctx *ctx = priv;
	const char *vbdev_name;
	const char *core_name;
	const char *cache_name;
	unsigned int i;

	if (error) {
		ctx->result = error;
		examine_ctx_put(ctx);
		return;
	}

	for (i = 0; i < num_cores; i++) {
		/* The core UUID holds three consecutive NUL-terminated strings:
		 * core name, vbdev name, cache name - the layout produced by
		 * init_vbdev_config(). Step past each terminator to get the next. */
		core_name = ocf_uuid_to_str(&ctx->core_uuids[i]);
		vbdev_name = core_name + strlen(core_name) + 1;
		cache_name = vbdev_name + strlen(vbdev_name) + 1;

		/* The bdev may have been renamed since metadata was written;
		 * this is informational only, construction proceeds regardless */
		if (strcmp(ctx->base.bdev->name, cache_name)) {
			SPDK_NOTICELOG("OCF metadata found on %s belongs to bdev named '%s'\n",
				       ctx->base.bdev->name, cache_name);
		}

		/* Each construct holds a context reference until its callback runs */
		ctx->refcnt++;
		vbdev_ocf_construct(vbdev_name, NULL, 0, cache_name, core_name, true,
				    metadata_probe_construct_cb, ctx);
	}

	examine_ctx_put(ctx);
}
1633 
1634 /* This callback is called after OCF reads cores UUIDs from cache metadata
1635  * Here we allocate memory for those UUIDs and call ocf_metadata_probe_cores() again */
1636 static void
1637 metadata_probe_cores_get_num(void *priv, int error, unsigned int num_cores)
1638 {
1639 	struct metadata_probe_ctx *ctx = priv;
1640 	unsigned int i;
1641 
1642 	if (error) {
1643 		ctx->result = error;
1644 		examine_ctx_put(ctx);
1645 		return;
1646 	}
1647 
1648 	ctx->uuid_count = num_cores;
1649 	ctx->core_uuids = calloc(num_cores, sizeof(struct ocf_volume_uuid));
1650 	if (!ctx->core_uuids) {
1651 		ctx->result = -ENOMEM;
1652 		examine_ctx_put(ctx);
1653 		return;
1654 	}
1655 
1656 	for (i = 0; i < ctx->uuid_count; i++) {
1657 		ctx->core_uuids[i].size = OCF_VOLUME_UUID_MAX_SIZE;
1658 		ctx->core_uuids[i].data = malloc(OCF_VOLUME_UUID_MAX_SIZE);
1659 		if (!ctx->core_uuids[i].data) {
1660 			ctx->result = -ENOMEM;
1661 			examine_ctx_put(ctx);
1662 			return;
1663 		}
1664 	}
1665 
1666 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, ctx->core_uuids, ctx->uuid_count,
1667 				 metadata_probe_cores_construct, ctx);
1668 }
1669 
1670 static void
1671 metadata_probe_cb(void *priv, int rc,
1672 		  struct ocf_metadata_probe_status *status)
1673 {
1674 	struct metadata_probe_ctx *ctx = priv;
1675 
1676 	if (rc) {
1677 		/* -ENODATA means device does not have cache metadata on it */
1678 		if (rc != -OCF_ERR_NO_METADATA) {
1679 			ctx->result = rc;
1680 		}
1681 		examine_ctx_put(ctx);
1682 		return;
1683 	}
1684 
1685 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, NULL, 0,
1686 				 metadata_probe_cores_get_num, ctx);
1687 }
1688 
/* This is called after vbdev_ocf_examine
 * It allows to delay application initialization
 * until all OCF bdevs get registered
 * If vbdev has all of its base devices it starts asynchronously here
 * We first check if bdev appears in configuration,
 * if not we do metadata_probe() to create its configuration from bdev metadata */
static void
vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
{
	const char *bdev_name = spdk_bdev_get_name(bdev);
	struct vbdev_ocf *vbdev;
	struct metadata_probe_ctx *ctx;
	bool created_from_config = false;
	int rc;

	/* Take the initial examine reference; every path below must balance
	 * it with a matching examine_done() (possibly via examine_ctx_put) */
	examine_start(bdev);

	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
		if (vbdev->state.doing_finish || vbdev->state.started) {
			continue;
		}

		if (!strcmp(bdev_name, vbdev->cache.name)) {
			/* Extra examine_start pairs with the examine_done passed
			 * as register_vbdev's completion callback */
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			created_from_config = true;
			/* Keep scanning - this cache bdev may serve more vbdevs */
			continue;
		}
		if (!strcmp(bdev_name, vbdev->core.name)) {
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			/* Balance the initial examine_start; a core bdev belongs
			 * to one vbdev only, so examine is finished here */
			examine_done(0, NULL, bdev);
			return;
		}
	}

	/* If device was discovered during config we do not check for metadata */
	if (created_from_config) {
		examine_done(0, NULL, bdev);
		return;
	}

	/* Metadata probe path
	 * We create temporary OCF volume and a temporary base structure
	 * to use them for ocf_metadata_probe() and for bottom adapter IOs
	 * Then we get UUIDs of core devices an create configurations based on them */
	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		examine_done(-ENOMEM, NULL, bdev);
		return;
	}

	ctx->base.bdev = bdev;
	/* Initial reference; examine_ctx_put() calls examine_done() when it drops to 0 */
	ctx->refcnt = 1;

	/* event_ctx is NULL here: this temporary descriptor has no
	 * vbdev_ocf_base for hotremove_cb() to act on */
	rc = spdk_bdev_open_ext(bdev_name, true, base_bdev_event_cb, NULL, &ctx->base.desc);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_volume_open(ctx->volume, &ctx->base);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	/* Save the thread where the base device is opened */
	ctx->base.thread = spdk_get_thread();

	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
}
1770 
/* Report the per-IO context size this module needs;
 * the bdev layer reserves it in every spdk_bdev_io */
static int
vbdev_ocf_get_ctx_size(void)
{
	return sizeof(struct bdev_ocf_data);
}
1776 
/* fini_start hook: invoked when application shutdown begins;
 * raises a global flag so management paths know the app is going down */
static void
fini_start(void)
{
	g_fini_started = true;
}
1782 
/* Module-global function table
 * Does not relate to vbdev instances
 *
 * examine_config claims base bdevs named in existing configuration;
 * examine_disk additionally probes unknown bdevs for OCF metadata */
static struct spdk_bdev_module ocf_if = {
	.name = "ocf",
	.module_init = vbdev_ocf_init,
	.fini_start = fini_start,
	.module_fini = vbdev_ocf_module_fini,
	.get_ctx_size = vbdev_ocf_get_ctx_size,
	.examine_config = vbdev_ocf_examine,
	.examine_disk   = vbdev_ocf_examine_disk,
};
SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);
1795