xref: /spdk/lib/ublk/ublk.c (revision 1e3d25b901a6b9d2dce4999e2ecbc02f98d79f05)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <liburing.h>
7 
8 #include "spdk/stdinc.h"
9 #include "spdk/string.h"
10 #include "spdk/bdev.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/likely.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/queue.h"
17 #include "spdk/json.h"
18 #include "spdk/ublk.h"
19 #include "spdk/thread.h"
20 
21 #include "ublk_internal.h"
22 
23 #define UBLK_CTRL_DEV					"/dev/ublk-control"
24 #define UBLK_BLK_CDEV					"/dev/ublkc"
25 
26 #define LINUX_SECTOR_SHIFT				9
27 #define UBLK_IO_MAX_BYTES				SPDK_BDEV_LARGE_BUF_MAX_SIZE
28 #define UBLK_DEV_MAX_QUEUES				32
29 #define UBLK_DEV_MAX_QUEUE_DEPTH			1024
30 #define UBLK_QUEUE_REQUEST				32
31 #define UBLK_STOP_BUSY_WAITING_MS			10000
32 #define UBLK_BUSY_POLLING_INTERVAL_US			20000
33 #define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US	1000
34 /* By default, the kernel ublk_drv driver supports up to 64 block devices */
35 #define UBLK_DEFAULT_MAX_SUPPORTED_DEVS			64
36 
37 #define UBLK_IOBUF_SMALL_CACHE_SIZE			128
38 #define UBLK_IOBUF_LARGE_CACHE_SIZE			32
39 
40 #define UBLK_DEBUGLOG(ublk, format, ...) \
41 	SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__);
42 
43 static uint32_t g_num_ublk_poll_groups = 0;
44 static uint32_t g_next_ublk_poll_group = 0;
45 static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS;
46 static struct spdk_cpuset g_core_mask;
47 
48 struct ublk_queue;
49 struct ublk_poll_group;
50 struct ublk_io;
51 static void _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io);
52 static void ublk_dev_queue_fini(struct ublk_queue *q);
53 static int ublk_poll(void *arg);
54 static int ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op);
55 
56 static void ublk_set_params(struct spdk_ublk_dev *ublk);
57 static void ublk_finish_start(struct spdk_ublk_dev *ublk);
58 static void ublk_free_dev(struct spdk_ublk_dev *ublk);
59 static void ublk_delete_dev(void *arg);
60 static int ublk_close_dev(struct spdk_ublk_dev *ublk);
61 
62 static const char *ublk_op_name[64]
63 __attribute__((unused)) = {
64 	[UBLK_CMD_ADD_DEV] =	"UBLK_CMD_ADD_DEV",
65 	[UBLK_CMD_DEL_DEV] =	"UBLK_CMD_DEL_DEV",
66 	[UBLK_CMD_START_DEV] =	"UBLK_CMD_START_DEV",
67 	[UBLK_CMD_STOP_DEV] =	"UBLK_CMD_STOP_DEV",
68 	[UBLK_CMD_SET_PARAMS] =	"UBLK_CMD_SET_PARAMS",
69 };
70 
71 typedef void (*ublk_get_buf_cb)(struct ublk_io *io);
72 
73 struct ublk_io {
74 	void			*payload;
75 	void			*mpool_entry;
76 	bool			need_data;
77 	bool			user_copy;
78 	uint16_t		tag;
79 	uint64_t		payload_size;
80 	uint32_t		cmd_op;
81 	int32_t			result;
82 	struct spdk_bdev_desc	*bdev_desc;
83 	struct spdk_io_channel	*bdev_ch;
84 	const struct ublksrv_io_desc	*iod;
85 	ublk_get_buf_cb		get_buf_cb;
86 	struct ublk_queue	*q;
87 	/* for bdev io_wait */
88 	struct spdk_bdev_io_wait_entry bdev_io_wait;
89 	struct spdk_iobuf_entry	iobuf;
90 
91 	TAILQ_ENTRY(ublk_io)	tailq;
92 };
93 
94 struct ublk_queue {
95 	uint32_t		q_id;
96 	uint32_t		q_depth;
97 	struct ublk_io		*ios;
98 	TAILQ_HEAD(, ublk_io)	completed_io_list;
99 	TAILQ_HEAD(, ublk_io)	inflight_io_list;
100 	uint32_t		cmd_inflight;
101 	bool			is_stopping;
102 	struct ublksrv_io_desc	*io_cmd_buf;
103 	/* ring depth == dev_info->queue_depth. */
104 	struct io_uring		ring;
105 	struct spdk_ublk_dev	*dev;
106 	struct ublk_poll_group	*poll_group;
107 	struct spdk_io_channel	*bdev_ch;
108 
109 	TAILQ_ENTRY(ublk_queue)	tailq;
110 };
111 
112 struct spdk_ublk_dev {
113 	struct spdk_bdev	*bdev;
114 	struct spdk_bdev_desc	*bdev_desc;
115 
116 	int			cdev_fd;
117 	struct ublk_params	dev_params;
118 	struct ublksrv_ctrl_dev_info	dev_info;
119 
120 	uint32_t		ublk_id;
121 	uint32_t		num_queues;
122 	uint32_t		queue_depth;
123 	uint32_t		sector_per_block_shift;
124 	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];
125 
126 	struct spdk_poller	*retry_poller;
127 	int			retry_count;
128 	uint32_t		queues_closed;
129 	ublk_start_cb		start_cb;
130 	ublk_del_cb		del_cb;
131 	void			*cb_arg;
132 	uint32_t		current_cmd_op;
133 	uint32_t		ctrl_ops_in_progress;
134 	bool			is_closing;
135 
136 	TAILQ_ENTRY(spdk_ublk_dev) tailq;
137 	TAILQ_ENTRY(spdk_ublk_dev) wait_tailq;
138 };
139 
140 struct ublk_poll_group {
141 	struct spdk_thread		*ublk_thread;
142 	struct spdk_poller		*ublk_poller;
143 	struct spdk_iobuf_channel	iobuf_ch;
144 	TAILQ_HEAD(, ublk_queue)	queue_list;
145 };
146 
147 struct ublk_tgt {
148 	int			ctrl_fd;
149 	bool			active;
150 	bool			is_destroying;
151 	spdk_ublk_fini_cb	cb_fn;
152 	void			*cb_arg;
153 	struct io_uring		ctrl_ring;
154 	struct spdk_poller	*ctrl_poller;
155 	uint32_t		ctrl_ops_in_progress;
156 	struct ublk_poll_group	*poll_groups;
157 	uint32_t		num_ublk_devs;
158 	uint64_t		features;
159 	/* `ublk_drv` supports UBLK_F_CMD_IOCTL_ENCODE */
160 	bool			ioctl_encode;
161 	/* `ublk_drv` supports UBLK_F_USER_COPY */
162 	bool			user_copy;
163 };
164 
165 static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs);
166 static struct ublk_tgt g_ublk_tgt;
167 
168 /* helpers for using io_uring */
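/*
 * ublk_setup_ring() creates an io_uring with an explicitly sized completion queue
 * (IORING_SETUP_CQSIZE with cq_entries == depth) on top of whatever flags the
 * caller passes, so the CQ can hold one completion per outstanding command.
 */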
169 static inline int
170 ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags)
171 {
172 	struct io_uring_params p = {};
173 
174 	p.flags = flags | IORING_SETUP_CQSIZE;
175 	p.cq_entries = depth;
176 
177 	return io_uring_queue_init_params(depth, r, &p);
178 }
179 
180 static inline struct io_uring_sqe *
181 ublk_uring_get_sqe(struct io_uring *r, uint32_t idx)
182 {
183 	/* Need to update the idx since we set IORING_SETUP_SQE128 parameter in ublk_setup_ring */
184 	return &r->sq.sqes[idx << 1];
185 }
186 
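/*
 * For IORING_OP_URING_CMD SQEs, the command payload (struct ublksrv_ctrl_cmd or
 * struct ublksrv_io_cmd) is written starting at the SQE's addr3 field, spilling
 * into the extra 64 bytes provided by IORING_SETUP_SQE128 when the payload does
 * not fit in the base SQE.
 */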
187 static inline void *
188 ublk_get_sqe_cmd(struct io_uring_sqe *sqe)
189 {
190 	return (void *)&sqe->addr3;
191 }
192 
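/*
 * When ublk_drv advertises UBLK_F_CMD_IOCTL_ENCODE, command opcodes are sent
 * ioctl-encoded (_IOR/_IOWR); otherwise the legacy plain enum values are used.
 * In the liburing SQE layout the 32-bit cmd_op field shares storage with
 * sqe->off, which is why the encoded opcode is stored via sqe->off below.
 */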
193 static inline void
194 ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op)
195 {
196 	uint32_t opc = cmd_op;
197 
198 	if (g_ublk_tgt.ioctl_encode) {
199 		switch (cmd_op) {
200 		/* ctrl uring */
201 		case UBLK_CMD_GET_DEV_INFO:
202 			opc = _IOR('u', UBLK_CMD_GET_DEV_INFO, struct ublksrv_ctrl_cmd);
203 			break;
204 		case UBLK_CMD_ADD_DEV:
205 			opc = _IOWR('u', UBLK_CMD_ADD_DEV, struct ublksrv_ctrl_cmd);
206 			break;
207 		case UBLK_CMD_DEL_DEV:
208 			opc = _IOWR('u', UBLK_CMD_DEL_DEV, struct ublksrv_ctrl_cmd);
209 			break;
210 		case UBLK_CMD_START_DEV:
211 			opc = _IOWR('u', UBLK_CMD_START_DEV, struct ublksrv_ctrl_cmd);
212 			break;
213 		case UBLK_CMD_STOP_DEV:
214 			opc = _IOWR('u', UBLK_CMD_STOP_DEV, struct ublksrv_ctrl_cmd);
215 			break;
216 		case UBLK_CMD_SET_PARAMS:
217 			opc = _IOWR('u', UBLK_CMD_SET_PARAMS, struct ublksrv_ctrl_cmd);
218 			break;
219 
220 		/* io uring */
221 		case UBLK_IO_FETCH_REQ:
222 			opc = _IOWR('u', UBLK_IO_FETCH_REQ, struct ublksrv_io_cmd);
223 			break;
224 		case UBLK_IO_COMMIT_AND_FETCH_REQ:
225 			opc = _IOWR('u', UBLK_IO_COMMIT_AND_FETCH_REQ, struct ublksrv_io_cmd);
226 			break;
227 		case UBLK_IO_NEED_GET_DATA:
228 			opc = _IOWR('u', UBLK_IO_NEED_GET_DATA, struct ublksrv_io_cmd);
229 			break;
230 		default:
231 			break;
232 		}
233 	}
234 
235 	sqe->off = opc;
236 }
237 
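/*
 * The 64-bit user_data of each SQE encodes the io tag in bits 0-15 and the ublk
 * command opcode in bits 16-23, so completions can be matched back to the right
 * struct ublk_io in ublk_io_recv().
 */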
238 static inline uint64_t
239 build_user_data(uint16_t tag, uint8_t op)
240 {
241 	assert(!(tag >> 16) && !(op >> 8));
242 
243 	return tag | (op << 16);
244 }
245 
246 static inline uint16_t
247 user_data_to_tag(uint64_t user_data)
248 {
249 	return user_data & 0xffff;
250 }
251 
252 static inline uint8_t
253 user_data_to_op(uint64_t user_data)
254 {
255 	return (user_data >> 16) & 0xff;
256 }
257 
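/*
 * With UBLK_F_USER_COPY, I/O payloads are moved by reading from or writing to the
 * per-device char device at a special offset that encodes the queue id and tag.
 * ublk_user_copy_pos() builds that offset from UBLKSRV_IO_BUF_OFFSET and the
 * UBLK_QID_OFF/UBLK_TAG_OFF shifts defined by the ublk UAPI.
 */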
258 static inline uint64_t
259 ublk_user_copy_pos(uint16_t q_id, uint16_t tag)
260 {
261 	return (uint64_t)UBLKSRV_IO_BUF_OFFSET + ((((uint64_t)q_id) << UBLK_QID_OFF) | (((
262 				uint64_t)tag) << UBLK_TAG_OFF));
263 }
264 
265 void
266 spdk_ublk_init(void)
267 {
268 	assert(spdk_thread_is_app_thread(NULL));
269 
270 	g_ublk_tgt.ctrl_fd = -1;
271 	g_ublk_tgt.ctrl_ring.ring_fd = -1;
272 }
273 
274 static void
275 ublk_ctrl_cmd_error(struct spdk_ublk_dev *ublk, int32_t res)
276 {
277 	assert(res != 0);
278 
279 	SPDK_ERRLOG("ctrlr cmd %s failed, %s\n", ublk_op_name[ublk->current_cmd_op], spdk_strerror(-res));
280 	switch (ublk->current_cmd_op) {
281 	case UBLK_CMD_ADD_DEV:
282 	case UBLK_CMD_SET_PARAMS:
283 		if (ublk->start_cb) {
284 			ublk->start_cb(ublk->cb_arg, res);
285 			ublk->start_cb = NULL;
286 		}
287 
288 		ublk_delete_dev(ublk);
289 		break;
290 	case UBLK_CMD_START_DEV:
291 		if (ublk->start_cb) {
292 			ublk->start_cb(ublk->cb_arg, res);
293 			ublk->start_cb = NULL;
294 		}
295 
296 		ublk_close_dev(ublk);
297 		break;
298 	case UBLK_CMD_STOP_DEV:
299 		/* TODO: process stop cmd failure */
300 		break;
301 	case UBLK_CMD_DEL_DEV:
302 		/* TODO: process del cmd failure */
303 		break;
304 	default:
305 		SPDK_ERRLOG("No matching cmd operation, cmd_op = %d\n", ublk->current_cmd_op);
306 		break;
307 	}
308 }
309 
310 static void
311 ublk_ctrl_process_cqe(struct io_uring_cqe *cqe)
312 {
313 	struct spdk_ublk_dev *ublk;
314 
315 	ublk = (struct spdk_ublk_dev *)cqe->user_data;
316 	UBLK_DEBUGLOG(ublk, "ctrl cmd completed\n");
317 	ublk->ctrl_ops_in_progress--;
318 
319 	if (spdk_unlikely(cqe->res != 0)) {
320 		SPDK_ERRLOG("ctrlr cmd failed\n");
321 		ublk_ctrl_cmd_error(ublk, cqe->res);
322 		return;
323 	}
324 
325 	switch (ublk->current_cmd_op) {
326 	case UBLK_CMD_ADD_DEV:
327 		ublk_set_params(ublk);
328 		break;
329 	case UBLK_CMD_SET_PARAMS:
330 		ublk_finish_start(ublk);
331 		break;
332 	case UBLK_CMD_START_DEV:
333 		if (ublk->start_cb) {
334 			ublk->start_cb(ublk->cb_arg, 0);
335 			ublk->start_cb = NULL;
336 		}
337 		break;
338 	case UBLK_CMD_STOP_DEV:
339 		break;
340 	case UBLK_CMD_DEL_DEV:
341 		ublk_free_dev(ublk);
342 		break;
343 	default:
344 		SPDK_ERRLOG("No matching cmd operation, cmd_op = %d\n", ublk->current_cmd_op);
345 		break;
346 	}
347 }
348 
349 static int
350 ublk_ctrl_poller(void *arg)
351 {
352 	struct io_uring *ring = &g_ublk_tgt.ctrl_ring;
353 	struct io_uring_cqe *cqe;
354 	const int max = 8;
355 	int i, count = 0, rc;
356 
357 	if (!g_ublk_tgt.ctrl_ops_in_progress) {
358 		return SPDK_POLLER_IDLE;
359 	}
360 
361 	for (i = 0; i < max; i++) {
362 		rc = io_uring_peek_cqe(ring, &cqe);
363 		if (rc == -EAGAIN) {
364 			break;
365 		}
366 
367 		assert(cqe != NULL);
368 		g_ublk_tgt.ctrl_ops_in_progress--;
369 
370 		ublk_ctrl_process_cqe(cqe);
371 
372 		io_uring_cqe_seen(ring, cqe);
373 		count++;
374 	}
375 
376 	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
377 }
378 
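/*
 * ublk_ctrl_cmd() submits one asynchronous control command (ADD/DEL/START/STOP/
 * SET_PARAMS) on the shared control ring.  The spdk_ublk_dev pointer is stashed
 * in user_data, and the completion is consumed by ublk_ctrl_poller(), which
 * drives the next step of the device state machine in ublk_ctrl_process_cqe().
 */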
379 static int
380 ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op)
381 {
382 	uint32_t dev_id = ublk->ublk_id;
383 	int rc = -EINVAL;
384 	struct io_uring_sqe *sqe;
385 	struct ublksrv_ctrl_cmd *cmd;
386 
387 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]);
388 
389 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
390 	if (!sqe) {
391 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
392 		assert(false);
393 		return -ENOENT;
394 	}
395 
396 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
397 	sqe->fd = g_ublk_tgt.ctrl_fd;
398 	sqe->opcode = IORING_OP_URING_CMD;
399 	sqe->ioprio = 0;
400 	cmd->dev_id = dev_id;
401 	cmd->queue_id = -1;
402 	ublk->current_cmd_op = cmd_op;
403 
404 	switch (cmd_op) {
405 	case UBLK_CMD_ADD_DEV:
406 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_info;
407 		cmd->len = sizeof(ublk->dev_info);
408 		break;
409 	case UBLK_CMD_SET_PARAMS:
410 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_params;
411 		cmd->len = sizeof(ublk->dev_params);
412 		break;
413 	case UBLK_CMD_START_DEV:
414 		cmd->data[0] = getpid();
415 		break;
416 	case UBLK_CMD_STOP_DEV:
417 		break;
418 	case UBLK_CMD_DEL_DEV:
419 		break;
420 	default:
421 		SPDK_ERRLOG("No matching cmd operation, cmd_op = %d\n", cmd_op);
422 		return -EINVAL;
423 	}
424 	ublk_set_sqe_cmd_op(sqe, cmd_op);
425 	io_uring_sqe_set_data(sqe, ublk);
426 
427 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
428 	if (rc < 0) {
429 		SPDK_ERRLOG("uring submit rc %d\n", rc);
430 		return rc;
431 	}
432 	g_ublk_tgt.ctrl_ops_in_progress++;
433 	ublk->ctrl_ops_in_progress++;
434 
435 	return 0;
436 }
437 
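/*
 * Unlike the other control commands, GET_FEATURES is issued synchronously during
 * ublk_open() (io_uring_wait_cqe), because the returned feature flags determine
 * how all later SQEs are encoded and whether user-copy mode is used.
 */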
438 static int
439 ublk_ctrl_cmd_get_features(void)
440 {
441 	int rc;
442 	struct io_uring_sqe *sqe;
443 	struct io_uring_cqe *cqe;
444 	struct ublksrv_ctrl_cmd *cmd;
445 	uint32_t cmd_op;
446 
447 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
448 	if (!sqe) {
449 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
450 		assert(false);
451 		return -ENOENT;
452 	}
453 
454 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
455 	sqe->fd = g_ublk_tgt.ctrl_fd;
456 	sqe->opcode = IORING_OP_URING_CMD;
457 	sqe->ioprio = 0;
458 	cmd->dev_id = -1;
459 	cmd->queue_id = -1;
460 	cmd->addr = (__u64)(uintptr_t)&g_ublk_tgt.features;
461 	cmd->len = sizeof(g_ublk_tgt.features);
462 
463 	cmd_op = UBLK_U_CMD_GET_FEATURES;
464 	ublk_set_sqe_cmd_op(sqe, cmd_op);
465 
466 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
467 	if (rc < 0) {
468 		SPDK_ERRLOG("uring submit rc %d\n", rc);
469 		return rc;
470 	}
471 
472 	rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe);
473 	if (rc < 0) {
474 		SPDK_ERRLOG("wait cqe rc %d\n", rc);
475 		return rc;
476 	}
477 
478 	if (cqe->res == 0) {
479 		g_ublk_tgt.ioctl_encode = !!(g_ublk_tgt.features & UBLK_F_CMD_IOCTL_ENCODE);
480 		g_ublk_tgt.user_copy = !!(g_ublk_tgt.features & UBLK_F_USER_COPY);
481 	}
482 	io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe);
483 
484 	return 0;
485 }
486 
487 static int
488 ublk_queue_cmd_buf_sz(uint32_t q_depth)
489 {
490 	uint32_t size = q_depth * sizeof(struct ublksrv_io_desc);
491 	uint32_t page_sz = getpagesize();
492 
493 	/* round up size */
494 	return (size + page_sz - 1) & ~(page_sz - 1);
495 }
496 
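/*
 * Read the ublks_max module parameter of ublk_drv from sysfs.  The value is used
 * to size the control ring; if it cannot be read, the caller keeps the default of
 * UBLK_DEFAULT_MAX_SUPPORTED_DEVS.
 */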
497 static int
498 ublk_get_max_support_devs(void)
499 {
500 	FILE *file;
501 	char str[128];
502 
503 	file = fopen("/sys/module/ublk_drv/parameters/ublks_max", "r");
504 	if (!file) {
505 		return -ENOENT;
506 	}
507 
508 	if (!fgets(str, sizeof(str), file)) {
509 		fclose(file);
510 		return -EINVAL;
511 	}
512 	fclose(file);
513 
514 	spdk_str_chomp(str);
515 	return spdk_strtol(str, 10);
516 }
517 
518 static int
519 ublk_open(void)
520 {
521 	int rc, ublks_max;
522 
523 	g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR);
524 	if (g_ublk_tgt.ctrl_fd < 0) {
525 		rc = errno;
526 		SPDK_ERRLOG("UBLK control dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno));
527 		return -rc;
528 	}
529 
530 	ublks_max = ublk_get_max_support_devs();
531 	if (ublks_max > 0) {
532 		g_ublks_max = ublks_max;
533 	}
534 
535 	/* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl
536 	 * ring processing to a workqueue.  Ctrl ring processing is minimal, so SQPOLL is fine.
537 	 * All the commands sent via the control uring for a ublk device are executed one by one,
538 	 * so using ublks_max * 2 uring entries is enough.
539 	 */
540 	rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring,
541 			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
542 	if (rc < 0) {
543 		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
544 		goto err;
545 	}
546 
547 	rc = ublk_ctrl_cmd_get_features();
548 	if (rc) {
549 		goto err;
550 	}
551 
552 	return 0;
553 
554 err:
555 	close(g_ublk_tgt.ctrl_fd);
556 	g_ublk_tgt.ctrl_fd = -1;
557 	return rc;
558 }
559 
560 static int
561 ublk_parse_core_mask(const char *mask)
562 {
563 	struct spdk_cpuset tmp_mask;
564 	int rc;
565 
566 	if (mask == NULL) {
567 		spdk_env_get_cpuset(&g_core_mask);
568 		return 0;
569 	}
570 
571 	rc = spdk_cpuset_parse(&g_core_mask, mask);
572 	if (rc < 0) {
573 		SPDK_ERRLOG("invalid cpumask %s\n", mask);
574 		return -EINVAL;
575 	}
576 
577 	if (spdk_cpuset_count(&g_core_mask) == 0) {
578 		SPDK_ERRLOG("no cpus specified\n");
579 		return -EINVAL;
580 	}
581 
582 	spdk_env_get_cpuset(&tmp_mask);
583 	spdk_cpuset_and(&tmp_mask, &g_core_mask);
584 
585 	if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) {
586 		SPDK_ERRLOG("one of the selected cpus is outside of the core mask (=%s)\n",
587 			    spdk_cpuset_fmt(&g_core_mask));
588 		return -EINVAL;
589 	}
590 
591 	return 0;
592 }
593 
594 static void
595 ublk_poller_register(void *args)
596 {
597 	struct ublk_poll_group *poll_group = args;
598 	int rc;
599 
600 	assert(spdk_get_thread() == poll_group->ublk_thread);
601 	/* Bind the ublk spdk_thread to the current CPU core to avoid thread context switches
602 	 * during uring processing, as required by the ublk kernel driver.
603 	 */
604 	spdk_thread_bind(spdk_get_thread(), true);
605 
606 	TAILQ_INIT(&poll_group->queue_list);
607 	poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0);
608 	rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk",
609 				     UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE);
610 	if (rc != 0) {
611 		assert(false);
612 	}
613 }
614 
615 int
616 ublk_create_target(const char *cpumask_str)
617 {
618 	int rc;
619 	uint32_t i;
620 	char thread_name[32];
621 	struct ublk_poll_group *poll_group;
622 
623 	if (g_ublk_tgt.active == true) {
624 		SPDK_ERRLOG("UBLK target has already been created\n");
625 		return -EBUSY;
626 	}
627 
628 	rc = ublk_parse_core_mask(cpumask_str);
629 	if (rc != 0) {
630 		return rc;
631 	}
632 
633 	assert(g_ublk_tgt.poll_groups == NULL);
634 	g_ublk_tgt.poll_groups = calloc(spdk_env_get_core_count(), sizeof(*poll_group));
635 	if (!g_ublk_tgt.poll_groups) {
636 		return -ENOMEM;
637 	}
638 
639 	rc = ublk_open();
640 	if (rc != 0) {
641 		SPDK_ERRLOG("Failed to open UBLK, error=%s\n", spdk_strerror(-rc));
642 		free(g_ublk_tgt.poll_groups);
643 		g_ublk_tgt.poll_groups = NULL;
644 		return rc;
645 	}
646 
647 	spdk_iobuf_register_module("ublk");
648 
649 	SPDK_ENV_FOREACH_CORE(i) {
650 		if (!spdk_cpuset_get_cpu(&g_core_mask, i)) {
651 			continue;
652 		}
653 		snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i);
654 		poll_group = &g_ublk_tgt.poll_groups[g_num_ublk_poll_groups];
655 		poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask);
656 		spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group);
657 		g_num_ublk_poll_groups++;
658 	}
659 
660 	assert(spdk_thread_is_app_thread(NULL));
661 	g_ublk_tgt.active = true;
662 	g_ublk_tgt.ctrl_ops_in_progress = 0;
663 	g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL,
664 				 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US);
665 
666 	SPDK_NOTICELOG("UBLK target created successfully\n");
667 
668 	return 0;
669 }
670 
671 static void
672 _ublk_fini_done(void *args)
673 {
674 	SPDK_DEBUGLOG(ublk, "\n");
675 
676 	g_num_ublk_poll_groups = 0;
677 	g_next_ublk_poll_group = 0;
678 	g_ublk_tgt.is_destroying = false;
679 	g_ublk_tgt.active = false;
680 	g_ublk_tgt.features = 0;
681 	g_ublk_tgt.ioctl_encode = false;
682 	g_ublk_tgt.user_copy = false;
683 
684 	if (g_ublk_tgt.cb_fn) {
685 		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
686 		g_ublk_tgt.cb_fn = NULL;
687 		g_ublk_tgt.cb_arg = NULL;
688 	}
689 
690 	if (g_ublk_tgt.poll_groups) {
691 		free(g_ublk_tgt.poll_groups);
692 		g_ublk_tgt.poll_groups = NULL;
693 	}
694 
695 }
696 
697 static void
698 ublk_thread_exit(void *args)
699 {
700 	struct spdk_thread *ublk_thread = spdk_get_thread();
701 	uint32_t i;
702 
703 	for (i = 0; i < g_num_ublk_poll_groups; i++) {
704 		if (g_ublk_tgt.poll_groups[i].ublk_thread == ublk_thread) {
705 			spdk_poller_unregister(&g_ublk_tgt.poll_groups[i].ublk_poller);
706 			spdk_iobuf_channel_fini(&g_ublk_tgt.poll_groups[i].iobuf_ch);
707 			spdk_thread_bind(ublk_thread, false);
708 			spdk_thread_exit(ublk_thread);
709 		}
710 	}
711 }
712 
713 static int
714 ublk_close_dev(struct spdk_ublk_dev *ublk)
715 {
716 	int rc;
717 
718 	/* set is_closing */
719 	if (ublk->is_closing) {
720 		return -EBUSY;
721 	}
722 	ublk->is_closing = true;
723 
724 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_STOP_DEV);
725 	if (rc < 0) {
726 		SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id);
727 	}
728 	return rc;
729 }
730 
731 static void
732 _ublk_fini(void *args)
733 {
734 	struct spdk_ublk_dev	*ublk, *ublk_tmp;
735 
736 	TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) {
737 		ublk_close_dev(ublk);
738 	}
739 
740 	/* Check if all ublks closed */
741 	if (TAILQ_EMPTY(&g_ublk_devs)) {
742 		SPDK_DEBUGLOG(ublk, "finish shutdown\n");
743 		spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
744 		if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) {
745 			io_uring_queue_exit(&g_ublk_tgt.ctrl_ring);
746 			g_ublk_tgt.ctrl_ring.ring_fd = -1;
747 		}
748 		if (g_ublk_tgt.ctrl_fd >= 0) {
749 			close(g_ublk_tgt.ctrl_fd);
750 			g_ublk_tgt.ctrl_fd = -1;
751 		}
752 		spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done);
753 	} else {
754 		spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL);
755 	}
756 }
757 
758 int
759 spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg)
760 {
761 	assert(spdk_thread_is_app_thread(NULL));
762 
763 	if (g_ublk_tgt.is_destroying == true) {
764 		/* UBLK target is being destroyed */
765 		return -EBUSY;
766 	}
767 	g_ublk_tgt.cb_fn = cb_fn;
768 	g_ublk_tgt.cb_arg = cb_arg;
769 	g_ublk_tgt.is_destroying = true;
770 	_ublk_fini(NULL);
771 
772 	return 0;
773 }
774 
775 int
776 ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg)
777 {
778 	int rc;
779 
780 	if (g_ublk_tgt.active == false) {
781 		/* UBLK target has not been created */
782 		return -ENOENT;
783 	}
784 
785 	rc = spdk_ublk_fini(cb_fn, cb_arg);
786 
787 	return rc;
788 }
789 
790 struct spdk_ublk_dev *
791 ublk_dev_find_by_id(uint32_t ublk_id)
792 {
793 	struct spdk_ublk_dev *ublk;
794 
795 	/* check whether a ublk device with this id has already been registered. */
796 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
797 		if (ublk->ublk_id == ublk_id) {
798 			return ublk;
799 		}
800 	}
801 
802 	return NULL;
803 }
804 
805 uint32_t
806 ublk_dev_get_id(struct spdk_ublk_dev *ublk)
807 {
808 	return ublk->ublk_id;
809 }
810 
811 struct spdk_ublk_dev *ublk_dev_first(void)
812 {
813 	return TAILQ_FIRST(&g_ublk_devs);
814 }
815 
816 struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev)
817 {
818 	return TAILQ_NEXT(prev, tailq);
819 }
820 
821 uint32_t
822 ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk)
823 {
824 	return ublk->queue_depth;
825 }
826 
827 uint32_t
828 ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk)
829 {
830 	return ublk->num_queues;
831 }
832 
833 const char *
834 ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk)
835 {
836 	return spdk_bdev_get_name(ublk->bdev);
837 }
838 
839 void
840 spdk_ublk_write_config_json(struct spdk_json_write_ctx *w)
841 {
842 	struct spdk_ublk_dev *ublk;
843 
844 	spdk_json_write_array_begin(w);
845 
846 	if (g_ublk_tgt.active) {
847 		spdk_json_write_object_begin(w);
848 
849 		spdk_json_write_named_string(w, "method", "ublk_create_target");
850 		spdk_json_write_named_object_begin(w, "params");
851 		spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask));
852 		spdk_json_write_object_end(w);
853 
854 		spdk_json_write_object_end(w);
855 	}
856 
857 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
858 		spdk_json_write_object_begin(w);
859 
860 		spdk_json_write_named_string(w, "method", "ublk_start_disk");
861 
862 		spdk_json_write_named_object_begin(w, "params");
863 		spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk));
864 		spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id);
865 		spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues);
866 		spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth);
867 		spdk_json_write_object_end(w);
868 
869 		spdk_json_write_object_end(w);
870 	}
871 
872 	spdk_json_write_array_end(w);
873 }
874 
875 static void
876 ublk_dev_list_register(struct spdk_ublk_dev *ublk)
877 {
878 	UBLK_DEBUGLOG(ublk, "add to tailq\n");
879 	TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq);
880 	g_ublk_tgt.num_ublk_devs++;
881 }
882 
883 static void
884 ublk_dev_list_unregister(struct spdk_ublk_dev *ublk)
885 {
886 	/*
887 	 * The ublk device may be stopped before it is registered,
888 	 * so check whether it was actually registered.
889 	 */
890 
891 	if (ublk_dev_find_by_id(ublk->ublk_id)) {
892 		UBLK_DEBUGLOG(ublk, "remove from tailq\n");
893 		TAILQ_REMOVE(&g_ublk_devs, ublk, tailq);
894 		assert(g_ublk_tgt.num_ublk_devs);
895 		g_ublk_tgt.num_ublk_devs--;
896 		return;
897 	}
898 
899 	UBLK_DEBUGLOG(ublk, "not found in tailq\n");
900 	assert(false);
901 }
902 
903 static void
904 ublk_delete_dev(void *arg)
905 {
906 	struct spdk_ublk_dev *ublk = arg;
907 	int rc = 0;
908 	uint32_t q_idx;
909 
910 	assert(spdk_thread_is_app_thread(NULL));
911 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
912 		ublk_dev_queue_fini(&ublk->queues[q_idx]);
913 	}
914 
915 	if (ublk->cdev_fd >= 0) {
916 		close(ublk->cdev_fd);
917 	}
918 
919 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_DEL_DEV);
920 	if (rc < 0) {
921 		SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id);
922 	}
923 }
924 
925 static int
926 _ublk_close_dev_retry(void *arg)
927 {
928 	struct spdk_ublk_dev *ublk = arg;
929 
930 	if (ublk->ctrl_ops_in_progress > 0) {
931 		if (ublk->retry_count-- > 0) {
932 			return SPDK_POLLER_BUSY;
933 		}
934 		SPDK_ERRLOG("Timeout on ctrl op completion.\n");
935 	}
936 	spdk_poller_unregister(&ublk->retry_poller);
937 	ublk_delete_dev(ublk);
938 	return SPDK_POLLER_BUSY;
939 }
940 
941 static void
942 ublk_try_close_dev(void *arg)
943 {
944 	struct spdk_ublk_dev *ublk = arg;
945 
946 	assert(spdk_thread_is_app_thread(NULL));
947 
948 	ublk->queues_closed += 1;
949 	SPDK_DEBUGLOG(ublk_io, "ublkb%u closed queues %u\n", ublk->ublk_id, ublk->queues_closed);
950 
951 	if (ublk->queues_closed < ublk->num_queues) {
952 		return;
953 	}
954 
955 	if (ublk->ctrl_ops_in_progress > 0) {
956 		assert(ublk->retry_poller == NULL);
957 		ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US;
958 		ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk,
959 				     UBLK_BUSY_POLLING_INTERVAL_US);
960 	} else {
961 		ublk_delete_dev(ublk);
962 	}
963 }
964 
965 static void
966 ublk_try_close_queue(struct ublk_queue *q)
967 {
968 	struct spdk_ublk_dev *ublk = q->dev;
969 
970 	/* Only close the queue when no I/O is in flight to the bdev,
971 	 * no I/O is waiting to commit its result, and all I/Os have been aborted back.
972 	 */
973 	if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) {
974 		/* wait for next retry */
975 		return;
976 	}
977 
978 	TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq);
979 	spdk_put_io_channel(q->bdev_ch);
980 	q->bdev_ch = NULL;
981 
982 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk);
983 }
984 
985 int
986 ublk_stop_disk(uint32_t ublk_id, ublk_del_cb del_cb, void *cb_arg)
987 {
988 	struct spdk_ublk_dev *ublk;
989 
990 	assert(spdk_thread_is_app_thread(NULL));
991 
992 	ublk = ublk_dev_find_by_id(ublk_id);
993 	if (ublk == NULL) {
994 		SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id);
995 		return -ENODEV;
996 	}
997 	if (ublk->is_closing) {
998 		SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id);
999 		return -EBUSY;
1000 	}
1001 
1002 	ublk->del_cb = del_cb;
1003 	ublk->cb_arg = cb_arg;
1004 	return ublk_close_dev(ublk);
1005 }
1006 
1007 static inline void
1008 ublk_mark_io_done(struct ublk_io *io, int res)
1009 {
1010 	/*
1011 	 * mark io done by target, so that SPDK can commit its
1012 	 * result and fetch new request via io_uring command.
1013 	 */
1014 	io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
1015 	io->result = res;
1016 	io->need_data = false;
1017 }
1018 
1019 static void
1020 ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1021 {
1022 	struct ublk_io	*io = cb_arg;
1023 	struct ublk_queue *q = io->q;
1024 	int res;
1025 
1026 	if (success) {
1027 		res = io->result;
1028 	} else {
1029 		res = -EIO;
1030 	}
1031 
1032 	ublk_mark_io_done(io, res);
1033 
1034 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n",
1035 		      q->q_id, io->tag, res);
1036 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1037 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1038 
1039 	if (bdev_io != NULL) {
1040 		spdk_bdev_free_io(bdev_io);
1041 	}
1042 }
1043 
1044 static void
1045 ublk_queue_user_copy(struct ublk_io *io, bool is_write)
1046 {
1047 	struct ublk_queue *q = io->q;
1048 	const struct ublksrv_io_desc *iod = io->iod;
1049 	struct io_uring_sqe *sqe;
1050 	uint64_t pos;
1051 	uint32_t nbytes;
1052 
1053 	nbytes = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1054 	pos = ublk_user_copy_pos(q->q_id, io->tag);
1055 	sqe = io_uring_get_sqe(&q->ring);
1056 	assert(sqe);
1057 
1058 	if (is_write) {
1059 		io_uring_prep_read(sqe, 0, io->payload, nbytes, pos);
1060 	} else {
1061 		io_uring_prep_write(sqe, 0, io->payload, nbytes, pos);
1062 	}
1063 	io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
1064 	io_uring_sqe_set_data64(sqe, build_user_data(io->tag, 0));
1065 
1066 	io->user_copy = true;
1067 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1068 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1069 }
1070 
1071 static void
1072 ublk_user_copy_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1073 {
1074 	struct ublk_io	*io = cb_arg;
1075 
1076 	spdk_bdev_free_io(bdev_io);
1077 
1078 	if (success) {
1079 		ublk_queue_user_copy(io, false);
1080 		return;
1081 	}
1082 	/* READ IO Error */
1083 	ublk_io_done(NULL, false, cb_arg);
1084 }
1085 
1086 static void
1087 ublk_resubmit_io(void *arg)
1088 {
1089 	struct ublk_io *io = (struct ublk_io *)arg;
1090 
1091 	_ublk_submit_bdev_io(io->q, io);
1092 }
1093 
1094 static void
1095 ublk_queue_io(struct ublk_io *io)
1096 {
1097 	int rc;
1098 	struct spdk_bdev *bdev = io->q->dev->bdev;
1099 	struct ublk_queue *q = io->q;
1100 
1101 	io->bdev_io_wait.bdev = bdev;
1102 	io->bdev_io_wait.cb_fn = ublk_resubmit_io;
1103 	io->bdev_io_wait.cb_arg = io;
1104 
1105 	rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait);
1106 	if (rc != 0) {
1107 		SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc);
1108 		ublk_io_done(NULL, false, io);
1109 	}
1110 }
1111 
1112 static void
1113 ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf)
1114 {
1115 	struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf);
1116 
1117 	io->mpool_entry = buf;
1118 	assert(io->payload == NULL);
1119 	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
1120 	io->get_buf_cb(io);
1121 }
1122 
1123 static void
1124 ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch,
1125 		   ublk_get_buf_cb get_buf_cb)
1126 {
1127 	void *buf;
1128 
1129 	io->payload_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1130 	io->get_buf_cb = get_buf_cb;
1131 	buf = spdk_iobuf_get(iobuf_ch, io->payload_size, &io->iobuf, ublk_io_get_buffer_cb);
1132 
1133 	if (buf != NULL) {
1134 		ublk_io_get_buffer_cb(&io->iobuf, buf);
1135 	}
1136 }
1137 
1138 static void
1139 ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch)
1140 {
1141 	if (io->payload) {
1142 		spdk_iobuf_put(iobuf_ch, io->mpool_entry, io->payload_size);
1143 		io->mpool_entry = NULL;
1144 		io->payload = NULL;
1145 	}
1146 }
1147 
1148 static void
1149 _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1150 {
1151 	struct spdk_ublk_dev *ublk = q->dev;
1152 	struct spdk_bdev_desc *desc = io->bdev_desc;
1153 	struct spdk_io_channel *ch = io->bdev_ch;
1154 	uint64_t offset_blocks, num_blocks;
1155 	spdk_bdev_io_completion_cb read_cb;
1156 	uint8_t ublk_op;
1157 	int rc = 0;
1158 	const struct ublksrv_io_desc *iod = io->iod;
1159 
1160 	ublk_op = ublksrv_get_op(iod);
1161 	offset_blocks = iod->start_sector >> ublk->sector_per_block_shift;
1162 	num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift;
1163 
1164 	switch (ublk_op) {
1165 	case UBLK_IO_OP_READ:
1166 		if (g_ublk_tgt.user_copy) {
1167 			read_cb = ublk_user_copy_read_done;
1168 		} else {
1169 			read_cb = ublk_io_done;
1170 		}
1171 		rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, read_cb, io);
1172 		break;
1173 	case UBLK_IO_OP_WRITE:
1174 		rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
1175 		break;
1176 	case UBLK_IO_OP_FLUSH:
1177 		rc = spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1178 		break;
1179 	case UBLK_IO_OP_DISCARD:
1180 		rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1181 		break;
1182 	case UBLK_IO_OP_WRITE_ZEROES:
1183 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1184 		break;
1185 	default:
1186 		rc = -1;
1187 	}
1188 
1189 	if (rc < 0) {
1190 		if (rc == -ENOMEM) {
1191 			SPDK_INFOLOG(ublk, "No memory, queueing the io.\n");
1192 			ublk_queue_io(io);
1193 		} else {
1194 			SPDK_ERRLOG("ublk io failed in _ublk_submit_bdev_io, rc=%d.\n", rc);
1195 			ublk_io_done(NULL, false, io);
1196 		}
1197 	}
1198 }
1199 
1200 static void
1201 read_get_buffer_done(struct ublk_io *io)
1202 {
1203 	_ublk_submit_bdev_io(io->q, io);
1204 }
1205 
1206 static void
1207 user_copy_write_get_buffer_done(struct ublk_io *io)
1208 {
1209 	ublk_queue_user_copy(io, true);
1210 }
1211 
1212 static void
1213 ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1214 {
1215 	struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch;
1216 	const struct ublksrv_io_desc *iod = io->iod;
1217 	uint8_t ublk_op;
1218 
1219 	io->result = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1220 	ublk_op = ublksrv_get_op(iod);
1221 	switch (ublk_op) {
1222 	case UBLK_IO_OP_READ:
1223 		ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done);
1224 		break;
1225 	case UBLK_IO_OP_WRITE:
1226 		if (g_ublk_tgt.user_copy) {
1227 			ublk_io_get_buffer(io, iobuf_ch, user_copy_write_get_buffer_done);
1228 		} else {
1229 			_ublk_submit_bdev_io(q, io);
1230 		}
1231 		break;
1232 	default:
1233 		_ublk_submit_bdev_io(q, io);
1234 		break;
1235 	}
1236 }
1237 
1238 static inline void
1239 ublksrv_queue_io_cmd(struct ublk_queue *q,
1240 		     struct ublk_io *io, unsigned tag)
1241 {
1242 	struct ublksrv_io_cmd *cmd;
1243 	struct io_uring_sqe *sqe;
1244 	unsigned int cmd_op = 0;
1245 	uint64_t user_data;
1246 
1247 	/* each io must carry a fetch, get-data, or commit-and-fetch operation */
1248 	assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) ||
1249 	       (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ));
1250 	cmd_op = io->cmd_op;
1251 
1252 	sqe = io_uring_get_sqe(&q->ring);
1253 	assert(sqe);
1254 
1255 	cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe);
1256 	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) {
1257 		cmd->result = io->result;
1258 	}
1259 
1260 	/* These fields should be written once, never change */
1261 	ublk_set_sqe_cmd_op(sqe, cmd_op);
1262 	/* dev->cdev_fd */
1263 	sqe->fd		= 0;
1264 	sqe->opcode	= IORING_OP_URING_CMD;
1265 	sqe->flags	= IOSQE_FIXED_FILE;
1266 	sqe->rw_flags	= 0;
1267 	cmd->tag	= tag;
1268 	cmd->addr	= g_ublk_tgt.user_copy ? 0 : (__u64)(uintptr_t)(io->payload);
1269 	cmd->q_id	= q->q_id;
1270 
1271 	user_data = build_user_data(tag, cmd_op);
1272 	io_uring_sqe_set_data64(sqe, user_data);
1273 
1274 	io->cmd_op = 0;
1275 
1276 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n",
1277 		      q->q_id, tag, cmd_op,
1278 		      io->cmd_op, q->is_stopping);
1279 }
1280 
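/*
 * ublk_io_xmit() drains the queue's completed_io_list: each non-user-copy io gets
 * a COMMIT_AND_FETCH (or NEED_GET_DATA) uring command queued here, while user-copy
 * ios already queued their pread/pwrite SQE in ublk_queue_user_copy().  Everything
 * is then pushed to the kernel with a single io_uring_submit() call, after which
 * read buffers can be returned to the iobuf channel (see the note inside the
 * function on why that is safe).
 */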
1281 static int
1282 ublk_io_xmit(struct ublk_queue *q)
1283 {
1284 	TAILQ_HEAD(, ublk_io) buffer_free_list;
1285 	struct spdk_iobuf_channel *iobuf_ch;
1286 	int rc = 0, count = 0;
1287 	struct ublk_io *io;
1288 
1289 	if (TAILQ_EMPTY(&q->completed_io_list)) {
1290 		return 0;
1291 	}
1292 
1293 	TAILQ_INIT(&buffer_free_list);
1294 	while (!TAILQ_EMPTY(&q->completed_io_list)) {
1295 		io = TAILQ_FIRST(&q->completed_io_list);
1296 		assert(io != NULL);
1297 		/*
1298 		 * Remove IO from list now assuming it will be completed. It will be inserted
1299 		 * back to the head if it cannot be completed. This approach is specifically
1300 		 * taken to work around a scan-build use-after-free mischaracterization.
1301 		 */
1302 		TAILQ_REMOVE(&q->completed_io_list, io, tailq);
1303 		if (!io->user_copy) {
1304 			if (!io->need_data) {
1305 				TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq);
1306 			}
1307 			ublksrv_queue_io_cmd(q, io, io->tag);
1308 		}
1309 		count++;
1310 	}
1311 
1312 	q->cmd_inflight += count;
1313 	rc = io_uring_submit(&q->ring);
1314 	if (rc != count) {
1315 		SPDK_ERRLOG("could not submit all commands\n");
1316 		assert(false);
1317 	}
1318 
1319 	/* Note: for READ io, ublk will always copy the data out of
1320 	 * the buffers in the io_uring_submit context.  Since we
1321 	 * are not using SQPOLL for IO rings, we can safely free
1322 	 * those IO buffers here.  This design doesn't seem ideal,
1323 	 * but it's what's possible since there is no discrete
1324 	 * COMMIT_REQ operation.  That will need to change in the
1325 	 * future should we ever want to support async copy
1326 	 * operations.
1327 	 */
1328 	iobuf_ch = &q->poll_group->iobuf_ch;
1329 	while (!TAILQ_EMPTY(&buffer_free_list)) {
1330 		io = TAILQ_FIRST(&buffer_free_list);
1331 		TAILQ_REMOVE(&buffer_free_list, io, tailq);
1332 		ublk_io_put_buffer(io, iobuf_ch);
1333 	}
1334 	return rc;
1335 }
1336 
1337 static void
1338 write_get_buffer_done(struct ublk_io *io)
1339 {
1340 	io->need_data = true;
1341 	io->cmd_op = UBLK_IO_NEED_GET_DATA;
1342 	io->result = 0;
1343 
1344 	TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq);
1345 	TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq);
1346 }
1347 
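/*
 * ublk_io_recv() reaps completions from the per-queue ring.  For ublk command
 * CQEs, UBLK_IO_RES_OK delivers a new request descriptor, UBLK_IO_RES_NEED_GET_DATA
 * asks for a write payload buffer, and UBLK_IO_RES_ABORT marks the queue as
 * stopping.  For user-copy CQEs, the result is the pread/pwrite byte count and is
 * compared against the expected transfer size.
 */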
1348 static int
1349 ublk_io_recv(struct ublk_queue *q)
1350 {
1351 	struct io_uring_cqe *cqe;
1352 	unsigned head, tag;
1353 	int fetch, count = 0;
1354 	struct ublk_io *io;
1355 	struct spdk_iobuf_channel *iobuf_ch;
1356 
1357 	if (q->cmd_inflight == 0) {
1358 		return 0;
1359 	}
1360 
1361 	iobuf_ch = &q->poll_group->iobuf_ch;
1362 	io_uring_for_each_cqe(&q->ring, head, cqe) {
1363 		tag = user_data_to_tag(cqe->user_data);
1364 		io = &q->ios[tag];
1365 
1366 		SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u, user copy %u, cmd_op %u\n",
1367 			      cqe->res, q->q_id, tag, io->user_copy, user_data_to_op(cqe->user_data));
1368 
1369 		q->cmd_inflight--;
1370 		TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq);
1371 
1372 		if (!io->user_copy) {
1373 			fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping;
1374 			if (!fetch) {
1375 				q->is_stopping = true;
1376 				if (io->cmd_op == UBLK_IO_FETCH_REQ) {
1377 					io->cmd_op = 0;
1378 				}
1379 			}
1380 
1381 			if (cqe->res == UBLK_IO_RES_OK) {
1382 				ublk_submit_bdev_io(q, io);
1383 			} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
1384 				ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done);
1385 			} else {
1386 				if (cqe->res != UBLK_IO_RES_ABORT) {
1387 					SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
1388 						    cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data));
1389 				}
1390 				TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1391 			}
1392 		} else {
1393 
1394 			/* clear `user_copy` for next use of this IO structure */
1395 			io->user_copy = false;
1396 
1397 			assert((ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) ||
1398 			       (ublksrv_get_op(io->iod) == UBLK_IO_OP_WRITE));
1399 			if (cqe->res != io->result) {
1400 				/* EIO */
1401 				ublk_io_done(NULL, false, io);
1402 			} else {
1403 				if (ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) {
1404 					/* bdev_io is already freed in first READ cycle */
1405 					ublk_io_done(NULL, true, io);
1406 				} else {
1407 					_ublk_submit_bdev_io(q, io);
1408 				}
1409 			}
1410 		}
1411 		count += 1;
1412 		if (count == UBLK_QUEUE_REQUEST) {
1413 			break;
1414 		}
1415 	}
1416 	io_uring_cq_advance(&q->ring, count);
1417 
1418 	return count;
1419 }
1420 
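/*
 * Per poll group poller.  For every queue owned by this group it first pushes
 * pending commit/fetch commands to the kernel (ublk_io_xmit) and then reaps new
 * completions (ublk_io_recv); a stopping queue is torn down once it has fully
 * drained.
 */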
1421 static int
1422 ublk_poll(void *arg)
1423 {
1424 	struct ublk_poll_group *poll_group = arg;
1425 	struct ublk_queue *q, *q_tmp;
1426 	int sent, received, count = 0;
1427 
1428 	TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) {
1429 		sent = ublk_io_xmit(q);
1430 		received = ublk_io_recv(q);
1431 		if (spdk_unlikely(q->is_stopping)) {
1432 			ublk_try_close_queue(q);
1433 		}
1434 		count += sent + received;
1435 	}
1436 	if (count > 0) {
1437 		return SPDK_POLLER_BUSY;
1438 	} else {
1439 		return SPDK_POLLER_IDLE;
1440 	}
1441 }
1442 
1443 static void
1444 ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk)
1445 {
1446 	ublk_close_dev(ublk);
1447 }
1448 
1449 static void
1450 ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1451 		   void *event_ctx)
1452 {
1453 	switch (type) {
1454 	case SPDK_BDEV_EVENT_REMOVE:
1455 		ublk_bdev_hot_remove(event_ctx);
1456 		break;
1457 	default:
1458 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1459 		break;
1460 	}
1461 }
1462 
1463 static void
1464 ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth)
1465 {
1466 	struct io_uring_sqe *sqe;
1467 	uint32_t i;
1468 
1469 	for (i = 0; i < q_depth; i++) {
1470 		sqe = ublk_uring_get_sqe(r, i);
1471 
1472 		/* These fields should be written once, never change */
1473 		sqe->flags = IOSQE_FIXED_FILE;
1474 		sqe->rw_flags = 0;
1475 		sqe->ioprio = 0;
1476 		sqe->off = 0;
1477 	}
1478 }
1479 
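/*
 * Per-queue bring-up: mmap the read-only array of I/O descriptors that ublk_drv
 * exposes through the char device, create the SQE128 io_uring for this queue, and
 * register cdev_fd as fixed file 0 so that io SQEs can use IOSQE_FIXED_FILE with
 * sqe->fd == 0.
 */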
1480 static int
1481 ublk_dev_queue_init(struct ublk_queue *q)
1482 {
1483 	int rc = 0, cmd_buf_size;
1484 	uint32_t j;
1485 	struct spdk_ublk_dev *ublk = q->dev;
1486 	unsigned long off;
1487 
1488 	cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth);
1489 	off = UBLKSRV_CMD_BUF_OFFSET +
1490 	      q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
1491 	q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ,
1492 			MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off);
1493 	if (q->io_cmd_buf == MAP_FAILED) {
1494 		q->io_cmd_buf = NULL;
1495 		rc = -errno;
1496 		SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc));
1497 		goto err;
1498 	}
1499 
1500 	for (j = 0; j < q->q_depth; j++) {
1501 		q->ios[j].cmd_op = UBLK_IO_FETCH_REQ;
1502 		q->ios[j].iod = &q->io_cmd_buf[j];
1503 	}
1504 
1505 	rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128);
1506 	if (rc < 0) {
1507 		SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc));
1508 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1509 		q->io_cmd_buf = NULL;
1510 		goto err;
1511 	}
1512 
1513 	rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1);
1514 	if (rc != 0) {
1515 		SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc));
1516 		io_uring_queue_exit(&q->ring);
1517 		q->ring.ring_fd = -1;
1518 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1519 		q->io_cmd_buf = NULL;
1520 		goto err;
1521 	}
1522 
1523 	ublk_dev_init_io_cmds(&q->ring, q->q_depth);
1524 
1525 err:
1526 	return rc;
1527 }
1528 
1529 static void
1530 ublk_dev_queue_fini(struct ublk_queue *q)
1531 {
1532 	if (q->ring.ring_fd >= 0) {
1533 		io_uring_unregister_files(&q->ring);
1534 		io_uring_queue_exit(&q->ring);
1535 		q->ring.ring_fd = -1;
1536 	}
1537 	if (q->io_cmd_buf) {
1538 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1539 	}
1540 }
1541 
1542 static void
1543 ublk_dev_queue_io_init(struct ublk_queue *q)
1544 {
1545 	struct ublk_io *io;
1546 	uint32_t i;
1547 	int rc __attribute__((unused));
1548 	void *buf;
1549 
1550 	/* Some older kernels require a buffer to get posted, even
1551 	 * when NEED_GET_DATA has been specified.  So allocate a
1552 	 * temporary buffer, only for purposes of this workaround.
1553 	 * It never actually gets used, so we will free it immediately
1554 	 * after all of the commands are posted.
1555 	 */
1556 	buf = malloc(64);
1557 
1558 	assert(q->bdev_ch != NULL);
1559 
1560 	/* Initialize and submit all io commands to ublk driver */
1561 	for (i = 0; i < q->q_depth; i++) {
1562 		io = &q->ios[i];
1563 		io->tag = (uint16_t)i;
1564 		io->payload = buf;
1565 		io->bdev_ch = q->bdev_ch;
1566 		io->bdev_desc = q->dev->bdev_desc;
1567 		ublksrv_queue_io_cmd(q, io, i);
1568 	}
1569 
1570 	q->cmd_inflight += q->q_depth;
1571 	rc = io_uring_submit(&q->ring);
1572 	assert(rc == (int)q->q_depth);
1573 	for (i = 0; i < q->q_depth; i++) {
1574 		io = &q->ios[i];
1575 		io->payload = NULL;
1576 	}
1577 	free(buf);
1578 }
1579 
1580 static void
1581 ublk_set_params(struct spdk_ublk_dev *ublk)
1582 {
1583 	int rc;
1584 
1585 	ublk->dev_params.len = sizeof(struct ublk_params);
1586 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_SET_PARAMS);
1587 	if (rc < 0) {
1588 		SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1589 		ublk_delete_dev(ublk);
1590 		if (ublk->start_cb) {
1591 			ublk->start_cb(ublk->cb_arg, rc);
1592 			ublk->start_cb = NULL;
1593 		}
1594 	}
1595 }
1596 
1597 /* Set ublk device parameters based on bdev */
1598 static void
1599 ublk_info_param_init(struct spdk_ublk_dev *ublk)
1600 {
1601 	struct spdk_bdev *bdev = ublk->bdev;
1602 	uint32_t blk_size = spdk_bdev_get_data_block_size(bdev);
1603 	uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev);
1604 	uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev);
1605 	uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev);
1606 	uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT;
1607 	uint32_t io_min_size = blk_size;
1608 	uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size);
1609 
1610 	struct ublksrv_ctrl_dev_info uinfo = {
1611 		.queue_depth = ublk->queue_depth,
1612 		.nr_hw_queues = ublk->num_queues,
1613 		.dev_id = ublk->ublk_id,
1614 		.max_io_buf_bytes = UBLK_IO_MAX_BYTES,
1615 		.ublksrv_pid = getpid(),
1616 		.flags = UBLK_F_URING_CMD_COMP_IN_TASK,
1617 	};
1618 	struct ublk_params uparams = {
1619 		.types = UBLK_PARAM_TYPE_BASIC,
1620 		.basic = {
1621 			.logical_bs_shift = spdk_u32log2(blk_size),
1622 			.physical_bs_shift = spdk_u32log2(pblk_size),
1623 			.io_min_shift = spdk_u32log2(io_min_size),
1624 			.io_opt_shift = spdk_u32log2(io_opt_size),
1625 			.dev_sectors = num_blocks * sectors_per_block,
1626 			.max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT,
1627 		}
1628 	};
1629 
1630 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1631 		uparams.types |= UBLK_PARAM_TYPE_DISCARD;
1632 		uparams.discard.discard_alignment = sectors_per_block;
1633 		uparams.discard.max_discard_sectors = num_blocks * sectors_per_block;
1634 		uparams.discard.max_discard_segments = 1;
1635 		uparams.discard.discard_granularity = blk_size;
1636 		if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1637 			uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block;
1638 		}
1639 	}
1640 
1641 	if (g_ublk_tgt.user_copy) {
1642 		uinfo.flags |= UBLK_F_USER_COPY;
1643 	} else {
1644 		uinfo.flags |= UBLK_F_NEED_GET_DATA;
1645 	}
1646 
1647 	ublk->dev_info = uinfo;
1648 	ublk->dev_params = uparams;
1649 }
1650 
1651 static void
1652 _ublk_free_dev(void *arg)
1653 {
1654 	struct spdk_ublk_dev *ublk = arg;
1655 
1656 	ublk_free_dev(ublk);
1657 }
1658 
1659 static void
1660 free_buffers(void *arg)
1661 {
1662 	struct ublk_queue *q = arg;
1663 	uint32_t i;
1664 
1665 	for (i = 0; i < q->q_depth; i++) {
1666 		ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch);
1667 	}
1668 	free(q->ios);
1669 	q->ios = NULL;
1670 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev);
1671 }
1672 
1673 static void
1674 ublk_free_dev(struct spdk_ublk_dev *ublk)
1675 {
1676 	struct ublk_queue *q;
1677 	uint32_t q_idx;
1678 
1679 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
1680 		q = &ublk->queues[q_idx];
1681 
1682 		/* The ublk_io structures of this queue were never initialized. */
1683 		if (q->ios == NULL) {
1684 			continue;
1685 		}
1686 
1687 		/* We found a queue that has an ios array that may have buffers
1688 		 * that need to be freed.  Send a message to the queue's thread
1689 		 * so it can free the buffers back to that thread's iobuf channel.
1690 		 * When it's done, it will set q->ios to NULL and send a message
1691 		 * back to this function to continue.
1692 		 */
1693 		if (q->poll_group) {
1694 			spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q);
1695 			return;
1696 		} else {
1697 			free(q->ios);
1698 			q->ios = NULL;
1699 		}
1700 	}
1701 
1702 	/* All of the buffers associated with the queues have been freed, so now
1703 	 * continue with releasing resources for the rest of the ublk device.
1704 	 */
1705 	if (ublk->bdev_desc) {
1706 		spdk_bdev_close(ublk->bdev_desc);
1707 		ublk->bdev_desc = NULL;
1708 	}
1709 
1710 	ublk_dev_list_unregister(ublk);
1711 
1712 	if (ublk->del_cb) {
1713 		ublk->del_cb(ublk->cb_arg);
1714 	}
1715 	SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id);
1716 	free(ublk);
1717 }
1718 
1719 static int
1720 ublk_ios_init(struct spdk_ublk_dev *ublk)
1721 {
1722 	int rc;
1723 	uint32_t i, j;
1724 	struct ublk_queue *q;
1725 
1726 	for (i = 0; i < ublk->num_queues; i++) {
1727 		q = &ublk->queues[i];
1728 
1729 		TAILQ_INIT(&q->completed_io_list);
1730 		TAILQ_INIT(&q->inflight_io_list);
1731 		q->dev = ublk;
1732 		q->q_id = i;
1733 		q->q_depth = ublk->queue_depth;
1734 		q->ios = calloc(q->q_depth, sizeof(struct ublk_io));
1735 		if (!q->ios) {
1736 			rc = -ENOMEM;
1737 			SPDK_ERRLOG("could not allocate queue ios\n");
1738 			goto err;
1739 		}
1740 		for (j = 0; j < q->q_depth; j++) {
1741 			q->ios[j].q = q;
1742 		}
1743 	}
1744 
1745 	return 0;
1746 
1747 err:
1748 	for (i = 0; i < ublk->num_queues; i++) {
1749 		free(ublk->queues[i].ios);
1750 		ublk->queues[i].ios = NULL;
1751 	}
1752 	return rc;
1753 }
1754 
1755 static void
1756 ublk_queue_run(void *arg1)
1757 {
1758 	struct ublk_queue	*q = arg1;
1759 	struct spdk_ublk_dev *ublk = q->dev;
1760 	struct ublk_poll_group *poll_group = q->poll_group;
1761 
1762 	assert(spdk_get_thread() == poll_group->ublk_thread);
1763 	q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc);
1764 	/* Queues must be filled with IO commands on the thread that polls this queue's ring */
1765 	ublk_dev_queue_io_init(q);
1766 
1767 	TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq);
1768 }
1769 
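/*
 * Bring-up entry point called on the app thread.  It validates the target and id,
 * opens the bdev, clamps num_queues/queue_depth to the supported maximums,
 * allocates the per-queue io arrays and then issues UBLK_CMD_ADD_DEV.  The rest of
 * the start sequence (SET_PARAMS, opening the char device, START_DEV) continues
 * from the control-command completion path once ADD_DEV finishes.
 */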
1770 int
1771 ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
1772 		uint32_t num_queues, uint32_t queue_depth,
1773 		ublk_start_cb start_cb, void *cb_arg)
1774 {
1775 	int			rc;
1776 	uint32_t		i;
1777 	struct spdk_bdev	*bdev;
1778 	struct spdk_ublk_dev	*ublk = NULL;
1779 	uint32_t		sector_per_block;
1780 
1781 	assert(spdk_thread_is_app_thread(NULL));
1782 
1783 	if (g_ublk_tgt.active == false) {
1784 		SPDK_ERRLOG("No ublk target exists\n");
1785 		return -ENODEV;
1786 	}
1787 
1788 	ublk = ublk_dev_find_by_id(ublk_id);
1789 	if (ublk != NULL) {
1790 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
1791 		return -EBUSY;
1792 	}
1793 
1794 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
1795 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
1796 		return -ENOTSUP;
1797 	}
1798 
1799 	ublk = calloc(1, sizeof(*ublk));
1800 	if (ublk == NULL) {
1801 		return -ENOMEM;
1802 	}
1803 	ublk->start_cb = start_cb;
1804 	ublk->cb_arg = cb_arg;
1805 	ublk->cdev_fd = -1;
1806 	ublk->ublk_id = ublk_id;
1807 	UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n",
1808 		      bdev_name, num_queues, queue_depth);
1809 
1810 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
1811 	if (rc != 0) {
1812 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
1813 		free(ublk);
1814 		return rc;
1815 	}
1816 
1817 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
1818 	ublk->bdev = bdev;
1819 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
1820 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
1821 
1822 	ublk->queues_closed = 0;
1823 	ublk->num_queues = num_queues;
1824 	ublk->queue_depth = queue_depth;
1825 	if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) {
1826 		SPDK_WARNLOG("Set Queue depth %d of UBLK %d to maximum %d\n",
1827 			     ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH);
1828 		ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH;
1829 	}
1830 	if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) {
1831 		SPDK_WARNLOG("Set Queue num %d of UBLK %d to maximum %d\n",
1832 			     ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES);
1833 		ublk->num_queues = UBLK_DEV_MAX_QUEUES;
1834 	}
1835 	for (i = 0; i < ublk->num_queues; i++) {
1836 		ublk->queues[i].ring.ring_fd = -1;
1837 	}
1838 
1839 	ublk_info_param_init(ublk);
1840 	rc = ublk_ios_init(ublk);
1841 	if (rc != 0) {
1842 		spdk_bdev_close(ublk->bdev_desc);
1843 		free(ublk);
1844 		return rc;
1845 	}
1846 
1847 	SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n",
1848 		     bdev_name, ublk_id);
1849 
1850 	/* Add ublk_dev to the end of disk list */
1851 	ublk_dev_list_register(ublk);
1852 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_ADD_DEV);
1853 	if (rc < 0) {
1854 		SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1855 		ublk_free_dev(ublk);
1856 	}
1857 
1858 	return rc;
1859 }
1860 
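/*
 * Runs after UBLK_CMD_SET_PARAMS completes: open the per-device char device,
 * initialize every queue's command buffer and ring, issue UBLK_CMD_START_DEV, and
 * finally hand the queues out to the poll groups in round-robin order.
 */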
1861 static void
1862 ublk_finish_start(struct spdk_ublk_dev *ublk)
1863 {
1864 	int			rc;
1865 	uint32_t		q_id;
1866 	struct spdk_thread	*ublk_thread;
1867 	char			buf[64];
1868 
1869 	snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id);
1870 	ublk->cdev_fd = open(buf, O_RDWR);
1871 	if (ublk->cdev_fd < 0) {
1872 		rc = -errno;
1873 		SPDK_ERRLOG("can't open %s, rc %d\n", buf, rc);
1874 		goto err;
1875 	}
1876 
1877 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1878 		rc = ublk_dev_queue_init(&ublk->queues[q_id]);
1879 		if (rc) {
1880 			goto err;
1881 		}
1882 	}
1883 
1884 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_START_DEV);
1885 	if (rc < 0) {
1886 		SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id,
1887 			    spdk_strerror(-rc));
1888 		goto err;
1889 	}
1890 
1891 	/* Distribute queues across spdk_threads for load balancing */
1892 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1893 		ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_groups[g_next_ublk_poll_group];
1894 		ublk_thread = g_ublk_tgt.poll_groups[g_next_ublk_poll_group].ublk_thread;
1895 		spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]);
1896 		g_next_ublk_poll_group++;
1897 		if (g_next_ublk_poll_group == g_num_ublk_poll_groups) {
1898 			g_next_ublk_poll_group = 0;
1899 		}
1900 	}
1901 
1902 	goto out;
1903 
1904 err:
1905 	ublk_delete_dev(ublk);
1906 out:
1907 	if (rc < 0 && ublk->start_cb) {
1908 		ublk->start_cb(ublk->cb_arg, rc);
1909 		ublk->start_cb = NULL;
1910 	}
1911 }
1912 
1913 SPDK_LOG_REGISTER_COMPONENT(ublk)
1914 SPDK_LOG_REGISTER_COMPONENT(ublk_io)
1915