xref: /spdk/lib/ublk/ublk.c (revision 34efb6523e316572db6767d44c34a1431b7530dd)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <liburing.h>
7 
8 #include "spdk/stdinc.h"
9 #include "spdk/string.h"
10 #include "spdk/bdev.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/likely.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/queue.h"
17 #include "spdk/json.h"
18 #include "spdk/ublk.h"
19 #include "spdk/thread.h"
20 #include "spdk/file.h"
21 
22 #include "ublk_internal.h"
23 
24 #define UBLK_CTRL_DEV					"/dev/ublk-control"
25 #define UBLK_BLK_CDEV					"/dev/ublkc"
26 
27 #define LINUX_SECTOR_SHIFT				9
28 #define UBLK_IO_MAX_BYTES				SPDK_BDEV_LARGE_BUF_MAX_SIZE
29 #define UBLK_DEV_MAX_QUEUES				32
30 #define UBLK_DEV_MAX_QUEUE_DEPTH			1024
31 #define UBLK_QUEUE_REQUEST				32
32 #define UBLK_STOP_BUSY_WAITING_MS			10000
33 #define UBLK_BUSY_POLLING_INTERVAL_US			20000
34 #define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US	1000
35 /* By default, kernel ublk_drv driver can support up to 64 block devices */
36 #define UBLK_DEFAULT_MAX_SUPPORTED_DEVS			64
37 
38 #define UBLK_IOBUF_SMALL_CACHE_SIZE			128
39 #define UBLK_IOBUF_LARGE_CACHE_SIZE			32
40 
41 #define UBLK_DEBUGLOG(ublk, format, ...) \
42 	SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__);
43 
44 static uint32_t g_num_ublk_poll_groups = 0;
45 static uint32_t g_next_ublk_poll_group = 0;
46 static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS;
47 static struct spdk_cpuset g_core_mask;
48 static bool g_disable_user_copy = false;
49 
50 struct ublk_queue;
51 struct ublk_poll_group;
52 struct ublk_io;
53 static void _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io);
54 static void ublk_dev_queue_fini(struct ublk_queue *q);
55 static int ublk_poll(void *arg);
56 
57 static int ublk_set_params(struct spdk_ublk_dev *ublk);
58 static int ublk_start_dev(struct spdk_ublk_dev *ublk, bool is_recovering);
59 static void ublk_free_dev(struct spdk_ublk_dev *ublk);
60 static void ublk_delete_dev(void *arg);
61 static int ublk_close_dev(struct spdk_ublk_dev *ublk);
62 static int ublk_ctrl_start_recovery(struct spdk_ublk_dev *ublk);
63 
64 static int ublk_ctrl_cmd_submit(struct spdk_ublk_dev *ublk, uint32_t cmd_op);
65 
66 static const char *ublk_op_name[64] = {
67 	[UBLK_CMD_GET_DEV_INFO] = "UBLK_CMD_GET_DEV_INFO",
68 	[UBLK_CMD_ADD_DEV] =	"UBLK_CMD_ADD_DEV",
69 	[UBLK_CMD_DEL_DEV] =	"UBLK_CMD_DEL_DEV",
70 	[UBLK_CMD_START_DEV] =	"UBLK_CMD_START_DEV",
71 	[UBLK_CMD_STOP_DEV] =	"UBLK_CMD_STOP_DEV",
72 	[UBLK_CMD_SET_PARAMS] =	"UBLK_CMD_SET_PARAMS",
73 	[UBLK_CMD_START_USER_RECOVERY] = "UBLK_CMD_START_USER_RECOVERY",
74 	[UBLK_CMD_END_USER_RECOVERY] = "UBLK_CMD_END_USER_RECOVERY",
75 };
76 
77 typedef void (*ublk_get_buf_cb)(struct ublk_io *io);
78 
79 struct ublk_io {
80 	void			*payload;
81 	void			*mpool_entry;
82 	bool			need_data;
83 	bool			user_copy;
84 	uint16_t		tag;
85 	uint64_t		payload_size;
86 	uint32_t		cmd_op;
87 	int32_t			result;
88 	struct spdk_bdev_desc	*bdev_desc;
89 	struct spdk_io_channel	*bdev_ch;
90 	const struct ublksrv_io_desc	*iod;
91 	ublk_get_buf_cb		get_buf_cb;
92 	struct ublk_queue	*q;
93 	/* for bdev io_wait */
94 	struct spdk_bdev_io_wait_entry bdev_io_wait;
95 	struct spdk_iobuf_entry	iobuf;
96 
97 	TAILQ_ENTRY(ublk_io)	tailq;
98 };
99 
100 struct ublk_queue {
101 	uint32_t		q_id;
102 	uint32_t		q_depth;
103 	struct ublk_io		*ios;
104 	TAILQ_HEAD(, ublk_io)	completed_io_list;
105 	TAILQ_HEAD(, ublk_io)	inflight_io_list;
106 	uint32_t		cmd_inflight;
107 	bool			is_stopping;
108 	struct ublksrv_io_desc	*io_cmd_buf;
109 	/* ring depth == dev_info->queue_depth. */
110 	struct io_uring		ring;
111 	struct spdk_ublk_dev	*dev;
112 	struct ublk_poll_group	*poll_group;
113 	struct spdk_io_channel	*bdev_ch;
114 
115 	TAILQ_ENTRY(ublk_queue)	tailq;
116 };
117 
118 struct spdk_ublk_dev {
119 	struct spdk_bdev	*bdev;
120 	struct spdk_bdev_desc	*bdev_desc;
121 
122 	int			cdev_fd;
123 	struct ublk_params	dev_params;
124 	struct ublksrv_ctrl_dev_info	dev_info;
125 
126 	uint32_t		ublk_id;
127 	uint32_t		num_queues;
128 	uint32_t		queue_depth;
129 	uint32_t		online_num_queues;
130 	uint32_t		sector_per_block_shift;
131 	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];
132 
133 	struct spdk_poller	*retry_poller;
134 	int			retry_count;
135 	uint32_t		queues_closed;
136 	ublk_ctrl_cb		ctrl_cb;
137 	void			*cb_arg;
138 	uint32_t		current_cmd_op;
139 	uint32_t		ctrl_ops_in_progress;
140 	bool			is_closing;
141 	bool			is_recovering;
142 
143 	TAILQ_ENTRY(spdk_ublk_dev) tailq;
144 	TAILQ_ENTRY(spdk_ublk_dev) wait_tailq;
145 };
146 
147 struct ublk_poll_group {
148 	struct spdk_thread		*ublk_thread;
149 	struct spdk_poller		*ublk_poller;
150 	struct spdk_iobuf_channel	iobuf_ch;
151 	TAILQ_HEAD(, ublk_queue)	queue_list;
152 };
153 
154 struct ublk_tgt {
155 	int			ctrl_fd;
156 	bool			active;
157 	bool			is_destroying;
158 	spdk_ublk_fini_cb	cb_fn;
159 	void			*cb_arg;
160 	struct io_uring		ctrl_ring;
161 	struct spdk_poller	*ctrl_poller;
162 	uint32_t		ctrl_ops_in_progress;
163 	struct ublk_poll_group	*poll_groups;
164 	uint32_t		num_ublk_devs;
165 	uint64_t		features;
166 	/* `ublk_drv` supports UBLK_F_CMD_IOCTL_ENCODE */
167 	bool			ioctl_encode;
168 	/* `ublk_drv` supports UBLK_F_USER_COPY */
169 	bool			user_copy;
170 	/* `ublk_drv` supports UBLK_F_USER_RECOVERY */
171 	bool			user_recovery;
172 };
173 
174 static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs);
175 static struct ublk_tgt g_ublk_tgt;
176 
177 /* helpers for using io_uring */
178 static inline int
179 ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags)
180 {
181 	struct io_uring_params p = {};
182 
183 	p.flags = flags | IORING_SETUP_CQSIZE;
184 	p.cq_entries = depth;
185 
186 	return io_uring_queue_init_params(depth, r, &p);
187 }
188 
189 static inline struct io_uring_sqe *
190 ublk_uring_get_sqe(struct io_uring *r, uint32_t idx)
191 {
192 	/* Need to update the idx since we set IORING_SETUP_SQE128 parameter in ublk_setup_ring */
193 	return &r->sq.sqes[idx << 1];
194 }
195 
196 static inline void *
197 ublk_get_sqe_cmd(struct io_uring_sqe *sqe)
198 {
199 	return (void *)&sqe->addr3;
200 }
201 
202 static inline void
203 ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op)
204 {
205 	uint32_t opc = cmd_op;
206 
207 	if (g_ublk_tgt.ioctl_encode) {
208 		switch (cmd_op) {
209 		/* ctrl uring */
210 		case UBLK_CMD_GET_DEV_INFO:
211 			opc = _IOR('u', UBLK_CMD_GET_DEV_INFO, struct ublksrv_ctrl_cmd);
212 			break;
213 		case UBLK_CMD_ADD_DEV:
214 			opc = _IOWR('u', UBLK_CMD_ADD_DEV, struct ublksrv_ctrl_cmd);
215 			break;
216 		case UBLK_CMD_DEL_DEV:
217 			opc = _IOWR('u', UBLK_CMD_DEL_DEV, struct ublksrv_ctrl_cmd);
218 			break;
219 		case UBLK_CMD_START_DEV:
220 			opc = _IOWR('u', UBLK_CMD_START_DEV, struct ublksrv_ctrl_cmd);
221 			break;
222 		case UBLK_CMD_STOP_DEV:
223 			opc = _IOWR('u', UBLK_CMD_STOP_DEV, struct ublksrv_ctrl_cmd);
224 			break;
225 		case UBLK_CMD_SET_PARAMS:
226 			opc = _IOWR('u', UBLK_CMD_SET_PARAMS, struct ublksrv_ctrl_cmd);
227 			break;
228 		case UBLK_CMD_START_USER_RECOVERY:
229 			opc = _IOWR('u', UBLK_CMD_START_USER_RECOVERY, struct ublksrv_ctrl_cmd);
230 			break;
231 		case UBLK_CMD_END_USER_RECOVERY:
232 			opc = _IOWR('u', UBLK_CMD_END_USER_RECOVERY, struct ublksrv_ctrl_cmd);
233 			break;
234 
235 		/* io uring */
236 		case UBLK_IO_FETCH_REQ:
237 			opc = _IOWR('u', UBLK_IO_FETCH_REQ, struct ublksrv_io_cmd);
238 			break;
239 		case UBLK_IO_COMMIT_AND_FETCH_REQ:
240 			opc = _IOWR('u', UBLK_IO_COMMIT_AND_FETCH_REQ, struct ublksrv_io_cmd);
241 			break;
242 		case UBLK_IO_NEED_GET_DATA:
243 			opc = _IOWR('u', UBLK_IO_NEED_GET_DATA, struct ublksrv_io_cmd);
244 			break;
245 		default:
246 			break;
247 		}
248 	}
249 
250 	sqe->off = opc;
251 }
252 
253 static inline uint64_t
254 build_user_data(uint16_t tag, uint8_t op)
255 {
256 	assert(!(tag >> 16) && !(op >> 8));
257 
258 	return tag | (op << 16);
259 }
260 
261 static inline uint16_t
262 user_data_to_tag(uint64_t user_data)
263 {
264 	return user_data & 0xffff;
265 }
266 
267 static inline uint8_t
268 user_data_to_op(uint64_t user_data)
269 {
270 	return (user_data >> 16) & 0xff;
271 }
272 
273 static inline uint64_t
274 ublk_user_copy_pos(uint16_t q_id, uint16_t tag)
275 {
276 	return (uint64_t)UBLKSRV_IO_BUF_OFFSET + ((((uint64_t)q_id) << UBLK_QID_OFF) | (((
277 				uint64_t)tag) << UBLK_TAG_OFF));
278 }
279 
280 void
281 spdk_ublk_init(void)
282 {
283 	assert(spdk_thread_is_app_thread(NULL));
284 
285 	g_ublk_tgt.ctrl_fd = -1;
286 	g_ublk_tgt.ctrl_ring.ring_fd = -1;
287 }
288 
289 static void
290 ublk_ctrl_cmd_error(struct spdk_ublk_dev *ublk, int32_t res)
291 {
292 	assert(res != 0);
293 
294 	SPDK_ERRLOG("ctrlr cmd %s failed, %s\n", ublk_op_name[ublk->current_cmd_op], spdk_strerror(-res));
295 	if (ublk->ctrl_cb) {
296 		ublk->ctrl_cb(ublk->cb_arg, res);
297 		ublk->ctrl_cb = NULL;
298 	}
299 
300 	switch (ublk->current_cmd_op) {
301 	case UBLK_CMD_ADD_DEV:
302 	case UBLK_CMD_SET_PARAMS:
303 	case UBLK_CMD_START_USER_RECOVERY:
304 	case UBLK_CMD_END_USER_RECOVERY:
305 		ublk_delete_dev(ublk);
306 		break;
307 	case UBLK_CMD_START_DEV:
308 		ublk_close_dev(ublk);
309 		break;
310 	case UBLK_CMD_GET_DEV_INFO:
311 		ublk_free_dev(ublk);
312 		break;
313 	case UBLK_CMD_STOP_DEV:
314 	case UBLK_CMD_DEL_DEV:
315 		break;
316 	default:
317 		SPDK_ERRLOG("No match cmd operation,cmd_op = %d\n", ublk->current_cmd_op);
318 		break;
319 	}
320 }
321 
322 static void
323 ublk_ctrl_process_cqe(struct io_uring_cqe *cqe)
324 {
325 	struct spdk_ublk_dev *ublk;
326 	int rc = 0;
327 
328 	ublk = (struct spdk_ublk_dev *)cqe->user_data;
329 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s completed\n", ublk_op_name[ublk->current_cmd_op]);
330 	ublk->ctrl_ops_in_progress--;
331 
332 	if (spdk_unlikely(cqe->res != 0)) {
333 		ublk_ctrl_cmd_error(ublk, cqe->res);
334 		return;
335 	}
336 
337 	switch (ublk->current_cmd_op) {
338 	case UBLK_CMD_ADD_DEV:
339 		rc = ublk_set_params(ublk);
340 		if (rc < 0) {
341 			ublk_delete_dev(ublk);
342 			goto cb_done;
343 		}
344 		break;
345 	case UBLK_CMD_SET_PARAMS:
346 		rc = ublk_start_dev(ublk, false);
347 		if (rc < 0) {
348 			ublk_delete_dev(ublk);
349 			goto cb_done;
350 		}
351 		break;
352 	case UBLK_CMD_START_DEV:
353 		goto cb_done;
354 		break;
355 	case UBLK_CMD_STOP_DEV:
356 		break;
357 	case UBLK_CMD_DEL_DEV:
358 		if (ublk->ctrl_cb) {
359 			ublk->ctrl_cb(ublk->cb_arg, 0);
360 			ublk->ctrl_cb = NULL;
361 		}
362 		ublk_free_dev(ublk);
363 		break;
364 	case UBLK_CMD_GET_DEV_INFO:
365 		rc = ublk_ctrl_start_recovery(ublk);
366 		if (rc < 0) {
367 			ublk_delete_dev(ublk);
368 			goto cb_done;
369 		}
370 		break;
371 	case UBLK_CMD_START_USER_RECOVERY:
372 		rc = ublk_start_dev(ublk, true);
373 		if (rc < 0) {
374 			ublk_delete_dev(ublk);
375 			goto cb_done;
376 		}
377 		break;
378 	case UBLK_CMD_END_USER_RECOVERY:
379 		SPDK_NOTICELOG("Ublk %u recover done successfully\n", ublk->ublk_id);
380 		ublk->is_recovering = false;
381 		goto cb_done;
382 		break;
383 	default:
384 		SPDK_ERRLOG("No match cmd operation,cmd_op = %d\n", ublk->current_cmd_op);
385 		break;
386 	}
387 
388 	return;
389 
390 cb_done:
391 	if (ublk->ctrl_cb) {
392 		ublk->ctrl_cb(ublk->cb_arg, rc);
393 		ublk->ctrl_cb = NULL;
394 	}
395 }
396 
397 static int
398 ublk_ctrl_poller(void *arg)
399 {
400 	struct io_uring *ring = &g_ublk_tgt.ctrl_ring;
401 	struct io_uring_cqe *cqe;
402 	const int max = 8;
403 	int i, count = 0, rc;
404 
405 	if (!g_ublk_tgt.ctrl_ops_in_progress) {
406 		return SPDK_POLLER_IDLE;
407 	}
408 
409 	for (i = 0; i < max; i++) {
410 		rc = io_uring_peek_cqe(ring, &cqe);
411 		if (rc == -EAGAIN) {
412 			break;
413 		}
414 
415 		assert(cqe != NULL);
416 		g_ublk_tgt.ctrl_ops_in_progress--;
417 
418 		ublk_ctrl_process_cqe(cqe);
419 
420 		io_uring_cqe_seen(ring, cqe);
421 		count++;
422 	}
423 
424 	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
425 }
426 
427 static int
428 ublk_ctrl_cmd_submit(struct spdk_ublk_dev *ublk, uint32_t cmd_op)
429 {
430 	uint32_t dev_id = ublk->ublk_id;
431 	int rc = -EINVAL;
432 	struct io_uring_sqe *sqe;
433 	struct ublksrv_ctrl_cmd *cmd;
434 
435 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]);
436 
437 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
438 	if (!sqe) {
439 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
440 		assert(false);
441 		return -ENOENT;
442 	}
443 
444 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
445 	sqe->fd = g_ublk_tgt.ctrl_fd;
446 	sqe->opcode = IORING_OP_URING_CMD;
447 	sqe->ioprio = 0;
448 	cmd->dev_id = dev_id;
449 	cmd->queue_id = -1;
450 	ublk->current_cmd_op = cmd_op;
451 
452 	switch (cmd_op) {
453 	case UBLK_CMD_ADD_DEV:
454 	case UBLK_CMD_GET_DEV_INFO:
455 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_info;
456 		cmd->len = sizeof(ublk->dev_info);
457 		break;
458 	case UBLK_CMD_SET_PARAMS:
459 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_params;
460 		cmd->len = sizeof(ublk->dev_params);
461 		break;
462 	case UBLK_CMD_START_DEV:
463 		cmd->data[0] = getpid();
464 		break;
465 	case UBLK_CMD_STOP_DEV:
466 		break;
467 	case UBLK_CMD_DEL_DEV:
468 		break;
469 	case UBLK_CMD_START_USER_RECOVERY:
470 		break;
471 	case UBLK_CMD_END_USER_RECOVERY:
472 		cmd->data[0] = getpid();
473 		break;
474 	default:
475 		SPDK_ERRLOG("No match cmd operation,cmd_op = %d\n", cmd_op);
476 		return -EINVAL;
477 	}
478 	ublk_set_sqe_cmd_op(sqe, cmd_op);
479 	io_uring_sqe_set_data(sqe, ublk);
480 
481 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
482 	if (rc < 0) {
483 		SPDK_ERRLOG("uring submit rc %d\n", rc);
484 		assert(false);
485 		return rc;
486 	}
487 	g_ublk_tgt.ctrl_ops_in_progress++;
488 	ublk->ctrl_ops_in_progress++;
489 
490 	return 0;
491 }
492 
493 static int
494 ublk_ctrl_cmd_get_features(void)
495 {
496 	int rc;
497 	struct io_uring_sqe *sqe;
498 	struct io_uring_cqe *cqe;
499 	struct ublksrv_ctrl_cmd *cmd;
500 	uint32_t cmd_op;
501 
502 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
503 	if (!sqe) {
504 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
505 		assert(false);
506 		return -ENOENT;
507 	}
508 
509 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
510 	sqe->fd = g_ublk_tgt.ctrl_fd;
511 	sqe->opcode = IORING_OP_URING_CMD;
512 	sqe->ioprio = 0;
513 	cmd->dev_id = -1;
514 	cmd->queue_id = -1;
515 	cmd->addr = (__u64)(uintptr_t)&g_ublk_tgt.features;
516 	cmd->len = sizeof(g_ublk_tgt.features);
517 
518 	cmd_op = UBLK_U_CMD_GET_FEATURES;
519 	ublk_set_sqe_cmd_op(sqe, cmd_op);
520 
521 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
522 	if (rc < 0) {
523 		SPDK_ERRLOG("uring submit rc %d\n", rc);
524 		return rc;
525 	}
526 
527 	rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe);
528 	if (rc < 0) {
529 		SPDK_ERRLOG("wait cqe rc %d\n", rc);
530 		return rc;
531 	}
532 
533 	if (cqe->res == 0) {
534 		g_ublk_tgt.ioctl_encode = !!(g_ublk_tgt.features & UBLK_F_CMD_IOCTL_ENCODE);
535 		g_ublk_tgt.user_copy = !!(g_ublk_tgt.features & UBLK_F_USER_COPY);
536 		g_ublk_tgt.user_copy &= !g_disable_user_copy;
537 		g_ublk_tgt.user_recovery = !!(g_ublk_tgt.features & UBLK_F_USER_RECOVERY);
538 		SPDK_NOTICELOG("User Copy %s\n", g_ublk_tgt.user_copy ? "enabled" : "disabled");
539 	}
540 	io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe);
541 
542 	return 0;
543 }
544 
545 static int
546 ublk_queue_cmd_buf_sz(uint32_t q_depth)
547 {
548 	uint32_t size = q_depth * sizeof(struct ublksrv_io_desc);
549 	uint32_t page_sz = getpagesize();
550 
551 	/* round up size */
552 	return (size + page_sz - 1) & ~(page_sz - 1);
553 }
554 
555 static int
556 ublk_open(void)
557 {
558 	uint32_t ublks_max;
559 	int rc;
560 
561 	g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR);
562 	if (g_ublk_tgt.ctrl_fd < 0) {
563 		rc = errno;
564 		SPDK_ERRLOG("UBLK conrol dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno));
565 		return -rc;
566 	}
567 
568 	rc = spdk_read_sysfs_attribute_uint32(&ublks_max, "%s",
569 					      "/sys/module/ublk_drv/parameters/ublks_max");
570 	if (rc == 0 && ublks_max > 0) {
571 		g_ublks_max = ublks_max;
572 	}
573 
574 	/* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl
575 	 * ring processing to a workqueue.  Ctrl ring processing is minimal, so SQPOLL is fine.
576 	 * All the commands sent via control uring for a ublk device is executed one by one, so use
577 	 * ublks_max * 2 as the number of uring entries is enough.
578 	 */
579 	rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring,
580 			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
581 	if (rc < 0) {
582 		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
583 		goto err;
584 	}
585 
586 	rc = ublk_ctrl_cmd_get_features();
587 	if (rc) {
588 		goto err;
589 	}
590 
591 	return 0;
592 
593 err:
594 	close(g_ublk_tgt.ctrl_fd);
595 	g_ublk_tgt.ctrl_fd = -1;
596 	return rc;
597 }
598 
599 static int
600 ublk_parse_core_mask(const char *mask)
601 {
602 	struct spdk_cpuset tmp_mask;
603 	int rc;
604 
605 	if (mask == NULL) {
606 		spdk_env_get_cpuset(&g_core_mask);
607 		return 0;
608 	}
609 
610 	rc = spdk_cpuset_parse(&g_core_mask, mask);
611 	if (rc < 0) {
612 		SPDK_ERRLOG("invalid cpumask %s\n", mask);
613 		return -EINVAL;
614 	}
615 
616 	if (spdk_cpuset_count(&g_core_mask) == 0) {
617 		SPDK_ERRLOG("no cpus specified\n");
618 		return -EINVAL;
619 	}
620 
621 	spdk_env_get_cpuset(&tmp_mask);
622 	spdk_cpuset_and(&tmp_mask, &g_core_mask);
623 
624 	if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) {
625 		SPDK_ERRLOG("one of selected cpu is outside of core mask(=%s)\n",
626 			    spdk_cpuset_fmt(&g_core_mask));
627 		return -EINVAL;
628 	}
629 
630 	return 0;
631 }
632 
633 static void
634 ublk_poller_register(void *args)
635 {
636 	struct ublk_poll_group *poll_group = args;
637 	int rc;
638 
639 	assert(spdk_get_thread() == poll_group->ublk_thread);
640 	/* Bind ublk spdk_thread to current CPU core in order to avoid thread context switch
641 	 * during uring processing as required by ublk kernel.
642 	 */
643 	spdk_thread_bind(spdk_get_thread(), true);
644 
645 	TAILQ_INIT(&poll_group->queue_list);
646 	poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0);
647 	rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk",
648 				     UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE);
649 	if (rc != 0) {
650 		assert(false);
651 	}
652 }
653 
654 struct rpc_create_target {
655 	bool disable_user_copy;
656 };
657 
658 static const struct spdk_json_object_decoder rpc_ublk_create_target[] = {
659 	{"disable_user_copy", offsetof(struct rpc_create_target, disable_user_copy), spdk_json_decode_bool, true},
660 };
661 
662 int
663 ublk_create_target(const char *cpumask_str, const struct spdk_json_val *params)
664 {
665 	int rc;
666 	uint32_t i;
667 	char thread_name[32];
668 	struct rpc_create_target req = {};
669 	struct ublk_poll_group *poll_group;
670 
671 	if (g_ublk_tgt.active == true) {
672 		SPDK_ERRLOG("UBLK target has been created\n");
673 		return -EBUSY;
674 	}
675 
676 	rc = ublk_parse_core_mask(cpumask_str);
677 	if (rc != 0) {
678 		return rc;
679 	}
680 
681 	if (params) {
682 		if (spdk_json_decode_object_relaxed(params, rpc_ublk_create_target,
683 						    SPDK_COUNTOF(rpc_ublk_create_target),
684 						    &req)) {
685 			SPDK_ERRLOG("spdk_json_decode_object failed\n");
686 			return -EINVAL;
687 		}
688 		g_disable_user_copy = req.disable_user_copy;
689 	}
690 
691 	assert(g_ublk_tgt.poll_groups == NULL);
692 	g_ublk_tgt.poll_groups = calloc(spdk_env_get_core_count(), sizeof(*poll_group));
693 	if (!g_ublk_tgt.poll_groups) {
694 		return -ENOMEM;
695 	}
696 
697 	rc = ublk_open();
698 	if (rc != 0) {
699 		SPDK_ERRLOG("Fail to open UBLK, error=%s\n", spdk_strerror(-rc));
700 		free(g_ublk_tgt.poll_groups);
701 		g_ublk_tgt.poll_groups = NULL;
702 		return rc;
703 	}
704 
705 	spdk_iobuf_register_module("ublk");
706 
707 	SPDK_ENV_FOREACH_CORE(i) {
708 		if (!spdk_cpuset_get_cpu(&g_core_mask, i)) {
709 			continue;
710 		}
711 		snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i);
712 		poll_group = &g_ublk_tgt.poll_groups[g_num_ublk_poll_groups];
713 		poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask);
714 		spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group);
715 		g_num_ublk_poll_groups++;
716 	}
717 
718 	assert(spdk_thread_is_app_thread(NULL));
719 	g_ublk_tgt.active = true;
720 	g_ublk_tgt.ctrl_ops_in_progress = 0;
721 	g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL,
722 				 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US);
723 
724 	SPDK_NOTICELOG("UBLK target created successfully\n");
725 
726 	return 0;
727 }
728 
729 static void
730 _ublk_fini_done(void *args)
731 {
732 	SPDK_DEBUGLOG(ublk, "\n");
733 
734 	g_num_ublk_poll_groups = 0;
735 	g_next_ublk_poll_group = 0;
736 	g_ublk_tgt.is_destroying = false;
737 	g_ublk_tgt.active = false;
738 	g_ublk_tgt.features = 0;
739 	g_ublk_tgt.ioctl_encode = false;
740 	g_ublk_tgt.user_copy = false;
741 	g_ublk_tgt.user_recovery = false;
742 
743 	if (g_ublk_tgt.cb_fn) {
744 		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
745 		g_ublk_tgt.cb_fn = NULL;
746 		g_ublk_tgt.cb_arg = NULL;
747 	}
748 
749 	if (g_ublk_tgt.poll_groups) {
750 		free(g_ublk_tgt.poll_groups);
751 		g_ublk_tgt.poll_groups = NULL;
752 	}
753 
754 }
755 
756 static void
757 ublk_thread_exit(void *args)
758 {
759 	struct spdk_thread *ublk_thread = spdk_get_thread();
760 	uint32_t i;
761 
762 	for (i = 0; i < g_num_ublk_poll_groups; i++) {
763 		if (g_ublk_tgt.poll_groups[i].ublk_thread == ublk_thread) {
764 			spdk_poller_unregister(&g_ublk_tgt.poll_groups[i].ublk_poller);
765 			spdk_iobuf_channel_fini(&g_ublk_tgt.poll_groups[i].iobuf_ch);
766 			spdk_thread_bind(ublk_thread, false);
767 			spdk_thread_exit(ublk_thread);
768 		}
769 	}
770 }
771 
772 static int
773 ublk_close_dev(struct spdk_ublk_dev *ublk)
774 {
775 	int rc;
776 
777 	/* set is_closing */
778 	if (ublk->is_closing) {
779 		return -EBUSY;
780 	}
781 	ublk->is_closing = true;
782 
783 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_STOP_DEV);
784 	if (rc < 0) {
785 		SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id);
786 	}
787 	return rc;
788 }
789 
790 static void
791 _ublk_fini(void *args)
792 {
793 	struct spdk_ublk_dev	*ublk, *ublk_tmp;
794 
795 	TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) {
796 		ublk_close_dev(ublk);
797 	}
798 
799 	/* Check if all ublks closed */
800 	if (TAILQ_EMPTY(&g_ublk_devs)) {
801 		SPDK_DEBUGLOG(ublk, "finish shutdown\n");
802 		spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
803 		if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) {
804 			io_uring_queue_exit(&g_ublk_tgt.ctrl_ring);
805 			g_ublk_tgt.ctrl_ring.ring_fd = -1;
806 		}
807 		if (g_ublk_tgt.ctrl_fd >= 0) {
808 			close(g_ublk_tgt.ctrl_fd);
809 			g_ublk_tgt.ctrl_fd = -1;
810 		}
811 		spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done);
812 	} else {
813 		spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL);
814 	}
815 }
816 
817 int
818 spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg)
819 {
820 	assert(spdk_thread_is_app_thread(NULL));
821 
822 	if (g_ublk_tgt.is_destroying == true) {
823 		/* UBLK target is being destroying */
824 		return -EBUSY;
825 	}
826 	g_ublk_tgt.cb_fn = cb_fn;
827 	g_ublk_tgt.cb_arg = cb_arg;
828 	g_ublk_tgt.is_destroying = true;
829 	_ublk_fini(NULL);
830 
831 	return 0;
832 }
833 
834 int
835 ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg)
836 {
837 	int rc;
838 
839 	if (g_ublk_tgt.active == false) {
840 		/* UBLK target has not been created */
841 		return -ENOENT;
842 	}
843 
844 	rc = spdk_ublk_fini(cb_fn, cb_arg);
845 
846 	return rc;
847 }
848 
849 struct spdk_ublk_dev *
850 ublk_dev_find_by_id(uint32_t ublk_id)
851 {
852 	struct spdk_ublk_dev *ublk;
853 
854 	/* check whether ublk has already been registered by ublk path. */
855 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
856 		if (ublk->ublk_id == ublk_id) {
857 			return ublk;
858 		}
859 	}
860 
861 	return NULL;
862 }
863 
864 uint32_t
865 ublk_dev_get_id(struct spdk_ublk_dev *ublk)
866 {
867 	return ublk->ublk_id;
868 }
869 
870 struct spdk_ublk_dev *ublk_dev_first(void)
871 {
872 	return TAILQ_FIRST(&g_ublk_devs);
873 }
874 
875 struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev)
876 {
877 	return TAILQ_NEXT(prev, tailq);
878 }
879 
880 uint32_t
881 ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk)
882 {
883 	return ublk->queue_depth;
884 }
885 
886 uint32_t
887 ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk)
888 {
889 	return ublk->num_queues;
890 }
891 
892 const char *
893 ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk)
894 {
895 	return spdk_bdev_get_name(ublk->bdev);
896 }
897 
898 void
899 spdk_ublk_write_config_json(struct spdk_json_write_ctx *w)
900 {
901 	struct spdk_ublk_dev *ublk;
902 
903 	spdk_json_write_array_begin(w);
904 
905 	if (g_ublk_tgt.active) {
906 		spdk_json_write_object_begin(w);
907 
908 		spdk_json_write_named_string(w, "method", "ublk_create_target");
909 		spdk_json_write_named_object_begin(w, "params");
910 		spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask));
911 		spdk_json_write_object_end(w);
912 
913 		spdk_json_write_object_end(w);
914 	}
915 
916 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
917 		spdk_json_write_object_begin(w);
918 
919 		spdk_json_write_named_string(w, "method", "ublk_start_disk");
920 
921 		spdk_json_write_named_object_begin(w, "params");
922 		spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk));
923 		spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id);
924 		spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues);
925 		spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth);
926 		spdk_json_write_object_end(w);
927 
928 		spdk_json_write_object_end(w);
929 	}
930 
931 	spdk_json_write_array_end(w);
932 }
933 
934 static void
935 ublk_dev_list_register(struct spdk_ublk_dev *ublk)
936 {
937 	UBLK_DEBUGLOG(ublk, "add to tailq\n");
938 	TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq);
939 	g_ublk_tgt.num_ublk_devs++;
940 }
941 
942 static void
943 ublk_dev_list_unregister(struct spdk_ublk_dev *ublk)
944 {
945 	/*
946 	 * ublk device may be stopped before registered.
947 	 * check whether it was registered.
948 	 */
949 
950 	if (ublk_dev_find_by_id(ublk->ublk_id)) {
951 		UBLK_DEBUGLOG(ublk, "remove from tailq\n");
952 		TAILQ_REMOVE(&g_ublk_devs, ublk, tailq);
953 		assert(g_ublk_tgt.num_ublk_devs);
954 		g_ublk_tgt.num_ublk_devs--;
955 		return;
956 	}
957 
958 	UBLK_DEBUGLOG(ublk, "not found in tailq\n");
959 	assert(false);
960 }
961 
962 static void
963 ublk_delete_dev(void *arg)
964 {
965 	struct spdk_ublk_dev *ublk = arg;
966 	int rc = 0;
967 	uint32_t q_idx;
968 
969 	assert(spdk_thread_is_app_thread(NULL));
970 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
971 		ublk_dev_queue_fini(&ublk->queues[q_idx]);
972 	}
973 
974 	if (ublk->cdev_fd >= 0) {
975 		close(ublk->cdev_fd);
976 	}
977 
978 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_DEL_DEV);
979 	if (rc < 0) {
980 		SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id);
981 	}
982 }
983 
984 static int
985 _ublk_close_dev_retry(void *arg)
986 {
987 	struct spdk_ublk_dev *ublk = arg;
988 
989 	if (ublk->ctrl_ops_in_progress > 0) {
990 		if (ublk->retry_count-- > 0) {
991 			return SPDK_POLLER_BUSY;
992 		}
993 		SPDK_ERRLOG("Timeout on ctrl op completion.\n");
994 	}
995 	spdk_poller_unregister(&ublk->retry_poller);
996 	ublk_delete_dev(ublk);
997 	return SPDK_POLLER_BUSY;
998 }
999 
1000 static void
1001 ublk_try_close_dev(void *arg)
1002 {
1003 	struct spdk_ublk_dev *ublk = arg;
1004 
1005 	assert(spdk_thread_is_app_thread(NULL));
1006 
1007 	ublk->queues_closed += 1;
1008 	SPDK_DEBUGLOG(ublk_io, "ublkb%u closed queues %u\n", ublk->ublk_id, ublk->queues_closed);
1009 
1010 	if (ublk->queues_closed < ublk->num_queues) {
1011 		return;
1012 	}
1013 
1014 	if (ublk->ctrl_ops_in_progress > 0) {
1015 		assert(ublk->retry_poller == NULL);
1016 		ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US;
1017 		ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk,
1018 				     UBLK_BUSY_POLLING_INTERVAL_US);
1019 	} else {
1020 		ublk_delete_dev(ublk);
1021 	}
1022 }
1023 
1024 static void
1025 ublk_try_close_queue(struct ublk_queue *q)
1026 {
1027 	struct spdk_ublk_dev *ublk = q->dev;
1028 
1029 	/* Close queue until no I/O is submitted to bdev in flight,
1030 	 * no I/O is waiting to commit result, and all I/Os are aborted back.
1031 	 */
1032 	if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) {
1033 		/* wait for next retry */
1034 		return;
1035 	}
1036 
1037 	TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq);
1038 	spdk_put_io_channel(q->bdev_ch);
1039 	q->bdev_ch = NULL;
1040 
1041 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk);
1042 }
1043 
1044 int
1045 ublk_stop_disk(uint32_t ublk_id, ublk_ctrl_cb ctrl_cb, void *cb_arg)
1046 {
1047 	struct spdk_ublk_dev *ublk;
1048 
1049 	assert(spdk_thread_is_app_thread(NULL));
1050 
1051 	ublk = ublk_dev_find_by_id(ublk_id);
1052 	if (ublk == NULL) {
1053 		SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id);
1054 		return -ENODEV;
1055 	}
1056 	if (ublk->is_closing) {
1057 		SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id);
1058 		return -EBUSY;
1059 	}
1060 	if (ublk->ctrl_cb) {
1061 		SPDK_WARNLOG("ublk %d is busy with RPC call\n", ublk->ublk_id);
1062 		return -EBUSY;
1063 	}
1064 
1065 	ublk->ctrl_cb = ctrl_cb;
1066 	ublk->cb_arg = cb_arg;
1067 	return ublk_close_dev(ublk);
1068 }
1069 
1070 static inline void
1071 ublk_mark_io_done(struct ublk_io *io, int res)
1072 {
1073 	/*
1074 	 * mark io done by target, so that SPDK can commit its
1075 	 * result and fetch new request via io_uring command.
1076 	 */
1077 	io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
1078 	io->result = res;
1079 	io->need_data = false;
1080 }
1081 
1082 static void
1083 ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1084 {
1085 	struct ublk_io	*io = cb_arg;
1086 	struct ublk_queue *q = io->q;
1087 	int res;
1088 
1089 	if (success) {
1090 		res = io->result;
1091 	} else {
1092 		res = -EIO;
1093 	}
1094 
1095 	ublk_mark_io_done(io, res);
1096 
1097 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n",
1098 		      q->q_id, io->tag, res);
1099 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1100 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1101 
1102 	if (bdev_io != NULL) {
1103 		spdk_bdev_free_io(bdev_io);
1104 	}
1105 }
1106 
1107 static void
1108 ublk_queue_user_copy(struct ublk_io *io, bool is_write)
1109 {
1110 	struct ublk_queue *q = io->q;
1111 	const struct ublksrv_io_desc *iod = io->iod;
1112 	struct io_uring_sqe *sqe;
1113 	uint64_t pos;
1114 	uint32_t nbytes;
1115 
1116 	nbytes = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1117 	pos = ublk_user_copy_pos(q->q_id, io->tag);
1118 	sqe = io_uring_get_sqe(&q->ring);
1119 	assert(sqe);
1120 
1121 	if (is_write) {
1122 		io_uring_prep_read(sqe, 0, io->payload, nbytes, pos);
1123 	} else {
1124 		io_uring_prep_write(sqe, 0, io->payload, nbytes, pos);
1125 	}
1126 	io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
1127 	io_uring_sqe_set_data64(sqe, build_user_data(io->tag, 0));
1128 
1129 	io->user_copy = true;
1130 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1131 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1132 }
1133 
1134 static void
1135 ublk_user_copy_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1136 {
1137 	struct ublk_io	*io = cb_arg;
1138 
1139 	spdk_bdev_free_io(bdev_io);
1140 
1141 	if (success) {
1142 		ublk_queue_user_copy(io, false);
1143 		return;
1144 	}
1145 	/* READ IO Error */
1146 	ublk_io_done(NULL, false, cb_arg);
1147 }
1148 
1149 static void
1150 ublk_resubmit_io(void *arg)
1151 {
1152 	struct ublk_io *io = (struct ublk_io *)arg;
1153 
1154 	_ublk_submit_bdev_io(io->q, io);
1155 }
1156 
1157 static void
1158 ublk_queue_io(struct ublk_io *io)
1159 {
1160 	int rc;
1161 	struct spdk_bdev *bdev = io->q->dev->bdev;
1162 	struct ublk_queue *q = io->q;
1163 
1164 	io->bdev_io_wait.bdev = bdev;
1165 	io->bdev_io_wait.cb_fn = ublk_resubmit_io;
1166 	io->bdev_io_wait.cb_arg = io;
1167 
1168 	rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait);
1169 	if (rc != 0) {
1170 		SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc);
1171 		ublk_io_done(NULL, false, io);
1172 	}
1173 }
1174 
1175 static void
1176 ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf)
1177 {
1178 	struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf);
1179 
1180 	io->mpool_entry = buf;
1181 	assert(io->payload == NULL);
1182 	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
1183 	io->get_buf_cb(io);
1184 }
1185 
1186 static void
1187 ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch,
1188 		   ublk_get_buf_cb get_buf_cb)
1189 {
1190 	void *buf;
1191 
1192 	io->payload_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1193 	io->get_buf_cb = get_buf_cb;
1194 	buf = spdk_iobuf_get(iobuf_ch, io->payload_size, &io->iobuf, ublk_io_get_buffer_cb);
1195 
1196 	if (buf != NULL) {
1197 		ublk_io_get_buffer_cb(&io->iobuf, buf);
1198 	}
1199 }
1200 
1201 static void
1202 ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch)
1203 {
1204 	if (io->payload) {
1205 		spdk_iobuf_put(iobuf_ch, io->mpool_entry, io->payload_size);
1206 		io->mpool_entry = NULL;
1207 		io->payload = NULL;
1208 	}
1209 }
1210 
1211 static void
1212 _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1213 {
1214 	struct spdk_ublk_dev *ublk = q->dev;
1215 	struct spdk_bdev_desc *desc = io->bdev_desc;
1216 	struct spdk_io_channel *ch = io->bdev_ch;
1217 	uint64_t offset_blocks, num_blocks;
1218 	spdk_bdev_io_completion_cb read_cb;
1219 	uint8_t ublk_op;
1220 	int rc = 0;
1221 	const struct ublksrv_io_desc *iod = io->iod;
1222 
1223 	ublk_op = ublksrv_get_op(iod);
1224 	offset_blocks = iod->start_sector >> ublk->sector_per_block_shift;
1225 	num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift;
1226 
1227 	switch (ublk_op) {
1228 	case UBLK_IO_OP_READ:
1229 		if (g_ublk_tgt.user_copy) {
1230 			read_cb = ublk_user_copy_read_done;
1231 		} else {
1232 			read_cb = ublk_io_done;
1233 		}
1234 		rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, read_cb, io);
1235 		break;
1236 	case UBLK_IO_OP_WRITE:
1237 		rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
1238 		break;
1239 	case UBLK_IO_OP_FLUSH:
1240 		rc = spdk_bdev_flush_blocks(desc, ch, 0, spdk_bdev_get_num_blocks(ublk->bdev), ublk_io_done, io);
1241 		break;
1242 	case UBLK_IO_OP_DISCARD:
1243 		rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1244 		break;
1245 	case UBLK_IO_OP_WRITE_ZEROES:
1246 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1247 		break;
1248 	default:
1249 		rc = -1;
1250 	}
1251 
1252 	if (rc < 0) {
1253 		if (rc == -ENOMEM) {
1254 			SPDK_INFOLOG(ublk, "No memory, start to queue io.\n");
1255 			ublk_queue_io(io);
1256 		} else {
1257 			SPDK_ERRLOG("ublk io failed in ublk_queue_io, rc=%d, ublk_op=%u\n", rc, ublk_op);
1258 			ublk_io_done(NULL, false, io);
1259 		}
1260 	}
1261 }
1262 
1263 static void
1264 read_get_buffer_done(struct ublk_io *io)
1265 {
1266 	_ublk_submit_bdev_io(io->q, io);
1267 }
1268 
1269 static void
1270 user_copy_write_get_buffer_done(struct ublk_io *io)
1271 {
1272 	ublk_queue_user_copy(io, true);
1273 }
1274 
1275 static void
1276 ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1277 {
1278 	struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch;
1279 	const struct ublksrv_io_desc *iod = io->iod;
1280 	uint8_t ublk_op;
1281 
1282 	io->result = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1283 	ublk_op = ublksrv_get_op(iod);
1284 	switch (ublk_op) {
1285 	case UBLK_IO_OP_READ:
1286 		ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done);
1287 		break;
1288 	case UBLK_IO_OP_WRITE:
1289 		if (g_ublk_tgt.user_copy) {
1290 			ublk_io_get_buffer(io, iobuf_ch, user_copy_write_get_buffer_done);
1291 		} else {
1292 			_ublk_submit_bdev_io(q, io);
1293 		}
1294 		break;
1295 	default:
1296 		_ublk_submit_bdev_io(q, io);
1297 		break;
1298 	}
1299 }
1300 
1301 static inline void
1302 ublksrv_queue_io_cmd(struct ublk_queue *q,
1303 		     struct ublk_io *io, unsigned tag)
1304 {
1305 	struct ublksrv_io_cmd *cmd;
1306 	struct io_uring_sqe *sqe;
1307 	unsigned int cmd_op = 0;;
1308 	uint64_t user_data;
1309 
1310 	/* each io should have operation of fetching or committing */
1311 	assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) ||
1312 	       (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ));
1313 	cmd_op = io->cmd_op;
1314 
1315 	sqe = io_uring_get_sqe(&q->ring);
1316 	assert(sqe);
1317 
1318 	cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe);
1319 	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) {
1320 		cmd->result = io->result;
1321 	}
1322 
1323 	/* These fields should be written once, never change */
1324 	ublk_set_sqe_cmd_op(sqe, cmd_op);
1325 	/* dev->cdev_fd */
1326 	sqe->fd		= 0;
1327 	sqe->opcode	= IORING_OP_URING_CMD;
1328 	sqe->flags	= IOSQE_FIXED_FILE;
1329 	sqe->rw_flags	= 0;
1330 	cmd->tag	= tag;
1331 	cmd->addr	= g_ublk_tgt.user_copy ? 0 : (__u64)(uintptr_t)(io->payload);
1332 	cmd->q_id	= q->q_id;
1333 
1334 	user_data = build_user_data(tag, cmd_op);
1335 	io_uring_sqe_set_data64(sqe, user_data);
1336 
1337 	io->cmd_op = 0;
1338 
1339 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n",
1340 		      q->q_id, tag, cmd_op,
1341 		      io->cmd_op, q->is_stopping);
1342 }
1343 
1344 static int
1345 ublk_io_xmit(struct ublk_queue *q)
1346 {
1347 	TAILQ_HEAD(, ublk_io) buffer_free_list;
1348 	struct spdk_iobuf_channel *iobuf_ch;
1349 	int rc = 0, count = 0;
1350 	struct ublk_io *io;
1351 
1352 	if (TAILQ_EMPTY(&q->completed_io_list)) {
1353 		return 0;
1354 	}
1355 
1356 	TAILQ_INIT(&buffer_free_list);
1357 	while (!TAILQ_EMPTY(&q->completed_io_list)) {
1358 		io = TAILQ_FIRST(&q->completed_io_list);
1359 		assert(io != NULL);
1360 		/*
1361 		 * Remove IO from list now assuming it will be completed. It will be inserted
1362 		 * back to the head if it cannot be completed. This approach is specifically
1363 		 * taken to work around a scan-build use-after-free mischaracterization.
1364 		 */
1365 		TAILQ_REMOVE(&q->completed_io_list, io, tailq);
1366 		if (!io->user_copy) {
1367 			if (!io->need_data) {
1368 				TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq);
1369 			}
1370 			ublksrv_queue_io_cmd(q, io, io->tag);
1371 		}
1372 		count++;
1373 	}
1374 
1375 	q->cmd_inflight += count;
1376 	rc = io_uring_submit(&q->ring);
1377 	if (rc != count) {
1378 		SPDK_ERRLOG("could not submit all commands\n");
1379 		assert(false);
1380 	}
1381 
1382 	/* Note: for READ io, ublk will always copy the data out of
1383 	 * the buffers in the io_uring_submit context.  Since we
1384 	 * are not using SQPOLL for IO rings, we can safely free
1385 	 * those IO buffers here.  This design doesn't seem ideal,
1386 	 * but it's what's possible since there is no discrete
1387 	 * COMMIT_REQ operation.  That will need to change in the
1388 	 * future should we ever want to support async copy
1389 	 * operations.
1390 	 */
1391 	iobuf_ch = &q->poll_group->iobuf_ch;
1392 	while (!TAILQ_EMPTY(&buffer_free_list)) {
1393 		io = TAILQ_FIRST(&buffer_free_list);
1394 		TAILQ_REMOVE(&buffer_free_list, io, tailq);
1395 		ublk_io_put_buffer(io, iobuf_ch);
1396 	}
1397 	return rc;
1398 }
1399 
1400 static void
1401 write_get_buffer_done(struct ublk_io *io)
1402 {
1403 	io->need_data = true;
1404 	io->cmd_op = UBLK_IO_NEED_GET_DATA;
1405 	io->result = 0;
1406 
1407 	TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq);
1408 	TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq);
1409 }
1410 
1411 static int
1412 ublk_io_recv(struct ublk_queue *q)
1413 {
1414 	struct io_uring_cqe *cqe;
1415 	unsigned head, tag;
1416 	int fetch, count = 0;
1417 	struct ublk_io *io;
1418 	struct spdk_iobuf_channel *iobuf_ch;
1419 
1420 	if (q->cmd_inflight == 0) {
1421 		return 0;
1422 	}
1423 
1424 	iobuf_ch = &q->poll_group->iobuf_ch;
1425 	io_uring_for_each_cqe(&q->ring, head, cqe) {
1426 		tag = user_data_to_tag(cqe->user_data);
1427 		io = &q->ios[tag];
1428 
1429 		SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u, user copy %u, cmd_op %u\n",
1430 			      cqe->res, q->q_id, tag, io->user_copy, user_data_to_op(cqe->user_data));
1431 
1432 		q->cmd_inflight--;
1433 		TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq);
1434 
1435 		if (!io->user_copy) {
1436 			fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping;
1437 			if (!fetch) {
1438 				q->is_stopping = true;
1439 				if (io->cmd_op == UBLK_IO_FETCH_REQ) {
1440 					io->cmd_op = 0;
1441 				}
1442 			}
1443 
1444 			if (cqe->res == UBLK_IO_RES_OK) {
1445 				ublk_submit_bdev_io(q, io);
1446 			} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
1447 				ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done);
1448 			} else {
1449 				if (cqe->res != UBLK_IO_RES_ABORT) {
1450 					SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
1451 						    cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data));
1452 				}
1453 				TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1454 			}
1455 		} else {
1456 
1457 			/* clear `user_copy` for next use of this IO structure */
1458 			io->user_copy = false;
1459 
1460 			assert((ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) ||
1461 			       (ublksrv_get_op(io->iod) == UBLK_IO_OP_WRITE));
1462 			if (cqe->res != io->result) {
1463 				/* EIO */
1464 				ublk_io_done(NULL, false, io);
1465 			} else {
1466 				if (ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) {
1467 					/* bdev_io is already freed in first READ cycle */
1468 					ublk_io_done(NULL, true, io);
1469 				} else {
1470 					_ublk_submit_bdev_io(q, io);
1471 				}
1472 			}
1473 		}
1474 		count += 1;
1475 		if (count == UBLK_QUEUE_REQUEST) {
1476 			break;
1477 		}
1478 	}
1479 	io_uring_cq_advance(&q->ring, count);
1480 
1481 	return count;
1482 }
1483 
1484 static int
1485 ublk_poll(void *arg)
1486 {
1487 	struct ublk_poll_group *poll_group = arg;
1488 	struct ublk_queue *q, *q_tmp;
1489 	int sent, received, count = 0;
1490 
1491 	TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) {
1492 		sent = ublk_io_xmit(q);
1493 		received = ublk_io_recv(q);
1494 		if (spdk_unlikely(q->is_stopping)) {
1495 			ublk_try_close_queue(q);
1496 		}
1497 		count += sent + received;
1498 	}
1499 	if (count > 0) {
1500 		return SPDK_POLLER_BUSY;
1501 	} else {
1502 		return SPDK_POLLER_IDLE;
1503 	}
1504 }
1505 
1506 static void
1507 ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk)
1508 {
1509 	ublk_close_dev(ublk);
1510 }
1511 
1512 static void
1513 ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1514 		   void *event_ctx)
1515 {
1516 	switch (type) {
1517 	case SPDK_BDEV_EVENT_REMOVE:
1518 		ublk_bdev_hot_remove(event_ctx);
1519 		break;
1520 	default:
1521 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1522 		break;
1523 	}
1524 }
1525 
1526 static void
1527 ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth)
1528 {
1529 	struct io_uring_sqe *sqe;
1530 	uint32_t i;
1531 
1532 	for (i = 0; i < q_depth; i++) {
1533 		sqe = ublk_uring_get_sqe(r, i);
1534 
1535 		/* These fields should be written once, never change */
1536 		sqe->flags = IOSQE_FIXED_FILE;
1537 		sqe->rw_flags = 0;
1538 		sqe->ioprio = 0;
1539 		sqe->off = 0;
1540 	}
1541 }
1542 
1543 static int
1544 ublk_dev_queue_init(struct ublk_queue *q)
1545 {
1546 	int rc = 0, cmd_buf_size;
1547 	uint32_t j;
1548 	struct spdk_ublk_dev *ublk = q->dev;
1549 	unsigned long off;
1550 
1551 	cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth);
1552 	off = UBLKSRV_CMD_BUF_OFFSET +
1553 	      q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
1554 	q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ,
1555 			MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off);
1556 	if (q->io_cmd_buf == MAP_FAILED) {
1557 		q->io_cmd_buf = NULL;
1558 		rc = -errno;
1559 		SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc));
1560 		return rc;
1561 	}
1562 
1563 	for (j = 0; j < q->q_depth; j++) {
1564 		q->ios[j].cmd_op = UBLK_IO_FETCH_REQ;
1565 		q->ios[j].iod = &q->io_cmd_buf[j];
1566 	}
1567 
1568 	rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128);
1569 	if (rc < 0) {
1570 		SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc));
1571 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1572 		q->io_cmd_buf = NULL;
1573 		return rc;
1574 	}
1575 
1576 	rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1);
1577 	if (rc != 0) {
1578 		SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc));
1579 		io_uring_queue_exit(&q->ring);
1580 		q->ring.ring_fd = -1;
1581 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1582 		q->io_cmd_buf = NULL;
1583 		return rc;
1584 	}
1585 
1586 	ublk_dev_init_io_cmds(&q->ring, q->q_depth);
1587 
1588 	return 0;
1589 }
1590 
1591 static void
1592 ublk_dev_queue_fini(struct ublk_queue *q)
1593 {
1594 	if (q->ring.ring_fd >= 0) {
1595 		io_uring_unregister_files(&q->ring);
1596 		io_uring_queue_exit(&q->ring);
1597 		q->ring.ring_fd = -1;
1598 	}
1599 	if (q->io_cmd_buf) {
1600 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1601 	}
1602 }
1603 
1604 static void
1605 ublk_dev_queue_io_init(struct ublk_queue *q)
1606 {
1607 	struct ublk_io *io;
1608 	uint32_t i;
1609 	int rc __attribute__((unused));
1610 	void *buf;
1611 
1612 	/* Some older kernels require a buffer to get posted, even
1613 	 * when NEED_GET_DATA has been specified.  So allocate a
1614 	 * temporary buffer, only for purposes of this workaround.
1615 	 * It never actually gets used, so we will free it immediately
1616 	 * after all of the commands are posted.
1617 	 */
1618 	buf = malloc(64);
1619 
1620 	assert(q->bdev_ch != NULL);
1621 
1622 	/* Initialize and submit all io commands to ublk driver */
1623 	for (i = 0; i < q->q_depth; i++) {
1624 		io = &q->ios[i];
1625 		io->tag = (uint16_t)i;
1626 		io->payload = buf;
1627 		io->bdev_ch = q->bdev_ch;
1628 		io->bdev_desc = q->dev->bdev_desc;
1629 		ublksrv_queue_io_cmd(q, io, i);
1630 	}
1631 
1632 	q->cmd_inflight += q->q_depth;
1633 	rc = io_uring_submit(&q->ring);
1634 	assert(rc == (int)q->q_depth);
1635 	for (i = 0; i < q->q_depth; i++) {
1636 		io = &q->ios[i];
1637 		io->payload = NULL;
1638 	}
1639 	free(buf);
1640 }
1641 
1642 static int
1643 ublk_set_params(struct spdk_ublk_dev *ublk)
1644 {
1645 	int rc;
1646 
1647 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_SET_PARAMS);
1648 	if (rc < 0) {
1649 		SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1650 	}
1651 
1652 	return rc;
1653 }
1654 
1655 static void
1656 ublk_dev_info_init(struct spdk_ublk_dev *ublk)
1657 {
1658 	struct ublksrv_ctrl_dev_info uinfo = {
1659 		.queue_depth = ublk->queue_depth,
1660 		.nr_hw_queues = ublk->num_queues,
1661 		.dev_id = ublk->ublk_id,
1662 		.max_io_buf_bytes = UBLK_IO_MAX_BYTES,
1663 		.ublksrv_pid = getpid(),
1664 		.flags = UBLK_F_URING_CMD_COMP_IN_TASK,
1665 	};
1666 
1667 	if (g_ublk_tgt.user_copy) {
1668 		uinfo.flags |= UBLK_F_USER_COPY;
1669 	} else {
1670 		uinfo.flags |= UBLK_F_NEED_GET_DATA;
1671 	}
1672 
1673 	if (g_ublk_tgt.user_recovery) {
1674 		uinfo.flags |= UBLK_F_USER_RECOVERY;
1675 		uinfo.flags |= UBLK_F_USER_RECOVERY_REISSUE;
1676 	}
1677 
1678 	ublk->dev_info = uinfo;
1679 }
1680 
1681 /* Set ublk device parameters based on bdev */
1682 static void
1683 ublk_info_param_init(struct spdk_ublk_dev *ublk)
1684 {
1685 	struct spdk_bdev *bdev = ublk->bdev;
1686 	uint32_t blk_size = spdk_bdev_get_data_block_size(bdev);
1687 	uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev);
1688 	uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev);
1689 	uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev);
1690 	uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT;
1691 	uint32_t io_min_size = blk_size;
1692 	uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size);
1693 
1694 	struct ublk_params uparams = {
1695 		.types = UBLK_PARAM_TYPE_BASIC,
1696 		.len = sizeof(struct ublk_params),
1697 		.basic = {
1698 			.logical_bs_shift = spdk_u32log2(blk_size),
1699 			.physical_bs_shift = spdk_u32log2(pblk_size),
1700 			.io_min_shift = spdk_u32log2(io_min_size),
1701 			.io_opt_shift = spdk_u32log2(io_opt_size),
1702 			.dev_sectors = num_blocks * sectors_per_block,
1703 			.max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT,
1704 		}
1705 	};
1706 
1707 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
1708 		uparams.basic.attrs = UBLK_ATTR_VOLATILE_CACHE;
1709 	}
1710 
1711 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1712 		uparams.types |= UBLK_PARAM_TYPE_DISCARD;
1713 		uparams.discard.discard_alignment = sectors_per_block;
1714 		uparams.discard.max_discard_sectors = num_blocks * sectors_per_block;
1715 		uparams.discard.max_discard_segments = 1;
1716 		uparams.discard.discard_granularity = blk_size;
1717 		if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1718 			uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block;
1719 		}
1720 	}
1721 
1722 	ublk->dev_params = uparams;
1723 }
1724 
1725 static void
1726 _ublk_free_dev(void *arg)
1727 {
1728 	struct spdk_ublk_dev *ublk = arg;
1729 
1730 	ublk_free_dev(ublk);
1731 }
1732 
1733 static void
1734 free_buffers(void *arg)
1735 {
1736 	struct ublk_queue *q = arg;
1737 	uint32_t i;
1738 
1739 	for (i = 0; i < q->q_depth; i++) {
1740 		ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch);
1741 	}
1742 	free(q->ios);
1743 	q->ios = NULL;
1744 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev);
1745 }
1746 
1747 static void
1748 ublk_free_dev(struct spdk_ublk_dev *ublk)
1749 {
1750 	struct ublk_queue *q;
1751 	uint32_t q_idx;
1752 
1753 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
1754 		q = &ublk->queues[q_idx];
1755 
1756 		/* The ublk_io of this queue are not initialized. */
1757 		if (q->ios == NULL) {
1758 			continue;
1759 		}
1760 
1761 		/* We found a queue that has an ios array that may have buffers
1762 		 * that need to be freed.  Send a message to the queue's thread
1763 		 * so it can free the buffers back to that thread's iobuf channel.
1764 		 * When it's done, it will set q->ios to NULL and send a message
1765 		 * back to this function to continue.
1766 		 */
1767 		if (q->poll_group) {
1768 			spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q);
1769 			return;
1770 		} else {
1771 			free(q->ios);
1772 			q->ios = NULL;
1773 		}
1774 	}
1775 
1776 	/* All of the buffers associated with the queues have been freed, so now
1777 	 * continue with releasing resources for the rest of the ublk device.
1778 	 */
1779 	if (ublk->bdev_desc) {
1780 		spdk_bdev_close(ublk->bdev_desc);
1781 		ublk->bdev_desc = NULL;
1782 	}
1783 
1784 	ublk_dev_list_unregister(ublk);
1785 	SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id);
1786 
1787 	free(ublk);
1788 }
1789 
1790 static int
1791 ublk_ios_init(struct spdk_ublk_dev *ublk)
1792 {
1793 	int rc;
1794 	uint32_t i, j;
1795 	struct ublk_queue *q;
1796 
1797 	for (i = 0; i < ublk->num_queues; i++) {
1798 		q = &ublk->queues[i];
1799 
1800 		TAILQ_INIT(&q->completed_io_list);
1801 		TAILQ_INIT(&q->inflight_io_list);
1802 		q->dev = ublk;
1803 		q->q_id = i;
1804 		q->q_depth = ublk->queue_depth;
1805 		q->ios = calloc(q->q_depth, sizeof(struct ublk_io));
1806 		if (!q->ios) {
1807 			rc = -ENOMEM;
1808 			SPDK_ERRLOG("could not allocate queue ios\n");
1809 			goto err;
1810 		}
1811 		for (j = 0; j < q->q_depth; j++) {
1812 			q->ios[j].q = q;
1813 		}
1814 	}
1815 
1816 	return 0;
1817 
1818 err:
1819 	for (i = 0; i < ublk->num_queues; i++) {
1820 		free(q->ios);
1821 		q->ios = NULL;
1822 	}
1823 	return rc;
1824 }
1825 
1826 static void
1827 ublk_queue_recovery_done(void *arg)
1828 {
1829 	struct spdk_ublk_dev *ublk = arg;
1830 
1831 	ublk->online_num_queues++;
1832 	if (ublk->is_recovering && (ublk->online_num_queues == ublk->num_queues)) {
1833 		ublk_ctrl_cmd_submit(ublk, UBLK_CMD_END_USER_RECOVERY);
1834 	}
1835 }
1836 
1837 static void
1838 ublk_queue_run(void *arg1)
1839 {
1840 	struct ublk_queue	*q = arg1;
1841 	struct spdk_ublk_dev *ublk = q->dev;
1842 	struct ublk_poll_group *poll_group = q->poll_group;
1843 
1844 	assert(spdk_get_thread() == poll_group->ublk_thread);
1845 	q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc);
1846 	/* Queues must be filled with IO in the io pthread */
1847 	ublk_dev_queue_io_init(q);
1848 
1849 	TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq);
1850 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_queue_recovery_done, ublk);
1851 }
1852 
1853 int
1854 ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
1855 		uint32_t num_queues, uint32_t queue_depth,
1856 		ublk_ctrl_cb ctrl_cb, void *cb_arg)
1857 {
1858 	int			rc;
1859 	uint32_t		i;
1860 	struct spdk_bdev	*bdev;
1861 	struct spdk_ublk_dev	*ublk = NULL;
1862 	uint32_t		sector_per_block;
1863 
1864 	assert(spdk_thread_is_app_thread(NULL));
1865 
1866 	if (g_ublk_tgt.active == false) {
1867 		SPDK_ERRLOG("NO ublk target exist\n");
1868 		return -ENODEV;
1869 	}
1870 
1871 	ublk = ublk_dev_find_by_id(ublk_id);
1872 	if (ublk != NULL) {
1873 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
1874 		return -EBUSY;
1875 	}
1876 
1877 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
1878 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
1879 		return -ENOTSUP;
1880 	}
1881 
1882 	ublk = calloc(1, sizeof(*ublk));
1883 	if (ublk == NULL) {
1884 		return -ENOMEM;
1885 	}
1886 	ublk->ctrl_cb = ctrl_cb;
1887 	ublk->cb_arg = cb_arg;
1888 	ublk->cdev_fd = -1;
1889 	ublk->ublk_id = ublk_id;
1890 	UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n",
1891 		      bdev_name, num_queues, queue_depth);
1892 
1893 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
1894 	if (rc != 0) {
1895 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
1896 		free(ublk);
1897 		return rc;
1898 	}
1899 
1900 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
1901 	ublk->bdev = bdev;
1902 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
1903 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
1904 
1905 	ublk->queues_closed = 0;
1906 	ublk->num_queues = num_queues;
1907 	ublk->queue_depth = queue_depth;
1908 	if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) {
1909 		SPDK_WARNLOG("Set Queue depth %d of UBLK %d to maximum %d\n",
1910 			     ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH);
1911 		ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH;
1912 	}
1913 	if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) {
1914 		SPDK_WARNLOG("Set Queue num %d of UBLK %d to maximum %d\n",
1915 			     ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES);
1916 		ublk->num_queues = UBLK_DEV_MAX_QUEUES;
1917 	}
1918 	for (i = 0; i < ublk->num_queues; i++) {
1919 		ublk->queues[i].ring.ring_fd = -1;
1920 	}
1921 
1922 	ublk_dev_info_init(ublk);
1923 	ublk_info_param_init(ublk);
1924 	rc = ublk_ios_init(ublk);
1925 	if (rc != 0) {
1926 		spdk_bdev_close(ublk->bdev_desc);
1927 		free(ublk);
1928 		return rc;
1929 	}
1930 
1931 	SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n",
1932 		     bdev_name, ublk_id);
1933 
1934 	/* Add ublk_dev to the end of disk list */
1935 	ublk_dev_list_register(ublk);
1936 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_ADD_DEV);
1937 	if (rc < 0) {
1938 		SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1939 		ublk_free_dev(ublk);
1940 	}
1941 
1942 	return rc;
1943 }
1944 
1945 static int
1946 ublk_start_dev(struct spdk_ublk_dev *ublk, bool is_recovering)
1947 {
1948 	int			rc;
1949 	uint32_t		q_id;
1950 	struct spdk_thread	*ublk_thread;
1951 	char			buf[64];
1952 
1953 	snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id);
1954 	ublk->cdev_fd = open(buf, O_RDWR);
1955 	if (ublk->cdev_fd < 0) {
1956 		rc = ublk->cdev_fd;
1957 		SPDK_ERRLOG("can't open %s, rc %d\n", buf, rc);
1958 		return rc;
1959 	}
1960 
1961 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1962 		rc = ublk_dev_queue_init(&ublk->queues[q_id]);
1963 		if (rc) {
1964 			return rc;
1965 		}
1966 	}
1967 
1968 	if (!is_recovering) {
1969 		rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_START_DEV);
1970 		if (rc < 0) {
1971 			SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id,
1972 				    spdk_strerror(-rc));
1973 			return rc;
1974 		}
1975 	}
1976 
1977 	/* Send queue to different spdk_threads for load balance */
1978 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1979 		ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_groups[g_next_ublk_poll_group];
1980 		ublk_thread = g_ublk_tgt.poll_groups[g_next_ublk_poll_group].ublk_thread;
1981 		spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]);
1982 		g_next_ublk_poll_group++;
1983 		if (g_next_ublk_poll_group == g_num_ublk_poll_groups) {
1984 			g_next_ublk_poll_group = 0;
1985 		}
1986 	}
1987 
1988 	return 0;
1989 }
1990 
1991 static int
1992 ublk_ctrl_start_recovery(struct spdk_ublk_dev *ublk)
1993 {
1994 	int                     rc;
1995 	uint32_t                i;
1996 
1997 	if (ublk->ublk_id != ublk->dev_info.dev_id) {
1998 		SPDK_ERRLOG("Invalid ublk ID\n");
1999 		return -EINVAL;
2000 	}
2001 
2002 	ublk->num_queues = ublk->dev_info.nr_hw_queues;
2003 	ublk->queue_depth = ublk->dev_info.queue_depth;
2004 	ublk->dev_info.ublksrv_pid = getpid();
2005 
2006 	SPDK_DEBUGLOG(ublk, "Recovering ublk %d, num queues %u, queue depth %u, flags 0x%llx\n",
2007 		      ublk->ublk_id,
2008 		      ublk->num_queues, ublk->queue_depth, ublk->dev_info.flags);
2009 
2010 	for (i = 0; i < ublk->num_queues; i++) {
2011 		ublk->queues[i].ring.ring_fd = -1;
2012 	}
2013 
2014 	ublk_info_param_init(ublk);
2015 	rc = ublk_ios_init(ublk);
2016 	if (rc != 0) {
2017 		return rc;
2018 	}
2019 
2020 	ublk->is_recovering = true;
2021 	return ublk_ctrl_cmd_submit(ublk, UBLK_CMD_START_USER_RECOVERY);
2022 }
2023 
2024 int
2025 ublk_start_disk_recovery(const char *bdev_name, uint32_t ublk_id, ublk_ctrl_cb ctrl_cb,
2026 			 void *cb_arg)
2027 {
2028 	int			rc;
2029 	struct spdk_bdev	*bdev;
2030 	struct spdk_ublk_dev	*ublk = NULL;
2031 	uint32_t		sector_per_block;
2032 
2033 	assert(spdk_thread_is_app_thread(NULL));
2034 
2035 	if (g_ublk_tgt.active == false) {
2036 		SPDK_ERRLOG("NO ublk target exist\n");
2037 		return -ENODEV;
2038 	}
2039 
2040 	if (!g_ublk_tgt.user_recovery) {
2041 		SPDK_ERRLOG("User recovery is enabled with kernel version >= 6.4\n");
2042 		return -ENOTSUP;
2043 	}
2044 
2045 	ublk = ublk_dev_find_by_id(ublk_id);
2046 	if (ublk != NULL) {
2047 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
2048 		return -EBUSY;
2049 	}
2050 
2051 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
2052 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
2053 		return -ENOTSUP;
2054 	}
2055 
2056 	ublk = calloc(1, sizeof(*ublk));
2057 	if (ublk == NULL) {
2058 		return -ENOMEM;
2059 	}
2060 	ublk->ctrl_cb = ctrl_cb;
2061 	ublk->cb_arg = cb_arg;
2062 	ublk->cdev_fd = -1;
2063 	ublk->ublk_id = ublk_id;
2064 
2065 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
2066 	if (rc != 0) {
2067 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
2068 		free(ublk);
2069 		return rc;
2070 	}
2071 
2072 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
2073 	ublk->bdev = bdev;
2074 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
2075 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
2076 
2077 	SPDK_NOTICELOG("Recovering ublk %d with bdev %s\n", ublk->ublk_id, bdev_name);
2078 
2079 	ublk_dev_list_register(ublk);
2080 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_GET_DEV_INFO);
2081 	if (rc < 0) {
2082 		ublk_free_dev(ublk);
2083 	}
2084 
2085 	return rc;
2086 }
2087 
2088 SPDK_LOG_REGISTER_COMPONENT(ublk)
2089 SPDK_LOG_REGISTER_COMPONENT(ublk_io)
2090