/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2017 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "cache_tree.h"

#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/log.h"
#include "spdk/trace.h"

#include "spdk_internal/trace_defs.h"

/* Debug-log helpers that prefix every message with the file's name. */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)

#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* Signature written into the blobstore's bstype field to mark it as blobfs. */
#define SPDK_BLOBFS_SIGNATURE "BLOBFS"

/* Global buffer-cache state, shared by all mounted filesystems in this
 * process.  The pool and its management poller live on a dedicated SPDK
 * thread (g_cache_pool_thread); creation/teardown is refcounted by
 * g_fs_count under g_cache_init_lock.
 */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;

/* Register the blobfs tracepoints; each one records the file name (string,
 * truncated to 40 bytes) as its only argument.
 */
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"BLOBFS_XATTR_START", TRACE_BLOBFS_XATTR_START,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_XATTR_END", TRACE_BLOBFS_XATTR_END,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_OPEN", TRACE_BLOBFS_OPEN,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_CLOSE", TRACE_BLOBFS_CLOSE,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_START", TRACE_BLOBFS_DELETE_START,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_DONE", TRACE_BLOBFS_DELETE_DONE,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		}
	};

	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}

/* Return a cache buffer's backing memory to the global pool and free the
 * tracking structure itself.
 */
void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

#define CACHE_READAHEAD_THRESHOLD (128 * 1024)

struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;
	char			*name;
	uint64_t		length;		/* current logical file length */
	bool			is_deleted;
	bool			open_for_writing;
	uint64_t		length_flushed;	/* bytes flushed to the blob */
	uint64_t		length_xattr;	/* length recorded in the "length" xattr */
	uint64_t		append_pos;
	uint64_t		seq_byte_count;	/* sequential-access tracking for readahead */
	uint64_t		next_seq_offset;
	uint32_t		priority;
	TAILQ_ENTRY(spdk_file)	tailq;		/* entry on spdk_filesystem.files */
	spdk_blob_id		blobid;
	uint32_t		ref_count;	/* number of outstanding opens */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	struct cache_tree	*tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;	/* entry on g_caches */
};

/* Blob ID remembered during load for a file that was marked deleted but
 * never actually removed (e.g. app crashed while the file was still open).
 */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};

struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	fs_send_request_fn	send_request;

	/* Per-purpose channel targets; each is registered as its own
	 * io_device so it gets an independent spdk_fs_channel.
	 */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		uint32_t		max_ops;
	} io_target;
};

/* Per-request context.  NOTE: this is the FIRST member of spdk_fs_request,
 * so several callbacks in this file rely on a pointer to the args being
 * pointer-equal to the enclosing request (see fs_create_blob_close_cb).
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;
	sem_t *sem;		/* posted by __wake_caller for sync wrappers */
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	int *rwerrno;		/* sticky first error across a multi-part r/w */
	struct iovec *iovs;
	uint32_t iovcnt;
	struct iovec iov;	/* inline iovec used when iovcnt <= 1 */
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t			length;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};

static void file_free(struct spdk_file *file);
static void fs_io_device_unregister(struct spdk_filesystem *fs);
static void fs_free_io_channels(struct spdk_filesystem *fs);

/* Fill opts with the library defaults (currently just the cluster size). */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}

static int _blobfs_cache_pool_reclaim(void *arg);

/* Decide whether the reclaim poller should do work: true once fewer than
 * 1/5 of the cache buffers remain available in the pool.
 */
static bool
blobfs_cache_pool_need_reclaim(void)
{
	size_t count;

	count = spdk_mempool_count(g_cache_pool);
	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
	 * when the number of available cache buffer is less than 1/5 of total buffers.
	 */
	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
		return false;
	}

	return true;
}

/* Runs on g_cache_pool_thread: start the periodic reclaim poller. */
static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}

/* Runs on g_cache_pool_thread: tear down the poller and the pool, then exit
 * the management thread.  The assert verifies every buffer was returned.
 */
static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}

/* Create the global cache-buffer mempool.  Failure is fatal (assert):
 * either the pool already exists (multiprocess, unsupported) or memory
 * is exhausted.
 */
static void
allocate_cache_pool(void)
{
	assert(g_cache_pool == NULL);
	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		if (spdk_mempool_lookup("spdk_fs_cache") != NULL) {
			SPDK_ERRLOG("Unable to allocate mempool: already exists\n");
			SPDK_ERRLOG("Probably running in multiprocess environment, which is "
				    "unsupported by the blobfs library\n");
		} else {
			SPDK_ERRLOG("Create mempool failed, you may "
				    "increase the memory and try again\n");
		}
		assert(false);
	}
}

/* Refcounted init of the global cache: the first filesystem allocates the
 * pool and spawns the management thread.
 */
static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		allocate_cache_pool();
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}

/* Refcounted teardown counterpart of initialize_global_cache(); the last
 * filesystem asks the management thread to stop (asynchronously).
 */
static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}

/* Allocated on-disk size of the file's blob, in bytes. */
static uint64_t
__file_get_blob_size(struct spdk_file *file)
{
	uint64_t cluster_sz;

	cluster_sz = file->fs->bs_opts.cluster_sz;
	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
}

struct spdk_fs_request {
	struct spdk_fs_cb_args		args;	/* must stay first; see spdk_fs_cb_args note */
	TAILQ_ENTRY(spdk_fs_request)	link;
	struct spdk_fs_channel		*channel;
};

struct spdk_fs_channel {
	struct spdk_fs_request		*req_mem;	/* backing array for the free list */
	TAILQ_HEAD(, spdk_fs_request)	reqs;		/* free request list */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	bool				sync;		/* if true, reqs is guarded by lock */
	uint32_t			outstanding_reqs;
	pthread_spinlock_t		lock;
};

/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over.
 */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};

/* Take a request from the channel's free list.  When iovcnt > 1 an iovec
 * array is heap-allocated for it; otherwise the request's inline iov is
 * used.  Returns NULL if the free list is empty or the calloc fails.
 */
static struct spdk_fs_request *
alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
{
	struct spdk_fs_request *req;
	struct iovec *iovs = NULL;

	if (iovcnt > 1) {
		iovs = calloc(iovcnt, sizeof(struct iovec));
		if (!iovs) {
			return NULL;
		}
	}

	/* Sync channels may be shared across pthreads, so guard the list. */
	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	req = TAILQ_FIRST(&channel->reqs);
	if (req) {
		channel->outstanding_reqs++;
		TAILQ_REMOVE(&channel->reqs, req, link);
	}

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}

	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
		free(iovs);
		return NULL;
	}
	memset(req, 0, sizeof(*req));
	req->channel = channel;
	if (iovcnt > 1) {
		req->args.iovs = iovs;
	} else {
		req->args.iovs = &req->args.iov;
	}
	req->args.iovcnt = iovcnt;

	return req;
}

/* Common case: allocate a request with no caller-supplied iovec array. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}

/* Return a request to its channel's free list, releasing any heap-allocated
 * iovec array first.
 */
static void
free_fs_request(struct spdk_fs_request *req)
{
	struct spdk_fs_channel *channel = req->channel;

	if (req->args.iovcnt > 1) {
		free(req->args.iovs);
	}

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
	channel->outstanding_reqs--;

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}
}

/* Shared channel constructor: preallocate max_ops requests and thread them
 * onto the free list.  NOTE(review): sem_init's return value is ignored —
 * confirm that is acceptable on all supported platforms.
 */
static int
fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
		  uint32_t max_ops)
{
	uint32_t i;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
	if (!channel->req_mem) {
		return -1;
	}

	channel->outstanding_reqs = 0;
	TAILQ_INIT(&channel->reqs);
	sem_init(&channel->sem, 0, 0);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->fs = fs;

	return 0;
}

/* io_device create callbacks: recover the owning filesystem from the
 * io_device pointer (the address of the respective target member).
 */
static int
fs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);

	return fs_channel_create(fs, channel, fs->md_target.max_ops);
}

static int
fs_sync_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);

	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
}

static int
fs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);

	return fs_channel_create(fs, channel, fs->io_target.max_ops);
}

/* io_device destroy callback shared by all three targets. */
static void
fs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_fs_channel *channel = ctx_buf;

	if (channel->outstanding_reqs > 0) {
		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
			    channel->outstanding_reqs);
	}

	free(channel->req_mem);
	if (channel->bs_channel != NULL) {
		spdk_bs_free_io_channel(channel->bs_channel);
	}
}

/* send_request implementation for channels that run on the caller's thread:
 * just invoke the function inline.
 */
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}

/* Finish wiring a filesystem to a loaded/initialized blobstore: record the
 * bs, allocate bs io channels for the md and sync channels, and take a
 * reference on the global cache.
 */
static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	initialize_global_cache();
}

/* Completion for spdk_bs_init() issued by spdk_fs_init(). */
static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}

/* Allocate a filesystem object and register its three io_devices (md, sync,
 * io).  The md and sync channels are obtained immediately on this thread.
 */
static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_md");
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_sync");
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_io");

	return fs;
}

/* Record the completion status and wake the blocked synchronous caller.
 * If args->rwerrno is set, the FIRST nonzero error is latched there.
 */
static void
__wake_caller(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) {
		*(args->rwerrno) = fserrno;
	}
	args->rc = fserrno;
	sem_post(args->sem);
}

/* Asynchronously create a new blobfs on dev.  cb_fn receives the new
 * filesystem handle (or NULL with -errno on failure).
 */
void
spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
	     fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_bs_opts opts = {};

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_opts_init(&opts, sizeof(opts));
	/* Stamp the blobstore with the blobfs signature so load can verify it. */
	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
	if (opt) {
		opts.cluster_sz = opt->cluster_sz;
	}
	spdk_bs_init(dev, &opts, init_cb, req);
}

/* Allocate a file object, link it onto fs->files, and initialize its cache
 * tree, lock, and request queues.  Returns NULL on allocation failure.
 */
static struct spdk_file *
file_alloc(struct spdk_filesystem *fs)
{
	struct spdk_file *file;

	file = calloc(1, sizeof(*file));
	if (file == NULL) {
		return NULL;
	}

	file->tree = calloc(1, sizeof(*file->tree));
	if (file->tree == NULL) {
		free(file);
		return NULL;
	}

	if (pthread_spin_init(&file->lock, 0)) {
		free(file->tree);
		free(file);
		return NULL;
	}

	file->fs = fs;
	TAILQ_INIT(&file->open_requests);
	TAILQ_INIT(&file->sync_requests);
	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
	file->priority = SPDK_FILE_PRIORITY_LOW;
	return file;
}

static void fs_load_done(void *ctx, int bserrno);

/* Issue deletion of the next blob queued on the load request's deleted_files
 * list.  Returns 0 if a delete was started (fs_load_done will be re-invoked
 * on completion), 1 if the list is empty and the load can finish.
 */
static int
_handle_deleted_files(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
		struct spdk_deleted_file *deleted_file;

		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
		free(deleted_file);
		return 0;
	}

	return 1;
}

static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded.  Now check if there are any files that
	 * were marked for deletion before last unload.  Do not complete the
	 * fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that's been marked for deleting but not actually
		 * deleted yet.  This function will get called again once the delete
		 * operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);

}

/* Per-blob iterator invoked during spdk_bs_load().  Rebuilds the in-memory
 * file list from each blob's xattrs, or queues the blob for deletion if it
 * carries the "is_deleted" xattr.
 */
static void
iter_cb(void *ctx, struct spdk_blob *blob, int rc)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	uint64_t *length;
	const char *name;
	uint32_t *is_deleted;
	size_t value_len;

	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	/* "length" xattr is always stored as a uint64_t. */
	assert(value_len == 8);

	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
	if (rc < 0) {
		/* No "is_deleted" xattr: a live file — recreate its in-memory state. */
		struct spdk_file *f;

		f = file_alloc(fs);
		if (f == NULL) {
			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}

		f->name = strdup(name);
		if (!f->name) {
			SPDK_ERRLOG("Cannot allocate memory for file name\n");
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			/* NOTE(review): f was linked onto fs->files by file_alloc()
			 * and is freed here without being unlinked — verify file_free
			 * handles that, or that the failed load path never walks
			 * fs->files afterwards.
			 */
			file_free(f);
			return;
		}

		f->blobid = spdk_blob_get_id(blob);
		f->length = *length;
		f->length_flushed = *length;
		f->length_xattr = *length;
		f->append_pos = *length;
		SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length);
	} else {
		/* Marked deleted: remember the blob id; fs_load_done deletes it. */
		struct spdk_deleted_file *deleted_file;

		deleted_file = calloc(1, sizeof(*deleted_file));
		if (deleted_file == NULL) {
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}
		deleted_file->id = spdk_blob_get_id(blob);
		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
	}
}

/* Completion for spdk_bs_load(): verify (or stamp) the blobfs signature on
 * the blobstore, then finish filesystem setup.
 */
static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		/* Unstamped blobstore (e.g. created by older tooling): claim it. */
		SPDK_DEBUGLOG(blobfs, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}

/* Unregister all three io_devices and free the filesystem object. */
static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}

/* Release the md and sync io channels obtained in fs_alloc(). */
static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}

/* Asynchronously load an existing blobfs from dev.  iter_cb rebuilds the
 * file list blob-by-blob; load_cb completes the mount.
 */
void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;
	struct spdk_bs_opts bs_opts;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;
	TAILQ_INIT(&args->op.fs_load.deleted_files);
	spdk_bs_opts_init(&bs_opts, sizeof(bs_opts));
	bs_opts.iter_cb_fn = iter_cb;
	bs_opts.iter_cb_arg = req;
	spdk_bs_load(dev, &bs_opts, load_cb, req);
}

/* Completion for spdk_bs_unload(): free all file objects, drop the global
 * cache reference, invoke the user callback, then tear down the io_devices.
 * The request was heap-allocated (see spdk_fs_unload), so plain free().
 */
static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *file, *tmp;

	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
	}

	free_global_cache();

	args->fn.fs_op(args->arg, bserrno);
	free(req);

	fs_io_device_unregister(fs);
}

/* Asynchronously unmount the filesystem and unload its blobstore. */
void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 * allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	fs_free_io_channels(fs);
	spdk_bs_unload(fs->bs, unload_cb, req);
}

/* Linear lookup of a file by name (bounded compare at SPDK_FILE_NAME_MAX). */
static struct spdk_file *
fs_find_file(struct spdk_filesystem *fs, const char *name)
{
	struct spdk_file *file;

	TAILQ_FOREACH(file, &fs->files, tailq) {
		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
			return file;
		}
	}

	return NULL;
}

/* Asynchronous stat: reports the blob id and the file size, where size is
 * the larger of the append position and the recorded length (covers data
 * appended but not yet reflected in length).
 */
void
spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
			spdk_file_stat_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file_stat stat;
	struct spdk_file *f = NULL;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f != NULL) {
		stat.blobid = f->blobid;
		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
		cb_fn(cb_arg, &stat, 0);
		return;
	}

	cb_fn(cb_arg, NULL, -ENOENT);
}

/* Stat completion used by the synchronous wrapper: copy the result into the
 * caller's buffer (stashed in args->arg) and wake the waiting thread.
 */
static void
__copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	if (fserrno == 0) {
		memcpy(args->arg, stat, sizeof(*stat));
	}
	sem_post(args->sem);
}

/* Runs on the fs thread on behalf of spdk_fs_file_stat(). */
static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}

/* Synchronous stat: dispatches __file_stat via the channel's send_request
 * and blocks on the channel semaphore until __copy_stat posts it.
 */
int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
		return -ENOMEM;
	}

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}

/* Final step of file creation: report the first error seen (resize error
 * latched in args->rc takes precedence over the close error).
 * ctx is the request — callers pass &req->args, which is pointer-equal to
 * req because args is the first member of spdk_fs_request.
 */
static void
fs_create_blob_close_cb(void *ctx, int bserrno)
{
	int rc;
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	rc = args->rc ? args->rc : bserrno;
	args->fn.file_op(args->arg, rc);
	free_fs_request(req);
}

/* After resizing the new blob to one cluster, record the file's name and a
 * zero length as xattrs, then close the blob.
 */
static void
fs_create_blob_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;
	struct spdk_blob *blob = args->op.create.blob;
	uint64_t length = 0;

	args->rc = bserrno;
	if (bserrno) {
		spdk_blob_close(blob, fs_create_blob_close_cb, args);
		return;
	}

	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));

	spdk_blob_close(blob, fs_create_blob_close_cb, args);
}

/* Newly created blob has been opened: give it an initial size of 1 cluster. */
static void
fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	args->op.create.blob = blob;
	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
}

/* Blob created: remember its id on the file and open it to set xattrs. */
static void
fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	f->blobid = blobid;
	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
}

/* Asynchronously create a named file: allocate the in-memory object, then
 * chain blob create -> open -> resize -> set xattrs -> close.
 */
void
spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *file;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	file = fs_find_file(fs, name);
	if (file != NULL) {
		cb_fn(cb_arg, -EEXIST);
		return;
	}

	file = file_alloc(fs);
	if (file == NULL) {
		SPDK_ERRLOG("Cannot allocate new file for creation\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	file->name = strdup(name);
	if (!file->name) {
		SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name);
		free_fs_request(req);
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	/* args aliases req (first member) — callbacks recover the request. */
	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
}

/* Completion for the synchronous create wrapper. */
static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
	__wake_caller(args, fserrno);
}

/* Runs on the fs thread on behalf of spdk_fs_create_file(). */
static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}

/* Synchronous file creation: dispatch to the fs thread and block until
 * __fs_create_file_done wakes us.
 */
int
spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.create.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* The file's blob is open: complete every queued open request for it.
 * Note that req/args are rebound inside the loop to each queued request.
 */
static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->name);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}

/* Take an open reference on the file and either start the blob open (first
 * opener), complete immediately (blob already open), or just queue the
 * request (blob open already in flight).
 */
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 * is now created so look it up in the file list for this
		 * filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 * open request.  When that open completes, it will invoke the
		 * open callback for this request.
		 */
	}
}

/* Asynchronously open a file, optionally creating it first when
 * SPDK_BLOBFS_OPEN_CREATE is set.  Files pending deletion report -ENOENT.
 */
void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

/* Completion for the synchronous open wrapper: record the handle and wake
 * the blocked caller.
 */
static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
	__wake_caller(args, bserrno);
}

/* Runs on the fs thread on behalf of the synchronous open wrapper. */
static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

/* spdk_fs_open_file continues beyond this chunk */
int
spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, uint32_t flags, struct spdk_file **file)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.open.name = name;
	args->op.open.flags = flags;
	args->sem = &channel->sem;
	fs->send_request(__fs_open_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	/* On failure *file is cleared so the caller never sees a stale handle. */
	if (rc == 0) {
		*file = args->file;
	} else {
		*file = NULL;
	}
	free_fs_request(req);

	return rc;
}

/* Final step of rename: the renamed blob has been closed; report status. */
static void
fs_rename_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.fs_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Rename step 2: persist the new name in the blob's "name" xattr, then
 * close the blob (close syncs metadata before fs_rename_blob_close_cb runs).
 */
static void
fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	const char *new_name = args->op.rename.new_name;

	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
}

/* Rename step 1: update the in-memory name, then open the blob so the
 * on-disk "name" xattr can be rewritten.
 */
static void
_fs_md_rename_file(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f;

	f = fs_find_file(args->fs, args->op.rename.old_name);
	if (f == NULL) {
		args->fn.fs_op(args->arg, -ENOENT);
		free_fs_request(req);
		return;
	}

	free(f->name);
	f->name = strdup(args->op.rename.new_name);
	if (!f->name) {
		SPDK_ERRLOG("Cannot allocate memory for file name\n");
		args->fn.fs_op(args->arg, -ENOMEM);
		free_fs_request(req);
		return;
	}

	args->file = f;
	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
}

/* The destination file (if any) has been deleted; now do the rename proper.
 * NOTE(review): fserrno from the delete is not checked here - verify that is
 * intentional.
 */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	_fs_md_rename_file(arg);
}

void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name);
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
			    new_name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file. So delete the existing file, then
	 * do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

/* Completion for __fs_rename_file: wake the blocked synchronous caller. */
static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
}

/* Trampoline executed via fs->send_request for the synchronous rename. */
static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
		return -ENOMEM;
	}

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

/* Shared completion for blob delete / deferred-delete metadata sync. */
static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1)
== SPDK_FILE_NAME_MAX + 1) { 1472 cb_fn(cb_arg, -ENAMETOOLONG); 1473 return; 1474 } 1475 1476 f = fs_find_file(fs, name); 1477 if (f == NULL) { 1478 SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name); 1479 cb_fn(cb_arg, -ENOENT); 1480 return; 1481 } 1482 1483 req = alloc_fs_request(fs->md_target.md_fs_channel); 1484 if (req == NULL) { 1485 SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name); 1486 cb_fn(cb_arg, -ENOMEM); 1487 return; 1488 } 1489 1490 args = &req->args; 1491 args->fn.file_op = cb_fn; 1492 args->arg = cb_arg; 1493 1494 if (f->ref_count > 0) { 1495 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1496 f->is_deleted = true; 1497 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1498 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1499 return; 1500 } 1501 1502 blobid = f->blobid; 1503 TAILQ_REMOVE(&fs->files, f, tailq); 1504 1505 file_free(f); 1506 1507 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1508 } 1509 1510 static void 1511 __fs_delete_file_done(void *arg, int fserrno) 1512 { 1513 struct spdk_fs_request *req = arg; 1514 struct spdk_fs_cb_args *args = &req->args; 1515 1516 spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, args->op.delete.name); 1517 __wake_caller(args, fserrno); 1518 } 1519 1520 static void 1521 __fs_delete_file(void *arg) 1522 { 1523 struct spdk_fs_request *req = arg; 1524 struct spdk_fs_cb_args *args = &req->args; 1525 1526 spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, args->op.delete.name); 1527 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1528 } 1529 1530 int 1531 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1532 const char *name) 1533 { 1534 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1535 struct spdk_fs_request *req; 1536 struct spdk_fs_cb_args *args; 1537 int rc; 1538 1539 req = alloc_fs_request(channel); 1540 if (req 
== NULL) { 1541 SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name); 1542 return -ENOMEM; 1543 } 1544 1545 args = &req->args; 1546 args->fs = fs; 1547 args->op.delete.name = name; 1548 args->sem = &channel->sem; 1549 fs->send_request(__fs_delete_file, req); 1550 sem_wait(&channel->sem); 1551 rc = args->rc; 1552 free_fs_request(req); 1553 1554 return rc; 1555 } 1556 1557 spdk_fs_iter 1558 spdk_fs_iter_first(struct spdk_filesystem *fs) 1559 { 1560 struct spdk_file *f; 1561 1562 f = TAILQ_FIRST(&fs->files); 1563 return f; 1564 } 1565 1566 spdk_fs_iter 1567 spdk_fs_iter_next(spdk_fs_iter iter) 1568 { 1569 struct spdk_file *f = iter; 1570 1571 if (f == NULL) { 1572 return NULL; 1573 } 1574 1575 f = TAILQ_NEXT(f, tailq); 1576 return f; 1577 } 1578 1579 const char * 1580 spdk_file_get_name(struct spdk_file *file) 1581 { 1582 return file->name; 1583 } 1584 1585 uint64_t 1586 spdk_file_get_length(struct spdk_file *file) 1587 { 1588 uint64_t length; 1589 1590 assert(file != NULL); 1591 1592 length = file->append_pos >= file->length ? 
file->append_pos : file->length;
	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
	return length;
}

/* Truncate finished (metadata synced): report status to the caller. */
static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Blob resize finished: persist the new length xattr, clamp append_pos for
 * shrinking truncates, then sync blob metadata.
 */
static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}

/* Round a byte length up to a whole number of clusters. */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

/* Asynchronously truncate (grow or shrink) a file to 'length' bytes.
 * Requires the file's blob to be open (file->blob is passed to the resize).
 */
void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		/* No-op truncate completes immediately. */
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.truncate.length = length;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
}

/* Trampoline for the synchronous truncate.  cb_arg is 'args' (not 'req')
 * because fn.file_op was set to __wake_caller, which expects the args struct.
 */
static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}

int
spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return -ENOMEM;
	}

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __wake_caller;
	args->sem = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* Common completion for the pinned-buffer read/write path: release the DMA
 * bounce buffer and report status.
 */
static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Read of the covering LBA range finished.  For reads, copy out to the user
 * iovecs.  For (unaligned) writes, merge the user data into the read buffer
 * and write the whole range back (read-modify-write).
 */
static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	if (bserrno) {
		__rw_done(req, bserrno);
		return;
	}

	assert(req != NULL);
	/* Offset of the requested bytes within the block-aligned pin buffer. */
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		spdk_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		__rw_done(req, 0);
	} else {
		spdk_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}

/* Issue the blob read that backs both plain reads and the read half of a
 * read-modify-write (see __read_done).
 */
static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (fserrno) {
		__rw_done(req, fserrno);
		return;
	}
	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
			  args->op.rw.pin_buf,
			  args->op.rw.start_lba, args->op.rw.num_lba,
			  __read_done, req);
}

/* Translate a byte range into the covering io-unit (LBA) range of the
 * file's blobstore.
 */
static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
{
	uint64_t end_lba;

	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
	*start_lba = offset / *lba_size;
	end_lba = (offset + length - 1) / *lba_size;
	*num_lba = (end_lba - *start_lba + 1);
}

/* True when both offset and length are multiples of the io-unit size, so a
 * write can skip the read-modify-write path.
 */
static bool
__is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
{
	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);

	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
		return true;
	}

	return false;
}

/* Copy the caller's iovec descriptors into the request (the data itself is
 * not copied).
 */
static void
_fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
{
	uint32_t i;

	for (i = 0; i < iovcnt; i++) {
		req->args.iovs[i].iov_base = iovs[i].iov_base;
		req->args.iovs[i].iov_len = iovs[i].iov_len;
	}
}

/* Core vectored read/write: allocates a block-aligned DMA pin buffer covering
 * the LBA range, then dispatches to one of three paths - extend-then-RMW for
 * writes past EOF, direct write for aligned writes, or read (possibly as the
 * first half of a read-modify-write) otherwise.
 */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads past EOF are rejected outright. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		/* Grow the file first; __do_blob_read then starts the RMW read. */
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		/* Fully aligned write - no read needed. */
		spdk_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}

/* Single-buffer wrapper over __readvwritev. */
static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct iovec iov;

	iov.iov_base =
payload;
	iov.iov_len = (size_t)length;

	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
}

void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}

/* Allocate an I/O channel for async file I/O on the calling thread.
 * NOTE(review): the spdk_bs_alloc_io_channel() result is not checked for
 * NULL here - confirm callers tolerate a channel without a bs_channel.
 */
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

/* Allocate a context for the synchronous (blocking) file API.
 * NOTE(review): the fs_channel_create() return value is not checked -
 * confirm it cannot fail for the fixed max_ops of 512.
 */
struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	if (pthread_spin_init(&ctx->ch.lock, 0)) {
		free(ctx);
		return NULL;
	}

	fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;

	return ctx;
}


/* Tear down a synchronous-API context, polling (1ms sleep per iteration)
 * until all outstanding requests on the channel have drained.
 */
void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}

int
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	/* setting g_fs_cache_size is only permitted if cache pool
	 * is already freed or hasn't been initialized
	 */
	if (g_cache_pool != NULL) {
		return -EPERM;
	}

	g_fs_cache_size = size_in_mb * 1024 * 1024;

	return 0;
}

uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *ctx);

/* Try to free some cache buffers from this file.
*/
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}

/* Cache-pool management poller: reclaim buffers in three priority passes -
 * (1) closed-for-write, low-priority files; (2) any closed-for-write file;
 * (3) any file.  Each pass stops as soon as reclaim pressure is relieved.
 */
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return SPDK_POLLER_IDLE;
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return SPDK_POLLER_BUSY;
}

/* Runs on g_cache_pool_thread: link a file into the reclaimable-cache list. */
static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
}

/* Runs on g_cache_pool_thread: unlink a file from the reclaimable-cache list. */
static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
}

/* Insert a new cache buffer at 'offset'.  Blocks (up to ~100 poll periods)
 * waiting for the mempool if it is exhausted, giving the reclaim poller a
 * chance to free buffers.
 */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	/* First buffer for this file: register it with the cache-pool thread. */
	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}

/* Append a fresh cache buffer at the current append position and make it the
 * file's 'last' (active write) buffer.
 */
static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);

/* The "length" xattr metadata sync for one sync request has finished:
 * record the persisted length, complete the request, then look for more
 * satisfiable sync requests.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}

/* Find the first sync request whose offset is already covered by flushed
 * data and, if none is in progress, persist the flushed length via the
 * "length" xattr + metadata sync.  The file lock is dropped before issuing
 * the metadata I/O.
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

/* One cache-buffer flush write completed: account the flushed bytes, advance
 * to the next buffer if this one is done, then re-check sync requests and
 * continue flushing.
 */
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n",
args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 * blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	__file_flush(req);
}

/* Flush the next unflushed cache buffer of 'file' to the blob, reusing 'req'
 * across iterations (__file_flush_done calls back into here).  The req is
 * freed on the no-work paths.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 * progress, or the next buffer is partially filled but there's no
		 * outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 * more data after that is completed, or a partial buffer will get flushed
		 * when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 * file was later appended, we will write the data directly
			 * to disk and bypass cache.  So just update length_flushed
			 * here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 * outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 * outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	/* Drop the lock before issuing I/O; __file_flush_done retakes it. */
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}

/* Blob extend finished (resize + metadata sync): wake the blocked writer. */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

/* Resize step of the extend: on success, sync blob metadata before waking
 * the caller; on failure, wake immediately with the error.
 */
static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

/* Trampoline executed via fs->send_request: grow the blob to the requested
 * cluster count (args is a stack-allocated spdk_fs_cb_args, not a request).
 */
static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

/* Completion of the cache-bypass read/write: wake the synchronous caller. */
static void
__rw_from_file_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;

	__wake_caller(&req->args, bserrno);
	free_fs_request(req);
}

/* Trampoline: issue the async read or write that backs the synchronous
 * cache-bypass path (__send_rw_from_file).
 */
static void
__rw_from_file(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				     __rw_from_file_done, req);
	} else {
		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				      __rw_from_file_done, req);
	}
}

struct rw_from_file_arg {
	struct spdk_fs_channel *channel;	/* channel whose semaphore the caller waits on */
	int rwerrno;				/* filled in by the async completion */
};

/* Dispatch a cache-bypass read/write.  On allocation failure the channel
 * semaphore is posted so the caller's sem_wait() does not block forever.
 */
static int
__send_rw_from_file(struct spdk_file *file, void *payload,
		    uint64_t offset, uint64_t length, bool is_read,
		    struct rw_from_file_arg *arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request_with_iov(arg->channel, 1);
	if (req == NULL) {
		sem_post(&arg->channel->sem);
		return -ENOMEM;
	}

	args = &req->args;
	args->file = file;
	args->sem = &arg->channel->sem;
	args->iovs[0].iov_base = payload;
	args->iovs[0].iov_len = (size_t)length;
	args->op.rw.offset = offset;
	args->op.rw.is_read = is_read;
	args->rwerrno = &arg->rwerrno;
	file->fs->send_request(__rw_from_file, req);
	return 0;
}

/* Synchronous append-only write.  Only writes starting exactly at the current
 * append position are accepted (-EINVAL otherwise).  Data normally lands in
 * the write cache; if no cache buffer is available the write bypasses the
 * cache via __send_rw_from_file.
 */
int
spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		void *payload, uint64_t offset, uint64_t length)
{
	struct
spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2414 struct spdk_fs_request *flush_req; 2415 uint64_t rem_length, copy, blob_size, cluster_sz; 2416 uint32_t cache_buffers_filled = 0; 2417 uint8_t *cur_payload; 2418 struct cache_buffer *last; 2419 2420 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2421 2422 if (length == 0) { 2423 return 0; 2424 } 2425 2426 if (offset != file->append_pos) { 2427 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2428 return -EINVAL; 2429 } 2430 2431 pthread_spin_lock(&file->lock); 2432 file->open_for_writing = true; 2433 2434 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2435 cache_append_buffer(file); 2436 } 2437 2438 if (file->last == NULL) { 2439 struct rw_from_file_arg arg = {}; 2440 int rc; 2441 2442 arg.channel = channel; 2443 arg.rwerrno = 0; 2444 file->append_pos += length; 2445 pthread_spin_unlock(&file->lock); 2446 rc = __send_rw_from_file(file, payload, offset, length, false, &arg); 2447 if (rc != 0) { 2448 return rc; 2449 } 2450 sem_wait(&channel->sem); 2451 return arg.rwerrno; 2452 } 2453 2454 blob_size = __file_get_blob_size(file); 2455 2456 if ((offset + length) > blob_size) { 2457 struct spdk_fs_cb_args extend_args = {}; 2458 2459 cluster_sz = file->fs->bs_opts.cluster_sz; 2460 extend_args.sem = &channel->sem; 2461 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2462 extend_args.file = file; 2463 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2464 pthread_spin_unlock(&file->lock); 2465 file->fs->send_request(__file_extend_blob, &extend_args); 2466 sem_wait(&channel->sem); 2467 if (extend_args.rc) { 2468 return extend_args.rc; 2469 } 2470 pthread_spin_lock(&file->lock); 2471 } 2472 2473 flush_req = alloc_fs_request(channel); 2474 if (flush_req == NULL) { 2475 pthread_spin_unlock(&file->lock); 2476 return -ENOMEM; 2477 } 2478 2479 last = 
file->last; 2480 rem_length = length; 2481 cur_payload = payload; 2482 while (rem_length > 0) { 2483 copy = last->buf_size - last->bytes_filled; 2484 if (copy > rem_length) { 2485 copy = rem_length; 2486 } 2487 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2488 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2489 file->append_pos += copy; 2490 if (file->length < file->append_pos) { 2491 file->length = file->append_pos; 2492 } 2493 cur_payload += copy; 2494 last->bytes_filled += copy; 2495 rem_length -= copy; 2496 if (last->bytes_filled == last->buf_size) { 2497 cache_buffers_filled++; 2498 last = cache_append_buffer(file); 2499 if (last == NULL) { 2500 BLOBFS_TRACE(file, "nomem\n"); 2501 free_fs_request(flush_req); 2502 pthread_spin_unlock(&file->lock); 2503 return -ENOMEM; 2504 } 2505 } 2506 } 2507 2508 pthread_spin_unlock(&file->lock); 2509 2510 if (cache_buffers_filled == 0) { 2511 free_fs_request(flush_req); 2512 return 0; 2513 } 2514 2515 flush_req->args.file = file; 2516 file->fs->send_request(__file_flush, flush_req); 2517 return 0; 2518 } 2519 2520 static void 2521 __readahead_done(void *ctx, int bserrno) 2522 { 2523 struct spdk_fs_request *req = ctx; 2524 struct spdk_fs_cb_args *args = &req->args; 2525 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2526 struct spdk_file *file = args->file; 2527 2528 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2529 2530 pthread_spin_lock(&file->lock); 2531 cache_buffer->bytes_filled = args->op.readahead.length; 2532 cache_buffer->bytes_flushed = args->op.readahead.length; 2533 cache_buffer->in_progress = false; 2534 pthread_spin_unlock(&file->lock); 2535 2536 free_fs_request(req); 2537 } 2538 2539 static void 2540 __readahead(void *ctx) 2541 { 2542 struct spdk_fs_request *req = ctx; 2543 struct spdk_fs_cb_args *args = &req->args; 2544 struct spdk_file *file = args->file; 2545 uint64_t offset, length, start_lba, num_lba; 2546 uint32_t lba_size; 
2547 2548 offset = args->op.readahead.offset; 2549 length = args->op.readahead.length; 2550 assert(length > 0); 2551 2552 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2553 2554 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2555 offset, length, start_lba, num_lba); 2556 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2557 args->op.readahead.cache_buffer->buf, 2558 start_lba, num_lba, __readahead_done, req); 2559 } 2560 2561 static uint64_t 2562 __next_cache_buffer_offset(uint64_t offset) 2563 { 2564 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2565 } 2566 2567 static void 2568 check_readahead(struct spdk_file *file, uint64_t offset, 2569 struct spdk_fs_channel *channel) 2570 { 2571 struct spdk_fs_request *req; 2572 struct spdk_fs_cb_args *args; 2573 2574 offset = __next_cache_buffer_offset(offset); 2575 if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2576 return; 2577 } 2578 2579 req = alloc_fs_request(channel); 2580 if (req == NULL) { 2581 return; 2582 } 2583 args = &req->args; 2584 2585 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2586 2587 args->file = file; 2588 args->op.readahead.offset = offset; 2589 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2590 if (!args->op.readahead.cache_buffer) { 2591 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2592 free_fs_request(req); 2593 return; 2594 } 2595 2596 args->op.readahead.cache_buffer->in_progress = true; 2597 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2598 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2599 } else { 2600 args->op.readahead.length = CACHE_BUFFER_SIZE; 2601 } 2602 file->fs->send_request(__readahead, req); 2603 } 2604 2605 int64_t 2606 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2607 void *payload, uint64_t offset, uint64_t length) 2608 { 2609 struct spdk_fs_channel 
*channel = (struct spdk_fs_channel *)ctx; 2610 uint64_t final_offset, final_length; 2611 uint32_t sub_reads = 0; 2612 struct cache_buffer *buf; 2613 uint64_t read_len; 2614 struct rw_from_file_arg arg = {}; 2615 2616 pthread_spin_lock(&file->lock); 2617 2618 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2619 2620 file->open_for_writing = false; 2621 2622 if (length == 0 || offset >= file->append_pos) { 2623 pthread_spin_unlock(&file->lock); 2624 return 0; 2625 } 2626 2627 if (offset + length > file->append_pos) { 2628 length = file->append_pos - offset; 2629 } 2630 2631 if (offset != file->next_seq_offset) { 2632 file->seq_byte_count = 0; 2633 } 2634 file->seq_byte_count += length; 2635 file->next_seq_offset = offset + length; 2636 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2637 check_readahead(file, offset, channel); 2638 check_readahead(file, offset + CACHE_BUFFER_SIZE, channel); 2639 } 2640 2641 arg.channel = channel; 2642 arg.rwerrno = 0; 2643 final_length = 0; 2644 final_offset = offset + length; 2645 while (offset < final_offset) { 2646 int ret = 0; 2647 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2648 if (length > (final_offset - offset)) { 2649 length = final_offset - offset; 2650 } 2651 2652 buf = tree_find_filled_buffer(file->tree, offset); 2653 if (buf == NULL) { 2654 pthread_spin_unlock(&file->lock); 2655 ret = __send_rw_from_file(file, payload, offset, length, true, &arg); 2656 pthread_spin_lock(&file->lock); 2657 if (ret == 0) { 2658 sub_reads++; 2659 } 2660 } else { 2661 read_len = length; 2662 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2663 read_len = buf->offset + buf->bytes_filled - offset; 2664 } 2665 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len); 2666 memcpy(payload, &buf->buf[offset - buf->offset], read_len); 2667 if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) { 2668 tree_remove_buffer(file->tree, buf); 2669 if (file->tree->present_mask == 0) { 
2670 spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file); 2671 } 2672 } 2673 } 2674 2675 if (ret == 0) { 2676 final_length += length; 2677 } else { 2678 arg.rwerrno = ret; 2679 break; 2680 } 2681 payload += length; 2682 offset += length; 2683 } 2684 pthread_spin_unlock(&file->lock); 2685 while (sub_reads > 0) { 2686 sem_wait(&channel->sem); 2687 sub_reads--; 2688 } 2689 if (arg.rwerrno == 0) { 2690 return final_length; 2691 } else { 2692 return arg.rwerrno; 2693 } 2694 } 2695 2696 static void 2697 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2698 spdk_file_op_complete cb_fn, void *cb_arg) 2699 { 2700 struct spdk_fs_request *sync_req; 2701 struct spdk_fs_request *flush_req; 2702 struct spdk_fs_cb_args *sync_args; 2703 struct spdk_fs_cb_args *flush_args; 2704 2705 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2706 2707 pthread_spin_lock(&file->lock); 2708 if (file->append_pos <= file->length_xattr) { 2709 BLOBFS_TRACE(file, "done - file already synced\n"); 2710 pthread_spin_unlock(&file->lock); 2711 cb_fn(cb_arg, 0); 2712 return; 2713 } 2714 2715 sync_req = alloc_fs_request(channel); 2716 if (!sync_req) { 2717 SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name); 2718 pthread_spin_unlock(&file->lock); 2719 cb_fn(cb_arg, -ENOMEM); 2720 return; 2721 } 2722 sync_args = &sync_req->args; 2723 2724 flush_req = alloc_fs_request(channel); 2725 if (!flush_req) { 2726 SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name); 2727 free_fs_request(sync_req); 2728 pthread_spin_unlock(&file->lock); 2729 cb_fn(cb_arg, -ENOMEM); 2730 return; 2731 } 2732 flush_args = &flush_req->args; 2733 2734 sync_args->file = file; 2735 sync_args->fn.file_op = cb_fn; 2736 sync_args->arg = cb_arg; 2737 sync_args->op.sync.offset = file->append_pos; 2738 sync_args->op.sync.xattr_in_progress = false; 2739 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2740 pthread_spin_unlock(&file->lock); 2741 2742 
flush_args->file = file; 2743 channel->send_request(__file_flush, flush_req); 2744 } 2745 2746 int 2747 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2748 { 2749 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2750 struct spdk_fs_cb_args args = {}; 2751 2752 args.sem = &channel->sem; 2753 _file_sync(file, channel, __wake_caller, &args); 2754 sem_wait(&channel->sem); 2755 2756 return args.rc; 2757 } 2758 2759 void 2760 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2761 spdk_file_op_complete cb_fn, void *cb_arg) 2762 { 2763 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2764 2765 _file_sync(file, channel, cb_fn, cb_arg); 2766 } 2767 2768 void 2769 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2770 { 2771 BLOBFS_TRACE(file, "priority=%u\n", priority); 2772 file->priority = priority; 2773 2774 } 2775 2776 /* 2777 * Close routines 2778 */ 2779 2780 static void 2781 __file_close_async_done(void *ctx, int bserrno) 2782 { 2783 struct spdk_fs_request *req = ctx; 2784 struct spdk_fs_cb_args *args = &req->args; 2785 struct spdk_file *file = args->file; 2786 2787 spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->name); 2788 2789 if (file->is_deleted) { 2790 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2791 return; 2792 } 2793 2794 args->fn.file_op(args->arg, bserrno); 2795 free_fs_request(req); 2796 } 2797 2798 static void 2799 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2800 { 2801 struct spdk_blob *blob; 2802 2803 pthread_spin_lock(&file->lock); 2804 if (file->ref_count == 0) { 2805 pthread_spin_unlock(&file->lock); 2806 __file_close_async_done(req, -EBADF); 2807 return; 2808 } 2809 2810 file->ref_count--; 2811 if (file->ref_count > 0) { 2812 pthread_spin_unlock(&file->lock); 2813 req->args.fn.file_op(req->args.arg, 0); 2814 free_fs_request(req); 2815 return; 2816 } 2817 2818 
pthread_spin_unlock(&file->lock); 2819 2820 blob = file->blob; 2821 file->blob = NULL; 2822 spdk_blob_close(blob, __file_close_async_done, req); 2823 } 2824 2825 static void 2826 __file_close_async__sync_done(void *arg, int fserrno) 2827 { 2828 struct spdk_fs_request *req = arg; 2829 struct spdk_fs_cb_args *args = &req->args; 2830 2831 __file_close_async(args->file, req); 2832 } 2833 2834 void 2835 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2836 { 2837 struct spdk_fs_request *req; 2838 struct spdk_fs_cb_args *args; 2839 2840 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2841 if (req == NULL) { 2842 SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name); 2843 cb_fn(cb_arg, -ENOMEM); 2844 return; 2845 } 2846 2847 args = &req->args; 2848 args->file = file; 2849 args->fn.file_op = cb_fn; 2850 args->arg = cb_arg; 2851 2852 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2853 } 2854 2855 static void 2856 __file_close(void *arg) 2857 { 2858 struct spdk_fs_request *req = arg; 2859 struct spdk_fs_cb_args *args = &req->args; 2860 struct spdk_file *file = args->file; 2861 2862 __file_close_async(file, req); 2863 } 2864 2865 int 2866 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2867 { 2868 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2869 struct spdk_fs_request *req; 2870 struct spdk_fs_cb_args *args; 2871 2872 req = alloc_fs_request(channel); 2873 if (req == NULL) { 2874 SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name); 2875 return -ENOMEM; 2876 } 2877 2878 args = &req->args; 2879 2880 spdk_file_sync(file, ctx); 2881 BLOBFS_TRACE(file, "name=%s\n", file->name); 2882 args->file = file; 2883 args->sem = &channel->sem; 2884 args->fn.file_op = __wake_caller; 2885 args->arg = args; 2886 channel->send_request(__file_close, req); 2887 sem_wait(&channel->sem); 2888 2889 return args->rc; 2890 } 
2891 2892 int 2893 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2894 { 2895 if (size < sizeof(spdk_blob_id)) { 2896 return -EINVAL; 2897 } 2898 2899 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2900 2901 return sizeof(spdk_blob_id); 2902 } 2903 2904 static void 2905 _file_free(void *ctx) 2906 { 2907 struct spdk_file *file = ctx; 2908 2909 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2910 2911 free(file->name); 2912 free(file->tree); 2913 free(file); 2914 } 2915 2916 static void 2917 file_free(struct spdk_file *file) 2918 { 2919 BLOBFS_TRACE(file, "free=%s\n", file->name); 2920 pthread_spin_lock(&file->lock); 2921 if (file->tree->present_mask == 0) { 2922 pthread_spin_unlock(&file->lock); 2923 free(file->name); 2924 free(file->tree); 2925 free(file); 2926 return; 2927 } 2928 2929 tree_free_buffers(file->tree); 2930 assert(file->tree->present_mask == 0); 2931 spdk_thread_send_msg(g_cache_pool_thread, _file_free, file); 2932 pthread_spin_unlock(&file->lock); 2933 } 2934 2935 SPDK_LOG_REGISTER_COMPONENT(blobfs) 2936 SPDK_LOG_REGISTER_COMPONENT(blobfs_rw) 2937