/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"

#include "spdk/queue.h"

#define GET_IO_LOOP_COUNT		16
#define NBD_START_BUSY_WAITING_MS	1000
#define NBD_STOP_BUSY_WAITING_MS	10000
#define NBD_BUSY_POLLING_INTERVAL_US	20000
#define NBD_IO_TIMEOUT_S		60

enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};
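/*
 * Context for a single NBD command. Each nbd_io tracks one request/response
 * pair as it moves through the stages above: receiving the request header
 * (and any write payload), executing it on the bdev, and transmitting the
 * response (and any read payload).
 */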
struct nbd_io {
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	void			*payload;
	uint32_t		payload_size;

	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};

struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	struct spdk_interrupt	*intr;
	bool			interrupt_mode;
	uint32_t		buf_align;

	struct spdk_poller	*retry_poller;
	int			retry_count;
	/* Synchronize nbd_start_kernel pthread and nbd_stop */
	bool			has_nbd_pthread;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;
	TAILQ_HEAD(, nbd_io)	processing_io_list;

	bool			is_started;
	bool			is_closing;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;
static spdk_nbd_fini_cb g_fini_cb_fn;
static void *g_fini_cb_arg;

static void _nbd_fini(void *arg1);

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

static void
_nbd_fini(void *arg1)
{
	struct spdk_nbd_disk *nbd, *nbd_tmp;

	TAILQ_FOREACH_SAFE(nbd, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (!nbd->is_closing) {
			spdk_nbd_stop(nbd);
		}
	}

	/* Check whether all nbd disks are closed */
	if (!TAILQ_FIRST(&g_spdk_nbd.disk_head)) {
		g_fini_cb_fn(g_fini_cb_arg);
	} else {
		spdk_thread_send_msg(spdk_get_thread(),
				     _nbd_fini, NULL);
	}
}

void
spdk_nbd_fini(spdk_nbd_fini_cb cb_fn, void *cb_arg)
{
	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_nbd_fini(NULL);
}

static int
nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	/* Make sure nbd_path is not already used in this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it is registered,
	 * so check whether it is actually on the list.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * Check whether an nbd disk has already been registered
	 * with this nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *
nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *
nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * nbd soft-disconnect to terminate the transmission phase.
	 * After receiving this ioctl command, the nbd kernel module
	 * will send an NBD_CMD_DISC type io to the nbd server in order
	 * to inform the server.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}
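/*
 * Allocate a context for one incoming command. The reply magic is filled
 * in up front, since every response sent back to the kernel carries the
 * same value; nbd->io_count tracks how many contexts are outstanding.
 */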
static struct nbd_io *
nbd_get_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been executed, and free the
 * executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still being executed,
 *         0 if all received nbd_io have been freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	/* Try to read the remaining nbd commands in the socket */
	while (nbd_io_recv_internal(nbd) > 0);

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer.
	 * Wait for them to complete.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}
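/*
 * Release all resources held by the disk: pollers, the socket pair, the
 * kernel device fd, the io channel, and the bdev descriptor. _nbd_stop is
 * also registered as a retry poller (hence the int return value): if the
 * nbd_start_kernel pthread is still blocked in NBD_DO_IT, it reschedules
 * itself until that thread exits or the wait times out.
 */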
static int
_nbd_stop(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	if (nbd->intr) {
		spdk_interrupt_unregister(&nbd->intr);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
		nbd->spdk_sp_fd = -1;
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
		nbd->kernel_sp_fd = -1;
	}

	/* Continue the stop procedure only after the nbd_start_kernel pthread exits */
	if (nbd->has_nbd_pthread) {
		if (nbd->retry_poller == NULL) {
			nbd->retry_count = NBD_STOP_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
			nbd->retry_poller = SPDK_POLLER_REGISTER(_nbd_stop, nbd,
					    NBD_BUSY_POLLING_INTERVAL_US);
			return SPDK_POLLER_BUSY;
		}

		if (nbd->retry_count-- > 0) {
			return SPDK_POLLER_BUSY;
		}

		SPDK_ERRLOG("Failed to wait for the NBD_DO_IT ioctl to return.\n");
	}

	if (nbd->retry_poller) {
		spdk_poller_unregister(&nbd->retry_poller);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear the nbd device only if it is occupied by this SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
		nbd->ch = NULL;
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
		nbd->bdev_desc = NULL;
	}

	nbd_disk_unregister(nbd);

	free(nbd);

	return 0;
}

int
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	int rc = 0;

	if (nbd == NULL) {
		return rc;
	}

	nbd->is_closing = true;

	/* If the nbd is not started yet, nbd stop will be called again later */
	if (!nbd->is_started) {
		return 1;
	}

	/*
	 * The stop action should be taken only after all nbd_io are executed.
	 */
	rc = nbd_cleanup_io(nbd);
	if (!rc) {
		_nbd_stop(nbd);
	}

	return rc;
}
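/*
 * Nonblocking read from or write to the given socket fd.
 *
 * \return the number of bytes transferred, 0 if the operation would
 * block (EAGAIN), -EIO if the peer closed the connection, or -errno
 * on any other error.
 */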
static int64_t
nbd_socket_rw(int fd, void *buf, size_t length, bool read_op)
{
	ssize_t rc;

	if (read_op) {
		rc = read(fd, buf, length);
	} else {
		rc = write(fd, buf, length);
	}

	if (rc == 0) {
		return -EIO;
	} else if (rc == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return rc;
	}
}

static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io	*io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));

	/* When the executed_io_list becomes non-empty, enable the socket writable
	 * notice so that the io gets processed in nbd_io_xmit
	 */
	if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
	}

	TAILQ_REMOVE(&nbd->processing_io_list, io, tailq);
	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory, start to queue io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

static int
nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int io_count = 0;
	int ret = 0;

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
			ret = nbd_submit_bdev_io(nbd, io);
			if (ret < 0) {
				return ret;
			}

			io_count++;
		}
	}

	return io_count;
}
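/*
 * Advance the receive state machine for a single command: read the
 * fixed-size request header first, then any write payload. Partially
 * received data is carried across polls in nbd->io_in_recv, with
 * io->offset recording the progress.
 *
 * \return the number of bytes received, 0 if the socket would block,
 * or a negative errno on failure.
 */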
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				    sizeof(io->req) - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			if (from_be32(&io->req.type) == NBD_CMD_DISC) {
				nbd->is_closing = true;
				nbd->io_in_recv = NULL;
				if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
					spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
				}
				nbd_put_io(nbd, io);
				/* After receiving NBD_CMD_DISC, the nbd server will not receive any new commands */
				return received;
			}

			/* io types other than read/write carry no payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* io payload allocation */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
					TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
				} else {
					TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
					nbd_io_done(NULL, false, io);
				}
				nbd->io_in_recv = NULL;
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			} else {
				TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
				nbd_io_done(NULL, false, io);
			}
			nbd->io_in_recv = NULL;
		}
	}

	return received;
}
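/*
 * Drain incoming commands from the socket, bounded by GET_IO_LOOP_COUNT
 * per call; stop early once a disconnect request flips is_closing.
 */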
static int
nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, rc, ret = 0;

	/*
	 * The nbd server should not accept requests after the closing command.
	 */
	if (nbd->is_closing) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		rc = nbd_io_recv_internal(nbd);
		if (rc < 0) {
			return rc;
		}
		ret += rc;
		if (nbd->is_closing) {
			break;
		}
	}

	return ret;
}

static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove the IO from the list now, assuming it will be completed. It will be
	 * inserted back at the head if it cannot be completed. This approach is
	 * specifically taken to work around a scan-build use-after-free
	 * mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle are already set in nbd_io_done */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				    sizeof(io->resp) - io->offset, false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only for NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset,
				    false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}

static int
nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;
	int rc;

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		rc = nbd_io_xmit_internal(nbd);
		if (rc < 0) {
			return rc;
		}

		ret += rc;
	}

	/* When the executed_io_list becomes empty, disable the socket writable notice */
	if (nbd->interrupt_mode) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN);
	}

	return ret;
}

/**
 * Poll an NBD instance.
 *
 * \return 0 on success or negated errno values on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int received, sent, executed;

	/* transmit executed io first */
	sent = nbd_io_xmit(nbd);
	if (sent < 0) {
		return sent;
	}

	received = nbd_io_recv(nbd);
	if (received < 0) {
		return received;
	}

	executed = nbd_io_exec(nbd);
	if (executed < 0) {
		return executed;
	}

	return sent + received + executed;
}
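/*
 * Poller entry point. A negative value from _nbd_poll() indicates a fatal
 * socket error, so the connection is torn down immediately; a graceful
 * close requested by the client goes through spdk_nbd_stop() instead.
 */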
static int
nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		_nbd_stop(nbd);
		return SPDK_POLLER_IDLE;
	}
	if (nbd->is_closing) {
		spdk_nbd_stop(nbd);
	}

	return SPDK_POLLER_BUSY;
}

static void *
nbd_start_kernel(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(nbd->dev_fd, NBD_DO_IT);

	nbd->has_nbd_pthread = false;

	pthread_exit(NULL);
}

static void
nbd_bdev_hot_remove(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;

	nbd->is_closing = true;
	nbd_cleanup_io(nbd);

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
		}
	}
	if (!TAILQ_EMPTY(&nbd->processing_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->processing_io_list, tailq, io_tmp) {
			nbd_io_done(NULL, false, io);
		}
	}
}

static void
nbd_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
		  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		nbd_bdev_hot_remove(event_ctx);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
};

static void
nbd_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	struct spdk_nbd_disk *nbd = cb_arg;

	nbd->interrupt_mode = interrupt_mode;
}
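/*
 * Final stage of starting a disk: configure the kernel nbd device
 * (block size, size in blocks, io timeout, and feature flags) via ioctls,
 * then spawn a detached pthread that stays blocked in NBD_DO_IT for the
 * life of the device, and register the poller that services the socket.
 */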
static void
nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int rc;
	pthread_t tid;
	unsigned long nbd_flags = 0;

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_SET_TIMEOUT
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_TIMEOUT, NBD_IO_TIMEOUT_S);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_TIMEOUT) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#else
	SPDK_NOTICELOG("ioctl(NBD_SET_TIMEOUT) is not supported.\n");
#endif

#ifdef NBD_FLAG_SEND_FLUSH
	nbd_flags |= NBD_FLAG_SEND_FLUSH;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	nbd_flags |= NBD_FLAG_SEND_TRIM;
#endif
	if (nbd_flags) {
		rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, nbd_flags);
		if (rc == -1) {
			SPDK_ERRLOG("ioctl(NBD_SET_FLAGS, 0x%lx) failed: %s\n", nbd_flags, spdk_strerror(errno));
			rc = -errno;
			goto err;
		}
	}

	ctx->nbd->has_nbd_pthread = true;
	rc = pthread_create(&tid, NULL, nbd_start_kernel, ctx->nbd);
	if (rc != 0) {
		ctx->nbd->has_nbd_pthread = false;
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		ctx->nbd->intr = SPDK_INTERRUPT_REGISTER(ctx->nbd->spdk_sp_fd, nbd_poll, ctx->nbd);
	}

	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);
	spdk_poller_register_interrupt(ctx->nbd->nbd_poller, nbd_poller_set_interrupt_mode, ctx->nbd);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	/* The nbd may receive a stop command while it is still initializing */
	ctx->nbd->is_started = true;

	free(ctx);
	return;

err:
	_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}

static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Declare device setup by this process */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);

	if (rc) {
		if (errno == EBUSY) {
			if (ctx->nbd->retry_poller == NULL) {
				ctx->nbd->retry_count = NBD_START_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
						NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			} else if (ctx->nbd->retry_count-- > 0) {
				/* Repeatedly unregister and re-register the retry poller to avoid a scan-build error */
				spdk_poller_unregister(&ctx->nbd->retry_poller);
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
						NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			}
		}

		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
		if (ctx->nbd->retry_poller) {
			spdk_poller_unregister(&ctx->nbd->retry_poller);
		}

		_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
		}

		free(ctx);
		return SPDK_POLLER_BUSY;
	}

	if (ctx->nbd->retry_poller) {
		spdk_poller_unregister(&ctx->nbd->retry_poller);
	}

	nbd_start_complete(ctx);

	return SPDK_POLLER_BUSY;
}
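/*
 * Start exporting a bdev through the kernel nbd device at nbd_path: open
 * the bdev, create the socket pair, register the disk, open the nbd device
 * fd, then hand the kernel-side socket to the kernel via nbd_enable_kernel.
 * Completion is reported asynchronously through cb_fn.
 */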
void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx	*ctx = NULL;
	struct spdk_nbd_disk		*nbd = NULL;
	struct spdk_bdev		*bdev;
	int				rc;
	int				sp[2];

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	rc = spdk_bdev_open_ext(bdev_name, true, nbd_bdev_event_cb, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
		goto err;
	}

	bdev = spdk_bdev_desc_get_bdev(nbd->bdev_desc);
	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed\n");
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);
	TAILQ_INIT(&nbd->processing_io_list);

	/* Add nbd_disk to the end of the disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
		     bdev_name, nbd_path);

	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}

const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

SPDK_LOG_REGISTER_COMPONENT(nbd)