xref: /spdk/lib/vhost/vhost_scsi.c (revision a6014eb2adf0c95816b23ef94a911005fa047511)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include <linux/virtio_scsi.h>
37 
38 #include "spdk/env.h"
39 #include "spdk/scsi.h"
40 #include "spdk/scsi_spec.h"
41 #include "spdk/conf.h"
42 #include "spdk/event.h"
43 #include "spdk/util.h"
44 #include "spdk/likely.h"
45 
46 #include "spdk/vhost.h"
47 #include "vhost_internal.h"
48 
49 /* Features supported by SPDK VHOST lib. */
50 #define SPDK_VHOST_SCSI_FEATURES	((1ULL << VIRTIO_F_VERSION_1) | \
51 					(1ULL << VHOST_F_LOG_ALL) | \
52 					(1ULL << VHOST_USER_F_PROTOCOL_FEATURES) | \
53 					(1ULL << VIRTIO_F_NOTIFY_ON_EMPTY) | \
54 					(1ULL << VIRTIO_SCSI_F_INOUT) | \
55 					(1ULL << VIRTIO_SCSI_F_HOTPLUG) | \
56 					(1ULL << VIRTIO_SCSI_F_CHANGE ) | \
57 					(1ULL << VIRTIO_SCSI_F_T10_PI ))
58 
59 /* Features that are specified in VIRTIO SCSI but currently not supported:
60  * - Live migration not supported yet
61  * - T10 PI
62  */
63 #define SPDK_VHOST_SCSI_DISABLED_FEATURES	((1ULL << VHOST_F_LOG_ALL) | \
64 						(1ULL << VIRTIO_SCSI_F_T10_PI ))
65 
66 #define MGMT_POLL_PERIOD_US (1000 * 5)
67 
68 #define VIRTIO_SCSI_CONTROLQ   0
69 #define VIRTIO_SCSI_EVENTQ   1
70 #define VIRTIO_SCSI_REQUESTQ   2
71 
72 /* Allocated iovec buffer len */
73 #define SPDK_VHOST_SCSI_IOVS_LEN 128
74 
/* Per-controller state of a vhost-scsi device.
 * Embeds the generic vhost device as the first member so that
 * a struct spdk_vhost_dev * can be cast back (see to_scsi_dev()). */
struct spdk_vhost_scsi_dev {
	struct spdk_vhost_dev vdev;	/* generic vhost device; MUST stay first for the cast in to_scsi_dev() */
	struct spdk_scsi_dev *scsi_dev[SPDK_VHOST_SCSI_CTRLR_MAX_DEVS];	/* attached SCSI targets, indexed by target number */
	bool removed_dev[SPDK_VHOST_SCSI_CTRLR_MAX_DEVS];	/* target marked for hotremove, destructed later in process_removed_devs() */

	struct spdk_ring *task_pool;	/* preallocated spdk_vhost_scsi_task objects (see alloc_task_pool()) */
	struct spdk_poller *requestq_poller;	/* runs vdev_worker() on the request queues */
	struct spdk_poller *mgmt_poller;	/* runs vdev_mgmt_worker() every MGMT_POLL_PERIOD_US */

	struct spdk_ring *eventq_ring;	/* virtio_scsi_event buffers waiting to be pushed to the guest eventq */
} __rte_cache_aligned;
86 
/* Per-request context. Embeds the generic SCSI task as the first member;
 * completion callbacks recover it with SPDK_CONTAINEROF(). */
struct spdk_vhost_scsi_task {
	struct spdk_scsi_task	scsi;	/* generic SCSI task handed to the scsi lib */
	struct iovec iovs[SPDK_VHOST_SCSI_IOVS_LEN];	/* data buffers translated from the guest descriptor chain */

	/* Response buffer inside guest memory; which member is valid depends on
	 * the request type (I/O request vs. task-management request). */
	union {
		struct virtio_scsi_cmd_resp *resp;
		struct virtio_scsi_ctrl_tmf_resp *tmf_resp;
	};

	struct spdk_vhost_scsi_dev *svdev;	/* owning controller */
	struct spdk_scsi_dev *scsi_dev;		/* resolved target; NULL if LUN addressing failed or target removed */

	int req_idx;	/* index of the request's head descriptor in the virtqueue */

	struct rte_vhost_vring *vq;	/* virtqueue the request arrived on */
};
103 
static int new_device(int vid);
static void destroy_device(int vid);

/* Backend descriptor registered with the generic vhost layer: advertised
 * virtio feature bits plus the device start/stop callbacks. */
const struct spdk_vhost_dev_backend spdk_vhost_scsi_device_backend = {
	.virtio_features = SPDK_VHOST_SCSI_FEATURES,
	.disabled_features = SPDK_VHOST_SCSI_DISABLED_FEATURES,
	.ops = {
		.new_device =  new_device,
		.destroy_device = destroy_device,
	}
};
115 
116 static void
117 spdk_vhost_scsi_task_put(struct spdk_vhost_scsi_task *task)
118 {
119 	spdk_scsi_task_put(&task->scsi);
120 }
121 
122 static void
123 spdk_vhost_scsi_task_free_cb(struct spdk_scsi_task *scsi_task)
124 {
125 	struct spdk_vhost_scsi_task *task = SPDK_CONTAINEROF(scsi_task, struct spdk_vhost_scsi_task, scsi);
126 
127 	assert(task->svdev->vdev.task_cnt > 0);
128 	task->svdev->vdev.task_cnt--;
129 	spdk_ring_enqueue(task->svdev->task_pool, (void **) &task, 1);
130 }
131 
132 static void
133 spdk_vhost_get_tasks(struct spdk_vhost_scsi_dev *svdev, struct spdk_vhost_scsi_task **tasks,
134 		     size_t count)
135 {
136 	size_t res_count;
137 
138 	res_count = spdk_ring_dequeue(svdev->task_pool, (void **)tasks, count);
139 	if (res_count != count) {
140 		SPDK_ERRLOG("%s: couldn't get %zu tasks from task_pool\n", svdev->vdev.name, count);
141 		/* FIXME: we should never run out of tasks, but what if we do? */
142 		abort();
143 	}
144 
145 	assert(svdev->vdev.task_cnt <= INT_MAX - (int) res_count);
146 	svdev->vdev.task_cnt += res_count;
147 }
148 
/*
 * Drain svdev->eventq_ring and copy each pending virtio_scsi_event into a
 * guest-provided buffer on the eventq virtqueue. Events that cannot be
 * delivered (no avail entries) are dropped; the ring entry is freed either way.
 */
static void
process_eventq(struct spdk_vhost_scsi_dev *svdev)
{
	struct rte_vhost_vring *vq;
	struct vring_desc *desc;
	struct virtio_scsi_event *ev, *desc_ev;
	uint32_t req_size;
	uint16_t req;

	vq = &svdev->vdev.virtqueue[VIRTIO_SCSI_EVENTQ];

	while (spdk_ring_dequeue(svdev->eventq_ring, (void **)&ev, 1) == 1) {
		if (spdk_vhost_vq_avail_ring_get(vq, &req, 1) != 1) {
			/* Guest supplied no buffer - drop this event; remaining ring
			 * entries are retried on the next mgmt poll. */
			SPDK_ERRLOG("Controller %s: Failed to send virtio event (no avail ring entries?).\n",
				    svdev->vdev.name);
			spdk_dma_free(ev);
			break;
		}

		/* Translate the guest-physical buffer address to a host pointer. */
		desc =  spdk_vhost_vq_get_desc(vq, req);
		desc_ev = spdk_vhost_gpa_to_vva(&svdev->vdev, desc->addr);

		if (desc->len >= sizeof(*desc_ev) && desc_ev != NULL) {
			req_size = sizeof(*desc_ev);
			memcpy(desc_ev, ev, sizeof(*desc_ev));
		} else {
			/* Buffer too small or unmappable: complete with used-len 0. */
			SPDK_ERRLOG("Controller %s: Invalid eventq descriptor.\n", svdev->vdev.name);
			req_size = 0;
		}

		spdk_vhost_vq_used_ring_enqueue(&svdev->vdev, vq, req, req_size);
		spdk_dma_free(ev);
	}
}
183 
184 static void
185 process_removed_devs(struct spdk_vhost_scsi_dev *svdev)
186 {
187 	struct spdk_scsi_dev *dev;
188 	int i;
189 
190 	for (i = 0; i < SPDK_VHOST_SCSI_CTRLR_MAX_DEVS; ++i) {
191 		dev = svdev->scsi_dev[i];
192 
193 		if (dev && svdev->removed_dev[i] && !spdk_scsi_dev_has_pending_tasks(dev)) {
194 			spdk_scsi_dev_free_io_channels(dev);
195 			spdk_scsi_dev_destruct(dev);
196 			svdev->scsi_dev[i] = NULL;
197 			SPDK_NOTICELOG("%s: hotremoved device 'Dev %u'.\n", svdev->vdev.name, i);
198 		}
199 	}
200 }
201 
202 static void
203 eventq_enqueue(struct spdk_vhost_scsi_dev *svdev, const struct spdk_scsi_dev *dev,
204 	       const struct spdk_scsi_lun *lun, uint32_t event, uint32_t reason)
205 {
206 	struct virtio_scsi_event *ev;
207 	int dev_id, lun_id;
208 
209 	if (dev == NULL) {
210 		SPDK_ERRLOG("%s: eventq device cannot be NULL.\n", svdev->vdev.name);
211 		return;
212 	}
213 
214 	for (dev_id = 0; dev_id < SPDK_VHOST_SCSI_CTRLR_MAX_DEVS; dev_id++) {
215 		if (svdev->scsi_dev[dev_id] == dev) {
216 			break;
217 		}
218 	}
219 
220 	if (dev_id == SPDK_VHOST_SCSI_CTRLR_MAX_DEVS) {
221 		SPDK_ERRLOG("Dev %s is not a part of vhost scsi controller '%s'.\n", spdk_scsi_dev_get_name(dev),
222 			    svdev->vdev.name);
223 		return;
224 	}
225 
226 	/* some events may apply to the entire device via lun id set to 0 */
227 	lun_id = lun == NULL ? 0 : spdk_scsi_lun_get_id(lun);
228 
229 	ev = spdk_dma_zmalloc(sizeof(*ev), SPDK_CACHE_LINE_SIZE, NULL);
230 	assert(ev);
231 
232 	ev->event = event;
233 	ev->lun[0] = 1;
234 	ev->lun[1] = dev_id;
235 	ev->lun[2] = lun_id >> 8; /* relies on linux kernel implementation */
236 	ev->lun[3] = lun_id & 0xFF;
237 	ev->reason = reason;
238 
239 	if (spdk_ring_enqueue(svdev->eventq_ring, (void **)&ev, 1) != 1) {
240 		SPDK_ERRLOG("Controller %s: Failed to inform guest about LUN #%d removal (no room in ring?).\n",
241 			    svdev->vdev.name, lun_id);
242 		spdk_dma_free(ev);
243 	}
244 }
245 
246 static void
247 submit_completion(struct spdk_vhost_scsi_task *task)
248 {
249 	spdk_vhost_vq_used_ring_enqueue(&task->svdev->vdev, task->vq, task->req_idx,
250 					task->scsi.data_transferred);
251 	SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI, "Finished task (%p) req_idx=%d\n", task, task->req_idx);
252 
253 	spdk_vhost_scsi_task_put(task);
254 }
255 
256 static void
257 spdk_vhost_scsi_task_mgmt_cpl(struct spdk_scsi_task *scsi_task)
258 {
259 	struct spdk_vhost_scsi_task *task = SPDK_CONTAINEROF(scsi_task, struct spdk_vhost_scsi_task, scsi);
260 
261 	submit_completion(task);
262 }
263 
264 static void
265 spdk_vhost_scsi_task_cpl(struct spdk_scsi_task *scsi_task)
266 {
267 	struct spdk_vhost_scsi_task *task = SPDK_CONTAINEROF(scsi_task, struct spdk_vhost_scsi_task, scsi);
268 
269 	/* The SCSI task has completed.  Do final processing and then post
270 	   notification to the virtqueue's "used" ring.
271 	 */
272 	task->resp->status = task->scsi.status;
273 
274 	if (task->scsi.status != SPDK_SCSI_STATUS_GOOD) {
275 		memcpy(task->resp->sense, task->scsi.sense_data, task->scsi.sense_data_len);
276 		task->resp->sense_len = task->scsi.sense_data_len;
277 	}
278 	task->resp->resid = task->scsi.transfer_len - task->scsi.data_transferred;
279 
280 	submit_completion(task);
281 }
282 
283 static void
284 task_submit(struct spdk_vhost_scsi_task *task)
285 {
286 	/* The task is ready to be submitted.  First create the callback event that
287 	   will be invoked when the SCSI command is completed.  See spdk_vhost_scsi_task_cpl()
288 	   for what SPDK vhost-scsi does when the task is completed.
289 	 */
290 
291 	task->resp->response = VIRTIO_SCSI_S_OK;
292 	spdk_scsi_dev_queue_task(task->scsi_dev, &task->scsi);
293 }
294 
295 static void
296 mgmt_task_submit(struct spdk_vhost_scsi_task *task, enum spdk_scsi_task_func func)
297 {
298 	task->tmf_resp->response = VIRTIO_SCSI_S_OK;
299 	spdk_scsi_dev_queue_mgmt_task(task->scsi_dev, &task->scsi, func);
300 }
301 
302 static void
303 invalid_request(struct spdk_vhost_scsi_task *task)
304 {
305 	/* Flush eventq so that guest is instantly notified about any hotremoved luns.
306 	 * This might prevent him from sending more invalid requests and trying to reset
307 	 * the device.
308 	 */
309 	process_eventq(task->svdev);
310 	spdk_vhost_vq_used_ring_enqueue(&task->svdev->vdev, task->vq, task->req_idx, 0);
311 	spdk_vhost_scsi_task_put(task);
312 
313 	SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI, "Invalid request (status=%" PRIu8")\n",
314 		      task->resp ? task->resp->response : -1);
315 }
316 
317 static int
318 spdk_vhost_scsi_task_init_target(struct spdk_vhost_scsi_task *task, const __u8 *lun)
319 {
320 	struct spdk_scsi_dev *dev;
321 	uint16_t lun_id = (((uint16_t)lun[2] << 8) | lun[3]) & 0x3FFF;
322 
323 	SPDK_TRACEDUMP(SPDK_TRACE_VHOST_SCSI_QUEUE, "LUN", lun, 8);
324 
325 	/* First byte must be 1 and second is target */
326 	if (lun[0] != 1 || lun[1] >= SPDK_VHOST_SCSI_CTRLR_MAX_DEVS)
327 		return -1;
328 
329 	dev = task->svdev->scsi_dev[lun[1]];
330 	task->scsi_dev = dev;
331 	if (dev == NULL) {
332 		/* If dev has been hotremoved, return 0 to allow sending additional
333 		 * hotremove event via sense codes.
334 		 */
335 		return task->svdev->removed_dev[lun[1]] ? 0 : -1;
336 	}
337 
338 	task->scsi.target_port = spdk_scsi_dev_find_port_by_id(task->scsi_dev, 0);
339 	task->scsi.lun = spdk_scsi_dev_get_lun(dev, lun_id);
340 	return 0;
341 }
342 
/*
 * Handle one controlq request: task management (TMF) or asynchronous
 * notification (AN) control commands. LUN reset is submitted asynchronously;
 * all other cases complete immediately at the bottom of this function.
 */
static void
process_ctrl_request(struct spdk_vhost_scsi_task *task)
{
	struct vring_desc *desc;
	struct virtio_scsi_ctrl_tmf_req *ctrl_req;
	struct virtio_scsi_ctrl_an_resp *an_resp;

	spdk_scsi_task_construct(&task->scsi, spdk_vhost_scsi_task_mgmt_cpl, spdk_vhost_scsi_task_free_cb,
				 NULL);
	/* First descriptor of the chain holds the control request itself. */
	desc = spdk_vhost_vq_get_desc(task->vq, task->req_idx);
	ctrl_req = spdk_vhost_gpa_to_vva(&task->svdev->vdev, desc->addr);

	SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI_QUEUE,
		      "Processing controlq descriptor: desc %d/%p, desc_addr %p, len %d, flags %d, last_used_idx %d; kickfd %d; size %d\n",
		      task->req_idx, desc, (void *)desc->addr, desc->len, desc->flags, task->vq->last_used_idx,
		      task->vq->kickfd, task->vq->size);
	SPDK_TRACEDUMP(SPDK_TRACE_VHOST_SCSI_QUEUE, "Request desriptor", (uint8_t *)ctrl_req,
		       desc->len);

	/* Resolves task->scsi_dev; a failed lookup leaves it NULL and is
	 * reported below as VIRTIO_SCSI_S_BAD_TARGET for TMF requests. */
	spdk_vhost_scsi_task_init_target(task, ctrl_req->lun);

	/* Process the TMF request */
	switch (ctrl_req->type) {
	case VIRTIO_SCSI_T_TMF:
		/* Get the response buffer */
		assert(spdk_vhost_vring_desc_has_next(desc));
		desc = spdk_vhost_vring_desc_get_next(task->vq->desc, desc);
		task->tmf_resp = spdk_vhost_gpa_to_vva(&task->svdev->vdev, desc->addr);

		/* Check if we are processing a valid request */
		if (task->scsi_dev == NULL) {
			task->tmf_resp->response = VIRTIO_SCSI_S_BAD_TARGET;
			break;
		}

		switch (ctrl_req->subtype) {
		case VIRTIO_SCSI_T_TMF_LOGICAL_UNIT_RESET:
			/* Handle LUN reset */
			SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI_QUEUE, "LUN reset\n");

			/* Asynchronous path: completion posts the response later. */
			mgmt_task_submit(task, SPDK_SCSI_TASK_FUNC_LUN_RESET);
			return;
		default:
			task->tmf_resp->response = VIRTIO_SCSI_S_ABORTED;
			/* Unsupported command */
			SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI_QUEUE, "Unsupported TMF command %x\n", ctrl_req->subtype);
			break;
		}
		break;
	case VIRTIO_SCSI_T_AN_QUERY:
	case VIRTIO_SCSI_T_AN_SUBSCRIBE: {
		/* Async-notification requests are not supported; reject them. */
		desc = spdk_vhost_vring_desc_get_next(task->vq->desc, desc);
		an_resp = spdk_vhost_gpa_to_vva(&task->svdev->vdev, desc->addr);
		an_resp->response = VIRTIO_SCSI_S_ABORTED;
		break;
	}
	default:
		SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI_QUEUE, "Unsupported control command %x\n", ctrl_req->type);
		break;
	}

	/* Synchronous completion for everything except LUN reset above. */
	spdk_vhost_vq_used_ring_enqueue(&task->svdev->vdev, task->vq, task->req_idx, 0);
	spdk_vhost_scsi_task_put(task);
}
407 
408 /*
409  * Process task's descriptor chain and setup data related fields.
410  * Return
411  *   -1 if request is invalid and must be aborted,
412  *    0 if all data are set,
413  *    1 if it was not possible to allocate IO vector for this task.
414  */
static int
task_data_setup(struct spdk_vhost_scsi_task *task,
		struct virtio_scsi_cmd_req **req)
{
	struct rte_vhost_vring *vq = task->vq;
	struct spdk_vhost_dev *vdev = &task->svdev->vdev;
	struct vring_desc *desc =  spdk_vhost_vq_get_desc(task->vq, task->req_idx);
	struct iovec *iovs = task->iovs;
	uint16_t iovcnt = 0, iovcnt_max = SPDK_VHOST_SCSI_IOVS_LEN;
	uint32_t len = 0;

	/* Sanity check. First descriptor must be readable and must have next one. */
	if (spdk_unlikely(spdk_vhost_vring_desc_is_wr(desc) || !spdk_vhost_vring_desc_has_next(desc))) {
		SPDK_WARNLOG("Invalid first (request) descriptor.\n");
		task->resp = NULL;
		goto abort_task;
	}

	spdk_scsi_task_construct(&task->scsi, spdk_vhost_scsi_task_cpl, spdk_vhost_scsi_task_free_cb, NULL);
	*req = spdk_vhost_gpa_to_vva(vdev, desc->addr);

	/* Transfer direction is encoded in the write flag of the second
	 * descriptor: device-writable means the guest expects data (READ). */
	desc = spdk_vhost_vring_desc_get_next(vq->desc, desc);
	task->scsi.dxfer_dir = spdk_vhost_vring_desc_is_wr(desc) ? SPDK_SCSI_DIR_FROM_DEV :
			       SPDK_SCSI_DIR_TO_DEV;
	task->scsi.iovs = iovs;

	if (task->scsi.dxfer_dir == SPDK_SCSI_DIR_FROM_DEV) {
		/*
		 * FROM_DEV (READ): [RD_req][WR_resp][WR_buf0]...[WR_bufN]
		 */
		task->resp = spdk_vhost_gpa_to_vva(vdev, desc->addr);
		if (!spdk_vhost_vring_desc_has_next(desc)) {
			/*
			 * TEST UNIT READY command and some others might not contain any payload and this is not an error.
			 */
			SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI_DATA,
				      "No payload descriptors for FROM DEV command req_idx=%"PRIu16".\n", task->req_idx);
			SPDK_TRACEDUMP(SPDK_TRACE_VHOST_SCSI_DATA, "CDB=", (*req)->cdb, VIRTIO_SCSI_CDB_SIZE);
			task->scsi.iovcnt = 1;
			task->scsi.iovs[0].iov_len = 0;
			task->scsi.length = 0;
			task->scsi.transfer_len = 0;
			return 0;
		}

		desc = spdk_vhost_vring_desc_get_next(vq->desc, desc);

		/* All remaining descriptors are data. */
		while (iovcnt < iovcnt_max) {
			spdk_vhost_vring_desc_to_iov(vdev, &iovs[iovcnt], desc);
			len += desc->len;
			iovcnt++;

			if (!spdk_vhost_vring_desc_has_next(desc))
				break;

			desc = spdk_vhost_vring_desc_get_next(vq->desc, desc);
			if (spdk_unlikely(!spdk_vhost_vring_desc_is_wr(desc))) {
				/* A read-only descriptor inside the payload of a READ
				 * means the chain is malformed. */
				SPDK_WARNLOG("FROM DEV cmd: descriptor nr %" PRIu16" in payload chain is read only.\n", iovcnt);
				task->resp = NULL;
				goto abort_task;
			}
		}
	} else {
		SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI_DATA, "TO DEV");
		/*
		 * TO_DEV (WRITE):[RD_req][RD_buf0]...[RD_bufN][WR_resp]
		 * No need to check descriptor WR flag as this is done while setting scsi.dxfer_dir.
		 */

		/* Process descriptors up to response. */
		while (!spdk_vhost_vring_desc_is_wr(desc) && iovcnt < iovcnt_max) {
			spdk_vhost_vring_desc_to_iov(vdev, &iovs[iovcnt], desc);
			len += desc->len;
			iovcnt++;

			if (!spdk_vhost_vring_desc_has_next(desc)) {
				SPDK_WARNLOG("TO_DEV cmd: no response descriptor.\n");
				task->resp = NULL;
				goto abort_task;
			}

			desc = spdk_vhost_vring_desc_get_next(vq->desc, desc);
		}

		/* The first device-writable descriptor is the response buffer. */
		task->resp = spdk_vhost_gpa_to_vva(vdev, desc->addr);
		if (spdk_vhost_vring_desc_has_next(desc)) {
			SPDK_WARNLOG("TO_DEV cmd: ignoring unexpected descriptors after response descriptor.\n");
		}
	}

	/* Hitting the iov limit means the chain was truncated - abort. */
	if (iovcnt == iovcnt_max) {
		SPDK_WARNLOG("Too many IO vectors in chain!\n");
		goto abort_task;
	}

	task->scsi.iovcnt = iovcnt;
	task->scsi.length = len;
	task->scsi.transfer_len = len;
	return 0;

abort_task:
	/* task->resp may be NULL here if the chain broke before the response
	 * descriptor was located; invalid_request() handles that. */
	if (task->resp) {
		task->resp->response = VIRTIO_SCSI_S_ABORTED;
	}

	return -1;
}
523 
524 static int
525 process_request(struct spdk_vhost_scsi_task *task)
526 {
527 	struct virtio_scsi_cmd_req *req;
528 	int result;
529 
530 	result = task_data_setup(task, &req);
531 	if (result) {
532 		return result;
533 	}
534 
535 	result = spdk_vhost_scsi_task_init_target(task, req->lun);
536 	if (spdk_unlikely(result != 0)) {
537 		task->resp->response = VIRTIO_SCSI_S_BAD_TARGET;
538 		return -1;
539 	}
540 
541 	task->scsi.cdb = req->cdb;
542 	SPDK_TRACEDUMP(SPDK_TRACE_VHOST_SCSI_DATA, "request CDB", req->cdb, VIRTIO_SCSI_CDB_SIZE);
543 
544 	if (spdk_unlikely(task->scsi.lun == NULL)) {
545 		spdk_scsi_task_process_null_lun(&task->scsi);
546 		task->resp->response = VIRTIO_SCSI_S_OK;
547 		return 1;
548 	}
549 
550 	return 0;
551 }
552 
553 static void
554 process_controlq(struct spdk_vhost_scsi_dev *svdev, struct rte_vhost_vring *vq)
555 {
556 	struct spdk_vhost_scsi_task *tasks[32];
557 	struct spdk_vhost_scsi_task *task;
558 	uint16_t reqs[32];
559 	uint16_t reqs_cnt, i;
560 
561 	reqs_cnt = spdk_vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
562 	spdk_vhost_get_tasks(svdev, tasks, reqs_cnt);
563 	for (i = 0; i < reqs_cnt; i++) {
564 		task = tasks[i];
565 		memset(task, 0, sizeof(*task));
566 		task->vq = vq;
567 		task->svdev = svdev;
568 		task->req_idx = reqs[i];
569 
570 		process_ctrl_request(task);
571 	}
572 }
573 
574 static void
575 process_requestq(struct spdk_vhost_scsi_dev *svdev, struct rte_vhost_vring *vq)
576 {
577 	struct spdk_vhost_scsi_task *tasks[32];
578 	struct spdk_vhost_scsi_task *task;
579 	uint16_t reqs[32];
580 	uint16_t reqs_cnt, i;
581 	int result;
582 
583 	reqs_cnt = spdk_vhost_vq_avail_ring_get(vq, reqs, SPDK_COUNTOF(reqs));
584 	assert(reqs_cnt <= 32);
585 
586 	spdk_vhost_get_tasks(svdev, tasks, reqs_cnt);
587 
588 	for (i = 0; i < reqs_cnt; i++) {
589 		SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI, "====== Starting processing request idx %"PRIu16"======\n",
590 			      reqs[i]);
591 
592 		task = tasks[i];
593 		memset(task, 0, sizeof(*task));
594 		task->vq = vq;
595 		task->svdev = svdev;
596 		task->req_idx = reqs[i];
597 		result = process_request(task);
598 		if (likely(result == 0)) {
599 			task_submit(task);
600 			SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI, "====== Task %p req_idx %d submitted ======\n", task,
601 				      task->req_idx);
602 		} else if (result > 0) {
603 			spdk_vhost_scsi_task_cpl(&task->scsi);
604 			SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI, "====== Task %p req_idx %d finished early ======\n", task,
605 				      task->req_idx);
606 		} else {
607 			invalid_request(task);
608 			SPDK_TRACELOG(SPDK_TRACE_VHOST_SCSI, "====== Task %p req_idx %d failed ======\n", task,
609 				      task->req_idx);
610 		}
611 	}
612 }
613 
614 static void
615 vdev_mgmt_worker(void *arg)
616 {
617 	struct spdk_vhost_scsi_dev *svdev = arg;
618 
619 	process_removed_devs(svdev);
620 	process_eventq(svdev);
621 	process_controlq(svdev, &svdev->vdev.virtqueue[VIRTIO_SCSI_CONTROLQ]);
622 }
623 
624 static void
625 vdev_worker(void *arg)
626 {
627 	struct spdk_vhost_scsi_dev *svdev = arg;
628 	uint32_t q_idx;
629 
630 	for (q_idx = VIRTIO_SCSI_REQUESTQ; q_idx < svdev->vdev.num_queues; q_idx++) {
631 		process_requestq(svdev, &svdev->vdev.virtqueue[q_idx]);
632 	}
633 }
634 
635 static void
636 add_vdev_cb(void *arg)
637 {
638 	struct spdk_vhost_scsi_dev *svdev = arg;
639 	struct spdk_vhost_dev *vdev = &svdev->vdev;
640 	uint32_t i;
641 
642 	for (i = 0; i < SPDK_VHOST_SCSI_CTRLR_MAX_DEVS; i++) {
643 		if (svdev->scsi_dev[i] == NULL) {
644 			continue;
645 		}
646 		spdk_scsi_dev_allocate_io_channels(svdev->scsi_dev[i]);
647 	}
648 	SPDK_NOTICELOG("Started poller for vhost controller %s on lcore %d\n", vdev->name, vdev->lcore);
649 
650 	spdk_vhost_dev_mem_register(vdev);
651 
652 	spdk_poller_register(&svdev->requestq_poller, vdev_worker, svdev, vdev->lcore, 0);
653 	spdk_poller_register(&svdev->mgmt_poller, vdev_mgmt_worker, svdev, vdev->lcore,
654 			     MGMT_POLL_PERIOD_US);
655 }
656 
657 static void
658 remove_vdev_cb(void *arg)
659 {
660 	struct spdk_vhost_scsi_dev *svdev = arg;
661 	void *ev;
662 	uint32_t i;
663 
664 	for (i = 0; i < SPDK_VHOST_SCSI_CTRLR_MAX_DEVS; i++) {
665 		if (svdev->scsi_dev[i] == NULL) {
666 			continue;
667 		}
668 		spdk_scsi_dev_free_io_channels(svdev->scsi_dev[i]);
669 	}
670 
671 	SPDK_NOTICELOG("Stopping poller for vhost controller %s\n", svdev->vdev.name);
672 	spdk_vhost_dev_mem_unregister(&svdev->vdev);
673 
674 	/* Cleanup not sent events */
675 	while (spdk_ring_dequeue(svdev->eventq_ring, &ev, 1) == 1) {
676 		spdk_dma_free(ev);
677 	}
678 }
679 
680 static struct spdk_vhost_scsi_dev *
681 to_scsi_dev(struct spdk_vhost_dev *ctrlr)
682 {
683 	if (ctrlr == NULL) {
684 		return NULL;
685 	}
686 
687 	if (ctrlr->type != SPDK_VHOST_DEV_T_SCSI) {
688 		SPDK_ERRLOG("Controller %s: expected SCSI controller (%d) but got %d\n",
689 			    ctrlr->name, SPDK_VHOST_DEV_T_SCSI, ctrlr->type);
690 		return NULL;
691 	}
692 
693 	return (struct spdk_vhost_scsi_dev *)ctrlr;
694 }
695 
696 int
697 spdk_vhost_scsi_dev_construct(const char *name, uint64_t cpumask)
698 {
699 	struct spdk_vhost_scsi_dev *svdev = spdk_dma_zmalloc(sizeof(struct spdk_vhost_scsi_dev),
700 					    SPDK_CACHE_LINE_SIZE, NULL);
701 	int rc;
702 
703 	if (svdev == NULL) {
704 		return -ENOMEM;
705 	}
706 
707 	svdev->eventq_ring = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 16, SOCKET_ID_ANY);
708 	if (svdev->eventq_ring == NULL) {
709 		spdk_dma_free(svdev);
710 		return -ENOMEM;
711 	}
712 
713 	rc = spdk_vhost_dev_construct(&svdev->vdev, name, cpumask, SPDK_VHOST_DEV_T_SCSI,
714 				      &spdk_vhost_scsi_device_backend);
715 
716 	if (rc) {
717 		spdk_ring_free(svdev->eventq_ring);
718 		spdk_dma_free(svdev);
719 		return rc;
720 	}
721 
722 	return 0;
723 }
724 
725 int
726 spdk_vhost_scsi_dev_remove(struct spdk_vhost_dev *vdev)
727 {
728 	struct spdk_vhost_scsi_dev *svdev = to_scsi_dev(vdev);
729 	int i;
730 
731 	if (svdev == NULL) {
732 		return -EINVAL;
733 	}
734 
735 	for (i = 0; i < SPDK_VHOST_SCSI_CTRLR_MAX_DEVS; ++i) {
736 		if (svdev->scsi_dev[i]) {
737 			SPDK_ERRLOG("Trying to remove non-empty controller: %s.\n", vdev->name);
738 			return -EBUSY;
739 		}
740 	}
741 
742 	if (spdk_vhost_dev_remove(vdev) != 0) {
743 		return -EIO;
744 	}
745 
746 	spdk_ring_free(svdev->eventq_ring);
747 	spdk_dma_free(svdev);
748 	return 0;
749 }
750 
751 struct spdk_scsi_dev *
752 spdk_vhost_scsi_dev_get_dev(struct spdk_vhost_dev *vdev, uint8_t num)
753 {
754 	struct spdk_vhost_scsi_dev *svdev;
755 
756 	assert(num < SPDK_VHOST_SCSI_CTRLR_MAX_DEVS);
757 	svdev = to_scsi_dev(vdev);
758 
759 	return svdev ? svdev->scsi_dev[num] : NULL;
760 }
761 
762 static void
763 spdk_vhost_scsi_lun_hotremove(const struct spdk_scsi_lun *lun, void *arg)
764 {
765 	struct spdk_vhost_scsi_dev *svdev = arg;
766 
767 	assert(lun != NULL);
768 	assert(svdev != NULL);
769 	if ((svdev->vdev.negotiated_features & (1ULL << VIRTIO_SCSI_F_HOTPLUG)) == 0) {
770 		SPDK_WARNLOG("Controller %s: hotremove is not supported\n", svdev->vdev.name);
771 		return;
772 	}
773 
774 	eventq_enqueue(svdev, spdk_scsi_lun_get_dev(lun), lun, VIRTIO_SCSI_T_TRANSPORT_RESET,
775 		       VIRTIO_SCSI_EVT_RESET_REMOVED);
776 }
777 
778 int
779 spdk_vhost_scsi_dev_add_dev(const char *ctrlr_name, unsigned scsi_dev_num, const char *lun_name)
780 {
781 	struct spdk_vhost_scsi_dev *svdev;
782 	struct spdk_vhost_dev *vdev;
783 	char dev_name[SPDK_SCSI_DEV_MAX_NAME];
784 	int lun_id_list[1];
785 	char *lun_names_list[1];
786 
787 	if (ctrlr_name == NULL) {
788 		SPDK_ERRLOG("No controller name\n");
789 		return -EINVAL;
790 	}
791 
792 	if (scsi_dev_num >= SPDK_VHOST_SCSI_CTRLR_MAX_DEVS) {
793 		SPDK_ERRLOG("Controller %d device number too big (max %d)\n", scsi_dev_num,
794 			    SPDK_VHOST_SCSI_CTRLR_MAX_DEVS);
795 		return -EINVAL;
796 	}
797 
798 	if (lun_name == NULL) {
799 		SPDK_ERRLOG("No lun name specified \n");
800 		return -EINVAL;
801 	} else if (strlen(lun_name) >= SPDK_SCSI_DEV_MAX_NAME) {
802 		SPDK_ERRLOG("LUN name '%s' too long (max %d).\n", lun_name, SPDK_SCSI_DEV_MAX_NAME - 1);
803 		return -1;
804 	}
805 
806 	vdev = spdk_vhost_dev_find(ctrlr_name);
807 	if (vdev == NULL) {
808 		SPDK_ERRLOG("Controller %s is not defined.\n", ctrlr_name);
809 		return -ENODEV;
810 	}
811 
812 	svdev = to_scsi_dev(vdev);
813 	if (svdev == NULL) {
814 		return -EINVAL;
815 	}
816 
817 	if (vdev->lcore != -1) {
818 		SPDK_ERRLOG("Controller %s is in use and hotplug is not supported\n", ctrlr_name);
819 		return -ENODEV;
820 	}
821 
822 	if (svdev->scsi_dev[scsi_dev_num] != NULL) {
823 		SPDK_ERRLOG("Controller %s dev %u already occupied\n", ctrlr_name, scsi_dev_num);
824 		return -EEXIST;
825 	}
826 
827 	/*
828 	 * At this stage only one LUN per device
829 	 */
830 	snprintf(dev_name, sizeof(dev_name), "Dev %u", scsi_dev_num);
831 	lun_id_list[0] = 0;
832 	lun_names_list[0] = (char *)lun_name;
833 
834 	svdev->removed_dev[scsi_dev_num] = false;
835 	svdev->scsi_dev[scsi_dev_num] = spdk_scsi_dev_construct(dev_name, lun_names_list, lun_id_list, 1,
836 					SPDK_SPC_PROTOCOL_IDENTIFIER_SAS, spdk_vhost_scsi_lun_hotremove, svdev);
837 
838 	if (svdev->scsi_dev[scsi_dev_num] == NULL) {
839 		SPDK_ERRLOG("Couldn't create spdk SCSI device '%s' using lun device '%s' in controller: %s\n",
840 			    dev_name, lun_name, vdev->name);
841 		return -EINVAL;
842 	}
843 	spdk_scsi_dev_add_port(svdev->scsi_dev[scsi_dev_num], 0, "vhost");
844 	SPDK_NOTICELOG("Controller %s: defined device '%s' using lun '%s'\n",
845 		       vdev->name, dev_name, lun_name);
846 	return 0;
847 }
848 
849 int
850 spdk_vhost_scsi_dev_remove_dev(struct spdk_vhost_dev *vdev, unsigned scsi_dev_num)
851 {
852 	struct spdk_vhost_scsi_dev *svdev;
853 	struct spdk_scsi_dev *scsi_dev;
854 
855 	if (scsi_dev_num >= SPDK_VHOST_SCSI_CTRLR_MAX_DEVS) {
856 		SPDK_ERRLOG("%s: invalid device number %d\n", vdev->name, scsi_dev_num);
857 		return -EINVAL;
858 	}
859 
860 	svdev = to_scsi_dev(vdev);
861 	if (svdev == NULL) {
862 		return -ENODEV;
863 	}
864 
865 	scsi_dev = svdev->scsi_dev[scsi_dev_num];
866 	if (scsi_dev == NULL) {
867 		SPDK_ERRLOG("Controller %s dev %u is not occupied\n", vdev->name, scsi_dev_num);
868 		return -ENODEV;
869 	}
870 
871 	if (svdev->vdev.lcore == -1) {
872 		/* controller is not in use, remove dev and exit */
873 		spdk_scsi_dev_destruct(scsi_dev);
874 		svdev->scsi_dev[scsi_dev_num] = NULL;
875 		SPDK_NOTICELOG("%s: removed device 'Dev %u'\n", vdev->name, scsi_dev_num);
876 		return 0;
877 	}
878 
879 	if ((svdev->vdev.negotiated_features & (1ULL << VIRTIO_SCSI_F_HOTPLUG)) == 0) {
880 		SPDK_WARNLOG("Controller %s: hotremove is not supported\n", svdev->vdev.name);
881 		return -ENOTSUP;
882 	}
883 
884 	svdev->removed_dev[scsi_dev_num] = true;
885 	eventq_enqueue(svdev, scsi_dev, NULL, VIRTIO_SCSI_T_TRANSPORT_RESET, VIRTIO_SCSI_EVT_RESET_REMOVED);
886 
887 	SPDK_NOTICELOG("%s: 'Dev %u' marked for hotremove.\n", vdev->name, scsi_dev_num);
888 	return 0;
889 }
890 
/*
 * Build vhost-scsi controllers from the application config file.
 * Each [VhostScsiN] section defines one controller; its "Dev" entries attach
 * one single-LUN target each. Returns 0 on success, -1 on the first error.
 */
int
spdk_vhost_scsi_controller_construct(void)
{
	struct spdk_conf_section *sp = spdk_conf_first_section(NULL);
	int i, dev_num;
	unsigned ctrlr_num = 0;
	char *lun_name, *dev_num_str;
	char *cpumask_str;
	char *name;
	uint64_t cpumask;

	while (sp != NULL) {
		if (!spdk_conf_section_match_prefix(sp, "VhostScsi")) {
			sp = spdk_conf_next_section(sp);
			continue;
		}

		/* Section name must be "VhostScsi<number>". */
		if (sscanf(spdk_conf_section_get_name(sp), "VhostScsi%u", &ctrlr_num) != 1) {
			SPDK_ERRLOG("Section '%s' has non-numeric suffix.\n",
				    spdk_conf_section_get_name(sp));
			return -1;
		}

		name =  spdk_conf_section_get_val(sp, "Name");
		cpumask_str = spdk_conf_section_get_val(sp, "Cpumask");
		if (cpumask_str == NULL) {
			/* No explicit mask - run on all of the app's cores. */
			cpumask = spdk_app_get_core_mask();
		} else if (spdk_vhost_parse_core_mask(cpumask_str, &cpumask)) {
			SPDK_ERRLOG("%s: Error parsing cpumask '%s' while creating controller\n", name, cpumask_str);
			return -1;
		}

		if (spdk_vhost_scsi_dev_construct(name, cpumask) < 0) {
			return -1;
		}

		/* Each "Dev <num> <lun>" line attaches one target to the controller. */
		for (i = 0; spdk_conf_section_get_nval(sp, "Dev", i) != NULL; i++) {
			dev_num_str = spdk_conf_section_get_nmval(sp, "Dev", i, 0);
			if (dev_num_str == NULL) {
				SPDK_ERRLOG("%s: Invalid or missing Dev number\n", name);
				return -1;
			}

			dev_num = (int)strtol(dev_num_str, NULL, 10);
			lun_name = spdk_conf_section_get_nmval(sp, "Dev", i, 1);
			if (lun_name == NULL) {
				SPDK_ERRLOG("%s: Invalid or missing LUN name for dev %d\n", name, dev_num);
				return -1;
			} else if (spdk_conf_section_get_nmval(sp, "Dev", i, 2)) {
				SPDK_ERRLOG("%s: Only one LUN per vhost SCSI device supported\n", name);
				return -1;
			}

			if (spdk_vhost_scsi_dev_add_dev(name, dev_num, lun_name) < 0) {
				return -1;
			}
		}

		sp = spdk_conf_next_section(sp);

	}

	return 0;
}
955 
956 static void
957 free_task_pool(struct spdk_vhost_scsi_dev *svdev)
958 {
959 	struct spdk_vhost_task *task;
960 
961 	if (!svdev->task_pool) {
962 		return;
963 	}
964 
965 	while (spdk_ring_dequeue(svdev->task_pool, (void **)&task, 1) == 1) {
966 		spdk_dma_free(task);
967 	}
968 
969 	spdk_ring_free(svdev->task_pool);
970 	svdev->task_pool = NULL;
971 }
972 
/*
 * Preallocate the per-controller task pool, sized from the negotiated
 * virtqueue sizes. Returns 0 on success, -1 on allocation failure (with any
 * partial allocations released).
 */
static int
alloc_task_pool(struct spdk_vhost_scsi_dev *svdev)
{
	struct spdk_vhost_scsi_task *task;
	uint32_t task_cnt = 0;
	uint32_t ring_size;
	uint16_t i;
	int rc;

	for (i = 0; i < svdev->vdev.num_queues; i++) {
		/*
		 * FIXME:
		 * this is too big because we need only size/2 from each queue but for now
		 * lets leave it as is to be sure we are not mistaken.
		 *
		 * Limit the pool size to 1024 * num_queues. This should be enough as QEMU have the
		 * same hard limit for queue size.
		 */
		task_cnt += spdk_min(svdev->vdev.virtqueue[i].size, 1024);
	}

	/* spdk_ring requires a power-of-2 size; +1 because a ring of size N
	 * holds at most N-1 elements. */
	ring_size = spdk_align32pow2(task_cnt + 1);
	svdev->task_pool = spdk_ring_create(SPDK_RING_TYPE_SP_SC, ring_size,
					    spdk_env_get_socket_id(svdev->vdev.lcore));
	if (svdev->task_pool == NULL) {
		SPDK_ERRLOG("Controller %s: Failed to init vhost scsi task pool\n", svdev->vdev.name);
		return -1;
	}

	for (i = 0; i < task_cnt; ++i) {
		task = spdk_dma_zmalloc(sizeof(*task), SPDK_CACHE_LINE_SIZE, NULL);
		if (task == NULL) {
			SPDK_ERRLOG("Controller %s: Failed to allocate task\n", svdev->vdev.name);
			free_task_pool(svdev);
			return -1;
		}

		rc = spdk_ring_enqueue(svdev->task_pool, (void **)&task, 1);
		if (rc != 1) {
			SPDK_ERRLOG("Controller %s: Failed to alloc %"PRIu32" vhost scsi tasks\n", svdev->vdev.name,
				    task_cnt);
			free_task_pool(svdev);
			return -1;
		}
	}

	return 0;
}
1021 
1022 /*
1023  * A new device is added to a data core. First the device is added to the main linked list
1024  * and then allocated to a specific data core.
1025  */
1026 static int
1027 new_device(int vid)
1028 {
1029 	struct spdk_vhost_scsi_dev *svdev = NULL;
1030 
1031 	svdev = to_scsi_dev(spdk_vhost_dev_load(vid));
1032 	if (svdev == NULL) {
1033 		return -1;
1034 	}
1035 
1036 	if (alloc_task_pool(svdev)) {
1037 		spdk_vhost_dev_unload(&svdev->vdev);
1038 		return -1;
1039 	}
1040 
1041 	spdk_vhost_timed_event_send(svdev->vdev.lcore, add_vdev_cb, svdev, 1, "add scsi vdev");
1042 	return 0;
1043 }
1044 
/*
 * rte_vhost callback: the guest disconnected. Stop both pollers, wait for
 * outstanding tasks to drain, then release controller resources on its lcore.
 */
static void
destroy_device(int vid)
{
	struct spdk_vhost_scsi_dev *svdev;
	struct spdk_vhost_dev *vdev;
	struct spdk_vhost_timed_event event = {0};
	uint32_t i;

	vdev = spdk_vhost_dev_find_by_vid(vid);
	if (vdev == NULL) {
		rte_panic("Couldn't find device with vid %d to stop.\n", vid);
	}
	svdev = to_scsi_dev(vdev);
	assert(svdev);

	/* Stop the pollers synchronously - each unregister completes on the
	 * controller's lcore before we proceed. */
	spdk_vhost_timed_event_init(&event, vdev->lcore, NULL, NULL, 1);
	spdk_poller_unregister(&svdev->requestq_poller, event.spdk_event);
	spdk_vhost_timed_event_wait(&event, "unregister request queue poller");

	spdk_vhost_timed_event_init(&event, vdev->lcore, NULL, NULL, 1);
	spdk_poller_unregister(&svdev->mgmt_poller, event.spdk_event);
	spdk_vhost_timed_event_wait(&event, "unregister management poller");

	/* Wait for all tasks to finish */
	for (i = 1000; i && vdev->task_cnt > 0; i--) {
		usleep(1000);
	}

	if (vdev->task_cnt > 0) {
		/* Proceed anyway - stuck tasks would otherwise block forever. */
		SPDK_ERRLOG("%s: pending tasks did not finish in 1s.\n", vdev->name);
	}

	spdk_vhost_timed_event_send(vdev->lcore, remove_vdev_cb, svdev, 1, "remove scsi vdev");

	free_task_pool(svdev);
	spdk_vhost_dev_unload(vdev);
}
1082 
/* Subsystem init hook; vhost-scsi needs no global setup. */
int
spdk_vhost_init(void)
{
	return 0;
}
1088 
/* Subsystem teardown hook; vhost-scsi needs no global cleanup. */
int
spdk_vhost_fini(void)
{
	return 0;
}
1094 
1095 SPDK_LOG_REGISTER_TRACE_FLAG("vhost_scsi", SPDK_TRACE_VHOST_SCSI)
1096 SPDK_LOG_REGISTER_TRACE_FLAG("vhost_scsi_queue", SPDK_TRACE_VHOST_SCSI_QUEUE)
1097 SPDK_LOG_REGISTER_TRACE_FLAG("vhost_scsi_data", SPDK_TRACE_VHOST_SCSI_DATA)
1098