xref: /spdk/lib/ublk/ublk.c (revision 28d7251a2dda924d4fdee8dc6cd1fc97983fbff4)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <linux/ublk_cmd.h>
7 #include <liburing.h>
8 
9 #include "spdk/stdinc.h"
10 #include "spdk/string.h"
11 #include "spdk/bdev.h"
12 #include "spdk/endian.h"
13 #include "spdk/env.h"
14 #include "spdk/likely.h"
15 #include "spdk/log.h"
16 #include "spdk/util.h"
17 #include "spdk/queue.h"
18 #include "spdk/json.h"
19 #include "spdk/ublk.h"
20 #include "spdk/thread.h"
21 
22 #include "ublk_internal.h"
23 
24 #define UBLK_CTRL_DEV					"/dev/ublk-control"
25 #define UBLK_BLK_CDEV					"/dev/ublkc"
26 
27 #define LINUX_SECTOR_SHIFT				9
28 #define UBLK_IO_MAX_BYTES				SPDK_BDEV_LARGE_BUF_MAX_SIZE
29 #define UBLK_DEV_MAX_QUEUES				32
30 #define UBLK_DEV_MAX_QUEUE_DEPTH			1024
31 #define UBLK_QUEUE_REQUEST				32
32 #define UBLK_STOP_BUSY_WAITING_MS			10000
33 #define UBLK_BUSY_POLLING_INTERVAL_US			20000
34 #define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US	1000
35 /* By default, the kernel ublk_drv driver supports up to 64 block devices */
36 #define UBLK_DEFAULT_MAX_SUPPORTED_DEVS			64
37 
38 #define UBLK_IOBUF_SMALL_CACHE_SIZE			128
39 #define UBLK_IOBUF_LARGE_CACHE_SIZE			32
40 
41 #define UBLK_DEBUGLOG(ublk, format, ...) \
42 	SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__)
43 
44 static uint32_t g_num_ublk_poll_groups = 0;
45 static uint32_t g_next_ublk_poll_group = 0;
46 static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS;
47 static struct spdk_cpuset g_core_mask;
48 
49 struct ublk_queue;
50 struct ublk_poll_group;
51 static void ublk_submit_bdev_io(struct ublk_queue *q, uint16_t tag);
52 static void ublk_dev_queue_fini(struct ublk_queue *q);
53 static int ublk_poll(void *arg);
54 static int ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op);
55 
56 typedef void (*ublk_next_state_fn)(struct spdk_ublk_dev *ublk);
57 static void ublk_set_params(struct spdk_ublk_dev *ublk);
58 static void ublk_finish_start(struct spdk_ublk_dev *ublk);
59 static void ublk_free_dev(struct spdk_ublk_dev *ublk);
60 
61 static const char *ublk_op_name[64]
62 __attribute__((unused)) = {
63 	[UBLK_CMD_ADD_DEV] =	"UBLK_CMD_ADD_DEV",
64 	[UBLK_CMD_DEL_DEV] =	"UBLK_CMD_DEL_DEV",
65 	[UBLK_CMD_START_DEV] =	"UBLK_CMD_START_DEV",
66 	[UBLK_CMD_STOP_DEV] =	"UBLK_CMD_STOP_DEV",
67 	[UBLK_CMD_SET_PARAMS] =	"UBLK_CMD_SET_PARAMS",
68 };
69 
70 struct ublk_io;
71 typedef void (*ublk_get_buf_cb)(struct ublk_io *io);
72 
73 struct ublk_io {
74 	void			*payload;
75 	void			*mpool_entry;
76 	bool			need_data;
77 	uint32_t		sector_per_block_shift;
78 	uint32_t		payload_size;
79 	uint32_t		cmd_op;
80 	int32_t			result;
81 	struct spdk_bdev_desc	*bdev_desc;
82 	struct spdk_io_channel	*bdev_ch;
83 	const struct ublksrv_io_desc	*iod;
84 	ublk_get_buf_cb		get_buf_cb;
85 	struct ublk_queue	*q;
86 	/* for bdev io_wait */
87 	struct spdk_bdev_io_wait_entry bdev_io_wait;
88 	struct spdk_iobuf_entry	iobuf;
89 
90 	TAILQ_ENTRY(ublk_io)	tailq;
91 };
92 
93 struct ublk_queue {
94 	uint32_t		q_id;
95 	uint32_t		q_depth;
96 	struct ublk_io		*ios;
97 	TAILQ_HEAD(, ublk_io)	completed_io_list;
98 	TAILQ_HEAD(, ublk_io)	inflight_io_list;
99 	uint32_t		cmd_inflight;
100 	bool			is_stopping;
101 	struct ublksrv_io_desc	*io_cmd_buf;
102 	/* ring depth == dev_info->queue_depth. */
103 	struct io_uring		ring;
104 	struct spdk_ublk_dev	*dev;
105 	struct ublk_poll_group	*poll_group;
106 	struct spdk_io_channel	*bdev_ch;
107 
108 	TAILQ_ENTRY(ublk_queue)	tailq;
109 };
110 
111 struct spdk_ublk_dev {
112 	struct spdk_bdev	*bdev;
113 	struct spdk_bdev_desc	*bdev_desc;
114 
115 	int			cdev_fd;
116 	struct ublk_params	dev_params;
117 	struct ublksrv_ctrl_dev_info	dev_info;
118 
119 	uint32_t		ublk_id;
120 	uint32_t		num_queues;
121 	uint32_t		queue_depth;
122 	uint32_t		sector_per_block_shift;
123 	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];
124 
125 	struct spdk_poller	*retry_poller;
126 	int			retry_count;
127 	uint32_t		queues_closed;
128 	ublk_start_cb		start_cb;
129 	ublk_del_cb		del_cb;
130 	void			*cb_arg;
131 	ublk_next_state_fn	next_state_fn;
132 	uint32_t		ctrl_ops_in_progress;
133 	bool			is_closing;
134 
135 	TAILQ_ENTRY(spdk_ublk_dev) tailq;
136 	TAILQ_ENTRY(spdk_ublk_dev) wait_tailq;
137 };
138 
139 struct ublk_poll_group {
140 	struct spdk_thread		*ublk_thread;
141 	struct spdk_poller		*ublk_poller;
142 	struct spdk_iobuf_channel	iobuf_ch;
143 	TAILQ_HEAD(, ublk_queue)	queue_list;
144 };
145 
146 struct ublk_tgt {
147 	int			ctrl_fd;
148 	bool			active;
149 	bool			is_destroying;
150 	spdk_ublk_fini_cb	cb_fn;
151 	void			*cb_arg;
152 	struct io_uring		ctrl_ring;
153 	struct spdk_poller	*ctrl_poller;
154 	uint32_t		ctrl_ops_in_progress;
155 	struct ublk_poll_group	*poll_groups;
156 	uint32_t		num_ublk_devs;
157 	uint64_t		features;
158 	/* `ublk_drv` supports UBLK_F_CMD_IOCTL_ENCODE */
159 	bool ioctl_encode;
160 };
161 
162 static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs);
163 static struct ublk_tgt g_ublk_tgt;
164 
165 /* helpers for using io_uring */
166 static inline int
167 ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags)
168 {
169 	struct io_uring_params p = {};
170 
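	/* Size the CQ explicitly (IORING_SETUP_CQSIZE) so it matches the requested depth. */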
171 	p.flags = flags | IORING_SETUP_CQSIZE;
172 	p.cq_entries = depth;
173 
174 	return io_uring_queue_init_params(depth, r, &p);
175 }
176 
177 static inline struct io_uring_sqe *
178 ublk_uring_get_sqe(struct io_uring *r, uint32_t idx)
179 {
180 	/* With IORING_SETUP_SQE128 (set in ublk_setup_ring) each SQE occupies two slots, so double the idx */
181 	return &r->sq.sqes[idx << 1];
182 }
183 
184 static inline void *
185 ublk_get_sqe_cmd(struct io_uring_sqe *sqe)
186 {
187 	return (void *)&sqe->addr3;
188 }
189 
190 static inline void
191 ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op)
192 {
193 	uint32_t opc = cmd_op;
194 
195 	if (g_ublk_tgt.ioctl_encode) {
196 		switch (cmd_op) {
197 		/* ctrl uring */
198 		case UBLK_CMD_GET_DEV_INFO:
199 			opc = _IOR('u', UBLK_CMD_GET_DEV_INFO, struct ublksrv_ctrl_cmd);
200 			break;
201 		case UBLK_CMD_ADD_DEV:
202 			opc = _IOWR('u', UBLK_CMD_ADD_DEV, struct ublksrv_ctrl_cmd);
203 			break;
204 		case UBLK_CMD_DEL_DEV:
205 			opc = _IOWR('u', UBLK_CMD_DEL_DEV, struct ublksrv_ctrl_cmd);
206 			break;
207 		case UBLK_CMD_START_DEV:
208 			opc = _IOWR('u', UBLK_CMD_START_DEV, struct ublksrv_ctrl_cmd);
209 			break;
210 		case UBLK_CMD_STOP_DEV:
211 			opc = _IOWR('u', UBLK_CMD_STOP_DEV, struct ublksrv_ctrl_cmd);
212 			break;
213 		case UBLK_CMD_SET_PARAMS:
214 			opc = _IOWR('u', UBLK_CMD_SET_PARAMS, struct ublksrv_ctrl_cmd);
215 			break;
216 
217 		/* io uring */
218 		case UBLK_IO_FETCH_REQ:
219 			opc = _IOWR('u', UBLK_IO_FETCH_REQ, struct ublksrv_io_cmd);
220 			break;
221 		case UBLK_IO_COMMIT_AND_FETCH_REQ:
222 			opc = _IOWR('u', UBLK_IO_COMMIT_AND_FETCH_REQ, struct ublksrv_io_cmd);
223 			break;
224 		case UBLK_IO_NEED_GET_DATA:
225 			opc = _IOWR('u', UBLK_IO_NEED_GET_DATA, struct ublksrv_io_cmd);
226 			break;
227 		default:
228 			break;
229 		}
230 	}
231 
232 	sqe->off = opc;
233 }
234 
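/*
 * io_uring user_data layout used by this module: the ublk tag lives in
 * bits 0-15 and the ublk command opcode in bits 16-23, matching the
 * user_data_to_tag()/user_data_to_op() decoders below.
 */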
235 static inline uint64_t
236 build_user_data(uint16_t tag, uint8_t op)
237 {
238 	assert(!(tag >> 16) && !(op >> 8));
239 
240 	return tag | (op << 16);
241 }
242 
243 static inline uint16_t
244 user_data_to_tag(uint64_t user_data)
245 {
246 	return user_data & 0xffff;
247 }
248 
249 static inline uint8_t
250 user_data_to_op(uint64_t user_data)
251 {
252 	return (user_data >> 16) & 0xff;
253 }
254 
255 void
256 spdk_ublk_init(void)
257 {
258 	assert(spdk_thread_is_app_thread(NULL));
259 
260 	g_ublk_tgt.ctrl_fd = -1;
261 	g_ublk_tgt.ctrl_ring.ring_fd = -1;
262 }
263 
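/*
 * Completion handler for the control ring.  Each CQE's user_data carries the
 * spdk_ublk_dev pointer set in ublk_ctrl_cmd(); next_state_fn, when set,
 * advances the device state machine (e.g. ADD_DEV -> SET_PARAMS -> START_DEV,
 * or DEL_DEV -> ublk_free_dev).
 */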
264 static int
265 ublk_ctrl_poller(void *arg)
266 {
267 	struct io_uring *ring = &g_ublk_tgt.ctrl_ring;
268 	struct spdk_ublk_dev *ublk;
269 	struct io_uring_cqe *cqe;
270 	const int max = 8;
271 	int i, count = 0, rc;
272 
273 	if (!g_ublk_tgt.ctrl_ops_in_progress) {
274 		return SPDK_POLLER_IDLE;
275 	}
276 
277 	for (i = 0; i < max; i++) {
278 		rc = io_uring_peek_cqe(ring, &cqe);
279 		if (rc == -EAGAIN) {
280 			break;
281 		}
282 
283 		assert(cqe != NULL);
284 		g_ublk_tgt.ctrl_ops_in_progress--;
285 		ublk = (struct spdk_ublk_dev *)cqe->user_data;
286 		UBLK_DEBUGLOG(ublk, "ctrl cmd completed\n");
287 		ublk->ctrl_ops_in_progress--;
288 		if (ublk->next_state_fn) {
289 			ublk->next_state_fn(ublk);
290 		}
291 		io_uring_cqe_seen(ring, cqe);
292 		count++;
293 	}
294 
295 	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
296 }
297 
298 static int
299 ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op)
300 {
301 	uint32_t dev_id = ublk->ublk_id;
302 	int rc = -EINVAL;
303 	struct io_uring_sqe *sqe;
304 	struct ublksrv_ctrl_cmd *cmd;
305 
306 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]);
307 
308 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
309 	if (!sqe) {
310 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
311 		assert(false);
312 		return -ENOENT;
313 	}
314 
315 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
316 	sqe->fd = g_ublk_tgt.ctrl_fd;
317 	sqe->opcode = IORING_OP_URING_CMD;
318 	sqe->ioprio = 0;
319 	cmd->dev_id = dev_id;
320 	cmd->queue_id = -1;
321 	ublk->next_state_fn = NULL;
322 
323 	switch (cmd_op) {
324 	case UBLK_CMD_ADD_DEV:
325 		ublk->next_state_fn = ublk_set_params;
326 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_info;
327 		cmd->len = sizeof(ublk->dev_info);
328 		break;
329 	case UBLK_CMD_SET_PARAMS:
330 		ublk->next_state_fn = ublk_finish_start;
331 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_params;
332 		cmd->len = sizeof(ublk->dev_params);
333 		break;
334 	case UBLK_CMD_START_DEV:
335 		cmd->data[0] = getpid();
336 		break;
337 	case UBLK_CMD_STOP_DEV:
338 		break;
339 	case UBLK_CMD_DEL_DEV:
340 		ublk->next_state_fn = ublk_free_dev;
341 		break;
342 	default:
343 		SPDK_ERRLOG("No matching cmd operation, cmd_op = %d\n", cmd_op);
344 		return -EINVAL;
345 	}
346 	ublk_set_sqe_cmd_op(sqe, cmd_op);
347 	io_uring_sqe_set_data(sqe, ublk);
348 
349 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
350 	if (rc < 0) {
351 		SPDK_ERRLOG("uring submit rc %d\n", rc);
352 		return rc;
353 	}
354 	g_ublk_tgt.ctrl_ops_in_progress++;
355 	ublk->ctrl_ops_in_progress++;
356 
357 	return 0;
358 }
359 
360 static int
361 ublk_ctrl_cmd_get_features(void)
362 {
363 	int rc;
364 	struct io_uring_sqe *sqe;
365 	struct io_uring_cqe *cqe;
366 	struct ublksrv_ctrl_cmd *cmd;
367 	uint32_t cmd_op;
368 
369 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
370 	if (!sqe) {
371 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
372 		assert(false);
373 		return -ENOENT;
374 	}
375 
376 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
377 	sqe->fd = g_ublk_tgt.ctrl_fd;
378 	sqe->opcode = IORING_OP_URING_CMD;
379 	sqe->ioprio = 0;
380 	cmd->dev_id = -1;
381 	cmd->queue_id = -1;
382 	cmd->addr = (__u64)(uintptr_t)&g_ublk_tgt.features;
383 	cmd->len = sizeof(g_ublk_tgt.features);
384 
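	/*
	 * 0x13 is UBLK_CMD_GET_FEATURES; it is encoded directly here, presumably so
	 * that builds against older ublk_cmd.h headers lacking the definition still work.
	 */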
385 	cmd_op = _IOR('u', 0x13, struct ublksrv_ctrl_cmd);
386 	ublk_set_sqe_cmd_op(sqe, cmd_op);
387 
388 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
389 	if (rc < 0) {
390 		SPDK_ERRLOG("uring submit rc %d\n", rc);
391 		return rc;
392 	}
393 
394 	rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe);
395 	if (rc < 0) {
396 		SPDK_ERRLOG("wait cqe rc %d\n", rc);
397 		return rc;
398 	}
399 
400 	if (cqe->res == 0) {
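		/* Bit 6 of the feature mask is UBLK_F_CMD_IOCTL_ENCODE. */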
401 		g_ublk_tgt.ioctl_encode = !!(g_ublk_tgt.features & (1ULL << 6));
402 	}
403 	io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe);
404 
405 	return 0;
406 }
407 
408 static int
409 ublk_queue_cmd_buf_sz(uint32_t q_depth)
410 {
411 	uint32_t size = q_depth * sizeof(struct ublksrv_io_desc);
412 	uint32_t page_sz = getpagesize();
413 
414 	/* round up size */
415 	return (size + page_sz - 1) & ~(page_sz - 1);
416 }
417 
418 static int
419 ublk_get_max_support_devs(void)
420 {
421 	FILE *file;
422 	char str[128];
423 
424 	file = fopen("/sys/module/ublk_drv/parameters/ublks_max", "r");
425 	if (!file) {
426 		return -ENOENT;
427 	}
428 
429 	if (!fgets(str, sizeof(str), file)) {
430 		fclose(file);
431 		return -EINVAL;
432 	}
433 	fclose(file);
434 
435 	spdk_str_chomp(str);
436 	return spdk_strtol(str, 10);
437 }
438 
439 static int
440 ublk_open(void)
441 {
442 	int rc, ublks_max;
443 
444 	g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR);
445 	if (g_ublk_tgt.ctrl_fd < 0) {
446 		rc = errno;
447 		SPDK_ERRLOG("UBLK control dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno));
448 		return -rc;
449 	}
450 
451 	ublks_max = ublk_get_max_support_devs();
452 	if (ublks_max > 0) {
453 		g_ublks_max = ublks_max;
454 	}
455 
456 	/* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl
457 	 * ring processing to a workqueue.  Ctrl ring processing is minimal, so SQPOLL is fine.
458 	 * All the commands sent via the control uring for a ublk device are executed one by one,
459 	 * so ublks_max * 2 uring entries are enough.
460 	 */
461 	rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring,
462 			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
463 	if (rc < 0) {
464 		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
465 		goto err;
466 	}
467 
468 	rc = ublk_ctrl_cmd_get_features();
469 	if (rc) {
470 		goto err;
471 	}
472 
473 	return 0;
474 
475 err:
476 	close(g_ublk_tgt.ctrl_fd);
477 	g_ublk_tgt.ctrl_fd = -1;
478 	return rc;
479 }
480 
481 static int
482 ublk_parse_core_mask(const char *mask)
483 {
484 	struct spdk_cpuset tmp_mask;
485 	int rc;
486 
487 	if (mask == NULL) {
488 		spdk_env_get_cpuset(&g_core_mask);
489 		return 0;
490 	}
491 
492 	rc = spdk_cpuset_parse(&g_core_mask, mask);
493 	if (rc < 0) {
494 		SPDK_ERRLOG("invalid cpumask %s\n", mask);
495 		return -EINVAL;
496 	}
497 
498 	if (spdk_cpuset_count(&g_core_mask) == 0) {
499 		SPDK_ERRLOG("no cpus specified\n");
500 		return -EINVAL;
501 	}
502 
503 	spdk_env_get_cpuset(&tmp_mask);
504 	spdk_cpuset_and(&tmp_mask, &g_core_mask);
505 
506 	if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) {
507 		SPDK_ERRLOG("one of the selected cpus is outside of the core mask (=%s)\n",
508 			    spdk_cpuset_fmt(&g_core_mask));
509 		return -EINVAL;
510 	}
511 
512 	return 0;
513 }
514 
515 static void
516 ublk_poller_register(void *args)
517 {
518 	struct ublk_poll_group *poll_group = args;
519 	int rc;
520 
521 	assert(spdk_get_thread() == poll_group->ublk_thread);
522 	/* Bind the ublk spdk_thread to the current CPU core to avoid thread context switches
523 	 * during uring processing, as required by the ublk kernel driver.
524 	 */
525 	spdk_thread_bind(spdk_get_thread(), true);
526 
527 	TAILQ_INIT(&poll_group->queue_list);
528 	poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0);
529 	rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk",
530 				     UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE);
531 	if (rc != 0) {
532 		assert(false);
533 	}
534 }
535 
536 int
537 ublk_create_target(const char *cpumask_str)
538 {
539 	int rc;
540 	uint32_t i;
541 	char thread_name[32];
542 	struct ublk_poll_group *poll_group;
543 
544 	if (g_ublk_tgt.active == true) {
545 		SPDK_ERRLOG("UBLK target has already been created\n");
546 		return -EBUSY;
547 	}
548 
549 	rc = ublk_parse_core_mask(cpumask_str);
550 	if (rc != 0) {
551 		return rc;
552 	}
553 
554 	assert(g_ublk_tgt.poll_groups == NULL);
555 	g_ublk_tgt.poll_groups = calloc(spdk_env_get_core_count(), sizeof(*poll_group));
556 	if (!g_ublk_tgt.poll_groups) {
557 		return -ENOMEM;
558 	}
559 
560 	rc = ublk_open();
561 	if (rc != 0) {
562 		SPDK_ERRLOG("Failed to open UBLK, error=%s\n", spdk_strerror(-rc));
563 		free(g_ublk_tgt.poll_groups);
		g_ublk_tgt.poll_groups = NULL;
564 		return rc;
565 	}
566 
567 	spdk_iobuf_register_module("ublk");
568 
569 	SPDK_ENV_FOREACH_CORE(i) {
570 		if (!spdk_cpuset_get_cpu(&g_core_mask, i)) {
571 			continue;
572 		}
573 		snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i);
574 		poll_group = &g_ublk_tgt.poll_groups[g_num_ublk_poll_groups];
575 		poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask);
576 		spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group);
577 		g_num_ublk_poll_groups++;
578 	}
579 
580 	assert(spdk_thread_is_app_thread(NULL));
581 	g_ublk_tgt.active = true;
582 	g_ublk_tgt.ctrl_ops_in_progress = 0;
583 	g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL,
584 				 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US);
585 
586 	SPDK_NOTICELOG("UBLK target created successfully\n");
587 
588 	return 0;
589 }
590 
591 static void
592 _ublk_fini_done(void *args)
593 {
594 	SPDK_DEBUGLOG(ublk, "\n");
595 
596 	g_num_ublk_poll_groups = 0;
597 	g_next_ublk_poll_group = 0;
598 	g_ublk_tgt.is_destroying = false;
599 	g_ublk_tgt.active = false;
600 	g_ublk_tgt.features = 0;
601 	g_ublk_tgt.ioctl_encode = false;
602 
603 	if (g_ublk_tgt.cb_fn) {
604 		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
605 		g_ublk_tgt.cb_fn = NULL;
606 		g_ublk_tgt.cb_arg = NULL;
607 	}
608 
609 	if (g_ublk_tgt.poll_groups) {
610 		free(g_ublk_tgt.poll_groups);
611 		g_ublk_tgt.poll_groups = NULL;
612 	}
613 
614 }
615 
616 static void
617 ublk_thread_exit(void *args)
618 {
619 	struct spdk_thread *ublk_thread = spdk_get_thread();
620 	uint32_t i;
621 
622 	for (i = 0; i < g_num_ublk_poll_groups; i++) {
623 		if (g_ublk_tgt.poll_groups[i].ublk_thread == ublk_thread) {
624 			spdk_poller_unregister(&g_ublk_tgt.poll_groups[i].ublk_poller);
625 			spdk_iobuf_channel_fini(&g_ublk_tgt.poll_groups[i].iobuf_ch);
626 			spdk_thread_bind(ublk_thread, false);
627 			spdk_thread_exit(ublk_thread);
628 		}
629 	}
630 }
631 
632 static int
633 ublk_close_dev(struct spdk_ublk_dev *ublk)
634 {
635 	int rc;
636 
637 	/* set is_closing */
638 	if (ublk->is_closing) {
639 		return -EBUSY;
640 	}
641 	ublk->is_closing = true;
642 
643 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_STOP_DEV);
644 	if (rc < 0) {
645 		SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id);
646 	}
647 	return rc;
648 }
649 
650 static void
651 _ublk_fini(void *args)
652 {
653 	struct spdk_ublk_dev	*ublk, *ublk_tmp;
654 
655 	TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) {
656 		ublk_close_dev(ublk);
657 	}
658 
659 	/* Check if all ublks closed */
660 	if (TAILQ_EMPTY(&g_ublk_devs)) {
661 		SPDK_DEBUGLOG(ublk, "finish shutdown\n");
662 		spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
663 		if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) {
664 			io_uring_queue_exit(&g_ublk_tgt.ctrl_ring);
665 			g_ublk_tgt.ctrl_ring.ring_fd = -1;
666 		}
667 		if (g_ublk_tgt.ctrl_fd >= 0) {
668 			close(g_ublk_tgt.ctrl_fd);
669 			g_ublk_tgt.ctrl_fd = -1;
670 		}
671 		spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done);
672 	} else {
673 		spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL);
674 	}
675 }
676 
677 int
678 spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg)
679 {
680 	assert(spdk_thread_is_app_thread(NULL));
681 
682 	if (g_ublk_tgt.is_destroying == true) {
683 		/* UBLK target is being destroyed */
684 		return -EBUSY;
685 	}
686 	g_ublk_tgt.cb_fn = cb_fn;
687 	g_ublk_tgt.cb_arg = cb_arg;
688 	g_ublk_tgt.is_destroying = true;
689 	_ublk_fini(NULL);
690 
691 	return 0;
692 }
693 
694 int
695 ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg)
696 {
697 	int rc;
698 
699 	if (g_ublk_tgt.active == false) {
700 		/* UBLK target has not been created */
701 		return -ENOENT;
702 	}
703 
704 	rc = spdk_ublk_fini(cb_fn, cb_arg);
705 
706 	return rc;
707 }
708 
709 struct spdk_ublk_dev *
710 ublk_dev_find_by_id(uint32_t ublk_id)
711 {
712 	struct spdk_ublk_dev *ublk;
713 
714 	/* Check whether a ublk device with this ID has already been registered. */
715 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
716 		if (ublk->ublk_id == ublk_id) {
717 			return ublk;
718 		}
719 	}
720 
721 	return NULL;
722 }
723 
724 uint32_t
725 ublk_dev_get_id(struct spdk_ublk_dev *ublk)
726 {
727 	return ublk->ublk_id;
728 }
729 
730 struct spdk_ublk_dev *ublk_dev_first(void)
731 {
732 	return TAILQ_FIRST(&g_ublk_devs);
733 }
734 
735 struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev)
736 {
737 	return TAILQ_NEXT(prev, tailq);
738 }
739 
740 uint32_t
741 ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk)
742 {
743 	return ublk->queue_depth;
744 }
745 
746 uint32_t
747 ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk)
748 {
749 	return ublk->num_queues;
750 }
751 
752 const char *
753 ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk)
754 {
755 	return spdk_bdev_get_name(ublk->bdev);
756 }
757 
758 void
759 spdk_ublk_write_config_json(struct spdk_json_write_ctx *w)
760 {
761 	struct spdk_ublk_dev *ublk;
762 
763 	spdk_json_write_array_begin(w);
764 
765 	if (g_ublk_tgt.active) {
766 		spdk_json_write_object_begin(w);
767 
768 		spdk_json_write_named_string(w, "method", "ublk_create_target");
769 		spdk_json_write_named_object_begin(w, "params");
770 		spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask));
771 		spdk_json_write_object_end(w);
772 
773 		spdk_json_write_object_end(w);
774 	}
775 
776 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
777 		spdk_json_write_object_begin(w);
778 
779 		spdk_json_write_named_string(w, "method", "ublk_start_disk");
780 
781 		spdk_json_write_named_object_begin(w, "params");
782 		spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk));
783 		spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id);
784 		spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues);
785 		spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth);
786 		spdk_json_write_object_end(w);
787 
788 		spdk_json_write_object_end(w);
789 	}
790 
791 	spdk_json_write_array_end(w);
792 }
793 
794 static void
795 ublk_dev_list_register(struct spdk_ublk_dev *ublk)
796 {
797 	UBLK_DEBUGLOG(ublk, "add to tailq\n");
798 	TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq);
799 	g_ublk_tgt.num_ublk_devs++;
800 }
801 
802 static void
803 ublk_dev_list_unregister(struct spdk_ublk_dev *ublk)
804 {
805 	/*
806 	 * The ublk device may be stopped before it is registered,
807 	 * so check whether it was registered first.
808 	 */
809 
810 	if (ublk_dev_find_by_id(ublk->ublk_id)) {
811 		UBLK_DEBUGLOG(ublk, "remove from tailq\n");
812 		TAILQ_REMOVE(&g_ublk_devs, ublk, tailq);
813 		assert(g_ublk_tgt.num_ublk_devs);
814 		g_ublk_tgt.num_ublk_devs--;
815 		return;
816 	}
817 
818 	UBLK_DEBUGLOG(ublk, "not found in tailq\n");
819 	assert(false);
820 }
821 
822 static void
823 ublk_delete_dev(void *arg)
824 {
825 	struct spdk_ublk_dev *ublk = arg;
826 	int rc = 0;
827 	uint32_t q_idx;
828 
829 	assert(spdk_thread_is_app_thread(NULL));
830 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
831 		ublk_dev_queue_fini(&ublk->queues[q_idx]);
832 	}
833 
834 	if (ublk->cdev_fd >= 0) {
835 		close(ublk->cdev_fd);
836 	}
837 
838 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_DEL_DEV);
839 	if (rc < 0) {
840 		SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id);
841 	}
842 }
843 
844 static int
845 _ublk_close_dev_retry(void *arg)
846 {
847 	struct spdk_ublk_dev *ublk = arg;
848 
849 	if (ublk->ctrl_ops_in_progress > 0) {
850 		if (ublk->retry_count-- > 0) {
851 			return SPDK_POLLER_BUSY;
852 		}
853 		SPDK_ERRLOG("Timeout on ctrl op completion.\n");
854 	}
855 	spdk_poller_unregister(&ublk->retry_poller);
856 	ublk_delete_dev(ublk);
857 	return SPDK_POLLER_BUSY;
858 }
859 
860 static void
861 ublk_try_close_dev(void *arg)
862 {
863 	struct spdk_ublk_dev *ublk = arg;
864 
865 	assert(spdk_thread_is_app_thread(NULL));
866 	ublk->queues_closed += 1;
867 	if (ublk->queues_closed < ublk->num_queues) {
868 		return;
869 	}
870 
871 	if (ublk->ctrl_ops_in_progress > 0) {
872 		assert(ublk->retry_poller == NULL);
873 		ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US;
874 		ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk,
875 				     UBLK_BUSY_POLLING_INTERVAL_US);
876 	} else {
877 		ublk_delete_dev(ublk);
878 	}
879 }
880 
881 static void
882 ublk_try_close_queue(struct ublk_queue *q)
883 {
884 	struct spdk_ublk_dev *ublk = q->dev;
885 
886 	/* Don't close the queue until no I/O submitted to the bdev is still in flight,
887 	 * no I/O is waiting to commit its result, and all I/Os have been aborted back.
888 	 */
889 	if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) {
890 		/* wait for next retry */
891 		return;
892 	}
893 
894 	TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq);
895 	spdk_put_io_channel(q->bdev_ch);
896 	q->bdev_ch = NULL;
897 
898 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk);
899 }
900 
901 int
902 ublk_stop_disk(uint32_t ublk_id, ublk_del_cb del_cb, void *cb_arg)
903 {
904 	struct spdk_ublk_dev *ublk;
905 
906 	assert(spdk_thread_is_app_thread(NULL));
907 
908 	ublk = ublk_dev_find_by_id(ublk_id);
909 	if (ublk == NULL) {
910 		SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id);
911 		return -ENODEV;
912 	}
913 	if (ublk->is_closing) {
914 		SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id);
915 		return -EBUSY;
916 	}
917 
918 	ublk->del_cb = del_cb;
919 	ublk->cb_arg = cb_arg;
920 	return ublk_close_dev(ublk);
921 }
922 
923 static inline void
924 ublk_mark_io_get_data(struct ublk_io *io)
925 {
926 	io->cmd_op = UBLK_IO_NEED_GET_DATA;
927 	io->result = 0;
928 }
929 
930 static inline void
931 ublk_mark_io_done(struct ublk_io *io, int res)
932 {
933 	/*
934 	 * mark io done by target, so that SPDK can commit its
935 	 * result and fetch new request via io_uring command.
936 	 */
937 	io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
938 	io->result = res;
939 }
940 
941 static void
942 ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
943 {
944 	struct ublk_io	*io = cb_arg;
945 	struct ublk_queue *q = io->q;
946 	int res, tag;
947 
948 	if (success) {
949 		res = io->result;
950 	} else {
951 		res = -EIO;
952 	}
953 
954 	ublk_mark_io_done(io, res);
955 	tag = (int)(io - q->ios);
956 	q->ios[tag].need_data = false;
957 
958 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n",
959 		      q->q_id, tag, res);
960 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
961 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
962 
963 	if (bdev_io != NULL) {
964 		spdk_bdev_free_io(bdev_io);
965 	}
966 }
967 
968 static void
969 ublk_resubmit_io(void *arg)
970 {
971 	struct ublk_io *io = (struct ublk_io *)arg;
972 	uint16_t tag = (io - io->q->ios);
973 
974 	ublk_submit_bdev_io(io->q, tag);
975 }
976 
977 static void
978 ublk_queue_io(struct ublk_io *io)
979 {
980 	int rc;
981 	struct spdk_bdev *bdev = io->q->dev->bdev;
982 	struct ublk_queue *q = io->q;
983 
984 	io->bdev_io_wait.bdev = bdev;
985 	io->bdev_io_wait.cb_fn = ublk_resubmit_io;
986 	io->bdev_io_wait.cb_arg = io;
987 
988 	rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait);
989 	if (rc != 0) {
990 		SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc);
991 		ublk_io_done(NULL, false, io);
992 	}
993 }
994 
995 static void
996 ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf)
997 {
998 	struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf);
999 
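	/* Keep the raw iobuf entry for later release and hand out a 4 KiB-aligned payload pointer. */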
1000 	io->mpool_entry = buf;
1001 	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
1002 	io->get_buf_cb(io);
1003 }
1004 
1005 static void
1006 ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch,
1007 		   ublk_get_buf_cb get_buf_cb)
1008 {
1009 	uint64_t io_size;
1010 	void *buf;
1011 
1012 	io_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1013 	io->get_buf_cb = get_buf_cb;
1014 	buf = spdk_iobuf_get(iobuf_ch, io_size, &io->iobuf, ublk_io_get_buffer_cb);
1015 	if (buf != NULL) {
1016 		ublk_io_get_buffer_cb(&io->iobuf, buf);
1017 	}
1018 }
1019 
1020 static void
1021 ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch)
1022 {
1023 	uint64_t io_size;
1024 
1025 	if (io->payload) {
1026 		io_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1027 		spdk_iobuf_put(iobuf_ch, io->mpool_entry, io_size);
1028 		io->mpool_entry = NULL;
1029 		io->payload = NULL;
1030 	}
1031 }
1032 
1033 static void
1034 read_get_buffer_done(struct ublk_io *io)
1035 {
1036 	struct spdk_bdev_desc *desc = io->bdev_desc;
1037 	struct spdk_io_channel *ch = io->bdev_ch;
1038 	uint64_t offset_blocks, num_blocks;
1039 	int rc = 0;
1040 	const struct ublksrv_io_desc *iod = io->iod;
1041 
1042 	offset_blocks = iod->start_sector >> io->sector_per_block_shift;
1043 	num_blocks = iod->nr_sectors >> io->sector_per_block_shift;
1044 
1045 	rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
1046 	if (rc == -ENOMEM) {
1047 		SPDK_INFOLOG(ublk, "No memory, queueing io.\n");
1048 		ublk_queue_io(io);
1049 	} else if (rc < 0) {
1050 		SPDK_ERRLOG("ublk read io failed, rc=%d.\n", rc);
1051 		ublk_io_done(NULL, false, io);
1052 	}
1053 }
1054 
1055 static void
1056 ublk_submit_bdev_io(struct ublk_queue *q, uint16_t tag)
1057 {
1058 	struct spdk_ublk_dev *ublk = q->dev;
1059 	struct ublk_io *io = &q->ios[tag];
1060 	struct spdk_bdev_desc *desc = io->bdev_desc;
1061 	struct spdk_io_channel *ch = io->bdev_ch;
1062 	struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch;
1063 	uint64_t offset_blocks, num_blocks;
1064 	uint8_t ublk_op;
1065 	int rc = 0;
1066 	const struct ublksrv_io_desc *iod = io->iod;
1067 
1068 	ublk_op = ublksrv_get_op(iod);
1069 	offset_blocks = iod->start_sector >> ublk->sector_per_block_shift;
1070 	num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift;
1071 
1072 	io->result = num_blocks * spdk_bdev_get_data_block_size(ublk->bdev);
1073 	switch (ublk_op) {
1074 	case UBLK_IO_OP_READ:
1075 		ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done);
1076 		return;
1077 	case UBLK_IO_OP_WRITE:
1078 		assert((void *)iod->addr == io->payload);
1079 		rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
1080 		break;
1081 	case UBLK_IO_OP_FLUSH:
1082 		rc = spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1083 		break;
1084 	case UBLK_IO_OP_DISCARD:
1085 		rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1086 		break;
1087 	case UBLK_IO_OP_WRITE_ZEROES:
1088 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1089 		break;
1090 	default:
1091 		rc = -1;
1092 	}
1093 
1094 	if (rc < 0) {
1095 		if (rc == -ENOMEM) {
1096 			SPDK_INFOLOG(ublk, "No memory, queueing io.\n");
1097 			ublk_queue_io(io);
1098 		} else {
1099 			SPDK_ERRLOG("ublk io submission failed, rc=%d.\n", rc);
1100 			ublk_io_done(NULL, false, io);
1101 		}
1102 	}
1103 }
1104 
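/*
 * Prepare one ublk io command (FETCH_REQ, NEED_GET_DATA or COMMIT_AND_FETCH_REQ)
 * on the queue's io_uring.  Only the SQE is filled in here; the actual
 * io_uring_submit() is done by the callers (ublk_io_xmit()/ublk_dev_queue_io_init()).
 */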
1105 static inline void
1106 ublksrv_queue_io_cmd(struct ublk_queue *q,
1107 		     struct ublk_io *io, unsigned tag)
1108 {
1109 	struct ublksrv_io_cmd *cmd;
1110 	struct io_uring_sqe *sqe;
1111 	unsigned int cmd_op = 0;
1112 	uint64_t user_data;
1113 
1114 	/* Each io must have a pending fetch, get-data, or commit operation */
1115 	assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) ||
1116 	       (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ));
1117 	cmd_op = io->cmd_op;
1118 
1119 	sqe = io_uring_get_sqe(&q->ring);
1120 	assert(sqe);
1121 
1122 	cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe);
1123 	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) {
1124 		cmd->result = io->result;
1125 	}
1126 
1127 	/* These fields should be written once, never change */
1128 	ublk_set_sqe_cmd_op(sqe, cmd_op);
1129 	/* Fixed file index 0 refers to dev->cdev_fd, registered in ublk_dev_queue_init() */
1130 	sqe->fd		= 0;
1131 	sqe->opcode	= IORING_OP_URING_CMD;
1132 	sqe->flags	= IOSQE_FIXED_FILE;
1133 	sqe->rw_flags	= 0;
1134 	cmd->tag	= tag;
1135 	cmd->addr	= (__u64)(uintptr_t)(io->payload);
1136 	cmd->q_id	= q->q_id;
1137 
1138 	user_data = build_user_data(tag, cmd_op);
1139 	io_uring_sqe_set_data64(sqe, user_data);
1140 
1141 	io->cmd_op = 0;
1142 	q->cmd_inflight += 1;
1143 
1144 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n",
1145 		      q->q_id, tag, cmd_op,
1146 		      io->cmd_op, q->is_stopping);
1147 }
1148 
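/*
 * Flush all completed ios on this queue back to the ublk driver as
 * COMMIT_AND_FETCH_REQ/NEED_GET_DATA commands with a single io_uring_submit().
 */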
1149 static int
1150 ublk_io_xmit(struct ublk_queue *q)
1151 {
1152 	TAILQ_HEAD(, ublk_io) buffer_free_list;
1153 	struct spdk_iobuf_channel *iobuf_ch;
1154 	int rc = 0, count = 0, tag;
1155 	struct ublk_io *io;
1156 
1157 	if (TAILQ_EMPTY(&q->completed_io_list)) {
1158 		return 0;
1159 	}
1160 
1161 	TAILQ_INIT(&buffer_free_list);
1162 	while (!TAILQ_EMPTY(&q->completed_io_list)) {
1163 		io = TAILQ_FIRST(&q->completed_io_list);
1164 		assert(io != NULL);
1165 		tag = io - io->q->ios;
1166 		/*
1167 		 * Remove IO from list now assuming it will be completed. It will be inserted
1168 		 * back to the head if it cannot be completed. This approach is specifically
1169 		 * taken to work around a scan-build use-after-free mischaracterization.
1170 		 */
1171 		TAILQ_REMOVE(&q->completed_io_list, io, tailq);
1172 		if (!io->need_data) {
1173 			TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq);
1174 		}
1175 		ublksrv_queue_io_cmd(q, io, tag);
1176 		count++;
1177 	}
1178 
1179 	rc = io_uring_submit(&q->ring);
1180 	if (rc != count) {
1181 		SPDK_ERRLOG("could not submit all commands\n");
1182 		assert(false);
1183 	}
1184 
1185 	/* Note: for READ io, ublk will always copy the data out of
1186 	 * the buffers in the io_uring_submit context.  Since we
1187 	 * are not using SQPOLL for IO rings, we can safely free
1188 	 * those IO buffers here.  This design doesn't seem ideal,
1189 	 * but it's what's possible since there is no discrete
1190 	 * COMMIT_REQ operation.  That will need to change in the
1191 	 * future should we ever want to support async copy
1192 	 * operations.
1193 	 */
1194 	iobuf_ch = &q->poll_group->iobuf_ch;
1195 	while (!TAILQ_EMPTY(&buffer_free_list)) {
1196 		io = TAILQ_FIRST(&buffer_free_list);
1197 		TAILQ_REMOVE(&buffer_free_list, io, tailq);
1198 		ublk_io_put_buffer(io, iobuf_ch);
1199 	}
1200 	return rc;
1201 }
1202 
1203 static void
1204 write_get_buffer_done(struct ublk_io *io)
1205 {
1206 	io->need_data = true;
1207 	ublk_mark_io_get_data(io);
1208 	TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq);
1209 	TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq);
1210 }
1211 
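/*
 * Reap io command completions from the ublk driver: UBLK_IO_RES_OK carries a
 * new request to submit to the bdev, UBLK_IO_RES_NEED_GET_DATA asks us to
 * provide a data buffer for an incoming write, and UBLK_IO_RES_ABORT means
 * the queue is being stopped.
 */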
1212 static int
1213 ublk_io_recv(struct ublk_queue *q)
1214 {
1215 	struct io_uring_cqe *cqe;
1216 	unsigned head, tag;
1217 	int fetch, count = 0;
1218 	struct ublk_io *io;
1219 	struct spdk_iobuf_channel *iobuf_ch;
1220 
1221 	if (q->cmd_inflight == 0) {
1222 		return 0;
1223 	}
1224 
1225 	iobuf_ch = &q->poll_group->iobuf_ch;
1226 	io_uring_for_each_cqe(&q->ring, head, cqe) {
1227 		tag = user_data_to_tag(cqe->user_data);
1228 		fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping;
1229 
1230 		SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u cmd_op %u\n",
1231 			      cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data));
1232 
1233 		q->cmd_inflight--;
1234 		io = &q->ios[tag];
1235 
1236 		if (!fetch) {
1237 			q->is_stopping = true;
1238 			if (io->cmd_op == UBLK_IO_FETCH_REQ) {
1239 				io->cmd_op = 0;
1240 			}
1241 		}
1242 
1243 		TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq);
1244 		if (cqe->res == UBLK_IO_RES_OK) {
1245 			ublk_submit_bdev_io(q, tag);
1246 		} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
1247 			ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done);
1248 		} else {
1249 			if (cqe->res != UBLK_IO_RES_ABORT) {
1250 				SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
1251 					    cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data));
1252 			}
1253 			TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1254 		}
1255 		count += 1;
1256 		if (count == UBLK_QUEUE_REQUEST) {
1257 			break;
1258 		}
1259 	}
1260 	io_uring_cq_advance(&q->ring, count);
1261 
1262 	return count;
1263 }
1264 
1265 static int
1266 ublk_poll(void *arg)
1267 {
1268 	struct ublk_poll_group *poll_group = arg;
1269 	struct ublk_queue *q, *q_tmp;
1270 	int sent, received, count = 0;
1271 
1272 	TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) {
1273 		sent = ublk_io_xmit(q);
1274 		received = ublk_io_recv(q);
1275 		if (spdk_unlikely(q->is_stopping)) {
1276 			ublk_try_close_queue(q);
1277 		}
1278 		count += sent + received;
1279 	}
1280 	if (count > 0) {
1281 		return SPDK_POLLER_BUSY;
1282 	} else {
1283 		return SPDK_POLLER_IDLE;
1284 	}
1285 }
1286 
1287 static void
1288 ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk)
1289 {
1290 	ublk_close_dev(ublk);
1291 }
1292 
1293 static void
1294 ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1295 		   void *event_ctx)
1296 {
1297 	switch (type) {
1298 	case SPDK_BDEV_EVENT_REMOVE:
1299 		ublk_bdev_hot_remove(event_ctx);
1300 		break;
1301 	default:
1302 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1303 		break;
1304 	}
1305 }
1306 
1307 static void
1308 ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth)
1309 {
1310 	struct io_uring_sqe *sqe;
1311 	uint32_t i;
1312 
1313 	for (i = 0; i < q_depth; i++) {
1314 		sqe = ublk_uring_get_sqe(r, i);
1315 
1316 		/* These fields should be written once, never change */
1317 		sqe->flags = IOSQE_FIXED_FILE;
1318 		sqe->rw_flags = 0;
1319 		sqe->ioprio = 0;
1320 		sqe->off = 0;
1321 	}
1322 }
1323 
1324 static int
1325 ublk_dev_queue_init(struct ublk_queue *q)
1326 {
1327 	int rc = 0, cmd_buf_size;
1328 	uint32_t j;
1329 	struct spdk_ublk_dev *ublk = q->dev;
1330 	unsigned long off;
1331 
1332 	cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth);
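	/*
	 * Per the ublk_drv mmap layout, each queue's descriptor area is spaced by
	 * UBLK_MAX_QUEUE_DEPTH descriptors regardless of the actual queue depth.
	 */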
1333 	off = UBLKSRV_CMD_BUF_OFFSET +
1334 	      q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
1335 	q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ,
1336 			MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off);
1337 	if (q->io_cmd_buf == MAP_FAILED) {
1338 		q->io_cmd_buf = NULL;
1339 		rc = -errno;
1340 		SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc));
1341 		goto err;
1342 	}
1343 
1344 	for (j = 0; j < q->q_depth; j++) {
1345 		q->ios[j].cmd_op = UBLK_IO_FETCH_REQ;
1346 		q->ios[j].iod = &q->io_cmd_buf[j];
1347 	}
1348 
1349 	rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128);
1350 	if (rc < 0) {
1351 		SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc));
1352 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1353 		q->io_cmd_buf = NULL;
1354 		goto err;
1355 	}
1356 
1357 	rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1);
1358 	if (rc != 0) {
1359 		SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc));
1360 		io_uring_queue_exit(&q->ring);
1361 		q->ring.ring_fd = -1;
1362 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1363 		q->io_cmd_buf = NULL;
1364 		goto err;
1365 	}
1366 
1367 	ublk_dev_init_io_cmds(&q->ring, q->q_depth);
1368 
1369 	return 0;
1370 err:
1371 	return rc;
1372 }
1373 
1374 static void
1375 ublk_dev_queue_fini(struct ublk_queue *q)
1376 {
1377 	if (q->ring.ring_fd >= 0) {
1378 		io_uring_unregister_files(&q->ring);
1379 		io_uring_queue_exit(&q->ring);
1380 		q->ring.ring_fd = -1;
1381 	}
1382 	if (q->io_cmd_buf) {
1383 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1384 	}
1385 }
1386 
1387 static void
1388 ublk_dev_queue_io_init(struct ublk_queue *q)
1389 {
1390 	struct ublk_io *io;
1391 	uint32_t i;
1392 	int rc __attribute__((unused));
1393 	void *buf;
1394 
1395 	/* Some older kernels require a buffer to get posted, even
1396 	 * when NEED_GET_DATA has been specified.  So allocate a
1397 	 * temporary buffer, only for purposes of this workaround.
1398 	 * It never actually gets used, so we will free it immediately
1399 	 * after all of the commands are posted.
1400 	 */
1401 	buf = malloc(64);
1402 
1403 	assert(q->bdev_ch != NULL);
1404 
1405 	/* Initialize and submit all io commands to ublk driver */
1406 	for (i = 0; i < q->q_depth; i++) {
1407 		io = &q->ios[i];
1408 		io->payload = buf;
1409 		io->bdev_ch = q->bdev_ch;
1410 		io->bdev_desc = q->dev->bdev_desc;
1411 		io->sector_per_block_shift = q->dev->sector_per_block_shift;
1412 		ublksrv_queue_io_cmd(q, io, i);
1413 	}
1414 
1415 	rc = io_uring_submit(&q->ring);
1416 	assert(rc == (int)q->q_depth);
1417 	for (i = 0; i < q->q_depth; i++) {
1418 		io = &q->ios[i];
1419 		io->payload = NULL;
1420 	}
1421 	free(buf);
1422 }
1423 
1424 static void
1425 ublk_set_params(struct spdk_ublk_dev *ublk)
1426 {
1427 	int rc;
1428 
1429 	ublk->dev_params.len = sizeof(struct ublk_params);
1430 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_SET_PARAMS);
1431 	if (rc < 0) {
1432 		SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1433 		ublk_delete_dev(ublk);
1434 		if (ublk->start_cb) {
1435 			ublk->start_cb(ublk->cb_arg, rc);
1436 			ublk->start_cb = NULL;
1437 		}
1438 	}
1439 }
1440 
1441 /* Set ublk device parameters based on bdev */
1442 static void
1443 ublk_info_param_init(struct spdk_ublk_dev *ublk)
1444 {
1445 	struct spdk_bdev *bdev = ublk->bdev;
1446 	uint32_t blk_size = spdk_bdev_get_data_block_size(bdev);
1447 	uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev);
1448 	uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev);
1449 	uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev);
1450 	uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT;
1451 	uint32_t io_min_size = blk_size;
1452 	uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size);
1453 
1454 	struct ublksrv_ctrl_dev_info uinfo = {
1455 		.queue_depth = ublk->queue_depth,
1456 		.nr_hw_queues = ublk->num_queues,
1457 		.dev_id = ublk->ublk_id,
1458 		.max_io_buf_bytes = UBLK_IO_MAX_BYTES,
1459 		.ublksrv_pid = getpid(),
1460 		.flags = UBLK_F_NEED_GET_DATA | UBLK_F_URING_CMD_COMP_IN_TASK,
1461 	};
1462 	struct ublk_params uparams = {
1463 		.types = UBLK_PARAM_TYPE_BASIC,
1464 		.basic = {
1465 			.logical_bs_shift = spdk_u32log2(blk_size),
1466 			.physical_bs_shift = spdk_u32log2(pblk_size),
1467 			.io_min_shift = spdk_u32log2(io_min_size),
1468 			.io_opt_shift = spdk_u32log2(io_opt_size),
1469 			.dev_sectors = num_blocks * sectors_per_block,
1470 			.max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT,
1471 		}
1472 	};
1473 
1474 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1475 		uparams.types |= UBLK_PARAM_TYPE_DISCARD;
1476 		uparams.discard.discard_alignment = sectors_per_block;
1477 		uparams.discard.max_discard_sectors = num_blocks * sectors_per_block;
1478 		uparams.discard.max_discard_segments = 1;
1479 		uparams.discard.discard_granularity = blk_size;
1480 		if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1481 			uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block;
1482 		}
1483 	}
1484 
1485 	ublk->dev_info = uinfo;
1486 	ublk->dev_params = uparams;
1487 }
1488 
1489 static void
1490 _ublk_free_dev(void *arg)
1491 {
1492 	struct spdk_ublk_dev *ublk = arg;
1493 
1494 	ublk_free_dev(ublk);
1495 }
1496 
1497 static void
1498 free_buffers(void *arg)
1499 {
1500 	struct ublk_queue *q = arg;
1501 	uint32_t i;
1502 
1503 	for (i = 0; i < q->q_depth; i++) {
1504 		ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch);
1505 	}
1506 	free(q->ios);
1507 	q->ios = NULL;
1508 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev);
1509 }
1510 
1511 static void
1512 ublk_free_dev(struct spdk_ublk_dev *ublk)
1513 {
1514 	struct ublk_queue *q;
1515 	uint32_t q_idx;
1516 
1517 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
1518 		q = &ublk->queues[q_idx];
1519 
1520 		/* The ublk_io array of this queue was not initialized. */
1521 		if (q->ios == NULL) {
1522 			continue;
1523 		}
1524 
1525 		/* We found a queue that has an ios array that may have buffers
1526 		 * that need to be freed.  Send a message to the queue's thread
1527 		 * so it can free the buffers back to that thread's iobuf channel.
1528 		 * When it's done, it will set q->ios to NULL and send a message
1529 		 * back to this function to continue.
1530 		 */
1531 		if (q->poll_group) {
1532 			spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q);
1533 			return;
1534 		} else {
1535 			free(q->ios);
1536 			q->ios = NULL;
1537 		}
1538 	}
1539 
1540 	/* All of the buffers associated with the queues have been freed, so now
1541 	 * continue with releasing resources for the rest of the ublk device.
1542 	 */
1543 	if (ublk->bdev_desc) {
1544 		spdk_bdev_close(ublk->bdev_desc);
1545 		ublk->bdev_desc = NULL;
1546 	}
1547 
1548 	ublk_dev_list_unregister(ublk);
1549 
1550 	if (ublk->del_cb) {
1551 		ublk->del_cb(ublk->cb_arg);
1552 	}
1553 	SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id);
1554 	free(ublk);
1555 }
1556 
1557 static int
1558 ublk_ios_init(struct spdk_ublk_dev *ublk)
1559 {
1560 	int rc;
1561 	uint32_t i, j;
1562 	struct ublk_queue *q;
1563 
1564 	for (i = 0; i < ublk->num_queues; i++) {
1565 		q = &ublk->queues[i];
1566 
1567 		TAILQ_INIT(&q->completed_io_list);
1568 		TAILQ_INIT(&q->inflight_io_list);
1569 		q->dev = ublk;
1570 		q->q_id = i;
1571 		q->q_depth = ublk->queue_depth;
1572 		q->ios = calloc(q->q_depth, sizeof(struct ublk_io));
1573 		if (!q->ios) {
1574 			rc = -ENOMEM;
1575 			SPDK_ERRLOG("could not allocate queue ios\n");
1576 			goto err;
1577 		}
1578 		for (j = 0; j < q->q_depth; j++) {
1579 			q->ios[j].q = q;
1580 		}
1581 	}
1582 
1583 	return 0;
1584 
1585 err:
1586 	for (i = 0; i < ublk->num_queues; i++) {
1587 		free(ublk->queues[i].ios);
1588 		ublk->queues[i].ios = NULL;
1589 	}
1590 	return rc;
1591 }
1592 
1593 static void
1594 ublk_queue_run(void *arg1)
1595 {
1596 	struct ublk_queue	*q = arg1;
1597 	struct spdk_ublk_dev *ublk = q->dev;
1598 	struct ublk_poll_group *poll_group = q->poll_group;
1599 
1600 	assert(spdk_get_thread() == poll_group->ublk_thread);
1601 	q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc);
1602 	/* Queues must be filled with IO commands on their poll group's thread */
1603 	ublk_dev_queue_io_init(q);
1604 
1605 	TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq);
1606 }
1607 
1608 int
1609 ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
1610 		uint32_t num_queues, uint32_t queue_depth,
1611 		ublk_start_cb start_cb, void *cb_arg)
1612 {
1613 	int			rc;
1614 	uint32_t		i;
1615 	struct spdk_bdev	*bdev;
1616 	struct spdk_ublk_dev	*ublk = NULL;
1617 	uint32_t		sector_per_block;
1618 
1619 	assert(spdk_thread_is_app_thread(NULL));
1620 
1621 	if (g_ublk_tgt.active == false) {
1622 		SPDK_ERRLOG("No ublk target exists\n");
1623 		return -ENODEV;
1624 	}
1625 
1626 	ublk = ublk_dev_find_by_id(ublk_id);
1627 	if (ublk != NULL) {
1628 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
1629 		return -EBUSY;
1630 	}
1631 
1632 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
1633 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
1634 		return -ENOTSUP;
1635 	}
1636 
1637 	ublk = calloc(1, sizeof(*ublk));
1638 	if (ublk == NULL) {
1639 		return -ENOMEM;
1640 	}
1641 	ublk->start_cb = start_cb;
1642 	ublk->cb_arg = cb_arg;
1643 	ublk->cdev_fd = -1;
1644 	ublk->ublk_id = ublk_id;
1645 	UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n",
1646 		      bdev_name, num_queues, queue_depth);
1647 
1648 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
1649 	if (rc != 0) {
1650 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
1651 		free(ublk);
1652 		return rc;
1653 	}
1654 
1655 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
1656 	ublk->bdev = bdev;
1657 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
1658 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
1659 
1660 	ublk->queues_closed = 0;
1661 	ublk->num_queues = num_queues;
1662 	ublk->queue_depth = queue_depth;
1663 	if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) {
1664 		SPDK_WARNLOG("Reducing queue depth %d of UBLK %d to maximum %d\n",
1665 			     ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH);
1666 		ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH;
1667 	}
1668 	if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) {
1669 		SPDK_WARNLOG("Reducing queue count %d of UBLK %d to maximum %d\n",
1670 			     ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES);
1671 		ublk->num_queues = UBLK_DEV_MAX_QUEUES;
1672 	}
1673 	for (i = 0; i < ublk->num_queues; i++) {
1674 		ublk->queues[i].ring.ring_fd = -1;
1675 	}
1676 
1677 	ublk_info_param_init(ublk);
1678 	rc = ublk_ios_init(ublk);
1679 	if (rc != 0) {
1680 		spdk_bdev_close(ublk->bdev_desc);
1681 		free(ublk);
1682 		return rc;
1683 	}
1684 
1685 	SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n",
1686 		     bdev_name, ublk_id);
1687 
1688 	/* Add ublk_dev to the end of disk list */
1689 	ublk_dev_list_register(ublk);
1690 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_ADD_DEV);
1691 	if (rc < 0) {
1692 		SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1693 		ublk_free_dev(ublk);
1694 	}
1695 
1696 	return rc;
1697 }
1698 
1699 static void
1700 ublk_finish_start(struct spdk_ublk_dev *ublk)
1701 {
1702 	int			rc;
1703 	uint32_t		q_id;
1704 	struct spdk_thread	*ublk_thread;
1705 	char			buf[64];
1706 
1707 	snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id);
1708 	ublk->cdev_fd = open(buf, O_RDWR);
1709 	if (ublk->cdev_fd < 0) {
1710 		rc = -errno;
1711 		SPDK_ERRLOG("can't open %s, rc %d\n", buf, rc);
1712 		goto err;
1713 	}
1714 
1715 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1716 		rc = ublk_dev_queue_init(&ublk->queues[q_id]);
1717 		if (rc) {
1718 			goto err;
1719 		}
1720 	}
1721 
1722 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_START_DEV);
1723 	if (rc < 0) {
1724 		SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id,
1725 			    spdk_strerror(-rc));
1726 		goto err;
1727 	}
1728 
1729 	/* Send queue to different spdk_threads for load balance */
1730 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1731 		ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_groups[g_next_ublk_poll_group];
1732 		ublk_thread = g_ublk_tgt.poll_groups[g_next_ublk_poll_group].ublk_thread;
1733 		spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]);
1734 		g_next_ublk_poll_group++;
1735 		if (g_next_ublk_poll_group == g_num_ublk_poll_groups) {
1736 			g_next_ublk_poll_group = 0;
1737 		}
1738 	}
1739 
1740 	goto out;
1741 
1742 err:
1743 	ublk_delete_dev(ublk);
1744 out:
1745 	if (ublk->start_cb) {
1746 		ublk->start_cb(ublk->cb_arg, rc);
1747 		ublk->start_cb = NULL;
1748 	}
1749 }
1750 
1751 SPDK_LOG_REGISTER_COMPONENT(ublk)
1752 SPDK_LOG_REGISTER_COMPONENT(ublk_io)
1753