/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) Intel Corporation.
 *   All rights reserved.
 */

#include <linux/virtio_blk.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/bdev_module.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/vhost.h"

#include "vhost_internal.h"
#include <rte_version.h>

/* Minimal set of features supported by every SPDK VHOST-BLK device */
#define SPDK_VHOST_BLK_FEATURES_BASE (SPDK_VHOST_FEATURES | \
		(1ULL << VIRTIO_BLK_F_SIZE_MAX) | (1ULL << VIRTIO_BLK_F_SEG_MAX) | \
		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_BLK_SIZE) | \
		(1ULL << VIRTIO_BLK_F_TOPOLOGY) | (1ULL << VIRTIO_BLK_F_BARRIER)  | \
		(1ULL << VIRTIO_BLK_F_SCSI)     | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
		(1ULL << VIRTIO_BLK_F_MQ))

/* Features that are not supported */
#define SPDK_VHOST_BLK_DISABLED_FEATURES (SPDK_VHOST_DISABLED_FEATURES | \
		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
		(1ULL << VIRTIO_BLK_F_BARRIER)  | (1ULL << VIRTIO_BLK_F_SCSI))

/* Protocol features supported by vhost-blk */
#define SPDK_VHOST_BLK_PROTOCOL_FEATURES ((1ULL << VHOST_USER_PROTOCOL_F_CONFIG) | \
		(1ULL << VHOST_USER_PROTOCOL_F_INFLIGHT_SHMFD))

#define VIRTIO_BLK_DEFAULT_TRANSPORT "vhost_user_blk"

struct spdk_vhost_user_blk_task {
	struct spdk_vhost_blk_task blk_task;
	struct spdk_vhost_blk_session *bvsession;
	struct spdk_vhost_virtqueue *vq;

	uint16_t req_idx;
	uint16_t num_descs;
	uint16_t buffer_id;
	uint16_t inflight_head;

	/* If set, the task is currently used for I/O processing. */
	bool used;
};

struct spdk_vhost_blk_dev {
	struct spdk_vhost_dev vdev;
	struct spdk_bdev *bdev;
	struct spdk_bdev_desc *bdev_desc;
	const struct spdk_virtio_blk_transport_ops *ops;

	/* dummy_io_channel is used to hold a bdev reference */
	struct spdk_io_channel *dummy_io_channel;
	bool readonly;
};

struct spdk_vhost_blk_session {
	/* The parent session must be the very first field in this struct */
	struct spdk_vhost_session vsession;
	struct spdk_vhost_blk_dev *bvdev;
	struct spdk_poller *requestq_poller;
	struct spdk_io_channel *io_channel;
	struct spdk_poller *stop_poller;
};

/* forward declaration */
static const struct spdk_vhost_dev_backend vhost_blk_device_backend;

static void vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task,
		void *cb_arg);

static int
vhost_user_process_blk_request(struct spdk_vhost_user_blk_task *user_task)
{
	struct spdk_vhost_blk_session *bvsession = user_task->bvsession;
	struct spdk_vhost_dev *vdev = &bvsession->bvdev->vdev;

	return virtio_blk_process_request(vdev, bvsession->io_channel, &user_task->blk_task,
					  vhost_user_blk_request_finish, NULL);
}

static struct spdk_vhost_blk_dev *
to_blk_dev(struct spdk_vhost_dev *vdev)
{
	if (vdev == NULL) {
		return NULL;
	}

	if (vdev->backend->type != VHOST_BACKEND_BLK) {
		SPDK_ERRLOG("%s: not a vhost-blk device\n", vdev->name);
		return NULL;
	}

	return SPDK_CONTAINEROF(vdev, struct spdk_vhost_blk_dev, vdev);
}

struct spdk_bdev *
vhost_blk_get_bdev(struct spdk_vhost_dev *vdev)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	assert(bvdev != NULL);

	return bvdev->bdev;
}

static struct spdk_vhost_blk_session *
to_blk_session(struct spdk_vhost_session *vsession)
{
	assert(vsession->vdev->backend->type == VHOST_BACKEND_BLK);
	return (struct spdk_vhost_blk_session *)vsession;
}

static void
blk_task_finish(struct spdk_vhost_user_blk_task *task)
{
	assert(task->bvsession->vsession.task_cnt > 0);
	task->bvsession->vsession.task_cnt--;
	task->used = false;
}

static void
blk_task_init(struct spdk_vhost_user_blk_task *task)
{
	struct spdk_vhost_blk_task *blk_task = &task->blk_task;

	task->used = true;
	blk_task->iovcnt = SPDK_COUNTOF(blk_task->iovs);
	blk_task->status = NULL;
	blk_task->used_len = 0;
	blk_task->payload_size = 0;
}

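/* Return the task's descriptors to the guest. For a packed ring, completion
 * is signalled by writing a used descriptor (identified by buffer_id) back
 * into the ring itself; for a split ring, the request's head index is pushed
 * onto the separate used ring.
 */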
static void
blk_task_enqueue(struct spdk_vhost_user_blk_task *task)
{
	if (task->vq->packed.packed_ring) {
		vhost_vq_packed_ring_enqueue(&task->bvsession->vsession, task->vq,
					     task->num_descs,
					     task->buffer_id, task->blk_task.used_len,
					     task->inflight_head);
	} else {
		vhost_vq_used_ring_enqueue(&task->bvsession->vsession, task->vq,
					   task->req_idx, task->blk_task.used_len);
	}
}

static void
vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task, void *cb_arg)
{
	struct spdk_vhost_user_blk_task *user_task;

	user_task = SPDK_CONTAINEROF(task, struct spdk_vhost_user_blk_task, blk_task);

	blk_task_enqueue(user_task);

	SPDK_DEBUGLOG(vhost_blk, "Finished task (%p) req_idx=%d, status: %" PRIu8 "\n",
		      user_task, user_task->req_idx, status);
	blk_task_finish(user_task);
}

static void
blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task)
{
	if (task->status) {
		*task->status = status;
	}

	task->cb(status, task, task->cb_arg);
}

/*
 * Process the task's descriptor chain and set up the data-related fields.
 * On success, the total size of the supplied buffers is returned through
 * *length.
 *
 *   FIXME: Make this function return rd_cnt and wr_cnt.
 */
static int
blk_iovs_split_queue_setup(struct spdk_vhost_blk_session *bvsession,
			   struct spdk_vhost_virtqueue *vq,
			   uint16_t req_idx, struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_dev *vdev = vsession->vdev;
	struct vring_desc *desc, *desc_table;
	uint16_t out_cnt = 0, cnt = 0;
	uint32_t desc_table_size, len = 0;
	uint32_t desc_handled_cnt;
	int rc;

	rc = vhost_vq_get_desc(vsession, vq, req_idx, &desc, &desc_table, &desc_table_size);
	if (rc != 0) {
		SPDK_ERRLOG("%s: invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
		return -1;
	}

	desc_handled_cnt = 0;
	while (1) {
		/*
		 * Maximum cnt reached?
		 * Should not happen if the request is well formed, otherwise this is a BUG.
		 */
		if (spdk_unlikely(cnt == *iovs_cnt)) {
			SPDK_DEBUGLOG(vhost_blk, "%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
				      vsession->name, req_idx);
			return -1;
		}

		if (spdk_unlikely(vhost_vring_desc_to_iov(vsession, iovs, &cnt, desc))) {
			SPDK_DEBUGLOG(vhost_blk, "%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
				      vsession->name, req_idx, cnt);
			return -1;
		}

		len += desc->len;

		out_cnt += vhost_vring_desc_is_wr(desc);

		rc = vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
		if (rc != 0) {
			SPDK_ERRLOG("%s: descriptor chain at index %"PRIu16" terminated unexpectedly.\n",
				    vsession->name, req_idx);
			return -1;
		} else if (desc == NULL) {
			break;
		}

		desc_handled_cnt++;
		if (spdk_unlikely(desc_handled_cnt > desc_table_size)) {
			/* Break the cycle and report an error. */
			SPDK_ERRLOG("%s: found a cycle in the descriptor chain: desc_table_size = %d, desc_handled_cnt = %d.\n",
				    vsession->name, desc_table_size, desc_handled_cnt);
			return -1;
		}
	}

	/*
	 * There must be at least two descriptors.
	 * The first contains the request, so it must be readable.
	 * The last descriptor contains the buffer for the response, so it must be writable.
	 */
	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
		return -1;
	}

	*length = len;
	*iovs_cnt = cnt;
	return 0;
}

static int
blk_iovs_packed_desc_setup(struct spdk_vhost_session *vsession,
			   struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
			   struct vring_packed_desc *desc_table, uint16_t desc_table_size,
			   struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
{
	struct vring_packed_desc *desc;
	uint16_t cnt = 0, out_cnt = 0;
	uint32_t len = 0;

	if (desc_table == NULL) {
		desc = &vq->vring.desc_packed[req_idx];
	} else {
		req_idx = 0;
		desc = desc_table;
	}

	while (1) {
		/*
		 * Maximum cnt reached?
		 * Should not happen if the request is well formed, otherwise this is a BUG.
		 */
		if (spdk_unlikely(cnt == *iovs_cnt)) {
			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
				    vsession->name, req_idx);
			return -EINVAL;
		}

		if (spdk_unlikely(vhost_vring_packed_desc_to_iov(vsession, iovs, &cnt, desc))) {
			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
				    vsession->name, req_idx, cnt);
			return -EINVAL;
		}

		len += desc->len;
		out_cnt += vhost_vring_packed_desc_is_wr(desc);

		/* A NULL desc means we have reached the last desc of this request */
		vhost_vring_packed_desc_get_next(&desc, &req_idx, vq, desc_table, desc_table_size);
		if (desc == NULL) {
			break;
		}
	}

	/*
	 * There must be at least two descriptors.
	 * The first contains the request, so it must be readable.
	 * The last descriptor contains the buffer for the response, so it must be writable.
	 */
	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
		return -EINVAL;
	}

	*length = len;
	*iovs_cnt = cnt;

	return 0;
}

static int
blk_iovs_packed_queue_setup(struct spdk_vhost_blk_session *bvsession,
			    struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
			    struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_dev *vdev = vsession->vdev;
	struct vring_packed_desc *desc = NULL, *desc_table;
	uint32_t desc_table_size;
	int rc;

	rc = vhost_vq_get_desc_packed(vsession, vq, req_idx, &desc,
				      &desc_table, &desc_table_size);
	if (spdk_unlikely(rc != 0)) {
		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
		return rc;
	}

	return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
					  iovs, iovs_cnt, length);
}

static int
blk_iovs_inflight_queue_setup(struct spdk_vhost_blk_session *bvsession,
			      struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
			      struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_dev *vdev = vsession->vdev;
	spdk_vhost_inflight_desc *inflight_desc;
	struct vring_packed_desc *desc_table;
	uint16_t out_cnt = 0, cnt = 0;
	uint32_t desc_table_size, len = 0;
	int rc = 0;

	rc = vhost_inflight_queue_get_desc(vsession, vq->vring_inflight.inflight_packed->desc,
					   req_idx, &inflight_desc, &desc_table, &desc_table_size);
	if (spdk_unlikely(rc != 0)) {
		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
		return rc;
	}

	if (desc_table != NULL) {
		return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
						  iovs, iovs_cnt, length);
	}

	while (1) {
		/*
		 * Maximum cnt reached?
		 * Should not happen if the request is well formed, otherwise this is a BUG.
		 */
		if (spdk_unlikely(cnt == *iovs_cnt)) {
			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
				    vsession->name, req_idx);
			return -EINVAL;
		}

		if (spdk_unlikely(vhost_vring_inflight_desc_to_iov(vsession, iovs, &cnt, inflight_desc))) {
			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
				    vsession->name, req_idx, cnt);
			return -EINVAL;
		}

		len += inflight_desc->len;
		out_cnt += vhost_vring_inflight_desc_is_wr(inflight_desc);

		/* A descriptor without F_NEXT is the last desc of the request */
		if ((inflight_desc->flags & VRING_DESC_F_NEXT) == 0) {
			break;
		}

		inflight_desc = &vq->vring_inflight.inflight_packed->desc[inflight_desc->next];
	}

	/*
	 * There must be at least two descriptors.
	 * The first contains the request, so it must be readable.
	 * The last descriptor contains the buffer for the response, so it must be writable.
	 */
	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
		return -EINVAL;
	}

	*length = len;
	*iovs_cnt = cnt;

	return 0;
}

static void
blk_request_complete_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_vhost_blk_task *task = cb_arg;

	spdk_bdev_free_io(bdev_io);
	blk_request_finish(success ? VIRTIO_BLK_S_OK : VIRTIO_BLK_S_IOERR, task);
}

static void
blk_request_resubmit(void *arg)
{
	struct spdk_vhost_blk_task *task = arg;
	int rc = 0;

	rc = virtio_blk_process_request(task->bdev_io_wait_vdev, task->bdev_io_wait_ch, task,
					task->cb, task->cb_arg);
	if (rc == 0) {
		SPDK_DEBUGLOG(vhost_blk, "====== Task %p resubmitted ======\n", task);
	} else {
		SPDK_DEBUGLOG(vhost_blk, "====== Task %p failed ======\n", task);
	}
}

static inline void
blk_request_queue_io(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
		     struct spdk_vhost_blk_task *task)
{
	int rc;
	struct spdk_bdev *bdev = vhost_blk_get_bdev(vdev);

	task->bdev_io_wait.bdev = bdev;
	task->bdev_io_wait.cb_fn = blk_request_resubmit;
	task->bdev_io_wait.cb_arg = task;
	task->bdev_io_wait_ch = ch;
	task->bdev_io_wait_vdev = vdev;

	rc = spdk_bdev_queue_io_wait(bdev, ch, &task->bdev_io_wait);
	if (rc != 0) {
		blk_request_finish(VIRTIO_BLK_S_IOERR, task);
	}
}
int
virtio_blk_process_request(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
			   struct spdk_vhost_blk_task *task, virtio_blk_request_cb cb, void *cb_arg)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
	struct virtio_blk_outhdr req;
	struct virtio_blk_discard_write_zeroes *desc;
	struct iovec *iov;
	uint32_t type;
	uint64_t flush_bytes;
	uint32_t payload_len;
	uint16_t iovcnt;
	int rc;

	task->cb = cb;
	task->cb_arg = cb_arg;

	iov = &task->iovs[0];
	if (spdk_unlikely(iov->iov_len != sizeof(req))) {
		SPDK_DEBUGLOG(vhost_blk,
			      "First descriptor size is %zu but expected %zu (task = %p).\n",
			      iov->iov_len, sizeof(req), task);
		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
		return -1;
	}

	/* Some SeaBIOS versions don't align the virtio_blk_outhdr on an 8-byte boundary, which
	 * triggers ubsan errors.  So copy this small 16-byte structure to the stack to work
	 * around this problem.
	 */
	memcpy(&req, iov->iov_base, sizeof(req));

	iov = &task->iovs[task->iovcnt - 1];
	if (spdk_unlikely(iov->iov_len != 1)) {
		SPDK_DEBUGLOG(vhost_blk,
			      "Last descriptor size is %zu but expected %d (task = %p).\n",
			      iov->iov_len, 1, task);
		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
		return -1;
	}

	payload_len = task->payload_size;
	task->status = iov->iov_base;
	payload_len -= sizeof(req) + sizeof(*task->status);
	iovcnt = task->iovcnt - 2;

	type = req.type;
#ifdef VIRTIO_BLK_T_BARRIER
	/* Don't care about barrier for now (as QEMU's virtio-blk does). */
	type &= ~VIRTIO_BLK_T_BARRIER;
#endif

	switch (type) {
	case VIRTIO_BLK_T_IN:
	case VIRTIO_BLK_T_OUT:
		if (spdk_unlikely(payload_len == 0 || (payload_len & (512 - 1)) != 0)) {
			SPDK_ERRLOG("%s - passed IO buffer is not a multiple of 512B (task = %p).\n",
				    type ? "WRITE" : "READ", task);
			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
			return -1;
		}

		if (type == VIRTIO_BLK_T_IN) {
			task->used_len = payload_len + sizeof(*task->status);
			rc = spdk_bdev_readv(bvdev->bdev_desc, ch,
					     &task->iovs[1], iovcnt, req.sector * 512,
					     payload_len, blk_request_complete_cb, task);
		} else if (!bvdev->readonly) {
			task->used_len = sizeof(*task->status);
			rc = spdk_bdev_writev(bvdev->bdev_desc, ch,
					      &task->iovs[1], iovcnt, req.sector * 512,
					      payload_len, blk_request_complete_cb, task);
		} else {
			SPDK_DEBUGLOG(vhost_blk, "Device is in read-only mode!\n");
			rc = -1;
		}

		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
				blk_request_queue_io(vdev, ch, task);
			} else {
				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_DISCARD:
		desc = task->iovs[1].iov_base;
		if (payload_len != sizeof(*desc)) {
			SPDK_NOTICELOG("Invalid discard payload size: %u\n", payload_len);
			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
			return -1;
		}

		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
			SPDK_ERRLOG("UNMAP flag is only used for WRITE ZEROES command\n");
			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
			return -1;
		}

		rc = spdk_bdev_unmap(bvdev->bdev_desc, ch,
				     desc->sector * 512, desc->num_sectors * 512,
				     blk_request_complete_cb, task);
		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
				blk_request_queue_io(vdev, ch, task);
			} else {
				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_WRITE_ZEROES:
		desc = task->iovs[1].iov_base;
		if (payload_len != sizeof(*desc)) {
			SPDK_NOTICELOG("Invalid write zeroes payload size: %u\n", payload_len);
			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
			return -1;
		}

		/* The kernel driver may set the UNMAP flag by default, without checking whether
		 * the unmap feature was negotiated. SPDK doesn't support unmapping the range as
		 * part of WRITE ZEROES and the flag isn't mandatory, so just print a warning.
		 */
		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
			SPDK_WARNLOG("Ignore the unmap flag for WRITE ZEROES from %"PRIx64", len %"PRIx64"\n",
				     (uint64_t)desc->sector * 512, (uint64_t)desc->num_sectors * 512);
		}

		rc = spdk_bdev_write_zeroes(bvdev->bdev_desc, ch,
					    desc->sector * 512, desc->num_sectors * 512,
					    blk_request_complete_cb, task);
		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
				blk_request_queue_io(vdev, ch, task);
			} else {
				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_FLUSH:
		flush_bytes = spdk_bdev_get_num_blocks(bvdev->bdev) * spdk_bdev_get_block_size(bvdev->bdev);
		if (req.sector != 0) {
			SPDK_NOTICELOG("sector must be zero for flush command\n");
			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
			return -1;
		}
		rc = spdk_bdev_flush(bvdev->bdev_desc, ch,
				     0, flush_bytes,
				     blk_request_complete_cb, task);
		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
				blk_request_queue_io(vdev, ch, task);
			} else {
				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_GET_ID:
		if (!iovcnt || !payload_len) {
			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
			return -1;
		}
		task->used_len = spdk_min((size_t)VIRTIO_BLK_ID_BYTES, task->iovs[1].iov_len);
		spdk_strcpy_pad(task->iovs[1].iov_base, spdk_bdev_get_name(bvdev->bdev),
				task->used_len, ' ');
		blk_request_finish(VIRTIO_BLK_S_OK, task);
		break;
	default:
		SPDK_DEBUGLOG(vhost_blk, "Not supported request type '%"PRIu32"'.\n", type);
		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
		return -1;
	}

	return 0;
}

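/* For a split ring, req_idx (the head index taken from the avail ring) is
 * unique among outstanding requests, so it doubles as the index into
 * vq->tasks.
 */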
static void
process_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
{
	struct spdk_vhost_user_blk_task *task;
	struct spdk_vhost_blk_task *blk_task;
	int rc;

	assert(vq->packed.packed_ring == false);

	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[req_idx];
	blk_task = &task->blk_task;
	if (spdk_unlikely(task->used)) {
		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
			    task->bvsession->vsession.name, req_idx);
		blk_task->used_len = 0;
		blk_task_enqueue(task);
		return;
	}

	task->bvsession->vsession.task_cnt++;

	blk_task_init(task);

	rc = blk_iovs_split_queue_setup(task->bvsession, vq, task->req_idx,
					blk_task->iovs, &blk_task->iovcnt, &blk_task->payload_size);

	if (rc) {
		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
		/* Only READ and WRITE are supported for now. */
		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
		return;
	}

	if (vhost_user_process_blk_request(task) == 0) {
		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
			      req_idx);
	} else {
		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, req_idx);
	}
}

static void
process_packed_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
{
	struct spdk_vhost_user_blk_task *task;
	struct spdk_vhost_blk_task *blk_task;
	uint16_t task_idx = req_idx, num_descs;
	int rc;

	assert(vq->packed.packed_ring);

	/* A packed ring uses the buffer_id as the task_idx to look up the task struct.
	 * The kernel driver uses vq->free_head to set the buffer_id, so the value
	 * must be in the range 0 ~ vring.size, and the free_head value is unique
	 * among the outstanding requests.
	 * We can't use req_idx as the task_idx because a desc can be reused in
	 * the next phase even when it hasn't completed in the previous phase. For
	 * example, at phase 0, last_used_idx was 2 and desc0 was not completed.
	 * Then, after moving to phase 1, last_avail_idx is updated to 1. In this
	 * case, req_idx cannot be used as the task_idx because task[0]->used would
	 * still be true in phase 1.
	 * The split queue is quite different: a desc is inserted into the free list
	 * when the device completes the request and the driver takes descs from
	 * that free list, which ensures req_idx is unique among the outstanding
	 * requests.
	 */
	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);

	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
	blk_task = &task->blk_task;
	if (spdk_unlikely(task->used)) {
		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
			    task->bvsession->vsession.name, task_idx);
		blk_task->used_len = 0;
		blk_task_enqueue(task);
		return;
	}

	task->req_idx = req_idx;
	task->num_descs = num_descs;
	task->buffer_id = task_idx;

	rte_vhost_set_inflight_desc_packed(task->bvsession->vsession.vid, vq->vring_idx,
					   req_idx, (req_idx + num_descs - 1) % vq->vring.size,
					   &task->inflight_head);

	task->bvsession->vsession.task_cnt++;

	blk_task_init(task);

	rc = blk_iovs_packed_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
					 &blk_task->iovcnt,
					 &blk_task->payload_size);
	if (rc) {
		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
		/* Only READ and WRITE are supported for now. */
		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
		return;
	}

	if (vhost_user_process_blk_request(task) == 0) {
		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
			      task_idx);
	} else {
		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
	}
}

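/* Replay a request recorded in the inflight shared-memory region after a
 * vhost-user reconnect. The descriptors were already made available before
 * the disconnect, so only the ring bookkeeping (last_avail_idx and the
 * avail phase) needs to be advanced here before resubmitting.
 */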
static void
process_packed_inflight_blk_task(struct spdk_vhost_virtqueue *vq,
				 uint16_t req_idx)
{
	spdk_vhost_inflight_desc *desc_array = vq->vring_inflight.inflight_packed->desc;
	spdk_vhost_inflight_desc *desc = &desc_array[req_idx];
	struct spdk_vhost_user_blk_task *task;
	struct spdk_vhost_blk_task *blk_task;
	uint16_t task_idx, num_descs;
	int rc;

	task_idx = desc_array[desc->last].id;
	num_descs = desc->num;
	/* In packed ring reconnection, we use the last_used_idx as the
	 * initial value. So when we process the inflight descs we still
	 * need to update the available ring index.
	 */
	vq->last_avail_idx += num_descs;
	if (vq->last_avail_idx >= vq->vring.size) {
		vq->last_avail_idx -= vq->vring.size;
		vq->packed.avail_phase = !vq->packed.avail_phase;
	}

	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
	blk_task = &task->blk_task;
	if (spdk_unlikely(task->used)) {
		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
			    task->bvsession->vsession.name, task_idx);
		blk_task->used_len = 0;
		blk_task_enqueue(task);
		return;
	}

	task->req_idx = req_idx;
	task->num_descs = num_descs;
	task->buffer_id = task_idx;
	/* Used for cleaning the inflight entries */
	task->inflight_head = req_idx;

	task->bvsession->vsession.task_cnt++;

	blk_task_init(task);

	rc = blk_iovs_inflight_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
					   &blk_task->iovcnt,
					   &blk_task->payload_size);
	if (rc) {
		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
		/* Only READ and WRITE are supported for now. */
		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
		return;
	}

	if (vhost_user_process_blk_request(task) == 0) {
		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
			      task_idx);
	} else {
		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
	}
}

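/* Restart requests that were still inflight when the previous vhost-user
 * connection dropped, as recorded in rte_vhost's resubmit list. This runs
 * before any new descriptors are polled from the ring.
 */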
static int
submit_inflight_desc(struct spdk_vhost_blk_session *bvsession,
		     struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession;
	spdk_vhost_resubmit_info *resubmit;
	spdk_vhost_resubmit_desc *resubmit_list;
	uint16_t req_idx;
	int i, resubmit_cnt;

	resubmit = vq->vring_inflight.resubmit_inflight;
	if (spdk_likely(resubmit == NULL || resubmit->resubmit_list == NULL ||
			resubmit->resubmit_num == 0)) {
		return 0;
	}

	resubmit_list = resubmit->resubmit_list;
	vsession = &bvsession->vsession;

	for (i = resubmit->resubmit_num - 1; i >= 0; --i) {
		req_idx = resubmit_list[i].index;
		SPDK_DEBUGLOG(vhost_blk, "====== Start processing resubmit request idx %"PRIu16"======\n",
			      req_idx);

		if (spdk_unlikely(req_idx >= vq->vring.size)) {
			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
				    vsession->name, req_idx, vq->vring.size);
			vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
			continue;
		}

		if (vq->packed.packed_ring) {
			process_packed_inflight_blk_task(vq, req_idx);
		} else {
			process_blk_task(vq, req_idx);
		}
	}
	resubmit_cnt = resubmit->resubmit_num;
	resubmit->resubmit_num = 0;
	return resubmit_cnt;
}

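/* Drain up to SPDK_VHOST_VQ_MAX_SUBMISSIONS new requests from a split ring.
 * Each request is logged as inflight before it is processed, so it can be
 * replayed if the connection drops mid-I/O.
 */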
static int
process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	uint16_t reqs[SPDK_VHOST_VQ_MAX_SUBMISSIONS];
	uint16_t reqs_cnt, i;
	int resubmit_cnt = 0;

	resubmit_cnt = submit_inflight_desc(bvsession, vq);

	reqs_cnt = vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
	if (!reqs_cnt) {
		return resubmit_cnt;
	}

	for (i = 0; i < reqs_cnt; i++) {
		SPDK_DEBUGLOG(vhost_blk, "====== Starting processing request idx %"PRIu16"======\n",
			      reqs[i]);

		if (spdk_unlikely(reqs[i] >= vq->vring.size)) {
			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
				    vsession->name, reqs[i], vq->vring.size);
			vhost_vq_used_ring_enqueue(vsession, vq, reqs[i], 0);
			continue;
		}

		rte_vhost_set_inflight_desc_split(vsession->vid, vq->vring_idx, reqs[i]);

		process_blk_task(vq, reqs[i]);
	}

	return reqs_cnt;
}

static int
process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	uint16_t i = 0;
	uint16_t count = 0;
	int resubmit_cnt = 0;

	resubmit_cnt = submit_inflight_desc(bvsession, vq);

	while (i++ < SPDK_VHOST_VQ_MAX_SUBMISSIONS &&
	       vhost_vq_packed_ring_is_avail(vq)) {
		SPDK_DEBUGLOG(vhost_blk, "====== Starting processing request idx %"PRIu16"======\n",
			      vq->last_avail_idx);
		count++;
		process_packed_blk_task(vq, vq->last_avail_idx);
	}

	return count > 0 ? count : resubmit_cnt;
}

static int
_vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = vq->vsession;
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	bool packed_ring;
	int rc = 0;

	packed_ring = vq->packed.packed_ring;
	if (packed_ring) {
		rc = process_packed_vq(bvsession, vq);
	} else {
		rc = process_vq(bvsession, vq);
	}

	vhost_session_vq_used_signal(vq);

	return rc;
}

static int
vdev_vq_worker(void *arg)
{
	struct spdk_vhost_virtqueue *vq = arg;

	return _vdev_vq_worker(vq);
}

static int
vdev_worker(void *arg)
{
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	uint16_t q_idx;
	int rc = 0;

	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
		rc += _vdev_vq_worker(&vsession->virtqueue[q_idx]);
	}

	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
}

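/* The "no_bdev" pollers take over after the backing bdev is hot-removed.
 * They fail every new request with VIRTIO_BLK_S_IOERR so the guest keeps a
 * functional (if erroring) virtqueue instead of a wedged one.
 */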
static void
no_bdev_process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct iovec iovs[SPDK_VHOST_IOVS_MAX];
	uint32_t length;
	uint16_t iovcnt, req_idx;

	if (vhost_vq_avail_ring_get(vq, &req_idx, 1) != 1) {
		return;
	}

	iovcnt = SPDK_COUNTOF(iovs);
	if (blk_iovs_split_queue_setup(bvsession, vq, req_idx, iovs, &iovcnt, &length) == 0) {
		*(volatile uint8_t *)iovs[iovcnt - 1].iov_base = VIRTIO_BLK_S_IOERR;
		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
	}

	vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
}

static void
no_bdev_process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_user_blk_task *task;
	struct spdk_vhost_blk_task *blk_task;
	uint32_t length;
	uint16_t req_idx = vq->last_avail_idx;
	uint16_t task_idx, num_descs;

	if (!vhost_vq_packed_ring_is_avail(vq)) {
		return;
	}

	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
	blk_task = &task->blk_task;
	if (spdk_unlikely(task->used)) {
		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
			    vsession->name, req_idx);
		vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
					     task->buffer_id, blk_task->used_len,
					     task->inflight_head);
		return;
	}

	task->req_idx = req_idx;
	task->num_descs = num_descs;
	task->buffer_id = task_idx;
	blk_task_init(task);

	if (blk_iovs_packed_queue_setup(bvsession, vq, task->req_idx, blk_task->iovs, &blk_task->iovcnt,
					&length)) {
		*(volatile uint8_t *)(blk_task->iovs[blk_task->iovcnt - 1].iov_base) = VIRTIO_BLK_S_IOERR;
		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
	}

	task->used = false;
	vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
				     task->buffer_id, blk_task->used_len,
				     task->inflight_head);
}

static int
_no_bdev_vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = vq->vsession;
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	bool packed_ring;

	packed_ring = vq->packed.packed_ring;
	if (packed_ring) {
		no_bdev_process_packed_vq(bvsession, vq);
	} else {
		no_bdev_process_vq(bvsession, vq);
	}

	vhost_session_vq_used_signal(vq);

	if (vsession->task_cnt == 0 && bvsession->io_channel) {
		vhost_blk_put_io_channel(bvsession->io_channel);
		bvsession->io_channel = NULL;
	}

	return SPDK_POLLER_BUSY;
}

static int
no_bdev_vdev_vq_worker(void *arg)
{
	struct spdk_vhost_virtqueue *vq = arg;

	return _no_bdev_vdev_vq_worker(vq);
}

static int
no_bdev_vdev_worker(void *arg)
{
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	uint16_t q_idx;

	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
		_no_bdev_vdev_vq_worker(&vsession->virtqueue[q_idx]);
	}

	return SPDK_POLLER_BUSY;
}

static void
vhost_blk_session_unregister_interrupts(struct spdk_vhost_blk_session *bvsession)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_virtqueue *vq;
	int i;

	SPDK_DEBUGLOG(vhost_blk, "unregister virtqueues interrupt\n");
	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
		if (vq->intr == NULL) {
			break;
		}

		SPDK_DEBUGLOG(vhost_blk, "unregister vq[%d]'s kickfd is %d\n",
			      i, vq->vring.kickfd);
		spdk_interrupt_unregister(&vq->intr);
	}
}

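/* In interrupt mode, each virtqueue's kick eventfd is registered with the
 * SPDK interrupt framework, so the queue is serviced on guest kicks instead
 * of being polled continuously.
 */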
static int
vhost_blk_session_register_interrupts(struct spdk_vhost_blk_session *bvsession,
				      spdk_interrupt_fn fn, const char *name)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_virtqueue *vq = NULL;
	int i;

	SPDK_DEBUGLOG(vhost_blk, "Register virtqueues interrupt\n");
	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
		SPDK_DEBUGLOG(vhost_blk, "Register vq[%d]'s kickfd is %d\n",
			      i, vq->vring.kickfd);

		vq->intr = spdk_interrupt_register(vq->vring.kickfd, fn, vq, name);
		if (vq->intr == NULL) {
			SPDK_ERRLOG("Failed to register the req notifier handler.\n");
			goto err;
		}
	}

	return 0;

err:
	vhost_blk_session_unregister_interrupts(bvsession);

	return -1;
}

static void
vhost_blk_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	struct spdk_vhost_blk_session *bvsession = cb_arg;

	vhost_user_session_set_interrupt_mode(&bvsession->vsession, interrupt_mode);
}

static void
bdev_event_cpl_cb(struct spdk_vhost_dev *vdev, void *ctx)
{
	enum spdk_bdev_event_type type = (enum spdk_bdev_event_type)(uintptr_t)ctx;
	struct spdk_vhost_blk_dev *bvdev;

	if (type == SPDK_BDEV_EVENT_REMOVE) {
		/* All sessions have been notified, time to close the bdev */
		bvdev = to_blk_dev(vdev);
		assert(bvdev != NULL);
		spdk_put_io_channel(bvdev->dummy_io_channel);
		spdk_bdev_close(bvdev->bdev_desc);
		bvdev->bdev_desc = NULL;
		bvdev->bdev = NULL;
	}
}

static int
vhost_session_bdev_resize_cb(struct spdk_vhost_dev *vdev,
			     struct spdk_vhost_session *vsession,
			     void *ctx)
{
#if RTE_VERSION >= RTE_VERSION_NUM(20, 02, 0, 0)
	SPDK_NOTICELOG("bdev send slave msg to vid(%d)\n", vsession->vid);
	rte_vhost_slave_config_change(vsession->vid, false);
#else
	SPDK_NOTICELOG("bdev does not support resize until DPDK submodule version >= 20.02\n");
#endif

	return 0;
}

static void
vhost_user_blk_resize_cb(struct spdk_vhost_dev *vdev, bdev_event_cb_complete cb, void *cb_arg)
{
	spdk_vhost_lock();
	vhost_user_dev_foreach_session(vdev, vhost_session_bdev_resize_cb,
				       cb, cb_arg);
	spdk_vhost_unlock();
}

static int
vhost_user_session_bdev_remove_cb(struct spdk_vhost_dev *vdev,
				  struct spdk_vhost_session *vsession,
				  void *ctx)
{
	struct spdk_vhost_blk_session *bvsession;
	int rc;

	bvsession = to_blk_session(vsession);
	if (bvsession->requestq_poller) {
		spdk_poller_unregister(&bvsession->requestq_poller);
		if (vsession->virtqueue[0].intr) {
			vhost_blk_session_unregister_interrupts(bvsession);
			rc = vhost_blk_session_register_interrupts(bvsession, no_bdev_vdev_vq_worker,
					"no_bdev_vdev_vq_worker");
			if (rc) {
				SPDK_ERRLOG("%s: Interrupt register failed\n", vsession->name);
				return rc;
			}
		}

		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
		spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
					       bvsession);
	}

	return 0;
}

static void
vhost_user_bdev_remove_cb(struct spdk_vhost_dev *vdev, bdev_event_cb_complete cb, void *cb_arg)
{
	SPDK_WARNLOG("%s: hot-removing bdev - all further requests will fail.\n",
		     vdev->name);

	spdk_vhost_lock();
	vhost_user_dev_foreach_session(vdev, vhost_user_session_bdev_remove_cb,
				       cb, cb_arg);
	spdk_vhost_unlock();
}

static void
vhost_user_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_vhost_dev *vdev,
			 bdev_event_cb_complete cb, void *cb_arg)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vhost_user_bdev_remove_cb(vdev, cb, cb_arg);
		break;
	case SPDK_BDEV_EVENT_RESIZE:
		vhost_user_blk_resize_cb(vdev, cb, cb_arg);
		break;
	default:
		assert(false);
		return;
	}
}

static void
bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
	      void *event_ctx)
{
	struct spdk_vhost_dev *vdev = (struct spdk_vhost_dev *)event_ctx;
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	SPDK_DEBUGLOG(vhost_blk, "Bdev event: type %d, name %s\n",
		      type,
		      bdev->name);

	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
	case SPDK_BDEV_EVENT_RESIZE:
		bvdev->ops->bdev_event(type, vdev, bdev_event_cpl_cb, (void *)type);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

static void
free_task_pool(struct spdk_vhost_blk_session *bvsession)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_virtqueue *vq;
	uint16_t i;

	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
		if (vq->tasks == NULL) {
			continue;
		}

		spdk_free(vq->tasks);
		vq->tasks = NULL;
	}
}

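/* Pre-allocate one task per ring slot for every virtqueue so that request
 * processing never allocates on the I/O path. Tasks are looked up by
 * req_idx (split rings) or buffer_id (packed rings).
 */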
static int
alloc_task_pool(struct spdk_vhost_blk_session *bvsession)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_virtqueue *vq;
	struct spdk_vhost_user_blk_task *task;
	uint32_t task_cnt;
	uint16_t i;
	uint32_t j;

	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
		if (vq->vring.desc == NULL) {
			continue;
		}

		task_cnt = vq->vring.size;
		if (task_cnt > SPDK_VHOST_MAX_VQ_SIZE) {
			/* sanity check */
			SPDK_ERRLOG("%s: virtqueue %"PRIu16" is too big. (size = %"PRIu32", max = %"PRIu32")\n",
				    vsession->name, i, task_cnt, SPDK_VHOST_MAX_VQ_SIZE);
			free_task_pool(bvsession);
			return -1;
		}
		vq->tasks = spdk_zmalloc(sizeof(struct spdk_vhost_user_blk_task) * task_cnt,
					 SPDK_CACHE_LINE_SIZE, NULL,
					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (vq->tasks == NULL) {
			SPDK_ERRLOG("%s: failed to allocate %"PRIu32" tasks for virtqueue %"PRIu16"\n",
				    vsession->name, task_cnt, i);
			free_task_pool(bvsession);
			return -1;
		}

		for (j = 0; j < task_cnt; j++) {
			task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[j];
			task->bvsession = bvsession;
			task->req_idx = j;
			task->vq = vq;
		}
	}

	return 0;
}

static int
vhost_blk_start_cb(struct spdk_vhost_dev *vdev,
		   struct spdk_vhost_session *vsession, void *unused)
{
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	struct spdk_vhost_blk_dev *bvdev;
	int i, rc = 0;

	bvdev = to_blk_dev(vdev);
	assert(bvdev != NULL);
	bvsession->bvdev = bvdev;

	/* validate all I/O queues are in a contiguous index range */
	for (i = 0; i < vsession->max_queues; i++) {
		/* vring.desc and vring.desc_packed are in a union struct
		 * so q->vring.desc can replace q->vring.desc_packed.
		 */
		if (vsession->virtqueue[i].vring.desc == NULL) {
			SPDK_ERRLOG("%s: queue %"PRIu32" is empty\n", vsession->name, i);
			rc = -1;
			goto out;
		}
	}

	rc = alloc_task_pool(bvsession);
	if (rc != 0) {
		SPDK_ERRLOG("%s: failed to alloc task pool.\n", vsession->name);
		goto out;
	}

	if (bvdev->bdev) {
		bvsession->io_channel = vhost_blk_get_io_channel(vdev);
		if (!bvsession->io_channel) {
			free_task_pool(bvsession);
			SPDK_ERRLOG("%s: I/O channel allocation failed\n", vsession->name);
			rc = -1;
			goto out;
		}
	}

	if (spdk_interrupt_mode_is_enabled()) {
		if (bvdev->bdev) {
			rc = vhost_blk_session_register_interrupts(bvsession,
					vdev_vq_worker,
					"vdev_vq_worker");
		} else {
			rc = vhost_blk_session_register_interrupts(bvsession,
					no_bdev_vdev_vq_worker,
					"no_bdev_vdev_vq_worker");
		}

		if (rc) {
			SPDK_ERRLOG("%s: Interrupt register failed\n", vsession->name);
			goto out;
		}
	}

	if (bvdev->bdev) {
		bvsession->requestq_poller = SPDK_POLLER_REGISTER(vdev_worker, bvsession, 0);
	} else {
		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
	}
	SPDK_INFOLOG(vhost, "%s: started poller on lcore %d\n",
		     vsession->name, spdk_env_get_current_core());

	spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
				       bvsession);

out:
	vhost_user_session_start_done(vsession, rc);
	return rc;
}

static int
vhost_blk_start(struct spdk_vhost_session *vsession)
{
	return vhost_user_session_send_event(vsession, vhost_blk_start_cb,
					     3, "start session");
}

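/* Polled every millisecond while a session is stopping: waits for all
 * outstanding tasks to complete, then flushes final used-ring signals and
 * tears the session down. Gives up with -ETIMEDOUT once the retry budget
 * set in vhost_blk_stop_cb is exhausted.
 */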
static int
destroy_session_poller_cb(void *arg)
{
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	int i;

	if (vsession->task_cnt > 0 || spdk_vhost_trylock() != 0) {
		assert(vsession->stop_retry_count > 0);
		vsession->stop_retry_count--;
		if (vsession->stop_retry_count == 0) {
			SPDK_ERRLOG("%s: Timed out while destroying session (task_cnt %d)\n", vsession->name,
				    vsession->task_cnt);
			spdk_poller_unregister(&bvsession->stop_poller);
			vhost_user_session_stop_done(vsession, -ETIMEDOUT);
		}

		return SPDK_POLLER_BUSY;
	}

	for (i = 0; i < vsession->max_queues; i++) {
		vsession->virtqueue[i].next_event_time = 0;
		vhost_vq_used_signal(vsession, &vsession->virtqueue[i]);
	}

	SPDK_INFOLOG(vhost, "%s: stopping poller on lcore %d\n",
		     vsession->name, spdk_env_get_current_core());

	if (bvsession->io_channel) {
		vhost_blk_put_io_channel(bvsession->io_channel);
		bvsession->io_channel = NULL;
	}

	free_task_pool(bvsession);
	spdk_poller_unregister(&bvsession->stop_poller);
	vhost_user_session_stop_done(vsession, 0);

	spdk_vhost_unlock();
	return SPDK_POLLER_BUSY;
}

static int
vhost_blk_stop_cb(struct spdk_vhost_dev *vdev,
		  struct spdk_vhost_session *vsession, void *unused)
{
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);

	spdk_poller_unregister(&bvsession->requestq_poller);

	if (vsession->virtqueue[0].intr) {
		vhost_blk_session_unregister_interrupts(bvsession);
	}

	/* vhost_user_session_send_event timeout is 3 seconds, so set the retry budget to 4 seconds */
	bvsession->vsession.stop_retry_count = 4000;
	bvsession->stop_poller = SPDK_POLLER_REGISTER(destroy_session_poller_cb,
				 bvsession, 1000);
	return 0;
}

static int
vhost_blk_stop(struct spdk_vhost_session *vsession)
{
	return vhost_user_session_send_event(vsession, vhost_blk_stop_cb,
					     3, "stop session");
}

static void
vhost_blk_dump_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
{
	struct spdk_vhost_blk_dev *bvdev;

	bvdev = to_blk_dev(vdev);
	assert(bvdev != NULL);

	spdk_json_write_named_object_begin(w, "block");

	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);

	spdk_json_write_name(w, "bdev");
	if (bvdev->bdev) {
		spdk_json_write_string(w, spdk_bdev_get_name(bvdev->bdev));
	} else {
		spdk_json_write_null(w);
	}
	spdk_json_write_named_string(w, "transport", bvdev->ops->name);

	spdk_json_write_object_end(w);
}

static void
vhost_blk_write_config_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
{
	struct spdk_vhost_blk_dev *bvdev;

	bvdev = to_blk_dev(vdev);
	assert(bvdev != NULL);

	if (!bvdev->bdev) {
		return;
	}

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "vhost_create_blk_controller");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "ctrlr", vdev->name);
	spdk_json_write_named_string(w, "dev_name", spdk_bdev_get_name(bvdev->bdev));
	spdk_json_write_named_string(w, "cpumask",
				     spdk_cpuset_fmt(spdk_thread_get_cpumask(vdev->thread)));
	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
	spdk_json_write_named_string(w, "transport", bvdev->ops->name);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static int vhost_blk_destroy(struct spdk_vhost_dev *dev);

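/* Fill in the virtio-blk config space for the guest. Note that capacity is
 * always expressed in 512-byte sectors regardless of the bdev block size:
 * e.g. a bdev with 4096-byte blocks and 1024 blocks (4 MiB total) reports a
 * capacity of 1024 * 4096 / 512 = 8192 sectors.
 */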
static int
vhost_blk_get_config(struct spdk_vhost_dev *vdev, uint8_t *config,
		     uint32_t len)
{
	struct virtio_blk_config blkcfg;
	struct spdk_bdev *bdev;
	uint32_t blk_size;
	uint64_t blkcnt;

	memset(&blkcfg, 0, sizeof(blkcfg));
	bdev = vhost_blk_get_bdev(vdev);
	if (bdev == NULL) {
		/* We can't just return -1 here as this GET_CONFIG message might
		 * be caused by a QEMU VM reboot. Returning -1 will indicate an
		 * error to QEMU, which might then decide to terminate itself.
		 * We don't want that. A simple reboot shouldn't break the system.
		 *
		 * Presenting a block device with block size 0 and block count 0
		 * doesn't cause any problems on the QEMU side and the virtio-pci
		 * device is even still available inside the VM, but there will
		 * be no block device created for it - the kernel drivers will
		 * silently reject it.
		 */
		blk_size = 0;
		blkcnt = 0;
	} else {
		blk_size = spdk_bdev_get_block_size(bdev);
		blkcnt = spdk_bdev_get_num_blocks(bdev);
		if (spdk_bdev_get_buf_align(bdev) > 1) {
			blkcfg.size_max = SPDK_BDEV_LARGE_BUF_MAX_SIZE;
			blkcfg.seg_max = spdk_min(SPDK_VHOST_IOVS_MAX - 2 - 1, BDEV_IO_NUM_CHILD_IOV - 2 - 1);
		} else {
			blkcfg.size_max = 131072;
			/* -2 for REQ and RESP and -1 for region boundary splitting */
			blkcfg.seg_max = SPDK_VHOST_IOVS_MAX - 2 - 1;
		}
	}

	blkcfg.blk_size = blk_size;
	/* minimum I/O size in blocks */
	blkcfg.min_io_size = 1;
	/* expressed in 512-byte sectors */
	blkcfg.capacity = (blkcnt * blk_size) / 512;
	/* QEMU can overwrite this value when started */
	blkcfg.num_queues = SPDK_VHOST_MAX_VQUEUES;

	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		/* 16 MiB, expressed in 512-byte sectors */
		blkcfg.max_discard_sectors = 32768;
		blkcfg.max_discard_seg = 1;
		blkcfg.discard_sector_alignment = blk_size / 512;
	}
	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
		blkcfg.max_write_zeroes_sectors = 32768;
		blkcfg.max_write_zeroes_seg = 1;
	}

	memcpy(config, &blkcfg, spdk_min(len, sizeof(blkcfg)));

	return 0;
}

static const struct spdk_vhost_user_dev_backend vhost_blk_user_device_backend = {
	.session_ctx_size = sizeof(struct spdk_vhost_blk_session) - sizeof(struct spdk_vhost_session),
	.start_session = vhost_blk_start,
	.stop_session = vhost_blk_stop,
};

static const struct spdk_vhost_dev_backend vhost_blk_device_backend = {
	.type = VHOST_BACKEND_BLK,
	.vhost_get_config = vhost_blk_get_config,
	.dump_info_json = vhost_blk_dump_info_json,
	.write_config_json = vhost_blk_write_config_json,
	.remove_device = vhost_blk_destroy,
};

int
virtio_blk_construct_ctrlr(struct spdk_vhost_dev *vdev, const char *address,
			   struct spdk_cpuset *cpumask, const struct spdk_json_val *params,
			   const struct spdk_vhost_user_dev_backend *user_backend)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	return bvdev->ops->create_ctrlr(vdev, cpumask, address, params, (void *)user_backend);
}

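/* Typical usage (a sketch using the standard SPDK rpc.py script; the bdev
 * and controller names are illustrative):
 *
 *   scripts/rpc.py bdev_malloc_create -b Malloc0 64 512
 *   scripts/rpc.py vhost_create_blk_controller --cpumask 0x1 vhost.0 Malloc0
 *
 * which ends up here with name="vhost.0" and dev_name="Malloc0".
 */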
int
spdk_vhost_blk_construct(const char *name, const char *cpumask, const char *dev_name,
			 const char *transport, const struct spdk_json_val *params)
{
	struct spdk_vhost_blk_dev *bvdev = NULL;
	struct spdk_vhost_dev *vdev;
	struct spdk_bdev *bdev;
	const char *transport_name = VIRTIO_BLK_DEFAULT_TRANSPORT;
	int ret = 0;

	spdk_vhost_lock();

	bvdev = calloc(1, sizeof(*bvdev));
	if (bvdev == NULL) {
		ret = -ENOMEM;
		goto out;
	}

	if (transport != NULL) {
		transport_name = transport;
	}

	bvdev->ops = virtio_blk_get_transport_ops(transport_name);
	if (!bvdev->ops) {
		ret = -EINVAL;
		SPDK_ERRLOG("Transport type '%s' unavailable.\n", transport_name);
		goto out;
	}

	ret = spdk_bdev_open_ext(dev_name, true, bdev_event_cb, bvdev, &bvdev->bdev_desc);
	if (ret != 0) {
		SPDK_ERRLOG("%s: could not open bdev '%s', error=%d\n",
			    name, dev_name, ret);
		goto out;
	}
	bdev = spdk_bdev_desc_get_bdev(bvdev->bdev_desc);

	vdev = &bvdev->vdev;
	vdev->virtio_features = SPDK_VHOST_BLK_FEATURES_BASE;
	vdev->disabled_features = SPDK_VHOST_BLK_DISABLED_FEATURES;
	vdev->protocol_features = SPDK_VHOST_BLK_PROTOCOL_FEATURES;

	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_DISCARD);
	}
	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_WRITE_ZEROES);
	}

	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_FLUSH);
	}

	/*
	 * When starting QEMU with multiqueue enabled, the vhost device will be
	 * started and stopped many times, related to the number of queues, as
	 * the exact number of queues used by this device is not known at the
	 * time. The target has to stop and start the device once it gets a
	 * valid I/O queue. While the vhost device is being stopped and started,
	 * the backend bdev I/O device would be deleted and created repeatedly.
	 * Hold a bdev reference in struct spdk_vhost_blk_dev so that the I/O
	 * device is not deleted.
	 */
	bvdev->dummy_io_channel = spdk_bdev_get_io_channel(bvdev->bdev_desc);

	bvdev->bdev = bdev;
	bvdev->readonly = false;
	ret = vhost_dev_register(vdev, name, cpumask, params, &vhost_blk_device_backend,
				 &vhost_blk_user_device_backend);
	if (ret != 0) {
		spdk_put_io_channel(bvdev->dummy_io_channel);
		spdk_bdev_close(bvdev->bdev_desc);
		goto out;
	}

	SPDK_INFOLOG(vhost, "%s: using bdev '%s'\n", name, dev_name);
out:
	if (ret != 0 && bvdev) {
		free(bvdev);
	}
	spdk_vhost_unlock();
	return ret;
}

int
virtio_blk_destroy_ctrlr(struct spdk_vhost_dev *vdev)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	return bvdev->ops->destroy_ctrlr(vdev);
}

static int
vhost_blk_destroy(struct spdk_vhost_dev *vdev)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
	int rc;

	assert(bvdev != NULL);

	rc = vhost_dev_unregister(&bvdev->vdev);
	if (rc != 0) {
		return rc;
	}

	/* If the bdev was removed, there is no need to call spdk_put_io_channel. */
	if (bvdev->bdev) {
		spdk_put_io_channel(bvdev->dummy_io_channel);
	}

	if (bvdev->bdev_desc) {
		spdk_bdev_close(bvdev->bdev_desc);
		bvdev->bdev_desc = NULL;
	}
	bvdev->bdev = NULL;

	free(bvdev);
	return 0;
}

struct spdk_io_channel *
vhost_blk_get_io_channel(struct spdk_vhost_dev *vdev)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	return spdk_bdev_get_io_channel(bvdev->bdev_desc);
}

void
vhost_blk_put_io_channel(struct spdk_io_channel *ch)
{
	spdk_put_io_channel(ch);
}

static struct spdk_virtio_blk_transport *
vhost_user_blk_create(const struct spdk_json_val *params)
{
	int ret;
	struct spdk_virtio_blk_transport *vhost_user_blk;

	vhost_user_blk = calloc(1, sizeof(*vhost_user_blk));
	if (!vhost_user_blk) {
		return NULL;
	}

	ret = vhost_user_init();
	if (ret != 0) {
		free(vhost_user_blk);
		return NULL;
	}

	return vhost_user_blk;
}

static int
vhost_user_blk_destroy(struct spdk_virtio_blk_transport *transport,
		       spdk_vhost_fini_cb cb_fn)
{
	vhost_user_fini(cb_fn);
	free(transport);
	return 0;
}

struct rpc_vhost_blk {
	bool readonly;
	bool packed_ring;
	bool packed_ring_recovery;
};

static const struct spdk_json_object_decoder rpc_construct_vhost_blk[] = {
	{"readonly", offsetof(struct rpc_vhost_blk, readonly), spdk_json_decode_bool, true},
	{"packed_ring", offsetof(struct rpc_vhost_blk, packed_ring), spdk_json_decode_bool, true},
	{"packed_ring_recovery", offsetof(struct rpc_vhost_blk, packed_ring_recovery), spdk_json_decode_bool, true},
};

static int
vhost_user_blk_create_ctrlr(struct spdk_vhost_dev *vdev, struct spdk_cpuset *cpumask,
			    const char *address, const struct spdk_json_val *params, void *custom_opts)
{
	struct rpc_vhost_blk req = {0};
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	if (spdk_json_decode_object_relaxed(params, rpc_construct_vhost_blk,
					    SPDK_COUNTOF(rpc_construct_vhost_blk),
					    &req)) {
		SPDK_DEBUGLOG(vhost_blk, "spdk_json_decode_object failed\n");
		return -EINVAL;
	}

	vdev->packed_ring_recovery = false;

	if (req.packed_ring) {
		vdev->virtio_features |= (uint64_t)req.packed_ring << VIRTIO_F_RING_PACKED;
		vdev->packed_ring_recovery = req.packed_ring_recovery;
	}
	if (req.readonly) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_RO);
		bvdev->readonly = req.readonly;
	}

	return vhost_user_dev_register(vdev, address, cpumask, custom_opts);
}

static int
vhost_user_blk_destroy_ctrlr(struct spdk_vhost_dev *vdev)
{
	return vhost_user_dev_unregister(vdev);
}

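/* The built-in vhost-user transport. Alternative virtio-blk transports (for
 * example, a vfio-user based one) can be plugged in by providing their own
 * spdk_virtio_blk_transport_ops table and registering it with
 * SPDK_VIRTIO_BLK_TRANSPORT_REGISTER().
 */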
static const struct spdk_virtio_blk_transport_ops vhost_user_blk = {
	.name = "vhost_user_blk",

	.dump_opts = NULL,

	.create = vhost_user_blk_create,
	.destroy = vhost_user_blk_destroy,

	.create_ctrlr = vhost_user_blk_create_ctrlr,
	.destroy_ctrlr = vhost_user_blk_destroy_ctrlr,

	.bdev_event = vhost_user_bdev_event_cb,
};

SPDK_VIRTIO_BLK_TRANSPORT_REGISTER(vhost_user_blk, &vhost_user_blk);

SPDK_LOG_REGISTER_COMPONENT(vhost_blk)
SPDK_LOG_REGISTER_COMPONENT(vhost_blk_data)