xref: /spdk/lib/virtio/virtio.c (revision 8bb0ded3e55c182cea67af1f6790f8de5f38c05f)
/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/barrier.h"

#include "spdk_internal/virtio.h"

/* We use SMP memory barrier variants as all virtio_pci devices
 * are purely virtual. All MMIO is executed on a CPU core, so
 * there's no need to do full MMIO synchronization.
 */
#define virtio_mb()	spdk_smp_mb()
#define virtio_rmb()	spdk_smp_rmb()
#define virtio_wmb()	spdk_smp_wmb()

/* Chain all the descriptors in the ring with an END */
static inline void
vring_desc_init(struct vring_desc *dp, uint16_t n)
{
	uint16_t i;

	for (i = 0; i < n - 1; i++) {
		dp[i].next = (uint16_t)(i + 1);
	}
	dp[i].next = VQ_RING_DESC_CHAIN_END;
}

static void
virtio_init_vring(struct virtqueue *vq)
{
	int size = vq->vq_nentries;
	struct vring *vr = &vq->vq_ring;
	uint8_t *ring_mem = vq->vq_ring_virt_mem;

	/*
	 * Reinitialise since virtio port might have been stopped and restarted
	 */
	memset(ring_mem, 0, vq->vq_ring_size);
	vring_init(vr, size, ring_mem, VIRTIO_PCI_VRING_ALIGN);
	vq->vq_used_cons_idx = 0;
	vq->vq_desc_head_idx = 0;
	vq->vq_avail_idx = 0;
	vq->vq_desc_tail_idx = (uint16_t)(vq->vq_nentries - 1);
	vq->vq_free_cnt = vq->vq_nentries;
	vq->req_start = VQ_RING_DESC_CHAIN_END;
	vq->req_end = VQ_RING_DESC_CHAIN_END;
	vq->reqs_finished = 0;
	memset(vq->vq_descx, 0, sizeof(struct vq_desc_extra) * vq->vq_nentries);

	vring_desc_init(vr->desc, size);

	/* Tell the backend not to interrupt us.
	 * If F_EVENT_IDX is negotiated, we will always set an extremely high
	 * used event idx, so that we will practically never receive an
	 * interrupt. See virtqueue_req_flush().
	 */
	if (vq->vdev->negotiated_features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
		vring_used_event(&vq->vq_ring) = UINT16_MAX;
	} else {
		vq->vq_ring.avail->flags |= VRING_AVAIL_F_NO_INTERRUPT;
	}
}

static int
virtio_init_queue(struct virtio_dev *dev, uint16_t vtpci_queue_idx)
{
	unsigned int vq_size, size;
	struct virtqueue *vq;
	int rc;

	SPDK_DEBUGLOG(virtio_dev, "setting up queue: %"PRIu16"\n", vtpci_queue_idx);

	/*
	 * Read the virtqueue size from the Queue Size field.
	 * It is always a power of 2; a size of 0 means the virtqueue does not exist.
	 */
	vq_size = virtio_dev_backend_ops(dev)->get_queue_size(dev, vtpci_queue_idx);
	SPDK_DEBUGLOG(virtio_dev, "vq_size: %u\n", vq_size);
	if (vq_size == 0) {
		SPDK_ERRLOG("virtqueue %"PRIu16" does not exist\n", vtpci_queue_idx);
		return -EINVAL;
	}

	if (!spdk_u32_is_pow2(vq_size)) {
		SPDK_ERRLOG("virtqueue %"PRIu16" size (%u) is not a power of 2\n",
			    vtpci_queue_idx, vq_size);
		return -EINVAL;
	}

	size = sizeof(*vq) + vq_size * sizeof(struct vq_desc_extra);

	if (posix_memalign((void **)&vq, SPDK_CACHE_LINE_SIZE, size)) {
		SPDK_ERRLOG("cannot allocate vq\n");
		return -ENOMEM;
	}
	memset(vq, 0, size);
	dev->vqs[vtpci_queue_idx] = vq;

	vq->vdev = dev;
	vq->vq_queue_index = vtpci_queue_idx;
	vq->vq_nentries = vq_size;

	/*
	 * Reserve a memzone for vring elements
	 */
	size = vring_size(vq_size, VIRTIO_PCI_VRING_ALIGN);
	vq->vq_ring_size = SPDK_ALIGN_CEIL(size, VIRTIO_PCI_VRING_ALIGN);
	SPDK_DEBUGLOG(virtio_dev, "vring_size: %u, rounded_vring_size: %u\n",
		      size, vq->vq_ring_size);

	vq->owner_thread = NULL;

	rc = virtio_dev_backend_ops(dev)->setup_queue(dev, vq);
	if (rc < 0) {
		SPDK_ERRLOG("setup_queue failed\n");
		free(vq);
		dev->vqs[vtpci_queue_idx] = NULL;
		return rc;
	}

	SPDK_DEBUGLOG(virtio_dev, "vq->vq_ring_mem:      0x%" PRIx64 "\n",
		      vq->vq_ring_mem);
	SPDK_DEBUGLOG(virtio_dev, "vq->vq_ring_virt_mem: 0x%" PRIx64 "\n",
		      (uint64_t)(uintptr_t)vq->vq_ring_virt_mem);

	virtio_init_vring(vq);
	return 0;
}

static void
virtio_free_queues(struct virtio_dev *dev)
{
	uint16_t nr_vq = dev->max_queues;
	struct virtqueue *vq;
	uint16_t i;

	if (dev->vqs == NULL) {
		return;
	}

	for (i = 0; i < nr_vq; i++) {
		vq = dev->vqs[i];
		if (!vq) {
			continue;
		}

		virtio_dev_backend_ops(dev)->del_queue(dev, vq);

		free(vq);
		dev->vqs[i] = NULL;
	}

	free(dev->vqs);
	dev->vqs = NULL;
}

static int
virtio_alloc_queues(struct virtio_dev *dev, uint16_t request_vq_num, uint16_t fixed_vq_num)
{
	uint16_t nr_vq;
	uint16_t i;
	int ret;

	nr_vq = request_vq_num + fixed_vq_num;
	if (nr_vq == 0) {
		/* perfectly fine to have a device with no virtqueues. */
		return 0;
	}

	assert(dev->vqs == NULL);
	dev->vqs = calloc(1, sizeof(struct virtqueue *) * nr_vq);
	if (!dev->vqs) {
		SPDK_ERRLOG("failed to allocate %"PRIu16" vqs\n", nr_vq);
		return -ENOMEM;
	}

	for (i = 0; i < nr_vq; i++) {
		ret = virtio_init_queue(dev, i);
		if (ret < 0) {
			virtio_free_queues(dev);
			return ret;
		}
	}

	dev->max_queues = nr_vq;
	dev->fixed_queues_num = fixed_vq_num;
	return 0;
}

/**
 * Negotiate virtio features. For virtio_user this will also set the
 * dev->modern flag if the VIRTIO_F_VERSION_1 feature is negotiated.
 */
static int
virtio_negotiate_features(struct virtio_dev *dev, uint64_t req_features)
{
	uint64_t host_features = virtio_dev_backend_ops(dev)->get_features(dev);
	int rc;

	SPDK_DEBUGLOG(virtio_dev, "guest features = %" PRIx64 "\n", req_features);
	SPDK_DEBUGLOG(virtio_dev, "device features = %" PRIx64 "\n", host_features);

	rc = virtio_dev_backend_ops(dev)->set_features(dev, req_features & host_features);
	if (rc != 0) {
		SPDK_ERRLOG("failed to negotiate device features.\n");
		return rc;
	}

	SPDK_DEBUGLOG(virtio_dev, "negotiated features = %" PRIx64 "\n",
		      dev->negotiated_features);

	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_FEATURES_OK);
	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_FEATURES_OK)) {
		SPDK_ERRLOG("failed to set FEATURES_OK status!\n");
		/* Either the device failed, or we offered some features that
		 * depend on other features which were not offered.
		 */
		return -EINVAL;
	}

	return 0;
}

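/*
 * Typical device bring-up with this API (a minimal usage sketch; the backend
 * ops, context pointer, feature bits and queue counts below are illustrative,
 * and error handling is omitted):
 *
 *	struct virtio_dev vdev;
 *
 *	virtio_dev_construct(&vdev, "my_vdev", &my_backend_ops, my_ctx);
 *	virtio_dev_reset(&vdev, 1ULL << VIRTIO_RING_F_EVENT_IDX);
 *	virtio_dev_start(&vdev, request_queues, fixed_queues);
 *	...
 *	virtio_dev_stop(&vdev);
 *	virtio_dev_destruct(&vdev);
 */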
int
virtio_dev_construct(struct virtio_dev *vdev, const char *name,
		     const struct virtio_dev_ops *ops, void *ctx)
{
	int rc;

	vdev->name = strdup(name);
	if (vdev->name == NULL) {
		return -ENOMEM;
	}

	rc = pthread_mutex_init(&vdev->mutex, NULL);
	if (rc != 0) {
		free(vdev->name);
		return -rc;
	}

	vdev->backend_ops = ops;
	vdev->ctx = ctx;

	return 0;
}

int
virtio_dev_reset(struct virtio_dev *dev, uint64_t req_features)
{
	req_features |= (1ULL << VIRTIO_F_VERSION_1);

	virtio_dev_stop(dev);

	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_ACKNOWLEDGE);
	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_ACKNOWLEDGE)) {
		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_ACKNOWLEDGE status.\n");
		return -EIO;
	}

	virtio_dev_set_status(dev, VIRTIO_CONFIG_S_DRIVER);
	if (!(virtio_dev_get_status(dev) & VIRTIO_CONFIG_S_DRIVER)) {
		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_DRIVER status.\n");
		return -EIO;
	}

	return virtio_negotiate_features(dev, req_features);
}

int
virtio_dev_start(struct virtio_dev *vdev, uint16_t max_queues, uint16_t fixed_queue_num)
{
	int ret;

	ret = virtio_alloc_queues(vdev, max_queues, fixed_queue_num);
	if (ret < 0) {
		return ret;
	}

	virtio_dev_set_status(vdev, VIRTIO_CONFIG_S_DRIVER_OK);
	if (!(virtio_dev_get_status(vdev) & VIRTIO_CONFIG_S_DRIVER_OK)) {
		SPDK_ERRLOG("Failed to set VIRTIO_CONFIG_S_DRIVER_OK status.\n");
		return -1;
	}

	return 0;
}

void
virtio_dev_destruct(struct virtio_dev *dev)
{
	virtio_dev_backend_ops(dev)->destruct_dev(dev);
	pthread_mutex_destroy(&dev->mutex);
	free(dev->name);
}

static void
vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx)
{
	struct vring_desc *dp, *dp_tail;
	struct vq_desc_extra *dxp;
	uint16_t desc_idx_last = desc_idx;

	dp  = &vq->vq_ring.desc[desc_idx];
	dxp = &vq->vq_descx[desc_idx];
	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt + dxp->ndescs);
	if ((dp->flags & VRING_DESC_F_INDIRECT) == 0) {
		while (dp->flags & VRING_DESC_F_NEXT) {
			desc_idx_last = dp->next;
			dp = &vq->vq_ring.desc[dp->next];
		}
	}
	dxp->ndescs = 0;

	/*
	 * We must append the newly freed chain to the end of the existing
	 * free chain, if any. If the virtqueue was completely used, then
	 * the free chain is empty and its head is VQ_RING_DESC_CHAIN_END.
	 */
	if (vq->vq_desc_tail_idx == VQ_RING_DESC_CHAIN_END) {
		vq->vq_desc_head_idx = desc_idx;
	} else {
		dp_tail = &vq->vq_ring.desc[vq->vq_desc_tail_idx];
		dp_tail->next = desc_idx;
	}

	vq->vq_desc_tail_idx = desc_idx_last;
	dp->next = VQ_RING_DESC_CHAIN_END;
}

static uint16_t
virtqueue_dequeue_burst_rx(struct virtqueue *vq, void **rx_pkts,
			   uint32_t *len, uint16_t num)
{
	struct vring_used_elem *uep;
	void *cookie;
	uint16_t used_idx, desc_idx;
	uint16_t i;

	/* Caller does the check */
	for (i = 0; i < num; i++) {
		used_idx = (uint16_t)(vq->vq_used_cons_idx & (vq->vq_nentries - 1));
		uep = &vq->vq_ring.used->ring[used_idx];
		desc_idx = (uint16_t) uep->id;
		len[i] = uep->len;
		cookie = vq->vq_descx[desc_idx].cookie;

		if (spdk_unlikely(cookie == NULL)) {
			SPDK_WARNLOG("vring descriptor with no mbuf cookie at %"PRIu16"\n",
				     vq->vq_used_cons_idx);
			break;
		}

		__builtin_prefetch(cookie);

		rx_pkts[i] = cookie;
		vq->vq_used_cons_idx++;
		vq_ring_free_chain(vq, desc_idx);
		vq->vq_descx[desc_idx].cookie = NULL;
	}

	return i;
}

static void
finish_req(struct virtqueue *vq)
{
	struct vring_desc *desc;
	uint16_t avail_idx;

	desc = &vq->vq_ring.desc[vq->req_end];
	desc->flags &= ~VRING_DESC_F_NEXT;

	/*
	 * Place the head of the descriptor chain into the next slot and make
	 * it usable to the host. The chain is made available now rather than
	 * deferring to virtqueue_req_flush() in the hopes that if the host is
	 * currently running on another CPU, we can keep it processing the new
	 * descriptor.
	 */
	avail_idx = (uint16_t)(vq->vq_avail_idx & (vq->vq_nentries - 1));
	vq->vq_ring.avail->ring[avail_idx] = vq->req_start;
	vq->vq_avail_idx++;
	vq->req_end = VQ_RING_DESC_CHAIN_END;
	virtio_wmb();
	vq->vq_ring.avail->idx = vq->vq_avail_idx;
	vq->reqs_finished++;
}

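/*
 * A request is built and submitted as follows (a minimal usage sketch; the
 * cookie, iovecs and descriptor types below are illustrative and error
 * handling is omitted):
 *
 *	if (virtqueue_req_start(vq, io_cookie, 2) == 0) {
 *		virtqueue_req_add_iovs(vq, &req_iov, 1, SPDK_VIRTIO_DESC_RO);
 *		virtqueue_req_add_iovs(vq, &payload_iov, 1, SPDK_VIRTIO_DESC_WR);
 *		virtqueue_req_flush(vq);
 *	}
 *
 * Completed requests are later reaped with virtio_recv_pkts(), which returns
 * the cookies that were passed to virtqueue_req_start().
 */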
int
virtqueue_req_start(struct virtqueue *vq, void *cookie, int iovcnt)
{
	struct vq_desc_extra *dxp;

	if (iovcnt > vq->vq_free_cnt) {
		return iovcnt > vq->vq_nentries ? -EINVAL : -ENOMEM;
	}

	if (vq->req_end != VQ_RING_DESC_CHAIN_END) {
		finish_req(vq);
	}

	vq->req_start = vq->vq_desc_head_idx;
	dxp = &vq->vq_descx[vq->req_start];
	dxp->cookie = cookie;
	dxp->ndescs = 0;

	return 0;
}

void
virtqueue_req_flush(struct virtqueue *vq)
{
	uint16_t reqs_finished;

	if (vq->req_end == VQ_RING_DESC_CHAIN_END) {
		/* no non-empty requests have been started */
		return;
	}

	finish_req(vq);
	virtio_mb();

	reqs_finished = vq->reqs_finished;
	vq->reqs_finished = 0;

	if (vq->vdev->negotiated_features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
		/* Set used event idx to a value the device will never reach.
		 * This effectively disables interrupts.
		 */
		vring_used_event(&vq->vq_ring) = vq->vq_used_cons_idx - vq->vq_nentries - 1;

		if (!vring_need_event(vring_avail_event(&vq->vq_ring),
				      vq->vq_avail_idx,
				      vq->vq_avail_idx - reqs_finished)) {
			return;
		}
	} else if (vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY) {
		return;
	}

	virtio_dev_backend_ops(vq->vdev)->notify_queue(vq->vdev, vq);
	SPDK_DEBUGLOG(virtio_dev, "Notified backend after xmit\n");
}

void
virtqueue_req_abort(struct virtqueue *vq)
{
	struct vring_desc *desc;

	if (vq->req_start == VQ_RING_DESC_CHAIN_END) {
		/* no requests have been started */
		return;
	}

	desc = &vq->vq_ring.desc[vq->req_end];
	desc->flags &= ~VRING_DESC_F_NEXT;

	vq_ring_free_chain(vq, vq->req_start);
	vq->req_start = VQ_RING_DESC_CHAIN_END;
}

void
virtqueue_req_add_iovs(struct virtqueue *vq, struct iovec *iovs, uint16_t iovcnt,
		       enum spdk_virtio_desc_type desc_type)
{
	struct vring_desc *desc;
	struct vq_desc_extra *dxp;
	uint16_t i, prev_head, new_head;

	assert(vq->req_start != VQ_RING_DESC_CHAIN_END);
	assert(iovcnt <= vq->vq_free_cnt);

	/* TODO use indirect descriptors if iovcnt is high enough
	 * or the caller specifies SPDK_VIRTIO_DESC_F_INDIRECT
	 */

	prev_head = vq->req_end;
	new_head = vq->vq_desc_head_idx;
	for (i = 0; i < iovcnt; ++i) {
		desc = &vq->vq_ring.desc[new_head];

		if (!vq->vdev->is_hw) {
			desc->addr = (uintptr_t)iovs[i].iov_base;
		} else {
			desc->addr = spdk_vtophys(iovs[i].iov_base, NULL);
		}

		desc->len = iovs[i].iov_len;
		/* Always set the NEXT flag. It will be unset on the last
		 * descriptor by the request-ending function.
		 */
		desc->flags = desc_type | VRING_DESC_F_NEXT;

		prev_head = new_head;
		new_head = desc->next;
	}

	dxp = &vq->vq_descx[vq->req_start];
	dxp->ndescs += iovcnt;

	vq->req_end = prev_head;
	vq->vq_desc_head_idx = new_head;
	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - iovcnt);
	if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END) {
		assert(vq->vq_free_cnt == 0);
		vq->vq_desc_tail_idx = VQ_RING_DESC_CHAIN_END;
	}
}

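/* virtio_recv_pkts() below trims each dequeue batch so that it ends on a
 * DESC_PER_CACHELINE boundary of the used ring. This is a common virtio
 * driver optimization, likely intended to reduce cacheline sharing with the
 * device on used-ring entries that are still being written.
 */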
#define DESC_PER_CACHELINE (SPDK_CACHE_LINE_SIZE / sizeof(struct vring_desc))
uint16_t
virtio_recv_pkts(struct virtqueue *vq, void **io, uint32_t *len, uint16_t nb_pkts)
{
	uint16_t nb_used, num;

	nb_used = vq->vq_ring.used->idx - vq->vq_used_cons_idx;
	virtio_rmb();

	num = (uint16_t)(spdk_likely(nb_used <= nb_pkts) ? nb_used : nb_pkts);
	if (spdk_likely(num > DESC_PER_CACHELINE)) {
		num = num - ((vq->vq_used_cons_idx + num) % DESC_PER_CACHELINE);
	}

	return virtqueue_dequeue_burst_rx(vq, io, len, num);
}

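/*
 * Virtqueues are single-consumer: a thread is expected to claim a queue
 * before using it and to release it afterwards, either by index with
 * virtio_dev_acquire_queue() or by scanning for a free one (a minimal usage
 * sketch; the start index shown is illustrative):
 *
 *	int32_t qid = virtio_dev_find_and_acquire_queue(vdev, vdev->fixed_queues_num);
 *	if (qid >= 0) {
 *		... submit I/O and poll vdev->vqs[qid] from this thread only ...
 *		virtio_dev_release_queue(vdev, qid);
 *	}
 */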
int
virtio_dev_acquire_queue(struct virtio_dev *vdev, uint16_t index)
{
	struct virtqueue *vq = NULL;

	if (index >= vdev->max_queues) {
		SPDK_ERRLOG("requested vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
			    index, vdev->max_queues);
		return -1;
	}

	pthread_mutex_lock(&vdev->mutex);
	vq = vdev->vqs[index];
	if (vq == NULL || vq->owner_thread != NULL) {
		pthread_mutex_unlock(&vdev->mutex);
		return -1;
	}

	vq->owner_thread = spdk_get_thread();
	pthread_mutex_unlock(&vdev->mutex);
	return 0;
}

int32_t
virtio_dev_find_and_acquire_queue(struct virtio_dev *vdev, uint16_t start_index)
{
	struct virtqueue *vq = NULL;
	uint16_t i;

	pthread_mutex_lock(&vdev->mutex);
	for (i = start_index; i < vdev->max_queues; ++i) {
		vq = vdev->vqs[i];
		if (vq != NULL && vq->owner_thread == NULL) {
			break;
		}
	}

	if (vq == NULL || i == vdev->max_queues) {
		SPDK_ERRLOG("no more unused virtio queues with idx >= %"PRIu16".\n", start_index);
		pthread_mutex_unlock(&vdev->mutex);
		return -1;
	}

	vq->owner_thread = spdk_get_thread();
	pthread_mutex_unlock(&vdev->mutex);
	return i;
}

struct spdk_thread *
virtio_dev_queue_get_thread(struct virtio_dev *vdev, uint16_t index)
{
	struct spdk_thread *thread = NULL;

	if (index >= vdev->max_queues) {
		SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16"\n",
			    index, vdev->max_queues);
		abort(); /* This is not recoverable */
	}

	pthread_mutex_lock(&vdev->mutex);
	thread = vdev->vqs[index]->owner_thread;
	pthread_mutex_unlock(&vdev->mutex);

	return thread;
}

bool
virtio_dev_queue_is_acquired(struct virtio_dev *vdev, uint16_t index)
{
	return virtio_dev_queue_get_thread(vdev, index) != NULL;
}

void
virtio_dev_release_queue(struct virtio_dev *vdev, uint16_t index)
{
	struct virtqueue *vq = NULL;

	if (index >= vdev->max_queues) {
		SPDK_ERRLOG("given vq index %"PRIu16" exceeds max queue count %"PRIu16".\n",
			    index, vdev->max_queues);
		return;
	}

	pthread_mutex_lock(&vdev->mutex);
	vq = vdev->vqs[index];
	if (vq == NULL) {
		SPDK_ERRLOG("virtqueue at index %"PRIu16" is not initialized.\n", index);
		pthread_mutex_unlock(&vdev->mutex);
		return;
	}

	assert(vq->owner_thread == spdk_get_thread());
	vq->owner_thread = NULL;
	pthread_mutex_unlock(&vdev->mutex);
}

int
virtio_dev_read_dev_config(struct virtio_dev *dev, size_t offset,
			   void *dst, int length)
{
	return virtio_dev_backend_ops(dev)->read_dev_cfg(dev, offset, dst, length);
}

int
virtio_dev_write_dev_config(struct virtio_dev *dev, size_t offset,
			    const void *src, int length)
{
	return virtio_dev_backend_ops(dev)->write_dev_cfg(dev, offset, src, length);
}

void
virtio_dev_stop(struct virtio_dev *dev)
{
	virtio_dev_backend_ops(dev)->set_status(dev, VIRTIO_CONFIG_S_RESET);
	/* flush status write */
	virtio_dev_backend_ops(dev)->get_status(dev);
	virtio_free_queues(dev);
}

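/* Status bits accumulate: except for VIRTIO_CONFIG_S_RESET, which clears the
 * status register entirely, virtio_dev_set_status() ORs the new bit into the
 * bits already reported by the device, mirroring how the virtio device status
 * field is driven (the driver only ever adds bits; writing 0 resets the device).
 */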
void
virtio_dev_set_status(struct virtio_dev *dev, uint8_t status)
{
	if (status != VIRTIO_CONFIG_S_RESET) {
		status |= virtio_dev_backend_ops(dev)->get_status(dev);
	}

	virtio_dev_backend_ops(dev)->set_status(dev, status);
}

uint8_t
virtio_dev_get_status(struct virtio_dev *dev)
{
	return virtio_dev_backend_ops(dev)->get_status(dev);
}

const struct virtio_dev_ops *
virtio_dev_backend_ops(struct virtio_dev *dev)
{
	return dev->backend_ops;
}

void
virtio_dev_dump_json_info(struct virtio_dev *hw, struct spdk_json_write_ctx *w)
{
	spdk_json_write_named_object_begin(w, "virtio");

	spdk_json_write_named_uint32(w, "vq_count", hw->max_queues);

	spdk_json_write_named_uint32(w, "vq_size",
				     virtio_dev_backend_ops(hw)->get_queue_size(hw, 0));

	virtio_dev_backend_ops(hw)->dump_json_info(hw, w);

	spdk_json_write_object_end(w);
}

SPDK_LOG_REGISTER_COMPONENT(virtio_dev)