1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2022 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include <liburing.h> 7 8 #include "spdk/stdinc.h" 9 #include "spdk/string.h" 10 #include "spdk/bdev.h" 11 #include "spdk/endian.h" 12 #include "spdk/env.h" 13 #include "spdk/likely.h" 14 #include "spdk/log.h" 15 #include "spdk/util.h" 16 #include "spdk/queue.h" 17 #include "spdk/json.h" 18 #include "spdk/ublk.h" 19 #include "spdk/thread.h" 20 21 #include "ublk_internal.h" 22 23 #define UBLK_CTRL_DEV "/dev/ublk-control" 24 #define UBLK_BLK_CDEV "/dev/ublkc" 25 26 #define LINUX_SECTOR_SHIFT 9 27 #define UBLK_IO_MAX_BYTES SPDK_BDEV_LARGE_BUF_MAX_SIZE 28 #define UBLK_DEV_MAX_QUEUES 32 29 #define UBLK_DEV_MAX_QUEUE_DEPTH 1024 30 #define UBLK_QUEUE_REQUEST 32 31 #define UBLK_STOP_BUSY_WAITING_MS 10000 32 #define UBLK_BUSY_POLLING_INTERVAL_US 20000 33 #define UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US 1000 34 /* By default, kernel ublk_drv driver can support up to 64 block devices */ 35 #define UBLK_DEFAULT_MAX_SUPPORTED_DEVS 64 36 37 #define UBLK_IOBUF_SMALL_CACHE_SIZE 128 38 #define UBLK_IOBUF_LARGE_CACHE_SIZE 32 39 40 #define UBLK_DEBUGLOG(ublk, format, ...) \ 41 SPDK_DEBUGLOG(ublk, "ublk%d: " format, ublk->ublk_id, ##__VA_ARGS__); 42 43 static uint32_t g_num_ublk_poll_groups = 0; 44 static uint32_t g_next_ublk_poll_group = 0; 45 static uint32_t g_ublks_max = UBLK_DEFAULT_MAX_SUPPORTED_DEVS; 46 static struct spdk_cpuset g_core_mask; 47 48 struct ublk_queue; 49 struct ublk_poll_group; 50 struct ublk_io; 51 static void _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io); 52 static void ublk_dev_queue_fini(struct ublk_queue *q); 53 static int ublk_poll(void *arg); 54 55 static int ublk_set_params(struct spdk_ublk_dev *ublk); 56 static int ublk_finish_start(struct spdk_ublk_dev *ublk); 57 static void ublk_free_dev(struct spdk_ublk_dev *ublk); 58 static void ublk_delete_dev(void *arg); 59 static int ublk_close_dev(struct spdk_ublk_dev *ublk); 60 61 static const char *ublk_op_name[64] 62 __attribute__((unused)) = { 63 [UBLK_CMD_ADD_DEV] = "UBLK_CMD_ADD_DEV", 64 [UBLK_CMD_DEL_DEV] = "UBLK_CMD_DEL_DEV", 65 [UBLK_CMD_START_DEV] = "UBLK_CMD_START_DEV", 66 [UBLK_CMD_STOP_DEV] = "UBLK_CMD_STOP_DEV", 67 [UBLK_CMD_SET_PARAMS] = "UBLK_CMD_SET_PARAMS", 68 }; 69 70 typedef void (*ublk_get_buf_cb)(struct ublk_io *io); 71 72 struct ublk_io { 73 void *payload; 74 void *mpool_entry; 75 bool need_data; 76 bool user_copy; 77 uint16_t tag; 78 uint64_t payload_size; 79 uint32_t cmd_op; 80 int32_t result; 81 struct spdk_bdev_desc *bdev_desc; 82 struct spdk_io_channel *bdev_ch; 83 const struct ublksrv_io_desc *iod; 84 ublk_get_buf_cb get_buf_cb; 85 struct ublk_queue *q; 86 /* for bdev io_wait */ 87 struct spdk_bdev_io_wait_entry bdev_io_wait; 88 struct spdk_iobuf_entry iobuf; 89 90 TAILQ_ENTRY(ublk_io) tailq; 91 }; 92 93 struct ublk_queue { 94 uint32_t q_id; 95 uint32_t q_depth; 96 struct ublk_io *ios; 97 TAILQ_HEAD(, ublk_io) completed_io_list; 98 TAILQ_HEAD(, ublk_io) inflight_io_list; 99 uint32_t cmd_inflight; 100 bool is_stopping; 101 struct ublksrv_io_desc *io_cmd_buf; 102 /* ring depth == dev_info->queue_depth. */ 103 struct io_uring ring; 104 struct spdk_ublk_dev *dev; 105 struct ublk_poll_group *poll_group; 106 struct spdk_io_channel *bdev_ch; 107 108 TAILQ_ENTRY(ublk_queue) tailq; 109 }; 110 111 struct spdk_ublk_dev { 112 struct spdk_bdev *bdev; 113 struct spdk_bdev_desc *bdev_desc; 114 115 int cdev_fd; 116 struct ublk_params dev_params; 117 struct ublksrv_ctrl_dev_info dev_info; 118 119 uint32_t ublk_id; 120 uint32_t num_queues; 121 uint32_t queue_depth; 122 uint32_t sector_per_block_shift; 123 struct ublk_queue queues[UBLK_DEV_MAX_QUEUES]; 124 125 struct spdk_poller *retry_poller; 126 int retry_count; 127 uint32_t queues_closed; 128 ublk_ctrl_cb ctrl_cb; 129 void *cb_arg; 130 uint32_t current_cmd_op; 131 uint32_t ctrl_ops_in_progress; 132 bool is_closing; 133 134 TAILQ_ENTRY(spdk_ublk_dev) tailq; 135 TAILQ_ENTRY(spdk_ublk_dev) wait_tailq; 136 }; 137 138 struct ublk_poll_group { 139 struct spdk_thread *ublk_thread; 140 struct spdk_poller *ublk_poller; 141 struct spdk_iobuf_channel iobuf_ch; 142 TAILQ_HEAD(, ublk_queue) queue_list; 143 }; 144 145 struct ublk_tgt { 146 int ctrl_fd; 147 bool active; 148 bool is_destroying; 149 spdk_ublk_fini_cb cb_fn; 150 void *cb_arg; 151 struct io_uring ctrl_ring; 152 struct spdk_poller *ctrl_poller; 153 uint32_t ctrl_ops_in_progress; 154 struct ublk_poll_group *poll_groups; 155 uint32_t num_ublk_devs; 156 uint64_t features; 157 /* `ublk_drv` supports UBLK_F_CMD_IOCTL_ENCODE */ 158 bool ioctl_encode; 159 /* `ublk_drv` supports UBLK_F_USER_COPY */ 160 bool user_copy; 161 }; 162 163 static TAILQ_HEAD(, spdk_ublk_dev) g_ublk_devs = TAILQ_HEAD_INITIALIZER(g_ublk_devs); 164 static struct ublk_tgt g_ublk_tgt; 165 166 /* helpers for using io_uring */ 167 static inline int 168 ublk_setup_ring(uint32_t depth, struct io_uring *r, unsigned flags) 169 { 170 struct io_uring_params p = {}; 171 172 p.flags = flags | IORING_SETUP_CQSIZE; 173 p.cq_entries = depth; 174 175 return io_uring_queue_init_params(depth, r, &p); 176 } 177 178 static inline struct io_uring_sqe * 179 ublk_uring_get_sqe(struct io_uring *r, uint32_t idx) 180 { 181 /* Need to update the idx since we set IORING_SETUP_SQE128 parameter in ublk_setup_ring */ 182 return &r->sq.sqes[idx << 1]; 183 } 184 185 static inline void * 186 ublk_get_sqe_cmd(struct io_uring_sqe *sqe) 187 { 188 return (void *)&sqe->addr3; 189 } 190 191 static inline void 192 ublk_set_sqe_cmd_op(struct io_uring_sqe *sqe, uint32_t cmd_op) 193 { 194 uint32_t opc = cmd_op; 195 196 if (g_ublk_tgt.ioctl_encode) { 197 switch (cmd_op) { 198 /* ctrl uring */ 199 case UBLK_CMD_GET_DEV_INFO: 200 opc = _IOR('u', UBLK_CMD_GET_DEV_INFO, struct ublksrv_ctrl_cmd); 201 break; 202 case UBLK_CMD_ADD_DEV: 203 opc = _IOWR('u', UBLK_CMD_ADD_DEV, struct ublksrv_ctrl_cmd); 204 break; 205 case UBLK_CMD_DEL_DEV: 206 opc = _IOWR('u', UBLK_CMD_DEL_DEV, struct ublksrv_ctrl_cmd); 207 break; 208 case UBLK_CMD_START_DEV: 209 opc = _IOWR('u', UBLK_CMD_START_DEV, struct ublksrv_ctrl_cmd); 210 break; 211 case UBLK_CMD_STOP_DEV: 212 opc = _IOWR('u', UBLK_CMD_STOP_DEV, struct ublksrv_ctrl_cmd); 213 break; 214 case UBLK_CMD_SET_PARAMS: 215 opc = _IOWR('u', UBLK_CMD_SET_PARAMS, struct ublksrv_ctrl_cmd); 216 break; 217 218 /* io uring */ 219 case UBLK_IO_FETCH_REQ: 220 opc = _IOWR('u', UBLK_IO_FETCH_REQ, struct ublksrv_io_cmd); 221 break; 222 case UBLK_IO_COMMIT_AND_FETCH_REQ: 223 opc = _IOWR('u', UBLK_IO_COMMIT_AND_FETCH_REQ, struct ublksrv_io_cmd); 224 break; 225 case UBLK_IO_NEED_GET_DATA: 226 opc = _IOWR('u', UBLK_IO_NEED_GET_DATA, struct ublksrv_io_cmd); 227 break; 228 default: 229 break; 230 } 231 } 232 233 sqe->off = opc; 234 } 235 236 static inline uint64_t 237 build_user_data(uint16_t tag, uint8_t op) 238 { 239 assert(!(tag >> 16) && !(op >> 8)); 240 241 return tag | (op << 16); 242 } 243 244 static inline uint16_t 245 user_data_to_tag(uint64_t user_data) 246 { 247 return user_data & 0xffff; 248 } 249 250 static inline uint8_t 251 user_data_to_op(uint64_t user_data) 252 { 253 return (user_data >> 16) & 0xff; 254 } 255 256 static inline uint64_t 257 ublk_user_copy_pos(uint16_t q_id, uint16_t tag) 258 { 259 return (uint64_t)UBLKSRV_IO_BUF_OFFSET + ((((uint64_t)q_id) << UBLK_QID_OFF) | ((( 260 uint64_t)tag) << UBLK_TAG_OFF)); 261 } 262 263 void 264 spdk_ublk_init(void) 265 { 266 assert(spdk_thread_is_app_thread(NULL)); 267 268 g_ublk_tgt.ctrl_fd = -1; 269 g_ublk_tgt.ctrl_ring.ring_fd = -1; 270 } 271 272 static void 273 ublk_ctrl_cmd_error(struct spdk_ublk_dev *ublk, int32_t res) 274 { 275 assert(res != 0); 276 277 SPDK_ERRLOG("ctrlr cmd %s failed, %s\n", ublk_op_name[ublk->current_cmd_op], spdk_strerror(-res)); 278 if (ublk->ctrl_cb) { 279 ublk->ctrl_cb(ublk->cb_arg, res); 280 ublk->ctrl_cb = NULL; 281 } 282 283 switch (ublk->current_cmd_op) { 284 case UBLK_CMD_ADD_DEV: 285 case UBLK_CMD_SET_PARAMS: 286 ublk_delete_dev(ublk); 287 break; 288 case UBLK_CMD_START_DEV: 289 ublk_close_dev(ublk); 290 break; 291 case UBLK_CMD_STOP_DEV: 292 case UBLK_CMD_DEL_DEV: 293 break; 294 default: 295 SPDK_ERRLOG("No match cmd operation,cmd_op = %d\n", ublk->current_cmd_op); 296 break; 297 } 298 } 299 300 static void 301 ublk_ctrl_process_cqe(struct io_uring_cqe *cqe) 302 { 303 struct spdk_ublk_dev *ublk; 304 int rc = 0; 305 306 ublk = (struct spdk_ublk_dev *)cqe->user_data; 307 UBLK_DEBUGLOG(ublk, "ctrl cmd completed\n"); 308 ublk->ctrl_ops_in_progress--; 309 310 if (spdk_unlikely(cqe->res != 0)) { 311 SPDK_ERRLOG("ctrlr cmd failed\n"); 312 ublk_ctrl_cmd_error(ublk, cqe->res); 313 return; 314 } 315 316 switch (ublk->current_cmd_op) { 317 case UBLK_CMD_ADD_DEV: 318 rc = ublk_set_params(ublk); 319 if (rc < 0) { 320 ublk_delete_dev(ublk); 321 goto start_done; 322 } 323 break; 324 case UBLK_CMD_SET_PARAMS: 325 rc = ublk_finish_start(ublk); 326 if (rc < 0) { 327 ublk_delete_dev(ublk); 328 goto start_done; 329 } 330 break; 331 case UBLK_CMD_START_DEV: 332 goto start_done; 333 break; 334 case UBLK_CMD_STOP_DEV: 335 break; 336 case UBLK_CMD_DEL_DEV: 337 if (ublk->ctrl_cb) { 338 ublk->ctrl_cb(ublk->cb_arg, 0); 339 ublk->ctrl_cb = NULL; 340 } 341 ublk_free_dev(ublk); 342 break; 343 default: 344 SPDK_ERRLOG("No match cmd operation,cmd_op = %d\n", ublk->current_cmd_op); 345 break; 346 } 347 348 return; 349 350 start_done: 351 if (ublk->ctrl_cb) { 352 ublk->ctrl_cb(ublk->cb_arg, rc); 353 ublk->ctrl_cb = NULL; 354 } 355 } 356 357 static int 358 ublk_ctrl_poller(void *arg) 359 { 360 struct io_uring *ring = &g_ublk_tgt.ctrl_ring; 361 struct io_uring_cqe *cqe; 362 const int max = 8; 363 int i, count = 0, rc; 364 365 if (!g_ublk_tgt.ctrl_ops_in_progress) { 366 return SPDK_POLLER_IDLE; 367 } 368 369 for (i = 0; i < max; i++) { 370 rc = io_uring_peek_cqe(ring, &cqe); 371 if (rc == -EAGAIN) { 372 break; 373 } 374 375 assert(cqe != NULL); 376 g_ublk_tgt.ctrl_ops_in_progress--; 377 378 ublk_ctrl_process_cqe(cqe); 379 380 io_uring_cqe_seen(ring, cqe); 381 count++; 382 } 383 384 return count > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE; 385 } 386 387 static int 388 ublk_ctrl_cmd_submit(struct spdk_ublk_dev *ublk, uint32_t cmd_op) 389 { 390 uint32_t dev_id = ublk->ublk_id; 391 int rc = -EINVAL; 392 struct io_uring_sqe *sqe; 393 struct ublksrv_ctrl_cmd *cmd; 394 395 UBLK_DEBUGLOG(ublk, "ctrl cmd %s\n", ublk_op_name[cmd_op]); 396 397 sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring); 398 if (!sqe) { 399 SPDK_ERRLOG("No available sqe in ctrl ring\n"); 400 assert(false); 401 return -ENOENT; 402 } 403 404 cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe); 405 sqe->fd = g_ublk_tgt.ctrl_fd; 406 sqe->opcode = IORING_OP_URING_CMD; 407 sqe->ioprio = 0; 408 cmd->dev_id = dev_id; 409 cmd->queue_id = -1; 410 ublk->current_cmd_op = cmd_op; 411 412 switch (cmd_op) { 413 case UBLK_CMD_ADD_DEV: 414 cmd->addr = (__u64)(uintptr_t)&ublk->dev_info; 415 cmd->len = sizeof(ublk->dev_info); 416 break; 417 case UBLK_CMD_SET_PARAMS: 418 cmd->addr = (__u64)(uintptr_t)&ublk->dev_params; 419 cmd->len = sizeof(ublk->dev_params); 420 break; 421 case UBLK_CMD_START_DEV: 422 cmd->data[0] = getpid(); 423 break; 424 case UBLK_CMD_STOP_DEV: 425 break; 426 case UBLK_CMD_DEL_DEV: 427 break; 428 default: 429 SPDK_ERRLOG("No match cmd operation,cmd_op = %d\n", cmd_op); 430 return -EINVAL; 431 } 432 ublk_set_sqe_cmd_op(sqe, cmd_op); 433 io_uring_sqe_set_data(sqe, ublk); 434 435 rc = io_uring_submit(&g_ublk_tgt.ctrl_ring); 436 if (rc < 0) { 437 SPDK_ERRLOG("uring submit rc %d\n", rc); 438 assert(false); 439 return rc; 440 } 441 g_ublk_tgt.ctrl_ops_in_progress++; 442 ublk->ctrl_ops_in_progress++; 443 444 return 0; 445 } 446 447 static int 448 ublk_ctrl_cmd_get_features(void) 449 { 450 int rc; 451 struct io_uring_sqe *sqe; 452 struct io_uring_cqe *cqe; 453 struct ublksrv_ctrl_cmd *cmd; 454 uint32_t cmd_op; 455 456 sqe = io_uring_get_sqe(&g_ublk_tgt.ctrl_ring); 457 if (!sqe) { 458 SPDK_ERRLOG("No available sqe in ctrl ring\n"); 459 assert(false); 460 return -ENOENT; 461 } 462 463 cmd = (struct ublksrv_ctrl_cmd *)ublk_get_sqe_cmd(sqe); 464 sqe->fd = g_ublk_tgt.ctrl_fd; 465 sqe->opcode = IORING_OP_URING_CMD; 466 sqe->ioprio = 0; 467 cmd->dev_id = -1; 468 cmd->queue_id = -1; 469 cmd->addr = (__u64)(uintptr_t)&g_ublk_tgt.features; 470 cmd->len = sizeof(g_ublk_tgt.features); 471 472 cmd_op = UBLK_U_CMD_GET_FEATURES; 473 ublk_set_sqe_cmd_op(sqe, cmd_op); 474 475 rc = io_uring_submit(&g_ublk_tgt.ctrl_ring); 476 if (rc < 0) { 477 SPDK_ERRLOG("uring submit rc %d\n", rc); 478 return rc; 479 } 480 481 rc = io_uring_wait_cqe(&g_ublk_tgt.ctrl_ring, &cqe); 482 if (rc < 0) { 483 SPDK_ERRLOG("wait cqe rc %d\n", rc); 484 return rc; 485 } 486 487 if (cqe->res == 0) { 488 g_ublk_tgt.ioctl_encode = !!(g_ublk_tgt.features & UBLK_F_CMD_IOCTL_ENCODE); 489 g_ublk_tgt.user_copy = !!(g_ublk_tgt.features & UBLK_F_USER_COPY); 490 } 491 io_uring_cqe_seen(&g_ublk_tgt.ctrl_ring, cqe); 492 493 return 0; 494 } 495 496 static int 497 ublk_queue_cmd_buf_sz(uint32_t q_depth) 498 { 499 uint32_t size = q_depth * sizeof(struct ublksrv_io_desc); 500 uint32_t page_sz = getpagesize(); 501 502 /* round up size */ 503 return (size + page_sz - 1) & ~(page_sz - 1); 504 } 505 506 static int 507 ublk_get_max_support_devs(void) 508 { 509 FILE *file; 510 char str[128]; 511 512 file = fopen("/sys/module/ublk_drv/parameters/ublks_max", "r"); 513 if (!file) { 514 return -ENOENT; 515 } 516 517 if (!fgets(str, sizeof(str), file)) { 518 fclose(file); 519 return -EINVAL; 520 } 521 fclose(file); 522 523 spdk_str_chomp(str); 524 return spdk_strtol(str, 10); 525 } 526 527 static int 528 ublk_open(void) 529 { 530 int rc, ublks_max; 531 532 g_ublk_tgt.ctrl_fd = open(UBLK_CTRL_DEV, O_RDWR); 533 if (g_ublk_tgt.ctrl_fd < 0) { 534 rc = errno; 535 SPDK_ERRLOG("UBLK conrol dev %s can't be opened, error=%s\n", UBLK_CTRL_DEV, spdk_strerror(errno)); 536 return -rc; 537 } 538 539 ublks_max = ublk_get_max_support_devs(); 540 if (ublks_max > 0) { 541 g_ublks_max = ublks_max; 542 } 543 544 /* We need to set SQPOLL for kernels 6.1 and earlier, since they would not defer ublk ctrl 545 * ring processing to a workqueue. Ctrl ring processing is minimal, so SQPOLL is fine. 546 * All the commands sent via control uring for a ublk device is executed one by one, so use 547 * ublks_max * 2 as the number of uring entries is enough. 548 */ 549 rc = ublk_setup_ring(g_ublks_max * 2, &g_ublk_tgt.ctrl_ring, 550 IORING_SETUP_SQE128 | IORING_SETUP_SQPOLL); 551 if (rc < 0) { 552 SPDK_ERRLOG("UBLK ctrl queue_init: %s\n", spdk_strerror(-rc)); 553 goto err; 554 } 555 556 rc = ublk_ctrl_cmd_get_features(); 557 if (rc) { 558 goto err; 559 } 560 561 return 0; 562 563 err: 564 close(g_ublk_tgt.ctrl_fd); 565 g_ublk_tgt.ctrl_fd = -1; 566 return rc; 567 } 568 569 static int 570 ublk_parse_core_mask(const char *mask) 571 { 572 struct spdk_cpuset tmp_mask; 573 int rc; 574 575 if (mask == NULL) { 576 spdk_env_get_cpuset(&g_core_mask); 577 return 0; 578 } 579 580 rc = spdk_cpuset_parse(&g_core_mask, mask); 581 if (rc < 0) { 582 SPDK_ERRLOG("invalid cpumask %s\n", mask); 583 return -EINVAL; 584 } 585 586 if (spdk_cpuset_count(&g_core_mask) == 0) { 587 SPDK_ERRLOG("no cpus specified\n"); 588 return -EINVAL; 589 } 590 591 spdk_env_get_cpuset(&tmp_mask); 592 spdk_cpuset_and(&tmp_mask, &g_core_mask); 593 594 if (!spdk_cpuset_equal(&tmp_mask, &g_core_mask)) { 595 SPDK_ERRLOG("one of selected cpu is outside of core mask(=%s)\n", 596 spdk_cpuset_fmt(&g_core_mask)); 597 return -EINVAL; 598 } 599 600 return 0; 601 } 602 603 static void 604 ublk_poller_register(void *args) 605 { 606 struct ublk_poll_group *poll_group = args; 607 int rc; 608 609 assert(spdk_get_thread() == poll_group->ublk_thread); 610 /* Bind ublk spdk_thread to current CPU core in order to avoid thread context switch 611 * during uring processing as required by ublk kernel. 612 */ 613 spdk_thread_bind(spdk_get_thread(), true); 614 615 TAILQ_INIT(&poll_group->queue_list); 616 poll_group->ublk_poller = SPDK_POLLER_REGISTER(ublk_poll, poll_group, 0); 617 rc = spdk_iobuf_channel_init(&poll_group->iobuf_ch, "ublk", 618 UBLK_IOBUF_SMALL_CACHE_SIZE, UBLK_IOBUF_LARGE_CACHE_SIZE); 619 if (rc != 0) { 620 assert(false); 621 } 622 } 623 624 int 625 ublk_create_target(const char *cpumask_str) 626 { 627 int rc; 628 uint32_t i; 629 char thread_name[32]; 630 struct ublk_poll_group *poll_group; 631 632 if (g_ublk_tgt.active == true) { 633 SPDK_ERRLOG("UBLK target has been created\n"); 634 return -EBUSY; 635 } 636 637 rc = ublk_parse_core_mask(cpumask_str); 638 if (rc != 0) { 639 return rc; 640 } 641 642 assert(g_ublk_tgt.poll_groups == NULL); 643 g_ublk_tgt.poll_groups = calloc(spdk_env_get_core_count(), sizeof(*poll_group)); 644 if (!g_ublk_tgt.poll_groups) { 645 return -ENOMEM; 646 } 647 648 rc = ublk_open(); 649 if (rc != 0) { 650 SPDK_ERRLOG("Fail to open UBLK, error=%s\n", spdk_strerror(-rc)); 651 free(g_ublk_tgt.poll_groups); 652 g_ublk_tgt.poll_groups = NULL; 653 return rc; 654 } 655 656 spdk_iobuf_register_module("ublk"); 657 658 SPDK_ENV_FOREACH_CORE(i) { 659 if (!spdk_cpuset_get_cpu(&g_core_mask, i)) { 660 continue; 661 } 662 snprintf(thread_name, sizeof(thread_name), "ublk_thread%u", i); 663 poll_group = &g_ublk_tgt.poll_groups[g_num_ublk_poll_groups]; 664 poll_group->ublk_thread = spdk_thread_create(thread_name, &g_core_mask); 665 spdk_thread_send_msg(poll_group->ublk_thread, ublk_poller_register, poll_group); 666 g_num_ublk_poll_groups++; 667 } 668 669 assert(spdk_thread_is_app_thread(NULL)); 670 g_ublk_tgt.active = true; 671 g_ublk_tgt.ctrl_ops_in_progress = 0; 672 g_ublk_tgt.ctrl_poller = SPDK_POLLER_REGISTER(ublk_ctrl_poller, NULL, 673 UBLK_DEFAULT_CTRL_URING_POLLING_INTERVAL_US); 674 675 SPDK_NOTICELOG("UBLK target created successfully\n"); 676 677 return 0; 678 } 679 680 static void 681 _ublk_fini_done(void *args) 682 { 683 SPDK_DEBUGLOG(ublk, "\n"); 684 685 g_num_ublk_poll_groups = 0; 686 g_next_ublk_poll_group = 0; 687 g_ublk_tgt.is_destroying = false; 688 g_ublk_tgt.active = false; 689 g_ublk_tgt.features = 0; 690 g_ublk_tgt.ioctl_encode = false; 691 g_ublk_tgt.user_copy = false; 692 693 if (g_ublk_tgt.cb_fn) { 694 g_ublk_tgt.cb_fn(g_ublk_tgt.cb_arg); 695 g_ublk_tgt.cb_fn = NULL; 696 g_ublk_tgt.cb_arg = NULL; 697 } 698 699 if (g_ublk_tgt.poll_groups) { 700 free(g_ublk_tgt.poll_groups); 701 g_ublk_tgt.poll_groups = NULL; 702 } 703 704 } 705 706 static void 707 ublk_thread_exit(void *args) 708 { 709 struct spdk_thread *ublk_thread = spdk_get_thread(); 710 uint32_t i; 711 712 for (i = 0; i < g_num_ublk_poll_groups; i++) { 713 if (g_ublk_tgt.poll_groups[i].ublk_thread == ublk_thread) { 714 spdk_poller_unregister(&g_ublk_tgt.poll_groups[i].ublk_poller); 715 spdk_iobuf_channel_fini(&g_ublk_tgt.poll_groups[i].iobuf_ch); 716 spdk_thread_bind(ublk_thread, false); 717 spdk_thread_exit(ublk_thread); 718 } 719 } 720 } 721 722 static int 723 ublk_close_dev(struct spdk_ublk_dev *ublk) 724 { 725 int rc; 726 727 /* set is_closing */ 728 if (ublk->is_closing) { 729 return -EBUSY; 730 } 731 ublk->is_closing = true; 732 733 rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_STOP_DEV); 734 if (rc < 0) { 735 SPDK_ERRLOG("stop dev %d failed\n", ublk->ublk_id); 736 } 737 return rc; 738 } 739 740 static void 741 _ublk_fini(void *args) 742 { 743 struct spdk_ublk_dev *ublk, *ublk_tmp; 744 745 TAILQ_FOREACH_SAFE(ublk, &g_ublk_devs, tailq, ublk_tmp) { 746 ublk_close_dev(ublk); 747 } 748 749 /* Check if all ublks closed */ 750 if (TAILQ_EMPTY(&g_ublk_devs)) { 751 SPDK_DEBUGLOG(ublk, "finish shutdown\n"); 752 spdk_poller_unregister(&g_ublk_tgt.ctrl_poller); 753 if (g_ublk_tgt.ctrl_ring.ring_fd >= 0) { 754 io_uring_queue_exit(&g_ublk_tgt.ctrl_ring); 755 g_ublk_tgt.ctrl_ring.ring_fd = -1; 756 } 757 if (g_ublk_tgt.ctrl_fd >= 0) { 758 close(g_ublk_tgt.ctrl_fd); 759 g_ublk_tgt.ctrl_fd = -1; 760 } 761 spdk_for_each_thread(ublk_thread_exit, NULL, _ublk_fini_done); 762 } else { 763 spdk_thread_send_msg(spdk_get_thread(), _ublk_fini, NULL); 764 } 765 } 766 767 int 768 spdk_ublk_fini(spdk_ublk_fini_cb cb_fn, void *cb_arg) 769 { 770 assert(spdk_thread_is_app_thread(NULL)); 771 772 if (g_ublk_tgt.is_destroying == true) { 773 /* UBLK target is being destroying */ 774 return -EBUSY; 775 } 776 g_ublk_tgt.cb_fn = cb_fn; 777 g_ublk_tgt.cb_arg = cb_arg; 778 g_ublk_tgt.is_destroying = true; 779 _ublk_fini(NULL); 780 781 return 0; 782 } 783 784 int 785 ublk_destroy_target(spdk_ublk_fini_cb cb_fn, void *cb_arg) 786 { 787 int rc; 788 789 if (g_ublk_tgt.active == false) { 790 /* UBLK target has not been created */ 791 return -ENOENT; 792 } 793 794 rc = spdk_ublk_fini(cb_fn, cb_arg); 795 796 return rc; 797 } 798 799 struct spdk_ublk_dev * 800 ublk_dev_find_by_id(uint32_t ublk_id) 801 { 802 struct spdk_ublk_dev *ublk; 803 804 /* check whether ublk has already been registered by ublk path. */ 805 TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) { 806 if (ublk->ublk_id == ublk_id) { 807 return ublk; 808 } 809 } 810 811 return NULL; 812 } 813 814 uint32_t 815 ublk_dev_get_id(struct spdk_ublk_dev *ublk) 816 { 817 return ublk->ublk_id; 818 } 819 820 struct spdk_ublk_dev *ublk_dev_first(void) 821 { 822 return TAILQ_FIRST(&g_ublk_devs); 823 } 824 825 struct spdk_ublk_dev *ublk_dev_next(struct spdk_ublk_dev *prev) 826 { 827 return TAILQ_NEXT(prev, tailq); 828 } 829 830 uint32_t 831 ublk_dev_get_queue_depth(struct spdk_ublk_dev *ublk) 832 { 833 return ublk->queue_depth; 834 } 835 836 uint32_t 837 ublk_dev_get_num_queues(struct spdk_ublk_dev *ublk) 838 { 839 return ublk->num_queues; 840 } 841 842 const char * 843 ublk_dev_get_bdev_name(struct spdk_ublk_dev *ublk) 844 { 845 return spdk_bdev_get_name(ublk->bdev); 846 } 847 848 void 849 spdk_ublk_write_config_json(struct spdk_json_write_ctx *w) 850 { 851 struct spdk_ublk_dev *ublk; 852 853 spdk_json_write_array_begin(w); 854 855 if (g_ublk_tgt.active) { 856 spdk_json_write_object_begin(w); 857 858 spdk_json_write_named_string(w, "method", "ublk_create_target"); 859 spdk_json_write_named_object_begin(w, "params"); 860 spdk_json_write_named_string(w, "cpumask", spdk_cpuset_fmt(&g_core_mask)); 861 spdk_json_write_object_end(w); 862 863 spdk_json_write_object_end(w); 864 } 865 866 TAILQ_FOREACH(ublk, &g_ublk_devs, tailq) { 867 spdk_json_write_object_begin(w); 868 869 spdk_json_write_named_string(w, "method", "ublk_start_disk"); 870 871 spdk_json_write_named_object_begin(w, "params"); 872 spdk_json_write_named_string(w, "bdev_name", ublk_dev_get_bdev_name(ublk)); 873 spdk_json_write_named_uint32(w, "ublk_id", ublk->ublk_id); 874 spdk_json_write_named_uint32(w, "num_queues", ublk->num_queues); 875 spdk_json_write_named_uint32(w, "queue_depth", ublk->queue_depth); 876 spdk_json_write_object_end(w); 877 878 spdk_json_write_object_end(w); 879 } 880 881 spdk_json_write_array_end(w); 882 } 883 884 static void 885 ublk_dev_list_register(struct spdk_ublk_dev *ublk) 886 { 887 UBLK_DEBUGLOG(ublk, "add to tailq\n"); 888 TAILQ_INSERT_TAIL(&g_ublk_devs, ublk, tailq); 889 g_ublk_tgt.num_ublk_devs++; 890 } 891 892 static void 893 ublk_dev_list_unregister(struct spdk_ublk_dev *ublk) 894 { 895 /* 896 * ublk device may be stopped before registered. 897 * check whether it was registered. 898 */ 899 900 if (ublk_dev_find_by_id(ublk->ublk_id)) { 901 UBLK_DEBUGLOG(ublk, "remove from tailq\n"); 902 TAILQ_REMOVE(&g_ublk_devs, ublk, tailq); 903 assert(g_ublk_tgt.num_ublk_devs); 904 g_ublk_tgt.num_ublk_devs--; 905 return; 906 } 907 908 UBLK_DEBUGLOG(ublk, "not found in tailq\n"); 909 assert(false); 910 } 911 912 static void 913 ublk_delete_dev(void *arg) 914 { 915 struct spdk_ublk_dev *ublk = arg; 916 int rc = 0; 917 uint32_t q_idx; 918 919 assert(spdk_thread_is_app_thread(NULL)); 920 for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) { 921 ublk_dev_queue_fini(&ublk->queues[q_idx]); 922 } 923 924 if (ublk->cdev_fd >= 0) { 925 close(ublk->cdev_fd); 926 } 927 928 rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_DEL_DEV); 929 if (rc < 0) { 930 SPDK_ERRLOG("delete dev %d failed\n", ublk->ublk_id); 931 } 932 } 933 934 static int 935 _ublk_close_dev_retry(void *arg) 936 { 937 struct spdk_ublk_dev *ublk = arg; 938 939 if (ublk->ctrl_ops_in_progress > 0) { 940 if (ublk->retry_count-- > 0) { 941 return SPDK_POLLER_BUSY; 942 } 943 SPDK_ERRLOG("Timeout on ctrl op completion.\n"); 944 } 945 spdk_poller_unregister(&ublk->retry_poller); 946 ublk_delete_dev(ublk); 947 return SPDK_POLLER_BUSY; 948 } 949 950 static void 951 ublk_try_close_dev(void *arg) 952 { 953 struct spdk_ublk_dev *ublk = arg; 954 955 assert(spdk_thread_is_app_thread(NULL)); 956 957 ublk->queues_closed += 1; 958 SPDK_DEBUGLOG(ublk_io, "ublkb%u closed queues %u\n", ublk->ublk_id, ublk->queues_closed); 959 960 if (ublk->queues_closed < ublk->num_queues) { 961 return; 962 } 963 964 if (ublk->ctrl_ops_in_progress > 0) { 965 assert(ublk->retry_poller == NULL); 966 ublk->retry_count = UBLK_STOP_BUSY_WAITING_MS * 1000ULL / UBLK_BUSY_POLLING_INTERVAL_US; 967 ublk->retry_poller = SPDK_POLLER_REGISTER(_ublk_close_dev_retry, ublk, 968 UBLK_BUSY_POLLING_INTERVAL_US); 969 } else { 970 ublk_delete_dev(ublk); 971 } 972 } 973 974 static void 975 ublk_try_close_queue(struct ublk_queue *q) 976 { 977 struct spdk_ublk_dev *ublk = q->dev; 978 979 /* Close queue until no I/O is submitted to bdev in flight, 980 * no I/O is waiting to commit result, and all I/Os are aborted back. 981 */ 982 if (!TAILQ_EMPTY(&q->inflight_io_list) || !TAILQ_EMPTY(&q->completed_io_list) || q->cmd_inflight) { 983 /* wait for next retry */ 984 return; 985 } 986 987 TAILQ_REMOVE(&q->poll_group->queue_list, q, tailq); 988 spdk_put_io_channel(q->bdev_ch); 989 q->bdev_ch = NULL; 990 991 spdk_thread_send_msg(spdk_thread_get_app_thread(), ublk_try_close_dev, ublk); 992 } 993 994 int 995 ublk_stop_disk(uint32_t ublk_id, ublk_ctrl_cb ctrl_cb, void *cb_arg) 996 { 997 struct spdk_ublk_dev *ublk; 998 999 assert(spdk_thread_is_app_thread(NULL)); 1000 1001 ublk = ublk_dev_find_by_id(ublk_id); 1002 if (ublk == NULL) { 1003 SPDK_ERRLOG("no ublk dev with ublk_id=%u\n", ublk_id); 1004 return -ENODEV; 1005 } 1006 if (ublk->is_closing) { 1007 SPDK_WARNLOG("ublk %d is closing\n", ublk->ublk_id); 1008 return -EBUSY; 1009 } 1010 if (ublk->ctrl_cb) { 1011 SPDK_WARNLOG("ublk %d is busy with RPC call\n", ublk->ublk_id); 1012 return -EBUSY; 1013 } 1014 1015 ublk->ctrl_cb = ctrl_cb; 1016 ublk->cb_arg = cb_arg; 1017 return ublk_close_dev(ublk); 1018 } 1019 1020 static inline void 1021 ublk_mark_io_done(struct ublk_io *io, int res) 1022 { 1023 /* 1024 * mark io done by target, so that SPDK can commit its 1025 * result and fetch new request via io_uring command. 1026 */ 1027 io->cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ; 1028 io->result = res; 1029 io->need_data = false; 1030 } 1031 1032 static void 1033 ublk_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1034 { 1035 struct ublk_io *io = cb_arg; 1036 struct ublk_queue *q = io->q; 1037 int res; 1038 1039 if (success) { 1040 res = io->result; 1041 } else { 1042 res = -EIO; 1043 } 1044 1045 ublk_mark_io_done(io, res); 1046 1047 SPDK_DEBUGLOG(ublk_io, "(qid %d tag %d res %d)\n", 1048 q->q_id, io->tag, res); 1049 TAILQ_REMOVE(&q->inflight_io_list, io, tailq); 1050 TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq); 1051 1052 if (bdev_io != NULL) { 1053 spdk_bdev_free_io(bdev_io); 1054 } 1055 } 1056 1057 static void 1058 ublk_queue_user_copy(struct ublk_io *io, bool is_write) 1059 { 1060 struct ublk_queue *q = io->q; 1061 const struct ublksrv_io_desc *iod = io->iod; 1062 struct io_uring_sqe *sqe; 1063 uint64_t pos; 1064 uint32_t nbytes; 1065 1066 nbytes = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT); 1067 pos = ublk_user_copy_pos(q->q_id, io->tag); 1068 sqe = io_uring_get_sqe(&q->ring); 1069 assert(sqe); 1070 1071 if (is_write) { 1072 io_uring_prep_read(sqe, 0, io->payload, nbytes, pos); 1073 } else { 1074 io_uring_prep_write(sqe, 0, io->payload, nbytes, pos); 1075 } 1076 io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); 1077 io_uring_sqe_set_data64(sqe, build_user_data(io->tag, 0)); 1078 1079 io->user_copy = true; 1080 TAILQ_REMOVE(&q->inflight_io_list, io, tailq); 1081 TAILQ_INSERT_TAIL(&q->completed_io_list, io, tailq); 1082 } 1083 1084 static void 1085 ublk_user_copy_read_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg) 1086 { 1087 struct ublk_io *io = cb_arg; 1088 1089 spdk_bdev_free_io(bdev_io); 1090 1091 if (success) { 1092 ublk_queue_user_copy(io, false); 1093 return; 1094 } 1095 /* READ IO Error */ 1096 ublk_io_done(NULL, false, cb_arg); 1097 } 1098 1099 static void 1100 ublk_resubmit_io(void *arg) 1101 { 1102 struct ublk_io *io = (struct ublk_io *)arg; 1103 1104 _ublk_submit_bdev_io(io->q, io); 1105 } 1106 1107 static void 1108 ublk_queue_io(struct ublk_io *io) 1109 { 1110 int rc; 1111 struct spdk_bdev *bdev = io->q->dev->bdev; 1112 struct ublk_queue *q = io->q; 1113 1114 io->bdev_io_wait.bdev = bdev; 1115 io->bdev_io_wait.cb_fn = ublk_resubmit_io; 1116 io->bdev_io_wait.cb_arg = io; 1117 1118 rc = spdk_bdev_queue_io_wait(bdev, q->bdev_ch, &io->bdev_io_wait); 1119 if (rc != 0) { 1120 SPDK_ERRLOG("Queue io failed in ublk_queue_io, rc=%d.\n", rc); 1121 ublk_io_done(NULL, false, io); 1122 } 1123 } 1124 1125 static void 1126 ublk_io_get_buffer_cb(struct spdk_iobuf_entry *iobuf, void *buf) 1127 { 1128 struct ublk_io *io = SPDK_CONTAINEROF(iobuf, struct ublk_io, iobuf); 1129 1130 io->mpool_entry = buf; 1131 assert(io->payload == NULL); 1132 io->payload = (void *)(uintptr_t)SPDK_ALIGN_CEIL((uintptr_t)buf, 4096ULL); 1133 io->get_buf_cb(io); 1134 } 1135 1136 static void 1137 ublk_io_get_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch, 1138 ublk_get_buf_cb get_buf_cb) 1139 { 1140 void *buf; 1141 1142 io->payload_size = io->iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT); 1143 io->get_buf_cb = get_buf_cb; 1144 buf = spdk_iobuf_get(iobuf_ch, io->payload_size, &io->iobuf, ublk_io_get_buffer_cb); 1145 1146 if (buf != NULL) { 1147 ublk_io_get_buffer_cb(&io->iobuf, buf); 1148 } 1149 } 1150 1151 static void 1152 ublk_io_put_buffer(struct ublk_io *io, struct spdk_iobuf_channel *iobuf_ch) 1153 { 1154 if (io->payload) { 1155 spdk_iobuf_put(iobuf_ch, io->mpool_entry, io->payload_size); 1156 io->mpool_entry = NULL; 1157 io->payload = NULL; 1158 } 1159 } 1160 1161 static void 1162 _ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io) 1163 { 1164 struct spdk_ublk_dev *ublk = q->dev; 1165 struct spdk_bdev_desc *desc = io->bdev_desc; 1166 struct spdk_io_channel *ch = io->bdev_ch; 1167 uint64_t offset_blocks, num_blocks; 1168 spdk_bdev_io_completion_cb read_cb; 1169 uint8_t ublk_op; 1170 int rc = 0; 1171 const struct ublksrv_io_desc *iod = io->iod; 1172 1173 ublk_op = ublksrv_get_op(iod); 1174 offset_blocks = iod->start_sector >> ublk->sector_per_block_shift; 1175 num_blocks = iod->nr_sectors >> ublk->sector_per_block_shift; 1176 1177 switch (ublk_op) { 1178 case UBLK_IO_OP_READ: 1179 if (g_ublk_tgt.user_copy) { 1180 read_cb = ublk_user_copy_read_done; 1181 } else { 1182 read_cb = ublk_io_done; 1183 } 1184 rc = spdk_bdev_read_blocks(desc, ch, io->payload, offset_blocks, num_blocks, read_cb, io); 1185 break; 1186 case UBLK_IO_OP_WRITE: 1187 rc = spdk_bdev_write_blocks(desc, ch, io->payload, offset_blocks, num_blocks, ublk_io_done, io); 1188 break; 1189 case UBLK_IO_OP_FLUSH: 1190 rc = spdk_bdev_flush_blocks(desc, ch, 0, spdk_bdev_get_num_blocks(ublk->bdev), ublk_io_done, io); 1191 break; 1192 case UBLK_IO_OP_DISCARD: 1193 rc = spdk_bdev_unmap_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io); 1194 break; 1195 case UBLK_IO_OP_WRITE_ZEROES: 1196 rc = spdk_bdev_write_zeroes_blocks(desc, ch, offset_blocks, num_blocks, ublk_io_done, io); 1197 break; 1198 default: 1199 rc = -1; 1200 } 1201 1202 if (rc < 0) { 1203 if (rc == -ENOMEM) { 1204 SPDK_INFOLOG(ublk, "No memory, start to queue io.\n"); 1205 ublk_queue_io(io); 1206 } else { 1207 SPDK_ERRLOG("ublk io failed in ublk_queue_io, rc=%d, ublk_op=%u\n", rc, ublk_op); 1208 ublk_io_done(NULL, false, io); 1209 } 1210 } 1211 } 1212 1213 static void 1214 read_get_buffer_done(struct ublk_io *io) 1215 { 1216 _ublk_submit_bdev_io(io->q, io); 1217 } 1218 1219 static void 1220 user_copy_write_get_buffer_done(struct ublk_io *io) 1221 { 1222 ublk_queue_user_copy(io, true); 1223 } 1224 1225 static void 1226 ublk_submit_bdev_io(struct ublk_queue *q, struct ublk_io *io) 1227 { 1228 struct spdk_iobuf_channel *iobuf_ch = &q->poll_group->iobuf_ch; 1229 const struct ublksrv_io_desc *iod = io->iod; 1230 uint8_t ublk_op; 1231 1232 io->result = iod->nr_sectors * (1ULL << LINUX_SECTOR_SHIFT); 1233 ublk_op = ublksrv_get_op(iod); 1234 switch (ublk_op) { 1235 case UBLK_IO_OP_READ: 1236 ublk_io_get_buffer(io, iobuf_ch, read_get_buffer_done); 1237 break; 1238 case UBLK_IO_OP_WRITE: 1239 if (g_ublk_tgt.user_copy) { 1240 ublk_io_get_buffer(io, iobuf_ch, user_copy_write_get_buffer_done); 1241 } else { 1242 _ublk_submit_bdev_io(q, io); 1243 } 1244 break; 1245 default: 1246 _ublk_submit_bdev_io(q, io); 1247 break; 1248 } 1249 } 1250 1251 static inline void 1252 ublksrv_queue_io_cmd(struct ublk_queue *q, 1253 struct ublk_io *io, unsigned tag) 1254 { 1255 struct ublksrv_io_cmd *cmd; 1256 struct io_uring_sqe *sqe; 1257 unsigned int cmd_op = 0;; 1258 uint64_t user_data; 1259 1260 /* each io should have operation of fetching or committing */ 1261 assert((io->cmd_op == UBLK_IO_FETCH_REQ) || (io->cmd_op == UBLK_IO_NEED_GET_DATA) || 1262 (io->cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ)); 1263 cmd_op = io->cmd_op; 1264 1265 sqe = io_uring_get_sqe(&q->ring); 1266 assert(sqe); 1267 1268 cmd = (struct ublksrv_io_cmd *)ublk_get_sqe_cmd(sqe); 1269 if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ) { 1270 cmd->result = io->result; 1271 } 1272 1273 /* These fields should be written once, never change */ 1274 ublk_set_sqe_cmd_op(sqe, cmd_op); 1275 /* dev->cdev_fd */ 1276 sqe->fd = 0; 1277 sqe->opcode = IORING_OP_URING_CMD; 1278 sqe->flags = IOSQE_FIXED_FILE; 1279 sqe->rw_flags = 0; 1280 cmd->tag = tag; 1281 cmd->addr = g_ublk_tgt.user_copy ? 0 : (__u64)(uintptr_t)(io->payload); 1282 cmd->q_id = q->q_id; 1283 1284 user_data = build_user_data(tag, cmd_op); 1285 io_uring_sqe_set_data64(sqe, user_data); 1286 1287 io->cmd_op = 0; 1288 1289 SPDK_DEBUGLOG(ublk_io, "(qid %d tag %u cmd_op %u) iof %x stopping %d\n", 1290 q->q_id, tag, cmd_op, 1291 io->cmd_op, q->is_stopping); 1292 } 1293 1294 static int 1295 ublk_io_xmit(struct ublk_queue *q) 1296 { 1297 TAILQ_HEAD(, ublk_io) buffer_free_list; 1298 struct spdk_iobuf_channel *iobuf_ch; 1299 int rc = 0, count = 0; 1300 struct ublk_io *io; 1301 1302 if (TAILQ_EMPTY(&q->completed_io_list)) { 1303 return 0; 1304 } 1305 1306 TAILQ_INIT(&buffer_free_list); 1307 while (!TAILQ_EMPTY(&q->completed_io_list)) { 1308 io = TAILQ_FIRST(&q->completed_io_list); 1309 assert(io != NULL); 1310 /* 1311 * Remove IO from list now assuming it will be completed. It will be inserted 1312 * back to the head if it cannot be completed. This approach is specifically 1313 * taken to work around a scan-build use-after-free mischaracterization. 1314 */ 1315 TAILQ_REMOVE(&q->completed_io_list, io, tailq); 1316 if (!io->user_copy) { 1317 if (!io->need_data) { 1318 TAILQ_INSERT_TAIL(&buffer_free_list, io, tailq); 1319 } 1320 ublksrv_queue_io_cmd(q, io, io->tag); 1321 } 1322 count++; 1323 } 1324 1325 q->cmd_inflight += count; 1326 rc = io_uring_submit(&q->ring); 1327 if (rc != count) { 1328 SPDK_ERRLOG("could not submit all commands\n"); 1329 assert(false); 1330 } 1331 1332 /* Note: for READ io, ublk will always copy the data out of 1333 * the buffers in the io_uring_submit context. Since we 1334 * are not using SQPOLL for IO rings, we can safely free 1335 * those IO buffers here. This design doesn't seem ideal, 1336 * but it's what's possible since there is no discrete 1337 * COMMIT_REQ operation. That will need to change in the 1338 * future should we ever want to support async copy 1339 * operations. 1340 */ 1341 iobuf_ch = &q->poll_group->iobuf_ch; 1342 while (!TAILQ_EMPTY(&buffer_free_list)) { 1343 io = TAILQ_FIRST(&buffer_free_list); 1344 TAILQ_REMOVE(&buffer_free_list, io, tailq); 1345 ublk_io_put_buffer(io, iobuf_ch); 1346 } 1347 return rc; 1348 } 1349 1350 static void 1351 write_get_buffer_done(struct ublk_io *io) 1352 { 1353 io->need_data = true; 1354 io->cmd_op = UBLK_IO_NEED_GET_DATA; 1355 io->result = 0; 1356 1357 TAILQ_REMOVE(&io->q->inflight_io_list, io, tailq); 1358 TAILQ_INSERT_TAIL(&io->q->completed_io_list, io, tailq); 1359 } 1360 1361 static int 1362 ublk_io_recv(struct ublk_queue *q) 1363 { 1364 struct io_uring_cqe *cqe; 1365 unsigned head, tag; 1366 int fetch, count = 0; 1367 struct ublk_io *io; 1368 struct spdk_iobuf_channel *iobuf_ch; 1369 1370 if (q->cmd_inflight == 0) { 1371 return 0; 1372 } 1373 1374 iobuf_ch = &q->poll_group->iobuf_ch; 1375 io_uring_for_each_cqe(&q->ring, head, cqe) { 1376 tag = user_data_to_tag(cqe->user_data); 1377 io = &q->ios[tag]; 1378 1379 SPDK_DEBUGLOG(ublk_io, "res %d qid %d tag %u, user copy %u, cmd_op %u\n", 1380 cqe->res, q->q_id, tag, io->user_copy, user_data_to_op(cqe->user_data)); 1381 1382 q->cmd_inflight--; 1383 TAILQ_INSERT_TAIL(&q->inflight_io_list, io, tailq); 1384 1385 if (!io->user_copy) { 1386 fetch = (cqe->res != UBLK_IO_RES_ABORT) && !q->is_stopping; 1387 if (!fetch) { 1388 q->is_stopping = true; 1389 if (io->cmd_op == UBLK_IO_FETCH_REQ) { 1390 io->cmd_op = 0; 1391 } 1392 } 1393 1394 if (cqe->res == UBLK_IO_RES_OK) { 1395 ublk_submit_bdev_io(q, io); 1396 } else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) { 1397 ublk_io_get_buffer(io, iobuf_ch, write_get_buffer_done); 1398 } else { 1399 if (cqe->res != UBLK_IO_RES_ABORT) { 1400 SPDK_ERRLOG("ublk received error io: res %d qid %d tag %u cmd_op %u\n", 1401 cqe->res, q->q_id, tag, user_data_to_op(cqe->user_data)); 1402 } 1403 TAILQ_REMOVE(&q->inflight_io_list, io, tailq); 1404 } 1405 } else { 1406 1407 /* clear `user_copy` for next use of this IO structure */ 1408 io->user_copy = false; 1409 1410 assert((ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) || 1411 (ublksrv_get_op(io->iod) == UBLK_IO_OP_WRITE)); 1412 if (cqe->res != io->result) { 1413 /* EIO */ 1414 ublk_io_done(NULL, false, io); 1415 } else { 1416 if (ublksrv_get_op(io->iod) == UBLK_IO_OP_READ) { 1417 /* bdev_io is already freed in first READ cycle */ 1418 ublk_io_done(NULL, true, io); 1419 } else { 1420 _ublk_submit_bdev_io(q, io); 1421 } 1422 } 1423 } 1424 count += 1; 1425 if (count == UBLK_QUEUE_REQUEST) { 1426 break; 1427 } 1428 } 1429 io_uring_cq_advance(&q->ring, count); 1430 1431 return count; 1432 } 1433 1434 static int 1435 ublk_poll(void *arg) 1436 { 1437 struct ublk_poll_group *poll_group = arg; 1438 struct ublk_queue *q, *q_tmp; 1439 int sent, received, count = 0; 1440 1441 TAILQ_FOREACH_SAFE(q, &poll_group->queue_list, tailq, q_tmp) { 1442 sent = ublk_io_xmit(q); 1443 received = ublk_io_recv(q); 1444 if (spdk_unlikely(q->is_stopping)) { 1445 ublk_try_close_queue(q); 1446 } 1447 count += sent + received; 1448 } 1449 if (count > 0) { 1450 return SPDK_POLLER_BUSY; 1451 } else { 1452 return SPDK_POLLER_IDLE; 1453 } 1454 } 1455 1456 static void 1457 ublk_bdev_hot_remove(struct spdk_ublk_dev *ublk) 1458 { 1459 ublk_close_dev(ublk); 1460 } 1461 1462 static void 1463 ublk_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, 1464 void *event_ctx) 1465 { 1466 switch (type) { 1467 case SPDK_BDEV_EVENT_REMOVE: 1468 ublk_bdev_hot_remove(event_ctx); 1469 break; 1470 default: 1471 SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type); 1472 break; 1473 } 1474 } 1475 1476 static void 1477 ublk_dev_init_io_cmds(struct io_uring *r, uint32_t q_depth) 1478 { 1479 struct io_uring_sqe *sqe; 1480 uint32_t i; 1481 1482 for (i = 0; i < q_depth; i++) { 1483 sqe = ublk_uring_get_sqe(r, i); 1484 1485 /* These fields should be written once, never change */ 1486 sqe->flags = IOSQE_FIXED_FILE; 1487 sqe->rw_flags = 0; 1488 sqe->ioprio = 0; 1489 sqe->off = 0; 1490 } 1491 } 1492 1493 static int 1494 ublk_dev_queue_init(struct ublk_queue *q) 1495 { 1496 int rc = 0, cmd_buf_size; 1497 uint32_t j; 1498 struct spdk_ublk_dev *ublk = q->dev; 1499 unsigned long off; 1500 1501 cmd_buf_size = ublk_queue_cmd_buf_sz(q->q_depth); 1502 off = UBLKSRV_CMD_BUF_OFFSET + 1503 q->q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc)); 1504 q->io_cmd_buf = (struct ublksrv_io_desc *)mmap(0, cmd_buf_size, PROT_READ, 1505 MAP_SHARED | MAP_POPULATE, ublk->cdev_fd, off); 1506 if (q->io_cmd_buf == MAP_FAILED) { 1507 q->io_cmd_buf = NULL; 1508 rc = -errno; 1509 SPDK_ERRLOG("Failed at mmap: %s\n", spdk_strerror(-rc)); 1510 return rc; 1511 } 1512 1513 for (j = 0; j < q->q_depth; j++) { 1514 q->ios[j].cmd_op = UBLK_IO_FETCH_REQ; 1515 q->ios[j].iod = &q->io_cmd_buf[j]; 1516 } 1517 1518 rc = ublk_setup_ring(q->q_depth, &q->ring, IORING_SETUP_SQE128); 1519 if (rc < 0) { 1520 SPDK_ERRLOG("Failed at setup uring: %s\n", spdk_strerror(-rc)); 1521 munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth)); 1522 q->io_cmd_buf = NULL; 1523 return rc; 1524 } 1525 1526 rc = io_uring_register_files(&q->ring, &ublk->cdev_fd, 1); 1527 if (rc != 0) { 1528 SPDK_ERRLOG("Failed at uring register files: %s\n", spdk_strerror(-rc)); 1529 io_uring_queue_exit(&q->ring); 1530 q->ring.ring_fd = -1; 1531 munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth)); 1532 q->io_cmd_buf = NULL; 1533 return rc; 1534 } 1535 1536 ublk_dev_init_io_cmds(&q->ring, q->q_depth); 1537 1538 return 0; 1539 } 1540 1541 static void 1542 ublk_dev_queue_fini(struct ublk_queue *q) 1543 { 1544 if (q->ring.ring_fd >= 0) { 1545 io_uring_unregister_files(&q->ring); 1546 io_uring_queue_exit(&q->ring); 1547 q->ring.ring_fd = -1; 1548 } 1549 if (q->io_cmd_buf) { 1550 munmap(q->io_cmd_buf, ublk_queue_cmd_buf_sz(q->q_depth)); 1551 } 1552 } 1553 1554 static void 1555 ublk_dev_queue_io_init(struct ublk_queue *q) 1556 { 1557 struct ublk_io *io; 1558 uint32_t i; 1559 int rc __attribute__((unused)); 1560 void *buf; 1561 1562 /* Some older kernels require a buffer to get posted, even 1563 * when NEED_GET_DATA has been specified. So allocate a 1564 * temporary buffer, only for purposes of this workaround. 1565 * It never actually gets used, so we will free it immediately 1566 * after all of the commands are posted. 1567 */ 1568 buf = malloc(64); 1569 1570 assert(q->bdev_ch != NULL); 1571 1572 /* Initialize and submit all io commands to ublk driver */ 1573 for (i = 0; i < q->q_depth; i++) { 1574 io = &q->ios[i]; 1575 io->tag = (uint16_t)i; 1576 io->payload = buf; 1577 io->bdev_ch = q->bdev_ch; 1578 io->bdev_desc = q->dev->bdev_desc; 1579 ublksrv_queue_io_cmd(q, io, i); 1580 } 1581 1582 q->cmd_inflight += q->q_depth; 1583 rc = io_uring_submit(&q->ring); 1584 assert(rc == (int)q->q_depth); 1585 for (i = 0; i < q->q_depth; i++) { 1586 io = &q->ios[i]; 1587 io->payload = NULL; 1588 } 1589 free(buf); 1590 } 1591 1592 static int 1593 ublk_set_params(struct spdk_ublk_dev *ublk) 1594 { 1595 int rc; 1596 1597 rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_SET_PARAMS); 1598 if (rc < 0) { 1599 SPDK_ERRLOG("UBLK can't set params for dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc)); 1600 } 1601 1602 return rc; 1603 } 1604 1605 static void 1606 ublk_dev_info_init(struct spdk_ublk_dev *ublk) 1607 { 1608 struct ublksrv_ctrl_dev_info uinfo = { 1609 .queue_depth = ublk->queue_depth, 1610 .nr_hw_queues = ublk->num_queues, 1611 .dev_id = ublk->ublk_id, 1612 .max_io_buf_bytes = UBLK_IO_MAX_BYTES, 1613 .ublksrv_pid = getpid(), 1614 .flags = UBLK_F_URING_CMD_COMP_IN_TASK, 1615 }; 1616 1617 if (g_ublk_tgt.user_copy) { 1618 uinfo.flags |= UBLK_F_USER_COPY; 1619 } else { 1620 uinfo.flags |= UBLK_F_NEED_GET_DATA; 1621 } 1622 1623 ublk->dev_info = uinfo; 1624 } 1625 1626 /* Set ublk device parameters based on bdev */ 1627 static void 1628 ublk_info_param_init(struct spdk_ublk_dev *ublk) 1629 { 1630 struct spdk_bdev *bdev = ublk->bdev; 1631 uint32_t blk_size = spdk_bdev_get_data_block_size(bdev); 1632 uint32_t pblk_size = spdk_bdev_get_physical_block_size(bdev); 1633 uint32_t io_opt_blocks = spdk_bdev_get_optimal_io_boundary(bdev); 1634 uint64_t num_blocks = spdk_bdev_get_num_blocks(bdev); 1635 uint8_t sectors_per_block = blk_size >> LINUX_SECTOR_SHIFT; 1636 uint32_t io_min_size = blk_size; 1637 uint32_t io_opt_size = spdk_max(io_opt_blocks * blk_size, io_min_size); 1638 1639 struct ublk_params uparams = { 1640 .types = UBLK_PARAM_TYPE_BASIC, 1641 .len = sizeof(struct ublk_params), 1642 .basic = { 1643 .logical_bs_shift = spdk_u32log2(blk_size), 1644 .physical_bs_shift = spdk_u32log2(pblk_size), 1645 .io_min_shift = spdk_u32log2(io_min_size), 1646 .io_opt_shift = spdk_u32log2(io_opt_size), 1647 .dev_sectors = num_blocks * sectors_per_block, 1648 .max_sectors = UBLK_IO_MAX_BYTES >> LINUX_SECTOR_SHIFT, 1649 } 1650 }; 1651 1652 if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_FLUSH)) { 1653 uparams.basic.attrs = UBLK_ATTR_VOLATILE_CACHE; 1654 } 1655 1656 if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) { 1657 uparams.types |= UBLK_PARAM_TYPE_DISCARD; 1658 uparams.discard.discard_alignment = sectors_per_block; 1659 uparams.discard.max_discard_sectors = num_blocks * sectors_per_block; 1660 uparams.discard.max_discard_segments = 1; 1661 uparams.discard.discard_granularity = blk_size; 1662 if (spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_WRITE_ZEROES)) { 1663 uparams.discard.max_write_zeroes_sectors = num_blocks * sectors_per_block; 1664 } 1665 } 1666 1667 ublk->dev_params = uparams; 1668 } 1669 1670 static void 1671 _ublk_free_dev(void *arg) 1672 { 1673 struct spdk_ublk_dev *ublk = arg; 1674 1675 ublk_free_dev(ublk); 1676 } 1677 1678 static void 1679 free_buffers(void *arg) 1680 { 1681 struct ublk_queue *q = arg; 1682 uint32_t i; 1683 1684 for (i = 0; i < q->q_depth; i++) { 1685 ublk_io_put_buffer(&q->ios[i], &q->poll_group->iobuf_ch); 1686 } 1687 free(q->ios); 1688 q->ios = NULL; 1689 spdk_thread_send_msg(spdk_thread_get_app_thread(), _ublk_free_dev, q->dev); 1690 } 1691 1692 static void 1693 ublk_free_dev(struct spdk_ublk_dev *ublk) 1694 { 1695 struct ublk_queue *q; 1696 uint32_t q_idx; 1697 1698 for (q_idx = 0; q_idx < ublk->num_queues; q_idx++) { 1699 q = &ublk->queues[q_idx]; 1700 1701 /* The ublk_io of this queue are not initialized. */ 1702 if (q->ios == NULL) { 1703 continue; 1704 } 1705 1706 /* We found a queue that has an ios array that may have buffers 1707 * that need to be freed. Send a message to the queue's thread 1708 * so it can free the buffers back to that thread's iobuf channel. 1709 * When it's done, it will set q->ios to NULL and send a message 1710 * back to this function to continue. 1711 */ 1712 if (q->poll_group) { 1713 spdk_thread_send_msg(q->poll_group->ublk_thread, free_buffers, q); 1714 return; 1715 } else { 1716 free(q->ios); 1717 q->ios = NULL; 1718 } 1719 } 1720 1721 /* All of the buffers associated with the queues have been freed, so now 1722 * continue with releasing resources for the rest of the ublk device. 1723 */ 1724 if (ublk->bdev_desc) { 1725 spdk_bdev_close(ublk->bdev_desc); 1726 ublk->bdev_desc = NULL; 1727 } 1728 1729 ublk_dev_list_unregister(ublk); 1730 SPDK_NOTICELOG("ublk dev %d stopped\n", ublk->ublk_id); 1731 1732 free(ublk); 1733 } 1734 1735 static int 1736 ublk_ios_init(struct spdk_ublk_dev *ublk) 1737 { 1738 int rc; 1739 uint32_t i, j; 1740 struct ublk_queue *q; 1741 1742 for (i = 0; i < ublk->num_queues; i++) { 1743 q = &ublk->queues[i]; 1744 1745 TAILQ_INIT(&q->completed_io_list); 1746 TAILQ_INIT(&q->inflight_io_list); 1747 q->dev = ublk; 1748 q->q_id = i; 1749 q->q_depth = ublk->queue_depth; 1750 q->ios = calloc(q->q_depth, sizeof(struct ublk_io)); 1751 if (!q->ios) { 1752 rc = -ENOMEM; 1753 SPDK_ERRLOG("could not allocate queue ios\n"); 1754 goto err; 1755 } 1756 for (j = 0; j < q->q_depth; j++) { 1757 q->ios[j].q = q; 1758 } 1759 } 1760 1761 return 0; 1762 1763 err: 1764 for (i = 0; i < ublk->num_queues; i++) { 1765 free(q->ios); 1766 q->ios = NULL; 1767 } 1768 return rc; 1769 } 1770 1771 static void 1772 ublk_queue_run(void *arg1) 1773 { 1774 struct ublk_queue *q = arg1; 1775 struct spdk_ublk_dev *ublk = q->dev; 1776 struct ublk_poll_group *poll_group = q->poll_group; 1777 1778 assert(spdk_get_thread() == poll_group->ublk_thread); 1779 q->bdev_ch = spdk_bdev_get_io_channel(ublk->bdev_desc); 1780 /* Queues must be filled with IO in the io pthread */ 1781 ublk_dev_queue_io_init(q); 1782 1783 TAILQ_INSERT_TAIL(&poll_group->queue_list, q, tailq); 1784 } 1785 1786 int 1787 ublk_start_disk(const char *bdev_name, uint32_t ublk_id, 1788 uint32_t num_queues, uint32_t queue_depth, 1789 ublk_ctrl_cb ctrl_cb, void *cb_arg) 1790 { 1791 int rc; 1792 uint32_t i; 1793 struct spdk_bdev *bdev; 1794 struct spdk_ublk_dev *ublk = NULL; 1795 uint32_t sector_per_block; 1796 1797 assert(spdk_thread_is_app_thread(NULL)); 1798 1799 if (g_ublk_tgt.active == false) { 1800 SPDK_ERRLOG("NO ublk target exist\n"); 1801 return -ENODEV; 1802 } 1803 1804 ublk = ublk_dev_find_by_id(ublk_id); 1805 if (ublk != NULL) { 1806 SPDK_DEBUGLOG(ublk, "ublk id %d is in use.\n", ublk_id); 1807 return -EBUSY; 1808 } 1809 1810 if (g_ublk_tgt.num_ublk_devs >= g_ublks_max) { 1811 SPDK_DEBUGLOG(ublk, "Reached maximum number of supported devices: %u\n", g_ublks_max); 1812 return -ENOTSUP; 1813 } 1814 1815 ublk = calloc(1, sizeof(*ublk)); 1816 if (ublk == NULL) { 1817 return -ENOMEM; 1818 } 1819 ublk->ctrl_cb = ctrl_cb; 1820 ublk->cb_arg = cb_arg; 1821 ublk->cdev_fd = -1; 1822 ublk->ublk_id = ublk_id; 1823 UBLK_DEBUGLOG(ublk, "bdev %s num_queues %d queue_depth %d\n", 1824 bdev_name, num_queues, queue_depth); 1825 1826 rc = spdk_bdev_open_ext(bdev_name, true, ublk_bdev_event_cb, ublk, &ublk->bdev_desc); 1827 if (rc != 0) { 1828 SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc); 1829 free(ublk); 1830 return rc; 1831 } 1832 1833 bdev = spdk_bdev_desc_get_bdev(ublk->bdev_desc); 1834 ublk->bdev = bdev; 1835 sector_per_block = spdk_bdev_get_data_block_size(ublk->bdev) >> LINUX_SECTOR_SHIFT; 1836 ublk->sector_per_block_shift = spdk_u32log2(sector_per_block); 1837 1838 ublk->queues_closed = 0; 1839 ublk->num_queues = num_queues; 1840 ublk->queue_depth = queue_depth; 1841 if (ublk->queue_depth > UBLK_DEV_MAX_QUEUE_DEPTH) { 1842 SPDK_WARNLOG("Set Queue depth %d of UBLK %d to maximum %d\n", 1843 ublk->queue_depth, ublk->ublk_id, UBLK_DEV_MAX_QUEUE_DEPTH); 1844 ublk->queue_depth = UBLK_DEV_MAX_QUEUE_DEPTH; 1845 } 1846 if (ublk->num_queues > UBLK_DEV_MAX_QUEUES) { 1847 SPDK_WARNLOG("Set Queue num %d of UBLK %d to maximum %d\n", 1848 ublk->num_queues, ublk->ublk_id, UBLK_DEV_MAX_QUEUES); 1849 ublk->num_queues = UBLK_DEV_MAX_QUEUES; 1850 } 1851 for (i = 0; i < ublk->num_queues; i++) { 1852 ublk->queues[i].ring.ring_fd = -1; 1853 } 1854 1855 ublk_dev_info_init(ublk); 1856 ublk_info_param_init(ublk); 1857 rc = ublk_ios_init(ublk); 1858 if (rc != 0) { 1859 spdk_bdev_close(ublk->bdev_desc); 1860 free(ublk); 1861 return rc; 1862 } 1863 1864 SPDK_INFOLOG(ublk, "Enabling kernel access to bdev %s via ublk %d\n", 1865 bdev_name, ublk_id); 1866 1867 /* Add ublk_dev to the end of disk list */ 1868 ublk_dev_list_register(ublk); 1869 rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_ADD_DEV); 1870 if (rc < 0) { 1871 SPDK_ERRLOG("UBLK can't add dev %d, rc %s\n", ublk->ublk_id, spdk_strerror(-rc)); 1872 ublk_free_dev(ublk); 1873 } 1874 1875 return rc; 1876 } 1877 1878 static int 1879 ublk_finish_start(struct spdk_ublk_dev *ublk) 1880 { 1881 int rc; 1882 uint32_t q_id; 1883 struct spdk_thread *ublk_thread; 1884 char buf[64]; 1885 1886 snprintf(buf, 64, "%s%d", UBLK_BLK_CDEV, ublk->ublk_id); 1887 ublk->cdev_fd = open(buf, O_RDWR); 1888 if (ublk->cdev_fd < 0) { 1889 rc = ublk->cdev_fd; 1890 SPDK_ERRLOG("can't open %s, rc %d\n", buf, rc); 1891 return rc; 1892 } 1893 1894 for (q_id = 0; q_id < ublk->num_queues; q_id++) { 1895 rc = ublk_dev_queue_init(&ublk->queues[q_id]); 1896 if (rc) { 1897 return rc; 1898 } 1899 } 1900 1901 rc = ublk_ctrl_cmd_submit(ublk, UBLK_CMD_START_DEV); 1902 if (rc < 0) { 1903 SPDK_ERRLOG("start dev %d failed, rc %s\n", ublk->ublk_id, 1904 spdk_strerror(-rc)); 1905 return rc; 1906 } 1907 1908 /* Send queue to different spdk_threads for load balance */ 1909 for (q_id = 0; q_id < ublk->num_queues; q_id++) { 1910 ublk->queues[q_id].poll_group = &g_ublk_tgt.poll_groups[g_next_ublk_poll_group]; 1911 ublk_thread = g_ublk_tgt.poll_groups[g_next_ublk_poll_group].ublk_thread; 1912 spdk_thread_send_msg(ublk_thread, ublk_queue_run, &ublk->queues[q_id]); 1913 g_next_ublk_poll_group++; 1914 if (g_next_ublk_poll_group == g_num_ublk_poll_groups) { 1915 g_next_ublk_poll_group = 0; 1916 } 1917 } 1918 1919 return 0; 1920 } 1921 1922 SPDK_LOG_REGISTER_COMPONENT(ublk) 1923 SPDK_LOG_REGISTER_COMPONENT(ublk_io) 1924