xref: /spdk/lib/vhost/vhost_blk.c (revision 838e61c3772fdefb17e1a0b8f9880e2bcb9c4c0d)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation. All rights reserved.
3  *   All rights reserved.
4  */
5 
6 #include <linux/virtio_blk.h>
7 
8 #include "spdk/env.h"
9 #include "spdk/bdev.h"
10 #include "spdk/bdev_module.h"
11 #include "spdk/thread.h"
12 #include "spdk/likely.h"
13 #include "spdk/string.h"
14 #include "spdk/util.h"
15 #include "spdk/vhost.h"
16 #include "spdk/json.h"
17 
18 #include "vhost_internal.h"
19 #include <rte_version.h>
20 
21 /* Minimal set of features supported by every SPDK VHOST-BLK device */
22 #define SPDK_VHOST_BLK_FEATURES_BASE (SPDK_VHOST_FEATURES | \
23 		(1ULL << VIRTIO_BLK_F_SIZE_MAX) | (1ULL << VIRTIO_BLK_F_SEG_MAX) | \
24 		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_BLK_SIZE) | \
25 		(1ULL << VIRTIO_BLK_F_TOPOLOGY) | (1ULL << VIRTIO_BLK_F_BARRIER)  | \
26 		(1ULL << VIRTIO_BLK_F_SCSI)     | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
27 		(1ULL << VIRTIO_BLK_F_MQ))
28 
29 /* Not supported features */
30 #define SPDK_VHOST_BLK_DISABLED_FEATURES (SPDK_VHOST_DISABLED_FEATURES | \
31 		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
32 		(1ULL << VIRTIO_BLK_F_BARRIER)  | (1ULL << VIRTIO_BLK_F_SCSI))
33 
34 /* Vhost-blk support protocol features */
35 #define SPDK_VHOST_BLK_PROTOCOL_FEATURES ((1ULL << VHOST_USER_PROTOCOL_F_CONFIG) | \
36 		(1ULL << VHOST_USER_PROTOCOL_F_INFLIGHT_SHMFD))
37 
38 #define VIRTIO_BLK_DEFAULT_TRANSPORT "vhost_user_blk"
39 
40 struct spdk_vhost_user_blk_task {
41 	struct spdk_vhost_blk_task blk_task;
42 	struct spdk_vhost_blk_session *bvsession;
43 	struct spdk_vhost_virtqueue *vq;
44 
45 	uint16_t req_idx;
46 	uint16_t num_descs;
47 	uint16_t buffer_id;
48 	uint16_t inflight_head;
49 
50 	/* If set, the task is currently used for I/O processing. */
51 	bool used;
52 };
53 
54 struct spdk_vhost_blk_dev {
55 	struct spdk_vhost_dev vdev;
56 	struct spdk_bdev *bdev;
57 	struct spdk_bdev_desc *bdev_desc;
58 	const struct spdk_virtio_blk_transport_ops *ops;
59 
60 	/* dummy_io_channel is used to hold a bdev reference */
61 	struct spdk_io_channel *dummy_io_channel;
62 	bool readonly;
63 };
64 
65 struct spdk_vhost_blk_session {
66 	/* The parent session must be the very first field in this struct */
67 	struct spdk_vhost_session vsession;
68 	struct spdk_vhost_blk_dev *bvdev;
69 	struct spdk_poller *requestq_poller;
70 	struct spdk_io_channel *io_channel;
71 	struct spdk_poller *stop_poller;
72 };
73 
74 /* forward declaration */
75 static const struct spdk_vhost_dev_backend vhost_blk_device_backend;
76 
77 static void vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task,
78 		void *cb_arg);
79 
80 static int
81 vhost_user_process_blk_request(struct spdk_vhost_user_blk_task *user_task)
82 {
83 	struct spdk_vhost_blk_session *bvsession = user_task->bvsession;
84 	struct spdk_vhost_dev *vdev = &bvsession->bvdev->vdev;
85 
86 	return virtio_blk_process_request(vdev, bvsession->io_channel, &user_task->blk_task,
87 					  vhost_user_blk_request_finish, NULL);
88 }
89 
90 static struct spdk_vhost_blk_dev *
91 to_blk_dev(struct spdk_vhost_dev *vdev)
92 {
93 	if (vdev == NULL) {
94 		return NULL;
95 	}
96 
97 	if (vdev->backend->type != VHOST_BACKEND_BLK) {
98 		SPDK_ERRLOG("%s: not a vhost-blk device\n", vdev->name);
99 		return NULL;
100 	}
101 
102 	return SPDK_CONTAINEROF(vdev, struct spdk_vhost_blk_dev, vdev);
103 }
104 
105 struct spdk_bdev *
106 vhost_blk_get_bdev(struct spdk_vhost_dev *vdev)
107 {
108 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
109 
110 	assert(bvdev != NULL);
111 
112 	return bvdev->bdev;
113 }
114 
115 static struct spdk_vhost_blk_session *
116 to_blk_session(struct spdk_vhost_session *vsession)
117 {
118 	assert(vsession->vdev->backend->type == VHOST_BACKEND_BLK);
119 	return (struct spdk_vhost_blk_session *)vsession;
120 }
121 
122 static void
123 blk_task_finish(struct spdk_vhost_user_blk_task *task)
124 {
125 	assert(task->bvsession->vsession.task_cnt > 0);
126 	task->bvsession->vsession.task_cnt--;
127 	task->used = false;
128 }
129 
130 static void
131 blk_task_init(struct spdk_vhost_user_blk_task *task)
132 {
133 	struct spdk_vhost_blk_task *blk_task = &task->blk_task;
134 
135 	task->used = true;
136 	blk_task->iovcnt = SPDK_COUNTOF(blk_task->iovs);
137 	blk_task->status = NULL;
138 	blk_task->used_len = 0;
139 	blk_task->payload_size = 0;
140 }
141 
142 static void
143 blk_task_enqueue(struct spdk_vhost_user_blk_task *task)
144 {
145 	if (task->vq->packed.packed_ring) {
146 		vhost_vq_packed_ring_enqueue(&task->bvsession->vsession, task->vq,
147 					     task->num_descs,
148 					     task->buffer_id, task->blk_task.used_len,
149 					     task->inflight_head);
150 	} else {
151 		vhost_vq_used_ring_enqueue(&task->bvsession->vsession, task->vq,
152 					   task->req_idx, task->blk_task.used_len);
153 	}
154 }
155 
156 static void
157 vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task, void *cb_arg)
158 {
159 	struct spdk_vhost_user_blk_task *user_task;
160 
161 	user_task = SPDK_CONTAINEROF(task, struct spdk_vhost_user_blk_task, blk_task);
162 
163 	blk_task_enqueue(user_task);
164 
165 	SPDK_DEBUGLOG(vhost_blk, "Finished task (%p) req_idx=%d\n status: %" PRIu8"\n",
166 		      user_task, user_task->req_idx, status);
167 	blk_task_finish(user_task);
168 }
169 
170 static void
171 blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task)
172 {
173 
174 	if (task->status) {
175 		*task->status = status;
176 	}
177 
178 	task->cb(status, task, task->cb_arg);
179 }
180 
181 /*
182  * Process task's descriptor chain and setup data related fields.
183  * Return
184  *   total size of supplied buffers
185  *
186  *   FIXME: Make this function return to rd_cnt and wr_cnt
187  */
188 static int
189 blk_iovs_split_queue_setup(struct spdk_vhost_blk_session *bvsession,
190 			   struct spdk_vhost_virtqueue *vq,
191 			   uint16_t req_idx, struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
192 {
193 	struct spdk_vhost_session *vsession = &bvsession->vsession;
194 	struct spdk_vhost_dev *vdev = vsession->vdev;
195 	struct vring_desc *desc, *desc_table;
196 	uint16_t out_cnt = 0, cnt = 0;
197 	uint32_t desc_table_size, len = 0;
198 	uint32_t desc_handled_cnt;
199 	int rc;
200 
201 	rc = vhost_vq_get_desc(vsession, vq, req_idx, &desc, &desc_table, &desc_table_size);
202 	if (rc != 0) {
203 		SPDK_ERRLOG("%s: invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
204 		return -1;
205 	}
206 
207 	desc_handled_cnt = 0;
208 	while (1) {
209 		/*
210 		 * Maximum cnt reached?
211 		 * Should not happen if request is well formatted, otherwise this is a BUG.
212 		 */
213 		if (spdk_unlikely(cnt == *iovs_cnt)) {
214 			SPDK_DEBUGLOG(vhost_blk, "%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
215 				      vsession->name, req_idx);
216 			return -1;
217 		}
218 
219 		if (spdk_unlikely(vhost_vring_desc_to_iov(vsession, iovs, &cnt, desc))) {
220 			SPDK_DEBUGLOG(vhost_blk, "%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
221 				      vsession->name, req_idx, cnt);
222 			return -1;
223 		}
224 
225 		len += desc->len;
226 
227 		out_cnt += vhost_vring_desc_is_wr(desc);
228 
229 		rc = vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
230 		if (rc != 0) {
231 			SPDK_ERRLOG("%s: descriptor chain at index %"PRIu16" terminated unexpectedly.\n",
232 				    vsession->name, req_idx);
233 			return -1;
234 		} else if (desc == NULL) {
235 			break;
236 		}
237 
238 		desc_handled_cnt++;
239 		if (spdk_unlikely(desc_handled_cnt > desc_table_size)) {
240 			/* Break a cycle and report an error, if any. */
241 			SPDK_ERRLOG("%s: found a cycle in the descriptor chain: desc_table_size = %d, desc_handled_cnt = %d.\n",
242 				    vsession->name, desc_table_size, desc_handled_cnt);
243 			return -1;
244 		}
245 	}
246 
247 	/*
248 	 * There must be least two descriptors.
249 	 * First contain request so it must be readable.
250 	 * Last descriptor contain buffer for response so it must be writable.
251 	 */
252 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
253 		return -1;
254 	}
255 
256 	*length = len;
257 	*iovs_cnt = cnt;
258 	return 0;
259 }
260 
261 static int
262 blk_iovs_packed_desc_setup(struct spdk_vhost_session *vsession,
263 			   struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
264 			   struct vring_packed_desc *desc_table, uint16_t desc_table_size,
265 			   struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
266 {
267 	struct vring_packed_desc *desc;
268 	uint16_t cnt = 0, out_cnt = 0;
269 	uint32_t len = 0;
270 
271 	if (desc_table == NULL) {
272 		desc = &vq->vring.desc_packed[req_idx];
273 	} else {
274 		req_idx = 0;
275 		desc = desc_table;
276 	}
277 
278 	while (1) {
279 		/*
280 		 * Maximum cnt reached?
281 		 * Should not happen if request is well formatted, otherwise this is a BUG.
282 		 */
283 		if (spdk_unlikely(cnt == *iovs_cnt)) {
284 			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
285 				    vsession->name, req_idx);
286 			return -EINVAL;
287 		}
288 
289 		if (spdk_unlikely(vhost_vring_packed_desc_to_iov(vsession, iovs, &cnt, desc))) {
290 			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
291 				    vsession->name, req_idx, cnt);
292 			return -EINVAL;
293 		}
294 
295 		len += desc->len;
296 		out_cnt += vhost_vring_packed_desc_is_wr(desc);
297 
298 		/* desc is NULL means we reach the last desc of this request */
299 		vhost_vring_packed_desc_get_next(&desc, &req_idx, vq, desc_table, desc_table_size);
300 		if (desc == NULL) {
301 			break;
302 		}
303 	}
304 
305 	/*
306 	 * There must be least two descriptors.
307 	 * First contain request so it must be readable.
308 	 * Last descriptor contain buffer for response so it must be writable.
309 	 */
310 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
311 		return -EINVAL;
312 	}
313 
314 	*length = len;
315 	*iovs_cnt = cnt;
316 
317 	return 0;
318 }
319 
320 static int
321 blk_iovs_packed_queue_setup(struct spdk_vhost_blk_session *bvsession,
322 			    struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
323 			    struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
324 {
325 	struct spdk_vhost_session *vsession = &bvsession->vsession;
326 	struct spdk_vhost_dev *vdev = vsession->vdev;
327 	struct vring_packed_desc *desc = NULL, *desc_table;
328 	uint32_t desc_table_size;
329 	int rc;
330 
331 	rc = vhost_vq_get_desc_packed(vsession, vq, req_idx, &desc,
332 				      &desc_table, &desc_table_size);
333 	if (spdk_unlikely(rc != 0)) {
334 		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
335 		return rc;
336 	}
337 
338 	return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
339 					  iovs, iovs_cnt, length);
340 }
341 
342 static int
343 blk_iovs_inflight_queue_setup(struct spdk_vhost_blk_session *bvsession,
344 			      struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
345 			      struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
346 {
347 	struct spdk_vhost_session *vsession = &bvsession->vsession;
348 	struct spdk_vhost_dev *vdev = vsession->vdev;
349 	spdk_vhost_inflight_desc *inflight_desc;
350 	struct vring_packed_desc *desc_table;
351 	uint16_t out_cnt = 0, cnt = 0;
352 	uint32_t desc_table_size, len = 0;
353 	int rc = 0;
354 
355 	rc = vhost_inflight_queue_get_desc(vsession, vq->vring_inflight.inflight_packed->desc,
356 					   req_idx, &inflight_desc, &desc_table, &desc_table_size);
357 	if (spdk_unlikely(rc != 0)) {
358 		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
359 		return rc;
360 	}
361 
362 	if (desc_table != NULL) {
363 		return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
364 						  iovs, iovs_cnt, length);
365 	}
366 
367 	while (1) {
368 		/*
369 		 * Maximum cnt reached?
370 		 * Should not happen if request is well formatted, otherwise this is a BUG.
371 		 */
372 		if (spdk_unlikely(cnt == *iovs_cnt)) {
373 			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
374 				    vsession->name, req_idx);
375 			return -EINVAL;
376 		}
377 
378 		if (spdk_unlikely(vhost_vring_inflight_desc_to_iov(vsession, iovs, &cnt, inflight_desc))) {
379 			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
380 				    vsession->name, req_idx, cnt);
381 			return -EINVAL;
382 		}
383 
384 		len += inflight_desc->len;
385 		out_cnt += vhost_vring_inflight_desc_is_wr(inflight_desc);
386 
387 		/* Without F_NEXT means it's the last desc */
388 		if ((inflight_desc->flags & VRING_DESC_F_NEXT) == 0) {
389 			break;
390 		}
391 
392 		inflight_desc = &vq->vring_inflight.inflight_packed->desc[inflight_desc->next];
393 	}
394 
395 	/*
396 	 * There must be least two descriptors.
397 	 * First contain request so it must be readable.
398 	 * Last descriptor contain buffer for response so it must be writable.
399 	 */
400 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
401 		return -EINVAL;
402 	}
403 
404 	*length = len;
405 	*iovs_cnt = cnt;
406 
407 	return 0;
408 }
409 
410 static void
411 blk_request_complete_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
412 {
413 	struct spdk_vhost_blk_task *task = cb_arg;
414 
415 	spdk_bdev_free_io(bdev_io);
416 	blk_request_finish(success ? VIRTIO_BLK_S_OK : VIRTIO_BLK_S_IOERR, task);
417 }
418 
419 static void
420 blk_request_resubmit(void *arg)
421 {
422 	struct spdk_vhost_blk_task *task = arg;
423 	int rc = 0;
424 
425 	rc = virtio_blk_process_request(task->bdev_io_wait_vdev, task->bdev_io_wait_ch, task,
426 					task->cb, task->cb_arg);
427 	if (rc == 0) {
428 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p resubmitted ======\n", task);
429 	} else {
430 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p failed ======\n", task);
431 	}
432 }
433 
434 static inline void
435 blk_request_queue_io(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
436 		     struct spdk_vhost_blk_task *task)
437 {
438 	int rc;
439 	struct spdk_bdev *bdev = vhost_blk_get_bdev(vdev);
440 
441 	task->bdev_io_wait.bdev = bdev;
442 	task->bdev_io_wait.cb_fn = blk_request_resubmit;
443 	task->bdev_io_wait.cb_arg = task;
444 	task->bdev_io_wait_ch = ch;
445 	task->bdev_io_wait_vdev = vdev;
446 
447 	rc = spdk_bdev_queue_io_wait(bdev, ch, &task->bdev_io_wait);
448 	if (rc != 0) {
449 		blk_request_finish(VIRTIO_BLK_S_IOERR, task);
450 	}
451 }
452 
453 int
454 virtio_blk_process_request(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
455 			   struct spdk_vhost_blk_task *task, virtio_blk_request_cb cb, void *cb_arg)
456 {
457 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
458 	struct virtio_blk_outhdr req;
459 	struct virtio_blk_discard_write_zeroes *desc;
460 	struct iovec *iov;
461 	uint32_t type;
462 	uint64_t flush_bytes;
463 	uint32_t payload_len;
464 	uint16_t iovcnt;
465 	int rc;
466 
467 	assert(bvdev != NULL);
468 
469 	task->cb = cb;
470 	task->cb_arg = cb_arg;
471 
472 	iov = &task->iovs[0];
473 	if (spdk_unlikely(iov->iov_len != sizeof(req))) {
474 		SPDK_DEBUGLOG(vhost_blk,
475 			      "First descriptor size is %zu but expected %zu (task = %p).\n",
476 			      iov->iov_len, sizeof(req), task);
477 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
478 		return -1;
479 	}
480 
481 	/* Some SeaBIOS versions don't align the virtio_blk_outhdr on an 8-byte boundary, which
482 	 * triggers ubsan errors.  So copy this small 16-byte structure to the stack to workaround
483 	 * this problem.
484 	 */
485 	memcpy(&req, iov->iov_base, sizeof(req));
486 
487 	iov = &task->iovs[task->iovcnt - 1];
488 	if (spdk_unlikely(iov->iov_len != 1)) {
489 		SPDK_DEBUGLOG(vhost_blk,
490 			      "Last descriptor size is %zu but expected %d (task = %p).\n",
491 			      iov->iov_len, 1, task);
492 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
493 		return -1;
494 	}
495 
496 	payload_len = task->payload_size;
497 	task->status = iov->iov_base;
498 	payload_len -= sizeof(req) + sizeof(*task->status);
499 	iovcnt = task->iovcnt - 2;
500 
501 	type = req.type;
502 #ifdef VIRTIO_BLK_T_BARRIER
503 	/* Don't care about barrier for now (as QEMU's virtio-blk do). */
504 	type &= ~VIRTIO_BLK_T_BARRIER;
505 #endif
506 
507 	switch (type) {
508 	case VIRTIO_BLK_T_IN:
509 	case VIRTIO_BLK_T_OUT:
510 		if (spdk_unlikely(payload_len == 0 || (payload_len & (512 - 1)) != 0)) {
511 			SPDK_ERRLOG("%s - passed IO buffer is not multiple of 512b (task = %p).\n",
512 				    type ? "WRITE" : "READ", task);
513 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
514 			return -1;
515 		}
516 
517 		if (type == VIRTIO_BLK_T_IN) {
518 			task->used_len = payload_len + sizeof(*task->status);
519 			rc = spdk_bdev_readv(bvdev->bdev_desc, ch,
520 					     &task->iovs[1], iovcnt, req.sector * 512,
521 					     payload_len, blk_request_complete_cb, task);
522 		} else if (!bvdev->readonly) {
523 			task->used_len = sizeof(*task->status);
524 			rc = spdk_bdev_writev(bvdev->bdev_desc, ch,
525 					      &task->iovs[1], iovcnt, req.sector * 512,
526 					      payload_len, blk_request_complete_cb, task);
527 		} else {
528 			SPDK_DEBUGLOG(vhost_blk, "Device is in read-only mode!\n");
529 			rc = -1;
530 		}
531 
532 		if (rc) {
533 			if (rc == -ENOMEM) {
534 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
535 				blk_request_queue_io(vdev, ch, task);
536 			} else {
537 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
538 				return -1;
539 			}
540 		}
541 		break;
542 	case VIRTIO_BLK_T_DISCARD:
543 		desc = task->iovs[1].iov_base;
544 		if (payload_len != sizeof(*desc)) {
545 			SPDK_NOTICELOG("Invalid discard payload size: %u\n", payload_len);
546 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
547 			return -1;
548 		}
549 
550 		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
551 			SPDK_ERRLOG("UNMAP flag is only used for WRITE ZEROES command\n");
552 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
553 			return -1;
554 		}
555 
556 		rc = spdk_bdev_unmap(bvdev->bdev_desc, ch,
557 				     desc->sector * 512, desc->num_sectors * 512,
558 				     blk_request_complete_cb, task);
559 		if (rc) {
560 			if (rc == -ENOMEM) {
561 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
562 				blk_request_queue_io(vdev, ch, task);
563 			} else {
564 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
565 				return -1;
566 			}
567 		}
568 		break;
569 	case VIRTIO_BLK_T_WRITE_ZEROES:
570 		desc = task->iovs[1].iov_base;
571 		if (payload_len != sizeof(*desc)) {
572 			SPDK_NOTICELOG("Invalid write zeroes payload size: %u\n", payload_len);
573 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
574 			return -1;
575 		}
576 
577 		/* Unmap this range, SPDK doesn't support it, kernel will enable this flag by default
578 		 * without checking unmap feature is negotiated or not, the flag isn't mandatory, so
579 		 * just print a warning.
580 		 */
581 		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
582 			SPDK_WARNLOG("Ignore the unmap flag for WRITE ZEROES from %"PRIx64", len %"PRIx64"\n",
583 				     (uint64_t)desc->sector * 512, (uint64_t)desc->num_sectors * 512);
584 		}
585 
586 		rc = spdk_bdev_write_zeroes(bvdev->bdev_desc, ch,
587 					    desc->sector * 512, desc->num_sectors * 512,
588 					    blk_request_complete_cb, task);
589 		if (rc) {
590 			if (rc == -ENOMEM) {
591 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
592 				blk_request_queue_io(vdev, ch, task);
593 			} else {
594 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
595 				return -1;
596 			}
597 		}
598 		break;
599 	case VIRTIO_BLK_T_FLUSH:
600 		flush_bytes = spdk_bdev_get_num_blocks(bvdev->bdev) * spdk_bdev_get_block_size(bvdev->bdev);
601 		if (req.sector != 0) {
602 			SPDK_NOTICELOG("sector must be zero for flush command\n");
603 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
604 			return -1;
605 		}
606 		rc = spdk_bdev_flush(bvdev->bdev_desc, ch,
607 				     0, flush_bytes,
608 				     blk_request_complete_cb, task);
609 		if (rc) {
610 			if (rc == -ENOMEM) {
611 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
612 				blk_request_queue_io(vdev, ch, task);
613 			} else {
614 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
615 				return -1;
616 			}
617 		}
618 		break;
619 	case VIRTIO_BLK_T_GET_ID:
620 		if (!iovcnt || !payload_len) {
621 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
622 			return -1;
623 		}
624 		task->used_len = spdk_min((size_t)VIRTIO_BLK_ID_BYTES, task->iovs[1].iov_len);
625 		spdk_strcpy_pad(task->iovs[1].iov_base, spdk_bdev_get_name(bvdev->bdev),
626 				task->used_len, ' ');
627 		blk_request_finish(VIRTIO_BLK_S_OK, task);
628 		break;
629 	default:
630 		SPDK_DEBUGLOG(vhost_blk, "Not supported request type '%"PRIu32"'.\n", type);
631 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
632 		return -1;
633 	}
634 
635 	return 0;
636 }
637 
638 static void
639 process_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
640 {
641 	struct spdk_vhost_user_blk_task *task;
642 	struct spdk_vhost_blk_task *blk_task;
643 	int rc;
644 
645 	assert(vq->packed.packed_ring == false);
646 
647 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[req_idx];
648 	blk_task = &task->blk_task;
649 	if (spdk_unlikely(task->used)) {
650 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
651 			    task->bvsession->vsession.name, req_idx);
652 		blk_task->used_len = 0;
653 		blk_task_enqueue(task);
654 		return;
655 	}
656 
657 	task->bvsession->vsession.task_cnt++;
658 
659 	blk_task_init(task);
660 
661 	rc = blk_iovs_split_queue_setup(task->bvsession, vq, task->req_idx,
662 					blk_task->iovs, &blk_task->iovcnt, &blk_task->payload_size);
663 
664 	if (rc) {
665 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
666 		/* Only READ and WRITE are supported for now. */
667 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
668 		return;
669 	}
670 
671 	if (vhost_user_process_blk_request(task) == 0) {
672 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
673 			      req_idx);
674 	} else {
675 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, req_idx);
676 	}
677 }
678 
679 static void
680 process_packed_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
681 {
682 	struct spdk_vhost_user_blk_task *task;
683 	struct spdk_vhost_blk_task *blk_task;
684 	uint16_t task_idx = req_idx, num_descs;
685 	int rc;
686 
687 	assert(vq->packed.packed_ring);
688 
689 	/* Packed ring used the buffer_id as the task_idx to get task struct.
690 	 * In kernel driver, it uses the vq->free_head to set the buffer_id so the value
691 	 * must be in the range of 0 ~ vring.size. The free_head value must be unique
692 	 * in the outstanding requests.
693 	 * We can't use the req_idx as the task_idx because the desc can be reused in
694 	 * the next phase even when it's not completed in the previous phase. For example,
695 	 * At phase 0, last_used_idx was 2 and desc0 was not completed.Then after moving
696 	 * phase 1, last_avail_idx is updated to 1. In this case, req_idx can not be used
697 	 * as task_idx because we will know task[0]->used is true at phase 1.
698 	 * The split queue is quite different, the desc would insert into the free list when
699 	 * device completes the request, the driver gets the desc from the free list which
700 	 * ensures the req_idx is unique in the outstanding requests.
701 	 */
702 	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
703 
704 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
705 	blk_task = &task->blk_task;
706 	if (spdk_unlikely(task->used)) {
707 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
708 			    task->bvsession->vsession.name, task_idx);
709 		blk_task->used_len = 0;
710 		blk_task_enqueue(task);
711 		return;
712 	}
713 
714 	task->req_idx = req_idx;
715 	task->num_descs = num_descs;
716 	task->buffer_id = task_idx;
717 
718 	rte_vhost_set_inflight_desc_packed(task->bvsession->vsession.vid, vq->vring_idx,
719 					   req_idx, (req_idx + num_descs - 1) % vq->vring.size,
720 					   &task->inflight_head);
721 
722 	task->bvsession->vsession.task_cnt++;
723 
724 	blk_task_init(task);
725 
726 	rc = blk_iovs_packed_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
727 					 &blk_task->iovcnt,
728 					 &blk_task->payload_size);
729 	if (rc) {
730 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
731 		/* Only READ and WRITE are supported for now. */
732 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
733 		return;
734 	}
735 
736 	if (vhost_user_process_blk_request(task) == 0) {
737 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
738 			      task_idx);
739 	} else {
740 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
741 	}
742 }
743 
744 static void
745 process_packed_inflight_blk_task(struct spdk_vhost_virtqueue *vq,
746 				 uint16_t req_idx)
747 {
748 	spdk_vhost_inflight_desc *desc_array = vq->vring_inflight.inflight_packed->desc;
749 	spdk_vhost_inflight_desc *desc = &desc_array[req_idx];
750 	struct spdk_vhost_user_blk_task *task;
751 	struct spdk_vhost_blk_task *blk_task;
752 	uint16_t task_idx, num_descs;
753 	int rc;
754 
755 	task_idx = desc_array[desc->last].id;
756 	num_descs = desc->num;
757 	/* In packed ring reconnection, we use the last_used_idx as the
758 	 * initial value. So when we process the inflight descs we still
759 	 * need to update the available ring index.
760 	 */
761 	vq->last_avail_idx += num_descs;
762 	if (vq->last_avail_idx >= vq->vring.size) {
763 		vq->last_avail_idx -= vq->vring.size;
764 		vq->packed.avail_phase = !vq->packed.avail_phase;
765 	}
766 
767 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
768 	blk_task = &task->blk_task;
769 	if (spdk_unlikely(task->used)) {
770 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
771 			    task->bvsession->vsession.name, task_idx);
772 		blk_task->used_len = 0;
773 		blk_task_enqueue(task);
774 		return;
775 	}
776 
777 	task->req_idx = req_idx;
778 	task->num_descs = num_descs;
779 	task->buffer_id = task_idx;
780 	/* It's for cleaning inflight entries */
781 	task->inflight_head = req_idx;
782 
783 	task->bvsession->vsession.task_cnt++;
784 
785 	blk_task_init(task);
786 
787 	rc = blk_iovs_inflight_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
788 					   &blk_task->iovcnt,
789 					   &blk_task->payload_size);
790 	if (rc) {
791 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
792 		/* Only READ and WRITE are supported for now. */
793 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
794 		return;
795 	}
796 
797 	if (vhost_user_process_blk_request(task) == 0) {
798 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
799 			      task_idx);
800 	} else {
801 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
802 	}
803 }
804 
805 static int
806 submit_inflight_desc(struct spdk_vhost_blk_session *bvsession,
807 		     struct spdk_vhost_virtqueue *vq)
808 {
809 	struct spdk_vhost_session *vsession;
810 	spdk_vhost_resubmit_info *resubmit;
811 	spdk_vhost_resubmit_desc *resubmit_list;
812 	uint16_t req_idx;
813 	int i, resubmit_cnt;
814 
815 	resubmit = vq->vring_inflight.resubmit_inflight;
816 	if (spdk_likely(resubmit == NULL || resubmit->resubmit_list == NULL ||
817 			resubmit->resubmit_num == 0)) {
818 		return 0;
819 	}
820 
821 	resubmit_list = resubmit->resubmit_list;
822 	vsession = &bvsession->vsession;
823 
824 	for (i = resubmit->resubmit_num - 1; i >= 0; --i) {
825 		req_idx = resubmit_list[i].index;
826 		SPDK_DEBUGLOG(vhost_blk, "====== Start processing resubmit request idx %"PRIu16"======\n",
827 			      req_idx);
828 
829 		if (spdk_unlikely(req_idx >= vq->vring.size)) {
830 			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
831 				    vsession->name, req_idx, vq->vring.size);
832 			vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
833 			continue;
834 		}
835 
836 		if (vq->packed.packed_ring) {
837 			process_packed_inflight_blk_task(vq, req_idx);
838 		} else {
839 			process_blk_task(vq, req_idx);
840 		}
841 	}
842 	resubmit_cnt = resubmit->resubmit_num;
843 	resubmit->resubmit_num = 0;
844 	return resubmit_cnt;
845 }
846 
847 static int
848 process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
849 {
850 	struct spdk_vhost_session *vsession = &bvsession->vsession;
851 	uint16_t reqs[SPDK_VHOST_VQ_MAX_SUBMISSIONS];
852 	uint16_t reqs_cnt, i;
853 	int resubmit_cnt = 0;
854 
855 	resubmit_cnt = submit_inflight_desc(bvsession, vq);
856 
857 	reqs_cnt = vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
858 	if (!reqs_cnt) {
859 		return resubmit_cnt;
860 	}
861 
862 	for (i = 0; i < reqs_cnt; i++) {
863 		SPDK_DEBUGLOG(vhost_blk, "====== Starting processing request idx %"PRIu16"======\n",
864 			      reqs[i]);
865 
866 		if (spdk_unlikely(reqs[i] >= vq->vring.size)) {
867 			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
868 				    vsession->name, reqs[i], vq->vring.size);
869 			vhost_vq_used_ring_enqueue(vsession, vq, reqs[i], 0);
870 			continue;
871 		}
872 
873 		rte_vhost_set_inflight_desc_split(vsession->vid, vq->vring_idx, reqs[i]);
874 
875 		process_blk_task(vq, reqs[i]);
876 	}
877 
878 	return reqs_cnt;
879 }
880 
881 static int
882 process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
883 {
884 	uint16_t i = 0;
885 	uint16_t count = 0;
886 	int resubmit_cnt = 0;
887 
888 	resubmit_cnt = submit_inflight_desc(bvsession, vq);
889 
890 	while (i++ < SPDK_VHOST_VQ_MAX_SUBMISSIONS &&
891 	       vhost_vq_packed_ring_is_avail(vq)) {
892 		SPDK_DEBUGLOG(vhost_blk, "====== Starting processing request idx %"PRIu16"======\n",
893 			      vq->last_avail_idx);
894 		count++;
895 		process_packed_blk_task(vq, vq->last_avail_idx);
896 	}
897 
898 	return count > 0 ? count : resubmit_cnt;
899 }
900 
901 static int
902 _vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
903 {
904 	struct spdk_vhost_session *vsession = vq->vsession;
905 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
906 	bool packed_ring;
907 	int rc = 0;
908 
909 	packed_ring = vq->packed.packed_ring;
910 	if (packed_ring) {
911 		rc = process_packed_vq(bvsession, vq);
912 	} else {
913 		rc = process_vq(bvsession, vq);
914 	}
915 
916 	vhost_session_vq_used_signal(vq);
917 
918 	return rc;
919 
920 }
921 
922 static int
923 vdev_vq_worker(void *arg)
924 {
925 	struct spdk_vhost_virtqueue *vq = arg;
926 
927 	return _vdev_vq_worker(vq);
928 }
929 
930 static int
931 vdev_worker(void *arg)
932 {
933 	struct spdk_vhost_blk_session *bvsession = arg;
934 	struct spdk_vhost_session *vsession = &bvsession->vsession;
935 	uint16_t q_idx;
936 	int rc = 0;
937 
938 	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
939 		rc += _vdev_vq_worker(&vsession->virtqueue[q_idx]);
940 	}
941 
942 	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
943 }
944 
945 static void
946 no_bdev_process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
947 {
948 	struct spdk_vhost_session *vsession = &bvsession->vsession;
949 	struct iovec iovs[SPDK_VHOST_IOVS_MAX];
950 	uint32_t length;
951 	uint16_t iovcnt, req_idx;
952 
953 	if (vhost_vq_avail_ring_get(vq, &req_idx, 1) != 1) {
954 		return;
955 	}
956 
957 	iovcnt = SPDK_COUNTOF(iovs);
958 	if (blk_iovs_split_queue_setup(bvsession, vq, req_idx, iovs, &iovcnt, &length) == 0) {
959 		*(volatile uint8_t *)iovs[iovcnt - 1].iov_base = VIRTIO_BLK_S_IOERR;
960 		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
961 	}
962 
963 	vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
964 }
965 
966 static void
967 no_bdev_process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
968 {
969 	struct spdk_vhost_session *vsession = &bvsession->vsession;
970 	struct spdk_vhost_user_blk_task *task;
971 	struct spdk_vhost_blk_task *blk_task;
972 	uint32_t length;
973 	uint16_t req_idx = vq->last_avail_idx;
974 	uint16_t task_idx, num_descs;
975 
976 	if (!vhost_vq_packed_ring_is_avail(vq)) {
977 		return;
978 	}
979 
980 	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
981 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
982 	blk_task = &task->blk_task;
983 	if (spdk_unlikely(task->used)) {
984 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
985 			    vsession->name, req_idx);
986 		vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
987 					     task->buffer_id, blk_task->used_len,
988 					     task->inflight_head);
989 		return;
990 	}
991 
992 	task->req_idx = req_idx;
993 	task->num_descs = num_descs;
994 	task->buffer_id = task_idx;
995 	blk_task_init(task);
996 
997 	if (blk_iovs_packed_queue_setup(bvsession, vq, task->req_idx, blk_task->iovs, &blk_task->iovcnt,
998 					&length)) {
999 		*(volatile uint8_t *)(blk_task->iovs[blk_task->iovcnt - 1].iov_base) = VIRTIO_BLK_S_IOERR;
1000 		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
1001 	}
1002 
1003 	task->used = false;
1004 	vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
1005 				     task->buffer_id, blk_task->used_len,
1006 				     task->inflight_head);
1007 }
1008 
1009 static int
1010 _no_bdev_vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
1011 {
1012 	struct spdk_vhost_session *vsession = vq->vsession;
1013 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1014 	bool packed_ring;
1015 
1016 	packed_ring = vq->packed.packed_ring;
1017 	if (packed_ring) {
1018 		no_bdev_process_packed_vq(bvsession, vq);
1019 	} else {
1020 		no_bdev_process_vq(bvsession, vq);
1021 	}
1022 
1023 	vhost_session_vq_used_signal(vq);
1024 
1025 	if (vsession->task_cnt == 0 && bvsession->io_channel) {
1026 		vhost_blk_put_io_channel(bvsession->io_channel);
1027 		bvsession->io_channel = NULL;
1028 	}
1029 
1030 	return SPDK_POLLER_BUSY;
1031 }
1032 
1033 static int
1034 no_bdev_vdev_vq_worker(void *arg)
1035 {
1036 	struct spdk_vhost_virtqueue *vq = arg;
1037 
1038 	return _no_bdev_vdev_vq_worker(vq);
1039 }
1040 
1041 static int
1042 no_bdev_vdev_worker(void *arg)
1043 {
1044 	struct spdk_vhost_blk_session *bvsession = arg;
1045 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1046 	uint16_t q_idx;
1047 
1048 	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
1049 		_no_bdev_vdev_vq_worker(&vsession->virtqueue[q_idx]);
1050 	}
1051 
1052 	return SPDK_POLLER_BUSY;
1053 }
1054 
1055 static void
1056 vhost_blk_session_unregister_interrupts(struct spdk_vhost_blk_session *bvsession)
1057 {
1058 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1059 	struct spdk_vhost_virtqueue *vq;
1060 	int i;
1061 
1062 	SPDK_DEBUGLOG(vhost_blk, "unregister virtqueues interrupt\n");
1063 	for (i = 0; i < vsession->max_queues; i++) {
1064 		vq = &vsession->virtqueue[i];
1065 		if (vq->intr == NULL) {
1066 			break;
1067 		}
1068 
1069 		SPDK_DEBUGLOG(vhost_blk, "unregister vq[%d]'s kickfd is %d\n",
1070 			      i, vq->vring.kickfd);
1071 		spdk_interrupt_unregister(&vq->intr);
1072 	}
1073 }
1074 
1075 static void
1076 _vhost_blk_vq_register_interrupt(void *arg)
1077 {
1078 	struct spdk_vhost_virtqueue *vq = arg;
1079 	struct spdk_vhost_session *vsession = vq->vsession;
1080 	struct spdk_vhost_blk_dev *bvdev =  to_blk_dev(vsession->vdev);
1081 
1082 	assert(bvdev != NULL);
1083 
1084 	if (bvdev->bdev) {
1085 		vq->intr = spdk_interrupt_register(vq->vring.kickfd, vdev_vq_worker, vq, "vdev_vq_worker");
1086 	} else {
1087 		vq->intr = spdk_interrupt_register(vq->vring.kickfd, no_bdev_vdev_vq_worker, vq,
1088 						   "no_bdev_vdev_vq_worker");
1089 	}
1090 
1091 	if (vq->intr == NULL) {
1092 		SPDK_ERRLOG("Fail to register req notifier handler.\n");
1093 		assert(false);
1094 	}
1095 }
1096 
1097 static void
1098 vhost_blk_vq_register_interrupt(struct spdk_vhost_session *vsession,
1099 				struct spdk_vhost_virtqueue *vq)
1100 {
1101 	spdk_thread_send_msg(vsession->vdev->thread, _vhost_blk_vq_register_interrupt, vq);
1102 }
1103 
1104 static int
1105 vhost_blk_session_register_no_bdev_interrupts(struct spdk_vhost_blk_session *bvsession)
1106 {
1107 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1108 	struct spdk_vhost_virtqueue *vq = NULL;
1109 	int i;
1110 
1111 	SPDK_DEBUGLOG(vhost_blk, "Register virtqueues interrupt\n");
1112 	for (i = 0; i < vsession->max_queues; i++) {
1113 		vq = &vsession->virtqueue[i];
1114 		SPDK_DEBUGLOG(vhost_blk, "Register vq[%d]'s kickfd is %d\n",
1115 			      i, vq->vring.kickfd);
1116 		vq->intr = spdk_interrupt_register(vq->vring.kickfd, no_bdev_vdev_vq_worker, vq,
1117 						   "no_bdev_vdev_vq_worker");
1118 		if (vq->intr == NULL) {
1119 			goto err;
1120 		}
1121 
1122 	}
1123 
1124 	return 0;
1125 
1126 err:
1127 	vhost_blk_session_unregister_interrupts(bvsession);
1128 	return -1;
1129 }
1130 
1131 static void
1132 vhost_blk_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1133 {
1134 	struct spdk_vhost_blk_session *bvsession = cb_arg;
1135 
1136 	vhost_user_session_set_interrupt_mode(&bvsession->vsession, interrupt_mode);
1137 }
1138 
1139 static void
1140 bdev_event_cpl_cb(struct spdk_vhost_dev *vdev, void *ctx)
1141 {
1142 	enum spdk_bdev_event_type type = (enum spdk_bdev_event_type)(uintptr_t)ctx;
1143 	struct spdk_vhost_blk_dev *bvdev;
1144 
1145 	if (type == SPDK_BDEV_EVENT_REMOVE) {
1146 		/* All sessions have been notified, time to close the bdev */
1147 		bvdev = to_blk_dev(vdev);
1148 		assert(bvdev != NULL);
1149 		spdk_put_io_channel(bvdev->dummy_io_channel);
1150 		spdk_bdev_close(bvdev->bdev_desc);
1151 		bvdev->bdev_desc = NULL;
1152 		bvdev->bdev = NULL;
1153 	}
1154 }
1155 
1156 static int
1157 vhost_session_bdev_resize_cb(struct spdk_vhost_dev *vdev,
1158 			     struct spdk_vhost_session *vsession,
1159 			     void *ctx)
1160 {
1161 	SPDK_NOTICELOG("bdev send slave msg to vid(%d)\n", vsession->vid);
1162 #if RTE_VERSION >= RTE_VERSION_NUM(23, 03, 0, 0)
1163 	rte_vhost_backend_config_change(vsession->vid, false);
1164 #else
1165 	rte_vhost_slave_config_change(vsession->vid, false);
1166 #endif
1167 
1168 	return 0;
1169 }
1170 
1171 static void
1172 vhost_user_blk_resize_cb(struct spdk_vhost_dev *vdev, bdev_event_cb_complete cb, void *cb_arg)
1173 {
1174 	vhost_user_dev_foreach_session(vdev, vhost_session_bdev_resize_cb,
1175 				       cb, cb_arg);
1176 }
1177 
1178 static int
1179 vhost_user_session_bdev_remove_cb(struct spdk_vhost_dev *vdev,
1180 				  struct spdk_vhost_session *vsession,
1181 				  void *ctx)
1182 {
1183 	struct spdk_vhost_blk_session *bvsession;
1184 	int rc;
1185 
1186 	bvsession = to_blk_session(vsession);
1187 	if (bvsession->requestq_poller) {
1188 		spdk_poller_unregister(&bvsession->requestq_poller);
1189 		if (vsession->interrupt_mode) {
1190 			vhost_blk_session_unregister_interrupts(bvsession);
1191 			rc = vhost_blk_session_register_no_bdev_interrupts(bvsession);
1192 			if (rc) {
1193 				SPDK_ERRLOG("%s: Interrupt register failed\n", vsession->name);
1194 				return rc;
1195 			}
1196 		}
1197 
1198 		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
1199 		spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
1200 					       bvsession);
1201 	}
1202 
1203 	return 0;
1204 }
1205 
1206 static void
1207 vhost_user_bdev_remove_cb(struct spdk_vhost_dev *vdev, bdev_event_cb_complete cb, void *cb_arg)
1208 {
1209 	SPDK_WARNLOG("%s: hot-removing bdev - all further requests will fail.\n",
1210 		     vdev->name);
1211 
1212 	vhost_user_dev_foreach_session(vdev, vhost_user_session_bdev_remove_cb,
1213 				       cb, cb_arg);
1214 }
1215 
1216 static void
1217 vhost_user_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_vhost_dev *vdev,
1218 			 bdev_event_cb_complete cb, void *cb_arg)
1219 {
1220 	switch (type) {
1221 	case SPDK_BDEV_EVENT_REMOVE:
1222 		vhost_user_bdev_remove_cb(vdev, cb, cb_arg);
1223 		break;
1224 	case SPDK_BDEV_EVENT_RESIZE:
1225 		vhost_user_blk_resize_cb(vdev, cb, cb_arg);
1226 		break;
1227 	default:
1228 		assert(false);
1229 		return;
1230 	}
1231 }
1232 
1233 static void
1234 bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1235 	      void *event_ctx)
1236 {
1237 	struct spdk_vhost_dev *vdev = (struct spdk_vhost_dev *)event_ctx;
1238 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1239 
1240 	assert(bvdev != NULL);
1241 
1242 	SPDK_DEBUGLOG(vhost_blk, "Bdev event: type %d, name %s\n",
1243 		      type,
1244 		      bdev->name);
1245 
1246 	switch (type) {
1247 	case SPDK_BDEV_EVENT_REMOVE:
1248 	case SPDK_BDEV_EVENT_RESIZE:
1249 		bvdev->ops->bdev_event(type, vdev, bdev_event_cpl_cb, (void *)type);
1250 		break;
1251 	default:
1252 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1253 		break;
1254 	}
1255 }
1256 
1257 static void
1258 free_task_pool(struct spdk_vhost_blk_session *bvsession)
1259 {
1260 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1261 	struct spdk_vhost_virtqueue *vq;
1262 	uint16_t i;
1263 
1264 	for (i = 0; i < vsession->max_queues; i++) {
1265 		vq = &vsession->virtqueue[i];
1266 		if (vq->tasks == NULL) {
1267 			continue;
1268 		}
1269 
1270 		spdk_free(vq->tasks);
1271 		vq->tasks = NULL;
1272 	}
1273 }
1274 
1275 static int
1276 alloc_vq_task_pool(struct spdk_vhost_session *vsession, uint16_t qid)
1277 {
1278 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1279 	struct spdk_vhost_virtqueue *vq;
1280 	struct spdk_vhost_user_blk_task *task;
1281 	uint32_t task_cnt;
1282 	uint32_t j;
1283 
1284 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1285 		return -EINVAL;
1286 	}
1287 
1288 	vq = &vsession->virtqueue[qid];
1289 	if (vq->vring.desc == NULL) {
1290 		return 0;
1291 	}
1292 
1293 	task_cnt = vq->vring.size;
1294 	if (task_cnt > SPDK_VHOST_MAX_VQ_SIZE) {
1295 		/* sanity check */
1296 		SPDK_ERRLOG("%s: virtqueue %"PRIu16" is too big. (size = %"PRIu32", max = %"PRIu32")\n",
1297 			    vsession->name, qid, task_cnt, SPDK_VHOST_MAX_VQ_SIZE);
1298 		return -1;
1299 	}
1300 	vq->tasks = spdk_zmalloc(sizeof(struct spdk_vhost_user_blk_task) * task_cnt,
1301 				 SPDK_CACHE_LINE_SIZE, NULL,
1302 				 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1303 	if (vq->tasks == NULL) {
1304 		SPDK_ERRLOG("%s: failed to allocate %"PRIu32" tasks for virtqueue %"PRIu16"\n",
1305 			    vsession->name, task_cnt, qid);
1306 		return -1;
1307 	}
1308 
1309 	for (j = 0; j < task_cnt; j++) {
1310 		task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[j];
1311 		task->bvsession = bvsession;
1312 		task->req_idx = j;
1313 		task->vq = vq;
1314 	}
1315 
1316 	return 0;
1317 }
1318 
1319 static int
1320 vhost_blk_start(struct spdk_vhost_dev *vdev,
1321 		struct spdk_vhost_session *vsession, void *unused)
1322 {
1323 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1324 	struct spdk_vhost_blk_dev *bvdev;
1325 	int i;
1326 
1327 	/* return if start is already in progress */
1328 	if (bvsession->requestq_poller) {
1329 		SPDK_INFOLOG(vhost, "%s: start in progress\n", vsession->name);
1330 		return -EINPROGRESS;
1331 	}
1332 
1333 	/* validate all I/O queues are in a contiguous index range */
1334 	for (i = 0; i < vsession->max_queues; i++) {
1335 		/* vring.desc and vring.desc_packed are in a union struct
1336 		 * so q->vring.desc can replace q->vring.desc_packed.
1337 		 */
1338 		if (vsession->virtqueue[i].vring.desc == NULL) {
1339 			SPDK_ERRLOG("%s: queue %"PRIu32" is empty\n", vsession->name, i);
1340 			return -1;
1341 		}
1342 	}
1343 
1344 	bvdev = to_blk_dev(vdev);
1345 	assert(bvdev != NULL);
1346 	bvsession->bvdev = bvdev;
1347 
1348 	if (bvdev->bdev) {
1349 		bvsession->io_channel = vhost_blk_get_io_channel(vdev);
1350 		if (!bvsession->io_channel) {
1351 			free_task_pool(bvsession);
1352 			SPDK_ERRLOG("%s: I/O channel allocation failed\n", vsession->name);
1353 			return -1;
1354 		}
1355 	}
1356 
1357 	if (bvdev->bdev) {
1358 		bvsession->requestq_poller = SPDK_POLLER_REGISTER(vdev_worker, bvsession, 0);
1359 	} else {
1360 		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
1361 	}
1362 	SPDK_INFOLOG(vhost, "%s: started poller on lcore %d\n",
1363 		     vsession->name, spdk_env_get_current_core());
1364 
1365 	spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
1366 				       bvsession);
1367 
1368 	return 0;
1369 }
1370 
1371 static int
1372 destroy_session_poller_cb(void *arg)
1373 {
1374 	struct spdk_vhost_blk_session *bvsession = arg;
1375 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1376 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
1377 	int i;
1378 
1379 	if (vsession->task_cnt > 0 || (pthread_mutex_trylock(&user_dev->lock) != 0)) {
1380 		assert(vsession->stop_retry_count > 0);
1381 		vsession->stop_retry_count--;
1382 		if (vsession->stop_retry_count == 0) {
1383 			SPDK_ERRLOG("%s: Timedout when destroy session (task_cnt %d)\n", vsession->name,
1384 				    vsession->task_cnt);
1385 			spdk_poller_unregister(&bvsession->stop_poller);
1386 			vhost_user_session_stop_done(vsession, -ETIMEDOUT);
1387 		}
1388 
1389 		return SPDK_POLLER_BUSY;
1390 	}
1391 
1392 	for (i = 0; i < vsession->max_queues; i++) {
1393 		vsession->virtqueue[i].next_event_time = 0;
1394 		vhost_vq_used_signal(vsession, &vsession->virtqueue[i]);
1395 	}
1396 
1397 	SPDK_INFOLOG(vhost, "%s: stopping poller on lcore %d\n",
1398 		     vsession->name, spdk_env_get_current_core());
1399 
1400 	if (bvsession->io_channel) {
1401 		vhost_blk_put_io_channel(bvsession->io_channel);
1402 		bvsession->io_channel = NULL;
1403 	}
1404 
1405 	free_task_pool(bvsession);
1406 	spdk_poller_unregister(&bvsession->stop_poller);
1407 	vhost_user_session_stop_done(vsession, 0);
1408 
1409 	pthread_mutex_unlock(&user_dev->lock);
1410 	return SPDK_POLLER_BUSY;
1411 }
1412 
1413 static int
1414 vhost_blk_stop(struct spdk_vhost_dev *vdev,
1415 	       struct spdk_vhost_session *vsession, void *unused)
1416 {
1417 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1418 
1419 	/* return if stop is already in progress */
1420 	if (bvsession->stop_poller) {
1421 		return -EINPROGRESS;
1422 	}
1423 
1424 	spdk_poller_unregister(&bvsession->requestq_poller);
1425 	vhost_blk_session_unregister_interrupts(bvsession);
1426 
1427 	/* vhost_user_session_send_event timeout is 3 seconds, here set retry within 4 seconds */
1428 	bvsession->vsession.stop_retry_count = 4000;
1429 	bvsession->stop_poller = SPDK_POLLER_REGISTER(destroy_session_poller_cb,
1430 				 bvsession, 1000);
1431 	return 0;
1432 }
1433 
1434 static void
1435 vhost_blk_dump_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1436 {
1437 	struct spdk_vhost_blk_dev *bvdev;
1438 
1439 	bvdev = to_blk_dev(vdev);
1440 	assert(bvdev != NULL);
1441 
1442 	spdk_json_write_named_object_begin(w, "block");
1443 
1444 	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
1445 
1446 	spdk_json_write_name(w, "bdev");
1447 	if (bvdev->bdev) {
1448 		spdk_json_write_string(w, spdk_bdev_get_name(bvdev->bdev));
1449 	} else {
1450 		spdk_json_write_null(w);
1451 	}
1452 	spdk_json_write_named_string(w, "transport", bvdev->ops->name);
1453 
1454 	spdk_json_write_object_end(w);
1455 }
1456 
1457 static void
1458 vhost_blk_write_config_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1459 {
1460 	struct spdk_vhost_blk_dev *bvdev;
1461 
1462 	bvdev = to_blk_dev(vdev);
1463 	assert(bvdev != NULL);
1464 
1465 	if (!bvdev->bdev) {
1466 		return;
1467 	}
1468 
1469 	spdk_json_write_object_begin(w);
1470 	spdk_json_write_named_string(w, "method", "vhost_create_blk_controller");
1471 
1472 	spdk_json_write_named_object_begin(w, "params");
1473 	spdk_json_write_named_string(w, "ctrlr", vdev->name);
1474 	spdk_json_write_named_string(w, "dev_name", spdk_bdev_get_name(bvdev->bdev));
1475 	spdk_json_write_named_string(w, "cpumask",
1476 				     spdk_cpuset_fmt(spdk_thread_get_cpumask(vdev->thread)));
1477 	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
1478 	spdk_json_write_named_string(w, "transport", bvdev->ops->name);
1479 	spdk_json_write_object_end(w);
1480 
1481 	spdk_json_write_object_end(w);
1482 }
1483 
1484 static int vhost_blk_destroy(struct spdk_vhost_dev *dev);
1485 
1486 static int
1487 vhost_blk_get_config(struct spdk_vhost_dev *vdev, uint8_t *config,
1488 		     uint32_t len)
1489 {
1490 	struct virtio_blk_config blkcfg;
1491 	struct spdk_bdev *bdev;
1492 	uint32_t blk_size;
1493 	uint64_t blkcnt;
1494 
1495 	memset(&blkcfg, 0, sizeof(blkcfg));
1496 	bdev = vhost_blk_get_bdev(vdev);
1497 	if (bdev == NULL) {
1498 		/* We can't just return -1 here as this GET_CONFIG message might
1499 		 * be caused by a QEMU VM reboot. Returning -1 will indicate an
1500 		 * error to QEMU, who might then decide to terminate itself.
1501 		 * We don't want that. A simple reboot shouldn't break the system.
1502 		 *
1503 		 * Presenting a block device with block size 0 and block count 0
1504 		 * doesn't cause any problems on QEMU side and the virtio-pci
1505 		 * device is even still available inside the VM, but there will
1506 		 * be no block device created for it - the kernel drivers will
1507 		 * silently reject it.
1508 		 */
1509 		blk_size = 0;
1510 		blkcnt = 0;
1511 	} else {
1512 		blk_size = spdk_bdev_get_block_size(bdev);
1513 		blkcnt = spdk_bdev_get_num_blocks(bdev);
1514 		if (spdk_bdev_get_buf_align(bdev) > 1) {
1515 			blkcfg.size_max = SPDK_BDEV_LARGE_BUF_MAX_SIZE;
1516 			blkcfg.seg_max = spdk_min(SPDK_VHOST_IOVS_MAX - 2 - 1, SPDK_BDEV_IO_NUM_CHILD_IOV - 2 - 1);
1517 		} else {
1518 			blkcfg.size_max = 131072;
1519 			/*  -2 for REQ and RESP and -1 for region boundary splitting */
1520 			blkcfg.seg_max = SPDK_VHOST_IOVS_MAX - 2 - 1;
1521 		}
1522 	}
1523 
1524 	blkcfg.blk_size = blk_size;
1525 	/* minimum I/O size in blocks */
1526 	blkcfg.min_io_size = 1;
1527 	/* expressed in 512 Bytes sectors */
1528 	blkcfg.capacity = (blkcnt * blk_size) / 512;
1529 	/* QEMU can overwrite this value when started */
1530 	blkcfg.num_queues = SPDK_VHOST_MAX_VQUEUES;
1531 
1532 	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1533 		/* 16MiB, expressed in 512 Bytes */
1534 		blkcfg.max_discard_sectors = 32768;
1535 		blkcfg.max_discard_seg = 1;
1536 		blkcfg.discard_sector_alignment = blk_size / 512;
1537 	}
1538 	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1539 		blkcfg.max_write_zeroes_sectors = 32768;
1540 		blkcfg.max_write_zeroes_seg = 1;
1541 	}
1542 
1543 	memcpy(config, &blkcfg, spdk_min(len, sizeof(blkcfg)));
1544 
1545 	return 0;
1546 }
1547 
1548 static int
1549 vhost_blk_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
1550 			 uint32_t iops_threshold)
1551 {
1552 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1553 
1554 	assert(bvdev != NULL);
1555 
1556 	return bvdev->ops->set_coalescing(vdev, delay_base_us, iops_threshold);
1557 }
1558 
1559 static void
1560 vhost_blk_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
1561 			 uint32_t *iops_threshold)
1562 {
1563 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1564 
1565 	assert(bvdev != NULL);
1566 
1567 	bvdev->ops->get_coalescing(vdev, delay_base_us, iops_threshold);
1568 }
1569 
1570 static const struct spdk_vhost_user_dev_backend vhost_blk_user_device_backend = {
1571 	.session_ctx_size = sizeof(struct spdk_vhost_blk_session) - sizeof(struct spdk_vhost_session),
1572 	.start_session =  vhost_blk_start,
1573 	.stop_session = vhost_blk_stop,
1574 	.alloc_vq_tasks = alloc_vq_task_pool,
1575 	.register_vq_interrupt = vhost_blk_vq_register_interrupt,
1576 };
1577 
1578 static const struct spdk_vhost_dev_backend vhost_blk_device_backend = {
1579 	.type = VHOST_BACKEND_BLK,
1580 	.vhost_get_config = vhost_blk_get_config,
1581 	.dump_info_json = vhost_blk_dump_info_json,
1582 	.write_config_json = vhost_blk_write_config_json,
1583 	.remove_device = vhost_blk_destroy,
1584 	.set_coalescing = vhost_blk_set_coalescing,
1585 	.get_coalescing = vhost_blk_get_coalescing,
1586 };
1587 
1588 int
1589 virtio_blk_construct_ctrlr(struct spdk_vhost_dev *vdev, const char *address,
1590 			   struct spdk_cpuset *cpumask, const struct spdk_json_val *params,
1591 			   const struct spdk_vhost_user_dev_backend *user_backend)
1592 {
1593 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1594 
1595 	assert(bvdev != NULL);
1596 
1597 	return bvdev->ops->create_ctrlr(vdev, cpumask, address, params, (void *)user_backend);
1598 }
1599 
1600 int
1601 spdk_vhost_blk_construct(const char *name, const char *cpumask, const char *dev_name,
1602 			 const char *transport, const struct spdk_json_val *params)
1603 {
1604 	struct spdk_vhost_blk_dev *bvdev = NULL;
1605 	struct spdk_vhost_dev *vdev;
1606 	struct spdk_bdev *bdev;
1607 	const char *transport_name = VIRTIO_BLK_DEFAULT_TRANSPORT;
1608 	int ret = 0;
1609 
1610 	bvdev = calloc(1, sizeof(*bvdev));
1611 	if (bvdev == NULL) {
1612 		ret = -ENOMEM;
1613 		goto out;
1614 	}
1615 
1616 	if (transport != NULL) {
1617 		transport_name = transport;
1618 	}
1619 
1620 	bvdev->ops = virtio_blk_get_transport_ops(transport_name);
1621 	if (!bvdev->ops) {
1622 		ret = -EINVAL;
1623 		SPDK_ERRLOG("Transport type '%s' unavailable.\n", transport_name);
1624 		goto out;
1625 	}
1626 
1627 	ret = spdk_bdev_open_ext(dev_name, true, bdev_event_cb, bvdev, &bvdev->bdev_desc);
1628 	if (ret != 0) {
1629 		SPDK_ERRLOG("%s: could not open bdev '%s', error=%d\n",
1630 			    name, dev_name, ret);
1631 		goto out;
1632 	}
1633 	bdev = spdk_bdev_desc_get_bdev(bvdev->bdev_desc);
1634 
1635 	vdev = &bvdev->vdev;
1636 	vdev->virtio_features = SPDK_VHOST_BLK_FEATURES_BASE;
1637 	vdev->disabled_features = SPDK_VHOST_BLK_DISABLED_FEATURES;
1638 	vdev->protocol_features = SPDK_VHOST_BLK_PROTOCOL_FEATURES;
1639 
1640 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1641 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_DISCARD);
1642 	}
1643 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1644 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_WRITE_ZEROES);
1645 	}
1646 
1647 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
1648 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_FLUSH);
1649 	}
1650 
1651 	/*
1652 	 * When starting qemu with multiqueue enable, the vhost device will
1653 	 * be started/stopped many times, related to the queues num, as the
1654 	 * exact number of queues used for this device is not known at the time.
1655 	 * The target has to stop and start the device once got a valid IO queue.
1656 	 * When stopping and starting the vhost device, the backend bdev io device
1657 	 * will be deleted and created repeatedly.
1658 	 * Hold a bdev reference so that in the struct spdk_vhost_blk_dev, so that
1659 	 * the io device will not be deleted.
1660 	 */
1661 	bvdev->dummy_io_channel = spdk_bdev_get_io_channel(bvdev->bdev_desc);
1662 
1663 	bvdev->bdev = bdev;
1664 	bvdev->readonly = false;
1665 	ret = vhost_dev_register(vdev, name, cpumask, params, &vhost_blk_device_backend,
1666 				 &vhost_blk_user_device_backend);
1667 	if (ret != 0) {
1668 		spdk_put_io_channel(bvdev->dummy_io_channel);
1669 		spdk_bdev_close(bvdev->bdev_desc);
1670 		goto out;
1671 	}
1672 
1673 	SPDK_INFOLOG(vhost, "%s: using bdev '%s'\n", name, dev_name);
1674 out:
1675 	if (ret != 0 && bvdev) {
1676 		free(bvdev);
1677 	}
1678 	return ret;
1679 }
1680 
1681 int
1682 virtio_blk_destroy_ctrlr(struct spdk_vhost_dev *vdev)
1683 {
1684 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1685 
1686 	assert(bvdev != NULL);
1687 
1688 	return bvdev->ops->destroy_ctrlr(vdev);
1689 }
1690 
1691 static int
1692 vhost_blk_destroy(struct spdk_vhost_dev *vdev)
1693 {
1694 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1695 	int rc;
1696 
1697 	assert(bvdev != NULL);
1698 
1699 	rc = vhost_dev_unregister(&bvdev->vdev);
1700 	if (rc != 0) {
1701 		return rc;
1702 	}
1703 
1704 	/* if the bdev is removed, don't need call spdk_put_io_channel. */
1705 	if (bvdev->bdev) {
1706 		spdk_put_io_channel(bvdev->dummy_io_channel);
1707 	}
1708 
1709 	if (bvdev->bdev_desc) {
1710 		spdk_bdev_close(bvdev->bdev_desc);
1711 		bvdev->bdev_desc = NULL;
1712 	}
1713 	bvdev->bdev = NULL;
1714 
1715 	free(bvdev);
1716 	return 0;
1717 }
1718 
1719 struct spdk_io_channel *
1720 vhost_blk_get_io_channel(struct spdk_vhost_dev *vdev)
1721 {
1722 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1723 
1724 	assert(bvdev != NULL);
1725 
1726 	return spdk_bdev_get_io_channel(bvdev->bdev_desc);
1727 }
1728 
1729 void
1730 vhost_blk_put_io_channel(struct spdk_io_channel *ch)
1731 {
1732 	spdk_put_io_channel(ch);
1733 }
1734 
1735 static struct spdk_virtio_blk_transport *
1736 vhost_user_blk_create(const struct spdk_json_val *params)
1737 {
1738 	int ret;
1739 	struct spdk_virtio_blk_transport *vhost_user_blk;
1740 
1741 	vhost_user_blk = calloc(1, sizeof(*vhost_user_blk));
1742 	if (!vhost_user_blk) {
1743 		return NULL;
1744 	}
1745 
1746 	ret = vhost_user_init();
1747 	if (ret != 0) {
1748 		free(vhost_user_blk);
1749 		return NULL;
1750 	}
1751 
1752 	return vhost_user_blk;
1753 }
1754 
1755 static int
1756 vhost_user_blk_destroy(struct spdk_virtio_blk_transport *transport,
1757 		       spdk_vhost_fini_cb cb_fn)
1758 {
1759 	vhost_user_fini(cb_fn);
1760 	free(transport);
1761 	return 0;
1762 }
1763 
1764 struct rpc_vhost_blk {
1765 	bool readonly;
1766 	bool packed_ring;
1767 	bool packed_ring_recovery;
1768 };
1769 
1770 static const struct spdk_json_object_decoder rpc_construct_vhost_blk[] = {
1771 	{"readonly", offsetof(struct rpc_vhost_blk, readonly), spdk_json_decode_bool, true},
1772 	{"packed_ring", offsetof(struct rpc_vhost_blk, packed_ring), spdk_json_decode_bool, true},
1773 	{"packed_ring_recovery", offsetof(struct rpc_vhost_blk, packed_ring_recovery), spdk_json_decode_bool, true},
1774 };
1775 
1776 static int
1777 vhost_user_blk_create_ctrlr(struct spdk_vhost_dev *vdev, struct spdk_cpuset *cpumask,
1778 			    const char *address, const struct spdk_json_val *params, void *custom_opts)
1779 {
1780 	struct rpc_vhost_blk req = {0};
1781 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1782 
1783 	assert(bvdev != NULL);
1784 
1785 	if (spdk_json_decode_object_relaxed(params, rpc_construct_vhost_blk,
1786 					    SPDK_COUNTOF(rpc_construct_vhost_blk),
1787 					    &req)) {
1788 		SPDK_DEBUGLOG(vhost_blk, "spdk_json_decode_object failed\n");
1789 		return -EINVAL;
1790 	}
1791 
1792 	vdev->packed_ring_recovery = false;
1793 
1794 	if (req.packed_ring) {
1795 		vdev->virtio_features |= (uint64_t)req.packed_ring << VIRTIO_F_RING_PACKED;
1796 		vdev->packed_ring_recovery = req.packed_ring_recovery;
1797 	}
1798 	if (req.readonly) {
1799 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_RO);
1800 		bvdev->readonly = req.readonly;
1801 	}
1802 
1803 	return vhost_user_dev_register(vdev, address, cpumask, custom_opts);
1804 }
1805 
1806 static int
1807 vhost_user_blk_destroy_ctrlr(struct spdk_vhost_dev *vdev)
1808 {
1809 	return vhost_user_dev_unregister(vdev);
1810 }
1811 
1812 static void
1813 vhost_user_blk_dump_opts(struct spdk_virtio_blk_transport *transport, struct spdk_json_write_ctx *w)
1814 {
1815 	assert(w != NULL);
1816 
1817 	spdk_json_write_named_string(w, "name", transport->ops->name);
1818 }
1819 
1820 static const struct spdk_virtio_blk_transport_ops vhost_user_blk = {
1821 	.name = "vhost_user_blk",
1822 
1823 	.dump_opts = vhost_user_blk_dump_opts,
1824 
1825 	.create = vhost_user_blk_create,
1826 	.destroy = vhost_user_blk_destroy,
1827 
1828 	.create_ctrlr = vhost_user_blk_create_ctrlr,
1829 	.destroy_ctrlr = vhost_user_blk_destroy_ctrlr,
1830 
1831 	.bdev_event = vhost_user_bdev_event_cb,
1832 	.set_coalescing = vhost_user_set_coalescing,
1833 	.get_coalescing = vhost_user_get_coalescing,
1834 };
1835 
1836 SPDK_VIRTIO_BLK_TRANSPORT_REGISTER(vhost_user_blk, &vhost_user_blk);
1837 
1838 SPDK_LOG_REGISTER_COMPONENT(vhost_blk)
1839 SPDK_LOG_REGISTER_COMPONENT(vhost_blk_data)
1840