1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *   Copyright (c) 2021 Mellanox Technologies LTD. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/env.h"
38 #include "spdk/likely.h"
39 #include "spdk/string.h"
40 #include "spdk/util.h"
41 #include "spdk/memory.h"
42 #include "spdk/barrier.h"
43 #include "spdk/vhost.h"
44 #include "vhost_internal.h"
45 #include <rte_version.h>
46 
47 #include "spdk_internal/vhost_user.h"
48 
49 /* Path to the folder where the vhost-user Unix domain sockets will be created. Can be set by user. */
50 static char g_vhost_user_dev_dirname[PATH_MAX] = "";
51 
52 static struct spdk_thread *g_vhost_user_init_thread;
53 
54 /**
55  * DPDK calls our callbacks synchronously but the work those callbacks
56  * perform needs to be async. Luckily, all DPDK callbacks are called on
57  * a DPDK-internal pthread, so we'll just wait on a semaphore in there.
58  */
59 static sem_t g_dpdk_sem;
60 
61 /** Return code for the current DPDK callback */
62 static int g_dpdk_response;
63 
64 struct vhost_session_fn_ctx {
65 	/** Device pointer obtained before enqueueing the event */
66 	struct spdk_vhost_dev *vdev;
67 
68 	/** ID of the session to send event to. */
69 	uint32_t vsession_id;
70 
71 	/** User provided function to be executed on session's thread. */
72 	spdk_vhost_session_fn cb_fn;
73 
74 	/**
75 	 * User provided function to be called on the init thread
76 	 * after iterating through all sessions.
77 	 */
78 	spdk_vhost_dev_fn cpl_fn;
79 
80 	/** Custom user context */
81 	void *user_ctx;
82 };
83 
84 static struct spdk_vhost_user_dev *
85 to_user_dev(struct spdk_vhost_dev *vdev)
86 {
87 	assert(vdev != NULL);
88 	return vdev->ctxt;
89 }
90 
91 static void __attribute__((constructor))
92 _vhost_user_sem_init(void)
93 {
94 	if (sem_init(&g_dpdk_sem, 0, 0) != 0) {
95 		SPDK_ERRLOG("Failed to initialize semaphore for rte_vhost pthread.\n");
96 		abort();
97 	}
98 }
99 
100 static void __attribute__((destructor))
101 _vhost_user_sem_destroy(void)
102 {
103 	sem_destroy(&g_dpdk_sem);
104 }
105 
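/*
 * Translate a guest physical address range to a host virtual address.
 * Returns NULL if the requested range is not fully contained in a single
 * contiguous mapped region (i.e. rte_vhost shrank the translated length).
 */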
106 void *vhost_gpa_to_vva(struct spdk_vhost_session *vsession, uint64_t addr, uint64_t len)
107 {
108 	void *vva;
109 	uint64_t newlen;
110 
111 	newlen = len;
112 	vva = (void *)rte_vhost_va_from_guest_pa(vsession->mem, addr, &newlen);
113 	if (newlen != len) {
114 		return NULL;
115 	}
116 
117 	return vva;
118 
119 }
120 
121 static void
122 vhost_log_req_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
123 		   uint16_t req_id)
124 {
125 	struct vring_desc *desc, *desc_table;
126 	uint32_t desc_table_size;
127 	int rc;
128 
129 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
130 		return;
131 	}
132 
133 	rc = vhost_vq_get_desc(vsession, virtqueue, req_id, &desc, &desc_table, &desc_table_size);
134 	if (spdk_unlikely(rc != 0)) {
135 		SPDK_ERRLOG("Can't log used ring descriptors!\n");
136 		return;
137 	}
138 
139 	do {
140 		if (vhost_vring_desc_is_wr(desc)) {
141 			/* To be honest, only pages really touched should be logged, but
142 			 * doing so would require tracking those changes in each backend.
143 			 * Also, the backend will most likely touch all/most of those pages, so
144 			 * for now let's assume we touched all pages passed to us as writable buffers. */
145 			rte_vhost_log_write(vsession->vid, desc->addr, desc->len);
146 		}
147 		vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
148 	} while (desc);
149 }
150 
151 static void
152 vhost_log_used_vring_elem(struct spdk_vhost_session *vsession,
153 			  struct spdk_vhost_virtqueue *virtqueue,
154 			  uint16_t idx)
155 {
156 	uint64_t offset, len;
157 
158 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
159 		return;
160 	}
161 
162 	if (spdk_unlikely(virtqueue->packed.packed_ring)) {
163 		offset = idx * sizeof(struct vring_packed_desc);
164 		len = sizeof(struct vring_packed_desc);
165 	} else {
166 		offset = offsetof(struct vring_used, ring[idx]);
167 		len = sizeof(virtqueue->vring.used->ring[idx]);
168 	}
169 
170 	rte_vhost_log_used_vring(vsession->vid, virtqueue->vring_idx, offset, len);
171 }
172 
173 static void
174 vhost_log_used_vring_idx(struct spdk_vhost_session *vsession,
175 			 struct spdk_vhost_virtqueue *virtqueue)
176 {
177 	uint64_t offset, len;
178 	uint16_t vq_idx;
179 
180 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
181 		return;
182 	}
183 
184 	offset = offsetof(struct vring_used, idx);
185 	len = sizeof(virtqueue->vring.used->idx);
186 	vq_idx = virtqueue - vsession->virtqueue;
187 
188 	rte_vhost_log_used_vring(vsession->vid, vq_idx, offset, len);
189 }
190 
191 /*
192  * Get available requests from avail ring.
193  */
194 uint16_t
195 vhost_vq_avail_ring_get(struct spdk_vhost_virtqueue *virtqueue, uint16_t *reqs,
196 			uint16_t reqs_len)
197 {
198 	struct rte_vhost_vring *vring = &virtqueue->vring;
199 	struct vring_avail *avail = vring->avail;
200 	uint16_t size_mask = vring->size - 1;
201 	uint16_t last_idx = virtqueue->last_avail_idx, avail_idx = avail->idx;
202 	uint16_t count, i;
203 	int rc;
204 	uint64_t u64_value;
205 
206 	spdk_smp_rmb();
207 
208 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
209 		/* Read to clear vring's kickfd */
210 		rc = read(vring->kickfd, &u64_value, sizeof(u64_value));
211 		if (rc < 0) {
212 			SPDK_ERRLOG("failed to acknowledge kickfd: %s.\n", spdk_strerror(errno));
213 			return -errno;
214 		}
215 	}
216 
217 	count = avail_idx - last_idx;
218 	if (spdk_likely(count == 0)) {
219 		return 0;
220 	}
221 
222 	if (spdk_unlikely(count > vring->size)) {
223 		/* TODO: the queue is unrecoverably broken and should be marked so.
224 		 * For now we will fail silently and report there are no new avail entries.
225 		 */
226 		return 0;
227 	}
228 
229 	count = spdk_min(count, reqs_len);
230 
231 	virtqueue->last_avail_idx += count;
232 	/* Check whether there are unprocessed reqs in vq, then kick vq manually */
233 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
234 		/* If avail_idx is larger than virtqueue's last_avail_idx, then there are unprocessed reqs.
235 		 * avail_idx should get updated here from memory, in case of a race condition with the guest.
236 		 */
237 		avail_idx = * (volatile uint16_t *) &avail->idx;
238 		if (avail_idx > virtqueue->last_avail_idx) {
239 			/* Write to notify vring's kickfd */
240 			rc = write(vring->kickfd, &u64_value, sizeof(u64_value));
241 			if (rc < 0) {
242 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
243 				return -errno;
244 			}
245 		}
246 	}
247 
248 	for (i = 0; i < count; i++) {
249 		reqs[i] = vring->avail->ring[(last_idx + i) & size_mask];
250 	}
251 
252 	SPDK_DEBUGLOG(vhost_ring,
253 		      "AVAIL: last_idx=%"PRIu16" avail_idx=%"PRIu16" count=%"PRIu16"\n",
254 		      last_idx, avail_idx, count);
255 
256 	return count;
257 }
258 
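/* Helpers checking the INDIRECT flag on split, packed, and inflight descriptors. */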
259 static bool
260 vhost_vring_desc_is_indirect(struct vring_desc *cur_desc)
261 {
262 	return !!(cur_desc->flags & VRING_DESC_F_INDIRECT);
263 }
264 
265 static bool
266 vhost_vring_packed_desc_is_indirect(struct vring_packed_desc *cur_desc)
267 {
268 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
269 }
270 
271 static bool
272 vhost_inflight_packed_desc_is_indirect(spdk_vhost_inflight_desc *cur_desc)
273 {
274 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
275 }
276 
277 int
278 vhost_vq_get_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
279 		  uint16_t req_idx, struct vring_desc **desc, struct vring_desc **desc_table,
280 		  uint32_t *desc_table_size)
281 {
282 	if (spdk_unlikely(req_idx >= virtqueue->vring.size)) {
283 		return -1;
284 	}
285 
286 	*desc = &virtqueue->vring.desc[req_idx];
287 
288 	if (vhost_vring_desc_is_indirect(*desc)) {
289 		*desc_table_size = (*desc)->len / sizeof(**desc);
290 		*desc_table = vhost_gpa_to_vva(vsession, (*desc)->addr,
291 					       sizeof(**desc) * *desc_table_size);
292 		*desc = *desc_table;
293 		if (*desc == NULL) {
294 			return -1;
295 		}
296 
297 		return 0;
298 	}
299 
300 	*desc_table = virtqueue->vring.desc;
301 	*desc_table_size = virtqueue->vring.size;
302 
303 	return 0;
304 }
305 
306 static bool
307 vhost_packed_desc_indirect_to_desc_table(struct spdk_vhost_session *vsession,
308 		uint64_t addr, uint32_t len,
309 		struct vring_packed_desc **desc_table,
310 		uint32_t *desc_table_size)
311 {
312 	*desc_table_size = len / sizeof(struct vring_packed_desc);
313 
314 	*desc_table = vhost_gpa_to_vva(vsession, addr, len);
315 	if (spdk_unlikely(*desc_table == NULL)) {
316 		return false;
317 	}
318 
319 	return true;
320 }
321 
322 int
323 vhost_vq_get_desc_packed(struct spdk_vhost_session *vsession,
324 			 struct spdk_vhost_virtqueue *virtqueue,
325 			 uint16_t req_idx, struct vring_packed_desc **desc,
326 			 struct vring_packed_desc **desc_table, uint32_t *desc_table_size)
327 {
328 	*desc =  &virtqueue->vring.desc_packed[req_idx];
329 
330 	/* In a packed ring, when the desc is non-indirect we get the next desc
331 	 * by checking (desc->flags & VRING_DESC_F_NEXT) != 0. When the desc
332 	 * is indirect we get the next desc by idx and desc_table_size. This
333 	 * differs from the split ring.
334 	 */
335 	if (vhost_vring_packed_desc_is_indirect(*desc)) {
336 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
337 				desc_table, desc_table_size)) {
338 			return -1;
339 		}
340 
341 		*desc = *desc_table;
342 	} else {
343 		*desc_table = NULL;
344 		*desc_table_size  = 0;
345 	}
346 
347 	return 0;
348 }
349 
350 int
351 vhost_inflight_queue_get_desc(struct spdk_vhost_session *vsession,
352 			      spdk_vhost_inflight_desc *desc_array,
353 			      uint16_t req_idx, spdk_vhost_inflight_desc **desc,
354 			      struct vring_packed_desc  **desc_table, uint32_t *desc_table_size)
355 {
356 	*desc = &desc_array[req_idx];
357 
358 	if (vhost_inflight_packed_desc_is_indirect(*desc)) {
359 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
360 				desc_table, desc_table_size)) {
361 			return -1;
362 		}
363 
364 		/* This desc is the inflight desc, not the packed desc.
365 		 * When F_INDIRECT is set, the table entries are packed descs,
366 		 * so set the inflight desc to NULL.
367 		 */
368 		*desc = NULL;
369 	} else {
370 		/* When F_INDIRECT is not set, there is no packed desc table */
371 		*desc_table = NULL;
372 		*desc_table_size = 0;
373 	}
374 
375 	return 0;
376 }
377 
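/*
 * Signal the guest that used descriptors are available on this virtqueue.
 * Returns 1 if an interrupt was sent, 0 if there was nothing to signal or
 * rte_vhost could not deliver the call.
 */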
378 int
379 vhost_vq_used_signal(struct spdk_vhost_session *vsession,
380 		     struct spdk_vhost_virtqueue *virtqueue)
381 {
382 	if (virtqueue->used_req_cnt == 0) {
383 		return 0;
384 	}
385 
386 	virtqueue->req_cnt += virtqueue->used_req_cnt;
387 	virtqueue->used_req_cnt = 0;
388 
389 	SPDK_DEBUGLOG(vhost_ring,
390 		      "Queue %td - USED RING: sending IRQ: last used %"PRIu16"\n",
391 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx);
392 
393 	if (rte_vhost_vring_call(vsession->vid, virtqueue->vring_idx) == 0) {
394 		/* interrupt signalled */
395 		return 1;
396 	} else {
397 		/* interrupt not signalled */
398 		return 0;
399 	}
400 }
401 
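/*
 * Adaptive interrupt coalescing: once the observed request count exceeds the
 * configured threshold, delay the next interrupt proportionally to how far
 * above the threshold this queue currently is.
 */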
402 static void
403 session_vq_io_stats_update(struct spdk_vhost_session *vsession,
404 			   struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
405 {
406 	uint32_t irq_delay_base = vsession->coalescing_delay_time_base;
407 	uint32_t io_threshold = vsession->coalescing_io_rate_threshold;
408 	int32_t irq_delay;
409 	uint32_t req_cnt;
410 
411 	req_cnt = virtqueue->req_cnt + virtqueue->used_req_cnt;
412 	if (req_cnt <= io_threshold) {
413 		return;
414 	}
415 
416 	irq_delay = (irq_delay_base * (req_cnt - io_threshold)) / io_threshold;
417 	virtqueue->irq_delay_time = (uint32_t) spdk_max(0, irq_delay);
418 
419 	virtqueue->req_cnt = 0;
420 	virtqueue->next_event_time = now;
421 }
422 
423 static void
424 check_session_vq_io_stats(struct spdk_vhost_session *vsession,
425 			  struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
426 {
427 	if (now < vsession->next_stats_check_time) {
428 		return;
429 	}
430 
431 	vsession->next_stats_check_time = now + vsession->stats_check_interval;
432 	session_vq_io_stats_update(vsession, virtqueue, now);
433 }
434 
435 static inline bool
436 vhost_vq_event_is_suppressed(struct spdk_vhost_virtqueue *vq)
437 {
438 	if (spdk_unlikely(vq->packed.packed_ring)) {
439 		if (vq->vring.driver_event->flags & VRING_PACKED_EVENT_FLAG_DISABLE) {
440 			return true;
441 		}
442 	} else {
443 		if (vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT) {
444 			return true;
445 		}
446 	}
447 
448 	return false;
449 }
450 
451 void
452 vhost_session_vq_used_signal(struct spdk_vhost_virtqueue *virtqueue)
453 {
454 	struct spdk_vhost_session *vsession = virtqueue->vsession;
455 	uint64_t now;
456 
457 	if (vsession->coalescing_delay_time_base == 0) {
458 		if (virtqueue->vring.desc == NULL) {
459 			return;
460 		}
461 
462 		if (vhost_vq_event_is_suppressed(virtqueue)) {
463 			return;
464 		}
465 
466 		vhost_vq_used_signal(vsession, virtqueue);
467 	} else {
468 		now = spdk_get_ticks();
469 		check_session_vq_io_stats(vsession, virtqueue, now);
470 
471 		/* No need for event right now */
472 		if (now < virtqueue->next_event_time) {
473 			return;
474 		}
475 
476 		if (vhost_vq_event_is_suppressed(virtqueue)) {
477 			return;
478 		}
479 
480 		if (!vhost_vq_used_signal(vsession, virtqueue)) {
481 			return;
482 		}
483 
484 		/* The syscall is quite slow, so refresh the time */
485 		now = spdk_get_ticks();
486 		virtqueue->next_event_time = now + virtqueue->irq_delay_time;
487 	}
488 }
489 
490 void
491 vhost_session_used_signal(struct spdk_vhost_session *vsession)
492 {
493 	struct spdk_vhost_virtqueue *virtqueue;
494 	uint16_t q_idx;
495 
496 	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
497 		virtqueue = &vsession->virtqueue[q_idx];
498 		vhost_session_vq_used_signal(virtqueue);
499 	}
500 }
501 
502 /*
503  * Enqueue id and len to used ring.
504  */
505 void
506 vhost_vq_used_ring_enqueue(struct spdk_vhost_session *vsession,
507 			   struct spdk_vhost_virtqueue *virtqueue,
508 			   uint16_t id, uint32_t len)
509 {
510 	struct rte_vhost_vring *vring = &virtqueue->vring;
511 	struct vring_used *used = vring->used;
512 	uint16_t last_idx = virtqueue->last_used_idx & (vring->size - 1);
513 	uint16_t vq_idx = virtqueue->vring_idx;
514 
515 	SPDK_DEBUGLOG(vhost_ring,
516 		      "Queue %td - USED RING: last_idx=%"PRIu16" req id=%"PRIu16" len=%"PRIu32"\n",
517 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx, id, len);
518 
519 	vhost_log_req_desc(vsession, virtqueue, id);
520 
521 	virtqueue->last_used_idx++;
522 	used->ring[last_idx].id = id;
523 	used->ring[last_idx].len = len;
524 
525 	/* Ensure the used ring is updated before we log it or increment used->idx. */
526 	spdk_smp_wmb();
527 
528 	rte_vhost_set_last_inflight_io_split(vsession->vid, vq_idx, id);
529 
530 	vhost_log_used_vring_elem(vsession, virtqueue, last_idx);
531 	* (volatile uint16_t *) &used->idx = virtqueue->last_used_idx;
532 	vhost_log_used_vring_idx(vsession, virtqueue);
533 
534 	rte_vhost_clr_inflight_desc_split(vsession->vid, vq_idx, virtqueue->last_used_idx, id);
535 
536 	virtqueue->used_req_cnt++;
537 
538 	if (vsession->interrupt_mode) {
539 		if (virtqueue->vring.desc == NULL || vhost_vq_event_is_suppressed(virtqueue)) {
540 			return;
541 		}
542 
543 		vhost_vq_used_signal(vsession, virtqueue);
544 	}
545 }
546 
547 void
548 vhost_vq_packed_ring_enqueue(struct spdk_vhost_session *vsession,
549 			     struct spdk_vhost_virtqueue *virtqueue,
550 			     uint16_t num_descs, uint16_t buffer_id,
551 			     uint32_t length, uint16_t inflight_head)
552 {
553 	struct vring_packed_desc *desc = &virtqueue->vring.desc_packed[virtqueue->last_used_idx];
554 	bool used, avail;
555 
556 	SPDK_DEBUGLOG(vhost_ring,
557 		      "Queue %td - RING: buffer_id=%"PRIu16"\n",
558 		      virtqueue - vsession->virtqueue, buffer_id);
559 
560 	/* When the descriptor is used, its avail flag and used flag
561 	 * are set equal to each other, and the used flag value
562 	 * matches used_wrap_counter.
563 	 */
564 	used = !!(desc->flags & VRING_DESC_F_USED);
565 	avail = !!(desc->flags & VRING_DESC_F_AVAIL);
566 	if (spdk_unlikely(used == virtqueue->packed.used_phase && used == avail)) {
567 		SPDK_ERRLOG("descriptor has been used before\n");
568 		return;
569 	}
570 
571 	/* In a used desc, addr is unused and len specifies the length of
572 	 * the buffer that has been written to by the device.
573 	 */
574 	desc->addr = 0;
575 	desc->len = length;
576 
577 	/* This bit specifies whether any data has been written by the device */
578 	if (length != 0) {
579 		desc->flags |= VRING_DESC_F_WRITE;
580 	}
581 
582 	/* Buffer ID is included in the last descriptor in the list.
583 	 * The driver needs to keep track of the size of the list corresponding
584 	 * to each buffer ID.
585 	 */
586 	desc->id = buffer_id;
587 
588 	/* A device MUST NOT make the descriptor used before buffer_id is
589 	 * written to the descriptor.
590 	 */
591 	spdk_smp_wmb();
592 
593 	rte_vhost_set_last_inflight_io_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
594 	/* To mark a desc as used, the device sets the F_USED bit in flags to match
595 	 * the internal Device ring wrap counter. It also sets the F_AVAIL bit to
596 	 * match the same value.
597 	 */
598 	if (virtqueue->packed.used_phase) {
599 		desc->flags |= VRING_DESC_F_AVAIL_USED;
600 	} else {
601 		desc->flags &= ~VRING_DESC_F_AVAIL_USED;
602 	}
603 	rte_vhost_clr_inflight_desc_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
604 
605 	vhost_log_used_vring_elem(vsession, virtqueue, virtqueue->last_used_idx);
606 	virtqueue->last_used_idx += num_descs;
607 	if (virtqueue->last_used_idx >= virtqueue->vring.size) {
608 		virtqueue->last_used_idx -= virtqueue->vring.size;
609 		virtqueue->packed.used_phase = !virtqueue->packed.used_phase;
610 	}
611 
612 	virtqueue->used_req_cnt++;
613 }
614 
615 bool
616 vhost_vq_packed_ring_is_avail(struct spdk_vhost_virtqueue *virtqueue)
617 {
618 	uint16_t flags = virtqueue->vring.desc_packed[virtqueue->last_avail_idx].flags;
619 
620 	/* To mark a desc as available, the driver sets the F_AVAIL bit in flags
621 	 * to match the internal avail wrap counter. It also sets the F_USED bit to
622 	 * match the inverse value but it's not mandatory.
623 	 */
624 	return (!!(flags & VRING_DESC_F_AVAIL) == virtqueue->packed.avail_phase);
625 }
626 
627 bool
628 vhost_vring_packed_desc_is_wr(struct vring_packed_desc *cur_desc)
629 {
630 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
631 }
632 
633 bool
634 vhost_vring_inflight_desc_is_wr(spdk_vhost_inflight_desc *cur_desc)
635 {
636 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
637 }
638 
639 int
640 vhost_vring_packed_desc_get_next(struct vring_packed_desc **desc, uint16_t *req_idx,
641 				 struct spdk_vhost_virtqueue *vq,
642 				 struct vring_packed_desc *desc_table,
643 				 uint32_t desc_table_size)
644 {
645 	if (desc_table != NULL) {
646 		/* When desc_table isn't NULL, the chain is indirect and we get the next
647 		 * desc by req_idx and desc_table_size. Returning a NULL desc means
648 		 * we reached the last desc of this request.
649 		 */
650 		(*req_idx)++;
651 		if (*req_idx < desc_table_size) {
652 			*desc = &desc_table[*req_idx];
653 		} else {
654 			*desc = NULL;
655 		}
656 	} else {
657 		/* When desc_table is NULL, the chain is non-indirect and we get the next
658 		 * desc by req_idx and the F_NEXT flag. Returning a NULL desc means
659 		 * we reached the last desc of this request. When returning a new desc
660 		 * we update req_idx too.
661 		 */
662 		if (((*desc)->flags & VRING_DESC_F_NEXT) == 0) {
663 			*desc = NULL;
664 			return 0;
665 		}
666 
667 		*req_idx = (*req_idx + 1) % vq->vring.size;
668 		*desc = &vq->vring.desc_packed[*req_idx];
669 	}
670 
671 	return 0;
672 }
673 
674 static int
675 vhost_vring_desc_payload_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
676 				uint16_t *iov_index, uintptr_t payload, uint64_t remaining)
677 {
678 	uintptr_t vva;
679 	uint64_t len;
680 
681 	do {
682 		if (*iov_index >= SPDK_VHOST_IOVS_MAX) {
683 			SPDK_ERRLOG("SPDK_VHOST_IOVS_MAX(%d) reached\n", SPDK_VHOST_IOVS_MAX);
684 			return -1;
685 		}
686 		len = remaining;
687 		vva = (uintptr_t)rte_vhost_va_from_guest_pa(vsession->mem, payload, &len);
688 		if (vva == 0 || len == 0) {
689 			SPDK_ERRLOG("gpa_to_vva(%p) == NULL\n", (void *)payload);
690 			return -1;
691 		}
692 		iov[*iov_index].iov_base = (void *)vva;
693 		iov[*iov_index].iov_len = len;
694 		remaining -= len;
695 		payload += len;
696 		(*iov_index)++;
697 	} while (remaining);
698 
699 	return 0;
700 }
701 
702 int
703 vhost_vring_packed_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
704 			       uint16_t *iov_index, const struct vring_packed_desc *desc)
705 {
706 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
707 					       desc->addr, desc->len);
708 }
709 
710 int
711 vhost_vring_inflight_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
712 				 uint16_t *iov_index, const spdk_vhost_inflight_desc *desc)
713 {
714 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
715 					       desc->addr, desc->len);
716 }
717 
718 /* 1. Traverse the desc chain to get the buffer_id and return it as task_idx.
719  * 2. Update vq->last_avail_idx to point to the next available desc chain.
720  * 3. Flip the avail_wrap_counter if last_avail_idx wraps around.
721  */
722 uint16_t
723 vhost_vring_packed_desc_get_buffer_id(struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
724 				      uint16_t *num_descs)
725 {
726 	struct vring_packed_desc *desc;
727 	uint16_t desc_head = req_idx;
728 
729 	*num_descs = 1;
730 
731 	desc =  &vq->vring.desc_packed[req_idx];
732 	if (!vhost_vring_packed_desc_is_indirect(desc)) {
733 		while ((desc->flags & VRING_DESC_F_NEXT) != 0) {
734 			req_idx = (req_idx + 1) % vq->vring.size;
735 			desc = &vq->vring.desc_packed[req_idx];
736 			(*num_descs)++;
737 		}
738 	}
739 
740 	/* Queue size doesn't have to be a power of 2.
741 	 * The device maintains last_avail_idx so we can make sure
742 	 * the value stays valid (0 ~ vring.size - 1).
743 	 */
744 	vq->last_avail_idx = (req_idx + 1) % vq->vring.size;
745 	if (vq->last_avail_idx < desc_head) {
746 		vq->packed.avail_phase = !vq->packed.avail_phase;
747 	}
748 
749 	return desc->id;
750 }
751 
752 int
753 vhost_vring_desc_get_next(struct vring_desc **desc,
754 			  struct vring_desc *desc_table, uint32_t desc_table_size)
755 {
756 	struct vring_desc *old_desc = *desc;
757 	uint16_t next_idx;
758 
759 	if ((old_desc->flags & VRING_DESC_F_NEXT) == 0) {
760 		*desc = NULL;
761 		return 0;
762 	}
763 
764 	next_idx = old_desc->next;
765 	if (spdk_unlikely(next_idx >= desc_table_size)) {
766 		*desc = NULL;
767 		return -1;
768 	}
769 
770 	*desc = &desc_table[next_idx];
771 	return 0;
772 }
773 
774 int
775 vhost_vring_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
776 			uint16_t *iov_index, const struct vring_desc *desc)
777 {
778 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
779 					       desc->addr, desc->len);
780 }
781 
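/*
 * Round a guest memory region down/up to 2 MB boundaries for registration in
 * SPDK's memory map. If the rounded start equals the previous region's rounded
 * start, skip that first 2 MB page to avoid registering it twice.
 */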
782 static inline void
783 vhost_session_mem_region_calc(uint64_t *previous_start, uint64_t *start, uint64_t *end,
784 			      uint64_t *len, struct rte_vhost_mem_region *region)
785 {
786 	*start = FLOOR_2MB(region->mmap_addr);
787 	*end = CEIL_2MB(region->mmap_addr + region->mmap_size);
788 	if (*start == *previous_start) {
789 		*start += (size_t) VALUE_2MB;
790 	}
791 	*previous_start = *start;
792 	*len = *end - *start;
793 }
794 
795 void
796 vhost_session_mem_register(struct rte_vhost_memory *mem)
797 {
798 	uint64_t start, end, len;
799 	uint32_t i;
800 	uint64_t previous_start = UINT64_MAX;
801 
802 
803 	for (i = 0; i < mem->nregions; i++) {
804 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
805 		SPDK_INFOLOG(vhost, "Registering VM memory for vtophys translation - 0x%jx len:0x%jx\n",
806 			     start, len);
807 
808 		if (spdk_mem_register((void *)start, len) != 0) {
809 			SPDK_WARNLOG("Failed to register memory region %"PRIu32". Future vtophys translation might fail.\n",
810 				     i);
811 			continue;
812 		}
813 	}
814 }
815 
816 void
817 vhost_session_mem_unregister(struct rte_vhost_memory *mem)
818 {
819 	uint64_t start, end, len;
820 	uint32_t i;
821 	uint64_t previous_start = UINT64_MAX;
822 
823 	for (i = 0; i < mem->nregions; i++) {
824 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
825 		if (spdk_vtophys((void *) start, NULL) == SPDK_VTOPHYS_ERROR) {
826 			continue; /* region has not been registered */
827 		}
828 
829 		if (spdk_mem_unregister((void *)start, len) != 0) {
830 			assert(false);
831 		}
832 	}
833 }
834 
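/*
 * Stop a session's backend, push the final avail/used indices back to rte_vhost
 * (with the wrap counters folded into bit 15 for packed rings), and unregister
 * and free the session's guest memory table.
 */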
835 static int
836 _stop_session(struct spdk_vhost_session *vsession)
837 {
838 	struct spdk_vhost_dev *vdev = vsession->vdev;
839 	struct spdk_vhost_user_dev *user_vdev = to_user_dev(vdev);
840 	struct spdk_vhost_virtqueue *q;
841 	int rc;
842 	uint16_t i;
843 
844 	rc = user_vdev->user_backend->stop_session(vsession);
845 	if (rc != 0) {
846 		SPDK_ERRLOG("Couldn't stop device with vid %d.\n", vsession->vid);
847 		return rc;
848 	}
849 
850 	for (i = 0; i < vsession->max_queues; i++) {
851 		q = &vsession->virtqueue[i];
852 
853 		/* vring.desc and vring.desc_packed are in a union struct
854 		 * so q->vring.desc can replace q->vring.desc_packed.
855 		 */
856 		if (q->vring.desc == NULL) {
857 			continue;
858 		}
859 
860 		/* Packed virtqueues support up to 2^15 entries each,
861 		 * so the leftover top bit can be used as the wrap counter.
862 		 */
863 		if (q->packed.packed_ring) {
864 			q->last_avail_idx = q->last_avail_idx |
865 					    ((uint16_t)q->packed.avail_phase << 15);
866 			q->last_used_idx = q->last_used_idx |
867 					   ((uint16_t)q->packed.used_phase << 15);
868 		}
869 
870 		rte_vhost_set_vring_base(vsession->vid, i, q->last_avail_idx, q->last_used_idx);
871 	}
872 
873 	vhost_session_mem_unregister(vsession->mem);
874 	free(vsession->mem);
875 
876 	return 0;
877 }
878 
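/*
 * rte_vhost new_connection callback: allocate and initialize a session for the
 * controller whose socket name matches the connection's ifname. Runs on a
 * DPDK-internal pthread.
 */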
879 static int
880 new_connection(int vid)
881 {
882 	struct spdk_vhost_dev *vdev;
883 	struct spdk_vhost_user_dev *user_dev;
884 	struct spdk_vhost_session *vsession;
885 	size_t dev_dirname_len;
886 	char ifname[PATH_MAX];
887 	char *ctrlr_name;
888 
889 	if (rte_vhost_get_ifname(vid, ifname, PATH_MAX) < 0) {
890 		SPDK_ERRLOG("Couldn't get a valid ifname for device with vid %d\n", vid);
891 		return -1;
892 	}
893 
894 	spdk_vhost_lock();
895 
896 	ctrlr_name = &ifname[0];
897 	dev_dirname_len = strlen(g_vhost_user_dev_dirname);
898 	if (strncmp(ctrlr_name, g_vhost_user_dev_dirname, dev_dirname_len) == 0) {
899 		ctrlr_name += dev_dirname_len;
900 	}
901 
902 	vdev = spdk_vhost_dev_find(ctrlr_name);
903 	if (vdev == NULL) {
904 		SPDK_ERRLOG("Couldn't find device with vid %d to create connection for.\n", vid);
905 		spdk_vhost_unlock();
906 		return -1;
907 	}
908 	user_dev = to_user_dev(vdev);
909 
910 	/* We expect sessions inside user_dev->vsessions to be sorted in ascending
911 	 * order of vsession->id. For now we always set id = vsessions_num++
912 	 * and append each session to the very end of the vsessions list.
913 	 * This is required for vhost_user_dev_foreach_session() to work.
914 	 */
915 	if (user_dev->vsessions_num == UINT_MAX) {
916 		assert(false);
917 		return -EINVAL;
918 	}
919 
920 	if (posix_memalign((void **)&vsession, SPDK_CACHE_LINE_SIZE, sizeof(*vsession) +
921 			   user_dev->user_backend->session_ctx_size)) {
922 		SPDK_ERRLOG("vsession alloc failed\n");
923 		spdk_vhost_unlock();
924 		return -1;
925 	}
926 	memset(vsession, 0, sizeof(*vsession) + user_dev->user_backend->session_ctx_size);
927 
928 	vsession->vdev = vdev;
929 	vsession->vid = vid;
930 	vsession->id = user_dev->vsessions_num++;
931 	vsession->name = spdk_sprintf_alloc("%ss%u", vdev->name, vsession->vid);
932 	if (vsession->name == NULL) {
933 		SPDK_ERRLOG("vsession alloc failed\n");
934 		spdk_vhost_unlock();
935 		free(vsession);
936 		return -1;
937 	}
938 	vsession->started = false;
939 	vsession->initialized = false;
940 	vsession->next_stats_check_time = 0;
941 	vsession->stats_check_interval = SPDK_VHOST_STATS_CHECK_INTERVAL_MS *
942 					 spdk_get_ticks_hz() / 1000UL;
943 	TAILQ_INSERT_TAIL(&user_dev->vsessions, vsession, tailq);
944 
945 	vhost_session_install_rte_compat_hooks(vsession);
946 	spdk_vhost_unlock();
947 	return 0;
948 }
949 
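/*
 * rte_vhost new_device callback: snapshot the negotiated features, vring
 * addresses and indices, register guest memory, and hand the session over to
 * the backend for polling. Runs on a DPDK-internal pthread.
 */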
950 static int
951 start_device(int vid)
952 {
953 	struct spdk_vhost_dev *vdev;
954 	struct spdk_vhost_session *vsession;
955 	int rc = -1;
956 	uint16_t i;
957 	bool packed_ring;
958 
959 	spdk_vhost_lock();
960 
961 	vsession = vhost_session_find_by_vid(vid);
962 	if (vsession == NULL) {
963 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
964 		goto out;
965 	}
966 
967 	vdev = vsession->vdev;
968 	if (vsession->started) {
969 		/* already started, nothing to do */
970 		rc = 0;
971 		goto out;
972 	}
973 
974 	if (vhost_get_negotiated_features(vid, &vsession->negotiated_features) != 0) {
975 		SPDK_ERRLOG("vhost device %d: Failed to get negotiated driver features\n", vid);
976 		goto out;
977 	}
978 
979 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
980 
981 	vsession->max_queues = 0;
982 	memset(vsession->virtqueue, 0, sizeof(vsession->virtqueue));
983 	for (i = 0; i < SPDK_VHOST_MAX_VQUEUES; i++) {
984 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
985 
986 		q->vsession = vsession;
987 		q->vring_idx = -1;
988 		if (rte_vhost_get_vhost_vring(vid, i, &q->vring)) {
989 			continue;
990 		}
991 		q->vring_idx = i;
992 		rte_vhost_get_vhost_ring_inflight(vid, i, &q->vring_inflight);
993 
994 		/* vring.desc and vring.desc_packed are in a union struct
995 		 * so q->vring.desc can replace q->vring.desc_packed.
996 		 */
997 		if (q->vring.desc == NULL || q->vring.size == 0) {
998 			continue;
999 		}
1000 
1001 		if (rte_vhost_get_vring_base(vsession->vid, i, &q->last_avail_idx, &q->last_used_idx)) {
1002 			q->vring.desc = NULL;
1003 			continue;
1004 		}
1005 
1006 		if (packed_ring) {
1007 			/* Use the inflight mem to restore the last_avail_idx and last_used_idx.
1008 			 * When the vring format is packed, there is no used_idx in the
1009 			 * used ring, so the VM can't resend the used_idx to vhost on reconnect.
1010 			 * QEMU 5.2.0 supports packed-ring inflight; earlier versions only
1011 			 * support split-ring inflight because they don't send the negotiated
1012 			 * features before getting the inflight fd. Users can enable this feature via RPC.
1013 			 */
1014 			if (spdk_unlikely(vdev->packed_ring_recovery)) {
1015 				rte_vhost_get_vring_base_from_inflight(vsession->vid, i,
1016 								       &q->last_avail_idx,
1017 								       &q->last_used_idx);
1018 			}
1019 
1020 			/* Packed virtqueues support up to 2^15 entries each,
1021 			 * so the leftover top bit can be used as the wrap counter.
1022 			 */
1023 			q->packed.avail_phase = q->last_avail_idx >> 15;
1024 			q->last_avail_idx = q->last_avail_idx & 0x7FFF;
1025 			q->packed.used_phase = q->last_used_idx >> 15;
1026 			q->last_used_idx = q->last_used_idx & 0x7FFF;
1027 
1028 			if (!vsession->interrupt_mode) {
1029 				/* Disable I/O submission notifications, we'll be polling. */
1030 				q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1031 			}
1032 		} else {
1033 			if (!vsession->interrupt_mode) {
1034 				/* Disable I/O submission notifications, we'll be polling. */
1035 				q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1036 			}
1037 		}
1038 
1039 		q->packed.packed_ring = packed_ring;
1040 		vsession->max_queues = i + 1;
1041 	}
1042 
1043 	if (vhost_get_mem_table(vid, &vsession->mem) != 0) {
1044 		SPDK_ERRLOG("vhost device %d: Failed to get guest memory table\n", vid);
1045 		goto out;
1046 	}
1047 
1048 	/*
1049 	 * Not sure right now, but this looks like some kind of QEMU bug: guest IO
1050 	 * might be frozen after live migration unless all queues are kicked. It looks like
1051 	 * the previous vhost instance failed to effectively deliver all interrupts before
1052 	 * the GET_VRING_BASE message. This shouldn't harm the guest since spurious interrupts
1053 	 * should be ignored by the guest virtio driver.
1054 	 *
1055 	 * Tested on QEMU 2.10.91 and 2.11.50.
1056 	 */
1057 	for (i = 0; i < vsession->max_queues; i++) {
1058 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1059 
1060 		/* vring.desc and vring.desc_packed are in a union struct
1061 		 * so q->vring.desc can replace q->vring.desc_packed.
1062 		 */
1063 		if (q->vring.desc != NULL && q->vring.size > 0) {
1064 			rte_vhost_vring_call(vsession->vid, q->vring_idx);
1065 		}
1066 	}
1067 
1068 	vhost_user_session_set_coalescing(vdev, vsession, NULL);
1069 	vhost_session_mem_register(vsession->mem);
1070 	vsession->initialized = true;
1071 	rc = to_user_dev(vdev)->user_backend->start_session(vsession);
1072 	if (rc != 0) {
1073 		vhost_session_mem_unregister(vsession->mem);
1074 		free(vsession->mem);
1075 		goto out;
1076 	}
1077 
1078 out:
1079 	spdk_vhost_unlock();
1080 	return rc;
1081 }
1082 
1083 static void
1084 stop_device(int vid)
1085 {
1086 	struct spdk_vhost_session *vsession;
1087 
1088 	spdk_vhost_lock();
1089 	vsession = vhost_session_find_by_vid(vid);
1090 	if (vsession == NULL) {
1091 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1092 		spdk_vhost_unlock();
1093 		return;
1094 	}
1095 
1096 	if (!vsession->started) {
1097 		/* already stopped, nothing to do */
1098 		spdk_vhost_unlock();
1099 		return;
1100 	}
1101 
1102 	_stop_session(vsession);
1103 	spdk_vhost_unlock();
1104 
1105 	return;
1106 }
1107 
1108 static void
1109 destroy_connection(int vid)
1110 {
1111 	struct spdk_vhost_session *vsession;
1112 
1113 	spdk_vhost_lock();
1114 	vsession = vhost_session_find_by_vid(vid);
1115 	if (vsession == NULL) {
1116 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1117 		spdk_vhost_unlock();
1118 		return;
1119 	}
1120 
1121 	if (vsession->started) {
1122 		if (_stop_session(vsession) != 0) {
1123 			spdk_vhost_unlock();
1124 			return;
1125 		}
1126 	}
1127 
1128 	TAILQ_REMOVE(&to_user_dev(vsession->vdev)->vsessions, vsession, tailq);
1129 	free(vsession->name);
1130 	free(vsession);
1131 	spdk_vhost_unlock();
1132 }
1133 
1134 #if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
1135 static const struct rte_vhost_device_ops g_spdk_vhost_ops = {
1136 #else
1137 static const struct vhost_device_ops g_spdk_vhost_ops = {
1138 #endif
1139 	.new_device =  start_device,
1140 	.destroy_device = stop_device,
1141 	.new_connection = new_connection,
1142 	.destroy_connection = destroy_connection,
1143 };
1144 
1145 static struct spdk_vhost_session *
1146 vhost_session_find_by_id(struct spdk_vhost_dev *vdev, unsigned id)
1147 {
1148 	struct spdk_vhost_session *vsession;
1149 
1150 	TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1151 		if (vsession->id == id) {
1152 			return vsession;
1153 		}
1154 	}
1155 
1156 	return NULL;
1157 }
1158 
1159 struct spdk_vhost_session *
1160 vhost_session_find_by_vid(int vid)
1161 {
1162 	struct spdk_vhost_dev *vdev;
1163 	struct spdk_vhost_session *vsession;
1164 
1165 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1166 	     vdev = spdk_vhost_dev_next(vdev)) {
1167 		TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1168 			if (vsession->vid == vid) {
1169 				return vsession;
1170 			}
1171 		}
1172 	}
1173 
1174 	return NULL;
1175 }
1176 
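/*
 * Block the calling DPDK thread until the SPDK side posts g_dpdk_sem. If the
 * timeout expires, log an error and keep waiting so the callback still
 * completes exactly once.
 */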
1177 static void
1178 wait_for_semaphore(int timeout_sec, const char *errmsg)
1179 {
1180 	struct timespec timeout;
1181 	int rc;
1182 
1183 	clock_gettime(CLOCK_REALTIME, &timeout);
1184 	timeout.tv_sec += timeout_sec;
1185 	rc = sem_timedwait(&g_dpdk_sem, &timeout);
1186 	if (rc != 0) {
1187 		SPDK_ERRLOG("Timeout waiting for event: %s.\n", errmsg);
1188 		sem_wait(&g_dpdk_sem);
1189 	}
1190 }
1191 
1192 static void
1193 vhost_session_cb_done(int rc)
1194 {
1195 	g_dpdk_response = rc;
1196 	sem_post(&g_dpdk_sem);
1197 }
1198 
1199 void
1200 vhost_user_session_start_done(struct spdk_vhost_session *vsession, int response)
1201 {
1202 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
1203 	if (response == 0) {
1204 		vsession->started = true;
1205 
1206 		assert(user_dev->active_session_num < UINT32_MAX);
1207 		user_dev->active_session_num++;
1208 	}
1209 
1210 	vhost_session_cb_done(response);
1211 }
1212 
1213 void
1214 vhost_user_session_stop_done(struct spdk_vhost_session *vsession, int response)
1215 {
1216 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
1217 
1218 	if (response == 0) {
1219 		vsession->started = false;
1220 
1221 		assert(user_dev->active_session_num > 0);
1222 		user_dev->active_session_num--;
1223 	}
1224 
1225 	vhost_session_cb_done(response);
1226 }
1227 
1228 static void
1229 vhost_event_cb(void *arg1)
1230 {
1231 	struct vhost_session_fn_ctx *ctx = arg1;
1232 	struct spdk_vhost_session *vsession;
1233 
1234 	if (spdk_vhost_trylock() != 0) {
1235 		spdk_thread_send_msg(spdk_get_thread(), vhost_event_cb, arg1);
1236 		return;
1237 	}
1238 
1239 	vsession = vhost_session_find_by_id(ctx->vdev, ctx->vsession_id);
1240 	ctx->cb_fn(ctx->vdev, vsession, NULL);
1241 	spdk_vhost_unlock();
1242 }
1243 
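/*
 * Run cb_fn on the device's SPDK thread and block the calling DPDK thread,
 * dropping the vhost lock while waiting, until the callback signals completion
 * through g_dpdk_sem. Returns the response code set by the callback.
 */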
1244 int
1245 vhost_user_session_send_event(struct spdk_vhost_session *vsession,
1246 			 spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
1247 			 const char *errmsg)
1248 {
1249 	struct vhost_session_fn_ctx ev_ctx = {0};
1250 	struct spdk_vhost_dev *vdev = vsession->vdev;
1251 
1252 	ev_ctx.vdev = vdev;
1253 	ev_ctx.vsession_id = vsession->id;
1254 	ev_ctx.cb_fn = cb_fn;
1255 
1256 	spdk_thread_send_msg(vdev->thread, vhost_event_cb, &ev_ctx);
1257 
1258 	spdk_vhost_unlock();
1259 	wait_for_semaphore(timeout_sec, errmsg);
1260 	spdk_vhost_lock();
1261 
1262 	return g_dpdk_response;
1263 }
1264 
1265 static void
1266 foreach_session_finish_cb(void *arg1)
1267 {
1268 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1269 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1270 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1271 
1272 	if (spdk_vhost_trylock() != 0) {
1273 		spdk_thread_send_msg(spdk_get_thread(),
1274 				     foreach_session_finish_cb, arg1);
1275 		return;
1276 	}
1277 
1278 	assert(user_dev->pending_async_op_num > 0);
1279 	user_dev->pending_async_op_num--;
1280 	if (ev_ctx->cpl_fn != NULL) {
1281 		ev_ctx->cpl_fn(vdev, ev_ctx->user_ctx);
1282 	}
1283 
1284 	spdk_vhost_unlock();
1285 	free(ev_ctx);
1286 }
1287 
1288 static void
1289 foreach_session(void *arg1)
1290 {
1291 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1292 	struct spdk_vhost_session *vsession;
1293 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1294 	int rc;
1295 
1296 	if (spdk_vhost_trylock() != 0) {
1297 		spdk_thread_send_msg(spdk_get_thread(), foreach_session, arg1);
1298 		return;
1299 	}
1300 
1301 	TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1302 		if (vsession->initialized) {
1303 			rc = ev_ctx->cb_fn(vdev, vsession, ev_ctx->user_ctx);
1304 			if (rc < 0) {
1305 				goto out;
1306 			}
1307 		}
1308 	}
1309 
1310 out:
1311 	spdk_vhost_unlock();
1312 
1313 	spdk_thread_send_msg(g_vhost_user_init_thread, foreach_session_finish_cb, arg1);
1314 }
1315 
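/*
 * Asynchronously invoke fn for every initialized session of vdev on the
 * device's thread, then call cpl_fn back on the init thread. A minimal usage
 * sketch (hypothetical callback name, for illustration only):
 *
 *   static int
 *   example_per_session_cb(struct spdk_vhost_dev *vdev,
 *                          struct spdk_vhost_session *vsession, void *ctx)
 *   {
 *           // per-session work, executed on vdev->thread
 *           return 0;
 *   }
 *
 *   vhost_user_dev_foreach_session(vdev, example_per_session_cb, NULL, NULL);
 */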
1316 void
1317 vhost_user_dev_foreach_session(struct spdk_vhost_dev *vdev,
1318 			  spdk_vhost_session_fn fn,
1319 			  spdk_vhost_dev_fn cpl_fn,
1320 			  void *arg)
1321 {
1322 	struct vhost_session_fn_ctx *ev_ctx;
1323 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1324 
1325 	ev_ctx = calloc(1, sizeof(*ev_ctx));
1326 	if (ev_ctx == NULL) {
1327 		SPDK_ERRLOG("Failed to alloc vhost event.\n");
1328 		assert(false);
1329 		return;
1330 	}
1331 
1332 	ev_ctx->vdev = vdev;
1333 	ev_ctx->cb_fn = fn;
1334 	ev_ctx->cpl_fn = cpl_fn;
1335 	ev_ctx->user_ctx = arg;
1336 
1337 	assert(user_dev->pending_async_op_num < UINT32_MAX);
1338 	user_dev->pending_async_op_num++;
1339 
1340 	spdk_thread_send_msg(vdev->thread, foreach_session, ev_ctx);
1341 }
1342 
1343 void
1344 vhost_user_session_set_interrupt_mode(struct spdk_vhost_session *vsession, bool interrupt_mode)
1345 {
1346 	uint16_t i;
1347 	bool packed_ring;
1348 	int rc = 0;
1349 
1350 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
1351 
1352 	for (i = 0; i < vsession->max_queues; i++) {
1353 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1354 		uint64_t num_events = 1;
1355 
1356 		/* vring.desc and vring.desc_packed are in a union struct
1357 		 * so q->vring.desc can replace q->vring.desc_packed.
1358 		 */
1359 		if (q->vring.desc == NULL || q->vring.size == 0) {
1360 			continue;
1361 		}
1362 
1363 		if (interrupt_mode) {
1364 			/* Enable I/O submission notifications, we'll be interrupting. */
1365 			if (packed_ring) {
1366 				* (volatile uint16_t *) &q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_ENABLE;
1367 			} else {
1368 				* (volatile uint16_t *) &q->vring.used->flags = 0;
1369 			}
1370 
1371 			/* In case of a race condition, always kick the vring when switching to interrupt mode */
1372 			rc = write(q->vring.kickfd, &num_events, sizeof(num_events));
1373 			if (rc < 0) {
1374 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
1375 			}
1376 
1377 			vsession->interrupt_mode = true;
1378 		} else {
1379 			/* Disable I/O submission notifications, we'll be polling. */
1380 			if (packed_ring) {
1381 				* (volatile uint16_t *) &q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1382 			} else {
1383 				* (volatile uint16_t *) &q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1384 			}
1385 
1386 			vsession->interrupt_mode = false;
1387 		}
1388 	}
1389 }
1390 
1391 
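/*
 * Intercept vhost-user messages before rte_vhost processes them. Used to stop
 * the device ahead of operations that would invalidate state SPDK is actively
 * using (memory table changes, callfd/kickfd replacement, GET_VRING_BASE) and
 * to service config space reads/writes directly.
 */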
1392 static enum rte_vhost_msg_result
1393 extern_vhost_pre_msg_handler(int vid, void *_msg)
1394 {
1395 	struct vhost_user_msg *msg = _msg;
1396 	struct spdk_vhost_session *vsession;
1397 
1398 	vsession = vhost_session_find_by_vid(vid);
1399 	if (vsession == NULL) {
1400 		SPDK_ERRLOG("Received a message to uninitialized session (vid %d).\n", vid);
1401 		assert(false);
1402 		return RTE_VHOST_MSG_RESULT_ERR;
1403 	}
1404 
1405 	switch (msg->request) {
1406 	case VHOST_USER_GET_VRING_BASE:
1407 		if (vsession->forced_polling && vsession->started) {
1408 			/* Our queue is stopped for whatever reason, but we may still
1409 			 * need to poll it after it's initialized again.
1410 			 */
1411 			g_spdk_vhost_ops.destroy_device(vid);
1412 		}
1413 		break;
1414 	case VHOST_USER_SET_VRING_BASE:
1415 	case VHOST_USER_SET_VRING_ADDR:
1416 	case VHOST_USER_SET_VRING_NUM:
1417 		if (vsession->forced_polling && vsession->started) {
1418 			/* Additional queues are being initialized, so we either processed
1419 			 * enough I/Os and are switching from SeaBIOS to the OS now, or
1420 			 * we were never in SeaBIOS in the first place. Either way, we
1421 			 * don't need our workaround anymore.
1422 			 */
1423 			g_spdk_vhost_ops.destroy_device(vid);
1424 			vsession->forced_polling = false;
1425 		}
1426 		break;
1427 	case VHOST_USER_SET_VRING_KICK:
1428 		/* rte_vhost (after 20.08) calls new_device once one active vring is
1429 		 * configured, so we start the session before all vrings are available.
1430 		 * For each new vring, if the session is already started, we need to
1431 		 * restart it.
1432 		 */
1433 	case VHOST_USER_SET_VRING_CALL:
1434 		/* rte_vhost will close the previous callfd and won't notify
1435 		 * us about any change. This will effectively make SPDK fail
1436 		 * to deliver any subsequent interrupts until a session is
1437 		 * restarted. We stop the session here before closing the previous
1438 		 * fd (so that all interrupts must have been delivered by the
1439 		 * time the descriptor is closed) and start right after (which
1440 		 * will make SPDK retrieve the latest, up-to-date callfd from
1441 		 * rte_vhost).
1442 		 */
1443 	case VHOST_USER_SET_MEM_TABLE:
1444 		/* rte_vhost will unmap previous memory that SPDK may still
1445 		 * have pending DMA operations on. We can't let that happen,
1446 		 * so stop the device before letting rte_vhost unmap anything.
1447 		 * This will block until all pending I/Os are finished.
1448 		 * We will start the device again from the post-processing
1449 		 * message handler.
1450 		 */
1451 		if (vsession->started) {
1452 			g_spdk_vhost_ops.destroy_device(vid);
1453 			vsession->needs_restart = true;
1454 		}
1455 		break;
1456 	case VHOST_USER_GET_CONFIG: {
1457 		int rc = 0;
1458 
1459 		spdk_vhost_lock();
1460 		if (vsession->vdev->backend->vhost_get_config) {
1461 			rc = vsession->vdev->backend->vhost_get_config(vsession->vdev,
1462 				msg->payload.cfg.region, msg->payload.cfg.size);
1463 			if (rc != 0) {
1464 				msg->size = 0;
1465 			}
1466 		}
1467 		spdk_vhost_unlock();
1468 
1469 		return RTE_VHOST_MSG_RESULT_REPLY;
1470 	}
1471 	case VHOST_USER_SET_CONFIG: {
1472 		int rc = 0;
1473 
1474 		spdk_vhost_lock();
1475 		if (vsession->vdev->backend->vhost_set_config) {
1476 			rc = vsession->vdev->backend->vhost_set_config(vsession->vdev,
1477 				msg->payload.cfg.region, msg->payload.cfg.offset,
1478 				msg->payload.cfg.size, msg->payload.cfg.flags);
1479 		}
1480 		spdk_vhost_unlock();
1481 
1482 		return rc == 0 ? RTE_VHOST_MSG_RESULT_OK : RTE_VHOST_MSG_RESULT_ERR;
1483 	}
1484 	default:
1485 		break;
1486 	}
1487 
1488 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1489 }
1490 
1491 static enum rte_vhost_msg_result
1492 extern_vhost_post_msg_handler(int vid, void *_msg)
1493 {
1494 	struct vhost_user_msg *msg = _msg;
1495 	struct spdk_vhost_session *vsession;
1496 
1497 	vsession = vhost_session_find_by_vid(vid);
1498 	if (vsession == NULL) {
1499 		SPDK_ERRLOG("Received a message to uninitialized session (vid %d).\n", vid);
1500 		assert(false);
1501 		return RTE_VHOST_MSG_RESULT_ERR;
1502 	}
1503 
1504 	if (vsession->needs_restart) {
1505 		g_spdk_vhost_ops.new_device(vid);
1506 		vsession->needs_restart = false;
1507 		return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1508 	}
1509 
1510 	switch (msg->request) {
1511 	case VHOST_USER_SET_FEATURES:
1512 		/* rte_vhost requires all queues to be fully initialized in order
1513 		 * to start I/O processing. This behavior is not compliant with the
1514 		 * vhost-user specification and doesn't work with QEMU 2.12+, which
1515 		 * will only initialize 1 I/O queue for the SeaBIOS boot.
1516 		 * Theoretically, we should start polling each virtqueue individually
1517 		 * after receiving its SET_VRING_KICK message, but rte_vhost is not
1518 		 * designed to poll individual queues. So here we use a workaround
1519 		 * to detect when the vhost session could be potentially at that SeaBIOS
1520 		 * stage and we mark it to start polling as soon as its first virtqueue
1521 		 * gets initialized. This doesn't hurt any non-QEMU vhost slaves
1522 		 * and allows QEMU 2.12+ to boot correctly. SET_FEATURES could be sent
1523 		 * at any time, but QEMU will send it at least once on SeaBIOS
1524 		 * initialization - whenever powered-up or rebooted.
1525 		 */
1526 		vsession->forced_polling = true;
1527 		break;
1528 	case VHOST_USER_SET_VRING_KICK:
1529 		/* vhost-user spec tells us to start polling a queue after receiving
1530 		 * its SET_VRING_KICK message. Let's do it!
1531 		 */
1532 		if (vsession->forced_polling && !vsession->started) {
1533 			g_spdk_vhost_ops.new_device(vid);
1534 		}
1535 		break;
1536 	default:
1537 		break;
1538 	}
1539 
1540 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1541 }
1542 
1543 struct rte_vhost_user_extern_ops g_spdk_extern_vhost_ops = {
1544 	.pre_msg_handle = extern_vhost_pre_msg_handler,
1545 	.post_msg_handle = extern_vhost_post_msg_handler,
1546 };
1547 
1548 void
1549 vhost_session_install_rte_compat_hooks(struct spdk_vhost_session *vsession)
1550 {
1551 	int rc;
1552 
1553 	rc = rte_vhost_extern_callback_register(vsession->vid, &g_spdk_extern_vhost_ops, NULL);
1554 	if (rc != 0) {
1555 		SPDK_ERRLOG("rte_vhost_extern_callback_register() failed for vid = %d\n",
1556 			    vsession->vid);
1557 		return;
1558 	}
1559 }
1560 
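/*
 * Create the vhost-user Unix domain socket at 'path' (removing any stale socket
 * file first), register it with rte_vhost, configure the supported virtio and
 * protocol features, and start the driver.
 */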
1561 int
1562 vhost_register_unix_socket(const char *path, const char *ctrl_name,
1563 			   uint64_t virtio_features, uint64_t disabled_features, uint64_t protocol_features)
1564 {
1565 	struct stat file_stat;
1566 	uint64_t features = 0;
1567 
1568 	/* Register vhost driver to handle vhost messages. */
1569 	if (stat(path, &file_stat) != -1) {
1570 		if (!S_ISSOCK(file_stat.st_mode)) {
1571 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1572 				    "The file already exists and is not a socket.\n",
1573 				    path);
1574 			return -EIO;
1575 		} else if (unlink(path) != 0) {
1576 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1577 				    "The socket already exists and could not be unlinked.\n",
1578 				    path);
1579 			return -EIO;
1580 		}
1581 	}
1582 
1583 #if RTE_VERSION < RTE_VERSION_NUM(20, 8, 0, 0)
1584 	if (rte_vhost_driver_register(path, 0) != 0) {
1585 #else
1586 	if (rte_vhost_driver_register(path, RTE_VHOST_USER_ASYNC_COPY) != 0) {
1587 #endif
1588 		SPDK_ERRLOG("Could not register controller %s with vhost library\n", ctrl_name);
1589 		SPDK_ERRLOG("Check if domain socket %s already exists\n", path);
1590 		return -EIO;
1591 	}
1592 	if (rte_vhost_driver_set_features(path, virtio_features) ||
1593 	    rte_vhost_driver_disable_features(path, disabled_features)) {
1594 		SPDK_ERRLOG("Couldn't set vhost features for controller %s\n", ctrl_name);
1595 
1596 		rte_vhost_driver_unregister(path);
1597 		return -EIO;
1598 	}
1599 
1600 	if (rte_vhost_driver_callback_register(path, &g_spdk_vhost_ops) != 0) {
1601 		rte_vhost_driver_unregister(path);
1602 		SPDK_ERRLOG("Couldn't register callbacks for controller %s\n", ctrl_name);
1603 		return -EIO;
1604 	}
1605 
1606 	rte_vhost_driver_get_protocol_features(path, &features);
1607 	features |= protocol_features;
1608 	rte_vhost_driver_set_protocol_features(path, features);
1609 
1610 	if (rte_vhost_driver_start(path) != 0) {
1611 		SPDK_ERRLOG("Failed to start vhost driver for controller %s (%d): %s\n",
1612 			    ctrl_name, errno, spdk_strerror(errno));
1613 		rte_vhost_driver_unregister(path);
1614 		return -EIO;
1615 	}
1616 
1617 	return 0;
1618 }
1619 
1620 int
1621 vhost_get_mem_table(int vid, struct rte_vhost_memory **mem)
1622 {
1623 	return rte_vhost_get_mem_table(vid, mem);
1624 }
1625 
1626 int
1627 vhost_driver_unregister(const char *path)
1628 {
1629 	return rte_vhost_driver_unregister(path);
1630 }
1631 
1632 int
1633 vhost_get_negotiated_features(int vid, uint64_t *negotiated_features)
1634 {
1635 	return rte_vhost_get_negotiated_features(vid, negotiated_features);
1636 }
1637 
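/*
 * Validate and store the interrupt coalescing parameters. The IOPS threshold is
 * converted to a per-stats-interval request count, so values below
 * 1000 / SPDK_VHOST_STATS_CHECK_INTERVAL_MS are rejected as too low.
 */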
1638 int
1639 vhost_user_dev_set_coalescing(struct spdk_vhost_user_dev *user_dev, uint32_t delay_base_us,
1640 			 uint32_t iops_threshold)
1641 {
1642 	uint64_t delay_time_base = delay_base_us * spdk_get_ticks_hz() / 1000000ULL;
1643 	uint32_t io_rate = iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1644 
1645 	if (delay_time_base >= UINT32_MAX) {
1646 		SPDK_ERRLOG("Delay time of %"PRIu32" is too big\n", delay_base_us);
1647 		return -EINVAL;
1648 	} else if (io_rate == 0) {
1649 		SPDK_ERRLOG("IOPS rate of %"PRIu32" is too low. Min is %u\n", io_rate,
1650 			    1000U / SPDK_VHOST_STATS_CHECK_INTERVAL_MS);
1651 		return -EINVAL;
1652 	}
1653 
1654 	user_dev->coalescing_delay_us = delay_base_us;
1655 	user_dev->coalescing_iops_threshold = iops_threshold;
1656 	return 0;
1657 }
1658 
1659 int
1660 vhost_user_session_set_coalescing(struct spdk_vhost_dev *vdev,
1661 			     struct spdk_vhost_session *vsession, void *ctx)
1662 {
1663 	vsession->coalescing_delay_time_base =
1664 		to_user_dev(vdev)->coalescing_delay_us * spdk_get_ticks_hz() / 1000000ULL;
1665 	vsession->coalescing_io_rate_threshold =
1666 		to_user_dev(vdev)->coalescing_iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1667 	return 0;
1668 }
1669 
1670 int
1671 spdk_vhost_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
1672 			  uint32_t iops_threshold)
1673 {
1674 	int rc;
1675 
1676 	rc = vhost_user_dev_set_coalescing(to_user_dev(vdev), delay_base_us, iops_threshold);
1677 	if (rc != 0) {
1678 		return rc;
1679 	}
1680 
1681 	vhost_user_dev_foreach_session(vdev, vhost_user_session_set_coalescing, NULL, NULL);
1682 	return 0;
1683 }
1684 
1685 void
1686 spdk_vhost_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
1687 			  uint32_t *iops_threshold)
1688 {
1689 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1690 
1691 	if (delay_base_us) {
1692 		*delay_base_us = user_dev->coalescing_delay_us;
1693 	}
1694 
1695 	if (iops_threshold) {
1696 		*iops_threshold = user_dev->coalescing_iops_threshold;
1697 	}
1698 }
1699 
1700 int
1701 spdk_vhost_set_socket_path(const char *basename)
1702 {
1703 	int ret;
1704 
1705 	if (basename && strlen(basename) > 0) {
1706 		ret = snprintf(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 2, "%s", basename);
1707 		if (ret <= 0) {
1708 			return -EINVAL;
1709 		}
1710 		if ((size_t)ret >= sizeof(g_vhost_user_dev_dirname) - 2) {
1711 			SPDK_ERRLOG("Char dev dir path length %d is too long\n", ret);
1712 			return -EINVAL;
1713 		}
1714 
1715 		if (g_vhost_user_dev_dirname[ret - 1] != '/') {
1716 			g_vhost_user_dev_dirname[ret] = '/';
1717 			g_vhost_user_dev_dirname[ret + 1]  = '\0';
1718 		}
1719 	}
1720 
1721 	return 0;
1722 }
1723 
1724 static void
1725 vhost_dev_thread_exit(void *arg1)
1726 {
1727 	spdk_thread_exit(spdk_get_thread());
1728 }
1729 
1730 int
1731 vhost_user_dev_register(struct spdk_vhost_dev *vdev, const char *name, struct spdk_cpuset *cpumask,
1732 			const struct spdk_vhost_user_dev_backend *user_backend)
1733 {
1734 	char path[PATH_MAX];
1735 	struct spdk_vhost_user_dev *user_dev;
1736 
1737 	if (snprintf(path, sizeof(path), "%s%s", g_vhost_user_dev_dirname, name) >= (int)sizeof(path)) {
1738 		SPDK_ERRLOG("Resulting socket path for controller %s is too long: %s%s\n",
1739 				name, g_vhost_user_dev_dirname, name);
1740 		return -EINVAL;
1741 	}
1742 
1743 	vdev->path = strdup(path);
1744 	if (vdev->path == NULL) {
1745 		return -EIO;
1746 	}
1747 
1748 	user_dev = calloc(1, sizeof(*user_dev));
1749 	if (user_dev == NULL) {
1750 		free(vdev->path);
1751 		return -ENOMEM;
1752 	}
1753 	vdev->ctxt = user_dev;
1754 
1755 	vdev->thread = spdk_thread_create(vdev->name, cpumask);
1756 	if (vdev->thread == NULL) {
1757 		free(user_dev);
1758 		free(vdev->path);
1759 		SPDK_ERRLOG("Failed to create thread for vhost controller %s.\n", name);
1760 		return -EIO;
1761 	}
1762 
1763 	vdev->registered = true;
1764 	user_dev->user_backend = user_backend;
1765 	user_dev->vdev = vdev;
1766 	TAILQ_INIT(&user_dev->vsessions);
1767 
1768 	vhost_user_dev_set_coalescing(user_dev, SPDK_VHOST_COALESCING_DELAY_BASE_US,
1769 				 SPDK_VHOST_VQ_IOPS_COALESCING_THRESHOLD);
1770 
1771 	if (vhost_register_unix_socket(path, name, vdev->virtio_features, vdev->disabled_features,
1772 				       vdev->protocol_features)) {
1773 		spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1774 		free(user_dev);
1775 		free(vdev->path);
1776 		return -EIO;
1777 	}
1778 
1779 	return 0;
1780 }
1781 
1782 int
1783 vhost_user_dev_unregister(struct spdk_vhost_dev *vdev)
1784 {
1785 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1786 
1787 	if (user_dev->pending_async_op_num) {
1788 		return -EBUSY;
1789 	}
1790 
1791 	if (!TAILQ_EMPTY(&user_dev->vsessions)) {
1792 		SPDK_ERRLOG("Controller %s still has valid connections.\n", vdev->name);
1793 		return -EBUSY;
1794 	}
1795 
1796 	if (vdev->registered && vhost_driver_unregister(vdev->path) != 0) {
1797 		SPDK_ERRLOG("Could not unregister controller %s with vhost library\n"
1798 			    "Check if domain socket %s still exists\n",
1799 			    vdev->name, vdev->path);
1800 		return -EIO;
1801 	}
1802 
1803 	spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1804 	free(user_dev);
1805 	free(vdev->path);
1806 
1807 	return 0;
1808 }
1809 
1810 static bool g_vhost_user_started = false;
1811 
1812 int
1813 vhost_user_init(void)
1814 {
1815 	size_t len;
1816 
1817 	if (g_vhost_user_started) {
1818 		return 0;
1819 	}
1820 
1821 	if (g_vhost_user_dev_dirname[0] == '\0') {
1822 		if (getcwd(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 1) == NULL) {
1823 			SPDK_ERRLOG("getcwd failed (%d): %s\n", errno, spdk_strerror(errno));
1824 			return -1;
1825 		}
1826 
1827 		len = strlen(g_vhost_user_dev_dirname);
1828 		if (g_vhost_user_dev_dirname[len - 1] != '/') {
1829 			g_vhost_user_dev_dirname[len] = '/';
1830 			g_vhost_user_dev_dirname[len + 1] = '\0';
1831 		}
1832 	}
1833 
1834 	g_vhost_user_started = true;
1835 
1836 	g_vhost_user_init_thread = spdk_get_thread();
1837 	assert(g_vhost_user_init_thread != NULL);
1838 
1839 	return 0;
1840 }
1841 
1842 static void
1843 vhost_user_session_shutdown_on_init(void *vhost_cb)
1844 {
1845 	spdk_vhost_fini_cb fn = vhost_cb;
1846 
1847 	fn();
1848 }
1849 
1850 static void *
1851 vhost_user_session_shutdown(void *vhost_cb)
1852 {
1853 	struct spdk_vhost_dev *vdev = NULL;
1854 	struct spdk_vhost_session *vsession;
1855 
1856 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1857 	     vdev = spdk_vhost_dev_next(vdev)) {
1858 		spdk_vhost_lock();
1859 		TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1860 			if (vsession->started) {
1861 				_stop_session(vsession);
1862 			}
1863 		}
1864 		spdk_vhost_unlock();
1865 		vhost_driver_unregister(vdev->path);
1866 		vdev->registered = false;
1867 	}
1868 
1869 	SPDK_INFOLOG(vhost, "Exiting\n");
1870 	spdk_thread_send_msg(g_vhost_user_init_thread, vhost_user_session_shutdown_on_init, vhost_cb);
1871 	return NULL;
1872 }
1873 
1874 void
1875 vhost_user_fini(spdk_vhost_fini_cb vhost_cb)
1876 {
1877 	pthread_t tid;
1878 	int rc;
1879 
1880 	if (!g_vhost_user_started) {
1881 		vhost_cb();
1882 		return;
1883 	}
1884 
1885 	g_vhost_user_started = false;
1886 
1887 	/* rte_vhost API for removing sockets is not asynchronous. Since it may call SPDK
1888 	 * ops for stopping a device or removing a connection, we need to call it from
1889 	 * a separate thread to avoid deadlock.
1890 	 */
1891 	rc = pthread_create(&tid, NULL, &vhost_user_session_shutdown, vhost_cb);
1892 	if (rc < 0) {
1893 		SPDK_ERRLOG("Failed to start session shutdown thread (%d): %s\n", rc, spdk_strerror(rc));
1894 		abort();
1895 	}
1896 	pthread_detach(tid);
1897 }
1898