xref: /spdk/lib/vhost/rte_vhost_user.c (revision 12fbe739a31b09aff0d05f354d4f3bbef99afc55)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2019 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2021 Mellanox Technologies LTD. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/env.h"
10 #include "spdk/likely.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/memory.h"
14 #include "spdk/barrier.h"
15 #include "spdk/vhost.h"
16 #include "vhost_internal.h"
17 #include <rte_version.h>
18 
19 #include "spdk_internal/vhost_user.h"
20 
21 /* Path to the folder where the character device will be created. Can be set by the user. */
22 static char g_vhost_user_dev_dirname[PATH_MAX] = "";
23 
24 static struct spdk_thread *g_vhost_user_init_thread;
25 
26 /**
27  * DPDK calls our callbacks synchronously but the work those callbacks
28  * perform needs to be async. Luckily, all DPDK callbacks are called on
29  * a DPDK-internal pthread, so we'll just wait on a semaphore in there.
30  */
31 static sem_t g_dpdk_sem;
32 
33 /** Return code for the current DPDK callback */
34 static int g_dpdk_response;
35 
36 struct vhost_session_fn_ctx {
37 	/** Device pointer obtained before enqueueing the event */
38 	struct spdk_vhost_dev *vdev;
39 
40 	/** ID of the session to send event to. */
41 	uint32_t vsession_id;
42 
43 	/** User provided function to be executed on session's thread. */
44 	spdk_vhost_session_fn cb_fn;
45 
46 	/**
47 	 * User provided function to be called on the init thread
48 	 * after iterating through all sessions.
49 	 */
50 	spdk_vhost_dev_fn cpl_fn;
51 
52 	/** Custom user context */
53 	void *user_ctx;
54 };
55 
56 static int vhost_user_wait_for_session_stop(struct spdk_vhost_session *vsession,
57 		unsigned timeout_sec, const char *errmsg);
58 
59 static void
60 __attribute__((constructor))
61 _vhost_user_sem_init(void)
62 {
63 	if (sem_init(&g_dpdk_sem, 0, 0) != 0) {
64 		SPDK_ERRLOG("Failed to initialize semaphore for rte_vhost pthread.\n");
65 		abort();
66 	}
67 }
68 
69 static void
70 __attribute__((destructor))
71 _vhost_user_sem_destroy(void)
72 {
73 	sem_destroy(&g_dpdk_sem);
74 }
75 
76 void *
77 vhost_gpa_to_vva(struct spdk_vhost_session *vsession, uint64_t addr, uint64_t len)
78 {
79 	void *vva;
80 	uint64_t newlen;
81 
82 	newlen = len;
83 	vva = (void *)rte_vhost_va_from_guest_pa(vsession->mem, addr, &newlen);
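	/* rte_vhost_va_from_guest_pa() shrinks newlen when the guest-physical range is not
	 * contiguous within a single mapped region, so a shortened length means only part
	 * of [addr, addr + len) could be translated. Treat that as a failure here; callers
	 * that can handle split buffers (e.g. vhost_vring_desc_payload_to_iov()) call the
	 * rte_vhost API directly and build an iovec chunk by chunk instead.
	 */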
84 	if (newlen != len) {
85 		return NULL;
86 	}
87 
88 	return vva;
89 
90 }
91 
92 static void
93 vhost_log_req_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
94 		   uint16_t req_id)
95 {
96 	struct vring_desc *desc, *desc_table;
97 	uint32_t desc_table_size;
98 	int rc;
99 
100 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
101 		return;
102 	}
103 
104 	rc = vhost_vq_get_desc(vsession, virtqueue, req_id, &desc, &desc_table, &desc_table_size);
105 	if (spdk_unlikely(rc != 0)) {
106 		SPDK_ERRLOG("Can't log used ring descriptors!\n");
107 		return;
108 	}
109 
110 	do {
111 		if (vhost_vring_desc_is_wr(desc)) {
112 		/* Strictly speaking, only the pages actually touched should be logged, but
113 		 * doing so would require tracking those changes in each backend.
114 		 * The backend will most likely touch all or most of those pages anyway, so
115 		 * for now assume we touched every page passed to us as a writable buffer. */
116 			rte_vhost_log_write(vsession->vid, desc->addr, desc->len);
117 		}
118 		vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
119 	} while (desc);
120 }
121 
122 static void
123 vhost_log_used_vring_elem(struct spdk_vhost_session *vsession,
124 			  struct spdk_vhost_virtqueue *virtqueue,
125 			  uint16_t idx)
126 {
127 	uint64_t offset, len;
128 
129 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
130 		return;
131 	}
132 
133 	if (spdk_unlikely(virtqueue->packed.packed_ring)) {
134 		offset = idx * sizeof(struct vring_packed_desc);
135 		len = sizeof(struct vring_packed_desc);
136 	} else {
137 		offset = offsetof(struct vring_used, ring[idx]);
138 		len = sizeof(virtqueue->vring.used->ring[idx]);
139 	}
140 
141 	rte_vhost_log_used_vring(vsession->vid, virtqueue->vring_idx, offset, len);
142 }
143 
144 static void
145 vhost_log_used_vring_idx(struct spdk_vhost_session *vsession,
146 			 struct spdk_vhost_virtqueue *virtqueue)
147 {
148 	uint64_t offset, len;
149 	uint16_t vq_idx;
150 
151 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
152 		return;
153 	}
154 
155 	offset = offsetof(struct vring_used, idx);
156 	len = sizeof(virtqueue->vring.used->idx);
157 	vq_idx = virtqueue - vsession->virtqueue;
158 
159 	rte_vhost_log_used_vring(vsession->vid, vq_idx, offset, len);
160 }
161 
162 /*
163  * Get available requests from avail ring.
164  */
165 uint16_t
166 vhost_vq_avail_ring_get(struct spdk_vhost_virtqueue *virtqueue, uint16_t *reqs,
167 			uint16_t reqs_len)
168 {
169 	struct rte_vhost_vring *vring = &virtqueue->vring;
170 	struct vring_avail *avail = vring->avail;
171 	uint16_t size_mask = vring->size - 1;
172 	uint16_t last_idx = virtqueue->last_avail_idx, avail_idx = avail->idx;
173 	uint16_t count, i;
174 	int rc;
175 	uint64_t u64_value;
176 
177 	spdk_smp_rmb();
178 
179 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
180 		/* Read to clear vring's kickfd */
181 		rc = read(vring->kickfd, &u64_value, sizeof(u64_value));
182 		if (rc < 0) {
183 			SPDK_ERRLOG("failed to acknowledge kickfd: %s.\n", spdk_strerror(errno));
184 			return -errno;
185 		}
186 	}
187 
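	/* avail_idx and last_idx are free-running 16-bit counters, so plain unsigned
	 * subtraction yields the number of new entries even across a 0xFFFF -> 0 wrap
	 * (e.g. avail_idx == 2, last_idx == 65534 gives count == 4).
	 */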
188 	count = avail_idx - last_idx;
189 	if (spdk_likely(count == 0)) {
190 		return 0;
191 	}
192 
193 	if (spdk_unlikely(count > vring->size)) {
194 		/* TODO: the queue is unrecoverably broken and should be marked so.
195 		 * For now we will fail silently and report there are no new avail entries.
196 		 */
197 		return 0;
198 	}
199 
200 	count = spdk_min(count, reqs_len);
201 
202 	virtqueue->last_avail_idx += count;
203 	/* If there are still unprocessed reqs in the vq, kick the vq manually */
204 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
205 		/* If avail_idx is larger than the virtqueue's last_avail_idx, then there are unprocessed reqs.
206 		 * avail_idx should be re-read from memory here, in case of a race condition with the guest.
207 		 */
208 		avail_idx = * (volatile uint16_t *) &avail->idx;
209 		if (avail_idx > virtqueue->last_avail_idx) {
210 			/* Write to notify vring's kickfd */
211 			rc = write(vring->kickfd, &u64_value, sizeof(u64_value));
212 			if (rc < 0) {
213 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
214 				return -errno;
215 			}
216 		}
217 	}
218 
219 	for (i = 0; i < count; i++) {
220 		reqs[i] = vring->avail->ring[(last_idx + i) & size_mask];
221 	}
222 
223 	SPDK_DEBUGLOG(vhost_ring,
224 		      "AVAIL: last_idx=%"PRIu16" avail_idx=%"PRIu16" count=%"PRIu16"\n",
225 		      last_idx, avail_idx, count);
226 
227 	return count;
228 }
229 
230 static bool
231 vhost_vring_desc_is_indirect(struct vring_desc *cur_desc)
232 {
233 	return !!(cur_desc->flags & VRING_DESC_F_INDIRECT);
234 }
235 
236 static bool
237 vhost_vring_packed_desc_is_indirect(struct vring_packed_desc *cur_desc)
238 {
239 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
240 }
241 
242 static bool
243 vhost_inflight_packed_desc_is_indirect(spdk_vhost_inflight_desc *cur_desc)
244 {
245 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
246 }
247 
248 int
249 vhost_vq_get_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
250 		  uint16_t req_idx, struct vring_desc **desc, struct vring_desc **desc_table,
251 		  uint32_t *desc_table_size)
252 {
253 	if (spdk_unlikely(req_idx >= virtqueue->vring.size)) {
254 		return -1;
255 	}
256 
257 	*desc = &virtqueue->vring.desc[req_idx];
258 
259 	if (vhost_vring_desc_is_indirect(*desc)) {
260 		*desc_table_size = (*desc)->len / sizeof(**desc);
261 		*desc_table = vhost_gpa_to_vva(vsession, (*desc)->addr,
262 					       sizeof(**desc) * *desc_table_size);
263 		*desc = *desc_table;
264 		if (*desc == NULL) {
265 			return -1;
266 		}
267 
268 		return 0;
269 	}
270 
271 	*desc_table = virtqueue->vring.desc;
272 	*desc_table_size = virtqueue->vring.size;
273 
274 	return 0;
275 }
276 
277 static bool
278 vhost_packed_desc_indirect_to_desc_table(struct spdk_vhost_session *vsession,
279 		uint64_t addr, uint32_t len,
280 		struct vring_packed_desc **desc_table,
281 		uint32_t *desc_table_size)
282 {
283 	*desc_table_size = len / sizeof(struct vring_packed_desc);
284 
285 	*desc_table = vhost_gpa_to_vva(vsession, addr, len);
286 	if (spdk_unlikely(*desc_table == NULL)) {
287 		return false;
288 	}
289 
290 	return true;
291 }
292 
293 int
294 vhost_vq_get_desc_packed(struct spdk_vhost_session *vsession,
295 			 struct spdk_vhost_virtqueue *virtqueue,
296 			 uint16_t req_idx, struct vring_packed_desc **desc,
297 			 struct vring_packed_desc **desc_table, uint32_t *desc_table_size)
298 {
299 	*desc =  &virtqueue->vring.desc_packed[req_idx];
300 
301 	/* In a packed ring, when the desc is non-indirect we get the next desc
302 	 * by checking (desc->flags & VRING_DESC_F_NEXT) != 0. When the desc
303 	 * is indirect we get the next desc by idx and desc_table_size. This
304 	 * differs from the split ring.
305 	 */
306 	if (vhost_vring_packed_desc_is_indirect(*desc)) {
307 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
308 				desc_table, desc_table_size)) {
309 			return -1;
310 		}
311 
312 		*desc = *desc_table;
313 	} else {
314 		*desc_table = NULL;
315 		*desc_table_size  = 0;
316 	}
317 
318 	return 0;
319 }
320 
321 int
322 vhost_inflight_queue_get_desc(struct spdk_vhost_session *vsession,
323 			      spdk_vhost_inflight_desc *desc_array,
324 			      uint16_t req_idx, spdk_vhost_inflight_desc **desc,
325 			      struct vring_packed_desc  **desc_table, uint32_t *desc_table_size)
326 {
327 	*desc = &desc_array[req_idx];
328 
329 	if (vhost_inflight_packed_desc_is_indirect(*desc)) {
330 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
331 				desc_table, desc_table_size)) {
332 			return -1;
333 		}
334 
335 		/* This desc is the inflight desc, not the packed desc.
336 		 * When F_INDIRECT is set, the table entries are packed descs,
337 		 * so set the inflight desc to NULL.
338 		 */
339 		*desc = NULL;
340 	} else {
341 		/* When F_INDIRECT is not set, there is no packed desc table */
342 		*desc_table = NULL;
343 		*desc_table_size = 0;
344 	}
345 
346 	return 0;
347 }
348 
349 int
350 vhost_vq_used_signal(struct spdk_vhost_session *vsession,
351 		     struct spdk_vhost_virtqueue *virtqueue)
352 {
353 	if (virtqueue->used_req_cnt == 0) {
354 		return 0;
355 	}
356 
357 	SPDK_DEBUGLOG(vhost_ring,
358 		      "Queue %td - USED RING: sending IRQ: last used %"PRIu16"\n",
359 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx);
360 
361 #if RTE_VERSION < RTE_VERSION_NUM(22, 11, 0, 0)
362 	if (rte_vhost_vring_call(vsession->vid, virtqueue->vring_idx) == 0) {
363 #else
364 	if (rte_vhost_vring_call_nonblock(vsession->vid, virtqueue->vring_idx) == 0) {
365 #endif
366 		/* interrupt signalled */
367 		virtqueue->req_cnt += virtqueue->used_req_cnt;
368 		virtqueue->used_req_cnt = 0;
369 		return 1;
370 	} else {
371 		/* interrupt not signalled */
372 		return 0;
373 	}
374 }
375 
376 static void
377 session_vq_io_stats_update(struct spdk_vhost_session *vsession,
378 			   struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
379 {
380 	uint32_t irq_delay_base = vsession->coalescing_delay_time_base;
381 	uint32_t io_threshold = vsession->coalescing_io_rate_threshold;
382 	int32_t irq_delay;
383 	uint32_t req_cnt;
384 
385 	req_cnt = virtqueue->req_cnt + virtqueue->used_req_cnt;
386 	if (req_cnt <= io_threshold) {
387 		return;
388 	}
389 
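	/* Adaptive interrupt coalescing: the further req_cnt exceeds io_threshold, the longer
	 * the next interrupt is delayed. For example (illustrative values only): with
	 * irq_delay_base == 40 ticks and io_threshold == 60, req_cnt == 120 gives
	 * irq_delay = (40 * (120 - 60)) / 60 = 40 ticks.
	 */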
390 	irq_delay = (irq_delay_base * (req_cnt - io_threshold)) / io_threshold;
391 	virtqueue->irq_delay_time = (uint32_t) spdk_max(0, irq_delay);
392 
393 	virtqueue->req_cnt = 0;
394 	virtqueue->next_event_time = now;
395 }
396 
397 static void
398 check_session_vq_io_stats(struct spdk_vhost_session *vsession,
399 			  struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
400 {
401 	if (now < vsession->next_stats_check_time) {
402 		return;
403 	}
404 
405 	vsession->next_stats_check_time = now + vsession->stats_check_interval;
406 	session_vq_io_stats_update(vsession, virtqueue, now);
407 }
408 
409 static inline bool
410 vhost_vq_event_is_suppressed(struct spdk_vhost_virtqueue *vq)
411 {
412 	spdk_smp_mb();
413 
414 	if (spdk_unlikely(vq->packed.packed_ring)) {
415 		if (vq->vring.driver_event->flags & VRING_PACKED_EVENT_FLAG_DISABLE) {
416 			return true;
417 		}
418 	} else {
419 		if (vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT) {
420 			return true;
421 		}
422 	}
423 
424 	return false;
425 }
426 
427 void
428 vhost_session_vq_used_signal(struct spdk_vhost_virtqueue *virtqueue)
429 {
430 	struct spdk_vhost_session *vsession = virtqueue->vsession;
431 	uint64_t now;
432 
433 	if (vsession->coalescing_delay_time_base == 0) {
434 		if (virtqueue->vring.desc == NULL) {
435 			return;
436 		}
437 
438 		if (vhost_vq_event_is_suppressed(virtqueue)) {
439 			return;
440 		}
441 
442 		vhost_vq_used_signal(vsession, virtqueue);
443 	} else {
444 		now = spdk_get_ticks();
445 		check_session_vq_io_stats(vsession, virtqueue, now);
446 
447 		/* No need for event right now */
448 		if (now < virtqueue->next_event_time) {
449 			return;
450 		}
451 
452 		if (vhost_vq_event_is_suppressed(virtqueue)) {
453 			return;
454 		}
455 
456 		if (!vhost_vq_used_signal(vsession, virtqueue)) {
457 			return;
458 		}
459 
460 		/* The syscall takes a while, so refresh the time */
461 		now = spdk_get_ticks();
462 		virtqueue->next_event_time = now + virtqueue->irq_delay_time;
463 	}
464 }
465 
466 /*
467  * Enqueue id and len to used ring.
468  */
469 void
470 vhost_vq_used_ring_enqueue(struct spdk_vhost_session *vsession,
471 			   struct spdk_vhost_virtqueue *virtqueue,
472 			   uint16_t id, uint32_t len)
473 {
474 	struct rte_vhost_vring *vring = &virtqueue->vring;
475 	struct vring_used *used = vring->used;
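	/* Split virtqueue sizes are powers of two, so masking the free-running
	 * last_used_idx with (size - 1) yields the slot index within the used ring.
	 */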
476 	uint16_t last_idx = virtqueue->last_used_idx & (vring->size - 1);
477 	uint16_t vq_idx = virtqueue->vring_idx;
478 
479 	SPDK_DEBUGLOG(vhost_ring,
480 		      "Queue %td - USED RING: last_idx=%"PRIu16" req id=%"PRIu16" len=%"PRIu32"\n",
481 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx, id, len);
482 
483 	vhost_log_req_desc(vsession, virtqueue, id);
484 
485 	virtqueue->last_used_idx++;
486 	used->ring[last_idx].id = id;
487 	used->ring[last_idx].len = len;
488 
489 	/* Ensure the used ring is updated before we log it or increment used->idx. */
490 	spdk_smp_wmb();
491 
492 	rte_vhost_set_last_inflight_io_split(vsession->vid, vq_idx, id);
493 
494 	vhost_log_used_vring_elem(vsession, virtqueue, last_idx);
495 	* (volatile uint16_t *) &used->idx = virtqueue->last_used_idx;
496 	vhost_log_used_vring_idx(vsession, virtqueue);
497 
498 	rte_vhost_clr_inflight_desc_split(vsession->vid, vq_idx, virtqueue->last_used_idx, id);
499 
500 	virtqueue->used_req_cnt++;
501 
502 	if (vsession->interrupt_mode) {
503 		if (virtqueue->vring.desc == NULL || vhost_vq_event_is_suppressed(virtqueue)) {
504 			return;
505 		}
506 
507 		vhost_vq_used_signal(vsession, virtqueue);
508 	}
509 }
510 
511 void
512 vhost_vq_packed_ring_enqueue(struct spdk_vhost_session *vsession,
513 			     struct spdk_vhost_virtqueue *virtqueue,
514 			     uint16_t num_descs, uint16_t buffer_id,
515 			     uint32_t length, uint16_t inflight_head)
516 {
517 	struct vring_packed_desc *desc = &virtqueue->vring.desc_packed[virtqueue->last_used_idx];
518 	bool used, avail;
519 
520 	SPDK_DEBUGLOG(vhost_ring,
521 		      "Queue %td - RING: buffer_id=%"PRIu16"\n",
522 		      virtqueue - vsession->virtqueue, buffer_id);
523 
524 	/* When the descriptor is used, its two flags, the avail flag
525 	 * and the used flag, are set equal to each other,
526 	 * and the used flag value == used_wrap_counter.
527 	 */
528 	used = !!(desc->flags & VRING_DESC_F_USED);
529 	avail = !!(desc->flags & VRING_DESC_F_AVAIL);
530 	if (spdk_unlikely(used == virtqueue->packed.used_phase && used == avail)) {
531 		SPDK_ERRLOG("descriptor has been used before\n");
532 		return;
533 	}
534 
535 	/* In a used desc, addr is unused and len specifies the buffer length
536 	 * that has been written to by the device.
537 	 */
538 	desc->addr = 0;
539 	desc->len = length;
540 
541 	/* This bit specifies whether any data has been written by the device */
542 	if (length != 0) {
543 		desc->flags |= VRING_DESC_F_WRITE;
544 	}
545 
546 	/* Buffer ID is included in the last descriptor in the list.
547 	 * The driver needs to keep track of the size of the list corresponding
548 	 * to each buffer ID.
549 	 */
550 	desc->id = buffer_id;
551 
552 	/* A device MUST NOT make the descriptor used before buffer_id is
553 	 * written to the descriptor.
554 	 */
555 	spdk_smp_wmb();
556 
557 	rte_vhost_set_last_inflight_io_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
558 	/* To mark a desc as used, the device sets the F_USED bit in flags to match
559 	 * the internal Device ring wrap counter. It also sets the F_AVAIL bit to
560 	 * match the same value.
561 	 */
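	/* For example, with used_phase == 1 both the avail and used bits get set below
	 * (VRING_DESC_F_AVAIL_USED combines the two flags); with used_phase == 0 both get
	 * cleared. Either way the two bits end up equal, which distinguishes a used
	 * descriptor from an available one, where they differ.
	 */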
562 	if (virtqueue->packed.used_phase) {
563 		desc->flags |= VRING_DESC_F_AVAIL_USED;
564 	} else {
565 		desc->flags &= ~VRING_DESC_F_AVAIL_USED;
566 	}
567 	rte_vhost_clr_inflight_desc_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
568 
569 	vhost_log_used_vring_elem(vsession, virtqueue, virtqueue->last_used_idx);
570 	virtqueue->last_used_idx += num_descs;
571 	if (virtqueue->last_used_idx >= virtqueue->vring.size) {
572 		virtqueue->last_used_idx -= virtqueue->vring.size;
573 		virtqueue->packed.used_phase = !virtqueue->packed.used_phase;
574 	}
575 
576 	virtqueue->used_req_cnt++;
577 }
578 
579 bool
580 vhost_vq_packed_ring_is_avail(struct spdk_vhost_virtqueue *virtqueue)
581 {
582 	uint16_t flags = virtqueue->vring.desc_packed[virtqueue->last_avail_idx].flags;
583 
584 	/* To mark a desc as available, the driver sets the F_AVAIL bit in flags
585 	 * to match the internal avail wrap counter. It also sets the F_USED bit to
586 	 * match the inverse value but it's not mandatory.
587 	 */
588 	return (!!(flags & VRING_DESC_F_AVAIL) == virtqueue->packed.avail_phase);
589 }
590 
591 bool
592 vhost_vring_packed_desc_is_wr(struct vring_packed_desc *cur_desc)
593 {
594 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
595 }
596 
597 bool
598 vhost_vring_inflight_desc_is_wr(spdk_vhost_inflight_desc *cur_desc)
599 {
600 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
601 }
602 
603 int
604 vhost_vring_packed_desc_get_next(struct vring_packed_desc **desc, uint16_t *req_idx,
605 				 struct spdk_vhost_virtqueue *vq,
606 				 struct vring_packed_desc *desc_table,
607 				 uint32_t desc_table_size)
608 {
609 	if (desc_table != NULL) {
610 		/* When desc_table isn't NULL, the request is indirect and we get the next
611 		 * desc by req_idx and desc_table_size. Returning *desc == NULL means
612 		 * we reached the last desc of this request.
613 		 */
614 		(*req_idx)++;
615 		if (*req_idx < desc_table_size) {
616 			*desc = &desc_table[*req_idx];
617 		} else {
618 			*desc = NULL;
619 		}
620 	} else {
621 		/* When desc_table is NULL, the request is non-indirect and we get the next
622 		 * desc by req_idx and F_NEXT in flags. Returning *desc == NULL means
623 		 * we reached the last desc of this request. When we return a new desc
624 		 * we update req_idx too.
625 		 */
626 		if (((*desc)->flags & VRING_DESC_F_NEXT) == 0) {
627 			*desc = NULL;
628 			return 0;
629 		}
630 
631 		*req_idx = (*req_idx + 1) % vq->vring.size;
632 		*desc = &vq->vring.desc_packed[*req_idx];
633 	}
634 
635 	return 0;
636 }
637 
638 static int
639 vhost_vring_desc_payload_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
640 				uint16_t *iov_index, uintptr_t payload, uint64_t remaining)
641 {
642 	uintptr_t vva;
643 	uint64_t len;
644 
645 	do {
646 		if (*iov_index >= SPDK_VHOST_IOVS_MAX) {
647 			SPDK_ERRLOG("SPDK_VHOST_IOVS_MAX(%d) reached\n", SPDK_VHOST_IOVS_MAX);
648 			return -1;
649 		}
650 		len = remaining;
651 		vva = (uintptr_t)rte_vhost_va_from_guest_pa(vsession->mem, payload, &len);
652 		if (vva == 0 || len == 0) {
653 			SPDK_ERRLOG("gpa_to_vva(%p) == NULL\n", (void *)payload);
654 			return -1;
655 		}
656 		iov[*iov_index].iov_base = (void *)vva;
657 		iov[*iov_index].iov_len = len;
658 		remaining -= len;
659 		payload += len;
660 		(*iov_index)++;
661 	} while (remaining);
662 
663 	return 0;
664 }
665 
666 int
667 vhost_vring_packed_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
668 			       uint16_t *iov_index, const struct vring_packed_desc *desc)
669 {
670 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
671 					       desc->addr, desc->len);
672 }
673 
674 int
675 vhost_vring_inflight_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
676 				 uint16_t *iov_index, const spdk_vhost_inflight_desc *desc)
677 {
678 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
679 					       desc->addr, desc->len);
680 }
681 
682 /* 1. Traverse the desc chain to get the buffer_id and return buffer_id as task_idx.
683  * 2. Update vq->last_avail_idx to point to the next available desc chain.
684  * 3. Flip the avail_wrap_counter if last_avail_idx wraps around.
685  */
686 uint16_t
687 vhost_vring_packed_desc_get_buffer_id(struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
688 				      uint16_t *num_descs)
689 {
690 	struct vring_packed_desc *desc;
691 	uint16_t desc_head = req_idx;
692 
693 	*num_descs = 1;
694 
695 	desc =  &vq->vring.desc_packed[req_idx];
696 	if (!vhost_vring_packed_desc_is_indirect(desc)) {
697 		while ((desc->flags & VRING_DESC_F_NEXT) != 0) {
698 			req_idx = (req_idx + 1) % vq->vring.size;
699 			desc = &vq->vring.desc_packed[req_idx];
700 			(*num_descs)++;
701 		}
702 	}
703 
704 	/* The queue size doesn't have to be a power of 2.
705 	 * The device maintains last_avail_idx, so we can make sure
706 	 * the value stays valid (0 ~ vring.size - 1).
707 	 */
708 	vq->last_avail_idx = (req_idx + 1) % vq->vring.size;
709 	if (vq->last_avail_idx < desc_head) {
710 		vq->packed.avail_phase = !vq->packed.avail_phase;
711 	}
712 
713 	return desc->id;
714 }
715 
716 int
717 vhost_vring_desc_get_next(struct vring_desc **desc,
718 			  struct vring_desc *desc_table, uint32_t desc_table_size)
719 {
720 	struct vring_desc *old_desc = *desc;
721 	uint16_t next_idx;
722 
723 	if ((old_desc->flags & VRING_DESC_F_NEXT) == 0) {
724 		*desc = NULL;
725 		return 0;
726 	}
727 
728 	next_idx = old_desc->next;
729 	if (spdk_unlikely(next_idx >= desc_table_size)) {
730 		*desc = NULL;
731 		return -1;
732 	}
733 
734 	*desc = &desc_table[next_idx];
735 	return 0;
736 }
737 
738 int
739 vhost_vring_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
740 			uint16_t *iov_index, const struct vring_desc *desc)
741 {
742 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
743 					       desc->addr, desc->len);
744 }
745 
746 static inline void
747 vhost_session_mem_region_calc(uint64_t *previous_start, uint64_t *start, uint64_t *end,
748 			      uint64_t *len, struct rte_vhost_mem_region *region)
749 {
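	/* Round the region down/up to 2 MB hugepage boundaries for registration. If the
	 * rounded-down start lands on the same 2 MB page as the previous region's
	 * (already adjusted) start, bump it by VALUE_2MB so the shared hugepage is not
	 * passed to spdk_mem_register() twice.
	 */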
750 	*start = FLOOR_2MB(region->mmap_addr);
751 	*end = CEIL_2MB(region->mmap_addr + region->mmap_size);
752 	if (*start == *previous_start) {
753 		*start += (size_t) VALUE_2MB;
754 	}
755 	*previous_start = *start;
756 	*len = *end - *start;
757 }
758 
759 void
760 vhost_session_mem_register(struct rte_vhost_memory *mem)
761 {
762 	uint64_t start, end, len;
763 	uint32_t i;
764 	uint64_t previous_start = UINT64_MAX;
765 
766 
767 	for (i = 0; i < mem->nregions; i++) {
768 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
769 		SPDK_INFOLOG(vhost, "Registering VM memory for vtophys translation - 0x%jx len:0x%jx\n",
770 			     start, len);
771 
772 		if (spdk_mem_register((void *)start, len) != 0) {
773 			SPDK_WARNLOG("Failed to register memory region %"PRIu32". Future vtophys translation might fail.\n",
774 				     i);
775 			continue;
776 		}
777 	}
778 }
779 
780 void
781 vhost_session_mem_unregister(struct rte_vhost_memory *mem)
782 {
783 	uint64_t start, end, len;
784 	uint32_t i;
785 	uint64_t previous_start = UINT64_MAX;
786 
787 	for (i = 0; i < mem->nregions; i++) {
788 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
789 		if (spdk_vtophys((void *) start, NULL) == SPDK_VTOPHYS_ERROR) {
790 			continue; /* region has not been registered */
791 		}
792 
793 		if (spdk_mem_unregister((void *)start, len) != 0) {
794 			assert(false);
795 		}
796 	}
797 }
798 
799 static bool
800 vhost_memory_changed(struct rte_vhost_memory *new,
801 		     struct rte_vhost_memory *old)
802 {
803 	uint32_t i;
804 
805 	if (new->nregions != old->nregions) {
806 		return true;
807 	}
808 
809 	for (i = 0; i < new->nregions; ++i) {
810 		struct rte_vhost_mem_region *new_r = &new->regions[i];
811 		struct rte_vhost_mem_region *old_r = &old->regions[i];
812 
813 		if (new_r->guest_phys_addr != old_r->guest_phys_addr) {
814 			return true;
815 		}
816 		if (new_r->size != old_r->size) {
817 			return true;
818 		}
819 		if (new_r->guest_user_addr != old_r->guest_user_addr) {
820 			return true;
821 		}
822 		if (new_r->mmap_addr != old_r->mmap_addr) {
823 			return true;
824 		}
825 		if (new_r->fd != old_r->fd) {
826 			return true;
827 		}
828 	}
829 
830 	return false;
831 }
832 
833 static int
834 vhost_register_memtable_if_required(struct spdk_vhost_session *vsession, int vid)
835 {
836 	struct rte_vhost_memory *new_mem;
837 
838 	if (vhost_get_mem_table(vid, &new_mem) != 0) {
839 		SPDK_ERRLOG("vhost device %d: Failed to get guest memory table\n", vid);
840 		return -1;
841 	}
842 
843 	if (vsession->mem == NULL) {
844 		SPDK_INFOLOG(vhost, "Start to set memtable\n");
845 		vsession->mem = new_mem;
846 		vhost_session_mem_register(vsession->mem);
847 		return 0;
848 	}
849 
850 	if (vhost_memory_changed(new_mem, vsession->mem)) {
851 		SPDK_INFOLOG(vhost, "Memtable is changed\n");
852 		vhost_session_mem_unregister(vsession->mem);
853 		free(vsession->mem);
854 
855 		vsession->mem = new_mem;
856 		vhost_session_mem_register(vsession->mem);
857 		return 0;
858 
859 	}
860 
861 	SPDK_INFOLOG(vhost, "Memtable is unchanged\n");
862 	free(new_mem);
863 	return 0;
864 }
865 
866 static int
867 _stop_session(struct spdk_vhost_session *vsession)
868 {
869 	struct spdk_vhost_virtqueue *q;
870 	int rc;
871 	uint16_t i;
872 
873 	rc = vhost_user_wait_for_session_stop(vsession, 3, "stop session");
874 	if (rc != 0) {
875 		SPDK_ERRLOG("Couldn't stop device with vid %d.\n", vsession->vid);
876 		return rc;
877 	}
878 
879 	for (i = 0; i < vsession->max_queues; i++) {
880 		q = &vsession->virtqueue[i];
881 
882 		/* vring.desc and vring.desc_packed are in a union struct
883 		 * so q->vring.desc can replace q->vring.desc_packed.
884 		 */
885 		if (q->vring.desc == NULL) {
886 			continue;
887 		}
888 
889 		/* Packed virtqueues support up to 2^15 entries each,
890 		 * so the leftmost bit can be used as the wrap counter.
891 		 */
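		/* For example (illustrative values): last_avail_idx == 0x0123 with
		 * avail_phase == 1 is handed to rte_vhost as 0x8123; enable_device_vq()
		 * later splits such a value back into the phase bit and the 15-bit index.
		 */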
892 		if (q->packed.packed_ring) {
893 			q->last_avail_idx = q->last_avail_idx |
894 					    ((uint16_t)q->packed.avail_phase << 15);
895 			q->last_used_idx = q->last_used_idx |
896 					   ((uint16_t)q->packed.used_phase << 15);
897 		}
898 
899 		rte_vhost_set_vring_base(vsession->vid, i, q->last_avail_idx, q->last_used_idx);
900 		q->vring.desc = NULL;
901 	}
902 	vsession->max_queues = 0;
903 
904 	return 0;
905 }
906 
907 static int
908 new_connection(int vid)
909 {
910 	struct spdk_vhost_dev *vdev;
911 	struct spdk_vhost_user_dev *user_dev;
912 	struct spdk_vhost_session *vsession;
913 	size_t dev_dirname_len;
914 	char ifname[PATH_MAX];
915 	char *ctrlr_name;
916 
917 	if (rte_vhost_get_ifname(vid, ifname, PATH_MAX) < 0) {
918 		SPDK_ERRLOG("Couldn't get a valid ifname for device with vid %d\n", vid);
919 		return -1;
920 	}
921 
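	/* rte_vhost reports the full socket path as the ifname; strip the configured
	 * socket directory prefix to recover the controller name registered with SPDK.
	 */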
922 	ctrlr_name = &ifname[0];
923 	dev_dirname_len = strlen(g_vhost_user_dev_dirname);
924 	if (strncmp(ctrlr_name, g_vhost_user_dev_dirname, dev_dirname_len) == 0) {
925 		ctrlr_name += dev_dirname_len;
926 	}
927 
928 	spdk_vhost_lock();
929 	vdev = spdk_vhost_dev_find(ctrlr_name);
930 	if (vdev == NULL) {
931 		SPDK_ERRLOG("Couldn't find device with vid %d to create connection for.\n", vid);
932 		spdk_vhost_unlock();
933 		return -1;
934 	}
935 	spdk_vhost_unlock();
936 
937 	user_dev = to_user_dev(vdev);
938 	pthread_mutex_lock(&user_dev->lock);
939 	if (user_dev->registered == false) {
940 		SPDK_ERRLOG("Device %s is unregistered\n", ctrlr_name);
941 		pthread_mutex_unlock(&user_dev->lock);
942 		return -1;
943 	}
944 
945 	/* We expect sessions inside user_dev->vsessions to be sorted in ascending
946 	 * order of vsession->id. For now we always set id = vsessions_num++
947 	 * and append each session to the very end of the vsessions list.
948 	 * This is required for vhost_user_dev_foreach_session() to work.
949 	 */
950 	if (user_dev->vsessions_num == UINT_MAX) {
951 		pthread_mutex_unlock(&user_dev->lock);
952 		assert(false);
953 		return -EINVAL;
954 	}
955 
956 	if (posix_memalign((void **)&vsession, SPDK_CACHE_LINE_SIZE, sizeof(*vsession) +
957 			   user_dev->user_backend->session_ctx_size)) {
958 		SPDK_ERRLOG("vsession alloc failed\n");
959 		pthread_mutex_unlock(&user_dev->lock);
960 		return -1;
961 	}
962 	memset(vsession, 0, sizeof(*vsession) + user_dev->user_backend->session_ctx_size);
963 
964 	vsession->vdev = vdev;
965 	vsession->vid = vid;
966 	vsession->id = user_dev->vsessions_num++;
967 	vsession->name = spdk_sprintf_alloc("%ss%u", vdev->name, vsession->vid);
968 	if (vsession->name == NULL) {
969 		SPDK_ERRLOG("vsession alloc failed\n");
970 		free(vsession);
971 		pthread_mutex_unlock(&user_dev->lock);
972 		return -1;
973 	}
974 	vsession->started = false;
975 	vsession->starting = false;
976 	vsession->next_stats_check_time = 0;
977 	vsession->stats_check_interval = SPDK_VHOST_STATS_CHECK_INTERVAL_MS *
978 					 spdk_get_ticks_hz() / 1000UL;
979 	TAILQ_INSERT_TAIL(&user_dev->vsessions, vsession, tailq);
980 	vhost_session_install_rte_compat_hooks(vsession);
981 	pthread_mutex_unlock(&user_dev->lock);
982 
983 	return 0;
984 }
985 
986 static void
987 vhost_user_session_start(void *arg1)
988 {
989 	struct spdk_vhost_session *vsession = arg1;
990 	struct spdk_vhost_dev *vdev = vsession->vdev;
991 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
992 	const struct spdk_vhost_user_dev_backend *backend;
993 	int rc;
994 
995 	SPDK_INFOLOG(vhost, "Starting new session for device %s with vid %d\n", vdev->name, vsession->vid);
996 	pthread_mutex_lock(&user_dev->lock);
997 	vsession->starting = false;
998 	backend = user_dev->user_backend;
999 	rc = backend->start_session(vdev, vsession, NULL);
1000 	if (rc == 0) {
1001 		vsession->started = true;
1002 	}
1003 	pthread_mutex_unlock(&user_dev->lock);
1004 }
1005 
1006 static int
1007 set_device_vq_callfd(struct spdk_vhost_session *vsession, uint16_t qid)
1008 {
1009 	struct spdk_vhost_virtqueue *q;
1010 
1011 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1012 		return -EINVAL;
1013 	}
1014 
1015 	q = &vsession->virtqueue[qid];
1016 	/* vq isn't enabled yet */
1017 	if (q->vring_idx != qid) {
1018 		return 0;
1019 	}
1020 
1021 	/* vring.desc and vring.desc_packed are in a union struct
1022 	 * so q->vring.desc can replace q->vring.desc_packed.
1023 	 */
1024 	if (q->vring.desc == NULL || q->vring.size == 0) {
1025 		return 0;
1026 	}
1027 
1028 	/*
1029 	 * Not sure right now, but this looks like some kind of QEMU bug: guest IO
1030 	 * might be frozen without kicking all queues after live-migration. It looks like
1031 	 * the previous vhost instance failed to effectively deliver all interrupts before
1032 	 * the GET_VRING_BASE message. This shouldn't harm the guest since spurious interrupts
1033 	 * should be ignored by the guest virtio driver.
1034 	 *
1035 	 * Tested on QEMU 2.10.91 and 2.11.50.
1036 	 *
1037 	 * Make sure a successful call of
1038 	 * `rte_vhost_vring_call` will happen
1039 	 * after starting the device.
1040 	 */
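	/* Bumping used_req_cnt guarantees the call is not skipped: vhost_vq_used_signal()
	 * returns early when used_req_cnt == 0.
	 */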
1041 	q->used_req_cnt += 1;
1042 
1043 	return 0;
1044 }
1045 
1046 static int
1047 enable_device_vq(struct spdk_vhost_session *vsession, uint16_t qid)
1048 {
1049 	struct spdk_vhost_virtqueue *q;
1050 	bool packed_ring;
1051 	const struct spdk_vhost_user_dev_backend *backend;
1052 	int rc;
1053 
1054 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1055 		return -EINVAL;
1056 	}
1057 
1058 	q = &vsession->virtqueue[qid];
1059 	memset(q, 0, sizeof(*q));
1060 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
1061 
1062 	q->vsession = vsession;
1063 	q->vring_idx = -1;
1064 	if (rte_vhost_get_vhost_vring(vsession->vid, qid, &q->vring)) {
1065 		return 0;
1066 	}
1067 	q->vring_idx = qid;
1068 	rte_vhost_get_vhost_ring_inflight(vsession->vid, qid, &q->vring_inflight);
1069 
1070 	/* vring.desc and vring.desc_packed are in a union struct
1071 	 * so q->vring.desc can replace q->vring.desc_packed.
1072 	 */
1073 	if (q->vring.desc == NULL || q->vring.size == 0) {
1074 		return 0;
1075 	}
1076 
1077 	if (rte_vhost_get_vring_base(vsession->vid, qid, &q->last_avail_idx, &q->last_used_idx)) {
1078 		q->vring.desc = NULL;
1079 		return 0;
1080 	}
1081 
1082 	backend = to_user_dev(vsession->vdev)->user_backend;
1083 	rc = backend->alloc_vq_tasks(vsession, qid);
1084 	if (rc) {
1085 		return rc;
1086 	}
1087 
1088 	/*
1089 	 * This shouldn't harm guest since spurious interrupts should be ignored by
1090 	 * guest virtio driver.
1091 	 *
1092 	 * Make sure a successful call of `rte_vhost_vring_call` will happen after
1093 	 * restarting the device.
1094 	 */
1095 	if (vsession->needs_restart) {
1096 		q->used_req_cnt += 1;
1097 	}
1098 
1099 	if (packed_ring) {
1100 		/* Since the packed ring flag is already negotiated between SPDK and the VM, the VM doesn't
1101 		 * restore `last_avail_idx` and `last_used_idx` for the packed ring, so use the
1102 		 * inflight mem to restore `last_avail_idx` and `last_used_idx`.
1103 		 */
1104 		rte_vhost_get_vring_base_from_inflight(vsession->vid, qid, &q->last_avail_idx,
1105 						       &q->last_used_idx);
1106 
1107 		/* Packed virtqueues support up to 2^15 entries each,
1108 		 * so the leftmost bit can be used as the wrap counter.
1109 		 */
1110 		q->packed.avail_phase = q->last_avail_idx >> 15;
1111 		q->last_avail_idx = q->last_avail_idx & 0x7FFF;
1112 		q->packed.used_phase = q->last_used_idx >> 15;
1113 		q->last_used_idx = q->last_used_idx & 0x7FFF;
1114 
1115 		if (!spdk_interrupt_mode_is_enabled()) {
1116 			/* Disable I/O submission notifications, we'll be polling. */
1117 			q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1118 		} else {
1119 			/* Enable I/O submission notifications, we'll be interrupting. */
1120 			q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_ENABLE;
1121 		}
1122 	} else {
1123 		if (!spdk_interrupt_mode_is_enabled()) {
1124 			/* Disable I/O submission notifications, we'll be polling. */
1125 			q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1126 		} else {
1127 			/* Enable I/O submission notifications, we'll be interrupting. */
1128 			q->vring.used->flags = 0;
1129 		}
1130 	}
1131 
1132 	if (spdk_interrupt_mode_is_enabled() && backend->register_vq_interrupt) {
1133 		backend->register_vq_interrupt(vsession, q);
1134 	}
1135 
1136 	q->packed.packed_ring = packed_ring;
1137 	vsession->max_queues = spdk_max(vsession->max_queues, qid + 1);
1138 
1139 	return 0;
1140 }
1141 
1142 static int
1143 start_device(int vid)
1144 {
1145 	struct spdk_vhost_dev *vdev;
1146 	struct spdk_vhost_session *vsession;
1147 	struct spdk_vhost_user_dev *user_dev;
1148 	int rc = 0;
1149 
1150 	vsession = vhost_session_find_by_vid(vid);
1151 	if (vsession == NULL) {
1152 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1153 		return -1;
1154 	}
1155 	vdev = vsession->vdev;
1156 	user_dev = to_user_dev(vdev);
1157 
1158 	pthread_mutex_lock(&user_dev->lock);
1159 	if (vsession->started) {
1160 		/* already started, nothing to do */
1161 		goto out;
1162 	}
1163 
1164 	if (!vsession->mem) {
1165 		rc = -1;
1166 		SPDK_ERRLOG("Session %s doesn't set memory table yet\n", vsession->name);
1167 		goto out;
1168 	}
1169 
1170 	vsession->starting = true;
1171 	SPDK_INFOLOG(vhost, "Session %s is scheduled to start\n", vsession->name);
1172 	vhost_user_session_set_coalescing(vdev, vsession, NULL);
1173 	spdk_thread_send_msg(vdev->thread, vhost_user_session_start, vsession);
1174 
1175 out:
1176 	pthread_mutex_unlock(&user_dev->lock);
1177 	return rc;
1178 }
1179 
1180 static void
1181 stop_device(int vid)
1182 {
1183 	struct spdk_vhost_session *vsession;
1184 	struct spdk_vhost_user_dev *user_dev;
1185 
1186 	vsession = vhost_session_find_by_vid(vid);
1187 	if (vsession == NULL) {
1188 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1189 		return;
1190 	}
1191 	user_dev = to_user_dev(vsession->vdev);
1192 
1193 	pthread_mutex_lock(&user_dev->lock);
1194 	if (!vsession->started && !vsession->starting) {
1195 		pthread_mutex_unlock(&user_dev->lock);
1196 		/* already stopped, nothing to do */
1197 		return;
1198 	}
1199 
1200 	_stop_session(vsession);
1201 	pthread_mutex_unlock(&user_dev->lock);
1202 }
1203 
1204 static void
1205 destroy_connection(int vid)
1206 {
1207 	struct spdk_vhost_session *vsession;
1208 	struct spdk_vhost_user_dev *user_dev;
1209 
1210 	vsession = vhost_session_find_by_vid(vid);
1211 	if (vsession == NULL) {
1212 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1213 		return;
1214 	}
1215 	user_dev = to_user_dev(vsession->vdev);
1216 
1217 	pthread_mutex_lock(&user_dev->lock);
1218 	if (vsession->started || vsession->starting) {
1219 		if (_stop_session(vsession) != 0) {
1220 			pthread_mutex_unlock(&user_dev->lock);
1221 			return;
1222 		}
1223 	}
1224 
1225 	if (vsession->mem) {
1226 		vhost_session_mem_unregister(vsession->mem);
1227 		free(vsession->mem);
1228 	}
1229 
1230 	TAILQ_REMOVE(&to_user_dev(vsession->vdev)->vsessions, vsession, tailq);
1231 	free(vsession->name);
1232 	free(vsession);
1233 	pthread_mutex_unlock(&user_dev->lock);
1234 }
1235 
1236 static const struct rte_vhost_device_ops g_spdk_vhost_ops = {
1237 	.new_device =  start_device,
1238 	.destroy_device = stop_device,
1239 	.new_connection = new_connection,
1240 	.destroy_connection = destroy_connection,
1241 };
1242 
1243 static struct spdk_vhost_session *
1244 vhost_session_find_by_id(struct spdk_vhost_dev *vdev, unsigned id)
1245 {
1246 	struct spdk_vhost_session *vsession;
1247 
1248 	TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1249 		if (vsession->id == id) {
1250 			return vsession;
1251 		}
1252 	}
1253 
1254 	return NULL;
1255 }
1256 
1257 struct spdk_vhost_session *
1258 vhost_session_find_by_vid(int vid)
1259 {
1260 	struct spdk_vhost_dev *vdev;
1261 	struct spdk_vhost_session *vsession;
1262 	struct spdk_vhost_user_dev *user_dev;
1263 
1264 	spdk_vhost_lock();
1265 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1266 	     vdev = spdk_vhost_dev_next(vdev)) {
1267 		user_dev = to_user_dev(vdev);
1268 
1269 		pthread_mutex_lock(&user_dev->lock);
1270 		TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1271 			if (vsession->vid == vid) {
1272 				pthread_mutex_unlock(&user_dev->lock);
1273 				spdk_vhost_unlock();
1274 				return vsession;
1275 			}
1276 		}
1277 		pthread_mutex_unlock(&user_dev->lock);
1278 	}
1279 	spdk_vhost_unlock();
1280 
1281 	return NULL;
1282 }
1283 
1284 static void
1285 wait_for_semaphore(int timeout_sec, const char *errmsg)
1286 {
1287 	struct timespec timeout;
1288 	int rc;
1289 
1290 	clock_gettime(CLOCK_REALTIME, &timeout);
1291 	timeout.tv_sec += timeout_sec;
1292 	rc = sem_timedwait(&g_dpdk_sem, &timeout);
1293 	if (rc != 0) {
1294 		SPDK_ERRLOG("Timeout waiting for event: %s.\n", errmsg);
1295 		sem_wait(&g_dpdk_sem);
1296 	}
1297 }
1298 
1299 void
1300 vhost_user_session_stop_done(struct spdk_vhost_session *vsession, int response)
1301 {
1302 	if (response == 0) {
1303 		vsession->started = false;
1304 	}
1305 
1306 	g_dpdk_response = response;
1307 	sem_post(&g_dpdk_sem);
1308 }
1309 
1310 static void
1311 vhost_user_session_stop_event(void *arg1)
1312 {
1313 	struct vhost_session_fn_ctx *ctx = arg1;
1314 	struct spdk_vhost_dev *vdev = ctx->vdev;
1315 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1316 	struct spdk_vhost_session *vsession;
1317 
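	/* Don't block the device thread on the lock; if it is currently held,
	 * re-queue this event to the same thread and try again later.
	 */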
1318 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1319 		spdk_thread_send_msg(spdk_get_thread(), vhost_user_session_stop_event, arg1);
1320 		return;
1321 	}
1322 
1323 	vsession = vhost_session_find_by_id(vdev, ctx->vsession_id);
1324 	user_dev->user_backend->stop_session(vdev, vsession, NULL);
1325 	pthread_mutex_unlock(&user_dev->lock);
1326 }
1327 
1328 static int
1329 vhost_user_wait_for_session_stop(struct spdk_vhost_session *vsession,
1330 				 unsigned timeout_sec, const char *errmsg)
1331 {
1332 	struct vhost_session_fn_ctx ev_ctx = {0};
1333 	struct spdk_vhost_dev *vdev = vsession->vdev;
1334 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1335 
1336 	ev_ctx.vdev = vdev;
1337 	ev_ctx.vsession_id = vsession->id;
1338 
1339 	spdk_thread_send_msg(vdev->thread, vhost_user_session_stop_event, &ev_ctx);
1340 
1341 	pthread_mutex_unlock(&user_dev->lock);
1342 	wait_for_semaphore(timeout_sec, errmsg);
1343 	pthread_mutex_lock(&user_dev->lock);
1344 
1345 	return g_dpdk_response;
1346 }
1347 
1348 static void
1349 foreach_session_finish_cb(void *arg1)
1350 {
1351 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1352 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1353 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1354 
1355 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1356 		spdk_thread_send_msg(spdk_get_thread(),
1357 				     foreach_session_finish_cb, arg1);
1358 		return;
1359 	}
1360 
1361 	assert(user_dev->pending_async_op_num > 0);
1362 	user_dev->pending_async_op_num--;
1363 	if (ev_ctx->cpl_fn != NULL) {
1364 		ev_ctx->cpl_fn(vdev, ev_ctx->user_ctx);
1365 	}
1366 
1367 	pthread_mutex_unlock(&user_dev->lock);
1368 	free(ev_ctx);
1369 }
1370 
1371 static void
1372 foreach_session(void *arg1)
1373 {
1374 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1375 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1376 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1377 	struct spdk_vhost_session *vsession;
1378 	int rc;
1379 
1380 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1381 		spdk_thread_send_msg(spdk_get_thread(), foreach_session, arg1);
1382 		return;
1383 	}
1384 
1385 	TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1386 		rc = ev_ctx->cb_fn(vdev, vsession, ev_ctx->user_ctx);
1387 		if (rc < 0) {
1388 			goto out;
1389 		}
1390 	}
1391 
1392 out:
1393 	pthread_mutex_unlock(&user_dev->lock);
1394 	spdk_thread_send_msg(g_vhost_user_init_thread, foreach_session_finish_cb, arg1);
1395 }
1396 
1397 void
1398 vhost_user_dev_foreach_session(struct spdk_vhost_dev *vdev,
1399 			       spdk_vhost_session_fn fn,
1400 			       spdk_vhost_dev_fn cpl_fn,
1401 			       void *arg)
1402 {
1403 	struct vhost_session_fn_ctx *ev_ctx;
1404 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1405 
1406 	ev_ctx = calloc(1, sizeof(*ev_ctx));
1407 	if (ev_ctx == NULL) {
1408 		SPDK_ERRLOG("Failed to alloc vhost event.\n");
1409 		assert(false);
1410 		return;
1411 	}
1412 
1413 	ev_ctx->vdev = vdev;
1414 	ev_ctx->cb_fn = fn;
1415 	ev_ctx->cpl_fn = cpl_fn;
1416 	ev_ctx->user_ctx = arg;
1417 
1418 	pthread_mutex_lock(&user_dev->lock);
1419 	assert(user_dev->pending_async_op_num < UINT32_MAX);
1420 	user_dev->pending_async_op_num++;
1421 	pthread_mutex_unlock(&user_dev->lock);
1422 
1423 	spdk_thread_send_msg(vdev->thread, foreach_session, ev_ctx);
1424 }
1425 
1426 void
1427 vhost_user_session_set_interrupt_mode(struct spdk_vhost_session *vsession, bool interrupt_mode)
1428 {
1429 	uint16_t i;
1430 	int rc = 0;
1431 
1432 	for (i = 0; i < vsession->max_queues; i++) {
1433 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1434 		uint64_t num_events = 1;
1435 
1436 		/* vring.desc and vring.desc_packed are in a union struct
1437 		 * so q->vring.desc can replace q->vring.desc_packed.
1438 		 */
1439 		if (q->vring.desc == NULL || q->vring.size == 0) {
1440 			continue;
1441 		}
1442 
1443 		if (interrupt_mode) {
1444 
1445 			/* To avoid a race condition, always kick the vring when switching to interrupt mode */
1446 			rc = write(q->vring.kickfd, &num_events, sizeof(num_events));
1447 			if (rc < 0) {
1448 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
1449 			}
1450 
1451 			vsession->interrupt_mode = true;
1452 		} else {
1453 
1454 			vsession->interrupt_mode = false;
1455 		}
1456 	}
1457 }
1458 
1459 static int
1460 extern_vhost_pre_msg_handler(int vid, void *_msg)
1461 {
1462 	struct vhost_user_msg *msg = _msg;
1463 	struct spdk_vhost_session *vsession;
1464 	struct spdk_vhost_user_dev *user_dev;
1465 
1466 	vsession = vhost_session_find_by_vid(vid);
1467 	if (vsession == NULL) {
1468 		SPDK_ERRLOG("Received a message to uninitialized session (vid %d).\n", vid);
1469 		assert(false);
1470 		return RTE_VHOST_MSG_RESULT_ERR;
1471 	}
1472 	user_dev = to_user_dev(vsession->vdev);
1473 
1474 	switch (msg->request) {
1475 	case VHOST_USER_GET_VRING_BASE:
1476 		pthread_mutex_lock(&user_dev->lock);
1477 		if (vsession->started) {
1478 			pthread_mutex_unlock(&user_dev->lock);
1479 			g_spdk_vhost_ops.destroy_device(vid);
1480 			break;
1481 		}
1482 		pthread_mutex_unlock(&user_dev->lock);
1483 		break;
1484 	case VHOST_USER_SET_MEM_TABLE:
1485 		pthread_mutex_lock(&user_dev->lock);
1486 		if (vsession->started) {
1487 			vsession->original_max_queues = vsession->max_queues;
1488 			pthread_mutex_unlock(&user_dev->lock);
1489 			g_spdk_vhost_ops.destroy_device(vid);
1490 			vsession->needs_restart = true;
1491 			break;
1492 		}
1493 		pthread_mutex_unlock(&user_dev->lock);
1494 		break;
1495 	case VHOST_USER_GET_CONFIG: {
1496 		int rc = 0;
1497 
1498 		pthread_mutex_lock(&user_dev->lock);
1499 		if (vsession->vdev->backend->vhost_get_config) {
1500 			rc = vsession->vdev->backend->vhost_get_config(vsession->vdev,
1501 					msg->payload.cfg.region, msg->payload.cfg.size);
1502 			if (rc != 0) {
1503 				msg->size = 0;
1504 			}
1505 		}
1506 		pthread_mutex_unlock(&user_dev->lock);
1507 
1508 		return RTE_VHOST_MSG_RESULT_REPLY;
1509 	}
1510 	case VHOST_USER_SET_CONFIG: {
1511 		int rc = 0;
1512 
1513 		pthread_mutex_lock(&user_dev->lock);
1514 		if (vsession->vdev->backend->vhost_set_config) {
1515 			rc = vsession->vdev->backend->vhost_set_config(vsession->vdev,
1516 					msg->payload.cfg.region, msg->payload.cfg.offset,
1517 					msg->payload.cfg.size, msg->payload.cfg.flags);
1518 		}
1519 		pthread_mutex_unlock(&user_dev->lock);
1520 
1521 		return rc == 0 ? RTE_VHOST_MSG_RESULT_OK : RTE_VHOST_MSG_RESULT_ERR;
1522 	}
1523 	default:
1524 		break;
1525 	}
1526 
1527 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1528 }
1529 
1530 static int
1531 extern_vhost_post_msg_handler(int vid, void *_msg)
1532 {
1533 	struct vhost_user_msg *msg = _msg;
1534 	struct spdk_vhost_session *vsession;
1535 	struct spdk_vhost_user_dev *user_dev;
1536 	uint16_t qid;
1537 	int rc;
1538 
1539 	vsession = vhost_session_find_by_vid(vid);
1540 	if (vsession == NULL) {
1541 		SPDK_ERRLOG("Received a message to uninitialized session (vid %d).\n", vid);
1542 		assert(false);
1543 		return RTE_VHOST_MSG_RESULT_ERR;
1544 	}
1545 	user_dev = to_user_dev(vsession->vdev);
1546 
1547 	switch (msg->request) {
1548 	case VHOST_USER_SET_FEATURES:
1549 		rc = vhost_get_negotiated_features(vid, &vsession->negotiated_features);
1550 		if (rc) {
1551 			SPDK_ERRLOG("vhost device %d: Failed to get negotiated driver features\n", vid);
1552 			return RTE_VHOST_MSG_RESULT_ERR;
1553 		}
1554 		break;
1555 	case VHOST_USER_SET_VRING_CALL:
1556 		qid = (uint16_t)msg->payload.u64;
1557 		rc = set_device_vq_callfd(vsession, qid);
1558 		if (rc) {
1559 			return RTE_VHOST_MSG_RESULT_ERR;
1560 		}
1561 		break;
1562 	case VHOST_USER_SET_VRING_KICK:
1563 		qid = (uint16_t)msg->payload.u64;
1564 		rc = enable_device_vq(vsession, qid);
1565 		if (rc) {
1566 			return RTE_VHOST_MSG_RESULT_ERR;
1567 		}
1568 
1569 		/* vhost-user spec tells us to start polling a queue after receiving
1570 		 * its SET_VRING_KICK message. Let's do it!
1571 		 */
1572 		pthread_mutex_lock(&user_dev->lock);
1573 		if (!vsession->started) {
1574 			pthread_mutex_unlock(&user_dev->lock);
1575 			g_spdk_vhost_ops.new_device(vid);
1576 			return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1577 		}
1578 		pthread_mutex_unlock(&user_dev->lock);
1579 		break;
1580 	case VHOST_USER_SET_MEM_TABLE:
1581 		vhost_register_memtable_if_required(vsession, vid);
1582 		pthread_mutex_lock(&user_dev->lock);
1583 		if (vsession->needs_restart) {
1584 			pthread_mutex_unlock(&user_dev->lock);
1585 			for (qid = 0; qid < vsession->original_max_queues; qid++) {
1586 				enable_device_vq(vsession, qid);
1587 			}
1588 			vsession->original_max_queues = 0;
1589 			vsession->needs_restart = false;
1590 			g_spdk_vhost_ops.new_device(vid);
1591 			break;
1592 		}
1593 		pthread_mutex_unlock(&user_dev->lock);
1594 		break;
1595 	default:
1596 		break;
1597 	}
1598 
1599 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1600 }
1601 
1602 struct rte_vhost_user_extern_ops g_spdk_extern_vhost_ops = {
1603 	.pre_msg_handle = extern_vhost_pre_msg_handler,
1604 	.post_msg_handle = extern_vhost_post_msg_handler,
1605 };
1606 
1607 void
1608 vhost_session_install_rte_compat_hooks(struct spdk_vhost_session *vsession)
1609 {
1610 	int rc;
1611 
1612 	rc = rte_vhost_extern_callback_register(vsession->vid, &g_spdk_extern_vhost_ops, NULL);
1613 	if (rc != 0) {
1614 		SPDK_ERRLOG("rte_vhost_extern_callback_register() failed for vid = %d\n",
1615 			    vsession->vid);
1616 		return;
1617 	}
1618 }
1619 
1620 int
1621 vhost_register_unix_socket(const char *path, const char *ctrl_name,
1622 			   uint64_t virtio_features, uint64_t disabled_features, uint64_t protocol_features)
1623 {
1624 	struct stat file_stat;
1625 	uint64_t features = 0;
1626 	uint64_t flags = 0;
1627 
1628 	/* Register vhost driver to handle vhost messages. */
1629 	if (stat(path, &file_stat) != -1) {
1630 		if (!S_ISSOCK(file_stat.st_mode)) {
1631 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1632 				    "The file already exists and is not a socket.\n",
1633 				    path);
1634 			return -EIO;
1635 		} else if (unlink(path) != 0) {
1636 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1637 				    "The socket already exists and failed to unlink.\n",
1638 				    path);
1639 			return -EIO;
1640 		}
1641 	}
1642 
1643 	flags = spdk_iommu_is_enabled() ? 0 : RTE_VHOST_USER_ASYNC_COPY;
1644 	if (rte_vhost_driver_register(path, flags) != 0) {
1645 		SPDK_ERRLOG("Could not register controller %s with vhost library\n", ctrl_name);
1646 		SPDK_ERRLOG("Check if domain socket %s already exists\n", path);
1647 		return -EIO;
1648 	}
1649 	if (rte_vhost_driver_set_features(path, virtio_features) ||
1650 	    rte_vhost_driver_disable_features(path, disabled_features)) {
1651 		SPDK_ERRLOG("Couldn't set vhost features for controller %s\n", ctrl_name);
1652 
1653 		rte_vhost_driver_unregister(path);
1654 		return -EIO;
1655 	}
1656 
1657 	if (rte_vhost_driver_callback_register(path, &g_spdk_vhost_ops) != 0) {
1658 		rte_vhost_driver_unregister(path);
1659 		SPDK_ERRLOG("Couldn't register callbacks for controller %s\n", ctrl_name);
1660 		return -EIO;
1661 	}
1662 
1663 	rte_vhost_driver_get_protocol_features(path, &features);
1664 	features |= protocol_features;
1665 	rte_vhost_driver_set_protocol_features(path, features);
1666 
1667 	if (rte_vhost_driver_start(path) != 0) {
1668 		SPDK_ERRLOG("Failed to start vhost driver for controller %s (%d): %s\n",
1669 			    ctrl_name, errno, spdk_strerror(errno));
1670 		rte_vhost_driver_unregister(path);
1671 		return -EIO;
1672 	}
1673 
1674 	return 0;
1675 }
1676 
1677 int
1678 vhost_get_mem_table(int vid, struct rte_vhost_memory **mem)
1679 {
1680 	return rte_vhost_get_mem_table(vid, mem);
1681 }
1682 
1683 int
1684 vhost_driver_unregister(const char *path)
1685 {
1686 	return rte_vhost_driver_unregister(path);
1687 }
1688 
1689 int
1690 vhost_get_negotiated_features(int vid, uint64_t *negotiated_features)
1691 {
1692 	return rte_vhost_get_negotiated_features(vid, negotiated_features);
1693 }
1694 
1695 int
1696 vhost_user_dev_set_coalescing(struct spdk_vhost_user_dev *user_dev, uint32_t delay_base_us,
1697 			      uint32_t iops_threshold)
1698 {
1699 	uint64_t delay_time_base = delay_base_us * spdk_get_ticks_hz() / 1000000ULL;
1700 	uint32_t io_rate = iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
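	/* For example (illustrative values, assuming a 10 ms stats check interval and a
	 * 2.5 GHz tick rate): delay_base_us == 100 gives delay_time_base == 250000 ticks,
	 * and iops_threshold == 60000 gives io_rate == 600 I/Os per check interval.
	 */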
1701 
1702 	if (delay_time_base >= UINT32_MAX) {
1703 		SPDK_ERRLOG("Delay time of %"PRIu32" is to big\n", delay_base_us);
1704 		return -EINVAL;
1705 	} else if (io_rate == 0) {
1706 		SPDK_ERRLOG("IOPS rate of %"PRIu32" is too low. Min is %u\n", io_rate,
1707 			    1000U / SPDK_VHOST_STATS_CHECK_INTERVAL_MS);
1708 		return -EINVAL;
1709 	}
1710 
1711 	user_dev->coalescing_delay_us = delay_base_us;
1712 	user_dev->coalescing_iops_threshold = iops_threshold;
1713 	return 0;
1714 }
1715 
1716 int
1717 vhost_user_session_set_coalescing(struct spdk_vhost_dev *vdev,
1718 				  struct spdk_vhost_session *vsession, void *ctx)
1719 {
1720 	vsession->coalescing_delay_time_base =
1721 		to_user_dev(vdev)->coalescing_delay_us * spdk_get_ticks_hz() / 1000000ULL;
1722 	vsession->coalescing_io_rate_threshold =
1723 		to_user_dev(vdev)->coalescing_iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1724 	return 0;
1725 }
1726 
1727 int
1728 vhost_user_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
1729 			  uint32_t iops_threshold)
1730 {
1731 	int rc;
1732 
1733 	rc = vhost_user_dev_set_coalescing(to_user_dev(vdev), delay_base_us, iops_threshold);
1734 	if (rc != 0) {
1735 		return rc;
1736 	}
1737 
1738 	vhost_user_dev_foreach_session(vdev, vhost_user_session_set_coalescing, NULL, NULL);
1739 
1740 	return 0;
1741 }
1742 
1743 void
1744 vhost_user_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
1745 			  uint32_t *iops_threshold)
1746 {
1747 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1748 
1749 	if (delay_base_us) {
1750 		*delay_base_us = user_dev->coalescing_delay_us;
1751 	}
1752 
1753 	if (iops_threshold) {
1754 		*iops_threshold = user_dev->coalescing_iops_threshold;
1755 	}
1756 }
1757 
1758 int
1759 spdk_vhost_set_socket_path(const char *basename)
1760 {
1761 	int ret;
1762 
1763 	if (basename && strlen(basename) > 0) {
1764 		ret = snprintf(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 2, "%s", basename);
1765 		if (ret <= 0) {
1766 			return -EINVAL;
1767 		}
1768 		if ((size_t)ret >= sizeof(g_vhost_user_dev_dirname) - 2) {
1769 			SPDK_ERRLOG("Char dev dir path length %d is too long\n", ret);
1770 			return -EINVAL;
1771 		}
1772 
1773 		if (g_vhost_user_dev_dirname[ret - 1] != '/') {
1774 			g_vhost_user_dev_dirname[ret] = '/';
1775 			g_vhost_user_dev_dirname[ret + 1]  = '\0';
1776 		}
1777 	}
1778 
1779 	return 0;
1780 }
1781 
1782 static void
1783 vhost_dev_thread_exit(void *arg1)
1784 {
1785 	spdk_thread_exit(spdk_get_thread());
1786 }
1787 
1788 static bool g_vhost_user_started = false;
1789 
1790 int
1791 vhost_user_dev_register(struct spdk_vhost_dev *vdev, const char *name, struct spdk_cpuset *cpumask,
1792 			const struct spdk_vhost_user_dev_backend *user_backend)
1793 {
1794 	char path[PATH_MAX];
1795 	struct spdk_vhost_user_dev *user_dev;
1796 
1797 	if (snprintf(path, sizeof(path), "%s%s", g_vhost_user_dev_dirname, name) >= (int)sizeof(path)) {
1798 		SPDK_ERRLOG("Resulting socket path for controller %s is too long: %s%s\n",
1799 			    name, g_vhost_user_dev_dirname, name);
1800 		return -EINVAL;
1801 	}
1802 
1803 	vdev->path = strdup(path);
1804 	if (vdev->path == NULL) {
1805 		return -EIO;
1806 	}
1807 
1808 	user_dev = calloc(1, sizeof(*user_dev));
1809 	if (user_dev == NULL) {
1810 		free(vdev->path);
1811 		return -ENOMEM;
1812 	}
1813 	vdev->ctxt = user_dev;
1814 
1815 	vdev->thread = spdk_thread_create(vdev->name, cpumask);
1816 	if (vdev->thread == NULL) {
1817 		free(user_dev);
1818 		free(vdev->path);
1819 		SPDK_ERRLOG("Failed to create thread for vhost controller %s.\n", name);
1820 		return -EIO;
1821 	}
1822 
1823 	user_dev->user_backend = user_backend;
1824 	user_dev->vdev = vdev;
1825 	user_dev->registered = true;
1826 	TAILQ_INIT(&user_dev->vsessions);
1827 	pthread_mutex_init(&user_dev->lock, NULL);
1828 
1829 	vhost_user_dev_set_coalescing(user_dev, SPDK_VHOST_COALESCING_DELAY_BASE_US,
1830 				      SPDK_VHOST_VQ_IOPS_COALESCING_THRESHOLD);
1831 
1832 	if (vhost_register_unix_socket(path, name, vdev->virtio_features, vdev->disabled_features,
1833 				       vdev->protocol_features)) {
1834 		spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1835 		pthread_mutex_destroy(&user_dev->lock);
1836 		free(user_dev);
1837 		free(vdev->path);
1838 		return -EIO;
1839 	}
1840 
1841 	return 0;
1842 }
1843 
1844 int
1845 vhost_user_dev_unregister(struct spdk_vhost_dev *vdev)
1846 {
1847 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1848 	struct spdk_vhost_session *vsession, *tmp_vsession;
1849 
1850 	pthread_mutex_lock(&user_dev->lock);
1851 	if (user_dev->pending_async_op_num) {
1852 		pthread_mutex_unlock(&user_dev->lock);
1853 		return -EBUSY;
1854 	}
1855 
1856 	/* This is the case where the RPC `vhost_delete_controller` is called while a VM is still connected */
1857 	if (!TAILQ_EMPTY(&user_dev->vsessions) && g_vhost_user_started) {
1858 		SPDK_ERRLOG("Controller %s has still valid connection.\n", vdev->name);
1859 		pthread_mutex_unlock(&user_dev->lock);
1860 		return -EBUSY;
1861 	}
1862 
1863 	/* This is the case where the subsystem quits while a VM is still connected; the VM
1864 	 * should be stopped by the shutdown thread.
1865 	 */
1866 	if (!g_vhost_user_started) {
1867 		TAILQ_FOREACH_SAFE(vsession, &user_dev->vsessions, tailq, tmp_vsession) {
1868 			assert(vsession->started == false);
1869 			TAILQ_REMOVE(&user_dev->vsessions, vsession, tailq);
1870 			if (vsession->mem) {
1871 				vhost_session_mem_unregister(vsession->mem);
1872 				free(vsession->mem);
1873 			}
1874 			free(vsession->name);
1875 			free(vsession);
1876 		}
1877 	}
1878 
1879 	user_dev->registered = false;
1880 	pthread_mutex_unlock(&user_dev->lock);
1881 
1882 	/* There are no valid connections now, and it's not an error if the domain
1883 	 * socket was already removed by the shutdown thread.
1884 	 */
1885 	vhost_driver_unregister(vdev->path);
1886 
1887 	spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1888 	pthread_mutex_destroy(&user_dev->lock);
1889 
1890 	free(user_dev);
1891 	free(vdev->path);
1892 
1893 	return 0;
1894 }
1895 
1896 int
1897 vhost_user_init(void)
1898 {
1899 	size_t len;
1900 
1901 	if (g_vhost_user_started) {
1902 		return 0;
1903 	}
1904 
1905 	if (g_vhost_user_dev_dirname[0] == '\0') {
1906 		if (getcwd(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 1) == NULL) {
1907 			SPDK_ERRLOG("getcwd failed (%d): %s\n", errno, spdk_strerror(errno));
1908 			return -1;
1909 		}
1910 
1911 		len = strlen(g_vhost_user_dev_dirname);
1912 		if (g_vhost_user_dev_dirname[len - 1] != '/') {
1913 			g_vhost_user_dev_dirname[len] = '/';
1914 			g_vhost_user_dev_dirname[len + 1] = '\0';
1915 		}
1916 	}
1917 
1918 	g_vhost_user_started = true;
1919 
1920 	g_vhost_user_init_thread = spdk_get_thread();
1921 	assert(g_vhost_user_init_thread != NULL);
1922 
1923 	return 0;
1924 }
1925 
1926 static void
1927 vhost_user_session_shutdown_on_init(void *vhost_cb)
1928 {
1929 	spdk_vhost_fini_cb fn = vhost_cb;
1930 
1931 	fn();
1932 }
1933 
1934 static void *
1935 vhost_user_session_shutdown(void *vhost_cb)
1936 {
1937 	struct spdk_vhost_dev *vdev = NULL;
1938 	struct spdk_vhost_session *vsession;
1939 	struct spdk_vhost_user_dev *user_dev;
1940 	int ret;
1941 
1942 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1943 	     vdev = spdk_vhost_dev_next(vdev)) {
1944 		user_dev = to_user_dev(vdev);
1945 		ret = 0;
1946 		pthread_mutex_lock(&user_dev->lock);
1947 		TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1948 			if (vsession->started || vsession->starting) {
1949 				ret += _stop_session(vsession);
1950 			}
1951 		}
1952 		pthread_mutex_unlock(&user_dev->lock);
1953 		if (ret == 0) {
1954 			vhost_driver_unregister(vdev->path);
1955 		}
1956 	}
1957 
1958 	SPDK_INFOLOG(vhost, "Exiting\n");
1959 	spdk_thread_send_msg(g_vhost_user_init_thread, vhost_user_session_shutdown_on_init, vhost_cb);
1960 	return NULL;
1961 }
1962 
1963 void
1964 vhost_user_fini(spdk_vhost_fini_cb vhost_cb)
1965 {
1966 	pthread_t tid;
1967 	int rc;
1968 
1969 	if (!g_vhost_user_started) {
1970 		vhost_cb();
1971 		return;
1972 	}
1973 
1974 	g_vhost_user_started = false;
1975 
1976 	/* rte_vhost API for removing sockets is not asynchronous. Since it may call SPDK
1977 	 * ops for stopping a device or removing a connection, we need to call it from
1978 	 * a separate thread to avoid deadlock.
1979 	 */
1980 	rc = pthread_create(&tid, NULL, &vhost_user_session_shutdown, vhost_cb);
1981 	if (rc != 0) {
1982 		SPDK_ERRLOG("Failed to start session shutdown thread (%d): %s\n", rc, spdk_strerror(rc));
1983 		abort();
1984 	}
1985 	pthread_detach(tid);
1986 }
1987 
1988 void
1989 vhost_session_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1990 {
1991 	struct spdk_vhost_session *vsession;
1992 	struct spdk_vhost_user_dev *user_dev;
1993 
1994 	user_dev = to_user_dev(vdev);
1995 	pthread_mutex_lock(&user_dev->lock);
1996 	TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1997 		spdk_json_write_object_begin(w);
1998 		spdk_json_write_named_uint32(w, "vid", vsession->vid);
1999 		spdk_json_write_named_uint32(w, "id", vsession->id);
2000 		spdk_json_write_named_string(w, "name", vsession->name);
2001 		spdk_json_write_named_bool(w, "started", vsession->started);
2002 		spdk_json_write_named_uint32(w, "max_queues", vsession->max_queues);
2003 		spdk_json_write_named_uint32(w, "inflight_task_cnt", vsession->task_cnt);
2004 		spdk_json_write_object_end(w);
2005 	}
2006 	pthread_mutex_unlock(&user_dev->lock);
2007 }
2008