xref: /spdk/lib/virtio/virtio.c (revision c4d9daeb7bf491bc0eb6e8d417b75d44773cb009)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include <linux/virtio_scsi.h>
37 #include <linux/virtio_pci.h>
38 #include <linux/virtio_config.h>
39 
40 #include <rte_config.h>
41 #include <rte_memcpy.h>
42 #include <rte_string_fns.h>
43 #include <rte_memzone.h>
44 #include <rte_malloc.h>
45 #include <rte_atomic.h>
46 #include <rte_branch_prediction.h>
47 #include <rte_pci.h>
48 #include <rte_common.h>
49 #include <rte_errno.h>
50 
51 #include <rte_eal.h>
52 #include <rte_dev.h>
53 #include <rte_prefetch.h>
54 
55 #include "spdk/env.h"
56 #include "spdk/barrier.h"
57 
58 #include "spdk_internal/virtio.h"
59 
/* Memory barriers used by the virtqueue code in this file.
 *
 * We use SMP memory barrier variants as all virtio_pci devices
 * are purely virtual. All MMIO is executed on a CPU core, so
 * there's no need to do full MMIO synchronization.
 *
 * virtio_mb()  - full barrier  (maps to spdk_smp_mb())
 * virtio_rmb() - read barrier  (maps to spdk_smp_rmb())
 * virtio_wmb() - write barrier (maps to spdk_smp_wmb())
 */
#define virtio_mb()	spdk_smp_mb()
#define virtio_rmb()	spdk_smp_rmb()
#define virtio_wmb()	spdk_smp_wmb()
67 
68 /* Chain all the descriptors in the ring with an END */
69 static inline void
70 vring_desc_init(struct vring_desc *dp, uint16_t n)
71 {
72 	uint16_t i;
73 
74 	for (i = 0; i < n - 1; i++) {
75 		dp[i].next = (uint16_t)(i + 1);
76 	}
77 	dp[i].next = VQ_RING_DESC_CHAIN_END;
78 }
79 
80 static void
81 virtio_init_vring(struct virtqueue *vq)
82 {
83 	int size = vq->vq_nentries;
84 	struct vring *vr = &vq->vq_ring;
85 	uint8_t *ring_mem = vq->vq_ring_virt_mem;
86 
87 	/*
88 	 * Reinitialise since virtio port might have been stopped and restarted
89 	 */
90 	memset(ring_mem, 0, vq->vq_ring_size);
91 	vring_init(vr, size, ring_mem, VIRTIO_PCI_VRING_ALIGN);
92 	vq->vq_used_cons_idx = 0;
93 	vq->vq_desc_head_idx = 0;
94 	vq->vq_avail_idx = 0;
95 	vq->vq_desc_tail_idx = (uint16_t)(vq->vq_nentries - 1);
96 	vq->vq_free_cnt = vq->vq_nentries;
97 	vq->req_start = VQ_RING_DESC_CHAIN_END;
98 	vq->req_end = VQ_RING_DESC_CHAIN_END;
99 	vq->reqs_finished = 0;
100 	memset(vq->vq_descx, 0, sizeof(struct vq_desc_extra) * vq->vq_nentries);
101 
102 	vring_desc_init(vr->desc, size);
103 
104 	/* Tell the backend not to interrupt us.
105 	 * If F_EVENT_IDX is negotiated, we will always set incredibly high
106 	 * used event idx, so that we will practically never receive an
107 	 * interrupt. See virtqueue_req_flush()
108 	 */
109 	if (vq->vdev->negotiated_features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
110 		vring_used_event(&vq->vq_ring) = UINT16_MAX;
111 	} else {
112 		vq->vq_ring.avail->flags |= VRING_AVAIL_F_NO_INTERRUPT;
113 	}
114 }
115 
116 static int
117 virtio_init_queue(struct virtio_dev *dev, uint16_t vtpci_queue_idx)
118 {
119 	unsigned int vq_size, size;
120 	struct virtqueue *vq;
121 	int rc;
122 
123 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "setting up queue: %"PRIu16"\n", vtpci_queue_idx);
124 
125 	/*
126 	 * Read the virtqueue size from the Queue Size field
127 	 * Always power of 2 and if 0 virtqueue does not exist
128 	 */
129 	vq_size = virtio_dev_backend_ops(dev)->get_queue_size(dev, vtpci_queue_idx);
130 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vq_size: %u\n", vq_size);
131 	if (vq_size == 0) {
132 		SPDK_ERRLOG("virtqueue %"PRIu16" does not exist\n", vtpci_queue_idx);
133 		return -EINVAL;
134 	}
135 
136 	if (!rte_is_power_of_2(vq_size)) {
137 		SPDK_ERRLOG("virtqueue %"PRIu16" size (%u) is not powerof 2\n",
138 			    vtpci_queue_idx, vq_size);
139 		return -EINVAL;
140 	}
141 
142 	size = sizeof(*vq) + vq_size * sizeof(struct vq_desc_extra);
143 
144 	if (posix_memalign((void **)&vq, RTE_CACHE_LINE_SIZE, size)) {
145 		SPDK_ERRLOG("can not allocate vq\n");
146 		return -ENOMEM;
147 	}
148 	memset(vq, 0, size);
149 	dev->vqs[vtpci_queue_idx] = vq;
150 
151 	vq->vdev = dev;
152 	vq->vq_queue_index = vtpci_queue_idx;
153 	vq->vq_nentries = vq_size;
154 
155 	/*
156 	 * Reserve a memzone for vring elements
157 	 */
158 	size = vring_size(vq_size, VIRTIO_PCI_VRING_ALIGN);
159 	vq->vq_ring_size = RTE_ALIGN_CEIL(size, VIRTIO_PCI_VRING_ALIGN);
160 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vring_size: %u, rounded_vring_size: %u\n",
161 		      size, vq->vq_ring_size);
162 
163 	vq->owner_thread = NULL;
164 
165 	rc = virtio_dev_backend_ops(dev)->setup_queue(dev, vq);
166 	if (rc < 0) {
167 		SPDK_ERRLOG("setup_queue failed\n");
168 		free(vq);
169 		dev->vqs[vtpci_queue_idx] = NULL;
170 		return rc;
171 	}
172 
173 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vq->vq_ring_mem:      0x%" PRIx64 "\n",
174 		      vq->vq_ring_mem);
175 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vq->vq_ring_virt_mem: 0x%" PRIx64 "\n",
176 		      (uint64_t)(uintptr_t)vq->vq_ring_virt_mem);
177 
178 	virtio_init_vring(vq);
179 	return 0;
180 }
181 
182 static void
183 virtio_free_queues(struct virtio_dev *dev)
184 {
185 	uint16_t nr_vq = dev->max_queues;
186 	struct virtqueue *vq;
187 	uint16_t i;
188 
189 	if (dev->vqs == NULL) {
190 		return;
191 	}
192 
193 	for (i = 0; i < nr_vq; i++) {
194 		vq = dev->vqs[i];
195 		if (!vq) {
196 			continue;
197 		}
198 
199 		virtio_dev_backend_ops(dev)->del_queue(dev, vq);
200 
201 		free(vq);
202 		dev->vqs[i] = NULL;
203 	}
204 
205 	free(dev->vqs);
206 	dev->vqs = NULL;
207 }
208 
209 static int
210 virtio_alloc_queues(struct virtio_dev *dev, uint16_t request_vq_num, uint16_t fixed_vq_num)
211 {
212 	uint16_t nr_vq;
213 	uint16_t i;
214 	int ret;
215 
216 	nr_vq = request_vq_num + fixed_vq_num;
217 	if (nr_vq == 0) {
218 		/* perfectly fine to have a device with no virtqueues. */
219 		return 0;
220 	}
221 
222 	assert(dev->vqs == NULL);
223 	dev->vqs = calloc(1, sizeof(struct virtqueue *) * nr_vq);
224 	if (!dev->vqs) {
225 		SPDK_ERRLOG("failed to allocate %"PRIu16" vqs\n", nr_vq);
226 		return -ENOMEM;
227 	}
228 
229 	for (i = 0; i < nr_vq; i++) {
230 		ret = virtio_init_queue(dev, i);
231 		if (ret < 0) {
232 			virtio_free_queues(dev);
233 			return ret;
234 		}
235 	}
236 
237 	dev->max_queues = nr_vq;
238 	dev->fixed_queues_num = fixed_vq_num;
239 	return 0;
240 }
241 
242 /**
243  * Negotiate virtio features. For virtio_user this will also set
244  * dev->modern flag if VIRTIO_F_VERSION_1 flag is negotiated.
245  */
246 static int
247 virtio_negotiate_features(struct virtio_dev *dev, uint64_t req_features)
248 {
249 	uint64_t host_features = virtio_dev_backend_ops(dev)->get_features(dev);
250 	int rc;
251 
252 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "guest features = %" PRIx64 "\n", req_features);
253 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "device features = %" PRIx64 "\n", host_features);
254 
255 	rc = virtio_dev_backend_ops(dev)->set_features(dev, req_features & host_features);
256 	if (rc != 0) {
257 		SPDK_ERRLOG("failed to negotiate device features.\n");
258 		return rc;
259 	}
260 
261 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "negotiated features = %" PRIx64 "\n",
262 		      dev->negotiated_features);
263 
264 	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_FEATURES_OK);
265 	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_FEATURES_OK)) {
266 		SPDK_ERRLOG("failed to set FEATURES_OK status!\n");
267 		/* either the device failed, or we offered some features that
268 		 * depend on other, not offered features.
269 		 */
270 		return -EINVAL;
271 	}
272 
273 	return 0;
274 }
275 
276 int
277 virtio_dev_construct(struct virtio_dev *vdev, const char *name,
278 		     const struct virtio_dev_ops *ops, void *ctx)
279 {
280 	int rc;
281 
282 	vdev->name = strdup(name);
283 	if (vdev->name == NULL) {
284 		return -ENOMEM;
285 	}
286 
287 	rc = pthread_mutex_init(&vdev->mutex, NULL);
288 	if (rc != 0) {
289 		free(vdev->name);
290 		return -rc;
291 	}
292 
293 	vdev->backend_ops = ops;
294 	vdev->ctx = ctx;
295 
296 	return 0;
297 }
298 
299 int
300 virtio_dev_reset(struct virtio_dev *dev, uint64_t req_features)
301 {
302 	req_features |= (1ULL << VIRTIO_F_VERSION_1);
303 
304 	virtio_dev_stop(dev);
305 
306 	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_ACKNOWLEDGE);
307 	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_ACKNOWLEDGE)) {
308 		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_ACKNOWLEDGE status.\n");
309 		return -EIO;
310 	}
311 
312 	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_DRIVER);
313 	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_DRIVER)) {
314 		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_DRIVER status.\n");
315 		return -EIO;
316 	}
317 
318 	return virtio_negotiate_features(dev, req_features);
319 }
320 
321 int
322 virtio_dev_start(struct virtio_dev *vdev, uint16_t max_queues, uint16_t fixed_queue_num)
323 {
324 	int ret;
325 
326 	ret = virtio_alloc_queues(vdev, max_queues, fixed_queue_num);
327 	if (ret < 0) {
328 		return ret;
329 	}
330 
331 	virtio_dev_set_status(vdev, VIRTIO_CONFIG_S_DRIVER_OK);
332 	if (!(virtio_dev_get_status(vdev) & VIRTIO_CONFIG_S_DRIVER_OK)) {
333 		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_DRIVER_OK status.\n");
334 		return -1;
335 	}
336 
337 	return 0;
338 }
339 
340 void
341 virtio_dev_destruct(struct virtio_dev *dev)
342 {
343 	virtio_dev_backend_ops(dev)->destruct_dev(dev);
344 	pthread_mutex_destroy(&dev->mutex);
345 	free(dev->name);
346 }
347 
348 static void
349 vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx)
350 {
351 	struct vring_desc *dp, *dp_tail;
352 	struct vq_desc_extra *dxp;
353 	uint16_t desc_idx_last = desc_idx;
354 
355 	dp  = &vq->vq_ring.desc[desc_idx];
356 	dxp = &vq->vq_descx[desc_idx];
357 	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt + dxp->ndescs);
358 	if ((dp->flags & VRING_DESC_F_INDIRECT) == 0) {
359 		while (dp->flags & VRING_DESC_F_NEXT) {
360 			desc_idx_last = dp->next;
361 			dp = &vq->vq_ring.desc[dp->next];
362 		}
363 	}
364 	dxp->ndescs = 0;
365 
366 	/*
367 	 * We must append the existing free chain, if any, to the end of
368 	 * newly freed chain. If the virtqueue was completely used, then
369 	 * head would be VQ_RING_DESC_CHAIN_END (ASSERTed above).
370 	 */
371 	if (vq->vq_desc_tail_idx == VQ_RING_DESC_CHAIN_END) {
372 		vq->vq_desc_head_idx = desc_idx;
373 	} else {
374 		dp_tail = &vq->vq_ring.desc[vq->vq_desc_tail_idx];
375 		dp_tail->next = desc_idx;
376 	}
377 
378 	vq->vq_desc_tail_idx = desc_idx_last;
379 	dp->next = VQ_RING_DESC_CHAIN_END;
380 }
381 
382 static uint16_t
383 virtqueue_dequeue_burst_rx(struct virtqueue *vq, void **rx_pkts,
384 			   uint32_t *len, uint16_t num)
385 {
386 	struct vring_used_elem *uep;
387 	struct virtio_req *cookie;
388 	uint16_t used_idx, desc_idx;
389 	uint16_t i;
390 
391 	/*  Caller does the check */
392 	for (i = 0; i < num ; i++) {
393 		used_idx = (uint16_t)(vq->vq_used_cons_idx & (vq->vq_nentries - 1));
394 		uep = &vq->vq_ring.used->ring[used_idx];
395 		desc_idx = (uint16_t) uep->id;
396 		len[i] = uep->len;
397 		cookie = (struct virtio_req *)vq->vq_descx[desc_idx].cookie;
398 
399 		if (spdk_unlikely(cookie == NULL)) {
400 			SPDK_WARNLOG("vring descriptor with no mbuf cookie at %"PRIu16"\n",
401 				     vq->vq_used_cons_idx);
402 			break;
403 		}
404 
405 		rte_prefetch0(cookie);
406 		rx_pkts[i]  = cookie;
407 		vq->vq_used_cons_idx++;
408 		vq_ring_free_chain(vq, desc_idx);
409 		vq->vq_descx[desc_idx].cookie = NULL;
410 	}
411 
412 	return i;
413 }
414 
415 static void
416 finish_req(struct virtqueue *vq)
417 {
418 	struct vring_desc *desc;
419 	uint16_t avail_idx;
420 
421 	desc = &vq->vq_ring.desc[vq->req_end];
422 	desc->flags &= ~VRING_DESC_F_NEXT;
423 
424 	/*
425 	 * Place the head of the descriptor chain into the next slot and make
426 	 * it usable to the host. The chain is made available now rather than
427 	 * deferring to virtqueue_req_flush() in the hopes that if the host is
428 	 * currently running on another CPU, we can keep it processing the new
429 	 * descriptor.
430 	 */
431 	avail_idx = (uint16_t)(vq->vq_avail_idx & (vq->vq_nentries - 1));
432 	vq->vq_ring.avail->ring[avail_idx] = vq->req_start;
433 	vq->vq_avail_idx++;
434 	vq->req_end = VQ_RING_DESC_CHAIN_END;
435 	virtio_wmb();
436 	vq->vq_ring.avail->idx = vq->vq_avail_idx;
437 	vq->reqs_finished++;
438 }
439 
440 int
441 virtqueue_req_start(struct virtqueue *vq, void *cookie, int iovcnt)
442 {
443 	struct vq_desc_extra *dxp;
444 
445 	if (iovcnt > vq->vq_free_cnt) {
446 		return iovcnt > vq->vq_nentries ? -EINVAL : -ENOMEM;
447 	}
448 
449 	if (vq->req_end != VQ_RING_DESC_CHAIN_END) {
450 		finish_req(vq);
451 	}
452 
453 	vq->req_start = vq->vq_desc_head_idx;
454 	dxp = &vq->vq_descx[vq->req_start];
455 	dxp->cookie = cookie;
456 	dxp->ndescs = 0;
457 
458 	return 0;
459 }
460 
461 void
462 virtqueue_req_flush(struct virtqueue *vq)
463 {
464 	uint16_t reqs_finished;
465 
466 	if (vq->req_end == VQ_RING_DESC_CHAIN_END) {
467 		/* no non-empty requests have been started */
468 		return;
469 	}
470 
471 	finish_req(vq);
472 	virtio_mb();
473 
474 	reqs_finished = vq->reqs_finished;
475 	vq->reqs_finished = 0;
476 
477 	if (vq->vdev->negotiated_features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
478 		/* Set used event idx to a value the device will never reach.
479 		 * This effectively disables interrupts.
480 		 */
481 		vring_used_event(&vq->vq_ring) = vq->vq_used_cons_idx - vq->vq_nentries - 1;
482 
483 		if (!vring_need_event(vring_avail_event(&vq->vq_ring),
484 				      vq->vq_avail_idx,
485 				      vq->vq_avail_idx - reqs_finished)) {
486 			return;
487 		}
488 	} else if (vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY) {
489 		return;
490 	}
491 
492 	virtio_dev_backend_ops(vq->vdev)->notify_queue(vq->vdev, vq);
493 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "Notified backend after xmit\n");
494 }
495 
496 void
497 virtqueue_req_abort(struct virtqueue *vq)
498 {
499 	struct vring_desc *desc;
500 
501 	if (vq->req_start == VQ_RING_DESC_CHAIN_END) {
502 		/* no requests have been started */
503 		return;
504 	}
505 
506 	desc = &vq->vq_ring.desc[vq->req_end];
507 	desc->flags &= ~VRING_DESC_F_NEXT;
508 
509 	vq_ring_free_chain(vq, vq->req_start);
510 	vq->req_start = VQ_RING_DESC_CHAIN_END;
511 }
512 
513 void
514 virtqueue_req_add_iovs(struct virtqueue *vq, struct iovec *iovs, uint16_t iovcnt,
515 		       enum spdk_virtio_desc_type desc_type)
516 {
517 	struct vring_desc *desc;
518 	struct vq_desc_extra *dxp;
519 	uint16_t i, prev_head, new_head;
520 
521 	assert(vq->req_start != VQ_RING_DESC_CHAIN_END);
522 	assert(iovcnt <= vq->vq_free_cnt);
523 
524 	/* TODO use indirect descriptors if iovcnt is high enough
525 	 * or the caller specifies SPDK_VIRTIO_DESC_F_INDIRECT
526 	 */
527 
528 	prev_head = vq->req_end;
529 	new_head = vq->vq_desc_head_idx;
530 	for (i = 0; i < iovcnt; ++i) {
531 		desc = &vq->vq_ring.desc[new_head];
532 
533 		if (!vq->vdev->is_hw) {
534 			desc->addr  = (uintptr_t)iovs[i].iov_base;
535 		} else {
536 			desc->addr = spdk_vtophys(iovs[i].iov_base, NULL);
537 		}
538 
539 		desc->len = iovs[i].iov_len;
540 		/* always set NEXT flag. unset it on the last descriptor
541 		 * in the request-ending function.
542 		 */
543 		desc->flags = desc_type | VRING_DESC_F_NEXT;
544 
545 		prev_head = new_head;
546 		new_head = desc->next;
547 	}
548 
549 	dxp = &vq->vq_descx[vq->req_start];
550 	dxp->ndescs += iovcnt;
551 
552 	vq->req_end = prev_head;
553 	vq->vq_desc_head_idx = new_head;
554 	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - iovcnt);
555 	if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END) {
556 		assert(vq->vq_free_cnt == 0);
557 		vq->vq_desc_tail_idx = VQ_RING_DESC_CHAIN_END;
558 	}
559 }
560 
561 #define DESC_PER_CACHELINE (RTE_CACHE_LINE_SIZE / sizeof(struct vring_desc))
562 uint16_t
563 virtio_recv_pkts(struct virtqueue *vq, void **io, uint32_t *len, uint16_t nb_pkts)
564 {
565 	uint16_t nb_used, num;
566 
567 	nb_used = vq->vq_ring.used->idx - vq->vq_used_cons_idx;
568 	virtio_rmb();
569 
570 	num = (uint16_t)(spdk_likely(nb_used <= nb_pkts) ? nb_used : nb_pkts);
571 	if (spdk_likely(num > DESC_PER_CACHELINE)) {
572 		num = num - ((vq->vq_used_cons_idx + num) % DESC_PER_CACHELINE);
573 	}
574 
575 	return virtqueue_dequeue_burst_rx(vq, io, len, num);
576 }
577 
578 int
579 virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index)
580 {
581 	struct virtqueue *vq = NULL;
582 
583 	if (index >= vdev->max_queues) {
584 		SPDK_ERRLOG("requested vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
585 			    index, vdev->max_queues);
586 		return -1;
587 	}
588 
589 	pthread_mutex_lock(&vdev->mutex);
590 	vq = vdev->vqs[index];
591 	if (vq == NULL || vq->owner_thread != NULL) {
592 		pthread_mutex_unlock(&vdev->mutex);
593 		return -1;
594 	}
595 
596 	vq->owner_thread = spdk_get_thread();
597 	pthread_mutex_unlock(&vdev->mutex);
598 	return 0;
599 }
600 
601 int32_t
602 virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index)
603 {
604 	struct virtqueue *vq = NULL;
605 	uint16_t i;
606 
607 	pthread_mutex_lock(&vdev->mutex);
608 	for (i = start_index; i < vdev->max_queues; ++i) {
609 		vq = vdev->vqs[i];
610 		if (vq != NULL && vq->owner_thread == NULL) {
611 			break;
612 		}
613 	}
614 
615 	if (vq == NULL || i == vdev->max_queues) {
616 		SPDK_ERRLOG("no more unused virtio queues with idx >= %"PRIu16".\n", start_index);
617 		pthread_mutex_unlock(&vdev->mutex);
618 		return -1;
619 	}
620 
621 	vq->owner_thread = spdk_get_thread();
622 	pthread_mutex_unlock(&vdev->mutex);
623 	return i;
624 }
625 
626 struct spdk_thread *
627 virtio_dev_queue_get_thread(struct virtio_dev *vdev, uint16_t index)
628 {
629 	struct spdk_thread *thread = NULL;
630 
631 	if (index >= vdev->max_queues) {
632 		SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16"\n",
633 			    index, vdev->max_queues);
634 		abort(); /* This is not recoverable */
635 	}
636 
637 	pthread_mutex_lock(&vdev->mutex);
638 	thread = vdev->vqs[index]->owner_thread;
639 	pthread_mutex_unlock(&vdev->mutex);
640 
641 	return thread;
642 }
643 
644 bool
645 virtio_dev_queue_is_acquired(struct virtio_dev *vdev, uint16_t index)
646 {
647 	return virtio_dev_queue_get_thread(vdev, index) != NULL;
648 }
649 
650 void
651 virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index)
652 {
653 	struct virtqueue *vq = NULL;
654 
655 	if (index >= vdev->max_queues) {
656 		SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
657 			    index, vdev->max_queues);
658 		return;
659 	}
660 
661 	pthread_mutex_lock(&vdev->mutex);
662 	vq = vdev->vqs[index];
663 	if (vq == NULL) {
664 		SPDK_ERRLOG("virtqueue at index %"PRIu16" is not initialized.\n", index);
665 		pthread_mutex_unlock(&vdev->mutex);
666 		return;
667 	}
668 
669 	assert(vq->owner_thread == spdk_get_thread());
670 	vq->owner_thread = NULL;
671 	pthread_mutex_unlock(&vdev->mutex);
672 }
673 
674 int
675 virtio_dev_read_dev_config(struct virtio_dev *dev, size_t offset,
676 			   void *dst, int length)
677 {
678 	return virtio_dev_backend_ops(dev)->read_dev_cfg(dev, offset, dst, length);
679 }
680 
681 int
682 virtio_dev_write_dev_config(struct virtio_dev *dev, size_t offset,
683 			    const void *src, int length)
684 {
685 	return virtio_dev_backend_ops(dev)->write_dev_cfg(dev, offset, src, length);
686 }
687 
688 void
689 virtio_dev_stop(struct virtio_dev *dev)
690 {
691 	virtio_dev_backend_ops(dev)->set_status(dev, VIRTIO_CONFIG_S_RESET);
692 	/* flush status write */
693 	virtio_dev_backend_ops(dev)->get_status(dev);
694 	virtio_free_queues(dev);
695 }
696 
697 void
698 virtio_dev_set_status(struct virtio_dev *dev, uint8_t status)
699 {
700 	if (status != VIRTIO_CONFIG_S_RESET) {
701 		status |= virtio_dev_backend_ops(dev)->get_status(dev);
702 	}
703 
704 	virtio_dev_backend_ops(dev)->set_status(dev, status);
705 }
706 
707 uint8_t
708 virtio_dev_get_status(struct virtio_dev *dev)
709 {
710 	return virtio_dev_backend_ops(dev)->get_status(dev);
711 }
712 
713 const struct virtio_dev_ops *
714 virtio_dev_backend_ops(struct virtio_dev *dev)
715 {
716 	return dev->backend_ops;
717 }
718 
719 void
720 virtio_dev_dump_json_info(struct virtio_dev *hw, struct spdk_json_write_ctx *w)
721 {
722 	spdk_json_write_named_object_begin(w, "virtio");
723 
724 	spdk_json_write_named_uint32(w, "vq_count", hw->max_queues);
725 
726 	spdk_json_write_named_uint32(w, "vq_size",
727 				     virtio_dev_backend_ops(hw)->get_queue_size(hw, 0));
728 
729 	virtio_dev_backend_ops(hw)->dump_json_info(hw, w);
730 
731 	spdk_json_write_object_end(w);
732 }
733 
/* Log component "virtio_dev", tied to the SPDK_LOG_VIRTIO_DEV flag that the
 * SPDK_DEBUGLOG() calls in this file are issued under.
 */
SPDK_LOG_REGISTER_COMPONENT("virtio_dev", SPDK_LOG_VIRTIO_DEV)
735