xref: /spdk/lib/vhost/vhost.c (revision 2d2fde0d7fd038942625dad26c0d62d8216afb51)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include <linux/virtio_ring.h>
37 #include <linux/virtio_scsi.h>
38 
39 #include <rte_vhost.h>
40 
41 #include "spdk_internal/log.h"
42 #include "spdk/env.h"
43 #include "spdk/scsi.h"
44 #include "spdk/conf.h"
45 #include "spdk/event.h"
46 #include "spdk/scsi_spec.h"
47 #include "spdk/likely.h"
48 
49 #include "spdk/vhost.h"
50 #include "task.h"
51 #include "vhost_iommu.h"
52 
53 static uint32_t g_num_ctrlrs[RTE_MAX_LCORE];
54 
55 #define CONTROLQ_POLL_PERIOD_US (1000 * 5)
56 
57 #define VIRTIO_SCSI_CONTROLQ   0
58 #define VIRTIO_SCSI_EVENTQ   1
59 #define VIRTIO_SCSI_REQUESTQ   2
60 
61 /* Path to folder where character device will be created. Can be set by user. */
62 static char dev_dirname[PATH_MAX] = "";
63 
64 #define SPDK_CACHE_LINE_SIZE RTE_CACHE_LINE_SIZE
65 
66 #define MAX_VHOST_DEVICE	1024
67 
68 #ifndef VIRTIO_F_VERSION_1
69 #define VIRTIO_F_VERSION_1 32
70 #endif
71 
72 #define VHOST_USER_F_PROTOCOL_FEATURES	30
73 
74 /* Features supported by SPDK VHOST lib. */
75 #define SPDK_VHOST_SCSI_FEATURES	((1ULL << VIRTIO_F_VERSION_1) | \
76 					(1ULL << VHOST_F_LOG_ALL) | \
77 					(1ULL << VHOST_USER_F_PROTOCOL_FEATURES) | \
78 					(1ULL << VIRTIO_SCSI_F_INOUT) | \
79 					(1ULL << VIRTIO_SCSI_F_HOTPLUG) | \
80 					(1ULL << VIRTIO_SCSI_F_CHANGE ) | \
81 					(1ULL << VIRTIO_SCSI_F_T10_PI ))
82 
83 /* Features that are specified in VIRTIO SCSI but currently not supported:
84  * - Live migration not supported yet
85  * - Hotplug/hotremove
86  * - LUN params change
87  * - T10 PI
88  */
89 #define SPDK_VHOST_SCSI_DISABLED_FEATURES	((1ULL << VHOST_F_LOG_ALL) | \
90 						(1ULL << VIRTIO_SCSI_F_HOTPLUG) | \
91 						(1ULL << VIRTIO_SCSI_F_CHANGE ) | \
92 						(1ULL << VIRTIO_SCSI_F_T10_PI ))
93 
/* Per-connection vhost device state: the guest memory table plus a trailing
 * flexible array of vrings, sized and filled in spdk_vhost_dev_create().
 */
struct spdk_vhost_dev {
	struct rte_vhost_memory *mem;	/* guest memory regions from rte_vhost_get_mem_table() */
	int vid;			/* rte_vhost connection id */
	uint16_t num_queues;		/* number of entries in virtqueue[] */
	/* Trailing array sized at allocation time; cache-aligned to avoid false sharing. */
	struct rte_vhost_vring virtqueue[0] __attribute((aligned(SPDK_CACHE_LINE_SIZE)));
};
100 
/* Release a device created by spdk_vhost_dev_create().
 * dev->mem was filled by rte_vhost_get_mem_table() (freed with plain free()
 * per the rte_vhost API); dev itself came from spdk_zmalloc().
 */
static void
spdk_vhost_dev_free(struct spdk_vhost_dev *dev)
{
	free(dev->mem);
	spdk_free(dev);
}
107 
108 static void
109 spdk_vhost_dev_destruct(struct spdk_vhost_dev *dev)
110 {
111 	struct rte_vhost_vring *q;
112 	uint16_t i;
113 
114 	for (i = 0; i < dev->num_queues; i++) {
115 		q = &dev->virtqueue[i];
116 		rte_vhost_set_vhost_vring_last_idx(dev->vid, i, q->last_avail_idx, q->last_used_idx);
117 	}
118 
119 	spdk_vhost_dev_free(dev);
120 }
121 
122 static struct spdk_vhost_dev *
123 spdk_vhost_dev_create(int vid)
124 {
125 	uint16_t num_queues = rte_vhost_get_vring_num(vid);
126 	size_t size = sizeof(struct spdk_vhost_dev) + num_queues * sizeof(struct rte_vhost_vring);
127 	struct spdk_vhost_dev *dev = spdk_zmalloc(size, SPDK_CACHE_LINE_SIZE, NULL);
128 	uint16_t i;
129 
130 	if (dev == NULL) {
131 		SPDK_ERRLOG("vhost device %d: Failed to allocate new vhost device with %"PRIu16" queues\n", vid,
132 			    num_queues);
133 		return NULL;
134 	}
135 
136 	for (i = 0; i < num_queues; i++) {
137 		if (rte_vhost_get_vhost_vring(vid, i, &dev->virtqueue[i])) {
138 			SPDK_ERRLOG("vhost device %d: Failed to get information of queue %"PRIu16"\n", vid, i);
139 			goto err;
140 		}
141 
142 		/* Disable notifications. */
143 		if (rte_vhost_enable_guest_notification(vid, i, 0) != 0) {
144 			SPDK_ERRLOG("vhost device %d: Failed to disable guest notification on queue %"PRIu16"\n", vid, i);
145 			goto err;
146 		}
147 
148 	}
149 
150 	dev->vid = vid;
151 	dev->num_queues = num_queues;
152 
153 	if (rte_vhost_get_mem_table(vid, &dev->mem) != 0) {
154 		SPDK_ERRLOG("vhost device %d: Failed to get guest memory table\n", vid);
155 		goto err;
156 	}
157 	return dev;
158 
159 err:
160 	spdk_vhost_dev_free(dev);
161 	return NULL;
162 }
163 
/* Translate a guest physical address to a host virtual address using the
 * device's cached guest memory table (per rte_vhost_gpa_to_vva semantics,
 * 0 is returned for addresses outside any mapped region).
 */
static uint64_t
gpa_to_vva(struct spdk_vhost_dev *vdev, uint64_t addr)
{
	return rte_vhost_gpa_to_vva(vdev->mem, addr);
}
169 
/* A configured vhost-scsi controller.  `dev` is non-NULL (and `lcore` >= 0)
 * only while a guest connection is active.
 */
struct spdk_vhost_scsi_ctrlr {
	char *name;			/* controller name; socket path is dev_dirname + name */
	struct spdk_vhost_dev *dev;	/* active connection state, NULL when idle */

	/**< TODO make this an array of spdk_scsi_devs.  The vhost scsi
	 *   request will tell us which scsi_dev to use.
	 */
	struct spdk_scsi_dev *scsi_dev[SPDK_VHOST_SCSI_CTRLR_MAX_DEVS];

	/* Number of outstanding tasks; drained in destroy_device(). */
	int task_cnt;

	struct spdk_poller *requestq_poller;
	struct spdk_poller *controlq_poller;

	/* Data core the pollers run on, or -1 while no guest is connected. */
	int32_t lcore;

	/* CPU mask the controller is allowed to run on (subset of app mask). */
	uint64_t cpumask;
} __rte_cache_aligned;
188 
189 /* This maps from the integer index passed by DPDK to the our controller representation. */
190 /* MAX_VHOST_DEVICE from DPDK. */
191 static struct spdk_vhost_scsi_ctrlr *dpdk_vid_mapping[MAX_VHOST_DEVICE];
192 
193 /*
194  * Get available requests from avail ring.
195  */
196 static uint16_t
197 vq_avail_ring_get(struct rte_vhost_vring *vq, uint16_t *reqs, uint16_t reqs_len)
198 {
199 	struct vring_avail *avail = vq->avail;
200 	uint16_t size_mask = vq->size - 1;
201 	uint16_t last_idx = vq->last_avail_idx, avail_idx = avail->idx;
202 	uint16_t count = RTE_MIN((avail_idx - last_idx) & size_mask, reqs_len);
203 	uint16_t i;
204 
205 	if (spdk_likely(count == 0)) {
206 		return 0;
207 	}
208 
209 	vq->last_avail_idx += count;
210 	for (i = 0; i < count; i++) {
211 		reqs[i] = vq->avail->ring[(last_idx + i) & size_mask];
212 	}
213 
214 	SPDK_TRACELOG(SPDK_TRACE_VHOST_RING,
215 		      "AVAIL: last_idx=%"PRIu16" avail_idx=%"PRIu16" count=%"PRIu16"\n",
216 		      last_idx, avail_idx, count);
217 
218 	return count;
219 }
220 
221 /*
222  * Enqueue id and len to used ring.
223  */
static void
vq_used_ring_enqueue(struct rte_vhost_vring *vq, uint16_t id, uint32_t len)
{
	struct vring_used *used = vq->used;
	uint16_t size_mask = vq->size - 1;
	uint16_t last_idx = vq->last_used_idx;

	SPDK_TRACELOG(SPDK_TRACE_VHOST_RING, "USED: last_idx=%"PRIu16" req id=%"PRIu16" len=%"PRIu32"\n",
		      last_idx, id, len);

	/* Advance the free-running index; only the ring access below is masked. */
	vq->last_used_idx++;
	last_idx &= size_mask;

	used->ring[last_idx].id = id;
	used->ring[last_idx].len = len;

	/* Ensure the ring entry is written before the index update makes it
	 * visible to the guest.
	 */
	rte_compiler_barrier();

	vq->used->idx = vq->last_used_idx;
	/* Signal the guest via the call eventfd. */
	eventfd_write(vq->callfd, (eventfd_t)1);
}
245 
/* True when the descriptor chains to another one (VRING_DESC_F_NEXT set). */
static bool
vring_desc_has_next(struct vring_desc *cur_desc)
{
	return (cur_desc->flags & VRING_DESC_F_NEXT) != 0;
}
251 
/* Follow the chain: return the entry of descriptor table `vq_desc` that
 * cur_desc->next points at.  The caller must ensure the chain continues.
 */
static struct vring_desc *
vring_desc_get_next(struct vring_desc *vq_desc, struct vring_desc *cur_desc)
{
	assert(cur_desc->flags & VRING_DESC_F_NEXT);
	return vq_desc + cur_desc->next;
}
258 
/* True when the descriptor's buffer is device-writable (data flows to guest). */
static bool
vring_desc_is_wr(struct vring_desc *cur_desc)
{
	return (cur_desc->flags & VRING_DESC_F_WRITE) != 0;
}
264 
265 static void task_submit(struct spdk_vhost_task *task);
266 static int process_request(struct spdk_vhost_task *task);
267 static void invalid_request(struct spdk_vhost_task *task);
268 
/* Post a task's completion to its virtqueue's used ring and recycle the task.
 *
 * If the task owned a pool-allocated iovec array (i.e. it was not using the
 * single embedded iov), hand that array directly to one of the tasks that
 * were previously deferred for lack of iovecs instead of freeing it.
 */
static void
submit_completion(struct spdk_vhost_task *task)
{
	struct iovec *iovs = NULL;
	int result;

	vq_used_ring_enqueue(task->vq, task->req_idx, task->scsi.data_transferred);
	SPDK_TRACELOG(SPDK_TRACE_VHOST, "Finished task (%p) req_idx=%d\n", task, task->req_idx);

	/* Detach the allocated iovec array (if any) and restore the embedded
	 * single iov before the task is returned to the pool.
	 */
	if (task->scsi.iovs != &task->scsi.iov) {
		iovs = task->scsi.iovs;
		task->scsi.iovs = &task->scsi.iov;
		task->scsi.iovcnt = 1;
	}

	spdk_vhost_task_put(task);

	if (!iovs) {
		return;
	}

	/* Try to hand the freed iovec array to a deferred task. */
	while (1) {
		task = spdk_vhost_dequeue_task();
		if (!task) {
			spdk_vhost_iovec_free(iovs);
			break;
		}

		/* Set iovs so underlying functions will not try to alloc IOV */
		task->scsi.iovs = iovs;
		task->scsi.iovcnt = VHOST_SCSI_IOVS_LEN;

		result = process_request(task);
		if (result == 0) {
			task_submit(task);
			break;
		} else {
			/* Request was invalid; restore the embedded iov, fail
			 * the task, and offer the array to the next deferred one.
			 */
			task->scsi.iovs = &task->scsi.iov;
			task->scsi.iovcnt = 1;
			invalid_request(task);
		}
	}
}
312 
/* Event callback for a finished management (TMF) task: just push the
 * completion to the used ring.  arg2 is unused.
 */
static void
process_mgmt_task_completion(void *arg1, void *arg2)
{
	submit_completion((struct spdk_vhost_task *)arg1);
}
320 
/* Event callback for a finished data task: translate the SCSI layer's result
 * into the virtio-scsi response structure, then post it to the used ring.
 */
static void
process_task_completion(void *arg1, void *arg2)
{
	struct spdk_vhost_task *task = arg1;

	/* The SCSI task has completed.  Do final processing and then post
	   notification to the virtqueue's "used" ring.
	 */
	task->resp->status = task->scsi.status;

	/* On any non-GOOD status, pass the sense data back to the guest. */
	if (task->scsi.status != SPDK_SCSI_STATUS_GOOD) {
		memcpy(task->resp->sense, task->scsi.sense_data, task->scsi.sense_data_len);
		task->resp->sense_len = task->scsi.sense_data_len;
	}
	/* Residual: bytes requested but not actually transferred. */
	task->resp->resid = task->scsi.transfer_len - task->scsi.data_transferred;

	submit_completion(task);
}
339 
/* Queue a data task to the SCSI layer.  The completion event is allocated
 * on the current lcore, so process_task_completion() runs on the same core
 * that polled the request.
 */
static void
task_submit(struct spdk_vhost_task *task)
{
	/* The task is ready to be submitted.  First create the callback event that
	   will be invoked when the SCSI command is completed.  See process_task_completion()
	   for what SPDK vhost-scsi does when the task is completed.
	 */

	/* Optimistically mark the virtio-level response OK; the SCSI status is
	 * filled in by process_task_completion().
	 */
	task->resp->response = VIRTIO_SCSI_S_OK;
	task->scsi.cb_event = spdk_event_allocate(rte_lcore_id(),
			      process_task_completion,
			      task, NULL);
	spdk_scsi_dev_queue_task(task->scsi_dev, &task->scsi);
}
354 
/* Queue a task management task (e.g. LUN reset) to the SCSI layer; completion
 * is handled by process_mgmt_task_completion() on the current lcore.
 */
static void
mgmt_task_submit(struct spdk_vhost_task *task)
{
	task->tmf_resp->response = VIRTIO_SCSI_S_OK;
	task->scsi.cb_event = spdk_event_allocate(rte_lcore_id(),
			      process_mgmt_task_completion,
			      task, NULL);
	spdk_scsi_dev_queue_mgmt_task(task->scsi_dev, &task->scsi);
}
364 
365 static void
366 invalid_request(struct spdk_vhost_task *task)
367 {
368 	vq_used_ring_enqueue(task->vq, task->req_idx, 0);
369 	spdk_vhost_task_put(task);
370 
371 	SPDK_TRACELOG(SPDK_TRACE_VHOST, "Invalid request (status=%" PRIu8")\n",
372 		      task->resp ? task->resp->response : -1);
373 }
374 
375 static struct spdk_scsi_dev *
376 get_scsi_dev(struct spdk_vhost_scsi_ctrlr *vdev, const __u8 *lun)
377 {
378 	SPDK_TRACEDUMP(SPDK_TRACE_VHOST_QUEUE, "LUN", lun, 8);
379 	/* First byte must be 1 and second is target */
380 	if (lun[0] != 1 || lun[1] >= SPDK_VHOST_SCSI_CTRLR_MAX_DEVS)
381 		return NULL;
382 
383 	return vdev->scsi_dev[lun[1]];
384 }
385 
386 static struct spdk_scsi_lun *
387 get_scsi_lun(struct spdk_scsi_dev *scsi_dev, const __u8 *lun)
388 {
389 	uint16_t lun_id = (((uint16_t)lun[2] << 8) | lun[3]) & 0x3FFF;
390 
391 	/* For now only one LUN per controller is allowed so no need to search LUN IDs */
392 	if (likely(scsi_dev != NULL)) {
393 		return spdk_scsi_dev_get_lun(scsi_dev, lun_id);
394 	}
395 
396 	return NULL;
397 }
398 
/* Handle a single control queue request (task management or async event
 * subscription).  LUN resets are forwarded to the SCSI layer and complete
 * asynchronously; everything else is answered inline and completed here with
 * a zero-length used ring entry.
 */
static void
process_ctrl_request(struct spdk_vhost_scsi_ctrlr *vdev, struct rte_vhost_vring *controlq,
		     uint16_t req_idx)
{
	struct spdk_vhost_task *task;

	struct vring_desc *desc;
	struct virtio_scsi_ctrl_tmf_req *ctrl_req;
	struct virtio_scsi_ctrl_an_resp *an_resp;

	desc = &controlq->desc[req_idx];
	ctrl_req = (void *)gpa_to_vva(vdev->dev, desc->addr);

	SPDK_TRACELOG(SPDK_TRACE_VHOST_QUEUE,
		      "Processing controlq descriptor: desc %d/%p, desc_addr %p, len %d, flags %d, last_used_idx %d; kickfd %d; size %d\n",
		      req_idx, desc, (void *)desc->addr, desc->len, desc->flags, controlq->last_used_idx,
		      controlq->kickfd, controlq->size);
	SPDK_TRACEDUMP(SPDK_TRACE_VHOST_QUEUE, "Request desriptor", (uint8_t *)ctrl_req,
		       desc->len);

	task = spdk_vhost_task_get(&vdev->task_cnt);
	task->vq = controlq;
	task->vdev = vdev;
	task->req_idx = req_idx;
	task->scsi_dev = get_scsi_dev(task->vdev, ctrl_req->lun);

	/* Process the TMF request */
	switch (ctrl_req->type) {
	case VIRTIO_SCSI_T_TMF:
		/* Get the response buffer */
		assert(vring_desc_has_next(desc));
		desc = vring_desc_get_next(controlq->desc, desc);
		task->tmf_resp = (void *)gpa_to_vva(vdev->dev, desc->addr);

		/* Check if we are processing a valid request */
		if (task->scsi_dev == NULL) {
			task->tmf_resp->response = VIRTIO_SCSI_S_BAD_TARGET;
			break;
		}

		switch (ctrl_req->subtype) {
		case VIRTIO_SCSI_T_TMF_LOGICAL_UNIT_RESET:
			/* Handle LUN reset */
			SPDK_TRACELOG(SPDK_TRACE_VHOST_QUEUE, "LUN reset\n");
			task->scsi.type = SPDK_SCSI_TASK_TYPE_MANAGE;
			task->scsi.function = SPDK_SCSI_TASK_FUNC_LUN_RESET;
			task->scsi.lun = get_scsi_lun(task->scsi_dev, ctrl_req->lun);

			/* Completes asynchronously via process_mgmt_task_completion(). */
			mgmt_task_submit(task);
			return;
		default:
			task->tmf_resp->response = VIRTIO_SCSI_S_ABORTED;
			/* Unsupported command */
			SPDK_TRACELOG(SPDK_TRACE_VHOST_QUEUE, "Unsupported TMF command %x\n", ctrl_req->subtype);
			break;
		}
		break;
	case VIRTIO_SCSI_T_AN_QUERY:
	case VIRTIO_SCSI_T_AN_SUBSCRIBE: {
		/* Async notifications are not supported; reject the request.
		 * NOTE(review): unlike the TMF path above there is no
		 * vring_desc_has_next() assert before following the chain here
		 * - confirm a malformed single-descriptor request cannot reach
		 * this point.
		 */
		desc = vring_desc_get_next(controlq->desc, desc);
		an_resp = (void *)gpa_to_vva(vdev->dev, desc->addr);
		an_resp->response = VIRTIO_SCSI_S_ABORTED;
		break;
	}
	default:
		SPDK_TRACELOG(SPDK_TRACE_VHOST_QUEUE, "Unsupported control command %x\n", ctrl_req->type);
		break;
	}

	/* Inline completion path: zero-length used entry, task returned to pool. */
	vq_used_ring_enqueue(controlq, req_idx, 0);
	spdk_vhost_task_put(task);
}
471 
472 /*
473  * Process task's descriptor chain and setup data related fields.
474  * Return
475  *   -1 if request is invalid and must be aborted,
476  *    0 if all data are set,
477  *    1 if it was not possible to allocate IO vector for this task.
478  */
static int
task_data_setup(struct spdk_vhost_task *task,
		struct virtio_scsi_cmd_req **req)
{
	struct rte_vhost_vring *vq = task->vq;
	struct spdk_vhost_dev *dev = task->vdev->dev;
	struct vring_desc *desc =  &task->vq->desc[task->req_idx];
	struct iovec *iovs = task->scsi.iovs;
	/* iovcnt_max == 1 means the task still uses its single embedded iov;
	 * a pool-allocated array may replace it below if the chain needs more.
	 */
	uint16_t iovcnt = 0, iovcnt_max = task->scsi.iovcnt;
	uint32_t len = 0;

	assert(iovcnt_max == 1 || iovcnt_max == VHOST_SCSI_IOVS_LEN);

	/* Sanity check. First descriptor must be readable and must have next one. */
	if (unlikely(vring_desc_is_wr(desc) || !vring_desc_has_next(desc))) {
		SPDK_WARNLOG("Invalid first (request) descriptor.\n");
		task->resp = NULL;
		goto abort_task;
	}

	*req = (void *)gpa_to_vva(dev, desc->addr);

	/* The write flag of the second descriptor determines the data direction:
	 * device-writable means the guest expects data back (FROM_DEV).
	 */
	desc = vring_desc_get_next(vq->desc, desc);
	task->scsi.dxfer_dir = vring_desc_is_wr(desc) ? SPDK_SCSI_DIR_FROM_DEV : SPDK_SCSI_DIR_TO_DEV;

	if (task->scsi.dxfer_dir == SPDK_SCSI_DIR_FROM_DEV) {
		/*
		 * FROM_DEV (READ): [RD_req][WR_resp][WR_buf0]...[WR_bufN]
		 */
		task->resp = (void *)gpa_to_vva(dev, desc->addr);
		if (!vring_desc_has_next(desc)) {
			/*
			 * TEST UNIT READY command and some others might not contain any payload and this is not an error.
			 */
			SPDK_TRACELOG(SPDK_TRACE_VHOST_DATA,
				      "No payload descriptors for FROM DEV command req_idx=%"PRIu16".\n", task->req_idx);
			SPDK_TRACEDUMP(SPDK_TRACE_VHOST_DATA, "CDB=", (*req)->cdb, VIRTIO_SCSI_CDB_SIZE);
			task->scsi.iovcnt = 1;
			task->scsi.iovs[0].iov_len = 0;
			task->scsi.length = 0;
			task->scsi.transfer_len = 0;
			return 0;
		}

		desc = vring_desc_get_next(vq->desc, desc);
		/* More than one payload descriptor: the embedded iov is not
		 * enough, so take a full-size iovec array from the pool.
		 */
		if (iovcnt_max != VHOST_SCSI_IOVS_LEN && vring_desc_has_next(desc)) {
			iovs = spdk_vhost_iovec_alloc();
			if (iovs == NULL) {
				/* Caller defers the task until iovecs free up. */
				return 1;
			}

			iovcnt_max = VHOST_SCSI_IOVS_LEN;
		}

		/* All remaining descriptors are data. */
		while (iovcnt < iovcnt_max) {
			iovs[iovcnt].iov_base = (void *)gpa_to_vva(dev, desc->addr);
			iovs[iovcnt].iov_len = desc->len;
			len += desc->len;
			iovcnt++;

			if (!vring_desc_has_next(desc))
				break;

			desc = vring_desc_get_next(vq->desc, desc);
			if (unlikely(!vring_desc_is_wr(desc))) {
				SPDK_WARNLOG("FROM DEV cmd: descriptor nr %" PRIu16" in payload chain is read only.\n", iovcnt);
				task->resp = NULL;
				goto abort_task;
			}
		}
	} else {
		SPDK_TRACELOG(SPDK_TRACE_VHOST_DATA, "TO DEV");
		/*
		 * TO_DEV (WRITE):[RD_req][RD_buf0]...[RD_bufN][WR_resp]
		 * No need to check descriptor WR flag as this is done while setting scsi.dxfer_dir.
		 */

		if (iovcnt_max != VHOST_SCSI_IOVS_LEN && vring_desc_has_next(desc)) {
			/* If next descriptor is not for response, allocate iovs. */
			if (!vring_desc_is_wr(vring_desc_get_next(vq->desc, desc))) {
				iovs = spdk_vhost_iovec_alloc();

				if (iovs == NULL) {
					/* Caller defers the task until iovecs free up. */
					return 1;
				}

				iovcnt_max = VHOST_SCSI_IOVS_LEN;
			}
		}

		/* Process descriptors up to response. */
		while (!vring_desc_is_wr(desc) && iovcnt < iovcnt_max) {
			iovs[iovcnt].iov_base = (void *)gpa_to_vva(dev, desc->addr);
			iovs[iovcnt].iov_len = desc->len;
			len += desc->len;
			iovcnt++;

			if (!vring_desc_has_next(desc)) {
				SPDK_WARNLOG("TO_DEV cmd: no response descriptor.\n");
				task->resp = NULL;
				goto abort_task;
			}

			desc = vring_desc_get_next(vq->desc, desc);
		}

		task->resp = (void *)gpa_to_vva(dev, desc->addr);
		if (vring_desc_has_next(desc)) {
			SPDK_WARNLOG("TO_DEV cmd: ignoring unexpected descriptors after response descriptor.\n");
		}
	}

	/* A completely filled array means the chain exceeded VHOST_SCSI_IOVS_LEN
	 * entries (the loops above stop at iovcnt_max); abort such requests.
	 */
	if (iovcnt_max > 1 && iovcnt == iovcnt_max) {
		SPDK_WARNLOG("Too many IO vectors in chain!\n");
		goto abort_task;
	}

	task->scsi.iovs = iovs;
	task->scsi.iovcnt = iovcnt;
	task->scsi.length = len;
	task->scsi.transfer_len = len;
	return 0;

abort_task:
	/* Return any pool-allocated iovec array, set the abort response when a
	 * response buffer was located, and tell the caller to fail the request.
	 */
	if (iovs != task->scsi.iovs) {
		spdk_vhost_iovec_free(iovs);
	}

	if (task->resp) {
		task->resp->response = VIRTIO_SCSI_S_ABORTED;
	}

	return -1;
}
614 
/* Parse and validate a request-queue descriptor chain, then bind the task to
 * its target SCSI device/LUN.  Returns task_data_setup()'s convention:
 * 0 = ready to submit, 1 = defer (no iovecs available), -1 = invalid
 * (virtio response already filled in where possible).
 */
static int
process_request(struct spdk_vhost_task *task)
{
	struct virtio_scsi_cmd_req *req;
	int result;

	result = task_data_setup(task, &req);
	if (result) {
		return result;
	}

	task->scsi_dev = get_scsi_dev(task->vdev, req->lun);
	if (unlikely(task->scsi_dev == NULL)) {
		task->resp->response = VIRTIO_SCSI_S_BAD_TARGET;
		return -1;
	}

	task->scsi.lun = get_scsi_lun(task->scsi_dev, req->lun);
	task->scsi.cdb = req->cdb;
	task->scsi.target_port = spdk_scsi_dev_find_port_by_id(task->scsi_dev, 0);
	SPDK_TRACEDUMP(SPDK_TRACE_VHOST_DATA, "request CDB", req->cdb, VIRTIO_SCSI_CDB_SIZE);
	return 0;
}
638 
/* Drain up to a batch of pending control queue requests and handle each. */
static void
process_controlq(struct spdk_vhost_scsi_ctrlr *vdev, struct rte_vhost_vring *vq)
{
	uint16_t req_ids[32];
	uint16_t cnt, idx;

	cnt = vq_avail_ring_get(vq, req_ids, RTE_DIM(req_ids));
	for (idx = 0; idx < cnt; idx++) {
		process_ctrl_request(vdev, vq, req_ids[idx]);
	}
}
650 
/* Fetch a batch of requests from a request queue and dispatch each one:
 * valid requests go to the SCSI layer, requests that failed iovec allocation
 * are deferred for retry, malformed requests are failed immediately.
 */
static void
process_requestq(struct spdk_vhost_scsi_ctrlr *vdev, struct rte_vhost_vring *vq)
{
	uint16_t reqs[32];
	uint16_t reqs_cnt, i;
	struct spdk_vhost_task *task;
	int result;

	reqs_cnt = vq_avail_ring_get(vq, reqs, RTE_DIM(reqs));
	assert(reqs_cnt <= 32);

	for (i = 0; i < reqs_cnt; i++) {
		task = spdk_vhost_task_get(&vdev->task_cnt);

		SPDK_TRACELOG(SPDK_TRACE_VHOST, "====== Starting processing request idx %"PRIu16"======\n",
			      reqs[i]);
		task->vq = vq;
		task->vdev = vdev;
		task->req_idx = reqs[i];
		result = process_request(task);
		if (likely(result == 0)) {
			task_submit(task);
			SPDK_TRACELOG(SPDK_TRACE_VHOST, "====== Task %p req_idx %d submitted ======\n", task,
				      task->req_idx);
		} else if (result > 0) {
			/* No iovecs available: park the task until a completing
			 * task donates its array (see submit_completion()).
			 */
			spdk_vhost_enqueue_task(task);
			SPDK_TRACELOG(SPDK_TRACE_VHOST, "====== Task %p req_idx %d deferred ======\n", task, task->req_idx);
		} else {
			/* Invalid request: complete immediately with an error. */
			invalid_request(task);
			SPDK_TRACELOG(SPDK_TRACE_VHOST, "====== Task %p req_idx %d failed ======\n", task, task->req_idx);
		}
	}
}
684 
685 static void
686 vdev_controlq_worker(void *arg)
687 {
688 	struct spdk_vhost_scsi_ctrlr *vdev = arg;
689 
690 	process_controlq(vdev, &vdev->dev->virtqueue[VIRTIO_SCSI_CONTROLQ]);
691 }
692 
693 static void
694 vdev_worker(void *arg)
695 {
696 	struct spdk_vhost_scsi_ctrlr *vdev = arg;
697 	uint32_t q_idx;
698 
699 	for (q_idx = VIRTIO_SCSI_REQUESTQ; q_idx < vdev->dev->num_queues; q_idx++) {
700 		process_requestq(vdev, &vdev->dev->virtqueue[q_idx]);
701 	}
702 }
703 
#define SHIFT_2MB	21
#define SIZE_2MB	(1ULL << SHIFT_2MB)
/* Round down/up to a 2 MB boundary.  Bug fix: the original expansions ended
 * in an unparenthesized `<< SHIFT_2MB`, so e.g. `FLOOR_2MB(x) + 1` shifted by
 * SHIFT_2MB + 1 due to operator precedence, and `(uintptr_t)x` bound the cast
 * only to the first token of a compound argument.  Arguments and the whole
 * expansion are now fully parenthesized.
 */
#define FLOOR_2MB(x)	((((uintptr_t)(x)) / SIZE_2MB) << SHIFT_2MB)
#define CEIL_2MB(x)	(((((uintptr_t)(x)) + SIZE_2MB - 1) / SIZE_2MB) << SHIFT_2MB)
708 
/* Generic event callback: post the semaphore passed in arg2 so the waiter in
 * vhost_sem_timedwait() wakes up.  arg1 is unused.
 */
static void
vdev_event_done_cb(void *arg1, void *arg2)
{
	sem_t *sem = arg2;

	sem_post(sem);
}
714 
715 static struct spdk_event *
716 vhost_sem_event_alloc(uint32_t core, spdk_event_fn fn, void *arg1, sem_t *sem)
717 {
718 	if (sem_init(sem, 0, 0) < 0)
719 		rte_panic("Failed to initialize semaphore.");
720 
721 	return spdk_event_allocate(core, fn, arg1, sem);
722 }
723 
/* Wait up to `sec` seconds for `sem` to be posted, then destroy it.
 * Returns sem_timedwait()'s result: 0 on success, -1 on timeout/error.
 */
static int
vhost_sem_timedwait(sem_t *sem, unsigned sec)
{
	struct timespec ts;
	int rc;

	clock_gettime(CLOCK_REALTIME, &ts);
	ts.tv_sec += sec;

	rc = sem_timedwait(sem, &ts);
	/* One-shot semantics: the semaphore is always torn down here. */
	sem_destroy(sem);
	return rc;
}
738 
/* Runs on the controller's assigned data core once a guest connects:
 * allocates SCSI I/O channels, registers guest memory for DMA address
 * translation and starts the request/control queue pollers.
 * arg2 is a semaphore posted when setup is complete.
 */
static void
add_vdev_cb(void *arg1, void *arg2)
{
	struct spdk_vhost_scsi_ctrlr *vdev = arg1;
	struct rte_vhost_mem_region *region;
	uint32_t i;

	for (i = 0; i < SPDK_VHOST_SCSI_CTRLR_MAX_DEVS; i++) {
		if (vdev->scsi_dev[i] == NULL) {
			continue;
		}
		spdk_scsi_dev_allocate_io_channels(vdev->scsi_dev[i]);
	}
	SPDK_NOTICELOG("Started poller for vhost controller %s on lcore %d\n", vdev->name, vdev->lcore);

	/* Register each guest memory region, expanded to 2MB boundaries, so
	 * vtophys translation works for guest buffers.
	 */
	for (i = 0; i < vdev->dev->mem->nregions; i++) {
		uint64_t start, end, len;
		region = &vdev->dev->mem->regions[i];
		start = FLOOR_2MB(region->mmap_addr);
		end = CEIL_2MB(region->mmap_addr + region->mmap_size);
		len = end - start;
		SPDK_NOTICELOG("Registering VM memory for vtophys translation - 0x%jx len:0x%jx\n",
			       start, len);
		spdk_mem_register((void *)start, len);
		spdk_iommu_mem_register(region->host_user_addr, region->size);

	}

	/* Request queues are polled continuously; the control queue only every
	 * CONTROLQ_POLL_PERIOD_US since TMF traffic is rare.
	 */
	spdk_poller_register(&vdev->requestq_poller, vdev_worker, vdev, vdev->lcore, 0);
	spdk_poller_register(&vdev->controlq_poller, vdev_controlq_worker, vdev, vdev->lcore,
			     CONTROLQ_POLL_PERIOD_US);
	sem_post((sem_t *)arg2);
}
772 
/* Runs on the data core during teardown: releases SCSI I/O channels and
 * undoes the guest memory registrations made in add_vdev_cb().
 * arg2 is a semaphore posted when done.
 */
static void
remove_vdev_cb(void *arg1, void *arg2)
{
	struct spdk_vhost_scsi_ctrlr *vdev = arg1;
	struct rte_vhost_mem_region *region;
	uint32_t i;

	for (i = 0; i < SPDK_VHOST_SCSI_CTRLR_MAX_DEVS; i++) {
		if (vdev->scsi_dev[i] == NULL) {
			continue;
		}
		spdk_scsi_dev_free_io_channels(vdev->scsi_dev[i]);
	}

	SPDK_NOTICELOG("Stopping poller for vhost controller %s\n", vdev->name);
	for (i = 0; i < vdev->dev->mem->nregions; i++) {
		uint64_t start, end, len;
		region = &vdev->dev->mem->regions[i];
		/* Same 2MB-expanded ranges that add_vdev_cb() registered. */
		start = FLOOR_2MB(region->mmap_addr);
		end = CEIL_2MB(region->mmap_addr + region->mmap_size);
		len = end - start;
		spdk_iommu_mem_unregister(region->host_user_addr, region->size);
		spdk_mem_unregister((void *)start, len);
	}

	sem_post((sem_t *)arg2);
}
800 
/* rte_vhost destroy_device callback: quiesce and tear down a running
 * controller.  Unregisters both pollers (waiting for each via a semaphore),
 * drains outstanding tasks, releases I/O channels and memory registrations
 * on the data core, and finally frees the spdk_vhost_dev state.
 */
static void
destroy_device(int vid)
{
	struct spdk_vhost_scsi_ctrlr *vdev;
	struct spdk_event *event;
	sem_t done_sem;
	uint32_t i;

	assert(vid < MAX_VHOST_DEVICE);
	vdev = dpdk_vid_mapping[vid];

	/* Stop the request queue poller and wait (up to 1s) for the
	 * unregister to take effect on the data core.
	 */
	event = vhost_sem_event_alloc(vdev->lcore, vdev_event_done_cb, NULL, &done_sem);
	spdk_poller_unregister(&vdev->requestq_poller, event);
	if (vhost_sem_timedwait(&done_sem, 1))
		rte_panic("%s: failed to unregister request queue poller.\n", vdev->name);

	event = vhost_sem_event_alloc(vdev->lcore, vdev_event_done_cb, NULL, &done_sem);
	spdk_poller_unregister(&vdev->controlq_poller, event);
	if (vhost_sem_timedwait(&done_sem, 1))
		rte_panic("%s: failed to unregister control queue poller.\n", vdev->name);

	/* Wait for all tasks to finish */
	for (i = 1000; i && vdev->task_cnt > 0; i--) {
		usleep(1000);
	}

	if (vdev->task_cnt > 0) {
		rte_panic("%s: pending tasks did not finish in 1s.\n", vdev->name);
	}

	/* Release I/O channels and memory registrations on the data core. */
	event = vhost_sem_event_alloc(vdev->lcore, remove_vdev_cb, vdev, &done_sem);
	spdk_event_call(event);
	if (vhost_sem_timedwait(&done_sem, 1))
		rte_panic("%s: failed to unregister poller.\n", vdev->name);

	/* Return the core to the pool and detach the connection state. */
	g_num_ctrlrs[vdev->lcore]--;
	vdev->lcore = -1;

	spdk_vhost_dev_destruct(vdev->dev);
	vdev->dev = NULL;
	dpdk_vid_mapping[vid] = NULL;
}
843 
844 #define LUN_DEV_NAME_SIZE 8
845 #define MAX_SCSI_CTRLRS 15
846 
847 static struct spdk_vhost_scsi_ctrlr *spdk_vhost_ctrlrs[MAX_SCSI_CTRLRS];
848 
849 static struct spdk_vhost_scsi_ctrlr *
850 spdk_vhost_scsi_ctrlr_find(const char *ctrlr_name)
851 {
852 	unsigned i;
853 	size_t dev_dirname_len = strlen(dev_dirname);
854 
855 	if (strncmp(ctrlr_name, dev_dirname, dev_dirname_len) == 0) {
856 		ctrlr_name += dev_dirname_len;
857 	}
858 
859 	for (i = 0; i < MAX_SCSI_CTRLRS; i++) {
860 		if (spdk_vhost_ctrlrs[i] == NULL) {
861 			continue;
862 		}
863 
864 		if (strcmp(spdk_vhost_ctrlrs[i]->name, ctrlr_name) == 0) {
865 			return spdk_vhost_ctrlrs[i];
866 		}
867 	}
868 
869 	return NULL;
870 }
871 
872 
static int new_device(int vid);
static void destroy_device(int vid);
/*
 * These callbacks allow devices to be added to the data core when configuration
 * has been fully complete.  rte_vhost invokes new_device when a guest finishes
 * feature negotiation and destroy_device when it disconnects.
 */
static const struct vhost_device_ops spdk_vhost_scsi_device_ops = {
	.new_device =  new_device,
	.destroy_device = destroy_device,
};
883 
884 int
885 spdk_vhost_scsi_ctrlr_construct(const char *name, uint64_t cpumask)
886 {
887 	struct spdk_vhost_scsi_ctrlr *vdev;
888 	unsigned ctrlr_num;
889 	char path[PATH_MAX];
890 	struct stat file_stat;
891 
892 	if (name == NULL) {
893 		SPDK_ERRLOG("Can't add controller with no name\n");
894 		return -EINVAL;
895 	}
896 
897 	if ((cpumask & spdk_app_get_core_mask()) != cpumask) {
898 		SPDK_ERRLOG("cpumask 0x%jx not a subset of app mask 0x%jx\n",
899 			    cpumask, spdk_app_get_core_mask());
900 		return -EINVAL;
901 	}
902 
903 	if (spdk_vhost_scsi_ctrlr_find(name)) {
904 		SPDK_ERRLOG("vhost scsi controller %s already exists.\n", name);
905 		return -EEXIST;
906 	}
907 
908 	for (ctrlr_num = 0; ctrlr_num < MAX_SCSI_CTRLRS; ctrlr_num++) {
909 		if (spdk_vhost_ctrlrs[ctrlr_num] == NULL) {
910 			break;
911 		}
912 	}
913 
914 	if (ctrlr_num == MAX_SCSI_CTRLRS) {
915 		SPDK_ERRLOG("Max scsi controllers reached (%d).\n", MAX_SCSI_CTRLRS);
916 		return -ENOSPC;
917 	}
918 
919 	if (snprintf(path, sizeof(path), "%s%s", dev_dirname, name) >= (int)sizeof(path)) {
920 		SPDK_ERRLOG("Resulting socket path for controller %s is too long: %s%s\n", name, dev_dirname, name);
921 		return -EINVAL;
922 	}
923 
924 	/* Register vhost driver to handle vhost messages. */
925 	if (stat(path, &file_stat) != -1) {
926 		if (!S_ISSOCK(file_stat.st_mode)) {
927 			SPDK_ERRLOG("Cannot remove %s: not a socket.\n", path);
928 			return -EINVAL;
929 		} else if (unlink(path) != 0) {
930 			rte_exit(EXIT_FAILURE, "Cannot remove %s.\n", path);
931 		}
932 	}
933 
934 	if (rte_vhost_driver_register(path, 0) != 0) {
935 		SPDK_ERRLOG("Could not register controller %s with vhost library\n", name);
936 		SPDK_ERRLOG("Check if domain socket %s already exists\n", path);
937 		return -EIO;
938 	}
939 	if (rte_vhost_driver_set_features(path, SPDK_VHOST_SCSI_FEATURES) ||
940 	    rte_vhost_driver_disable_features(path, SPDK_VHOST_SCSI_DISABLED_FEATURES)) {
941 		SPDK_ERRLOG("Couldn't set vhost features for controller %s\n", name);
942 		return -EINVAL;
943 	}
944 
945 	if (rte_vhost_driver_callback_register(path, &spdk_vhost_scsi_device_ops) != 0) {
946 		SPDK_ERRLOG("Couldn't register callbacks for controller %s\n", name);
947 		return -ENOENT;
948 	}
949 
950 	vdev = spdk_zmalloc(sizeof(*vdev), RTE_CACHE_LINE_SIZE, NULL);
951 	if (vdev == NULL) {
952 		SPDK_ERRLOG("Couldn't allocate memory for vhost dev\n");
953 		return -ENOMEM;
954 	}
955 
956 	vdev->name =  strdup(name);
957 	vdev->cpumask = cpumask;
958 	vdev->lcore = -1;
959 
960 	if (rte_vhost_driver_start(path) != 0) {
961 		SPDK_ERRLOG("Failed to start vhost driver for controller %s (%d): %s", name, errno,
962 			    strerror(errno));
963 		free(vdev->name);
964 		spdk_free(vdev);
965 		return -EIO;
966 	}
967 
968 	spdk_vhost_ctrlrs[ctrlr_num] = vdev;
969 	SPDK_NOTICELOG("Controller %s: new controller added\n", name);
970 	return 0;
971 }
972 
/* Parse a hexadecimal CPU mask string and validate it: it must be non-empty,
 * non-zero, fully parseable and a subset of the application's core mask.
 * Returns 0 on success, -1 on any parse/validation error.
 */
int
spdk_vhost_parse_core_mask(const char *mask, uint64_t *cpumask)
{
	char *end = NULL;
	uint64_t app_mask;

	if (mask == NULL || cpumask == NULL) {
		return -1;
	}

	errno = 0;
	*cpumask = strtoull(mask, &end, 16);
	app_mask = spdk_app_get_core_mask();

	if (*end != '\0' || errno != 0 || *cpumask == 0 ||
	    (*cpumask & app_mask) != *cpumask) {
		SPDK_ERRLOG("cpumask %s not a subset of app mask 0x%jx\n",
			    mask, app_mask);
		return -1;
	}

	return 0;
}
995 
/* Return the SCSI device at slot `num` of the controller, or NULL when the
 * slot is empty.  `num` must be < SPDK_VHOST_SCSI_CTRLR_MAX_DEVS.
 */
struct spdk_scsi_dev *
spdk_vhost_scsi_ctrlr_get_dev(struct spdk_vhost_scsi_ctrlr *ctrlr, uint8_t num)
{
	assert(ctrlr != NULL);
	assert(num < SPDK_VHOST_SCSI_CTRLR_MAX_DEVS);
	return ctrlr->scsi_dev[num];
}
1003 
/* Attach a single-LUN SCSI device (backed by `lun_name`) to slot
 * `scsi_dev_num` of the named controller.  Only allowed while the controller
 * has no connected guest (hotplug is unsupported).
 * Returns 0 on success, negative errno (or -1 for a too-long LUN name) on
 * failure.
 */
int
spdk_vhost_scsi_ctrlr_add_dev(const char *ctrlr_name, unsigned scsi_dev_num, const char *lun_name)
{
	struct spdk_vhost_scsi_ctrlr *vdev;
	char dev_name[SPDK_SCSI_DEV_MAX_NAME];
	int lun_id_list[1];
	char *lun_names_list[1];

	if (ctrlr_name == NULL) {
		SPDK_ERRLOG("No controller name\n");
		return -EINVAL;
	}

	if (scsi_dev_num >= SPDK_VHOST_SCSI_CTRLR_MAX_DEVS) {
		SPDK_ERRLOG("Controller %d device number too big (max %d)\n", scsi_dev_num,
			    SPDK_VHOST_SCSI_CTRLR_MAX_DEVS);
		return -EINVAL;
	}

	if (lun_name == NULL) {
		SPDK_ERRLOG("No lun name specified \n");
		return -EINVAL;
	} else if (strlen(lun_name) >= SPDK_SCSI_DEV_MAX_NAME) {
		SPDK_ERRLOG("LUN name '%s' too long (max %d).\n", lun_name, SPDK_SCSI_DEV_MAX_NAME - 1);
		return -1;
	}

	vdev = spdk_vhost_scsi_ctrlr_find(ctrlr_name);
	if (vdev == NULL) {
		SPDK_ERRLOG("Controller %s is not defined\n", ctrlr_name);
		return -ENODEV;
	}

	/* lcore != -1 means a guest is connected and pollers are running. */
	if (vdev->lcore != -1) {
		SPDK_ERRLOG("Controller %s is in use and hotplug is not supported\n", ctrlr_name);
		return -ENODEV;
	}

	if (vdev->scsi_dev[scsi_dev_num] != NULL) {
		SPDK_ERRLOG("Controller %s dev %u already occupied\n", ctrlr_name, scsi_dev_num);
		return -EEXIST;
	}

	/*
	 * At this stage only one LUN per device
	 */
	snprintf(dev_name, sizeof(dev_name), "Dev %u", scsi_dev_num);
	lun_id_list[0] = 0;
	lun_names_list[0] = (char *)lun_name;

	vdev->scsi_dev[scsi_dev_num] = spdk_scsi_dev_construct(dev_name, lun_names_list, lun_id_list, 1);
	if (vdev->scsi_dev[scsi_dev_num] == NULL) {
		SPDK_ERRLOG("Couldn't create spdk SCSI device '%s' using lun device '%s' in controller: %s\n",
			    dev_name, lun_name, vdev->name);
		return -EINVAL;
	}

	/* Single port with id 0; matches the lookup in process_request(). */
	spdk_scsi_dev_add_port(vdev->scsi_dev[scsi_dev_num], 0, "vhost");
	SPDK_NOTICELOG("Controller %s: defined device '%s' using lun '%s'\n",
		       vdev->name, dev_name, lun_name);
	return 0;
}
1066 
1067 struct spdk_vhost_scsi_ctrlr *
1068 spdk_vhost_scsi_ctrlr_next(struct spdk_vhost_scsi_ctrlr *prev)
1069 {
1070 	int i = 0;
1071 
1072 	if (prev != NULL) {
1073 		for (; i < MAX_SCSI_CTRLRS; i++) {
1074 			if (spdk_vhost_ctrlrs[i] == prev) {
1075 				break;
1076 			}
1077 		}
1078 
1079 		i++;
1080 	}
1081 
1082 	for (; i < MAX_SCSI_CTRLRS; i++) {
1083 		if (spdk_vhost_ctrlrs[i] == NULL) {
1084 			continue;
1085 		}
1086 
1087 		return spdk_vhost_ctrlrs[i];
1088 	}
1089 
1090 	return NULL;
1091 }
1092 
1093 const char *
1094 spdk_vhost_scsi_ctrlr_get_name(struct spdk_vhost_scsi_ctrlr *ctrlr)
1095 {
1096 	assert(ctrlr != NULL);
1097 	return ctrlr->name;
1098 }
1099 
1100 uint64_t
1101 spdk_vhost_scsi_ctrlr_get_cpumask(struct spdk_vhost_scsi_ctrlr *ctrlr)
1102 {
1103 	assert(ctrlr != NULL);
1104 	return ctrlr->cpumask;
1105 }
1106 
/*
 * Build vhost SCSI controllers from "[VhostScsiN]" configuration sections.
 *
 * For each matching section: read Name (required) and Cpumask (optional,
 * defaults to the app core mask), construct the controller, then attach one
 * single-LUN SCSI device per "Dev <num> <lun>" entry.
 * Returns 0 on success, -1 on the first configuration error.
 *
 * Fixes: a missing Name is now rejected instead of passing NULL to "%s"
 * logs and to spdk_vhost_scsi_ctrlr_construct(); the Dev number is
 * validated with strtol's end pointer instead of silently treating junk
 * as 0.
 */
static int
spdk_vhost_scsi_controller_construct(void)
{
	struct spdk_conf_section *sp = spdk_conf_first_section(NULL);
	int i, dev_num;
	unsigned ctrlr_num = 0;
	char *lun_name, *dev_num_str;
	char *cpumask_str;
	char *name;
	char *end;
	uint64_t cpumask;

	while (sp != NULL) {
		if (!spdk_conf_section_match_prefix(sp, "VhostScsi")) {
			sp = spdk_conf_next_section(sp);
			continue;
		}

		if (sscanf(spdk_conf_section_get_name(sp), "VhostScsi%u", &ctrlr_num) != 1) {
			SPDK_ERRLOG("Section '%s' has non-numeric suffix.\n",
				    spdk_conf_section_get_name(sp));
			return -1;
		}

		name = spdk_conf_section_get_val(sp, "Name");
		if (name == NULL) {
			SPDK_ERRLOG("Section '%s' is missing Name\n",
				    spdk_conf_section_get_name(sp));
			return -1;
		}

		cpumask_str = spdk_conf_section_get_val(sp, "Cpumask");
		if (cpumask_str == NULL) {
			/* No explicit mask configured: inherit the app core mask. */
			cpumask = spdk_app_get_core_mask();
		} else if (spdk_vhost_parse_core_mask(cpumask_str, &cpumask)) {
			SPDK_ERRLOG("%s: Error parsing cpumask '%s' while creating controller\n", name, cpumask_str);
			return -1;
		}

		if (spdk_vhost_scsi_ctrlr_construct(name, cpumask) < 0) {
			return -1;
		}

		for (i = 0; spdk_conf_section_get_nval(sp, "Dev", i) != NULL; i++) {
			dev_num_str = spdk_conf_section_get_nmval(sp, "Dev", i, 0);
			if (dev_num_str == NULL) {
				SPDK_ERRLOG("%s: Invalid or missing Dev number\n", name);
				return -1;
			}

			dev_num = (int)strtol(dev_num_str, &end, 10);
			if (end == dev_num_str || *end != '\0' || dev_num < 0) {
				SPDK_ERRLOG("%s: Invalid Dev number '%s'\n", name, dev_num_str);
				return -1;
			}

			lun_name = spdk_conf_section_get_nmval(sp, "Dev", i, 1);
			if (lun_name == NULL) {
				SPDK_ERRLOG("%s: Invalid or missing LUN name for dev %d\n", name, dev_num);
				return -1;
			} else if (spdk_conf_section_get_nmval(sp, "Dev", i, 2)) {
				SPDK_ERRLOG("%s: Only one LUN per vhost SCSI device supported\n", name);
				return -1;
			}

			if (spdk_vhost_scsi_ctrlr_add_dev(name, dev_num, lun_name) < 0) {
				return -1;
			}
		}

		sp = spdk_conf_next_section(sp);
	}

	return 0;
}
1170 
1171 static uint32_t
1172 spdk_vhost_scsi_allocate_reactor(uint64_t cpumask)
1173 {
1174 	uint32_t i, selected_core;
1175 	uint32_t min_ctrlrs;
1176 
1177 	cpumask &= spdk_app_get_core_mask();
1178 
1179 	if (cpumask == 0) {
1180 		return 0;
1181 	}
1182 
1183 	min_ctrlrs = INT_MAX;
1184 	selected_core = 0;
1185 
1186 	for (i = 0; i < RTE_MAX_LCORE && i < 64; i++) {
1187 		if (!((1ULL << i) & cpumask)) {
1188 			continue;
1189 		}
1190 
1191 		if (g_num_ctrlrs[i] < min_ctrlrs) {
1192 			selected_core = i;
1193 			min_ctrlrs = g_num_ctrlrs[i];
1194 		}
1195 	}
1196 
1197 	g_num_ctrlrs[selected_core]++;
1198 	return selected_core;
1199 }
1200 
/*
 * rte_vhost callback: a new vhost session connected for device 'vid'.
 * Look up the matching controller, create its SPDK device, pick a reactor
 * core for it, and synchronously dispatch add_vdev_cb to that core,
 * panicking if registration does not complete within the timeout.
 * Returns 0 on success, -1 if the controller is unknown or already active.
 */
static int
new_device(int vid)
{
	struct spdk_vhost_scsi_ctrlr *vdev = NULL;
	struct spdk_event *event;

	char ifname[PATH_MAX];
	sem_t added;

	assert(vid < MAX_VHOST_DEVICE);

	/* The socket path (ifname) doubles as the controller name. */
	if (rte_vhost_get_ifname(vid, ifname, PATH_MAX) < 0) {
		SPDK_ERRLOG("Couldn't get a valid ifname for device %d\n", vid);
		return -1;
	}

	vdev = spdk_vhost_scsi_ctrlr_find(ifname);
	if (vdev == NULL) {
		SPDK_ERRLOG("Controller %s not found.\n", ifname);
		return -1;
	}

	/* lcore != -1 means this controller already has a live session. */
	if (vdev->lcore != -1) {
		SPDK_ERRLOG("Controller %s already connected.\n", ifname);
		return -1;
	}

	assert(vdev->dev == NULL);
	vdev->dev = spdk_vhost_dev_create(vid);
	if (vdev->dev == NULL) {
		return -1;
	}

	/* Record the vid -> controller mapping before scheduling the core. */
	dpdk_vid_mapping[vid] = vdev;
	vdev->lcore = spdk_vhost_scsi_allocate_reactor(vdev->cpumask);

	/* Run add_vdev_cb on the chosen core and wait (bounded) for it;
	 * vhost_sem_event_alloc ties 'added' to the event's completion. */
	event = vhost_sem_event_alloc(vdev->lcore, add_vdev_cb, vdev, &added);
	spdk_event_call(event);
	if (vhost_sem_timedwait(&added, 1))
		rte_panic("Failed to register new device '%s'\n", vdev->name);
	return 0;
}
1247 
1248 void
1249 spdk_vhost_startup(void *arg1, void *arg2)
1250 {
1251 	int ret;
1252 	const char *basename = arg1;
1253 
1254 	if (basename && strlen(basename) > 0) {
1255 		ret = snprintf(dev_dirname, sizeof(dev_dirname) - 2, "%s", basename);
1256 		if ((size_t)ret >= sizeof(dev_dirname) - 2) {
1257 			rte_exit(EXIT_FAILURE, "Char dev dir path length %d is too long\n", ret);
1258 		}
1259 
1260 		if (dev_dirname[ret - 1] != '/') {
1261 			dev_dirname[ret] = '/';
1262 			dev_dirname[ret + 1]  = '\0';
1263 		}
1264 	}
1265 
1266 	ret = spdk_vhost_scsi_controller_construct();
1267 	if (ret != 0)
1268 		rte_exit(EXIT_FAILURE, "Cannot construct vhost controllers\n");
1269 }
1270 
1271 static void *
1272 session_shutdown(void *arg)
1273 {
1274 	struct spdk_vhost_scsi_ctrlr *vdev = NULL;
1275 	int i;
1276 
1277 	for (i = 0; i < MAX_SCSI_CTRLRS; i++) {
1278 		vdev = spdk_vhost_ctrlrs[i];
1279 		if (vdev == NULL) {
1280 			continue;
1281 		}
1282 		rte_vhost_driver_unregister(vdev->name);
1283 	}
1284 
1285 	SPDK_NOTICELOG("Exiting\n");
1286 	spdk_app_stop(0);
1287 	return NULL;
1288 }
1289 
1290 /*
1291  * When we receive a INT signal. Execute shutdown in separate thread to avoid deadlock.
1292  */
1293 void
1294 spdk_vhost_shutdown_cb(void)
1295 {
1296 	pthread_t tid;
1297 	if (pthread_create(&tid, NULL, &session_shutdown, NULL) < 0)
1298 		rte_panic("Failed to start session shutdown thread (%d): %s", errno, strerror(errno));
1299 	pthread_detach(tid);
1300 }
1301 
/* Register the debug trace flags exposed by the vhost library. */
SPDK_LOG_REGISTER_TRACE_FLAG("vhost", SPDK_TRACE_VHOST)
SPDK_LOG_REGISTER_TRACE_FLAG("vhost_ring", SPDK_TRACE_VHOST_RING)
SPDK_LOG_REGISTER_TRACE_FLAG("vhost_queue", SPDK_TRACE_VHOST_QUEUE)
SPDK_LOG_REGISTER_TRACE_FLAG("vhost_data", SPDK_TRACE_VHOST_DATA)
1306