xref: /spdk/module/bdev/ocf/vbdev_ocf.c (revision 6828ed1807e750af3ff4d49686614fe6013191bf)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <ocf/ocf.h>
7 #include <ocf/ocf_types.h>
8 #include <ocf/ocf_mngt.h>
9 
10 #include "ctx.h"
11 #include "data.h"
12 #include "volume.h"
13 #include "utils.h"
14 #include "vbdev_ocf.h"
15 
16 #include "spdk/bdev_module.h"
17 #include "spdk/thread.h"
18 #include "spdk/string.h"
19 #include "spdk/log.h"
20 #include "spdk/cpuset.h"
21 
22 static struct spdk_bdev_module ocf_if;
23 
24 static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
25 	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);
26 
27 static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
28 	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);
29 
30 bool g_fini_started = false;
31 
/* Structure for keeping list of bdevs that are claimed but not used yet */
struct examining_bdev {
	struct spdk_bdev           *bdev;  /* claimed base bdev awaiting examine completion */
	TAILQ_ENTRY(examining_bdev) tailq; /* link in g_ocf_examining_bdevs_head */
};
37 
38 /* Add bdev to list of claimed */
39 static void
40 examine_start(struct spdk_bdev *bdev)
41 {
42 	struct examining_bdev *entry = malloc(sizeof(*entry));
43 
44 	assert(entry);
45 	entry->bdev = bdev;
46 	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
47 }
48 
/* Find bdev on list of claimed bdevs, then remove it,
 * if it was the last one on list then report examine done */
static void
examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
{
	struct spdk_bdev *bdev = cb_arg;
	struct examining_bdev *entry, *safe, *found = NULL;

	/* Scan for entries matching this bdev.  If a SECOND match exists,
	 * other examine operations for the same bdev are still pending, so
	 * jump straight to removal without signalling examine-done. */
	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
		if (entry->bdev == bdev) {
			if (found) {
				goto remove;
			} else {
				found = entry;
			}
		}
	}

	/* Exactly one entry matched - this was the last reference,
	 * tell the bdev layer that examine of this bdev is complete. */
	assert(found);
	spdk_bdev_module_examine_done(&ocf_if);

remove:
	/* Remove and free the first matching entry only. */
	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
	free(found);
}
74 
75 /* Free allocated strings and structure itself
76  * Used at shutdown only */
77 static void
78 free_vbdev(struct vbdev_ocf *vbdev)
79 {
80 	if (!vbdev) {
81 		return;
82 	}
83 
84 	free(vbdev->name);
85 	free(vbdev->cache.name);
86 	free(vbdev->core.name);
87 	free(vbdev);
88 }
89 
90 /* Get existing cache base
91  * that is attached to other vbdev */
92 static struct vbdev_ocf_base *
93 get_other_cache_base(struct vbdev_ocf_base *base)
94 {
95 	struct vbdev_ocf *vbdev;
96 
97 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
98 		if (&vbdev->cache == base || !vbdev->cache.attached) {
99 			continue;
100 		}
101 		if (!strcmp(vbdev->cache.name, base->name)) {
102 			return &vbdev->cache;
103 		}
104 	}
105 
106 	return NULL;
107 }
108 
109 static bool
110 is_ocf_cache_running(struct vbdev_ocf *vbdev)
111 {
112 	if (vbdev->cache.attached && vbdev->ocf_cache) {
113 		return ocf_cache_is_running(vbdev->ocf_cache);
114 	}
115 	return false;
116 }
117 
118 static bool
119 is_ocf_cache_initializing(struct vbdev_ocf *vbdev)
120 {
121 	if (vbdev->cache.attached && vbdev->ocf_cache) {
122 		return ocf_cache_is_initializing(vbdev->ocf_cache);
123 	}
124 	return false;
125 }
126 
127 /* Get existing OCF cache instance
128  * that is started by other vbdev */
129 static ocf_cache_t
130 get_other_cache_instance(struct vbdev_ocf *vbdev)
131 {
132 	struct vbdev_ocf *cmp;
133 
134 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
135 		if (cmp->state.doing_finish || cmp == vbdev) {
136 			continue;
137 		}
138 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
139 			continue;
140 		}
141 		if (is_ocf_cache_running(cmp) || is_ocf_cache_initializing(cmp)) {
142 			return cmp->ocf_cache;
143 		}
144 	}
145 
146 	return NULL;
147 }
148 
/* Thread-message handler: close the given bdev descriptor on the
 * thread that originally opened it. */
static void
_remove_base_bdev(void *ctx)
{
	spdk_bdev_close((struct spdk_bdev_desc *)ctx);
}
156 
/* Close and unclaim base bdev */
static void
remove_base_bdev(struct vbdev_ocf_base *base)
{
	/* No-op when the base was never attached (or already detached). */
	if (base->attached) {
		if (base->management_channel) {
			spdk_put_io_channel(base->management_channel);
		}

		/* Drop this module's claim before closing the descriptor. */
		spdk_bdev_module_release_bdev(base->bdev);
		/* Close the underlying bdev on its same opened thread. */
		if (base->thread && base->thread != spdk_get_thread()) {
			spdk_thread_send_msg(base->thread, _remove_base_bdev, base->desc);
		} else {
			spdk_bdev_close(base->desc);
		}
		base->attached = false;
	}
}
176 
/* Finish unregister operation */
static void
unregister_finish(struct vbdev_ocf *vbdev)
{
	/* Report the final stop status collected earlier in detach_cache(). */
	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);

	/* Drop the references taken during registration. */
	if (vbdev->ocf_cache) {
		ocf_mngt_cache_put(vbdev->ocf_cache);
	}

	if (vbdev->cache_ctx) {
		vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
	}
	vbdev_ocf_mngt_continue(vbdev, 0);
}
192 
193 static void
194 close_core_bdev(struct vbdev_ocf *vbdev)
195 {
196 	remove_base_bdev(&vbdev->core);
197 	vbdev_ocf_mngt_continue(vbdev, 0);
198 }
199 
200 static void
201 remove_core_cmpl(void *priv, int error)
202 {
203 	struct vbdev_ocf *vbdev = priv;
204 
205 	ocf_mngt_cache_unlock(vbdev->ocf_cache);
206 	vbdev_ocf_mngt_continue(vbdev, error);
207 }
208 
209 /* Try to lock cache, then remove core */
210 static void
211 remove_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
212 {
213 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
214 
215 	if (error) {
216 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
217 			    error, vbdev->name);
218 		vbdev_ocf_mngt_continue(vbdev, error);
219 		return;
220 	}
221 
222 	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
223 }
224 
225 /* Detach core base */
226 static void
227 detach_core(struct vbdev_ocf *vbdev)
228 {
229 	if (is_ocf_cache_running(vbdev)) {
230 		ocf_mngt_cache_lock(vbdev->ocf_cache, remove_core_cache_lock_cmpl, vbdev);
231 	} else {
232 		vbdev_ocf_mngt_continue(vbdev, 0);
233 	}
234 }
235 
236 static void
237 close_cache_bdev(struct vbdev_ocf *vbdev)
238 {
239 	remove_base_bdev(&vbdev->cache);
240 	vbdev_ocf_mngt_continue(vbdev, 0);
241 }
242 
/* Detach cache base */
static void
detach_cache(struct vbdev_ocf *vbdev)
{
	/* Latch the pipeline status so unregister_finish() can report it. */
	vbdev->state.stop_status = vbdev->mngt_ctx.status;

	/* If some other vbdev references this cache bdev,
	 * we detach this only by changing the flag, without actual close */
	if (get_other_cache_base(&vbdev->cache)) {
		vbdev->cache.attached = false;
	}

	vbdev_ocf_mngt_continue(vbdev, 0);
}
257 
/* OCF completion for cache stop: drop the management queue reference,
 * release the cache lock and advance the pipeline. */
static void
stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	/* The mngt queue reference was taken when the queue was created;
	 * it must be put before the cache can be fully torn down. */
	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
	ocf_mngt_cache_unlock(cache);

	vbdev_ocf_mngt_continue(vbdev, error);
}
268 
269 /* Try to lock cache, then stop it */
270 static void
271 stop_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
272 {
273 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
274 
275 	if (error) {
276 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
277 			    error, vbdev->name);
278 		vbdev_ocf_mngt_continue(vbdev, error);
279 		return;
280 	}
281 
282 	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
283 }
284 
/* Stop OCF cache object
 * vbdev_ocf is not operational after this */
static void
stop_vbdev(struct vbdev_ocf *vbdev)
{
	/* Nothing to stop when the cache never reached the running state. */
	if (!is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	/* During module finish (g_fini_started) every vbdev stops its cache
	 * unconditionally; otherwise a cache shared with another vbdev is
	 * left running for that other user. */
	if (!g_fini_started && get_other_cache_instance(vbdev)) {
		SPDK_NOTICELOG("Not stopping cache instance '%s'"
			       " because it is referenced by other OCF bdev\n",
			       vbdev->cache.name);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	ocf_mngt_cache_lock(vbdev->ocf_cache, stop_vbdev_cache_lock_cmpl, vbdev);
}
305 
306 static void
307 flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
308 {
309 	struct vbdev_ocf *vbdev = priv;
310 
311 	ocf_mngt_cache_unlock(cache);
312 	vbdev_ocf_mngt_continue(vbdev, error);
313 }
314 
315 static void
316 flush_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
317 {
318 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
319 
320 	if (error) {
321 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
322 			    error, vbdev->name);
323 		vbdev_ocf_mngt_continue(vbdev, error);
324 		return;
325 	}
326 
327 	ocf_mngt_cache_flush(vbdev->ocf_cache, flush_vbdev_cmpl, vbdev);
328 }
329 
330 static void
331 flush_vbdev(struct vbdev_ocf *vbdev)
332 {
333 	if (!is_ocf_cache_running(vbdev)) {
334 		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
335 		return;
336 	}
337 
338 	ocf_mngt_cache_lock(vbdev->ocf_cache, flush_vbdev_cache_lock_cmpl, vbdev);
339 }
340 
/* Procedures called during dirty unregister.
 * Cache is stopped BEFORE the core is detached, which preserves the
 * on-disk metadata so the instance can be recovered later. */
vbdev_ocf_mngt_fn unregister_path_dirty[] = {
	flush_vbdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	detach_core,
	close_core_bdev,
	unregister_finish,
	NULL	/* sentinel terminating the pipeline */
};
352 
/* Procedures called during clean unregister.
 * Core is removed from the running cache BEFORE the cache is stopped,
 * which deletes the instance permanently. */
vbdev_ocf_mngt_fn unregister_path_clean[] = {
	flush_vbdev,
	detach_core,
	close_core_bdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	unregister_finish,
	NULL	/* sentinel terminating the pipeline */
};
364 
365 /* Start asynchronous management operation using unregister_path */
366 static void
367 unregister_cb(void *opaque)
368 {
369 	struct vbdev_ocf *vbdev = opaque;
370 	vbdev_ocf_mngt_fn *unregister_path;
371 	int rc;
372 
373 	unregister_path = vbdev->state.doing_clean_delete ?
374 			  unregister_path_clean : unregister_path_dirty;
375 
376 	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
377 	if (rc) {
378 		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
379 		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
380 	}
381 }
382 
/* Clean remove case - remove core and then cache, this order
 * will remove instance permanently */
static void
_vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
{
	/* Synchronous path used only when the vbdev never fully started;
	 * each step calls vbdev_ocf_mngt_continue() which is a no-op
	 * outside of an active management pipeline. */
	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}

	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}
}
398 
/* Dirty shutdown/hot remove case - remove cache and then core, this order
 * will allow us to recover this instance in the future */
static void
_vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
{
	/* Mirror of _vbdev_ocf_destruct_clean() with cache torn down first,
	 * preserving metadata for later load. */
	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}

	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}
}
414 
/* Unregister io device with callback to unregister_cb
 * This function is called during spdk_bdev_unregister.
 * Returns 0 on synchronous completion, 1 when completion is deferred to
 * unregister_cb, or a negative errno. */
static int
vbdev_ocf_destruct(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;

	/* A destruct is already in flight. */
	if (vbdev->state.doing_finish) {
		return -EALREADY;
	}

	if (vbdev->state.starting && !vbdev->state.started) {
		/* Prevent before detach cache/core during register path of
		  this bdev */
		return -EBUSY;
	}

	vbdev->state.doing_finish = true;

	if (vbdev->state.started) {
		/* Fully started vbdev: tear down asynchronously via the
		 * management pipeline started from unregister_cb. */
		spdk_io_device_unregister(vbdev, unregister_cb);
		/* Return 1 because unregister is delayed */
		return 1;
	}

	/* Never started: tear down the bases synchronously. */
	if (vbdev->state.doing_clean_delete) {
		_vbdev_ocf_destruct_clean(vbdev);
	} else {
		_vbdev_ocf_destruct_dirty(vbdev);
	}

	return 0;
}
448 
449 /* Stop OCF cache and unregister SPDK bdev */
450 int
451 vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
452 {
453 	int rc = 0;
454 
455 	if (vbdev->state.started) {
456 		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
457 	} else {
458 		rc = vbdev_ocf_destruct(vbdev);
459 		if (rc == 0 && cb) {
460 			cb(cb_arg, 0);
461 		}
462 	}
463 
464 	return rc;
465 }
466 
467 /* Remove cores permanently and then stop OCF cache and unregister SPDK bdev */
468 int
469 vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
470 		       void *cb_arg)
471 {
472 	vbdev->state.doing_clean_delete = true;
473 
474 	return vbdev_ocf_delete(vbdev, cb, cb_arg);
475 }
476 
477 
478 /* If vbdev is online, return its object */
479 struct vbdev_ocf *
480 vbdev_ocf_get_by_name(const char *name)
481 {
482 	struct vbdev_ocf *vbdev;
483 
484 	if (name == NULL) {
485 		assert(false);
486 		return NULL;
487 	}
488 
489 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
490 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
491 			continue;
492 		}
493 		if (strcmp(vbdev->name, name) == 0) {
494 			return vbdev;
495 		}
496 	}
497 	return NULL;
498 }
499 
500 /* Return matching base if parent vbdev is online */
501 struct vbdev_ocf_base *
502 vbdev_ocf_get_base_by_name(const char *name)
503 {
504 	struct vbdev_ocf *vbdev;
505 
506 	if (name == NULL) {
507 		assert(false);
508 		return NULL;
509 	}
510 
511 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
512 		if (vbdev->state.doing_finish) {
513 			continue;
514 		}
515 
516 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
517 			return &vbdev->cache;
518 		}
519 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
520 			return &vbdev->core;
521 		}
522 	}
523 	return NULL;
524 }
525 
526 /* Execute fn for each OCF device that is online or waits for base devices */
527 void
528 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
529 {
530 	struct vbdev_ocf *vbdev;
531 
532 	assert(fn != NULL);
533 
534 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
535 		if (!vbdev->state.doing_finish) {
536 			fn(vbdev, ctx);
537 		}
538 	}
539 }
540 
541 /* Called from OCF when SPDK_IO is completed */
542 static void
543 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
544 {
545 	struct spdk_bdev_io *bdev_io = io->priv1;
546 
547 	if (error == 0) {
548 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
549 	} else if (error == -OCF_ERR_NO_MEM) {
550 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
551 	} else {
552 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
553 	}
554 
555 	ocf_io_put(io);
556 }
557 
558 /* Configure io parameters and send it to OCF */
559 static int
560 io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
561 {
562 	switch (bdev_io->type) {
563 	case SPDK_BDEV_IO_TYPE_WRITE:
564 	case SPDK_BDEV_IO_TYPE_READ:
565 		ocf_core_submit_io(io);
566 		return 0;
567 	case SPDK_BDEV_IO_TYPE_FLUSH:
568 		ocf_core_submit_flush(io);
569 		return 0;
570 	case SPDK_BDEV_IO_TYPE_UNMAP:
571 		ocf_core_submit_discard(io);
572 		return 0;
573 	case SPDK_BDEV_IO_TYPE_RESET:
574 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
575 	default:
576 		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
577 		return -EINVAL;
578 	}
579 }
580 
/* Submit SPDK-IO to OCF */
static void
io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
	struct ocf_io *io = NULL;
	struct bdev_ocf_data *data = NULL;
	struct vbdev_ocf_qctx *qctx = spdk_io_channel_get_ctx(ch);
	/* Convert block-based request parameters to byte offsets for OCF. */
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
	int dir, flags = 0;
	int err;

	/* Map SPDK io type to OCF direction; FLUSH and UNMAP are writes. */
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		dir = OCF_READ;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		dir = OCF_WRITE;
		break;
	default:
		err = -EINVAL;
		goto fail;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		flags = OCF_WRITE_FLUSH;
	}

	io = ocf_volume_new_io(ocf_core_get_front_volume(vbdev->ocf_core), qctx->queue, offset, len, dir, 0,
			       flags);
	if (!io) {
		err = -ENOMEM;
		goto fail;
	}

	/* Wrap the SPDK iovecs so OCF can access the payload.
	 * NOTE(review): on the fail path below 'data' is not explicitly
	 * released - presumably owned/recycled elsewhere; verify against
	 * vbdev_ocf_data_from_spdk_io() in data.c. */
	data = vbdev_ocf_data_from_spdk_io(bdev_io);
	if (!data) {
		err = -ENOMEM;
		goto fail;
	}

	err = ocf_io_set_data(io, data, 0);
	if (err) {
		goto fail;
	}

	/* bdev_io is carried as priv1 and completed in vbdev_ocf_io_submit_cb. */
	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);

	err = io_submit_to_ocf(bdev_io, io);
	if (err) {
		goto fail;
	}

	return;

fail:
	/* Drop the OCF io if it was allocated, then complete the SPDK io
	 * with NOMEM (retryable) or FAILED. */
	if (io) {
		ocf_io_put(io);
	}

	if (err == -ENOMEM) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
	} else {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
654 
655 static void
656 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
657 		     bool success)
658 {
659 	if (!success) {
660 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
661 		return;
662 	}
663 
664 	io_handle(ch, bdev_io);
665 }
666 
667 /* Called from bdev layer when an io to Cache vbdev is submitted */
668 static void
669 vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
670 {
671 	switch (bdev_io->type) {
672 	case SPDK_BDEV_IO_TYPE_READ:
673 		/* User does not have to allocate io vectors for the request,
674 		 * so in case they are not allocated, we allocate them here */
675 		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
676 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
677 		break;
678 	case SPDK_BDEV_IO_TYPE_WRITE:
679 	case SPDK_BDEV_IO_TYPE_FLUSH:
680 	case SPDK_BDEV_IO_TYPE_UNMAP:
681 		io_handle(ch, bdev_io);
682 		break;
683 	case SPDK_BDEV_IO_TYPE_RESET:
684 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
685 	default:
686 		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
687 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
688 		break;
689 	}
690 }
691 
692 /* Called from bdev layer */
693 static bool
694 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
695 {
696 	struct vbdev_ocf *vbdev = opaque;
697 
698 	switch (io_type) {
699 	case SPDK_BDEV_IO_TYPE_READ:
700 	case SPDK_BDEV_IO_TYPE_WRITE:
701 	case SPDK_BDEV_IO_TYPE_FLUSH:
702 	case SPDK_BDEV_IO_TYPE_UNMAP:
703 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
704 	case SPDK_BDEV_IO_TYPE_RESET:
705 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
706 	default:
707 		return false;
708 	}
709 }
710 
/* Hand out a per-thread io channel for this vbdev; channel contexts are
 * created by io_device_create_cb(). */
static struct spdk_io_channel *
vbdev_ocf_get_io_channel(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;

	return spdk_get_io_channel(vbdev);
}
719 
/* Emit runtime information about this vbdev for `bdev_get_bdevs` style
 * queries: base bdev names, cache mode, cache line size and volatility. */
static int
vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = opaque;

	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_device", vbdev->core.name);

	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_get_cache_line_size(vbdev->ocf_cache));
	spdk_json_write_named_bool(w, "metadata_volatile",
				   vbdev->cfg.cache.metadata_volatile);

	return 0;
}
737 
/* Serialize this vbdev as a `bdev_ocf_create` RPC call so that a saved
 * configuration can recreate it on startup. */
static void
vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_ocf_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", vbdev->name);
	/* Mode and line size are read back from the live cache instance. */
	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_get_cache_line_size(vbdev->ocf_cache));
	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
759 
/* Cache vbdev function table
 * Used by bdev layer */
static struct spdk_bdev_fn_table cache_dev_fn_table = {
	.destruct = vbdev_ocf_destruct,
	.io_type_supported = vbdev_ocf_io_type_supported,
	.submit_request	= vbdev_ocf_submit_request,
	.get_io_channel	= vbdev_ocf_get_io_channel,
	.write_config_json = vbdev_ocf_write_json_config,
	.dump_info_json = vbdev_ocf_dump_info_json,
};
770 
771 /* Poller function for the OCF queue
772  * We execute OCF requests here synchronously */
773 static int
774 queue_poll(void *opaque)
775 {
776 	struct vbdev_ocf_qctx *qctx = opaque;
777 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
778 	int i, max = spdk_min(32, iono);
779 
780 	for (i = 0; i < max; i++) {
781 		ocf_queue_run_single(qctx->queue);
782 	}
783 
784 	if (iono > 0) {
785 		return SPDK_POLLER_BUSY;
786 	} else {
787 		return SPDK_POLLER_IDLE;
788 	}
789 }
790 
/* Called during ocf_submit_io, ocf_purge*
 * and any other requests that need to submit io.
 * Intentionally empty: the queue is drained by the queue_poll() poller,
 * so no explicit wakeup is needed. */
static void
vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
{
}
797 
/* OCF queue deinitialization
 * Called at ocf_cache_stop */
static void
vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
{
	struct vbdev_ocf_qctx *qctx = ocf_queue_get_priv(q);

	if (qctx) {
		spdk_put_io_channel(qctx->cache_ch);
		spdk_put_io_channel(qctx->core_ch);
		spdk_poller_unregister(&qctx->poller);
		/* Only the copies made in io_device_destroy_cb() are heap
		 * allocated; channel-embedded contexts are freed by SPDK. */
		if (qctx->allocated) {
			free(qctx);
		}
	}
}
814 
/* Queue ops is an interface for running queue thread
 * stop() operation is called just before queue gets destroyed */
const struct ocf_queue_ops queue_ops = {
	.kick_sync = vbdev_ocf_ctx_queue_kick,
	.kick = vbdev_ocf_ctx_queue_kick,
	.stop = vbdev_ocf_ctx_queue_stop,
};
822 
/* Called on cache vbdev creation at every thread
 * We allocate OCF queues here and SPDK poller for it */
static int
io_device_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_ocf *vbdev = io_device;
	struct vbdev_ocf_qctx *qctx = ctx_buf;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
	if (rc) {
		return rc;
	}

	/* Link the queue back to this context so queue_ops callbacks
	 * (kick/stop) can find their per-thread state. */
	ocf_queue_set_priv(qctx->queue, qctx);

	qctx->vbdev      = vbdev;
	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
	/* Poll with no period: run the queue on every reactor iteration. */
	qctx->poller     = SPDK_POLLER_REGISTER(queue_poll, qctx, 0);

	return rc;
}
846 
/* Called per thread
 * Put OCF queue and relaunch poller with new context to finish pending requests */
static void
io_device_destroy_cb(void *io_device, void *ctx_buf)
{
	/* Making a copy of context to use it after io channel will be destroyed */
	struct vbdev_ocf_qctx *copy = malloc(sizeof(*copy));
	struct vbdev_ocf_qctx *qctx = ctx_buf;

	if (copy) {
		/* Hand queue ownership to the copy BEFORE the channel context
		 * goes away, so queue_ops.stop() sees the live context. */
		ocf_queue_set_priv(qctx->queue, copy);
		memcpy(copy, qctx, sizeof(*copy));
		spdk_poller_unregister(&qctx->poller);
		copy->poller = SPDK_POLLER_REGISTER(queue_poll, copy, 0);
		/* Mark heap-allocated so vbdev_ocf_ctx_queue_stop() frees it. */
		copy->allocated = true;
	} else {
		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
			    spdk_strerror(ENOMEM));
	}

	vbdev_ocf_queue_put(qctx->queue);
}
869 
870 /* OCF management queue deinitialization */
871 static void
872 vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
873 {
874 	struct spdk_poller *poller = ocf_queue_get_priv(q);
875 
876 	if (poller) {
877 		spdk_poller_unregister(&poller);
878 	}
879 }
880 
881 static int
882 mngt_queue_poll(void *opaque)
883 {
884 	ocf_queue_t q = opaque;
885 	uint32_t iono = ocf_queue_pending_io(q);
886 	int i, max = spdk_min(32, iono);
887 
888 	for (i = 0; i < max; i++) {
889 		ocf_queue_run_single(q);
890 	}
891 
892 	if (iono > 0) {
893 		return SPDK_POLLER_BUSY;
894 	} else {
895 		return SPDK_POLLER_IDLE;
896 	}
897 }
898 
/* Management queue kick - intentionally empty: the queue is drained by
 * the mngt_queue_poll() poller, so no explicit wakeup is needed. */
static void
vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
{
}
903 
/* Queue ops is an interface for running queue thread
 * stop() operation is called just before queue gets destroyed */
const struct ocf_queue_ops mngt_queue_ops = {
	.kick_sync = NULL,
	.kick = vbdev_ocf_ctx_mngt_queue_kick,
	.stop = vbdev_ocf_ctx_mngt_queue_stop,
};
911 
912 static void
913 vbdev_ocf_mngt_exit(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_fn *rollback_path, int rc)
914 {
915 	vbdev->state.starting = false;
916 	vbdev_ocf_mngt_stop(vbdev, rollback_path, rc);
917 }
918 
/* Create exported spdk object */
static void
finish_register(struct vbdev_ocf *vbdev)
{
	int result;

	/* Copy properties of the base bdev */
	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;

	vbdev->exp_bdev.name = vbdev->name;
	vbdev->exp_bdev.product_name = "SPDK OCF";

	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
	vbdev->exp_bdev.ctxt = vbdev;
	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
	vbdev->exp_bdev.module = &ocf_if;

	/* Finally register vbdev in SPDK */
	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
				sizeof(struct vbdev_ocf_qctx), vbdev->name);
	result = spdk_bdev_register(&vbdev->exp_bdev);
	if (result) {
		SPDK_ERRLOG("Could not register exposed bdev %s\n",
			    vbdev->name);
		/* Roll back everything done so far via the dirty path. */
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, result);
		return;
	} else {
		vbdev->state.started = true;
	}

	vbdev_ocf_mngt_continue(vbdev, result);
}
953 
/* OCF completion for adding the core: always release the cache lock,
 * then either record the new core handle or start rollback on error. */
static void
add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(cache);

	if (error) {
		SPDK_ERRLOG("Error %d, failed to add core device to cache instance %s,"
			    "starting rollback\n", error, vbdev->name);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
		return;
	} else {
		vbdev->ocf_core = core;
	}

	vbdev_ocf_mngt_continue(vbdev, error);
}
972 
973 /* Try to lock cache, then add core */
974 static void
975 add_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
976 {
977 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
978 
979 	if (error) {
980 		SPDK_ERRLOG("Error %d, can not lock cache instance %s,"
981 			    "starting rollback\n", error, vbdev->name);
982 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
983 	}
984 	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
985 }
986 
987 /* Add core for existing OCF cache instance */
988 static void
989 add_core(struct vbdev_ocf *vbdev)
990 {
991 	ocf_mngt_cache_lock(vbdev->ocf_cache, add_core_cache_lock_cmpl, vbdev);
992 }
993 
/* OCF completion for cache attach/load: release the cache lock, then
 * either continue the register pipeline or start rollback on error. */
static void
start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;
	uint64_t mem_needed;

	ocf_mngt_cache_unlock(cache);

	if (error) {
		SPDK_ERRLOG("Error %d during start cache %s, starting rollback\n",
			    error, vbdev->name);

		/* Give the user actionable sizing info on OOM. */
		if (error == -OCF_ERR_NO_MEM) {
			ocf_mngt_get_ram_needed(cache, &vbdev->cfg.attach.device, &mem_needed);

			SPDK_NOTICELOG("Try to increase hugepage memory size or cache line size. "
				       "For your configuration:\nDevice size: %"PRIu64" bytes\n"
				       "Cache line size: %"PRIu64" bytes\nFree memory needed to start "
				       "cache: %"PRIu64" bytes\n", vbdev->cache.bdev->blockcnt *
				       vbdev->cache.bdev->blocklen, vbdev->cfg.cache.cache_line_size,
				       mem_needed);
		}

		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
		return;
	}

	vbdev_ocf_mngt_continue(vbdev, error);
}
1023 
/* Create the OCF management queue, drive it with a 100us SPDK poller
 * and install it on the cache instance.  Returns 0 or negative errno. */
static int
create_management_queue(struct vbdev_ocf *vbdev)
{
	struct spdk_poller *mngt_poller;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		return rc;
	}

	/* NOTE(review): on this failure path the queue created above is not
	 * put here - presumably released by the caller's rollback; verify. */
	mngt_poller = SPDK_POLLER_REGISTER(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
	if (mngt_poller == NULL) {
		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
		return -ENOMEM;
	}

	/* The poller is stored as queue priv so mngt_queue_ops.stop()
	 * can unregister it. */
	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);

	return 0;
}
1047 
/* Start OCF cache, attach caching device */
static void
start_cache(struct vbdev_ocf *vbdev)
{
	ocf_cache_t existing;
	uint32_t cache_block_size = vbdev->cache.bdev->blocklen;
	uint32_t core_block_size = vbdev->core.bdev->blocklen;
	int rc;

	if (is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_stop(vbdev, NULL, -EALREADY);
		return;
	}

	/* A cache with blocks larger than the core's cannot address the
	 * core correctly - reject the configuration up front. */
	if (cache_block_size > core_block_size) {
		SPDK_ERRLOG("Cache bdev block size (%d) is bigger then core bdev block size (%d)\n",
			    cache_block_size, core_block_size);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -EINVAL);
		return;
	}

	/* Reuse a cache instance already started by another vbdev with the
	 * same cache bdev - just take references and move on. */
	existing = get_other_cache_instance(vbdev);
	if (existing) {
		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
			       vbdev->name, vbdev->cache.name);
		vbdev->ocf_cache = existing;
		ocf_mngt_cache_get(vbdev->ocf_cache);
		vbdev->cache_ctx = ocf_cache_get_priv(existing);
		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	/* First user of this cache bdev: allocate shared context ... */
	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
	if (vbdev->cache_ctx == NULL) {
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -ENOMEM);
		return;
	}

	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);

	/* ... start a fresh OCF cache instance ... */
	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache, NULL);
	if (rc) {
		SPDK_ERRLOG("Could not start cache %s: %d\n", vbdev->name, rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}
	ocf_mngt_cache_get(vbdev->ocf_cache);

	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);

	/* ... set up the management queue ... */
	rc = create_management_queue(vbdev);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}

	/* ... and attach (new metadata) or load (existing metadata) the
	 * caching device; start_cache_cmpl continues the pipeline. */
	if (vbdev->cfg.loadq) {
		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.attach, start_cache_cmpl, vbdev);
	} else {
		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.attach, start_cache_cmpl, vbdev);
	}
}
1113 
/* Procedures called during register operation,
 * executed in order by the management pipeline. */
vbdev_ocf_mngt_fn register_path[] = {
	start_cache,
	add_core,
	finish_register,
	NULL	/* sentinel terminating the pipeline */
};
1121 
1122 /* Start cache instance and register OCF bdev */
1123 static void
1124 register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
1125 {
1126 	int rc;
1127 
1128 	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
1129 		cb(-EPERM, vbdev, cb_arg);
1130 		return;
1131 	}
1132 
1133 	vbdev->state.starting = true;
1134 	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
1135 	if (rc) {
1136 		cb(rc, vbdev, cb_arg);
1137 	}
1138 }
1139 
1140 /* Init OCF configuration options
1141  * for core and cache devices */
1142 static int
1143 init_vbdev_config(struct vbdev_ocf *vbdev)
1144 {
1145 	struct vbdev_ocf_config *cfg = &vbdev->cfg;
1146 	struct ocf_volume_uuid uuid;
1147 	ocf_volume_type_t type;
1148 	int ret;
1149 
1150 
1151 	/* Initialize OCF defaults first */
1152 	ocf_mngt_cache_attach_config_set_default(&cfg->attach);
1153 	ocf_mngt_cache_config_set_default(&cfg->cache);
1154 	ocf_mngt_core_config_set_default(&cfg->core);
1155 
1156 	snprintf(cfg->cache.name, sizeof(cfg->cache.name), "%s", vbdev->name);
1157 	snprintf(cfg->core.name, sizeof(cfg->core.name), "%s", vbdev->core.name);
1158 
1159 	cfg->attach.open_cores = false;
1160 	cfg->attach.device.perform_test = false;
1161 	cfg->attach.discard_on_start = false;
1162 
1163 	vbdev->cfg.cache.locked = true;
1164 
1165 	cfg->core.volume_type = SPDK_OBJECT;
1166 
1167 	if (vbdev->cfg.loadq) {
1168 		/* When doing cache_load(), we need to set try_add to true,
1169 		 * otherwise OCF will interpret this core as new
1170 		 * instead of the inactive one */
1171 		vbdev->cfg.core.try_add = true;
1172 	} else {
1173 		/* When cache is initialized as new, set force flag to true,
1174 		 * to ignore warnings about existing metadata */
1175 		cfg->attach.force = true;
1176 	}
1177 
1178 	/* Serialize bdev names in OCF UUID to interpret on future loads
1179 	 * Core UUID is a triple of (core name, vbdev name, cache name)
1180 	 * Cache UUID is cache bdev name */
1181 	type = ocf_ctx_get_volume_type(vbdev_ocf_ctx, SPDK_OBJECT);
1182 	if (!type) {
1183 		SPDK_ERRLOG("Fail to get volume type\n");
1184 		return -EINVAL;
1185 	}
1186 	uuid.size = strlen(vbdev->cache.name) + 1;
1187 	uuid.data = vbdev->cache.name;
1188 	ret = ocf_volume_create(&cfg->attach.device.volume, type, &uuid);
1189 	if (ret) {
1190 		SPDK_ERRLOG("Fail to create volume\n");
1191 		return -EINVAL;
1192 	}
1193 
1194 	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s %s",
1195 		 vbdev->core.name, vbdev->name, vbdev->cache.name);
1196 	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
1197 	cfg->core.uuid.data = vbdev->uuid;
1198 	vbdev->uuid[strlen(vbdev->core.name)] = 0;
1199 	vbdev->uuid[strlen(vbdev->core.name) + 1 + strlen(vbdev->name)] = 0;
1200 
1201 	return 0;
1202 }
1203 
1204 /* Allocate vbdev structure object and add it to the global list */
1205 static int
1206 init_vbdev(const char *vbdev_name,
1207 	   const char *cache_mode_name,
1208 	   const uint64_t cache_line_size,
1209 	   const char *cache_name,
1210 	   const char *core_name,
1211 	   bool loadq)
1212 {
1213 	struct vbdev_ocf *vbdev;
1214 	int rc = 0;
1215 
1216 	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
1217 		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
1218 		return -EPERM;
1219 	}
1220 
1221 	vbdev = calloc(1, sizeof(*vbdev));
1222 	if (!vbdev) {
1223 		goto error_mem;
1224 	}
1225 
1226 	vbdev->name = strdup(vbdev_name);
1227 	if (!vbdev->name) {
1228 		goto error_mem;
1229 	}
1230 
1231 	vbdev->cache.name = strdup(cache_name);
1232 	if (!vbdev->cache.name) {
1233 		goto error_mem;
1234 	}
1235 
1236 	vbdev->core.name = strdup(core_name);
1237 	if (!vbdev->core.name) {
1238 		goto error_mem;
1239 	}
1240 
1241 	vbdev->cache.parent = vbdev;
1242 	vbdev->core.parent = vbdev;
1243 	vbdev->cache.is_cache = true;
1244 	vbdev->core.is_cache = false;
1245 	vbdev->cfg.loadq = loadq;
1246 
1247 	rc = init_vbdev_config(vbdev);
1248 	if (rc) {
1249 		SPDK_ERRLOG("Fail to init vbdev config\n");
1250 		goto error_free;
1251 	}
1252 
1253 
1254 	if (cache_mode_name) {
1255 		vbdev->cfg.cache.cache_mode
1256 			= ocf_get_cache_mode(cache_mode_name);
1257 	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
1258 		SPDK_ERRLOG("No cache mode specified\n");
1259 		rc = -EINVAL;
1260 		goto error_free;
1261 	}
1262 	if (vbdev->cfg.cache.cache_mode < 0) {
1263 		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
1264 		rc = -EINVAL;
1265 		goto error_free;
1266 	}
1267 
1268 	ocf_cache_line_size_t set_cache_line_size = cache_line_size ?
1269 			(ocf_cache_line_size_t)cache_line_size * KiB :
1270 			ocf_cache_line_size_default;
1271 	if (set_cache_line_size == 0) {
1272 		SPDK_ERRLOG("Cache line size should be non-zero.\n");
1273 		rc = -EINVAL;
1274 		goto error_free;
1275 	}
1276 	vbdev->cfg.attach.cache_line_size = set_cache_line_size;
1277 	vbdev->cfg.cache.cache_line_size = set_cache_line_size;
1278 
1279 	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
1280 	return rc;
1281 
1282 error_mem:
1283 	rc = -ENOMEM;
1284 error_free:
1285 	free_vbdev(vbdev);
1286 	return rc;
1287 }
1288 
/* Read configuration file at the start of SPDK application
 * This adds vbdevs to global list if some mentioned in config.
 * Initializes the OCF context first, then the volume subsystem;
 * on volume failure the context is torn back down. */
static int
vbdev_ocf_init(void)
{
	int rc = vbdev_ocf_ctx_init();

	if (rc != 0) {
		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", rc);
		return rc;
	}

	rc = vbdev_ocf_volume_init();
	if (rc != 0) {
		vbdev_ocf_ctx_cleanup();
		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", rc);
	}

	return rc;
}
1311 
1312 /* Called after application shutdown started
1313  * Release memory of allocated structures here */
1314 static void
1315 vbdev_ocf_module_fini(void)
1316 {
1317 	struct vbdev_ocf *vbdev;
1318 
1319 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1320 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1321 		free_vbdev(vbdev);
1322 	}
1323 
1324 	vbdev_ocf_volume_cleanup();
1325 	vbdev_ocf_ctx_cleanup();
1326 }
1327 
1328 /* When base device gets unplugged this is called
1329  * We will unregister cache vbdev here
1330  * When cache device is removed, we delete every OCF bdev that used it */
1331 static void
1332 hotremove_cb(struct vbdev_ocf_base *base)
1333 {
1334 	struct vbdev_ocf *vbdev;
1335 
1336 	if (!base->is_cache) {
1337 		if (base->parent->state.doing_finish) {
1338 			return;
1339 		}
1340 
1341 		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
1342 			       base->parent->name, base->name);
1343 		vbdev_ocf_delete(base->parent, NULL, NULL);
1344 		return;
1345 	}
1346 
1347 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1348 		if (vbdev->state.doing_finish) {
1349 			continue;
1350 		}
1351 		if (strcmp(base->name, vbdev->cache.name) == 0) {
1352 			SPDK_NOTICELOG("Deinitializing '%s' because"
1353 				       " its cache device '%s' was removed\n",
1354 				       vbdev->name, base->name);
1355 			vbdev_ocf_delete(vbdev, NULL, NULL);
1356 		}
1357 	}
1358 }
1359 
1360 static void
1361 base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1362 		   void *event_ctx)
1363 {
1364 	switch (type) {
1365 	case SPDK_BDEV_EVENT_REMOVE:
1366 		if (event_ctx) {
1367 			hotremove_cb(event_ctx);
1368 		}
1369 		break;
1370 	default:
1371 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1372 		break;
1373 	}
1374 }
1375 
1376 /* Open base SPDK bdev and claim it */
1377 static int
1378 attach_base(struct vbdev_ocf_base *base)
1379 {
1380 	int status;
1381 
1382 	if (base->attached) {
1383 		return -EALREADY;
1384 	}
1385 
1386 	/* If base cache bdev was already opened by other vbdev,
1387 	 * we just copy its descriptor here */
1388 	if (base->is_cache) {
1389 		struct vbdev_ocf_base *existing = get_other_cache_base(base);
1390 		if (existing) {
1391 			base->desc = existing->desc;
1392 			base->management_channel = existing->management_channel;
1393 			base->attached = true;
1394 			return 0;
1395 		}
1396 	}
1397 
1398 	status = spdk_bdev_open_ext(base->name, true, base_bdev_event_cb, base, &base->desc);
1399 	if (status) {
1400 		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
1401 		return status;
1402 	}
1403 
1404 	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
1405 					     &ocf_if);
1406 	if (status) {
1407 		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
1408 		spdk_bdev_close(base->desc);
1409 		return status;
1410 	}
1411 
1412 	base->management_channel = spdk_bdev_get_io_channel(base->desc);
1413 	if (!base->management_channel) {
1414 		SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
1415 		spdk_bdev_module_release_bdev(base->bdev);
1416 		spdk_bdev_close(base->desc);
1417 		return -ENOMEM;
1418 	}
1419 
1420 	/* Save the thread where the base device is opened */
1421 	base->thread = spdk_get_thread();
1422 
1423 	base->attached = true;
1424 	return status;
1425 }
1426 
1427 /* Attach base bdevs */
1428 static int
1429 attach_base_bdevs(struct vbdev_ocf *vbdev,
1430 		  struct spdk_bdev *cache_bdev,
1431 		  struct spdk_bdev *core_bdev)
1432 {
1433 	int rc = 0;
1434 
1435 	if (cache_bdev) {
1436 		vbdev->cache.bdev = cache_bdev;
1437 		rc |= attach_base(&vbdev->cache);
1438 	}
1439 
1440 	if (core_bdev) {
1441 		vbdev->core.bdev = core_bdev;
1442 		rc |= attach_base(&vbdev->core);
1443 	}
1444 
1445 	return rc;
1446 }
1447 
1448 /* Init and then start vbdev if all base devices are present */
1449 void
1450 vbdev_ocf_construct(const char *vbdev_name,
1451 		    const char *cache_mode_name,
1452 		    const uint64_t cache_line_size,
1453 		    const char *cache_name,
1454 		    const char *core_name,
1455 		    bool loadq,
1456 		    void (*cb)(int, struct vbdev_ocf *, void *),
1457 		    void *cb_arg)
1458 {
1459 	int rc;
1460 	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
1461 	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
1462 	struct vbdev_ocf *vbdev;
1463 
1464 	rc = init_vbdev(vbdev_name, cache_mode_name, cache_line_size, cache_name, core_name, loadq);
1465 	if (rc) {
1466 		cb(rc, NULL, cb_arg);
1467 		return;
1468 	}
1469 
1470 	vbdev = vbdev_ocf_get_by_name(vbdev_name);
1471 	if (vbdev == NULL) {
1472 		cb(-ENODEV, NULL, cb_arg);
1473 		return;
1474 	}
1475 
1476 	if (cache_bdev == NULL) {
1477 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
1478 			       vbdev->name, cache_name);
1479 	}
1480 	if (core_bdev == NULL) {
1481 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
1482 			       vbdev->name, core_name);
1483 	}
1484 
1485 	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
1486 	if (rc) {
1487 		cb(rc, vbdev, cb_arg);
1488 		return;
1489 	}
1490 
1491 	if (core_bdev && cache_bdev) {
1492 		register_vbdev(vbdev, cb, cb_arg);
1493 	} else {
1494 		cb(0, vbdev, cb_arg);
1495 	}
1496 }
1497 
1498 /* Set new cache mode on OCF cache */
1499 void
1500 vbdev_ocf_set_cache_mode(struct vbdev_ocf *vbdev,
1501 			 const char *cache_mode_name,
1502 			 void (*cb)(int, struct vbdev_ocf *, void *),
1503 			 void *cb_arg)
1504 {
1505 	ocf_cache_t cache;
1506 	ocf_cache_mode_t cache_mode;
1507 	int rc;
1508 
1509 	cache = vbdev->ocf_cache;
1510 	cache_mode = ocf_get_cache_mode(cache_mode_name);
1511 
1512 	rc = ocf_mngt_cache_trylock(cache);
1513 	if (rc) {
1514 		cb(rc, vbdev, cb_arg);
1515 		return;
1516 	}
1517 
1518 	rc = ocf_mngt_cache_set_mode(cache, cache_mode);
1519 	ocf_mngt_cache_unlock(cache);
1520 	cb(rc, vbdev, cb_arg);
1521 }
1522 
1523 /* Set sequential cutoff parameters on OCF cache */
1524 void
1525 vbdev_ocf_set_seqcutoff(struct vbdev_ocf *vbdev, const char *policy_name, uint32_t threshold,
1526 			uint32_t promotion_count, void (*cb)(int, void *), void *cb_arg)
1527 {
1528 	ocf_cache_t cache;
1529 	ocf_seq_cutoff_policy policy;
1530 	int rc;
1531 
1532 	cache = vbdev->ocf_cache;
1533 
1534 	policy = ocf_get_seqcutoff_policy(policy_name);
1535 	if (policy == ocf_seq_cutoff_policy_max) {
1536 		cb(OCF_ERR_INVAL, cb_arg);
1537 		return;
1538 	}
1539 
1540 	rc = ocf_mngt_cache_trylock(cache);
1541 	if (rc) {
1542 		cb(rc, cb_arg);
1543 		return;
1544 	}
1545 
1546 	rc = ocf_mngt_core_set_seq_cutoff_policy_all(cache, policy);
1547 	if (rc) {
1548 		goto end;
1549 	}
1550 
1551 	if (threshold) {
1552 		threshold = threshold * KiB;
1553 
1554 		rc = ocf_mngt_core_set_seq_cutoff_threshold_all(cache, threshold);
1555 		if (rc) {
1556 			goto end;
1557 		}
1558 	}
1559 
1560 	if (promotion_count) {
1561 		rc = ocf_mngt_core_set_seq_cutoff_promotion_count_all(cache, promotion_count);
1562 	}
1563 
1564 end:
1565 	ocf_mngt_cache_unlock(cache);
1566 	cb(rc, cb_arg);
1567 }
1568 
1569 /* This called if new device is created in SPDK application
1570  * If that device named as one of base bdevs of OCF vbdev,
1571  * claim and open them */
1572 static void
1573 vbdev_ocf_examine(struct spdk_bdev *bdev)
1574 {
1575 	const char *bdev_name = spdk_bdev_get_name(bdev);
1576 	struct vbdev_ocf *vbdev;
1577 
1578 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1579 		if (vbdev->state.doing_finish) {
1580 			continue;
1581 		}
1582 
1583 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1584 			attach_base_bdevs(vbdev, bdev, NULL);
1585 			continue;
1586 		}
1587 		if (!strcmp(bdev_name, vbdev->core.name)) {
1588 			attach_base_bdevs(vbdev, NULL, bdev);
1589 			break;
1590 		}
1591 	}
1592 	spdk_bdev_module_examine_done(&ocf_if);
1593 }
1594 
/* Context for an asynchronous OCF metadata probe during examine_disk.
 * Released by examine_ctx_put() when refcnt drops to zero. */
struct metadata_probe_ctx {
	struct vbdev_ocf_base base;	/* temporary base used for bottom-adapter IOs */
	ocf_volume_t volume;		/* temporary OCF volume used for the probe */

	struct ocf_volume_uuid *core_uuids;	/* UUIDs read from cache metadata; each .data is freed individually */
	unsigned int uuid_count;

	int result;	/* first error encountered, 0 on success */
	int refcnt;	/* outstanding async users of this context */
};
1605 
/* Thread-message callback: close a bdev descriptor on the thread that opened it */
static void
_examine_ctx_put(void *ctx)
{
	spdk_bdev_close((struct spdk_bdev_desc *)ctx);
}
1613 
1614 static void
1615 examine_ctx_put(struct metadata_probe_ctx *ctx)
1616 {
1617 	unsigned int i;
1618 
1619 	ctx->refcnt--;
1620 	if (ctx->refcnt > 0) {
1621 		return;
1622 	}
1623 
1624 	if (ctx->result) {
1625 		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
1626 			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
1627 	}
1628 
1629 	if (ctx->base.desc) {
1630 		/* Close the underlying bdev on its same opened thread. */
1631 		if (ctx->base.thread && ctx->base.thread != spdk_get_thread()) {
1632 			spdk_thread_send_msg(ctx->base.thread, _examine_ctx_put, ctx->base.desc);
1633 		} else {
1634 			spdk_bdev_close(ctx->base.desc);
1635 		}
1636 	}
1637 
1638 	if (ctx->volume) {
1639 		ocf_volume_destroy(ctx->volume);
1640 	}
1641 
1642 	if (ctx->core_uuids) {
1643 		for (i = 0; i < ctx->uuid_count; i++) {
1644 			free(ctx->core_uuids[i].data);
1645 		}
1646 	}
1647 	free(ctx->core_uuids);
1648 
1649 	examine_done(ctx->result, NULL, ctx->base.bdev);
1650 	free(ctx);
1651 }
1652 
1653 static void
1654 metadata_probe_cb(void *priv, int rc,
1655 		  struct ocf_metadata_probe_status *status)
1656 {
1657 	struct metadata_probe_ctx *ctx = priv;
1658 
1659 	if (rc) {
1660 		/* -ENODATA means device does not have cache metadata on it */
1661 		if (rc != -OCF_ERR_NO_METADATA) {
1662 			ctx->result = rc;
1663 		}
1664 	}
1665 
1666 	examine_ctx_put(ctx);
1667 }
1668 
/* This is called after vbdev_ocf_examine
 * It allows to delay application initialization
 * until all OCF bdevs get registered
 * If vbdev has all of its base devices it starts asynchronously here
 * We first check if bdev appears in configuration,
 * if not we do metadata_probe() to create its configuration from bdev metadata */
static void
vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
{
	const char *bdev_name = spdk_bdev_get_name(bdev);
	struct vbdev_ocf *vbdev;
	struct metadata_probe_ctx *ctx;
	bool created_from_config = false;
	int rc;

	/* Every examine_start() must be balanced by an examine_done();
	 * the final examine_done() for this bdev reports completion to the
	 * bdev layer (see examine_done() at the top of the file) */
	examine_start(bdev);

	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
		if (vbdev->state.doing_finish || vbdev->state.started) {
			continue;
		}

		if (!strcmp(bdev_name, vbdev->cache.name)) {
			/* Extra reference for the asynchronous registration;
			 * released when register_vbdev() invokes examine_done.
			 * Same bdev may serve more caches - keep scanning */
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			created_from_config = true;
			continue;
		}
		if (!strcmp(bdev_name, vbdev->core.name)) {
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			/* A core bdev matches at most one vbdev -
			 * drop the initial reference and stop */
			examine_done(0, NULL, bdev);
			return;
		}
	}

	/* If the device was discovered via config we do not probe for metadata */
	if (created_from_config) {
		examine_done(0, NULL, bdev);
		return;
	}

	/* Metadata probe path
	 * We create temporary OCF volume and a temporary base structure
	 * to use them for ocf_metadata_probe() and for bottom adapter IOs
	 * Then we get UUIDs of core devices and create configurations based on them */
	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		examine_done(-ENOMEM, NULL, bdev);
		return;
	}

	ctx->base.bdev = bdev;
	ctx->refcnt = 1;

	/* event_ctx is NULL: this temporary descriptor has no hotremove handling */
	rc = spdk_bdev_open_ext(bdev_name, true, base_bdev_event_cb, NULL, &ctx->base.desc);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_volume_open(ctx->volume, &ctx->base);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	/* Save the thread where the base device is opened */
	ctx->base.thread = spdk_get_thread();

	/* Completion is reported via metadata_probe_cb, which drops the last ref */
	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
}
1750 
/* Size of the per-IO context the bdev layer allocates for this module
 * (one struct bdev_ocf_data per spdk_bdev_io) */
static int
vbdev_ocf_get_ctx_size(void)
{
	return sizeof(struct bdev_ocf_data);
}
1756 
/* Called by the bdev layer when application shutdown begins;
 * only raises the global flag so other paths can detect that
 * finalization has started */
static void
fini_start(void)
{
	g_fini_started = true;
}
1762 
/* Module-global function table
 * Does not relate to vbdev instances */
static struct spdk_bdev_module ocf_if = {
	.name = "ocf",
	.module_init = vbdev_ocf_init,			/* OCF ctx + volume subsystem init */
	.fini_start = fini_start,			/* marks that shutdown has started */
	.module_fini = vbdev_ocf_module_fini,		/* frees remaining vbdev structs */
	.get_ctx_size = vbdev_ocf_get_ctx_size,		/* per-IO context size */
	.examine_config = vbdev_ocf_examine,		/* claim base bdevs named in config */
	.examine_disk   = vbdev_ocf_examine_disk,	/* start vbdevs / probe OCF metadata */
};
SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);
1775