/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <linux/virtio_blk.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/bdev_module.h"
#include "spdk/conf.h"
#include "spdk/thread.h"
#include "spdk/likely.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/vhost.h"

#include "vhost_internal.h"

/* Minimal set of features supported by every SPDK VHOST-BLK device */
#define SPDK_VHOST_BLK_FEATURES_BASE (SPDK_VHOST_FEATURES | \
		(1ULL << VIRTIO_BLK_F_SIZE_MAX) | (1ULL << VIRTIO_BLK_F_SEG_MAX) | \
		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_BLK_SIZE) | \
		(1ULL << VIRTIO_BLK_F_TOPOLOGY) | (1ULL << VIRTIO_BLK_F_BARRIER)  | \
		(1ULL << VIRTIO_BLK_F_SCSI)     | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
		(1ULL << VIRTIO_BLK_F_MQ))

/* Features that are not supported */
#define SPDK_VHOST_BLK_DISABLED_FEATURES (SPDK_VHOST_DISABLED_FEATURES | \
		(1ULL << VIRTIO_BLK_F_GEOMETRY) | (1ULL << VIRTIO_BLK_F_CONFIG_WCE) | \
		(1ULL << VIRTIO_BLK_F_BARRIER)  | (1ULL << VIRTIO_BLK_F_SCSI))
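
/*
 * Note that GEOMETRY, CONFIG_WCE, BARRIER and SCSI appear in both masks
 * above: they are part of the legacy feature set, but listing them as
 * disabled is understood to make the generic vhost layer clear them during
 * feature negotiation, so they are never actually offered to the driver.
 */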

/* Vhost-blk supported protocol features */
#ifndef SPDK_CONFIG_VHOST_INTERNAL_LIB
#define SPDK_VHOST_BLK_PROTOCOL_FEATURES ((1ULL << VHOST_USER_PROTOCOL_F_CONFIG) | \
		(1ULL << VHOST_USER_PROTOCOL_F_INFLIGHT_SHMFD))
#else
#define SPDK_VHOST_BLK_PROTOCOL_FEATURES (1ULL << VHOST_USER_PROTOCOL_F_CONFIG)
#endif

struct spdk_vhost_blk_task {
	struct spdk_bdev_io *bdev_io;
	struct spdk_vhost_blk_session *bvsession;
	struct spdk_vhost_virtqueue *vq;

	volatile uint8_t *status;

	uint16_t req_idx;
	uint16_t num_descs;
	uint16_t buffer_id;

	/* for io wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	/* If set, the task is currently used for I/O processing. */
	bool used;

	/** Number of bytes that were written. */
	uint32_t used_len;
	uint16_t iovcnt;
	struct iovec iovs[SPDK_VHOST_IOVS_MAX];
};
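
/*
 * Tasks are preallocated per virtqueue in alloc_task_pool(), one for each
 * ring entry. A task is looked up by req_idx on a split ring and by
 * buffer_id on a packed ring, so at most vring.size requests can be
 * outstanding per queue at any time.
 */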

struct spdk_vhost_blk_dev {
	struct spdk_vhost_dev vdev;
	struct spdk_bdev *bdev;
	struct spdk_bdev_desc *bdev_desc;
	/* dummy_io_channel is used to hold a bdev reference */
	struct spdk_io_channel *dummy_io_channel;
	bool readonly;
};

struct spdk_vhost_blk_session {
	/* The parent session must be the very first field in this struct */
	struct spdk_vhost_session vsession;
	struct spdk_vhost_blk_dev *bvdev;
	struct spdk_poller *requestq_poller;
	struct spdk_io_channel *io_channel;
	struct spdk_poller *stop_poller;
};
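
/*
 * Session lifecycle: vhost_blk_start_cb() registers requestq_poller, which
 * runs vdev_worker() (or no_bdev_vdev_worker() when no bdev is attached).
 * On bdev hot-removal the poller is swapped for no_bdev_vdev_worker(), and
 * on session stop a separate stop_poller drains the outstanding tasks
 * before the session is torn down.
 */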

/* forward declaration */
static const struct spdk_vhost_dev_backend vhost_blk_device_backend;

static int
process_blk_request(struct spdk_vhost_blk_task *task,
		    struct spdk_vhost_blk_session *bvsession,
		    struct spdk_vhost_virtqueue *vq);

static void
blk_task_finish(struct spdk_vhost_blk_task *task)
{
	assert(task->bvsession->vsession.task_cnt > 0);
	task->bvsession->vsession.task_cnt--;
	task->used = false;
}

static void
blk_task_init(struct spdk_vhost_blk_task *task)
{
	task->used = true;
	task->iovcnt = SPDK_COUNTOF(task->iovs);
	task->status = NULL;
	task->used_len = 0;
}

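/*
 * Complete a request on the ring. A packed ring completes by writing a
 * single used descriptor carrying the buffer_id and the number of
 * descriptors consumed; a split ring pushes req_idx onto the used ring.
 */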
static void
blk_task_enqueue(struct spdk_vhost_blk_task *task)
{
	if (task->vq->packed.packed_ring) {
		vhost_vq_packed_ring_enqueue(&task->bvsession->vsession, task->vq,
					     task->num_descs,
					     task->buffer_id, task->used_len);
	} else {
		vhost_vq_used_ring_enqueue(&task->bvsession->vsession, task->vq,
					   task->req_idx, task->used_len);
	}
}

static void
invalid_blk_request(struct spdk_vhost_blk_task *task, uint8_t status)
{
	if (task->status) {
		*task->status = status;
	}

	blk_task_enqueue(task);
	blk_task_finish(task);
	SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK_DATA, "Invalid request (status=%" PRIu8")\n", status);
}

/*
 * Process the task's descriptor chain and set up the data-related fields.
 * Returns 0 on success and stores the total size of the supplied buffers
 * in *length; returns -1 on error.
 *
 * FIXME: Make this function return rd_cnt and wr_cnt.
 */
static int
blk_iovs_split_queue_setup(struct spdk_vhost_blk_session *bvsession,
			   struct spdk_vhost_virtqueue *vq,
			   uint16_t req_idx, struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_dev *vdev = vsession->vdev;
	struct vring_desc *desc, *desc_table;
	uint16_t out_cnt = 0, cnt = 0;
	uint32_t desc_table_size, len = 0;
	uint32_t desc_handled_cnt;
	int rc;

	rc = vhost_vq_get_desc(vsession, vq, req_idx, &desc, &desc_table, &desc_table_size);
	if (rc != 0) {
		SPDK_ERRLOG("%s: invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
		return -1;
	}

	desc_handled_cnt = 0;
	while (1) {
		/*
		 * Maximum cnt reached?
		 * Should not happen if the request is well-formed; otherwise this is a bug.
		 */
		if (spdk_unlikely(cnt == *iovs_cnt)) {
			SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
				      vsession->name, req_idx);
			return -1;
		}

		if (spdk_unlikely(vhost_vring_desc_to_iov(vsession, iovs, &cnt, desc))) {
			SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
				      vsession->name, req_idx, cnt);
			return -1;
		}

		len += desc->len;

		out_cnt += vhost_vring_desc_is_wr(desc);

		rc = vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
		if (rc != 0) {
			SPDK_ERRLOG("%s: descriptor chain at index %"PRIu16" terminated unexpectedly.\n",
				    vsession->name, req_idx);
			return -1;
		} else if (desc == NULL) {
			break;
		}

		desc_handled_cnt++;
		if (spdk_unlikely(desc_handled_cnt > desc_table_size)) {
			/* Break out of the cycle and report an error. */
			SPDK_ERRLOG("%s: found a cycle in the descriptor chain: desc_table_size = %d, desc_handled_cnt = %d.\n",
				    vsession->name, desc_table_size, desc_handled_cnt);
			return -1;
		}
	}

	/*
	 * There must be at least two descriptors: the first contains the
	 * request, so it must be readable; the last contains the buffer for
	 * the response, so it must be writable.
	 */
	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
		return -1;
	}

	*length = len;
	*iovs_cnt = cnt;
	return 0;
}

static int
blk_iovs_packed_queue_setup(struct spdk_vhost_blk_session *bvsession,
			    struct spdk_vhost_virtqueue *vq,
			    uint16_t req_idx, struct iovec *iovs, uint16_t *iovs_cnt, uint32_t *length)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_dev *vdev = vsession->vdev;
	struct vring_packed_desc *desc = NULL, *desc_table;
	uint16_t out_cnt = 0, cnt = 0;
	uint32_t desc_table_size, len = 0;
	int rc = 0;

	rc = vhost_vq_get_desc_packed(vsession, vq, req_idx, &desc,
				      &desc_table, &desc_table_size);
	if (spdk_unlikely(rc != 0)) {
		SPDK_ERRLOG("%s: Invalid descriptor at index %"PRIu16".\n", vdev->name, req_idx);
		return rc;
	}

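	/*
	 * A non-NULL desc_table means the chain lives in an indirect
	 * descriptor table, so iteration restarts from index 0 within that
	 * table.
	 */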
	if (desc_table != NULL) {
		req_idx = 0;
	}

	while (1) {
		/*
		 * Maximum cnt reached?
		 * Should not happen if the request is well-formed; otherwise this is a bug.
		 */
		if (spdk_unlikely(cnt == *iovs_cnt)) {
			SPDK_ERRLOG("%s: max IOVs in request reached (req_idx = %"PRIu16").\n",
				    vsession->name, req_idx);
			return -EINVAL;
		}

		if (spdk_unlikely(vhost_vring_packed_desc_to_iov(vsession, iovs, &cnt, desc))) {
			SPDK_ERRLOG("%s: invalid descriptor %" PRIu16" (req_idx = %"PRIu16").\n",
				    vsession->name, req_idx, cnt);
			return -EINVAL;
		}

		len += desc->len;
		out_cnt += vhost_vring_packed_desc_is_wr(desc);

		/* A NULL desc means we have reached the last descriptor of this request */
		vhost_vring_packed_desc_get_next(&desc, &req_idx, vq, desc_table, desc_table_size);
		if (desc == NULL) {
			break;
		}
	}

	/*
	 * There must be at least two descriptors: the first contains the
	 * request, so it must be readable; the last contains the buffer for
	 * the response, so it must be writable.
	 */
	if (spdk_unlikely(out_cnt == 0 || cnt < 2)) {
		return -EINVAL;
	}

	*length = len;
	*iovs_cnt = cnt;

	return 0;
}

static void
blk_request_finish(bool success, struct spdk_vhost_blk_task *task)
{
	*task->status = success ? VIRTIO_BLK_S_OK : VIRTIO_BLK_S_IOERR;

	blk_task_enqueue(task);

	SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "Finished task (%p) req_idx=%d status: %s\n", task,
		      task->req_idx, success ? "OK" : "FAIL");
	blk_task_finish(task);
}

static void
blk_request_complete_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_vhost_blk_task *task = cb_arg;

	spdk_bdev_free_io(bdev_io);
	blk_request_finish(success, task);
}

static void
blk_request_resubmit(void *arg)
{
	struct spdk_vhost_blk_task *task = (struct spdk_vhost_blk_task *)arg;
	int rc = 0;

	blk_task_init(task);

	rc = process_blk_request(task, task->bvsession, task->vq);
	if (rc == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "====== Task %p resubmitted ======\n", task);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "====== Task %p failed ======\n", task);
	}
}

static inline void
blk_request_queue_io(struct spdk_vhost_blk_task *task)
{
	int rc;
	struct spdk_vhost_blk_session *bvsession = task->bvsession;
	struct spdk_bdev *bdev = bvsession->bvdev->bdev;

	task->bdev_io_wait.bdev = bdev;
	task->bdev_io_wait.cb_fn = blk_request_resubmit;
	task->bdev_io_wait.cb_arg = task;

	rc = spdk_bdev_queue_io_wait(bdev, bvsession->io_channel, &task->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("%s: failed to queue I/O, rc=%d\n", bvsession->vsession.name, rc);
		invalid_blk_request(task, VIRTIO_BLK_S_IOERR);
	}
}

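/*
 * Layout of a virtio-blk request on the ring (per the virtio spec):
 *
 *   iovs[0]                struct virtio_blk_outhdr (type, ioprio, sector),
 *                          device-readable
 *   iovs[1..iovcnt - 2]    data payload: device-readable for writes,
 *                          device-writable for reads
 *   iovs[iovcnt - 1]       one status byte, device-writable
 */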
static int
process_blk_request(struct spdk_vhost_blk_task *task,
		    struct spdk_vhost_blk_session *bvsession,
		    struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_blk_dev *bvdev = bvsession->bvdev;
	const struct virtio_blk_outhdr *req;
	struct virtio_blk_discard_write_zeroes *desc;
	struct iovec *iov;
	uint32_t type;
	uint32_t payload_len;
	uint64_t flush_bytes;
	int rc;

	if (vq->packed.packed_ring) {
		rc = blk_iovs_packed_queue_setup(bvsession, vq, task->req_idx, task->iovs, &task->iovcnt,
						 &payload_len);
	} else {
		rc = blk_iovs_split_queue_setup(bvsession, vq, task->req_idx, task->iovs, &task->iovcnt,
						&payload_len);
	}

	if (rc) {
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "Invalid request (req_idx = %"PRIu16").\n", task->req_idx);
		/* Only READ and WRITE are supported for now. */
		invalid_blk_request(task, VIRTIO_BLK_S_UNSUPP);
		return -1;
	}

	iov = &task->iovs[0];
	if (spdk_unlikely(iov->iov_len != sizeof(*req))) {
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK,
			      "First descriptor size is %zu but expected %zu (req_idx = %"PRIu16").\n",
			      iov->iov_len, sizeof(*req), task->req_idx);
		invalid_blk_request(task, VIRTIO_BLK_S_UNSUPP);
		return -1;
	}

	req = iov->iov_base;

	iov = &task->iovs[task->iovcnt - 1];
	if (spdk_unlikely(iov->iov_len != 1)) {
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK,
			      "Last descriptor size is %zu but expected %d (req_idx = %"PRIu16").\n",
			      iov->iov_len, 1, task->req_idx);
		invalid_blk_request(task, VIRTIO_BLK_S_UNSUPP);
		return -1;
	}

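	/*
	 * Exclude the request header and the trailing status byte from the
	 * payload accounting; iovs[0] and iovs[iovcnt - 1] never carry data.
	 */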
	task->status = iov->iov_base;
	payload_len -= sizeof(*req) + sizeof(*task->status);
	task->iovcnt -= 2;

	type = req->type;
#ifdef VIRTIO_BLK_T_BARRIER
	/* Don't care about barriers for now (as QEMU's virtio-blk does). */
	type &= ~VIRTIO_BLK_T_BARRIER;
#endif

	switch (type) {
	case VIRTIO_BLK_T_IN:
	case VIRTIO_BLK_T_OUT:
		if (spdk_unlikely(payload_len == 0 || (payload_len & (512 - 1)) != 0)) {
			SPDK_ERRLOG("%s - passed I/O buffer is not a multiple of 512 bytes (req_idx = %"PRIu16").\n",
				    type ? "WRITE" : "READ", task->req_idx);
			invalid_blk_request(task, VIRTIO_BLK_S_UNSUPP);
			return -1;
		}

		if (type == VIRTIO_BLK_T_IN) {
			task->used_len = payload_len + sizeof(*task->status);
			rc = spdk_bdev_readv(bvdev->bdev_desc, bvsession->io_channel,
					     &task->iovs[1], task->iovcnt, req->sector * 512,
					     payload_len, blk_request_complete_cb, task);
		} else if (!bvdev->readonly) {
			task->used_len = sizeof(*task->status);
			rc = spdk_bdev_writev(bvdev->bdev_desc, bvsession->io_channel,
					      &task->iovs[1], task->iovcnt, req->sector * 512,
					      payload_len, blk_request_complete_cb, task);
		} else {
			SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "Device is in read-only mode!\n");
			rc = -1;
		}

		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "No memory, queueing I/O.\n");
				blk_request_queue_io(task);
			} else {
				invalid_blk_request(task, VIRTIO_BLK_S_IOERR);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_DISCARD:
		desc = task->iovs[1].iov_base;
		if (payload_len != sizeof(*desc)) {
			SPDK_NOTICELOG("Invalid discard payload size: %u\n", payload_len);
			invalid_blk_request(task, VIRTIO_BLK_S_IOERR);
			return -1;
		}

		rc = spdk_bdev_unmap(bvdev->bdev_desc, bvsession->io_channel,
				     desc->sector * 512, desc->num_sectors * 512,
				     blk_request_complete_cb, task);
		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "No memory, queueing I/O.\n");
				blk_request_queue_io(task);
			} else {
				invalid_blk_request(task, VIRTIO_BLK_S_IOERR);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_WRITE_ZEROES:
		desc = task->iovs[1].iov_base;
		if (payload_len != sizeof(*desc)) {
			SPDK_NOTICELOG("Invalid write zeroes payload size: %u\n", payload_len);
			invalid_blk_request(task, VIRTIO_BLK_S_IOERR);
			return -1;
		}

		/* Zeroing and unmapping the range in one command is not supported by SPDK. */
		if (desc->flags & VIRTIO_BLK_WRITE_ZEROES_FLAG_UNMAP) {
			SPDK_NOTICELOG("Can't support Write Zeroes with Unmap flag\n");
			invalid_blk_request(task, VIRTIO_BLK_S_UNSUPP);
			return -1;
		}

		rc = spdk_bdev_write_zeroes(bvdev->bdev_desc, bvsession->io_channel,
					    desc->sector * 512, desc->num_sectors * 512,
					    blk_request_complete_cb, task);
		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "No memory, queueing I/O.\n");
				blk_request_queue_io(task);
			} else {
				invalid_blk_request(task, VIRTIO_BLK_S_IOERR);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_FLUSH:
		flush_bytes = spdk_bdev_get_num_blocks(bvdev->bdev) * spdk_bdev_get_block_size(bvdev->bdev);
		if (req->sector != 0) {
			SPDK_NOTICELOG("sector must be zero for flush command\n");
			invalid_blk_request(task, VIRTIO_BLK_S_IOERR);
			return -1;
		}
		rc = spdk_bdev_flush(bvdev->bdev_desc, bvsession->io_channel,
				     0, flush_bytes,
				     blk_request_complete_cb, task);
		if (rc) {
			if (rc == -ENOMEM) {
				SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "No memory, queueing I/O.\n");
				blk_request_queue_io(task);
			} else {
				invalid_blk_request(task, VIRTIO_BLK_S_IOERR);
				return -1;
			}
		}
		break;
	case VIRTIO_BLK_T_GET_ID:
		if (!task->iovcnt || !payload_len) {
			invalid_blk_request(task, VIRTIO_BLK_S_UNSUPP);
			return -1;
		}
		task->used_len = spdk_min((size_t)VIRTIO_BLK_ID_BYTES, task->iovs[1].iov_len);
		spdk_strcpy_pad(task->iovs[1].iov_base, spdk_bdev_get_product_name(bvdev->bdev),
				task->used_len, ' ');
		blk_request_finish(true, task);
		break;
	default:
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "Unsupported request type '%"PRIu32"'.\n", type);
		invalid_blk_request(task, VIRTIO_BLK_S_UNSUPP);
		return -1;
	}

	return 0;
}

static void
process_blk_task(struct spdk_vhost_virtqueue *vq, uint16_t req_idx)
{
	struct spdk_vhost_blk_task *task;
	uint16_t task_idx = req_idx, num_descs;

	if (vq->packed.packed_ring) {
		/* A packed ring uses the buffer_id as the task_idx to look up
		 * the task struct. The kernel driver sets the buffer_id from
		 * vq->free_head, so the value is always in the range 0 ~ vring.size
		 * and is unique among the outstanding requests.
		 * We can't use req_idx as the task_idx because a descriptor slot can
		 * be reused in the next wrap-around phase even if the request placed
		 * there in the previous phase has not completed. For example, at
		 * phase 0, last_used_idx was 2 and desc0 was not completed; after
		 * moving to phase 1, last_avail_idx is updated to 1. In this case
		 * req_idx cannot serve as task_idx, because task[0]->used is still
		 * true in phase 1.
		 * A split queue is quite different: a descriptor is inserted back
		 * into the free list only when the device completes the request, and
		 * the driver takes descriptors from the free list, which ensures
		 * that req_idx is unique among the outstanding requests.
		 */
		task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
	}

	task = &((struct spdk_vhost_blk_task *)vq->tasks)[task_idx];
	if (spdk_unlikely(task->used)) {
		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
			    task->bvsession->vsession.name, task_idx);
		task->used_len = 0;
		blk_task_enqueue(task);
		return;
	}

	if (vq->packed.packed_ring) {
		task->req_idx = req_idx;
		task->num_descs = num_descs;
		task->buffer_id = task_idx;
	}

	task->bvsession->vsession.task_cnt++;

	blk_task_init(task);

	if (process_blk_request(task, task->bvsession, vq) == 0) {
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "====== Task %p req_idx %d submitted ======\n", task,
			      task_idx);
	} else {
		SPDK_ERRLOG("====== Task %p req_idx %d failed ======\n", task, task_idx);
	}
}

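/*
 * Resubmit descriptors that were in flight when the previous target
 * instance stopped. The in-flight state lives in a memory region shared
 * with QEMU (negotiated via VHOST_USER_PROTOCOL_F_INFLIGHT_SHMFD), which
 * is what allows requests to survive an SPDK restart.
 */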
static void
submit_inflight_desc(struct spdk_vhost_blk_session *bvsession,
		     struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	spdk_vhost_resubmit_info *resubmit = vq->vring_inflight.resubmit_inflight;
	spdk_vhost_resubmit_desc *resubmit_list;
	uint16_t req_idx;

	if (spdk_likely(resubmit == NULL || resubmit->resubmit_list == NULL)) {
		return;
	}

	resubmit_list = resubmit->resubmit_list;
	while (resubmit->resubmit_num-- > 0) {
		req_idx = resubmit_list[resubmit->resubmit_num].index;
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "====== Start processing request idx %"PRIu16" ======\n",
			      req_idx);

		if (spdk_unlikely(req_idx >= vq->vring.size)) {
			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
				    vsession->name, req_idx, vq->vring.size);
			vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
			continue;
		}

		process_blk_task(vq, req_idx);
	}

	free(resubmit_list);
	resubmit->resubmit_list = NULL;
}

static void
process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	uint16_t reqs[SPDK_VHOST_VQ_MAX_SUBMISSIONS];
	uint16_t reqs_cnt, i;

	submit_inflight_desc(bvsession, vq);

	reqs_cnt = vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
	if (!reqs_cnt) {
		return;
	}

	for (i = 0; i < reqs_cnt; i++) {
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "====== Start processing request idx %"PRIu16" ======\n",
			      reqs[i]);

		if (spdk_unlikely(reqs[i] >= vq->vring.size)) {
			SPDK_ERRLOG("%s: request idx '%"PRIu16"' exceeds virtqueue size (%"PRIu16").\n",
				    vsession->name, reqs[i], vq->vring.size);
			vhost_vq_used_ring_enqueue(vsession, vq, reqs[i], 0);
			continue;
		}

		rte_vhost_set_inflight_desc_split(vsession->vid, vq->vring_idx, reqs[i]);

		process_blk_task(vq, reqs[i]);
	}
}

static void
process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	uint16_t i = 0;

	while (i++ < SPDK_VHOST_VQ_MAX_SUBMISSIONS &&
	       vhost_vq_packed_ring_is_avail(vq)) {
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK, "====== Start processing request idx %"PRIu16" ======\n",
			      vq->last_avail_idx);

		process_blk_task(vq, vq->last_avail_idx);
	}
}

static int
vdev_worker(void *arg)
{
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;

	uint16_t q_idx;
	bool packed_ring;

	/* In a session, every vq supports the same format */
	packed_ring = vsession->virtqueue[0].packed.packed_ring;
	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
		if (packed_ring) {
			process_packed_vq(bvsession, &vsession->virtqueue[q_idx]);
		} else {
			process_vq(bvsession, &vsession->virtqueue[q_idx]);
		}
	}

	vhost_session_used_signal(vsession);

	return SPDK_POLLER_BUSY;
}

static void
no_bdev_process_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct iovec iovs[SPDK_VHOST_IOVS_MAX];
	uint32_t length;
	uint16_t iovcnt, req_idx;

	if (vhost_vq_avail_ring_get(vq, &req_idx, 1) != 1) {
		return;
	}

	iovcnt = SPDK_COUNTOF(iovs);
	if (blk_iovs_split_queue_setup(bvsession, vq, req_idx, iovs, &iovcnt, &length) == 0) {
		*(volatile uint8_t *)iovs[iovcnt - 1].iov_base = VIRTIO_BLK_S_IOERR;
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK_DATA, "Aborting request %" PRIu16"\n", req_idx);
	}

	vhost_vq_used_ring_enqueue(vsession, vq, req_idx, 0);
}

static void
no_bdev_process_packed_vq(struct spdk_vhost_blk_session *bvsession, struct spdk_vhost_virtqueue *vq)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_blk_task *task;
	uint32_t length;
	uint16_t req_idx = vq->last_avail_idx;
	uint16_t task_idx, num_descs;

	if (!vhost_vq_packed_ring_is_avail(vq)) {
		return;
	}

	task_idx = vhost_vring_packed_desc_get_buffer_id(vq, req_idx, &num_descs);
	task = &((struct spdk_vhost_blk_task *)vq->tasks)[task_idx];
	if (spdk_unlikely(task->used)) {
		SPDK_ERRLOG("%s: request with idx '%"PRIu16"' is already pending.\n",
			    vsession->name, req_idx);
		vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
					     task->buffer_id, task->used_len);
		return;
	}

	task->req_idx = req_idx;
	task->num_descs = num_descs;
	task->buffer_id = task_idx;
	blk_task_init(task);

	if (blk_iovs_packed_queue_setup(bvsession, vq, task->req_idx, task->iovs, &task->iovcnt,
					&length)) {
		*(volatile uint8_t *)(task->iovs[task->iovcnt - 1].iov_base) = VIRTIO_BLK_S_IOERR;
		SPDK_DEBUGLOG(SPDK_LOG_VHOST_BLK_DATA, "Aborting request %" PRIu16"\n", req_idx);
	}

	task->used = false;
	vhost_vq_packed_ring_enqueue(vsession, vq, num_descs,
				     task->buffer_id, task->used_len);
}

static int
no_bdev_vdev_worker(void *arg)
{
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	uint16_t q_idx;
	bool packed_ring;

	/* In a session, every vq supports the same format */
	packed_ring = vsession->virtqueue[0].packed.packed_ring;
	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
		if (packed_ring) {
			no_bdev_process_packed_vq(bvsession, &vsession->virtqueue[q_idx]);
		} else {
			no_bdev_process_vq(bvsession, &vsession->virtqueue[q_idx]);
		}
	}

	vhost_session_used_signal(vsession);

	if (vsession->task_cnt == 0 && bvsession->io_channel) {
		spdk_put_io_channel(bvsession->io_channel);
		bvsession->io_channel = NULL;
	}

	return SPDK_POLLER_BUSY;
}

static struct spdk_vhost_blk_session *
to_blk_session(struct spdk_vhost_session *vsession)
{
	assert(vsession->vdev->backend == &vhost_blk_device_backend);
	return (struct spdk_vhost_blk_session *)vsession;
}

static struct spdk_vhost_blk_dev *
to_blk_dev(struct spdk_vhost_dev *vdev)
{
	if (vdev == NULL) {
		return NULL;
	}

	if (vdev->backend != &vhost_blk_device_backend) {
		SPDK_ERRLOG("%s: not a vhost-blk device\n", vdev->name);
		return NULL;
	}

	return SPDK_CONTAINEROF(vdev, struct spdk_vhost_blk_dev, vdev);
}

static void
vhost_dev_bdev_remove_cpl_cb(struct spdk_vhost_dev *vdev, void *ctx)
{
	/* All sessions have been notified, time to close the bdev */
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);

	assert(bvdev != NULL);
	spdk_put_io_channel(bvdev->dummy_io_channel);
	spdk_bdev_close(bvdev->bdev_desc);
	bvdev->bdev_desc = NULL;
	bvdev->bdev = NULL;
}

static int
vhost_session_bdev_remove_cb(struct spdk_vhost_dev *vdev,
			     struct spdk_vhost_session *vsession,
			     void *ctx)
{
	struct spdk_vhost_blk_session *bvsession;

	bvsession = (struct spdk_vhost_blk_session *)vsession;
	if (bvsession->requestq_poller) {
		spdk_poller_unregister(&bvsession->requestq_poller);
		bvsession->requestq_poller = SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
	}

	return 0;
}

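/*
 * bdev hot-removal flow: each session's request poller is first swapped for
 * no_bdev_vdev_worker(), so new requests fail with VIRTIO_BLK_S_IOERR
 * instead of touching the bdev; once every session has been notified, the
 * completion callback above closes the bdev descriptor.
 */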
static void
bdev_remove_cb(void *remove_ctx)
{
	struct spdk_vhost_blk_dev *bvdev = remove_ctx;

	SPDK_WARNLOG("%s: hot-removing bdev - all further requests will fail.\n",
		     bvdev->vdev.name);

	spdk_vhost_lock();
	vhost_dev_foreach_session(&bvdev->vdev, vhost_session_bdev_remove_cb,
				  vhost_dev_bdev_remove_cpl_cb, NULL);
	spdk_vhost_unlock();
}
static void
free_task_pool(struct spdk_vhost_blk_session *bvsession)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_virtqueue *vq;
	uint16_t i;

	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
		if (vq->tasks == NULL) {
			continue;
		}

		spdk_free(vq->tasks);
		vq->tasks = NULL;
	}
}

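/*
 * Allocate one task per ring entry for every initialized virtqueue. The
 * arrays come from spdk_zmalloc() with SPDK_MALLOC_DMA, i.e. pinned,
 * cache-line aligned memory.
 */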
static int
alloc_task_pool(struct spdk_vhost_blk_session *bvsession)
{
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	struct spdk_vhost_virtqueue *vq;
	struct spdk_vhost_blk_task *task;
	uint32_t task_cnt;
	uint16_t i;
	uint32_t j;

	for (i = 0; i < vsession->max_queues; i++) {
		vq = &vsession->virtqueue[i];
		if (vq->vring.desc == NULL) {
			continue;
		}

		task_cnt = vq->vring.size;
		if (task_cnt > SPDK_VHOST_MAX_VQ_SIZE) {
			/* sanity check */
			SPDK_ERRLOG("%s: virtqueue %"PRIu16" is too big. (size = %"PRIu32", max = %"PRIu32")\n",
				    vsession->name, i, task_cnt, SPDK_VHOST_MAX_VQ_SIZE);
			free_task_pool(bvsession);
			return -1;
		}
		vq->tasks = spdk_zmalloc(sizeof(struct spdk_vhost_blk_task) * task_cnt,
					 SPDK_CACHE_LINE_SIZE, NULL,
					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (vq->tasks == NULL) {
			SPDK_ERRLOG("%s: failed to allocate %"PRIu32" tasks for virtqueue %"PRIu16"\n",
				    vsession->name, task_cnt, i);
			free_task_pool(bvsession);
			return -1;
		}

		for (j = 0; j < task_cnt; j++) {
			task = &((struct spdk_vhost_blk_task *)vq->tasks)[j];
			task->bvsession = bvsession;
			task->req_idx = j;
			task->vq = vq;
		}
	}

	return 0;
}

static int
vhost_blk_start_cb(struct spdk_vhost_dev *vdev,
		   struct spdk_vhost_session *vsession, void *unused)
{
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
	struct spdk_vhost_blk_dev *bvdev;
	int i, rc = 0;

	bvdev = to_blk_dev(vdev);
	assert(bvdev != NULL);
	bvsession->bvdev = bvdev;

	/* validate all I/O queues are in a contiguous index range */
	for (i = 0; i < vsession->max_queues; i++) {
		/* vring.desc and vring.desc_packed are in a union struct
		 * so q->vring.desc can replace q->vring.desc_packed.
		 */
		if (vsession->virtqueue[i].vring.desc == NULL) {
			SPDK_ERRLOG("%s: queue %d is empty\n", vsession->name, i);
			rc = -1;
			goto out;
		}
	}

	rc = alloc_task_pool(bvsession);
	if (rc != 0) {
		SPDK_ERRLOG("%s: failed to alloc task pool.\n", vsession->name);
		goto out;
	}

	if (bvdev->bdev) {
		bvsession->io_channel = spdk_bdev_get_io_channel(bvdev->bdev_desc);
		if (!bvsession->io_channel) {
			free_task_pool(bvsession);
			SPDK_ERRLOG("%s: I/O channel allocation failed\n", vsession->name);
			rc = -1;
			goto out;
		}
	}

	bvsession->requestq_poller = SPDK_POLLER_REGISTER(bvdev->bdev ? vdev_worker : no_bdev_vdev_worker,
				     bvsession, 0);
	SPDK_INFOLOG(SPDK_LOG_VHOST, "%s: started poller on lcore %d\n",
		     vsession->name, spdk_env_get_current_core());
out:
	vhost_session_start_done(vsession, rc);
	return rc;
}

static int
vhost_blk_start(struct spdk_vhost_session *vsession)
{
	return vhost_session_send_event(vsession, vhost_blk_start_cb,
					3, "start session");
}

static int
destroy_session_poller_cb(void *arg)
{
	struct spdk_vhost_blk_session *bvsession = arg;
	struct spdk_vhost_session *vsession = &bvsession->vsession;
	int i;

	if (vsession->task_cnt > 0) {
		return SPDK_POLLER_BUSY;
	}

	if (spdk_vhost_trylock() != 0) {
		return SPDK_POLLER_BUSY;
	}

	for (i = 0; i < vsession->max_queues; i++) {
		vsession->virtqueue[i].next_event_time = 0;
		vhost_vq_used_signal(vsession, &vsession->virtqueue[i]);
	}

	SPDK_INFOLOG(SPDK_LOG_VHOST, "%s: stopping poller on lcore %d\n",
		     vsession->name, spdk_env_get_current_core());

	if (bvsession->io_channel) {
		spdk_put_io_channel(bvsession->io_channel);
		bvsession->io_channel = NULL;
	}

	free_task_pool(bvsession);
	spdk_poller_unregister(&bvsession->stop_poller);
	vhost_session_stop_done(vsession, 0);

	spdk_vhost_unlock();
	return SPDK_POLLER_BUSY;
}

static int
vhost_blk_stop_cb(struct spdk_vhost_dev *vdev,
		  struct spdk_vhost_session *vsession, void *unused)
{
	struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);

	spdk_poller_unregister(&bvsession->requestq_poller);
	bvsession->stop_poller = SPDK_POLLER_REGISTER(destroy_session_poller_cb,
				 bvsession, 1000);
	return 0;
}

static int
vhost_blk_stop(struct spdk_vhost_session *vsession)
{
	return vhost_session_send_event(vsession, vhost_blk_stop_cb,
					3, "stop session");
}

static void
vhost_blk_dump_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
{
	struct spdk_vhost_blk_dev *bvdev;

	bvdev = to_blk_dev(vdev);
	assert(bvdev != NULL);

	spdk_json_write_named_object_begin(w, "block");

	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);

	spdk_json_write_name(w, "bdev");
	if (bvdev->bdev) {
		spdk_json_write_string(w, spdk_bdev_get_name(bvdev->bdev));
	} else {
		spdk_json_write_null(w);
	}

	spdk_json_write_object_end(w);
}

static void
vhost_blk_write_config_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
{
	struct spdk_vhost_blk_dev *bvdev;

	bvdev = to_blk_dev(vdev);
	assert(bvdev != NULL);

	if (!bvdev->bdev) {
		return;
	}

	spdk_json_write_object_begin(w);
	spdk_json_write_named_string(w, "method", "vhost_create_blk_controller");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "ctrlr", vdev->name);
	spdk_json_write_named_string(w, "dev_name", spdk_bdev_get_name(bvdev->bdev));
	spdk_json_write_named_string(w, "cpumask",
				     spdk_cpuset_fmt(spdk_thread_get_cpumask(vdev->thread)));
	spdk_json_write_named_bool(w, "readonly", bvdev->readonly);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}

static int vhost_blk_destroy(struct spdk_vhost_dev *dev);

static int
vhost_blk_get_config(struct spdk_vhost_dev *vdev, uint8_t *config,
		     uint32_t len)
{
	struct virtio_blk_config blkcfg;
	struct spdk_vhost_blk_dev *bvdev;
	struct spdk_bdev *bdev;
	uint32_t blk_size;
	uint64_t blkcnt;

	memset(&blkcfg, 0, sizeof(blkcfg));
	bvdev = to_blk_dev(vdev);
	assert(bvdev != NULL);
	bdev = bvdev->bdev;
	if (bdev == NULL) {
		/* We can't just return -1 here as this GET_CONFIG message might
		 * be caused by a QEMU VM reboot. Returning -1 will indicate an
		 * error to QEMU, which might then decide to terminate itself.
		 * We don't want that. A simple reboot shouldn't break the system.
		 *
		 * Presenting a block device with block size 0 and block count 0
		 * doesn't cause any problems on the QEMU side and the virtio-pci
		 * device is even still available inside the VM, but there will
		 * be no block device created for it - the kernel drivers will
		 * silently reject it.
		 */
		blk_size = 0;
		blkcnt = 0;
	} else {
		blk_size = spdk_bdev_get_block_size(bdev);
		blkcnt = spdk_bdev_get_num_blocks(bdev);
		if (spdk_bdev_get_buf_align(bdev) > 1) {
			blkcfg.size_max = SPDK_BDEV_LARGE_BUF_MAX_SIZE;
			blkcfg.seg_max = spdk_min(SPDK_VHOST_IOVS_MAX - 2 - 1, BDEV_IO_NUM_CHILD_IOV - 2 - 1);
		} else {
			blkcfg.size_max = 131072;
			/* -2 for REQ and RESP and -1 for region boundary splitting */
			blkcfg.seg_max = SPDK_VHOST_IOVS_MAX - 2 - 1;
		}
	}

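	/*
	 * Worked example with illustrative numbers: an 8 GiB bdev with a
	 * 4096-byte block size has blkcnt = 2097152, so the reported
	 * capacity is 2097152 * 4096 / 512 = 16777216 virtio sectors.
	 */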
	blkcfg.blk_size = blk_size;
	/* minimum I/O size in blocks */
	blkcfg.min_io_size = 1;
	/* expressed in 512 Bytes sectors */
	blkcfg.capacity = (blkcnt * blk_size) / 512;
	/* QEMU can overwrite this value when started */
	blkcfg.num_queues = SPDK_VHOST_MAX_VQUEUES;

	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		/* 16MiB, expressed in 512 Bytes */
		blkcfg.max_discard_sectors = 32768;
		blkcfg.max_discard_seg = 1;
		blkcfg.discard_sector_alignment = blk_size / 512;
	}
	if (bdev && spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
		blkcfg.max_write_zeroes_sectors = 32768;
		blkcfg.max_write_zeroes_seg = 1;
	}

	memcpy(config, &blkcfg, spdk_min(len, sizeof(blkcfg)));

	return 0;
}

static const struct spdk_vhost_dev_backend vhost_blk_device_backend = {
	.session_ctx_size = sizeof(struct spdk_vhost_blk_session) - sizeof(struct spdk_vhost_session),
	.start_session = vhost_blk_start,
	.stop_session = vhost_blk_stop,
	.vhost_get_config = vhost_blk_get_config,
	.dump_info_json = vhost_blk_dump_info_json,
	.write_config_json = vhost_blk_write_config_json,
	.remove_device = vhost_blk_destroy,
};

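/*
 * Legacy INI configuration example; the controller name, bdev name and
 * cpumask below are hypothetical:
 *
 *   [VhostBlk0]
 *     Name vhost.0
 *     Dev Malloc0
 *     Cpumask 0x1
 *     ReadOnly false
 *     PackedRing false
 */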
int
vhost_blk_controller_construct(void)
{
	struct spdk_conf_section *sp;
	unsigned ctrlr_num;
	char *bdev_name;
	char *cpumask;
	char *name;
	bool readonly;
	bool packed_ring;

	for (sp = spdk_conf_first_section(NULL); sp != NULL; sp = spdk_conf_next_section(sp)) {
		if (!spdk_conf_section_match_prefix(sp, "VhostBlk")) {
			continue;
		}

		if (sscanf(spdk_conf_section_get_name(sp), "VhostBlk%u", &ctrlr_num) != 1) {
			SPDK_ERRLOG("Section '%s' has non-numeric suffix.\n",
				    spdk_conf_section_get_name(sp));
			return -1;
		}

		name = spdk_conf_section_get_val(sp, "Name");
		if (name == NULL) {
			SPDK_ERRLOG("VhostBlk%u: missing Name\n", ctrlr_num);
			return -1;
		}

		cpumask = spdk_conf_section_get_val(sp, "Cpumask");
		readonly = spdk_conf_section_get_boolval(sp, "ReadOnly", false);
		packed_ring = spdk_conf_section_get_boolval(sp, "PackedRing", false);

		bdev_name = spdk_conf_section_get_val(sp, "Dev");
		if (bdev_name == NULL) {
			continue;
		}

		if (spdk_vhost_blk_construct(name, cpumask, bdev_name,
					     readonly, packed_ring) < 0) {
			return -1;
		}
	}

	return 0;
}

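/*
 * Also exposed through the vhost_create_blk_controller RPC (see
 * vhost_blk_write_config_json() above). An illustrative invocation, with
 * hypothetical controller and bdev names:
 *
 *   scripts/rpc.py vhost_create_blk_controller --cpumask 0x1 vhost.0 Malloc0
 */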
int
spdk_vhost_blk_construct(const char *name, const char *cpumask, const char *dev_name,
			 bool readonly, bool packed_ring)
{
	struct spdk_vhost_blk_dev *bvdev = NULL;
	struct spdk_vhost_dev *vdev;
	struct spdk_bdev *bdev;
	int ret = 0;

	spdk_vhost_lock();
	bdev = spdk_bdev_get_by_name(dev_name);
	if (bdev == NULL) {
		SPDK_ERRLOG("%s: bdev '%s' not found\n",
			    name, dev_name);
		ret = -ENODEV;
		goto out;
	}

	bvdev = calloc(1, sizeof(*bvdev));
	if (bvdev == NULL) {
		ret = -ENOMEM;
		goto out;
	}

	vdev = &bvdev->vdev;
	vdev->virtio_features = SPDK_VHOST_BLK_FEATURES_BASE;
	vdev->disabled_features = SPDK_VHOST_BLK_DISABLED_FEATURES;
	vdev->protocol_features = SPDK_VHOST_BLK_PROTOCOL_FEATURES;

	vdev->virtio_features |= (uint64_t)packed_ring << VIRTIO_F_RING_PACKED;

	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_DISCARD);
	}
	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_WRITE_ZEROES);
	}
	if (readonly) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_RO);
	}
	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
		vdev->virtio_features |= (1ULL << VIRTIO_BLK_F_FLUSH);
	}

	ret = spdk_bdev_open(bdev, true, bdev_remove_cb, bvdev, &bvdev->bdev_desc);
	if (ret != 0) {
		SPDK_ERRLOG("%s: could not open bdev '%s', error=%d\n",
			    name, dev_name, ret);
		goto out;
	}

	/*
	 * When QEMU is started with vhost-user-blk multiqueue, the vhost
	 * device is started and stopped multiple times, depending on the
	 * number of queues, because the vhost-user backend doesn't know the
	 * exact number of queues used by the device up front. The target has
	 * to stop and restart the device each time it gets a valid I/O queue.
	 * Every such stop/start cycle would delete and recreate the backend
	 * bdev io_device. Hold an extra bdev reference in struct
	 * spdk_vhost_blk_dev so that the io_device is not deleted in between.
	 */
	bvdev->dummy_io_channel = spdk_bdev_get_io_channel(bvdev->bdev_desc);

	bvdev->bdev = bdev;
	bvdev->readonly = readonly;
	ret = vhost_dev_register(vdev, name, cpumask, &vhost_blk_device_backend);
	if (ret != 0) {
		spdk_put_io_channel(bvdev->dummy_io_channel);
		spdk_bdev_close(bvdev->bdev_desc);
		goto out;
	}

	SPDK_INFOLOG(SPDK_LOG_VHOST, "%s: using bdev '%s'\n", name, dev_name);
out:
	if (ret != 0 && bvdev) {
		free(bvdev);
	}
	spdk_vhost_unlock();
	return ret;
}

static int
vhost_blk_destroy(struct spdk_vhost_dev *vdev)
{
	struct spdk_vhost_blk_dev *bvdev = to_blk_dev(vdev);
	int rc;

	assert(bvdev != NULL);

	rc = vhost_dev_unregister(&bvdev->vdev);
	if (rc != 0) {
		return rc;
	}

	/* If the bdev has already been removed, there is no need to call spdk_put_io_channel(). */
	if (bvdev->bdev) {
		spdk_put_io_channel(bvdev->dummy_io_channel);
	}

	if (bvdev->bdev_desc) {
		spdk_bdev_close(bvdev->bdev_desc);
		bvdev->bdev_desc = NULL;
	}
	bvdev->bdev = NULL;

	free(bvdev);
	return 0;
}

SPDK_LOG_REGISTER_COMPONENT("vhost_blk", SPDK_LOG_VHOST_BLK)
SPDK_LOG_REGISTER_COMPONENT("vhost_blk_data", SPDK_LOG_VHOST_BLK_DATA)