xref: /spdk/lib/vhost/rte_vhost_user.c (revision d7f0a1820eb52bfce7ca511df55ddc0ca9dffab8)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *   Copyright (c) 2021 Mellanox Technologies LTD. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/env.h"
38 #include "spdk/likely.h"
39 #include "spdk/string.h"
40 #include "spdk/util.h"
41 #include "spdk/memory.h"
42 #include "spdk/barrier.h"
43 #include "spdk/vhost.h"
44 #include "vhost_internal.h"
45 #include <rte_version.h>
46 
47 #include "spdk_internal/vhost_user.h"
48 
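/* Controls packed ring live recovery; can be enabled via RPC. When true, start_device()
 * restores a packed ring's last_avail_idx/last_used_idx from the inflight region on reconnect.
 */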
49 bool g_packed_ring_recovery = false;
50 
51 /* Path to the directory where vhost-user domain sockets will be created. Can be set by the user. */
52 static char g_vhost_user_dev_dirname[PATH_MAX] = "";
53 
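/* Thread on which vhost_user_init() was called. Completion callbacks
 * (e.g. foreach_session_finish_cb) are sent back to this thread.
 */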
54 static struct spdk_thread *g_vhost_user_init_thread;
55 
56 /**
57  * DPDK calls our callbacks synchronously but the work those callbacks
58  * perform needs to be async. Luckily, all DPDK callbacks are called on
59  * a DPDK-internal pthread, so we'll just wait on a semaphore in there.
60  */
61 static sem_t g_dpdk_sem;
62 
63 /** Return code for the current DPDK callback */
64 static int g_dpdk_response;
65 
66 struct vhost_session_fn_ctx {
67 	/** Device pointer obtained before enqueueing the event */
68 	struct spdk_vhost_dev *vdev;
69 
70 	/** ID of the session to send event to. */
71 	uint32_t vsession_id;
72 
73 	/** User provided function to be executed on session's thread. */
74 	spdk_vhost_session_fn cb_fn;
75 
76 	/**
77 	 * User provided function to be called on the init thread
78 	 * after iterating through all sessions.
79 	 */
80 	spdk_vhost_dev_fn cpl_fn;
81 
82 	/** Custom user context */
83 	void *user_ctx;
84 };
85 
86 static void __attribute__((constructor))
87 _vhost_user_sem_init(void)
88 {
89 	if (sem_init(&g_dpdk_sem, 0, 0) != 0) {
90 		SPDK_ERRLOG("Failed to initialize semaphore for rte_vhost pthread.\n");
91 		abort();
92 	}
93 }
94 
95 static void __attribute__((destructor))
96 _vhost_user_sem_destroy(void)
97 {
98 	sem_destroy(&g_dpdk_sem);
99 }
100 
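/* Translate a guest physical address to a vhost virtual address. Returns NULL
 * if the requested [addr, addr + len) range is not fully mapped, i.e. rte_vhost
 * shortened the translated length.
 */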
101 void *vhost_gpa_to_vva(struct spdk_vhost_session *vsession, uint64_t addr, uint64_t len)
102 {
103 	void *vva;
104 	uint64_t newlen;
105 
106 	newlen = len;
107 	vva = (void *)rte_vhost_va_from_guest_pa(vsession->mem, addr, &newlen);
108 	if (newlen != len) {
109 		return NULL;
110 	}
111 
112 	return vva;
113 
114 }
115 
116 static void
117 vhost_log_req_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
118 		   uint16_t req_id)
119 {
120 	struct vring_desc *desc, *desc_table;
121 	uint32_t desc_table_size;
122 	int rc;
123 
124 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
125 		return;
126 	}
127 
128 	rc = vhost_vq_get_desc(vsession, virtqueue, req_id, &desc, &desc_table, &desc_table_size);
129 	if (spdk_unlikely(rc != 0)) {
130 		SPDK_ERRLOG("Can't log used ring descriptors!\n");
131 		return;
132 	}
133 
134 	do {
135 		if (vhost_vring_desc_is_wr(desc)) {
136 			/* Strictly speaking, only the pages actually touched should be logged,
137 			 * but doing so would require tracking those changes in each backend.
138 			 * The backend will most likely touch all or most of those pages anyway,
139 			 * so let's assume we touched all pages passed to us as writable buffers. */
140 			rte_vhost_log_write(vsession->vid, desc->addr, desc->len);
141 		}
142 		vhost_vring_desc_get_next(&desc, desc_table, desc_table_size);
143 	} while (desc);
144 }
145 
146 static void
147 vhost_log_used_vring_elem(struct spdk_vhost_session *vsession,
148 			  struct spdk_vhost_virtqueue *virtqueue,
149 			  uint16_t idx)
150 {
151 	uint64_t offset, len;
152 
153 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
154 		return;
155 	}
156 
157 	if (spdk_unlikely(virtqueue->packed.packed_ring)) {
158 		offset = idx * sizeof(struct vring_packed_desc);
159 		len = sizeof(struct vring_packed_desc);
160 	} else {
161 		offset = offsetof(struct vring_used, ring[idx]);
162 		len = sizeof(virtqueue->vring.used->ring[idx]);
163 	}
164 
165 	rte_vhost_log_used_vring(vsession->vid, virtqueue->vring_idx, offset, len);
166 }
167 
168 static void
169 vhost_log_used_vring_idx(struct spdk_vhost_session *vsession,
170 			 struct spdk_vhost_virtqueue *virtqueue)
171 {
172 	uint64_t offset, len;
173 	uint16_t vq_idx;
174 
175 	if (spdk_likely(!vhost_dev_has_feature(vsession, VHOST_F_LOG_ALL))) {
176 		return;
177 	}
178 
179 	offset = offsetof(struct vring_used, idx);
180 	len = sizeof(virtqueue->vring.used->idx);
181 	vq_idx = virtqueue - vsession->virtqueue;
182 
183 	rte_vhost_log_used_vring(vsession->vid, vq_idx, offset, len);
184 }
185 
186 /*
187  * Get available requests from avail ring.
188  */
189 uint16_t
190 vhost_vq_avail_ring_get(struct spdk_vhost_virtqueue *virtqueue, uint16_t *reqs,
191 			uint16_t reqs_len)
192 {
193 	struct rte_vhost_vring *vring = &virtqueue->vring;
194 	struct vring_avail *avail = vring->avail;
195 	uint16_t size_mask = vring->size - 1;
196 	uint16_t last_idx = virtqueue->last_avail_idx, avail_idx = avail->idx;
197 	uint16_t count, i;
198 	int rc;
199 	uint64_t u64_value;
200 
201 	spdk_smp_rmb();
202 
203 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
204 		/* Read to clear vring's kickfd */
205 		rc = read(vring->kickfd, &u64_value, sizeof(u64_value));
206 		if (rc < 0) {
207 			SPDK_ERRLOG("failed to acknowledge kickfd: %s.\n", spdk_strerror(errno));
208 			return -errno;
209 		}
210 	}
211 
212 	count = avail_idx - last_idx;
213 	if (spdk_likely(count == 0)) {
214 		return 0;
215 	}
216 
217 	if (spdk_unlikely(count > vring->size)) {
218 		/* TODO: the queue is unrecoverably broken and should be marked so.
219 		 * For now we will fail silently and report there are no new avail entries.
220 		 */
221 		return 0;
222 	}
223 
224 	count = spdk_min(count, reqs_len);
225 
226 	virtqueue->last_avail_idx += count;
227 	/* If there are still unprocessed requests in the vq, kick the vq manually */
228 	if (virtqueue->vsession && spdk_unlikely(virtqueue->vsession->interrupt_mode)) {
229 		/* If avail_idx is larger than the virtqueue's last_avail_idx, there are unprocessed requests.
230 		 * avail_idx must be re-read from memory here to guard against a race condition with the guest.
231 		 */
232 		avail_idx = * (volatile uint16_t *) &avail->idx;
233 		if (avail_idx > virtqueue->last_avail_idx) {
234 			/* Write to notify vring's kickfd */
235 			rc = write(vring->kickfd, &u64_value, sizeof(u64_value));
236 			if (rc < 0) {
237 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
238 				return -errno;
239 			}
240 		}
241 	}
242 
243 	for (i = 0; i < count; i++) {
244 		reqs[i] = vring->avail->ring[(last_idx + i) & size_mask];
245 	}
246 
247 	SPDK_DEBUGLOG(vhost_ring,
248 		      "AVAIL: last_idx=%"PRIu16" avail_idx=%"PRIu16" count=%"PRIu16"\n",
249 		      last_idx, avail_idx, count);
250 
251 	return count;
252 }
253 
254 static bool
255 vhost_vring_desc_is_indirect(struct vring_desc *cur_desc)
256 {
257 	return !!(cur_desc->flags & VRING_DESC_F_INDIRECT);
258 }
259 
260 static bool
261 vhost_vring_packed_desc_is_indirect(struct vring_packed_desc *cur_desc)
262 {
263 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
264 }
265 
266 static bool
267 vhost_inflight_packed_desc_is_indirect(spdk_vhost_inflight_desc *cur_desc)
268 {
269 	return (cur_desc->flags & VRING_DESC_F_INDIRECT) != 0;
270 }
271 
272 int
273 vhost_vq_get_desc(struct spdk_vhost_session *vsession, struct spdk_vhost_virtqueue *virtqueue,
274 		  uint16_t req_idx, struct vring_desc **desc, struct vring_desc **desc_table,
275 		  uint32_t *desc_table_size)
276 {
277 	if (spdk_unlikely(req_idx >= virtqueue->vring.size)) {
278 		return -1;
279 	}
280 
281 	*desc = &virtqueue->vring.desc[req_idx];
282 
283 	if (vhost_vring_desc_is_indirect(*desc)) {
284 		*desc_table_size = (*desc)->len / sizeof(**desc);
285 		*desc_table = vhost_gpa_to_vva(vsession, (*desc)->addr,
286 					       sizeof(**desc) * *desc_table_size);
287 		*desc = *desc_table;
288 		if (*desc == NULL) {
289 			return -1;
290 		}
291 
292 		return 0;
293 	}
294 
295 	*desc_table = virtqueue->vring.desc;
296 	*desc_table_size = virtqueue->vring.size;
297 
298 	return 0;
299 }
300 
301 static bool
302 vhost_packed_desc_indirect_to_desc_table(struct spdk_vhost_session *vsession,
303 		uint64_t addr, uint32_t len,
304 		struct vring_packed_desc **desc_table,
305 		uint32_t *desc_table_size)
306 {
307 	*desc_table_size = len / sizeof(struct vring_packed_desc);
308 
309 	*desc_table = vhost_gpa_to_vva(vsession, addr, len);
310 	if (spdk_unlikely(*desc_table == NULL)) {
311 		return false;
312 	}
313 
314 	return true;
315 }
316 
317 int
318 vhost_vq_get_desc_packed(struct spdk_vhost_session *vsession,
319 			 struct spdk_vhost_virtqueue *virtqueue,
320 			 uint16_t req_idx, struct vring_packed_desc **desc,
321 			 struct vring_packed_desc **desc_table, uint32_t *desc_table_size)
322 {
323 	*desc =  &virtqueue->vring.desc_packed[req_idx];
324 
325 	/* In a packed ring, when the desc is non-indirect we get the next desc
326 	 * by checking (desc->flags & VRING_DESC_F_NEXT) != 0. When the desc
327 	 * is indirect we get the next desc by index and desc_table_size. This
328 	 * differs from the split ring.
329 	 */
330 	if (vhost_vring_packed_desc_is_indirect(*desc)) {
331 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
332 				desc_table, desc_table_size)) {
333 			return -1;
334 		}
335 
336 		*desc = *desc_table;
337 	} else {
338 		*desc_table = NULL;
339 		*desc_table_size  = 0;
340 	}
341 
342 	return 0;
343 }
344 
345 int
346 vhost_inflight_queue_get_desc(struct spdk_vhost_session *vsession,
347 			      spdk_vhost_inflight_desc *desc_array,
348 			      uint16_t req_idx, spdk_vhost_inflight_desc **desc,
349 			      struct vring_packed_desc  **desc_table, uint32_t *desc_table_size)
350 {
351 	*desc = &desc_array[req_idx];
352 
353 	if (vhost_inflight_packed_desc_is_indirect(*desc)) {
354 		if (!vhost_packed_desc_indirect_to_desc_table(vsession, (*desc)->addr, (*desc)->len,
355 				desc_table, desc_table_size)) {
356 			return -1;
357 		}
358 
359 		/* This desc is the inflight desc, not the packed desc.
360 		 * When F_INDIRECT is set, the table entries are packed descs,
361 		 * so set the inflight desc to NULL.
362 		 */
363 		*desc = NULL;
364 	} else {
365 		/* When F_INDIRECT is not set, there is no packed desc table */
366 		*desc_table = NULL;
367 		*desc_table_size = 0;
368 	}
369 
370 	return 0;
371 }
372 
373 int
374 vhost_vq_used_signal(struct spdk_vhost_session *vsession,
375 		     struct spdk_vhost_virtqueue *virtqueue)
376 {
377 	if (virtqueue->used_req_cnt == 0) {
378 		return 0;
379 	}
380 
381 	virtqueue->req_cnt += virtqueue->used_req_cnt;
382 	virtqueue->used_req_cnt = 0;
383 
384 	SPDK_DEBUGLOG(vhost_ring,
385 		      "Queue %td - USED RING: sending IRQ: last used %"PRIu16"\n",
386 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx);
387 
388 	if (rte_vhost_vring_call(vsession->vid, virtqueue->vring_idx) == 0) {
389 		/* interrupt signalled */
390 		return 1;
391 	} else {
392 		/* interrupt not signalled */
393 		return 0;
394 	}
395 }
396 
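/* Adaptive interrupt coalescing: once the number of requests seen in the last
 * stats interval exceeds the configured IOPS threshold, scale the IRQ delay
 * linearly with how far the request count exceeds that threshold.
 */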
397 static void
398 session_vq_io_stats_update(struct spdk_vhost_session *vsession,
399 			   struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
400 {
401 	uint32_t irq_delay_base = vsession->coalescing_delay_time_base;
402 	uint32_t io_threshold = vsession->coalescing_io_rate_threshold;
403 	int32_t irq_delay;
404 	uint32_t req_cnt;
405 
406 	req_cnt = virtqueue->req_cnt + virtqueue->used_req_cnt;
407 	if (req_cnt <= io_threshold) {
408 		return;
409 	}
410 
411 	irq_delay = (irq_delay_base * (req_cnt - io_threshold)) / io_threshold;
412 	virtqueue->irq_delay_time = (uint32_t) spdk_max(0, irq_delay);
413 
414 	virtqueue->req_cnt = 0;
415 	virtqueue->next_event_time = now;
416 }
417 
418 static void
419 check_session_vq_io_stats(struct spdk_vhost_session *vsession,
420 			  struct spdk_vhost_virtqueue *virtqueue, uint64_t now)
421 {
422 	if (now < vsession->next_stats_check_time) {
423 		return;
424 	}
425 
426 	vsession->next_stats_check_time = now + vsession->stats_check_interval;
427 	session_vq_io_stats_update(vsession, virtqueue, now);
428 }
429 
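/* Check whether the driver asked us to suppress used-ring notifications:
 * packed rings use the driver event suppression structure, split rings use
 * the VRING_AVAIL_F_NO_INTERRUPT flag in the avail ring.
 */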
430 static inline bool
431 vhost_vq_event_is_suppressed(struct spdk_vhost_virtqueue *vq)
432 {
433 	if (spdk_unlikely(vq->packed.packed_ring)) {
434 		if (vq->vring.driver_event->flags & VRING_PACKED_EVENT_FLAG_DISABLE) {
435 			return true;
436 		}
437 	} else {
438 		if (vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT) {
439 			return true;
440 		}
441 	}
442 
443 	return false;
444 }
445 
446 void
447 vhost_session_vq_used_signal(struct spdk_vhost_virtqueue *virtqueue)
448 {
449 	struct spdk_vhost_session *vsession = virtqueue->vsession;
450 	uint64_t now;
451 
452 	if (vsession->coalescing_delay_time_base == 0) {
453 		if (virtqueue->vring.desc == NULL) {
454 			return;
455 		}
456 
457 		if (vhost_vq_event_is_suppressed(virtqueue)) {
458 			return;
459 		}
460 
461 		vhost_vq_used_signal(vsession, virtqueue);
462 	} else {
463 		now = spdk_get_ticks();
464 		check_session_vq_io_stats(vsession, virtqueue, now);
465 
466 		/* No need for event right now */
467 		if (now < virtqueue->next_event_time) {
468 			return;
469 		}
470 
471 		if (vhost_vq_event_is_suppressed(virtqueue)) {
472 			return;
473 		}
474 
475 		if (!vhost_vq_used_signal(vsession, virtqueue)) {
476 			return;
477 		}
478 
479 		/* The syscall takes a while, so refresh the current time */
480 		now = spdk_get_ticks();
481 		virtqueue->next_event_time = now + virtqueue->irq_delay_time;
482 	}
483 }
484 
485 void
486 vhost_session_used_signal(struct spdk_vhost_session *vsession)
487 {
488 	struct spdk_vhost_virtqueue *virtqueue;
489 	uint16_t q_idx;
490 
491 	for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
492 		virtqueue = &vsession->virtqueue[q_idx];
493 		vhost_session_vq_used_signal(virtqueue);
494 	}
495 }
496 
497 /*
498  * Enqueue id and len to used ring.
499  */
500 void
501 vhost_vq_used_ring_enqueue(struct spdk_vhost_session *vsession,
502 			   struct spdk_vhost_virtqueue *virtqueue,
503 			   uint16_t id, uint32_t len)
504 {
505 	struct rte_vhost_vring *vring = &virtqueue->vring;
506 	struct vring_used *used = vring->used;
507 	uint16_t last_idx = virtqueue->last_used_idx & (vring->size - 1);
508 	uint16_t vq_idx = virtqueue->vring_idx;
509 
510 	SPDK_DEBUGLOG(vhost_ring,
511 		      "Queue %td - USED RING: last_idx=%"PRIu16" req id=%"PRIu16" len=%"PRIu32"\n",
512 		      virtqueue - vsession->virtqueue, virtqueue->last_used_idx, id, len);
513 
514 	vhost_log_req_desc(vsession, virtqueue, id);
515 
516 	virtqueue->last_used_idx++;
517 	used->ring[last_idx].id = id;
518 	used->ring[last_idx].len = len;
519 
520 	/* Ensure the used ring is updated before we log it or increment used->idx. */
521 	spdk_smp_wmb();
522 
523 	rte_vhost_set_last_inflight_io_split(vsession->vid, vq_idx, id);
524 
525 	vhost_log_used_vring_elem(vsession, virtqueue, last_idx);
526 	* (volatile uint16_t *) &used->idx = virtqueue->last_used_idx;
527 	vhost_log_used_vring_idx(vsession, virtqueue);
528 
529 	rte_vhost_clr_inflight_desc_split(vsession->vid, vq_idx, virtqueue->last_used_idx, id);
530 
531 	virtqueue->used_req_cnt++;
532 
533 	if (vsession->interrupt_mode) {
534 		if (virtqueue->vring.desc == NULL || vhost_vq_event_is_suppressed(virtqueue)) {
535 			return;
536 		}
537 
538 		vhost_vq_used_signal(vsession, virtqueue);
539 	}
540 }
541 
542 void
543 vhost_vq_packed_ring_enqueue(struct spdk_vhost_session *vsession,
544 			     struct spdk_vhost_virtqueue *virtqueue,
545 			     uint16_t num_descs, uint16_t buffer_id,
546 			     uint32_t length, uint16_t inflight_head)
547 {
548 	struct vring_packed_desc *desc = &virtqueue->vring.desc_packed[virtqueue->last_used_idx];
549 	bool used, avail;
550 
551 	SPDK_DEBUGLOG(vhost_ring,
552 		      "Queue %td - RING: buffer_id=%"PRIu16"\n",
553 		      virtqueue - vsession->virtqueue, buffer_id);
554 
555 	/* When the descriptor has been used, its avail flag and used flag
556 	 * are set equal to each other, and the used flag value matches
557 	 * used_wrap_counter.
558 	 */
559 	used = !!(desc->flags & VRING_DESC_F_USED);
560 	avail = !!(desc->flags & VRING_DESC_F_AVAIL);
561 	if (spdk_unlikely(used == virtqueue->packed.used_phase && used == avail)) {
562 		SPDK_ERRLOG("descriptor has been used before\n");
563 		return;
564 	}
565 
566 	/* In a used desc, addr is unused and len specifies the buffer length
567 	 * that has been written to by the device.
568 	 */
569 	desc->addr = 0;
570 	desc->len = length;
571 
572 	/* This bit specifies whether any data has been written by the device */
573 	if (length != 0) {
574 		desc->flags |= VRING_DESC_F_WRITE;
575 	}
576 
577 	/* Buffer ID is included in the last descriptor in the list.
578 	 * The driver needs to keep track of the size of the list corresponding
579 	 * to each buffer ID.
580 	 */
581 	desc->id = buffer_id;
582 
583 	/* A device MUST NOT make the descriptor used before buffer_id is
584 	 * written to the descriptor.
585 	 */
586 	spdk_smp_wmb();
587 
588 	rte_vhost_set_last_inflight_io_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
589 	/* To mark a desc as used, the device sets the F_USED bit in flags to match
590 	 * the internal Device ring wrap counter. It also sets the F_AVAIL bit to
591 	 * match the same value.
592 	 */
593 	if (virtqueue->packed.used_phase) {
594 		desc->flags |= VRING_DESC_F_AVAIL_USED;
595 	} else {
596 		desc->flags &= ~VRING_DESC_F_AVAIL_USED;
597 	}
598 	rte_vhost_clr_inflight_desc_packed(vsession->vid, virtqueue->vring_idx, inflight_head);
599 
600 	vhost_log_used_vring_elem(vsession, virtqueue, virtqueue->last_used_idx);
601 	virtqueue->last_used_idx += num_descs;
602 	if (virtqueue->last_used_idx >= virtqueue->vring.size) {
603 		virtqueue->last_used_idx -= virtqueue->vring.size;
604 		virtqueue->packed.used_phase = !virtqueue->packed.used_phase;
605 	}
606 
607 	virtqueue->used_req_cnt++;
608 }
609 
610 bool
611 vhost_vq_packed_ring_is_avail(struct spdk_vhost_virtqueue *virtqueue)
612 {
613 	uint16_t flags = virtqueue->vring.desc_packed[virtqueue->last_avail_idx].flags;
614 
615 	/* To mark a desc as available, the driver sets the F_AVAIL bit in flags
616 	 * to match the internal avail wrap counter. It also sets the F_USED bit to
617 	 * match the inverse value but it's not mandatory.
618 	 */
619 	return (!!(flags & VRING_DESC_F_AVAIL) == virtqueue->packed.avail_phase);
620 }
621 
622 bool
623 vhost_vring_packed_desc_is_wr(struct vring_packed_desc *cur_desc)
624 {
625 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
626 }
627 
628 bool
629 vhost_vring_inflight_desc_is_wr(spdk_vhost_inflight_desc *cur_desc)
630 {
631 	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
632 }
633 
634 int
635 vhost_vring_packed_desc_get_next(struct vring_packed_desc **desc, uint16_t *req_idx,
636 				 struct spdk_vhost_virtqueue *vq,
637 				 struct vring_packed_desc *desc_table,
638 				 uint32_t desc_table_size)
639 {
640 	if (desc_table != NULL) {
641 		/* A non-NULL desc_table means the chain is indirect and we get the next
642 		 * desc by req_idx and desc_table_size. Returning *desc == NULL means
643 		 * we have reached the last desc of this request.
644 		 */
645 		(*req_idx)++;
646 		if (*req_idx < desc_table_size) {
647 			*desc = &desc_table[*req_idx];
648 		} else {
649 			*desc = NULL;
650 		}
651 	} else {
652 		/* A NULL desc_table means the chain is non-indirect and we get the next
653 		 * desc by req_idx and the F_NEXT flag. Returning *desc == NULL means
654 		 * we have reached the last desc of this request. When a new desc is
655 		 * returned, req_idx is updated as well.
656 		 */
657 		if (((*desc)->flags & VRING_DESC_F_NEXT) == 0) {
658 			*desc = NULL;
659 			return 0;
660 		}
661 
662 		*req_idx = (*req_idx + 1) % vq->vring.size;
663 		*desc = &vq->vring.desc_packed[*req_idx];
664 	}
665 
666 	return 0;
667 }
668 
669 static int
670 vhost_vring_desc_payload_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
671 				uint16_t *iov_index, uintptr_t payload, uint64_t remaining)
672 {
673 	uintptr_t vva;
674 	uint64_t len;
675 
676 	do {
677 		if (*iov_index >= SPDK_VHOST_IOVS_MAX) {
678 			SPDK_ERRLOG("SPDK_VHOST_IOVS_MAX(%d) reached\n", SPDK_VHOST_IOVS_MAX);
679 			return -1;
680 		}
681 		len = remaining;
682 		vva = (uintptr_t)rte_vhost_va_from_guest_pa(vsession->mem, payload, &len);
683 		if (vva == 0 || len == 0) {
684 			SPDK_ERRLOG("gpa_to_vva(%p) == NULL\n", (void *)payload);
685 			return -1;
686 		}
687 		iov[*iov_index].iov_base = (void *)vva;
688 		iov[*iov_index].iov_len = len;
689 		remaining -= len;
690 		payload += len;
691 		(*iov_index)++;
692 	} while (remaining);
693 
694 	return 0;
695 }
696 
697 int
698 vhost_vring_packed_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
699 			       uint16_t *iov_index, const struct vring_packed_desc *desc)
700 {
701 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
702 					       desc->addr, desc->len);
703 }
704 
705 int
706 vhost_vring_inflight_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
707 				 uint16_t *iov_index, const spdk_vhost_inflight_desc *desc)
708 {
709 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
710 					       desc->addr, desc->len);
711 }
712 
713 /* 1. Traverse the desc chain to get the buffer_id and return it as task_idx.
714  * 2. Update vq->last_avail_idx to point to the next available desc chain.
715  * 3. Flip the avail_wrap_counter if last_avail_idx wraps around.
716  */
717 uint16_t
718 vhost_vring_packed_desc_get_buffer_id(struct spdk_vhost_virtqueue *vq, uint16_t req_idx,
719 				      uint16_t *num_descs)
720 {
721 	struct vring_packed_desc *desc;
722 	uint16_t desc_head = req_idx;
723 
724 	*num_descs = 1;
725 
726 	desc =  &vq->vring.desc_packed[req_idx];
727 	if (!vhost_vring_packed_desc_is_indirect(desc)) {
728 		while ((desc->flags & VRING_DESC_F_NEXT) != 0) {
729 			req_idx = (req_idx + 1) % vq->vring.size;
730 			desc = &vq->vring.desc_packed[req_idx];
731 			(*num_descs)++;
732 		}
733 	}
734 
735 	/* The queue size doesn't have to be a power of 2.
736 	 * The device maintains last_avail_idx, so make sure
737 	 * the value stays valid (0 ~ vring.size - 1).
738 	 */
739 	vq->last_avail_idx = (req_idx + 1) % vq->vring.size;
740 	if (vq->last_avail_idx < desc_head) {
741 		vq->packed.avail_phase = !vq->packed.avail_phase;
742 	}
743 
744 	return desc->id;
745 }
746 
747 int
748 vhost_vring_desc_get_next(struct vring_desc **desc,
749 			  struct vring_desc *desc_table, uint32_t desc_table_size)
750 {
751 	struct vring_desc *old_desc = *desc;
752 	uint16_t next_idx;
753 
754 	if ((old_desc->flags & VRING_DESC_F_NEXT) == 0) {
755 		*desc = NULL;
756 		return 0;
757 	}
758 
759 	next_idx = old_desc->next;
760 	if (spdk_unlikely(next_idx >= desc_table_size)) {
761 		*desc = NULL;
762 		return -1;
763 	}
764 
765 	*desc = &desc_table[next_idx];
766 	return 0;
767 }
768 
769 int
770 vhost_vring_desc_to_iov(struct spdk_vhost_session *vsession, struct iovec *iov,
771 			uint16_t *iov_index, const struct vring_desc *desc)
772 {
773 	return vhost_vring_desc_payload_to_iov(vsession, iov, iov_index,
774 					       desc->addr, desc->len);
775 }
776 
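/* Round each guest memory region out to 2MB boundaries for spdk_mem_register()/
 * spdk_mem_unregister(). If the rounded-down start matches the previous region's,
 * skip the first 2MB chunk so the same hugepage is not processed twice.
 */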
777 static inline void
778 vhost_session_mem_region_calc(uint64_t *previous_start, uint64_t *start, uint64_t *end,
779 			      uint64_t *len, struct rte_vhost_mem_region *region)
780 {
781 	*start = FLOOR_2MB(region->mmap_addr);
782 	*end = CEIL_2MB(region->mmap_addr + region->mmap_size);
783 	if (*start == *previous_start) {
784 		*start += (size_t) VALUE_2MB;
785 	}
786 	*previous_start = *start;
787 	*len = *end - *start;
788 }
789 
790 void
791 vhost_session_mem_register(struct rte_vhost_memory *mem)
792 {
793 	uint64_t start, end, len;
794 	uint32_t i;
795 	uint64_t previous_start = UINT64_MAX;
796 
797 
798 	for (i = 0; i < mem->nregions; i++) {
799 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
800 		SPDK_INFOLOG(vhost, "Registering VM memory for vtophys translation - 0x%jx len:0x%jx\n",
801 			     start, len);
802 
803 		if (spdk_mem_register((void *)start, len) != 0) {
804 			SPDK_WARNLOG("Failed to register memory region %"PRIu32". Future vtophys translation might fail.\n",
805 				     i);
806 			continue;
807 		}
808 	}
809 }
810 
811 void
812 vhost_session_mem_unregister(struct rte_vhost_memory *mem)
813 {
814 	uint64_t start, end, len;
815 	uint32_t i;
816 	uint64_t previous_start = UINT64_MAX;
817 
818 	for (i = 0; i < mem->nregions; i++) {
819 		vhost_session_mem_region_calc(&previous_start, &start, &end, &len, &mem->regions[i]);
820 		if (spdk_vtophys((void *) start, NULL) == SPDK_VTOPHYS_ERROR) {
821 			continue; /* region has not been registered */
822 		}
823 
824 		if (spdk_mem_unregister((void *)start, len) != 0) {
825 			assert(false);
826 		}
827 	}
828 }
829 
830 static int
831 _stop_session(struct spdk_vhost_session *vsession)
832 {
833 	struct spdk_vhost_dev *vdev = vsession->vdev;
834 	struct spdk_vhost_virtqueue *q;
835 	int rc;
836 	uint16_t i;
837 
838 	rc = vdev->backend->stop_session(vsession);
839 	if (rc != 0) {
840 		SPDK_ERRLOG("Couldn't stop device with vid %d.\n", vsession->vid);
841 		return rc;
842 	}
843 
844 	for (i = 0; i < vsession->max_queues; i++) {
845 		q = &vsession->virtqueue[i];
846 
847 		/* vring.desc and vring.desc_packed are in a union struct
848 		 * so q->vring.desc can replace q->vring.desc_packed.
849 		 */
850 		if (q->vring.desc == NULL) {
851 			continue;
852 		}
853 
854 		/* Packed virtqueues support up to 2^15 entries each
855 		 * so the remaining top bit can be used as the wrap counter.
856 		 */
857 		if (q->packed.packed_ring) {
858 			q->last_avail_idx = q->last_avail_idx |
859 					    ((uint16_t)q->packed.avail_phase << 15);
860 			q->last_used_idx = q->last_used_idx |
861 					   ((uint16_t)q->packed.used_phase << 15);
862 		}
863 
864 		rte_vhost_set_vring_base(vsession->vid, i, q->last_avail_idx, q->last_used_idx);
865 	}
866 
867 	vhost_session_mem_unregister(vsession->mem);
868 	free(vsession->mem);
869 
870 	return 0;
871 }
872 
873 static int
874 new_connection(int vid)
875 {
876 	struct spdk_vhost_dev *vdev;
877 	struct spdk_vhost_session *vsession;
878 	size_t dev_dirname_len;
879 	char ifname[PATH_MAX];
880 	char *ctrlr_name;
881 
882 	if (rte_vhost_get_ifname(vid, ifname, PATH_MAX) < 0) {
883 		SPDK_ERRLOG("Couldn't get a valid ifname for device with vid %d\n", vid);
884 		return -1;
885 	}
886 
887 	spdk_vhost_lock();
888 
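	/* The ifname reported by rte_vhost is the socket path; strip the configured
	 * socket directory prefix to recover the SPDK controller name.
	 */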
889 	ctrlr_name = &ifname[0];
890 	dev_dirname_len = strlen(g_vhost_user_dev_dirname);
891 	if (strncmp(ctrlr_name, g_vhost_user_dev_dirname, dev_dirname_len) == 0) {
892 		ctrlr_name += dev_dirname_len;
893 	}
894 
895 	vdev = spdk_vhost_dev_find(ctrlr_name);
896 	if (vdev == NULL) {
897 		SPDK_ERRLOG("Couldn't find device with vid %d to create connection for.\n", vid);
898 		spdk_vhost_unlock();
899 		return -1;
900 	}
901 
902 	/* We expect sessions inside vdev->vsessions to be sorted in ascending
903 	 * order with regard to vsession->id. For now we always set id = vsessions_num++
904 	 * and append each session to the very end of the vsessions list.
905 	 * This is required for vhost_user_dev_foreach_session() to work.
906 	 */
907 	if (vdev->vsessions_num == UINT_MAX) {
908 		assert(false);
		spdk_vhost_unlock();
909 		return -EINVAL;
910 	}
911 
912 	if (posix_memalign((void **)&vsession, SPDK_CACHE_LINE_SIZE, sizeof(*vsession) +
913 			   vdev->backend->session_ctx_size)) {
914 		SPDK_ERRLOG("vsession alloc failed\n");
915 		spdk_vhost_unlock();
916 		return -1;
917 	}
918 	memset(vsession, 0, sizeof(*vsession) + vdev->backend->session_ctx_size);
919 
920 	vsession->vdev = vdev;
921 	vsession->vid = vid;
922 	vsession->id = vdev->vsessions_num++;
923 	vsession->name = spdk_sprintf_alloc("%ss%u", vdev->name, vsession->vid);
924 	if (vsession->name == NULL) {
925 		SPDK_ERRLOG("vsession alloc failed\n");
926 		spdk_vhost_unlock();
927 		free(vsession);
928 		return -1;
929 	}
930 	vsession->started = false;
931 	vsession->initialized = false;
932 	vsession->next_stats_check_time = 0;
933 	vsession->stats_check_interval = SPDK_VHOST_STATS_CHECK_INTERVAL_MS *
934 					 spdk_get_ticks_hz() / 1000UL;
935 	TAILQ_INSERT_TAIL(&vdev->vsessions, vsession, tailq);
936 
937 	vhost_session_install_rte_compat_hooks(vsession);
938 	spdk_vhost_unlock();
939 	return 0;
940 }
941 
942 static int
943 start_device(int vid)
944 {
945 	struct spdk_vhost_dev *vdev;
946 	struct spdk_vhost_session *vsession;
947 	int rc = -1;
948 	uint16_t i;
949 	bool packed_ring;
950 
951 	spdk_vhost_lock();
952 
953 	vsession = vhost_session_find_by_vid(vid);
954 	if (vsession == NULL) {
955 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
956 		goto out;
957 	}
958 
959 	vdev = vsession->vdev;
960 	if (vsession->started) {
961 		/* already started, nothing to do */
962 		rc = 0;
963 		goto out;
964 	}
965 
966 	if (vhost_get_negotiated_features(vid, &vsession->negotiated_features) != 0) {
967 		SPDK_ERRLOG("vhost device %d: Failed to get negotiated driver features\n", vid);
968 		goto out;
969 	}
970 
971 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
972 
973 	vsession->max_queues = 0;
974 	memset(vsession->virtqueue, 0, sizeof(vsession->virtqueue));
975 	for (i = 0; i < SPDK_VHOST_MAX_VQUEUES; i++) {
976 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
977 
978 		q->vsession = vsession;
979 		q->vring_idx = -1;
980 		if (rte_vhost_get_vhost_vring(vid, i, &q->vring)) {
981 			continue;
982 		}
983 		q->vring_idx = i;
984 		rte_vhost_get_vhost_ring_inflight(vid, i, &q->vring_inflight);
985 
986 		/* vring.desc and vring.desc_packed are in a union struct
987 		 * so q->vring.desc can replace q->vring.desc_packed.
988 		 */
989 		if (q->vring.desc == NULL || q->vring.size == 0) {
990 			continue;
991 		}
992 
993 		if (rte_vhost_get_vring_base(vsession->vid, i, &q->last_avail_idx, &q->last_used_idx)) {
994 			q->vring.desc = NULL;
995 			continue;
996 		}
997 
998 		if (packed_ring) {
999 			/* Use the inflight mem to restore the last_avail_idx and last_used_idx.
1000 			 * When the vring format is packed, there is no used_idx in the
1001 			 * used ring, so the VM can't resend the used_idx to vhost on reconnect.
1002 			 * QEMU 5.2.0 added support for packed inflight; earlier versions only
1003 			 * support split ring inflight because they don't send the negotiated
1004 			 * features before getting the inflight fd. Users can enable this via RPC.
1005 			 */
1006 			if (spdk_unlikely(g_packed_ring_recovery)) {
1007 				rte_vhost_get_vring_base_from_inflight(vsession->vid, i,
1008 								       &q->last_avail_idx,
1009 								       &q->last_used_idx);
1010 			}
1011 
1012 			/* Packed virtqueues support up to 2^15 entries each
1013 			 * so the remaining top bit can be used as the wrap counter.
1014 			 */
1015 			q->packed.avail_phase = q->last_avail_idx >> 15;
1016 			q->last_avail_idx = q->last_avail_idx & 0x7FFF;
1017 			q->packed.used_phase = q->last_used_idx >> 15;
1018 			q->last_used_idx = q->last_used_idx & 0x7FFF;
1019 
1020 			if (!vsession->interrupt_mode) {
1021 				/* Disable I/O submission notifications, we'll be polling. */
1022 				q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1023 			}
1024 		} else {
1025 			if (!vsession->interrupt_mode) {
1026 				/* Disable I/O submission notifications, we'll be polling. */
1027 				q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1028 			}
1029 		}
1030 
1031 		q->packed.packed_ring = packed_ring;
1032 		vsession->max_queues = i + 1;
1033 	}
1034 
1035 	if (vhost_get_mem_table(vid, &vsession->mem) != 0) {
1036 		SPDK_ERRLOG("vhost device %d: Failed to get guest memory table\n", vid);
1037 		goto out;
1038 	}
1039 
1040 	/*
1041 	 * Not sure right now, but this looks like some kind of QEMU bug: guest I/O
1042 	 * might be frozen after live migration unless all queues are kicked. It looks
1043 	 * as if the previous vhost instance failed to deliver all interrupts before
1044 	 * the GET_VRING_BASE message. This shouldn't harm the guest, since spurious
1045 	 * interrupts should be ignored by the guest virtio driver.
1046 	 *
1047 	 * Tested on QEMU 2.10.91 and 2.11.50.
1048 	 */
1049 	for (i = 0; i < vsession->max_queues; i++) {
1050 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1051 
1052 		/* vring.desc and vring.desc_packed are in a union struct
1053 		 * so q->vring.desc can replace q->vring.desc_packed.
1054 		 */
1055 		if (q->vring.desc != NULL && q->vring.size > 0) {
1056 			rte_vhost_vring_call(vsession->vid, q->vring_idx);
1057 		}
1058 	}
1059 
1060 	vhost_user_session_set_coalescing(vdev, vsession, NULL);
1061 	vhost_session_mem_register(vsession->mem);
1062 	vsession->initialized = true;
1063 	rc = vdev->backend->start_session(vsession);
1064 	if (rc != 0) {
1065 		vhost_session_mem_unregister(vsession->mem);
1066 		free(vsession->mem);
1067 		goto out;
1068 	}
1069 
1070 out:
1071 	spdk_vhost_unlock();
1072 	return rc;
1073 }
1074 
1075 static void
1076 stop_device(int vid)
1077 {
1078 	struct spdk_vhost_session *vsession;
1079 
1080 	spdk_vhost_lock();
1081 	vsession = vhost_session_find_by_vid(vid);
1082 	if (vsession == NULL) {
1083 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1084 		spdk_vhost_unlock();
1085 		return;
1086 	}
1087 
1088 	if (!vsession->started) {
1089 		/* already stopped, nothing to do */
1090 		spdk_vhost_unlock();
1091 		return;
1092 	}
1093 
1094 	_stop_session(vsession);
1095 	spdk_vhost_unlock();
1096 
1097 	return;
1098 }
1099 
1100 static void
1101 destroy_connection(int vid)
1102 {
1103 	struct spdk_vhost_session *vsession;
1104 
1105 	spdk_vhost_lock();
1106 	vsession = vhost_session_find_by_vid(vid);
1107 	if (vsession == NULL) {
1108 		SPDK_ERRLOG("Couldn't find session with vid %d.\n", vid);
1109 		spdk_vhost_unlock();
1110 		return;
1111 	}
1112 
1113 	if (vsession->started) {
1114 		if (_stop_session(vsession) != 0) {
1115 			spdk_vhost_unlock();
1116 			return;
1117 		}
1118 	}
1119 
1120 	TAILQ_REMOVE(&vsession->vdev->vsessions, vsession, tailq);
1121 	free(vsession->name);
1122 	free(vsession);
1123 	spdk_vhost_unlock();
1124 }
1125 
1126 #if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
1127 static const struct rte_vhost_device_ops g_spdk_vhost_ops = {
1128 #else
1129 static const struct vhost_device_ops g_spdk_vhost_ops = {
1130 #endif
1131 	.new_device =  start_device,
1132 	.destroy_device = stop_device,
1133 	.new_connection = new_connection,
1134 	.destroy_connection = destroy_connection,
1135 };
1136 
1137 static struct spdk_vhost_session *
1138 vhost_session_find_by_id(struct spdk_vhost_dev *vdev, unsigned id)
1139 {
1140 	struct spdk_vhost_session *vsession;
1141 
1142 	TAILQ_FOREACH(vsession, &vdev->vsessions, tailq) {
1143 		if (vsession->id == id) {
1144 			return vsession;
1145 		}
1146 	}
1147 
1148 	return NULL;
1149 }
1150 
1151 struct spdk_vhost_session *
1152 vhost_session_find_by_vid(int vid)
1153 {
1154 	struct spdk_vhost_dev *vdev;
1155 	struct spdk_vhost_session *vsession;
1156 
1157 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1158 	     vdev = spdk_vhost_dev_next(vdev)) {
1159 		TAILQ_FOREACH(vsession, &vdev->vsessions, tailq) {
1160 			if (vsession->vid == vid) {
1161 				return vsession;
1162 			}
1163 		}
1164 	}
1165 
1166 	return NULL;
1167 }
1168 
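/* Block the current thread until an SPDK thread posts g_dpdk_sem (via
 * vhost_session_cb_done()). If the timeout expires, log the error but keep
 * waiting, since the callback must still complete before control returns
 * to rte_vhost.
 */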
1169 static void
1170 wait_for_semaphore(int timeout_sec, const char *errmsg)
1171 {
1172 	struct timespec timeout;
1173 	int rc;
1174 
1175 	clock_gettime(CLOCK_REALTIME, &timeout);
1176 	timeout.tv_sec += timeout_sec;
1177 	rc = sem_timedwait(&g_dpdk_sem, &timeout);
1178 	if (rc != 0) {
1179 		SPDK_ERRLOG("Timeout waiting for event: %s.\n", errmsg);
1180 		sem_wait(&g_dpdk_sem);
1181 	}
1182 }
1183 
1184 static void
1185 vhost_session_cb_done(int rc)
1186 {
1187 	g_dpdk_response = rc;
1188 	sem_post(&g_dpdk_sem);
1189 }
1190 
1191 void
1192 vhost_user_session_start_done(struct spdk_vhost_session *vsession, int response)
1193 {
1194 	if (response == 0) {
1195 		vsession->started = true;
1196 
1197 		assert(vsession->vdev->active_session_num < UINT32_MAX);
1198 		vsession->vdev->active_session_num++;
1199 	}
1200 
1201 	vhost_session_cb_done(response);
1202 }
1203 
1204 void
1205 vhost_user_session_stop_done(struct spdk_vhost_session *vsession, int response)
1206 {
1207 	if (response == 0) {
1208 		vsession->started = false;
1209 
1210 		assert(vsession->vdev->active_session_num > 0);
1211 		vsession->vdev->active_session_num--;
1212 	}
1213 
1214 	vhost_session_cb_done(response);
1215 }
1216 
1217 static void
1218 vhost_event_cb(void *arg1)
1219 {
1220 	struct vhost_session_fn_ctx *ctx = arg1;
1221 	struct spdk_vhost_session *vsession;
1222 
1223 	if (spdk_vhost_trylock() != 0) {
1224 		spdk_thread_send_msg(spdk_get_thread(), vhost_event_cb, arg1);
1225 		return;
1226 	}
1227 
1228 	vsession = vhost_session_find_by_id(ctx->vdev, ctx->vsession_id);
1229 	ctx->cb_fn(ctx->vdev, vsession, NULL);
1230 	spdk_vhost_unlock();
1231 }
1232 
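/* Run cb_fn for the given session on the device's SPDK thread and block the
 * calling thread on g_dpdk_sem until the backend reports completion via
 * vhost_user_session_start_done()/stop_done(). The caller must hold the global
 * vhost lock; it is dropped while waiting and re-taken before returning
 * g_dpdk_response.
 */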
1233 int
1234 vhost_user_session_send_event(struct spdk_vhost_session *vsession,
1235 			 spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
1236 			 const char *errmsg)
1237 {
1238 	struct vhost_session_fn_ctx ev_ctx = {0};
1239 	struct spdk_vhost_dev *vdev = vsession->vdev;
1240 
1241 	ev_ctx.vdev = vdev;
1242 	ev_ctx.vsession_id = vsession->id;
1243 	ev_ctx.cb_fn = cb_fn;
1244 
1245 	spdk_thread_send_msg(vdev->thread, vhost_event_cb, &ev_ctx);
1246 
1247 	spdk_vhost_unlock();
1248 	wait_for_semaphore(timeout_sec, errmsg);
1249 	spdk_vhost_lock();
1250 
1251 	return g_dpdk_response;
1252 }
1253 
1254 static void
1255 foreach_session_finish_cb(void *arg1)
1256 {
1257 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1258 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1259 
1260 	if (spdk_vhost_trylock() != 0) {
1261 		spdk_thread_send_msg(spdk_get_thread(),
1262 				     foreach_session_finish_cb, arg1);
1263 		return;
1264 	}
1265 
1266 	assert(vdev->pending_async_op_num > 0);
1267 	vdev->pending_async_op_num--;
1268 	if (ev_ctx->cpl_fn != NULL) {
1269 		ev_ctx->cpl_fn(vdev, ev_ctx->user_ctx);
1270 	}
1271 
1272 	spdk_vhost_unlock();
1273 	free(ev_ctx);
1274 }
1275 
1276 static void
1277 foreach_session(void *arg1)
1278 {
1279 	struct vhost_session_fn_ctx *ev_ctx = arg1;
1280 	struct spdk_vhost_session *vsession;
1281 	struct spdk_vhost_dev *vdev = ev_ctx->vdev;
1282 	int rc;
1283 
1284 	if (spdk_vhost_trylock() != 0) {
1285 		spdk_thread_send_msg(spdk_get_thread(), foreach_session, arg1);
1286 		return;
1287 	}
1288 
1289 	TAILQ_FOREACH(vsession, &vdev->vsessions, tailq) {
1290 		if (vsession->initialized) {
1291 			rc = ev_ctx->cb_fn(vdev, vsession, ev_ctx->user_ctx);
1292 			if (rc < 0) {
1293 				goto out;
1294 			}
1295 		}
1296 	}
1297 
1298 out:
1299 	spdk_vhost_unlock();
1300 
1301 	spdk_thread_send_msg(g_vhost_user_init_thread, foreach_session_finish_cb, arg1);
1302 }
1303 
1304 void
1305 vhost_user_dev_foreach_session(struct spdk_vhost_dev *vdev,
1306 			  spdk_vhost_session_fn fn,
1307 			  spdk_vhost_dev_fn cpl_fn,
1308 			  void *arg)
1309 {
1310 	struct vhost_session_fn_ctx *ev_ctx;
1311 
1312 	ev_ctx = calloc(1, sizeof(*ev_ctx));
1313 	if (ev_ctx == NULL) {
1314 		SPDK_ERRLOG("Failed to alloc vhost event.\n");
1315 		assert(false);
1316 		return;
1317 	}
1318 
1319 	ev_ctx->vdev = vdev;
1320 	ev_ctx->cb_fn = fn;
1321 	ev_ctx->cpl_fn = cpl_fn;
1322 	ev_ctx->user_ctx = arg;
1323 
1324 	assert(vdev->pending_async_op_num < UINT32_MAX);
1325 	vdev->pending_async_op_num++;
1326 
1327 	spdk_thread_send_msg(vdev->thread, foreach_session, ev_ctx);
1328 }
1329 
1330 void
1331 vhost_user_session_set_interrupt_mode(struct spdk_vhost_session *vsession, bool interrupt_mode)
1332 {
1333 	uint16_t i;
1334 	bool packed_ring;
1335 	int rc = 0;
1336 
1337 	packed_ring = ((vsession->negotiated_features & (1ULL << VIRTIO_F_RING_PACKED)) != 0);
1338 
1339 	for (i = 0; i < vsession->max_queues; i++) {
1340 		struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
1341 		uint64_t num_events = 1;
1342 
1343 		/* vring.desc and vring.desc_packed are in a union struct
1344 		 * so q->vring.desc can replace q->vring.desc_packed.
1345 		 */
1346 		if (q->vring.desc == NULL || q->vring.size == 0) {
1347 			continue;
1348 		}
1349 
1350 		if (interrupt_mode) {
1351 			/* Enable I/O submission notifications, we'll be interrupting. */
1352 			if (packed_ring) {
1353 				* (volatile uint16_t *) &q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_ENABLE;
1354 			} else {
1355 				* (volatile uint16_t *) &q->vring.used->flags = 0;
1356 			}
1357 
1358 			/* In case of a race condition, always kick the vring when switching to interrupt mode */
1359 			rc = write(q->vring.kickfd, &num_events, sizeof(num_events));
1360 			if (rc < 0) {
1361 				SPDK_ERRLOG("failed to kick vring: %s.\n", spdk_strerror(errno));
1362 			}
1363 
1364 			vsession->interrupt_mode = true;
1365 		} else {
1366 			/* Disable I/O submission notifications, we'll be polling. */
1367 			if (packed_ring) {
1368 				* (volatile uint16_t *) &q->vring.device_event->flags = VRING_PACKED_EVENT_FLAG_DISABLE;
1369 			} else {
1370 				* (volatile uint16_t *) &q->vring.used->flags = VRING_USED_F_NO_NOTIFY;
1371 			}
1372 
1373 			vsession->interrupt_mode = false;
1374 		}
1375 	}
1376 }
1377 
1378 
1379 static enum rte_vhost_msg_result
1380 extern_vhost_pre_msg_handler(int vid, void *_msg)
1381 {
1382 	struct vhost_user_msg *msg = _msg;
1383 	struct spdk_vhost_session *vsession;
1384 
1385 	vsession = vhost_session_find_by_vid(vid);
1386 	if (vsession == NULL) {
1387 		SPDK_ERRLOG("Received a message to an uninitialized session (vid %d).\n", vid);
1388 		assert(false);
1389 		return RTE_VHOST_MSG_RESULT_ERR;
1390 	}
1391 
1392 	switch (msg->request) {
1393 	case VHOST_USER_GET_VRING_BASE:
1394 		if (vsession->forced_polling && vsession->started) {
1395 			/* Our queue is stopped for whatever reason, but we may still
1396 			 * need to poll it after it's initialized again.
1397 			 */
1398 			g_spdk_vhost_ops.destroy_device(vid);
1399 		}
1400 		break;
1401 	case VHOST_USER_SET_VRING_BASE:
1402 	case VHOST_USER_SET_VRING_ADDR:
1403 	case VHOST_USER_SET_VRING_NUM:
1404 		if (vsession->forced_polling && vsession->started) {
1405 			/* Additional queues are being initialized, so we either processed
1406 			 * enough I/Os and are switching from SeaBIOS to the OS now, or
1407 			 * we were never in SeaBIOS in the first place. Either way, we
1408 			 * don't need our workaround anymore.
1409 			 */
1410 			g_spdk_vhost_ops.destroy_device(vid);
1411 			vsession->forced_polling = false;
1412 		}
1413 		break;
1414 	case VHOST_USER_SET_VRING_KICK:
1415 		/* rte_vhost (after 20.08) calls new_device once a single active vring is
1416 		 * configured, so we may start the session before all vrings are available.
1417 		 * Therefore, for each new vring, if the session is already started, we need
1418 		 * to restart it.
1419 		 */
1420 	case VHOST_USER_SET_VRING_CALL:
1421 		/* rte_vhost will close the previous callfd and won't notify
1422 		 * us about any change. This will effectively make SPDK fail
1423 		 * to deliver any subsequent interrupts until a session is
1424 		 * restarted. We stop the session here before closing the previous
1425 		 * fd (so that all interrupts must have been delivered by the
1426 		 * time the descriptor is closed) and start right after (which
1427 		 * will make SPDK retrieve the latest, up-to-date callfd from
1428 		 * rte_vhost).
1429 		 */
1430 	case VHOST_USER_SET_MEM_TABLE:
1431 		/* rte_vhost will unmap previous memory that SPDK may still
1432 		 * have pending DMA operations on. We can't let that happen,
1433 		 * so stop the device before letting rte_vhost unmap anything.
1434 		 * This will block until all pending I/Os are finished.
1435 		 * We will start the device again from the post-processing
1436 		 * message handler.
1437 		 */
1438 		if (vsession->started) {
1439 			g_spdk_vhost_ops.destroy_device(vid);
1440 			vsession->needs_restart = true;
1441 		}
1442 		break;
1443 	case VHOST_USER_GET_CONFIG: {
1444 		int rc = 0;
1445 
1446 		spdk_vhost_lock();
1447 		if (vsession->vdev->backend->vhost_get_config) {
1448 			rc = vsession->vdev->backend->vhost_get_config(vsession->vdev,
1449 				msg->payload.cfg.region, msg->payload.cfg.size);
1450 			if (rc != 0) {
1451 				msg->size = 0;
1452 			}
1453 		}
1454 		spdk_vhost_unlock();
1455 
1456 		return RTE_VHOST_MSG_RESULT_REPLY;
1457 	}
1458 	case VHOST_USER_SET_CONFIG: {
1459 		int rc = 0;
1460 
1461 		spdk_vhost_lock();
1462 		if (vsession->vdev->backend->vhost_set_config) {
1463 			rc = vsession->vdev->backend->vhost_set_config(vsession->vdev,
1464 				msg->payload.cfg.region, msg->payload.cfg.offset,
1465 				msg->payload.cfg.size, msg->payload.cfg.flags);
1466 		}
1467 		spdk_vhost_unlock();
1468 
1469 		return rc == 0 ? RTE_VHOST_MSG_RESULT_OK : RTE_VHOST_MSG_RESULT_ERR;
1470 	}
1471 	default:
1472 		break;
1473 	}
1474 
1475 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1476 }
1477 
1478 static enum rte_vhost_msg_result
1479 extern_vhost_post_msg_handler(int vid, void *_msg)
1480 {
1481 	struct vhost_user_msg *msg = _msg;
1482 	struct spdk_vhost_session *vsession;
1483 
1484 	vsession = vhost_session_find_by_vid(vid);
1485 	if (vsession == NULL) {
1486 		SPDK_ERRLOG("Received a message to an uninitialized session (vid %d).\n", vid);
1487 		assert(false);
1488 		return RTE_VHOST_MSG_RESULT_ERR;
1489 	}
1490 
1491 	if (vsession->needs_restart) {
1492 		g_spdk_vhost_ops.new_device(vid);
1493 		vsession->needs_restart = false;
1494 		return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1495 	}
1496 
1497 	switch (msg->request) {
1498 	case VHOST_USER_SET_FEATURES:
1499 		/* rte_vhost requires all queues to be fully initialized in order
1500 		 * to start I/O processing. This behavior is not compliant with the
1501 		 * vhost-user specification and doesn't work with QEMU 2.12+, which
1502 		 * will only initialize 1 I/O queue for the SeaBIOS boot.
1503 		 * Theoretically, we should start polling each virtqueue individually
1504 		 * after receiving its SET_VRING_KICK message, but rte_vhost is not
1505 		 * designed to poll individual queues. So here we use a workaround
1506 		 * to detect when the vhost session could be potentially at that SeaBIOS
1507 		 * stage and we mark it to start polling as soon as its first virtqueue
1508 		 * gets initialized. This doesn't hurt any non-QEMU vhost slaves
1509 		 * and allows QEMU 2.12+ to boot correctly. SET_FEATURES could be sent
1510 		 * at any time, but QEMU will send it at least once on SeaBIOS
1511 		 * initialization - whenever powered-up or rebooted.
1512 		 */
1513 		vsession->forced_polling = true;
1514 		break;
1515 	case VHOST_USER_SET_VRING_KICK:
1516 		/* vhost-user spec tells us to start polling a queue after receiving
1517 		 * its SET_VRING_KICK message. Let's do it!
1518 		 */
1519 		if (vsession->forced_polling && !vsession->started) {
1520 			g_spdk_vhost_ops.new_device(vid);
1521 		}
1522 		break;
1523 	default:
1524 		break;
1525 	}
1526 
1527 	return RTE_VHOST_MSG_RESULT_NOT_HANDLED;
1528 }
1529 
1530 struct rte_vhost_user_extern_ops g_spdk_extern_vhost_ops = {
1531 	.pre_msg_handle = extern_vhost_pre_msg_handler,
1532 	.post_msg_handle = extern_vhost_post_msg_handler,
1533 };
1534 
1535 void
1536 vhost_session_install_rte_compat_hooks(struct spdk_vhost_session *vsession)
1537 {
1538 	int rc;
1539 
1540 	rc = rte_vhost_extern_callback_register(vsession->vid, &g_spdk_extern_vhost_ops, NULL);
1541 	if (rc != 0) {
1542 		SPDK_ERRLOG("rte_vhost_extern_callback_register() failed for vid = %d\n",
1543 			    vsession->vid);
1544 		return;
1545 	}
1546 }
1547 
1548 int
1549 vhost_register_unix_socket(const char *path, const char *ctrl_name,
1550 			   uint64_t virtio_features, uint64_t disabled_features, uint64_t protocol_features)
1551 {
1552 	struct stat file_stat;
1553 	uint64_t features = 0;
1554 
1555 	/* Register vhost driver to handle vhost messages. */
1556 	if (stat(path, &file_stat) != -1) {
1557 		if (!S_ISSOCK(file_stat.st_mode)) {
1558 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1559 				    "The file already exists and is not a socket.\n",
1560 				    path);
1561 			return -EIO;
1562 		} else if (unlink(path) != 0) {
1563 			SPDK_ERRLOG("Cannot create a domain socket at path \"%s\": "
1564 				    "The socket already exists and could not be unlinked.\n",
1565 				    path);
1566 			return -EIO;
1567 		}
1568 	}
1569 
1570 #if RTE_VERSION < RTE_VERSION_NUM(20, 8, 0, 0)
1571 	if (rte_vhost_driver_register(path, 0) != 0) {
1572 #else
1573 	if (rte_vhost_driver_register(path, RTE_VHOST_USER_ASYNC_COPY) != 0) {
1574 #endif
1575 		SPDK_ERRLOG("Could not register controller %s with vhost library\n", ctrl_name);
1576 		SPDK_ERRLOG("Check if domain socket %s already exists\n", path);
1577 		return -EIO;
1578 	}
1579 	if (rte_vhost_driver_set_features(path, virtio_features) ||
1580 	    rte_vhost_driver_disable_features(path, disabled_features)) {
1581 		SPDK_ERRLOG("Couldn't set vhost features for controller %s\n", ctrl_name);
1582 
1583 		rte_vhost_driver_unregister(path);
1584 		return -EIO;
1585 	}
1586 
1587 	if (rte_vhost_driver_callback_register(path, &g_spdk_vhost_ops) != 0) {
1588 		rte_vhost_driver_unregister(path);
1589 		SPDK_ERRLOG("Couldn't register callbacks for controller %s\n", ctrl_name);
1590 		return -EIO;
1591 	}
1592 
1593 	rte_vhost_driver_get_protocol_features(path, &features);
1594 	features |= protocol_features;
1595 	rte_vhost_driver_set_protocol_features(path, features);
1596 
1597 	if (rte_vhost_driver_start(path) != 0) {
1598 		SPDK_ERRLOG("Failed to start vhost driver for controller %s (%d): %s\n",
1599 			    ctrl_name, errno, spdk_strerror(errno));
1600 		rte_vhost_driver_unregister(path);
1601 		return -EIO;
1602 	}
1603 
1604 	return 0;
1605 }
1606 
1607 int
1608 vhost_get_mem_table(int vid, struct rte_vhost_memory **mem)
1609 {
1610 	return rte_vhost_get_mem_table(vid, mem);
1611 }
1612 
1613 int
1614 vhost_driver_unregister(const char *path)
1615 {
1616 	return rte_vhost_driver_unregister(path);
1617 }
1618 
1619 int
1620 vhost_get_negotiated_features(int vid, uint64_t *negotiated_features)
1621 {
1622 	return rte_vhost_get_negotiated_features(vid, negotiated_features);
1623 }
1624 
1625 int
1626 vhost_user_dev_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
1627 			 uint32_t iops_threshold)
1628 {
1629 	uint64_t delay_time_base = delay_base_us * spdk_get_ticks_hz() / 1000000ULL;
1630 	uint32_t io_rate = iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1631 
1632 	if (delay_time_base >= UINT32_MAX) {
1633 		SPDK_ERRLOG("Delay time of %"PRIu32" is too big\n", delay_base_us);
1634 		return -EINVAL;
1635 	} else if (io_rate == 0) {
1636 		SPDK_ERRLOG("IOPS rate of %"PRIu32" is too low. Min is %u\n", io_rate,
1637 			    1000U / SPDK_VHOST_STATS_CHECK_INTERVAL_MS);
1638 		return -EINVAL;
1639 	}
1640 
1641 	vdev->coalescing_delay_us = delay_base_us;
1642 	vdev->coalescing_iops_threshold = iops_threshold;
1643 	return 0;
1644 }
1645 
1646 int
1647 vhost_user_session_set_coalescing(struct spdk_vhost_dev *vdev,
1648 			     struct spdk_vhost_session *vsession, void *ctx)
1649 {
1650 	vsession->coalescing_delay_time_base =
1651 		vdev->coalescing_delay_us * spdk_get_ticks_hz() / 1000000ULL;
1652 	vsession->coalescing_io_rate_threshold =
1653 		vdev->coalescing_iops_threshold * SPDK_VHOST_STATS_CHECK_INTERVAL_MS / 1000U;
1654 	return 0;
1655 }
1656 
1657 int
1658 spdk_vhost_set_coalescing(struct spdk_vhost_dev *vdev, uint32_t delay_base_us,
1659 			  uint32_t iops_threshold)
1660 {
1661 	int rc;
1662 
1663 	rc = vhost_user_dev_set_coalescing(vdev, delay_base_us, iops_threshold);
1664 	if (rc != 0) {
1665 		return rc;
1666 	}
1667 
1668 	vhost_user_dev_foreach_session(vdev, vhost_user_session_set_coalescing, NULL, NULL);
1669 	return 0;
1670 }
1671 
1672 void
1673 spdk_vhost_get_coalescing(struct spdk_vhost_dev *vdev, uint32_t *delay_base_us,
1674 			  uint32_t *iops_threshold)
1675 {
1676 	if (delay_base_us) {
1677 		*delay_base_us = vdev->coalescing_delay_us;
1678 	}
1679 
1680 	if (iops_threshold) {
1681 		*iops_threshold = vdev->coalescing_iops_threshold;
1682 	}
1683 }
1684 
1685 int
1686 spdk_vhost_set_socket_path(const char *basename)
1687 {
1688 	int ret;
1689 
1690 	if (basename && strlen(basename) > 0) {
1691 		ret = snprintf(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 2, "%s", basename);
1692 		if (ret <= 0) {
1693 			return -EINVAL;
1694 		}
1695 		if ((size_t)ret >= sizeof(g_vhost_user_dev_dirname) - 2) {
1696 			SPDK_ERRLOG("Char dev dir path length %d is too long\n", ret);
1697 			return -EINVAL;
1698 		}
1699 
1700 		if (g_vhost_user_dev_dirname[ret - 1] != '/') {
1701 			g_vhost_user_dev_dirname[ret] = '/';
1702 			g_vhost_user_dev_dirname[ret + 1]  = '\0';
1703 		}
1704 	}
1705 
1706 	return 0;
1707 }
1708 
1709 static void
1710 vhost_dev_thread_exit(void *arg1)
1711 {
1712 	spdk_thread_exit(spdk_get_thread());
1713 }
1714 
1715 int
1716 vhost_user_dev_register(struct spdk_vhost_dev *vdev, const char *name, struct spdk_cpuset *cpumask,
1717 			const struct spdk_vhost_dev_backend *backend)
1718 {
1719 	char path[PATH_MAX];
1720 
1721 	if (snprintf(path, sizeof(path), "%s%s", g_vhost_user_dev_dirname, name) >= (int)sizeof(path)) {
1722 		SPDK_ERRLOG("Resulting socket path for controller %s is too long: %s%s\n",
1723 				name, g_vhost_user_dev_dirname, name);
1724 		return -EINVAL;
1725 	}
1726 
1727 	vdev->path = strdup(path);
1728 	if (vdev->path == NULL) {
1729 		return -EIO;
1730 	}
1731 
1732 	vdev->thread = spdk_thread_create(vdev->name, cpumask);
1733 	if (vdev->thread == NULL) {
1734 		free(vdev->path);
1735 		SPDK_ERRLOG("Failed to create thread for vhost controller %s.\n", name);
1736 		return -EIO;
1737 	}
1738 
1739 	vdev->registered = true;
1740 	vdev->backend = backend;
1741 	TAILQ_INIT(&vdev->vsessions);
1742 
1743 	vhost_user_dev_set_coalescing(vdev, SPDK_VHOST_COALESCING_DELAY_BASE_US,
1744 				 SPDK_VHOST_VQ_IOPS_COALESCING_THRESHOLD);
1745 
1746 	if (vhost_register_unix_socket(path, name, vdev->virtio_features, vdev->disabled_features,
1747 				       vdev->protocol_features)) {
1748 		spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1749 		free(vdev->path);
1750 		return -EIO;
1751 	}
1752 
1753 	return 0;
1754 }
1755 
1756 int
1757 vhost_user_dev_unregister(struct spdk_vhost_dev *vdev)
1758 {
1759 	if (!TAILQ_EMPTY(&vdev->vsessions)) {
1760 		SPDK_ERRLOG("Controller %s still has valid connections.\n", vdev->name);
1761 		return -EBUSY;
1762 	}
1763 
1764 	if (vdev->registered && vhost_driver_unregister(vdev->path) != 0) {
1765 		SPDK_ERRLOG("Could not unregister controller %s with vhost library\n"
1766 			    "Check if domain socket %s still exists\n",
1767 			    vdev->name, vdev->path);
1768 		return -EIO;
1769 	}
1770 
1771 	spdk_thread_send_msg(vdev->thread, vhost_dev_thread_exit, NULL);
1772 	free(vdev->path);
1773 
1774 	return 0;
1775 }
1776 
1777 static bool g_vhost_user_started = false;
1778 
1779 int
1780 vhost_user_init(void)
1781 {
1782 	size_t len;
1783 
1784 	if (g_vhost_user_started) {
1785 		return 0;
1786 	}
1787 
1788 	if (g_vhost_user_dev_dirname[0] == '\0') {
1789 		if (getcwd(g_vhost_user_dev_dirname, sizeof(g_vhost_user_dev_dirname) - 1) == NULL) {
1790 			SPDK_ERRLOG("getcwd failed (%d): %s\n", errno, spdk_strerror(errno));
1791 			return -1;
1792 		}
1793 
1794 		len = strlen(g_vhost_user_dev_dirname);
1795 		if (g_vhost_user_dev_dirname[len - 1] != '/') {
1796 			g_vhost_user_dev_dirname[len] = '/';
1797 			g_vhost_user_dev_dirname[len + 1] = '\0';
1798 		}
1799 	}
1800 
1801 	g_vhost_user_started = true;
1802 
1803 	g_vhost_user_init_thread = spdk_get_thread();
1804 	assert(g_vhost_user_init_thread != NULL);
1805 
1806 	return 0;
1807 }
1808 
1809 static void *
1810 vhost_user_session_shutdown(void *arg)
1811 {
1812 	struct spdk_vhost_dev *vdev = NULL;
1813 	struct spdk_vhost_session *vsession;
1814 	vhost_fini_cb vhost_cb = arg;
1815 
1816 	for (vdev = spdk_vhost_dev_next(NULL); vdev != NULL;
1817 	     vdev = spdk_vhost_dev_next(vdev)) {
1818 		spdk_vhost_lock();
1819 		TAILQ_FOREACH(vsession, &vdev->vsessions, tailq) {
1820 			if (vsession->started) {
1821 				_stop_session(vsession);
1822 			}
1823 		}
1824 		spdk_vhost_unlock();
1825 		vhost_driver_unregister(vdev->path);
1826 		vdev->registered = false;
1827 	}
1828 
1829 	SPDK_INFOLOG(vhost, "Exiting\n");
1830 	spdk_thread_send_msg(g_vhost_user_init_thread, vhost_cb, NULL);
1831 	return NULL;
1832 }
1833 
1834 void
1835 vhost_user_fini(vhost_fini_cb vhost_cb)
1836 {
1837 	pthread_t tid;
1838 	int rc;
1839 
1840 	if (!g_vhost_user_started) {
1841 		vhost_cb(NULL);
1842 		return;
1843 	}
1844 
1845 	g_vhost_user_started = false;
1846 
1847 	/* rte_vhost API for removing sockets is not asynchronous. Since it may call SPDK
1848 	 * ops for stopping a device or removing a connection, we need to call it from
1849 	 * a separate thread to avoid deadlock.
1850 	 */
1851 	rc = pthread_create(&tid, NULL, &vhost_user_session_shutdown, vhost_cb);
1852 	if (rc != 0) {
1853 		SPDK_ERRLOG("Failed to start session shutdown thread (%d): %s\n", rc, spdk_strerror(rc));
1854 		abort();
1855 	}
1856 	pthread_detach(tid);
1857 }
1858