xref: /spdk/lib/virtio/virtio.c (revision 1fc4165fe9bf8512483356ad8e6d27f793f2e3db)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include <linux/virtio_scsi.h>
37 #include <linux/virtio_pci.h>
38 #include <linux/virtio_config.h>
39 
40 #include <rte_config.h>
41 #include <rte_memcpy.h>
42 #include <rte_string_fns.h>
43 #include <rte_memzone.h>
44 #include <rte_malloc.h>
45 #include <rte_atomic.h>
46 #include <rte_branch_prediction.h>
47 #include <rte_pci.h>
48 #include <rte_common.h>
49 #include <rte_errno.h>
50 
51 #include <rte_eal.h>
52 #include <rte_dev.h>
53 #include <rte_prefetch.h>
54 
55 #include "spdk/env.h"
56 #include "spdk/barrier.h"
57 
58 #include "spdk_internal/virtio.h"
59 
60 /* We use SMP memory barrier variants as all virtio_pci devices
61  * are purely virtual. All MMIO is executed on a CPU core, so
62  * there's no need to do full MMIO synchronization.
63  */
64 #define virtio_mb()	spdk_smp_mb()
65 #define virtio_rmb()	spdk_smp_rmb()
66 #define virtio_wmb()	spdk_smp_wmb()
67 
68 /* Chain all the descriptors in the ring with an END */
69 static inline void
70 vring_desc_init(struct vring_desc *dp, uint16_t n)
71 {
72 	uint16_t i;
73 
74 	for (i = 0; i < n - 1; i++) {
75 		dp[i].next = (uint16_t)(i + 1);
76 	}
77 	dp[i].next = VQ_RING_DESC_CHAIN_END;
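	/* Example: with n == 4 this builds the free chain
	 * 0 -> 1 -> 2 -> 3 -> VQ_RING_DESC_CHAIN_END.
	 */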
78 }
79 
80 static void
81 virtio_init_vring(struct virtqueue *vq)
82 {
83 	int size = vq->vq_nentries;
84 	struct vring *vr = &vq->vq_ring;
85 	uint8_t *ring_mem = vq->vq_ring_virt_mem;
86 
87 	/*
88 	 * Reinitialize, since the virtio device might have been stopped and restarted.
89 	 */
90 	memset(ring_mem, 0, vq->vq_ring_size);
91 	vring_init(vr, size, ring_mem, VIRTIO_PCI_VRING_ALIGN);
92 	vq->vq_used_cons_idx = 0;
93 	vq->vq_desc_head_idx = 0;
94 	vq->vq_avail_idx = 0;
95 	vq->vq_desc_tail_idx = (uint16_t)(vq->vq_nentries - 1);
96 	vq->vq_free_cnt = vq->vq_nentries;
97 	vq->req_start = VQ_RING_DESC_CHAIN_END;
98 	vq->req_end = VQ_RING_DESC_CHAIN_END;
99 	vq->reqs_finished = 0;
100 	memset(vq->vq_descx, 0, sizeof(struct vq_desc_extra) * vq->vq_nentries);
101 
102 	vring_desc_init(vr->desc, size);
103 
104 	/* Tell the backend not to interrupt us.
105 	 * If F_EVENT_IDX is negotiated, we will always set the used event idx
106 	 * to a value the device never reaches, so we practically never
107 	 * receive an interrupt. See virtqueue_req_flush().
108 	 */
109 	if (vq->vdev->negotiated_features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
110 		vring_used_event(&vq->vq_ring) = UINT16_MAX;
111 	} else {
112 		vq->vq_ring.avail->flags |= VRING_AVAIL_F_NO_INTERRUPT;
113 	}
114 }
115 
116 static int
117 virtio_init_queue(struct virtio_dev *dev, uint16_t vtpci_queue_idx)
118 {
119 	unsigned int vq_size, size;
120 	struct virtqueue *vq;
121 	int rc;
122 
123 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "setting up queue: %"PRIu16"\n", vtpci_queue_idx);
124 
125 	/*
126 	 * Read the virtqueue size from the Queue Size field.
127 	 * It is always a power of 2; a size of 0 means the virtqueue does not exist.
128 	 */
129 	vq_size = virtio_dev_backend_ops(dev)->get_queue_size(dev, vtpci_queue_idx);
130 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vq_size: %u\n", vq_size);
131 	if (vq_size == 0) {
132 		SPDK_ERRLOG("virtqueue %"PRIu16" does not exist\n", vtpci_queue_idx);
133 		return -EINVAL;
134 	}
135 
136 	if (!rte_is_power_of_2(vq_size)) {
137 		SPDK_ERRLOG("virtqueue %"PRIu16" size (%u) is not a power of 2\n",
138 			    vtpci_queue_idx, vq_size);
139 		return -EINVAL;
140 	}
141 
142 	size = RTE_ALIGN_CEIL(sizeof(*vq) +
143 			      vq_size * sizeof(struct vq_desc_extra),
144 			      RTE_CACHE_LINE_SIZE);
145 
146 	vq = spdk_dma_zmalloc(size, RTE_CACHE_LINE_SIZE, NULL);
147 	if (vq == NULL) {
148 		SPDK_ERRLOG("can not allocate vq\n");
149 		return -ENOMEM;
150 	}
151 	dev->vqs[vtpci_queue_idx] = vq;
152 
153 	vq->vdev = dev;
154 	vq->vq_queue_index = vtpci_queue_idx;
155 	vq->vq_nentries = vq_size;
156 
157 	/*
158 	 * Reserve a memzone for vring elements
159 	 */
160 	size = vring_size(vq_size, VIRTIO_PCI_VRING_ALIGN);
161 	vq->vq_ring_size = RTE_ALIGN_CEIL(size, VIRTIO_PCI_VRING_ALIGN);
162 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vring_size: %u, rounded_vring_size: %u\n",
163 		      size, vq->vq_ring_size);
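	/* Worked example (assuming the standard split-ring layout and the
	 * 4096-byte VIRTIO_PCI_VRING_ALIGN): for vq_size == 128 the descriptor
	 * table needs 128 * 16 = 2048 bytes and the avail ring 2 * (128 + 3) =
	 * 262 bytes, which round up to 4096; the used ring needs 8 * 128 + 6 =
	 * 1030 bytes, so vring_size() returns 5126 and vq_ring_size becomes 8192.
	 */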
164 
165 	vq->owner_thread = NULL;
166 
167 	rc = virtio_dev_backend_ops(dev)->setup_queue(dev, vq);
168 	if (rc < 0) {
169 		SPDK_ERRLOG("setup_queue failed\n");
170 		spdk_dma_free(vq);
171 		dev->vqs[vtpci_queue_idx] = NULL;
172 		return rc;
173 	}
174 
175 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vq->vq_ring_mem:      0x%" PRIx64 "\n",
176 		      vq->vq_ring_mem);
177 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vq->vq_ring_virt_mem: 0x%" PRIx64 "\n",
178 		      (uint64_t)(uintptr_t)vq->vq_ring_virt_mem);
179 
180 	virtio_init_vring(vq);
181 	return 0;
182 }
183 
184 static void
185 virtio_free_queues(struct virtio_dev *dev)
186 {
187 	uint16_t nr_vq = dev->max_queues;
188 	struct virtqueue *vq;
189 	uint16_t i;
190 
191 	if (dev->vqs == NULL) {
192 		return;
193 	}
194 
195 	for (i = 0; i < nr_vq; i++) {
196 		vq = dev->vqs[i];
197 		if (!vq) {
198 			continue;
199 		}
200 
201 		virtio_dev_backend_ops(dev)->del_queue(dev, vq);
202 
203 		spdk_dma_free(vq);
204 		dev->vqs[i] = NULL;
205 	}
206 
207 	rte_free(dev->vqs);
208 	dev->vqs = NULL;
209 }
210 
211 static int
212 virtio_alloc_queues(struct virtio_dev *dev, uint16_t request_vq_num, uint16_t fixed_vq_num)
213 {
214 	uint16_t nr_vq;
215 	uint16_t i;
216 	int ret;
217 
218 	nr_vq = request_vq_num + fixed_vq_num;
219 	if (nr_vq == 0) {
220 		/* perfectly fine to have a device with no virtqueues. */
221 		return 0;
222 	}
223 
224 	assert(dev->vqs == NULL);
225 	dev->vqs = rte_zmalloc(NULL, sizeof(struct virtqueue *) * nr_vq, 0);
226 	if (!dev->vqs) {
227 		SPDK_ERRLOG("failed to allocate %"PRIu16" vqs\n", nr_vq);
228 		return -ENOMEM;
229 	}
230 
231 	for (i = 0; i < nr_vq; i++) {
232 		ret = virtio_init_queue(dev, i);
233 		if (ret < 0) {
234 			virtio_free_queues(dev);
235 			return ret;
236 		}
237 	}
238 
239 	dev->max_queues = nr_vq;
240 	dev->fixed_queues_num = fixed_vq_num;
241 	return 0;
242 }
243 
244 /**
245  * Negotiate virtio features. For virtio_user this will also set
246  * dev->modern flag if VIRTIO_F_VERSION_1 flag is negotiated.
247  */
248 static int
249 virtio_negotiate_features(struct virtio_dev *dev, uint64_t req_features)
250 {
251 	uint64_t host_features = virtio_dev_backend_ops(dev)->get_features(dev);
252 	int rc;
253 
254 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "guest features = %" PRIx64 "\n", req_features);
255 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "device features = %" PRIx64 "\n", host_features);
256 
257 	rc = virtio_dev_backend_ops(dev)->set_features(dev, req_features & host_features);
258 	if (rc != 0) {
259 		SPDK_ERRLOG("failed to negotiate device features.\n");
260 		return rc;
261 	}
262 
263 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "negotiated features = %" PRIx64 "\n",
264 		      dev->negotiated_features);
265 
266 	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_FEATURES_OK);
267 	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_FEATURES_OK)) {
268 		SPDK_ERRLOG("failed to set FEATURES_OK status!\n");
269 		/* Either the device failed, or we requested features that
270 		 * depend on other features we did not offer.
271 		 */
272 		return -EINVAL;
273 	}
274 
275 	return 0;
276 }
277 
278 int
279 virtio_dev_construct(struct virtio_dev *vdev, const char *name,
280 		     const struct virtio_dev_ops *ops, void *ctx)
281 {
282 	int rc;
283 
284 	vdev->name = strdup(name);
285 	if (vdev->name == NULL) {
286 		return -ENOMEM;
287 	}
288 
289 	rc = pthread_mutex_init(&vdev->mutex, NULL);
290 	if (rc != 0) {
291 		free(vdev->name);
292 		return -rc;
293 	}
294 
295 	vdev->backend_ops = ops;
296 	vdev->ctx = ctx;
297 
298 	return 0;
299 }
300 
301 int
302 virtio_dev_reset(struct virtio_dev *dev, uint64_t req_features)
303 {
304 	req_features |= (1ULL << VIRTIO_F_VERSION_1);
305 
306 	virtio_dev_stop(dev);
307 
308 	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_ACKNOWLEDGE);
309 	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_ACKNOWLEDGE)) {
310 		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_ACKNOWLEDGE status.\n");
311 		return -EIO;
312 	}
313 
314 	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_DRIVER);
315 	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_DRIVER)) {
316 		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_DRIVER status.\n");
317 		return -EIO;
318 	}
319 
320 	return virtio_negotiate_features(dev, req_features);
321 }
322 
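/* Typical bring-up sequence (illustrative sketch, not part of this file; the
 * feature bit and queue counts are placeholders):
 *
 *	rc = virtio_dev_construct(vdev, "example", ops, ctx);
 *	if (rc == 0) {
 *		rc = virtio_dev_reset(vdev, 1ULL << VIRTIO_RING_F_EVENT_IDX);
 *	}
 *	if (rc == 0) {
 *		rc = virtio_dev_start(vdev, 2, 0);
 *	}
 *
 * virtio_dev_reset() requests VIRTIO_F_VERSION_1 implicitly; the last call
 * allocates two request queues and no fixed (e.g. control/event) queues.
 */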
323 int
324 virtio_dev_start(struct virtio_dev *vdev, uint16_t max_queues, uint16_t fixed_queue_num)
325 {
326 	int ret;
327 
328 	ret = virtio_alloc_queues(vdev, max_queues, fixed_queue_num);
329 	if (ret < 0) {
330 		return ret;
331 	}
332 
333 	virtio_dev_set_status(vdev, VIRTIO_CONFIG_S_DRIVER_OK);
334 	if (!(virtio_dev_get_status(vdev) & VIRTIO_CONFIG_S_DRIVER_OK)) {
335 		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_DRIVER_OK status.\n");
336 		return -1;
337 	}
338 
339 	return 0;
340 }
341 
342 void
343 virtio_dev_destruct(struct virtio_dev *dev)
344 {
345 	virtio_dev_backend_ops(dev)->destruct_dev(dev);
346 	pthread_mutex_destroy(&dev->mutex);
347 	free(dev->name);
348 }
349 
350 static void
351 vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx)
352 {
353 	struct vring_desc *dp, *dp_tail;
354 	struct vq_desc_extra *dxp;
355 	uint16_t desc_idx_last = desc_idx;
356 
357 	dp  = &vq->vq_ring.desc[desc_idx];
358 	dxp = &vq->vq_descx[desc_idx];
359 	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt + dxp->ndescs);
360 	if ((dp->flags & VRING_DESC_F_INDIRECT) == 0) {
361 		while (dp->flags & VRING_DESC_F_NEXT) {
362 			desc_idx_last = dp->next;
363 			dp = &vq->vq_ring.desc[dp->next];
364 		}
365 	}
366 	dxp->ndescs = 0;
367 
368 	/*
369 	 * Append the newly freed chain to the end of the existing free chain,
370 	 * if any. If the virtqueue was completely used, the tail index is
371 	 * VQ_RING_DESC_CHAIN_END and the freed chain becomes the new head.
372 	 */
373 	if (vq->vq_desc_tail_idx == VQ_RING_DESC_CHAIN_END) {
374 		vq->vq_desc_head_idx = desc_idx;
375 	} else {
376 		dp_tail = &vq->vq_ring.desc[vq->vq_desc_tail_idx];
377 		dp_tail->next = desc_idx;
378 	}
379 
380 	vq->vq_desc_tail_idx = desc_idx_last;
381 	dp->next = VQ_RING_DESC_CHAIN_END;
382 }
383 
384 static uint16_t
385 virtqueue_dequeue_burst_rx(struct virtqueue *vq, void **rx_pkts,
386 			   uint32_t *len, uint16_t num)
387 {
388 	struct vring_used_elem *uep;
389 	struct virtio_req *cookie;
390 	uint16_t used_idx, desc_idx;
391 	uint16_t i;
392 
393 	/* The caller ensures num does not exceed the count of used entries. */
394 	for (i = 0; i < num ; i++) {
395 		used_idx = (uint16_t)(vq->vq_used_cons_idx & (vq->vq_nentries - 1));
396 		uep = &vq->vq_ring.used->ring[used_idx];
397 		desc_idx = (uint16_t) uep->id;
398 		len[i] = uep->len;
399 		cookie = (struct virtio_req *)vq->vq_descx[desc_idx].cookie;
400 
401 		if (spdk_unlikely(cookie == NULL)) {
402 			SPDK_WARNLOG("vring descriptor with no associated cookie at %"PRIu16"\n",
403 				     vq->vq_used_cons_idx);
404 			break;
405 		}
406 
407 		rte_prefetch0(cookie);
408 		rx_pkts[i]  = cookie;
409 		vq->vq_used_cons_idx++;
410 		vq_ring_free_chain(vq, desc_idx);
411 		vq->vq_descx[desc_idx].cookie = NULL;
412 	}
413 
414 	return i;
415 }
416 
417 static void
418 finish_req(struct virtqueue *vq)
419 {
420 	struct vring_desc *desc;
421 	uint16_t avail_idx;
422 
423 	desc = &vq->vq_ring.desc[vq->req_end];
424 	desc->flags &= ~VRING_DESC_F_NEXT;
425 
426 	/*
427 	 * Place the head of the descriptor chain into the next slot and make
428 	 * it usable to the host. The chain is made available now rather than
429 	 * deferring to virtqueue_req_flush() in the hopes that if the host is
430 	 * currently running on another CPU, we can keep it processing the new
431 	 * descriptor.
432 	 */
433 	avail_idx = (uint16_t)(vq->vq_avail_idx & (vq->vq_nentries - 1));
434 	vq->vq_ring.avail->ring[avail_idx] = vq->req_start;
435 	vq->vq_avail_idx++;
436 	vq->req_end = VQ_RING_DESC_CHAIN_END;
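	/* Make sure the descriptor chain and the avail ring entry written
	 * above are visible to the device before the index update below
	 * publishes them.
	 */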
437 	virtio_wmb();
438 	vq->vq_ring.avail->idx = vq->vq_avail_idx;
439 	vq->reqs_finished++;
440 }
441 
442 int
443 virtqueue_req_start(struct virtqueue *vq, void *cookie, int iovcnt)
444 {
445 	struct vq_desc_extra *dxp;
446 
447 	if (iovcnt > vq->vq_free_cnt) {
448 		return iovcnt > vq->vq_nentries ? -EINVAL : -ENOMEM;
449 	}
450 
451 	if (vq->req_end != VQ_RING_DESC_CHAIN_END) {
452 		finish_req(vq);
453 	}
454 
455 	vq->req_start = vq->vq_desc_head_idx;
456 	dxp = &vq->vq_descx[vq->req_start];
457 	dxp->cookie = cookie;
458 	dxp->ndescs = 0;
459 
460 	return 0;
461 }
462 
463 void
464 virtqueue_req_flush(struct virtqueue *vq)
465 {
466 	uint16_t reqs_finished;
467 
468 	if (vq->req_end == VQ_RING_DESC_CHAIN_END) {
469 		/* no non-empty requests have been started */
470 		return;
471 	}
472 
473 	finish_req(vq);
474 	virtio_mb();
475 
476 	reqs_finished = vq->reqs_finished;
477 	vq->reqs_finished = 0;
478 
479 	if (vq->vdev->negotiated_features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
480 		/* Set used event idx to a value the device will never reach.
481 		 * This effectively disables interrupts.
482 		 */
483 		vring_used_event(&vq->vq_ring) = vq->vq_used_cons_idx - vq->vq_nentries - 1;
484 
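		/* vring_need_event(event, new, old) returns true only when the
		 * device-requested event index falls within the batch we just
		 * published: (uint16_t)(new - event - 1) < (uint16_t)(new - old).
		 * Worked example: if the device set avail_event = 10 and we moved
		 * vq_avail_idx from 9 to 12 (reqs_finished = 3), then
		 * 12 - 10 - 1 = 1 < 12 - 9 = 3, so the device is notified.
		 */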
485 		if (!vring_need_event(vring_avail_event(&vq->vq_ring),
486 				      vq->vq_avail_idx,
487 				      vq->vq_avail_idx - reqs_finished)) {
488 			return;
489 		}
490 	} else if (vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY) {
491 		return;
492 	}
493 
494 	virtio_dev_backend_ops(vq->vdev)->notify_queue(vq->vdev, vq);
495 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "Notified backend after xmit\n");
496 }
497 
498 void
499 virtqueue_req_abort(struct virtqueue *vq)
500 {
501 	struct vring_desc *desc;
502 
503 	if (vq->req_start == VQ_RING_DESC_CHAIN_END) {
504 		/* no requests have been started */
505 		return;
506 	}
507 
508 	desc = &vq->vq_ring.desc[vq->req_end];
509 	desc->flags &= ~VRING_DESC_F_NEXT;
510 
511 	vq_ring_free_chain(vq, vq->req_start);
512 	vq->req_start = VQ_RING_DESC_CHAIN_END;
513 }
514 
515 void
516 virtqueue_req_add_iovs(struct virtqueue *vq, struct iovec *iovs, uint16_t iovcnt,
517 		       enum spdk_virtio_desc_type desc_type)
518 {
519 	struct vring_desc *desc;
520 	struct vq_desc_extra *dxp;
521 	uint16_t i, prev_head, new_head;
522 
523 	assert(vq->req_start != VQ_RING_DESC_CHAIN_END);
524 	assert(iovcnt <= vq->vq_free_cnt);
525 
526 	/* TODO use indirect descriptors if iovcnt is high enough
527 	 * or the caller specifies SPDK_VIRTIO_DESC_F_INDIRECT
528 	 */
529 
530 	prev_head = vq->req_end;
531 	new_head = vq->vq_desc_head_idx;
532 	for (i = 0; i < iovcnt; ++i) {
533 		desc = &vq->vq_ring.desc[new_head];
534 
535 		if (!vq->vdev->is_hw) {
536 			desc->addr  = (uintptr_t)iovs[i].iov_base;
537 		} else {
538 			desc->addr = spdk_vtophys(iovs[i].iov_base, NULL);
539 		}
540 
541 		desc->len = iovs[i].iov_len;
542 		/* Always set the NEXT flag; it is cleared on the last
543 		 * descriptor when the request is finished or aborted.
544 		 */
545 		desc->flags = desc_type | VRING_DESC_F_NEXT;
546 
547 		prev_head = new_head;
548 		new_head = desc->next;
549 	}
550 
551 	dxp = &vq->vq_descx[vq->req_start];
552 	dxp->ndescs += iovcnt;
553 
554 	vq->req_end = prev_head;
555 	vq->vq_desc_head_idx = new_head;
556 	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - iovcnt);
557 	if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END) {
558 		assert(vq->vq_free_cnt == 0);
559 		vq->vq_desc_tail_idx = VQ_RING_DESC_CHAIN_END;
560 	}
561 }
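/* Usage sketch (illustrative, not part of this file): one request is started,
 * described by device-readable iovs followed by device-writable ones, and then
 * flushed. SPDK_VIRTIO_DESC_RO/WR are assumed to be the descriptor types from
 * spdk_internal/virtio.h; virtqueue_req_abort() rolls back a partially built
 * request instead of flushing it.
 *
 *	if (virtqueue_req_start(vq, io_ctx, 2) == 0) {
 *		virtqueue_req_add_iovs(vq, &req_iov, 1, SPDK_VIRTIO_DESC_RO);
 *		virtqueue_req_add_iovs(vq, &resp_iov, 1, SPDK_VIRTIO_DESC_WR);
 *		virtqueue_req_flush(vq);
 *	}
 */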
562 
563 #define DESC_PER_CACHELINE (RTE_CACHE_LINE_SIZE / sizeof(struct vring_desc))
564 uint16_t
565 virtio_recv_pkts(struct virtqueue *vq, void **io, uint32_t *len, uint16_t nb_pkts)
566 {
567 	uint16_t nb_used, num;
568 
569 	nb_used = vq->vq_ring.used->idx - vq->vq_used_cons_idx;
570 	virtio_rmb();
571 
572 	num = (uint16_t)(spdk_likely(nb_used <= nb_pkts) ? nb_used : nb_pkts);
573 	if (spdk_likely(num > DESC_PER_CACHELINE)) {
574 		num = num - ((vq->vq_used_cons_idx + num) % DESC_PER_CACHELINE);
575 	}
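	/* Example: with 64-byte cache lines and 16-byte descriptors,
	 * DESC_PER_CACHELINE is 4. If vq_used_cons_idx is 6 and num is 9,
	 * num is trimmed to 6 so that consumption stops on a cache line
	 * boundary (6 + 6 = 12, a multiple of 4).
	 */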
576 
577 	return virtqueue_dequeue_burst_rx(vq, io, len, num);
578 }
579 
580 int
581 virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index)
582 {
583 	struct virtqueue *vq = NULL;
584 
585 	if (index >= vdev->max_queues) {
586 		SPDK_ERRLOG("requested vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
587 			    index, vdev->max_queues);
588 		return -1;
589 	}
590 
591 	pthread_mutex_lock(&vdev->mutex);
592 	vq = vdev->vqs[index];
593 	if (vq == NULL || vq->owner_thread != NULL) {
594 		pthread_mutex_unlock(&vdev->mutex);
595 		return -1;
596 	}
597 
598 	vq->owner_thread = spdk_get_thread();
599 	pthread_mutex_unlock(&vdev->mutex);
600 	return 0;
601 }
602 
603 int32_t
604 virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index)
605 {
606 	struct virtqueue *vq = NULL;
607 	uint16_t i;
608 
609 	pthread_mutex_lock(&vdev->mutex);
610 	for (i = start_index; i < vdev->max_queues; ++i) {
611 		vq = vdev->vqs[i];
612 		if (vq != NULL && vq->owner_thread == NULL) {
613 			break;
614 		}
615 	}
616 
617 	if (vq == NULL || i == vdev->max_queues) {
618 		SPDK_ERRLOG("no more unused virtio queues with idx >= %"PRIu16".\n", start_index);
619 		pthread_mutex_unlock(&vdev->mutex);
620 		return -1;
621 	}
622 
623 	vq->owner_thread = spdk_get_thread();
624 	pthread_mutex_unlock(&vdev->mutex);
625 	return i;
626 }
627 
628 struct spdk_thread *
629 virtio_dev_queue_get_thread(struct virtio_dev *vdev, uint16_t index)
630 {
631 	struct spdk_thread *thread = NULL;
632 
633 	if (index >= vdev->max_queues) {
634 		SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16"\n",
635 			    index, vdev->max_queues);
636 		abort(); /* This is not recoverable */
637 	}
638 
639 	pthread_mutex_lock(&vdev->mutex);
640 	thread = vdev->vqs[index]->owner_thread;
641 	pthread_mutex_unlock(&vdev->mutex);
642 
643 	return thread;
644 }
645 
646 bool
647 virtio_dev_queue_is_acquired(struct virtio_dev *vdev, uint16_t index)
648 {
649 	return virtio_dev_queue_get_thread(vdev, index) != NULL;
650 }
651 
652 void
653 virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index)
654 {
655 	struct virtqueue *vq = NULL;
656 
657 	if (index >= vdev->max_queues) {
658 		SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
659 			    index, vdev->max_queues);
660 		return;
661 	}
662 
663 	pthread_mutex_lock(&vdev->mutex);
664 	vq = vdev->vqs[index];
665 	if (vq == NULL) {
666 		SPDK_ERRLOG("virtqueue at index %"PRIu16" is not initialized.\n", index);
667 		pthread_mutex_unlock(&vdev->mutex);
668 		return;
669 	}
670 
671 	assert(vq->owner_thread == spdk_get_thread());
672 	vq->owner_thread = NULL;
673 	pthread_mutex_unlock(&vdev->mutex);
674 }
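/* Usage sketch (illustrative, not part of this file): an I/O thread typically
 * claims one of the request queues located after the fixed (e.g. control and
 * event) queues and releases it when its channel is torn down.
 *
 *	int32_t qid = virtio_dev_find_and_acquire_queue(vdev, vdev->fixed_queues_num);
 *	if (qid >= 0) {
 *		... submit and poll requests on vdev->vqs[qid] from this thread ...
 *		virtio_dev_release_queue(vdev, (uint16_t)qid);
 *	}
 */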
675 
676 int
677 virtio_dev_read_dev_config(struct virtio_dev *dev, size_t offset,
678 			   void *dst, int length)
679 {
680 	return virtio_dev_backend_ops(dev)->read_dev_cfg(dev, offset, dst, length);
681 }
682 
683 int
684 virtio_dev_write_dev_config(struct virtio_dev *dev, size_t offset,
685 			    const void *src, int length)
686 {
687 	return virtio_dev_backend_ops(dev)->write_dev_cfg(dev, offset, src, length);
688 }
689 
690 void
691 virtio_dev_stop(struct virtio_dev *dev)
692 {
693 	virtio_dev_backend_ops(dev)->set_status(dev, VIRTIO_CONFIG_S_RESET);
694 	/* flush status write */
695 	virtio_dev_backend_ops(dev)->get_status(dev);
696 	virtio_free_queues(dev);
697 }
698 
699 void
700 virtio_dev_set_status(struct virtio_dev *dev, uint8_t status)
701 {
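	/* Device status bits accumulate per the virtio spec; the driver only
	 * ever adds bits and only a full reset (writing 0) clears them, so OR
	 * in the current status unless we are resetting.
	 */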
702 	if (status != VIRTIO_CONFIG_S_RESET) {
703 		status |= virtio_dev_backend_ops(dev)->get_status(dev);
704 	}
705 
706 	virtio_dev_backend_ops(dev)->set_status(dev, status);
707 }
708 
709 uint8_t
710 virtio_dev_get_status(struct virtio_dev *dev)
711 {
712 	return virtio_dev_backend_ops(dev)->get_status(dev);
713 }
714 
715 const struct virtio_dev_ops *
716 virtio_dev_backend_ops(struct virtio_dev *dev)
717 {
718 	return dev->backend_ops;
719 }
720 
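/* Example of the emitted JSON (illustrative; the trailing fields come from the
 * backend's dump_json_info()):
 *
 *	"virtio": {
 *		"vq_count": 3,
 *		"vq_size": 128,
 *		...
 *	}
 */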
721 void
722 virtio_dev_dump_json_info(struct virtio_dev *hw, struct spdk_json_write_ctx *w)
723 {
724 	spdk_json_write_named_object_begin(w, "virtio");
725 
726 	spdk_json_write_named_uint32(w, "vq_count", hw->max_queues);
727 
728 	spdk_json_write_named_uint32(w, "vq_size",
729 				     virtio_dev_backend_ops(hw)->get_queue_size(hw, 0));
730 
731 	virtio_dev_backend_ops(hw)->dump_json_info(hw, w);
732 
733 	spdk_json_write_object_end(w);
734 }
735 
736 SPDK_LOG_REGISTER_COMPONENT("virtio_dev", SPDK_LOG_VIRTIO_DEV)
737