1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <liburing.h>
7 
8 #include "spdk/stdinc.h"
9 #include "spdk/string.h"
10 #include "spdk/bdev.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/likely.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/queue.h"
17 #include "spdk/json.h"
18 #include "spdk/ublk.h"
19 #include "spdk/thread.h"
20 
21 #include "ublk_internal.h"
22 
23 #define UBLK_CTRL_DEV					"/dev/ublk-control"
24 #define UBLK_BLK_CDEV					"/dev/ublkc"
25 
26 #define LINUX_SECTOR_SHIFT				9
27 #define UBLK_IO_MAX_BYTES				SPDK_BDEV_LARGE_BUF_MAX_SIZE
28 #define UBLK_DEV_MAX_QUEUES				32
29 #define UBLK_DEV_MAX_QUEUE_DEPTH			1024
30 #define UBLK_QUEUE_REQUEST				32
31 #define UBLK_STOP_BUSY_WAITING_MS			10000
32 #define UBLK_BUSY_POLLING_INTERVAL_US			20000
33 #define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US	1000
34 /* By default, kernel ublk_drv driver can support up to 64 block devices */
35 #define UBLK_DEFAULT_MAX_SUPPORTED_DEVS			64
36 
37 #define UBLK_IOBUF_SMALL_CACHE_SIZE			128
38 #define UBLK_IOBUF_LARGE_CACHE_SIZE			32
39 
40 #define UBLK_DEBUGLOG(ublk, format, ...) \
41 	SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__);
42 
43 static uint32_t g_num_ublk_poll_groups = 0;
44 static uint32_t g_next_ublk_poll_group = 0;
45 static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS;
46 static struct spdk_cpuset g_core_mask;
47 static bool g_disable_user_copy = false;
48 
49 struct ublk_queue;
50 struct ublk_poll_group;
51 struct ublk_io;
52 static void _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io);
53 static void ublk_dev_queue_fini(struct ublk_queue *q);
54 static int ublk_poll(void *arg);
55 
56 static int ublk_set_params(struct spdk_ublk_dev *ublk);
57 static int ublk_start_dev(struct spdk_ublk_dev *ublk, bool is_recovering);
58 static void ublk_free_dev(struct spdk_ublk_dev *ublk);
59 static void ublk_delete_dev(void *arg);
60 static int ublk_close_dev(struct spdk_ublk_dev *ublk);
61 static int ublk_ctrl_start_recovery(struct spdk_ublk_dev *ublk);
62 
63 static int ublk_ctrl_cmd_submit(struct spdk_ublk_dev *ublk, uint32_t cmd_op);
64 
65 static const char *ublk_op_name[64] = {
66 	[UBLK_CMD_GET_DEV_INFO] = "UBLK_CMD_GET_DEV_INFO",
67 	[UBLK_CMD_ADD_DEV] =	"UBLK_CMD_ADD_DEV",
68 	[UBLK_CMD_DEL_DEV] =	"UBLK_CMD_DEL_DEV",
69 	[UBLK_CMD_START_DEV] =	"UBLK_CMD_START_DEV",
70 	[UBLK_CMD_STOP_DEV] =	"UBLK_CMD_STOP_DEV",
71 	[UBLK_CMD_SET_PARAMS] =	"UBLK_CMD_SET_PARAMS",
72 	[UBLK_CMD_START_USER_RECOVERY] = "UBLK_CMD_START_USER_RECOVERY",
73 	[UBLK_CMD_END_USER_RECOVERY] = "UBLK_CMD_END_USER_RECOVERY",
74 };
75 
76 typedef void (*ublk_get_buf_cb)(struct ublk_io *io);
77 
78 struct ublk_io {
79 	void			*payload;
80 	void			*mpool_entry;
81 	bool			need_data;
82 	bool			user_copy;
83 	uint16_t		tag;
84 	uint64_t		payload_size;
85 	uint32_t		cmd_op;
86 	int32_t			result;
87 	struct spdk_bdev_desc	*bdev_desc;
88 	struct spdk_io_channel	*bdev_ch;
89 	const struct ublksrv_io_desc	*iod;
90 	ublk_get_buf_cb		get_buf_cb;
91 	struct ublk_queue	*q;
92 	/* for bdev io_wait */
93 	struct spdk_bdev_io_wait_entry bdev_io_wait;
94 	struct spdk_iobuf_entry	iobuf;
95 
96 	TAILQ_ENTRY(ublk_io)	tailq;
97 };
98 
99 struct ublk_queue {
100 	uint32_t		q_id;
101 	uint32_t		q_depth;
102 	struct ublk_io		*ios;
103 	TAILQ_HEAD(, ublk_io)	completed_io_list;
104 	TAILQ_HEAD(, ublk_io)	inflight_io_list;
105 	uint32_t		cmd_inflight;
106 	bool			is_stopping;
107 	struct ublksrv_io_desc	*io_cmd_buf;
108 	/* ring depth == dev_info->queue_depth. */
109 	struct io_uring		ring;
110 	struct spdk_ublk_dev	*dev;
111 	struct ublk_poll_group	*poll_group;
112 	struct spdk_io_channel	*bdev_ch;
113 
114 	TAILQ_ENTRY(ublk_queue)	tailq;
115 };
116 
117 struct spdk_ublk_dev {
118 	struct spdk_bdev	*bdev;
119 	struct spdk_bdev_desc	*bdev_desc;
120 
121 	int			cdev_fd;
122 	struct ublk_params	dev_params;
123 	struct ublksrv_ctrl_dev_info	dev_info;
124 
125 	uint32_t		ublk_id;
126 	uint32_t		num_queues;
127 	uint32_t		queue_depth;
128 	uint32_t		online_num_queues;
129 	uint32_t		sector_per_block_shift;
130 	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];
131 
132 	struct spdk_poller	*retry_poller;
133 	int			retry_count;
134 	uint32_t		queues_closed;
135 	ublk_ctrl_cb		ctrl_cb;
136 	void			*cb_arg;
137 	uint32_t		current_cmd_op;
138 	uint32_t		ctrl_ops_in_progress;
139 	bool			is_closing;
140 	bool			is_recovering;
141 
142 	TAILQ_ENTRY(spdk_ublk_dev) tailq;
143 	TAILQ_ENTRY(spdk_ublk_dev) wait_tailq;
144 };
145 
146 struct ublk_poll_group {
147 	struct spdk_thread		*ublk_thread;
148 	struct spdk_poller		*ublk_poller;
149 	struct spdk_iobuf_channel	iobuf_ch;
150 	TAILQ_HEAD(, ublk_queue)	queue_list;
151 };
152 
153 struct ublk_tgt {
154 	int			ctrl_fd;
155 	bool			active;
156 	bool			is_destroying;
157 	spdk_ublk_fini_cb	cb_fn;
158 	void			*cb_arg;
159 	struct io_uring		ctrl_ring;
160 	struct spdk_poller	*ctrl_poller;
161 	uint32_t		ctrl_ops_in_progress;
162 	struct ublk_poll_group	*poll_groups;
163 	uint32_t		num_ublk_devs;
164 	uint64_t		features;
165 	/* `ublk_drv` supports UBLK_F_CMD_IOCTL_ENCODE */
166 	bool			ioctl_encode;
167 	/* `ublk_drv` supports UBLK_F_USER_COPY */
168 	bool			user_copy;
169 	/* `ublk_drv` supports UBLK_F_USER_RECOVERY */
170 	bool			user_recovery;
171 };
172 
173 static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs);
174 static struct ublk_tgt g_ublk_tgt;
175 
176 /* helpers for using io_uring */
177 static inline int
178 ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags)
179 {
180 	struct io_uring_params p = {};
181 
182 	p.flags = flags | IORING_SETUP_CQSIZE;
183 	p.cq_entries = depth;
184 
185 	return io_uring_queue_init_params(depth, r, &p);
186 }
187 
188 static inline struct io_uring_sqe *
189 ublk_uring_get_sqe(struct io_uring *r, uint32_t idx)
190 {
191 	/* Need to update the idx since we set IORING_SETUP_SQE128 parameter in ublk_setup_ring */
192 	return &r->sq.sqes[idx << 1];
193 }
194 
195 static inline void *
196 ublk_get_sqe_cmd(struct io_uring_sqe *sqe)
197 {
198 	return (void *)&sqe->addr3;
199 }
200 
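/*
 * For IORING_OP_URING_CMD the ublk command opcode is carried in sqe->off.
 * Kernels that advertise UBLK_F_CMD_IOCTL_ENCODE expect ioctl-style encoded
 * opcodes; older kernels take the legacy raw UBLK_CMD_* and UBLK_IO_* values.
 */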
201 static inline void
202 ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op)
203 {
204 	uint32_t opc = cmd_op;
205 
206 	if (g_ublk_tgt.ioctl_encode) {
207 		switch (cmd_op) {
208 		/* ctrl uring */
209 		case UBLK_CMD_GET_DEV_INFO:
210 			opc = _IOR('u', UBLK_CMD_GET_DEV_INFO, struct ublksrv_ctrl_cmd);
211 			break;
212 		case UBLK_CMD_ADD_DEV:
213 			opc = _IOWR('u', UBLK_CMD_ADD_DEV, struct ublksrv_ctrl_cmd);
214 			break;
215 		case UBLK_CMD_DEL_DEV:
216 			opc = _IOWR('u', UBLK_CMD_DEL_DEV, struct ublksrv_ctrl_cmd);
217 			break;
218 		case UBLK_CMD_START_DEV:
219 			opc = _IOWR('u', UBLK_CMD_START_DEV, struct ublksrv_ctrl_cmd);
220 			break;
221 		case UBLK_CMD_STOP_DEV:
222 			opc = _IOWR('u', UBLK_CMD_STOP_DEV, struct ublksrv_ctrl_cmd);
223 			break;
224 		case UBLK_CMD_SET_PARAMS:
225 			opc = _IOWR('u', UBLK_CMD_SET_PARAMS, struct ublksrv_ctrl_cmd);
226 			break;
227 		case UBLK_CMD_START_USER_RECOVERY:
228 			opc = _IOWR('u', UBLK_CMD_START_USER_RECOVERY, struct ublksrv_ctrl_cmd);
229 			break;
230 		case UBLK_CMD_END_USER_RECOVERY:
231 			opc = _IOWR('u', UBLK_CMD_END_USER_RECOVERY, struct ublksrv_ctrl_cmd);
232 			break;
233 
234 		/* io uring */
235 		case UBLK_IO_FETCH_REQ:
236 			opc = _IOWR('u', UBLK_IO_FETCH_REQ, struct ublksrv_io_cmd);
237 			break;
238 		case UBLK_IO_COMMIT_AND_FETCH_REQ:
239 			opc = _IOWR('u', UBLK_IO_COMMIT_AND_FETCH_REQ, struct ublksrv_io_cmd);
240 			break;
241 		case UBLK_IO_NEED_GET_DATA:
242 			opc = _IOWR('u', UBLK_IO_NEED_GET_DATA, struct ublksrv_io_cmd);
243 			break;
244 		default:
245 			break;
246 		}
247 	}
248 
249 	sqe->off = opc;
250 }
251 
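/*
 * The 64-bit io_uring user_data encodes the io tag in bits 0-15 and the ublk
 * command opcode in bits 16-23, so a completion can be matched back to its
 * ublk_io and the command that produced it.
 */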
252 static inline uint64_t
253 build_user_data(uint16_t tag, uint8_t op)
254 {
255 	assert(!(tag >> 16) && !(op >> 8));
256 
257 	return tag | (op << 16);
258 }
259 
260 static inline uint16_t
261 user_data_to_tag(uint64_t user_data)
262 {
263 	return user_data & 0xffff;
264 }
265 
266 static inline uint8_t
267 user_data_to_op(uint64_t user_data)
268 {
269 	return (user_data >> 16) & 0xff;
270 }
271 
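/*
 * With UBLK_F_USER_COPY, io data is transferred by reading/writing the ublk
 * char device at a per-io offset.  The offset starts at UBLKSRV_IO_BUF_OFFSET
 * and encodes the queue id and the io tag.
 */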
272 static inline uint64_t
273 ublk_user_copy_pos(uint16_t q_id, uint16_t tag)
274 {
275 	return (uint64_t)UBLKSRV_IO_BUF_OFFSET +
276 	       (((uint64_t)q_id << UBLK_QID_OFF) | ((uint64_t)tag << UBLK_TAG_OFF));
277 }
278 
279 void
280 spdk_ublk_init(void)
281 {
282 	assert(spdk_thread_is_app_thread(NULL));
283 
284 	g_ublk_tgt.ctrl_fd = -1;
285 	g_ublk_tgt.ctrl_ring.ring_fd = -1;
286 }
287 
288 static void
289 ublk_ctrl_cmd_error(struct spdk_ublk_dev *ublk, int32_t res)
290 {
291 	assert(res != 0);
292 
293 	SPDK_ERRLOG("ctrl cmd %s failed, %s\n", ublk_op_name[ublk->current_cmd_op], spdk_strerror(-res));
294 	if (ublk->ctrl_cb) {
295 		ublk->ctrl_cb(ublk->cb_arg, res);
296 		ublk->ctrl_cb = NULL;
297 	}
298 
299 	switch (ublk->current_cmd_op) {
300 	case UBLK_CMD_ADD_DEV:
301 	case UBLK_CMD_SET_PARAMS:
302 	case UBLK_CMD_START_USER_RECOVERY:
303 	case UBLK_CMD_END_USER_RECOVERY:
304 		ublk_delete_dev(ublk);
305 		break;
306 	case UBLK_CMD_START_DEV:
307 		ublk_close_dev(ublk);
308 		break;
309 	case UBLK_CMD_GET_DEV_INFO:
310 		ublk_free_dev(ublk);
311 		break;
312 	case UBLK_CMD_STOP_DEV:
313 	case UBLK_CMD_DEL_DEV:
314 		break;
315 	default:
316 		SPDK_ERRLOG("No matching cmd operation, cmd_op = %d\n", ublk->current_cmd_op);
317 		break;
318 	}
319 }
320 
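/*
 * Handle one control-ring completion and advance the device state machine:
 * ADD_DEV -> SET_PARAMS -> START_DEV for normal startup, and
 * GET_DEV_INFO -> START_USER_RECOVERY -> END_USER_RECOVERY for recovery.
 */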
321 static void
322 ublk_ctrl_process_cqe(struct io_uring_cqe *cqe)
323 {
324 	struct spdk_ublk_dev *ublk;
325 	int rc = 0;
326 
327 	ublk = (struct spdk_ublk_dev *)cqe->user_data;
328 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s completed\n", ublk_op_name[ublk->current_cmd_op]);
329 	ublk->ctrl_ops_in_progress--;
330 
331 	if (spdk_unlikely(cqe->res != 0)) {
332 		ublk_ctrl_cmd_error(ublk, cqe->res);
333 		return;
334 	}
335 
336 	switch (ublk->current_cmd_op) {
337 	case UBLK_CMD_ADD_DEV:
338 		rc = ublk_set_params(ublk);
339 		if (rc < 0) {
340 			ublk_delete_dev(ublk);
341 			goto cb_done;
342 		}
343 		break;
344 	case UBLK_CMD_SET_PARAMS:
345 		rc = ublk_start_dev(ublk, false);
346 		if (rc < 0) {
347 			ublk_delete_dev(ublk);
348 			goto cb_done;
349 		}
350 		break;
351 	case UBLK_CMD_START_DEV:
352 		goto cb_done;
353 		break;
354 	case UBLK_CMD_STOP_DEV:
355 		break;
356 	case UBLK_CMD_DEL_DEV:
357 		if (ublk->ctrl_cb) {
358 			ublk->ctrl_cb(ublk->cb_arg, 0);
359 			ublk->ctrl_cb = NULL;
360 		}
361 		ublk_free_dev(ublk);
362 		break;
363 	case UBLK_CMD_GET_DEV_INFO:
364 		rc = ublk_ctrl_start_recovery(ublk);
365 		if (rc < 0) {
366 			ublk_delete_dev(ublk);
367 			goto cb_done;
368 		}
369 		break;
370 	case UBLK_CMD_START_USER_RECOVERY:
371 		rc = ublk_start_dev(ublk, true);
372 		if (rc < 0) {
373 			ublk_delete_dev(ublk);
374 			goto cb_done;
375 		}
376 		break;
377 	case UBLK_CMD_END_USER_RECOVERY:
378 		SPDK_NOTICELOG("Ublk %u recovered successfully\n", ublk->ublk_id);
379 		ublk->is_recovering = false;
380 		goto cb_done;
381 		break;
382 	default:
383 		SPDK_ERRLOG("No matching cmd operation, cmd_op = %d\n", ublk->current_cmd_op);
384 		break;
385 	}
386 
387 	return;
388 
389 cb_done:
390 	if (ublk->ctrl_cb) {
391 		ublk->ctrl_cb(ublk->cb_arg, rc);
392 		ublk->ctrl_cb = NULL;
393 	}
394 }
395 
396 static int
397 ublk_ctrl_poller(void *arg)
398 {
399 	struct io_uring *ring = &g_ublk_tgt.ctrl_ring;
400 	struct io_uring_cqe *cqe;
401 	const int max = 8;
402 	int i, count = 0, rc;
403 
404 	if (!g_ublk_tgt.ctrl_ops_in_progress) {
405 		return SPDK_POLLER_IDLE;
406 	}
407 
408 	for (i = 0; i < max; i++) {
409 		rc = io_uring_peek_cqe(ring, &cqe);
410 		if (rc == -EAGAIN) {
411 			break;
412 		}
413 
414 		assert(cqe != NULL);
415 		g_ublk_tgt.ctrl_ops_in_progress--;
416 
417 		ublk_ctrl_process_cqe(cqe);
418 
419 		io_uring_cqe_seen(ring, cqe);
420 		count++;
421 	}
422 
423 	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
424 }
425 
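/*
 * Submit a single control command to /dev/ublk-control as an
 * IORING_OP_URING_CMD.  The completion is reaped asynchronously by
 * ublk_ctrl_poller() and handled in ublk_ctrl_process_cqe().
 */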
426 static int
427 ublk_ctrl_cmd_submit(struct spdk_ublk_dev *ublk, uint32_t cmd_op)
428 {
429 	uint32_t dev_id = ublk->ublk_id;
430 	int rc = -EINVAL;
431 	struct io_uring_sqe *sqe;
432 	struct ublksrv_ctrl_cmd *cmd;
433 
434 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]);
435 
436 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
437 	if (!sqe) {
438 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
439 		assert(false);
440 		return -ENOENT;
441 	}
442 
443 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
444 	sqe->fd = g_ublk_tgt.ctrl_fd;
445 	sqe->opcode = IORING_OP_URING_CMD;
446 	sqe->ioprio = 0;
447 	cmd->dev_id = dev_id;
448 	cmd->queue_id = -1;
449 	ublk->current_cmd_op = cmd_op;
450 
451 	switch (cmd_op) {
452 	case UBLK_CMD_ADD_DEV:
453 	case UBLK_CMD_GET_DEV_INFO:
454 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_info;
455 		cmd->len = sizeof(ublk->dev_info);
456 		break;
457 	case UBLK_CMD_SET_PARAMS:
458 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_params;
459 		cmd->len = sizeof(ublk->dev_params);
460 		break;
461 	case UBLK_CMD_START_DEV:
462 		cmd->data[0] = getpid();
463 		break;
464 	case UBLK_CMD_STOP_DEV:
465 		break;
466 	case UBLK_CMD_DEL_DEV:
467 		break;
468 	case UBLK_CMD_START_USER_RECOVERY:
469 		break;
470 	case UBLK_CMD_END_USER_RECOVERY:
471 		cmd->data[0] = getpid();
472 		break;
473 	default:
474 		SPDK_ERRLOG("No matching cmd operation, cmd_op = %d\n", cmd_op);
475 		return -EINVAL;
476 	}
477 	ublk_set_sqe_cmd_op(sqe, cmd_op);
478 	io_uring_sqe_set_data(sqe, ublk);
479 
480 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
481 	if (rc < 0) {
482 		SPDK_ERRLOG("uring submit rc %d\n", rc);
483 		assert(false);
484 		return rc;
485 	}
486 	g_ublk_tgt.ctrl_ops_in_progress++;
487 	ublk->ctrl_ops_in_progress++;
488 
489 	return 0;
490 }
491 
492 static int
493 ublk_ctrl_cmd_get_features(void)
494 {
495 	int rc;
496 	struct io_uring_sqe *sqe;
497 	struct io_uring_cqe *cqe;
498 	struct ublksrv_ctrl_cmd *cmd;
499 	uint32_t cmd_op;
500 
501 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
502 	if (!sqe) {
503 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
504 		assert(false);
505 		return -ENOENT;
506 	}
507 
508 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
509 	sqe->fd = g_ublk_tgt.ctrl_fd;
510 	sqe->opcode = IORING_OP_URING_CMD;
511 	sqe->ioprio = 0;
512 	cmd->dev_id = -1;
513 	cmd->queue_id = -1;
514 	cmd->addr = (__u64)(uintptr_t)&g_ublk_tgt.features;
515 	cmd->len = sizeof(g_ublk_tgt.features);
516 
517 	cmd_op = UBLK_U_CMD_GET_FEATURES;
518 	ublk_set_sqe_cmd_op(sqe, cmd_op);
519 
520 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
521 	if (rc < 0) {
522 		SPDK_ERRLOG("uring submit rc %d\n", rc);
523 		return rc;
524 	}
525 
526 	rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe);
527 	if (rc < 0) {
528 		SPDK_ERRLOG("wait cqe rc %d\n", rc);
529 		return rc;
530 	}
531 
532 	if (cqe->res == 0) {
533 		g_ublk_tgt.ioctl_encode = !!(g_ublk_tgt.features & UBLK_F_CMD_IOCTL_ENCODE);
534 		g_ublk_tgt.user_copy = !!(g_ublk_tgt.features & UBLK_F_USER_COPY);
535 		g_ublk_tgt.user_copy &= !g_disable_user_copy;
536 		g_ublk_tgt.user_recovery = !!(g_ublk_tgt.features & UBLK_F_USER_RECOVERY);
537 		SPDK_NOTICELOG("User Copy %s\n", g_ublk_tgt.user_copy ? "enabled" : "disabled");
538 	}
539 	io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe);
540 
541 	return 0;
542 }
543 
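/*
 * Size of the per-queue io descriptor array, rounded up to a multiple of the
 * page size so that it can be mmap()'ed.  For example, with 4 KiB pages any
 * descriptor array of up to 4096 bytes maps to a single page.
 */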
544 static int
545 ublk_queue_cmd_buf_sz(uint32_t q_depth)
546 {
547 	uint32_t size = q_depth * sizeof(struct ublksrv_io_desc);
548 	uint32_t page_sz = getpagesize();
549 
550 	/* round up size */
551 	return (size + page_sz - 1) & ~(page_sz - 1);
552 }
553 
554 static int
555 ublk_get_max_support_devs(void)
556 {
557 	FILE *file;
558 	char str[128];
559 
560 	file = fopen("/sys/module/ublk_drv/parameters/ublks_max", "r");
561 	if (!file) {
562 		return -ENOENT;
563 	}
564 
565 	if (!fgets(str, sizeof(str), file)) {
566 		fclose(file);
567 		return -EINVAL;
568 	}
569 	fclose(file);
570 
571 	spdk_str_chomp(str);
572 	return spdk_strtol(str, 10);
573 }
574 
575 static int
576 ublk_open(void)
577 {
578 	int rc, ublks_max;
579 
580 	g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR);
581 	if (g_ublk_tgt.ctrl_fd < 0) {
582 		rc = errno;
583 		SPDK_ERRLOG("UBLK control dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno));
584 		return -rc;
585 	}
586 
587 	ublks_max = ublk_get_max_support_devs();
588 	if (ublks_max > 0) {
589 		g_ublks_max = ublks_max;
590 	}
591 
592 	/* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl
593 	 * ring processing to a workqueue.  Ctrl ring processing is minimal, so SQPOLL is fine.
594 	 * All the commands sent via the control uring for a ublk device are executed one by one,
595 	 * so ublks_max * 2 uring entries are enough.
596 	 */
597 	rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring,
598 			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
599 	if (rc < 0) {
600 		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
601 		goto err;
602 	}
603 
604 	rc = ublk_ctrl_cmd_get_features();
605 	if (rc) {
606 		goto err;
607 	}
608 
609 	return 0;
610 
611 err:
612 	close(g_ublk_tgt.ctrl_fd);
613 	g_ublk_tgt.ctrl_fd = -1;
614 	return rc;
615 }
616 
617 static int
618 ublk_parse_core_mask(const char *mask)
619 {
620 	struct spdk_cpuset tmp_mask;
621 	int rc;
622 
623 	if (mask == NULL) {
624 		spdk_env_get_cpuset(&g_core_mask);
625 		return 0;
626 	}
627 
628 	rc = spdk_cpuset_parse(&g_core_mask, mask);
629 	if (rc < 0) {
630 		SPDK_ERRLOG("invalid cpumask %s\n", mask);
631 		return -EINVAL;
632 	}
633 
634 	if (spdk_cpuset_count(&g_core_mask) == 0) {
635 		SPDK_ERRLOG("no cpus specified\n");
636 		return -EINVAL;
637 	}
638 
639 	spdk_env_get_cpuset(&tmp_mask);
640 	spdk_cpuset_and(&tmp_mask, &g_core_mask);
641 
642 	if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) {
643 		SPDK_ERRLOG("one of the cpus in the selected cpumask (%s) is not available to the application\n",
644 			    spdk_cpuset_fmt(&g_core_mask));
645 		return -EINVAL;
646 	}
647 
648 	return 0;
649 }
650 
651 static void
652 ublk_poller_register(void *args)
653 {
654 	struct ublk_poll_group *poll_group = args;
655 	int rc;
656 
657 	assert(spdk_get_thread() == poll_group->ublk_thread);
658 	/* Bind the ublk spdk_thread to the current CPU core in order to avoid thread context
659 	 * switches during uring processing, as required by the ublk kernel driver.
660 	 */
661 	spdk_thread_bind(spdk_get_thread(), true);
662 
663 	TAILQ_INIT(&poll_group->queue_list);
664 	poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0);
665 	rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk",
666 				     UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE);
667 	if (rc != 0) {
668 		assert(false);
669 	}
670 }
671 
672 struct rpc_create_target {
673 	bool disable_user_copy;
674 };
675 
676 static const struct spdk_json_object_decoder rpc_ublk_create_target[] = {
677 	{"disable_user_copy", offsetof(struct rpc_create_target, disable_user_copy), spdk_json_decode_bool, true},
678 };
679 
680 int
681 ublk_create_target(const char *cpumask_str, const struct spdk_json_val *params)
682 {
683 	int rc;
684 	uint32_t i;
685 	char thread_name[32];
686 	struct rpc_create_target req = {};
687 	struct ublk_poll_group *poll_group;
688 
689 	if (g_ublk_tgt.active == true) {
690 		SPDK_ERRLOG("UBLK target has already been created\n");
691 		return -EBUSY;
692 	}
693 
694 	rc = ublk_parse_core_mask(cpumask_str);
695 	if (rc != 0) {
696 		return rc;
697 	}
698 
699 	if (params) {
700 		if (spdk_json_decode_object_relaxed(params, rpc_ublk_create_target,
701 						    SPDK_COUNTOF(rpc_ublk_create_target),
702 						    &req)) {
703 			SPDK_ERRLOG("spdk_json_decode_object failed\n");
704 			return -EINVAL;
705 		}
706 		g_disable_user_copy = req.disable_user_copy;
707 	}
708 
709 	assert(g_ublk_tgt.poll_groups == NULL);
710 	g_ublk_tgt.poll_groups = calloc(spdk_env_get_core_count(), sizeof(*poll_group));
711 	if (!g_ublk_tgt.poll_groups) {
712 		return -ENOMEM;
713 	}
714 
715 	rc = ublk_open();
716 	if (rc != 0) {
717 		SPDK_ERRLOG("Failed to open UBLK, error=%s\n", spdk_strerror(-rc));
718 		free(g_ublk_tgt.poll_groups);
719 		g_ublk_tgt.poll_groups = NULL;
720 		return rc;
721 	}
722 
723 	spdk_iobuf_register_module("ublk");
724 
725 	SPDK_ENV_FOREACH_CORE(i) {
726 		if (!spdk_cpuset_get_cpu(&g_core_mask, i)) {
727 			continue;
728 		}
729 		snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i);
730 		poll_group = &g_ublk_tgt.poll_groups[g_num_ublk_poll_groups];
731 		poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask);
732 		spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group);
733 		g_num_ublk_poll_groups++;
734 	}
735 
736 	assert(spdk_thread_is_app_thread(NULL));
737 	g_ublk_tgt.active = true;
738 	g_ublk_tgt.ctrl_ops_in_progress = 0;
739 	g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL,
740 				 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US);
741 
742 	SPDK_NOTICELOG("UBLK target created successfully\n");
743 
744 	return 0;
745 }
746 
747 static void
748 _ublk_fini_done(void *args)
749 {
750 	SPDK_DEBUGLOG(ublk, "\n");
751 
752 	g_num_ublk_poll_groups = 0;
753 	g_next_ublk_poll_group = 0;
754 	g_ublk_tgt.is_destroying = false;
755 	g_ublk_tgt.active = false;
756 	g_ublk_tgt.features = 0;
757 	g_ublk_tgt.ioctl_encode = false;
758 	g_ublk_tgt.user_copy = false;
759 	g_ublk_tgt.user_recovery = false;
760 
761 	if (g_ublk_tgt.cb_fn) {
762 		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
763 		g_ublk_tgt.cb_fn = NULL;
764 		g_ublk_tgt.cb_arg = NULL;
765 	}
766 
767 	if (g_ublk_tgt.poll_groups) {
768 		free(g_ublk_tgt.poll_groups);
769 		g_ublk_tgt.poll_groups = NULL;
770 	}
771 
772 }
773 
774 static void
775 ublk_thread_exit(void *args)
776 {
777 	struct spdk_thread *ublk_thread = spdk_get_thread();
778 	uint32_t i;
779 
780 	for (i = 0; i < g_num_ublk_poll_groups; i++) {
781 		if (g_ublk_tgt.poll_groups[i].ublk_thread == ublk_thread) {
782 			spdk_poller_unregister(&g_ublk_tgt.poll_groups[i].ublk_poller);
783 			spdk_iobuf_channel_fini(&g_ublk_tgt.poll_groups[i].iobuf_ch);
784 			spdk_thread_bind(ublk_thread, false);
785 			spdk_thread_exit(ublk_thread);
786 		}
787 	}
788 }
789 
790 static int
791 ublk_close_dev(struct spdk_ublk_dev *ublk)
792 {
793 	int rc;
794 
795 	/* set is_closing */
796 	if (ublk->is_closing) {
797 		return -EBUSY;
798 	}
799 	ublk->is_closing = true;
800 
801 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_STOP_DEV);
802 	if (rc < 0) {
803 		SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id);
804 	}
805 	return rc;
806 }
807 
808 static void
809 _ublk_fini(void *args)
810 {
811 	struct spdk_ublk_dev	*ublk, *ublk_tmp;
812 
813 	TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) {
814 		ublk_close_dev(ublk);
815 	}
816 
817 	/* Check if all ublks closed */
818 	if (TAILQ_EMPTY(&g_ublk_devs)) {
819 		SPDK_DEBUGLOG(ublk, "finish shutdown\n");
820 		spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
821 		if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) {
822 			io_uring_queue_exit(&g_ublk_tgt.ctrl_ring);
823 			g_ublk_tgt.ctrl_ring.ring_fd = -1;
824 		}
825 		if (g_ublk_tgt.ctrl_fd >= 0) {
826 			close(g_ublk_tgt.ctrl_fd);
827 			g_ublk_tgt.ctrl_fd = -1;
828 		}
829 		spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done);
830 	} else {
831 		spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL);
832 	}
833 }
834 
835 int
836 spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg)
837 {
838 	assert(spdk_thread_is_app_thread(NULL));
839 
840 	if (g_ublk_tgt.is_destroying == true) {
841 		/* UBLK target is being destroyed */
842 		return -EBUSY;
843 	}
844 	g_ublk_tgt.cb_fn = cb_fn;
845 	g_ublk_tgt.cb_arg = cb_arg;
846 	g_ublk_tgt.is_destroying = true;
847 	_ublk_fini(NULL);
848 
849 	return 0;
850 }
851 
852 int
853 ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg)
854 {
855 	int rc;
856 
857 	if (g_ublk_tgt.active == false) {
858 		/* UBLK target has not been created */
859 		return -ENOENT;
860 	}
861 
862 	rc = spdk_ublk_fini(cb_fn, cb_arg);
863 
864 	return rc;
865 }
866 
867 struct spdk_ublk_dev *
868 ublk_dev_find_by_id(uint32_t ublk_id)
869 {
870 	struct spdk_ublk_dev *ublk;
871 
872 	/* Check whether a ublk device with this ID has already been registered. */
873 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
874 		if (ublk->ublk_id == ublk_id) {
875 			return ublk;
876 		}
877 	}
878 
879 	return NULL;
880 }
881 
882 uint32_t
883 ublk_dev_get_id(struct spdk_ublk_dev *ublk)
884 {
885 	return ublk->ublk_id;
886 }
887 
888 struct spdk_ublk_dev *ublk_dev_first(void)
889 {
890 	return TAILQ_FIRST(&g_ublk_devs);
891 }
892 
893 struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev)
894 {
895 	return TAILQ_NEXT(prev, tailq);
896 }
897 
898 uint32_t
899 ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk)
900 {
901 	return ublk->queue_depth;
902 }
903 
904 uint32_t
905 ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk)
906 {
907 	return ublk->num_queues;
908 }
909 
910 const char *
911 ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk)
912 {
913 	return spdk_bdev_get_name(ublk->bdev);
914 }
915 
916 void
917 spdk_ublk_write_config_json(struct spdk_json_write_ctx *w)
918 {
919 	struct spdk_ublk_dev *ublk;
920 
921 	spdk_json_write_array_begin(w);
922 
923 	if (g_ublk_tgt.active) {
924 		spdk_json_write_object_begin(w);
925 
926 		spdk_json_write_named_string(w, "method", "ublk_create_target");
927 		spdk_json_write_named_object_begin(w, "params");
928 		spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask));
929 		spdk_json_write_object_end(w);
930 
931 		spdk_json_write_object_end(w);
932 	}
933 
934 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
935 		spdk_json_write_object_begin(w);
936 
937 		spdk_json_write_named_string(w, "method", "ublk_start_disk");
938 
939 		spdk_json_write_named_object_begin(w, "params");
940 		spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk));
941 		spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id);
942 		spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues);
943 		spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth);
944 		spdk_json_write_object_end(w);
945 
946 		spdk_json_write_object_end(w);
947 	}
948 
949 	spdk_json_write_array_end(w);
950 }
951 
952 static void
953 ublk_dev_list_register(struct spdk_ublk_dev *ublk)
954 {
955 	UBLK_DEBUGLOG(ublk, "add to tailq\n");
956 	TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq);
957 	g_ublk_tgt.num_ublk_devs++;
958 }
959 
960 static void
961 ublk_dev_list_unregister(struct spdk_ublk_dev *ublk)
962 {
963 	/*
964 	 * The ublk device may be stopped before it is registered,
965 	 * so check whether it was registered.
966 	 */
967 
968 	if (ublk_dev_find_by_id(ublk->ublk_id)) {
969 		UBLK_DEBUGLOG(ublk, "remove from tailq\n");
970 		TAILQ_REMOVE(&g_ublk_devs, ublk, tailq);
971 		assert(g_ublk_tgt.num_ublk_devs);
972 		g_ublk_tgt.num_ublk_devs--;
973 		return;
974 	}
975 
976 	UBLK_DEBUGLOG(ublk, "not found in tailq\n");
977 	assert(false);
978 }
979 
980 static void
981 ublk_delete_dev(void *arg)
982 {
983 	struct spdk_ublk_dev *ublk = arg;
984 	int rc = 0;
985 	uint32_t q_idx;
986 
987 	assert(spdk_thread_is_app_thread(NULL));
988 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
989 		ublk_dev_queue_fini(&ublk->queues[q_idx]);
990 	}
991 
992 	if (ublk->cdev_fd >= 0) {
993 		close(ublk->cdev_fd);
994 	}
995 
996 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_DEL_DEV);
997 	if (rc < 0) {
998 		SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id);
999 	}
1000 }
1001 
1002 static int
1003 _ublk_close_dev_retry(void *arg)
1004 {
1005 	struct spdk_ublk_dev *ublk = arg;
1006 
1007 	if (ublk->ctrl_ops_in_progress > 0) {
1008 		if (ublk->retry_count-- > 0) {
1009 			return SPDK_POLLER_BUSY;
1010 		}
1011 		SPDK_ERRLOG("Timeout on ctrl op completion.\n");
1012 	}
1013 	spdk_poller_unregister(&ublk->retry_poller);
1014 	ublk_delete_dev(ublk);
1015 	return SPDK_POLLER_BUSY;
1016 }
1017 
1018 static void
1019 ublk_try_close_dev(void *arg)
1020 {
1021 	struct spdk_ublk_dev *ublk = arg;
1022 
1023 	assert(spdk_thread_is_app_thread(NULL));
1024 
1025 	ublk->queues_closed += 1;
1026 	SPDK_DEBUGLOG(ublk_io, "ublkb%u closed queues %u\n", ublk->ublk_id, ublk->queues_closed);
1027 
1028 	if (ublk->queues_closed < ublk->num_queues) {
1029 		return;
1030 	}
1031 
1032 	if (ublk->ctrl_ops_in_progress > 0) {
1033 		assert(ublk->retry_poller == NULL);
1034 		ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US;
1035 		ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk,
1036 				     UBLK_BUSY_POLLING_INTERVAL_US);
1037 	} else {
1038 		ublk_delete_dev(ublk);
1039 	}
1040 }
1041 
1042 static void
1043 ublk_try_close_queue(struct ublk_queue *q)
1044 {
1045 	struct spdk_ublk_dev *ublk = q->dev;
1046 
1047 	/* Only close the queue when no I/O is in flight to the bdev,
1048 	 * no I/O is waiting to commit its result, and all I/Os have been aborted back.
1049 	 */
1050 	if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) {
1051 		/* wait for next retry */
1052 		return;
1053 	}
1054 
1055 	TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq);
1056 	spdk_put_io_channel(q->bdev_ch);
1057 	q->bdev_ch = NULL;
1058 
1059 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk);
1060 }
1061 
1062 int
1063 ublk_stop_disk(uint32_t ublk_id, ublk_ctrl_cb ctrl_cb, void *cb_arg)
1064 {
1065 	struct spdk_ublk_dev *ublk;
1066 
1067 	assert(spdk_thread_is_app_thread(NULL));
1068 
1069 	ublk = ublk_dev_find_by_id(ublk_id);
1070 	if (ublk == NULL) {
1071 		SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id);
1072 		return -ENODEV;
1073 	}
1074 	if (ublk->is_closing) {
1075 		SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id);
1076 		return -EBUSY;
1077 	}
1078 	if (ublk->ctrl_cb) {
1079 		SPDK_WARNLOG("ublk %d is busy with RPC call\n", ublk->ublk_id);
1080 		return -EBUSY;
1081 	}
1082 
1083 	ublk->ctrl_cb = ctrl_cb;
1084 	ublk->cb_arg = cb_arg;
1085 	return ublk_close_dev(ublk);
1086 }
1087 
1088 static inline void
1089 ublk_mark_io_done(struct ublk_io *io, int res)
1090 {
1091 	/*
1092 	 * mark io done by target, so that SPDK can commit its
1093 	 * result and fetch new request via io_uring command.
1094 	 */
1095 	io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
1096 	io->result = res;
1097 	io->need_data = false;
1098 }
1099 
1100 static void
1101 ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1102 {
1103 	struct ublk_io	*io = cb_arg;
1104 	struct ublk_queue *q = io->q;
1105 	int res;
1106 
1107 	if (success) {
1108 		res = io->result;
1109 	} else {
1110 		res = -EIO;
1111 	}
1112 
1113 	ublk_mark_io_done(io, res);
1114 
1115 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n",
1116 		      q->q_id, io->tag, res);
1117 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1118 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1119 
1120 	if (bdev_io != NULL) {
1121 		spdk_bdev_free_io(bdev_io);
1122 	}
1123 }
1124 
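/*
 * User-copy mode: move the io data by issuing a read/write on the registered
 * ublk char device at the offset that encodes (q_id, tag).  For a ublk write
 * the payload is read from the device into the local buffer; for a ublk read
 * the bdev data is written back to the device.  The completion of this
 * transfer is reaped in ublk_io_recv().
 */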
1125 static void
1126 ublk_queue_user_copy(struct ublk_io *io, bool is_write)
1127 {
1128 	struct ublk_queue *q = io->q;
1129 	const struct ublksrv_io_desc *iod = io->iod;
1130 	struct io_uring_sqe *sqe;
1131 	uint64_t pos;
1132 	uint32_t nbytes;
1133 
1134 	nbytes = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1135 	pos = ublk_user_copy_pos(q->q_id, io->tag);
1136 	sqe = io_uring_get_sqe(&q->ring);
1137 	assert(sqe);
1138 
1139 	if (is_write) {
1140 		io_uring_prep_read(sqe, 0, io->payload, nbytes, pos);
1141 	} else {
1142 		io_uring_prep_write(sqe, 0, io->payload, nbytes, pos);
1143 	}
1144 	io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
1145 	io_uring_sqe_set_data64(sqe, build_user_data(io->tag, 0));
1146 
1147 	io->user_copy = true;
1148 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1149 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1150 }
1151 
1152 static void
1153 ublk_user_copy_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1154 {
1155 	struct ublk_io	*io = cb_arg;
1156 
1157 	spdk_bdev_free_io(bdev_io);
1158 
1159 	if (success) {
1160 		ublk_queue_user_copy(io, false);
1161 		return;
1162 	}
1163 	/* READ IO Error */
1164 	ublk_io_done(NULL, false, cb_arg);
1165 }
1166 
1167 static void
1168 ublk_resubmit_io(void *arg)
1169 {
1170 	struct ublk_io *io = (struct ublk_io *)arg;
1171 
1172 	_ublk_submit_bdev_io(io->q, io);
1173 }
1174 
1175 static void
1176 ublk_queue_io(struct ublk_io *io)
1177 {
1178 	int rc;
1179 	struct spdk_bdev *bdev = io->q->dev->bdev;
1180 	struct ublk_queue *q = io->q;
1181 
1182 	io->bdev_io_wait.bdev = bdev;
1183 	io->bdev_io_wait.cb_fn = ublk_resubmit_io;
1184 	io->bdev_io_wait.cb_arg = io;
1185 
1186 	rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait);
1187 	if (rc != 0) {
1188 		SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc);
1189 		ublk_io_done(NULL, false, io);
1190 	}
1191 }
1192 
1193 static void
1194 ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf)
1195 {
1196 	struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf);
1197 
1198 	io->mpool_entry = buf;
1199 	assert(io->payload == NULL);
1200 	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
1201 	io->get_buf_cb(io);
1202 }
1203 
1204 static void
1205 ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch,
1206 		   ublk_get_buf_cb get_buf_cb)
1207 {
1208 	void *buf;
1209 
1210 	io->payload_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1211 	io->get_buf_cb = get_buf_cb;
1212 	buf = spdk_iobuf_get(iobuf_ch, io->payload_size, &io->iobuf, ublk_io_get_buffer_cb);
1213 
1214 	if (buf != NULL) {
1215 		ublk_io_get_buffer_cb(&io->iobuf, buf);
1216 	}
1217 }
1218 
1219 static void
1220 ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch)
1221 {
1222 	if (io->payload) {
1223 		spdk_iobuf_put(iobuf_ch, io->mpool_entry, io->payload_size);
1224 		io->mpool_entry = NULL;
1225 		io->payload = NULL;
1226 	}
1227 }
1228 
1229 static void
1230 _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1231 {
1232 	struct spdk_ublk_dev *ublk = q->dev;
1233 	struct spdk_bdev_desc *desc = io->bdev_desc;
1234 	struct spdk_io_channel *ch = io->bdev_ch;
1235 	uint64_t offset_blocks, num_blocks;
1236 	spdk_bdev_io_completion_cb read_cb;
1237 	uint8_t ublk_op;
1238 	int rc = 0;
1239 	const struct ublksrv_io_desc *iod = io->iod;
1240 
1241 	ublk_op = ublksrv_get_op(iod);
1242 	offset_blocks = iod->start_sector >> ublk->sector_per_block_shift;
1243 	num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift;
1244 
1245 	switch (ublk_op) {
1246 	case UBLK_IO_OP_READ:
1247 		if (g_ublk_tgt.user_copy) {
1248 			read_cb = ublk_user_copy_read_done;
1249 		} else {
1250 			read_cb = ublk_io_done;
1251 		}
1252 		rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, read_cb, io);
1253 		break;
1254 	case UBLK_IO_OP_WRITE:
1255 		rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
1256 		break;
1257 	case UBLK_IO_OP_FLUSH:
1258 		rc = spdk_bdev_flush_blocks(desc, ch, 0, spdk_bdev_get_num_blocks(ublk->bdev), ublk_io_done, io);
1259 		break;
1260 	case UBLK_IO_OP_DISCARD:
1261 		rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1262 		break;
1263 	case UBLK_IO_OP_WRITE_ZEROES:
1264 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1265 		break;
1266 	default:
1267 		rc = -1;
1268 	}
1269 
1270 	if (rc < 0) {
1271 		if (rc == -ENOMEM) {
1272 			SPDK_INFOLOG(ublk, "No memory, queueing io.\n");
1273 			ublk_queue_io(io);
1274 		} else {
1275 			SPDK_ERRLOG("bdev io submission failed, rc=%d, ublk_op=%u\n", rc, ublk_op);
1276 			ublk_io_done(NULL, false, io);
1277 		}
1278 	}
1279 }
1280 
1281 static void
1282 read_get_buffer_done(struct ublk_io *io)
1283 {
1284 	_ublk_submit_bdev_io(io->q, io);
1285 }
1286 
1287 static void
1288 user_copy_write_get_buffer_done(struct ublk_io *io)
1289 {
1290 	ublk_queue_user_copy(io, true);
1291 }
1292 
1293 static void
1294 ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1295 {
1296 	struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch;
1297 	const struct ublksrv_io_desc *iod = io->iod;
1298 	uint8_t ublk_op;
1299 
1300 	io->result = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1301 	ublk_op = ublksrv_get_op(iod);
1302 	switch (ublk_op) {
1303 	case UBLK_IO_OP_READ:
1304 		ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done);
1305 		break;
1306 	case UBLK_IO_OP_WRITE:
1307 		if (g_ublk_tgt.user_copy) {
1308 			ublk_io_get_buffer(io, iobuf_ch, user_copy_write_get_buffer_done);
1309 		} else {
1310 			_ublk_submit_bdev_io(q, io);
1311 		}
1312 		break;
1313 	default:
1314 		_ublk_submit_bdev_io(q, io);
1315 		break;
1316 	}
1317 }
1318 
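/*
 * Prepare a FETCH_REQ, COMMIT_AND_FETCH_REQ or NEED_GET_DATA command on the
 * queue's io_uring.  Only the SQE is filled in here; io_uring_submit() is
 * called later by ublk_io_xmit() or ublk_dev_queue_io_init().
 */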
1319 static inline void
1320 ublksrv_queue_io_cmd(struct ublk_queue *q,
1321 		     struct ublk_io *io, unsigned tag)
1322 {
1323 	struct ublksrv_io_cmd *cmd;
1324 	struct io_uring_sqe *sqe;
1325 	unsigned int cmd_op = 0;
1326 	uint64_t user_data;
1327 
1328 	/* each io should have operation of fetching or committing */
1329 	assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) ||
1330 	       (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ));
1331 	cmd_op = io->cmd_op;
1332 
1333 	sqe = io_uring_get_sqe(&q->ring);
1334 	assert(sqe);
1335 
1336 	cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe);
1337 	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) {
1338 		cmd->result = io->result;
1339 	}
1340 
1341 	/* These fields should be written once, never change */
1342 	ublk_set_sqe_cmd_op(sqe, cmd_op);
1343 	/* dev->cdev_fd */
1344 	sqe->fd		= 0;
1345 	sqe->opcode	= IORING_OP_URING_CMD;
1346 	sqe->flags	= IOSQE_FIXED_FILE;
1347 	sqe->rw_flags	= 0;
1348 	cmd->tag	= tag;
1349 	cmd->addr	= g_ublk_tgt.user_copy ? 0 : (__u64)(uintptr_t)(io->payload);
1350 	cmd->q_id	= q->q_id;
1351 
1352 	user_data = build_user_data(tag, cmd_op);
1353 	io_uring_sqe_set_data64(sqe, user_data);
1354 
1355 	io->cmd_op = 0;
1356 
1357 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n",
1358 		      q->q_id, tag, cmd_op,
1359 		      io->cmd_op, q->is_stopping);
1360 }
1361 
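/*
 * Drain the completed io list: queue a commit/fetch command for each io
 * (user-copy ios already have their data-transfer SQEs prepared), submit
 * everything with a single io_uring_submit(), then release the buffers the
 * kernel no longer needs.
 */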
1362 static int
1363 ublk_io_xmit(struct ublk_queue *q)
1364 {
1365 	TAILQ_HEAD(, ublk_io) buffer_free_list;
1366 	struct spdk_iobuf_channel *iobuf_ch;
1367 	int rc = 0, count = 0;
1368 	struct ublk_io *io;
1369 
1370 	if (TAILQ_EMPTY(&q->completed_io_list)) {
1371 		return 0;
1372 	}
1373 
1374 	TAILQ_INIT(&buffer_free_list);
1375 	while (!TAILQ_EMPTY(&q->completed_io_list)) {
1376 		io = TAILQ_FIRST(&q->completed_io_list);
1377 		assert(io != NULL);
1378 		/*
1379 		 * Remove IO from list now assuming it will be completed. It will be inserted
1380 		 * back to the head if it cannot be completed. This approach is specifically
1381 		 * taken to work around a scan-build use-after-free mischaracterization.
1382 		 */
1383 		TAILQ_REMOVE(&q->completed_io_list, io, tailq);
1384 		if (!io->user_copy) {
1385 			if (!io->need_data) {
1386 				TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq);
1387 			}
1388 			ublksrv_queue_io_cmd(q, io, io->tag);
1389 		}
1390 		count++;
1391 	}
1392 
1393 	q->cmd_inflight += count;
1394 	rc = io_uring_submit(&q->ring);
1395 	if (rc != count) {
1396 		SPDK_ERRLOG("could not submit all commands\n");
1397 		assert(false);
1398 	}
1399 
1400 	/* Note: for READ io, ublk will always copy the data out of
1401 	 * the buffers in the io_uring_submit context.  Since we
1402 	 * are not using SQPOLL for IO rings, we can safely free
1403 	 * those IO buffers here.  This design doesn't seem ideal,
1404 	 * but it's what's possible since there is no discrete
1405 	 * COMMIT_REQ operation.  That will need to change in the
1406 	 * future should we ever want to support async copy
1407 	 * operations.
1408 	 */
1409 	iobuf_ch = &q->poll_group->iobuf_ch;
1410 	while (!TAILQ_EMPTY(&buffer_free_list)) {
1411 		io = TAILQ_FIRST(&buffer_free_list);
1412 		TAILQ_REMOVE(&buffer_free_list, io, tailq);
1413 		ublk_io_put_buffer(io, iobuf_ch);
1414 	}
1415 	return rc;
1416 }
1417 
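/*
 * A buffer for a ublk write is now available.  Ask the kernel to copy the
 * write payload into it via UBLK_IO_NEED_GET_DATA (non-user-copy path only).
 */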
1418 static void
1419 write_get_buffer_done(struct ublk_io *io)
1420 {
1421 	io->need_data = true;
1422 	io->cmd_op = UBLK_IO_NEED_GET_DATA;
1423 	io->result = 0;
1424 
1425 	TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq);
1426 	TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq);
1427 }
1428 
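/*
 * Reap completions from the queue's io_uring.  For io command CQEs,
 * UBLK_IO_RES_OK means a new request was fetched and UBLK_IO_RES_NEED_GET_DATA
 * means a write buffer must be provided first.  For user-copy data-transfer
 * CQEs, the byte count is checked before completing or submitting the bdev io.
 */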
1429 static int
1430 ublk_io_recv(struct ublk_queue *q)
1431 {
1432 	struct io_uring_cqe *cqe;
1433 	unsigned head, tag;
1434 	int fetch, count = 0;
1435 	struct ublk_io *io;
1436 	struct spdk_iobuf_channel *iobuf_ch;
1437 
1438 	if (q->cmd_inflight == 0) {
1439 		return 0;
1440 	}
1441 
1442 	iobuf_ch = &q->poll_group->iobuf_ch;
1443 	io_uring_for_each_cqe(&q->ring, head, cqe) {
1444 		tag = user_data_to_tag(cqe->user_data);
1445 		io = &q->ios[tag];
1446 
1447 		SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u, user copy %u, cmd_op %u\n",
1448 			      cqe->res, q->q_id, tag, io->user_copy, user_data_to_op(cqe->user_data));
1449 
1450 		q->cmd_inflight--;
1451 		TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq);
1452 
1453 		if (!io->user_copy) {
1454 			fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping;
1455 			if (!fetch) {
1456 				q->is_stopping = true;
1457 				if (io->cmd_op == UBLK_IO_FETCH_REQ) {
1458 					io->cmd_op = 0;
1459 				}
1460 			}
1461 
1462 			if (cqe->res == UBLK_IO_RES_OK) {
1463 				ublk_submit_bdev_io(q, io);
1464 			} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
1465 				ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done);
1466 			} else {
1467 				if (cqe->res != UBLK_IO_RES_ABORT) {
1468 					SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
1469 						    cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data));
1470 				}
1471 				TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1472 			}
1473 		} else {
1474 
1475 			/* clear `user_copy` for next use of this IO structure */
1476 			io->user_copy = false;
1477 
1478 			assert((ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) ||
1479 			       (ublksrv_get_op(io->iod) == UBLK_IO_OP_WRITE));
1480 			if (cqe->res != io->result) {
1481 				/* EIO */
1482 				ublk_io_done(NULL, false, io);
1483 			} else {
1484 				if (ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) {
1485 					/* bdev_io is already freed in first READ cycle */
1486 					ublk_io_done(NULL, true, io);
1487 				} else {
1488 					_ublk_submit_bdev_io(q, io);
1489 				}
1490 			}
1491 		}
1492 		count += 1;
1493 		if (count == UBLK_QUEUE_REQUEST) {
1494 			break;
1495 		}
1496 	}
1497 	io_uring_cq_advance(&q->ring, count);
1498 
1499 	return count;
1500 }
1501 
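/*
 * Per poll group poller: transmit pending io commands and reap completions
 * for every queue assigned to this group.
 */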
1502 static int
1503 ublk_poll(void *arg)
1504 {
1505 	struct ublk_poll_group *poll_group = arg;
1506 	struct ublk_queue *q, *q_tmp;
1507 	int sent, received, count = 0;
1508 
1509 	TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) {
1510 		sent = ublk_io_xmit(q);
1511 		received = ublk_io_recv(q);
1512 		if (spdk_unlikely(q->is_stopping)) {
1513 			ublk_try_close_queue(q);
1514 		}
1515 		count += sent + received;
1516 	}
1517 	if (count > 0) {
1518 		return SPDK_POLLER_BUSY;
1519 	} else {
1520 		return SPDK_POLLER_IDLE;
1521 	}
1522 }
1523 
1524 static void
1525 ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk)
1526 {
1527 	ublk_close_dev(ublk);
1528 }
1529 
1530 static void
1531 ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1532 		   void *event_ctx)
1533 {
1534 	switch (type) {
1535 	case SPDK_BDEV_EVENT_REMOVE:
1536 		ublk_bdev_hot_remove(event_ctx);
1537 		break;
1538 	default:
1539 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1540 		break;
1541 	}
1542 }
1543 
1544 static void
1545 ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth)
1546 {
1547 	struct io_uring_sqe *sqe;
1548 	uint32_t i;
1549 
1550 	for (i = 0; i < q_depth; i++) {
1551 		sqe = ublk_uring_get_sqe(r, i);
1552 
1553 		/* These fields should be written once, never change */
1554 		sqe->flags = IOSQE_FIXED_FILE;
1555 		sqe->rw_flags = 0;
1556 		sqe->ioprio = 0;
1557 		sqe->off = 0;
1558 	}
1559 }
1560 
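/*
 * Map the kernel's io descriptor array for this queue, set up the per-queue
 * SQE128 io_uring, and register the ublk char device fd as the only fixed
 * file (index 0).
 */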
1561 static int
1562 ublk_dev_queue_init(struct ublk_queue *q)
1563 {
1564 	int rc = 0, cmd_buf_size;
1565 	uint32_t j;
1566 	struct spdk_ublk_dev *ublk = q->dev;
1567 	unsigned long off;
1568 
1569 	cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth);
1570 	off = UBLKSRV_CMD_BUF_OFFSET +
1571 	      q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
1572 	q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ,
1573 			MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off);
1574 	if (q->io_cmd_buf == MAP_FAILED) {
1575 		q->io_cmd_buf = NULL;
1576 		rc = -errno;
1577 		SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc));
1578 		return rc;
1579 	}
1580 
1581 	for (j = 0; j < q->q_depth; j++) {
1582 		q->ios[j].cmd_op = UBLK_IO_FETCH_REQ;
1583 		q->ios[j].iod = &q->io_cmd_buf[j];
1584 	}
1585 
1586 	rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128);
1587 	if (rc < 0) {
1588 		SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc));
1589 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1590 		q->io_cmd_buf = NULL;
1591 		return rc;
1592 	}
1593 
1594 	rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1);
1595 	if (rc != 0) {
1596 		SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc));
1597 		io_uring_queue_exit(&q->ring);
1598 		q->ring.ring_fd = -1;
1599 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1600 		q->io_cmd_buf = NULL;
1601 		return rc;
1602 	}
1603 
1604 	ublk_dev_init_io_cmds(&q->ring, q->q_depth);
1605 
1606 	return 0;
1607 }
1608 
1609 static void
1610 ublk_dev_queue_fini(struct ublk_queue *q)
1611 {
1612 	if (q->ring.ring_fd >= 0) {
1613 		io_uring_unregister_files(&q->ring);
1614 		io_uring_queue_exit(&q->ring);
1615 		q->ring.ring_fd = -1;
1616 	}
1617 	if (q->io_cmd_buf) {
1618 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1619 	}
1620 }
1621 
1622 static void
1623 ublk_dev_queue_io_init(struct ublk_queue *q)
1624 {
1625 	struct ublk_io *io;
1626 	uint32_t i;
1627 	int rc __attribute__((unused));
1628 	void *buf;
1629 
1630 	/* Some older kernels require a buffer to get posted, even
1631 	 * when NEED_GET_DATA has been specified.  So allocate a
1632 	 * temporary buffer, only for purposes of this workaround.
1633 	 * It never actually gets used, so we will free it immediately
1634 	 * after all of the commands are posted.
1635 	 */
1636 	buf = malloc(64);
1637 
1638 	assert(q->bdev_ch != NULL);
1639 
1640 	/* Initialize and submit all io commands to ublk driver */
1641 	for (i = 0; i < q->q_depth; i++) {
1642 		io = &q->ios[i];
1643 		io->tag = (uint16_t)i;
1644 		io->payload = buf;
1645 		io->bdev_ch = q->bdev_ch;
1646 		io->bdev_desc = q->dev->bdev_desc;
1647 		ublksrv_queue_io_cmd(q, io, i);
1648 	}
1649 
1650 	q->cmd_inflight += q->q_depth;
1651 	rc = io_uring_submit(&q->ring);
1652 	assert(rc == (int)q->q_depth);
1653 	for (i = 0; i < q->q_depth; i++) {
1654 		io = &q->ios[i];
1655 		io->payload = NULL;
1656 	}
1657 	free(buf);
1658 }
1659 
1660 static int
1661 ublk_set_params(struct spdk_ublk_dev *ublk)
1662 {
1663 	int rc;
1664 
1665 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_SET_PARAMS);
1666 	if (rc < 0) {
1667 		SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1668 	}
1669 
1670 	return rc;
1671 }
1672 
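/*
 * Fill ublksrv_ctrl_dev_info for ADD_DEV: prefer UBLK_F_USER_COPY when the
 * kernel supports it, otherwise fall back to UBLK_F_NEED_GET_DATA for writes,
 * and enable user recovery when available.
 */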
1673 static void
1674 ublk_dev_info_init(struct spdk_ublk_dev *ublk)
1675 {
1676 	struct ublksrv_ctrl_dev_info uinfo = {
1677 		.queue_depth = ublk->queue_depth,
1678 		.nr_hw_queues = ublk->num_queues,
1679 		.dev_id = ublk->ublk_id,
1680 		.max_io_buf_bytes = UBLK_IO_MAX_BYTES,
1681 		.ublksrv_pid = getpid(),
1682 		.flags = UBLK_F_URING_CMD_COMP_IN_TASK,
1683 	};
1684 
1685 	if (g_ublk_tgt.user_copy) {
1686 		uinfo.flags |= UBLK_F_USER_COPY;
1687 	} else {
1688 		uinfo.flags |= UBLK_F_NEED_GET_DATA;
1689 	}
1690 
1691 	if (g_ublk_tgt.user_recovery) {
1692 		uinfo.flags |= UBLK_F_USER_RECOVERY;
1693 		uinfo.flags |= UBLK_F_USER_RECOVERY_REISSUE;
1694 	}
1695 
1696 	ublk->dev_info = uinfo;
1697 }
1698 
1699 /* Set ublk device parameters based on bdev */
1700 static void
1701 ublk_info_param_init(struct spdk_ublk_dev *ublk)
1702 {
1703 	struct spdk_bdev *bdev = ublk->bdev;
1704 	uint32_t blk_size = spdk_bdev_get_data_block_size(bdev);
1705 	uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev);
1706 	uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev);
1707 	uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev);
1708 	uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT;
1709 	uint32_t io_min_size = blk_size;
1710 	uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size);
1711 
1712 	struct ublk_params uparams = {
1713 		.types = UBLK_PARAM_TYPE_BASIC,
1714 		.len = sizeof(struct ublk_params),
1715 		.basic = {
1716 			.logical_bs_shift = spdk_u32log2(blk_size),
1717 			.physical_bs_shift = spdk_u32log2(pblk_size),
1718 			.io_min_shift = spdk_u32log2(io_min_size),
1719 			.io_opt_shift = spdk_u32log2(io_opt_size),
1720 			.dev_sectors = num_blocks * sectors_per_block,
1721 			.max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT,
1722 		}
1723 	};
1724 
1725 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
1726 		uparams.basic.attrs = UBLK_ATTR_VOLATILE_CACHE;
1727 	}
1728 
1729 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1730 		uparams.types |= UBLK_PARAM_TYPE_DISCARD;
1731 		uparams.discard.discard_alignment = sectors_per_block;
1732 		uparams.discard.max_discard_sectors = num_blocks * sectors_per_block;
1733 		uparams.discard.max_discard_segments = 1;
1734 		uparams.discard.discard_granularity = blk_size;
1735 		if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1736 			uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block;
1737 		}
1738 	}
1739 
1740 	ublk->dev_params = uparams;
1741 }
1742 
1743 static void
1744 _ublk_free_dev(void *arg)
1745 {
1746 	struct spdk_ublk_dev *ublk = arg;
1747 
1748 	ublk_free_dev(ublk);
1749 }
1750 
1751 static void
1752 free_buffers(void *arg)
1753 {
1754 	struct ublk_queue *q = arg;
1755 	uint32_t i;
1756 
1757 	for (i = 0; i < q->q_depth; i++) {
1758 		ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch);
1759 	}
1760 	free(q->ios);
1761 	q->ios = NULL;
1762 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev);
1763 }
1764 
1765 static void
1766 ublk_free_dev(struct spdk_ublk_dev *ublk)
1767 {
1768 	struct ublk_queue *q;
1769 	uint32_t q_idx;
1770 
1771 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
1772 		q = &ublk->queues[q_idx];
1773 
1774 		/* The ublk_io array of this queue is not initialized. */
1775 		if (q->ios == NULL) {
1776 			continue;
1777 		}
1778 
1779 		/* We found a queue that has an ios array that may have buffers
1780 		 * that need to be freed.  Send a message to the queue's thread
1781 		 * so it can free the buffers back to that thread's iobuf channel.
1782 		 * When it's done, it will set q->ios to NULL and send a message
1783 		 * back to this function to continue.
1784 		 */
1785 		if (q->poll_group) {
1786 			spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q);
1787 			return;
1788 		} else {
1789 			free(q->ios);
1790 			q->ios = NULL;
1791 		}
1792 	}
1793 
1794 	/* All of the buffers associated with the queues have been freed, so now
1795 	 * continue with releasing resources for the rest of the ublk device.
1796 	 */
1797 	if (ublk->bdev_desc) {
1798 		spdk_bdev_close(ublk->bdev_desc);
1799 		ublk->bdev_desc = NULL;
1800 	}
1801 
1802 	ublk_dev_list_unregister(ublk);
1803 	SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id);
1804 
1805 	free(ublk);
1806 }
1807 
1808 static int
1809 ublk_ios_init(struct spdk_ublk_dev *ublk)
1810 {
1811 	int rc;
1812 	uint32_t i, j;
1813 	struct ublk_queue *q;
1814 
1815 	for (i = 0; i < ublk->num_queues; i++) {
1816 		q = &ublk->queues[i];
1817 
1818 		TAILQ_INIT(&q->completed_io_list);
1819 		TAILQ_INIT(&q->inflight_io_list);
1820 		q->dev = ublk;
1821 		q->q_id = i;
1822 		q->q_depth = ublk->queue_depth;
1823 		q->ios = calloc(q->q_depth, sizeof(struct ublk_io));
1824 		if (!q->ios) {
1825 			rc = -ENOMEM;
1826 			SPDK_ERRLOG("could not allocate queue ios\n");
1827 			goto err;
1828 		}
1829 		for (j = 0; j < q->q_depth; j++) {
1830 			q->ios[j].q = q;
1831 		}
1832 	}
1833 
1834 	return 0;
1835 
1836 err:
1837 	for (i = 0; i < ublk->num_queues; i++) {
1838 		free(ublk->queues[i].ios);
1839 		ublk->queues[i].ios = NULL;
1840 	}
1841 	return rc;
1842 }
1843 
1844 static void
1845 ublk_queue_recovery_done(void *arg)
1846 {
1847 	struct spdk_ublk_dev *ublk = arg;
1848 
1849 	ublk->online_num_queues++;
1850 	if (ublk->is_recovering && (ublk->online_num_queues == ublk->num_queues)) {
1851 		ublk_ctrl_cmd_submit(ublk, UBLK_CMD_END_USER_RECOVERY);
1852 	}
1853 }
1854 
1855 static void
1856 ublk_queue_run(void *arg1)
1857 {
1858 	struct ublk_queue	*q = arg1;
1859 	struct spdk_ublk_dev *ublk = q->dev;
1860 	struct ublk_poll_group *poll_group = q->poll_group;
1861 
1862 	assert(spdk_get_thread() == poll_group->ublk_thread);
1863 	q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc);
1864 	/* Queues must be filled with io commands from the poll group's spdk_thread */
1865 	ublk_dev_queue_io_init(q);
1866 
1867 	TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq);
1868 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_queue_recovery_done, ublk);
1869 }
1870 
1871 int
1872 ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
1873 		uint32_t num_queues, uint32_t queue_depth,
1874 		ublk_ctrl_cb ctrl_cb, void *cb_arg)
1875 {
1876 	int			rc;
1877 	uint32_t		i;
1878 	struct spdk_bdev	*bdev;
1879 	struct spdk_ublk_dev	*ublk = NULL;
1880 	uint32_t		sector_per_block;
1881 
1882 	assert(spdk_thread_is_app_thread(NULL));
1883 
1884 	if (g_ublk_tgt.active == false) {
1885 		SPDK_ERRLOG("No ublk target exists\n");
1886 		return -ENODEV;
1887 	}
1888 
1889 	ublk = ublk_dev_find_by_id(ublk_id);
1890 	if (ublk != NULL) {
1891 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
1892 		return -EBUSY;
1893 	}
1894 
1895 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
1896 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
1897 		return -ENOTSUP;
1898 	}
1899 
1900 	ublk = calloc(1, sizeof(*ublk));
1901 	if (ublk == NULL) {
1902 		return -ENOMEM;
1903 	}
1904 	ublk->ctrl_cb = ctrl_cb;
1905 	ublk->cb_arg = cb_arg;
1906 	ublk->cdev_fd = -1;
1907 	ublk->ublk_id = ublk_id;
1908 	UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n",
1909 		      bdev_name, num_queues, queue_depth);
1910 
1911 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
1912 	if (rc != 0) {
1913 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
1914 		free(ublk);
1915 		return rc;
1916 	}
1917 
1918 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
1919 	ublk->bdev = bdev;
1920 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
1921 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
1922 
1923 	ublk->queues_closed = 0;
1924 	ublk->num_queues = num_queues;
1925 	ublk->queue_depth = queue_depth;
1926 	if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) {
1927 		SPDK_WARNLOG("Set Queue depth %d of UBLK %d to maximum %d\n",
1928 			     ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH);
1929 		ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH;
1930 	}
1931 	if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) {
1932 		SPDK_WARNLOG("Set Queue num %d of UBLK %d to maximum %d\n",
1933 			     ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES);
1934 		ublk->num_queues = UBLK_DEV_MAX_QUEUES;
1935 	}
1936 	for (i = 0; i < ublk->num_queues; i++) {
1937 		ublk->queues[i].ring.ring_fd = -1;
1938 	}
1939 
1940 	ublk_dev_info_init(ublk);
1941 	ublk_info_param_init(ublk);
1942 	rc = ublk_ios_init(ublk);
1943 	if (rc != 0) {
1944 		spdk_bdev_close(ublk->bdev_desc);
1945 		free(ublk);
1946 		return rc;
1947 	}
1948 
1949 	SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n",
1950 		     bdev_name, ublk_id);
1951 
1952 	/* Add ublk_dev to the end of disk list */
1953 	ublk_dev_list_register(ublk);
1954 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_ADD_DEV);
1955 	if (rc < 0) {
1956 		SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1957 		ublk_free_dev(ublk);
1958 	}
1959 
1960 	return rc;
1961 }
1962 
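/*
 * Open /dev/ublkc<id>, initialize every queue's ring and descriptor mapping,
 * issue START_DEV (skipped during recovery, where END_USER_RECOVERY is sent
 * once all queues are back online), and distribute the queues across the
 * poll groups round-robin.
 */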
1963 static int
1964 ublk_start_dev(struct spdk_ublk_dev *ublk, bool is_recovering)
1965 {
1966 	int			rc;
1967 	uint32_t		q_id;
1968 	struct spdk_thread	*ublk_thread;
1969 	char			buf[64];
1970 
1971 	snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id);
1972 	ublk->cdev_fd = open(buf, O_RDWR);
1973 	if (ublk->cdev_fd < 0) {
1974 		rc = -errno;
1975 		SPDK_ERRLOG("can't open %s, rc %d\n", buf, rc);
1976 		return rc;
1977 	}
1978 
1979 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1980 		rc = ublk_dev_queue_init(&ublk->queues[q_id]);
1981 		if (rc) {
1982 			return rc;
1983 		}
1984 	}
1985 
1986 	if (!is_recovering) {
1987 		rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_START_DEV);
1988 		if (rc < 0) {
1989 			SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id,
1990 				    spdk_strerror(-rc));
1991 			return rc;
1992 		}
1993 	}
1994 
1995 	/* Send queues to different spdk_threads for load balancing */
1996 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1997 		ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_groups[g_next_ublk_poll_group];
1998 		ublk_thread = g_ublk_tgt.poll_groups[g_next_ublk_poll_group].ublk_thread;
1999 		spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]);
2000 		g_next_ublk_poll_group++;
2001 		if (g_next_ublk_poll_group == g_num_ublk_poll_groups) {
2002 			g_next_ublk_poll_group = 0;
2003 		}
2004 	}
2005 
2006 	return 0;
2007 }
2008 
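/*
 * Continue recovery after GET_DEV_INFO completed: take the queue count and
 * depth from the kernel's dev_info, rebuild the io arrays and parameters,
 * then ask the kernel to start user recovery.
 */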
2009 static int
2010 ublk_ctrl_start_recovery(struct spdk_ublk_dev *ublk)
2011 {
2012 	int                     rc;
2013 	uint32_t                i;
2014 
2015 	if (ublk->ublk_id != ublk->dev_info.dev_id) {
2016 		SPDK_ERRLOG("Invalid ublk ID\n");
2017 		return -EINVAL;
2018 	}
2019 
2020 	ublk->num_queues = ublk->dev_info.nr_hw_queues;
2021 	ublk->queue_depth = ublk->dev_info.queue_depth;
2022 	ublk->dev_info.ublksrv_pid = getpid();
2023 
2024 	SPDK_DEBUGLOG(ublk, "Recovering ublk %d, num queues %u, queue depth %u, flags 0x%llx\n",
2025 		      ublk->ublk_id,
2026 		      ublk->num_queues, ublk->queue_depth, ublk->dev_info.flags);
2027 
2028 	for (i = 0; i < ublk->num_queues; i++) {
2029 		ublk->queues[i].ring.ring_fd = -1;
2030 	}
2031 
2032 	ublk_info_param_init(ublk);
2033 	rc = ublk_ios_init(ublk);
2034 	if (rc != 0) {
2035 		return rc;
2036 	}
2037 
2038 	ublk->is_recovering = true;
2039 	return ublk_ctrl_cmd_submit(ublk, UBLK_CMD_START_USER_RECOVERY);
2040 }
2041 
2042 int
2043 ublk_start_disk_recovery(const char *bdev_name, uint32_t ublk_id, ublk_ctrl_cb ctrl_cb,
2044 			 void *cb_arg)
2045 {
2046 	int			rc;
2047 	struct spdk_bdev	*bdev;
2048 	struct spdk_ublk_dev	*ublk = NULL;
2049 	uint32_t		sector_per_block;
2050 
2051 	assert(spdk_thread_is_app_thread(NULL));
2052 
2053 	if (g_ublk_tgt.active == false) {
2054 		SPDK_ERRLOG("No ublk target exists\n");
2055 		return -ENODEV;
2056 	}
2057 
2058 	if (!g_ublk_tgt.user_recovery) {
2059 		SPDK_ERRLOG("User recovery is not supported; it requires kernel version >= 6.4\n");
2060 		return -ENOTSUP;
2061 	}
2062 
2063 	ublk = ublk_dev_find_by_id(ublk_id);
2064 	if (ublk != NULL) {
2065 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
2066 		return -EBUSY;
2067 	}
2068 
2069 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
2070 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
2071 		return -ENOTSUP;
2072 	}
2073 
2074 	ublk = calloc(1, sizeof(*ublk));
2075 	if (ublk == NULL) {
2076 		return -ENOMEM;
2077 	}
2078 	ublk->ctrl_cb = ctrl_cb;
2079 	ublk->cb_arg = cb_arg;
2080 	ublk->cdev_fd = -1;
2081 	ublk->ublk_id = ublk_id;
2082 
2083 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
2084 	if (rc != 0) {
2085 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
2086 		free(ublk);
2087 		return rc;
2088 	}
2089 
2090 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
2091 	ublk->bdev = bdev;
2092 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
2093 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
2094 
2095 	SPDK_NOTICELOG("Recovering ublk %d with bdev %s\n", ublk->ublk_id, bdev_name);
2096 
2097 	ublk_dev_list_register(ublk);
2098 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_GET_DEV_INFO);
2099 	if (rc < 0) {
2100 		ublk_free_dev(ublk);
2101 	}
2102 
2103 	return rc;
2104 }
2105 
2106 SPDK_LOG_REGISTER_COMPONENT(ublk)
2107 SPDK_LOG_REGISTER_COMPONENT(ublk_io)
2108