xref: /spdk/lib/vhost/rte_vhost_user.c (revision 1078198e78653b2f39414c1566740018d76ee73d)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2019 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2021 Mellanox Technologies LTD. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/env.h"
10 #include "spdk/likely.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/memory.h"
14 #include "spdk/barrier.h"
15 #include "spdk/vhost.h"
16 #include "vhost_internal.h"
17 #include <rte_version.h>
18 
19 #include "spdk_internal/vhost_user.h"
20 
21 /* Path to the directory where vhost-user domain sockets will be created. Can be set by user. */
22 static char g_vhost_user_dev_dirname[PATH_MAX] = "";
23 
24 static struct spdk_thread *g_vhost_user_init_thread;
25 
26 /**
27  * DPDK calls our callbacks synchronously but the work those callbacks
28  * perform needs to be async. Luckily, all DPDK callbacks are called on
29  * a DPDK-internal pthread, so we'll just wait on a semaphore in there.
30  */
31 static sem_t g_dpdk_sem;
32 
33 /** Return code for the current DPDK callback */
34 static int g_dpdk_response;
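/*
 * Typical handshake, as implemented below: the DPDK pthread sends a message to an
 * SPDK thread (e.g. via vhost_user_wait_for_session_stop()) and blocks in
 * wait_for_semaphore(); the SPDK thread finishes the work, stores the result in
 * g_dpdk_response and posts g_dpdk_sem (see vhost_user_session_stop_done()), which
 * unblocks the DPDK pthread.
 */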
35 
36 struct vhost_session_fn_ctx {
37 	/** Device pointer obtained before enqueueing the event */
38 	struct spdk_vhost_dev *vdev;
39 
40 	/** ID of the session to send event to. */
41 	uint32_t vsession_id;
42 
43 	/** User provided function to be executed on session's thread. */
44 	spdk_vhost_session_fn cb_fn;
45 
46 	/**
47 	 * User provided function to be called on the init thread
48 	 * after iterating through all sessions.
49 	 */
50 	spdk_vhost_dev_fn cpl_fn;
51 
52 	/** Custom user context */
53 	void *user_ctx;
54 };
55 
56 static int vhost_user_wait_for_session_stop(struct spdk_vhost_session *vsession,
57 		unsigned timeout_sec, const char *errmsg);
58 
59 static void
60 __attribute__((constructor))
61 _vhost_user_sem_init(void)
62 {
63 	if (sem_init(&g_dpdk_sem, 0, 0) != 0) {
64 		SPDK_ERRLOG("Failed to initialize semaphore for rte_vhost pthread.\n");
65 		abort();
66 	}
67 }
68 
69 static void
70 __attribute__((destructor))
71 _vhost_user_sem_destroy(void)
72 {
73 	sem_destroy(&g_dpdk_sem);
74 }
75 
76 void *
77 vhost_gpa_to_vva(struct spdk_vhost_session *vsession, uint64_t addr, uint64_t len)
78 {
79 	void *vva;
80 	uint64_t newlen;
81 
82 	newlen = len;
83 	vva = (void *)rte_vhost_va_from_guest_pa(vsession->mem, addr, &newlen);
84 	if (newlen != len) {
85 		return NULL;
86 	}
87 
88 	return vva;
89 
90 }
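/*
 * rte_vhost_va_from_guest_pa() shrinks newlen when the guest-physical range is not
 * contiguous within a single memory region, so returning NULL above forces callers
 * to treat such buffers as unmappable in one piece. Illustrative use only (mirrors
 * how vhost_vq_get_desc() maps an indirect descriptor table):
 *
 *	void *table = vhost_gpa_to_vva(vsession, desc->addr, desc->len);
 *	if (table == NULL) {
 *		// split or invalid guest buffer - bail out
 *	}
 */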
91 
92 static void
93 vhost_log_req_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
94 		   uint16_t req_id)
95 {
96 	struct vring_desc *desc, *desc_table;
97 	uint32_t desc_table_size;
98 	int rc;
99 
100 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
101 		return;
102 	}
103 
104 	rc = vhost_vq_get_desc(vsession, virtqueue, req_id, &desc, &desc_table, &desc_table_size);
105 	if (spdk_unlikely(rc != 0)) {
106 		SPDK_ERRLOG("Can't log used ring descriptors!\n");
107 		return;
108 	}
109 
110 	do {
111 		if (vhost_vring_desc_is_wr(desc)) {
112 			/* Strictly speaking, only the pages actually touched should be logged, but
113 			 * doing so would require tracking those changes in each backend.
114 			 * The backend will most likely touch all or most of those pages anyway,
115 			 * so assume we touched every page passed to us as a writable buffer. */
116 			rte_vhost_log_write(vsession->vid, desc->addr, desc->len);
117 		}
118 		vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
119 	} while (desc);
120 }
121 
122 static void
123 vhost_log_used_vring_elem(struct spdk_vhost_session *vsession,
124 			  struct spdk_vhost_virtqueue *virtqueue,
125 			  uint16_t idx)
126 {
127 	uint64_t offset, len;
128 
129 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
130 		return;
131 	}
132 
133 	if (spdk_unlikely(virtqueue->packed.packed_ring)) {
134 		offset = idx * sizeof(struct vring_packed_desc);
135 		len = sizeof(struct vring_packed_desc);
136 	} else {
137 		offset = offsetof(struct vring_used, ring[idx]);
138 		len = sizeof(virtqueue->vring.used->ring[idx]);
139 	}
140 
141 	rte_vhost_log_used_vring(vsession->vid, virtqueue->vring_idx, offset, len);
142 }
143 
144 static void
145 vhost_log_used_vring_idx(struct spdk_vhost_session *vsession,
146 			 struct spdk_vhost_virtqueue *virtqueue)
147 {
148 	uint64_t offset, len;
149 	uint16_t vq_idx;
150 
151 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
152 		return;
153 	}
154 
155 	offset = offsetof(struct vring_used, idx);
156 	len = sizeof(virtqueue->vring.used->idx);
157 	vq_idx = virtqueue - vsession->virtqueue;
158 
159 	rte_vhost_log_used_vring(vsession->vid, vq_idx, offset, len);
160 }
161 
162 /*
163  * Get available requests from avail ring.
164  */
165 uint16_t
166 vhost_vq_avail_ring_get(struct spdk_vhost_virtqueue *virtqueue, uint16_t *reqs,
167 			uint16_t reqs_len)
168 {
169 	struct rte_vhost_vring *vring = &virtqueue->vring;
170 	struct vring_avail *avail = vring->avail;
171 	uint16_t size_mask = vring->size - 1;
172 	uint16_t last_idx = virtqueue->last_avail_idx, avail_idx = avail->idx;
173 	uint16_t count, i;
174 	int rc;
175 	uint64_t u64_value;
176 
177 	spdk_smp_rmb();
178 
179 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
180 		/* Read to clear vring's kickfd */
181 		rc = read(vring->kickfd, &u64_value, sizeof(u64_value));
182 		if (rc < 0) {
183 			SPDK_ERRLOG("failed to acknowledge kickfd: %s.\n", spdk_strerror(errno));
184 			return -errno;
185 		}
186 	}
187 
188 	count = avail_idx - last_idx;
189 	if (spdk_likely(count == 0)) {
190 		return 0;
191 	}
192 
193 	if (spdk_unlikely(count > vring->size)) {
194 		/* TODO: the queue is unrecoverably broken and should be marked so.
195 		 * For now we will fail silently and report there are no new avail entries.
196 		 */
197 		return 0;
198 	}
199 
200 	count = spdk_min(count, reqs_len);
201 
202 	virtqueue->last_avail_idx += count;
203 	/* Check whether there are unprocessed reqs in vq, then kick vq manually */
204 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
205 		/* If avail_idx is larger than the virtqueue's last_avail_idx, there are unprocessed reqs.
206 		 * Re-read avail_idx from memory here to avoid a race condition with the guest.
207 		 */
208 		avail_idx = * (volatile uint16_t *) &avail->idx;
209 		if (avail_idx > virtqueue->last_avail_idx) {
210 			/* Write to notify vring's kickfd */
211 			rc = write(vring->kickfd, &u64_value, sizeof(u64_value));
212 			if (rc < 0) {
213 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
214 				return -errno;
215 			}
216 		}
217 	}
218 
219 	for (i = 0; i < count; i++) {
220 		reqs[i] = vring->avail->ring[(last_idx + i) & size_mask];
221 	}
222 
223 	SPDK_DEBUGLOG(vhost_ring,
224 		      "AVAIL: last_idx=%"PRIu16" avail_idx=%"PRIu16" count=%"PRIu16"\n",
225 		      last_idx, avail_idx, count);
226 
227 	return count;
228 }
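/*
 * Note that `avail_idx - last_idx` relies on unsigned 16-bit wrap-around, so the
 * count stays correct when avail->idx rolls over. Minimal polling sketch
 * (illustrative only; the `reqs` array and its size are hypothetical):
 *
 *	uint16_t reqs[32];
 *	uint16_t n = vhost_vq_avail_ring_get(virtqueue, reqs, SPDK_COUNTOF(reqs));
 *	for (uint16_t i = 0; i < n; i++) {
 *		// process the descriptor chain starting at reqs[i]
 *	}
 */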
229 
230 static bool
231 vhost_vring_desc_is_indirect(struct vring_desc *cur_desc)
232 {
233 	return !!(cur_desc->flags & VRING_DESC_F_INDIRECT);
234 }
235 
236 static bool
237 vhost_vring_packed_desc_is_indirect(struct vring_packed_desc *cur_desc)
238 {
239 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
240 }
241 
242 static bool
243 vhost_inflight_packed_desc_is_indirect(spdk_vhost_inflight_desc *cur_desc)
244 {
245 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
246 }
247 
248 int
249 vhost_vq_get_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
250 		  uint16_t req_idx, struct vring_desc **desc, struct vring_desc **desc_table,
251 		  uint32_t *desc_table_size)
252 {
253 	if (spdk_unlikely(req_idx >= virtqueue->vring.size)) {
254 		return -1;
255 	}
256 
257 	*desc = &virtqueue->vring.desc[req_idx];
258 
259 	if (vhost_vring_desc_is_indirect(*desc)) {
260 		*desc_table_size = (*desc)->len / sizeof(**desc);
261 		*desc_table = vhost_gpa_to_vva(vsession, (*desc)->addr,
262 					       sizeof(**desc) * *desc_table_size);
263 		*desc = *desc_table;
264 		if (*desc == NULL) {
265 			return -1;
266 		}
267 
268 		return 0;
269 	}
270 
271 	*desc_table = virtqueue->vring.desc;
272 	*desc_table_size = virtqueue->vring.size;
273 
274 	return 0;
275 }
276 
277 static bool
278 vhost_packed_desc_indirect_to_desc_table(struct spdk_vhost_session *vsession,
279 		uint64_t addr, uint32_t len,
280 		struct vring_packed_desc **desc_table,
281 		uint32_t *desc_table_size)
282 {
283 	*desc_table_size = len / sizeof(struct vring_packed_desc);
284 
285 	*desc_table = vhost_gpa_to_vva(vsession, addr, len);
286 	if (spdk_unlikely(*desc_table == NULL)) {
287 		return false;
288 	}
289 
290 	return true;
291 }
292 
293 int
294 vhost_vq_get_desc_packed(struct spdk_vhost_session *vsession,
295 			 struct spdk_vhost_virtqueue *virtqueue,
296 			 uint16_t req_idx, struct vring_packed_desc **desc,
297 			 struct vring_packed_desc **desc_table, uint32_t *desc_table_size)
298 {
299 	*desc =  &virtqueue->vring.desc_packed[req_idx];
300 
301 	/* In a packed ring, when the desc is non-indirect the next desc is
302 	 * found by checking (desc->flags & VRING_DESC_F_NEXT) != 0. When the
303 	 * desc is indirect, the next desc is found by index and
304 	 * desc_table_size. This differs from the split ring.
305 	 */
306 	if (vhost_vring_packed_desc_is_indirect(*desc)) {
307 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
308 				desc_table, desc_table_size)) {
309 			return -1;
310 		}
311 
312 		*desc = *desc_table;
313 	} else {
314 		*desc_table = NULL;
315 		*desc_table_size  = 0;
316 	}
317 
318 	return 0;
319 }
320 
321 int
322 vhost_inflight_queue_get_desc(struct spdk_vhost_session *vsession,
323 			      spdk_vhost_inflight_desc *desc_array,
324 			      uint16_t req_idx, spdk_vhost_inflight_desc **desc,
325 			      struct vring_packed_desc  **desc_table, uint32_t *desc_table_size)
326 {
327 	*desc = &desc_array[req_idx];
328 
329 	if (vhost_inflight_packed_desc_is_indirect(*desc)) {
330 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
331 				desc_table, desc_table_size)) {
332 			return -1;
333 		}
334 
335 		/* This desc is an inflight desc, not a packed desc.
336 		 * When F_INDIRECT is set, the table entries are packed descs,
337 		 * so set the inflight desc to NULL.
338 		 */
339 		*desc = NULL;
340 	} else {
341 		/* When F_INDIRECT is not set, there is no packed desc table */
342 		*desc_table = NULL;
343 		*desc_table_size = 0;
344 	}
345 
346 	return 0;
347 }
348 
349 int
350 vhost_vq_used_signal(struct spdk_vhost_session *vsession,
351 		     struct spdk_vhost_virtqueue *virtqueue)
352 {
353 	if (virtqueue->used_req_cnt == 0) {
354 		return 0;
355 	}
356 
357 	SPDK_DEBUGLOG(vhost_ring,
358 		      "Queue %td - USED RING: sending IRQ: last used %"PRIu16"\n",
359 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx);
360 
361 #if RTE_VERSION < RTE_VERSION_NUM(22, 11, 0, 0)
362 	if (rte_vhost_vring_call(vsession->vid, virtqueue->vring_idx) == 0) {
363 #else
364 	if (rte_vhost_vring_call_nonblock(vsession->vid, virtqueue->vring_idx) == 0) {
365 #endif
366 		/* interrupt signalled */
367 		virtqueue->req_cnt += virtqueue->used_req_cnt;
368 		virtqueue->used_req_cnt = 0;
369 		return 1;
370 	} else {
371 		/* interrupt not signalled */
372 		return 0;
373 	}
374 }
375 
376 static void
377 session_vq_io_stats_update(struct spdk_vhost_session *vsession,
378 			   struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
379 {
380 	uint32_t irq_delay_base = vsession->coalescing_delay_time_base;
381 	uint32_t io_threshold = vsession->coalescing_io_rate_threshold;
382 	int32_t irq_delay;
383 	uint32_t req_cnt;
384 
385 	req_cnt = virtqueue->req_cnt + virtqueue->used_req_cnt;
386 	if (req_cnt <= io_threshold) {
387 		return;
388 	}
389 
390 	irq_delay = (irq_delay_base * (req_cnt - io_threshold)) / io_threshold;
391 	virtqueue->irq_delay_time = (uint32_t) spdk_max(0, irq_delay);
392 
393 	virtqueue->req_cnt = 0;
394 	virtqueue->next_event_time = now;
395 }
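/*
 * The IRQ delay scales linearly with how far the request count exceeds the
 * threshold. Illustrative numbers only: with io_threshold = 60 and irq_delay_base
 * equivalent to 50 us, 120 requests in a check interval give
 * irq_delay = 50 * (120 - 60) / 60 = 50 us, while 180 requests give 100 us.
 */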
396 
397 static void
398 check_session_vq_io_stats(struct spdk_vhost_session *vsession,
399 			  struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
400 {
401 	if (now < vsession->next_stats_check_time) {
402 		return;
403 	}
404 
405 	vsession->next_stats_check_time = now + vsession->stats_check_interval;
406 	session_vq_io_stats_update(vsession, virtqueue, now);
407 }
408 
409 static inline bool
410 vhost_vq_event_is_suppressed(struct spdk_vhost_virtqueue *vq)
411 {
412 	if (spdk_unlikely(vq->packed.packed_ring)) {
413 		if (vq->vring.driver_event->flags & VRING_PACKED_EVENT_FLAG_DISABLE) {
414 			return true;
415 		}
416 	} else {
417 		if (vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT) {
418 			return true;
419 		}
420 	}
421 
422 	return false;
423 }
424 
425 void
426 vhost_session_vq_used_signal(struct spdk_vhost_virtqueue *virtqueue)
427 {
428 	struct spdk_vhost_session *vsession = virtqueue->vsession;
429 	uint64_t now;
430 
431 	if (vsession->coalescing_delay_time_base == 0) {
432 		if (virtqueue->vring.desc == NULL) {
433 			return;
434 		}
435 
436 		if (vhost_vq_event_is_suppressed(virtqueue)) {
437 			return;
438 		}
439 
440 		vhost_vq_used_signal(vsession, virtqueue);
441 	} else {
442 		now = spdk_get_ticks();
443 		check_session_vq_io_stats(vsession, virtqueue, now);
444 
445 		/* No need for event right now */
446 		if (now < virtqueue->next_event_time) {
447 			return;
448 		}
449 
450 		if (vhost_vq_event_is_suppressed(virtqueue)) {
451 			return;
452 		}
453 
454 		if (!vhost_vq_used_signal(vsession, virtqueue)) {
455 			return;
456 		}
457 
458 		/* The syscall above can take a while, so refresh the current time */
459 		now = spdk_get_ticks();
460 		virtqueue->next_event_time = now + virtqueue->irq_delay_time;
461 	}
462 }
463 
464 /*
465  * Enqueue id and len to used ring.
466  */
467 void
468 vhost_vq_used_ring_enqueue(struct spdk_vhost_session *vsession,
469 			   struct spdk_vhost_virtqueue *virtqueue,
470 			   uint16_t id, uint32_t len)
471 {
472 	struct rte_vhost_vring *vring = &virtqueue->vring;
473 	struct vring_used *used = vring->used;
474 	uint16_t last_idx = virtqueue->last_used_idx & (vring->size - 1);
475 	uint16_t vq_idx = virtqueue->vring_idx;
476 
477 	SPDK_DEBUGLOG(vhost_ring,
478 		      "Queue %td - USED RING: last_idx=%"PRIu16" req id=%"PRIu16" len=%"PRIu32"\n",
479 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx, id, len);
480 
481 	vhost_log_req_desc(vsession, virtqueue, id);
482 
483 	virtqueue->last_used_idx++;
484 	used->ring[last_idx].id = id;
485 	used->ring[last_idx].len = len;
486 
487 	/* Ensure the used ring is updated before we log it or increment used->idx. */
488 	spdk_smp_wmb();
489 
490 	rte_vhost_set_last_inflight_io_split(vsession->vid, vq_idx, id);
491 
492 	vhost_log_used_vring_elem(vsession, virtqueue, last_idx);
493 	* (volatile uint16_t *) &used->idx = virtqueue->last_used_idx;
494 	vhost_log_used_vring_idx(vsession, virtqueue);
495 
496 	rte_vhost_clr_inflight_desc_split(vsession->vid, vq_idx, virtqueue->last_used_idx, id);
497 
498 	virtqueue->used_req_cnt++;
499 
500 	if (vsession->interrupt_mode) {
501 		if (virtqueue->vring.desc == NULL || vhost_vq_event_is_suppressed(virtqueue)) {
502 			return;
503 		}
504 
505 		vhost_vq_used_signal(vsession, virtqueue);
506 	}
507 }
508 
509 void
510 vhost_vq_packed_ring_enqueue(struct spdk_vhost_session *vsession,
511 			     struct spdk_vhost_virtqueue *virtqueue,
512 			     uint16_t num_descs, uint16_t buffer_id,
513 			     uint32_t length, uint16_t inflight_head)
514 {
515 	struct vring_packed_desc *desc = &virtqueue->vring.desc_packed[virtqueue->last_used_idx];
516 	bool used, avail;
517 
518 	SPDK_DEBUGLOG(vhost_ring,
519 		      "Queue %td - RING: buffer_id=%"PRIu16"\n",
520 		      virtqueue - vsession->virtqueue, buffer_id);
521 
522 	/* When a descriptor has been used, its avail flag and
523 	 * used flag are set equal to each other, and the used flag
524 	 * value == used_wrap_counter.
525 	 */
526 	used = !!(desc->flags & VRING_DESC_F_USED);
527 	avail = !!(desc->flags & VRING_DESC_F_AVAIL);
528 	if (spdk_unlikely(used == virtqueue->packed.used_phase && used == avail)) {
529 		SPDK_ERRLOG("descriptor has been used before\n");
530 		return;
531 	}
532 
533 	/* In a used desc, addr is unused and len specifies the number of
534 	 * bytes the device has written to the buffer.
535 	 */
536 	desc->addr = 0;
537 	desc->len = length;
538 
539 	/* This bit specifies whether any data has been written by the device */
540 	if (length != 0) {
541 		desc->flags |= VRING_DESC_F_WRITE;
542 	}
543 
544 	/* Buffer ID is included in the last descriptor in the list.
545 	 * The driver needs to keep track of the size of the list corresponding
546 	 * to each buffer ID.
547 	 */
548 	desc->id = buffer_id;
549 
550 	/* A device MUST NOT make the descriptor used before buffer_id is
551 	 * written to the descriptor.
552 	 */
553 	spdk_smp_wmb();
554 
555 	rte_vhost_set_last_inflight_io_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
556 	/* To mark a desc as used, the device sets the F_USED bit in flags to match
557 	 * the internal Device ring wrap counter. It also sets the F_AVAIL bit to
558 	 * match the same value.
559 	 */
560 	if (virtqueue->packed.used_phase) {
561 		desc->flags |= VRING_DESC_F_AVAIL_USED;
562 	} else {
563 		desc->flags &= ~VRING_DESC_F_AVAIL_USED;
564 	}
565 	rte_vhost_clr_inflight_desc_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
566 
567 	vhost_log_used_vring_elem(vsession, virtqueue, virtqueue->last_used_idx);
568 	virtqueue->last_used_idx += num_descs;
569 	if (virtqueue->last_used_idx >= virtqueue->vring.size) {
570 		virtqueue->last_used_idx -= virtqueue->vring.size;
571 		virtqueue->packed.used_phase = !virtqueue->packed.used_phase;
572 	}
573 
574 	virtqueue->used_req_cnt++;
575 }
576 
577 bool
578 vhost_vq_packed_ring_is_avail(struct spdk_vhost_virtqueue *virtqueue)
579 {
580 	uint16_t flags = virtqueue->vring.desc_packed[virtqueue->last_avail_idx].flags;
581 
582 	/* To mark a desc as available, the driver sets the F_AVAIL bit in flags
583 	 * to match the internal avail wrap counter. It also sets the F_USED bit to
584 	 * match the inverse value but it's not mandatory.
585 	 */
586 	return (!!(flags & VRING_DESC_F_AVAIL) == virtqueue->packed.avail_phase);
587 }
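/*
 * Example of the wrap-counter convention (per the virtio 1.1 packed ring layout):
 * on the first pass through the ring avail_phase == 1, so an available descriptor
 * has F_AVAIL set; once last_avail_idx wraps past vring.size the phase flips to 0
 * and an available descriptor then has F_AVAIL cleared.
 */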
588 
589 bool
590 vhost_vring_packed_desc_is_wr(struct vring_packed_desc *cur_desc)
591 {
592 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
593 }
594 
595 bool
596 vhost_vring_inflight_desc_is_wr(spdk_vhost_inflight_desc *cur_desc)
597 {
598 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
599 }
600 
601 int
602 vhost_vring_packed_desc_get_next(struct vring_packed_desc **desc, uint16_t *req_idx,
603 				 struct spdk_vhost_virtqueue *vq,
604 				 struct vring_packed_desc *desc_table,
605 				 uint32_t desc_table_size)
606 {
607 	if (desc_table != NULL) {
608 		/* A non-NULL desc_table means the chain is indirect, so the next
609 		 * desc is found via req_idx and desc_table_size. A NULL result means
610 		 * we reached the last desc of this request.
611 		 */
612 		(*req_idx)++;
613 		if (*req_idx < desc_table_size) {
614 			*desc = &desc_table[*req_idx];
615 		} else {
616 			*desc = NULL;
617 		}
618 	} else {
619 		/* A NULL desc_table means the chain is non-indirect, so the next
620 		 * desc is found via req_idx and the F_NEXT flag. A NULL result means
621 		 * we reached the last desc of this request. When a new desc is
622 		 * returned, req_idx is updated as well.
623 		 */
624 		if (((*desc)->flags & VRING_DESC_F_NEXT) == 0) {
625 			*desc = NULL;
626 			return 0;
627 		}
628 
629 		*req_idx = (*req_idx + 1) % vq->vring.size;
630 		*desc = &vq->vring.desc_packed[*req_idx];
631 	}
632 
633 	return 0;
634 }
635 
636 static int
637 vhost_vring_desc_payload_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
638 				uint16_t *iov_index, uintptr_t payload, uint64_t remaining)
639 {
640 	uintptr_t vva;
641 	uint64_t len;
642 
643 	do {
644 		if (*iov_index >= SPDK_VHOST_IOVS_MAX) {
645 			SPDK_ERRLOG("SPDK_VHOST_IOVS_MAX(%d) reached\n", SPDK_VHOST_IOVS_MAX);
646 			return -1;
647 		}
648 		len = remaining;
649 		vva = (uintptr_t)rte_vhost_va_from_guest_pa(vsession->mem, payload, &len);
650 		if (vva == 0 || len == 0) {
651 			SPDK_ERRLOG("gpa_to_vva(%p) == NULL\n", (void *)payload);
652 			return -1;
653 		}
654 		iov[*iov_index].iov_base = (void *)vva;
655 		iov[*iov_index].iov_len = len;
656 		remaining -= len;
657 		payload += len;
658 		(*iov_index)++;
659 	} while (remaining);
660 
661 	return 0;
662 }
663 
664 int
665 vhost_vring_packed_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
666 			       uint16_t *iov_index, const struct vring_packed_desc *desc)
667 {
668 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
669 					       desc->addr, desc->len);
670 }
671 
672 int
673 vhost_vring_inflight_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
674 				 uint16_t *iov_index, const spdk_vhost_inflight_desc *desc)
675 {
676 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
677 					       desc->addr, desc->len);
678 }
679 
680 /* 1. Traverse the desc chain to get the buffer_id and return it as task_idx.
681  * 2. Update vq->last_avail_idx to point to the next available desc chain.
682  * 3. Flip the avail_wrap_counter if last_avail_idx wraps around.
683  */
684 uint16_t
685 vhost_vring_packed_desc_get_buffer_id(struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
686 				      uint16_t *num_descs)
687 {
688 	struct vring_packed_desc *desc;
689 	uint16_t desc_head = req_idx;
690 
691 	*num_descs = 1;
692 
693 	desc =  &vq->vring.desc_packed[req_idx];
694 	if (!vhost_vring_packed_desc_is_indirect(desc)) {
695 		while ((desc->flags & VRING_DESC_F_NEXT) != 0) {
696 			req_idx = (req_idx + 1) % vq->vring.size;
697 			desc = &vq->vring.desc_packed[req_idx];
698 			(*num_descs)++;
699 		}
700 	}
701 
702 	/* The queue size doesn't have to be a power of 2.
703 	 * The device maintains last_avail_idx, so keep the
704 	 * value in the valid range (0 to vring.size - 1).
705 	 */
706 	vq->last_avail_idx = (req_idx + 1) % vq->vring.size;
707 	if (vq->last_avail_idx < desc_head) {
708 		vq->packed.avail_phase = !vq->packed.avail_phase;
709 	}
710 
711 	return desc->id;
712 }
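/*
 * Since last_avail_idx was just assigned (req_idx + 1) % vring.size, it can only be
 * smaller than desc_head when the descriptor chain crossed the end of the ring,
 * which is exactly when the avail phase needs to flip.
 */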
713 
714 int
715 vhost_vring_desc_get_next(struct vring_desc **desc,
716 			  struct vring_desc *desc_table, uint32_t desc_table_size)
717 {
718 	struct vring_desc *old_desc = *desc;
719 	uint16_t next_idx;
720 
721 	if ((old_desc->flags & VRING_DESC_F_NEXT) == 0) {
722 		*desc = NULL;
723 		return 0;
724 	}
725 
726 	next_idx = old_desc->next;
727 	if (spdk_unlikely(next_idx >= desc_table_size)) {
728 		*desc = NULL;
729 		return -1;
730 	}
731 
732 	*desc = &desc_table[next_idx];
733 	return 0;
734 }
735 
736 int
737 vhost_vring_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
738 			uint16_t *iov_index, const struct vring_desc *desc)
739 {
740 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
741 					       desc->addr, desc->len);
742 }
743 
744 static inline void
745 vhost_session_mem_region_calc(uint64_t *previous_start, uint64_t *start, uint64_t *end,
746 			      uint64_t *len, struct rte_vhost_mem_region *region)
747 {
748 	*start = FLOOR_2MB(region->mmap_addr);
749 	*end = CEIL_2MB(region->mmap_addr + region->mmap_size);
750 	if (*start == *previous_start) {
751 		*start += (size_t) VALUE_2MB;
752 	}
753 	*previous_start = *start;
754 	*len = *end - *start;
755 }
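/*
 * The region is expanded to 2 MB boundaries because SPDK's memory maps track
 * registrations in 2 MB (hugepage-sized) chunks. Bumping *start when it equals
 * *previous_start keeps a region that begins in the same 2 MB page as the previous
 * one from registering that page twice.
 */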
756 
757 void
758 vhost_session_mem_register(struct rte_vhost_memory *mem)
759 {
760 	uint64_t start, end, len;
761 	uint32_t i;
762 	uint64_t previous_start = UINT64_MAX;
763 
764 
765 	for (i = 0; i < mem->nregions; i++) {
766 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
767 		SPDK_INFOLOG(vhost, "Registering VM memory for vtophys translation - 0x%jx len:0x%jx\n",
768 			     start, len);
769 
770 		if (spdk_mem_register((void *)start, len) != 0) {
771 			SPDK_WARNLOG("Failed to register memory region %"PRIu32". Future vtophys translation might fail.\n",
772 				     i);
773 			continue;
774 		}
775 	}
776 }
777 
778 void
779 vhost_session_mem_unregister(struct rte_vhost_memory *mem)
780 {
781 	uint64_t start, end, len;
782 	uint32_t i;
783 	uint64_t previous_start = UINT64_MAX;
784 
785 	for (i = 0; i < mem->nregions; i++) {
786 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
787 		if (spdk_vtophys((void *) start, NULL) == SPDK_VTOPHYS_ERROR) {
788 			continue; /* region has not been registered */
789 		}
790 
791 		if (spdk_mem_unregister((void *)start, len) != 0) {
792 			assert(false);
793 		}
794 	}
795 }
796 
797 static bool
798 vhost_memory_changed(struct rte_vhost_memory *new,
799 		     struct rte_vhost_memory *old)
800 {
801 	uint32_t i;
802 
803 	if (new->nregions != old->nregions) {
804 		return true;
805 	}
806 
807 	for (i = 0; i < new->nregions; ++i) {
808 		struct rte_vhost_mem_region *new_r = &new->regions[i];
809 		struct rte_vhost_mem_region *old_r = &old->regions[i];
810 
811 		if (new_r->guest_phys_addr != old_r->guest_phys_addr) {
812 			return true;
813 		}
814 		if (new_r->size != old_r->size) {
815 			return true;
816 		}
817 		if (new_r->guest_user_addr != old_r->guest_user_addr) {
818 			return true;
819 		}
820 		if (new_r->mmap_addr != old_r->mmap_addr) {
821 			return true;
822 		}
823 		if (new_r->fd != old_r->fd) {
824 			return true;
825 		}
826 	}
827 
828 	return false;
829 }
830 
831 static int
832 vhost_register_memtable_if_required(struct spdk_vhost_session *vsession, int vid)
833 {
834 	struct rte_vhost_memory *new_mem;
835 
836 	if (vhost_get_mem_table(vid, &new_mem) != 0) {
837 		SPDK_ERRLOG("vhost device %d: Failed to get guest memory table\n", vid);
838 		return -1;
839 	}
840 
841 	if (vsession->mem == NULL) {
842 		SPDK_INFOLOG(vhost, "Setting memtable for the first time\n");
843 		vsession->mem = new_mem;
844 		vhost_session_mem_register(vsession->mem);
845 		return 0;
846 	}
847 
848 	if (vhost_memory_changed(new_mem, vsession->mem)) {
849 		SPDK_INFOLOG(vhost, "Memtable is changed\n");
850 		vhost_session_mem_unregister(vsession->mem);
851 		free(vsession->mem);
852 
853 		vsession->mem = new_mem;
854 		vhost_session_mem_register(vsession->mem);
855 		return 0;
856 
857 	}
858 
859 	SPDK_INFOLOG(vhost, "Memtable is unchanged\n");
860 	free(new_mem);
861 	return 0;
862 }
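/*
 * Note: rte_vhost_get_mem_table() (wrapped by vhost_get_mem_table() below) returns
 * a heap-allocated copy of the table, which is why the unused or replaced table is
 * released here with free().
 */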
863 
864 static int
865 _stop_session(struct spdk_vhost_session *vsession)
866 {
867 	struct spdk_vhost_virtqueue *q;
868 	int rc;
869 	uint16_t i;
870 
871 	rc = vhost_user_wait_for_session_stop(vsession, 3, "stop session");
872 	if (rc != 0) {
873 		SPDK_ERRLOG("Couldn't stop device with vid %d.\n", vsession->vid);
874 		return rc;
875 	}
876 
877 	for (i = 0; i < vsession->max_queues; i++) {
878 		q = &vsession->virtqueue[i];
879 
880 		/* vring.desc and vring.desc_packed are in a union struct
881 		 * so q->vring.desc can replace q->vring.desc_packed.
882 		 */
883 		if (q->vring.desc == NULL) {
884 			continue;
885 		}
886 
887 		/* Packed virtqueues support up to 2^15 entries each,
888 		 * so the remaining top bit can be used as the wrap counter.
889 		 */
890 		if (q->packed.packed_ring) {
891 			q->last_avail_idx = q->last_avail_idx |
892 					    ((uint16_t)q->packed.avail_phase << 15);
893 			q->last_used_idx = q->last_used_idx |
894 					   ((uint16_t)q->packed.used_phase << 15);
895 		}
896 
897 		rte_vhost_set_vring_base(vsession->vid, i, q->last_avail_idx, q->last_used_idx);
898 		q->vring.desc = NULL;
899 	}
900 	vsession->max_queues = 0;
901 
902 	return 0;
903 }
904 
905 static int
906 new_connection(int vid)
907 {
908 	struct spdk_vhost_dev *vdev;
909 	struct spdk_vhost_user_dev *user_dev;
910 	struct spdk_vhost_session *vsession;
911 	size_t dev_dirname_len;
912 	char ifname[PATH_MAX];
913 	char *ctrlr_name;
914 
915 	if (rte_vhost_get_ifname(vid, ifname, PATH_MAX) < 0) {
916 		SPDK_ERRLOG("Couldn't get a valid ifname for device with vid %d\n", vid);
917 		return -1;
918 	}
919 
920 	ctrlr_name = &ifname[0];
921 	dev_dirname_len = strlen(g_vhost_user_dev_dirname);
922 	if (strncmp(ctrlr_name, g_vhost_user_dev_dirname, dev_dirname_len) == 0) {
923 		ctrlr_name += dev_dirname_len;
924 	}
925 
926 	spdk_vhost_lock();
927 	vdev = spdk_vhost_dev_find(ctrlr_name);
928 	if (vdev == NULL) {
929 		SPDK_ERRLOG("Couldn't find device with vid %d to create connection for.\n", vid);
930 		spdk_vhost_unlock();
931 		return -1;
932 	}
933 	spdk_vhost_unlock();
934 
935 	user_dev = to_user_dev(vdev);
936 	pthread_mutex_lock(&user_dev->lock);
937 	if (user_dev->registered == false) {
938 		SPDK_ERRLOG("Device %s is unregistered\n", ctrlr_name);
939 		pthread_mutex_unlock(&user_dev->lock);
940 		return -1;
941 	}
942 
943 	/* We expect sessions inside user_dev->vsessions to be sorted in ascending
944 	 * order of vsession->id. For now we always set id = vsessions_num++
945 	 * and append each session to the very end of the vsessions list.
946 	 * This is required for vhost_user_dev_foreach_session() to work.
947 	 */
948 	if (user_dev->vsessions_num == UINT_MAX) {
949 		pthread_mutex_unlock(&user_dev->lock);
950 		assert(false);
951 		return -EINVAL;
952 	}
953 
954 	if (posix_memalign((void **)&vsession, SPDK_CACHE_LINE_SIZE, sizeof(*vsession) +
955 			   user_dev->user_backend->session_ctx_size)) {
956 		SPDK_ERRLOG("vsession alloc failed\n");
957 		pthread_mutex_unlock(&user_dev->lock);
958 		return -1;
959 	}
960 	memset(vsession, 0, sizeof(*vsession) + user_dev->user_backend->session_ctx_size);
961 
962 	vsession->vdev = vdev;
963 	vsession->vid = vid;
964 	vsession->id = user_dev->vsessions_num++;
965 	vsession->name = spdk_sprintf_alloc("%ss%u", vdev->name, vsession->vid);
966 	if (vsession->name == NULL) {
967 		SPDK_ERRLOG("vsession alloc failed\n");
968 		free(vsession);
969 		pthread_mutex_unlock(&user_dev->lock);
970 		return -1;
971 	}
972 	vsession->started = false;
973 	vsession->starting = false;
974 	vsession->next_stats_check_time = 0;
975 	vsession->stats_check_interval = SPDK_VHOST_STATS_CHECK_INTERVAL_MS *
976 					 spdk_get_ticks_hz() / 1000UL;
977 	TAILQ_INSERT_TAIL(&user_dev->vsessions, vsession, tailq);
978 	vhost_session_install_rte_compat_hooks(vsession);
979 	pthread_mutex_unlock(&user_dev->lock);
980 
981 	return 0;
982 }
983 
984 static void
985 vhost_user_session_start(void *arg1)
986 {
987 	struct spdk_vhost_session *vsession = arg1;
988 	struct spdk_vhost_dev *vdev = vsession->vdev;
989 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
990 	const struct spdk_vhost_user_dev_backend *backend;
991 	int rc;
992 
993 	SPDK_INFOLOG(vhost, "Starting new session for device %s with vid %d\n", vdev->name, vsession->vid);
994 	pthread_mutex_lock(&user_dev->lock);
995 	vsession->starting = false;
996 	backend = user_dev->user_backend;
997 	rc = backend->start_session(vdev, vsession, NULL);
998 	if (rc == 0) {
999 		vsession->started = true;
1000 	}
1001 	pthread_mutex_unlock(&user_dev->lock);
1002 }
1003 
1004 static int
1005 set_device_vq_callfd(struct spdk_vhost_session *vsession, uint16_t qid)
1006 {
1007 	struct spdk_vhost_virtqueue *q;
1008 
1009 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1010 		return -EINVAL;
1011 	}
1012 
1013 	q = &vsession->virtqueue[qid];
1014 	/* vq isn't enabled yet */
1015 	if (q->vring_idx != qid) {
1016 		return 0;
1017 	}
1018 
1019 	/* vring.desc and vring.desc_packed are in a union struct
1020 	 * so q->vring.desc can replace q->vring.desc_packed.
1021 	 */
1022 	if (q->vring.desc == NULL || q->vring.size == 0) {
1023 		return 0;
1024 	}
1025 
1026 	/*
1027 	 * Not entirely certain, but this looks like some kind of QEMU bug: guest I/O
1028 	 * might be frozen after live migration unless all queues are kicked. It looks
1029 	 * like the previous vhost instance failed to effectively deliver all interrupts
1030 	 * before the GET_VRING_BASE message. This shouldn't harm the guest, since
1031 	 * spurious interrupts should be ignored by the guest virtio driver.
1032 	 *
1033 	 * Tested on QEMU 2.10.91 and 2.11.50.
1034 	 *
1035 	 * Make sure a successful call of
1036 	 * `rte_vhost_vring_call` will happen
1037 	 * after starting the device.
1038 	 */
1039 	q->used_req_cnt += 1;
1040 
1041 	return 0;
1042 }
1043 
1044 static int
1045 enable_device_vq(struct spdk_vhost_session *vsession, uint16_t qid)
1046 {
1047 	struct spdk_vhost_virtqueue *q;
1048 	bool packed_ring;
1049 	const struct spdk_vhost_user_dev_backend *backend;
1050 	int rc;
1051 
1052 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1053 		return -EINVAL;
1054 	}
1055 
1056 	q = &vsession->virtqueue[qid];
1057 	memset(q, 0, sizeof(*q));
1058 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
1059 
1060 	q->vsession = vsession;
1061 	q->vring_idx = -1;
1062 	if (rte_vhost_get_vhost_vring(vsession->vid, qid, &q->vring)) {
1063 		return 0;
1064 	}
1065 	q->vring_idx = qid;
1066 	rte_vhost_get_vhost_ring_inflight(vsession->vid, qid, &q->vring_inflight);
1067 
1068 	/* vring.desc and vring.desc_packed are in a union struct
1069 	 * so q->vring.desc can replace q->vring.desc_packed.
1070 	 */
1071 	if (q->vring.desc == NULL || q->vring.size == 0) {
1072 		return 0;
1073 	}
1074 
1075 	if (rte_vhost_get_vring_base(vsession->vid, qid, &q->last_avail_idx, &q->last_used_idx)) {
1076 		q->vring.desc = NULL;
1077 		return 0;
1078 	}
1079 
1080 	backend = to_user_dev(vsession->vdev)->user_backend;
1081 	rc = backend->alloc_vq_tasks(vsession, qid);
1082 	if (rc) {
1083 		return rc;
1084 	}
1085 
1086 	/*
1087 	 * This shouldn't harm guest since spurious interrupts should be ignored by
1088 	 * guest virtio driver.
1089 	 *
1090 	 * Make sure a successful call of `rte_vhost_vring_call` will happen after
1091 	 * restarting the device.
1092 	 */
1093 	if (vsession->needs_restart) {
1094 		q->used_req_cnt += 1;
1095 	}
1096 
1097 	if (packed_ring) {
1098 		/* Since the packed ring flag is already negotiated between SPDK and the VM, the VM doesn't
1099 		 * restore `last_avail_idx` and `last_used_idx` for packed ring, so use the
1100 		 * inflight mem to restore the `last_avail_idx` and `last_used_idx`.
1101 		 */
1102 		rte_vhost_get_vring_base_from_inflight(vsession->vid, qid, &q->last_avail_idx,
1103 						       &q->last_used_idx);
1104 
1105 		/* Packed virtqueues support up to 2^15 entries each,
1106 		 * so the remaining top bit can be used as the wrap counter.
1107 		 */
1108 		q->packed.avail_phase = q->last_avail_idx >> 15;
1109 		q->last_avail_idx = q->last_avail_idx & 0x7FFF;
1110 		q->packed.used_phase = q->last_used_idx >> 15;
1111 		q->last_used_idx = q->last_used_idx & 0x7FFF;
1112 
1113 		if (!spdk_interrupt_mode_is_enabled()) {
1114 			/* Disable I/O submission notifications, we'll be polling. */
1115 			q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1116 		} else {
1117 			/* Enable I/O submission notifications, we'll be interrupting. */
1118 			q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_ENABLE;
1119 		}
1120 	} else {
1121 		if (!spdk_interrupt_mode_is_enabled()) {
1122 			/* Disable I/O submission notifications, we'll be polling. */
1123 			q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1124 		} else {
1125 			/* Enable I/O submission notifications, we'll be interrupting. */
1126 			q->vring.used->flags = 0;
1127 		}
1128 	}
1129 
1130 	if (spdk_interrupt_mode_is_enabled() && backend->register_vq_interrupt) {
1131 		backend->register_vq_interrupt(vsession, q);
1132 	}
1133 
1134 	q->packed.packed_ring = packed_ring;
1135 	vsession->max_queues = spdk_max(vsession->max_queues, qid + 1);
1136 
1137 	return 0;
1138 }
1139 
1140 static int
1141 start_device(int vid)
1142 {
1143 	struct spdk_vhost_dev *vdev;
1144 	struct spdk_vhost_session *vsession;
1145 	struct spdk_vhost_user_dev *user_dev;
1146 	int rc = 0;
1147 
1148 	vsession = vhost_session_find_by_vid(vid);
1149 	if (vsession == NULL) {
1150 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1151 		return -1;
1152 	}
1153 	vdev = vsession->vdev;
1154 	user_dev = to_user_dev(vdev);
1155 
1156 	pthread_mutex_lock(&user_dev->lock);
1157 	if (vsession->started) {
1158 		/* already started, nothing to do */
1159 		goto out;
1160 	}
1161 
1162 	if (!vsession->mem) {
1163 		rc = -1;
1164 		SPDK_ERRLOG("Session %s has not set up its memory table yet\n", vsession->name);
1165 		goto out;
1166 	}
1167 
1168 	vsession->starting = true;
1169 	SPDK_INFOLOG(vhost, "Session %s is scheduled to start\n", vsession->name);
1170 	vhost_user_session_set_coalescing(vdev, vsession, NULL);
1171 	spdk_thread_send_msg(vdev->thread, vhost_user_session_start, vsession);
1172 
1173 out:
1174 	pthread_mutex_unlock(&user_dev->lock);
1175 	return rc;
1176 }
1177 
1178 static void
1179 stop_device(int vid)
1180 {
1181 	struct spdk_vhost_session *vsession;
1182 	struct spdk_vhost_user_dev *user_dev;
1183 
1184 	vsession = vhost_session_find_by_vid(vid);
1185 	if (vsession == NULL) {
1186 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1187 		return;
1188 	}
1189 	user_dev = to_user_dev(vsession->vdev);
1190 
1191 	pthread_mutex_lock(&user_dev->lock);
1192 	if (!vsession->started && !vsession->starting) {
1193 		pthread_mutex_unlock(&user_dev->lock);
1194 		/* already stopped, nothing to do */
1195 		return;
1196 	}
1197 
1198 	_stop_session(vsession);
1199 	pthread_mutex_unlock(&user_dev->lock);
1200 }
1201 
1202 static void
1203 destroy_connection(int vid)
1204 {
1205 	struct spdk_vhost_session *vsession;
1206 	struct spdk_vhost_user_dev *user_dev;
1207 
1208 	vsession = vhost_session_find_by_vid(vid);
1209 	if (vsession == NULL) {
1210 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1211 		return;
1212 	}
1213 	user_dev = to_user_dev(vsession->vdev);
1214 
1215 	pthread_mutex_lock(&user_dev->lock);
1216 	if (vsession->started || vsession->starting) {
1217 		if (_stop_session(vsession) != 0) {
1218 			pthread_mutex_unlock(&user_dev->lock);
1219 			return;
1220 		}
1221 	}
1222 
1223 	if (vsession->mem) {
1224 		vhost_session_mem_unregister(vsession->mem);
1225 		free(vsession->mem);
1226 	}
1227 
1228 	TAILQ_REMOVE(&to_user_dev(vsession->vdev)->vsessions, vsession, tailq);
1229 	free(vsession->name);
1230 	free(vsession);
1231 	pthread_mutex_unlock(&user_dev->lock);
1232 }
1233 
1234 static const struct rte_vhost_device_ops g_spdk_vhost_ops = {
1235 	.new_device =  start_device,
1236 	.destroy_device = stop_device,
1237 	.new_connection = new_connection,
1238 	.destroy_connection = destroy_connection,
1239 };
1240 
1241 static struct spdk_vhost_session *
1242 vhost_session_find_by_id(struct spdk_vhost_dev *vdev, unsigned id)
1243 {
1244 	struct spdk_vhost_session *vsession;
1245 
1246 	TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1247 		if (vsession->id == id) {
1248 			return vsession;
1249 		}
1250 	}
1251 
1252 	return NULL;
1253 }
1254 
1255 struct spdk_vhost_session *
1256 vhost_session_find_by_vid(int vid)
1257 {
1258 	struct spdk_vhost_dev *vdev;
1259 	struct spdk_vhost_session *vsession;
1260 	struct spdk_vhost_user_dev *user_dev;
1261 
1262 	spdk_vhost_lock();
1263 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1264 	     vdev = spdk_vhost_dev_next(vdev)) {
1265 		user_dev = to_user_dev(vdev);
1266 
1267 		pthread_mutex_lock(&user_dev->lock);
1268 		TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1269 			if (vsession->vid == vid) {
1270 				pthread_mutex_unlock(&user_dev->lock);
1271 				spdk_vhost_unlock();
1272 				return vsession;
1273 			}
1274 		}
1275 		pthread_mutex_unlock(&user_dev->lock);
1276 	}
1277 	spdk_vhost_unlock();
1278 
1279 	return NULL;
1280 }
1281 
1282 static void
1283 wait_for_semaphore(int timeout_sec, const char *errmsg)
1284 {
1285 	struct timespec timeout;
1286 	int rc;
1287 
1288 	clock_gettime(CLOCK_REALTIME, &timeout);
1289 	timeout.tv_sec += timeout_sec;
1290 	rc = sem_timedwait(&g_dpdk_sem, &timeout);
1291 	if (rc != 0) {
1292 		SPDK_ERRLOG("Timeout waiting for event: %s.\n", errmsg);
1293 		sem_wait(&g_dpdk_sem);
1294 	}
1295 }
1296 
1297 void
1298 vhost_user_session_stop_done(struct spdk_vhost_session *vsession, int response)
1299 {
1300 	if (response == 0) {
1301 		vsession->started = false;
1302 	}
1303 
1304 	g_dpdk_response = response;
1305 	sem_post(&g_dpdk_sem);
1306 }
1307 
1308 static void
1309 vhost_user_session_stop_event(void *arg1)
1310 {
1311 	struct vhost_session_fn_ctx *ctx = arg1;
1312 	struct spdk_vhost_dev *vdev = ctx->vdev;
1313 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1314 	struct spdk_vhost_session *vsession;
1315 
1316 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1317 		spdk_thread_send_msg(spdk_get_thread(), vhost_user_session_stop_event, arg1);
1318 		return;
1319 	}
1320 
1321 	vsession = vhost_session_find_by_id(vdev, ctx->vsession_id);
1322 	user_dev->user_backend->stop_session(vdev, vsession, NULL);
1323 	pthread_mutex_unlock(&user_dev->lock);
1324 }
1325 
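/*
 * Runs on a non-SPDK pthread (a DPDK callback or the shutdown thread) with
 * user_dev->lock held. The lock is released around the wait because the stop event
 * queued here executes on the vdev thread and must take the same lock (see
 * vhost_user_session_stop_event()); the wait returns once
 * vhost_user_session_stop_done() posts g_dpdk_sem.
 */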
1326 static int
1327 vhost_user_wait_for_session_stop(struct spdk_vhost_session *vsession,
1328 				 unsigned timeout_sec, const char *errmsg)
1329 {
1330 	struct vhost_session_fn_ctx ev_ctx = {0};
1331 	struct spdk_vhost_dev *vdev = vsession->vdev;
1332 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1333 
1334 	ev_ctx.vdev = vdev;
1335 	ev_ctx.vsession_id = vsession->id;
1336 
1337 	spdk_thread_send_msg(vdev->thread, vhost_user_session_stop_event, &ev_ctx);
1338 
1339 	pthread_mutex_unlock(&user_dev->lock);
1340 	wait_for_semaphore(timeout_sec, errmsg);
1341 	pthread_mutex_lock(&user_dev->lock);
1342 
1343 	return g_dpdk_response;
1344 }
1345 
1346 static void
1347 foreach_session_finish_cb(void *arg1)
1348 {
1349 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1350 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1351 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1352 
1353 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1354 		spdk_thread_send_msg(spdk_get_thread(),
1355 				     foreach_session_finish_cb, arg1);
1356 		return;
1357 	}
1358 
1359 	assert(user_dev->pending_async_op_num > 0);
1360 	user_dev->pending_async_op_num--;
1361 	if (ev_ctx->cpl_fn != NULL) {
1362 		ev_ctx->cpl_fn(vdev, ev_ctx->user_ctx);
1363 	}
1364 
1365 	pthread_mutex_unlock(&user_dev->lock);
1366 	free(ev_ctx);
1367 }
1368 
1369 static void
1370 foreach_session(void *arg1)
1371 {
1372 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1373 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1374 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1375 	struct spdk_vhost_session *vsession;
1376 	int rc;
1377 
1378 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1379 		spdk_thread_send_msg(spdk_get_thread(), foreach_session, arg1);
1380 		return;
1381 	}
1382 
1383 	TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1384 		rc = ev_ctx->cb_fn(vdev, vsession, ev_ctx->user_ctx);
1385 		if (rc < 0) {
1386 			goto out;
1387 		}
1388 	}
1389 
1390 out:
1391 	pthread_mutex_unlock(&user_dev->lock);
1392 	spdk_thread_send_msg(g_vhost_user_init_thread, foreach_session_finish_cb, arg1);
1393 }
1394 
1395 void
1396 vhost_user_dev_foreach_session(struct spdk_vhost_dev *vdev,
1397 			       spdk_vhost_session_fn fn,
1398 			       spdk_vhost_dev_fn cpl_fn,
1399 			       void *arg)
1400 {
1401 	struct vhost_session_fn_ctx *ev_ctx;
1402 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1403 
1404 	ev_ctx = calloc(1, sizeof(*ev_ctx));
1405 	if (ev_ctx == NULL) {
1406 		SPDK_ERRLOG("Failed to alloc vhost event.\n");
1407 		assert(false);
1408 		return;
1409 	}
1410 
1411 	ev_ctx->vdev = vdev;
1412 	ev_ctx->cb_fn = fn;
1413 	ev_ctx->cpl_fn = cpl_fn;
1414 	ev_ctx->user_ctx = arg;
1415 
1416 	pthread_mutex_lock(&user_dev->lock);
1417 	assert(user_dev->pending_async_op_num < UINT32_MAX);
1418 	user_dev->pending_async_op_num++;
1419 	pthread_mutex_unlock(&user_dev->lock);
1420 
1421 	spdk_thread_send_msg(vdev->thread, foreach_session, ev_ctx);
1422 }
1423 
1424 void
1425 vhost_user_session_set_interrupt_mode(struct spdk_vhost_session *vsession, bool interrupt_mode)
1426 {
1427 	uint16_t i;
1428 	int rc = 0;
1429 
1430 	for (i = 0; i < vsession->max_queues; i++) {
1431 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1432 		uint64_t num_events = 1;
1433 
1434 		/* vring.desc and vring.desc_packed are in a union struct
1435 		 * so q->vring.desc can replace q->vring.desc_packed.
1436 		 */
1437 		if (q->vring.desc == NULL || q->vring.size == 0) {
1438 			continue;
1439 		}
1440 
1441 		if (interrupt_mode) {
1442 
1443 			/* In case of a race condition, always kick the vring when switching to interrupt mode */
1444 			rc = write(q->vring.kickfd, &num_events, sizeof(num_events));
1445 			if (rc < 0) {
1446 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
1447 			}
1448 
1449 			vsession->interrupt_mode = true;
1450 		} else {
1451 
1452 			vsession->interrupt_mode = false;
1453 		}
1454 	}
1455 }
1456 
1457 static int
1458 extern_vhost_pre_msg_handler(int vid, void *_msg)
1459 {
1460 	struct vhost_user_msg *msg = _msg;
1461 	struct spdk_vhost_session *vsession;
1462 	struct spdk_vhost_user_dev *user_dev;
1463 
1464 	vsession = vhost_session_find_by_vid(vid);
1465 	if (vsession == NULL) {
1466 		SPDK_ERRLOG("Received a message to uninitialized session (vid %d).\n", vid);
1467 		assert(false);
1468 		return RTE_VHOST_MSG_RESULT_ERR;
1469 	}
1470 	user_dev = to_user_dev(vsession->vdev);
1471 
1472 	switch (msg->request) {
1473 	case VHOST_USER_GET_VRING_BASE:
1474 		pthread_mutex_lock(&user_dev->lock);
1475 		if (vsession->started) {
1476 			pthread_mutex_unlock(&user_dev->lock);
1477 			g_spdk_vhost_ops.destroy_device(vid);
1478 			break;
1479 		}
1480 		pthread_mutex_unlock(&user_dev->lock);
1481 		break;
1482 	case VHOST_USER_SET_MEM_TABLE:
1483 		pthread_mutex_lock(&user_dev->lock);
1484 		if (vsession->started) {
1485 			vsession->original_max_queues = vsession->max_queues;
1486 			pthread_mutex_unlock(&user_dev->lock);
1487 			g_spdk_vhost_ops.destroy_device(vid);
1488 			vsession->needs_restart = true;
1489 			break;
1490 		}
1491 		pthread_mutex_unlock(&user_dev->lock);
1492 		break;
1493 	case VHOST_USER_GET_CONFIG: {
1494 		int rc = 0;
1495 
1496 		pthread_mutex_lock(&user_dev->lock);
1497 		if (vsession->vdev->backend->vhost_get_config) {
1498 			rc = vsession->vdev->backend->vhost_get_config(vsession->vdev,
1499 					msg->payload.cfg.region, msg->payload.cfg.size);
1500 			if (rc != 0) {
1501 				msg->size = 0;
1502 			}
1503 		}
1504 		pthread_mutex_unlock(&user_dev->lock);
1505 
1506 		return RTE_VHOST_MSG_RESULT_REPLY;
1507 	}
1508 	case VHOST_USER_SET_CONFIG: {
1509 		int rc = 0;
1510 
1511 		pthread_mutex_lock(&user_dev->lock);
1512 		if (vsession->vdev->backend->vhost_set_config) {
1513 			rc = vsession->vdev->backend->vhost_set_config(vsession->vdev,
1514 					msg->payload.cfg.region, msg->payload.cfg.offset,
1515 					msg->payload.cfg.size, msg->payload.cfg.flags);
1516 		}
1517 		pthread_mutex_unlock(&user_dev->lock);
1518 
1519 		return rc == 0 ? RTE_VHOST_MSG_RESULT_OK : RTE_VHOST_MSG_RESULT_ERR;
1520 	}
1521 	default:
1522 		break;
1523 	}
1524 
1525 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1526 }
1527 
1528 static int
1529 extern_vhost_post_msg_handler(int vid, void *_msg)
1530 {
1531 	struct vhost_user_msg *msg = _msg;
1532 	struct spdk_vhost_session *vsession;
1533 	struct spdk_vhost_user_dev *user_dev;
1534 	uint16_t qid;
1535 	int rc;
1536 
1537 	vsession = vhost_session_find_by_vid(vid);
1538 	if (vsession == NULL) {
1539 		SPDK_ERRLOG("Received a message to uninitialized session (vid %d).\n", vid);
1540 		assert(false);
1541 		return RTE_VHOST_MSG_RESULT_ERR;
1542 	}
1543 	user_dev = to_user_dev(vsession->vdev);
1544 
1545 	switch (msg->request) {
1546 	case VHOST_USER_SET_FEATURES:
1547 		rc = vhost_get_negotiated_features(vid, &vsession->negotiated_features);
1548 		if (rc) {
1549 			SPDK_ERRLOG("vhost device %d: Failed to get negotiated driver features\n", vid);
1550 			return RTE_VHOST_MSG_RESULT_ERR;
1551 		}
1552 		break;
1553 	case VHOST_USER_SET_VRING_CALL:
1554 		qid = (uint16_t)msg->payload.u64;
1555 		rc = set_device_vq_callfd(vsession, qid);
1556 		if (rc) {
1557 			return RTE_VHOST_MSG_RESULT_ERR;
1558 		}
1559 		break;
1560 	case VHOST_USER_SET_VRING_KICK:
1561 		qid = (uint16_t)msg->payload.u64;
1562 		rc = enable_device_vq(vsession, qid);
1563 		if (rc) {
1564 			return RTE_VHOST_MSG_RESULT_ERR;
1565 		}
1566 
1567 		/* vhost-user spec tells us to start polling a queue after receiving
1568 		 * its SET_VRING_KICK message. Let's do it!
1569 		 */
1570 		pthread_mutex_lock(&user_dev->lock);
1571 		if (!vsession->started) {
1572 			pthread_mutex_unlock(&user_dev->lock);
1573 			g_spdk_vhost_ops.new_device(vid);
1574 			return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1575 		}
1576 		pthread_mutex_unlock(&user_dev->lock);
1577 		break;
1578 	case VHOST_USER_SET_MEM_TABLE:
1579 		vhost_register_memtable_if_required(vsession, vid);
1580 		pthread_mutex_lock(&user_dev->lock);
1581 		if (vsession->needs_restart) {
1582 			pthread_mutex_unlock(&user_dev->lock);
1583 			for (qid = 0; qid < vsession->original_max_queues; qid++) {
1584 				enable_device_vq(vsession, qid);
1585 			}
1586 			vsession->original_max_queues = 0;
1587 			vsession->needs_restart = false;
1588 			g_spdk_vhost_ops.new_device(vid);
1589 			break;
1590 		}
1591 		pthread_mutex_unlock(&user_dev->lock);
1592 		break;
1593 	default:
1594 		break;
1595 	}
1596 
1597 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1598 }
1599 
1600 struct rte_vhost_user_extern_ops g_spdk_extern_vhost_ops = {
1601 	.pre_msg_handle = extern_vhost_pre_msg_handler,
1602 	.post_msg_handle = extern_vhost_post_msg_handler,
1603 };
1604 
1605 void
1606 vhost_session_install_rte_compat_hooks(struct spdk_vhost_session *vsession)
1607 {
1608 	int rc;
1609 
1610 	rc = rte_vhost_extern_callback_register(vsession->vid, &g_spdk_extern_vhost_ops, NULL);
1611 	if (rc != 0) {
1612 		SPDK_ERRLOG("rte_vhost_extern_callback_register() failed for vid = %d\n",
1613 			    vsession->vid);
1614 		return;
1615 	}
1616 }
1617 
1618 int
1619 vhost_register_unix_socket(const char *path, const char *ctrl_name,
1620 			   uint64_t virtio_features, uint64_t disabled_features, uint64_t protocol_features)
1621 {
1622 	struct stat file_stat;
1623 	uint64_t features = 0;
1624 	uint64_t flags = 0;
1625 
1626 	/* Register vhost driver to handle vhost messages. */
1627 	if (stat(path, &file_stat) != -1) {
1628 		if (!S_ISSOCK(file_stat.st_mode)) {
1629 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1630 				    "The file already exists and is not a socket.\n",
1631 				    path);
1632 			return -EIO;
1633 		} else if (unlink(path) != 0) {
1634 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1635 				    "The socket already exists and failed to unlink.\n",
1636 				    path);
1637 			return -EIO;
1638 		}
1639 	}
1640 
1641 	flags = spdk_iommu_is_enabled() ? 0 : RTE_VHOST_USER_ASYNC_COPY;
1642 	if (rte_vhost_driver_register(path, flags) != 0) {
1643 		SPDK_ERRLOG("Could not register controller %s with vhost library\n", ctrl_name);
1644 		SPDK_ERRLOG("Check if domain socket %s already exists\n", path);
1645 		return -EIO;
1646 	}
1647 	if (rte_vhost_driver_set_features(path, virtio_features) ||
1648 	    rte_vhost_driver_disable_features(path, disabled_features)) {
1649 		SPDK_ERRLOG("Couldn't set vhost features for controller %s\n", ctrl_name);
1650 
1651 		rte_vhost_driver_unregister(path);
1652 		return -EIO;
1653 	}
1654 
1655 	if (rte_vhost_driver_callback_register(path, &g_spdk_vhost_ops) != 0) {
1656 		rte_vhost_driver_unregister(path);
1657 		SPDK_ERRLOG("Couldn't register callbacks for controller %s\n", ctrl_name);
1658 		return -EIO;
1659 	}
1660 
1661 	rte_vhost_driver_get_protocol_features(path, &features);
1662 	features |= protocol_features;
1663 	rte_vhost_driver_set_protocol_features(path, features);
1664 
1665 	if (rte_vhost_driver_start(path) != 0) {
1666 		SPDK_ERRLOG("Failed to start vhost driver for controller %s (%d): %s\n",
1667 			    ctrl_name, errno, spdk_strerror(errno));
1668 		rte_vhost_driver_unregister(path);
1669 		return -EIO;
1670 	}
1671 
1672 	return 0;
1673 }
1674 
1675 int
1676 vhost_get_mem_table(int vid, struct rte_vhost_memory **mem)
1677 {
1678 	return rte_vhost_get_mem_table(vid, mem);
1679 }
1680 
1681 int
1682 vhost_driver_unregister(const char *path)
1683 {
1684 	return rte_vhost_driver_unregister(path);
1685 }
1686 
1687 int
1688 vhost_get_negotiated_features(int vid, uint64_t *negotiated_features)
1689 {
1690 	return rte_vhost_get_negotiated_features(vid, negotiated_features);
1691 }
1692 
1693 int
1694 vhost_user_dev_set_coalescing(struct spdk_vhost_user_dev *user_dev, uint32_t delay_base_us,
1695 			      uint32_t iops_threshold)
1696 {
1697 	uint64_t delay_time_base = delay_base_us * spdk_get_ticks_hz() / 1000000ULL;
1698 	uint32_t io_rate = iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1699 
1700 	if (delay_time_base >= UINT32_MAX) {
1701 		SPDK_ERRLOG("Delay time of %"PRIu32" is too big\n", delay_base_us);
1702 		return -EINVAL;
1703 	} else if (io_rate == 0) {
1704 		SPDK_ERRLOG("IOPS rate of %"PRIu32" is too low. Min is %u\n", io_rate,
1705 			    1000U / SPDK_VHOST_STATS_CHECK_INTERVAL_MS);
1706 		return -EINVAL;
1707 	}
1708 
1709 	user_dev->coalescing_delay_us = delay_base_us;
1710 	user_dev->coalescing_iops_threshold = iops_threshold;
1711 	return 0;
1712 }
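/*
 * Unit conversion sketch (illustrative numbers): delay_base_us maps to TSC ticks as
 * delay_base_us * ticks_hz / 10^6, and iops_threshold maps to a per-interval request
 * count as iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000. For example,
 * an iops_threshold of 100000 with a 10 ms check interval corresponds to an io_rate
 * of 1000 requests per interval. Here the converted values are only range-checked;
 * vhost_user_session_set_coalescing() below applies the same conversion per session.
 */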
1713 
1714 int
1715 vhost_user_session_set_coalescing(struct spdk_vhost_dev *vdev,
1716 				  struct spdk_vhost_session *vsession, void *ctx)
1717 {
1718 	vsession->coalescing_delay_time_base =
1719 		to_user_dev(vdev)->coalescing_delay_us * spdk_get_ticks_hz() / 1000000ULL;
1720 	vsession->coalescing_io_rate_threshold =
1721 		to_user_dev(vdev)->coalescing_iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1722 	return 0;
1723 }
1724 
1725 int
1726 vhost_user_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
1727 			  uint32_t iops_threshold)
1728 {
1729 	int rc;
1730 
1731 	rc = vhost_user_dev_set_coalescing(to_user_dev(vdev), delay_base_us, iops_threshold);
1732 	if (rc != 0) {
1733 		return rc;
1734 	}
1735 
1736 	vhost_user_dev_foreach_session(vdev, vhost_user_session_set_coalescing, NULL, NULL);
1737 
1738 	return 0;
1739 }
1740 
1741 void
1742 vhost_user_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
1743 			  uint32_t *iops_threshold)
1744 {
1745 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1746 
1747 	if (delay_base_us) {
1748 		*delay_base_us = user_dev->coalescing_delay_us;
1749 	}
1750 
1751 	if (iops_threshold) {
1752 		*iops_threshold = user_dev->coalescing_iops_threshold;
1753 	}
1754 }
1755 
1756 int
1757 spdk_vhost_set_socket_path(const char *basename)
1758 {
1759 	int ret;
1760 
1761 	if (basename && strlen(basename) > 0) {
1762 		ret = snprintf(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 2, "%s", basename);
1763 		if (ret <= 0) {
1764 			return -EINVAL;
1765 		}
1766 		if ((size_t)ret >= sizeof(g_vhost_user_dev_dirname) - 2) {
1767 			SPDK_ERRLOG("Char dev dir path length %d is too long\n", ret);
1768 			return -EINVAL;
1769 		}
1770 
1771 		if (g_vhost_user_dev_dirname[ret - 1] != '/') {
1772 			g_vhost_user_dev_dirname[ret] = '/';
1773 			g_vhost_user_dev_dirname[ret + 1]  = '\0';
1774 		}
1775 	}
1776 
1777 	return 0;
1778 }
1779 
1780 static void
1781 vhost_dev_thread_exit(void *arg1)
1782 {
1783 	spdk_thread_exit(spdk_get_thread());
1784 }
1785 
1786 static bool g_vhost_user_started = false;
1787 
1788 int
1789 vhost_user_dev_register(struct spdk_vhost_dev *vdev, const char *name, struct spdk_cpuset *cpumask,
1790 			const struct spdk_vhost_user_dev_backend *user_backend)
1791 {
1792 	char path[PATH_MAX];
1793 	struct spdk_vhost_user_dev *user_dev;
1794 
1795 	if (snprintf(path, sizeof(path), "%s%s", g_vhost_user_dev_dirname, name) >= (int)sizeof(path)) {
1796 		SPDK_ERRLOG("Resulting socket path for controller %s is too long: %s%s\n",
1797 			    name, g_vhost_user_dev_dirname, name);
1798 		return -EINVAL;
1799 	}
1800 
1801 	vdev->path = strdup(path);
1802 	if (vdev->path == NULL) {
1803 		return -EIO;
1804 	}
1805 
1806 	user_dev = calloc(1, sizeof(*user_dev));
1807 	if (user_dev == NULL) {
1808 		free(vdev->path);
1809 		return -ENOMEM;
1810 	}
1811 	vdev->ctxt = user_dev;
1812 
1813 	vdev->thread = spdk_thread_create(vdev->name, cpumask);
1814 	if (vdev->thread == NULL) {
1815 		free(user_dev);
1816 		free(vdev->path);
1817 		SPDK_ERRLOG("Failed to create thread for vhost controller %s.\n", name);
1818 		return -EIO;
1819 	}
1820 
1821 	user_dev->user_backend = user_backend;
1822 	user_dev->vdev = vdev;
1823 	user_dev->registered = true;
1824 	TAILQ_INIT(&user_dev->vsessions);
1825 	pthread_mutex_init(&user_dev->lock, NULL);
1826 
1827 	vhost_user_dev_set_coalescing(user_dev, SPDK_VHOST_COALESCING_DELAY_BASE_US,
1828 				      SPDK_VHOST_VQ_IOPS_COALESCING_THRESHOLD);
1829 
1830 	if (vhost_register_unix_socket(path, name, vdev->virtio_features, vdev->disabled_features,
1831 				       vdev->protocol_features)) {
1832 		spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1833 		pthread_mutex_destroy(&user_dev->lock);
1834 		free(user_dev);
1835 		free(vdev->path);
1836 		return -EIO;
1837 	}
1838 
1839 	return 0;
1840 }
1841 
1842 int
1843 vhost_user_dev_unregister(struct spdk_vhost_dev *vdev)
1844 {
1845 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1846 	struct spdk_vhost_session *vsession, *tmp_vsession;
1847 
1848 	pthread_mutex_lock(&user_dev->lock);
1849 	if (user_dev->pending_async_op_num) {
1850 		pthread_mutex_unlock(&user_dev->lock);
1851 		return -EBUSY;
1852 	}
1853 
1854 	/* This is the case where the RPC `vhost_delete_controller` is called while a VM is connected */
1855 	if (!TAILQ_EMPTY(&user_dev->vsessions) && g_vhost_user_started) {
1856 		SPDK_ERRLOG("Controller %s still has a valid connection.\n", vdev->name);
1857 		pthread_mutex_unlock(&user_dev->lock);
1858 		return -EBUSY;
1859 	}
1860 
1861 	/* This is the case where the subsystem quits while a VM is still connected;
1862 	 * the VM should be stopped by the shutdown thread.
1863 	 */
1864 	if (!g_vhost_user_started) {
1865 		TAILQ_FOREACH_SAFE(vsession, &user_dev->vsessions, tailq, tmp_vsession) {
1866 			assert(vsession->started == false);
1867 			TAILQ_REMOVE(&user_dev->vsessions, vsession, tailq);
1868 			if (vsession->mem) {
1869 				vhost_session_mem_unregister(vsession->mem);
1870 				free(vsession->mem);
1871 			}
1872 			free(vsession->name);
1873 			free(vsession);
1874 		}
1875 	}
1876 
1877 	user_dev->registered = false;
1878 	pthread_mutex_unlock(&user_dev->lock);
1879 
1880 	/* There are no valid connections now, and it's not an error if the domain
1881 	 * socket was already removed by shutdown thread.
1882 	 */
1883 	vhost_driver_unregister(vdev->path);
1884 
1885 	spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1886 	pthread_mutex_destroy(&user_dev->lock);
1887 
1888 	free(user_dev);
1889 	free(vdev->path);
1890 
1891 	return 0;
1892 }
1893 
1894 int
1895 vhost_user_init(void)
1896 {
1897 	size_t len;
1898 
1899 	if (g_vhost_user_started) {
1900 		return 0;
1901 	}
1902 
1903 	if (g_vhost_user_dev_dirname[0] == '\0') {
1904 		if (getcwd(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 1) == NULL) {
1905 			SPDK_ERRLOG("getcwd failed (%d): %s\n", errno, spdk_strerror(errno));
1906 			return -1;
1907 		}
1908 
1909 		len = strlen(g_vhost_user_dev_dirname);
1910 		if (g_vhost_user_dev_dirname[len - 1] != '/') {
1911 			g_vhost_user_dev_dirname[len] = '/';
1912 			g_vhost_user_dev_dirname[len + 1] = '\0';
1913 		}
1914 	}
1915 
1916 	g_vhost_user_started = true;
1917 
1918 	g_vhost_user_init_thread = spdk_get_thread();
1919 	assert(g_vhost_user_init_thread != NULL);
1920 
1921 	return 0;
1922 }
1923 
1924 static void
1925 vhost_user_session_shutdown_on_init(void *vhost_cb)
1926 {
1927 	spdk_vhost_fini_cb fn = vhost_cb;
1928 
1929 	fn();
1930 }
1931 
1932 static void *
1933 vhost_user_session_shutdown(void *vhost_cb)
1934 {
1935 	struct spdk_vhost_dev *vdev = NULL;
1936 	struct spdk_vhost_session *vsession;
1937 	struct spdk_vhost_user_dev *user_dev;
1938 	int ret;
1939 
1940 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1941 	     vdev = spdk_vhost_dev_next(vdev)) {
1942 		user_dev = to_user_dev(vdev);
1943 		ret = 0;
1944 		pthread_mutex_lock(&user_dev->lock);
1945 		TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1946 			if (vsession->started || vsession->starting) {
1947 				ret += _stop_session(vsession);
1948 			}
1949 		}
1950 		pthread_mutex_unlock(&user_dev->lock);
1951 		if (ret == 0) {
1952 			vhost_driver_unregister(vdev->path);
1953 		}
1954 	}
1955 
1956 	SPDK_INFOLOG(vhost, "Exiting\n");
1957 	spdk_thread_send_msg(g_vhost_user_init_thread, vhost_user_session_shutdown_on_init, vhost_cb);
1958 	return NULL;
1959 }
1960 
1961 void
1962 vhost_user_fini(spdk_vhost_fini_cb vhost_cb)
1963 {
1964 	pthread_t tid;
1965 	int rc;
1966 
1967 	if (!g_vhost_user_started) {
1968 		vhost_cb();
1969 		return;
1970 	}
1971 
1972 	g_vhost_user_started = false;
1973 
1974 	/* rte_vhost API for removing sockets is not asynchronous. Since it may call SPDK
1975 	 * ops for stopping a device or removing a connection, we need to call it from
1976 	 * a separate thread to avoid deadlock.
1977 	 */
1978 	rc = pthread_create(&tid, NULL, &vhost_user_session_shutdown, vhost_cb);
1979 	if (rc < 0) {
1980 		SPDK_ERRLOG("Failed to start session shutdown thread (%d): %s\n", rc, spdk_strerror(rc));
1981 		abort();
1982 	}
1983 	pthread_detach(tid);
1984 }
1985 
1986 void
1987 vhost_session_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1988 {
1989 	struct spdk_vhost_session *vsession;
1990 	struct spdk_vhost_user_dev *user_dev;
1991 
1992 	user_dev = to_user_dev(vdev);
1993 	pthread_mutex_lock(&user_dev->lock);
1994 	TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1995 		spdk_json_write_object_begin(w);
1996 		spdk_json_write_named_uint32(w, "vid", vsession->vid);
1997 		spdk_json_write_named_uint32(w, "id", vsession->id);
1998 		spdk_json_write_named_string(w, "name", vsession->name);
1999 		spdk_json_write_named_bool(w, "started", vsession->started);
2000 		spdk_json_write_named_uint32(w, "max_queues", vsession->max_queues);
2001 		spdk_json_write_named_uint32(w, "inflight_task_cnt", vsession->task_cnt);
2002 		spdk_json_write_object_end(w);
2003 	}
2004 	pthread_mutex_unlock(&user_dev->lock);
2005 }
2006