xref: /spdk/module/bdev/ocf/vbdev_ocf.c (revision 45a053c5777494f4e8ce4bc1191c9de3920377f7)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <ocf/ocf.h>
7 #include <ocf/ocf_types.h>
8 #include <ocf/ocf_mngt.h>
9 
10 #include "ctx.h"
11 #include "data.h"
12 #include "volume.h"
13 #include "utils.h"
14 #include "vbdev_ocf.h"
15 
16 #include "spdk/bdev_module.h"
17 #include "spdk/thread.h"
18 #include "spdk/string.h"
19 #include "spdk/log.h"
20 #include "spdk/cpuset.h"
21 
22 /* This namespace UUID was generated using uuid_generate() method. */
23 #define BDEV_OCF_NAMESPACE_UUID "f92b7f49-f6c0-44c8-bd23-3205e8c3b6ad"
24 
25 static struct spdk_bdev_module ocf_if;
26 
27 static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
28 	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);
29 
30 static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
31 	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);
32 
33 bool g_fini_started = false;
34 
/* Structure for keeping list of bdevs that are claimed but not used yet */
struct examining_bdev {
	struct spdk_bdev           *bdev;	/* base bdev currently being examined */
	TAILQ_ENTRY(examining_bdev) tailq;	/* link on g_ocf_examining_bdevs_head */
};
40 
41 /* Add bdev to list of claimed */
42 static void
43 examine_start(struct spdk_bdev *bdev)
44 {
45 	struct examining_bdev *entry = malloc(sizeof(*entry));
46 
47 	assert(entry);
48 	entry->bdev = bdev;
49 	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
50 }
51 
/* Find bdev on list of claimed bdevs, then remove it,
 * if it was the last one on list then report examine done.
 * 'status' and 'vbdev' are unused; signature matches vbdev_ocf_mngt_callback. */
static void
examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
{
	struct spdk_bdev *bdev = cb_arg;
	struct examining_bdev *entry, *safe, *found = NULL;

	/* The same bdev may appear on the list more than once (claimed by
	 * several OCF vbdevs), so scan the whole list: remember the first
	 * matching entry and, if a second one exists, skip the examine-done
	 * notification because this was not the last outstanding claim. */
	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
		if (entry->bdev == bdev) {
			if (found) {
				/* Another entry remains - remove only one */
				goto remove;
			} else {
				found = entry;
			}
		}
	}

	/* This was the only entry for the bdev - examine is fully done */
	assert(found);
	spdk_bdev_module_examine_done(&ocf_if);

remove:
	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
	free(found);
}
77 
78 /* Free allocated strings and structure itself
79  * Used at shutdown only */
80 static void
81 free_vbdev(struct vbdev_ocf *vbdev)
82 {
83 	if (!vbdev) {
84 		return;
85 	}
86 
87 	free(vbdev->name);
88 	free(vbdev->cache.name);
89 	free(vbdev->core.name);
90 	free(vbdev);
91 }
92 
93 /* Get existing cache base
94  * that is attached to other vbdev */
95 static struct vbdev_ocf_base *
96 get_other_cache_base(struct vbdev_ocf_base *base)
97 {
98 	struct vbdev_ocf *vbdev;
99 
100 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
101 		if (&vbdev->cache == base || !vbdev->cache.attached) {
102 			continue;
103 		}
104 		if (!strcmp(vbdev->cache.name, base->name)) {
105 			return &vbdev->cache;
106 		}
107 	}
108 
109 	return NULL;
110 }
111 
112 static bool
113 is_ocf_cache_running(struct vbdev_ocf *vbdev)
114 {
115 	if (vbdev->cache.attached && vbdev->ocf_cache) {
116 		return ocf_cache_is_running(vbdev->ocf_cache);
117 	}
118 	return false;
119 }
120 
121 static bool
122 is_ocf_cache_initializing(struct vbdev_ocf *vbdev)
123 {
124 	if (vbdev->cache.attached && vbdev->ocf_cache) {
125 		return ocf_cache_is_initializing(vbdev->ocf_cache);
126 	}
127 	return false;
128 }
129 
130 /* Get existing OCF cache instance
131  * that is started by other vbdev */
132 static ocf_cache_t
133 get_other_cache_instance(struct vbdev_ocf *vbdev)
134 {
135 	struct vbdev_ocf *cmp;
136 
137 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
138 		if (cmp->state.doing_finish || cmp == vbdev) {
139 			continue;
140 		}
141 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
142 			continue;
143 		}
144 		if (is_ocf_cache_running(cmp) || is_ocf_cache_initializing(cmp)) {
145 			return cmp->ocf_cache;
146 		}
147 	}
148 
149 	return NULL;
150 }
151 
/* Thread-message trampoline: close a bdev descriptor on its opening thread.
 * ctx is the struct spdk_bdev_desc to close. */
static void
_remove_base_bdev(void *ctx)
{
	spdk_bdev_close((struct spdk_bdev_desc *)ctx);
}
159 
/* Close and unclaim base bdev: drop the management channel, release the
 * module claim and close the descriptor. No-op when already detached. */
static void
remove_base_bdev(struct vbdev_ocf_base *base)
{
	if (base->attached) {
		if (base->management_channel) {
			spdk_put_io_channel(base->management_channel);
		}

		spdk_bdev_module_release_bdev(base->bdev);
		/* Close the underlying bdev on its same opened thread. */
		if (base->thread && base->thread != spdk_get_thread()) {
			spdk_thread_send_msg(base->thread, _remove_base_bdev, base->desc);
		} else {
			spdk_bdev_close(base->desc);
		}
		/* Mark detached immediately; the close itself may be async */
		base->attached = false;
	}
}
179 
/* Finish unregister operation: tell the bdev layer destruct is complete
 * (with the status recorded by detach_cache()) and drop the references
 * taken on the OCF cache and its context during registration. */
static void
unregister_finish(struct vbdev_ocf *vbdev)
{
	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);

	if (vbdev->ocf_cache) {
		ocf_mngt_cache_put(vbdev->ocf_cache);
	}

	if (vbdev->cache_ctx) {
		vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
	}
	/* Advance to the terminating NULL of the unregister path */
	vbdev_ocf_mngt_continue(vbdev, 0);
}
195 
196 static void
197 close_core_bdev(struct vbdev_ocf *vbdev)
198 {
199 	remove_base_bdev(&vbdev->core);
200 	vbdev_ocf_mngt_continue(vbdev, 0);
201 }
202 
203 static void
204 remove_core_cmpl(void *priv, int error)
205 {
206 	struct vbdev_ocf *vbdev = priv;
207 
208 	ocf_mngt_cache_unlock(vbdev->ocf_cache);
209 	vbdev_ocf_mngt_continue(vbdev, error);
210 }
211 
212 /* Try to lock cache, then remove core */
213 static void
214 remove_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
215 {
216 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
217 
218 	if (error) {
219 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
220 			    error, vbdev->name);
221 		vbdev_ocf_mngt_continue(vbdev, error);
222 		return;
223 	}
224 
225 	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
226 }
227 
228 /* Detach core base */
229 static void
230 detach_core(struct vbdev_ocf *vbdev)
231 {
232 	if (is_ocf_cache_running(vbdev)) {
233 		ocf_mngt_cache_lock(vbdev->ocf_cache, remove_core_cache_lock_cmpl, vbdev);
234 	} else {
235 		vbdev_ocf_mngt_continue(vbdev, 0);
236 	}
237 }
238 
239 static void
240 close_cache_bdev(struct vbdev_ocf *vbdev)
241 {
242 	remove_base_bdev(&vbdev->cache);
243 	vbdev_ocf_mngt_continue(vbdev, 0);
244 }
245 
/* Detach cache base */
static void
detach_cache(struct vbdev_ocf *vbdev)
{
	/* Snapshot the pipeline status here so unregister_finish() can pass
	 * it to spdk_bdev_destruct_done() at the end of the path. */
	vbdev->state.stop_status = vbdev->mngt_ctx.status;

	/* If some other vbdev references this cache bdev,
	 * we detach this only by changing the flag, without actual close
	 * (the shared bdev stays open; close_cache_bdev() becomes a no-op
	 * because remove_base_bdev() checks 'attached') */
	if (get_other_cache_base(&vbdev->cache)) {
		vbdev->cache.attached = false;
	}

	vbdev_ocf_mngt_continue(vbdev, 0);
}
260 
/* Completion of ocf_mngt_cache_stop() started by stop_vbdev(): drop the
 * reference on the shared management queue, release the cache lock and
 * advance the pipeline with OCF's status. */
static void
stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
	ocf_mngt_cache_unlock(cache);

	vbdev_ocf_mngt_continue(vbdev, error);
}
271 
272 /* Try to lock cache, then stop it */
273 static void
274 stop_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
275 {
276 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
277 
278 	if (error) {
279 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
280 			    error, vbdev->name);
281 		vbdev_ocf_mngt_continue(vbdev, error);
282 		return;
283 	}
284 
285 	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
286 }
287 
/* Stop OCF cache object
 * vbdev_ocf is not operational after this */
static void
stop_vbdev(struct vbdev_ocf *vbdev)
{
	/* Cache never got running - nothing to stop */
	if (!is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	/* During a regular unregister, keep a shared cache instance alive for
	 * the other vbdevs still using it; at module finish (g_fini_started)
	 * every vbdev stops the cache unconditionally. */
	if (!g_fini_started && get_other_cache_instance(vbdev)) {
		SPDK_NOTICELOG("Not stopping cache instance '%s'"
			       " because it is referenced by other OCF bdev\n",
			       vbdev->cache.name);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	ocf_mngt_cache_lock(vbdev->ocf_cache, stop_vbdev_cache_lock_cmpl, vbdev);
}
308 
309 static void
310 flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
311 {
312 	struct vbdev_ocf *vbdev = priv;
313 
314 	ocf_mngt_cache_unlock(cache);
315 	vbdev_ocf_mngt_continue(vbdev, error);
316 }
317 
318 static void
319 flush_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
320 {
321 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
322 
323 	if (error) {
324 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
325 			    error, vbdev->name);
326 		vbdev_ocf_mngt_continue(vbdev, error);
327 		return;
328 	}
329 
330 	ocf_mngt_cache_flush(vbdev->ocf_cache, flush_vbdev_cmpl, vbdev);
331 }
332 
333 static void
334 flush_vbdev(struct vbdev_ocf *vbdev)
335 {
336 	if (!is_ocf_cache_running(vbdev)) {
337 		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
338 		return;
339 	}
340 
341 	ocf_mngt_cache_lock(vbdev->ocf_cache, flush_vbdev_cache_lock_cmpl, vbdev);
342 }
343 
/* Procedures called during dirty unregister (shutdown / hot remove);
 * cache is stopped before core removal so the instance can be recovered */
vbdev_ocf_mngt_fn unregister_path_dirty[] = {
	flush_vbdev,		/* flush dirty data */
	stop_vbdev,		/* stop the OCF cache instance (unless shared) */
	detach_cache,		/* record stop status, flag-only detach if shared */
	close_cache_bdev,	/* close and unclaim the cache base bdev */
	detach_core,		/* cache already stopped - effectively a no-op */
	close_core_bdev,	/* close and unclaim the core base bdev */
	unregister_finish,	/* report destruct done, drop references */
	NULL			/* terminator */
};
355 
/* Procedures called during clean unregister; core is removed from the
 * running cache first, which deletes the instance permanently */
vbdev_ocf_mngt_fn unregister_path_clean[] = {
	flush_vbdev,		/* flush dirty data */
	detach_core,		/* remove the core from the running cache */
	close_core_bdev,	/* close and unclaim the core base bdev */
	stop_vbdev,		/* stop the OCF cache instance (unless shared) */
	detach_cache,		/* record stop status, flag-only detach if shared */
	close_cache_bdev,	/* close and unclaim the cache base bdev */
	unregister_finish,	/* report destruct done, drop references */
	NULL			/* terminator */
};
367 
368 /* Start asynchronous management operation using unregister_path */
369 static void
370 unregister_cb(void *opaque)
371 {
372 	struct vbdev_ocf *vbdev = opaque;
373 	vbdev_ocf_mngt_fn *unregister_path;
374 	int rc;
375 
376 	unregister_path = vbdev->state.doing_clean_delete ?
377 			  unregister_path_clean : unregister_path_dirty;
378 
379 	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
380 	if (rc) {
381 		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
382 		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
383 	}
384 }
385 
/* Clean remove case - remove core and then cache, this order
 * will remove instance permanently */
static void
_vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
{
	/* NOTE(review): these helpers are normally pipeline steps and each
	 * calls vbdev_ocf_mngt_continue(); here they are invoked directly on
	 * a vbdev that never fully started - confirm the pipeline context is
	 * inert in that state. */
	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}

	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}
}
401 
/* Dirty shutdown/hot remove case - remove cache and then core, this order
 * will allow us to recover this instance in the future */
static void
_vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
{
	/* NOTE(review): same caveat as _vbdev_ocf_destruct_clean() - pipeline
	 * step helpers invoked directly outside a running pipeline. */
	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}

	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}
}
417 
/* Unregister io device with callback to unregister_cb
 * This function is called during spdk_bdev_unregister.
 * Returns 0 on synchronous completion, 1 when teardown continues
 * asynchronously, negative errno when destruct cannot proceed. */
static int
vbdev_ocf_destruct(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;

	if (vbdev->state.doing_finish) {
		return -EALREADY;
	}

	if (vbdev->state.starting && !vbdev->state.started) {
		/* Prevent before detach cache/core during register path of
		  this bdev */
		return -EBUSY;
	}

	vbdev->state.doing_finish = true;

	if (vbdev->state.started) {
		/* unregister_cb runs once all io channels are released */
		spdk_io_device_unregister(vbdev, unregister_cb);
		/* Return 1 because unregister is delayed */
		return 1;
	}

	/* Not started: tear down synchronously, order depends on delete mode */
	if (vbdev->state.doing_clean_delete) {
		_vbdev_ocf_destruct_clean(vbdev);
	} else {
		_vbdev_ocf_destruct_dirty(vbdev);
	}

	return 0;
}
451 
452 /* Stop OCF cache and unregister SPDK bdev */
453 int
454 vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
455 {
456 	int rc = 0;
457 
458 	if (vbdev->state.started) {
459 		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
460 	} else {
461 		rc = vbdev_ocf_destruct(vbdev);
462 		if (rc == 0 && cb) {
463 			cb(cb_arg, 0);
464 		}
465 	}
466 
467 	return rc;
468 }
469 
470 /* Remove cores permanently and then stop OCF cache and unregister SPDK bdev */
471 int
472 vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
473 		       void *cb_arg)
474 {
475 	vbdev->state.doing_clean_delete = true;
476 
477 	return vbdev_ocf_delete(vbdev, cb, cb_arg);
478 }
479 
480 
481 /* If vbdev is online, return its object */
482 struct vbdev_ocf *
483 vbdev_ocf_get_by_name(const char *name)
484 {
485 	struct vbdev_ocf *vbdev;
486 
487 	if (name == NULL) {
488 		assert(false);
489 		return NULL;
490 	}
491 
492 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
493 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
494 			continue;
495 		}
496 		if (strcmp(vbdev->name, name) == 0) {
497 			return vbdev;
498 		}
499 	}
500 	return NULL;
501 }
502 
503 /* Return matching base if parent vbdev is online */
504 struct vbdev_ocf_base *
505 vbdev_ocf_get_base_by_name(const char *name)
506 {
507 	struct vbdev_ocf *vbdev;
508 
509 	if (name == NULL) {
510 		assert(false);
511 		return NULL;
512 	}
513 
514 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
515 		if (vbdev->state.doing_finish) {
516 			continue;
517 		}
518 
519 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
520 			return &vbdev->cache;
521 		}
522 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
523 			return &vbdev->core;
524 		}
525 	}
526 	return NULL;
527 }
528 
529 /* Execute fn for each OCF device that is online or waits for base devices */
530 void
531 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
532 {
533 	struct vbdev_ocf *vbdev;
534 
535 	assert(fn != NULL);
536 
537 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
538 		if (!vbdev->state.doing_finish) {
539 			fn(vbdev, ctx);
540 		}
541 	}
542 }
543 
544 /* Called from OCF when SPDK_IO is completed */
545 static void
546 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
547 {
548 	struct spdk_bdev_io *bdev_io = io->priv1;
549 
550 	if (error == 0) {
551 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
552 	} else if (error == -OCF_ERR_NO_MEM) {
553 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
554 	} else {
555 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
556 	}
557 
558 	ocf_io_put(io);
559 }
560 
561 /* Configure io parameters and send it to OCF */
562 static int
563 io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
564 {
565 	switch (bdev_io->type) {
566 	case SPDK_BDEV_IO_TYPE_WRITE:
567 	case SPDK_BDEV_IO_TYPE_READ:
568 		ocf_core_submit_io(io);
569 		return 0;
570 	case SPDK_BDEV_IO_TYPE_FLUSH:
571 		ocf_core_submit_flush(io);
572 		return 0;
573 	case SPDK_BDEV_IO_TYPE_UNMAP:
574 		ocf_core_submit_discard(io);
575 		return 0;
576 	case SPDK_BDEV_IO_TYPE_RESET:
577 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
578 	default:
579 		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
580 		return -EINVAL;
581 	}
582 }
583 
/* Submit SPDK-IO to OCF: build an OCF io for this request, attach the data
 * buffers and completion callback, then dispatch via io_submit_to_ocf().
 * On any failure the bdev IO is completed with NOMEM (retryable) or FAILED. */
static void
io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
	struct ocf_io *io = NULL;
	struct bdev_ocf_data *data = NULL;
	struct vbdev_ocf_qctx *qctx = spdk_io_channel_get_ctx(ch);
	/* Convert block-based request geometry to byte offsets for OCF */
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
	int dir, flags = 0;
	int err;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		dir = OCF_READ;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		dir = OCF_WRITE;
		break;
	default:
		err = -EINVAL;
		goto fail;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		flags = OCF_WRITE_FLUSH;
	}

	io = ocf_volume_new_io(ocf_core_get_front_volume(vbdev->ocf_core), qctx->queue, offset, len, dir, 0,
			       flags);
	if (!io) {
		err = -ENOMEM;
		goto fail;
	}

	/* Wrap the SPDK iovecs so OCF can access the request payload */
	data = vbdev_ocf_data_from_spdk_io(bdev_io);
	if (!data) {
		err = -ENOMEM;
		goto fail;
	}

	err = ocf_io_set_data(io, data, 0);
	if (err) {
		goto fail;
	}

	/* bdev_io travels as priv1 and is completed in vbdev_ocf_io_submit_cb */
	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);

	err = io_submit_to_ocf(bdev_io, io);
	if (err) {
		goto fail;
	}

	return;

fail:
	if (io) {
		ocf_io_put(io);
	}

	/* NOMEM status makes the bdev layer queue and retry this IO */
	if (err == -ENOMEM) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
	} else {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
657 
658 static void
659 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
660 		     bool success)
661 {
662 	if (!success) {
663 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
664 		return;
665 	}
666 
667 	io_handle(ch, bdev_io);
668 }
669 
670 /* Called from bdev layer when an io to Cache vbdev is submitted */
671 static void
672 vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
673 {
674 	switch (bdev_io->type) {
675 	case SPDK_BDEV_IO_TYPE_READ:
676 		/* User does not have to allocate io vectors for the request,
677 		 * so in case they are not allocated, we allocate them here */
678 		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
679 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
680 		break;
681 	case SPDK_BDEV_IO_TYPE_WRITE:
682 	case SPDK_BDEV_IO_TYPE_FLUSH:
683 	case SPDK_BDEV_IO_TYPE_UNMAP:
684 		io_handle(ch, bdev_io);
685 		break;
686 	case SPDK_BDEV_IO_TYPE_RESET:
687 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
688 	default:
689 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
690 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
691 		break;
692 	}
693 }
694 
695 /* Called from bdev layer */
696 static bool
697 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
698 {
699 	struct vbdev_ocf *vbdev = opaque;
700 
701 	switch (io_type) {
702 	case SPDK_BDEV_IO_TYPE_READ:
703 	case SPDK_BDEV_IO_TYPE_WRITE:
704 	case SPDK_BDEV_IO_TYPE_FLUSH:
705 	case SPDK_BDEV_IO_TYPE_UNMAP:
706 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
707 	case SPDK_BDEV_IO_TYPE_RESET:
708 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
709 	default:
710 		return false;
711 	}
712 }
713 
/* Called from bdev layer: hand out the per-thread channel for this io
 * device (context is a vbdev_ocf_qctx built by io_device_create_cb()). */
static struct spdk_io_channel *
vbdev_ocf_get_io_channel(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;

	return spdk_get_io_channel(vbdev);
}
722 
/* Dump runtime info for 'bdev_get_bdevs' style queries: base device names,
 * current cache mode/line size and the metadata_volatile setting.
 * Field order is part of the emitted JSON - do not reorder. */
static int
vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = opaque;

	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_device", vbdev->core.name);

	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_get_cache_line_size(vbdev->ocf_cache));
	spdk_json_write_named_bool(w, "metadata_volatile",
				   vbdev->cfg.cache.metadata_volatile);

	return 0;
}
740 
/* Emit the RPC call ('bdev_ocf_create' + params) that would recreate this
 * vbdev, for 'save_config'. Field order is part of the emitted JSON. */
static void
vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_ocf_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", vbdev->name);
	/* Mode and line size are read back from the live cache instance */
	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_get_cache_line_size(vbdev->ocf_cache));
	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
762 
/* Cache vbdev function table
 * Used by bdev layer */
static struct spdk_bdev_fn_table cache_dev_fn_table = {
	.destruct = vbdev_ocf_destruct,			/* async teardown via mngt pipeline */
	.io_type_supported = vbdev_ocf_io_type_supported,
	.submit_request	= vbdev_ocf_submit_request,
	.get_io_channel	= vbdev_ocf_get_io_channel,
	.write_config_json = vbdev_ocf_write_json_config,	/* save_config RPC */
	.dump_info_json = vbdev_ocf_dump_info_json,		/* get_bdevs info */
};
773 
774 /* Poller function for the OCF queue
775  * We execute OCF requests here synchronously */
776 static int
777 queue_poll(void *opaque)
778 {
779 	struct vbdev_ocf_qctx *qctx = opaque;
780 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
781 	int i, max = spdk_min(32, iono);
782 
783 	for (i = 0; i < max; i++) {
784 		ocf_queue_run_single(qctx->queue);
785 	}
786 
787 	if (iono > 0) {
788 		return SPDK_POLLER_BUSY;
789 	} else {
790 		return SPDK_POLLER_IDLE;
791 	}
792 }
793 
/* Called during ocf_submit_io, ocf_purge*
 * and any other requests that need to submit io */
static void
vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
{
	/* Intentionally empty: queued requests are drained by the per-thread
	 * queue_poll() poller, so no explicit wakeup is required. */
}
800 
/* OCF queue deinitialization
 * Called at ocf_cache_stop */
static void
vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
{
	struct vbdev_ocf_qctx *qctx = ocf_queue_get_priv(q);

	if (qctx) {
		spdk_put_io_channel(qctx->cache_ch);
		spdk_put_io_channel(qctx->core_ch);
		spdk_poller_unregister(&qctx->poller);
		/* 'allocated' is set only on the heap copy made by
		 * io_device_destroy_cb(); the original qctx is io-channel
		 * context memory owned by SPDK and must not be freed here. */
		if (qctx->allocated) {
			free(qctx);
		}
	}
}
817 
/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed */
const struct ocf_queue_ops queue_ops = {
	.kick_sync = vbdev_ocf_ctx_queue_kick,	/* no-op, poller drains the queue */
	.kick = vbdev_ocf_ctx_queue_kick,	/* no-op, poller drains the queue */
	.stop = vbdev_ocf_ctx_queue_stop,	/* release channels/poller/copy */
};
825 
/* Called on cache vbdev creation at every thread
 * We allocate OCF queues here and SPDK poller for it */
static int
io_device_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_ocf *vbdev = io_device;
	struct vbdev_ocf_qctx *qctx = ctx_buf;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
	if (rc) {
		return rc;
	}

	/* Let queue callbacks (stop/kick) find this per-thread context */
	ocf_queue_set_priv(qctx->queue, qctx);

	/* NOTE(review): results of spdk_bdev_get_io_channel() and
	 * SPDK_POLLER_REGISTER() are not checked - confirm failure here is
	 * either impossible or tolerated downstream. */
	qctx->vbdev      = vbdev;
	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
	qctx->poller     = SPDK_POLLER_REGISTER(queue_poll, qctx, 0);

	return rc;
}
849 
/* Called per thread
 * Put OCF queue and relaunch poller with new context to finish pending requests */
static void
io_device_destroy_cb(void *io_device, void *ctx_buf)
{
	/* Making a copy of context to use it after io channel will be destroyed */
	struct vbdev_ocf_qctx *copy = malloc(sizeof(*copy));
	struct vbdev_ocf_qctx *qctx = ctx_buf;

	if (copy) {
		/* Repoint the queue priv at the copy so queue_stop() later
		 * releases the copy's resources and frees it ('allocated'). */
		ocf_queue_set_priv(qctx->queue, copy);
		memcpy(copy, qctx, sizeof(*copy));
		/* Swap pollers: stop the one bound to the dying channel ctx,
		 * start a fresh one on the copy to drain remaining requests */
		spdk_poller_unregister(&qctx->poller);
		copy->poller = SPDK_POLLER_REGISTER(queue_poll, copy, 0);
		copy->allocated = true;
	} else {
		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
			    spdk_strerror(ENOMEM));
	}

	/* Drop this thread's queue reference; last put triggers queue stop */
	vbdev_ocf_queue_put(qctx->queue);
}
872 
873 /* OCF management queue deinitialization */
874 static void
875 vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
876 {
877 	struct spdk_poller *poller = ocf_queue_get_priv(q);
878 
879 	if (poller) {
880 		spdk_poller_unregister(&poller);
881 	}
882 }
883 
884 static int
885 mngt_queue_poll(void *opaque)
886 {
887 	ocf_queue_t q = opaque;
888 	uint32_t iono = ocf_queue_pending_io(q);
889 	int i, max = spdk_min(32, iono);
890 
891 	for (i = 0; i < max; i++) {
892 		ocf_queue_run_single(q);
893 	}
894 
895 	if (iono > 0) {
896 		return SPDK_POLLER_BUSY;
897 	} else {
898 		return SPDK_POLLER_IDLE;
899 	}
900 }
901 
/* Management queue kick - intentionally a no-op, mngt_queue_poll()
 * drains the queue periodically. */
static void
vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
{
}
906 
/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed */
const struct ocf_queue_ops mngt_queue_ops = {
	.kick_sync = NULL,			/* synchronous kick not supported */
	.kick = vbdev_ocf_ctx_mngt_queue_kick,	/* no-op, poller drains the queue */
	.stop = vbdev_ocf_ctx_mngt_queue_stop,	/* unregister the mngt poller */
};
914 
915 static void
916 vbdev_ocf_mngt_exit(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_fn *rollback_path, int rc)
917 {
918 	vbdev->state.starting = false;
919 	vbdev_ocf_mngt_stop(vbdev, rollback_path, rc);
920 }
921 
/* Create exported spdk object - final step of the register pipeline.
 * Copies geometry from the core bdev, derives a stable UUID, registers the
 * io device (per-thread queue contexts) and the bdev itself. */
static void
finish_register(struct vbdev_ocf *vbdev)
{
	struct spdk_uuid ns_uuid;
	int result;

	/* Copy properties of the base bdev */
	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;

	vbdev->exp_bdev.name = vbdev->name;
	vbdev->exp_bdev.product_name = "SPDK OCF";

	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
	vbdev->exp_bdev.ctxt = vbdev;
	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
	vbdev->exp_bdev.module = &ocf_if;

	/* Generate UUID based on namespace UUID + base bdev UUID. */
	spdk_uuid_parse(&ns_uuid, BDEV_OCF_NAMESPACE_UUID);
	result = spdk_uuid_generate_sha1(&vbdev->exp_bdev.uuid, &ns_uuid,
					 (const char *)&vbdev->core.bdev->uuid, sizeof(struct spdk_uuid));
	if (result) {
		SPDK_ERRLOG("Unable to generate new UUID for ocf bdev\n");
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, result);
		return;
	}

	/* Finally register vbdev in SPDK */
	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
				sizeof(struct vbdev_ocf_qctx), vbdev->name);
	result = spdk_bdev_register(&vbdev->exp_bdev);
	if (result) {
		SPDK_ERRLOG("Could not register exposed bdev %s\n",
			    vbdev->name);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, result);
		return;
	} else {
		vbdev->state.started = true;
	}

	vbdev_ocf_mngt_continue(vbdev, result);
}
967 
968 static void
969 add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
970 {
971 	struct vbdev_ocf *vbdev = priv;
972 
973 	ocf_mngt_cache_unlock(cache);
974 
975 	if (error) {
976 		SPDK_ERRLOG("Error %d, failed to add core device to cache instance %s,"
977 			    "starting rollback\n", error, vbdev->name);
978 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
979 		return;
980 	} else {
981 		vbdev->ocf_core = core;
982 	}
983 
984 	vbdev_ocf_mngt_continue(vbdev, error);
985 }
986 
987 /* Try to lock cache, then add core */
988 static void
989 add_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
990 {
991 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
992 
993 	if (error) {
994 		SPDK_ERRLOG("Error %d, can not lock cache instance %s,"
995 			    "starting rollback\n", error, vbdev->name);
996 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
997 	}
998 	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
999 }
1000 
1001 /* Add core for existing OCF cache instance */
1002 static void
1003 add_core(struct vbdev_ocf *vbdev)
1004 {
1005 	ocf_mngt_cache_lock(vbdev->ocf_cache, add_core_cache_lock_cmpl, vbdev);
1006 }
1007 
/* Completion of cache attach/load started by start_cache(): release the
 * cache lock and either advance the register pipeline or roll it back.
 * On -OCF_ERR_NO_MEM a sizing hint is printed for the user. */
static void
start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;
	uint64_t mem_needed;

	ocf_mngt_cache_unlock(cache);

	if (error) {
		SPDK_ERRLOG("Error %d during start cache %s, starting rollback\n",
			    error, vbdev->name);

		if (error == -OCF_ERR_NO_MEM) {
			/* Ask OCF how much RAM this attach would have required */
			ocf_mngt_get_ram_needed(cache, &vbdev->cfg.attach.device, &mem_needed);

			SPDK_NOTICELOG("Try to increase hugepage memory size or cache line size. "
				       "For your configuration:\nDevice size: %"PRIu64" bytes\n"
				       "Cache line size: %"PRIu64" bytes\nFree memory needed to start "
				       "cache: %"PRIu64" bytes\n", vbdev->cache.bdev->blockcnt *
				       vbdev->cache.bdev->blocklen, vbdev->cfg.cache.cache_line_size,
				       mem_needed);
		}

		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
		return;
	}

	vbdev_ocf_mngt_continue(vbdev, error);
}
1037 
/* Create the shared OCF management queue plus its 100us poller and attach
 * both to the cache. Returns 0 on success, negative errno on failure. */
static int
create_management_queue(struct vbdev_ocf *vbdev)
{
	struct spdk_poller *mngt_poller;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		return rc;
	}

	mngt_poller = SPDK_POLLER_REGISTER(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
	if (mngt_poller == NULL) {
		/* NOTE(review): the queue created above is not released on
		 * this path - confirm whether the rollback pipeline covers
		 * it or the reference is leaked. */
		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
		return -ENOMEM;
	}

	/* Poller handle is stored as queue priv so the queue's stop()
	 * callback (vbdev_ocf_ctx_mngt_queue_stop) can unregister it */
	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);

	return 0;
}
1061 
/* Start OCF cache, attach caching device - first step of the register
 * pipeline. Reuses a running instance for the same cache bdev when one
 * exists; otherwise starts a new instance and attaches (or loads) it. */
static void
start_cache(struct vbdev_ocf *vbdev)
{
	ocf_cache_t existing;
	uint32_t cache_block_size = vbdev->cache.bdev->blocklen;
	uint32_t core_block_size = vbdev->core.bdev->blocklen;
	int rc;

	if (is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_stop(vbdev, NULL, -EALREADY);
		return;
	}

	/* A cache with blocks larger than the core's could not address the
	 * core device at its native granularity */
	if (cache_block_size > core_block_size) {
		SPDK_ERRLOG("Cache bdev block size (%d) is bigger then core bdev block size (%d)\n",
			    cache_block_size, core_block_size);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -EINVAL);
		return;
	}

	/* Share one OCF cache instance between vbdevs using the same cache bdev */
	existing = get_other_cache_instance(vbdev);
	if (existing) {
		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
			       vbdev->name, vbdev->cache.name);
		vbdev->ocf_cache = existing;
		ocf_mngt_cache_get(vbdev->ocf_cache);
		vbdev->cache_ctx = ocf_cache_get_priv(existing);
		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
	if (vbdev->cache_ctx == NULL) {
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -ENOMEM);
		return;
	}

	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);

	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache, NULL);
	if (rc) {
		SPDK_ERRLOG("Could not start cache %s: %d\n", vbdev->name, rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}
	/* Extra reference dropped in unregister_finish() */
	ocf_mngt_cache_get(vbdev->ocf_cache);

	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);

	rc = create_management_queue(vbdev);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}

	/* loadq: recover an existing cache's metadata; otherwise attach fresh */
	if (vbdev->cfg.loadq) {
		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.attach, start_cache_cmpl, vbdev);
	} else {
		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.attach, start_cache_cmpl, vbdev);
	}
}
1127 
/* Procedures called during register operation
 * The steps are run in order by the management path machinery
 * (see register_vbdev()); the terminating NULL marks the end of the path. */
vbdev_ocf_mngt_fn register_path[] = {
	start_cache,
	add_core,
	finish_register,
	NULL
};
1135 
1136 /* Start cache instance and register OCF bdev */
1137 static void
1138 register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
1139 {
1140 	int rc;
1141 
1142 	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
1143 		cb(-EPERM, vbdev, cb_arg);
1144 		return;
1145 	}
1146 
1147 	vbdev->state.starting = true;
1148 	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
1149 	if (rc) {
1150 		cb(rc, vbdev, cb_arg);
1151 	}
1152 }
1153 
/* Init OCF configuration options
 * for core and cache devices.
 *
 * Fills vbdev->cfg (cache, core and attach configs) from the vbdev's names
 * and serializes the (core, vbdev, cache) name triple into vbdev->uuid so
 * that a future cache load can reconstruct the topology.
 * Returns 0 on success, -EINVAL on name truncation or OCF failure. */
static int
init_vbdev_config(struct vbdev_ocf *vbdev)
{
	struct vbdev_ocf_config *cfg = &vbdev->cfg;
	struct ocf_volume_uuid uuid;
	ocf_volume_type_t type;
	int ret;


	/* Initialize OCF defaults first */
	ocf_mngt_cache_attach_config_set_default(&cfg->attach);
	ocf_mngt_cache_config_set_default(&cfg->cache);
	ocf_mngt_core_config_set_default(&cfg->core);

	/* OCF name fields are fixed-size buffers; fail instead of truncating. */
	ret = snprintf(cfg->cache.name, sizeof(cfg->cache.name), "%s", vbdev->name);
	if (ret < 0 || (size_t) ret >= sizeof(cfg->cache.name)) {
		return -EINVAL;
	}
	ret = snprintf(cfg->core.name, sizeof(cfg->core.name), "%s", vbdev->core.name);
	if (ret < 0 || (size_t) ret >= sizeof(cfg->core.name)) {
		return -EINVAL;
	}

	/* This module attaches base bdevs itself, so OCF-side core opening,
	 * device tests and start-up discard are disabled. */
	cfg->attach.open_cores = false;
	cfg->attach.device.perform_test = false;
	cfg->attach.discard_on_start = false;

	/* NOTE(review): 'locked' appears to request that the cache be created
	 * with its management lock held - confirm against OCF mngt docs. */
	vbdev->cfg.cache.locked = true;

	cfg->core.volume_type = SPDK_OBJECT;

	if (vbdev->cfg.loadq) {
		/* When doing cache_load(), we need to set try_add to true,
		 * otherwise OCF will interpret this core as new
		 * instead of the inactive one */
		vbdev->cfg.core.try_add = true;
	} else {
		/* When cache is initialized as new, set force flag to true,
		 * to ignore warnings about existing metadata */
		cfg->attach.force = true;
	}

	/* Serialize bdev names in OCF UUID to interpret on future loads
	 * Core UUID is a triple of (core name, vbdev name, cache name)
	 * Cache UUID is cache bdev name */
	type = ocf_ctx_get_volume_type(vbdev_ocf_ctx, SPDK_OBJECT);
	if (!type) {
		SPDK_ERRLOG("Fail to get volume type\n");
		return -EINVAL;
	}
	uuid.size = strlen(vbdev->cache.name) + 1;
	uuid.data = vbdev->cache.name;
	ret = ocf_volume_create(&cfg->attach.device.volume, type, &uuid);
	if (ret) {
		SPDK_ERRLOG("Fail to create volume\n");
		return -EINVAL;
	}

	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s %s",
		 vbdev->core.name, vbdev->name, vbdev->cache.name);
	/* uuid.size spans the whole space-separated triple; it is computed
	 * BEFORE the separators are overwritten below. */
	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
	cfg->core.uuid.data = vbdev->uuid;
	/* Replace the two separating spaces with NULs, so the serialized UUID
	 * carries three consecutive NUL-terminated strings. */
	vbdev->uuid[strlen(vbdev->core.name)] = 0;
	vbdev->uuid[strlen(vbdev->core.name) + 1 + strlen(vbdev->name)] = 0;

	return 0;
}
1223 
1224 /* Allocate vbdev structure object and add it to the global list */
1225 static int
1226 init_vbdev(const char *vbdev_name,
1227 	   const char *cache_mode_name,
1228 	   const uint64_t cache_line_size,
1229 	   const char *cache_name,
1230 	   const char *core_name,
1231 	   bool loadq)
1232 {
1233 	struct vbdev_ocf *vbdev;
1234 	int rc = 0;
1235 
1236 	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
1237 		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
1238 		return -EPERM;
1239 	}
1240 
1241 	vbdev = calloc(1, sizeof(*vbdev));
1242 	if (!vbdev) {
1243 		goto error_mem;
1244 	}
1245 
1246 	vbdev->name = strdup(vbdev_name);
1247 	if (!vbdev->name) {
1248 		goto error_mem;
1249 	}
1250 
1251 	vbdev->cache.name = strdup(cache_name);
1252 	if (!vbdev->cache.name) {
1253 		goto error_mem;
1254 	}
1255 
1256 	vbdev->core.name = strdup(core_name);
1257 	if (!vbdev->core.name) {
1258 		goto error_mem;
1259 	}
1260 
1261 	vbdev->cache.parent = vbdev;
1262 	vbdev->core.parent = vbdev;
1263 	vbdev->cache.is_cache = true;
1264 	vbdev->core.is_cache = false;
1265 	vbdev->cfg.loadq = loadq;
1266 
1267 	rc = init_vbdev_config(vbdev);
1268 	if (rc) {
1269 		SPDK_ERRLOG("Fail to init vbdev config\n");
1270 		goto error_free;
1271 	}
1272 
1273 
1274 	if (cache_mode_name) {
1275 		vbdev->cfg.cache.cache_mode
1276 			= ocf_get_cache_mode(cache_mode_name);
1277 	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
1278 		SPDK_ERRLOG("No cache mode specified\n");
1279 		rc = -EINVAL;
1280 		goto error_free;
1281 	}
1282 	if (vbdev->cfg.cache.cache_mode < 0) {
1283 		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
1284 		rc = -EINVAL;
1285 		goto error_free;
1286 	}
1287 
1288 	ocf_cache_line_size_t set_cache_line_size = cache_line_size ?
1289 			(ocf_cache_line_size_t)cache_line_size * KiB :
1290 			ocf_cache_line_size_default;
1291 	if (set_cache_line_size == 0) {
1292 		SPDK_ERRLOG("Cache line size should be non-zero.\n");
1293 		rc = -EINVAL;
1294 		goto error_free;
1295 	}
1296 	vbdev->cfg.attach.cache_line_size = set_cache_line_size;
1297 	vbdev->cfg.cache.cache_line_size = set_cache_line_size;
1298 
1299 	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
1300 	return rc;
1301 
1302 error_mem:
1303 	rc = -ENOMEM;
1304 error_free:
1305 	free_vbdev(vbdev);
1306 	return rc;
1307 }
1308 
/* Read configuration file at the start of SPDK application
 * This adds vbdevs to global list if some mentioned in config */
static int
vbdev_ocf_init(void)
{
	int rc;

	rc = vbdev_ocf_ctx_init();
	if (rc != 0) {
		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", rc);
		return rc;
	}

	rc = vbdev_ocf_volume_init();
	if (rc != 0) {
		/* Roll back the context initialization from above. */
		vbdev_ocf_ctx_cleanup();
		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", rc);
		return rc;
	}

	return 0;
}
1331 
1332 /* Called after application shutdown started
1333  * Release memory of allocated structures here */
1334 static void
1335 vbdev_ocf_module_fini(void)
1336 {
1337 	struct vbdev_ocf *vbdev;
1338 
1339 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1340 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1341 		free_vbdev(vbdev);
1342 	}
1343 
1344 	vbdev_ocf_volume_cleanup();
1345 	vbdev_ocf_ctx_cleanup();
1346 }
1347 
1348 /* When base device gets unplugged this is called
1349  * We will unregister cache vbdev here
1350  * When cache device is removed, we delete every OCF bdev that used it */
1351 static void
1352 hotremove_cb(struct vbdev_ocf_base *base)
1353 {
1354 	struct vbdev_ocf *vbdev;
1355 
1356 	if (!base->is_cache) {
1357 		if (base->parent->state.doing_finish) {
1358 			return;
1359 		}
1360 
1361 		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
1362 			       base->parent->name, base->name);
1363 		vbdev_ocf_delete(base->parent, NULL, NULL);
1364 		return;
1365 	}
1366 
1367 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1368 		if (vbdev->state.doing_finish) {
1369 			continue;
1370 		}
1371 		if (strcmp(base->name, vbdev->cache.name) == 0) {
1372 			SPDK_NOTICELOG("Deinitializing '%s' because"
1373 				       " its cache device '%s' was removed\n",
1374 				       vbdev->name, base->name);
1375 			vbdev_ocf_delete(vbdev, NULL, NULL);
1376 		}
1377 	}
1378 }
1379 
1380 static void
1381 base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1382 		   void *event_ctx)
1383 {
1384 	switch (type) {
1385 	case SPDK_BDEV_EVENT_REMOVE:
1386 		if (event_ctx) {
1387 			hotremove_cb(event_ctx);
1388 		}
1389 		break;
1390 	default:
1391 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1392 		break;
1393 	}
1394 }
1395 
/* Open base SPDK bdev and claim it
 *
 * Opens the bdev named base->name for writing, claims it for this module
 * and acquires a management IO channel. A cache base that another vbdev
 * already opened shares the existing descriptor and channel instead.
 * Returns 0 on success, -EALREADY if already attached, or a negative
 * error from the open/claim/channel steps (all partial work rolled back). */
static int
attach_base(struct vbdev_ocf_base *base)
{
	int status;

	if (base->attached) {
		return -EALREADY;
	}

	/* If base cache bdev was already opened by other vbdev,
	 * we just copy its descriptor here */
	if (base->is_cache) {
		struct vbdev_ocf_base *existing = get_other_cache_base(base);
		if (existing) {
			base->desc = existing->desc;
			base->management_channel = existing->management_channel;
			base->attached = true;
			return 0;
		}
	}

	status = spdk_bdev_open_ext(base->name, true, base_bdev_event_cb, base, &base->desc);
	if (status) {
		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
		return status;
	}

	/* Claim the bdev so no other module can take ownership of it. */
	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
					     &ocf_if);
	if (status) {
		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
		spdk_bdev_close(base->desc);
		return status;
	}

	base->management_channel = spdk_bdev_get_io_channel(base->desc);
	if (!base->management_channel) {
		SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
		/* Roll back both the claim and the open from above. */
		spdk_bdev_module_release_bdev(base->bdev);
		spdk_bdev_close(base->desc);
		return -ENOMEM;
	}

	/* Save the thread where the base device is opened */
	base->thread = spdk_get_thread();

	base->attached = true;
	return status;
}
1446 
1447 /* Attach base bdevs */
1448 static int
1449 attach_base_bdevs(struct vbdev_ocf *vbdev,
1450 		  struct spdk_bdev *cache_bdev,
1451 		  struct spdk_bdev *core_bdev)
1452 {
1453 	int rc = 0;
1454 
1455 	if (cache_bdev) {
1456 		vbdev->cache.bdev = cache_bdev;
1457 		rc |= attach_base(&vbdev->cache);
1458 	}
1459 
1460 	if (core_bdev) {
1461 		vbdev->core.bdev = core_bdev;
1462 		rc |= attach_base(&vbdev->core);
1463 	}
1464 
1465 	return rc;
1466 }
1467 
1468 /* Init and then start vbdev if all base devices are present */
1469 void
1470 vbdev_ocf_construct(const char *vbdev_name,
1471 		    const char *cache_mode_name,
1472 		    const uint64_t cache_line_size,
1473 		    const char *cache_name,
1474 		    const char *core_name,
1475 		    bool loadq,
1476 		    void (*cb)(int, struct vbdev_ocf *, void *),
1477 		    void *cb_arg)
1478 {
1479 	int rc;
1480 	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
1481 	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
1482 	struct vbdev_ocf *vbdev;
1483 
1484 	rc = init_vbdev(vbdev_name, cache_mode_name, cache_line_size, cache_name, core_name, loadq);
1485 	if (rc) {
1486 		cb(rc, NULL, cb_arg);
1487 		return;
1488 	}
1489 
1490 	vbdev = vbdev_ocf_get_by_name(vbdev_name);
1491 	if (vbdev == NULL) {
1492 		cb(-ENODEV, NULL, cb_arg);
1493 		return;
1494 	}
1495 
1496 	if (cache_bdev == NULL) {
1497 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
1498 			       vbdev->name, cache_name);
1499 	}
1500 	if (core_bdev == NULL) {
1501 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
1502 			       vbdev->name, core_name);
1503 	}
1504 
1505 	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
1506 	if (rc) {
1507 		cb(rc, vbdev, cb_arg);
1508 		return;
1509 	}
1510 
1511 	if (core_bdev && cache_bdev) {
1512 		register_vbdev(vbdev, cb, cb_arg);
1513 	} else {
1514 		cb(0, vbdev, cb_arg);
1515 	}
1516 }
1517 
1518 /* Set new cache mode on OCF cache */
1519 void
1520 vbdev_ocf_set_cache_mode(struct vbdev_ocf *vbdev,
1521 			 const char *cache_mode_name,
1522 			 void (*cb)(int, struct vbdev_ocf *, void *),
1523 			 void *cb_arg)
1524 {
1525 	ocf_cache_t cache;
1526 	ocf_cache_mode_t cache_mode;
1527 	int rc;
1528 
1529 	cache = vbdev->ocf_cache;
1530 	cache_mode = ocf_get_cache_mode(cache_mode_name);
1531 
1532 	rc = ocf_mngt_cache_trylock(cache);
1533 	if (rc) {
1534 		cb(rc, vbdev, cb_arg);
1535 		return;
1536 	}
1537 
1538 	rc = ocf_mngt_cache_set_mode(cache, cache_mode);
1539 	ocf_mngt_cache_unlock(cache);
1540 	cb(rc, vbdev, cb_arg);
1541 }
1542 
1543 /* Set sequential cutoff parameters on OCF cache */
1544 void
1545 vbdev_ocf_set_seqcutoff(struct vbdev_ocf *vbdev, const char *policy_name, uint32_t threshold,
1546 			uint32_t promotion_count, void (*cb)(int, void *), void *cb_arg)
1547 {
1548 	ocf_cache_t cache;
1549 	ocf_seq_cutoff_policy policy;
1550 	int rc;
1551 
1552 	cache = vbdev->ocf_cache;
1553 
1554 	policy = ocf_get_seqcutoff_policy(policy_name);
1555 	if (policy == ocf_seq_cutoff_policy_max) {
1556 		cb(OCF_ERR_INVAL, cb_arg);
1557 		return;
1558 	}
1559 
1560 	rc = ocf_mngt_cache_trylock(cache);
1561 	if (rc) {
1562 		cb(rc, cb_arg);
1563 		return;
1564 	}
1565 
1566 	rc = ocf_mngt_core_set_seq_cutoff_policy_all(cache, policy);
1567 	if (rc) {
1568 		goto end;
1569 	}
1570 
1571 	if (threshold) {
1572 		threshold = threshold * KiB;
1573 
1574 		rc = ocf_mngt_core_set_seq_cutoff_threshold_all(cache, threshold);
1575 		if (rc) {
1576 			goto end;
1577 		}
1578 	}
1579 
1580 	if (promotion_count) {
1581 		rc = ocf_mngt_core_set_seq_cutoff_promotion_count_all(cache, promotion_count);
1582 	}
1583 
1584 end:
1585 	ocf_mngt_cache_unlock(cache);
1586 	cb(rc, cb_arg);
1587 }
1588 
1589 /* This called if new device is created in SPDK application
1590  * If that device named as one of base bdevs of OCF vbdev,
1591  * claim and open them */
1592 static void
1593 vbdev_ocf_examine(struct spdk_bdev *bdev)
1594 {
1595 	const char *bdev_name = spdk_bdev_get_name(bdev);
1596 	struct vbdev_ocf *vbdev;
1597 
1598 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1599 		if (vbdev->state.doing_finish) {
1600 			continue;
1601 		}
1602 
1603 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1604 			attach_base_bdevs(vbdev, bdev, NULL);
1605 			continue;
1606 		}
1607 		if (!strcmp(bdev_name, vbdev->core.name)) {
1608 			attach_base_bdevs(vbdev, NULL, bdev);
1609 			break;
1610 		}
1611 	}
1612 	spdk_bdev_module_examine_done(&ocf_if);
1613 }
1614 
/* Context for an asynchronous OCF metadata probe of a single bdev,
 * reference-counted across the async steps (see examine_ctx_put()). */
struct metadata_probe_ctx {
	struct vbdev_ocf_base base;     /* temporary base wrapping the probed bdev */
	ocf_volume_t volume;            /* temporary OCF volume opened on the base */

	/* Core-device UUIDs; presumably read from cache metadata elsewhere,
	 * freed in examine_ctx_put() */
	struct ocf_volume_uuid *core_uuids;
	unsigned int uuid_count;

	int result;                     /* first error encountered, 0 on success */
	int refcnt;                     /* outstanding references; cleanup at zero */
};
1625 
/* Thread-message handler: close a bdev descriptor on the thread
 * that originally opened it. */
static void
_examine_ctx_put(void *ctx)
{
	spdk_bdev_close((struct spdk_bdev_desc *)ctx);
}
1633 
/* Drop one reference to the probe context.
 * On the last reference, release all resources (descriptor, volume,
 * core UUIDs), report the probe result and signal examine completion
 * for the probed bdev. */
static void
examine_ctx_put(struct metadata_probe_ctx *ctx)
{
	unsigned int i;

	ctx->refcnt--;
	if (ctx->refcnt > 0) {
		return;
	}

	if (ctx->result) {
		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
	}

	if (ctx->base.desc) {
		/* Close the underlying bdev on its same opened thread. */
		if (ctx->base.thread && ctx->base.thread != spdk_get_thread()) {
			/* Deferred close: only the desc pointer is sent, so freeing
			 * ctx below does not race with the message handler. */
			spdk_thread_send_msg(ctx->base.thread, _examine_ctx_put, ctx->base.desc);
		} else {
			spdk_bdev_close(ctx->base.desc);
		}
	}

	if (ctx->volume) {
		ocf_volume_destroy(ctx->volume);
	}

	if (ctx->core_uuids) {
		for (i = 0; i < ctx->uuid_count; i++) {
			free(ctx->core_uuids[i].data);
		}
	}
	/* free(NULL) is a no-op, so the array itself needs no guard. */
	free(ctx->core_uuids);

	examine_done(ctx->result, NULL, ctx->base.bdev);
	free(ctx);
}
1672 
/* Completion callback for ocf_metadata_probe().
 * Records a real probe error in the context and drops its reference. */
static void
metadata_probe_cb(void *priv, int rc,
		  struct ocf_metadata_probe_status *status)
{
	struct metadata_probe_ctx *ctx = priv;

	if (rc) {
		/* -OCF_ERR_NO_METADATA means the device simply carries no cache
		 * metadata - that is not an error for examine purposes. */
		if (rc != -OCF_ERR_NO_METADATA) {
			ctx->result = rc;
		}
	}

	examine_ctx_put(ctx);
}
1688 
/* This is called after vbdev_ocf_examine
 * It allows to delay application initialization
 * until all OCF bdevs get registered
 * If vbdev has all of its base devices it starts asynchronously here
 * We first check if bdev appears in configuration,
 * if not we do metadata_probe() to create its configuration from bdev metadata */
static void
vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
{
	const char *bdev_name = spdk_bdev_get_name(bdev);
	struct vbdev_ocf *vbdev;
	struct metadata_probe_ctx *ctx;
	bool created_from_config = false;
	int rc;

	/* Baseline reference; every examine_start() below must be balanced
	 * by an examine_done() before examine is reported complete. */
	examine_start(bdev);

	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
		if (vbdev->state.doing_finish || vbdev->state.started) {
			continue;
		}

		if (!strcmp(bdev_name, vbdev->cache.name)) {
			/* Extra reference; released by examine_done passed as the
			 * register_vbdev() completion callback. */
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			created_from_config = true;
			continue;
		}
		if (!strcmp(bdev_name, vbdev->core.name)) {
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			/* Drop the baseline reference; the register callback drops
			 * the extra one taken just above. */
			examine_done(0, NULL, bdev);
			return;
		}
	}

	/* If device is discovered during config we do not check for metadata */
	if (created_from_config) {
		examine_done(0, NULL, bdev);
		return;
	}

	/* Metadata probe path
	 * We create temporary OCF volume and a temporary base structure
	 * to use them for ocf_metadata_probe() and for bottom adapter IOs
	 * Then we get UUIDs of core devices and create configurations based on them */
	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		examine_done(-ENOMEM, NULL, bdev);
		return;
	}

	ctx->base.bdev = bdev;
	/* The context inherits the baseline examine reference;
	 * examine_ctx_put() calls examine_done() when refcnt hits zero. */
	ctx->refcnt = 1;

	rc = spdk_bdev_open_ext(bdev_name, true, base_bdev_event_cb, NULL, &ctx->base.desc);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_volume_open(ctx->volume, &ctx->base);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	/* Save the thread where the base device is opened */
	ctx->base.thread = spdk_get_thread();

	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
}
1770 
1771 static int
1772 vbdev_ocf_get_ctx_size(void)
1773 {
1774 	return sizeof(struct bdev_ocf_data);
1775 }
1776 
/* First stage of module shutdown: raise the global flag so code that
 * checks g_fini_started knows the application is finishing. */
static void
fini_start(void)
{
	g_fini_started = true;
}
1782 
/* Module-global function table
 * Does not relate to vbdev instances */
static struct spdk_bdev_module ocf_if = {
	.name = "ocf",
	.module_init = vbdev_ocf_init,            /* init OCF ctx and volume type */
	.fini_start = fini_start,                 /* flag that shutdown has begun */
	.module_fini = vbdev_ocf_module_fini,     /* free vbdev list, cleanup OCF */
	.get_ctx_size = vbdev_ocf_get_ctx_size,   /* per-IO context size */
	.examine_config = vbdev_ocf_examine,      /* claim bases named in config */
	.examine_disk   = vbdev_ocf_examine_disk, /* start vbdevs / probe metadata */
};
SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);
1795