/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"

#include "spdk/queue.h"

#define GET_IO_LOOP_COUNT		16
#define NBD_BUSY_WAITING_MS		1000
#define NBD_BUSY_POLLING_INTERVAL_US	20000

enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

struct nbd_io {
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	void			*payload;
	uint32_t		payload_size;

	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};
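/*
 * Per-io state machine, driven by the state field above:
 *
 *   NBD_IO_RECV_REQ --(NBD_CMD_WRITE)--> NBD_IO_RECV_PAYLOAD --> NBD_IO_XMIT_RESP
 *   NBD_IO_RECV_REQ --(other commands)------------------------> NBD_IO_XMIT_RESP
 *   NBD_IO_XMIT_RESP --(successful NBD_CMD_READ)--> NBD_IO_XMIT_PAYLOAD
 *
 * Between receive and transmit, the io is executed against the bdev;
 * see nbd_submit_bdev_io() and nbd_io_done() below.
 */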
enum nbd_disk_state_t {
	NBD_DISK_STATE_RUNNING = 0,
	/* soft disconnection caused by receiving NBD_CMD_DISC */
	NBD_DISK_STATE_SOFTDISC,
	/* hard disconnection caused by mandatory conditions */
	NBD_DISK_STATE_HARDDISC,
};

struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	struct spdk_interrupt	*intr;
	uint32_t		buf_align;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;

	enum nbd_disk_state_t	state;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;
static spdk_nbd_fini_cb g_fini_cb_fn;
static void *g_fini_cb_arg;

static void _nbd_fini(void *arg1);

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

static void
_nbd_stop_async(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = spdk_nbd_stop(nbd);
	if (rc) {
		/* spdk_nbd_stop failed because some I/O is still executing. Send a message
		 * to this thread to try again later. */
		spdk_thread_send_msg(spdk_get_thread(),
				     _nbd_stop_async, nbd);
	} else {
		_nbd_fini(NULL);
	}
}

static void
_nbd_fini(void *arg1)
{
	struct spdk_nbd_disk *nbd_first;

	nbd_first = TAILQ_FIRST(&g_spdk_nbd.disk_head);
	if (nbd_first) {
		/* Stop running spdk_nbd_disk */
		spdk_thread_send_msg(spdk_io_channel_get_thread(nbd_first->ch),
				     _nbd_stop_async, nbd_first);
	} else {
		/* We can call the final callback directly here, because
		 * spdk_subsystem_fini_next handles the case where the current
		 * thread is not g_final_thread. */
		g_fini_cb_fn(g_fini_cb_arg);
	}
}

void
spdk_nbd_fini(spdk_nbd_fini_cb cb_fn, void *cb_arg)
{
	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_nbd_fini(NULL);
}
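/*
 * Teardown note: _nbd_fini() walks the disk list one entry at a time by
 * sending _nbd_stop_async() to the thread that owns the first disk's I/O
 * channel; _nbd_stop_async() re-enters _nbd_fini() once that disk is
 * stopped. g_fini_cb_fn runs only after the list is empty.
 */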
static int
nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it is registered, so
	 * check whether it was actually registered.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * Check whether an nbd disk has already been registered for this nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Issue an nbd soft disconnect to terminate the transmission phase.
	 * After receiving this ioctl, the nbd kernel module sends an
	 * NBD_CMD_DISC request to the nbd server to inform it.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}

static struct nbd_io *
nbd_get_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been transmitted.
 *
 * \return 1 if some nbd_io have not been transmitted yet.
 *         0 if all received nbd_io have been transmitted.
 */
static int
nbd_io_xmit_check(struct spdk_nbd_disk *nbd)
{
	if (nbd->io_count == 0) {
		return 0;
	} else if (nbd->io_count == 1 && nbd->io_in_recv != NULL) {
		return 0;
	}

	return 1;
}
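/*
 * Note on the accounting above: nbd->io_count covers every allocated
 * nbd_io, including the one currently being received (io_in_recv). An
 * io_count of 1 with io_in_recv set therefore still means that all fully
 * received requests have been transmitted.
 */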
/*
 * Check whether all received nbd_io have finished executing, and free
 * executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still executing.
 *         0 if all allocated nbd_io have been freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer.
	 * Wait for them to complete.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}

static void
_nbd_stop(struct spdk_nbd_disk *nbd)
{
	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
	}

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	if (nbd->intr) {
		spdk_interrupt_unregister(&nbd->intr);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear nbd device only if it is occupied by SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	nbd_disk_unregister(nbd);

	free(nbd);
}

int
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	int rc = 0;

	if (nbd == NULL) {
		return rc;
	}

	nbd->state = NBD_DISK_STATE_HARDDISC;

	/*
	 * The stop action may only run after all nbd_io have finished executing.
	 */
	rc = nbd_cleanup_io(nbd);
	if (!rc) {
		_nbd_stop(nbd);
	}

	return rc;
}

static int64_t
nbd_socket_rw(int fd, void *buf, size_t length, bool read_op)
{
	ssize_t rc;

	if (read_op) {
		rc = read(fd, buf, length);
	} else {
		rc = write(fd, buf, length);
	}

	if (rc == 0) {
		return -EIO;
	} else if (rc == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return rc;
	}
}

static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io *io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));

	/* Once the executed_io_list becomes non-empty, enable the socket's
	 * writable event so the completed io gets processed in nbd_io_xmit.
	 */
	if (nbd->intr && TAILQ_EMPTY(&nbd->executed_io_list)) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
	}

	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}

	if (nbd->state == NBD_DISK_STATE_HARDDISC && !nbd_cleanup_io(nbd)) {
		_nbd_stop(nbd);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}
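/*
 * nbd_queue_io() and nbd_resubmit_io() together implement the usual bdev
 * ENOMEM retry pattern: when a submission returns -ENOMEM, the io is
 * parked on the bdev's io_wait queue via spdk_bdev_queue_io_wait(), and
 * nbd_resubmit_io() re-drives nbd_submit_bdev_io() once resources free up.
 */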
static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	case NBD_CMD_DISC:
		nbd_put_io(nbd, io);
		nbd->state = NBD_DISK_STATE_SOFTDISC;

		/* when there begin to be executed_io to send, enable the socket writable notice */
		if (nbd->intr && TAILQ_EMPTY(&nbd->executed_io_list)) {
			spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
		}

		break;
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory, start to queue io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

static int
nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int io_count = 0;
	int ret = 0;

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			ret = nbd_submit_bdev_io(nbd, io);
			if (ret < 0) {
				return ret;
			}

			io_count++;
		}
	}

	return io_count;
}
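/*
 * The receive path below parses the request format defined in
 * <linux/nbd.h>; all multi-byte fields are big-endian on the wire, hence
 * the from_be32()/from_be64() conversions. Roughly (field names as used
 * in this file):
 *
 *   struct nbd_request { magic, type, handle[8], from, len };
 *   struct nbd_reply   { magic, error, handle[8] };
 */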
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				    sizeof(io->req) - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			/* Commands other than read/write carry no payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* allocate io payload */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				nbd->io_in_recv = NULL;
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			nbd->io_in_recv = NULL;
			TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
		}
	}

	return received;
}

static int
nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, rc, ret = 0;

	/*
	 * The nbd server should not accept requests in either the soft or
	 * hard disconnect state.
	 */
	if (nbd->state != NBD_DISK_STATE_RUNNING) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		rc = nbd_io_recv_internal(nbd);
		if (rc < 0) {
			return rc;
		}
		ret += rc;
	}

	return ret;
}

static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from list now assuming it will be completed. It will be inserted
	 * back to the head if it cannot be completed. This approach is specifically
	 * taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle are already set in nbd_io_done */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				    sizeof(io->resp) - io->offset, false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only for NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset,
				    false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}
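/*
 * Both directions use io->offset to resume across partial socket
 * transfers: nbd_socket_rw() maps EAGAIN on the non-blocking socket to a
 * return of 0, so an io simply stays in io_in_recv (receive side) or at
 * the head of executed_io_list (transmit side) until the remaining bytes
 * can be moved.
 */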
static int
nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;
	int rc;

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		rc = nbd_io_xmit_internal(nbd);
		if (rc < 0) {
			return rc;
		}

		ret += rc;
	}

	/* When there are no more executed_io, disable the socket writable notice */
	if (nbd->intr) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN);
	}

	/*
	 * For a soft disconnect, the nbd server can close the connection after
	 * all outstanding requests have been transmitted.
	 */
	if (nbd->state == NBD_DISK_STATE_SOFTDISC && !nbd_io_xmit_check(nbd)) {
		return -1;
	}

	return ret;
}

/**
 * Poll an NBD instance.
 *
 * \return a non-negative value on success, or a negative value on error
 * (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int received, sent, executed;

	/* transmit executed io first */
	sent = nbd_io_xmit(nbd);
	if (sent < 0) {
		return sent;
	}

	received = nbd_io_recv(nbd);
	if (received < 0) {
		return received;
	}

	executed = nbd_io_exec(nbd);
	if (executed < 0) {
		return executed;
	}

	return sent + received + executed;
}

static int
nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		spdk_nbd_stop(nbd);
	}

	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
}

static void *
nbd_start_kernel(void *arg)
{
	int dev_fd = (int)(intptr_t)arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(dev_fd, NBD_DO_IT);

	pthread_exit(NULL);
}

static void
nbd_bdev_hot_remove(struct spdk_nbd_disk *nbd)
{
	spdk_nbd_stop(nbd);
}

static void
nbd_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
		  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		nbd_bdev_hot_remove(event_ctx);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
	struct spdk_poller	*poller;
	int			polling_count;
};
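/*
 * Startup sequence: spdk_nbd_start() opens the bdev and the /dev/nbd*
 * device and creates a socketpair; nbd_enable_kernel() hands one end to
 * the kernel via ioctl(NBD_SET_SOCK), retrying while the device reports
 * EBUSY; nbd_start_complete() then programs the block size and count,
 * spawns the thread that blocks in ioctl(NBD_DO_IT), and registers the
 * poller (or interrupt handler) that services the SPDK end of the socket.
 */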
static void
nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int rc;
	pthread_t tid;
	int flag;

	/* Add nbd_disk to the end of disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register %s, it should not happen.\n", ctx->nbd->nbd_path);
		assert(false);
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_FLAG_SEND_TRIM
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, NBD_FLAG_SEND_TRIM);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_FLAGS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#endif

	rc = pthread_create(&tid, NULL, nbd_start_kernel, (void *)(intptr_t)ctx->nbd->dev_fd);
	if (rc != 0) {
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	flag = fcntl(ctx->nbd->spdk_sp_fd, F_GETFL);
	if (fcntl(ctx->nbd->spdk_sp_fd, F_SETFL, flag | O_NONBLOCK) < 0) {
		SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
			    ctx->nbd->spdk_sp_fd, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		ctx->nbd->intr = SPDK_INTERRUPT_REGISTER(ctx->nbd->spdk_sp_fd, nbd_poll, ctx->nbd);
	} else {
		ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);
	}

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	free(ctx);
	return;

err:
	spdk_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}

static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;
	int flag;

	/* Declare device setup by this process */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);

	if (!rc) {
		flag = fcntl(ctx->nbd->kernel_sp_fd, F_GETFL);
		rc = fcntl(ctx->nbd->kernel_sp_fd, F_SETFL, flag | O_NONBLOCK);
		if (rc < 0) {
			SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%s)\n",
				    ctx->nbd->kernel_sp_fd, spdk_strerror(errno));
		}
	}

	if (rc) {
		if (errno == EBUSY && ctx->polling_count-- > 0) {
			if (ctx->poller == NULL) {
				ctx->poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
								   NBD_BUSY_POLLING_INTERVAL_US);
			}

			/* If the kernel is busy, check back later */
			return SPDK_POLLER_BUSY;
		}

		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
		if (ctx->poller) {
			spdk_poller_unregister(&ctx->poller);
		}

		spdk_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
		}

		free(ctx);
		return SPDK_POLLER_BUSY;
	}

	if (ctx->poller) {
		spdk_poller_unregister(&ctx->poller);
	}

	nbd_start_complete(ctx);

	return SPDK_POLLER_BUSY;
}
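/*
 * The EBUSY retry budget used above comes from the constants at the top
 * of this file: NBD_BUSY_WAITING_MS (1000 ms) polled every
 * NBD_BUSY_POLLING_INTERVAL_US (20000 us, i.e. 20 ms) yields a
 * polling_count of 50 attempts before NBD_SET_SOCK failure is reported.
 */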
void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx	*ctx = NULL;
	struct spdk_nbd_disk		*nbd = NULL;
	struct spdk_bdev		*bdev;
	int				rc;
	int				sp[2];

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;
	ctx->polling_count = NBD_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;

	rc = spdk_bdev_open_ext(bdev_name, true, nbd_bdev_event_cb, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
		goto err;
	}

	bdev = spdk_bdev_desc_get_bdev(nbd->bdev_desc);
	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed\n");
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);

	/* Make sure nbd_path is not used in this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		rc = -EBUSY;
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
		     bdev_name, nbd_path);

	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		spdk_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}

const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

SPDK_LOG_REGISTER_COMPONENT(nbd)
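/*
 * Usage sketch (illustrative; the bdev and device names are examples, not
 * taken from this file): after spdk_nbd_init(), an export is typically
 * created through the nbd_start_disk RPC, which ends up in
 * spdk_nbd_start(), e.g.:
 *
 *   scripts/rpc.py nbd_start_disk Malloc0 /dev/nbd0
 *
 * spdk_nbd_write_config_json() above emits matching "nbd_start_disk"
 * entries so that a saved configuration recreates the same exports.
 */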