xref: /spdk/module/bdev/ocf/vbdev_ocf.c (revision 60982c759db49b4f4579f16e3b24df0725ba4b94)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <ocf/ocf.h>
7 #include <ocf/ocf_types.h>
8 #include <ocf/ocf_mngt.h>
9 
10 #include "ctx.h"
11 #include "data.h"
12 #include "volume.h"
13 #include "utils.h"
14 #include "vbdev_ocf.h"
15 
16 #include "spdk/bdev_module.h"
17 #include "spdk/thread.h"
18 #include "spdk/string.h"
19 #include "spdk/log.h"
20 #include "spdk/cpuset.h"
21 
22 /* This namespace UUID was generated using uuid_generate() method. */
23 #define BDEV_OCF_NAMESPACE_UUID "f92b7f49-f6c0-44c8-bd23-3205e8c3b6ad"
24 
/* Module interface descriptor; presumably filled in later in this file — confirm. */
static struct spdk_bdev_module ocf_if;

/* List of all OCF vbdevs, including ones being constructed or torn down. */
static TAILQ_HEAD(, vbdev_ocf) g_ocf_vbdev_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_vbdev_head);

/* Base bdevs claimed during examine; see examine_start()/examine_done(). */
static TAILQ_HEAD(, examining_bdev) g_ocf_examining_bdevs_head
	= TAILQ_HEAD_INITIALIZER(g_ocf_examining_bdevs_head);

/* Set once module shutdown has begun; checked in stop_vbdev(). */
bool g_fini_started = false;
34 
/* Structure for keeping list of bdevs that are claimed but not used yet */
struct examining_bdev {
	struct spdk_bdev           *bdev;   /* claimed base bdev under examine */
	TAILQ_ENTRY(examining_bdev) tailq;  /* link in g_ocf_examining_bdevs_head */
};
40 
41 /* Add bdev to list of claimed */
42 static void
43 examine_start(struct spdk_bdev *bdev)
44 {
45 	struct examining_bdev *entry = malloc(sizeof(*entry));
46 
47 	assert(entry);
48 	entry->bdev = bdev;
49 	TAILQ_INSERT_TAIL(&g_ocf_examining_bdevs_head, entry, tailq);
50 }
51 
/* Find bdev on list of claimed bdevs, then remove it,
 * if it was the last one on list then report examine done */
static void
examine_done(int status, struct vbdev_ocf *vbdev, void *cb_arg)
{
	struct spdk_bdev *bdev = cb_arg;
	struct examining_bdev *entry, *safe, *found = NULL;

	/* Scan the whole list: if this bdev appears more than once, other
	 * examines are still outstanding, so only drop one reference and
	 * skip the module-level examine-done notification. */
	TAILQ_FOREACH_SAFE(entry, &g_ocf_examining_bdevs_head, tailq, safe) {
		if (entry->bdev == bdev) {
			if (found) {
				/* Second occurrence found - remove only the first. */
				goto remove;
			} else {
				found = entry;
			}
		}
	}

	/* Exactly one entry remained for this bdev - examine fully done. */
	assert(found);
	spdk_bdev_module_examine_done(&ocf_if);

remove:
	TAILQ_REMOVE(&g_ocf_examining_bdevs_head, found, tailq);
	free(found);
}
77 
78 /* Free allocated strings and structure itself
79  * Used at shutdown only */
80 static void
81 free_vbdev(struct vbdev_ocf *vbdev)
82 {
83 	if (!vbdev) {
84 		return;
85 	}
86 
87 	free(vbdev->name);
88 	free(vbdev->cache.name);
89 	free(vbdev->core.name);
90 	free(vbdev);
91 }
92 
93 /* Get existing cache base
94  * that is attached to other vbdev */
95 static struct vbdev_ocf_base *
96 get_other_cache_base(struct vbdev_ocf_base *base)
97 {
98 	struct vbdev_ocf *vbdev;
99 
100 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
101 		if (&vbdev->cache == base || !vbdev->cache.attached) {
102 			continue;
103 		}
104 		if (!strcmp(vbdev->cache.name, base->name)) {
105 			return &vbdev->cache;
106 		}
107 	}
108 
109 	return NULL;
110 }
111 
112 static bool
113 is_ocf_cache_running(struct vbdev_ocf *vbdev)
114 {
115 	if (vbdev->cache.attached && vbdev->ocf_cache) {
116 		return ocf_cache_is_running(vbdev->ocf_cache);
117 	}
118 	return false;
119 }
120 
121 static bool
122 is_ocf_cache_initializing(struct vbdev_ocf *vbdev)
123 {
124 	if (vbdev->cache.attached && vbdev->ocf_cache) {
125 		return ocf_cache_is_initializing(vbdev->ocf_cache);
126 	}
127 	return false;
128 }
129 
130 /* Get existing OCF cache instance
131  * that is started by other vbdev */
132 static ocf_cache_t
133 get_other_cache_instance(struct vbdev_ocf *vbdev)
134 {
135 	struct vbdev_ocf *cmp;
136 
137 	TAILQ_FOREACH(cmp, &g_ocf_vbdev_head, tailq) {
138 		if (cmp->state.doing_finish || cmp == vbdev) {
139 			continue;
140 		}
141 		if (strcmp(cmp->cache.name, vbdev->cache.name)) {
142 			continue;
143 		}
144 		if (is_ocf_cache_running(cmp) || is_ocf_cache_initializing(cmp)) {
145 			return cmp->ocf_cache;
146 		}
147 	}
148 
149 	return NULL;
150 }
151 
/* Thread-message callback: close the base bdev descriptor on the thread
 * that originally opened it (see remove_base_bdev()). */
static void
_remove_base_bdev(void *ctx)
{
	struct spdk_bdev_desc *desc = ctx;

	spdk_bdev_close(desc);
}
159 
/* Close and unclaim base bdev */
static void
remove_base_bdev(struct vbdev_ocf_base *base)
{
	if (base->attached) {
		/* Drop the management channel reference first, if one was taken. */
		if (base->management_channel) {
			spdk_put_io_channel(base->management_channel);
		}

		/* Release the module claim before closing the descriptor. */
		spdk_bdev_module_release_bdev(base->bdev);
		/* Close the underlying bdev on its same opened thread. */
		if (base->thread && base->thread != spdk_get_thread()) {
			spdk_thread_send_msg(base->thread, _remove_base_bdev, base->desc);
		} else {
			spdk_bdev_close(base->desc);
		}
		base->attached = false;
	}
}
179 
/* Finish unregister operation: report destruct completion to the bdev
 * layer, drop OCF cache and cache-context references, then advance the
 * management pipeline. */
static void
unregister_finish(struct vbdev_ocf *vbdev)
{
	/* stop_status was captured earlier in detach_cache(). */
	spdk_bdev_destruct_done(&vbdev->exp_bdev, vbdev->state.stop_status);

	if (vbdev->ocf_cache) {
		ocf_mngt_cache_put(vbdev->ocf_cache);
	}

	if (vbdev->cache_ctx) {
		vbdev_ocf_cache_ctx_put(vbdev->cache_ctx);
	}
	vbdev_ocf_mngt_continue(vbdev, 0);
}
195 
/* Management pipeline step: close and unclaim the core base bdev. */
static void
close_core_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->core);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
202 
/* Completion of ocf_mngt_cache_remove_core(): release the cache lock taken
 * in detach_core() and advance the pipeline with the result. */
static void
remove_core_cmpl(void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(vbdev->ocf_cache);
	vbdev_ocf_mngt_continue(vbdev, error);
}
211 
212 /* Try to lock cache, then remove core */
213 static void
214 remove_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
215 {
216 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
217 
218 	if (error) {
219 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
220 			    error, vbdev->name);
221 		vbdev_ocf_mngt_continue(vbdev, error);
222 		return;
223 	}
224 
225 	ocf_mngt_cache_remove_core(vbdev->ocf_core, remove_core_cmpl, vbdev);
226 }
227 
228 /* Detach core base */
229 static void
230 detach_core(struct vbdev_ocf *vbdev)
231 {
232 	if (is_ocf_cache_running(vbdev)) {
233 		ocf_mngt_cache_lock(vbdev->ocf_cache, remove_core_cache_lock_cmpl, vbdev);
234 	} else {
235 		vbdev_ocf_mngt_continue(vbdev, 0);
236 	}
237 }
238 
/* Management pipeline step: close and unclaim the cache base bdev. */
static void
close_cache_bdev(struct vbdev_ocf *vbdev)
{
	remove_base_bdev(&vbdev->cache);
	vbdev_ocf_mngt_continue(vbdev, 0);
}
245 
/* Detach cache base */
static void
detach_cache(struct vbdev_ocf *vbdev)
{
	/* Remember the pipeline status here; unregister_finish() reports it. */
	vbdev->state.stop_status = vbdev->mngt_ctx.status;

	/* If some other vbdev references this cache bdev,
	 * we detach this only by changing the flag, without actual close */
	if (get_other_cache_base(&vbdev->cache)) {
		vbdev->cache.attached = false;
	}

	vbdev_ocf_mngt_continue(vbdev, 0);
}
260 
/* Completion of ocf_mngt_cache_stop(): drop the management queue reference,
 * release the cache lock taken in stop_vbdev(), and advance the pipeline. */
static void
stop_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	vbdev_ocf_queue_put(vbdev->cache_ctx->mngt_queue);
	ocf_mngt_cache_unlock(cache);

	vbdev_ocf_mngt_continue(vbdev, error);
}
271 
272 /* Try to lock cache, then stop it */
273 static void
274 stop_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
275 {
276 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
277 
278 	if (error) {
279 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
280 			    error, vbdev->name);
281 		vbdev_ocf_mngt_continue(vbdev, error);
282 		return;
283 	}
284 
285 	ocf_mngt_cache_stop(vbdev->ocf_cache, stop_vbdev_cmpl, vbdev);
286 }
287 
/* Stop OCF cache object
 * vbdev_ocf is not operational after this */
static void
stop_vbdev(struct vbdev_ocf *vbdev)
{
	/* Nothing to stop if the cache never reached the running state. */
	if (!is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	/* During normal operation a shared cache instance must outlive this
	 * vbdev; at module shutdown (g_fini_started) it is stopped anyway. */
	if (!g_fini_started && get_other_cache_instance(vbdev)) {
		SPDK_NOTICELOG("Not stopping cache instance '%s'"
			       " because it is referenced by other OCF bdev\n",
			       vbdev->cache.name);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	ocf_mngt_cache_lock(vbdev->ocf_cache, stop_vbdev_cache_lock_cmpl, vbdev);
}
308 
/* Completion of ocf_mngt_cache_flush(): release the cache lock taken in
 * flush_vbdev() and advance the pipeline with the flush result. */
static void
flush_vbdev_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(cache);
	vbdev_ocf_mngt_continue(vbdev, error);
}
317 
318 static void
319 flush_vbdev_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
320 {
321 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
322 
323 	if (error) {
324 		SPDK_ERRLOG("Error %d, can not lock cache instance %s\n",
325 			    error, vbdev->name);
326 		vbdev_ocf_mngt_continue(vbdev, error);
327 		return;
328 	}
329 
330 	ocf_mngt_cache_flush(vbdev->ocf_cache, flush_vbdev_cmpl, vbdev);
331 }
332 
333 static void
334 flush_vbdev(struct vbdev_ocf *vbdev)
335 {
336 	if (!is_ocf_cache_running(vbdev)) {
337 		vbdev_ocf_mngt_continue(vbdev, -EINVAL);
338 		return;
339 	}
340 
341 	ocf_mngt_cache_lock(vbdev->ocf_cache, flush_vbdev_cache_lock_cmpl, vbdev);
342 }
343 
/* Procedures called during dirty unregister.
 * Cache is stopped before the core is removed, which preserves the core in
 * OCF metadata and allows recovering this instance later (see
 * _vbdev_ocf_destruct_dirty()). */
vbdev_ocf_mngt_fn unregister_path_dirty[] = {
	flush_vbdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	detach_core,
	close_core_bdev,
	unregister_finish,
	NULL /* pipeline terminator */
};
355 
/* Procedures called during clean unregister.
 * Core is removed from the still-running cache first, so the instance is
 * deleted permanently (see _vbdev_ocf_destruct_clean()). */
vbdev_ocf_mngt_fn unregister_path_clean[] = {
	flush_vbdev,
	detach_core,
	close_core_bdev,
	stop_vbdev,
	detach_cache,
	close_cache_bdev,
	unregister_finish,
	NULL /* pipeline terminator */
};
367 
368 /* Start asynchronous management operation using unregister_path */
369 static void
370 unregister_cb(void *opaque)
371 {
372 	struct vbdev_ocf *vbdev = opaque;
373 	vbdev_ocf_mngt_fn *unregister_path;
374 	int rc;
375 
376 	unregister_path = vbdev->state.doing_clean_delete ?
377 			  unregister_path_clean : unregister_path_dirty;
378 
379 	rc = vbdev_ocf_mngt_start(vbdev, unregister_path, NULL, NULL);
380 	if (rc) {
381 		SPDK_ERRLOG("Unable to unregister OCF bdev: %d\n", rc);
382 		spdk_bdev_destruct_done(&vbdev->exp_bdev, rc);
383 	}
384 }
385 
/* Clean remove case - remove core and then cache, this order
 * will remove instance permanently.
 * Used only when the vbdev never reached the started state, so the
 * pipeline steps are invoked directly rather than via mngt_start. */
static void
_vbdev_ocf_destruct_clean(struct vbdev_ocf *vbdev)
{
	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}

	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}
}
401 
/* Dirty shutdown/hot remove case - remove cache and then core, this order
 * will allow us to recover this instance in the future.
 * Used only when the vbdev never reached the started state, so the
 * pipeline steps are invoked directly rather than via mngt_start. */
static void
_vbdev_ocf_destruct_dirty(struct vbdev_ocf *vbdev)
{
	if (vbdev->cache.attached) {
		detach_cache(vbdev);
		close_cache_bdev(vbdev);
	}

	if (vbdev->core.attached) {
		detach_core(vbdev);
		close_core_bdev(vbdev);
	}
}
417 
/* Unregister io device with callback to unregister_cb
 * This function is called during spdk_bdev_unregister.
 * Returns 0 when destruction completed synchronously, 1 when it continues
 * asynchronously via unregister_cb, or a negative errno. */
static int
vbdev_ocf_destruct(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;

	if (vbdev->state.doing_finish) {
		return -EALREADY;
	}

	if (vbdev->state.starting && !vbdev->state.started) {
		/* Prevent before detach cache/core during register path of
		  this bdev */
		return -EBUSY;
	}

	vbdev->state.doing_finish = true;

	if (vbdev->state.started) {
		/* Asynchronous teardown: unregister_cb runs the full
		 * management pipeline once the io_device is gone. */
		spdk_io_device_unregister(vbdev, unregister_cb);
		/* Return 1 because unregister is delayed */
		return 1;
	}

	/* Never started - tear down the bases synchronously. */
	if (vbdev->state.doing_clean_delete) {
		_vbdev_ocf_destruct_clean(vbdev);
	} else {
		_vbdev_ocf_destruct_dirty(vbdev);
	}

	return 0;
}
451 
452 /* Stop OCF cache and unregister SPDK bdev */
453 int
454 vbdev_ocf_delete(struct vbdev_ocf *vbdev, void (*cb)(void *, int), void *cb_arg)
455 {
456 	int rc = 0;
457 
458 	if (vbdev->state.started) {
459 		spdk_bdev_unregister(&vbdev->exp_bdev, cb, cb_arg);
460 	} else {
461 		rc = vbdev_ocf_destruct(vbdev);
462 		if (rc == 0 && cb) {
463 			cb(cb_arg, 0);
464 		}
465 	}
466 
467 	return rc;
468 }
469 
/* Remove cores permanently and then stop OCF cache and unregister SPDK bdev.
 * Same contract as vbdev_ocf_delete(), but marks the delete as "clean" so
 * the clean unregister path (core removed before cache stop) is used. */
int
vbdev_ocf_delete_clean(struct vbdev_ocf *vbdev, void (*cb)(void *, int),
		       void *cb_arg)
{
	vbdev->state.doing_clean_delete = true;

	return vbdev_ocf_delete(vbdev, cb, cb_arg);
}
479 
480 
481 /* If vbdev is online, return its object */
482 struct vbdev_ocf *
483 vbdev_ocf_get_by_name(const char *name)
484 {
485 	struct vbdev_ocf *vbdev;
486 
487 	if (name == NULL) {
488 		assert(false);
489 		return NULL;
490 	}
491 
492 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
493 		if (vbdev->name == NULL || vbdev->state.doing_finish) {
494 			continue;
495 		}
496 		if (strcmp(vbdev->name, name) == 0) {
497 			return vbdev;
498 		}
499 	}
500 	return NULL;
501 }
502 
503 /* Return matching base if parent vbdev is online */
504 struct vbdev_ocf_base *
505 vbdev_ocf_get_base_by_name(const char *name)
506 {
507 	struct vbdev_ocf *vbdev;
508 
509 	if (name == NULL) {
510 		assert(false);
511 		return NULL;
512 	}
513 
514 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
515 		if (vbdev->state.doing_finish) {
516 			continue;
517 		}
518 
519 		if (vbdev->cache.name && strcmp(vbdev->cache.name, name) == 0) {
520 			return &vbdev->cache;
521 		}
522 		if (vbdev->core.name && strcmp(vbdev->core.name, name) == 0) {
523 			return &vbdev->core;
524 		}
525 	}
526 	return NULL;
527 }
528 
529 /* Execute fn for each OCF device that is online or waits for base devices */
530 void
531 vbdev_ocf_foreach(vbdev_ocf_foreach_fn fn, void *ctx)
532 {
533 	struct vbdev_ocf *vbdev;
534 
535 	assert(fn != NULL);
536 
537 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
538 		if (!vbdev->state.doing_finish) {
539 			fn(vbdev, ctx);
540 		}
541 	}
542 }
543 
544 /* Called from OCF when SPDK_IO is completed */
545 static void
546 vbdev_ocf_io_submit_cb(struct ocf_io *io, int error)
547 {
548 	struct spdk_bdev_io *bdev_io = io->priv1;
549 
550 	if (error == 0) {
551 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
552 	} else if (error == -OCF_ERR_NO_MEM) {
553 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
554 	} else {
555 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
556 	}
557 
558 	ocf_io_put(io);
559 }
560 
561 /* Configure io parameters and send it to OCF */
562 static int
563 io_submit_to_ocf(struct spdk_bdev_io *bdev_io, struct ocf_io *io)
564 {
565 	switch (bdev_io->type) {
566 	case SPDK_BDEV_IO_TYPE_WRITE:
567 	case SPDK_BDEV_IO_TYPE_READ:
568 		ocf_core_submit_io(io);
569 		return 0;
570 	case SPDK_BDEV_IO_TYPE_FLUSH:
571 		ocf_core_submit_flush(io);
572 		return 0;
573 	case SPDK_BDEV_IO_TYPE_UNMAP:
574 		ocf_core_submit_discard(io);
575 		return 0;
576 	case SPDK_BDEV_IO_TYPE_RESET:
577 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
578 	default:
579 		SPDK_ERRLOG("Unsupported IO type: %d\n", bdev_io->type);
580 		return -EINVAL;
581 	}
582 }
583 
/* Submit SPDK-IO to OCF: translate the bdev_io into an OCF io object on
 * this channel's queue, attach the data buffers and completion, and hand
 * it to OCF. On any failure the bdev_io is completed with NOMEM or FAILED. */
static void
io_handle(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct vbdev_ocf *vbdev = bdev_io->bdev->ctxt;
	struct ocf_io *io = NULL;
	struct bdev_ocf_data *data = NULL;
	struct vbdev_ocf_qctx *qctx = spdk_io_channel_get_ctx(ch);
	/* Convert block-based request to the byte range OCF expects. */
	uint64_t len = bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen;
	uint64_t offset = bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen;
	int dir, flags = 0;
	int err;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		dir = OCF_READ;
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		/* Flush and unmap are submitted to OCF as write-direction ios. */
		dir = OCF_WRITE;
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		dir = OCF_WRITE;
		break;
	default:
		err = -EINVAL;
		goto fail;
	}

	if (bdev_io->type == SPDK_BDEV_IO_TYPE_FLUSH) {
		flags = OCF_WRITE_FLUSH;
	}

	io = ocf_volume_new_io(ocf_core_get_front_volume(vbdev->ocf_core), qctx->queue, offset, len, dir, 0,
			       flags);
	if (!io) {
		err = -ENOMEM;
		goto fail;
	}

	data = vbdev_ocf_data_from_spdk_io(bdev_io);
	if (!data) {
		err = -ENOMEM;
		goto fail;
	}

	err = ocf_io_set_data(io, data, 0);
	if (err) {
		goto fail;
	}

	/* bdev_io travels as priv1 and is completed in vbdev_ocf_io_submit_cb. */
	ocf_io_set_cmpl(io, bdev_io, NULL, vbdev_ocf_io_submit_cb);

	err = io_submit_to_ocf(bdev_io, io);
	if (err) {
		goto fail;
	}

	return;

fail:
	/* goto-based cleanup: drop the OCF io (if allocated) and complete the
	 * bdev_io with a status matching the error. */
	if (io) {
		ocf_io_put(io);
	}

	if (err == -ENOMEM) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM);
	} else {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}
657 
658 static void
659 vbdev_ocf_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
660 		     bool success)
661 {
662 	if (!success) {
663 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
664 		return;
665 	}
666 
667 	io_handle(ch, bdev_io);
668 }
669 
/* Called from bdev layer when an io to Cache vbdev is submitted */
static void
vbdev_ocf_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* User does not have to allocate io vectors for the request,
		 * so in case they are not allocated, we allocate them here */
		spdk_bdev_io_get_buf(bdev_io, vbdev_ocf_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Writes carry their own buffers - go straight to OCF. */
		io_handle(ch, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_RESET:
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
	default:
		/* Matches vbdev_ocf_io_type_supported(): these are rejected. */
		SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
694 
695 /* Called from bdev layer */
696 static bool
697 vbdev_ocf_io_type_supported(void *opaque, enum spdk_bdev_io_type io_type)
698 {
699 	struct vbdev_ocf *vbdev = opaque;
700 
701 	switch (io_type) {
702 	case SPDK_BDEV_IO_TYPE_READ:
703 	case SPDK_BDEV_IO_TYPE_WRITE:
704 	case SPDK_BDEV_IO_TYPE_FLUSH:
705 	case SPDK_BDEV_IO_TYPE_UNMAP:
706 		return spdk_bdev_io_type_supported(vbdev->core.bdev, io_type);
707 	case SPDK_BDEV_IO_TYPE_RESET:
708 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
709 	default:
710 		return false;
711 	}
712 }
713 
/* Called from bdev layer: hand out the per-thread channel registered for
 * this vbdev in finish_register() (ctx is a struct vbdev_ocf_qctx). */
static struct spdk_io_channel *
vbdev_ocf_get_io_channel(void *opaque)
{
	struct vbdev_ocf *vbdev = opaque;

	return spdk_get_io_channel(vbdev);
}
722 
/* Dump runtime information about this vbdev into an (already open) JSON
 * object; used for e.g. bdev_get_bdevs output. Always returns 0. */
static int
vbdev_ocf_dump_info_json(void *opaque, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = opaque;

	spdk_json_write_named_string(w, "cache_device", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_device", vbdev->core.name);

	/* Mode and cache line size are read live from the OCF cache object. */
	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_get_cache_line_size(vbdev->ocf_cache));
	spdk_json_write_named_bool(w, "metadata_volatile",
				   vbdev->cfg.cache.metadata_volatile);

	return 0;
}
740 
/* Emit the bdev_ocf_create RPC call that would recreate this vbdev;
 * used when saving the current configuration as JSON. */
static void
vbdev_ocf_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct vbdev_ocf *vbdev = bdev->ctxt;

	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_ocf_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", vbdev->name);
	spdk_json_write_named_string(w, "mode",
				     ocf_get_cache_modename(ocf_cache_get_mode(vbdev->ocf_cache)));
	spdk_json_write_named_uint32(w, "cache_line_size",
				     ocf_get_cache_line_size(vbdev->ocf_cache));
	spdk_json_write_named_string(w, "cache_bdev_name", vbdev->cache.name);
	spdk_json_write_named_string(w, "core_bdev_name", vbdev->core.name);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
762 
/* Cache vbdev function table
 * Used by bdev layer; installed on exp_bdev in finish_register(). */
static struct spdk_bdev_fn_table cache_dev_fn_table = {
	.destruct = vbdev_ocf_destruct,
	.io_type_supported = vbdev_ocf_io_type_supported,
	.submit_request	= vbdev_ocf_submit_request,
	.get_io_channel	= vbdev_ocf_get_io_channel,
	.write_config_json = vbdev_ocf_write_json_config,
	.dump_info_json = vbdev_ocf_dump_info_json,
};
773 
774 /* Poller function for the OCF queue
775  * We execute OCF requests here synchronously */
776 static int
777 queue_poll(void *opaque)
778 {
779 	struct vbdev_ocf_qctx *qctx = opaque;
780 	uint32_t iono = ocf_queue_pending_io(qctx->queue);
781 	int i, max = spdk_min(32, iono);
782 
783 	for (i = 0; i < max; i++) {
784 		ocf_queue_run_single(qctx->queue);
785 	}
786 
787 	if (iono > 0) {
788 		return SPDK_POLLER_BUSY;
789 	} else {
790 		return SPDK_POLLER_IDLE;
791 	}
792 }
793 
/* Called during ocf_submit_io, ocf_purge*
 * and any other requests that need to submit io.
 * Intentionally empty: the queue is drained by queue_poll() instead of
 * being kicked. */
static void
vbdev_ocf_ctx_queue_kick(ocf_queue_t q)
{
}
800 
/* OCF queue deinitialization
 * Called at ocf_cache_stop: release the bdev channels and poller that were
 * set up for this queue. The context is freed only when it is the heap copy
 * made in io_device_destroy_cb() (allocated flag). */
static void
vbdev_ocf_ctx_queue_stop(ocf_queue_t q)
{
	struct vbdev_ocf_qctx *qctx = ocf_queue_get_priv(q);

	if (qctx) {
		spdk_put_io_channel(qctx->cache_ch);
		spdk_put_io_channel(qctx->core_ch);
		spdk_poller_unregister(&qctx->poller);
		if (qctx->allocated) {
			free(qctx);
		}
	}
}
817 
/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed */
const struct ocf_queue_ops queue_ops = {
	.kick_sync = vbdev_ocf_ctx_queue_kick, /* no-op, drained by poller */
	.kick = vbdev_ocf_ctx_queue_kick,      /* no-op, drained by poller */
	.stop = vbdev_ocf_ctx_queue_stop,
};
825 
/* Called on cache vbdev creation at every thread
 * We allocate OCF queues here and SPDK poller for it.
 * Returns 0 on success or the error from queue creation. */
static int
io_device_create_cb(void *io_device, void *ctx_buf)
{
	struct vbdev_ocf *vbdev = io_device;
	struct vbdev_ocf_qctx *qctx = ctx_buf;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &qctx->queue, &queue_ops);
	if (rc) {
		return rc;
	}

	/* Queue priv points back at this context so queue_stop can clean up. */
	ocf_queue_set_priv(qctx->queue, qctx);

	/* NOTE(review): channel/poller results are not checked here - looks
	 * like NULL would surface later in the I/O path; confirm intent. */
	qctx->vbdev      = vbdev;
	qctx->cache_ch   = spdk_bdev_get_io_channel(vbdev->cache.desc);
	qctx->core_ch    = spdk_bdev_get_io_channel(vbdev->core.desc);
	qctx->poller     = SPDK_POLLER_REGISTER(queue_poll, qctx, 0);

	return rc;
}
849 
/* Called per thread
 * Put OCF queue and relaunch poller with new context to finish pending requests.
 * ctx_buf is owned by the io-channel layer and freed after return, hence the
 * heap copy that survives until vbdev_ocf_ctx_queue_stop() frees it. */
static void
io_device_destroy_cb(void *io_device, void *ctx_buf)
{
	/* Making a copy of context to use it after io channel will be destroyed */
	struct vbdev_ocf_qctx *copy = malloc(sizeof(*copy));
	struct vbdev_ocf_qctx *qctx = ctx_buf;

	if (copy) {
		/* Re-point queue priv at the copy and move the poller over. */
		ocf_queue_set_priv(qctx->queue, copy);
		memcpy(copy, qctx, sizeof(*copy));
		spdk_poller_unregister(&qctx->poller);
		copy->poller = SPDK_POLLER_REGISTER(queue_poll, copy, 0);
		/* Mark so queue_stop knows to free this heap allocation. */
		copy->allocated = true;
	} else {
		SPDK_ERRLOG("Unable to stop OCF queue properly: %s\n",
			    spdk_strerror(ENOMEM));
	}

	vbdev_ocf_queue_put(qctx->queue);
}
872 
873 /* OCF management queue deinitialization */
874 static void
875 vbdev_ocf_ctx_mngt_queue_stop(ocf_queue_t q)
876 {
877 	struct spdk_poller *poller = ocf_queue_get_priv(q);
878 
879 	if (poller) {
880 		spdk_poller_unregister(&poller);
881 	}
882 }
883 
884 static int
885 mngt_queue_poll(void *opaque)
886 {
887 	ocf_queue_t q = opaque;
888 	uint32_t iono = ocf_queue_pending_io(q);
889 	int i, max = spdk_min(32, iono);
890 
891 	for (i = 0; i < max; i++) {
892 		ocf_queue_run_single(q);
893 	}
894 
895 	if (iono > 0) {
896 		return SPDK_POLLER_BUSY;
897 	} else {
898 		return SPDK_POLLER_IDLE;
899 	}
900 }
901 
/* Intentionally empty: the management queue is drained by mngt_queue_poll()
 * rather than being kicked. */
static void
vbdev_ocf_ctx_mngt_queue_kick(ocf_queue_t q)
{
}
906 
/* Queue ops is an interface for running queue thread
 * stop() operation in called just before queue gets destroyed */
const struct ocf_queue_ops mngt_queue_ops = {
	.kick_sync = NULL,
	.kick = vbdev_ocf_ctx_mngt_queue_kick, /* no-op, drained by poller */
	.stop = vbdev_ocf_ctx_mngt_queue_stop,
};
914 
/* Abort the register path with status rc and run rollback_path to undo the
 * steps performed so far; clears the starting flag set in register_vbdev(). */
static void
vbdev_ocf_mngt_exit(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_fn *rollback_path, int rc)
{
	vbdev->state.starting = false;
	vbdev_ocf_mngt_stop(vbdev, rollback_path, rc);
}
921 
/* Create exported spdk object: final register-path step that populates the
 * exported bdev from the core bdev's properties and registers it with SPDK.
 * On any failure, rolls back via the dirty unregister path. */
static void
finish_register(struct vbdev_ocf *vbdev)
{
	struct spdk_uuid ns_uuid;
	int result;

	/* Copy properties of the base bdev */
	vbdev->exp_bdev.blocklen = vbdev->core.bdev->blocklen;
	vbdev->exp_bdev.write_cache = vbdev->core.bdev->write_cache;
	vbdev->exp_bdev.required_alignment = vbdev->core.bdev->required_alignment;

	vbdev->exp_bdev.name = vbdev->name;
	vbdev->exp_bdev.product_name = "SPDK OCF";

	vbdev->exp_bdev.blockcnt = vbdev->core.bdev->blockcnt;
	vbdev->exp_bdev.ctxt = vbdev;
	vbdev->exp_bdev.fn_table = &cache_dev_fn_table;
	vbdev->exp_bdev.module = &ocf_if;

	/* Generate UUID based on namespace UUID + base bdev UUID. */
	spdk_uuid_parse(&ns_uuid, BDEV_OCF_NAMESPACE_UUID);
	result = spdk_uuid_generate_sha1(&vbdev->exp_bdev.uuid, &ns_uuid,
					 (const char *)&vbdev->core.bdev->uuid, sizeof(struct spdk_uuid));
	if (result) {
		SPDK_ERRLOG("Unable to generate new UUID for ocf bdev\n");
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, result);
		return;
	}

	/* Finally register vbdev in SPDK */
	spdk_io_device_register(vbdev, io_device_create_cb, io_device_destroy_cb,
				sizeof(struct vbdev_ocf_qctx), vbdev->name);
	result = spdk_bdev_register(&vbdev->exp_bdev);
	if (result) {
		SPDK_ERRLOG("Could not register exposed bdev %s\n",
			    vbdev->name);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, result);
		return;
	} else {
		vbdev->state.started = true;
	}

	vbdev_ocf_mngt_continue(vbdev, result);
}
967 
/* Completion of ocf_mngt_cache_add_core(): release the lock taken in
 * add_core_cache_lock_cmpl(), store the new core handle on success, or
 * start rollback on failure. */
static void
add_core_cmpl(ocf_cache_t cache, ocf_core_t core, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;

	ocf_mngt_cache_unlock(cache);

	if (error) {
		SPDK_ERRLOG("Error %d, failed to add core device to cache instance %s,"
			    "starting rollback\n", error, vbdev->name);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
		return;
	} else {
		vbdev->ocf_core = core;
	}

	vbdev_ocf_mngt_continue(vbdev, error);
}
986 
987 /* Try to lock cache, then add core */
988 static void
989 add_core_cache_lock_cmpl(ocf_cache_t cache, void *priv, int error)
990 {
991 	struct vbdev_ocf *vbdev = (struct vbdev_ocf *)priv;
992 
993 	if (error) {
994 		SPDK_ERRLOG("Error %d, can not lock cache instance %s,"
995 			    "starting rollback\n", error, vbdev->name);
996 		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
997 	}
998 	ocf_mngt_cache_add_core(vbdev->ocf_cache, &vbdev->cfg.core, add_core_cmpl, vbdev);
999 }
1000 
/* Add core for existing OCF cache instance: register-path step that locks
 * the cache and continues in add_core_cache_lock_cmpl(). */
static void
add_core(struct vbdev_ocf *vbdev)
{
	ocf_mngt_cache_lock(vbdev->ocf_cache, add_core_cache_lock_cmpl, vbdev);
}
1007 
/* Completion of cache attach/load started in start_cache(): release the
 * management lock and either continue the register pipeline or, on error,
 * log guidance (for OOM include the RAM estimate) and start rollback. */
static void
start_cache_cmpl(ocf_cache_t cache, void *priv, int error)
{
	struct vbdev_ocf *vbdev = priv;
	uint64_t mem_needed;

	ocf_mngt_cache_unlock(cache);

	if (error) {
		SPDK_ERRLOG("Error %d during start cache %s, starting rollback\n",
			    error, vbdev->name);

		if (error == -OCF_ERR_NO_MEM) {
			/* Ask OCF how much RAM this configuration would need
			 * so the operator can size hugepages accordingly. */
			ocf_mngt_get_ram_needed(cache, &vbdev->cfg.attach.device, &mem_needed);

			SPDK_NOTICELOG("Try to increase hugepage memory size or cache line size. "
				       "For your configuration:\nDevice size: %"PRIu64" bytes\n"
				       "Cache line size: %"PRIu64" bytes\nFree memory needed to start "
				       "cache: %"PRIu64" bytes\n", vbdev->cache.bdev->blockcnt *
				       vbdev->cache.bdev->blocklen, vbdev->cfg.cache.cache_line_size,
				       mem_needed);
		}

		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, error);
		return;
	}

	vbdev_ocf_mngt_continue(vbdev, error);
}
1037 
/* Create the OCF management queue, register its poller (100 us period) and
 * attach it to the cache. Returns 0 on success or a negative errno. */
static int
create_management_queue(struct vbdev_ocf *vbdev)
{
	struct spdk_poller *mngt_poller;
	int rc;

	rc = vbdev_ocf_queue_create(vbdev->ocf_cache, &vbdev->cache_ctx->mngt_queue, &mngt_queue_ops);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		return rc;
	}

	mngt_poller = SPDK_POLLER_REGISTER(mngt_queue_poll, vbdev->cache_ctx->mngt_queue, 100);
	if (mngt_poller == NULL) {
		/* NOTE(review): the queue created above is not put on this
		 * path - looks like a leak of mngt_queue; confirm whether the
		 * caller's rollback releases it. */
		SPDK_ERRLOG("Unable to initiate mngt request: %s", spdk_strerror(ENOMEM));
		return -ENOMEM;
	}

	/* Priv holds the poller so vbdev_ocf_ctx_mngt_queue_stop can find it. */
	ocf_queue_set_priv(vbdev->cache_ctx->mngt_queue, mngt_poller);
	ocf_mngt_cache_set_mngt_queue(vbdev->ocf_cache, vbdev->cache_ctx->mngt_queue);

	return 0;
}
1061 
/* Start OCF cache, attach caching device.
 * First register-path step: either joins an already-running cache instance
 * with the same name, or starts a fresh OCF cache and attaches/loads the
 * caching device. Continues asynchronously in start_cache_cmpl(). */
static void
start_cache(struct vbdev_ocf *vbdev)
{
	ocf_cache_t existing;
	uint32_t cache_block_size = vbdev->cache.bdev->blocklen;
	uint32_t core_block_size = vbdev->core.bdev->blocklen;
	int rc;

	if (is_ocf_cache_running(vbdev)) {
		vbdev_ocf_mngt_stop(vbdev, NULL, -EALREADY);
		return;
	}

	/* A cache device with larger blocks than the core cannot back it. */
	if (cache_block_size > core_block_size) {
		SPDK_ERRLOG("Cache bdev block size (%d) is bigger then core bdev block size (%d)\n",
			    cache_block_size, core_block_size);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -EINVAL);
		return;
	}

	/* Reuse a cache instance already started by another vbdev. */
	existing = get_other_cache_instance(vbdev);
	if (existing) {
		SPDK_NOTICELOG("OCF bdev %s connects to existing cache device %s\n",
			       vbdev->name, vbdev->cache.name);
		vbdev->ocf_cache = existing;
		ocf_mngt_cache_get(vbdev->ocf_cache);
		vbdev->cache_ctx = ocf_cache_get_priv(existing);
		vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
		vbdev_ocf_mngt_continue(vbdev, 0);
		return;
	}

	vbdev->cache_ctx = calloc(1, sizeof(struct vbdev_ocf_cache_ctx));
	if (vbdev->cache_ctx == NULL) {
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, -ENOMEM);
		return;
	}

	vbdev_ocf_cache_ctx_get(vbdev->cache_ctx);
	pthread_mutex_init(&vbdev->cache_ctx->lock, NULL);

	rc = ocf_mngt_cache_start(vbdev_ocf_ctx, &vbdev->ocf_cache, &vbdev->cfg.cache, NULL);
	if (rc) {
		SPDK_ERRLOG("Could not start cache %s: %d\n", vbdev->name, rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}
	/* Extra reference owned by this vbdev; dropped in unregister_finish(). */
	ocf_mngt_cache_get(vbdev->ocf_cache);

	ocf_cache_set_priv(vbdev->ocf_cache, vbdev->cache_ctx);

	rc = create_management_queue(vbdev);
	if (rc) {
		SPDK_ERRLOG("Unable to create mngt_queue: %d\n", rc);
		vbdev_ocf_mngt_exit(vbdev, unregister_path_dirty, rc);
		return;
	}

	/* loadq means: recover an existing cache from its metadata. */
	if (vbdev->cfg.loadq) {
		ocf_mngt_cache_load(vbdev->ocf_cache, &vbdev->cfg.attach, start_cache_cmpl, vbdev);
	} else {
		ocf_mngt_cache_attach(vbdev->ocf_cache, &vbdev->cfg.attach, start_cache_cmpl, vbdev);
	}
}
1127 
/* Procedures called during register operation
 * Executed in order by the vbdev_ocf_mngt_* pipeline started in
 * register_vbdev(); the NULL entry terminates the list */
vbdev_ocf_mngt_fn register_path[] = {
	start_cache,
	add_core,
	finish_register,
	NULL
};
1135 
1136 /* Start cache instance and register OCF bdev */
1137 static void
1138 register_vbdev(struct vbdev_ocf *vbdev, vbdev_ocf_mngt_callback cb, void *cb_arg)
1139 {
1140 	int rc;
1141 
1142 	if (!(vbdev->core.attached && vbdev->cache.attached) || vbdev->state.started) {
1143 		cb(-EPERM, vbdev, cb_arg);
1144 		return;
1145 	}
1146 
1147 	vbdev->state.starting = true;
1148 	rc = vbdev_ocf_mngt_start(vbdev, register_path, cb, cb_arg);
1149 	if (rc) {
1150 		cb(rc, vbdev, cb_arg);
1151 	}
1152 }
1153 
/* Init OCF configuration options
 * for core and cache devices
 * Returns 0 on success, -EINVAL when the SPDK volume type is missing
 * or the attach volume cannot be created */
static int
init_vbdev_config(struct vbdev_ocf *vbdev)
{
	struct vbdev_ocf_config *cfg = &vbdev->cfg;
	struct ocf_volume_uuid uuid;
	ocf_volume_type_t type;
	int ret;


	/* Initialize OCF defaults first */
	ocf_mngt_cache_attach_config_set_default(&cfg->attach);
	ocf_mngt_cache_config_set_default(&cfg->cache);
	ocf_mngt_core_config_set_default(&cfg->core);

	snprintf(cfg->cache.name, sizeof(cfg->cache.name), "%s", vbdev->name);
	snprintf(cfg->core.name, sizeof(cfg->core.name), "%s", vbdev->core.name);

	/* Cores are added explicitly by this module rather than opened by OCF */
	cfg->attach.open_cores = false;
	cfg->attach.device.perform_test = false;
	cfg->attach.discard_on_start = false;

	vbdev->cfg.cache.locked = true;

	cfg->core.volume_type = SPDK_OBJECT;

	if (vbdev->cfg.loadq) {
		/* When doing cache_load(), we need to set try_add to true,
		 * otherwise OCF will interpret this core as new
		 * instead of the inactive one */
		vbdev->cfg.core.try_add = true;
	} else {
		/* When cache is initialized as new, set force flag to true,
		 * to ignore warnings about existing metadata */
		cfg->attach.force = true;
	}

	/* Serialize bdev names in OCF UUID to interpret on future loads
	 * Core UUID is a triple of (core name, vbdev name, cache name)
	 * Cache UUID is cache bdev name */
	type = ocf_ctx_get_volume_type(vbdev_ocf_ctx, SPDK_OBJECT);
	if (!type) {
		SPDK_ERRLOG("Fail to get volume type\n");
		return -EINVAL;
	}
	uuid.size = strlen(vbdev->cache.name) + 1;
	uuid.data = vbdev->cache.name;
	ret = ocf_volume_create(&cfg->attach.device.volume, type, &uuid);
	if (ret) {
		SPDK_ERRLOG("Fail to create volume\n");
		return -EINVAL;
	}

	/* Pack the triple space-separated, record the full serialized length,
	 * then overwrite the two separator spaces with NUL bytes so that each
	 * element also reads as an individual C string within the UUID blob */
	snprintf(vbdev->uuid, VBDEV_OCF_MD_MAX_LEN, "%s %s %s",
		 vbdev->core.name, vbdev->name, vbdev->cache.name);
	cfg->core.uuid.size = strlen(vbdev->uuid) + 1;
	cfg->core.uuid.data = vbdev->uuid;
	vbdev->uuid[strlen(vbdev->core.name)] = 0;
	vbdev->uuid[strlen(vbdev->core.name) + 1 + strlen(vbdev->name)] = 0;

	return 0;
}
1217 
/* Allocate vbdev structure object and add it to the global list
 * cache_line_size is given in KiB (0 selects the OCF default)
 * Returns 0 on success; -EPERM if the name is taken, -ENOMEM on
 * allocation failure, -EINVAL on bad cache mode or line size */
static int
init_vbdev(const char *vbdev_name,
	   const char *cache_mode_name,
	   const uint64_t cache_line_size,
	   const char *cache_name,
	   const char *core_name,
	   bool loadq)
{
	struct vbdev_ocf *vbdev;
	int rc = 0;

	/* Name must be unique among both registered SPDK bdevs and OCF vbdevs */
	if (spdk_bdev_get_by_name(vbdev_name) || vbdev_ocf_get_by_name(vbdev_name)) {
		SPDK_ERRLOG("Device with name '%s' already exists\n", vbdev_name);
		return -EPERM;
	}

	vbdev = calloc(1, sizeof(*vbdev));
	if (!vbdev) {
		/* NOTE(review): this goto reaches free_vbdev() with vbdev == NULL;
		 * presumably free_vbdev() tolerates NULL - confirm */
		goto error_mem;
	}

	vbdev->name = strdup(vbdev_name);
	if (!vbdev->name) {
		goto error_mem;
	}

	vbdev->cache.name = strdup(cache_name);
	if (!vbdev->cache.name) {
		goto error_mem;
	}

	vbdev->core.name = strdup(core_name);
	if (!vbdev->core.name) {
		goto error_mem;
	}

	vbdev->cache.parent = vbdev;
	vbdev->core.parent = vbdev;
	vbdev->cache.is_cache = true;
	vbdev->core.is_cache = false;
	vbdev->cfg.loadq = loadq;

	rc = init_vbdev_config(vbdev);
	if (rc) {
		SPDK_ERRLOG("Fail to init vbdev config\n");
		goto error_free;
	}


	if (cache_mode_name) {
		vbdev->cfg.cache.cache_mode
			= ocf_get_cache_mode(cache_mode_name);
	} else if (!loadq) { /* In load path it is OK to pass NULL as cache mode */
		SPDK_ERRLOG("No cache mode specified\n");
		rc = -EINVAL;
		goto error_free;
	}
	/* ocf_get_cache_mode() signals an unknown mode name with a negative value */
	if (vbdev->cfg.cache.cache_mode < 0) {
		SPDK_ERRLOG("Incorrect cache mode '%s'\n", cache_mode_name);
		rc = -EINVAL;
		goto error_free;
	}

	/* Convert KiB argument to bytes; 0 means "use the OCF default" */
	ocf_cache_line_size_t set_cache_line_size = cache_line_size ?
			(ocf_cache_line_size_t)cache_line_size * KiB :
			ocf_cache_line_size_default;
	if (set_cache_line_size == 0) {
		SPDK_ERRLOG("Cache line size should be non-zero.\n");
		rc = -EINVAL;
		goto error_free;
	}
	vbdev->cfg.attach.cache_line_size = set_cache_line_size;
	vbdev->cfg.cache.cache_line_size = set_cache_line_size;

	TAILQ_INSERT_TAIL(&g_ocf_vbdev_head, vbdev, tailq);
	return rc;

error_mem:
	rc = -ENOMEM;
error_free:
	free_vbdev(vbdev);
	return rc;
}
1302 
/* Read configuration file at the start of SPDK application
 * This adds vbdevs to global list if some mentioned in config */
static int
vbdev_ocf_init(void)
{
	int rc;

	rc = vbdev_ocf_ctx_init();
	if (rc != 0) {
		SPDK_ERRLOG("OCF ctx initialization failed with=%d\n", rc);
		return rc;
	}

	rc = vbdev_ocf_volume_init();
	if (rc != 0) {
		/* Roll back the context initialization done above */
		vbdev_ocf_ctx_cleanup();
		SPDK_ERRLOG("OCF volume initialization failed with=%d\n", rc);
		return rc;
	}

	return 0;
}
1325 
1326 /* Called after application shutdown started
1327  * Release memory of allocated structures here */
1328 static void
1329 vbdev_ocf_module_fini(void)
1330 {
1331 	struct vbdev_ocf *vbdev;
1332 
1333 	while ((vbdev = TAILQ_FIRST(&g_ocf_vbdev_head))) {
1334 		TAILQ_REMOVE(&g_ocf_vbdev_head, vbdev, tailq);
1335 		free_vbdev(vbdev);
1336 	}
1337 
1338 	vbdev_ocf_volume_cleanup();
1339 	vbdev_ocf_ctx_cleanup();
1340 }
1341 
1342 /* When base device gets unplugged this is called
1343  * We will unregister cache vbdev here
1344  * When cache device is removed, we delete every OCF bdev that used it */
1345 static void
1346 hotremove_cb(struct vbdev_ocf_base *base)
1347 {
1348 	struct vbdev_ocf *vbdev;
1349 
1350 	if (!base->is_cache) {
1351 		if (base->parent->state.doing_finish) {
1352 			return;
1353 		}
1354 
1355 		SPDK_NOTICELOG("Deinitializing '%s' because its core device '%s' was removed\n",
1356 			       base->parent->name, base->name);
1357 		vbdev_ocf_delete(base->parent, NULL, NULL);
1358 		return;
1359 	}
1360 
1361 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1362 		if (vbdev->state.doing_finish) {
1363 			continue;
1364 		}
1365 		if (strcmp(base->name, vbdev->cache.name) == 0) {
1366 			SPDK_NOTICELOG("Deinitializing '%s' because"
1367 				       " its cache device '%s' was removed\n",
1368 				       vbdev->name, base->name);
1369 			vbdev_ocf_delete(vbdev, NULL, NULL);
1370 		}
1371 	}
1372 }
1373 
1374 static void
1375 base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1376 		   void *event_ctx)
1377 {
1378 	switch (type) {
1379 	case SPDK_BDEV_EVENT_REMOVE:
1380 		if (event_ctx) {
1381 			hotremove_cb(event_ctx);
1382 		}
1383 		break;
1384 	default:
1385 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1386 		break;
1387 	}
1388 }
1389 
/* Open base SPDK bdev and claim it
 * On success sets base->desc, base->management_channel, base->thread and
 * marks base->attached; returns 0. On failure all partial acquisitions
 * are rolled back and a negative errno is returned (-EALREADY when the
 * base was attached before this call). */
static int
attach_base(struct vbdev_ocf_base *base)
{
	int status;

	if (base->attached) {
		return -EALREADY;
	}

	/* If base cache bdev was already opened by other vbdev,
	 * we just copy its descriptor here */
	if (base->is_cache) {
		struct vbdev_ocf_base *existing = get_other_cache_base(base);
		if (existing) {
			base->desc = existing->desc;
			base->management_channel = existing->management_channel;
			base->attached = true;
			return 0;
		}
	}

	/* Open writable; hot-remove notifications arrive via base_bdev_event_cb */
	status = spdk_bdev_open_ext(base->name, true, base_bdev_event_cb, base, &base->desc);
	if (status) {
		SPDK_ERRLOG("Unable to open device '%s' for writing\n", base->name);
		return status;
	}

	/* Claim the bdev for this module so no other module can take it over */
	status = spdk_bdev_module_claim_bdev(base->bdev, base->desc,
					     &ocf_if);
	if (status) {
		SPDK_ERRLOG("Unable to claim device '%s'\n", base->name);
		spdk_bdev_close(base->desc);
		return status;
	}

	base->management_channel = spdk_bdev_get_io_channel(base->desc);
	if (!base->management_channel) {
		SPDK_ERRLOG("Unable to get io channel '%s'\n", base->name);
		/* Undo the claim and open performed above, in reverse order */
		spdk_bdev_module_release_bdev(base->bdev);
		spdk_bdev_close(base->desc);
		return -ENOMEM;
	}

	/* Save the thread where the base device is opened */
	base->thread = spdk_get_thread();

	base->attached = true;
	return status;
}
1440 
1441 /* Attach base bdevs */
1442 static int
1443 attach_base_bdevs(struct vbdev_ocf *vbdev,
1444 		  struct spdk_bdev *cache_bdev,
1445 		  struct spdk_bdev *core_bdev)
1446 {
1447 	int rc = 0;
1448 
1449 	if (cache_bdev) {
1450 		vbdev->cache.bdev = cache_bdev;
1451 		rc |= attach_base(&vbdev->cache);
1452 	}
1453 
1454 	if (core_bdev) {
1455 		vbdev->core.bdev = core_bdev;
1456 		rc |= attach_base(&vbdev->core);
1457 	}
1458 
1459 	return rc;
1460 }
1461 
1462 /* Init and then start vbdev if all base devices are present */
1463 void
1464 vbdev_ocf_construct(const char *vbdev_name,
1465 		    const char *cache_mode_name,
1466 		    const uint64_t cache_line_size,
1467 		    const char *cache_name,
1468 		    const char *core_name,
1469 		    bool loadq,
1470 		    void (*cb)(int, struct vbdev_ocf *, void *),
1471 		    void *cb_arg)
1472 {
1473 	int rc;
1474 	struct spdk_bdev *cache_bdev = spdk_bdev_get_by_name(cache_name);
1475 	struct spdk_bdev *core_bdev = spdk_bdev_get_by_name(core_name);
1476 	struct vbdev_ocf *vbdev;
1477 
1478 	rc = init_vbdev(vbdev_name, cache_mode_name, cache_line_size, cache_name, core_name, loadq);
1479 	if (rc) {
1480 		cb(rc, NULL, cb_arg);
1481 		return;
1482 	}
1483 
1484 	vbdev = vbdev_ocf_get_by_name(vbdev_name);
1485 	if (vbdev == NULL) {
1486 		cb(-ENODEV, NULL, cb_arg);
1487 		return;
1488 	}
1489 
1490 	if (cache_bdev == NULL) {
1491 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for cache device '%s' to connect\n",
1492 			       vbdev->name, cache_name);
1493 	}
1494 	if (core_bdev == NULL) {
1495 		SPDK_NOTICELOG("OCF bdev '%s' is waiting for core device '%s' to connect\n",
1496 			       vbdev->name, core_name);
1497 	}
1498 
1499 	rc = attach_base_bdevs(vbdev, cache_bdev, core_bdev);
1500 	if (rc) {
1501 		cb(rc, vbdev, cb_arg);
1502 		return;
1503 	}
1504 
1505 	if (core_bdev && cache_bdev) {
1506 		register_vbdev(vbdev, cb, cb_arg);
1507 	} else {
1508 		cb(0, vbdev, cb_arg);
1509 	}
1510 }
1511 
1512 /* Set new cache mode on OCF cache */
1513 void
1514 vbdev_ocf_set_cache_mode(struct vbdev_ocf *vbdev,
1515 			 const char *cache_mode_name,
1516 			 void (*cb)(int, struct vbdev_ocf *, void *),
1517 			 void *cb_arg)
1518 {
1519 	ocf_cache_t cache;
1520 	ocf_cache_mode_t cache_mode;
1521 	int rc;
1522 
1523 	cache = vbdev->ocf_cache;
1524 	cache_mode = ocf_get_cache_mode(cache_mode_name);
1525 
1526 	rc = ocf_mngt_cache_trylock(cache);
1527 	if (rc) {
1528 		cb(rc, vbdev, cb_arg);
1529 		return;
1530 	}
1531 
1532 	rc = ocf_mngt_cache_set_mode(cache, cache_mode);
1533 	ocf_mngt_cache_unlock(cache);
1534 	cb(rc, vbdev, cb_arg);
1535 }
1536 
1537 /* Set sequential cutoff parameters on OCF cache */
1538 void
1539 vbdev_ocf_set_seqcutoff(struct vbdev_ocf *vbdev, const char *policy_name, uint32_t threshold,
1540 			uint32_t promotion_count, void (*cb)(int, void *), void *cb_arg)
1541 {
1542 	ocf_cache_t cache;
1543 	ocf_seq_cutoff_policy policy;
1544 	int rc;
1545 
1546 	cache = vbdev->ocf_cache;
1547 
1548 	policy = ocf_get_seqcutoff_policy(policy_name);
1549 	if (policy == ocf_seq_cutoff_policy_max) {
1550 		cb(OCF_ERR_INVAL, cb_arg);
1551 		return;
1552 	}
1553 
1554 	rc = ocf_mngt_cache_trylock(cache);
1555 	if (rc) {
1556 		cb(rc, cb_arg);
1557 		return;
1558 	}
1559 
1560 	rc = ocf_mngt_core_set_seq_cutoff_policy_all(cache, policy);
1561 	if (rc) {
1562 		goto end;
1563 	}
1564 
1565 	if (threshold) {
1566 		threshold = threshold * KiB;
1567 
1568 		rc = ocf_mngt_core_set_seq_cutoff_threshold_all(cache, threshold);
1569 		if (rc) {
1570 			goto end;
1571 		}
1572 	}
1573 
1574 	if (promotion_count) {
1575 		rc = ocf_mngt_core_set_seq_cutoff_promotion_count_all(cache, promotion_count);
1576 	}
1577 
1578 end:
1579 	ocf_mngt_cache_unlock(cache);
1580 	cb(rc, cb_arg);
1581 }
1582 
1583 /* This called if new device is created in SPDK application
1584  * If that device named as one of base bdevs of OCF vbdev,
1585  * claim and open them */
1586 static void
1587 vbdev_ocf_examine(struct spdk_bdev *bdev)
1588 {
1589 	const char *bdev_name = spdk_bdev_get_name(bdev);
1590 	struct vbdev_ocf *vbdev;
1591 
1592 	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
1593 		if (vbdev->state.doing_finish) {
1594 			continue;
1595 		}
1596 
1597 		if (!strcmp(bdev_name, vbdev->cache.name)) {
1598 			attach_base_bdevs(vbdev, bdev, NULL);
1599 			continue;
1600 		}
1601 		if (!strcmp(bdev_name, vbdev->core.name)) {
1602 			attach_base_bdevs(vbdev, NULL, bdev);
1603 			break;
1604 		}
1605 	}
1606 	spdk_bdev_module_examine_done(&ocf_if);
1607 }
1608 
/* State for an asynchronous OCF metadata probe of a single bdev */
struct metadata_probe_ctx {
	struct vbdev_ocf_base base;	/* temporary base backing the probe IOs */
	ocf_volume_t volume;		/* temporary OCF volume wrapping the bdev */

	/* presumably core UUIDs recovered from cache metadata - populated
	 * outside this chunk; each .data buffer is heap-allocated and freed
	 * in examine_ctx_put() */
	struct ocf_volume_uuid *core_uuids;
	unsigned int uuid_count;

	int result;	/* first error encountered during the probe */
	int refcnt;	/* dropped via examine_ctx_put(); cleanup at zero */
};
1619 
/* Thread-message trampoline: closes a bdev descriptor on the thread
 * that originally opened it (see examine_ctx_put) */
static void
_examine_ctx_put(void *ctx)
{
	spdk_bdev_close((struct spdk_bdev_desc *)ctx);
}
1627 
/* Drop one reference to the probe context; when the last reference is
 * released, tear down all probe resources and report examine completion
 * for the bdev */
static void
examine_ctx_put(struct metadata_probe_ctx *ctx)
{
	unsigned int i;

	ctx->refcnt--;
	if (ctx->refcnt > 0) {
		return;
	}

	if (ctx->result) {
		SPDK_ERRLOG("OCF metadata probe for bdev '%s' failed with %d\n",
			    spdk_bdev_get_name(ctx->base.bdev), ctx->result);
	}

	if (ctx->base.desc) {
		/* Close the underlying bdev on its same opened thread. */
		if (ctx->base.thread && ctx->base.thread != spdk_get_thread()) {
			spdk_thread_send_msg(ctx->base.thread, _examine_ctx_put, ctx->base.desc);
		} else {
			spdk_bdev_close(ctx->base.desc);
		}
	}

	if (ctx->volume) {
		ocf_volume_destroy(ctx->volume);
	}

	if (ctx->core_uuids) {
		for (i = 0; i < ctx->uuid_count; i++) {
			free(ctx->core_uuids[i].data);
		}
	}
	/* free(NULL) is a no-op, so the array itself needs no guard */
	free(ctx->core_uuids);

	/* Balances the examine_start() taken before the probe was launched */
	examine_done(ctx->result, NULL, ctx->base.bdev);
	free(ctx);
}
1666 
/* Completion callback for ocf_metadata_probe(); records the result and
 * drops the probe context reference */
static void
metadata_probe_cb(void *priv, int rc,
		  struct ocf_metadata_probe_status *status)
{
	struct metadata_probe_ctx *ctx = priv;

	if (rc) {
		/* -OCF_ERR_NO_METADATA means the device simply carries no OCF
		 * cache metadata - that is not an error for the probe path */
		if (rc != -OCF_ERR_NO_METADATA) {
			ctx->result = rc;
		}
	}

	examine_ctx_put(ctx);
}
1682 
/* This is called after vbdev_ocf_examine
 * It allows to delay application initialization
 * until all OCF bdevs get registered
 * If vbdev has all of its base devices it starts asynchronously here
 * We first check if bdev appears in configuration,
 * if not we do metadata_probe() to create its configuration from bdev metadata */
static void
vbdev_ocf_examine_disk(struct spdk_bdev *bdev)
{
	const char *bdev_name = spdk_bdev_get_name(bdev);
	struct vbdev_ocf *vbdev;
	struct metadata_probe_ctx *ctx;
	bool created_from_config = false;
	int rc;

	/* Outer reference: every return path below must balance this with
	 * exactly one examine_done() call for this bdev */
	examine_start(bdev);

	TAILQ_FOREACH(vbdev, &g_ocf_vbdev_head, tailq) {
		if (vbdev->state.doing_finish || vbdev->state.started) {
			continue;
		}

		if (!strcmp(bdev_name, vbdev->cache.name)) {
			/* Inner reference released when register_vbdev()
			 * invokes examine_done as its completion callback */
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			created_from_config = true;
			continue;
		}
		if (!strcmp(bdev_name, vbdev->core.name)) {
			examine_start(bdev);
			register_vbdev(vbdev, examine_done, bdev);
			/* Release the outer reference taken at the top */
			examine_done(0, NULL, bdev);
			return;
		}
	}

	/* If device was discovered during config we do not check for metadata */
	if (created_from_config) {
		examine_done(0, NULL, bdev);
		return;
	}

	/* Metadata probe path
	 * We create temporary OCF volume and a temporary base structure
	 * to use them for ocf_metadata_probe() and for bottom adapter IOs
	 * Then we get UUIDs of core devices an create configurations based on them */
	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		examine_done(-ENOMEM, NULL, bdev);
		return;
	}

	ctx->base.bdev = bdev;
	ctx->refcnt = 1;

	/* On any setup failure below, examine_ctx_put() both frees the partial
	 * context and calls examine_done() with the recorded error */
	rc = spdk_bdev_open_ext(bdev_name, true, base_bdev_event_cb, NULL, &ctx->base.desc);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_ctx_volume_create(vbdev_ocf_ctx, &ctx->volume, NULL, SPDK_OBJECT);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	rc = ocf_volume_open(ctx->volume, &ctx->base);
	if (rc) {
		ctx->result = rc;
		examine_ctx_put(ctx);
		return;
	}

	/* Save the thread where the base device is opened */
	ctx->base.thread = spdk_get_thread();

	/* Completion arrives in metadata_probe_cb(), which drops the context */
	ocf_metadata_probe(vbdev_ocf_ctx, ctx->volume, metadata_probe_cb, ctx);
}
1764 
/* Size of the per-IO context SPDK reserves for this module
 * (one struct bdev_ocf_data per bdev_io) */
static int
vbdev_ocf_get_ctx_size(void)
{
	return sizeof(struct bdev_ocf_data);
}
1770 
/* Called when application shutdown begins; raises the global flag so
 * asynchronous paths can detect that finalization is in progress */
static void
fini_start(void)
{
	g_fini_started = true;
}
1776 
/* Module-global function table
 * Does not relate to vbdev instances */
static struct spdk_bdev_module ocf_if = {
	.name = "ocf",
	.module_init = vbdev_ocf_init,
	.fini_start = fini_start,
	.module_fini = vbdev_ocf_module_fini,
	.get_ctx_size = vbdev_ocf_get_ctx_size,
	.examine_config = vbdev_ocf_examine,
	.examine_disk   = vbdev_ocf_examine_disk,
};
/* Register the module with the SPDK bdev layer under the name "ocf" */
SPDK_BDEV_MODULE_REGISTER(ocf, &ocf_if);
1789