/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2017 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"
#include "spdk/string.h"

#include <linux/nbd.h>

#include "spdk/nbd.h"
#include "nbd_internal.h"
#include "spdk/bdev.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/likely.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"

#include "spdk/queue.h"

#define GET_IO_LOOP_COUNT		16
#define NBD_START_BUSY_WAITING_MS	1000
#define NBD_STOP_BUSY_WAITING_MS	10000
#define NBD_BUSY_POLLING_INTERVAL_US	20000
#define NBD_IO_TIMEOUT_S		60

enum nbd_io_state_t {
	/* Receiving or ready to receive nbd request header */
	NBD_IO_RECV_REQ = 0,
	/* Receiving write payload */
	NBD_IO_RECV_PAYLOAD,
	/* Transmitting or ready to transmit nbd response header */
	NBD_IO_XMIT_RESP,
	/* Transmitting read payload */
	NBD_IO_XMIT_PAYLOAD,
};

struct nbd_io {
	struct spdk_nbd_disk	*nbd;
	enum nbd_io_state_t	state;

	void			*payload;
	uint32_t		payload_size;

	struct nbd_request	req;
	struct nbd_reply	resp;

	/*
	 * Tracks current progress on reading/writing a request,
	 * response, or payload from the nbd socket.
	 */
	uint32_t		offset;

	/* for bdev io_wait */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	TAILQ_ENTRY(nbd_io)	tailq;
};

struct spdk_nbd_disk {
	struct spdk_bdev	*bdev;
	struct spdk_bdev_desc	*bdev_desc;
	struct spdk_io_channel	*ch;
	int			dev_fd;
	char			*nbd_path;
	int			kernel_sp_fd;
	int			spdk_sp_fd;
	struct spdk_poller	*nbd_poller;
	struct spdk_interrupt	*intr;
	bool			interrupt_mode;
	uint32_t		buf_align;

	struct spdk_poller	*retry_poller;
	int			retry_count;
	/* Synchronize nbd_start_kernel pthread and nbd_stop */
	bool			has_nbd_pthread;

	struct nbd_io		*io_in_recv;
	TAILQ_HEAD(, nbd_io)	received_io_list;
	TAILQ_HEAD(, nbd_io)	executed_io_list;
	TAILQ_HEAD(, nbd_io)	processing_io_list;

	bool			is_started;
	bool			is_closing;
	/* count of nbd_io in spdk_nbd_disk */
	int			io_count;

	TAILQ_ENTRY(spdk_nbd_disk) tailq;
};

struct spdk_nbd_disk_globals {
	TAILQ_HEAD(, spdk_nbd_disk) disk_head;
};

static struct spdk_nbd_disk_globals g_spdk_nbd;
static spdk_nbd_fini_cb g_fini_cb_fn;
static void *g_fini_cb_arg;

static void _nbd_fini(void *arg1);

static int nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io);
static int nbd_io_recv_internal(struct spdk_nbd_disk *nbd);

int
spdk_nbd_init(void)
{
	TAILQ_INIT(&g_spdk_nbd.disk_head);

	return 0;
}

static void
_nbd_fini(void *arg1)
{
	struct spdk_nbd_disk *nbd, *nbd_tmp;

	TAILQ_FOREACH_SAFE(nbd, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (!nbd->is_closing) {
			spdk_nbd_stop(nbd);
		}
	}

	/* Check if all nbds closed */
	if (!TAILQ_FIRST(&g_spdk_nbd.disk_head)) {
		g_fini_cb_fn(g_fini_cb_arg);
	} else {
		spdk_thread_send_msg(spdk_get_thread(),
				     _nbd_fini, NULL);
	}
}

void
spdk_nbd_fini(spdk_nbd_fini_cb cb_fn, void *cb_arg)
{
	g_fini_cb_fn = cb_fn;
	g_fini_cb_arg = cb_arg;

	_nbd_fini(NULL);
}
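
/*
 * Add the nbd disk to the global export list. Fails with -EBUSY if the
 * same nbd_path is already exported by this SPDK app.
 */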
static int
nbd_disk_register(struct spdk_nbd_disk *nbd)
{
	/* Make sure nbd_path is not used in this SPDK app */
	if (nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
		SPDK_NOTICELOG("%s is already exported\n", nbd->nbd_path);
		return -EBUSY;
	}

	TAILQ_INSERT_TAIL(&g_spdk_nbd.disk_head, nbd, tailq);

	return 0;
}

static void
nbd_disk_unregister(struct spdk_nbd_disk *nbd)
{
	struct spdk_nbd_disk *nbd_idx, *nbd_tmp;

	/*
	 * The nbd disk may be stopped before it was registered;
	 * check whether it is actually on the list before removing.
	 */
	TAILQ_FOREACH_SAFE(nbd_idx, &g_spdk_nbd.disk_head, tailq, nbd_tmp) {
		if (nbd == nbd_idx) {
			TAILQ_REMOVE(&g_spdk_nbd.disk_head, nbd_idx, tailq);
			break;
		}
	}
}

struct spdk_nbd_disk *
nbd_disk_find_by_nbd_path(const char *nbd_path)
{
	struct spdk_nbd_disk *nbd;

	/*
	 * Check whether an nbd disk has already been registered with this nbd path.
	 */
	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		if (!strcmp(nbd->nbd_path, nbd_path)) {
			return nbd;
		}
	}

	return NULL;
}

struct spdk_nbd_disk *
nbd_disk_first(void)
{
	return TAILQ_FIRST(&g_spdk_nbd.disk_head);
}

struct spdk_nbd_disk *
nbd_disk_next(struct spdk_nbd_disk *prev)
{
	return TAILQ_NEXT(prev, tailq);
}

const char *
nbd_disk_get_nbd_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

const char *
nbd_disk_get_bdev_name(struct spdk_nbd_disk *nbd)
{
	return spdk_bdev_get_name(nbd->bdev);
}

void
spdk_nbd_write_config_json(struct spdk_json_write_ctx *w)
{
	struct spdk_nbd_disk *nbd;

	spdk_json_write_array_begin(w);

	TAILQ_FOREACH(nbd, &g_spdk_nbd.disk_head, tailq) {
		spdk_json_write_object_begin(w);

		spdk_json_write_named_string(w, "method", "nbd_start_disk");

		spdk_json_write_named_object_begin(w, "params");
		spdk_json_write_named_string(w, "nbd_device", nbd_disk_get_nbd_path(nbd));
		spdk_json_write_named_string(w, "bdev_name", nbd_disk_get_bdev_name(nbd));
		spdk_json_write_object_end(w);

		spdk_json_write_object_end(w);
	}

	spdk_json_write_array_end(w);
}

void
nbd_disconnect(struct spdk_nbd_disk *nbd)
{
	/*
	 * Soft-disconnect to terminate the transmission phase.
	 * On receiving this ioctl, the nbd kernel module sends an
	 * NBD_CMD_DISC request to the nbd server to inform it.
	 */
	ioctl(nbd->dev_fd, NBD_DISCONNECT);
}

static struct nbd_io *
nbd_get_io(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;

	io = calloc(1, sizeof(*io));
	if (!io) {
		return NULL;
	}

	io->nbd = nbd;
	to_be32(&io->resp.magic, NBD_REPLY_MAGIC);

	nbd->io_count++;

	return io;
}

static void
nbd_put_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	if (io->payload) {
		spdk_free(io->payload);
	}
	free(io);

	nbd->io_count--;
}

/*
 * Check whether all received nbd_io have been executed, and free executed
 * nbd_io instead of transmitting them.
 *
 * \return 1 if some nbd_io are still executing in the bdev layer.
 *         0 if all allocated nbd_io have been freed.
 */
static int
nbd_cleanup_io(struct spdk_nbd_disk *nbd)
{
	/* Try to read the remaining nbd commands in the socket */
	while (nbd_io_recv_internal(nbd) > 0);

	/* free io_in_recv */
	if (nbd->io_in_recv != NULL) {
		nbd_put_io(nbd, nbd->io_in_recv);
		nbd->io_in_recv = NULL;
	}

	/*
	 * Some nbd_io may still be executing in the bdev layer.
	 * Wait for their completion.
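	 * The poller re-checks io_count on each iteration and finishes
	 * the stop once it reaches zero.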
	 */
	if (nbd->io_count != 0) {
		return 1;
	}

	return 0;
}

static int
_nbd_stop(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	if (nbd->nbd_poller) {
		spdk_poller_unregister(&nbd->nbd_poller);
	}

	if (nbd->intr) {
		spdk_interrupt_unregister(&nbd->intr);
	}

	if (nbd->spdk_sp_fd >= 0) {
		close(nbd->spdk_sp_fd);
		nbd->spdk_sp_fd = -1;
	}

	if (nbd->kernel_sp_fd >= 0) {
		close(nbd->kernel_sp_fd);
		nbd->kernel_sp_fd = -1;
	}

	/* Continue the stop procedure only after the nbd_start_kernel pthread exits */
	if (nbd->has_nbd_pthread) {
		if (nbd->retry_poller == NULL) {
			nbd->retry_count = NBD_STOP_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
			nbd->retry_poller = SPDK_POLLER_REGISTER(_nbd_stop, nbd,
					    NBD_BUSY_POLLING_INTERVAL_US);
			return SPDK_POLLER_BUSY;
		}

		if (nbd->retry_count-- > 0) {
			return SPDK_POLLER_BUSY;
		}

		SPDK_ERRLOG("Timed out waiting for the NBD_DO_IT ioctl to return.\n");
	}

	if (nbd->retry_poller) {
		spdk_poller_unregister(&nbd->retry_poller);
	}

	if (nbd->dev_fd >= 0) {
		/* Clear the nbd device only if it is occupied by this SPDK app */
		if (nbd->nbd_path && nbd_disk_find_by_nbd_path(nbd->nbd_path)) {
			ioctl(nbd->dev_fd, NBD_CLEAR_QUE);
			ioctl(nbd->dev_fd, NBD_CLEAR_SOCK);
		}
		close(nbd->dev_fd);
	}

	if (nbd->nbd_path) {
		free(nbd->nbd_path);
	}

	if (nbd->ch) {
		spdk_put_io_channel(nbd->ch);
		nbd->ch = NULL;
	}

	if (nbd->bdev_desc) {
		spdk_bdev_close(nbd->bdev_desc);
		nbd->bdev_desc = NULL;
	}

	nbd_disk_unregister(nbd);

	free(nbd);

	return 0;
}

int
spdk_nbd_stop(struct spdk_nbd_disk *nbd)
{
	int rc = 0;

	if (nbd == NULL) {
		return rc;
	}

	nbd->is_closing = true;

	/* If nbd has not started yet, the stop will be retried later */
	if (!nbd->is_started) {
		return 1;
	}

	/*
	 * The stop action should be taken only after all nbd_io are executed.
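	 * Otherwise a pending bdev completion could reference memory freed
	 * by _nbd_stop().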
	 */

	rc = nbd_cleanup_io(nbd);
	if (!rc) {
		_nbd_stop(nbd);
	}

	return rc;
}

static int64_t
nbd_socket_rw(int fd, void *buf, size_t length, bool read_op)
{
	ssize_t rc;

	if (read_op) {
		rc = read(fd, buf, length);
	} else {
		rc = write(fd, buf, length);
	}

	if (rc == 0) {
		return -EIO;
	} else if (rc == -1) {
		if (errno != EAGAIN) {
			return -errno;
		}
		return 0;
	} else {
		return rc;
	}
}

static void
nbd_io_done(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct nbd_io *io = cb_arg;
	struct spdk_nbd_disk *nbd = io->nbd;

	if (success) {
		io->resp.error = 0;
	} else {
		to_be32(&io->resp.error, EIO);
	}

	memcpy(&io->resp.handle, &io->req.handle, sizeof(io->resp.handle));

	/* When the executed_io_list transitions from empty to non-empty, enable
	 * socket-writable notifications so the io gets transmitted in nbd_io_xmit()
	 */
	if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
	}

	TAILQ_REMOVE(&nbd->processing_io_list, io, tailq);
	TAILQ_INSERT_TAIL(&nbd->executed_io_list, io, tailq);

	if (bdev_io != NULL) {
		spdk_bdev_free_io(bdev_io);
	}
}

static void
nbd_resubmit_io(void *arg)
{
	struct nbd_io *io = (struct nbd_io *)arg;
	struct spdk_nbd_disk *nbd = io->nbd;
	int rc = 0;

	rc = nbd_submit_bdev_io(nbd, io);
	if (rc) {
		SPDK_INFOLOG(nbd, "nbd: io resubmit for dev %s, io_type %d, returned %d.\n",
			     nbd_disk_get_bdev_name(nbd), from_be32(&io->req.type), rc);
	}
}

static void
nbd_queue_io(struct nbd_io *io)
{
	int rc;
	struct spdk_bdev *bdev = io->nbd->bdev;

	io->bdev_io_wait.bdev = bdev;
	io->bdev_io_wait.cb_fn = nbd_resubmit_io;
	io->bdev_io_wait.cb_arg = io;

	rc = spdk_bdev_queue_io_wait(bdev, io->nbd->ch, &io->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in nbd_queue_io, rc=%d.\n", rc);
		nbd_io_done(NULL, false, io);
	}
}

static int
nbd_submit_bdev_io(struct spdk_nbd_disk *nbd, struct nbd_io *io)
{
	struct spdk_bdev_desc *desc = nbd->bdev_desc;
	struct spdk_io_channel *ch = nbd->ch;
	int rc = 0;

	switch (from_be32(&io->req.type)) {
	case NBD_CMD_READ:
		rc = spdk_bdev_read(desc, ch, io->payload, from_be64(&io->req.from),
				    io->payload_size, nbd_io_done, io);
		break;
	case NBD_CMD_WRITE:
		rc = spdk_bdev_write(desc, ch, io->payload, from_be64(&io->req.from),
				     io->payload_size, nbd_io_done, io);
		break;
#ifdef NBD_FLAG_SEND_FLUSH
	case NBD_CMD_FLUSH:
		rc = spdk_bdev_flush(desc, ch, 0,
				     spdk_bdev_get_num_blocks(nbd->bdev) * spdk_bdev_get_block_size(nbd->bdev),
				     nbd_io_done, io);
		break;
#endif
#ifdef NBD_FLAG_SEND_TRIM
	case NBD_CMD_TRIM:
		rc = spdk_bdev_unmap(desc, ch, from_be64(&io->req.from),
				     from_be32(&io->req.len), nbd_io_done, io);
		break;
#endif
	default:
		rc = -1;
	}

	if (rc < 0) {
		if (rc == -ENOMEM) {
			SPDK_INFOLOG(nbd, "No memory, start to queue io.\n");
			nbd_queue_io(io);
		} else {
			SPDK_ERRLOG("nbd io failed in nbd_submit_bdev_io, rc=%d.\n", rc);
			nbd_io_done(NULL, false, io);
		}
	}

	return 0;
}

static int
nbd_io_exec(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;
	int io_count = 0;
	int ret = 0;
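
	/*
	 * Move each io to processing_io_list before submission so that a
	 * completion (even an inline failure in nbd_submit_bdev_io()) can
	 * unlink it safely in nbd_io_done().
	 */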
	TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
		TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
		TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
		ret = nbd_submit_bdev_io(nbd, io);
		if (ret < 0) {
			return ret;
		}

		io_count++;
	}

	return io_count;
}

static int
nbd_io_recv_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int received = 0;

	if (nbd->io_in_recv == NULL) {
		nbd->io_in_recv = nbd_get_io(nbd);
		if (!nbd->io_in_recv) {
			return -ENOMEM;
		}
	}

	io = nbd->io_in_recv;

	if (io->state == NBD_IO_RECV_REQ) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->req + io->offset,
				    sizeof(io->req) - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received = ret;

		/* request is fully received */
		if (io->offset == sizeof(io->req)) {
			io->offset = 0;

			/* req magic check */
			if (from_be32(&io->req.magic) != NBD_REQUEST_MAGIC) {
				SPDK_ERRLOG("invalid request magic\n");
				nbd_put_io(nbd, io);
				nbd->io_in_recv = NULL;
				return -EINVAL;
			}

			if (from_be32(&io->req.type) == NBD_CMD_DISC) {
				nbd->is_closing = true;
				nbd->io_in_recv = NULL;
				if (nbd->interrupt_mode && TAILQ_EMPTY(&nbd->executed_io_list)) {
					spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN | SPDK_INTERRUPT_EVENT_OUT);
				}
				nbd_put_io(nbd, io);
				/* After receiving NBD_CMD_DISC, nbd will not receive any new commands */
				return received;
			}

			/* Commands other than read/write carry no payload */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE ||
			    from_be32(&io->req.type) == NBD_CMD_READ) {
				io->payload_size = from_be32(&io->req.len);
			} else {
				io->payload_size = 0;
			}

			/* io payload allocate */
			if (io->payload_size) {
				io->payload = spdk_malloc(io->payload_size, nbd->buf_align, NULL,
							  SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
				if (io->payload == NULL) {
					SPDK_ERRLOG("could not allocate io->payload of size %u\n", io->payload_size);
					nbd_put_io(nbd, io);
					nbd->io_in_recv = NULL;
					return -ENOMEM;
				}
			} else {
				io->payload = NULL;
			}

			/* next io step */
			if (from_be32(&io->req.type) == NBD_CMD_WRITE) {
				io->state = NBD_IO_RECV_PAYLOAD;
			} else {
				io->state = NBD_IO_XMIT_RESP;
				if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
					TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
				} else {
					TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
					nbd_io_done(NULL, false, io);
				}
				nbd->io_in_recv = NULL;
			}
		}
	}

	if (io->state == NBD_IO_RECV_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset, true);
		if (ret < 0) {
			nbd_put_io(nbd, io);
			nbd->io_in_recv = NULL;
			return ret;
		}

		io->offset += ret;
		received += ret;

		/* request payload is fully received */
		if (io->offset == io->payload_size) {
			io->offset = 0;
			io->state = NBD_IO_XMIT_RESP;
			if (spdk_likely((!nbd->is_closing) && nbd->is_started)) {
				TAILQ_INSERT_TAIL(&nbd->received_io_list, io, tailq);
			} else {
				TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
				nbd_io_done(NULL, false, io);
			}
			nbd->io_in_recv = NULL;
		}
	}

	return received;
}
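
/*
 * Read up to GET_IO_LOOP_COUNT requests from the socket in one pass, or
 * fewer if a disconnect request arrives.
 *
 * \return number of bytes received, or negated errno on failure.
 */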
static int
nbd_io_recv(struct spdk_nbd_disk *nbd)
{
	int i, rc, ret = 0;

	/*
	 * The nbd server should not accept new requests after receiving
	 * a disconnect command.
	 */
	if (nbd->is_closing) {
		return 0;
	}

	for (i = 0; i < GET_IO_LOOP_COUNT; i++) {
		rc = nbd_io_recv_internal(nbd);
		if (rc < 0) {
			return rc;
		}
		ret += rc;
		if (nbd->is_closing) {
			break;
		}
	}

	return ret;
}

static int
nbd_io_xmit_internal(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io;
	int ret = 0;
	int sent = 0;

	io = TAILQ_FIRST(&nbd->executed_io_list);
	if (io == NULL) {
		return 0;
	}

	/* Remove the IO from the list now, assuming it will be completed. It will be
	 * inserted back at the head if it cannot be completed. This approach is
	 * specifically taken to work around a scan-build use-after-free
	 * mischaracterization.
	 */
	TAILQ_REMOVE(&nbd->executed_io_list, io, tailq);

	/* resp error and handle are already set in nbd_io_done() */

	if (io->state == NBD_IO_XMIT_RESP) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, (char *)&io->resp + io->offset,
				    sizeof(io->resp) - io->offset, false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent = ret;

		/* response is fully transmitted */
		if (io->offset == sizeof(io->resp)) {
			io->offset = 0;

			/* transmit payload only when NBD_CMD_READ with no resp error */
			if (from_be32(&io->req.type) != NBD_CMD_READ || io->resp.error != 0) {
				nbd_put_io(nbd, io);
				return 0;
			} else {
				io->state = NBD_IO_XMIT_PAYLOAD;
			}
		}
	}

	if (io->state == NBD_IO_XMIT_PAYLOAD) {
		ret = nbd_socket_rw(nbd->spdk_sp_fd, io->payload + io->offset, io->payload_size - io->offset,
				    false);
		if (ret <= 0) {
			goto reinsert;
		}

		io->offset += ret;
		sent += ret;

		/* read payload is fully transmitted */
		if (io->offset == io->payload_size) {
			nbd_put_io(nbd, io);
			return sent;
		}
	}

reinsert:
	TAILQ_INSERT_HEAD(&nbd->executed_io_list, io, tailq);
	return ret < 0 ? ret : sent;
}

static int
nbd_io_xmit(struct spdk_nbd_disk *nbd)
{
	int ret = 0;
	int rc;

	while (!TAILQ_EMPTY(&nbd->executed_io_list)) {
		rc = nbd_io_xmit_internal(nbd);
		if (rc < 0) {
			return rc;
		}

		ret += rc;
	}

	/* Once the executed_io_list is drained, disable socket-writable notifications */
	if (nbd->interrupt_mode) {
		spdk_interrupt_set_event_types(nbd->intr, SPDK_INTERRUPT_EVENT_IN);
	}

	return ret;
}

/**
 * Poll an NBD instance.
 *
 * \return the number of events processed on success (0 if idle), or negated
 * errno values on error (e.g. connection closed).
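 *
 * Each poll transmits executed io first, then receives new requests, and
 * finally submits the received io to the bdev layer.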
 */
static int
_nbd_poll(struct spdk_nbd_disk *nbd)
{
	int received, sent, executed;

	/* transmit executed io first */
	sent = nbd_io_xmit(nbd);
	if (sent < 0) {
		return sent;
	}

	received = nbd_io_recv(nbd);
	if (received < 0) {
		return received;
	}

	executed = nbd_io_exec(nbd);
	if (executed < 0) {
		return executed;
	}

	return sent + received + executed;
}

static int
nbd_poll(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;
	int rc;

	rc = _nbd_poll(nbd);
	if (rc < 0) {
		SPDK_INFOLOG(nbd, "nbd_poll() returned %s (%d); closing connection\n",
			     spdk_strerror(-rc), rc);
		_nbd_stop(nbd);
		return SPDK_POLLER_IDLE;
	}
	if (nbd->is_closing && nbd->io_count == 0) {
		spdk_nbd_stop(nbd);
	}

	return rc == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

static void *
nbd_start_kernel(void *arg)
{
	struct spdk_nbd_disk *nbd = arg;

	spdk_unaffinitize_thread();

	/* This will block in the kernel until we close the spdk_sp_fd. */
	ioctl(nbd->dev_fd, NBD_DO_IT);

	nbd->has_nbd_pthread = false;

	pthread_exit(NULL);
}

static void
nbd_bdev_hot_remove(struct spdk_nbd_disk *nbd)
{
	struct nbd_io *io, *io_tmp;

	nbd->is_closing = true;
	nbd_cleanup_io(nbd);

	TAILQ_FOREACH_SAFE(io, &nbd->received_io_list, tailq, io_tmp) {
		TAILQ_REMOVE(&nbd->received_io_list, io, tailq);
		/* nbd_io_done() unconditionally unlinks from processing_io_list,
		 * so move the io there first, matching nbd_io_recv_internal().
		 */
		TAILQ_INSERT_TAIL(&nbd->processing_io_list, io, tailq);
		nbd_io_done(NULL, false, io);
	}
}

static void
nbd_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
		  void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		nbd_bdev_hot_remove(event_ctx);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

struct spdk_nbd_start_ctx {
	struct spdk_nbd_disk	*nbd;
	spdk_nbd_start_cb	cb_fn;
	void			*cb_arg;
};

static void
nbd_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg, bool interrupt_mode)
{
	struct spdk_nbd_disk *nbd = cb_arg;

	nbd->interrupt_mode = interrupt_mode;
}

static void
nbd_start_complete(struct spdk_nbd_start_ctx *ctx)
{
	int rc;
	pthread_t tid;
	unsigned long nbd_flags = 0;

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_BLKSIZE, spdk_bdev_get_block_size(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_BLKSIZE) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SIZE_BLOCKS, spdk_bdev_get_num_blocks(ctx->nbd->bdev));
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_SIZE_BLOCKS) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

#ifdef NBD_SET_TIMEOUT
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_TIMEOUT, NBD_IO_TIMEOUT_S);
	if (rc == -1) {
		SPDK_ERRLOG("ioctl(NBD_SET_TIMEOUT) failed: %s\n", spdk_strerror(errno));
		rc = -errno;
		goto err;
	}
#else
	SPDK_NOTICELOG("ioctl(NBD_SET_TIMEOUT) is not supported.\n");
#endif

#ifdef NBD_FLAG_SEND_FLUSH
	if (spdk_bdev_io_type_supported(ctx->nbd->bdev, SPDK_BDEV_IO_TYPE_FLUSH)) {
		nbd_flags |= NBD_FLAG_SEND_FLUSH;
	}
#endif
#ifdef NBD_FLAG_SEND_TRIM
	if (spdk_bdev_io_type_supported(ctx->nbd->bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		nbd_flags |= NBD_FLAG_SEND_TRIM;
	}
#endif
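
	/* Advertise the collected optional features (flush/trim) to the kernel */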
	if (nbd_flags) {
		rc = ioctl(ctx->nbd->dev_fd, NBD_SET_FLAGS, nbd_flags);
		if (rc == -1) {
			SPDK_ERRLOG("ioctl(NBD_SET_FLAGS, 0x%lx) failed: %s\n", nbd_flags, spdk_strerror(errno));
			rc = -errno;
			goto err;
		}
	}

	ctx->nbd->has_nbd_pthread = true;
	rc = pthread_create(&tid, NULL, nbd_start_kernel, ctx->nbd);
	if (rc != 0) {
		ctx->nbd->has_nbd_pthread = false;
		SPDK_ERRLOG("could not create thread: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	rc = pthread_detach(tid);
	if (rc != 0) {
		SPDK_ERRLOG("could not detach thread for nbd kernel: %s\n", spdk_strerror(rc));
		rc = -rc;
		goto err;
	}

	if (spdk_interrupt_mode_is_enabled()) {
		ctx->nbd->intr = SPDK_INTERRUPT_REGISTER(ctx->nbd->spdk_sp_fd, nbd_poll, ctx->nbd);
	}

	ctx->nbd->nbd_poller = SPDK_POLLER_REGISTER(nbd_poll, ctx->nbd, 0);
	spdk_poller_register_interrupt(ctx->nbd->nbd_poller, nbd_poller_set_interrupt_mode, ctx->nbd);

	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, ctx->nbd, 0);
	}

	/* nbd may receive a stop command while it is still initializing */
	ctx->nbd->is_started = true;

	free(ctx);
	return;

err:
	_nbd_stop(ctx->nbd);
	if (ctx->cb_fn) {
		ctx->cb_fn(ctx->cb_arg, NULL, rc);
	}
	free(ctx);
}

static int
nbd_enable_kernel(void *arg)
{
	struct spdk_nbd_start_ctx *ctx = arg;
	int rc;

	/* Declare device setup by this process */
	rc = ioctl(ctx->nbd->dev_fd, NBD_SET_SOCK, ctx->nbd->kernel_sp_fd);

	if (rc) {
		if (errno == EBUSY) {
			if (ctx->nbd->retry_poller == NULL) {
				ctx->nbd->retry_count = NBD_START_BUSY_WAITING_MS * 1000ULL / NBD_BUSY_POLLING_INTERVAL_US;
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
						NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			} else if (ctx->nbd->retry_count-- > 0) {
				/* Repeatedly unregister and re-register the retry poller to avoid a scan-build error */
				spdk_poller_unregister(&ctx->nbd->retry_poller);
				ctx->nbd->retry_poller = SPDK_POLLER_REGISTER(nbd_enable_kernel, ctx,
						NBD_BUSY_POLLING_INTERVAL_US);
				return SPDK_POLLER_BUSY;
			}
		}

		SPDK_ERRLOG("ioctl(NBD_SET_SOCK) failed: %s\n", spdk_strerror(errno));
		if (ctx->nbd->retry_poller) {
			spdk_poller_unregister(&ctx->nbd->retry_poller);
		}

		_nbd_stop(ctx->nbd);

		if (ctx->cb_fn) {
			ctx->cb_fn(ctx->cb_arg, NULL, -errno);
		}

		free(ctx);
		return SPDK_POLLER_BUSY;
	}

	if (ctx->nbd->retry_poller) {
		spdk_poller_unregister(&ctx->nbd->retry_poller);
	}

	nbd_start_complete(ctx);

	return SPDK_POLLER_BUSY;
}

void
spdk_nbd_start(const char *bdev_name, const char *nbd_path,
	       spdk_nbd_start_cb cb_fn, void *cb_arg)
{
	struct spdk_nbd_start_ctx *ctx = NULL;
	struct spdk_nbd_disk *nbd = NULL;
	struct spdk_bdev *bdev;
	int rc;
	int sp[2];

	nbd = calloc(1, sizeof(*nbd));
	if (nbd == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	nbd->dev_fd = -1;
	nbd->spdk_sp_fd = -1;
	nbd->kernel_sp_fd = -1;

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		rc = -ENOMEM;
		goto err;
	}

	ctx->nbd = nbd;
	ctx->cb_fn = cb_fn;
	ctx->cb_arg = cb_arg;

	rc = spdk_bdev_open_ext(bdev_name, true, nbd_bdev_event_cb, nbd, &nbd->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("could not open bdev %s, error=%d\n", bdev_name, rc);
		goto err;
	}
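
	/* Cache the bdev pointer and a per-thread io channel for the data path */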
	bdev = spdk_bdev_desc_get_bdev(nbd->bdev_desc);
	nbd->bdev = bdev;

	nbd->ch = spdk_bdev_get_io_channel(nbd->bdev_desc);
	nbd->buf_align = spdk_max(spdk_bdev_get_buf_align(bdev), 64);

	rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sp);
	if (rc != 0) {
		SPDK_ERRLOG("socketpair failed\n");
		rc = -errno;
		goto err;
	}

	nbd->spdk_sp_fd = sp[0];
	nbd->kernel_sp_fd = sp[1];
	nbd->nbd_path = strdup(nbd_path);
	if (!nbd->nbd_path) {
		SPDK_ERRLOG("strdup allocation failure\n");
		rc = -ENOMEM;
		goto err;
	}

	TAILQ_INIT(&nbd->received_io_list);
	TAILQ_INIT(&nbd->executed_io_list);
	TAILQ_INIT(&nbd->processing_io_list);

	/* Add nbd_disk to the end of disk list */
	rc = nbd_disk_register(ctx->nbd);
	if (rc != 0) {
		goto err;
	}

	nbd->dev_fd = open(nbd_path, O_RDWR | O_DIRECT);
	if (nbd->dev_fd == -1) {
		SPDK_ERRLOG("open(\"%s\") failed: %s\n", nbd_path, spdk_strerror(errno));
		rc = -errno;
		goto err;
	}

	SPDK_INFOLOG(nbd, "Enabling kernel access to bdev %s via %s\n",
		     bdev_name, nbd_path);

	nbd_enable_kernel(ctx);
	return;

err:
	free(ctx);
	if (nbd) {
		_nbd_stop(nbd);
	}

	if (cb_fn) {
		cb_fn(cb_arg, NULL, rc);
	}
}

const char *
spdk_nbd_get_path(struct spdk_nbd_disk *nbd)
{
	return nbd->nbd_path;
}

SPDK_LOG_REGISTER_COMPONENT(nbd)