/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"

#include "spdk/queue.h"

#define GET_IO_LOOP_COUNT		16
#define NBD_START_BUSY_WAITING_MS	1000
#define NBD_STOP_BUSY_WAITING_MS	10000
#define NBD_BUSY_POLLING_INTERVAL_US	20000
#define NBD_IO_TIMEOUT_S		60

enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

struct nbd_io {
	struct spdk_nbd_disk *nbd;
	enum nbd_io_state_t state;

	void *payload;
	uint32_t payload_size;

	struct nbd_request req;
	struct nbd_reply resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io) tailq;
};
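
/*
 * For reference, the wire structures driven by the state machine above are
 * the classic NBD request/reply headers from <linux/nbd.h>. A rough sketch
 * of their layout (assuming the simple, non-structured reply protocol; all
 * integer fields are big-endian on the wire):
 *
 *	struct nbd_request {		// 28 bytes, packed
 *		__be32 magic;		// NBD_REQUEST_MAGIC
 *		__be32 type;		// NBD_CMD_READ/WRITE/DISC/FLUSH/TRIM
 *		char   handle[8];	// opaque cookie, echoed back in the reply
 *		__be64 from;		// byte offset on the device
 *		__be32 len;		// payload length in bytes
 *	};
 *
 *	struct nbd_reply {		// 16 bytes
 *		__be32 magic;		// NBD_REPLY_MAGIC
 *		__be32 error;		// 0 on success, errno value otherwise
 *		char   handle[8];	// copied from the matching request
 *	};
 */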

struct spdk_nbd_disk {
	struct spdk_bdev *bdev;
	struct spdk_bdev_desc *bdev_desc;
	struct spdk_io_channel *ch;
	int dev_fd;
	char *nbd_path;
	int kernel_sp_fd;
	int spdk_sp_fd;
	struct spdk_poller *nbd_poller;
	struct spdk_interrupt *intr;
	bool interrupt_mode;
	uint32_t buf_align;

	struct spdk_poller *retry_poller;
	int retry_count;
	/* Synchronize nbd_start_kernel pthread and nbd_stop */
	bool has_nbd_pthread;

	struct nbd_io *io_in_recv;
	TAILQ_HEAD(, nbd_io) received_io_list;
	TAILQ_HEAD(, nbd_io) executed_io_list;

	bool is_started;
	bool is_closing;
	/* count of nbd_io in spdk_nbd_disk */
	int io_count;

	TAILQ_ENTRY(spdk_nbd_disk) tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk) disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;
static spdk_nbd_fini_cb g_fini_cb_fn;
static void *g_fini_cb_arg;

static void _nbd_fini(void *arg1);

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);
static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

static void
_nbd_fini(void *arg1)
{
	struct spdk_nbd_disk *nbd, *nbd_tmp;

	TAILQ_FOREACH_SAFE(nbd, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (!nbd->is_closing) {
			spdk_nbd_stop(nbd);
		}
	}

	/* Check whether all nbds are closed */
	if (!TAILQ_FIRST(&g_spdk_nbd.disk_head)) {
		g_fini_cb_fn(g_fini_cb_arg);
	} else {
		spdk_thread_send_msg(spdk_get_thread(),
				     _nbd_fini, NULL);
	}
}

void
spdk_nbd_fini(spdk_nbd_fini_cb cb_fn, void *cb_arg)
{
	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_nbd_fini(NULL);
}

static int
nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	/* Make sure nbd_path is not already used in this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it is registered,
	 * so check whether it was actually registered.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * Check whether an nbd device has already been registered
	 * with this nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}
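
/*
 * For illustration, the config JSON emitted above for one exported disk
 * would look roughly like the following (the device and bdev names here
 * are hypothetical). Each entry replays the nbd_start_disk RPC when the
 * configuration is loaded at startup:
 *
 *	[
 *	  {
 *	    "method": "nbd_start_disk",
 *	    "params": {
 *	      "nbd_device": "/dev/nbd0",
 *	      "bdev_name": "Malloc0"
 *	    }
 *	  }
 *	]
 */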

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Issue an nbd soft disconnect to terminate the transmission phase.
	 * After receiving this ioctl, the nbd kernel module will send an
	 * NBD_CMD_DISC request to the nbd server to inform it of the
	 * disconnect.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}

static struct nbd_io *
nbd_get_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been executed, and put back
 * executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still being executed,
 *         0 if all acquired nbd_io have been freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	/* Try to read the remaining nbd commands in the socket */
	while (nbd_io_recv_internal(nbd) > 0);

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer;
	 * wait for them to complete.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}
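
/*
 * Teardown summary (what _nbd_stop() below does, in order): unregister the
 * poller and interrupt, close both ends of the socketpair, wait for the
 * NBD_DO_IT pthread to exit (busy-polling via a retry poller), clear the
 * kernel queue and socket (NBD_CLEAR_QUE/NBD_CLEAR_SOCK) if this app still
 * owns the device, close the device fd, release the io channel and bdev
 * descriptor, then unregister and free the disk.
 */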

static int
_nbd_stop(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	if (nbd->intr) {
		spdk_interrupt_unregister(&nbd->intr);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
		nbd->spdk_sp_fd = -1;
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
		nbd->kernel_sp_fd = -1;
	}

	/* Continue the stop procedure only after the nbd_start_kernel pthread has exited */
	if (nbd->has_nbd_pthread) {
		if (nbd->retry_poller == NULL) {
			nbd->retry_count = NBD_STOP_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
			nbd->retry_poller = SPDK_POLLER_REGISTER(_nbd_stop, nbd,
					    NBD_BUSY_POLLING_INTERVAL_US);
			return SPDK_POLLER_BUSY;
		}

		if (nbd->retry_count-- > 0) {
			return SPDK_POLLER_BUSY;
		}

		SPDK_ERRLOG("Timed out waiting for the NBD_DO_IT ioctl to return.\n");
	}

	if (nbd->retry_poller) {
		spdk_poller_unregister(&nbd->retry_poller);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear the nbd device only if it is occupied by this SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
		nbd->ch = NULL;
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
		nbd->bdev_desc = NULL;
	}

	nbd_disk_unregister(nbd);

	free(nbd);

	return 0;
}

int
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	int rc = 0;

	if (nbd == NULL) {
		return rc;
	}

	nbd->is_closing = true;

	/* If nbd has not started yet, nbd stop will be called again later */
	if (!nbd->is_started) {
		return 1;
	}

	/*
	 * The stop action must run only after all nbd_io have been executed.
	 */

	rc = nbd_cleanup_io(nbd);
	if (!rc) {
		_nbd_stop(nbd);
	}

	return rc;
}
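
/*
 * Perform a single non-blocking read() or write() on the given fd.
 *
 * \return >0 the number of bytes transferred,
 *         0 if the socket would block (EAGAIN),
 *         -EIO if the peer closed the connection,
 *         other negative values are the negated errno from the syscall.
 */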
static int64_t
nbd_socket_rw(int fd, void *buf, size_t length, bool read_op)
{
	ssize_t rc;

	if (read_op) {
		rc = read(fd, buf, length);
	} else {
		rc = write(fd, buf, length);
	}

	if (rc == 0) {
		return -EIO;
	} else if (rc == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return rc;
	}
}

static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io *io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));

	/* When the executed_io_list becomes non-empty, enable the socket writable
	 * notice so that the io gets processed in nbd_io_xmit()
	 */
	if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
	}

	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	case NBD_CMD_DISC:
		nbd->is_closing = true;
		rc = spdk_bdev_abort(desc, ch, io, nbd_io_done, io);

		/* when the executed_io_list becomes non-empty, enable the socket writable notice */
		if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
			spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
		}

		break;
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory, start to queue io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

static int
nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int io_count = 0;
	int ret = 0;

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			ret = nbd_submit_bdev_io(nbd, io);
			if (ret < 0) {
				return ret;
			}

			io_count++;
		}
	}

	return io_count;
}

static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				    sizeof(io->req) - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			/* io types other than read/write carry no payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* allocate io payload */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
					TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
				} else {
					nbd_io_done(NULL, false, io);
				}
				nbd->io_in_recv = NULL;
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			} else {
				nbd_io_done(NULL, false, io);
			}
			nbd->io_in_recv = NULL;
		}

	}

	return received;
}

static int
nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, rc, ret = 0;

	/*
	 * The nbd server should not accept new requests after the closing
	 * command has been received.
	 */
	if (nbd->is_closing) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		rc = nbd_io_recv_internal(nbd);
		if (rc < 0) {
			return rc;
		}
		ret += rc;
	}

	return ret;
}
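
/*
 * Transmit a single executed io back to the kernel: first the reply header
 * (struct nbd_reply), then, for a successful NBD_CMD_READ only, the read
 * payload. A partial socket write is resumed via io->offset on a later call.
 */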
static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from list now assuming it will be completed. It will be inserted
	 * back to the head if it cannot be completed. This approach is specifically
	 * taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle are already set in nbd_io_done() */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				    sizeof(io->resp) - io->offset, false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only when NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset,
				    false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}

static int
nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;
	int rc;

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		rc = nbd_io_xmit_internal(nbd);
		if (rc < 0) {
			return rc;
		}

		ret += rc;
	}

	/* Once there are no more executed_io, disable the socket writable notice */
	if (nbd->interrupt_mode) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN);
	}

	return ret;
}

/**
 * Poll an NBD instance.
 *
 * \return the number of events handled on success, or a negated errno value
 * on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int received, sent, executed;

	/* transmit executed io first */
	sent = nbd_io_xmit(nbd);
	if (sent < 0) {
		return sent;
	}

	received = nbd_io_recv(nbd);
	if (received < 0) {
		return received;
	}

	executed = nbd_io_exec(nbd);
	if (executed < 0) {
		return executed;
	}

	return sent + received + executed;
}

static int
nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		_nbd_stop(nbd);
		return SPDK_POLLER_IDLE;
	}
	if (nbd->is_closing) {
		spdk_nbd_stop(nbd);
	}

	return SPDK_POLLER_BUSY;
}

static void *
nbd_start_kernel(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(nbd->dev_fd, NBD_DO_IT);

	nbd->has_nbd_pthread = false;

	pthread_exit(NULL);
}

static void
nbd_bdev_hot_remove(struct spdk_nbd_disk *nbd)
{
	spdk_nbd_stop(nbd);
}

static void
nbd_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
		  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		nbd_bdev_hot_remove(event_ctx);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk *nbd;
	spdk_nbd_start_cb cb_fn;
	void *cb_arg;
};

static void
nbd_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	struct spdk_nbd_disk *nbd = cb_arg;

	nbd->interrupt_mode = interrupt_mode;
}

static void
nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int rc;
	pthread_t tid;
	unsigned long nbd_flags = 0;

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_SET_TIMEOUT
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_TIMEOUT, NBD_IO_TIMEOUT_S);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_TIMEOUT) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#else
	SPDK_NOTICELOG("ioctl(NBD_SET_TIMEOUT) is not supported.\n");
#endif

#ifdef NBD_FLAG_SEND_FLUSH
	nbd_flags |= NBD_FLAG_SEND_FLUSH;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	nbd_flags |= NBD_FLAG_SEND_TRIM;
#endif
	if (nbd_flags) {
		rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, nbd_flags);
		if (rc == -1) {
			SPDK_ERRLOG("ioctl(NBD_SET_FLAGS, 0x%lx) failed: %s\n", nbd_flags, spdk_strerror(errno));
			rc = -errno;
			goto err;
		}
	}

	ctx->nbd->has_nbd_pthread = true;
	rc = pthread_create(&tid, NULL, nbd_start_kernel, ctx->nbd);
	if (rc != 0) {
		ctx->nbd->has_nbd_pthread = false;
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		ctx->nbd->intr = SPDK_INTERRUPT_REGISTER(ctx->nbd->spdk_sp_fd, nbd_poll, ctx->nbd);
	}

	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);
	spdk_poller_register_interrupt(ctx->nbd->nbd_poller, nbd_poller_set_interrupt_mode, ctx->nbd);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	/* nbd may have received a stop command while it was initializing */
	ctx->nbd->is_started = true;

	free(ctx);
	return;

err:
	_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}

static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Declare that this process is setting up the device */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);

	if (rc) {
		if (errno == EBUSY) {
			if (ctx->nbd->retry_poller == NULL) {
				ctx->nbd->retry_count = NBD_START_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
						NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			} else if (ctx->nbd->retry_count-- > 0) {
				/* Repeatedly unregister and register the retry poller to avoid a scan-build error */
				spdk_poller_unregister(&ctx->nbd->retry_poller);
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
						NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			}
		}

		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
		if (ctx->nbd->retry_poller) {
			spdk_poller_unregister(&ctx->nbd->retry_poller);
		}

		_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
		}

		free(ctx);
		return SPDK_POLLER_BUSY;
	}

	if (ctx->nbd->retry_poller) {
		spdk_poller_unregister(&ctx->nbd->retry_poller);
	}

	nbd_start_complete(ctx);

	return SPDK_POLLER_BUSY;
}

void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx *ctx = NULL;
	struct spdk_nbd_disk *nbd = NULL;
	struct spdk_bdev *bdev;
	int rc;
	int sp[2];

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	rc = spdk_bdev_open_ext(bdev_name, true, nbd_bdev_event_cb, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
		goto err;
	}

	bdev = spdk_bdev_desc_get_bdev(nbd->bdev_desc);
	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed\n");
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);

	/* Add nbd_disk to the end of disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
		     bdev_name, nbd_path);

	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}

const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

SPDK_LOG_REGISTER_COMPONENT(nbd)
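
/*
 * Usage sketch (not part of this file): from an SPDK app thread, exporting
 * a bdev as an nbd device and later tearing it down might look like the
 * following. The bdev name "Malloc0", the device path, and the callback are
 * hypothetical; the callback signature matches spdk_nbd_start_cb as invoked
 * by nbd_start_complete() above.
 *
 *	static void
 *	start_cb(void *cb_arg, struct spdk_nbd_disk *nbd, int rc)
 *	{
 *		if (rc != 0) {
 *			SPDK_ERRLOG("nbd start failed: %s\n", spdk_strerror(-rc));
 *			return;
 *		}
 *		// Save the nbd handle; pass it to spdk_nbd_stop() at shutdown.
 *	}
 *
 *	spdk_nbd_start("Malloc0", "/dev/nbd0", start_cb, NULL);
 */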