xref: /spdk/lib/virtio/virtio.c (revision bcfd6d0fb47bc29d04d5948fa6f1a8bf8e7aa220)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include <linux/virtio_scsi.h>
37 #include <linux/virtio_pci.h>
38 #include <linux/virtio_config.h>
39 
40 #include <rte_config.h>
41 #include <rte_memcpy.h>
42 #include <rte_string_fns.h>
43 #include <rte_memzone.h>
44 #include <rte_malloc.h>
45 #include <rte_atomic.h>
46 #include <rte_branch_prediction.h>
47 #include <rte_pci.h>
48 #include <rte_common.h>
49 #include <rte_errno.h>
50 
51 #include <rte_eal.h>
52 #include <rte_dev.h>
53 #include <rte_prefetch.h>
54 
55 #include "spdk/env.h"
56 #include "spdk/barrier.h"
57 
58 #include "spdk_internal/virtio.h"
59 
60 /* We use SMP memory barrier variants as all virtio_pci devices
61  * are purely virtual. All MMIO is executed on a CPU core, so
62  * there's no need to do full MMIO synchronization.
63  */
64 #define virtio_mb()	spdk_smp_mb()
65 #define virtio_rmb()	spdk_smp_rmb()
66 #define virtio_wmb()	spdk_smp_wmb()
67 
68 /* Chain all the descriptors in the ring with an END */
69 static inline void
70 vring_desc_init(struct vring_desc *dp, uint16_t n)
71 {
72 	uint16_t i;
73 
74 	for (i = 0; i < n - 1; i++) {
75 		dp[i].next = (uint16_t)(i + 1);
76 	}
77 	dp[i].next = VQ_RING_DESC_CHAIN_END;
78 }
79 
80 static void
81 virtio_init_vring(struct virtqueue *vq)
82 {
83 	int size = vq->vq_nentries;
84 	struct vring *vr = &vq->vq_ring;
85 	uint8_t *ring_mem = vq->vq_ring_virt_mem;
86 
87 	/*
88 	 * Reinitialise since virtio port might have been stopped and restarted
89 	 */
90 	memset(ring_mem, 0, vq->vq_ring_size);
91 	vring_init(vr, size, ring_mem, VIRTIO_PCI_VRING_ALIGN);
92 	vq->vq_used_cons_idx = 0;
93 	vq->vq_desc_head_idx = 0;
94 	vq->vq_avail_idx = 0;
95 	vq->vq_desc_tail_idx = (uint16_t)(vq->vq_nentries - 1);
96 	vq->vq_free_cnt = vq->vq_nentries;
97 	vq->req_start = VQ_RING_DESC_CHAIN_END;
98 	vq->req_end = VQ_RING_DESC_CHAIN_END;
99 	vq->reqs_finished = 0;
100 	memset(vq->vq_descx, 0, sizeof(struct vq_desc_extra) * vq->vq_nentries);
101 
102 	vring_desc_init(vr->desc, size);
103 
104 	/* Tell the backend not to interrupt us.
105 	 * If F_EVENT_IDX is negotiated, we will always set incredibly high
106 	 * used event idx, so that we will practically never receive an
107 	 * interrupt. See virtqueue_req_flush()
108 	 */
109 	if (vq->vdev->negotiated_features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
110 		vring_used_event(&vq->vq_ring) = UINT16_MAX;
111 	} else {
112 		vq->vq_ring.avail->flags |= VRING_AVAIL_F_NO_INTERRUPT;
113 	}
114 }
115 
116 static int
117 virtio_init_queue(struct virtio_dev *dev, uint16_t vtpci_queue_idx)
118 {
119 	unsigned int vq_size, size;
120 	struct virtqueue *vq;
121 
122 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "setting up queue: %"PRIu16"\n", vtpci_queue_idx);
123 
124 	/*
125 	 * Read the virtqueue size from the Queue Size field.
126 	 * It is always a power of 2; a size of 0 means the virtqueue does not exist.
127 	 */
128 	vq_size = virtio_dev_backend_ops(dev)->get_queue_size(dev, vtpci_queue_idx);
129 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vq_size: %u\n", vq_size);
130 	if (vq_size == 0) {
131 		SPDK_ERRLOG("virtqueue %"PRIu16" does not exist\n", vtpci_queue_idx);
132 		return -EINVAL;
133 	}
134 
135 	if (!rte_is_power_of_2(vq_size)) {
136 		SPDK_ERRLOG("virtqueue %"PRIu16" size (%u) is not a power of 2\n",
137 			    vtpci_queue_idx, vq_size);
138 		return -EINVAL;
139 	}
140 
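	/* The virtqueue struct is followed in the same allocation by one
	 * vq_desc_extra entry per descriptor (the vq_descx[] array), so
	 * round the whole thing up to a cache line.
	 */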
141 	size = RTE_ALIGN_CEIL(sizeof(*vq) +
142 			      vq_size * sizeof(struct vq_desc_extra),
143 			      RTE_CACHE_LINE_SIZE);
144 
145 	vq = spdk_dma_zmalloc(size, RTE_CACHE_LINE_SIZE, NULL);
146 	if (vq == NULL) {
147 		SPDK_ERRLOG("cannot allocate vq\n");
148 		return -ENOMEM;
149 	}
150 	dev->vqs[vtpci_queue_idx] = vq;
151 
152 	vq->vdev = dev;
153 	vq->vq_queue_index = vtpci_queue_idx;
154 	vq->vq_nentries = vq_size;
155 
156 	/*
157 	 * Reserve a memzone for vring elements
158 	 */
159 	size = vring_size(vq_size, VIRTIO_PCI_VRING_ALIGN);
160 	vq->vq_ring_size = RTE_ALIGN_CEIL(size, VIRTIO_PCI_VRING_ALIGN);
161 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vring_size: %u, rounded_vring_size: %u\n",
162 		      size, vq->vq_ring_size);
163 
164 	vq->owner_thread = NULL;
165 
166 	if (virtio_dev_backend_ops(dev)->setup_queue(dev, vq) < 0) {
167 		SPDK_ERRLOG("setup_queue failed\n");
168 		return -EINVAL;
169 	}
170 
171 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vq->vq_ring_mem:      0x%" PRIx64 "\n",
172 		      vq->vq_ring_mem);
173 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "vq->vq_ring_virt_mem: 0x%" PRIx64 "\n",
174 		      (uint64_t)(uintptr_t)vq->vq_ring_virt_mem);
175 
176 	virtio_init_vring(vq);
177 	return 0;
178 }
179 
180 static void
181 virtio_free_queues(struct virtio_dev *dev)
182 {
183 	uint16_t nr_vq = dev->max_queues;
184 	struct virtqueue *vq;
185 	uint16_t i;
186 
187 	if (dev->vqs == NULL) {
188 		return;
189 	}
190 
191 	for (i = 0; i < nr_vq; i++) {
192 		vq = dev->vqs[i];
193 		if (!vq) {
194 			continue;
195 		}
196 
197 		virtio_dev_backend_ops(dev)->del_queue(dev, vq);
198 
199 		rte_free(vq);
200 		dev->vqs[i] = NULL;
201 	}
202 
203 	rte_free(dev->vqs);
204 	dev->vqs = NULL;
205 }
206 
207 static int
208 virtio_alloc_queues(struct virtio_dev *dev, uint16_t request_vq_num, uint16_t fixed_vq_num)
209 {
210 	uint16_t nr_vq;
211 	uint16_t i;
212 	int ret;
213 
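	/* The first fixed_vq_num queues are reserved for device-type
	 * specific queues (e.g. controlq and eventq for virtio-scsi); the
	 * remaining request_vq_num queues serve I/O requests.
	 */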
214 	nr_vq = request_vq_num + fixed_vq_num;
215 	if (nr_vq == 0) {
216 		/* perfectly fine to have a device with no virtqueues. */
217 		return 0;
218 	}
219 
220 	assert(dev->vqs == NULL);
221 	dev->vqs = rte_zmalloc(NULL, sizeof(struct virtqueue *) * nr_vq, 0);
222 	if (!dev->vqs) {
223 		SPDK_ERRLOG("failed to allocate %"PRIu16" vqs\n", nr_vq);
224 		return -ENOMEM;
225 	}
226 
227 	for (i = 0; i < nr_vq; i++) {
228 		ret = virtio_init_queue(dev, i);
229 		if (ret < 0) {
230 			virtio_free_queues(dev);
231 			return ret;
232 		}
233 	}
234 
235 	dev->max_queues = nr_vq;
236 	dev->fixed_queues_num = fixed_vq_num;
237 	return 0;
238 }
239 
240 /**
241  * Negotiate virtio features. For virtio_user this will also set
242  * dev->modern flag if VIRTIO_F_VERSION_1 flag is negotiated.
243  */
244 static int
245 virtio_negotiate_features(struct virtio_dev *dev, uint64_t req_features)
246 {
247 	uint64_t host_features = virtio_dev_backend_ops(dev)->get_features(dev);
248 	int rc;
249 
250 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "guest features = %" PRIx64 "\n", req_features);
251 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "device features = %" PRIx64 "\n", host_features);
252 
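	/* Offer the intersection of the requested and device-supported
	 * features; the backend records the accepted set in
	 * dev->negotiated_features.
	 */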
253 	rc = virtio_dev_backend_ops(dev)->set_features(dev, req_features & host_features);
254 	if (rc != 0) {
255 		SPDK_ERRLOG("failed to negotiate device features.\n");
256 		return -1;
257 	}
258 
259 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "negotiated features = %" PRIx64 "\n",
260 		      dev->negotiated_features);
261 
262 	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_FEATURES_OK);
263 	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_FEATURES_OK)) {
264 		SPDK_ERRLOG("failed to set FEATURES_OK status!\n");
265 		return -1;
266 	}
267 
268 	return 0;
269 }
270 
271 int
272 virtio_dev_construct(struct virtio_dev *vdev, const char *name,
273 		     const struct virtio_dev_ops *ops, void *ctx)
274 {
275 	int rc;
276 
277 	vdev->name = strdup(name);
278 	if (vdev->name == NULL) {
279 		return -ENOMEM;
280 	}
281 
282 	rc = pthread_mutex_init(&vdev->mutex, NULL);
283 	if (rc != 0) {
284 		free(vdev->name);
285 		return -rc;
286 	}
287 
288 	vdev->backend_ops = ops;
289 	vdev->ctx = ctx;
290 
291 	return 0;
292 }
293 
294 int
295 virtio_dev_reset(struct virtio_dev *dev, uint64_t req_features)
296 {
297 	req_features |= (1ULL << VIRTIO_F_VERSION_1);
298 
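	/* Walk the initialization sequence from the virtio spec: reset the
	 * device, set ACKNOWLEDGE and DRIVER, then negotiate features
	 * (which ends with FEATURES_OK).
	 */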
299 	virtio_dev_stop(dev);
300 
301 	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_ACKNOWLEDGE);
302 	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_ACKNOWLEDGE)) {
303 		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_ACKNOWLEDGE status.\n");
304 		return -1;
305 	}
306 
307 	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_DRIVER);
308 	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_DRIVER)) {
309 		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_DRIVER status.\n");
310 		return -1;
311 	}
312 
313 	return virtio_negotiate_features(dev, req_features);
314 }
315 
316 int
317 virtio_dev_start(struct virtio_dev *vdev, uint16_t max_queues, uint16_t fixed_queue_num)
318 {
319 	int ret;
320 
321 	ret = virtio_alloc_queues(vdev, max_queues, fixed_queue_num);
322 	if (ret < 0) {
323 		return ret;
324 	}
325 
326 	virtio_dev_set_status(vdev, VIRTIO_CONFIG_S_DRIVER_OK);
327 	if (!(virtio_dev_get_status(vdev) & VIRTIO_CONFIG_S_DRIVER_OK)) {
328 		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_DRIVER_OK status.\n");
329 		return -1;
330 	}
331 
332 	return 0;
333 }
334 
335 void
336 virtio_dev_destruct(struct virtio_dev *dev)
337 {
338 	virtio_dev_backend_ops(dev)->destruct_dev(dev);
339 	pthread_mutex_destroy(&dev->mutex);
340 	free(dev->name);
341 }
342 
343 static void
344 vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx)
345 {
346 	struct vring_desc *dp, *dp_tail;
347 	struct vq_desc_extra *dxp;
348 	uint16_t desc_idx_last = desc_idx;
349 
350 	dp  = &vq->vq_ring.desc[desc_idx];
351 	dxp = &vq->vq_descx[desc_idx];
352 	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt + dxp->ndescs);
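	/* An indirect descriptor occupies only a single slot in the ring
	 * (its chain lives in a separate table), so only direct chains
	 * need to be walked to find the last descriptor.
	 */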
353 	if ((dp->flags & VRING_DESC_F_INDIRECT) == 0) {
354 		while (dp->flags & VRING_DESC_F_NEXT) {
355 			desc_idx_last = dp->next;
356 			dp = &vq->vq_ring.desc[dp->next];
357 		}
358 	}
359 	dxp->ndescs = 0;
360 
361 	/*
362 	 * We must append the existing free chain, if any, to the end of
363 	 * the newly freed chain. If the virtqueue was completely used, the
364 	 * free list is empty and vq_desc_tail_idx is VQ_RING_DESC_CHAIN_END.
365 	 */
366 	if (vq->vq_desc_tail_idx == VQ_RING_DESC_CHAIN_END) {
367 		vq->vq_desc_head_idx = desc_idx;
368 	} else {
369 		dp_tail = &vq->vq_ring.desc[vq->vq_desc_tail_idx];
370 		dp_tail->next = desc_idx;
371 	}
372 
373 	vq->vq_desc_tail_idx = desc_idx_last;
374 	dp->next = VQ_RING_DESC_CHAIN_END;
375 }
376 
377 static uint16_t
378 virtqueue_dequeue_burst_rx(struct virtqueue *vq, void **rx_pkts,
379 			   uint32_t *len, uint16_t num)
380 {
381 	struct vring_used_elem *uep;
382 	struct virtio_req *cookie;
383 	uint16_t used_idx, desc_idx;
384 	uint16_t i;
385 
386 	/* The caller guarantees num does not exceed the number of used entries. */
387 	for (i = 0; i < num ; i++) {
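		/* vq_nentries is a power of 2, so masking the free-running
		 * consumer index with (nentries - 1) wraps it into the ring.
		 */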
388 		used_idx = (uint16_t)(vq->vq_used_cons_idx & (vq->vq_nentries - 1));
389 		uep = &vq->vq_ring.used->ring[used_idx];
390 		desc_idx = (uint16_t) uep->id;
391 		len[i] = uep->len;
392 		cookie = (struct virtio_req *)vq->vq_descx[desc_idx].cookie;
393 
394 		if (spdk_unlikely(cookie == NULL)) {
395 			SPDK_WARNLOG("vring descriptor with no request cookie at %"PRIu16"\n",
396 				     vq->vq_used_cons_idx);
397 			break;
398 		}
399 
400 		rte_prefetch0(cookie);
401 		rx_pkts[i]  = cookie;
402 		vq->vq_used_cons_idx++;
403 		vq_ring_free_chain(vq, desc_idx);
404 		vq->vq_descx[desc_idx].cookie = NULL;
405 	}
406 
407 	return i;
408 }
409 
410 static void
411 finish_req(struct virtqueue *vq)
412 {
413 	struct vring_desc *desc;
414 	uint16_t avail_idx;
415 
416 	desc = &vq->vq_ring.desc[vq->req_end];
417 	desc->flags &= ~VRING_DESC_F_NEXT;
418 
419 	/*
420 	 * Place the head of the descriptor chain into the next slot and make
421 	 * it usable to the host. The chain is made available now rather than
422 	 * deferring to virtqueue_req_flush() in the hopes that if the host is
423 	 * currently running on another CPU, we can keep it processing the new
424 	 * descriptor.
425 	 */
426 	avail_idx = (uint16_t)(vq->vq_avail_idx & (vq->vq_nentries - 1));
427 	vq->vq_ring.avail->ring[avail_idx] = vq->req_start;
428 	vq->vq_avail_idx++;
429 	vq->req_end = VQ_RING_DESC_CHAIN_END;
430 	virtio_wmb();
431 	vq->vq_ring.avail->idx = vq->vq_avail_idx;
432 	vq->reqs_finished++;
433 }
434 
435 int
436 virtqueue_req_start(struct virtqueue *vq, void *cookie, int iovcnt)
437 {
438 	struct vq_desc_extra *dxp;
439 
440 	if (iovcnt > vq->vq_free_cnt) {
441 		return iovcnt > vq->vq_nentries ? -EINVAL : -ENOMEM;
442 	}
443 
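	/* Close out the previously started request, if any. finish_req()
	 * publishes its head in the avail ring; the doorbell itself is
	 * deferred until virtqueue_req_flush().
	 */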
444 	if (vq->req_end != VQ_RING_DESC_CHAIN_END) {
445 		finish_req(vq);
446 	}
447 
448 	vq->req_start = vq->vq_desc_head_idx;
449 	dxp = &vq->vq_descx[vq->req_start];
450 	dxp->cookie = cookie;
451 	dxp->ndescs = 0;
452 
453 	return 0;
454 }
455 
456 void
457 virtqueue_req_flush(struct virtqueue *vq)
458 {
459 	uint16_t reqs_finished;
460 
461 	if (vq->req_end == VQ_RING_DESC_CHAIN_END) {
462 		/* no non-empty requests have been started */
463 		return;
464 	}
465 
466 	finish_req(vq);
467 	virtio_mb();
468 
469 	reqs_finished = vq->reqs_finished;
470 	vq->reqs_finished = 0;
471 
472 	if (vq->vdev->negotiated_features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
473 		/* Set used event idx to a value the device will never reach.
474 		 * This effectively disables interrupts.
475 		 */
476 		vring_used_event(&vq->vq_ring) = vq->vq_used_cons_idx - vq->vq_nentries - 1;
477 
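		/* With EVENT_IDX the device is notified only if its avail
		 * event index falls within the batch of requests published
		 * since the last flush.
		 */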
478 		if (!vring_need_event(vring_avail_event(&vq->vq_ring),
479 				      vq->vq_avail_idx,
480 				      vq->vq_avail_idx - reqs_finished)) {
481 			return;
482 		}
483 	} else if (vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY) {
484 		return;
485 	}
486 
487 	virtio_dev_backend_ops(vq->vdev)->notify_queue(vq->vdev, vq);
488 	SPDK_DEBUGLOG(SPDK_LOG_VIRTIO_DEV, "Notified backend after xmit\n");
489 }
490 
491 void
492 virtqueue_req_abort(struct virtqueue *vq)
493 {
494 	struct vring_desc *desc;
495 
496 	if (vq->req_start == VQ_RING_DESC_CHAIN_END) {
497 		/* no requests have been started */
498 		return;
499 	}
500 
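	/* Terminate the chain at the last descriptor that was added so
	 * vq_ring_free_chain() stops there, then return the whole chain to
	 * the free list. The chain was never exposed to the device.
	 */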
501 	desc = &vq->vq_ring.desc[vq->req_end];
502 	desc->flags &= ~VRING_DESC_F_NEXT;
503 
504 	vq_ring_free_chain(vq, vq->req_start);
505 	vq->req_start = VQ_RING_DESC_CHAIN_END;
506 }
507 
508 void
509 virtqueue_req_add_iovs(struct virtqueue *vq, struct iovec *iovs, uint16_t iovcnt,
510 		       enum spdk_virtio_desc_type desc_type)
511 {
512 	struct vring_desc *desc;
513 	struct vq_desc_extra *dxp;
514 	uint16_t i, prev_head, new_head;
515 
516 	assert(vq->req_start != VQ_RING_DESC_CHAIN_END);
517 	assert(iovcnt <= vq->vq_free_cnt);
518 
519 	/* TODO use indirect descriptors if iovcnt is high enough
520 	 * or the caller specifies SPDK_VIRTIO_DESC_F_INDIRECT
521 	 */
522 
523 	prev_head = vq->req_end;
524 	new_head = vq->vq_desc_head_idx;
525 	for (i = 0; i < iovcnt; ++i) {
526 		desc = &vq->vq_ring.desc[new_head];
527 
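		/* A purely virtual backend (e.g. vhost-user) shares this
		 * process' address space, so a virtual address suffices;
		 * real PCI hardware needs the physical address.
		 */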
528 		if (!vq->vdev->is_hw) {
529 			desc->addr  = (uintptr_t)iovs[i].iov_base;
530 		} else {
531 			desc->addr = spdk_vtophys(iovs[i].iov_base);
532 		}
533 
534 		desc->len = iovs[i].iov_len;
535 		/* Always set the NEXT flag here; it is cleared on the last
536 		 * descriptor when the request is finished or aborted.
537 		 */
538 		desc->flags = desc_type | VRING_DESC_F_NEXT;
539 
540 		prev_head = new_head;
541 		new_head = desc->next;
542 	}
543 
544 	dxp = &vq->vq_descx[vq->req_start];
545 	dxp->ndescs += iovcnt;
546 
547 	vq->req_end = prev_head;
548 	vq->vq_desc_head_idx = new_head;
549 	if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END) {
550 		assert(vq->vq_free_cnt == 0);
551 		vq->vq_desc_tail_idx = VQ_RING_DESC_CHAIN_END;
552 	}
553 	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - iovcnt);
554 }
555 
556 #define DESC_PER_CACHELINE (RTE_CACHE_LINE_SIZE / sizeof(struct vring_desc))
557 uint16_t
558 virtio_recv_pkts(struct virtqueue *vq, void **io, uint32_t *len, uint16_t nb_pkts)
559 {
560 	uint16_t nb_used, num;
561 
562 	nb_used = vq->vq_ring.used->idx - vq->vq_used_cons_idx;
563 	virtio_rmb();
564 
565 	num = (uint16_t)(spdk_likely(nb_used <= nb_pkts) ? nb_used : nb_pkts);
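	/* For larger bursts, trim the count so the consumer index ends on a
	 * cache-line boundary of the used ring; the next call then starts
	 * on a fresh cache line.
	 */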
566 	if (spdk_likely(num > DESC_PER_CACHELINE)) {
567 		num = num - ((vq->vq_used_cons_idx + num) % DESC_PER_CACHELINE);
568 	}
569 
570 	return virtqueue_dequeue_burst_rx(vq, io, len, num);
571 }
572 
573 int
574 virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index)
575 {
576 	struct virtqueue *vq = NULL;
577 
578 	if (index >= vdev->max_queues) {
579 		SPDK_ERRLOG("requested vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
580 			    index, vdev->max_queues);
581 		return -1;
582 	}
583 
584 	pthread_mutex_lock(&vdev->mutex);
585 	vq = vdev->vqs[index];
586 	if (vq == NULL || vq->owner_thread != NULL) {
587 		pthread_mutex_unlock(&vdev->mutex);
588 		return -1;
589 	}
590 
591 	vq->owner_thread = spdk_get_thread();
592 	pthread_mutex_unlock(&vdev->mutex);
593 	return 0;
594 }
595 
596 int32_t
597 virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index)
598 {
599 	struct virtqueue *vq = NULL;
600 	uint16_t i;
601 
602 	pthread_mutex_lock(&vdev->mutex);
603 	for (i = start_index; i < vdev->max_queues; ++i) {
604 		vq = vdev->vqs[i];
605 		if (vq != NULL && vq->owner_thread == NULL) {
606 			break;
607 		}
608 	}
609 
610 	if (vq == NULL || i == vdev->max_queues) {
611 		SPDK_ERRLOG("no more unused virtio queues with idx >= %"PRIu16".\n", start_index);
612 		pthread_mutex_unlock(&vdev->mutex);
613 		return -1;
614 	}
615 
616 	vq->owner_thread = spdk_get_thread();
617 	pthread_mutex_unlock(&vdev->mutex);
618 	return i;
619 }
620 
621 struct spdk_thread *
622 virtio_dev_queue_get_thread(struct virtio_dev *vdev, uint16_t index)
623 {
624 	struct spdk_thread *thread = NULL;
625 
626 	if (index >= vdev->max_queues) {
627 		SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16"\n",
628 			    index, vdev->max_queues);
629 		abort(); /* This is not recoverable */
630 	}
631 
632 	pthread_mutex_lock(&vdev->mutex);
633 	thread = vdev->vqs[index]->owner_thread;
634 	pthread_mutex_unlock(&vdev->mutex);
635 
636 	return thread;
637 }
638 
639 bool
640 virtio_dev_queue_is_acquired(struct virtio_dev *vdev, uint16_t index)
641 {
642 	return virtio_dev_queue_get_thread(vdev, index) != NULL;
643 }
644 
645 void
646 virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index)
647 {
648 	struct virtqueue *vq = NULL;
649 
650 	if (index >= vdev->max_queues) {
651 		SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
652 			    index, vdev->max_queues);
653 		return;
654 	}
655 
656 	pthread_mutex_lock(&vdev->mutex);
657 	vq = vdev->vqs[index];
658 	if (vq == NULL) {
659 		SPDK_ERRLOG("virtqueue at index %"PRIu16" is not initialized.\n", index);
660 		pthread_mutex_unlock(&vdev->mutex);
661 		return;
662 	}
663 
664 	assert(vq->owner_thread == spdk_get_thread());
665 	vq->owner_thread = NULL;
666 	pthread_mutex_unlock(&vdev->mutex);
667 }
668 
669 int
670 virtio_dev_read_dev_config(struct virtio_dev *dev, size_t offset,
671 			   void *dst, int length)
672 {
673 	return virtio_dev_backend_ops(dev)->read_dev_cfg(dev, offset, dst, length);
674 }
675 
676 int
677 virtio_dev_write_dev_config(struct virtio_dev *dev, size_t offset,
678 			    const void *src, int length)
679 {
680 	return virtio_dev_backend_ops(dev)->write_dev_cfg(dev, offset, src, length);
681 }
682 
683 void
684 virtio_dev_stop(struct virtio_dev *dev)
685 {
686 	virtio_dev_backend_ops(dev)->set_status(dev, VIRTIO_CONFIG_S_RESET);
687 	/* flush status write */
688 	virtio_dev_backend_ops(dev)->get_status(dev);
689 	virtio_free_queues(dev);
690 }
691 
692 void
693 virtio_dev_set_status(struct virtio_dev *dev, uint8_t status)
694 {
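	/* Device status bits accumulate per the virtio spec, so OR in the
	 * bits that are already set. A reset (status 0) clears everything
	 * instead.
	 */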
695 	if (status != VIRTIO_CONFIG_S_RESET) {
696 		status |= virtio_dev_backend_ops(dev)->get_status(dev);
697 	}
698 
699 	virtio_dev_backend_ops(dev)->set_status(dev, status);
700 }
701 
702 uint8_t
703 virtio_dev_get_status(struct virtio_dev *dev)
704 {
705 	return virtio_dev_backend_ops(dev)->get_status(dev);
706 }
707 
708 const struct virtio_dev_ops *
709 virtio_dev_backend_ops(struct virtio_dev *dev)
710 {
711 	return dev->backend_ops;
712 }
713 
714 void
715 virtio_dev_dump_json_info(struct virtio_dev *hw, struct spdk_json_write_ctx *w)
716 {
717 	spdk_json_write_name(w, "virtio");
718 	spdk_json_write_object_begin(w);
719 
720 	spdk_json_write_name(w, "vq_count");
721 	spdk_json_write_uint32(w, hw->max_queues);
722 
723 	spdk_json_write_name(w, "vq_size");
724 	spdk_json_write_uint32(w, virtio_dev_backend_ops(hw)->get_queue_size(hw, 0));
725 
726 	virtio_dev_backend_ops(hw)->dump_json_info(hw, w);
727 
728 	spdk_json_write_object_end(w);
729 }
730 
731 SPDK_LOG_REGISTER_COMPONENT("virtio_dev", SPDK_LOG_VIRTIO_DEV)
732