/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "tree.h"

#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/log.h"
#include "spdk/trace.h"

/* Debug-log helpers that prefix every message with the file's name.
 * BLOBFS_TRACE logs under the "blobfs" flag, BLOBFS_TRACE_RW under
 * the separate (noisier) "blobfs_rw" flag used on the read/write path.
 */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)

/* Default size of the global data-cache mempool: 4 GiB. */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
/* Default blobstore cluster size when the caller passes no options: 1 MiB. */
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* bstype signature written into the blobstore super block to mark it as blobfs. */
#define SPDK_BLOBFS_SIGNATURE	"BLOBFS"

/* Global cache state, shared by every mounted blobfs instance.  The cache
 * mempool and its management poller live on a dedicated SPDK thread
 * (g_cache_pool_thread); creation/teardown is refcounted by g_fs_count
 * under g_cache_init_lock.
 */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;

/* Tracepoint IDs for the blobfs trace group. */
#define TRACE_GROUP_BLOBFS	0x7
#define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
#define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
#define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)

/* Register human-readable descriptions for the blobfs tracepoints; each
 * carries the (truncated) file name as a string argument.
 */
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	spdk_trace_register_description("BLOBFS_XATTR_START",
					TRACE_BLOBFS_XATTR_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file");
	spdk_trace_register_description("BLOBFS_XATTR_END",
					TRACE_BLOBFS_XATTR_END,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file");
	spdk_trace_register_description("BLOBFS_OPEN",
					TRACE_BLOBFS_OPEN,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file");
	spdk_trace_register_description("BLOBFS_CLOSE",
					TRACE_BLOBFS_CLOSE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file");
	spdk_trace_register_description("BLOBFS_DELETE_START",
					TRACE_BLOBFS_DELETE_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file");
	spdk_trace_register_description("BLOBFS_DELETE_DONE",
					TRACE_BLOBFS_DELETE_DONE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file");
}

/* Return a cache buffer's data page to the global cache mempool and free
 * the buffer's metadata.
 */
void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

/* Sequential-write byte count after which readahead kicks in (see users
 * elsewhere in this file).
 */
#define CACHE_READAHEAD_THRESHOLD  (128 * 1024)

/* In-memory representation of one blobfs file, backed by a single blob. */
struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;
	char			*name;
	/* First bytes of the name packed into a u64 for trace records. */
	uint64_t		trace_arg_name;
	uint64_t		length;
	bool			is_deleted;
	bool			open_for_writing;
	/* Bytes flushed to the blob so far. */
	uint64_t		length_flushed;
	/* Length recorded in the blob's "length" xattr. */
	uint64_t		length_xattr;
	uint64_t		append_pos;
	/* Sequential-access bookkeeping used to trigger readahead. */
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	uint32_t		priority;
	TAILQ_ENTRY(spdk_file)	tailq;
	spdk_blob_id		blobid;
	uint32_t		ref_count;
	pthread_spinlock_t	lock;
	/* Most recently appended cache buffer, and the cache tree itself. */
	struct cache_buffer	*last;
	struct cache_tree	*tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};

/* Blob that was marked deleted but not yet reclaimed when the fs was
 * last unloaded; cleaned up during spdk_fs_load().
 */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};

/* One mounted blobfs instance.  Three io_devices are registered per fs:
 * md (metadata), sync, and io; each hands out spdk_fs_channel contexts.
 */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	fs_send_request_fn	send_request;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		uint32_t		max_ops;
	} io_target;
};

/* Per-request context.  NOTE: this is the first member of struct
 * spdk_fs_request, so a pointer to the args is numerically equal to a
 * pointer to its enclosing request.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;
	/* Semaphore used by the synchronous wrappers to block the caller. */
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	/* Shared error slot for multi-part read/write operations. */
	int *rwerrno;
	struct iovec *iovs;
	uint32_t iovcnt;
	/* Inline single-element iovec used when iovcnt <= 1. */
	struct iovec iov;
	/* Operation-specific parameters; only the member matching the
	 * in-flight operation is valid.
	 */
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t			length;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};

static void file_free(struct spdk_file *file);
static void fs_io_device_unregister(struct spdk_filesystem *fs);
static void fs_free_io_channels(struct spdk_filesystem *fs);

/* Initialize blobfs options to their defaults. */
void
spdk_fs_opts_init(struct
spdk_blobfs_opts *opts) 258 { 259 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 260 } 261 262 static int _blobfs_cache_pool_reclaim(void *arg); 263 264 static bool 265 blobfs_cache_pool_need_reclaim(void) 266 { 267 size_t count; 268 269 count = spdk_mempool_count(g_cache_pool); 270 /* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller 271 * when the number of available cache buffer is less than 1/5 of total buffers. 272 */ 273 if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) { 274 return false; 275 } 276 277 return true; 278 } 279 280 static void 281 __start_cache_pool_mgmt(void *ctx) 282 { 283 assert(g_cache_pool == NULL); 284 285 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 286 g_fs_cache_size / CACHE_BUFFER_SIZE, 287 CACHE_BUFFER_SIZE, 288 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 289 SPDK_ENV_SOCKET_ID_ANY); 290 if (!g_cache_pool) { 291 SPDK_ERRLOG("Create mempool failed, you may " 292 "increase the memory and try again\n"); 293 assert(false); 294 } 295 296 assert(g_cache_pool_mgmt_poller == NULL); 297 g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL, 298 BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); 299 } 300 301 static void 302 __stop_cache_pool_mgmt(void *ctx) 303 { 304 spdk_poller_unregister(&g_cache_pool_mgmt_poller); 305 306 assert(g_cache_pool != NULL); 307 assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE); 308 spdk_mempool_free(g_cache_pool); 309 g_cache_pool = NULL; 310 311 spdk_thread_exit(g_cache_pool_thread); 312 } 313 314 static void 315 initialize_global_cache(void) 316 { 317 pthread_mutex_lock(&g_cache_init_lock); 318 if (g_fs_count == 0) { 319 g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL); 320 assert(g_cache_pool_thread != NULL); 321 spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL); 322 } 323 g_fs_count++; 324 pthread_mutex_unlock(&g_cache_init_lock); 325 } 326 327 static void 
328 free_global_cache(void) 329 { 330 pthread_mutex_lock(&g_cache_init_lock); 331 g_fs_count--; 332 if (g_fs_count == 0) { 333 spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL); 334 } 335 pthread_mutex_unlock(&g_cache_init_lock); 336 } 337 338 static uint64_t 339 __file_get_blob_size(struct spdk_file *file) 340 { 341 uint64_t cluster_sz; 342 343 cluster_sz = file->fs->bs_opts.cluster_sz; 344 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 345 } 346 347 struct spdk_fs_request { 348 struct spdk_fs_cb_args args; 349 TAILQ_ENTRY(spdk_fs_request) link; 350 struct spdk_fs_channel *channel; 351 }; 352 353 struct spdk_fs_channel { 354 struct spdk_fs_request *req_mem; 355 TAILQ_HEAD(, spdk_fs_request) reqs; 356 sem_t sem; 357 struct spdk_filesystem *fs; 358 struct spdk_io_channel *bs_channel; 359 fs_send_request_fn send_request; 360 bool sync; 361 uint32_t outstanding_reqs; 362 pthread_spinlock_t lock; 363 }; 364 365 /* For now, this is effectively an alias. But eventually we'll shift 366 * some data members over. 
*/ 367 struct spdk_fs_thread_ctx { 368 struct spdk_fs_channel ch; 369 }; 370 371 static struct spdk_fs_request * 372 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 373 { 374 struct spdk_fs_request *req; 375 struct iovec *iovs = NULL; 376 377 if (iovcnt > 1) { 378 iovs = calloc(iovcnt, sizeof(struct iovec)); 379 if (!iovs) { 380 return NULL; 381 } 382 } 383 384 if (channel->sync) { 385 pthread_spin_lock(&channel->lock); 386 } 387 388 req = TAILQ_FIRST(&channel->reqs); 389 if (req) { 390 channel->outstanding_reqs++; 391 TAILQ_REMOVE(&channel->reqs, req, link); 392 } 393 394 if (channel->sync) { 395 pthread_spin_unlock(&channel->lock); 396 } 397 398 if (req == NULL) { 399 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 400 free(iovs); 401 return NULL; 402 } 403 memset(req, 0, sizeof(*req)); 404 req->channel = channel; 405 if (iovcnt > 1) { 406 req->args.iovs = iovs; 407 } else { 408 req->args.iovs = &req->args.iov; 409 } 410 req->args.iovcnt = iovcnt; 411 412 return req; 413 } 414 415 static struct spdk_fs_request * 416 alloc_fs_request(struct spdk_fs_channel *channel) 417 { 418 return alloc_fs_request_with_iov(channel, 0); 419 } 420 421 static void 422 free_fs_request(struct spdk_fs_request *req) 423 { 424 struct spdk_fs_channel *channel = req->channel; 425 426 if (req->args.iovcnt > 1) { 427 free(req->args.iovs); 428 } 429 430 if (channel->sync) { 431 pthread_spin_lock(&channel->lock); 432 } 433 434 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 435 channel->outstanding_reqs--; 436 437 if (channel->sync) { 438 pthread_spin_unlock(&channel->lock); 439 } 440 } 441 442 static int 443 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 444 uint32_t max_ops) 445 { 446 uint32_t i; 447 448 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 449 if (!channel->req_mem) { 450 return -1; 451 } 452 453 channel->outstanding_reqs = 0; 454 TAILQ_INIT(&channel->reqs); 455 
sem_init(&channel->sem, 0, 0); 456 457 for (i = 0; i < max_ops; i++) { 458 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 459 } 460 461 channel->fs = fs; 462 463 return 0; 464 } 465 466 static int 467 fs_md_channel_create(void *io_device, void *ctx_buf) 468 { 469 struct spdk_filesystem *fs; 470 struct spdk_fs_channel *channel = ctx_buf; 471 472 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 473 474 return fs_channel_create(fs, channel, fs->md_target.max_ops); 475 } 476 477 static int 478 fs_sync_channel_create(void *io_device, void *ctx_buf) 479 { 480 struct spdk_filesystem *fs; 481 struct spdk_fs_channel *channel = ctx_buf; 482 483 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 484 485 return fs_channel_create(fs, channel, fs->sync_target.max_ops); 486 } 487 488 static int 489 fs_io_channel_create(void *io_device, void *ctx_buf) 490 { 491 struct spdk_filesystem *fs; 492 struct spdk_fs_channel *channel = ctx_buf; 493 494 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 495 496 return fs_channel_create(fs, channel, fs->io_target.max_ops); 497 } 498 499 static void 500 fs_channel_destroy(void *io_device, void *ctx_buf) 501 { 502 struct spdk_fs_channel *channel = ctx_buf; 503 504 if (channel->outstanding_reqs > 0) { 505 SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n", 506 channel->outstanding_reqs); 507 } 508 509 free(channel->req_mem); 510 if (channel->bs_channel != NULL) { 511 spdk_bs_free_io_channel(channel->bs_channel); 512 } 513 } 514 515 static void 516 __send_request_direct(fs_request_fn fn, void *arg) 517 { 518 fn(arg); 519 } 520 521 static void 522 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 523 { 524 fs->bs = bs; 525 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 526 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 527 fs->md_target.md_fs_channel->send_request = __send_request_direct; 528 
fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 529 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 530 531 initialize_global_cache(); 532 } 533 534 static void 535 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 536 { 537 struct spdk_fs_request *req = ctx; 538 struct spdk_fs_cb_args *args = &req->args; 539 struct spdk_filesystem *fs = args->fs; 540 541 if (bserrno == 0) { 542 common_fs_bs_init(fs, bs); 543 } else { 544 free(fs); 545 fs = NULL; 546 } 547 548 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 549 free_fs_request(req); 550 } 551 552 static struct spdk_filesystem * 553 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 554 { 555 struct spdk_filesystem *fs; 556 557 fs = calloc(1, sizeof(*fs)); 558 if (fs == NULL) { 559 return NULL; 560 } 561 562 fs->bdev = dev; 563 fs->send_request = send_request_fn; 564 TAILQ_INIT(&fs->files); 565 566 fs->md_target.max_ops = 512; 567 spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy, 568 sizeof(struct spdk_fs_channel), "blobfs_md"); 569 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 570 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 571 572 fs->sync_target.max_ops = 512; 573 spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy, 574 sizeof(struct spdk_fs_channel), "blobfs_sync"); 575 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 576 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 577 578 fs->io_target.max_ops = 512; 579 spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy, 580 sizeof(struct spdk_fs_channel), "blobfs_io"); 581 582 return fs; 583 } 584 585 static void 586 __wake_caller(void *arg, int fserrno) 587 { 588 struct spdk_fs_cb_args *args = arg; 589 590 if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && 
fserrno) { 591 *(args->rwerrno) = fserrno; 592 } 593 args->rc = fserrno; 594 sem_post(args->sem); 595 } 596 597 void 598 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 599 fs_send_request_fn send_request_fn, 600 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 601 { 602 struct spdk_filesystem *fs; 603 struct spdk_fs_request *req; 604 struct spdk_fs_cb_args *args; 605 struct spdk_bs_opts opts = {}; 606 607 fs = fs_alloc(dev, send_request_fn); 608 if (fs == NULL) { 609 cb_fn(cb_arg, NULL, -ENOMEM); 610 return; 611 } 612 613 req = alloc_fs_request(fs->md_target.md_fs_channel); 614 if (req == NULL) { 615 fs_free_io_channels(fs); 616 fs_io_device_unregister(fs); 617 cb_fn(cb_arg, NULL, -ENOMEM); 618 return; 619 } 620 621 args = &req->args; 622 args->fn.fs_op_with_handle = cb_fn; 623 args->arg = cb_arg; 624 args->fs = fs; 625 626 spdk_bs_opts_init(&opts, sizeof(opts)); 627 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE); 628 if (opt) { 629 opts.cluster_sz = opt->cluster_sz; 630 } 631 spdk_bs_init(dev, &opts, init_cb, req); 632 } 633 634 static struct spdk_file * 635 file_alloc(struct spdk_filesystem *fs) 636 { 637 struct spdk_file *file; 638 639 file = calloc(1, sizeof(*file)); 640 if (file == NULL) { 641 return NULL; 642 } 643 644 file->tree = calloc(1, sizeof(*file->tree)); 645 if (file->tree == NULL) { 646 free(file); 647 return NULL; 648 } 649 650 if (pthread_spin_init(&file->lock, 0)) { 651 free(file->tree); 652 free(file); 653 return NULL; 654 } 655 656 file->fs = fs; 657 TAILQ_INIT(&file->open_requests); 658 TAILQ_INIT(&file->sync_requests); 659 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 660 file->priority = SPDK_FILE_PRIORITY_LOW; 661 return file; 662 } 663 664 static void fs_load_done(void *ctx, int bserrno); 665 666 static int 667 _handle_deleted_files(struct spdk_fs_request *req) 668 { 669 struct spdk_fs_cb_args *args = &req->args; 670 struct spdk_filesystem *fs = args->fs; 671 672 if 
(!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 673 struct spdk_deleted_file *deleted_file; 674 675 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 676 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 677 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 678 free(deleted_file); 679 return 0; 680 } 681 682 return 1; 683 } 684 685 static void 686 fs_load_done(void *ctx, int bserrno) 687 { 688 struct spdk_fs_request *req = ctx; 689 struct spdk_fs_cb_args *args = &req->args; 690 struct spdk_filesystem *fs = args->fs; 691 692 /* The filesystem has been loaded. Now check if there are any files that 693 * were marked for deletion before last unload. Do not complete the 694 * fs_load callback until all of them have been deleted on disk. 695 */ 696 if (_handle_deleted_files(req) == 0) { 697 /* We found a file that's been marked for deleting but not actually 698 * deleted yet. This function will get called again once the delete 699 * operation is completed. 
700 */ 701 return; 702 } 703 704 args->fn.fs_op_with_handle(args->arg, fs, 0); 705 free_fs_request(req); 706 707 } 708 709 static void 710 _file_build_trace_arg_name(struct spdk_file *f) 711 { 712 f->trace_arg_name = 0; 713 memcpy(&f->trace_arg_name, f->name, 714 spdk_min(sizeof(f->trace_arg_name), strlen(f->name))); 715 } 716 717 static void 718 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 719 { 720 struct spdk_fs_request *req = ctx; 721 struct spdk_fs_cb_args *args = &req->args; 722 struct spdk_filesystem *fs = args->fs; 723 uint64_t *length; 724 const char *name; 725 uint32_t *is_deleted; 726 size_t value_len; 727 728 if (rc < 0) { 729 args->fn.fs_op_with_handle(args->arg, fs, rc); 730 free_fs_request(req); 731 return; 732 } 733 734 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 735 if (rc < 0) { 736 args->fn.fs_op_with_handle(args->arg, fs, rc); 737 free_fs_request(req); 738 return; 739 } 740 741 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 742 if (rc < 0) { 743 args->fn.fs_op_with_handle(args->arg, fs, rc); 744 free_fs_request(req); 745 return; 746 } 747 748 assert(value_len == 8); 749 750 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 751 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 752 if (rc < 0) { 753 struct spdk_file *f; 754 755 f = file_alloc(fs); 756 if (f == NULL) { 757 SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n"); 758 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 759 free_fs_request(req); 760 return; 761 } 762 763 f->name = strdup(name); 764 _file_build_trace_arg_name(f); 765 f->blobid = spdk_blob_get_id(blob); 766 f->length = *length; 767 f->length_flushed = *length; 768 f->length_xattr = *length; 769 f->append_pos = *length; 770 SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length); 771 } else { 772 struct spdk_deleted_file 
*deleted_file; 773 774 deleted_file = calloc(1, sizeof(*deleted_file)); 775 if (deleted_file == NULL) { 776 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 777 free_fs_request(req); 778 return; 779 } 780 deleted_file->id = spdk_blob_get_id(blob); 781 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 782 } 783 } 784 785 static void 786 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 787 { 788 struct spdk_fs_request *req = ctx; 789 struct spdk_fs_cb_args *args = &req->args; 790 struct spdk_filesystem *fs = args->fs; 791 struct spdk_bs_type bstype; 792 static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE}; 793 static const struct spdk_bs_type zeros; 794 795 if (bserrno != 0) { 796 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 797 free_fs_request(req); 798 fs_free_io_channels(fs); 799 fs_io_device_unregister(fs); 800 return; 801 } 802 803 bstype = spdk_bs_get_bstype(bs); 804 805 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 806 SPDK_DEBUGLOG(blobfs, "assigning bstype\n"); 807 spdk_bs_set_bstype(bs, blobfs_type); 808 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 809 SPDK_ERRLOG("not blobfs\n"); 810 SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype)); 811 args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL); 812 free_fs_request(req); 813 fs_free_io_channels(fs); 814 fs_io_device_unregister(fs); 815 return; 816 } 817 818 common_fs_bs_init(fs, bs); 819 fs_load_done(req, 0); 820 } 821 822 static void 823 fs_io_device_unregister(struct spdk_filesystem *fs) 824 { 825 assert(fs != NULL); 826 spdk_io_device_unregister(&fs->md_target, NULL); 827 spdk_io_device_unregister(&fs->sync_target, NULL); 828 spdk_io_device_unregister(&fs->io_target, NULL); 829 free(fs); 830 } 831 832 static void 833 fs_free_io_channels(struct spdk_filesystem *fs) 834 { 835 assert(fs != NULL); 836 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 837 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 838 } 839 
840 void 841 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 842 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 843 { 844 struct spdk_filesystem *fs; 845 struct spdk_fs_cb_args *args; 846 struct spdk_fs_request *req; 847 struct spdk_bs_opts bs_opts; 848 849 fs = fs_alloc(dev, send_request_fn); 850 if (fs == NULL) { 851 cb_fn(cb_arg, NULL, -ENOMEM); 852 return; 853 } 854 855 req = alloc_fs_request(fs->md_target.md_fs_channel); 856 if (req == NULL) { 857 fs_free_io_channels(fs); 858 fs_io_device_unregister(fs); 859 cb_fn(cb_arg, NULL, -ENOMEM); 860 return; 861 } 862 863 args = &req->args; 864 args->fn.fs_op_with_handle = cb_fn; 865 args->arg = cb_arg; 866 args->fs = fs; 867 TAILQ_INIT(&args->op.fs_load.deleted_files); 868 spdk_bs_opts_init(&bs_opts, sizeof(bs_opts)); 869 bs_opts.iter_cb_fn = iter_cb; 870 bs_opts.iter_cb_arg = req; 871 spdk_bs_load(dev, &bs_opts, load_cb, req); 872 } 873 874 static void 875 unload_cb(void *ctx, int bserrno) 876 { 877 struct spdk_fs_request *req = ctx; 878 struct spdk_fs_cb_args *args = &req->args; 879 struct spdk_filesystem *fs = args->fs; 880 struct spdk_file *file, *tmp; 881 882 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 883 TAILQ_REMOVE(&fs->files, file, tailq); 884 file_free(file); 885 } 886 887 free_global_cache(); 888 889 args->fn.fs_op(args->arg, bserrno); 890 free(req); 891 892 fs_io_device_unregister(fs); 893 } 894 895 void 896 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 897 { 898 struct spdk_fs_request *req; 899 struct spdk_fs_cb_args *args; 900 901 /* 902 * We must free the md_channel before unloading the blobstore, so just 903 * allocate this request from the general heap. 
904 */ 905 req = calloc(1, sizeof(*req)); 906 if (req == NULL) { 907 cb_fn(cb_arg, -ENOMEM); 908 return; 909 } 910 911 args = &req->args; 912 args->fn.fs_op = cb_fn; 913 args->arg = cb_arg; 914 args->fs = fs; 915 916 fs_free_io_channels(fs); 917 spdk_bs_unload(fs->bs, unload_cb, req); 918 } 919 920 static struct spdk_file * 921 fs_find_file(struct spdk_filesystem *fs, const char *name) 922 { 923 struct spdk_file *file; 924 925 TAILQ_FOREACH(file, &fs->files, tailq) { 926 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 927 return file; 928 } 929 } 930 931 return NULL; 932 } 933 934 void 935 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 936 spdk_file_stat_op_complete cb_fn, void *cb_arg) 937 { 938 struct spdk_file_stat stat; 939 struct spdk_file *f = NULL; 940 941 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 942 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 943 return; 944 } 945 946 f = fs_find_file(fs, name); 947 if (f != NULL) { 948 stat.blobid = f->blobid; 949 stat.size = f->append_pos >= f->length ? 
f->append_pos : f->length; 950 cb_fn(cb_arg, &stat, 0); 951 return; 952 } 953 954 cb_fn(cb_arg, NULL, -ENOENT); 955 } 956 957 static void 958 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 959 { 960 struct spdk_fs_request *req = arg; 961 struct spdk_fs_cb_args *args = &req->args; 962 963 args->rc = fserrno; 964 if (fserrno == 0) { 965 memcpy(args->arg, stat, sizeof(*stat)); 966 } 967 sem_post(args->sem); 968 } 969 970 static void 971 __file_stat(void *arg) 972 { 973 struct spdk_fs_request *req = arg; 974 struct spdk_fs_cb_args *args = &req->args; 975 976 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 977 args->fn.stat_op, req); 978 } 979 980 int 981 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 982 const char *name, struct spdk_file_stat *stat) 983 { 984 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 985 struct spdk_fs_request *req; 986 int rc; 987 988 req = alloc_fs_request(channel); 989 if (req == NULL) { 990 SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name); 991 return -ENOMEM; 992 } 993 994 req->args.fs = fs; 995 req->args.op.stat.name = name; 996 req->args.fn.stat_op = __copy_stat; 997 req->args.arg = stat; 998 req->args.sem = &channel->sem; 999 channel->send_request(__file_stat, req); 1000 sem_wait(&channel->sem); 1001 1002 rc = req->args.rc; 1003 free_fs_request(req); 1004 1005 return rc; 1006 } 1007 1008 static void 1009 fs_create_blob_close_cb(void *ctx, int bserrno) 1010 { 1011 int rc; 1012 struct spdk_fs_request *req = ctx; 1013 struct spdk_fs_cb_args *args = &req->args; 1014 1015 rc = args->rc ? 
args->rc : bserrno; 1016 args->fn.file_op(args->arg, rc); 1017 free_fs_request(req); 1018 } 1019 1020 static void 1021 fs_create_blob_resize_cb(void *ctx, int bserrno) 1022 { 1023 struct spdk_fs_request *req = ctx; 1024 struct spdk_fs_cb_args *args = &req->args; 1025 struct spdk_file *f = args->file; 1026 struct spdk_blob *blob = args->op.create.blob; 1027 uint64_t length = 0; 1028 1029 args->rc = bserrno; 1030 if (bserrno) { 1031 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1032 return; 1033 } 1034 1035 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 1036 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 1037 1038 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1039 } 1040 1041 static void 1042 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1043 { 1044 struct spdk_fs_request *req = ctx; 1045 struct spdk_fs_cb_args *args = &req->args; 1046 1047 if (bserrno) { 1048 args->fn.file_op(args->arg, bserrno); 1049 free_fs_request(req); 1050 return; 1051 } 1052 1053 args->op.create.blob = blob; 1054 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 1055 } 1056 1057 static void 1058 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 1059 { 1060 struct spdk_fs_request *req = ctx; 1061 struct spdk_fs_cb_args *args = &req->args; 1062 struct spdk_file *f = args->file; 1063 1064 if (bserrno) { 1065 args->fn.file_op(args->arg, bserrno); 1066 free_fs_request(req); 1067 return; 1068 } 1069 1070 f->blobid = blobid; 1071 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 1072 } 1073 1074 void 1075 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 1076 spdk_file_op_complete cb_fn, void *cb_arg) 1077 { 1078 struct spdk_file *file; 1079 struct spdk_fs_request *req; 1080 struct spdk_fs_cb_args *args; 1081 1082 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1083 cb_fn(cb_arg, -ENAMETOOLONG); 1084 return; 1085 } 1086 1087 file = 
fs_find_file(fs, name);
	if (file != NULL) {
		/* A file with this name already exists - creation must fail. */
		cb_fn(cb_arg, -EEXIST);
		return;
	}

	file = file_alloc(fs);
	if (file == NULL) {
		SPDK_ERRLOG("Cannot allocate new file for creation\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
		/* Undo file_alloc(): it linked the file into fs->files. */
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	file->name = strdup(name);
	if (!file->name) {
		SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name);
		free_fs_request(req);
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	_file_build_trace_arg_name(file);
	/* Completion (fs_create_blob_create_cb) will attach the new blob to the file. */
	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
}

/*
 * Completion for the synchronous create path: wake the caller blocked in
 * spdk_fs_create_file() and propagate fserrno through args->rc.
 */
static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
}

/*
 * Runs on the metadata thread (dispatched via fs->send_request): kicks off the
 * async create on behalf of a synchronous caller.
 */
static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}

/*
 * Synchronous file creation. Sends the request to the metadata thread and
 * blocks on the channel semaphore until __fs_create_file_done() posts it.
 * Returns 0 on success or a negative errno (-ENOMEM, -EEXIST, ...).
 */
int
spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.create.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/*
 * Blob-open completion. The opened blob is shared by every queued open
 * request for this file, so drain f->open_requests and complete each one.
 * Note bserrno is delivered to all waiters, success or failure alike.
 */
static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}

/*
 * Second stage of an open: called either directly (file already existed) or
 * as the completion of spdk_fs_create_file_async() (open with CREATE flag).
 * Takes a reference on the file and opens its blob exactly once; concurrent
 * opens queue on f->open_requests and are completed by fs_open_blob_done().
 */
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 * is now created so look it up in the file list for this
		 * filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 * open request.  When that open completes, it will invoke the
		 * open callback for this request.
		 */
	}
}

/*
 * Asynchronous open. Fails with -ENAMETOOLONG for oversized names, -ENOENT
 * when the file is absent (without SPDK_BLOBFS_OPEN_CREATE) or pending
 * deletion. With CREATE, a missing file is created first.
 */
void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	if (f != NULL && f->is_deleted == true) {
		/* File is marked for deletion (still referenced) - treat as gone. */
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

/* Completion for the synchronous open path: stash the handle, wake the caller. */
static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	__wake_caller(args, bserrno);
	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
}

/* Metadata-thread trampoline for the synchronous open path. */
static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

int
1296 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1297 const char *name, uint32_t flags, struct spdk_file **file) 1298 { 1299 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1300 struct spdk_fs_request *req; 1301 struct spdk_fs_cb_args *args; 1302 int rc; 1303 1304 SPDK_DEBUGLOG(blobfs, "file=%s\n", name); 1305 1306 req = alloc_fs_request(channel); 1307 if (req == NULL) { 1308 SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name); 1309 return -ENOMEM; 1310 } 1311 1312 args = &req->args; 1313 args->fs = fs; 1314 args->op.open.name = name; 1315 args->op.open.flags = flags; 1316 args->sem = &channel->sem; 1317 fs->send_request(__fs_open_file, req); 1318 sem_wait(&channel->sem); 1319 rc = args->rc; 1320 if (rc == 0) { 1321 *file = args->file; 1322 } else { 1323 *file = NULL; 1324 } 1325 free_fs_request(req); 1326 1327 return rc; 1328 } 1329 1330 static void 1331 fs_rename_blob_close_cb(void *ctx, int bserrno) 1332 { 1333 struct spdk_fs_request *req = ctx; 1334 struct spdk_fs_cb_args *args = &req->args; 1335 1336 args->fn.fs_op(args->arg, bserrno); 1337 free_fs_request(req); 1338 } 1339 1340 static void 1341 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1342 { 1343 struct spdk_fs_request *req = ctx; 1344 struct spdk_fs_cb_args *args = &req->args; 1345 const char *new_name = args->op.rename.new_name; 1346 1347 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1348 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1349 } 1350 1351 static void 1352 _fs_md_rename_file(struct spdk_fs_request *req) 1353 { 1354 struct spdk_fs_cb_args *args = &req->args; 1355 struct spdk_file *f; 1356 1357 f = fs_find_file(args->fs, args->op.rename.old_name); 1358 if (f == NULL) { 1359 args->fn.fs_op(args->arg, -ENOENT); 1360 free_fs_request(req); 1361 return; 1362 } 1363 1364 free(f->name); 1365 f->name = strdup(args->op.rename.new_name); 1366 _file_build_trace_arg_name(f); 1367 args->file = 
f; 1368 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1369 } 1370 1371 static void 1372 fs_rename_delete_done(void *arg, int fserrno) 1373 { 1374 _fs_md_rename_file(arg); 1375 } 1376 1377 void 1378 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1379 const char *old_name, const char *new_name, 1380 spdk_file_op_complete cb_fn, void *cb_arg) 1381 { 1382 struct spdk_file *f; 1383 struct spdk_fs_request *req; 1384 struct spdk_fs_cb_args *args; 1385 1386 SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name); 1387 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1388 cb_fn(cb_arg, -ENAMETOOLONG); 1389 return; 1390 } 1391 1392 req = alloc_fs_request(fs->md_target.md_fs_channel); 1393 if (req == NULL) { 1394 SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name, 1395 new_name); 1396 cb_fn(cb_arg, -ENOMEM); 1397 return; 1398 } 1399 1400 args = &req->args; 1401 args->fn.fs_op = cb_fn; 1402 args->fs = fs; 1403 args->arg = cb_arg; 1404 args->op.rename.old_name = old_name; 1405 args->op.rename.new_name = new_name; 1406 1407 f = fs_find_file(fs, new_name); 1408 if (f == NULL) { 1409 _fs_md_rename_file(req); 1410 return; 1411 } 1412 1413 /* 1414 * The rename overwrites an existing file. So delete the existing file, then 1415 * do the actual rename. 
 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

/* Completion for the synchronous rename path: wake the blocked caller. */
static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
}

/* Metadata-thread trampoline for the synchronous rename path. */
static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

/*
 * Synchronous rename: dispatch to the metadata thread and block on the
 * channel semaphore. Returns 0 or a negative errno.
 */
int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
		return -ENOMEM;
	}

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

/* Shared completion for blob delete / deferred-delete sync-md operations. */
static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/*
 * Asynchronous delete. If the file is still open (ref_count > 0) it is only
 * marked deleted (persisted via the "is_deleted" xattr) and actually removed
 * when the last reference is closed; otherwise the blob is deleted now.
 */
void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL) {
		SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name);
		cb_fn(cb_arg, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	if (f->ref_count > 0) {
		/* If the ref > 0, we mark the file as deleted and delete it when we close it. */
		f->is_deleted = true;
		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
		return;
	}

	blobid = f->blobid;
	TAILQ_REMOVE(&fs->files, f, tailq);

	file_free(f);

	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
}

/*
 * Pack the first up-to-8 bytes of a file name into a uint64_t for compact
 * trace records. Bytes beyond strlen(name) are left zero.
 */
static uint64_t
fs_name_to_uint64(const char *name)
{
	uint64_t result = 0;
	memcpy(&result, name, spdk_min(sizeof(result), strlen(name)));
	return result;
}

/* Completion for the synchronous delete path: trace, then wake the caller. */
static void
__fs_delete_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
	__wake_caller(args, fserrno);
}

/* Metadata-thread trampoline for the synchronous delete path. */
static void
__fs_delete_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
}

/*
 * Synchronous delete: dispatch to the metadata thread and block on the
 * channel semaphore. Returns 0 or a negative errno.
 */
int
spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		/* NOTE(review): siblings use SPDK_ERRLOG for this case - confirm the
		 * DEBUGLOG level here is intentional. */
		SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.delete.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_delete_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* Begin iteration over the filesystem's file list (NULL when empty). */
spdk_fs_iter
spdk_fs_iter_first(struct spdk_filesystem *fs)
{
	struct spdk_file *f;

	f = TAILQ_FIRST(&fs->files);
	return f;
}

/* Advance the iterator; NULL-safe and returns NULL at the end of the list. */
spdk_fs_iter
spdk_fs_iter_next(spdk_fs_iter iter)
{
	struct spdk_file *f = iter;

	if (f == NULL) {
		return NULL;
	}

	f = TAILQ_NEXT(f, tailq);
	return f;
}

/* Accessor for the file's name (owned by the file; do not free). */
const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}

/*
 * Logical file length: the larger of the flushed length and the current
 * append position (unflushed cached writes count toward the length).
 */
uint64_t
spdk_file_get_length(struct spdk_file *file)
{
	uint64_t length;

	assert(file != NULL);

	length = file->append_pos >= file->length ?
file->append_pos : file->length;
	SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length);
	return length;
}

/* Final truncate stage: metadata is synced; report status to the caller. */
static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/*
 * Blob resize completed: record the new length in the "length" xattr and the
 * in-memory file state (clamping append_pos on shrink), then sync metadata.
 */
static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}

/* Round a byte length up to whole clusters. */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

/*
 * Asynchronously resize a file's blob to hold `length` bytes. A no-op
 * (immediate success callback) when the length is unchanged.
 */
void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.truncate.length = length;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
}

/* Trampoline for the synchronous truncate path; args->fn.file_op is
 * __wake_caller, so completion wakes the blocked caller directly. */
static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}

/*
 * Synchronous truncate: dispatch to the channel's request thread and block on
 * the channel semaphore. Returns 0 or a negative errno.
 */
int
spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return -ENOMEM;
	}

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __wake_caller;
	args->sem = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* Common final completion for read/write: free the pinned DMA bounce buffer
 * and report status. */
static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/*
 * Gather scattered iovecs into a contiguous buffer, bounded by buf_len.
 * NOTE(review): arithmetic on `void *` is a GNU extension (the file builds
 * with GCC/Clang); ISO C would require a char * cast.
 */
static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

/* Scatter a contiguous buffer out to iovecs, bounded by buf_len (see note
 * on _copy_iovs_to_buf about void * arithmetic). */
static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

/*
 * Blob read completed. For reads, copy the requested window out of the pinned
 * buffer and finish. For (unaligned) writes this was the read half of a
 * read-modify-write: merge the user data in and issue the blob write.
 */
static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	assert(req != NULL);
	/* Offset into the pin_buf where the caller's byte range begins
	 * (pin_buf is LBA-aligned; blocklen is a power of two here). */
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		__rw_done(req, 0);
	} else {
		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}

/* Issue the blob read that backs both plain reads and RMW writes; also used
 * as the completion of a preceding truncate (hence the fserrno check). */
static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (fserrno) {
		__rw_done(req, fserrno);
		return;
	}
	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
			  args->op.rw.pin_buf,
			  args->op.rw.start_lba, args->op.rw.num_lba,
			  __read_done, req);
}

/* Translate a byte range into an inclusive LBA range of the blobstore's
 * io-unit size. Assumes length > 0 (offset+length-1 would underflow at 0). */
static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
{
	uint64_t end_lba;

	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
	*start_lba = offset / *lba_size;
	end_lba = (offset + length - 1) / *lba_size;
	*num_lba = (end_lba - *start_lba + 1);
}

/* True when both offset and length fall on io-unit boundaries, i.e. the I/O
 * can go straight to the blob without read-modify-write. */
static bool
__is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
{
	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);

	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
		return true;
	}

	return false;
}

/* Copy the caller's iovec descriptors into the request (payload memory is
 * not copied). */
static void
_fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
{
	uint32_t i;

	for (i = 0; i < iovcnt; i++) {
		req->args.iovs[i].iov_base = iovs[i].iov_base;
		req->args.iovs[i].iov_len = iovs[i].iov_len;
	}
}

/*
 * Core vectored I/O path shared by all read/write entry points.
 * Allocates an LBA-aligned pinned bounce buffer covering the byte range, then:
 *  - write extending the file: truncate first, then RMW via __do_blob_read;
 *  - aligned write: copy payload in and write the blob directly;
 *  - everything else (reads, unaligned writes): read first (__do_blob_read).
 * Reads past EOF fail immediately with -EINVAL.
 */
static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}

/* Single-buffer adapter: wrap the payload in one iovec and defer to the
 * vectored path. */
static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct iovec iov;

	iov.iov_base = payload;
	iov.iov_len = (size_t)length;

	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
}

/* Asynchronous contiguous-buffer write. */
void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

/* Asynchronous vectored write. */
void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}

/* Asynchronous contiguous-buffer read. */
void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

/* Asynchronous vectored read. */
void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}

/*
 * Allocate an I/O channel bound to this filesystem's io_target; requests on
 * it are executed directly (no cross-thread dispatch).
 */
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

/* Release an I/O channel obtained from spdk_fs_alloc_io_channel(). */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

/*
 * Allocate the context used by the synchronous (blocking) API. Returns NULL
 * on allocation or spinlock-init failure.
 */
struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	if (pthread_spin_init(&ctx->ch.lock, 0)) {
		free(ctx);
		return NULL;
	}

	/* NOTE(review): fs_channel_create()'s result is not checked here -
	 * confirm it cannot fail for a 512-entry request pool. */
	fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;

	return ctx;
}


/*
 * Tear down a synchronous-API context: busy-wait (1 ms polls) until all
 * outstanding requests on the channel have drained, then destroy it.
 */
void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}

/*
 * Set the global cache size (input in MiB). Only allowed while the cache
 * pool does not exist; returns -EPERM otherwise.
 */
int
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	/* setting g_fs_cache_size is only permitted if cache pool
	 * is already freed or hasn't been initialized
	 */
	if (g_cache_pool != NULL) {
		return -EPERM;
	}

g_fs_cache_size = size_in_mb * 1024 * 1024;

	return 0;
}

/* Return the configured cache size in MiB (inverse of spdk_fs_set_cache_size). */
uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *ctx);

/* Try to free some cache buffers from this file.
 * Returns 0 if buffers were freed, -1 if the lock was contended or there was
 * nothing to free.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* The function is safe to be called with any threads, while the file
	 * lock maybe locked by other thread for now, so try to get the file
	 * lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not freed, put it in the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}

/*
 * Cache-pool poller: when the pool is under pressure, reclaim buffers in
 * priority order - (1) low-priority files not open for writing, (2) any file
 * not open for writing, (3) any cached file. Each pass stops after the first
 * successful reclaim.
 */
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return SPDK_POLLER_IDLE;
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				return SPDK_POLLER_BUSY;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}

	return SPDK_POLLER_BUSY;
}

/* Runs on g_cache_pool_thread: link a file into the global reclaim list. */
static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
}

/* Runs on g_cache_pool_thread: unlink a file from the global reclaim list. */
static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
}

/*
 * Allocate a cache buffer for `offset` and insert it into the file's buffer
 * tree. Blocks (polling the mempool up to ~100 ms in 1 ms steps) when the
 * pool is exhausted, relying on the reclaim poller to free buffers; gives up
 * with NULL after 100 retries. Registers the file with the cache pool thread
 * on first buffer.
 */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(blobfs, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}

/*
 * Append a fresh cache buffer at the file's append position. Preconditions
 * (asserted): the previous buffer is full and append_pos is buffer-aligned.
 */
static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);

/*
 * Metadata sync for one sync request finished: record the persisted length,
 * complete the request, and re-scan for further satisfiable sync requests.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->trace_arg_name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Invoke the user callback outside the file lock. */
	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}

/*
 * Find the first pending sync request whose offset is already flushed and,
 * if none is mid-flight, persist the flushed length via the "length" xattr
 * and a metadata sync. Called after every flush/sync completion.
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->trace_arg_name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

static void 2265 __file_flush_done(void *ctx, int bserrno) 2266 { 2267 struct spdk_fs_request *req = ctx; 2268 struct spdk_fs_cb_args *args = &req->args; 2269 struct spdk_file *file = args->file; 2270 struct cache_buffer *next = args->op.flush.cache_buffer; 2271 2272 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 2273 2274 pthread_spin_lock(&file->lock); 2275 next->in_progress = false; 2276 next->bytes_flushed += args->op.flush.length; 2277 file->length_flushed += args->op.flush.length; 2278 if (file->length_flushed > file->length) { 2279 file->length = file->length_flushed; 2280 } 2281 if (next->bytes_flushed == next->buf_size) { 2282 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 2283 next = tree_find_buffer(file->tree, file->length_flushed); 2284 } 2285 2286 /* 2287 * Assert that there is no cached data that extends past the end of the underlying 2288 * blob. 2289 */ 2290 assert(next == NULL || next->offset < __file_get_blob_size(file) || 2291 next->bytes_filled == 0); 2292 2293 pthread_spin_unlock(&file->lock); 2294 2295 __check_sync_reqs(file); 2296 2297 __file_flush(req); 2298 } 2299 2300 static void 2301 __file_flush(void *ctx) 2302 { 2303 struct spdk_fs_request *req = ctx; 2304 struct spdk_fs_cb_args *args = &req->args; 2305 struct spdk_file *file = args->file; 2306 struct cache_buffer *next; 2307 uint64_t offset, length, start_lba, num_lba; 2308 uint32_t lba_size; 2309 2310 pthread_spin_lock(&file->lock); 2311 next = tree_find_buffer(file->tree, file->length_flushed); 2312 if (next == NULL || next->in_progress || 2313 ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) { 2314 /* 2315 * There is either no data to flush, a flush I/O is already in 2316 * progress, or the next buffer is partially filled but there's no 2317 * outstanding request to sync it. 
2318 * So return immediately - if a flush I/O is in progress we will flush 2319 * more data after that is completed, or a partial buffer will get flushed 2320 * when it is either filled or the file is synced. 2321 */ 2322 free_fs_request(req); 2323 if (next == NULL) { 2324 /* 2325 * For cases where a file's cache was evicted, and then the 2326 * file was later appended, we will write the data directly 2327 * to disk and bypass cache. So just update length_flushed 2328 * here to reflect that all data was already written to disk. 2329 */ 2330 file->length_flushed = file->append_pos; 2331 } 2332 pthread_spin_unlock(&file->lock); 2333 if (next == NULL) { 2334 /* 2335 * There is no data to flush, but we still need to check for any 2336 * outstanding sync requests to make sure metadata gets updated. 2337 */ 2338 __check_sync_reqs(file); 2339 } 2340 return; 2341 } 2342 2343 offset = next->offset + next->bytes_flushed; 2344 length = next->bytes_filled - next->bytes_flushed; 2345 if (length == 0) { 2346 free_fs_request(req); 2347 pthread_spin_unlock(&file->lock); 2348 /* 2349 * There is no data to flush, but we still need to check for any 2350 * outstanding sync requests to make sure metadata gets updated. 
2351 */ 2352 __check_sync_reqs(file); 2353 return; 2354 } 2355 args->op.flush.length = length; 2356 args->op.flush.cache_buffer = next; 2357 2358 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2359 2360 next->in_progress = true; 2361 BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n", 2362 offset, length, start_lba, num_lba); 2363 pthread_spin_unlock(&file->lock); 2364 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2365 next->buf + (start_lba * lba_size) - next->offset, 2366 start_lba, num_lba, __file_flush_done, req); 2367 } 2368 2369 static void 2370 __file_extend_done(void *arg, int bserrno) 2371 { 2372 struct spdk_fs_cb_args *args = arg; 2373 2374 __wake_caller(args, bserrno); 2375 } 2376 2377 static void 2378 __file_extend_resize_cb(void *_args, int bserrno) 2379 { 2380 struct spdk_fs_cb_args *args = _args; 2381 struct spdk_file *file = args->file; 2382 2383 if (bserrno) { 2384 __wake_caller(args, bserrno); 2385 return; 2386 } 2387 2388 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2389 } 2390 2391 static void 2392 __file_extend_blob(void *_args) 2393 { 2394 struct spdk_fs_cb_args *args = _args; 2395 struct spdk_file *file = args->file; 2396 2397 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2398 } 2399 2400 static void 2401 __rw_from_file_done(void *ctx, int bserrno) 2402 { 2403 struct spdk_fs_request *req = ctx; 2404 2405 __wake_caller(&req->args, bserrno); 2406 free_fs_request(req); 2407 } 2408 2409 static void 2410 __rw_from_file(void *ctx) 2411 { 2412 struct spdk_fs_request *req = ctx; 2413 struct spdk_fs_cb_args *args = &req->args; 2414 struct spdk_file *file = args->file; 2415 2416 if (args->op.rw.is_read) { 2417 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2418 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2419 __rw_from_file_done, req); 2420 } else { 2421 
spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2422 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2423 __rw_from_file_done, req); 2424 } 2425 } 2426 2427 struct rw_from_file_arg { 2428 struct spdk_fs_channel *channel; 2429 int rwerrno; 2430 }; 2431 2432 static int 2433 __send_rw_from_file(struct spdk_file *file, void *payload, 2434 uint64_t offset, uint64_t length, bool is_read, 2435 struct rw_from_file_arg *arg) 2436 { 2437 struct spdk_fs_request *req; 2438 struct spdk_fs_cb_args *args; 2439 2440 req = alloc_fs_request_with_iov(arg->channel, 1); 2441 if (req == NULL) { 2442 sem_post(&arg->channel->sem); 2443 return -ENOMEM; 2444 } 2445 2446 args = &req->args; 2447 args->file = file; 2448 args->sem = &arg->channel->sem; 2449 args->iovs[0].iov_base = payload; 2450 args->iovs[0].iov_len = (size_t)length; 2451 args->op.rw.offset = offset; 2452 args->op.rw.is_read = is_read; 2453 args->rwerrno = &arg->rwerrno; 2454 file->fs->send_request(__rw_from_file, req); 2455 return 0; 2456 } 2457 2458 int 2459 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2460 void *payload, uint64_t offset, uint64_t length) 2461 { 2462 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2463 struct spdk_fs_request *flush_req; 2464 uint64_t rem_length, copy, blob_size, cluster_sz; 2465 uint32_t cache_buffers_filled = 0; 2466 uint8_t *cur_payload; 2467 struct cache_buffer *last; 2468 2469 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2470 2471 if (length == 0) { 2472 return 0; 2473 } 2474 2475 if (offset != file->append_pos) { 2476 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2477 return -EINVAL; 2478 } 2479 2480 pthread_spin_lock(&file->lock); 2481 file->open_for_writing = true; 2482 2483 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2484 cache_append_buffer(file); 2485 } 2486 2487 if (file->last == NULL) { 2488 struct 
rw_from_file_arg arg = {}; 2489 int rc; 2490 2491 arg.channel = channel; 2492 arg.rwerrno = 0; 2493 file->append_pos += length; 2494 pthread_spin_unlock(&file->lock); 2495 rc = __send_rw_from_file(file, payload, offset, length, false, &arg); 2496 if (rc != 0) { 2497 return rc; 2498 } 2499 sem_wait(&channel->sem); 2500 return arg.rwerrno; 2501 } 2502 2503 blob_size = __file_get_blob_size(file); 2504 2505 if ((offset + length) > blob_size) { 2506 struct spdk_fs_cb_args extend_args = {}; 2507 2508 cluster_sz = file->fs->bs_opts.cluster_sz; 2509 extend_args.sem = &channel->sem; 2510 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2511 extend_args.file = file; 2512 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2513 pthread_spin_unlock(&file->lock); 2514 file->fs->send_request(__file_extend_blob, &extend_args); 2515 sem_wait(&channel->sem); 2516 if (extend_args.rc) { 2517 return extend_args.rc; 2518 } 2519 } 2520 2521 flush_req = alloc_fs_request(channel); 2522 if (flush_req == NULL) { 2523 pthread_spin_unlock(&file->lock); 2524 return -ENOMEM; 2525 } 2526 2527 last = file->last; 2528 rem_length = length; 2529 cur_payload = payload; 2530 while (rem_length > 0) { 2531 copy = last->buf_size - last->bytes_filled; 2532 if (copy > rem_length) { 2533 copy = rem_length; 2534 } 2535 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2536 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2537 file->append_pos += copy; 2538 if (file->length < file->append_pos) { 2539 file->length = file->append_pos; 2540 } 2541 cur_payload += copy; 2542 last->bytes_filled += copy; 2543 rem_length -= copy; 2544 if (last->bytes_filled == last->buf_size) { 2545 cache_buffers_filled++; 2546 last = cache_append_buffer(file); 2547 if (last == NULL) { 2548 BLOBFS_TRACE(file, "nomem\n"); 2549 free_fs_request(flush_req); 2550 pthread_spin_unlock(&file->lock); 2551 return -ENOMEM; 2552 } 
2553 } 2554 } 2555 2556 pthread_spin_unlock(&file->lock); 2557 2558 if (cache_buffers_filled == 0) { 2559 free_fs_request(flush_req); 2560 return 0; 2561 } 2562 2563 flush_req->args.file = file; 2564 file->fs->send_request(__file_flush, flush_req); 2565 return 0; 2566 } 2567 2568 static void 2569 __readahead_done(void *ctx, int bserrno) 2570 { 2571 struct spdk_fs_request *req = ctx; 2572 struct spdk_fs_cb_args *args = &req->args; 2573 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2574 struct spdk_file *file = args->file; 2575 2576 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2577 2578 pthread_spin_lock(&file->lock); 2579 cache_buffer->bytes_filled = args->op.readahead.length; 2580 cache_buffer->bytes_flushed = args->op.readahead.length; 2581 cache_buffer->in_progress = false; 2582 pthread_spin_unlock(&file->lock); 2583 2584 free_fs_request(req); 2585 } 2586 2587 static void 2588 __readahead(void *ctx) 2589 { 2590 struct spdk_fs_request *req = ctx; 2591 struct spdk_fs_cb_args *args = &req->args; 2592 struct spdk_file *file = args->file; 2593 uint64_t offset, length, start_lba, num_lba; 2594 uint32_t lba_size; 2595 2596 offset = args->op.readahead.offset; 2597 length = args->op.readahead.length; 2598 assert(length > 0); 2599 2600 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2601 2602 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2603 offset, length, start_lba, num_lba); 2604 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2605 args->op.readahead.cache_buffer->buf, 2606 start_lba, num_lba, __readahead_done, req); 2607 } 2608 2609 static uint64_t 2610 __next_cache_buffer_offset(uint64_t offset) 2611 { 2612 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2613 } 2614 2615 static void 2616 check_readahead(struct spdk_file *file, uint64_t offset, 2617 struct spdk_fs_channel *channel) 2618 { 2619 struct spdk_fs_request *req; 2620 
struct spdk_fs_cb_args *args; 2621 2622 offset = __next_cache_buffer_offset(offset); 2623 if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2624 return; 2625 } 2626 2627 req = alloc_fs_request(channel); 2628 if (req == NULL) { 2629 return; 2630 } 2631 args = &req->args; 2632 2633 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2634 2635 args->file = file; 2636 args->op.readahead.offset = offset; 2637 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2638 if (!args->op.readahead.cache_buffer) { 2639 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2640 free_fs_request(req); 2641 return; 2642 } 2643 2644 args->op.readahead.cache_buffer->in_progress = true; 2645 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2646 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2647 } else { 2648 args->op.readahead.length = CACHE_BUFFER_SIZE; 2649 } 2650 file->fs->send_request(__readahead, req); 2651 } 2652 2653 int64_t 2654 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2655 void *payload, uint64_t offset, uint64_t length) 2656 { 2657 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2658 uint64_t final_offset, final_length; 2659 uint32_t sub_reads = 0; 2660 struct cache_buffer *buf; 2661 uint64_t read_len; 2662 struct rw_from_file_arg arg = {}; 2663 2664 pthread_spin_lock(&file->lock); 2665 2666 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2667 2668 file->open_for_writing = false; 2669 2670 if (length == 0 || offset >= file->append_pos) { 2671 pthread_spin_unlock(&file->lock); 2672 return 0; 2673 } 2674 2675 if (offset + length > file->append_pos) { 2676 length = file->append_pos - offset; 2677 } 2678 2679 if (offset != file->next_seq_offset) { 2680 file->seq_byte_count = 0; 2681 } 2682 file->seq_byte_count += length; 2683 file->next_seq_offset = offset + length; 2684 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2685 
check_readahead(file, offset, channel); 2686 check_readahead(file, offset + CACHE_BUFFER_SIZE, channel); 2687 } 2688 2689 arg.channel = channel; 2690 arg.rwerrno = 0; 2691 final_length = 0; 2692 final_offset = offset + length; 2693 while (offset < final_offset) { 2694 int ret = 0; 2695 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2696 if (length > (final_offset - offset)) { 2697 length = final_offset - offset; 2698 } 2699 2700 buf = tree_find_filled_buffer(file->tree, offset); 2701 if (buf == NULL) { 2702 pthread_spin_unlock(&file->lock); 2703 ret = __send_rw_from_file(file, payload, offset, length, true, &arg); 2704 pthread_spin_lock(&file->lock); 2705 if (ret == 0) { 2706 sub_reads++; 2707 } 2708 } else { 2709 read_len = length; 2710 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2711 read_len = buf->offset + buf->bytes_filled - offset; 2712 } 2713 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len); 2714 memcpy(payload, &buf->buf[offset - buf->offset], read_len); 2715 if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) { 2716 tree_remove_buffer(file->tree, buf); 2717 if (file->tree->present_mask == 0) { 2718 spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file); 2719 } 2720 } 2721 } 2722 2723 if (ret == 0) { 2724 final_length += length; 2725 } else { 2726 arg.rwerrno = ret; 2727 break; 2728 } 2729 payload += length; 2730 offset += length; 2731 } 2732 pthread_spin_unlock(&file->lock); 2733 while (sub_reads > 0) { 2734 sem_wait(&channel->sem); 2735 sub_reads--; 2736 } 2737 if (arg.rwerrno == 0) { 2738 return final_length; 2739 } else { 2740 return arg.rwerrno; 2741 } 2742 } 2743 2744 static void 2745 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2746 spdk_file_op_complete cb_fn, void *cb_arg) 2747 { 2748 struct spdk_fs_request *sync_req; 2749 struct spdk_fs_request *flush_req; 2750 struct spdk_fs_cb_args *sync_args; 2751 struct spdk_fs_cb_args *flush_args; 2752 2753 
BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2754 2755 pthread_spin_lock(&file->lock); 2756 if (file->append_pos <= file->length_xattr) { 2757 BLOBFS_TRACE(file, "done - file already synced\n"); 2758 pthread_spin_unlock(&file->lock); 2759 cb_fn(cb_arg, 0); 2760 return; 2761 } 2762 2763 sync_req = alloc_fs_request(channel); 2764 if (!sync_req) { 2765 SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name); 2766 pthread_spin_unlock(&file->lock); 2767 cb_fn(cb_arg, -ENOMEM); 2768 return; 2769 } 2770 sync_args = &sync_req->args; 2771 2772 flush_req = alloc_fs_request(channel); 2773 if (!flush_req) { 2774 SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name); 2775 free_fs_request(sync_req); 2776 pthread_spin_unlock(&file->lock); 2777 cb_fn(cb_arg, -ENOMEM); 2778 return; 2779 } 2780 flush_args = &flush_req->args; 2781 2782 sync_args->file = file; 2783 sync_args->fn.file_op = cb_fn; 2784 sync_args->arg = cb_arg; 2785 sync_args->op.sync.offset = file->append_pos; 2786 sync_args->op.sync.xattr_in_progress = false; 2787 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2788 pthread_spin_unlock(&file->lock); 2789 2790 flush_args->file = file; 2791 channel->send_request(__file_flush, flush_req); 2792 } 2793 2794 int 2795 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2796 { 2797 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2798 struct spdk_fs_cb_args args = {}; 2799 2800 args.sem = &channel->sem; 2801 _file_sync(file, channel, __wake_caller, &args); 2802 sem_wait(&channel->sem); 2803 2804 return args.rc; 2805 } 2806 2807 void 2808 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2809 spdk_file_op_complete cb_fn, void *cb_arg) 2810 { 2811 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2812 2813 _file_sync(file, channel, cb_fn, cb_arg); 2814 } 2815 2816 void 2817 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2818 
{ 2819 BLOBFS_TRACE(file, "priority=%u\n", priority); 2820 file->priority = priority; 2821 2822 } 2823 2824 /* 2825 * Close routines 2826 */ 2827 2828 static void 2829 __file_close_async_done(void *ctx, int bserrno) 2830 { 2831 struct spdk_fs_request *req = ctx; 2832 struct spdk_fs_cb_args *args = &req->args; 2833 struct spdk_file *file = args->file; 2834 2835 spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name); 2836 2837 if (file->is_deleted) { 2838 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2839 return; 2840 } 2841 2842 args->fn.file_op(args->arg, bserrno); 2843 free_fs_request(req); 2844 } 2845 2846 static void 2847 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2848 { 2849 struct spdk_blob *blob; 2850 2851 pthread_spin_lock(&file->lock); 2852 if (file->ref_count == 0) { 2853 pthread_spin_unlock(&file->lock); 2854 __file_close_async_done(req, -EBADF); 2855 return; 2856 } 2857 2858 file->ref_count--; 2859 if (file->ref_count > 0) { 2860 pthread_spin_unlock(&file->lock); 2861 req->args.fn.file_op(req->args.arg, 0); 2862 free_fs_request(req); 2863 return; 2864 } 2865 2866 pthread_spin_unlock(&file->lock); 2867 2868 blob = file->blob; 2869 file->blob = NULL; 2870 spdk_blob_close(blob, __file_close_async_done, req); 2871 } 2872 2873 static void 2874 __file_close_async__sync_done(void *arg, int fserrno) 2875 { 2876 struct spdk_fs_request *req = arg; 2877 struct spdk_fs_cb_args *args = &req->args; 2878 2879 __file_close_async(args->file, req); 2880 } 2881 2882 void 2883 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2884 { 2885 struct spdk_fs_request *req; 2886 struct spdk_fs_cb_args *args; 2887 2888 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2889 if (req == NULL) { 2890 SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name); 2891 cb_fn(cb_arg, -ENOMEM); 2892 return; 2893 } 2894 2895 args = &req->args; 2896 args->file = file; 
2897 args->fn.file_op = cb_fn; 2898 args->arg = cb_arg; 2899 2900 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2901 } 2902 2903 static void 2904 __file_close(void *arg) 2905 { 2906 struct spdk_fs_request *req = arg; 2907 struct spdk_fs_cb_args *args = &req->args; 2908 struct spdk_file *file = args->file; 2909 2910 __file_close_async(file, req); 2911 } 2912 2913 int 2914 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2915 { 2916 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2917 struct spdk_fs_request *req; 2918 struct spdk_fs_cb_args *args; 2919 2920 req = alloc_fs_request(channel); 2921 if (req == NULL) { 2922 SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name); 2923 return -ENOMEM; 2924 } 2925 2926 args = &req->args; 2927 2928 spdk_file_sync(file, ctx); 2929 BLOBFS_TRACE(file, "name=%s\n", file->name); 2930 args->file = file; 2931 args->sem = &channel->sem; 2932 args->fn.file_op = __wake_caller; 2933 args->arg = args; 2934 channel->send_request(__file_close, req); 2935 sem_wait(&channel->sem); 2936 2937 return args->rc; 2938 } 2939 2940 int 2941 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2942 { 2943 if (size < sizeof(spdk_blob_id)) { 2944 return -EINVAL; 2945 } 2946 2947 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2948 2949 return sizeof(spdk_blob_id); 2950 } 2951 2952 static void 2953 _file_free(void *ctx) 2954 { 2955 struct spdk_file *file = ctx; 2956 2957 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2958 2959 free(file->name); 2960 free(file->tree); 2961 free(file); 2962 } 2963 2964 static void 2965 file_free(struct spdk_file *file) 2966 { 2967 BLOBFS_TRACE(file, "free=%s\n", file->name); 2968 pthread_spin_lock(&file->lock); 2969 if (file->tree->present_mask == 0) { 2970 pthread_spin_unlock(&file->lock); 2971 free(file->name); 2972 free(file->tree); 2973 free(file); 2974 return; 2975 } 2976 2977 
tree_free_buffers(file->tree); 2978 assert(file->tree->present_mask == 0); 2979 spdk_thread_send_msg(g_cache_pool_thread, _file_free, file); 2980 pthread_spin_unlock(&file->lock); 2981 } 2982 2983 SPDK_LOG_REGISTER_COMPONENT(blobfs) 2984 SPDK_LOG_REGISTER_COMPONENT(blobfs_rw) 2985