/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "tree.h"

#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/log.h"
#include "spdk/trace.h"

#include "spdk_internal/trace_defs.h"

/* Debug-log a metadata-path event for a file (enabled via the "blobfs" log flag). */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)

/* Debug-log a read/write-path event for a file (separate "blobfs_rw" log flag). */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)

/* Default size of the global buffer-cache mempool: 4 GiB. */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* bstype signature written into the blobstore to mark it as a blobfs. */
#define SPDK_BLOBFS_SIGNATURE	"BLOBFS"

/*
 * Global cache-pool state, shared by all mounted filesystems.
 * g_fs_count / g_cache_init_lock implement a refcount so the pool and its
 * management thread are created on the first mount and torn down on the last.
 */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;

/* Register the blobfs tracepoints (each records the file name as a string arg). */
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"BLOBFS_XATTR_START", TRACE_BLOBFS_XATTR_START,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_XATTR_END", TRACE_BLOBFS_XATTR_END,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_OPEN", TRACE_BLOBFS_OPEN,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_CLOSE", TRACE_BLOBFS_CLOSE,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_START", TRACE_BLOBFS_DELETE_START,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_DONE", TRACE_BLOBFS_DELETE_DONE,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		}
	};

	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}

/* Return a cache buffer's data to the global mempool and free its descriptor. */
void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)

/* An open file in the filesystem, backed by a single blob. */
struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;
	char			*name;
	uint64_t		length;		/* current logical file size */
	bool                    is_deleted;
	bool			open_for_writing;
	uint64_t		length_flushed;	/* bytes persisted to the blob */
	uint64_t		length_xattr;	/* length recorded in the "length" xattr */
	uint64_t		append_pos;
	uint64_t		seq_byte_count;	/* consecutive sequential bytes, drives readahead */
	uint64_t		next_seq_offset;
	uint32_t		priority;
	TAILQ_ENTRY(spdk_file)	tailq;
	spdk_blob_id		blobid;
	uint32_t		ref_count;	/* open count; blob closed when it drops to 0 */
	pthread_spinlock_t	lock;		/* protects cache tree and lengths */
	struct cache_buffer	*last;
	struct cache_tree	*tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};

/* A blob found at load time with the "is_deleted" xattr set; deleted during fs_load. */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};

struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	fs_send_request_fn	send_request;

	/* Channel used by synchronous (blocking, semaphore-based) API calls. */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	/* Channel used by metadata operations (create/open/delete/...). */
	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	/* Per-thread I/O channels are created on demand from this io_device. */
	struct {
		uint32_t		max_ops;
	} io_target;
};

/*
 * Per-request context.  fn/arg hold the user completion; the op union holds
 * operation-specific state.  NOTE: some TAILQ_ENTRYs live inside the union,
 * so a request may only be on one such list, matching its active op.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;
	sem_t *sem;	/* posted by __wake_caller to unblock synchronous callers */
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	int *rwerrno;	/* sticky first error across a multi-part r/w operation */
	struct iovec *iovs;
	uint32_t iovcnt;
	struct iovec iov;	/* inline storage when iovcnt <= 1 */
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t	offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool		xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t	length;
		} sync;
		struct {
			uint32_t	num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};

static void file_free(struct spdk_file *file);
static void fs_io_device_unregister(struct spdk_filesystem *fs);
static void fs_free_io_channels(struct spdk_filesystem *fs);

/* Initialize user-visible blobfs options to their defaults. */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}

static int _blobfs_cache_pool_reclaim(void *arg);

/*
 * Return true when free buffers drop below 1/5 of the pool, i.e. the
 * reclaim poller should start evicting cache buffers.
 */
static bool
blobfs_cache_pool_need_reclaim(void)
{
	size_t count;

	count = spdk_mempool_count(g_cache_pool);
	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
	 * when the number of available cache buffer is less than 1/5 of total buffers.
	 */
	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
		return false;
	}

	return true;
}

/*
 * Runs on the dedicated cache-pool thread: create the global buffer mempool
 * and start the periodic reclaim poller.  Called once, on first mount.
 */
static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		SPDK_ERRLOG("Create mempool failed, you may "
			    "increase the memory and try again\n");
		assert(false);
	}

	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}

/*
 * Runs on the cache-pool thread on last unmount: tear down the poller and
 * pool, then exit the thread.  The assert checks all buffers were returned.
 */
static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}

/* Refcounted init: first mounted fs spawns the cache-pool thread. */
static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}

/* Refcounted teardown: last unmounted fs asks the cache-pool thread to stop. */
static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}

/* Allocated size of the file's blob in bytes (clusters * cluster size). */
static uint64_t
__file_get_blob_size(struct spdk_file *file)
{
	uint64_t cluster_sz;

	cluster_sz = file->fs->bs_opts.cluster_sz;
	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
}
343 344 struct spdk_fs_request { 345 struct spdk_fs_cb_args args; 346 TAILQ_ENTRY(spdk_fs_request) link; 347 struct spdk_fs_channel *channel; 348 }; 349 350 struct spdk_fs_channel { 351 struct spdk_fs_request *req_mem; 352 TAILQ_HEAD(, spdk_fs_request) reqs; 353 sem_t sem; 354 struct spdk_filesystem *fs; 355 struct spdk_io_channel *bs_channel; 356 fs_send_request_fn send_request; 357 bool sync; 358 uint32_t outstanding_reqs; 359 pthread_spinlock_t lock; 360 }; 361 362 /* For now, this is effectively an alias. But eventually we'll shift 363 * some data members over. */ 364 struct spdk_fs_thread_ctx { 365 struct spdk_fs_channel ch; 366 }; 367 368 static struct spdk_fs_request * 369 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 370 { 371 struct spdk_fs_request *req; 372 struct iovec *iovs = NULL; 373 374 if (iovcnt > 1) { 375 iovs = calloc(iovcnt, sizeof(struct iovec)); 376 if (!iovs) { 377 return NULL; 378 } 379 } 380 381 if (channel->sync) { 382 pthread_spin_lock(&channel->lock); 383 } 384 385 req = TAILQ_FIRST(&channel->reqs); 386 if (req) { 387 channel->outstanding_reqs++; 388 TAILQ_REMOVE(&channel->reqs, req, link); 389 } 390 391 if (channel->sync) { 392 pthread_spin_unlock(&channel->lock); 393 } 394 395 if (req == NULL) { 396 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 397 free(iovs); 398 return NULL; 399 } 400 memset(req, 0, sizeof(*req)); 401 req->channel = channel; 402 if (iovcnt > 1) { 403 req->args.iovs = iovs; 404 } else { 405 req->args.iovs = &req->args.iov; 406 } 407 req->args.iovcnt = iovcnt; 408 409 return req; 410 } 411 412 static struct spdk_fs_request * 413 alloc_fs_request(struct spdk_fs_channel *channel) 414 { 415 return alloc_fs_request_with_iov(channel, 0); 416 } 417 418 static void 419 free_fs_request(struct spdk_fs_request *req) 420 { 421 struct spdk_fs_channel *channel = req->channel; 422 423 if (req->args.iovcnt > 1) { 424 free(req->args.iovs); 425 } 426 427 if (channel->sync) { 428 
pthread_spin_lock(&channel->lock); 429 } 430 431 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 432 channel->outstanding_reqs--; 433 434 if (channel->sync) { 435 pthread_spin_unlock(&channel->lock); 436 } 437 } 438 439 static int 440 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 441 uint32_t max_ops) 442 { 443 uint32_t i; 444 445 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 446 if (!channel->req_mem) { 447 return -1; 448 } 449 450 channel->outstanding_reqs = 0; 451 TAILQ_INIT(&channel->reqs); 452 sem_init(&channel->sem, 0, 0); 453 454 for (i = 0; i < max_ops; i++) { 455 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 456 } 457 458 channel->fs = fs; 459 460 return 0; 461 } 462 463 static int 464 fs_md_channel_create(void *io_device, void *ctx_buf) 465 { 466 struct spdk_filesystem *fs; 467 struct spdk_fs_channel *channel = ctx_buf; 468 469 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 470 471 return fs_channel_create(fs, channel, fs->md_target.max_ops); 472 } 473 474 static int 475 fs_sync_channel_create(void *io_device, void *ctx_buf) 476 { 477 struct spdk_filesystem *fs; 478 struct spdk_fs_channel *channel = ctx_buf; 479 480 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 481 482 return fs_channel_create(fs, channel, fs->sync_target.max_ops); 483 } 484 485 static int 486 fs_io_channel_create(void *io_device, void *ctx_buf) 487 { 488 struct spdk_filesystem *fs; 489 struct spdk_fs_channel *channel = ctx_buf; 490 491 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 492 493 return fs_channel_create(fs, channel, fs->io_target.max_ops); 494 } 495 496 static void 497 fs_channel_destroy(void *io_device, void *ctx_buf) 498 { 499 struct spdk_fs_channel *channel = ctx_buf; 500 501 if (channel->outstanding_reqs > 0) { 502 SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n", 503 channel->outstanding_reqs); 504 } 505 506 
free(channel->req_mem); 507 if (channel->bs_channel != NULL) { 508 spdk_bs_free_io_channel(channel->bs_channel); 509 } 510 } 511 512 static void 513 __send_request_direct(fs_request_fn fn, void *arg) 514 { 515 fn(arg); 516 } 517 518 static void 519 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 520 { 521 fs->bs = bs; 522 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 523 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 524 fs->md_target.md_fs_channel->send_request = __send_request_direct; 525 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 526 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 527 528 initialize_global_cache(); 529 } 530 531 static void 532 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 533 { 534 struct spdk_fs_request *req = ctx; 535 struct spdk_fs_cb_args *args = &req->args; 536 struct spdk_filesystem *fs = args->fs; 537 538 if (bserrno == 0) { 539 common_fs_bs_init(fs, bs); 540 } else { 541 free(fs); 542 fs = NULL; 543 } 544 545 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 546 free_fs_request(req); 547 } 548 549 static struct spdk_filesystem * 550 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 551 { 552 struct spdk_filesystem *fs; 553 554 fs = calloc(1, sizeof(*fs)); 555 if (fs == NULL) { 556 return NULL; 557 } 558 559 fs->bdev = dev; 560 fs->send_request = send_request_fn; 561 TAILQ_INIT(&fs->files); 562 563 fs->md_target.max_ops = 512; 564 spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy, 565 sizeof(struct spdk_fs_channel), "blobfs_md"); 566 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 567 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 568 569 fs->sync_target.max_ops = 512; 570 spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy, 571 sizeof(struct spdk_fs_channel), 
"blobfs_sync"); 572 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 573 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 574 575 fs->io_target.max_ops = 512; 576 spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy, 577 sizeof(struct spdk_fs_channel), "blobfs_io"); 578 579 return fs; 580 } 581 582 static void 583 __wake_caller(void *arg, int fserrno) 584 { 585 struct spdk_fs_cb_args *args = arg; 586 587 if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) { 588 *(args->rwerrno) = fserrno; 589 } 590 args->rc = fserrno; 591 sem_post(args->sem); 592 } 593 594 void 595 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 596 fs_send_request_fn send_request_fn, 597 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 598 { 599 struct spdk_filesystem *fs; 600 struct spdk_fs_request *req; 601 struct spdk_fs_cb_args *args; 602 struct spdk_bs_opts opts = {}; 603 604 fs = fs_alloc(dev, send_request_fn); 605 if (fs == NULL) { 606 cb_fn(cb_arg, NULL, -ENOMEM); 607 return; 608 } 609 610 req = alloc_fs_request(fs->md_target.md_fs_channel); 611 if (req == NULL) { 612 fs_free_io_channels(fs); 613 fs_io_device_unregister(fs); 614 cb_fn(cb_arg, NULL, -ENOMEM); 615 return; 616 } 617 618 args = &req->args; 619 args->fn.fs_op_with_handle = cb_fn; 620 args->arg = cb_arg; 621 args->fs = fs; 622 623 spdk_bs_opts_init(&opts, sizeof(opts)); 624 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE); 625 if (opt) { 626 opts.cluster_sz = opt->cluster_sz; 627 } 628 spdk_bs_init(dev, &opts, init_cb, req); 629 } 630 631 static struct spdk_file * 632 file_alloc(struct spdk_filesystem *fs) 633 { 634 struct spdk_file *file; 635 636 file = calloc(1, sizeof(*file)); 637 if (file == NULL) { 638 return NULL; 639 } 640 641 file->tree = calloc(1, sizeof(*file->tree)); 642 if (file->tree == NULL) { 643 free(file); 644 return NULL; 645 } 646 647 if 
(pthread_spin_init(&file->lock, 0)) { 648 free(file->tree); 649 free(file); 650 return NULL; 651 } 652 653 file->fs = fs; 654 TAILQ_INIT(&file->open_requests); 655 TAILQ_INIT(&file->sync_requests); 656 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 657 file->priority = SPDK_FILE_PRIORITY_LOW; 658 return file; 659 } 660 661 static void fs_load_done(void *ctx, int bserrno); 662 663 static int 664 _handle_deleted_files(struct spdk_fs_request *req) 665 { 666 struct spdk_fs_cb_args *args = &req->args; 667 struct spdk_filesystem *fs = args->fs; 668 669 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 670 struct spdk_deleted_file *deleted_file; 671 672 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 673 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 674 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 675 free(deleted_file); 676 return 0; 677 } 678 679 return 1; 680 } 681 682 static void 683 fs_load_done(void *ctx, int bserrno) 684 { 685 struct spdk_fs_request *req = ctx; 686 struct spdk_fs_cb_args *args = &req->args; 687 struct spdk_filesystem *fs = args->fs; 688 689 /* The filesystem has been loaded. Now check if there are any files that 690 * were marked for deletion before last unload. Do not complete the 691 * fs_load callback until all of them have been deleted on disk. 692 */ 693 if (_handle_deleted_files(req) == 0) { 694 /* We found a file that's been marked for deleting but not actually 695 * deleted yet. This function will get called again once the delete 696 * operation is completed. 
697 */ 698 return; 699 } 700 701 args->fn.fs_op_with_handle(args->arg, fs, 0); 702 free_fs_request(req); 703 704 } 705 706 static void 707 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 708 { 709 struct spdk_fs_request *req = ctx; 710 struct spdk_fs_cb_args *args = &req->args; 711 struct spdk_filesystem *fs = args->fs; 712 uint64_t *length; 713 const char *name; 714 uint32_t *is_deleted; 715 size_t value_len; 716 717 if (rc < 0) { 718 args->fn.fs_op_with_handle(args->arg, fs, rc); 719 free_fs_request(req); 720 return; 721 } 722 723 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 724 if (rc < 0) { 725 args->fn.fs_op_with_handle(args->arg, fs, rc); 726 free_fs_request(req); 727 return; 728 } 729 730 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 731 if (rc < 0) { 732 args->fn.fs_op_with_handle(args->arg, fs, rc); 733 free_fs_request(req); 734 return; 735 } 736 737 assert(value_len == 8); 738 739 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 740 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 741 if (rc < 0) { 742 struct spdk_file *f; 743 744 f = file_alloc(fs); 745 if (f == NULL) { 746 SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n"); 747 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 748 free_fs_request(req); 749 return; 750 } 751 752 f->name = strdup(name); 753 f->blobid = spdk_blob_get_id(blob); 754 f->length = *length; 755 f->length_flushed = *length; 756 f->length_xattr = *length; 757 f->append_pos = *length; 758 SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length); 759 } else { 760 struct spdk_deleted_file *deleted_file; 761 762 deleted_file = calloc(1, sizeof(*deleted_file)); 763 if (deleted_file == NULL) { 764 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 765 free_fs_request(req); 766 return; 767 } 768 deleted_file->id = 
spdk_blob_get_id(blob); 769 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 770 } 771 } 772 773 static void 774 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 775 { 776 struct spdk_fs_request *req = ctx; 777 struct spdk_fs_cb_args *args = &req->args; 778 struct spdk_filesystem *fs = args->fs; 779 struct spdk_bs_type bstype; 780 static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE}; 781 static const struct spdk_bs_type zeros; 782 783 if (bserrno != 0) { 784 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 785 free_fs_request(req); 786 fs_free_io_channels(fs); 787 fs_io_device_unregister(fs); 788 return; 789 } 790 791 bstype = spdk_bs_get_bstype(bs); 792 793 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 794 SPDK_DEBUGLOG(blobfs, "assigning bstype\n"); 795 spdk_bs_set_bstype(bs, blobfs_type); 796 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 797 SPDK_ERRLOG("not blobfs\n"); 798 SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype)); 799 args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL); 800 free_fs_request(req); 801 fs_free_io_channels(fs); 802 fs_io_device_unregister(fs); 803 return; 804 } 805 806 common_fs_bs_init(fs, bs); 807 fs_load_done(req, 0); 808 } 809 810 static void 811 fs_io_device_unregister(struct spdk_filesystem *fs) 812 { 813 assert(fs != NULL); 814 spdk_io_device_unregister(&fs->md_target, NULL); 815 spdk_io_device_unregister(&fs->sync_target, NULL); 816 spdk_io_device_unregister(&fs->io_target, NULL); 817 free(fs); 818 } 819 820 static void 821 fs_free_io_channels(struct spdk_filesystem *fs) 822 { 823 assert(fs != NULL); 824 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 825 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 826 } 827 828 void 829 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 830 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 831 { 832 struct spdk_filesystem *fs; 833 struct spdk_fs_cb_args *args; 834 struct 
spdk_fs_request *req; 835 struct spdk_bs_opts bs_opts; 836 837 fs = fs_alloc(dev, send_request_fn); 838 if (fs == NULL) { 839 cb_fn(cb_arg, NULL, -ENOMEM); 840 return; 841 } 842 843 req = alloc_fs_request(fs->md_target.md_fs_channel); 844 if (req == NULL) { 845 fs_free_io_channels(fs); 846 fs_io_device_unregister(fs); 847 cb_fn(cb_arg, NULL, -ENOMEM); 848 return; 849 } 850 851 args = &req->args; 852 args->fn.fs_op_with_handle = cb_fn; 853 args->arg = cb_arg; 854 args->fs = fs; 855 TAILQ_INIT(&args->op.fs_load.deleted_files); 856 spdk_bs_opts_init(&bs_opts, sizeof(bs_opts)); 857 bs_opts.iter_cb_fn = iter_cb; 858 bs_opts.iter_cb_arg = req; 859 spdk_bs_load(dev, &bs_opts, load_cb, req); 860 } 861 862 static void 863 unload_cb(void *ctx, int bserrno) 864 { 865 struct spdk_fs_request *req = ctx; 866 struct spdk_fs_cb_args *args = &req->args; 867 struct spdk_filesystem *fs = args->fs; 868 struct spdk_file *file, *tmp; 869 870 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 871 TAILQ_REMOVE(&fs->files, file, tailq); 872 file_free(file); 873 } 874 875 free_global_cache(); 876 877 args->fn.fs_op(args->arg, bserrno); 878 free(req); 879 880 fs_io_device_unregister(fs); 881 } 882 883 void 884 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 885 { 886 struct spdk_fs_request *req; 887 struct spdk_fs_cb_args *args; 888 889 /* 890 * We must free the md_channel before unloading the blobstore, so just 891 * allocate this request from the general heap. 
892 */ 893 req = calloc(1, sizeof(*req)); 894 if (req == NULL) { 895 cb_fn(cb_arg, -ENOMEM); 896 return; 897 } 898 899 args = &req->args; 900 args->fn.fs_op = cb_fn; 901 args->arg = cb_arg; 902 args->fs = fs; 903 904 fs_free_io_channels(fs); 905 spdk_bs_unload(fs->bs, unload_cb, req); 906 } 907 908 static struct spdk_file * 909 fs_find_file(struct spdk_filesystem *fs, const char *name) 910 { 911 struct spdk_file *file; 912 913 TAILQ_FOREACH(file, &fs->files, tailq) { 914 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 915 return file; 916 } 917 } 918 919 return NULL; 920 } 921 922 void 923 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 924 spdk_file_stat_op_complete cb_fn, void *cb_arg) 925 { 926 struct spdk_file_stat stat; 927 struct spdk_file *f = NULL; 928 929 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 930 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 931 return; 932 } 933 934 f = fs_find_file(fs, name); 935 if (f != NULL) { 936 stat.blobid = f->blobid; 937 stat.size = f->append_pos >= f->length ? 
f->append_pos : f->length; 938 cb_fn(cb_arg, &stat, 0); 939 return; 940 } 941 942 cb_fn(cb_arg, NULL, -ENOENT); 943 } 944 945 static void 946 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 947 { 948 struct spdk_fs_request *req = arg; 949 struct spdk_fs_cb_args *args = &req->args; 950 951 args->rc = fserrno; 952 if (fserrno == 0) { 953 memcpy(args->arg, stat, sizeof(*stat)); 954 } 955 sem_post(args->sem); 956 } 957 958 static void 959 __file_stat(void *arg) 960 { 961 struct spdk_fs_request *req = arg; 962 struct spdk_fs_cb_args *args = &req->args; 963 964 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 965 args->fn.stat_op, req); 966 } 967 968 int 969 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 970 const char *name, struct spdk_file_stat *stat) 971 { 972 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 973 struct spdk_fs_request *req; 974 int rc; 975 976 req = alloc_fs_request(channel); 977 if (req == NULL) { 978 SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name); 979 return -ENOMEM; 980 } 981 982 req->args.fs = fs; 983 req->args.op.stat.name = name; 984 req->args.fn.stat_op = __copy_stat; 985 req->args.arg = stat; 986 req->args.sem = &channel->sem; 987 channel->send_request(__file_stat, req); 988 sem_wait(&channel->sem); 989 990 rc = req->args.rc; 991 free_fs_request(req); 992 993 return rc; 994 } 995 996 static void 997 fs_create_blob_close_cb(void *ctx, int bserrno) 998 { 999 int rc; 1000 struct spdk_fs_request *req = ctx; 1001 struct spdk_fs_cb_args *args = &req->args; 1002 1003 rc = args->rc ? 
args->rc : bserrno; 1004 args->fn.file_op(args->arg, rc); 1005 free_fs_request(req); 1006 } 1007 1008 static void 1009 fs_create_blob_resize_cb(void *ctx, int bserrno) 1010 { 1011 struct spdk_fs_request *req = ctx; 1012 struct spdk_fs_cb_args *args = &req->args; 1013 struct spdk_file *f = args->file; 1014 struct spdk_blob *blob = args->op.create.blob; 1015 uint64_t length = 0; 1016 1017 args->rc = bserrno; 1018 if (bserrno) { 1019 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1020 return; 1021 } 1022 1023 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 1024 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 1025 1026 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1027 } 1028 1029 static void 1030 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1031 { 1032 struct spdk_fs_request *req = ctx; 1033 struct spdk_fs_cb_args *args = &req->args; 1034 1035 if (bserrno) { 1036 args->fn.file_op(args->arg, bserrno); 1037 free_fs_request(req); 1038 return; 1039 } 1040 1041 args->op.create.blob = blob; 1042 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 1043 } 1044 1045 static void 1046 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 1047 { 1048 struct spdk_fs_request *req = ctx; 1049 struct spdk_fs_cb_args *args = &req->args; 1050 struct spdk_file *f = args->file; 1051 1052 if (bserrno) { 1053 args->fn.file_op(args->arg, bserrno); 1054 free_fs_request(req); 1055 return; 1056 } 1057 1058 f->blobid = blobid; 1059 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 1060 } 1061 1062 void 1063 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 1064 spdk_file_op_complete cb_fn, void *cb_arg) 1065 { 1066 struct spdk_file *file; 1067 struct spdk_fs_request *req; 1068 struct spdk_fs_cb_args *args; 1069 1070 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1071 cb_fn(cb_arg, -ENAMETOOLONG); 1072 return; 1073 } 1074 1075 file = 
fs_find_file(fs, name); 1076 if (file != NULL) { 1077 cb_fn(cb_arg, -EEXIST); 1078 return; 1079 } 1080 1081 file = file_alloc(fs); 1082 if (file == NULL) { 1083 SPDK_ERRLOG("Cannot allocate new file for creation\n"); 1084 cb_fn(cb_arg, -ENOMEM); 1085 return; 1086 } 1087 1088 req = alloc_fs_request(fs->md_target.md_fs_channel); 1089 if (req == NULL) { 1090 SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name); 1091 TAILQ_REMOVE(&fs->files, file, tailq); 1092 file_free(file); 1093 cb_fn(cb_arg, -ENOMEM); 1094 return; 1095 } 1096 1097 args = &req->args; 1098 args->file = file; 1099 args->fn.file_op = cb_fn; 1100 args->arg = cb_arg; 1101 1102 file->name = strdup(name); 1103 if (!file->name) { 1104 SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name); 1105 free_fs_request(req); 1106 TAILQ_REMOVE(&fs->files, file, tailq); 1107 file_free(file); 1108 cb_fn(cb_arg, -ENOMEM); 1109 return; 1110 } 1111 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 1112 } 1113 1114 static void 1115 __fs_create_file_done(void *arg, int fserrno) 1116 { 1117 struct spdk_fs_request *req = arg; 1118 struct spdk_fs_cb_args *args = &req->args; 1119 1120 __wake_caller(args, fserrno); 1121 SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name); 1122 } 1123 1124 static void 1125 __fs_create_file(void *arg) 1126 { 1127 struct spdk_fs_request *req = arg; 1128 struct spdk_fs_cb_args *args = &req->args; 1129 1130 SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name); 1131 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1132 } 1133 1134 int 1135 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1136 { 1137 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1138 struct spdk_fs_request *req; 1139 struct spdk_fs_cb_args *args; 1140 int rc; 1141 1142 SPDK_DEBUGLOG(blobfs, "file=%s\n", name); 1143 1144 req = alloc_fs_request(channel); 1145 if (req == NULL) { 1146 
SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name); 1147 return -ENOMEM; 1148 } 1149 1150 args = &req->args; 1151 args->fs = fs; 1152 args->op.create.name = name; 1153 args->sem = &channel->sem; 1154 fs->send_request(__fs_create_file, req); 1155 sem_wait(&channel->sem); 1156 rc = args->rc; 1157 free_fs_request(req); 1158 1159 return rc; 1160 } 1161 1162 static void 1163 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1164 { 1165 struct spdk_fs_request *req = ctx; 1166 struct spdk_fs_cb_args *args = &req->args; 1167 struct spdk_file *f = args->file; 1168 1169 f->blob = blob; 1170 while (!TAILQ_EMPTY(&f->open_requests)) { 1171 req = TAILQ_FIRST(&f->open_requests); 1172 args = &req->args; 1173 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1174 spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->name); 1175 args->fn.file_op_with_handle(args->arg, f, bserrno); 1176 free_fs_request(req); 1177 } 1178 } 1179 1180 static void 1181 fs_open_blob_create_cb(void *ctx, int bserrno) 1182 { 1183 struct spdk_fs_request *req = ctx; 1184 struct spdk_fs_cb_args *args = &req->args; 1185 struct spdk_file *file = args->file; 1186 struct spdk_filesystem *fs = args->fs; 1187 1188 if (file == NULL) { 1189 /* 1190 * This is from an open with CREATE flag - the file 1191 * is now created so look it up in the file list for this 1192 * filesystem. 1193 */ 1194 file = fs_find_file(fs, args->op.open.name); 1195 assert(file != NULL); 1196 args->file = file; 1197 } 1198 1199 file->ref_count++; 1200 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1201 if (file->ref_count == 1) { 1202 assert(file->blob == NULL); 1203 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1204 } else if (file->blob != NULL) { 1205 fs_open_blob_done(req, file->blob, 0); 1206 } else { 1207 /* 1208 * The blob open for this file is in progress due to a previous 1209 * open request. 
When that open completes, it will invoke the
		 * open callback for this request.
		 */
	}
}

/* Asynchronously open (and optionally create, with SPDK_BLOBFS_OPEN_CREATE)
 * the named file.  cb_fn receives the file handle or a negative errno.
 */
void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	/* A file that is still referenced but already marked deleted must not
	 * be opened again.
	 */
	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		/* File doesn't exist yet - create it first, then open it. */
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

/* Completion for the async open issued by __fs_open_file(): stash the file
 * handle and wake the blocked caller.
 */
static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	__wake_caller(args, bserrno);
	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
}

/* Runs on the metadata thread on behalf of spdk_fs_open_file(). */
static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

int
1283 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1284 const char *name, uint32_t flags, struct spdk_file **file) 1285 { 1286 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1287 struct spdk_fs_request *req; 1288 struct spdk_fs_cb_args *args; 1289 int rc; 1290 1291 SPDK_DEBUGLOG(blobfs, "file=%s\n", name); 1292 1293 req = alloc_fs_request(channel); 1294 if (req == NULL) { 1295 SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name); 1296 return -ENOMEM; 1297 } 1298 1299 args = &req->args; 1300 args->fs = fs; 1301 args->op.open.name = name; 1302 args->op.open.flags = flags; 1303 args->sem = &channel->sem; 1304 fs->send_request(__fs_open_file, req); 1305 sem_wait(&channel->sem); 1306 rc = args->rc; 1307 if (rc == 0) { 1308 *file = args->file; 1309 } else { 1310 *file = NULL; 1311 } 1312 free_fs_request(req); 1313 1314 return rc; 1315 } 1316 1317 static void 1318 fs_rename_blob_close_cb(void *ctx, int bserrno) 1319 { 1320 struct spdk_fs_request *req = ctx; 1321 struct spdk_fs_cb_args *args = &req->args; 1322 1323 args->fn.fs_op(args->arg, bserrno); 1324 free_fs_request(req); 1325 } 1326 1327 static void 1328 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1329 { 1330 struct spdk_fs_request *req = ctx; 1331 struct spdk_fs_cb_args *args = &req->args; 1332 const char *new_name = args->op.rename.new_name; 1333 1334 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1335 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1336 } 1337 1338 static void 1339 _fs_md_rename_file(struct spdk_fs_request *req) 1340 { 1341 struct spdk_fs_cb_args *args = &req->args; 1342 struct spdk_file *f; 1343 1344 f = fs_find_file(args->fs, args->op.rename.old_name); 1345 if (f == NULL) { 1346 args->fn.fs_op(args->arg, -ENOENT); 1347 free_fs_request(req); 1348 return; 1349 } 1350 1351 free(f->name); 1352 f->name = strdup(args->op.rename.new_name); 1353 args->file = f; 1354 
spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1355 } 1356 1357 static void 1358 fs_rename_delete_done(void *arg, int fserrno) 1359 { 1360 _fs_md_rename_file(arg); 1361 } 1362 1363 void 1364 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1365 const char *old_name, const char *new_name, 1366 spdk_file_op_complete cb_fn, void *cb_arg) 1367 { 1368 struct spdk_file *f; 1369 struct spdk_fs_request *req; 1370 struct spdk_fs_cb_args *args; 1371 1372 SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name); 1373 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1374 cb_fn(cb_arg, -ENAMETOOLONG); 1375 return; 1376 } 1377 1378 req = alloc_fs_request(fs->md_target.md_fs_channel); 1379 if (req == NULL) { 1380 SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name, 1381 new_name); 1382 cb_fn(cb_arg, -ENOMEM); 1383 return; 1384 } 1385 1386 args = &req->args; 1387 args->fn.fs_op = cb_fn; 1388 args->fs = fs; 1389 args->arg = cb_arg; 1390 args->op.rename.old_name = old_name; 1391 args->op.rename.new_name = new_name; 1392 1393 f = fs_find_file(fs, new_name); 1394 if (f == NULL) { 1395 _fs_md_rename_file(req); 1396 return; 1397 } 1398 1399 /* 1400 * The rename overwrites an existing file. So delete the existing file, then 1401 * do the actual rename. 
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

/* Wake the caller blocked in spdk_fs_rename_file(). */
static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
}

/* Runs on the metadata thread on behalf of spdk_fs_rename_file(). */
static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

/* Synchronous rename: dispatch to the metadata thread and block until done.
 * Returns 0 on success or a negative errno.
 */
int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
		return -ENOMEM;
	}

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

/* Blob delete (or deferred-delete metadata sync) finished - complete the
 * user callback.
 */
static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Asynchronously delete the named file.  If the file is still open it is
 * only marked as deleted; the blob itself is removed when the last
 * reference is closed.
 */
void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1)
== SPDK_FILE_NAME_MAX + 1) { 1475 cb_fn(cb_arg, -ENAMETOOLONG); 1476 return; 1477 } 1478 1479 f = fs_find_file(fs, name); 1480 if (f == NULL) { 1481 SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name); 1482 cb_fn(cb_arg, -ENOENT); 1483 return; 1484 } 1485 1486 req = alloc_fs_request(fs->md_target.md_fs_channel); 1487 if (req == NULL) { 1488 SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name); 1489 cb_fn(cb_arg, -ENOMEM); 1490 return; 1491 } 1492 1493 args = &req->args; 1494 args->fn.file_op = cb_fn; 1495 args->arg = cb_arg; 1496 1497 if (f->ref_count > 0) { 1498 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1499 f->is_deleted = true; 1500 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1501 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1502 return; 1503 } 1504 1505 blobid = f->blobid; 1506 TAILQ_REMOVE(&fs->files, f, tailq); 1507 1508 file_free(f); 1509 1510 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1511 } 1512 1513 static void 1514 __fs_delete_file_done(void *arg, int fserrno) 1515 { 1516 struct spdk_fs_request *req = arg; 1517 struct spdk_fs_cb_args *args = &req->args; 1518 1519 spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, args->op.delete.name); 1520 __wake_caller(args, fserrno); 1521 } 1522 1523 static void 1524 __fs_delete_file(void *arg) 1525 { 1526 struct spdk_fs_request *req = arg; 1527 struct spdk_fs_cb_args *args = &req->args; 1528 1529 spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, args->op.delete.name); 1530 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1531 } 1532 1533 int 1534 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1535 const char *name) 1536 { 1537 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1538 struct spdk_fs_request *req; 1539 struct spdk_fs_cb_args *args; 1540 int rc; 1541 1542 req = alloc_fs_request(channel); 1543 if (req 
== NULL) { 1544 SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name); 1545 return -ENOMEM; 1546 } 1547 1548 args = &req->args; 1549 args->fs = fs; 1550 args->op.delete.name = name; 1551 args->sem = &channel->sem; 1552 fs->send_request(__fs_delete_file, req); 1553 sem_wait(&channel->sem); 1554 rc = args->rc; 1555 free_fs_request(req); 1556 1557 return rc; 1558 } 1559 1560 spdk_fs_iter 1561 spdk_fs_iter_first(struct spdk_filesystem *fs) 1562 { 1563 struct spdk_file *f; 1564 1565 f = TAILQ_FIRST(&fs->files); 1566 return f; 1567 } 1568 1569 spdk_fs_iter 1570 spdk_fs_iter_next(spdk_fs_iter iter) 1571 { 1572 struct spdk_file *f = iter; 1573 1574 if (f == NULL) { 1575 return NULL; 1576 } 1577 1578 f = TAILQ_NEXT(f, tailq); 1579 return f; 1580 } 1581 1582 const char * 1583 spdk_file_get_name(struct spdk_file *file) 1584 { 1585 return file->name; 1586 } 1587 1588 uint64_t 1589 spdk_file_get_length(struct spdk_file *file) 1590 { 1591 uint64_t length; 1592 1593 assert(file != NULL); 1594 1595 length = file->append_pos >= file->length ? 
file->append_pos : file->length;
	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
	return length;
}

/* Truncate metadata sync finished - complete the user callback. */
static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Blob resize finished: update the cached length/append_pos and persist the
 * new length in the blob's "length" xattr.
 */
static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}

/* Round length up to a whole number of clusters. */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

/* Asynchronously resize the file (and its backing blob) to length bytes. */
void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.truncate.length = length;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
}

/* Runs on the metadata thread on behalf of spdk_file_truncate(). */
static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}

/* Synchronous truncate: dispatch to the metadata thread and block until
 * done.  Returns 0 on success or a negative errno.
 */
int
spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return -ENOMEM;
	}

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __wake_caller;
	args->sem = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* Release the pinned bounce buffer and complete the user I/O callback. */
static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Gather: copy at most buf_len bytes from the iovec list into buf. */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

/* Scatter: copy at most buf_len bytes from buf out to the iovec list. */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

/* Blob read into the pinned buffer finished.  For reads, scatter the data to
 * the user's iovecs; for read-modify-writes, merge the user data into the
 * buffer and write it back.
 */
static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	assert(req != NULL);
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		__rw_done(req, 0);
	} else {
		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}

/* Issue the blob read that backs both reads and unaligned writes. */
static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (fserrno) {
		__rw_done(req, fserrno);
		return;
	}
	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
			  args->op.rw.pin_buf,
			  args->op.rw.start_lba, args->op.rw.num_lba,
			  __read_done, req);
}

/* Translate a byte offset/length into the covering LBA range. */
static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
{
	uint64_t end_lba;

	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
	*start_lba = offset / *lba_size;
	end_lba = (offset + length - 1) / *lba_size;
	*num_lba = (end_lba - *start_lba + 1);
}

/* True if both offset and length are multiples of the blobstore io unit. */
static bool
__is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
{
	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);

	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
		return true;
	}

	return false;
}

/* Copy the caller's iovecs into the request's embedded iovec array. */
static void
_fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
{
	uint32_t i;

	for (i = 0; i < iovcnt; i++) {
		req->args.iovs[i].iov_base = iovs[i].iov_base;
		req->args.iovs[i].iov_len = iovs[i].iov_len;
	}
}

/* Common scatter/gather read/write path: pin an LBA-aligned bounce buffer,
 * then read, write, or read-modify-write the covered range through it.
 */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	/* Reads past EOF are rejected outright. */
	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		/* Extending write: grow the file first, then read-modify-write. */
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		/* Fully aligned write: no read-modify-write needed. */
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}

/* Single-buffer wrapper around __readvwritev(). */
static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct iovec iov;

	iov.iov_base = payload;
	iov.iov_len = (size_t)length;

	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
}

void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t
length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}

/* Allocate a per-thread I/O channel whose requests are sent directly to the
 * blobstore (no cross-thread dispatch).
 */
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

/* Allocate the context used by the synchronous (blocking) API wrappers. */
struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	if (pthread_spin_init(&ctx->ch.lock, 0)) {
		free(ctx);
		return NULL;
	}

	/* NOTE(review): the fs_channel_create() result is not checked here -
	 * confirm it cannot fail for this max_ops count.
	 */
	fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;

	return ctx;
}


/* Tear down a sync context, waiting for all of its outstanding requests. */
void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}

int
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	/* setting g_fs_cache_size is only permitted if cache pool
	 * is already freed or hasn't been initialized
	 */
	if (g_cache_pool != NULL) {
		return -EPERM;
	}

	g_fs_cache_size = size_in_mb * 1024 * 1024;

	return 0;
}

/* Report the configured cache size in MiB. */
uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *ctx);

/* Try to free some cache buffers from this file.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}

/* Cache-pool reclaim poller: evict buffers in priority order - first idle
 * low-priority files, then any file not open for writing, finally any file.
 */
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return SPDK_POLLER_IDLE;
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return SPDK_POLLER_BUSY;
}

/* Runs on the cache pool thread: make the file visible to the reclaim poller. */
static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
}

/* Runs on the cache pool thread: hide the file from the reclaim poller. */
static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
}

/* Allocate a cache buffer for the given file offset, retrying (with sleeps)
 * while the shared pool is exhausted, and insert it into the file's buffer
 * tree.  Returns NULL on allocation failure or persistent pool exhaustion.
 */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	/* First buffer for this file - register it with the cache pool thread. */
	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}

/* Append a fresh cache buffer at the file's current append position. */
static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);

/* The "length" xattr sync for a file sync finished: record the synced
 * length, complete the request, and look for further pending syncs.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}

/* If a queued sync request is now fully covered by the flushed data, start
 * persisting the flushed length to the blob's "length" xattr.
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 * blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	/* Keep draining: reuse req to flush the next buffer, if any. */
	__file_flush(req);
}

/* Flush the next unflushed cache buffer of the file, if any.  req is reused
 * for the whole flush chain and freed when there is nothing left to do.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 * progress, or the next buffer is partially filled but there's no
		 * outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 * more data after that is completed, or a partial buffer will get flushed
		 * when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 * file was later appended, we will write the data directly
			 * to disk and bypass cache. So just update length_flushed
			 * here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 * outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 * outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}

/* Blob metadata sync after an extend finished - wake the blocked writer. */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

/* Blob resize for an extend finished: sync metadata or report the error. */
static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

/* Runs on the metadata thread: grow the blob to the requested cluster count. */
static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

/* Completion for __rw_from_file(): wake the blocked synchronous caller. */
static void
__rw_from_file_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;

	__wake_caller(&req->args, bserrno);
	free_fs_request(req);
}

/* Runs on the metadata thread: issue the async read/write described by req
 * on behalf of the synchronous API.
 */
static void
__rw_from_file(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				     __rw_from_file_done, req);
	} else {
spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2400 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2401 __rw_from_file_done, req); 2402 } 2403 } 2404 2405 struct rw_from_file_arg { 2406 struct spdk_fs_channel *channel; 2407 int rwerrno; 2408 }; 2409 2410 static int 2411 __send_rw_from_file(struct spdk_file *file, void *payload, 2412 uint64_t offset, uint64_t length, bool is_read, 2413 struct rw_from_file_arg *arg) 2414 { 2415 struct spdk_fs_request *req; 2416 struct spdk_fs_cb_args *args; 2417 2418 req = alloc_fs_request_with_iov(arg->channel, 1); 2419 if (req == NULL) { 2420 sem_post(&arg->channel->sem); 2421 return -ENOMEM; 2422 } 2423 2424 args = &req->args; 2425 args->file = file; 2426 args->sem = &arg->channel->sem; 2427 args->iovs[0].iov_base = payload; 2428 args->iovs[0].iov_len = (size_t)length; 2429 args->op.rw.offset = offset; 2430 args->op.rw.is_read = is_read; 2431 args->rwerrno = &arg->rwerrno; 2432 file->fs->send_request(__rw_from_file, req); 2433 return 0; 2434 } 2435 2436 int 2437 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2438 void *payload, uint64_t offset, uint64_t length) 2439 { 2440 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2441 struct spdk_fs_request *flush_req; 2442 uint64_t rem_length, copy, blob_size, cluster_sz; 2443 uint32_t cache_buffers_filled = 0; 2444 uint8_t *cur_payload; 2445 struct cache_buffer *last; 2446 2447 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2448 2449 if (length == 0) { 2450 return 0; 2451 } 2452 2453 if (offset != file->append_pos) { 2454 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2455 return -EINVAL; 2456 } 2457 2458 pthread_spin_lock(&file->lock); 2459 file->open_for_writing = true; 2460 2461 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2462 cache_append_buffer(file); 2463 } 2464 2465 if (file->last == NULL) { 2466 struct 
rw_from_file_arg arg = {}; 2467 int rc; 2468 2469 arg.channel = channel; 2470 arg.rwerrno = 0; 2471 file->append_pos += length; 2472 pthread_spin_unlock(&file->lock); 2473 rc = __send_rw_from_file(file, payload, offset, length, false, &arg); 2474 if (rc != 0) { 2475 return rc; 2476 } 2477 sem_wait(&channel->sem); 2478 return arg.rwerrno; 2479 } 2480 2481 blob_size = __file_get_blob_size(file); 2482 2483 if ((offset + length) > blob_size) { 2484 struct spdk_fs_cb_args extend_args = {}; 2485 2486 cluster_sz = file->fs->bs_opts.cluster_sz; 2487 extend_args.sem = &channel->sem; 2488 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2489 extend_args.file = file; 2490 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2491 pthread_spin_unlock(&file->lock); 2492 file->fs->send_request(__file_extend_blob, &extend_args); 2493 sem_wait(&channel->sem); 2494 if (extend_args.rc) { 2495 return extend_args.rc; 2496 } 2497 } 2498 2499 flush_req = alloc_fs_request(channel); 2500 if (flush_req == NULL) { 2501 pthread_spin_unlock(&file->lock); 2502 return -ENOMEM; 2503 } 2504 2505 last = file->last; 2506 rem_length = length; 2507 cur_payload = payload; 2508 while (rem_length > 0) { 2509 copy = last->buf_size - last->bytes_filled; 2510 if (copy > rem_length) { 2511 copy = rem_length; 2512 } 2513 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2514 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2515 file->append_pos += copy; 2516 if (file->length < file->append_pos) { 2517 file->length = file->append_pos; 2518 } 2519 cur_payload += copy; 2520 last->bytes_filled += copy; 2521 rem_length -= copy; 2522 if (last->bytes_filled == last->buf_size) { 2523 cache_buffers_filled++; 2524 last = cache_append_buffer(file); 2525 if (last == NULL) { 2526 BLOBFS_TRACE(file, "nomem\n"); 2527 free_fs_request(flush_req); 2528 pthread_spin_unlock(&file->lock); 2529 return -ENOMEM; 2530 } 
2531 } 2532 } 2533 2534 pthread_spin_unlock(&file->lock); 2535 2536 if (cache_buffers_filled == 0) { 2537 free_fs_request(flush_req); 2538 return 0; 2539 } 2540 2541 flush_req->args.file = file; 2542 file->fs->send_request(__file_flush, flush_req); 2543 return 0; 2544 } 2545 2546 static void 2547 __readahead_done(void *ctx, int bserrno) 2548 { 2549 struct spdk_fs_request *req = ctx; 2550 struct spdk_fs_cb_args *args = &req->args; 2551 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2552 struct spdk_file *file = args->file; 2553 2554 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2555 2556 pthread_spin_lock(&file->lock); 2557 cache_buffer->bytes_filled = args->op.readahead.length; 2558 cache_buffer->bytes_flushed = args->op.readahead.length; 2559 cache_buffer->in_progress = false; 2560 pthread_spin_unlock(&file->lock); 2561 2562 free_fs_request(req); 2563 } 2564 2565 static void 2566 __readahead(void *ctx) 2567 { 2568 struct spdk_fs_request *req = ctx; 2569 struct spdk_fs_cb_args *args = &req->args; 2570 struct spdk_file *file = args->file; 2571 uint64_t offset, length, start_lba, num_lba; 2572 uint32_t lba_size; 2573 2574 offset = args->op.readahead.offset; 2575 length = args->op.readahead.length; 2576 assert(length > 0); 2577 2578 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2579 2580 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2581 offset, length, start_lba, num_lba); 2582 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2583 args->op.readahead.cache_buffer->buf, 2584 start_lba, num_lba, __readahead_done, req); 2585 } 2586 2587 static uint64_t 2588 __next_cache_buffer_offset(uint64_t offset) 2589 { 2590 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2591 } 2592 2593 static void 2594 check_readahead(struct spdk_file *file, uint64_t offset, 2595 struct spdk_fs_channel *channel) 2596 { 2597 struct spdk_fs_request *req; 2598 
struct spdk_fs_cb_args *args; 2599 2600 offset = __next_cache_buffer_offset(offset); 2601 if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2602 return; 2603 } 2604 2605 req = alloc_fs_request(channel); 2606 if (req == NULL) { 2607 return; 2608 } 2609 args = &req->args; 2610 2611 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2612 2613 args->file = file; 2614 args->op.readahead.offset = offset; 2615 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2616 if (!args->op.readahead.cache_buffer) { 2617 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2618 free_fs_request(req); 2619 return; 2620 } 2621 2622 args->op.readahead.cache_buffer->in_progress = true; 2623 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2624 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2625 } else { 2626 args->op.readahead.length = CACHE_BUFFER_SIZE; 2627 } 2628 file->fs->send_request(__readahead, req); 2629 } 2630 2631 int64_t 2632 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2633 void *payload, uint64_t offset, uint64_t length) 2634 { 2635 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2636 uint64_t final_offset, final_length; 2637 uint32_t sub_reads = 0; 2638 struct cache_buffer *buf; 2639 uint64_t read_len; 2640 struct rw_from_file_arg arg = {}; 2641 2642 pthread_spin_lock(&file->lock); 2643 2644 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2645 2646 file->open_for_writing = false; 2647 2648 if (length == 0 || offset >= file->append_pos) { 2649 pthread_spin_unlock(&file->lock); 2650 return 0; 2651 } 2652 2653 if (offset + length > file->append_pos) { 2654 length = file->append_pos - offset; 2655 } 2656 2657 if (offset != file->next_seq_offset) { 2658 file->seq_byte_count = 0; 2659 } 2660 file->seq_byte_count += length; 2661 file->next_seq_offset = offset + length; 2662 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2663 
check_readahead(file, offset, channel); 2664 check_readahead(file, offset + CACHE_BUFFER_SIZE, channel); 2665 } 2666 2667 arg.channel = channel; 2668 arg.rwerrno = 0; 2669 final_length = 0; 2670 final_offset = offset + length; 2671 while (offset < final_offset) { 2672 int ret = 0; 2673 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2674 if (length > (final_offset - offset)) { 2675 length = final_offset - offset; 2676 } 2677 2678 buf = tree_find_filled_buffer(file->tree, offset); 2679 if (buf == NULL) { 2680 pthread_spin_unlock(&file->lock); 2681 ret = __send_rw_from_file(file, payload, offset, length, true, &arg); 2682 pthread_spin_lock(&file->lock); 2683 if (ret == 0) { 2684 sub_reads++; 2685 } 2686 } else { 2687 read_len = length; 2688 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2689 read_len = buf->offset + buf->bytes_filled - offset; 2690 } 2691 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len); 2692 memcpy(payload, &buf->buf[offset - buf->offset], read_len); 2693 if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) { 2694 tree_remove_buffer(file->tree, buf); 2695 if (file->tree->present_mask == 0) { 2696 spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file); 2697 } 2698 } 2699 } 2700 2701 if (ret == 0) { 2702 final_length += length; 2703 } else { 2704 arg.rwerrno = ret; 2705 break; 2706 } 2707 payload += length; 2708 offset += length; 2709 } 2710 pthread_spin_unlock(&file->lock); 2711 while (sub_reads > 0) { 2712 sem_wait(&channel->sem); 2713 sub_reads--; 2714 } 2715 if (arg.rwerrno == 0) { 2716 return final_length; 2717 } else { 2718 return arg.rwerrno; 2719 } 2720 } 2721 2722 static void 2723 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2724 spdk_file_op_complete cb_fn, void *cb_arg) 2725 { 2726 struct spdk_fs_request *sync_req; 2727 struct spdk_fs_request *flush_req; 2728 struct spdk_fs_cb_args *sync_args; 2729 struct spdk_fs_cb_args *flush_args; 2730 2731 
BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2732 2733 pthread_spin_lock(&file->lock); 2734 if (file->append_pos <= file->length_xattr) { 2735 BLOBFS_TRACE(file, "done - file already synced\n"); 2736 pthread_spin_unlock(&file->lock); 2737 cb_fn(cb_arg, 0); 2738 return; 2739 } 2740 2741 sync_req = alloc_fs_request(channel); 2742 if (!sync_req) { 2743 SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name); 2744 pthread_spin_unlock(&file->lock); 2745 cb_fn(cb_arg, -ENOMEM); 2746 return; 2747 } 2748 sync_args = &sync_req->args; 2749 2750 flush_req = alloc_fs_request(channel); 2751 if (!flush_req) { 2752 SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name); 2753 free_fs_request(sync_req); 2754 pthread_spin_unlock(&file->lock); 2755 cb_fn(cb_arg, -ENOMEM); 2756 return; 2757 } 2758 flush_args = &flush_req->args; 2759 2760 sync_args->file = file; 2761 sync_args->fn.file_op = cb_fn; 2762 sync_args->arg = cb_arg; 2763 sync_args->op.sync.offset = file->append_pos; 2764 sync_args->op.sync.xattr_in_progress = false; 2765 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2766 pthread_spin_unlock(&file->lock); 2767 2768 flush_args->file = file; 2769 channel->send_request(__file_flush, flush_req); 2770 } 2771 2772 int 2773 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2774 { 2775 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2776 struct spdk_fs_cb_args args = {}; 2777 2778 args.sem = &channel->sem; 2779 _file_sync(file, channel, __wake_caller, &args); 2780 sem_wait(&channel->sem); 2781 2782 return args.rc; 2783 } 2784 2785 void 2786 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2787 spdk_file_op_complete cb_fn, void *cb_arg) 2788 { 2789 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2790 2791 _file_sync(file, channel, cb_fn, cb_arg); 2792 } 2793 2794 void 2795 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2796 
{ 2797 BLOBFS_TRACE(file, "priority=%u\n", priority); 2798 file->priority = priority; 2799 2800 } 2801 2802 /* 2803 * Close routines 2804 */ 2805 2806 static void 2807 __file_close_async_done(void *ctx, int bserrno) 2808 { 2809 struct spdk_fs_request *req = ctx; 2810 struct spdk_fs_cb_args *args = &req->args; 2811 struct spdk_file *file = args->file; 2812 2813 spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->name); 2814 2815 if (file->is_deleted) { 2816 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2817 return; 2818 } 2819 2820 args->fn.file_op(args->arg, bserrno); 2821 free_fs_request(req); 2822 } 2823 2824 static void 2825 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2826 { 2827 struct spdk_blob *blob; 2828 2829 pthread_spin_lock(&file->lock); 2830 if (file->ref_count == 0) { 2831 pthread_spin_unlock(&file->lock); 2832 __file_close_async_done(req, -EBADF); 2833 return; 2834 } 2835 2836 file->ref_count--; 2837 if (file->ref_count > 0) { 2838 pthread_spin_unlock(&file->lock); 2839 req->args.fn.file_op(req->args.arg, 0); 2840 free_fs_request(req); 2841 return; 2842 } 2843 2844 pthread_spin_unlock(&file->lock); 2845 2846 blob = file->blob; 2847 file->blob = NULL; 2848 spdk_blob_close(blob, __file_close_async_done, req); 2849 } 2850 2851 static void 2852 __file_close_async__sync_done(void *arg, int fserrno) 2853 { 2854 struct spdk_fs_request *req = arg; 2855 struct spdk_fs_cb_args *args = &req->args; 2856 2857 __file_close_async(args->file, req); 2858 } 2859 2860 void 2861 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2862 { 2863 struct spdk_fs_request *req; 2864 struct spdk_fs_cb_args *args; 2865 2866 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2867 if (req == NULL) { 2868 SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name); 2869 cb_fn(cb_arg, -ENOMEM); 2870 return; 2871 } 2872 2873 args = &req->args; 2874 args->file = file; 2875 
args->fn.file_op = cb_fn; 2876 args->arg = cb_arg; 2877 2878 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2879 } 2880 2881 static void 2882 __file_close(void *arg) 2883 { 2884 struct spdk_fs_request *req = arg; 2885 struct spdk_fs_cb_args *args = &req->args; 2886 struct spdk_file *file = args->file; 2887 2888 __file_close_async(file, req); 2889 } 2890 2891 int 2892 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2893 { 2894 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2895 struct spdk_fs_request *req; 2896 struct spdk_fs_cb_args *args; 2897 2898 req = alloc_fs_request(channel); 2899 if (req == NULL) { 2900 SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name); 2901 return -ENOMEM; 2902 } 2903 2904 args = &req->args; 2905 2906 spdk_file_sync(file, ctx); 2907 BLOBFS_TRACE(file, "name=%s\n", file->name); 2908 args->file = file; 2909 args->sem = &channel->sem; 2910 args->fn.file_op = __wake_caller; 2911 args->arg = args; 2912 channel->send_request(__file_close, req); 2913 sem_wait(&channel->sem); 2914 2915 return args->rc; 2916 } 2917 2918 int 2919 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2920 { 2921 if (size < sizeof(spdk_blob_id)) { 2922 return -EINVAL; 2923 } 2924 2925 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2926 2927 return sizeof(spdk_blob_id); 2928 } 2929 2930 static void 2931 _file_free(void *ctx) 2932 { 2933 struct spdk_file *file = ctx; 2934 2935 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2936 2937 free(file->name); 2938 free(file->tree); 2939 free(file); 2940 } 2941 2942 static void 2943 file_free(struct spdk_file *file) 2944 { 2945 BLOBFS_TRACE(file, "free=%s\n", file->name); 2946 pthread_spin_lock(&file->lock); 2947 if (file->tree->present_mask == 0) { 2948 pthread_spin_unlock(&file->lock); 2949 free(file->name); 2950 free(file->tree); 2951 free(file); 2952 return; 2953 } 2954 2955 tree_free_buffers(file->tree); 
2956 assert(file->tree->present_mask == 0); 2957 spdk_thread_send_msg(g_cache_pool_thread, _file_free, file); 2958 pthread_spin_unlock(&file->lock); 2959 } 2960 2961 SPDK_LOG_REGISTER_COMPONENT(blobfs) 2962 SPDK_LOG_REGISTER_COMPONENT(blobfs_rw) 2963