xref: /spdk/lib/vhost/vhost_blk.c (revision 60982c759db49b4f4579f16e3b24df0725ba4b94)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation. All rights reserved.
3  *   All rights reserved.
4  */
5 
6 #include <linux/virtio_blk.h>
7 
8 #include "spdk/env.h"
9 #include "spdk/bdev.h"
10 #include "spdk/bdev_module.h"
11 #include "spdk/thread.h"
12 #include "spdk/likely.h"
13 #include "spdk/string.h"
14 #include "spdk/util.h"
15 #include "spdk/vhost.h"
16 #include "spdk/json.h"
17 
18 #include "vhost_internal.h"
19 #include <rte_version.h>
20 
21 /* Minimal set of features supported by every SPDK VHOST-BLK device */
22 #define SPDK_VHOST_BLK_FEATURES_BASE (SPDK_VHOST_FEATURES | \
23 		(1ULL << VIRTIO_BLK_F_SIZE_MAX) | (1ULL << VIRTIO_BLK_F_SEG_MAX) | \
24 		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_BLK_SIZE) | \
25 		(1ULL << VIRTIO_BLK_F_TOPOLOGY) | (1ULL << VIRTIO_BLK_F_BARRIER)  | \
26 		(1ULL << VIRTIO_BLK_F_SCSI)     | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
27 		(1ULL << VIRTIO_BLK_F_MQ))
28 
29 /* Unsupported features */
30 #define SPDK_VHOST_BLK_DISABLED_FEATURES (SPDK_VHOST_DISABLED_FEATURES | \
31 		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
32 		(1ULL << VIRTIO_BLK_F_BARRIER)  | (1ULL << VIRTIO_BLK_F_SCSI))
33 
34 /* Protocol features supported by vhost-blk */
35 #define SPDK_VHOST_BLK_PROTOCOL_FEATURES ((1ULL << VHOST_USER_PROTOCOL_F_CONFIG) | \
36 		(1ULL << VHOST_USER_PROTOCOL_F_INFLIGHT_SHMFD))
37 
38 #define VIRTIO_BLK_DEFAULT_TRANSPORT "vhost_user_blk"
39 
40 struct spdk_vhost_user_blk_task {
41 	struct spdk_vhost_blk_task blk_task;
42 	struct spdk_vhost_blk_session *bvsession;
43 	struct spdk_vhost_virtqueue *vq;
44 
45 	uint16_t req_idx;
46 	uint16_t num_descs;
47 	uint16_t buffer_id;
48 	uint16_t inflight_head;
49 
50 	/* If set, the task is currently used for I/O processing. */
51 	bool used;
52 };
53 
54 struct spdk_vhost_blk_dev {
55 	struct spdk_vhost_dev vdev;
56 	struct spdk_bdev *bdev;
57 	struct spdk_bdev_desc *bdev_desc;
58 	const struct spdk_virtio_blk_transport_ops *ops;
59 
60 	/* dummy_io_channel is used to hold a bdev reference */
61 	struct spdk_io_channel *dummy_io_channel;
62 	bool readonly;
63 };
64 
65 struct spdk_vhost_blk_session {
66 	/* The parent session must be the very first field in this struct */
67 	struct spdk_vhost_session vsession;
68 	struct spdk_vhost_blk_dev *bvdev;
69 	struct spdk_poller *requestq_poller;
70 	struct spdk_io_channel *io_channel;
71 	struct spdk_poller *stop_poller;
72 };
73 
74 /* forward declaration */
75 static const struct spdk_vhost_dev_backend vhost_blk_device_backend;
76 
77 static void vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task,
78 		void *cb_arg);
79 
80 static int
81 vhost_user_process_blk_request(struct spdk_vhost_user_blk_task *user_task)
82 {
83 	struct spdk_vhost_blk_session *bvsession = user_task->bvsession;
84 	struct spdk_vhost_dev *vdev = &bvsession->bvdev->vdev;
85 
86 	return virtio_blk_process_request(vdev, bvsession->io_channel, &user_task->blk_task,
87 					  vhost_user_blk_request_finish, NULL);
88 }
89 
90 static struct spdk_vhost_blk_dev *
91 to_blk_dev(struct spdk_vhost_dev *vdev)
92 {
93 	if (vdev == NULL) {
94 		return NULL;
95 	}
96 
97 	if (vdev->backend->type != VHOST_BACKEND_BLK) {
98 		SPDK_ERRLOG("%s: not a vhost-blk device\n", vdev->name);
99 		return NULL;
100 	}
101 
102 	return SPDK_CONTAINEROF(vdev, struct spdk_vhost_blk_dev, vdev);
103 }
104 
105 struct spdk_bdev *
106 vhost_blk_get_bdev(struct spdk_vhost_dev *vdev)
107 {
108 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
109 
110 	assert(bvdev != NULL);
111 
112 	return bvdev->bdev;
113 }
114 
115 static struct spdk_vhost_blk_session *
116 to_blk_session(struct spdk_vhost_session *vsession)
117 {
118 	assert(vsession->vdev->backend->type == VHOST_BACKEND_BLK);
119 	return (struct spdk_vhost_blk_session *)vsession;
120 }
121 
122 static inline void
123 blk_task_inc_task_cnt(struct spdk_vhost_user_blk_task *task)
124 {
125 	task->bvsession->vsession.task_cnt++;
126 }
127 
128 static inline void
129 blk_task_dec_task_cnt(struct spdk_vhost_user_blk_task *task)
130 {
131 	assert(task->bvsession->vsession.task_cnt > 0);
132 	task->bvsession->vsession.task_cnt--;
133 }
134 
135 static void
136 blk_task_finish(struct spdk_vhost_user_blk_task *task)
137 {
138 	blk_task_dec_task_cnt(task);
139 	task->used = false;
140 }
141 
142 static void
143 blk_task_init(struct spdk_vhost_user_blk_task *task)
144 {
145 	struct spdk_vhost_blk_task *blk_task = &task->blk_task;
146 
147 	task->used = true;
148 	blk_task->iovcnt = SPDK_COUNTOF(blk_task->iovs);
149 	blk_task->status = NULL;
150 	blk_task->used_len = 0;
151 	blk_task->payload_size = 0;
152 }
153 
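/*
 * Return the request's buffers to the guest: packed rings are completed with the
 * buffer_id/num_descs pair, split rings by placing req_idx on the used ring.
 */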
154 static void
155 blk_task_enqueue(struct spdk_vhost_user_blk_task *task)
156 {
157 	if (task->vq->packed.packed_ring) {
158 		vhost_vq_packed_ring_enqueue(&task->bvsession->vsession, task->vq,
159 					     task->num_descs,
160 					     task->buffer_id, task->blk_task.used_len,
161 					     task->inflight_head);
162 	} else {
163 		vhost_vq_used_ring_enqueue(&task->bvsession->vsession, task->vq,
164 					   task->req_idx, task->blk_task.used_len);
165 	}
166 }
167 
168 static void
169 vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task, void *cb_arg)
170 {
171 	struct spdk_vhost_user_blk_task *user_task;
172 
173 	user_task = SPDK_CONTAINEROF(task, struct spdk_vhost_user_blk_task, blk_task);
174 
175 	blk_task_enqueue(user_task);
176 
177 	SPDK_DEBUGLOG(vhost_blk, "Finished task (%p) req_idx=%d status: %" PRIu8"\n",
178 		      user_task, user_task->req_idx, status);
179 	blk_task_finish(user_task);
180 }
181 
182 static void
183 blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task)
184 {
185 
186 	if (task->status) {
187 		*task->status = status;
188 	}
189 
190 	task->cb(status, task, task->cb_arg);
191 }
192 
193 /*
194  * Process the task's descriptor chain and set up the data-related fields.
195  * Return:
196  *   0 on success, with the total size of the supplied buffers stored in *length
197  *
198  *   FIXME: Make this function return rd_cnt and wr_cnt
199  */
200 static int
201 blk_iovs_split_queue_setup(struct spdk_vhost_blk_session *bvsession,
202 			   struct spdk_vhost_virtqueue *vq,
203 			   uint16_t req_idx, struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
204 {
205 	struct spdk_vhost_session *vsession = &bvsession->vsession;
206 	struct spdk_vhost_dev *vdev = vsession->vdev;
207 	struct vring_desc *desc, *desc_table;
208 	uint16_t out_cnt = 0, cnt = 0;
209 	uint32_t desc_table_size, len = 0;
210 	uint32_t desc_handled_cnt;
211 	int rc;
212 
213 	rc = vhost_vq_get_desc(vsession, vq, req_idx, &desc, &desc_table, &desc_table_size);
214 	if (rc != 0) {
215 		SPDK_ERRLOG("%s: invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
216 		return -1;
217 	}
218 
219 	desc_handled_cnt = 0;
220 	while (1) {
221 		/*
222 		 * Has the maximum number of IOVs been reached?
223 		 * This should not happen if the request is well-formed; otherwise it is a BUG.
224 		 */
225 		if (spdk_unlikely(cnt == *iovs_cnt)) {
226 			SPDK_DEBUGLOG(vhost_blk, "%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
227 				      vsession->name, req_idx);
228 			return -1;
229 		}
230 
231 		if (spdk_unlikely(vhost_vring_desc_to_iov(vsession, iovs, &cnt, desc))) {
232 			SPDK_DEBUGLOG(vhost_blk, "%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
233 				      vsession->name, req_idx, cnt);
234 			return -1;
235 		}
236 
237 		len += desc->len;
238 
239 		out_cnt += vhost_vring_desc_is_wr(desc);
240 
241 		rc = vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
242 		if (rc != 0) {
243 			SPDK_ERRLOG("%s: descriptor chain at index %"PRIu16" terminated unexpectedly.\n",
244 				    vsession->name, req_idx);
245 			return -1;
246 		} else if (desc == NULL) {
247 			break;
248 		}
249 
250 		desc_handled_cnt++;
251 		if (spdk_unlikely(desc_handled_cnt > desc_table_size)) {
252 			/* Break a cycle and report an error, if any. */
253 			SPDK_ERRLOG("%s: found a cycle in the descriptor chain: desc_table_size = %d, desc_handled_cnt = %d.\n",
254 				    vsession->name, desc_table_size, desc_handled_cnt);
255 			return -1;
256 		}
257 	}
258 
259 	/*
260 	 * There must be at least two descriptors.
261 	 * The first contains the request, so it must be readable.
262 	 * The last contains the buffer for the response, so it must be writable.
263 	 */
264 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
265 		return -1;
266 	}
267 
268 	*length = len;
269 	*iovs_cnt = cnt;
270 	return 0;
271 }
272 
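/*
 * Translate a packed-ring descriptor chain (taken either directly from the ring or
 * from an indirect descriptor table) into the supplied iovec array and report the
 * total payload length.
 */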
273 static int
274 blk_iovs_packed_desc_setup(struct spdk_vhost_session *vsession,
275 			   struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
276 			   struct vring_packed_desc *desc_table, uint16_t desc_table_size,
277 			   struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
278 {
279 	struct vring_packed_desc *desc;
280 	uint16_t cnt = 0, out_cnt = 0;
281 	uint32_t len = 0;
282 
283 	if (desc_table == NULL) {
284 		desc = &vq->vring.desc_packed[req_idx];
285 	} else {
286 		req_idx = 0;
287 		desc = desc_table;
288 	}
289 
290 	while (1) {
291 		/*
292 		 * Has the maximum number of IOVs been reached?
293 		 * This should not happen if the request is well-formed; otherwise it is a BUG.
294 		 */
295 		if (spdk_unlikely(cnt == *iovs_cnt)) {
296 			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
297 				    vsession->name, req_idx);
298 			return -EINVAL;
299 		}
300 
301 		if (spdk_unlikely(vhost_vring_packed_desc_to_iov(vsession, iovs, &cnt, desc))) {
302 			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
303 				    vsession->name, req_idx, cnt);
304 			return -EINVAL;
305 		}
306 
307 		len += desc->len;
308 		out_cnt += vhost_vring_packed_desc_is_wr(desc);
309 
310 		/* A NULL desc means we have reached the last desc of this request */
311 		vhost_vring_packed_desc_get_next(&desc, &req_idx, vq, desc_table, desc_table_size);
312 		if (desc == NULL) {
313 			break;
314 		}
315 	}
316 
317 	/*
318 	 * There must be at least two descriptors.
319 	 * The first contains the request, so it must be readable.
320 	 * The last contains the buffer for the response, so it must be writable.
321 	 */
322 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
323 		return -EINVAL;
324 	}
325 
326 	*length = len;
327 	*iovs_cnt = cnt;
328 
329 	return 0;
330 }
331 
332 static int
333 blk_iovs_packed_queue_setup(struct spdk_vhost_blk_session *bvsession,
334 			    struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
335 			    struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
336 {
337 	struct spdk_vhost_session *vsession = &bvsession->vsession;
338 	struct spdk_vhost_dev *vdev = vsession->vdev;
339 	struct vring_packed_desc *desc = NULL, *desc_table;
340 	uint32_t desc_table_size;
341 	int rc;
342 
343 	rc = vhost_vq_get_desc_packed(vsession, vq, req_idx, &desc,
344 				      &desc_table, &desc_table_size);
345 	if (spdk_unlikely(rc != 0)) {
346 		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
347 		return rc;
348 	}
349 
350 	return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
351 					  iovs, iovs_cnt, length);
352 }
353 
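/*
 * Rebuild the iovec array for a request that was still inflight across a reconnect,
 * walking the shared inflight descriptor area instead of the live ring.
 */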
354 static int
355 blk_iovs_inflight_queue_setup(struct spdk_vhost_blk_session *bvsession,
356 			      struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
357 			      struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
358 {
359 	struct spdk_vhost_session *vsession = &bvsession->vsession;
360 	struct spdk_vhost_dev *vdev = vsession->vdev;
361 	spdk_vhost_inflight_desc *inflight_desc;
362 	struct vring_packed_desc *desc_table;
363 	uint16_t out_cnt = 0, cnt = 0;
364 	uint32_t desc_table_size, len = 0;
365 	int rc = 0;
366 
367 	rc = vhost_inflight_queue_get_desc(vsession, vq->vring_inflight.inflight_packed->desc,
368 					   req_idx, &inflight_desc, &desc_table, &desc_table_size);
369 	if (spdk_unlikely(rc != 0)) {
370 		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
371 		return rc;
372 	}
373 
374 	if (desc_table != NULL) {
375 		return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
376 						  iovs, iovs_cnt, length);
377 	}
378 
379 	while (1) {
380 		/*
381 		 * Has the maximum number of IOVs been reached?
382 		 * This should not happen if the request is well-formed; otherwise it is a BUG.
383 		 */
384 		if (spdk_unlikely(cnt == *iovs_cnt)) {
385 			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
386 				    vsession->name, req_idx);
387 			return -EINVAL;
388 		}
389 
390 		if (spdk_unlikely(vhost_vring_inflight_desc_to_iov(vsession, iovs, &cnt, inflight_desc))) {
391 			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
392 				    vsession->name, req_idx, cnt);
393 			return -EINVAL;
394 		}
395 
396 		len += inflight_desc->len;
397 		out_cnt += vhost_vring_inflight_desc_is_wr(inflight_desc);
398 
399 		/* If F_NEXT is not set, this is the last desc */
400 		if ((inflight_desc->flags & VRING_DESC_F_NEXT) == 0) {
401 			break;
402 		}
403 
404 		inflight_desc = &vq->vring_inflight.inflight_packed->desc[inflight_desc->next];
405 	}
406 
407 	/*
408 	 * There must be at least two descriptors.
409 	 * The first contains the request, so it must be readable.
410 	 * The last contains the buffer for the response, so it must be writable.
411 	 */
412 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
413 		return -EINVAL;
414 	}
415 
416 	*length = len;
417 	*iovs_cnt = cnt;
418 
419 	return 0;
420 }
421 
422 static void
423 blk_request_complete_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
424 {
425 	struct spdk_vhost_blk_task *task = cb_arg;
426 
427 	spdk_bdev_free_io(bdev_io);
428 	blk_request_finish(success ? VIRTIO_BLK_S_OK : VIRTIO_BLK_S_IOERR, task);
429 }
430 
431 static void
432 blk_request_resubmit(void *arg)
433 {
434 	struct spdk_vhost_blk_task *task = arg;
435 	int rc = 0;
436 
437 	rc = virtio_blk_process_request(task->bdev_io_wait_vdev, task->bdev_io_wait_ch, task,
438 					task->cb, task->cb_arg);
439 	if (rc == 0) {
440 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p resubmitted ======\n", task);
441 	} else {
442 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p failed ======\n", task);
443 	}
444 }
445 
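/*
 * Called when the bdev layer returns -ENOMEM: park the task on the bdev io_wait queue
 * so that blk_request_resubmit() retries it once resources become available again.
 */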
446 static inline void
447 blk_request_queue_io(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
448 		     struct spdk_vhost_blk_task *task)
449 {
450 	int rc;
451 	struct spdk_bdev *bdev = vhost_blk_get_bdev(vdev);
452 
453 	task->bdev_io_wait.bdev = bdev;
454 	task->bdev_io_wait.cb_fn = blk_request_resubmit;
455 	task->bdev_io_wait.cb_arg = task;
456 	task->bdev_io_wait_ch = ch;
457 	task->bdev_io_wait_vdev = vdev;
458 
459 	rc = spdk_bdev_queue_io_wait(bdev, ch, &task->bdev_io_wait);
460 	if (rc != 0) {
461 		blk_request_finish(VIRTIO_BLK_S_IOERR, task);
462 	}
463 }
464 
465 int
466 virtio_blk_process_request(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
467 			   struct spdk_vhost_blk_task *task, virtio_blk_request_cb cb, void *cb_arg)
468 {
469 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
470 	struct virtio_blk_outhdr req;
471 	struct virtio_blk_discard_write_zeroes *desc;
472 	struct iovec *iov;
473 	uint32_t type;
474 	uint64_t flush_bytes;
475 	uint32_t payload_len;
476 	uint16_t iovcnt;
477 	int rc;
478 
479 	assert(bvdev != NULL);
480 
481 	task->cb = cb;
482 	task->cb_arg = cb_arg;
483 
484 	iov = &task->iovs[0];
485 	if (spdk_unlikely(iov->iov_len != sizeof(req))) {
486 		SPDK_DEBUGLOG(vhost_blk,
487 			      "First descriptor size is %zu but expected %zu (task = %p).\n",
488 			      iov->iov_len, sizeof(req), task);
489 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
490 		return -1;
491 	}
492 
493 	/* Some SeaBIOS versions don't align the virtio_blk_outhdr on an 8-byte boundary, which
494 	 * triggers ubsan errors.  So copy this small 16-byte structure to the stack to work around
495 	 * this problem.
496 	 */
497 	memcpy(&req, iov->iov_base, sizeof(req));
498 
499 	iov = &task->iovs[task->iovcnt - 1];
500 	if (spdk_unlikely(iov->iov_len != 1)) {
501 		SPDK_DEBUGLOG(vhost_blk,
502 			      "Last descriptor size is %zu but expected %d (task = %p).\n",
503 			      iov->iov_len, 1, task);
504 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
505 		return -1;
506 	}
507 
508 	payload_len = task->payload_size;
509 	task->status = iov->iov_base;
510 	payload_len -= sizeof(req) + sizeof(*task->status);
511 	iovcnt = task->iovcnt - 2;
512 
513 	type = req.type;
514 #ifdef VIRTIO_BLK_T_BARRIER
515 	/* Don't care about barriers for now (as QEMU's virtio-blk does). */
516 	type &= ~VIRTIO_BLK_T_BARRIER;
517 #endif
518 
519 	switch (type) {
520 	case VIRTIO_BLK_T_IN:
521 	case VIRTIO_BLK_T_OUT:
522 		if (spdk_unlikely(payload_len == 0 || (payload_len & (512 - 1)) != 0)) {
523 			SPDK_ERRLOG("%s - passed IO buffer is not multiple of 512b (task = %p).\n",
524 				    type ? "WRITE" : "READ", task);
525 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
526 			return -1;
527 		}
528 
529 		if (type == VIRTIO_BLK_T_IN) {
530 			task->used_len = payload_len + sizeof(*task->status);
531 			rc = spdk_bdev_readv(bvdev->bdev_desc, ch,
532 					     &task->iovs[1], iovcnt, req.sector * 512,
533 					     payload_len, blk_request_complete_cb, task);
534 		} else if (!bvdev->readonly) {
535 			task->used_len = sizeof(*task->status);
536 			rc = spdk_bdev_writev(bvdev->bdev_desc, ch,
537 					      &task->iovs[1], iovcnt, req.sector * 512,
538 					      payload_len, blk_request_complete_cb, task);
539 		} else {
540 			SPDK_DEBUGLOG(vhost_blk, "Device is in read-only mode!\n");
541 			rc = -1;
542 		}
543 
544 		if (rc) {
545 			if (rc == -ENOMEM) {
546 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
547 				blk_request_queue_io(vdev, ch, task);
548 			} else {
549 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
550 				return -1;
551 			}
552 		}
553 		break;
554 	case VIRTIO_BLK_T_DISCARD:
555 		desc = task->iovs[1].iov_base;
556 		if (payload_len != sizeof(*desc)) {
557 			SPDK_NOTICELOG("Invalid discard payload size: %u\n", payload_len);
558 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
559 			return -1;
560 		}
561 
562 		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
563 			SPDK_ERRLOG("UNMAP flag is only used for WRITE ZEROES command\n");
564 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
565 			return -1;
566 		}
567 
568 		rc = spdk_bdev_unmap(bvdev->bdev_desc, ch,
569 				     desc->sector * 512, desc->num_sectors * 512,
570 				     blk_request_complete_cb, task);
571 		if (rc) {
572 			if (rc == -ENOMEM) {
573 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
574 				blk_request_queue_io(vdev, ch, task);
575 			} else {
576 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
577 				return -1;
578 			}
579 		}
580 		break;
581 	case VIRTIO_BLK_T_WRITE_ZEROES:
582 		desc = task->iovs[1].iov_base;
583 		if (payload_len != sizeof(*desc)) {
584 			SPDK_NOTICELOG("Invalid write zeroes payload size: %u\n", payload_len);
585 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
586 			return -1;
587 		}
588 
589 		/* The UNMAP flag requests deallocation of this range, which SPDK doesn't support for
590 		 * WRITE ZEROES. The kernel driver sets the flag by default without checking whether the
591 		 * unmap feature was negotiated. The flag isn't mandatory, so just print a warning.
592 		 */
593 		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
594 			SPDK_WARNLOG("Ignore the unmap flag for WRITE ZEROES from %"PRIx64", len %"PRIx64"\n",
595 				     (uint64_t)desc->sector * 512, (uint64_t)desc->num_sectors * 512);
596 		}
597 
598 		rc = spdk_bdev_write_zeroes(bvdev->bdev_desc, ch,
599 					    desc->sector * 512, desc->num_sectors * 512,
600 					    blk_request_complete_cb, task);
601 		if (rc) {
602 			if (rc == -ENOMEM) {
603 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
604 				blk_request_queue_io(vdev, ch, task);
605 			} else {
606 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
607 				return -1;
608 			}
609 		}
610 		break;
611 	case VIRTIO_BLK_T_FLUSH:
612 		flush_bytes = spdk_bdev_get_num_blocks(bvdev->bdev) * spdk_bdev_get_block_size(bvdev->bdev);
613 		if (req.sector != 0) {
614 			SPDK_NOTICELOG("sector must be zero for flush command\n");
615 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
616 			return -1;
617 		}
618 		rc = spdk_bdev_flush(bvdev->bdev_desc, ch,
619 				     0, flush_bytes,
620 				     blk_request_complete_cb, task);
621 		if (rc) {
622 			if (rc == -ENOMEM) {
623 				SPDK_DEBUGLOG(vhost_blk, "No memory, start to queue io.\n");
624 				blk_request_queue_io(vdev, ch, task);
625 			} else {
626 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
627 				return -1;
628 			}
629 		}
630 		break;
631 	case VIRTIO_BLK_T_GET_ID:
632 		if (!iovcnt || !payload_len) {
633 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
634 			return -1;
635 		}
636 		task->used_len = spdk_min((size_t)VIRTIO_BLK_ID_BYTES, task->iovs[1].iov_len);
637 		spdk_strcpy_pad(task->iovs[1].iov_base, spdk_bdev_get_name(bvdev->bdev),
638 				task->used_len, ' ');
639 		blk_request_finish(VIRTIO_BLK_S_OK, task);
640 		break;
641 	default:
642 		SPDK_DEBUGLOG(vhost_blk, "Not supported request type '%"PRIu32"'.\n", type);
643 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
644 		return -1;
645 	}
646 
647 	return 0;
648 }
649 
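/*
 * Pull one request from a split-ring virtqueue, set up its iovecs and submit it to
 * the bdev layer. Malformed requests are completed immediately with VIRTIO_BLK_S_UNSUPP.
 */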
650 static void
651 process_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
652 {
653 	struct spdk_vhost_user_blk_task *task;
654 	struct spdk_vhost_blk_task *blk_task;
655 	int rc;
656 
657 	assert(vq->packed.packed_ring == false);
658 
659 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[req_idx];
660 	blk_task = &task->blk_task;
661 	if (spdk_unlikely(task->used)) {
662 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
663 			    task->bvsession->vsession.name, req_idx);
664 		blk_task->used_len = 0;
665 		blk_task_enqueue(task);
666 		return;
667 	}
668 
669 	blk_task_inc_task_cnt(task);
670 
671 	blk_task_init(task);
672 
673 	rc = blk_iovs_split_queue_setup(task->bvsession, vq, task->req_idx,
674 					blk_task->iovs, &blk_task->iovcnt, &blk_task->payload_size);
675 
676 	if (rc) {
677 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
678 		/* Only READ and WRITE are supported for now. */
679 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
680 		return;
681 	}
682 
683 	if (vhost_user_process_blk_request(task) == 0) {
684 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
685 			      req_idx);
686 	} else {
687 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, req_idx);
688 	}
689 }
690 
691 static void
692 process_packed_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
693 {
694 	struct spdk_vhost_user_blk_task *task;
695 	struct spdk_vhost_blk_task *blk_task;
696 	uint16_t task_idx = req_idx, num_descs;
697 	int rc;
698 
699 	assert(vq->packed.packed_ring);
700 
701 	/* The packed ring uses the buffer_id as the task_idx to look up the task struct.
702 	 * The kernel driver uses vq->free_head to set the buffer_id, so the value
703 	 * must be in the range 0 ~ vring.size, and the free_head value is unique
704 	 * among the outstanding requests.
705 	 * We can't use the req_idx as the task_idx because a desc can be reused in
706 	 * the next phase even when it was not completed in the previous phase. For example,
707 	 * at phase 0, last_used_idx was 2 and desc0 was not completed. Then after moving to
708 	 * phase 1, last_avail_idx is updated to 1. In this case, req_idx cannot be used
709 	 * as the task_idx because task[0]->used would still be true in phase 1.
710 	 * The split queue is quite different: a desc is inserted into the free list when
711 	 * the device completes the request, and the driver takes descs from the free list,
712 	 * which ensures the req_idx is unique among the outstanding requests.
713 	 */
714 	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
715 
716 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
717 	blk_task = &task->blk_task;
718 	if (spdk_unlikely(task->used)) {
719 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
720 			    task->bvsession->vsession.name, task_idx);
721 		blk_task->used_len = 0;
722 		blk_task_enqueue(task);
723 		return;
724 	}
725 
726 	task->req_idx = req_idx;
727 	task->num_descs = num_descs;
728 	task->buffer_id = task_idx;
729 
730 	rte_vhost_set_inflight_desc_packed(task->bvsession->vsession.vid, vq->vring_idx,
731 					   req_idx, (req_idx + num_descs - 1) % vq->vring.size,
732 					   &task->inflight_head);
733 
734 	blk_task_inc_task_cnt(task);
735 
736 	blk_task_init(task);
737 
738 	rc = blk_iovs_packed_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
739 					 &blk_task->iovcnt,
740 					 &blk_task->payload_size);
741 	if (rc) {
742 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
743 		/* Only READ and WRITE are supported for now. */
744 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
745 		return;
746 	}
747 
748 	if (vhost_user_process_blk_request(task) == 0) {
749 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
750 			      task_idx);
751 	} else {
752 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
753 	}
754 }
755 
756 static void
757 process_packed_inflight_blk_task(struct spdk_vhost_virtqueue *vq,
758 				 uint16_t req_idx)
759 {
760 	spdk_vhost_inflight_desc *desc_array = vq->vring_inflight.inflight_packed->desc;
761 	spdk_vhost_inflight_desc *desc = &desc_array[req_idx];
762 	struct spdk_vhost_user_blk_task *task;
763 	struct spdk_vhost_blk_task *blk_task;
764 	uint16_t task_idx, num_descs;
765 	int rc;
766 
767 	task_idx = desc_array[desc->last].id;
768 	num_descs = desc->num;
769 	/* In packed ring reconnection, we use the last_used_idx as the
770 	 * initial value. So when we process the inflight descs we still
771 	 * need to update the available ring index.
772 	 */
773 	vq->last_avail_idx += num_descs;
774 	if (vq->last_avail_idx >= vq->vring.size) {
775 		vq->last_avail_idx -= vq->vring.size;
776 		vq->packed.avail_phase = !vq->packed.avail_phase;
777 	}
778 
779 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
780 	blk_task = &task->blk_task;
781 	if (spdk_unlikely(task->used)) {
782 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
783 			    task->bvsession->vsession.name, task_idx);
784 		blk_task->used_len = 0;
785 		blk_task_enqueue(task);
786 		return;
787 	}
788 
789 	task->req_idx = req_idx;
790 	task->num_descs = num_descs;
791 	task->buffer_id = task_idx;
792 	/* It's for cleaning inflight entries */
793 	task->inflight_head = req_idx;
794 
795 	blk_task_inc_task_cnt(task);
796 
797 	blk_task_init(task);
798 
799 	rc = blk_iovs_inflight_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
800 					   &blk_task->iovcnt,
801 					   &blk_task->payload_size);
802 	if (rc) {
803 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
804 		/* Only READ and WRITE are supported for now. */
805 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
806 		return;
807 	}
808 
809 	if (vhost_user_process_blk_request(task) == 0) {
810 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
811 			      task_idx);
812 	} else {
813 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
814 	}
815 }
816 
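/*
 * Resubmit requests recorded in the inflight region, i.e. requests the guest had
 * submitted before a disconnect that were never completed. Returns the number of
 * resubmitted requests.
 */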
817 static int
818 submit_inflight_desc(struct spdk_vhost_blk_session *bvsession,
819 		     struct spdk_vhost_virtqueue *vq)
820 {
821 	struct spdk_vhost_session *vsession;
822 	spdk_vhost_resubmit_info *resubmit;
823 	spdk_vhost_resubmit_desc *resubmit_list;
824 	uint16_t req_idx;
825 	int i, resubmit_cnt;
826 
827 	resubmit = vq->vring_inflight.resubmit_inflight;
828 	if (spdk_likely(resubmit == NULL || resubmit->resubmit_list == NULL ||
829 			resubmit->resubmit_num == 0)) {
830 		return 0;
831 	}
832 
833 	resubmit_list = resubmit->resubmit_list;
834 	vsession = &bvsession->vsession;
835 
836 	for (i = resubmit->resubmit_num - 1; i >= 0; --i) {
837 		req_idx = resubmit_list[i].index;
838 		SPDK_DEBUGLOG(vhost_blk, "====== Start processing resubmit request idx %"PRIu16"======\n",
839 			      req_idx);
840 
841 		if (spdk_unlikely(req_idx >= vq->vring.size)) {
842 			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
843 				    vsession->name, req_idx, vq->vring.size);
844 			vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
845 			continue;
846 		}
847 
848 		if (vq->packed.packed_ring) {
849 			process_packed_inflight_blk_task(vq, req_idx);
850 		} else {
851 			process_blk_task(vq, req_idx);
852 		}
853 	}
854 	resubmit_cnt = resubmit->resubmit_num;
855 	resubmit->resubmit_num = 0;
856 	return resubmit_cnt;
857 }
858 
859 static int
860 process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
861 {
862 	struct spdk_vhost_session *vsession = &bvsession->vsession;
863 	uint16_t reqs[SPDK_VHOST_VQ_MAX_SUBMISSIONS];
864 	uint16_t reqs_cnt, i;
865 	int resubmit_cnt = 0;
866 
867 	resubmit_cnt = submit_inflight_desc(bvsession, vq);
868 
869 	reqs_cnt = vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
870 	if (!reqs_cnt) {
871 		return resubmit_cnt;
872 	}
873 
874 	for (i = 0; i < reqs_cnt; i++) {
875 		SPDK_DEBUGLOG(vhost_blk, "====== Starting processing request idx %"PRIu16"======\n",
876 			      reqs[i]);
877 
878 		if (spdk_unlikely(reqs[i] >= vq->vring.size)) {
879 			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
880 				    vsession->name, reqs[i], vq->vring.size);
881 			vhost_vq_used_ring_enqueue(vsession, vq, reqs[i], 0);
882 			continue;
883 		}
884 
885 		rte_vhost_set_inflight_desc_split(vsession->vid, vq->vring_idx, reqs[i]);
886 
887 		process_blk_task(vq, reqs[i]);
888 	}
889 
890 	return reqs_cnt;
891 }
892 
893 static int
894 process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
895 {
896 	uint16_t i = 0;
897 	uint16_t count = 0;
898 	int resubmit_cnt = 0;
899 
900 	resubmit_cnt = submit_inflight_desc(bvsession, vq);
901 
902 	while (i++ < SPDK_VHOST_VQ_MAX_SUBMISSIONS &&
903 	       vhost_vq_packed_ring_is_avail(vq)) {
904 		SPDK_DEBUGLOG(vhost_blk, "====== Starting processing request idx %"PRIu16"======\n",
905 			      vq->last_avail_idx);
906 		count++;
907 		process_packed_blk_task(vq, vq->last_avail_idx);
908 	}
909 
910 	return count > 0 ? count : resubmit_cnt;
911 }
912 
913 static int
914 _vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
915 {
916 	struct spdk_vhost_session *vsession = vq->vsession;
917 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
918 	bool packed_ring;
919 	int rc = 0;
920 
921 	packed_ring = vq->packed.packed_ring;
922 	if (packed_ring) {
923 		rc = process_packed_vq(bvsession, vq);
924 	} else {
925 		rc = process_vq(bvsession, vq);
926 	}
927 
928 	vhost_session_vq_used_signal(vq);
929 
930 	return rc;
931 
932 }
933 
934 static int
935 vdev_vq_worker(void *arg)
936 {
937 	struct spdk_vhost_virtqueue *vq = arg;
938 
939 	return _vdev_vq_worker(vq);
940 }
941 
942 static int
943 vdev_worker(void *arg)
944 {
945 	struct spdk_vhost_blk_session *bvsession = arg;
946 	struct spdk_vhost_session *vsession = &bvsession->vsession;
947 	uint16_t q_idx;
948 	int rc = 0;
949 
950 	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
951 		rc += _vdev_vq_worker(&vsession->virtqueue[q_idx]);
952 	}
953 
954 	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
955 }
956 
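/*
 * Request handling for a session whose bdev has been removed (split ring): requests are
 * drained and, whenever the descriptor chain can still be parsed, completed immediately
 * with VIRTIO_BLK_S_IOERR written to the status byte.
 */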
957 static void
958 no_bdev_process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
959 {
960 	struct spdk_vhost_session *vsession = &bvsession->vsession;
961 	struct iovec iovs[SPDK_VHOST_IOVS_MAX];
962 	uint32_t length;
963 	uint16_t iovcnt, req_idx;
964 
965 	if (vhost_vq_avail_ring_get(vq, &req_idx, 1) != 1) {
966 		return;
967 	}
968 
969 	iovcnt = SPDK_COUNTOF(iovs);
970 	if (blk_iovs_split_queue_setup(bvsession, vq, req_idx, iovs, &iovcnt, &length) == 0) {
971 		*(volatile uint8_t *)iovs[iovcnt - 1].iov_base = VIRTIO_BLK_S_IOERR;
972 		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
973 	}
974 
975 	vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
976 }
977 
978 static void
979 no_bdev_process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
980 {
981 	struct spdk_vhost_session *vsession = &bvsession->vsession;
982 	struct spdk_vhost_user_blk_task *task;
983 	struct spdk_vhost_blk_task *blk_task;
984 	uint32_t length;
985 	uint16_t req_idx = vq->last_avail_idx;
986 	uint16_t task_idx, num_descs;
987 
988 	if (!vhost_vq_packed_ring_is_avail(vq)) {
989 		return;
990 	}
991 
992 	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
993 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
994 	blk_task = &task->blk_task;
995 	if (spdk_unlikely(task->used)) {
996 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
997 			    vsession->name, req_idx);
998 		vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
999 					     task->buffer_id, blk_task->used_len,
1000 					     task->inflight_head);
1001 		return;
1002 	}
1003 
1004 	task->req_idx = req_idx;
1005 	task->num_descs = num_descs;
1006 	task->buffer_id = task_idx;
1007 	blk_task_init(task);
1008 
1009 	if (blk_iovs_packed_queue_setup(bvsession, vq, task->req_idx, blk_task->iovs, &blk_task->iovcnt,
1010 					&length)) {
1011 		*(volatile uint8_t *)(blk_task->iovs[blk_task->iovcnt - 1].iov_base) = VIRTIO_BLK_S_IOERR;
1012 		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
1013 	}
1014 
1015 	task->used = false;
1016 	vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
1017 				     task->buffer_id, blk_task->used_len,
1018 				     task->inflight_head);
1019 }
1020 
1021 static int
1022 _no_bdev_vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
1023 {
1024 	struct spdk_vhost_session *vsession = vq->vsession;
1025 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1026 	bool packed_ring;
1027 
1028 	packed_ring = vq->packed.packed_ring;
1029 	if (packed_ring) {
1030 		no_bdev_process_packed_vq(bvsession, vq);
1031 	} else {
1032 		no_bdev_process_vq(bvsession, vq);
1033 	}
1034 
1035 	vhost_session_vq_used_signal(vq);
1036 
1037 	if (vsession->task_cnt == 0 && bvsession->io_channel) {
1038 		vhost_blk_put_io_channel(bvsession->io_channel);
1039 		bvsession->io_channel = NULL;
1040 	}
1041 
1042 	return SPDK_POLLER_BUSY;
1043 }
1044 
1045 static int
1046 no_bdev_vdev_vq_worker(void *arg)
1047 {
1048 	struct spdk_vhost_virtqueue *vq = arg;
1049 
1050 	return _no_bdev_vdev_vq_worker(vq);
1051 }
1052 
1053 static int
1054 no_bdev_vdev_worker(void *arg)
1055 {
1056 	struct spdk_vhost_blk_session *bvsession = arg;
1057 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1058 	uint16_t q_idx;
1059 
1060 	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
1061 		_no_bdev_vdev_vq_worker(&vsession->virtqueue[q_idx]);
1062 	}
1063 
1064 	return SPDK_POLLER_BUSY;
1065 }
1066 
1067 static void
1068 vhost_blk_session_unregister_interrupts(struct spdk_vhost_blk_session *bvsession)
1069 {
1070 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1071 	struct spdk_vhost_virtqueue *vq;
1072 	int i;
1073 
1074 	SPDK_DEBUGLOG(vhost_blk, "unregister virtqueues interrupt\n");
1075 	for (i = 0; i < vsession->max_queues; i++) {
1076 		vq = &vsession->virtqueue[i];
1077 		if (vq->intr == NULL) {
1078 			break;
1079 		}
1080 
1081 		SPDK_DEBUGLOG(vhost_blk, "unregister vq[%d]'s kickfd is %d\n",
1082 			      i, vq->vring.kickfd);
1083 		spdk_interrupt_unregister(&vq->intr);
1084 	}
1085 }
1086 
1087 static void
1088 _vhost_blk_vq_register_interrupt(void *arg)
1089 {
1090 	struct spdk_vhost_virtqueue *vq = arg;
1091 	struct spdk_vhost_session *vsession = vq->vsession;
1092 	struct spdk_vhost_blk_dev *bvdev =  to_blk_dev(vsession->vdev);
1093 
1094 	assert(bvdev != NULL);
1095 
1096 	if (bvdev->bdev) {
1097 		vq->intr = spdk_interrupt_register(vq->vring.kickfd, vdev_vq_worker, vq, "vdev_vq_worker");
1098 	} else {
1099 		vq->intr = spdk_interrupt_register(vq->vring.kickfd, no_bdev_vdev_vq_worker, vq,
1100 						   "no_bdev_vdev_vq_worker");
1101 	}
1102 
1103 	if (vq->intr == NULL) {
1104 		SPDK_ERRLOG("Failed to register req notifier handler.\n");
1105 		assert(false);
1106 	}
1107 }
1108 
1109 static void
1110 vhost_blk_vq_register_interrupt(struct spdk_vhost_session *vsession,
1111 				struct spdk_vhost_virtqueue *vq)
1112 {
1113 	spdk_thread_send_msg(vsession->vdev->thread, _vhost_blk_vq_register_interrupt, vq);
1114 }
1115 
1116 static int
1117 vhost_blk_session_register_no_bdev_interrupts(struct spdk_vhost_blk_session *bvsession)
1118 {
1119 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1120 	struct spdk_vhost_virtqueue *vq = NULL;
1121 	int i;
1122 
1123 	SPDK_DEBUGLOG(vhost_blk, "Register virtqueues interrupt\n");
1124 	for (i = 0; i < vsession->max_queues; i++) {
1125 		vq = &vsession->virtqueue[i];
1126 		SPDK_DEBUGLOG(vhost_blk, "Register vq[%d]'s kickfd is %d\n",
1127 			      i, vq->vring.kickfd);
1128 		vq->intr = spdk_interrupt_register(vq->vring.kickfd, no_bdev_vdev_vq_worker, vq,
1129 						   "no_bdev_vdev_vq_worker");
1130 		if (vq->intr == NULL) {
1131 			goto err;
1132 		}
1133 
1134 	}
1135 
1136 	return 0;
1137 
1138 err:
1139 	vhost_blk_session_unregister_interrupts(bvsession);
1140 	return -1;
1141 }
1142 
1143 static void
1144 vhost_blk_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1145 {
1146 	struct spdk_vhost_blk_session *bvsession = cb_arg;
1147 
1148 	vhost_user_session_set_interrupt_mode(&bvsession->vsession, interrupt_mode);
1149 }
1150 
1151 static void
1152 bdev_event_cpl_cb(struct spdk_vhost_dev *vdev, void *ctx)
1153 {
1154 	enum spdk_bdev_event_type type = (enum spdk_bdev_event_type)(uintptr_t)ctx;
1155 	struct spdk_vhost_blk_dev *bvdev;
1156 
1157 	if (type == SPDK_BDEV_EVENT_REMOVE) {
1158 		/* All sessions have been notified, time to close the bdev */
1159 		bvdev = to_blk_dev(vdev);
1160 		assert(bvdev != NULL);
1161 		spdk_put_io_channel(bvdev->dummy_io_channel);
1162 		spdk_bdev_close(bvdev->bdev_desc);
1163 		bvdev->bdev_desc = NULL;
1164 		bvdev->bdev = NULL;
1165 	}
1166 }
1167 
1168 static int
1169 vhost_session_bdev_resize_cb(struct spdk_vhost_dev *vdev,
1170 			     struct spdk_vhost_session *vsession,
1171 			     void *ctx)
1172 {
1173 	SPDK_NOTICELOG("bdev sends config change msg to vid(%d)\n", vsession->vid);
1174 #if RTE_VERSION >= RTE_VERSION_NUM(23, 03, 0, 0)
1175 	rte_vhost_backend_config_change(vsession->vid, false);
1176 #else
1177 	rte_vhost_slave_config_change(vsession->vid, false);
1178 #endif
1179 
1180 	return 0;
1181 }
1182 
1183 static void
1184 vhost_user_blk_resize_cb(struct spdk_vhost_dev *vdev, bdev_event_cb_complete cb, void *cb_arg)
1185 {
1186 	vhost_user_dev_foreach_session(vdev, vhost_session_bdev_resize_cb,
1187 				       cb, cb_arg);
1188 }
1189 
1190 static int
1191 vhost_user_session_bdev_remove_cb(struct spdk_vhost_dev *vdev,
1192 				  struct spdk_vhost_session *vsession,
1193 				  void *ctx)
1194 {
1195 	struct spdk_vhost_blk_session *bvsession;
1196 	int rc;
1197 
1198 	bvsession = to_blk_session(vsession);
1199 	if (bvsession->requestq_poller) {
1200 		spdk_poller_unregister(&bvsession->requestq_poller);
1201 		if (vsession->interrupt_mode) {
1202 			vhost_blk_session_unregister_interrupts(bvsession);
1203 			rc = vhost_blk_session_register_no_bdev_interrupts(bvsession);
1204 			if (rc) {
1205 				SPDK_ERRLOG("%s: Interrupt register failed\n", vsession->name);
1206 				return rc;
1207 			}
1208 		}
1209 
1210 		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
1211 		spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
1212 					       bvsession);
1213 	}
1214 
1215 	return 0;
1216 }
1217 
1218 static void
1219 vhost_user_bdev_remove_cb(struct spdk_vhost_dev *vdev, bdev_event_cb_complete cb, void *cb_arg)
1220 {
1221 	SPDK_WARNLOG("%s: hot-removing bdev - all further requests will fail.\n",
1222 		     vdev->name);
1223 
1224 	vhost_user_dev_foreach_session(vdev, vhost_user_session_bdev_remove_cb,
1225 				       cb, cb_arg);
1226 }
1227 
1228 static void
1229 vhost_user_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_vhost_dev *vdev,
1230 			 bdev_event_cb_complete cb, void *cb_arg)
1231 {
1232 	switch (type) {
1233 	case SPDK_BDEV_EVENT_REMOVE:
1234 		vhost_user_bdev_remove_cb(vdev, cb, cb_arg);
1235 		break;
1236 	case SPDK_BDEV_EVENT_RESIZE:
1237 		vhost_user_blk_resize_cb(vdev, cb, cb_arg);
1238 		break;
1239 	default:
1240 		assert(false);
1241 		return;
1242 	}
1243 }
1244 
1245 static void
1246 bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1247 	      void *event_ctx)
1248 {
1249 	struct spdk_vhost_dev *vdev = (struct spdk_vhost_dev *)event_ctx;
1250 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1251 
1252 	assert(bvdev != NULL);
1253 
1254 	SPDK_DEBUGLOG(vhost_blk, "Bdev event: type %d, name %s\n",
1255 		      type,
1256 		      bdev->name);
1257 
1258 	switch (type) {
1259 	case SPDK_BDEV_EVENT_REMOVE:
1260 	case SPDK_BDEV_EVENT_RESIZE:
1261 		bvdev->ops->bdev_event(type, vdev, bdev_event_cpl_cb, (void *)type);
1262 		break;
1263 	default:
1264 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1265 		break;
1266 	}
1267 }
1268 
1269 static void
1270 free_task_pool(struct spdk_vhost_blk_session *bvsession)
1271 {
1272 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1273 	struct spdk_vhost_virtqueue *vq;
1274 	uint16_t i;
1275 
1276 	for (i = 0; i < vsession->max_queues; i++) {
1277 		vq = &vsession->virtqueue[i];
1278 		if (vq->tasks == NULL) {
1279 			continue;
1280 		}
1281 
1282 		spdk_free(vq->tasks);
1283 		vq->tasks = NULL;
1284 	}
1285 }
1286 
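/*
 * Allocate one spdk_vhost_user_blk_task per ring entry of the given virtqueue so that
 * requests can be tracked by descriptor/buffer index without per-I/O allocations.
 */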
1287 static int
1288 alloc_vq_task_pool(struct spdk_vhost_session *vsession, uint16_t qid)
1289 {
1290 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1291 	struct spdk_vhost_virtqueue *vq;
1292 	struct spdk_vhost_user_blk_task *task;
1293 	uint32_t task_cnt;
1294 	uint32_t j;
1295 
1296 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1297 		return -EINVAL;
1298 	}
1299 
1300 	vq = &vsession->virtqueue[qid];
1301 	if (vq->vring.desc == NULL) {
1302 		return 0;
1303 	}
1304 
1305 	task_cnt = vq->vring.size;
1306 	if (task_cnt > SPDK_VHOST_MAX_VQ_SIZE) {
1307 		/* sanity check */
1308 		SPDK_ERRLOG("%s: virtqueue %"PRIu16" is too big. (size = %"PRIu32", max = %"PRIu32")\n",
1309 			    vsession->name, qid, task_cnt, SPDK_VHOST_MAX_VQ_SIZE);
1310 		return -1;
1311 	}
1312 	vq->tasks = spdk_zmalloc(sizeof(struct spdk_vhost_user_blk_task) * task_cnt,
1313 				 SPDK_CACHE_LINE_SIZE, NULL,
1314 				 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1315 	if (vq->tasks == NULL) {
1316 		SPDK_ERRLOG("%s: failed to allocate %"PRIu32" tasks for virtqueue %"PRIu16"\n",
1317 			    vsession->name, task_cnt, qid);
1318 		return -1;
1319 	}
1320 
1321 	for (j = 0; j < task_cnt; j++) {
1322 		task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[j];
1323 		task->bvsession = bvsession;
1324 		task->req_idx = j;
1325 		task->vq = vq;
1326 	}
1327 
1328 	return 0;
1329 }
1330 
1331 static int
1332 vhost_blk_start(struct spdk_vhost_dev *vdev,
1333 		struct spdk_vhost_session *vsession, void *unused)
1334 {
1335 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1336 	struct spdk_vhost_blk_dev *bvdev;
1337 	int i;
1338 
1339 	/* return if start is already in progress */
1340 	if (bvsession->requestq_poller) {
1341 		SPDK_INFOLOG(vhost, "%s: start in progress\n", vsession->name);
1342 		return -EINPROGRESS;
1343 	}
1344 
1345 	/* validate all I/O queues are in a contiguous index range */
1346 	for (i = 0; i < vsession->max_queues; i++) {
1347 		/* vring.desc and vring.desc_packed are in a union struct
1348 		 * so checking vring.desc covers vring.desc_packed as well.
1349 		 */
1350 		if (vsession->virtqueue[i].vring.desc == NULL) {
1351 			SPDK_ERRLOG("%s: queue %"PRIu32" is empty\n", vsession->name, i);
1352 			return -1;
1353 		}
1354 	}
1355 
1356 	bvdev = to_blk_dev(vdev);
1357 	assert(bvdev != NULL);
1358 	bvsession->bvdev = bvdev;
1359 
1360 	if (bvdev->bdev) {
1361 		bvsession->io_channel = vhost_blk_get_io_channel(vdev);
1362 		if (!bvsession->io_channel) {
1363 			free_task_pool(bvsession);
1364 			SPDK_ERRLOG("%s: I/O channel allocation failed\n", vsession->name);
1365 			return -1;
1366 		}
1367 	}
1368 
1369 	if (bvdev->bdev) {
1370 		bvsession->requestq_poller = SPDK_POLLER_REGISTER(vdev_worker, bvsession, 0);
1371 	} else {
1372 		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
1373 	}
1374 	SPDK_INFOLOG(vhost, "%s: started poller on lcore %d\n",
1375 		     vsession->name, spdk_env_get_current_core());
1376 
1377 	spdk_poller_register_interrupt(bvsession->requestq_poller, vhost_blk_poller_set_interrupt_mode,
1378 				       bvsession);
1379 
1380 	return 0;
1381 }
1382 
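/*
 * Stop-path poller: wait (bounded by stop_retry_count) for all outstanding tasks to
 * complete, then release the I/O channel, free the task pools and report the session
 * stop as done.
 */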
1383 static int
1384 destroy_session_poller_cb(void *arg)
1385 {
1386 	struct spdk_vhost_blk_session *bvsession = arg;
1387 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1388 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
1389 	int i;
1390 
1391 	if (vsession->task_cnt > 0 || (pthread_mutex_trylock(&user_dev->lock) != 0)) {
1392 		assert(vsession->stop_retry_count > 0);
1393 		vsession->stop_retry_count--;
1394 		if (vsession->stop_retry_count == 0) {
1395 			SPDK_ERRLOG("%s: Timed out while destroying session (task_cnt %d)\n", vsession->name,
1396 				    vsession->task_cnt);
1397 			spdk_poller_unregister(&bvsession->stop_poller);
1398 			vhost_user_session_stop_done(vsession, -ETIMEDOUT);
1399 		}
1400 
1401 		return SPDK_POLLER_BUSY;
1402 	}
1403 
1404 	for (i = 0; i < vsession->max_queues; i++) {
1405 		vsession->virtqueue[i].next_event_time = 0;
1406 		vhost_vq_used_signal(vsession, &vsession->virtqueue[i]);
1407 	}
1408 
1409 	SPDK_INFOLOG(vhost, "%s: stopping poller on lcore %d\n",
1410 		     vsession->name, spdk_env_get_current_core());
1411 
1412 	if (bvsession->io_channel) {
1413 		vhost_blk_put_io_channel(bvsession->io_channel);
1414 		bvsession->io_channel = NULL;
1415 	}
1416 
1417 	free_task_pool(bvsession);
1418 	spdk_poller_unregister(&bvsession->stop_poller);
1419 	vhost_user_session_stop_done(vsession, 0);
1420 
1421 	pthread_mutex_unlock(&user_dev->lock);
1422 	return SPDK_POLLER_BUSY;
1423 }
1424 
1425 static int
1426 vhost_blk_stop(struct spdk_vhost_dev *vdev,
1427 	       struct spdk_vhost_session *vsession, void *unused)
1428 {
1429 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1430 
1431 	/* return if stop is already in progress */
1432 	if (bvsession->stop_poller) {
1433 		return -EINPROGRESS;
1434 	}
1435 
1436 	spdk_poller_unregister(&bvsession->requestq_poller);
1437 	vhost_blk_session_unregister_interrupts(bvsession);
1438 
1439 	/* vhost_user_session_send_event times out after 3 seconds, so allow up to 4 seconds of retries here */
1440 	bvsession->vsession.stop_retry_count = 4000;
1441 	bvsession->stop_poller = SPDK_POLLER_REGISTER(destroy_session_poller_cb,
1442 				 bvsession, 1000);
1443 	return 0;
1444 }
1445 
1446 static void
1447 vhost_blk_dump_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1448 {
1449 	struct spdk_vhost_blk_dev *bvdev;
1450 
1451 	bvdev = to_blk_dev(vdev);
1452 	assert(bvdev != NULL);
1453 
1454 	spdk_json_write_named_object_begin(w, "block");
1455 
1456 	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
1457 
1458 	spdk_json_write_name(w, "bdev");
1459 	if (bvdev->bdev) {
1460 		spdk_json_write_string(w, spdk_bdev_get_name(bvdev->bdev));
1461 	} else {
1462 		spdk_json_write_null(w);
1463 	}
1464 	spdk_json_write_named_string(w, "transport", bvdev->ops->name);
1465 
1466 	spdk_json_write_object_end(w);
1467 }
1468 
1469 static void
1470 vhost_blk_write_config_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1471 {
1472 	struct spdk_vhost_blk_dev *bvdev;
1473 
1474 	bvdev = to_blk_dev(vdev);
1475 	assert(bvdev != NULL);
1476 
1477 	if (!bvdev->bdev) {
1478 		return;
1479 	}
1480 
1481 	spdk_json_write_object_begin(w);
1482 	spdk_json_write_named_string(w, "method", "vhost_create_blk_controller");
1483 
1484 	spdk_json_write_named_object_begin(w, "params");
1485 	spdk_json_write_named_string(w, "ctrlr", vdev->name);
1486 	spdk_json_write_named_string(w, "dev_name", spdk_bdev_get_name(bvdev->bdev));
1487 	spdk_json_write_named_string(w, "cpumask",
1488 				     spdk_cpuset_fmt(spdk_thread_get_cpumask(vdev->thread)));
1489 	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
1490 	spdk_json_write_named_string(w, "transport", bvdev->ops->name);
1491 	spdk_json_write_object_end(w);
1492 
1493 	spdk_json_write_object_end(w);
1494 }
1495 
1496 static int vhost_blk_destroy(struct spdk_vhost_dev *dev);
1497 
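/*
 * Fill in the virtio_blk_config area returned for the vhost-user GET_CONFIG message,
 * deriving capacity and limits from the backing bdev when one is present.
 */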
1498 static int
1499 vhost_blk_get_config(struct spdk_vhost_dev *vdev, uint8_t *config,
1500 		     uint32_t len)
1501 {
1502 	struct virtio_blk_config blkcfg;
1503 	struct spdk_bdev *bdev;
1504 	uint32_t blk_size;
1505 	uint64_t blkcnt;
1506 
1507 	memset(&blkcfg, 0, sizeof(blkcfg));
1508 	bdev = vhost_blk_get_bdev(vdev);
1509 	if (bdev == NULL) {
1510 		/* We can't just return -1 here as this GET_CONFIG message might
1511 		 * be caused by a QEMU VM reboot. Returning -1 will indicate an
1512 		 * error to QEMU, which might then decide to terminate itself.
1513 		 * We don't want that. A simple reboot shouldn't break the system.
1514 		 *
1515 		 * Presenting a block device with block size 0 and block count 0
1516 		 * doesn't cause any problems on QEMU side and the virtio-pci
1517 		 * device is even still available inside the VM, but there will
1518 		 * be no block device created for it - the kernel drivers will
1519 		 * silently reject it.
1520 		 */
1521 		blk_size = 0;
1522 		blkcnt = 0;
1523 	} else {
1524 		blk_size = spdk_bdev_get_block_size(bdev);
1525 		blkcnt = spdk_bdev_get_num_blocks(bdev);
1526 		if (spdk_bdev_get_buf_align(bdev) > 1) {
1527 			blkcfg.size_max = SPDK_BDEV_LARGE_BUF_MAX_SIZE;
1528 			blkcfg.seg_max = spdk_min(SPDK_VHOST_IOVS_MAX - 2 - 1, SPDK_BDEV_IO_NUM_CHILD_IOV - 2 - 1);
1529 		} else {
1530 			blkcfg.size_max = 131072;
1531 			/*  -2 for REQ and RESP and -1 for region boundary splitting */
1532 			blkcfg.seg_max = SPDK_VHOST_IOVS_MAX - 2 - 1;
1533 		}
1534 	}
1535 
1536 	blkcfg.blk_size = blk_size;
1537 	/* minimum I/O size in blocks */
1538 	blkcfg.min_io_size = 1;
1539 	/* expressed in 512-byte sectors */
1540 	blkcfg.capacity = (blkcnt * blk_size) / 512;
1541 	/* QEMU can overwrite this value when started */
1542 	blkcfg.num_queues = SPDK_VHOST_MAX_VQUEUES;
1543 
1544 	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1545 		/* 16 MiB, expressed in 512-byte sectors */
1546 		blkcfg.max_discard_sectors = 32768;
1547 		blkcfg.max_discard_seg = 1;
1548 		blkcfg.discard_sector_alignment = blk_size / 512;
1549 	}
1550 	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1551 		blkcfg.max_write_zeroes_sectors = 32768;
1552 		blkcfg.max_write_zeroes_seg = 1;
1553 	}
1554 
1555 	memcpy(config, &blkcfg, spdk_min(len, sizeof(blkcfg)));
1556 
1557 	return 0;
1558 }
1559 
1560 static int
1561 vhost_blk_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
1562 			 uint32_t iops_threshold)
1563 {
1564 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1565 
1566 	assert(bvdev != NULL);
1567 
1568 	return bvdev->ops->set_coalescing(vdev, delay_base_us, iops_threshold);
1569 }
1570 
1571 static void
1572 vhost_blk_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
1573 			 uint32_t *iops_threshold)
1574 {
1575 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1576 
1577 	assert(bvdev != NULL);
1578 
1579 	bvdev->ops->get_coalescing(vdev, delay_base_us, iops_threshold);
1580 }
1581 
1582 static const struct spdk_vhost_user_dev_backend vhost_blk_user_device_backend = {
1583 	.session_ctx_size = sizeof(struct spdk_vhost_blk_session) - sizeof(struct spdk_vhost_session),
1584 	.start_session =  vhost_blk_start,
1585 	.stop_session = vhost_blk_stop,
1586 	.alloc_vq_tasks = alloc_vq_task_pool,
1587 	.register_vq_interrupt = vhost_blk_vq_register_interrupt,
1588 };
1589 
1590 static const struct spdk_vhost_dev_backend vhost_blk_device_backend = {
1591 	.type = VHOST_BACKEND_BLK,
1592 	.vhost_get_config = vhost_blk_get_config,
1593 	.dump_info_json = vhost_blk_dump_info_json,
1594 	.write_config_json = vhost_blk_write_config_json,
1595 	.remove_device = vhost_blk_destroy,
1596 	.set_coalescing = vhost_blk_set_coalescing,
1597 	.get_coalescing = vhost_blk_get_coalescing,
1598 };
1599 
1600 int
1601 virtio_blk_construct_ctrlr(struct spdk_vhost_dev *vdev, const char *address,
1602 			   struct spdk_cpuset *cpumask, const struct spdk_json_val *params,
1603 			   const struct spdk_vhost_user_dev_backend *user_backend)
1604 {
1605 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1606 
1607 	assert(bvdev != NULL);
1608 
1609 	return bvdev->ops->create_ctrlr(vdev, cpumask, address, params, (void *)user_backend);
1610 }
1611 
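/*
 * Create a vhost-blk controller on top of an existing bdev: open the bdev, derive the
 * optional virtio features (DISCARD, WRITE_ZEROES, FLUSH) from its I/O capabilities,
 * hold a dummy I/O channel reference and register the vhost device.
 */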
1612 int
1613 spdk_vhost_blk_construct(const char *name, const char *cpumask, const char *dev_name,
1614 			 const char *transport, const struct spdk_json_val *params)
1615 {
1616 	struct spdk_vhost_blk_dev *bvdev = NULL;
1617 	struct spdk_vhost_dev *vdev;
1618 	struct spdk_bdev *bdev;
1619 	const char *transport_name = VIRTIO_BLK_DEFAULT_TRANSPORT;
1620 	int ret = 0;
1621 
1622 	bvdev = calloc(1, sizeof(*bvdev));
1623 	if (bvdev == NULL) {
1624 		ret = -ENOMEM;
1625 		goto out;
1626 	}
1627 
1628 	if (transport != NULL) {
1629 		transport_name = transport;
1630 	}
1631 
1632 	bvdev->ops = virtio_blk_get_transport_ops(transport_name);
1633 	if (!bvdev->ops) {
1634 		ret = -EINVAL;
1635 		SPDK_ERRLOG("Transport type '%s' unavailable.\n", transport_name);
1636 		goto out;
1637 	}
1638 
1639 	ret = spdk_bdev_open_ext(dev_name, true, bdev_event_cb, bvdev, &bvdev->bdev_desc);
1640 	if (ret != 0) {
1641 		SPDK_ERRLOG("%s: could not open bdev '%s', error=%d\n",
1642 			    name, dev_name, ret);
1643 		goto out;
1644 	}
1645 	bdev = spdk_bdev_desc_get_bdev(bvdev->bdev_desc);
1646 
1647 	vdev = &bvdev->vdev;
1648 	vdev->virtio_features = SPDK_VHOST_BLK_FEATURES_BASE;
1649 	vdev->disabled_features = SPDK_VHOST_BLK_DISABLED_FEATURES;
1650 	vdev->protocol_features = SPDK_VHOST_BLK_PROTOCOL_FEATURES;
1651 
1652 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1653 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_DISCARD);
1654 	}
1655 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1656 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_WRITE_ZEROES);
1657 	}
1658 
1659 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
1660 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_FLUSH);
1661 	}
1662 
1663 	/*
1664 	 * When starting QEMU with multiqueue enabled, the vhost device will be
1665 	 * started and stopped many times, depending on the number of queues, because
1666 	 * the exact number of queues used for this device is not known at that time.
1667 	 * The target has to stop and start the device once it gets a valid I/O queue.
1668 	 * While the vhost device is being stopped and started, the backend bdev io device
1669 	 * will be deleted and created repeatedly.
1670 	 * Hold a bdev reference in struct spdk_vhost_blk_dev so that
1671 	 * the io device will not be deleted.
1672 	 */
1673 	bvdev->dummy_io_channel = spdk_bdev_get_io_channel(bvdev->bdev_desc);
1674 
1675 	bvdev->bdev = bdev;
1676 	bvdev->readonly = false;
1677 	ret = vhost_dev_register(vdev, name, cpumask, params, &vhost_blk_device_backend,
1678 				 &vhost_blk_user_device_backend);
1679 	if (ret != 0) {
1680 		spdk_put_io_channel(bvdev->dummy_io_channel);
1681 		spdk_bdev_close(bvdev->bdev_desc);
1682 		goto out;
1683 	}
1684 
1685 	SPDK_INFOLOG(vhost, "%s: using bdev '%s'\n", name, dev_name);
1686 out:
1687 	if (ret != 0 && bvdev) {
1688 		free(bvdev);
1689 	}
1690 	return ret;
1691 }
1692 
1693 int
1694 virtio_blk_destroy_ctrlr(struct spdk_vhost_dev *vdev)
1695 {
1696 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1697 
1698 	assert(bvdev != NULL);
1699 
1700 	return bvdev->ops->destroy_ctrlr(vdev);
1701 }
1702 
1703 static int
1704 vhost_blk_destroy(struct spdk_vhost_dev *vdev)
1705 {
1706 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1707 	int rc;
1708 
1709 	assert(bvdev != NULL);
1710 
1711 	rc = vhost_dev_unregister(&bvdev->vdev);
1712 	if (rc != 0) {
1713 		return rc;
1714 	}
1715 
1716 	/* If the bdev has been removed, there is no need to call spdk_put_io_channel. */
1717 	if (bvdev->bdev) {
1718 		spdk_put_io_channel(bvdev->dummy_io_channel);
1719 	}
1720 
1721 	if (bvdev->bdev_desc) {
1722 		spdk_bdev_close(bvdev->bdev_desc);
1723 		bvdev->bdev_desc = NULL;
1724 	}
1725 	bvdev->bdev = NULL;
1726 
1727 	free(bvdev);
1728 	return 0;
1729 }
1730 
1731 struct spdk_io_channel *
1732 vhost_blk_get_io_channel(struct spdk_vhost_dev *vdev)
1733 {
1734 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1735 
1736 	assert(bvdev != NULL);
1737 
1738 	return spdk_bdev_get_io_channel(bvdev->bdev_desc);
1739 }
1740 
1741 void
1742 vhost_blk_put_io_channel(struct spdk_io_channel *ch)
1743 {
1744 	spdk_put_io_channel(ch);
1745 }
1746 
1747 static struct spdk_virtio_blk_transport *
1748 vhost_user_blk_create(const struct spdk_json_val *params)
1749 {
1750 	int ret;
1751 	struct spdk_virtio_blk_transport *vhost_user_blk;
1752 
1753 	vhost_user_blk = calloc(1, sizeof(*vhost_user_blk));
1754 	if (!vhost_user_blk) {
1755 		return NULL;
1756 	}
1757 
1758 	ret = vhost_user_init();
1759 	if (ret != 0) {
1760 		free(vhost_user_blk);
1761 		return NULL;
1762 	}
1763 
1764 	return vhost_user_blk;
1765 }
1766 
1767 static int
1768 vhost_user_blk_destroy(struct spdk_virtio_blk_transport *transport,
1769 		       spdk_vhost_fini_cb cb_fn)
1770 {
1771 	vhost_user_fini(cb_fn);
1772 	free(transport);
1773 	return 0;
1774 }
1775 
1776 struct rpc_vhost_blk {
1777 	bool readonly;
1778 	bool packed_ring;
1779 	bool packed_ring_recovery;
1780 };
1781 
1782 static const struct spdk_json_object_decoder rpc_construct_vhost_blk[] = {
1783 	{"readonly", offsetof(struct rpc_vhost_blk, readonly), spdk_json_decode_bool, true},
1784 	{"packed_ring", offsetof(struct rpc_vhost_blk, packed_ring), spdk_json_decode_bool, true},
1785 	{"packed_ring_recovery", offsetof(struct rpc_vhost_blk, packed_ring_recovery), spdk_json_decode_bool, true},
1786 };
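/*
 * Illustrative example (field values are hypothetical): besides the generic controller
 * arguments (ctrlr, dev_name, cpumask, transport), a vhost_create_blk_controller RPC may
 * carry the optional flags decoded above, e.g.
 *   "params": { "ctrlr": "vhost.0", "dev_name": "Malloc0", "readonly": false,
 *               "packed_ring": true, "packed_ring_recovery": false }
 */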
1787 
1788 static int
1789 vhost_user_blk_create_ctrlr(struct spdk_vhost_dev *vdev, struct spdk_cpuset *cpumask,
1790 			    const char *address, const struct spdk_json_val *params, void *custom_opts)
1791 {
1792 	struct rpc_vhost_blk req = {0};
1793 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1794 
1795 	assert(bvdev != NULL);
1796 
1797 	if (spdk_json_decode_object_relaxed(params, rpc_construct_vhost_blk,
1798 					    SPDK_COUNTOF(rpc_construct_vhost_blk),
1799 					    &req)) {
1800 		SPDK_DEBUGLOG(vhost_blk, "spdk_json_decode_object failed\n");
1801 		return -EINVAL;
1802 	}
1803 
1804 	vdev->packed_ring_recovery = false;
1805 
1806 	if (req.packed_ring) {
1807 		vdev->virtio_features |= (uint64_t)req.packed_ring << VIRTIO_F_RING_PACKED;
1808 		vdev->packed_ring_recovery = req.packed_ring_recovery;
1809 	}
1810 	if (req.readonly) {
1811 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_RO);
1812 		bvdev->readonly = req.readonly;
1813 	}
1814 
1815 	return vhost_user_dev_register(vdev, address, cpumask, custom_opts);
1816 }
1817 
1818 static int
1819 vhost_user_blk_destroy_ctrlr(struct spdk_vhost_dev *vdev)
1820 {
1821 	return vhost_user_dev_unregister(vdev);
1822 }
1823 
1824 static void
1825 vhost_user_blk_dump_opts(struct spdk_virtio_blk_transport *transport, struct spdk_json_write_ctx *w)
1826 {
1827 	assert(w != NULL);
1828 
1829 	spdk_json_write_named_string(w, "name", transport->ops->name);
1830 }
1831 
1832 static const struct spdk_virtio_blk_transport_ops vhost_user_blk = {
1833 	.name = "vhost_user_blk",
1834 
1835 	.dump_opts = vhost_user_blk_dump_opts,
1836 
1837 	.create = vhost_user_blk_create,
1838 	.destroy = vhost_user_blk_destroy,
1839 
1840 	.create_ctrlr = vhost_user_blk_create_ctrlr,
1841 	.destroy_ctrlr = vhost_user_blk_destroy_ctrlr,
1842 
1843 	.bdev_event = vhost_user_bdev_event_cb,
1844 	.set_coalescing = vhost_user_set_coalescing,
1845 	.get_coalescing = vhost_user_get_coalescing,
1846 };
1847 
1848 SPDK_VIRTIO_BLK_TRANSPORT_REGISTER(vhost_user_blk, &vhost_user_blk);
1849 
1850 SPDK_LOG_REGISTER_COMPONENT(vhost_blk)
1851 SPDK_LOG_REGISTER_COMPONENT(vhost_blk_data)
1852