/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2022 Intel Corporation.
 *   All rights reserved.
 */

#include <linux/ublk_cmd.h>
#include <liburing.h>

#include "spdk/stdinc.h"
#include "spdk/string.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/queue.h"
#include "spdk/json.h"
#include "spdk/ublk.h"
#include "spdk/thread.h"

#include "ublk_internal.h"

#define UBLK_CTRL_DEV					"/dev/ublk-control"
#define UBLK_BLK_CDEV					"/dev/ublkc"

#define LINUX_SECTOR_SHIFT				9
#define UBLK_POLL_GROUP_MAX				128
#define UBLK_IO_MAX_BYTES				SPDK_BDEV_LARGE_BUF_MAX_SIZE
#define UBLK_DEV_MAX_QUEUES				32
#define UBLK_DEV_MAX_QUEUE_DEPTH			1024
#define UBLK_QUEUE_REQUEST				32
#define UBLK_STOP_BUSY_WAITING_MS			10000
#define UBLK_BUSY_POLLING_INTERVAL_US			20000
#define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US	1000
/* By default, kernel ublk_drv driver can support up to 64 block devices */
#define UBLK_DEFAULT_MAX_SUPPORTED_DEVS			64

#define UBLK_IOBUF_SMALL_CACHE_SIZE			128
#define UBLK_IOBUF_LARGE_CACHE_SIZE			32

#define UBLK_DEBUGLOG(ublk, format, ...) \
	SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__);

static uint32_t g_num_ublk_poll_groups = 0;
static uint32_t g_next_ublk_poll_group = 0;
static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS;
static struct spdk_cpuset g_core_mask;

struct ublk_queue;
struct ublk_poll_group;
static void ublk_submit_bdev_io(struct ublk_queue *q, uint16_t tag);
static void ublk_dev_queue_fini(struct ublk_queue *q);
static int ublk_poll(void *arg);
static int ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op);

typedef void (*ublk_next_state_fn)(struct spdk_ublk_dev *ublk);
static void ublk_set_params(struct spdk_ublk_dev *ublk);
static void ublk_finish_start(struct spdk_ublk_dev *ublk);
static void ublk_free_dev(struct spdk_ublk_dev *ublk);

static const char *ublk_op_name[64]
__attribute__((unused)) = {
	[UBLK_CMD_ADD_DEV] =	"UBLK_CMD_ADD_DEV",
	[UBLK_CMD_DEL_DEV] =	"UBLK_CMD_DEL_DEV",
	[UBLK_CMD_START_DEV] =	"UBLK_CMD_START_DEV",
	[UBLK_CMD_STOP_DEV] =	"UBLK_CMD_STOP_DEV",
	[UBLK_CMD_SET_PARAMS] =	"UBLK_CMD_SET_PARAMS",
};

struct ublk_io;
typedef void (*ublk_get_buf_cb)(struct ublk_io *io);

struct ublk_io {
	void			*payload;
	void			*mpool_entry;
	bool			need_data;
	uint32_t		sector_per_block_shift;
	uint32_t		payload_size;
	uint32_t		cmd_op;
	int32_t			result;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*bdev_ch;
	const struct ublksrv_io_desc	*iod;
	ublk_get_buf_cb		get_buf_cb;
	struct ublk_queue	*q;
	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;
	struct spdk_iobuf_entry	iobuf;

	TAILQ_ENTRY(ublk_io)	tailq;
};

struct ublk_queue {
	uint32_t		q_id;
	uint32_t		q_depth;
	struct ublk_io		*ios;
	TAILQ_HEAD(, ublk_io)	completed_io_list;
	TAILQ_HEAD(, ublk_io)	inflight_io_list;
	uint32_t		cmd_inflight;
	bool			is_stopping;
	struct ublksrv_io_desc	*io_cmd_buf;
	/* ring depth == dev_info->queue_depth. */
	struct io_uring		ring;
	struct spdk_ublk_dev	*dev;
	struct ublk_poll_group	*poll_group;
	struct spdk_io_channel	*bdev_ch;

	TAILQ_ENTRY(ublk_queue)	tailq;
};

struct spdk_ublk_dev {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;

	int			cdev_fd;
	struct ublk_params	dev_params;
	struct ublksrv_ctrl_dev_info	dev_info;

	uint32_t		ublk_id;
	uint32_t		num_queues;
	uint32_t		queue_depth;
	uint32_t		sector_per_block_shift;
	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];

	struct spdk_poller	*retry_poller;
	int			retry_count;
	uint32_t		queues_closed;
	ublk_start_cb		start_cb;
	ublk_del_cb		del_cb;
	void			*cb_arg;
	uint32_t		ctrl_cmd_op;
	ublk_next_state_fn	next_state_fn;
	uint32_t		ctrl_ops_in_progress;
	bool			is_closing;

	TAILQ_ENTRY(spdk_ublk_dev) tailq;
	TAILQ_ENTRY(spdk_ublk_dev) wait_tailq;
};

struct ublk_poll_group {
	struct spdk_thread		*ublk_thread;
	struct spdk_poller		*ublk_poller;
	struct spdk_iobuf_channel	iobuf_ch;
	TAILQ_HEAD(, ublk_queue)	queue_list;
};

struct ublk_tgt {
	int			ctrl_fd;
	bool			active;
	bool			is_destroying;
	spdk_ublk_fini_cb	cb_fn;
	void			*cb_arg;
	struct io_uring		ctrl_ring;
	struct spdk_poller	*ctrl_poller;
	uint32_t		ctrl_ops_in_progress;
	struct ublk_poll_group	poll_group[UBLK_POLL_GROUP_MAX];
	uint32_t		num_ublk_devs;
};

static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs);
static struct ublk_tgt g_ublk_tgt;

/* helpers for using io_uring */
static inline int
ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags)
{
	struct io_uring_params p = {};

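	/* Note: IORING_SETUP_CQSIZE sizes the CQ ring explicitly to `depth`
	 * entries, rather than liburing's default of twice the SQ size.
	 */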
	p.flags = flags | IORING_SETUP_CQSIZE;
	p.cq_entries = depth;

	return io_uring_queue_init_params(depth, r, &p);
}

static inline struct io_uring_sqe *
ublk_uring_get_sqe(struct io_uring *r, uint32_t idx)
{
	/* Need to update the idx since we set IORING_SETUP_SQE128 parameter in ublk_setup_ring */
	return &r->sq.sqes[idx << 1];
}

static inline void *
ublk_get_sqe_cmd(struct io_uring_sqe *sqe)
{
	return (void *)&sqe->addr3;
}

static inline void
ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op)
{
	sqe->off = cmd_op;
}

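/*
 * cqe->user_data layout used by the helpers below:
 *   bits  0..15: I/O tag
 *   bits 16..23: ublk command op
 * e.g. build_user_data(5, UBLK_IO_FETCH_REQ) round-trips through
 * user_data_to_tag() and user_data_to_op().
 */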
static inline uint64_t
build_user_data(uint16_t tag, uint8_t op)
{
	assert(!(tag >> 16) && !(op >> 8));

	return tag | (op << 16);
}

static inline uint16_t
user_data_to_tag(uint64_t user_data)
{
	return user_data & 0xffff;
}

static inline uint8_t
user_data_to_op(uint64_t user_data)
{
	return (user_data >> 16) & 0xff;
}

void
spdk_ublk_init(void)
{
	assert(spdk_thread_is_app_thread(NULL));

	g_ublk_tgt.ctrl_fd = -1;
	g_ublk_tgt.ctrl_ring.ring_fd = -1;
}

static int
ublk_ctrl_poller(void *arg)
{
	struct io_uring *ring = &g_ublk_tgt.ctrl_ring;
	struct spdk_ublk_dev *ublk;
	struct io_uring_cqe *cqe;
	const int max = 8;
	int i, count = 0, rc;

	if (!g_ublk_tgt.ctrl_ops_in_progress) {
		return SPDK_POLLER_IDLE;
	}

	for (i = 0; i < max; i++) {
		rc = io_uring_peek_cqe(ring, &cqe);
		if (rc == -EAGAIN) {
			break;
		}

		assert(cqe != NULL);
		g_ublk_tgt.ctrl_ops_in_progress--;
		ublk = (struct spdk_ublk_dev *)cqe->user_data;
		UBLK_DEBUGLOG(ublk, "ctrl cmd completed\n");
		ublk->ctrl_ops_in_progress--;
		if (ublk->next_state_fn) {
			ublk->next_state_fn(ublk);
		}
		io_uring_cqe_seen(ring, cqe);
		count++;
	}

	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
}

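/*
 * Device setup and teardown are driven as a small state machine over the
 * control ring: ADD_DEV chains to ublk_set_params(), SET_PARAMS chains to
 * ublk_finish_start(), and DEL_DEV chains to ublk_free_dev(), all via the
 * next_state_fn hook that ublk_ctrl_poller() invokes on completion.
 */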
static int
ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op)
{
	uint32_t dev_id = ublk->ublk_id;
	int rc = -EINVAL;
	struct io_uring_sqe *sqe;
	struct ublksrv_ctrl_cmd *cmd;

	UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]);

	ublk->ctrl_cmd_op = cmd_op;
	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
	if (!sqe) {
		SPDK_ERRLOG("No available sqe in ctrl ring\n");
		assert(false);
		return -ENOENT;
	}

	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
	sqe->fd = g_ublk_tgt.ctrl_fd;
	sqe->opcode = IORING_OP_URING_CMD;
	sqe->ioprio = 0;
	cmd->dev_id = dev_id;
	cmd->queue_id = -1;
	ublk->next_state_fn = NULL;

	switch (cmd_op) {
	case UBLK_CMD_ADD_DEV:
		ublk->next_state_fn = ublk_set_params;
		cmd->addr = (__u64)(uintptr_t)&ublk->dev_info;
		cmd->len = sizeof(ublk->dev_info);
		break;
	case UBLK_CMD_SET_PARAMS:
		ublk->next_state_fn = ublk_finish_start;
		cmd->addr = (__u64)(uintptr_t)&ublk->dev_params;
		cmd->len = sizeof(ublk->dev_params);
		break;
	case UBLK_CMD_START_DEV:
		cmd->data[0] = getpid();
		cmd->data[1] = 0;
		break;
	case UBLK_CMD_STOP_DEV:
		break;
	case UBLK_CMD_DEL_DEV:
		ublk->next_state_fn = ublk_free_dev;
		break;
	default:
		SPDK_ERRLOG("No matching cmd operation, cmd_op = %d\n", cmd_op);
		return -EINVAL;
	}
	ublk_set_sqe_cmd_op(sqe, cmd_op);
	io_uring_sqe_set_data(sqe, ublk);

	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
	if (rc < 0) {
		SPDK_ERRLOG("uring submit rc %d\n", rc);
		return rc;
	}
	g_ublk_tgt.ctrl_ops_in_progress++;
	ublk->ctrl_ops_in_progress++;

	return 0;
}

static int
ublk_queue_cmd_buf_sz(uint32_t q_depth)
{
	uint32_t size = q_depth * sizeof(struct ublksrv_io_desc);
	uint32_t page_sz = getpagesize();

	/* round size up to a whole number of pages, since the descriptor
	 * area is mmap()'ed from the ublk char device with page granularity
	 */
	return (size + page_sz - 1) & ~(page_sz - 1);
}

static int
ublk_get_max_support_devs(void)
{
	FILE *file;
	char str[128];

	file = fopen("/sys/module/ublk_drv/parameters/ublks_max", "r");
	if (!file) {
		return -ENOENT;
	}

	if (!fgets(str, sizeof(str), file)) {
		fclose(file);
		return -EINVAL;
	}
	fclose(file);

	spdk_str_chomp(str);
	return spdk_strtol(str, 10);
}

static int
ublk_open(void)
{
	int rc, ublks_max;

	g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR);
	if (g_ublk_tgt.ctrl_fd < 0) {
		rc = errno;
		SPDK_ERRLOG("UBLK control dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno));
		return -rc;
	}

	ublks_max = ublk_get_max_support_devs();
	if (ublks_max > 0) {
		g_ublks_max = ublks_max;
	}

	/* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl
	 * ring processing to a workqueue.  Ctrl ring processing is minimal, so SQPOLL is fine.
	 * All commands sent via the control uring for a ublk device are executed one by one, so
	 * ublks_max * 2 uring entries are enough.
	 */
	rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring,
			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
	if (rc < 0) {
		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
		close(g_ublk_tgt.ctrl_fd);
		g_ublk_tgt.ctrl_fd = -1;
		return rc;
	}

	return 0;
}

static int
ublk_parse_core_mask(const char *mask)
{
	struct spdk_cpuset tmp_mask;
	int rc;

	if (mask == NULL) {
		spdk_env_get_cpuset(&g_core_mask);
		return 0;
	}

	rc = spdk_cpuset_parse(&g_core_mask, mask);
	if (rc < 0) {
		SPDK_ERRLOG("invalid cpumask %s\n", mask);
		return -EINVAL;
	}

	if (spdk_cpuset_count(&g_core_mask) == 0) {
		SPDK_ERRLOG("no cpus specified\n");
		return -EINVAL;
	}

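	/* The mask must be a subset of the cores the SPDK app runs on: AND it
	 * with the env cpuset and require the result to be unchanged.
	 */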
	spdk_env_get_cpuset(&tmp_mask);
	spdk_cpuset_and(&tmp_mask, &g_core_mask);

	if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) {
		SPDK_ERRLOG("one of the selected cpus is outside of the core mask (=%s)\n",
			    spdk_cpuset_fmt(&g_core_mask));
		return -EINVAL;
	}

	return 0;
}

static void
ublk_poller_register(void *args)
{
	struct ublk_poll_group *poll_group = args;
	int rc;

	assert(spdk_get_thread() == poll_group->ublk_thread);
	/* Bind the ublk spdk_thread to the current CPU core in order to avoid thread
	 * context switches during uring processing, as required by the ublk kernel driver.
	 */
	spdk_thread_bind(spdk_get_thread(), true);

	TAILQ_INIT(&poll_group->queue_list);
	poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0);
	rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk",
				     UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE);
	if (rc != 0) {
		assert(false);
	}
}

int
ublk_create_target(const char *cpumask_str)
{
	int rc;
	uint32_t i;
	char thread_name[32];
	struct ublk_poll_group *poll_group;

	if (g_ublk_tgt.active == true) {
		SPDK_ERRLOG("UBLK target has been created\n");
		return -EBUSY;
	}

	rc = ublk_parse_core_mask(cpumask_str);
	if (rc != 0) {
		return rc;
	}

	rc = ublk_open();
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open UBLK, error=%s\n", spdk_strerror(-rc));
		return rc;
	}

	spdk_iobuf_register_module("ublk");

	SPDK_ENV_FOREACH_CORE(i) {
		if (!spdk_cpuset_get_cpu(&g_core_mask, i)) {
			continue;
		}
		snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i);
		poll_group = &g_ublk_tgt.poll_group[g_num_ublk_poll_groups];
		poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask);
		spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group);
		g_num_ublk_poll_groups++;
	}

	assert(spdk_thread_is_app_thread(NULL));
	g_ublk_tgt.active = true;
	g_ublk_tgt.ctrl_ops_in_progress = 0;
	g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL,
				 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US);

	SPDK_NOTICELOG("UBLK target created successfully\n");

	return 0;
}

static void
_ublk_fini_done(void *args)
{
	SPDK_DEBUGLOG(ublk, "\n");
	g_num_ublk_poll_groups = 0;
	g_next_ublk_poll_group = 0;
	g_ublk_tgt.is_destroying = false;
	g_ublk_tgt.active = false;
	if (g_ublk_tgt.cb_fn) {
		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
		g_ublk_tgt.cb_fn = NULL;
		g_ublk_tgt.cb_arg = NULL;
	}
}

static void
ublk_thread_exit(void *args)
{
	struct spdk_thread *ublk_thread = spdk_get_thread();
	uint32_t i;

	for (i = 0; i < g_num_ublk_poll_groups; i++) {
		if (g_ublk_tgt.poll_group[i].ublk_thread == ublk_thread) {
			spdk_poller_unregister(&g_ublk_tgt.poll_group[i].ublk_poller);
			spdk_iobuf_channel_fini(&g_ublk_tgt.poll_group[i].iobuf_ch);
			spdk_thread_bind(ublk_thread, false);
			spdk_thread_exit(ublk_thread);
		}
	}
}

static int
ublk_close_dev(struct spdk_ublk_dev *ublk)
{
	int rc;

	/* set is_closing */
	if (ublk->is_closing) {
		return -EBUSY;
	}
	ublk->is_closing = true;

	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_STOP_DEV);
	if (rc < 0) {
		SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id);
	}
	return rc;
}

static void
_ublk_fini(void *args)
{
	struct spdk_ublk_dev	*ublk, *ublk_tmp;

	TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) {
		ublk_close_dev(ublk);
	}

	/* Check if all ublks closed */
	if (TAILQ_EMPTY(&g_ublk_devs)) {
		SPDK_DEBUGLOG(ublk, "finish shutdown\n");
		spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
		if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) {
			io_uring_queue_exit(&g_ublk_tgt.ctrl_ring);
			g_ublk_tgt.ctrl_ring.ring_fd = -1;
		}
		if (g_ublk_tgt.ctrl_fd >= 0) {
			close(g_ublk_tgt.ctrl_fd);
			g_ublk_tgt.ctrl_fd = -1;
		}
		spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done);
	} else {
		spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL);
	}
}

int
spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg)
{
	assert(spdk_thread_is_app_thread(NULL));

	if (g_ublk_tgt.is_destroying == true) {
		/* UBLK target is being destroyed */
		return -EBUSY;
	}
	g_ublk_tgt.cb_fn = cb_fn;
	g_ublk_tgt.cb_arg = cb_arg;
	g_ublk_tgt.is_destroying = true;
	_ublk_fini(NULL);

	return 0;
}

int
ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg)
{
	int rc;

	if (g_ublk_tgt.active == false) {
		/* UBLK target has not been created */
		return -ENOENT;
	}

	rc = spdk_ublk_fini(cb_fn, cb_arg);

	return rc;
}

struct spdk_ublk_dev *
ublk_dev_find_by_id(uint32_t ublk_id)
{
	struct spdk_ublk_dev *ublk;

	/* check whether a ublk dev with this id has already been registered */
	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
		if (ublk->ublk_id == ublk_id) {
			return ublk;
		}
	}

	return NULL;
}

uint32_t
ublk_dev_get_id(struct spdk_ublk_dev *ublk)
{
	return ublk->ublk_id;
}

struct spdk_ublk_dev *ublk_dev_first(void)
{
	return TAILQ_FIRST(&g_ublk_devs);
}

struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

uint32_t
ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk)
{
	return ublk->queue_depth;
}

uint32_t
ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk)
{
	return ublk->num_queues;
}

const char *
ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk)
{
	return spdk_bdev_get_name(ublk->bdev);
}

void
spdk_ublk_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_ublk_dev *ublk;

	spdk_json_write_array_begin(w);

	if (g_ublk_tgt.active) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "ublk_create_target");
		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "ublk_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk));
		spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id);
		spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues);
		spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth);
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

static void
ublk_dev_list_register(struct spdk_ublk_dev *ublk)
{
	UBLK_DEBUGLOG(ublk, "add to tailq\n");
	TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq);
	g_ublk_tgt.num_ublk_devs++;
}

static void
ublk_dev_list_unregister(struct spdk_ublk_dev *ublk)
{
	/*
	 * A ublk device may be stopped before it is registered,
	 * so check whether it was actually registered.
	 */

	if (ublk_dev_find_by_id(ublk->ublk_id)) {
		UBLK_DEBUGLOG(ublk, "remove from tailq\n");
		TAILQ_REMOVE(&g_ublk_devs, ublk, tailq);
		assert(g_ublk_tgt.num_ublk_devs);
		g_ublk_tgt.num_ublk_devs--;
		return;
	}

	UBLK_DEBUGLOG(ublk, "not found in tailq\n");
	assert(false);
}

static void
ublk_delete_dev(void *arg)
{
	struct spdk_ublk_dev *ublk = arg;
	int rc = 0;
	uint32_t q_idx;

	assert(spdk_thread_is_app_thread(NULL));
	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
		ublk_dev_queue_fini(&ublk->queues[q_idx]);
	}

	if (ublk->cdev_fd >= 0) {
		close(ublk->cdev_fd);
	}

	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_DEL_DEV);
	if (rc < 0) {
		SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id);
	}
}

static int
_ublk_close_dev_retry(void *arg)
{
	struct spdk_ublk_dev *ublk = arg;

	if (ublk->ctrl_ops_in_progress > 0) {
		if (ublk->retry_count-- > 0) {
			return SPDK_POLLER_BUSY;
		}
		SPDK_ERRLOG("Timeout on ctrl op completion.\n");
	}
	spdk_poller_unregister(&ublk->retry_poller);
	ublk_delete_dev(ublk);
	return SPDK_POLLER_BUSY;
}

static void
ublk_try_close_dev(void *arg)
{
	struct spdk_ublk_dev *ublk = arg;

	assert(spdk_thread_is_app_thread(NULL));
	ublk->queues_closed += 1;
	if (ublk->queues_closed < ublk->num_queues) {
		return;
	}

	if (ublk->ctrl_ops_in_progress > 0) {
		assert(ublk->retry_poller == NULL);
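		/* e.g. with the defaults: 10000 ms * 1000 / 20000 us = 500 polls
		 * at 20 ms intervals before giving up on the pending ctrl op
		 */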
		ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US;
		ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk,
				     UBLK_BUSY_POLLING_INTERVAL_US);
	} else {
		ublk_delete_dev(ublk);
	}
}

static void
ublk_try_close_queue(struct ublk_queue *q)
{
	struct spdk_ublk_dev *ublk = q->dev;

	/* Close the queue only when no I/O submitted to the bdev is still in flight,
	 * no I/O is waiting to commit its result, and all I/Os have been aborted back.
	 */
	if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) {
		/* wait for next retry */
		return;
	}

	TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq);
	spdk_put_io_channel(q->bdev_ch);
	q->bdev_ch = NULL;

	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk);
}

int
ublk_stop_disk(uint32_t ublk_id, ublk_del_cb del_cb, void *cb_arg)
{
	struct spdk_ublk_dev *ublk;

	assert(spdk_thread_is_app_thread(NULL));

	ublk = ublk_dev_find_by_id(ublk_id);
	if (ublk == NULL) {
		SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id);
		return -ENODEV;
	}
	if (ublk->is_closing) {
		SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id);
		return -EBUSY;
	}

	ublk->del_cb = del_cb;
	ublk->cb_arg = cb_arg;
	return ublk_close_dev(ublk);
}

static inline void
ublk_mark_io_get_data(struct ublk_io *io)
{
	io->cmd_op = UBLK_IO_NEED_GET_DATA;
	io->result = 0;
}

static inline void
ublk_mark_io_done(struct ublk_io *io, int res)
{
	/*
	 * mark io done by target, so that SPDK can commit its
	 * result and fetch new request via io_uring command.
	 */
	io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
	io->result = res;
}

static void
ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct ublk_io	*io = cb_arg;
	struct ublk_queue *q = io->q;
	int res, tag;

	if (success) {
		res = io->result;
	} else {
		res = -EIO;
	}

	ublk_mark_io_done(io, res);
	tag = (int)(io - q->ios);
	q->ios[tag].need_data = false;

	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n",
		      q->q_id, tag, res);
	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}
}

static void
ublk_resubmit_io(void *arg)
{
	struct ublk_io *io = (struct ublk_io *)arg;
	uint16_t tag = (io - io->q->ios);

	ublk_submit_bdev_io(io->q, tag);
}

static void
ublk_queue_io(struct ublk_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->q->dev->bdev;
	struct ublk_queue *q = io->q;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = ublk_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc);
		ublk_io_done(NULL, false, io);
	}
}

static void
ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf)
{
	struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf);

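	/* Keep the raw iobuf pointer for spdk_iobuf_put(), but hand the kernel
	 * a payload address aligned up to a 4 KiB boundary within the buffer.
	 */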
	io->mpool_entry = buf;
	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
	io->get_buf_cb(io);
}

static void
ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch,
		   ublk_get_buf_cb get_buf_cb)
{
	uint64_t io_size;
	void *buf;

	io_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
	io->get_buf_cb = get_buf_cb;
	buf = spdk_iobuf_get(iobuf_ch, io_size, &io->iobuf, ublk_io_get_buffer_cb);
	if (buf != NULL) {
		ublk_io_get_buffer_cb(&io->iobuf, buf);
	}
}

static void
ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch)
{
	uint64_t io_size;

	if (io->payload) {
		io_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
		spdk_iobuf_put(iobuf_ch, io->mpool_entry, io_size);
		io->mpool_entry = NULL;
		io->payload = NULL;
	}
}

static void
read_get_buffer_done(struct ublk_io *io)
{
	struct spdk_bdev_desc *desc = io->bdev_desc;
	struct spdk_io_channel *ch = io->bdev_ch;
	uint64_t offset_blocks, num_blocks;
	int rc = 0;
	const struct ublksrv_io_desc *iod = io->iod;

	offset_blocks = iod->start_sector >> io->sector_per_block_shift;
	num_blocks = iod->nr_sectors >> io->sector_per_block_shift;

	rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
	if (rc == -ENOMEM) {
		SPDK_INFOLOG(ublk, "No memory, start to queue io.\n");
		ublk_queue_io(io);
	} else if (rc < 0) {
		SPDK_ERRLOG("ublk io failed in read_get_buffer_done, rc=%d.\n", rc);
		ublk_io_done(NULL, false, io);
	}
}

static void
ublk_submit_bdev_io(struct ublk_queue *q, uint16_t tag)
{
	struct spdk_ublk_dev *ublk = q->dev;
	struct ublk_io *io = &q->ios[tag];
	struct spdk_bdev_desc *desc = io->bdev_desc;
	struct spdk_io_channel *ch = io->bdev_ch;
	struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch;
	uint64_t offset_blocks, num_blocks;
	uint8_t ublk_op;
	int rc = 0;
	const struct ublksrv_io_desc *iod = io->iod;

	ublk_op = ublksrv_get_op(iod);
	offset_blocks = iod->start_sector >> ublk->sector_per_block_shift;
	num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift;

	io->result = num_blocks * spdk_bdev_get_data_block_size(ublk->bdev);
	switch (ublk_op) {
	case UBLK_IO_OP_READ:
		ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done);
		return;
	case UBLK_IO_OP_WRITE:
		assert((void *)iod->addr == io->payload);
		rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
		break;
	case UBLK_IO_OP_FLUSH:
		rc = spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
		break;
	case UBLK_IO_OP_DISCARD:
		rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
		break;
	case UBLK_IO_OP_WRITE_ZEROES:
		rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
		break;
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(ublk, "No memory, start to queue io.\n");
			ublk_queue_io(io);
		} else {
			SPDK_ERRLOG("ublk io failed in ublk_submit_bdev_io, rc=%d.\n", rc);
			ublk_io_done(NULL, false, io);
		}
	}
}

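/*
 * Queue one ublk io_uring command on the queue's ring: FETCH_REQ to fetch a
 * new request, NEED_GET_DATA to ask the kernel for write data, or
 * COMMIT_AND_FETCH_REQ to commit a result and fetch the next request in a
 * single round trip.
 */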
static inline void
ublksrv_queue_io_cmd(struct ublk_queue *q,
		     struct ublk_io *io, unsigned tag)
{
	struct ublksrv_io_cmd *cmd;
	struct io_uring_sqe *sqe;
	unsigned int cmd_op = 0;
	uint64_t user_data;

	/* each io should carry a fetch or commit operation */
	assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) ||
	       (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ));
	cmd_op = io->cmd_op;

	sqe = io_uring_get_sqe(&q->ring);
	assert(sqe);

	cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe);
	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) {
		cmd->result = io->result;
	}

	/* These fields should be written once, never change */
	ublk_set_sqe_cmd_op(sqe, cmd_op);
	/* fixed file index 0 is dev->cdev_fd, registered via io_uring_register_files() */
	sqe->fd		= 0;
	sqe->opcode	= IORING_OP_URING_CMD;
	sqe->flags	= IOSQE_FIXED_FILE;
	sqe->rw_flags	= 0;
	cmd->tag	= tag;
	cmd->addr	= (__u64)(uintptr_t)(io->payload);
	cmd->q_id	= q->q_id;

	user_data = build_user_data(tag, cmd_op);
	io_uring_sqe_set_data64(sqe, user_data);

	io->cmd_op = 0;
	q->cmd_inflight += 1;

	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n",
		      q->q_id, tag, cmd_op,
		      io->cmd_op, q->is_stopping);
}

static int
ublk_io_xmit(struct ublk_queue *q)
{
	TAILQ_HEAD(, ublk_io) buffer_free_list;
	struct spdk_iobuf_channel *iobuf_ch;
	int rc = 0, count = 0, tag;
	struct ublk_io *io;

	if (TAILQ_EMPTY(&q->completed_io_list)) {
		return 0;
	}

	TAILQ_INIT(&buffer_free_list);
	while (!TAILQ_EMPTY(&q->completed_io_list)) {
		io = TAILQ_FIRST(&q->completed_io_list);
		tag = io - io->q->ios;
		assert(io != NULL);
		/*
		 * Remove IO from list now assuming it will be completed. It will be inserted
		 * back to the head if it cannot be completed. This approach is specifically
		 * taken to work around a scan-build use-after-free mischaracterization.
		 */
		TAILQ_REMOVE(&q->completed_io_list, io, tailq);
		if (!io->need_data) {
			TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq);
		}
		ublksrv_queue_io_cmd(q, io, tag);
		count++;
	}

	rc = io_uring_submit(&q->ring);
	if (rc != count) {
		SPDK_ERRLOG("could not submit all commands\n");
		assert(false);
	}

	/* Note: for READ io, ublk will always copy the data out of
	 * the buffers in the io_uring_submit context.  Since we
	 * are not using SQPOLL for IO rings, we can safely free
	 * those IO buffers here.  This design doesn't seem ideal,
	 * but it's what's possible since there is no discrete
	 * COMMIT_REQ operation.  That will need to change in the
	 * future should we ever want to support async copy
	 * operations.
	 */
	iobuf_ch = &q->poll_group->iobuf_ch;
	while (!TAILQ_EMPTY(&buffer_free_list)) {
		io = TAILQ_FIRST(&buffer_free_list);
		TAILQ_REMOVE(&buffer_free_list, io, tailq);
		ublk_io_put_buffer(io, iobuf_ch);
	}
	return rc;
}

static void
write_get_buffer_done(struct ublk_io *io)
{
	io->need_data = true;
	ublk_mark_io_get_data(io);
	TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq);
	TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq);
}

static int
ublk_io_recv(struct ublk_queue *q)
{
	struct io_uring_cqe *cqe;
	unsigned head, tag;
	int fetch, count = 0;
	struct ublk_io *io;
	struct spdk_iobuf_channel *iobuf_ch;
	unsigned __attribute__((unused)) cmd_op;

	if (q->cmd_inflight == 0) {
		return 0;
	}

	iobuf_ch = &q->poll_group->iobuf_ch;
	io_uring_for_each_cqe(&q->ring, head, cqe) {
		tag = user_data_to_tag(cqe->user_data);
		cmd_op = user_data_to_op(cqe->user_data);
		fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping;

		SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u cmd_op %u\n",
			      cqe->res, q->q_id, tag, cmd_op);

		q->cmd_inflight--;
		io = &q->ios[tag];

		if (!fetch) {
			q->is_stopping = true;
			if (io->cmd_op == UBLK_IO_FETCH_REQ) {
				io->cmd_op = 0;
			}
		}

		TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq);
		if (cqe->res == UBLK_IO_RES_OK) {
			ublk_submit_bdev_io(q, tag);
		} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
			ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done);
		} else {
			if (cqe->res != UBLK_IO_RES_ABORT) {
				SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
					    cqe->res, q->q_id, tag, cmd_op);
			}
			TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
		}
		count += 1;
		if (count == UBLK_QUEUE_REQUEST) {
			break;
		}
	}
	io_uring_cq_advance(&q->ring, count);

	return count;
}

static int
ublk_poll(void *arg)
{
	struct ublk_poll_group *poll_group = arg;
	struct ublk_queue *q, *q_tmp;
	int sent, received, count = 0;

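	/* For each queue in this poll group: first push completed results back
	 * to the kernel, then reap newly arrived requests from the ring.
	 */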
	TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) {
		sent = ublk_io_xmit(q);
		received = ublk_io_recv(q);
		if (spdk_unlikely(q->is_stopping)) {
			ublk_try_close_queue(q);
		}
		count += sent + received;
	}
	if (count > 0) {
		return SPDK_POLLER_BUSY;
	} else {
		return SPDK_POLLER_IDLE;
	}
}

static void
ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk)
{
	ublk_close_dev(ublk);
}

static void
ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
		   void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		ublk_bdev_hot_remove(event_ctx);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

static void
ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth)
{
	struct io_uring_sqe *sqe;
	uint32_t i;

	for (i = 0; i < q_depth; i++) {
		sqe = ublk_uring_get_sqe(r, i);

		/* These fields should be written once, never change */
		sqe->flags = IOSQE_FIXED_FILE;
		sqe->rw_flags = 0;
		sqe->ioprio = 0;
		sqe->off = 0;
	}
}

static int
ublk_dev_queue_init(struct ublk_queue *q)
{
	int rc = 0, cmd_buf_size;
	uint32_t j;
	struct spdk_ublk_dev *ublk = q->dev;
	unsigned long off;

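	/* Each queue's I/O descriptor array lives in the kernel and is exposed
	 * read-only through the ublk char device at a fixed per-queue offset.
	 */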
	cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth);
	off = UBLKSRV_CMD_BUF_OFFSET +
	      q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
	q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ,
			MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off);
	if (q->io_cmd_buf == MAP_FAILED) {
		q->io_cmd_buf = NULL;
		rc = -errno;
		SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc));
		goto err;
	}

	for (j = 0; j < q->q_depth; j++) {
		q->ios[j].cmd_op = UBLK_IO_FETCH_REQ;
		q->ios[j].iod = &q->io_cmd_buf[j];
	}

	rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128);
	if (rc < 0) {
		SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc));
		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
		q->io_cmd_buf = NULL;
		goto err;
	}

	rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1);
	if (rc != 0) {
		SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc));
		io_uring_queue_exit(&q->ring);
		q->ring.ring_fd = -1;
		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
		q->io_cmd_buf = NULL;
		goto err;
	}

	ublk_dev_init_io_cmds(&q->ring, q->q_depth);

	return 0;
err:
	return rc;
}

static void
ublk_dev_queue_fini(struct ublk_queue *q)
{
	if (q->ring.ring_fd >= 0) {
		io_uring_unregister_files(&q->ring);
		io_uring_queue_exit(&q->ring);
		q->ring.ring_fd = -1;
	}
	if (q->io_cmd_buf) {
		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
	}
}

static void
ublk_dev_queue_io_init(struct ublk_queue *q)
{
	struct ublk_io *io;
	uint32_t i;
	int rc __attribute__((unused));
	void *buf;

	/* Some older kernels require a buffer to get posted, even
	 * when NEED_GET_DATA has been specified.  So allocate a
	 * temporary buffer, only for purposes of this workaround.
	 * It never actually gets used, so we will free it immediately
	 * after all of the commands are posted.
	 */
	buf = malloc(64);

	assert(q->bdev_ch != NULL);

	/* Initialize and submit all io commands to ublk driver */
	for (i = 0; i < q->q_depth; i++) {
		io = &q->ios[i];
		io->payload = buf;
		io->bdev_ch = q->bdev_ch;
		io->bdev_desc = q->dev->bdev_desc;
		io->sector_per_block_shift = q->dev->sector_per_block_shift;
		ublksrv_queue_io_cmd(q, io, i);
	}

	rc = io_uring_submit(&q->ring);
	assert(rc == (int)q->q_depth);
	for (i = 0; i < q->q_depth; i++) {
		io = &q->ios[i];
		io->payload = NULL;
	}
	free(buf);
}

static void
ublk_set_params(struct spdk_ublk_dev *ublk)
{
	int rc;

	ublk->dev_params.len = sizeof(struct ublk_params);
	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_SET_PARAMS);
	if (rc < 0) {
		SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
		ublk_delete_dev(ublk);
		if (ublk->start_cb) {
			ublk->start_cb(ublk->cb_arg, rc);
			ublk->start_cb = NULL;
		}
	}
}

/* Set ublk device parameters based on bdev */
static void
ublk_info_param_init(struct spdk_ublk_dev *ublk)
{
	struct spdk_bdev *bdev = ublk->bdev;
	uint32_t blk_size = spdk_bdev_get_data_block_size(bdev);
	uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev);
	uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev);
	uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev);
	uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT;
	uint32_t io_min_size = blk_size;
	uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size);

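	/* ublk takes block sizes as log2 shifts, e.g. a 512 B logical block
	 * gives logical_bs_shift 9 and a 4 KiB block gives 12; device and
	 * max I/O sizes are expressed in 512 B Linux sectors.
	 */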
	struct ublksrv_ctrl_dev_info uinfo = {
		.queue_depth = ublk->queue_depth,
		.nr_hw_queues = ublk->num_queues,
		.dev_id = ublk->ublk_id,
		.max_io_buf_bytes = UBLK_IO_MAX_BYTES,
		.ublksrv_pid = getpid(),
		.flags = UBLK_F_NEED_GET_DATA | UBLK_F_URING_CMD_COMP_IN_TASK,
	};
	struct ublk_params uparams = {
		.types = UBLK_PARAM_TYPE_BASIC,
		.basic = {
			.logical_bs_shift = spdk_u32log2(blk_size),
			.physical_bs_shift = spdk_u32log2(pblk_size),
			.io_min_shift = spdk_u32log2(io_min_size),
			.io_opt_shift = spdk_u32log2(io_opt_size),
			.dev_sectors = num_blocks * sectors_per_block,
			.max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT,
		}
	};

	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		uparams.types |= UBLK_PARAM_TYPE_DISCARD;
		uparams.discard.discard_alignment = sectors_per_block;
		uparams.discard.max_discard_sectors = num_blocks * sectors_per_block;
		uparams.discard.max_discard_segments = 1;
		uparams.discard.discard_granularity = blk_size;
		if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
			uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block;
		}
	}

	ublk->dev_info = uinfo;
	ublk->dev_params = uparams;
}

static void
_ublk_free_dev(void *arg)
{
	struct spdk_ublk_dev *ublk = arg;

	ublk_free_dev(ublk);
}

static void
free_buffers(void *arg)
{
	struct ublk_queue *q = arg;
	uint32_t i;

	for (i = 0; i < q->q_depth; i++) {
		ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch);
	}
	free(q->ios);
	q->ios = NULL;
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev);
}

static void
ublk_free_dev(struct spdk_ublk_dev *ublk)
{
	struct ublk_queue *q;
	uint32_t q_idx;

	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
		q = &ublk->queues[q_idx];

		/* The ublk_io structures of this queue were never initialized. */
		if (q->ios == NULL) {
			continue;
		}

		/* We found a queue that has an ios array that may have buffers
		 * that need to be freed.  Send a message to the queue's thread
		 * so it can free the buffers back to that thread's iobuf channel.
		 * When it's done, it will set q->ios to NULL and send a message
		 * back to this function to continue.
		 */
		if (q->poll_group) {
			spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q);
			return;
		} else {
			free(q->ios);
			q->ios = NULL;
		}
	}

	/* All of the buffers associated with the queues have been freed, so now
	 * continue with releasing resources for the rest of the ublk device.
	 */
	if (ublk->bdev_desc) {
		spdk_bdev_close(ublk->bdev_desc);
		ublk->bdev_desc = NULL;
	}

	ublk_dev_list_unregister(ublk);

	if (ublk->del_cb) {
		ublk->del_cb(ublk->cb_arg);
	}
	SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id);
	free(ublk);
}

static int
ublk_ios_init(struct spdk_ublk_dev *ublk)
{
	int rc;
	uint32_t i, j;
	struct ublk_queue *q;

	for (i = 0; i < ublk->num_queues; i++) {
		q = &ublk->queues[i];

		TAILQ_INIT(&q->completed_io_list);
		TAILQ_INIT(&q->inflight_io_list);
		q->dev = ublk;
		q->q_id = i;
		q->q_depth = ublk->queue_depth;
		q->ios = calloc(q->q_depth, sizeof(struct ublk_io));
		if (!q->ios) {
			rc = -ENOMEM;
			SPDK_ERRLOG("could not allocate queue ios\n");
			goto err;
		}
		for (j = 0; j < q->q_depth; j++) {
			q->ios[j].q = q;
		}
	}

	return 0;

err:
	for (i = 0; i < ublk->num_queues; i++) {
		/* free each queue's ios array, not just the one that failed */
		q = &ublk->queues[i];
		free(q->ios);
		q->ios = NULL;
	}
	return rc;
}

static void
ublk_queue_run(void *arg1)
{
	struct ublk_queue	*q = arg1;
	struct spdk_ublk_dev *ublk = q->dev;
	struct ublk_poll_group *poll_group = q->poll_group;

	assert(spdk_get_thread() == poll_group->ublk_thread);
	q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc);
	/* Queues must be filled with I/O commands on their poll group's thread */
	ublk_dev_queue_io_init(q);

	TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq);
}

int
ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
		uint32_t num_queues, uint32_t queue_depth,
		ublk_start_cb start_cb, void *cb_arg)
{
	int			rc;
	uint32_t		i;
	struct spdk_bdev	*bdev;
	struct spdk_ublk_dev	*ublk = NULL;
	uint32_t		sector_per_block;

	assert(spdk_thread_is_app_thread(NULL));

	if (g_ublk_tgt.active == false) {
		SPDK_ERRLOG("No ublk target exists\n");
		return -ENODEV;
	}

	ublk = ublk_dev_find_by_id(ublk_id);
	if (ublk != NULL) {
		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
		return -EBUSY;
	}

	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
		return -ENOTSUP;
	}

	ublk = calloc(1, sizeof(*ublk));
	if (ublk == NULL) {
		return -ENOMEM;
	}
	ublk->start_cb = start_cb;
	ublk->cb_arg = cb_arg;
	ublk->cdev_fd = -1;
	ublk->ublk_id = ublk_id;
	UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n",
		      bdev_name, num_queues, queue_depth);

	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
		free(ublk);
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
	ublk->bdev = bdev;
	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);

	ublk->queues_closed = 0;
	ublk->num_queues = num_queues;
	ublk->queue_depth = queue_depth;
	if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) {
		SPDK_WARNLOG("Set Queue depth %d of UBLK %d to maximum %d\n",
			     ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH);
		ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH;
	}
	if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) {
		SPDK_WARNLOG("Set Queue num %d of UBLK %d to maximum %d\n",
			     ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES);
		ublk->num_queues = UBLK_DEV_MAX_QUEUES;
	}
	for (i = 0; i < ublk->num_queues; i++) {
		ublk->queues[i].ring.ring_fd = -1;
	}

	ublk_info_param_init(ublk);
	rc = ublk_ios_init(ublk);
	if (rc != 0) {
		spdk_bdev_close(ublk->bdev_desc);
		free(ublk);
		return rc;
	}

	SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n",
		     bdev_name, ublk_id);

	/* Add ublk_dev to the end of disk list */
	ublk_dev_list_register(ublk);
	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_ADD_DEV);
	if (rc < 0) {
		SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
		ublk_free_dev(ublk);
	}

	return rc;
}

static void
ublk_finish_start(struct spdk_ublk_dev *ublk)
{
	int			rc;
	uint32_t		q_id;
	struct spdk_thread	*ublk_thread;
	char			buf[64];

	snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id);
	ublk->cdev_fd = open(buf, O_RDWR);
	if (ublk->cdev_fd < 0) {
		rc = ublk->cdev_fd;
		SPDK_ERRLOG("can't open %s, rc %d\n", buf, rc);
		goto err;
	}

	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
		rc = ublk_dev_queue_init(&ublk->queues[q_id]);
		if (rc) {
			goto err;
		}
	}

	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_START_DEV);
	if (rc < 0) {
		SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id,
			    spdk_strerror(-rc));
		goto err;
	}

	/* Send queue to different spdk_threads for load balance */
	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
		ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_group[g_next_ublk_poll_group];
		ublk_thread = g_ublk_tgt.poll_group[g_next_ublk_poll_group].ublk_thread;
		spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]);
		g_next_ublk_poll_group++;
		if (g_next_ublk_poll_group == g_num_ublk_poll_groups) {
			g_next_ublk_poll_group = 0;
		}
	}

	goto out;

err:
	ublk_delete_dev(ublk);
out:
	if (ublk->start_cb) {
		ublk->start_cb(ublk->cb_arg, rc);
		ublk->start_cb = NULL;
	}
}

SPDK_LOG_REGISTER_COMPONENT(ublk)
SPDK_LOG_REGISTER_COMPONENT(ublk_io)