xref: /spdk/lib/vhost/rte_vhost_user.c (revision 7c1a27af46a1cbcda1b76e027c3e2f3f85e4d359)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2019 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2021 Mellanox Technologies LTD. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/env.h"
10 #include "spdk/likely.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/memory.h"
14 #include "spdk/barrier.h"
15 #include "spdk/vhost.h"
16 #include "vhost_internal.h"
17 #include <rte_version.h>
18 
19 #include "spdk_internal/vhost_user.h"
20 
21 /* Path to the directory where vhost-user Unix domain sockets will be created. Can be set by the user. */
22 static char g_vhost_user_dev_dirname[PATH_MAX] = "";
23 
24 static struct spdk_thread *g_vhost_user_init_thread;
25 
26 /**
27  * DPDK calls our callbacks synchronously but the work those callbacks
28  * perform needs to be async. Luckily, all DPDK callbacks are called on
29  * a DPDK-internal pthread, so we'll just wait on a semaphore in there.
30  */
31 static sem_t g_dpdk_sem;
32 
33 /** Return code for the current DPDK callback */
34 static int g_dpdk_response;
35 
36 struct vhost_session_fn_ctx {
37 	/** Device pointer obtained before enqueueing the event */
38 	struct spdk_vhost_dev *vdev;
39 
40 	/** ID of the session to send event to. */
41 	uint32_t vsession_id;
42 
43 	/** User provided function to be executed on session's thread. */
44 	spdk_vhost_session_fn cb_fn;
45 
46 	/**
47 	 * User provided function to be called on the init thread
48 	 * after iterating through all sessions.
49 	 */
50 	spdk_vhost_dev_fn cpl_fn;
51 
52 	/** Custom user context */
53 	void *user_ctx;
54 };
55 
56 static int vhost_user_wait_for_session_stop(struct spdk_vhost_session *vsession,
57 		unsigned timeout_sec, const char *errmsg);
58 
59 static void
60 __attribute__((constructor))
61 _vhost_user_sem_init(void)
62 {
63 	if (sem_init(&g_dpdk_sem, 0, 0) != 0) {
64 		SPDK_ERRLOG("Failed to initialize semaphore for rte_vhost pthread.\n");
65 		abort();
66 	}
67 }
68 
69 static void
70 __attribute__((destructor))
71 _vhost_user_sem_destroy(void)
72 {
73 	sem_destroy(&g_dpdk_sem);
74 }
75 
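/*
 * Translate a guest physical address to a host virtual address.
 * rte_vhost_va_from_guest_pa() shrinks `newlen` if the requested range
 * crosses the end of a contiguous guest memory region; in that case the
 * translation is rejected and NULL is returned, so callers always get a
 * mapping that covers the full `len` bytes or nothing at all.
 */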
76 void *
77 vhost_gpa_to_vva(struct spdk_vhost_session *vsession, uint64_t addr, uint64_t len)
78 {
79 	void *vva;
80 	uint64_t newlen;
81 
82 	newlen = len;
83 	vva = (void *)rte_vhost_va_from_guest_pa(vsession->mem, addr, &newlen);
84 	if (newlen != len) {
85 		return NULL;
86 	}
87 
88 	return vva;
89 
90 }
91 
92 static void
93 vhost_log_req_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
94 		   uint16_t req_id)
95 {
96 	struct vring_desc *desc, *desc_table;
97 	uint32_t desc_table_size;
98 	int rc;
99 
100 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
101 		return;
102 	}
103 
104 	rc = vhost_vq_get_desc(vsession, virtqueue, req_id, &desc, &desc_table, &desc_table_size);
105 	if (spdk_unlikely(rc != 0)) {
106 		SPDK_ERRLOG("Can't log used ring descriptors!\n");
107 		return;
108 	}
109 
110 	do {
111 		if (vhost_vring_desc_is_wr(desc)) {
112 			/* Strictly speaking, only the pages actually touched should be logged,
113 			 * but doing so would require tracking those changes in each backend.
114 			 * The backend will most likely touch all or most of those pages anyway,
115 			 * so for now assume we touched every page passed to us as a writable buffer. */
116 			rte_vhost_log_write(vsession->vid, desc->addr, desc->len);
117 		}
118 		vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
119 	} while (desc);
120 }
121 
122 static void
123 vhost_log_used_vring_elem(struct spdk_vhost_session *vsession,
124 			  struct spdk_vhost_virtqueue *virtqueue,
125 			  uint16_t idx)
126 {
127 	uint64_t offset, len;
128 
129 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
130 		return;
131 	}
132 
133 	if (spdk_unlikely(virtqueue->packed.packed_ring)) {
134 		offset = idx * sizeof(struct vring_packed_desc);
135 		len = sizeof(struct vring_packed_desc);
136 	} else {
137 		offset = offsetof(struct vring_used, ring[idx]);
138 		len = sizeof(virtqueue->vring.used->ring[idx]);
139 	}
140 
141 	rte_vhost_log_used_vring(vsession->vid, virtqueue->vring_idx, offset, len);
142 }
143 
144 static void
145 vhost_log_used_vring_idx(struct spdk_vhost_session *vsession,
146 			 struct spdk_vhost_virtqueue *virtqueue)
147 {
148 	uint64_t offset, len;
149 	uint16_t vq_idx;
150 
151 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
152 		return;
153 	}
154 
155 	offset = offsetof(struct vring_used, idx);
156 	len = sizeof(virtqueue->vring.used->idx);
157 	vq_idx = virtqueue - vsession->virtqueue;
158 
159 	rte_vhost_log_used_vring(vsession->vid, vq_idx, offset, len);
160 }
161 
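/*
 * Note that last_avail_idx and avail->idx are free-running uint16_t counters,
 * so the subtraction below is naturally modulo 65536. For example, with
 * last_avail_idx == 65534 and avail->idx == 1, count == 3.
 */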
162 /*
163  * Get available requests from avail ring.
164  */
165 uint16_t
166 vhost_vq_avail_ring_get(struct spdk_vhost_virtqueue *virtqueue, uint16_t *reqs,
167 			uint16_t reqs_len)
168 {
169 	struct rte_vhost_vring *vring = &virtqueue->vring;
170 	struct vring_avail *avail = vring->avail;
171 	uint16_t size_mask = vring->size - 1;
172 	uint16_t last_idx = virtqueue->last_avail_idx, avail_idx = avail->idx;
173 	uint16_t count, i;
174 	int rc;
175 	uint64_t u64_value;
176 
177 	spdk_smp_rmb();
178 
179 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
180 		/* Read to clear vring's kickfd */
181 		rc = read(vring->kickfd, &u64_value, sizeof(u64_value));
182 		if (rc < 0) {
183 			SPDK_ERRLOG("failed to acknowledge kickfd: %s.\n", spdk_strerror(errno));
184 			return -errno;
185 		}
186 	}
187 
188 	count = avail_idx - last_idx;
189 	if (spdk_likely(count == 0)) {
190 		return 0;
191 	}
192 
193 	if (spdk_unlikely(count > vring->size)) {
194 		/* TODO: the queue is unrecoverably broken and should be marked so.
195 		 * For now we will fail silently and report there are no new avail entries.
196 		 */
197 		return 0;
198 	}
199 
200 	count = spdk_min(count, reqs_len);
201 
202 	virtqueue->last_avail_idx += count;
203 	/* Check whether there are unprocessed reqs left in the vq and, if so, kick it manually */
204 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
205 		/* If avail_idx is larger than the virtqueue's last_avail_idx, there are unprocessed reqs.
206 		 * avail_idx should be re-read from memory here, in case of a race condition with the guest.
207 		 */
208 		avail_idx = * (volatile uint16_t *) &avail->idx;
209 		if (avail_idx > virtqueue->last_avail_idx) {
210 			/* Write to notify vring's kickfd */
211 			rc = write(vring->kickfd, &u64_value, sizeof(u64_value));
212 			if (rc < 0) {
213 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
214 				return -errno;
215 			}
216 		}
217 	}
218 
219 	for (i = 0; i < count; i++) {
220 		reqs[i] = vring->avail->ring[(last_idx + i) & size_mask];
221 	}
222 
223 	SPDK_DEBUGLOG(vhost_ring,
224 		      "AVAIL: last_idx=%"PRIu16" avail_idx=%"PRIu16" count=%"PRIu16"\n",
225 		      last_idx, avail_idx, count);
226 
227 	return count;
228 }
229 
230 static bool
231 vhost_vring_desc_is_indirect(struct vring_desc *cur_desc)
232 {
233 	return !!(cur_desc->flags & VRING_DESC_F_INDIRECT);
234 }
235 
236 static bool
237 vhost_vring_packed_desc_is_indirect(struct vring_packed_desc *cur_desc)
238 {
239 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
240 }
241 
242 static bool
243 vhost_inflight_packed_desc_is_indirect(spdk_vhost_inflight_desc *cur_desc)
244 {
245 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
246 }
247 
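/*
 * Look up the descriptor at req_idx and report the table it should be walked
 * in: for an indirect descriptor this is the guest-provided indirect table
 * (translated into host memory), otherwise it is the virtqueue's own
 * descriptor ring.
 */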
248 int
249 vhost_vq_get_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
250 		  uint16_t req_idx, struct vring_desc **desc, struct vring_desc **desc_table,
251 		  uint32_t *desc_table_size)
252 {
253 	if (spdk_unlikely(req_idx >= virtqueue->vring.size)) {
254 		return -1;
255 	}
256 
257 	*desc = &virtqueue->vring.desc[req_idx];
258 
259 	if (vhost_vring_desc_is_indirect(*desc)) {
260 		*desc_table_size = (*desc)->len / sizeof(**desc);
261 		*desc_table = vhost_gpa_to_vva(vsession, (*desc)->addr,
262 					       sizeof(**desc) * *desc_table_size);
263 		*desc = *desc_table;
264 		if (*desc == NULL) {
265 			return -1;
266 		}
267 
268 		return 0;
269 	}
270 
271 	*desc_table = virtqueue->vring.desc;
272 	*desc_table_size = virtqueue->vring.size;
273 
274 	return 0;
275 }
276 
277 static bool
278 vhost_packed_desc_indirect_to_desc_table(struct spdk_vhost_session *vsession,
279 		uint64_t addr, uint32_t len,
280 		struct vring_packed_desc **desc_table,
281 		uint32_t *desc_table_size)
282 {
283 	*desc_table_size = len / sizeof(struct vring_packed_desc);
284 
285 	*desc_table = vhost_gpa_to_vva(vsession, addr, len);
286 	if (spdk_unlikely(*desc_table == NULL)) {
287 		return false;
288 	}
289 
290 	return true;
291 }
292 
293 int
294 vhost_vq_get_desc_packed(struct spdk_vhost_session *vsession,
295 			 struct spdk_vhost_virtqueue *virtqueue,
296 			 uint16_t req_idx, struct vring_packed_desc **desc,
297 			 struct vring_packed_desc **desc_table, uint32_t *desc_table_size)
298 {
299 	*desc =  &virtqueue->vring.desc_packed[req_idx];
300 
301 	/* In a packed ring, when the desc is non-indirect we get the next desc
302 	 * by checking (desc->flags & VRING_DESC_F_NEXT) != 0. When the desc
303 	 * is indirect we get the next desc by idx and desc_table_size. This
304 	 * differs from the split ring.
305 	 */
306 	if (vhost_vring_packed_desc_is_indirect(*desc)) {
307 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
308 				desc_table, desc_table_size)) {
309 			return -1;
310 		}
311 
312 		*desc = *desc_table;
313 	} else {
314 		*desc_table = NULL;
315 		*desc_table_size  = 0;
316 	}
317 
318 	return 0;
319 }
320 
321 int
322 vhost_inflight_queue_get_desc(struct spdk_vhost_session *vsession,
323 			      spdk_vhost_inflight_desc *desc_array,
324 			      uint16_t req_idx, spdk_vhost_inflight_desc **desc,
325 			      struct vring_packed_desc  **desc_table, uint32_t *desc_table_size)
326 {
327 	*desc = &desc_array[req_idx];
328 
329 	if (vhost_inflight_packed_desc_is_indirect(*desc)) {
330 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
331 				desc_table, desc_table_size)) {
332 			return -1;
333 		}
334 
335 		/* This desc is the inflight desc, not the packed desc.
336 		 * When F_INDIRECT is set, the table entries are packed descs,
337 		 * so set the inflight desc to NULL.
338 		 */
339 		*desc = NULL;
340 	} else {
341 		/* When F_INDIRECT is not set, there is no packed desc table */
342 		*desc_table = NULL;
343 		*desc_table_size = 0;
344 	}
345 
346 	return 0;
347 }
348 
349 int
350 vhost_vq_used_signal(struct spdk_vhost_session *vsession,
351 		     struct spdk_vhost_virtqueue *virtqueue)
352 {
353 	if (virtqueue->used_req_cnt == 0) {
354 		return 0;
355 	}
356 
357 	SPDK_DEBUGLOG(vhost_ring,
358 		      "Queue %td - USED RING: sending IRQ: last used %"PRIu16"\n",
359 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx);
360 
361 	if (rte_vhost_vring_call(vsession->vid, virtqueue->vring_idx) == 0) {
362 		/* interrupt signalled */
363 		virtqueue->req_cnt += virtqueue->used_req_cnt;
364 		virtqueue->used_req_cnt = 0;
365 		return 1;
366 	} else {
367 		/* interrupt not signalled */
368 		return 0;
369 	}
370 }
371 
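/*
 * Adaptive interrupt coalescing: once the per-interval request count exceeds
 * coalescing_io_rate_threshold, the IRQ delay grows proportionally to the
 * overshoot, e.g. with io_threshold == 32 and req_cnt == 96 the delay becomes
 * irq_delay_base * (96 - 32) / 32 == 2 * irq_delay_base.
 */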
372 static void
373 session_vq_io_stats_update(struct spdk_vhost_session *vsession,
374 			   struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
375 {
376 	uint32_t irq_delay_base = vsession->coalescing_delay_time_base;
377 	uint32_t io_threshold = vsession->coalescing_io_rate_threshold;
378 	int32_t irq_delay;
379 	uint32_t req_cnt;
380 
381 	req_cnt = virtqueue->req_cnt + virtqueue->used_req_cnt;
382 	if (req_cnt <= io_threshold) {
383 		return;
384 	}
385 
386 	irq_delay = (irq_delay_base * (req_cnt - io_threshold)) / io_threshold;
387 	virtqueue->irq_delay_time = (uint32_t) spdk_max(0, irq_delay);
388 
389 	virtqueue->req_cnt = 0;
390 	virtqueue->next_event_time = now;
391 }
392 
393 static void
394 check_session_vq_io_stats(struct spdk_vhost_session *vsession,
395 			  struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
396 {
397 	if (now < vsession->next_stats_check_time) {
398 		return;
399 	}
400 
401 	vsession->next_stats_check_time = now + vsession->stats_check_interval;
402 	session_vq_io_stats_update(vsession, virtqueue, now);
403 }
404 
405 static inline bool
406 vhost_vq_event_is_suppressed(struct spdk_vhost_virtqueue *vq)
407 {
408 	if (spdk_unlikely(vq->packed.packed_ring)) {
409 		if (vq->vring.driver_event->flags & VRING_PACKED_EVENT_FLAG_DISABLE) {
410 			return true;
411 		}
412 	} else {
413 		if (vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT) {
414 			return true;
415 		}
416 	}
417 
418 	return false;
419 }
420 
421 void
422 vhost_session_vq_used_signal(struct spdk_vhost_virtqueue *virtqueue)
423 {
424 	struct spdk_vhost_session *vsession = virtqueue->vsession;
425 	uint64_t now;
426 
427 	if (vsession->coalescing_delay_time_base == 0) {
428 		if (virtqueue->vring.desc == NULL) {
429 			return;
430 		}
431 
432 		if (vhost_vq_event_is_suppressed(virtqueue)) {
433 			return;
434 		}
435 
436 		vhost_vq_used_signal(vsession, virtqueue);
437 	} else {
438 		now = spdk_get_ticks();
439 		check_session_vq_io_stats(vsession, virtqueue, now);
440 
441 		/* No need for event right now */
442 		if (now < virtqueue->next_event_time) {
443 			return;
444 		}
445 
446 		if (vhost_vq_event_is_suppressed(virtqueue)) {
447 			return;
448 		}
449 
450 		if (!vhost_vq_used_signal(vsession, virtqueue)) {
451 			return;
452 		}
453 
454 		/* The signalling syscall above takes a while, so refresh the current time */
455 		now = spdk_get_ticks();
456 		virtqueue->next_event_time = now + virtqueue->irq_delay_time;
457 	}
458 }
459 
460 /*
461  * Enqueue id and len to used ring.
462  */
463 void
464 vhost_vq_used_ring_enqueue(struct spdk_vhost_session *vsession,
465 			   struct spdk_vhost_virtqueue *virtqueue,
466 			   uint16_t id, uint32_t len)
467 {
468 	struct rte_vhost_vring *vring = &virtqueue->vring;
469 	struct vring_used *used = vring->used;
470 	uint16_t last_idx = virtqueue->last_used_idx & (vring->size - 1);
471 	uint16_t vq_idx = virtqueue->vring_idx;
472 
473 	SPDK_DEBUGLOG(vhost_ring,
474 		      "Queue %td - USED RING: last_idx=%"PRIu16" req id=%"PRIu16" len=%"PRIu32"\n",
475 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx, id, len);
476 
477 	vhost_log_req_desc(vsession, virtqueue, id);
478 
479 	virtqueue->last_used_idx++;
480 	used->ring[last_idx].id = id;
481 	used->ring[last_idx].len = len;
482 
483 	/* Ensure the used ring is updated before we log it or increment used->idx. */
484 	spdk_smp_wmb();
485 
486 	rte_vhost_set_last_inflight_io_split(vsession->vid, vq_idx, id);
487 
488 	vhost_log_used_vring_elem(vsession, virtqueue, last_idx);
489 	* (volatile uint16_t *) &used->idx = virtqueue->last_used_idx;
490 	vhost_log_used_vring_idx(vsession, virtqueue);
491 
492 	rte_vhost_clr_inflight_desc_split(vsession->vid, vq_idx, virtqueue->last_used_idx, id);
493 
494 	virtqueue->used_req_cnt++;
495 
496 	if (vsession->interrupt_mode) {
497 		if (virtqueue->vring.desc == NULL || vhost_vq_event_is_suppressed(virtqueue)) {
498 			return;
499 		}
500 
501 		vhost_vq_used_signal(vsession, virtqueue);
502 	}
503 }
504 
505 void
506 vhost_vq_packed_ring_enqueue(struct spdk_vhost_session *vsession,
507 			     struct spdk_vhost_virtqueue *virtqueue,
508 			     uint16_t num_descs, uint16_t buffer_id,
509 			     uint32_t length, uint16_t inflight_head)
510 {
511 	struct vring_packed_desc *desc = &virtqueue->vring.desc_packed[virtqueue->last_used_idx];
512 	bool used, avail;
513 
514 	SPDK_DEBUGLOG(vhost_ring,
515 		      "Queue %td - RING: buffer_id=%"PRIu16"\n",
516 		      virtqueue - vsession->virtqueue, buffer_id);
517 
518 	/* When a descriptor has been used, its avail flag and used flag
519 	 * are set to the same value, and that value equals the
520 	 * used wrap counter.
521 	 */
522 	used = !!(desc->flags & VRING_DESC_F_USED);
523 	avail = !!(desc->flags & VRING_DESC_F_AVAIL);
524 	if (spdk_unlikely(used == virtqueue->packed.used_phase && used == avail)) {
525 		SPDK_ERRLOG("descriptor has been used before\n");
526 		return;
527 	}
528 
529 	/* In a used desc, addr is unused and len specifies the length of the
530 	 * buffer that has been written to by the device.
531 	 */
532 	desc->addr = 0;
533 	desc->len = length;
534 
535 	/* This bit specifies whether any data has been written by the device */
536 	if (length != 0) {
537 		desc->flags |= VRING_DESC_F_WRITE;
538 	}
539 
540 	/* Buffer ID is included in the last descriptor in the list.
541 	 * The driver needs to keep track of the size of the list corresponding
542 	 * to each buffer ID.
543 	 */
544 	desc->id = buffer_id;
545 
546 	/* A device MUST NOT make the descriptor used before buffer_id is
547 	 * written to the descriptor.
548 	 */
549 	spdk_smp_wmb();
550 
551 	rte_vhost_set_last_inflight_io_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
552 	/* To mark a desc as used, the device sets the F_USED bit in flags to match
553 	 * the internal Device ring wrap counter. It also sets the F_AVAIL bit to
554 	 * match the same value.
555 	 */
556 	if (virtqueue->packed.used_phase) {
557 		desc->flags |= VRING_DESC_F_AVAIL_USED;
558 	} else {
559 		desc->flags &= ~VRING_DESC_F_AVAIL_USED;
560 	}
561 	rte_vhost_clr_inflight_desc_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
562 
563 	vhost_log_used_vring_elem(vsession, virtqueue, virtqueue->last_used_idx);
564 	virtqueue->last_used_idx += num_descs;
565 	if (virtqueue->last_used_idx >= virtqueue->vring.size) {
566 		virtqueue->last_used_idx -= virtqueue->vring.size;
567 		virtqueue->packed.used_phase = !virtqueue->packed.used_phase;
568 	}
569 
570 	virtqueue->used_req_cnt++;
571 }
572 
573 bool
574 vhost_vq_packed_ring_is_avail(struct spdk_vhost_virtqueue *virtqueue)
575 {
576 	uint16_t flags = virtqueue->vring.desc_packed[virtqueue->last_avail_idx].flags;
577 
578 	/* To mark a desc as available, the driver sets the F_AVAIL bit in flags
579 	 * to match the internal avail wrap counter. It also sets the F_USED bit
580 	 * to the inverse value, but that is not mandatory.
581 	 */
582 	return (!!(flags & VRING_DESC_F_AVAIL) == virtqueue->packed.avail_phase);
583 }
584 
585 bool
586 vhost_vring_packed_desc_is_wr(struct vring_packed_desc *cur_desc)
587 {
588 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
589 }
590 
591 bool
592 vhost_vring_inflight_desc_is_wr(spdk_vhost_inflight_desc *cur_desc)
593 {
594 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
595 }
596 
597 int
598 vhost_vring_packed_desc_get_next(struct vring_packed_desc **desc, uint16_t *req_idx,
599 				 struct spdk_vhost_virtqueue *vq,
600 				 struct vring_packed_desc *desc_table,
601 				 uint32_t desc_table_size)
602 {
603 	if (desc_table != NULL) {
604 		/* When desc_table isn't NULL, the desc is indirect and we get the next
605 		 * desc by req_idx and desc_table_size. A NULL result means we have
606 		 * reached the last desc of this request.
607 		 */
608 		(*req_idx)++;
609 		if (*req_idx < desc_table_size) {
610 			*desc = &desc_table[*req_idx];
611 		} else {
612 			*desc = NULL;
613 		}
614 	} else {
615 		/* When desc_table is NULL, the desc is non-indirect and we get the next
616 		 * desc by req_idx and the F_NEXT flag. A NULL result means we have
617 		 * reached the last desc of this request. When a new desc is returned,
618 		 * req_idx is updated as well.
619 		 */
619 		 */
620 		if (((*desc)->flags & VRING_DESC_F_NEXT) == 0) {
621 			*desc = NULL;
622 			return 0;
623 		}
624 
625 		*req_idx = (*req_idx + 1) % vq->vring.size;
626 		*desc = &vq->vring.desc_packed[*req_idx];
627 	}
628 
629 	return 0;
630 }
631 
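/*
 * Convert a guest buffer (guest physical address + length) into host iovecs.
 * A single guest-contiguous buffer may span several host mappings, so the
 * translation is done chunk by chunk: each rte_vhost_va_from_guest_pa() call
 * may shrink `len` to the size of the contiguous piece it could map, and the
 * loop continues until the whole payload is covered or SPDK_VHOST_IOVS_MAX
 * iovecs have been used.
 */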
632 static int
633 vhost_vring_desc_payload_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
634 				uint16_t *iov_index, uintptr_t payload, uint64_t remaining)
635 {
636 	uintptr_t vva;
637 	uint64_t len;
638 
639 	do {
640 		if (*iov_index >= SPDK_VHOST_IOVS_MAX) {
641 			SPDK_ERRLOG("SPDK_VHOST_IOVS_MAX(%d) reached\n", SPDK_VHOST_IOVS_MAX);
642 			return -1;
643 		}
644 		len = remaining;
645 		vva = (uintptr_t)rte_vhost_va_from_guest_pa(vsession->mem, payload, &len);
646 		if (vva == 0 || len == 0) {
647 			SPDK_ERRLOG("gpa_to_vva(%p) == NULL\n", (void *)payload);
648 			return -1;
649 		}
650 		iov[*iov_index].iov_base = (void *)vva;
651 		iov[*iov_index].iov_len = len;
652 		remaining -= len;
653 		payload += len;
654 		(*iov_index)++;
655 	} while (remaining);
656 
657 	return 0;
658 }
659 
660 int
661 vhost_vring_packed_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
662 			       uint16_t *iov_index, const struct vring_packed_desc *desc)
663 {
664 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
665 					       desc->addr, desc->len);
666 }
667 
668 int
669 vhost_vring_inflight_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
670 				 uint16_t *iov_index, const spdk_vhost_inflight_desc *desc)
671 {
672 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
673 					       desc->addr, desc->len);
674 }
675 
676 /* 1. Traverse the desc chain to get the buffer_id and return it as the task_idx.
677  * 2. Update vq->last_avail_idx to point to the next available desc chain.
678  * 3. Flip the avail_wrap_counter if last_avail_idx wraps around.
679  */
680 uint16_t
681 vhost_vring_packed_desc_get_buffer_id(struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
682 				      uint16_t *num_descs)
683 {
684 	struct vring_packed_desc *desc;
685 	uint16_t desc_head = req_idx;
686 
687 	*num_descs = 1;
688 
689 	desc =  &vq->vring.desc_packed[req_idx];
690 	if (!vhost_vring_packed_desc_is_indirect(desc)) {
691 		while ((desc->flags & VRING_DESC_F_NEXT) != 0) {
692 			req_idx = (req_idx + 1) % vq->vring.size;
693 			desc = &vq->vring.desc_packed[req_idx];
694 			(*num_descs)++;
695 		}
696 	}
697 
698 	/* The queue size doesn't have to be a power of 2.
699 	 * The device maintains last_avail_idx, so we can make sure
700 	 * the value stays valid (0 ~ vring.size - 1).
701 	 */
702 	vq->last_avail_idx = (req_idx + 1) % vq->vring.size;
703 	if (vq->last_avail_idx < desc_head) {
704 		vq->packed.avail_phase = !vq->packed.avail_phase;
705 	}
706 
707 	return desc->id;
708 }
709 
710 int
711 vhost_vring_desc_get_next(struct vring_desc **desc,
712 			  struct vring_desc *desc_table, uint32_t desc_table_size)
713 {
714 	struct vring_desc *old_desc = *desc;
715 	uint16_t next_idx;
716 
717 	if ((old_desc->flags & VRING_DESC_F_NEXT) == 0) {
718 		*desc = NULL;
719 		return 0;
720 	}
721 
722 	next_idx = old_desc->next;
723 	if (spdk_unlikely(next_idx >= desc_table_size)) {
724 		*desc = NULL;
725 		return -1;
726 	}
727 
728 	*desc = &desc_table[next_idx];
729 	return 0;
730 }
731 
732 int
733 vhost_vring_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
734 			uint16_t *iov_index, const struct vring_desc *desc)
735 {
736 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
737 					       desc->addr, desc->len);
738 }
739 
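/*
 * Round each guest memory region out to 2 MB boundaries for registration with
 * the SPDK memory map. `previous_start` is used to skip the first 2 MB chunk
 * when consecutive regions share a hugepage, so the same range is not
 * registered (or unregistered) twice.
 */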
740 static inline void
741 vhost_session_mem_region_calc(uint64_t *previous_start, uint64_t *start, uint64_t *end,
742 			      uint64_t *len, struct rte_vhost_mem_region *region)
743 {
744 	*start = FLOOR_2MB(region->mmap_addr);
745 	*end = CEIL_2MB(region->mmap_addr + region->mmap_size);
746 	if (*start == *previous_start) {
747 		*start += (size_t) VALUE_2MB;
748 	}
749 	*previous_start = *start;
750 	*len = *end - *start;
751 }
752 
753 void
754 vhost_session_mem_register(struct rte_vhost_memory *mem)
755 {
756 	uint64_t start, end, len;
757 	uint32_t i;
758 	uint64_t previous_start = UINT64_MAX;
759 
760 
761 	for (i = 0; i < mem->nregions; i++) {
762 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
763 		SPDK_INFOLOG(vhost, "Registering VM memory for vtophys translation - 0x%jx len:0x%jx\n",
764 			     start, len);
765 
766 		if (spdk_mem_register((void *)start, len) != 0) {
767 			SPDK_WARNLOG("Failed to register memory region %"PRIu32". Future vtophys translation might fail.\n",
768 				     i);
769 			continue;
770 		}
771 	}
772 }
773 
774 void
775 vhost_session_mem_unregister(struct rte_vhost_memory *mem)
776 {
777 	uint64_t start, end, len;
778 	uint32_t i;
779 	uint64_t previous_start = UINT64_MAX;
780 
781 	for (i = 0; i < mem->nregions; i++) {
782 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
783 		if (spdk_vtophys((void *) start, NULL) == SPDK_VTOPHYS_ERROR) {
784 			continue; /* region has not been registered */
785 		}
786 
787 		if (spdk_mem_unregister((void *)start, len) != 0) {
788 			assert(false);
789 		}
790 	}
791 }
792 
793 static bool
794 vhost_memory_changed(struct rte_vhost_memory *new,
795 		     struct rte_vhost_memory *old)
796 {
797 	uint32_t i;
798 
799 	if (new->nregions != old->nregions) {
800 		return true;
801 	}
802 
803 	for (i = 0; i < new->nregions; ++i) {
804 		struct rte_vhost_mem_region *new_r = &new->regions[i];
805 		struct rte_vhost_mem_region *old_r = &old->regions[i];
806 
807 		if (new_r->guest_phys_addr != old_r->guest_phys_addr) {
808 			return true;
809 		}
810 		if (new_r->size != old_r->size) {
811 			return true;
812 		}
813 		if (new_r->guest_user_addr != old_r->guest_user_addr) {
814 			return true;
815 		}
816 		if (new_r->mmap_addr != old_r->mmap_addr) {
817 			return true;
818 		}
819 		if (new_r->fd != old_r->fd) {
820 			return true;
821 		}
822 	}
823 
824 	return false;
825 }
826 
827 static int
828 vhost_register_memtable_if_required(struct spdk_vhost_session *vsession, int vid)
829 {
830 	struct rte_vhost_memory *new_mem;
831 
832 	if (vhost_get_mem_table(vid, &new_mem) != 0) {
833 		SPDK_ERRLOG("vhost device %d: Failed to get guest memory table\n", vid);
834 		return -1;
835 	}
836 
837 	if (vsession->mem == NULL) {
838 		SPDK_INFOLOG(vhost, "Setting memtable for the first time\n");
839 		vsession->mem = new_mem;
840 		vhost_session_mem_register(vsession->mem);
841 		return 0;
842 	}
843 
844 	if (vhost_memory_changed(new_mem, vsession->mem)) {
845 		SPDK_INFOLOG(vhost, "Memtable has changed\n");
846 		vhost_session_mem_unregister(vsession->mem);
847 		free(vsession->mem);
848 
849 		vsession->mem = new_mem;
850 		vhost_session_mem_register(vsession->mem);
851 		return 0;
852 
853 	}
854 
855 	SPDK_INFOLOG(vhost, "Memtable is unchanged\n");
856 	free(new_mem);
857 	return 0;
858 }
859 
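/*
 * Stop the session on its poll thread, then persist the ring positions back
 * to rte_vhost so it can report them to the vhost-user master (e.g. in its
 * GET_VRING_BASE reply). For packed rings the wrap counters are folded into
 * bit 15 of the indexes, e.g. last_avail_idx == 0x0123 with avail_phase == 1
 * is stored as 0x8123.
 */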
860 static int
861 _stop_session(struct spdk_vhost_session *vsession)
862 {
863 	struct spdk_vhost_virtqueue *q;
864 	int rc;
865 	uint16_t i;
866 
867 	rc = vhost_user_wait_for_session_stop(vsession, 3, "stop session");
868 	if (rc != 0) {
869 		SPDK_ERRLOG("Couldn't stop device with vid %d.\n", vsession->vid);
870 		return rc;
871 	}
872 
873 	for (i = 0; i < vsession->max_queues; i++) {
874 		q = &vsession->virtqueue[i];
875 
876 		/* vring.desc and vring.desc_packed are members of a union,
877 		 * so checking q->vring.desc also covers q->vring.desc_packed.
878 		 */
879 		if (q->vring.desc == NULL) {
880 			continue;
881 		}
882 
883 		/* Packed virtqueues support up to 2^15 entries each,
884 		 * so the leftover top bit can be used as the wrap counter.
885 		 */
886 		if (q->packed.packed_ring) {
887 			q->last_avail_idx = q->last_avail_idx |
888 					    ((uint16_t)q->packed.avail_phase << 15);
889 			q->last_used_idx = q->last_used_idx |
890 					   ((uint16_t)q->packed.used_phase << 15);
891 		}
892 
893 		rte_vhost_set_vring_base(vsession->vid, i, q->last_avail_idx, q->last_used_idx);
894 		q->vring.desc = NULL;
895 	}
896 	vsession->max_queues = 0;
897 
898 	return 0;
899 }
900 
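/*
 * Called by rte_vhost when a new socket connection is accepted. rte_vhost
 * reports the socket path as the interface name, so the configured socket
 * directory prefix is stripped off to recover the SPDK controller name.
 */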
901 static int
902 new_connection(int vid)
903 {
904 	struct spdk_vhost_dev *vdev;
905 	struct spdk_vhost_user_dev *user_dev;
906 	struct spdk_vhost_session *vsession;
907 	size_t dev_dirname_len;
908 	char ifname[PATH_MAX];
909 	char *ctrlr_name;
910 
911 	if (rte_vhost_get_ifname(vid, ifname, PATH_MAX) < 0) {
912 		SPDK_ERRLOG("Couldn't get a valid ifname for device with vid %d\n", vid);
913 		return -1;
914 	}
915 
916 	ctrlr_name = &ifname[0];
917 	dev_dirname_len = strlen(g_vhost_user_dev_dirname);
918 	if (strncmp(ctrlr_name, g_vhost_user_dev_dirname, dev_dirname_len) == 0) {
919 		ctrlr_name += dev_dirname_len;
920 	}
921 
922 	spdk_vhost_lock();
923 	vdev = spdk_vhost_dev_find(ctrlr_name);
924 	if (vdev == NULL) {
925 		SPDK_ERRLOG("Couldn't find device with vid %d to create connection for.\n", vid);
926 		spdk_vhost_unlock();
927 		return -1;
928 	}
929 	spdk_vhost_unlock();
930 
931 	user_dev = to_user_dev(vdev);
932 	pthread_mutex_lock(&user_dev->lock);
933 	if (user_dev->registered == false) {
934 		SPDK_ERRLOG("Device %s is unregistered\n", ctrlr_name);
935 		pthread_mutex_unlock(&user_dev->lock);
936 		return -1;
937 	}
938 
939 	/* We expect sessions inside user_dev->vsessions to be sorted in ascending
940 	 * order with regard to vsession->id. For now we always set id = vsessions_num++
941 	 * and append each session to the very end of the vsessions list.
942 	 * This is required for vhost_user_dev_foreach_session() to work.
943 	 */
944 	if (user_dev->vsessions_num == UINT_MAX) {
945 		pthread_mutex_unlock(&user_dev->lock);
946 		assert(false);
947 		return -EINVAL;
948 	}
949 
950 	if (posix_memalign((void **)&vsession, SPDK_CACHE_LINE_SIZE, sizeof(*vsession) +
951 			   user_dev->user_backend->session_ctx_size)) {
952 		SPDK_ERRLOG("vsession alloc failed\n");
953 		pthread_mutex_unlock(&user_dev->lock);
954 		return -1;
955 	}
956 	memset(vsession, 0, sizeof(*vsession) + user_dev->user_backend->session_ctx_size);
957 
958 	vsession->vdev = vdev;
959 	vsession->vid = vid;
960 	vsession->id = user_dev->vsessions_num++;
961 	vsession->name = spdk_sprintf_alloc("%ss%u", vdev->name, vsession->vid);
962 	if (vsession->name == NULL) {
963 		SPDK_ERRLOG("vsession alloc failed\n");
964 		free(vsession);
965 		pthread_mutex_unlock(&user_dev->lock);
966 		return -1;
967 	}
968 	vsession->started = false;
969 	vsession->starting = false;
970 	vsession->next_stats_check_time = 0;
971 	vsession->stats_check_interval = SPDK_VHOST_STATS_CHECK_INTERVAL_MS *
972 					 spdk_get_ticks_hz() / 1000UL;
973 	TAILQ_INSERT_TAIL(&user_dev->vsessions, vsession, tailq);
974 	vhost_session_install_rte_compat_hooks(vsession);
975 	pthread_mutex_unlock(&user_dev->lock);
976 
977 	return 0;
978 }
979 
980 static void
981 vhost_user_session_start(void *arg1)
982 {
983 	struct spdk_vhost_session *vsession = arg1;
984 	struct spdk_vhost_dev *vdev = vsession->vdev;
985 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
986 	const struct spdk_vhost_user_dev_backend *backend;
987 	int rc;
988 
989 	SPDK_INFOLOG(vhost, "Starting new session for device %s with vid %d\n", vdev->name, vsession->vid);
990 	pthread_mutex_lock(&user_dev->lock);
991 	vsession->starting = false;
992 	backend = user_dev->user_backend;
993 	rc = backend->start_session(vdev, vsession, NULL);
994 	if (rc == 0) {
995 		vsession->started = true;
996 	}
997 	pthread_mutex_unlock(&user_dev->lock);
998 }
999 
1000 static int
1001 set_device_vq_callfd(struct spdk_vhost_session *vsession, uint16_t qid)
1002 {
1003 	struct spdk_vhost_virtqueue *q;
1004 
1005 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1006 		return -EINVAL;
1007 	}
1008 
1009 	q = &vsession->virtqueue[qid];
1010 	/* vq isn't enabled yet */
1011 	if (q->vring_idx != qid) {
1012 		return 0;
1013 	}
1014 
1015 	/* vring.desc and vring.desc_packed are members of a union,
1016 	 * so checking q->vring.desc also covers q->vring.desc_packed.
1017 	 */
1018 	if (q->vring.desc == NULL || q->vring.size == 0) {
1019 		return 0;
1020 	}
1021 
1022 	/*
1023 	 * Not sure right now, but this looks like some kind of QEMU bug: guest I/O
1024 	 * might be frozen after live-migration unless all queues are kicked. It looks like
1025 	 * the previous vhost instance failed to effectively deliver all interrupts before
1026 	 * the GET_VRING_BASE message. This shouldn't harm the guest, since spurious interrupts
1027 	 * should be ignored by the guest virtio driver.
1028 	 *
1029 	 * Tested on QEMU 2.10.91 and 2.11.50.
1030 	 *
1031 	 * Make sure a successful call of
1032 	 * `rte_vhost_vring_call` will happen
1033 	 * after starting the device.
1034 	 */
1035 	q->used_req_cnt += 1;
1036 
1037 	return 0;
1038 }
1039 
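/*
 * Called when SET_VRING_KICK is received for a queue: fetch the vring and
 * inflight region from rte_vhost, restore the last avail/used indexes (from
 * the inflight area for packed rings), allocate the backend's per-queue task
 * pool and configure notification suppression depending on whether SPDK runs
 * in polling or interrupt mode.
 */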
1040 static int
1041 enable_device_vq(struct spdk_vhost_session *vsession, uint16_t qid)
1042 {
1043 	struct spdk_vhost_virtqueue *q;
1044 	bool packed_ring;
1045 	const struct spdk_vhost_user_dev_backend *backend;
1046 	int rc;
1047 
1048 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1049 		return -EINVAL;
1050 	}
1051 
1052 	q = &vsession->virtqueue[qid];
1053 	memset(q, 0, sizeof(*q));
1054 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
1055 
1056 	q->vsession = vsession;
1057 	q->vring_idx = -1;
1058 	if (rte_vhost_get_vhost_vring(vsession->vid, qid, &q->vring)) {
1059 		return 0;
1060 	}
1061 	q->vring_idx = qid;
1062 	rte_vhost_get_vhost_ring_inflight(vsession->vid, qid, &q->vring_inflight);
1063 
1064 	/* vring.desc and vring.desc_packed are members of a union,
1065 	 * so checking q->vring.desc also covers q->vring.desc_packed.
1066 	 */
1067 	if (q->vring.desc == NULL || q->vring.size == 0) {
1068 		return 0;
1069 	}
1070 
1071 	if (rte_vhost_get_vring_base(vsession->vid, qid, &q->last_avail_idx, &q->last_used_idx)) {
1072 		q->vring.desc = NULL;
1073 		return 0;
1074 	}
1075 
1076 	backend = to_user_dev(vsession->vdev)->user_backend;
1077 	rc = backend->alloc_vq_tasks(vsession, qid);
1078 	if (rc) {
1079 		return rc;
1080 	}
1081 
1082 	if (packed_ring) {
1083 		/* Since the packed ring flag has already been negotiated between SPDK and the VM,
1084 		 * the VM doesn't restore `last_avail_idx` and `last_used_idx` for the packed ring,
1085 		 * so use the inflight mem to restore them instead.
1086 		 */
1087 		rte_vhost_get_vring_base_from_inflight(vsession->vid, qid, &q->last_avail_idx,
1088 						       &q->last_used_idx);
1089 
1090 		/* Packed virtqueues support up to 2^15 entries each,
1091 		 * so the leftover top bit can be used as the wrap counter.
1092 		 */
1093 		q->packed.avail_phase = q->last_avail_idx >> 15;
1094 		q->last_avail_idx = q->last_avail_idx & 0x7FFF;
1095 		q->packed.used_phase = q->last_used_idx >> 15;
1096 		q->last_used_idx = q->last_used_idx & 0x7FFF;
1097 
1098 		if (!spdk_interrupt_mode_is_enabled()) {
1099 			/* Disable I/O submission notifications, we'll be polling. */
1100 			q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1101 		} else {
1102 			/* Enable I/O submission notifications, we'll be interrupting. */
1103 			q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_ENABLE;
1104 		}
1105 	} else {
1106 		if (!spdk_interrupt_mode_is_enabled()) {
1107 			/* Disable I/O submission notifications, we'll be polling. */
1108 			q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1109 		} else {
1110 			/* Enable I/O submission notifications, we'll be interrupting. */
1111 			q->vring.used->flags = 0;
1112 		}
1113 	}
1114 
1115 	if (spdk_interrupt_mode_is_enabled() && backend->register_vq_interrupt) {
1116 		backend->register_vq_interrupt(vsession, q);
1117 	}
1118 
1119 	q->packed.packed_ring = packed_ring;
1120 	vsession->max_queues = spdk_max(vsession->max_queues, qid + 1);
1121 
1122 	return 0;
1123 }
1124 
1125 static int
1126 start_device(int vid)
1127 {
1128 	struct spdk_vhost_dev *vdev;
1129 	struct spdk_vhost_session *vsession;
1130 	struct spdk_vhost_user_dev *user_dev;
1131 	int rc = 0;
1132 
1133 	vsession = vhost_session_find_by_vid(vid);
1134 	if (vsession == NULL) {
1135 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1136 		return -1;
1137 	}
1138 	vdev = vsession->vdev;
1139 	user_dev = to_user_dev(vdev);
1140 
1141 	pthread_mutex_lock(&user_dev->lock);
1142 	if (vsession->started) {
1143 		/* already started, nothing to do */
1144 		goto out;
1145 	}
1146 
1147 	if (!vsession->mem) {
1148 		rc = -1;
1149 		SPDK_ERRLOG("Session %s hasn't set up its memory table yet\n", vsession->name);
1150 		goto out;
1151 	}
1152 
1153 	vsession->starting = true;
1154 	SPDK_INFOLOG(vhost, "Session %s is scheduled to start\n", vsession->name);
1155 	vhost_user_session_set_coalescing(vdev, vsession, NULL);
1156 	spdk_thread_send_msg(vdev->thread, vhost_user_session_start, vsession);
1157 
1158 out:
1159 	pthread_mutex_unlock(&user_dev->lock);
1160 	return rc;
1161 }
1162 
1163 static void
1164 stop_device(int vid)
1165 {
1166 	struct spdk_vhost_session *vsession;
1167 	struct spdk_vhost_user_dev *user_dev;
1168 
1169 	vsession = vhost_session_find_by_vid(vid);
1170 	if (vsession == NULL) {
1171 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1172 		return;
1173 	}
1174 	user_dev = to_user_dev(vsession->vdev);
1175 
1176 	pthread_mutex_lock(&user_dev->lock);
1177 	if (!vsession->started && !vsession->starting) {
1178 		pthread_mutex_unlock(&user_dev->lock);
1179 		/* already stopped, nothing to do */
1180 		return;
1181 	}
1182 
1183 	_stop_session(vsession);
1184 	pthread_mutex_unlock(&user_dev->lock);
1185 }
1186 
1187 static void
1188 destroy_connection(int vid)
1189 {
1190 	struct spdk_vhost_session *vsession;
1191 	struct spdk_vhost_user_dev *user_dev;
1192 
1193 	vsession = vhost_session_find_by_vid(vid);
1194 	if (vsession == NULL) {
1195 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1196 		return;
1197 	}
1198 	user_dev = to_user_dev(vsession->vdev);
1199 
1200 	pthread_mutex_lock(&user_dev->lock);
1201 	if (vsession->started || vsession->starting) {
1202 		if (_stop_session(vsession) != 0) {
1203 			pthread_mutex_unlock(&user_dev->lock);
1204 			return;
1205 		}
1206 	}
1207 
1208 	if (vsession->mem) {
1209 		vhost_session_mem_unregister(vsession->mem);
1210 		free(vsession->mem);
1211 	}
1212 
1213 	TAILQ_REMOVE(&to_user_dev(vsession->vdev)->vsessions, vsession, tailq);
1214 	free(vsession->name);
1215 	free(vsession);
1216 	pthread_mutex_unlock(&user_dev->lock);
1217 }
1218 
1219 #if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
1220 static const struct rte_vhost_device_ops g_spdk_vhost_ops = {
1221 #else
1222 static const struct vhost_device_ops g_spdk_vhost_ops = {
1223 #endif
1224 	.new_device =  start_device,
1225 	.destroy_device = stop_device,
1226 	.new_connection = new_connection,
1227 	.destroy_connection = destroy_connection,
1228 };
1229 
1230 static struct spdk_vhost_session *
1231 vhost_session_find_by_id(struct spdk_vhost_dev *vdev, unsigned id)
1232 {
1233 	struct spdk_vhost_session *vsession;
1234 
1235 	TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1236 		if (vsession->id == id) {
1237 			return vsession;
1238 		}
1239 	}
1240 
1241 	return NULL;
1242 }
1243 
1244 struct spdk_vhost_session *
1245 vhost_session_find_by_vid(int vid)
1246 {
1247 	struct spdk_vhost_dev *vdev;
1248 	struct spdk_vhost_session *vsession;
1249 	struct spdk_vhost_user_dev *user_dev;
1250 
1251 	spdk_vhost_lock();
1252 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1253 	     vdev = spdk_vhost_dev_next(vdev)) {
1254 		user_dev = to_user_dev(vdev);
1255 
1256 		pthread_mutex_lock(&user_dev->lock);
1257 		TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1258 			if (vsession->vid == vid) {
1259 				pthread_mutex_unlock(&user_dev->lock);
1260 				spdk_vhost_unlock();
1261 				return vsession;
1262 			}
1263 		}
1264 		pthread_mutex_unlock(&user_dev->lock);
1265 	}
1266 	spdk_vhost_unlock();
1267 
1268 	return NULL;
1269 }
1270 
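/*
 * Block the DPDK-internal pthread until the SPDK thread handling the event
 * posts g_dpdk_sem (see vhost_user_session_stop_done()). If the timeout hits,
 * log an error and keep waiting so the semaphore count stays balanced.
 */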
1271 static void
1272 wait_for_semaphore(int timeout_sec, const char *errmsg)
1273 {
1274 	struct timespec timeout;
1275 	int rc;
1276 
1277 	clock_gettime(CLOCK_REALTIME, &timeout);
1278 	timeout.tv_sec += timeout_sec;
1279 	rc = sem_timedwait(&g_dpdk_sem, &timeout);
1280 	if (rc != 0) {
1281 		SPDK_ERRLOG("Timeout waiting for event: %s.\n", errmsg);
1282 		sem_wait(&g_dpdk_sem);
1283 	}
1284 }
1285 
1286 void
1287 vhost_user_session_stop_done(struct spdk_vhost_session *vsession, int response)
1288 {
1289 	if (response == 0) {
1290 		vsession->started = false;
1291 	}
1292 
1293 	g_dpdk_response = response;
1294 	sem_post(&g_dpdk_sem);
1295 }
1296 
1297 static void
1298 vhost_user_session_stop_event(void *arg1)
1299 {
1300 	struct vhost_session_fn_ctx *ctx = arg1;
1301 	struct spdk_vhost_dev *vdev = ctx->vdev;
1302 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1303 	struct spdk_vhost_session *vsession;
1304 
1305 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1306 		spdk_thread_send_msg(spdk_get_thread(), vhost_user_session_stop_event, arg1);
1307 		return;
1308 	}
1309 
1310 	vsession = vhost_session_find_by_id(vdev, ctx->vsession_id);
1311 	user_dev->user_backend->stop_session(vdev, vsession, NULL);
1312 	pthread_mutex_unlock(&user_dev->lock);
1313 }
1314 
1315 static int
1316 vhost_user_wait_for_session_stop(struct spdk_vhost_session *vsession,
1317 				 unsigned timeout_sec, const char *errmsg)
1318 {
1319 	struct vhost_session_fn_ctx ev_ctx = {0};
1320 	struct spdk_vhost_dev *vdev = vsession->vdev;
1321 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1322 
1323 	ev_ctx.vdev = vdev;
1324 	ev_ctx.vsession_id = vsession->id;
1325 
1326 	spdk_thread_send_msg(vdev->thread, vhost_user_session_stop_event, &ev_ctx);
1327 
1328 	pthread_mutex_unlock(&user_dev->lock);
1329 	wait_for_semaphore(timeout_sec, errmsg);
1330 	pthread_mutex_lock(&user_dev->lock);
1331 
1332 	return g_dpdk_response;
1333 }
1334 
1335 static void
1336 foreach_session_finish_cb(void *arg1)
1337 {
1338 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1339 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1340 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1341 
1342 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1343 		spdk_thread_send_msg(spdk_get_thread(),
1344 				     foreach_session_finish_cb, arg1);
1345 		return;
1346 	}
1347 
1348 	assert(user_dev->pending_async_op_num > 0);
1349 	user_dev->pending_async_op_num--;
1350 	if (ev_ctx->cpl_fn != NULL) {
1351 		ev_ctx->cpl_fn(vdev, ev_ctx->user_ctx);
1352 	}
1353 
1354 	pthread_mutex_unlock(&user_dev->lock);
1355 	free(ev_ctx);
1356 }
1357 
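/*
 * Runs on the device's SPDK thread. The device lock is taken with trylock and
 * the message is re-sent to the same thread on contention, so a reactor is
 * never blocked waiting for a lock held elsewhere.
 */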
1358 static void
1359 foreach_session(void *arg1)
1360 {
1361 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1362 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1363 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1364 	struct spdk_vhost_session *vsession;
1365 	int rc;
1366 
1367 	if (pthread_mutex_trylock(&user_dev->lock) != 0) {
1368 		spdk_thread_send_msg(spdk_get_thread(), foreach_session, arg1);
1369 		return;
1370 	}
1371 
1372 	TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1373 		rc = ev_ctx->cb_fn(vdev, vsession, ev_ctx->user_ctx);
1374 		if (rc < 0) {
1375 			goto out;
1376 		}
1377 	}
1378 
1379 out:
1380 	pthread_mutex_unlock(&user_dev->lock);
1381 	spdk_thread_send_msg(g_vhost_user_init_thread, foreach_session_finish_cb, arg1);
1382 }
1383 
1384 void
1385 vhost_user_dev_foreach_session(struct spdk_vhost_dev *vdev,
1386 			       spdk_vhost_session_fn fn,
1387 			       spdk_vhost_dev_fn cpl_fn,
1388 			       void *arg)
1389 {
1390 	struct vhost_session_fn_ctx *ev_ctx;
1391 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1392 
1393 	ev_ctx = calloc(1, sizeof(*ev_ctx));
1394 	if (ev_ctx == NULL) {
1395 		SPDK_ERRLOG("Failed to alloc vhost event.\n");
1396 		assert(false);
1397 		return;
1398 	}
1399 
1400 	ev_ctx->vdev = vdev;
1401 	ev_ctx->cb_fn = fn;
1402 	ev_ctx->cpl_fn = cpl_fn;
1403 	ev_ctx->user_ctx = arg;
1404 
1405 	pthread_mutex_lock(&user_dev->lock);
1406 	assert(user_dev->pending_async_op_num < UINT32_MAX);
1407 	user_dev->pending_async_op_num++;
1408 	pthread_mutex_unlock(&user_dev->lock);
1409 
1410 	spdk_thread_send_msg(vdev->thread, foreach_session, ev_ctx);
1411 }
1412 
1413 void
1414 vhost_user_session_set_interrupt_mode(struct spdk_vhost_session *vsession, bool interrupt_mode)
1415 {
1416 	uint16_t i;
1417 	int rc = 0;
1418 
1419 	for (i = 0; i < vsession->max_queues; i++) {
1420 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1421 		uint64_t num_events = 1;
1422 
1423 		/* vring.desc and vring.desc_packed are members of a union,
1424 		 * so checking q->vring.desc also covers q->vring.desc_packed.
1425 		 */
1426 		if (q->vring.desc == NULL || q->vring.size == 0) {
1427 			continue;
1428 		}
1429 
1430 		if (interrupt_mode) {
1431 
1432 			/* In case of a race condition, always kick the vring when switching to interrupt mode */
1433 			rc = write(q->vring.kickfd, &num_events, sizeof(num_events));
1434 			if (rc < 0) {
1435 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
1436 			}
1437 
1438 			vsession->interrupt_mode = true;
1439 		} else {
1440 
1441 			vsession->interrupt_mode = false;
1442 		}
1443 	}
1444 }
1445 
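/*
 * External message hooks registered with rte_vhost. The pre-handler runs
 * before rte_vhost processes a vhost-user message and is used to stop the
 * device synchronously on GET_VRING_BASE and to service GET/SET_CONFIG via
 * the backend callbacks; everything else is passed through as NOT_HANDLED.
 */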
1446 static int
1447 extern_vhost_pre_msg_handler(int vid, void *_msg)
1448 {
1449 	struct vhost_user_msg *msg = _msg;
1450 	struct spdk_vhost_session *vsession;
1451 	struct spdk_vhost_user_dev *user_dev;
1452 
1453 	vsession = vhost_session_find_by_vid(vid);
1454 	if (vsession == NULL) {
1455 		SPDK_ERRLOG("Received a message to uninitialized session (vid %d).\n", vid);
1456 		assert(false);
1457 		return RTE_VHOST_MSG_RESULT_ERR;
1458 	}
1459 	user_dev = to_user_dev(vsession->vdev);
1460 
1461 	switch (msg->request) {
1462 	case VHOST_USER_GET_VRING_BASE:
1463 		pthread_mutex_lock(&user_dev->lock);
1464 		if (vsession->started) {
1465 			pthread_mutex_unlock(&user_dev->lock);
1466 			/* `stop_device` runs synchronously; it will take
1467 			 * this lock again before exiting.
1468 			 */
1469 			g_spdk_vhost_ops.destroy_device(vid);
1470 		}
1471 		pthread_mutex_unlock(&user_dev->lock);
1472 		break;
1473 	case VHOST_USER_GET_CONFIG: {
1474 		int rc = 0;
1475 
1476 		pthread_mutex_lock(&user_dev->lock);
1477 		if (vsession->vdev->backend->vhost_get_config) {
1478 			rc = vsession->vdev->backend->vhost_get_config(vsession->vdev,
1479 					msg->payload.cfg.region, msg->payload.cfg.size);
1480 			if (rc != 0) {
1481 				msg->size = 0;
1482 			}
1483 		}
1484 		pthread_mutex_unlock(&user_dev->lock);
1485 
1486 		return RTE_VHOST_MSG_RESULT_REPLY;
1487 	}
1488 	case VHOST_USER_SET_CONFIG: {
1489 		int rc = 0;
1490 
1491 		pthread_mutex_lock(&user_dev->lock);
1492 		if (vsession->vdev->backend->vhost_set_config) {
1493 			rc = vsession->vdev->backend->vhost_set_config(vsession->vdev,
1494 					msg->payload.cfg.region, msg->payload.cfg.offset,
1495 					msg->payload.cfg.size, msg->payload.cfg.flags);
1496 		}
1497 		pthread_mutex_unlock(&user_dev->lock);
1498 
1499 		return rc == 0 ? RTE_VHOST_MSG_RESULT_OK : RTE_VHOST_MSG_RESULT_ERR;
1500 	}
1501 	default:
1502 		break;
1503 	}
1504 
1505 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1506 }
1507 
1508 static int
1509 extern_vhost_post_msg_handler(int vid, void *_msg)
1510 {
1511 	struct vhost_user_msg *msg = _msg;
1512 	struct spdk_vhost_session *vsession;
1513 	struct spdk_vhost_user_dev *user_dev;
1514 	uint16_t qid;
1515 	int rc;
1516 
1517 	vsession = vhost_session_find_by_vid(vid);
1518 	if (vsession == NULL) {
1519 		SPDK_ERRLOG("Received a message to uninitialized session (vid %d).\n", vid);
1520 		assert(false);
1521 		return RTE_VHOST_MSG_RESULT_ERR;
1522 	}
1523 	user_dev = to_user_dev(vsession->vdev);
1524 
1525 	if (msg->request == VHOST_USER_SET_MEM_TABLE) {
1526 		vhost_register_memtable_if_required(vsession, vid);
1527 	}
1528 
1529 	switch (msg->request) {
1530 	case VHOST_USER_SET_FEATURES:
1531 		rc = vhost_get_negotiated_features(vid, &vsession->negotiated_features);
1532 		if (rc) {
1533 			SPDK_ERRLOG("vhost device %d: Failed to get negotiated driver features\n", vid);
1534 			return RTE_VHOST_MSG_RESULT_ERR;
1535 		}
1536 		break;
1537 	case VHOST_USER_SET_VRING_CALL:
1538 		qid = (uint16_t)msg->payload.u64;
1539 		rc = set_device_vq_callfd(vsession, qid);
1540 		if (rc) {
1541 			return RTE_VHOST_MSG_RESULT_ERR;
1542 		}
1543 		break;
1544 	case VHOST_USER_SET_VRING_KICK:
1545 		qid = (uint16_t)msg->payload.u64;
1546 		rc = enable_device_vq(vsession, qid);
1547 		if (rc) {
1548 			return RTE_VHOST_MSG_RESULT_ERR;
1549 		}
1550 
1551 		/* vhost-user spec tells us to start polling a queue after receiving
1552 		 * its SET_VRING_KICK message. Let's do it!
1553 		 */
1554 		pthread_mutex_lock(&user_dev->lock);
1555 		if (!vsession->started) {
1556 			pthread_mutex_unlock(&user_dev->lock);
1557 			g_spdk_vhost_ops.new_device(vid);
1558 			return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1559 		}
1560 		pthread_mutex_unlock(&user_dev->lock);
1561 		break;
1562 	default:
1563 		break;
1564 	}
1565 
1566 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1567 }
1568 
1569 struct rte_vhost_user_extern_ops g_spdk_extern_vhost_ops = {
1570 	.pre_msg_handle = extern_vhost_pre_msg_handler,
1571 	.post_msg_handle = extern_vhost_post_msg_handler,
1572 };
1573 
1574 void
1575 vhost_session_install_rte_compat_hooks(struct spdk_vhost_session *vsession)
1576 {
1577 	int rc;
1578 
1579 	rc = rte_vhost_extern_callback_register(vsession->vid, &g_spdk_extern_vhost_ops, NULL);
1580 	if (rc != 0) {
1581 		SPDK_ERRLOG("rte_vhost_extern_callback_register() failed for vid = %d\n",
1582 			    vsession->vid);
1583 		return;
1584 	}
1585 }
1586 
1587 int
1588 vhost_register_unix_socket(const char *path, const char *ctrl_name,
1589 			   uint64_t virtio_features, uint64_t disabled_features, uint64_t protocol_features)
1590 {
1591 	struct stat file_stat;
1592 	uint64_t features = 0;
1593 	uint64_t flags = 0;
1594 
1595 	/* Register vhost driver to handle vhost messages. */
1596 	if (stat(path, &file_stat) != -1) {
1597 		if (!S_ISSOCK(file_stat.st_mode)) {
1598 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1599 				    "The file already exists and is not a socket.\n",
1600 				    path);
1601 			return -EIO;
1602 		} else if (unlink(path) != 0) {
1603 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1604 				    "The socket already exists and could not be unlinked.\n",
1605 				    path);
1606 			return -EIO;
1607 		}
1608 	}
1609 
1610 #if RTE_VERSION < RTE_VERSION_NUM(20, 8, 0, 0)
1611 	if (rte_vhost_driver_register(path, flags) != 0) {
1612 #else
1613 	flags = spdk_iommu_is_enabled() ? 0 : RTE_VHOST_USER_ASYNC_COPY;
1614 	if (rte_vhost_driver_register(path, flags) != 0) {
1615 #endif
1616 		SPDK_ERRLOG("Could not register controller %s with vhost library\n", ctrl_name);
1617 		SPDK_ERRLOG("Check if domain socket %s already exists\n", path);
1618 		return -EIO;
1619 	}
1620 	if (rte_vhost_driver_set_features(path, virtio_features) ||
1621 	    rte_vhost_driver_disable_features(path, disabled_features)) {
1622 		SPDK_ERRLOG("Couldn't set vhost features for controller %s\n", ctrl_name);
1623 
1624 		rte_vhost_driver_unregister(path);
1625 		return -EIO;
1626 	}
1627 
1628 	if (rte_vhost_driver_callback_register(path, &g_spdk_vhost_ops) != 0) {
1629 		rte_vhost_driver_unregister(path);
1630 		SPDK_ERRLOG("Couldn't register callbacks for controller %s\n", ctrl_name);
1631 		return -EIO;
1632 	}
1633 
1634 	rte_vhost_driver_get_protocol_features(path, &features);
1635 	features |= protocol_features;
1636 	rte_vhost_driver_set_protocol_features(path, features);
1637 
1638 	if (rte_vhost_driver_start(path) != 0) {
1639 		SPDK_ERRLOG("Failed to start vhost driver for controller %s (%d): %s\n",
1640 			    ctrl_name, errno, spdk_strerror(errno));
1641 		rte_vhost_driver_unregister(path);
1642 		return -EIO;
1643 	}
1644 
1645 	return 0;
1646 }
1647 
1648 int
1649 vhost_get_mem_table(int vid, struct rte_vhost_memory **mem)
1650 {
1651 	return rte_vhost_get_mem_table(vid, mem);
1652 }
1653 
1654 int
1655 vhost_driver_unregister(const char *path)
1656 {
1657 	return rte_vhost_driver_unregister(path);
1658 }
1659 
1660 int
1661 vhost_get_negotiated_features(int vid, uint64_t *negotiated_features)
1662 {
1663 	return rte_vhost_get_negotiated_features(vid, negotiated_features);
1664 }
1665 
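/*
 * Convert the user-facing coalescing parameters into internal units:
 * delay_base_us becomes a tick count and iops_threshold becomes an I/O count
 * per stats interval. For example, assuming SPDK_VHOST_STATS_CHECK_INTERVAL_MS
 * is 10, an iops_threshold of 10000 maps to io_rate == 100 I/Os per interval.
 */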
1666 int
1667 vhost_user_dev_set_coalescing(struct spdk_vhost_user_dev *user_dev, uint32_t delay_base_us,
1668 			      uint32_t iops_threshold)
1669 {
1670 	uint64_t delay_time_base = delay_base_us * spdk_get_ticks_hz() / 1000000ULL;
1671 	uint32_t io_rate = iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1672 
1673 	if (delay_time_base >= UINT32_MAX) {
1674 		SPDK_ERRLOG("Delay time of %"PRIu32" is too big\n", delay_base_us);
1675 		return -EINVAL;
1676 	} else if (io_rate == 0) {
1677 		SPDK_ERRLOG("IOPS rate of %"PRIu32" is too low. Min is %u\n", io_rate,
1678 			    1000U / SPDK_VHOST_STATS_CHECK_INTERVAL_MS);
1679 		return -EINVAL;
1680 	}
1681 
1682 	user_dev->coalescing_delay_us = delay_base_us;
1683 	user_dev->coalescing_iops_threshold = iops_threshold;
1684 	return 0;
1685 }
1686 
1687 int
1688 vhost_user_session_set_coalescing(struct spdk_vhost_dev *vdev,
1689 				  struct spdk_vhost_session *vsession, void *ctx)
1690 {
1691 	vsession->coalescing_delay_time_base =
1692 		to_user_dev(vdev)->coalescing_delay_us * spdk_get_ticks_hz() / 1000000ULL;
1693 	vsession->coalescing_io_rate_threshold =
1694 		to_user_dev(vdev)->coalescing_iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1695 	return 0;
1696 }
1697 
1698 int
1699 vhost_user_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
1700 			  uint32_t iops_threshold)
1701 {
1702 	int rc;
1703 
1704 	rc = vhost_user_dev_set_coalescing(to_user_dev(vdev), delay_base_us, iops_threshold);
1705 	if (rc != 0) {
1706 		return rc;
1707 	}
1708 
1709 	vhost_user_dev_foreach_session(vdev, vhost_user_session_set_coalescing, NULL, NULL);
1710 
1711 	return 0;
1712 }
1713 
1714 void
1715 vhost_user_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
1716 			  uint32_t *iops_threshold)
1717 {
1718 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1719 
1720 	if (delay_base_us) {
1721 		*delay_base_us = user_dev->coalescing_delay_us;
1722 	}
1723 
1724 	if (iops_threshold) {
1725 		*iops_threshold = user_dev->coalescing_iops_threshold;
1726 	}
1727 }
1728 
1729 int
1730 spdk_vhost_set_socket_path(const char *basename)
1731 {
1732 	int ret;
1733 
1734 	if (basename && strlen(basename) > 0) {
1735 		ret = snprintf(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 2, "%s", basename);
1736 		if (ret <= 0) {
1737 			return -EINVAL;
1738 		}
1739 		if ((size_t)ret >= sizeof(g_vhost_user_dev_dirname) - 2) {
1740 			SPDK_ERRLOG("Char dev dir path length %d is too long\n", ret);
1741 			return -EINVAL;
1742 		}
1743 
1744 		if (g_vhost_user_dev_dirname[ret - 1] != '/') {
1745 			g_vhost_user_dev_dirname[ret] = '/';
1746 			g_vhost_user_dev_dirname[ret + 1]  = '\0';
1747 		}
1748 	}
1749 
1750 	return 0;
1751 }
1752 
1753 static void
1754 vhost_dev_thread_exit(void *arg1)
1755 {
1756 	spdk_thread_exit(spdk_get_thread());
1757 }
1758 
1759 static bool g_vhost_user_started = false;
1760 
1761 int
1762 vhost_user_dev_register(struct spdk_vhost_dev *vdev, const char *name, struct spdk_cpuset *cpumask,
1763 			const struct spdk_vhost_user_dev_backend *user_backend)
1764 {
1765 	char path[PATH_MAX];
1766 	struct spdk_vhost_user_dev *user_dev;
1767 
1768 	if (snprintf(path, sizeof(path), "%s%s", g_vhost_user_dev_dirname, name) >= (int)sizeof(path)) {
1769 		SPDK_ERRLOG("Resulting socket path for controller %s is too long: %s%s\n",
1770 			    name, g_vhost_user_dev_dirname, name);
1771 		return -EINVAL;
1772 	}
1773 
1774 	vdev->path = strdup(path);
1775 	if (vdev->path == NULL) {
1776 		return -EIO;
1777 	}
1778 
1779 	user_dev = calloc(1, sizeof(*user_dev));
1780 	if (user_dev == NULL) {
1781 		free(vdev->path);
1782 		return -ENOMEM;
1783 	}
1784 	vdev->ctxt = user_dev;
1785 
1786 	vdev->thread = spdk_thread_create(vdev->name, cpumask);
1787 	if (vdev->thread == NULL) {
1788 		free(user_dev);
1789 		free(vdev->path);
1790 		SPDK_ERRLOG("Failed to create thread for vhost controller %s.\n", name);
1791 		return -EIO;
1792 	}
1793 
1794 	user_dev->user_backend = user_backend;
1795 	user_dev->vdev = vdev;
1796 	user_dev->registered = true;
1797 	TAILQ_INIT(&user_dev->vsessions);
1798 	pthread_mutex_init(&user_dev->lock, NULL);
1799 
1800 	vhost_user_dev_set_coalescing(user_dev, SPDK_VHOST_COALESCING_DELAY_BASE_US,
1801 				      SPDK_VHOST_VQ_IOPS_COALESCING_THRESHOLD);
1802 
1803 	if (vhost_register_unix_socket(path, name, vdev->virtio_features, vdev->disabled_features,
1804 				       vdev->protocol_features)) {
1805 		spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1806 		pthread_mutex_destroy(&user_dev->lock);
1807 		free(user_dev);
1808 		free(vdev->path);
1809 		return -EIO;
1810 	}
1811 
1812 	return 0;
1813 }
1814 
1815 int
1816 vhost_user_dev_unregister(struct spdk_vhost_dev *vdev)
1817 {
1818 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1819 	struct spdk_vhost_session *vsession, *tmp_vsession;
1820 
1821 	pthread_mutex_lock(&user_dev->lock);
1822 	if (user_dev->pending_async_op_num) {
1823 		pthread_mutex_unlock(&user_dev->lock);
1824 		return -EBUSY;
1825 	}
1826 
1827 	/* This is the case where the RPC `vhost_delete_controller` is called while a VM is connected */
1828 	if (!TAILQ_EMPTY(&user_dev->vsessions) && g_vhost_user_started) {
1829 		SPDK_ERRLOG("Controller %s still has valid connections.\n", vdev->name);
1830 		pthread_mutex_unlock(&user_dev->lock);
1831 		return -EBUSY;
1832 	}
1833 
1834 	/* This is the case where the subsystem quits while a VM is still connected;
1835 	 * the VM should be stopped by the shutdown thread.
1836 	 */
1837 	if (!g_vhost_user_started) {
1838 		TAILQ_FOREACH_SAFE(vsession, &user_dev->vsessions, tailq, tmp_vsession) {
1839 			assert(vsession->started == false);
1840 			TAILQ_REMOVE(&user_dev->vsessions, vsession, tailq);
1841 			if (vsession->mem) {
1842 				vhost_session_mem_unregister(vsession->mem);
1843 				free(vsession->mem);
1844 			}
1845 			free(vsession->name);
1846 			free(vsession);
1847 		}
1848 	}
1849 
1850 	user_dev->registered = false;
1851 	pthread_mutex_unlock(&user_dev->lock);
1852 
1853 	/* There are no valid connections now, and it's not an error if the domain
1854 	 * socket was already removed by the shutdown thread.
1855 	 */
1856 	vhost_driver_unregister(vdev->path);
1857 
1858 	spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1859 	pthread_mutex_destroy(&user_dev->lock);
1860 
1861 	free(user_dev);
1862 	free(vdev->path);
1863 
1864 	return 0;
1865 }
1866 
1867 int
1868 vhost_user_init(void)
1869 {
1870 	size_t len;
1871 
1872 	if (g_vhost_user_started) {
1873 		return 0;
1874 	}
1875 
1876 	if (g_vhost_user_dev_dirname[0] == '\0') {
1877 		if (getcwd(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 1) == NULL) {
1878 			SPDK_ERRLOG("getcwd failed (%d): %s\n", errno, spdk_strerror(errno));
1879 			return -1;
1880 		}
1881 
1882 		len = strlen(g_vhost_user_dev_dirname);
1883 		if (g_vhost_user_dev_dirname[len - 1] != '/') {
1884 			g_vhost_user_dev_dirname[len] = '/';
1885 			g_vhost_user_dev_dirname[len + 1] = '\0';
1886 		}
1887 	}
1888 
1889 	g_vhost_user_started = true;
1890 
1891 	g_vhost_user_init_thread = spdk_get_thread();
1892 	assert(g_vhost_user_init_thread != NULL);
1893 
1894 	return 0;
1895 }
1896 
1897 static void
1898 vhost_user_session_shutdown_on_init(void *vhost_cb)
1899 {
1900 	spdk_vhost_fini_cb fn = vhost_cb;
1901 
1902 	fn();
1903 }
1904 
1905 static void *
1906 vhost_user_session_shutdown(void *vhost_cb)
1907 {
1908 	struct spdk_vhost_dev *vdev = NULL;
1909 	struct spdk_vhost_session *vsession;
1910 	struct spdk_vhost_user_dev *user_dev;
1911 
1912 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1913 	     vdev = spdk_vhost_dev_next(vdev)) {
1914 		user_dev = to_user_dev(vdev);
1915 		pthread_mutex_lock(&user_dev->lock);
1916 		TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1917 			if (vsession->started || vsession->starting) {
1918 				_stop_session(vsession);
1919 			}
1920 		}
1921 		pthread_mutex_unlock(&user_dev->lock);
1922 		vhost_driver_unregister(vdev->path);
1923 	}
1924 
1925 	SPDK_INFOLOG(vhost, "Exiting\n");
1926 	spdk_thread_send_msg(g_vhost_user_init_thread, vhost_user_session_shutdown_on_init, vhost_cb);
1927 	return NULL;
1928 }
1929 
1930 void
1931 vhost_user_fini(spdk_vhost_fini_cb vhost_cb)
1932 {
1933 	pthread_t tid;
1934 	int rc;
1935 
1936 	if (!g_vhost_user_started) {
1937 		vhost_cb();
1938 		return;
1939 	}
1940 
1941 	g_vhost_user_started = false;
1942 
1943 	/* rte_vhost API for removing sockets is not asynchronous. Since it may call SPDK
1944 	 * ops for stopping a device or removing a connection, we need to call it from
1945 	 * a separate thread to avoid deadlock.
1946 	 */
1947 	rc = pthread_create(&tid, NULL, &vhost_user_session_shutdown, vhost_cb);
1948 	if (rc < 0) {
1949 		SPDK_ERRLOG("Failed to start session shutdown thread (%d): %s\n", rc, spdk_strerror(rc));
1950 		abort();
1951 	}
1952 	pthread_detach(tid);
1953 }
1954 
1955 void
1956 vhost_session_info_json(struct spdk_vhost_dev *vdev, struct spdk_json_write_ctx *w)
1957 {
1958 	struct spdk_vhost_session *vsession;
1959 	struct spdk_vhost_user_dev *user_dev;
1960 
1961 	user_dev = to_user_dev(vdev);
1962 	pthread_mutex_lock(&user_dev->lock);
1963 	TAILQ_FOREACH(vsession, &user_dev->vsessions, tailq) {
1964 		spdk_json_write_object_begin(w);
1965 		spdk_json_write_named_uint32(w, "vid", vsession->vid);
1966 		spdk_json_write_named_uint32(w, "id", vsession->id);
1967 		spdk_json_write_named_string(w, "name", vsession->name);
1968 		spdk_json_write_named_bool(w, "started", vsession->started);
1969 		spdk_json_write_named_uint32(w, "max_queues", vsession->max_queues);
1970 		spdk_json_write_named_uint32(w, "inflight_task_cnt", vsession->task_cnt);
1971 		spdk_json_write_object_end(w);
1972 	}
1973 	pthread_mutex_unlock(&user_dev->lock);
1974 }
1975