/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "tree.h"

#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/log.h"
#include "spdk/trace.h"

/* Debug-log helper for metadata-path operations, tagged with the file name. */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)

/* Debug-log helper for the read/write data path, tagged with the file name. */
#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)

/* Default size of the global cache buffer mempool: 4 GiB. */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
/* Default blobstore cluster size used by spdk_fs_opts_init(): 1 MiB. */
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* bstype signature written into the blobstore to mark it as a blobfs. */
#define SPDK_BLOBFS_SIGNATURE "BLOBFS"

/*
 * Global cache state shared by all mounted filesystems.  The cache mempool
 * and its reclaim poller live on a dedicated SPDK thread
 * (g_cache_pool_thread); creation/teardown is refcounted by g_fs_count
 * under g_cache_init_lock.
 */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
/* Period of the cache reclaim poller, in microseconds. */
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
/* Number of currently-initialized filesystems sharing the global cache. */
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;

/* Tracepoint IDs for blobfs (trace group 0x7). */
#define TRACE_GROUP_BLOBFS	0x7
#define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
#define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
#define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)

/* Register human-readable descriptions for the blobfs tracepoints. */
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	spdk_trace_register_description("BLOBFS_XATTR_START",
					TRACE_BLOBFS_XATTR_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_XATTR_END",
					TRACE_BLOBFS_XATTR_END,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_OPEN",
					TRACE_BLOBFS_OPEN,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_CLOSE",
					TRACE_BLOBFS_CLOSE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_DELETE_START",
					TRACE_BLOBFS_DELETE_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
	spdk_trace_register_description("BLOBFS_DELETE_DONE",
					TRACE_BLOBFS_DELETE_DONE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file:    ");
}

/*
 * Return a cache buffer's data payload to the global mempool and free the
 * descriptor itself.  The descriptor was heap-allocated; the payload came
 * from g_cache_pool.
 */
void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

/* Sequential-read byte count that triggers readahead (see seq_byte_count). */
#define CACHE_READAHEAD_THRESHOLD  (128 * 1024)

/*
 * In-memory representation of one blobfs file.  One spdk_file exists per
 * name in the filesystem; it is backed by a single blob.  The spinlock
 * protects the fields shared between the application thread and the
 * filesystem's dispatch threads (lengths, append_pos, cache tree).
 */
struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;
	char			*name;
	/* First bytes of the name packed into a u64, for trace records. */
	uint64_t		trace_arg_name;
	uint64_t		length;
	bool			is_deleted;
	bool			open_for_writing;
	/* Bytes flushed from cache to the blob so far. */
	uint64_t		length_flushed;
	/* Length currently persisted in the blob's "length" xattr. */
	uint64_t		length_xattr;
	uint64_t		append_pos;
	/* Sequential access tracking used to decide when to readahead. */
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	uint32_t		priority;
	TAILQ_ENTRY(spdk_file)	tailq;
	spdk_blob_id		blobid;
	uint32_t		ref_count;
	pthread_spinlock_t	lock;
	/* Most recently appended cache buffer (tail of the cache tree). */
	struct cache_buffer	*last;
	struct cache_tree	*tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};

/* A blob marked "is_deleted" found during load; deleted before load completes. */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};

/*
 * A mounted blobfs instance.  Three io_devices are registered per
 * filesystem (md, sync, io targets), each with its own channel pool.
 */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	fs_send_request_fn	send_request;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		uint32_t		max_ops;
	} io_target;
};

/*
 * Per-request callback context.  `fn`/`arg` hold the completion callback;
 * `sem`/`rc` are used by the synchronous wrappers to block the caller; the
 * `op` union carries per-operation parameters.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	/* First error seen across a multi-part rw; only the first is kept. */
	int *rwerrno;
	struct iovec *iovs;
	uint32_t iovcnt;
	/* Inline iovec used when iovcnt <= 1, avoiding an allocation. */
	struct iovec iov;
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t			length;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};

static void file_free(struct spdk_file *file);
static void fs_io_device_unregister(struct spdk_filesystem *fs);
static void fs_free_io_channels(struct spdk_filesystem *fs);

/* Initialize blobfs options to their defaults (1 MiB cluster size). */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}

static int _blobfs_cache_pool_reclaim(void *arg);

/*
 * Return true when the cache pool is low enough that the reclaim poller
 * should do work: fewer than 1/5 of the total buffers remain free.
 */
static bool
blobfs_cache_pool_need_reclaim(void)
{
	size_t count;

	count = spdk_mempool_count(g_cache_pool);
	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
	 * when the number of available cache buffer is less than 1/5 of total buffers.
	 */
	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
		return false;
	}

	return true;
}

/*
 * Runs on g_cache_pool_thread: create the global cache mempool and start
 * the periodic reclaim poller.  Allocation failure here is fatal (assert).
 */
static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		SPDK_ERRLOG("Create mempool failed, you may "
			    "increase the memory and try again\n");
		assert(false);
	}

	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}

/*
 * Runs on g_cache_pool_thread: tear down the reclaim poller and mempool,
 * then exit the thread.  The assert verifies every cache buffer was
 * returned before teardown (i.e. no leaked buffers).
 */
static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}

/*
 * Refcounted startup of the global cache.  The first filesystem creates
 * the cache-management thread and asynchronously starts the pool on it;
 * later filesystems only bump the count.
 */
static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}

/*
 * Refcounted teardown counterpart of initialize_global_cache(): the last
 * filesystem asks the cache thread to stop the pool and exit.
 */
static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}

/* Current allocated size of the file's backing blob, in bytes. */
static uint64_t
__file_get_blob_size(struct spdk_file *file)
{
	uint64_t cluster_sz;

	cluster_sz = file->fs->bs_opts.cluster_sz;
	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
}

/* One pooled request; `args` MUST stay the first member (ctx pointers to a
 * request and to its args are used interchangeably in some callbacks). */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;
	TAILQ_ENTRY(spdk_fs_request)	link;
	struct spdk_fs_channel		*channel;
};

/*
 * Per-channel request pool and dispatch state.  `sync` channels may be
 * used from arbitrary threads, so their request list is spinlock-protected
 * and completions are signalled through `sem`.
 */
struct spdk_fs_channel {
	struct spdk_fs_request		*req_mem;
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	sem_t				sem;
	struct spdk_filesystem		*fs;
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	bool				sync;
	uint32_t			outstanding_reqs;
	pthread_spinlock_t		lock;
};

/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over.
 */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};

/*
 * Take a request from the channel's pool, optionally attaching a
 * heap-allocated iovec array when iovcnt > 1 (otherwise the request's
 * inline iov is used).  Returns NULL if the pool is exhausted or the
 * iovec allocation fails.  Sync channels are locked around list access.
 */
static struct spdk_fs_request *
alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
{
	struct spdk_fs_request *req;
	struct iovec *iovs = NULL;

	if (iovcnt > 1) {
		iovs = calloc(iovcnt, sizeof(struct iovec));
		if (!iovs) {
			return NULL;
		}
	}

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	req = TAILQ_FIRST(&channel->reqs);
	if (req) {
		channel->outstanding_reqs++;
		TAILQ_REMOVE(&channel->reqs, req, link);
	}

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}

	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
		free(iovs);
		return NULL;
	}
	memset(req, 0, sizeof(*req));
	req->channel = channel;
	if (iovcnt > 1) {
		req->args.iovs = iovs;
	} else {
		req->args.iovs = &req->args.iov;
	}
	req->args.iovcnt = iovcnt;

	return req;
}

/* Convenience wrapper: allocate a request with no external iovec array. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}

/*
 * Return a request to its channel's pool, freeing any heap iovec array
 * first.  Mirrors the locking of alloc_fs_request_with_iov().
 */
static void
free_fs_request(struct spdk_fs_request *req)
{
	struct spdk_fs_channel *channel = req->channel;

	if (req->args.iovcnt > 1) {
		free(req->args.iovs);
	}

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
	channel->outstanding_reqs--;

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}
}

/*
 * Common channel constructor: pre-allocate max_ops requests into the free
 * list and initialize the completion semaphore.  Returns 0 on success,
 * -1 on allocation failure.
 */
static int
fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
		  uint32_t max_ops)
{
	uint32_t i;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
	if (!channel->req_mem) {
		return -1;
	}

	channel->outstanding_reqs = 0;
	TAILQ_INIT(&channel->reqs);
	sem_init(&channel->sem, 0, 0);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->fs = fs;

	return 0;
}

/* io_device callback: create a channel for the metadata target. */
static int
fs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);

	return fs_channel_create(fs, channel, fs->md_target.max_ops);
}

/* io_device callback: create a channel for the sync target. */
static int
fs_sync_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);

	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
}

/* io_device callback: create a channel for the io target. */
static int
fs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);

	return fs_channel_create(fs, channel, fs->io_target.max_ops);
}

/*
 * io_device destroy callback shared by all three targets.  Outstanding
 * requests at destroy time indicate a caller bug; log but proceed.
 */
static void
fs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_fs_channel *channel = ctx_buf;

	if (channel->outstanding_reqs > 0) {
		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
			    channel->outstanding_reqs);
	}

	free(channel->req_mem);
	if (channel->bs_channel != NULL) {
		spdk_bs_free_io_channel(channel->bs_channel);
	}
}

/* send_request implementation that simply runs the function inline. */
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}

/*
 * Attach a freshly initialized/loaded blobstore to the filesystem:
 * record the cluster size, allocate bs channels for the md and sync
 * targets, and take a reference on the global cache.
 */
static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	initialize_global_cache();
}

/*
 * spdk_bs_init completion for spdk_fs_init().  On failure the fs struct
 * is freed and NULL is reported to the caller.
 * NOTE(review): the failure path frees fs directly without
 * fs_free_io_channels()/fs_io_device_unregister(); whether the channels
 * registered in fs_alloc() leak here cannot be confirmed from this file.
 */
static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}

/*
 * Allocate a filesystem object and register its three io_devices
 * (md, sync, io), each limited to 512 in-flight operations.  Channels for
 * the md and sync targets are taken immediately; io channels are created
 * on demand by callers.
 */
static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_md");
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_sync");
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_io");

	return fs;
}

/*
 * Complete a synchronous operation: record the error (also into the
 * shared rwerrno slot, keeping only the first failure) and post the
 * semaphore the calling thread is blocked on.
 */
static void
__wake_caller(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && fserrno) {
		*(args->rwerrno) = fserrno;
	}
	args->rc = fserrno;
	sem_post(args->sem);
}

/*
 * Create a new blobfs on `dev`.  Initializes a blobstore stamped with the
 * BLOBFS signature; `opt` may override the cluster size.  Completion is
 * reported through cb_fn with the new filesystem handle (or NULL).
 */
void
spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
	     fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_bs_opts opts = {};

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_opts_init(&opts, sizeof(opts));
	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
	if (opt) {
		opts.cluster_sz = opt->cluster_sz;
	}
	spdk_bs_init(dev, &opts, init_cb, req);
}

/*
 * Allocate and initialize an spdk_file, linking it into the filesystem's
 * file list.  Returns NULL on allocation or spinlock-init failure, with
 * partial allocations released.
 */
static struct spdk_file *
file_alloc(struct spdk_filesystem *fs)
{
	struct spdk_file *file;

	file = calloc(1, sizeof(*file));
	if (file == NULL) {
		return NULL;
	}

	file->tree = calloc(1, sizeof(*file->tree));
	if (file->tree == NULL) {
		free(file);
		return NULL;
	}

	if (pthread_spin_init(&file->lock, 0)) {
		free(file->tree);
		free(file);
		return NULL;
	}

	file->fs = fs;
	TAILQ_INIT(&file->open_requests);
	TAILQ_INIT(&file->sync_requests);
	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
	file->priority = SPDK_FILE_PRIORITY_LOW;
	return file;
}

static void fs_load_done(void *ctx, int bserrno);

/*
 * Delete the next blob from the pending deleted_files list, if any.
 * Returns 0 when a delete was issued (fs_load_done will be re-invoked on
 * completion) or 1 when the list is empty and load can complete.
 */
static int
_handle_deleted_files(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
		struct spdk_deleted_file *deleted_file;

		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
		free(deleted_file);
		return 0;
	}

	return 1;
}

/*
 * Load-completion trampoline: drains the deleted-files list one blob at a
 * time (re-entering itself as each delete finishes), then reports success
 * to the caller.
 */
static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded.  Now check if there are any files that
	 * were marked for deletion before last unload.  Do not complete the
	 * fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that's been marked for deleting but not actually
		 * deleted yet.  This function will get called again once the delete
		 * operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);

}

/*
 * Pack the leading bytes of the file name into the u64 used as the
 * tracepoint argument (truncated to 8 bytes).
 */
static void
_file_build_trace_arg_name(struct spdk_file *f)
{
	f->trace_arg_name = 0;
	memcpy(&f->trace_arg_name, f->name,
	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
}

/*
 * Blob iterator callback during load.  For each blob: read its name and
 * length xattrs; if the "is_deleted" xattr is absent (lookup fails) the
 * blob is a live file and an spdk_file is built for it, otherwise the
 * blob id is queued on deleted_files for deletion by fs_load_done().
 */
static void
iter_cb(void *ctx, struct spdk_blob *blob, int rc)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	uint64_t *length;
	const char *name;
	uint32_t *is_deleted;
	size_t value_len;

	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	/* The "length" xattr must be exactly a uint64_t. */
	assert(value_len == 8);

	/* This file could be deleted last time without close it, then app crashed, so we delete it now */
	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
	if (rc < 0) {
		/* No "is_deleted" xattr: this is a live file. */
		struct spdk_file *f;

		f = file_alloc(fs);
		if (f == NULL) {
			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}

		f->name = strdup(name);
		_file_build_trace_arg_name(f);
		f->blobid = spdk_blob_get_id(blob);
		f->length = *length;
		f->length_flushed = *length;
		f->length_xattr = *length;
		f->append_pos = *length;
		SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length);
	} else {
		struct spdk_deleted_file *deleted_file;

		deleted_file = calloc(1, sizeof(*deleted_file));
		if (deleted_file == NULL) {
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}
		deleted_file->id = spdk_blob_get_id(blob);
		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
	}
}

/*
 * spdk_bs_load completion for spdk_fs_load().  Verifies the blobstore's
 * bstype: an all-zero type is claimed for blobfs (legacy stores), the
 * BLOBFS signature is accepted, anything else fails with -EINVAL.
 */
static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		SPDK_DEBUGLOG(blobfs, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}

/* Unregister all three io_devices and free the filesystem object. */
static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}

/* Release the md and sync io channels taken in fs_alloc(). */
static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}
840 void 841 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 842 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 843 { 844 struct spdk_filesystem *fs; 845 struct spdk_fs_cb_args *args; 846 struct spdk_fs_request *req; 847 struct spdk_bs_opts bs_opts; 848 849 fs = fs_alloc(dev, send_request_fn); 850 if (fs == NULL) { 851 cb_fn(cb_arg, NULL, -ENOMEM); 852 return; 853 } 854 855 req = alloc_fs_request(fs->md_target.md_fs_channel); 856 if (req == NULL) { 857 fs_free_io_channels(fs); 858 fs_io_device_unregister(fs); 859 cb_fn(cb_arg, NULL, -ENOMEM); 860 return; 861 } 862 863 args = &req->args; 864 args->fn.fs_op_with_handle = cb_fn; 865 args->arg = cb_arg; 866 args->fs = fs; 867 TAILQ_INIT(&args->op.fs_load.deleted_files); 868 spdk_bs_opts_init(&bs_opts, sizeof(bs_opts)); 869 bs_opts.iter_cb_fn = iter_cb; 870 bs_opts.iter_cb_arg = req; 871 spdk_bs_load(dev, &bs_opts, load_cb, req); 872 } 873 874 static void 875 unload_cb(void *ctx, int bserrno) 876 { 877 struct spdk_fs_request *req = ctx; 878 struct spdk_fs_cb_args *args = &req->args; 879 struct spdk_filesystem *fs = args->fs; 880 struct spdk_file *file, *tmp; 881 882 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 883 TAILQ_REMOVE(&fs->files, file, tailq); 884 file_free(file); 885 } 886 887 free_global_cache(); 888 889 args->fn.fs_op(args->arg, bserrno); 890 free(req); 891 892 fs_io_device_unregister(fs); 893 } 894 895 void 896 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 897 { 898 struct spdk_fs_request *req; 899 struct spdk_fs_cb_args *args; 900 901 /* 902 * We must free the md_channel before unloading the blobstore, so just 903 * allocate this request from the general heap. 
904 */ 905 req = calloc(1, sizeof(*req)); 906 if (req == NULL) { 907 cb_fn(cb_arg, -ENOMEM); 908 return; 909 } 910 911 args = &req->args; 912 args->fn.fs_op = cb_fn; 913 args->arg = cb_arg; 914 args->fs = fs; 915 916 fs_free_io_channels(fs); 917 spdk_bs_unload(fs->bs, unload_cb, req); 918 } 919 920 static struct spdk_file * 921 fs_find_file(struct spdk_filesystem *fs, const char *name) 922 { 923 struct spdk_file *file; 924 925 TAILQ_FOREACH(file, &fs->files, tailq) { 926 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 927 return file; 928 } 929 } 930 931 return NULL; 932 } 933 934 void 935 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 936 spdk_file_stat_op_complete cb_fn, void *cb_arg) 937 { 938 struct spdk_file_stat stat; 939 struct spdk_file *f = NULL; 940 941 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 942 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 943 return; 944 } 945 946 f = fs_find_file(fs, name); 947 if (f != NULL) { 948 stat.blobid = f->blobid; 949 stat.size = f->append_pos >= f->length ? 
f->append_pos : f->length; 950 cb_fn(cb_arg, &stat, 0); 951 return; 952 } 953 954 cb_fn(cb_arg, NULL, -ENOENT); 955 } 956 957 static void 958 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 959 { 960 struct spdk_fs_request *req = arg; 961 struct spdk_fs_cb_args *args = &req->args; 962 963 args->rc = fserrno; 964 if (fserrno == 0) { 965 memcpy(args->arg, stat, sizeof(*stat)); 966 } 967 sem_post(args->sem); 968 } 969 970 static void 971 __file_stat(void *arg) 972 { 973 struct spdk_fs_request *req = arg; 974 struct spdk_fs_cb_args *args = &req->args; 975 976 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 977 args->fn.stat_op, req); 978 } 979 980 int 981 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 982 const char *name, struct spdk_file_stat *stat) 983 { 984 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 985 struct spdk_fs_request *req; 986 int rc; 987 988 req = alloc_fs_request(channel); 989 if (req == NULL) { 990 SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name); 991 return -ENOMEM; 992 } 993 994 req->args.fs = fs; 995 req->args.op.stat.name = name; 996 req->args.fn.stat_op = __copy_stat; 997 req->args.arg = stat; 998 req->args.sem = &channel->sem; 999 channel->send_request(__file_stat, req); 1000 sem_wait(&channel->sem); 1001 1002 rc = req->args.rc; 1003 free_fs_request(req); 1004 1005 return rc; 1006 } 1007 1008 static void 1009 fs_create_blob_close_cb(void *ctx, int bserrno) 1010 { 1011 int rc; 1012 struct spdk_fs_request *req = ctx; 1013 struct spdk_fs_cb_args *args = &req->args; 1014 1015 rc = args->rc ? 
args->rc : bserrno; 1016 args->fn.file_op(args->arg, rc); 1017 free_fs_request(req); 1018 } 1019 1020 static void 1021 fs_create_blob_resize_cb(void *ctx, int bserrno) 1022 { 1023 struct spdk_fs_request *req = ctx; 1024 struct spdk_fs_cb_args *args = &req->args; 1025 struct spdk_file *f = args->file; 1026 struct spdk_blob *blob = args->op.create.blob; 1027 uint64_t length = 0; 1028 1029 args->rc = bserrno; 1030 if (bserrno) { 1031 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1032 return; 1033 } 1034 1035 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 1036 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 1037 1038 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1039 } 1040 1041 static void 1042 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1043 { 1044 struct spdk_fs_request *req = ctx; 1045 struct spdk_fs_cb_args *args = &req->args; 1046 1047 if (bserrno) { 1048 args->fn.file_op(args->arg, bserrno); 1049 free_fs_request(req); 1050 return; 1051 } 1052 1053 args->op.create.blob = blob; 1054 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 1055 } 1056 1057 static void 1058 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 1059 { 1060 struct spdk_fs_request *req = ctx; 1061 struct spdk_fs_cb_args *args = &req->args; 1062 struct spdk_file *f = args->file; 1063 1064 if (bserrno) { 1065 args->fn.file_op(args->arg, bserrno); 1066 free_fs_request(req); 1067 return; 1068 } 1069 1070 f->blobid = blobid; 1071 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 1072 } 1073 1074 void 1075 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 1076 spdk_file_op_complete cb_fn, void *cb_arg) 1077 { 1078 struct spdk_file *file; 1079 struct spdk_fs_request *req; 1080 struct spdk_fs_cb_args *args; 1081 1082 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1083 cb_fn(cb_arg, -ENAMETOOLONG); 1084 return; 1085 } 1086 1087 file = 
fs_find_file(fs, name); 1088 if (file != NULL) { 1089 cb_fn(cb_arg, -EEXIST); 1090 return; 1091 } 1092 1093 file = file_alloc(fs); 1094 if (file == NULL) { 1095 SPDK_ERRLOG("Cannot allocate new file for creation\n"); 1096 cb_fn(cb_arg, -ENOMEM); 1097 return; 1098 } 1099 1100 req = alloc_fs_request(fs->md_target.md_fs_channel); 1101 if (req == NULL) { 1102 SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name); 1103 cb_fn(cb_arg, -ENOMEM); 1104 return; 1105 } 1106 1107 args = &req->args; 1108 args->file = file; 1109 args->fn.file_op = cb_fn; 1110 args->arg = cb_arg; 1111 1112 file->name = strdup(name); 1113 _file_build_trace_arg_name(file); 1114 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 1115 } 1116 1117 static void 1118 __fs_create_file_done(void *arg, int fserrno) 1119 { 1120 struct spdk_fs_request *req = arg; 1121 struct spdk_fs_cb_args *args = &req->args; 1122 1123 __wake_caller(args, fserrno); 1124 SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name); 1125 } 1126 1127 static void 1128 __fs_create_file(void *arg) 1129 { 1130 struct spdk_fs_request *req = arg; 1131 struct spdk_fs_cb_args *args = &req->args; 1132 1133 SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name); 1134 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1135 } 1136 1137 int 1138 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1139 { 1140 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1141 struct spdk_fs_request *req; 1142 struct spdk_fs_cb_args *args; 1143 int rc; 1144 1145 SPDK_DEBUGLOG(blobfs, "file=%s\n", name); 1146 1147 req = alloc_fs_request(channel); 1148 if (req == NULL) { 1149 SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name); 1150 return -ENOMEM; 1151 } 1152 1153 args = &req->args; 1154 args->fs = fs; 1155 args->op.create.name = name; 1156 args->sem = &channel->sem; 1157 fs->send_request(__fs_create_file, req); 1158 
sem_wait(&channel->sem); 1159 rc = args->rc; 1160 free_fs_request(req); 1161 1162 return rc; 1163 } 1164 1165 static void 1166 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1167 { 1168 struct spdk_fs_request *req = ctx; 1169 struct spdk_fs_cb_args *args = &req->args; 1170 struct spdk_file *f = args->file; 1171 1172 f->blob = blob; 1173 while (!TAILQ_EMPTY(&f->open_requests)) { 1174 req = TAILQ_FIRST(&f->open_requests); 1175 args = &req->args; 1176 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1177 spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name); 1178 args->fn.file_op_with_handle(args->arg, f, bserrno); 1179 free_fs_request(req); 1180 } 1181 } 1182 1183 static void 1184 fs_open_blob_create_cb(void *ctx, int bserrno) 1185 { 1186 struct spdk_fs_request *req = ctx; 1187 struct spdk_fs_cb_args *args = &req->args; 1188 struct spdk_file *file = args->file; 1189 struct spdk_filesystem *fs = args->fs; 1190 1191 if (file == NULL) { 1192 /* 1193 * This is from an open with CREATE flag - the file 1194 * is now created so look it up in the file list for this 1195 * filesystem. 1196 */ 1197 file = fs_find_file(fs, args->op.open.name); 1198 assert(file != NULL); 1199 args->file = file; 1200 } 1201 1202 file->ref_count++; 1203 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1204 if (file->ref_count == 1) { 1205 assert(file->blob == NULL); 1206 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1207 } else if (file->blob != NULL) { 1208 fs_open_blob_done(req, file->blob, 0); 1209 } else { 1210 /* 1211 * The blob open for this file is in progress due to a previous 1212 * open request. When that open completes, it will invoke the 1213 * open callback for this request. 
1214 */ 1215 } 1216 } 1217 1218 void 1219 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1220 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1221 { 1222 struct spdk_file *f = NULL; 1223 struct spdk_fs_request *req; 1224 struct spdk_fs_cb_args *args; 1225 1226 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1227 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1228 return; 1229 } 1230 1231 f = fs_find_file(fs, name); 1232 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1233 cb_fn(cb_arg, NULL, -ENOENT); 1234 return; 1235 } 1236 1237 if (f != NULL && f->is_deleted == true) { 1238 cb_fn(cb_arg, NULL, -ENOENT); 1239 return; 1240 } 1241 1242 req = alloc_fs_request(fs->md_target.md_fs_channel); 1243 if (req == NULL) { 1244 SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name); 1245 cb_fn(cb_arg, NULL, -ENOMEM); 1246 return; 1247 } 1248 1249 args = &req->args; 1250 args->fn.file_op_with_handle = cb_fn; 1251 args->arg = cb_arg; 1252 args->file = f; 1253 args->fs = fs; 1254 args->op.open.name = name; 1255 1256 if (f == NULL) { 1257 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1258 } else { 1259 fs_open_blob_create_cb(req, 0); 1260 } 1261 } 1262 1263 static void 1264 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1265 { 1266 struct spdk_fs_request *req = arg; 1267 struct spdk_fs_cb_args *args = &req->args; 1268 1269 args->file = file; 1270 __wake_caller(args, bserrno); 1271 SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name); 1272 } 1273 1274 static void 1275 __fs_open_file(void *arg) 1276 { 1277 struct spdk_fs_request *req = arg; 1278 struct spdk_fs_cb_args *args = &req->args; 1279 1280 SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name); 1281 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1282 __fs_open_file_done, req); 1283 } 1284 1285 int 1286 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 
1287 const char *name, uint32_t flags, struct spdk_file **file) 1288 { 1289 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1290 struct spdk_fs_request *req; 1291 struct spdk_fs_cb_args *args; 1292 int rc; 1293 1294 SPDK_DEBUGLOG(blobfs, "file=%s\n", name); 1295 1296 req = alloc_fs_request(channel); 1297 if (req == NULL) { 1298 SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name); 1299 return -ENOMEM; 1300 } 1301 1302 args = &req->args; 1303 args->fs = fs; 1304 args->op.open.name = name; 1305 args->op.open.flags = flags; 1306 args->sem = &channel->sem; 1307 fs->send_request(__fs_open_file, req); 1308 sem_wait(&channel->sem); 1309 rc = args->rc; 1310 if (rc == 0) { 1311 *file = args->file; 1312 } else { 1313 *file = NULL; 1314 } 1315 free_fs_request(req); 1316 1317 return rc; 1318 } 1319 1320 static void 1321 fs_rename_blob_close_cb(void *ctx, int bserrno) 1322 { 1323 struct spdk_fs_request *req = ctx; 1324 struct spdk_fs_cb_args *args = &req->args; 1325 1326 args->fn.fs_op(args->arg, bserrno); 1327 free_fs_request(req); 1328 } 1329 1330 static void 1331 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1332 { 1333 struct spdk_fs_request *req = ctx; 1334 struct spdk_fs_cb_args *args = &req->args; 1335 const char *new_name = args->op.rename.new_name; 1336 1337 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1338 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1339 } 1340 1341 static void 1342 _fs_md_rename_file(struct spdk_fs_request *req) 1343 { 1344 struct spdk_fs_cb_args *args = &req->args; 1345 struct spdk_file *f; 1346 1347 f = fs_find_file(args->fs, args->op.rename.old_name); 1348 if (f == NULL) { 1349 args->fn.fs_op(args->arg, -ENOENT); 1350 free_fs_request(req); 1351 return; 1352 } 1353 1354 free(f->name); 1355 f->name = strdup(args->op.rename.new_name); 1356 _file_build_trace_arg_name(f); 1357 args->file = f; 1358 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 
1359 } 1360 1361 static void 1362 fs_rename_delete_done(void *arg, int fserrno) 1363 { 1364 _fs_md_rename_file(arg); 1365 } 1366 1367 void 1368 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1369 const char *old_name, const char *new_name, 1370 spdk_file_op_complete cb_fn, void *cb_arg) 1371 { 1372 struct spdk_file *f; 1373 struct spdk_fs_request *req; 1374 struct spdk_fs_cb_args *args; 1375 1376 SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name); 1377 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1378 cb_fn(cb_arg, -ENAMETOOLONG); 1379 return; 1380 } 1381 1382 req = alloc_fs_request(fs->md_target.md_fs_channel); 1383 if (req == NULL) { 1384 SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name, 1385 new_name); 1386 cb_fn(cb_arg, -ENOMEM); 1387 return; 1388 } 1389 1390 args = &req->args; 1391 args->fn.fs_op = cb_fn; 1392 args->fs = fs; 1393 args->arg = cb_arg; 1394 args->op.rename.old_name = old_name; 1395 args->op.rename.new_name = new_name; 1396 1397 f = fs_find_file(fs, new_name); 1398 if (f == NULL) { 1399 _fs_md_rename_file(req); 1400 return; 1401 } 1402 1403 /* 1404 * The rename overwrites an existing file. So delete the existing file, then 1405 * do the actual rename. 
1406 */ 1407 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1408 } 1409 1410 static void 1411 __fs_rename_file_done(void *arg, int fserrno) 1412 { 1413 struct spdk_fs_request *req = arg; 1414 struct spdk_fs_cb_args *args = &req->args; 1415 1416 __wake_caller(args, fserrno); 1417 } 1418 1419 static void 1420 __fs_rename_file(void *arg) 1421 { 1422 struct spdk_fs_request *req = arg; 1423 struct spdk_fs_cb_args *args = &req->args; 1424 1425 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1426 __fs_rename_file_done, req); 1427 } 1428 1429 int 1430 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1431 const char *old_name, const char *new_name) 1432 { 1433 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1434 struct spdk_fs_request *req; 1435 struct spdk_fs_cb_args *args; 1436 int rc; 1437 1438 req = alloc_fs_request(channel); 1439 if (req == NULL) { 1440 SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name); 1441 return -ENOMEM; 1442 } 1443 1444 args = &req->args; 1445 1446 args->fs = fs; 1447 args->op.rename.old_name = old_name; 1448 args->op.rename.new_name = new_name; 1449 args->sem = &channel->sem; 1450 fs->send_request(__fs_rename_file, req); 1451 sem_wait(&channel->sem); 1452 rc = args->rc; 1453 free_fs_request(req); 1454 return rc; 1455 } 1456 1457 static void 1458 blob_delete_cb(void *ctx, int bserrno) 1459 { 1460 struct spdk_fs_request *req = ctx; 1461 struct spdk_fs_cb_args *args = &req->args; 1462 1463 args->fn.file_op(args->arg, bserrno); 1464 free_fs_request(req); 1465 } 1466 1467 void 1468 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1469 spdk_file_op_complete cb_fn, void *cb_arg) 1470 { 1471 struct spdk_file *f; 1472 spdk_blob_id blobid; 1473 struct spdk_fs_request *req; 1474 struct spdk_fs_cb_args *args; 1475 1476 SPDK_DEBUGLOG(blobfs, "file=%s\n", name); 1477 1478 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) 
== SPDK_FILE_NAME_MAX + 1) { 1479 cb_fn(cb_arg, -ENAMETOOLONG); 1480 return; 1481 } 1482 1483 f = fs_find_file(fs, name); 1484 if (f == NULL) { 1485 SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name); 1486 cb_fn(cb_arg, -ENOENT); 1487 return; 1488 } 1489 1490 req = alloc_fs_request(fs->md_target.md_fs_channel); 1491 if (req == NULL) { 1492 SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name); 1493 cb_fn(cb_arg, -ENOMEM); 1494 return; 1495 } 1496 1497 args = &req->args; 1498 args->fn.file_op = cb_fn; 1499 args->arg = cb_arg; 1500 1501 if (f->ref_count > 0) { 1502 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1503 f->is_deleted = true; 1504 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1505 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1506 return; 1507 } 1508 1509 blobid = f->blobid; 1510 TAILQ_REMOVE(&fs->files, f, tailq); 1511 1512 file_free(f); 1513 1514 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1515 } 1516 1517 static uint64_t 1518 fs_name_to_uint64(const char *name) 1519 { 1520 uint64_t result = 0; 1521 memcpy(&result, name, spdk_min(sizeof(result), strlen(name))); 1522 return result; 1523 } 1524 1525 static void 1526 __fs_delete_file_done(void *arg, int fserrno) 1527 { 1528 struct spdk_fs_request *req = arg; 1529 struct spdk_fs_cb_args *args = &req->args; 1530 1531 spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name)); 1532 __wake_caller(args, fserrno); 1533 } 1534 1535 static void 1536 __fs_delete_file(void *arg) 1537 { 1538 struct spdk_fs_request *req = arg; 1539 struct spdk_fs_cb_args *args = &req->args; 1540 1541 spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name)); 1542 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1543 } 1544 1545 int 1546 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1547 
const char *name) 1548 { 1549 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1550 struct spdk_fs_request *req; 1551 struct spdk_fs_cb_args *args; 1552 int rc; 1553 1554 req = alloc_fs_request(channel); 1555 if (req == NULL) { 1556 SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name); 1557 return -ENOMEM; 1558 } 1559 1560 args = &req->args; 1561 args->fs = fs; 1562 args->op.delete.name = name; 1563 args->sem = &channel->sem; 1564 fs->send_request(__fs_delete_file, req); 1565 sem_wait(&channel->sem); 1566 rc = args->rc; 1567 free_fs_request(req); 1568 1569 return rc; 1570 } 1571 1572 spdk_fs_iter 1573 spdk_fs_iter_first(struct spdk_filesystem *fs) 1574 { 1575 struct spdk_file *f; 1576 1577 f = TAILQ_FIRST(&fs->files); 1578 return f; 1579 } 1580 1581 spdk_fs_iter 1582 spdk_fs_iter_next(spdk_fs_iter iter) 1583 { 1584 struct spdk_file *f = iter; 1585 1586 if (f == NULL) { 1587 return NULL; 1588 } 1589 1590 f = TAILQ_NEXT(f, tailq); 1591 return f; 1592 } 1593 1594 const char * 1595 spdk_file_get_name(struct spdk_file *file) 1596 { 1597 return file->name; 1598 } 1599 1600 uint64_t 1601 spdk_file_get_length(struct spdk_file *file) 1602 { 1603 uint64_t length; 1604 1605 assert(file != NULL); 1606 1607 length = file->append_pos >= file->length ? 
file->append_pos : file->length; 1608 SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length); 1609 return length; 1610 } 1611 1612 static void 1613 fs_truncate_complete_cb(void *ctx, int bserrno) 1614 { 1615 struct spdk_fs_request *req = ctx; 1616 struct spdk_fs_cb_args *args = &req->args; 1617 1618 args->fn.file_op(args->arg, bserrno); 1619 free_fs_request(req); 1620 } 1621 1622 static void 1623 fs_truncate_resize_cb(void *ctx, int bserrno) 1624 { 1625 struct spdk_fs_request *req = ctx; 1626 struct spdk_fs_cb_args *args = &req->args; 1627 struct spdk_file *file = args->file; 1628 uint64_t *length = &args->op.truncate.length; 1629 1630 if (bserrno) { 1631 args->fn.file_op(args->arg, bserrno); 1632 free_fs_request(req); 1633 return; 1634 } 1635 1636 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1637 1638 file->length = *length; 1639 if (file->append_pos > file->length) { 1640 file->append_pos = file->length; 1641 } 1642 1643 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req); 1644 } 1645 1646 static uint64_t 1647 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1648 { 1649 return (length + cluster_sz - 1) / cluster_sz; 1650 } 1651 1652 void 1653 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1654 spdk_file_op_complete cb_fn, void *cb_arg) 1655 { 1656 struct spdk_filesystem *fs; 1657 size_t num_clusters; 1658 struct spdk_fs_request *req; 1659 struct spdk_fs_cb_args *args; 1660 1661 SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1662 if (length == file->length) { 1663 cb_fn(cb_arg, 0); 1664 return; 1665 } 1666 1667 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1668 if (req == NULL) { 1669 cb_fn(cb_arg, -ENOMEM); 1670 return; 1671 } 1672 1673 args = &req->args; 1674 args->fn.file_op = cb_fn; 1675 args->arg = cb_arg; 1676 args->file = file; 1677 args->op.truncate.length = length; 1678 fs = file->fs; 1679 1680 num_clusters = 
__bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1681 1682 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1683 } 1684 1685 static void 1686 __truncate(void *arg) 1687 { 1688 struct spdk_fs_request *req = arg; 1689 struct spdk_fs_cb_args *args = &req->args; 1690 1691 spdk_file_truncate_async(args->file, args->op.truncate.length, 1692 args->fn.file_op, args); 1693 } 1694 1695 int 1696 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 1697 uint64_t length) 1698 { 1699 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1700 struct spdk_fs_request *req; 1701 struct spdk_fs_cb_args *args; 1702 int rc; 1703 1704 req = alloc_fs_request(channel); 1705 if (req == NULL) { 1706 return -ENOMEM; 1707 } 1708 1709 args = &req->args; 1710 1711 args->file = file; 1712 args->op.truncate.length = length; 1713 args->fn.file_op = __wake_caller; 1714 args->sem = &channel->sem; 1715 1716 channel->send_request(__truncate, req); 1717 sem_wait(&channel->sem); 1718 rc = args->rc; 1719 free_fs_request(req); 1720 1721 return rc; 1722 } 1723 1724 static void 1725 __rw_done(void *ctx, int bserrno) 1726 { 1727 struct spdk_fs_request *req = ctx; 1728 struct spdk_fs_cb_args *args = &req->args; 1729 1730 spdk_free(args->op.rw.pin_buf); 1731 args->fn.file_op(args->arg, bserrno); 1732 free_fs_request(req); 1733 } 1734 1735 static void 1736 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 1737 { 1738 int i; 1739 size_t len; 1740 1741 for (i = 0; i < iovcnt; i++) { 1742 len = spdk_min(iovs[i].iov_len, buf_len); 1743 memcpy(buf, iovs[i].iov_base, len); 1744 buf += len; 1745 assert(buf_len >= len); 1746 buf_len -= len; 1747 } 1748 } 1749 1750 static void 1751 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len) 1752 { 1753 int i; 1754 size_t len; 1755 1756 for (i = 0; i < iovcnt; i++) { 1757 len = spdk_min(iovs[i].iov_len, buf_len); 1758 memcpy(iovs[i].iov_base, buf, len); 1759 buf += 
len; 1760 assert(buf_len >= len); 1761 buf_len -= len; 1762 } 1763 } 1764 1765 static void 1766 __read_done(void *ctx, int bserrno) 1767 { 1768 struct spdk_fs_request *req = ctx; 1769 struct spdk_fs_cb_args *args = &req->args; 1770 void *buf; 1771 1772 assert(req != NULL); 1773 buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1))); 1774 if (args->op.rw.is_read) { 1775 _copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length); 1776 __rw_done(req, 0); 1777 } else { 1778 _copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt); 1779 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1780 args->op.rw.pin_buf, 1781 args->op.rw.start_lba, args->op.rw.num_lba, 1782 __rw_done, req); 1783 } 1784 } 1785 1786 static void 1787 __do_blob_read(void *ctx, int fserrno) 1788 { 1789 struct spdk_fs_request *req = ctx; 1790 struct spdk_fs_cb_args *args = &req->args; 1791 1792 if (fserrno) { 1793 __rw_done(req, fserrno); 1794 return; 1795 } 1796 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1797 args->op.rw.pin_buf, 1798 args->op.rw.start_lba, args->op.rw.num_lba, 1799 __read_done, req); 1800 } 1801 1802 static void 1803 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1804 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1805 { 1806 uint64_t end_lba; 1807 1808 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1809 *start_lba = offset / *lba_size; 1810 end_lba = (offset + length - 1) / *lba_size; 1811 *num_lba = (end_lba - *start_lba + 1); 1812 } 1813 1814 static bool 1815 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length) 1816 { 1817 uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1818 1819 if ((offset % lba_size == 0) && (length % lba_size == 0)) { 1820 return true; 1821 } 1822 1823 return false; 1824 } 1825 1826 static void 1827 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt) 
1828 { 1829 uint32_t i; 1830 1831 for (i = 0; i < iovcnt; i++) { 1832 req->args.iovs[i].iov_base = iovs[i].iov_base; 1833 req->args.iovs[i].iov_len = iovs[i].iov_len; 1834 } 1835 } 1836 1837 static void 1838 __readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel, 1839 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1840 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1841 { 1842 struct spdk_fs_request *req; 1843 struct spdk_fs_cb_args *args; 1844 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1845 uint64_t start_lba, num_lba, pin_buf_length; 1846 uint32_t lba_size; 1847 1848 if (is_read && offset + length > file->length) { 1849 cb_fn(cb_arg, -EINVAL); 1850 return; 1851 } 1852 1853 req = alloc_fs_request_with_iov(channel, iovcnt); 1854 if (req == NULL) { 1855 cb_fn(cb_arg, -ENOMEM); 1856 return; 1857 } 1858 1859 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1860 1861 args = &req->args; 1862 args->fn.file_op = cb_fn; 1863 args->arg = cb_arg; 1864 args->file = file; 1865 args->op.rw.channel = channel->bs_channel; 1866 _fs_request_setup_iovs(req, iovs, iovcnt); 1867 args->op.rw.is_read = is_read; 1868 args->op.rw.offset = offset; 1869 args->op.rw.blocklen = lba_size; 1870 1871 pin_buf_length = num_lba * lba_size; 1872 args->op.rw.length = pin_buf_length; 1873 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL, 1874 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1875 if (args->op.rw.pin_buf == NULL) { 1876 SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1877 file->name, offset, length); 1878 free_fs_request(req); 1879 cb_fn(cb_arg, -ENOMEM); 1880 return; 1881 } 1882 1883 args->op.rw.start_lba = start_lba; 1884 args->op.rw.num_lba = num_lba; 1885 1886 if (!is_read && file->length < offset + length) { 1887 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1888 } else if (!is_read && __is_lba_aligned(file, 
offset, length)) { 1889 _copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt); 1890 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1891 args->op.rw.pin_buf, 1892 args->op.rw.start_lba, args->op.rw.num_lba, 1893 __rw_done, req); 1894 } else { 1895 __do_blob_read(req, 0); 1896 } 1897 } 1898 1899 static void 1900 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel, 1901 void *payload, uint64_t offset, uint64_t length, 1902 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1903 { 1904 struct iovec iov; 1905 1906 iov.iov_base = payload; 1907 iov.iov_len = (size_t)length; 1908 1909 __readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read); 1910 } 1911 1912 void 1913 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1914 void *payload, uint64_t offset, uint64_t length, 1915 spdk_file_op_complete cb_fn, void *cb_arg) 1916 { 1917 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1918 } 1919 1920 void 1921 spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel, 1922 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1923 spdk_file_op_complete cb_fn, void *cb_arg) 1924 { 1925 SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n", 1926 file->name, offset, length); 1927 1928 __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0); 1929 } 1930 1931 void 1932 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1933 void *payload, uint64_t offset, uint64_t length, 1934 spdk_file_op_complete cb_fn, void *cb_arg) 1935 { 1936 SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n", 1937 file->name, offset, length); 1938 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1939 } 1940 1941 void 1942 spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel, 1943 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t 
length, 1944 spdk_file_op_complete cb_fn, void *cb_arg) 1945 { 1946 SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n", 1947 file->name, offset, length); 1948 1949 __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1); 1950 } 1951 1952 struct spdk_io_channel * 1953 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1954 { 1955 struct spdk_io_channel *io_channel; 1956 struct spdk_fs_channel *fs_channel; 1957 1958 io_channel = spdk_get_io_channel(&fs->io_target); 1959 fs_channel = spdk_io_channel_get_ctx(io_channel); 1960 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1961 fs_channel->send_request = __send_request_direct; 1962 1963 return io_channel; 1964 } 1965 1966 void 1967 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1968 { 1969 spdk_put_io_channel(channel); 1970 } 1971 1972 struct spdk_fs_thread_ctx * 1973 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs) 1974 { 1975 struct spdk_fs_thread_ctx *ctx; 1976 1977 ctx = calloc(1, sizeof(*ctx)); 1978 if (!ctx) { 1979 return NULL; 1980 } 1981 1982 if (pthread_spin_init(&ctx->ch.lock, 0)) { 1983 free(ctx); 1984 return NULL; 1985 } 1986 1987 fs_channel_create(fs, &ctx->ch, 512); 1988 1989 ctx->ch.send_request = fs->send_request; 1990 ctx->ch.sync = 1; 1991 1992 return ctx; 1993 } 1994 1995 1996 void 1997 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx) 1998 { 1999 assert(ctx->ch.sync == 1); 2000 2001 while (true) { 2002 pthread_spin_lock(&ctx->ch.lock); 2003 if (ctx->ch.outstanding_reqs == 0) { 2004 pthread_spin_unlock(&ctx->ch.lock); 2005 break; 2006 } 2007 pthread_spin_unlock(&ctx->ch.lock); 2008 usleep(1000); 2009 } 2010 2011 fs_channel_destroy(NULL, &ctx->ch); 2012 free(ctx); 2013 } 2014 2015 int 2016 spdk_fs_set_cache_size(uint64_t size_in_mb) 2017 { 2018 /* setting g_fs_cache_size is only permitted if cache pool 2019 * is already freed or hasn't been initialized 2020 */ 2021 if (g_cache_pool != NULL) { 2022 return -EPERM; 2023 } 2024 2025 
g_fs_cache_size = size_in_mb * 1024 * 1024; 2026 2027 return 0; 2028 } 2029 2030 uint64_t 2031 spdk_fs_get_cache_size(void) 2032 { 2033 return g_fs_cache_size / (1024 * 1024); 2034 } 2035 2036 static void __file_flush(void *ctx); 2037 2038 /* Try to free some cache buffers from this file. 2039 */ 2040 static int 2041 reclaim_cache_buffers(struct spdk_file *file) 2042 { 2043 int rc; 2044 2045 BLOBFS_TRACE(file, "free=%s\n", file->name); 2046 2047 /* The function is safe to be called with any threads, while the file 2048 * lock maybe locked by other thread for now, so try to get the file 2049 * lock here. 2050 */ 2051 rc = pthread_spin_trylock(&file->lock); 2052 if (rc != 0) { 2053 return -1; 2054 } 2055 2056 if (file->tree->present_mask == 0) { 2057 pthread_spin_unlock(&file->lock); 2058 return -1; 2059 } 2060 tree_free_buffers(file->tree); 2061 2062 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2063 /* If not freed, put it in the end of the queue */ 2064 if (file->tree->present_mask != 0) { 2065 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2066 } else { 2067 file->last = NULL; 2068 } 2069 pthread_spin_unlock(&file->lock); 2070 2071 return 0; 2072 } 2073 2074 static int 2075 _blobfs_cache_pool_reclaim(void *arg) 2076 { 2077 struct spdk_file *file, *tmp; 2078 int rc; 2079 2080 if (!blobfs_cache_pool_need_reclaim()) { 2081 return SPDK_POLLER_IDLE; 2082 } 2083 2084 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2085 if (!file->open_for_writing && 2086 file->priority == SPDK_FILE_PRIORITY_LOW) { 2087 rc = reclaim_cache_buffers(file); 2088 if (rc < 0) { 2089 continue; 2090 } 2091 if (!blobfs_cache_pool_need_reclaim()) { 2092 return SPDK_POLLER_BUSY; 2093 } 2094 break; 2095 } 2096 } 2097 2098 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2099 if (!file->open_for_writing) { 2100 rc = reclaim_cache_buffers(file); 2101 if (rc < 0) { 2102 continue; 2103 } 2104 if (!blobfs_cache_pool_need_reclaim()) { 2105 return SPDK_POLLER_BUSY; 2106 } 2107 break; 2108 } 
2109 } 2110 2111 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2112 rc = reclaim_cache_buffers(file); 2113 if (rc < 0) { 2114 continue; 2115 } 2116 break; 2117 } 2118 2119 return SPDK_POLLER_BUSY; 2120 } 2121 2122 static void 2123 _add_file_to_cache_pool(void *ctx) 2124 { 2125 struct spdk_file *file = ctx; 2126 2127 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2128 } 2129 2130 static void 2131 _remove_file_from_cache_pool(void *ctx) 2132 { 2133 struct spdk_file *file = ctx; 2134 2135 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2136 } 2137 2138 static struct cache_buffer * 2139 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 2140 { 2141 struct cache_buffer *buf; 2142 int count = 0; 2143 bool need_update = false; 2144 2145 buf = calloc(1, sizeof(*buf)); 2146 if (buf == NULL) { 2147 SPDK_DEBUGLOG(blobfs, "calloc failed\n"); 2148 return NULL; 2149 } 2150 2151 do { 2152 buf->buf = spdk_mempool_get(g_cache_pool); 2153 if (buf->buf) { 2154 break; 2155 } 2156 if (count++ == 100) { 2157 SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", 2158 file, offset); 2159 free(buf); 2160 return NULL; 2161 } 2162 usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); 2163 } while (true); 2164 2165 buf->buf_size = CACHE_BUFFER_SIZE; 2166 buf->offset = offset; 2167 2168 if (file->tree->present_mask == 0) { 2169 need_update = true; 2170 } 2171 file->tree = tree_insert_buffer(file->tree, buf); 2172 2173 if (need_update) { 2174 spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file); 2175 } 2176 2177 return buf; 2178 } 2179 2180 static struct cache_buffer * 2181 cache_append_buffer(struct spdk_file *file) 2182 { 2183 struct cache_buffer *last; 2184 2185 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 2186 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 2187 2188 last = cache_insert_buffer(file, file->append_pos); 2189 if (last == NULL) { 2190 SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n"); 
2191 return NULL; 2192 } 2193 2194 file->last = last; 2195 2196 return last; 2197 } 2198 2199 static void __check_sync_reqs(struct spdk_file *file); 2200 2201 static void 2202 __file_cache_finish_sync(void *ctx, int bserrno) 2203 { 2204 struct spdk_file *file; 2205 struct spdk_fs_request *sync_req = ctx; 2206 struct spdk_fs_cb_args *sync_args; 2207 2208 sync_args = &sync_req->args; 2209 file = sync_args->file; 2210 pthread_spin_lock(&file->lock); 2211 file->length_xattr = sync_args->op.sync.length; 2212 assert(sync_args->op.sync.offset <= file->length_flushed); 2213 spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset, 2214 0, file->trace_arg_name); 2215 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 2216 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 2217 pthread_spin_unlock(&file->lock); 2218 2219 sync_args->fn.file_op(sync_args->arg, bserrno); 2220 2221 free_fs_request(sync_req); 2222 __check_sync_reqs(file); 2223 } 2224 2225 static void 2226 __check_sync_reqs(struct spdk_file *file) 2227 { 2228 struct spdk_fs_request *sync_req; 2229 2230 pthread_spin_lock(&file->lock); 2231 2232 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 2233 if (sync_req->args.op.sync.offset <= file->length_flushed) { 2234 break; 2235 } 2236 } 2237 2238 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 2239 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 2240 sync_req->args.op.sync.xattr_in_progress = true; 2241 sync_req->args.op.sync.length = file->length_flushed; 2242 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 2243 sizeof(file->length_flushed)); 2244 2245 pthread_spin_unlock(&file->lock); 2246 spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed, 2247 0, file->trace_arg_name); 2248 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req); 2249 } else { 2250 pthread_spin_unlock(&file->lock); 2251 } 2252 } 2253 2254 
/*
 * Completion callback for the cache-buffer write issued by __file_flush().
 * Accounts the flushed bytes, advances length_flushed (and length, if the
 * flush extended the file), then services any ready sync requests and
 * continues flushing.
 *
 * NOTE(review): bserrno is not inspected here, so a failed write still
 * advances the flush accounting — confirm intended error handling.
 */
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 * blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	/* Reuse req for the next flush; __file_flush() frees it when there is
	 * nothing left to do.
	 */
	__file_flush(req);
}

/*
 * Flush the next unflushed cache buffer of the file, if any.  Frees req and
 * returns immediately when there is nothing to flush, a flush I/O is
 * already in flight, or the next buffer is only partially filled with no
 * pending sync request.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 * progress, or the next buffer is partially filled but there's no
		 * outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 * more data after that is completed, or a partial buffer will get flushed
		 * when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 * file was later appended, we will write the data directly
			 * to disk and bypass cache. So just update length_flushed
			 * here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 * outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 * outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	/* Convert the byte range into LBA-aligned parameters for the
	 * blobstore write.
	 */
	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}

/* Completion for the metadata sync that follows a blob resize: wake the
 * caller blocked on its semaphore.
 */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

/* Completion for spdk_blob_resize(): persist the new size with a metadata
 * sync, or wake the blocked caller immediately on error.
 */
static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

/* Runs on the fs thread (via send_request): grow the file's blob to
 * op.resize.num_clusters clusters.
 */
static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

/* Completion for the async read/write issued by __rw_from_file(): wake the
 * caller waiting on args->sem and release the request.
 */
static void
__rw_from_file_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;

	__wake_caller(&req->args, bserrno);
	free_fs_request(req);
}

/* Runs on the fs thread: issue the async read or write described by
 * op.rw, using the single iov prepared by __send_rw_from_file().
 */
static void
__rw_from_file(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				     __rw_from_file_done, req);
	} else {
		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				      __rw_from_file_done, req);
	}
}

/* Bundles the channel whose semaphore the caller blocks on with the
 * errno reported back by the async completion.
 */
struct rw_from_file_arg {
	struct spdk_fs_channel *channel;
	int rwerrno;
};

/*
 * Dispatch a synchronous-style read or write to the fs thread.  The caller
 * is expected to sem_wait() on the channel semaphore after a 0 return; the
 * completion stores its status in arg->rwerrno.  Returns -ENOMEM when no
 * request could be allocated.
 *
 * NOTE(review): on the -ENOMEM path the semaphore is posted even though the
 * callers return without waiting — confirm the intended semaphore balance.
 */
static int
__send_rw_from_file(struct spdk_file *file, void *payload,
		    uint64_t offset, uint64_t length, bool is_read,
		    struct rw_from_file_arg *arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request_with_iov(arg->channel, 1);
	if (req == NULL) {
		sem_post(&arg->channel->sem);
		return -ENOMEM;
	}

	args = &req->args;
	args->file = file;
	args->sem = &arg->channel->sem;
	args->iovs[0].iov_base = payload;
	args->iovs[0].iov_len = (size_t)length;
	args->op.rw.offset = offset;
	args->op.rw.is_read = is_read;
	args->rwerrno = &arg->rwerrno;
	file->fs->send_request(__rw_from_file, req);
	return 0;
}

/*
 * Synchronous, append-only write.  offset must equal the file's current
 * append position (-EINVAL otherwise).  Data is staged into cache buffers;
 * if no cache buffer is available (e.g. the cache was evicted), the data is
 * written directly to the blob, blocking until the I/O completes.  The blob
 * is resized first when the write extends past its current size.  Returns 0
 * on success or a negative errno.
 */
int
spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *flush_req;
	uint64_t rem_length, copy, blob_size, cluster_sz;
	uint32_t cache_buffers_filled = 0;
	uint8_t *cur_payload;
	struct cache_buffer *last;

	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);

	if (length == 0) {
		return 0;
	}

	if (offset != file->append_pos) {
		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
		return -EINVAL;
	}

	pthread_spin_lock(&file->lock);
	file->open_for_writing = true;

	/* Start a new cache buffer if we are exactly at a buffer boundary
	 * with no partially-filled buffer outstanding.
	 */
	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
		cache_append_buffer(file);
	}

	if (file->last == NULL) {
		/* No cache buffer could be obtained - bypass the cache and
		 * write straight to the blob, blocking until completion.
		 */
		struct rw_from_file_arg arg = {};
		int rc;

		arg.channel = channel;
		arg.rwerrno = 0;
		file->append_pos += length;
		pthread_spin_unlock(&file->lock);
		rc = __send_rw_from_file(file, payload, offset, length, false, &arg);
		if (rc != 0) {
			return rc;
		}
		sem_wait(&channel->sem);
		return arg.rwerrno;
	}

	blob_size = __file_get_blob_size(file);

	if ((offset + length) > blob_size) {
		/* Grow the blob (on the fs thread) before accepting the data;
		 * the stack-allocated args are safe because we block on the
		 * semaphore until the resize completes.
		 */
		struct spdk_fs_cb_args extend_args = {};

		cluster_sz = file->fs->bs_opts.cluster_sz;
		extend_args.sem = &channel->sem;
		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
		extend_args.file = file;
		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
		pthread_spin_unlock(&file->lock);
		file->fs->send_request(__file_extend_blob, &extend_args);
		sem_wait(&channel->sem);
		if (extend_args.rc) {
			return extend_args.rc;
		}
	}

	flush_req = alloc_fs_request(channel);
	if (flush_req == NULL) {
		pthread_spin_unlock(&file->lock);
		return -ENOMEM;
	}

	/* Copy the payload into cache buffers, appending new buffers as each
	 * one fills up.
	 */
	last = file->last;
	rem_length = length;
	cur_payload = payload;
	while (rem_length > 0) {
		copy = last->buf_size - last->bytes_filled;
		if (copy > rem_length) {
			copy = rem_length;
		}
		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
		file->append_pos += copy;
		if (file->length < file->append_pos) {
			file->length = file->append_pos;
		}
		cur_payload += copy;
		last->bytes_filled += copy;
		rem_length -= copy;
		if (last->bytes_filled == last->buf_size) {
			cache_buffers_filled++;
			last = cache_append_buffer(file);
			if (last == NULL) {
				BLOBFS_TRACE(file, "nomem\n");
				free_fs_request(flush_req);
				pthread_spin_unlock(&file->lock);
				return -ENOMEM;
			}
		}
	}

	pthread_spin_unlock(&file->lock);

	/* Only completely filled buffers are flushed eagerly; partial buffers
	 * are flushed on fill or on sync.
	 */
	if (cache_buffers_filled == 0) {
		free_fs_request(flush_req);
		return 0;
	}

	flush_req->args.file = file;
	file->fs->send_request(__file_flush, flush_req);
	return 0;
}

/*
 * Completion for the readahead blob read: mark the cache buffer as
 * populated (filled == flushed == read length, so it will never be written
 * back) and no longer in progress.
 *
 * NOTE(review): bserrno is ignored; a failed readahead still marks the
 * buffer filled — confirm intended behavior.
 */
static void
__readahead_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	free_fs_request(req);
}

/* Runs on the fs thread: issue the blob read that fills the readahead
 * cache buffer prepared by check_readahead().
 */
static void
__readahead(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	offset = args->op.readahead.offset;
	length = args->op.readahead.length;
	assert(length > 0);

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			  args->op.readahead.cache_buffer->buf,
			  start_lba, num_lba, __readahead_done, req);
}

/* Round offset up to the start of the next cache buffer. */
static uint64_t
__next_cache_buffer_offset(uint64_t offset)
{
	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
}

/*
 * Start a readahead for the cache buffer following the given offset, unless
 * that region is already cached, lies beyond EOF, or resources cannot be
 * allocated (readahead is best-effort and failures are silent).
 */
static void
check_readahead(struct spdk_file *file, uint64_t offset,
		struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return;
	}
	args = &req->args;

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free_fs_request(req);
		return;
	}

	args->op.readahead.cache_buffer->in_progress = true;
	/* Read only up to EOF when the final buffer is partial. */
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, req);
}

/*
 * Synchronous read.  Serves each cache-buffer-sized piece either from the
 * cache (fully consumed buffers are released afterwards) or via a blocking
 * read from the blob.  Sequential access beyond CACHE_READAHEAD_THRESHOLD
 * triggers readahead of the following buffers.  Returns the number of bytes
 * read (clamped at append_pos), 0 for reads at/past EOF, or a negative
 * errno on failure.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	struct rw_from_file_arg arg = {};

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	/* Clamp the read to the current end of data. */
	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	/* Track sequential access; non-sequential reads reset the counter. */
	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	arg.channel = channel;
	arg.rwerrno = 0;
	final_length = 0;
	final_offset = offset + length;
	while (offset < final_offset) {
		int ret = 0;
		/* Process at most one cache buffer per iteration. */
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			/* Cache miss - read from the blob, dropping the lock
			 * while the request is dispatched.
			 */
			pthread_spin_unlock(&file->lock);
			ret = __send_rw_from_file(file, payload, offset, length, true, &arg);
			pthread_spin_lock(&file->lock);
			if (ret == 0) {
				sub_reads++;
			}
		} else {
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			/* Release a buffer once it has been read through to
			 * its end; drop the file from the cache-pool list if
			 * that was its last buffer.
			 */
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (ret == 0) {
			final_length += length;
		} else {
			arg.rwerrno = ret;
			break;
		}
		/* NOTE(review): arithmetic on void *payload is a GNU
		 * extension - assumed acceptable for this codebase.
		 */
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	/* Wait for all dispatched blob reads to complete. */
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (arg.rwerrno == 0) {
		return final_length;
	} else {
		return arg.rwerrno;
	}
}

/*
 * Queue a sync of the file: record a sync request for the current append
 * position and kick a flush; the sync completes (via __check_sync_reqs /
 * __file_cache_finish_sync) once everything up to that offset is flushed
 * and the length xattr is persisted.  Calls cb_fn immediately if the file
 * is already synced or on allocation failure.
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_xattr) {
		/* All data up to append_pos is already persisted. */
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}

/* Blocking sync: queue the sync and wait on the channel semaphore for the
 * completion.  Returns the sync status.
 */
int
spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_cb_args args = {};

	args.sem = &channel->sem;
	_file_sync(file, channel, __wake_caller, &args);
	sem_wait(&channel->sem);

	return args.rc;
}

/* Async sync: cb_fn is invoked when the file's data and length metadata
 * are persisted.
 */
void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}

/* Set the file's cache priority hint. */
void
spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
{
	BLOBFS_TRACE(file, "priority=%u\n", priority);
	file->priority = priority;

}

/*
 * Close routines
 */

/*
 * Completion for spdk_blob_close() during close.  If the file was deleted
 * while still open, perform the deferred delete now (blob_delete_cb then
 * owns the request); otherwise invoke the user's close callback.
 */
static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/*
 * Drop one reference to the file.  Fails with -EBADF if the file is not
 * open; only the final reference actually closes the underlying blob.
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Clear file->blob before the async close so no further I/O can be
	 * issued against a closing blob.
	 */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}

/* Completion for the sync that precedes an async close: proceed with the
 * actual close.  NOTE(review): fserrno from the sync is not propagated.
 */
static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}

/* Async close: sync the file first (on the md channel), then release the
 * reference.  cb_fn receives the close status.
 */
void
spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
}

/* Runs on the fs thread: perform the reference-drop half of a blocking
 * close.
 */
static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}

/* Blocking close: sync the file, then close it on the fs thread and wait
 * on the channel semaphore.  Returns the close status.
 */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}

/*
 * Copy the file's blob ID into id.  Returns the number of bytes written
 * (sizeof(spdk_blob_id)), or -EINVAL if the buffer is too small.
 */
int
spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
{
	if (size < sizeof(spdk_blob_id)) {
		return -EINVAL;
	}

	memcpy(id, &file->blobid, sizeof(spdk_blob_id));

	return sizeof(spdk_blob_id);
}

/*
 * Final free, executed on g_cache_pool_thread so the g_caches removal
 * happens on the thread that owns the list.  The file is expected to still
 * be on g_caches here (it held cache buffers when file_free() ran).
 */
static void
_file_free(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);

	free(file->name);
	free(file->tree);
	free(file);
}

/*
 * Free a file object.  If it holds no cache buffers it can be freed
 * directly; otherwise its buffers are released and the final free is
 * handed off to the cache-pool thread via _file_free().
 */
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	/* NOTE(review): the spinlock is released after the send_msg; the
	 * cache-pool thread only touches g_caches and frees the object, so
	 * no further access through this lock is expected.
	 */
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}

SPDK_LOG_REGISTER_COMPONENT(blobfs)
SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)