/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2017 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "cache_tree.h"

#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/log.h"
#include "spdk/trace.h"

#include "spdk_internal/trace_defs.h"

#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)

#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

#define SPDK_BLOBFS_SIGNATURE "BLOBFS"

static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;

static void
blobfs_trace(void)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"BLOBFS_XATTR_START", TRACE_BLOBFS_XATTR_START,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_XATTR_END", TRACE_BLOBFS_XATTR_END,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_OPEN", TRACE_BLOBFS_OPEN,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_CLOSE", TRACE_BLOBFS_CLOSE,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_START", TRACE_BLOBFS_DELETE_START,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_DONE", TRACE_BLOBFS_DELETE_DONE,
			OWNER_TYPE_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		}
	};

	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)

void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

#define CACHE_READAHEAD_THRESHOLD (128 * 1024)

struct spdk_file {
	struct spdk_filesystem *fs;
	struct spdk_blob *blob;
	char *name;
	uint64_t length;
	bool is_deleted;
	bool open_for_writing;
	uint64_t length_flushed;
	uint64_t length_xattr;
	uint64_t append_pos;
	uint64_t seq_byte_count;
	uint64_t next_seq_offset;
	uint32_t priority;
	TAILQ_ENTRY(spdk_file) tailq;
	spdk_blob_id blobid;
	uint32_t ref_count;
	pthread_spinlock_t lock;
	struct cache_buffer *last;
	struct cache_tree *tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file) cache_tailq;
};

struct spdk_deleted_file {
	spdk_blob_id id;
	TAILQ_ENTRY(spdk_deleted_file) tailq;
};

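/*
 * Descriptive note: a blobfs instance wraps a blobstore and registers three
 * io_devices - md_target for metadata operations, sync_target for cache
 * flush/sync I/O, and io_target for the regular file I/O channels handed out
 * via spdk_fs_alloc_io_channel().
 */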
struct spdk_filesystem {
	struct spdk_blob_store *bs;
	TAILQ_HEAD(, spdk_file) files;
	struct spdk_bs_opts bs_opts;
	struct spdk_bs_dev *bdev;
	fs_send_request_fn send_request;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *sync_io_channel;
		struct spdk_fs_channel *sync_fs_channel;
	} sync_target;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *md_io_channel;
		struct spdk_fs_channel *md_fs_channel;
	} md_target;

	struct {
		uint32_t max_ops;
	} io_target;
};

struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete fs_op_with_handle;
		spdk_fs_op_complete fs_op;
		spdk_file_op_with_handle_complete file_op_with_handle;
		spdk_file_op_complete file_op;
		spdk_file_stat_op_complete stat_op;
	} fn;
	void *arg;
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	int *rwerrno;
	struct iovec *iovs;
	uint32_t iovcnt;
	struct iovec iov;
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file) deleted_files;
		} fs_load;
		struct {
			uint64_t length;
		} truncate;
		struct {
			struct spdk_io_channel *channel;
			void *pin_buf;
			int is_read;
			off_t offset;
			size_t length;
			uint64_t start_lba;
			uint64_t num_lba;
			uint32_t blocklen;
		} rw;
		struct {
			const char *old_name;
			const char *new_name;
		} rename;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
		} flush;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
			uint64_t offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t offset;
			TAILQ_ENTRY(spdk_fs_request) tailq;
			bool xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t length;
		} sync;
		struct {
			uint32_t num_clusters;
		} resize;
		struct {
			const char *name;
			uint32_t flags;
			TAILQ_ENTRY(spdk_fs_request) tailq;
		} open;
		struct {
			const char *name;
			struct spdk_blob *blob;
		} create;
		struct {
			const char *name;
		} delete;
		struct {
			const char *name;
		} stat;
	} op;
};

static void file_free(struct spdk_file *file);
static void fs_io_device_unregister(struct spdk_filesystem *fs);
static void fs_free_io_channels(struct spdk_filesystem *fs);

void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}

static int _blobfs_cache_pool_reclaim(void *arg);

static bool
blobfs_cache_pool_need_reclaim(void)
{
	size_t count;

	count = spdk_mempool_count(g_cache_pool);
	/* We define an aggressive policy here as the requirements from db_bench are batched, so start the poller
	 * when the number of available cache buffers is less than 1/5 of the total buffers.
	 */
	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
		return false;
	}

	return true;
}

static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}

static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}

static void
allocate_cache_pool(void)
{
	assert(g_cache_pool == NULL);
	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_NUMA_ID_ANY);
	if (!g_cache_pool) {
		if (spdk_mempool_lookup("spdk_fs_cache") != NULL) {
			SPDK_ERRLOG("Unable to allocate mempool: already exists\n");
			SPDK_ERRLOG("Probably running in multiprocess environment, which is "
				    "unsupported by the blobfs library\n");
		} else {
			SPDK_ERRLOG("Failed to create mempool; consider increasing "
				    "the available memory and trying again\n");
		}
		assert(false);
	}
}

static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		allocate_cache_pool();
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}

static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}

static uint64_t
__file_get_blob_size(struct spdk_file *file)
{
	uint64_t cluster_sz;

	cluster_sz = file->fs->bs_opts.cluster_sz;
	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
}

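/*
 * Descriptive note: requests are carved from a per-channel preallocated array
 * (req_mem) rather than the heap.  Channels backing an spdk_fs_thread_ctx set
 * 'sync' and protect the request freelist with a spinlock, since a request
 * may be allocated on the calling thread but released from a completion
 * callback running on the filesystem thread.
 */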
struct spdk_fs_request {
	struct spdk_fs_cb_args args;
	TAILQ_ENTRY(spdk_fs_request) link;
	struct spdk_fs_channel *channel;
};

struct spdk_fs_channel {
	struct spdk_fs_request *req_mem;
	TAILQ_HEAD(, spdk_fs_request) reqs;
	sem_t sem;
	struct spdk_filesystem *fs;
	struct spdk_io_channel *bs_channel;
	fs_send_request_fn send_request;
	bool sync;
	uint32_t outstanding_reqs;
	pthread_spinlock_t lock;
};

/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over.
 */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel ch;
};

static struct spdk_fs_request *
alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
{
	struct spdk_fs_request *req;
	struct iovec *iovs = NULL;

	if (iovcnt > 1) {
		iovs = calloc(iovcnt, sizeof(struct iovec));
		if (!iovs) {
			return NULL;
		}
	}

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	req = TAILQ_FIRST(&channel->reqs);
	if (req) {
		channel->outstanding_reqs++;
		TAILQ_REMOVE(&channel->reqs, req, link);
	}

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}

	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel=%p\n", channel);
		free(iovs);
		return NULL;
	}
	memset(req, 0, sizeof(*req));
	req->channel = channel;
	if (iovcnt > 1) {
		req->args.iovs = iovs;
	} else {
		req->args.iovs = &req->args.iov;
	}
	req->args.iovcnt = iovcnt;

	return req;
}

static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}

static void
free_fs_request(struct spdk_fs_request *req)
{
	struct spdk_fs_channel *channel = req->channel;

	if (req->args.iovcnt > 1) {
		free(req->args.iovs);
	}

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
	channel->outstanding_reqs--;

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}
}

static int
fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
		  uint32_t max_ops)
{
	uint32_t i;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
	if (!channel->req_mem) {
		return -1;
	}

	channel->outstanding_reqs = 0;
	TAILQ_INIT(&channel->reqs);
	sem_init(&channel->sem, 0, 0);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->fs = fs;

	return 0;
}

static int
fs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);

	return fs_channel_create(fs, channel, fs->md_target.max_ops);
}

static int
fs_sync_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);

	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
}

static int
fs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);

	return fs_channel_create(fs, channel, fs->io_target.max_ops);
}

static void
fs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_fs_channel *channel = ctx_buf;

	if (channel->outstanding_reqs > 0) {
		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
			    channel->outstanding_reqs);
	}

	free(channel->req_mem);
	if (channel->bs_channel != NULL) {
		spdk_bs_free_io_channel(channel->bs_channel);
	}
}

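/*
 * Descriptive note: used as the send_request callback for channels that
 * already live on the thread processing blobstore I/O (the md/sync channels
 * and channels from spdk_fs_alloc_io_channel()) - the request function is
 * simply invoked inline.
 */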
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}

static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	initialize_global_cache();
}

static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}

static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_md");
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_sync");
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_io");

	return fs;
}

static void
__wake_caller(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) {
		*(args->rwerrno) = fserrno;
	}
	args->rc = fserrno;
	sem_post(args->sem);
}

void
spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
	     fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_bs_opts opts = {};

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_opts_init(&opts, sizeof(opts));
	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
	if (opt) {
		opts.cluster_sz = opt->cluster_sz;
	}
	spdk_bs_init(dev, &opts, init_cb, req);
}

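/*
 * Typical usage (a sketch only, not part of this file): the application
 * supplies a send_request_fn that forwards the request function to the SPDK
 * thread that owns the filesystem, for example:
 *
 *	static void
 *	my_send_request(fs_request_fn fn, void *arg)
 *	{
 *		spdk_thread_send_msg(g_fs_thread, fn, arg);
 *	}
 *
 *	spdk_fs_init(bs_dev, NULL, my_send_request, init_done_cb, cb_arg);
 *
 * g_fs_thread, my_send_request, init_done_cb, bs_dev and cb_arg are
 * hypothetical application-owned names.
 */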
static struct spdk_file *
file_alloc(struct spdk_filesystem *fs)
{
	struct spdk_file *file;

	file = calloc(1, sizeof(*file));
	if (file == NULL) {
		return NULL;
	}

	file->tree = calloc(1, sizeof(*file->tree));
	if (file->tree == NULL) {
		free(file);
		return NULL;
	}

	if (pthread_spin_init(&file->lock, 0)) {
		free(file->tree);
		free(file);
		return NULL;
	}

	file->fs = fs;
	TAILQ_INIT(&file->open_requests);
	TAILQ_INIT(&file->sync_requests);
	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
	file->priority = SPDK_FILE_PRIORITY_LOW;
	return file;
}

static void fs_load_done(void *ctx, int bserrno);

static int
_handle_deleted_files(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
		struct spdk_deleted_file *deleted_file;

		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
		free(deleted_file);
		return 0;
	}

	return 1;
}

static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded. Now check if there are any files that
	 * were marked for deletion before the last unload. Do not complete the
	 * fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that's been marked for deletion but not actually
		 * deleted yet. This function will get called again once the delete
		 * operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);
}

static void
iter_cb(void *ctx, struct spdk_blob *blob, int rc)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	uint64_t *length;
	const char *name;
	uint32_t *is_deleted;
	size_t value_len;

	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	assert(value_len == 8);

	/* This file could have been marked as deleted but never closed (e.g. the
	 * app crashed before closing it), so delete it now if that is the case.
	 */
	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
	if (rc < 0) {
		struct spdk_file *f;

		f = file_alloc(fs);
		if (f == NULL) {
			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}

		f->name = strdup(name);
		if (!f->name) {
			SPDK_ERRLOG("Cannot allocate memory for file name\n");
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			file_free(f);
			return;
		}

		f->blobid = spdk_blob_get_id(blob);
		f->length = *length;
		f->length_flushed = *length;
		f->length_xattr = *length;
		f->append_pos = *length;
		SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length);
	} else {
		struct spdk_deleted_file *deleted_file;

		deleted_file = calloc(1, sizeof(*deleted_file));
		if (deleted_file == NULL) {
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}
		deleted_file->id = spdk_blob_get_id(blob);
		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
	}
}

static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		SPDK_DEBUGLOG(blobfs, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}

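/*
 * Descriptive note: this not only unregisters the three io_devices but also
 * frees the spdk_filesystem structure itself, so callers must not touch fs
 * after calling it.
 */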
static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}

static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}

void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;
	struct spdk_bs_opts bs_opts;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;
	TAILQ_INIT(&args->op.fs_load.deleted_files);
	spdk_bs_opts_init(&bs_opts, sizeof(bs_opts));
	bs_opts.iter_cb_fn = iter_cb;
	bs_opts.iter_cb_arg = req;
	spdk_bs_load(dev, &bs_opts, load_cb, req);
}

static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *file, *tmp;

	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
	}

	free_global_cache();

	args->fn.fs_op(args->arg, bserrno);
	free(req);

	fs_io_device_unregister(fs);
}

void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 * allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	fs_free_io_channels(fs);
	spdk_bs_unload(fs->bs, unload_cb, req);
}

static struct spdk_file *
fs_find_file(struct spdk_filesystem *fs, const char *name)
{
	struct spdk_file *file;

	TAILQ_FOREACH(file, &fs->files, tailq) {
		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
			return file;
		}
	}

	return NULL;
}

void
spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
			spdk_file_stat_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file_stat stat;
	struct spdk_file *f = NULL;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f != NULL) {
		stat.blobid = f->blobid;
		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
		cb_fn(cb_arg, &stat, 0);
		return;
	}

	cb_fn(cb_arg, NULL, -ENOENT);
}

static void
__copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	if (fserrno == 0) {
		memcpy(args->arg, stat, sizeof(*stat));
	}
	sem_post(args->sem);
}

static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}

int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
		return -ENOMEM;
	}

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}

static void
fs_create_blob_close_cb(void *ctx, int bserrno)
{
	int rc;
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	rc = args->rc ? args->rc : bserrno;
	args->fn.file_op(args->arg, rc);
	free_fs_request(req);
}

static void
fs_create_blob_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;
	struct spdk_blob *blob = args->op.create.blob;
	uint64_t length = 0;

	args->rc = bserrno;
	if (bserrno) {
		spdk_blob_close(blob, fs_create_blob_close_cb, args);
		return;
	}

	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));

	spdk_blob_close(blob, fs_create_blob_close_cb, args);
}

static void
fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	args->op.create.blob = blob;
	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
}

static void
fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	f->blobid = blobid;
	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
}

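/*
 * Descriptive note on the creation chain above: spdk_bs_create_blob() ->
 * fs_create_blob_create_cb() opens the blob -> fs_create_blob_open_cb()
 * resizes it to one cluster -> fs_create_blob_resize_cb() records the "name"
 * and initial zero "length" xattrs -> fs_create_blob_close_cb() completes the
 * user callback.
 */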
void
spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *file;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	file = fs_find_file(fs, name);
	if (file != NULL) {
		cb_fn(cb_arg, -EEXIST);
		return;
	}

	file = file_alloc(fs);
	if (file == NULL) {
		SPDK_ERRLOG("Cannot allocate new file for creation\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	file->name = strdup(name);
	if (!file->name) {
		SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name);
		free_fs_request(req);
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
}

static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
	__wake_caller(args, fserrno);
}

static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}

int
spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.create.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->name);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}

static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 * is now created so look it up in the file list for this
		 * filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 * open request.  When that open completes, it will invoke the
		 * open callback for this request.
		 */
	}
}

void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
	__wake_caller(args, bserrno);
}

static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

int
spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, uint32_t flags, struct spdk_file **file)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.open.name = name;
	args->op.open.flags = flags;
	args->sem = &channel->sem;
	fs->send_request(__fs_open_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	if (rc == 0) {
		*file = args->file;
	} else {
		*file = NULL;
	}
	free_fs_request(req);

	return rc;
}

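/*
 * Descriptive note: rename is a metadata-only update - the blob's "name"
 * xattr is rewritten before the blob is closed.  If a file with the new name
 * already exists, spdk_fs_rename_file_async() deletes it first.
 */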
static void
fs_rename_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.fs_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	const char *new_name = args->op.rename.new_name;

	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
}

static void
_fs_md_rename_file(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f;

	f = fs_find_file(args->fs, args->op.rename.old_name);
	if (f == NULL) {
		args->fn.fs_op(args->arg, -ENOENT);
		free_fs_request(req);
		return;
	}

	free(f->name);
	f->name = strdup(args->op.rename.new_name);
	if (!f->name) {
		SPDK_ERRLOG("Cannot allocate memory for file name\n");
		args->fn.fs_op(args->arg, -ENOMEM);
		free_fs_request(req);
		return;
	}

	args->file = f;
	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
}

static void
fs_rename_delete_done(void *arg, int fserrno)
{
	_fs_md_rename_file(arg);
}

void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name);
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
			    new_name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file. So delete the existing file, then
	 * do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
}

static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
		return -ENOMEM;
	}

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL) {
		SPDK_ERRLOG("Cannot find the file=%s to be deleted\n", name);
		cb_fn(cb_arg, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate the req for the file=%s to be deleted\n", name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	if (f->ref_count > 0) {
		/* If the ref count > 0, mark the file as deleted and delete it when it is closed. */
		f->is_deleted = true;
		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
		return;
	}

	blobid = f->blobid;
	TAILQ_REMOVE(&fs->files, f, tailq);

	file_free(f);

	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
}

static void
__fs_delete_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, args->op.delete.name);
	__wake_caller(args, fserrno);
}

static void
__fs_delete_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, args->op.delete.name);
	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
}

int
spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.delete.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_delete_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

spdk_fs_iter
spdk_fs_iter_first(struct spdk_filesystem *fs)
{
	struct spdk_file *f;

	f = TAILQ_FIRST(&fs->files);
	return f;
}

spdk_fs_iter
spdk_fs_iter_next(spdk_fs_iter iter)
{
	struct spdk_file *f = iter;

	if (f == NULL) {
		return NULL;
	}

	f = TAILQ_NEXT(f, tailq);
	return f;
}

const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}

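/*
 * Descriptive note: the reported length is the larger of the on-disk length
 * and append_pos, so it includes appended data still sitting in the write
 * cache that has not yet been flushed to the blob.
 */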
uint64_t
spdk_file_get_length(struct spdk_file *file)
{
	uint64_t length;

	assert(file != NULL);

	length = file->append_pos >= file->length ? file->append_pos : file->length;
	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
	return length;
}

static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}

static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.truncate.length = length;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
}

static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}

int
spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return -ENOMEM;
	}

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __wake_caller;
	args->sem = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	if (bserrno) {
		__rw_done(req, bserrno);
		return;
	}

	assert(req != NULL);
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		spdk_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		__rw_done(req, 0);
	} else {
		spdk_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}

static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (fserrno) {
		__rw_done(req, fserrno);
		return;
	}
	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
			  args->op.rw.pin_buf,
			  args->op.rw.start_lba, args->op.rw.num_lba,
			  __read_done, req);
}

static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
{
	uint64_t end_lba;

	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
	*start_lba = offset / *lba_size;
	end_lba = (offset + length - 1) / *lba_size;
	*num_lba = (end_lba - *start_lba + 1);
}

static bool
__is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
{
	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);

	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
		return true;
	}

	return false;
}

static void
_fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
{
	uint32_t i;

	for (i = 0; i < iovcnt; i++) {
		req->args.iovs[i].iov_base = iovs[i].iov_base;
		req->args.iovs[i].iov_len = iovs[i].iov_len;
	}
}

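/*
 * Descriptive note: reads and writes on the io channel are staged through a
 * pinned DMA buffer sized to whole io units (LBAs).  Unaligned writes are
 * handled as read-modify-write: the covered LBAs are read into pin_buf, the
 * user iovecs are copied in, and the buffer is written back.  Writes that
 * extend the file first grow the blob via spdk_file_truncate_async().
 */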
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_NUMA_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		spdk_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}

static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct iovec iov;

	iov.iov_base = payload;
	iov.iov_len = (size_t)length;

	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
}

void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}

struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	if (pthread_spin_init(&ctx->ch.lock, 0)) {
		free(ctx);
		return NULL;
	}

	fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;

	return ctx;
}

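/*
 * Descriptive note: tears down a context created by
 * spdk_fs_alloc_thread_ctx().  It busy-waits, polling outstanding_reqs under
 * the channel lock, until every request issued through this context has
 * completed before destroying the channel.
 */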
void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}

int
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	/* setting g_fs_cache_size is only permitted if cache pool
	 * is already freed or hasn't been initialized
	 */
	if (g_cache_pool != NULL) {
		return -EPERM;
	}

	g_fs_cache_size = size_in_mb * 1024 * 1024;

	return 0;
}

uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *ctx);

/* Try to free some cache buffers from this file.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* This function is safe to call from any thread. The file lock may be
	 * held by another thread at the moment, so just try to take it here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	}

	/* tree_free_buffers() may have freed the buffer pointed to by file->last.
	 * So check if current append_pos is still in the cache, and if not, clear
	 * file->last.
	 */
	if (tree_find_buffer(file->tree, file->append_pos) == NULL) {
		file->last = NULL;
	}

	pthread_spin_unlock(&file->lock);

	return 0;
}

static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return SPDK_POLLER_IDLE;
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return SPDK_POLLER_BUSY;
}

static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
}

static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
}

static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}

static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);

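/*
 * Descriptive note on sync handling: each sync request is queued on
 * file->sync_requests with the append offset captured at submission time.
 * As flushes complete and length_flushed catches up to that offset,
 * __check_sync_reqs() persists the "length" xattr and the request is
 * completed in __file_cache_finish_sync().
 */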

static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}

static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);

static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}

static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 * blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	__file_flush(req);
}
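
/*
 * Flush the next unwritten cache buffer at file->length_flushed to the blob.
 * A buffer is written only when no flush for it is already in progress and
 * it is either completely filled or covered by an outstanding sync request;
 * otherwise the request is freed and flushing resumes later from
 * __file_flush_done() or the next sync.  If there is no cache buffer at
 * length_flushed (for example because it was evicted and the data was
 * written directly), length_flushed is simply advanced to append_pos.
 */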

static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 * progress, or the next buffer is partially filled but there's no
		 * outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 * more data after that is completed, or a partial buffer will get flushed
		 * when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 * file was later appended, we will write the data directly
			 * to disk and bypass cache. So just update length_flushed
			 * here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 * outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 * outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}

static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

static void
__rw_from_file_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;

	__wake_caller(&req->args, bserrno);
	free_fs_request(req);
}

static void
__rw_from_file(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				     __rw_from_file_done, req);
	} else {
		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				      __rw_from_file_done, req);
	}
}

struct rw_from_file_arg {
	struct spdk_fs_channel *channel;
	int rwerrno;
};

static int
__send_rw_from_file(struct spdk_file *file, void *payload,
		    uint64_t offset, uint64_t length, bool is_read,
		    struct rw_from_file_arg *arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request_with_iov(arg->channel, 1);
	if (req == NULL) {
		sem_post(&arg->channel->sem);
		return -ENOMEM;
	}

	args = &req->args;
	args->file = file;
	args->sem = &arg->channel->sem;
	args->iovs[0].iov_base = payload;
	args->iovs[0].iov_len = (size_t)length;
	args->op.rw.offset = offset;
	args->op.rw.is_read = is_read;
	args->rwerrno = &arg->rwerrno;
	file->fs->send_request(__rw_from_file, req);
	return 0;
}
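
/*
 * Synchronous, append-only write entry point - the offset must equal the
 * file's current append position.  Data is normally copied into cache
 * buffers and flushed asynchronously once whole buffers fill up; if no cache
 * buffer is available (for example after the file's cache was reclaimed),
 * the write is dispatched through the filesystem's send_request callback and
 * performed directly against the blob.  The blob is resized first whenever
 * the write would extend past its current size.
 */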

int
spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *flush_req;
	uint64_t rem_length, copy, blob_size, cluster_sz;
	uint32_t cache_buffers_filled = 0;
	uint8_t *cur_payload;
	struct cache_buffer *last;

	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);

	if (length == 0) {
		return 0;
	}

	if (offset != file->append_pos) {
		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
		return -EINVAL;
	}

	pthread_spin_lock(&file->lock);
	file->open_for_writing = true;

	do {
		if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
			cache_append_buffer(file);
		}

		if (file->last == NULL) {
			struct rw_from_file_arg arg = {};
			int rc;

			arg.channel = channel;
			arg.rwerrno = 0;
			file->append_pos += length;
			pthread_spin_unlock(&file->lock);
			rc = __send_rw_from_file(file, payload, offset, length, false, &arg);
			if (rc != 0) {
				return rc;
			}
			sem_wait(&channel->sem);
			return arg.rwerrno;
		}

		blob_size = __file_get_blob_size(file);

		if ((offset + length) > blob_size) {
			struct spdk_fs_cb_args extend_args = {};

			cluster_sz = file->fs->bs_opts.cluster_sz;
			extend_args.sem = &channel->sem;
			extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
			extend_args.file = file;
			BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
			pthread_spin_unlock(&file->lock);
			file->fs->send_request(__file_extend_blob, &extend_args);
			sem_wait(&channel->sem);
			if (extend_args.rc) {
				return extend_args.rc;
			}
			pthread_spin_lock(&file->lock);
		}
	} while (file->last == NULL);

	flush_req = alloc_fs_request(channel);
	if (flush_req == NULL) {
		pthread_spin_unlock(&file->lock);
		return -ENOMEM;
	}

	last = file->last;
	rem_length = length;
	cur_payload = payload;
	while (rem_length > 0) {
		copy = last->buf_size - last->bytes_filled;
		if (copy > rem_length) {
			copy = rem_length;
		}
		BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy);
		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
		file->append_pos += copy;
		if (file->length < file->append_pos) {
			file->length = file->append_pos;
		}
		cur_payload += copy;
		last->bytes_filled += copy;
		rem_length -= copy;
		if (last->bytes_filled == last->buf_size) {
			cache_buffers_filled++;
			last = cache_append_buffer(file);
			if (last == NULL) {
				BLOBFS_TRACE(file, "nomem\n");
				free_fs_request(flush_req);
				pthread_spin_unlock(&file->lock);
				return -ENOMEM;
			}
		}
	}

	pthread_spin_unlock(&file->lock);

	if (cache_buffers_filled == 0) {
		free_fs_request(flush_req);
		return 0;
	}

	flush_req->args.file = file;
	file->fs->send_request(__file_flush, flush_req);
	return 0;
}
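
/*
 * Readahead path: check_readahead() allocates a cache buffer for the next
 * cache-buffer-aligned offset and issues an __readahead request to fill it
 * from the blob in the background; __readahead_done() then marks the buffer
 * as filled (and already flushed) so subsequent sequential reads can be
 * served from cache.
 */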

static void
__readahead_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	free_fs_request(req);
}

static void
__readahead(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	offset = args->op.readahead.offset;
	length = args->op.readahead.length;
	assert(length > 0);

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			  args->op.readahead.cache_buffer->buf,
			  start_lba, num_lba, __readahead_done, req);
}

static uint64_t
__next_cache_buffer_offset(uint64_t offset)
{
	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
}

static void
check_readahead(struct spdk_file *file, uint64_t offset,
		struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return;
	}
	args = &req->args;

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free_fs_request(req);
		return;
	}

	args->op.readahead.cache_buffer->in_progress = true;
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, req);
}
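
/*
 * Synchronous read entry point.  Reads are clamped to the current append
 * position and tracked for sequential access, which triggers readahead once
 * CACHE_READAHEAD_THRESHOLD bytes have been read in order.  Each
 * cache-buffer-sized chunk is copied from cache when present (fully consumed
 * buffers are released afterwards) or read from the blob via
 * __send_rw_from_file() and waited for on the channel semaphore.  Returns
 * the number of bytes read, or a negative errno on failure.
 */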

int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	struct rw_from_file_arg arg = {};

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	arg.channel = channel;
	arg.rwerrno = 0;
	final_length = 0;
	final_offset = offset + length;
	while (offset < final_offset) {
		int ret = 0;
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			pthread_spin_unlock(&file->lock);
			ret = __send_rw_from_file(file, payload, offset, length, true, &arg);
			pthread_spin_lock(&file->lock);
			if (ret == 0) {
				sub_reads++;
			}
		} else {
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (ret == 0) {
			final_length += length;
		} else {
			arg.rwerrno = ret;
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (arg.rwerrno == 0) {
		return final_length;
	} else {
		return arg.rwerrno;
	}
}
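
/*
 * Queue a sync of everything written so far (up to append_pos).  If that
 * much is already recorded in the "length" xattr, the callback completes
 * immediately.  Otherwise a sync request is queued on the file and a flush
 * is kicked off; once flushing reaches the requested offset,
 * __check_sync_reqs() persists the new length to the blob metadata and the
 * callback fires from __file_cache_finish_sync().
 */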

static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_xattr) {
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}

int
spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_cb_args args = {};

	args.sem = &channel->sem;
	_file_sync(file, channel, __wake_caller, &args);
	sem_wait(&channel->sem);

	return args.rc;
}

void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}

void
spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
{
	BLOBFS_TRACE(file, "priority=%u\n", priority);
	file->priority = priority;
}

/*
 * Close routines
 */

static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}

static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}

void
spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
}

static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}
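
/*
 * Synchronous close.  The file is synced first, then the close request is
 * handed to the channel's send_request callback and the caller blocks on the
 * channel semaphore until the blob has been closed (a file that was marked
 * deleted is also deleted as part of this path).
 */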

int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}

int
spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
{
	if (size < sizeof(spdk_blob_id)) {
		return -EINVAL;
	}

	memcpy(id, &file->blobid, sizeof(spdk_blob_id));

	return sizeof(spdk_blob_id);
}

static void
_file_free(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);

	free(file->name);
	free(file->tree);
	free(file);
}

static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}

SPDK_LOG_REGISTER_COMPONENT(blobfs)
SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)