xref: /spdk/lib/vhost/vhost_blk.c (revision 8afdeef3becfe9409cc9e7372bd0bc10e8b7d46d)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2017 Intel Corporation. All rights reserved.
3  *   All rights reserved.
4  */
5 
6 #include <linux/virtio_blk.h>
7 
8 #include "spdk/env.h"
9 #include "spdk/bdev.h"
10 #include "spdk/bdev_module.h"
11 #include "spdk/thread.h"
12 #include "spdk/likely.h"
13 #include "spdk/string.h"
14 #include "spdk/util.h"
15 #include "spdk/vhost.h"
16 #include "spdk/json.h"
17 
18 #include "vhost_internal.h"
19 #include <rte_version.h>
20 
21 /* Minimal set of features supported by every SPDK VHOST-BLK device */
22 #define SPDK_VHOST_BLK_FEATURES_BASE (SPDK_VHOST_FEATURES | \
23 		(1ULL << VIRTIO_BLK_F_SIZE_MAX) | (1ULL << VIRTIO_BLK_F_SEG_MAX) | \
24 		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_BLK_SIZE) | \
25 		(1ULL << VIRTIO_BLK_F_TOPOLOGY) | (1ULL << VIRTIO_BLK_F_BARRIER)  | \
26 		(1ULL << VIRTIO_BLK_F_SCSI)     | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
27 		(1ULL << VIRTIO_BLK_F_MQ))
28 
29 /* Features not supported by SPDK vhost-blk */
30 #define SPDK_VHOST_BLK_DISABLED_FEATURES (SPDK_VHOST_DISABLED_FEATURES | \
31 		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
32 		(1ULL << VIRTIO_BLK_F_BARRIER)  | (1ULL << VIRTIO_BLK_F_SCSI))
33 
34 /* Protocol features supported by vhost-blk */
35 #define SPDK_VHOST_BLK_PROTOCOL_FEATURES ((1ULL << VHOST_USER_PROTOCOL_F_CONFIG) | \
36 		(1ULL << VHOST_USER_PROTOCOL_F_INFLIGHT_SHMFD))
37 
38 #define VIRTIO_BLK_DEFAULT_TRANSPORT "vhost_user_blk"
39 
40 struct spdk_vhost_user_blk_task {
41 	struct spdk_vhost_blk_task blk_task;
42 	struct spdk_vhost_blk_session *bvsession;
43 	struct spdk_vhost_virtqueue *vq;
44 
45 	uint16_t req_idx;
46 	uint16_t num_descs;
47 	uint16_t buffer_id;
48 	uint16_t inflight_head;
49 
50 	/* If set, the task is currently used for I/O processing. */
51 	bool used;
52 };
53 
54 struct spdk_vhost_blk_dev {
55 	struct spdk_vhost_dev vdev;
56 	struct spdk_bdev *bdev;
57 	struct spdk_bdev_desc *bdev_desc;
58 	const struct spdk_virtio_blk_transport_ops *ops;
59 
60 	bool readonly;
61 	/* Next poll group index to be assigned */
62 	uint32_t next_pg_index;
63 };
64 
65 struct vhost_user_pg_vq_info {
66 	struct vhost_user_poll_group *pg;
67 	struct spdk_vhost_virtqueue *vq;
68 	struct spdk_vhost_session *vsession;
69 
70 	TAILQ_ENTRY(vhost_user_pg_vq_info) link;
71 };
72 
73 struct vhost_user_poll_group {
74 	struct spdk_vhost_dev *vdev;
75 	struct spdk_vhost_session *vsession;
76 
77 	struct spdk_thread *thread;
78 	struct spdk_poller *requestq_poller;
79 	struct spdk_io_channel *io_channel;
80 
81 	int task_cnt;
82 
83 	TAILQ_HEAD(, vhost_user_pg_vq_info) vqs;
84 
85 	struct spdk_poller *stop_poller;
86 	uint32_t stop_retry_count;
87 };
88 
89 struct spdk_vhost_blk_session {
90 	/* The parent session must be the very first field in this struct */
91 	struct spdk_vhost_session vsession;
92 	struct spdk_vhost_blk_dev *bvdev;
93 	struct spdk_poller *stop_poller;
94 
95 	struct spdk_thread *thread;
96 	struct vhost_user_poll_group *poll_groups;
97 	uint32_t num_poll_groups;
98 
99 	uint32_t num_stopped_poll_groups;
100 };
101 
102 /* forward declaration */
103 static const struct spdk_vhost_dev_backend vhost_blk_device_backend;
104 
105 static void vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task,
106 		void *cb_arg);
107 
108 static void session_stop_poll_groups(struct spdk_vhost_blk_session *bvsession);
109 
110 static int
111 vhost_user_process_blk_request(struct spdk_vhost_user_blk_task *user_task)
112 {
113 	struct spdk_vhost_blk_session *bvsession = user_task->bvsession;
114 	struct spdk_vhost_dev *vdev = &bvsession->bvdev->vdev;
115 	struct vhost_user_poll_group *pg = (struct vhost_user_poll_group *)user_task->vq->poll_group;
116 
117 	return virtio_blk_process_request(vdev, pg->io_channel, &user_task->blk_task,
118 					  vhost_user_blk_request_finish, NULL);
119 }
120 
121 static struct spdk_vhost_blk_dev *
122 to_blk_dev(struct spdk_vhost_dev *vdev)
123 {
124 	if (vdev == NULL) {
125 		return NULL;
126 	}
127 
128 	if (vdev->backend->type != VHOST_BACKEND_BLK) {
129 		SPDK_ERRLOG("%s: not a vhost-blk device\n", vdev->name);
130 		return NULL;
131 	}
132 
133 	return SPDK_CONTAINEROF(vdev, struct spdk_vhost_blk_dev, vdev);
134 }
135 
136 struct spdk_bdev *
137 vhost_blk_get_bdev(struct spdk_vhost_dev *vdev)
138 {
139 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
140 
141 	assert(bvdev != NULL);
142 
143 	return bvdev->bdev;
144 }
145 
146 static struct spdk_vhost_blk_session *
147 to_blk_session(struct spdk_vhost_session *vsession)
148 {
149 	assert(vsession->vdev->backend->type == VHOST_BACKEND_BLK);
150 	return (struct spdk_vhost_blk_session *)vsession;
151 }
152 
153 static inline void
154 blk_task_inc_task_cnt(struct spdk_vhost_user_blk_task *task)
155 {
156 	struct spdk_vhost_virtqueue *vq = task->vq;
157 	struct vhost_user_poll_group *pg = (struct vhost_user_poll_group *)vq->poll_group;
158 
159 	pg->task_cnt++;
160 }
161 
162 static inline void
163 blk_task_dec_task_cnt(struct spdk_vhost_user_blk_task *task)
164 {
165 	struct spdk_vhost_virtqueue *vq = task->vq;
166 	struct vhost_user_poll_group *pg = (struct vhost_user_poll_group *)vq->poll_group;
167 
168 	assert(pg->task_cnt > 0);
169 	pg->task_cnt--;
170 }
171 
172 static void
173 blk_task_finish(struct spdk_vhost_user_blk_task *task)
174 {
175 	blk_task_dec_task_cnt(task);
176 	task->used = false;
177 }
178 
179 static void
180 blk_task_init(struct spdk_vhost_user_blk_task *task)
181 {
182 	struct spdk_vhost_blk_task *blk_task = &task->blk_task;
183 
184 	task->used = true;
185 	blk_task->iovcnt = SPDK_COUNTOF(blk_task->iovs);
186 	blk_task->status = NULL;
187 	blk_task->used_len = 0;
188 	blk_task->payload_size = 0;
189 }
190 
191 static void
192 blk_task_enqueue(struct spdk_vhost_user_blk_task *task)
193 {
194 	if (task->vq->packed.packed_ring) {
195 		vhost_vq_packed_ring_enqueue(&task->bvsession->vsession, task->vq,
196 					     task->num_descs,
197 					     task->buffer_id, task->blk_task.used_len,
198 					     task->inflight_head);
199 	} else {
200 		vhost_vq_used_ring_enqueue(&task->bvsession->vsession, task->vq,
201 					   task->req_idx, task->blk_task.used_len);
202 	}
203 }
204 
205 static void
206 vhost_user_blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task, void *cb_arg)
207 {
208 	struct spdk_vhost_user_blk_task *user_task;
209 
210 	user_task = SPDK_CONTAINEROF(task, struct spdk_vhost_user_blk_task, blk_task);
211 
212 	blk_task_enqueue(user_task);
213 
214 	SPDK_DEBUGLOG(vhost_blk, "Finished task (%p) req_idx=%d status: %" PRIu8 "\n",
215 		      user_task, user_task->req_idx, status);
216 	blk_task_finish(user_task);
217 }
218 
219 static void
220 blk_request_finish(uint8_t status, struct spdk_vhost_blk_task *task)
221 {
222 
223 	if (task->status) {
224 		*task->status = status;
225 	}
226 
227 	task->cb(status, task, task->cb_arg);
228 }
229 
230 /*
231  * Process the task's descriptor chain and set up the data-related fields.
232  * Return
233  *   the total size of the supplied buffers (via *length)
234  *
235  *   FIXME: Make this function return rd_cnt and wr_cnt
236  */
237 static int
238 blk_iovs_split_queue_setup(struct spdk_vhost_blk_session *bvsession,
239 			   struct spdk_vhost_virtqueue *vq,
240 			   uint16_t req_idx, struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
241 {
242 	struct spdk_vhost_session *vsession = &bvsession->vsession;
243 	struct spdk_vhost_dev *vdev = vsession->vdev;
244 	struct vring_desc *desc, *desc_table;
245 	uint16_t out_cnt = 0, cnt = 0;
246 	uint32_t desc_table_size, len = 0;
247 	uint32_t desc_handled_cnt;
248 	int rc;
249 
250 	rc = vhost_vq_get_desc(vsession, vq, req_idx, &desc, &desc_table, &desc_table_size);
251 	if (rc != 0) {
252 		SPDK_ERRLOG("%s: invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
253 		return -1;
254 	}
255 
256 	desc_handled_cnt = 0;
257 	while (1) {
258 		/*
259 		 * Maximum cnt reached?
260 		 * This should not happen if the request is well-formed; otherwise it is a bug.
261 		 */
262 		if (spdk_unlikely(cnt == *iovs_cnt)) {
263 			SPDK_DEBUGLOG(vhost_blk, "%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
264 				      vsession->name, req_idx);
265 			return -1;
266 		}
267 
268 		if (spdk_unlikely(vhost_vring_desc_to_iov(vsession, iovs, &cnt, desc))) {
269 			SPDK_DEBUGLOG(vhost_blk, "%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
270 				      vsession->name, req_idx, cnt);
271 			return -1;
272 		}
273 
274 		len += desc->len;
275 
276 		out_cnt += vhost_vring_desc_is_wr(desc);
277 
278 		rc = vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
279 		if (rc != 0) {
280 			SPDK_ERRLOG("%s: descriptor chain at index %"PRIu16" terminated unexpectedly.\n",
281 				    vsession->name, req_idx);
282 			return -1;
283 		} else if (desc == NULL) {
284 			break;
285 		}
286 
287 		desc_handled_cnt++;
288 		if (spdk_unlikely(desc_handled_cnt > desc_table_size)) {
289 			/* Break out of a descriptor-chain cycle and report an error. */
290 			SPDK_ERRLOG("%s: found a cycle in the descriptor chain: desc_table_size = %d, desc_handled_cnt = %d.\n",
291 				    vsession->name, desc_table_size, desc_handled_cnt);
292 			return -1;
293 		}
294 	}
295 
296 	/*
297 	 * There must be at least two descriptors.
298 	 * The first contains the request, so it must be readable.
299 	 * The last descriptor contains the buffer for the response, so it must be writable.
300 	 */
301 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
302 		return -1;
303 	}
304 
305 	*length = len;
306 	*iovs_cnt = cnt;
307 	return 0;
308 }
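/*
 * Illustrative note (not from the original source): a well-formed virtio-blk
 * request chain, as validated above, has the shape
 *
 *   iovs[0]        -> struct virtio_blk_outhdr (16 bytes, device-readable)
 *   iovs[1..n-2]   -> payload buffers (readable for writes, writable for reads)
 *   iovs[n-1]      -> 1-byte status (device-writable)
 *
 * which is why chains with fewer than two descriptors, or with no writable
 * descriptor at all, are rejected here and in the packed/inflight variants.
 */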
309 
310 static int
311 blk_iovs_packed_desc_setup(struct spdk_vhost_session *vsession,
312 			   struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
313 			   struct vring_packed_desc *desc_table, uint16_t desc_table_size,
314 			   struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
315 {
316 	struct vring_packed_desc *desc;
317 	uint16_t cnt = 0, out_cnt = 0;
318 	uint32_t len = 0;
319 
320 	if (desc_table == NULL) {
321 		desc = &vq->vring.desc_packed[req_idx];
322 	} else {
323 		req_idx = 0;
324 		desc = desc_table;
325 	}
326 
327 	while (1) {
328 		/*
329 		 * Maximum cnt reached?
330 		 * This should not happen if the request is well-formed; otherwise it is a bug.
331 		 */
332 		if (spdk_unlikely(cnt == *iovs_cnt)) {
333 			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
334 				    vsession->name, req_idx);
335 			return -EINVAL;
336 		}
337 
338 		if (spdk_unlikely(vhost_vring_packed_desc_to_iov(vsession, iovs, &cnt, desc))) {
339 			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
340 				    vsession->name, req_idx, cnt);
341 			return -EINVAL;
342 		}
343 
344 		len += desc->len;
345 		out_cnt += vhost_vring_packed_desc_is_wr(desc);
346 
347 		/* A NULL desc means we have reached the last desc of this request */
348 		vhost_vring_packed_desc_get_next(&desc, &req_idx, vq, desc_table, desc_table_size);
349 		if (desc == NULL) {
350 			break;
351 		}
352 	}
353 
354 	/*
355 	 * There must be at least two descriptors.
356 	 * The first contains the request, so it must be readable.
357 	 * The last descriptor contains the buffer for the response, so it must be writable.
358 	 */
359 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
360 		return -EINVAL;
361 	}
362 
363 	*length = len;
364 	*iovs_cnt = cnt;
365 
366 	return 0;
367 }
368 
369 static int
370 blk_iovs_packed_queue_setup(struct spdk_vhost_blk_session *bvsession,
371 			    struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
372 			    struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
373 {
374 	struct spdk_vhost_session *vsession = &bvsession->vsession;
375 	struct spdk_vhost_dev *vdev = vsession->vdev;
376 	struct vring_packed_desc *desc = NULL, *desc_table;
377 	uint32_t desc_table_size;
378 	int rc;
379 
380 	rc = vhost_vq_get_desc_packed(vsession, vq, req_idx, &desc,
381 				      &desc_table, &desc_table_size);
382 	if (spdk_unlikely(rc != 0)) {
383 		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
384 		return rc;
385 	}
386 
387 	return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
388 					  iovs, iovs_cnt, length);
389 }
390 
391 static int
392 blk_iovs_inflight_queue_setup(struct spdk_vhost_blk_session *bvsession,
393 			      struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
394 			      struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
395 {
396 	struct spdk_vhost_session *vsession = &bvsession->vsession;
397 	struct spdk_vhost_dev *vdev = vsession->vdev;
398 	spdk_vhost_inflight_desc *inflight_desc;
399 	struct vring_packed_desc *desc_table;
400 	uint16_t out_cnt = 0, cnt = 0;
401 	uint32_t desc_table_size, len = 0;
402 	int rc = 0;
403 
404 	rc = vhost_inflight_queue_get_desc(vsession, vq->vring_inflight.inflight_packed->desc,
405 					   req_idx, &inflight_desc, &desc_table, &desc_table_size);
406 	if (spdk_unlikely(rc != 0)) {
407 		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
408 		return rc;
409 	}
410 
411 	if (desc_table != NULL) {
412 		return blk_iovs_packed_desc_setup(vsession, vq, req_idx, desc_table, desc_table_size,
413 						  iovs, iovs_cnt, length);
414 	}
415 
416 	while (1) {
417 		/*
418 		 * Maximum cnt reached?
419 		 * This should not happen if the request is well-formed; otherwise it is a bug.
420 		 */
421 		if (spdk_unlikely(cnt == *iovs_cnt)) {
422 			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
423 				    vsession->name, req_idx);
424 			return -EINVAL;
425 		}
426 
427 		if (spdk_unlikely(vhost_vring_inflight_desc_to_iov(vsession, iovs, &cnt, inflight_desc))) {
428 			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
429 				    vsession->name, req_idx, cnt);
430 			return -EINVAL;
431 		}
432 
433 		len += inflight_desc->len;
434 		out_cnt += vhost_vring_inflight_desc_is_wr(inflight_desc);
435 
436 		/* If F_NEXT is not set, this is the last desc */
437 		if ((inflight_desc->flags & VRING_DESC_F_NEXT) == 0) {
438 			break;
439 		}
440 
441 		inflight_desc = &vq->vring_inflight.inflight_packed->desc[inflight_desc->next];
442 	}
443 
444 	/*
445 	 * There must be at least two descriptors.
446 	 * The first contains the request, so it must be readable.
447 	 * The last descriptor contains the buffer for the response, so it must be writable.
448 	 */
449 	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
450 		return -EINVAL;
451 	}
452 
453 	*length = len;
454 	*iovs_cnt = cnt;
455 
456 	return 0;
457 }
458 
459 static void
460 blk_request_complete_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
461 {
462 	struct spdk_vhost_blk_task *task = cb_arg;
463 
464 	spdk_bdev_free_io(bdev_io);
465 	blk_request_finish(success ? VIRTIO_BLK_S_OK : VIRTIO_BLK_S_IOERR, task);
466 }
467 
468 static void
469 blk_request_resubmit(void *arg)
470 {
471 	struct spdk_vhost_blk_task *task = arg;
472 	int rc = 0;
473 
474 	rc = virtio_blk_process_request(task->bdev_io_wait_vdev, task->bdev_io_wait_ch, task,
475 					task->cb, task->cb_arg);
476 	if (rc == 0) {
477 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p resubmitted ======\n", task);
478 	} else {
479 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p failed ======\n", task);
480 	}
481 }
482 
483 static inline void
484 blk_request_queue_io(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
485 		     struct spdk_vhost_blk_task *task)
486 {
487 	int rc;
488 	struct spdk_bdev *bdev = vhost_blk_get_bdev(vdev);
489 
490 	task->bdev_io_wait.bdev = bdev;
491 	task->bdev_io_wait.cb_fn = blk_request_resubmit;
492 	task->bdev_io_wait.cb_arg = task;
493 	task->bdev_io_wait_ch = ch;
494 	task->bdev_io_wait_vdev = vdev;
495 
496 	rc = spdk_bdev_queue_io_wait(bdev, ch, &task->bdev_io_wait);
497 	if (rc != 0) {
498 		blk_request_finish(VIRTIO_BLK_S_IOERR, task);
499 	}
500 }
501 
502 int
503 virtio_blk_process_request(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
504 			   struct spdk_vhost_blk_task *task, virtio_blk_request_cb cb, void *cb_arg)
505 {
506 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
507 	struct virtio_blk_outhdr req;
508 	struct virtio_blk_discard_write_zeroes *desc;
509 	struct iovec *iov;
510 	uint32_t type;
511 	uint64_t flush_bytes;
512 	uint32_t payload_len;
513 	uint16_t iovcnt;
514 	int rc;
515 
516 	assert(bvdev != NULL);
517 
518 	task->cb = cb;
519 	task->cb_arg = cb_arg;
520 
521 	iov = &task->iovs[0];
522 	if (spdk_unlikely(iov->iov_len != sizeof(req))) {
523 		SPDK_DEBUGLOG(vhost_blk,
524 			      "First descriptor size is %zu but expected %zu (task = %p).\n",
525 			      iov->iov_len, sizeof(req), task);
526 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
527 		return -1;
528 	}
529 
530 	/* Some SeaBIOS versions don't align the virtio_blk_outhdr on an 8-byte boundary, which
531 	 * triggers ubsan errors.  So copy this small 16-byte structure to the stack to work around
532 	 * this problem.
533 	 */
534 	memcpy(&req, iov->iov_base, sizeof(req));
535 
536 	iov = &task->iovs[task->iovcnt - 1];
537 	if (spdk_unlikely(iov->iov_len != 1)) {
538 		SPDK_DEBUGLOG(vhost_blk,
539 			      "Last descriptor size is %zu but expected %d (task = %p).\n",
540 			      iov->iov_len, 1, task);
541 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
542 		return -1;
543 	}
544 
545 	payload_len = task->payload_size;
546 	task->status = iov->iov_base;
547 	payload_len -= sizeof(req) + sizeof(*task->status);
548 	iovcnt = task->iovcnt - 2;
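	/*
	 * Worked example (illustrative): a single-sector read arrives as three
	 * descriptors, so task->iovcnt == 3 and task->payload_size == 529
	 * (16-byte virtio_blk_outhdr + 512-byte data buffer + 1-byte status).
	 * payload_len then becomes 529 - 16 - 1 = 512 and iovcnt == 1, i.e.
	 * only the data buffers in task->iovs[1..] are handed to the bdev layer.
	 */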
549 
550 	type = req.type;
551 #ifdef VIRTIO_BLK_T_BARRIER
552 	/* Don't care about barrier for now (as QEMU's virtio-blk does). */
553 	type &= ~VIRTIO_BLK_T_BARRIER;
554 #endif
555 
556 	switch (type) {
557 	case VIRTIO_BLK_T_IN:
558 	case VIRTIO_BLK_T_OUT:
559 		if (spdk_unlikely(payload_len == 0 || (payload_len & (512 - 1)) != 0)) {
560 			SPDK_ERRLOG("%s - passed IO buffer is not a multiple of 512 bytes (task = %p).\n",
561 				    type ? "WRITE" : "READ", task);
562 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
563 			return -1;
564 		}
565 
566 		if (type == VIRTIO_BLK_T_IN) {
567 			task->used_len = payload_len + sizeof(*task->status);
568 			rc = spdk_bdev_readv(bvdev->bdev_desc, ch,
569 					     &task->iovs[1], iovcnt, req.sector * 512,
570 					     payload_len, blk_request_complete_cb, task);
571 		} else if (!bvdev->readonly) {
572 			task->used_len = sizeof(*task->status);
573 			rc = spdk_bdev_writev(bvdev->bdev_desc, ch,
574 					      &task->iovs[1], iovcnt, req.sector * 512,
575 					      payload_len, blk_request_complete_cb, task);
576 		} else {
577 			SPDK_DEBUGLOG(vhost_blk, "Device is in read-only mode!\n");
578 			rc = -1;
579 		}
580 
581 		if (rc) {
582 			if (rc == -ENOMEM) {
583 				SPDK_DEBUGLOG(vhost_blk, "No memory, starting to queue I/O.\n");
584 				blk_request_queue_io(vdev, ch, task);
585 			} else {
586 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
587 				return -1;
588 			}
589 		}
590 		break;
591 	case VIRTIO_BLK_T_DISCARD:
592 		desc = task->iovs[1].iov_base;
593 		if (payload_len != sizeof(*desc)) {
594 			SPDK_NOTICELOG("Invalid discard payload size: %u\n", payload_len);
595 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
596 			return -1;
597 		}
598 
599 		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
600 			SPDK_ERRLOG("UNMAP flag is only used for WRITE ZEROES command\n");
601 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
602 			return -1;
603 		}
604 
605 		rc = spdk_bdev_unmap(bvdev->bdev_desc, ch,
606 				     desc->sector * 512, desc->num_sectors * 512,
607 				     blk_request_complete_cb, task);
608 		if (rc) {
609 			if (rc == -ENOMEM) {
610 				SPDK_DEBUGLOG(vhost_blk, "No memory, starting to queue I/O.\n");
611 				blk_request_queue_io(vdev, ch, task);
612 			} else {
613 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
614 				return -1;
615 			}
616 		}
617 		break;
618 	case VIRTIO_BLK_T_WRITE_ZEROES:
619 		desc = task->iovs[1].iov_base;
620 		if (payload_len != sizeof(*desc)) {
621 			SPDK_NOTICELOG("Invalid write zeroes payload size: %u\n", payload_len);
622 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
623 			return -1;
624 		}
625 
626 		/* SPDK doesn't support unmapping this range. The kernel enables this flag by default
627 		 * without checking whether the unmap feature was negotiated, and the flag isn't
628 		 * mandatory, so just print a warning.
629 		 */
630 		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
631 			SPDK_WARNLOG("Ignore the unmap flag for WRITE ZEROES from %"PRIx64", len %"PRIx64"\n",
632 				     (uint64_t)desc->sector * 512, (uint64_t)desc->num_sectors * 512);
633 		}
634 
635 		rc = spdk_bdev_write_zeroes(bvdev->bdev_desc, ch,
636 					    desc->sector * 512, desc->num_sectors * 512,
637 					    blk_request_complete_cb, task);
638 		if (rc) {
639 			if (rc == -ENOMEM) {
640 				SPDK_DEBUGLOG(vhost_blk, "No memory, starting to queue I/O.\n");
641 				blk_request_queue_io(vdev, ch, task);
642 			} else {
643 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
644 				return -1;
645 			}
646 		}
647 		break;
648 	case VIRTIO_BLK_T_FLUSH:
649 		flush_bytes = spdk_bdev_get_num_blocks(bvdev->bdev) * spdk_bdev_get_block_size(bvdev->bdev);
650 		if (req.sector != 0) {
651 			SPDK_NOTICELOG("sector must be zero for flush command\n");
652 			blk_request_finish(VIRTIO_BLK_S_IOERR, task);
653 			return -1;
654 		}
655 		rc = spdk_bdev_flush(bvdev->bdev_desc, ch,
656 				     0, flush_bytes,
657 				     blk_request_complete_cb, task);
658 		if (rc) {
659 			if (rc == -ENOMEM) {
660 				SPDK_DEBUGLOG(vhost_blk, "No memory, starting to queue I/O.\n");
661 				blk_request_queue_io(vdev, ch, task);
662 			} else {
663 				blk_request_finish(VIRTIO_BLK_S_IOERR, task);
664 				return -1;
665 			}
666 		}
667 		break;
668 	case VIRTIO_BLK_T_GET_ID:
669 		if (!iovcnt || !payload_len) {
670 			blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
671 			return -1;
672 		}
673 		task->used_len = spdk_min((size_t)VIRTIO_BLK_ID_BYTES, task->iovs[1].iov_len);
674 		spdk_strcpy_pad(task->iovs[1].iov_base, spdk_bdev_get_name(bvdev->bdev),
675 				task->used_len, ' ');
676 		blk_request_finish(VIRTIO_BLK_S_OK, task);
677 		break;
678 	default:
679 		SPDK_DEBUGLOG(vhost_blk, "Not supported request type '%"PRIu32"'.\n", type);
680 		blk_request_finish(VIRTIO_BLK_S_UNSUPP, task);
681 		return -1;
682 	}
683 
684 	return 0;
685 }
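/*
 * Minimal usage sketch (illustrative, not part of this file): a virtio-blk
 * transport submits a prepared task through virtio_blk_process_request() and
 * is completed through the callback it registers. The my_transport_* names
 * are hypothetical.
 *
 *	static void
 *	my_transport_complete_cb(uint8_t status, struct spdk_vhost_blk_task *task, void *cb_arg)
 *	{
 *		// Return buffers to the guest and recycle the task.
 *	}
 *
 *	static int
 *	my_transport_submit(struct spdk_vhost_dev *vdev, struct spdk_io_channel *ch,
 *			    struct spdk_vhost_blk_task *task)
 *	{
 *		// task->iovs, task->iovcnt and task->payload_size must already
 *		// describe the request, as done by the blk_iovs_*_setup helpers.
 *		return virtio_blk_process_request(vdev, ch, task,
 *						  my_transport_complete_cb, NULL);
 *	}
 */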
686 
687 static void
688 process_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
689 {
690 	struct spdk_vhost_user_blk_task *task;
691 	struct spdk_vhost_blk_task *blk_task;
692 	int rc;
693 
694 	assert(vq->packed.packed_ring == false);
695 
696 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[req_idx];
697 	blk_task = &task->blk_task;
698 	if (spdk_unlikely(task->used)) {
699 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
700 			    task->bvsession->vsession.name, req_idx);
701 		blk_task->used_len = 0;
702 		blk_task_enqueue(task);
703 		return;
704 	}
705 
706 	blk_task_inc_task_cnt(task);
707 
708 	blk_task_init(task);
709 
710 	rc = blk_iovs_split_queue_setup(task->bvsession, vq, task->req_idx,
711 					blk_task->iovs, &blk_task->iovcnt, &blk_task->payload_size);
712 
713 	if (rc) {
714 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
715 		/* Only READ and WRITE are supported for now. */
716 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
717 		return;
718 	}
719 
720 	if (vhost_user_process_blk_request(task) == 0) {
721 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
722 			      req_idx);
723 	} else {
724 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, req_idx);
725 	}
726 }
727 
728 static void
729 process_packed_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
730 {
731 	struct spdk_vhost_user_blk_task *task;
732 	struct spdk_vhost_blk_task *blk_task;
733 	uint16_t task_idx = req_idx, num_descs;
734 	int rc;
735 
736 	assert(vq->packed.packed_ring);
737 
738 	/* The packed ring uses the buffer_id as the task_idx to look up the task struct.
739 	 * The kernel driver uses vq->free_head to set the buffer_id, so the value
740 	 * must be in the range 0 ~ vring.size, and the free_head value is unique
741 	 * among the outstanding requests.
742 	 * We can't use the req_idx as the task_idx because a desc can be reused in
743 	 * the next phase even when it wasn't completed in the previous phase. For example:
744 	 * in phase 0, last_used_idx was 2 and desc0 was not completed. Then, after moving to
745 	 * phase 1, last_avail_idx is updated to 1. In this case, req_idx cannot be used
746 	 * as the task_idx because we would find task[0]->used still true in phase 1.
747 	 * The split queue is quite different: a desc is inserted into the free list when
748 	 * the device completes the request, and the driver gets descs from the free list,
749 	 * which ensures that req_idx is unique among the outstanding requests.
750 	 */
751 	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
752 
753 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
754 	blk_task = &task->blk_task;
755 	if (spdk_unlikely(task->used)) {
756 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
757 			    task->bvsession->vsession.name, task_idx);
758 		blk_task->used_len = 0;
759 		blk_task_enqueue(task);
760 		return;
761 	}
762 
763 	task->req_idx = req_idx;
764 	task->num_descs = num_descs;
765 	task->buffer_id = task_idx;
766 
767 	rte_vhost_set_inflight_desc_packed(task->bvsession->vsession.vid, vq->vring_idx,
768 					   req_idx, (req_idx + num_descs - 1) % vq->vring.size,
769 					   &task->inflight_head);
770 
771 	blk_task_inc_task_cnt(task);
772 
773 	blk_task_init(task);
774 
775 	rc = blk_iovs_packed_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
776 					 &blk_task->iovcnt,
777 					 &blk_task->payload_size);
778 	if (rc) {
779 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
780 		/* Only READ and WRITE are supported for now. */
781 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
782 		return;
783 	}
784 
785 	if (vhost_user_process_blk_request(task) == 0) {
786 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
787 			      task_idx);
788 	} else {
789 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
790 	}
791 }
792 
793 static void
794 process_packed_inflight_blk_task(struct spdk_vhost_virtqueue *vq,
795 				 uint16_t req_idx)
796 {
797 	spdk_vhost_inflight_desc *desc_array = vq->vring_inflight.inflight_packed->desc;
798 	spdk_vhost_inflight_desc *desc = &desc_array[req_idx];
799 	struct spdk_vhost_user_blk_task *task;
800 	struct spdk_vhost_blk_task *blk_task;
801 	uint16_t task_idx, num_descs;
802 	int rc;
803 
804 	task_idx = desc_array[desc->last].id;
805 	num_descs = desc->num;
806 	/* In packed ring reconnection, we use the last_used_idx as the
807 	 * initial value. So when we process the inflight descs we still
808 	 * need to update the available ring index.
809 	 */
810 	vq->last_avail_idx += num_descs;
811 	if (vq->last_avail_idx >= vq->vring.size) {
812 		vq->last_avail_idx -= vq->vring.size;
813 		vq->packed.avail_phase = !vq->packed.avail_phase;
814 	}
815 
816 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
817 	blk_task = &task->blk_task;
818 	if (spdk_unlikely(task->used)) {
819 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
820 			    task->bvsession->vsession.name, task_idx);
821 		blk_task->used_len = 0;
822 		blk_task_enqueue(task);
823 		return;
824 	}
825 
826 	task->req_idx = req_idx;
827 	task->num_descs = num_descs;
828 	task->buffer_id = task_idx;
829 	/* Used for cleaning up inflight entries */
830 	task->inflight_head = req_idx;
831 
832 	blk_task_inc_task_cnt(task);
833 
834 	blk_task_init(task);
835 
836 	rc = blk_iovs_inflight_queue_setup(task->bvsession, vq, task->req_idx, blk_task->iovs,
837 					   &blk_task->iovcnt,
838 					   &blk_task->payload_size);
839 	if (rc) {
840 		SPDK_DEBUGLOG(vhost_blk, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
841 		/* Only READ and WRITE are supported for now. */
842 		vhost_user_blk_request_finish(VIRTIO_BLK_S_UNSUPP, blk_task, NULL);
843 		return;
844 	}
845 
846 	if (vhost_user_process_blk_request(task) == 0) {
847 		SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d submitted ======\n", task,
848 			      task_idx);
849 	} else {
850 		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
851 	}
852 }
853 
854 static int
855 submit_inflight_desc(struct spdk_vhost_blk_session *bvsession,
856 		     struct spdk_vhost_virtqueue *vq)
857 {
858 	struct spdk_vhost_session *vsession;
859 	spdk_vhost_resubmit_info *resubmit;
860 	spdk_vhost_resubmit_desc *resubmit_list;
861 	uint16_t req_idx;
862 	int i, resubmit_cnt;
863 
864 	resubmit = vq->vring_inflight.resubmit_inflight;
865 	if (spdk_likely(resubmit == NULL || resubmit->resubmit_list == NULL ||
866 			resubmit->resubmit_num == 0)) {
867 		return 0;
868 	}
869 
870 	resubmit_list = resubmit->resubmit_list;
871 	vsession = &bvsession->vsession;
872 
873 	for (i = resubmit->resubmit_num - 1; i >= 0; --i) {
874 		req_idx = resubmit_list[i].index;
875 		SPDK_DEBUGLOG(vhost_blk, "====== Start processing resubmit request idx %"PRIu16" ======\n",
876 			      req_idx);
877 
878 		if (spdk_unlikely(req_idx >= vq->vring.size)) {
879 			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
880 				    vsession->name, req_idx, vq->vring.size);
881 			vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
882 			continue;
883 		}
884 
885 		if (vq->packed.packed_ring) {
886 			process_packed_inflight_blk_task(vq, req_idx);
887 		} else {
888 			process_blk_task(vq, req_idx);
889 		}
890 	}
891 	resubmit_cnt = resubmit->resubmit_num;
892 	resubmit->resubmit_num = 0;
893 	return resubmit_cnt;
894 }
895 
896 static int
897 process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
898 {
899 	struct spdk_vhost_session *vsession = &bvsession->vsession;
900 	uint16_t reqs[SPDK_VHOST_VQ_MAX_SUBMISSIONS];
901 	uint16_t reqs_cnt, i;
902 	int resubmit_cnt = 0;
903 
904 	resubmit_cnt = submit_inflight_desc(bvsession, vq);
905 
906 	reqs_cnt = vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
907 	if (!reqs_cnt) {
908 		return resubmit_cnt;
909 	}
910 
911 	for (i = 0; i < reqs_cnt; i++) {
912 		SPDK_DEBUGLOG(vhost_blk, "====== Starting to process request idx %"PRIu16" ======\n",
913 			      reqs[i]);
914 
915 		if (spdk_unlikely(reqs[i] >= vq->vring.size)) {
916 			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
917 				    vsession->name, reqs[i], vq->vring.size);
918 			vhost_vq_used_ring_enqueue(vsession, vq, reqs[i], 0);
919 			continue;
920 		}
921 
922 		rte_vhost_set_inflight_desc_split(vsession->vid, vq->vring_idx, reqs[i]);
923 
924 		process_blk_task(vq, reqs[i]);
925 	}
926 
927 	return reqs_cnt;
928 }
929 
930 static int
931 process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
932 {
933 	uint16_t i = 0;
934 	uint16_t count = 0;
935 	int resubmit_cnt = 0;
936 
937 	resubmit_cnt = submit_inflight_desc(bvsession, vq);
938 
939 	while (i++ < SPDK_VHOST_VQ_MAX_SUBMISSIONS &&
940 	       vhost_vq_packed_ring_is_avail(vq)) {
941 		SPDK_DEBUGLOG(vhost_blk, "====== Starting to process request idx %"PRIu16" ======\n",
942 			      vq->last_avail_idx);
943 		count++;
944 		process_packed_blk_task(vq, vq->last_avail_idx);
945 	}
946 
947 	return count > 0 ? count : resubmit_cnt;
948 }
949 
950 static int
951 _vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
952 {
953 	struct spdk_vhost_session *vsession = vq->vsession;
954 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
955 	bool packed_ring;
956 	int rc = 0;
957 
958 	packed_ring = vq->packed.packed_ring;
959 	if (packed_ring) {
960 		rc = process_packed_vq(bvsession, vq);
961 	} else {
962 		rc = process_vq(bvsession, vq);
963 	}
964 
965 	vhost_session_vq_used_signal(vq);
966 
967 	return rc;
968 
969 }
970 
971 static int
972 vdev_vq_worker(void *arg)
973 {
974 	struct spdk_vhost_virtqueue *vq = arg;
975 
976 	return _vdev_vq_worker(vq);
977 }
978 
979 static int
980 vdev_worker(void *arg)
981 {
982 	struct vhost_user_poll_group *pg = arg;
983 	struct vhost_user_pg_vq_info *vq_info;
984 	struct spdk_vhost_virtqueue *vq;
985 	int rc = 0;
986 
987 	TAILQ_FOREACH(vq_info, &pg->vqs, link) {
988 		vq = vq_info->vq;
989 		assert(vq->poll_group == pg);
990 		rc = _vdev_vq_worker(vq);
991 	}
992 
993 	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
994 }
995 
996 static void
997 no_bdev_process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
998 {
999 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1000 	struct iovec iovs[SPDK_VHOST_IOVS_MAX];
1001 	uint32_t length;
1002 	uint16_t iovcnt, req_idx;
1003 
1004 	if (vhost_vq_avail_ring_get(vq, &req_idx, 1) != 1) {
1005 		return;
1006 	}
1007 
1008 	iovcnt = SPDK_COUNTOF(iovs);
1009 	if (blk_iovs_split_queue_setup(bvsession, vq, req_idx, iovs, &iovcnt, &length) == 0) {
1010 		*(volatile uint8_t *)iovs[iovcnt - 1].iov_base = VIRTIO_BLK_S_IOERR;
1011 		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
1012 	}
1013 
1014 	vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
1015 }
1016 
1017 static void
1018 no_bdev_process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
1019 {
1020 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1021 	struct spdk_vhost_user_blk_task *task;
1022 	struct spdk_vhost_blk_task *blk_task;
1023 	uint32_t length;
1024 	uint16_t req_idx = vq->last_avail_idx;
1025 	uint16_t task_idx, num_descs;
1026 
1027 	if (!vhost_vq_packed_ring_is_avail(vq)) {
1028 		return;
1029 	}
1030 
1031 	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
1032 	task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[task_idx];
1033 	blk_task = &task->blk_task;
1034 	if (spdk_unlikely(task->used)) {
1035 		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
1036 			    vsession->name, req_idx);
1037 		vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
1038 					     task->buffer_id, blk_task->used_len,
1039 					     task->inflight_head);
1040 		return;
1041 	}
1042 
1043 	task->req_idx = req_idx;
1044 	task->num_descs = num_descs;
1045 	task->buffer_id = task_idx;
1046 	blk_task_init(task);
1047 
1048 	if (blk_iovs_packed_queue_setup(bvsession, vq, task->req_idx, blk_task->iovs, &blk_task->iovcnt,
1049 					&length)) {
1050 		*(volatile uint8_t *)(blk_task->iovs[blk_task->iovcnt - 1].iov_base) = VIRTIO_BLK_S_IOERR;
1051 		SPDK_DEBUGLOG(vhost_blk_data, "Aborting request %" PRIu16"\n", req_idx);
1052 	}
1053 
1054 	task->used = false;
1055 	vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
1056 				     task->buffer_id, blk_task->used_len,
1057 				     task->inflight_head);
1058 }
1059 
1060 static int
1061 _no_bdev_vdev_vq_worker(struct spdk_vhost_virtqueue *vq)
1062 {
1063 	struct spdk_vhost_session *vsession = vq->vsession;
1064 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1065 	struct vhost_user_poll_group *pg = (struct vhost_user_poll_group *)vq->poll_group;
1066 
1067 	bool packed_ring;
1068 
1069 	packed_ring = vq->packed.packed_ring;
1070 	if (packed_ring) {
1071 		no_bdev_process_packed_vq(bvsession, vq);
1072 	} else {
1073 		no_bdev_process_vq(bvsession, vq);
1074 	}
1075 
1076 	vhost_session_vq_used_signal(vq);
1077 
1078 	if (pg->task_cnt == 0 && pg->io_channel) {
1079 		vhost_blk_put_io_channel(pg->io_channel);
1080 		pg->io_channel = NULL;
1081 	}
1082 
1083 	return SPDK_POLLER_BUSY;
1084 }
1085 
1086 static int
1087 no_bdev_vdev_vq_worker(void *arg)
1088 {
1089 	struct spdk_vhost_virtqueue *vq = arg;
1090 
1091 	return _no_bdev_vdev_vq_worker(vq);
1092 }
1093 
1094 static int
1095 no_bdev_vdev_worker(void *arg)
1096 {
1097 	struct vhost_user_poll_group *pg = arg;
1098 	struct vhost_user_pg_vq_info *vq_info;
1099 	int rc = 0;
1100 
1101 	TAILQ_FOREACH(vq_info, &pg->vqs, link) {
1102 		rc = _no_bdev_vdev_vq_worker(vq_info->vq);
1103 	}
1104 
1105 	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
1106 }
1107 
1108 static void
1109 vhost_blk_pg_unregister_interrupts(struct vhost_user_poll_group *pg)
1110 {
1111 	struct vhost_user_pg_vq_info *vq_info;
1112 	struct spdk_vhost_virtqueue *vq;
1113 
1114 	TAILQ_FOREACH(vq_info, &pg->vqs, link) {
1115 		vq = vq_info->vq;
1116 		if (vq->intr == NULL) {
1117 			break;
1118 		}
1119 
1120 		SPDK_DEBUGLOG(vhost_blk, "unregistering vq[%d], kickfd %d\n",
1121 			      vq->vring_idx, vq->vring.kickfd);
1122 		spdk_interrupt_unregister(&vq->intr);
1123 	}
1124 }
1125 
1126 static void
1127 vhost_blk_vq_register_interrupt(struct spdk_vhost_virtqueue *vq)
1128 {
1129 	struct spdk_vhost_session *vsession = vq->vsession;
1130 	struct spdk_vhost_blk_dev *bvdev =  to_blk_dev(vsession->vdev);
1131 
1132 	assert(bvdev != NULL);
1133 
1134 	if (bvdev->bdev) {
1135 		vq->intr = spdk_interrupt_register(vq->vring.kickfd, vdev_vq_worker, vq, "vdev_vq_worker");
1136 	} else {
1137 		vq->intr = spdk_interrupt_register(vq->vring.kickfd, no_bdev_vdev_vq_worker, vq,
1138 						   "no_bdev_vdev_vq_worker");
1139 	}
1140 
1141 	if (vq->intr == NULL) {
1142 		SPDK_ERRLOG("Failed to register req notifier handler.\n");
1143 		assert(false);
1144 	}
1145 }
1146 
1147 static void
1148 add_vq_to_poll_group(void *arg)
1149 {
1150 	struct vhost_user_pg_vq_info *vq_info = arg;
1151 	struct vhost_user_poll_group *pg = vq_info->pg;
1152 
1153 	SPDK_DEBUGLOG(vhost_blk, "%s: vring %u is added to pg %p, thread %s, lcore %u\n",
1154 		      pg->vsession->name,
1155 		      vq_info->vq->vring_idx, pg, spdk_thread_get_name(spdk_get_thread()), spdk_env_get_current_core());
1156 
1157 	TAILQ_INSERT_TAIL(&pg->vqs, vq_info, link);
1158 
1159 	if (spdk_interrupt_mode_is_enabled()) {
1160 		vhost_blk_vq_register_interrupt(vq_info->vq);
1161 	}
1162 }
1163 
1164 static struct vhost_user_poll_group *
1165 get_optimal_poll_group(struct spdk_vhost_blk_session *bvsession)
1166 {
1167 	struct vhost_user_poll_group *pg;
1168 	struct spdk_vhost_blk_dev *bvdev;
1169 
1170 	if (bvsession->bvdev == NULL) {
1171 		return NULL;
1172 	}
1173 
1174 	/* round robin */
1175 	bvdev = bvsession->bvdev;
1176 	if (bvdev->next_pg_index >= bvsession->num_poll_groups) {
1177 		bvdev->next_pg_index = 0;
1178 	}
1179 
1180 	pg = &bvsession->poll_groups[bvdev->next_pg_index];
1181 	bvdev->next_pg_index++;
1182 
1183 	return pg;
1184 }
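/*
 * Example (illustrative): with num_poll_groups == 4, successive calls return
 * poll groups 0, 1, 2, 3, 0, 1, ... so virtqueues are spread evenly across
 * the session's poll group threads.
 */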
1185 
1186 static int
1187 vhost_blk_vq_enable(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *vq)
1188 {
1189 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1190 	struct spdk_vhost_dev *vdev;
1191 	struct spdk_vhost_user_dev *user_dev;
1192 	struct vhost_user_pg_vq_info *vq_info;
1193 
1194 	vdev = vsession->vdev;
1195 	user_dev = to_user_dev(vdev);
1196 
1197 	SPDK_DEBUGLOG(vhost_blk, "%s: enable vq %u\n", vsession->name, vq->vring_idx);
1198 
1199 	pthread_mutex_lock(&user_dev->lock);
1200 	if (vsession->started || vsession->starting) {
1201 		pthread_mutex_unlock(&user_dev->lock);
1202 		vq_info = calloc(1, sizeof(*vq_info));
1203 		if (!vq_info) {
1204 			SPDK_ERRLOG("Failed to allocate vq_info\n");
1205 			return -ENOMEM;
1206 		}
1207 		vq_info->vq = vq;
1208 		vq_info->pg = get_optimal_poll_group(bvsession);
1209 		if (vq_info->pg == NULL) {
1210 			free(vq_info);
1211 			return -EFAULT;
1212 		}
1213 		vq->poll_group = (void *)vq_info->pg;
1214 		spdk_thread_send_msg(vq_info->pg->thread, add_vq_to_poll_group, vq_info);
1215 		return 0;
1216 	}
1217 	pthread_mutex_unlock(&user_dev->lock);
1218 
1219 	return 0;
1220 }
1221 
1222 static int
1223 vhost_blk_pg_register_no_bdev_interrupts(struct vhost_user_poll_group *pg)
1224 {
1225 	struct vhost_user_pg_vq_info *vq_info;
1226 	struct spdk_vhost_virtqueue *vq;
1227 
1228 	TAILQ_FOREACH(vq_info, &pg->vqs, link) {
1229 		vq = vq_info->vq;
1230 		SPDK_DEBUGLOG(vhost_blk, "Registering vq[%d], kickfd %d\n",
1231 			      vq->vring_idx, vq->vring.kickfd);
1232 		vq->intr = spdk_interrupt_register(vq->vring.kickfd, no_bdev_vdev_vq_worker, vq,
1233 						   "no_bdev_vdev_vq_worker");
1234 		if (vq->intr == NULL) {
1235 			goto err;
1236 		}
1237 
1238 	}
1239 
1240 	return 0;
1241 
1242 err:
1243 	vhost_blk_pg_unregister_interrupts(pg);
1244 	return -1;
1245 }
1246 
1247 static void
1248 vhost_blk_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
1249 {
1250 	struct spdk_vhost_blk_session *bvsession = cb_arg;
1251 
1252 	vhost_user_session_set_interrupt_mode(&bvsession->vsession, interrupt_mode);
1253 }
1254 
1255 static void
1256 bdev_event_cpl_cb(struct spdk_vhost_dev *vdev, void *ctx)
1257 {
1258 	enum spdk_bdev_event_type type = (enum spdk_bdev_event_type)(uintptr_t)ctx;
1259 	struct spdk_vhost_blk_dev *bvdev;
1260 
1261 	if (type == SPDK_BDEV_EVENT_REMOVE) {
1262 		/* All sessions have been notified, time to close the bdev */
1263 		bvdev = to_blk_dev(vdev);
1264 		assert(bvdev != NULL);
1265 		spdk_bdev_close(bvdev->bdev_desc);
1266 		bvdev->bdev_desc = NULL;
1267 		bvdev->bdev = NULL;
1268 	}
1269 }
1270 
1271 static int
1272 vhost_session_bdev_resize_cb(struct spdk_vhost_dev *vdev,
1273 			     struct spdk_vhost_session *vsession,
1274 			     void *ctx)
1275 {
1276 	SPDK_NOTICELOG("bdev sending config-change msg to vid(%d)\n", vsession->vid);
1277 #if RTE_VERSION >= RTE_VERSION_NUM(23, 03, 0, 0)
1278 	rte_vhost_backend_config_change(vsession->vid, false);
1279 #else
1280 	rte_vhost_slave_config_change(vsession->vid, false);
1281 #endif
1282 
1283 	return 0;
1284 }
1285 
1286 static void
1287 vhost_user_blk_resize_cb(struct spdk_vhost_dev *vdev, bdev_event_cb_complete cb, void *cb_arg)
1288 {
1289 	vhost_user_dev_foreach_session(vdev, vhost_session_bdev_resize_cb,
1290 				       cb, cb_arg);
1291 }
1292 
1293 static void
1294 _vhost_user_session_bdev_remove_cb(void *arg)
1295 {
1296 	struct vhost_user_poll_group *pg = arg;
1297 	struct spdk_vhost_session *vsession = pg->vsession;
1298 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1299 	int rc;
1300 
1301 	if (pg->requestq_poller == NULL) {
1302 		return;
1303 	}
1304 
1305 	spdk_poller_unregister(&pg->requestq_poller);
1306 	if (spdk_interrupt_mode_is_enabled()) {
1307 		vhost_blk_pg_unregister_interrupts(pg);
1308 		rc = vhost_blk_pg_register_no_bdev_interrupts(pg);
1309 		if (rc) {
1310 			SPDK_ERRLOG("Interrupt register failed\n");
1311 			return;
1312 		}
1313 	}
1314 
1315 	pg->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, pg, 0);
1316 	spdk_poller_register_interrupt(pg->requestq_poller, vhost_blk_poller_set_interrupt_mode, bvsession);
1317 }
1318 
1319 static int
1320 vhost_user_session_bdev_remove_cb(struct spdk_vhost_dev *vdev,
1321 				  struct spdk_vhost_session *vsession,
1322 				  void *ctx)
1323 {
1324 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1325 	struct vhost_user_poll_group *pg;
1326 	uint32_t i;
1327 
1328 	for (i = 0; i < bvsession->num_poll_groups; i++) {
1329 		pg = &bvsession->poll_groups[i];
1330 		spdk_thread_send_msg(pg->thread, _vhost_user_session_bdev_remove_cb, pg);
1331 	}
1332 
1333 	return 0;
1334 }
1335 
1336 static void
1337 vhost_user_bdev_remove_cb(struct spdk_vhost_dev *vdev, bdev_event_cb_complete cb, void *cb_arg)
1338 {
1339 	SPDK_WARNLOG("%s: hot-removing bdev - all further requests will fail.\n",
1340 		     vdev->name);
1341 
1342 	vhost_user_dev_foreach_session(vdev, vhost_user_session_bdev_remove_cb,
1343 				       cb, cb_arg);
1344 }
1345 
1346 static void
1347 vhost_user_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_vhost_dev *vdev,
1348 			 bdev_event_cb_complete cb, void *cb_arg)
1349 {
1350 	switch (type) {
1351 	case SPDK_BDEV_EVENT_REMOVE:
1352 		vhost_user_bdev_remove_cb(vdev, cb, cb_arg);
1353 		break;
1354 	case SPDK_BDEV_EVENT_RESIZE:
1355 		vhost_user_blk_resize_cb(vdev, cb, cb_arg);
1356 		break;
1357 	default:
1358 		assert(false);
1359 		return;
1360 	}
1361 }
1362 
1363 static void
1364 bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1365 	      void *event_ctx)
1366 {
1367 	struct spdk_vhost_dev *vdev = (struct spdk_vhost_dev *)event_ctx;
1368 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1369 
1370 	assert(bvdev != NULL);
1371 
1372 	SPDK_DEBUGLOG(vhost_blk, "Bdev event: type %d, name %s\n",
1373 		      type,
1374 		      bdev->name);
1375 
1376 	switch (type) {
1377 	case SPDK_BDEV_EVENT_REMOVE:
1378 	case SPDK_BDEV_EVENT_RESIZE:
1379 		bvdev->ops->bdev_event(type, vdev, bdev_event_cpl_cb, (void *)type);
1380 		break;
1381 	default:
1382 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1383 		break;
1384 	}
1385 }
1386 
1387 static void
1388 free_task_pool(struct spdk_vhost_blk_session *bvsession)
1389 {
1390 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1391 	struct spdk_vhost_virtqueue *vq;
1392 	uint16_t i;
1393 
1394 	for (i = 0; i < vsession->max_queues; i++) {
1395 		vq = &vsession->virtqueue[i];
1396 		if (vq->tasks == NULL) {
1397 			continue;
1398 		}
1399 
1400 		spdk_free(vq->tasks);
1401 		vq->tasks = NULL;
1402 	}
1403 }
1404 
1405 static int
1406 alloc_vq_task_pool(struct spdk_vhost_session *vsession, uint16_t qid)
1407 {
1408 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1409 	struct spdk_vhost_virtqueue *vq;
1410 	struct spdk_vhost_user_blk_task *task;
1411 	uint32_t task_cnt;
1412 	uint32_t j;
1413 
1414 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1415 		return -EINVAL;
1416 	}
1417 
1418 	vq = &vsession->virtqueue[qid];
1419 	if (vq->vring.desc == NULL) {
1420 		return 0;
1421 	}
1422 
1423 	task_cnt = vq->vring.size;
1424 	if (task_cnt > SPDK_VHOST_MAX_VQ_SIZE) {
1425 		/* sanity check */
1426 		SPDK_ERRLOG("%s: virtqueue %"PRIu16" is too big. (size = %"PRIu32", max = %"PRIu32")\n",
1427 			    vsession->name, qid, task_cnt, SPDK_VHOST_MAX_VQ_SIZE);
1428 		return -1;
1429 	}
1430 	vq->tasks = spdk_zmalloc(sizeof(struct spdk_vhost_user_blk_task) * task_cnt,
1431 				 SPDK_CACHE_LINE_SIZE, NULL,
1432 				 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1433 	if (vq->tasks == NULL) {
1434 		SPDK_ERRLOG("%s: failed to allocate %"PRIu32" tasks for virtqueue %"PRIu16"\n",
1435 			    vsession->name, task_cnt, qid);
1436 		return -1;
1437 	}
1438 
1439 	for (j = 0; j < task_cnt; j++) {
1440 		task = &((struct spdk_vhost_user_blk_task *)vq->tasks)[j];
1441 		task->bvsession = bvsession;
1442 		task->req_idx = j;
1443 		task->vq = vq;
1444 	}
1445 
1446 	return 0;
1447 }
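/*
 * Sizing note (illustrative): one task is preallocated per vring entry, so a
 * virtqueue of size 256 gets 256 cache-line-aligned spdk_vhost_user_blk_task
 * structs from DMA-capable memory, with req_idx mirroring the slot index.
 */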
1448 
1449 static void
1450 session_start_poll_group(void *args)
1451 {
1452 	struct vhost_user_pg_vq_info *vq_info;
1453 	struct vhost_user_poll_group *pg = args;
1454 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(pg->vdev);
1455 	struct spdk_vhost_blk_session *bvsession = to_blk_session(pg->vsession);
1456 
1457 	assert(bvdev != NULL);
1458 
1459 	if (bvdev->bdev) {
1460 		pg->io_channel = vhost_blk_get_io_channel(pg->vdev);
1461 		SPDK_DEBUGLOG(vhost_blk, "%s: pg %p, pg io channel %p, thread %s, lcore %u\n",
1462 			      bvsession->vsession.name, pg,
1463 			      pg->io_channel, spdk_thread_get_name(spdk_get_thread()), spdk_env_get_current_core());
1464 		if (!pg->io_channel) {
1465 			SPDK_ERRLOG("%s: I/O channel allocation failed\n", bvsession->vsession.name);
1466 			return;
1467 		}
1468 	}
1469 
1470 	if (spdk_interrupt_mode_is_enabled()) {
1471 		TAILQ_FOREACH(vq_info, &pg->vqs, link) {
1472 			vhost_blk_vq_register_interrupt(vq_info->vq);
1473 		}
1474 	}
1475 
1476 	if (bvdev->bdev) {
1477 		pg->requestq_poller = SPDK_POLLER_REGISTER(vdev_worker, pg, 0);
1478 	} else {
1479 		pg->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, pg, 0);
1480 	}
1481 	SPDK_INFOLOG(vhost, "%s: poller started on lcore %d\n",
1482 		     bvsession->vsession.name, spdk_env_get_current_core());
1483 
1484 	spdk_poller_register_interrupt(pg->requestq_poller, vhost_blk_poller_set_interrupt_mode, bvsession);
1485 }
1486 
1487 static int
1488 session_start_poll_groups(struct spdk_vhost_dev *vdev, struct spdk_vhost_session *vsession)
1489 {
1490 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1491 	struct vhost_user_poll_group *pg;
1492 	struct vhost_user_pg_vq_info *vq_info;
1493 	struct spdk_cpuset *cpumask;
1494 	char thread_name[128];
1495 	uint32_t i, index = 0;
1496 	int rc = 0;
1497 
1498 	bvsession->thread = vdev->thread;
1499 	cpumask = spdk_thread_get_cpumask(vdev->thread);
1500 	/* If the user did not supply a cpumask, we still start one thread for the device */
1501 	if (vdev->use_default_cpumask) {
1502 		bvsession->num_poll_groups = 1;
1503 	} else {
1504 		bvsession->num_poll_groups = spdk_cpuset_count(cpumask);
1505 	}
1506 	bvsession->poll_groups = calloc(bvsession->num_poll_groups, sizeof(struct vhost_user_poll_group));
1507 	if (!bvsession->poll_groups) {
1508 		SPDK_ERRLOG("Failed to allocate poll groups\n");
1509 		return -ENOMEM;
1510 	}
1511 
1512 	for (i = 0; i < bvsession->num_poll_groups; i++) {
1513 		pg = &bvsession->poll_groups[i];
1514 		TAILQ_INIT(&pg->vqs);
1515 	}
1516 
1517 	for (i = 0; i < vsession->max_queues; i++) {
1518 		vq_info = calloc(1, sizeof(*vq_info));
1519 		if (!vq_info) {
1520 			SPDK_ERRLOG("Failed to allocate vq_info\n");
1521 			rc = -ENOMEM;
1522 			goto err;
1523 		}
1524 		vq_info->vq = &vsession->virtqueue[i];
1525 		vq_info->vsession = vsession;
1526 
1527 		pg = get_optimal_poll_group(bvsession);
1528 		if (pg == NULL) {
1529 			free(vq_info);
1530 			rc = -EFAULT;
1531 			goto err;
1532 		}
1533 		vq_info->pg = pg;
1534 		vq_info->vq->poll_group = pg;
1535 
1536 		SPDK_DEBUGLOG(vhost_blk, "%s: vring %u is added to pg %p\n", vsession->name, i, pg);
1537 		TAILQ_INSERT_TAIL(&pg->vqs, vq_info, link);
1538 	}
1539 
1540 	SPDK_ENV_FOREACH_CORE(i) {
1541 		if (!spdk_cpuset_get_cpu(cpumask, i)) {
1542 			continue;
1543 		}
1544 
1545 		snprintf(thread_name, sizeof(thread_name), "%s.%u_%u", vdev->name, vsession->vid, i);
1546 		pg = &bvsession->poll_groups[index];
1547 		pg->vdev = vdev;
1548 		pg->vsession = vsession;
1549 		pg->thread = spdk_thread_create(thread_name, cpumask);
1550 		if (!pg->thread) {
1551 			SPDK_ERRLOG("Failed to create poll group thread for %s session %d\n", vdev->name, vsession->vid);
1552 			rc = -EFAULT;
1553 			goto err;
1554 		}
1555 		spdk_thread_send_msg(pg->thread, session_start_poll_group, pg);
1556 		index++;
1557 		if (index == bvsession->num_poll_groups) {
1558 			break;
1559 		}
1560 	}
1561 
1562 	return 0;
1563 
1564 err:
1565 	session_stop_poll_groups(bvsession);
1566 	return rc;
1567 }
1568 
1569 static int
1570 vhost_blk_start(struct spdk_vhost_dev *vdev,
1571 		struct spdk_vhost_session *vsession, void *unused)
1572 {
1573 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1574 	struct spdk_vhost_blk_dev *bvdev;
1575 	int i;
1576 
1577 	/* return if start is already in progress */
1578 	if (vsession->started || vsession->starting) {
1579 		SPDK_INFOLOG(vhost, "%s: is already starting or started\n", vsession->name);
1580 		return -EINPROGRESS;
1581 	}
1582 
1583 	/* validate all I/O queues are in a contiguous index range */
1584 	for (i = 0; i < vsession->max_queues; i++) {
1585 		/* vring.desc and vring.desc_packed are members of a union,
1586 		 * so checking q->vring.desc also covers q->vring.desc_packed.
1587 		 */
1588 		if (vsession->virtqueue[i].vring.desc == NULL) {
1589 			SPDK_ERRLOG("%s: queue %"PRIu32" is empty\n", vsession->name, i);
1590 			return -1;
1591 		}
1592 	}
1593 
1594 	bvdev = to_blk_dev(vdev);
1595 	assert(bvdev != NULL);
1596 	bvsession->bvdev = bvdev;
1597 
1598 	return session_start_poll_groups(vdev, vsession);
1599 }
1600 
1601 static void
1602 session_stop_poll_group_done(void *arg)
1603 {
1604 	struct spdk_vhost_blk_session *bvsession = arg;
1605 
1606 	bvsession->num_stopped_poll_groups++;
1607 }
1608 
1609 static int
1610 pg_stop_poller_cb(void *args)
1611 {
1612 	struct vhost_user_poll_group *pg = args;
1613 	struct spdk_vhost_blk_session *bvsession;
1614 	struct vhost_user_pg_vq_info *vq_info, *tmp;
1615 
1616 	if (!pg->task_cnt) {
1617 		TAILQ_FOREACH_SAFE(vq_info, &pg->vqs, link, tmp) {
1618 			TAILQ_REMOVE(&pg->vqs, vq_info, link);
1619 			vq_info->vq->next_event_time = 0;
1620 			vhost_vq_used_signal(pg->vsession, vq_info->vq);
1621 			free(vq_info);
1622 		}
1623 		goto done;
1624 	}
1625 
1626 	pg->stop_retry_count--;
1627 	if (pg->stop_retry_count) {
1628 		return SPDK_POLLER_IDLE;
1629 	}
1630 
1631 done:
1632 	SPDK_INFOLOG(vhost, "%s: stopping poller on lcore %d\n",
1633 		     pg->vsession->name, spdk_env_get_current_core());
1634 
1635 	spdk_poller_unregister(&pg->stop_poller);
1636 	if (pg->io_channel) {
1637 		vhost_blk_put_io_channel(pg->io_channel);
1638 		pg->io_channel = NULL;
1639 	}
1640 
1641 	bvsession = to_blk_session(pg->vsession);
1642 	spdk_thread_exit(pg->thread);
1643 	spdk_thread_send_msg(bvsession->thread, session_stop_poll_group_done, bvsession);
1644 
1645 	return SPDK_POLLER_BUSY;
1646 }
1647 
1648 static void
1649 session_stop_poll_group(void *args)
1650 {
1651 	struct vhost_user_poll_group *pg = args;
1652 
1653 	spdk_poller_unregister(&pg->requestq_poller);
1654 	vhost_blk_pg_unregister_interrupts(pg);
1655 
1656 	/* Timeout value should be less than SPDK_VHOST_SESSION_STOP_RETRY_TIMEOUT_IN_SEC */
1657 	pg->stop_retry_count = (SPDK_VHOST_SESSION_STOP_TIMEOUT_IN_SEC * 1000 *
1658 				1000) / SPDK_VHOST_SESSION_STOP_RETRY_PERIOD_IN_US;
1659 	pg->stop_poller = SPDK_POLLER_REGISTER(pg_stop_poller_cb, pg,
1660 					       SPDK_VHOST_SESSION_STOP_RETRY_PERIOD_IN_US);
1661 }
1662 
1663 static void
1664 session_stop_poll_groups(struct spdk_vhost_blk_session *bvsession)
1665 {
1666 	uint32_t i;
1667 	struct vhost_user_poll_group *pg;
1668 
1669 	bvsession->num_stopped_poll_groups = 0;
1670 	for (i = 0; i < bvsession->num_poll_groups; i++) {
1671 		pg = &bvsession->poll_groups[i];
1672 		if (pg->thread) {
1673 			spdk_thread_send_msg(pg->thread, session_stop_poll_group, pg);
1674 		}
1675 	}
1676 }
1677 
1678 static int
1679 destroy_session_poller_cb(void *arg)
1680 {
1681 	struct spdk_vhost_blk_session *bvsession = arg;
1682 	struct spdk_vhost_session *vsession = &bvsession->vsession;
1683 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
1684 
1685 	if ((bvsession->num_stopped_poll_groups != bvsession->num_poll_groups) ||
1686 	    (pthread_mutex_trylock(&user_dev->lock) != 0)) {
1687 		assert(vsession->stop_retry_count > 0);
1688 		vsession->stop_retry_count--;
1689 		if (vsession->stop_retry_count == 0) {
1690 			SPDK_ERRLOG("%s: Timed out when destroying session (number of stopped pgs: %d)\n", vsession->name,
1691 				    bvsession->num_stopped_poll_groups);
1692 			spdk_poller_unregister(&bvsession->stop_poller);
1693 			vhost_user_session_stop_done(vsession, -ETIMEDOUT);
1694 		}
1695 
1696 		return SPDK_POLLER_BUSY;
1697 	}
1698 
1699 	SPDK_DEBUGLOG(vhost_blk, "%s: session stopped\n", vsession->name);
1700 	free(bvsession->poll_groups);
1701 	free_task_pool(bvsession);
1702 	spdk_poller_unregister(&bvsession->stop_poller);
1703 	vhost_user_session_stop_done(vsession, 0);
1704 
1705 	pthread_mutex_unlock(&user_dev->lock);
1706 	return SPDK_POLLER_BUSY;
1707 }
1708 
1709 static int
1710 vhost_blk_stop(struct spdk_vhost_dev *vdev,
1711 	       struct spdk_vhost_session *vsession, void *unused)
1712 {
1713 	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
1714 
1715 	/* return if stop is already in progress */
1716 	if (bvsession->stop_poller) {
1717 		return -EINPROGRESS;
1718 	}
1719 
1720 	session_stop_poll_groups(bvsession);
1721 
1722 	bvsession->vsession.stop_retry_count = (SPDK_VHOST_SESSION_STOP_RETRY_TIMEOUT_IN_SEC * 1000 *
1723 						1000) / SPDK_VHOST_SESSION_STOP_RETRY_PERIOD_IN_US;
1724 	bvsession->stop_poller = SPDK_POLLER_REGISTER(destroy_session_poller_cb,
1725 				 bvsession, SPDK_VHOST_SESSION_STOP_RETRY_PERIOD_IN_US);
1726 	return 0;
1727 }
1728 
1729 static void
1730 vhost_blk_dump_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1731 {
1732 	struct spdk_vhost_blk_dev *bvdev;
1733 
1734 	bvdev = to_blk_dev(vdev);
1735 	assert(bvdev != NULL);
1736 
1737 	spdk_json_write_named_object_begin(w, "block");
1738 
1739 	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
1740 
1741 	spdk_json_write_name(w, "bdev");
1742 	if (bvdev->bdev) {
1743 		spdk_json_write_string(w, spdk_bdev_get_name(bvdev->bdev));
1744 	} else {
1745 		spdk_json_write_null(w);
1746 	}
1747 	spdk_json_write_named_string(w, "transport", bvdev->ops->name);
1748 
1749 	spdk_json_write_object_end(w);
1750 }
1751 
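/*
 * Emits the JSON-RPC call that would recreate this controller on config save.
 * Illustrative output (controller and bdev names are examples only):
 *
 *   {
 *     "method": "vhost_create_blk_controller",
 *     "params": { "ctrlr": "vhost.0", "dev_name": "Malloc0", "cpumask": "0x1",
 *                 "readonly": false, "transport": "vhost_user_blk" }
 *   }
 */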
1752 static void
1753 vhost_blk_write_config_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1754 {
1755 	struct spdk_vhost_blk_dev *bvdev;
1756 
1757 	bvdev = to_blk_dev(vdev);
1758 	assert(bvdev != NULL);
1759 
1760 	if (!bvdev->bdev) {
1761 		return;
1762 	}
1763 
1764 	spdk_json_write_object_begin(w);
1765 	spdk_json_write_named_string(w, "method", "vhost_create_blk_controller");
1766 
1767 	spdk_json_write_named_object_begin(w, "params");
1768 	spdk_json_write_named_string(w, "ctrlr", vdev->name);
1769 	spdk_json_write_named_string(w, "dev_name", spdk_bdev_get_name(bvdev->bdev));
1770 	spdk_json_write_named_string(w, "cpumask",
1771 				     spdk_cpuset_fmt(spdk_thread_get_cpumask(vdev->thread)));
1772 	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
1773 	spdk_json_write_named_string(w, "transport", bvdev->ops->name);
1774 	spdk_json_write_object_end(w);
1775 
1776 	spdk_json_write_object_end(w);
1777 }
1778 
1779 static int vhost_blk_destroy(struct spdk_vhost_dev *dev);
1780 
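/*
 * Handles VHOST_USER_GET_CONFIG: fills a struct virtio_blk_config from the
 * backing bdev's geometry and copies at most len bytes of it back to the
 * master.
 */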
1781 static int
1782 vhost_blk_get_config(struct spdk_vhost_dev *vdev, uint8_t *config,
1783 		     uint32_t len)
1784 {
1785 	struct virtio_blk_config blkcfg;
1786 	struct spdk_bdev *bdev;
1787 	uint32_t blk_size;
1788 	uint64_t blkcnt;
1789 
1790 	memset(&blkcfg, 0, sizeof(blkcfg));
1791 	bdev = vhost_blk_get_bdev(vdev);
1792 	if (bdev == NULL) {
		/* We can't just return -1 here, because this GET_CONFIG message
		 * might be caused by a QEMU VM reboot. Returning -1 would
		 * indicate an error to QEMU, which might then decide to
		 * terminate itself. We don't want that - a simple reboot
		 * shouldn't break the system.
		 *
		 * Presenting a block device with block size 0 and block count 0
		 * doesn't cause any problems on the QEMU side and the
		 * virtio-pci device is even still available inside the VM, but
		 * no block device will be created for it - the kernel drivers
		 * will silently reject it.
		 */
1804 		blk_size = 0;
1805 		blkcnt = 0;
1806 	} else {
1807 		blk_size = spdk_bdev_get_block_size(bdev);
1808 		blkcnt = spdk_bdev_get_num_blocks(bdev);
1809 		if (spdk_bdev_get_buf_align(bdev) > 1) {
1810 			blkcfg.size_max = SPDK_BDEV_LARGE_BUF_MAX_SIZE;
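			/* Same -2/-1 descriptor budget as below, additionally capped by
			 * the bdev layer's child iov limit since aligned I/O may be
			 * split into child requests. */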
1811 			blkcfg.seg_max = spdk_min(SPDK_VHOST_IOVS_MAX - 2 - 1, SPDK_BDEV_IO_NUM_CHILD_IOV - 2 - 1);
1812 		} else {
			/* 128 KiB */
			blkcfg.size_max = 131072;
			/* -2 for the REQ and RESP descriptors and -1 for region boundary splitting */
			blkcfg.seg_max = SPDK_VHOST_IOVS_MAX - 2 - 1;
1816 		}
1817 	}
1818 
1819 	blkcfg.blk_size = blk_size;
1820 	/* minimum I/O size in blocks */
1821 	blkcfg.min_io_size = 1;
	/* virtio-blk capacity is always expressed in 512-byte sectors */
1823 	blkcfg.capacity = (blkcnt * blk_size) / 512;
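	/* e.g. a bdev with 2097152 blocks of 4096 bytes reports
	 * (2097152 * 4096) / 512 = 16777216 sectors (8 GiB) */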
	/* QEMU may overwrite this value when it starts */
1825 	blkcfg.num_queues = SPDK_VHOST_MAX_VQUEUES;
1826 
1827 	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		/* 16 MiB, expressed in 512-byte sectors (32768 * 512 B) */
1829 		blkcfg.max_discard_sectors = 32768;
1830 		blkcfg.max_discard_seg = 1;
1831 		blkcfg.discard_sector_alignment = blk_size / 512;
1832 	}
1833 	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1834 		blkcfg.max_write_zeroes_sectors = 32768;
1835 		blkcfg.max_write_zeroes_seg = 1;
1836 	}
1837 
1838 	memcpy(config, &blkcfg, spdk_min(len, sizeof(blkcfg)));
1839 
1840 	return 0;
1841 }
1842 
1843 static int
1844 vhost_blk_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
1845 			 uint32_t iops_threshold)
1846 {
1847 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1848 
1849 	assert(bvdev != NULL);
1850 
1851 	return bvdev->ops->set_coalescing(vdev, delay_base_us, iops_threshold);
1852 }
1853 
1854 static void
1855 vhost_blk_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
1856 			 uint32_t *iops_threshold)
1857 {
1858 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1859 
1860 	assert(bvdev != NULL);
1861 
1862 	bvdev->ops->get_coalescing(vdev, delay_base_us, iops_threshold);
1863 }
1864 
1865 static const struct spdk_vhost_user_dev_backend vhost_blk_user_device_backend = {
1866 	.session_ctx_size = sizeof(struct spdk_vhost_blk_session) - sizeof(struct spdk_vhost_session),
1867 	.start_session =  vhost_blk_start,
1868 	.stop_session = vhost_blk_stop,
1869 	.alloc_vq_tasks = alloc_vq_task_pool,
1870 	.enable_vq = vhost_blk_vq_enable,
1871 };
1872 
1873 static const struct spdk_vhost_dev_backend vhost_blk_device_backend = {
1874 	.type = VHOST_BACKEND_BLK,
1875 	.vhost_get_config = vhost_blk_get_config,
1876 	.dump_info_json = vhost_blk_dump_info_json,
1877 	.write_config_json = vhost_blk_write_config_json,
1878 	.remove_device = vhost_blk_destroy,
1879 	.set_coalescing = vhost_blk_set_coalescing,
1880 	.get_coalescing = vhost_blk_get_coalescing,
1881 };
1882 
1883 int
1884 virtio_blk_construct_ctrlr(struct spdk_vhost_dev *vdev, const char *address,
1885 			   struct spdk_cpuset *cpumask, const struct spdk_json_val *params,
1886 			   const struct spdk_vhost_user_dev_backend *user_backend)
1887 {
1888 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1889 
1890 	assert(bvdev != NULL);
1891 
1892 	return bvdev->ops->create_ctrlr(vdev, cpumask, address, params, (void *)user_backend);
1893 }
1894 
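/*
 * Creates a vhost-blk controller on top of an existing bdev. Typically
 * reached via the vhost_create_blk_controller RPC; an illustrative
 * invocation (names are examples only):
 *
 *   scripts/rpc.py vhost_create_blk_controller --cpumask 0x1 vhost.0 Malloc0
 */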
1895 int
1896 spdk_vhost_blk_construct(const char *name, const char *cpumask, const char *dev_name,
1897 			 const char *transport, const struct spdk_json_val *params)
1898 {
1899 	struct spdk_vhost_blk_dev *bvdev = NULL;
1900 	struct spdk_vhost_dev *vdev;
1901 	struct spdk_bdev *bdev;
1902 	const char *transport_name = VIRTIO_BLK_DEFAULT_TRANSPORT;
1903 	int ret = 0;
1904 
1905 	bvdev = calloc(1, sizeof(*bvdev));
1906 	if (bvdev == NULL) {
1907 		ret = -ENOMEM;
1908 		goto out;
1909 	}
1910 
1911 	if (transport != NULL) {
1912 		transport_name = transport;
1913 	}
1914 
1915 	bvdev->ops = virtio_blk_get_transport_ops(transport_name);
1916 	if (!bvdev->ops) {
1917 		ret = -EINVAL;
1918 		SPDK_ERRLOG("Transport type '%s' unavailable.\n", transport_name);
1919 		goto out;
1920 	}
1921 
1922 	ret = spdk_bdev_open_ext(dev_name, true, bdev_event_cb, bvdev, &bvdev->bdev_desc);
1923 	if (ret != 0) {
1924 		SPDK_ERRLOG("%s: could not open bdev '%s', error=%d\n",
1925 			    name, dev_name, ret);
1926 		goto out;
1927 	}
1928 	bdev = spdk_bdev_desc_get_bdev(bvdev->bdev_desc);
1929 
1930 	vdev = &bvdev->vdev;
1931 	vdev->virtio_features = SPDK_VHOST_BLK_FEATURES_BASE;
1932 	vdev->disabled_features = SPDK_VHOST_BLK_DISABLED_FEATURES;
1933 	vdev->protocol_features = SPDK_VHOST_BLK_PROTOCOL_FEATURES;
1934 
1935 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1936 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_DISCARD);
1937 	}
1938 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1939 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_WRITE_ZEROES);
1940 	}
1941 
1942 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
1943 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_FLUSH);
1944 	}
1945 
1946 	bvdev->bdev = bdev;
1947 	bvdev->readonly = false;
1948 	ret = vhost_dev_register(vdev, name, cpumask, params, &vhost_blk_device_backend,
1949 				 &vhost_blk_user_device_backend, false);
1950 	if (ret != 0) {
1951 		spdk_bdev_close(bvdev->bdev_desc);
1952 		goto out;
1953 	}
1954 
1955 	SPDK_INFOLOG(vhost, "%s: using bdev '%s'\n", name, dev_name);
1956 out:
1957 	if (ret != 0 && bvdev) {
1958 		free(bvdev);
1959 	}
1960 	return ret;
1961 }
1962 
1963 int
1964 virtio_blk_destroy_ctrlr(struct spdk_vhost_dev *vdev)
1965 {
1966 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1967 
1968 	assert(bvdev != NULL);
1969 
1970 	return bvdev->ops->destroy_ctrlr(vdev);
1971 }
1972 
1973 static int
1974 vhost_blk_destroy(struct spdk_vhost_dev *vdev)
1975 {
1976 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
1977 	int rc;
1978 
1979 	assert(bvdev != NULL);
1980 
1981 	rc = vhost_dev_unregister(&bvdev->vdev);
1982 	if (rc != 0) {
1983 		return rc;
1984 	}
1985 
1986 	if (bvdev->bdev_desc) {
1987 		spdk_bdev_close(bvdev->bdev_desc);
1988 		bvdev->bdev_desc = NULL;
1989 	}
1990 	bvdev->bdev = NULL;
1991 
1992 	free(bvdev);
1993 	return 0;
1994 }
1995 
1996 struct spdk_io_channel *
1997 vhost_blk_get_io_channel(struct spdk_vhost_dev *vdev)
1998 {
1999 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
2000 
2001 	assert(bvdev != NULL);
2002 
2003 	return spdk_bdev_get_io_channel(bvdev->bdev_desc);
2004 }
2005 
2006 void
2007 vhost_blk_put_io_channel(struct spdk_io_channel *ch)
2008 {
2009 	spdk_put_io_channel(ch);
2010 }
2011 
2012 static struct spdk_virtio_blk_transport *
2013 vhost_user_blk_create(const struct spdk_json_val *params)
2014 {
2015 	int ret;
2016 	struct spdk_virtio_blk_transport *vhost_user_blk;
2017 
2018 	vhost_user_blk = calloc(1, sizeof(*vhost_user_blk));
2019 	if (!vhost_user_blk) {
2020 		return NULL;
2021 	}
2022 
2023 	ret = vhost_user_init();
2024 	if (ret != 0) {
2025 		free(vhost_user_blk);
2026 		return NULL;
2027 	}
2028 
2029 	return vhost_user_blk;
2030 }
2031 
2032 static int
2033 vhost_user_blk_destroy(struct spdk_virtio_blk_transport *transport,
2034 		       spdk_vhost_fini_cb cb_fn)
2035 {
2036 	vhost_user_fini(cb_fn);
2037 	free(transport);
2038 	return 0;
2039 }
2040 
2041 struct rpc_vhost_blk {
2042 	bool readonly;
2043 	bool packed_ring;
2044 };
2045 
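/* Both parameters are optional (the trailing 'true'); they default to false
 * because callers zero-initialize struct rpc_vhost_blk before decoding. */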
2046 static const struct spdk_json_object_decoder rpc_construct_vhost_blk[] = {
2047 	{"readonly", offsetof(struct rpc_vhost_blk, readonly), spdk_json_decode_bool, true},
2048 	{"packed_ring", offsetof(struct rpc_vhost_blk, packed_ring), spdk_json_decode_bool, true},
2049 };
2050 
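/*
 * Transport-specific controller creation: decodes the vhost-user-blk RPC
 * flags, translating packed_ring into VIRTIO_F_RING_PACKED and readonly into
 * VIRTIO_BLK_F_RO, before handing off to the generic vhost-user device
 * registration.
 */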
2051 static int
2052 vhost_user_blk_create_ctrlr(struct spdk_vhost_dev *vdev, struct spdk_cpuset *cpumask,
2053 			    const char *address, const struct spdk_json_val *params, void *custom_opts)
2054 {
2055 	struct rpc_vhost_blk req = {0};
2056 	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
2057 
2058 	assert(bvdev != NULL);
2059 
2060 	if (spdk_json_decode_object_relaxed(params, rpc_construct_vhost_blk,
2061 					    SPDK_COUNTOF(rpc_construct_vhost_blk),
2062 					    &req)) {
		SPDK_DEBUGLOG(vhost_blk, "spdk_json_decode_object_relaxed failed\n");
2064 		return -EINVAL;
2065 	}
2066 
2067 	if (req.packed_ring) {
2068 		vdev->virtio_features |= (uint64_t)req.packed_ring << VIRTIO_F_RING_PACKED;
2069 	}
2070 	if (req.readonly) {
2071 		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_RO);
2072 		bvdev->readonly = req.readonly;
2073 	}
2074 
2075 	return vhost_user_dev_create(vdev, address, cpumask, custom_opts, false);
2076 }
2077 
2078 static int
2079 vhost_user_blk_destroy_ctrlr(struct spdk_vhost_dev *vdev)
2080 {
2081 	return vhost_user_dev_unregister(vdev);
2082 }
2083 
2084 static void
2085 vhost_user_blk_dump_opts(struct spdk_virtio_blk_transport *transport, struct spdk_json_write_ctx *w)
2086 {
2087 	assert(w != NULL);
2088 
2089 	spdk_json_write_named_string(w, "name", transport->ops->name);
2090 }
2091 
2092 static const struct spdk_virtio_blk_transport_ops vhost_user_blk = {
2093 	.name = "vhost_user_blk",
2094 
2095 	.dump_opts = vhost_user_blk_dump_opts,
2096 
2097 	.create = vhost_user_blk_create,
2098 	.destroy = vhost_user_blk_destroy,
2099 
2100 	.create_ctrlr = vhost_user_blk_create_ctrlr,
2101 	.destroy_ctrlr = vhost_user_blk_destroy_ctrlr,
2102 
2103 	.bdev_event = vhost_user_bdev_event_cb,
2104 	.set_coalescing = vhost_user_set_coalescing,
2105 	.get_coalescing = vhost_user_get_coalescing,
2106 };
2107 
2108 SPDK_VIRTIO_BLK_TRANSPORT_REGISTER(vhost_user_blk, &vhost_user_blk);
2109 
2110 SPDK_LOG_REGISTER_COMPONENT(vhost_blk)
2111 SPDK_LOG_REGISTER_COMPONENT(vhost_blk_data)
2112