xref: /spdk/module/bdev/ocf/vbdev_ocf.c (revision 927f1fd57bd004df581518466ec4c1b8083e5d23)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <ocf/ocf.h>
35 #include <ocf/ocf_types.h>
36 #include <ocf/ocf_mngt.h>
37 
38 #include "ctx.h"
39 #include "data.h"
40 #include "volume.h"
41 #include "utils.h"
42 #include "vbdev_ocf.h"
43 
44 #include "spdk/bdev_module.h"
45 #include "spdk/thread.h"
46 #include "spdk/string.h"
47 #include "spdk/log.h"
48 #include "spdk/cpuset.h"
49 
/* Module handle used to claim base bdevs and to report examine completion
 * (see examine_done()) */
static struct spdk_bdev_module ocf_if;

/* All OCF vbdevs created by this module, including ones still waiting
 * for their base bdevs to appear */
static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);

/* Bdevs claimed during examine that are not connected to a vbdev yet */
static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);

/* Set when module finish starts; makes stop_vbdev() tear caches down
 * unconditionally instead of keeping shared instances alive */
bool g_fini_started = false;

/* Structure for keeping list of bdevs that are claimed but not used yet */
struct examining_bdev {
	struct spdk_bdev           *bdev;
	TAILQ_ENTRY(examining_bdev) tailq;
};
65 
/* Add bdev to list of claimed */
static void
examine_start(struct spdk_bdev *bdev)
{
	struct examining_bdev *entry = malloc(sizeof(*entry));

	/* NOTE(review): allocation failure is only caught when asserts are
	 * compiled in; in an NDEBUG build a NULL entry would be dereferenced
	 * below - confirm whether OOM here is considered fatal */
	assert(entry);
	entry->bdev = bdev;
	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
}
76 
77 /* Find bdev on list of claimed bdevs, then remove it,
78  * if it was the last one on list then report examine done */
79 static void
80 examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
81 {
82 	struct spdk_bdev *bdev = cb_arg;
83 	struct examining_bdev *entry, *safe, *found = NULL;
84 
85 	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
86 		if (entry->bdev == bdev) {
87 			if (found) {
88 				goto remove;
89 			} else {
90 				found = entry;
91 			}
92 		}
93 	}
94 
95 	assert(found);
96 	spdk_bdev_module_examine_done(&ocf_if);
97 
98 remove:
99 	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
100 	free(found);
101 }
102 
103 /* Free allocated strings and structure itself
104  * Used at shutdown only */
105 static void
106 free_vbdev(struct vbdev_ocf *vbdev)
107 {
108 	if (!vbdev) {
109 		return;
110 	}
111 
112 	free(vbdev->name);
113 	free(vbdev->cache.name);
114 	free(vbdev->core.name);
115 	free(vbdev);
116 }
117 
118 /* Get existing cache base
119  * that is attached to other vbdev */
120 static struct vbdev_ocf_base *
121 get_other_cache_base(struct vbdev_ocf_base *base)
122 {
123 	struct vbdev_ocf *vbdev;
124 
125 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
126 		if (&vbdev->cache == base || !vbdev->cache.attached) {
127 			continue;
128 		}
129 		if (!strcmp(vbdev->cache.name, base->name)) {
130 			return &vbdev->cache;
131 		}
132 	}
133 
134 	return NULL;
135 }
136 
137 static bool
138 is_ocf_cache_running(struct vbdev_ocf *vbdev)
139 {
140 	if (vbdev->cache.attached && vbdev->ocf_cache) {
141 		return ocf_cache_is_running(vbdev->ocf_cache);
142 	}
143 	return false;
144 }
145 
146 /* Get existing OCF cache instance
147  * that is started by other vbdev */
148 static ocf_cache_t
149 get_other_cache_instance(struct vbdev_ocf *vbdev)
150 {
151 	struct vbdev_ocf *cmp;
152 
153 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
154 		if (cmp->state.doing_finish || cmp == vbdev) {
155 			continue;
156 		}
157 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
158 			continue;
159 		}
160 		if (is_ocf_cache_running(cmp)) {
161 			return cmp->ocf_cache;
162 		}
163 	}
164 
165 	return NULL;
166 }
167 
168 static void
169 _remove_base_bdev(void *ctx)
170 {
171 	struct spdk_bdev_desc *desc = ctx;
172 
173 	spdk_bdev_close(desc);
174 }
175 
176 /* Close and unclaim base bdev */
177 static void
178 remove_base_bdev(struct vbdev_ocf_base *base)
179 {
180 	if (base->attached) {
181 		if (base->management_channel) {
182 			spdk_put_io_channel(base->management_channel);
183 		}
184 
185 		spdk_bdev_module_release_bdev(base->bdev);
186 		/* Close the underlying bdev on its same opened thread. */
187 		if (base->thread && base->thread != spdk_get_thread()) {
188 			spdk_thread_send_msg(base->thread, _remove_base_bdev, base->desc);
189 		} else {
190 			spdk_bdev_close(base->desc);
191 		}
192 		base->attached = false;
193 	}
194 }
195 
/* Finish unregister operation */
static void
unregister_finish(struct vbdev_ocf *vbdev)
{
	/* Report the final stop status back to the bdev layer */
	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);

	/* Drop this vbdev's reference on the OCF cache object, if taken */
	if (vbdev->ocf_cache) {
		ocf_mngt_cache_put(vbdev->ocf_cache);
	}

	/* Drop the shared cache context reference; the context is freed when
	 * the last sharer releases it */
	if (vbdev->cache_ctx) {
		vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
	}
	vbdev_ocf_mngt_continue(vbdev, 0);
}

/* Close and unclaim the core base bdev, then advance the management chain */
static void
close_core_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->core);
	vbdev_ocf_mngt_continue(vbdev, 0);
}

/* Core removal completed; release the cache lock taken in detach_core() */
static void
remove_core_cmpl(void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(vbdev->ocf_cache);
	vbdev_ocf_mngt_continue(vbdev, error);
}
227 
228 /* Try to lock cache, then remove core */
229 static void
230 remove_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
231 {
232 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
233 
234 	if (error) {
235 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
236 			    error, vbdev->name);
237 		vbdev_ocf_mngt_continue(vbdev, error);
238 		return;
239 	}
240 
241 	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
242 }
243 
244 /* Detach core base */
245 static void
246 detach_core(struct vbdev_ocf *vbdev)
247 {
248 	if (is_ocf_cache_running(vbdev)) {
249 		ocf_mngt_cache_lock(vbdev->ocf_cache, remove_core_cache_lock_cmpl, vbdev);
250 	} else {
251 		vbdev_ocf_mngt_continue(vbdev, 0);
252 	}
253 }
254 
/* Close and unclaim the cache base bdev, then advance the management chain */
static void
close_cache_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->cache);
	vbdev_ocf_mngt_continue(vbdev, 0);
}

/* Detach cache base */
static void
detach_cache(struct vbdev_ocf *vbdev)
{
	/* Record the outcome of the preceding management steps as the
	 * stop status reported in unregister_finish() */
	vbdev->state.stop_status = vbdev->mngt_ctx.status;

	/* If some other vbdev references this cache bdev,
	 * we detach this only by changing the flag, without actual close */
	if (get_other_cache_base(&vbdev->cache)) {
		vbdev->cache.attached = false;
	}

	vbdev_ocf_mngt_continue(vbdev, 0);
}
276 
/* Cache stop completed; drop the management queue reference taken at
 * cache start and release the cache lock */
static void
stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
	ocf_mngt_cache_unlock(cache);

	vbdev_ocf_mngt_continue(vbdev, error);
}

/* Try to lock cache, then stop it */
static void
stop_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;

	if (error) {
		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
			    error, vbdev->name);
		vbdev_ocf_mngt_continue(vbdev, error);
		return;
	}

	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
}

/* Stop OCF cache object
 * vbdev_ocf is not operational after this */
static void
stop_vbdev(struct vbdev_ocf *vbdev)
{
	/* Nothing to stop when the cache never got running */
	if (!is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	/* During normal operation a cache instance may be shared by several
	 * OCF bdevs - keep it running for the others.  Once module finish has
	 * started every vbdev goes down, so stop unconditionally. */
	if (!g_fini_started && get_other_cache_instance(vbdev)) {
		SPDK_NOTICELOG("Not stopping cache instance '%s'"
			       " because it is referenced by other OCF bdev\n",
			       vbdev->cache.name);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	ocf_mngt_cache_lock(vbdev->ocf_cache, stop_vbdev_cache_lock_cmpl, vbdev);
}
324 
325 static void
326 flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
327 {
328 	struct vbdev_ocf *vbdev = priv;
329 
330 	ocf_mngt_cache_unlock(cache);
331 	vbdev_ocf_mngt_continue(vbdev, error);
332 }
333 
334 static void
335 flush_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
336 {
337 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
338 
339 	if (error) {
340 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
341 			    error, vbdev->name);
342 		vbdev_ocf_mngt_continue(vbdev, error);
343 		return;
344 	}
345 
346 	ocf_mngt_cache_flush(vbdev->ocf_cache, flush_vbdev_cmpl, vbdev);
347 }
348 
349 static void
350 flush_vbdev(struct vbdev_ocf *vbdev)
351 {
352 	if (!is_ocf_cache_running(vbdev)) {
353 		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
354 		return;
355 	}
356 
357 	ocf_mngt_cache_lock(vbdev->ocf_cache, flush_vbdev_cache_lock_cmpl, vbdev);
358 }
359 
/* Procedures called during dirty unregister:
 * the cache is stopped before the core is touched, which leaves the
 * instance recoverable later (see _vbdev_ocf_destruct_dirty) */
vbdev_ocf_mngt_fn unregister_path_dirty[] = {
	flush_vbdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	detach_core,
	close_core_bdev,
	unregister_finish,
	NULL
};

/* Procedures called during clean unregister:
 * the core is removed from the cache first, which deletes the instance
 * permanently (see _vbdev_ocf_destruct_clean) */
vbdev_ocf_mngt_fn unregister_path_clean[] = {
	flush_vbdev,
	detach_core,
	close_core_bdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	unregister_finish,
	NULL
};
383 
384 /* Start asynchronous management operation using unregister_path */
385 static void
386 unregister_cb(void *opaque)
387 {
388 	struct vbdev_ocf *vbdev = opaque;
389 	vbdev_ocf_mngt_fn *unregister_path;
390 	int rc;
391 
392 	unregister_path = vbdev->state.doing_clean_delete ?
393 			  unregister_path_clean : unregister_path_dirty;
394 
395 	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
396 	if (rc) {
397 		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
398 		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
399 	}
400 }
401 
/* Clean remove case - remove core and then cache, this order
 * will remove instance permanently */
static void
_vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
{
	/* Core goes first so OCF forgets the core permanently */
	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}

	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}
}

/* Dirty shutdown/hot remove case - remove cache and then core, this order
 * will allow us to recover this instance in the future */
static void
_vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
{
	/* Cache goes first so its metadata still references the core */
	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}

	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}
}
433 
/* Unregister io device with callback to unregister_cb
 * This function is called during spdk_bdev_unregister */
static int
vbdev_ocf_destruct(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;

	/* Teardown is already in progress */
	if (vbdev->state.doing_finish) {
		return -EALREADY;
	}

	if (vbdev->state.starting && !vbdev->state.started) {
		/* Prevent before detach cache/core during register path of
		  this bdev */
		return -EBUSY;
	}

	vbdev->state.doing_finish = true;

	if (vbdev->state.started) {
		/* Fully started: tear down asynchronously via unregister_cb
		 * once the io device is gone */
		spdk_io_device_unregister(vbdev, unregister_cb);
		/* Return 1 because unregister is delayed */
		return 1;
	}

	/* Never started: close the base bdevs synchronously */
	if (vbdev->state.doing_clean_delete) {
		_vbdev_ocf_destruct_clean(vbdev);
	} else {
		_vbdev_ocf_destruct_dirty(vbdev);
	}

	return 0;
}
467 
468 /* Stop OCF cache and unregister SPDK bdev */
469 int
470 vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
471 {
472 	int rc = 0;
473 
474 	if (vbdev->state.started) {
475 		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
476 	} else {
477 		rc = vbdev_ocf_destruct(vbdev);
478 		if (rc == 0 && cb) {
479 			cb(cb_arg, 0);
480 		}
481 	}
482 
483 	return rc;
484 }
485 
/* Remove cores permanently and then stop OCF cache and unregister SPDK bdev */
int
vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
		       void *cb_arg)
{
	/* The flag switches delete/destruct to the clean (permanent) paths */
	vbdev->state.doing_clean_delete = true;

	return vbdev_ocf_delete(vbdev, cb, cb_arg);
}
495 
496 
497 /* If vbdev is online, return its object */
498 struct vbdev_ocf *
499 vbdev_ocf_get_by_name(const char *name)
500 {
501 	struct vbdev_ocf *vbdev;
502 
503 	if (name == NULL) {
504 		assert(false);
505 		return NULL;
506 	}
507 
508 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
509 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
510 			continue;
511 		}
512 		if (strcmp(vbdev->name, name) == 0) {
513 			return vbdev;
514 		}
515 	}
516 	return NULL;
517 }
518 
519 /* Return matching base if parent vbdev is online */
520 struct vbdev_ocf_base *
521 vbdev_ocf_get_base_by_name(const char *name)
522 {
523 	struct vbdev_ocf *vbdev;
524 
525 	if (name == NULL) {
526 		assert(false);
527 		return NULL;
528 	}
529 
530 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
531 		if (vbdev->state.doing_finish) {
532 			continue;
533 		}
534 
535 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
536 			return &vbdev->cache;
537 		}
538 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
539 			return &vbdev->core;
540 		}
541 	}
542 	return NULL;
543 }
544 
545 /* Execute fn for each OCF device that is online or waits for base devices */
546 void
547 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
548 {
549 	struct vbdev_ocf *vbdev;
550 
551 	assert(fn != NULL);
552 
553 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
554 		if (!vbdev->state.doing_finish) {
555 			fn(vbdev, ctx);
556 		}
557 	}
558 }
559 
560 /* Called from OCF when SPDK_IO is completed */
561 static void
562 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
563 {
564 	struct spdk_bdev_io *bdev_io = io->priv1;
565 
566 	if (error == 0) {
567 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
568 	} else if (error == -OCF_ERR_NO_MEM) {
569 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
570 	} else {
571 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
572 	}
573 
574 	ocf_io_put(io);
575 }
576 
577 /* Configure io parameters and send it to OCF */
578 static int
579 io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
580 {
581 	switch (bdev_io->type) {
582 	case SPDK_BDEV_IO_TYPE_WRITE:
583 	case SPDK_BDEV_IO_TYPE_READ:
584 		ocf_core_submit_io(io);
585 		return 0;
586 	case SPDK_BDEV_IO_TYPE_FLUSH:
587 		ocf_core_submit_flush(io);
588 		return 0;
589 	case SPDK_BDEV_IO_TYPE_UNMAP:
590 		ocf_core_submit_discard(io);
591 		return 0;
592 	case SPDK_BDEV_IO_TYPE_RESET:
593 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
594 	default:
595 		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
596 		return -EINVAL;
597 	}
598 }
599 
/* Submit SPDK-IO to OCF */
static void
io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
	struct ocf_io *io = NULL;
	struct bdev_ocf_data *data = NULL;
	struct vbdev_ocf_qctx *qctx = spdk_io_channel_get_ctx(ch);
	/* Convert block-based request parameters into byte offsets for OCF */
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
	int dir, flags = 0;
	int err;

	/* Translate the bdev io type into an OCF direction */
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		dir = OCF_READ;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		dir = OCF_WRITE;
		break;
	default:
		err = -EINVAL;
		goto fail;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		flags = OCF_WRITE_FLUSH;
	}

	io = ocf_core_new_io(vbdev->ocf_core, qctx->queue, offset, len, dir, 0, flags);
	if (!io) {
		err = -ENOMEM;
		goto fail;
	}

	/* Wrap the SPDK iovecs so OCF can access the payload */
	data = vbdev_ocf_data_from_spdk_io(bdev_io);
	if (!data) {
		err = -ENOMEM;
		goto fail;
	}

	err = ocf_io_set_data(io, data, 0);
	if (err) {
		goto fail;
	}

	/* Completion arrives in vbdev_ocf_io_submit_cb with bdev_io as priv1 */
	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);

	err = io_submit_to_ocf(bdev_io, io);
	if (err) {
		goto fail;
	}

	return;

fail:
	/* On any failure drop the OCF io (if allocated) and complete the
	 * bdev io with NOMEM (retryable) or FAILED */
	if (io) {
		ocf_io_put(io);
	}

	if (err == -ENOMEM) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
	} else {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
672 
673 static void
674 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
675 		     bool success)
676 {
677 	if (!success) {
678 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
679 		return;
680 	}
681 
682 	io_handle(ch, bdev_io);
683 }
684 
685 /* Called from bdev layer when an io to Cache vbdev is submitted */
686 static void
687 vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
688 {
689 	switch (bdev_io->type) {
690 	case SPDK_BDEV_IO_TYPE_READ:
691 		/* User does not have to allocate io vectors for the request,
692 		 * so in case they are not allocated, we allocate them here */
693 		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
694 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
695 		break;
696 	case SPDK_BDEV_IO_TYPE_WRITE:
697 	case SPDK_BDEV_IO_TYPE_FLUSH:
698 	case SPDK_BDEV_IO_TYPE_UNMAP:
699 		io_handle(ch, bdev_io);
700 		break;
701 	case SPDK_BDEV_IO_TYPE_RESET:
702 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
703 	default:
704 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
705 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
706 		break;
707 	}
708 }
709 
710 /* Called from bdev layer */
711 static bool
712 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
713 {
714 	struct vbdev_ocf *vbdev = opaque;
715 
716 	switch (io_type) {
717 	case SPDK_BDEV_IO_TYPE_READ:
718 	case SPDK_BDEV_IO_TYPE_WRITE:
719 	case SPDK_BDEV_IO_TYPE_FLUSH:
720 	case SPDK_BDEV_IO_TYPE_UNMAP:
721 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
722 	case SPDK_BDEV_IO_TYPE_RESET:
723 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
724 	default:
725 		return false;
726 	}
727 }
728 
729 /* Called from bdev layer */
730 static struct spdk_io_channel *
731 vbdev_ocf_get_io_channel(void *opaque)
732 {
733 	struct vbdev_ocf *bdev = opaque;
734 
735 	return spdk_get_io_channel(bdev);
736 }
737 
/* Dump runtime info about this vbdev (cache mode, line size, bases) */
static int
vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = opaque;

	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_device", vbdev->core.name);

	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_get_cache_line_size(vbdev->ocf_cache));
	spdk_json_write_named_bool(w, "metadata_volatile",
				   vbdev->cfg.cache.metadata_volatile);

	return 0;
}

/* Write out a "bdev_ocf_create" RPC call that would recreate this vbdev
 * with its current parameters */
static void
vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_ocf_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", vbdev->name);
	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_get_cache_line_size(vbdev->ocf_cache));
	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
777 
/* Cache vbdev function table
 * Used by bdev layer; installed on the exported bdev in finish_register() */
static struct spdk_bdev_fn_table cache_dev_fn_table = {
	.destruct = vbdev_ocf_destruct,
	.io_type_supported = vbdev_ocf_io_type_supported,
	.submit_request	= vbdev_ocf_submit_request,
	.get_io_channel	= vbdev_ocf_get_io_channel,
	.write_config_json = vbdev_ocf_write_json_config,
	.dump_info_json = vbdev_ocf_dump_info_json,
};
788 
789 /* Poller function for the OCF queue
790  * We execute OCF requests here synchronously */
791 static int
792 queue_poll(void *opaque)
793 {
794 	struct vbdev_ocf_qctx *qctx = opaque;
795 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
796 	int i, max = spdk_min(32, iono);
797 
798 	for (i = 0; i < max; i++) {
799 		ocf_queue_run_single(qctx->queue);
800 	}
801 
802 	if (iono > 0) {
803 		return SPDK_POLLER_BUSY;
804 	} else {
805 		return SPDK_POLLER_IDLE;
806 	}
807 }
808 
/* Called during ocf_submit_io, ocf_purge*
 * and any other requests that need to submit io */
static void
vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
{
	/* Intentionally empty: the queue is drained by queue_poll(),
	 * so there is no thread that needs waking up here */
}
815 
816 /* OCF queue deinitialization
817  * Called at ocf_cache_stop */
818 static void
819 vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
820 {
821 	struct vbdev_ocf_qctx *qctx = ocf_queue_get_priv(q);
822 
823 	if (qctx) {
824 		spdk_put_io_channel(qctx->cache_ch);
825 		spdk_put_io_channel(qctx->core_ch);
826 		spdk_poller_unregister(&qctx->poller);
827 		if (qctx->allocated) {
828 			free(qctx);
829 		}
830 	}
831 }
832 
/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed.
 * Both kick variants are no-ops because the queue is poller-driven. */
const struct ocf_queue_ops queue_ops = {
	.kick_sync = vbdev_ocf_ctx_queue_kick,
	.kick = vbdev_ocf_ctx_queue_kick,
	.stop = vbdev_ocf_ctx_queue_stop,
};
840 
/* Called on cache vbdev creation at every thread
 * We allocate OCF queues here and SPDK poller for it */
static int
io_device_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_ocf *vbdev = io_device;
	struct vbdev_ocf_qctx *qctx = ctx_buf;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
	if (rc) {
		return rc;
	}

	/* Let the queue callbacks (kick/stop) reach this per-thread context */
	ocf_queue_set_priv(qctx->queue, qctx);

	/* NOTE(review): the io channel and poller results are not checked
	 * here - presumably they cannot fail at this point; confirm */
	qctx->vbdev      = vbdev;
	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
	qctx->poller     = SPDK_POLLER_REGISTER(queue_poll, qctx, 0);

	return rc;
}
864 
/* Called per thread
 * Put OCF queue and relaunch poller with new context to finish pending requests */
static void
io_device_destroy_cb(void *io_device, void *ctx_buf)
{
	/* Making a copy of context to use it after io channel will be destroyed */
	struct vbdev_ocf_qctx *copy = malloc(sizeof(*copy));
	struct vbdev_ocf_qctx *qctx = ctx_buf;

	if (copy) {
		/* Repoint the queue's private data at the copy before the
		 * original context disappears with the io channel */
		ocf_queue_set_priv(qctx->queue, copy);
		memcpy(copy, qctx, sizeof(*copy));
		/* Stop polling the dying context; poll the copy instead so
		 * outstanding requests can still complete */
		spdk_poller_unregister(&qctx->poller);
		copy->poller = SPDK_POLLER_REGISTER(queue_poll, copy, 0);
		/* Mark the copy as heap-allocated so queue stop() frees it */
		copy->allocated = true;
	} else {
		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
			    spdk_strerror(ENOMEM));
	}

	/* Drop this thread's queue reference; the final put triggers
	 * vbdev_ocf_ctx_queue_stop() */
	vbdev_ocf_queue_put(qctx->queue);
}
887 
888 /* OCF management queue deinitialization */
889 static void
890 vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
891 {
892 	struct spdk_poller *poller = ocf_queue_get_priv(q);
893 
894 	if (poller) {
895 		spdk_poller_unregister(&poller);
896 	}
897 }
898 
899 static int
900 mngt_queue_poll(void *opaque)
901 {
902 	ocf_queue_t q = opaque;
903 	uint32_t iono = ocf_queue_pending_io(q);
904 	int i, max = spdk_min(32, iono);
905 
906 	for (i = 0; i < max; i++) {
907 		ocf_queue_run_single(q);
908 	}
909 
910 	if (iono > 0) {
911 		return SPDK_POLLER_BUSY;
912 	} else {
913 		return SPDK_POLLER_IDLE;
914 	}
915 }
916 
static void
vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
{
	/* Intentionally empty: the management queue is drained by
	 * mngt_queue_poll(), so no wakeup is needed */
}

/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed */
const struct ocf_queue_ops mngt_queue_ops = {
	.kick_sync = NULL,
	.kick = vbdev_ocf_ctx_mngt_queue_kick,
	.stop = vbdev_ocf_ctx_mngt_queue_stop,
};
929 
/* Abort the current management chain and start the given rollback path,
 * preserving rc as the final status */
static void
vbdev_ocf_mngt_exit(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_fn *rollback_path, int rc)
{
	vbdev->state.starting = false;
	vbdev_ocf_mngt_stop(vbdev, rollback_path, rc);
}
936 
937 /* Create exported spdk object */
938 static void
939 finish_register(struct vbdev_ocf *vbdev)
940 {
941 	int result;
942 
943 	/* Copy properties of the base bdev */
944 	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
945 	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
946 	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;
947 
948 	vbdev->exp_bdev.name = vbdev->name;
949 	vbdev->exp_bdev.product_name = "SPDK OCF";
950 
951 	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
952 	vbdev->exp_bdev.ctxt = vbdev;
953 	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
954 	vbdev->exp_bdev.module = &ocf_if;
955 
956 	/* Finally register vbdev in SPDK */
957 	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
958 				sizeof(struct vbdev_ocf_qctx), vbdev->name);
959 	result = spdk_bdev_register(&vbdev->exp_bdev);
960 	if (result) {
961 		SPDK_ERRLOG("Could not register exposed bdev %s\n",
962 			    vbdev->name);
963 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, result);
964 		return;
965 	} else {
966 		vbdev->state.started = true;
967 	}
968 
969 	vbdev_ocf_mngt_continue(vbdev, result);
970 }
971 
972 static void
973 add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
974 {
975 	struct vbdev_ocf *vbdev = priv;
976 
977 	ocf_mngt_cache_unlock(cache);
978 
979 	if (error) {
980 		SPDK_ERRLOG("Error %d, failed to add core device to cache instance %s,"
981 			    "starting rollback\n", error, vbdev->name);
982 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
983 		return;
984 	} else {
985 		vbdev->ocf_core = core;
986 	}
987 
988 	vbdev_ocf_mngt_continue(vbdev, error);
989 }
990 
991 /* Try to lock cache, then add core */
992 static void
993 add_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
994 {
995 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
996 
997 	if (error) {
998 		SPDK_ERRLOG("Error %d, can not lock cache instance %s,"
999 			    "starting rollback\n", error, vbdev->name);
1000 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
1001 	}
1002 	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
1003 }
1004 
/* Add core for existing OCF cache instance.
 * The lock completion callback performs the actual add. */
static void
add_core(struct vbdev_ocf *vbdev)
{
	ocf_mngt_cache_lock(vbdev->ocf_cache, add_core_cache_lock_cmpl, vbdev);
}
1011 
/* Completion of cache attach/load; on failure starts a dirty rollback
 * and, for OOM, prints a sizing hint for the user */
static void
start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;
	uint64_t mem_needed;

	ocf_mngt_cache_unlock(cache);

	if (error) {
		SPDK_ERRLOG("Error %d during start cache %s, starting rollback\n",
			    error, vbdev->name);

		if (error == -OCF_ERR_NO_MEM) {
			/* Ask OCF how much RAM this configuration would need */
			ocf_mngt_get_ram_needed(cache, &vbdev->cfg.device, &mem_needed);

			SPDK_NOTICELOG("Try to increase hugepage memory size or cache line size. "
				       "For your configuration:\nDevice size: %"PRIu64" bytes\n"
				       "Cache line size: %"PRIu64" bytes\nFree memory needed to start "
				       "cache: %"PRIu64" bytes\n", vbdev->cache.bdev->blockcnt *
				       vbdev->cache.bdev->blocklen, vbdev->cfg.cache.cache_line_size,
				       mem_needed);
		}

		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
		return;
	}

	vbdev_ocf_mngt_continue(vbdev, error);
}
1041 
/* Create the OCF management queue and its dedicated low-frequency poller */
static int
create_management_queue(struct vbdev_ocf *vbdev)
{
	struct spdk_poller *mngt_poller;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		return rc;
	}

	/* Poll every 100 us - management operations are not latency critical */
	mngt_poller = SPDK_POLLER_REGISTER(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
	if (mngt_poller == NULL) {
		/* NOTE(review): mngt_queue is not released on this path;
		 * presumably the caller's rollback covers it - confirm */
		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
		return -ENOMEM;
	}

	/* Stash the poller in the queue's private data so
	 * vbdev_ocf_ctx_mngt_queue_stop() can unregister it */
	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);

	return 0;
}
1065 
/* Start OCF cache, attach caching device */
static void
start_cache(struct vbdev_ocf *vbdev)
{
	ocf_cache_t existing;
	uint32_t cache_block_size = vbdev->cache.bdev->blocklen;
	uint32_t core_block_size = vbdev->core.bdev->blocklen;
	int rc;

	if (is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_stop(vbdev, NULL, -EALREADY);
		return;
	}

	/* A cache with a larger block than the core could not service
	 * core-granularity requests */
	if (cache_block_size > core_block_size) {
		SPDK_ERRLOG("Cache bdev block size (%d) is bigger then core bdev block size (%d)\n",
			    cache_block_size, core_block_size);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -EINVAL);
		return;
	}

	/* Reuse a cache instance already started by another vbdev with the
	 * same cache name instead of starting a second one */
	existing = get_other_cache_instance(vbdev);
	if (existing) {
		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
			       vbdev->name, vbdev->cache.name);
		vbdev->ocf_cache = existing;
		/* Take our own references on the cache and its shared context */
		ocf_mngt_cache_get(vbdev->ocf_cache);
		vbdev->cache_ctx = ocf_cache_get_priv(existing);
		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
	if (vbdev->cache_ctx == NULL) {
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -ENOMEM);
		return;
	}

	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);

	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache, NULL);
	if (rc) {
		SPDK_ERRLOG("Could not start cache %s: %d\n", vbdev->name, rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}
	/* Reference released in unregister_finish() */
	ocf_mngt_cache_get(vbdev->ocf_cache);

	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);

	rc = create_management_queue(vbdev);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}

	/* loadq: recover the instance from existing metadata on the cache
	 * device; otherwise attach it fresh */
	if (vbdev->cfg.loadq) {
		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	} else {
		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.device, start_cache_cmpl, vbdev);
	}
}
1131 
/* Procedures called during register operation */
vbdev_ocf_mngt_fn register_path[] = {
	start_cache,		/* start OCF cache, attach/load caching device */
	add_core,		/* add core (backing) device to the cache */
	finish_register,
	NULL			/* sentinel terminating the management path */
};
1139 
1140 /* Start cache instance and register OCF bdev */
1141 static void
1142 register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
1143 {
1144 	int rc;
1145 
1146 	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
1147 		cb(-EPERM, vbdev, cb_arg);
1148 		return;
1149 	}
1150 
1151 	vbdev->state.starting = true;
1152 	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
1153 	if (rc) {
1154 		cb(rc, vbdev, cb_arg);
1155 	}
1156 }
1157 
/* Init OCF configuration options
 * for core and cache devices */
static void
init_vbdev_config(struct vbdev_ocf *vbdev)
{
	struct vbdev_ocf_config *cfg = &vbdev->cfg;

	/* Initialize OCF defaults first */
	ocf_mngt_cache_device_config_set_default(&cfg->device);
	ocf_mngt_cache_config_set_default(&cfg->cache);
	ocf_mngt_core_config_set_default(&cfg->core);

	snprintf(cfg->cache.name, sizeof(cfg->cache.name), "%s", vbdev->name);
	snprintf(cfg->core.name, sizeof(cfg->core.name), "%s", vbdev->core.name);

	/* Core bdevs are opened by this module (attach_base), not by OCF */
	cfg->device.open_cores = false;
	cfg->device.perform_test = false;
	cfg->device.discard_on_start = false;

	vbdev->cfg.cache.locked = true;

	cfg->core.volume_type = SPDK_OBJECT;
	cfg->device.volume_type = SPDK_OBJECT;

	if (vbdev->cfg.loadq) {
		/* When doing cache_load(), we need to set try_add to true,
		 * otherwise OCF will interpret this core as new
		 * instead of the inactive one */
		vbdev->cfg.core.try_add = true;
	} else {
		/* When cache is initialized as new, set force flag to true,
		 * to ignore warnings about existing metadata */
		cfg->device.force = true;
	}

	/* Serialize bdev names in OCF UUID to interpret on future loads
	 * Core UUID is a triple of (core name, vbdev name, cache name)
	 * Cache UUID is cache bdev name */
	cfg->device.uuid.size = strlen(vbdev->cache.name) + 1;
	cfg->device.uuid.data = vbdev->cache.name;

	/* Write the triple space-separated first, record the FULL length
	 * (before the NUL patching below), then overwrite the two separator
	 * spaces with NUL bytes. The uuid buffer thus holds three consecutive
	 * NUL-terminated strings, decoded in metadata_probe_cores_construct().
	 * The statement order here is load-bearing. */
	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s %s",
		 vbdev->core.name, vbdev->name, vbdev->cache.name);
	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
	cfg->core.uuid.data = vbdev->uuid;
	vbdev->uuid[strlen(vbdev->core.name)] = 0;
	vbdev->uuid[strlen(vbdev->core.name) + 1 + strlen(vbdev->name)] = 0;
}
1206 
/* Allocate vbdev structure object and add it to the global list
 *
 * cache_mode_name may be NULL only on the load path (loadq == true), where
 * the mode comes from on-disk metadata. cache_line_size is expressed in KiB
 * (it is multiplied by KiB below); 0 selects the OCF default.
 * Returns 0 on success or a negative errno. */
static int
init_vbdev(const char *vbdev_name,
	   const char *cache_mode_name,
	   const uint64_t cache_line_size,
	   const char *cache_name,
	   const char *core_name,
	   bool loadq)
{
	struct vbdev_ocf *vbdev;
	int rc = 0;

	/* Refuse to shadow an existing bdev or an already-configured OCF vbdev */
	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
		return -EPERM;
	}

	vbdev = calloc(1, sizeof(*vbdev));
	if (!vbdev) {
		/* NOTE(review): reaches free_vbdev() with vbdev == NULL below —
		 * assumes free_vbdev() tolerates NULL; confirm in its definition */
		goto error_mem;
	}

	vbdev->name = strdup(vbdev_name);
	if (!vbdev->name) {
		goto error_mem;
	}

	vbdev->cache.name = strdup(cache_name);
	if (!vbdev->cache.name) {
		goto error_mem;
	}

	vbdev->core.name = strdup(core_name);
	if (!vbdev->core.name) {
		goto error_mem;
	}

	vbdev->cache.parent = vbdev;
	vbdev->core.parent = vbdev;
	vbdev->cache.is_cache = true;
	vbdev->core.is_cache = false;
	vbdev->cfg.loadq = loadq;

	init_vbdev_config(vbdev);

	if (cache_mode_name) {
		vbdev->cfg.cache.cache_mode
			= ocf_get_cache_mode(cache_mode_name);
	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
		SPDK_ERRLOG("No cache mode specified\n");
		rc = -EINVAL;
		goto error_free;
	}
	/* ocf_get_cache_mode() reports an unrecognized name as a negative value */
	if (vbdev->cfg.cache.cache_mode < 0) {
		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
		rc = -EINVAL;
		goto error_free;
	}

	/* Convert KiB argument to bytes; 0 means "use OCF default" */
	ocf_cache_line_size_t set_cache_line_size = cache_line_size ?
			(ocf_cache_line_size_t)cache_line_size * KiB :
			ocf_cache_line_size_default;
	if (set_cache_line_size == 0) {
		SPDK_ERRLOG("Cache line size should be non-zero.\n");
		rc = -EINVAL;
		goto error_free;
	}
	vbdev->cfg.device.cache_line_size = set_cache_line_size;
	vbdev->cfg.cache.cache_line_size = set_cache_line_size;

	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
	return rc;

error_mem:
	rc = -ENOMEM;
error_free:
	free_vbdev(vbdev);
	return rc;
}
1286 
/* Module init: bring up the OCF context and the SPDK volume type.
 * On partial failure the already-initialized part is torn down again. */
static int
vbdev_ocf_init(void)
{
	int rc;

	rc = vbdev_ocf_ctx_init();
	if (rc != 0) {
		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", rc);
		return rc;
	}

	rc = vbdev_ocf_volume_init();
	if (rc != 0) {
		vbdev_ocf_ctx_cleanup();
		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", rc);
	}

	return rc;
}
1309 
1310 /* Called after application shutdown started
1311  * Release memory of allocated structures here */
1312 static void
1313 vbdev_ocf_module_fini(void)
1314 {
1315 	struct vbdev_ocf *vbdev;
1316 
1317 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1318 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1319 		free_vbdev(vbdev);
1320 	}
1321 
1322 	vbdev_ocf_volume_cleanup();
1323 	vbdev_ocf_ctx_cleanup();
1324 }
1325 
1326 /* When base device gets unplugged this is called
1327  * We will unregister cache vbdev here
1328  * When cache device is removed, we delete every OCF bdev that used it */
1329 static void
1330 hotremove_cb(struct vbdev_ocf_base *base)
1331 {
1332 	struct vbdev_ocf *vbdev;
1333 
1334 	if (!base->is_cache) {
1335 		if (base->parent->state.doing_finish) {
1336 			return;
1337 		}
1338 
1339 		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
1340 			       base->parent->name, base->name);
1341 		vbdev_ocf_delete(base->parent, NULL, NULL);
1342 		return;
1343 	}
1344 
1345 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1346 		if (vbdev->state.doing_finish) {
1347 			continue;
1348 		}
1349 		if (strcmp(base->name, vbdev->cache.name) == 0) {
1350 			SPDK_NOTICELOG("Deinitializing '%s' because"
1351 				       " its cache device '%s' was removed\n",
1352 				       vbdev->name, base->name);
1353 			vbdev_ocf_delete(vbdev, NULL, NULL);
1354 		}
1355 	}
1356 }
1357 
1358 static void
1359 base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1360 		   void *event_ctx)
1361 {
1362 	switch (type) {
1363 	case SPDK_BDEV_EVENT_REMOVE:
1364 		if (event_ctx) {
1365 			hotremove_cb(event_ctx);
1366 		}
1367 		break;
1368 	default:
1369 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1370 		break;
1371 	}
1372 }
1373 
/* Open base SPDK bdev and claim it
 *
 * Expects base->bdev and base->name to be set by the caller. Acquires, in
 * order: descriptor, module claim, management IO channel — and unwinds in
 * reverse order on failure. Returns 0, -EALREADY if already attached, or a
 * negative errno. */
static int
attach_base(struct vbdev_ocf_base *base)
{
	int status;

	if (base->attached) {
		return -EALREADY;
	}

	/* If base cache bdev was already opened by other vbdev,
	 * we just copy its descriptor here */
	if (base->is_cache) {
		struct vbdev_ocf_base *existing = get_other_cache_base(base);
		if (existing) {
			base->desc = existing->desc;
			base->management_channel = existing->management_channel;
			base->attached = true;
			return 0;
		}
	}

	/* Open writable; base_bdev_event_cb handles hot-remove of this base */
	status = spdk_bdev_open_ext(base->name, true, base_bdev_event_cb, base, &base->desc);
	if (status) {
		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
		return status;
	}

	/* Claim the bdev for the OCF module so no other module can use it */
	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
					     &ocf_if);
	if (status) {
		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
		spdk_bdev_close(base->desc);
		return status;
	}

	base->management_channel = spdk_bdev_get_io_channel(base->desc);
	if (!base->management_channel) {
		SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
		spdk_bdev_module_release_bdev(base->bdev);
		spdk_bdev_close(base->desc);
		return -ENOMEM;
	}

	/* Save the thread where the base device is opened */
	base->thread = spdk_get_thread();

	base->attached = true;
	return status;
}
1424 
1425 /* Attach base bdevs */
1426 static int
1427 attach_base_bdevs(struct vbdev_ocf *vbdev,
1428 		  struct spdk_bdev *cache_bdev,
1429 		  struct spdk_bdev *core_bdev)
1430 {
1431 	int rc = 0;
1432 
1433 	if (cache_bdev) {
1434 		vbdev->cache.bdev = cache_bdev;
1435 		rc |= attach_base(&vbdev->cache);
1436 	}
1437 
1438 	if (core_bdev) {
1439 		vbdev->core.bdev = core_bdev;
1440 		rc |= attach_base(&vbdev->core);
1441 	}
1442 
1443 	return rc;
1444 }
1445 
/* Init and then start vbdev if all base devices are present
 *
 * Creates the vbdev configuration and attaches whichever base bdevs already
 * exist. If both are present, registration starts asynchronously and cb is
 * invoked on completion; otherwise cb is invoked immediately with 0 and the
 * vbdev waits for the missing device to appear via examine callbacks. */
void
vbdev_ocf_construct(const char *vbdev_name,
		    const char *cache_mode_name,
		    const uint64_t cache_line_size,
		    const char *cache_name,
		    const char *core_name,
		    bool loadq,
		    void (*cb)(int, struct vbdev_ocf *, void *),
		    void *cb_arg)
{
	int rc;
	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
	struct vbdev_ocf *vbdev;

	/* Allocates the vbdev and appends it to the global list on success */
	rc = init_vbdev(vbdev_name, cache_mode_name, cache_line_size, cache_name, core_name, loadq);
	if (rc) {
		cb(rc, NULL, cb_arg);
		return;
	}

	vbdev = vbdev_ocf_get_by_name(vbdev_name);
	if (vbdev == NULL) {
		cb(-ENODEV, NULL, cb_arg);
		return;
	}

	if (cache_bdev == NULL) {
		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
			       vbdev->name, cache_name);
	}
	if (core_bdev == NULL) {
		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
			       vbdev->name, core_name);
	}

	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
	if (rc) {
		cb(rc, vbdev, cb_arg);
		return;
	}

	if (core_bdev && cache_bdev) {
		register_vbdev(vbdev, cb, cb_arg);
	} else {
		cb(0, vbdev, cb_arg);
	}
}
1495 
1496 /* Set new cache mode on OCF cache */
1497 void
1498 vbdev_ocf_set_cache_mode(struct vbdev_ocf *vbdev,
1499 			 const char *cache_mode_name,
1500 			 void (*cb)(int, struct vbdev_ocf *, void *),
1501 			 void *cb_arg)
1502 {
1503 	ocf_cache_t cache;
1504 	ocf_cache_mode_t cache_mode;
1505 	int rc;
1506 
1507 	cache = vbdev->ocf_cache;
1508 	cache_mode = ocf_get_cache_mode(cache_mode_name);
1509 
1510 	rc = ocf_mngt_cache_trylock(cache);
1511 	if (rc) {
1512 		cb(rc, vbdev, cb_arg);
1513 		return;
1514 	}
1515 
1516 	rc = ocf_mngt_cache_set_mode(cache, cache_mode);
1517 	ocf_mngt_cache_unlock(cache);
1518 	cb(rc, vbdev, cb_arg);
1519 }
1520 
1521 /* Set sequential cutoff parameters on OCF cache */
1522 void
1523 vbdev_ocf_set_seqcutoff(struct vbdev_ocf *vbdev, const char *policy_name, uint32_t threshold,
1524 			uint32_t promotion_count, void (*cb)(int, void *), void *cb_arg)
1525 {
1526 	ocf_cache_t cache;
1527 	ocf_seq_cutoff_policy policy;
1528 	int rc;
1529 
1530 	cache = vbdev->ocf_cache;
1531 
1532 	policy = ocf_get_seqcutoff_policy(policy_name);
1533 	if (policy == ocf_seq_cutoff_policy_max) {
1534 		cb(OCF_ERR_INVAL, cb_arg);
1535 		return;
1536 	}
1537 
1538 	rc = ocf_mngt_cache_trylock(cache);
1539 	if (rc) {
1540 		cb(rc, cb_arg);
1541 		return;
1542 	}
1543 
1544 	rc = ocf_mngt_core_set_seq_cutoff_policy_all(cache, policy);
1545 	if (rc) {
1546 		goto end;
1547 	}
1548 
1549 	if (threshold) {
1550 		threshold = threshold * KiB;
1551 
1552 		rc = ocf_mngt_core_set_seq_cutoff_threshold_all(cache, threshold);
1553 		if (rc) {
1554 			goto end;
1555 		}
1556 	}
1557 
1558 	if (promotion_count) {
1559 		rc = ocf_mngt_core_set_seq_cutoff_promotion_count_all(cache, promotion_count);
1560 	}
1561 
1562 end:
1563 	ocf_mngt_cache_unlock(cache);
1564 	cb(rc, cb_arg);
1565 }
1566 
1567 /* This called if new device is created in SPDK application
1568  * If that device named as one of base bdevs of OCF vbdev,
1569  * claim and open them */
1570 static void
1571 vbdev_ocf_examine(struct spdk_bdev *bdev)
1572 {
1573 	const char *bdev_name = spdk_bdev_get_name(bdev);
1574 	struct vbdev_ocf *vbdev;
1575 
1576 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1577 		if (vbdev->state.doing_finish) {
1578 			continue;
1579 		}
1580 
1581 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1582 			attach_base_bdevs(vbdev, bdev, NULL);
1583 			continue;
1584 		}
1585 		if (!strcmp(bdev_name, vbdev->core.name)) {
1586 			attach_base_bdevs(vbdev, NULL, bdev);
1587 			break;
1588 		}
1589 	}
1590 	spdk_bdev_module_examine_done(&ocf_if);
1591 }
1592 
/* Context for asynchronous OCF metadata probing of a single bdev */
struct metadata_probe_ctx {
	struct vbdev_ocf_base base;	/* temporary base used for probe IO */
	ocf_volume_t volume;		/* temporary OCF volume wrapping 'base' */

	struct ocf_volume_uuid *core_uuids;	/* core UUIDs read from cache metadata */
	unsigned int uuid_count;		/* number of entries in core_uuids */

	int result;	/* first error encountered; reported on last put */
	int refcnt;	/* outstanding async operations holding this context */
};
1603 
/* Message handler: close a bdev descriptor on the thread that opened it */
static void
_examine_ctx_put(void *ctx)
{
	spdk_bdev_close((struct spdk_bdev_desc *)ctx);
}
1611 
/* Drop one reference on the probe context; when the last reference is
 * released, free all probe resources and signal examine completion. */
static void
examine_ctx_put(struct metadata_probe_ctx *ctx)
{
	unsigned int i;

	ctx->refcnt--;
	if (ctx->refcnt > 0) {
		return;
	}

	if (ctx->result) {
		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
	}

	if (ctx->base.desc) {
		/* Close the underlying bdev on its same opened thread. */
		if (ctx->base.thread && ctx->base.thread != spdk_get_thread()) {
			spdk_thread_send_msg(ctx->base.thread, _examine_ctx_put, ctx->base.desc);
		} else {
			spdk_bdev_close(ctx->base.desc);
		}
	}

	if (ctx->volume) {
		ocf_volume_destroy(ctx->volume);
	}

	/* core_uuids may be partially populated (alloc failure mid-loop);
	 * free(NULL) is a safe no-op for the unfilled entries */
	if (ctx->core_uuids) {
		for (i = 0; i < ctx->uuid_count; i++) {
			free(ctx->core_uuids[i].data);
		}
	}
	free(ctx->core_uuids);

	examine_done(ctx->result, NULL, ctx->base.bdev);
	free(ctx);
}
1650 
/* Completion of a vbdev_ocf_construct() issued by the metadata probe path;
 * releases the reference taken before the construct call */
static void
metadata_probe_construct_cb(int rc, struct vbdev_ocf *vbdev, void *vctx)
{
	examine_ctx_put((struct metadata_probe_ctx *)vctx);
}
1658 
/* This is second callback for ocf_metadata_probe_cores()
 * Here we create vbdev configurations based on UUIDs */
static void
metadata_probe_cores_construct(void *priv, int error, unsigned int num_cores)
{
	struct metadata_probe_ctx *ctx = priv;
	const char *vbdev_name;
	const char *core_name;
	const char *cache_name;
	unsigned int i;

	if (error) {
		ctx->result = error;
		examine_ctx_put(ctx);
		return;
	}

	for (i = 0; i < num_cores; i++) {
		/* Each core UUID holds three consecutive NUL-terminated strings:
		 * core name, vbdev name, cache name — the layout written by
		 * init_vbdev_config() */
		core_name = ocf_uuid_to_str(&ctx->core_uuids[i]);
		vbdev_name = core_name + strlen(core_name) + 1;
		cache_name = vbdev_name + strlen(vbdev_name) + 1;

		if (strcmp(ctx->base.bdev->name, cache_name)) {
			SPDK_NOTICELOG("OCF metadata found on %s belongs to bdev named '%s'\n",
				       ctx->base.bdev->name, cache_name);
		}

		/* Each construct holds its own reference, released in
		 * metadata_probe_construct_cb */
		ctx->refcnt++;
		vbdev_ocf_construct(vbdev_name, NULL, 0, cache_name, core_name, true,
				    metadata_probe_construct_cb, ctx);
	}

	examine_ctx_put(ctx);
}
1693 
/* This callback is called after OCF reads cores UUIDs from cache metadata
 * Here we allocate memory for those UUIDs and call ocf_metadata_probe_cores() again */
static void
metadata_probe_cores_get_num(void *priv, int error, unsigned int num_cores)
{
	struct metadata_probe_ctx *ctx = priv;
	unsigned int i;

	if (error) {
		ctx->result = error;
		examine_ctx_put(ctx);
		return;
	}

	/* uuid_count is set before the allocations so that examine_ctx_put()
	 * can free a partially-filled array on failure */
	ctx->uuid_count = num_cores;
	ctx->core_uuids = calloc(num_cores, sizeof(struct ocf_volume_uuid));
	if (!ctx->core_uuids) {
		ctx->result = -ENOMEM;
		examine_ctx_put(ctx);
		return;
	}

	for (i = 0; i < ctx->uuid_count; i++) {
		ctx->core_uuids[i].size = OCF_VOLUME_UUID_MAX_SIZE;
		ctx->core_uuids[i].data = malloc(OCF_VOLUME_UUID_MAX_SIZE);
		if (!ctx->core_uuids[i].data) {
			ctx->result = -ENOMEM;
			examine_ctx_put(ctx);
			return;
		}
	}

	/* Second pass: with buffers supplied, OCF fills in the actual UUIDs */
	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, ctx->core_uuids, ctx->uuid_count,
				 metadata_probe_cores_construct, ctx);
}
1729 
1730 static void
1731 metadata_probe_cb(void *priv, int rc,
1732 		  struct ocf_metadata_probe_status *status)
1733 {
1734 	struct metadata_probe_ctx *ctx = priv;
1735 
1736 	if (rc) {
1737 		/* -ENODATA means device does not have cache metadata on it */
1738 		if (rc != -OCF_ERR_NO_METADATA) {
1739 			ctx->result = rc;
1740 		}
1741 		examine_ctx_put(ctx);
1742 		return;
1743 	}
1744 
1745 	ocf_metadata_probe_cores(vbdev_ocf_ctx, ctx->volume, NULL, 0,
1746 				 metadata_probe_cores_get_num, ctx);
1747 }
1748 
/* This is called after vbdev_ocf_examine
 * It allows to delay application initialization
 * until all OCF bdevs get registered
 * If vbdev has all of its base devices it starts asynchronously here
 * We first check if bdev appears in configuration,
 * if not we do metadata_probe() to create its configuration from bdev metadata */
static void
vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
{
	const char *bdev_name = spdk_bdev_get_name(bdev);
	struct vbdev_ocf *vbdev;
	struct metadata_probe_ctx *ctx;
	bool created_from_config = false;
	int rc;

	/* Base examine reference; every examine_start() below must be balanced
	 * by an examine_done() (register_vbdev completes with examine_done) */
	examine_start(bdev);

	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
		if (vbdev->state.doing_finish || vbdev->state.started) {
			continue;
		}

		if (!strcmp(bdev_name, vbdev->cache.name)) {
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			created_from_config = true;
			continue;
		}
		if (!strcmp(bdev_name, vbdev->core.name)) {
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			/* A core bdev belongs to one vbdev only — release the base
			 * reference and stop */
			examine_done(0, NULL, bdev);
			return;
		}
	}

	/* If device is discovered during config we do not check for metadata */
	if (created_from_config) {
		examine_done(0, NULL, bdev);
		return;
	}

	/* Metadata probe path
	 * We create temporary OCF volume and a temporary base structure
	 * to use them for ocf_metadata_probe() and for bottom adapter IOs
	 * Then we get UUIDs of core devices and create configurations based on them */
	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		examine_done(-ENOMEM, NULL, bdev);
		return;
	}

	ctx->base.bdev = bdev;
	ctx->refcnt = 1;

	rc = spdk_bdev_open_ext(bdev_name, true, base_bdev_event_cb, NULL, &ctx->base.desc);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_volume_open(ctx->volume, &ctx->base);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	/* Save the thread where the base device is opened */
	ctx->base.thread = spdk_get_thread();

	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
}
1830 
/* Per-IO context size this module asks the bdev layer to reserve */
static int
vbdev_ocf_get_ctx_size(void)
{
	return sizeof(struct bdev_ocf_data);
}
1836 
/* Record that the application shutdown sequence has begun
 * (g_fini_started is consulted elsewhere in this file) */
static void
fini_start(void)
{
	g_fini_started = true;
}
1842 
/* Module-global function table
 * Does not relate to vbdev instances */
static struct spdk_bdev_module ocf_if = {
	.name = "ocf",
	.module_init = vbdev_ocf_init,		/* OCF ctx + volume type setup */
	.fini_start = fini_start,		/* marks shutdown as started */
	.module_fini = vbdev_ocf_module_fini,	/* frees all vbdev structures */
	.get_ctx_size = vbdev_ocf_get_ctx_size,	/* per-IO context size */
	.examine_config = vbdev_ocf_examine,	/* attach base bdevs named in config */
	.examine_disk   = vbdev_ocf_examine_disk,	/* start vbdevs / probe metadata */
};
SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);
1855