1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (c) Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 8 #include "spdk/blobfs.h" 9 #include "tree.h" 10 11 #include "spdk/queue.h" 12 #include "spdk/thread.h" 13 #include "spdk/assert.h" 14 #include "spdk/env.h" 15 #include "spdk/util.h" 16 #include "spdk/log.h" 17 #include "spdk/trace.h" 18 19 #include "spdk_internal/trace_defs.h" 20 21 #define BLOBFS_TRACE(file, str, args...) \ 22 SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args) 23 24 #define BLOBFS_TRACE_RW(file, str, args...) \ 25 SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args) 26 27 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 28 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 29 30 #define SPDK_BLOBFS_SIGNATURE "BLOBFS" 31 32 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 33 static struct spdk_mempool *g_cache_pool; 34 static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches); 35 static struct spdk_poller *g_cache_pool_mgmt_poller; 36 static struct spdk_thread *g_cache_pool_thread; 37 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL 38 static int g_fs_count = 0; 39 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 40 41 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS) 42 { 43 struct spdk_trace_tpoint_opts opts[] = { 44 { 45 "BLOBFS_XATTR_START", TRACE_BLOBFS_XATTR_START, 46 OWNER_NONE, OBJECT_NONE, 0, 47 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 48 }, 49 { 50 "BLOBFS_XATTR_END", TRACE_BLOBFS_XATTR_END, 51 OWNER_NONE, OBJECT_NONE, 0, 52 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 53 }, 54 { 55 "BLOBFS_OPEN", TRACE_BLOBFS_OPEN, 56 OWNER_NONE, OBJECT_NONE, 0, 57 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 58 }, 59 { 60 "BLOBFS_CLOSE", TRACE_BLOBFS_CLOSE, 61 OWNER_NONE, OBJECT_NONE, 0, 62 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 63 }, 64 { 65 "BLOBFS_DELETE_START", TRACE_BLOBFS_DELETE_START, 66 OWNER_NONE, 
OBJECT_NONE, 0, 67 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 68 }, 69 { 70 "BLOBFS_DELETE_DONE", TRACE_BLOBFS_DELETE_DONE, 71 OWNER_NONE, OBJECT_NONE, 0, 72 {{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }}, 73 } 74 }; 75 76 spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts)); 77 } 78 79 void 80 cache_buffer_free(struct cache_buffer *cache_buffer) 81 { 82 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 83 free(cache_buffer); 84 } 85 86 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 87 88 struct spdk_file { 89 struct spdk_filesystem *fs; 90 struct spdk_blob *blob; 91 char *name; 92 uint64_t length; 93 bool is_deleted; 94 bool open_for_writing; 95 uint64_t length_flushed; 96 uint64_t length_xattr; 97 uint64_t append_pos; 98 uint64_t seq_byte_count; 99 uint64_t next_seq_offset; 100 uint32_t priority; 101 TAILQ_ENTRY(spdk_file) tailq; 102 spdk_blob_id blobid; 103 uint32_t ref_count; 104 pthread_spinlock_t lock; 105 struct cache_buffer *last; 106 struct cache_tree *tree; 107 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 108 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 109 TAILQ_ENTRY(spdk_file) cache_tailq; 110 }; 111 112 struct spdk_deleted_file { 113 spdk_blob_id id; 114 TAILQ_ENTRY(spdk_deleted_file) tailq; 115 }; 116 117 struct spdk_filesystem { 118 struct spdk_blob_store *bs; 119 TAILQ_HEAD(, spdk_file) files; 120 struct spdk_bs_opts bs_opts; 121 struct spdk_bs_dev *bdev; 122 fs_send_request_fn send_request; 123 124 struct { 125 uint32_t max_ops; 126 struct spdk_io_channel *sync_io_channel; 127 struct spdk_fs_channel *sync_fs_channel; 128 } sync_target; 129 130 struct { 131 uint32_t max_ops; 132 struct spdk_io_channel *md_io_channel; 133 struct spdk_fs_channel *md_fs_channel; 134 } md_target; 135 136 struct { 137 uint32_t max_ops; 138 } io_target; 139 }; 140 141 struct spdk_fs_cb_args { 142 union { 143 spdk_fs_op_with_handle_complete fs_op_with_handle; 144 spdk_fs_op_complete fs_op; 145 spdk_file_op_with_handle_complete 
file_op_with_handle; 146 spdk_file_op_complete file_op; 147 spdk_file_stat_op_complete stat_op; 148 } fn; 149 void *arg; 150 sem_t *sem; 151 struct spdk_filesystem *fs; 152 struct spdk_file *file; 153 int rc; 154 int *rwerrno; 155 struct iovec *iovs; 156 uint32_t iovcnt; 157 struct iovec iov; 158 union { 159 struct { 160 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 161 } fs_load; 162 struct { 163 uint64_t length; 164 } truncate; 165 struct { 166 struct spdk_io_channel *channel; 167 void *pin_buf; 168 int is_read; 169 off_t offset; 170 size_t length; 171 uint64_t start_lba; 172 uint64_t num_lba; 173 uint32_t blocklen; 174 } rw; 175 struct { 176 const char *old_name; 177 const char *new_name; 178 } rename; 179 struct { 180 struct cache_buffer *cache_buffer; 181 uint64_t length; 182 } flush; 183 struct { 184 struct cache_buffer *cache_buffer; 185 uint64_t length; 186 uint64_t offset; 187 } readahead; 188 struct { 189 /* offset of the file when the sync request was made */ 190 uint64_t offset; 191 TAILQ_ENTRY(spdk_fs_request) tailq; 192 bool xattr_in_progress; 193 /* length written to the xattr for this file - this should 194 * always be the same as the offset if only one thread is 195 * writing to the file, but could differ if multiple threads 196 * are appending 197 */ 198 uint64_t length; 199 } sync; 200 struct { 201 uint32_t num_clusters; 202 } resize; 203 struct { 204 const char *name; 205 uint32_t flags; 206 TAILQ_ENTRY(spdk_fs_request) tailq; 207 } open; 208 struct { 209 const char *name; 210 struct spdk_blob *blob; 211 } create; 212 struct { 213 const char *name; 214 } delete; 215 struct { 216 const char *name; 217 } stat; 218 } op; 219 }; 220 221 static void file_free(struct spdk_file *file); 222 static void fs_io_device_unregister(struct spdk_filesystem *fs); 223 static void fs_free_io_channels(struct spdk_filesystem *fs); 224 225 void 226 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 227 { 228 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 229 
} 230 231 static int _blobfs_cache_pool_reclaim(void *arg); 232 233 static bool 234 blobfs_cache_pool_need_reclaim(void) 235 { 236 size_t count; 237 238 count = spdk_mempool_count(g_cache_pool); 239 /* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller 240 * when the number of available cache buffer is less than 1/5 of total buffers. 241 */ 242 if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) { 243 return false; 244 } 245 246 return true; 247 } 248 249 static void 250 __start_cache_pool_mgmt(void *ctx) 251 { 252 assert(g_cache_pool == NULL); 253 254 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 255 g_fs_cache_size / CACHE_BUFFER_SIZE, 256 CACHE_BUFFER_SIZE, 257 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 258 SPDK_ENV_SOCKET_ID_ANY); 259 if (!g_cache_pool) { 260 SPDK_ERRLOG("Create mempool failed, you may " 261 "increase the memory and try again\n"); 262 assert(false); 263 } 264 265 assert(g_cache_pool_mgmt_poller == NULL); 266 g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL, 267 BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); 268 } 269 270 static void 271 __stop_cache_pool_mgmt(void *ctx) 272 { 273 spdk_poller_unregister(&g_cache_pool_mgmt_poller); 274 275 assert(g_cache_pool != NULL); 276 assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE); 277 spdk_mempool_free(g_cache_pool); 278 g_cache_pool = NULL; 279 280 spdk_thread_exit(g_cache_pool_thread); 281 } 282 283 static void 284 initialize_global_cache(void) 285 { 286 pthread_mutex_lock(&g_cache_init_lock); 287 if (g_fs_count == 0) { 288 g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL); 289 assert(g_cache_pool_thread != NULL); 290 spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL); 291 } 292 g_fs_count++; 293 pthread_mutex_unlock(&g_cache_init_lock); 294 } 295 296 static void 297 free_global_cache(void) 298 { 299 pthread_mutex_lock(&g_cache_init_lock); 300 g_fs_count--; 
301 if (g_fs_count == 0) { 302 spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL); 303 } 304 pthread_mutex_unlock(&g_cache_init_lock); 305 } 306 307 static uint64_t 308 __file_get_blob_size(struct spdk_file *file) 309 { 310 uint64_t cluster_sz; 311 312 cluster_sz = file->fs->bs_opts.cluster_sz; 313 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 314 } 315 316 struct spdk_fs_request { 317 struct spdk_fs_cb_args args; 318 TAILQ_ENTRY(spdk_fs_request) link; 319 struct spdk_fs_channel *channel; 320 }; 321 322 struct spdk_fs_channel { 323 struct spdk_fs_request *req_mem; 324 TAILQ_HEAD(, spdk_fs_request) reqs; 325 sem_t sem; 326 struct spdk_filesystem *fs; 327 struct spdk_io_channel *bs_channel; 328 fs_send_request_fn send_request; 329 bool sync; 330 uint32_t outstanding_reqs; 331 pthread_spinlock_t lock; 332 }; 333 334 /* For now, this is effectively an alias. But eventually we'll shift 335 * some data members over. */ 336 struct spdk_fs_thread_ctx { 337 struct spdk_fs_channel ch; 338 }; 339 340 static struct spdk_fs_request * 341 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 342 { 343 struct spdk_fs_request *req; 344 struct iovec *iovs = NULL; 345 346 if (iovcnt > 1) { 347 iovs = calloc(iovcnt, sizeof(struct iovec)); 348 if (!iovs) { 349 return NULL; 350 } 351 } 352 353 if (channel->sync) { 354 pthread_spin_lock(&channel->lock); 355 } 356 357 req = TAILQ_FIRST(&channel->reqs); 358 if (req) { 359 channel->outstanding_reqs++; 360 TAILQ_REMOVE(&channel->reqs, req, link); 361 } 362 363 if (channel->sync) { 364 pthread_spin_unlock(&channel->lock); 365 } 366 367 if (req == NULL) { 368 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 369 free(iovs); 370 return NULL; 371 } 372 memset(req, 0, sizeof(*req)); 373 req->channel = channel; 374 if (iovcnt > 1) { 375 req->args.iovs = iovs; 376 } else { 377 req->args.iovs = &req->args.iov; 378 } 379 req->args.iovcnt = iovcnt; 380 381 return req; 382 } 
383 384 static struct spdk_fs_request * 385 alloc_fs_request(struct spdk_fs_channel *channel) 386 { 387 return alloc_fs_request_with_iov(channel, 0); 388 } 389 390 static void 391 free_fs_request(struct spdk_fs_request *req) 392 { 393 struct spdk_fs_channel *channel = req->channel; 394 395 if (req->args.iovcnt > 1) { 396 free(req->args.iovs); 397 } 398 399 if (channel->sync) { 400 pthread_spin_lock(&channel->lock); 401 } 402 403 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 404 channel->outstanding_reqs--; 405 406 if (channel->sync) { 407 pthread_spin_unlock(&channel->lock); 408 } 409 } 410 411 static int 412 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 413 uint32_t max_ops) 414 { 415 uint32_t i; 416 417 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 418 if (!channel->req_mem) { 419 return -1; 420 } 421 422 channel->outstanding_reqs = 0; 423 TAILQ_INIT(&channel->reqs); 424 sem_init(&channel->sem, 0, 0); 425 426 for (i = 0; i < max_ops; i++) { 427 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 428 } 429 430 channel->fs = fs; 431 432 return 0; 433 } 434 435 static int 436 fs_md_channel_create(void *io_device, void *ctx_buf) 437 { 438 struct spdk_filesystem *fs; 439 struct spdk_fs_channel *channel = ctx_buf; 440 441 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 442 443 return fs_channel_create(fs, channel, fs->md_target.max_ops); 444 } 445 446 static int 447 fs_sync_channel_create(void *io_device, void *ctx_buf) 448 { 449 struct spdk_filesystem *fs; 450 struct spdk_fs_channel *channel = ctx_buf; 451 452 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 453 454 return fs_channel_create(fs, channel, fs->sync_target.max_ops); 455 } 456 457 static int 458 fs_io_channel_create(void *io_device, void *ctx_buf) 459 { 460 struct spdk_filesystem *fs; 461 struct spdk_fs_channel *channel = ctx_buf; 462 463 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, 
io_target); 464 465 return fs_channel_create(fs, channel, fs->io_target.max_ops); 466 } 467 468 static void 469 fs_channel_destroy(void *io_device, void *ctx_buf) 470 { 471 struct spdk_fs_channel *channel = ctx_buf; 472 473 if (channel->outstanding_reqs > 0) { 474 SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n", 475 channel->outstanding_reqs); 476 } 477 478 free(channel->req_mem); 479 if (channel->bs_channel != NULL) { 480 spdk_bs_free_io_channel(channel->bs_channel); 481 } 482 } 483 484 static void 485 __send_request_direct(fs_request_fn fn, void *arg) 486 { 487 fn(arg); 488 } 489 490 static void 491 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 492 { 493 fs->bs = bs; 494 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 495 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 496 fs->md_target.md_fs_channel->send_request = __send_request_direct; 497 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 498 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 499 500 initialize_global_cache(); 501 } 502 503 static void 504 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 505 { 506 struct spdk_fs_request *req = ctx; 507 struct spdk_fs_cb_args *args = &req->args; 508 struct spdk_filesystem *fs = args->fs; 509 510 if (bserrno == 0) { 511 common_fs_bs_init(fs, bs); 512 } else { 513 free(fs); 514 fs = NULL; 515 } 516 517 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 518 free_fs_request(req); 519 } 520 521 static struct spdk_filesystem * 522 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 523 { 524 struct spdk_filesystem *fs; 525 526 fs = calloc(1, sizeof(*fs)); 527 if (fs == NULL) { 528 return NULL; 529 } 530 531 fs->bdev = dev; 532 fs->send_request = send_request_fn; 533 TAILQ_INIT(&fs->files); 534 535 fs->md_target.max_ops = 512; 536 spdk_io_device_register(&fs->md_target, fs_md_channel_create, 
fs_channel_destroy, 537 sizeof(struct spdk_fs_channel), "blobfs_md"); 538 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 539 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 540 541 fs->sync_target.max_ops = 512; 542 spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy, 543 sizeof(struct spdk_fs_channel), "blobfs_sync"); 544 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 545 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 546 547 fs->io_target.max_ops = 512; 548 spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy, 549 sizeof(struct spdk_fs_channel), "blobfs_io"); 550 551 return fs; 552 } 553 554 static void 555 __wake_caller(void *arg, int fserrno) 556 { 557 struct spdk_fs_cb_args *args = arg; 558 559 if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) { 560 *(args->rwerrno) = fserrno; 561 } 562 args->rc = fserrno; 563 sem_post(args->sem); 564 } 565 566 void 567 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 568 fs_send_request_fn send_request_fn, 569 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 570 { 571 struct spdk_filesystem *fs; 572 struct spdk_fs_request *req; 573 struct spdk_fs_cb_args *args; 574 struct spdk_bs_opts opts = {}; 575 576 fs = fs_alloc(dev, send_request_fn); 577 if (fs == NULL) { 578 cb_fn(cb_arg, NULL, -ENOMEM); 579 return; 580 } 581 582 req = alloc_fs_request(fs->md_target.md_fs_channel); 583 if (req == NULL) { 584 fs_free_io_channels(fs); 585 fs_io_device_unregister(fs); 586 cb_fn(cb_arg, NULL, -ENOMEM); 587 return; 588 } 589 590 args = &req->args; 591 args->fn.fs_op_with_handle = cb_fn; 592 args->arg = cb_arg; 593 args->fs = fs; 594 595 spdk_bs_opts_init(&opts, sizeof(opts)); 596 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE); 597 if (opt) { 598 opts.cluster_sz = opt->cluster_sz; 599 
} 600 spdk_bs_init(dev, &opts, init_cb, req); 601 } 602 603 static struct spdk_file * 604 file_alloc(struct spdk_filesystem *fs) 605 { 606 struct spdk_file *file; 607 608 file = calloc(1, sizeof(*file)); 609 if (file == NULL) { 610 return NULL; 611 } 612 613 file->tree = calloc(1, sizeof(*file->tree)); 614 if (file->tree == NULL) { 615 free(file); 616 return NULL; 617 } 618 619 if (pthread_spin_init(&file->lock, 0)) { 620 free(file->tree); 621 free(file); 622 return NULL; 623 } 624 625 file->fs = fs; 626 TAILQ_INIT(&file->open_requests); 627 TAILQ_INIT(&file->sync_requests); 628 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 629 file->priority = SPDK_FILE_PRIORITY_LOW; 630 return file; 631 } 632 633 static void fs_load_done(void *ctx, int bserrno); 634 635 static int 636 _handle_deleted_files(struct spdk_fs_request *req) 637 { 638 struct spdk_fs_cb_args *args = &req->args; 639 struct spdk_filesystem *fs = args->fs; 640 641 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 642 struct spdk_deleted_file *deleted_file; 643 644 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 645 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 646 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 647 free(deleted_file); 648 return 0; 649 } 650 651 return 1; 652 } 653 654 static void 655 fs_load_done(void *ctx, int bserrno) 656 { 657 struct spdk_fs_request *req = ctx; 658 struct spdk_fs_cb_args *args = &req->args; 659 struct spdk_filesystem *fs = args->fs; 660 661 /* The filesystem has been loaded. Now check if there are any files that 662 * were marked for deletion before last unload. Do not complete the 663 * fs_load callback until all of them have been deleted on disk. 664 */ 665 if (_handle_deleted_files(req) == 0) { 666 /* We found a file that's been marked for deleting but not actually 667 * deleted yet. This function will get called again once the delete 668 * operation is completed. 
669 */ 670 return; 671 } 672 673 args->fn.fs_op_with_handle(args->arg, fs, 0); 674 free_fs_request(req); 675 676 } 677 678 static void 679 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 680 { 681 struct spdk_fs_request *req = ctx; 682 struct spdk_fs_cb_args *args = &req->args; 683 struct spdk_filesystem *fs = args->fs; 684 uint64_t *length; 685 const char *name; 686 uint32_t *is_deleted; 687 size_t value_len; 688 689 if (rc < 0) { 690 args->fn.fs_op_with_handle(args->arg, fs, rc); 691 free_fs_request(req); 692 return; 693 } 694 695 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 696 if (rc < 0) { 697 args->fn.fs_op_with_handle(args->arg, fs, rc); 698 free_fs_request(req); 699 return; 700 } 701 702 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 703 if (rc < 0) { 704 args->fn.fs_op_with_handle(args->arg, fs, rc); 705 free_fs_request(req); 706 return; 707 } 708 709 assert(value_len == 8); 710 711 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 712 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 713 if (rc < 0) { 714 struct spdk_file *f; 715 716 f = file_alloc(fs); 717 if (f == NULL) { 718 SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n"); 719 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 720 free_fs_request(req); 721 return; 722 } 723 724 f->name = strdup(name); 725 if (!f->name) { 726 SPDK_ERRLOG("Cannot allocate memory for file name\n"); 727 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 728 free_fs_request(req); 729 file_free(f); 730 return; 731 } 732 733 f->blobid = spdk_blob_get_id(blob); 734 f->length = *length; 735 f->length_flushed = *length; 736 f->length_xattr = *length; 737 f->append_pos = *length; 738 SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length); 739 } else { 740 struct spdk_deleted_file *deleted_file; 741 742 deleted_file = calloc(1, 
sizeof(*deleted_file)); 743 if (deleted_file == NULL) { 744 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 745 free_fs_request(req); 746 return; 747 } 748 deleted_file->id = spdk_blob_get_id(blob); 749 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 750 } 751 } 752 753 static void 754 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 755 { 756 struct spdk_fs_request *req = ctx; 757 struct spdk_fs_cb_args *args = &req->args; 758 struct spdk_filesystem *fs = args->fs; 759 struct spdk_bs_type bstype; 760 static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE}; 761 static const struct spdk_bs_type zeros; 762 763 if (bserrno != 0) { 764 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 765 free_fs_request(req); 766 fs_free_io_channels(fs); 767 fs_io_device_unregister(fs); 768 return; 769 } 770 771 bstype = spdk_bs_get_bstype(bs); 772 773 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 774 SPDK_DEBUGLOG(blobfs, "assigning bstype\n"); 775 spdk_bs_set_bstype(bs, blobfs_type); 776 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 777 SPDK_ERRLOG("not blobfs\n"); 778 SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype)); 779 args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL); 780 free_fs_request(req); 781 fs_free_io_channels(fs); 782 fs_io_device_unregister(fs); 783 return; 784 } 785 786 common_fs_bs_init(fs, bs); 787 fs_load_done(req, 0); 788 } 789 790 static void 791 fs_io_device_unregister(struct spdk_filesystem *fs) 792 { 793 assert(fs != NULL); 794 spdk_io_device_unregister(&fs->md_target, NULL); 795 spdk_io_device_unregister(&fs->sync_target, NULL); 796 spdk_io_device_unregister(&fs->io_target, NULL); 797 free(fs); 798 } 799 800 static void 801 fs_free_io_channels(struct spdk_filesystem *fs) 802 { 803 assert(fs != NULL); 804 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 805 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 806 } 807 808 void 809 spdk_fs_load(struct spdk_bs_dev 
*dev, fs_send_request_fn send_request_fn, 810 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 811 { 812 struct spdk_filesystem *fs; 813 struct spdk_fs_cb_args *args; 814 struct spdk_fs_request *req; 815 struct spdk_bs_opts bs_opts; 816 817 fs = fs_alloc(dev, send_request_fn); 818 if (fs == NULL) { 819 cb_fn(cb_arg, NULL, -ENOMEM); 820 return; 821 } 822 823 req = alloc_fs_request(fs->md_target.md_fs_channel); 824 if (req == NULL) { 825 fs_free_io_channels(fs); 826 fs_io_device_unregister(fs); 827 cb_fn(cb_arg, NULL, -ENOMEM); 828 return; 829 } 830 831 args = &req->args; 832 args->fn.fs_op_with_handle = cb_fn; 833 args->arg = cb_arg; 834 args->fs = fs; 835 TAILQ_INIT(&args->op.fs_load.deleted_files); 836 spdk_bs_opts_init(&bs_opts, sizeof(bs_opts)); 837 bs_opts.iter_cb_fn = iter_cb; 838 bs_opts.iter_cb_arg = req; 839 spdk_bs_load(dev, &bs_opts, load_cb, req); 840 } 841 842 static void 843 unload_cb(void *ctx, int bserrno) 844 { 845 struct spdk_fs_request *req = ctx; 846 struct spdk_fs_cb_args *args = &req->args; 847 struct spdk_filesystem *fs = args->fs; 848 struct spdk_file *file, *tmp; 849 850 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 851 TAILQ_REMOVE(&fs->files, file, tailq); 852 file_free(file); 853 } 854 855 free_global_cache(); 856 857 args->fn.fs_op(args->arg, bserrno); 858 free(req); 859 860 fs_io_device_unregister(fs); 861 } 862 863 void 864 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 865 { 866 struct spdk_fs_request *req; 867 struct spdk_fs_cb_args *args; 868 869 /* 870 * We must free the md_channel before unloading the blobstore, so just 871 * allocate this request from the general heap. 
872 */ 873 req = calloc(1, sizeof(*req)); 874 if (req == NULL) { 875 cb_fn(cb_arg, -ENOMEM); 876 return; 877 } 878 879 args = &req->args; 880 args->fn.fs_op = cb_fn; 881 args->arg = cb_arg; 882 args->fs = fs; 883 884 fs_free_io_channels(fs); 885 spdk_bs_unload(fs->bs, unload_cb, req); 886 } 887 888 static struct spdk_file * 889 fs_find_file(struct spdk_filesystem *fs, const char *name) 890 { 891 struct spdk_file *file; 892 893 TAILQ_FOREACH(file, &fs->files, tailq) { 894 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 895 return file; 896 } 897 } 898 899 return NULL; 900 } 901 902 void 903 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 904 spdk_file_stat_op_complete cb_fn, void *cb_arg) 905 { 906 struct spdk_file_stat stat; 907 struct spdk_file *f = NULL; 908 909 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 910 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 911 return; 912 } 913 914 f = fs_find_file(fs, name); 915 if (f != NULL) { 916 stat.blobid = f->blobid; 917 stat.size = f->append_pos >= f->length ? 
f->append_pos : f->length; 918 cb_fn(cb_arg, &stat, 0); 919 return; 920 } 921 922 cb_fn(cb_arg, NULL, -ENOENT); 923 } 924 925 static void 926 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 927 { 928 struct spdk_fs_request *req = arg; 929 struct spdk_fs_cb_args *args = &req->args; 930 931 args->rc = fserrno; 932 if (fserrno == 0) { 933 memcpy(args->arg, stat, sizeof(*stat)); 934 } 935 sem_post(args->sem); 936 } 937 938 static void 939 __file_stat(void *arg) 940 { 941 struct spdk_fs_request *req = arg; 942 struct spdk_fs_cb_args *args = &req->args; 943 944 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 945 args->fn.stat_op, req); 946 } 947 948 int 949 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 950 const char *name, struct spdk_file_stat *stat) 951 { 952 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 953 struct spdk_fs_request *req; 954 int rc; 955 956 req = alloc_fs_request(channel); 957 if (req == NULL) { 958 SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name); 959 return -ENOMEM; 960 } 961 962 req->args.fs = fs; 963 req->args.op.stat.name = name; 964 req->args.fn.stat_op = __copy_stat; 965 req->args.arg = stat; 966 req->args.sem = &channel->sem; 967 channel->send_request(__file_stat, req); 968 sem_wait(&channel->sem); 969 970 rc = req->args.rc; 971 free_fs_request(req); 972 973 return rc; 974 } 975 976 static void 977 fs_create_blob_close_cb(void *ctx, int bserrno) 978 { 979 int rc; 980 struct spdk_fs_request *req = ctx; 981 struct spdk_fs_cb_args *args = &req->args; 982 983 rc = args->rc ? 
args->rc : bserrno; 984 args->fn.file_op(args->arg, rc); 985 free_fs_request(req); 986 } 987 988 static void 989 fs_create_blob_resize_cb(void *ctx, int bserrno) 990 { 991 struct spdk_fs_request *req = ctx; 992 struct spdk_fs_cb_args *args = &req->args; 993 struct spdk_file *f = args->file; 994 struct spdk_blob *blob = args->op.create.blob; 995 uint64_t length = 0; 996 997 args->rc = bserrno; 998 if (bserrno) { 999 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1000 return; 1001 } 1002 1003 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 1004 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 1005 1006 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1007 } 1008 1009 static void 1010 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1011 { 1012 struct spdk_fs_request *req = ctx; 1013 struct spdk_fs_cb_args *args = &req->args; 1014 1015 if (bserrno) { 1016 args->fn.file_op(args->arg, bserrno); 1017 free_fs_request(req); 1018 return; 1019 } 1020 1021 args->op.create.blob = blob; 1022 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 1023 } 1024 1025 static void 1026 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 1027 { 1028 struct spdk_fs_request *req = ctx; 1029 struct spdk_fs_cb_args *args = &req->args; 1030 struct spdk_file *f = args->file; 1031 1032 if (bserrno) { 1033 args->fn.file_op(args->arg, bserrno); 1034 free_fs_request(req); 1035 return; 1036 } 1037 1038 f->blobid = blobid; 1039 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 1040 } 1041 1042 void 1043 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 1044 spdk_file_op_complete cb_fn, void *cb_arg) 1045 { 1046 struct spdk_file *file; 1047 struct spdk_fs_request *req; 1048 struct spdk_fs_cb_args *args; 1049 1050 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1051 cb_fn(cb_arg, -ENAMETOOLONG); 1052 return; 1053 } 1054 1055 file = fs_find_file(fs, name); 
	/*
	 * Tail of spdk_fs_create_file_async(): 'file' holds the result of the
	 * name lookup performed above, so a non-NULL hit means the name exists.
	 */
	if (file != NULL) {
		cb_fn(cb_arg, -EEXIST);
		return;
	}

	file = file_alloc(fs);
	if (file == NULL) {
		SPDK_ERRLOG("Cannot allocate new file for creation\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
		/* file_alloc() linked the file into fs->files; undo before freeing. */
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	file->name = strdup(name);
	if (!file->name) {
		SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name);
		free_fs_request(req);
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
}

/* Completion for the create issued by __fs_create_file(): wake the blocked
 * synchronous caller with the result code. */
static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
	/* NOTE(review): the woken thread owns req and frees it after sem_wait();
	 * logging args->op.create.name after __wake_caller() looks racy - confirm. */
	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
}

/* Runs on the filesystem's metadata thread; kicks off the async create. */
static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}

/*
 * Synchronous file creation: marshals the request to the metadata thread via
 * fs->send_request() and blocks on the channel semaphore until it completes.
 * Returns 0 on success or a negative errno.
 */
int
spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.create.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* Blob-open completion: record the blob handle on the file, then complete
 * every queued open request, passing each its own callback. */
static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->name);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}

/*
 * Continue an open after (optional) creation.  Three cases:
 *  - first opener (ref_count becomes 1): start the blob open;
 *  - blob already open: complete this request immediately;
 *  - blob open in flight: just stay queued; that open completes us.
 */
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 * is now created so look it up in the file list for this
		 * filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 * open request.  When that open completes, it will invoke the
		 * open callback for this request.
		 */
	}
}

/*
 * Asynchronously open the named file, creating it first when the
 * SPDK_BLOBFS_OPEN_CREATE flag is set.  cb_fn receives the spdk_file handle
 * on success or a negative errno.
 */
void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	/* A file pending deferred deletion must not be reopened. */
	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		/* Create first; fs_open_blob_create_cb re-resolves the file. */
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

/* Completion for __fs_open_file(): stash the handle and wake the caller. */
static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	__wake_caller(args, bserrno);
	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
}

/* Runs on the metadata thread; starts the async open. */
static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

/*
 * Synchronous open: dispatch to the metadata thread, block until done, and
 * return the opened handle through *file (NULL on failure).
 */
int
spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, uint32_t flags, struct spdk_file **file)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.open.name = name;
	args->op.open.flags = flags;
	args->sem = &channel->sem;
	fs->send_request(__fs_open_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	if (rc == 0) {
		*file = args->file;
	} else {
		*file = NULL;
	}
	free_fs_request(req);

	return rc;
}

/* Final rename step: the renamed blob has been closed (which persists its
 * metadata); report the result and release the request. */
static void
fs_rename_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.fs_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Blob opened for rename: rewrite its "name" xattr and close it. */
static void
fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	const char *new_name = args->op.rename.new_name;

	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
}

/* Rename the in-memory file object, then open its blob so the new name can
 * be persisted into the blob's xattrs. */
static void
_fs_md_rename_file(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f;

	f = fs_find_file(args->fs, args->op.rename.old_name);
	if (f == NULL) {
		args->fn.fs_op(args->arg, -ENOENT);
		free_fs_request(req);
		return;
	}

	free(f->name);
	f->name = strdup(args->op.rename.new_name);
	if (!f->name) {
		/* NOTE(review): the old name was already freed, so this path leaves
		 * the file with f->name == NULL - confirm callers tolerate that. */
		SPDK_ERRLOG("Cannot allocate memory for file name\n");
		args->fn.fs_op(args->arg, -ENOMEM);
		free_fs_request(req);
		return;
	}

	args->file = f;
	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
}

/* The destination file (if any) has been deleted; do the actual rename. */
static void
fs_rename_delete_done(void *arg, int fserrno)
{
	_fs_md_rename_file(arg);
}

/*
 * Asynchronously rename old_name to new_name.  If new_name already exists it
 * is deleted first (overwrite semantics).
 */
void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name);
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
			    new_name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file.  So delete the existing file, then
	 * do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

/* Wake the thread blocked in spdk_fs_rename_file(). */
static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
}

/* Runs on the metadata thread; starts the async rename. */
static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

/*
 * Synchronous rename: dispatch to the metadata thread and block until the
 * async rename completes.  Returns 0 or a negative errno.
 */
int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
		return -ENOMEM;
	}

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

/* Shared completion for blob delete / deferred-delete metadata sync. */
static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/*
 * Asynchronously delete the named file.  If the file is still open, deletion
 * is deferred: it is only marked is_deleted here and the blob is removed on
 * last close.
 */
void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1)
== SPDK_FILE_NAME_MAX + 1) { 1462 cb_fn(cb_arg, -ENAMETOOLONG); 1463 return; 1464 } 1465 1466 f = fs_find_file(fs, name); 1467 if (f == NULL) { 1468 SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name); 1469 cb_fn(cb_arg, -ENOENT); 1470 return; 1471 } 1472 1473 req = alloc_fs_request(fs->md_target.md_fs_channel); 1474 if (req == NULL) { 1475 SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name); 1476 cb_fn(cb_arg, -ENOMEM); 1477 return; 1478 } 1479 1480 args = &req->args; 1481 args->fn.file_op = cb_fn; 1482 args->arg = cb_arg; 1483 1484 if (f->ref_count > 0) { 1485 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1486 f->is_deleted = true; 1487 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1488 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1489 return; 1490 } 1491 1492 blobid = f->blobid; 1493 TAILQ_REMOVE(&fs->files, f, tailq); 1494 1495 file_free(f); 1496 1497 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1498 } 1499 1500 static void 1501 __fs_delete_file_done(void *arg, int fserrno) 1502 { 1503 struct spdk_fs_request *req = arg; 1504 struct spdk_fs_cb_args *args = &req->args; 1505 1506 spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, args->op.delete.name); 1507 __wake_caller(args, fserrno); 1508 } 1509 1510 static void 1511 __fs_delete_file(void *arg) 1512 { 1513 struct spdk_fs_request *req = arg; 1514 struct spdk_fs_cb_args *args = &req->args; 1515 1516 spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, args->op.delete.name); 1517 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1518 } 1519 1520 int 1521 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1522 const char *name) 1523 { 1524 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1525 struct spdk_fs_request *req; 1526 struct spdk_fs_cb_args *args; 1527 int rc; 1528 1529 req = alloc_fs_request(channel); 1530 if (req 
== NULL) { 1531 SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name); 1532 return -ENOMEM; 1533 } 1534 1535 args = &req->args; 1536 args->fs = fs; 1537 args->op.delete.name = name; 1538 args->sem = &channel->sem; 1539 fs->send_request(__fs_delete_file, req); 1540 sem_wait(&channel->sem); 1541 rc = args->rc; 1542 free_fs_request(req); 1543 1544 return rc; 1545 } 1546 1547 spdk_fs_iter 1548 spdk_fs_iter_first(struct spdk_filesystem *fs) 1549 { 1550 struct spdk_file *f; 1551 1552 f = TAILQ_FIRST(&fs->files); 1553 return f; 1554 } 1555 1556 spdk_fs_iter 1557 spdk_fs_iter_next(spdk_fs_iter iter) 1558 { 1559 struct spdk_file *f = iter; 1560 1561 if (f == NULL) { 1562 return NULL; 1563 } 1564 1565 f = TAILQ_NEXT(f, tailq); 1566 return f; 1567 } 1568 1569 const char * 1570 spdk_file_get_name(struct spdk_file *file) 1571 { 1572 return file->name; 1573 } 1574 1575 uint64_t 1576 spdk_file_get_length(struct spdk_file *file) 1577 { 1578 uint64_t length; 1579 1580 assert(file != NULL); 1581 1582 length = file->append_pos >= file->length ? 
		 file->append_pos : file->length;
	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
	return length;
}

/* Truncate metadata sync finished: report the result and free the request. */
static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Blob resized: persist the new "length" xattr, clamp append_pos, sync md. */
static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}

/* Round a byte count up to whole clusters. */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

/*
 * Asynchronously truncate (grow or shrink) the file to 'length' bytes by
 * resizing the backing blob and then updating the length xattr.
 */
void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		/* Already the requested size - nothing to do. */
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.truncate.length = length;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
}

/* Runs on the fs thread; args->fn.file_op is __wake_caller here, so the
 * async completion wakes the blocked spdk_file_truncate() caller. */
static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}

/*
 * Synchronous truncate: dispatch to the fs thread and block until the async
 * truncate completes.  Returns 0 or a negative errno.
 */
int
spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return -ENOMEM;
	}

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __wake_caller;
	args->sem = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* Final r/w completion: release the DMA bounce buffer and the request. */
static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Gather scattered iovs into one contiguous buffer, bounded by buf_len. */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

/* Scatter a contiguous buffer back out into iovs, bounded by buf_len. */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

/*
 * Blob read completed.  For reads, copy out to the user's iovs and finish.
 * For unaligned writes this was the read half of a read-modify-write: merge
 * the user data into the pinned buffer and issue the write.
 */
static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	assert(req != NULL);
	/* Offset of the requested byte range within the LBA-aligned pin_buf. */
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		/* NOTE(review): bserrno is not propagated here - a failed blob read
		 * completes the request with 0; confirm this is intentional. */
		__rw_done(req, 0);
	} else {
		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}

/* Start (or fail fast) the blob read backing a read or read-modify-write. */
static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (fserrno) {
		__rw_done(req, fserrno);
		return;
	}
	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
			  args->op.rw.pin_buf,
			  args->op.rw.start_lba, args->op.rw.num_lba,
			  __read_done, req);
}

/* Convert a byte range into io-unit (LBA) terms for blob I/O. */
static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
{
	uint64_t end_lba;

	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
	*start_lba = offset / *lba_size;
	end_lba = (offset + length - 1) / *lba_size;
	*num_lba = (end_lba - *start_lba + 1);
}

/* True when offset and length are both multiples of the io-unit size. */
static bool
__is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
{
	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);

	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
		return true;
	}

	return false;
}

/* Copy the caller's iov descriptors into the request. */
static void
_fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
{
	uint32_t i;

	for (i = 0; i < iovcnt; i++) {
		req->args.iovs[i].iov_base = iovs[i].iov_base;
		req->args.iovs[i].iov_len = iovs[i].iov_len;
	}
}

/*
 * Common vectored read/write path.  I/O is staged through an LBA-aligned DMA
 * bounce buffer; unaligned writes do a read-modify-write, and writes past
 * the current length first extend the file.
 */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads past EOF are rejected outright. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		/* Extend first; the truncate completion continues into the RMW read. */
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		/* Aligned write: no read-modify-write needed. */
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}

/* Single-buffer convenience wrapper over __readvwritev(). */
static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct iovec iov;

	iov.iov_base = payload;
	iov.iov_len = (size_t)length;

	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
}

/* Asynchronous single-buffer write at an arbitrary offset. */
void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

/* Asynchronous vectored write at an arbitrary offset. */
void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}

/* Asynchronous single-buffer read. */
void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

/* Asynchronous vectored read. */
void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t
		      length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}

/*
 * Allocate a per-thread I/O channel for the data path; requests issued on it
 * run directly (__send_request_direct) rather than hopping threads.
 */
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

/* Release a channel obtained from spdk_fs_alloc_io_channel(). */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

/*
 * Allocate the per-thread context used by the synchronous (blocking) API.
 * Returns NULL on allocation or spinlock-init failure.
 */
struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	if (pthread_spin_init(&ctx->ch.lock, 0)) {
		free(ctx);
		return NULL;
	}

	/* NOTE(review): fs_channel_create()'s return value is not checked here -
	 * confirm it cannot fail for a 512-entry channel. */
	fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;

	return ctx;
}


/* Tear down a sync context, polling until outstanding requests drain. */
void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}

/* Set the global cache size (in MiB); only allowed while no pool exists. */
int
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	/* setting g_fs_cache_size is only permitted if cache pool
	 * is already freed or hasn't been initialized
	 */
	if (g_cache_pool != NULL) {
		return -EPERM;
	}

	g_fs_cache_size = size_in_mb * 1024 * 1024;

	return 0;
}

/* Report the configured cache size in MiB. */
uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *ctx);

/* Try to free some cache buffers from this file.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}

/*
 * Cache-pool poller: reclaim buffers in priority order - low-priority files
 * not open for writing first, then any file not open for writing, then any
 * file at all.  Stops as soon as the pool no longer needs reclaim.
 */
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return SPDK_POLLER_IDLE;
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return SPDK_POLLER_BUSY;
}

/* Runs on the cache-pool thread: start tracking this file for reclaim. */
static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
}

/* Runs on the cache-pool thread: stop tracking this file. */
static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
}

/*
 * Allocate a cache buffer for 'offset' and insert it into the file's buffer
 * tree.  Busy-waits (up to ~100 poll periods) for the global pool to free up
 * before giving up.
 */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		/* First buffer for this file: register it with the reclaimer. */
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}

/* Append a fresh cache buffer at the file's current append position. */
static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);

/* A sync's length-xattr update has been persisted: record it, complete the
 * sync request, and see whether further syncs can now proceed. */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}

/*
 * Find the first pending sync whose offset has been fully flushed and, if it
 * is not already doing so, start persisting the flushed length xattr for it.
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

/* One cache-buffer flush I/O finished: account the flushed bytes, then keep
 * the flush pipeline going (syncs first, then the next buffer). */
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 * blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	__file_flush(req);
}

/*
 * Flush the next cache buffer at file->length_flushed, if there is one and
 * it is ready.  Partially filled buffers are only flushed when a sync is
 * outstanding; otherwise they wait until filled or synced.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 * progress, or the next buffer is partially filled but there's no
		 * outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 * more data after that is completed, or a partial buffer will get flushed
		 * when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 * file was later appended, we will write the data directly
			 * to disk and bypass cache.  So just update length_flushed
			 * here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 * outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 * outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}

/* Blob resize + metadata sync for a synchronous extend finished: wake. */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

/* Resize completed: on success, sync the blob metadata before waking. */
static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

/* Runs on the fs thread: grow the blob to op.resize.num_clusters. */
static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

/* Async r/w issued on behalf of a sync caller finished: wake it. */
static void
__rw_from_file_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;

	__wake_caller(&req->args, bserrno);
	free_fs_request(req);
}

/* Runs on the fs thread: issue the async read or write for a sync caller. */
static void
__rw_from_file(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				     __rw_from_file_done, req);
	} else {
spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2387 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2388 __rw_from_file_done, req); 2389 } 2390 } 2391 2392 struct rw_from_file_arg { 2393 struct spdk_fs_channel *channel; 2394 int rwerrno; 2395 }; 2396 2397 static int 2398 __send_rw_from_file(struct spdk_file *file, void *payload, 2399 uint64_t offset, uint64_t length, bool is_read, 2400 struct rw_from_file_arg *arg) 2401 { 2402 struct spdk_fs_request *req; 2403 struct spdk_fs_cb_args *args; 2404 2405 req = alloc_fs_request_with_iov(arg->channel, 1); 2406 if (req == NULL) { 2407 sem_post(&arg->channel->sem); 2408 return -ENOMEM; 2409 } 2410 2411 args = &req->args; 2412 args->file = file; 2413 args->sem = &arg->channel->sem; 2414 args->iovs[0].iov_base = payload; 2415 args->iovs[0].iov_len = (size_t)length; 2416 args->op.rw.offset = offset; 2417 args->op.rw.is_read = is_read; 2418 args->rwerrno = &arg->rwerrno; 2419 file->fs->send_request(__rw_from_file, req); 2420 return 0; 2421 } 2422 2423 int 2424 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2425 void *payload, uint64_t offset, uint64_t length) 2426 { 2427 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2428 struct spdk_fs_request *flush_req; 2429 uint64_t rem_length, copy, blob_size, cluster_sz; 2430 uint32_t cache_buffers_filled = 0; 2431 uint8_t *cur_payload; 2432 struct cache_buffer *last; 2433 2434 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2435 2436 if (length == 0) { 2437 return 0; 2438 } 2439 2440 if (offset != file->append_pos) { 2441 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2442 return -EINVAL; 2443 } 2444 2445 pthread_spin_lock(&file->lock); 2446 file->open_for_writing = true; 2447 2448 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2449 cache_append_buffer(file); 2450 } 2451 2452 if (file->last == NULL) { 2453 struct 
rw_from_file_arg arg = {}; 2454 int rc; 2455 2456 arg.channel = channel; 2457 arg.rwerrno = 0; 2458 file->append_pos += length; 2459 pthread_spin_unlock(&file->lock); 2460 rc = __send_rw_from_file(file, payload, offset, length, false, &arg); 2461 if (rc != 0) { 2462 return rc; 2463 } 2464 sem_wait(&channel->sem); 2465 return arg.rwerrno; 2466 } 2467 2468 blob_size = __file_get_blob_size(file); 2469 2470 if ((offset + length) > blob_size) { 2471 struct spdk_fs_cb_args extend_args = {}; 2472 2473 cluster_sz = file->fs->bs_opts.cluster_sz; 2474 extend_args.sem = &channel->sem; 2475 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2476 extend_args.file = file; 2477 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2478 pthread_spin_unlock(&file->lock); 2479 file->fs->send_request(__file_extend_blob, &extend_args); 2480 sem_wait(&channel->sem); 2481 if (extend_args.rc) { 2482 return extend_args.rc; 2483 } 2484 } 2485 2486 flush_req = alloc_fs_request(channel); 2487 if (flush_req == NULL) { 2488 pthread_spin_unlock(&file->lock); 2489 return -ENOMEM; 2490 } 2491 2492 last = file->last; 2493 rem_length = length; 2494 cur_payload = payload; 2495 while (rem_length > 0) { 2496 copy = last->buf_size - last->bytes_filled; 2497 if (copy > rem_length) { 2498 copy = rem_length; 2499 } 2500 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2501 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2502 file->append_pos += copy; 2503 if (file->length < file->append_pos) { 2504 file->length = file->append_pos; 2505 } 2506 cur_payload += copy; 2507 last->bytes_filled += copy; 2508 rem_length -= copy; 2509 if (last->bytes_filled == last->buf_size) { 2510 cache_buffers_filled++; 2511 last = cache_append_buffer(file); 2512 if (last == NULL) { 2513 BLOBFS_TRACE(file, "nomem\n"); 2514 free_fs_request(flush_req); 2515 pthread_spin_unlock(&file->lock); 2516 return -ENOMEM; 2517 } 
2518 } 2519 } 2520 2521 pthread_spin_unlock(&file->lock); 2522 2523 if (cache_buffers_filled == 0) { 2524 free_fs_request(flush_req); 2525 return 0; 2526 } 2527 2528 flush_req->args.file = file; 2529 file->fs->send_request(__file_flush, flush_req); 2530 return 0; 2531 } 2532 2533 static void 2534 __readahead_done(void *ctx, int bserrno) 2535 { 2536 struct spdk_fs_request *req = ctx; 2537 struct spdk_fs_cb_args *args = &req->args; 2538 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2539 struct spdk_file *file = args->file; 2540 2541 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2542 2543 pthread_spin_lock(&file->lock); 2544 cache_buffer->bytes_filled = args->op.readahead.length; 2545 cache_buffer->bytes_flushed = args->op.readahead.length; 2546 cache_buffer->in_progress = false; 2547 pthread_spin_unlock(&file->lock); 2548 2549 free_fs_request(req); 2550 } 2551 2552 static void 2553 __readahead(void *ctx) 2554 { 2555 struct spdk_fs_request *req = ctx; 2556 struct spdk_fs_cb_args *args = &req->args; 2557 struct spdk_file *file = args->file; 2558 uint64_t offset, length, start_lba, num_lba; 2559 uint32_t lba_size; 2560 2561 offset = args->op.readahead.offset; 2562 length = args->op.readahead.length; 2563 assert(length > 0); 2564 2565 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2566 2567 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2568 offset, length, start_lba, num_lba); 2569 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2570 args->op.readahead.cache_buffer->buf, 2571 start_lba, num_lba, __readahead_done, req); 2572 } 2573 2574 static uint64_t 2575 __next_cache_buffer_offset(uint64_t offset) 2576 { 2577 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2578 } 2579 2580 static void 2581 check_readahead(struct spdk_file *file, uint64_t offset, 2582 struct spdk_fs_channel *channel) 2583 { 2584 struct spdk_fs_request *req; 2585 
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	/* Skip if that buffer is already present (or being read) or lies past
	 * the end of the file.
	 */
	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		/* Readahead is best-effort — just skip it on allocation failure. */
		return;
	}
	args = &req->args;

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free_fs_request(req);
		return;
	}

	args->op.readahead.cache_buffer->in_progress = true;
	/* The last buffer of the file may only be partially valid. */
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, req);
}

/*
 * Synchronous read.  Cached data is copied directly; uncached ranges are
 * fetched via __send_rw_from_file() and waited on.  Sequential access beyond
 * CACHE_READAHEAD_THRESHOLD triggers readahead of the next buffers.
 *
 * Returns the number of bytes read (possibly clamped to append_pos), or a
 * negative errno on failure.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	struct rw_from_file_arg arg = {};

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	/* Clamp the read to the current end of written data. */
	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	/* Track sequential access to decide when to start readahead. */
	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	arg.channel = channel;
	arg.rwerrno = 0;
	final_length = 0;
	final_offset = offset + length;
	/* Walk the range one cache buffer at a time. */
	while (offset < final_offset) {
		int ret = 0;
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			/* Not cached — drop the lock and issue an async read. */
			pthread_spin_unlock(&file->lock);
			ret = __send_rw_from_file(file, payload, offset, length, true, &arg);
			pthread_spin_lock(&file->lock);
			if (ret == 0) {
				sub_reads++;
			}
		} else {
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			/* Once a buffer has been read through its end, drop it
			 * from the cache tree.
			 */
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (ret == 0) {
			final_length += length;
		} else {
			arg.rwerrno = ret;
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	/* Wait for every async sub-read that was successfully queued. */
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (arg.rwerrno == 0) {
		return final_length;
	} else {
		return arg.rwerrno;
	}
}

/* Queue a sync for everything written so far and kick a flush to drive it.
 * cb_fn fires once flushing reaches op.sync.offset (via the sync_requests
 * list), or immediately if the data is already persisted or on ENOMEM.
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_xattr) {
		/* Everything up to append_pos is already reflected in the
		 * persisted length xattr — nothing to do.
		 */
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	/* Queue the sync request; the flush path completes it once data up to
	 * op.sync.offset has been written.
	 */
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}

/*
 * Synchronously flush all data written to the file so far.  Blocks on the
 * channel semaphore until the sync completes; returns 0 or a negative errno.
 */
int
spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_cb_args args = {};

	args.sem = &channel->sem;
	_file_sync(file, channel, __wake_caller, &args);
	sem_wait(&channel->sem);

	return args.rc;
}

/* Asynchronous variant of spdk_file_sync() for callers on an SPDK I/O
 * channel; cb_fn is invoked with the sync result.
 */
void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}

/* Record the file's priority value (stored on the file; its consumers are
 * not visible in this chunk).
 */
void
spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
{
	BLOBFS_TRACE(file, "priority=%u\n", priority);
	file->priority = priority;

}

/*
 * Close routines
 */

/* Final step of an async close: record the tracepoint, then either delete
 * the blob (if the file was unlinked while still open) or invoke the
 * caller's completion.  In the delete case the request is handed off to
 * blob_delete_cb (not visible here) rather than freed below.
 */
static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Drop one reference to the file; only when the last reference goes away is
 * the underlying blob closed.  Closing with ref_count == 0 completes with
 * -EBADF.
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		/* Others still hold the file open — report success without
		 * closing the blob.
		 */
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Last reference: detach and close the blob. */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}

/* Sync stage finished — proceed with the actual close.  fserrno is not
 * inspected; the close proceeds regardless of the sync result.
 */
static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}

/*
 * Asynchronously close the file: sync it first on the metadata channel, then
 * drop a reference via __file_close_async().  cb_fn receives the result.
 */
void
spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
}

/* send_request entry point for the synchronous close path. */
static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}

/*
 * Synchronously close the file: sync it, then drop a reference via the
 * send_request path and block on the channel semaphore for the result.
 * Returns 0 or a negative errno.
 */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	/* NOTE(review): the result of this blocking sync is ignored — confirm
	 * a failed sync should not abort the close.
	 */
	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}

/*
 * Copy the file's blob ID into `id`.  `size` must be at least
 * sizeof(spdk_blob_id).  Returns the number of bytes copied, or -EINVAL if
 * the destination is too small.
 */
int
spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
{
	if (size < sizeof(spdk_blob_id)) {
		return -EINVAL;
	}

	memcpy(id, &file->blobid, sizeof(spdk_blob_id));

	return sizeof(spdk_blob_id);
}

/* Runs on the cache pool thread: unlink the file from the global cache list
 * and release its memory.
 */
static void
_file_free(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);

	free(file->name);
	free(file->tree);
	free(file);
}

/* Free a file object.  If it holds no cache buffers it is freed inline;
 * otherwise its buffers are released and the final free (which also removes
 * it from g_caches) is deferred to the cache pool thread via _file_free().
 */
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}

SPDK_LOG_REGISTER_COMPONENT(blobfs)
SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)