xref: /spdk/lib/ublk/ublk.c (revision c17d6515ff59ba4b286713c4ff14c089426e6c4c)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <linux/ublk_cmd.h>
7 #include <liburing.h>
8 
9 #include "spdk/stdinc.h"
10 #include "spdk/string.h"
11 #include "spdk/bdev.h"
12 #include "spdk/endian.h"
13 #include "spdk/env.h"
14 #include "spdk/likely.h"
15 #include "spdk/log.h"
16 #include "spdk/util.h"
17 #include "spdk/queue.h"
18 #include "spdk/json.h"
19 #include "spdk/ublk.h"
20 #include "spdk/thread.h"
21 
22 #include "ublk_internal.h"
23 
24 #define UBLK_CTRL_DEV					"/dev/ublk-control"
25 #define UBLK_BLK_CDEV					"/dev/ublkc"
26 
27 #define LINUX_SECTOR_SHIFT				9
28 #define UBLK_IO_MAX_BYTES				SPDK_BDEV_LARGE_BUF_MAX_SIZE
29 #define UBLK_DEV_MAX_QUEUES				32
30 #define UBLK_DEV_MAX_QUEUE_DEPTH			1024
31 #define UBLK_QUEUE_REQUEST				32
32 #define UBLK_STOP_BUSY_WAITING_MS			10000
33 #define UBLK_BUSY_POLLING_INTERVAL_US			20000
34 #define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US	1000
35 /* By default, the kernel ublk_drv driver supports up to 64 block devices */
36 #define UBLK_DEFAULT_MAX_SUPPORTED_DEVS			64
37 
38 #define UBLK_IOBUF_SMALL_CACHE_SIZE			128
39 #define UBLK_IOBUF_LARGE_CACHE_SIZE			32
40 
41 #define UBLK_DEBUGLOG(ublk, format, ...) \
42 	SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__);
43 
44 static uint32_t g_num_ublk_poll_groups = 0;
45 static uint32_t g_next_ublk_poll_group = 0;
46 static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS;
47 static struct spdk_cpuset g_core_mask;
48 
49 struct ublk_queue;
50 struct ublk_poll_group;
51 struct ublk_io;
52 static void _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io);
53 static void ublk_dev_queue_fini(struct ublk_queue *q);
54 static int ublk_poll(void *arg);
55 static int ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op);
56 
57 typedef void (*ublk_next_state_fn)(struct spdk_ublk_dev *ublk);
58 static void ublk_set_params(struct spdk_ublk_dev *ublk);
59 static void ublk_finish_start(struct spdk_ublk_dev *ublk);
60 static void ublk_free_dev(struct spdk_ublk_dev *ublk);
61 
62 static const char *ublk_op_name[64]
63 __attribute__((unused)) = {
64 	[UBLK_CMD_ADD_DEV] =	"UBLK_CMD_ADD_DEV",
65 	[UBLK_CMD_DEL_DEV] =	"UBLK_CMD_DEL_DEV",
66 	[UBLK_CMD_START_DEV] =	"UBLK_CMD_START_DEV",
67 	[UBLK_CMD_STOP_DEV] =	"UBLK_CMD_STOP_DEV",
68 	[UBLK_CMD_SET_PARAMS] =	"UBLK_CMD_SET_PARAMS",
69 };
70 
71 typedef void (*ublk_get_buf_cb)(struct ublk_io *io);
72 
73 struct ublk_io {
74 	void			*payload;
75 	void			*mpool_entry;
76 	bool			need_data;
77 	uint16_t		tag;
78 	uint32_t		payload_size;
79 	uint32_t		cmd_op;
80 	int32_t			result;
81 	struct spdk_bdev_desc	*bdev_desc;
82 	struct spdk_io_channel	*bdev_ch;
83 	const struct ublksrv_io_desc	*iod;
84 	ublk_get_buf_cb		get_buf_cb;
85 	struct ublk_queue	*q;
86 	/* for bdev io_wait */
87 	struct spdk_bdev_io_wait_entry bdev_io_wait;
88 	struct spdk_iobuf_entry	iobuf;
89 
90 	TAILQ_ENTRY(ublk_io)	tailq;
91 };
92 
93 struct ublk_queue {
94 	uint32_t		q_id;
95 	uint32_t		q_depth;
96 	struct ublk_io		*ios;
97 	TAILQ_HEAD(, ublk_io)	completed_io_list;
98 	TAILQ_HEAD(, ublk_io)	inflight_io_list;
99 	uint32_t		cmd_inflight;
100 	bool			is_stopping;
101 	struct ublksrv_io_desc	*io_cmd_buf;
102 	/* ring depth == dev_info->queue_depth. */
103 	struct io_uring		ring;
104 	struct spdk_ublk_dev	*dev;
105 	struct ublk_poll_group	*poll_group;
106 	struct spdk_io_channel	*bdev_ch;
107 
108 	TAILQ_ENTRY(ublk_queue)	tailq;
109 };
110 
111 struct spdk_ublk_dev {
112 	struct spdk_bdev	*bdev;
113 	struct spdk_bdev_desc	*bdev_desc;
114 
115 	int			cdev_fd;
116 	struct ublk_params	dev_params;
117 	struct ublksrv_ctrl_dev_info	dev_info;
118 
119 	uint32_t		ublk_id;
120 	uint32_t		num_queues;
121 	uint32_t		queue_depth;
122 	uint32_t		sector_per_block_shift;
123 	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];
124 
125 	struct spdk_poller	*retry_poller;
126 	int			retry_count;
127 	uint32_t		queues_closed;
128 	ublk_start_cb		start_cb;
129 	ublk_del_cb		del_cb;
130 	void			*cb_arg;
131 	ublk_next_state_fn	next_state_fn;
132 	uint32_t		ctrl_ops_in_progress;
133 	bool			is_closing;
134 
135 	TAILQ_ENTRY(spdk_ublk_dev) tailq;
136 	TAILQ_ENTRY(spdk_ublk_dev) wait_tailq;
137 };
138 
139 struct ublk_poll_group {
140 	struct spdk_thread		*ublk_thread;
141 	struct spdk_poller		*ublk_poller;
142 	struct spdk_iobuf_channel	iobuf_ch;
143 	TAILQ_HEAD(, ublk_queue)	queue_list;
144 };
145 
146 struct ublk_tgt {
147 	int			ctrl_fd;
148 	bool			active;
149 	bool			is_destroying;
150 	spdk_ublk_fini_cb	cb_fn;
151 	void			*cb_arg;
152 	struct io_uring		ctrl_ring;
153 	struct spdk_poller	*ctrl_poller;
154 	uint32_t		ctrl_ops_in_progress;
155 	struct ublk_poll_group	*poll_groups;
156 	uint32_t		num_ublk_devs;
157 	uint64_t		features;
158 	/* `ublk_drv` supports UBLK_F_CMD_IOCTL_ENCODE */
159 	bool ioctl_encode;
160 };
161 
162 static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs);
163 static struct ublk_tgt g_ublk_tgt;
164 
165 /* helpers for using io_uring */
166 static inline int
167 ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags)
168 {
169 	struct io_uring_params p = {};
170 
171 	p.flags = flags | IORING_SETUP_CQSIZE;
172 	p.cq_entries = depth;
173 
174 	return io_uring_queue_init_params(depth, r, &p);
175 }
176 
177 static inline struct io_uring_sqe *
178 ublk_uring_get_sqe(struct io_uring *r, uint32_t idx)
179 {
180 	/* Each 128-byte SQE occupies two 64-byte slots because IORING_SETUP_SQE128 is set in ublk_setup_ring, so scale the index by 2 */
181 	return &r->sq.sqes[idx << 1];
182 }
183 
184 static inline void *
185 ublk_get_sqe_cmd(struct io_uring_sqe *sqe)
186 {
187 	return (void *)&sqe->addr3;
188 }
189 
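/* When the kernel driver advertises UBLK_F_CMD_IOCTL_ENCODE, ublk command opcodes must be
 * encoded as ioctl numbers (_IOR/_IOWR on 'u'); otherwise the legacy raw opcodes are used.
 */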
190 static inline void
191 ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op)
192 {
193 	uint32_t opc = cmd_op;
194 
195 	if (g_ublk_tgt.ioctl_encode) {
196 		switch (cmd_op) {
197 		/* ctrl uring */
198 		case UBLK_CMD_GET_DEV_INFO:
199 			opc = _IOR('u', UBLK_CMD_GET_DEV_INFO, struct ublksrv_ctrl_cmd);
200 			break;
201 		case UBLK_CMD_ADD_DEV:
202 			opc = _IOWR('u', UBLK_CMD_ADD_DEV, struct ublksrv_ctrl_cmd);
203 			break;
204 		case UBLK_CMD_DEL_DEV:
205 			opc = _IOWR('u', UBLK_CMD_DEL_DEV, struct ublksrv_ctrl_cmd);
206 			break;
207 		case UBLK_CMD_START_DEV:
208 			opc = _IOWR('u', UBLK_CMD_START_DEV, struct ublksrv_ctrl_cmd);
209 			break;
210 		case UBLK_CMD_STOP_DEV:
211 			opc = _IOWR('u', UBLK_CMD_STOP_DEV, struct ublksrv_ctrl_cmd);
212 			break;
213 		case UBLK_CMD_SET_PARAMS:
214 			opc = _IOWR('u', UBLK_CMD_SET_PARAMS, struct ublksrv_ctrl_cmd);
215 			break;
216 
217 		/* io uring */
218 		case UBLK_IO_FETCH_REQ:
219 			opc = _IOWR('u', UBLK_IO_FETCH_REQ, struct ublksrv_io_cmd);
220 			break;
221 		case UBLK_IO_COMMIT_AND_FETCH_REQ:
222 			opc = _IOWR('u', UBLK_IO_COMMIT_AND_FETCH_REQ, struct ublksrv_io_cmd);
223 			break;
224 		case UBLK_IO_NEED_GET_DATA:
225 			opc = _IOWR('u', UBLK_IO_NEED_GET_DATA, struct ublksrv_io_cmd);
226 			break;
227 		default:
228 			break;
229 		}
230 	}
231 
232 	sqe->off = opc;
233 }
234 
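/* cqe->user_data layout: io tag in bits 0-15, command opcode in bits 16-23 */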
235 static inline uint64_t
236 build_user_data(uint16_t tag, uint8_t op)
237 {
238 	assert(!(tag >> 16) && !(op >> 8));
239 
240 	return tag | (op << 16);
241 }
242 
243 static inline uint16_t
244 user_data_to_tag(uint64_t user_data)
245 {
246 	return user_data & 0xffff;
247 }
248 
249 static inline uint8_t
250 user_data_to_op(uint64_t user_data)
251 {
252 	return (user_data >> 16) & 0xff;
253 }
254 
255 void
256 spdk_ublk_init(void)
257 {
258 	assert(spdk_thread_is_app_thread(NULL));
259 
260 	g_ublk_tgt.ctrl_fd = -1;
261 	g_ublk_tgt.ctrl_ring.ring_fd = -1;
262 }
263 
264 static int
265 ublk_ctrl_poller(void *arg)
266 {
267 	struct io_uring *ring = &g_ublk_tgt.ctrl_ring;
268 	struct spdk_ublk_dev *ublk;
269 	struct io_uring_cqe *cqe;
270 	const int max = 8;
271 	int i, count = 0, rc;
272 
273 	if (!g_ublk_tgt.ctrl_ops_in_progress) {
274 		return SPDK_POLLER_IDLE;
275 	}
276 
277 	for (i = 0; i < max; i++) {
278 		rc = io_uring_peek_cqe(ring, &cqe);
279 		if (rc == -EAGAIN) {
280 			break;
281 		}
282 
283 		assert(cqe != NULL);
284 		g_ublk_tgt.ctrl_ops_in_progress--;
285 		ublk = (struct spdk_ublk_dev *)cqe->user_data;
286 		UBLK_DEBUGLOG(ublk, "ctrl cmd completed\n");
287 		ublk->ctrl_ops_in_progress--;
288 		if (ublk->next_state_fn) {
289 			ublk->next_state_fn(ublk);
290 		}
291 		io_uring_cqe_seen(ring, cqe);
292 		count++;
293 	}
294 
295 	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
296 }
297 
298 static int
299 ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op)
300 {
301 	uint32_t dev_id = ublk->ublk_id;
302 	int rc = -EINVAL;
303 	struct io_uring_sqe *sqe;
304 	struct ublksrv_ctrl_cmd *cmd;
305 
306 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]);
307 
308 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
309 	if (!sqe) {
310 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
311 		assert(false);
312 		return -ENOENT;
313 	}
314 
315 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
316 	sqe->fd = g_ublk_tgt.ctrl_fd;
317 	sqe->opcode = IORING_OP_URING_CMD;
318 	sqe->ioprio = 0;
319 	cmd->dev_id = dev_id;
320 	cmd->queue_id = -1;
321 	ublk->next_state_fn = NULL;
322 
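	/* Chain the ctrl command state machine: ADD_DEV -> ublk_set_params -> ublk_finish_start,
	 * and DEL_DEV -> ublk_free_dev.  The next_state_fn is invoked from ublk_ctrl_poller
	 * once the command completion is reaped from the ctrl ring.
	 */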
323 	switch (cmd_op) {
324 	case UBLK_CMD_ADD_DEV:
325 		ublk->next_state_fn = ublk_set_params;
326 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_info;
327 		cmd->len = sizeof(ublk->dev_info);
328 		break;
329 	case UBLK_CMD_SET_PARAMS:
330 		ublk->next_state_fn = ublk_finish_start;
331 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_params;
332 		cmd->len = sizeof(ublk->dev_params);
333 		break;
334 	case UBLK_CMD_START_DEV:
335 		cmd->data[0] = getpid();
336 		break;
337 	case UBLK_CMD_STOP_DEV:
338 		break;
339 	case UBLK_CMD_DEL_DEV:
340 		ublk->next_state_fn = ublk_free_dev;
341 		break;
342 	default:
343 		SPDK_ERRLOG("No matching cmd operation, cmd_op = %d\n", cmd_op);
344 		return -EINVAL;
345 	}
346 	ublk_set_sqe_cmd_op(sqe, cmd_op);
347 	io_uring_sqe_set_data(sqe, ublk);
348 
349 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
350 	if (rc < 0) {
351 		SPDK_ERRLOG("uring submit rc %d\n", rc);
352 		return rc;
353 	}
354 	g_ublk_tgt.ctrl_ops_in_progress++;
355 	ublk->ctrl_ops_in_progress++;
356 
357 	return 0;
358 }
359 
360 static int
361 ublk_ctrl_cmd_get_features(void)
362 {
363 	int rc;
364 	struct io_uring_sqe *sqe;
365 	struct io_uring_cqe *cqe;
366 	struct ublksrv_ctrl_cmd *cmd;
367 	uint32_t cmd_op;
368 
369 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
370 	if (!sqe) {
371 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
372 		assert(false);
373 		return -ENOENT;
374 	}
375 
376 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
377 	sqe->fd = g_ublk_tgt.ctrl_fd;
378 	sqe->opcode = IORING_OP_URING_CMD;
379 	sqe->ioprio = 0;
380 	cmd->dev_id = -1;
381 	cmd->queue_id = -1;
382 	cmd->addr = (__u64)(uintptr_t)&g_ublk_tgt.features;
383 	cmd->len = sizeof(g_ublk_tgt.features);
384 
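	/* 0x13 is UBLK_CMD_GET_FEATURES in recent kernel headers; it is spelled numerically
	 * here since older linux/ublk_cmd.h versions may not define it.
	 */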
385 	cmd_op = _IOR('u', 0x13, struct ublksrv_ctrl_cmd);
386 	ublk_set_sqe_cmd_op(sqe, cmd_op);
387 
388 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
389 	if (rc < 0) {
390 		SPDK_ERRLOG("uring submit rc %d\n", rc);
391 		return rc;
392 	}
393 
394 	rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe);
395 	if (rc < 0) {
396 		SPDK_ERRLOG("wait cqe rc %d\n", rc);
397 		return rc;
398 	}
399 
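	/* Bit 6 of the feature mask corresponds to UBLK_F_CMD_IOCTL_ENCODE in recent kernel
	 * headers; the raw bit is tested since older headers may not define the flag.
	 */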
400 	if (cqe->res == 0) {
401 		g_ublk_tgt.ioctl_encode = !!(g_ublk_tgt.features & (1ULL << 6));
402 	}
403 	io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe);
404 
405 	return 0;
406 }
407 
408 static int
409 ublk_queue_cmd_buf_sz(uint32_t q_depth)
410 {
411 	uint32_t size = q_depth * sizeof(struct ublksrv_io_desc);
412 	uint32_t page_sz = getpagesize();
413 
414 	/* round up size */
415 	return (size + page_sz - 1) & ~(page_sz - 1);
416 }
417 
418 static int
419 ublk_get_max_support_devs(void)
420 {
421 	FILE *file;
422 	char str[128];
423 
424 	file = fopen("/sys/module/ublk_drv/parameters/ublks_max", "r");
425 	if (!file) {
426 		return -ENOENT;
427 	}
428 
429 	if (!fgets(str, sizeof(str), file)) {
430 		fclose(file);
431 		return -EINVAL;
432 	}
433 	fclose(file);
434 
435 	spdk_str_chomp(str);
436 	return spdk_strtol(str, 10);
437 }
438 
439 static int
440 ublk_open(void)
441 {
442 	int rc, ublks_max;
443 
444 	g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR);
445 	if (g_ublk_tgt.ctrl_fd < 0) {
446 		rc = errno;
447 		SPDK_ERRLOG("UBLK control dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno));
448 		return -rc;
449 	}
450 
451 	ublks_max = ublk_get_max_support_devs();
452 	if (ublks_max > 0) {
453 		g_ublks_max = ublks_max;
454 	}
455 
456 	/* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl
457 	 * ring processing to a workqueue.  Ctrl ring processing is minimal, so SQPOLL is fine.
458 	 * Commands sent via the control uring for a ublk device are executed one at a time, so
459 	 * ublks_max * 2 uring entries are enough.
460 	 */
461 	rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring,
462 			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
463 	if (rc < 0) {
464 		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
465 		goto err;
466 	}
467 
468 	rc = ublk_ctrl_cmd_get_features();
469 	if (rc) {
470 		goto err;
471 	}
472 
473 	return 0;
474 
475 err:
476 	close(g_ublk_tgt.ctrl_fd);
477 	g_ublk_tgt.ctrl_fd = -1;
478 	return rc;
479 }
480 
481 static int
482 ublk_parse_core_mask(const char *mask)
483 {
484 	struct spdk_cpuset tmp_mask;
485 	int rc;
486 
487 	if (mask == NULL) {
488 		spdk_env_get_cpuset(&g_core_mask);
489 		return 0;
490 	}
491 
492 	rc = spdk_cpuset_parse(&g_core_mask, mask);
493 	if (rc < 0) {
494 		SPDK_ERRLOG("invalid cpumask %s\n", mask);
495 		return -EINVAL;
496 	}
497 
498 	if (spdk_cpuset_count(&g_core_mask) == 0) {
499 		SPDK_ERRLOG("no cpus specified\n");
500 		return -EINVAL;
501 	}
502 
503 	spdk_env_get_cpuset(&tmp_mask);
504 	spdk_cpuset_and(&tmp_mask, &g_core_mask);
505 
506 	if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) {
507 		SPDK_ERRLOG("one of the selected cpus is outside of the core mask (=%s)\n",
508 			    spdk_cpuset_fmt(&g_core_mask));
509 		return -EINVAL;
510 	}
511 
512 	return 0;
513 }
514 
515 static void
516 ublk_poller_register(void *args)
517 {
518 	struct ublk_poll_group *poll_group = args;
519 	int rc;
520 
521 	assert(spdk_get_thread() == poll_group->ublk_thread);
522 	/* Bind the ublk spdk_thread to the current CPU core to avoid thread context switches
523 	 * during uring processing, as required by the ublk kernel driver.
524 	 */
525 	spdk_thread_bind(spdk_get_thread(), true);
526 
527 	TAILQ_INIT(&poll_group->queue_list);
528 	poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0);
529 	rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk",
530 				     UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE);
531 	if (rc != 0) {
532 		assert(false);
533 	}
534 }
535 
536 int
537 ublk_create_target(const char *cpumask_str)
538 {
539 	int rc;
540 	uint32_t i;
541 	char thread_name[32];
542 	struct ublk_poll_group *poll_group;
543 
544 	if (g_ublk_tgt.active == true) {
545 		SPDK_ERRLOG("UBLK target has been created\n");
546 		return -EBUSY;
547 	}
548 
549 	rc = ublk_parse_core_mask(cpumask_str);
550 	if (rc != 0) {
551 		return rc;
552 	}
553 
554 	assert(g_ublk_tgt.poll_groups == NULL);
555 	g_ublk_tgt.poll_groups = calloc(spdk_env_get_core_count(), sizeof(*poll_group));
556 	if (!g_ublk_tgt.poll_groups) {
557 		return -ENOMEM;
558 	}
559 
560 	rc = ublk_open();
561 	if (rc != 0) {
562 		SPDK_ERRLOG("Failed to open UBLK, error=%s\n", spdk_strerror(-rc));
563 		free(g_ublk_tgt.poll_groups);
564 		return rc;
565 	}
566 
567 	spdk_iobuf_register_module("ublk");
568 
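	/* Create one ublk poll group thread on each core in the ublk core mask; each thread
	 * registers ublk_poll to drive the queues that get assigned to it.
	 */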
569 	SPDK_ENV_FOREACH_CORE(i) {
570 		if (!spdk_cpuset_get_cpu(&g_core_mask, i)) {
571 			continue;
572 		}
573 		snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i);
574 		poll_group = &g_ublk_tgt.poll_groups[g_num_ublk_poll_groups];
575 		poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask);
576 		spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group);
577 		g_num_ublk_poll_groups++;
578 	}
579 
580 	assert(spdk_thread_is_app_thread(NULL));
581 	g_ublk_tgt.active = true;
582 	g_ublk_tgt.ctrl_ops_in_progress = 0;
583 	g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL,
584 				 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US);
585 
586 	SPDK_NOTICELOG("UBLK target created successfully\n");
587 
588 	return 0;
589 }
590 
591 static void
592 _ublk_fini_done(void *args)
593 {
594 	SPDK_DEBUGLOG(ublk, "\n");
595 
596 	g_num_ublk_poll_groups = 0;
597 	g_next_ublk_poll_group = 0;
598 	g_ublk_tgt.is_destroying = false;
599 	g_ublk_tgt.active = false;
600 	g_ublk_tgt.features = 0;
601 	g_ublk_tgt.ioctl_encode = false;
602 
603 	if (g_ublk_tgt.cb_fn) {
604 		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
605 		g_ublk_tgt.cb_fn = NULL;
606 		g_ublk_tgt.cb_arg = NULL;
607 	}
608 
609 	if (g_ublk_tgt.poll_groups) {
610 		free(g_ublk_tgt.poll_groups);
611 		g_ublk_tgt.poll_groups = NULL;
612 	}
613 
614 }
615 
616 static void
617 ublk_thread_exit(void *args)
618 {
619 	struct spdk_thread *ublk_thread = spdk_get_thread();
620 	uint32_t i;
621 
622 	for (i = 0; i < g_num_ublk_poll_groups; i++) {
623 		if (g_ublk_tgt.poll_groups[i].ublk_thread == ublk_thread) {
624 			spdk_poller_unregister(&g_ublk_tgt.poll_groups[i].ublk_poller);
625 			spdk_iobuf_channel_fini(&g_ublk_tgt.poll_groups[i].iobuf_ch);
626 			spdk_thread_bind(ublk_thread, false);
627 			spdk_thread_exit(ublk_thread);
628 		}
629 	}
630 }
631 
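/* Device teardown sequence: ublk_close_dev() sends UBLK_CMD_STOP_DEV, the kernel aborts the
 * outstanding io commands, each queue drains and is removed from its poll group in
 * ublk_try_close_queue(), ublk_try_close_dev() runs on the app thread once every queue has
 * closed, and ublk_delete_dev() finally issues UBLK_CMD_DEL_DEV whose completion invokes
 * ublk_free_dev().
 */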
632 static int
633 ublk_close_dev(struct spdk_ublk_dev *ublk)
634 {
635 	int rc;
636 
637 	/* set is_closing */
638 	if (ublk->is_closing) {
639 		return -EBUSY;
640 	}
641 	ublk->is_closing = true;
642 
643 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_STOP_DEV);
644 	if (rc < 0) {
645 		SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id);
646 	}
647 	return rc;
648 }
649 
650 static void
651 _ublk_fini(void *args)
652 {
653 	struct spdk_ublk_dev	*ublk, *ublk_tmp;
654 
655 	TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) {
656 		ublk_close_dev(ublk);
657 	}
658 
659 	/* Check if all ublks closed */
660 	if (TAILQ_EMPTY(&g_ublk_devs)) {
661 		SPDK_DEBUGLOG(ublk, "finish shutdown\n");
662 		spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
663 		if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) {
664 			io_uring_queue_exit(&g_ublk_tgt.ctrl_ring);
665 			g_ublk_tgt.ctrl_ring.ring_fd = -1;
666 		}
667 		if (g_ublk_tgt.ctrl_fd >= 0) {
668 			close(g_ublk_tgt.ctrl_fd);
669 			g_ublk_tgt.ctrl_fd = -1;
670 		}
671 		spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done);
672 	} else {
673 		spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL);
674 	}
675 }
676 
677 int
678 spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg)
679 {
680 	assert(spdk_thread_is_app_thread(NULL));
681 
682 	if (g_ublk_tgt.is_destroying == true) {
683 		/* UBLK target is being destroyed */
684 		return -EBUSY;
685 	}
686 	g_ublk_tgt.cb_fn = cb_fn;
687 	g_ublk_tgt.cb_arg = cb_arg;
688 	g_ublk_tgt.is_destroying = true;
689 	_ublk_fini(NULL);
690 
691 	return 0;
692 }
693 
694 int
695 ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg)
696 {
697 	int rc;
698 
699 	if (g_ublk_tgt.active == false) {
700 		/* UBLK target has not been created */
701 		return -ENOENT;
702 	}
703 
704 	rc = spdk_ublk_fini(cb_fn, cb_arg);
705 
706 	return rc;
707 }
708 
709 struct spdk_ublk_dev *
710 ublk_dev_find_by_id(uint32_t ublk_id)
711 {
712 	struct spdk_ublk_dev *ublk;
713 
714 	/* check whether a ublk device with this ID has already been registered. */
715 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
716 		if (ublk->ublk_id == ublk_id) {
717 			return ublk;
718 		}
719 	}
720 
721 	return NULL;
722 }
723 
724 uint32_t
725 ublk_dev_get_id(struct spdk_ublk_dev *ublk)
726 {
727 	return ublk->ublk_id;
728 }
729 
730 struct spdk_ublk_dev *ublk_dev_first(void)
731 {
732 	return TAILQ_FIRST(&g_ublk_devs);
733 }
734 
735 struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev)
736 {
737 	return TAILQ_NEXT(prev, tailq);
738 }
739 
740 uint32_t
741 ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk)
742 {
743 	return ublk->queue_depth;
744 }
745 
746 uint32_t
747 ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk)
748 {
749 	return ublk->num_queues;
750 }
751 
752 const char *
753 ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk)
754 {
755 	return spdk_bdev_get_name(ublk->bdev);
756 }
757 
758 void
759 spdk_ublk_write_config_json(struct spdk_json_write_ctx *w)
760 {
761 	struct spdk_ublk_dev *ublk;
762 
763 	spdk_json_write_array_begin(w);
764 
765 	if (g_ublk_tgt.active) {
766 		spdk_json_write_object_begin(w);
767 
768 		spdk_json_write_named_string(w, "method", "ublk_create_target");
769 		spdk_json_write_named_object_begin(w, "params");
770 		spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask));
771 		spdk_json_write_object_end(w);
772 
773 		spdk_json_write_object_end(w);
774 	}
775 
776 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
777 		spdk_json_write_object_begin(w);
778 
779 		spdk_json_write_named_string(w, "method", "ublk_start_disk");
780 
781 		spdk_json_write_named_object_begin(w, "params");
782 		spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk));
783 		spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id);
784 		spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues);
785 		spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth);
786 		spdk_json_write_object_end(w);
787 
788 		spdk_json_write_object_end(w);
789 	}
790 
791 	spdk_json_write_array_end(w);
792 }
793 
794 static void
795 ublk_dev_list_register(struct spdk_ublk_dev *ublk)
796 {
797 	UBLK_DEBUGLOG(ublk, "add to tailq\n");
798 	TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq);
799 	g_ublk_tgt.num_ublk_devs++;
800 }
801 
802 static void
803 ublk_dev_list_unregister(struct spdk_ublk_dev *ublk)
804 {
805 	/*
806 	 * The ublk device may be stopped before it was registered,
807 	 * so check whether it is actually on the list.
808 	 */
809 
810 	if (ublk_dev_find_by_id(ublk->ublk_id)) {
811 		UBLK_DEBUGLOG(ublk, "remove from tailq\n");
812 		TAILQ_REMOVE(&g_ublk_devs, ublk, tailq);
813 		assert(g_ublk_tgt.num_ublk_devs);
814 		g_ublk_tgt.num_ublk_devs--;
815 		return;
816 	}
817 
818 	UBLK_DEBUGLOG(ublk, "not found in tailq\n");
819 	assert(false);
820 }
821 
822 static void
823 ublk_delete_dev(void *arg)
824 {
825 	struct spdk_ublk_dev *ublk = arg;
826 	int rc = 0;
827 	uint32_t q_idx;
828 
829 	assert(spdk_thread_is_app_thread(NULL));
830 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
831 		ublk_dev_queue_fini(&ublk->queues[q_idx]);
832 	}
833 
834 	if (ublk->cdev_fd >= 0) {
835 		close(ublk->cdev_fd);
836 	}
837 
838 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_DEL_DEV);
839 	if (rc < 0) {
840 		SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id);
841 	}
842 }
843 
844 static int
845 _ublk_close_dev_retry(void *arg)
846 {
847 	struct spdk_ublk_dev *ublk = arg;
848 
849 	if (ublk->ctrl_ops_in_progress > 0) {
850 		if (ublk->retry_count-- > 0) {
851 			return SPDK_POLLER_BUSY;
852 		}
853 		SPDK_ERRLOG("Timeout on ctrl op completion.\n");
854 	}
855 	spdk_poller_unregister(&ublk->retry_poller);
856 	ublk_delete_dev(ublk);
857 	return SPDK_POLLER_BUSY;
858 }
859 
860 static void
861 ublk_try_close_dev(void *arg)
862 {
863 	struct spdk_ublk_dev *ublk = arg;
864 
865 	assert(spdk_thread_is_app_thread(NULL));
866 	ublk->queues_closed += 1;
867 	if (ublk->queues_closed < ublk->num_queues) {
868 		return;
869 	}
870 
871 	if (ublk->ctrl_ops_in_progress > 0) {
872 		assert(ublk->retry_poller == NULL);
873 		ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US;
874 		ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk,
875 				     UBLK_BUSY_POLLING_INTERVAL_US);
876 	} else {
877 		ublk_delete_dev(ublk);
878 	}
879 }
880 
881 static void
882 ublk_try_close_queue(struct ublk_queue *q)
883 {
884 	struct spdk_ublk_dev *ublk = q->dev;
885 
886 	/* Only close the queue once no I/O is in flight to the bdev,
887 	 * no I/O is waiting to commit its result, and all I/Os have been aborted back.
888 	 */
889 	if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) {
890 		/* wait for next retry */
891 		return;
892 	}
893 
894 	TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq);
895 	spdk_put_io_channel(q->bdev_ch);
896 	q->bdev_ch = NULL;
897 
898 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk);
899 }
900 
901 int
902 ublk_stop_disk(uint32_t ublk_id, ublk_del_cb del_cb, void *cb_arg)
903 {
904 	struct spdk_ublk_dev *ublk;
905 
906 	assert(spdk_thread_is_app_thread(NULL));
907 
908 	ublk = ublk_dev_find_by_id(ublk_id);
909 	if (ublk == NULL) {
910 		SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id);
911 		return -ENODEV;
912 	}
913 	if (ublk->is_closing) {
914 		SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id);
915 		return -EBUSY;
916 	}
917 
918 	ublk->del_cb = del_cb;
919 	ublk->cb_arg = cb_arg;
920 	return ublk_close_dev(ublk);
921 }
922 
923 static inline void
924 ublk_mark_io_done(struct ublk_io *io, int res)
925 {
926 	/*
927 	 * mark the io as done by the target, so that SPDK can commit its
928 	 * result and fetch a new request via an io_uring command.
929 	 */
930 	io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
931 	io->result = res;
932 	io->need_data = false;
933 }
934 
935 static void
936 ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
937 {
938 	struct ublk_io	*io = cb_arg;
939 	struct ublk_queue *q = io->q;
940 	int res;
941 
942 	if (success) {
943 		res = io->result;
944 	} else {
945 		res = -EIO;
946 	}
947 
948 	ublk_mark_io_done(io, res);
949 
950 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n",
951 		      q->q_id, io->tag, res);
952 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
953 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
954 
955 	if (bdev_io != NULL) {
956 		spdk_bdev_free_io(bdev_io);
957 	}
958 }
959 
960 static void
961 ublk_resubmit_io(void *arg)
962 {
963 	struct ublk_io *io = (struct ublk_io *)arg;
964 
965 	_ublk_submit_bdev_io(io->q, io);
966 }
967 
968 static void
969 ublk_queue_io(struct ublk_io *io)
970 {
971 	int rc;
972 	struct spdk_bdev *bdev = io->q->dev->bdev;
973 	struct ublk_queue *q = io->q;
974 
975 	io->bdev_io_wait.bdev = bdev;
976 	io->bdev_io_wait.cb_fn = ublk_resubmit_io;
977 	io->bdev_io_wait.cb_arg = io;
978 
979 	rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait);
980 	if (rc != 0) {
981 		SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc);
982 		ublk_io_done(NULL, false, io);
983 	}
984 }
985 
986 static void
987 ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf)
988 {
989 	struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf);
990 
991 	io->mpool_entry = buf;
992 	assert(io->payload == NULL);
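	/* Align the payload pointer up to a 4 KiB boundary within the iobuf buffer */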
993 	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
994 	io->get_buf_cb(io);
995 }
996 
997 static void
998 ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch,
999 		   ublk_get_buf_cb get_buf_cb)
1000 {
1001 	uint64_t io_size;
1002 	void *buf;
1003 
1004 	io_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1005 	io->get_buf_cb = get_buf_cb;
1006 	buf = spdk_iobuf_get(iobuf_ch, io_size, &io->iobuf, ublk_io_get_buffer_cb);
1007 	if (buf != NULL) {
1008 		ublk_io_get_buffer_cb(&io->iobuf, buf);
1009 	}
1010 }
1011 
1012 static void
1013 ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch)
1014 {
1015 	uint64_t io_size;
1016 
1017 	if (io->payload) {
1018 		io_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1019 		spdk_iobuf_put(iobuf_ch, io->mpool_entry, io_size);
1020 		io->mpool_entry = NULL;
1021 		io->payload = NULL;
1022 	}
1023 }
1024 
1025 static void
1026 _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1027 {
1028 	struct spdk_ublk_dev *ublk = q->dev;
1029 	struct spdk_bdev_desc *desc = io->bdev_desc;
1030 	struct spdk_io_channel *ch = io->bdev_ch;
1031 	uint64_t offset_blocks, num_blocks;
1032 	uint8_t ublk_op;
1033 	int rc = 0;
1034 	const struct ublksrv_io_desc *iod = io->iod;
1035 
1036 	ublk_op = ublksrv_get_op(iod);
1037 	offset_blocks = iod->start_sector >> ublk->sector_per_block_shift;
1038 	num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift;
1039 
1040 	io->result = num_blocks * spdk_bdev_get_data_block_size(ublk->bdev);
1041 	switch (ublk_op) {
1042 	case UBLK_IO_OP_READ:
1043 		rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
1044 		break;
1045 	case UBLK_IO_OP_WRITE:
1046 		assert((void *)iod->addr == io->payload);
1047 		rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
1048 		break;
1049 	case UBLK_IO_OP_FLUSH:
1050 		rc = spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1051 		break;
1052 	case UBLK_IO_OP_DISCARD:
1053 		rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1054 		break;
1055 	case UBLK_IO_OP_WRITE_ZEROES:
1056 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1057 		break;
1058 	default:
1059 		rc = -1;
1060 	}
1061 
1062 	if (rc < 0) {
1063 		if (rc == -ENOMEM) {
1064 			SPDK_INFOLOG(ublk, "No memory, queueing io.\n");
1065 			ublk_queue_io(io);
1066 		} else {
1067 			SPDK_ERRLOG("ublk io failed in _ublk_submit_bdev_io, rc=%d.\n", rc);
1068 			ublk_io_done(NULL, false, io);
1069 		}
1070 	}
1071 }
1072 
1073 static void
1074 read_get_buffer_done(struct ublk_io *io)
1075 {
1076 	_ublk_submit_bdev_io(io->q, io);
1077 }
1078 
1079 static void
1080 ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1081 {
1082 	struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch;
1083 	const struct ublksrv_io_desc *iod = io->iod;
1084 	uint8_t ublk_op;
1085 
1086 	ublk_op = ublksrv_get_op(iod);
1087 	switch (ublk_op) {
1088 	case UBLK_IO_OP_READ:
1089 		ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done);
1090 		break;
1091 	default:
1092 		_ublk_submit_bdev_io(q, io);
1093 		break;
1094 	}
1095 }
1096 
1097 static inline void
1098 ublksrv_queue_io_cmd(struct ublk_queue *q,
1099 		     struct ublk_io *io, unsigned tag)
1100 {
1101 	struct ublksrv_io_cmd *cmd;
1102 	struct io_uring_sqe *sqe;
1103 	unsigned int cmd_op = 0;
1104 	uint64_t user_data;
1105 
1106 	/* each io must carry a fetch, get-data, or commit-and-fetch operation */
1107 	assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) ||
1108 	       (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ));
1109 	cmd_op = io->cmd_op;
1110 
1111 	sqe = io_uring_get_sqe(&q->ring);
1112 	assert(sqe);
1113 
1114 	cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe);
1115 	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) {
1116 		cmd->result = io->result;
1117 	}
1118 
1119 	/* These fields should be written once, never change */
1120 	ublk_set_sqe_cmd_op(sqe, cmd_op);
1121 	/* fixed file index 0 refers to the registered dev->cdev_fd */
1122 	sqe->fd		= 0;
1123 	sqe->opcode	= IORING_OP_URING_CMD;
1124 	sqe->flags	= IOSQE_FIXED_FILE;
1125 	sqe->rw_flags	= 0;
1126 	cmd->tag	= tag;
1127 	cmd->addr	= (__u64)(uintptr_t)(io->payload);
1128 	cmd->q_id	= q->q_id;
1129 
1130 	user_data = build_user_data(tag, cmd_op);
1131 	io_uring_sqe_set_data64(sqe, user_data);
1132 
1133 	io->cmd_op = 0;
1134 	q->cmd_inflight += 1;
1135 
1136 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n",
1137 		      q->q_id, tag, cmd_op,
1138 		      io->cmd_op, q->is_stopping);
1139 }
1140 
1141 static int
1142 ublk_io_xmit(struct ublk_queue *q)
1143 {
1144 	TAILQ_HEAD(, ublk_io) buffer_free_list;
1145 	struct spdk_iobuf_channel *iobuf_ch;
1146 	int rc = 0, count = 0;
1147 	struct ublk_io *io;
1148 
1149 	if (TAILQ_EMPTY(&q->completed_io_list)) {
1150 		return 0;
1151 	}
1152 
1153 	TAILQ_INIT(&buffer_free_list);
1154 	while (!TAILQ_EMPTY(&q->completed_io_list)) {
1155 		io = TAILQ_FIRST(&q->completed_io_list);
1156 		assert(io != NULL);
1157 		/*
1158 		 * Remove IO from list now assuming it will be completed. It will be inserted
1159 		 * back to the head if it cannot be completed. This approach is specifically
1160 		 * taken to work around a scan-build use-after-free mischaracterization.
1161 		 */
1162 		TAILQ_REMOVE(&q->completed_io_list, io, tailq);
1163 		if (!io->need_data) {
1164 			TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq);
1165 		}
1166 		ublksrv_queue_io_cmd(q, io, io->tag);
1167 		count++;
1168 	}
1169 
1170 	rc = io_uring_submit(&q->ring);
1171 	if (rc != count) {
1172 		SPDK_ERRLOG("could not submit all commands\n");
1173 		assert(false);
1174 	}
1175 
1176 	/* Note: for READ io, ublk will always copy the data out of
1177 	 * the buffers in the io_uring_submit context.  Since we
1178 	 * are not using SQPOLL for IO rings, we can safely free
1179 	 * those IO buffers here.  This design doesn't seem ideal,
1180 	 * but it's what's possible since there is no discrete
1181 	 * COMMIT_REQ operation.  That will need to change in the
1182 	 * future should we ever want to support async copy
1183 	 * operations.
1184 	 */
1185 	iobuf_ch = &q->poll_group->iobuf_ch;
1186 	while (!TAILQ_EMPTY(&buffer_free_list)) {
1187 		io = TAILQ_FIRST(&buffer_free_list);
1188 		TAILQ_REMOVE(&buffer_free_list, io, tailq);
1189 		ublk_io_put_buffer(io, iobuf_ch);
1190 	}
1191 	return rc;
1192 }
1193 
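/* Once a buffer has been allocated for a write, the io is moved back to the completed list so
 * that ublk_io_xmit() sends UBLK_IO_NEED_GET_DATA with the buffer address to the driver; the
 * bdev write itself is submitted after the driver copies the data in and completes the command
 * with UBLK_IO_RES_OK (see ublk_io_recv).
 */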
1194 static void
1195 write_get_buffer_done(struct ublk_io *io)
1196 {
1197 	io->need_data = true;
1198 	io->cmd_op = UBLK_IO_NEED_GET_DATA;
1199 	io->result = 0;
1200 
1201 	TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq);
1202 	TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq);
1203 }
1204 
1205 static int
1206 ublk_io_recv(struct ublk_queue *q)
1207 {
1208 	struct io_uring_cqe *cqe;
1209 	unsigned head, tag;
1210 	int fetch, count = 0;
1211 	struct ublk_io *io;
1212 	struct spdk_iobuf_channel *iobuf_ch;
1213 
1214 	if (q->cmd_inflight == 0) {
1215 		return 0;
1216 	}
1217 
1218 	iobuf_ch = &q->poll_group->iobuf_ch;
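	/* cqe->res semantics: UBLK_IO_RES_OK means a new request is ready to be submitted to the
	 * bdev, UBLK_IO_RES_NEED_GET_DATA means the driver wants a buffer address before copying
	 * in write data, and UBLK_IO_RES_ABORT means the queue is being stopped.
	 */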
1219 	io_uring_for_each_cqe(&q->ring, head, cqe) {
1220 		tag = user_data_to_tag(cqe->user_data);
1221 		fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping;
1222 
1223 		SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u cmd_op %u\n",
1224 			      cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data));
1225 
1226 		q->cmd_inflight--;
1227 		io = &q->ios[tag];
1228 
1229 		if (!fetch) {
1230 			q->is_stopping = true;
1231 			if (io->cmd_op == UBLK_IO_FETCH_REQ) {
1232 				io->cmd_op = 0;
1233 			}
1234 		}
1235 
1236 		TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq);
1237 		if (cqe->res == UBLK_IO_RES_OK) {
1238 			ublk_submit_bdev_io(q, io);
1239 		} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
1240 			ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done);
1241 		} else {
1242 			if (cqe->res != UBLK_IO_RES_ABORT) {
1243 				SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
1244 					    cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data));
1245 			}
1246 			TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1247 		}
1248 		count += 1;
1249 		if (count == UBLK_QUEUE_REQUEST) {
1250 			break;
1251 		}
1252 	}
1253 	io_uring_cq_advance(&q->ring, count);
1254 
1255 	return count;
1256 }
1257 
1258 static int
1259 ublk_poll(void *arg)
1260 {
1261 	struct ublk_poll_group *poll_group = arg;
1262 	struct ublk_queue *q, *q_tmp;
1263 	int sent, received, count = 0;
1264 
1265 	TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) {
1266 		sent = ublk_io_xmit(q);
1267 		received = ublk_io_recv(q);
1268 		if (spdk_unlikely(q->is_stopping)) {
1269 			ublk_try_close_queue(q);
1270 		}
1271 		count += sent + received;
1272 	}
1273 	if (count > 0) {
1274 		return SPDK_POLLER_BUSY;
1275 	} else {
1276 		return SPDK_POLLER_IDLE;
1277 	}
1278 }
1279 
1280 static void
1281 ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk)
1282 {
1283 	ublk_close_dev(ublk);
1284 }
1285 
1286 static void
1287 ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1288 		   void *event_ctx)
1289 {
1290 	switch (type) {
1291 	case SPDK_BDEV_EVENT_REMOVE:
1292 		ublk_bdev_hot_remove(event_ctx);
1293 		break;
1294 	default:
1295 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1296 		break;
1297 	}
1298 }
1299 
1300 static void
1301 ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth)
1302 {
1303 	struct io_uring_sqe *sqe;
1304 	uint32_t i;
1305 
1306 	for (i = 0; i < q_depth; i++) {
1307 		sqe = ublk_uring_get_sqe(r, i);
1308 
1309 		/* These fields should be written once, never change */
1310 		sqe->flags = IOSQE_FIXED_FILE;
1311 		sqe->rw_flags = 0;
1312 		sqe->ioprio = 0;
1313 		sqe->off = 0;
1314 	}
1315 }
1316 
1317 static int
1318 ublk_dev_queue_init(struct ublk_queue *q)
1319 {
1320 	int rc = 0, cmd_buf_size;
1321 	uint32_t j;
1322 	struct spdk_ublk_dev *ublk = q->dev;
1323 	unsigned long off;
1324 
1325 	cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth);
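	/* Map the read-only ublksrv_io_desc array that the kernel exposes for this queue through
	 * the char device; the descriptor for a fetched tag is read from io_cmd_buf[tag].
	 */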
1326 	off = UBLKSRV_CMD_BUF_OFFSET +
1327 	      q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
1328 	q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ,
1329 			MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off);
1330 	if (q->io_cmd_buf == MAP_FAILED) {
1331 		q->io_cmd_buf = NULL;
1332 		rc = -errno;
1333 		SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc));
1334 		goto err;
1335 	}
1336 
1337 	for (j = 0; j < q->q_depth; j++) {
1338 		q->ios[j].cmd_op = UBLK_IO_FETCH_REQ;
1339 		q->ios[j].iod = &q->io_cmd_buf[j];
1340 	}
1341 
1342 	rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128);
1343 	if (rc < 0) {
1344 		SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc));
1345 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1346 		q->io_cmd_buf = NULL;
1347 		goto err;
1348 	}
1349 
1350 	rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1);
1351 	if (rc != 0) {
1352 		SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc));
1353 		io_uring_queue_exit(&q->ring);
1354 		q->ring.ring_fd = -1;
1355 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1356 		q->io_cmd_buf = NULL;
1357 		goto err;
1358 	}
1359 
1360 	ublk_dev_init_io_cmds(&q->ring, q->q_depth);
1361 
1362 	return 0;
1363 err:
1364 	return rc;
1365 }
1366 
1367 static void
1368 ublk_dev_queue_fini(struct ublk_queue *q)
1369 {
1370 	if (q->ring.ring_fd >= 0) {
1371 		io_uring_unregister_files(&q->ring);
1372 		io_uring_queue_exit(&q->ring);
1373 		q->ring.ring_fd = -1;
1374 	}
1375 	if (q->io_cmd_buf) {
1376 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1377 	}
1378 }
1379 
1380 static void
1381 ublk_dev_queue_io_init(struct ublk_queue *q)
1382 {
1383 	struct ublk_io *io;
1384 	uint32_t i;
1385 	int rc __attribute__((unused));
1386 	void *buf;
1387 
1388 	/* Some older kernels require a buffer to get posted, even
1389 	 * when NEED_GET_DATA has been specified.  So allocate a
1390 	 * temporary buffer, only for purposes of this workaround.
1391 	 * It never actually gets used, so we will free it immediately
1392 	 * after all of the commands are posted.
1393 	 */
1394 	buf = malloc(64);
1395 
1396 	assert(q->bdev_ch != NULL);
1397 
1398 	/* Initialize and submit all io commands to ublk driver */
1399 	for (i = 0; i < q->q_depth; i++) {
1400 		io = &q->ios[i];
1401 		io->tag = (uint16_t)i;
1402 		io->payload = buf;
1403 		io->bdev_ch = q->bdev_ch;
1404 		io->bdev_desc = q->dev->bdev_desc;
1405 		ublksrv_queue_io_cmd(q, io, i);
1406 	}
1407 
1408 	rc = io_uring_submit(&q->ring);
1409 	assert(rc == (int)q->q_depth);
1410 	for (i = 0; i < q->q_depth; i++) {
1411 		io = &q->ios[i];
1412 		io->payload = NULL;
1413 	}
1414 	free(buf);
1415 }
1416 
1417 static void
1418 ublk_set_params(struct spdk_ublk_dev *ublk)
1419 {
1420 	int rc;
1421 
1422 	ublk->dev_params.len = sizeof(struct ublk_params);
1423 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_SET_PARAMS);
1424 	if (rc < 0) {
1425 		SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1426 		ublk_delete_dev(ublk);
1427 		if (ublk->start_cb) {
1428 			ublk->start_cb(ublk->cb_arg, rc);
1429 			ublk->start_cb = NULL;
1430 		}
1431 	}
1432 }
1433 
1434 /* Set ublk device parameters based on bdev */
1435 static void
1436 ublk_info_param_init(struct spdk_ublk_dev *ublk)
1437 {
1438 	struct spdk_bdev *bdev = ublk->bdev;
1439 	uint32_t blk_size = spdk_bdev_get_data_block_size(bdev);
1440 	uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev);
1441 	uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev);
1442 	uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev);
1443 	uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT;
1444 	uint32_t io_min_size = blk_size;
1445 	uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size);
1446 
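	/* UBLK_F_NEED_GET_DATA makes writes two-phase: the driver first completes the fetch with
	 * UBLK_IO_RES_NEED_GET_DATA so a buffer address can be supplied before the write data is
	 * copied in (see ublk_io_recv and write_get_buffer_done).
	 */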
1447 	struct ublksrv_ctrl_dev_info uinfo = {
1448 		.queue_depth = ublk->queue_depth,
1449 		.nr_hw_queues = ublk->num_queues,
1450 		.dev_id = ublk->ublk_id,
1451 		.max_io_buf_bytes = UBLK_IO_MAX_BYTES,
1452 		.ublksrv_pid = getpid(),
1453 		.flags = UBLK_F_NEED_GET_DATA | UBLK_F_URING_CMD_COMP_IN_TASK,
1454 	};
1455 	struct ublk_params uparams = {
1456 		.types = UBLK_PARAM_TYPE_BASIC,
1457 		.basic = {
1458 			.logical_bs_shift = spdk_u32log2(blk_size),
1459 			.physical_bs_shift = spdk_u32log2(pblk_size),
1460 			.io_min_shift = spdk_u32log2(io_min_size),
1461 			.io_opt_shift = spdk_u32log2(io_opt_size),
1462 			.dev_sectors = num_blocks * sectors_per_block,
1463 			.max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT,
1464 		}
1465 	};
1466 
1467 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1468 		uparams.types |= UBLK_PARAM_TYPE_DISCARD;
1469 		uparams.discard.discard_alignment = sectors_per_block;
1470 		uparams.discard.max_discard_sectors = num_blocks * sectors_per_block;
1471 		uparams.discard.max_discard_segments = 1;
1472 		uparams.discard.discard_granularity = blk_size;
1473 		if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1474 			uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block;
1475 		}
1476 	}
1477 
1478 	ublk->dev_info = uinfo;
1479 	ublk->dev_params = uparams;
1480 }
1481 
1482 static void
1483 _ublk_free_dev(void *arg)
1484 {
1485 	struct spdk_ublk_dev *ublk = arg;
1486 
1487 	ublk_free_dev(ublk);
1488 }
1489 
1490 static void
1491 free_buffers(void *arg)
1492 {
1493 	struct ublk_queue *q = arg;
1494 	uint32_t i;
1495 
1496 	for (i = 0; i < q->q_depth; i++) {
1497 		ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch);
1498 	}
1499 	free(q->ios);
1500 	q->ios = NULL;
1501 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev);
1502 }
1503 
1504 static void
1505 ublk_free_dev(struct spdk_ublk_dev *ublk)
1506 {
1507 	struct ublk_queue *q;
1508 	uint32_t q_idx;
1509 
1510 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
1511 		q = &ublk->queues[q_idx];
1512 
1513 		/* The ublk_io structures of this queue were never initialized. */
1514 		if (q->ios == NULL) {
1515 			continue;
1516 		}
1517 
1518 		/* We found a queue that has an ios array that may have buffers
1519 		 * that need to be freed.  Send a message to the queue's thread
1520 		 * so it can free the buffers back to that thread's iobuf channel.
1521 		 * When it's done, it will set q->ios to NULL and send a message
1522 		 * back to this function to continue.
1523 		 */
1524 		if (q->poll_group) {
1525 			spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q);
1526 			return;
1527 		} else {
1528 			free(q->ios);
1529 			q->ios = NULL;
1530 		}
1531 	}
1532 
1533 	/* All of the buffers associated with the queues have been freed, so now
1534 	 * continue with releasing resources for the rest of the ublk device.
1535 	 */
1536 	if (ublk->bdev_desc) {
1537 		spdk_bdev_close(ublk->bdev_desc);
1538 		ublk->bdev_desc = NULL;
1539 	}
1540 
1541 	ublk_dev_list_unregister(ublk);
1542 
1543 	if (ublk->del_cb) {
1544 		ublk->del_cb(ublk->cb_arg);
1545 	}
1546 	SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id);
1547 	free(ublk);
1548 }
1549 
1550 static int
1551 ublk_ios_init(struct spdk_ublk_dev *ublk)
1552 {
1553 	int rc;
1554 	uint32_t i, j;
1555 	struct ublk_queue *q;
1556 
1557 	for (i = 0; i < ublk->num_queues; i++) {
1558 		q = &ublk->queues[i];
1559 
1560 		TAILQ_INIT(&q->completed_io_list);
1561 		TAILQ_INIT(&q->inflight_io_list);
1562 		q->dev = ublk;
1563 		q->q_id = i;
1564 		q->q_depth = ublk->queue_depth;
1565 		q->ios = calloc(q->q_depth, sizeof(struct ublk_io));
1566 		if (!q->ios) {
1567 			rc = -ENOMEM;
1568 			SPDK_ERRLOG("could not allocate queue ios\n");
1569 			goto err;
1570 		}
1571 		for (j = 0; j < q->q_depth; j++) {
1572 			q->ios[j].q = q;
1573 		}
1574 	}
1575 
1576 	return 0;
1577 
1578 err:
1579 	for (i = 0; i < ublk->num_queues; i++) {
1580 		free(ublk->queues[i].ios);
1581 		ublk->queues[i].ios = NULL;
1582 	}
1583 	return rc;
1584 }
1585 
1586 static void
1587 ublk_queue_run(void *arg1)
1588 {
1589 	struct ublk_queue	*q = arg1;
1590 	struct spdk_ublk_dev *ublk = q->dev;
1591 	struct ublk_poll_group *poll_group = q->poll_group;
1592 
1593 	assert(spdk_get_thread() == poll_group->ublk_thread);
1594 	q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc);
1595 	/* Queue io commands must be submitted from this queue's poll group thread */
1596 	ublk_dev_queue_io_init(q);
1597 
1598 	TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq);
1599 }
1600 
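/* Typical RPC-driven flow (mirrors spdk_ublk_write_config_json()):
 *   ublk_create_target  {"cpumask": "0x3"}
 *   ublk_start_disk     {"bdev_name": "Malloc0", "ublk_id": 1, "num_queues": 1, "queue_depth": 128}
 * The kernel then exposes the bdev as a block device node, typically /dev/ublkb<ublk_id>.
 * The cpumask and parameter values above are illustrative only.
 */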
1601 int
1602 ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
1603 		uint32_t num_queues, uint32_t queue_depth,
1604 		ublk_start_cb start_cb, void *cb_arg)
1605 {
1606 	int			rc;
1607 	uint32_t		i;
1608 	struct spdk_bdev	*bdev;
1609 	struct spdk_ublk_dev	*ublk = NULL;
1610 	uint32_t		sector_per_block;
1611 
1612 	assert(spdk_thread_is_app_thread(NULL));
1613 
1614 	if (g_ublk_tgt.active == false) {
1615 		SPDK_ERRLOG("No ublk target exists\n");
1616 		return -ENODEV;
1617 	}
1618 
1619 	ublk = ublk_dev_find_by_id(ublk_id);
1620 	if (ublk != NULL) {
1621 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
1622 		return -EBUSY;
1623 	}
1624 
1625 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
1626 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
1627 		return -ENOTSUP;
1628 	}
1629 
1630 	ublk = calloc(1, sizeof(*ublk));
1631 	if (ublk == NULL) {
1632 		return -ENOMEM;
1633 	}
1634 	ublk->start_cb = start_cb;
1635 	ublk->cb_arg = cb_arg;
1636 	ublk->cdev_fd = -1;
1637 	ublk->ublk_id = ublk_id;
1638 	UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n",
1639 		      bdev_name, num_queues, queue_depth);
1640 
1641 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
1642 	if (rc != 0) {
1643 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
1644 		free(ublk);
1645 		return rc;
1646 	}
1647 
1648 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
1649 	ublk->bdev = bdev;
1650 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
1651 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
1652 
1653 	ublk->queues_closed = 0;
1654 	ublk->num_queues = num_queues;
1655 	ublk->queue_depth = queue_depth;
1656 	if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) {
1657 		SPDK_WARNLOG("Clamping queue depth %d of UBLK %d to maximum %d\n",
1658 			     ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH);
1659 		ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH;
1660 	}
1661 	if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) {
1662 		SPDK_WARNLOG("Clamping queue count %d of UBLK %d to maximum %d\n",
1663 			     ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES);
1664 		ublk->num_queues = UBLK_DEV_MAX_QUEUES;
1665 	}
1666 	for (i = 0; i < ublk->num_queues; i++) {
1667 		ublk->queues[i].ring.ring_fd = -1;
1668 	}
1669 
1670 	ublk_info_param_init(ublk);
1671 	rc = ublk_ios_init(ublk);
1672 	if (rc != 0) {
1673 		spdk_bdev_close(ublk->bdev_desc);
1674 		free(ublk);
1675 		return rc;
1676 	}
1677 
1678 	SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n",
1679 		     bdev_name, ublk_id);
1680 
1681 	/* Add ublk_dev to the end of disk list */
1682 	ublk_dev_list_register(ublk);
1683 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_ADD_DEV);
1684 	if (rc < 0) {
1685 		SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1686 		ublk_free_dev(ublk);
1687 	}
1688 
1689 	return rc;
1690 }
1691 
1692 static void
1693 ublk_finish_start(struct spdk_ublk_dev *ublk)
1694 {
1695 	int			rc;
1696 	uint32_t		q_id;
1697 	struct spdk_thread	*ublk_thread;
1698 	char			buf[64];
1699 
1700 	snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id);
1701 	ublk->cdev_fd = open(buf, O_RDWR);
1702 	if (ublk->cdev_fd < 0) {
1703 		rc = ublk->cdev_fd;
1704 		SPDK_ERRLOG("can't open %s, rc %d\n", buf, rc);
1705 		goto err;
1706 	}
1707 
1708 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1709 		rc = ublk_dev_queue_init(&ublk->queues[q_id]);
1710 		if (rc) {
1711 			goto err;
1712 		}
1713 	}
1714 
1715 	rc = ublk_ctrl_cmd(ublk, UBLK_CMD_START_DEV);
1716 	if (rc < 0) {
1717 		SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id,
1718 			    spdk_strerror(-rc));
1719 		goto err;
1720 	}
1721 
1722 	/* Distribute the queues across the ublk poll group threads for load balancing */
1723 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1724 		ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_groups[g_next_ublk_poll_group];
1725 		ublk_thread = g_ublk_tgt.poll_groups[g_next_ublk_poll_group].ublk_thread;
1726 		spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]);
1727 		g_next_ublk_poll_group++;
1728 		if (g_next_ublk_poll_group == g_num_ublk_poll_groups) {
1729 			g_next_ublk_poll_group = 0;
1730 		}
1731 	}
1732 
1733 	goto out;
1734 
1735 err:
1736 	ublk_delete_dev(ublk);
1737 out:
1738 	if (ublk->start_cb) {
1739 		ublk->start_cb(ublk->cb_arg, rc);
1740 		ublk->start_cb = NULL;
1741 	}
1742 }
1743 
1744 SPDK_LOG_REGISTER_COMPONENT(ublk)
1745 SPDK_LOG_REGISTER_COMPONENT(ublk_io)
1746