xref: /spdk/lib/vhost/vhost.c (revision ceea3088870a3919d6bdfe61d7adba11b9733fb7)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/likely.h"
38 #include "spdk/string.h"
39 #include "spdk/util.h"
40 #include "spdk/memory.h"
41 #include "spdk/barrier.h"
42 #include "spdk/vhost.h"
43 #include "vhost_internal.h"
44 
45 static struct spdk_cpuset g_vhost_core_mask;
46 
47 /* Path to the folder where the vhost socket (character device) will be created. Can be set by the user. */
48 static char dev_dirname[PATH_MAX] = "";
49 
50 /* Thread performing all vhost management operations */
51 static struct spdk_thread *g_vhost_init_thread;
52 
53 static spdk_vhost_fini_cb g_fini_cpl_cb;
54 
55 /**
56  * DPDK calls our callbacks synchronously but the work those callbacks
57  * perform needs to be async. Luckily, all DPDK callbacks are called on
58  * a DPDK-internal pthread, so we'll just wait on a semaphore in there.
59  */
60 static sem_t g_dpdk_sem;
61 
62 /** Return code for the current DPDK callback */
63 static int g_dpdk_response;
64 
65 struct vhost_session_fn_ctx {
66 	/** Device pointer obtained before enqueuing the event */
67 	struct spdk_vhost_dev *vdev;
68 
69 	/** ID of the session to send event to. */
70 	uint32_t vsession_id;
71 
72 	/** User provided function to be executed on session's thread. */
73 	spdk_vhost_session_fn cb_fn;
74 
75 	/**
76 	 * User provided function to be called on the init thread
77 	 * after iterating through all sessions.
78 	 */
79 	spdk_vhost_dev_fn cpl_fn;
80 
81 	/** Custom user context */
82 	void *user_ctx;
83 };
84 
85 static TAILQ_HEAD(, spdk_vhost_dev) g_vhost_devices = TAILQ_HEAD_INITIALIZER(
86 			g_vhost_devices);
87 static pthread_mutex_t g_vhost_mutex = PTHREAD_MUTEX_INITIALIZER;
88 
89 void *vhost_gpa_to_vva(struct spdk_vhost_session *vsession, uint64_t addr, uint64_t len)
90 {
91 	void *vva;
92 	uint64_t newlen;
93 
94 	newlen = len;
95 	vva = (void *)rte_vhost_va_from_guest_pa(vsession->mem, addr, &newlen);
96 	if (newlen != len) {
97 		return NULL;
98 	}
99 
100 	return vva;
101 
102 }
103 
104 static void
105 vhost_log_req_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
106 		   uint16_t req_id)
107 {
108 	struct vring_desc *desc, *desc_table;
109 	uint32_t desc_table_size;
110 	int rc;
111 
112 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
113 		return;
114 	}
115 
116 	rc = vhost_vq_get_desc(vsession, virtqueue, req_id, &desc, &desc_table, &desc_table_size);
117 	if (spdk_unlikely(rc != 0)) {
118 		SPDK_ERRLOG("Can't log used ring descriptors!\n");
119 		return;
120 	}
121 
122 	do {
123 		if (vhost_vring_desc_is_wr(desc)) {
124 			/* Strictly speaking, only the pages actually touched should be logged,
125 			 * but doing so would require tracking those changes in each backend.
126 			 * Since the backend will most likely touch all or most of those pages
127 			 * anyway, assume all pages passed in as writable buffers were touched. */
128 			rte_vhost_log_write(vsession->vid, desc->addr, desc->len);
129 		}
130 		vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
131 	} while (desc);
132 }
133 
134 static void
135 vhost_log_used_vring_elem(struct spdk_vhost_session *vsession,
136 			  struct spdk_vhost_virtqueue *virtqueue,
137 			  uint16_t idx)
138 {
139 	uint64_t offset, len;
140 
141 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
142 		return;
143 	}
144 
145 	if (spdk_unlikely(virtqueue->packed.packed_ring)) {
146 		offset = idx * sizeof(struct vring_packed_desc);
147 		len = sizeof(struct vring_packed_desc);
148 	} else {
149 		offset = offsetof(struct vring_used, ring[idx]);
150 		len = sizeof(virtqueue->vring.used->ring[idx]);
151 	}
152 
153 	rte_vhost_log_used_vring(vsession->vid, virtqueue->vring_idx, offset, len);
154 }
155 
156 static void
157 vhost_log_used_vring_idx(struct spdk_vhost_session *vsession,
158 			 struct spdk_vhost_virtqueue *virtqueue)
159 {
160 	uint64_t offset, len;
161 	uint16_t vq_idx;
162 
163 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
164 		return;
165 	}
166 
167 	offset = offsetof(struct vring_used, idx);
168 	len = sizeof(virtqueue->vring.used->idx);
169 	vq_idx = virtqueue - vsession->virtqueue;
170 
171 	rte_vhost_log_used_vring(vsession->vid, vq_idx, offset, len);
172 }
173 
174 /*
175  * Get available requests from avail ring.
176  */
177 uint16_t
178 vhost_vq_avail_ring_get(struct spdk_vhost_virtqueue *virtqueue, uint16_t *reqs,
179 			uint16_t reqs_len)
180 {
181 	struct rte_vhost_vring *vring = &virtqueue->vring;
182 	struct vring_avail *avail = vring->avail;
183 	uint16_t size_mask = vring->size - 1;
184 	uint16_t last_idx = virtqueue->last_avail_idx, avail_idx = avail->idx;
185 	uint16_t count, i;
186 
187 	spdk_smp_rmb();
188 
189 	count = avail_idx - last_idx;
190 	if (spdk_likely(count == 0)) {
191 		return 0;
192 	}
193 
194 	if (spdk_unlikely(count > vring->size)) {
195 		/* TODO: the queue is unrecoverably broken and should be marked so.
196 		 * For now we will fail silently and report there are no new avail entries.
197 		 */
198 		return 0;
199 	}
200 
201 	count = spdk_min(count, reqs_len);
202 	virtqueue->last_avail_idx += count;
203 	for (i = 0; i < count; i++) {
204 		reqs[i] = vring->avail->ring[(last_idx + i) & size_mask];
205 	}
206 
207 	SPDK_DEBUGLOG(vhost_ring,
208 		      "AVAIL: last_idx=%"PRIu16" avail_idx=%"PRIu16" count=%"PRIu16"\n",
209 		      last_idx, avail_idx, count);
210 
211 	return count;
212 }
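
/*
 * Index arithmetic example (illustrative values, not from this file): with
 * vring->size = 256, last_avail_idx = 65534 and avail->idx = 2, the uint16_t
 * subtraction 2 - 65534 wraps to 4, so four new entries are consumed and the
 * avail ring slots read are (65534 + i) & 255 for i = 0..3.
 */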
213 
214 static bool
215 vhost_vring_desc_is_indirect(struct vring_desc *cur_desc)
216 {
217 	return !!(cur_desc->flags & VRING_DESC_F_INDIRECT);
218 }
219 
220 static bool
221 vhost_vring_packed_desc_is_indirect(struct vring_packed_desc *cur_desc)
222 {
223 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
224 }
225 
226 int
227 vhost_vq_get_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
228 		  uint16_t req_idx, struct vring_desc **desc, struct vring_desc **desc_table,
229 		  uint32_t *desc_table_size)
230 {
231 	if (spdk_unlikely(req_idx >= virtqueue->vring.size)) {
232 		return -1;
233 	}
234 
235 	*desc = &virtqueue->vring.desc[req_idx];
236 
237 	if (vhost_vring_desc_is_indirect(*desc)) {
238 		*desc_table_size = (*desc)->len / sizeof(**desc);
239 		*desc_table = vhost_gpa_to_vva(vsession, (*desc)->addr,
240 					       sizeof(**desc) * *desc_table_size);
241 		*desc = *desc_table;
242 		if (*desc == NULL) {
243 			return -1;
244 		}
245 
246 		return 0;
247 	}
248 
249 	*desc_table = virtqueue->vring.desc;
250 	*desc_table_size = virtqueue->vring.size;
251 
252 	return 0;
253 }
254 
255 int
256 vhost_vq_get_desc_packed(struct spdk_vhost_session *vsession,
257 			 struct spdk_vhost_virtqueue *virtqueue,
258 			 uint16_t req_idx, struct vring_packed_desc **desc,
259 			 struct vring_packed_desc **desc_table, uint32_t *desc_table_size)
260 {
261 	*desc =  &virtqueue->vring.desc_packed[req_idx];
262 
263 	/* In a packed ring, when the descriptor is non-indirect, the next
264 	 * descriptor is found by checking (desc->flags & VRING_DESC_F_NEXT) != 0.
265 	 * When the descriptor is indirect, the next descriptor is found by its
266 	 * index and desc_table_size. This differs from the split ring.
267 	 */
268 	if (vhost_vring_packed_desc_is_indirect(*desc)) {
269 		*desc_table_size = (*desc)->len / sizeof(struct vring_packed_desc);
270 		*desc_table = vhost_gpa_to_vva(vsession, (*desc)->addr,
271 					       (*desc)->len);
272 		*desc = *desc_table;
273 		if (spdk_unlikely(*desc == NULL)) {
274 			return -1;
275 		}
276 	} else {
277 		*desc_table = NULL;
278 		*desc_table_size  = 0;
279 	}
280 
281 	return 0;
282 }
283 
284 int
285 vhost_vq_used_signal(struct spdk_vhost_session *vsession,
286 		     struct spdk_vhost_virtqueue *virtqueue)
287 {
288 	if (virtqueue->used_req_cnt == 0) {
289 		return 0;
290 	}
291 
292 	virtqueue->req_cnt += virtqueue->used_req_cnt;
293 	virtqueue->used_req_cnt = 0;
294 
295 	SPDK_DEBUGLOG(vhost_ring,
296 		      "Queue %td - USED RING: sending IRQ: last used %"PRIu16"\n",
297 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx);
298 
299 	if (rte_vhost_vring_call(vsession->vid, virtqueue->vring_idx) == 0) {
300 		/* interrupt signalled */
301 		return 1;
302 	} else {
303 		/* interrupt not signalled */
304 		return 0;
305 	}
306 }
307 
308 
309 static void
310 check_session_io_stats(struct spdk_vhost_session *vsession, uint64_t now)
311 {
312 	struct spdk_vhost_virtqueue *virtqueue;
313 	uint32_t irq_delay_base = vsession->coalescing_delay_time_base;
314 	uint32_t io_threshold = vsession->coalescing_io_rate_threshold;
315 	int32_t irq_delay;
316 	uint32_t req_cnt;
317 	uint16_t q_idx;
318 
319 	if (now < vsession->next_stats_check_time) {
320 		return;
321 	}
322 
323 	vsession->next_stats_check_time = now + vsession->stats_check_interval;
324 	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
325 		virtqueue = &vsession->virtqueue[q_idx];
326 
327 		req_cnt = virtqueue->req_cnt + virtqueue->used_req_cnt;
328 		if (req_cnt <= io_threshold) {
329 			continue;
330 		}
331 
332 		irq_delay = (irq_delay_base * (req_cnt - io_threshold)) / io_threshold;
333 		virtqueue->irq_delay_time = (uint32_t) spdk_max(0, irq_delay);
334 
335 		virtqueue->req_cnt = 0;
336 		virtqueue->next_event_time = now;
337 	}
338 }
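
/*
 * Illustrative numbers for the adaptive delay above (assumed, not from this
 * file): with io_threshold = 600 requests per stats interval and a window that
 * saw req_cnt = 1200, irq_delay = irq_delay_base * (1200 - 600) / 600, i.e.
 * the full coalescing delay base. Quieter queues get proportionally shorter
 * delays, and queues at or below the threshold keep their current delay.
 */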
339 
340 static inline bool
341 vhost_vq_event_is_suppressed(struct spdk_vhost_virtqueue *vq)
342 {
343 	if (spdk_unlikely(vq->packed.packed_ring)) {
344 		if (vq->vring.driver_event->flags & VRING_PACKED_EVENT_FLAG_DISABLE) {
345 			return true;
346 		}
347 	} else {
348 		if (vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT) {
349 			return true;
350 		}
351 	}
352 
353 	return false;
354 }
355 
356 void
357 vhost_session_used_signal(struct spdk_vhost_session *vsession)
358 {
359 	struct spdk_vhost_virtqueue *virtqueue;
360 	uint64_t now;
361 	uint16_t q_idx;
362 
363 	if (vsession->coalescing_delay_time_base == 0) {
364 		for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
365 			virtqueue = &vsession->virtqueue[q_idx];
366 
367 			if (virtqueue->vring.desc == NULL) {
368 				continue;
369 			}
370 
371 			if (vhost_vq_event_is_suppressed(virtqueue)) {
372 				continue;
373 			}
374 
375 			vhost_vq_used_signal(vsession, virtqueue);
376 		}
377 	} else {
378 		now = spdk_get_ticks();
379 		check_session_io_stats(vsession, now);
380 
381 		for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
382 			virtqueue = &vsession->virtqueue[q_idx];
383 
384 			/* No need for event right now */
385 			if (now < virtqueue->next_event_time) {
386 				continue;
387 			}
388 
389 			if (vhost_vq_event_is_suppressed(virtqueue)) {
390 				continue;
391 			}
392 
393 			if (!vhost_vq_used_signal(vsession, virtqueue)) {
394 				continue;
395 			}
396 
397 			/* The syscall takes a while, so refresh the current time */
398 			now = spdk_get_ticks();
399 			virtqueue->next_event_time = now + virtqueue->irq_delay_time;
400 		}
401 	}
402 }
403 
404 static int
405 vhost_session_set_coalescing(struct spdk_vhost_dev *vdev,
406 			     struct spdk_vhost_session *vsession, void *ctx)
407 {
408 	vsession->coalescing_delay_time_base =
409 		vdev->coalescing_delay_us * spdk_get_ticks_hz() / 1000000ULL;
410 	vsession->coalescing_io_rate_threshold =
411 		vdev->coalescing_iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
412 	return 0;
413 }
414 
415 static int
416 vhost_dev_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
417 			 uint32_t iops_threshold)
418 {
419 	uint64_t delay_time_base = delay_base_us * spdk_get_ticks_hz() / 1000000ULL;
420 	uint32_t io_rate = iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
421 
422 	if (delay_time_base >= UINT32_MAX) {
423 		SPDK_ERRLOG("Delay time of %"PRIu32" is too big\n", delay_base_us);
424 		return -EINVAL;
425 	} else if (io_rate == 0) {
426 		SPDK_ERRLOG("IOPS rate of %"PRIu32" is too low. Min is %u\n", io_rate,
427 			    1000U / SPDK_VHOST_STATS_CHECK_INTERVAL_MS);
428 		return -EINVAL;
429 	}
430 
431 	vdev->coalescing_delay_us = delay_base_us;
432 	vdev->coalescing_iops_threshold = iops_threshold;
433 	return 0;
434 }
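
/*
 * Worked conversion example (illustrative values; assumes
 * SPDK_VHOST_STATS_CHECK_INTERVAL_MS is defined as 10 ms): with
 * delay_base_us = 100 and spdk_get_ticks_hz() = 2000000000 (a 2 GHz timer),
 * delay_time_base = 100 * 2000000000 / 1000000 = 200000 ticks, and
 * iops_threshold = 60000 maps to io_rate = 60000 * 10 / 1000 = 600 requests
 * per stats-check interval.
 */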
435 
436 int
437 spdk_vhost_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
438 			  uint32_t iops_threshold)
439 {
440 	int rc;
441 
442 	rc = vhost_dev_set_coalescing(vdev, delay_base_us, iops_threshold);
443 	if (rc != 0) {
444 		return rc;
445 	}
446 
447 	vhost_dev_foreach_session(vdev, vhost_session_set_coalescing, NULL, NULL);
448 	return 0;
449 }
450 
451 void
452 spdk_vhost_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
453 			  uint32_t *iops_threshold)
454 {
455 	if (delay_base_us) {
456 		*delay_base_us = vdev->coalescing_delay_us;
457 	}
458 
459 	if (iops_threshold) {
460 		*iops_threshold = vdev->coalescing_iops_threshold;
461 	}
462 }
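
/*
 * Minimal usage sketch (hypothetical caller, not part of this file): an RPC
 * handler would typically look up the controller under the vhost lock and
 * apply the new coalescing settings, e.g.
 *
 *	spdk_vhost_lock();
 *	vdev = spdk_vhost_dev_find("vhost.0");
 *	if (vdev != NULL) {
 *		rc = spdk_vhost_set_coalescing(vdev, 100, 60000);
 *	}
 *	spdk_vhost_unlock();
 */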
463 
464 /*
465  * Enqueue id and len to used ring.
466  */
467 void
468 vhost_vq_used_ring_enqueue(struct spdk_vhost_session *vsession,
469 			   struct spdk_vhost_virtqueue *virtqueue,
470 			   uint16_t id, uint32_t len)
471 {
472 	struct rte_vhost_vring *vring = &virtqueue->vring;
473 	struct vring_used *used = vring->used;
474 	uint16_t last_idx = virtqueue->last_used_idx & (vring->size - 1);
475 	uint16_t vq_idx = virtqueue->vring_idx;
476 
477 	SPDK_DEBUGLOG(vhost_ring,
478 		      "Queue %td - USED RING: last_idx=%"PRIu16" req id=%"PRIu16" len=%"PRIu32"\n",
479 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx, id, len);
480 
481 	vhost_log_req_desc(vsession, virtqueue, id);
482 
483 	virtqueue->last_used_idx++;
484 	used->ring[last_idx].id = id;
485 	used->ring[last_idx].len = len;
486 
487 	/* Ensure the used ring is updated before we log it or increment used->idx. */
488 	spdk_smp_wmb();
489 
490 	rte_vhost_set_last_inflight_io_split(vsession->vid, vq_idx, id);
491 
492 	vhost_log_used_vring_elem(vsession, virtqueue, last_idx);
493 	* (volatile uint16_t *) &used->idx = virtqueue->last_used_idx;
494 	vhost_log_used_vring_idx(vsession, virtqueue);
495 
496 	rte_vhost_clr_inflight_desc_split(vsession->vid, vq_idx, virtqueue->last_used_idx, id);
497 
498 	virtqueue->used_req_cnt++;
499 }
500 
501 void
502 vhost_vq_packed_ring_enqueue(struct spdk_vhost_session *vsession,
503 			     struct spdk_vhost_virtqueue *virtqueue,
504 			     uint16_t num_descs, uint16_t buffer_id,
505 			     uint32_t length)
506 {
507 	struct vring_packed_desc *desc = &virtqueue->vring.desc_packed[virtqueue->last_used_idx];
508 	bool used, avail;
509 
510 	SPDK_DEBUGLOG(vhost_ring,
511 		      "Queue %td - RING: buffer_id=%"PRIu16"\n",
512 		      virtqueue - vsession->virtqueue, buffer_id);
513 
514 	/* When a descriptor is marked used, its avail flag and used flag are
515 	 * set equal to each other, and the used flag value matches the
516 	 * used_wrap_counter.
517 	 */
518 	used = !!(desc->flags & VRING_DESC_F_USED);
519 	avail = !!(desc->flags & VRING_DESC_F_AVAIL);
520 	if (spdk_unlikely(used == virtqueue->packed.used_phase && used == avail)) {
521 		SPDK_ERRLOG("descriptor has been used before\n");
522 		return;
523 	}
524 
525 	/* In a used descriptor, addr is unused and len specifies the length of
526 	 * the buffer that has been written to by the device.
527 	 */
528 	desc->addr = 0;
529 	desc->len = length;
530 
531 	/* This bit specifies whether any data has been written by the device */
532 	if (length != 0) {
533 		desc->flags |= VRING_DESC_F_WRITE;
534 	}
535 
536 	/* Buffer ID is included in the last descriptor in the list.
537 	 * The driver needs to keep track of the size of the list corresponding
538 	 * to each buffer ID.
539 	 */
540 	desc->id = buffer_id;
541 
542 	/* A device MUST NOT make the descriptor used before buffer_id is
543 	 * written to the descriptor.
544 	 */
545 	spdk_smp_wmb();
546 	/* To mark a desc as used, the device sets the F_USED bit in flags to match
547 	 * the internal Device ring wrap counter. It also sets the F_AVAIL bit to
548 	 * match the same value.
549 	 */
550 	if (virtqueue->packed.used_phase) {
551 		desc->flags |= VRING_DESC_F_AVAIL_USED;
552 	} else {
553 		desc->flags &= ~VRING_DESC_F_AVAIL_USED;
554 	}
555 
556 	vhost_log_used_vring_elem(vsession, virtqueue, virtqueue->last_used_idx);
557 	virtqueue->last_used_idx += num_descs;
558 	if (virtqueue->last_used_idx >= virtqueue->vring.size) {
559 		virtqueue->last_used_idx -= virtqueue->vring.size;
560 		virtqueue->packed.used_phase = !virtqueue->packed.used_phase;
561 	}
562 
563 	virtqueue->used_req_cnt++;
564 }
565 
566 bool
567 vhost_vq_packed_ring_is_avail(struct spdk_vhost_virtqueue *virtqueue)
568 {
569 	uint16_t flags = virtqueue->vring.desc_packed[virtqueue->last_avail_idx].flags;
570 
571 	/* To mark a desc as available, the driver sets the F_AVAIL bit in flags
572 	 * to match the internal avail wrap counter. It also sets the F_USED bit to
573 	 * match the inverse value but it's not mandatory.
574 	 */
575 	return (!!(flags & VRING_DESC_F_AVAIL) == virtqueue->packed.avail_phase);
576 }
577 
578 bool
579 vhost_vring_packed_desc_is_wr(struct vring_packed_desc *cur_desc)
580 {
581 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
582 }
583 
584 int
585 vhost_vring_packed_desc_get_next(struct vring_packed_desc **desc, uint16_t *req_idx,
586 				 struct spdk_vhost_virtqueue *vq,
587 				 struct vring_packed_desc *desc_table,
588 				 uint32_t desc_table_size)
589 {
590 	if (desc_table != NULL) {
591 		/* A non-NULL desc_table means the chain is indirect; the next descriptor
592 		 * is found by req_idx and desc_table_size. *desc is set to NULL once the
593 		 * last descriptor of this request has been reached.
594 		 */
595 		(*req_idx)++;
596 		if (*req_idx < desc_table_size) {
597 			*desc = &desc_table[*req_idx];
598 		} else {
599 			*desc = NULL;
600 		}
601 	} else {
602 		/* A NULL desc_table means the chain is non-indirect; the next descriptor
603 		 * is found by req_idx and the F_NEXT flag. *desc is set to NULL once the
604 		 * last descriptor of this request has been reached. When a new descriptor
605 		 * is returned, req_idx is updated as well.
606 		 */
607 		if (((*desc)->flags & VRING_DESC_F_NEXT) == 0) {
608 			*desc = NULL;
609 			return 0;
610 		}
611 
612 		*req_idx = (*req_idx + 1) % vq->vring.size;
613 		*desc = &vq->vring.desc_packed[*req_idx];
614 	}
615 
616 	return 0;
617 }
618 
619 static int
620 vhost_vring_desc_payload_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
621 				uint16_t *iov_index, uintptr_t payload, uint64_t remaining)
622 {
623 	uintptr_t vva;
624 	uint64_t len;
625 
626 	do {
627 		if (*iov_index >= SPDK_VHOST_IOVS_MAX) {
628 			SPDK_ERRLOG("SPDK_VHOST_IOVS_MAX(%d) reached\n", SPDK_VHOST_IOVS_MAX);
629 			return -1;
630 		}
631 		len = remaining;
632 		vva = (uintptr_t)rte_vhost_va_from_guest_pa(vsession->mem, payload, &len);
633 		if (vva == 0 || len == 0) {
634 			SPDK_ERRLOG("gpa_to_vva(%p) == NULL\n", (void *)payload);
635 			return -1;
636 		}
637 		iov[*iov_index].iov_base = (void *)vva;
638 		iov[*iov_index].iov_len = len;
639 		remaining -= len;
640 		payload += len;
641 		(*iov_index)++;
642 	} while (remaining);
643 
644 	return 0;
645 }
646 
647 int
648 vhost_vring_packed_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
649 			       uint16_t *iov_index, const struct vring_packed_desc *desc)
650 {
651 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
652 					       desc->addr, desc->len);
653 }
654 
655 /* 1. Traverse the descriptor chain to get the buffer_id and return it as task_idx.
656  * 2. Update vq->last_avail_idx to point to the next available descriptor chain.
657  * 3. Flip the avail_wrap_counter if last_avail_idx wraps around.
658  */
659 uint16_t
660 vhost_vring_packed_desc_get_buffer_id(struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
661 				      uint16_t *num_descs)
662 {
663 	struct vring_packed_desc *desc;
664 	uint16_t desc_head = req_idx;
665 
666 	*num_descs = 1;
667 
668 	desc =  &vq->vring.desc_packed[req_idx];
669 	if (!vhost_vring_packed_desc_is_indirect(desc)) {
670 		while ((desc->flags & VRING_DESC_F_NEXT) != 0) {
671 			req_idx = (req_idx + 1) % vq->vring.size;
672 			desc = &vq->vring.desc_packed[req_idx];
673 			(*num_descs)++;
674 		}
675 	}
676 
677 	/* The queue size doesn't have to be a power of 2.
678 	 * The device maintains last_avail_idx, so make sure the value stays
679 	 * within the valid range (0 to vring.size - 1).
680 	 */
681 	vq->last_avail_idx = (req_idx + 1) % vq->vring.size;
682 	if (vq->last_avail_idx < desc_head) {
683 		vq->packed.avail_phase = !vq->packed.avail_phase;
684 	}
685 
686 	return desc->id;
687 }
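
/*
 * Wrap-around example (illustrative): with vq->vring.size = 256 and a
 * three-descriptor chain starting at req_idx = 254, traversal visits
 * 254, 255 and 0, so last_avail_idx becomes 1. Since 1 < desc_head (254),
 * avail_phase is flipped for the next lap around the ring.
 */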
688 
689 int
690 vhost_vring_desc_get_next(struct vring_desc **desc,
691 			  struct vring_desc *desc_table, uint32_t desc_table_size)
692 {
693 	struct vring_desc *old_desc = *desc;
694 	uint16_t next_idx;
695 
696 	if ((old_desc->flags & VRING_DESC_F_NEXT) == 0) {
697 		*desc = NULL;
698 		return 0;
699 	}
700 
701 	next_idx = old_desc->next;
702 	if (spdk_unlikely(next_idx >= desc_table_size)) {
703 		*desc = NULL;
704 		return -1;
705 	}
706 
707 	*desc = &desc_table[next_idx];
708 	return 0;
709 }
710 
711 int
712 vhost_vring_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
713 			uint16_t *iov_index, const struct vring_desc *desc)
714 {
715 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
716 					       desc->addr, desc->len);
717 }
718 
719 static struct spdk_vhost_session *
720 vhost_session_find_by_id(struct spdk_vhost_dev *vdev, unsigned id)
721 {
722 	struct spdk_vhost_session *vsession;
723 
724 	TAILQ_FOREACH(vsession, &vdev->vsessions, tailq) {
725 		if (vsession->id == id) {
726 			return vsession;
727 		}
728 	}
729 
730 	return NULL;
731 }
732 
733 struct spdk_vhost_session *
734 vhost_session_find_by_vid(int vid)
735 {
736 	struct spdk_vhost_dev *vdev;
737 	struct spdk_vhost_session *vsession;
738 
739 	TAILQ_FOREACH(vdev, &g_vhost_devices, tailq) {
740 		TAILQ_FOREACH(vsession, &vdev->vsessions, tailq) {
741 			if (vsession->vid == vid) {
742 				return vsession;
743 			}
744 		}
745 	}
746 
747 	return NULL;
748 }
749 
750 struct spdk_vhost_dev *
751 spdk_vhost_dev_next(struct spdk_vhost_dev *vdev)
752 {
753 	if (vdev == NULL) {
754 		return TAILQ_FIRST(&g_vhost_devices);
755 	}
756 
757 	return TAILQ_NEXT(vdev, tailq);
758 }
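
/*
 * Iteration sketch (hypothetical external caller): the device list is meant
 * to be walked while holding the vhost lock, e.g.
 *
 *	spdk_vhost_lock();
 *	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
 *	     vdev = spdk_vhost_dev_next(vdev)) {
 *		SPDK_NOTICELOG("controller: %s\n", spdk_vhost_dev_get_name(vdev));
 *	}
 *	spdk_vhost_unlock();
 */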
759 
760 struct spdk_vhost_dev *
761 spdk_vhost_dev_find(const char *ctrlr_name)
762 {
763 	struct spdk_vhost_dev *vdev;
764 	size_t dev_dirname_len = strlen(dev_dirname);
765 
766 	if (strncmp(ctrlr_name, dev_dirname, dev_dirname_len) == 0) {
767 		ctrlr_name += dev_dirname_len;
768 	}
769 
770 	TAILQ_FOREACH(vdev, &g_vhost_devices, tailq) {
771 		if (strcmp(vdev->name, ctrlr_name) == 0) {
772 			return vdev;
773 		}
774 	}
775 
776 	return NULL;
777 }
778 
779 static int
780 vhost_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
781 {
782 	int rc;
783 
784 	if (cpumask == NULL) {
785 		return -1;
786 	}
787 
788 	if (mask == NULL) {
789 		spdk_cpuset_copy(cpumask, &g_vhost_core_mask);
790 		return 0;
791 	}
792 
793 	rc = spdk_cpuset_parse(cpumask, mask);
794 	if (rc < 0) {
795 		SPDK_ERRLOG("invalid cpumask %s\n", mask);
796 		return -1;
797 	}
798 
799 	spdk_cpuset_and(cpumask, &g_vhost_core_mask);
800 
801 	if (spdk_cpuset_count(cpumask) == 0) {
802 		SPDK_ERRLOG("no cpu is selected among core mask(=%s)\n",
803 			    spdk_cpuset_fmt(&g_vhost_core_mask));
804 		return -1;
805 	}
806 
807 	return 0;
808 }
809 
810 static void
811 vhost_setup_core_mask(void *ctx)
812 {
813 	struct spdk_thread *thread = spdk_get_thread();
814 	spdk_cpuset_or(&g_vhost_core_mask, spdk_thread_get_cpumask(thread));
815 }
816 
817 static void
818 vhost_setup_core_mask_done(void *ctx)
819 {
820 	spdk_vhost_init_cb init_cb = ctx;
821 
822 	if (spdk_cpuset_count(&g_vhost_core_mask) == 0) {
823 		init_cb(-ECHILD);
824 		return;
825 	}
826 
827 	init_cb(0);
828 }
829 
830 static void
831 vhost_dev_thread_exit(void *arg1)
832 {
833 	spdk_thread_exit(spdk_get_thread());
834 }
835 
836 int
837 vhost_dev_register(struct spdk_vhost_dev *vdev, const char *name, const char *mask_str,
838 		   const struct spdk_vhost_dev_backend *backend)
839 {
840 	char path[PATH_MAX];
841 	struct spdk_cpuset cpumask = {};
842 	int rc;
843 
844 	assert(vdev);
845 	if (name == NULL) {
846 		SPDK_ERRLOG("Can't register controller with no name\n");
847 		return -EINVAL;
848 	}
849 
850 	if (vhost_parse_core_mask(mask_str, &cpumask) != 0) {
851 		SPDK_ERRLOG("cpumask %s is invalid (core mask is 0x%s)\n",
852 			    mask_str, spdk_cpuset_fmt(&g_vhost_core_mask));
853 		return -EINVAL;
854 	}
855 
856 	if (spdk_vhost_dev_find(name)) {
857 		SPDK_ERRLOG("vhost controller %s already exists.\n", name);
858 		return -EEXIST;
859 	}
860 
861 	if (snprintf(path, sizeof(path), "%s%s", dev_dirname, name) >= (int)sizeof(path)) {
862 		SPDK_ERRLOG("Resulting socket path for controller %s is too long: %s%s\n", name, dev_dirname,
863 			    name);
864 		return -EINVAL;
865 	}
866 
867 	vdev->name = strdup(name);
868 	vdev->path = strdup(path);
869 	if (vdev->name == NULL || vdev->path == NULL) {
870 		rc = -EIO;
871 		goto out;
872 	}
873 
874 	vdev->thread = spdk_thread_create(vdev->name, &cpumask);
875 	if (vdev->thread == NULL) {
876 		SPDK_ERRLOG("Failed to create thread for vhost controller %s.\n", name);
877 		rc = -EIO;
878 		goto out;
879 	}
880 
881 	vdev->registered = true;
882 	vdev->backend = backend;
883 	TAILQ_INIT(&vdev->vsessions);
884 
885 	vhost_dev_set_coalescing(vdev, SPDK_VHOST_COALESCING_DELAY_BASE_US,
886 				 SPDK_VHOST_VQ_IOPS_COALESCING_THRESHOLD);
887 
888 	if (vhost_register_unix_socket(path, name, vdev->virtio_features, vdev->disabled_features,
889 				       vdev->protocol_features)) {
890 		spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
891 		rc = -EIO;
892 		goto out;
893 	}
894 
895 	TAILQ_INSERT_TAIL(&g_vhost_devices, vdev, tailq);
896 
897 	SPDK_INFOLOG(vhost, "Controller %s: new controller added\n", vdev->name);
898 	return 0;
899 
900 out:
901 	free(vdev->name);
902 	free(vdev->path);
903 	return rc;
904 }
905 
906 int
907 vhost_dev_unregister(struct spdk_vhost_dev *vdev)
908 {
909 	if (!TAILQ_EMPTY(&vdev->vsessions)) {
910 		SPDK_ERRLOG("Controller %s still has valid connections.\n", vdev->name);
911 		return -EBUSY;
912 	}
913 
914 	if (vdev->registered && vhost_driver_unregister(vdev->path) != 0) {
915 		SPDK_ERRLOG("Could not unregister controller %s with vhost library\n"
916 			    "Check if domain socket %s still exists\n",
917 			    vdev->name, vdev->path);
918 		return -EIO;
919 	}
920 
921 	SPDK_INFOLOG(vhost, "Controller %s: removed\n", vdev->name);
922 
923 	spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
924 
925 	free(vdev->name);
926 	free(vdev->path);
927 	TAILQ_REMOVE(&g_vhost_devices, vdev, tailq);
928 	return 0;
929 }
930 
931 const char *
932 spdk_vhost_dev_get_name(struct spdk_vhost_dev *vdev)
933 {
934 	assert(vdev != NULL);
935 	return vdev->name;
936 }
937 
938 const struct spdk_cpuset *
939 spdk_vhost_dev_get_cpumask(struct spdk_vhost_dev *vdev)
940 {
941 	assert(vdev != NULL);
942 	return spdk_thread_get_cpumask(vdev->thread);
943 }
944 
945 static void
946 wait_for_semaphore(int timeout_sec, const char *errmsg)
947 {
948 	struct timespec timeout;
949 	int rc;
950 
951 	clock_gettime(CLOCK_REALTIME, &timeout);
952 	timeout.tv_sec += timeout_sec;
953 	rc = sem_timedwait(&g_dpdk_sem, &timeout);
954 	if (rc != 0) {
955 		SPDK_ERRLOG("Timeout waiting for event: %s.\n", errmsg);
956 		sem_wait(&g_dpdk_sem);
957 	}
958 }
959 
960 static void
961 vhost_session_cb_done(int rc)
962 {
963 	g_dpdk_response = rc;
964 	sem_post(&g_dpdk_sem);
965 }
966 
967 void
968 vhost_session_start_done(struct spdk_vhost_session *vsession, int response)
969 {
970 	if (response == 0) {
971 		vsession->started = true;
972 
973 		assert(vsession->vdev->active_session_num < UINT32_MAX);
974 		vsession->vdev->active_session_num++;
975 	}
976 
977 	vhost_session_cb_done(response);
978 }
979 
980 void
981 vhost_session_stop_done(struct spdk_vhost_session *vsession, int response)
982 {
983 	if (response == 0) {
984 		vsession->started = false;
985 
986 		assert(vsession->vdev->active_session_num > 0);
987 		vsession->vdev->active_session_num--;
988 	}
989 
990 	vhost_session_cb_done(response);
991 }
992 
993 static void
994 vhost_event_cb(void *arg1)
995 {
996 	struct vhost_session_fn_ctx *ctx = arg1;
997 	struct spdk_vhost_session *vsession;
998 
999 	if (pthread_mutex_trylock(&g_vhost_mutex) != 0) {
1000 		spdk_thread_send_msg(spdk_get_thread(), vhost_event_cb, arg1);
1001 		return;
1002 	}
1003 
1004 	vsession = vhost_session_find_by_id(ctx->vdev, ctx->vsession_id);
1005 	ctx->cb_fn(ctx->vdev, vsession, NULL);
1006 	pthread_mutex_unlock(&g_vhost_mutex);
1007 }
1008 
1009 int
1010 vhost_session_send_event(struct spdk_vhost_session *vsession,
1011 			 spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
1012 			 const char *errmsg)
1013 {
1014 	struct vhost_session_fn_ctx ev_ctx = {0};
1015 	struct spdk_vhost_dev *vdev = vsession->vdev;
1016 
1017 	ev_ctx.vdev = vdev;
1018 	ev_ctx.vsession_id = vsession->id;
1019 	ev_ctx.cb_fn = cb_fn;
1020 
1021 	spdk_thread_send_msg(vdev->thread, vhost_event_cb, &ev_ctx);
1022 
1023 	pthread_mutex_unlock(&g_vhost_mutex);
1024 	wait_for_semaphore(timeout_sec, errmsg);
1025 	pthread_mutex_lock(&g_vhost_mutex);
1026 
1027 	return g_dpdk_response;
1028 }
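
/*
 * Flow summary (descriptive comment only, no new logic): the DPDK-internal
 * pthread that invoked the callback drops g_vhost_mutex, blocks on g_dpdk_sem
 * in wait_for_semaphore(), and is woken when the controller thread finishes
 * cb_fn and calls vhost_session_start_done() or vhost_session_stop_done(),
 * which post the semaphore through vhost_session_cb_done(). g_dpdk_response
 * then carries the callback's result back to the rte_vhost caller.
 */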
1029 
1030 static void
1031 foreach_session_finish_cb(void *arg1)
1032 {
1033 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1034 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1035 
1036 	if (pthread_mutex_trylock(&g_vhost_mutex) != 0) {
1037 		spdk_thread_send_msg(spdk_get_thread(),
1038 				     foreach_session_finish_cb, arg1);
1039 		return;
1040 	}
1041 
1042 	assert(vdev->pending_async_op_num > 0);
1043 	vdev->pending_async_op_num--;
1044 	if (ev_ctx->cpl_fn != NULL) {
1045 		ev_ctx->cpl_fn(vdev, ev_ctx->user_ctx);
1046 	}
1047 
1048 	pthread_mutex_unlock(&g_vhost_mutex);
1049 	free(ev_ctx);
1050 }
1051 
1052 static void
1053 foreach_session(void *arg1)
1054 {
1055 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1056 	struct spdk_vhost_session *vsession;
1057 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1058 	int rc;
1059 
1060 	if (pthread_mutex_trylock(&g_vhost_mutex) != 0) {
1061 		spdk_thread_send_msg(spdk_get_thread(), foreach_session, arg1);
1062 		return;
1063 	}
1064 
1065 	TAILQ_FOREACH(vsession, &vdev->vsessions, tailq) {
1066 		if (vsession->initialized) {
1067 			rc = ev_ctx->cb_fn(vdev, vsession, ev_ctx->user_ctx);
1068 			if (rc < 0) {
1069 				goto out;
1070 			}
1071 		}
1072 	}
1073 
1074 out:
1075 	pthread_mutex_unlock(&g_vhost_mutex);
1076 
1077 	spdk_thread_send_msg(g_vhost_init_thread, foreach_session_finish_cb, arg1);
1078 }
1079 
1080 void
1081 vhost_dev_foreach_session(struct spdk_vhost_dev *vdev,
1082 			  spdk_vhost_session_fn fn,
1083 			  spdk_vhost_dev_fn cpl_fn,
1084 			  void *arg)
1085 {
1086 	struct vhost_session_fn_ctx *ev_ctx;
1087 
1088 	ev_ctx = calloc(1, sizeof(*ev_ctx));
1089 	if (ev_ctx == NULL) {
1090 		SPDK_ERRLOG("Failed to alloc vhost event.\n");
1091 		assert(false);
1092 		return;
1093 	}
1094 
1095 	ev_ctx->vdev = vdev;
1096 	ev_ctx->cb_fn = fn;
1097 	ev_ctx->cpl_fn = cpl_fn;
1098 	ev_ctx->user_ctx = arg;
1099 
1100 	assert(vdev->pending_async_op_num < UINT32_MAX);
1101 	vdev->pending_async_op_num++;
1102 
1103 	spdk_thread_send_msg(vdev->thread, foreach_session, ev_ctx);
1104 }
1105 
1106 static int
1107 _stop_session(struct spdk_vhost_session *vsession)
1108 {
1109 	struct spdk_vhost_dev *vdev = vsession->vdev;
1110 	struct spdk_vhost_virtqueue *q;
1111 	int rc;
1112 	uint16_t i;
1113 
1114 	rc = vdev->backend->stop_session(vsession);
1115 	if (rc != 0) {
1116 		SPDK_ERRLOG("Couldn't stop device with vid %d.\n", vsession->vid);
1117 		pthread_mutex_unlock(&g_vhost_mutex);
1118 		return rc;
1119 	}
1120 
1121 	for (i = 0; i < vsession->max_queues; i++) {
1122 		q = &vsession->virtqueue[i];
1123 
1124 		/* vring.desc and vring.desc_packed are in a union, so checking
1125 		 * q->vring.desc also covers q->vring.desc_packed.
1126 		 */
1127 		if (q->vring.desc == NULL) {
1128 			continue;
1129 		}
1130 
1131 		/* Packed virtqueues support up to 2^15 entries each,
1132 		 * so the remaining top bit can be used as the wrap counter.
1133 		 */
1134 		if (q->packed.packed_ring) {
1135 			q->last_avail_idx = q->last_avail_idx |
1136 					    ((uint16_t)q->packed.avail_phase << 15);
1137 			q->last_used_idx = q->last_used_idx |
1138 					   ((uint16_t)q->packed.used_phase << 15);
1139 		}
1140 
1141 		rte_vhost_set_vring_base(vsession->vid, i, q->last_avail_idx, q->last_used_idx);
1142 	}
1143 
1144 	vhost_session_mem_unregister(vsession->mem);
1145 	free(vsession->mem);
1146 
1147 	return 0;
1148 }
1149 
1150 int
1151 vhost_stop_device_cb(int vid)
1152 {
1153 	struct spdk_vhost_session *vsession;
1154 	int rc;
1155 
1156 	pthread_mutex_lock(&g_vhost_mutex);
1157 	vsession = vhost_session_find_by_vid(vid);
1158 	if (vsession == NULL) {
1159 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1160 		pthread_mutex_unlock(&g_vhost_mutex);
1161 		return -EINVAL;
1162 	}
1163 
1164 	if (!vsession->started) {
1165 		/* already stopped, nothing to do */
1166 		pthread_mutex_unlock(&g_vhost_mutex);
1167 		return -EALREADY;
1168 	}
1169 
1170 	rc = _stop_session(vsession);
1171 	pthread_mutex_unlock(&g_vhost_mutex);
1172 
1173 	return rc;
1174 }
1175 
1176 int
1177 vhost_start_device_cb(int vid)
1178 {
1179 	struct spdk_vhost_dev *vdev;
1180 	struct spdk_vhost_session *vsession;
1181 	int rc = -1;
1182 	uint16_t i;
1183 	bool packed_ring;
1184 
1185 	pthread_mutex_lock(&g_vhost_mutex);
1186 
1187 	vsession = vhost_session_find_by_vid(vid);
1188 	if (vsession == NULL) {
1189 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1190 		goto out;
1191 	}
1192 
1193 	vdev = vsession->vdev;
1194 	if (vsession->started) {
1195 		/* already started, nothing to do */
1196 		rc = 0;
1197 		goto out;
1198 	}
1199 
1200 	if (vhost_get_negotiated_features(vid, &vsession->negotiated_features) != 0) {
1201 		SPDK_ERRLOG("vhost device %d: Failed to get negotiated driver features\n", vid);
1202 		goto out;
1203 	}
1204 
1205 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
1206 
1207 	vsession->max_queues = 0;
1208 	memset(vsession->virtqueue, 0, sizeof(vsession->virtqueue));
1209 	for (i = 0; i < SPDK_VHOST_MAX_VQUEUES; i++) {
1210 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1211 
1212 		q->vring_idx = -1;
1213 		if (rte_vhost_get_vhost_vring(vid, i, &q->vring)) {
1214 			continue;
1215 		}
1216 		q->vring_idx = i;
1217 		rte_vhost_get_vhost_ring_inflight(vid, i, &q->vring_inflight);
1218 
1219 		/* vring.desc and vring.desc_packed are in a union, so checking
1220 		 * q->vring.desc also covers q->vring.desc_packed.
1221 		 */
1222 		if (q->vring.desc == NULL || q->vring.size == 0) {
1223 			continue;
1224 		}
1225 
1226 		if (rte_vhost_get_vring_base(vsession->vid, i, &q->last_avail_idx, &q->last_used_idx)) {
1227 			q->vring.desc = NULL;
1228 			continue;
1229 		}
1230 
1231 		if (packed_ring) {
1232 			/* Packed virtqueues support up to 2^15 entries each,
1233 			 * so the remaining top bit can be used as the wrap counter.
1234 			 */
1235 			q->packed.avail_phase = q->last_avail_idx >> 15;
1236 			q->last_avail_idx = q->last_avail_idx & 0x7FFF;
1237 			q->packed.used_phase = q->last_used_idx >> 15;
1238 			q->last_used_idx = q->last_used_idx & 0x7FFF;
1239 
1240 			/* Disable I/O submission notifications, we'll be polling. */
1241 			q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1242 		} else {
1243 			/* Disable I/O submission notifications, we'll be polling. */
1244 			q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1245 		}
1246 
1247 		q->packed.packed_ring = packed_ring;
1248 		vsession->max_queues = i + 1;
1249 	}
1250 
1251 	if (vhost_get_mem_table(vid, &vsession->mem) != 0) {
1252 		SPDK_ERRLOG("vhost device %d: Failed to get guest memory table\n", vid);
1253 		goto out;
1254 	}
1255 
1256 	/*
1257 	 * Not entirely certain, but this looks like a QEMU bug: guest I/O might be
1258 	 * frozen after live migration unless all queues are kicked. It appears the
1259 	 * previous vhost instance failed to effectively deliver all interrupts before
1260 	 * the GET_VRING_BASE message. This shouldn't harm the guest, since spurious
1261 	 * interrupts should be ignored by the guest virtio driver.
1262 	 *
1263 	 * Tested on QEMU 2.10.91 and 2.11.50.
1264 	 */
1265 	for (i = 0; i < vsession->max_queues; i++) {
1266 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1267 
1268 		/* vring.desc and vring.desc_packed are in a union, so checking
1269 		 * q->vring.desc also covers q->vring.desc_packed.
1270 		 */
1271 		if (q->vring.desc != NULL && q->vring.size > 0) {
1272 			rte_vhost_vring_call(vsession->vid, q->vring_idx);
1273 		}
1274 	}
1275 
1276 	vhost_session_set_coalescing(vdev, vsession, NULL);
1277 	vhost_session_mem_register(vsession->mem);
1278 	vsession->initialized = true;
1279 	rc = vdev->backend->start_session(vsession);
1280 	if (rc != 0) {
1281 		vhost_session_mem_unregister(vsession->mem);
1282 		free(vsession->mem);
1283 		goto out;
1284 	}
1285 
1286 out:
1287 	pthread_mutex_unlock(&g_vhost_mutex);
1288 	return rc;
1289 }
1290 
1291 int
1292 spdk_vhost_set_socket_path(const char *basename)
1293 {
1294 	int ret;
1295 
1296 	if (basename && strlen(basename) > 0) {
1297 		ret = snprintf(dev_dirname, sizeof(dev_dirname) - 2, "%s", basename);
1298 		if (ret <= 0) {
1299 			return -EINVAL;
1300 		}
1301 		if ((size_t)ret >= sizeof(dev_dirname) - 2) {
1302 			SPDK_ERRLOG("Char dev dir path length %d is too long\n", ret);
1303 			return -EINVAL;
1304 		}
1305 
1306 		if (dev_dirname[ret - 1] != '/') {
1307 			dev_dirname[ret] = '/';
1308 			dev_dirname[ret + 1]  = '\0';
1309 		}
1310 	}
1311 
1312 	return 0;
1313 }
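
/*
 * Example (illustrative): spdk_vhost_set_socket_path("/var/tmp") stores
 * "/var/tmp/" in dev_dirname, so a controller later registered as "vhost.0"
 * gets its socket created at "/var/tmp/vhost.0" by vhost_dev_register().
 */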
1314 
1315 void
1316 vhost_dump_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1317 {
1318 	assert(vdev->backend->dump_info_json != NULL);
1319 	vdev->backend->dump_info_json(vdev, w);
1320 }
1321 
1322 int
1323 spdk_vhost_dev_remove(struct spdk_vhost_dev *vdev)
1324 {
1325 	if (vdev->pending_async_op_num) {
1326 		return -EBUSY;
1327 	}
1328 
1329 	return vdev->backend->remove_device(vdev);
1330 }
1331 
1332 int
1333 vhost_new_connection_cb(int vid, const char *ifname)
1334 {
1335 	struct spdk_vhost_dev *vdev;
1336 	struct spdk_vhost_session *vsession;
1337 
1338 	pthread_mutex_lock(&g_vhost_mutex);
1339 
1340 	vdev = spdk_vhost_dev_find(ifname);
1341 	if (vdev == NULL) {
1342 		SPDK_ERRLOG("Couldn't find device with vid %d to create connection for.\n", vid);
1343 		pthread_mutex_unlock(&g_vhost_mutex);
1344 		return -1;
1345 	}
1346 
1347 	/* Sessions inside vdev->vsessions are expected to be sorted in ascending
1348 	 * order of vsession->id. For now we always set id = vsessions_num++ and
1349 	 * append each session to the very end of the vsessions list.
1350 	 * This is required for spdk_vhost_dev_foreach_session() to work.
1351 	 */
1352 	if (vdev->vsessions_num == UINT_MAX) {
1353 		assert(false);
1354 		return -EINVAL;
1355 	}
1356 
1357 	if (posix_memalign((void **)&vsession, SPDK_CACHE_LINE_SIZE, sizeof(*vsession) +
1358 			   vdev->backend->session_ctx_size)) {
1359 		SPDK_ERRLOG("vsession alloc failed\n");
1360 		pthread_mutex_unlock(&g_vhost_mutex);
1361 		return -1;
1362 	}
1363 	memset(vsession, 0, sizeof(*vsession) + vdev->backend->session_ctx_size);
1364 
1365 	vsession->vdev = vdev;
1366 	vsession->vid = vid;
1367 	vsession->id = vdev->vsessions_num++;
1368 	vsession->name = spdk_sprintf_alloc("%ss%u", vdev->name, vsession->vid);
1369 	if (vsession->name == NULL) {
1370 		SPDK_ERRLOG("vsession alloc failed\n");
1371 		pthread_mutex_unlock(&g_vhost_mutex);
1372 		free(vsession);
1373 		return -1;
1374 	}
1375 	vsession->started = false;
1376 	vsession->initialized = false;
1377 	vsession->next_stats_check_time = 0;
1378 	vsession->stats_check_interval = SPDK_VHOST_STATS_CHECK_INTERVAL_MS *
1379 					 spdk_get_ticks_hz() / 1000UL;
1380 	TAILQ_INSERT_TAIL(&vdev->vsessions, vsession, tailq);
1381 
1382 	vhost_session_install_rte_compat_hooks(vsession);
1383 	pthread_mutex_unlock(&g_vhost_mutex);
1384 	return 0;
1385 }
1386 
1387 int
1388 vhost_destroy_connection_cb(int vid)
1389 {
1390 	struct spdk_vhost_session *vsession;
1391 	int rc = 0;
1392 
1393 	pthread_mutex_lock(&g_vhost_mutex);
1394 	vsession = vhost_session_find_by_vid(vid);
1395 	if (vsession == NULL) {
1396 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1397 		pthread_mutex_unlock(&g_vhost_mutex);
1398 		return -EINVAL;
1399 	}
1400 
1401 	if (vsession->started) {
1402 		rc = _stop_session(vsession);
1403 	}
1404 
1405 	TAILQ_REMOVE(&vsession->vdev->vsessions, vsession, tailq);
1406 	free(vsession->name);
1407 	free(vsession);
1408 	pthread_mutex_unlock(&g_vhost_mutex);
1409 
1410 	return rc;
1411 }
1412 
1413 void
1414 spdk_vhost_lock(void)
1415 {
1416 	pthread_mutex_lock(&g_vhost_mutex);
1417 }
1418 
1419 int
1420 spdk_vhost_trylock(void)
1421 {
1422 	return -pthread_mutex_trylock(&g_vhost_mutex);
1423 }
1424 
1425 void
1426 spdk_vhost_unlock(void)
1427 {
1428 	pthread_mutex_unlock(&g_vhost_mutex);
1429 }
1430 
1431 void
1432 spdk_vhost_init(spdk_vhost_init_cb init_cb)
1433 {
1434 	size_t len;
1435 	int ret;
1436 
1437 	g_vhost_init_thread = spdk_get_thread();
1438 	assert(g_vhost_init_thread != NULL);
1439 
1440 	if (dev_dirname[0] == '\0') {
1441 		if (getcwd(dev_dirname, sizeof(dev_dirname) - 1) == NULL) {
1442 			SPDK_ERRLOG("getcwd failed (%d): %s\n", errno, spdk_strerror(errno));
1443 			ret = -1;
1444 			goto out;
1445 		}
1446 
1447 		len = strlen(dev_dirname);
1448 		if (dev_dirname[len - 1] != '/') {
1449 			dev_dirname[len] = '/';
1450 			dev_dirname[len + 1] = '\0';
1451 		}
1452 	}
1453 
1454 	ret = sem_init(&g_dpdk_sem, 0, 0);
1455 	if (ret != 0) {
1456 		SPDK_ERRLOG("Failed to initialize semaphore for rte_vhost pthread.\n");
1457 		ret = -1;
1458 		goto out;
1459 	}
1460 
1461 	spdk_cpuset_zero(&g_vhost_core_mask);
1462 
1463 	/* Iterate over the SPDK threads instead of using SPDK_ENV_FOREACH_CORE to
1464 	 * ensure that the threads have actually been created.
1465 	 */
1466 	spdk_for_each_thread(vhost_setup_core_mask, init_cb, vhost_setup_core_mask_done);
1467 	return;
1468 out:
1469 	init_cb(ret);
1470 }
1471 
1472 static void
1473 vhost_fini(void *arg1)
1474 {
1475 	struct spdk_vhost_dev *vdev, *tmp;
1476 
1477 	spdk_vhost_lock();
1478 	vdev = spdk_vhost_dev_next(NULL);
1479 	while (vdev != NULL) {
1480 		tmp = spdk_vhost_dev_next(vdev);
1481 		spdk_vhost_dev_remove(vdev);
1482 		/* don't care if it fails, there's nothing we can do for now */
1483 		vdev = tmp;
1484 	}
1485 	spdk_vhost_unlock();
1486 
1487 	spdk_cpuset_zero(&g_vhost_core_mask);
1488 
1489 	/* All devices are removed now. */
1490 	sem_destroy(&g_dpdk_sem);
1491 
1492 	g_fini_cpl_cb();
1493 }
1494 
1495 static void *
1496 session_shutdown(void *arg)
1497 {
1498 	struct spdk_vhost_dev *vdev = NULL;
1499 
1500 	TAILQ_FOREACH(vdev, &g_vhost_devices, tailq) {
1501 		vhost_driver_unregister(vdev->path);
1502 		vdev->registered = false;
1503 	}
1504 
1505 	SPDK_INFOLOG(vhost, "Exiting\n");
1506 	spdk_thread_send_msg(g_vhost_init_thread, vhost_fini, NULL);
1507 	return NULL;
1508 }
1509 
1510 void
1511 spdk_vhost_fini(spdk_vhost_fini_cb fini_cb)
1512 {
1513 	pthread_t tid;
1514 	int rc;
1515 
1516 	assert(spdk_get_thread() == g_vhost_init_thread);
1517 	g_fini_cpl_cb = fini_cb;
1518 
1519 	/* rte_vhost API for removing sockets is not asynchronous. Since it may call SPDK
1520 	 * ops for stopping a device or removing a connection, we need to call it from
1521 	 * a separate thread to avoid deadlock.
1522 	 */
1523 	rc = pthread_create(&tid, NULL, &session_shutdown, NULL);
1524 	if (rc != 0) {
1525 		SPDK_ERRLOG("Failed to start session shutdown thread (%d): %s\n", rc, spdk_strerror(rc));
1526 		abort();
1527 	}
1528 	pthread_detach(tid);
1529 }
1530 
1531 void
1532 spdk_vhost_config_json(struct spdk_json_write_ctx *w)
1533 {
1534 	struct spdk_vhost_dev *vdev;
1535 	uint32_t delay_base_us;
1536 	uint32_t iops_threshold;
1537 
1538 	spdk_json_write_array_begin(w);
1539 
1540 	spdk_vhost_lock();
1541 	vdev = spdk_vhost_dev_next(NULL);
1542 	while (vdev != NULL) {
1543 		vdev->backend->write_config_json(vdev, w);
1544 
1545 		spdk_vhost_get_coalescing(vdev, &delay_base_us, &iops_threshold);
1546 		if (delay_base_us) {
1547 			spdk_json_write_object_begin(w);
1548 			spdk_json_write_named_string(w, "method", "vhost_controller_set_coalescing");
1549 
1550 			spdk_json_write_named_object_begin(w, "params");
1551 			spdk_json_write_named_string(w, "ctrlr", vdev->name);
1552 			spdk_json_write_named_uint32(w, "delay_base_us", delay_base_us);
1553 			spdk_json_write_named_uint32(w, "iops_threshold", iops_threshold);
1554 			spdk_json_write_object_end(w);
1555 
1556 			spdk_json_write_object_end(w);
1557 		}
1558 		vdev = spdk_vhost_dev_next(vdev);
1559 	}
1560 	spdk_vhost_unlock();
1561 
1562 	spdk_json_write_array_end(w);
1563 }
1564 
1565 SPDK_LOG_REGISTER_COMPONENT(vhost)
1566 SPDK_LOG_REGISTER_COMPONENT(vhost_ring)
1567