xref: /spdk/lib/vhost/vhost_blk.c (revision 9bff828f99403ad2f3ac3b8b29b62acb83b24145)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <linux/virtio_blk.h>
35 
36 #include "spdk/env.h"
37 #include "spdk/bdev.h"
38 #include "spdk/bdev_module.h"
39 #include "spdk/thread.h"
40 #include "spdk/likely.h"
41 #include "spdk/string.h"
42 #include "spdk/util.h"
43 #include "spdk/vhost.h"
44 
45 #include "vhost_internal.h"
46 #include <rte_version.h>
47 
48 /* Minimal set of features supported by every SPDK VHOST-BLK device */
49 #define SPDK_VHOST_BLK_FEATURES_BASE (SPDK_VHOST_FEATURES | \
50 		(1ULL << VIRTIO_BLK_F_SIZE_MAX) | (1ULL << VIRTIO_BLK_F_SEG_MAX) | \
51 		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_BLK_SIZE) | \
52 		(1ULL << VIRTIO_BLK_F_TOPOLOGY) | (1ULL << VIRTIO_BLK_F_BARRIER)  | \
53 		(1ULL << VIRTIO_BLK_F_SCSI)     | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
54 		(1ULL << VIRTIO_BLK_F_MQ))
55 
56 /* Not supported features */
57 #define SPDK_VHOST_BLK_DISABLED_FEATURES (SPDK_VHOST_DISABLED_FEATURES | \
58 		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
59 		(1ULL << VIRTIO_BLK_F_BARRIER)  | (1ULL << VIRTIO_BLK_F_SCSI))
60 
61 /* Vhost-blk support protocol features */
62 #define SPDK_VHOST_BLK_PROTOCOL_FEATURES ((1ULL << VHOST_USER_PROTOCOL_F_CONFIG) | \
63 		(1ULL << VHOST_USER_PROTOCOL_F_INFLIGHT_SHMFD))
64 
65 struct spdk_vhost_user_blk_task {
66 	struct spdk_vhost_blk_task blk_task;
67 	struct spdk_vhost_blk_session *bvsession;
68 	struct spdk_vhost_virtqueue *vq;
69 
70 	uint16_t req_idx;
71 	uint16_t num_descs;
72 	uint16_t buffer_id;
73 	uint16_t inflight_head;
74 
75 	/* If set, the task is currently used for I/O processing. */
76 	bool used;
77 };
78 
79 struct spdk_vhost_blk_dev {
80 	struct spdk_vhost_dev vdev;
81 	struct spdk_bdev *bdev;
82 	struct spdk_bdev_desc *bdev_desc;
83 	/* dummy_io_channel is used to hold a bdev reference */
84 	struct spdk_io_channel *dummy_io_channel;
85 	bool readonly;
86 };
87 
88 struct spdk_vhost_blk_session {
89 	/* The parent session must be the very first field in this struct */
90 	struct spdk_vhost_session vsession;
91 	struct spdk_vhost_blk_dev *bvdev;
92 	struct spdk_poller *requestq_poller;
93 	struct spdk_io_channel *io_channel;
94 	struct spdk_poller *stop_poller;
95 };
96 
97 struct rpc_vhost_blk {
98 	bool readonly;
99 	bool packed_ring;
100 	bool packed_ring_recovery;
101 };
102 
103 static const struct spdk_json_object_decoder rpc_construct_vhost_blk[] = {
104 	{"readonly", offsetof(struct rpc_vhost_blk, readonly), spdk_json_decode_bool, true},
105 	{"packed_ring", offsetof(struct rpc_vhost_blk, packed_ring), spdk_json_decode_bool, true},
106 	{"packed_ring_recovery", offsetof(struct rpc_vhost_blk, packed_ring_recovery), spdk_json_decode_bool, true},
107 };
108 
109 /* forward declaration */
110 static const struct spdk_vhost_dev_backend vhost_blk_device_backend;
111 
112 static int
113 virtio_blk_process_request(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
114 			   struct spdk_vhost_blk_task *task);
115 
116 static int
117 vhost_user_process_blk_request(struct spdk_vhost_user_blk_task *user_task)
118 {
119 	struct spdk_vhost_blk_session *bvsession = user_task->bvsession;
120 	struct spdk_vhost_dev *vdev = &bvsession->bvdev->vdev;
121 
122 	return virtio_blk_process_request(vdev, bvsession->io_channel, &user_task->blk_task);
123 }
124 
125 static struct spdk_vhost_blk_dev *
126 to_blk_dev(struct spdk_vhost_dev *vdev)
127 {
128 	if (vdev == NULL) {
129 		return NULL;
130 	}
131 
132 	if (vdev->backend != &vhost_blk_device_backend) {
133 		SPDK_ERRLOG("%s: not a vhost-blk device\n", vdev->name);
134 		return NULL;
135 	}
136 
137 	return SPDK_CONTAINEROF(vdev, struct spdk_vhost_blk_dev, vdev);
138 }
139 
140 static struct spdk_vhost_blk_session *
141 to_blk_session(struct spdk_vhost_session *vsession)
142 {
143 	assert(vsession->vdev->backend == &vhost_blk_device_backend);
144 	return (struct spdk_vhost_blk_session *)vsession;
145 }
146 
147 static void
148 blk_task_finish(struct spdk_vhost_user_blk_task *task)
149 {
150 	assert(task->bvsession->vsession.task_cnt > 0);
151 	task->bvsession->vsession.task_cnt--;
152 	task->used = false;
153 }
154 
155 static void
156 blk_task_init(struct spdk_vhost_user_blk_task *task)
157 {
158 	struct spdk_vhost_blk_task *blk_task = &task->blk_task;
159 
160 	task->used = true;
161 	blk_task->iovcnt = SPDK_COUNTOF(blk_task->iovs);
162 	blk_task->status = NULL;
163 	blk_task->used_len = 0;
164 	blk_task->payload_size = 0;
165 }
166 
167 static void
168 blk_task_enqueue(struct spdk_vhost_user_blk_task *task)
169 {
170 	if (task->vq->packed.packed_ring) {
171 		vhost_vq_packed_ring_enqueue(&task->bvsession->vsession, task->vq,
172 					     task->num_descs,
173 					     task->buffer_id, task->blk_task.used_len,
174 					     task->inflight_head);
175 	} else {
176 		vhost_vq_used_ring_enqueue(&task->bvsession->vsession, task->vq,
177 					   task->req_idx, task->blk_task.used_len);
178 	}
179 }
180 
181 static void
182 vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task)
183 {
184 	struct spdk_vhost_user_blk_task *user_task;
185 
186 	user_task = SPDK_CONTAINEROF(task, struct spdk_vhost_user_blk_task, blk_task);
187 
188 	blk_task_enqueue(user_task);
189 
190 	SPDK_DEBUGLOG(vhost_blk, "Finished task (%p) req_idx=%d\n status: %" PRIu8"\n",
191 		      user_task, user_task->req_idx, status);
192 	blk_task_finish(user_task);
193 }
194 
195 static void
196 blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task)
197 {
198 
199 	if (task->status) {
200 		*task->status = status;
201 	}
202 
203 	vhost_user_blk_request_finish(status, task);
204 }
205 
206 /*
207  * Process task's descriptor chain and setup data related fields.
208  * Return
209  *   total size of supplied buffers
210  *
211  *   FIXME: Make this function return to rd_cnt and wr_cnt
212  */
213 static int
214 blk_iovs_split_queue_setup(struct spdk_vhost_blk_session *bvsession,
215 			   struct spdk_vhost_virtqueue *vq,
216 			   uint16_t req_idx, struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
217 {
218 	struct spdk_vhost_session *vsession = &bvsession->vsession;
219 	struct spdk_vhost_dev *vdev = vsession->vdev;
220 	struct vring_desc *desc, *desc_table;
221 	uint16_t out_cnt = 0, cnt = 0;
222 	uint32_t desc_table_size, len = 0;
223 	uint32_t desc_handled_cnt;
224 	int rc;
225 
226 	rc = vhost_vq_get_desc(vsession, vq, req_idx, &desc, &desc_table, &desc_table_size);
227 	if (rc != 0) {
228 		SPDK_ERRLOG("%s: invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
229 		return -1;
230 	}
231 
232 	desc_handled_cnt = 0;
233 	while (1) {
234 		/*
235 		 * Maximum cnt reached?
236 		 * Should not happen if request is well formatted, otherwise this is a BUG.
237 		 */
238 		if (spdk_unlikely(cnt == *iovs_cnt)) {
239 			SPDK_DEBUGLOG(vhost_blk, "%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
240 				      vsession->name, req_idx);
241 			return -1;
242 		}
243 
244 		if (spdk_unlikely(vhost_vring_desc_to_iov(vsession, iovs, &cnt, desc))) {
245 			SPDK_DEBUGLOG(vhost_blk, "%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
246 				      vsession->name, req_idx, cnt);
247 			return -1;
248 		}
249 
250 		len += desc->len;
251 
252 		out_cnt += vhost_vring_desc_is_wr(desc);
253 
254 		rc = vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
255 		if (rc != 0) {
256 			SPDK_ERRLOG("%s: descriptor chain at index %"PRIu16" terminated unexpectedly.\n",
257 				    vsession->name, req_idx);
258 			return -1;
259 		} else if (desc == NULL) {
260 			break;
261 		}
262 
263 		desc_handled_cnt++;
264 		if (spdk_unlikely(desc_handled_cnt > desc_table_size)) {
265 			/* Break a cycle and report an error, if any. */
266 			SPDK_ERRLOG("%s: found a cycle in the descriptor chain: desc_table_size = %d, desc_handled_cnt = %d.\n",
267 				    vsession->name, desc_table_size, desc_handled_cnt);
268 			return -1;
269 		}
270 	}
271 
272 	/*
273 	 * There must be least two descriptors.
274 	 * First contain request so it must be readable.
275 	 * Last descriptor contain buffer for response so it must be writable.
276 	 */
277 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
278 		return -1;
279 	}
280 
281 	*length = len;
282 	*iovs_cnt = cnt;
283 	return 0;
284 }
285 
286 static int
287 blk_iovs_packed_desc_setup(struct spdk_vhost_session *vsession,
288 			   struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
289 			   struct vring_packed_desc *desc_table, uint16_t desc_table_size,
290 			   struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
291 {
292 	struct vring_packed_desc *desc;
293 	uint16_t cnt = 0, out_cnt = 0;
294 	uint32_t len = 0;
295 
296 	if (desc_table == NULL) {
297 		desc = &vq->vring.desc_packed[req_idx];
298 	} else {
299 		req_idx = 0;
300 		desc = desc_table;
301 	}
302 
303 	while (1) {
304 		/*
305 		 * Maximum cnt reached?
306 		 * Should not happen if request is well formatted, otherwise this is a BUG.
307 		 */
308 		if (spdk_unlikely(cnt == *iovs_cnt)) {
309 			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
310 				    vsession->name, req_idx);
311 			return -EINVAL;
312 		}
313 
314 		if (spdk_unlikely(vhost_vring_packed_desc_to_iov(vsession, iovs, &cnt, desc))) {
315 			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
316 				    vsession->name, req_idx, cnt);
317 			return -EINVAL;
318 		}
319 
320 		len += desc->len;
321 		out_cnt += vhost_vring_packed_desc_is_wr(desc);
322 
323 		/* desc is NULL means we reach the last desc of this request */
324 		vhost_vring_packed_desc_get_next(&desc, &req_idx, vq, desc_table, desc_table_size);
325 		if (desc == NULL) {
326 			break;
327 		}
328 	}
329 
330 	/*
331 	 * There must be least two descriptors.
332 	 * First contain request so it must be readable.
333 	 * Last descriptor contain buffer for response so it must be writable.
334 	 */
335 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
336 		return -EINVAL;
337 	}
338 
339 	*length = len;
340 	*iovs_cnt = cnt;
341 
342 	return 0;
343 }
344 
345 static int
346 blk_iovs_packed_queue_setup(struct spdk_vhost_blk_session *bvsession,
347 			    struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
348 			    struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
349 {
350 	struct spdk_vhost_session *vsession = &bvsession->vsession;
351 	struct spdk_vhost_dev *vdev = vsession->vdev;
352 	struct vring_packed_desc *desc = NULL, *desc_table;
353 	uint32_t desc_table_size;
354 	int rc;
355 
356 	rc = vhost_vq_get_desc_packed(vsession, vq, req_idx, &desc,
357 				      &desc_table, &desc_table_size);
358 	if (spdk_unlikely(rc != 0)) {
359 		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
360 		return rc;
361 	}
362 
363 	return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
364 					  iovs, iovs_cnt, length);
365 }
366 
367 static int
368 blk_iovs_inflight_queue_setup(struct spdk_vhost_blk_session *bvsession,
369 			      struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
370 			      struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
371 {
372 	struct spdk_vhost_session *vsession = &bvsession->vsession;
373 	struct spdk_vhost_dev *vdev = vsession->vdev;
374 	spdk_vhost_inflight_desc *inflight_desc;
375 	struct vring_packed_desc *desc_table;
376 	uint16_t out_cnt = 0, cnt = 0;
377 	uint32_t desc_table_size, len = 0;
378 	int rc = 0;
379 
380 	rc = vhost_inflight_queue_get_desc(vsession, vq->vring_inflight.inflight_packed->desc,
381 					   req_idx, &inflight_desc, &desc_table, &desc_table_size);
382 	if (spdk_unlikely(rc != 0)) {
383 		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
384 		return rc;
385 	}
386 
387 	if (desc_table != NULL) {
388 		return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
389 						  iovs, iovs_cnt, length);
390 	}
391 
392 	while (1) {
393 		/*
394 		 * Maximum cnt reached?
395 		 * Should not happen if request is well formatted, otherwise this is a BUG.
396 		 */
397 		if (spdk_unlikely(cnt == *iovs_cnt)) {
398 			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
399 				    vsession->name, req_idx);
400 			return -EINVAL;
401 		}
402 
403 		if (spdk_unlikely(vhost_vring_inflight_desc_to_iov(vsession, iovs, &cnt, inflight_desc))) {
404 			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
405 				    vsession->name, req_idx, cnt);
406 			return -EINVAL;
407 		}
408 
409 		len += inflight_desc->len;
410 		out_cnt += vhost_vring_inflight_desc_is_wr(inflight_desc);
411 
412 		/* Without F_NEXT means it's the last desc */
413 		if ((inflight_desc->flags & VRING_DESC_F_NEXT) == 0) {
414 			break;
415 		}
416 
417 		inflight_desc = &vq->vring_inflight.inflight_packed->desc[inflight_desc->next];
418 	}
419 
420 	/*
421 	 * There must be least two descriptors.
422 	 * First contain request so it must be readable.
423 	 * Last descriptor contain buffer for response so it must be writable.
424 	 */
425 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
426 		return -EINVAL;
427 	}
428 
429 	*length = len;
430 	*iovs_cnt = cnt;
431 
432 	return 0;
433 }
434 
435 static void
436 blk_request_complete_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
437 {
438 	struct spdk_vhost_blk_task *task = cb_arg;
439 
440 	spdk_bdev_free_io(bdev_io);
441 	blk_request_finish(success ? VIRTIO_BLK_S_OK : VIRTIO_BLK_S_IOERR, task);
442 }
443 
444 static void
445 blk_request_resubmit(void *arg)
446 {
447 	struct spdk_vhost_blk_task *task = arg;
448 	int rc = 0;
449 
450 	rc = virtio_blk_process_request(task->bdev_io_wait_vdev, task->bdev_io_wait_ch, task);
451 	if (rc == 0) {
452 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p resubmitted ======\n", task);
453 	} else {
454 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p failed ======\n", task);
455 	}
456 }
457 
458 static inline void
459 blk_request_queue_io(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
460 		     struct spdk_vhost_blk_task *task)
461 {
462 	int rc;
463 	struct spdk_bdev *bdev = task->bdev_io->bdev;
464 
465 	task->bdev_io_wait.bdev = bdev;
466 	task->bdev_io_wait.cb_fn = blk_request_resubmit;
467 	task->bdev_io_wait.cb_arg = task;
468 	task->bdev_io_wait_ch = ch;
469 	task->bdev_io_wait_vdev = vdev;
470 
471 	rc = spdk_bdev_queue_io_wait(bdev, ch, &task->bdev_io_wait);
472 	if (rc != 0) {
473 		blk_request_finish(VIRTIO_BLK_S_IOERR, task);
474 	}
475 }
476 
477 static int
478 virtio_blk_process_request(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
479 			   struct spdk_vhost_blk_task *task)
480 {
481 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
482 	struct virtio_blk_outhdr req;
483 	struct virtio_blk_discard_write_zeroes *desc;
484 	struct iovec *iov;
485 	uint32_t type;
486 	uint64_t flush_bytes;
487 	uint32_t payload_len;
488 	uint16_t iovcnt;
489 	int rc;
490 
491 	iov = &task->iovs[0];
492 	if (spdk_unlikely(iov->iov_len != sizeof(req))) {
493 		SPDK_DEBUGLOG(vhost_blk,
494 			      "First descriptor size is %zu but expected %zu (task = %p).\n",
495 			      iov->iov_len, sizeof(req), task);
496 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
497 		return -1;
498 	}
499 
500 	/* Some SeaBIOS versions don't align the virtio_blk_outhdr on an 8-byte boundary, which
501 	 * triggers ubsan errors.  So copy this small 16-byte structure to the stack to workaround
502 	 * this problem.
503 	 */
504 	memcpy(&req, iov->iov_base, sizeof(req));
505 
506 	iov = &task->iovs[task->iovcnt - 1];
507 	if (spdk_unlikely(iov->iov_len != 1)) {
508 		SPDK_DEBUGLOG(vhost_blk,
509 			      "Last descriptor size is %zu but expected %d (task = %p).\n",
510 			      iov->iov_len, 1, task);
511 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
512 		return -1;
513 	}
514 
515 	payload_len = task->payload_size;
516 	task->status = iov->iov_base;
517 	payload_len -= sizeof(req) + sizeof(*task->status);
518 	iovcnt = task->iovcnt - 2;
519 
520 	type = req.type;
521 #ifdef VIRTIO_BLK_T_BARRIER
522 	/* Don't care about barrier for now (as QEMU's virtio-blk do). */
523 	type &= ~VIRTIO_BLK_T_BARRIER;
524 #endif
525 
526 	switch (type) {
527 	case VIRTIO_BLK_T_IN:
528 	case VIRTIO_BLK_T_OUT:
529 		if (spdk_unlikely(payload_len == 0 || (payload_len & (512 - 1)) != 0)) {
530 			SPDK_ERRLOG("%s - passed IO buffer is not multiple of 512b (task = %p).\n",
531 				    type ? "WRITE" : "READ", task);
532 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
533 			return -1;
534 		}
535 
536 		if (type == VIRTIO_BLK_T_IN) {
537 			task->used_len = payload_len + sizeof(*task->status);
538 			rc = spdk_bdev_readv(bvdev->bdev_desc, ch,
539 					     &task->iovs[1], iovcnt, req.sector * 512,
540 					     payload_len, blk_request_complete_cb, task);
541 		} else if (!bvdev->readonly) {
542 			task->used_len = sizeof(*task->status);
543 			rc = spdk_bdev_writev(bvdev->bdev_desc, ch,
544 					      &task->iovs[1], iovcnt, req.sector * 512,
545 					      payload_len, blk_request_complete_cb, task);
546 		} else {
547 			SPDK_DEBUGLOG(vhost_blk, "Device is in read-only mode!\n");
548 			rc = -1;
549 		}
550 
551 		if (rc) {
552 			if (rc == -ENOMEM) {
553 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
554 				blk_request_queue_io(vdev, ch, task);
555 			} else {
556 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
557 				return -1;
558 			}
559 		}
560 		break;
561 	case VIRTIO_BLK_T_DISCARD:
562 		desc = task->iovs[1].iov_base;
563 		if (payload_len != sizeof(*desc)) {
564 			SPDK_NOTICELOG("Invalid discard payload size: %u\n", payload_len);
565 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
566 			return -1;
567 		}
568 
569 		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
570 			SPDK_ERRLOG("UNMAP flag is only used for WRITE ZEROES command\n");
571 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
572 			return -1;
573 		}
574 
575 		rc = spdk_bdev_unmap(bvdev->bdev_desc, ch,
576 				     desc->sector * 512, desc->num_sectors * 512,
577 				     blk_request_complete_cb, task);
578 		if (rc) {
579 			if (rc == -ENOMEM) {
580 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
581 				blk_request_queue_io(vdev, ch, task);
582 			} else {
583 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
584 				return -1;
585 			}
586 		}
587 		break;
588 	case VIRTIO_BLK_T_WRITE_ZEROES:
589 		desc = task->iovs[1].iov_base;
590 		if (payload_len != sizeof(*desc)) {
591 			SPDK_NOTICELOG("Invalid write zeroes payload size: %u\n", payload_len);
592 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
593 			return -1;
594 		}
595 
596 		/* Unmap this range, SPDK doesn't support it, kernel will enable this flag by default
597 		 * without checking unmap feature is negotiated or not, the flag isn't mandatory, so
598 		 * just print a warning.
599 		 */
600 		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
601 			SPDK_WARNLOG("Ignore the unmap flag for WRITE ZEROES from %"PRIx64", len %"PRIx64"\n",
602 				     (uint64_t)desc->sector * 512, (uint64_t)desc->num_sectors * 512);
603 		}
604 
605 		rc = spdk_bdev_write_zeroes(bvdev->bdev_desc, ch,
606 					    desc->sector * 512, desc->num_sectors * 512,
607 					    blk_request_complete_cb, task);
608 		if (rc) {
609 			if (rc == -ENOMEM) {
610 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
611 				blk_request_queue_io(vdev, ch, task);
612 			} else {
613 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
614 				return -1;
615 			}
616 		}
617 		break;
618 	case VIRTIO_BLK_T_FLUSH:
619 		flush_bytes = spdk_bdev_get_num_blocks(bvdev->bdev) * spdk_bdev_get_block_size(bvdev->bdev);
620 		if (req.sector != 0) {
621 			SPDK_NOTICELOG("sector must be zero for flush command\n");
622 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
623 			return -1;
624 		}
625 		rc = spdk_bdev_flush(bvdev->bdev_desc, ch,
626 				     0, flush_bytes,
627 				     blk_request_complete_cb, task);
628 		if (rc) {
629 			if (rc == -ENOMEM) {
630 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
631 				blk_request_queue_io(vdev, ch, task);
632 			} else {
633 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
634 				return -1;
635 			}
636 		}
637 		break;
638 	case VIRTIO_BLK_T_GET_ID:
639 		if (!iovcnt || !payload_len) {
640 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
641 			return -1;
642 		}
643 		task->used_len = spdk_min((size_t)VIRTIO_BLK_ID_BYTES, task->iovs[1].iov_len);
644 		spdk_strcpy_pad(task->iovs[1].iov_base, spdk_bdev_get_name(bvdev->bdev),
645 				task->used_len, ' ');
646 		blk_request_finish(VIRTIO_BLK_S_OK, task);
647 		break;
648 	default:
649 		SPDK_DEBUGLOG(vhost_blk, "Not supported request type '%"PRIu32"'.\n", type);
650 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
651 		return -1;
652 	}
653 
654 	return 0;
655 }
656 
657 static void
658 process_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
659 {
660 	struct spdk_vhost_user_blk_task *task;
661 	struct spdk_vhost_blk_task *blk_task;
662 	int rc;
663 
664 	assert(vq->packed.packed_ring == false);
665 
666 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[req_idx];
667 	blk_task = &task->blk_task;
668 	if (spdk_unlikely(task->used)) {
669 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
670 			    task->bvsession->vsession.name, req_idx);
671 		blk_task->used_len = 0;
672 		blk_task_enqueue(task);
673 		return;
674 	}
675 
676 	task->bvsession->vsession.task_cnt++;
677 
678 	blk_task_init(task);
679 
680 	rc = blk_iovs_split_queue_setup(task->bvsession, vq, task->req_idx,
681 					blk_task->iovs, &blk_task->iovcnt, &blk_task->payload_size);
682 
683 	if (rc) {
684 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
685 		/* Only READ and WRITE are supported for now. */
686 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task);
687 		return;
688 	}
689 
690 	if (vhost_user_process_blk_request(task) == 0) {
691 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
692 			      req_idx);
693 	} else {
694 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, req_idx);
695 	}
696 }
697 
698 static void
699 process_packed_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
700 {
701 	struct spdk_vhost_user_blk_task *task;
702 	struct spdk_vhost_blk_task *blk_task;
703 	uint16_t task_idx = req_idx, num_descs;
704 	int rc;
705 
706 	assert(vq->packed.packed_ring);
707 
708 	/* Packed ring used the buffer_id as the task_idx to get task struct.
709 	 * In kernel driver, it uses the vq->free_head to set the buffer_id so the value
710 	 * must be in the range of 0 ~ vring.size. The free_head value must be unique
711 	 * in the outstanding requests.
712 	 * We can't use the req_idx as the task_idx because the desc can be reused in
713 	 * the next phase even when it's not completed in the previous phase. For example,
714 	 * At phase 0, last_used_idx was 2 and desc0 was not completed.Then after moving
715 	 * phase 1, last_avail_idx is updated to 1. In this case, req_idx can not be used
716 	 * as task_idx because we will know task[0]->used is true at phase 1.
717 	 * The split queue is quite different, the desc would insert into the free list when
718 	 * device completes the request, the driver gets the desc from the free list which
719 	 * ensures the req_idx is unique in the outstanding requests.
720 	 */
721 	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
722 
723 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
724 	blk_task = &task->blk_task;
725 	if (spdk_unlikely(task->used)) {
726 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
727 			    task->bvsession->vsession.name, task_idx);
728 		blk_task->used_len = 0;
729 		blk_task_enqueue(task);
730 		return;
731 	}
732 
733 	task->req_idx = req_idx;
734 	task->num_descs = num_descs;
735 	task->buffer_id = task_idx;
736 
737 	rte_vhost_set_inflight_desc_packed(task->bvsession->vsession.vid, vq->vring_idx,
738 					   req_idx, (req_idx + num_descs - 1) % vq->vring.size,
739 					   &task->inflight_head);
740 
741 	task->bvsession->vsession.task_cnt++;
742 
743 	blk_task_init(task);
744 
745 	rc = blk_iovs_packed_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
746 					 &blk_task->iovcnt,
747 					 &blk_task->payload_size);
748 	if (rc) {
749 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
750 		/* Only READ and WRITE are supported for now. */
751 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task);
752 		return;
753 	}
754 
755 	if (vhost_user_process_blk_request(task) == 0) {
756 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
757 			      task_idx);
758 	} else {
759 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
760 	}
761 }
762 
763 static void
764 process_packed_inflight_blk_task(struct spdk_vhost_virtqueue *vq,
765 				 uint16_t req_idx)
766 {
767 	spdk_vhost_inflight_desc *desc_array = vq->vring_inflight.inflight_packed->desc;
768 	spdk_vhost_inflight_desc *desc = &desc_array[req_idx];
769 	struct spdk_vhost_user_blk_task *task;
770 	struct spdk_vhost_blk_task *blk_task;
771 	uint16_t task_idx, num_descs;
772 	int rc;
773 
774 	task_idx = desc_array[desc->last].id;
775 	num_descs = desc->num;
776 	/* In packed ring reconnection, we use the last_used_idx as the
777 	 * initial value. So when we process the inflight descs we still
778 	 * need to update the available ring index.
779 	 */
780 	vq->last_avail_idx += num_descs;
781 	if (vq->last_avail_idx >= vq->vring.size) {
782 		vq->last_avail_idx -= vq->vring.size;
783 		vq->packed.avail_phase = !vq->packed.avail_phase;
784 	}
785 
786 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
787 	blk_task = &task->blk_task;
788 	if (spdk_unlikely(task->used)) {
789 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
790 			    task->bvsession->vsession.name, task_idx);
791 		blk_task->used_len = 0;
792 		blk_task_enqueue(task);
793 		return;
794 	}
795 
796 	task->req_idx = req_idx;
797 	task->num_descs = num_descs;
798 	task->buffer_id = task_idx;
799 	/* It's for cleaning inflight entries */
800 	task->inflight_head = req_idx;
801 
802 	task->bvsession->vsession.task_cnt++;
803 
804 	blk_task_init(task);
805 
806 	rc = blk_iovs_inflight_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
807 					   &blk_task->iovcnt,
808 					   &blk_task->payload_size);
809 	if (rc) {
810 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
811 		/* Only READ and WRITE are supported for now. */
812 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task);
813 		return;
814 	}
815 
816 	if (vhost_user_process_blk_request(task) == 0) {
817 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
818 			      task_idx);
819 	} else {
820 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
821 	}
822 }
823 
824 static void
825 submit_inflight_desc(struct spdk_vhost_blk_session *bvsession,
826 		     struct spdk_vhost_virtqueue *vq)
827 {
828 	struct spdk_vhost_session *vsession;
829 	spdk_vhost_resubmit_info *resubmit;
830 	spdk_vhost_resubmit_desc *resubmit_list;
831 	uint16_t req_idx;
832 	int i;
833 
834 	resubmit = vq->vring_inflight.resubmit_inflight;
835 	if (spdk_likely(resubmit == NULL || resubmit->resubmit_list == NULL ||
836 			resubmit->resubmit_num == 0)) {
837 		return;
838 	}
839 
840 	resubmit_list = resubmit->resubmit_list;
841 	vsession = &bvsession->vsession;
842 
843 	for (i = resubmit->resubmit_num - 1; i >= 0; --i) {
844 		req_idx = resubmit_list[i].index;
845 		SPDK_DEBUGLOG(vhost_blk, "====== Start processing resubmit request idx %"PRIu16"======\n",
846 			      req_idx);
847 
848 		if (spdk_unlikely(req_idx >= vq->vring.size)) {
849 			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
850 				    vsession->name, req_idx, vq->vring.size);
851 			vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
852 			continue;
853 		}
854 
855 		if (vq->packed.packed_ring) {
856 			process_packed_inflight_blk_task(vq, req_idx);
857 		} else {
858 			process_blk_task(vq, req_idx);
859 		}
860 	}
861 	resubmit->resubmit_num = 0;
862 }
863 
864 static void
865 process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
866 {
867 	struct spdk_vhost_session *vsession = &bvsession->vsession;
868 	uint16_t reqs[SPDK_VHOST_VQ_MAX_SUBMISSIONS];
869 	uint16_t reqs_cnt, i;
870 
871 	submit_inflight_desc(bvsession, vq);
872 
873 	reqs_cnt = vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
874 	if (!reqs_cnt) {
875 		return;
876 	}
877 
878 	for (i = 0; i < reqs_cnt; i++) {
879 		SPDK_DEBUGLOG(vhost_blk, "====== Starting processing request idx %"PRIu16"======\n",
880 			      reqs[i]);
881 
882 		if (spdk_unlikely(reqs[i] >= vq->vring.size)) {
883 			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
884 				    vsession->name, reqs[i], vq->vring.size);
885 			vhost_vq_used_ring_enqueue(vsession, vq, reqs[i], 0);
886 			continue;
887 		}
888 
889 		rte_vhost_set_inflight_desc_split(vsession->vid, vq->vring_idx, reqs[i]);
890 
891 		process_blk_task(vq, reqs[i]);
892 	}
893 }
894 
895 static void
896 process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
897 {
898 	uint16_t i = 0;
899 
900 	submit_inflight_desc(bvsession, vq);
901 
902 	while (i++ < SPDK_VHOST_VQ_MAX_SUBMISSIONS &&
903 	       vhost_vq_packed_ring_is_avail(vq)) {
904 		SPDK_DEBUGLOG(vhost_blk, "====== Starting processing request idx %"PRIu16"======\n",
905 			      vq->last_avail_idx);
906 
907 		process_packed_blk_task(vq, vq->last_avail_idx);
908 	}
909 }
910 
911 static int
912 _vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
913 {
914 	struct spdk_vhost_session *vsession = vq->vsession;
915 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
916 	bool packed_ring;
917 
918 	packed_ring = vq->packed.packed_ring;
919 	if (packed_ring) {
920 		process_packed_vq(bvsession, vq);
921 	} else {
922 		process_vq(bvsession, vq);
923 	}
924 
925 	vhost_session_vq_used_signal(vq);
926 
927 	return SPDK_POLLER_BUSY;
928 
929 }
930 
931 static int
932 vdev_vq_worker(void *arg)
933 {
934 	struct spdk_vhost_virtqueue *vq = arg;
935 
936 	return _vdev_vq_worker(vq);
937 }
938 
939 static int
940 vdev_worker(void *arg)
941 {
942 	struct spdk_vhost_blk_session *bvsession = arg;
943 	struct spdk_vhost_session *vsession = &bvsession->vsession;
944 	uint16_t q_idx;
945 
946 	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
947 		_vdev_vq_worker(&vsession->virtqueue[q_idx]);
948 	}
949 
950 	return SPDK_POLLER_BUSY;
951 }
952 
953 static void
954 no_bdev_process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
955 {
956 	struct spdk_vhost_session *vsession = &bvsession->vsession;
957 	struct iovec iovs[SPDK_VHOST_IOVS_MAX];
958 	uint32_t length;
959 	uint16_t iovcnt, req_idx;
960 
961 	if (vhost_vq_avail_ring_get(vq, &req_idx, 1) != 1) {
962 		return;
963 	}
964 
965 	iovcnt = SPDK_COUNTOF(iovs);
966 	if (blk_iovs_split_queue_setup(bvsession, vq, req_idx, iovs, &iovcnt, &length) == 0) {
967 		*(volatile uint8_t *)iovs[iovcnt - 1].iov_base = VIRTIO_BLK_S_IOERR;
968 		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
969 	}
970 
971 	vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
972 }
973 
974 static void
975 no_bdev_process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
976 {
977 	struct spdk_vhost_session *vsession = &bvsession->vsession;
978 	struct spdk_vhost_user_blk_task *task;
979 	struct spdk_vhost_blk_task *blk_task;
980 	uint32_t length;
981 	uint16_t req_idx = vq->last_avail_idx;
982 	uint16_t task_idx, num_descs;
983 
984 	if (!vhost_vq_packed_ring_is_avail(vq)) {
985 		return;
986 	}
987 
988 	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
989 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
990 	blk_task = &task->blk_task;
991 	if (spdk_unlikely(task->used)) {
992 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
993 			    vsession->name, req_idx);
994 		vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
995 					     task->buffer_id, blk_task->used_len,
996 					     task->inflight_head);
997 		return;
998 	}
999 
1000 	task->req_idx = req_idx;
1001 	task->num_descs = num_descs;
1002 	task->buffer_id = task_idx;
1003 	blk_task_init(task);
1004 
1005 	if (blk_iovs_packed_queue_setup(bvsession, vq, task->req_idx, blk_task->iovs, &blk_task->iovcnt,
1006 					&length)) {
1007 		*(volatile uint8_t *)(blk_task->iovs[blk_task->iovcnt - 1].iov_base) = VIRTIO_BLK_S_IOERR;
1008 		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
1009 	}
1010 
1011 	task->used = false;
1012 	vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
1013 				     task->buffer_id, blk_task->used_len,
1014 				     task->inflight_head);
1015 }
1016 
1017 static int
1018 _no_bdev_vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
1019 {
1020 	struct spdk_vhost_session *vsession = vq->vsession;
1021 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1022 	bool packed_ring;
1023 
1024 	packed_ring = vq->packed.packed_ring;
1025 	if (packed_ring) {
1026 		no_bdev_process_packed_vq(bvsession, vq);
1027 	} else {
1028 		no_bdev_process_vq(bvsession, vq);
1029 	}
1030 
1031 	vhost_session_vq_used_signal(vq);
1032 
1033 	if (vsession->task_cnt == 0 && bvsession->io_channel) {
1034 		vhost_blk_put_io_channel(bvsession->io_channel);
1035 		bvsession->io_channel = NULL;
1036 	}
1037 
1038 	return SPDK_POLLER_BUSY;
1039 }
1040 
1041 static int
1042 no_bdev_vdev_vq_worker(void *arg)
1043 {
1044 	struct spdk_vhost_virtqueue *vq = arg;
1045 
1046 	return _no_bdev_vdev_vq_worker(vq);
1047 }
1048 
1049 static int
1050 no_bdev_vdev_worker(void *arg)
1051 {
1052 	struct spdk_vhost_blk_session *bvsession = arg;
1053 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1054 	uint16_t q_idx;
1055 
1056 	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
1057 		_no_bdev_vdev_vq_worker(&vsession->virtqueue[q_idx]);
1058 	}
1059 
1060 	return SPDK_POLLER_BUSY;
1061 }
1062 
1063 static void
1064 vhost_blk_session_unregister_interrupts(struct spdk_vhost_blk_session *bvsession)
1065 {
1066 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1067 	struct spdk_vhost_virtqueue *vq;
1068 	int i;
1069 
1070 	SPDK_DEBUGLOG(vhost_blk, "unregister virtqueues interrupt\n");
1071 	for (i = 0; i < vsession->max_queues; i++) {
1072 		vq = &vsession->virtqueue[i];
1073 		if (vq->intr == NULL) {
1074 			break;
1075 		}
1076 
1077 		SPDK_DEBUGLOG(vhost_blk, "unregister vq[%d]'s kickfd is %d\n",
1078 			      i, vq->vring.kickfd);
1079 		spdk_interrupt_unregister(&vq->intr);
1080 	}
1081 }
1082 
1083 static int
1084 vhost_blk_session_register_interrupts(struct spdk_vhost_blk_session *bvsession,
1085 				      spdk_interrupt_fn fn, const char *name)
1086 {
1087 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1088 	struct spdk_vhost_virtqueue *vq = NULL;
1089 	int i;
1090 
1091 	SPDK_DEBUGLOG(vhost_blk, "Register virtqueues interrupt\n");
1092 	for (i = 0; i < vsession->max_queues; i++) {
1093 		vq = &vsession->virtqueue[i];
1094 		SPDK_DEBUGLOG(vhost_blk, "Register vq[%d]'s kickfd is %d\n",
1095 			      i, vq->vring.kickfd);
1096 
1097 		vq->intr = spdk_interrupt_register(vq->vring.kickfd, fn, vq, name);
1098 		if (vq->intr == NULL) {
1099 			SPDK_ERRLOG("Fail to register req notifier handler.\n");
1100 			goto err;
1101 		}
1102 	}
1103 
1104 	return 0;
1105 
1106 err:
1107 	vhost_blk_session_unregister_interrupts(bvsession);
1108 
1109 	return -1;
1110 }
1111 
1112 static void
1113 vhost_blk_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1114 {
1115 	struct spdk_vhost_blk_session *bvsession = cb_arg;
1116 
1117 	vhost_user_session_set_interrupt_mode(&bvsession->vsession, interrupt_mode);
1118 }
1119 
1120 static int
1121 vhost_session_bdev_resize_cb(struct spdk_vhost_dev *vdev,
1122 			     struct spdk_vhost_session *vsession,
1123 			     void *ctx)
1124 {
1125 #if RTE_VERSION >= RTE_VERSION_NUM(20, 02, 0, 0)
1126 	SPDK_NOTICELOG("bdev send slave msg to vid(%d)\n", vsession->vid);
1127 	rte_vhost_slave_config_change(vsession->vid, false);
1128 #else
1129 	SPDK_NOTICELOG("bdev does not support resize until DPDK submodule version >= 20.02\n");
1130 #endif
1131 
1132 	return 0;
1133 }
1134 
1135 static void
1136 blk_resize_cb(void *resize_ctx)
1137 {
1138 	struct spdk_vhost_blk_dev *bvdev = resize_ctx;
1139 
1140 	spdk_vhost_lock();
1141 	vhost_user_dev_foreach_session(&bvdev->vdev, vhost_session_bdev_resize_cb,
1142 				       NULL, NULL);
1143 	spdk_vhost_unlock();
1144 }
1145 
1146 static void
1147 vhost_dev_bdev_remove_cpl_cb(struct spdk_vhost_dev *vdev, void *ctx)
1148 {
1149 
1150 	/* All sessions have been notified, time to close the bdev */
1151 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1152 
1153 	assert(bvdev != NULL);
1154 	spdk_put_io_channel(bvdev->dummy_io_channel);
1155 	spdk_bdev_close(bvdev->bdev_desc);
1156 	bvdev->bdev_desc = NULL;
1157 	bvdev->bdev = NULL;
1158 }
1159 
1160 static int
1161 vhost_session_bdev_remove_cb(struct spdk_vhost_dev *vdev,
1162 			     struct spdk_vhost_session *vsession,
1163 			     void *ctx)
1164 {
1165 	struct spdk_vhost_blk_session *bvsession;
1166 	int rc;
1167 
1168 	bvsession = to_blk_session(vsession);
1169 	if (bvsession->requestq_poller) {
1170 		spdk_poller_unregister(&bvsession->requestq_poller);
1171 		if (vsession->virtqueue[0].intr) {
1172 			vhost_blk_session_unregister_interrupts(bvsession);
1173 			rc = vhost_blk_session_register_interrupts(bvsession, no_bdev_vdev_vq_worker,
1174 					"no_bdev_vdev_vq_worker");
1175 			if (rc) {
1176 				SPDK_ERRLOG("%s: Interrupt register failed\n", vsession->name);
1177 				return rc;
1178 			}
1179 		}
1180 
1181 		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
1182 		spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
1183 					       bvsession);
1184 	}
1185 
1186 	return 0;
1187 }
1188 
1189 static void
1190 bdev_remove_cb(void *remove_ctx)
1191 {
1192 	struct spdk_vhost_blk_dev *bvdev = remove_ctx;
1193 
1194 	SPDK_WARNLOG("%s: hot-removing bdev - all further requests will fail.\n",
1195 		     bvdev->vdev.name);
1196 
1197 	spdk_vhost_lock();
1198 	vhost_user_dev_foreach_session(&bvdev->vdev, vhost_session_bdev_remove_cb,
1199 				       vhost_dev_bdev_remove_cpl_cb, NULL);
1200 	spdk_vhost_unlock();
1201 }
1202 
1203 static void
1204 bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1205 	      void *event_ctx)
1206 {
1207 	SPDK_DEBUGLOG(vhost_blk, "Bdev event: type %d, name %s\n",
1208 		      type,
1209 		      bdev->name);
1210 
1211 	switch (type) {
1212 	case SPDK_BDEV_EVENT_REMOVE:
1213 		SPDK_NOTICELOG("bdev name (%s) received event(SPDK_BDEV_EVENT_REMOVE)\n", bdev->name);
1214 		bdev_remove_cb(event_ctx);
1215 		break;
1216 	case SPDK_BDEV_EVENT_RESIZE:
1217 		SPDK_NOTICELOG("bdev name (%s) received event(SPDK_BDEV_EVENT_RESIZE)\n", bdev->name);
1218 		blk_resize_cb(event_ctx);
1219 		break;
1220 	default:
1221 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1222 		break;
1223 	}
1224 }
1225 
1226 static void
1227 free_task_pool(struct spdk_vhost_blk_session *bvsession)
1228 {
1229 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1230 	struct spdk_vhost_virtqueue *vq;
1231 	uint16_t i;
1232 
1233 	for (i = 0; i < vsession->max_queues; i++) {
1234 		vq = &vsession->virtqueue[i];
1235 		if (vq->tasks == NULL) {
1236 			continue;
1237 		}
1238 
1239 		spdk_free(vq->tasks);
1240 		vq->tasks = NULL;
1241 	}
1242 }
1243 
1244 static int
1245 alloc_task_pool(struct spdk_vhost_blk_session *bvsession)
1246 {
1247 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1248 	struct spdk_vhost_virtqueue *vq;
1249 	struct spdk_vhost_user_blk_task *task;
1250 	uint32_t task_cnt;
1251 	uint16_t i;
1252 	uint32_t j;
1253 
1254 	for (i = 0; i < vsession->max_queues; i++) {
1255 		vq = &vsession->virtqueue[i];
1256 		if (vq->vring.desc == NULL) {
1257 			continue;
1258 		}
1259 
1260 		task_cnt = vq->vring.size;
1261 		if (task_cnt > SPDK_VHOST_MAX_VQ_SIZE) {
1262 			/* sanity check */
1263 			SPDK_ERRLOG("%s: virtqueue %"PRIu16" is too big. (size = %"PRIu32", max = %"PRIu32")\n",
1264 				    vsession->name, i, task_cnt, SPDK_VHOST_MAX_VQ_SIZE);
1265 			free_task_pool(bvsession);
1266 			return -1;
1267 		}
1268 		vq->tasks = spdk_zmalloc(sizeof(struct spdk_vhost_user_blk_task) * task_cnt,
1269 					 SPDK_CACHE_LINE_SIZE, NULL,
1270 					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1271 		if (vq->tasks == NULL) {
1272 			SPDK_ERRLOG("%s: failed to allocate %"PRIu32" tasks for virtqueue %"PRIu16"\n",
1273 				    vsession->name, task_cnt, i);
1274 			free_task_pool(bvsession);
1275 			return -1;
1276 		}
1277 
1278 		for (j = 0; j < task_cnt; j++) {
1279 			task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[j];
1280 			task->bvsession = bvsession;
1281 			task->req_idx = j;
1282 			task->vq = vq;
1283 		}
1284 	}
1285 
1286 	return 0;
1287 }
1288 
1289 static int
1290 vhost_blk_start_cb(struct spdk_vhost_dev *vdev,
1291 		   struct spdk_vhost_session *vsession, void *unused)
1292 {
1293 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1294 	struct spdk_vhost_blk_dev *bvdev;
1295 	int i, rc = 0;
1296 
1297 	bvdev = to_blk_dev(vdev);
1298 	assert(bvdev != NULL);
1299 	bvsession->bvdev = bvdev;
1300 
1301 	/* validate all I/O queues are in a contiguous index range */
1302 	for (i = 0; i < vsession->max_queues; i++) {
1303 		/* vring.desc and vring.desc_packed are in a union struct
1304 		 * so q->vring.desc can replace q->vring.desc_packed.
1305 		 */
1306 		if (vsession->virtqueue[i].vring.desc == NULL) {
1307 			SPDK_ERRLOG("%s: queue %"PRIu32" is empty\n", vsession->name, i);
1308 			rc = -1;
1309 			goto out;
1310 		}
1311 	}
1312 
1313 	rc = alloc_task_pool(bvsession);
1314 	if (rc != 0) {
1315 		SPDK_ERRLOG("%s: failed to alloc task pool.\n", vsession->name);
1316 		goto out;
1317 	}
1318 
1319 	if (bvdev->bdev) {
1320 		bvsession->io_channel = vhost_blk_get_io_channel(vdev);
1321 		if (!bvsession->io_channel) {
1322 			free_task_pool(bvsession);
1323 			SPDK_ERRLOG("%s: I/O channel allocation failed\n", vsession->name);
1324 			rc = -1;
1325 			goto out;
1326 		}
1327 	}
1328 
1329 	if (spdk_interrupt_mode_is_enabled()) {
1330 		if (bvdev->bdev) {
1331 			rc = vhost_blk_session_register_interrupts(bvsession,
1332 					vdev_vq_worker,
1333 					"vdev_vq_worker");
1334 		} else {
1335 			rc = vhost_blk_session_register_interrupts(bvsession,
1336 					no_bdev_vdev_vq_worker,
1337 					"no_bdev_vdev_vq_worker");
1338 		}
1339 
1340 		if (rc) {
1341 			SPDK_ERRLOG("%s: Interrupt register failed\n", vsession->name);
1342 			goto out;
1343 		}
1344 	}
1345 
1346 	if (bvdev->bdev) {
1347 		bvsession->requestq_poller = SPDK_POLLER_REGISTER(vdev_worker, bvsession, 0);
1348 	} else {
1349 		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
1350 	}
1351 	SPDK_INFOLOG(vhost, "%s: started poller on lcore %d\n",
1352 		     vsession->name, spdk_env_get_current_core());
1353 
1354 	spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
1355 				       bvsession);
1356 
1357 out:
1358 	vhost_user_session_start_done(vsession, rc);
1359 	return rc;
1360 }
1361 
1362 static int
1363 vhost_blk_start(struct spdk_vhost_session *vsession)
1364 {
1365 	return vhost_user_session_send_event(vsession, vhost_blk_start_cb,
1366 					     3, "start session");
1367 }
1368 
1369 static int
1370 destroy_session_poller_cb(void *arg)
1371 {
1372 	struct spdk_vhost_blk_session *bvsession = arg;
1373 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1374 	int i;
1375 
1376 	if (vsession->task_cnt > 0 || spdk_vhost_trylock() != 0) {
1377 		assert(vsession->stop_retry_count > 0);
1378 		vsession->stop_retry_count--;
1379 		if (vsession->stop_retry_count == 0) {
1380 			SPDK_ERRLOG("%s: Timedout when destroy session (task_cnt %d)\n", vsession->name,
1381 				    vsession->task_cnt);
1382 			spdk_poller_unregister(&bvsession->stop_poller);
1383 			vhost_user_session_stop_done(vsession, -ETIMEDOUT);
1384 		}
1385 
1386 		return SPDK_POLLER_BUSY;
1387 	}
1388 
1389 	for (i = 0; i < vsession->max_queues; i++) {
1390 		vsession->virtqueue[i].next_event_time = 0;
1391 		vhost_vq_used_signal(vsession, &vsession->virtqueue[i]);
1392 	}
1393 
1394 	SPDK_INFOLOG(vhost, "%s: stopping poller on lcore %d\n",
1395 		     vsession->name, spdk_env_get_current_core());
1396 
1397 	if (bvsession->io_channel) {
1398 		vhost_blk_put_io_channel(bvsession->io_channel);
1399 		bvsession->io_channel = NULL;
1400 	}
1401 
1402 	free_task_pool(bvsession);
1403 	spdk_poller_unregister(&bvsession->stop_poller);
1404 	vhost_user_session_stop_done(vsession, 0);
1405 
1406 	spdk_vhost_unlock();
1407 	return SPDK_POLLER_BUSY;
1408 }
1409 
1410 static int
1411 vhost_blk_stop_cb(struct spdk_vhost_dev *vdev,
1412 		  struct spdk_vhost_session *vsession, void *unused)
1413 {
1414 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1415 
1416 	spdk_poller_unregister(&bvsession->requestq_poller);
1417 
1418 	if (vsession->virtqueue[0].intr) {
1419 		vhost_blk_session_unregister_interrupts(bvsession);
1420 	}
1421 
1422 	/* vhost_user_session_send_event timeout is 3 seconds, here set retry within 4 seconds */
1423 	bvsession->vsession.stop_retry_count = 4000;
1424 	bvsession->stop_poller = SPDK_POLLER_REGISTER(destroy_session_poller_cb,
1425 				 bvsession, 1000);
1426 	return 0;
1427 }
1428 
1429 static int
1430 vhost_blk_stop(struct spdk_vhost_session *vsession)
1431 {
1432 	return vhost_user_session_send_event(vsession, vhost_blk_stop_cb,
1433 					     3, "stop session");
1434 }
1435 
1436 static void
1437 vhost_blk_dump_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1438 {
1439 	struct spdk_vhost_blk_dev *bvdev;
1440 
1441 	bvdev = to_blk_dev(vdev);
1442 	assert(bvdev != NULL);
1443 
1444 	spdk_json_write_named_object_begin(w, "block");
1445 
1446 	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
1447 
1448 	spdk_json_write_name(w, "bdev");
1449 	if (bvdev->bdev) {
1450 		spdk_json_write_string(w, spdk_bdev_get_name(bvdev->bdev));
1451 	} else {
1452 		spdk_json_write_null(w);
1453 	}
1454 
1455 	spdk_json_write_object_end(w);
1456 }
1457 
1458 static void
1459 vhost_blk_write_config_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1460 {
1461 	struct spdk_vhost_blk_dev *bvdev;
1462 
1463 	bvdev = to_blk_dev(vdev);
1464 	assert(bvdev != NULL);
1465 
1466 	if (!bvdev->bdev) {
1467 		return;
1468 	}
1469 
1470 	spdk_json_write_object_begin(w);
1471 	spdk_json_write_named_string(w, "method", "vhost_create_blk_controller");
1472 
1473 	spdk_json_write_named_object_begin(w, "params");
1474 	spdk_json_write_named_string(w, "ctrlr", vdev->name);
1475 	spdk_json_write_named_string(w, "dev_name", spdk_bdev_get_name(bvdev->bdev));
1476 	spdk_json_write_named_string(w, "cpumask",
1477 				     spdk_cpuset_fmt(spdk_thread_get_cpumask(vdev->thread)));
1478 	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
1479 	spdk_json_write_object_end(w);
1480 
1481 	spdk_json_write_object_end(w);
1482 }
1483 
1484 static int vhost_blk_destroy(struct spdk_vhost_dev *dev);
1485 
1486 static int
1487 vhost_blk_get_config(struct spdk_vhost_dev *vdev, uint8_t *config,
1488 		     uint32_t len)
1489 {
1490 	struct virtio_blk_config blkcfg;
1491 	struct spdk_vhost_blk_dev *bvdev;
1492 	struct spdk_bdev *bdev;
1493 	uint32_t blk_size;
1494 	uint64_t blkcnt;
1495 
1496 	memset(&blkcfg, 0, sizeof(blkcfg));
1497 	bvdev = to_blk_dev(vdev);
1498 	assert(bvdev != NULL);
1499 	bdev = bvdev->bdev;
1500 	if (bdev == NULL) {
1501 		/* We can't just return -1 here as this GET_CONFIG message might
1502 		 * be caused by a QEMU VM reboot. Returning -1 will indicate an
1503 		 * error to QEMU, who might then decide to terminate itself.
1504 		 * We don't want that. A simple reboot shouldn't break the system.
1505 		 *
1506 		 * Presenting a block device with block size 0 and block count 0
1507 		 * doesn't cause any problems on QEMU side and the virtio-pci
1508 		 * device is even still available inside the VM, but there will
1509 		 * be no block device created for it - the kernel drivers will
1510 		 * silently reject it.
1511 		 */
1512 		blk_size = 0;
1513 		blkcnt = 0;
1514 	} else {
1515 		blk_size = spdk_bdev_get_block_size(bdev);
1516 		blkcnt = spdk_bdev_get_num_blocks(bdev);
1517 		if (spdk_bdev_get_buf_align(bdev) > 1) {
1518 			blkcfg.size_max = SPDK_BDEV_LARGE_BUF_MAX_SIZE;
1519 			blkcfg.seg_max = spdk_min(SPDK_VHOST_IOVS_MAX - 2 - 1, BDEV_IO_NUM_CHILD_IOV - 2 - 1);
1520 		} else {
1521 			blkcfg.size_max = 131072;
1522 			/*  -2 for REQ and RESP and -1 for region boundary splitting */
1523 			blkcfg.seg_max = SPDK_VHOST_IOVS_MAX - 2 - 1;
1524 		}
1525 	}
1526 
1527 	blkcfg.blk_size = blk_size;
1528 	/* minimum I/O size in blocks */
1529 	blkcfg.min_io_size = 1;
1530 	/* expressed in 512 Bytes sectors */
1531 	blkcfg.capacity = (blkcnt * blk_size) / 512;
1532 	/* QEMU can overwrite this value when started */
1533 	blkcfg.num_queues = SPDK_VHOST_MAX_VQUEUES;
1534 
1535 	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1536 		/* 16MiB, expressed in 512 Bytes */
1537 		blkcfg.max_discard_sectors = 32768;
1538 		blkcfg.max_discard_seg = 1;
1539 		blkcfg.discard_sector_alignment = blk_size / 512;
1540 	}
1541 	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1542 		blkcfg.max_write_zeroes_sectors = 32768;
1543 		blkcfg.max_write_zeroes_seg = 1;
1544 	}
1545 
1546 	memcpy(config, &blkcfg, spdk_min(len, sizeof(blkcfg)));
1547 
1548 	return 0;
1549 }
1550 
1551 static const struct spdk_vhost_user_dev_backend vhost_blk_user_device_backend = {
1552 	.session_ctx_size = sizeof(struct spdk_vhost_blk_session) - sizeof(struct spdk_vhost_session),
1553 	.start_session =  vhost_blk_start,
1554 	.stop_session = vhost_blk_stop,
1555 };
1556 
1557 static const struct spdk_vhost_dev_backend vhost_blk_device_backend = {
1558 	.vhost_get_config = vhost_blk_get_config,
1559 	.dump_info_json = vhost_blk_dump_info_json,
1560 	.write_config_json = vhost_blk_write_config_json,
1561 	.remove_device = vhost_blk_destroy,
1562 };
1563 
1564 int
1565 spdk_vhost_blk_construct(const char *name, const char *cpumask, const char *dev_name,
1566 			 const struct spdk_json_val *params)
1567 {
1568 	struct rpc_vhost_blk req = {0};
1569 	struct spdk_vhost_blk_dev *bvdev = NULL;
1570 	struct spdk_vhost_dev *vdev;
1571 	struct spdk_bdev *bdev;
1572 	int ret = 0;
1573 
1574 	spdk_vhost_lock();
1575 
1576 	if (spdk_json_decode_object_relaxed(params, rpc_construct_vhost_blk,
1577 					    SPDK_COUNTOF(rpc_construct_vhost_blk),
1578 					    &req)) {
1579 		SPDK_DEBUGLOG(vhost_blk, "spdk_json_decode_object failed\n");
1580 		ret = -EINVAL;
1581 		goto out;
1582 	}
1583 
1584 	bvdev = calloc(1, sizeof(*bvdev));
1585 	if (bvdev == NULL) {
1586 		ret = -ENOMEM;
1587 		goto out;
1588 	}
1589 
1590 	ret = spdk_bdev_open_ext(dev_name, true, bdev_event_cb, bvdev, &bvdev->bdev_desc);
1591 	if (ret != 0) {
1592 		SPDK_ERRLOG("%s: could not open bdev '%s', error=%d\n",
1593 			    name, dev_name, ret);
1594 		goto out;
1595 	}
1596 	bdev = spdk_bdev_desc_get_bdev(bvdev->bdev_desc);
1597 
1598 	vdev = &bvdev->vdev;
1599 	vdev->virtio_features = SPDK_VHOST_BLK_FEATURES_BASE;
1600 	vdev->disabled_features = SPDK_VHOST_BLK_DISABLED_FEATURES;
1601 	vdev->protocol_features = SPDK_VHOST_BLK_PROTOCOL_FEATURES;
1602 	vdev->packed_ring_recovery = false;
1603 
1604 	if (req.packed_ring) {
1605 		vdev->virtio_features |= (uint64_t)req.packed_ring << VIRTIO_F_RING_PACKED;
1606 		vdev->packed_ring_recovery = req.packed_ring_recovery;
1607 	}
1608 
1609 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1610 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_DISCARD);
1611 	}
1612 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1613 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_WRITE_ZEROES);
1614 	}
1615 	if (req.readonly) {
1616 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_RO);
1617 	}
1618 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
1619 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_FLUSH);
1620 	}
1621 
1622 	/*
1623 	 * When starting qemu with multiqueue enable, the vhost device will
1624 	 * be started/stopped many times, related to the queues num, as the
1625 	 * exact number of queues used for this device is not known at the time.
1626 	 * The target has to stop and start the device once got a valid IO queue.
1627 	 * When stoping and starting the vhost device, the backend bdev io device
1628 	 * will be deleted and created repeatedly.
1629 	 * Hold a bdev reference so that in the struct spdk_vhost_blk_dev, so that
1630 	 * the io device will not be deleted.
1631 	 */
1632 	bvdev->dummy_io_channel = spdk_bdev_get_io_channel(bvdev->bdev_desc);
1633 
1634 	bvdev->bdev = bdev;
1635 	bvdev->readonly = req.readonly;
1636 	ret = vhost_dev_register(vdev, name, cpumask, &vhost_blk_device_backend,
1637 				 &vhost_blk_user_device_backend);
1638 	if (ret != 0) {
1639 		spdk_put_io_channel(bvdev->dummy_io_channel);
1640 		spdk_bdev_close(bvdev->bdev_desc);
1641 		goto out;
1642 	}
1643 
1644 	SPDK_INFOLOG(vhost, "%s: using bdev '%s'\n", name, dev_name);
1645 out:
1646 	if (ret != 0 && bvdev) {
1647 		free(bvdev);
1648 	}
1649 	spdk_vhost_unlock();
1650 	return ret;
1651 }
1652 
1653 static int
1654 vhost_blk_destroy(struct spdk_vhost_dev *vdev)
1655 {
1656 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1657 	int rc;
1658 
1659 	assert(bvdev != NULL);
1660 
1661 	rc = vhost_dev_unregister(&bvdev->vdev);
1662 	if (rc != 0) {
1663 		return rc;
1664 	}
1665 
1666 	/* if the bdev is removed, don't need call spdk_put_io_channel. */
1667 	if (bvdev->bdev) {
1668 		spdk_put_io_channel(bvdev->dummy_io_channel);
1669 	}
1670 
1671 	if (bvdev->bdev_desc) {
1672 		spdk_bdev_close(bvdev->bdev_desc);
1673 		bvdev->bdev_desc = NULL;
1674 	}
1675 	bvdev->bdev = NULL;
1676 
1677 	free(bvdev);
1678 	return 0;
1679 }
1680 
1681 struct spdk_io_channel *
1682 vhost_blk_get_io_channel(struct spdk_vhost_dev *vdev)
1683 {
1684 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1685 
1686 	return spdk_bdev_get_io_channel(bvdev->bdev_desc);
1687 }
1688 
1689 void
1690 vhost_blk_put_io_channel(struct spdk_io_channel *ch)
1691 {
1692 	spdk_put_io_channel(ch);
1693 }
1694 
1695 SPDK_LOG_REGISTER_COMPONENT(vhost_blk)
1696 SPDK_LOG_REGISTER_COMPONENT(vhost_blk_data)
1697