/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"

#include "spdk/queue.h"

#define GET_IO_LOOP_COUNT		16
#define NBD_BUSY_WAITING_MS		1000
#define NBD_BUSY_POLLING_INTERVAL_US	20000

enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

struct nbd_io {
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	void			*payload;
	uint32_t		payload_size;

	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};

enum nbd_disk_state_t {
	NBD_DISK_STATE_RUNNING = 0,
	/* soft disconnection caused by receiving NBD_CMD_DISC */
	NBD_DISK_STATE_SOFTDISC,
	/* hard disconnection caused by mandatory conditions */
	NBD_DISK_STATE_HARDDISC,
};

struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	uint32_t		buf_align;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;

	enum nbd_disk_state_t	state;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

void
spdk_nbd_fini(void)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * Stop every running spdk_nbd_disk. Removing the entries from the
	 * list here is unnecessary, but the _SAFE variant is still needed,
	 * since the internal nbd_disk_unregister will remove each nbd from
	 * the TAILQ.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		spdk_nbd_stop(nbd_idx);
	}
}

static int
nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it is ever registered,
	 * so check whether it is actually in the list.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * Check whether an nbd has already been registered with this nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Issue an nbd soft disconnect to terminate the transmission phase.
	 * After receiving this ioctl command, the nbd kernel module sends
	 * an NBD_CMD_DISC request to the nbd server to inform it.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}

static struct nbd_io *
nbd_get_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been transmitted.
 *
 * \return 1 if some received nbd_io have not been transmitted yet.
 *         0 if all received nbd_io have been transmitted.
 */
static int
nbd_io_xmit_check(struct spdk_nbd_disk *nbd)
{
	if (nbd->io_count == 0) {
		return 0;
	} else if (nbd->io_count == 1 && nbd->io_in_recv != NULL) {
		return 0;
	}

	return 1;
}

/*
 * Check whether all received nbd_io have finished executing, and put
 * back executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still executing.
 *         0 if all allocated nbd_io have been freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/* free io in received_io_list */
	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			nbd_put_io(nbd, io);
		}
	}

	/* free io in executed_io_list */
	if (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->executed_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);
			nbd_put_io(nbd, io);
		}
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer.
	 * Wait for them to complete.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}

static void
_nbd_stop(struct spdk_nbd_disk *nbd)
{
	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear the nbd device only if it is occupied by this SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	nbd_disk_unregister(nbd);

	free(nbd);
}

void
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	if (nbd == NULL) {
		return;
	}

	nbd->state = NBD_DISK_STATE_HARDDISC;

	/*
	 * The stop action may proceed only after all nbd_io have been executed.
	 */
	if (!nbd_cleanup_io(nbd)) {
		_nbd_stop(nbd);
	}
}

static int64_t
read_from_socket(int fd, void *buf, size_t length)
{
	ssize_t bytes_read;

	bytes_read = read(fd, buf, length);
	if (bytes_read == 0) {
		return -EIO;
	} else if (bytes_read == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return bytes_read;
	}
}

static int64_t
write_to_socket(int fd, void *buf, size_t length)
{
	ssize_t bytes_written;

	bytes_written = write(fd, buf, length);
	if (bytes_written == 0) {
		return -EIO;
	} else if (bytes_written == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return bytes_written;
	}
}

static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io *io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));
	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}

	if (nbd->state == NBD_DISK_STATE_HARDDISC && !nbd_cleanup_io(nbd)) {
		_nbd_stop(nbd);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;
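
	/*
	 * Dispatch on the command type from the request header, converted
	 * from the on-wire big-endian representation. Flush and trim are
	 * compiled in only when the kernel headers advertise the
	 * corresponding NBD_FLAG_SEND_* flags.
	 */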
	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	case NBD_CMD_DISC:
		nbd_put_io(nbd, io);
		nbd->state = NBD_DISK_STATE_SOFTDISC;
		break;
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory; queueing io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

static int
nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int io_count = 0;
	int ret = 0;

	/*
	 * For soft disconnection, the nbd server must handle all outstanding
	 * requests before closing the connection.
	 */
	if (nbd->state == NBD_DISK_STATE_HARDDISC) {
		return 0;
	}

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			ret = nbd_submit_bdev_io(nbd, io);
			if (ret < 0) {
				return ret;
			}

			io_count++;
		}
	}

	return io_count;
}

static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = read_from_socket(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				       sizeof(io->req) - io->offset);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			/* io other than read/write carries no payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* allocate io payload */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
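				/*
				 * Only writes carry an inbound payload; any other
				 * request is fully received once its header is in,
				 * so queue it for execution right away.
				 */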
				nbd->io_in_recv = NULL;
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = read_from_socket(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			nbd->io_in_recv = NULL;
			TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
		}
	}

	return received;
}

static int
nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, rc, ret = 0;

	/*
	 * The nbd server must not accept new requests in either the soft
	 * or the hard disconnect state.
	 */
	if (nbd->state != NBD_DISK_STATE_RUNNING) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		rc = nbd_io_recv_internal(nbd);
		if (rc < 0) {
			return rc;
		}
		ret += rc;
	}

	return ret;
}

static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from the list now, assuming it will be completed. It will be
	 * inserted back at the head if it cannot be completed. This approach is
	 * specifically taken to work around a scan-build use-after-free
	 * mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle are already set in nbd_io_done */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = write_to_socket(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				      sizeof(io->resp) - io->offset);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only for NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = write_to_socket(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}

static int
nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;
	int rc;

	/*
	 * For soft disconnection, the nbd server must handle all outstanding
	 * requests before closing the connection.
	 */
	if (nbd->state == NBD_DISK_STATE_HARDDISC) {
		return 0;
	}

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		rc = nbd_io_xmit_internal(nbd);
		if (rc < 0) {
			return rc;
		}

		ret += rc;
	}

	/*
	 * For soft disconnection, the nbd server can close the connection
	 * after all outstanding requests have been transmitted.
	 */
	if (nbd->state == NBD_DISK_STATE_SOFTDISC && !nbd_io_xmit_check(nbd)) {
		return -1;
	}

	return ret;
}

/**
 * Poll an NBD instance.
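 *
 * Transmits executed responses first, then receives new requests from the
 * socket, and finally submits the received requests to the bdev layer.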
 *
 * \return a positive value if any progress was made, 0 if the instance was
 * idle, or a negated errno value on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int received, sent, executed;

	/* transmit executed io first */
	sent = nbd_io_xmit(nbd);
	if (sent < 0) {
		return sent;
	}

	received = nbd_io_recv(nbd);
	if (received < 0) {
		return received;
	}

	executed = nbd_io_exec(nbd);
	if (executed < 0) {
		return executed;
	}

	return sent + received + executed;
}

static int
nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		spdk_nbd_stop(nbd);
	}

	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
}

static void *
nbd_start_kernel(void *arg)
{
	int dev_fd = (int)(intptr_t)arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(dev_fd, NBD_DO_IT);

	pthread_exit(NULL);
}

static void
nbd_bdev_hot_remove(void *remove_ctx)
{
	struct spdk_nbd_disk *nbd = remove_ctx;

	spdk_nbd_stop(nbd);
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
	struct spdk_poller	*poller;
	int			polling_count;
};

static void
nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int rc;
	pthread_t tid;
	int flag;

	/* Add nbd_disk to the end of the disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register %s; this should not happen.\n", ctx->nbd->nbd_path);
		assert(false);
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_FLAG_SEND_TRIM
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, NBD_FLAG_SEND_TRIM);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_FLAGS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#endif

	rc = pthread_create(&tid, NULL, nbd_start_kernel, (void *)(intptr_t)ctx->nbd->dev_fd);
	if (rc != 0) {
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	flag = fcntl(ctx->nbd->spdk_sp_fd, F_GETFL);
	if (fcntl(ctx->nbd->spdk_sp_fd, F_SETFL, flag | O_NONBLOCK) < 0) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
			    ctx->nbd->spdk_sp_fd, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	free(ctx);
	return;

err:
	spdk_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}
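
/*
 * Hand the kernel half of the socketpair to the nbd kernel module. The
 * module may still report EBUSY while a previous connection on the same
 * /dev/nbdX is being torn down, so NBD_SET_SOCK is retried on a poller:
 * with the defaults above that is NBD_BUSY_WAITING_MS * 1000 /
 * NBD_BUSY_POLLING_INTERVAL_US = 50 attempts, one every 20 ms.
 */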
static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Declare device setup by this process */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);
	if (rc == -1) {
		if (errno == EBUSY && ctx->polling_count-- > 0) {
			if (ctx->poller == NULL) {
				ctx->poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
								   NBD_BUSY_POLLING_INTERVAL_US);
			}

			/* If the kernel is busy, check back later */
			return SPDK_POLLER_BUSY;
		}

		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
		if (ctx->poller) {
			spdk_poller_unregister(&ctx->poller);
		}

		spdk_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
		}

		free(ctx);
		return SPDK_POLLER_BUSY;
	}

	if (ctx->poller) {
		spdk_poller_unregister(&ctx->poller);
	}

	nbd_start_complete(ctx);

	return SPDK_POLLER_BUSY;
}

void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx *ctx = NULL;
	struct spdk_nbd_disk *nbd = NULL;
	struct spdk_bdev *bdev;
	int rc;
	int sp[2];

	bdev = spdk_bdev_get_by_name(bdev_name);
	if (bdev == NULL) {
		SPDK_ERRLOG("no bdev %s exists\n", bdev_name);
		rc = -EINVAL;
		goto err;
	}

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->polling_count = NBD_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;

	rc = spdk_bdev_open(bdev, true, nbd_bdev_hot_remove, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", spdk_bdev_get_name(bdev), rc);
		goto err;
	}

	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed\n");
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);

	/* Make sure nbd_path is not used in this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		rc = -EBUSY;
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
		     spdk_bdev_get_name(bdev), nbd_path);

	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		spdk_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}

const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}
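
/*
 * Usage sketch (illustrative only, not part of this file): exporting a
 * bdev from an SPDK app thread. The bdev name "Malloc0" and the callback
 * below are hypothetical.
 *
 *	static void
 *	start_cb(void *cb_arg, struct spdk_nbd_disk *nbd, int rc)
 *	{
 *		if (rc != 0) {
 *			SPDK_ERRLOG("nbd start failed: %s\n", spdk_strerror(-rc));
 *			return;
 *		}
 *		SPDK_NOTICELOG("bdev exported at %s\n", spdk_nbd_get_path(nbd));
 *	}
 *
 *	spdk_nbd_start("Malloc0", "/dev/nbd0", start_cb, NULL);
 */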

SPDK_LOG_REGISTER_COMPONENT(nbd)