xref: /spdk/lib/ublk/ublk.c (revision 12fbe739a31b09aff0d05f354d4f3bbef99afc55)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include <liburing.h>
7 
8 #include "spdk/stdinc.h"
9 #include "spdk/string.h"
10 #include "spdk/bdev.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/likely.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/queue.h"
17 #include "spdk/json.h"
18 #include "spdk/ublk.h"
19 #include "spdk/thread.h"
20 
21 #include "ublk_internal.h"
22 
23 #define UBLK_CTRL_DEV					"/dev/ublk-control"
24 #define UBLK_BLK_CDEV					"/dev/ublkc"
25 
26 #define LINUX_SECTOR_SHIFT				9
27 #define UBLK_IO_MAX_BYTES				SPDK_BDEV_LARGE_BUF_MAX_SIZE
28 #define UBLK_DEV_MAX_QUEUES				32
29 #define UBLK_DEV_MAX_QUEUE_DEPTH			1024
30 #define UBLK_QUEUE_REQUEST				32
31 #define UBLK_STOP_BUSY_WAITING_MS			10000
32 #define UBLK_BUSY_POLLING_INTERVAL_US			20000
33 #define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US	1000
34 /* By default, the kernel ublk_drv driver supports up to 64 block devices */
35 #define UBLK_DEFAULT_MAX_SUPPORTED_DEVS			64
36 
37 #define UBLK_IOBUF_SMALL_CACHE_SIZE			128
38 #define UBLK_IOBUF_LARGE_CACHE_SIZE			32
39 
40 #define UBLK_DEBUGLOG(ublk, format, ...) \
41 	SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__);
42 
43 static uint32_t g_num_ublk_poll_groups = 0;
44 static uint32_t g_next_ublk_poll_group = 0;
45 static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS;
46 static struct spdk_cpuset g_core_mask;
47 
48 struct ublk_queue;
49 struct ublk_poll_group;
50 struct ublk_io;
51 static void _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io);
52 static void ublk_dev_queue_fini(struct ublk_queue *q);
53 static int ublk_poll(void *arg);
54 
55 static int ublk_set_params(struct spdk_ublk_dev *ublk);
56 static int ublk_finish_start(struct spdk_ublk_dev *ublk);
57 static void ublk_free_dev(struct spdk_ublk_dev *ublk);
58 static void ublk_delete_dev(void *arg);
59 static int ublk_close_dev(struct spdk_ublk_dev *ublk);
60 
61 static const char *ublk_op_name[64]
62 __attribute__((unused)) = {
63 	[UBLK_CMD_ADD_DEV] =	"UBLK_CMD_ADD_DEV",
64 	[UBLK_CMD_DEL_DEV] =	"UBLK_CMD_DEL_DEV",
65 	[UBLK_CMD_START_DEV] =	"UBLK_CMD_START_DEV",
66 	[UBLK_CMD_STOP_DEV] =	"UBLK_CMD_STOP_DEV",
67 	[UBLK_CMD_SET_PARAMS] =	"UBLK_CMD_SET_PARAMS",
68 };
69 
70 typedef void (*ublk_get_buf_cb)(struct ublk_io *io);
71 
72 struct ublk_io {
73 	void			*payload;
74 	void			*mpool_entry;
75 	bool			need_data;
76 	bool			user_copy;
77 	uint16_t		tag;
78 	uint64_t		payload_size;
79 	uint32_t		cmd_op;
80 	int32_t			result;
81 	struct spdk_bdev_desc	*bdev_desc;
82 	struct spdk_io_channel	*bdev_ch;
83 	const struct ublksrv_io_desc	*iod;
84 	ublk_get_buf_cb		get_buf_cb;
85 	struct ublk_queue	*q;
86 	/* for bdev io_wait */
87 	struct spdk_bdev_io_wait_entry bdev_io_wait;
88 	struct spdk_iobuf_entry	iobuf;
89 
90 	TAILQ_ENTRY(ublk_io)	tailq;
91 };
92 
93 struct ublk_queue {
94 	uint32_t		q_id;
95 	uint32_t		q_depth;
96 	struct ublk_io		*ios;
97 	TAILQ_HEAD(, ublk_io)	completed_io_list;
98 	TAILQ_HEAD(, ublk_io)	inflight_io_list;
99 	uint32_t		cmd_inflight;
100 	bool			is_stopping;
101 	struct ublksrv_io_desc	*io_cmd_buf;
102 	/* ring depth == dev_info->queue_depth. */
103 	struct io_uring		ring;
104 	struct spdk_ublk_dev	*dev;
105 	struct ublk_poll_group	*poll_group;
106 	struct spdk_io_channel	*bdev_ch;
107 
108 	TAILQ_ENTRY(ublk_queue)	tailq;
109 };
110 
111 struct spdk_ublk_dev {
112 	struct spdk_bdev	*bdev;
113 	struct spdk_bdev_desc	*bdev_desc;
114 
115 	int			cdev_fd;
116 	struct ublk_params	dev_params;
117 	struct ublksrv_ctrl_dev_info	dev_info;
118 
119 	uint32_t		ublk_id;
120 	uint32_t		num_queues;
121 	uint32_t		queue_depth;
122 	uint32_t		sector_per_block_shift;
123 	struct ublk_queue	queues[UBLK_DEV_MAX_QUEUES];
124 
125 	struct spdk_poller	*retry_poller;
126 	int			retry_count;
127 	uint32_t		queues_closed;
128 	ublk_ctrl_cb		ctrl_cb;
129 	void			*cb_arg;
130 	uint32_t		current_cmd_op;
131 	uint32_t		ctrl_ops_in_progress;
132 	bool			is_closing;
133 
134 	TAILQ_ENTRY(spdk_ublk_dev) tailq;
135 	TAILQ_ENTRY(spdk_ublk_dev) wait_tailq;
136 };
137 
138 struct ublk_poll_group {
139 	struct spdk_thread		*ublk_thread;
140 	struct spdk_poller		*ublk_poller;
141 	struct spdk_iobuf_channel	iobuf_ch;
142 	TAILQ_HEAD(, ublk_queue)	queue_list;
143 };
144 
145 struct ublk_tgt {
146 	int			ctrl_fd;
147 	bool			active;
148 	bool			is_destroying;
149 	spdk_ublk_fini_cb	cb_fn;
150 	void			*cb_arg;
151 	struct io_uring		ctrl_ring;
152 	struct spdk_poller	*ctrl_poller;
153 	uint32_t		ctrl_ops_in_progress;
154 	struct ublk_poll_group	*poll_groups;
155 	uint32_t		num_ublk_devs;
156 	uint64_t		features;
157 	/* `ublk_drv` supports UBLK_F_CMD_IOCTL_ENCODE */
158 	bool			ioctl_encode;
159 	/* `ublk_drv` supports UBLK_F_USER_COPY */
160 	bool			user_copy;
161 };
162 
163 static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs);
164 static struct ublk_tgt g_ublk_tgt;
165 
166 /* helpers for using io_uring */
167 static inline int
168 ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags)
169 {
170 	struct io_uring_params p = {};
171 
172 	p.flags = flags | IORING_SETUP_CQSIZE;
173 	p.cq_entries = depth;
174 
175 	return io_uring_queue_init_params(depth, r, &p);
176 }
177 
178 static inline struct io_uring_sqe *
179 ublk_uring_get_sqe(struct io_uring *r, uint32_t idx)
180 {
181 	/* Need to update the idx since we set IORING_SETUP_SQE128 parameter in ublk_setup_ring */
182 	return &r->sq.sqes[idx << 1];
183 }
184 
185 static inline void *
186 ublk_get_sqe_cmd(struct io_uring_sqe *sqe)
187 {
188 	return (void *)&sqe->addr3;
189 }
190 
191 static inline void
192 ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op)
193 {
194 	uint32_t opc = cmd_op;
195 
196 	if (g_ublk_tgt.ioctl_encode) {
197 		switch (cmd_op) {
198 		/* ctrl uring */
199 		case UBLK_CMD_GET_DEV_INFO:
200 			opc = _IOR('u', UBLK_CMD_GET_DEV_INFO, struct ublksrv_ctrl_cmd);
201 			break;
202 		case UBLK_CMD_ADD_DEV:
203 			opc = _IOWR('u', UBLK_CMD_ADD_DEV, struct ublksrv_ctrl_cmd);
204 			break;
205 		case UBLK_CMD_DEL_DEV:
206 			opc = _IOWR('u', UBLK_CMD_DEL_DEV, struct ublksrv_ctrl_cmd);
207 			break;
208 		case UBLK_CMD_START_DEV:
209 			opc = _IOWR('u', UBLK_CMD_START_DEV, struct ublksrv_ctrl_cmd);
210 			break;
211 		case UBLK_CMD_STOP_DEV:
212 			opc = _IOWR('u', UBLK_CMD_STOP_DEV, struct ublksrv_ctrl_cmd);
213 			break;
214 		case UBLK_CMD_SET_PARAMS:
215 			opc = _IOWR('u', UBLK_CMD_SET_PARAMS, struct ublksrv_ctrl_cmd);
216 			break;
217 
218 		/* io uring */
219 		case UBLK_IO_FETCH_REQ:
220 			opc = _IOWR('u', UBLK_IO_FETCH_REQ, struct ublksrv_io_cmd);
221 			break;
222 		case UBLK_IO_COMMIT_AND_FETCH_REQ:
223 			opc = _IOWR('u', UBLK_IO_COMMIT_AND_FETCH_REQ, struct ublksrv_io_cmd);
224 			break;
225 		case UBLK_IO_NEED_GET_DATA:
226 			opc = _IOWR('u', UBLK_IO_NEED_GET_DATA, struct ublksrv_io_cmd);
227 			break;
228 		default:
229 			break;
230 		}
231 	}
232 
233 	sqe->off = opc;
234 }
235 
236 static inline uint64_t
237 build_user_data(uint16_t tag, uint8_t op)
238 {
239 	assert(!(tag >> 16) && !(op >> 8));
240 
241 	return tag | (op << 16);
242 }
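/*
 * user_data layout used on the per-queue rings: bits [15:0] carry the io tag
 * and bits [23:16] carry the ublk command op, so a completion can be matched
 * back to its ublk_io by the helpers below.
 */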
243 
244 static inline uint16_t
245 user_data_to_tag(uint64_t user_data)
246 {
247 	return user_data & 0xffff;
248 }
249 
250 static inline uint8_t
251 user_data_to_op(uint64_t user_data)
252 {
253 	return (user_data >> 16) & 0xff;
254 }
255 
256 static inline uint64_t
257 ublk_user_copy_pos(uint16_t q_id, uint16_t tag)
258 {
259 	return (uint64_t)UBLKSRV_IO_BUF_OFFSET + ((((uint64_t)q_id) << UBLK_QID_OFF) | (((
260 				uint64_t)tag) << UBLK_TAG_OFF));
261 }
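/*
 * With UBLK_F_USER_COPY the kernel does not copy request data to/from the
 * address in the io command; instead the server moves the payload itself by
 * reading from or writing to the ublk char device at a per-request offset.
 * That offset encodes the queue id and tag on top of UBLKSRV_IO_BUF_OFFSET,
 * which is what ublk_user_copy_pos() computes for ublk_queue_user_copy().
 */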
262 
263 void
264 spdk_ublk_init(void)
265 {
266 	assert(spdk_thread_is_app_thread(NULL));
267 
268 	g_ublk_tgt.ctrl_fd = -1;
269 	g_ublk_tgt.ctrl_ring.ring_fd = -1;
270 }
271 
272 static void
273 ublk_ctrl_cmd_error(struct spdk_ublk_dev *ublk, int32_t res)
274 {
275 	assert(res != 0);
276 
277 	SPDK_ERRLOG("ctrl cmd %s failed, %s\n", ublk_op_name[ublk->current_cmd_op], spdk_strerror(-res));
278 	if (ublk->ctrl_cb) {
279 		ublk->ctrl_cb(ublk->cb_arg, res);
280 		ublk->ctrl_cb = NULL;
281 	}
282 
283 	switch (ublk->current_cmd_op) {
284 	case UBLK_CMD_ADD_DEV:
285 	case UBLK_CMD_SET_PARAMS:
286 		ublk_delete_dev(ublk);
287 		break;
288 	case UBLK_CMD_START_DEV:
289 		ublk_close_dev(ublk);
290 		break;
291 	case UBLK_CMD_STOP_DEV:
292 	case UBLK_CMD_DEL_DEV:
293 		break;
294 	default:
295 		SPDK_ERRLOG("Unknown cmd operation, cmd_op = %d\n", ublk->current_cmd_op);
296 		break;
297 	}
298 }
299 
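/*
 * Control-command completions are chained here: a completed ADD_DEV triggers
 * SET_PARAMS, a completed SET_PARAMS triggers START_DEV (via
 * ublk_finish_start()), and a completed DEL_DEV frees the device.  The user's
 * ctrl_cb, if any, fires once the chain finishes or an error short-circuits it.
 */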
300 static void
301 ublk_ctrl_process_cqe(struct io_uring_cqe *cqe)
302 {
303 	struct spdk_ublk_dev *ublk;
304 	int rc = 0;
305 
306 	ublk = (struct spdk_ublk_dev *)cqe->user_data;
307 	UBLK_DEBUGLOG(ublk, "ctrl cmd completed\n");
308 	ublk->ctrl_ops_in_progress--;
309 
310 	if (spdk_unlikely(cqe->res != 0)) {
311 		SPDK_ERRLOG("ctrl cmd failed\n");
312 		ublk_ctrl_cmd_error(ublk, cqe->res);
313 		return;
314 	}
315 
316 	switch (ublk->current_cmd_op) {
317 	case UBLK_CMD_ADD_DEV:
318 		rc = ublk_set_params(ublk);
319 		if (rc < 0) {
320 			ublk_delete_dev(ublk);
321 			goto start_done;
322 		}
323 		break;
324 	case UBLK_CMD_SET_PARAMS:
325 		rc = ublk_finish_start(ublk);
326 		if (rc < 0) {
327 			ublk_delete_dev(ublk);
328 			goto start_done;
329 		}
330 		break;
331 	case UBLK_CMD_START_DEV:
332 		goto start_done;
333 		break;
334 	case UBLK_CMD_STOP_DEV:
335 		break;
336 	case UBLK_CMD_DEL_DEV:
337 		if (ublk->ctrl_cb) {
338 			ublk->ctrl_cb(ublk->cb_arg, 0);
339 			ublk->ctrl_cb = NULL;
340 		}
341 		ublk_free_dev(ublk);
342 		break;
343 	default:
344 		SPDK_ERRLOG("Unknown cmd operation, cmd_op = %d\n", ublk->current_cmd_op);
345 		break;
346 	}
347 
348 	return;
349 
350 start_done:
351 	if (ublk->ctrl_cb) {
352 		ublk->ctrl_cb(ublk->cb_arg, rc);
353 		ublk->ctrl_cb = NULL;
354 	}
355 }
356 
357 static int
358 ublk_ctrl_poller(void *arg)
359 {
360 	struct io_uring *ring = &g_ublk_tgt.ctrl_ring;
361 	struct io_uring_cqe *cqe;
362 	const int max = 8;
363 	int i, count = 0, rc;
364 
365 	if (!g_ublk_tgt.ctrl_ops_in_progress) {
366 		return SPDK_POLLER_IDLE;
367 	}
368 
369 	for (i = 0; i < max; i++) {
370 		rc = io_uring_peek_cqe(ring, &cqe);
371 		if (rc == -EAGAIN) {
372 			break;
373 		}
374 
375 		assert(cqe != NULL);
376 		g_ublk_tgt.ctrl_ops_in_progress--;
377 
378 		ublk_ctrl_process_cqe(cqe);
379 
380 		io_uring_cqe_seen(ring, cqe);
381 		count++;
382 	}
383 
384 	return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
385 }
386 
387 static int
388 ublk_ctrl_cmd_submit(struct spdk_ublk_dev *ublk, uint32_t cmd_op)
389 {
390 	uint32_t dev_id = ublk->ublk_id;
391 	int rc = -EINVAL;
392 	struct io_uring_sqe *sqe;
393 	struct ublksrv_ctrl_cmd *cmd;
394 
395 	UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]);
396 
397 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
398 	if (!sqe) {
399 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
400 		assert(false);
401 		return -ENOENT;
402 	}
403 
404 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
405 	sqe->fd = g_ublk_tgt.ctrl_fd;
406 	sqe->opcode = IORING_OP_URING_CMD;
407 	sqe->ioprio = 0;
408 	cmd->dev_id = dev_id;
409 	cmd->queue_id = -1;
410 	ublk->current_cmd_op = cmd_op;
411 
412 	switch (cmd_op) {
413 	case UBLK_CMD_ADD_DEV:
414 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_info;
415 		cmd->len = sizeof(ublk->dev_info);
416 		break;
417 	case UBLK_CMD_SET_PARAMS:
418 		cmd->addr = (__u64)(uintptr_t)&ublk->dev_params;
419 		cmd->len = sizeof(ublk->dev_params);
420 		break;
421 	case UBLK_CMD_START_DEV:
422 		cmd->data[0] = getpid();
423 		break;
424 	case UBLK_CMD_STOP_DEV:
425 		break;
426 	case UBLK_CMD_DEL_DEV:
427 		break;
428 	default:
429 		SPDK_ERRLOG("Unknown cmd operation, cmd_op = %d\n", cmd_op);
430 		return -EINVAL;
431 	}
432 	ublk_set_sqe_cmd_op(sqe, cmd_op);
433 	io_uring_sqe_set_data(sqe, ublk);
434 
435 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
436 	if (rc < 0) {
437 		SPDK_ERRLOG("uring submit rc %d\n", rc);
438 		assert(false);
439 		return rc;
440 	}
441 	g_ublk_tgt.ctrl_ops_in_progress++;
442 	ublk->ctrl_ops_in_progress++;
443 
444 	return 0;
445 }
446 
447 static int
448 ublk_ctrl_cmd_get_features(void)
449 {
450 	int rc;
451 	struct io_uring_sqe *sqe;
452 	struct io_uring_cqe *cqe;
453 	struct ublksrv_ctrl_cmd *cmd;
454 	uint32_t cmd_op;
455 
456 	sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring);
457 	if (!sqe) {
458 		SPDK_ERRLOG("No available sqe in ctrl ring\n");
459 		assert(false);
460 		return -ENOENT;
461 	}
462 
463 	cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe);
464 	sqe->fd = g_ublk_tgt.ctrl_fd;
465 	sqe->opcode = IORING_OP_URING_CMD;
466 	sqe->ioprio = 0;
467 	cmd->dev_id = -1;
468 	cmd->queue_id = -1;
469 	cmd->addr = (__u64)(uintptr_t)&g_ublk_tgt.features;
470 	cmd->len = sizeof(g_ublk_tgt.features);
471 
472 	cmd_op = UBLK_U_CMD_GET_FEATURES;
473 	ublk_set_sqe_cmd_op(sqe, cmd_op);
474 
475 	rc = io_uring_submit(&g_ublk_tgt.ctrl_ring);
476 	if (rc < 0) {
477 		SPDK_ERRLOG("uring submit rc %d\n", rc);
478 		return rc;
479 	}
480 
481 	rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe);
482 	if (rc < 0) {
483 		SPDK_ERRLOG("wait cqe rc %d\n", rc);
484 		return rc;
485 	}
486 
487 	if (cqe->res == 0) {
488 		g_ublk_tgt.ioctl_encode = !!(g_ublk_tgt.features & UBLK_F_CMD_IOCTL_ENCODE);
489 		g_ublk_tgt.user_copy = !!(g_ublk_tgt.features & UBLK_F_USER_COPY);
490 	}
491 	io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe);
492 
493 	return 0;
494 }
495 
496 static int
497 ublk_queue_cmd_buf_sz(uint32_t q_depth)
498 {
499 	uint32_t size = q_depth * sizeof(struct ublksrv_io_desc);
500 	uint32_t page_sz = getpagesize();
501 
502 	/* round up size */
503 	return (size + page_sz - 1) & ~(page_sz - 1);
504 }
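/*
 * For example, assuming the 24-byte ublksrv_io_desc of current kernels and
 * 4 KiB pages, a queue depth of 128 needs 3072 bytes of descriptors, which
 * rounds up to one 4096-byte page for the mmap() in ublk_dev_queue_init().
 */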
505 
506 static int
507 ublk_get_max_support_devs(void)
508 {
509 	FILE *file;
510 	char str[128];
511 
512 	file = fopen("/sys/module/ublk_drv/parameters/ublks_max", "r");
513 	if (!file) {
514 		return -ENOENT;
515 	}
516 
517 	if (!fgets(str, sizeof(str), file)) {
518 		fclose(file);
519 		return -EINVAL;
520 	}
521 	fclose(file);
522 
523 	spdk_str_chomp(str);
524 	return spdk_strtol(str, 10);
525 }
526 
527 static int
528 ublk_open(void)
529 {
530 	int rc, ublks_max;
531 
532 	g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR);
533 	if (g_ublk_tgt.ctrl_fd < 0) {
534 		rc = errno;
535 		SPDK_ERRLOG("UBLK control dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno));
536 		return -rc;
537 	}
538 
539 	ublks_max = ublk_get_max_support_devs();
540 	if (ublks_max > 0) {
541 		g_ublks_max = ublks_max;
542 	}
543 
544 	/* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl
545 	 * ring processing to a workqueue.  Ctrl ring processing is minimal, so SQPOLL is fine.
546 	 * All commands sent via the control uring for a ublk device are executed one by one, so
547 	 * ublks_max * 2 uring entries are enough.
548 	 */
549 	rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring,
550 			     IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL);
551 	if (rc < 0) {
552 		SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc));
553 		goto err;
554 	}
555 
556 	rc = ublk_ctrl_cmd_get_features();
557 	if (rc) {
558 		goto err;
559 	}
560 
561 	return 0;
562 
563 err:
564 	close(g_ublk_tgt.ctrl_fd);
565 	g_ublk_tgt.ctrl_fd = -1;
566 	return rc;
567 }
568 
569 static int
570 ublk_parse_core_mask(const char *mask)
571 {
572 	struct spdk_cpuset tmp_mask;
573 	int rc;
574 
575 	if (mask == NULL) {
576 		spdk_env_get_cpuset(&g_core_mask);
577 		return 0;
578 	}
579 
580 	rc = spdk_cpuset_parse(&g_core_mask, mask);
581 	if (rc < 0) {
582 		SPDK_ERRLOG("invalid cpumask %s\n", mask);
583 		return -EINVAL;
584 	}
585 
586 	if (spdk_cpuset_count(&g_core_mask) == 0) {
587 		SPDK_ERRLOG("no cpus specified\n");
588 		return -EINVAL;
589 	}
590 
591 	spdk_env_get_cpuset(&tmp_mask);
592 	spdk_cpuset_and(&tmp_mask, &g_core_mask);
593 
594 	if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) {
595 		SPDK_ERRLOG("one of the selected cpus is outside of the core mask (=%s)\n",
596 			    spdk_cpuset_fmt(&g_core_mask));
597 		return -EINVAL;
598 	}
599 
600 	return 0;
601 }
602 
603 static void
604 ublk_poller_register(void *args)
605 {
606 	struct ublk_poll_group *poll_group = args;
607 	int rc;
608 
609 	assert(spdk_get_thread() == poll_group->ublk_thread);
610 	/* Bind ublk spdk_thread to current CPU core in order to avoid thread context switch
611 	 * during uring processing, as required by the ublk kernel driver.
612 	 */
613 	spdk_thread_bind(spdk_get_thread(), true);
614 
615 	TAILQ_INIT(&poll_group->queue_list);
616 	poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0);
617 	rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk",
618 				     UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE);
619 	if (rc != 0) {
620 		assert(false);
621 	}
622 }
623 
624 int
625 ublk_create_target(const char *cpumask_str)
626 {
627 	int rc;
628 	uint32_t i;
629 	char thread_name[32];
630 	struct ublk_poll_group *poll_group;
631 
632 	if (g_ublk_tgt.active == true) {
633 		SPDK_ERRLOG("UBLK target has been created\n");
634 		return -EBUSY;
635 	}
636 
637 	rc = ublk_parse_core_mask(cpumask_str);
638 	if (rc != 0) {
639 		return rc;
640 	}
641 
642 	assert(g_ublk_tgt.poll_groups == NULL);
643 	g_ublk_tgt.poll_groups = calloc(spdk_env_get_core_count(), sizeof(*poll_group));
644 	if (!g_ublk_tgt.poll_groups) {
645 		return -ENOMEM;
646 	}
647 
648 	rc = ublk_open();
649 	if (rc != 0) {
650 		SPDK_ERRLOG("Failed to open UBLK, error=%s\n", spdk_strerror(-rc));
651 		free(g_ublk_tgt.poll_groups);
652 		g_ublk_tgt.poll_groups = NULL;
653 		return rc;
654 	}
655 
656 	spdk_iobuf_register_module("ublk");
657 
658 	SPDK_ENV_FOREACH_CORE(i) {
659 		if (!spdk_cpuset_get_cpu(&g_core_mask, i)) {
660 			continue;
661 		}
662 		snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i);
663 		poll_group = &g_ublk_tgt.poll_groups[g_num_ublk_poll_groups];
664 		poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask);
665 		spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group);
666 		g_num_ublk_poll_groups++;
667 	}
668 
669 	assert(spdk_thread_is_app_thread(NULL));
670 	g_ublk_tgt.active = true;
671 	g_ublk_tgt.ctrl_ops_in_progress = 0;
672 	g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL,
673 				 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US);
674 
675 	SPDK_NOTICELOG("UBLK target created successfully\n");
676 
677 	return 0;
678 }
679 
680 static void
681 _ublk_fini_done(void *args)
682 {
683 	SPDK_DEBUGLOG(ublk, "\n");
684 
685 	g_num_ublk_poll_groups = 0;
686 	g_next_ublk_poll_group = 0;
687 	g_ublk_tgt.is_destroying = false;
688 	g_ublk_tgt.active = false;
689 	g_ublk_tgt.features = 0;
690 	g_ublk_tgt.ioctl_encode = false;
691 	g_ublk_tgt.user_copy = false;
692 
693 	if (g_ublk_tgt.cb_fn) {
694 		g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg);
695 		g_ublk_tgt.cb_fn = NULL;
696 		g_ublk_tgt.cb_arg = NULL;
697 	}
698 
699 	if (g_ublk_tgt.poll_groups) {
700 		free(g_ublk_tgt.poll_groups);
701 		g_ublk_tgt.poll_groups = NULL;
702 	}
703 
704 }
705 
706 static void
707 ublk_thread_exit(void *args)
708 {
709 	struct spdk_thread *ublk_thread = spdk_get_thread();
710 	uint32_t i;
711 
712 	for (i = 0; i < g_num_ublk_poll_groups; i++) {
713 		if (g_ublk_tgt.poll_groups[i].ublk_thread == ublk_thread) {
714 			spdk_poller_unregister(&g_ublk_tgt.poll_groups[i].ublk_poller);
715 			spdk_iobuf_channel_fini(&g_ublk_tgt.poll_groups[i].iobuf_ch);
716 			spdk_thread_bind(ublk_thread, false);
717 			spdk_thread_exit(ublk_thread);
718 		}
719 	}
720 }
721 
722 static int
723 ublk_close_dev(struct spdk_ublk_dev *ublk)
724 {
725 	int rc;
726 
727 	/* set is_closing */
728 	if (ublk->is_closing) {
729 		return -EBUSY;
730 	}
731 	ublk->is_closing = true;
732 
733 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_STOP_DEV);
734 	if (rc < 0) {
735 		SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id);
736 	}
737 	return rc;
738 }
739 
740 static void
741 _ublk_fini(void *args)
742 {
743 	struct spdk_ublk_dev	*ublk, *ublk_tmp;
744 
745 	TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) {
746 		ublk_close_dev(ublk);
747 	}
748 
749 	/* Check if all ublks closed */
750 	if (TAILQ_EMPTY(&g_ublk_devs)) {
751 		SPDK_DEBUGLOG(ublk, "finish shutdown\n");
752 		spdk_poller_unregister(&g_ublk_tgt.ctrl_poller);
753 		if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) {
754 			io_uring_queue_exit(&g_ublk_tgt.ctrl_ring);
755 			g_ublk_tgt.ctrl_ring.ring_fd = -1;
756 		}
757 		if (g_ublk_tgt.ctrl_fd >= 0) {
758 			close(g_ublk_tgt.ctrl_fd);
759 			g_ublk_tgt.ctrl_fd = -1;
760 		}
761 		spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done);
762 	} else {
763 		spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL);
764 	}
765 }
766 
767 int
768 spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg)
769 {
770 	assert(spdk_thread_is_app_thread(NULL));
771 
772 	if (g_ublk_tgt.is_destroying == true) {
773 		/* UBLK target is being destroyed */
774 		return -EBUSY;
775 	}
776 	g_ublk_tgt.cb_fn = cb_fn;
777 	g_ublk_tgt.cb_arg = cb_arg;
778 	g_ublk_tgt.is_destroying = true;
779 	_ublk_fini(NULL);
780 
781 	return 0;
782 }
783 
784 int
785 ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg)
786 {
787 	int rc;
788 
789 	if (g_ublk_tgt.active == false) {
790 		/* UBLK target has not been created */
791 		return -ENOENT;
792 	}
793 
794 	rc = spdk_ublk_fini(cb_fn, cb_arg);
795 
796 	return rc;
797 }
798 
799 struct spdk_ublk_dev *
800 ublk_dev_find_by_id(uint32_t ublk_id)
801 {
802 	struct spdk_ublk_dev *ublk;
803 
804 	/* Check whether a ublk device with this id has already been registered. */
805 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
806 		if (ublk->ublk_id == ublk_id) {
807 			return ublk;
808 		}
809 	}
810 
811 	return NULL;
812 }
813 
814 uint32_t
815 ublk_dev_get_id(struct spdk_ublk_dev *ublk)
816 {
817 	return ublk->ublk_id;
818 }
819 
820 struct spdk_ublk_dev *ublk_dev_first(void)
821 {
822 	return TAILQ_FIRST(&g_ublk_devs);
823 }
824 
825 struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev)
826 {
827 	return TAILQ_NEXT(prev, tailq);
828 }
829 
830 uint32_t
831 ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk)
832 {
833 	return ublk->queue_depth;
834 }
835 
836 uint32_t
837 ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk)
838 {
839 	return ublk->num_queues;
840 }
841 
842 const char *
843 ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk)
844 {
845 	return spdk_bdev_get_name(ublk->bdev);
846 }
847 
848 void
849 spdk_ublk_write_config_json(struct spdk_json_write_ctx *w)
850 {
851 	struct spdk_ublk_dev *ublk;
852 
853 	spdk_json_write_array_begin(w);
854 
855 	if (g_ublk_tgt.active) {
856 		spdk_json_write_object_begin(w);
857 
858 		spdk_json_write_named_string(w, "method", "ublk_create_target");
859 		spdk_json_write_named_object_begin(w, "params");
860 		spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask));
861 		spdk_json_write_object_end(w);
862 
863 		spdk_json_write_object_end(w);
864 	}
865 
866 	TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) {
867 		spdk_json_write_object_begin(w);
868 
869 		spdk_json_write_named_string(w, "method", "ublk_start_disk");
870 
871 		spdk_json_write_named_object_begin(w, "params");
872 		spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk));
873 		spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id);
874 		spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues);
875 		spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth);
876 		spdk_json_write_object_end(w);
877 
878 		spdk_json_write_object_end(w);
879 	}
880 
881 	spdk_json_write_array_end(w);
882 }
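/*
 * The emitted array mirrors the RPCs needed to recreate the current state,
 * for example (illustrative values only):
 *   { "method": "ublk_create_target", "params": { "cpumask": "0x3" } }
 *   { "method": "ublk_start_disk", "params": { "bdev_name": "Malloc0",
 *     "ublk_id": 1, "num_queues": 2, "queue_depth": 128 } }
 */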
883 
884 static void
885 ublk_dev_list_register(struct spdk_ublk_dev *ublk)
886 {
887 	UBLK_DEBUGLOG(ublk, "add to tailq\n");
888 	TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq);
889 	g_ublk_tgt.num_ublk_devs++;
890 }
891 
892 static void
893 ublk_dev_list_unregister(struct spdk_ublk_dev *ublk)
894 {
895 	/*
896 	 * The ublk device may be stopped before it is registered,
897 	 * so check whether it was actually registered.
898 	 */
899 
900 	if (ublk_dev_find_by_id(ublk->ublk_id)) {
901 		UBLK_DEBUGLOG(ublk, "remove from tailq\n");
902 		TAILQ_REMOVE(&g_ublk_devs, ublk, tailq);
903 		assert(g_ublk_tgt.num_ublk_devs);
904 		g_ublk_tgt.num_ublk_devs--;
905 		return;
906 	}
907 
908 	UBLK_DEBUGLOG(ublk, "not found in tailq\n");
909 	assert(false);
910 }
911 
912 static void
913 ublk_delete_dev(void *arg)
914 {
915 	struct spdk_ublk_dev *ublk = arg;
916 	int rc = 0;
917 	uint32_t q_idx;
918 
919 	assert(spdk_thread_is_app_thread(NULL));
920 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
921 		ublk_dev_queue_fini(&ublk->queues[q_idx]);
922 	}
923 
924 	if (ublk->cdev_fd >= 0) {
925 		close(ublk->cdev_fd);
926 	}
927 
928 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_DEL_DEV);
929 	if (rc < 0) {
930 		SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id);
931 	}
932 }
933 
934 static int
935 _ublk_close_dev_retry(void *arg)
936 {
937 	struct spdk_ublk_dev *ublk = arg;
938 
939 	if (ublk->ctrl_ops_in_progress > 0) {
940 		if (ublk->retry_count-- > 0) {
941 			return SPDK_POLLER_BUSY;
942 		}
943 		SPDK_ERRLOG("Timeout on ctrl op completion.\n");
944 	}
945 	spdk_poller_unregister(&ublk->retry_poller);
946 	ublk_delete_dev(ublk);
947 	return SPDK_POLLER_BUSY;
948 }
949 
950 static void
951 ublk_try_close_dev(void *arg)
952 {
953 	struct spdk_ublk_dev *ublk = arg;
954 
955 	assert(spdk_thread_is_app_thread(NULL));
956 
957 	ublk->queues_closed += 1;
958 	SPDK_DEBUGLOG(ublk_io, "ublkb%u closed queues %u\n", ublk->ublk_id, ublk->queues_closed);
959 
960 	if (ublk->queues_closed < ublk->num_queues) {
961 		return;
962 	}
963 
964 	if (ublk->ctrl_ops_in_progress > 0) {
965 		assert(ublk->retry_poller == NULL);
966 		ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US;
967 		ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk,
968 				     UBLK_BUSY_POLLING_INTERVAL_US);
969 	} else {
970 		ublk_delete_dev(ublk);
971 	}
972 }
973 
974 static void
975 ublk_try_close_queue(struct ublk_queue *q)
976 {
977 	struct spdk_ublk_dev *ublk = q->dev;
978 
979 	/* Only close the queue when no I/O is in flight to the bdev, no I/O is
980 	 * waiting to commit its result, and all commands have been aborted back.
981 	 */
982 	if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) {
983 		/* wait for next retry */
984 		return;
985 	}
986 
987 	TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq);
988 	spdk_put_io_channel(q->bdev_ch);
989 	q->bdev_ch = NULL;
990 
991 	spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk);
992 }
993 
994 int
995 ublk_stop_disk(uint32_t ublk_id, ublk_ctrl_cb ctrl_cb, void *cb_arg)
996 {
997 	struct spdk_ublk_dev *ublk;
998 
999 	assert(spdk_thread_is_app_thread(NULL));
1000 
1001 	ublk = ublk_dev_find_by_id(ublk_id);
1002 	if (ublk == NULL) {
1003 		SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id);
1004 		return -ENODEV;
1005 	}
1006 	if (ublk->is_closing) {
1007 		SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id);
1008 		return -EBUSY;
1009 	}
1010 	if (ublk->ctrl_cb) {
1011 		SPDK_WARNLOG("ublk %d is busy with RPC call\n", ublk->ublk_id);
1012 		return -EBUSY;
1013 	}
1014 
1015 	ublk->ctrl_cb = ctrl_cb;
1016 	ublk->cb_arg = cb_arg;
1017 	return ublk_close_dev(ublk);
1018 }
1019 
1020 static inline void
1021 ublk_mark_io_done(struct ublk_io *io, int res)
1022 {
1023 	/*
1024 	 * Mark the io as done in the target, so that SPDK can commit its
1025 	 * result and fetch a new request via an io_uring command.
1026 	 */
1027 	io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
1028 	io->result = res;
1029 	io->need_data = false;
1030 }
1031 
1032 static void
1033 ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1034 {
1035 	struct ublk_io	*io = cb_arg;
1036 	struct ublk_queue *q = io->q;
1037 	int res;
1038 
1039 	if (success) {
1040 		res = io->result;
1041 	} else {
1042 		res = -EIO;
1043 	}
1044 
1045 	ublk_mark_io_done(io, res);
1046 
1047 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n",
1048 		      q->q_id, io->tag, res);
1049 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1050 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1051 
1052 	if (bdev_io != NULL) {
1053 		spdk_bdev_free_io(bdev_io);
1054 	}
1055 }
1056 
1057 static void
1058 ublk_queue_user_copy(struct ublk_io *io, bool is_write)
1059 {
1060 	struct ublk_queue *q = io->q;
1061 	const struct ublksrv_io_desc *iod = io->iod;
1062 	struct io_uring_sqe *sqe;
1063 	uint64_t pos;
1064 	uint32_t nbytes;
1065 
1066 	nbytes = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1067 	pos = ublk_user_copy_pos(q->q_id, io->tag);
1068 	sqe = io_uring_get_sqe(&q->ring);
1069 	assert(sqe);
1070 
1071 	if (is_write) {
1072 		io_uring_prep_read(sqe, 0, io->payload, nbytes, pos);
1073 	} else {
1074 		io_uring_prep_write(sqe, 0, io->payload, nbytes, pos);
1075 	}
1076 	io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
1077 	io_uring_sqe_set_data64(sqe, build_user_data(io->tag, 0));
1078 
1079 	io->user_copy = true;
1080 	TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1081 	TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq);
1082 }
1083 
1084 static void
1085 ublk_user_copy_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1086 {
1087 	struct ublk_io	*io = cb_arg;
1088 
1089 	spdk_bdev_free_io(bdev_io);
1090 
1091 	if (success) {
1092 		ublk_queue_user_copy(io, false);
1093 		return;
1094 	}
1095 	/* READ IO Error */
1096 	ublk_io_done(NULL, false, cb_arg);
1097 }
1098 
1099 static void
1100 ublk_resubmit_io(void *arg)
1101 {
1102 	struct ublk_io *io = (struct ublk_io *)arg;
1103 
1104 	_ublk_submit_bdev_io(io->q, io);
1105 }
1106 
1107 static void
1108 ublk_queue_io(struct ublk_io *io)
1109 {
1110 	int rc;
1111 	struct spdk_bdev *bdev = io->q->dev->bdev;
1112 	struct ublk_queue *q = io->q;
1113 
1114 	io->bdev_io_wait.bdev = bdev;
1115 	io->bdev_io_wait.cb_fn = ublk_resubmit_io;
1116 	io->bdev_io_wait.cb_arg = io;
1117 
1118 	rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait);
1119 	if (rc != 0) {
1120 		SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc);
1121 		ublk_io_done(NULL, false, io);
1122 	}
1123 }
1124 
1125 static void
1126 ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf)
1127 {
1128 	struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf);
1129 
1130 	io->mpool_entry = buf;
1131 	assert(io->payload == NULL);
1132 	io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL);
1133 	io->get_buf_cb(io);
1134 }
1135 
1136 static void
1137 ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch,
1138 		   ublk_get_buf_cb get_buf_cb)
1139 {
1140 	void *buf;
1141 
1142 	io->payload_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1143 	io->get_buf_cb = get_buf_cb;
1144 	buf = spdk_iobuf_get(iobuf_ch, io->payload_size, &io->iobuf, ublk_io_get_buffer_cb);
1145 
1146 	if (buf != NULL) {
1147 		ublk_io_get_buffer_cb(&io->iobuf, buf);
1148 	}
1149 }
1150 
1151 static void
1152 ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch)
1153 {
1154 	if (io->payload) {
1155 		spdk_iobuf_put(iobuf_ch, io->mpool_entry, io->payload_size);
1156 		io->mpool_entry = NULL;
1157 		io->payload = NULL;
1158 	}
1159 }
1160 
1161 static void
1162 _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1163 {
1164 	struct spdk_ublk_dev *ublk = q->dev;
1165 	struct spdk_bdev_desc *desc = io->bdev_desc;
1166 	struct spdk_io_channel *ch = io->bdev_ch;
1167 	uint64_t offset_blocks, num_blocks;
1168 	spdk_bdev_io_completion_cb read_cb;
1169 	uint8_t ublk_op;
1170 	int rc = 0;
1171 	const struct ublksrv_io_desc *iod = io->iod;
1172 
1173 	ublk_op = ublksrv_get_op(iod);
1174 	offset_blocks = iod->start_sector >> ublk->sector_per_block_shift;
1175 	num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift;
1176 
1177 	switch (ublk_op) {
1178 	case UBLK_IO_OP_READ:
1179 		if (g_ublk_tgt.user_copy) {
1180 			read_cb = ublk_user_copy_read_done;
1181 		} else {
1182 			read_cb = ublk_io_done;
1183 		}
1184 		rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, read_cb, io);
1185 		break;
1186 	case UBLK_IO_OP_WRITE:
1187 		rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io);
1188 		break;
1189 	case UBLK_IO_OP_FLUSH:
1190 		rc = spdk_bdev_flush_blocks(desc, ch, 0, spdk_bdev_get_num_blocks(ublk->bdev), ublk_io_done, io);
1191 		break;
1192 	case UBLK_IO_OP_DISCARD:
1193 		rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1194 		break;
1195 	case UBLK_IO_OP_WRITE_ZEROES:
1196 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io);
1197 		break;
1198 	default:
1199 		rc = -1;
1200 	}
1201 
1202 	if (rc < 0) {
1203 		if (rc == -ENOMEM) {
1204 			SPDK_INFOLOG(ublk, "No memory, queueing io.\n");
1205 			ublk_queue_io(io);
1206 		} else {
1207 			SPDK_ERRLOG("ublk io failed in _ublk_submit_bdev_io, rc=%d, ublk_op=%u\n", rc, ublk_op);
1208 			ublk_io_done(NULL, false, io);
1209 		}
1210 	}
1211 }
1212 
1213 static void
1214 read_get_buffer_done(struct ublk_io *io)
1215 {
1216 	_ublk_submit_bdev_io(io->q, io);
1217 }
1218 
1219 static void
1220 user_copy_write_get_buffer_done(struct ublk_io *io)
1221 {
1222 	ublk_queue_user_copy(io, true);
1223 }
1224 
1225 static void
1226 ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io)
1227 {
1228 	struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch;
1229 	const struct ublksrv_io_desc *iod = io->iod;
1230 	uint8_t ublk_op;
1231 
1232 	io->result = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT);
1233 	ublk_op = ublksrv_get_op(iod);
1234 	switch (ublk_op) {
1235 	case UBLK_IO_OP_READ:
1236 		ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done);
1237 		break;
1238 	case UBLK_IO_OP_WRITE:
1239 		if (g_ublk_tgt.user_copy) {
1240 			ublk_io_get_buffer(io, iobuf_ch, user_copy_write_get_buffer_done);
1241 		} else {
1242 			_ublk_submit_bdev_io(q, io);
1243 		}
1244 		break;
1245 	default:
1246 		_ublk_submit_bdev_io(q, io);
1247 		break;
1248 	}
1249 }
1250 
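/*
 * Each of the q_depth io slots keeps exactly one command outstanding in the
 * kernel: ublk_dev_queue_io_init() posts the initial UBLK_IO_FETCH_REQ, and
 * afterwards UBLK_IO_COMMIT_AND_FETCH_REQ both returns the previous result and
 * re-arms the slot.  UBLK_IO_NEED_GET_DATA supplies a buffer for an incoming
 * write when UBLK_F_NEED_GET_DATA is in use.
 */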
1251 static inline void
1252 ublksrv_queue_io_cmd(struct ublk_queue *q,
1253 		     struct ublk_io *io, unsigned tag)
1254 {
1255 	struct ublksrv_io_cmd *cmd;
1256 	struct io_uring_sqe *sqe;
1257 	unsigned int cmd_op = 0;
1258 	uint64_t user_data;
1259 
1260 	/* each io should have operation of fetching or committing */
1261 	assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) ||
1262 	       (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ));
1263 	cmd_op = io->cmd_op;
1264 
1265 	sqe = io_uring_get_sqe(&q->ring);
1266 	assert(sqe);
1267 
1268 	cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe);
1269 	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) {
1270 		cmd->result = io->result;
1271 	}
1272 
1273 	/* These fields should be written once, never change */
1274 	ublk_set_sqe_cmd_op(sqe, cmd_op);
1275 	/* dev->cdev_fd */
1276 	sqe->fd		= 0;
1277 	sqe->opcode	= IORING_OP_URING_CMD;
1278 	sqe->flags	= IOSQE_FIXED_FILE;
1279 	sqe->rw_flags	= 0;
1280 	cmd->tag	= tag;
1281 	cmd->addr	= g_ublk_tgt.user_copy ? 0 : (__u64)(uintptr_t)(io->payload);
1282 	cmd->q_id	= q->q_id;
1283 
1284 	user_data = build_user_data(tag, cmd_op);
1285 	io_uring_sqe_set_data64(sqe, user_data);
1286 
1287 	io->cmd_op = 0;
1288 
1289 	SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n",
1290 		      q->q_id, tag, cmd_op,
1291 		      io->cmd_op, q->is_stopping);
1292 }
1293 
1294 static int
1295 ublk_io_xmit(struct ublk_queue *q)
1296 {
1297 	TAILQ_HEAD(, ublk_io) buffer_free_list;
1298 	struct spdk_iobuf_channel *iobuf_ch;
1299 	int rc = 0, count = 0;
1300 	struct ublk_io *io;
1301 
1302 	if (TAILQ_EMPTY(&q->completed_io_list)) {
1303 		return 0;
1304 	}
1305 
1306 	TAILQ_INIT(&buffer_free_list);
1307 	while (!TAILQ_EMPTY(&q->completed_io_list)) {
1308 		io = TAILQ_FIRST(&q->completed_io_list);
1309 		assert(io != NULL);
1310 		/*
1311 		 * Remove IO from list now assuming it will be completed. It will be inserted
1312 		 * back to the head if it cannot be completed. This approach is specifically
1313 		 * taken to work around a scan-build use-after-free mischaracterization.
1314 		 */
1315 		TAILQ_REMOVE(&q->completed_io_list, io, tailq);
1316 		if (!io->user_copy) {
1317 			if (!io->need_data) {
1318 				TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq);
1319 			}
1320 			ublksrv_queue_io_cmd(q, io, io->tag);
1321 		}
1322 		count++;
1323 	}
1324 
1325 	q->cmd_inflight += count;
1326 	rc = io_uring_submit(&q->ring);
1327 	if (rc != count) {
1328 		SPDK_ERRLOG("could not submit all commands\n");
1329 		assert(false);
1330 	}
1331 
1332 	/* Note: for READ io, ublk will always copy the data out of
1333 	 * the buffers in the io_uring_submit context.  Since we
1334 	 * are not using SQPOLL for IO rings, we can safely free
1335 	 * those IO buffers here.  This design doesn't seem ideal,
1336 	 * but it's what's possible since there is no discrete
1337 	 * COMMIT_REQ operation.  That will need to change in the
1338 	 * future should we ever want to support async copy
1339 	 * operations.
1340 	 */
1341 	iobuf_ch = &q->poll_group->iobuf_ch;
1342 	while (!TAILQ_EMPTY(&buffer_free_list)) {
1343 		io = TAILQ_FIRST(&buffer_free_list);
1344 		TAILQ_REMOVE(&buffer_free_list, io, tailq);
1345 		ublk_io_put_buffer(io, iobuf_ch);
1346 	}
1347 	return rc;
1348 }
1349 
1350 static void
1351 write_get_buffer_done(struct ublk_io *io)
1352 {
1353 	io->need_data = true;
1354 	io->cmd_op = UBLK_IO_NEED_GET_DATA;
1355 	io->result = 0;
1356 
1357 	TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq);
1358 	TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq);
1359 }
1360 
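/*
 * Reap CQEs from the per-queue ring.  For normal (non user-copy) completions,
 * cqe->res carries the driver's verdict: UBLK_IO_RES_OK delivers a new
 * request, UBLK_IO_RES_NEED_GET_DATA asks for a buffer for an incoming write,
 * and UBLK_IO_RES_ABORT marks the queue as stopping.  For user-copy
 * completions, cqe->res is the number of bytes moved through the char device
 * and must match io->result.
 */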
1361 static int
1362 ublk_io_recv(struct ublk_queue *q)
1363 {
1364 	struct io_uring_cqe *cqe;
1365 	unsigned head, tag;
1366 	int fetch, count = 0;
1367 	struct ublk_io *io;
1368 	struct spdk_iobuf_channel *iobuf_ch;
1369 
1370 	if (q->cmd_inflight == 0) {
1371 		return 0;
1372 	}
1373 
1374 	iobuf_ch = &q->poll_group->iobuf_ch;
1375 	io_uring_for_each_cqe(&q->ring, head, cqe) {
1376 		tag = user_data_to_tag(cqe->user_data);
1377 		io = &q->ios[tag];
1378 
1379 		SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u, user copy %u, cmd_op %u\n",
1380 			      cqe->res, q->q_id, tag, io->user_copy, user_data_to_op(cqe->user_data));
1381 
1382 		q->cmd_inflight--;
1383 		TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq);
1384 
1385 		if (!io->user_copy) {
1386 			fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping;
1387 			if (!fetch) {
1388 				q->is_stopping = true;
1389 				if (io->cmd_op == UBLK_IO_FETCH_REQ) {
1390 					io->cmd_op = 0;
1391 				}
1392 			}
1393 
1394 			if (cqe->res == UBLK_IO_RES_OK) {
1395 				ublk_submit_bdev_io(q, io);
1396 			} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
1397 				ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done);
1398 			} else {
1399 				if (cqe->res != UBLK_IO_RES_ABORT) {
1400 					SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n",
1401 						    cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data));
1402 				}
1403 				TAILQ_REMOVE(&q->inflight_io_list, io, tailq);
1404 			}
1405 		} else {
1406 
1407 			/* clear `user_copy` for next use of this IO structure */
1408 			io->user_copy = false;
1409 
1410 			assert((ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) ||
1411 			       (ublksrv_get_op(io->iod) == UBLK_IO_OP_WRITE));
1412 			if (cqe->res != io->result) {
1413 				/* EIO */
1414 				ublk_io_done(NULL, false, io);
1415 			} else {
1416 				if (ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) {
1417 					/* bdev_io is already freed in first READ cycle */
1418 					ublk_io_done(NULL, true, io);
1419 				} else {
1420 					_ublk_submit_bdev_io(q, io);
1421 				}
1422 			}
1423 		}
1424 		count += 1;
1425 		if (count == UBLK_QUEUE_REQUEST) {
1426 			break;
1427 		}
1428 	}
1429 	io_uring_cq_advance(&q->ring, count);
1430 
1431 	return count;
1432 }
1433 
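/*
 * Per poll-group poller: for each queue owned by this group, first push
 * completed commands back to the kernel (ublk_io_xmit), then reap new
 * completions (ublk_io_recv), and finally try to close queues that are
 * stopping.  Returns BUSY whenever any work was done.
 */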
1434 static int
1435 ublk_poll(void *arg)
1436 {
1437 	struct ublk_poll_group *poll_group = arg;
1438 	struct ublk_queue *q, *q_tmp;
1439 	int sent, received, count = 0;
1440 
1441 	TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) {
1442 		sent = ublk_io_xmit(q);
1443 		received = ublk_io_recv(q);
1444 		if (spdk_unlikely(q->is_stopping)) {
1445 			ublk_try_close_queue(q);
1446 		}
1447 		count += sent + received;
1448 	}
1449 	if (count > 0) {
1450 		return SPDK_POLLER_BUSY;
1451 	} else {
1452 		return SPDK_POLLER_IDLE;
1453 	}
1454 }
1455 
1456 static void
1457 ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk)
1458 {
1459 	ublk_close_dev(ublk);
1460 }
1461 
1462 static void
1463 ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1464 		   void *event_ctx)
1465 {
1466 	switch (type) {
1467 	case SPDK_BDEV_EVENT_REMOVE:
1468 		ublk_bdev_hot_remove(event_ctx);
1469 		break;
1470 	default:
1471 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1472 		break;
1473 	}
1474 }
1475 
1476 static void
1477 ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth)
1478 {
1479 	struct io_uring_sqe *sqe;
1480 	uint32_t i;
1481 
1482 	for (i = 0; i < q_depth; i++) {
1483 		sqe = ublk_uring_get_sqe(r, i);
1484 
1485 		/* These fields should be written once, never change */
1486 		sqe->flags = IOSQE_FIXED_FILE;
1487 		sqe->rw_flags = 0;
1488 		sqe->ioprio = 0;
1489 		sqe->off = 0;
1490 	}
1491 }
1492 
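/*
 * Per-queue setup: mmap() the kernel's read-only array of ublksrv_io_desc
 * entries for this queue at UBLKSRV_CMD_BUF_OFFSET, create an SQE128 io_uring
 * sized to the queue depth, and register the char-device fd as fixed file 0 so
 * io command SQEs can use IOSQE_FIXED_FILE with sqe->fd = 0.
 */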
1493 static int
1494 ublk_dev_queue_init(struct ublk_queue *q)
1495 {
1496 	int rc = 0, cmd_buf_size;
1497 	uint32_t j;
1498 	struct spdk_ublk_dev *ublk = q->dev;
1499 	unsigned long off;
1500 
1501 	cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth);
1502 	off = UBLKSRV_CMD_BUF_OFFSET +
1503 	      q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
1504 	q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ,
1505 			MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off);
1506 	if (q->io_cmd_buf == MAP_FAILED) {
1507 		q->io_cmd_buf = NULL;
1508 		rc = -errno;
1509 		SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc));
1510 		return rc;
1511 	}
1512 
1513 	for (j = 0; j < q->q_depth; j++) {
1514 		q->ios[j].cmd_op = UBLK_IO_FETCH_REQ;
1515 		q->ios[j].iod = &q->io_cmd_buf[j];
1516 	}
1517 
1518 	rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128);
1519 	if (rc < 0) {
1520 		SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc));
1521 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1522 		q->io_cmd_buf = NULL;
1523 		return rc;
1524 	}
1525 
1526 	rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1);
1527 	if (rc != 0) {
1528 		SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc));
1529 		io_uring_queue_exit(&q->ring);
1530 		q->ring.ring_fd = -1;
1531 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1532 		q->io_cmd_buf = NULL;
1533 		return rc;
1534 	}
1535 
1536 	ublk_dev_init_io_cmds(&q->ring, q->q_depth);
1537 
1538 	return 0;
1539 }
1540 
1541 static void
1542 ublk_dev_queue_fini(struct ublk_queue *q)
1543 {
1544 	if (q->ring.ring_fd >= 0) {
1545 		io_uring_unregister_files(&q->ring);
1546 		io_uring_queue_exit(&q->ring);
1547 		q->ring.ring_fd = -1;
1548 	}
1549 	if (q->io_cmd_buf) {
1550 		munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth));
1551 	}
1552 }
1553 
1554 static void
1555 ublk_dev_queue_io_init(struct ublk_queue *q)
1556 {
1557 	struct ublk_io *io;
1558 	uint32_t i;
1559 	int rc __attribute__((unused));
1560 	void *buf;
1561 
1562 	/* Some older kernels require a buffer to get posted, even
1563 	 * when NEED_GET_DATA has been specified.  So allocate a
1564 	 * temporary buffer, only for purposes of this workaround.
1565 	 * It never actually gets used, so we will free it immediately
1566 	 * after all of the commands are posted.
1567 	 */
1568 	buf = malloc(64);
1569 
1570 	assert(q->bdev_ch != NULL);
1571 
1572 	/* Initialize and submit all io commands to ublk driver */
1573 	for (i = 0; i < q->q_depth; i++) {
1574 		io = &q->ios[i];
1575 		io->tag = (uint16_t)i;
1576 		io->payload = buf;
1577 		io->bdev_ch = q->bdev_ch;
1578 		io->bdev_desc = q->dev->bdev_desc;
1579 		ublksrv_queue_io_cmd(q, io, i);
1580 	}
1581 
1582 	q->cmd_inflight += q->q_depth;
1583 	rc = io_uring_submit(&q->ring);
1584 	assert(rc == (int)q->q_depth);
1585 	for (i = 0; i < q->q_depth; i++) {
1586 		io = &q->ios[i];
1587 		io->payload = NULL;
1588 	}
1589 	free(buf);
1590 }
1591 
1592 static int
1593 ublk_set_params(struct spdk_ublk_dev *ublk)
1594 {
1595 	int rc;
1596 
1597 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_SET_PARAMS);
1598 	if (rc < 0) {
1599 		SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1600 	}
1601 
1602 	return rc;
1603 }
1604 
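/*
 * Build the ublksrv_ctrl_dev_info passed to UBLK_CMD_ADD_DEV.  Command
 * completions are always deferred to task context (UBLK_F_URING_CMD_COMP_IN_TASK);
 * data transfer uses UBLK_F_USER_COPY when the driver advertises it and falls
 * back to UBLK_F_NEED_GET_DATA otherwise.
 */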
1605 static void
1606 ublk_dev_info_init(struct spdk_ublk_dev *ublk)
1607 {
1608 	struct ublksrv_ctrl_dev_info uinfo = {
1609 		.queue_depth = ublk->queue_depth,
1610 		.nr_hw_queues = ublk->num_queues,
1611 		.dev_id = ublk->ublk_id,
1612 		.max_io_buf_bytes = UBLK_IO_MAX_BYTES,
1613 		.ublksrv_pid = getpid(),
1614 		.flags = UBLK_F_URING_CMD_COMP_IN_TASK,
1615 	};
1616 
1617 	if (g_ublk_tgt.user_copy) {
1618 		uinfo.flags |= UBLK_F_USER_COPY;
1619 	} else {
1620 		uinfo.flags |= UBLK_F_NEED_GET_DATA;
1621 	}
1622 
1623 	ublk->dev_info = uinfo;
1624 }
1625 
1626 /* Set ublk device parameters based on bdev */
1627 static void
1628 ublk_info_param_init(struct spdk_ublk_dev *ublk)
1629 {
1630 	struct spdk_bdev *bdev = ublk->bdev;
1631 	uint32_t blk_size = spdk_bdev_get_data_block_size(bdev);
1632 	uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev);
1633 	uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev);
1634 	uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev);
1635 	uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT;
1636 	uint32_t io_min_size = blk_size;
1637 	uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size);
1638 
1639 	struct ublk_params uparams = {
1640 		.types = UBLK_PARAM_TYPE_BASIC,
1641 		.len = sizeof(struct ublk_params),
1642 		.basic = {
1643 			.logical_bs_shift = spdk_u32log2(blk_size),
1644 			.physical_bs_shift = spdk_u32log2(pblk_size),
1645 			.io_min_shift = spdk_u32log2(io_min_size),
1646 			.io_opt_shift = spdk_u32log2(io_opt_size),
1647 			.dev_sectors = num_blocks * sectors_per_block,
1648 			.max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT,
1649 		}
1650 	};
1651 
1652 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
1653 		uparams.basic.attrs = UBLK_ATTR_VOLATILE_CACHE;
1654 	}
1655 
1656 	if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1657 		uparams.types |= UBLK_PARAM_TYPE_DISCARD;
1658 		uparams.discard.discard_alignment = sectors_per_block;
1659 		uparams.discard.max_discard_sectors = num_blocks * sectors_per_block;
1660 		uparams.discard.max_discard_segments = 1;
1661 		uparams.discard.discard_granularity = blk_size;
1662 		if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) {
1663 			uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block;
1664 		}
1665 	}
1666 
1667 	ublk->dev_params = uparams;
1668 }
1669 
1670 static void
1671 _ublk_free_dev(void *arg)
1672 {
1673 	struct spdk_ublk_dev *ublk = arg;
1674 
1675 	ublk_free_dev(ublk);
1676 }
1677 
1678 static void
1679 free_buffers(void *arg)
1680 {
1681 	struct ublk_queue *q = arg;
1682 	uint32_t i;
1683 
1684 	for (i = 0; i < q->q_depth; i++) {
1685 		ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch);
1686 	}
1687 	free(q->ios);
1688 	q->ios = NULL;
1689 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev);
1690 }
1691 
1692 static void
1693 ublk_free_dev(struct spdk_ublk_dev *ublk)
1694 {
1695 	struct ublk_queue *q;
1696 	uint32_t q_idx;
1697 
1698 	for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) {
1699 		q = &ublk->queues[q_idx];
1700 
1701 		/* The ublk_io structures of this queue are not initialized. */
1702 		if (q->ios == NULL) {
1703 			continue;
1704 		}
1705 
1706 		/* We found a queue that has an ios array that may have buffers
1707 		 * that need to be freed.  Send a message to the queue's thread
1708 		 * so it can free the buffers back to that thread's iobuf channel.
1709 		 * When it's done, it will set q->ios to NULL and send a message
1710 		 * back to this function to continue.
1711 		 */
1712 		if (q->poll_group) {
1713 			spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q);
1714 			return;
1715 		} else {
1716 			free(q->ios);
1717 			q->ios = NULL;
1718 		}
1719 	}
1720 
1721 	/* All of the buffers associated with the queues have been freed, so now
1722 	 * continue with releasing resources for the rest of the ublk device.
1723 	 */
1724 	if (ublk->bdev_desc) {
1725 		spdk_bdev_close(ublk->bdev_desc);
1726 		ublk->bdev_desc = NULL;
1727 	}
1728 
1729 	ublk_dev_list_unregister(ublk);
1730 	SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id);
1731 
1732 	free(ublk);
1733 }
1734 
1735 static int
1736 ublk_ios_init(struct spdk_ublk_dev *ublk)
1737 {
1738 	int rc;
1739 	uint32_t i, j;
1740 	struct ublk_queue *q;
1741 
1742 	for (i = 0; i < ublk->num_queues; i++) {
1743 		q = &ublk->queues[i];
1744 
1745 		TAILQ_INIT(&q->completed_io_list);
1746 		TAILQ_INIT(&q->inflight_io_list);
1747 		q->dev = ublk;
1748 		q->q_id = i;
1749 		q->q_depth = ublk->queue_depth;
1750 		q->ios = calloc(q->q_depth, sizeof(struct ublk_io));
1751 		if (!q->ios) {
1752 			rc = -ENOMEM;
1753 			SPDK_ERRLOG("could not allocate queue ios\n");
1754 			goto err;
1755 		}
1756 		for (j = 0; j < q->q_depth; j++) {
1757 			q->ios[j].q = q;
1758 		}
1759 	}
1760 
1761 	return 0;
1762 
1763 err:
1764 	for (i = 0; i < ublk->num_queues; i++) {
1765 		free(ublk->queues[i].ios);
1766 		ublk->queues[i].ios = NULL;
1767 	}
1768 	return rc;
1769 }
1770 
1771 static void
1772 ublk_queue_run(void *arg1)
1773 {
1774 	struct ublk_queue	*q = arg1;
1775 	struct spdk_ublk_dev *ublk = q->dev;
1776 	struct ublk_poll_group *poll_group = q->poll_group;
1777 
1778 	assert(spdk_get_thread() == poll_group->ublk_thread);
1779 	q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc);
1780 	/* Queues must be filled with IO commands from their assigned ublk thread */
1781 	ublk_dev_queue_io_init(q);
1782 
1783 	TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq);
1784 }
1785 
1786 int
1787 ublk_start_disk(const char *bdev_name, uint32_t ublk_id,
1788 		uint32_t num_queues, uint32_t queue_depth,
1789 		ublk_ctrl_cb ctrl_cb, void *cb_arg)
1790 {
1791 	int			rc;
1792 	uint32_t		i;
1793 	struct spdk_bdev	*bdev;
1794 	struct spdk_ublk_dev	*ublk = NULL;
1795 	uint32_t		sector_per_block;
1796 
1797 	assert(spdk_thread_is_app_thread(NULL));
1798 
1799 	if (g_ublk_tgt.active == false) {
1800 		SPDK_ERRLOG("No ublk target exists\n");
1801 		return -ENODEV;
1802 	}
1803 
1804 	ublk = ublk_dev_find_by_id(ublk_id);
1805 	if (ublk != NULL) {
1806 		SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id);
1807 		return -EBUSY;
1808 	}
1809 
1810 	if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) {
1811 		SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max);
1812 		return -ENOTSUP;
1813 	}
1814 
1815 	ublk = calloc(1, sizeof(*ublk));
1816 	if (ublk == NULL) {
1817 		return -ENOMEM;
1818 	}
1819 	ublk->ctrl_cb = ctrl_cb;
1820 	ublk->cb_arg = cb_arg;
1821 	ublk->cdev_fd = -1;
1822 	ublk->ublk_id = ublk_id;
1823 	UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n",
1824 		      bdev_name, num_queues, queue_depth);
1825 
1826 	rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc);
1827 	if (rc != 0) {
1828 		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
1829 		free(ublk);
1830 		return rc;
1831 	}
1832 
1833 	bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc);
1834 	ublk->bdev = bdev;
1835 	sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT;
1836 	ublk->sector_per_block_shift = spdk_u32log2(sector_per_block);
1837 
1838 	ublk->queues_closed = 0;
1839 	ublk->num_queues = num_queues;
1840 	ublk->queue_depth = queue_depth;
1841 	if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) {
1842 		SPDK_WARNLOG("Set Queue depth %d of UBLK %d to maximum %d\n",
1843 			     ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH);
1844 		ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH;
1845 	}
1846 	if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) {
1847 		SPDK_WARNLOG("Set Queue num %d of UBLK %d to maximum %d\n",
1848 			     ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES);
1849 		ublk->num_queues = UBLK_DEV_MAX_QUEUES;
1850 	}
1851 	for (i = 0; i < ublk->num_queues; i++) {
1852 		ublk->queues[i].ring.ring_fd = -1;
1853 	}
1854 
1855 	ublk_dev_info_init(ublk);
1856 	ublk_info_param_init(ublk);
1857 	rc = ublk_ios_init(ublk);
1858 	if (rc != 0) {
1859 		spdk_bdev_close(ublk->bdev_desc);
1860 		free(ublk);
1861 		return rc;
1862 	}
1863 
1864 	SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n",
1865 		     bdev_name, ublk_id);
1866 
1867 	/* Add ublk_dev to the end of disk list */
1868 	ublk_dev_list_register(ublk);
1869 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_ADD_DEV);
1870 	if (rc < 0) {
1871 		SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc));
1872 		ublk_free_dev(ublk);
1873 	}
1874 
1875 	return rc;
1876 }
1877 
1878 static int
1879 ublk_finish_start(struct spdk_ublk_dev *ublk)
1880 {
1881 	int			rc;
1882 	uint32_t		q_id;
1883 	struct spdk_thread	*ublk_thread;
1884 	char			buf[64];
1885 
1886 	snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id);
1887 	ublk->cdev_fd = open(buf, O_RDWR);
1888 	if (ublk->cdev_fd < 0) {
1889 		rc = -errno;
1890 		SPDK_ERRLOG("can't open %s, rc %d\n", buf, rc);
1891 		return rc;
1892 	}
1893 
1894 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1895 		rc = ublk_dev_queue_init(&ublk->queues[q_id]);
1896 		if (rc) {
1897 			return rc;
1898 		}
1899 	}
1900 
1901 	rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_START_DEV);
1902 	if (rc < 0) {
1903 		SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id,
1904 			    spdk_strerror(-rc));
1905 		return rc;
1906 	}
1907 
1908 	/* Distribute queues across spdk_threads for load balancing */
1909 	for (q_id = 0; q_id < ublk->num_queues; q_id++) {
1910 		ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_groups[g_next_ublk_poll_group];
1911 		ublk_thread = g_ublk_tgt.poll_groups[g_next_ublk_poll_group].ublk_thread;
1912 		spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]);
1913 		g_next_ublk_poll_group++;
1914 		if (g_next_ublk_poll_group == g_num_ublk_poll_groups) {
1915 			g_next_ublk_poll_group = 0;
1916 		}
1917 	}
1918 
1919 	return 0;
1920 }
1921 
1922 SPDK_LOG_REGISTER_COMPONENT(ublk)
1923 SPDK_LOG_REGISTER_COMPONENT(ublk_io)
1924