/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 */

#include <linux/virtio_blk.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/bdev_module.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/vhost.h"

#include "vhost_internal.h"
#include <rte_version.h>

/* Minimal set of features supported by every SPDK VHOST-BLK device */
#define SPDK_VHOST_BLK_FEATURES_BASE (SPDK_VHOST_FEATURES | \
		(1ULL << VIRTIO_BLK_F_SIZE_MAX) | (1ULL << VIRTIO_BLK_F_SEG_MAX) | \
		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_BLK_SIZE) | \
		(1ULL << VIRTIO_BLK_F_TOPOLOGY) | (1ULL << VIRTIO_BLK_F_BARRIER)  | \
		(1ULL << VIRTIO_BLK_F_SCSI)     | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
		(1ULL << VIRTIO_BLK_F_MQ))

/* Features not supported by SPDK vhost-blk */
#define SPDK_VHOST_BLK_DISABLED_FEATURES (SPDK_VHOST_DISABLED_FEATURES | \
		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
		(1ULL << VIRTIO_BLK_F_BARRIER)  | (1ULL << VIRTIO_BLK_F_SCSI))

/* Protocol features supported by vhost-blk */
#define SPDK_VHOST_BLK_PROTOCOL_FEATURES ((1ULL << VHOST_USER_PROTOCOL_F_CONFIG) | \
		(1ULL << VHOST_USER_PROTOCOL_F_INFLIGHT_SHMFD))
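
/*
 * Illustrative sketch (not part of the driver logic): once a session's
 * features are negotiated, an individual feature can be tested with a plain
 * bit check against these masks, e.g. assuming a negotiated_features value
 * holding the guest-acked bits:
 *
 *     bool mq_negotiated = !!(negotiated_features & (1ULL << VIRTIO_BLK_F_MQ));
 */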

#define VIRTIO_BLK_DEFAULT_TRANSPORT "vhost_user_blk"

struct spdk_vhost_user_blk_task {
	struct spdk_vhost_blk_task blk_task;
	struct spdk_vhost_blk_session *bvsession;
	struct spdk_vhost_virtqueue *vq;

	uint16_t req_idx;
	uint16_t num_descs;
	uint16_t buffer_id;
	uint16_t inflight_head;

	/* If set, the task is currently used for I/O processing. */
	bool used;
};

struct spdk_vhost_blk_dev {
	struct spdk_vhost_dev vdev;
	struct spdk_bdev *bdev;
	struct spdk_bdev_desc *bdev_desc;
	const struct spdk_virtio_blk_transport_ops *ops;

	/* dummy_io_channel is used to hold a bdev reference */
	struct spdk_io_channel *dummy_io_channel;
	bool readonly;
};

struct spdk_vhost_blk_session {
	/* The parent session must be the very first field in this struct */
	struct spdk_vhost_session vsession;
	struct spdk_vhost_blk_dev *bvdev;
	struct spdk_poller *requestq_poller;
	struct spdk_io_channel *io_channel;
	struct spdk_poller *stop_poller;
};

/* forward declaration */
static const struct spdk_vhost_dev_backend vhost_blk_device_backend;

static void vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task,
		void *cb_arg);

static int
vhost_user_process_blk_request(struct spdk_vhost_user_blk_task *user_task)
{
	struct spdk_vhost_blk_session *bvsession = user_task->bvsession;
	struct spdk_vhost_dev *vdev = &bvsession->bvdev->vdev;

	return virtio_blk_process_request(vdev, bvsession->io_channel, &user_task->blk_task,
					  vhost_user_blk_request_finish, NULL);
}

static struct spdk_vhost_blk_dev *
to_blk_dev(struct spdk_vhost_dev *vdev)
{
	if (vdev == NULL) {
		return NULL;
	}

	if (vdev->backend->type != VHOST_BACKEND_BLK) {
		SPDK_ERRLOG("%s: not a vhost-blk device\n", vdev->name);
		return NULL;
	}

	return SPDK_CONTAINEROF(vdev, struct spdk_vhost_blk_dev, vdev);
}

struct spdk_bdev *
vhost_blk_get_bdev(struct spdk_vhost_dev *vdev)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	assert(bvdev != NULL);

	return bvdev->bdev;
}

static struct spdk_vhost_blk_session *
to_blk_session(struct spdk_vhost_session *vsession)
{
	assert(vsession->vdev->backend->type == VHOST_BACKEND_BLK);
	return (struct spdk_vhost_blk_session *)vsession;
}

static void
blk_task_finish(struct spdk_vhost_user_blk_task *task)
{
	assert(task->bvsession->vsession.task_cnt > 0);
	task->bvsession->vsession.task_cnt--;
	task->used = false;
}

static void
blk_task_init(struct spdk_vhost_user_blk_task *task)
{
	struct spdk_vhost_blk_task *blk_task = &task->blk_task;

	task->used = true;
	blk_task->iovcnt = SPDK_COUNTOF(blk_task->iovs);
	blk_task->status = NULL;
	blk_task->used_len = 0;
	blk_task->payload_size = 0;
}

static void
blk_task_enqueue(struct spdk_vhost_user_blk_task *task)
{
	if (task->vq->packed.packed_ring) {
		vhost_vq_packed_ring_enqueue(&task->bvsession->vsession, task->vq,
					     task->num_descs,
					     task->buffer_id, task->blk_task.used_len,
					     task->inflight_head);
	} else {
		vhost_vq_used_ring_enqueue(&task->bvsession->vsession, task->vq,
					   task->req_idx, task->blk_task.used_len);
	}
}

static void
vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task, void *cb_arg)
{
	struct spdk_vhost_user_blk_task *user_task;

	user_task = SPDK_CONTAINEROF(task, struct spdk_vhost_user_blk_task, blk_task);

	blk_task_enqueue(user_task);

	SPDK_DEBUGLOG(vhost_blk, "Finished task (%p) req_idx=%d status: %" PRIu8 "\n",
		      user_task, user_task->req_idx, status);
	blk_task_finish(user_task);
}

static void
blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task)
{
	if (task->status) {
		*task->status = status;
	}

	task->cb(status, task, task->cb_arg);
}

/*
 * Process the task's descriptor chain and set up the data-related fields.
 * Returns the total size of the supplied buffers via *length.
 *
 *   FIXME: Make this function return rd_cnt and wr_cnt.
 */
static int
blk_iovs_split_queue_setup(struct spdk_vhost_blk_session *bvsession,
			   struct spdk_vhost_virtqueue *vq,
			   uint16_t req_idx, struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_dev *vdev = vsession->vdev;
	struct vring_desc *desc, *desc_table;
	uint16_t out_cnt = 0, cnt = 0;
	uint32_t desc_table_size, len = 0;
	uint32_t desc_handled_cnt;
	int rc;

	rc = vhost_vq_get_desc(vsession, vq, req_idx, &desc, &desc_table, &desc_table_size);
	if (rc != 0) {
		SPDK_ERRLOG("%s: invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
		return -1;
	}

	desc_handled_cnt = 0;
	while (1) {
		/*
		 * Maximum cnt reached?
		 * Should not happen if the request is well formed, otherwise this is a BUG.
		 */
		if (spdk_unlikely(cnt == *iovs_cnt)) {
			SPDK_DEBUGLOG(vhost_blk, "%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
				      vsession->name, req_idx);
			return -1;
		}

		if (spdk_unlikely(vhost_vring_desc_to_iov(vsession, iovs, &cnt, desc))) {
			SPDK_DEBUGLOG(vhost_blk, "%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
				      vsession->name, req_idx, cnt);
			return -1;
		}

		len += desc->len;

		out_cnt += vhost_vring_desc_is_wr(desc);

		rc = vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
		if (rc != 0) {
			SPDK_ERRLOG("%s: descriptor chain at index %"PRIu16" terminated unexpectedly.\n",
				    vsession->name, req_idx);
			return -1;
		} else if (desc == NULL) {
			break;
		}

		desc_handled_cnt++;
		if (spdk_unlikely(desc_handled_cnt > desc_table_size)) {
			/* Break the cycle and report an error, if any. */
			SPDK_ERRLOG("%s: found a cycle in the descriptor chain: desc_table_size = %d, desc_handled_cnt = %d.\n",
				    vsession->name, desc_table_size, desc_handled_cnt);
			return -1;
		}
	}

	/*
	 * There must be at least two descriptors.
	 * The first one contains the request and must be readable.
	 * The last one contains the response buffer and must be writable.
	 */
	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
		return -1;
	}

	*length = len;
	*iovs_cnt = cnt;
	return 0;
}
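
/*
 * Illustrative layout of the descriptor chain validated above (an assumed
 * three-descriptor READ request, not code from this driver):
 *
 *     desc[0]: virtio_blk_outhdr (16 bytes, driver-readable)
 *     desc[1]: data buffer       (device-writable for READ)
 *     desc[2]: 1-byte status     (device-writable)
 *
 * out_cnt counts the device-writable descriptors, so a valid request always
 * has out_cnt >= 1 and cnt >= 2.
 */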

static int
blk_iovs_packed_desc_setup(struct spdk_vhost_session *vsession,
			   struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
			   struct vring_packed_desc *desc_table, uint16_t desc_table_size,
			   struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
{
	struct vring_packed_desc *desc;
	uint16_t cnt = 0, out_cnt = 0;
	uint32_t len = 0;

	if (desc_table == NULL) {
		desc = &vq->vring.desc_packed[req_idx];
	} else {
		req_idx = 0;
		desc = desc_table;
	}

	while (1) {
		/*
		 * Maximum cnt reached?
		 * Should not happen if the request is well formed, otherwise this is a BUG.
		 */
		if (spdk_unlikely(cnt == *iovs_cnt)) {
			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
				    vsession->name, req_idx);
			return -EINVAL;
		}

		if (spdk_unlikely(vhost_vring_packed_desc_to_iov(vsession, iovs, &cnt, desc))) {
			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
				    vsession->name, req_idx, cnt);
			return -EINVAL;
		}

		len += desc->len;
		out_cnt += vhost_vring_packed_desc_is_wr(desc);

		/* A NULL desc means we have reached the last desc of this request */
		vhost_vring_packed_desc_get_next(&desc, &req_idx, vq, desc_table, desc_table_size);
		if (desc == NULL) {
			break;
		}
	}

	/*
	 * There must be at least two descriptors.
	 * The first one contains the request and must be readable.
	 * The last one contains the response buffer and must be writable.
	 */
	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
		return -EINVAL;
	}

	*length = len;
	*iovs_cnt = cnt;

	return 0;
}
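
/*
 * For the packed ring, the same three-part request occupies consecutive ring
 * slots, and the buffer_id that process_packed_blk_task() later uses as
 * task_idx lives in the last descriptor of the chain (an assumed layout, for
 * illustration only):
 *
 *     ring[i]     virtio_blk_outhdr            (driver-readable)
 *     ring[i+1]   data buffer                  (VRING_DESC_F_WRITE for READ)
 *     ring[i+2]   status byte, id = buffer_id  (VRING_DESC_F_WRITE)
 */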

static int
blk_iovs_packed_queue_setup(struct spdk_vhost_blk_session *bvsession,
			    struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
			    struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_dev *vdev = vsession->vdev;
	struct vring_packed_desc *desc = NULL, *desc_table;
	uint32_t desc_table_size;
	int rc;

	rc = vhost_vq_get_desc_packed(vsession, vq, req_idx, &desc,
				      &desc_table, &desc_table_size);
	if (spdk_unlikely(rc != 0)) {
		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
		return rc;
	}

	return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
					  iovs, iovs_cnt, length);
}

static int
blk_iovs_inflight_queue_setup(struct spdk_vhost_blk_session *bvsession,
			      struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
			      struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_dev *vdev = vsession->vdev;
	spdk_vhost_inflight_desc *inflight_desc;
	struct vring_packed_desc *desc_table;
	uint16_t out_cnt = 0, cnt = 0;
	uint32_t desc_table_size, len = 0;
	int rc = 0;

	rc = vhost_inflight_queue_get_desc(vsession, vq->vring_inflight.inflight_packed->desc,
					   req_idx, &inflight_desc, &desc_table, &desc_table_size);
	if (spdk_unlikely(rc != 0)) {
		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
		return rc;
	}

	if (desc_table != NULL) {
		return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
						  iovs, iovs_cnt, length);
	}

	while (1) {
		/*
		 * Maximum cnt reached?
		 * Should not happen if the request is well formed, otherwise this is a BUG.
		 */
		if (spdk_unlikely(cnt == *iovs_cnt)) {
			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
				    vsession->name, req_idx);
			return -EINVAL;
		}

		if (spdk_unlikely(vhost_vring_inflight_desc_to_iov(vsession, iovs, &cnt, inflight_desc))) {
			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
				    vsession->name, req_idx, cnt);
			return -EINVAL;
		}

		len += inflight_desc->len;
		out_cnt += vhost_vring_inflight_desc_is_wr(inflight_desc);

		/* A descriptor without F_NEXT is the last one in the chain */
		if ((inflight_desc->flags & VRING_DESC_F_NEXT) == 0) {
			break;
		}

		inflight_desc = &vq->vring_inflight.inflight_packed->desc[inflight_desc->next];
	}

	/*
	 * There must be at least two descriptors.
	 * The first one contains the request and must be readable.
	 * The last one contains the response buffer and must be writable.
	 */
	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
		return -EINVAL;
	}

	*length = len;
	*iovs_cnt = cnt;

	return 0;
}

static void
blk_request_complete_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_vhost_blk_task *task = cb_arg;

	spdk_bdev_free_io(bdev_io);
	blk_request_finish(success ? VIRTIO_BLK_S_OK : VIRTIO_BLK_S_IOERR, task);
}

static void
blk_request_resubmit(void *arg)
{
	struct spdk_vhost_blk_task *task = arg;
	int rc = 0;

	rc = virtio_blk_process_request(task->bdev_io_wait_vdev, task->bdev_io_wait_ch, task,
					task->cb, task->cb_arg);
	if (rc == 0) {
		SPDK_DEBUGLOG(vhost_blk, "====== Task %p resubmitted ======\n", task);
	} else {
		SPDK_DEBUGLOG(vhost_blk, "====== Task %p failed ======\n", task);
	}
}

static inline void
blk_request_queue_io(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
		     struct spdk_vhost_blk_task *task)
{
	int rc;
	struct spdk_bdev *bdev = vhost_blk_get_bdev(vdev);

	task->bdev_io_wait.bdev = bdev;
	task->bdev_io_wait.cb_fn = blk_request_resubmit;
	task->bdev_io_wait.cb_arg = task;
	task->bdev_io_wait_ch = ch;
	task->bdev_io_wait_vdev = vdev;

	rc = spdk_bdev_queue_io_wait(bdev, ch, &task->bdev_io_wait);
	if (rc != 0) {
		blk_request_finish(VIRTIO_BLK_S_IOERR, task);
	}
}

int
virtio_blk_process_request(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
			   struct spdk_vhost_blk_task *task, virtio_blk_request_cb cb, void *cb_arg)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
	struct virtio_blk_outhdr req;
	struct virtio_blk_discard_write_zeroes *desc;
	struct iovec *iov;
	uint32_t type;
	uint64_t flush_bytes;
	uint32_t payload_len;
	uint16_t iovcnt;
	int rc;

	task->cb = cb;
	task->cb_arg = cb_arg;

	iov = &task->iovs[0];
	if (spdk_unlikely(iov->iov_len != sizeof(req))) {
		SPDK_DEBUGLOG(vhost_blk,
			      "First descriptor size is %zu but expected %zu (task = %p).\n",
			      iov->iov_len, sizeof(req), task);
		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
		return -1;
	}

	/* Some SeaBIOS versions don't align the virtio_blk_outhdr on an 8-byte boundary, which
	 * triggers ubsan errors.  So copy this small 16-byte structure to the stack to work around
	 * this problem.
	 */
	memcpy(&req, iov->iov_base, sizeof(req));

	iov = &task->iovs[task->iovcnt - 1];
	if (spdk_unlikely(iov->iov_len != 1)) {
		SPDK_DEBUGLOG(vhost_blk,
			      "Last descriptor size is %zu but expected %d (task = %p).\n",
			      iov->iov_len, 1, task);
		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
		return -1;
	}

	payload_len = task->payload_size;
	task->status = iov->iov_base;
	payload_len -= sizeof(req) + sizeof(*task->status);
	iovcnt = task->iovcnt - 2;

	type = req.type;
#ifdef VIRTIO_BLK_T_BARRIER
	/* Don't care about barrier for now (as QEMU's virtio-blk does). */
	type &= ~VIRTIO_BLK_T_BARRIER;
#endif

	switch (type) {
	case VIRTIO_BLK_T_IN:
	case VIRTIO_BLK_T_OUT:
		if (spdk_unlikely(payload_len == 0 || (payload_len & (512 - 1)) != 0)) {
			SPDK_ERRLOG("%s - passed IO buffer is not a multiple of 512 bytes (task = %p).\n",
				    type ? "WRITE" : "READ", task);
			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
			return -1;
		}

		if (type == VIRTIO_BLK_T_IN) {
			task->used_len = payload_len + sizeof(*task->status);
			rc = spdk_bdev_readv(bvdev->bdev_desc, ch,
					     &task->iovs[1], iovcnt, req.sector * 512,
					     payload_len, blk_request_complete_cb, task);
		} else if (!bvdev->readonly) {
			task->used_len = sizeof(*task->status);
			rc = spdk_bdev_writev(bvdev->bdev_desc, ch,
					      &task->iovs[1], iovcnt, req.sector * 512,
					      payload_len, blk_request_complete_cb, task);
		} else {
			SPDK_DEBUGLOG(vhost_blk, "Device is in read-only mode!\n");
			rc = -1;
		}

		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
				blk_request_queue_io(vdev, ch, task);
			} else {
				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_DISCARD:
		desc = task->iovs[1].iov_base;
		if (payload_len != sizeof(*desc)) {
			SPDK_NOTICELOG("Invalid discard payload size: %u\n", payload_len);
			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
			return -1;
		}

		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
			SPDK_ERRLOG("UNMAP flag is only used for WRITE ZEROES command\n");
			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
			return -1;
		}

		rc = spdk_bdev_unmap(bvdev->bdev_desc, ch,
				     desc->sector * 512, desc->num_sectors * 512,
				     blk_request_complete_cb, task);
		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
				blk_request_queue_io(vdev, ch, task);
			} else {
				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_WRITE_ZEROES:
		desc = task->iovs[1].iov_base;
		if (payload_len != sizeof(*desc)) {
			SPDK_NOTICELOG("Invalid write zeroes payload size: %u\n", payload_len);
			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
			return -1;
		}

		/* The kernel driver may set the UNMAP flag by default, without checking whether
		 * the unmap feature was negotiated. SPDK doesn't support unmapping this range as
		 * part of WRITE ZEROES, and the flag isn't mandatory, so just print a warning.
		 */
		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
			SPDK_WARNLOG("Ignore the unmap flag for WRITE ZEROES from %"PRIx64", len %"PRIx64"\n",
				     (uint64_t)desc->sector * 512, (uint64_t)desc->num_sectors * 512);
		}

		rc = spdk_bdev_write_zeroes(bvdev->bdev_desc, ch,
					    desc->sector * 512, desc->num_sectors * 512,
					    blk_request_complete_cb, task);
		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
				blk_request_queue_io(vdev, ch, task);
			} else {
				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_FLUSH:
		flush_bytes = spdk_bdev_get_num_blocks(bvdev->bdev) * spdk_bdev_get_block_size(bvdev->bdev);
		if (req.sector != 0) {
			SPDK_NOTICELOG("sector must be zero for flush command\n");
			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
			return -1;
		}
		rc = spdk_bdev_flush(bvdev->bdev_desc, ch,
				     0, flush_bytes,
				     blk_request_complete_cb, task);
		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
				blk_request_queue_io(vdev, ch, task);
			} else {
				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_GET_ID:
		if (!iovcnt || !payload_len) {
			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
			return -1;
		}
		task->used_len = spdk_min((size_t)VIRTIO_BLK_ID_BYTES, task->iovs[1].iov_len);
		spdk_strcpy_pad(task->iovs[1].iov_base, spdk_bdev_get_name(bvdev->bdev),
				task->used_len, ' ');
		blk_request_finish(VIRTIO_BLK_S_OK, task);
		break;
	default:
		SPDK_DEBUGLOG(vhost_blk, "Unsupported request type '%"PRIu32"'.\n", type);
		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
		return -1;
	}

	return 0;
}

static void
process_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
{
	struct spdk_vhost_user_blk_task *task;
	struct spdk_vhost_blk_task *blk_task;
	int rc;

	assert(vq->packed.packed_ring == false);

	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[req_idx];
	blk_task = &task->blk_task;
	if (spdk_unlikely(task->used)) {
		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
			    task->bvsession->vsession.name, req_idx);
		blk_task->used_len = 0;
		blk_task_enqueue(task);
		return;
	}

	task->bvsession->vsession.task_cnt++;

	blk_task_init(task);

	rc = blk_iovs_split_queue_setup(task->bvsession, vq, task->req_idx,
					blk_task->iovs, &blk_task->iovcnt, &blk_task->payload_size);

	if (rc) {
		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
		/* Only READ and WRITE are supported for now. */
		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
		return;
	}

	if (vhost_user_process_blk_request(task) == 0) {
		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
			      req_idx);
	} else {
		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, req_idx);
	}
}

static void
process_packed_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
{
	struct spdk_vhost_user_blk_task *task;
	struct spdk_vhost_blk_task *blk_task;
	uint16_t task_idx = req_idx, num_descs;
	int rc;

	assert(vq->packed.packed_ring);

	/* The packed ring uses the buffer_id as the task_idx to get the task struct.
	 * The kernel driver uses vq->free_head to set the buffer_id, so the value
	 * must be in the range 0 ~ vring.size and must be unique among the
	 * outstanding requests.
	 * We can't use the req_idx as the task_idx because a desc can be reused in
	 * the next phase even when it hasn't completed in the previous phase. For example,
	 * at phase 0, last_used_idx was 2 and desc0 was not completed. Then after moving to
	 * phase 1, last_avail_idx is updated to 1. In this case, req_idx cannot be used
	 * as task_idx because we would find task[0]->used still true at phase 1.
	 * The split queue is quite different: a desc is inserted into the free list when
	 * the device completes the request, and the driver gets descs from the free list,
	 * which ensures the req_idx is unique among the outstanding requests.
	 */
	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);

	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
	blk_task = &task->blk_task;
	if (spdk_unlikely(task->used)) {
		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
			    task->bvsession->vsession.name, task_idx);
		blk_task->used_len = 0;
		blk_task_enqueue(task);
		return;
	}

	task->req_idx = req_idx;
	task->num_descs = num_descs;
	task->buffer_id = task_idx;

	rte_vhost_set_inflight_desc_packed(task->bvsession->vsession.vid, vq->vring_idx,
					   req_idx, (req_idx + num_descs - 1) % vq->vring.size,
					   &task->inflight_head);

	task->bvsession->vsession.task_cnt++;

	blk_task_init(task);

	rc = blk_iovs_packed_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
					 &blk_task->iovcnt,
					 &blk_task->payload_size);
	if (rc) {
		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
		/* Only READ and WRITE are supported for now. */
		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
		return;
	}

	if (vhost_user_process_blk_request(task) == 0) {
		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
			      task_idx);
	} else {
		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
	}
}

static void
process_packed_inflight_blk_task(struct spdk_vhost_virtqueue *vq,
				 uint16_t req_idx)
{
	spdk_vhost_inflight_desc *desc_array = vq->vring_inflight.inflight_packed->desc;
	spdk_vhost_inflight_desc *desc = &desc_array[req_idx];
	struct spdk_vhost_user_blk_task *task;
	struct spdk_vhost_blk_task *blk_task;
	uint16_t task_idx, num_descs;
	int rc;

	task_idx = desc_array[desc->last].id;
	num_descs = desc->num;
	/* In packed ring reconnection, we use the last_used_idx as the
	 * initial value. So when we process the inflight descs we still
	 * need to update the available ring index.
	 */
	vq->last_avail_idx += num_descs;
	if (vq->last_avail_idx >= vq->vring.size) {
		vq->last_avail_idx -= vq->vring.size;
		vq->packed.avail_phase = !vq->packed.avail_phase;
	}

	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
	blk_task = &task->blk_task;
	if (spdk_unlikely(task->used)) {
		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
			    task->bvsession->vsession.name, task_idx);
		blk_task->used_len = 0;
		blk_task_enqueue(task);
		return;
	}

	task->req_idx = req_idx;
	task->num_descs = num_descs;
	task->buffer_id = task_idx;
	/* It's for cleaning inflight entries */
	task->inflight_head = req_idx;

	task->bvsession->vsession.task_cnt++;

	blk_task_init(task);

	rc = blk_iovs_inflight_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
					   &blk_task->iovcnt,
					   &blk_task->payload_size);
	if (rc) {
		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
		/* Only READ and WRITE are supported for now. */
		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
		return;
	}

	if (vhost_user_process_blk_request(task) == 0) {
		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
			      task_idx);
	} else {
		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
	}
}

static void
submit_inflight_desc(struct spdk_vhost_blk_session *bvsession,
		     struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession;
	spdk_vhost_resubmit_info *resubmit;
	spdk_vhost_resubmit_desc *resubmit_list;
	uint16_t req_idx;
	int i;

	resubmit = vq->vring_inflight.resubmit_inflight;
	if (spdk_likely(resubmit == NULL || resubmit->resubmit_list == NULL ||
			resubmit->resubmit_num == 0)) {
		return;
	}

	resubmit_list = resubmit->resubmit_list;
	vsession = &bvsession->vsession;

	for (i = resubmit->resubmit_num - 1; i >= 0; --i) {
		req_idx = resubmit_list[i].index;
		SPDK_DEBUGLOG(vhost_blk, "====== Start processing resubmit request idx %"PRIu16" ======\n",
			      req_idx);

		if (spdk_unlikely(req_idx >= vq->vring.size)) {
			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
				    vsession->name, req_idx, vq->vring.size);
			vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
			continue;
		}

		if (vq->packed.packed_ring) {
			process_packed_inflight_blk_task(vq, req_idx);
		} else {
			process_blk_task(vq, req_idx);
		}
	}
	resubmit->resubmit_num = 0;
}

static int
process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	uint16_t reqs[SPDK_VHOST_VQ_MAX_SUBMISSIONS];
	uint16_t reqs_cnt, i;

	submit_inflight_desc(bvsession, vq);

	reqs_cnt = vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
	if (!reqs_cnt) {
		return 0;
	}

	for (i = 0; i < reqs_cnt; i++) {
		SPDK_DEBUGLOG(vhost_blk, "====== Starting to process request idx %"PRIu16" ======\n",
			      reqs[i]);

		if (spdk_unlikely(reqs[i] >= vq->vring.size)) {
			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
				    vsession->name, reqs[i], vq->vring.size);
			vhost_vq_used_ring_enqueue(vsession, vq, reqs[i], 0);
			continue;
		}

		rte_vhost_set_inflight_desc_split(vsession->vid, vq->vring_idx, reqs[i]);

		process_blk_task(vq, reqs[i]);
	}

	return reqs_cnt;
}

static int
process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	uint16_t i = 0;
	uint16_t count = 0;

	submit_inflight_desc(bvsession, vq);

	while (i++ < SPDK_VHOST_VQ_MAX_SUBMISSIONS &&
	       vhost_vq_packed_ring_is_avail(vq)) {
		SPDK_DEBUGLOG(vhost_blk, "====== Starting to process request idx %"PRIu16" ======\n",
			      vq->last_avail_idx);
		count++;
		process_packed_blk_task(vq, vq->last_avail_idx);
	}

	return count;
}

static int
_vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = vq->vsession;
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	bool packed_ring;
	int rc = 0;

	packed_ring = vq->packed.packed_ring;
	if (packed_ring) {
		rc = process_packed_vq(bvsession, vq);
	} else {
		rc = process_vq(bvsession, vq);
	}

	vhost_session_vq_used_signal(vq);

	return rc;
}

static int
vdev_vq_worker(void *arg)
{
	struct spdk_vhost_virtqueue *vq = arg;

	return _vdev_vq_worker(vq);
}

static int
vdev_worker(void *arg)
{
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	uint16_t q_idx;
	int rc = 0;

	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
		rc += _vdev_vq_worker(&vsession->virtqueue[q_idx]);
	}

	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
}

static void
no_bdev_process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct iovec iovs[SPDK_VHOST_IOVS_MAX];
	uint32_t length;
	uint16_t iovcnt, req_idx;

	if (vhost_vq_avail_ring_get(vq, &req_idx, 1) != 1) {
		return;
	}

	iovcnt = SPDK_COUNTOF(iovs);
	if (blk_iovs_split_queue_setup(bvsession, vq, req_idx, iovs, &iovcnt, &length) == 0) {
		*(volatile uint8_t *)iovs[iovcnt - 1].iov_base = VIRTIO_BLK_S_IOERR;
		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
	}

	vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
}

static void
no_bdev_process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_user_blk_task *task;
	struct spdk_vhost_blk_task *blk_task;
	uint32_t length;
	uint16_t req_idx = vq->last_avail_idx;
	uint16_t task_idx, num_descs;

	if (!vhost_vq_packed_ring_is_avail(vq)) {
		return;
	}

	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
	blk_task = &task->blk_task;
	if (spdk_unlikely(task->used)) {
		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
			    vsession->name, req_idx);
		vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
					     task->buffer_id, blk_task->used_len,
					     task->inflight_head);
		return;
	}

	task->req_idx = req_idx;
	task->num_descs = num_descs;
	task->buffer_id = task_idx;
	blk_task_init(task);

	if (blk_iovs_packed_queue_setup(bvsession, vq, task->req_idx, blk_task->iovs, &blk_task->iovcnt,
					&length)) {
		*(volatile uint8_t *)(blk_task->iovs[blk_task->iovcnt - 1].iov_base) = VIRTIO_BLK_S_IOERR;
		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
	}

	task->used = false;
	vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
				     task->buffer_id, blk_task->used_len,
				     task->inflight_head);
}

static int
_no_bdev_vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = vq->vsession;
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	bool packed_ring;

	packed_ring = vq->packed.packed_ring;
	if (packed_ring) {
		no_bdev_process_packed_vq(bvsession, vq);
	} else {
		no_bdev_process_vq(bvsession, vq);
	}

	vhost_session_vq_used_signal(vq);

	if (vsession->task_cnt == 0 && bvsession->io_channel) {
		vhost_blk_put_io_channel(bvsession->io_channel);
		bvsession->io_channel = NULL;
	}

	return SPDK_POLLER_BUSY;
}

static int
no_bdev_vdev_vq_worker(void *arg)
{
	struct spdk_vhost_virtqueue *vq = arg;

	return _no_bdev_vdev_vq_worker(vq);
}

static int
no_bdev_vdev_worker(void *arg)
{
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	uint16_t q_idx;

	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
		_no_bdev_vdev_vq_worker(&vsession->virtqueue[q_idx]);
	}

	return SPDK_POLLER_BUSY;
}

static void
vhost_blk_session_unregister_interrupts(struct spdk_vhost_blk_session *bvsession)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_virtqueue *vq;
	int i;

	SPDK_DEBUGLOG(vhost_blk, "unregistering virtqueue interrupts\n");
	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
		if (vq->intr == NULL) {
			break;
		}

		SPDK_DEBUGLOG(vhost_blk, "unregistering vq[%d], kickfd: %d\n",
			      i, vq->vring.kickfd);
		spdk_interrupt_unregister(&vq->intr);
	}
}

static int
vhost_blk_session_register_interrupts(struct spdk_vhost_blk_session *bvsession,
				      spdk_interrupt_fn fn, const char *name)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_virtqueue *vq = NULL;
	int i;

	SPDK_DEBUGLOG(vhost_blk, "registering virtqueue interrupts\n");
	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
		SPDK_DEBUGLOG(vhost_blk, "registering vq[%d], kickfd: %d\n",
			      i, vq->vring.kickfd);

		vq->intr = spdk_interrupt_register(vq->vring.kickfd, fn, vq, name);
		if (vq->intr == NULL) {
			SPDK_ERRLOG("Failed to register request notifier handler.\n");
			goto err;
		}
	}

	return 0;

err:
	vhost_blk_session_unregister_interrupts(bvsession);

	return -1;
}

static void
vhost_blk_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	struct spdk_vhost_blk_session *bvsession = cb_arg;

	vhost_user_session_set_interrupt_mode(&bvsession->vsession, interrupt_mode);
}

static void
bdev_event_cpl_cb(struct spdk_vhost_dev *vdev, void *ctx)
{
	enum spdk_bdev_event_type type = (enum spdk_bdev_event_type)(uintptr_t)ctx;
	struct spdk_vhost_blk_dev *bvdev;

	if (type == SPDK_BDEV_EVENT_REMOVE) {
		/* All sessions have been notified, time to close the bdev */
		bvdev = to_blk_dev(vdev);
		assert(bvdev != NULL);
		spdk_put_io_channel(bvdev->dummy_io_channel);
		spdk_bdev_close(bvdev->bdev_desc);
		bvdev->bdev_desc = NULL;
		bvdev->bdev = NULL;
	}
}

static int
vhost_session_bdev_resize_cb(struct spdk_vhost_dev *vdev,
			     struct spdk_vhost_session *vsession,
			     void *ctx)
{
#if RTE_VERSION >= RTE_VERSION_NUM(20, 02, 0, 0)
	SPDK_NOTICELOG("bdev: sending slave msg to vid(%d)\n", vsession->vid);
	rte_vhost_slave_config_change(vsession->vid, false);
#else
	SPDK_NOTICELOG("bdev resize is not supported until DPDK submodule version >= 20.02\n");
#endif

	return 0;
}

static void
vhost_user_blk_resize_cb(struct spdk_vhost_dev *vdev, bdev_event_cb_complete cb, void *cb_arg)
{
	spdk_vhost_lock();
	vhost_user_dev_foreach_session(vdev, vhost_session_bdev_resize_cb,
				       cb, cb_arg);
	spdk_vhost_unlock();
}

static int
vhost_user_session_bdev_remove_cb(struct spdk_vhost_dev *vdev,
				  struct spdk_vhost_session *vsession,
				  void *ctx)
{
	struct spdk_vhost_blk_session *bvsession;
	int rc;

	bvsession = to_blk_session(vsession);
	if (bvsession->requestq_poller) {
		spdk_poller_unregister(&bvsession->requestq_poller);
		if (vsession->virtqueue[0].intr) {
			vhost_blk_session_unregister_interrupts(bvsession);
			rc = vhost_blk_session_register_interrupts(bvsession, no_bdev_vdev_vq_worker,
					"no_bdev_vdev_vq_worker");
			if (rc) {
				SPDK_ERRLOG("%s: Interrupt register failed\n", vsession->name);
				return rc;
			}
		}

		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
		spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
					       bvsession);
	}

	return 0;
}

static void
vhost_user_bdev_remove_cb(struct spdk_vhost_dev *vdev, bdev_event_cb_complete cb, void *cb_arg)
{
	SPDK_WARNLOG("%s: hot-removing bdev - all further requests will fail.\n",
		     vdev->name);

	spdk_vhost_lock();
	vhost_user_dev_foreach_session(vdev, vhost_user_session_bdev_remove_cb,
				       cb, cb_arg);
	spdk_vhost_unlock();
}

static void
vhost_user_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_vhost_dev *vdev,
			 bdev_event_cb_complete cb, void *cb_arg)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vhost_user_bdev_remove_cb(vdev, cb, cb_arg);
		break;
	case SPDK_BDEV_EVENT_RESIZE:
		vhost_user_blk_resize_cb(vdev, cb, cb_arg);
		break;
	default:
		assert(false);
		return;
	}
}

static void
bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
	      void *event_ctx)
{
	struct spdk_vhost_dev *vdev = (struct spdk_vhost_dev *)event_ctx;
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	SPDK_DEBUGLOG(vhost_blk, "Bdev event: type %d, name %s\n",
		      type,
		      bdev->name);

	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
	case SPDK_BDEV_EVENT_RESIZE:
		bvdev->ops->bdev_event(type, vdev, bdev_event_cpl_cb, (void *)type);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

static void
free_task_pool(struct spdk_vhost_blk_session *bvsession)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_virtqueue *vq;
	uint16_t i;

	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
		if (vq->tasks == NULL) {
			continue;
		}

		spdk_free(vq->tasks);
		vq->tasks = NULL;
	}
}

static int
alloc_task_pool(struct spdk_vhost_blk_session *bvsession)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_virtqueue *vq;
	struct spdk_vhost_user_blk_task *task;
	uint32_t task_cnt;
	uint16_t i;
	uint32_t j;

	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
		if (vq->vring.desc == NULL) {
			continue;
		}

		task_cnt = vq->vring.size;
		if (task_cnt > SPDK_VHOST_MAX_VQ_SIZE) {
			/* sanity check */
			SPDK_ERRLOG("%s: virtqueue %"PRIu16" is too big. (size = %"PRIu32", max = %"PRIu32")\n",
				    vsession->name, i, task_cnt, SPDK_VHOST_MAX_VQ_SIZE);
			free_task_pool(bvsession);
			return -1;
		}
		vq->tasks = spdk_zmalloc(sizeof(struct spdk_vhost_user_blk_task) * task_cnt,
					 SPDK_CACHE_LINE_SIZE, NULL,
					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (vq->tasks == NULL) {
			SPDK_ERRLOG("%s: failed to allocate %"PRIu32" tasks for virtqueue %"PRIu16"\n",
				    vsession->name, task_cnt, i);
			free_task_pool(bvsession);
			return -1;
		}

		for (j = 0; j < task_cnt; j++) {
			task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[j];
			task->bvsession = bvsession;
			task->req_idx = j;
			task->vq = vq;
		}
	}

	return 0;
}

static int
vhost_blk_start_cb(struct spdk_vhost_dev *vdev,
		   struct spdk_vhost_session *vsession, void *unused)
{
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	struct spdk_vhost_blk_dev *bvdev;
	int i, rc = 0;

	bvdev = to_blk_dev(vdev);
	assert(bvdev != NULL);
	bvsession->bvdev = bvdev;

	/* validate that all I/O queues are in a contiguous index range */
	for (i = 0; i < vsession->max_queues; i++) {
		/* vring.desc and vring.desc_packed are in a union struct,
		 * so checking q->vring.desc covers q->vring.desc_packed as well.
		 */
		if (vsession->virtqueue[i].vring.desc == NULL) {
			SPDK_ERRLOG("%s: queue %d is empty\n", vsession->name, i);
			rc = -1;
			goto out;
		}
	}

	rc = alloc_task_pool(bvsession);
	if (rc != 0) {
		SPDK_ERRLOG("%s: failed to alloc task pool.\n", vsession->name);
		goto out;
	}

	if (bvdev->bdev) {
		bvsession->io_channel = vhost_blk_get_io_channel(vdev);
		if (!bvsession->io_channel) {
			free_task_pool(bvsession);
			SPDK_ERRLOG("%s: I/O channel allocation failed\n", vsession->name);
			rc = -1;
			goto out;
		}
	}

	if (spdk_interrupt_mode_is_enabled()) {
		if (bvdev->bdev) {
			rc = vhost_blk_session_register_interrupts(bvsession,
					vdev_vq_worker,
					"vdev_vq_worker");
		} else {
			rc = vhost_blk_session_register_interrupts(bvsession,
					no_bdev_vdev_vq_worker,
					"no_bdev_vdev_vq_worker");
		}

		if (rc) {
			SPDK_ERRLOG("%s: Interrupt register failed\n", vsession->name);
			goto out;
		}
	}

	if (bvdev->bdev) {
		bvsession->requestq_poller = SPDK_POLLER_REGISTER(vdev_worker, bvsession, 0);
	} else {
		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
	}
	SPDK_INFOLOG(vhost, "%s: started poller on lcore %d\n",
		     vsession->name, spdk_env_get_current_core());

	spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
				       bvsession);

out:
	vhost_user_session_start_done(vsession, rc);
	return rc;
}

static int
vhost_blk_start(struct spdk_vhost_session *vsession)
{
	return vhost_user_session_send_event(vsession, vhost_blk_start_cb,
					     3, "start session");
}

static int
destroy_session_poller_cb(void *arg)
{
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	int i;

	if (vsession->task_cnt > 0 || spdk_vhost_trylock() != 0) {
		assert(vsession->stop_retry_count > 0);
		vsession->stop_retry_count--;
		if (vsession->stop_retry_count == 0) {
			SPDK_ERRLOG("%s: Timed out while destroying session (task_cnt %d)\n", vsession->name,
				    vsession->task_cnt);
			spdk_poller_unregister(&bvsession->stop_poller);
			vhost_user_session_stop_done(vsession, -ETIMEDOUT);
		}

		return SPDK_POLLER_BUSY;
	}

	for (i = 0; i < vsession->max_queues; i++) {
		vsession->virtqueue[i].next_event_time = 0;
		vhost_vq_used_signal(vsession, &vsession->virtqueue[i]);
	}

	SPDK_INFOLOG(vhost, "%s: stopping poller on lcore %d\n",
		     vsession->name, spdk_env_get_current_core());

	if (bvsession->io_channel) {
		vhost_blk_put_io_channel(bvsession->io_channel);
		bvsession->io_channel = NULL;
	}

	free_task_pool(bvsession);
	spdk_poller_unregister(&bvsession->stop_poller);
	vhost_user_session_stop_done(vsession, 0);

	spdk_vhost_unlock();
	return SPDK_POLLER_BUSY;
}

static int
vhost_blk_stop_cb(struct spdk_vhost_dev *vdev,
		  struct spdk_vhost_session *vsession, void *unused)
{
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);

	spdk_poller_unregister(&bvsession->requestq_poller);

	if (vsession->virtqueue[0].intr) {
		vhost_blk_session_unregister_interrupts(bvsession);
	}

	/* vhost_user_session_send_event times out after 3 seconds, so retry here for up to 4 seconds */
	bvsession->vsession.stop_retry_count = 4000;
	bvsession->stop_poller = SPDK_POLLER_REGISTER(destroy_session_poller_cb,
				 bvsession, 1000);
	return 0;
}

static int
vhost_blk_stop(struct spdk_vhost_session *vsession)
{
	return vhost_user_session_send_event(vsession, vhost_blk_stop_cb,
					     3, "stop session");
}

static void
vhost_blk_dump_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
{
	struct spdk_vhost_blk_dev *bvdev;

	bvdev = to_blk_dev(vdev);
	assert(bvdev != NULL);

	spdk_json_write_named_object_begin(w, "block");

	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);

	spdk_json_write_name(w, "bdev");
	if (bvdev->bdev) {
		spdk_json_write_string(w, spdk_bdev_get_name(bvdev->bdev));
	} else {
		spdk_json_write_null(w);
	}
	spdk_json_write_named_string(w, "transport", bvdev->ops->name);

	spdk_json_write_object_end(w);
}

static void
vhost_blk_write_config_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
{
	struct spdk_vhost_blk_dev *bvdev;

	bvdev = to_blk_dev(vdev);
	assert(bvdev != NULL);

	if (!bvdev->bdev) {
		return;
	}

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "vhost_create_blk_controller");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "ctrlr", vdev->name);
	spdk_json_write_named_string(w, "dev_name", spdk_bdev_get_name(bvdev->bdev));
	spdk_json_write_named_string(w, "cpumask",
				     spdk_cpuset_fmt(spdk_thread_get_cpumask(vdev->thread)));
	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
	spdk_json_write_named_string(w, "transport", bvdev->ops->name);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
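
/*
 * For reference, the config JSON emitted above looks roughly like this
 * (values are illustrative):
 *
 *     {
 *       "method": "vhost_create_blk_controller",
 *       "params": {
 *         "ctrlr": "vhost.0",
 *         "dev_name": "Malloc0",
 *         "cpumask": "0x1",
 *         "readonly": false,
 *         "transport": "vhost_user_blk"
 *       }
 *     }
 */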

static int vhost_blk_destroy(struct spdk_vhost_dev *dev);

static int
vhost_blk_get_config(struct spdk_vhost_dev *vdev, uint8_t *config,
		     uint32_t len)
{
	struct virtio_blk_config blkcfg;
	struct spdk_bdev *bdev;
	uint32_t blk_size;
	uint64_t blkcnt;

	memset(&blkcfg, 0, sizeof(blkcfg));
	bdev = vhost_blk_get_bdev(vdev);
	if (bdev == NULL) {
		/* We can't just return -1 here as this GET_CONFIG message might
		 * be caused by a QEMU VM reboot. Returning -1 will indicate an
		 * error to QEMU, who might then decide to terminate itself.
		 * We don't want that. A simple reboot shouldn't break the system.
		 *
		 * Presenting a block device with block size 0 and block count 0
		 * doesn't cause any problems on QEMU side and the virtio-pci
		 * device is even still available inside the VM, but there will
		 * be no block device created for it - the kernel drivers will
		 * silently reject it.
		 */
		blk_size = 0;
		blkcnt = 0;
	} else {
		blk_size = spdk_bdev_get_block_size(bdev);
		blkcnt = spdk_bdev_get_num_blocks(bdev);
		if (spdk_bdev_get_buf_align(bdev) > 1) {
			blkcfg.size_max = SPDK_BDEV_LARGE_BUF_MAX_SIZE;
			blkcfg.seg_max = spdk_min(SPDK_VHOST_IOVS_MAX - 2 - 1, BDEV_IO_NUM_CHILD_IOV - 2 - 1);
		} else {
			blkcfg.size_max = 131072;
			/* -2 for REQ and RESP and -1 for region boundary splitting */
			blkcfg.seg_max = SPDK_VHOST_IOVS_MAX - 2 - 1;
		}
	}

	blkcfg.blk_size = blk_size;
	/* minimum I/O size in blocks */
	blkcfg.min_io_size = 1;
	/* expressed in 512-byte sectors */
	blkcfg.capacity = (blkcnt * blk_size) / 512;
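	/* Worked example (illustrative): a bdev with 4096-byte blocks and
	 * 1048576 blocks holds 4 GiB, so capacity = (1048576 * 4096) / 512 =
	 * 8388608 virtio sectors.
	 */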
	/* QEMU can overwrite this value when started */
	blkcfg.num_queues = SPDK_VHOST_MAX_VQUEUES;

	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		/* 16 MiB, expressed in 512-byte sectors */
		blkcfg.max_discard_sectors = 32768;
		blkcfg.max_discard_seg = 1;
		blkcfg.discard_sector_alignment = blk_size / 512;
	}
	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
		blkcfg.max_write_zeroes_sectors = 32768;
		blkcfg.max_write_zeroes_seg = 1;
	}

	memcpy(config, &blkcfg, spdk_min(len, sizeof(blkcfg)));

	return 0;
}

static const struct spdk_vhost_user_dev_backend vhost_blk_user_device_backend = {
	.session_ctx_size = sizeof(struct spdk_vhost_blk_session) - sizeof(struct spdk_vhost_session),
	.start_session = vhost_blk_start,
	.stop_session = vhost_blk_stop,
};

static const struct spdk_vhost_dev_backend vhost_blk_device_backend = {
	.type = VHOST_BACKEND_BLK,
	.vhost_get_config = vhost_blk_get_config,
	.dump_info_json = vhost_blk_dump_info_json,
	.write_config_json = vhost_blk_write_config_json,
	.remove_device = vhost_blk_destroy,
};

int
virtio_blk_construct_ctrlr(struct spdk_vhost_dev *vdev, const char *address,
			   struct spdk_cpuset *cpumask, const struct spdk_json_val *params,
			   const struct spdk_vhost_user_dev_backend *user_backend)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	return bvdev->ops->create_ctrlr(vdev, cpumask, address, params, (void *)user_backend);
}

int
spdk_vhost_blk_construct(const char *name, const char *cpumask, const char *dev_name,
			 const char *transport, const struct spdk_json_val *params)
{
	struct spdk_vhost_blk_dev *bvdev = NULL;
	struct spdk_vhost_dev *vdev;
	struct spdk_bdev *bdev;
	const char *transport_name = VIRTIO_BLK_DEFAULT_TRANSPORT;
	int ret = 0;

	spdk_vhost_lock();

	bvdev = calloc(1, sizeof(*bvdev));
	if (bvdev == NULL) {
		ret = -ENOMEM;
		goto out;
	}

	if (transport != NULL) {
		transport_name = transport;
	}

	bvdev->ops = virtio_blk_get_transport_ops(transport_name);
	if (!bvdev->ops) {
		ret = -EINVAL;
		SPDK_ERRLOG("Transport type '%s' unavailable.\n", transport_name);
		goto out;
	}

	ret = spdk_bdev_open_ext(dev_name, true, bdev_event_cb, bvdev, &bvdev->bdev_desc);
	if (ret != 0) {
		SPDK_ERRLOG("%s: could not open bdev '%s', error=%d\n",
			    name, dev_name, ret);
		goto out;
	}
	bdev = spdk_bdev_desc_get_bdev(bvdev->bdev_desc);

	vdev = &bvdev->vdev;
	vdev->virtio_features = SPDK_VHOST_BLK_FEATURES_BASE;
	vdev->disabled_features = SPDK_VHOST_BLK_DISABLED_FEATURES;
	vdev->protocol_features = SPDK_VHOST_BLK_PROTOCOL_FEATURES;

	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_DISCARD);
	}
	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_WRITE_ZEROES);
	}

	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_FLUSH);
	}

	/*
	 * When starting QEMU with multiqueue enabled, the vhost device will be
	 * started/stopped many times, related to the number of queues, as the
	 * exact number of queues used by this device is not known at that time.
	 * The target has to stop and start the device once it gets a valid I/O queue.
	 * When stopping and starting the vhost device, the backend bdev io device
	 * will be deleted and created repeatedly.
	 * Hold a bdev reference in struct spdk_vhost_blk_dev so that the io device
	 * will not be deleted.
	 */
	bvdev->dummy_io_channel = spdk_bdev_get_io_channel(bvdev->bdev_desc);

	bvdev->bdev = bdev;
	bvdev->readonly = false;
	ret = vhost_dev_register(vdev, name, cpumask, params, &vhost_blk_device_backend,
				 &vhost_blk_user_device_backend);
	if (ret != 0) {
		spdk_put_io_channel(bvdev->dummy_io_channel);
		spdk_bdev_close(bvdev->bdev_desc);
		goto out;
	}

	SPDK_INFOLOG(vhost, "%s: using bdev '%s'\n", name, dev_name);
out:
	if (ret != 0 && bvdev) {
		free(bvdev);
	}
	spdk_vhost_unlock();
	return ret;
}

int
virtio_blk_destroy_ctrlr(struct spdk_vhost_dev *vdev)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	return bvdev->ops->destroy_ctrlr(vdev);
}

static int
vhost_blk_destroy(struct spdk_vhost_dev *vdev)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
	int rc;

	assert(bvdev != NULL);

	rc = vhost_dev_unregister(&bvdev->vdev);
	if (rc != 0) {
		return rc;
	}

	/* if the bdev has been removed, there is no need to call spdk_put_io_channel. */
	if (bvdev->bdev) {
		spdk_put_io_channel(bvdev->dummy_io_channel);
	}

	if (bvdev->bdev_desc) {
		spdk_bdev_close(bvdev->bdev_desc);
		bvdev->bdev_desc = NULL;
	}
	bvdev->bdev = NULL;

	free(bvdev);
	return 0;
}

struct spdk_io_channel *
vhost_blk_get_io_channel(struct spdk_vhost_dev *vdev)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	return spdk_bdev_get_io_channel(bvdev->bdev_desc);
}

void
vhost_blk_put_io_channel(struct spdk_io_channel *ch)
{
	spdk_put_io_channel(ch);
}

static struct spdk_virtio_blk_transport *
vhost_user_blk_create(const struct spdk_json_val *params)
{
	int ret;
	struct spdk_virtio_blk_transport *vhost_user_blk;

	vhost_user_blk = calloc(1, sizeof(*vhost_user_blk));
	if (!vhost_user_blk) {
		return NULL;
	}

	ret = vhost_user_init();
	if (ret != 0) {
		free(vhost_user_blk);
		return NULL;
	}

	return vhost_user_blk;
}

static int
vhost_user_blk_destroy(struct spdk_virtio_blk_transport *transport,
		       spdk_vhost_fini_cb cb_fn)
{
	vhost_user_fini(cb_fn);
	free(transport);
	return 0;
}

struct rpc_vhost_blk {
	bool readonly;
	bool packed_ring;
	bool packed_ring_recovery;
};

static const struct spdk_json_object_decoder rpc_construct_vhost_blk[] = {
	{"readonly", offsetof(struct rpc_vhost_blk, readonly), spdk_json_decode_bool, true},
	{"packed_ring", offsetof(struct rpc_vhost_blk, packed_ring), spdk_json_decode_bool, true},
	{"packed_ring_recovery", offsetof(struct rpc_vhost_blk, packed_ring_recovery), spdk_json_decode_bool, true},
};
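
/*
 * These decoders accept optional JSON parameters like the following
 * (an illustrative RPC params fragment, not produced by this file):
 *
 *     { "readonly": false, "packed_ring": true, "packed_ring_recovery": false }
 *
 * All three keys are marked optional above, so an empty params object is
 * also valid.
 */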

static int
vhost_user_blk_create_ctrlr(struct spdk_vhost_dev *vdev, struct spdk_cpuset *cpumask,
			    const char *address, const struct spdk_json_val *params, void *custom_opts)
{
	struct rpc_vhost_blk req = {0};
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	if (spdk_json_decode_object_relaxed(params, rpc_construct_vhost_blk,
					    SPDK_COUNTOF(rpc_construct_vhost_blk),
					    &req)) {
		SPDK_DEBUGLOG(vhost_blk, "spdk_json_decode_object_relaxed failed\n");
		return -EINVAL;
	}

	vdev->packed_ring_recovery = false;

	if (req.packed_ring) {
		vdev->virtio_features |= (uint64_t)req.packed_ring << VIRTIO_F_RING_PACKED;
		vdev->packed_ring_recovery = req.packed_ring_recovery;
	}
	if (req.readonly) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_RO);
		bvdev->readonly = req.readonly;
	}

	return vhost_user_dev_register(vdev, address, cpumask, custom_opts);
}

static int
vhost_user_blk_destroy_ctrlr(struct spdk_vhost_dev *vdev)
{
	return vhost_user_dev_unregister(vdev);
}

static const struct spdk_virtio_blk_transport_ops vhost_user_blk = {
	.name = "vhost_user_blk",

	.dump_opts = NULL,

	.create = vhost_user_blk_create,
	.destroy = vhost_user_blk_destroy,

	.create_ctrlr = vhost_user_blk_create_ctrlr,
	.destroy_ctrlr = vhost_user_blk_destroy_ctrlr,

	.bdev_event = vhost_user_bdev_event_cb,
};

SPDK_VIRTIO_BLK_TRANSPORT_REGISTER(vhost_user_blk, &vhost_user_blk);
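
/*
 * A minimal sketch of how an alternative transport could plug in through the
 * same registration macro (hypothetical "my_blk" transport; the ops shown
 * would have to be implemented elsewhere):
 *
 *     static const struct spdk_virtio_blk_transport_ops my_blk = {
 *             .name          = "my_blk",
 *             .create        = my_blk_create,
 *             .destroy       = my_blk_destroy,
 *             .create_ctrlr  = my_blk_create_ctrlr,
 *             .destroy_ctrlr = my_blk_destroy_ctrlr,
 *             .bdev_event    = my_blk_bdev_event_cb,
 *     };
 *     SPDK_VIRTIO_BLK_TRANSPORT_REGISTER(my_blk, &my_blk);
 */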

SPDK_LOG_REGISTER_COMPONENT(vhost_blk)
SPDK_LOG_REGISTER_COMPONENT(vhost_blk_data)