1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "tree.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/thread.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk/util.h" 44 #include "spdk/log.h" 45 #include "spdk/trace.h" 46 47 #include "spdk_internal/trace_defs.h" 48 49 #define BLOBFS_TRACE(file, str, args...) 
\ 50 SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args) 51 52 #define BLOBFS_TRACE_RW(file, str, args...) \ 53 SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args) 54 55 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 56 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 57 58 #define SPDK_BLOBFS_SIGNATURE "BLOBFS" 59 60 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 61 static struct spdk_mempool *g_cache_pool; 62 static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches); 63 static struct spdk_poller *g_cache_pool_mgmt_poller; 64 static struct spdk_thread *g_cache_pool_thread; 65 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL 66 static int g_fs_count = 0; 67 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 68 69 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS) 70 { 71 struct spdk_trace_tpoint_opts opts[] = { 72 { 73 "BLOBFS_XATTR_START", TRACE_BLOBFS_XATTR_START, 74 OWNER_NONE, OBJECT_NONE, 0, 75 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 76 }, 77 { 78 "BLOBFS_XATTR_END", TRACE_BLOBFS_XATTR_END, 79 OWNER_NONE, OBJECT_NONE, 0, 80 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 81 }, 82 { 83 "BLOBFS_OPEN", TRACE_BLOBFS_OPEN, 84 OWNER_NONE, OBJECT_NONE, 0, 85 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 86 }, 87 { 88 "BLOBFS_CLOSE", TRACE_BLOBFS_CLOSE, 89 OWNER_NONE, OBJECT_NONE, 0, 90 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 91 }, 92 { 93 "BLOBFS_DELETE_START", TRACE_BLOBFS_DELETE_START, 94 OWNER_NONE, OBJECT_NONE, 0, 95 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 96 }, 97 { 98 "BLOBFS_DELETE_DONE", TRACE_BLOBFS_DELETE_DONE, 99 OWNER_NONE, OBJECT_NONE, 0, 100 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 101 } 102 }; 103 104 spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts)); 105 } 106 107 void 108 cache_buffer_free(struct cache_buffer *cache_buffer) 109 { 110 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 111 free(cache_buffer); 112 } 113 114 #define 
CACHE_READAHEAD_THRESHOLD (128 * 1024) 115 116 struct spdk_file { 117 struct spdk_filesystem *fs; 118 struct spdk_blob *blob; 119 char *name; 120 uint64_t length; 121 bool is_deleted; 122 bool open_for_writing; 123 uint64_t length_flushed; 124 uint64_t length_xattr; 125 uint64_t append_pos; 126 uint64_t seq_byte_count; 127 uint64_t next_seq_offset; 128 uint32_t priority; 129 TAILQ_ENTRY(spdk_file) tailq; 130 spdk_blob_id blobid; 131 uint32_t ref_count; 132 pthread_spinlock_t lock; 133 struct cache_buffer *last; 134 struct cache_tree *tree; 135 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 136 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 137 TAILQ_ENTRY(spdk_file) cache_tailq; 138 }; 139 140 struct spdk_deleted_file { 141 spdk_blob_id id; 142 TAILQ_ENTRY(spdk_deleted_file) tailq; 143 }; 144 145 struct spdk_filesystem { 146 struct spdk_blob_store *bs; 147 TAILQ_HEAD(, spdk_file) files; 148 struct spdk_bs_opts bs_opts; 149 struct spdk_bs_dev *bdev; 150 fs_send_request_fn send_request; 151 152 struct { 153 uint32_t max_ops; 154 struct spdk_io_channel *sync_io_channel; 155 struct spdk_fs_channel *sync_fs_channel; 156 } sync_target; 157 158 struct { 159 uint32_t max_ops; 160 struct spdk_io_channel *md_io_channel; 161 struct spdk_fs_channel *md_fs_channel; 162 } md_target; 163 164 struct { 165 uint32_t max_ops; 166 } io_target; 167 }; 168 169 struct spdk_fs_cb_args { 170 union { 171 spdk_fs_op_with_handle_complete fs_op_with_handle; 172 spdk_fs_op_complete fs_op; 173 spdk_file_op_with_handle_complete file_op_with_handle; 174 spdk_file_op_complete file_op; 175 spdk_file_stat_op_complete stat_op; 176 } fn; 177 void *arg; 178 sem_t *sem; 179 struct spdk_filesystem *fs; 180 struct spdk_file *file; 181 int rc; 182 int *rwerrno; 183 struct iovec *iovs; 184 uint32_t iovcnt; 185 struct iovec iov; 186 union { 187 struct { 188 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 189 } fs_load; 190 struct { 191 uint64_t length; 192 } truncate; 193 struct 
{ 194 struct spdk_io_channel *channel; 195 void *pin_buf; 196 int is_read; 197 off_t offset; 198 size_t length; 199 uint64_t start_lba; 200 uint64_t num_lba; 201 uint32_t blocklen; 202 } rw; 203 struct { 204 const char *old_name; 205 const char *new_name; 206 } rename; 207 struct { 208 struct cache_buffer *cache_buffer; 209 uint64_t length; 210 } flush; 211 struct { 212 struct cache_buffer *cache_buffer; 213 uint64_t length; 214 uint64_t offset; 215 } readahead; 216 struct { 217 /* offset of the file when the sync request was made */ 218 uint64_t offset; 219 TAILQ_ENTRY(spdk_fs_request) tailq; 220 bool xattr_in_progress; 221 /* length written to the xattr for this file - this should 222 * always be the same as the offset if only one thread is 223 * writing to the file, but could differ if multiple threads 224 * are appending 225 */ 226 uint64_t length; 227 } sync; 228 struct { 229 uint32_t num_clusters; 230 } resize; 231 struct { 232 const char *name; 233 uint32_t flags; 234 TAILQ_ENTRY(spdk_fs_request) tailq; 235 } open; 236 struct { 237 const char *name; 238 struct spdk_blob *blob; 239 } create; 240 struct { 241 const char *name; 242 } delete; 243 struct { 244 const char *name; 245 } stat; 246 } op; 247 }; 248 249 static void file_free(struct spdk_file *file); 250 static void fs_io_device_unregister(struct spdk_filesystem *fs); 251 static void fs_free_io_channels(struct spdk_filesystem *fs); 252 253 void 254 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 255 { 256 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 257 } 258 259 static int _blobfs_cache_pool_reclaim(void *arg); 260 261 static bool 262 blobfs_cache_pool_need_reclaim(void) 263 { 264 size_t count; 265 266 count = spdk_mempool_count(g_cache_pool); 267 /* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller 268 * when the number of available cache buffer is less than 1/5 of total buffers. 
269 */ 270 if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) { 271 return false; 272 } 273 274 return true; 275 } 276 277 static void 278 __start_cache_pool_mgmt(void *ctx) 279 { 280 assert(g_cache_pool == NULL); 281 282 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 283 g_fs_cache_size / CACHE_BUFFER_SIZE, 284 CACHE_BUFFER_SIZE, 285 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 286 SPDK_ENV_SOCKET_ID_ANY); 287 if (!g_cache_pool) { 288 SPDK_ERRLOG("Create mempool failed, you may " 289 "increase the memory and try again\n"); 290 assert(false); 291 } 292 293 assert(g_cache_pool_mgmt_poller == NULL); 294 g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL, 295 BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); 296 } 297 298 static void 299 __stop_cache_pool_mgmt(void *ctx) 300 { 301 spdk_poller_unregister(&g_cache_pool_mgmt_poller); 302 303 assert(g_cache_pool != NULL); 304 assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE); 305 spdk_mempool_free(g_cache_pool); 306 g_cache_pool = NULL; 307 308 spdk_thread_exit(g_cache_pool_thread); 309 } 310 311 static void 312 initialize_global_cache(void) 313 { 314 pthread_mutex_lock(&g_cache_init_lock); 315 if (g_fs_count == 0) { 316 g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL); 317 assert(g_cache_pool_thread != NULL); 318 spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL); 319 } 320 g_fs_count++; 321 pthread_mutex_unlock(&g_cache_init_lock); 322 } 323 324 static void 325 free_global_cache(void) 326 { 327 pthread_mutex_lock(&g_cache_init_lock); 328 g_fs_count--; 329 if (g_fs_count == 0) { 330 spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL); 331 } 332 pthread_mutex_unlock(&g_cache_init_lock); 333 } 334 335 static uint64_t 336 __file_get_blob_size(struct spdk_file *file) 337 { 338 uint64_t cluster_sz; 339 340 cluster_sz = file->fs->bs_opts.cluster_sz; 341 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 342 } 
343 344 struct spdk_fs_request { 345 struct spdk_fs_cb_args args; 346 TAILQ_ENTRY(spdk_fs_request) link; 347 struct spdk_fs_channel *channel; 348 }; 349 350 struct spdk_fs_channel { 351 struct spdk_fs_request *req_mem; 352 TAILQ_HEAD(, spdk_fs_request) reqs; 353 sem_t sem; 354 struct spdk_filesystem *fs; 355 struct spdk_io_channel *bs_channel; 356 fs_send_request_fn send_request; 357 bool sync; 358 uint32_t outstanding_reqs; 359 pthread_spinlock_t lock; 360 }; 361 362 /* For now, this is effectively an alias. But eventually we'll shift 363 * some data members over. */ 364 struct spdk_fs_thread_ctx { 365 struct spdk_fs_channel ch; 366 }; 367 368 static struct spdk_fs_request * 369 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 370 { 371 struct spdk_fs_request *req; 372 struct iovec *iovs = NULL; 373 374 if (iovcnt > 1) { 375 iovs = calloc(iovcnt, sizeof(struct iovec)); 376 if (!iovs) { 377 return NULL; 378 } 379 } 380 381 if (channel->sync) { 382 pthread_spin_lock(&channel->lock); 383 } 384 385 req = TAILQ_FIRST(&channel->reqs); 386 if (req) { 387 channel->outstanding_reqs++; 388 TAILQ_REMOVE(&channel->reqs, req, link); 389 } 390 391 if (channel->sync) { 392 pthread_spin_unlock(&channel->lock); 393 } 394 395 if (req == NULL) { 396 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 397 free(iovs); 398 return NULL; 399 } 400 memset(req, 0, sizeof(*req)); 401 req->channel = channel; 402 if (iovcnt > 1) { 403 req->args.iovs = iovs; 404 } else { 405 req->args.iovs = &req->args.iov; 406 } 407 req->args.iovcnt = iovcnt; 408 409 return req; 410 } 411 412 static struct spdk_fs_request * 413 alloc_fs_request(struct spdk_fs_channel *channel) 414 { 415 return alloc_fs_request_with_iov(channel, 0); 416 } 417 418 static void 419 free_fs_request(struct spdk_fs_request *req) 420 { 421 struct spdk_fs_channel *channel = req->channel; 422 423 if (req->args.iovcnt > 1) { 424 free(req->args.iovs); 425 } 426 427 if (channel->sync) { 428 
pthread_spin_lock(&channel->lock); 429 } 430 431 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 432 channel->outstanding_reqs--; 433 434 if (channel->sync) { 435 pthread_spin_unlock(&channel->lock); 436 } 437 } 438 439 static int 440 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 441 uint32_t max_ops) 442 { 443 uint32_t i; 444 445 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 446 if (!channel->req_mem) { 447 return -1; 448 } 449 450 channel->outstanding_reqs = 0; 451 TAILQ_INIT(&channel->reqs); 452 sem_init(&channel->sem, 0, 0); 453 454 for (i = 0; i < max_ops; i++) { 455 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 456 } 457 458 channel->fs = fs; 459 460 return 0; 461 } 462 463 static int 464 fs_md_channel_create(void *io_device, void *ctx_buf) 465 { 466 struct spdk_filesystem *fs; 467 struct spdk_fs_channel *channel = ctx_buf; 468 469 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 470 471 return fs_channel_create(fs, channel, fs->md_target.max_ops); 472 } 473 474 static int 475 fs_sync_channel_create(void *io_device, void *ctx_buf) 476 { 477 struct spdk_filesystem *fs; 478 struct spdk_fs_channel *channel = ctx_buf; 479 480 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 481 482 return fs_channel_create(fs, channel, fs->sync_target.max_ops); 483 } 484 485 static int 486 fs_io_channel_create(void *io_device, void *ctx_buf) 487 { 488 struct spdk_filesystem *fs; 489 struct spdk_fs_channel *channel = ctx_buf; 490 491 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 492 493 return fs_channel_create(fs, channel, fs->io_target.max_ops); 494 } 495 496 static void 497 fs_channel_destroy(void *io_device, void *ctx_buf) 498 { 499 struct spdk_fs_channel *channel = ctx_buf; 500 501 if (channel->outstanding_reqs > 0) { 502 SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n", 503 channel->outstanding_reqs); 504 } 505 506 
free(channel->req_mem); 507 if (channel->bs_channel != NULL) { 508 spdk_bs_free_io_channel(channel->bs_channel); 509 } 510 } 511 512 static void 513 __send_request_direct(fs_request_fn fn, void *arg) 514 { 515 fn(arg); 516 } 517 518 static void 519 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 520 { 521 fs->bs = bs; 522 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 523 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 524 fs->md_target.md_fs_channel->send_request = __send_request_direct; 525 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 526 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 527 528 initialize_global_cache(); 529 } 530 531 static void 532 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 533 { 534 struct spdk_fs_request *req = ctx; 535 struct spdk_fs_cb_args *args = &req->args; 536 struct spdk_filesystem *fs = args->fs; 537 538 if (bserrno == 0) { 539 common_fs_bs_init(fs, bs); 540 } else { 541 free(fs); 542 fs = NULL; 543 } 544 545 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 546 free_fs_request(req); 547 } 548 549 static struct spdk_filesystem * 550 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 551 { 552 struct spdk_filesystem *fs; 553 554 fs = calloc(1, sizeof(*fs)); 555 if (fs == NULL) { 556 return NULL; 557 } 558 559 fs->bdev = dev; 560 fs->send_request = send_request_fn; 561 TAILQ_INIT(&fs->files); 562 563 fs->md_target.max_ops = 512; 564 spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy, 565 sizeof(struct spdk_fs_channel), "blobfs_md"); 566 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 567 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 568 569 fs->sync_target.max_ops = 512; 570 spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy, 571 sizeof(struct spdk_fs_channel), 
"blobfs_sync"); 572 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 573 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 574 575 fs->io_target.max_ops = 512; 576 spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy, 577 sizeof(struct spdk_fs_channel), "blobfs_io"); 578 579 return fs; 580 } 581 582 static void 583 __wake_caller(void *arg, int fserrno) 584 { 585 struct spdk_fs_cb_args *args = arg; 586 587 if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) { 588 *(args->rwerrno) = fserrno; 589 } 590 args->rc = fserrno; 591 sem_post(args->sem); 592 } 593 594 void 595 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 596 fs_send_request_fn send_request_fn, 597 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 598 { 599 struct spdk_filesystem *fs; 600 struct spdk_fs_request *req; 601 struct spdk_fs_cb_args *args; 602 struct spdk_bs_opts opts = {}; 603 604 fs = fs_alloc(dev, send_request_fn); 605 if (fs == NULL) { 606 cb_fn(cb_arg, NULL, -ENOMEM); 607 return; 608 } 609 610 req = alloc_fs_request(fs->md_target.md_fs_channel); 611 if (req == NULL) { 612 fs_free_io_channels(fs); 613 fs_io_device_unregister(fs); 614 cb_fn(cb_arg, NULL, -ENOMEM); 615 return; 616 } 617 618 args = &req->args; 619 args->fn.fs_op_with_handle = cb_fn; 620 args->arg = cb_arg; 621 args->fs = fs; 622 623 spdk_bs_opts_init(&opts, sizeof(opts)); 624 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE); 625 if (opt) { 626 opts.cluster_sz = opt->cluster_sz; 627 } 628 spdk_bs_init(dev, &opts, init_cb, req); 629 } 630 631 static struct spdk_file * 632 file_alloc(struct spdk_filesystem *fs) 633 { 634 struct spdk_file *file; 635 636 file = calloc(1, sizeof(*file)); 637 if (file == NULL) { 638 return NULL; 639 } 640 641 file->tree = calloc(1, sizeof(*file->tree)); 642 if (file->tree == NULL) { 643 free(file); 644 return NULL; 645 } 646 647 if 
(pthread_spin_init(&file->lock, 0)) { 648 free(file->tree); 649 free(file); 650 return NULL; 651 } 652 653 file->fs = fs; 654 TAILQ_INIT(&file->open_requests); 655 TAILQ_INIT(&file->sync_requests); 656 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 657 file->priority = SPDK_FILE_PRIORITY_LOW; 658 return file; 659 } 660 661 static void fs_load_done(void *ctx, int bserrno); 662 663 static int 664 _handle_deleted_files(struct spdk_fs_request *req) 665 { 666 struct spdk_fs_cb_args *args = &req->args; 667 struct spdk_filesystem *fs = args->fs; 668 669 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 670 struct spdk_deleted_file *deleted_file; 671 672 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 673 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 674 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 675 free(deleted_file); 676 return 0; 677 } 678 679 return 1; 680 } 681 682 static void 683 fs_load_done(void *ctx, int bserrno) 684 { 685 struct spdk_fs_request *req = ctx; 686 struct spdk_fs_cb_args *args = &req->args; 687 struct spdk_filesystem *fs = args->fs; 688 689 /* The filesystem has been loaded. Now check if there are any files that 690 * were marked for deletion before last unload. Do not complete the 691 * fs_load callback until all of them have been deleted on disk. 692 */ 693 if (_handle_deleted_files(req) == 0) { 694 /* We found a file that's been marked for deleting but not actually 695 * deleted yet. This function will get called again once the delete 696 * operation is completed. 
697 */ 698 return; 699 } 700 701 args->fn.fs_op_with_handle(args->arg, fs, 0); 702 free_fs_request(req); 703 704 } 705 706 static void 707 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 708 { 709 struct spdk_fs_request *req = ctx; 710 struct spdk_fs_cb_args *args = &req->args; 711 struct spdk_filesystem *fs = args->fs; 712 uint64_t *length; 713 const char *name; 714 uint32_t *is_deleted; 715 size_t value_len; 716 717 if (rc < 0) { 718 args->fn.fs_op_with_handle(args->arg, fs, rc); 719 free_fs_request(req); 720 return; 721 } 722 723 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 724 if (rc < 0) { 725 args->fn.fs_op_with_handle(args->arg, fs, rc); 726 free_fs_request(req); 727 return; 728 } 729 730 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 731 if (rc < 0) { 732 args->fn.fs_op_with_handle(args->arg, fs, rc); 733 free_fs_request(req); 734 return; 735 } 736 737 assert(value_len == 8); 738 739 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 740 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 741 if (rc < 0) { 742 struct spdk_file *f; 743 744 f = file_alloc(fs); 745 if (f == NULL) { 746 SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n"); 747 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 748 free_fs_request(req); 749 return; 750 } 751 752 f->name = strdup(name); 753 if (!f->name) { 754 SPDK_ERRLOG("Cannot allocate memory for file name\n"); 755 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 756 free_fs_request(req); 757 file_free(f); 758 return; 759 } 760 761 f->blobid = spdk_blob_get_id(blob); 762 f->length = *length; 763 f->length_flushed = *length; 764 f->length_xattr = *length; 765 f->append_pos = *length; 766 SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length); 767 } else { 768 struct spdk_deleted_file *deleted_file; 769 770 deleted_file = calloc(1, 
sizeof(*deleted_file)); 771 if (deleted_file == NULL) { 772 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 773 free_fs_request(req); 774 return; 775 } 776 deleted_file->id = spdk_blob_get_id(blob); 777 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 778 } 779 } 780 781 static void 782 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 783 { 784 struct spdk_fs_request *req = ctx; 785 struct spdk_fs_cb_args *args = &req->args; 786 struct spdk_filesystem *fs = args->fs; 787 struct spdk_bs_type bstype; 788 static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE}; 789 static const struct spdk_bs_type zeros; 790 791 if (bserrno != 0) { 792 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 793 free_fs_request(req); 794 fs_free_io_channels(fs); 795 fs_io_device_unregister(fs); 796 return; 797 } 798 799 bstype = spdk_bs_get_bstype(bs); 800 801 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 802 SPDK_DEBUGLOG(blobfs, "assigning bstype\n"); 803 spdk_bs_set_bstype(bs, blobfs_type); 804 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 805 SPDK_ERRLOG("not blobfs\n"); 806 SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype)); 807 args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL); 808 free_fs_request(req); 809 fs_free_io_channels(fs); 810 fs_io_device_unregister(fs); 811 return; 812 } 813 814 common_fs_bs_init(fs, bs); 815 fs_load_done(req, 0); 816 } 817 818 static void 819 fs_io_device_unregister(struct spdk_filesystem *fs) 820 { 821 assert(fs != NULL); 822 spdk_io_device_unregister(&fs->md_target, NULL); 823 spdk_io_device_unregister(&fs->sync_target, NULL); 824 spdk_io_device_unregister(&fs->io_target, NULL); 825 free(fs); 826 } 827 828 static void 829 fs_free_io_channels(struct spdk_filesystem *fs) 830 { 831 assert(fs != NULL); 832 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 833 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 834 } 835 836 void 837 spdk_fs_load(struct spdk_bs_dev 
*dev, fs_send_request_fn send_request_fn, 838 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 839 { 840 struct spdk_filesystem *fs; 841 struct spdk_fs_cb_args *args; 842 struct spdk_fs_request *req; 843 struct spdk_bs_opts bs_opts; 844 845 fs = fs_alloc(dev, send_request_fn); 846 if (fs == NULL) { 847 cb_fn(cb_arg, NULL, -ENOMEM); 848 return; 849 } 850 851 req = alloc_fs_request(fs->md_target.md_fs_channel); 852 if (req == NULL) { 853 fs_free_io_channels(fs); 854 fs_io_device_unregister(fs); 855 cb_fn(cb_arg, NULL, -ENOMEM); 856 return; 857 } 858 859 args = &req->args; 860 args->fn.fs_op_with_handle = cb_fn; 861 args->arg = cb_arg; 862 args->fs = fs; 863 TAILQ_INIT(&args->op.fs_load.deleted_files); 864 spdk_bs_opts_init(&bs_opts, sizeof(bs_opts)); 865 bs_opts.iter_cb_fn = iter_cb; 866 bs_opts.iter_cb_arg = req; 867 spdk_bs_load(dev, &bs_opts, load_cb, req); 868 } 869 870 static void 871 unload_cb(void *ctx, int bserrno) 872 { 873 struct spdk_fs_request *req = ctx; 874 struct spdk_fs_cb_args *args = &req->args; 875 struct spdk_filesystem *fs = args->fs; 876 struct spdk_file *file, *tmp; 877 878 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 879 TAILQ_REMOVE(&fs->files, file, tailq); 880 file_free(file); 881 } 882 883 free_global_cache(); 884 885 args->fn.fs_op(args->arg, bserrno); 886 free(req); 887 888 fs_io_device_unregister(fs); 889 } 890 891 void 892 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 893 { 894 struct spdk_fs_request *req; 895 struct spdk_fs_cb_args *args; 896 897 /* 898 * We must free the md_channel before unloading the blobstore, so just 899 * allocate this request from the general heap. 
900 */ 901 req = calloc(1, sizeof(*req)); 902 if (req == NULL) { 903 cb_fn(cb_arg, -ENOMEM); 904 return; 905 } 906 907 args = &req->args; 908 args->fn.fs_op = cb_fn; 909 args->arg = cb_arg; 910 args->fs = fs; 911 912 fs_free_io_channels(fs); 913 spdk_bs_unload(fs->bs, unload_cb, req); 914 } 915 916 static struct spdk_file * 917 fs_find_file(struct spdk_filesystem *fs, const char *name) 918 { 919 struct spdk_file *file; 920 921 TAILQ_FOREACH(file, &fs->files, tailq) { 922 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 923 return file; 924 } 925 } 926 927 return NULL; 928 } 929 930 void 931 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 932 spdk_file_stat_op_complete cb_fn, void *cb_arg) 933 { 934 struct spdk_file_stat stat; 935 struct spdk_file *f = NULL; 936 937 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 938 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 939 return; 940 } 941 942 f = fs_find_file(fs, name); 943 if (f != NULL) { 944 stat.blobid = f->blobid; 945 stat.size = f->append_pos >= f->length ? 
f->append_pos : f->length; 946 cb_fn(cb_arg, &stat, 0); 947 return; 948 } 949 950 cb_fn(cb_arg, NULL, -ENOENT); 951 } 952 953 static void 954 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 955 { 956 struct spdk_fs_request *req = arg; 957 struct spdk_fs_cb_args *args = &req->args; 958 959 args->rc = fserrno; 960 if (fserrno == 0) { 961 memcpy(args->arg, stat, sizeof(*stat)); 962 } 963 sem_post(args->sem); 964 } 965 966 static void 967 __file_stat(void *arg) 968 { 969 struct spdk_fs_request *req = arg; 970 struct spdk_fs_cb_args *args = &req->args; 971 972 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 973 args->fn.stat_op, req); 974 } 975 976 int 977 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 978 const char *name, struct spdk_file_stat *stat) 979 { 980 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 981 struct spdk_fs_request *req; 982 int rc; 983 984 req = alloc_fs_request(channel); 985 if (req == NULL) { 986 SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name); 987 return -ENOMEM; 988 } 989 990 req->args.fs = fs; 991 req->args.op.stat.name = name; 992 req->args.fn.stat_op = __copy_stat; 993 req->args.arg = stat; 994 req->args.sem = &channel->sem; 995 channel->send_request(__file_stat, req); 996 sem_wait(&channel->sem); 997 998 rc = req->args.rc; 999 free_fs_request(req); 1000 1001 return rc; 1002 } 1003 1004 static void 1005 fs_create_blob_close_cb(void *ctx, int bserrno) 1006 { 1007 int rc; 1008 struct spdk_fs_request *req = ctx; 1009 struct spdk_fs_cb_args *args = &req->args; 1010 1011 rc = args->rc ? 
args->rc : bserrno; 1012 args->fn.file_op(args->arg, rc); 1013 free_fs_request(req); 1014 } 1015 1016 static void 1017 fs_create_blob_resize_cb(void *ctx, int bserrno) 1018 { 1019 struct spdk_fs_request *req = ctx; 1020 struct spdk_fs_cb_args *args = &req->args; 1021 struct spdk_file *f = args->file; 1022 struct spdk_blob *blob = args->op.create.blob; 1023 uint64_t length = 0; 1024 1025 args->rc = bserrno; 1026 if (bserrno) { 1027 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1028 return; 1029 } 1030 1031 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 1032 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 1033 1034 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1035 } 1036 1037 static void 1038 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1039 { 1040 struct spdk_fs_request *req = ctx; 1041 struct spdk_fs_cb_args *args = &req->args; 1042 1043 if (bserrno) { 1044 args->fn.file_op(args->arg, bserrno); 1045 free_fs_request(req); 1046 return; 1047 } 1048 1049 args->op.create.blob = blob; 1050 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 1051 } 1052 1053 static void 1054 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 1055 { 1056 struct spdk_fs_request *req = ctx; 1057 struct spdk_fs_cb_args *args = &req->args; 1058 struct spdk_file *f = args->file; 1059 1060 if (bserrno) { 1061 args->fn.file_op(args->arg, bserrno); 1062 free_fs_request(req); 1063 return; 1064 } 1065 1066 f->blobid = blobid; 1067 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 1068 } 1069 1070 void 1071 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 1072 spdk_file_op_complete cb_fn, void *cb_arg) 1073 { 1074 struct spdk_file *file; 1075 struct spdk_fs_request *req; 1076 struct spdk_fs_cb_args *args; 1077 1078 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1079 cb_fn(cb_arg, -ENAMETOOLONG); 1080 return; 1081 } 1082 1083 file = 
fs_find_file(fs, name); 1084 if (file != NULL) { 1085 cb_fn(cb_arg, -EEXIST); 1086 return; 1087 } 1088 1089 file = file_alloc(fs); 1090 if (file == NULL) { 1091 SPDK_ERRLOG("Cannot allocate new file for creation\n"); 1092 cb_fn(cb_arg, -ENOMEM); 1093 return; 1094 } 1095 1096 req = alloc_fs_request(fs->md_target.md_fs_channel); 1097 if (req == NULL) { 1098 SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name); 1099 TAILQ_REMOVE(&fs->files, file, tailq); 1100 file_free(file); 1101 cb_fn(cb_arg, -ENOMEM); 1102 return; 1103 } 1104 1105 args = &req->args; 1106 args->file = file; 1107 args->fn.file_op = cb_fn; 1108 args->arg = cb_arg; 1109 1110 file->name = strdup(name); 1111 if (!file->name) { 1112 SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name); 1113 free_fs_request(req); 1114 TAILQ_REMOVE(&fs->files, file, tailq); 1115 file_free(file); 1116 cb_fn(cb_arg, -ENOMEM); 1117 return; 1118 } 1119 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 1120 } 1121 1122 static void 1123 __fs_create_file_done(void *arg, int fserrno) 1124 { 1125 struct spdk_fs_request *req = arg; 1126 struct spdk_fs_cb_args *args = &req->args; 1127 1128 __wake_caller(args, fserrno); 1129 SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name); 1130 } 1131 1132 static void 1133 __fs_create_file(void *arg) 1134 { 1135 struct spdk_fs_request *req = arg; 1136 struct spdk_fs_cb_args *args = &req->args; 1137 1138 SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name); 1139 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1140 } 1141 1142 int 1143 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1144 { 1145 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1146 struct spdk_fs_request *req; 1147 struct spdk_fs_cb_args *args; 1148 int rc; 1149 1150 SPDK_DEBUGLOG(blobfs, "file=%s\n", name); 1151 1152 req = alloc_fs_request(channel); 1153 if (req == NULL) { 1154 
SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name); 1155 return -ENOMEM; 1156 } 1157 1158 args = &req->args; 1159 args->fs = fs; 1160 args->op.create.name = name; 1161 args->sem = &channel->sem; 1162 fs->send_request(__fs_create_file, req); 1163 sem_wait(&channel->sem); 1164 rc = args->rc; 1165 free_fs_request(req); 1166 1167 return rc; 1168 } 1169 1170 static void 1171 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1172 { 1173 struct spdk_fs_request *req = ctx; 1174 struct spdk_fs_cb_args *args = &req->args; 1175 struct spdk_file *f = args->file; 1176 1177 f->blob = blob; 1178 while (!TAILQ_EMPTY(&f->open_requests)) { 1179 req = TAILQ_FIRST(&f->open_requests); 1180 args = &req->args; 1181 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1182 spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->name); 1183 args->fn.file_op_with_handle(args->arg, f, bserrno); 1184 free_fs_request(req); 1185 } 1186 } 1187 1188 static void 1189 fs_open_blob_create_cb(void *ctx, int bserrno) 1190 { 1191 struct spdk_fs_request *req = ctx; 1192 struct spdk_fs_cb_args *args = &req->args; 1193 struct spdk_file *file = args->file; 1194 struct spdk_filesystem *fs = args->fs; 1195 1196 if (file == NULL) { 1197 /* 1198 * This is from an open with CREATE flag - the file 1199 * is now created so look it up in the file list for this 1200 * filesystem. 1201 */ 1202 file = fs_find_file(fs, args->op.open.name); 1203 assert(file != NULL); 1204 args->file = file; 1205 } 1206 1207 file->ref_count++; 1208 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1209 if (file->ref_count == 1) { 1210 assert(file->blob == NULL); 1211 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1212 } else if (file->blob != NULL) { 1213 fs_open_blob_done(req, file->blob, 0); 1214 } else { 1215 /* 1216 * The blob open for this file is in progress due to a previous 1217 * open request. 
 When that open completes, it will invoke the
		 * open callback for this request.
		 */
	}
}

/*
 * Asynchronous open. Validates the name length, honors
 * SPDK_BLOBFS_OPEN_CREATE for missing files, and refuses files already
 * marked is_deleted. Actual completion is driven by
 * fs_open_blob_create_cb().
 */
void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	/* A file pending deferred deletion behaves as if already gone. */
	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		/* Create first; fs_open_blob_create_cb() finishes the open. */
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

/*
 * Completion for the synchronous open path: stashes the file handle for
 * spdk_fs_open_file() to pick up, then wakes the blocked caller.
 */
static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	__wake_caller(args, bserrno);
	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
}

/*
 * Trampoline executed on the metadata thread: starts the async open on
 * behalf of the sync API.
 */
static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

int
spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, uint32_t flags, struct spdk_file **file)
{
	/* Synchronous open: dispatch to the md thread, block on the channel
	 * semaphore, and return the opened handle through *file (NULL on error).
	 */
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.open.name = name;
	args->op.open.flags = flags;
	args->sem = &channel->sem;
	fs->send_request(__fs_open_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	if (rc == 0) {
		*file = args->file;
	} else {
		*file = NULL;
	}
	free_fs_request(req);

	return rc;
}

/* Final stage of a rename: the blob is closed, report status to the caller. */
static void
fs_rename_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.fs_op(args->arg, bserrno);
	free_fs_request(req);
}

/*
 * Rename, blob opened: persist the new name in the blob's "name" xattr
 * (including the NUL terminator), then close the blob; the close completion
 * finishes the request.
 */
static void
fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	const char *new_name = args->op.rename.new_name;

	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
}

/*
 * Metadata-thread half of rename: swap the in-memory name, then open the
 * blob so the new name can be written to its xattrs.
 */
static void
_fs_md_rename_file(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f;

	f = fs_find_file(args->fs, args->op.rename.old_name);
	if (f == NULL) {
		args->fn.fs_op(args->arg, -ENOENT);
		free_fs_request(req);
		return;
	}

	free(f->name);
	f->name = strdup(args->op.rename.new_name);
	if (!f->name) {
		/* NOTE(review): f->name is left NULL here; later users of the
		 * name would dereference NULL — confirm callers treat this file
		 * as unusable after the error.
		 */
		SPDK_ERRLOG("Cannot
 allocate memory for file name\n");
		args->fn.fs_op(args->arg, -ENOMEM);
		free_fs_request(req);
		return;
	}

	args->file = f;
	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
}

/* Target of the delete-then-rename chain: old target gone, do the rename. */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	_fs_md_rename_file(arg);
}

/*
 * Asynchronous rename. If new_name already exists it is deleted first
 * (POSIX-rename-like overwrite), then the file's in-memory and on-blob
 * names are updated by _fs_md_rename_file().
 */
void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name);
	/* Only new_name needs a length check; old_name is matched against
	 * existing (already-validated) entries.
	 */
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
			    new_name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file. So delete the existing file, then
	 * do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

/* Wakes the caller blocked in spdk_fs_rename_file(). */
static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
}

/* Metadata-thread trampoline for the synchronous rename API. */
static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

/*
 * Synchronous rename: dispatch to the md thread and block on the channel
 * semaphore. Returns 0 or a negative errno.
 */
int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
		return -ENOMEM;
	}

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

/* Shared completion for both the immediate and deferred blob-delete paths. */
static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/*
 * Asynchronous delete. If the file is still open (ref_count > 0) it is only
 * marked is_deleted (persisted via xattr) and really deleted on last close;
 * otherwise the blob is deleted immediately.
 */
void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1)
== SPDK_FILE_NAME_MAX + 1) { 1490 cb_fn(cb_arg, -ENAMETOOLONG); 1491 return; 1492 } 1493 1494 f = fs_find_file(fs, name); 1495 if (f == NULL) { 1496 SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name); 1497 cb_fn(cb_arg, -ENOENT); 1498 return; 1499 } 1500 1501 req = alloc_fs_request(fs->md_target.md_fs_channel); 1502 if (req == NULL) { 1503 SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name); 1504 cb_fn(cb_arg, -ENOMEM); 1505 return; 1506 } 1507 1508 args = &req->args; 1509 args->fn.file_op = cb_fn; 1510 args->arg = cb_arg; 1511 1512 if (f->ref_count > 0) { 1513 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1514 f->is_deleted = true; 1515 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1516 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1517 return; 1518 } 1519 1520 blobid = f->blobid; 1521 TAILQ_REMOVE(&fs->files, f, tailq); 1522 1523 file_free(f); 1524 1525 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1526 } 1527 1528 static void 1529 __fs_delete_file_done(void *arg, int fserrno) 1530 { 1531 struct spdk_fs_request *req = arg; 1532 struct spdk_fs_cb_args *args = &req->args; 1533 1534 spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, args->op.delete.name); 1535 __wake_caller(args, fserrno); 1536 } 1537 1538 static void 1539 __fs_delete_file(void *arg) 1540 { 1541 struct spdk_fs_request *req = arg; 1542 struct spdk_fs_cb_args *args = &req->args; 1543 1544 spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, args->op.delete.name); 1545 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1546 } 1547 1548 int 1549 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1550 const char *name) 1551 { 1552 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1553 struct spdk_fs_request *req; 1554 struct spdk_fs_cb_args *args; 1555 int rc; 1556 1557 req = alloc_fs_request(channel); 1558 if (req 
	    == NULL) {
		/* NOTE(review): this alloc failure logs at DEBUG level while the
		 * sibling create/open/rename paths use SPDK_ERRLOG — confirm the
		 * inconsistency is intentional.
		 */
		SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.delete.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_delete_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* Returns the first file in the filesystem's list, or NULL if empty. */
spdk_fs_iter
spdk_fs_iter_first(struct spdk_filesystem *fs)
{
	struct spdk_file *f;

	f = TAILQ_FIRST(&fs->files);
	return f;
}

/* Advances the iterator; NULL-safe, returns NULL at end of list. */
spdk_fs_iter
spdk_fs_iter_next(spdk_fs_iter iter)
{
	struct spdk_file *f = iter;

	if (f == NULL) {
		return NULL;
	}

	f = TAILQ_NEXT(f, tailq);
	return f;
}

/* Accessor for the file's name (owned by the file object; do not free). */
const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}

/*
 * Logical file length: the larger of the append position (unflushed cached
 * writes included) and the recorded length.
 */
uint64_t
spdk_file_get_length(struct spdk_file *file)
{
	uint64_t length;

	assert(file != NULL);

	length = file->append_pos >= file->length ?
		 file->append_pos : file->length;
	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
	return length;
}

/* Truncate finished (metadata synced): report to the caller. */
static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/*
 * Blob resize finished: persist the new length in the "length" xattr, clamp
 * append_pos for shrinking truncates, then sync blob metadata.
 */
static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}

/*
 * Round length up to whole clusters. NOTE(review): length + cluster_sz - 1
 * wraps for lengths within cluster_sz of UINT64_MAX — harmless in practice
 * but worth noting.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

/*
 * Asynchronous truncate: no-op when the length is unchanged; otherwise
 * resize the blob, then (in fs_truncate_resize_cb) update length metadata.
 */
void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.truncate.length = length;
	fs = file->fs;

	num_clusters =
		__bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
}

/*
 * Metadata-thread trampoline for the synchronous truncate API. Note that the
 * completion callback (args->fn.file_op == __wake_caller, set by the caller)
 * receives args itself, not req.
 */
static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}

/*
 * Synchronous truncate: dispatch to the channel's request handler and block
 * on the channel semaphore. Returns 0 or a negative errno.
 */
int
spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return -ENOMEM;
	}

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __wake_caller;
	args->sem = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* Common I/O completion: release the DMA bounce buffer and finish the req. */
static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/*
 * Gather scattered iovecs into one contiguous buffer; each copy is clamped
 * to the space remaining in buf.
 */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

/* Scatter a contiguous buffer back out into iovecs (inverse of the above). */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf +=
		       len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

/*
 * Blob read completed. For a read request, copy the data out of the aligned
 * bounce buffer into the caller's iovecs. For an unaligned write this was a
 * read-modify-write preload: merge the caller's data into the bounce buffer
 * and issue the write.
 */
static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	assert(req != NULL);
	/* Offset of the caller's data within the block-aligned bounce buffer
	 * (blocklen is a power of two, see __get_page_parameters).
	 */
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		__rw_done(req, 0);
	} else {
		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}

/* Issue the blob read that backs both reads and unaligned-write preloads. */
static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (fserrno) {
		__rw_done(req, fserrno);
		return;
	}
	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
			  args->op.rw.pin_buf,
			  args->op.rw.start_lba, args->op.rw.num_lba,
			  __read_done, req);
}

/*
 * Translate a byte range into LBA terms: first LBA, LBA (io unit) size, and
 * the inclusive LBA count covering [offset, offset + length).
 */
static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
{
	uint64_t end_lba;

	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
	*start_lba = offset / *lba_size;
	end_lba = (offset + length - 1) / *lba_size;
	*num_lba = (end_lba - *start_lba + 1);
}

/* True when both offset and length are multiples of the io unit size. */
static bool
__is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
{
	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);

	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
		return true;
	}

	return false;
}

static void
_fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
{
	/* Copy the caller's iovec descriptors into the request (the request was
	 * sized for iovcnt by alloc_fs_request_with_iov).
	 */
	uint32_t i;

	for (i = 0; i < iovcnt; i++) {
		req->args.iovs[i].iov_base = iovs[i].iov_base;
		req->args.iovs[i].iov_len = iovs[i].iov_len;
	}
}

/*
 * Core vectored read/write path. Allocates a block-aligned DMA bounce
 * buffer covering the touched LBAs, then:
 *  - write extending the file: truncate (grow) first, then read-modify-write;
 *  - fully aligned write: copy in and write directly;
 *  - everything else: read the covered LBAs first (__do_blob_read), with the
 *    copy-in/out handled in __read_done.
 */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads past EOF are rejected outright. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file,
						offset, length)) {
		/* Aligned write: no read-modify-write needed. */
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}

/* Single-buffer convenience wrapper: wraps payload in a one-entry iovec. */
static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct iovec iov;

	iov.iov_base = payload;
	iov.iov_len = (size_t)length;

	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
}

/* Asynchronous single-buffer write (is_read = 0). */
void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

/* Asynchronous vectored write (is_read = 0). */
void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}

/* Asynchronous single-buffer read (is_read = 1). */
void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

/* Asynchronous vectored read (is_read = 1). */
void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t
		      length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}

/*
 * Allocate a per-thread I/O channel; requests submitted on it are executed
 * directly (__send_request_direct) rather than forwarded.
 */
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

/* Release a channel obtained from spdk_fs_alloc_io_channel(). */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

/*
 * Allocate the context used by the synchronous (blocking) API. The embedded
 * channel is marked sync and forwards requests via fs->send_request.
 * Returns NULL on allocation failure.
 */
struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	if (pthread_spin_init(&ctx->ch.lock, 0)) {
		free(ctx);
		return NULL;
	}

	/* 512 = request-pool depth for this channel (presumably — TODO confirm
	 * against fs_channel_create); its return value is not checked here.
	 */
	fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;

	return ctx;
}


/*
 * Tear down a sync thread context: busy-waits (1 ms polls) until all
 * outstanding requests on the channel have drained, then destroys it.
 */
void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}

/*
 * Set the global cache size (MiB). Fails with -EPERM once the cache pool
 * exists, since the pool is sized from this value.
 */
int
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	/* setting g_fs_cache_size is only permitted if cache pool
	 * is already freed or hasn't been initialized
	 */
	if (g_cache_pool != NULL) {
		return -EPERM;
	}

	g_fs_cache_size = size_in_mb * 1024 * 1024;

	return 0;
}

/* Report the configured cache size in MiB. */
uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *ctx);

/* Try to free some cache buffers from this file.
 * Returns 0 if buffers were freed, -1 if the lock was contended or the file
 * had no cached buffers.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}

/*
 * Poller that reclaims cache buffers under memory pressure, in priority
 * order: first low-priority files not open for writing, then any file not
 * open for writing, and finally (next block) any file at all.
 */
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return SPDK_POLLER_IDLE;
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	/* Last resort: reclaim from any file, including ones open for writing. */
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return SPDK_POLLER_BUSY;
}

/* Runs on the cache-pool thread: enqueue a file onto the reclaim list. */
static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
}

/* Runs on the cache-pool thread: remove a file from the reclaim list. */
static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
}

/*
 * Allocate and insert a cache buffer for the given file offset. Blocks
 * (polling the mempool up to ~100 retries) waiting for the reclaim poller to
 * free buffers; returns NULL if the pool stays exhausted or calloc fails.
 * The file is registered with the cache-pool thread when it gains its first
 * buffer.
 */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		/* Give the reclaim poller a chance to free buffers. */
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}

/*
 * Append a fresh cache buffer at the file's append position. Requires the
 * previous buffer (if any) to be completely filled and append_pos to be
 * buffer-aligned.
 */
static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);

/*
 * Metadata sync for a sync request finished: record the persisted length,
 * complete the request, then re-scan for further satisfiable sync requests.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Complete outside the lock. */
	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}

/*
 * Find the first pending sync request whose data has been fully flushed
 * (offset <= length_flushed) and, if its xattr update is not already in
 * flight, persist the flushed length to the blob's "length" xattr and sync
 * metadata. Only one xattr update runs at a time per file.
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		/* Drop the lock before the (async) metadata sync. */
		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

static void
__file_flush_done(void *ctx, int bserrno)
{
	/* A cache-buffer flush write completed: account the flushed bytes,
	 * advance length_flushed (possibly moving to the next buffer), then
	 * service any now-satisfiable sync requests and keep flushing.
	 */
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 * blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	/* Continue with the next buffer (or terminate inside __file_flush). */
	__file_flush(req);
}

/*
 * Flush the next cache buffer at the file's flush frontier, if any. Partially
 * filled buffers are only flushed when a sync request is outstanding. The
 * request is reused across successive flush writes and freed on the
 * nothing-to-do paths.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 * progress, or the next buffer is partially filled but there's no
		 * outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 * more data after that is completed, or a partial buffer will get flushed
		 * when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 * file was later appended, we will write the data directly
			 * to disk and bypass cache. So just update length_flushed
			 * here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 * outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	/* Flush only the not-yet-flushed portion of the buffer. */
	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 * outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	/* Drop the lock before issuing the async write; completion continues in
	 * __file_flush_done().
	 */
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}

/* Blob extend fully complete (resize + md sync): wake the blocked writer. */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

/* Resize step done: on success, persist the new size via a metadata sync. */
static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

/*
 * Runs on the fs request thread: grow the blob to op.resize.num_clusters.
 * Note args here is a bare spdk_fs_cb_args (stack-allocated by
 * spdk_file_write), not an spdk_fs_request.
 */
static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

/* Direct (cache-bypass) read/write finished: wake the blocked caller. */
static void
__rw_from_file_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;

	__wake_caller(&req->args, bserrno);
	free_fs_request(req);
}

/*
 * Runs on the fs request thread: perform a cache-bypass read or write on the
 * sync I/O channel using the single iovec prepared by __send_rw_from_file().
 */
static void
__rw_from_file(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				     __rw_from_file_done, req);
	} else {
		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				      __rw_from_file_done, req);
	}
}

/* Per-call context for a synchronous pass-through read/write: the channel
 * whose semaphore the caller blocks on, plus the operation's result code
 * (written by the async completion via args->rwerrno).
 */
struct rw_from_file_arg {
	struct spdk_fs_channel *channel;
	int rwerrno;
};

/* Queue an uncached read/write of [offset, offset+length) to the fs thread.
 * Returns 0 on success (caller must then sem_wait on arg->channel->sem) or
 * -ENOMEM if no request could be allocated; in that case the semaphore is
 * posted here so a caller already committed to waiting does not hang.
 */
static int
__send_rw_from_file(struct spdk_file *file, void *payload,
		    uint64_t offset, uint64_t length, bool is_read,
		    struct rw_from_file_arg *arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request_with_iov(arg->channel, 1);
	if (req == NULL) {
		sem_post(&arg->channel->sem);
		return -ENOMEM;
	}

	args = &req->args;
	args->file = file;
	args->sem = &arg->channel->sem;
	args->iovs[0].iov_base = payload;
	args->iovs[0].iov_len = (size_t)length;
	args->op.rw.offset = offset;
	args->op.rw.is_read = is_read;
	/* Completion writes its status through this pointer back into arg. */
	args->rwerrno = &arg->rwerrno;
	file->fs->send_request(__rw_from_file, req);
	return 0;
}

/* Synchronous append-only write. Data normally lands in the write cache and
 * is flushed in the background; if no cache buffer is available the write is
 * passed straight through to the fs thread. Only writes at the current
 * append position are accepted.
 */
int
spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *flush_req;
	uint64_t rem_length, copy, blob_size, cluster_sz;
	uint32_t cache_buffers_filled = 0;
	uint8_t *cur_payload;
	struct cache_buffer *last;

	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);

	if (length == 0) {
		return 0;
	}

	/* Only appends are supported; any other offset is an error. */
	if (offset != file->append_pos) {
		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
		return -EINVAL;
	}

	pthread_spin_lock(&file->lock);
	file->open_for_writing = true;

	/* Try to grab a cache buffer if we're at a buffer boundary. */
	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
		cache_append_buffer(file);
	}

	if (file->last == NULL) {
		struct
rw_from_file_arg arg = {}; 2482 int rc; 2483 2484 arg.channel = channel; 2485 arg.rwerrno = 0; 2486 file->append_pos += length; 2487 pthread_spin_unlock(&file->lock); 2488 rc = __send_rw_from_file(file, payload, offset, length, false, &arg); 2489 if (rc != 0) { 2490 return rc; 2491 } 2492 sem_wait(&channel->sem); 2493 return arg.rwerrno; 2494 } 2495 2496 blob_size = __file_get_blob_size(file); 2497 2498 if ((offset + length) > blob_size) { 2499 struct spdk_fs_cb_args extend_args = {}; 2500 2501 cluster_sz = file->fs->bs_opts.cluster_sz; 2502 extend_args.sem = &channel->sem; 2503 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2504 extend_args.file = file; 2505 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2506 pthread_spin_unlock(&file->lock); 2507 file->fs->send_request(__file_extend_blob, &extend_args); 2508 sem_wait(&channel->sem); 2509 if (extend_args.rc) { 2510 return extend_args.rc; 2511 } 2512 } 2513 2514 flush_req = alloc_fs_request(channel); 2515 if (flush_req == NULL) { 2516 pthread_spin_unlock(&file->lock); 2517 return -ENOMEM; 2518 } 2519 2520 last = file->last; 2521 rem_length = length; 2522 cur_payload = payload; 2523 while (rem_length > 0) { 2524 copy = last->buf_size - last->bytes_filled; 2525 if (copy > rem_length) { 2526 copy = rem_length; 2527 } 2528 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2529 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2530 file->append_pos += copy; 2531 if (file->length < file->append_pos) { 2532 file->length = file->append_pos; 2533 } 2534 cur_payload += copy; 2535 last->bytes_filled += copy; 2536 rem_length -= copy; 2537 if (last->bytes_filled == last->buf_size) { 2538 cache_buffers_filled++; 2539 last = cache_append_buffer(file); 2540 if (last == NULL) { 2541 BLOBFS_TRACE(file, "nomem\n"); 2542 free_fs_request(flush_req); 2543 pthread_spin_unlock(&file->lock); 2544 return -ENOMEM; 2545 } 
		}
	}

	pthread_spin_unlock(&file->lock);

	/* Nothing filled completely: the data just sits in cache until a
	 * later write or sync flushes it.
	 */
	if (cache_buffers_filled == 0) {
		free_fs_request(flush_req);
		return 0;
	}

	/* Kick off a background flush of the newly filled buffer(s). */
	flush_req->args.file = file;
	file->fs->send_request(__file_flush, flush_req);
	return 0;
}

/* Readahead I/O completion: publish the buffer as filled (and already
 * "flushed", since it mirrors on-disk data) and clear its busy flag.
 * NOTE(review): bserrno is ignored here, so a failed readahead still marks
 * the buffer filled — presumably relied on elsewhere; verify upstream.
 */
static void
__readahead_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	free_fs_request(req);
}

/* Runs on the fs thread: read the requested range from the blob into the
 * cache buffer reserved by check_readahead().
 */
static void
__readahead(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	offset = args->op.readahead.offset;
	length = args->op.readahead.length;
	assert(length > 0);

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			  args->op.readahead.cache_buffer->buf,
			  start_lba, num_lba, __readahead_done, req);
}

/* Round offset up to the start of the next cache buffer. */
static uint64_t
__next_cache_buffer_offset(uint64_t offset)
{
	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
}

/* Start an asynchronous readahead of the cache buffer following `offset`,
 * unless it is already cached, past EOF, or resources are unavailable
 * (all failures here are silent — readahead is best-effort).
 */
static void
check_readahead(struct spdk_file *file, uint64_t offset,
		struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	/* Skip if that buffer is already cached or lies beyond the file. */
	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		/* Best-effort: silently skip readahead under memory pressure. */
		return;
	}
	args = &req->args;

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free_fs_request(req);
		return;
	}

	args->op.readahead.cache_buffer->in_progress = true;
	/* Clamp the last buffer's readahead length to the file tail. */
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, req);
}

/* Synchronous read. Served from the cache where possible; cache misses are
 * dispatched to the fs thread as uncached reads. Sequential access patterns
 * trigger readahead. Returns bytes read (may be short at EOF) or a negative
 * errno.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	struct rw_from_file_arg arg = {};

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	/* Nothing to read past the current append position. */
	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	/* Truncate the request at the current end of data. */
	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	/* Track sequential access; enough consecutive bytes arms readahead. */
	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	arg.channel = channel;
	arg.rwerrno = 0;
	final_length = 0;
	final_offset = offset + length;
	/* Walk the range one cache-buffer-aligned chunk at a time. */
	while (offset < final_offset) {
		int ret = 0;
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			/* Cache miss: hand off to the fs thread. The lock is
			 * dropped around the dispatch; completions are reaped
			 * after the loop via sem_wait.
			 */
			pthread_spin_unlock(&file->lock);
			ret = __send_rw_from_file(file, payload, offset, length, true, &arg);
			pthread_spin_lock(&file->lock);
			if (ret == 0) {
				sub_reads++;
			}
		} else {
			/* Cache hit: copy out, clamped to the filled portion. */
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			/* A fully consumed buffer is evicted; if the tree goes
			 * empty, drop the file from the cache pool.
			 */
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (ret == 0) {
			final_length += length;
		} else {
			arg.rwerrno = ret;
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	/* Wait for every dispatched cache-miss read to complete. */
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (arg.rwerrno == 0) {
		return final_length;
	} else {
		return arg.rwerrno;
	}
}

/* Common sync path: flush cached data and persist the length xattr, then
 * invoke cb_fn. Completes immediately if nothing new was appended since the
 * last sync.
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	/* Already durable up to append_pos: complete without any I/O. */
	if (file->append_pos <= file->length_xattr) {
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	/* Queue the sync request; the flush machinery completes it once data
	 * up to op.sync.offset has been written out.
	 */
	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Drive the flush that will eventually satisfy the queued sync. */
	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}

/* Blocking sync: wraps _file_sync() and waits on the channel semaphore for
 * completion. Returns 0 or a negative errno.
 */
int
spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_cb_args args = {};

	args.sem = &channel->sem;
	_file_sync(file, channel, __wake_caller, &args);
	sem_wait(&channel->sem);

	return args.rc;
}

/* Asynchronous sync: cb_fn is invoked with the sync status on completion. */
void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}

/* Set the file's cache/I-O priority hint. */
void
spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
{
	BLOBFS_TRACE(file, "priority=%u\n", priority);
	file->priority = priority;

}

/*
 * Close routines
 */

/* Final close completion: record the trace event, then either delete the
 * blob (if the file was unlinked while open) or report status to the caller.
 * In the delete path, blob_delete_cb takes ownership of the request.
 */
static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Drop one reference to the file. Closing an unopened file yields -EBADF;
 * only the last reference actually closes the underlying blob.
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		/* Other openers remain: succeed without touching the blob. */
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Last reference: detach and close the blob asynchronously. */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}

/* Sync-before-close completion: proceed with the actual close. */
static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}

/* Asynchronous close: syncs the file first, then releases a reference. */
void
spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	/* Flush/sync first; the close itself happens in the sync callback. */
	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
}

/* Runs on the fs thread: perform the reference-dropping close. */
static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}

/* Blocking close: syncs the file, dispatches the close to the fs thread and
 * waits for it. Returns 0 or a negative errno.
 */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	/* NOTE(review): the spdk_file_sync() return value is ignored here, so
	 * a failed sync still proceeds to close — matches existing behavior.
	 */
	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}

/* Copy the file's blob ID into `id`. Requires size >= sizeof(spdk_blob_id);
 * returns the number of bytes copied or -EINVAL if the buffer is too small.
 */
int
spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
{
	if (size < sizeof(spdk_blob_id)) {
		return -EINVAL;
	}

	memcpy(id, &file->blobid, sizeof(spdk_blob_id));

	return sizeof(spdk_blob_id);
}

/* Runs on the cache pool thread: unlink the file from the global cache list
 * and free all of its memory.
 */
static void
_file_free(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);

	free(file->name);
	free(file->tree);
	free(file);
}

/* Free a file object. If it still holds cache buffers they are released and
 * the final free is deferred to the cache pool thread (which owns g_caches);
 * otherwise the file is freed directly.
 */
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	/* Defer the final free to the cache pool thread, which owns the
	 * global cache list this file must be removed from.
	 */
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}

SPDK_LOG_REGISTER_COMPONENT(blobfs)
SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)