/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"

#include "spdk/queue.h"
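
/*
 * Overview: an SPDK bdev is exported as a Linux nbd block device.
 *
 * One end of an AF_UNIX socketpair (kernel_sp_fd) is handed to the kernel
 * nbd driver via NBD_SET_SOCK, and a detached pthread then blocks in the
 * NBD_DO_IT ioctl for the lifetime of the connection. The SPDK end
 * (spdk_sp_fd) is non-blocking and is serviced by an SPDK poller (or an
 * fd-based interrupt source in interrupt mode) that receives requests,
 * submits them to the bdev layer, and transmits replies.
 */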

#define GET_IO_LOOP_COUNT		16
#define NBD_START_BUSY_WAITING_MS	1000
#define NBD_STOP_BUSY_WAITING_MS	10000
#define NBD_BUSY_POLLING_INTERVAL_US	20000
#define NBD_IO_TIMEOUT_S		60

enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

struct nbd_io {
	struct spdk_nbd_disk		*nbd;
	enum nbd_io_state_t		state;

	void				*payload;
	uint32_t			payload_size;

	struct nbd_request		req;
	struct nbd_reply		resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t			offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;

	TAILQ_ENTRY(nbd_io)		tailq;
};

struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	struct spdk_interrupt	*intr;
	bool			interrupt_mode;
	uint32_t		buf_align;

	struct spdk_poller	*retry_poller;
	int			retry_count;
	/* Synchronize nbd_start_kernel pthread and nbd_stop */
	bool			has_nbd_pthread;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;
	TAILQ_HEAD(, nbd_io)	processing_io_list;

	bool			is_started;
	bool			is_closing;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk)	tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk)	disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;
static spdk_nbd_fini_cb g_fini_cb_fn;
static void *g_fini_cb_arg;

static void _nbd_fini(void *arg1);

static int nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);
static int nbd_io_recv_internal(struct spdk_nbd_disk *nbd);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

static void
_nbd_fini(void *arg1)
{
	struct spdk_nbd_disk *nbd, *nbd_tmp;

	TAILQ_FOREACH_SAFE(nbd, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (!nbd->is_closing) {
			spdk_nbd_stop(nbd);
		}
	}

	/* Check whether all nbds are closed */
	if (!TAILQ_FIRST(&g_spdk_nbd.disk_head)) {
		g_fini_cb_fn(g_fini_cb_arg);
	} else {
		spdk_thread_send_msg(spdk_get_thread(),
				     _nbd_fini, NULL);
	}
}

void
spdk_nbd_fini(spdk_nbd_fini_cb cb_fn, void *cb_arg)
{
	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_nbd_fini(NULL);
}

static int
nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	/* Make sure nbd_path is not already used in this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * An nbd disk may be stopped before it was ever registered,
	 * so check whether it is actually on the list.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * Check whether an nbd disk has already been registered with this nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *
nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *
nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Soft-disconnect the nbd device to terminate the transmission phase.
	 * After receiving this ioctl, the nbd kernel module sends an
	 * NBD_CMD_DISC request to the nbd server to inform it.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}
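
/*
 * Each nbd request is tracked by a heap-allocated nbd_io. nbd->io_count
 * counts every outstanding nbd_io on the disk, which lets nbd_cleanup_io()
 * detect when all in-flight bdev operations have completed before the disk
 * is torn down.
 */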

static struct nbd_io *
nbd_get_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been executed, and free
 * executed nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still being executed,
 *         0 if all nbd_io received so far have been freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	/* Try to read the remaining nbd commands in the socket */
	while (nbd_io_recv_internal(nbd) > 0);

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer.
	 * Wait for them to complete.
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}

static int
_nbd_stop(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	if (nbd->intr) {
		spdk_interrupt_unregister(&nbd->intr);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
		nbd->spdk_sp_fd = -1;
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
		nbd->kernel_sp_fd = -1;
	}

	/* Continue the stop procedure after the nbd_start_kernel pthread exits */
	if (nbd->has_nbd_pthread) {
		if (nbd->retry_poller == NULL) {
			nbd->retry_count = NBD_STOP_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
			nbd->retry_poller = SPDK_POLLER_REGISTER(_nbd_stop, nbd,
					    NBD_BUSY_POLLING_INTERVAL_US);
			return SPDK_POLLER_BUSY;
		}

		if (nbd->retry_count-- > 0) {
			return SPDK_POLLER_BUSY;
		}

		SPDK_ERRLOG("Failed to wait for return of the NBD_DO_IT ioctl.\n");
	}

	if (nbd->retry_poller) {
		spdk_poller_unregister(&nbd->retry_poller);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear the nbd device only if it is occupied by this SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
		nbd->ch = NULL;
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
		nbd->bdev_desc = NULL;
	}

	nbd_disk_unregister(nbd);

	free(nbd);

	return 0;
}

int
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	int rc = 0;

	if (nbd == NULL) {
		return rc;
	}

	nbd->is_closing = true;

	/* If nbd is not started yet, nbd stop will be called again later */
	if (!nbd->is_started) {
		return 1;
	}

	/*
	 * The stop action should only be taken after all nbd_io are executed.
	 */
	rc = nbd_cleanup_io(nbd);
	if (!rc) {
		_nbd_stop(nbd);
	}

	return rc;
}
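
/*
 * Non-blocking read/write on the nbd socket.
 *
 * \return the number of bytes transferred, 0 if the socket would block
 * (EAGAIN), -EIO if the peer closed the connection, or a negated errno on
 * any other failure.
 */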

static int64_t
nbd_socket_rw(int fd, void *buf, size_t length, bool read_op)
{
	ssize_t rc;

	if (read_op) {
		rc = read(fd, buf, length);
	} else {
		rc = write(fd, buf, length);
	}

	if (rc == 0) {
		return -EIO;
	} else if (rc == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return rc;
	}
}

static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io *io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));

	/* Once the executed_io_list becomes non-empty, enable the socket-writable
	 * event so the response gets transmitted in nbd_io_xmit.
	 */
	if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
	}

	TAILQ_REMOVE(&nbd->processing_io_list, io, tailq);
	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory, start to queue io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

static int
nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int io_count = 0;
	int ret = 0;

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
			ret = nbd_submit_bdev_io(nbd, io);
			if (ret < 0) {
				return ret;
			}

			io_count++;
		}
	}

	return io_count;
}
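
/*
 * Receive a single nbd request from the socket, resuming the partially
 * received one (nbd->io_in_recv) if present. A request moves through
 * NBD_IO_RECV_REQ (fixed-size header) and, for writes, NBD_IO_RECV_PAYLOAD
 * before being queued on received_io_list for submission to the bdev.
 *
 * \return the number of bytes received, or a negated errno on failure.
 */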

static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				    sizeof(io->req) - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			if (from_be32(&io->req.type) == NBD_CMD_DISC) {
				nbd->is_closing = true;
				nbd->io_in_recv = NULL;
				if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
					spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
				}
				nbd_put_io(nbd, io);
				/* After receiving NBD_CMD_DISC, nbd will not receive any new commands */
				return received;
			}

			/* IO types other than read/write carry no payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* allocate io payload */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %d\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
					TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
				} else {
					TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
					nbd_io_done(NULL, false, io);
				}
				nbd->io_in_recv = NULL;
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			} else {
				TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
				nbd_io_done(NULL, false, io);
			}
			nbd->io_in_recv = NULL;
		}
	}

	return received;
}
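
/*
 * Drain up to GET_IO_LOOP_COUNT requests from the socket per call, so that
 * a single poller iteration is not monopolized by the receive path.
 */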

static int
nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, rc, ret = 0;

	/*
	 * The nbd server should not accept requests after the closing command.
	 */
	if (nbd->is_closing) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		rc = nbd_io_recv_internal(nbd);
		if (rc < 0) {
			return rc;
		}
		ret += rc;
		if (nbd->is_closing) {
			break;
		}
	}

	return ret;
}

static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove IO from list now assuming it will be completed. It will be inserted
	 * back to the head if it cannot be completed. This approach is specifically
	 * taken to work around a scan-build use-after-free mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle are already set in nbd_io_done */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				    sizeof(io->resp) - io->offset, false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only for NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset,
				    false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}

static int
nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;
	int rc;

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		rc = nbd_io_xmit_internal(nbd);
		if (rc < 0) {
			return rc;
		}

		ret += rc;
	}

	/* Once there are no more executed IOs, disable the socket-writable event */
	if (nbd->interrupt_mode) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN);
	}

	return ret;
}

/**
 * Poll an NBD instance.
 *
 * \return 0 on success or negated errno values on error (e.g. connection closed).
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int received, sent, executed;

	/* transmit executed io first */
	sent = nbd_io_xmit(nbd);
	if (sent < 0) {
		return sent;
	}

	received = nbd_io_recv(nbd);
	if (received < 0) {
		return received;
	}

	executed = nbd_io_exec(nbd);
	if (executed < 0) {
		return executed;
	}

	return sent + received + executed;
}
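
/*
 * Poller entry point. Returns SPDK_POLLER_BUSY when any bytes were moved or
 * IOs submitted, SPDK_POLLER_IDLE otherwise. On a fatal socket error the
 * connection is torn down via _nbd_stop().
 */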

static int
nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		_nbd_stop(nbd);
		return SPDK_POLLER_IDLE;
	}
	if (nbd->is_closing) {
		spdk_nbd_stop(nbd);
	}

	return rc == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

static void *
nbd_start_kernel(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(nbd->dev_fd, NBD_DO_IT);

	nbd->has_nbd_pthread = false;

	pthread_exit(NULL);
}

static void
nbd_bdev_hot_remove(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;

	nbd->is_closing = true;
	nbd_cleanup_io(nbd);

	if (!TAILQ_EMPTY(&nbd->received_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
			TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
			TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
		}
	}
	if (!TAILQ_EMPTY(&nbd->processing_io_list)) {
		TAILQ_FOREACH_SAFE(io, &nbd->processing_io_list, tailq, io_tmp) {
			nbd_io_done(NULL, false, io);
		}
	}
}

static void
nbd_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
		  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		nbd_bdev_hot_remove(event_ctx);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
};

static void
nbd_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	struct spdk_nbd_disk *nbd = cb_arg;

	nbd->interrupt_mode = interrupt_mode;
}

static void
nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int rc;
	pthread_t tid;
	unsigned long nbd_flags = 0;

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_SET_TIMEOUT
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_TIMEOUT, NBD_IO_TIMEOUT_S);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_TIMEOUT) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#else
	SPDK_NOTICELOG("ioctl(NBD_SET_TIMEOUT) is not supported.\n");
#endif

#ifdef NBD_FLAG_SEND_FLUSH
	if (spdk_bdev_io_type_supported(ctx->nbd->bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
		nbd_flags |= NBD_FLAG_SEND_FLUSH;
	}
#endif
#ifdef NBD_FLAG_SEND_TRIM
	if (spdk_bdev_io_type_supported(ctx->nbd->bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		nbd_flags |= NBD_FLAG_SEND_TRIM;
	}
#endif

	if (nbd_flags) {
		rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, nbd_flags);
		if (rc == -1) {
			SPDK_ERRLOG("ioctl(NBD_SET_FLAGS, 0x%lx) failed: %s\n", nbd_flags, spdk_strerror(errno));
			rc = -errno;
			goto err;
		}
	}

	ctx->nbd->has_nbd_pthread = true;
	rc = pthread_create(&tid, NULL, nbd_start_kernel, ctx->nbd);
	if (rc != 0) {
		ctx->nbd->has_nbd_pthread = false;
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		ctx->nbd->intr = SPDK_INTERRUPT_REGISTER(ctx->nbd->spdk_sp_fd, nbd_poll, ctx->nbd);
	}

	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);
	spdk_poller_register_interrupt(ctx->nbd->nbd_poller, nbd_poller_set_interrupt_mode, ctx->nbd);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	/* nbd may receive a stop command while initializing */
	ctx->nbd->is_started = true;

	free(ctx);
	return;

err:
	_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}
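
/*
 * Hand the kernel end of the socketpair to the nbd driver. If the device is
 * still busy (e.g. a previous user has not fully released it), NBD_SET_SOCK
 * is retried every NBD_BUSY_POLLING_INTERVAL_US for up to
 * NBD_START_BUSY_WAITING_MS before giving up.
 */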

static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Declare device setup by this process */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);

	if (rc) {
		if (errno == EBUSY) {
			if (ctx->nbd->retry_poller == NULL) {
				ctx->nbd->retry_count = NBD_START_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
						NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			} else if (ctx->nbd->retry_count-- > 0) {
				/* Repeatedly unregister and register retry poller to avoid scan-build error */
				spdk_poller_unregister(&ctx->nbd->retry_poller);
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
						NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			}
		}

		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
		if (ctx->nbd->retry_poller) {
			spdk_poller_unregister(&ctx->nbd->retry_poller);
		}

		_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
		}

		free(ctx);
		return SPDK_POLLER_BUSY;
	}

	if (ctx->nbd->retry_poller) {
		spdk_poller_unregister(&ctx->nbd->retry_poller);
	}

	nbd_start_complete(ctx);

	return SPDK_POLLER_BUSY;
}
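
/*
 * Start exporting bdev_name at nbd_path: open the bdev, create the
 * socketpair, register the disk, open the /dev/nbd* device, then configure
 * the kernel side via nbd_enable_kernel(). cb_fn is invoked with the result
 * once setup completes or fails.
 */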

void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx	*ctx = NULL;
	struct spdk_nbd_disk		*nbd = NULL;
	struct spdk_bdev		*bdev;
	int				rc;
	int				sp[2];

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	rc = spdk_bdev_open_ext(bdev_name, true, nbd_bdev_event_cb, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
		goto err;
	}

	bdev = spdk_bdev_desc_get_bdev(nbd->bdev_desc);
	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed\n");
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);
	TAILQ_INIT(&nbd->processing_io_list);

	/* Add nbd_disk to the end of the disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
		     bdev_name, nbd_path);

	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}

const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

SPDK_LOG_REGISTER_COMPONENT(nbd)