/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "spdk/conf.h"
#include "tree.h"

#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"
#include "spdk/trace.h"

#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)

#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

#define SPDK_BLOBFS_SIGNATURE "BLOBFS"

static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches;
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;

#define TRACE_GROUP_BLOBFS	0x7
#define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
#define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
#define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)

SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	spdk_trace_register_description("BLOBFS_XATTR_START",
					TRACE_BLOBFS_XATTR_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_XATTR_END",
					TRACE_BLOBFS_XATTR_END,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_OPEN",
					TRACE_BLOBFS_OPEN,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_CLOSE",
					TRACE_BLOBFS_CLOSE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_DELETE_START",
					TRACE_BLOBFS_DELETE_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_DELETE_DONE",
					TRACE_BLOBFS_DELETE_DONE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
}

void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)

struct spdk_file {
	struct spdk_filesystem *fs;
	struct spdk_blob *blob;
	char *name;
	uint64_t trace_arg_name;
	uint64_t length;
	bool is_deleted;
	bool open_for_writing;
	uint64_t length_flushed;
	uint64_t length_xattr;
	uint64_t append_pos;
	uint64_t seq_byte_count;
	uint64_t next_seq_offset;
	uint32_t priority;
	TAILQ_ENTRY(spdk_file) tailq;
	spdk_blob_id blobid;
	uint32_t ref_count;
	pthread_spinlock_t lock;
	struct cache_buffer *last;
	struct cache_tree *tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file) cache_tailq;
};

struct spdk_deleted_file {
	spdk_blob_id id;
	TAILQ_ENTRY(spdk_deleted_file) tailq;
};

struct spdk_filesystem {
	struct spdk_blob_store *bs;
	TAILQ_HEAD(, spdk_file) files;
	struct spdk_bs_opts bs_opts;
	struct spdk_bs_dev *bdev;
	fs_send_request_fn send_request;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *sync_io_channel;
		struct spdk_fs_channel *sync_fs_channel;
	} sync_target;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *md_io_channel;
		struct spdk_fs_channel *md_fs_channel;
	} md_target;

	struct {
		uint32_t max_ops;
	} io_target;
};

struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete fs_op_with_handle;
		spdk_fs_op_complete fs_op;
		spdk_file_op_with_handle_complete file_op_with_handle;
		spdk_file_op_complete file_op;
		spdk_file_stat_op_complete stat_op;
	} fn;
	void *arg;
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	struct iovec *iovs;
	uint32_t iovcnt;
	struct iovec iov;
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file) deleted_files;
		} fs_load;
		struct {
			uint64_t length;
		} truncate;
		struct {
			struct spdk_io_channel *channel;
			void *pin_buf;
			int is_read;
			off_t offset;
			size_t length;
			uint64_t start_lba;
			uint64_t num_lba;
			uint32_t blocklen;
		} rw;
		struct {
			const char *old_name;
			const char *new_name;
		} rename;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
		} flush;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
			uint64_t offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t offset;
			TAILQ_ENTRY(spdk_fs_request) tailq;
			bool xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t length;
		} sync;
		struct {
			uint32_t num_clusters;
		} resize;
		struct {
			const char *name;
			uint32_t flags;
			TAILQ_ENTRY(spdk_fs_request) tailq;
		} open;
		struct {
			const char *name;
			struct spdk_blob *blob;
		} create;
		struct {
			const char *name;
		} delete;
		struct {
			const char *name;
		} stat;
	} op;
};

static void file_free(struct spdk_file *file);
static void fs_io_device_unregister(struct spdk_filesystem *fs);
static void fs_free_io_channels(struct spdk_filesystem *fs);

void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}
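
/*
 * Illustrative usage sketch (not part of upstream blobfs): a caller typically
 * fills spdk_blobfs_opts with spdk_fs_opts_init() before creating a new
 * filesystem on a bs_dev.  init_done() and g_send_request() below are
 * hypothetical application code.
 *
 *	static void
 *	init_done(void *ctx, struct spdk_filesystem *fs, int fserrno)
 *	{
 *		// fs is ready for use when fserrno == 0
 *	}
 *
 *	struct spdk_blobfs_opts blobfs_opts;
 *
 *	spdk_fs_opts_init(&blobfs_opts);
 *	blobfs_opts.cluster_sz = 1024 * 1024;
 *	spdk_fs_init(bs_dev, &blobfs_opts, g_send_request, init_done, NULL);
 */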
static int _blobfs_cache_pool_reclaim(void *arg);

static bool
blobfs_cache_pool_need_reclaim(void)
{
	size_t count;

	count = spdk_mempool_count(g_cache_pool);
	/* We use an aggressive policy here because requests from db_bench arrive in
	 * batches: start the poller when the number of available cache buffers drops
	 * below 1/5 of the total.
	 */
	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
		return false;
	}

	return true;
}

static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		SPDK_ERRLOG("Failed to create the cache mempool; consider "
			    "increasing available memory and trying again\n");
		assert(false);
	}
	TAILQ_INIT(&g_caches);

	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}

static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}

static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}

static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}

static uint64_t
__file_get_blob_size(struct spdk_file *file)
{
	uint64_t cluster_sz;

	cluster_sz = file->fs->bs_opts.cluster_sz;
	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
}

struct spdk_fs_request {
	struct spdk_fs_cb_args args;
	TAILQ_ENTRY(spdk_fs_request) link;
	struct spdk_fs_channel *channel;
};

struct spdk_fs_channel {
	struct spdk_fs_request *req_mem;
	TAILQ_HEAD(, spdk_fs_request) reqs;
	sem_t sem;
	struct spdk_filesystem *fs;
	struct spdk_io_channel *bs_channel;
	fs_send_request_fn send_request;
	bool sync;
	uint32_t outstanding_reqs;
	pthread_spinlock_t lock;
};

/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over.
 */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel ch;
};

static struct spdk_fs_request *
alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
{
	struct spdk_fs_request *req;
	struct iovec *iovs = NULL;

	if (iovcnt > 1) {
		iovs = calloc(iovcnt, sizeof(struct iovec));
		if (!iovs) {
			return NULL;
		}
	}

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	req = TAILQ_FIRST(&channel->reqs);
	if (req) {
		channel->outstanding_reqs++;
		TAILQ_REMOVE(&channel->reqs, req, link);
	}

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}

	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel=%p\n", channel);
		free(iovs);
		return NULL;
	}
	memset(req, 0, sizeof(*req));
	req->channel = channel;
	if (iovcnt > 1) {
		req->args.iovs = iovs;
	} else {
		req->args.iovs = &req->args.iov;
	}
	req->args.iovcnt = iovcnt;

	return req;
}

static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}

static void
free_fs_request(struct spdk_fs_request *req)
{
	struct spdk_fs_channel *channel = req->channel;

	if (req->args.iovcnt > 1) {
		free(req->args.iovs);
	}

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
	channel->outstanding_reqs--;

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}
}

static int
fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
		  uint32_t max_ops)
{
	uint32_t i;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
	if (!channel->req_mem) {
		return -1;
	}

	channel->outstanding_reqs = 0;
	TAILQ_INIT(&channel->reqs);
	sem_init(&channel->sem, 0, 0);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->fs = fs;

	return 0;
}

static int
fs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);

	return fs_channel_create(fs, channel, fs->md_target.max_ops);
}

static int
fs_sync_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);

	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
}

static int
fs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);

	return fs_channel_create(fs, channel, fs->io_target.max_ops);
}

static void
fs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_fs_channel *channel = ctx_buf;

	if (channel->outstanding_reqs > 0) {
		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
			    channel->outstanding_reqs);
	}

	free(channel->req_mem);
	if (channel->bs_channel != NULL) {
		spdk_bs_free_io_channel(channel->bs_channel);
	}
}
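
/*
 * Illustrative sketch (not upstream code): the md and sync channels dispatch
 * requests with __send_request_direct() below, while applications pass their own
 * fs_send_request_fn to spdk_fs_init()/spdk_fs_load() that forwards the request
 * to the SPDK thread owning the filesystem.  g_fs_thread and the wrapper names
 * are hypothetical.
 *
 *	struct send_request_ctx { fs_request_fn fn; void *arg; };
 *
 *	static void
 *	send_request_msg(void *ctx)
 *	{
 *		struct send_request_ctx *sctx = ctx;
 *
 *		sctx->fn(sctx->arg);
 *		free(sctx);
 *	}
 *
 *	static void
 *	app_send_request(fs_request_fn fn, void *arg)
 *	{
 *		struct send_request_ctx *sctx = calloc(1, sizeof(*sctx));
 *
 *		sctx->fn = fn;
 *		sctx->arg = arg;
 *		spdk_thread_send_msg(g_fs_thread, send_request_msg, sctx);
 *	}
 */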
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}

static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	initialize_global_cache();
}

static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}

static void
fs_conf_parse(void)
{
	struct spdk_conf_section *sp;

	sp = spdk_conf_find_section(NULL, "Blobfs");
	if (sp == NULL) {
		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
		return;
	}

	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
	if (g_fs_cache_buffer_shift <= 0) {
		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
	}
}

static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_md");
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_sync");
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_io");

	return fs;
}

static void
__wake_caller(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	args->rc = fserrno;
	sem_post(args->sem);
}

void
spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
	     fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_bs_opts opts = {};

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	fs_conf_parse();

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_opts_init(&opts);
	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
	if (opt) {
		opts.cluster_sz = opt->cluster_sz;
	}
	spdk_bs_init(dev, &opts, init_cb, req);
}

static struct spdk_file *
file_alloc(struct spdk_filesystem *fs)
{
	struct spdk_file *file;

	file = calloc(1, sizeof(*file));
	if (file == NULL) {
		return NULL;
	}

	file->tree = calloc(1, sizeof(*file->tree));
	if (file->tree == NULL) {
		free(file);
		return NULL;
	}

	file->fs = fs;
	TAILQ_INIT(&file->open_requests);
	TAILQ_INIT(&file->sync_requests);
	pthread_spin_init(&file->lock, 0);
	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
	file->priority = SPDK_FILE_PRIORITY_LOW;
	return file;
}

static void fs_load_done(void *ctx, int bserrno);

static int
_handle_deleted_files(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
		struct spdk_deleted_file *deleted_file;

		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
		free(deleted_file);
		return 0;
	}

	return 1;
}

static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded.  Now check if there are any files that
	 * were marked for deletion before the last unload.  Do not complete the
	 * fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that's been marked for deletion but not actually
		 * deleted yet.  This function will get called again once the delete
		 * operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);
}

static void
_file_build_trace_arg_name(struct spdk_file *f)
{
	f->trace_arg_name = 0;
	memcpy(&f->trace_arg_name, f->name,
	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
}

static void
iter_cb(void *ctx, struct spdk_blob *blob, int rc)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	uint64_t *length;
	const char *name;
	uint32_t *is_deleted;
	size_t value_len;

	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	assert(value_len == 8);

	/* The file may have been deleted without being closed (e.g. the app crashed),
	 * in which case the "is_deleted" xattr is present; finish deleting it now.
	 */
	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
	if (rc < 0) {
		struct spdk_file *f;

		f = file_alloc(fs);
		if (f == NULL) {
			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}

		f->name = strdup(name);
		_file_build_trace_arg_name(f);
		f->blobid = spdk_blob_get_id(blob);
		f->length = *length;
		f->length_flushed = *length;
		f->length_xattr = *length;
		f->append_pos = *length;
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
	} else {
		struct spdk_deleted_file *deleted_file;

		deleted_file = calloc(1, sizeof(*deleted_file));
		if (deleted_file == NULL) {
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}
		deleted_file->id = spdk_blob_get_id(blob);
		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
	}
}

static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(SPDK_LOG_BLOBFS, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}

static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}

static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}

void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;
	struct spdk_bs_opts bs_opts;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	fs_conf_parse();

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;
	TAILQ_INIT(&args->op.fs_load.deleted_files);
	spdk_bs_opts_init(&bs_opts);
	bs_opts.iter_cb_fn = iter_cb;
	bs_opts.iter_cb_arg = req;
	spdk_bs_load(dev, &bs_opts, load_cb, req);
}

static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *file, *tmp;

	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
	}

	free_global_cache();

	args->fn.fs_op(args->arg, bserrno);
	free(req);

	fs_io_device_unregister(fs);
}

void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 *  allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	fs_free_io_channels(fs);
	spdk_bs_unload(fs->bs, unload_cb, req);
}
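
/*
 * Illustrative usage sketch (not upstream code): loading an existing blobfs and
 * unloading it once the application is done.  load_done(), unload_done() and
 * g_send_request() are hypothetical application callbacks.
 *
 *	static void
 *	unload_done(void *ctx, int fserrno)
 *	{
 *		// all io channels must have been released before spdk_fs_unload()
 *	}
 *
 *	static void
 *	load_done(void *ctx, struct spdk_filesystem *fs, int fserrno)
 *	{
 *		if (fserrno == 0) {
 *			// use fs ... and later:
 *			spdk_fs_unload(fs, unload_done, NULL);
 *		}
 *	}
 *
 *	spdk_fs_load(bs_dev, g_send_request, load_done, NULL);
 */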
static struct spdk_file *
fs_find_file(struct spdk_filesystem *fs, const char *name)
{
	struct spdk_file *file;

	TAILQ_FOREACH(file, &fs->files, tailq) {
		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
			return file;
		}
	}

	return NULL;
}

void
spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
			spdk_file_stat_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file_stat stat;
	struct spdk_file *f = NULL;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f != NULL) {
		stat.blobid = f->blobid;
		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
		cb_fn(cb_arg, &stat, 0);
		return;
	}

	cb_fn(cb_arg, NULL, -ENOENT);
}

static void
__copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	if (fserrno == 0) {
		memcpy(args->arg, stat, sizeof(*stat));
	}
	sem_post(args->sem);
}

static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}

int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
		return -ENOMEM;
	}

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}

static void
fs_create_blob_close_cb(void *ctx, int bserrno)
{
	int rc;
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	rc = args->rc ? args->rc : bserrno;
	args->fn.file_op(args->arg, rc);
	free_fs_request(req);
}

static void
fs_create_blob_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;
	struct spdk_blob *blob = args->op.create.blob;
	uint64_t length = 0;

	args->rc = bserrno;
	if (bserrno) {
		spdk_blob_close(blob, fs_create_blob_close_cb, args);
		return;
	}

	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));

	spdk_blob_close(blob, fs_create_blob_close_cb, args);
}

static void
fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	args->op.create.blob = blob;
	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
}

static void
fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	f->blobid = blobid;
	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
}

void
spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *file;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	file = fs_find_file(fs, name);
	if (file != NULL) {
		cb_fn(cb_arg, -EEXIST);
		return;
	}

	file = file_alloc(fs);
	if (file == NULL) {
		SPDK_ERRLOG("Cannot allocate new file for creation\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	file->name = strdup(name);
	_file_build_trace_arg_name(file);
	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
}

static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
}

static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}

int
spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.create.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}

static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 *  is now created so look it up in the file list for this
		 *  filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 *  open request.  When that open completes, it will invoke the
		 *  open callback for this request.
		 */
	}
}

void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	__wake_caller(args, bserrno);
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
}

static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

int
spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, uint32_t flags, struct spdk_file **file)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.open.name = name;
	args->op.open.flags = flags;
	args->sem = &channel->sem;
	fs->send_request(__fs_open_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	if (rc == 0) {
		*file = args->file;
	} else {
		*file = NULL;
	}
	free_fs_request(req);

	return rc;
}
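
/*
 * Illustrative usage sketch (not upstream code): the synchronous wrappers are
 * called from a non-reactor thread with an spdk_fs_thread_ctx.  Passing
 * SPDK_BLOBFS_OPEN_CREATE creates the file if it does not exist yet.
 *
 *	struct spdk_fs_thread_ctx *ctx = spdk_fs_alloc_thread_ctx(fs);
 *	struct spdk_file *file;
 *	int rc;
 *
 *	rc = spdk_fs_open_file(fs, ctx, "testfile", SPDK_BLOBFS_OPEN_CREATE, &file);
 *	if (rc == 0) {
 *		rc = spdk_file_close(file, ctx);
 *	}
 *	spdk_fs_free_thread_ctx(ctx);
 */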
static void
fs_rename_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.fs_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	const char *new_name = args->op.rename.new_name;

	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
}

static void
_fs_md_rename_file(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f;

	f = fs_find_file(args->fs, args->op.rename.old_name);
	if (f == NULL) {
		args->fn.fs_op(args->arg, -ENOENT);
		free_fs_request(req);
		return;
	}

	free(f->name);
	f->name = strdup(args->op.rename.new_name);
	_file_build_trace_arg_name(f);
	args->file = f;
	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
}

static void
fs_rename_delete_done(void *arg, int fserrno)
{
	_fs_md_rename_file(arg);
}

void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
			    new_name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file.  So delete the existing file, then
	 *  do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
}

static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
		return -ENOMEM;
	}

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL) {
		SPDK_ERRLOG("Cannot find the file=%s to be deleted\n", name);
		cb_fn(cb_arg, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate the req to delete file=%s\n", name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	if (f->ref_count > 0) {
		/* If the ref count is > 0, mark the file as deleted and delete it when it is closed. */
		f->is_deleted = true;
		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
		return;
	}

	blobid = f->blobid;
	TAILQ_REMOVE(&fs->files, f, tailq);

	file_free(f);

	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
}

static uint64_t
fs_name_to_uint64(const char *name)
{
	uint64_t result = 0;
	memcpy(&result, name, spdk_min(sizeof(result), strlen(name)));
	return result;
}

static void
__fs_delete_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
	__wake_caller(args, fserrno);
}

static void
__fs_delete_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
}

int
spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.delete.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_delete_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

spdk_fs_iter
spdk_fs_iter_first(struct spdk_filesystem *fs)
{
	struct spdk_file *f;

	f = TAILQ_FIRST(&fs->files);
	return f;
}

spdk_fs_iter
spdk_fs_iter_next(spdk_fs_iter iter)
{
	struct spdk_file *f = iter;

	if (f == NULL) {
		return NULL;
	}

	f = TAILQ_NEXT(f, tailq);
	return f;
}

const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}

uint64_t
spdk_file_get_length(struct spdk_file *file)
{
	uint64_t length;

	assert(file != NULL);

	length = file->append_pos >= file->length ? file->append_pos : file->length;
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
	return length;
}
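
/*
 * Illustrative usage sketch (not upstream code): walking the in-memory file list
 * with the iterator API and the accessors above.
 *
 *	spdk_fs_iter iter;
 *	struct spdk_file *file;
 *
 *	iter = spdk_fs_iter_first(fs);
 *	while (iter != NULL) {
 *		file = spdk_fs_iter_get_file(iter);
 *		printf("%s: %ju bytes\n", spdk_file_get_name(file),
 *		       spdk_file_get_length(file));
 *		iter = spdk_fs_iter_next(iter);
 *	}
 */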
static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}

static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.truncate.length = length;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
}

static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}

int
spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return -ENOMEM;
	}

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __wake_caller;
	args->sem = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	assert(req != NULL);
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		__rw_done(req, 0);
	} else {
		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}

static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (fserrno) {
		__rw_done(req, fserrno);
		return;
	}
	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
			  args->op.rw.pin_buf,
			  args->op.rw.start_lba, args->op.rw.num_lba,
			  __read_done, req);
}

static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
{
	uint64_t end_lba;

	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
	*start_lba = offset / *lba_size;
	end_lba = (offset + length - 1) / *lba_size;
	*num_lba = (end_lba - *start_lba + 1);
}

static bool
__is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
{
	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);

	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
		return true;
	}

	return false;
}

static void
_fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
{
	uint32_t i;

	for (i = 0; i < iovcnt; i++) {
		req->args.iovs[i].iov_base = iovs[i].iov_base;
		req->args.iovs[i].iov_len = iovs[i].iov_len;
	}
}

static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}

static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct iovec iov;

	iov.iov_base = payload;
	iov.iov_len = (size_t)length;

	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
}

void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}

struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}
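
/*
 * Illustrative usage sketch (not upstream code): the asynchronous read/write
 * paths above take an io channel allocated from the io_target.  read_done() is
 * a hypothetical application callback.
 *
 *	static void
 *	read_done(void *ctx, int fserrno)
 *	{
 *		// the buffer contains the data when fserrno == 0
 *	}
 *
 *	struct spdk_io_channel *channel = spdk_fs_alloc_io_channel(fs);
 *
 *	spdk_file_read_async(file, channel, buf, 0, 4096, read_done, NULL);
 *	// ... once all outstanding I/O has completed:
 *	spdk_fs_free_io_channel(channel);
 */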
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;
	pthread_spin_init(&ctx->ch.lock, 0);

	return ctx;
}

void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}

int
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	/* setting g_fs_cache_size is only permitted if the cache pool
	 * is already freed or hasn't been initialized
	 */
	if (g_cache_pool != NULL) {
		return -EPERM;
	}

	g_fs_cache_size = size_in_mb * 1024 * 1024;

	return 0;
}

uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}
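
/*
 * Illustrative usage sketch (not upstream code): the cache size can only be
 * changed before the shared cache pool is created, i.e. before the first
 * filesystem is initialized or loaded (or after the last one is unloaded).
 *
 *	if (spdk_fs_set_cache_size(512) != 0) {
 *		// a filesystem is still using the cache pool; 512 MiB not applied
 *	}
 *	assert(spdk_fs_get_cache_size() == 512);
 */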
2060 */ 2061 rc = pthread_spin_trylock(&file->lock); 2062 if (rc != 0) { 2063 return -1; 2064 } 2065 2066 if (file->tree->present_mask == 0) { 2067 pthread_spin_unlock(&file->lock); 2068 return -1; 2069 } 2070 tree_free_buffers(file->tree); 2071 2072 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2073 /* If not freed, put it in the end of the queue */ 2074 if (file->tree->present_mask != 0) { 2075 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2076 } else { 2077 file->last = NULL; 2078 } 2079 pthread_spin_unlock(&file->lock); 2080 2081 return 0; 2082 } 2083 2084 static int 2085 _blobfs_cache_pool_reclaim(void *arg) 2086 { 2087 struct spdk_file *file, *tmp; 2088 int rc; 2089 2090 if (!blobfs_cache_pool_need_reclaim()) { 2091 return 0; 2092 } 2093 2094 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2095 if (!file->open_for_writing && 2096 file->priority == SPDK_FILE_PRIORITY_LOW) { 2097 rc = reclaim_cache_buffers(file); 2098 if (rc < 0) { 2099 continue; 2100 } 2101 if (!blobfs_cache_pool_need_reclaim()) { 2102 return 1; 2103 } 2104 break; 2105 } 2106 } 2107 2108 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2109 if (!file->open_for_writing) { 2110 rc = reclaim_cache_buffers(file); 2111 if (rc < 0) { 2112 continue; 2113 } 2114 if (!blobfs_cache_pool_need_reclaim()) { 2115 return 1; 2116 } 2117 break; 2118 } 2119 } 2120 2121 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2122 rc = reclaim_cache_buffers(file); 2123 if (rc < 0) { 2124 continue; 2125 } 2126 break; 2127 } 2128 2129 return 1; 2130 } 2131 2132 static void 2133 _add_file_to_cache_pool(void *ctx) 2134 { 2135 struct spdk_file *file = ctx; 2136 2137 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2138 } 2139 2140 static void 2141 _remove_file_from_cache_pool(void *ctx) 2142 { 2143 struct spdk_file *file = ctx; 2144 2145 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2146 } 2147 2148 static struct cache_buffer * 2149 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 2150 { 2151 struct cache_buffer *buf; 2152 int count = 0; 2153 bool need_update = false; 2154 2155 buf = calloc(1, sizeof(*buf)); 2156 if (buf == NULL) { 2157 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 2158 return NULL; 2159 } 2160 2161 do { 2162 buf->buf = spdk_mempool_get(g_cache_pool); 2163 if (buf->buf) { 2164 break; 2165 } 2166 if (count++ == 100) { 2167 SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", 2168 file, offset); 2169 free(buf); 2170 return NULL; 2171 } 2172 usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); 2173 } while (true); 2174 2175 buf->buf_size = CACHE_BUFFER_SIZE; 2176 buf->offset = offset; 2177 2178 if (file->tree->present_mask == 0) { 2179 need_update = true; 2180 } 2181 file->tree = tree_insert_buffer(file->tree, buf); 2182 2183 if (need_update) { 2184 spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file); 2185 } 2186 2187 return buf; 2188 } 2189 2190 static struct cache_buffer * 2191 cache_append_buffer(struct spdk_file *file) 2192 { 2193 struct cache_buffer *last; 2194 2195 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 2196 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 2197 2198 last = cache_insert_buffer(file, file->append_pos); 2199 if (last == NULL) { 2200 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 2201 return NULL; 2202 } 2203 2204 file->last = last; 2205 2206 return last; 2207 } 2208 2209 static void __check_sync_reqs(struct spdk_file *file); 2210 2211 static void 2212 __file_cache_finish_sync(void 
*ctx, int bserrno) 2213 { 2214 struct spdk_file *file; 2215 struct spdk_fs_request *sync_req = ctx; 2216 struct spdk_fs_cb_args *sync_args; 2217 2218 sync_args = &sync_req->args; 2219 file = sync_args->file; 2220 pthread_spin_lock(&file->lock); 2221 file->length_xattr = sync_args->op.sync.length; 2222 assert(sync_args->op.sync.offset <= file->length_flushed); 2223 spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset, 2224 0, file->trace_arg_name); 2225 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 2226 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 2227 pthread_spin_unlock(&file->lock); 2228 2229 sync_args->fn.file_op(sync_args->arg, bserrno); 2230 2231 free_fs_request(sync_req); 2232 __check_sync_reqs(file); 2233 } 2234 2235 static void 2236 __check_sync_reqs(struct spdk_file *file) 2237 { 2238 struct spdk_fs_request *sync_req; 2239 2240 pthread_spin_lock(&file->lock); 2241 2242 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 2243 if (sync_req->args.op.sync.offset <= file->length_flushed) { 2244 break; 2245 } 2246 } 2247 2248 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 2249 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 2250 sync_req->args.op.sync.xattr_in_progress = true; 2251 sync_req->args.op.sync.length = file->length_flushed; 2252 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 2253 sizeof(file->length_flushed)); 2254 2255 pthread_spin_unlock(&file->lock); 2256 spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed, 2257 0, file->trace_arg_name); 2258 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req); 2259 } else { 2260 pthread_spin_unlock(&file->lock); 2261 } 2262 } 2263 2264 static void 2265 __file_flush_done(void *ctx, int bserrno) 2266 { 2267 struct spdk_fs_request *req = ctx; 2268 struct spdk_fs_cb_args *args = &req->args; 2269 struct spdk_file *file = args->file; 2270 struct cache_buffer *next = args->op.flush.cache_buffer; 2271 2272 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 2273 2274 pthread_spin_lock(&file->lock); 2275 next->in_progress = false; 2276 next->bytes_flushed += args->op.flush.length; 2277 file->length_flushed += args->op.flush.length; 2278 if (file->length_flushed > file->length) { 2279 file->length = file->length_flushed; 2280 } 2281 if (next->bytes_flushed == next->buf_size) { 2282 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 2283 next = tree_find_buffer(file->tree, file->length_flushed); 2284 } 2285 2286 /* 2287 * Assert that there is no cached data that extends past the end of the underlying 2288 * blob. 
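 * (The one allowed exception is a buffer that has been inserted into the tree but not yet filled with any data.)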
2289 */ 2290 assert(next == NULL || next->offset < __file_get_blob_size(file) || 2291 next->bytes_filled == 0); 2292 2293 pthread_spin_unlock(&file->lock); 2294 2295 __check_sync_reqs(file); 2296 2297 __file_flush(req); 2298 } 2299 2300 static void 2301 __file_flush(void *ctx) 2302 { 2303 struct spdk_fs_request *req = ctx; 2304 struct spdk_fs_cb_args *args = &req->args; 2305 struct spdk_file *file = args->file; 2306 struct cache_buffer *next; 2307 uint64_t offset, length, start_lba, num_lba; 2308 uint32_t lba_size; 2309 2310 pthread_spin_lock(&file->lock); 2311 next = tree_find_buffer(file->tree, file->length_flushed); 2312 if (next == NULL || next->in_progress || 2313 ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) { 2314 /* 2315 * There is either no data to flush, a flush I/O is already in 2316 * progress, or the next buffer is partially filled but there's no 2317 * outstanding request to sync it. 2318 * So return immediately - if a flush I/O is in progress we will flush 2319 * more data after that is completed, or a partial buffer will get flushed 2320 * when it is either filled or the file is synced. 2321 */ 2322 free_fs_request(req); 2323 if (next == NULL) { 2324 /* 2325 * For cases where a file's cache was evicted, and then the 2326 * file was later appended, we will write the data directly 2327 * to disk and bypass cache. So just update length_flushed 2328 * here to reflect that all data was already written to disk. 2329 */ 2330 file->length_flushed = file->append_pos; 2331 } 2332 pthread_spin_unlock(&file->lock); 2333 if (next == NULL) { 2334 /* 2335 * There is no data to flush, but we still need to check for any 2336 * outstanding sync requests to make sure metadata gets updated. 2337 */ 2338 __check_sync_reqs(file); 2339 } 2340 return; 2341 } 2342 2343 offset = next->offset + next->bytes_flushed; 2344 length = next->bytes_filled - next->bytes_flushed; 2345 if (length == 0) { 2346 free_fs_request(req); 2347 pthread_spin_unlock(&file->lock); 2348 /* 2349 * There is no data to flush, but we still need to check for any 2350 * outstanding sync requests to make sure metadata gets updated. 
2351 */ 2352 __check_sync_reqs(file); 2353 return; 2354 } 2355 args->op.flush.length = length; 2356 args->op.flush.cache_buffer = next; 2357 2358 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2359 2360 next->in_progress = true; 2361 BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n", 2362 offset, length, start_lba, num_lba); 2363 pthread_spin_unlock(&file->lock); 2364 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2365 next->buf + (start_lba * lba_size) - next->offset, 2366 start_lba, num_lba, __file_flush_done, req); 2367 } 2368 2369 static void 2370 __file_extend_done(void *arg, int bserrno) 2371 { 2372 struct spdk_fs_cb_args *args = arg; 2373 2374 __wake_caller(args, bserrno); 2375 } 2376 2377 static void 2378 __file_extend_resize_cb(void *_args, int bserrno) 2379 { 2380 struct spdk_fs_cb_args *args = _args; 2381 struct spdk_file *file = args->file; 2382 2383 if (bserrno) { 2384 __wake_caller(args, bserrno); 2385 return; 2386 } 2387 2388 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2389 } 2390 2391 static void 2392 __file_extend_blob(void *_args) 2393 { 2394 struct spdk_fs_cb_args *args = _args; 2395 struct spdk_file *file = args->file; 2396 2397 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2398 } 2399 2400 static void 2401 __rw_from_file_done(void *ctx, int bserrno) 2402 { 2403 struct spdk_fs_request *req = ctx; 2404 2405 __wake_caller(&req->args, bserrno); 2406 free_fs_request(req); 2407 } 2408 2409 static void 2410 __rw_from_file(void *ctx) 2411 { 2412 struct spdk_fs_request *req = ctx; 2413 struct spdk_fs_cb_args *args = &req->args; 2414 struct spdk_file *file = args->file; 2415 2416 if (args->op.rw.is_read) { 2417 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2418 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2419 __rw_from_file_done, req); 2420 } else { 2421 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2422 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2423 __rw_from_file_done, req); 2424 } 2425 } 2426 2427 static int 2428 __send_rw_from_file(struct spdk_file *file, void *payload, 2429 uint64_t offset, uint64_t length, bool is_read, 2430 struct spdk_fs_channel *channel) 2431 { 2432 struct spdk_fs_request *req; 2433 struct spdk_fs_cb_args *args; 2434 2435 req = alloc_fs_request_with_iov(channel, 1); 2436 if (req == NULL) { 2437 sem_post(&channel->sem); 2438 return -ENOMEM; 2439 } 2440 2441 args = &req->args; 2442 args->file = file; 2443 args->sem = &channel->sem; 2444 args->iovs[0].iov_base = payload; 2445 args->iovs[0].iov_len = (size_t)length; 2446 args->op.rw.offset = offset; 2447 args->op.rw.is_read = is_read; 2448 file->fs->send_request(__rw_from_file, req); 2449 return 0; 2450 } 2451 2452 int 2453 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2454 void *payload, uint64_t offset, uint64_t length) 2455 { 2456 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2457 struct spdk_fs_request *flush_req; 2458 uint64_t rem_length, copy, blob_size, cluster_sz; 2459 uint32_t cache_buffers_filled = 0; 2460 uint8_t *cur_payload; 2461 struct cache_buffer *last; 2462 2463 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2464 2465 if (length == 0) { 2466 return 0; 2467 } 2468 2469 if (offset != file->append_pos) { 2470 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, 
file->append_pos); 2471 return -EINVAL; 2472 } 2473 2474 pthread_spin_lock(&file->lock); 2475 file->open_for_writing = true; 2476 2477 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2478 cache_append_buffer(file); 2479 } 2480 2481 if (file->last == NULL) { 2482 int rc; 2483 2484 file->append_pos += length; 2485 pthread_spin_unlock(&file->lock); 2486 rc = __send_rw_from_file(file, payload, offset, length, false, channel); 2487 sem_wait(&channel->sem); 2488 return rc; 2489 } 2490 2491 blob_size = __file_get_blob_size(file); 2492 2493 if ((offset + length) > blob_size) { 2494 struct spdk_fs_cb_args extend_args = {}; 2495 2496 cluster_sz = file->fs->bs_opts.cluster_sz; 2497 extend_args.sem = &channel->sem; 2498 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2499 extend_args.file = file; 2500 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2501 pthread_spin_unlock(&file->lock); 2502 file->fs->send_request(__file_extend_blob, &extend_args); 2503 sem_wait(&channel->sem); 2504 if (extend_args.rc) { 2505 return extend_args.rc; 2506 } 2507 } 2508 2509 flush_req = alloc_fs_request(channel); 2510 if (flush_req == NULL) { 2511 pthread_spin_unlock(&file->lock); 2512 return -ENOMEM; 2513 } 2514 2515 last = file->last; 2516 rem_length = length; 2517 cur_payload = payload; 2518 while (rem_length > 0) { 2519 copy = last->buf_size - last->bytes_filled; 2520 if (copy > rem_length) { 2521 copy = rem_length; 2522 } 2523 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2524 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2525 file->append_pos += copy; 2526 if (file->length < file->append_pos) { 2527 file->length = file->append_pos; 2528 } 2529 cur_payload += copy; 2530 last->bytes_filled += copy; 2531 rem_length -= copy; 2532 if (last->bytes_filled == last->buf_size) { 2533 cache_buffers_filled++; 2534 last = cache_append_buffer(file); 2535 if (last == NULL) { 2536 BLOBFS_TRACE(file, "nomem\n"); 2537 free_fs_request(flush_req); 2538 pthread_spin_unlock(&file->lock); 2539 return -ENOMEM; 2540 } 2541 } 2542 } 2543 2544 pthread_spin_unlock(&file->lock); 2545 2546 if (cache_buffers_filled == 0) { 2547 free_fs_request(flush_req); 2548 return 0; 2549 } 2550 2551 flush_req->args.file = file; 2552 file->fs->send_request(__file_flush, flush_req); 2553 return 0; 2554 } 2555 2556 static void 2557 __readahead_done(void *ctx, int bserrno) 2558 { 2559 struct spdk_fs_request *req = ctx; 2560 struct spdk_fs_cb_args *args = &req->args; 2561 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2562 struct spdk_file *file = args->file; 2563 2564 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2565 2566 pthread_spin_lock(&file->lock); 2567 cache_buffer->bytes_filled = args->op.readahead.length; 2568 cache_buffer->bytes_flushed = args->op.readahead.length; 2569 cache_buffer->in_progress = false; 2570 pthread_spin_unlock(&file->lock); 2571 2572 free_fs_request(req); 2573 } 2574 2575 static void 2576 __readahead(void *ctx) 2577 { 2578 struct spdk_fs_request *req = ctx; 2579 struct spdk_fs_cb_args *args = &req->args; 2580 struct spdk_file *file = args->file; 2581 uint64_t offset, length, start_lba, num_lba; 2582 uint32_t lba_size; 2583 2584 offset = args->op.readahead.offset; 2585 length = args->op.readahead.length; 2586 assert(length > 0); 2587 2588 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2589 2590 BLOBFS_TRACE(file, 
"offset=%jx length=%jx page start=%jx num=%jx\n", 2591 offset, length, start_lba, num_lba); 2592 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2593 args->op.readahead.cache_buffer->buf, 2594 start_lba, num_lba, __readahead_done, req); 2595 } 2596 2597 static uint64_t 2598 __next_cache_buffer_offset(uint64_t offset) 2599 { 2600 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2601 } 2602 2603 static void 2604 check_readahead(struct spdk_file *file, uint64_t offset, 2605 struct spdk_fs_channel *channel) 2606 { 2607 struct spdk_fs_request *req; 2608 struct spdk_fs_cb_args *args; 2609 2610 offset = __next_cache_buffer_offset(offset); 2611 if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2612 return; 2613 } 2614 2615 req = alloc_fs_request(channel); 2616 if (req == NULL) { 2617 return; 2618 } 2619 args = &req->args; 2620 2621 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2622 2623 args->file = file; 2624 args->op.readahead.offset = offset; 2625 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2626 if (!args->op.readahead.cache_buffer) { 2627 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2628 free_fs_request(req); 2629 return; 2630 } 2631 2632 args->op.readahead.cache_buffer->in_progress = true; 2633 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2634 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2635 } else { 2636 args->op.readahead.length = CACHE_BUFFER_SIZE; 2637 } 2638 file->fs->send_request(__readahead, req); 2639 } 2640 2641 int64_t 2642 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2643 void *payload, uint64_t offset, uint64_t length) 2644 { 2645 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2646 uint64_t final_offset, final_length; 2647 uint32_t sub_reads = 0; 2648 struct cache_buffer *buf; 2649 uint64_t read_len; 2650 int rc = 0; 2651 2652 pthread_spin_lock(&file->lock); 2653 2654 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2655 2656 file->open_for_writing = false; 2657 2658 if (length == 0 || offset >= file->append_pos) { 2659 pthread_spin_unlock(&file->lock); 2660 return 0; 2661 } 2662 2663 if (offset + length > file->append_pos) { 2664 length = file->append_pos - offset; 2665 } 2666 2667 if (offset != file->next_seq_offset) { 2668 file->seq_byte_count = 0; 2669 } 2670 file->seq_byte_count += length; 2671 file->next_seq_offset = offset + length; 2672 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2673 check_readahead(file, offset, channel); 2674 check_readahead(file, offset + CACHE_BUFFER_SIZE, channel); 2675 } 2676 2677 final_length = 0; 2678 final_offset = offset + length; 2679 while (offset < final_offset) { 2680 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2681 if (length > (final_offset - offset)) { 2682 length = final_offset - offset; 2683 } 2684 2685 buf = tree_find_filled_buffer(file->tree, offset); 2686 if (buf == NULL) { 2687 pthread_spin_unlock(&file->lock); 2688 rc = __send_rw_from_file(file, payload, offset, length, true, channel); 2689 pthread_spin_lock(&file->lock); 2690 if (rc == 0) { 2691 sub_reads++; 2692 } 2693 } else { 2694 read_len = length; 2695 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2696 read_len = buf->offset + buf->bytes_filled - offset; 2697 } 2698 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len); 2699 memcpy(payload, &buf->buf[offset - buf->offset], read_len); 2700 if ((offset + 
read_len) % CACHE_BUFFER_SIZE == 0) { 2701 tree_remove_buffer(file->tree, buf); 2702 if (file->tree->present_mask == 0) { 2703 spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file); 2704 } 2705 } 2706 } 2707 2708 if (rc == 0) { 2709 final_length += length; 2710 } else { 2711 break; 2712 } 2713 payload += length; 2714 offset += length; 2715 } 2716 pthread_spin_unlock(&file->lock); 2717 while (sub_reads > 0) { 2718 sem_wait(&channel->sem); 2719 sub_reads--; 2720 } 2721 if (rc == 0) { 2722 return final_length; 2723 } else { 2724 return rc; 2725 } 2726 } 2727 2728 static void 2729 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2730 spdk_file_op_complete cb_fn, void *cb_arg) 2731 { 2732 struct spdk_fs_request *sync_req; 2733 struct spdk_fs_request *flush_req; 2734 struct spdk_fs_cb_args *sync_args; 2735 struct spdk_fs_cb_args *flush_args; 2736 2737 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2738 2739 pthread_spin_lock(&file->lock); 2740 if (file->append_pos <= file->length_xattr) { 2741 BLOBFS_TRACE(file, "done - file already synced\n"); 2742 pthread_spin_unlock(&file->lock); 2743 cb_fn(cb_arg, 0); 2744 return; 2745 } 2746 2747 sync_req = alloc_fs_request(channel); 2748 if (!sync_req) { 2749 SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name); 2750 pthread_spin_unlock(&file->lock); 2751 cb_fn(cb_arg, -ENOMEM); 2752 return; 2753 } 2754 sync_args = &sync_req->args; 2755 2756 flush_req = alloc_fs_request(channel); 2757 if (!flush_req) { 2758 SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name); 2759 free_fs_request(sync_req); 2760 pthread_spin_unlock(&file->lock); 2761 cb_fn(cb_arg, -ENOMEM); 2762 return; 2763 } 2764 flush_args = &flush_req->args; 2765 2766 sync_args->file = file; 2767 sync_args->fn.file_op = cb_fn; 2768 sync_args->arg = cb_arg; 2769 sync_args->op.sync.offset = file->append_pos; 2770 sync_args->op.sync.xattr_in_progress = false; 2771 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2772 pthread_spin_unlock(&file->lock); 2773 2774 flush_args->file = file; 2775 channel->send_request(__file_flush, flush_req); 2776 } 2777 2778 int 2779 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2780 { 2781 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2782 struct spdk_fs_cb_args args = {}; 2783 2784 args.sem = &channel->sem; 2785 _file_sync(file, channel, __wake_caller, &args); 2786 sem_wait(&channel->sem); 2787 2788 return args.rc; 2789 } 2790 2791 void 2792 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2793 spdk_file_op_complete cb_fn, void *cb_arg) 2794 { 2795 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2796 2797 _file_sync(file, channel, cb_fn, cb_arg); 2798 } 2799 2800 void 2801 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2802 { 2803 BLOBFS_TRACE(file, "priority=%u\n", priority); 2804 file->priority = priority; 2805 2806 } 2807 2808 /* 2809 * Close routines 2810 */ 2811 2812 static void 2813 __file_close_async_done(void *ctx, int bserrno) 2814 { 2815 struct spdk_fs_request *req = ctx; 2816 struct spdk_fs_cb_args *args = &req->args; 2817 struct spdk_file *file = args->file; 2818 2819 spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name); 2820 2821 if (file->is_deleted) { 2822 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2823 return; 2824 } 2825 2826 args->fn.file_op(args->arg, bserrno); 2827 free_fs_request(req); 2828 } 2829 2830 
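/*
 * __file_close_async() drops one reference to the file. When the reference count
 * reaches zero, the underlying blob is closed; __file_close_async_done() then
 * deletes the file if it was marked is_deleted while still open.
 */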
static void 2831 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2832 { 2833 struct spdk_blob *blob; 2834 2835 pthread_spin_lock(&file->lock); 2836 if (file->ref_count == 0) { 2837 pthread_spin_unlock(&file->lock); 2838 __file_close_async_done(req, -EBADF); 2839 return; 2840 } 2841 2842 file->ref_count--; 2843 if (file->ref_count > 0) { 2844 pthread_spin_unlock(&file->lock); 2845 req->args.fn.file_op(req->args.arg, 0); 2846 free_fs_request(req); 2847 return; 2848 } 2849 2850 pthread_spin_unlock(&file->lock); 2851 2852 blob = file->blob; 2853 file->blob = NULL; 2854 spdk_blob_close(blob, __file_close_async_done, req); 2855 } 2856 2857 static void 2858 __file_close_async__sync_done(void *arg, int fserrno) 2859 { 2860 struct spdk_fs_request *req = arg; 2861 struct spdk_fs_cb_args *args = &req->args; 2862 2863 __file_close_async(args->file, req); 2864 } 2865 2866 void 2867 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2868 { 2869 struct spdk_fs_request *req; 2870 struct spdk_fs_cb_args *args; 2871 2872 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2873 if (req == NULL) { 2874 SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name); 2875 cb_fn(cb_arg, -ENOMEM); 2876 return; 2877 } 2878 2879 args = &req->args; 2880 args->file = file; 2881 args->fn.file_op = cb_fn; 2882 args->arg = cb_arg; 2883 2884 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2885 } 2886 2887 static void 2888 __file_close(void *arg) 2889 { 2890 struct spdk_fs_request *req = arg; 2891 struct spdk_fs_cb_args *args = &req->args; 2892 struct spdk_file *file = args->file; 2893 2894 __file_close_async(file, req); 2895 } 2896 2897 int 2898 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2899 { 2900 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2901 struct spdk_fs_request *req; 2902 struct spdk_fs_cb_args *args; 2903 2904 req = alloc_fs_request(channel); 2905 if (req == NULL) { 2906 SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name); 2907 return -ENOMEM; 2908 } 2909 2910 args = &req->args; 2911 2912 spdk_file_sync(file, ctx); 2913 BLOBFS_TRACE(file, "name=%s\n", file->name); 2914 args->file = file; 2915 args->sem = &channel->sem; 2916 args->fn.file_op = __wake_caller; 2917 args->arg = args; 2918 channel->send_request(__file_close, req); 2919 sem_wait(&channel->sem); 2920 2921 return args->rc; 2922 } 2923 2924 int 2925 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2926 { 2927 if (size < sizeof(spdk_blob_id)) { 2928 return -EINVAL; 2929 } 2930 2931 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2932 2933 return sizeof(spdk_blob_id); 2934 } 2935 2936 static void 2937 _file_free(void *ctx) 2938 { 2939 struct spdk_file *file = ctx; 2940 2941 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2942 2943 free(file->name); 2944 free(file->tree); 2945 free(file); 2946 } 2947 2948 static void 2949 file_free(struct spdk_file *file) 2950 { 2951 BLOBFS_TRACE(file, "free=%s\n", file->name); 2952 pthread_spin_lock(&file->lock); 2953 if (file->tree->present_mask == 0) { 2954 pthread_spin_unlock(&file->lock); 2955 free(file->name); 2956 free(file->tree); 2957 free(file); 2958 return; 2959 } 2960 2961 tree_free_buffers(file->tree); 2962 assert(file->tree->present_mask == 0); 2963 spdk_thread_send_msg(g_cache_pool_thread, _file_free, file); 2964 pthread_spin_unlock(&file->lock); 2965 } 2966 2967 SPDK_LOG_REGISTER_COMPONENT("blobfs", 
SPDK_LOG_BLOBFS) 2968 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2969
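/*
 * Illustrative sketch (not part of the original source): one possible synchronous usage
 * of the thread-ctx API implemented above. Error handling is omitted; spdk_fs_open_file()
 * and the SPDK_BLOBFS_OPEN_CREATE flag are declared in the BlobFS public header and are
 * assumed here.
 *
 *   struct spdk_fs_thread_ctx *ctx = spdk_fs_alloc_thread_ctx(fs);
 *   struct spdk_file *file;
 *
 *   spdk_fs_open_file(fs, ctx, "log0", SPDK_BLOBFS_OPEN_CREATE, &file);
 *   spdk_file_write(file, ctx, buf, 0, buf_len);   // appended into the cache, flushed in background
 *   spdk_file_sync(file, ctx);                     // flush data and persist the "length" xattr
 *   spdk_file_close(file, ctx);                    // performs an implicit sync first
 *   spdk_fs_free_thread_ctx(ctx);                  // waits for outstanding requests, then frees
 */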