1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <liburing.h>
7 
8 #include "spdk/stdinc.h"
9 #include "spdk/string.h"
10 #include "spdk/bdev.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/likely.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/queue.h"
17 #include "spdk/json.h"
18 #include "spdk/ublk.h"
19 #include "spdk/thread.h"
20 
21 #include "ublk_internal.h"
22 
23 #define UBLK_CTRL_DEV					"/dev/ublk-control"
24 #define UBLK_BLK_CDEV					"/dev/ublkc"
25 
26 #define LINUX_SECTOR_SHIFT				9
27 #define UBLK_IO_MAX_BYTES				SPDK_BDEV_LARGE_BUF_MAX_SIZE
28 #define UBLK_DEV_MAX_QUEUES				32
29 #define UBLK_DEV_MAX_QUEUE_DEPTH			1024
30 #define UBLK_QUEUE_REQUEST				32
31 #define UBLK_STOP_BUSY_WAITING_MS			10000
32 #define UBLK_BUSY_POLLING_INTERVAL_US			20000
33 #define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US	1000
34 /* By default, kernel ublk_drv driver can support up to 64 block devices */
35 #define UBLK_DEFAULT_MAX_SUPPORTED_DEVS			64
36 
37 #define UBLK_IOBUF_SMALL_CACHE_SIZE			128
38 #define UBLK_IOBUF_LARGE_CACHE_SIZE			32
39 
40 #define UBLK_DEBUGLOG(ublk, format, ...) \
41 	SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__)
42 
43 static uint32_t g_num_ublk_poll_groups = 0;
44 static uint32_t g_next_ublk_poll_group = 0;
45 static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS;
46 static struct spdk_cpuset g_core_mask;
47 
48 struct ublk_queue;
49 struct ublk_poll_group;
50 struct ublk_io;
51 static void _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io);
52 static void ublk_dev_queue_fini(struct ublk_queue *q);
53 static int ublk_poll(void *arg);
54 static int ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op);
55 
56 typedef void (*ublk_next_state_fn)(struct spdk_ublk_dev *ublk);
57 static void ublk_set_params(struct spdk_ublk_dev *ublk);
58 static void ublk_finish_start(struct spdk_ublk_dev *ublk);
59 static void ublk_free_dev(struct spdk_ublk_dev *ublk);
60 
61 static const char *ublk_op_name[64]
62 __attribute__((unused)) = {
63 	[UBLK_CMD_ADD_DEV] =	"UBLK_CMD_ADD_DEV",
64 	[UBLK_CMD_DEL_DEV] =	"UBLK_CMD_DEL_DEV",
65 	[UBLK_CMD_START_DEV] =	"UBLK_CMD_START_DEV",
66 	[UBLK_CMD_STOP_DEV] =	"UBLK_CMD_STOP_DEV",
67 	[UBLK_CMD_SET_PARAMS] =	"UBLK_CMD_SET_PARAMS",
68 };
69 
70 typedef void (*ublk_get_buf_cb)(struct ublk_io *io);
71 
72 struct ublk_io {
73 	void			*payload;
74 	void			*mpool_entry;
75 	bool			need_data;
76 	bool			user_copy;
77 	uint16_t		tag;
78 	uint64_t		payload_size;
79 	uint32_t		cmd_op;
80 	int32_t			result;
81 	struct spdk_bdev_desc	*bdev_desc;
82 	struct spdk_io_channel	*bdev_ch;
83 	const struct ublksrv_io_desc	*iod;
84 	ublk_get_buf_cb		get_buf_cb;
85 	struct ublk_queue	*q;
86 	/* for bdev io_wait */
87 	struct spdk_bdev_io_wait_entry bdev_io_wait;
88 	struct spdk_iobuf_entry	iobuf;
89 
90 	TAILQ_ENTRY(ublk_io)	tailq;
91 };
92 
93 struct ublk_queue {
94 	uint32_t		q_id;
95 	uint32_t		q_depth;
96 	struct ublk_io		*ios;
97 	TAILQ_HEAD(, ublk_io)	completed_io_list;
98 	TAILQ_HEAD(, ublk_io)	inflight_io_list;
99 	uint32_t		cmd_inflight;
100 	bool			is_stopping;
101 	struct ublksrv_io_desc	*io_cmd_buf;
102 	/* ring depth == dev_info->queue_depth. */
103 	struct io_uring		ring;
104 	struct spdk_ublk_dev	*dev;
105 	struct ublk_poll_group	*poll_group;
106 	struct spdk_io_channel	*bdev_ch;
107 
108 	TAILQ_ENTRY(ublk_queue)	tailq;
109 };
110 
111 struct spdk_ublk_dev {
112 	struct spdk_bdev	*bdev;
113 	struct spdk_bdev_desc	*bdev_desc;
114 
115 	int			cdev_fd;
116 	struct ublk_params	dev_params;
117 	struct ublksrv_ctrl_dev_info	dev_info;
118 
119 	uint32_t		ublk_id;
120 	uint32_t		num_queues;
121 	uint32_t		queue_depth;
122 	uint32_t		sector_per_block_shift;
123 	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];
124 
125 	struct spdk_poller	*retry_poller;
126 	int			retry_count;
127 	uint32_t		queues_closed;
128 	ublk_start_cb		start_cb;
129 	ublk_del_cb		del_cb;
130 	void			*cb_arg;
131 	ublk_next_state_fn	next_state_fn;
132 	uint32_t		ctrl_ops_in_progress;
133 	bool			is_closing;
134 
135 	TAILQ_ENTRY(spdk_ublk_dev) tailq;
136 	TAILQ_ENTRY(spdk_ublk_dev) wait_tailq;
137 };
138 
139 struct ublk_poll_group {
140 	struct spdk_thread		*ublk_thread;
141 	struct spdk_poller		*ublk_poller;
142 	struct spdk_iobuf_channel	iobuf_ch;
143 	TAILQ_HEAD(, ublk_queue)	queue_list;
144 };
145 
146 struct ublk_tgt {
147 	int			ctrl_fd;
148 	bool			active;
149 	bool			is_destroying;
150 	spdk_ublk_fini_cb	cb_fn;
151 	void			*cb_arg;
152 	struct io_uring		ctrl_ring;
153 	struct spdk_poller	*ctrl_poller;
154 	uint32_t		ctrl_ops_in_progress;
155 	struct ublk_poll_group	*poll_groups;
156 	uint32_t		num_ublk_devs;
157 	uint64_t		features;
158 	/* `ublk_drv` supports UBLK_F_CMD_IOCTL_ENCODE */
159 	bool			ioctl_encode;
160 	/* `ublk_drv` supports UBLK_F_USER_COPY */
161 	bool			user_copy;
162 };
163 
164 static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs);
165 static struct ublk_tgt g_ublk_tgt;
166 
167 /* helpers for using io_uring */
168 static inline int
169 ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags)
170 {
171 	struct io_uring_params p = {};
172 
173 	p.flags = flags | IORING_SETUP_CQSIZE;
174 	p.cq_entries = depth;
175 
176 	return io_uring_queue_init_params(depth, r, &p);
177 }
178 
179 static inline struct io_uring_sqe *
180 ublk_uring_get_sqe(struct io_uring *r, uint32_t idx)
181 {
182 	/* Need to update the idx since we set IORING_SETUP_SQE128 parameter in ublk_setup_ring */
183 	return &r->sq.sqes[idx << 1];
184 }
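
/*
 * Illustration: with IORING_SETUP_SQE128 every submission slot is 128 bytes,
 * i.e. two regular 64-byte struct io_uring_sqe entries laid out back to back,
 * so the sqe for command i starts at sqes[2 * i] -- hence the `idx << 1` above.
 */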
185 
186 static inline void *
187 ublk_get_sqe_cmd(struct io_uring_sqe *sqe)
188 {
189 	return (void *)&sqe->addr3;
190 }
191 
192 static inline void
193 ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op)
194 {
195 	uint32_t opc = cmd_op;
196 
197 	if (g_ublk_tgt.ioctl_encode) {
198 		switch (cmd_op) {
199 		/* ctrl uring */
200 		case UBLK_CMD_GET_DEV_INFO:
201 			opc = _IOR('u', UBLK_CMD_GET_DEV_INFO, struct ublksrv_ctrl_cmd);
202 			break;
203 		case UBLK_CMD_ADD_DEV:
204 			opc = _IOWR('u', UBLK_CMD_ADD_DEV, struct ublksrv_ctrl_cmd);
205 			break;
206 		case UBLK_CMD_DEL_DEV:
207 			opc = _IOWR('u', UBLK_CMD_DEL_DEV, struct ublksrv_ctrl_cmd);
208 			break;
209 		case UBLK_CMD_START_DEV:
210 			opc = _IOWR('u', UBLK_CMD_START_DEV, struct ublksrv_ctrl_cmd);
211 			break;
212 		case UBLK_CMD_STOP_DEV:
213 			opc = _IOWR('u', UBLK_CMD_STOP_DEV, struct ublksrv_ctrl_cmd);
214 			break;
215 		case UBLK_CMD_SET_PARAMS:
216 			opc = _IOWR('u', UBLK_CMD_SET_PARAMS, struct ublksrv_ctrl_cmd);
217 			break;
218 
219 		/* io uring */
220 		case UBLK_IO_FETCH_REQ:
221 			opc = _IOWR('u', UBLK_IO_FETCH_REQ, struct ublksrv_io_cmd);
222 			break;
223 		case UBLK_IO_COMMIT_AND_FETCH_REQ:
224 			opc = _IOWR('u', UBLK_IO_COMMIT_AND_FETCH_REQ, struct ublksrv_io_cmd);
225 			break;
226 		case UBLK_IO_NEED_GET_DATA:
227 			opc = _IOWR('u', UBLK_IO_NEED_GET_DATA, struct ublksrv_io_cmd);
228 			break;
229 		default:
230 			break;
231 		}
232 	}
233 
234 	sqe->off = opc;
235 }
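
/*
 * Kernels that advertise UBLK_F_CMD_IOCTL_ENCODE expect command opcodes in
 * ioctl encoding (_IOR/_IOWR on the 'u' ioctl type), while older kernels take
 * the raw enum values.  Either way the opcode is carried in sqe->off, which
 * overlays the 32-bit cmd_op field used by IORING_OP_URING_CMD.
 */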
236 
237 static inline uint64_t
238 build_user_data(uint16_t tag, uint8_t op)
239 {
240 	assert(!(tag >> 16) && !(op >> 8));
241 
242 	return tag | (op << 16);
243 }
244 
245 static inline uint16_t
246 user_data_to_tag(uint64_t user_data)
247 {
248 	return user_data & 0xffff;
249 }
250 
251 static inline uint8_t
252 user_data_to_op(uint64_t user_data)
253 {
254 	return (user_data >> 16) & 0xff;
255 }
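
/*
 * The 64-bit io_uring user_data is used as a tiny descriptor:
 *
 *   bits  0..15  tag of the ublk io within its queue
 *   bits 16..23  ublk command opcode (UBLK_IO_FETCH_REQ, ...)
 *
 * e.g. build_user_data(7, UBLK_IO_FETCH_REQ) packs tag 7 together with the
 * fetch opcode, and user_data_to_tag()/user_data_to_op() recover the fields.
 */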
256 
257 static inline uint64_t
258 ublk_user_copy_pos(uint16_t q_id, uint16_t tag)
259 {
260 	return (uint64_t)UBLKSRV_IO_BUF_OFFSET + ((((uint64_t)q_id) << UBLK_QID_OFF) | (((
261 				uint64_t)tag) << UBLK_TAG_OFF));
262 }
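
/*
 * With UBLK_F_USER_COPY the kernel does not touch the target's buffers
 * directly; instead the target moves data by reading from / writing to the
 * ublk char device at a pseudo offset that encodes the (queue id, tag) pair,
 * as computed above.  See ublk_queue_user_copy() for the read/write side.
 */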
263 
264 void
265 spdk_ublk_init(void)
266 {
267 	assert(spdk_thread_is_app_thread(NULL));
268 
269 	g_ublk_tgt.ctrl_fd = -1;
270 	g_ublk_tgt.ctrl_ring.ring_fd = -1;
271 }
272 
273 static int
274 ublk_ctrl_poller(void *arg)
275 {
276 	struct io_uring *ring = &g_ublk_tgt.ctrl_ring;
277 	struct spdk_ublk_dev *ublk;
278 	struct io_uring_cqe *cqe;
279 	const int max = 8;
280 	int i, count = 0, rc;
281 
282 	if (!g_ublk_tgt.ctrl_ops_in_progress) {
283 		return SPDK_POLLER_IDLE;
284 	}
285 
286 	for (i = 0; i < max; i++) {
287 		rc = io_uring_peek_cqe(ring, &cqe);
288 		if (rc == -EAGAIN) {
289 			break;
290 		}
291 
292 		assert(cqe != NULL);
293 		g_ublk_tgt.ctrl_ops_in_progress--;
294 		ublk = (struct spdk_ublk_dev *)cqe->user_data;
295 		UBLK_DEBUGLOG(ublk, "ctrl cmd completed\n");
296 		ublk->ctrl_ops_in_progress--;
297 		if (ublk->next_state_fn) {
298 			ublk->next_state_fn(ublk);
299 		}
300 		io_uring_cqe_seen(ring, cqe);
301 		count++;
302 	}
303 
304 	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
305 }
306 
307 static int
308 ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op)
309 {
310 	uint32_t dev_id = ublk->ublk_id;
311 	int rc = -EINVAL;
312 	struct io_uring_sqe *sqe;
313 	struct ublksrv_ctrl_cmd *cmd;
314 
315 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]);
316 
317 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
318 	if (!sqe) {
319 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
320 		assert(false);
321 		return -ENOENT;
322 	}
323 
324 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
325 	sqe->fd = g_ublk_tgt.ctrl_fd;
326 	sqe->opcode = IORING_OP_URING_CMD;
327 	sqe->ioprio = 0;
328 	cmd->dev_id = dev_id;
329 	cmd->queue_id = -1;
330 	ublk->next_state_fn = NULL;
331 
332 	switch (cmd_op) {
333 	case UBLK_CMD_ADD_DEV:
334 		ublk->next_state_fn = ublk_set_params;
335 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_info;
336 		cmd->len = sizeof(ublk->dev_info);
337 		break;
338 	case UBLK_CMD_SET_PARAMS:
339 		ublk->next_state_fn = ublk_finish_start;
340 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_params;
341 		cmd->len = sizeof(ublk->dev_params);
342 		break;
343 	case UBLK_CMD_START_DEV:
344 		cmd->data[0] = getpid();
345 		break;
346 	case UBLK_CMD_STOP_DEV:
347 		break;
348 	case UBLK_CMD_DEL_DEV:
349 		ublk->next_state_fn = ublk_free_dev;
350 		break;
351 	default:
352 		SPDK_ERRLOG("Unsupported cmd operation, cmd_op = %d\n", cmd_op);
353 		return -EINVAL;
354 	}
355 	ublk_set_sqe_cmd_op(sqe, cmd_op);
356 	io_uring_sqe_set_data(sqe, ublk);
357 
358 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
359 	if (rc < 0) {
360 		SPDK_ERRLOG("uring submit rc %d\n", rc);
361 		return rc;
362 	}
363 	g_ublk_tgt.ctrl_ops_in_progress++;
364 	ublk->ctrl_ops_in_progress++;
365 
366 	return 0;
367 }
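
/*
 * Control commands are chained via next_state_fn into a simple state machine
 * that ublk_ctrl_poller() advances as completions arrive:
 *
 *   start: ADD_DEV -> ublk_set_params() -> SET_PARAMS -> ublk_finish_start()
 *          -> START_DEV
 *   stop:  STOP_DEV (ublk_close_dev) ... queue teardown ... DEL_DEV
 *          -> ublk_free_dev()
 *
 * Control ops for a device are issued one at a time, so a plain counter
 * (ctrl_ops_in_progress) is enough to track them.
 */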
368 
369 static int
370 ublk_ctrl_cmd_get_features(void)
371 {
372 	int rc;
373 	struct io_uring_sqe *sqe;
374 	struct io_uring_cqe *cqe;
375 	struct ublksrv_ctrl_cmd *cmd;
376 	uint32_t cmd_op;
377 
378 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
379 	if (!sqe) {
380 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
381 		assert(false);
382 		return -ENOENT;
383 	}
384 
385 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
386 	sqe->fd = g_ublk_tgt.ctrl_fd;
387 	sqe->opcode = IORING_OP_URING_CMD;
388 	sqe->ioprio = 0;
389 	cmd->dev_id = -1;
390 	cmd->queue_id = -1;
391 	cmd->addr = (__u64)(uintptr_t)&g_ublk_tgt.features;
392 	cmd->len = sizeof(g_ublk_tgt.features);
393 
394 	cmd_op = UBLK_U_CMD_GET_FEATURES;
395 	ublk_set_sqe_cmd_op(sqe, cmd_op);
396 
397 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
398 	if (rc < 0) {
399 		SPDK_ERRLOG("uring submit rc %d\n", rc);
400 		return rc;
401 	}
402 
403 	rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe);
404 	if (rc < 0) {
405 		SPDK_ERRLOG("wait cqe rc %d\n", rc);
406 		return rc;
407 	}
408 
409 	if (cqe->res == 0) {
410 		g_ublk_tgt.ioctl_encode = !!(g_ublk_tgt.features & UBLK_F_CMD_IOCTL_ENCODE);
411 		g_ublk_tgt.user_copy = !!(g_ublk_tgt.features & UBLK_F_USER_COPY);
412 	}
413 	io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe);
414 
415 	return 0;
416 }
417 
418 static int
419 ublk_queue_cmd_buf_sz(uint32_t q_depth)
420 {
421 	uint32_t size = q_depth * sizeof(struct ublksrv_io_desc);
422 	uint32_t page_sz = getpagesize();
423 
424 	/* round up size */
425 	return (size + page_sz - 1) & ~(page_sz - 1);
426 }
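
/*
 * Worked example (assuming 4 KiB pages and a 24-byte struct ublksrv_io_desc):
 * a queue depth of 128 needs 128 * 24 = 3072 bytes of descriptor space, which
 * rounds up to one 4096-byte page for the mmap() in ublk_dev_queue_init().
 */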
427 
428 static int
429 ublk_get_max_support_devs(void)
430 {
431 	FILE *file;
432 	char str[128];
433 
434 	file = fopen("/sys/module/ublk_drv/parameters/ublks_max", "r");
435 	if (!file) {
436 		return -ENOENT;
437 	}
438 
439 	if (!fgets(str, sizeof(str), file)) {
440 		fclose(file);
441 		return -EINVAL;
442 	}
443 	fclose(file);
444 
445 	spdk_str_chomp(str);
446 	return spdk_strtol(str, 10);
447 }
448 
449 static int
450 ublk_open(void)
451 {
452 	int rc, ublks_max;
453 
454 	g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR);
455 	if (g_ublk_tgt.ctrl_fd < 0) {
456 		rc = errno;
457 		SPDK_ERRLOG("UBLK control dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno));
458 		return -rc;
459 	}
460 
461 	ublks_max = ublk_get_max_support_devs();
462 	if (ublks_max > 0) {
463 		g_ublks_max = ublks_max;
464 	}
465 
466 	/* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl
467 	 * ring processing to a workqueue.  Ctrl ring processing is minimal, so SQPOLL is fine.
468 	 * All the commands sent via the control uring for a ublk device are executed one by
469 	 * one, so ublks_max * 2 uring entries are enough.
470 	 */
471 	rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring,
472 			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
473 	if (rc < 0) {
474 		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
475 		goto err;
476 	}
477 
478 	rc = ublk_ctrl_cmd_get_features();
479 	if (rc) {
480 		goto err;
481 	}
482 
483 	return 0;
484 
485 err:
486 	close(g_ublk_tgt.ctrl_fd);
487 	g_ublk_tgt.ctrl_fd = -1;
488 	return rc;
489 }
490 
491 static int
492 ublk_parse_core_mask(const char *mask)
493 {
494 	struct spdk_cpuset tmp_mask;
495 	int rc;
496 
497 	if (mask == NULL) {
498 		spdk_env_get_cpuset(&g_core_mask);
499 		return 0;
500 	}
501 
502 	rc = spdk_cpuset_parse(&g_core_mask, mask);
503 	if (rc < 0) {
504 		SPDK_ERRLOG("invalid cpumask %s\n", mask);
505 		return -EINVAL;
506 	}
507 
508 	if (spdk_cpuset_count(&g_core_mask) == 0) {
509 		SPDK_ERRLOG("no cpus specified\n");
510 		return -EINVAL;
511 	}
512 
513 	spdk_env_get_cpuset(&tmp_mask);
514 	spdk_cpuset_and(&tmp_mask, &g_core_mask);
515 
516 	if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) {
517 		SPDK_ERRLOG("one of the selected cpus is outside of the core mask(=%s)\n",
518 			    spdk_cpuset_fmt(&g_core_mask));
519 		return -EINVAL;
520 	}
521 
522 	return 0;
523 }
524 
525 static void
526 ublk_poller_register(void *args)
527 {
528 	struct ublk_poll_group *poll_group = args;
529 	int rc;
530 
531 	assert(spdk_get_thread() == poll_group->ublk_thread);
532 	/* Bind the ublk spdk_thread to the current CPU core to avoid thread context switches
533 	 * during uring processing, as required by the ublk kernel driver.
534 	 */
535 	spdk_thread_bind(spdk_get_thread(), true);
536 
537 	TAILQ_INIT(&poll_group->queue_list);
538 	poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0);
539 	rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk",
540 				     UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE);
541 	if (rc != 0) {
542 		assert(false);
543 	}
544 }
545 
546 int
547 ublk_create_target(const char *cpumask_str)
548 {
549 	int rc;
550 	uint32_t i;
551 	char thread_name[32];
552 	struct ublk_poll_group *poll_group;
553 
554 	if (g_ublk_tgt.active == true) {
555 		SPDK_ERRLOG("UBLK target has already been created\n");
556 		return -EBUSY;
557 	}
558 
559 	rc = ublk_parse_core_mask(cpumask_str);
560 	if (rc != 0) {
561 		return rc;
562 	}
563 
564 	assert(g_ublk_tgt.poll_groups == NULL);
565 	g_ublk_tgt.poll_groups = calloc(spdk_env_get_core_count(), sizeof(*poll_group));
566 	if (!g_ublk_tgt.poll_groups) {
567 		return -ENOMEM;
568 	}
569 
570 	rc = ublk_open();
571 	if (rc != 0) {
572 		SPDK_ERRLOG("Failed to open UBLK, error=%s\n", spdk_strerror(-rc));
573 		free(g_ublk_tgt.poll_groups);
574 		g_ublk_tgt.poll_groups = NULL;
575 		return rc;
576 	}
577 
578 	spdk_iobuf_register_module("ublk");
579 
580 	SPDK_ENV_FOREACH_CORE(i) {
581 		if (!spdk_cpuset_get_cpu(&g_core_mask, i)) {
582 			continue;
583 		}
584 		snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i);
585 		poll_group = &g_ublk_tgt.poll_groups[g_num_ublk_poll_groups];
586 		poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask);
587 		spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group);
588 		g_num_ublk_poll_groups++;
589 	}
590 
591 	assert(spdk_thread_is_app_thread(NULL));
592 	g_ublk_tgt.active = true;
593 	g_ublk_tgt.ctrl_ops_in_progress = 0;
594 	g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL,
595 				 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US);
596 
597 	SPDK_NOTICELOG("UBLK target created successfully\n");
598 
599 	return 0;
600 }
601 
602 static void
603 _ublk_fini_done(void *args)
604 {
605 	SPDK_DEBUGLOG(ublk, "\n");
606 
607 	g_num_ublk_poll_groups = 0;
608 	g_next_ublk_poll_group = 0;
609 	g_ublk_tgt.is_destroying = false;
610 	g_ublk_tgt.active = false;
611 	g_ublk_tgt.features = 0;
612 	g_ublk_tgt.ioctl_encode = false;
613 	g_ublk_tgt.user_copy = false;
614 
615 	if (g_ublk_tgt.cb_fn) {
616 		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
617 		g_ublk_tgt.cb_fn = NULL;
618 		g_ublk_tgt.cb_arg = NULL;
619 	}
620 
621 	if (g_ublk_tgt.poll_groups) {
622 		free(g_ublk_tgt.poll_groups);
623 		g_ublk_tgt.poll_groups = NULL;
624 	}
625 
626 }
627 
628 static void
629 ublk_thread_exit(void *args)
630 {
631 	struct spdk_thread *ublk_thread = spdk_get_thread();
632 	uint32_t i;
633 
634 	for (i = 0; i < g_num_ublk_poll_groups; i++) {
635 		if (g_ublk_tgt.poll_groups[i].ublk_thread == ublk_thread) {
636 			spdk_poller_unregister(&g_ublk_tgt.poll_groups[i].ublk_poller);
637 			spdk_iobuf_channel_fini(&g_ublk_tgt.poll_groups[i].iobuf_ch);
638 			spdk_thread_bind(ublk_thread, false);
639 			spdk_thread_exit(ublk_thread);
640 		}
641 	}
642 }
643 
644 static int
645 ublk_close_dev(struct spdk_ublk_dev *ublk)
646 {
647 	int rc;
648 
649 	/* set is_closing */
650 	if (ublk->is_closing) {
651 		return -EBUSY;
652 	}
653 	ublk->is_closing = true;
654 
655 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_STOP_DEV);
656 	if (rc < 0) {
657 		SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id);
658 	}
659 	return rc;
660 }
661 
662 static void
663 _ublk_fini(void *args)
664 {
665 	struct spdk_ublk_dev	*ublk, *ublk_tmp;
666 
667 	TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) {
668 		ublk_close_dev(ublk);
669 	}
670 
671 	/* Check if all ublks closed */
672 	if (TAILQ_EMPTY(&g_ublk_devs)) {
673 		SPDK_DEBUGLOG(ublk, "finish shutdown\n");
674 		spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
675 		if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) {
676 			io_uring_queue_exit(&g_ublk_tgt.ctrl_ring);
677 			g_ublk_tgt.ctrl_ring.ring_fd = -1;
678 		}
679 		if (g_ublk_tgt.ctrl_fd >= 0) {
680 			close(g_ublk_tgt.ctrl_fd);
681 			g_ublk_tgt.ctrl_fd = -1;
682 		}
683 		spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done);
684 	} else {
685 		spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL);
686 	}
687 }
688 
689 int
690 spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg)
691 {
692 	assert(spdk_thread_is_app_thread(NULL));
693 
694 	if (g_ublk_tgt.is_destroying == true) {
695 		/* UBLK target is being destroyed */
696 		return -EBUSY;
697 	}
698 	g_ublk_tgt.cb_fn = cb_fn;
699 	g_ublk_tgt.cb_arg = cb_arg;
700 	g_ublk_tgt.is_destroying = true;
701 	_ublk_fini(NULL);
702 
703 	return 0;
704 }
705 
706 int
707 ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg)
708 {
709 	int rc;
710 
711 	if (g_ublk_tgt.active == false) {
712 		/* UBLK target has not been created */
713 		return -ENOENT;
714 	}
715 
716 	rc = spdk_ublk_fini(cb_fn, cb_arg);
717 
718 	return rc;
719 }
720 
721 struct spdk_ublk_dev *
722 ublk_dev_find_by_id(uint32_t ublk_id)
723 {
724 	struct spdk_ublk_dev *ublk;
725 
726 	/* check whether ublk has already been registered by ublk path. */
727 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
728 		if (ublk->ublk_id == ublk_id) {
729 			return ublk;
730 		}
731 	}
732 
733 	return NULL;
734 }
735 
736 uint32_t
737 ublk_dev_get_id(struct spdk_ublk_dev *ublk)
738 {
739 	return ublk->ublk_id;
740 }
741 
742 struct spdk_ublk_dev *ublk_dev_first(void)
743 {
744 	return TAILQ_FIRST(&g_ublk_devs);
745 }
746 
747 struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev)
748 {
749 	return TAILQ_NEXT(prev, tailq);
750 }
751 
752 uint32_t
753 ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk)
754 {
755 	return ublk->queue_depth;
756 }
757 
758 uint32_t
759 ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk)
760 {
761 	return ublk->num_queues;
762 }
763 
764 const char *
765 ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk)
766 {
767 	return spdk_bdev_get_name(ublk->bdev);
768 }
769 
770 void
771 spdk_ublk_write_config_json(struct spdk_json_write_ctx *w)
772 {
773 	struct spdk_ublk_dev *ublk;
774 
775 	spdk_json_write_array_begin(w);
776 
777 	if (g_ublk_tgt.active) {
778 		spdk_json_write_object_begin(w);
779 
780 		spdk_json_write_named_string(w, "method", "ublk_create_target");
781 		spdk_json_write_named_object_begin(w, "params");
782 		spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask));
783 		spdk_json_write_object_end(w);
784 
785 		spdk_json_write_object_end(w);
786 	}
787 
788 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
789 		spdk_json_write_object_begin(w);
790 
791 		spdk_json_write_named_string(w, "method", "ublk_start_disk");
792 
793 		spdk_json_write_named_object_begin(w, "params");
794 		spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk));
795 		spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id);
796 		spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues);
797 		spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth);
798 		spdk_json_write_object_end(w);
799 
800 		spdk_json_write_object_end(w);
801 	}
802 
803 	spdk_json_write_array_end(w);
804 }
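
/*
 * Illustrative output of the config writer above (values are made up):
 *
 * [
 *   { "method": "ublk_create_target", "params": { "cpumask": "0x3" } },
 *   { "method": "ublk_start_disk", "params": { "bdev_name": "Malloc0",
 *     "ublk_id": 1, "num_queues": 2, "queue_depth": 128 } }
 * ]
 */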
805 
806 static void
807 ublk_dev_list_register(struct spdk_ublk_dev *ublk)
808 {
809 	UBLK_DEBUGLOG(ublk, "add to tailq\n");
810 	TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq);
811 	g_ublk_tgt.num_ublk_devs++;
812 }
813 
814 static void
815 ublk_dev_list_unregister(struct spdk_ublk_dev *ublk)
816 {
817 	/*
818 	 * The ublk device may be stopped before it is registered,
819 	 * so check whether it was actually registered.
820 	 */
821 
822 	if (ublk_dev_find_by_id(ublk->ublk_id)) {
823 		UBLK_DEBUGLOG(ublk, "remove from tailq\n");
824 		TAILQ_REMOVE(&g_ublk_devs, ublk, tailq);
825 		assert(g_ublk_tgt.num_ublk_devs);
826 		g_ublk_tgt.num_ublk_devs--;
827 		return;
828 	}
829 
830 	UBLK_DEBUGLOG(ublk, "not found in tailq\n");
831 	assert(false);
832 }
833 
834 static void
835 ublk_delete_dev(void *arg)
836 {
837 	struct spdk_ublk_dev *ublk = arg;
838 	int rc = 0;
839 	uint32_t q_idx;
840 
841 	assert(spdk_thread_is_app_thread(NULL));
842 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
843 		ublk_dev_queue_fini(&ublk->queues[q_idx]);
844 	}
845 
846 	if (ublk->cdev_fd >= 0) {
847 		close(ublk->cdev_fd);
848 	}
849 
850 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_DEL_DEV);
851 	if (rc < 0) {
852 		SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id);
853 	}
854 }
855 
856 static int
857 _ublk_close_dev_retry(void *arg)
858 {
859 	struct spdk_ublk_dev *ublk = arg;
860 
861 	if (ublk->ctrl_ops_in_progress > 0) {
862 		if (ublk->retry_count-- > 0) {
863 			return SPDK_POLLER_BUSY;
864 		}
865 		SPDK_ERRLOG("Timeout on ctrl op completion.\n");
866 	}
867 	spdk_poller_unregister(&ublk->retry_poller);
868 	ublk_delete_dev(ublk);
869 	return SPDK_POLLER_BUSY;
870 }
871 
872 static void
873 ublk_try_close_dev(void *arg)
874 {
875 	struct spdk_ublk_dev *ublk = arg;
876 
877 	assert(spdk_thread_is_app_thread(NULL));
878 
879 	ublk->queues_closed += 1;
880 	SPDK_DEBUGLOG(ublk_io, "ublkb%u closed queues %u\n", ublk->ublk_id, ublk->queues_closed);
881 
882 	if (ublk->queues_closed < ublk->num_queues) {
883 		return;
884 	}
885 
886 	if (ublk->ctrl_ops_in_progress > 0) {
887 		assert(ublk->retry_poller == NULL);
888 		ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US;
889 		ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk,
890 				     UBLK_BUSY_POLLING_INTERVAL_US);
891 	} else {
892 		ublk_delete_dev(ublk);
893 	}
894 }
895 
896 static void
897 ublk_try_close_queue(struct ublk_queue *q)
898 {
899 	struct spdk_ublk_dev *ublk = q->dev;
900 
901 	/* Close the queue only when no I/O submitted to the bdev is still in flight,
902 	 * no I/O is waiting to commit its result, and all I/Os have been aborted back.
903 	 */
904 	if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) {
905 		/* wait for next retry */
906 		return;
907 	}
908 
909 	TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq);
910 	spdk_put_io_channel(q->bdev_ch);
911 	q->bdev_ch = NULL;
912 
913 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk);
914 }
915 
916 int
917 ublk_stop_disk(uint32_t ublk_id, ublk_del_cb del_cb, void *cb_arg)
918 {
919 	struct spdk_ublk_dev *ublk;
920 
921 	assert(spdk_thread_is_app_thread(NULL));
922 
923 	ublk = ublk_dev_find_by_id(ublk_id);
924 	if (ublk == NULL) {
925 		SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id);
926 		return -ENODEV;
927 	}
928 	if (ublk->is_closing) {
929 		SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id);
930 		return -EBUSY;
931 	}
932 
933 	ublk->del_cb = del_cb;
934 	ublk->cb_arg = cb_arg;
935 	return ublk_close_dev(ublk);
936 }
937 
938 static inline void
939 ublk_mark_io_done(struct ublk_io *io, int res)
940 {
941 	/*
942 	 * Mark the io as done by the target, so that SPDK can commit its
943 	 * result and fetch a new request via an io_uring command.
944 	 */
945 	io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
946 	io->result = res;
947 	io->need_data = false;
948 }
949 
950 static void
951 ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
952 {
953 	struct ublk_io	*io = cb_arg;
954 	struct ublk_queue *q = io->q;
955 	int res;
956 
957 	if (success) {
958 		res = io->result;
959 	} else {
960 		res = -EIO;
961 	}
962 
963 	ublk_mark_io_done(io, res);
964 
965 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n",
966 		      q->q_id, io->tag, res);
967 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
968 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
969 
970 	if (bdev_io != NULL) {
971 		spdk_bdev_free_io(bdev_io);
972 	}
973 }
974 
975 static void
976 ublk_queue_user_copy(struct ublk_io *io, bool is_write)
977 {
978 	struct ublk_queue *q = io->q;
979 	const struct ublksrv_io_desc *iod = io->iod;
980 	struct io_uring_sqe *sqe;
981 	uint64_t pos;
982 	uint32_t nbytes;
983 
984 	nbytes = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
985 	pos = ublk_user_copy_pos(q->q_id, io->tag);
986 	sqe = io_uring_get_sqe(&q->ring);
987 	assert(sqe);
988 
989 	if (is_write) {
990 		io_uring_prep_read(sqe, 0, io->payload, nbytes, pos);
991 	} else {
992 		io_uring_prep_write(sqe, 0, io->payload, nbytes, pos);
993 	}
994 	io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
995 	io_uring_sqe_set_data64(sqe, build_user_data(io->tag, 0));
996 
997 	io->user_copy = true;
998 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
999 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1000 }
1001 
1002 static void
1003 ublk_user_copy_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1004 {
1005 	struct ublk_io	*io = cb_arg;
1006 
1007 	spdk_bdev_free_io(bdev_io);
1008 
1009 	if (success) {
1010 		ublk_queue_user_copy(io, false);
1011 		return;
1012 	}
1013 	/* READ IO Error */
1014 	ublk_io_done(NULL, false, cb_arg);
1015 }
1016 
1017 static void
1018 ublk_resubmit_io(void *arg)
1019 {
1020 	struct ublk_io *io = (struct ublk_io *)arg;
1021 
1022 	_ublk_submit_bdev_io(io->q, io);
1023 }
1024 
1025 static void
1026 ublk_queue_io(struct ublk_io *io)
1027 {
1028 	int rc;
1029 	struct spdk_bdev *bdev = io->q->dev->bdev;
1030 	struct ublk_queue *q = io->q;
1031 
1032 	io->bdev_io_wait.bdev = bdev;
1033 	io->bdev_io_wait.cb_fn = ublk_resubmit_io;
1034 	io->bdev_io_wait.cb_arg = io;
1035 
1036 	rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait);
1037 	if (rc != 0) {
1038 		SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc);
1039 		ublk_io_done(NULL, false, io);
1040 	}
1041 }
1042 
1043 static void
1044 ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf)
1045 {
1046 	struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf);
1047 
1048 	io->mpool_entry = buf;
1049 	assert(io->payload == NULL);
1050 	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
1051 	io->get_buf_cb(io);
1052 }
1053 
1054 static void
1055 ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch,
1056 		   ublk_get_buf_cb get_buf_cb)
1057 {
1058 	void *buf;
1059 
1060 	io->payload_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1061 	io->get_buf_cb = get_buf_cb;
1062 	buf = spdk_iobuf_get(iobuf_ch, io->payload_size, &io->iobuf, ublk_io_get_buffer_cb);
1063 
1064 	if (buf != NULL) {
1065 		ublk_io_get_buffer_cb(&io->iobuf, buf);
1066 	}
1067 }
1068 
1069 static void
1070 ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch)
1071 {
1072 	if (io->payload) {
1073 		spdk_iobuf_put(iobuf_ch, io->mpool_entry, io->payload_size);
1074 		io->mpool_entry = NULL;
1075 		io->payload = NULL;
1076 	}
1077 }
1078 
1079 static void
1080 _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1081 {
1082 	struct spdk_ublk_dev *ublk = q->dev;
1083 	struct spdk_bdev_desc *desc = io->bdev_desc;
1084 	struct spdk_io_channel *ch = io->bdev_ch;
1085 	uint64_t offset_blocks, num_blocks;
1086 	spdk_bdev_io_completion_cb read_cb;
1087 	uint8_t ublk_op;
1088 	int rc = 0;
1089 	const struct ublksrv_io_desc *iod = io->iod;
1090 
1091 	ublk_op = ublksrv_get_op(iod);
1092 	offset_blocks = iod->start_sector >> ublk->sector_per_block_shift;
1093 	num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift;
1094 
1095 	switch (ublk_op) {
1096 	case UBLK_IO_OP_READ:
1097 		if (g_ublk_tgt.user_copy) {
1098 			read_cb = ublk_user_copy_read_done;
1099 		} else {
1100 			read_cb = ublk_io_done;
1101 		}
1102 		rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, read_cb, io);
1103 		break;
1104 	case UBLK_IO_OP_WRITE:
1105 		rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
1106 		break;
1107 	case UBLK_IO_OP_FLUSH:
1108 		rc = spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1109 		break;
1110 	case UBLK_IO_OP_DISCARD:
1111 		rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1112 		break;
1113 	case UBLK_IO_OP_WRITE_ZEROES:
1114 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1115 		break;
1116 	default:
1117 		rc = -1;
1118 	}
1119 
1120 	if (rc < 0) {
1121 		if (rc == -ENOMEM) {
1122 			SPDK_INFOLOG(ublk, "No memory, queueing io.\n");
1123 			ublk_queue_io(io);
1124 		} else {
1125 			SPDK_ERRLOG("bdev io submission failed in _ublk_submit_bdev_io, rc=%d.\n", rc);
1126 			ublk_io_done(NULL, false, io);
1127 		}
1128 	}
1129 }
1130 
1131 static void
1132 read_get_buffer_done(struct ublk_io *io)
1133 {
1134 	_ublk_submit_bdev_io(io->q, io);
1135 }
1136 
1137 static void
1138 user_copy_write_get_buffer_done(struct ublk_io *io)
1139 {
1140 	ublk_queue_user_copy(io, true);
1141 }
1142 
1143 static void
1144 ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1145 {
1146 	struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch;
1147 	const struct ublksrv_io_desc *iod = io->iod;
1148 	uint8_t ublk_op;
1149 
1150 	io->result = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1151 	ublk_op = ublksrv_get_op(iod);
1152 	switch (ublk_op) {
1153 	case UBLK_IO_OP_READ:
1154 		ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done);
1155 		break;
1156 	case UBLK_IO_OP_WRITE:
1157 		if (g_ublk_tgt.user_copy) {
1158 			ublk_io_get_buffer(io, iobuf_ch, user_copy_write_get_buffer_done);
1159 		} else {
1160 			_ublk_submit_bdev_io(q, io);
1161 		}
1162 		break;
1163 	default:
1164 		_ublk_submit_bdev_io(q, io);
1165 		break;
1166 	}
1167 }
1168 
1169 static inline void
1170 ublksrv_queue_io_cmd(struct ublk_queue *q,
1171 		     struct ublk_io *io, unsigned tag)
1172 {
1173 	struct ublksrv_io_cmd *cmd;
1174 	struct io_uring_sqe *sqe;
1175 	unsigned int cmd_op = 0;
1176 	uint64_t user_data;
1177 
1178 	/* each io should have operation of fetching or committing */
1179 	assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) ||
1180 	       (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ));
1181 	cmd_op = io->cmd_op;
1182 
1183 	sqe = io_uring_get_sqe(&q->ring);
1184 	assert(sqe);
1185 
1186 	cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe);
1187 	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) {
1188 		cmd->result = io->result;
1189 	}
1190 
1191 	/* These fields should be written once, never change */
1192 	ublk_set_sqe_cmd_op(sqe, cmd_op);
1193 	/* dev->cdev_fd */
1194 	sqe->fd		= 0;
1195 	sqe->opcode	= IORING_OP_URING_CMD;
1196 	sqe->flags	= IOSQE_FIXED_FILE;
1197 	sqe->rw_flags	= 0;
1198 	cmd->tag	= tag;
1199 	cmd->addr	= g_ublk_tgt.user_copy ? 0 : (__u64)(uintptr_t)(io->payload);
1200 	cmd->q_id	= q->q_id;
1201 
1202 	user_data = build_user_data(tag, cmd_op);
1203 	io_uring_sqe_set_data64(sqe, user_data);
1204 
1205 	io->cmd_op = 0;
1206 
1207 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n",
1208 		      q->q_id, tag, cmd_op,
1209 		      io->cmd_op, q->is_stopping);
1210 }
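
/*
 * Lifecycle of one ublk io slot, as driven by the commands built above:
 *
 *   1. UBLK_IO_FETCH_REQ is posted once per tag at queue init
 *      (ublk_dev_queue_io_init) to hand the slot to the kernel.
 *   2. The kernel completes it with UBLK_IO_RES_OK when a block request
 *      arrives; the descriptor is read from io_cmd_buf[tag] and serviced
 *      through the bdev layer.
 *   3. UBLK_IO_COMMIT_AND_FETCH_REQ returns the result and re-arms the same
 *      slot in a single command.
 *   4. UBLK_IO_NEED_GET_DATA is the extra round trip used for writes when
 *      UBLK_F_NEED_GET_DATA (i.e. no user-copy support) is in effect.
 */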
1211 
1212 static int
1213 ublk_io_xmit(struct ublk_queue *q)
1214 {
1215 	TAILQ_HEAD(, ublk_io) buffer_free_list;
1216 	struct spdk_iobuf_channel *iobuf_ch;
1217 	int rc = 0, count = 0;
1218 	struct ublk_io *io;
1219 
1220 	if (TAILQ_EMPTY(&q->completed_io_list)) {
1221 		return 0;
1222 	}
1223 
1224 	TAILQ_INIT(&buffer_free_list);
1225 	while (!TAILQ_EMPTY(&q->completed_io_list)) {
1226 		io = TAILQ_FIRST(&q->completed_io_list);
1227 		assert(io != NULL);
1228 		/*
1229 		 * Remove IO from list now assuming it will be completed. It will be inserted
1230 		 * back to the head if it cannot be completed. This approach is specifically
1231 		 * taken to work around a scan-build use-after-free mischaracterization.
1232 		 */
1233 		TAILQ_REMOVE(&q->completed_io_list, io, tailq);
1234 		if (!io->user_copy) {
1235 			if (!io->need_data) {
1236 				TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq);
1237 			}
1238 			ublksrv_queue_io_cmd(q, io, io->tag);
1239 		}
1240 		count++;
1241 	}
1242 
1243 	q->cmd_inflight += count;
1244 	rc = io_uring_submit(&q->ring);
1245 	if (rc != count) {
1246 		SPDK_ERRLOG("could not submit all commands\n");
1247 		assert(false);
1248 	}
1249 
1250 	/* Note: for READ io, ublk will always copy the data out of
1251 	 * the buffers in the io_uring_submit context.  Since we
1252 	 * are not using SQPOLL for IO rings, we can safely free
1253 	 * those IO buffers here.  This design doesn't seem ideal,
1254 	 * but it's what's possible since there is no discrete
1255 	 * COMMIT_REQ operation.  That will need to change in the
1256 	 * future should we ever want to support async copy
1257 	 * operations.
1258 	 */
1259 	iobuf_ch = &q->poll_group->iobuf_ch;
1260 	while (!TAILQ_EMPTY(&buffer_free_list)) {
1261 		io = TAILQ_FIRST(&buffer_free_list);
1262 		TAILQ_REMOVE(&buffer_free_list, io, tailq);
1263 		ublk_io_put_buffer(io, iobuf_ch);
1264 	}
1265 	return rc;
1266 }
1267 
1268 static void
1269 write_get_buffer_done(struct ublk_io *io)
1270 {
1271 	io->need_data = true;
1272 	io->cmd_op = UBLK_IO_NEED_GET_DATA;
1273 	io->result = 0;
1274 
1275 	TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq);
1276 	TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq);
1277 }
1278 
1279 static int
1280 ublk_io_recv(struct ublk_queue *q)
1281 {
1282 	struct io_uring_cqe *cqe;
1283 	unsigned head, tag;
1284 	int fetch, count = 0;
1285 	struct ublk_io *io;
1286 	struct spdk_iobuf_channel *iobuf_ch;
1287 
1288 	if (q->cmd_inflight == 0) {
1289 		return 0;
1290 	}
1291 
1292 	iobuf_ch = &q->poll_group->iobuf_ch;
1293 	io_uring_for_each_cqe(&q->ring, head, cqe) {
1294 		tag = user_data_to_tag(cqe->user_data);
1295 		io = &q->ios[tag];
1296 
1297 		SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u, user copy %u, cmd_op %u\n",
1298 			      cqe->res, q->q_id, tag, io->user_copy, user_data_to_op(cqe->user_data));
1299 
1300 		q->cmd_inflight--;
1301 		TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq);
1302 
1303 		if (!io->user_copy) {
1304 			fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping;
1305 			if (!fetch) {
1306 				q->is_stopping = true;
1307 				if (io->cmd_op == UBLK_IO_FETCH_REQ) {
1308 					io->cmd_op = 0;
1309 				}
1310 			}
1311 
1312 			if (cqe->res == UBLK_IO_RES_OK) {
1313 				ublk_submit_bdev_io(q, io);
1314 			} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
1315 				ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done);
1316 			} else {
1317 				if (cqe->res != UBLK_IO_RES_ABORT) {
1318 					SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
1319 						    cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data));
1320 				}
1321 				TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1322 			}
1323 		} else {
1324 
1325 			/* clear `user_copy` for next use of this IO structure */
1326 			io->user_copy = false;
1327 
1328 			assert((ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) ||
1329 			       (ublksrv_get_op(io->iod) == UBLK_IO_OP_WRITE));
1330 			if (cqe->res != io->result) {
1331 				/* EIO */
1332 				ublk_io_done(NULL, false, io);
1333 			} else {
1334 				if (ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) {
1335 					/* bdev_io is already freed in first READ cycle */
1336 					ublk_io_done(NULL, true, io);
1337 				} else {
1338 					_ublk_submit_bdev_io(q, io);
1339 				}
1340 			}
1341 		}
1342 		count += 1;
1343 		if (count == UBLK_QUEUE_REQUEST) {
1344 			break;
1345 		}
1346 	}
1347 	io_uring_cq_advance(&q->ring, count);
1348 
1349 	return count;
1350 }
1351 
1352 static int
1353 ublk_poll(void *arg)
1354 {
1355 	struct ublk_poll_group *poll_group = arg;
1356 	struct ublk_queue *q, *q_tmp;
1357 	int sent, received, count = 0;
1358 
1359 	TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) {
1360 		sent = ublk_io_xmit(q);
1361 		received = ublk_io_recv(q);
1362 		if (spdk_unlikely(q->is_stopping)) {
1363 			ublk_try_close_queue(q);
1364 		}
1365 		count += sent + received;
1366 	}
1367 	if (count > 0) {
1368 		return SPDK_POLLER_BUSY;
1369 	} else {
1370 		return SPDK_POLLER_IDLE;
1371 	}
1372 }
1373 
1374 static void
1375 ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk)
1376 {
1377 	ublk_close_dev(ublk);
1378 }
1379 
1380 static void
1381 ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1382 		   void *event_ctx)
1383 {
1384 	switch (type) {
1385 	case SPDK_BDEV_EVENT_REMOVE:
1386 		ublk_bdev_hot_remove(event_ctx);
1387 		break;
1388 	default:
1389 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1390 		break;
1391 	}
1392 }
1393 
1394 static void
1395 ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth)
1396 {
1397 	struct io_uring_sqe *sqe;
1398 	uint32_t i;
1399 
1400 	for (i = 0; i < q_depth; i++) {
1401 		sqe = ublk_uring_get_sqe(r, i);
1402 
1403 		/* These fields should be written once, never change */
1404 		sqe->flags = IOSQE_FIXED_FILE;
1405 		sqe->rw_flags = 0;
1406 		sqe->ioprio = 0;
1407 		sqe->off = 0;
1408 	}
1409 }
1410 
1411 static int
1412 ublk_dev_queue_init(struct ublk_queue *q)
1413 {
1414 	int rc = 0, cmd_buf_size;
1415 	uint32_t j;
1416 	struct spdk_ublk_dev *ublk = q->dev;
1417 	unsigned long off;
1418 
1419 	cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth);
1420 	off = UBLKSRV_CMD_BUF_OFFSET +
1421 	      q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
1422 	q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ,
1423 			MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off);
1424 	if (q->io_cmd_buf == MAP_FAILED) {
1425 		q->io_cmd_buf = NULL;
1426 		rc = -errno;
1427 		SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc));
1428 		goto err;
1429 	}
1430 
1431 	for (j = 0; j < q->q_depth; j++) {
1432 		q->ios[j].cmd_op = UBLK_IO_FETCH_REQ;
1433 		q->ios[j].iod = &q->io_cmd_buf[j];
1434 	}
1435 
1436 	rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128);
1437 	if (rc < 0) {
1438 		SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc));
1439 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1440 		q->io_cmd_buf = NULL;
1441 		goto err;
1442 	}
1443 
1444 	rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1);
1445 	if (rc != 0) {
1446 		SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc));
1447 		io_uring_queue_exit(&q->ring);
1448 		q->ring.ring_fd = -1;
1449 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1450 		q->io_cmd_buf = NULL;
1451 		goto err;
1452 	}
1453 
1454 	ublk_dev_init_io_cmds(&q->ring, q->q_depth);
1455 
1456 err:
1457 	return rc;
1458 }
1459 
1460 static void
1461 ublk_dev_queue_fini(struct ublk_queue *q)
1462 {
1463 	if (q->ring.ring_fd >= 0) {
1464 		io_uring_unregister_files(&q->ring);
1465 		io_uring_queue_exit(&q->ring);
1466 		q->ring.ring_fd = -1;
1467 	}
1468 	if (q->io_cmd_buf) {
1469 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1470 	}
1471 }
1472 
1473 static void
1474 ublk_dev_queue_io_init(struct ublk_queue *q)
1475 {
1476 	struct ublk_io *io;
1477 	uint32_t i;
1478 	int rc __attribute__((unused));
1479 	void *buf;
1480 
1481 	/* Some older kernels require a buffer to get posted, even
1482 	 * when NEED_GET_DATA has been specified.  So allocate a
1483 	 * temporary buffer, only for purposes of this workaround.
1484 	 * It never actually gets used, so we will free it immediately
1485 	 * after all of the commands are posted.
1486 	 */
1487 	buf = malloc(64);
1488 
1489 	assert(q->bdev_ch != NULL);
1490 
1491 	/* Initialize and submit all io commands to ublk driver */
1492 	for (i = 0; i < q->q_depth; i++) {
1493 		io = &q->ios[i];
1494 		io->tag = (uint16_t)i;
1495 		io->payload = buf;
1496 		io->bdev_ch = q->bdev_ch;
1497 		io->bdev_desc = q->dev->bdev_desc;
1498 		ublksrv_queue_io_cmd(q, io, i);
1499 	}
1500 
1501 	q->cmd_inflight += q->q_depth;
1502 	rc = io_uring_submit(&q->ring);
1503 	assert(rc == (int)q->q_depth);
1504 	for (i = 0; i < q->q_depth; i++) {
1505 		io = &q->ios[i];
1506 		io->payload = NULL;
1507 	}
1508 	free(buf);
1509 }
1510 
1511 static void
1512 ublk_set_params(struct spdk_ublk_dev *ublk)
1513 {
1514 	int rc;
1515 
1516 	ublk->dev_params.len = sizeof(struct ublk_params);
1517 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_SET_PARAMS);
1518 	if (rc < 0) {
1519 		SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1520 		ublk_delete_dev(ublk);
1521 		if (ublk->start_cb) {
1522 			ublk->start_cb(ublk->cb_arg, rc);
1523 			ublk->start_cb = NULL;
1524 		}
1525 	}
1526 }
1527 
1528 /* Set ublk device parameters based on bdev */
1529 static void
1530 ublk_info_param_init(struct spdk_ublk_dev *ublk)
1531 {
1532 	struct spdk_bdev *bdev = ublk->bdev;
1533 	uint32_t blk_size = spdk_bdev_get_data_block_size(bdev);
1534 	uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev);
1535 	uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev);
1536 	uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev);
1537 	uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT;
1538 	uint32_t io_min_size = blk_size;
1539 	uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size);
1540 
1541 	struct ublksrv_ctrl_dev_info uinfo = {
1542 		.queue_depth = ublk->queue_depth,
1543 		.nr_hw_queues = ublk->num_queues,
1544 		.dev_id = ublk->ublk_id,
1545 		.max_io_buf_bytes = UBLK_IO_MAX_BYTES,
1546 		.ublksrv_pid = getpid(),
1547 		.flags = UBLK_F_URING_CMD_COMP_IN_TASK,
1548 	};
1549 	struct ublk_params uparams = {
1550 		.types = UBLK_PARAM_TYPE_BASIC,
1551 		.basic = {
1552 			.logical_bs_shift = spdk_u32log2(blk_size),
1553 			.physical_bs_shift = spdk_u32log2(pblk_size),
1554 			.io_min_shift = spdk_u32log2(io_min_size),
1555 			.io_opt_shift = spdk_u32log2(io_opt_size),
1556 			.dev_sectors = num_blocks * sectors_per_block,
1557 			.max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT,
1558 		}
1559 	};
1560 
1561 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1562 		uparams.types |= UBLK_PARAM_TYPE_DISCARD;
1563 		uparams.discard.discard_alignment = sectors_per_block;
1564 		uparams.discard.max_discard_sectors = num_blocks * sectors_per_block;
1565 		uparams.discard.max_discard_segments = 1;
1566 		uparams.discard.discard_granularity = blk_size;
1567 		if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1568 			uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block;
1569 		}
1570 	}
1571 
1572 	if (g_ublk_tgt.user_copy) {
1573 		uinfo.flags |= UBLK_F_USER_COPY;
1574 	} else {
1575 		uinfo.flags |= UBLK_F_NEED_GET_DATA;
1576 	}
1577 
1578 	ublk->dev_info = uinfo;
1579 	ublk->dev_params = uparams;
1580 }
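
/*
 * Example (hypothetical bdev with a 4096-byte block size and 1048576 blocks):
 * sectors_per_block = 4096 >> 9 = 8, logical_bs_shift = 12, and
 * dev_sectors = 1048576 * 8 = 8388608, i.e. the 4 GiB bdev is exported to the
 * kernel as 8388608 512-byte sectors.
 */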
1581 
1582 static void
1583 _ublk_free_dev(void *arg)
1584 {
1585 	struct spdk_ublk_dev *ublk = arg;
1586 
1587 	ublk_free_dev(ublk);
1588 }
1589 
1590 static void
1591 free_buffers(void *arg)
1592 {
1593 	struct ublk_queue *q = arg;
1594 	uint32_t i;
1595 
1596 	for (i = 0; i < q->q_depth; i++) {
1597 		ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch);
1598 	}
1599 	free(q->ios);
1600 	q->ios = NULL;
1601 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev);
1602 }
1603 
1604 static void
1605 ublk_free_dev(struct spdk_ublk_dev *ublk)
1606 {
1607 	struct ublk_queue *q;
1608 	uint32_t q_idx;
1609 
1610 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
1611 		q = &ublk->queues[q_idx];
1612 
1613 		/* The ublk_io structures of this queue are not initialized. */
1614 		if (q->ios == NULL) {
1615 			continue;
1616 		}
1617 
1618 		/* We found a queue that has an ios array that may have buffers
1619 		 * that need to be freed.  Send a message to the queue's thread
1620 		 * so it can free the buffers back to that thread's iobuf channel.
1621 		 * When it's done, it will set q->ios to NULL and send a message
1622 		 * back to this function to continue.
1623 		 */
1624 		if (q->poll_group) {
1625 			spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q);
1626 			return;
1627 		} else {
1628 			free(q->ios);
1629 			q->ios = NULL;
1630 		}
1631 	}
1632 
1633 	/* All of the buffers associated with the queues have been freed, so now
1634 	 * continue with releasing resources for the rest of the ublk device.
1635 	 */
1636 	if (ublk->bdev_desc) {
1637 		spdk_bdev_close(ublk->bdev_desc);
1638 		ublk->bdev_desc = NULL;
1639 	}
1640 
1641 	ublk_dev_list_unregister(ublk);
1642 
1643 	if (ublk->del_cb) {
1644 		ublk->del_cb(ublk->cb_arg);
1645 	}
1646 	SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id);
1647 	free(ublk);
1648 }
1649 
1650 static int
1651 ublk_ios_init(struct spdk_ublk_dev *ublk)
1652 {
1653 	int rc;
1654 	uint32_t i, j;
1655 	struct ublk_queue *q;
1656 
1657 	for (i = 0; i < ublk->num_queues; i++) {
1658 		q = &ublk->queues[i];
1659 
1660 		TAILQ_INIT(&q->completed_io_list);
1661 		TAILQ_INIT(&q->inflight_io_list);
1662 		q->dev = ublk;
1663 		q->q_id = i;
1664 		q->q_depth = ublk->queue_depth;
1665 		q->ios = calloc(q->q_depth, sizeof(struct ublk_io));
1666 		if (!q->ios) {
1667 			rc = -ENOMEM;
1668 			SPDK_ERRLOG("could not allocate queue ios\n");
1669 			goto err;
1670 		}
1671 		for (j = 0; j < q->q_depth; j++) {
1672 			q->ios[j].q = q;
1673 		}
1674 	}
1675 
1676 	return 0;
1677 
1678 err:
1679 	for (i = 0; i < ublk->num_queues; i++) {
1680 		free(ublk->queues[i].ios);
1681 		ublk->queues[i].ios = NULL;
1682 	}
1683 	return rc;
1684 }
1685 
1686 static void
1687 ublk_queue_run(void *arg1)
1688 {
1689 	struct ublk_queue	*q = arg1;
1690 	struct spdk_ublk_dev *ublk = q->dev;
1691 	struct ublk_poll_group *poll_group = q->poll_group;
1692 
1693 	assert(spdk_get_thread() == poll_group->ublk_thread);
1694 	q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc);
1695 	/* Queues must be filled with IO in the io pthread */
1696 	ublk_dev_queue_io_init(q);
1697 
1698 	TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq);
1699 }
1700 
1701 int
1702 ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
1703 		uint32_t num_queues, uint32_t queue_depth,
1704 		ublk_start_cb start_cb, void *cb_arg)
1705 {
1706 	int			rc;
1707 	uint32_t		i;
1708 	struct spdk_bdev	*bdev;
1709 	struct spdk_ublk_dev	*ublk = NULL;
1710 	uint32_t		sector_per_block;
1711 
1712 	assert(spdk_thread_is_app_thread(NULL));
1713 
1714 	if (g_ublk_tgt.active == false) {
1715 		SPDK_ERRLOG("No ublk target exists\n");
1716 		return -ENODEV;
1717 	}
1718 
1719 	ublk = ublk_dev_find_by_id(ublk_id);
1720 	if (ublk != NULL) {
1721 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
1722 		return -EBUSY;
1723 	}
1724 
1725 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
1726 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
1727 		return -ENOTSUP;
1728 	}
1729 
1730 	ublk = calloc(1, sizeof(*ublk));
1731 	if (ublk == NULL) {
1732 		return -ENOMEM;
1733 	}
1734 	ublk->start_cb = start_cb;
1735 	ublk->cb_arg = cb_arg;
1736 	ublk->cdev_fd = -1;
1737 	ublk->ublk_id = ublk_id;
1738 	UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n",
1739 		      bdev_name, num_queues, queue_depth);
1740 
1741 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
1742 	if (rc != 0) {
1743 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
1744 		free(ublk);
1745 		return rc;
1746 	}
1747 
1748 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
1749 	ublk->bdev = bdev;
1750 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
1751 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
1752 
1753 	ublk->queues_closed = 0;
1754 	ublk->num_queues = num_queues;
1755 	ublk->queue_depth = queue_depth;
1756 	if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) {
1757 		SPDK_WARNLOG("Setting queue depth %d of UBLK %d to maximum %d\n",
1758 			     ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH);
1759 		ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH;
1760 	}
1761 	if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) {
1762 		SPDK_WARNLOG("Setting queue count %d of UBLK %d to maximum %d\n",
1763 			     ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES);
1764 		ublk->num_queues = UBLK_DEV_MAX_QUEUES;
1765 	}
1766 	for (i = 0; i < ublk->num_queues; i++) {
1767 		ublk->queues[i].ring.ring_fd = -1;
1768 	}
1769 
1770 	ublk_info_param_init(ublk);
1771 	rc = ublk_ios_init(ublk);
1772 	if (rc != 0) {
1773 		spdk_bdev_close(ublk->bdev_desc);
1774 		free(ublk);
1775 		return rc;
1776 	}
1777 
1778 	SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n",
1779 		     bdev_name, ublk_id);
1780 
1781 	/* Add ublk_dev to the end of disk list */
1782 	ublk_dev_list_register(ublk);
1783 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_ADD_DEV);
1784 	if (rc < 0) {
1785 		SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1786 		ublk_free_dev(ublk);
1787 	}
1788 
1789 	return rc;
1790 }
1791 
1792 static void
1793 ublk_finish_start(struct spdk_ublk_dev *ublk)
1794 {
1795 	int			rc;
1796 	uint32_t		q_id;
1797 	struct spdk_thread	*ublk_thread;
1798 	char			buf[64];
1799 
1800 	snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id);
1801 	ublk->cdev_fd = open(buf, O_RDWR);
1802 	if (ublk->cdev_fd < 0) {
1803 		rc = -errno;
1804 		SPDK_ERRLOG("can't open %s, error=%s\n", buf, spdk_strerror(-rc));
1805 		goto err;
1806 	}
1807 
1808 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1809 		rc = ublk_dev_queue_init(&ublk->queues[q_id]);
1810 		if (rc) {
1811 			goto err;
1812 		}
1813 	}
1814 
1815 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_START_DEV);
1816 	if (rc < 0) {
1817 		SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id,
1818 			    spdk_strerror(-rc));
1819 		goto err;
1820 	}
1821 
1822 	/* Send queue to different spdk_threads for load balance */
1823 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1824 		ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_groups[g_next_ublk_poll_group];
1825 		ublk_thread = g_ublk_tgt.poll_groups[g_next_ublk_poll_group].ublk_thread;
1826 		spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]);
1827 		g_next_ublk_poll_group++;
1828 		if (g_next_ublk_poll_group == g_num_ublk_poll_groups) {
1829 			g_next_ublk_poll_group = 0;
1830 		}
1831 	}
1832 
1833 	goto out;
1834 
1835 err:
1836 	ublk_delete_dev(ublk);
1837 out:
1838 	if (ublk->start_cb) {
1839 		ublk->start_cb(ublk->cb_arg, rc);
1840 		ublk->start_cb = NULL;
1841 	}
1842 }
1843 
1844 SPDK_LOG_REGISTER_COMPONENT(ublk)
1845 SPDK_LOG_REGISTER_COMPONENT(ublk_io)
1846