xref: /spdk/lib/ublk/ublk.c (revision dbcc38f096e5336dcc4ab5e60b10202db51c0a38)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <liburing.h>
7 
8 #include "spdk/stdinc.h"
9 #include "spdk/string.h"
10 #include "spdk/bdev.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/likely.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/queue.h"
17 #include "spdk/json.h"
18 #include "spdk/ublk.h"
19 #include "spdk/thread.h"
20 
21 #include "ublk_internal.h"
22 
23 #define UBLK_CTRL_DEV					"/dev/ublk-control"
24 #define UBLK_BLK_CDEV					"/dev/ublkc"
25 
26 #define LINUX_SECTOR_SHIFT				9
27 #define UBLK_IO_MAX_BYTES				SPDK_BDEV_LARGE_BUF_MAX_SIZE
28 #define UBLK_DEV_MAX_QUEUES				32
29 #define UBLK_DEV_MAX_QUEUE_DEPTH			1024
30 #define UBLK_QUEUE_REQUEST				32
31 #define UBLK_STOP_BUSY_WAITING_MS			10000
32 #define UBLK_BUSY_POLLING_INTERVAL_US			20000
33 #define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US	1000
34 /* By default, the kernel ublk_drv driver supports up to 64 block devices */
35 #define UBLK_DEFAULT_MAX_SUPPORTED_DEVS			64
36 
37 #define UBLK_IOBUF_SMALL_CACHE_SIZE			128
38 #define UBLK_IOBUF_LARGE_CACHE_SIZE			32
39 
40 #define UBLK_DEBUGLOG(ublk, format, ...) \
41 	SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__);
42 
43 static uint32_t g_num_ublk_poll_groups = 0;
44 static uint32_t g_next_ublk_poll_group = 0;
45 static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS;
46 static struct spdk_cpuset g_core_mask;
47 
48 struct ublk_queue;
49 struct ublk_poll_group;
50 struct ublk_io;
51 static void _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io);
52 static void ublk_dev_queue_fini(struct ublk_queue *q);
53 static int ublk_poll(void *arg);
54 
55 static int ublk_set_params(struct spdk_ublk_dev *ublk);
56 static int ublk_start_dev(struct spdk_ublk_dev *ublk, bool is_recovering);
57 static void ublk_free_dev(struct spdk_ublk_dev *ublk);
58 static void ublk_delete_dev(void *arg);
59 static int ublk_close_dev(struct spdk_ublk_dev *ublk);
60 static int ublk_ctrl_start_recovery(struct spdk_ublk_dev *ublk);
61 
62 static int ublk_ctrl_cmd_submit(struct spdk_ublk_dev *ublk, uint32_t cmd_op);
63 
64 static const char *ublk_op_name[64] = {
65 	[UBLK_CMD_GET_DEV_INFO] = "UBLK_CMD_GET_DEV_INFO",
66 	[UBLK_CMD_ADD_DEV] =	"UBLK_CMD_ADD_DEV",
67 	[UBLK_CMD_DEL_DEV] =	"UBLK_CMD_DEL_DEV",
68 	[UBLK_CMD_START_DEV] =	"UBLK_CMD_START_DEV",
69 	[UBLK_CMD_STOP_DEV] =	"UBLK_CMD_STOP_DEV",
70 	[UBLK_CMD_SET_PARAMS] =	"UBLK_CMD_SET_PARAMS",
71 	[UBLK_CMD_START_USER_RECOVERY] = "UBLK_CMD_START_USER_RECOVERY",
72 	[UBLK_CMD_END_USER_RECOVERY] = "UBLK_CMD_END_USER_RECOVERY",
73 };
74 
75 typedef void (*ublk_get_buf_cb)(struct ublk_io *io);
76 
77 struct ublk_io {
78 	void			*payload;
79 	void			*mpool_entry;
80 	bool			need_data;
81 	bool			user_copy;
82 	uint16_t		tag;
83 	uint64_t		payload_size;
84 	uint32_t		cmd_op;
85 	int32_t			result;
86 	struct spdk_bdev_desc	*bdev_desc;
87 	struct spdk_io_channel	*bdev_ch;
88 	const struct ublksrv_io_desc	*iod;
89 	ublk_get_buf_cb		get_buf_cb;
90 	struct ublk_queue	*q;
91 	/* for bdev io_wait */
92 	struct spdk_bdev_io_wait_entry bdev_io_wait;
93 	struct spdk_iobuf_entry	iobuf;
94 
95 	TAILQ_ENTRY(ublk_io)	tailq;
96 };
97 
98 struct ublk_queue {
99 	uint32_t		q_id;
100 	uint32_t		q_depth;
101 	struct ublk_io		*ios;
102 	TAILQ_HEAD(, ublk_io)	completed_io_list;
103 	TAILQ_HEAD(, ublk_io)	inflight_io_list;
104 	uint32_t		cmd_inflight;
105 	bool			is_stopping;
106 	struct ublksrv_io_desc	*io_cmd_buf;
107 	/* ring depth == dev_info->queue_depth. */
108 	struct io_uring		ring;
109 	struct spdk_ublk_dev	*dev;
110 	struct ublk_poll_group	*poll_group;
111 	struct spdk_io_channel	*bdev_ch;
112 
113 	TAILQ_ENTRY(ublk_queue)	tailq;
114 };
115 
116 struct spdk_ublk_dev {
117 	struct spdk_bdev	*bdev;
118 	struct spdk_bdev_desc	*bdev_desc;
119 
120 	int			cdev_fd;
121 	struct ublk_params	dev_params;
122 	struct ublksrv_ctrl_dev_info	dev_info;
123 
124 	uint32_t		ublk_id;
125 	uint32_t		num_queues;
126 	uint32_t		queue_depth;
127 	uint32_t		online_num_queues;
128 	uint32_t		sector_per_block_shift;
129 	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];
130 
131 	struct spdk_poller	*retry_poller;
132 	int			retry_count;
133 	uint32_t		queues_closed;
134 	ublk_ctrl_cb		ctrl_cb;
135 	void			*cb_arg;
136 	uint32_t		current_cmd_op;
137 	uint32_t		ctrl_ops_in_progress;
138 	bool			is_closing;
139 	bool			is_recovering;
140 
141 	TAILQ_ENTRY(spdk_ublk_dev) tailq;
142 	TAILQ_ENTRY(spdk_ublk_dev) wait_tailq;
143 };
144 
145 struct ublk_poll_group {
146 	struct spdk_thread		*ublk_thread;
147 	struct spdk_poller		*ublk_poller;
148 	struct spdk_iobuf_channel	iobuf_ch;
149 	TAILQ_HEAD(, ublk_queue)	queue_list;
150 };
151 
152 struct ublk_tgt {
153 	int			ctrl_fd;
154 	bool			active;
155 	bool			is_destroying;
156 	spdk_ublk_fini_cb	cb_fn;
157 	void			*cb_arg;
158 	struct io_uring		ctrl_ring;
159 	struct spdk_poller	*ctrl_poller;
160 	uint32_t		ctrl_ops_in_progress;
161 	struct ublk_poll_group	*poll_groups;
162 	uint32_t		num_ublk_devs;
163 	uint64_t		features;
164 	/* `ublk_drv` supports UBLK_F_CMD_IOCTL_ENCODE */
165 	bool			ioctl_encode;
166 	/* `ublk_drv` supports UBLK_F_USER_COPY */
167 	bool			user_copy;
168 	/* `ublk_drv` supports UBLK_F_USER_RECOVERY */
169 	bool			user_recovery;
170 };
171 
172 static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs);
173 static struct ublk_tgt g_ublk_tgt;
174 
175 /* helpers for using io_uring */
176 static inline int
177 ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags)
178 {
179 	struct io_uring_params p = {};
180 
181 	p.flags = flags | IORING_SETUP_CQSIZE;
182 	p.cq_entries = depth;
183 
184 	return io_uring_queue_init_params(depth, r, &p);
185 }
186 
187 static inline struct io_uring_sqe *
188 ublk_uring_get_sqe(struct io_uring *r, uint32_t idx)
189 {
190 	/* Double the index: IORING_SETUP_SQE128 is set in ublk_setup_ring, so each 128-byte SQE occupies two slots */
191 	return &r->sq.sqes[idx << 1];
192 }
193 
194 static inline void *
195 ublk_get_sqe_cmd(struct io_uring_sqe *sqe)
196 {
197 	return (void *)&sqe->addr3;
198 }
199 
200 static inline void
201 ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op)
202 {
203 	uint32_t opc = cmd_op;
204 
205 	if (g_ublk_tgt.ioctl_encode) {
206 		switch (cmd_op) {
207 		/* ctrl uring */
208 		case UBLK_CMD_GET_DEV_INFO:
209 			opc = _IOR('u', UBLK_CMD_GET_DEV_INFO, struct ublksrv_ctrl_cmd);
210 			break;
211 		case UBLK_CMD_ADD_DEV:
212 			opc = _IOWR('u', UBLK_CMD_ADD_DEV, struct ublksrv_ctrl_cmd);
213 			break;
214 		case UBLK_CMD_DEL_DEV:
215 			opc = _IOWR('u', UBLK_CMD_DEL_DEV, struct ublksrv_ctrl_cmd);
216 			break;
217 		case UBLK_CMD_START_DEV:
218 			opc = _IOWR('u', UBLK_CMD_START_DEV, struct ublksrv_ctrl_cmd);
219 			break;
220 		case UBLK_CMD_STOP_DEV:
221 			opc = _IOWR('u', UBLK_CMD_STOP_DEV, struct ublksrv_ctrl_cmd);
222 			break;
223 		case UBLK_CMD_SET_PARAMS:
224 			opc = _IOWR('u', UBLK_CMD_SET_PARAMS, struct ublksrv_ctrl_cmd);
225 			break;
226 		case UBLK_CMD_START_USER_RECOVERY:
227 			opc = _IOWR('u', UBLK_CMD_START_USER_RECOVERY, struct ublksrv_ctrl_cmd);
228 			break;
229 		case UBLK_CMD_END_USER_RECOVERY:
230 			opc = _IOWR('u', UBLK_CMD_END_USER_RECOVERY, struct ublksrv_ctrl_cmd);
231 			break;
232 
233 		/* io uring */
234 		case UBLK_IO_FETCH_REQ:
235 			opc = _IOWR('u', UBLK_IO_FETCH_REQ, struct ublksrv_io_cmd);
236 			break;
237 		case UBLK_IO_COMMIT_AND_FETCH_REQ:
238 			opc = _IOWR('u', UBLK_IO_COMMIT_AND_FETCH_REQ, struct ublksrv_io_cmd);
239 			break;
240 		case UBLK_IO_NEED_GET_DATA:
241 			opc = _IOWR('u', UBLK_IO_NEED_GET_DATA, struct ublksrv_io_cmd);
242 			break;
243 		default:
244 			break;
245 		}
246 	}
247 
248 	sqe->off = opc;
249 }
250 
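/*
 * The helpers below pack the per-request identifiers into the 64-bit io_uring
 * user_data field: bits [15:0] carry the ublk tag and bits [23:16] carry the
 * ublk command opcode.  For example:
 *   user_data = build_user_data(5, UBLK_IO_FETCH_REQ);
 *   user_data_to_tag(user_data) == 5
 *   user_data_to_op(user_data)  == UBLK_IO_FETCH_REQ
 */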
251 static inline uint64_t
252 build_user_data(uint16_t tag, uint8_t op)
253 {
254 	assert(!(tag >> 16) && !(op >> 8));
255 
256 	return tag | (op << 16);
257 }
258 
259 static inline uint16_t
260 user_data_to_tag(uint64_t user_data)
261 {
262 	return user_data & 0xffff;
263 }
264 
265 static inline uint8_t
266 user_data_to_op(uint64_t user_data)
267 {
268 	return (user_data >> 16) & 0xff;
269 }
270 
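/*
 * With UBLK_F_USER_COPY, request data is transferred by issuing a read/write
 * on the registered /dev/ublkc<N> fd at a pseudo offset instead of sharing a
 * buffer address with the driver.  This helper builds that offset from
 * UBLKSRV_IO_BUF_OFFSET plus the queue id and tag shifted into the positions
 * (UBLK_QID_OFF / UBLK_TAG_OFF) expected by the kernel; see
 * ublk_queue_user_copy() below for how it is used with
 * io_uring_prep_read()/io_uring_prep_write().
 */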
271 static inline uint64_t
272 ublk_user_copy_pos(uint16_t q_id, uint16_t tag)
273 {
274 	return (uint64_t)UBLKSRV_IO_BUF_OFFSET +
275 	       (((uint64_t)q_id << UBLK_QID_OFF) | ((uint64_t)tag << UBLK_TAG_OFF));
276 }
277 
278 void
279 spdk_ublk_init(void)
280 {
281 	assert(spdk_thread_is_app_thread(NULL));
282 
283 	g_ublk_tgt.ctrl_fd = -1;
284 	g_ublk_tgt.ctrl_ring.ring_fd = -1;
285 }
286 
287 static void
288 ublk_ctrl_cmd_error(struct spdk_ublk_dev *ublk, int32_t res)
289 {
290 	assert(res != 0);
291 
292 	SPDK_ERRLOG("ctrl cmd %s failed, %s\n", ublk_op_name[ublk->current_cmd_op], spdk_strerror(-res));
293 	if (ublk->ctrl_cb) {
294 		ublk->ctrl_cb(ublk->cb_arg, res);
295 		ublk->ctrl_cb = NULL;
296 	}
297 
298 	switch (ublk->current_cmd_op) {
299 	case UBLK_CMD_ADD_DEV:
300 	case UBLK_CMD_SET_PARAMS:
301 	case UBLK_CMD_START_USER_RECOVERY:
302 	case UBLK_CMD_END_USER_RECOVERY:
303 		ublk_delete_dev(ublk);
304 		break;
305 	case UBLK_CMD_START_DEV:
306 		ublk_close_dev(ublk);
307 		break;
308 	case UBLK_CMD_GET_DEV_INFO:
309 		ublk_free_dev(ublk);
310 		break;
311 	case UBLK_CMD_STOP_DEV:
312 	case UBLK_CMD_DEL_DEV:
313 		break;
314 	default:
315 		SPDK_ERRLOG("Unknown cmd operation, cmd_op = %d\n", ublk->current_cmd_op);
316 		break;
317 	}
318 }
319 
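/*
 * Control-command completions drive a small state machine; on success of one
 * step, the handler kicks off the next:
 *   start:    ADD_DEV -> SET_PARAMS -> START_DEV -> (ctrl_cb)
 *   recovery: GET_DEV_INFO -> START_USER_RECOVERY -> queues reopened ->
 *             END_USER_RECOVERY -> (ctrl_cb)
 *   teardown: STOP_DEV ... DEL_DEV -> ublk_free_dev()
 * Errors are routed through ublk_ctrl_cmd_error(), which rolls the device back.
 */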
320 static void
321 ublk_ctrl_process_cqe(struct io_uring_cqe *cqe)
322 {
323 	struct spdk_ublk_dev *ublk;
324 	int rc = 0;
325 
326 	ublk = (struct spdk_ublk_dev *)cqe->user_data;
327 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s completed\n", ublk_op_name[ublk->current_cmd_op]);
328 	ublk->ctrl_ops_in_progress--;
329 
330 	if (spdk_unlikely(cqe->res != 0)) {
331 		ublk_ctrl_cmd_error(ublk, cqe->res);
332 		return;
333 	}
334 
335 	switch (ublk->current_cmd_op) {
336 	case UBLK_CMD_ADD_DEV:
337 		rc = ublk_set_params(ublk);
338 		if (rc < 0) {
339 			ublk_delete_dev(ublk);
340 			goto cb_done;
341 		}
342 		break;
343 	case UBLK_CMD_SET_PARAMS:
344 		rc = ublk_start_dev(ublk, false);
345 		if (rc < 0) {
346 			ublk_delete_dev(ublk);
347 			goto cb_done;
348 		}
349 		break;
350 	case UBLK_CMD_START_DEV:
351 		goto cb_done;
352 		break;
353 	case UBLK_CMD_STOP_DEV:
354 		break;
355 	case UBLK_CMD_DEL_DEV:
356 		if (ublk->ctrl_cb) {
357 			ublk->ctrl_cb(ublk->cb_arg, 0);
358 			ublk->ctrl_cb = NULL;
359 		}
360 		ublk_free_dev(ublk);
361 		break;
362 	case UBLK_CMD_GET_DEV_INFO:
363 		rc = ublk_ctrl_start_recovery(ublk);
364 		if (rc < 0) {
365 			ublk_delete_dev(ublk);
366 			goto cb_done;
367 		}
368 		break;
369 	case UBLK_CMD_START_USER_RECOVERY:
370 		rc = ublk_start_dev(ublk, true);
371 		if (rc < 0) {
372 			ublk_delete_dev(ublk);
373 			goto cb_done;
374 		}
375 		break;
376 	case UBLK_CMD_END_USER_RECOVERY:
377 		SPDK_NOTICELOG("Ublk %u recovered successfully\n", ublk->ublk_id);
378 		ublk->is_recovering = false;
379 		goto cb_done;
380 		break;
381 	default:
382 		SPDK_ERRLOG("Unknown cmd operation, cmd_op = %d\n", ublk->current_cmd_op);
383 		break;
384 	}
385 
386 	return;
387 
388 cb_done:
389 	if (ublk->ctrl_cb) {
390 		ublk->ctrl_cb(ublk->cb_arg, rc);
391 		ublk->ctrl_cb = NULL;
392 	}
393 }
394 
395 static int
396 ublk_ctrl_poller(void *arg)
397 {
398 	struct io_uring *ring = &g_ublk_tgt.ctrl_ring;
399 	struct io_uring_cqe *cqe;
400 	const int max = 8;
401 	int i, count = 0, rc;
402 
403 	if (!g_ublk_tgt.ctrl_ops_in_progress) {
404 		return SPDK_POLLER_IDLE;
405 	}
406 
407 	for (i = 0; i < max; i++) {
408 		rc = io_uring_peek_cqe(ring, &cqe);
409 		if (rc == -EAGAIN) {
410 			break;
411 		}
412 
413 		assert(cqe != NULL);
414 		g_ublk_tgt.ctrl_ops_in_progress--;
415 
416 		ublk_ctrl_process_cqe(cqe);
417 
418 		io_uring_cqe_seen(ring, cqe);
419 		count++;
420 	}
421 
422 	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
423 }
424 
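/*
 * Each control command is submitted as an IORING_OP_URING_CMD SQE on the
 * shared control ring.  The ring is created with IORING_SETUP_SQE128, so the
 * struct ublksrv_ctrl_cmd payload lives in the extended SQE area starting at
 * sqe->addr3 (see ublk_get_sqe_cmd()), while the (optionally ioctl-encoded)
 * command opcode is placed in sqe->off by ublk_set_sqe_cmd_op().
 */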
425 static int
426 ublk_ctrl_cmd_submit(struct spdk_ublk_dev *ublk, uint32_t cmd_op)
427 {
428 	uint32_t dev_id = ublk->ublk_id;
429 	int rc = -EINVAL;
430 	struct io_uring_sqe *sqe;
431 	struct ublksrv_ctrl_cmd *cmd;
432 
433 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]);
434 
435 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
436 	if (!sqe) {
437 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
438 		assert(false);
439 		return -ENOENT;
440 	}
441 
442 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
443 	sqe->fd = g_ublk_tgt.ctrl_fd;
444 	sqe->opcode = IORING_OP_URING_CMD;
445 	sqe->ioprio = 0;
446 	cmd->dev_id = dev_id;
447 	cmd->queue_id = -1;
448 	ublk->current_cmd_op = cmd_op;
449 
450 	switch (cmd_op) {
451 	case UBLK_CMD_ADD_DEV:
452 	case UBLK_CMD_GET_DEV_INFO:
453 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_info;
454 		cmd->len = sizeof(ublk->dev_info);
455 		break;
456 	case UBLK_CMD_SET_PARAMS:
457 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_params;
458 		cmd->len = sizeof(ublk->dev_params);
459 		break;
460 	case UBLK_CMD_START_DEV:
461 		cmd->data[0] = getpid();
462 		break;
463 	case UBLK_CMD_STOP_DEV:
464 		break;
465 	case UBLK_CMD_DEL_DEV:
466 		break;
467 	case UBLK_CMD_START_USER_RECOVERY:
468 		break;
469 	case UBLK_CMD_END_USER_RECOVERY:
470 		cmd->data[0] = getpid();
471 		break;
472 	default:
473 		SPDK_ERRLOG("Unknown cmd operation, cmd_op = %d\n", cmd_op);
474 		return -EINVAL;
475 	}
476 	ublk_set_sqe_cmd_op(sqe, cmd_op);
477 	io_uring_sqe_set_data(sqe, ublk);
478 
479 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
480 	if (rc < 0) {
481 		SPDK_ERRLOG("uring submit rc %d\n", rc);
482 		assert(false);
483 		return rc;
484 	}
485 	g_ublk_tgt.ctrl_ops_in_progress++;
486 	ublk->ctrl_ops_in_progress++;
487 
488 	return 0;
489 }
490 
491 static int
492 ublk_ctrl_cmd_get_features(void)
493 {
494 	int rc;
495 	struct io_uring_sqe *sqe;
496 	struct io_uring_cqe *cqe;
497 	struct ublksrv_ctrl_cmd *cmd;
498 	uint32_t cmd_op;
499 
500 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
501 	if (!sqe) {
502 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
503 		assert(false);
504 		return -ENOENT;
505 	}
506 
507 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
508 	sqe->fd = g_ublk_tgt.ctrl_fd;
509 	sqe->opcode = IORING_OP_URING_CMD;
510 	sqe->ioprio = 0;
511 	cmd->dev_id = -1;
512 	cmd->queue_id = -1;
513 	cmd->addr = (__u64)(uintptr_t)&g_ublk_tgt.features;
514 	cmd->len = sizeof(g_ublk_tgt.features);
515 
516 	cmd_op = UBLK_U_CMD_GET_FEATURES;
517 	ublk_set_sqe_cmd_op(sqe, cmd_op);
518 
519 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
520 	if (rc < 0) {
521 		SPDK_ERRLOG("uring submit rc %d\n", rc);
522 		return rc;
523 	}
524 
525 	rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe);
526 	if (rc < 0) {
527 		SPDK_ERRLOG("wait cqe rc %d\n", rc);
528 		return rc;
529 	}
530 
531 	if (cqe->res == 0) {
532 		g_ublk_tgt.ioctl_encode = !!(g_ublk_tgt.features & UBLK_F_CMD_IOCTL_ENCODE);
533 		g_ublk_tgt.user_copy = !!(g_ublk_tgt.features & UBLK_F_USER_COPY);
534 		g_ublk_tgt.user_recovery = !!(g_ublk_tgt.features & UBLK_F_USER_RECOVERY);
535 	}
536 	io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe);
537 
538 	return 0;
539 }
540 
541 static int
542 ublk_queue_cmd_buf_sz(uint32_t q_depth)
543 {
544 	uint32_t size = q_depth * sizeof(struct ublksrv_io_desc);
545 	uint32_t page_sz = getpagesize();
546 
547 	/* round size up to a multiple of the page size */
548 	return (size + page_sz - 1) & ~(page_sz - 1);
549 }
550 
551 static int
552 ublk_get_max_support_devs(void)
553 {
554 	FILE *file;
555 	char str[128];
556 
557 	file = fopen("/sys/module/ublk_drv/parameters/ublks_max", "r");
558 	if (!file) {
559 		return -ENOENT;
560 	}
561 
562 	if (!fgets(str, sizeof(str), file)) {
563 		fclose(file);
564 		return -EINVAL;
565 	}
566 	fclose(file);
567 
568 	spdk_str_chomp(str);
569 	return spdk_strtol(str, 10);
570 }
571 
572 static int
573 ublk_open(void)
574 {
575 	int rc, ublks_max;
576 
577 	g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR);
578 	if (g_ublk_tgt.ctrl_fd < 0) {
579 		rc = errno;
580 		SPDK_ERRLOG("UBLK control dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno));
581 		return -rc;
582 	}
583 
584 	ublks_max = ublk_get_max_support_devs();
585 	if (ublks_max > 0) {
586 		g_ublks_max = ublks_max;
587 	}
588 
589 	/* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl
590 	 * ring processing to a workqueue.  Ctrl ring processing is minimal, so SQPOLL is fine.
591 	 * All commands sent via the control uring for a ublk device are executed one by one, so
592 	 * ublks_max * 2 uring entries are enough.
593 	 */
594 	rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring,
595 			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
596 	if (rc < 0) {
597 		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
598 		goto err;
599 	}
600 
601 	rc = ublk_ctrl_cmd_get_features();
602 	if (rc) {
603 		goto err;
604 	}
605 
606 	return 0;
607 
608 err:
609 	close(g_ublk_tgt.ctrl_fd);
610 	g_ublk_tgt.ctrl_fd = -1;
611 	return rc;
612 }
613 
614 static int
615 ublk_parse_core_mask(const char *mask)
616 {
617 	struct spdk_cpuset tmp_mask;
618 	int rc;
619 
620 	if (mask == NULL) {
621 		spdk_env_get_cpuset(&g_core_mask);
622 		return 0;
623 	}
624 
625 	rc = spdk_cpuset_parse(&g_core_mask, mask);
626 	if (rc < 0) {
627 		SPDK_ERRLOG("invalid cpumask %s\n", mask);
628 		return -EINVAL;
629 	}
630 
631 	if (spdk_cpuset_count(&g_core_mask) == 0) {
632 		SPDK_ERRLOG("no cpus specified\n");
633 		return -EINVAL;
634 	}
635 
636 	spdk_env_get_cpuset(&tmp_mask);
637 	spdk_cpuset_and(&tmp_mask, &g_core_mask);
638 
639 	if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) {
640 		SPDK_ERRLOG("one of the selected CPUs is outside of the core mask (=%s)\n",
641 			    spdk_cpuset_fmt(&g_core_mask));
642 		return -EINVAL;
643 	}
644 
645 	return 0;
646 }
647 
648 static void
649 ublk_poller_register(void *args)
650 {
651 	struct ublk_poll_group *poll_group = args;
652 	int rc;
653 
654 	assert(spdk_get_thread() == poll_group->ublk_thread);
655 	/* Bind ublk spdk_thread to current CPU core in order to avoid thread context switch
656 	 * during uring processing as required by ublk kernel.
657 	 */
658 	spdk_thread_bind(spdk_get_thread(), true);
659 
660 	TAILQ_INIT(&poll_group->queue_list);
661 	poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0);
662 	rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk",
663 				     UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE);
664 	if (rc != 0) {
665 		assert(false);
666 	}
667 }
668 
669 int
670 ublk_create_target(const char *cpumask_str)
671 {
672 	int rc;
673 	uint32_t i;
674 	char thread_name[32];
675 	struct ublk_poll_group *poll_group;
676 
677 	if (g_ublk_tgt.active == true) {
678 		SPDK_ERRLOG("UBLK target has been created\n");
679 		return -EBUSY;
680 	}
681 
682 	rc = ublk_parse_core_mask(cpumask_str);
683 	if (rc != 0) {
684 		return rc;
685 	}
686 
687 	assert(g_ublk_tgt.poll_groups == NULL);
688 	g_ublk_tgt.poll_groups = calloc(spdk_env_get_core_count(), sizeof(*poll_group));
689 	if (!g_ublk_tgt.poll_groups) {
690 		return -ENOMEM;
691 	}
692 
693 	rc = ublk_open();
694 	if (rc != 0) {
695 		SPDK_ERRLOG("Failed to open UBLK, error=%s\n", spdk_strerror(-rc));
696 		free(g_ublk_tgt.poll_groups);
697 		g_ublk_tgt.poll_groups = NULL;
698 		return rc;
699 	}
700 
701 	spdk_iobuf_register_module("ublk");
702 
703 	SPDK_ENV_FOREACH_CORE(i) {
704 		if (!spdk_cpuset_get_cpu(&g_core_mask, i)) {
705 			continue;
706 		}
707 		snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i);
708 		poll_group = &g_ublk_tgt.poll_groups[g_num_ublk_poll_groups];
709 		poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask);
710 		spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group);
711 		g_num_ublk_poll_groups++;
712 	}
713 
714 	assert(spdk_thread_is_app_thread(NULL));
715 	g_ublk_tgt.active = true;
716 	g_ublk_tgt.ctrl_ops_in_progress = 0;
717 	g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL,
718 				 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US);
719 
720 	SPDK_NOTICELOG("UBLK target created successfully\n");
721 
722 	return 0;
723 }
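
/*
 * A minimal usage sketch, assuming the stock SPDK JSON-RPC wrappers that sit on
 * top of this library (RPC and option names below are illustrative, following
 * scripts/rpc.py conventions rather than anything defined in this file):
 *
 *   scripts/rpc.py ublk_create_target -m 0x3
 *   scripts/rpc.py bdev_malloc_create -b Malloc0 128 4096
 *   scripts/rpc.py ublk_start_disk Malloc0 1 -q 2 -d 128
 *   # the kernel now exposes /dev/ublkb1 backed by bdev Malloc0
 *   scripts/rpc.py ublk_stop_disk 1
 *   scripts/rpc.py ublk_destroy_target
 */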
724 
725 static void
726 _ublk_fini_done(void *args)
727 {
728 	SPDK_DEBUGLOG(ublk, "\n");
729 
730 	g_num_ublk_poll_groups = 0;
731 	g_next_ublk_poll_group = 0;
732 	g_ublk_tgt.is_destroying = false;
733 	g_ublk_tgt.active = false;
734 	g_ublk_tgt.features = 0;
735 	g_ublk_tgt.ioctl_encode = false;
736 	g_ublk_tgt.user_copy = false;
737 	g_ublk_tgt.user_recovery = false;
738 
739 	if (g_ublk_tgt.cb_fn) {
740 		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
741 		g_ublk_tgt.cb_fn = NULL;
742 		g_ublk_tgt.cb_arg = NULL;
743 	}
744 
745 	if (g_ublk_tgt.poll_groups) {
746 		free(g_ublk_tgt.poll_groups);
747 		g_ublk_tgt.poll_groups = NULL;
748 	}
749 
750 }
751 
752 static void
753 ublk_thread_exit(void *args)
754 {
755 	struct spdk_thread *ublk_thread = spdk_get_thread();
756 	uint32_t i;
757 
758 	for (i = 0; i < g_num_ublk_poll_groups; i++) {
759 		if (g_ublk_tgt.poll_groups[i].ublk_thread == ublk_thread) {
760 			spdk_poller_unregister(&g_ublk_tgt.poll_groups[i].ublk_poller);
761 			spdk_iobuf_channel_fini(&g_ublk_tgt.poll_groups[i].iobuf_ch);
762 			spdk_thread_bind(ublk_thread, false);
763 			spdk_thread_exit(ublk_thread);
764 		}
765 	}
766 }
767 
768 static int
769 ublk_close_dev(struct spdk_ublk_dev *ublk)
770 {
771 	int rc;
772 
773 	/* set is_closing */
774 	if (ublk->is_closing) {
775 		return -EBUSY;
776 	}
777 	ublk->is_closing = true;
778 
779 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_STOP_DEV);
780 	if (rc < 0) {
781 		SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id);
782 	}
783 	return rc;
784 }
785 
786 static void
787 _ublk_fini(void *args)
788 {
789 	struct spdk_ublk_dev	*ublk, *ublk_tmp;
790 
791 	TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) {
792 		ublk_close_dev(ublk);
793 	}
794 
795 	/* Check if all ublks closed */
796 	if (TAILQ_EMPTY(&g_ublk_devs)) {
797 		SPDK_DEBUGLOG(ublk, "finish shutdown\n");
798 		spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
799 		if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) {
800 			io_uring_queue_exit(&g_ublk_tgt.ctrl_ring);
801 			g_ublk_tgt.ctrl_ring.ring_fd = -1;
802 		}
803 		if (g_ublk_tgt.ctrl_fd >= 0) {
804 			close(g_ublk_tgt.ctrl_fd);
805 			g_ublk_tgt.ctrl_fd = -1;
806 		}
807 		spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done);
808 	} else {
809 		spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL);
810 	}
811 }
812 
813 int
814 spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg)
815 {
816 	assert(spdk_thread_is_app_thread(NULL));
817 
818 	if (g_ublk_tgt.is_destroying == true) {
819 		/* UBLK target is being destroyed */
820 		return -EBUSY;
821 	}
822 	g_ublk_tgt.cb_fn = cb_fn;
823 	g_ublk_tgt.cb_arg = cb_arg;
824 	g_ublk_tgt.is_destroying = true;
825 	_ublk_fini(NULL);
826 
827 	return 0;
828 }
829 
830 int
831 ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg)
832 {
833 	int rc;
834 
835 	if (g_ublk_tgt.active == false) {
836 		/* UBLK target has not been created */
837 		return -ENOENT;
838 	}
839 
840 	rc = spdk_ublk_fini(cb_fn, cb_arg);
841 
842 	return rc;
843 }
844 
845 struct spdk_ublk_dev *
846 ublk_dev_find_by_id(uint32_t ublk_id)
847 {
848 	struct spdk_ublk_dev *ublk;
849 
850 	/* check whether a ublk device with this ID has already been registered. */
851 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
852 		if (ublk->ublk_id == ublk_id) {
853 			return ublk;
854 		}
855 	}
856 
857 	return NULL;
858 }
859 
860 uint32_t
861 ublk_dev_get_id(struct spdk_ublk_dev *ublk)
862 {
863 	return ublk->ublk_id;
864 }
865 
866 struct spdk_ublk_dev *ublk_dev_first(void)
867 {
868 	return TAILQ_FIRST(&g_ublk_devs);
869 }
870 
871 struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev)
872 {
873 	return TAILQ_NEXT(prev, tailq);
874 }
875 
876 uint32_t
877 ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk)
878 {
879 	return ublk->queue_depth;
880 }
881 
882 uint32_t
883 ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk)
884 {
885 	return ublk->num_queues;
886 }
887 
888 const char *
889 ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk)
890 {
891 	return spdk_bdev_get_name(ublk->bdev);
892 }
893 
894 void
895 spdk_ublk_write_config_json(struct spdk_json_write_ctx *w)
896 {
897 	struct spdk_ublk_dev *ublk;
898 
899 	spdk_json_write_array_begin(w);
900 
901 	if (g_ublk_tgt.active) {
902 		spdk_json_write_object_begin(w);
903 
904 		spdk_json_write_named_string(w, "method", "ublk_create_target");
905 		spdk_json_write_named_object_begin(w, "params");
906 		spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask));
907 		spdk_json_write_object_end(w);
908 
909 		spdk_json_write_object_end(w);
910 	}
911 
912 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
913 		spdk_json_write_object_begin(w);
914 
915 		spdk_json_write_named_string(w, "method", "ublk_start_disk");
916 
917 		spdk_json_write_named_object_begin(w, "params");
918 		spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk));
919 		spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id);
920 		spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues);
921 		spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth);
922 		spdk_json_write_object_end(w);
923 
924 		spdk_json_write_object_end(w);
925 	}
926 
927 	spdk_json_write_array_end(w);
928 }
929 
930 static void
931 ublk_dev_list_register(struct spdk_ublk_dev *ublk)
932 {
933 	UBLK_DEBUGLOG(ublk, "add to tailq\n");
934 	TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq);
935 	g_ublk_tgt.num_ublk_devs++;
936 }
937 
938 static void
939 ublk_dev_list_unregister(struct spdk_ublk_dev *ublk)
940 {
941 	/*
942 	 * The ublk device may be stopped before it is registered,
943 	 * so check whether it was actually registered.
944 	 */
945 
946 	if (ublk_dev_find_by_id(ublk->ublk_id)) {
947 		UBLK_DEBUGLOG(ublk, "remove from tailq\n");
948 		TAILQ_REMOVE(&g_ublk_devs, ublk, tailq);
949 		assert(g_ublk_tgt.num_ublk_devs);
950 		g_ublk_tgt.num_ublk_devs--;
951 		return;
952 	}
953 
954 	UBLK_DEBUGLOG(ublk, "not found in tailq\n");
955 	assert(false);
956 }
957 
958 static void
959 ublk_delete_dev(void *arg)
960 {
961 	struct spdk_ublk_dev *ublk = arg;
962 	int rc = 0;
963 	uint32_t q_idx;
964 
965 	assert(spdk_thread_is_app_thread(NULL));
966 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
967 		ublk_dev_queue_fini(&ublk->queues[q_idx]);
968 	}
969 
970 	if (ublk->cdev_fd >= 0) {
971 		close(ublk->cdev_fd);
972 	}
973 
974 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_DEL_DEV);
975 	if (rc < 0) {
976 		SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id);
977 	}
978 }
979 
980 static int
981 _ublk_close_dev_retry(void *arg)
982 {
983 	struct spdk_ublk_dev *ublk = arg;
984 
985 	if (ublk->ctrl_ops_in_progress > 0) {
986 		if (ublk->retry_count-- > 0) {
987 			return SPDK_POLLER_BUSY;
988 		}
989 		SPDK_ERRLOG("Timeout on ctrl op completion.\n");
990 	}
991 	spdk_poller_unregister(&ublk->retry_poller);
992 	ublk_delete_dev(ublk);
993 	return SPDK_POLLER_BUSY;
994 }
995 
996 static void
997 ublk_try_close_dev(void *arg)
998 {
999 	struct spdk_ublk_dev *ublk = arg;
1000 
1001 	assert(spdk_thread_is_app_thread(NULL));
1002 
1003 	ublk->queues_closed += 1;
1004 	SPDK_DEBUGLOG(ublk_io, "ublkb%u closed queues %u\n", ublk->ublk_id, ublk->queues_closed);
1005 
1006 	if (ublk->queues_closed < ublk->num_queues) {
1007 		return;
1008 	}
1009 
1010 	if (ublk->ctrl_ops_in_progress > 0) {
1011 		assert(ublk->retry_poller == NULL);
1012 		ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US;
1013 		ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk,
1014 				     UBLK_BUSY_POLLING_INTERVAL_US);
1015 	} else {
1016 		ublk_delete_dev(ublk);
1017 	}
1018 }
1019 
1020 static void
1021 ublk_try_close_queue(struct ublk_queue *q)
1022 {
1023 	struct spdk_ublk_dev *ublk = q->dev;
1024 
1025 	/* Only close the queue when no I/O submitted to the bdev is still in flight,
1026 	 * no I/O is waiting to commit its result, and all I/Os have been aborted back.
1027 	 */
1028 	if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) {
1029 		/* wait for next retry */
1030 		return;
1031 	}
1032 
1033 	TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq);
1034 	spdk_put_io_channel(q->bdev_ch);
1035 	q->bdev_ch = NULL;
1036 
1037 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk);
1038 }
1039 
1040 int
1041 ublk_stop_disk(uint32_t ublk_id, ublk_ctrl_cb ctrl_cb, void *cb_arg)
1042 {
1043 	struct spdk_ublk_dev *ublk;
1044 
1045 	assert(spdk_thread_is_app_thread(NULL));
1046 
1047 	ublk = ublk_dev_find_by_id(ublk_id);
1048 	if (ublk == NULL) {
1049 		SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id);
1050 		return -ENODEV;
1051 	}
1052 	if (ublk->is_closing) {
1053 		SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id);
1054 		return -EBUSY;
1055 	}
1056 	if (ublk->ctrl_cb) {
1057 		SPDK_WARNLOG("ublk %d is busy with RPC call\n", ublk->ublk_id);
1058 		return -EBUSY;
1059 	}
1060 
1061 	ublk->ctrl_cb = ctrl_cb;
1062 	ublk->cb_arg = cb_arg;
1063 	return ublk_close_dev(ublk);
1064 }
1065 
1066 static inline void
1067 ublk_mark_io_done(struct ublk_io *io, int res)
1068 {
1069 	/*
1070 	 * mark io done by target, so that SPDK can commit its
1071 	 * result and fetch new request via io_uring command.
1072 	 */
1073 	io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
1074 	io->result = res;
1075 	io->need_data = false;
1076 }
1077 
1078 static void
1079 ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1080 {
1081 	struct ublk_io	*io = cb_arg;
1082 	struct ublk_queue *q = io->q;
1083 	int res;
1084 
1085 	if (success) {
1086 		res = io->result;
1087 	} else {
1088 		res = -EIO;
1089 	}
1090 
1091 	ublk_mark_io_done(io, res);
1092 
1093 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n",
1094 		      q->q_id, io->tag, res);
1095 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1096 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1097 
1098 	if (bdev_io != NULL) {
1099 		spdk_bdev_free_io(bdev_io);
1100 	}
1101 }
1102 
1103 static void
1104 ublk_queue_user_copy(struct ublk_io *io, bool is_write)
1105 {
1106 	struct ublk_queue *q = io->q;
1107 	const struct ublksrv_io_desc *iod = io->iod;
1108 	struct io_uring_sqe *sqe;
1109 	uint64_t pos;
1110 	uint32_t nbytes;
1111 
1112 	nbytes = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1113 	pos = ublk_user_copy_pos(q->q_id, io->tag);
1114 	sqe = io_uring_get_sqe(&q->ring);
1115 	assert(sqe);
1116 
1117 	if (is_write) {
1118 		io_uring_prep_read(sqe, 0, io->payload, nbytes, pos);
1119 	} else {
1120 		io_uring_prep_write(sqe, 0, io->payload, nbytes, pos);
1121 	}
1122 	io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
1123 	io_uring_sqe_set_data64(sqe, build_user_data(io->tag, 0));
1124 
1125 	io->user_copy = true;
1126 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1127 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1128 }
1129 
1130 static void
1131 ublk_user_copy_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1132 {
1133 	struct ublk_io	*io = cb_arg;
1134 
1135 	spdk_bdev_free_io(bdev_io);
1136 
1137 	if (success) {
1138 		ublk_queue_user_copy(io, false);
1139 		return;
1140 	}
1141 	/* READ IO Error */
1142 	ublk_io_done(NULL, false, cb_arg);
1143 }
1144 
1145 static void
1146 ublk_resubmit_io(void *arg)
1147 {
1148 	struct ublk_io *io = (struct ublk_io *)arg;
1149 
1150 	_ublk_submit_bdev_io(io->q, io);
1151 }
1152 
1153 static void
1154 ublk_queue_io(struct ublk_io *io)
1155 {
1156 	int rc;
1157 	struct spdk_bdev *bdev = io->q->dev->bdev;
1158 	struct ublk_queue *q = io->q;
1159 
1160 	io->bdev_io_wait.bdev = bdev;
1161 	io->bdev_io_wait.cb_fn = ublk_resubmit_io;
1162 	io->bdev_io_wait.cb_arg = io;
1163 
1164 	rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait);
1165 	if (rc != 0) {
1166 		SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc);
1167 		ublk_io_done(NULL, false, io);
1168 	}
1169 }
1170 
1171 static void
1172 ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf)
1173 {
1174 	struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf);
1175 
1176 	io->mpool_entry = buf;
1177 	assert(io->payload == NULL);
1178 	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
1179 	io->get_buf_cb(io);
1180 }
1181 
1182 static void
1183 ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch,
1184 		   ublk_get_buf_cb get_buf_cb)
1185 {
1186 	void *buf;
1187 
1188 	io->payload_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1189 	io->get_buf_cb = get_buf_cb;
1190 	buf = spdk_iobuf_get(iobuf_ch, io->payload_size, &io->iobuf, ublk_io_get_buffer_cb);
1191 
1192 	if (buf != NULL) {
1193 		ublk_io_get_buffer_cb(&io->iobuf, buf);
1194 	}
1195 }
1196 
1197 static void
1198 ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch)
1199 {
1200 	if (io->payload) {
1201 		spdk_iobuf_put(iobuf_ch, io->mpool_entry, io->payload_size);
1202 		io->mpool_entry = NULL;
1203 		io->payload = NULL;
1204 	}
1205 }
1206 
1207 static void
1208 _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1209 {
1210 	struct spdk_ublk_dev *ublk = q->dev;
1211 	struct spdk_bdev_desc *desc = io->bdev_desc;
1212 	struct spdk_io_channel *ch = io->bdev_ch;
1213 	uint64_t offset_blocks, num_blocks;
1214 	spdk_bdev_io_completion_cb read_cb;
1215 	uint8_t ublk_op;
1216 	int rc = 0;
1217 	const struct ublksrv_io_desc *iod = io->iod;
1218 
1219 	ublk_op = ublksrv_get_op(iod);
1220 	offset_blocks = iod->start_sector >> ublk->sector_per_block_shift;
1221 	num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift;
1222 
1223 	switch (ublk_op) {
1224 	case UBLK_IO_OP_READ:
1225 		if (g_ublk_tgt.user_copy) {
1226 			read_cb = ublk_user_copy_read_done;
1227 		} else {
1228 			read_cb = ublk_io_done;
1229 		}
1230 		rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, read_cb, io);
1231 		break;
1232 	case UBLK_IO_OP_WRITE:
1233 		rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
1234 		break;
1235 	case UBLK_IO_OP_FLUSH:
1236 		rc = spdk_bdev_flush_blocks(desc, ch, 0, spdk_bdev_get_num_blocks(ublk->bdev), ublk_io_done, io);
1237 		break;
1238 	case UBLK_IO_OP_DISCARD:
1239 		rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1240 		break;
1241 	case UBLK_IO_OP_WRITE_ZEROES:
1242 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1243 		break;
1244 	default:
1245 		rc = -1;
1246 	}
1247 
1248 	if (rc < 0) {
1249 		if (rc == -ENOMEM) {
1250 			SPDK_INFOLOG(ublk, "No memory, start to queue io.\n");
1251 			ublk_queue_io(io);
1252 		} else {
1253 			SPDK_ERRLOG("ublk io failed in _ublk_submit_bdev_io, rc=%d, ublk_op=%u\n", rc, ublk_op);
1254 			ublk_io_done(NULL, false, io);
1255 		}
1256 	}
1257 }
1258 
1259 static void
1260 read_get_buffer_done(struct ublk_io *io)
1261 {
1262 	_ublk_submit_bdev_io(io->q, io);
1263 }
1264 
1265 static void
1266 user_copy_write_get_buffer_done(struct ublk_io *io)
1267 {
1268 	ublk_queue_user_copy(io, true);
1269 }
1270 
1271 static void
1272 ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1273 {
1274 	struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch;
1275 	const struct ublksrv_io_desc *iod = io->iod;
1276 	uint8_t ublk_op;
1277 
1278 	io->result = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1279 	ublk_op = ublksrv_get_op(iod);
1280 	switch (ublk_op) {
1281 	case UBLK_IO_OP_READ:
1282 		ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done);
1283 		break;
1284 	case UBLK_IO_OP_WRITE:
1285 		if (g_ublk_tgt.user_copy) {
1286 			ublk_io_get_buffer(io, iobuf_ch, user_copy_write_get_buffer_done);
1287 		} else {
1288 			_ublk_submit_bdev_io(q, io);
1289 		}
1290 		break;
1291 	default:
1292 		_ublk_submit_bdev_io(q, io);
1293 		break;
1294 	}
1295 }
1296 
1297 static inline void
1298 ublksrv_queue_io_cmd(struct ublk_queue *q,
1299 		     struct ublk_io *io, unsigned tag)
1300 {
1301 	struct ublksrv_io_cmd *cmd;
1302 	struct io_uring_sqe *sqe;
1303 	unsigned int cmd_op = 0;
1304 	uint64_t user_data;
1305 
1306 	/* each io should have a fetch or commit operation pending */
1307 	assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) ||
1308 	       (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ));
1309 	cmd_op = io->cmd_op;
1310 
1311 	sqe = io_uring_get_sqe(&q->ring);
1312 	assert(sqe);
1313 
1314 	cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe);
1315 	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) {
1316 		cmd->result = io->result;
1317 	}
1318 
1319 	/* These fields should be written once, never change */
1320 	ublk_set_sqe_cmd_op(sqe, cmd_op);
1321 	/* dev->cdev_fd */
1322 	sqe->fd		= 0;
1323 	sqe->opcode	= IORING_OP_URING_CMD;
1324 	sqe->flags	= IOSQE_FIXED_FILE;
1325 	sqe->rw_flags	= 0;
1326 	cmd->tag	= tag;
1327 	cmd->addr	= g_ublk_tgt.user_copy ? 0 : (__u64)(uintptr_t)(io->payload);
1328 	cmd->q_id	= q->q_id;
1329 
1330 	user_data = build_user_data(tag, cmd_op);
1331 	io_uring_sqe_set_data64(sqe, user_data);
1332 
1333 	io->cmd_op = 0;
1334 
1335 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n",
1336 		      q->q_id, tag, cmd_op,
1337 		      io->cmd_op, q->is_stopping);
1338 }
1339 
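/*
 * Per-queue fast path, run from ublk_poll() on the queue's poll group thread:
 *   ublk_io_recv()  reaps CQEs from the queue ring and dispatches UBLK_IO_RES_OK
 *                   requests to the bdev (or fetches a data buffer first for
 *                   NEED_GET_DATA writes);
 *   ublk_io_xmit()  turns completed ublk_io entries back into
 *                   COMMIT_AND_FETCH_REQ / FETCH_REQ uring commands and submits
 *                   them in one io_uring_submit() call.
 */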
1340 static int
1341 ublk_io_xmit(struct ublk_queue *q)
1342 {
1343 	TAILQ_HEAD(, ublk_io) buffer_free_list;
1344 	struct spdk_iobuf_channel *iobuf_ch;
1345 	int rc = 0, count = 0;
1346 	struct ublk_io *io;
1347 
1348 	if (TAILQ_EMPTY(&q->completed_io_list)) {
1349 		return 0;
1350 	}
1351 
1352 	TAILQ_INIT(&buffer_free_list);
1353 	while (!TAILQ_EMPTY(&q->completed_io_list)) {
1354 		io = TAILQ_FIRST(&q->completed_io_list);
1355 		assert(io != NULL);
1356 		/*
1357 		 * Remove IO from list now assuming it will be completed. It will be inserted
1358 		 * back to the head if it cannot be completed. This approach is specifically
1359 		 * taken to work around a scan-build use-after-free mischaracterization.
1360 		 */
1361 		TAILQ_REMOVE(&q->completed_io_list, io, tailq);
1362 		if (!io->user_copy) {
1363 			if (!io->need_data) {
1364 				TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq);
1365 			}
1366 			ublksrv_queue_io_cmd(q, io, io->tag);
1367 		}
1368 		count++;
1369 	}
1370 
1371 	q->cmd_inflight += count;
1372 	rc = io_uring_submit(&q->ring);
1373 	if (rc != count) {
1374 		SPDK_ERRLOG("could not submit all commands\n");
1375 		assert(false);
1376 	}
1377 
1378 	/* Note: for READ io, ublk will always copy the data out of
1379 	 * the buffers in the io_uring_submit context.  Since we
1380 	 * are not using SQPOLL for IO rings, we can safely free
1381 	 * those IO buffers here.  This design doesn't seem ideal,
1382 	 * but it's what's possible since there is no discrete
1383 	 * COMMIT_REQ operation.  That will need to change in the
1384 	 * future should we ever want to support async copy
1385 	 * operations.
1386 	 */
1387 	iobuf_ch = &q->poll_group->iobuf_ch;
1388 	while (!TAILQ_EMPTY(&buffer_free_list)) {
1389 		io = TAILQ_FIRST(&buffer_free_list);
1390 		TAILQ_REMOVE(&buffer_free_list, io, tailq);
1391 		ublk_io_put_buffer(io, iobuf_ch);
1392 	}
1393 	return rc;
1394 }
1395 
1396 static void
1397 write_get_buffer_done(struct ublk_io *io)
1398 {
1399 	io->need_data = true;
1400 	io->cmd_op = UBLK_IO_NEED_GET_DATA;
1401 	io->result = 0;
1402 
1403 	TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq);
1404 	TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq);
1405 }
1406 
1407 static int
1408 ublk_io_recv(struct ublk_queue *q)
1409 {
1410 	struct io_uring_cqe *cqe;
1411 	unsigned head, tag;
1412 	int fetch, count = 0;
1413 	struct ublk_io *io;
1414 	struct spdk_iobuf_channel *iobuf_ch;
1415 
1416 	if (q->cmd_inflight == 0) {
1417 		return 0;
1418 	}
1419 
1420 	iobuf_ch = &q->poll_group->iobuf_ch;
1421 	io_uring_for_each_cqe(&q->ring, head, cqe) {
1422 		tag = user_data_to_tag(cqe->user_data);
1423 		io = &q->ios[tag];
1424 
1425 		SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u, user copy %u, cmd_op %u\n",
1426 			      cqe->res, q->q_id, tag, io->user_copy, user_data_to_op(cqe->user_data));
1427 
1428 		q->cmd_inflight--;
1429 		TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq);
1430 
1431 		if (!io->user_copy) {
1432 			fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping;
1433 			if (!fetch) {
1434 				q->is_stopping = true;
1435 				if (io->cmd_op == UBLK_IO_FETCH_REQ) {
1436 					io->cmd_op = 0;
1437 				}
1438 			}
1439 
1440 			if (cqe->res == UBLK_IO_RES_OK) {
1441 				ublk_submit_bdev_io(q, io);
1442 			} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
1443 				ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done);
1444 			} else {
1445 				if (cqe->res != UBLK_IO_RES_ABORT) {
1446 					SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
1447 						    cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data));
1448 				}
1449 				TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1450 			}
1451 		} else {
1452 
1453 			/* clear `user_copy` for next use of this IO structure */
1454 			io->user_copy = false;
1455 
1456 			assert((ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) ||
1457 			       (ublksrv_get_op(io->iod) == UBLK_IO_OP_WRITE));
1458 			if (cqe->res != io->result) {
1459 				/* EIO */
1460 				ublk_io_done(NULL, false, io);
1461 			} else {
1462 				if (ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) {
1463 					/* bdev_io is already freed in first READ cycle */
1464 					ublk_io_done(NULL, true, io);
1465 				} else {
1466 					_ublk_submit_bdev_io(q, io);
1467 				}
1468 			}
1469 		}
1470 		count += 1;
1471 		if (count == UBLK_QUEUE_REQUEST) {
1472 			break;
1473 		}
1474 	}
1475 	io_uring_cq_advance(&q->ring, count);
1476 
1477 	return count;
1478 }
1479 
1480 static int
1481 ublk_poll(void *arg)
1482 {
1483 	struct ublk_poll_group *poll_group = arg;
1484 	struct ublk_queue *q, *q_tmp;
1485 	int sent, received, count = 0;
1486 
1487 	TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) {
1488 		sent = ublk_io_xmit(q);
1489 		received = ublk_io_recv(q);
1490 		if (spdk_unlikely(q->is_stopping)) {
1491 			ublk_try_close_queue(q);
1492 		}
1493 		count += sent + received;
1494 	}
1495 	if (count > 0) {
1496 		return SPDK_POLLER_BUSY;
1497 	} else {
1498 		return SPDK_POLLER_IDLE;
1499 	}
1500 }
1501 
1502 static void
1503 ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk)
1504 {
1505 	ublk_close_dev(ublk);
1506 }
1507 
1508 static void
1509 ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1510 		   void *event_ctx)
1511 {
1512 	switch (type) {
1513 	case SPDK_BDEV_EVENT_REMOVE:
1514 		ublk_bdev_hot_remove(event_ctx);
1515 		break;
1516 	default:
1517 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1518 		break;
1519 	}
1520 }
1521 
1522 static void
1523 ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth)
1524 {
1525 	struct io_uring_sqe *sqe;
1526 	uint32_t i;
1527 
1528 	for (i = 0; i < q_depth; i++) {
1529 		sqe = ublk_uring_get_sqe(r, i);
1530 
1531 		/* These fields should be written once, never change */
1532 		sqe->flags = IOSQE_FIXED_FILE;
1533 		sqe->rw_flags = 0;
1534 		sqe->ioprio = 0;
1535 		sqe->off = 0;
1536 	}
1537 }
1538 
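/*
 * Per-queue setup: the kernel exposes each queue's array of struct
 * ublksrv_io_desc via mmap() of the /dev/ublkc<N> char device at
 * UBLKSRV_CMD_BUF_OFFSET (plus a per-queue offset).  A dedicated SQE128
 * io_uring is then created and the char-device fd is registered as fixed
 * file 0, which is what the sqe->fd = 0 / IOSQE_FIXED_FILE settings in the
 * io path refer to.
 */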
1539 static int
1540 ublk_dev_queue_init(struct ublk_queue *q)
1541 {
1542 	int rc = 0, cmd_buf_size;
1543 	uint32_t j;
1544 	struct spdk_ublk_dev *ublk = q->dev;
1545 	unsigned long off;
1546 
1547 	cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth);
1548 	off = UBLKSRV_CMD_BUF_OFFSET +
1549 	      q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
1550 	q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ,
1551 			MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off);
1552 	if (q->io_cmd_buf == MAP_FAILED) {
1553 		q->io_cmd_buf = NULL;
1554 		rc = -errno;
1555 		SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc));
1556 		return rc;
1557 	}
1558 
1559 	for (j = 0; j < q->q_depth; j++) {
1560 		q->ios[j].cmd_op = UBLK_IO_FETCH_REQ;
1561 		q->ios[j].iod = &q->io_cmd_buf[j];
1562 	}
1563 
1564 	rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128);
1565 	if (rc < 0) {
1566 		SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc));
1567 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1568 		q->io_cmd_buf = NULL;
1569 		return rc;
1570 	}
1571 
1572 	rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1);
1573 	if (rc != 0) {
1574 		SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc));
1575 		io_uring_queue_exit(&q->ring);
1576 		q->ring.ring_fd = -1;
1577 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1578 		q->io_cmd_buf = NULL;
1579 		return rc;
1580 	}
1581 
1582 	ublk_dev_init_io_cmds(&q->ring, q->q_depth);
1583 
1584 	return 0;
1585 }
1586 
1587 static void
1588 ublk_dev_queue_fini(struct ublk_queue *q)
1589 {
1590 	if (q->ring.ring_fd >= 0) {
1591 		io_uring_unregister_files(&q->ring);
1592 		io_uring_queue_exit(&q->ring);
1593 		q->ring.ring_fd = -1;
1594 	}
1595 	if (q->io_cmd_buf) {
1596 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1597 	}
1598 }
1599 
1600 static void
1601 ublk_dev_queue_io_init(struct ublk_queue *q)
1602 {
1603 	struct ublk_io *io;
1604 	uint32_t i;
1605 	int rc __attribute__((unused));
1606 	void *buf;
1607 
1608 	/* Some older kernels require a buffer to get posted, even
1609 	 * when NEED_GET_DATA has been specified.  So allocate a
1610 	 * temporary buffer, only for purposes of this workaround.
1611 	 * It never actually gets used, so we will free it immediately
1612 	 * after all of the commands are posted.
1613 	 */
1614 	buf = malloc(64);
1615 
1616 	assert(q->bdev_ch != NULL);
1617 
1618 	/* Initialize and submit all io commands to ublk driver */
1619 	for (i = 0; i < q->q_depth; i++) {
1620 		io = &q->ios[i];
1621 		io->tag = (uint16_t)i;
1622 		io->payload = buf;
1623 		io->bdev_ch = q->bdev_ch;
1624 		io->bdev_desc = q->dev->bdev_desc;
1625 		ublksrv_queue_io_cmd(q, io, i);
1626 	}
1627 
1628 	q->cmd_inflight += q->q_depth;
1629 	rc = io_uring_submit(&q->ring);
1630 	assert(rc == (int)q->q_depth);
1631 	for (i = 0; i < q->q_depth; i++) {
1632 		io = &q->ios[i];
1633 		io->payload = NULL;
1634 	}
1635 	free(buf);
1636 }
1637 
1638 static int
1639 ublk_set_params(struct spdk_ublk_dev *ublk)
1640 {
1641 	int rc;
1642 
1643 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_SET_PARAMS);
1644 	if (rc < 0) {
1645 		SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1646 	}
1647 
1648 	return rc;
1649 }
1650 
1651 static void
1652 ublk_dev_info_init(struct spdk_ublk_dev *ublk)
1653 {
1654 	struct ublksrv_ctrl_dev_info uinfo = {
1655 		.queue_depth = ublk->queue_depth,
1656 		.nr_hw_queues = ublk->num_queues,
1657 		.dev_id = ublk->ublk_id,
1658 		.max_io_buf_bytes = UBLK_IO_MAX_BYTES,
1659 		.ublksrv_pid = getpid(),
1660 		.flags = UBLK_F_URING_CMD_COMP_IN_TASK,
1661 	};
1662 
1663 	if (g_ublk_tgt.user_copy) {
1664 		uinfo.flags |= UBLK_F_USER_COPY;
1665 	} else {
1666 		uinfo.flags |= UBLK_F_NEED_GET_DATA;
1667 	}
1668 
1669 	if (g_ublk_tgt.user_recovery) {
1670 		uinfo.flags |= UBLK_F_USER_RECOVERY;
1671 		uinfo.flags |= UBLK_F_USER_RECOVERY_REISSUE;
1672 	}
1673 
1674 	ublk->dev_info = uinfo;
1675 }
1676 
1677 /* Set ublk device parameters based on bdev */
1678 static void
1679 ublk_info_param_init(struct spdk_ublk_dev *ublk)
1680 {
1681 	struct spdk_bdev *bdev = ublk->bdev;
1682 	uint32_t blk_size = spdk_bdev_get_data_block_size(bdev);
1683 	uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev);
1684 	uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev);
1685 	uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev);
1686 	uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT;
1687 	uint32_t io_min_size = blk_size;
1688 	uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size);
1689 
1690 	struct ublk_params uparams = {
1691 		.types = UBLK_PARAM_TYPE_BASIC,
1692 		.len = sizeof(struct ublk_params),
1693 		.basic = {
1694 			.logical_bs_shift = spdk_u32log2(blk_size),
1695 			.physical_bs_shift = spdk_u32log2(pblk_size),
1696 			.io_min_shift = spdk_u32log2(io_min_size),
1697 			.io_opt_shift = spdk_u32log2(io_opt_size),
1698 			.dev_sectors = num_blocks * sectors_per_block,
1699 			.max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT,
1700 		}
1701 	};
1702 
1703 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
1704 		uparams.basic.attrs = UBLK_ATTR_VOLATILE_CACHE;
1705 	}
1706 
1707 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1708 		uparams.types |= UBLK_PARAM_TYPE_DISCARD;
1709 		uparams.discard.discard_alignment = sectors_per_block;
1710 		uparams.discard.max_discard_sectors = num_blocks * sectors_per_block;
1711 		uparams.discard.max_discard_segments = 1;
1712 		uparams.discard.discard_granularity = blk_size;
1713 		if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1714 			uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block;
1715 		}
1716 	}
1717 
1718 	ublk->dev_params = uparams;
1719 }
1720 
1721 static void
1722 _ublk_free_dev(void *arg)
1723 {
1724 	struct spdk_ublk_dev *ublk = arg;
1725 
1726 	ublk_free_dev(ublk);
1727 }
1728 
1729 static void
1730 free_buffers(void *arg)
1731 {
1732 	struct ublk_queue *q = arg;
1733 	uint32_t i;
1734 
1735 	for (i = 0; i < q->q_depth; i++) {
1736 		ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch);
1737 	}
1738 	free(q->ios);
1739 	q->ios = NULL;
1740 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev);
1741 }
1742 
1743 static void
1744 ublk_free_dev(struct spdk_ublk_dev *ublk)
1745 {
1746 	struct ublk_queue *q;
1747 	uint32_t q_idx;
1748 
1749 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
1750 		q = &ublk->queues[q_idx];
1751 
1752 		/* The ublk_io structures of this queue are not initialized. */
1753 		if (q->ios == NULL) {
1754 			continue;
1755 		}
1756 
1757 		/* We found a queue that has an ios array that may have buffers
1758 		 * that need to be freed.  Send a message to the queue's thread
1759 		 * so it can free the buffers back to that thread's iobuf channel.
1760 		 * When it's done, it will set q->ios to NULL and send a message
1761 		 * back to this function to continue.
1762 		 */
1763 		if (q->poll_group) {
1764 			spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q);
1765 			return;
1766 		} else {
1767 			free(q->ios);
1768 			q->ios = NULL;
1769 		}
1770 	}
1771 
1772 	/* All of the buffers associated with the queues have been freed, so now
1773 	 * continue with releasing resources for the rest of the ublk device.
1774 	 */
1775 	if (ublk->bdev_desc) {
1776 		spdk_bdev_close(ublk->bdev_desc);
1777 		ublk->bdev_desc = NULL;
1778 	}
1779 
1780 	ublk_dev_list_unregister(ublk);
1781 	SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id);
1782 
1783 	free(ublk);
1784 }
1785 
1786 static int
1787 ublk_ios_init(struct spdk_ublk_dev *ublk)
1788 {
1789 	int rc;
1790 	uint32_t i, j;
1791 	struct ublk_queue *q;
1792 
1793 	for (i = 0; i < ublk->num_queues; i++) {
1794 		q = &ublk->queues[i];
1795 
1796 		TAILQ_INIT(&q->completed_io_list);
1797 		TAILQ_INIT(&q->inflight_io_list);
1798 		q->dev = ublk;
1799 		q->q_id = i;
1800 		q->q_depth = ublk->queue_depth;
1801 		q->ios = calloc(q->q_depth, sizeof(struct ublk_io));
1802 		if (!q->ios) {
1803 			rc = -ENOMEM;
1804 			SPDK_ERRLOG("could not allocate queue ios\n");
1805 			goto err;
1806 		}
1807 		for (j = 0; j < q->q_depth; j++) {
1808 			q->ios[j].q = q;
1809 		}
1810 	}
1811 
1812 	return 0;
1813 
1814 err:
1815 	for (i = 0; i < ublk->num_queues; i++) {
1816 		free(ublk->queues[i].ios);
1817 		ublk->queues[i].ios = NULL;
1818 	}
1819 	return rc;
1820 }
1821 
1822 static void
1823 ublk_queue_recovery_done(void *arg)
1824 {
1825 	struct spdk_ublk_dev *ublk = arg;
1826 
1827 	ublk->online_num_queues++;
1828 	if (ublk->is_recovering && (ublk->online_num_queues == ublk->num_queues)) {
1829 		ublk_ctrl_cmd_submit(ublk, UBLK_CMD_END_USER_RECOVERY);
1830 	}
1831 }
1832 
1833 static void
1834 ublk_queue_run(void *arg1)
1835 {
1836 	struct ublk_queue	*q = arg1;
1837 	struct spdk_ublk_dev *ublk = q->dev;
1838 	struct ublk_poll_group *poll_group = q->poll_group;
1839 
1840 	assert(spdk_get_thread() == poll_group->ublk_thread);
1841 	q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc);
1842 	/* Queues must be filled with IO commands on their poll group's ublk thread */
1843 	ublk_dev_queue_io_init(q);
1844 
1845 	TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq);
1846 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_queue_recovery_done, ublk);
1847 }
1848 
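/*
 * Create a ublk block device backed by the given bdev.  This call only
 * registers the device and submits UBLK_CMD_ADD_DEV; the remaining bring-up
 * (SET_PARAMS, opening /dev/ublkc<N>, queue init, START_DEV and distributing
 * queues across the poll groups) is driven asynchronously from the control
 * CQE handler, and ctrl_cb is invoked once START_DEV completes.
 */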
1849 int
1850 ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
1851 		uint32_t num_queues, uint32_t queue_depth,
1852 		ublk_ctrl_cb ctrl_cb, void *cb_arg)
1853 {
1854 	int			rc;
1855 	uint32_t		i;
1856 	struct spdk_bdev	*bdev;
1857 	struct spdk_ublk_dev	*ublk = NULL;
1858 	uint32_t		sector_per_block;
1859 
1860 	assert(spdk_thread_is_app_thread(NULL));
1861 
1862 	if (g_ublk_tgt.active == false) {
1863 		SPDK_ERRLOG("No ublk target exists\n");
1864 		return -ENODEV;
1865 	}
1866 
1867 	ublk = ublk_dev_find_by_id(ublk_id);
1868 	if (ublk != NULL) {
1869 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
1870 		return -EBUSY;
1871 	}
1872 
1873 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
1874 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
1875 		return -ENOTSUP;
1876 	}
1877 
1878 	ublk = calloc(1, sizeof(*ublk));
1879 	if (ublk == NULL) {
1880 		return -ENOMEM;
1881 	}
1882 	ublk->ctrl_cb = ctrl_cb;
1883 	ublk->cb_arg = cb_arg;
1884 	ublk->cdev_fd = -1;
1885 	ublk->ublk_id = ublk_id;
1886 	UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n",
1887 		      bdev_name, num_queues, queue_depth);
1888 
1889 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
1890 	if (rc != 0) {
1891 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
1892 		free(ublk);
1893 		return rc;
1894 	}
1895 
1896 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
1897 	ublk->bdev = bdev;
1898 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
1899 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
1900 
1901 	ublk->queues_closed = 0;
1902 	ublk->num_queues = num_queues;
1903 	ublk->queue_depth = queue_depth;
1904 	if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) {
1905 		SPDK_WARNLOG("Capping queue depth %d of UBLK %d to the maximum %d\n",
1906 			     ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH);
1907 		ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH;
1908 	}
1909 	if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) {
1910 		SPDK_WARNLOG("Capping queue count %d of UBLK %d to the maximum %d\n",
1911 			     ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES);
1912 		ublk->num_queues = UBLK_DEV_MAX_QUEUES;
1913 	}
1914 	for (i = 0; i < ublk->num_queues; i++) {
1915 		ublk->queues[i].ring.ring_fd = -1;
1916 	}
1917 
1918 	ublk_dev_info_init(ublk);
1919 	ublk_info_param_init(ublk);
1920 	rc = ublk_ios_init(ublk);
1921 	if (rc != 0) {
1922 		spdk_bdev_close(ublk->bdev_desc);
1923 		free(ublk);
1924 		return rc;
1925 	}
1926 
1927 	SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n",
1928 		     bdev_name, ublk_id);
1929 
1930 	/* Add ublk_dev to the end of disk list */
1931 	ublk_dev_list_register(ublk);
1932 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_ADD_DEV);
1933 	if (rc < 0) {
1934 		SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1935 		ublk_free_dev(ublk);
1936 	}
1937 
1938 	return rc;
1939 }
1940 
1941 static int
1942 ublk_start_dev(struct spdk_ublk_dev *ublk, bool is_recovering)
1943 {
1944 	int			rc;
1945 	uint32_t		q_id;
1946 	struct spdk_thread	*ublk_thread;
1947 	char			buf[64];
1948 
1949 	snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id);
1950 	ublk->cdev_fd = open(buf, O_RDWR);
1951 	if (ublk->cdev_fd < 0) {
1952 		rc = -errno;
1953 		SPDK_ERRLOG("can't open %s, rc %d\n", buf, rc);
1954 		return rc;
1955 	}
1956 
1957 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1958 		rc = ublk_dev_queue_init(&ublk->queues[q_id]);
1959 		if (rc) {
1960 			return rc;
1961 		}
1962 	}
1963 
1964 	if (!is_recovering) {
1965 		rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_START_DEV);
1966 		if (rc < 0) {
1967 			SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id,
1968 				    spdk_strerror(-rc));
1969 			return rc;
1970 		}
1971 	}
1972 
1973 	/* Send queue to different spdk_threads for load balance */
1974 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1975 		ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_groups[g_next_ublk_poll_group];
1976 		ublk_thread = g_ublk_tgt.poll_groups[g_next_ublk_poll_group].ublk_thread;
1977 		spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]);
1978 		g_next_ublk_poll_group++;
1979 		if (g_next_ublk_poll_group == g_num_ublk_poll_groups) {
1980 			g_next_ublk_poll_group = 0;
1981 		}
1982 	}
1983 
1984 	return 0;
1985 }
1986 
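/*
 * Called from the control CQE handler after GET_DEV_INFO succeeds during
 * ublk_start_disk_recovery().  It re-derives the queue count and depth from
 * the kernel-reported dev_info, re-allocates the io arrays and then submits
 * UBLK_CMD_START_USER_RECOVERY; END_USER_RECOVERY is sent once every queue
 * has been reopened (see ublk_queue_recovery_done()).
 */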
1987 static int
1988 ublk_ctrl_start_recovery(struct spdk_ublk_dev *ublk)
1989 {
1990 	int                     rc;
1991 	uint32_t                i;
1992 
1993 	if (ublk->ublk_id != ublk->dev_info.dev_id) {
1994 		SPDK_ERRLOG("Invalid ublk ID\n");
1995 		return -EINVAL;
1996 	}
1997 
1998 	ublk->num_queues = ublk->dev_info.nr_hw_queues;
1999 	ublk->queue_depth = ublk->dev_info.queue_depth;
2000 	ublk->dev_info.ublksrv_pid = getpid();
2001 
2002 	SPDK_DEBUGLOG(ublk, "Recovering ublk %d, num queues %u, queue depth %u, flags 0x%llx\n",
2003 		      ublk->ublk_id,
2004 		      ublk->num_queues, ublk->queue_depth, ublk->dev_info.flags);
2005 
2006 	for (i = 0; i < ublk->num_queues; i++) {
2007 		ublk->queues[i].ring.ring_fd = -1;
2008 	}
2009 
2010 	ublk_info_param_init(ublk);
2011 	rc = ublk_ios_init(ublk);
2012 	if (rc != 0) {
2013 		return rc;
2014 	}
2015 
2016 	ublk->is_recovering = true;
2017 	return ublk_ctrl_cmd_submit(ublk, UBLK_CMD_START_USER_RECOVERY);
2018 }
2019 
2020 int
2021 ublk_start_disk_recovery(const char *bdev_name, uint32_t ublk_id, ublk_ctrl_cb ctrl_cb,
2022 			 void *cb_arg)
2023 {
2024 	int			rc;
2025 	struct spdk_bdev	*bdev;
2026 	struct spdk_ublk_dev	*ublk = NULL;
2027 	uint32_t		sector_per_block;
2028 
2029 	assert(spdk_thread_is_app_thread(NULL));
2030 
2031 	if (g_ublk_tgt.active == false) {
2032 		SPDK_ERRLOG("No ublk target exists\n");
2033 		return -ENODEV;
2034 	}
2035 
2036 	if (!g_ublk_tgt.user_recovery) {
2037 		SPDK_ERRLOG("User recovery requires kernel version >= 6.4 (UBLK_F_USER_RECOVERY not supported)\n");
2038 		return -ENOTSUP;
2039 	}
2040 
2041 	ublk = ublk_dev_find_by_id(ublk_id);
2042 	if (ublk != NULL) {
2043 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
2044 		return -EBUSY;
2045 	}
2046 
2047 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
2048 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
2049 		return -ENOTSUP;
2050 	}
2051 
2052 	ublk = calloc(1, sizeof(*ublk));
2053 	if (ublk == NULL) {
2054 		return -ENOMEM;
2055 	}
2056 	ublk->ctrl_cb = ctrl_cb;
2057 	ublk->cb_arg = cb_arg;
2058 	ublk->cdev_fd = -1;
2059 	ublk->ublk_id = ublk_id;
2060 
2061 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
2062 	if (rc != 0) {
2063 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
2064 		free(ublk);
2065 		return rc;
2066 	}
2067 
2068 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
2069 	ublk->bdev = bdev;
2070 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
2071 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
2072 
2073 	SPDK_NOTICELOG("Recovering ublk %d with bdev %s\n", ublk->ublk_id, bdev_name);
2074 
2075 	ublk_dev_list_register(ublk);
2076 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_GET_DEV_INFO);
2077 	if (rc < 0) {
2078 		ublk_free_dev(ublk);
2079 	}
2080 
2081 	return rc;
2082 }
2083 
2084 SPDK_LOG_REGISTER_COMPONENT(ublk)
2085 SPDK_LOG_REGISTER_COMPONENT(ublk_io)
2086