xref: /spdk/lib/vhost/rte_vhost_user.c (revision a6dbe3721eb3b5990707fc3e378c95e505dd8ab5)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2019 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2021 Mellanox Technologies LTD. All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/env.h"
10 #include "spdk/likely.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/memory.h"
14 #include "spdk/barrier.h"
15 #include "spdk/vhost.h"
16 #include "vhost_internal.h"
17 #include <rte_version.h>
18 
19 #include "spdk_internal/vhost_user.h"
20 
21 /* Path to the directory where vhost-user sockets will be created. Can be set by the user. */
22 static char g_vhost_user_dev_dirname[PATH_MAX] = "";
23 
24 static struct spdk_thread *g_vhost_user_init_thread;
25 
26 /**
27  * DPDK calls our callbacks synchronously but the work those callbacks
28  * perform needs to be async. Luckily, all DPDK callbacks are called on
29  * a DPDK-internal pthread, so we'll just wait on a semaphore in there.
30  */
31 static sem_t g_dpdk_sem;
32 
33 /** Return code for the current DPDK callback */
34 static int g_dpdk_response;
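/*
 * Rough sketch of the handshake (see wait_for_semaphore() and
 * vhost_session_cb_done() below): the DPDK callback sends a message to an
 * SPDK thread and then blocks on g_dpdk_sem. The SPDK-side handler
 * eventually calls vhost_session_cb_done(rc), which stores rc in
 * g_dpdk_response and posts the semaphore, letting the DPDK pthread wake up
 * and return that value.
 */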
35 
36 struct vhost_session_fn_ctx {
37 	/** Device pointer obtained before enqueueing the event */
38 	struct spdk_vhost_dev *vdev;
39 
40 	/** ID of the session to send event to. */
41 	uint32_t vsession_id;
42 
43 	/** User provided function to be executed on session's thread. */
44 	spdk_vhost_session_fn cb_fn;
45 
46 	/**
47 	 * User provided function to be called on the init thread
48 	 * after iterating through all sessions.
49 	 */
50 	spdk_vhost_dev_fn cpl_fn;
51 
52 	/** Custom user context */
53 	void *user_ctx;
54 };
55 
56 static struct spdk_vhost_user_dev *
57 to_user_dev(struct spdk_vhost_dev *vdev)
58 {
59 	assert(vdev != NULL);
60 	return vdev->ctxt;
61 }
62 
63 static void
64 __attribute__((constructor))
65 _vhost_user_sem_init(void)
66 {
67 	if (sem_init(&g_dpdk_sem, 0, 0) != 0) {
68 		SPDK_ERRLOG("Failed to initialize semaphore for rte_vhost pthread.\n");
69 		abort();
70 	}
71 }
72 
73 static void
74 __attribute__((destructor))
75 _vhost_user_sem_destroy(void)
76 {
77 	sem_destroy(&g_dpdk_sem);
78 }
79 
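/*
 * Translate a guest physical address to a host virtual address. The length
 * check below guards against buffers spanning non-contiguous memory regions:
 * if rte_vhost_va_from_guest_pa() can map only part of the requested range,
 * newlen comes back smaller than len and we return NULL.
 */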
80 void *
81 vhost_gpa_to_vva(struct spdk_vhost_session *vsession, uint64_t addr, uint64_t len)
82 {
83 	void *vva;
84 	uint64_t newlen;
85 
86 	newlen = len;
87 	vva = (void *)rte_vhost_va_from_guest_pa(vsession->mem, addr, &newlen);
88 	if (newlen != len) {
89 		return NULL;
90 	}
91 
92 	return vva;
93 
94 }
95 
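/*
 * Dirty-memory logging helpers. When the guest negotiates VHOST_F_LOG_ALL
 * (typically during live migration), every write the backend makes to guest
 * memory and to the used ring has to be reported to rte_vhost so the
 * affected pages can be marked dirty and resent to the destination.
 */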
96 static void
97 vhost_log_req_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
98 		   uint16_t req_id)
99 {
100 	struct vring_desc *desc, *desc_table;
101 	uint32_t desc_table_size;
102 	int rc;
103 
104 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
105 		return;
106 	}
107 
108 	rc = vhost_vq_get_desc(vsession, virtqueue, req_id, &desc, &desc_table, &desc_table_size);
109 	if (spdk_unlikely(rc != 0)) {
110 		SPDK_ERRLOG("Can't log used ring descriptors!\n");
111 		return;
112 	}
113 
114 	do {
115 		if (vhost_vring_desc_is_wr(desc)) {
116 			/* Strictly speaking, only the pages really touched should be logged, but
117 			 * doing so would require tracking those changes in each backend.
118 			 * The backend will most likely touch all/most of those pages anyway, so
119 			 * for now assume we touched all pages passed to us as writable buffers. */
120 			rte_vhost_log_write(vsession->vid, desc->addr, desc->len);
121 		}
122 		vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
123 	} while (desc);
124 }
125 
126 static void
127 vhost_log_used_vring_elem(struct spdk_vhost_session *vsession,
128 			  struct spdk_vhost_virtqueue *virtqueue,
129 			  uint16_t idx)
130 {
131 	uint64_t offset, len;
132 
133 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
134 		return;
135 	}
136 
137 	if (spdk_unlikely(virtqueue->packed.packed_ring)) {
138 		offset = idx * sizeof(struct vring_packed_desc);
139 		len = sizeof(struct vring_packed_desc);
140 	} else {
141 		offset = offsetof(struct vring_used, ring[idx]);
142 		len = sizeof(virtqueue->vring.used->ring[idx]);
143 	}
144 
145 	rte_vhost_log_used_vring(vsession->vid, virtqueue->vring_idx, offset, len);
146 }
147 
148 static void
149 vhost_log_used_vring_idx(struct spdk_vhost_session *vsession,
150 			 struct spdk_vhost_virtqueue *virtqueue)
151 {
152 	uint64_t offset, len;
153 	uint16_t vq_idx;
154 
155 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
156 		return;
157 	}
158 
159 	offset = offsetof(struct vring_used, idx);
160 	len = sizeof(virtqueue->vring.used->idx);
161 	vq_idx = virtqueue - vsession->virtqueue;
162 
163 	rte_vhost_log_used_vring(vsession->vid, vq_idx, offset, len);
164 }
165 
166 /*
167  * Get available requests from avail ring.
168  */
169 uint16_t
170 vhost_vq_avail_ring_get(struct spdk_vhost_virtqueue *virtqueue, uint16_t *reqs,
171 			uint16_t reqs_len)
172 {
173 	struct rte_vhost_vring *vring = &virtqueue->vring;
174 	struct vring_avail *avail = vring->avail;
175 	uint16_t size_mask = vring->size - 1;
176 	uint16_t last_idx = virtqueue->last_avail_idx, avail_idx = avail->idx;
177 	uint16_t count, i;
178 	int rc;
179 	uint64_t u64_value;
180 
181 	spdk_smp_rmb();
182 
183 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
184 		/* Read to clear vring's kickfd */
185 		rc = read(vring->kickfd, &u64_value, sizeof(u64_value));
186 		if (rc < 0) {
187 			SPDK_ERRLOG("failed to acknowledge kickfd: %s.\n", spdk_strerror(errno));
188 			return -errno;
189 		}
190 	}
191 
192 	count = avail_idx - last_idx;
193 	if (spdk_likely(count == 0)) {
194 		return 0;
195 	}
196 
197 	if (spdk_unlikely(count > vring->size)) {
198 		/* TODO: the queue is unrecoverably broken and should be marked so.
199 		 * For now we will fail silently and report there are no new avail entries.
200 		 */
201 		return 0;
202 	}
203 
204 	count = spdk_min(count, reqs_len);
205 
206 	virtqueue->last_avail_idx += count;
207 	/* Check whether there are unprocessed reqs in vq, then kick vq manually */
208 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
209 		/* If avail_idx is larger than the virtqueue's last_avail_idx, there are unprocessed reqs.
210 		 * avail_idx should be re-read from memory here, in case of a race condition with the guest.
211 		 */
212 		avail_idx = * (volatile uint16_t *) &avail->idx;
213 		if (avail_idx > virtqueue->last_avail_idx) {
214 			/* Write to notify vring's kickfd */
215 			rc = write(vring->kickfd, &u64_value, sizeof(u64_value));
216 			if (rc < 0) {
217 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
218 				return -errno;
219 			}
220 		}
221 	}
222 
223 	for (i = 0; i < count; i++) {
224 		reqs[i] = vring->avail->ring[(last_idx + i) & size_mask];
225 	}
226 
227 	SPDK_DEBUGLOG(vhost_ring,
228 		      "AVAIL: last_idx=%"PRIu16" avail_idx=%"PRIu16" count=%"PRIu16"\n",
229 		      last_idx, avail_idx, count);
230 
231 	return count;
232 }
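/*
 * Minimal usage sketch (illustration only, not part of this file): a backend
 * poller would typically drain the avail ring into a bounded array and then
 * handle each returned head index, e.g.:
 *
 *	uint16_t reqs[32];
 *	uint16_t count = vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
 *	for (uint16_t i = 0; i < count; i++) {
 *		handle_request(vsession, vq, reqs[i]);	/- hypothetical handler -/
 *	}
 */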
233 
234 static bool
235 vhost_vring_desc_is_indirect(struct vring_desc *cur_desc)
236 {
237 	return !!(cur_desc->flags & VRING_DESC_F_INDIRECT);
238 }
239 
240 static bool
241 vhost_vring_packed_desc_is_indirect(struct vring_packed_desc *cur_desc)
242 {
243 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
244 }
245 
246 static bool
247 vhost_inflight_packed_desc_is_indirect(spdk_vhost_inflight_desc *cur_desc)
248 {
249 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
250 }
251 
252 int
253 vhost_vq_get_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
254 		  uint16_t req_idx, struct vring_desc **desc, struct vring_desc **desc_table,
255 		  uint32_t *desc_table_size)
256 {
257 	if (spdk_unlikely(req_idx >= virtqueue->vring.size)) {
258 		return -1;
259 	}
260 
261 	*desc = &virtqueue->vring.desc[req_idx];
262 
263 	if (vhost_vring_desc_is_indirect(*desc)) {
264 		*desc_table_size = (*desc)->len / sizeof(**desc);
265 		*desc_table = vhost_gpa_to_vva(vsession, (*desc)->addr,
266 					       sizeof(**desc) * *desc_table_size);
267 		*desc = *desc_table;
268 		if (*desc == NULL) {
269 			return -1;
270 		}
271 
272 		return 0;
273 	}
274 
275 	*desc_table = virtqueue->vring.desc;
276 	*desc_table_size = virtqueue->vring.size;
277 
278 	return 0;
279 }
280 
281 static bool
282 vhost_packed_desc_indirect_to_desc_table(struct spdk_vhost_session *vsession,
283 		uint64_t addr, uint32_t len,
284 		struct vring_packed_desc **desc_table,
285 		uint32_t *desc_table_size)
286 {
287 	*desc_table_size = len / sizeof(struct vring_packed_desc);
288 
289 	*desc_table = vhost_gpa_to_vva(vsession, addr, len);
290 	if (spdk_unlikely(*desc_table == NULL)) {
291 		return false;
292 	}
293 
294 	return true;
295 }
296 
297 int
298 vhost_vq_get_desc_packed(struct spdk_vhost_session *vsession,
299 			 struct spdk_vhost_virtqueue *virtqueue,
300 			 uint16_t req_idx, struct vring_packed_desc **desc,
301 			 struct vring_packed_desc **desc_table, uint32_t *desc_table_size)
302 {
303 	*desc =  &virtqueue->vring.desc_packed[req_idx];
304 
305 	/* In a packed ring, when the desc is non-indirect we get the next desc
306 	 * by checking (desc->flags & VRING_DESC_F_NEXT) != 0. When the desc
307 	 * is indirect we get the next desc by idx and desc_table_size. This
308 	 * differs from the split ring.
309 	 */
310 	if (vhost_vring_packed_desc_is_indirect(*desc)) {
311 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
312 				desc_table, desc_table_size)) {
313 			return -1;
314 		}
315 
316 		*desc = *desc_table;
317 	} else {
318 		*desc_table = NULL;
319 		*desc_table_size  = 0;
320 	}
321 
322 	return 0;
323 }
324 
325 int
326 vhost_inflight_queue_get_desc(struct spdk_vhost_session *vsession,
327 			      spdk_vhost_inflight_desc *desc_array,
328 			      uint16_t req_idx, spdk_vhost_inflight_desc **desc,
329 			      struct vring_packed_desc  **desc_table, uint32_t *desc_table_size)
330 {
331 	*desc = &desc_array[req_idx];
332 
333 	if (vhost_inflight_packed_desc_is_indirect(*desc)) {
334 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
335 				desc_table, desc_table_size)) {
336 			return -1;
337 		}
338 
339 		/* This desc is the inflight desc, not the packed desc.
340 		 * When F_INDIRECT is set, the table entries are packed descs,
341 		 * so set the inflight desc to NULL.
342 		 */
343 		*desc = NULL;
344 	} else {
345 		/* When F_INDIRECT is not set, there is no packed desc table */
346 		*desc_table = NULL;
347 		*desc_table_size = 0;
348 	}
349 
350 	return 0;
351 }
352 
353 int
354 vhost_vq_used_signal(struct spdk_vhost_session *vsession,
355 		     struct spdk_vhost_virtqueue *virtqueue)
356 {
357 	if (virtqueue->used_req_cnt == 0) {
358 		return 0;
359 	}
360 
361 	SPDK_DEBUGLOG(vhost_ring,
362 		      "Queue %td - USED RING: sending IRQ: last used %"PRIu16"\n",
363 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx);
364 
365 	if (rte_vhost_vring_call(vsession->vid, virtqueue->vring_idx) == 0) {
366 		/* interrupt signalled */
367 		virtqueue->req_cnt += virtqueue->used_req_cnt;
368 		virtqueue->used_req_cnt = 0;
369 		return 1;
370 	} else {
371 		/* interrupt not signalled */
372 		return 0;
373 	}
374 }
375 
376 static void
377 session_vq_io_stats_update(struct spdk_vhost_session *vsession,
378 			   struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
379 {
380 	uint32_t irq_delay_base = vsession->coalescing_delay_time_base;
381 	uint32_t io_threshold = vsession->coalescing_io_rate_threshold;
382 	int32_t irq_delay;
383 	uint32_t req_cnt;
384 
385 	req_cnt = virtqueue->req_cnt + virtqueue->used_req_cnt;
386 	if (req_cnt <= io_threshold) {
387 		return;
388 	}
389 
390 	irq_delay = (irq_delay_base * (req_cnt - io_threshold)) / io_threshold;
391 	virtqueue->irq_delay_time = (uint32_t) spdk_max(0, irq_delay);
392 
393 	virtqueue->req_cnt = 0;
394 	virtqueue->next_event_time = now;
395 }
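/*
 * Worked example of the formula above (numbers for illustration only): with
 * io_threshold = 60 requests per stats interval and irq_delay_base equivalent
 * to 50 us, observing req_cnt = 120 gives irq_delay = 50 * (120 - 60) / 60 =
 * 50 us, i.e. the interrupt delay grows linearly with how far the measured
 * request rate exceeds the threshold.
 */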
396 
397 static void
398 check_session_vq_io_stats(struct spdk_vhost_session *vsession,
399 			  struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
400 {
401 	if (now < vsession->next_stats_check_time) {
402 		return;
403 	}
404 
405 	vsession->next_stats_check_time = now + vsession->stats_check_interval;
406 	session_vq_io_stats_update(vsession, virtqueue, now);
407 }
408 
409 static inline bool
410 vhost_vq_event_is_suppressed(struct spdk_vhost_virtqueue *vq)
411 {
412 	if (spdk_unlikely(vq->packed.packed_ring)) {
413 		if (vq->vring.driver_event->flags & VRING_PACKED_EVENT_FLAG_DISABLE) {
414 			return true;
415 		}
416 	} else {
417 		if (vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT) {
418 			return true;
419 		}
420 	}
421 
422 	return false;
423 }
424 
425 void
426 vhost_session_vq_used_signal(struct spdk_vhost_virtqueue *virtqueue)
427 {
428 	struct spdk_vhost_session *vsession = virtqueue->vsession;
429 	uint64_t now;
430 
431 	if (vsession->coalescing_delay_time_base == 0) {
432 		if (virtqueue->vring.desc == NULL) {
433 			return;
434 		}
435 
436 		if (vhost_vq_event_is_suppressed(virtqueue)) {
437 			return;
438 		}
439 
440 		vhost_vq_used_signal(vsession, virtqueue);
441 	} else {
442 		now = spdk_get_ticks();
443 		check_session_vq_io_stats(vsession, virtqueue, now);
444 
445 		/* No need for event right now */
446 		if (now < virtqueue->next_event_time) {
447 			return;
448 		}
449 
450 		if (vhost_vq_event_is_suppressed(virtqueue)) {
451 			return;
452 		}
453 
454 		if (!vhost_vq_used_signal(vsession, virtqueue)) {
455 			return;
456 		}
457 
458 		/* The syscall is quite slow, so refresh the current time */
459 		now = spdk_get_ticks();
460 		virtqueue->next_event_time = now + virtqueue->irq_delay_time;
461 	}
462 }
463 
464 /*
465  * Enqueue id and len to used ring.
466  */
467 void
468 vhost_vq_used_ring_enqueue(struct spdk_vhost_session *vsession,
469 			   struct spdk_vhost_virtqueue *virtqueue,
470 			   uint16_t id, uint32_t len)
471 {
472 	struct rte_vhost_vring *vring = &virtqueue->vring;
473 	struct vring_used *used = vring->used;
474 	uint16_t last_idx = virtqueue->last_used_idx & (vring->size - 1);
475 	uint16_t vq_idx = virtqueue->vring_idx;
476 
477 	SPDK_DEBUGLOG(vhost_ring,
478 		      "Queue %td - USED RING: last_idx=%"PRIu16" req id=%"PRIu16" len=%"PRIu32"\n",
479 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx, id, len);
480 
481 	vhost_log_req_desc(vsession, virtqueue, id);
482 
483 	virtqueue->last_used_idx++;
484 	used->ring[last_idx].id = id;
485 	used->ring[last_idx].len = len;
486 
487 	/* Ensure the used ring is updated before we log it or increment used->idx. */
488 	spdk_smp_wmb();
489 
490 	rte_vhost_set_last_inflight_io_split(vsession->vid, vq_idx, id);
491 
492 	vhost_log_used_vring_elem(vsession, virtqueue, last_idx);
493 	* (volatile uint16_t *) &used->idx = virtqueue->last_used_idx;
494 	vhost_log_used_vring_idx(vsession, virtqueue);
495 
496 	rte_vhost_clr_inflight_desc_split(vsession->vid, vq_idx, virtqueue->last_used_idx, id);
497 
498 	virtqueue->used_req_cnt++;
499 
500 	if (vsession->interrupt_mode) {
501 		if (virtqueue->vring.desc == NULL || vhost_vq_event_is_suppressed(virtqueue)) {
502 			return;
503 		}
504 
505 		vhost_vq_used_signal(vsession, virtqueue);
506 	}
507 }
508 
509 void
510 vhost_vq_packed_ring_enqueue(struct spdk_vhost_session *vsession,
511 			     struct spdk_vhost_virtqueue *virtqueue,
512 			     uint16_t num_descs, uint16_t buffer_id,
513 			     uint32_t length, uint16_t inflight_head)
514 {
515 	struct vring_packed_desc *desc = &virtqueue->vring.desc_packed[virtqueue->last_used_idx];
516 	bool used, avail;
517 
518 	SPDK_DEBUGLOG(vhost_ring,
519 		      "Queue %td - RING: buffer_id=%"PRIu16"\n",
520 		      virtqueue - vsession->virtqueue, buffer_id);
521 
522 	/* When the descriptor is used, the avail flag and used flag in the
523 	 * descriptor are set equal to each other, and the used flag value
524 	 * equals used_wrap_counter.
525 	 */
526 	used = !!(desc->flags & VRING_DESC_F_USED);
527 	avail = !!(desc->flags & VRING_DESC_F_AVAIL);
528 	if (spdk_unlikely(used == virtqueue->packed.used_phase && used == avail)) {
529 		SPDK_ERRLOG("descriptor has been used before\n");
530 		return;
531 	}
532 
533 	/* In a used desc, addr is unused and len specifies the length of the
534 	 * buffer that has been written to by the device.
535 	 */
536 	desc->addr = 0;
537 	desc->len = length;
538 
539 	/* This bit specifies whether any data has been written by the device */
540 	if (length != 0) {
541 		desc->flags |= VRING_DESC_F_WRITE;
542 	}
543 
544 	/* Buffer ID is included in the last descriptor in the list.
545 	 * The driver needs to keep track of the size of the list corresponding
546 	 * to each buffer ID.
547 	 */
548 	desc->id = buffer_id;
549 
550 	/* A device MUST NOT make the descriptor used before buffer_id is
551 	 * written to the descriptor.
552 	 */
553 	spdk_smp_wmb();
554 
555 	rte_vhost_set_last_inflight_io_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
556 	/* To mark a desc as used, the device sets the F_USED bit in flags to match
557 	 * the internal Device ring wrap counter. It also sets the F_AVAIL bit to
558 	 * match the same value.
559 	 */
560 	if (virtqueue->packed.used_phase) {
561 		desc->flags |= VRING_DESC_F_AVAIL_USED;
562 	} else {
563 		desc->flags &= ~VRING_DESC_F_AVAIL_USED;
564 	}
565 	rte_vhost_clr_inflight_desc_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
566 
567 	vhost_log_used_vring_elem(vsession, virtqueue, virtqueue->last_used_idx);
568 	virtqueue->last_used_idx += num_descs;
569 	if (virtqueue->last_used_idx >= virtqueue->vring.size) {
570 		virtqueue->last_used_idx -= virtqueue->vring.size;
571 		virtqueue->packed.used_phase = !virtqueue->packed.used_phase;
572 	}
573 
574 	virtqueue->used_req_cnt++;
575 }
576 
577 bool
578 vhost_vq_packed_ring_is_avail(struct spdk_vhost_virtqueue *virtqueue)
579 {
580 	uint16_t flags = virtqueue->vring.desc_packed[virtqueue->last_avail_idx].flags;
581 
582 	/* To mark a desc as available, the driver sets the F_AVAIL bit in flags
583 	 * to match the internal avail wrap counter. It also sets the F_USED bit to
584 	 * match the inverse value but it's not mandatory.
585 	 */
586 	return (!!(flags & VRING_DESC_F_AVAIL) == virtqueue->packed.avail_phase);
587 }
588 
589 bool
590 vhost_vring_packed_desc_is_wr(struct vring_packed_desc *cur_desc)
591 {
592 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
593 }
594 
595 bool
596 vhost_vring_inflight_desc_is_wr(spdk_vhost_inflight_desc *cur_desc)
597 {
598 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
599 }
600 
601 int
602 vhost_vring_packed_desc_get_next(struct vring_packed_desc **desc, uint16_t *req_idx,
603 				 struct spdk_vhost_virtqueue *vq,
604 				 struct vring_packed_desc *desc_table,
605 				 uint32_t desc_table_size)
606 {
607 	if (desc_table != NULL) {
608 		/* When desc_table isn't NULL, the chain is indirect and we get the next
609 		 * desc by req_idx and desc_table_size. A returned *desc of NULL means
610 		 * we reached the last desc of this request.
611 		 */
612 		(*req_idx)++;
613 		if (*req_idx < desc_table_size) {
614 			*desc = &desc_table[*req_idx];
615 		} else {
616 			*desc = NULL;
617 		}
618 	} else {
619 		/* When desc_table is NULL, the chain is non-indirect and we get the next
620 		 * desc by req_idx and the F_NEXT flag. A returned *desc of NULL means
621 		 * we reached the last desc of this request. When a new desc is returned,
622 		 * req_idx is updated as well.
623 		 */
624 		if (((*desc)->flags & VRING_DESC_F_NEXT) == 0) {
625 			*desc = NULL;
626 			return 0;
627 		}
628 
629 		*req_idx = (*req_idx + 1) % vq->vring.size;
630 		*desc = &vq->vring.desc_packed[*req_idx];
631 	}
632 
633 	return 0;
634 }
635 
636 static int
637 vhost_vring_desc_payload_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
638 				uint16_t *iov_index, uintptr_t payload, uint64_t remaining)
639 {
640 	uintptr_t vva;
641 	uint64_t len;
642 
643 	do {
644 		if (*iov_index >= SPDK_VHOST_IOVS_MAX) {
645 			SPDK_ERRLOG("SPDK_VHOST_IOVS_MAX(%d) reached\n", SPDK_VHOST_IOVS_MAX);
646 			return -1;
647 		}
648 		len = remaining;
649 		vva = (uintptr_t)rte_vhost_va_from_guest_pa(vsession->mem, payload, &len);
650 		if (vva == 0 || len == 0) {
651 			SPDK_ERRLOG("gpa_to_vva(%p) == NULL\n", (void *)payload);
652 			return -1;
653 		}
654 		iov[*iov_index].iov_base = (void *)vva;
655 		iov[*iov_index].iov_len = len;
656 		remaining -= len;
657 		payload += len;
658 		(*iov_index)++;
659 	} while (remaining);
660 
661 	return 0;
662 }
663 
664 int
665 vhost_vring_packed_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
666 			       uint16_t *iov_index, const struct vring_packed_desc *desc)
667 {
668 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
669 					       desc->addr, desc->len);
670 }
671 
672 int
673 vhost_vring_inflight_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
674 				 uint16_t *iov_index, const spdk_vhost_inflight_desc *desc)
675 {
676 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
677 					       desc->addr, desc->len);
678 }
679 
680 /* 1. Traverse the desc chain to get the buffer_id and return it as the task_idx.
681  * 2. Update vq->last_avail_idx to point to the next available desc chain.
682  * 3. Flip the avail_wrap_counter if last_avail_idx wraps around.
683  */
684 uint16_t
685 vhost_vring_packed_desc_get_buffer_id(struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
686 				      uint16_t *num_descs)
687 {
688 	struct vring_packed_desc *desc;
689 	uint16_t desc_head = req_idx;
690 
691 	*num_descs = 1;
692 
693 	desc =  &vq->vring.desc_packed[req_idx];
694 	if (!vhost_vring_packed_desc_is_indirect(desc)) {
695 		while ((desc->flags & VRING_DESC_F_NEXT) != 0) {
696 			req_idx = (req_idx + 1) % vq->vring.size;
697 			desc = &vq->vring.desc_packed[req_idx];
698 			(*num_descs)++;
699 		}
700 	}
701 
702 	/* The queue size doesn't have to be a power of 2.
703 	 * The device maintains last_avail_idx, so we can make sure
704 	 * the value stays valid (0 ~ vring.size - 1).
705 	 */
706 	vq->last_avail_idx = (req_idx + 1) % vq->vring.size;
707 	if (vq->last_avail_idx < desc_head) {
708 		vq->packed.avail_phase = !vq->packed.avail_phase;
709 	}
710 
711 	return desc->id;
712 }
713 
714 int
715 vhost_vring_desc_get_next(struct vring_desc **desc,
716 			  struct vring_desc *desc_table, uint32_t desc_table_size)
717 {
718 	struct vring_desc *old_desc = *desc;
719 	uint16_t next_idx;
720 
721 	if ((old_desc->flags & VRING_DESC_F_NEXT) == 0) {
722 		*desc = NULL;
723 		return 0;
724 	}
725 
726 	next_idx = old_desc->next;
727 	if (spdk_unlikely(next_idx >= desc_table_size)) {
728 		*desc = NULL;
729 		return -1;
730 	}
731 
732 	*desc = &desc_table[next_idx];
733 	return 0;
734 }
735 
736 int
737 vhost_vring_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
738 			uint16_t *iov_index, const struct vring_desc *desc)
739 {
740 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
741 					       desc->addr, desc->len);
742 }
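/*
 * Illustration only (error handling trimmed, local names assumed): a
 * split-ring request is typically parsed by chaining the helpers above:
 *
 *	struct vring_desc *desc, *desc_table;
 *	uint32_t desc_table_size;
 *	struct iovec iovs[SPDK_VHOST_IOVS_MAX];
 *	uint16_t iovcnt = 0;
 *
 *	if (vhost_vq_get_desc(vsession, vq, req_idx, &desc, &desc_table, &desc_table_size)) {
 *		return -1;
 *	}
 *	while (desc) {
 *		if (vhost_vring_desc_to_iov(vsession, iovs, &iovcnt, desc) ||
 *		    vhost_vring_desc_get_next(&desc, desc_table, desc_table_size)) {
 *			return -1;
 *		}
 *	}
 */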
743 
744 static inline void
745 vhost_session_mem_region_calc(uint64_t *previous_start, uint64_t *start, uint64_t *end,
746 			      uint64_t *len, struct rte_vhost_mem_region *region)
747 {
748 	*start = FLOOR_2MB(region->mmap_addr);
749 	*end = CEIL_2MB(region->mmap_addr + region->mmap_size);
750 	if (*start == *previous_start) {
751 		*start += (size_t) VALUE_2MB;
752 	}
753 	*previous_start = *start;
754 	*len = *end - *start;
755 }
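/*
 * Worked example (illustrative addresses): a region mmapped at 0x7f0000201000
 * with size 0x300000 yields start = FLOOR_2MB(...) = 0x7f0000200000 and
 * end = CEIL_2MB(...) = 0x7f0000600000. The previous_start bookkeeping bumps
 * start by 2 MB when two consecutive regions would otherwise begin on the
 * same 2 MB page, so the same range is never registered twice.
 */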
756 
757 void
758 vhost_session_mem_register(struct rte_vhost_memory *mem)
759 {
760 	uint64_t start, end, len;
761 	uint32_t i;
762 	uint64_t previous_start = UINT64_MAX;
763 
764 
765 	for (i = 0; i < mem->nregions; i++) {
766 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
767 		SPDK_INFOLOG(vhost, "Registering VM memory for vtophys translation - 0x%jx len:0x%jx\n",
768 			     start, len);
769 
770 		if (spdk_mem_register((void *)start, len) != 0) {
771 			SPDK_WARNLOG("Failed to register memory region %"PRIu32". Future vtophys translation might fail.\n",
772 				     i);
773 			continue;
774 		}
775 	}
776 }
777 
778 void
779 vhost_session_mem_unregister(struct rte_vhost_memory *mem)
780 {
781 	uint64_t start, end, len;
782 	uint32_t i;
783 	uint64_t previous_start = UINT64_MAX;
784 
785 	for (i = 0; i < mem->nregions; i++) {
786 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
787 		if (spdk_vtophys((void *) start, NULL) == SPDK_VTOPHYS_ERROR) {
788 			continue; /* region has not been registered */
789 		}
790 
791 		if (spdk_mem_unregister((void *)start, len) != 0) {
792 			assert(false);
793 		}
794 	}
795 }
796 
797 static bool
798 vhost_memory_changed(struct rte_vhost_memory *new,
799 		     struct rte_vhost_memory *old)
800 {
801 	uint32_t i;
802 
803 	if (new->nregions != old->nregions) {
804 		return true;
805 	}
806 
807 	for (i = 0; i < new->nregions; ++i) {
808 		struct rte_vhost_mem_region *new_r = &new->regions[i];
809 		struct rte_vhost_mem_region *old_r = &old->regions[i];
810 
811 		if (new_r->guest_phys_addr != old_r->guest_phys_addr) {
812 			return true;
813 		}
814 		if (new_r->size != old_r->size) {
815 			return true;
816 		}
817 		if (new_r->guest_user_addr != old_r->guest_user_addr) {
818 			return true;
819 		}
820 		if (new_r->mmap_addr != old_r->mmap_addr) {
821 			return true;
822 		}
823 		if (new_r->fd != old_r->fd) {
824 			return true;
825 		}
826 	}
827 
828 	return false;
829 }
830 
831 static int
832 vhost_register_memtable_if_required(struct spdk_vhost_session *vsession, int vid)
833 {
834 	struct rte_vhost_memory *new_mem;
835 
836 	if (vhost_get_mem_table(vid, &new_mem) != 0) {
837 		SPDK_ERRLOG("vhost device %d: Failed to get guest memory table\n", vid);
838 		return -1;
839 	}
840 
841 	if (vsession->mem == NULL) {
842 		SPDK_INFOLOG(vhost, "Start to set memtable\n");
843 		vsession->mem = new_mem;
844 		vhost_session_mem_register(vsession->mem);
845 		return 0;
846 	}
847 
848 	if (vhost_memory_changed(new_mem, vsession->mem)) {
849 		SPDK_INFOLOG(vhost, "Memtable has changed\n");
850 		vhost_session_mem_unregister(vsession->mem);
851 		free(vsession->mem);
852 
853 		vsession->mem = new_mem;
854 		vhost_session_mem_register(vsession->mem);
855 		return 0;
856 
857 	}
858 
859 	SPDK_INFOLOG(vhost, "Memtable is unchanged\n");
860 	free(new_mem);
861 	return 0;
862 }
863 
864 static int
865 _stop_session(struct spdk_vhost_session *vsession)
866 {
867 	struct spdk_vhost_dev *vdev = vsession->vdev;
868 	struct spdk_vhost_user_dev *user_vdev = to_user_dev(vdev);
869 	struct spdk_vhost_virtqueue *q;
870 	int rc;
871 	uint16_t i;
872 
873 	rc = user_vdev->user_backend->stop_session(vsession);
874 	if (rc != 0) {
875 		SPDK_ERRLOG("Couldn't stop device with vid %d.\n", vsession->vid);
876 		return rc;
877 	}
878 
879 	for (i = 0; i < vsession->max_queues; i++) {
880 		q = &vsession->virtqueue[i];
881 
882 		/* vring.desc and vring.desc_packed are in a union struct
883 		 * so q->vring.desc can replace q->vring.desc_packed.
884 		 */
885 		if (q->vring.desc == NULL) {
886 			continue;
887 		}
888 
889 		/* Packed virtqueues support up to 2^15 entries each,
890 		 * so the leftover top bit can be used as the wrap counter.
891 		 */
892 		if (q->packed.packed_ring) {
893 			q->last_avail_idx = q->last_avail_idx |
894 					    ((uint16_t)q->packed.avail_phase << 15);
895 			q->last_used_idx = q->last_used_idx |
896 					   ((uint16_t)q->packed.used_phase << 15);
897 		}
898 
899 		rte_vhost_set_vring_base(vsession->vid, i, q->last_avail_idx, q->last_used_idx);
900 		q->vring.desc = NULL;
901 	}
902 	vsession->max_queues = 0;
903 
904 	return 0;
905 }
906 
907 static int
908 new_connection(int vid)
909 {
910 	struct spdk_vhost_dev *vdev;
911 	struct spdk_vhost_user_dev *user_dev;
912 	struct spdk_vhost_session *vsession;
913 	size_t dev_dirname_len;
914 	char ifname[PATH_MAX];
915 	char *ctrlr_name;
916 
917 	if (rte_vhost_get_ifname(vid, ifname, PATH_MAX) < 0) {
918 		SPDK_ERRLOG("Couldn't get a valid ifname for device with vid %d\n", vid);
919 		return -1;
920 	}
921 
922 	spdk_vhost_lock();
923 
924 	ctrlr_name = &ifname[0];
925 	dev_dirname_len = strlen(g_vhost_user_dev_dirname);
926 	if (strncmp(ctrlr_name, g_vhost_user_dev_dirname, dev_dirname_len) == 0) {
927 		ctrlr_name += dev_dirname_len;
928 	}
929 
930 	vdev = spdk_vhost_dev_find(ctrlr_name);
931 	if (vdev == NULL) {
932 		SPDK_ERRLOG("Couldn't find device with vid %d to create connection for.\n", vid);
933 		spdk_vhost_unlock();
934 		return -1;
935 	}
936 	user_dev = to_user_dev(vdev);
937 
938 	/* We expect sessions inside user_dev->vsessions to be sorted in ascending
939 	 * order with regard to vsession->id. For now we always set id = vsessions_cnt++
940 	 * and append each session to the very end of the vsessions list.
941 	 * This is required for vhost_user_dev_foreach_session() to work.
942 	 */
943 	if (user_dev->vsessions_num == UINT_MAX) {
944 		assert(false);
		spdk_vhost_unlock();
945 		return -EINVAL;
946 	}
947 
948 	if (posix_memalign((void **)&vsession, SPDK_CACHE_LINE_SIZE, sizeof(*vsession) +
949 			   user_dev->user_backend->session_ctx_size)) {
950 		SPDK_ERRLOG("vsession alloc failed\n");
951 		spdk_vhost_unlock();
952 		return -1;
953 	}
954 	memset(vsession, 0, sizeof(*vsession) + user_dev->user_backend->session_ctx_size);
955 
956 	vsession->vdev = vdev;
957 	vsession->vid = vid;
958 	vsession->id = user_dev->vsessions_num++;
959 	vsession->name = spdk_sprintf_alloc("%ss%u", vdev->name, vsession->vid);
960 	if (vsession->name == NULL) {
961 		SPDK_ERRLOG("vsession alloc failed\n");
962 		spdk_vhost_unlock();
963 		free(vsession);
964 		return -1;
965 	}
966 	vsession->started = false;
967 	vsession->next_stats_check_time = 0;
968 	vsession->stats_check_interval = SPDK_VHOST_STATS_CHECK_INTERVAL_MS *
969 					 spdk_get_ticks_hz() / 1000UL;
970 	TAILQ_INSERT_TAIL(&user_dev->vsessions, vsession, tailq);
971 
972 	vhost_session_install_rte_compat_hooks(vsession);
973 	spdk_vhost_unlock();
974 	return 0;
975 }
976 
977 static void
978 vhost_user_session_start(void *arg1)
979 {
980 	struct spdk_vhost_session *vsession = arg1;
981 	struct spdk_vhost_dev *vdev = vsession->vdev;
982 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
983 	const struct spdk_vhost_user_dev_backend *backend;
984 	int rc;
985 
986 	backend = user_dev->user_backend;
987 	rc = backend->start_session(vdev, vsession, NULL);
988 	if (rc == 0) {
989 		vsession->started = true;
990 
991 		assert(user_dev->active_session_num < UINT32_MAX);
992 		user_dev->active_session_num++;
993 	}
994 }
995 
996 static int
997 set_device_vq_callfd(struct spdk_vhost_session *vsession, uint16_t qid)
998 {
999 	struct spdk_vhost_virtqueue *q;
1000 
1001 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1002 		return -EINVAL;
1003 	}
1004 
1005 	q = &vsession->virtqueue[qid];
1006 	/* vq isn't enabled yet */
1007 	if (q->vring_idx != qid) {
1008 		return 0;
1009 	}
1010 
1011 	/* vring.desc and vring.desc_packed are in a union struct
1012 	 * so q->vring.desc can replace q->vring.desc_packed.
1013 	 */
1014 	if (q->vring.desc == NULL || q->vring.size == 0) {
1015 		return 0;
1016 	}
1017 
1018 	/*
1019 	 * Not sure right now, but this looks like some kind of QEMU bug: guest IO
1020 	 * might be frozen without kicking all queues after live migration. It looks like
1021 	 * the previous vhost instance failed to effectively deliver all interrupts before
1022 	 * the GET_VRING_BASE message. This shouldn't harm the guest since spurious interrupts
1023 	 * should be ignored by the guest virtio driver.
1024 	 *
1025 	 * Tested on QEMU 2.10.91 and 2.11.50.
1026 	 *
1027 	 * Make sure a successful call of
1028 	 * `rte_vhost_vring_call` will happen
1029 	 * after starting the device.
1030 	 */
1031 	q->used_req_cnt += 1;
1032 
1033 	return 0;
1034 }
1035 
1036 static int
1037 enable_device_vq(struct spdk_vhost_session *vsession, uint16_t qid)
1038 {
1039 	struct spdk_vhost_virtqueue *q;
1040 	bool packed_ring;
1041 	const struct spdk_vhost_user_dev_backend *backend;
1042 	int rc;
1043 
1044 	if (qid >= SPDK_VHOST_MAX_VQUEUES) {
1045 		return -EINVAL;
1046 	}
1047 
1048 	q = &vsession->virtqueue[qid];
1049 	memset(q, 0, sizeof(*q));
1050 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
1051 
1052 	q->vsession = vsession;
1053 	q->vring_idx = -1;
1054 	if (rte_vhost_get_vhost_vring(vsession->vid, qid, &q->vring)) {
1055 		return 0;
1056 	}
1057 	q->vring_idx = qid;
1058 	rte_vhost_get_vhost_ring_inflight(vsession->vid, qid, &q->vring_inflight);
1059 
1060 	/* vring.desc and vring.desc_packed are in a union struct
1061 	 * so q->vring.desc can replace q->vring.desc_packed.
1062 	 */
1063 	if (q->vring.desc == NULL || q->vring.size == 0) {
1064 		return 0;
1065 	}
1066 
1067 	if (rte_vhost_get_vring_base(vsession->vid, qid, &q->last_avail_idx, &q->last_used_idx)) {
1068 		q->vring.desc = NULL;
1069 		return 0;
1070 	}
1071 
1072 	backend = to_user_dev(vsession->vdev)->user_backend;
1073 	rc = backend->alloc_vq_tasks(vsession, qid);
1074 	if (rc) {
1075 		return rc;
1076 	}
1077 
1078 	if (packed_ring) {
1079 		/* Use the inflight mem to restore last_avail_idx and last_used_idx.
1080 		 * When the vring format is packed, there is no used_idx in the
1081 		 * used ring, so the VM can't resend the used_idx to vhost on reconnect.
1082 		 * QEMU 5.2.0 added support for packed inflight; before that, only
1083 		 * split ring inflight was supported because the negotiated features
1084 		 * weren't sent before getting the inflight fd. Users can enable this via RPC.
1085 		 */
1086 		if (spdk_unlikely(vsession->vdev->packed_ring_recovery)) {
1087 			rte_vhost_get_vring_base_from_inflight(vsession->vid, qid,
1088 							       &q->last_avail_idx,
1089 							       &q->last_used_idx);
1090 		}
1091 
1092 		/* Packed virtqueues support up to 2^15 entries each,
1093 		 * so the leftover top bit can be used as the wrap counter.
1094 		 */
1095 		q->packed.avail_phase = q->last_avail_idx >> 15;
1096 		q->last_avail_idx = q->last_avail_idx & 0x7FFF;
1097 		q->packed.used_phase = q->last_used_idx >> 15;
1098 		q->last_used_idx = q->last_used_idx & 0x7FFF;
1099 
1100 		if (!vsession->interrupt_mode) {
1101 			/* Disable I/O submission notifications, we'll be polling. */
1102 			q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1103 		}
1104 	} else {
1105 		if (!vsession->interrupt_mode) {
1106 			/* Disable I/O submission notifications, we'll be polling. */
1107 			q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1108 		}
1109 	}
1110 
1111 	q->packed.packed_ring = packed_ring;
1112 	vsession->max_queues = spdk_max(vsession->max_queues, qid + 1);
1113 
1114 	return 0;
1115 }
1116 
1117 static int
1118 start_device(int vid)
1119 {
1120 	struct spdk_vhost_dev *vdev;
1121 	struct spdk_vhost_session *vsession;
1122 	int rc = 0;
1123 
1124 	spdk_vhost_lock();
1125 
1126 	vsession = vhost_session_find_by_vid(vid);
1127 	if (vsession == NULL) {
1128 		rc = -1;
1129 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1130 		goto out;
1131 	}
1132 
1133 	if (vsession->started) {
1134 		/* already started, nothing to do */
1135 		goto out;
1136 	}
1137 
1138 	if (!vsession->mem) {
1139 		rc = -1;
1140 		SPDK_ERRLOG("Session %s hasn't set its memory table yet\n", vsession->name);
1141 		goto out;
1142 	}
1143 
1144 	vdev = vsession->vdev;
1145 	vhost_user_session_set_coalescing(vdev, vsession, NULL);
1146 	spdk_thread_send_msg(vdev->thread, vhost_user_session_start, vsession);
1147 
1148 out:
1149 	spdk_vhost_unlock();
1150 	return rc;
1151 }
1152 
1153 static void
1154 stop_device(int vid)
1155 {
1156 	struct spdk_vhost_session *vsession;
1157 
1158 	spdk_vhost_lock();
1159 	vsession = vhost_session_find_by_vid(vid);
1160 	if (vsession == NULL) {
1161 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1162 		spdk_vhost_unlock();
1163 		return;
1164 	}
1165 
1166 	if (!vsession->started) {
1167 		/* already stopped, nothing to do */
1168 		spdk_vhost_unlock();
1169 		return;
1170 	}
1171 
1172 	_stop_session(vsession);
1173 	spdk_vhost_unlock();
1174 
1175 	return;
1176 }
1177 
1178 static void
1179 destroy_connection(int vid)
1180 {
1181 	struct spdk_vhost_session *vsession;
1182 
1183 	spdk_vhost_lock();
1184 	vsession = vhost_session_find_by_vid(vid);
1185 	if (vsession == NULL) {
1186 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1187 		spdk_vhost_unlock();
1188 		return;
1189 	}
1190 
1191 	if (vsession->started) {
1192 		if (_stop_session(vsession) != 0) {
1193 			spdk_vhost_unlock();
1194 			return;
1195 		}
1196 	}
1197 
1198 	if (vsession->mem) {
1199 		vhost_session_mem_unregister(vsession->mem);
1200 		free(vsession->mem);
1201 	}
1202 
1203 	TAILQ_REMOVE(&to_user_dev(vsession->vdev)->vsessions, vsession, tailq);
1204 	free(vsession->name);
1205 	free(vsession);
1206 	spdk_vhost_unlock();
1207 }
1208 
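/*
 * Callbacks invoked by DPDK's rte_vhost library from its internal pthread.
 * new_connection/destroy_connection track the Unix socket connection
 * lifetime, while new_device/destroy_device correspond to starting and
 * stopping a session (see the g_dpdk_sem note at the top of this file).
 */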
1209 #if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
1210 static const struct rte_vhost_device_ops g_spdk_vhost_ops = {
1211 #else
1212 static const struct vhost_device_ops g_spdk_vhost_ops = {
1213 #endif
1214 	.new_device =  start_device,
1215 	.destroy_device = stop_device,
1216 	.new_connection = new_connection,
1217 	.destroy_connection = destroy_connection,
1218 };
1219 
1220 static struct spdk_vhost_session *
1221 vhost_session_find_by_id(struct spdk_vhost_dev *vdev, unsigned id)
1222 {
1223 	struct spdk_vhost_session *vsession;
1224 
1225 	TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1226 		if (vsession->id == id) {
1227 			return vsession;
1228 		}
1229 	}
1230 
1231 	return NULL;
1232 }
1233 
1234 struct spdk_vhost_session *
1235 vhost_session_find_by_vid(int vid)
1236 {
1237 	struct spdk_vhost_dev *vdev;
1238 	struct spdk_vhost_session *vsession;
1239 
1240 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1241 	     vdev = spdk_vhost_dev_next(vdev)) {
1242 		TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1243 			if (vsession->vid == vid) {
1244 				return vsession;
1245 			}
1246 		}
1247 	}
1248 
1249 	return NULL;
1250 }
1251 
1252 static void
1253 wait_for_semaphore(int timeout_sec, const char *errmsg)
1254 {
1255 	struct timespec timeout;
1256 	int rc;
1257 
1258 	clock_gettime(CLOCK_REALTIME, &timeout);
1259 	timeout.tv_sec += timeout_sec;
1260 	rc = sem_timedwait(&g_dpdk_sem, &timeout);
1261 	if (rc != 0) {
1262 		SPDK_ERRLOG("Timeout waiting for event: %s.\n", errmsg);
1263 		sem_wait(&g_dpdk_sem);
1264 	}
1265 }
1266 
1267 static void
1268 vhost_session_cb_done(int rc)
1269 {
1270 	g_dpdk_response = rc;
1271 	sem_post(&g_dpdk_sem);
1272 }
1273 
1274 void
1275 vhost_user_session_stop_done(struct spdk_vhost_session *vsession, int response)
1276 {
1277 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vsession->vdev);
1278 
1279 	if (response == 0) {
1280 		vsession->started = false;
1281 
1282 		assert(user_dev->active_session_num > 0);
1283 		user_dev->active_session_num--;
1284 	}
1285 
1286 	vhost_session_cb_done(response);
1287 }
1288 
1289 static void
1290 vhost_event_cb(void *arg1)
1291 {
1292 	struct vhost_session_fn_ctx *ctx = arg1;
1293 	struct spdk_vhost_session *vsession;
1294 
1295 	if (spdk_vhost_trylock() != 0) {
1296 		spdk_thread_send_msg(spdk_get_thread(), vhost_event_cb, arg1);
1297 		return;
1298 	}
1299 
1300 	vsession = vhost_session_find_by_id(ctx->vdev, ctx->vsession_id);
1301 	ctx->cb_fn(ctx->vdev, vsession, NULL);
1302 	spdk_vhost_unlock();
1303 }
1304 
1305 int
1306 vhost_user_session_send_event(struct spdk_vhost_session *vsession,
1307 			      spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
1308 			      const char *errmsg)
1309 {
1310 	struct vhost_session_fn_ctx ev_ctx = {0};
1311 	struct spdk_vhost_dev *vdev = vsession->vdev;
1312 
1313 	ev_ctx.vdev = vdev;
1314 	ev_ctx.vsession_id = vsession->id;
1315 	ev_ctx.cb_fn = cb_fn;
1316 
1317 	spdk_thread_send_msg(vdev->thread, vhost_event_cb, &ev_ctx);
1318 
1319 	spdk_vhost_unlock();
1320 	wait_for_semaphore(timeout_sec, errmsg);
1321 	spdk_vhost_lock();
1322 
1323 	return g_dpdk_response;
1324 }
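/*
 * Illustration only (callback name assumed): a backend that needs to run
 * work on the device thread and wait for its result could call, e.g.:
 *
 *	rc = vhost_user_session_send_event(vsession, stop_session_cb, 3, "stop session");
 *
 * The caller is expected to hold the vhost lock; it is dropped while waiting
 * on g_dpdk_sem and re-acquired before returning.
 */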
1325 
1326 static void
1327 foreach_session_finish_cb(void *arg1)
1328 {
1329 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1330 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1331 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1332 
1333 	if (spdk_vhost_trylock() != 0) {
1334 		spdk_thread_send_msg(spdk_get_thread(),
1335 				     foreach_session_finish_cb, arg1);
1336 		return;
1337 	}
1338 
1339 	assert(user_dev->pending_async_op_num > 0);
1340 	user_dev->pending_async_op_num--;
1341 	if (ev_ctx->cpl_fn != NULL) {
1342 		ev_ctx->cpl_fn(vdev, ev_ctx->user_ctx);
1343 	}
1344 
1345 	spdk_vhost_unlock();
1346 	free(ev_ctx);
1347 }
1348 
1349 static void
1350 foreach_session(void *arg1)
1351 {
1352 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1353 	struct spdk_vhost_session *vsession;
1354 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1355 	int rc;
1356 
1357 	if (spdk_vhost_trylock() != 0) {
1358 		spdk_thread_send_msg(spdk_get_thread(), foreach_session, arg1);
1359 		return;
1360 	}
1361 
1362 	TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1363 		rc = ev_ctx->cb_fn(vdev, vsession, ev_ctx->user_ctx);
1364 		if (rc < 0) {
1365 			goto out;
1366 		}
1367 	}
1368 
1369 out:
1370 	spdk_vhost_unlock();
1371 
1372 	spdk_thread_send_msg(g_vhost_user_init_thread, foreach_session_finish_cb, arg1);
1373 }
1374 
1375 void
1376 vhost_user_dev_foreach_session(struct spdk_vhost_dev *vdev,
1377 			       spdk_vhost_session_fn fn,
1378 			       spdk_vhost_dev_fn cpl_fn,
1379 			       void *arg)
1380 {
1381 	struct vhost_session_fn_ctx *ev_ctx;
1382 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1383 
1384 	ev_ctx = calloc(1, sizeof(*ev_ctx));
1385 	if (ev_ctx == NULL) {
1386 		SPDK_ERRLOG("Failed to alloc vhost event.\n");
1387 		assert(false);
1388 		return;
1389 	}
1390 
1391 	ev_ctx->vdev = vdev;
1392 	ev_ctx->cb_fn = fn;
1393 	ev_ctx->cpl_fn = cpl_fn;
1394 	ev_ctx->user_ctx = arg;
1395 
1396 	assert(user_dev->pending_async_op_num < UINT32_MAX);
1397 	user_dev->pending_async_op_num++;
1398 
1399 	spdk_thread_send_msg(vdev->thread, foreach_session, ev_ctx);
1400 }
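/*
 * Illustration only (callback names assumed): iterating over all sessions of
 * a device on its thread, with a completion callback on the init thread:
 *
 *	static int per_session_cb(struct spdk_vhost_dev *vdev,
 *				  struct spdk_vhost_session *vsession, void *ctx);
 *	static void all_done_cb(struct spdk_vhost_dev *vdev, void *ctx);
 *
 *	vhost_user_dev_foreach_session(vdev, per_session_cb, all_done_cb, ctx);
 *
 * A negative return from per_session_cb stops the iteration early.
 */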
1401 
1402 void
1403 vhost_user_session_set_interrupt_mode(struct spdk_vhost_session *vsession, bool interrupt_mode)
1404 {
1405 	uint16_t i;
1406 	bool packed_ring;
1407 	int rc = 0;
1408 
1409 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
1410 
1411 	for (i = 0; i < vsession->max_queues; i++) {
1412 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1413 		uint64_t num_events = 1;
1414 
1415 		/* vring.desc and vring.desc_packed are in a union struct
1416 		 * so q->vring.desc can replace q->vring.desc_packed.
1417 		 */
1418 		if (q->vring.desc == NULL || q->vring.size == 0) {
1419 			continue;
1420 		}
1421 
1422 		if (interrupt_mode) {
1423 			/* Enable I/O submission notifications, we'll be interrupting. */
1424 			if (packed_ring) {
1425 				* (volatile uint16_t *) &q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_ENABLE;
1426 			} else {
1427 				* (volatile uint16_t *) &q->vring.used->flags = 0;
1428 			}
1429 
1430 			/* In case of a race condition, always kick the vring when switching to interrupt mode */
1431 			rc = write(q->vring.kickfd, &num_events, sizeof(num_events));
1432 			if (rc < 0) {
1433 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
1434 			}
1435 
1436 			vsession->interrupt_mode = true;
1437 		} else {
1438 			/* Disable I/O submission notifications, we'll be polling. */
1439 			if (packed_ring) {
1440 				* (volatile uint16_t *) &q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1441 			} else {
1442 				* (volatile uint16_t *) &q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1443 			}
1444 
1445 			vsession->interrupt_mode = false;
1446 		}
1447 	}
1448 }
1449 
1450 
1451 static int
1452 extern_vhost_pre_msg_handler(int vid, void *_msg)
1453 {
1454 	struct vhost_user_msg *msg = _msg;
1455 	struct spdk_vhost_session *vsession;
1456 
1457 	vsession = vhost_session_find_by_vid(vid);
1458 	if (vsession == NULL) {
1459 		SPDK_ERRLOG("Received a message to an uninitialized session (vid %d).\n", vid);
1460 		assert(false);
1461 		return RTE_VHOST_MSG_RESULT_ERR;
1462 	}
1463 
1464 	switch (msg->request) {
1465 	case VHOST_USER_GET_VRING_BASE:
1466 		if (vsession->started) {
1467 			g_spdk_vhost_ops.destroy_device(vid);
1468 		}
1469 		break;
1470 	case VHOST_USER_GET_CONFIG: {
1471 		int rc = 0;
1472 
1473 		spdk_vhost_lock();
1474 		if (vsession->vdev->backend->vhost_get_config) {
1475 			rc = vsession->vdev->backend->vhost_get_config(vsession->vdev,
1476 					msg->payload.cfg.region, msg->payload.cfg.size);
1477 			if (rc != 0) {
1478 				msg->size = 0;
1479 			}
1480 		}
1481 		spdk_vhost_unlock();
1482 
1483 		return RTE_VHOST_MSG_RESULT_REPLY;
1484 	}
1485 	case VHOST_USER_SET_CONFIG: {
1486 		int rc = 0;
1487 
1488 		spdk_vhost_lock();
1489 		if (vsession->vdev->backend->vhost_set_config) {
1490 			rc = vsession->vdev->backend->vhost_set_config(vsession->vdev,
1491 					msg->payload.cfg.region, msg->payload.cfg.offset,
1492 					msg->payload.cfg.size, msg->payload.cfg.flags);
1493 		}
1494 		spdk_vhost_unlock();
1495 
1496 		return rc == 0 ? RTE_VHOST_MSG_RESULT_OK : RTE_VHOST_MSG_RESULT_ERR;
1497 	}
1498 	default:
1499 		break;
1500 	}
1501 
1502 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1503 }
1504 
1505 static int
1506 extern_vhost_post_msg_handler(int vid, void *_msg)
1507 {
1508 	struct vhost_user_msg *msg = _msg;
1509 	struct spdk_vhost_session *vsession;
1510 	uint16_t qid;
1511 	int rc;
1512 
1513 	vsession = vhost_session_find_by_vid(vid);
1514 	if (vsession == NULL) {
1515 		SPDK_ERRLOG("Received a message to an uninitialized session (vid %d).\n", vid);
1516 		assert(false);
1517 		return RTE_VHOST_MSG_RESULT_ERR;
1518 	}
1519 
1520 	if (msg->request == VHOST_USER_SET_MEM_TABLE) {
1521 		vhost_register_memtable_if_required(vsession, vid);
1522 	}
1523 
1524 	switch (msg->request) {
1525 	case VHOST_USER_SET_FEATURES:
1526 		rc = vhost_get_negotiated_features(vid, &vsession->negotiated_features);
1527 		if (rc) {
1528 			SPDK_ERRLOG("vhost device %d: Failed to get negotiated driver features\n", vid);
1529 			return RTE_VHOST_MSG_RESULT_ERR;
1530 		}
1531 		break;
1532 	case VHOST_USER_SET_VRING_CALL:
1533 		qid = (uint16_t)msg->payload.u64;
1534 		rc = set_device_vq_callfd(vsession, qid);
1535 		if (rc) {
1536 			return RTE_VHOST_MSG_RESULT_ERR;
1537 		}
1538 		break;
1539 	case VHOST_USER_SET_VRING_KICK:
1540 		qid = (uint16_t)msg->payload.u64;
1541 		rc = enable_device_vq(vsession, qid);
1542 		if (rc) {
1543 			return RTE_VHOST_MSG_RESULT_ERR;
1544 		}
1545 
1546 		/* vhost-user spec tells us to start polling a queue after receiving
1547 		 * its SET_VRING_KICK message. Let's do it!
1548 		 */
1549 		if (!vsession->started) {
1550 			g_spdk_vhost_ops.new_device(vid);
1551 		}
1552 		break;
1553 	default:
1554 		break;
1555 	}
1556 
1557 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1558 }
1559 
1560 struct rte_vhost_user_extern_ops g_spdk_extern_vhost_ops = {
1561 	.pre_msg_handle = extern_vhost_pre_msg_handler,
1562 	.post_msg_handle = extern_vhost_post_msg_handler,
1563 };
1564 
1565 void
1566 vhost_session_install_rte_compat_hooks(struct spdk_vhost_session *vsession)
1567 {
1568 	int rc;
1569 
1570 	rc = rte_vhost_extern_callback_register(vsession->vid, &g_spdk_extern_vhost_ops, NULL);
1571 	if (rc != 0) {
1572 		SPDK_ERRLOG("rte_vhost_extern_callback_register() failed for vid = %d\n",
1573 			    vsession->vid);
1574 		return;
1575 	}
1576 }
1577 
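/*
 * Register a vhost-user Unix domain socket with the rte_vhost library. The
 * sequence below is: remove a stale socket file if one exists, register the
 * driver, set/disable virtio features, hook up g_spdk_vhost_ops and the
 * protocol features, and finally start the driver so it begins accepting
 * connections.
 */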
1578 int
1579 vhost_register_unix_socket(const char *path, const char *ctrl_name,
1580 			   uint64_t virtio_features, uint64_t disabled_features, uint64_t protocol_features)
1581 {
1582 	struct stat file_stat;
1583 	uint64_t features = 0;
1584 
1585 	/* Register vhost driver to handle vhost messages. */
1586 	if (stat(path, &file_stat) != -1) {
1587 		if (!S_ISSOCK(file_stat.st_mode)) {
1588 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1589 				    "The file already exists and is not a socket.\n",
1590 				    path);
1591 			return -EIO;
1592 		} else if (unlink(path) != 0) {
1593 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1594 				    "The socket already exists and could not be unlinked.\n",
1595 				    path);
1596 			return -EIO;
1597 		}
1598 	}
1599 
1600 #if RTE_VERSION < RTE_VERSION_NUM(20, 8, 0, 0)
1601 	if (rte_vhost_driver_register(path, 0) != 0) {
1602 #else
1603 	if (rte_vhost_driver_register(path, RTE_VHOST_USER_ASYNC_COPY) != 0) {
1604 #endif
1605 		SPDK_ERRLOG("Could not register controller %s with vhost library\n", ctrl_name);
1606 		SPDK_ERRLOG("Check if domain socket %s already exists\n", path);
1607 		return -EIO;
1608 	}
1609 	if (rte_vhost_driver_set_features(path, virtio_features) ||
1610 	    rte_vhost_driver_disable_features(path, disabled_features)) {
1611 		SPDK_ERRLOG("Couldn't set vhost features for controller %s\n", ctrl_name);
1612 
1613 		rte_vhost_driver_unregister(path);
1614 		return -EIO;
1615 	}
1616 
1617 	if (rte_vhost_driver_callback_register(path, &g_spdk_vhost_ops) != 0) {
1618 		rte_vhost_driver_unregister(path);
1619 		SPDK_ERRLOG("Couldn't register callbacks for controller %s\n", ctrl_name);
1620 		return -EIO;
1621 	}
1622 
1623 	rte_vhost_driver_get_protocol_features(path, &features);
1624 	features |= protocol_features;
1625 	rte_vhost_driver_set_protocol_features(path, features);
1626 
1627 	if (rte_vhost_driver_start(path) != 0) {
1628 		SPDK_ERRLOG("Failed to start vhost driver for controller %s (%d): %s\n",
1629 			    ctrl_name, errno, spdk_strerror(errno));
1630 		rte_vhost_driver_unregister(path);
1631 		return -EIO;
1632 	}
1633 
1634 	return 0;
1635 }
1636 
1637 int
1638 vhost_get_mem_table(int vid, struct rte_vhost_memory **mem)
1639 {
1640 	return rte_vhost_get_mem_table(vid, mem);
1641 }
1642 
1643 int
1644 vhost_driver_unregister(const char *path)
1645 {
1646 	return rte_vhost_driver_unregister(path);
1647 }
1648 
1649 int
1650 vhost_get_negotiated_features(int vid, uint64_t *negotiated_features)
1651 {
1652 	return rte_vhost_get_negotiated_features(vid, negotiated_features);
1653 }
1654 
1655 int
1656 vhost_user_dev_set_coalescing(struct spdk_vhost_user_dev *user_dev, uint32_t delay_base_us,
1657 			      uint32_t iops_threshold)
1658 {
1659 	uint64_t delay_time_base = delay_base_us * spdk_get_ticks_hz() / 1000000ULL;
1660 	uint32_t io_rate = iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1661 
1662 	if (delay_time_base >= UINT32_MAX) {
1663 		SPDK_ERRLOG("Delay time of %"PRIu32" is too big\n", delay_base_us);
1664 		return -EINVAL;
1665 	} else if (io_rate == 0) {
1666 		SPDK_ERRLOG("IOPS rate of %"PRIu32" is too low. Min is %u\n", io_rate,
1667 			    1000U / SPDK_VHOST_STATS_CHECK_INTERVAL_MS);
1668 		return -EINVAL;
1669 	}
1670 
1671 	user_dev->coalescing_delay_us = delay_base_us;
1672 	user_dev->coalescing_iops_threshold = iops_threshold;
1673 	return 0;
1674 }
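/*
 * Example of the unit conversion above (assuming a 10 ms stats check
 * interval): delay_base_us = 100 becomes a delay base worth 100 us of ticks,
 * and iops_threshold = 60000 IOPS becomes io_rate = 600 requests per 10 ms
 * window, which is what session_vq_io_stats_update() compares against.
 */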
1675 
1676 int
1677 vhost_user_session_set_coalescing(struct spdk_vhost_dev *vdev,
1678 				  struct spdk_vhost_session *vsession, void *ctx)
1679 {
1680 	vsession->coalescing_delay_time_base =
1681 		to_user_dev(vdev)->coalescing_delay_us * spdk_get_ticks_hz() / 1000000ULL;
1682 	vsession->coalescing_io_rate_threshold =
1683 		to_user_dev(vdev)->coalescing_iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1684 	return 0;
1685 }
1686 
1687 int
1688 spdk_vhost_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
1689 			  uint32_t iops_threshold)
1690 {
1691 	int rc;
1692 
1693 	rc = vhost_user_dev_set_coalescing(to_user_dev(vdev), delay_base_us, iops_threshold);
1694 	if (rc != 0) {
1695 		return rc;
1696 	}
1697 
1698 	vhost_user_dev_foreach_session(vdev, vhost_user_session_set_coalescing, NULL, NULL);
1699 	return 0;
1700 }
1701 
1702 void
1703 spdk_vhost_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
1704 			  uint32_t *iops_threshold)
1705 {
1706 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1707 
1708 	if (delay_base_us) {
1709 		*delay_base_us = user_dev->coalescing_delay_us;
1710 	}
1711 
1712 	if (iops_threshold) {
1713 		*iops_threshold = user_dev->coalescing_iops_threshold;
1714 	}
1715 }
1716 
1717 int
1718 spdk_vhost_set_socket_path(const char *basename)
1719 {
1720 	int ret;
1721 
1722 	if (basename && strlen(basename) > 0) {
1723 		ret = snprintf(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 2, "%s", basename);
1724 		if (ret <= 0) {
1725 			return -EINVAL;
1726 		}
1727 		if ((size_t)ret >= sizeof(g_vhost_user_dev_dirname) - 2) {
1728 			SPDK_ERRLOG("Char dev dir path length %d is too long\n", ret);
1729 			return -EINVAL;
1730 		}
1731 
1732 		if (g_vhost_user_dev_dirname[ret - 1] != '/') {
1733 			g_vhost_user_dev_dirname[ret] = '/';
1734 			g_vhost_user_dev_dirname[ret + 1]  = '\0';
1735 		}
1736 	}
1737 
1738 	return 0;
1739 }
1740 
1741 static void
1742 vhost_dev_thread_exit(void *arg1)
1743 {
1744 	spdk_thread_exit(spdk_get_thread());
1745 }
1746 
1747 int
1748 vhost_user_dev_register(struct spdk_vhost_dev *vdev, const char *name, struct spdk_cpuset *cpumask,
1749 			const struct spdk_vhost_user_dev_backend *user_backend)
1750 {
1751 	char path[PATH_MAX];
1752 	struct spdk_vhost_user_dev *user_dev;
1753 
1754 	if (snprintf(path, sizeof(path), "%s%s", g_vhost_user_dev_dirname, name) >= (int)sizeof(path)) {
1755 		SPDK_ERRLOG("Resulting socket path for controller %s is too long: %s%s\n",
1756 			    name, g_vhost_user_dev_dirname, name);
1757 		return -EINVAL;
1758 	}
1759 
1760 	vdev->path = strdup(path);
1761 	if (vdev->path == NULL) {
1762 		return -EIO;
1763 	}
1764 
1765 	user_dev = calloc(1, sizeof(*user_dev));
1766 	if (user_dev == NULL) {
1767 		free(vdev->path);
1768 		return -ENOMEM;
1769 	}
1770 	vdev->ctxt = user_dev;
1771 
1772 	vdev->thread = spdk_thread_create(vdev->name, cpumask);
1773 	if (vdev->thread == NULL) {
1774 		free(user_dev);
1775 		free(vdev->path);
1776 		SPDK_ERRLOG("Failed to create thread for vhost controller %s.\n", name);
1777 		return -EIO;
1778 	}
1779 
1780 	vdev->registered = true;
1781 	user_dev->user_backend = user_backend;
1782 	user_dev->vdev = vdev;
1783 	TAILQ_INIT(&user_dev->vsessions);
1784 
1785 	vhost_user_dev_set_coalescing(user_dev, SPDK_VHOST_COALESCING_DELAY_BASE_US,
1786 				      SPDK_VHOST_VQ_IOPS_COALESCING_THRESHOLD);
1787 
1788 	if (vhost_register_unix_socket(path, name, vdev->virtio_features, vdev->disabled_features,
1789 				       vdev->protocol_features)) {
1790 		spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1791 		free(user_dev);
1792 		free(vdev->path);
1793 		return -EIO;
1794 	}
1795 
1796 	return 0;
1797 }
1798 
1799 int
1800 vhost_user_dev_unregister(struct spdk_vhost_dev *vdev)
1801 {
1802 	struct spdk_vhost_user_dev *user_dev = to_user_dev(vdev);
1803 
1804 	if (user_dev->pending_async_op_num) {
1805 		return -EBUSY;
1806 	}
1807 
1808 	if (!TAILQ_EMPTY(&user_dev->vsessions)) {
1809 		SPDK_ERRLOG("Controller %s still has valid connections.\n", vdev->name);
1810 		return -EBUSY;
1811 	}
1812 
1813 	if (vdev->registered && vhost_driver_unregister(vdev->path) != 0) {
1814 		SPDK_ERRLOG("Could not unregister controller %s with vhost library\n"
1815 			    "Check if domain socket %s still exists\n",
1816 			    vdev->name, vdev->path);
1817 		return -EIO;
1818 	}
1819 
1820 	spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1821 	free(user_dev);
1822 	free(vdev->path);
1823 
1824 	return 0;
1825 }
1826 
1827 static bool g_vhost_user_started = false;
1828 
1829 int
1830 vhost_user_init(void)
1831 {
1832 	size_t len;
1833 
1834 	if (g_vhost_user_started) {
1835 		return 0;
1836 	}
1837 
1838 	if (g_vhost_user_dev_dirname[0] == '\0') {
1839 		if (getcwd(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 1) == NULL) {
1840 			SPDK_ERRLOG("getcwd failed (%d): %s\n", errno, spdk_strerror(errno));
1841 			return -1;
1842 		}
1843 
1844 		len = strlen(g_vhost_user_dev_dirname);
1845 		if (g_vhost_user_dev_dirname[len - 1] != '/') {
1846 			g_vhost_user_dev_dirname[len] = '/';
1847 			g_vhost_user_dev_dirname[len + 1] = '\0';
1848 		}
1849 	}
1850 
1851 	g_vhost_user_started = true;
1852 
1853 	g_vhost_user_init_thread = spdk_get_thread();
1854 	assert(g_vhost_user_init_thread != NULL);
1855 
1856 	return 0;
1857 }
1858 
1859 static void
1860 vhost_user_session_shutdown_on_init(void *vhost_cb)
1861 {
1862 	spdk_vhost_fini_cb fn = vhost_cb;
1863 
1864 	fn();
1865 }
1866 
1867 static void *
1868 vhost_user_session_shutdown(void *vhost_cb)
1869 {
1870 	struct spdk_vhost_dev *vdev = NULL;
1871 	struct spdk_vhost_session *vsession;
1872 
1873 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1874 	     vdev = spdk_vhost_dev_next(vdev)) {
1875 		spdk_vhost_lock();
1876 		TAILQ_FOREACH(vsession, &to_user_dev(vdev)->vsessions, tailq) {
1877 			if (vsession->started) {
1878 				_stop_session(vsession);
1879 			}
1880 		}
1881 		spdk_vhost_unlock();
1882 		vhost_driver_unregister(vdev->path);
1883 		vdev->registered = false;
1884 	}
1885 
1886 	SPDK_INFOLOG(vhost, "Exiting\n");
1887 	spdk_thread_send_msg(g_vhost_user_init_thread, vhost_user_session_shutdown_on_init, vhost_cb);
1888 	return NULL;
1889 }
1890 
1891 void
1892 vhost_user_fini(spdk_vhost_fini_cb vhost_cb)
1893 {
1894 	pthread_t tid;
1895 	int rc;
1896 
1897 	if (!g_vhost_user_started) {
1898 		vhost_cb();
1899 		return;
1900 	}
1901 
1902 	g_vhost_user_started = false;
1903 
1904 	/* rte_vhost API for removing sockets is not asynchronous. Since it may call SPDK
1905 	 * ops for stopping a device or removing a connection, we need to call it from
1906 	 * a separate thread to avoid deadlock.
1907 	 */
1908 	rc = pthread_create(&tid, NULL, &vhost_user_session_shutdown, vhost_cb);
1909 	if (rc != 0) {
1910 		SPDK_ERRLOG("Failed to start session shutdown thread (%d): %s\n", rc, spdk_strerror(rc));
1911 		abort();
1912 	}
1913 	pthread_detach(tid);
1914 }
1915