xref: /dpdk/examples/vhost_blk/vhost_blk.c (revision 68a03efeed657e6e05f281479b33b51102797e15)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2019 Intel Corporation
 */

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <pthread.h>
#include <sched.h>

#include <stdint.h>
#include <unistd.h>
#include <stdbool.h>
#include <signal.h>
#include <assert.h>
#include <semaphore.h>
#include <linux/virtio_blk.h>
#include <linux/virtio_ring.h>

#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_vhost.h>

#include "vhost_blk.h"
#include "blk_spec.h"
#define VIRTQ_DESC_F_NEXT	1
#define VIRTQ_DESC_F_AVAIL	(1 << 7)
#define VIRTQ_DESC_F_USED	(1 << 15)

#define MAX_TASK		12

#define VHOST_BLK_FEATURES ((1ULL << VIRTIO_F_RING_PACKED) | \
			    (1ULL << VIRTIO_F_VERSION_1) | \
			    (1ULL << VIRTIO_F_NOTIFY_ON_EMPTY) | \
			    (1ULL << VHOST_USER_F_PROTOCOL_FEATURES))
#define CTRLR_NAME		"vhost.socket"
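
/*
 * For reference, a guest can attach to this backend with QEMU options
 * along these lines (a sketch, assuming vhost-user-blk-pci support and
 * file-backed shared guest memory; paths and sizes are placeholders):
 *
 *   -object memory-backend-file,id=mem0,size=1G,mem-path=/dev/hugepages,share=on
 *   -numa node,memdev=mem0
 *   -chardev socket,id=char0,path=<cwd>/vhost.socket
 *   -device vhost-user-blk-pci,chardev=char0
 */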

enum CTRLR_WORKER_STATUS {
	WORKER_STATE_START = 0,
	WORKER_STATE_STOP,
};

struct vhost_blk_ctrlr *g_vhost_ctrlr;

/* Full path of the vhost-user socket; built from the current working
 * directory when the controller is constructed.
 */
static char dev_pathname[PATH_MAX] = "";
static sem_t exit_sem;
static enum CTRLR_WORKER_STATUS worker_thread_status;

struct vhost_blk_ctrlr *
vhost_blk_ctrlr_find(const char *ctrlr_name)
{
	if (ctrlr_name == NULL)
		return NULL;

	/* only one controller (one socket file) is supported for now */
	return g_vhost_ctrlr;
}

static uint64_t
gpa_to_vva(struct vhost_blk_ctrlr *ctrlr, uint64_t gpa, uint64_t *len)
{
	assert(ctrlr->mem != NULL);

	return rte_vhost_va_from_guest_pa(ctrlr->mem, gpa, len);
}

static void
enqueue_task(struct vhost_blk_task *task)
{
	struct vhost_blk_queue *vq = task->vq;
	struct vring_used *used = vq->vring.used;

	rte_vhost_set_last_inflight_io_split(task->ctrlr->vid,
		vq->id, task->req_idx);

	/* Fill out the next entry in the "used" ring.  id = the
	 * index of the descriptor that contained the blk request.
	 * len = the total amount of data transferred for the blk
	 * request. We must report the correct len for variable-length
	 * requests, where we may return less data than the guest
	 * VM allocated.
	 */
	used->ring[used->idx & (vq->vring.size - 1)].id = task->req_idx;
	used->ring[used->idx & (vq->vring.size - 1)].len = task->data_len;
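	/* Publish in two steps: the used-ring entry must be visible
	 * before the index update, and the index update before the
	 * inflight clear and the guest interrupt below.
	 */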
	rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
	used->idx++;
	rte_atomic_thread_fence(__ATOMIC_SEQ_CST);

	rte_vhost_clr_inflight_desc_split(task->ctrlr->vid,
		vq->id, used->idx, task->req_idx);

	/* Send an interrupt back to the guest VM so that it knows
	 * a completion is ready to be processed.
	 */
	rte_vhost_vring_call(task->ctrlr->vid, vq->id);
}

static void
enqueue_task_packed(struct vhost_blk_task *task)
{
	struct vhost_blk_queue *vq = task->vq;
	struct vring_packed_desc *desc;

	rte_vhost_set_last_inflight_io_packed(task->ctrlr->vid, vq->id,
					    task->inflight_idx);

	desc = &vq->vring.desc_packed[vq->last_used_idx];
	desc->id = task->buffer_id;
	desc->addr = 0;

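	/* A packed descriptor is marked used by making its AVAIL and
	 * USED flag bits both match the device's used wrap counter;
	 * the fences order the id write against the flags update.
	 */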
	rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
	if (vq->used_wrap_counter)
		desc->flags |= VIRTQ_DESC_F_AVAIL | VIRTQ_DESC_F_USED;
	else
		desc->flags &= ~(VIRTQ_DESC_F_AVAIL | VIRTQ_DESC_F_USED);
	rte_atomic_thread_fence(__ATOMIC_SEQ_CST);

	rte_vhost_clr_inflight_desc_packed(task->ctrlr->vid, vq->id,
					   task->inflight_idx);

	vq->last_used_idx += task->chain_num;
	if (vq->last_used_idx >= vq->vring.size) {
		vq->last_used_idx -= vq->vring.size;
		vq->used_wrap_counter = !vq->used_wrap_counter;
	}

	/* Send an interrupt back to the guest VM so that it knows
	 * a completion is ready to be processed.
	 */
	rte_vhost_vring_call(task->ctrlr->vid, vq->id);
}

static bool
descriptor_has_next_packed(struct vring_packed_desc *cur_desc)
{
	return !!(cur_desc->flags & VRING_DESC_F_NEXT);
}

static bool
descriptor_has_next_split(struct vring_desc *cur_desc)
{
	return !!(cur_desc->flags & VRING_DESC_F_NEXT);
}

static int
desc_payload_to_iovs(struct vhost_blk_ctrlr *ctrlr, struct iovec *iovs,
		     uint32_t *iov_index, uintptr_t payload, uint64_t remaining)
{
	void *vva;
	uint64_t len;

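	/* A contiguous guest-physical range may span several host
	 * mappings, so translate piece by piece: each call may return
	 * a len shorter than requested.
	 */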
	do {
		if (*iov_index >= VHOST_BLK_MAX_IOVS) {
			fprintf(stderr, "VHOST_BLK_MAX_IOVS reached\n");
			return -1;
		}
		len = remaining;
		vva = (void *)(uintptr_t)gpa_to_vva(ctrlr,
				 payload, &len);
		if (!vva || !len) {
			fprintf(stderr, "failed to translate desc address.\n");
			return -1;
		}

		iovs[*iov_index].iov_base = vva;
		iovs[*iov_index].iov_len = len;
		payload += len;
		remaining -= len;
		(*iov_index)++;
	} while (remaining);

	return 0;
}

static struct vring_desc *
vring_get_next_desc(struct vhost_blk_queue *vq, struct vring_desc *desc)
{
	if (descriptor_has_next_split(desc))
		return &vq->vring.desc[desc->next];

	return NULL;
}

static struct vring_packed_desc *
vring_get_next_desc_packed(struct vhost_blk_queue *vq, uint16_t *req_idx)
{
	if (descriptor_has_next_packed(&vq->vring.desc_packed[*req_idx])) {
		*req_idx = (*req_idx + 1) % vq->vring.size;
		return &vq->vring.desc_packed[*req_idx];
	}

	return NULL;
}

static struct rte_vhost_inflight_desc_packed *
vring_get_next_inflight_desc(struct vhost_blk_queue *vq,
			struct rte_vhost_inflight_desc_packed *desc)
{
	if (!!(desc->flags & VRING_DESC_F_NEXT))
		return &vq->inflight_ring.inflight_packed->desc[desc->next];

	return NULL;
}

static int
setup_iovs_from_descs_split(struct vhost_blk_ctrlr *ctrlr,
			    struct vhost_blk_queue *vq, uint16_t req_idx,
			    struct iovec *iovs, uint32_t *iovs_idx,
			    uint32_t *payload)
{
	struct vring_desc *desc = &vq->vring.desc[req_idx];

	do {
		/* does not support indirect descriptors */
		assert((desc->flags & VRING_DESC_F_INDIRECT) == 0);

		if (*iovs_idx >= VHOST_BLK_MAX_IOVS) {
			fprintf(stderr, "Reached VHOST_BLK_MAX_IOVS\n");
			return -1;
		}

		if (desc_payload_to_iovs(ctrlr, iovs, iovs_idx,
			desc->addr, desc->len) != 0) {
			fprintf(stderr, "Failed to convert desc payload to iovs\n");
			return -1;
		}

		*payload += desc->len;

		desc = vring_get_next_desc(vq, desc);
	} while (desc != NULL);

	return 0;
}

static int
setup_iovs_from_descs_packed(struct vhost_blk_ctrlr *ctrlr,
			     struct vhost_blk_queue *vq, uint16_t req_idx,
			     struct iovec *iovs, uint32_t *iovs_idx,
			     uint32_t *payload)
{
	struct vring_packed_desc *desc = &vq->vring.desc_packed[req_idx];

	do {
		/* does not support indirect descriptors */
		assert((desc->flags & VRING_DESC_F_INDIRECT) == 0);

		if (*iovs_idx >= VHOST_BLK_MAX_IOVS) {
			fprintf(stderr, "Reached VHOST_BLK_MAX_IOVS\n");
			return -1;
		}

		if (desc_payload_to_iovs(ctrlr, iovs, iovs_idx,
			desc->addr, desc->len) != 0) {
			fprintf(stderr, "Failed to convert desc payload to iovs\n");
			return -1;
		}

		*payload += desc->len;

		desc = vring_get_next_desc_packed(vq, &req_idx);
	} while (desc != NULL);

	return 0;
}

static int
setup_iovs_from_inflight_desc(struct vhost_blk_ctrlr *ctrlr,
			      struct vhost_blk_queue *vq, uint16_t req_idx,
			      struct iovec *iovs, uint32_t *iovs_idx,
			      uint32_t *payload)
{
	struct rte_vhost_ring_inflight *inflight_vq;
	struct rte_vhost_inflight_desc_packed *desc;

	inflight_vq = &vq->inflight_ring;
	desc = &inflight_vq->inflight_packed->desc[req_idx];

	do {
		/* does not support indirect descriptors */
		assert((desc->flags & VRING_DESC_F_INDIRECT) == 0);

		if (*iovs_idx >= VHOST_BLK_MAX_IOVS) {
			fprintf(stderr, "Reached VHOST_BLK_MAX_IOVS\n");
			return -1;
		}

		if (desc_payload_to_iovs(ctrlr, iovs, iovs_idx,
			desc->addr, desc->len) != 0) {
			fprintf(stderr, "Failed to convert desc payload to iovs\n");
			return -1;
		}

		*payload += desc->len;

		desc = vring_get_next_inflight_desc(vq, desc);
	} while (desc != NULL);

	return 0;
}

static void
process_blk_task(struct vhost_blk_task *task)
{
	uint32_t payload = 0;

	if (task->vq->packed_ring) {
		struct rte_vhost_ring_inflight *inflight_ring;
		struct rte_vhost_resubmit_info *resubmit_inflight;

		inflight_ring = &task->vq->inflight_ring;
		resubmit_inflight = inflight_ring->resubmit_inflight;

		if (resubmit_inflight != NULL &&
		    resubmit_inflight->resubmit_list != NULL) {
			if (setup_iovs_from_inflight_desc(task->ctrlr, task->vq,
				task->req_idx, task->iovs, &task->iovs_cnt,
				&payload)) {
				fprintf(stderr, "Failed to setup iovs\n");
				return;
			}
		} else {
			if (setup_iovs_from_descs_packed(task->ctrlr, task->vq,
				task->req_idx, task->iovs, &task->iovs_cnt,
				&payload)) {
				fprintf(stderr, "Failed to setup iovs\n");
				return;
			}
		}
	} else {
		if (setup_iovs_from_descs_split(task->ctrlr, task->vq,
			task->req_idx, task->iovs, &task->iovs_cnt, &payload)) {
			fprintf(stderr, "Failed to setup iovs\n");
			return;
		}
	}

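	/* A virtio-blk request is framed as: a struct virtio_blk_outhdr
	 * (type, ioprio, sector), then the data buffers, then a single
	 * status byte written by the device.
	 */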
	/* First IOV must be the req head. */
	task->req = (struct virtio_blk_outhdr *)task->iovs[0].iov_base;
	assert(sizeof(*task->req) == task->iovs[0].iov_len);

	/* Last IOV must be the status tail. */
	task->status = (uint8_t *)task->iovs[task->iovs_cnt - 1].iov_base;
	assert(sizeof(*task->status) == task->iovs[task->iovs_cnt - 1].iov_len);

	/* Payload length, excluding the request header and status byte. */
	task->data_len = payload - task->iovs[0].iov_len -
		task->iovs[task->iovs_cnt - 1].iov_len;

	if (vhost_bdev_process_blk_commands(task->ctrlr->bdev, task))
		/* I/O error */
		*task->status = VIRTIO_BLK_S_IOERR;
	else
		/* success */
		*task->status = VIRTIO_BLK_S_OK;

	if (task->vq->packed_ring)
		enqueue_task_packed(task);
	else
		enqueue_task(task);
}

static void
blk_task_init(struct vhost_blk_task *task)
{
	task->iovs_cnt = 0;
	task->data_len = 0;
	task->req = NULL;
	task->status = NULL;
}

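/* Replay the requests that were still in flight when the previous
 * backend instance went away. Their state is recovered from the
 * inflight shared-memory region negotiated via
 * VHOST_USER_PROTOCOL_F_INFLIGHT_SHMFD.
 */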
static void
submit_inflight_vq(struct vhost_blk_queue *vq)
{
	struct rte_vhost_ring_inflight *inflight_ring;
	struct rte_vhost_resubmit_info *resubmit_inflight;
	struct vhost_blk_task *task;

	inflight_ring = &vq->inflight_ring;
	resubmit_inflight = inflight_ring->resubmit_inflight;

	if (resubmit_inflight == NULL ||
	    resubmit_inflight->resubmit_num == 0)
		return;

	fprintf(stdout, "Resubmit inflight num is %d\n",
		resubmit_inflight->resubmit_num);

	while (resubmit_inflight->resubmit_num-- > 0) {
		uint16_t desc_idx;

		desc_idx = resubmit_inflight->resubmit_list[
					resubmit_inflight->resubmit_num].index;

		if (vq->packed_ring) {
			uint16_t task_idx;
			struct rte_vhost_inflight_desc_packed *desc;

			desc = inflight_ring->inflight_packed->desc;
			task_idx = desc[desc[desc_idx].last].id;
			task = &vq->tasks[task_idx];

			task->req_idx = desc_idx;
			task->chain_num = desc[desc_idx].num;
			task->buffer_id = task_idx;
			task->inflight_idx = desc_idx;

			vq->last_avail_idx += desc[desc_idx].num;
			if (vq->last_avail_idx >= vq->vring.size) {
				vq->last_avail_idx -= vq->vring.size;
				vq->avail_wrap_counter =
					!vq->avail_wrap_counter;
			}
		} else
			/* In split ring, the desc_idx is the req_id,
			 * which was initialized when the task pool was
			 * allocated.
			 */
			task = &vq->tasks[desc_idx];

		blk_task_init(task);
		process_blk_task(task);
	}

	free(resubmit_inflight->resubmit_list);
	resubmit_inflight->resubmit_list = NULL;
}

/* Use the buffer_id as the task_idx */
static uint16_t
vhost_blk_vq_get_desc_chain_buffer_id(struct vhost_blk_queue *vq,
				      uint16_t *req_head, uint16_t *num)
{
	struct vring_packed_desc *desc = &vq->vring.desc_packed[
						vq->last_avail_idx];

	*req_head = vq->last_avail_idx;
	*num = 1;

	while (descriptor_has_next_packed(desc)) {
		vq->last_avail_idx = (vq->last_avail_idx + 1) % vq->vring.size;
		desc = &vq->vring.desc_packed[vq->last_avail_idx];
		*num += 1;
	}

	/* Point to next desc */
	vq->last_avail_idx = (vq->last_avail_idx + 1) % vq->vring.size;
	if (vq->last_avail_idx < *req_head)
		vq->avail_wrap_counter = !vq->avail_wrap_counter;

	return desc->id;
}

static uint16_t
vq_get_desc_idx(struct vhost_blk_queue *vq)
{
	uint16_t desc_idx;
	uint16_t last_avail_idx;

	last_avail_idx = vq->last_avail_idx & (vq->vring.size - 1);
	desc_idx = vq->vring.avail->ring[last_avail_idx];
	vq->last_avail_idx++;

	return desc_idx;
}

static int
vhost_blk_vq_is_avail(struct vhost_blk_queue *vq)
{
	if (vq->packed_ring) {
		uint16_t flags = vq->vring.desc_packed[
					vq->last_avail_idx].flags;
		bool avail_wrap_counter = vq->avail_wrap_counter;

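		/* Per the packed-ring rules, a descriptor is available
		 * when its AVAIL flag equals the driver's wrap counter
		 * and its USED flag differs from it.
		 */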
		return (!!(flags & VIRTQ_DESC_F_AVAIL) == avail_wrap_counter &&
			!!(flags & VIRTQ_DESC_F_USED) != avail_wrap_counter);
	} else {
		if (vq->vring.avail->idx != vq->last_avail_idx)
			return 1;

		return 0;
	}
}

static void
process_vq(struct vhost_blk_queue *vq)
{
	struct vhost_blk_task *task;

	if (vq->packed_ring) {
		while (vhost_blk_vq_is_avail(vq)) {
			uint16_t task_idx, req_idx, last_idx, chain_num;

			task_idx = vhost_blk_vq_get_desc_chain_buffer_id(vq,
					&req_idx, &chain_num);
			task = &vq->tasks[task_idx];

			blk_task_init(task);
			task->req_idx = req_idx;
			task->chain_num = chain_num;
			task->buffer_id = task_idx;
			last_idx = (req_idx + chain_num - 1) % vq->vring.size;

			rte_vhost_set_inflight_desc_packed(task->ctrlr->vid,
							   vq->id,
							   task->req_idx,
							   last_idx,
							   &task->inflight_idx);

			process_blk_task(task);
		}
	} else {
		while (vhost_blk_vq_is_avail(vq)) {
			uint16_t desc_idx;

			desc_idx = vq_get_desc_idx(vq);
			task = &vq->tasks[desc_idx];

			blk_task_init(task);
			rte_vhost_set_inflight_desc_split(task->ctrlr->vid,
							  vq->id,
							  task->req_idx);
			process_blk_task(task);
		}
	}
}

static void *
ctrlr_worker(void *arg)
{
	struct vhost_blk_ctrlr *ctrlr = (struct vhost_blk_ctrlr *)arg;
	cpu_set_t cpuset;
	pthread_t thread;
	int i;

	fprintf(stdout, "Ctrlr Worker Thread start\n");

	if (ctrlr == NULL || ctrlr->bdev == NULL) {
		fprintf(stderr,
			"%s: Error, invalid argument passed to worker thread\n",
			__func__);
		exit(1);
	}

	thread = pthread_self();
	CPU_ZERO(&cpuset);
	CPU_SET(0, &cpuset);
	pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);

	for (i = 0; i < NUM_OF_BLK_QUEUES; i++)
		submit_inflight_vq(&ctrlr->queues[i]);

	while (worker_thread_status != WORKER_STATE_STOP)
		for (i = 0; i < NUM_OF_BLK_QUEUES; i++)
			process_vq(&ctrlr->queues[i]);

	fprintf(stdout, "Ctrlr Worker Thread Exiting\n");
	sem_post(&exit_sem);
	return NULL;
}

static int
alloc_task_pool(struct vhost_blk_ctrlr *ctrlr)
{
	struct vhost_blk_queue *vq;
	int i, j;

	for (i = 0; i < NUM_OF_BLK_QUEUES; i++) {
		vq = &ctrlr->queues[i];

		vq->tasks = rte_zmalloc(NULL,
			sizeof(struct vhost_blk_task) * vq->vring.size, 0);
		if (!vq->tasks) {
			fprintf(stderr, "Failed to allocate task memory\n");
			return -1;
		}

		for (j = 0; j < vq->vring.size; j++) {
			vq->tasks[j].req_idx = j;
			vq->tasks[j].ctrlr = ctrlr;
			vq->tasks[j].vq = vq;
		}
	}

	return 0;
}

static void
free_task_pool(struct vhost_blk_ctrlr *ctrlr)
{
	int i;

	for (i = 0; i < NUM_OF_BLK_QUEUES; i++)
		rte_free(ctrlr->queues[i].tasks);
}

static int
new_device(int vid)
{
	struct vhost_blk_ctrlr *ctrlr;
	struct vhost_blk_queue *vq;
	char path[PATH_MAX];
	uint64_t features, protocol_features;
	pthread_t tid;
	int i, ret;
	bool packed_ring, inflight_shmfd;

	ret = rte_vhost_get_ifname(vid, path, PATH_MAX);
	if (ret) {
		fprintf(stderr, "Failed to get the socket path\n");
		return -1;
	}

	ctrlr = vhost_blk_ctrlr_find(path);
	if (!ctrlr) {
		fprintf(stderr, "Failed to find controller\n");
		return -1;
	}

	if (ctrlr->started)
		return 0;

	ctrlr->vid = vid;
	ret = rte_vhost_get_negotiated_features(vid, &features);
	if (ret) {
		fprintf(stderr, "Failed to get the negotiated features\n");
		return -1;
	}
	packed_ring = !!(features & (1ULL << VIRTIO_F_RING_PACKED));

	ret = rte_vhost_get_negotiated_protocol_features(
		vid, &protocol_features);
	if (ret) {
		fprintf(stderr,
			"Failed to get the negotiated protocol features\n");
		return -1;
	}
	inflight_shmfd = !!(protocol_features &
			    (1ULL << VHOST_USER_PROTOCOL_F_INFLIGHT_SHMFD));

	/* Disable Notifications and init last idx */
	for (i = 0; i < NUM_OF_BLK_QUEUES; i++) {
		vq = &ctrlr->queues[i];
		vq->id = i;

		assert(rte_vhost_get_vhost_vring(ctrlr->vid, i,
						 &vq->vring) == 0);
		assert(rte_vhost_get_vring_base(ctrlr->vid, i,
					       &vq->last_avail_idx,
					       &vq->last_used_idx) == 0);

		if (inflight_shmfd)
			assert(rte_vhost_get_vhost_ring_inflight(
				       ctrlr->vid, i,
				       &vq->inflight_ring) == 0);

		if (packed_ring && inflight_shmfd) {
			/* for the reconnection */
			assert(rte_vhost_get_vring_base_from_inflight(
				ctrlr->vid, i,
				&vq->last_avail_idx,
				&vq->last_used_idx) == 0);

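			/* The recovered indexes carry the ring wrap
			 * counters in bit 15; split them back into the
			 * counter and the 15-bit index.
			 */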
			vq->avail_wrap_counter = vq->last_avail_idx &
				(1 << 15);
			vq->last_avail_idx = vq->last_avail_idx &
				0x7fff;
			vq->used_wrap_counter = vq->last_used_idx &
				(1 << 15);
			vq->last_used_idx = vq->last_used_idx &
				0x7fff;
		}

		vq->packed_ring = packed_ring;
		rte_vhost_enable_guest_notification(vid, i, 0);
	}

	assert(rte_vhost_get_mem_table(vid, &ctrlr->mem) == 0);
	assert(ctrlr->mem != NULL);
	assert(alloc_task_pool(ctrlr) == 0);

	/* start polling vring */
	worker_thread_status = WORKER_STATE_START;
	fprintf(stdout, "New Device %s, Device ID %d\n", path, vid);
	if (pthread_create(&tid, NULL, &ctrlr_worker, ctrlr) != 0) {
		fprintf(stderr, "Failed to start worker thread\n");
		return -1;
	}

	/* device has been started */
	ctrlr->started = 1;
	pthread_detach(tid);
	return 0;
}

static void
destroy_device(int vid)
{
	char path[PATH_MAX];
	struct vhost_blk_ctrlr *ctrlr;
	struct vhost_blk_queue *vq;
	int i, ret;

	ret = rte_vhost_get_ifname(vid, path, PATH_MAX);
	if (ret) {
		fprintf(stderr, "Destroy Ctrlr Failed\n");
		return;
	}

	fprintf(stdout, "Destroy %s Device ID %d\n", path, vid);
	ctrlr = vhost_blk_ctrlr_find(path);
	if (!ctrlr) {
		fprintf(stderr, "Destroy Ctrlr Failed\n");
		return;
	}

	if (!ctrlr->started)
		return;

	worker_thread_status = WORKER_STATE_STOP;
	sem_wait(&exit_sem);

	for (i = 0; i < NUM_OF_BLK_QUEUES; i++) {
		vq = &ctrlr->queues[i];
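		/* Mirror of the recovery in new_device(): fold the wrap
		 * counters back into bit 15 before saving the ring base.
		 */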
		if (vq->packed_ring) {
			vq->last_avail_idx |= (vq->avail_wrap_counter <<
				15);
			vq->last_used_idx |= (vq->used_wrap_counter <<
				15);
		}

		rte_vhost_set_vring_base(ctrlr->vid, i,
					 vq->last_avail_idx,
					 vq->last_used_idx);
	}

	free_task_pool(ctrlr);
	free(ctrlr->mem);

	ctrlr->started = 0;
}

static int
new_connection(int vid)
{
	/* extend the proper features for the block device */
	vhost_session_install_rte_compat_hooks(vid);

	return 0;
}

struct vhost_device_ops vhost_blk_device_ops = {
	.new_device = new_device,
	.destroy_device = destroy_device,
	.new_connection = new_connection,
};

static struct vhost_block_dev *
vhost_blk_bdev_construct(const char *bdev_name,
	const char *bdev_serial, uint32_t blk_size, uint64_t blk_cnt,
	bool wce_enable)
{
	struct vhost_block_dev *bdev;

	bdev = rte_zmalloc(NULL, sizeof(*bdev), RTE_CACHE_LINE_SIZE);
	if (!bdev)
		return NULL;

	snprintf(bdev->name, sizeof(bdev->name), "%s", bdev_name);
	snprintf(bdev->product_name, sizeof(bdev->product_name), "%s",
		 bdev_serial);
	bdev->blocklen = blk_size;
	bdev->blockcnt = blk_cnt;
	bdev->write_cache = wce_enable;

	fprintf(stdout, "Blocklen=%d, blockcnt=%"PRIx64"\n", bdev->blocklen,
		bdev->blockcnt);

	/* use memory as disk storage space */
	bdev->data = rte_zmalloc(NULL, blk_cnt * blk_size, 0);
	if (!bdev->data) {
		fprintf(stderr, "Not enough reserved huge memory for disk\n");
		rte_free(bdev);
		return NULL;
	}

	return bdev;
}

static struct vhost_blk_ctrlr *
vhost_blk_ctrlr_construct(const char *ctrlr_name)
{
	int ret;
	struct vhost_blk_ctrlr *ctrlr;
	char *path;
	char cwd[PATH_MAX];

	/* always use current directory */
	path = getcwd(cwd, PATH_MAX);
	if (!path) {
		fprintf(stderr, "Cannot get current working directory\n");
		return NULL;
	}
	snprintf(dev_pathname, sizeof(dev_pathname), "%s/%s", path, ctrlr_name);

	unlink(dev_pathname);

	if (rte_vhost_driver_register(dev_pathname, 0) != 0) {
		fprintf(stderr, "Failed to register vhost driver on %s\n",
			dev_pathname);
		return NULL;
	}

	ret = rte_vhost_driver_set_features(dev_pathname, VHOST_BLK_FEATURES);
	if (ret != 0) {
		fprintf(stderr, "Set vhost driver features failed\n");
		rte_vhost_driver_unregister(dev_pathname);
		return NULL;
	}

	/* set vhost user protocol features */
	vhost_dev_install_rte_compat_hooks(dev_pathname);

	ctrlr = rte_zmalloc(NULL, sizeof(*ctrlr), RTE_CACHE_LINE_SIZE);
	if (!ctrlr) {
		rte_vhost_driver_unregister(dev_pathname);
		return NULL;
	}

	/* hardcoded 128 MiB block device: 32768 blocks of 4 KiB */
	ctrlr->bdev = vhost_blk_bdev_construct("malloc0", "vhost_blk_malloc0",
						4096, 32768, 0);
	if (!ctrlr->bdev) {
		rte_free(ctrlr);
		rte_vhost_driver_unregister(dev_pathname);
		return NULL;
	}

	rte_vhost_driver_callback_register(dev_pathname,
					   &vhost_blk_device_ops);

	return ctrlr;
}

static void
vhost_blk_ctrlr_destroy(struct vhost_blk_ctrlr *ctrlr)
{
	if (ctrlr->bdev != NULL) {
		if (ctrlr->bdev->data != NULL)
			rte_free(ctrlr->bdev->data);

		rte_free(ctrlr->bdev);
	}
	rte_free(ctrlr);

	rte_vhost_driver_unregister(dev_pathname);
}

static void
signal_handler(__rte_unused int signum)
{
	struct vhost_blk_ctrlr *ctrlr;

	ctrlr = vhost_blk_ctrlr_find(dev_pathname);
	if (ctrlr == NULL)
		return;

	if (ctrlr->started)
		destroy_device(ctrlr->vid);

	vhost_blk_ctrlr_destroy(ctrlr);
	exit(0);
}
int main(int argc, char *argv[])
{
	int ret;

	/* init EAL */
	ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");

	g_vhost_ctrlr = vhost_blk_ctrlr_construct(CTRLR_NAME);
	if (g_vhost_ctrlr == NULL) {
		fprintf(stderr, "Construct vhost blk controller failed\n");
		return -1;
	}

	if (sem_init(&exit_sem, 0, 0) < 0) {
		fprintf(stderr, "Error init exit_sem\n");
		return -1;
	}

	signal(SIGINT, signal_handler);

	ret = rte_vhost_driver_start(dev_pathname);
	if (ret < 0) {
		fprintf(stderr, "Failed to start vhost driver.\n");
		return -1;
	}

	/* idle here; the SIGINT handler tears everything down and exits */
	while (1)
		sleep(1);

	return 0;
}