xref: /spdk/lib/vhost/rte_vhost_user.c (revision 60982c759db49b4f4579f16e3b24df0725ba4b94)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2019 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2021 Mellanox Technologies LTD. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/env.h"
10 #include "spdk/likely.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/memory.h"
14 #include "spdk/barrier.h"
15 #include "spdk/vhost.h"
16 #include "vhost_internal.h"
17 #include <rte_version.h>
18 
19 #include "spdk_internal/vhost_user.h"
20 
21 /* Path to the directory where the vhost sockets will be created. Can be set by the user. */
22 static char g_vhost_user_dev_dirname[PATH_MAX] = "";
23 
24 static struct spdk_thread *g_vhost_user_init_thread;
25 
26 /**
27  * DPDK calls our callbacks synchronously but the work those callbacks
28  * perform needs to be async. Luckily, all DPDK callbacks are called on
29  * a DPDK-internal pthread, so we'll just wait on a semaphore in there.
30  */
31 static sem_t g_dpdk_sem;
32 
33 /** Return code for the current DPDK callback */
34 static int g_dpdk_response;
35 
36 struct vhost_session_fn_ctx {
37 	/** Device pointer obtained before enqueueing the event */
38 	struct spdk_vhost_dev *vdev;
39 
40 	/** ID of the session to send event to. */
41 	uint32_t vsession_id;
42 
43 	/** User provided function to be executed on session's thread. */
44 	spdk_vhost_session_fn cb_fn;
45 
46 	/**
47 	 * User provided function to be called on the init thread
48 	 * after iterating through all sessions.
49 	 */
50 	spdk_vhost_dev_fn cpl_fn;
51 
52 	/** Custom user context */
53 	void *user_ctx;
54 };
55 
56 static int vhost_user_wait_for_session_stop(struct spdk_vhost_session *vsession,
57 		unsigned timeout_sec, const char *errmsg);
58 
59 static void
60 __attribute__((constructor))
61 _vhost_user_sem_init(void)
62 {
63 	if (sem_init(&g_dpdk_sem, 0, 0) != 0) {
64 		SPDK_ERRLOG("Failed to initialize semaphore for rte_vhost pthread.\n");
65 		abort();
66 	}
67 }
68 
69 static void
70 __attribute__((destructor))
71 _vhost_user_sem_destroy(void)
72 {
73 	sem_destroy(&g_dpdk_sem);
74 }
75 
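/*
 * Translate a guest physical address range to a host virtual address using
 * the session's rte_vhost memory map. Returns NULL if the range is not
 * contiguously mapped (i.e. the mappable length is shorter than requested).
 */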
76 void *
77 vhost_gpa_to_vva(struct spdk_vhost_session *vsession, uint64_t addr, uint64_t len)
78 {
79 	void *vva;
80 	uint64_t newlen;
81 
82 	newlen = len;
83 	vva = (void *)rte_vhost_va_from_guest_pa(vsession->mem, addr, &newlen);
84 	if (newlen != len) {
85 		return NULL;
86 	}
87 
88 	return vva;
89 
90 }
91 
92 static void
93 vhost_log_req_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
94 		   uint16_t req_id)
95 {
96 	struct vring_desc *desc, *desc_table;
97 	uint32_t desc_table_size;
98 	int rc;
99 
100 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
101 		return;
102 	}
103 
104 	rc = vhost_vq_get_desc(vsession, virtqueue, req_id, &desc, &desc_table, &desc_table_size);
105 	if (spdk_unlikely(rc != 0)) {
106 		SPDK_ERRLOG("Can't log used ring descriptors!\n");
107 		return;
108 	}
109 
110 	do {
111 		if (vhost_vring_desc_is_wr(desc)) {
112 			/* Strictly speaking, only the pages actually touched should be logged,
113 			 * but doing so would require tracking those changes in each backend.
114 			 * The backend will most likely touch all or most of those pages anyway,
115 			 * so for now assume we touched every page passed to us as a writeable buffer. */
116 			rte_vhost_log_write(vsession->vid, desc->addr, desc->len);
117 		}
118 		vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
119 	} while (desc);
120 }
121 
122 static void
123 vhost_log_used_vring_elem(struct spdk_vhost_session *vsession,
124 			  struct spdk_vhost_virtqueue *virtqueue,
125 			  uint16_t idx)
126 {
127 	uint64_t offset, len;
128 
129 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
130 		return;
131 	}
132 
133 	if (spdk_unlikely(virtqueue->packed.packed_ring)) {
134 		offset = idx * sizeof(struct vring_packed_desc);
135 		len = sizeof(struct vring_packed_desc);
136 	} else {
137 		offset = offsetof(struct vring_used, ring[idx]);
138 		len = sizeof(virtqueue->vring.used->ring[idx]);
139 	}
140 
141 	rte_vhost_log_used_vring(vsession->vid, virtqueue->vring_idx, offset, len);
142 }
143 
144 static void
145 vhost_log_used_vring_idx(struct spdk_vhost_session *vsession,
146 			 struct spdk_vhost_virtqueue *virtqueue)
147 {
148 	uint64_t offset, len;
149 	uint16_t vq_idx;
150 
151 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
152 		return;
153 	}
154 
155 	offset = offsetof(struct vring_used, idx);
156 	len = sizeof(virtqueue->vring.used->idx);
157 	vq_idx = virtqueue - vsession->virtqueue;
158 
159 	rte_vhost_log_used_vring(vsession->vid, vq_idx, offset, len);
160 }
161 
162 /*
163  * Get available requests from avail ring.
164  */
165 uint16_t
166 vhost_vq_avail_ring_get(struct spdk_vhost_virtqueue *virtqueue, uint16_t *reqs,
167 			uint16_t reqs_len)
168 {
169 	struct rte_vhost_vring *vring = &virtqueue->vring;
170 	struct vring_avail *avail = vring->avail;
171 	uint16_t size_mask = vring->size - 1;
172 	uint16_t last_idx = virtqueue->last_avail_idx, avail_idx = avail->idx;
173 	uint16_t count, i;
174 	int rc;
175 	uint64_t u64_value;
176 
177 	spdk_smp_rmb();
178 
179 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
180 		/* Read to clear vring's kickfd */
181 		rc = read(vring->kickfd, &u64_value, sizeof(u64_value));
182 		if (rc < 0) {
183 			SPDK_ERRLOG("failed to acknowledge kickfd: %s.\n", spdk_strerror(errno));
184 			return -errno;
185 		}
186 	}
187 
188 	count = avail_idx - last_idx;
189 	if (spdk_likely(count == 0)) {
190 		return 0;
191 	}
192 
193 	if (spdk_unlikely(count > vring->size)) {
194 		/* TODO: the queue is unrecoverably broken and should be marked so.
195 		 * For now we will fail silently and report there are no new avail entries.
196 		 */
197 		return 0;
198 	}
199 
200 	count = spdk_min(count, reqs_len);
201 
202 	virtqueue->last_avail_idx += count;
203 	/* Check whether there are unprocessed reqs in vq, then kick vq manually */
204 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
205 		/* If avail_idx is larger than virtqueue's last_avail_idx, then there are unprocessed reqs.
206 		 * avail_idx should be re-read from memory here, in case of a race condition with the guest.
207 		 */
208 		avail_idx = * (volatile uint16_t *) &avail->idx;
209 		if (avail_idx > virtqueue->last_avail_idx) {
210 			/* Write to notify vring's kickfd */
211 			rc = write(vring->kickfd, &u64_value, sizeof(u64_value));
212 			if (rc < 0) {
213 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
214 				return -errno;
215 			}
216 		}
217 	}
218 
219 	for (i = 0; i < count; i++) {
220 		reqs[i] = vring->avail->ring[(last_idx + i) & size_mask];
221 	}
222 
223 	SPDK_DEBUGLOG(vhost_ring,
224 		      "AVAIL: last_idx=%"PRIu16" avail_idx=%"PRIu16" count=%"PRIu16"\n",
225 		      last_idx, avail_idx, count);
226 
227 	return count;
228 }
229 
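/* Helpers checking the INDIRECT flag on split, packed and inflight descriptors. */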
230 static bool
231 vhost_vring_desc_is_indirect(struct vring_desc *cur_desc)
232 {
233 	return !!(cur_desc->flags & VRING_DESC_F_INDIRECT);
234 }
235 
236 static bool
237 vhost_vring_packed_desc_is_indirect(struct vring_packed_desc *cur_desc)
238 {
239 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
240 }
241 
242 static bool
243 vhost_inflight_packed_desc_is_indirect(spdk_vhost_inflight_desc *cur_desc)
244 {
245 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
246 }
247 
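/*
 * Resolve the split-ring descriptor at req_idx. For an indirect descriptor,
 * the indirect table is translated into host memory and returned as the
 * descriptor table; otherwise the vring's own descriptor array is returned.
 */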
248 int
249 vhost_vq_get_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
250 		  uint16_t req_idx, struct vring_desc **desc, struct vring_desc **desc_table,
251 		  uint32_t *desc_table_size)
252 {
253 	if (spdk_unlikely(req_idx >= virtqueue->vring.size)) {
254 		return -1;
255 	}
256 
257 	*desc = &virtqueue->vring.desc[req_idx];
258 
259 	if (vhost_vring_desc_is_indirect(*desc)) {
260 		*desc_table_size = (*desc)->len / sizeof(**desc);
261 		*desc_table = vhost_gpa_to_vva(vsession, (*desc)->addr,
262 					       sizeof(**desc) * *desc_table_size);
263 		*desc = *desc_table;
264 		if (*desc == NULL) {
265 			return -1;
266 		}
267 
268 		return 0;
269 	}
270 
271 	*desc_table = virtqueue->vring.desc;
272 	*desc_table_size = virtqueue->vring.size;
273 
274 	return 0;
275 }
276 
277 static bool
278 vhost_packed_desc_indirect_to_desc_table(struct spdk_vhost_session *vsession,
279 		uint64_t addr, uint32_t len,
280 		struct vring_packed_desc **desc_table,
281 		uint32_t *desc_table_size)
282 {
283 	*desc_table_size = len / sizeof(struct vring_packed_desc);
284 
285 	*desc_table = vhost_gpa_to_vva(vsession, addr, len);
286 	if (spdk_unlikely(*desc_table == NULL)) {
287 		return false;
288 	}
289 
290 	return true;
291 }
292 
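/*
 * Packed-ring counterpart of vhost_vq_get_desc(). For indirect descriptors
 * the packed indirect table is translated into host memory; for direct
 * descriptors no table is returned and chaining is done via F_NEXT.
 */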
293 int
294 vhost_vq_get_desc_packed(struct spdk_vhost_session *vsession,
295 			 struct spdk_vhost_virtqueue *virtqueue,
296 			 uint16_t req_idx, struct vring_packed_desc **desc,
297 			 struct vring_packed_desc **desc_table, uint32_t *desc_table_size)
298 {
299 	*desc =  &virtqueue->vring.desc_packed[req_idx];
300 
301 	/* In a packed ring, when the desc is non-indirect we get the next desc
302 	 * by checking (desc->flags & VRING_DESC_F_NEXT) != 0. When the desc
303 	 * is indirect we get the next desc by idx and desc_table_size. This
304 	 * differs from the split ring.
305 	 */
306 	if (vhost_vring_packed_desc_is_indirect(*desc)) {
307 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
308 				desc_table, desc_table_size)) {
309 			return -1;
310 		}
311 
312 		*desc = *desc_table;
313 	} else {
314 		*desc_table = NULL;
315 		*desc_table_size  = 0;
316 	}
317 
318 	return 0;
319 }
320 
321 int
322 vhost_inflight_queue_get_desc(struct spdk_vhost_session *vsession,
323 			      spdk_vhost_inflight_desc *desc_array,
324 			      uint16_t req_idx, spdk_vhost_inflight_desc **desc,
325 			      struct vring_packed_desc  **desc_table, uint32_t *desc_table_size)
326 {
327 	*desc = &desc_array[req_idx];
328 
329 	if (vhost_inflight_packed_desc_is_indirect(*desc)) {
330 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
331 				desc_table, desc_table_size)) {
332 			return -1;
333 		}
334 
335 		/* This desc is the inflight desc, not the packed desc.
336 		 * When F_INDIRECT is set, the table entries are packed descs,
337 		 * so set the inflight desc to NULL.
338 		 */
339 		*desc = NULL;
340 	} else {
341 		/* When F_INDIRECT is not set, there is no packed desc table */
342 		*desc_table = NULL;
343 		*desc_table_size = 0;
344 	}
345 
346 	return 0;
347 }
348 
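/*
 * Signal the guest about completed requests on this virtqueue.
 * Returns 1 if an interrupt was sent, 0 if there was nothing to signal
 * or the call failed.
 */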
349 int
350 vhost_vq_used_signal(struct spdk_vhost_session *vsession,
351 		     struct spdk_vhost_virtqueue *virtqueue)
352 {
353 	if (virtqueue->used_req_cnt == 0) {
354 		return 0;
355 	}
356 
357 	SPDK_DEBUGLOG(vhost_ring,
358 		      "Queue %td - USED RING: sending IRQ: last used %"PRIu16"\n",
359 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx);
360 
361 	if (rte_vhost_vring_call(vsession->vid, virtqueue->vring_idx) == 0) {
362 		/* interrupt signalled */
363 		virtqueue->req_cnt += virtqueue->used_req_cnt;
364 		virtqueue->used_req_cnt = 0;
365 		return 1;
366 	} else {
367 		/* interrupt not signalled */
368 		return 0;
369 	}
370 }
371 
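/*
 * Adaptive interrupt coalescing: once the observed request count exceeds the
 * configured IO threshold, the interrupt delay grows proportionally:
 *   irq_delay = delay_time_base * (req_cnt - io_threshold) / io_threshold
 * For example (hypothetical numbers), with io_threshold = 60 and
 * req_cnt = 120, the delay equals delay_time_base.
 */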
372 static void
373 session_vq_io_stats_update(struct spdk_vhost_session *vsession,
374 			   struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
375 {
376 	uint32_t irq_delay_base = vsession->coalescing_delay_time_base;
377 	uint32_t io_threshold = vsession->coalescing_io_rate_threshold;
378 	int32_t irq_delay;
379 	uint32_t req_cnt;
380 
381 	req_cnt = virtqueue->req_cnt + virtqueue->used_req_cnt;
382 	if (req_cnt <= io_threshold) {
383 		return;
384 	}
385 
386 	irq_delay = (irq_delay_base * (req_cnt - io_threshold)) / io_threshold;
387 	virtqueue->irq_delay_time = (uint32_t) spdk_max(0, irq_delay);
388 
389 	virtqueue->req_cnt = 0;
390 	virtqueue->next_event_time = now;
391 }
392 
393 static void
394 check_session_vq_io_stats(struct spdk_vhost_session *vsession,
395 			  struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
396 {
397 	if (now < vsession->next_stats_check_time) {
398 		return;
399 	}
400 
401 	vsession->next_stats_check_time = now + vsession->stats_check_interval;
402 	session_vq_io_stats_update(vsession, virtqueue, now);
403 }
404 
405 static inline bool
406 vhost_vq_event_is_suppressed(struct spdk_vhost_virtqueue *vq)
407 {
408 	if (spdk_unlikely(vq->packed.packed_ring)) {
409 		if (vq->vring.driver_event->flags & VRING_PACKED_EVENT_FLAG_DISABLE) {
410 			return true;
411 		}
412 	} else {
413 		if (vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT) {
414 			return true;
415 		}
416 	}
417 
418 	return false;
419 }
420 
421 void
422 vhost_session_vq_used_signal(struct spdk_vhost_virtqueue *virtqueue)
423 {
424 	struct spdk_vhost_session *vsession = virtqueue->vsession;
425 	uint64_t now;
426 
427 	if (vsession->coalescing_delay_time_base == 0) {
428 		if (virtqueue->vring.desc == NULL) {
429 			return;
430 		}
431 
432 		if (vhost_vq_event_is_suppressed(virtqueue)) {
433 			return;
434 		}
435 
436 		vhost_vq_used_signal(vsession, virtqueue);
437 	} else {
438 		now = spdk_get_ticks();
439 		check_session_vq_io_stats(vsession, virtqueue, now);
440 
441 		/* No need for event right now */
442 		if (now < virtqueue->next_event_time) {
443 			return;
444 		}
445 
446 		if (vhost_vq_event_is_suppressed(virtqueue)) {
447 			return;
448 		}
449 
450 		if (!vhost_vq_used_signal(vsession, virtqueue)) {
451 			return;
452 		}
453 
454 		/* The syscall takes quite a while, so refresh the time */
455 		now = spdk_get_ticks();
456 		virtqueue->next_event_time = now + virtqueue->irq_delay_time;
457 	}
458 }
459 
460 /*
461  * Enqueue id and len to used ring.
462  */
463 void
464 vhost_vq_used_ring_enqueue(struct spdk_vhost_session *vsession,
465 			   struct spdk_vhost_virtqueue *virtqueue,
466 			   uint16_t id, uint32_t len)
467 {
468 	struct rte_vhost_vring *vring = &virtqueue->vring;
469 	struct vring_used *used = vring->used;
470 	uint16_t last_idx = virtqueue->last_used_idx & (vring->size - 1);
471 	uint16_t vq_idx = virtqueue->vring_idx;
472 
473 	SPDK_DEBUGLOG(vhost_ring,
474 		      "Queue %td - USED RING: last_idx=%"PRIu16" req id=%"PRIu16" len=%"PRIu32"\n",
475 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx, id, len);
476 
477 	vhost_log_req_desc(vsession, virtqueue, id);
478 
479 	virtqueue->last_used_idx++;
480 	used->ring[last_idx].id = id;
481 	used->ring[last_idx].len = len;
482 
483 	/* Ensure the used ring is updated before we log it or increment used->idx. */
484 	spdk_smp_wmb();
485 
486 	rte_vhost_set_last_inflight_io_split(vsession->vid, vq_idx, id);
487 
488 	vhost_log_used_vring_elem(vsession, virtqueue, last_idx);
489 	* (volatile uint16_t *) &used->idx = virtqueue->last_used_idx;
490 	vhost_log_used_vring_idx(vsession, virtqueue);
491 
492 	rte_vhost_clr_inflight_desc_split(vsession->vid, vq_idx, virtqueue->last_used_idx, id);
493 
494 	virtqueue->used_req_cnt++;
495 
496 	if (vsession->interrupt_mode) {
497 		if (virtqueue->vring.desc == NULL || vhost_vq_event_is_suppressed(virtqueue)) {
498 			return;
499 		}
500 
501 		vhost_vq_used_signal(vsession, virtqueue);
502 	}
503 }
504 
505 void
506 vhost_vq_packed_ring_enqueue(struct spdk_vhost_session *vsession,
507 			     struct spdk_vhost_virtqueue *virtqueue,
508 			     uint16_t num_descs, uint16_t buffer_id,
509 			     uint32_t length, uint16_t inflight_head)
510 {
511 	struct vring_packed_desc *desc = &virtqueue->vring.desc_packed[virtqueue->last_used_idx];
512 	bool used, avail;
513 
514 	SPDK_DEBUGLOG(vhost_ring,
515 		      "Queue %td - RING: buffer_id=%"PRIu16"\n",
516 		      virtqueue - vsession->virtqueue, buffer_id);
517 
518 	/* When the descriptor is used, its two flags (the avail flag and the
519 	 * used flag) are set equal to each other, and the used flag value
520 	 * equals the used_wrap_counter.
521 	 */
522 	used = !!(desc->flags & VRING_DESC_F_USED);
523 	avail = !!(desc->flags & VRING_DESC_F_AVAIL);
524 	if (spdk_unlikely(used == virtqueue->packed.used_phase && used == avail)) {
525 		SPDK_ERRLOG("descriptor has been used before\n");
526 		return;
527 	}
528 
529 	/* In a used desc, addr is unused and len specifies the length of the
530 	 * buffer that has been written to by the device.
531 	 */
532 	desc->addr = 0;
533 	desc->len = length;
534 
535 	/* This bit specifies whether any data has been written by the device */
536 	if (length != 0) {
537 		desc->flags |= VRING_DESC_F_WRITE;
538 	}
539 
540 	/* Buffer ID is included in the last descriptor in the list.
541 	 * The driver needs to keep track of the size of the list corresponding
542 	 * to each buffer ID.
543 	 */
544 	desc->id = buffer_id;
545 
546 	/* A device MUST NOT make the descriptor used before buffer_id is
547 	 * written to the descriptor.
548 	 */
549 	spdk_smp_wmb();
550 
551 	rte_vhost_set_last_inflight_io_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
552 	/* To mark a desc as used, the device sets the F_USED bit in flags to match
553 	 * the internal Device ring wrap counter. It also sets the F_AVAIL bit to
554 	 * match the same value.
555 	 */
556 	if (virtqueue->packed.used_phase) {
557 		desc->flags |= VRING_DESC_F_AVAIL_USED;
558 	} else {
559 		desc->flags &= ~VRING_DESC_F_AVAIL_USED;
560 	}
561 	rte_vhost_clr_inflight_desc_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
562 
563 	vhost_log_used_vring_elem(vsession, virtqueue, virtqueue->last_used_idx);
564 	virtqueue->last_used_idx += num_descs;
565 	if (virtqueue->last_used_idx >= virtqueue->vring.size) {
566 		virtqueue->last_used_idx -= virtqueue->vring.size;
567 		virtqueue->packed.used_phase = !virtqueue->packed.used_phase;
568 	}
569 
570 	virtqueue->used_req_cnt++;
571 }
572 
573 bool
574 vhost_vq_packed_ring_is_avail(struct spdk_vhost_virtqueue *virtqueue)
575 {
576 	uint16_t flags = virtqueue->vring.desc_packed[virtqueue->last_avail_idx].flags;
577 
578 	/* To mark a desc as available, the driver sets the F_AVAIL bit in flags
579 	 * to match the internal avail wrap counter. It also sets the F_USED bit to
580 	 * match the inverse value but it's not mandatory.
581 	 */
582 	return (!!(flags & VRING_DESC_F_AVAIL) == virtqueue->packed.avail_phase);
583 }
584 
585 bool
586 vhost_vring_packed_desc_is_wr(struct vring_packed_desc *cur_desc)
587 {
588 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
589 }
590 
591 bool
592 vhost_vring_inflight_desc_is_wr(spdk_vhost_inflight_desc *cur_desc)
593 {
594 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
595 }
596 
597 int
598 vhost_vring_packed_desc_get_next(struct vring_packed_desc **desc, uint16_t *req_idx,
599 				 struct spdk_vhost_virtqueue *vq,
600 				 struct vring_packed_desc *desc_table,
601 				 uint32_t desc_table_size)
602 {
603 	if (desc_table != NULL) {
604 		/* When desc_table isn't NULL the chain is indirect, and we get the next
605 		 * desc by req_idx and desc_table_size. Returning a NULL desc means
606 		 * we reached the last desc of this request.
607 		 */
608 		(*req_idx)++;
609 		if (*req_idx < desc_table_size) {
610 			*desc = &desc_table[*req_idx];
611 		} else {
612 			*desc = NULL;
613 		}
614 	} else {
615 		/* When desc_table is NULL the chain is non-indirect, and we get the next
616 		 * desc by req_idx and the F_NEXT flag. Returning a NULL desc means
617 		 * we reached the last desc of this request. When a new desc is returned,
618 		 * req_idx is updated as well.
619 		 */
620 		if (((*desc)->flags & VRING_DESC_F_NEXT) == 0) {
621 			*desc = NULL;
622 			return 0;
623 		}
624 
625 		*req_idx = (*req_idx + 1) % vq->vring.size;
626 		*desc = &vq->vring.desc_packed[*req_idx];
627 	}
628 
629 	return 0;
630 }
631 
632 static int
633 vhost_vring_desc_payload_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
634 				uint16_t *iov_index, uintptr_t payload, uint64_t remaining)
635 {
636 	uintptr_t vva;
637 	uint64_t len;
638 
639 	do {
640 		if (*iov_index >= SPDK_VHOST_IOVS_MAX) {
641 			SPDK_ERRLOG("SPDK_VHOST_IOVS_MAX(%d) reached\n", SPDK_VHOST_IOVS_MAX);
642 			return -1;
643 		}
644 		len = remaining;
645 		vva = (uintptr_t)rte_vhost_va_from_guest_pa(vsession->mem, payload, &len);
646 		if (vva == 0 || len == 0) {
647 			SPDK_ERRLOG("gpa_to_vva(%p) == NULL\n", (void *)payload);
648 			return -1;
649 		}
650 		iov[*iov_index].iov_base = (void *)vva;
651 		iov[*iov_index].iov_len = len;
652 		remaining -= len;
653 		payload += len;
654 		(*iov_index)++;
655 	} while (remaining);
656 
657 	return 0;
658 }
659 
660 int
661 vhost_vring_packed_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
662 			       uint16_t *iov_index, const struct vring_packed_desc *desc)
663 {
664 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
665 					       desc->addr, desc->len);
666 }
667 
668 int
669 vhost_vring_inflight_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
670 				 uint16_t *iov_index, const spdk_vhost_inflight_desc *desc)
671 {
672 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
673 					       desc->addr, desc->len);
674 }
675 
676 /* 1. Traverse the desc chain to get the buffer_id and return buffer_id as task_idx.
677  * 2. Update vq->last_avail_idx to point to the next available desc chain.
678  * 3. Update the avail_wrap_counter if last_avail_idx wraps around.
679  */
680 uint16_t
681 vhost_vring_packed_desc_get_buffer_id(struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
682 				      uint16_t *num_descs)
683 {
684 	struct vring_packed_desc *desc;
685 	uint16_t desc_head = req_idx;
686 
687 	*num_descs = 1;
688 
689 	desc =  &vq->vring.desc_packed[req_idx];
690 	if (!vhost_vring_packed_desc_is_indirect(desc)) {
691 		while ((desc->flags & VRING_DESC_F_NEXT) != 0) {
692 			req_idx = (req_idx + 1) % vq->vring.size;
693 			desc = &vq->vring.desc_packed[req_idx];
694 			(*num_descs)++;
695 		}
696 	}
697 
698 	/* The queue size doesn't have to be a power of 2.
699 	 * The device maintains last_avail_idx, so we can make sure
700 	 * the value stays valid (0 ~ vring.size - 1).
701 	 */
702 	vq->last_avail_idx = (req_idx + 1) % vq->vring.size;
703 	if (vq->last_avail_idx < desc_head) {
704 		vq->packed.avail_phase = !vq->packed.avail_phase;
705 	}
706 
707 	return desc->id;
708 }
709 
710 int
711 vhost_vring_desc_get_next(struct vring_desc **desc,
712 			  struct vring_desc *desc_table, uint32_t desc_table_size)
713 {
714 	struct vring_desc *old_desc = *desc;
715 	uint16_t next_idx;
716 
717 	if ((old_desc->flags & VRING_DESC_F_NEXT) == 0) {
718 		*desc = NULL;
719 		return 0;
720 	}
721 
722 	next_idx = old_desc->next;
723 	if (spdk_unlikely(next_idx >= desc_table_size)) {
724 		*desc = NULL;
725 		return -1;
726 	}
727 
728 	*desc = &desc_table[next_idx];
729 	return 0;
730 }
731 
732 int
733 vhost_vring_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
734 			uint16_t *iov_index, const struct vring_desc *desc)
735 {
736 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
737 					       desc->addr, desc->len);
738 }
739 
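/*
 * Round each guest memory region out to 2MB boundaries for registration with
 * the SPDK memory map. If two consecutive regions share the same 2MB-aligned
 * start, skip the first 2MB page so it is not registered twice.
 */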
740 static inline void
741 vhost_session_mem_region_calc(uint64_t *previous_start, uint64_t *start, uint64_t *end,
742 			      uint64_t *len, struct rte_vhost_mem_region *region)
743 {
744 	*start = FLOOR_2MB(region->mmap_addr);
745 	*end = CEIL_2MB(region->mmap_addr + region->mmap_size);
746 	if (*start == *previous_start) {
747 		*start += (size_t) VALUE_2MB;
748 	}
749 	*previous_start = *start;
750 	*len = *end - *start;
751 }
752 
753 void
754 vhost_session_mem_register(struct rte_vhost_memory *mem)
755 {
756 	uint64_t start, end, len;
757 	uint32_t i;
758 	uint64_t previous_start = UINT64_MAX;
759 
760 
761 	for (i = 0; i < mem->nregions; i++) {
762 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
763 		SPDK_INFOLOG(vhost, "Registering VM memory for vtophys translation - 0x%jx len:0x%jx\n",
764 			     start, len);
765 
766 		if (spdk_mem_register((void *)start, len) != 0) {
767 			SPDK_WARNLOG("Failed to register memory region %"PRIu32". Future vtophys translation might fail.\n",
768 				     i);
769 			continue;
770 		}
771 	}
772 }
773 
774 void
775 vhost_session_mem_unregister(struct rte_vhost_memory *mem)
776 {
777 	uint64_t start, end, len;
778 	uint32_t i;
779 	uint64_t previous_start = UINT64_MAX;
780 
781 	for (i = 0; i < mem->nregions; i++) {
782 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
783 		if (spdk_vtophys((void *) start, NULL) == SPDK_VTOPHYS_ERROR) {
784 			continue; /* region has not been registered */
785 		}
786 
787 		if (spdk_mem_unregister((void *)start, len) != 0) {
788 			assert(false);
789 		}
790 	}
791 }
792 
793 static bool
794 vhost_memory_changed(struct rte_vhost_memory *new,
795 		     struct rte_vhost_memory *old)
796 {
797 	uint32_t i;
798 
799 	if (new->nregions != old->nregions) {
800 		return true;
801 	}
802 
803 	for (i = 0; i < new->nregions; ++i) {
804 		struct rte_vhost_mem_region *new_r = &new->regions[i];
805 		struct rte_vhost_mem_region *old_r = &old->regions[i];
806 
807 		if (new_r->guest_phys_addr != old_r->guest_phys_addr) {
808 			return true;
809 		}
810 		if (new_r->size != old_r->size) {
811 			return true;
812 		}
813 		if (new_r->guest_user_addr != old_r->guest_user_addr) {
814 			return true;
815 		}
816 		if (new_r->mmap_addr != old_r->mmap_addr) {
817 			return true;
818 		}
819 		if (new_r->fd != old_r->fd) {
820 			return true;
821 		}
822 	}
823 
824 	return false;
825 }
826 
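/*
 * Fetch the guest memory table from rte_vhost and (re)register it with the
 * SPDK memory map, but only if it differs from the table already held by
 * the session.
 */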
827 static int
828 vhost_register_memtable_if_required(struct spdk_vhost_session *vsession, int vid)
829 {
830 	struct rte_vhost_memory *new_mem;
831 
832 	if (vhost_get_mem_table(vid, &new_mem) != 0) {
833 		SPDK_ERRLOG("vhost device %d: Failed to get guest memory table\n", vid);
834 		return -1;
835 	}
836 
837 	if (vsession->mem == NULL) {
838 		SPDK_INFOLOG(vhost, "Start to set memtable\n");
839 		vsession->mem = new_mem;
840 		vhost_session_mem_register(vsession->mem);
841 		return 0;
842 	}
843 
844 	if (vhost_memory_changed(new_mem, vsession->mem)) {
845 		SPDK_INFOLOG(vhost, "Memtable is changed\n");
846 		vhost_session_mem_unregister(vsession->mem);
847 		free(vsession->mem);
848 
849 		vsession->mem = new_mem;
850 		vhost_session_mem_register(vsession->mem);
851 		return 0;
852 
853 	}
854 
855 	SPDK_INFOLOG(vhost, "Memtable is unchanged\n");
856 	free(new_mem);
857 	return 0;
858 }
859 
860 static int
861 _stop_session(struct spdk_vhost_session *vsession)
862 {
863 	struct spdk_vhost_virtqueue *q;
864 	int rc;
865 	uint16_t i;
866 
867 	rc = vhost_user_wait_for_session_stop(vsession, 3, "stop session");
868 	if (rc != 0) {
869 		SPDK_ERRLOG("Couldn't stop device with vid %d.\n", vsession->vid);
870 		return rc;
871 	}
872 
873 	for (i = 0; i < vsession->max_queues; i++) {
874 		q = &vsession->virtqueue[i];
875 
876 		/* vring.desc and vring.desc_packed are in a union struct
877 		 * so q->vring.desc can replace q->vring.desc_packed.
878 		 */
879 		if (q->vring.desc == NULL) {
880 			continue;
881 		}
882 
883 		/* Packed virtqueues support up to 2^15 entries each,
884 		 * so the leftover top bit can be used as the wrap counter.
885 		 */
886 		if (q->packed.packed_ring) {
887 			q->last_avail_idx = q->last_avail_idx |
888 					    ((uint16_t)q->packed.avail_phase << 15);
889 			q->last_used_idx = q->last_used_idx |
890 					   ((uint16_t)q->packed.used_phase << 15);
891 		}
892 
893 		rte_vhost_set_vring_base(vsession->vid, i, q->last_avail_idx, q->last_used_idx);
894 		q->vring.desc = NULL;
895 	}
896 	vsession->max_queues = 0;
897 
898 	return 0;
899 }
900 
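/*
 * rte_vhost callback invoked on the DPDK vhost-user thread when a new socket
 * connection is accepted. Looks up the controller by socket name and
 * allocates a new spdk_vhost_session for it.
 */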
901 static int
902 new_connection(int vid)
903 {
904 	struct spdk_vhost_dev *vdev;
905 	struct spdk_vhost_user_dev *user_dev;
906 	struct spdk_vhost_session *vsession;
907 	size_t dev_dirname_len;
908 	char ifname[PATH_MAX];
909 	char *ctrlr_name;
910 
911 	if (rte_vhost_get_ifname(vid, ifname, PATH_MAX) < 0) {
912 		SPDK_ERRLOG("Couldn't get a valid ifname for device with vid %d\n", vid);
913 		return -1;
914 	}
915 
916 	ctrlr_name = &ifname[0];
917 	dev_dirname_len = strlen(g_vhost_user_dev_dirname);
918 	if (strncmp(ctrlr_name, g_vhost_user_dev_dirname, dev_dirname_len) == 0) {
919 		ctrlr_name += dev_dirname_len;
920 	}
921 
922 	spdk_vhost_lock();
923 	vdev = spdk_vhost_dev_find(ctrlr_name);
924 	if (vdev == NULL) {
925 		SPDK_ERRLOG("Couldn't find device with vid %d to create connection for.\n", vid);
926 		spdk_vhost_unlock();
927 		return -1;
928 	}
929 	spdk_vhost_unlock();
930 
931 	user_dev = to_user_dev(vdev);
932 	pthread_mutex_lock(&user_dev->lock);
933 	if (user_dev->registered == false) {
934 		SPDK_ERRLOG("Device %s is unregistered\n", ctrlr_name);
935 		pthread_mutex_unlock(&user_dev->lock);
936 		return -1;
937 	}
938 
939 	/* We expect sessions inside user_dev->vsessions to be sorted in ascending
940 	 * order of vsession->id. For now we always set id = vsessions_num++
941 	 * and append each session to the very end of the vsessions list.
942 	 * This is required for vhost_user_dev_foreach_session() to work.
943 	 */
944 	if (user_dev->vsessions_num == UINT_MAX) {
945 		pthread_mutex_unlock(&user_dev->lock);
946 		assert(false);
947 		return -EINVAL;
948 	}
949 
950 	if (posix_memalign((void **)&vsession, SPDK_CACHE_LINE_SIZE, sizeof(*vsession) +
951 			   user_dev->user_backend->session_ctx_size)) {
952 		SPDK_ERRLOG("vsession alloc failed\n");
953 		pthread_mutex_unlock(&user_dev->lock);
954 		return -1;
955 	}
956 	memset(vsession, 0, sizeof(*vsession) + user_dev->user_backend->session_ctx_size);
957 
958 	vsession->vdev = vdev;
959 	vsession->vid = vid;
960 	vsession->id = user_dev->vsessions_num++;
961 	vsession->name = spdk_sprintf_alloc("%ss%u", vdev->name, vsession->vid);
962 	if (vsession->name == NULL) {
963 		SPDK_ERRLOG("vsession alloc failed\n");
964 		free(vsession);
965 		pthread_mutex_unlock(&user_dev->lock);
966 		return -1;
967 	}
968 	vsession->started = false;
969 	vsession->next_stats_check_time = 0;
970 	vsession->stats_check_interval = SPDK_VHOST_STATS_CHECK_INTERVAL_MS *
971 					 spdk_get_ticks_hz() / 1000UL;
972 	TAILQ_INSERT_TAIL(&user_dev->vsessions, vsession, tailq);
973 	vhost_session_install_rte_compat_hooks(vsession);
974 	pthread_mutex_unlock(&user_dev->lock);
975 
976 	return 0;
977 }
978 
979 static void
980 vhost_user_session_start(void *arg1)
981 {
982 	struct spdk_vhost_session *vsession = arg1;
983 	struct spdk_vhost_dev *vdev = vsession->vdev;
984 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
985 	const struct spdk_vhost_user_dev_backend *backend;
986 	int rc;
987 
988 	pthread_mutex_lock(&user_dev->lock);
989 	backend = user_dev->user_backend;
990 	rc = backend->start_session(vdev, vsession, NULL);
991 	if (rc == 0) {
992 		vsession->started = true;
993 	}
994 	pthread_mutex_unlock(&user_dev->lock);
995 }
996 
997 static int
998 set_device_vq_callfd(struct spdk_vhost_session *vsession, uint16_t qid)
999 {
1000 	struct spdk_vhost_virtqueue *q;
1001 
1002 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1003 		return -EINVAL;
1004 	}
1005 
1006 	q = &vsession->virtqueue[qid];
1007 	/* vq isn't enabled yet */
1008 	if (q->vring_idx != qid) {
1009 		return 0;
1010 	}
1011 
1012 	/* vring.desc and vring.desc_packed are in a union struct
1013 	 * so q->vring.desc can replace q->vring.desc_packed.
1014 	 */
1015 	if (q->vring.desc == NULL || q->vring.size == 0) {
1016 		return 0;
1017 	}
1018 
1019 	/*
1020 	 * Not sure right now, but this looks like some kind of QEMU bug: guest IO
1021 	 * might be frozen unless all queues are kicked after live-migration. It looks as if
1022 	 * the previous vhost instance failed to effectively deliver all interrupts before
1023 	 * the GET_VRING_BASE message. This shouldn't harm the guest since spurious interrupts
1024 	 * should be ignored by the guest virtio driver.
1025 	 *
1026 	 * Tested on QEMU 2.10.91 and 2.11.50.
1027 	 *
1028 	 * Make sure a successful call of
1029 	 * `rte_vhost_vring_call` will happen
1030 	 * after starting the device.
1031 	 */
1032 	q->used_req_cnt += 1;
1033 
1034 	return 0;
1035 }
1036 
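/*
 * Set up a virtqueue after its SET_VRING_KICK message: fetch the vring and
 * inflight info from rte_vhost, restore the avail/used indexes, allocate
 * per-queue task buffers and configure the notification policy for polling
 * or interrupt mode.
 */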
1037 static int
1038 enable_device_vq(struct spdk_vhost_session *vsession, uint16_t qid)
1039 {
1040 	struct spdk_vhost_virtqueue *q;
1041 	bool packed_ring;
1042 	const struct spdk_vhost_user_dev_backend *backend;
1043 	int rc;
1044 
1045 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1046 		return -EINVAL;
1047 	}
1048 
1049 	q = &vsession->virtqueue[qid];
1050 	memset(q, 0, sizeof(*q));
1051 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
1052 
1053 	q->vsession = vsession;
1054 	q->vring_idx = -1;
1055 	if (rte_vhost_get_vhost_vring(vsession->vid, qid, &q->vring)) {
1056 		return 0;
1057 	}
1058 	q->vring_idx = qid;
1059 	rte_vhost_get_vhost_ring_inflight(vsession->vid, qid, &q->vring_inflight);
1060 
1061 	/* vring.desc and vring.desc_packed are in a union struct
1062 	 * so q->vring.desc can replace q->vring.desc_packed.
1063 	 */
1064 	if (q->vring.desc == NULL || q->vring.size == 0) {
1065 		return 0;
1066 	}
1067 
1068 	if (rte_vhost_get_vring_base(vsession->vid, qid, &q->last_avail_idx, &q->last_used_idx)) {
1069 		q->vring.desc = NULL;
1070 		return 0;
1071 	}
1072 
1073 	backend = to_user_dev(vsession->vdev)->user_backend;
1074 	rc = backend->alloc_vq_tasks(vsession, qid);
1075 	if (rc) {
1076 		return rc;
1077 	}
1078 
1079 	if (packed_ring) {
1080 		/* Use the inflight mem to restore the last_avail_idx and last_used_idx.
1081 		 * When the vring format is packed, there is no used_idx in the
1082 		 * used ring, so the VM can't resend the used_idx to vhost on reconnect.
1083 		 * QEMU 5.2.0 supports packed inflight; earlier versions only support
1084 		 * split ring inflight because they don't send the negotiated features
1085 		 * before getting the inflight fd. Users can use RPC to enable this function.
1086 		 */
1087 		if (spdk_unlikely(vsession->vdev->packed_ring_recovery)) {
1088 			rte_vhost_get_vring_base_from_inflight(vsession->vid, qid,
1089 							       &q->last_avail_idx,
1090 							       &q->last_used_idx);
1091 		}
1092 
1093 		/* Packed virtqueues support up to 2^15 entries each,
1094 		 * so the leftover top bit can be used as the wrap counter.
1095 		 */
1096 		q->packed.avail_phase = q->last_avail_idx >> 15;
1097 		q->last_avail_idx = q->last_avail_idx & 0x7FFF;
1098 		q->packed.used_phase = q->last_used_idx >> 15;
1099 		q->last_used_idx = q->last_used_idx & 0x7FFF;
1100 
1101 		if (!spdk_interrupt_mode_is_enabled()) {
1102 			/* Disable I/O submission notifications, we'll be polling. */
1103 			q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1104 		} else {
1105 			/* Enable I/O submission notifications, we'll be interrupting. */
1106 			q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_ENABLE;
1107 		}
1108 	} else {
1109 		if (!spdk_interrupt_mode_is_enabled()) {
1110 			/* Disable I/O submission notifications, we'll be polling. */
1111 			q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1112 		} else {
1113 			/* Enable I/O submission notifications, we'll be interrupting. */
1114 			q->vring.used->flags = 0;
1115 		}
1116 	}
1117 
1118 	if (spdk_interrupt_mode_is_enabled() && backend->register_vq_interrupt) {
1119 		backend->register_vq_interrupt(vsession, q);
1120 	}
1121 
1122 	q->packed.packed_ring = packed_ring;
1123 	vsession->max_queues = spdk_max(vsession->max_queues, qid + 1);
1124 
1125 	return 0;
1126 }
1127 
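/*
 * rte_vhost new_device callback. Verifies the memory table has been set and
 * defers the actual session start to the controller's SPDK thread via
 * vhost_user_session_start().
 */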
1128 static int
1129 start_device(int vid)
1130 {
1131 	struct spdk_vhost_dev *vdev;
1132 	struct spdk_vhost_session *vsession;
1133 	struct spdk_vhost_user_dev *user_dev;
1134 	int rc = 0;
1135 
1136 	vsession = vhost_session_find_by_vid(vid);
1137 	if (vsession == NULL) {
1138 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1139 		return -1;
1140 	}
1141 	vdev = vsession->vdev;
1142 	user_dev = to_user_dev(vdev);
1143 
1144 	pthread_mutex_lock(&user_dev->lock);
1145 	if (vsession->started) {
1146 		/* already started, nothing to do */
1147 		goto out;
1148 	}
1149 
1150 	if (!vsession->mem) {
1151 		rc = -1;
1152 		SPDK_ERRLOG("Session %s doesn't set memory table yet\n", vsession->name);
1153 		goto out;
1154 	}
1155 
1156 	vhost_user_session_set_coalescing(vdev, vsession, NULL);
1157 	spdk_thread_send_msg(vdev->thread, vhost_user_session_start, vsession);
1158 
1159 out:
1160 	pthread_mutex_unlock(&user_dev->lock);
1161 	return rc;
1162 }
1163 
1164 static void
1165 stop_device(int vid)
1166 {
1167 	struct spdk_vhost_session *vsession;
1168 	struct spdk_vhost_user_dev *user_dev;
1169 
1170 	vsession = vhost_session_find_by_vid(vid);
1171 	if (vsession == NULL) {
1172 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1173 		return;
1174 	}
1175 	user_dev = to_user_dev(vsession->vdev);
1176 
1177 	pthread_mutex_lock(&user_dev->lock);
1178 	if (!vsession->started) {
1179 		pthread_mutex_unlock(&user_dev->lock);
1180 		/* already stopped, nothing to do */
1181 		return;
1182 	}
1183 
1184 	_stop_session(vsession);
1185 	pthread_mutex_unlock(&user_dev->lock);
1186 }
1187 
1188 static void
1189 destroy_connection(int vid)
1190 {
1191 	struct spdk_vhost_session *vsession;
1192 	struct spdk_vhost_user_dev *user_dev;
1193 
1194 	vsession = vhost_session_find_by_vid(vid);
1195 	if (vsession == NULL) {
1196 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1197 		return;
1198 	}
1199 	user_dev = to_user_dev(vsession->vdev);
1200 
1201 	pthread_mutex_lock(&user_dev->lock);
1202 	if (vsession->started) {
1203 		if (_stop_session(vsession) != 0) {
1204 			pthread_mutex_unlock(&user_dev->lock);
1205 			return;
1206 		}
1207 	}
1208 
1209 	if (vsession->mem) {
1210 		vhost_session_mem_unregister(vsession->mem);
1211 		free(vsession->mem);
1212 	}
1213 
1214 	TAILQ_REMOVE(&to_user_dev(vsession->vdev)->vsessions, vsession, tailq);
1215 	free(vsession->name);
1216 	free(vsession);
1217 	pthread_mutex_unlock(&user_dev->lock);
1218 }
1219 
1220 #if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
1221 static const struct rte_vhost_device_ops g_spdk_vhost_ops = {
1222 #else
1223 static const struct vhost_device_ops g_spdk_vhost_ops = {
1224 #endif
1225 	.new_device =  start_device,
1226 	.destroy_device = stop_device,
1227 	.new_connection = new_connection,
1228 	.destroy_connection = destroy_connection,
1229 };
1230 
1231 static struct spdk_vhost_session *
1232 vhost_session_find_by_id(struct spdk_vhost_dev *vdev, unsigned id)
1233 {
1234 	struct spdk_vhost_session *vsession;
1235 
1236 	TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1237 		if (vsession->id == id) {
1238 			return vsession;
1239 		}
1240 	}
1241 
1242 	return NULL;
1243 }
1244 
1245 struct spdk_vhost_session *
1246 vhost_session_find_by_vid(int vid)
1247 {
1248 	struct spdk_vhost_dev *vdev;
1249 	struct spdk_vhost_session *vsession;
1250 	struct spdk_vhost_user_dev *user_dev;
1251 
1252 	spdk_vhost_lock();
1253 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1254 	     vdev = spdk_vhost_dev_next(vdev)) {
1255 		user_dev = to_user_dev(vdev);
1256 
1257 		pthread_mutex_lock(&user_dev->lock);
1258 		TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1259 			if (vsession->vid == vid) {
1260 				pthread_mutex_unlock(&user_dev->lock);
1261 				spdk_vhost_unlock();
1262 				return vsession;
1263 			}
1264 		}
1265 		pthread_mutex_unlock(&user_dev->lock);
1266 	}
1267 	spdk_vhost_unlock();
1268 
1269 	return NULL;
1270 }
1271 
1272 static void
1273 wait_for_semaphore(int timeout_sec, const char *errmsg)
1274 {
1275 	struct timespec timeout;
1276 	int rc;
1277 
1278 	clock_gettime(CLOCK_REALTIME, &timeout);
1279 	timeout.tv_sec += timeout_sec;
1280 	rc = sem_timedwait(&g_dpdk_sem, &timeout);
1281 	if (rc != 0) {
1282 		SPDK_ERRLOG("Timeout waiting for event: %s.\n", errmsg);
1283 		sem_wait(&g_dpdk_sem);
1284 	}
1285 }
1286 
1287 void
1288 vhost_user_session_stop_done(struct spdk_vhost_session *vsession, int response)
1289 {
1290 	if (response == 0) {
1291 		vsession->started = false;
1292 	}
1293 
1294 	g_dpdk_response = response;
1295 	sem_post(&g_dpdk_sem);
1296 }
1297 
1298 static void
1299 vhost_user_session_stop_event(void *arg1)
1300 {
1301 	struct vhost_session_fn_ctx *ctx = arg1;
1302 	struct spdk_vhost_dev *vdev = ctx->vdev;
1303 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1304 	struct spdk_vhost_session *vsession;
1305 
1306 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1307 		spdk_thread_send_msg(spdk_get_thread(), vhost_user_session_stop_event, arg1);
1308 		return;
1309 	}
1310 
1311 	vsession = vhost_session_find_by_id(vdev, ctx->vsession_id);
1312 	user_dev->user_backend->stop_session(vdev, vsession, NULL);
1313 	pthread_mutex_unlock(&user_dev->lock);
1314 }
1315 
1316 static int
1317 vhost_user_wait_for_session_stop(struct spdk_vhost_session *vsession,
1318 				 unsigned timeout_sec, const char *errmsg)
1319 {
1320 	struct vhost_session_fn_ctx ev_ctx = {0};
1321 	struct spdk_vhost_dev *vdev = vsession->vdev;
1322 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1323 
1324 	ev_ctx.vdev = vdev;
1325 	ev_ctx.vsession_id = vsession->id;
1326 
1327 	spdk_thread_send_msg(vdev->thread, vhost_user_session_stop_event, &ev_ctx);
1328 
1329 	pthread_mutex_unlock(&user_dev->lock);
1330 	wait_for_semaphore(timeout_sec, errmsg);
1331 	pthread_mutex_lock(&user_dev->lock);
1332 
1333 	return g_dpdk_response;
1334 }
1335 
1336 static void
1337 foreach_session_finish_cb(void *arg1)
1338 {
1339 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1340 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1341 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1342 
1343 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1344 		spdk_thread_send_msg(spdk_get_thread(),
1345 				     foreach_session_finish_cb, arg1);
1346 		return;
1347 	}
1348 
1349 	assert(user_dev->pending_async_op_num > 0);
1350 	user_dev->pending_async_op_num--;
1351 	if (ev_ctx->cpl_fn != NULL) {
1352 		ev_ctx->cpl_fn(vdev, ev_ctx->user_ctx);
1353 	}
1354 
1355 	pthread_mutex_unlock(&user_dev->lock);
1356 	free(ev_ctx);
1357 }
1358 
1359 static void
1360 foreach_session(void *arg1)
1361 {
1362 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1363 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1364 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1365 	struct spdk_vhost_session *vsession;
1366 	int rc;
1367 
1368 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1369 		spdk_thread_send_msg(spdk_get_thread(), foreach_session, arg1);
1370 		return;
1371 	}
1372 
1373 	TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1374 		rc = ev_ctx->cb_fn(vdev, vsession, ev_ctx->user_ctx);
1375 		if (rc < 0) {
1376 			goto out;
1377 		}
1378 	}
1379 
1380 out:
1381 	pthread_mutex_unlock(&user_dev->lock);
1382 	spdk_thread_send_msg(g_vhost_user_init_thread, foreach_session_finish_cb, arg1);
1383 }
1384 
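/*
 * Iterate over all sessions of a device on the device's SPDK thread, calling
 * fn(vdev, vsession, arg) for each one and cpl_fn(vdev, arg) on the init
 * thread once the iteration is finished. A negative return value from fn
 * stops the iteration early.
 */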
1385 void
1386 vhost_user_dev_foreach_session(struct spdk_vhost_dev *vdev,
1387 			       spdk_vhost_session_fn fn,
1388 			       spdk_vhost_dev_fn cpl_fn,
1389 			       void *arg)
1390 {
1391 	struct vhost_session_fn_ctx *ev_ctx;
1392 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1393 
1394 	ev_ctx = calloc(1, sizeof(*ev_ctx));
1395 	if (ev_ctx == NULL) {
1396 		SPDK_ERRLOG("Failed to alloc vhost event.\n");
1397 		assert(false);
1398 		return;
1399 	}
1400 
1401 	ev_ctx->vdev = vdev;
1402 	ev_ctx->cb_fn = fn;
1403 	ev_ctx->cpl_fn = cpl_fn;
1404 	ev_ctx->user_ctx = arg;
1405 
1406 	pthread_mutex_lock(&user_dev->lock);
1407 	assert(user_dev->pending_async_op_num < UINT32_MAX);
1408 	user_dev->pending_async_op_num++;
1409 	pthread_mutex_unlock(&user_dev->lock);
1410 
1411 	spdk_thread_send_msg(vdev->thread, foreach_session, ev_ctx);
1412 }
1413 
1414 void
1415 vhost_user_session_set_interrupt_mode(struct spdk_vhost_session *vsession, bool interrupt_mode)
1416 {
1417 	uint16_t i;
1418 	int rc = 0;
1419 
1420 	for (i = 0; i < vsession->max_queues; i++) {
1421 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1422 		uint64_t num_events = 1;
1423 
1424 		/* vring.desc and vring.desc_packed are in a union struct
1425 		 * so q->vring.desc can replace q->vring.desc_packed.
1426 		 */
1427 		if (q->vring.desc == NULL || q->vring.size == 0) {
1428 			continue;
1429 		}
1430 
1431 		if (interrupt_mode) {
1432 
1433 			/* In case of a race condition, always kick the vring when switching to interrupt mode */
1434 			rc = write(q->vring.kickfd, &num_events, sizeof(num_events));
1435 			if (rc < 0) {
1436 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
1437 			}
1438 
1439 			vsession->interrupt_mode = true;
1440 		} else {
1441 
1442 			vsession->interrupt_mode = false;
1443 		}
1444 	}
1445 }
1446 
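/*
 * Intercept selected vhost-user messages before rte_vhost processes them:
 * stop the device on GET_VRING_BASE and service GET_CONFIG/SET_CONFIG through
 * the backend's config callbacks.
 */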
1447 static int
1448 extern_vhost_pre_msg_handler(int vid, void *_msg)
1449 {
1450 	struct vhost_user_msg *msg = _msg;
1451 	struct spdk_vhost_session *vsession;
1452 	struct spdk_vhost_user_dev *user_dev;
1453 
1454 	vsession = vhost_session_find_by_vid(vid);
1455 	if (vsession == NULL) {
1456 		SPDK_ERRLOG("Received a message to uninitialized session (vid %d).\n", vid);
1457 		assert(false);
1458 		return RTE_VHOST_MSG_RESULT_ERR;
1459 	}
1460 	user_dev = to_user_dev(vsession->vdev);
1461 
1462 	switch (msg->request) {
1463 	case VHOST_USER_GET_VRING_BASE:
1464 		pthread_mutex_lock(&user_dev->lock);
1465 		if (vsession->started) {
1466 			pthread_mutex_unlock(&user_dev->lock);
1467 			/* `stop_device` runs synchronously; it
1468 			 * will take this lock again before exiting.
1469 			 */
1470 			g_spdk_vhost_ops.destroy_device(vid);
1471 		}
1472 		pthread_mutex_unlock(&user_dev->lock);
1473 		break;
1474 	case VHOST_USER_GET_CONFIG: {
1475 		int rc = 0;
1476 
1477 		pthread_mutex_lock(&user_dev->lock);
1478 		if (vsession->vdev->backend->vhost_get_config) {
1479 			rc = vsession->vdev->backend->vhost_get_config(vsession->vdev,
1480 					msg->payload.cfg.region, msg->payload.cfg.size);
1481 			if (rc != 0) {
1482 				msg->size = 0;
1483 			}
1484 		}
1485 		pthread_mutex_unlock(&user_dev->lock);
1486 
1487 		return RTE_VHOST_MSG_RESULT_REPLY;
1488 	}
1489 	case VHOST_USER_SET_CONFIG: {
1490 		int rc = 0;
1491 
1492 		pthread_mutex_lock(&user_dev->lock);
1493 		if (vsession->vdev->backend->vhost_set_config) {
1494 			rc = vsession->vdev->backend->vhost_set_config(vsession->vdev,
1495 					msg->payload.cfg.region, msg->payload.cfg.offset,
1496 					msg->payload.cfg.size, msg->payload.cfg.flags);
1497 		}
1498 		pthread_mutex_unlock(&user_dev->lock);
1499 
1500 		return rc == 0 ? RTE_VHOST_MSG_RESULT_OK : RTE_VHOST_MSG_RESULT_ERR;
1501 	}
1502 	default:
1503 		break;
1504 	}
1505 
1506 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1507 }
1508 
1509 static int
1510 extern_vhost_post_msg_handler(int vid, void *_msg)
1511 {
1512 	struct vhost_user_msg *msg = _msg;
1513 	struct spdk_vhost_session *vsession;
1514 	struct spdk_vhost_user_dev *user_dev;
1515 	uint16_t qid;
1516 	int rc;
1517 
1518 	vsession = vhost_session_find_by_vid(vid);
1519 	if (vsession == NULL) {
1520 		SPDK_ERRLOG("Received a message to uninitialized session (vid %d).\n", vid);
1521 		assert(false);
1522 		return RTE_VHOST_MSG_RESULT_ERR;
1523 	}
1524 	user_dev = to_user_dev(vsession->vdev);
1525 
1526 	if (msg->request == VHOST_USER_SET_MEM_TABLE) {
1527 		vhost_register_memtable_if_required(vsession, vid);
1528 	}
1529 
1530 	switch (msg->request) {
1531 	case VHOST_USER_SET_FEATURES:
1532 		rc = vhost_get_negotiated_features(vid, &vsession->negotiated_features);
1533 		if (rc) {
1534 			SPDK_ERRLOG("vhost device %d: Failed to get negotiated driver features\n", vid);
1535 			return RTE_VHOST_MSG_RESULT_ERR;
1536 		}
1537 		break;
1538 	case VHOST_USER_SET_VRING_CALL:
1539 		qid = (uint16_t)msg->payload.u64;
1540 		rc = set_device_vq_callfd(vsession, qid);
1541 		if (rc) {
1542 			return RTE_VHOST_MSG_RESULT_ERR;
1543 		}
1544 		break;
1545 	case VHOST_USER_SET_VRING_KICK:
1546 		qid = (uint16_t)msg->payload.u64;
1547 		rc = enable_device_vq(vsession, qid);
1548 		if (rc) {
1549 			return RTE_VHOST_MSG_RESULT_ERR;
1550 		}
1551 
1552 		/* vhost-user spec tells us to start polling a queue after receiving
1553 		 * its SET_VRING_KICK message. Let's do it!
1554 		 */
1555 		pthread_mutex_lock(&user_dev->lock);
1556 		if (!vsession->started) {
1557 			pthread_mutex_unlock(&user_dev->lock);
1558 			g_spdk_vhost_ops.new_device(vid);
1559 			return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1560 		}
1561 		pthread_mutex_unlock(&user_dev->lock);
1562 		break;
1563 	default:
1564 		break;
1565 	}
1566 
1567 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1568 }
1569 
1570 struct rte_vhost_user_extern_ops g_spdk_extern_vhost_ops = {
1571 	.pre_msg_handle = extern_vhost_pre_msg_handler,
1572 	.post_msg_handle = extern_vhost_post_msg_handler,
1573 };
1574 
1575 void
1576 vhost_session_install_rte_compat_hooks(struct spdk_vhost_session *vsession)
1577 {
1578 	int rc;
1579 
1580 	rc = rte_vhost_extern_callback_register(vsession->vid, &g_spdk_extern_vhost_ops, NULL);
1581 	if (rc != 0) {
1582 		SPDK_ERRLOG("rte_vhost_extern_callback_register() failed for vid = %d\n",
1583 			    vsession->vid);
1584 		return;
1585 	}
1586 }
1587 
1588 int
1589 vhost_register_unix_socket(const char *path, const char *ctrl_name,
1590 			   uint64_t virtio_features, uint64_t disabled_features, uint64_t protocol_features)
1591 {
1592 	struct stat file_stat;
1593 	uint64_t features = 0;
1594 
1595 	/* Register vhost driver to handle vhost messages. */
1596 	if (stat(path, &file_stat) != -1) {
1597 		if (!S_ISSOCK(file_stat.st_mode)) {
1598 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1599 				    "The file already exists and is not a socket.\n",
1600 				    path);
1601 			return -EIO;
1602 		} else if (unlink(path) != 0) {
1603 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1604 				    "The socket already exists and could not be unlinked.\n",
1605 				    path);
1606 			return -EIO;
1607 		}
1608 	}
1609 
1610 #if RTE_VERSION < RTE_VERSION_NUM(20, 8, 0, 0)
1611 	if (rte_vhost_driver_register(path, 0) != 0) {
1612 #else
1613 	if (rte_vhost_driver_register(path, RTE_VHOST_USER_ASYNC_COPY) != 0) {
1614 #endif
1615 		SPDK_ERRLOG("Could not register controller %s with vhost library\n", ctrl_name);
1616 		SPDK_ERRLOG("Check if domain socket %s already exists\n", path);
1617 		return -EIO;
1618 	}
1619 	if (rte_vhost_driver_set_features(path, virtio_features) ||
1620 	    rte_vhost_driver_disable_features(path, disabled_features)) {
1621 		SPDK_ERRLOG("Couldn't set vhost features for controller %s\n", ctrl_name);
1622 
1623 		rte_vhost_driver_unregister(path);
1624 		return -EIO;
1625 	}
1626 
1627 	if (rte_vhost_driver_callback_register(path, &g_spdk_vhost_ops) != 0) {
1628 		rte_vhost_driver_unregister(path);
1629 		SPDK_ERRLOG("Couldn't register callbacks for controller %s\n", ctrl_name);
1630 		return -EIO;
1631 	}
1632 
1633 	rte_vhost_driver_get_protocol_features(path, &features);
1634 	features |= protocol_features;
1635 	rte_vhost_driver_set_protocol_features(path, features);
1636 
1637 	if (rte_vhost_driver_start(path) != 0) {
1638 		SPDK_ERRLOG("Failed to start vhost driver for controller %s (%d): %s\n",
1639 			    ctrl_name, errno, spdk_strerror(errno));
1640 		rte_vhost_driver_unregister(path);
1641 		return -EIO;
1642 	}
1643 
1644 	return 0;
1645 }
1646 
1647 int
1648 vhost_get_mem_table(int vid, struct rte_vhost_memory **mem)
1649 {
1650 	return rte_vhost_get_mem_table(vid, mem);
1651 }
1652 
1653 int
1654 vhost_driver_unregister(const char *path)
1655 {
1656 	return rte_vhost_driver_unregister(path);
1657 }
1658 
1659 int
1660 vhost_get_negotiated_features(int vid, uint64_t *negotiated_features)
1661 {
1662 	return rte_vhost_get_negotiated_features(vid, negotiated_features);
1663 }
1664 
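/*
 * Validate and store the coalescing parameters. The delay is converted from
 * microseconds to ticks (delay_base_us * ticks_hz / 10^6) and the IOPS
 * threshold to a per-stats-interval request budget
 * (iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000).
 */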
1665 int
1666 vhost_user_dev_set_coalescing(struct spdk_vhost_user_dev *user_dev, uint32_t delay_base_us,
1667 			      uint32_t iops_threshold)
1668 {
1669 	uint64_t delay_time_base = delay_base_us * spdk_get_ticks_hz() / 1000000ULL;
1670 	uint32_t io_rate = iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1671 
1672 	if (delay_time_base >= UINT32_MAX) {
1673 		SPDK_ERRLOG("Delay time of %"PRIu32" is too big\n", delay_base_us);
1674 		return -EINVAL;
1675 	} else if (io_rate == 0) {
1676 		SPDK_ERRLOG("IOPS rate of %"PRIu32" is too low. Min is %u\n", io_rate,
1677 			    1000U / SPDK_VHOST_STATS_CHECK_INTERVAL_MS);
1678 		return -EINVAL;
1679 	}
1680 
1681 	user_dev->coalescing_delay_us = delay_base_us;
1682 	user_dev->coalescing_iops_threshold = iops_threshold;
1683 	return 0;
1684 }
1685 
1686 int
1687 vhost_user_session_set_coalescing(struct spdk_vhost_dev *vdev,
1688 				  struct spdk_vhost_session *vsession, void *ctx)
1689 {
1690 	vsession->coalescing_delay_time_base =
1691 		to_user_dev(vdev)->coalescing_delay_us * spdk_get_ticks_hz() / 1000000ULL;
1692 	vsession->coalescing_io_rate_threshold =
1693 		to_user_dev(vdev)->coalescing_iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1694 	return 0;
1695 }
1696 
1697 int
1698 vhost_user_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
1699 			  uint32_t iops_threshold)
1700 {
1701 	int rc;
1702 
1703 	rc = vhost_user_dev_set_coalescing(to_user_dev(vdev), delay_base_us, iops_threshold);
1704 	if (rc != 0) {
1705 		return rc;
1706 	}
1707 
1708 	vhost_user_dev_foreach_session(vdev, vhost_user_session_set_coalescing, NULL, NULL);
1709 
1710 	return 0;
1711 }
1712 
1713 void
1714 vhost_user_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
1715 			  uint32_t *iops_threshold)
1716 {
1717 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1718 
1719 	if (delay_base_us) {
1720 		*delay_base_us = user_dev->coalescing_delay_us;
1721 	}
1722 
1723 	if (iops_threshold) {
1724 		*iops_threshold = user_dev->coalescing_iops_threshold;
1725 	}
1726 }
1727 
1728 int
1729 spdk_vhost_set_socket_path(const char *basename)
1730 {
1731 	int ret;
1732 
1733 	if (basename && strlen(basename) > 0) {
1734 		ret = snprintf(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 2, "%s", basename);
1735 		if (ret <= 0) {
1736 			return -EINVAL;
1737 		}
1738 		if ((size_t)ret >= sizeof(g_vhost_user_dev_dirname) - 2) {
1739 			SPDK_ERRLOG("Char dev dir path length %d is too long\n", ret);
1740 			return -EINVAL;
1741 		}
1742 
1743 		if (g_vhost_user_dev_dirname[ret - 1] != '/') {
1744 			g_vhost_user_dev_dirname[ret] = '/';
1745 			g_vhost_user_dev_dirname[ret + 1]  = '\0';
1746 		}
1747 	}
1748 
1749 	return 0;
1750 }
1751 
1752 static void
1753 vhost_dev_thread_exit(void *arg1)
1754 {
1755 	spdk_thread_exit(spdk_get_thread());
1756 }
1757 
1758 static bool g_vhost_user_started = false;
1759 
1760 int
1761 vhost_user_dev_register(struct spdk_vhost_dev *vdev, const char *name, struct spdk_cpuset *cpumask,
1762 			const struct spdk_vhost_user_dev_backend *user_backend)
1763 {
1764 	char path[PATH_MAX];
1765 	struct spdk_vhost_user_dev *user_dev;
1766 
1767 	if (snprintf(path, sizeof(path), "%s%s", g_vhost_user_dev_dirname, name) >= (int)sizeof(path)) {
1768 		SPDK_ERRLOG("Resulting socket path for controller %s is too long: %s%s\n",
1769 			    name, g_vhost_user_dev_dirname, name);
1770 		return -EINVAL;
1771 	}
1772 
1773 	vdev->path = strdup(path);
1774 	if (vdev->path == NULL) {
1775 		return -EIO;
1776 	}
1777 
1778 	user_dev = calloc(1, sizeof(*user_dev));
1779 	if (user_dev == NULL) {
1780 		free(vdev->path);
1781 		return -ENOMEM;
1782 	}
1783 	vdev->ctxt = user_dev;
1784 
1785 	vdev->thread = spdk_thread_create(vdev->name, cpumask);
1786 	if (vdev->thread == NULL) {
1787 		free(user_dev);
1788 		free(vdev->path);
1789 		SPDK_ERRLOG("Failed to create thread for vhost controller %s.\n", name);
1790 		return -EIO;
1791 	}
1792 
1793 	user_dev->user_backend = user_backend;
1794 	user_dev->vdev = vdev;
1795 	user_dev->registered = true;
1796 	TAILQ_INIT(&user_dev->vsessions);
1797 	pthread_mutex_init(&user_dev->lock, NULL);
1798 
1799 	vhost_user_dev_set_coalescing(user_dev, SPDK_VHOST_COALESCING_DELAY_BASE_US,
1800 				      SPDK_VHOST_VQ_IOPS_COALESCING_THRESHOLD);
1801 
1802 	if (vhost_register_unix_socket(path, name, vdev->virtio_features, vdev->disabled_features,
1803 				       vdev->protocol_features)) {
1804 		spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1805 		pthread_mutex_destroy(&user_dev->lock);
1806 		free(user_dev);
1807 		free(vdev->path);
1808 		return -EIO;
1809 	}
1810 
1811 	return 0;
1812 }
1813 
1814 int
1815 vhost_user_dev_unregister(struct spdk_vhost_dev *vdev)
1816 {
1817 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1818 	struct spdk_vhost_session *vsession, *tmp_vsession;
1819 
1820 	pthread_mutex_lock(&user_dev->lock);
1821 	if (user_dev->pending_async_op_num) {
1822 		pthread_mutex_unlock(&user_dev->lock);
1823 		return -EBUSY;
1824 	}
1825 
1826 	/* This is the case where the RPC call `vhost_delete_controller` is used while a VM is connected */
1827 	if (!TAILQ_EMPTY(&user_dev->vsessions) && g_vhost_user_started) {
1828 		SPDK_ERRLOG("Controller %s still has a valid connection.\n", vdev->name);
1829 		pthread_mutex_unlock(&user_dev->lock);
1830 		return -EBUSY;
1831 	}
1832 
1833 	/* This is the case where the subsystem quits while a VM is still connected; the VM
1834 	 * should be stopped by the shutdown thread.
1835 	 */
1836 	if (!g_vhost_user_started) {
1837 		TAILQ_FOREACH_SAFE(vsession, &user_dev->vsessions, tailq, tmp_vsession) {
1838 			assert(vsession->started == false);
1839 			TAILQ_REMOVE(&user_dev->vsessions, vsession, tailq);
1840 			if (vsession->mem) {
1841 				vhost_session_mem_unregister(vsession->mem);
1842 				free(vsession->mem);
1843 			}
1844 			free(vsession->name);
1845 			free(vsession);
1846 		}
1847 	}
1848 
1849 	user_dev->registered = false;
1850 	pthread_mutex_unlock(&user_dev->lock);
1851 
1852 	/* There are no valid connections now, and it's not an error if the domain
1853 	 * socket was already removed by shutdown thread.
1854 	 */
1855 	vhost_driver_unregister(vdev->path);
1856 
1857 	spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1858 	pthread_mutex_destroy(&user_dev->lock);
1859 
1860 	free(user_dev);
1861 	free(vdev->path);
1862 
1863 	return 0;
1864 }
1865 
1866 int
1867 vhost_user_init(void)
1868 {
1869 	size_t len;
1870 
1871 	if (g_vhost_user_started) {
1872 		return 0;
1873 	}
1874 
1875 	if (g_vhost_user_dev_dirname[0] == '\0') {
1876 		if (getcwd(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 1) == NULL) {
1877 			SPDK_ERRLOG("getcwd failed (%d): %s\n", errno, spdk_strerror(errno));
1878 			return -1;
1879 		}
1880 
1881 		len = strlen(g_vhost_user_dev_dirname);
1882 		if (g_vhost_user_dev_dirname[len - 1] != '/') {
1883 			g_vhost_user_dev_dirname[len] = '/';
1884 			g_vhost_user_dev_dirname[len + 1] = '\0';
1885 		}
1886 	}
1887 
1888 	g_vhost_user_started = true;
1889 
1890 	g_vhost_user_init_thread = spdk_get_thread();
1891 	assert(g_vhost_user_init_thread != NULL);
1892 
1893 	return 0;
1894 }
1895 
1896 static void
1897 vhost_user_session_shutdown_on_init(void *vhost_cb)
1898 {
1899 	spdk_vhost_fini_cb fn = vhost_cb;
1900 
1901 	fn();
1902 }
1903 
1904 static void *
1905 vhost_user_session_shutdown(void *vhost_cb)
1906 {
1907 	struct spdk_vhost_dev *vdev = NULL;
1908 	struct spdk_vhost_session *vsession;
1909 	struct spdk_vhost_user_dev *user_dev;
1910 
1911 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1912 	     vdev = spdk_vhost_dev_next(vdev)) {
1913 		user_dev = to_user_dev(vdev);
1914 		pthread_mutex_lock(&user_dev->lock);
1915 		TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1916 			if (vsession->started) {
1917 				_stop_session(vsession);
1918 			}
1919 		}
1920 		pthread_mutex_unlock(&user_dev->lock);
1921 		vhost_driver_unregister(vdev->path);
1922 	}
1923 
1924 	SPDK_INFOLOG(vhost, "Exiting\n");
1925 	spdk_thread_send_msg(g_vhost_user_init_thread, vhost_user_session_shutdown_on_init, vhost_cb);
1926 	return NULL;
1927 }
1928 
1929 void
1930 vhost_user_fini(spdk_vhost_fini_cb vhost_cb)
1931 {
1932 	pthread_t tid;
1933 	int rc;
1934 
1935 	if (!g_vhost_user_started) {
1936 		vhost_cb();
1937 		return;
1938 	}
1939 
1940 	g_vhost_user_started = false;
1941 
1942 	/* rte_vhost API for removing sockets is not asynchronous. Since it may call SPDK
1943 	 * ops for stopping a device or removing a connection, we need to call it from
1944 	 * a separate thread to avoid deadlock.
1945 	 */
1946 	rc = pthread_create(&tid, NULL, &vhost_user_session_shutdown, vhost_cb);
1947 	if (rc != 0) {
1948 		SPDK_ERRLOG("Failed to start session shutdown thread (%d): %s\n", rc, spdk_strerror(rc));
1949 		abort();
1950 	}
1951 	pthread_detach(tid);
1952 }
1953 
1954 void
1955 vhost_session_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1956 {
1957 	struct spdk_vhost_session *vsession;
1958 	struct spdk_vhost_user_dev *user_dev;
1959 
1960 	user_dev = to_user_dev(vdev);
1961 	pthread_mutex_lock(&user_dev->lock);
1962 	TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1963 		spdk_json_write_object_begin(w);
1964 		spdk_json_write_named_uint32(w, "vid", vsession->vid);
1965 		spdk_json_write_named_uint32(w, "id", vsession->id);
1966 		spdk_json_write_named_string(w, "name", vsession->name);
1967 		spdk_json_write_named_bool(w, "started", vsession->started);
1968 		spdk_json_write_named_uint32(w, "max_queues", vsession->max_queues);
1969 		spdk_json_write_named_uint32(w, "inflight_task_cnt", vsession->task_cnt);
1970 		spdk_json_write_object_end(w);
1971 	}
1972 	pthread_mutex_unlock(&user_dev->lock);
1973 }
1974