1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2022 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include <liburing.h> 7 8 #include "spdk/stdinc.h" 9 #include "spdk/string.h" 10 #include "spdk/bdev.h" 11 #include "spdk/endian.h" 12 #include "spdk/env.h" 13 #include "spdk/likely.h" 14 #include "spdk/log.h" 15 #include "spdk/util.h" 16 #include "spdk/queue.h" 17 #include "spdk/json.h" 18 #include "spdk/ublk.h" 19 #include "spdk/thread.h" 20 21 #include "ublk_internal.h" 22 23 #define UBLK_CTRL_DEV "/dev/ublk-control" 24 #define UBLK_BLK_CDEV "/dev/ublkc" 25 26 #define LINUX_SECTOR_SHIFT 9 27 #define UBLK_IO_MAX_BYTES SPDK_BDEV_LARGE_BUF_MAX_SIZE 28 #define UBLK_DEV_MAX_QUEUES 32 29 #define UBLK_DEV_MAX_QUEUE_DEPTH 1024 30 #define UBLK_QUEUE_REQUEST 32 31 #define UBLK_STOP_BUSY_WAITING_MS 10000 32 #define UBLK_BUSY_POLLING_INTERVAL_US 20000 33 #define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US 1000 34 /* By default, kernel ublk_drv driver can support up to 64 block devices */ 35 #define UBLK_DEFAULT_MAX_SUPPORTED_DEVS 64 36 37 #define UBLK_IOBUF_SMALL_CACHE_SIZE 128 38 #define UBLK_IOBUF_LARGE_CACHE_SIZE 32 39 40 #define UBLK_DEBUGLOG(ublk, format, ...) \ 41 SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__); 42 43 static uint32_t g_num_ublk_poll_groups = 0; 44 static uint32_t g_next_ublk_poll_group = 0; 45 static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS; 46 static struct spdk_cpuset g_core_mask; 47 48 struct ublk_queue; 49 struct ublk_poll_group; 50 struct ublk_io; 51 static void _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io); 52 static void ublk_dev_queue_fini(struct ublk_queue *q); 53 static int ublk_poll(void *arg); 54 static int ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op); 55 56 static void ublk_set_params(struct spdk_ublk_dev *ublk); 57 static void ublk_finish_start(struct spdk_ublk_dev *ublk); 58 static void ublk_free_dev(struct spdk_ublk_dev *ublk); 59 static void ublk_delete_dev(void *arg); 60 static int ublk_close_dev(struct spdk_ublk_dev *ublk); 61 62 static const char *ublk_op_name[64] 63 __attribute__((unused)) = { 64 [UBLK_CMD_ADD_DEV] = "UBLK_CMD_ADD_DEV", 65 [UBLK_CMD_DEL_DEV] = "UBLK_CMD_DEL_DEV", 66 [UBLK_CMD_START_DEV] = "UBLK_CMD_START_DEV", 67 [UBLK_CMD_STOP_DEV] = "UBLK_CMD_STOP_DEV", 68 [UBLK_CMD_SET_PARAMS] = "UBLK_CMD_SET_PARAMS", 69 }; 70 71 typedef void (*ublk_get_buf_cb)(struct ublk_io *io); 72 73 struct ublk_io { 74 void *payload; 75 void *mpool_entry; 76 bool need_data; 77 bool user_copy; 78 uint16_t tag; 79 uint64_t payload_size; 80 uint32_t cmd_op; 81 int32_t result; 82 struct spdk_bdev_desc *bdev_desc; 83 struct spdk_io_channel *bdev_ch; 84 const struct ublksrv_io_desc *iod; 85 ublk_get_buf_cb get_buf_cb; 86 struct ublk_queue *q; 87 /* for bdev io_wait */ 88 struct spdk_bdev_io_wait_entry bdev_io_wait; 89 struct spdk_iobuf_entry iobuf; 90 91 TAILQ_ENTRY(ublk_io) tailq; 92 }; 93 94 struct ublk_queue { 95 uint32_t q_id; 96 uint32_t q_depth; 97 struct ublk_io *ios; 98 TAILQ_HEAD(, ublk_io) completed_io_list; 99 TAILQ_HEAD(, ublk_io) inflight_io_list; 100 uint32_t cmd_inflight; 101 bool is_stopping; 102 struct ublksrv_io_desc *io_cmd_buf; 103 /* ring depth == dev_info->queue_depth. */ 104 struct io_uring ring; 105 struct spdk_ublk_dev *dev; 106 struct ublk_poll_group *poll_group; 107 struct spdk_io_channel *bdev_ch; 108 109 TAILQ_ENTRY(ublk_queue) tailq; 110 }; 111 112 struct spdk_ublk_dev { 113 struct spdk_bdev *bdev; 114 struct spdk_bdev_desc *bdev_desc; 115 116 int cdev_fd; 117 struct ublk_params dev_params; 118 struct ublksrv_ctrl_dev_info dev_info; 119 120 uint32_t ublk_id; 121 uint32_t num_queues; 122 uint32_t queue_depth; 123 uint32_t sector_per_block_shift; 124 struct ublk_queue queues[UBLK_DEV_MAX_QUEUES]; 125 126 struct spdk_poller *retry_poller; 127 int retry_count; 128 uint32_t queues_closed; 129 ublk_start_cb start_cb; 130 ublk_del_cb del_cb; 131 void *cb_arg; 132 uint32_t current_cmd_op; 133 uint32_t ctrl_ops_in_progress; 134 bool is_closing; 135 136 TAILQ_ENTRY(spdk_ublk_dev) tailq; 137 TAILQ_ENTRY(spdk_ublk_dev) wait_tailq; 138 }; 139 140 struct ublk_poll_group { 141 struct spdk_thread *ublk_thread; 142 struct spdk_poller *ublk_poller; 143 struct spdk_iobuf_channel iobuf_ch; 144 TAILQ_HEAD(, ublk_queue) queue_list; 145 }; 146 147 struct ublk_tgt { 148 int ctrl_fd; 149 bool active; 150 bool is_destroying; 151 spdk_ublk_fini_cb cb_fn; 152 void *cb_arg; 153 struct io_uring ctrl_ring; 154 struct spdk_poller *ctrl_poller; 155 uint32_t ctrl_ops_in_progress; 156 struct ublk_poll_group *poll_groups; 157 uint32_t num_ublk_devs; 158 uint64_t features; 159 /* `ublk_drv` supports UBLK_F_CMD_IOCTL_ENCODE */ 160 bool ioctl_encode; 161 /* `ublk_drv` supports UBLK_F_USER_COPY */ 162 bool user_copy; 163 }; 164 165 static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs); 166 static struct ublk_tgt g_ublk_tgt; 167 168 /* helpers for using io_uring */ 169 static inline int 170 ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags) 171 { 172 struct io_uring_params p = {}; 173 174 p.flags = flags | IORING_SETUP_CQSIZE; 175 p.cq_entries = depth; 176 177 return io_uring_queue_init_params(depth, r, &p); 178 } 179 180 static inline struct io_uring_sqe * 181 ublk_uring_get_sqe(struct io_uring *r, uint32_t idx) 182 { 183 /* Need to update the idx since we set IORING_SETUP_SQE128 parameter in ublk_setup_ring */ 184 return &r->sq.sqes[idx << 1]; 185 } 186 187 static inline void * 188 ublk_get_sqe_cmd(struct io_uring_sqe *sqe) 189 { 190 return (void *)&sqe->addr3; 191 } 192 193 static inline void 194 ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op) 195 { 196 uint32_t opc = cmd_op; 197 198 if (g_ublk_tgt.ioctl_encode) { 199 switch (cmd_op) { 200 /* ctrl uring */ 201 case UBLK_CMD_GET_DEV_INFO: 202 opc = _IOR('u', UBLK_CMD_GET_DEV_INFO, struct ublksrv_ctrl_cmd); 203 break; 204 case UBLK_CMD_ADD_DEV: 205 opc = _IOWR('u', UBLK_CMD_ADD_DEV, struct ublksrv_ctrl_cmd); 206 break; 207 case UBLK_CMD_DEL_DEV: 208 opc = _IOWR('u', UBLK_CMD_DEL_DEV, struct ublksrv_ctrl_cmd); 209 break; 210 case UBLK_CMD_START_DEV: 211 opc = _IOWR('u', UBLK_CMD_START_DEV, struct ublksrv_ctrl_cmd); 212 break; 213 case UBLK_CMD_STOP_DEV: 214 opc = _IOWR('u', UBLK_CMD_STOP_DEV, struct ublksrv_ctrl_cmd); 215 break; 216 case UBLK_CMD_SET_PARAMS: 217 opc = _IOWR('u', UBLK_CMD_SET_PARAMS, struct ublksrv_ctrl_cmd); 218 break; 219 220 /* io uring */ 221 case UBLK_IO_FETCH_REQ: 222 opc = _IOWR('u', UBLK_IO_FETCH_REQ, struct ublksrv_io_cmd); 223 break; 224 case UBLK_IO_COMMIT_AND_FETCH_REQ: 225 opc = _IOWR('u', UBLK_IO_COMMIT_AND_FETCH_REQ, struct ublksrv_io_cmd); 226 break; 227 case UBLK_IO_NEED_GET_DATA: 228 opc = _IOWR('u', UBLK_IO_NEED_GET_DATA, struct ublksrv_io_cmd); 229 break; 230 default: 231 break; 232 } 233 } 234 235 sqe->off = opc; 236 } 237 238 static inline uint64_t 239 build_user_data(uint16_t tag, uint8_t op) 240 { 241 assert(!(tag >> 16) && !(op >> 8)); 242 243 return tag | (op << 16); 244 } 245 246 static inline uint16_t 247 user_data_to_tag(uint64_t user_data) 248 { 249 return user_data & 0xffff; 250 } 251 252 static inline uint8_t 253 user_data_to_op(uint64_t user_data) 254 { 255 return (user_data >> 16) & 0xff; 256 } 257 258 static inline uint64_t 259 ublk_user_copy_pos(uint16_t q_id, uint16_t tag) 260 { 261 return (uint64_t)UBLKSRV_IO_BUF_OFFSET + ((((uint64_t)q_id) << UBLK_QID_OFF) | ((( 262 uint64_t)tag) << UBLK_TAG_OFF)); 263 } 264 265 void 266 spdk_ublk_init(void) 267 { 268 assert(spdk_thread_is_app_thread(NULL)); 269 270 g_ublk_tgt.ctrl_fd = -1; 271 g_ublk_tgt.ctrl_ring.ring_fd = -1; 272 } 273 274 static void 275 ublk_ctrl_cmd_error(struct spdk_ublk_dev *ublk, int32_t res) 276 { 277 assert(res != 0); 278 279 SPDK_ERRLOG("ctrlr cmd %s failed, %s\n", ublk_op_name[ublk->current_cmd_op], spdk_strerror(-res)); 280 switch (ublk->current_cmd_op) { 281 case UBLK_CMD_ADD_DEV: 282 case UBLK_CMD_SET_PARAMS: 283 if (ublk->start_cb) { 284 ublk->start_cb(ublk->cb_arg, res); 285 ublk->start_cb = NULL; 286 } 287 288 ublk_delete_dev(ublk); 289 break; 290 case UBLK_CMD_START_DEV: 291 if (ublk->start_cb) { 292 ublk->start_cb(ublk->cb_arg, res); 293 ublk->start_cb = NULL; 294 } 295 296 ublk_close_dev(ublk); 297 break; 298 case UBLK_CMD_STOP_DEV: 299 /* TODO: process stop cmd failure */ 300 break; 301 case UBLK_CMD_DEL_DEV: 302 /* TODO: process del cmd failure */ 303 break; 304 default: 305 SPDK_ERRLOG("No match cmd operation,cmd_op = %d\n", ublk->current_cmd_op); 306 break; 307 } 308 } 309 310 static void 311 ublk_ctrl_process_cqe(struct io_uring_cqe *cqe) 312 { 313 struct spdk_ublk_dev *ublk; 314 315 ublk = (struct spdk_ublk_dev *)cqe->user_data; 316 UBLK_DEBUGLOG(ublk, "ctrl cmd completed\n"); 317 ublk->ctrl_ops_in_progress--; 318 319 if (spdk_unlikely(cqe->res != 0)) { 320 SPDK_ERRLOG("ctrlr cmd failed\n"); 321 ublk_ctrl_cmd_error(ublk, cqe->res); 322 return; 323 } 324 325 switch (ublk->current_cmd_op) { 326 case UBLK_CMD_ADD_DEV: 327 ublk_set_params(ublk); 328 break; 329 case UBLK_CMD_SET_PARAMS: 330 ublk_finish_start(ublk); 331 break; 332 case UBLK_CMD_START_DEV: 333 if (ublk->start_cb) { 334 ublk->start_cb(ublk->cb_arg, 0); 335 ublk->start_cb = NULL; 336 } 337 break; 338 case UBLK_CMD_STOP_DEV: 339 break; 340 case UBLK_CMD_DEL_DEV: 341 ublk_free_dev(ublk); 342 break; 343 default: 344 SPDK_ERRLOG("No match cmd operation,cmd_op = %d\n", ublk->current_cmd_op); 345 break; 346 } 347 } 348 349 static int 350 ublk_ctrl_poller(void *arg) 351 { 352 struct io_uring *ring = &g_ublk_tgt.ctrl_ring; 353 struct io_uring_cqe *cqe; 354 const int max = 8; 355 int i, count = 0, rc; 356 357 if (!g_ublk_tgt.ctrl_ops_in_progress) { 358 return SPDK_POLLER_IDLE; 359 } 360 361 for (i = 0; i < max; i++) { 362 rc = io_uring_peek_cqe(ring, &cqe); 363 if (rc == -EAGAIN) { 364 break; 365 } 366 367 assert(cqe != NULL); 368 g_ublk_tgt.ctrl_ops_in_progress--; 369 370 ublk_ctrl_process_cqe(cqe); 371 372 io_uring_cqe_seen(ring, cqe); 373 count++; 374 } 375 376 return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE; 377 } 378 379 static int 380 ublk_ctrl_cmd(struct spdk_ublk_dev *ublk, uint32_t cmd_op) 381 { 382 uint32_t dev_id = ublk->ublk_id; 383 int rc = -EINVAL; 384 struct io_uring_sqe *sqe; 385 struct ublksrv_ctrl_cmd *cmd; 386 387 UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]); 388 389 sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring); 390 if (!sqe) { 391 SPDK_ERRLOG("No available sqe in ctrl ring\n"); 392 assert(false); 393 return -ENOENT; 394 } 395 396 cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe); 397 sqe->fd = g_ublk_tgt.ctrl_fd; 398 sqe->opcode = IORING_OP_URING_CMD; 399 sqe->ioprio = 0; 400 cmd->dev_id = dev_id; 401 cmd->queue_id = -1; 402 ublk->current_cmd_op = cmd_op; 403 404 switch (cmd_op) { 405 case UBLK_CMD_ADD_DEV: 406 cmd->addr = (__u64)(uintptr_t)&ublk->dev_info; 407 cmd->len = sizeof(ublk->dev_info); 408 break; 409 case UBLK_CMD_SET_PARAMS: 410 cmd->addr = (__u64)(uintptr_t)&ublk->dev_params; 411 cmd->len = sizeof(ublk->dev_params); 412 break; 413 case UBLK_CMD_START_DEV: 414 cmd->data[0] = getpid(); 415 break; 416 case UBLK_CMD_STOP_DEV: 417 break; 418 case UBLK_CMD_DEL_DEV: 419 break; 420 default: 421 SPDK_ERRLOG("No match cmd operation,cmd_op = %d\n", cmd_op); 422 return -EINVAL; 423 } 424 ublk_set_sqe_cmd_op(sqe, cmd_op); 425 io_uring_sqe_set_data(sqe, ublk); 426 427 rc = io_uring_submit(&g_ublk_tgt.ctrl_ring); 428 if (rc < 0) { 429 SPDK_ERRLOG("uring submit rc %d\n", rc); 430 return rc; 431 } 432 g_ublk_tgt.ctrl_ops_in_progress++; 433 ublk->ctrl_ops_in_progress++; 434 435 return 0; 436 } 437 438 static int 439 ublk_ctrl_cmd_get_features(void) 440 { 441 int rc; 442 struct io_uring_sqe *sqe; 443 struct io_uring_cqe *cqe; 444 struct ublksrv_ctrl_cmd *cmd; 445 uint32_t cmd_op; 446 447 sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring); 448 if (!sqe) { 449 SPDK_ERRLOG("No available sqe in ctrl ring\n"); 450 assert(false); 451 return -ENOENT; 452 } 453 454 cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe); 455 sqe->fd = g_ublk_tgt.ctrl_fd; 456 sqe->opcode = IORING_OP_URING_CMD; 457 sqe->ioprio = 0; 458 cmd->dev_id = -1; 459 cmd->queue_id = -1; 460 cmd->addr = (__u64)(uintptr_t)&g_ublk_tgt.features; 461 cmd->len = sizeof(g_ublk_tgt.features); 462 463 cmd_op = UBLK_U_CMD_GET_FEATURES; 464 ublk_set_sqe_cmd_op(sqe, cmd_op); 465 466 rc = io_uring_submit(&g_ublk_tgt.ctrl_ring); 467 if (rc < 0) { 468 SPDK_ERRLOG("uring submit rc %d\n", rc); 469 return rc; 470 } 471 472 rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe); 473 if (rc < 0) { 474 SPDK_ERRLOG("wait cqe rc %d\n", rc); 475 return rc; 476 } 477 478 if (cqe->res == 0) { 479 g_ublk_tgt.ioctl_encode = !!(g_ublk_tgt.features & UBLK_F_CMD_IOCTL_ENCODE); 480 g_ublk_tgt.user_copy = !!(g_ublk_tgt.features & UBLK_F_USER_COPY); 481 } 482 io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe); 483 484 return 0; 485 } 486 487 static int 488 ublk_queue_cmd_buf_sz(uint32_t q_depth) 489 { 490 uint32_t size = q_depth * sizeof(struct ublksrv_io_desc); 491 uint32_t page_sz = getpagesize(); 492 493 /* round up size */ 494 return (size + page_sz - 1) & ~(page_sz - 1); 495 } 496 497 static int 498 ublk_get_max_support_devs(void) 499 { 500 FILE *file; 501 char str[128]; 502 503 file = fopen("/sys/module/ublk_drv/parameters/ublks_max", "r"); 504 if (!file) { 505 return -ENOENT; 506 } 507 508 if (!fgets(str, sizeof(str), file)) { 509 fclose(file); 510 return -EINVAL; 511 } 512 fclose(file); 513 514 spdk_str_chomp(str); 515 return spdk_strtol(str, 10); 516 } 517 518 static int 519 ublk_open(void) 520 { 521 int rc, ublks_max; 522 523 g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR); 524 if (g_ublk_tgt.ctrl_fd < 0) { 525 rc = errno; 526 SPDK_ERRLOG("UBLK conrol dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno)); 527 return -rc; 528 } 529 530 ublks_max = ublk_get_max_support_devs(); 531 if (ublks_max > 0) { 532 g_ublks_max = ublks_max; 533 } 534 535 /* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl 536 * ring processing to a workqueue. Ctrl ring processing is minimal, so SQPOLL is fine. 537 * All the commands sent via control uring for a ublk device is executed one by one, so use 538 * ublks_max * 2 as the number of uring entries is enough. 539 */ 540 rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring, 541 IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL); 542 if (rc < 0) { 543 SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc)); 544 goto err; 545 } 546 547 rc = ublk_ctrl_cmd_get_features(); 548 if (rc) { 549 goto err; 550 } 551 552 return 0; 553 554 err: 555 close(g_ublk_tgt.ctrl_fd); 556 g_ublk_tgt.ctrl_fd = -1; 557 return rc; 558 } 559 560 static int 561 ublk_parse_core_mask(const char *mask) 562 { 563 struct spdk_cpuset tmp_mask; 564 int rc; 565 566 if (mask == NULL) { 567 spdk_env_get_cpuset(&g_core_mask); 568 return 0; 569 } 570 571 rc = spdk_cpuset_parse(&g_core_mask, mask); 572 if (rc < 0) { 573 SPDK_ERRLOG("invalid cpumask %s\n", mask); 574 return -EINVAL; 575 } 576 577 if (spdk_cpuset_count(&g_core_mask) == 0) { 578 SPDK_ERRLOG("no cpus specified\n"); 579 return -EINVAL; 580 } 581 582 spdk_env_get_cpuset(&tmp_mask); 583 spdk_cpuset_and(&tmp_mask, &g_core_mask); 584 585 if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) { 586 SPDK_ERRLOG("one of selected cpu is outside of core mask(=%s)\n", 587 spdk_cpuset_fmt(&g_core_mask)); 588 return -EINVAL; 589 } 590 591 return 0; 592 } 593 594 static void 595 ublk_poller_register(void *args) 596 { 597 struct ublk_poll_group *poll_group = args; 598 int rc; 599 600 assert(spdk_get_thread() == poll_group->ublk_thread); 601 /* Bind ublk spdk_thread to current CPU core in order to avoid thread context switch 602 * during uring processing as required by ublk kernel. 603 */ 604 spdk_thread_bind(spdk_get_thread(), true); 605 606 TAILQ_INIT(&poll_group->queue_list); 607 poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0); 608 rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk", 609 UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE); 610 if (rc != 0) { 611 assert(false); 612 } 613 } 614 615 int 616 ublk_create_target(const char *cpumask_str) 617 { 618 int rc; 619 uint32_t i; 620 char thread_name[32]; 621 struct ublk_poll_group *poll_group; 622 623 if (g_ublk_tgt.active == true) { 624 SPDK_ERRLOG("UBLK target has been created\n"); 625 return -EBUSY; 626 } 627 628 rc = ublk_parse_core_mask(cpumask_str); 629 if (rc != 0) { 630 return rc; 631 } 632 633 assert(g_ublk_tgt.poll_groups == NULL); 634 g_ublk_tgt.poll_groups = calloc(spdk_env_get_core_count(), sizeof(*poll_group)); 635 if (!g_ublk_tgt.poll_groups) { 636 return -ENOMEM; 637 } 638 639 rc = ublk_open(); 640 if (rc != 0) { 641 SPDK_ERRLOG("Fail to open UBLK, error=%s\n", spdk_strerror(-rc)); 642 free(g_ublk_tgt.poll_groups); 643 g_ublk_tgt.poll_groups = NULL; 644 return rc; 645 } 646 647 spdk_iobuf_register_module("ublk"); 648 649 SPDK_ENV_FOREACH_CORE(i) { 650 if (!spdk_cpuset_get_cpu(&g_core_mask, i)) { 651 continue; 652 } 653 snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i); 654 poll_group = &g_ublk_tgt.poll_groups[g_num_ublk_poll_groups]; 655 poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask); 656 spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group); 657 g_num_ublk_poll_groups++; 658 } 659 660 assert(spdk_thread_is_app_thread(NULL)); 661 g_ublk_tgt.active = true; 662 g_ublk_tgt.ctrl_ops_in_progress = 0; 663 g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL, 664 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US); 665 666 SPDK_NOTICELOG("UBLK target created successfully\n"); 667 668 return 0; 669 } 670 671 static void 672 _ublk_fini_done(void *args) 673 { 674 SPDK_DEBUGLOG(ublk, "\n"); 675 676 g_num_ublk_poll_groups = 0; 677 g_next_ublk_poll_group = 0; 678 g_ublk_tgt.is_destroying = false; 679 g_ublk_tgt.active = false; 680 g_ublk_tgt.features = 0; 681 g_ublk_tgt.ioctl_encode = false; 682 g_ublk_tgt.user_copy = false; 683 684 if (g_ublk_tgt.cb_fn) { 685 g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg); 686 g_ublk_tgt.cb_fn = NULL; 687 g_ublk_tgt.cb_arg = NULL; 688 } 689 690 if (g_ublk_tgt.poll_groups) { 691 free(g_ublk_tgt.poll_groups); 692 g_ublk_tgt.poll_groups = NULL; 693 } 694 695 } 696 697 static void 698 ublk_thread_exit(void *args) 699 { 700 struct spdk_thread *ublk_thread = spdk_get_thread(); 701 uint32_t i; 702 703 for (i = 0; i < g_num_ublk_poll_groups; i++) { 704 if (g_ublk_tgt.poll_groups[i].ublk_thread == ublk_thread) { 705 spdk_poller_unregister(&g_ublk_tgt.poll_groups[i].ublk_poller); 706 spdk_iobuf_channel_fini(&g_ublk_tgt.poll_groups[i].iobuf_ch); 707 spdk_thread_bind(ublk_thread, false); 708 spdk_thread_exit(ublk_thread); 709 } 710 } 711 } 712 713 static int 714 ublk_close_dev(struct spdk_ublk_dev *ublk) 715 { 716 int rc; 717 718 /* set is_closing */ 719 if (ublk->is_closing) { 720 return -EBUSY; 721 } 722 ublk->is_closing = true; 723 724 rc = ublk_ctrl_cmd(ublk, UBLK_CMD_STOP_DEV); 725 if (rc < 0) { 726 SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id); 727 } 728 return rc; 729 } 730 731 static void 732 _ublk_fini(void *args) 733 { 734 struct spdk_ublk_dev *ublk, *ublk_tmp; 735 736 TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) { 737 ublk_close_dev(ublk); 738 } 739 740 /* Check if all ublks closed */ 741 if (TAILQ_EMPTY(&g_ublk_devs)) { 742 SPDK_DEBUGLOG(ublk, "finish shutdown\n"); 743 spdk_poller_unregister(&g_ublk_tgt.ctrl_poller); 744 if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) { 745 io_uring_queue_exit(&g_ublk_tgt.ctrl_ring); 746 g_ublk_tgt.ctrl_ring.ring_fd = -1; 747 } 748 if (g_ublk_tgt.ctrl_fd >= 0) { 749 close(g_ublk_tgt.ctrl_fd); 750 g_ublk_tgt.ctrl_fd = -1; 751 } 752 spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done); 753 } else { 754 spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL); 755 } 756 } 757 758 int 759 spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg) 760 { 761 assert(spdk_thread_is_app_thread(NULL)); 762 763 if (g_ublk_tgt.is_destroying == true) { 764 /* UBLK target is being destroying */ 765 return -EBUSY; 766 } 767 g_ublk_tgt.cb_fn = cb_fn; 768 g_ublk_tgt.cb_arg = cb_arg; 769 g_ublk_tgt.is_destroying = true; 770 _ublk_fini(NULL); 771 772 return 0; 773 } 774 775 int 776 ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg) 777 { 778 int rc; 779 780 if (g_ublk_tgt.active == false) { 781 /* UBLK target has not been created */ 782 return -ENOENT; 783 } 784 785 rc = spdk_ublk_fini(cb_fn, cb_arg); 786 787 return rc; 788 } 789 790 struct spdk_ublk_dev * 791 ublk_dev_find_by_id(uint32_t ublk_id) 792 { 793 struct spdk_ublk_dev *ublk; 794 795 /* check whether ublk has already been registered by ublk path. */ 796 TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) { 797 if (ublk->ublk_id == ublk_id) { 798 return ublk; 799 } 800 } 801 802 return NULL; 803 } 804 805 uint32_t 806 ublk_dev_get_id(struct spdk_ublk_dev *ublk) 807 { 808 return ublk->ublk_id; 809 } 810 811 struct spdk_ublk_dev *ublk_dev_first(void) 812 { 813 return TAILQ_FIRST(&g_ublk_devs); 814 } 815 816 struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev) 817 { 818 return TAILQ_NEXT(prev, tailq); 819 } 820 821 uint32_t 822 ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk) 823 { 824 return ublk->queue_depth; 825 } 826 827 uint32_t 828 ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk) 829 { 830 return ublk->num_queues; 831 } 832 833 const char * 834 ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk) 835 { 836 return spdk_bdev_get_name(ublk->bdev); 837 } 838 839 void 840 spdk_ublk_write_config_json(struct spdk_json_write_ctx *w) 841 { 842 struct spdk_ublk_dev *ublk; 843 844 spdk_json_write_array_begin(w); 845 846 if (g_ublk_tgt.active) { 847 spdk_json_write_object_begin(w); 848 849 spdk_json_write_named_string(w, "method", "ublk_create_target"); 850 spdk_json_write_named_object_begin(w, "params"); 851 spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask)); 852 spdk_json_write_object_end(w); 853 854 spdk_json_write_object_end(w); 855 } 856 857 TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) { 858 spdk_json_write_object_begin(w); 859 860 spdk_json_write_named_string(w, "method", "ublk_start_disk"); 861 862 spdk_json_write_named_object_begin(w, "params"); 863 spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk)); 864 spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id); 865 spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues); 866 spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth); 867 spdk_json_write_object_end(w); 868 869 spdk_json_write_object_end(w); 870 } 871 872 spdk_json_write_array_end(w); 873 } 874 875 static void 876 ublk_dev_list_register(struct spdk_ublk_dev *ublk) 877 { 878 UBLK_DEBUGLOG(ublk, "add to tailq\n"); 879 TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq); 880 g_ublk_tgt.num_ublk_devs++; 881 } 882 883 static void 884 ublk_dev_list_unregister(struct spdk_ublk_dev *ublk) 885 { 886 /* 887 * ublk device may be stopped before registered. 888 * check whether it was registered. 889 */ 890 891 if (ublk_dev_find_by_id(ublk->ublk_id)) { 892 UBLK_DEBUGLOG(ublk, "remove from tailq\n"); 893 TAILQ_REMOVE(&g_ublk_devs, ublk, tailq); 894 assert(g_ublk_tgt.num_ublk_devs); 895 g_ublk_tgt.num_ublk_devs--; 896 return; 897 } 898 899 UBLK_DEBUGLOG(ublk, "not found in tailq\n"); 900 assert(false); 901 } 902 903 static void 904 ublk_delete_dev(void *arg) 905 { 906 struct spdk_ublk_dev *ublk = arg; 907 int rc = 0; 908 uint32_t q_idx; 909 910 assert(spdk_thread_is_app_thread(NULL)); 911 for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) { 912 ublk_dev_queue_fini(&ublk->queues[q_idx]); 913 } 914 915 if (ublk->cdev_fd >= 0) { 916 close(ublk->cdev_fd); 917 } 918 919 rc = ublk_ctrl_cmd(ublk, UBLK_CMD_DEL_DEV); 920 if (rc < 0) { 921 SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id); 922 } 923 } 924 925 static int 926 _ublk_close_dev_retry(void *arg) 927 { 928 struct spdk_ublk_dev *ublk = arg; 929 930 if (ublk->ctrl_ops_in_progress > 0) { 931 if (ublk->retry_count-- > 0) { 932 return SPDK_POLLER_BUSY; 933 } 934 SPDK_ERRLOG("Timeout on ctrl op completion.\n"); 935 } 936 spdk_poller_unregister(&ublk->retry_poller); 937 ublk_delete_dev(ublk); 938 return SPDK_POLLER_BUSY; 939 } 940 941 static void 942 ublk_try_close_dev(void *arg) 943 { 944 struct spdk_ublk_dev *ublk = arg; 945 946 assert(spdk_thread_is_app_thread(NULL)); 947 948 ublk->queues_closed += 1; 949 SPDK_DEBUGLOG(ublk_io, "ublkb%u closed queues %u\n", ublk->ublk_id, ublk->queues_closed); 950 951 if (ublk->queues_closed < ublk->num_queues) { 952 return; 953 } 954 955 if (ublk->ctrl_ops_in_progress > 0) { 956 assert(ublk->retry_poller == NULL); 957 ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US; 958 ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk, 959 UBLK_BUSY_POLLING_INTERVAL_US); 960 } else { 961 ublk_delete_dev(ublk); 962 } 963 } 964 965 static void 966 ublk_try_close_queue(struct ublk_queue *q) 967 { 968 struct spdk_ublk_dev *ublk = q->dev; 969 970 /* Close queue until no I/O is submitted to bdev in flight, 971 * no I/O is waiting to commit result, and all I/Os are aborted back. 972 */ 973 if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) { 974 /* wait for next retry */ 975 return; 976 } 977 978 TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq); 979 spdk_put_io_channel(q->bdev_ch); 980 q->bdev_ch = NULL; 981 982 spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk); 983 } 984 985 int 986 ublk_stop_disk(uint32_t ublk_id, ublk_del_cb del_cb, void *cb_arg) 987 { 988 struct spdk_ublk_dev *ublk; 989 990 assert(spdk_thread_is_app_thread(NULL)); 991 992 ublk = ublk_dev_find_by_id(ublk_id); 993 if (ublk == NULL) { 994 SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id); 995 return -ENODEV; 996 } 997 if (ublk->is_closing) { 998 SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id); 999 return -EBUSY; 1000 } 1001 1002 ublk->del_cb = del_cb; 1003 ublk->cb_arg = cb_arg; 1004 return ublk_close_dev(ublk); 1005 } 1006 1007 static inline void 1008 ublk_mark_io_done(struct ublk_io *io, int res) 1009 { 1010 /* 1011 * mark io done by target, so that SPDK can commit its 1012 * result and fetch new request via io_uring command. 1013 */ 1014 io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ; 1015 io->result = res; 1016 io->need_data = false; 1017 } 1018 1019 static void 1020 ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1021 { 1022 struct ublk_io *io = cb_arg; 1023 struct ublk_queue *q = io->q; 1024 int res; 1025 1026 if (success) { 1027 res = io->result; 1028 } else { 1029 res = -EIO; 1030 } 1031 1032 ublk_mark_io_done(io, res); 1033 1034 SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n", 1035 q->q_id, io->tag, res); 1036 TAILQ_REMOVE(&q->inflight_io_list, io, tailq); 1037 TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq); 1038 1039 if (bdev_io != NULL) { 1040 spdk_bdev_free_io(bdev_io); 1041 } 1042 } 1043 1044 static void 1045 ublk_queue_user_copy(struct ublk_io *io, bool is_write) 1046 { 1047 struct ublk_queue *q = io->q; 1048 const struct ublksrv_io_desc *iod = io->iod; 1049 struct io_uring_sqe *sqe; 1050 uint64_t pos; 1051 uint32_t nbytes; 1052 1053 nbytes = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT); 1054 pos = ublk_user_copy_pos(q->q_id, io->tag); 1055 sqe = io_uring_get_sqe(&q->ring); 1056 assert(sqe); 1057 1058 if (is_write) { 1059 io_uring_prep_read(sqe, 0, io->payload, nbytes, pos); 1060 } else { 1061 io_uring_prep_write(sqe, 0, io->payload, nbytes, pos); 1062 } 1063 io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); 1064 io_uring_sqe_set_data64(sqe, build_user_data(io->tag, 0)); 1065 1066 io->user_copy = true; 1067 TAILQ_REMOVE(&q->inflight_io_list, io, tailq); 1068 TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq); 1069 } 1070 1071 static void 1072 ublk_user_copy_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1073 { 1074 struct ublk_io *io = cb_arg; 1075 1076 spdk_bdev_free_io(bdev_io); 1077 1078 if (success) { 1079 ublk_queue_user_copy(io, false); 1080 return; 1081 } 1082 /* READ IO Error */ 1083 ublk_io_done(NULL, false, cb_arg); 1084 } 1085 1086 static void 1087 ublk_resubmit_io(void *arg) 1088 { 1089 struct ublk_io *io = (struct ublk_io *)arg; 1090 1091 _ublk_submit_bdev_io(io->q, io); 1092 } 1093 1094 static void 1095 ublk_queue_io(struct ublk_io *io) 1096 { 1097 int rc; 1098 struct spdk_bdev *bdev = io->q->dev->bdev; 1099 struct ublk_queue *q = io->q; 1100 1101 io->bdev_io_wait.bdev = bdev; 1102 io->bdev_io_wait.cb_fn = ublk_resubmit_io; 1103 io->bdev_io_wait.cb_arg = io; 1104 1105 rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait); 1106 if (rc != 0) { 1107 SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc); 1108 ublk_io_done(NULL, false, io); 1109 } 1110 } 1111 1112 static void 1113 ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf) 1114 { 1115 struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf); 1116 1117 io->mpool_entry = buf; 1118 assert(io->payload == NULL); 1119 io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL); 1120 io->get_buf_cb(io); 1121 } 1122 1123 static void 1124 ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch, 1125 ublk_get_buf_cb get_buf_cb) 1126 { 1127 void *buf; 1128 1129 io->payload_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT); 1130 io->get_buf_cb = get_buf_cb; 1131 buf = spdk_iobuf_get(iobuf_ch, io->payload_size, &io->iobuf, ublk_io_get_buffer_cb); 1132 1133 if (buf != NULL) { 1134 ublk_io_get_buffer_cb(&io->iobuf, buf); 1135 } 1136 } 1137 1138 static void 1139 ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch) 1140 { 1141 if (io->payload) { 1142 spdk_iobuf_put(iobuf_ch, io->mpool_entry, io->payload_size); 1143 io->mpool_entry = NULL; 1144 io->payload = NULL; 1145 } 1146 } 1147 1148 static void 1149 _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io) 1150 { 1151 struct spdk_ublk_dev *ublk = q->dev; 1152 struct spdk_bdev_desc *desc = io->bdev_desc; 1153 struct spdk_io_channel *ch = io->bdev_ch; 1154 uint64_t offset_blocks, num_blocks; 1155 spdk_bdev_io_completion_cb read_cb; 1156 uint8_t ublk_op; 1157 int rc = 0; 1158 const struct ublksrv_io_desc *iod = io->iod; 1159 1160 ublk_op = ublksrv_get_op(iod); 1161 offset_blocks = iod->start_sector >> ublk->sector_per_block_shift; 1162 num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift; 1163 1164 switch (ublk_op) { 1165 case UBLK_IO_OP_READ: 1166 if (g_ublk_tgt.user_copy) { 1167 read_cb = ublk_user_copy_read_done; 1168 } else { 1169 read_cb = ublk_io_done; 1170 } 1171 rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, read_cb, io); 1172 break; 1173 case UBLK_IO_OP_WRITE: 1174 rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io); 1175 break; 1176 case UBLK_IO_OP_FLUSH: 1177 rc = spdk_bdev_flush_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io); 1178 break; 1179 case UBLK_IO_OP_DISCARD: 1180 rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io); 1181 break; 1182 case UBLK_IO_OP_WRITE_ZEROES: 1183 rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io); 1184 break; 1185 default: 1186 rc = -1; 1187 } 1188 1189 if (rc < 0) { 1190 if (rc == -ENOMEM) { 1191 SPDK_INFOLOG(ublk, "No memory, start to queue io.\n"); 1192 ublk_queue_io(io); 1193 } else { 1194 SPDK_ERRLOG("ublk io failed in ublk_queue_io, rc=%d.\n", rc); 1195 ublk_io_done(NULL, false, io); 1196 } 1197 } 1198 } 1199 1200 static void 1201 read_get_buffer_done(struct ublk_io *io) 1202 { 1203 _ublk_submit_bdev_io(io->q, io); 1204 } 1205 1206 static void 1207 user_copy_write_get_buffer_done(struct ublk_io *io) 1208 { 1209 ublk_queue_user_copy(io, true); 1210 } 1211 1212 static void 1213 ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io) 1214 { 1215 struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch; 1216 const struct ublksrv_io_desc *iod = io->iod; 1217 uint8_t ublk_op; 1218 1219 io->result = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT); 1220 ublk_op = ublksrv_get_op(iod); 1221 switch (ublk_op) { 1222 case UBLK_IO_OP_READ: 1223 ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done); 1224 break; 1225 case UBLK_IO_OP_WRITE: 1226 if (g_ublk_tgt.user_copy) { 1227 ublk_io_get_buffer(io, iobuf_ch, user_copy_write_get_buffer_done); 1228 } else { 1229 _ublk_submit_bdev_io(q, io); 1230 } 1231 break; 1232 default: 1233 _ublk_submit_bdev_io(q, io); 1234 break; 1235 } 1236 } 1237 1238 static inline void 1239 ublksrv_queue_io_cmd(struct ublk_queue *q, 1240 struct ublk_io *io, unsigned tag) 1241 { 1242 struct ublksrv_io_cmd *cmd; 1243 struct io_uring_sqe *sqe; 1244 unsigned int cmd_op = 0;; 1245 uint64_t user_data; 1246 1247 /* each io should have operation of fetching or committing */ 1248 assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) || 1249 (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ)); 1250 cmd_op = io->cmd_op; 1251 1252 sqe = io_uring_get_sqe(&q->ring); 1253 assert(sqe); 1254 1255 cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe); 1256 if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) { 1257 cmd->result = io->result; 1258 } 1259 1260 /* These fields should be written once, never change */ 1261 ublk_set_sqe_cmd_op(sqe, cmd_op); 1262 /* dev->cdev_fd */ 1263 sqe->fd = 0; 1264 sqe->opcode = IORING_OP_URING_CMD; 1265 sqe->flags = IOSQE_FIXED_FILE; 1266 sqe->rw_flags = 0; 1267 cmd->tag = tag; 1268 cmd->addr = g_ublk_tgt.user_copy ? 0 : (__u64)(uintptr_t)(io->payload); 1269 cmd->q_id = q->q_id; 1270 1271 user_data = build_user_data(tag, cmd_op); 1272 io_uring_sqe_set_data64(sqe, user_data); 1273 1274 io->cmd_op = 0; 1275 1276 SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n", 1277 q->q_id, tag, cmd_op, 1278 io->cmd_op, q->is_stopping); 1279 } 1280 1281 static int 1282 ublk_io_xmit(struct ublk_queue *q) 1283 { 1284 TAILQ_HEAD(, ublk_io) buffer_free_list; 1285 struct spdk_iobuf_channel *iobuf_ch; 1286 int rc = 0, count = 0; 1287 struct ublk_io *io; 1288 1289 if (TAILQ_EMPTY(&q->completed_io_list)) { 1290 return 0; 1291 } 1292 1293 TAILQ_INIT(&buffer_free_list); 1294 while (!TAILQ_EMPTY(&q->completed_io_list)) { 1295 io = TAILQ_FIRST(&q->completed_io_list); 1296 assert(io != NULL); 1297 /* 1298 * Remove IO from list now assuming it will be completed. It will be inserted 1299 * back to the head if it cannot be completed. This approach is specifically 1300 * taken to work around a scan-build use-after-free mischaracterization. 1301 */ 1302 TAILQ_REMOVE(&q->completed_io_list, io, tailq); 1303 if (!io->user_copy) { 1304 if (!io->need_data) { 1305 TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq); 1306 } 1307 ublksrv_queue_io_cmd(q, io, io->tag); 1308 } 1309 count++; 1310 } 1311 1312 q->cmd_inflight += count; 1313 rc = io_uring_submit(&q->ring); 1314 if (rc != count) { 1315 SPDK_ERRLOG("could not submit all commands\n"); 1316 assert(false); 1317 } 1318 1319 /* Note: for READ io, ublk will always copy the data out of 1320 * the buffers in the io_uring_submit context. Since we 1321 * are not using SQPOLL for IO rings, we can safely free 1322 * those IO buffers here. This design doesn't seem ideal, 1323 * but it's what's possible since there is no discrete 1324 * COMMIT_REQ operation. That will need to change in the 1325 * future should we ever want to support async copy 1326 * operations. 1327 */ 1328 iobuf_ch = &q->poll_group->iobuf_ch; 1329 while (!TAILQ_EMPTY(&buffer_free_list)) { 1330 io = TAILQ_FIRST(&buffer_free_list); 1331 TAILQ_REMOVE(&buffer_free_list, io, tailq); 1332 ublk_io_put_buffer(io, iobuf_ch); 1333 } 1334 return rc; 1335 } 1336 1337 static void 1338 write_get_buffer_done(struct ublk_io *io) 1339 { 1340 io->need_data = true; 1341 io->cmd_op = UBLK_IO_NEED_GET_DATA; 1342 io->result = 0; 1343 1344 TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq); 1345 TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq); 1346 } 1347 1348 static int 1349 ublk_io_recv(struct ublk_queue *q) 1350 { 1351 struct io_uring_cqe *cqe; 1352 unsigned head, tag; 1353 int fetch, count = 0; 1354 struct ublk_io *io; 1355 struct spdk_iobuf_channel *iobuf_ch; 1356 1357 if (q->cmd_inflight == 0) { 1358 return 0; 1359 } 1360 1361 iobuf_ch = &q->poll_group->iobuf_ch; 1362 io_uring_for_each_cqe(&q->ring, head, cqe) { 1363 tag = user_data_to_tag(cqe->user_data); 1364 io = &q->ios[tag]; 1365 1366 SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u, user copy %u, cmd_op %u\n", 1367 cqe->res, q->q_id, tag, io->user_copy, user_data_to_op(cqe->user_data)); 1368 1369 q->cmd_inflight--; 1370 TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq); 1371 1372 if (!io->user_copy) { 1373 fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping; 1374 if (!fetch) { 1375 q->is_stopping = true; 1376 if (io->cmd_op == UBLK_IO_FETCH_REQ) { 1377 io->cmd_op = 0; 1378 } 1379 } 1380 1381 if (cqe->res == UBLK_IO_RES_OK) { 1382 ublk_submit_bdev_io(q, io); 1383 } else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) { 1384 ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done); 1385 } else { 1386 if (cqe->res != UBLK_IO_RES_ABORT) { 1387 SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n", 1388 cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data)); 1389 } 1390 TAILQ_REMOVE(&q->inflight_io_list, io, tailq); 1391 } 1392 } else { 1393 1394 /* clear `user_copy` for next use of this IO structure */ 1395 io->user_copy = false; 1396 1397 assert((ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) || 1398 (ublksrv_get_op(io->iod) == UBLK_IO_OP_WRITE)); 1399 if (cqe->res != io->result) { 1400 /* EIO */ 1401 ublk_io_done(NULL, false, io); 1402 } else { 1403 if (ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) { 1404 /* bdev_io is already freed in first READ cycle */ 1405 ublk_io_done(NULL, true, io); 1406 } else { 1407 _ublk_submit_bdev_io(q, io); 1408 } 1409 } 1410 } 1411 count += 1; 1412 if (count == UBLK_QUEUE_REQUEST) { 1413 break; 1414 } 1415 } 1416 io_uring_cq_advance(&q->ring, count); 1417 1418 return count; 1419 } 1420 1421 static int 1422 ublk_poll(void *arg) 1423 { 1424 struct ublk_poll_group *poll_group = arg; 1425 struct ublk_queue *q, *q_tmp; 1426 int sent, received, count = 0; 1427 1428 TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) { 1429 sent = ublk_io_xmit(q); 1430 received = ublk_io_recv(q); 1431 if (spdk_unlikely(q->is_stopping)) { 1432 ublk_try_close_queue(q); 1433 } 1434 count += sent + received; 1435 } 1436 if (count > 0) { 1437 return SPDK_POLLER_BUSY; 1438 } else { 1439 return SPDK_POLLER_IDLE; 1440 } 1441 } 1442 1443 static void 1444 ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk) 1445 { 1446 ublk_close_dev(ublk); 1447 } 1448 1449 static void 1450 ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, 1451 void *event_ctx) 1452 { 1453 switch (type) { 1454 case SPDK_BDEV_EVENT_REMOVE: 1455 ublk_bdev_hot_remove(event_ctx); 1456 break; 1457 default: 1458 SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type); 1459 break; 1460 } 1461 } 1462 1463 static void 1464 ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth) 1465 { 1466 struct io_uring_sqe *sqe; 1467 uint32_t i; 1468 1469 for (i = 0; i < q_depth; i++) { 1470 sqe = ublk_uring_get_sqe(r, i); 1471 1472 /* These fields should be written once, never change */ 1473 sqe->flags = IOSQE_FIXED_FILE; 1474 sqe->rw_flags = 0; 1475 sqe->ioprio = 0; 1476 sqe->off = 0; 1477 } 1478 } 1479 1480 static int 1481 ublk_dev_queue_init(struct ublk_queue *q) 1482 { 1483 int rc = 0, cmd_buf_size; 1484 uint32_t j; 1485 struct spdk_ublk_dev *ublk = q->dev; 1486 unsigned long off; 1487 1488 cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth); 1489 off = UBLKSRV_CMD_BUF_OFFSET + 1490 q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc)); 1491 q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ, 1492 MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off); 1493 if (q->io_cmd_buf == MAP_FAILED) { 1494 q->io_cmd_buf = NULL; 1495 rc = -errno; 1496 SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc)); 1497 goto err; 1498 } 1499 1500 for (j = 0; j < q->q_depth; j++) { 1501 q->ios[j].cmd_op = UBLK_IO_FETCH_REQ; 1502 q->ios[j].iod = &q->io_cmd_buf[j]; 1503 } 1504 1505 rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128); 1506 if (rc < 0) { 1507 SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc)); 1508 munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth)); 1509 q->io_cmd_buf = NULL; 1510 goto err; 1511 } 1512 1513 rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1); 1514 if (rc != 0) { 1515 SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc)); 1516 io_uring_queue_exit(&q->ring); 1517 q->ring.ring_fd = -1; 1518 munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth)); 1519 q->io_cmd_buf = NULL; 1520 goto err; 1521 } 1522 1523 ublk_dev_init_io_cmds(&q->ring, q->q_depth); 1524 1525 err: 1526 return rc; 1527 } 1528 1529 static void 1530 ublk_dev_queue_fini(struct ublk_queue *q) 1531 { 1532 if (q->ring.ring_fd >= 0) { 1533 io_uring_unregister_files(&q->ring); 1534 io_uring_queue_exit(&q->ring); 1535 q->ring.ring_fd = -1; 1536 } 1537 if (q->io_cmd_buf) { 1538 munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth)); 1539 } 1540 } 1541 1542 static void 1543 ublk_dev_queue_io_init(struct ublk_queue *q) 1544 { 1545 struct ublk_io *io; 1546 uint32_t i; 1547 int rc __attribute__((unused)); 1548 void *buf; 1549 1550 /* Some older kernels require a buffer to get posted, even 1551 * when NEED_GET_DATA has been specified. So allocate a 1552 * temporary buffer, only for purposes of this workaround. 1553 * It never actually gets used, so we will free it immediately 1554 * after all of the commands are posted. 1555 */ 1556 buf = malloc(64); 1557 1558 assert(q->bdev_ch != NULL); 1559 1560 /* Initialize and submit all io commands to ublk driver */ 1561 for (i = 0; i < q->q_depth; i++) { 1562 io = &q->ios[i]; 1563 io->tag = (uint16_t)i; 1564 io->payload = buf; 1565 io->bdev_ch = q->bdev_ch; 1566 io->bdev_desc = q->dev->bdev_desc; 1567 ublksrv_queue_io_cmd(q, io, i); 1568 } 1569 1570 q->cmd_inflight += q->q_depth; 1571 rc = io_uring_submit(&q->ring); 1572 assert(rc == (int)q->q_depth); 1573 for (i = 0; i < q->q_depth; i++) { 1574 io = &q->ios[i]; 1575 io->payload = NULL; 1576 } 1577 free(buf); 1578 } 1579 1580 static void 1581 ublk_set_params(struct spdk_ublk_dev *ublk) 1582 { 1583 int rc; 1584 1585 ublk->dev_params.len = sizeof(struct ublk_params); 1586 rc = ublk_ctrl_cmd(ublk, UBLK_CMD_SET_PARAMS); 1587 if (rc < 0) { 1588 SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc)); 1589 ublk_delete_dev(ublk); 1590 if (ublk->start_cb) { 1591 ublk->start_cb(ublk->cb_arg, rc); 1592 ublk->start_cb = NULL; 1593 } 1594 } 1595 } 1596 1597 /* Set ublk device parameters based on bdev */ 1598 static void 1599 ublk_info_param_init(struct spdk_ublk_dev *ublk) 1600 { 1601 struct spdk_bdev *bdev = ublk->bdev; 1602 uint32_t blk_size = spdk_bdev_get_data_block_size(bdev); 1603 uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev); 1604 uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev); 1605 uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev); 1606 uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT; 1607 uint32_t io_min_size = blk_size; 1608 uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size); 1609 1610 struct ublksrv_ctrl_dev_info uinfo = { 1611 .queue_depth = ublk->queue_depth, 1612 .nr_hw_queues = ublk->num_queues, 1613 .dev_id = ublk->ublk_id, 1614 .max_io_buf_bytes = UBLK_IO_MAX_BYTES, 1615 .ublksrv_pid = getpid(), 1616 .flags = UBLK_F_URING_CMD_COMP_IN_TASK, 1617 }; 1618 struct ublk_params uparams = { 1619 .types = UBLK_PARAM_TYPE_BASIC, 1620 .basic = { 1621 .logical_bs_shift = spdk_u32log2(blk_size), 1622 .physical_bs_shift = spdk_u32log2(pblk_size), 1623 .io_min_shift = spdk_u32log2(io_min_size), 1624 .io_opt_shift = spdk_u32log2(io_opt_size), 1625 .dev_sectors = num_blocks * sectors_per_block, 1626 .max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT, 1627 } 1628 }; 1629 1630 if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) { 1631 uparams.types |= UBLK_PARAM_TYPE_DISCARD; 1632 uparams.discard.discard_alignment = sectors_per_block; 1633 uparams.discard.max_discard_sectors = num_blocks * sectors_per_block; 1634 uparams.discard.max_discard_segments = 1; 1635 uparams.discard.discard_granularity = blk_size; 1636 if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 1637 uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block; 1638 } 1639 } 1640 1641 if (g_ublk_tgt.user_copy) { 1642 uinfo.flags |= UBLK_F_USER_COPY; 1643 } else { 1644 uinfo.flags |= UBLK_F_NEED_GET_DATA; 1645 } 1646 1647 ublk->dev_info = uinfo; 1648 ublk->dev_params = uparams; 1649 } 1650 1651 static void 1652 _ublk_free_dev(void *arg) 1653 { 1654 struct spdk_ublk_dev *ublk = arg; 1655 1656 ublk_free_dev(ublk); 1657 } 1658 1659 static void 1660 free_buffers(void *arg) 1661 { 1662 struct ublk_queue *q = arg; 1663 uint32_t i; 1664 1665 for (i = 0; i < q->q_depth; i++) { 1666 ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch); 1667 } 1668 free(q->ios); 1669 q->ios = NULL; 1670 spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev); 1671 } 1672 1673 static void 1674 ublk_free_dev(struct spdk_ublk_dev *ublk) 1675 { 1676 struct ublk_queue *q; 1677 uint32_t q_idx; 1678 1679 for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) { 1680 q = &ublk->queues[q_idx]; 1681 1682 /* The ublk_io of this queue are not initialized. */ 1683 if (q->ios == NULL) { 1684 continue; 1685 } 1686 1687 /* We found a queue that has an ios array that may have buffers 1688 * that need to be freed. Send a message to the queue's thread 1689 * so it can free the buffers back to that thread's iobuf channel. 1690 * When it's done, it will set q->ios to NULL and send a message 1691 * back to this function to continue. 1692 */ 1693 if (q->poll_group) { 1694 spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q); 1695 return; 1696 } else { 1697 free(q->ios); 1698 q->ios = NULL; 1699 } 1700 } 1701 1702 /* All of the buffers associated with the queues have been freed, so now 1703 * continue with releasing resources for the rest of the ublk device. 1704 */ 1705 if (ublk->bdev_desc) { 1706 spdk_bdev_close(ublk->bdev_desc); 1707 ublk->bdev_desc = NULL; 1708 } 1709 1710 ublk_dev_list_unregister(ublk); 1711 1712 if (ublk->del_cb) { 1713 ublk->del_cb(ublk->cb_arg); 1714 } 1715 SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id); 1716 free(ublk); 1717 } 1718 1719 static int 1720 ublk_ios_init(struct spdk_ublk_dev *ublk) 1721 { 1722 int rc; 1723 uint32_t i, j; 1724 struct ublk_queue *q; 1725 1726 for (i = 0; i < ublk->num_queues; i++) { 1727 q = &ublk->queues[i]; 1728 1729 TAILQ_INIT(&q->completed_io_list); 1730 TAILQ_INIT(&q->inflight_io_list); 1731 q->dev = ublk; 1732 q->q_id = i; 1733 q->q_depth = ublk->queue_depth; 1734 q->ios = calloc(q->q_depth, sizeof(struct ublk_io)); 1735 if (!q->ios) { 1736 rc = -ENOMEM; 1737 SPDK_ERRLOG("could not allocate queue ios\n"); 1738 goto err; 1739 } 1740 for (j = 0; j < q->q_depth; j++) { 1741 q->ios[j].q = q; 1742 } 1743 } 1744 1745 return 0; 1746 1747 err: 1748 for (i = 0; i < ublk->num_queues; i++) { 1749 free(q->ios); 1750 q->ios = NULL; 1751 } 1752 return rc; 1753 } 1754 1755 static void 1756 ublk_queue_run(void *arg1) 1757 { 1758 struct ublk_queue *q = arg1; 1759 struct spdk_ublk_dev *ublk = q->dev; 1760 struct ublk_poll_group *poll_group = q->poll_group; 1761 1762 assert(spdk_get_thread() == poll_group->ublk_thread); 1763 q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc); 1764 /* Queues must be filled with IO in the io pthread */ 1765 ublk_dev_queue_io_init(q); 1766 1767 TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq); 1768 } 1769 1770 int 1771 ublk_start_disk(const char *bdev_name, uint32_t ublk_id, 1772 uint32_t num_queues, uint32_t queue_depth, 1773 ublk_start_cb start_cb, void *cb_arg) 1774 { 1775 int rc; 1776 uint32_t i; 1777 struct spdk_bdev *bdev; 1778 struct spdk_ublk_dev *ublk = NULL; 1779 uint32_t sector_per_block; 1780 1781 assert(spdk_thread_is_app_thread(NULL)); 1782 1783 if (g_ublk_tgt.active == false) { 1784 SPDK_ERRLOG("NO ublk target exist\n"); 1785 return -ENODEV; 1786 } 1787 1788 ublk = ublk_dev_find_by_id(ublk_id); 1789 if (ublk != NULL) { 1790 SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id); 1791 return -EBUSY; 1792 } 1793 1794 if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) { 1795 SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max); 1796 return -ENOTSUP; 1797 } 1798 1799 ublk = calloc(1, sizeof(*ublk)); 1800 if (ublk == NULL) { 1801 return -ENOMEM; 1802 } 1803 ublk->start_cb = start_cb; 1804 ublk->cb_arg = cb_arg; 1805 ublk->cdev_fd = -1; 1806 ublk->ublk_id = ublk_id; 1807 UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n", 1808 bdev_name, num_queues, queue_depth); 1809 1810 rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc); 1811 if (rc != 0) { 1812 SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc); 1813 free(ublk); 1814 return rc; 1815 } 1816 1817 bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc); 1818 ublk->bdev = bdev; 1819 sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT; 1820 ublk->sector_per_block_shift = spdk_u32log2(sector_per_block); 1821 1822 ublk->queues_closed = 0; 1823 ublk->num_queues = num_queues; 1824 ublk->queue_depth = queue_depth; 1825 if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) { 1826 SPDK_WARNLOG("Set Queue depth %d of UBLK %d to maximum %d\n", 1827 ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH); 1828 ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH; 1829 } 1830 if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) { 1831 SPDK_WARNLOG("Set Queue num %d of UBLK %d to maximum %d\n", 1832 ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES); 1833 ublk->num_queues = UBLK_DEV_MAX_QUEUES; 1834 } 1835 for (i = 0; i < ublk->num_queues; i++) { 1836 ublk->queues[i].ring.ring_fd = -1; 1837 } 1838 1839 ublk_info_param_init(ublk); 1840 rc = ublk_ios_init(ublk); 1841 if (rc != 0) { 1842 spdk_bdev_close(ublk->bdev_desc); 1843 free(ublk); 1844 return rc; 1845 } 1846 1847 SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n", 1848 bdev_name, ublk_id); 1849 1850 /* Add ublk_dev to the end of disk list */ 1851 ublk_dev_list_register(ublk); 1852 rc = ublk_ctrl_cmd(ublk, UBLK_CMD_ADD_DEV); 1853 if (rc < 0) { 1854 SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc)); 1855 ublk_free_dev(ublk); 1856 } 1857 1858 return rc; 1859 } 1860 1861 static void 1862 ublk_finish_start(struct spdk_ublk_dev *ublk) 1863 { 1864 int rc; 1865 uint32_t q_id; 1866 struct spdk_thread *ublk_thread; 1867 char buf[64]; 1868 1869 snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id); 1870 ublk->cdev_fd = open(buf, O_RDWR); 1871 if (ublk->cdev_fd < 0) { 1872 rc = ublk->cdev_fd; 1873 SPDK_ERRLOG("can't open %s, rc %d\n", buf, rc); 1874 goto err; 1875 } 1876 1877 for (q_id = 0; q_id < ublk->num_queues; q_id++) { 1878 rc = ublk_dev_queue_init(&ublk->queues[q_id]); 1879 if (rc) { 1880 goto err; 1881 } 1882 } 1883 1884 rc = ublk_ctrl_cmd(ublk, UBLK_CMD_START_DEV); 1885 if (rc < 0) { 1886 SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id, 1887 spdk_strerror(-rc)); 1888 goto err; 1889 } 1890 1891 /* Send queue to different spdk_threads for load balance */ 1892 for (q_id = 0; q_id < ublk->num_queues; q_id++) { 1893 ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_groups[g_next_ublk_poll_group]; 1894 ublk_thread = g_ublk_tgt.poll_groups[g_next_ublk_poll_group].ublk_thread; 1895 spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]); 1896 g_next_ublk_poll_group++; 1897 if (g_next_ublk_poll_group == g_num_ublk_poll_groups) { 1898 g_next_ublk_poll_group = 0; 1899 } 1900 } 1901 1902 goto out; 1903 1904 err: 1905 ublk_delete_dev(ublk); 1906 out: 1907 if (rc < 0 && ublk->start_cb) { 1908 ublk->start_cb(ublk->cb_arg, rc); 1909 ublk->start_cb = NULL; 1910 } 1911 } 1912 1913 SPDK_LOG_REGISTER_COMPONENT(ublk) 1914 SPDK_LOG_REGISTER_COMPONENT(ublk_io) 1915