xref: /spdk/lib/ublk/ublk.c (revision 88655eb2b7be22c5da5b20743e63d111448af52c)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <liburing.h>
7 
8 #include "spdk/stdinc.h"
9 #include "spdk/string.h"
10 #include "spdk/bdev.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/likely.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/queue.h"
17 #include "spdk/json.h"
18 #include "spdk/ublk.h"
19 #include "spdk/thread.h"
20 #include "spdk/file.h"
21 
22 #include "ublk_internal.h"
23 
24 #define UBLK_CTRL_DEV					"/dev/ublk-control"
25 #define UBLK_BLK_CDEV					"/dev/ublkc"
26 
27 #define LINUX_SECTOR_SHIFT				9
28 #define UBLK_IO_MAX_BYTES				SPDK_BDEV_LARGE_BUF_MAX_SIZE
29 #define UBLK_DEV_MAX_QUEUES				32
30 #define UBLK_DEV_MAX_QUEUE_DEPTH			1024
31 #define UBLK_QUEUE_REQUEST				32
32 #define UBLK_STOP_BUSY_WAITING_MS			10000
33 #define UBLK_BUSY_POLLING_INTERVAL_US			20000
34 #define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US	1000
35 /* By default, kernel ublk_drv driver can support up to 64 block devices */
36 #define UBLK_DEFAULT_MAX_SUPPORTED_DEVS			64
37 
38 #define UBLK_IOBUF_SMALL_CACHE_SIZE			128
39 #define UBLK_IOBUF_LARGE_CACHE_SIZE			32
40 
41 #define UBLK_DEBUGLOG(ublk, format, ...) \
42 	SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__);
43 
44 static uint32_t g_num_ublk_poll_groups = 0;
45 static uint32_t g_next_ublk_poll_group = 0;
46 static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS;
47 static struct spdk_cpuset g_core_mask;
48 static bool g_disable_user_copy = false;
49 
50 struct ublk_queue;
51 struct ublk_poll_group;
52 struct ublk_io;
53 static void _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io);
54 static void ublk_dev_queue_fini(struct ublk_queue *q);
55 static int ublk_poll(void *arg);
56 
57 static int ublk_set_params(struct spdk_ublk_dev *ublk);
58 static int ublk_start_dev(struct spdk_ublk_dev *ublk, bool is_recovering);
59 static void ublk_free_dev(struct spdk_ublk_dev *ublk);
60 static void ublk_delete_dev(void *arg);
61 static int ublk_close_dev(struct spdk_ublk_dev *ublk);
62 static int ublk_ctrl_start_recovery(struct spdk_ublk_dev *ublk);
63 
64 static int ublk_ctrl_cmd_submit(struct spdk_ublk_dev *ublk, uint32_t cmd_op);
65 
66 static const char *ublk_op_name[64] = {
67 	[UBLK_CMD_GET_DEV_INFO] = "UBLK_CMD_GET_DEV_INFO",
68 	[UBLK_CMD_ADD_DEV] =	"UBLK_CMD_ADD_DEV",
69 	[UBLK_CMD_DEL_DEV] =	"UBLK_CMD_DEL_DEV",
70 	[UBLK_CMD_START_DEV] =	"UBLK_CMD_START_DEV",
71 	[UBLK_CMD_STOP_DEV] =	"UBLK_CMD_STOP_DEV",
72 	[UBLK_CMD_SET_PARAMS] =	"UBLK_CMD_SET_PARAMS",
73 	[UBLK_CMD_START_USER_RECOVERY] = "UBLK_CMD_START_USER_RECOVERY",
74 	[UBLK_CMD_END_USER_RECOVERY] = "UBLK_CMD_END_USER_RECOVERY",
75 };
76 
77 typedef void (*ublk_get_buf_cb)(struct ublk_io *io);
78 
79 struct ublk_io {
80 	void			*payload;
81 	void			*mpool_entry;
82 	bool			need_data;
83 	bool			user_copy;
84 	uint16_t		tag;
85 	uint64_t		payload_size;
86 	uint32_t		cmd_op;
87 	int32_t			result;
88 	struct spdk_bdev_desc	*bdev_desc;
89 	struct spdk_io_channel	*bdev_ch;
90 	const struct ublksrv_io_desc	*iod;
91 	ublk_get_buf_cb		get_buf_cb;
92 	struct ublk_queue	*q;
93 	/* for bdev io_wait */
94 	struct spdk_bdev_io_wait_entry bdev_io_wait;
95 	struct spdk_iobuf_entry	iobuf;
96 
97 	TAILQ_ENTRY(ublk_io)	tailq;
98 };
99 
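/*
 * Request lifecycle note (added commentary): an ublk_io sits on inflight_io_list
 * while its bdev I/O (or user-copy transfer) is outstanding, then moves to
 * completed_io_list until ublk_io_xmit() submits the COMMIT_AND_FETCH (or
 * user-copy) SQE for it.
 */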
100 struct ublk_queue {
101 	uint32_t		q_id;
102 	uint32_t		q_depth;
103 	struct ublk_io		*ios;
104 	TAILQ_HEAD(, ublk_io)	completed_io_list;
105 	TAILQ_HEAD(, ublk_io)	inflight_io_list;
106 	uint32_t		cmd_inflight;
107 	bool			is_stopping;
108 	struct ublksrv_io_desc	*io_cmd_buf;
109 	/* ring depth == dev_info->queue_depth. */
110 	struct io_uring		ring;
111 	struct spdk_ublk_dev	*dev;
112 	struct ublk_poll_group	*poll_group;
113 	struct spdk_io_channel	*bdev_ch;
114 
115 	TAILQ_ENTRY(ublk_queue)	tailq;
116 };
117 
118 struct spdk_ublk_dev {
119 	struct spdk_bdev	*bdev;
120 	struct spdk_bdev_desc	*bdev_desc;
121 
122 	int			cdev_fd;
123 	struct ublk_params	dev_params;
124 	struct ublksrv_ctrl_dev_info	dev_info;
125 
126 	uint32_t		ublk_id;
127 	uint32_t		num_queues;
128 	uint32_t		queue_depth;
129 	uint32_t		online_num_queues;
130 	uint32_t		sector_per_block_shift;
131 	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];
132 
133 	struct spdk_poller	*retry_poller;
134 	int			retry_count;
135 	uint32_t		queues_closed;
136 	ublk_ctrl_cb		ctrl_cb;
137 	void			*cb_arg;
138 	uint32_t		current_cmd_op;
139 	uint32_t		ctrl_ops_in_progress;
140 	bool			is_closing;
141 	bool			is_recovering;
142 
143 	TAILQ_ENTRY(spdk_ublk_dev) tailq;
144 	TAILQ_ENTRY(spdk_ublk_dev) wait_tailq;
145 };
146 
147 struct ublk_poll_group {
148 	struct spdk_thread		*ublk_thread;
149 	struct spdk_poller		*ublk_poller;
150 	struct spdk_iobuf_channel	iobuf_ch;
151 	TAILQ_HEAD(, ublk_queue)	queue_list;
152 };
153 
154 struct ublk_tgt {
155 	int			ctrl_fd;
156 	bool			active;
157 	bool			is_destroying;
158 	spdk_ublk_fini_cb	cb_fn;
159 	void			*cb_arg;
160 	struct io_uring		ctrl_ring;
161 	struct spdk_poller	*ctrl_poller;
162 	uint32_t		ctrl_ops_in_progress;
163 	struct ublk_poll_group	*poll_groups;
164 	uint32_t		num_ublk_devs;
165 	uint64_t		features;
166 	/* `ublk_drv` supports UBLK_F_CMD_IOCTL_ENCODE */
167 	bool			ioctl_encode;
168 	/* `ublk_drv` supports UBLK_F_USER_COPY */
169 	bool			user_copy;
170 	/* `ublk_drv` supports UBLK_F_USER_RECOVERY */
171 	bool			user_recovery;
172 };
173 
174 static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs);
175 static struct ublk_tgt g_ublk_tgt;
176 
177 /* helpers for using io_uring */
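/*
 * Both the control ring and the per-queue I/O rings are created with
 * IORING_SETUP_SQE128: ublk passes its command structures in the SQE area
 * starting at sqe->addr3 (see ublk_get_sqe_cmd()), so each SQE needs 128 bytes
 * and ublk_uring_get_sqe() doubles the index accordingly.
 */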
178 static inline int
179 ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags)
180 {
181 	struct io_uring_params p = {};
182 
183 	p.flags = flags | IORING_SETUP_CQSIZE;
184 	p.cq_entries = depth;
185 
186 	return io_uring_queue_init_params(depth, r, &p);
187 }
188 
189 static inline struct io_uring_sqe *
190 ublk_uring_get_sqe(struct io_uring *r, uint32_t idx)
191 {
192 	/* Double the idx since IORING_SETUP_SQE128 is set in ublk_setup_ring, so each SQE occupies two slots */
193 	return &r->sq.sqes[idx << 1];
194 }
195 
196 static inline void *
197 ublk_get_sqe_cmd(struct io_uring_sqe *sqe)
198 {
199 	return (void *)&sqe->addr3;
200 }
201 
202 static inline void
203 ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op)
204 {
205 	uint32_t opc = cmd_op;
206 
207 	if (g_ublk_tgt.ioctl_encode) {
208 		switch (cmd_op) {
209 		/* ctrl uring */
210 		case UBLK_CMD_GET_DEV_INFO:
211 			opc = _IOR('u', UBLK_CMD_GET_DEV_INFO, struct ublksrv_ctrl_cmd);
212 			break;
213 		case UBLK_CMD_ADD_DEV:
214 			opc = _IOWR('u', UBLK_CMD_ADD_DEV, struct ublksrv_ctrl_cmd);
215 			break;
216 		case UBLK_CMD_DEL_DEV:
217 			opc = _IOWR('u', UBLK_CMD_DEL_DEV, struct ublksrv_ctrl_cmd);
218 			break;
219 		case UBLK_CMD_START_DEV:
220 			opc = _IOWR('u', UBLK_CMD_START_DEV, struct ublksrv_ctrl_cmd);
221 			break;
222 		case UBLK_CMD_STOP_DEV:
223 			opc = _IOWR('u', UBLK_CMD_STOP_DEV, struct ublksrv_ctrl_cmd);
224 			break;
225 		case UBLK_CMD_SET_PARAMS:
226 			opc = _IOWR('u', UBLK_CMD_SET_PARAMS, struct ublksrv_ctrl_cmd);
227 			break;
228 		case UBLK_CMD_START_USER_RECOVERY:
229 			opc = _IOWR('u', UBLK_CMD_START_USER_RECOVERY, struct ublksrv_ctrl_cmd);
230 			break;
231 		case UBLK_CMD_END_USER_RECOVERY:
232 			opc = _IOWR('u', UBLK_CMD_END_USER_RECOVERY, struct ublksrv_ctrl_cmd);
233 			break;
234 
235 		/* io uring */
236 		case UBLK_IO_FETCH_REQ:
237 			opc = _IOWR('u', UBLK_IO_FETCH_REQ, struct ublksrv_io_cmd);
238 			break;
239 		case UBLK_IO_COMMIT_AND_FETCH_REQ:
240 			opc = _IOWR('u', UBLK_IO_COMMIT_AND_FETCH_REQ, struct ublksrv_io_cmd);
241 			break;
242 		case UBLK_IO_NEED_GET_DATA:
243 			opc = _IOWR('u', UBLK_IO_NEED_GET_DATA, struct ublksrv_io_cmd);
244 			break;
245 		default:
246 			break;
247 		}
248 	}
249 
250 	sqe->off = opc;
251 }
252 
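/*
 * The io_uring user_data of an I/O ring command packs the request tag into
 * bits [15:0] and the ublk command opcode into bits [23:16]; the helpers
 * below encode and decode that layout.
 */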
253 static inline uint64_t
254 build_user_data(uint16_t tag, uint8_t op)
255 {
256 	assert(!(tag >> 16) && !(op >> 8));
257 
258 	return tag | (op << 16);
259 }
260 
261 static inline uint16_t
262 user_data_to_tag(uint64_t user_data)
263 {
264 	return user_data & 0xffff;
265 }
266 
267 static inline uint8_t
268 user_data_to_op(uint64_t user_data)
269 {
270 	return (user_data >> 16) & 0xff;
271 }
272 
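/*
 * With UBLK_F_USER_COPY, I/O data is transferred by reading/writing the ublkc
 * character device at a per-request offset: UBLKSRV_IO_BUF_OFFSET plus the
 * queue id and tag encoded at UBLK_QID_OFF and UBLK_TAG_OFF.
 */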
273 static inline uint64_t
274 ublk_user_copy_pos(uint16_t q_id, uint16_t tag)
275 {
276 	return (uint64_t)UBLKSRV_IO_BUF_OFFSET +
277 	       ((((uint64_t)q_id) << UBLK_QID_OFF) | (((uint64_t)tag) << UBLK_TAG_OFF));
278 }
279 
280 void
281 spdk_ublk_init(void)
282 {
283 	assert(spdk_thread_is_app_thread(NULL));
284 
285 	g_ublk_tgt.ctrl_fd = -1;
286 	g_ublk_tgt.ctrl_ring.ring_fd = -1;
287 }
288 
289 static void
290 ublk_ctrl_cmd_error(struct spdk_ublk_dev *ublk, int32_t res)
291 {
292 	assert(res != 0);
293 
294 	SPDK_ERRLOG("ctrl cmd %s failed, %s\n", ublk_op_name[ublk->current_cmd_op], spdk_strerror(-res));
295 	if (ublk->ctrl_cb) {
296 		ublk->ctrl_cb(ublk->cb_arg, res);
297 		ublk->ctrl_cb = NULL;
298 	}
299 
300 	switch (ublk->current_cmd_op) {
301 	case UBLK_CMD_ADD_DEV:
302 	case UBLK_CMD_SET_PARAMS:
303 	case UBLK_CMD_START_USER_RECOVERY:
304 	case UBLK_CMD_END_USER_RECOVERY:
305 		ublk_delete_dev(ublk);
306 		break;
307 	case UBLK_CMD_START_DEV:
308 		ublk_close_dev(ublk);
309 		break;
310 	case UBLK_CMD_GET_DEV_INFO:
311 		ublk_free_dev(ublk);
312 		break;
313 	case UBLK_CMD_STOP_DEV:
314 	case UBLK_CMD_DEL_DEV:
315 		break;
316 	default:
317 		SPDK_ERRLOG("Unknown cmd operation, cmd_op = %d\n", ublk->current_cmd_op);
318 		break;
319 	}
320 }
321 
322 static int
323 _ublk_get_device_state_retry(void *arg)
324 {
325 	struct spdk_ublk_dev *ublk = arg;
326 	int rc;
327 
328 	spdk_poller_unregister(&ublk->retry_poller);
329 
330 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_GET_DEV_INFO);
331 	if (rc < 0) {
332 		ublk_delete_dev(ublk);
333 		if (ublk->ctrl_cb) {
334 			ublk->ctrl_cb(ublk->cb_arg, rc);
335 			ublk->ctrl_cb = NULL;
336 		}
337 	}
338 
339 	return SPDK_POLLER_BUSY;
340 }
341 
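/*
 * Control commands are chained from their completions: ADD_DEV -> SET_PARAMS ->
 * START_DEV for normal startup, and GET_DEV_INFO -> START_USER_RECOVERY ->
 * (queues re-initialized) -> END_USER_RECOVERY for user recovery.
 */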
342 static void
343 ublk_ctrl_process_cqe(struct io_uring_cqe *cqe)
344 {
345 	struct spdk_ublk_dev *ublk;
346 	int rc = 0;
347 
348 	ublk = (struct spdk_ublk_dev *)cqe->user_data;
349 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s completed\n", ublk_op_name[ublk->current_cmd_op]);
350 	ublk->ctrl_ops_in_progress--;
351 
352 	if (spdk_unlikely(cqe->res != 0)) {
353 		ublk_ctrl_cmd_error(ublk, cqe->res);
354 		return;
355 	}
356 
357 	switch (ublk->current_cmd_op) {
358 	case UBLK_CMD_ADD_DEV:
359 		rc = ublk_set_params(ublk);
360 		if (rc < 0) {
361 			ublk_delete_dev(ublk);
362 			goto cb_done;
363 		}
364 		break;
365 	case UBLK_CMD_SET_PARAMS:
366 		rc = ublk_start_dev(ublk, false);
367 		if (rc < 0) {
368 			ublk_delete_dev(ublk);
369 			goto cb_done;
370 		}
371 		break;
372 	case UBLK_CMD_START_DEV:
373 		goto cb_done;
374 		break;
375 	case UBLK_CMD_STOP_DEV:
376 		break;
377 	case UBLK_CMD_DEL_DEV:
378 		if (ublk->ctrl_cb) {
379 			ublk->ctrl_cb(ublk->cb_arg, 0);
380 			ublk->ctrl_cb = NULL;
381 		}
382 		ublk_free_dev(ublk);
383 		break;
384 	case UBLK_CMD_GET_DEV_INFO:
385 		if (ublk->ublk_id != ublk->dev_info.dev_id) {
386 			SPDK_ERRLOG("Invalid ublk ID\n");
387 			rc = -EINVAL;
388 			goto cb_done;
389 		}
390 
391 		UBLK_DEBUGLOG(ublk, "Ublk %u device state %u\n", ublk->ublk_id, ublk->dev_info.state);
392 		/* kernel ublk_drv driver returns -EBUSY if device state isn't UBLK_S_DEV_QUIESCED */
393 		if ((ublk->dev_info.state != UBLK_S_DEV_QUIESCED) && (ublk->retry_count < 3)) {
394 			ublk->retry_count++;
395 			ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_get_device_state_retry, ublk, 1000000);
396 			return;
397 		}
398 
399 		rc = ublk_ctrl_start_recovery(ublk);
400 		if (rc < 0) {
401 			ublk_delete_dev(ublk);
402 			goto cb_done;
403 		}
404 		break;
405 	case UBLK_CMD_START_USER_RECOVERY:
406 		rc = ublk_start_dev(ublk, true);
407 		if (rc < 0) {
408 			ublk_delete_dev(ublk);
409 			goto cb_done;
410 		}
411 		break;
412 	case UBLK_CMD_END_USER_RECOVERY:
413 		SPDK_NOTICELOG("Ublk %u recovered successfully\n", ublk->ublk_id);
414 		ublk->is_recovering = false;
415 		goto cb_done;
416 		break;
417 	default:
418 		SPDK_ERRLOG("Unknown cmd operation, cmd_op = %d\n", ublk->current_cmd_op);
419 		break;
420 	}
421 
422 	return;
423 
424 cb_done:
425 	if (ublk->ctrl_cb) {
426 		ublk->ctrl_cb(ublk->cb_arg, rc);
427 		ublk->ctrl_cb = NULL;
428 	}
429 }
430 
431 static int
432 ublk_ctrl_poller(void *arg)
433 {
434 	struct io_uring *ring = &g_ublk_tgt.ctrl_ring;
435 	struct io_uring_cqe *cqe;
436 	const int max = 8;
437 	int i, count = 0, rc;
438 
439 	if (!g_ublk_tgt.ctrl_ops_in_progress) {
440 		return SPDK_POLLER_IDLE;
441 	}
442 
443 	for (i = 0; i < max; i++) {
444 		rc = io_uring_peek_cqe(ring, &cqe);
445 		if (rc == -EAGAIN) {
446 			break;
447 		}
448 
449 		assert(cqe != NULL);
450 		g_ublk_tgt.ctrl_ops_in_progress--;
451 
452 		ublk_ctrl_process_cqe(cqe);
453 
454 		io_uring_cqe_seen(ring, cqe);
455 		count++;
456 	}
457 
458 	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
459 }
460 
461 static int
462 ublk_ctrl_cmd_submit(struct spdk_ublk_dev *ublk, uint32_t cmd_op)
463 {
464 	uint32_t dev_id = ublk->ublk_id;
465 	int rc = -EINVAL;
466 	struct io_uring_sqe *sqe;
467 	struct ublksrv_ctrl_cmd *cmd;
468 
469 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]);
470 
471 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
472 	if (!sqe) {
473 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
474 		assert(false);
475 		return -ENOENT;
476 	}
477 
478 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
479 	sqe->fd = g_ublk_tgt.ctrl_fd;
480 	sqe->opcode = IORING_OP_URING_CMD;
481 	sqe->ioprio = 0;
482 	cmd->dev_id = dev_id;
483 	cmd->queue_id = -1;
484 	ublk->current_cmd_op = cmd_op;
485 
486 	switch (cmd_op) {
487 	case UBLK_CMD_ADD_DEV:
488 	case UBLK_CMD_GET_DEV_INFO:
489 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_info;
490 		cmd->len = sizeof(ublk->dev_info);
491 		break;
492 	case UBLK_CMD_SET_PARAMS:
493 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_params;
494 		cmd->len = sizeof(ublk->dev_params);
495 		break;
496 	case UBLK_CMD_START_DEV:
497 		cmd->data[0] = getpid();
498 		break;
499 	case UBLK_CMD_STOP_DEV:
500 		break;
501 	case UBLK_CMD_DEL_DEV:
502 		break;
503 	case UBLK_CMD_START_USER_RECOVERY:
504 		break;
505 	case UBLK_CMD_END_USER_RECOVERY:
506 		cmd->data[0] = getpid();
507 		break;
508 	default:
509 		SPDK_ERRLOG("Unknown cmd operation, cmd_op = %d\n", cmd_op);
510 		return -EINVAL;
511 	}
512 	ublk_set_sqe_cmd_op(sqe, cmd_op);
513 	io_uring_sqe_set_data(sqe, ublk);
514 
515 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
516 	if (rc < 0) {
517 		SPDK_ERRLOG("uring submit rc %d\n", rc);
518 		assert(false);
519 		return rc;
520 	}
521 	g_ublk_tgt.ctrl_ops_in_progress++;
522 	ublk->ctrl_ops_in_progress++;
523 
524 	return 0;
525 }
526 
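/*
 * Unlike the other control commands, feature detection is synchronous
 * (io_uring_wait_cqe) because it runs during target creation, before the
 * ctrl poller has been registered.
 */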
527 static int
528 ublk_ctrl_cmd_get_features(void)
529 {
530 	int rc;
531 	struct io_uring_sqe *sqe;
532 	struct io_uring_cqe *cqe;
533 	struct ublksrv_ctrl_cmd *cmd;
534 	uint32_t cmd_op;
535 
536 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
537 	if (!sqe) {
538 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
539 		assert(false);
540 		return -ENOENT;
541 	}
542 
543 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
544 	sqe->fd = g_ublk_tgt.ctrl_fd;
545 	sqe->opcode = IORING_OP_URING_CMD;
546 	sqe->ioprio = 0;
547 	cmd->dev_id = -1;
548 	cmd->queue_id = -1;
549 	cmd->addr = (__u64)(uintptr_t)&g_ublk_tgt.features;
550 	cmd->len = sizeof(g_ublk_tgt.features);
551 
552 	cmd_op = UBLK_U_CMD_GET_FEATURES;
553 	ublk_set_sqe_cmd_op(sqe, cmd_op);
554 
555 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
556 	if (rc < 0) {
557 		SPDK_ERRLOG("uring submit rc %d\n", rc);
558 		return rc;
559 	}
560 
561 	rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe);
562 	if (rc < 0) {
563 		SPDK_ERRLOG("wait cqe rc %d\n", rc);
564 		return rc;
565 	}
566 
567 	if (cqe->res == 0) {
568 		g_ublk_tgt.ioctl_encode = !!(g_ublk_tgt.features & UBLK_F_CMD_IOCTL_ENCODE);
569 		g_ublk_tgt.user_copy = !!(g_ublk_tgt.features & UBLK_F_USER_COPY);
570 		g_ublk_tgt.user_copy &= !g_disable_user_copy;
571 		g_ublk_tgt.user_recovery = !!(g_ublk_tgt.features & UBLK_F_USER_RECOVERY);
572 		SPDK_NOTICELOG("User Copy %s\n", g_ublk_tgt.user_copy ? "enabled" : "disabled");
573 	}
574 	io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe);
575 
576 	return 0;
577 }
578 
579 static int
580 ublk_queue_cmd_buf_sz(uint32_t q_depth)
581 {
582 	uint32_t size = q_depth * sizeof(struct ublksrv_io_desc);
583 	uint32_t page_sz = getpagesize();
584 
585 	/* round up size */
586 	return (size + page_sz - 1) & ~(page_sz - 1);
587 }
588 
589 static int
590 ublk_open(void)
591 {
592 	uint32_t ublks_max;
593 	int rc;
594 
595 	g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR);
596 	if (g_ublk_tgt.ctrl_fd < 0) {
597 		rc = errno;
598 		SPDK_ERRLOG("UBLK control dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno));
599 		return -rc;
600 	}
601 
602 	rc = spdk_read_sysfs_attribute_uint32(&ublks_max, "%s",
603 					      "/sys/module/ublk_drv/parameters/ublks_max");
604 	if (rc == 0 && ublks_max > 0) {
605 		g_ublks_max = ublks_max;
606 	}
607 
608 	/* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl
609 	 * ring processing to a workqueue.  Ctrl ring processing is minimal, so SQPOLL is fine.
610 	 * All commands sent via the control uring for a ublk device are executed one by one, so
611 	 * ublks_max * 2 uring entries are enough.
612 	 */
613 	rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring,
614 			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
615 	if (rc < 0) {
616 		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
617 		goto err;
618 	}
619 
620 	rc = ublk_ctrl_cmd_get_features();
621 	if (rc) {
622 		goto err;
623 	}
624 
625 	return 0;
626 
627 err:
628 	close(g_ublk_tgt.ctrl_fd);
629 	g_ublk_tgt.ctrl_fd = -1;
630 	return rc;
631 }
632 
633 static int
634 ublk_parse_core_mask(const char *mask)
635 {
636 	struct spdk_cpuset tmp_mask;
637 	int rc;
638 
639 	if (mask == NULL) {
640 		spdk_env_get_cpuset(&g_core_mask);
641 		return 0;
642 	}
643 
644 	rc = spdk_cpuset_parse(&g_core_mask, mask);
645 	if (rc < 0) {
646 		SPDK_ERRLOG("invalid cpumask %s\n", mask);
647 		return -EINVAL;
648 	}
649 
650 	if (spdk_cpuset_count(&g_core_mask) == 0) {
651 		SPDK_ERRLOG("no cpus specified\n");
652 		return -EINVAL;
653 	}
654 
655 	spdk_env_get_cpuset(&tmp_mask);
656 	spdk_cpuset_and(&tmp_mask, &g_core_mask);
657 
658 	if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) {
659 		SPDK_ERRLOG("one of the selected CPUs is outside of the core mask (=%s)\n",
660 			    spdk_cpuset_fmt(&g_core_mask));
661 		return -EINVAL;
662 	}
663 
664 	return 0;
665 }
666 
667 static void
668 ublk_poller_register(void *args)
669 {
670 	struct ublk_poll_group *poll_group = args;
671 	int rc;
672 
673 	assert(spdk_get_thread() == poll_group->ublk_thread);
674 	/* Bind the ublk spdk_thread to the current CPU core to avoid thread context switches
675 	 * during uring processing, as required by the ublk kernel driver.
676 	 */
677 	spdk_thread_bind(spdk_get_thread(), true);
678 
679 	TAILQ_INIT(&poll_group->queue_list);
680 	poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0);
681 	rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk",
682 				     UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE);
683 	if (rc != 0) {
684 		assert(false);
685 	}
686 }
687 
688 struct rpc_create_target {
689 	bool disable_user_copy;
690 };
691 
692 static const struct spdk_json_object_decoder rpc_ublk_create_target[] = {
693 	{"disable_user_copy", offsetof(struct rpc_create_target, disable_user_copy), spdk_json_decode_bool, true},
694 };
695 
696 int
697 ublk_create_target(const char *cpumask_str, const struct spdk_json_val *params)
698 {
699 	int rc;
700 	uint32_t i;
701 	char thread_name[32];
702 	struct rpc_create_target req = {};
703 	struct ublk_poll_group *poll_group;
704 
705 	if (g_ublk_tgt.active == true) {
706 		SPDK_ERRLOG("UBLK target has already been created\n");
707 		return -EBUSY;
708 	}
709 
710 	rc = ublk_parse_core_mask(cpumask_str);
711 	if (rc != 0) {
712 		return rc;
713 	}
714 
715 	if (params) {
716 		if (spdk_json_decode_object_relaxed(params, rpc_ublk_create_target,
717 						    SPDK_COUNTOF(rpc_ublk_create_target),
718 						    &req)) {
719 			SPDK_ERRLOG("spdk_json_decode_object failed\n");
720 			return -EINVAL;
721 		}
722 		g_disable_user_copy = req.disable_user_copy;
723 	}
724 
725 	assert(g_ublk_tgt.poll_groups == NULL);
726 	g_ublk_tgt.poll_groups = calloc(spdk_env_get_core_count(), sizeof(*poll_group));
727 	if (!g_ublk_tgt.poll_groups) {
728 		return -ENOMEM;
729 	}
730 
731 	rc = ublk_open();
732 	if (rc != 0) {
733 		SPDK_ERRLOG("Failed to open UBLK, error=%s\n", spdk_strerror(-rc));
734 		free(g_ublk_tgt.poll_groups);
735 		g_ublk_tgt.poll_groups = NULL;
736 		return rc;
737 	}
738 
739 	spdk_iobuf_register_module("ublk");
740 
741 	SPDK_ENV_FOREACH_CORE(i) {
742 		if (!spdk_cpuset_get_cpu(&g_core_mask, i)) {
743 			continue;
744 		}
745 		snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i);
746 		poll_group = &g_ublk_tgt.poll_groups[g_num_ublk_poll_groups];
747 		poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask);
748 		spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group);
749 		g_num_ublk_poll_groups++;
750 	}
751 
752 	assert(spdk_thread_is_app_thread(NULL));
753 	g_ublk_tgt.active = true;
754 	g_ublk_tgt.ctrl_ops_in_progress = 0;
755 	g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL,
756 				 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US);
757 
758 	SPDK_NOTICELOG("UBLK target created successfully\n");
759 
760 	return 0;
761 }
762 
763 static void
764 _ublk_fini_done(void *args)
765 {
766 	SPDK_DEBUGLOG(ublk, "\n");
767 
768 	g_num_ublk_poll_groups = 0;
769 	g_next_ublk_poll_group = 0;
770 	g_ublk_tgt.is_destroying = false;
771 	g_ublk_tgt.active = false;
772 	g_ublk_tgt.features = 0;
773 	g_ublk_tgt.ioctl_encode = false;
774 	g_ublk_tgt.user_copy = false;
775 	g_ublk_tgt.user_recovery = false;
776 
777 	if (g_ublk_tgt.cb_fn) {
778 		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
779 		g_ublk_tgt.cb_fn = NULL;
780 		g_ublk_tgt.cb_arg = NULL;
781 	}
782 
783 	if (g_ublk_tgt.poll_groups) {
784 		free(g_ublk_tgt.poll_groups);
785 		g_ublk_tgt.poll_groups = NULL;
786 	}
787 
788 }
789 
790 static void
791 ublk_thread_exit(void *args)
792 {
793 	struct spdk_thread *ublk_thread = spdk_get_thread();
794 	uint32_t i;
795 
796 	for (i = 0; i < g_num_ublk_poll_groups; i++) {
797 		if (g_ublk_tgt.poll_groups[i].ublk_thread == ublk_thread) {
798 			spdk_poller_unregister(&g_ublk_tgt.poll_groups[i].ublk_poller);
799 			spdk_iobuf_channel_fini(&g_ublk_tgt.poll_groups[i].iobuf_ch);
800 			spdk_thread_bind(ublk_thread, false);
801 			spdk_thread_exit(ublk_thread);
802 		}
803 	}
804 }
805 
806 static int
807 ublk_close_dev(struct spdk_ublk_dev *ublk)
808 {
809 	int rc;
810 
811 	/* set is_closing */
812 	if (ublk->is_closing) {
813 		return -EBUSY;
814 	}
815 	ublk->is_closing = true;
816 
817 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_STOP_DEV);
818 	if (rc < 0) {
819 		SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id);
820 	}
821 	return rc;
822 }
823 
824 static void
825 _ublk_fini(void *args)
826 {
827 	struct spdk_ublk_dev	*ublk, *ublk_tmp;
828 
829 	TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) {
830 		ublk_close_dev(ublk);
831 	}
832 
833 	/* Check if all ublks closed */
834 	if (TAILQ_EMPTY(&g_ublk_devs)) {
835 		SPDK_DEBUGLOG(ublk, "finish shutdown\n");
836 		spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
837 		if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) {
838 			io_uring_queue_exit(&g_ublk_tgt.ctrl_ring);
839 			g_ublk_tgt.ctrl_ring.ring_fd = -1;
840 		}
841 		if (g_ublk_tgt.ctrl_fd >= 0) {
842 			close(g_ublk_tgt.ctrl_fd);
843 			g_ublk_tgt.ctrl_fd = -1;
844 		}
845 		spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done);
846 	} else {
847 		spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL);
848 	}
849 }
850 
851 int
852 spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg)
853 {
854 	assert(spdk_thread_is_app_thread(NULL));
855 
856 	if (g_ublk_tgt.is_destroying == true) {
857 		/* UBLK target is being destroyed */
858 		return -EBUSY;
859 	}
860 	g_ublk_tgt.cb_fn = cb_fn;
861 	g_ublk_tgt.cb_arg = cb_arg;
862 	g_ublk_tgt.is_destroying = true;
863 	_ublk_fini(NULL);
864 
865 	return 0;
866 }
867 
868 int
869 ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg)
870 {
871 	int rc;
872 
873 	if (g_ublk_tgt.active == false) {
874 		/* UBLK target has not been created */
875 		return -ENOENT;
876 	}
877 
878 	rc = spdk_ublk_fini(cb_fn, cb_arg);
879 
880 	return rc;
881 }
882 
883 struct spdk_ublk_dev *
884 ublk_dev_find_by_id(uint32_t ublk_id)
885 {
886 	struct spdk_ublk_dev *ublk;
887 
888 	/* Check whether a ublk device with this ID has already been registered. */
889 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
890 		if (ublk->ublk_id == ublk_id) {
891 			return ublk;
892 		}
893 	}
894 
895 	return NULL;
896 }
897 
898 uint32_t
899 ublk_dev_get_id(struct spdk_ublk_dev *ublk)
900 {
901 	return ublk->ublk_id;
902 }
903 
904 struct spdk_ublk_dev *ublk_dev_first(void)
905 {
906 	return TAILQ_FIRST(&g_ublk_devs);
907 }
908 
909 struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev)
910 {
911 	return TAILQ_NEXT(prev, tailq);
912 }
913 
914 uint32_t
915 ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk)
916 {
917 	return ublk->queue_depth;
918 }
919 
920 uint32_t
921 ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk)
922 {
923 	return ublk->num_queues;
924 }
925 
926 const char *
927 ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk)
928 {
929 	return spdk_bdev_get_name(ublk->bdev);
930 }
931 
932 void
933 spdk_ublk_write_config_json(struct spdk_json_write_ctx *w)
934 {
935 	struct spdk_ublk_dev *ublk;
936 
937 	spdk_json_write_array_begin(w);
938 
939 	if (g_ublk_tgt.active) {
940 		spdk_json_write_object_begin(w);
941 
942 		spdk_json_write_named_string(w, "method", "ublk_create_target");
943 		spdk_json_write_named_object_begin(w, "params");
944 		spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask));
945 		spdk_json_write_object_end(w);
946 
947 		spdk_json_write_object_end(w);
948 	}
949 
950 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
951 		spdk_json_write_object_begin(w);
952 
953 		spdk_json_write_named_string(w, "method", "ublk_start_disk");
954 
955 		spdk_json_write_named_object_begin(w, "params");
956 		spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk));
957 		spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id);
958 		spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues);
959 		spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth);
960 		spdk_json_write_object_end(w);
961 
962 		spdk_json_write_object_end(w);
963 	}
964 
965 	spdk_json_write_array_end(w);
966 }
967 
968 static void
969 ublk_dev_list_register(struct spdk_ublk_dev *ublk)
970 {
971 	UBLK_DEBUGLOG(ublk, "add to tailq\n");
972 	TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq);
973 	g_ublk_tgt.num_ublk_devs++;
974 }
975 
976 static void
977 ublk_dev_list_unregister(struct spdk_ublk_dev *ublk)
978 {
979 	/*
980 	 * The ublk device may be stopped before it was registered,
981 	 * so check whether it was actually registered.
982 	 */
983 
984 	if (ublk_dev_find_by_id(ublk->ublk_id)) {
985 		UBLK_DEBUGLOG(ublk, "remove from tailq\n");
986 		TAILQ_REMOVE(&g_ublk_devs, ublk, tailq);
987 		assert(g_ublk_tgt.num_ublk_devs);
988 		g_ublk_tgt.num_ublk_devs--;
989 		return;
990 	}
991 
992 	UBLK_DEBUGLOG(ublk, "not found in tailq\n");
993 	assert(false);
994 }
995 
996 static void
997 ublk_delete_dev(void *arg)
998 {
999 	struct spdk_ublk_dev *ublk = arg;
1000 	int rc = 0;
1001 	uint32_t q_idx;
1002 
1003 	assert(spdk_thread_is_app_thread(NULL));
1004 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
1005 		ublk_dev_queue_fini(&ublk->queues[q_idx]);
1006 	}
1007 
1008 	if (ublk->cdev_fd >= 0) {
1009 		close(ublk->cdev_fd);
1010 	}
1011 
1012 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_DEL_DEV);
1013 	if (rc < 0) {
1014 		SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id);
1015 	}
1016 }
1017 
1018 static int
1019 _ublk_close_dev_retry(void *arg)
1020 {
1021 	struct spdk_ublk_dev *ublk = arg;
1022 
1023 	if (ublk->ctrl_ops_in_progress > 0) {
1024 		if (ublk->retry_count-- > 0) {
1025 			return SPDK_POLLER_BUSY;
1026 		}
1027 		SPDK_ERRLOG("Timeout on ctrl op completion.\n");
1028 	}
1029 	spdk_poller_unregister(&ublk->retry_poller);
1030 	ublk_delete_dev(ublk);
1031 	return SPDK_POLLER_BUSY;
1032 }
1033 
1034 static void
1035 ublk_try_close_dev(void *arg)
1036 {
1037 	struct spdk_ublk_dev *ublk = arg;
1038 
1039 	assert(spdk_thread_is_app_thread(NULL));
1040 
1041 	ublk->queues_closed += 1;
1042 	SPDK_DEBUGLOG(ublk_io, "ublkb%u closed queues %u\n", ublk->ublk_id, ublk->queues_closed);
1043 
1044 	if (ublk->queues_closed < ublk->num_queues) {
1045 		return;
1046 	}
1047 
1048 	if (ublk->ctrl_ops_in_progress > 0) {
1049 		assert(ublk->retry_poller == NULL);
1050 		ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US;
1051 		ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk,
1052 				     UBLK_BUSY_POLLING_INTERVAL_US);
1053 	} else {
1054 		ublk_delete_dev(ublk);
1055 	}
1056 }
1057 
1058 static void
1059 ublk_try_close_queue(struct ublk_queue *q)
1060 {
1061 	struct spdk_ublk_dev *ublk = q->dev;
1062 
1063 	/* Only close the queue once no I/O is in flight to the bdev,
1064 	 * no I/O is waiting to commit its result, and all I/Os have been aborted back.
1065 	 */
1066 	if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) {
1067 		/* wait for next retry */
1068 		return;
1069 	}
1070 
1071 	TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq);
1072 	spdk_put_io_channel(q->bdev_ch);
1073 	q->bdev_ch = NULL;
1074 
1075 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk);
1076 }
1077 
1078 int
1079 ublk_stop_disk(uint32_t ublk_id, ublk_ctrl_cb ctrl_cb, void *cb_arg)
1080 {
1081 	struct spdk_ublk_dev *ublk;
1082 
1083 	assert(spdk_thread_is_app_thread(NULL));
1084 
1085 	ublk = ublk_dev_find_by_id(ublk_id);
1086 	if (ublk == NULL) {
1087 		SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id);
1088 		return -ENODEV;
1089 	}
1090 	if (ublk->is_closing) {
1091 		SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id);
1092 		return -EBUSY;
1093 	}
1094 	if (ublk->ctrl_cb) {
1095 		SPDK_WARNLOG("ublk %d is busy with RPC call\n", ublk->ublk_id);
1096 		return -EBUSY;
1097 	}
1098 
1099 	ublk->ctrl_cb = ctrl_cb;
1100 	ublk->cb_arg = cb_arg;
1101 	return ublk_close_dev(ublk);
1102 }
1103 
1104 static inline void
1105 ublk_mark_io_done(struct ublk_io *io, int res)
1106 {
1107 	/*
1108 	 * Mark the io as done by the target, so that SPDK can commit its
1109 	 * result and fetch a new request via an io_uring command.
1110 	 */
1111 	io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
1112 	io->result = res;
1113 	io->need_data = false;
1114 }
1115 
1116 static void
1117 ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1118 {
1119 	struct ublk_io	*io = cb_arg;
1120 	struct ublk_queue *q = io->q;
1121 	int res;
1122 
1123 	if (success) {
1124 		res = io->result;
1125 	} else {
1126 		res = -EIO;
1127 	}
1128 
1129 	ublk_mark_io_done(io, res);
1130 
1131 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n",
1132 		      q->q_id, io->tag, res);
1133 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1134 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1135 
1136 	if (bdev_io != NULL) {
1137 		spdk_bdev_free_io(bdev_io);
1138 	}
1139 }
1140 
1141 static void
1142 ublk_queue_user_copy(struct ublk_io *io, bool is_write)
1143 {
1144 	struct ublk_queue *q = io->q;
1145 	const struct ublksrv_io_desc *iod = io->iod;
1146 	struct io_uring_sqe *sqe;
1147 	uint64_t pos;
1148 	uint32_t nbytes;
1149 
1150 	nbytes = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1151 	pos = ublk_user_copy_pos(q->q_id, io->tag);
1152 	sqe = io_uring_get_sqe(&q->ring);
1153 	assert(sqe);
1154 
1155 	if (is_write) {
1156 		io_uring_prep_read(sqe, 0, io->payload, nbytes, pos);
1157 	} else {
1158 		io_uring_prep_write(sqe, 0, io->payload, nbytes, pos);
1159 	}
1160 	io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
1161 	io_uring_sqe_set_data64(sqe, build_user_data(io->tag, 0));
1162 
1163 	io->user_copy = true;
1164 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1165 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1166 }
1167 
1168 static void
1169 ublk_user_copy_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1170 {
1171 	struct ublk_io	*io = cb_arg;
1172 
1173 	spdk_bdev_free_io(bdev_io);
1174 
1175 	if (success) {
1176 		ublk_queue_user_copy(io, false);
1177 		return;
1178 	}
1179 	/* READ IO Error */
1180 	ublk_io_done(NULL, false, cb_arg);
1181 }
1182 
1183 static void
1184 ublk_resubmit_io(void *arg)
1185 {
1186 	struct ublk_io *io = (struct ublk_io *)arg;
1187 
1188 	_ublk_submit_bdev_io(io->q, io);
1189 }
1190 
1191 static void
1192 ublk_queue_io(struct ublk_io *io)
1193 {
1194 	int rc;
1195 	struct spdk_bdev *bdev = io->q->dev->bdev;
1196 	struct ublk_queue *q = io->q;
1197 
1198 	io->bdev_io_wait.bdev = bdev;
1199 	io->bdev_io_wait.cb_fn = ublk_resubmit_io;
1200 	io->bdev_io_wait.cb_arg = io;
1201 
1202 	rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait);
1203 	if (rc != 0) {
1204 		SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc);
1205 		ublk_io_done(NULL, false, io);
1206 	}
1207 }
1208 
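/*
 * io->mpool_entry keeps the pointer returned by spdk_iobuf_get() so it can be
 * handed back to spdk_iobuf_put(); io->payload is that buffer rounded up to a
 * 4 KiB boundary.
 */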
1209 static void
1210 ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf)
1211 {
1212 	struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf);
1213 
1214 	io->mpool_entry = buf;
1215 	assert(io->payload == NULL);
1216 	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
1217 	io->get_buf_cb(io);
1218 }
1219 
1220 static void
1221 ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch,
1222 		   ublk_get_buf_cb get_buf_cb)
1223 {
1224 	void *buf;
1225 
1226 	io->payload_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1227 	io->get_buf_cb = get_buf_cb;
1228 	buf = spdk_iobuf_get(iobuf_ch, io->payload_size, &io->iobuf, ublk_io_get_buffer_cb);
1229 
1230 	if (buf != NULL) {
1231 		ublk_io_get_buffer_cb(&io->iobuf, buf);
1232 	}
1233 }
1234 
1235 static void
1236 ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch)
1237 {
1238 	if (io->payload) {
1239 		spdk_iobuf_put(iobuf_ch, io->mpool_entry, io->payload_size);
1240 		io->mpool_entry = NULL;
1241 		io->payload = NULL;
1242 	}
1243 }
1244 
1245 static void
1246 _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1247 {
1248 	struct spdk_ublk_dev *ublk = q->dev;
1249 	struct spdk_bdev_desc *desc = io->bdev_desc;
1250 	struct spdk_io_channel *ch = io->bdev_ch;
1251 	uint64_t offset_blocks, num_blocks;
1252 	spdk_bdev_io_completion_cb read_cb;
1253 	uint8_t ublk_op;
1254 	int rc = 0;
1255 	const struct ublksrv_io_desc *iod = io->iod;
1256 
1257 	ublk_op = ublksrv_get_op(iod);
1258 	offset_blocks = iod->start_sector >> ublk->sector_per_block_shift;
1259 	num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift;
1260 
1261 	switch (ublk_op) {
1262 	case UBLK_IO_OP_READ:
1263 		if (g_ublk_tgt.user_copy) {
1264 			read_cb = ublk_user_copy_read_done;
1265 		} else {
1266 			read_cb = ublk_io_done;
1267 		}
1268 		rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, read_cb, io);
1269 		break;
1270 	case UBLK_IO_OP_WRITE:
1271 		rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
1272 		break;
1273 	case UBLK_IO_OP_FLUSH:
1274 		rc = spdk_bdev_flush_blocks(desc, ch, 0, spdk_bdev_get_num_blocks(ublk->bdev), ublk_io_done, io);
1275 		break;
1276 	case UBLK_IO_OP_DISCARD:
1277 		rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1278 		break;
1279 	case UBLK_IO_OP_WRITE_ZEROES:
1280 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1281 		break;
1282 	default:
1283 		rc = -1;
1284 	}
1285 
1286 	if (rc < 0) {
1287 		if (rc == -ENOMEM) {
1288 			SPDK_INFOLOG(ublk, "No memory, queueing io.\n");
1289 			ublk_queue_io(io);
1290 		} else {
1291 			SPDK_ERRLOG("ublk bdev io submission failed, rc=%d, ublk_op=%u\n", rc, ublk_op);
1292 			ublk_io_done(NULL, false, io);
1293 		}
1294 	}
1295 }
1296 
1297 static void
1298 read_get_buffer_done(struct ublk_io *io)
1299 {
1300 	_ublk_submit_bdev_io(io->q, io);
1301 }
1302 
1303 static void
1304 user_copy_write_get_buffer_done(struct ublk_io *io)
1305 {
1306 	ublk_queue_user_copy(io, true);
1307 }
1308 
1309 static void
1310 ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1311 {
1312 	struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch;
1313 	const struct ublksrv_io_desc *iod = io->iod;
1314 	uint8_t ublk_op;
1315 
1316 	io->result = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1317 	ublk_op = ublksrv_get_op(iod);
1318 	switch (ublk_op) {
1319 	case UBLK_IO_OP_READ:
1320 		ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done);
1321 		break;
1322 	case UBLK_IO_OP_WRITE:
1323 		if (g_ublk_tgt.user_copy) {
1324 			ublk_io_get_buffer(io, iobuf_ch, user_copy_write_get_buffer_done);
1325 		} else {
1326 			_ublk_submit_bdev_io(q, io);
1327 		}
1328 		break;
1329 	default:
1330 		_ublk_submit_bdev_io(q, io);
1331 		break;
1332 	}
1333 }
1334 
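/*
 * Each request slot is driven by one outstanding uring command: FETCH_REQ arms
 * the slot, COMMIT_AND_FETCH_REQ returns a result and re-arms it, and
 * NEED_GET_DATA asks ublk_drv to copy the write payload into io->payload first.
 */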
1335 static inline void
1336 ublksrv_queue_io_cmd(struct ublk_queue *q,
1337 		     struct ublk_io *io, unsigned tag)
1338 {
1339 	struct ublksrv_io_cmd *cmd;
1340 	struct io_uring_sqe *sqe;
1341 	unsigned int cmd_op = 0;
1342 	uint64_t user_data;
1343 
1344 	/* Each io must carry a fetch, commit-and-fetch, or get-data operation */
1345 	assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) ||
1346 	       (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ));
1347 	cmd_op = io->cmd_op;
1348 
1349 	sqe = io_uring_get_sqe(&q->ring);
1350 	assert(sqe);
1351 
1352 	cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe);
1353 	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) {
1354 		cmd->result = io->result;
1355 	}
1356 
1357 	/* These fields should be written once, never change */
1358 	ublk_set_sqe_cmd_op(sqe, cmd_op);
1359 	/* Registered file index 0 is dev->cdev_fd */
1360 	sqe->fd		= 0;
1361 	sqe->opcode	= IORING_OP_URING_CMD;
1362 	sqe->flags	= IOSQE_FIXED_FILE;
1363 	sqe->rw_flags	= 0;
1364 	cmd->tag	= tag;
1365 	cmd->addr	= g_ublk_tgt.user_copy ? 0 : (__u64)(uintptr_t)(io->payload);
1366 	cmd->q_id	= q->q_id;
1367 
1368 	user_data = build_user_data(tag, cmd_op);
1369 	io_uring_sqe_set_data64(sqe, user_data);
1370 
1371 	io->cmd_op = 0;
1372 
1373 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n",
1374 		      q->q_id, tag, cmd_op,
1375 		      io->cmd_op, q->is_stopping);
1376 }
1377 
1378 static int
1379 ublk_io_xmit(struct ublk_queue *q)
1380 {
1381 	TAILQ_HEAD(, ublk_io) buffer_free_list;
1382 	struct spdk_iobuf_channel *iobuf_ch;
1383 	int rc = 0, count = 0;
1384 	struct ublk_io *io;
1385 
1386 	if (TAILQ_EMPTY(&q->completed_io_list)) {
1387 		return 0;
1388 	}
1389 
1390 	TAILQ_INIT(&buffer_free_list);
1391 	while (!TAILQ_EMPTY(&q->completed_io_list)) {
1392 		io = TAILQ_FIRST(&q->completed_io_list);
1393 		assert(io != NULL);
1394 		/*
1395 		 * Remove IO from list now assuming it will be completed. It will be inserted
1396 		 * back to the head if it cannot be completed. This approach is specifically
1397 		 * taken to work around a scan-build use-after-free mischaracterization.
1398 		 */
1399 		TAILQ_REMOVE(&q->completed_io_list, io, tailq);
1400 		if (!io->user_copy) {
1401 			if (!io->need_data) {
1402 				TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq);
1403 			}
1404 			ublksrv_queue_io_cmd(q, io, io->tag);
1405 		}
1406 		count++;
1407 	}
1408 
1409 	q->cmd_inflight += count;
1410 	rc = io_uring_submit(&q->ring);
1411 	if (rc != count) {
1412 		SPDK_ERRLOG("could not submit all commands\n");
1413 		assert(false);
1414 	}
1415 
1416 	/* Note: for READ io, ublk will always copy the data out of
1417 	 * the buffers in the io_uring_submit context.  Since we
1418 	 * are not using SQPOLL for IO rings, we can safely free
1419 	 * those IO buffers here.  This design doesn't seem ideal,
1420 	 * but it's what's possible since there is no discrete
1421 	 * COMMIT_REQ operation.  That will need to change in the
1422 	 * future should we ever want to support async copy
1423 	 * operations.
1424 	 */
1425 	iobuf_ch = &q->poll_group->iobuf_ch;
1426 	while (!TAILQ_EMPTY(&buffer_free_list)) {
1427 		io = TAILQ_FIRST(&buffer_free_list);
1428 		TAILQ_REMOVE(&buffer_free_list, io, tailq);
1429 		ublk_io_put_buffer(io, iobuf_ch);
1430 	}
1431 	return rc;
1432 }
1433 
1434 static void
1435 write_get_buffer_done(struct ublk_io *io)
1436 {
1437 	io->need_data = true;
1438 	io->cmd_op = UBLK_IO_NEED_GET_DATA;
1439 	io->result = 0;
1440 
1441 	TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq);
1442 	TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq);
1443 }
1444 
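/*
 * CQE results from the I/O ring: UBLK_IO_RES_OK delivers a new request,
 * UBLK_IO_RES_NEED_GET_DATA means the write payload must be fetched first,
 * UBLK_IO_RES_ABORT means the queue is being torn down, and anything else is
 * treated as an error.  For user-copy completions the CQE instead carries the
 * number of bytes transferred by the pread/pwrite.
 */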
1445 static int
1446 ublk_io_recv(struct ublk_queue *q)
1447 {
1448 	struct io_uring_cqe *cqe;
1449 	unsigned head, tag;
1450 	int fetch, count = 0;
1451 	struct ublk_io *io;
1452 	struct spdk_iobuf_channel *iobuf_ch;
1453 
1454 	if (q->cmd_inflight == 0) {
1455 		return 0;
1456 	}
1457 
1458 	iobuf_ch = &q->poll_group->iobuf_ch;
1459 	io_uring_for_each_cqe(&q->ring, head, cqe) {
1460 		tag = user_data_to_tag(cqe->user_data);
1461 		io = &q->ios[tag];
1462 
1463 		SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u, user copy %u, cmd_op %u\n",
1464 			      cqe->res, q->q_id, tag, io->user_copy, user_data_to_op(cqe->user_data));
1465 
1466 		q->cmd_inflight--;
1467 		TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq);
1468 
1469 		if (!io->user_copy) {
1470 			fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping;
1471 			if (!fetch) {
1472 				q->is_stopping = true;
1473 				if (io->cmd_op == UBLK_IO_FETCH_REQ) {
1474 					io->cmd_op = 0;
1475 				}
1476 			}
1477 
1478 			if (cqe->res == UBLK_IO_RES_OK) {
1479 				ublk_submit_bdev_io(q, io);
1480 			} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
1481 				ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done);
1482 			} else {
1483 				if (cqe->res != UBLK_IO_RES_ABORT) {
1484 					SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
1485 						    cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data));
1486 				}
1487 				TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1488 			}
1489 		} else {
1490 
1491 			/* clear `user_copy` for next use of this IO structure */
1492 			io->user_copy = false;
1493 
1494 			assert((ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) ||
1495 			       (ublksrv_get_op(io->iod) == UBLK_IO_OP_WRITE));
1496 			if (cqe->res != io->result) {
1497 				/* EIO */
1498 				ublk_io_done(NULL, false, io);
1499 			} else {
1500 				if (ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) {
1501 					/* bdev_io is already freed in first READ cycle */
1502 					ublk_io_done(NULL, true, io);
1503 				} else {
1504 					_ublk_submit_bdev_io(q, io);
1505 				}
1506 			}
1507 		}
1508 		count += 1;
1509 		if (count == UBLK_QUEUE_REQUEST) {
1510 			break;
1511 		}
1512 	}
1513 	io_uring_cq_advance(&q->ring, count);
1514 
1515 	return count;
1516 }
1517 
1518 static int
1519 ublk_poll(void *arg)
1520 {
1521 	struct ublk_poll_group *poll_group = arg;
1522 	struct ublk_queue *q, *q_tmp;
1523 	int sent, received, count = 0;
1524 
1525 	TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) {
1526 		sent = ublk_io_xmit(q);
1527 		received = ublk_io_recv(q);
1528 		if (spdk_unlikely(q->is_stopping)) {
1529 			ublk_try_close_queue(q);
1530 		}
1531 		count += sent + received;
1532 	}
1533 	if (count > 0) {
1534 		return SPDK_POLLER_BUSY;
1535 	} else {
1536 		return SPDK_POLLER_IDLE;
1537 	}
1538 }
1539 
1540 static void
1541 ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk)
1542 {
1543 	ublk_close_dev(ublk);
1544 }
1545 
1546 static void
1547 ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1548 		   void *event_ctx)
1549 {
1550 	switch (type) {
1551 	case SPDK_BDEV_EVENT_REMOVE:
1552 		ublk_bdev_hot_remove(event_ctx);
1553 		break;
1554 	default:
1555 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1556 		break;
1557 	}
1558 }
1559 
1560 static void
1561 ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth)
1562 {
1563 	struct io_uring_sqe *sqe;
1564 	uint32_t i;
1565 
1566 	for (i = 0; i < q_depth; i++) {
1567 		sqe = ublk_uring_get_sqe(r, i);
1568 
1569 		/* These fields should be written once, never change */
1570 		sqe->flags = IOSQE_FIXED_FILE;
1571 		sqe->rw_flags = 0;
1572 		sqe->ioprio = 0;
1573 		sqe->off = 0;
1574 	}
1575 }
1576 
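/*
 * ublk_drv exposes each queue's request descriptor array (struct ublksrv_io_desc)
 * read-only via mmap of the ublkc device at UBLKSRV_CMD_BUF_OFFSET; io->iod for
 * tag j simply points into that array at io_cmd_buf[j].
 */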
1577 static int
1578 ublk_dev_queue_init(struct ublk_queue *q)
1579 {
1580 	int rc = 0, cmd_buf_size;
1581 	uint32_t j;
1582 	struct spdk_ublk_dev *ublk = q->dev;
1583 	unsigned long off;
1584 
1585 	cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth);
1586 	off = UBLKSRV_CMD_BUF_OFFSET +
1587 	      q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
1588 	q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ,
1589 			MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off);
1590 	if (q->io_cmd_buf == MAP_FAILED) {
1591 		q->io_cmd_buf = NULL;
1592 		rc = -errno;
1593 		SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc));
1594 		return rc;
1595 	}
1596 
1597 	for (j = 0; j < q->q_depth; j++) {
1598 		q->ios[j].cmd_op = UBLK_IO_FETCH_REQ;
1599 		q->ios[j].iod = &q->io_cmd_buf[j];
1600 	}
1601 
1602 	rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128);
1603 	if (rc < 0) {
1604 		SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc));
1605 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1606 		q->io_cmd_buf = NULL;
1607 		return rc;
1608 	}
1609 
1610 	rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1);
1611 	if (rc != 0) {
1612 		SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc));
1613 		io_uring_queue_exit(&q->ring);
1614 		q->ring.ring_fd = -1;
1615 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1616 		q->io_cmd_buf = NULL;
1617 		return rc;
1618 	}
1619 
1620 	ublk_dev_init_io_cmds(&q->ring, q->q_depth);
1621 
1622 	return 0;
1623 }
1624 
1625 static void
1626 ublk_dev_queue_fini(struct ublk_queue *q)
1627 {
1628 	if (q->ring.ring_fd >= 0) {
1629 		io_uring_unregister_files(&q->ring);
1630 		io_uring_queue_exit(&q->ring);
1631 		q->ring.ring_fd = -1;
1632 	}
1633 	if (q->io_cmd_buf) {
1634 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1635 	}
1636 }
1637 
1638 static void
1639 ublk_dev_queue_io_init(struct ublk_queue *q)
1640 {
1641 	struct ublk_io *io;
1642 	uint32_t i;
1643 	int rc __attribute__((unused));
1644 	void *buf;
1645 
1646 	/* Some older kernels require a buffer to get posted, even
1647 	 * when NEED_GET_DATA has been specified.  So allocate a
1648 	 * temporary buffer, only for purposes of this workaround.
1649 	 * It never actually gets used, so we will free it immediately
1650 	 * after all of the commands are posted.
1651 	 */
1652 	buf = malloc(64);
1653 
1654 	assert(q->bdev_ch != NULL);
1655 
1656 	/* Initialize and submit all io commands to ublk driver */
1657 	for (i = 0; i < q->q_depth; i++) {
1658 		io = &q->ios[i];
1659 		io->tag = (uint16_t)i;
1660 		io->payload = buf;
1661 		io->bdev_ch = q->bdev_ch;
1662 		io->bdev_desc = q->dev->bdev_desc;
1663 		ublksrv_queue_io_cmd(q, io, i);
1664 	}
1665 
1666 	q->cmd_inflight += q->q_depth;
1667 	rc = io_uring_submit(&q->ring);
1668 	assert(rc == (int)q->q_depth);
1669 	for (i = 0; i < q->q_depth; i++) {
1670 		io = &q->ios[i];
1671 		io->payload = NULL;
1672 	}
1673 	free(buf);
1674 }
1675 
1676 static int
1677 ublk_set_params(struct spdk_ublk_dev *ublk)
1678 {
1679 	int rc;
1680 
1681 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_SET_PARAMS);
1682 	if (rc < 0) {
1683 		SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1684 	}
1685 
1686 	return rc;
1687 }
1688 
1689 static void
1690 ublk_dev_info_init(struct spdk_ublk_dev *ublk)
1691 {
1692 	struct ublksrv_ctrl_dev_info uinfo = {
1693 		.queue_depth = ublk->queue_depth,
1694 		.nr_hw_queues = ublk->num_queues,
1695 		.dev_id = ublk->ublk_id,
1696 		.max_io_buf_bytes = UBLK_IO_MAX_BYTES,
1697 		.ublksrv_pid = getpid(),
1698 		.flags = UBLK_F_URING_CMD_COMP_IN_TASK,
1699 	};
1700 
1701 	if (g_ublk_tgt.user_copy) {
1702 		uinfo.flags |= UBLK_F_USER_COPY;
1703 	} else {
1704 		uinfo.flags |= UBLK_F_NEED_GET_DATA;
1705 	}
1706 
1707 	if (g_ublk_tgt.user_recovery) {
1708 		uinfo.flags |= UBLK_F_USER_RECOVERY;
1709 		uinfo.flags |= UBLK_F_USER_RECOVERY_REISSUE;
1710 	}
1711 
1712 	ublk->dev_info = uinfo;
1713 }
1714 
1715 /* Set ublk device parameters based on bdev */
1716 static void
1717 ublk_info_param_init(struct spdk_ublk_dev *ublk)
1718 {
1719 	struct spdk_bdev *bdev = ublk->bdev;
1720 	uint32_t blk_size = spdk_bdev_get_data_block_size(bdev);
1721 	uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev);
1722 	uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev);
1723 	uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev);
1724 	uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT;
1725 	uint32_t io_min_size = blk_size;
1726 	uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size);
1727 
1728 	struct ublk_params uparams = {
1729 		.types = UBLK_PARAM_TYPE_BASIC,
1730 		.len = sizeof(struct ublk_params),
1731 		.basic = {
1732 			.logical_bs_shift = spdk_u32log2(blk_size),
1733 			.physical_bs_shift = spdk_u32log2(pblk_size),
1734 			.io_min_shift = spdk_u32log2(io_min_size),
1735 			.io_opt_shift = spdk_u32log2(io_opt_size),
1736 			.dev_sectors = num_blocks * sectors_per_block,
1737 			.max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT,
1738 		}
1739 	};
1740 
1741 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
1742 		uparams.basic.attrs = UBLK_ATTR_VOLATILE_CACHE;
1743 	}
1744 
1745 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1746 		uparams.types |= UBLK_PARAM_TYPE_DISCARD;
1747 		uparams.discard.discard_alignment = sectors_per_block;
1748 		uparams.discard.max_discard_sectors = num_blocks * sectors_per_block;
1749 		uparams.discard.max_discard_segments = 1;
1750 		uparams.discard.discard_granularity = blk_size;
1751 		if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1752 			uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block;
1753 		}
1754 	}
1755 
1756 	ublk->dev_params = uparams;
1757 }
1758 
1759 static void
1760 _ublk_free_dev(void *arg)
1761 {
1762 	struct spdk_ublk_dev *ublk = arg;
1763 
1764 	ublk_free_dev(ublk);
1765 }
1766 
1767 static void
1768 free_buffers(void *arg)
1769 {
1770 	struct ublk_queue *q = arg;
1771 	uint32_t i;
1772 
1773 	for (i = 0; i < q->q_depth; i++) {
1774 		ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch);
1775 	}
1776 	free(q->ios);
1777 	q->ios = NULL;
1778 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev);
1779 }
1780 
1781 static void
1782 ublk_free_dev(struct spdk_ublk_dev *ublk)
1783 {
1784 	struct ublk_queue *q;
1785 	uint32_t q_idx;
1786 
1787 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
1788 		q = &ublk->queues[q_idx];
1789 
1790 		/* The ublk_io structures of this queue were never initialized. */
1791 		if (q->ios == NULL) {
1792 			continue;
1793 		}
1794 
1795 		/* We found a queue that has an ios array that may have buffers
1796 		 * that need to be freed.  Send a message to the queue's thread
1797 		 * so it can free the buffers back to that thread's iobuf channel.
1798 		 * When it's done, it will set q->ios to NULL and send a message
1799 		 * back to this function to continue.
1800 		 */
1801 		if (q->poll_group) {
1802 			spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q);
1803 			return;
1804 		} else {
1805 			free(q->ios);
1806 			q->ios = NULL;
1807 		}
1808 	}
1809 
1810 	/* All of the buffers associated with the queues have been freed, so now
1811 	 * continue with releasing resources for the rest of the ublk device.
1812 	 */
1813 	if (ublk->bdev_desc) {
1814 		spdk_bdev_close(ublk->bdev_desc);
1815 		ublk->bdev_desc = NULL;
1816 	}
1817 
1818 	ublk_dev_list_unregister(ublk);
1819 	SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id);
1820 
1821 	free(ublk);
1822 }
1823 
1824 static int
1825 ublk_ios_init(struct spdk_ublk_dev *ublk)
1826 {
1827 	int rc;
1828 	uint32_t i, j;
1829 	struct ublk_queue *q;
1830 
1831 	for (i = 0; i < ublk->num_queues; i++) {
1832 		q = &ublk->queues[i];
1833 
1834 		TAILQ_INIT(&q->completed_io_list);
1835 		TAILQ_INIT(&q->inflight_io_list);
1836 		q->dev = ublk;
1837 		q->q_id = i;
1838 		q->q_depth = ublk->queue_depth;
1839 		q->ios = calloc(q->q_depth, sizeof(struct ublk_io));
1840 		if (!q->ios) {
1841 			rc = -ENOMEM;
1842 			SPDK_ERRLOG("could not allocate queue ios\n");
1843 			goto err;
1844 		}
1845 		for (j = 0; j < q->q_depth; j++) {
1846 			q->ios[j].q = q;
1847 		}
1848 	}
1849 
1850 	return 0;
1851 
1852 err:
1853 	for (i = 0; i < ublk->num_queues; i++) {
1854 		free(ublk->queues[i].ios);
1855 		ublk->queues[i].ios = NULL;
1856 	}
1857 	return rc;
1858 }
1859 
1860 static void
1861 ublk_queue_recovery_done(void *arg)
1862 {
1863 	struct spdk_ublk_dev *ublk = arg;
1864 
1865 	ublk->online_num_queues++;
1866 	if (ublk->is_recovering && (ublk->online_num_queues == ublk->num_queues)) {
1867 		ublk_ctrl_cmd_submit(ublk, UBLK_CMD_END_USER_RECOVERY);
1868 	}
1869 }
1870 
1871 static void
1872 ublk_queue_run(void *arg1)
1873 {
1874 	struct ublk_queue	*q = arg1;
1875 	struct spdk_ublk_dev *ublk = q->dev;
1876 	struct ublk_poll_group *poll_group = q->poll_group;
1877 
1878 	assert(spdk_get_thread() == poll_group->ublk_thread);
1879 	q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc);
1880 	/* Queues must be populated with io commands on the thread that polls them */
1881 	ublk_dev_queue_io_init(q);
1882 
1883 	TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq);
1884 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_queue_recovery_done, ublk);
1885 }
1886 
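/*
 * Entry point behind the "ublk_start_disk" RPC (see spdk_ublk_write_config_json
 * above).  Illustrative flow, with example values only: after ublk_create_target(),
 * ublk_start_disk("malloc0", 1, 1, 128, cb, cb_arg) exposes the bdev to the
 * kernel as /dev/ublkb1 once the ADD_DEV/SET_PARAMS/START_DEV chain completes.
 */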
1887 int
1888 ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
1889 		uint32_t num_queues, uint32_t queue_depth,
1890 		ublk_ctrl_cb ctrl_cb, void *cb_arg)
1891 {
1892 	int			rc;
1893 	uint32_t		i;
1894 	struct spdk_bdev	*bdev;
1895 	struct spdk_ublk_dev	*ublk = NULL;
1896 	uint32_t		sector_per_block;
1897 
1898 	assert(spdk_thread_is_app_thread(NULL));
1899 
1900 	if (g_ublk_tgt.active == false) {
1901 		SPDK_ERRLOG("No ublk target exists\n");
1902 		return -ENODEV;
1903 	}
1904 
1905 	ublk = ublk_dev_find_by_id(ublk_id);
1906 	if (ublk != NULL) {
1907 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
1908 		return -EBUSY;
1909 	}
1910 
1911 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
1912 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
1913 		return -ENOTSUP;
1914 	}
1915 
1916 	ublk = calloc(1, sizeof(*ublk));
1917 	if (ublk == NULL) {
1918 		return -ENOMEM;
1919 	}
1920 	ublk->ctrl_cb = ctrl_cb;
1921 	ublk->cb_arg = cb_arg;
1922 	ublk->cdev_fd = -1;
1923 	ublk->ublk_id = ublk_id;
1924 	UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n",
1925 		      bdev_name, num_queues, queue_depth);
1926 
1927 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
1928 	if (rc != 0) {
1929 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
1930 		free(ublk);
1931 		return rc;
1932 	}
1933 
1934 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
1935 	ublk->bdev = bdev;
1936 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
1937 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
1938 
1939 	ublk->queues_closed = 0;
1940 	ublk->num_queues = num_queues;
1941 	ublk->queue_depth = queue_depth;
1942 	if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) {
1943 		SPDK_WARNLOG("Limiting queue depth %d of UBLK %d to maximum %d\n",
1944 			     ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH);
1945 		ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH;
1946 	}
1947 	if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) {
1948 		SPDK_WARNLOG("Limiting queue count %d of UBLK %d to maximum %d\n",
1949 			     ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES);
1950 		ublk->num_queues = UBLK_DEV_MAX_QUEUES;
1951 	}
1952 	for (i = 0; i < ublk->num_queues; i++) {
1953 		ublk->queues[i].ring.ring_fd = -1;
1954 	}
1955 
1956 	ublk_dev_info_init(ublk);
1957 	ublk_info_param_init(ublk);
1958 	rc = ublk_ios_init(ublk);
1959 	if (rc != 0) {
1960 		spdk_bdev_close(ublk->bdev_desc);
1961 		free(ublk);
1962 		return rc;
1963 	}
1964 
1965 	SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n",
1966 		     bdev_name, ublk_id);
1967 
1968 	/* Add ublk_dev to the end of disk list */
1969 	ublk_dev_list_register(ublk);
1970 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_ADD_DEV);
1971 	if (rc < 0) {
1972 		SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1973 		ublk_free_dev(ublk);
1974 	}
1975 
1976 	return rc;
1977 }
1978 
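/*
 * Data path setup for one device: open /dev/ublkc<id>, set up every queue's
 * descriptor mmap and io_uring, issue START_DEV (skipped during recovery, where
 * END_USER_RECOVERY is sent once all queues are back online), then hand the
 * queues out to the poll group threads round-robin.
 */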
1979 static int
1980 ublk_start_dev(struct spdk_ublk_dev *ublk, bool is_recovering)
1981 {
1982 	int			rc;
1983 	uint32_t		q_id;
1984 	struct spdk_thread	*ublk_thread;
1985 	char			buf[64];
1986 
1987 	snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id);
1988 	ublk->cdev_fd = open(buf, O_RDWR);
1989 	if (ublk->cdev_fd < 0) {
1990 		rc = -errno;
1991 		SPDK_ERRLOG("can't open %s, rc %d\n", buf, rc);
1992 		return rc;
1993 	}
1994 
1995 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1996 		rc = ublk_dev_queue_init(&ublk->queues[q_id]);
1997 		if (rc) {
1998 			return rc;
1999 		}
2000 	}
2001 
2002 	if (!is_recovering) {
2003 		rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_START_DEV);
2004 		if (rc < 0) {
2005 			SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id,
2006 				    spdk_strerror(-rc));
2007 			return rc;
2008 		}
2009 	}
2010 
2011 	/* Distribute queues across spdk_threads for load balancing */
2012 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
2013 		ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_groups[g_next_ublk_poll_group];
2014 		ublk_thread = g_ublk_tgt.poll_groups[g_next_ublk_poll_group].ublk_thread;
2015 		spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]);
2016 		g_next_ublk_poll_group++;
2017 		if (g_next_ublk_poll_group == g_num_ublk_poll_groups) {
2018 			g_next_ublk_poll_group = 0;
2019 		}
2020 	}
2021 
2022 	return 0;
2023 }
2024 
2025 static int
2026 ublk_ctrl_start_recovery(struct spdk_ublk_dev *ublk)
2027 {
2028 	int                     rc;
2029 	uint32_t                i;
2030 
2031 	ublk->num_queues = ublk->dev_info.nr_hw_queues;
2032 	ublk->queue_depth = ublk->dev_info.queue_depth;
2033 	ublk->dev_info.ublksrv_pid = getpid();
2034 
2035 	SPDK_DEBUGLOG(ublk, "Recovering ublk %d, num queues %u, queue depth %u, flags 0x%llx\n",
2036 		      ublk->ublk_id,
2037 		      ublk->num_queues, ublk->queue_depth, ublk->dev_info.flags);
2038 
2039 	for (i = 0; i < ublk->num_queues; i++) {
2040 		ublk->queues[i].ring.ring_fd = -1;
2041 	}
2042 
2043 	ublk_info_param_init(ublk);
2044 	rc = ublk_ios_init(ublk);
2045 	if (rc != 0) {
2046 		return rc;
2047 	}
2048 
2049 	ublk->is_recovering = true;
2050 	return ublk_ctrl_cmd_submit(ublk, UBLK_CMD_START_USER_RECOVERY);
2051 }
2052 
2053 int
2054 ublk_start_disk_recovery(const char *bdev_name, uint32_t ublk_id, ublk_ctrl_cb ctrl_cb,
2055 			 void *cb_arg)
2056 {
2057 	int			rc;
2058 	struct spdk_bdev	*bdev;
2059 	struct spdk_ublk_dev	*ublk = NULL;
2060 	uint32_t		sector_per_block;
2061 
2062 	assert(spdk_thread_is_app_thread(NULL));
2063 
2064 	if (g_ublk_tgt.active == false) {
2065 		SPDK_ERRLOG("No ublk target exists\n");
2066 		return -ENODEV;
2067 	}
2068 
2069 	if (!g_ublk_tgt.user_recovery) {
2070 		SPDK_ERRLOG("User recovery is only supported with kernel version >= 6.4\n");
2071 		return -ENOTSUP;
2072 	}
2073 
2074 	ublk = ublk_dev_find_by_id(ublk_id);
2075 	if (ublk != NULL) {
2076 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
2077 		return -EBUSY;
2078 	}
2079 
2080 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
2081 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
2082 		return -ENOTSUP;
2083 	}
2084 
2085 	ublk = calloc(1, sizeof(*ublk));
2086 	if (ublk == NULL) {
2087 		return -ENOMEM;
2088 	}
2089 	ublk->ctrl_cb = ctrl_cb;
2090 	ublk->cb_arg = cb_arg;
2091 	ublk->cdev_fd = -1;
2092 	ublk->ublk_id = ublk_id;
2093 
2094 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
2095 	if (rc != 0) {
2096 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
2097 		free(ublk);
2098 		return rc;
2099 	}
2100 
2101 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
2102 	ublk->bdev = bdev;
2103 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
2104 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
2105 
2106 	SPDK_NOTICELOG("Recovering ublk %d with bdev %s\n", ublk->ublk_id, bdev_name);
2107 
2108 	ublk_dev_list_register(ublk);
2109 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_GET_DEV_INFO);
2110 	if (rc < 0) {
2111 		ublk_free_dev(ublk);
2112 	}
2113 
2114 	return rc;
2115 }
2116 
2117 SPDK_LOG_REGISTER_COMPONENT(ublk)
2118 SPDK_LOG_REGISTER_COMPONENT(ublk_io)
2119