xref: /spdk/lib/vhost/vhost.c (revision 367c980b453f48310e52d2574afe7d2774df800c)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/likely.h"
38 #include "spdk/string.h"
39 #include "spdk/util.h"
40 #include "spdk/memory.h"
41 #include "spdk/barrier.h"
42 #include "spdk/vhost.h"
43 #include "vhost_internal.h"
44 
45 static struct spdk_cpuset g_vhost_core_mask;
46 
47 /* Path to folder where character device will be created. Can be set by user. */
48 static char dev_dirname[PATH_MAX] = "";
49 
50 /* Thread performing all vhost management operations */
51 static struct spdk_thread *g_vhost_init_thread;
52 
53 static spdk_vhost_fini_cb g_fini_cpl_cb;
54 
55 /**
56  * DPDK calls our callbacks synchronously but the work those callbacks
57  * perform needs to be async. Luckily, all DPDK callbacks are called on
58  * a DPDK-internal pthread, so we'll just wait on a semaphore in there.
59  */
60 static sem_t g_dpdk_sem;
61 
62 /** Return code for the current DPDK callback */
63 static int g_dpdk_response;
64 
65 struct vhost_session_fn_ctx {
66 	/** Device pointer obtained before enqueuing the event */
67 	struct spdk_vhost_dev *vdev;
68 
69 	/** ID of the session to send event to. */
70 	uint32_t vsession_id;
71 
72 	/** User provided function to be executed on session's thread. */
73 	spdk_vhost_session_fn cb_fn;
74 
75 	/**
76 	 * User provided function to be called on the init thread
77 	 * after iterating through all sessions.
78 	 */
79 	spdk_vhost_dev_fn cpl_fn;
80 
81 	/** Custom user context */
82 	void *user_ctx;
83 };
84 
85 static TAILQ_HEAD(, spdk_vhost_dev) g_vhost_devices = TAILQ_HEAD_INITIALIZER(
86 			g_vhost_devices);
87 static pthread_mutex_t g_vhost_mutex = PTHREAD_MUTEX_INITIALIZER;
88 
89 void *vhost_gpa_to_vva(struct spdk_vhost_session *vsession, uint64_t addr, uint64_t len)
90 {
91 	void *vva;
92 	uint64_t newlen;
93 
94 	newlen = len;
95 	vva = (void *)rte_vhost_va_from_guest_pa(vsession->mem, addr, &newlen);
96 	if (newlen != len) {
97 		return NULL;
98 	}
99 
100 	return vva;
101 
102 }
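
/*
 * Note on the length check above: rte_vhost_va_from_guest_pa() shrinks
 * newlen when the guest-physical range is not backed by a single
 * contiguous host mapping, so returning NULL for newlen != len rejects
 * buffers that would have to be split across guest memory regions.
 * Minimal caller sketch (buf_addr and buf_len are hypothetical values
 * taken from a descriptor):
 *
 *	void *buf = vhost_gpa_to_vva(vsession, buf_addr, buf_len);
 *	if (buf == NULL) {
 *		// address unmapped, or the buffer is not contiguous
 *	}
 */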
103 
104 static void
105 vhost_log_req_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
106 		   uint16_t req_id)
107 {
108 	struct vring_desc *desc, *desc_table;
109 	uint32_t desc_table_size;
110 	int rc;
111 
112 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
113 		return;
114 	}
115 
116 	rc = vhost_vq_get_desc(vsession, virtqueue, req_id, &desc, &desc_table, &desc_table_size);
117 	if (spdk_unlikely(rc != 0)) {
118 		SPDK_ERRLOG("Can't log used ring descriptors!\n");
119 		return;
120 	}
121 
122 	do {
123 		if (vhost_vring_desc_is_wr(desc)) {
124 			/* Strictly speaking, only the pages that were actually touched should
125 			 * be logged, but tracking that would require extra bookkeeping in each
126 			 * backend. Since the backend will most likely touch all or most of those
127 			 * pages anyway, assume every page passed to us as a writable buffer was touched. */
128 			rte_vhost_log_write(vsession->vid, desc->addr, desc->len);
129 		}
130 		vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
131 	} while (desc);
132 }
133 
134 static void
135 vhost_log_used_vring_elem(struct spdk_vhost_session *vsession,
136 			  struct spdk_vhost_virtqueue *virtqueue,
137 			  uint16_t idx)
138 {
139 	uint64_t offset, len;
140 
141 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
142 		return;
143 	}
144 
145 	if (spdk_unlikely(virtqueue->packed.packed_ring)) {
146 		offset = idx * sizeof(struct vring_packed_desc);
147 		len = sizeof(struct vring_packed_desc);
148 	} else {
149 		offset = offsetof(struct vring_used, ring[idx]);
150 		len = sizeof(virtqueue->vring.used->ring[idx]);
151 	}
152 
153 	rte_vhost_log_used_vring(vsession->vid, virtqueue->vring_idx, offset, len);
154 }
155 
156 static void
157 vhost_log_used_vring_idx(struct spdk_vhost_session *vsession,
158 			 struct spdk_vhost_virtqueue *virtqueue)
159 {
160 	uint64_t offset, len;
161 	uint16_t vq_idx;
162 
163 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
164 		return;
165 	}
166 
167 	offset = offsetof(struct vring_used, idx);
168 	len = sizeof(virtqueue->vring.used->idx);
169 	vq_idx = virtqueue - vsession->virtqueue;
170 
171 	rte_vhost_log_used_vring(vsession->vid, vq_idx, offset, len);
172 }
173 
174 /*
175  * Get available requests from avail ring.
176  */
177 uint16_t
178 vhost_vq_avail_ring_get(struct spdk_vhost_virtqueue *virtqueue, uint16_t *reqs,
179 			uint16_t reqs_len)
180 {
181 	struct rte_vhost_vring *vring = &virtqueue->vring;
182 	struct vring_avail *avail = vring->avail;
183 	uint16_t size_mask = vring->size - 1;
184 	uint16_t last_idx = virtqueue->last_avail_idx, avail_idx = avail->idx;
185 	uint16_t count, i;
186 
187 	count = avail_idx - last_idx;
188 	if (spdk_likely(count == 0)) {
189 		return 0;
190 	}
191 
192 	if (spdk_unlikely(count > vring->size)) {
193 		/* TODO: the queue is unrecoverably broken and should be marked so.
194 		 * For now we will fail silently and report there are no new avail entries.
195 		 */
196 		return 0;
197 	}
198 
199 	count = spdk_min(count, reqs_len);
200 	virtqueue->last_avail_idx += count;
201 	for (i = 0; i < count; i++) {
202 		reqs[i] = vring->avail->ring[(last_idx + i) & size_mask];
203 	}
204 
205 	SPDK_DEBUGLOG(SPDK_LOG_VHOST_RING,
206 		      "AVAIL: last_idx=%"PRIu16" avail_idx=%"PRIu16" count=%"PRIu16"\n",
207 		      last_idx, avail_idx, count);
208 
209 	return count;
210 }
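
/*
 * Worked example of the index arithmetic above: avail->idx and
 * last_avail_idx are free-running uint16_t counters, so wraparound is
 * handled by unsigned subtraction. With last_idx = 65534 and
 * avail_idx = 2, count = (uint16_t)(2 - 65534) = 4, and the ring slots
 * are read at (65534 + i) & size_mask for i = 0..3.
 */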
211 
212 static bool
213 vhost_vring_desc_is_indirect(struct vring_desc *cur_desc)
214 {
215 	return !!(cur_desc->flags & VRING_DESC_F_INDIRECT);
216 }
217 
218 static bool
219 vhost_vring_packed_desc_is_indirect(struct vring_packed_desc *cur_desc)
220 {
221 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
222 }
223 
224 int
225 vhost_vq_get_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
226 		  uint16_t req_idx, struct vring_desc **desc, struct vring_desc **desc_table,
227 		  uint32_t *desc_table_size)
228 {
229 	if (spdk_unlikely(req_idx >= virtqueue->vring.size)) {
230 		return -1;
231 	}
232 
233 	*desc = &virtqueue->vring.desc[req_idx];
234 
235 	if (vhost_vring_desc_is_indirect(*desc)) {
236 		*desc_table_size = (*desc)->len / sizeof(**desc);
237 		*desc_table = vhost_gpa_to_vva(vsession, (*desc)->addr,
238 					       sizeof(**desc) * *desc_table_size);
239 		*desc = *desc_table;
240 		if (*desc == NULL) {
241 			return -1;
242 		}
243 
244 		return 0;
245 	}
246 
247 	*desc_table = virtqueue->vring.desc;
248 	*desc_table_size = virtqueue->vring.size;
249 
250 	return 0;
251 }
252 
253 int
254 vhost_vq_get_desc_packed(struct spdk_vhost_session *vsession,
255 			 struct spdk_vhost_virtqueue *virtqueue,
256 			 uint16_t req_idx, struct vring_packed_desc **desc,
257 			 struct vring_packed_desc **desc_table, uint32_t *desc_table_size)
258 {
259 	*desc = &virtqueue->vring.desc_packed[req_idx];
260 
261 	/* In a packed ring, when the desc is non-indirect we get the next desc
262 	 * by checking (desc->flags & VRING_DESC_F_NEXT) != 0. When the desc
263 	 * is indirect we get the next desc by index within desc_table_size.
264 	 * This differs from the split ring.
265 	 */
266 	if (vhost_vring_packed_desc_is_indirect(*desc)) {
267 		*desc_table_size = (*desc)->len / sizeof(struct vring_packed_desc);
268 		*desc_table = vhost_gpa_to_vva(vsession, (*desc)->addr,
269 					       (*desc)->len);
270 		*desc = *desc_table;
271 		if (spdk_unlikely(*desc == NULL)) {
272 			return -1;
273 		}
274 	} else {
275 		*desc_table = NULL;
276 		*desc_table_size = 0;
277 	}
278 
279 	return 0;
280 }
281 
282 int
283 vhost_vq_used_signal(struct spdk_vhost_session *vsession,
284 		     struct spdk_vhost_virtqueue *virtqueue)
285 {
286 	if (virtqueue->used_req_cnt == 0) {
287 		return 0;
288 	}
289 
290 	virtqueue->req_cnt += virtqueue->used_req_cnt;
291 	virtqueue->used_req_cnt = 0;
292 
293 	SPDK_DEBUGLOG(SPDK_LOG_VHOST_RING,
294 		      "Queue %td - USED RING: sending IRQ: last used %"PRIu16"\n",
295 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx);
296 
297 	if (rte_vhost_vring_call(vsession->vid, virtqueue->vring_idx) == 0) {
298 		/* interrupt signalled */
299 		return 1;
300 	} else {
301 		/* interrupt not signalled */
302 		return 0;
303 	}
304 }
305 
306 
307 static void
308 check_session_io_stats(struct spdk_vhost_session *vsession, uint64_t now)
309 {
310 	struct spdk_vhost_virtqueue *virtqueue;
311 	uint32_t irq_delay_base = vsession->coalescing_delay_time_base;
312 	uint32_t io_threshold = vsession->coalescing_io_rate_threshold;
313 	int32_t irq_delay;
314 	uint32_t req_cnt;
315 	uint16_t q_idx;
316 
317 	if (now < vsession->next_stats_check_time) {
318 		return;
319 	}
320 
321 	vsession->next_stats_check_time = now + vsession->stats_check_interval;
322 	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
323 		virtqueue = &vsession->virtqueue[q_idx];
324 
325 		req_cnt = virtqueue->req_cnt + virtqueue->used_req_cnt;
326 		if (req_cnt <= io_threshold) {
327 			continue;
328 		}
329 
330 		irq_delay = (irq_delay_base * (req_cnt - io_threshold)) / io_threshold;
331 		virtqueue->irq_delay_time = (uint32_t) spdk_max(0, irq_delay);
332 
333 		virtqueue->req_cnt = 0;
334 		virtqueue->next_event_time = now;
335 	}
336 }
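
/*
 * Example of the delay calculation above (numbers are illustrative):
 * with io_threshold = 60 requests per stats interval and
 * irq_delay_base = 200 ticks, a queue that saw req_cnt = 90 requests
 * gets irq_delay = 200 * (90 - 60) / 60 = 100 ticks of interrupt delay.
 */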
337 
338 static inline bool
339 vhost_vq_event_is_suppressed(struct spdk_vhost_virtqueue *vq)
340 {
341 	if (spdk_unlikely(vq->packed.packed_ring)) {
342 		if (vq->vring.driver_event->flags & VRING_PACKED_EVENT_FLAG_DISABLE) {
343 			return true;
344 		}
345 	} else {
346 		if (vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT) {
347 			return true;
348 		}
349 	}
350 
351 	return false;
352 }
353 
354 void
355 vhost_session_used_signal(struct spdk_vhost_session *vsession)
356 {
357 	struct spdk_vhost_virtqueue *virtqueue;
358 	uint64_t now;
359 	uint16_t q_idx;
360 
361 	if (vsession->coalescing_delay_time_base == 0) {
362 		for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
363 			virtqueue = &vsession->virtqueue[q_idx];
364 
365 			if (virtqueue->vring.desc == NULL) {
366 				continue;
367 			}
368 
369 			if (vhost_vq_event_is_suppressed(virtqueue)) {
370 				continue;
371 			}
372 
373 			vhost_vq_used_signal(vsession, virtqueue);
374 		}
375 	} else {
376 		now = spdk_get_ticks();
377 		check_session_io_stats(vsession, now);
378 
379 		for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
380 			virtqueue = &vsession->virtqueue[q_idx];
381 
382 			/* No need for event right now */
383 			if (now < virtqueue->next_event_time) {
384 				continue;
385 			}
386 
387 			if (vhost_vq_event_is_suppressed(virtqueue)) {
388 				continue;
389 			}
390 
391 			if (!vhost_vq_used_signal(vsession, virtqueue)) {
392 				continue;
393 			}
394 
395 			/* The syscall above can take a while, so refresh the timestamp */
396 			now = spdk_get_ticks();
397 			virtqueue->next_event_time = now + virtqueue->irq_delay_time;
398 		}
399 	}
400 }
401 
402 static int
403 vhost_session_set_coalescing(struct spdk_vhost_dev *vdev,
404 			     struct spdk_vhost_session *vsession, void *ctx)
405 {
406 	vsession->coalescing_delay_time_base =
407 		vdev->coalescing_delay_us * spdk_get_ticks_hz() / 1000000ULL;
408 	vsession->coalescing_io_rate_threshold =
409 		vdev->coalescing_iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
410 	return 0;
411 }
412 
413 static int
414 vhost_dev_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
415 			 uint32_t iops_threshold)
416 {
417 	uint64_t delay_time_base = delay_base_us * spdk_get_ticks_hz() / 1000000ULL;
418 	uint32_t io_rate = iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
419 
420 	if (delay_time_base >= UINT32_MAX) {
421 		SPDK_ERRLOG("Delay time of %"PRIu32" is too big\n", delay_base_us);
422 		return -EINVAL;
423 	} else if (io_rate == 0) {
424 		SPDK_ERRLOG("IOPS rate of %"PRIu32" is too low. Min is %u\n", io_rate,
425 			    1000U / SPDK_VHOST_STATS_CHECK_INTERVAL_MS);
426 		return -EINVAL;
427 	}
428 
429 	vdev->coalescing_delay_us = delay_base_us;
430 	vdev->coalescing_iops_threshold = iops_threshold;
431 	return 0;
432 }
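
/*
 * Unit-conversion example for the helper above, assuming a 2 GHz tick
 * rate and a 10 ms stats check interval: delay_base_us = 100 becomes
 * 100 * 2000000000 / 1000000 = 200000 ticks, and iops_threshold = 60000
 * becomes 60000 * 10 / 1000 = 600 requests per stats interval.
 */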
433 
434 int
435 spdk_vhost_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
436 			  uint32_t iops_threshold)
437 {
438 	int rc;
439 
440 	rc = vhost_dev_set_coalescing(vdev, delay_base_us, iops_threshold);
441 	if (rc != 0) {
442 		return rc;
443 	}
444 
445 	vhost_dev_foreach_session(vdev, vhost_session_set_coalescing, NULL, NULL);
446 	return 0;
447 }
448 
449 void
450 spdk_vhost_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
451 			  uint32_t *iops_threshold)
452 {
453 	if (delay_base_us) {
454 		*delay_base_us = vdev->coalescing_delay_us;
455 	}
456 
457 	if (iops_threshold) {
458 		*iops_threshold = vdev->coalescing_iops_threshold;
459 	}
460 }
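
/*
 * Illustrative use of the coalescing API from application code (the
 * controller name "vhost.0" and the values are hypothetical):
 *
 *	struct spdk_vhost_dev *vdev;
 *
 *	spdk_vhost_lock();
 *	vdev = spdk_vhost_dev_find("vhost.0");
 *	if (vdev != NULL) {
 *		spdk_vhost_set_coalescing(vdev, 100, 60000);
 *	}
 *	spdk_vhost_unlock();
 */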
461 
462 /*
463  * Enqueue id and len to used ring.
464  */
465 void
466 vhost_vq_used_ring_enqueue(struct spdk_vhost_session *vsession,
467 			   struct spdk_vhost_virtqueue *virtqueue,
468 			   uint16_t id, uint32_t len)
469 {
470 	struct rte_vhost_vring *vring = &virtqueue->vring;
471 	struct vring_used *used = vring->used;
472 	uint16_t last_idx = virtqueue->last_used_idx & (vring->size - 1);
473 	uint16_t vq_idx = virtqueue->vring_idx;
474 
475 	SPDK_DEBUGLOG(SPDK_LOG_VHOST_RING,
476 		      "Queue %td - USED RING: last_idx=%"PRIu16" req id=%"PRIu16" len=%"PRIu32"\n",
477 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx, id, len);
478 
479 	vhost_log_req_desc(vsession, virtqueue, id);
480 
481 	virtqueue->last_used_idx++;
482 	used->ring[last_idx].id = id;
483 	used->ring[last_idx].len = len;
484 
485 	/* Ensure the used ring is updated before we log it or increment used->idx. */
486 	spdk_smp_wmb();
487 
488 	rte_vhost_set_last_inflight_io_split(vsession->vid, vq_idx, id);
489 
490 	vhost_log_used_vring_elem(vsession, virtqueue, last_idx);
491 	* (volatile uint16_t *) &used->idx = virtqueue->last_used_idx;
492 	vhost_log_used_vring_idx(vsession, virtqueue);
493 
494 	rte_vhost_clr_inflight_desc_split(vsession->vid, vq_idx, virtqueue->last_used_idx, id);
495 
496 	virtqueue->used_req_cnt++;
497 }
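
/*
 * Split-ring example for the update above (values are illustrative):
 * with vring.size = 128 and last_used_idx = 130 before the call, the
 * completion is written to used->ring[130 & 127] = used->ring[2] and
 * used->idx is then advanced to 131; the driver masks its own
 * free-running counter the same way when consuming entries.
 */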
498 
499 void
500 vhost_vq_packed_ring_enqueue(struct spdk_vhost_session *vsession,
501 			     struct spdk_vhost_virtqueue *virtqueue,
502 			     uint16_t num_descs, uint16_t buffer_id,
503 			     uint32_t length)
504 {
505 	struct vring_packed_desc *desc = &virtqueue->vring.desc_packed[virtqueue->last_used_idx];
506 	bool used, avail;
507 
508 	SPDK_DEBUGLOG(SPDK_LOG_VHOST_RING,
509 		      "Queue %td - RING: buffer_id=%"PRIu16"\n",
510 		      virtqueue - vsession->virtqueue, buffer_id);
511 
512 	/* When a descriptor has been used, its avail flag and used flag
513 	 * are set to the same value, and that value equals the device's
514 	 * used wrap counter.
515 	 */
516 	used = !!(desc->flags & VRING_DESC_F_USED);
517 	avail = !!(desc->flags & VRING_DESC_F_AVAIL);
518 	if (spdk_unlikely(used == virtqueue->packed.used_phase && used == avail)) {
519 		SPDK_ERRLOG("descriptor has been used before\n");
520 		return;
521 	}
522 
523 	/* In a used descriptor, addr is unused and len specifies the number
524 	 * of bytes that have been written to the buffer by the device.
525 	 */
526 	desc->addr = 0;
527 	desc->len = length;
528 
529 	/* This bit specifies whether any data has been written by the device */
530 	if (length != 0) {
531 		desc->flags |= VRING_DESC_F_WRITE;
532 	}
533 
534 	/* Buffer ID is included in the last descriptor in the list.
535 	 * The driver needs to keep track of the size of the list corresponding
536 	 * to each buffer ID.
537 	 */
538 	desc->id = buffer_id;
539 
540 	/* A device MUST NOT make the descriptor used before buffer_id is
541 	 * written to the descriptor.
542 	 */
543 	spdk_smp_wmb();
544 	/* To mark a desc as used, the device sets the F_USED bit in flags to match
545 	 * the internal Device ring wrap counter. It also sets the F_AVAIL bit to
546 	 * match the same value.
547 	 */
548 	if (virtqueue->packed.used_phase) {
549 		desc->flags |= VRING_DESC_F_AVAIL_USED;
550 	} else {
551 		desc->flags &= ~VRING_DESC_F_AVAIL_USED;
552 	}
553 
554 	vhost_log_used_vring_elem(vsession, virtqueue, virtqueue->last_used_idx);
555 	virtqueue->last_used_idx += num_descs;
556 	if (virtqueue->last_used_idx >= virtqueue->vring.size) {
557 		virtqueue->last_used_idx -= virtqueue->vring.size;
558 		virtqueue->packed.used_phase = !virtqueue->packed.used_phase;
559 	}
560 
561 	virtqueue->used_req_cnt++;
562 }
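
/*
 * Wrap-counter example for the bookkeeping above: with vring.size = 256,
 * used_phase = 1 and last_used_idx = 254, completing a request that
 * occupied num_descs = 4 descriptors moves last_used_idx to 258, which
 * is folded back to 2 and flips used_phase to 0, so subsequent used
 * descriptors get the inverted F_AVAIL/F_USED bits.
 */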
563 
564 bool
565 vhost_vq_packed_ring_is_avail(struct spdk_vhost_virtqueue *virtqueue)
566 {
567 	uint16_t flags = virtqueue->vring.desc_packed[virtqueue->last_avail_idx].flags;
568 
569 	/* To mark a desc as available, the driver sets the F_AVAIL bit in flags
570 	 * to match the internal avail wrap counter. It also sets the F_USED bit to
571 	 * match the inverse value but it's not mandatory.
572 	 */
573 	return (!!(flags & VRING_DESC_F_AVAIL) == virtqueue->packed.avail_phase);
574 }
575 
576 bool
577 vhost_vring_packed_desc_is_wr(struct vring_packed_desc *cur_desc)
578 {
579 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
580 }
581 
582 int
583 vhost_vring_packed_desc_get_next(struct vring_packed_desc **desc, uint16_t *req_idx,
584 				 struct spdk_vhost_virtqueue *vq,
585 				 struct vring_packed_desc *desc_table,
586 				 uint32_t desc_table_size)
587 {
588 	if (desc_table != NULL) {
589 		/* A non-NULL desc_table means the chain is indirect, so the next
590 		 * desc is found via req_idx and desc_table_size. *desc is set to
591 		 * NULL once we reach the last desc of this request.
592 		 */
593 		(*req_idx)++;
594 		if (*req_idx < desc_table_size) {
595 			*desc = &desc_table[*req_idx];
596 		} else {
597 			*desc = NULL;
598 		}
599 	} else {
600 		/* A NULL desc_table means the chain is non-indirect, so the next
601 		 * desc is found via req_idx and the F_NEXT flag. *desc is set to
602 		 * NULL once we reach the last desc of this request. When a new
603 		 * desc is returned, req_idx is updated as well.
604 		 */
605 		if (((*desc)->flags & VRING_DESC_F_NEXT) == 0) {
606 			*desc = NULL;
607 			return 0;
608 		}
609 
610 		*req_idx = (*req_idx + 1) % vq->vring.size;
611 		*desc = &vq->vring.desc_packed[*req_idx];
612 	}
613 
614 	return 0;
615 }
616 
617 static int
618 vhost_vring_desc_payload_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
619 				uint16_t *iov_index, uintptr_t payload, uint64_t remaining)
620 {
621 	uintptr_t vva;
622 	uint64_t len;
623 
624 	do {
625 		if (*iov_index >= SPDK_VHOST_IOVS_MAX) {
626 			SPDK_ERRLOG("SPDK_VHOST_IOVS_MAX(%d) reached\n", SPDK_VHOST_IOVS_MAX);
627 			return -1;
628 		}
629 		len = remaining;
630 		vva = (uintptr_t)rte_vhost_va_from_guest_pa(vsession->mem, payload, &len);
631 		if (vva == 0 || len == 0) {
632 			SPDK_ERRLOG("gpa_to_vva(%p) == NULL\n", (void *)payload);
633 			return -1;
634 		}
635 		iov[*iov_index].iov_base = (void *)vva;
636 		iov[*iov_index].iov_len = len;
637 		remaining -= len;
638 		payload += len;
639 		(*iov_index)++;
640 	} while (remaining);
641 
642 	return 0;
643 }
644 
645 int
646 vhost_vring_packed_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
647 			       uint16_t *iov_index, const struct vring_packed_desc *desc)
648 {
649 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
650 					       desc->addr, desc->len);
651 }
652 
653 /* 1. Traverse the desc chain to get the buffer_id and return it as the task_idx.
654  * 2. Update vq->last_avail_idx to point to the next available desc chain.
655  * 3. Toggle the avail_wrap_counter if last_avail_idx wraps around.
656  */
657 uint16_t
658 vhost_vring_packed_desc_get_buffer_id(struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
659 				      uint16_t *num_descs)
660 {
661 	struct vring_packed_desc *desc;
662 	uint16_t desc_head = req_idx;
663 
664 	*num_descs = 1;
665 
666 	desc =  &vq->vring.desc_packed[req_idx];
667 	if (!vhost_vring_packed_desc_is_indirect(desc)) {
668 		while ((desc->flags & VRING_DESC_F_NEXT) != 0) {
669 			req_idx = (req_idx + 1) % vq->vring.size;
670 			desc = &vq->vring.desc_packed[req_idx];
671 			(*num_descs)++;
672 		}
673 	}
674 
675 	/* The queue size doesn't have to be a power of 2 for packed rings.
676 	 * The device maintains last_avail_idx itself, so keep the value
677 	 * within the valid range (0 to vring.size - 1).
678 	 */
679 	vq->last_avail_idx = (req_idx + 1) % vq->vring.size;
680 	if (vq->last_avail_idx < desc_head) {
681 		vq->packed.avail_phase = !vq->packed.avail_phase;
682 	}
683 
684 	return desc->id;
685 }
686 
687 int
688 vhost_vring_desc_get_next(struct vring_desc **desc,
689 			  struct vring_desc *desc_table, uint32_t desc_table_size)
690 {
691 	struct vring_desc *old_desc = *desc;
692 	uint16_t next_idx;
693 
694 	if ((old_desc->flags & VRING_DESC_F_NEXT) == 0) {
695 		*desc = NULL;
696 		return 0;
697 	}
698 
699 	next_idx = old_desc->next;
700 	if (spdk_unlikely(next_idx >= desc_table_size)) {
701 		*desc = NULL;
702 		return -1;
703 	}
704 
705 	*desc = &desc_table[next_idx];
706 	return 0;
707 }
708 
709 int
710 vhost_vring_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
711 			uint16_t *iov_index, const struct vring_desc *desc)
712 {
713 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
714 					       desc->addr, desc->len);
715 }
716 
717 static struct spdk_vhost_session *
718 vhost_session_find_by_id(struct spdk_vhost_dev *vdev, unsigned id)
719 {
720 	struct spdk_vhost_session *vsession;
721 
722 	TAILQ_FOREACH(vsession, &vdev->vsessions, tailq) {
723 		if (vsession->id == id) {
724 			return vsession;
725 		}
726 	}
727 
728 	return NULL;
729 }
730 
731 struct spdk_vhost_session *
732 vhost_session_find_by_vid(int vid)
733 {
734 	struct spdk_vhost_dev *vdev;
735 	struct spdk_vhost_session *vsession;
736 
737 	TAILQ_FOREACH(vdev, &g_vhost_devices, tailq) {
738 		TAILQ_FOREACH(vsession, &vdev->vsessions, tailq) {
739 			if (vsession->vid == vid) {
740 				return vsession;
741 			}
742 		}
743 	}
744 
745 	return NULL;
746 }
747 
748 struct spdk_vhost_dev *
749 spdk_vhost_dev_next(struct spdk_vhost_dev *vdev)
750 {
751 	if (vdev == NULL) {
752 		return TAILQ_FIRST(&g_vhost_devices);
753 	}
754 
755 	return TAILQ_NEXT(vdev, tailq);
756 }
757 
758 struct spdk_vhost_dev *
759 spdk_vhost_dev_find(const char *ctrlr_name)
760 {
761 	struct spdk_vhost_dev *vdev;
762 	size_t dev_dirname_len = strlen(dev_dirname);
763 
764 	if (strncmp(ctrlr_name, dev_dirname, dev_dirname_len) == 0) {
765 		ctrlr_name += dev_dirname_len;
766 	}
767 
768 	TAILQ_FOREACH(vdev, &g_vhost_devices, tailq) {
769 		if (strcmp(vdev->name, ctrlr_name) == 0) {
770 			return vdev;
771 		}
772 	}
773 
774 	return NULL;
775 }
776 
777 static int
778 vhost_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
779 {
780 	int rc;
781 
782 	if (cpumask == NULL) {
783 		return -1;
784 	}
785 
786 	if (mask == NULL) {
787 		spdk_cpuset_copy(cpumask, &g_vhost_core_mask);
788 		return 0;
789 	}
790 
791 	rc = spdk_cpuset_parse(cpumask, mask);
792 	if (rc < 0) {
793 		SPDK_ERRLOG("invalid cpumask %s\n", mask);
794 		return -1;
795 	}
796 
797 	spdk_cpuset_and(cpumask, &g_vhost_core_mask);
798 
799 	if (spdk_cpuset_count(cpumask) == 0) {
800 		SPDK_ERRLOG("no cpu is selected among core mask(=%s)\n",
801 			    spdk_cpuset_fmt(&g_vhost_core_mask));
802 		return -1;
803 	}
804 
805 	return 0;
806 }
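
/*
 * Example of the mask handling above (masks are illustrative): with a
 * vhost core mask of 0xf, passing "0x30" parses successfully but the
 * AND with g_vhost_core_mask leaves no CPU selected, so the function
 * fails; passing NULL simply inherits the full 0xf mask.
 */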
807 
808 static void
809 vhost_setup_core_mask(void *ctx)
810 {
811 	struct spdk_thread *thread = spdk_get_thread();
812 	spdk_cpuset_or(&g_vhost_core_mask, spdk_thread_get_cpumask(thread));
813 }
814 
815 static void
816 vhost_setup_core_mask_done(void *ctx)
817 {
818 	spdk_vhost_init_cb init_cb = ctx;
819 
820 	if (spdk_cpuset_count(&g_vhost_core_mask) == 0) {
821 		init_cb(-ECHILD);
822 		return;
823 	}
824 
825 	init_cb(0);
826 }
827 
828 static void
829 vhost_dev_thread_exit(void *arg1)
830 {
831 	spdk_thread_exit(spdk_get_thread());
832 }
833 
834 int
835 vhost_dev_register(struct spdk_vhost_dev *vdev, const char *name, const char *mask_str,
836 		   const struct spdk_vhost_dev_backend *backend)
837 {
838 	char path[PATH_MAX];
839 	struct spdk_cpuset cpumask = {};
840 	int rc;
841 
842 	assert(vdev);
843 	if (name == NULL) {
844 		SPDK_ERRLOG("Can't register controller with no name\n");
845 		return -EINVAL;
846 	}
847 
848 	if (vhost_parse_core_mask(mask_str, &cpumask) != 0) {
849 		SPDK_ERRLOG("cpumask %s is invalid (core mask is 0x%s)\n",
850 			    mask_str, spdk_cpuset_fmt(&g_vhost_core_mask));
851 		return -EINVAL;
852 	}
853 
854 	if (spdk_vhost_dev_find(name)) {
855 		SPDK_ERRLOG("vhost controller %s already exists.\n", name);
856 		return -EEXIST;
857 	}
858 
859 	if (snprintf(path, sizeof(path), "%s%s", dev_dirname, name) >= (int)sizeof(path)) {
860 		SPDK_ERRLOG("Resulting socket path for controller %s is too long: %s%s\n", name, dev_dirname,
861 			    name);
862 		return -EINVAL;
863 	}
864 
865 	vdev->name = strdup(name);
866 	vdev->path = strdup(path);
867 	if (vdev->name == NULL || vdev->path == NULL) {
868 		rc = -EIO;
869 		goto out;
870 	}
871 
872 	vdev->thread = spdk_thread_create(vdev->name, &cpumask);
873 	if (vdev->thread == NULL) {
874 		SPDK_ERRLOG("Failed to create thread for vhost controller %s.\n", name);
875 		rc = -EIO;
876 		goto out;
877 	}
878 
879 	vdev->registered = true;
880 	vdev->backend = backend;
881 	TAILQ_INIT(&vdev->vsessions);
882 
883 	vhost_dev_set_coalescing(vdev, SPDK_VHOST_COALESCING_DELAY_BASE_US,
884 				 SPDK_VHOST_VQ_IOPS_COALESCING_THRESHOLD);
885 
886 	if (vhost_register_unix_socket(path, name, vdev->virtio_features, vdev->disabled_features,
887 				       vdev->protocol_features)) {
888 		spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
889 		rc = -EIO;
890 		goto out;
891 	}
892 
893 	TAILQ_INSERT_TAIL(&g_vhost_devices, vdev, tailq);
894 
895 	SPDK_INFOLOG(SPDK_LOG_VHOST, "Controller %s: new controller added\n", vdev->name);
896 	return 0;
897 
898 out:
899 	free(vdev->name);
900 	free(vdev->path);
901 	return rc;
902 }
903 
904 int
905 vhost_dev_unregister(struct spdk_vhost_dev *vdev)
906 {
907 	if (!TAILQ_EMPTY(&vdev->vsessions)) {
908 		SPDK_ERRLOG("Controller %s still has valid connections.\n", vdev->name);
909 		return -EBUSY;
910 	}
911 
912 	if (vdev->registered && vhost_driver_unregister(vdev->path) != 0) {
913 		SPDK_ERRLOG("Could not unregister controller %s with vhost library\n"
914 			    "Check if domain socket %s still exists\n",
915 			    vdev->name, vdev->path);
916 		return -EIO;
917 	}
918 
919 	SPDK_INFOLOG(SPDK_LOG_VHOST, "Controller %s: removed\n", vdev->name);
920 
921 	spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
922 
923 	free(vdev->name);
924 	free(vdev->path);
925 	TAILQ_REMOVE(&g_vhost_devices, vdev, tailq);
926 	return 0;
927 }
928 
929 const char *
930 spdk_vhost_dev_get_name(struct spdk_vhost_dev *vdev)
931 {
932 	assert(vdev != NULL);
933 	return vdev->name;
934 }
935 
936 const struct spdk_cpuset *
937 spdk_vhost_dev_get_cpumask(struct spdk_vhost_dev *vdev)
938 {
939 	assert(vdev != NULL);
940 	return spdk_thread_get_cpumask(vdev->thread);
941 }
942 
943 static void
944 wait_for_semaphore(int timeout_sec, const char *errmsg)
945 {
946 	struct timespec timeout;
947 	int rc;
948 
949 	clock_gettime(CLOCK_REALTIME, &timeout);
950 	timeout.tv_sec += timeout_sec;
951 	rc = sem_timedwait(&g_dpdk_sem, &timeout);
952 	if (rc != 0) {
953 		SPDK_ERRLOG("Timeout waiting for event: %s.\n", errmsg);
954 		sem_wait(&g_dpdk_sem);
955 	}
956 }
957 
958 static void
959 vhost_session_cb_done(int rc)
960 {
961 	g_dpdk_response = rc;
962 	sem_post(&g_dpdk_sem);
963 }
964 
965 void
966 vhost_session_start_done(struct spdk_vhost_session *vsession, int response)
967 {
968 	if (response == 0) {
969 		vsession->started = true;
970 
971 		assert(vsession->vdev->active_session_num < UINT32_MAX);
972 		vsession->vdev->active_session_num++;
973 	}
974 
975 	vhost_session_cb_done(response);
976 }
977 
978 void
979 vhost_session_stop_done(struct spdk_vhost_session *vsession, int response)
980 {
981 	if (response == 0) {
982 		vsession->started = false;
983 
984 		assert(vsession->vdev->active_session_num > 0);
985 		vsession->vdev->active_session_num--;
986 	}
987 
988 	vhost_session_cb_done(response);
989 }
990 
991 static void
992 vhost_event_cb(void *arg1)
993 {
994 	struct vhost_session_fn_ctx *ctx = arg1;
995 	struct spdk_vhost_session *vsession;
996 
997 	if (pthread_mutex_trylock(&g_vhost_mutex) != 0) {
998 		spdk_thread_send_msg(spdk_get_thread(), vhost_event_cb, arg1);
999 		return;
1000 	}
1001 
1002 	vsession = vhost_session_find_by_id(ctx->vdev, ctx->vsession_id);
1003 	ctx->cb_fn(ctx->vdev, vsession, NULL);
1004 	pthread_mutex_unlock(&g_vhost_mutex);
1005 }
1006 
1007 int
1008 vhost_session_send_event(struct spdk_vhost_session *vsession,
1009 			 spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
1010 			 const char *errmsg)
1011 {
1012 	struct vhost_session_fn_ctx ev_ctx = {0};
1013 	struct spdk_vhost_dev *vdev = vsession->vdev;
1014 
1015 	ev_ctx.vdev = vdev;
1016 	ev_ctx.vsession_id = vsession->id;
1017 	ev_ctx.cb_fn = cb_fn;
1018 
1019 	spdk_thread_send_msg(vdev->thread, vhost_event_cb, &ev_ctx);
1020 
1021 	pthread_mutex_unlock(&g_vhost_mutex);
1022 	wait_for_semaphore(timeout_sec, errmsg);
1023 	pthread_mutex_lock(&g_vhost_mutex);
1024 
1025 	return g_dpdk_response;
1026 }
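
/*
 * Minimal sketch of how a backend might use vhost_session_send_event()
 * from an rte_vhost callback (stop_cb and the timeout are hypothetical):
 *
 *	static int
 *	stop_cb(struct spdk_vhost_dev *vdev, struct spdk_vhost_session *vsession, void *ctx)
 *	{
 *		// runs on vdev->thread: quiesce I/O, then report back
 *		vhost_session_stop_done(vsession, 0);
 *		return 0;
 *	}
 *
 *	// on the rte_vhost thread, with g_vhost_mutex held:
 *	rc = vhost_session_send_event(vsession, stop_cb, 3, "stop session");
 */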
1027 
1028 static void
1029 foreach_session_finish_cb(void *arg1)
1030 {
1031 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1032 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1033 
1034 	if (pthread_mutex_trylock(&g_vhost_mutex) != 0) {
1035 		spdk_thread_send_msg(spdk_get_thread(),
1036 				     foreach_session_finish_cb, arg1);
1037 		return;
1038 	}
1039 
1040 	assert(vdev->pending_async_op_num > 0);
1041 	vdev->pending_async_op_num--;
1042 	if (ev_ctx->cpl_fn != NULL) {
1043 		ev_ctx->cpl_fn(vdev, ev_ctx->user_ctx);
1044 	}
1045 
1046 	pthread_mutex_unlock(&g_vhost_mutex);
1047 	free(ev_ctx);
1048 }
1049 
1050 static void
1051 foreach_session(void *arg1)
1052 {
1053 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1054 	struct spdk_vhost_session *vsession;
1055 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1056 	int rc;
1057 
1058 	if (pthread_mutex_trylock(&g_vhost_mutex) != 0) {
1059 		spdk_thread_send_msg(spdk_get_thread(), foreach_session, arg1);
1060 		return;
1061 	}
1062 
1063 	TAILQ_FOREACH(vsession, &vdev->vsessions, tailq) {
1064 		if (vsession->initialized) {
1065 			rc = ev_ctx->cb_fn(vdev, vsession, ev_ctx->user_ctx);
1066 			if (rc < 0) {
1067 				goto out;
1068 			}
1069 		}
1070 	}
1071 
1072 out:
1073 	pthread_mutex_unlock(&g_vhost_mutex);
1074 
1075 	spdk_thread_send_msg(g_vhost_init_thread, foreach_session_finish_cb, arg1);
1076 }
1077 
1078 void
1079 vhost_dev_foreach_session(struct spdk_vhost_dev *vdev,
1080 			  spdk_vhost_session_fn fn,
1081 			  spdk_vhost_dev_fn cpl_fn,
1082 			  void *arg)
1083 {
1084 	struct vhost_session_fn_ctx *ev_ctx;
1085 
1086 	ev_ctx = calloc(1, sizeof(*ev_ctx));
1087 	if (ev_ctx == NULL) {
1088 		SPDK_ERRLOG("Failed to alloc vhost event.\n");
1089 		assert(false);
1090 		return;
1091 	}
1092 
1093 	ev_ctx->vdev = vdev;
1094 	ev_ctx->cb_fn = fn;
1095 	ev_ctx->cpl_fn = cpl_fn;
1096 	ev_ctx->user_ctx = arg;
1097 
1098 	assert(vdev->pending_async_op_num < UINT32_MAX);
1099 	vdev->pending_async_op_num++;
1100 
1101 	spdk_thread_send_msg(vdev->thread, foreach_session, ev_ctx);
1102 }
1103 
1104 static int
1105 _stop_session(struct spdk_vhost_session *vsession)
1106 {
1107 	struct spdk_vhost_dev *vdev = vsession->vdev;
1108 	struct spdk_vhost_virtqueue *q;
1109 	int rc;
1110 	uint16_t i;
1111 
1112 	rc = vdev->backend->stop_session(vsession);
1113 	if (rc != 0) {
1114 		SPDK_ERRLOG("Couldn't stop device with vid %d.\n", vsession->vid);
1115 		pthread_mutex_unlock(&g_vhost_mutex);
1116 		return rc;
1117 	}
1118 
1119 	for (i = 0; i < vsession->max_queues; i++) {
1120 		q = &vsession->virtqueue[i];
1121 
1122 		/* vring.desc and vring.desc_packed share a union, so checking
1123 		 * q->vring.desc also covers q->vring.desc_packed.
1124 		 */
1125 		if (q->vring.desc == NULL) {
1126 			continue;
1127 		}
1128 
1129 		/* Packed virtqueues support up to 2^15 entries each, so the
1130 		 * remaining top bit can be used as the wrap counter.
1131 		 */
1132 		if (q->packed.packed_ring) {
1133 			q->last_avail_idx = q->last_avail_idx |
1134 					    ((uint16_t)q->packed.avail_phase << 15);
1135 			q->last_used_idx = q->last_used_idx |
1136 					   ((uint16_t)q->packed.used_phase << 15);
1137 		}
1138 
1139 		rte_vhost_set_vring_base(vsession->vid, i, q->last_avail_idx, q->last_used_idx);
1140 	}
1141 
1142 	vhost_session_mem_unregister(vsession->mem);
1143 	free(vsession->mem);
1144 
1145 	return 0;
1146 }
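
/*
 * Example of the packed-ring index encoding above: with last_avail_idx = 7
 * and avail_phase = 1, the value passed to rte_vhost_set_vring_base() is
 * 7 | (1 << 15) = 0x8007; on the next start, vhost_start_device_cb()
 * recovers index 7 and phase 1 by masking with 0x7FFF and shifting right
 * by 15.
 */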
1147 
1148 int
1149 vhost_stop_device_cb(int vid)
1150 {
1151 	struct spdk_vhost_session *vsession;
1152 	int rc;
1153 
1154 	pthread_mutex_lock(&g_vhost_mutex);
1155 	vsession = vhost_session_find_by_vid(vid);
1156 	if (vsession == NULL) {
1157 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1158 		pthread_mutex_unlock(&g_vhost_mutex);
1159 		return -EINVAL;
1160 	}
1161 
1162 	if (!vsession->started) {
1163 		/* already stopped, nothing to do */
1164 		pthread_mutex_unlock(&g_vhost_mutex);
1165 		return -EALREADY;
1166 	}
1167 
1168 	rc = _stop_session(vsession);
1169 	pthread_mutex_unlock(&g_vhost_mutex);
1170 
1171 	return rc;
1172 }
1173 
1174 int
1175 vhost_start_device_cb(int vid)
1176 {
1177 	struct spdk_vhost_dev *vdev;
1178 	struct spdk_vhost_session *vsession;
1179 	int rc = -1;
1180 	uint16_t i;
1181 	bool packed_ring;
1182 
1183 	pthread_mutex_lock(&g_vhost_mutex);
1184 
1185 	vsession = vhost_session_find_by_vid(vid);
1186 	if (vsession == NULL) {
1187 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1188 		goto out;
1189 	}
1190 
1191 	vdev = vsession->vdev;
1192 	if (vsession->started) {
1193 		/* already started, nothing to do */
1194 		rc = 0;
1195 		goto out;
1196 	}
1197 
1198 	if (vhost_get_negotiated_features(vid, &vsession->negotiated_features) != 0) {
1199 		SPDK_ERRLOG("vhost device %d: Failed to get negotiated driver features\n", vid);
1200 		goto out;
1201 	}
1202 
1203 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
1204 
1205 	vsession->max_queues = 0;
1206 	memset(vsession->virtqueue, 0, sizeof(vsession->virtqueue));
1207 	for (i = 0; i < SPDK_VHOST_MAX_VQUEUES; i++) {
1208 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1209 
1210 		q->vring_idx = -1;
1211 		if (rte_vhost_get_vhost_vring(vid, i, &q->vring)) {
1212 			continue;
1213 		}
1214 		q->vring_idx = i;
1215 		rte_vhost_get_vhost_ring_inflight(vid, i, &q->vring_inflight);
1216 
1217 		/* vring.desc and vring.desc_packed share a union, so checking
1218 		 * q->vring.desc also covers q->vring.desc_packed.
1219 		 */
1220 		if (q->vring.desc == NULL || q->vring.size == 0) {
1221 			continue;
1222 		}
1223 
1224 		if (rte_vhost_get_vring_base(vsession->vid, i, &q->last_avail_idx, &q->last_used_idx)) {
1225 			q->vring.desc = NULL;
1226 			continue;
1227 		}
1228 
1229 		if (packed_ring) {
1230 			/* Packed virtqueues support up to 2^15 entries each, so the
1231 			 * remaining top bit can be used as the wrap counter.
1232 			 */
1233 			q->packed.avail_phase = q->last_avail_idx >> 15;
1234 			q->last_avail_idx = q->last_avail_idx & 0x7FFF;
1235 			q->packed.used_phase = q->last_used_idx >> 15;
1236 			q->last_used_idx = q->last_used_idx & 0x7FFF;
1237 
1238 			/* Disable I/O submission notifications, we'll be polling. */
1239 			q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1240 		} else {
1241 			/* Disable I/O submission notifications, we'll be polling. */
1242 			q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1243 		}
1244 
1245 		q->packed.packed_ring = packed_ring;
1246 		vsession->max_queues = i + 1;
1247 	}
1248 
1249 	if (vhost_get_mem_table(vid, &vsession->mem) != 0) {
1250 		SPDK_ERRLOG("vhost device %d: Failed to get guest memory table\n", vid);
1251 		goto out;
1252 	}
1253 
1254 	/*
1255 	 * This looks like some kind of QEMU bug: without kicking all queues after
1256 	 * live migration, guest I/O might freeze. It appears the previous vhost
1257 	 * instance failed to effectively deliver all interrupts before the
1258 	 * GET_VRING_BASE message. This shouldn't harm the guest, since spurious
1259 	 * interrupts are ignored by the guest virtio driver.
1260 	 *
1261 	 * Tested on QEMU 2.10.91 and 2.11.50.
1262 	 */
1263 	for (i = 0; i < vsession->max_queues; i++) {
1264 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1265 
1266 		/* vring.desc and vring.desc_packed share a union, so checking
1267 		 * q->vring.desc also covers q->vring.desc_packed.
1268 		 */
1269 		if (q->vring.desc != NULL && q->vring.size > 0) {
1270 			rte_vhost_vring_call(vsession->vid, q->vring_idx);
1271 		}
1272 	}
1273 
1274 	vhost_session_set_coalescing(vdev, vsession, NULL);
1275 	vhost_session_mem_register(vsession->mem);
1276 	vsession->initialized = true;
1277 	rc = vdev->backend->start_session(vsession);
1278 	if (rc != 0) {
1279 		vhost_session_mem_unregister(vsession->mem);
1280 		free(vsession->mem);
1281 		goto out;
1282 	}
1283 
1284 out:
1285 	pthread_mutex_unlock(&g_vhost_mutex);
1286 	return rc;
1287 }
1288 
1289 #ifdef SPDK_CONFIG_VHOST_INTERNAL_LIB
1290 int
1291 vhost_get_config_cb(int vid, uint8_t *config, uint32_t len)
1292 {
1293 	struct spdk_vhost_session *vsession;
1294 	struct spdk_vhost_dev *vdev;
1295 	int rc = -1;
1296 
1297 	pthread_mutex_lock(&g_vhost_mutex);
1298 	vsession = vhost_session_find_by_vid(vid);
1299 	if (vsession == NULL) {
1300 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1301 		goto out;
1302 	}
1303 
1304 	vdev = vsession->vdev;
1305 	if (vdev->backend->vhost_get_config) {
1306 		rc = vdev->backend->vhost_get_config(vdev, config, len);
1307 	}
1308 
1309 out:
1310 	pthread_mutex_unlock(&g_vhost_mutex);
1311 	return rc;
1312 }
1313 
1314 int
1315 vhost_set_config_cb(int vid, uint8_t *config, uint32_t offset, uint32_t size, uint32_t flags)
1316 {
1317 	struct spdk_vhost_session *vsession;
1318 	struct spdk_vhost_dev *vdev;
1319 	int rc = -1;
1320 
1321 	pthread_mutex_lock(&g_vhost_mutex);
1322 	vsession = vhost_session_find_by_vid(vid);
1323 	if (vsession == NULL) {
1324 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1325 		goto out;
1326 	}
1327 
1328 	vdev = vsession->vdev;
1329 	if (vdev->backend->vhost_set_config) {
1330 		rc = vdev->backend->vhost_set_config(vdev, config, offset, size, flags);
1331 	}
1332 
1333 out:
1334 	pthread_mutex_unlock(&g_vhost_mutex);
1335 	return rc;
1336 }
1337 #endif
1338 
1339 int
1340 spdk_vhost_set_socket_path(const char *basename)
1341 {
1342 	int ret;
1343 
1344 	if (basename && strlen(basename) > 0) {
1345 		ret = snprintf(dev_dirname, sizeof(dev_dirname) - 2, "%s", basename);
1346 		if (ret <= 0) {
1347 			return -EINVAL;
1348 		}
1349 		if ((size_t)ret >= sizeof(dev_dirname) - 2) {
1350 			SPDK_ERRLOG("Char dev dir path length %d is too long\n", ret);
1351 			return -EINVAL;
1352 		}
1353 
1354 		if (dev_dirname[ret - 1] != '/') {
1355 			dev_dirname[ret] = '/';
1356 			dev_dirname[ret + 1]  = '\0';
1357 		}
1358 	}
1359 
1360 	return 0;
1361 }
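
/*
 * Illustrative call (the directory is hypothetical):
 * spdk_vhost_set_socket_path("/var/tmp") leaves dev_dirname as
 * "/var/tmp/", so a controller named "vhost.0" would later get the
 * socket path "/var/tmp/vhost.0" in vhost_dev_register().
 */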
1362 
1363 void
1364 vhost_dump_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1365 {
1366 	assert(vdev->backend->dump_info_json != NULL);
1367 	vdev->backend->dump_info_json(vdev, w);
1368 }
1369 
1370 int
1371 spdk_vhost_dev_remove(struct spdk_vhost_dev *vdev)
1372 {
1373 	if (vdev->pending_async_op_num) {
1374 		return -EBUSY;
1375 	}
1376 
1377 	return vdev->backend->remove_device(vdev);
1378 }
1379 
1380 int
1381 vhost_new_connection_cb(int vid, const char *ifname)
1382 {
1383 	struct spdk_vhost_dev *vdev;
1384 	struct spdk_vhost_session *vsession;
1385 
1386 	pthread_mutex_lock(&g_vhost_mutex);
1387 
1388 	vdev = spdk_vhost_dev_find(ifname);
1389 	if (vdev == NULL) {
1390 		SPDK_ERRLOG("Couldn't find device with vid %d to create connection for.\n", vid);
1391 		pthread_mutex_unlock(&g_vhost_mutex);
1392 		return -1;
1393 	}
1394 
1395 	/* We expect the sessions inside vdev->vsessions to be sorted in ascending
1396 	 * order of vsession->id. For now we always set id = vsessions_num++
1397 	 * and append each session to the very end of the vsessions list.
1398 	 * This is required for spdk_vhost_dev_foreach_session() to work.
1399 	 */
1400 	if (vdev->vsessions_num == UINT_MAX) {
1401 		assert(false);
1402 		return -EINVAL;
1403 	}
1404 
1405 	if (posix_memalign((void **)&vsession, SPDK_CACHE_LINE_SIZE, sizeof(*vsession) +
1406 			   vdev->backend->session_ctx_size)) {
1407 		SPDK_ERRLOG("vsession alloc failed\n");
1408 		pthread_mutex_unlock(&g_vhost_mutex);
1409 		return -1;
1410 	}
1411 	memset(vsession, 0, sizeof(*vsession) + vdev->backend->session_ctx_size);
1412 
1413 	vsession->vdev = vdev;
1414 	vsession->vid = vid;
1415 	vsession->id = vdev->vsessions_num++;
1416 	vsession->name = spdk_sprintf_alloc("%ss%u", vdev->name, vsession->vid);
1417 	if (vsession->name == NULL) {
1418 		SPDK_ERRLOG("vsession alloc failed\n");
1419 		pthread_mutex_unlock(&g_vhost_mutex);
1420 		free(vsession);
1421 		return -1;
1422 	}
1423 	vsession->started = false;
1424 	vsession->initialized = false;
1425 	vsession->next_stats_check_time = 0;
1426 	vsession->stats_check_interval = SPDK_VHOST_STATS_CHECK_INTERVAL_MS *
1427 					 spdk_get_ticks_hz() / 1000UL;
1428 	TAILQ_INSERT_TAIL(&vdev->vsessions, vsession, tailq);
1429 
1430 	vhost_session_install_rte_compat_hooks(vsession);
1431 	pthread_mutex_unlock(&g_vhost_mutex);
1432 	return 0;
1433 }
1434 
1435 int
1436 vhost_destroy_connection_cb(int vid)
1437 {
1438 	struct spdk_vhost_session *vsession;
1439 	int rc = 0;
1440 
1441 	pthread_mutex_lock(&g_vhost_mutex);
1442 	vsession = vhost_session_find_by_vid(vid);
1443 	if (vsession == NULL) {
1444 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1445 		pthread_mutex_unlock(&g_vhost_mutex);
1446 		return -EINVAL;
1447 	}
1448 
1449 	if (vsession->started) {
1450 		rc = _stop_session(vsession);
1451 	}
1452 
1453 	TAILQ_REMOVE(&vsession->vdev->vsessions, vsession, tailq);
1454 	free(vsession->name);
1455 	free(vsession);
1456 	pthread_mutex_unlock(&g_vhost_mutex);
1457 
1458 	return rc;
1459 }
1460 
1461 void
1462 spdk_vhost_lock(void)
1463 {
1464 	pthread_mutex_lock(&g_vhost_mutex);
1465 }
1466 
1467 int
1468 spdk_vhost_trylock(void)
1469 {
1470 	return -pthread_mutex_trylock(&g_vhost_mutex);
1471 }
1472 
1473 void
1474 spdk_vhost_unlock(void)
1475 {
1476 	pthread_mutex_unlock(&g_vhost_mutex);
1477 }
1478 
1479 void
1480 spdk_vhost_init(spdk_vhost_init_cb init_cb)
1481 {
1482 	size_t len;
1483 	int ret;
1484 
1485 	g_vhost_init_thread = spdk_get_thread();
1486 	assert(g_vhost_init_thread != NULL);
1487 
1488 	if (dev_dirname[0] == '\0') {
1489 		if (getcwd(dev_dirname, sizeof(dev_dirname) - 1) == NULL) {
1490 			SPDK_ERRLOG("getcwd failed (%d): %s\n", errno, spdk_strerror(errno));
1491 			ret = -1;
1492 			goto out;
1493 		}
1494 
1495 		len = strlen(dev_dirname);
1496 		if (dev_dirname[len - 1] != '/') {
1497 			dev_dirname[len] = '/';
1498 			dev_dirname[len + 1] = '\0';
1499 		}
1500 	}
1501 
1502 	ret = sem_init(&g_dpdk_sem, 0, 0);
1503 	if (ret != 0) {
1504 		SPDK_ERRLOG("Failed to initialize semaphore for rte_vhost pthread.\n");
1505 		ret = -1;
1506 		goto out;
1507 	}
1508 
1509 	ret = vhost_scsi_controller_construct();
1510 	if (ret != 0) {
1511 		SPDK_ERRLOG("Cannot construct vhost controllers\n");
1512 		goto out;
1513 	}
1514 
1515 	ret = vhost_blk_controller_construct();
1516 	if (ret != 0) {
1517 		SPDK_ERRLOG("Cannot construct vhost block controllers\n");
1518 		goto out;
1519 	}
1520 
1521 #ifdef SPDK_CONFIG_VHOST_INTERNAL_LIB
1522 	ret = vhost_nvme_controller_construct();
1523 	if (ret != 0) {
1524 		SPDK_ERRLOG("Cannot construct vhost NVMe controllers\n");
1525 		goto out;
1526 	}
1527 #endif
1528 
1529 	spdk_cpuset_zero(&g_vhost_core_mask);
1530 
1531 	/* iterate threads instead of using SPDK_ENV_FOREACH_CORE to ensure that threads are really
1532 	 * created.
1533 	 */
1534 	spdk_for_each_thread(vhost_setup_core_mask, init_cb, vhost_setup_core_mask_done);
1535 	return;
1536 out:
1537 	init_cb(ret);
1538 }
1539 
1540 static void
1541 vhost_fini(void *arg1)
1542 {
1543 	struct spdk_vhost_dev *vdev, *tmp;
1544 
1545 	spdk_vhost_lock();
1546 	vdev = spdk_vhost_dev_next(NULL);
1547 	while (vdev != NULL) {
1548 		tmp = spdk_vhost_dev_next(vdev);
1549 		spdk_vhost_dev_remove(vdev);
1550 		/* don't care if it fails, there's nothing we can do for now */
1551 		vdev = tmp;
1552 	}
1553 	spdk_vhost_unlock();
1554 
1555 	spdk_cpuset_zero(&g_vhost_core_mask);
1556 
1557 	/* All devices are removed now. */
1558 	sem_destroy(&g_dpdk_sem);
1559 
1560 	g_fini_cpl_cb();
1561 }
1562 
1563 static void *
1564 session_shutdown(void *arg)
1565 {
1566 	struct spdk_vhost_dev *vdev = NULL;
1567 
1568 	TAILQ_FOREACH(vdev, &g_vhost_devices, tailq) {
1569 		vhost_driver_unregister(vdev->path);
1570 		vdev->registered = false;
1571 	}
1572 
1573 	SPDK_INFOLOG(SPDK_LOG_VHOST, "Exiting\n");
1574 	spdk_thread_send_msg(g_vhost_init_thread, vhost_fini, NULL);
1575 	return NULL;
1576 }
1577 
1578 void
1579 spdk_vhost_fini(spdk_vhost_fini_cb fini_cb)
1580 {
1581 	pthread_t tid;
1582 	int rc;
1583 
1584 	assert(spdk_get_thread() == g_vhost_init_thread);
1585 	g_fini_cpl_cb = fini_cb;
1586 
1587 	/* rte_vhost API for removing sockets is not asynchronous. Since it may call SPDK
1588 	 * ops for stopping a device or removing a connection, we need to call it from
1589 	 * a separate thread to avoid deadlock.
1590 	 */
1591 	rc = pthread_create(&tid, NULL, &session_shutdown, NULL);
1592 	if (rc != 0) {
1593 		SPDK_ERRLOG("Failed to start session shutdown thread (%d): %s\n", rc, spdk_strerror(rc));
1594 		abort();
1595 	}
1596 	pthread_detach(tid);
1597 }
1598 
1599 void
1600 spdk_vhost_config_json(struct spdk_json_write_ctx *w)
1601 {
1602 	struct spdk_vhost_dev *vdev;
1603 	uint32_t delay_base_us;
1604 	uint32_t iops_threshold;
1605 
1606 	spdk_json_write_array_begin(w);
1607 
1608 	spdk_vhost_lock();
1609 	vdev = spdk_vhost_dev_next(NULL);
1610 	while (vdev != NULL) {
1611 		vdev->backend->write_config_json(vdev, w);
1612 
1613 		spdk_vhost_get_coalescing(vdev, &delay_base_us, &iops_threshold);
1614 		if (delay_base_us) {
1615 			spdk_json_write_object_begin(w);
1616 			spdk_json_write_named_string(w, "method", "vhost_controller_set_coalescing");
1617 
1618 			spdk_json_write_named_object_begin(w, "params");
1619 			spdk_json_write_named_string(w, "ctrlr", vdev->name);
1620 			spdk_json_write_named_uint32(w, "delay_base_us", delay_base_us);
1621 			spdk_json_write_named_uint32(w, "iops_threshold", iops_threshold);
1622 			spdk_json_write_object_end(w);
1623 
1624 			spdk_json_write_object_end(w);
1625 		}
1626 		vdev = spdk_vhost_dev_next(vdev);
1627 	}
1628 	spdk_vhost_unlock();
1629 
1630 	spdk_json_write_array_end(w);
1631 }
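
/*
 * For reference, the coalescing entry emitted above has this shape
 * (controller name and values are illustrative):
 *
 *	{
 *	  "method": "vhost_controller_set_coalescing",
 *	  "params": {
 *	    "ctrlr": "vhost.0",
 *	    "delay_base_us": 100,
 *	    "iops_threshold": 60000
 *	  }
 *	}
 */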
1632 
1633 SPDK_LOG_REGISTER_COMPONENT("vhost", SPDK_LOG_VHOST)
1634 SPDK_LOG_REGISTER_COMPONENT("vhost_ring", SPDK_LOG_VHOST_RING)
1635