/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2017 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"

#include "spdk/queue.h"

#define GET_IO_LOOP_COUNT		16
#define NBD_START_BUSY_WAITING_MS	1000
#define NBD_STOP_BUSY_WAITING_MS	10000
#define NBD_BUSY_POLLING_INTERVAL_US	20000
#define NBD_IO_TIMEOUT_S		60

enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

struct nbd_io {
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	void			*payload;
	uint32_t		payload_size;

	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};

struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	struct spdk_interrupt	*intr;
	bool			interrupt_mode;
	uint32_t		buf_align;

	struct spdk_poller	*retry_poller;
	int			retry_count;
	/* Synchronize nbd_start_kernel pthread and nbd_stop */
	bool			has_nbd_pthread;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;
	TAILQ_HEAD(, nbd_io)	processing_io_list;

	bool			is_started;
	bool			is_closing;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;
static spdk_nbd_fini_cb g_fini_cb_fn;
static void *g_fini_cb_arg;

static void _nbd_fini(void *arg1);

static int nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);
static int nbd_io_recv_internal(struct spdk_nbd_disk *nbd);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

static void
_nbd_fini(void *arg1)
{
	struct spdk_nbd_disk *nbd, *nbd_tmp;

	TAILQ_FOREACH_SAFE(nbd, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (!nbd->is_closing) {
			spdk_nbd_stop(nbd);
		}
	}

	/* Check if all nbds closed */
	if (!TAILQ_FIRST(&g_spdk_nbd.disk_head)) {
		g_fini_cb_fn(g_fini_cb_arg);
	} else {
		spdk_thread_send_msg(spdk_get_thread(),
				     _nbd_fini, NULL);
	}
}

void
spdk_nbd_fini(spdk_nbd_fini_cb cb_fn, void *cb_arg)
{
	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_nbd_fini(NULL);
}
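/*
 * Typical lifecycle, as driven from an SPDK app thread (a minimal sketch;
 * "Malloc0", "/dev/nbd0" and the callback names are only illustrative):
 *
 *     spdk_nbd_init();
 *     spdk_nbd_start("Malloc0", "/dev/nbd0", start_cb, NULL);
 *     ...
 *     spdk_nbd_fini(fini_done_cb, NULL);
 *
 * Note that _nbd_fini() re-queues itself via spdk_thread_send_msg() until
 * every disk has finished closing, so the fini callback may run only after
 * several message-passing rounds.
 */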
static int
nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	/* Make sure nbd_path is not used in this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it is registered,
	 * so check whether it was actually registered.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * Check whether an nbd disk has already been registered with this nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *
nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *
nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Soft-disconnect the nbd device to terminate the transmission phase.
	 * After receiving this ioctl, the nbd kernel module sends an
	 * NBD_CMD_DISC request to the nbd server to inform it.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}

static struct nbd_io *
nbd_get_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}
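/*
 * io_count is incremented in nbd_get_io() and decremented in nbd_put_io(),
 * so it covers every nbd_io currently alive, whether it is being received,
 * executing in the bdev layer, or waiting to be transmitted. nbd_cleanup_io()
 * below relies on this counter reaching zero before teardown can proceed.
 */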
/*
 * Check whether all received nbd_io have been executed,
 * and put back executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still executing,
 *         0 if all acquired nbd_io have been freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	/* Try to read the remaining nbd commands from the socket */
	while (nbd_io_recv_internal(nbd) > 0);

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer.
	 * Wait for them to complete.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}

static int
_nbd_stop(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	if (nbd->intr) {
		spdk_interrupt_unregister(&nbd->intr);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
		nbd->spdk_sp_fd = -1;
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
		nbd->kernel_sp_fd = -1;
	}

	/* Continue the stop procedure after the nbd_start_kernel pthread exits */
	if (nbd->has_nbd_pthread) {
		if (nbd->retry_poller == NULL) {
			nbd->retry_count = NBD_STOP_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
			nbd->retry_poller = SPDK_POLLER_REGISTER(_nbd_stop, nbd,
					    NBD_BUSY_POLLING_INTERVAL_US);
			return SPDK_POLLER_BUSY;
		}

		if (nbd->retry_count-- > 0) {
			return SPDK_POLLER_BUSY;
		}

		SPDK_ERRLOG("Failed to wait for the NBD_DO_IT ioctl to return.\n");
	}

	if (nbd->retry_poller) {
		spdk_poller_unregister(&nbd->retry_poller);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear the nbd device only if it is occupied by this SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
		nbd->ch = NULL;
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
		nbd->bdev_desc = NULL;
	}

	nbd_disk_unregister(nbd);

	free(nbd);

	return 0;
}
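/*
 * Teardown order in _nbd_stop() above matters: pollers and interrupt sources
 * are unregistered first so no further socket activity is processed, the
 * socket pair is closed next (which unblocks the NBD_DO_IT ioctl held by
 * nbd_start_kernel()), then the kernel queue and socket association are
 * cleared, and only at the end are the io channel and bdev descriptor
 * released.
 */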
int
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	int rc = 0;

	if (nbd == NULL) {
		return rc;
	}

	nbd->is_closing = true;

	/* If nbd is not started yet, nbd stop will be called again later */
	if (!nbd->is_started) {
		return 1;
	}

	/*
	 * The stop action should be taken only after all nbd_io are executed.
	 */

	rc = nbd_cleanup_io(nbd);
	if (!rc) {
		_nbd_stop(nbd);
	}

	return rc;
}

static int64_t
nbd_socket_rw(int fd, void *buf, size_t length, bool read_op)
{
	ssize_t rc;

	if (read_op) {
		rc = read(fd, buf, length);
	} else {
		rc = write(fd, buf, length);
	}

	if (rc == 0) {
		return -EIO;
	} else if (rc == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return rc;
	}
}

static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io *io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));

	/* When the first executed io arrives, enable socket write notifications
	 * so that it gets processed in nbd_io_xmit
	 */
	if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
	}

	TAILQ_REMOVE(&nbd->processing_io_list, io, tailq);
	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory, start to queue io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}
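/*
 * Summary of the mapping used in nbd_submit_bdev_io() above:
 * NBD_CMD_READ -> spdk_bdev_read(), NBD_CMD_WRITE -> spdk_bdev_write(),
 * NBD_CMD_FLUSH -> spdk_bdev_flush() and NBD_CMD_TRIM -> spdk_bdev_unmap()
 * (the last two only when the kernel headers define the corresponding
 * NBD_FLAG_SEND_* flags). A transient -ENOMEM from the bdev layer is not
 * treated as a failure: the io is parked with spdk_bdev_queue_io_wait() and
 * resubmitted from nbd_resubmit_io() once a bdev_io becomes available again.
 */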
static int
nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int io_count = 0;
	int ret = 0;

	TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
		TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
		TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
		ret = nbd_submit_bdev_io(nbd, io);
		if (ret < 0) {
			return ret;
		}

		io_count++;
	}

	return io_count;
}

static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				    sizeof(io->req) - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			if (from_be32(&io->req.type) == NBD_CMD_DISC) {
				nbd->is_closing = true;
				nbd->io_in_recv = NULL;
				if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
					spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
				}
				nbd_put_io(nbd, io);
				/* After receiving NBD_CMD_DISC, nbd will not receive any new commands */
				return received;
			}

			/* Commands other than read/write carry no payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* allocate io payload */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %u\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
					TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
				} else {
					TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
					nbd_io_done(NULL, false, io);
				}
				nbd->io_in_recv = NULL;
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			} else {
				TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
				nbd_io_done(NULL, false, io);
			}
			nbd->io_in_recv = NULL;
		}
	}

	return received;
}
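/*
 * Wire format handled by nbd_io_recv_internal() above, as declared in
 * <linux/nbd.h> (multi-byte fields are big-endian on the wire):
 *
 *     struct nbd_request { magic, type, handle[8], from, len }
 *     struct nbd_reply   { magic, error, handle[8] }
 *
 * A request may arrive split across multiple reads on the non-blocking
 * socket, which is why io->offset tracks partial progress and the function
 * re-enters with the same io_in_recv until the current state completes.
 */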
static int
nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, rc, ret = 0;

	/*
	 * The nbd server should not accept requests after the closing command.
	 */
	if (nbd->is_closing) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		rc = nbd_io_recv_internal(nbd);
		if (rc < 0) {
			return rc;
		}
		ret += rc;
		if (nbd->is_closing) {
			break;
		}
	}

	return ret;
}

static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove the IO from the list now, assuming it will be completed. It will be
	 * inserted back at the head if it cannot be completed. This approach is
	 * specifically taken to work around a scan-build use-after-free
	 * mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle are already set in nbd_io_done */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				    sizeof(io->resp) - io->offset, false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only for NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset,
				    false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}

static int
nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;
	int rc;

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		rc = nbd_io_xmit_internal(nbd);
		if (rc < 0) {
			return rc;
		}

		ret += rc;
	}

	/* Once there are no executed io left, disable socket write notifications */
	if (nbd->interrupt_mode) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN);
	}

	return ret;
}
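/*
 * In interrupt mode, write notifications (SPDK_INTERRUPT_EVENT_OUT) are
 * enabled by nbd_io_done() as soon as the first completed io lands on
 * executed_io_list, and disabled again in nbd_io_xmit() above once the list
 * drains. Without this toggling, an almost-always-writable socket would wake
 * the reactor continuously even when there is nothing to transmit.
 */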
/**
 * Poll an NBD instance.
 *
 * \return the number of bytes and events processed on success (possibly 0),
 * or a negated errno on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int received, sent, executed;

	/* transmit executed io first */
	sent = nbd_io_xmit(nbd);
	if (sent < 0) {
		return sent;
	}

	received = nbd_io_recv(nbd);
	if (received < 0) {
		return received;
	}

	executed = nbd_io_exec(nbd);
	if (executed < 0) {
		return executed;
	}

	return sent + received + executed;
}

static int
nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		_nbd_stop(nbd);
		return SPDK_POLLER_IDLE;
	}
	if (nbd->is_closing && nbd->io_count == 0) {
		spdk_nbd_stop(nbd);
	}

	return rc == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
	struct spdk_thread	*thread;
};

static void
nbd_start_complete(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	/* nbd may receive a stop command while it is still initializing */
	ctx->nbd->is_started = true;

	free(ctx);
}
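/*
 * NBD_DO_IT does not return while the device stays connected, so it cannot
 * run on an SPDK reactor thread. nbd_start_kernel() below is therefore
 * spawned as a detached pthread (see nbd_start_continue()) whose only job is
 * to host that blocking ioctl.
 */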
static void *
nbd_start_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	struct spdk_nbd_disk *nbd = ctx->nbd;

	spdk_unaffinitize_thread();

	/* Send a message to complete the start context - this is the
	 * latest point we can do it, since the NBD_DO_IT ioctl will
	 * block in the kernel.
	 */
	spdk_thread_send_msg(ctx->thread, nbd_start_complete, ctx);

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(nbd->dev_fd, NBD_DO_IT);

	nbd->has_nbd_pthread = false;

	pthread_exit(NULL);
}

static void
nbd_bdev_hot_remove(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;

	nbd->is_closing = true;
	nbd_cleanup_io(nbd);

	TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
		TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
		/* nbd_io_done() unconditionally removes the io from
		 * processing_io_list, so move it there first.
		 */
		TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
		nbd_io_done(NULL, false, io);
	}
}

static void
nbd_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
		  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		nbd_bdev_hot_remove(event_ctx);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

static void
nbd_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	struct spdk_nbd_disk *nbd = cb_arg;

	nbd->interrupt_mode = interrupt_mode;
}

static void
nbd_start_continue(struct spdk_nbd_start_ctx *ctx)
{
	int rc;
	pthread_t tid;
	unsigned long nbd_flags = 0;

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_SET_TIMEOUT
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_TIMEOUT, NBD_IO_TIMEOUT_S);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_TIMEOUT) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#else
	SPDK_NOTICELOG("ioctl(NBD_SET_TIMEOUT) is not supported.\n");
#endif

#ifdef NBD_FLAG_SEND_FLUSH
	if (spdk_bdev_io_type_supported(ctx->nbd->bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
		nbd_flags |= NBD_FLAG_SEND_FLUSH;
	}
#endif
#ifdef NBD_FLAG_SEND_TRIM
	if (spdk_bdev_io_type_supported(ctx->nbd->bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		nbd_flags |= NBD_FLAG_SEND_TRIM;
	}
#endif

	if (nbd_flags) {
		rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, nbd_flags);
		if (rc == -1) {
			SPDK_ERRLOG("ioctl(NBD_SET_FLAGS, 0x%lx) failed: %s\n", nbd_flags, spdk_strerror(errno));
			rc = -errno;
			goto err;
		}
	}

	ctx->nbd->has_nbd_pthread = true;
	rc = pthread_create(&tid, NULL, nbd_start_kernel, ctx);
	if (rc != 0) {
		ctx->nbd->has_nbd_pthread = false;
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		ctx->nbd->intr = SPDK_INTERRUPT_REGISTER(ctx->nbd->spdk_sp_fd, nbd_poll, ctx->nbd);
	}

	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);
	spdk_poller_register_interrupt(ctx->nbd->nbd_poller, nbd_poller_set_interrupt_mode, ctx->nbd);

	return;

err:
	_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}
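/*
 * nbd_enable_kernel() below may race with a previous user of the same
 * /dev/nbdX device: NBD_SET_SOCK keeps returning EBUSY until the old
 * connection is completely torn down, so the ioctl is retried from a poller
 * every NBD_BUSY_POLLING_INTERVAL_US for up to NBD_START_BUSY_WAITING_MS
 * before the start attempt is abandoned.
 */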
static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Declare device setup by this process */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);

	if (rc) {
		if (errno == EBUSY) {
			if (ctx->nbd->retry_poller == NULL) {
				ctx->nbd->retry_count = NBD_START_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
							 NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			} else if (ctx->nbd->retry_count-- > 0) {
				/* Repeatedly unregister and register retry poller to avoid scan-build error */
				spdk_poller_unregister(&ctx->nbd->retry_poller);
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
							 NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			}
		}

		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
		if (ctx->nbd->retry_poller) {
			spdk_poller_unregister(&ctx->nbd->retry_poller);
		}

		_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
		}

		free(ctx);
		return SPDK_POLLER_BUSY;
	}

	if (ctx->nbd->retry_poller) {
		spdk_poller_unregister(&ctx->nbd->retry_poller);
	}

	nbd_start_continue(ctx);

	return SPDK_POLLER_BUSY;
}

void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx	*ctx = NULL;
	struct spdk_nbd_disk		*nbd = NULL;
	struct spdk_bdev		*bdev;
	int				rc;
	int				sp[2];

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->thread = spdk_get_thread();

	rc = spdk_bdev_open_ext(bdev_name, true, nbd_bdev_event_cb, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
		goto err;
	}

	bdev = spdk_bdev_desc_get_bdev(nbd->bdev_desc);
	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed\n");
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);
	TAILQ_INIT(&nbd->processing_io_list);

	/* Add nbd_disk to the end of disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
		     bdev_name, nbd_path);

	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}
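/*
 * Example caller of spdk_nbd_start() (a minimal sketch; the bdev name,
 * device path and callback are hypothetical):
 *
 *     static void
 *     start_cb(void *cb_arg, struct spdk_nbd_disk *nbd, int rc)
 *     {
 *             if (rc != 0) {
 *                     SPDK_ERRLOG("export failed: %s\n", spdk_strerror(-rc));
 *                     return;
 *             }
 *             SPDK_NOTICELOG("bdev exported on %s\n", spdk_nbd_get_path(nbd));
 *     }
 *
 *     spdk_nbd_start("Malloc0", "/dev/nbd0", start_cb, NULL);
 */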
const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

SPDK_LOG_REGISTER_COMPONENT(nbd)