1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "tree.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 #include "spdk/trace.h" 47 48 #define BLOBFS_TRACE(file, str, args...) \ 49 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 50 51 #define BLOBFS_TRACE_RW(file, str, args...) \ 52 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 53 54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 56 57 #define SPDK_BLOBFS_SIGNATURE "BLOBFS" 58 59 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 60 static struct spdk_mempool *g_cache_pool; 61 static TAILQ_HEAD(, spdk_file) g_caches; 62 static struct spdk_poller *g_cache_pool_mgmt_poller; 63 static struct spdk_thread *g_cache_pool_thread; 64 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL 65 static int g_fs_count = 0; 66 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 67 68 #define TRACE_GROUP_BLOBFS 0x7 69 #define TRACE_BLOBFS_XATTR_START SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0) 70 #define TRACE_BLOBFS_XATTR_END SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1) 71 #define TRACE_BLOBFS_OPEN SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2) 72 #define TRACE_BLOBFS_CLOSE SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3) 73 #define TRACE_BLOBFS_DELETE_START SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4) 74 #define TRACE_BLOBFS_DELETE_DONE SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5) 75 76 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS) 77 { 78 spdk_trace_register_description("BLOBFS_XATTR_START", 79 TRACE_BLOBFS_XATTR_START, 80 OWNER_NONE, OBJECT_NONE, 0, 81 SPDK_TRACE_ARG_TYPE_STR, 82 "file: "); 83 spdk_trace_register_description("BLOBFS_XATTR_END", 84 TRACE_BLOBFS_XATTR_END, 85 OWNER_NONE, OBJECT_NONE, 0, 86 SPDK_TRACE_ARG_TYPE_STR, 87 "file: "); 88 spdk_trace_register_description("BLOBFS_OPEN", 89 TRACE_BLOBFS_OPEN, 90 OWNER_NONE, OBJECT_NONE, 0, 91 SPDK_TRACE_ARG_TYPE_STR, 92 "file: "); 93 spdk_trace_register_description("BLOBFS_CLOSE", 94 TRACE_BLOBFS_CLOSE, 95 OWNER_NONE, OBJECT_NONE, 0, 96 SPDK_TRACE_ARG_TYPE_STR, 97 "file: "); 98 spdk_trace_register_description("BLOBFS_DELETE_START", 99 TRACE_BLOBFS_DELETE_START, 100 OWNER_NONE, OBJECT_NONE, 0, 101 SPDK_TRACE_ARG_TYPE_STR, 102 "file: "); 103 spdk_trace_register_description("BLOBFS_DELETE_DONE", 104 TRACE_BLOBFS_DELETE_DONE, 105 OWNER_NONE, OBJECT_NONE, 0, 106 SPDK_TRACE_ARG_TYPE_STR, 107 "file: "); 108 } 109 110 void 111 cache_buffer_free(struct cache_buffer *cache_buffer) 112 { 113 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 114 free(cache_buffer); 115 } 116 117 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 118 119 struct spdk_file { 120 struct spdk_filesystem *fs; 121 struct spdk_blob *blob; 122 char *name; 123 uint64_t trace_arg_name; 124 uint64_t length; 125 bool is_deleted; 126 bool open_for_writing; 127 uint64_t length_flushed; 128 uint64_t length_xattr; 129 uint64_t append_pos; 130 uint64_t seq_byte_count; 131 uint64_t next_seq_offset; 132 uint32_t priority; 133 TAILQ_ENTRY(spdk_file) tailq; 134 spdk_blob_id blobid; 135 uint32_t ref_count; 136 pthread_spinlock_t lock; 137 struct cache_buffer *last; 138 struct cache_tree *tree; 139 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 140 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 141 TAILQ_ENTRY(spdk_file) cache_tailq; 142 }; 143 144 struct spdk_deleted_file { 145 spdk_blob_id id; 146 TAILQ_ENTRY(spdk_deleted_file) tailq; 147 }; 148 149 struct spdk_filesystem { 150 struct spdk_blob_store *bs; 151 TAILQ_HEAD(, spdk_file) files; 152 struct spdk_bs_opts bs_opts; 153 struct spdk_bs_dev *bdev; 154 fs_send_request_fn send_request; 155 156 struct { 157 uint32_t max_ops; 158 struct spdk_io_channel *sync_io_channel; 159 struct spdk_fs_channel *sync_fs_channel; 160 } sync_target; 161 162 struct { 163 uint32_t max_ops; 164 struct spdk_io_channel *md_io_channel; 165 struct spdk_fs_channel *md_fs_channel; 166 } md_target; 167 168 struct { 169 uint32_t max_ops; 170 } io_target; 171 }; 172 173 struct spdk_fs_cb_args { 174 union { 175 spdk_fs_op_with_handle_complete fs_op_with_handle; 176 spdk_fs_op_complete fs_op; 177 spdk_file_op_with_handle_complete file_op_with_handle; 178 spdk_file_op_complete file_op; 179 spdk_file_stat_op_complete stat_op; 180 } fn; 181 void *arg; 182 sem_t *sem; 183 struct spdk_filesystem *fs; 184 struct spdk_file *file; 185 int rc; 186 struct iovec *iovs; 187 uint32_t iovcnt; 188 struct iovec iov; 189 union { 190 struct { 191 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 192 } fs_load; 193 struct { 194 uint64_t length; 195 } truncate; 196 struct { 197 struct spdk_io_channel *channel; 198 void *pin_buf; 199 int is_read; 200 off_t offset; 201 size_t length; 202 uint64_t start_lba; 203 uint64_t num_lba; 204 uint32_t blocklen; 205 } rw; 206 struct { 207 const char *old_name; 208 const char *new_name; 209 } rename; 210 struct { 211 struct cache_buffer *cache_buffer; 212 uint64_t length; 213 } flush; 214 struct { 215 struct cache_buffer *cache_buffer; 216 uint64_t length; 217 uint64_t offset; 218 } readahead; 219 struct { 220 /* offset of the file when the sync request was made */ 221 uint64_t offset; 222 TAILQ_ENTRY(spdk_fs_request) tailq; 223 bool xattr_in_progress; 224 /* length written to the xattr for this file - this should 225 * always be the same as the offset if only one thread is 226 * writing to the file, but could differ if multiple threads 227 * are appending 228 */ 229 uint64_t length; 230 } sync; 231 struct { 232 uint32_t num_clusters; 233 } resize; 234 struct { 235 const char *name; 236 uint32_t flags; 237 TAILQ_ENTRY(spdk_fs_request) tailq; 238 } open; 239 struct { 240 const char *name; 241 struct spdk_blob *blob; 242 } create; 243 struct { 244 const char *name; 245 } delete; 246 struct { 247 const char *name; 248 } stat; 249 } op; 250 }; 251 252 static void file_free(struct spdk_file *file); 253 static void fs_io_device_unregister(struct spdk_filesystem *fs); 254 static void fs_free_io_channels(struct spdk_filesystem *fs); 255 256 void 257 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 258 { 259 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 260 } 261 262 static int _blobfs_cache_pool_reclaim(void *arg); 263 264 static bool 265 blobfs_cache_pool_need_reclaim(void) 266 { 267 size_t count; 268 269 count = spdk_mempool_count(g_cache_pool); 270 /* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller 271 * when the number of available cache buffer is less than 1/5 of total buffers. 272 */ 273 if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) { 274 return false; 275 } 276 277 return true; 278 } 279 280 static void 281 __start_cache_pool_mgmt(void *ctx) 282 { 283 assert(g_cache_pool == NULL); 284 285 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 286 g_fs_cache_size / CACHE_BUFFER_SIZE, 287 CACHE_BUFFER_SIZE, 288 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 289 SPDK_ENV_SOCKET_ID_ANY); 290 if (!g_cache_pool) { 291 SPDK_ERRLOG("Create mempool failed, you may " 292 "increase the memory and try again\n"); 293 assert(false); 294 } 295 TAILQ_INIT(&g_caches); 296 297 assert(g_cache_pool_mgmt_poller == NULL); 298 g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL, 299 BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); 300 } 301 302 static void 303 __stop_cache_pool_mgmt(void *ctx) 304 { 305 spdk_poller_unregister(&g_cache_pool_mgmt_poller); 306 307 assert(g_cache_pool != NULL); 308 assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE); 309 spdk_mempool_free(g_cache_pool); 310 g_cache_pool = NULL; 311 312 spdk_thread_exit(g_cache_pool_thread); 313 } 314 315 static void 316 initialize_global_cache(void) 317 { 318 pthread_mutex_lock(&g_cache_init_lock); 319 if (g_fs_count == 0) { 320 g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL); 321 assert(g_cache_pool_thread != NULL); 322 spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL); 323 } 324 g_fs_count++; 325 pthread_mutex_unlock(&g_cache_init_lock); 326 } 327 328 static void 329 free_global_cache(void) 330 { 331 pthread_mutex_lock(&g_cache_init_lock); 332 g_fs_count--; 333 if (g_fs_count == 0) { 334 spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL); 335 } 336 pthread_mutex_unlock(&g_cache_init_lock); 337 } 338 339 static uint64_t 340 __file_get_blob_size(struct spdk_file *file) 341 { 342 uint64_t cluster_sz; 343 344 cluster_sz = file->fs->bs_opts.cluster_sz; 345 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 346 } 347 348 struct spdk_fs_request { 349 struct spdk_fs_cb_args args; 350 TAILQ_ENTRY(spdk_fs_request) link; 351 struct spdk_fs_channel *channel; 352 }; 353 354 struct spdk_fs_channel { 355 struct spdk_fs_request *req_mem; 356 TAILQ_HEAD(, spdk_fs_request) reqs; 357 sem_t sem; 358 struct spdk_filesystem *fs; 359 struct spdk_io_channel *bs_channel; 360 fs_send_request_fn send_request; 361 bool sync; 362 uint32_t outstanding_reqs; 363 pthread_spinlock_t lock; 364 }; 365 366 /* For now, this is effectively an alias. But eventually we'll shift 367 * some data members over. */ 368 struct spdk_fs_thread_ctx { 369 struct spdk_fs_channel ch; 370 }; 371 372 static struct spdk_fs_request * 373 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 374 { 375 struct spdk_fs_request *req; 376 struct iovec *iovs = NULL; 377 378 if (iovcnt > 1) { 379 iovs = calloc(iovcnt, sizeof(struct iovec)); 380 if (!iovs) { 381 return NULL; 382 } 383 } 384 385 if (channel->sync) { 386 pthread_spin_lock(&channel->lock); 387 } 388 389 req = TAILQ_FIRST(&channel->reqs); 390 if (req) { 391 channel->outstanding_reqs++; 392 TAILQ_REMOVE(&channel->reqs, req, link); 393 } 394 395 if (channel->sync) { 396 pthread_spin_unlock(&channel->lock); 397 } 398 399 if (req == NULL) { 400 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 401 free(iovs); 402 return NULL; 403 } 404 memset(req, 0, sizeof(*req)); 405 req->channel = channel; 406 if (iovcnt > 1) { 407 req->args.iovs = iovs; 408 } else { 409 req->args.iovs = &req->args.iov; 410 } 411 req->args.iovcnt = iovcnt; 412 413 return req; 414 } 415 416 static struct spdk_fs_request * 417 alloc_fs_request(struct spdk_fs_channel *channel) 418 { 419 return alloc_fs_request_with_iov(channel, 0); 420 } 421 422 static void 423 free_fs_request(struct spdk_fs_request *req) 424 { 425 struct spdk_fs_channel *channel = req->channel; 426 427 if (req->args.iovcnt > 1) { 428 free(req->args.iovs); 429 } 430 431 if (channel->sync) { 432 pthread_spin_lock(&channel->lock); 433 } 434 435 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 436 channel->outstanding_reqs--; 437 438 if (channel->sync) { 439 pthread_spin_unlock(&channel->lock); 440 } 441 } 442 443 static int 444 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 445 uint32_t max_ops) 446 { 447 uint32_t i; 448 449 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 450 if (!channel->req_mem) { 451 return -1; 452 } 453 454 channel->outstanding_reqs = 0; 455 TAILQ_INIT(&channel->reqs); 456 sem_init(&channel->sem, 0, 0); 457 458 for (i = 0; i < max_ops; i++) { 459 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 460 } 461 462 channel->fs = fs; 463 464 return 0; 465 } 466 467 static int 468 fs_md_channel_create(void *io_device, void *ctx_buf) 469 { 470 struct spdk_filesystem *fs; 471 struct spdk_fs_channel *channel = ctx_buf; 472 473 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 474 475 return fs_channel_create(fs, channel, fs->md_target.max_ops); 476 } 477 478 static int 479 fs_sync_channel_create(void *io_device, void *ctx_buf) 480 { 481 struct spdk_filesystem *fs; 482 struct spdk_fs_channel *channel = ctx_buf; 483 484 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 485 486 return fs_channel_create(fs, channel, fs->sync_target.max_ops); 487 } 488 489 static int 490 fs_io_channel_create(void *io_device, void *ctx_buf) 491 { 492 struct spdk_filesystem *fs; 493 struct spdk_fs_channel *channel = ctx_buf; 494 495 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 496 497 return fs_channel_create(fs, channel, fs->io_target.max_ops); 498 } 499 500 static void 501 fs_channel_destroy(void *io_device, void *ctx_buf) 502 { 503 struct spdk_fs_channel *channel = ctx_buf; 504 505 if (channel->outstanding_reqs > 0) { 506 SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n", 507 channel->outstanding_reqs); 508 } 509 510 free(channel->req_mem); 511 if (channel->bs_channel != NULL) { 512 spdk_bs_free_io_channel(channel->bs_channel); 513 } 514 } 515 516 static void 517 __send_request_direct(fs_request_fn fn, void *arg) 518 { 519 fn(arg); 520 } 521 522 static void 523 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 524 { 525 fs->bs = bs; 526 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 527 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 528 fs->md_target.md_fs_channel->send_request = __send_request_direct; 529 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 530 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 531 532 initialize_global_cache(); 533 } 534 535 static void 536 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 537 { 538 struct spdk_fs_request *req = ctx; 539 struct spdk_fs_cb_args *args = &req->args; 540 struct spdk_filesystem *fs = args->fs; 541 542 if (bserrno == 0) { 543 common_fs_bs_init(fs, bs); 544 } else { 545 free(fs); 546 fs = NULL; 547 } 548 549 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 550 free_fs_request(req); 551 } 552 553 static void 554 fs_conf_parse(void) 555 { 556 struct spdk_conf_section *sp; 557 int cache_buffer_shift; 558 559 sp = spdk_conf_find_section(NULL, "Blobfs"); 560 if (sp == NULL) { 561 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 562 return; 563 } 564 565 cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 566 if (cache_buffer_shift <= 0) { 567 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 568 } else { 569 g_fs_cache_buffer_shift = cache_buffer_shift; 570 } 571 } 572 573 static struct spdk_filesystem * 574 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 575 { 576 struct spdk_filesystem *fs; 577 578 fs = calloc(1, sizeof(*fs)); 579 if (fs == NULL) { 580 return NULL; 581 } 582 583 fs->bdev = dev; 584 fs->send_request = send_request_fn; 585 TAILQ_INIT(&fs->files); 586 587 fs->md_target.max_ops = 512; 588 spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy, 589 sizeof(struct spdk_fs_channel), "blobfs_md"); 590 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 591 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 592 593 fs->sync_target.max_ops = 512; 594 spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy, 595 sizeof(struct spdk_fs_channel), "blobfs_sync"); 596 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 597 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 598 599 fs->io_target.max_ops = 512; 600 spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy, 601 sizeof(struct spdk_fs_channel), "blobfs_io"); 602 603 return fs; 604 } 605 606 static void 607 __wake_caller(void *arg, int fserrno) 608 { 609 struct spdk_fs_cb_args *args = arg; 610 611 args->rc = fserrno; 612 sem_post(args->sem); 613 } 614 615 void 616 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 617 fs_send_request_fn send_request_fn, 618 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 619 { 620 struct spdk_filesystem *fs; 621 struct spdk_fs_request *req; 622 struct spdk_fs_cb_args *args; 623 struct spdk_bs_opts opts = {}; 624 625 fs = fs_alloc(dev, send_request_fn); 626 if (fs == NULL) { 627 cb_fn(cb_arg, NULL, -ENOMEM); 628 return; 629 } 630 631 fs_conf_parse(); 632 633 req = alloc_fs_request(fs->md_target.md_fs_channel); 634 if (req == NULL) { 635 fs_free_io_channels(fs); 636 fs_io_device_unregister(fs); 637 cb_fn(cb_arg, NULL, -ENOMEM); 638 return; 639 } 640 641 args = &req->args; 642 args->fn.fs_op_with_handle = cb_fn; 643 args->arg = cb_arg; 644 args->fs = fs; 645 646 spdk_bs_opts_init(&opts); 647 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE); 648 if (opt) { 649 opts.cluster_sz = opt->cluster_sz; 650 } 651 spdk_bs_init(dev, &opts, init_cb, req); 652 } 653 654 static struct spdk_file * 655 file_alloc(struct spdk_filesystem *fs) 656 { 657 struct spdk_file *file; 658 659 file = calloc(1, sizeof(*file)); 660 if (file == NULL) { 661 return NULL; 662 } 663 664 file->tree = calloc(1, sizeof(*file->tree)); 665 if (file->tree == NULL) { 666 free(file); 667 return NULL; 668 } 669 670 file->fs = fs; 671 TAILQ_INIT(&file->open_requests); 672 TAILQ_INIT(&file->sync_requests); 673 pthread_spin_init(&file->lock, 0); 674 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 675 file->priority = SPDK_FILE_PRIORITY_LOW; 676 return file; 677 } 678 679 static void fs_load_done(void *ctx, int bserrno); 680 681 static int 682 _handle_deleted_files(struct spdk_fs_request *req) 683 { 684 struct spdk_fs_cb_args *args = &req->args; 685 struct spdk_filesystem *fs = args->fs; 686 687 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 688 struct spdk_deleted_file *deleted_file; 689 690 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 691 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 692 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 693 free(deleted_file); 694 return 0; 695 } 696 697 return 1; 698 } 699 700 static void 701 fs_load_done(void *ctx, int bserrno) 702 { 703 struct spdk_fs_request *req = ctx; 704 struct spdk_fs_cb_args *args = &req->args; 705 struct spdk_filesystem *fs = args->fs; 706 707 /* The filesystem has been loaded. Now check if there are any files that 708 * were marked for deletion before last unload. Do not complete the 709 * fs_load callback until all of them have been deleted on disk. 710 */ 711 if (_handle_deleted_files(req) == 0) { 712 /* We found a file that's been marked for deleting but not actually 713 * deleted yet. This function will get called again once the delete 714 * operation is completed. 715 */ 716 return; 717 } 718 719 args->fn.fs_op_with_handle(args->arg, fs, 0); 720 free_fs_request(req); 721 722 } 723 724 static void 725 _file_build_trace_arg_name(struct spdk_file *f) 726 { 727 f->trace_arg_name = 0; 728 memcpy(&f->trace_arg_name, f->name, 729 spdk_min(sizeof(f->trace_arg_name), strlen(f->name))); 730 } 731 732 static void 733 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 734 { 735 struct spdk_fs_request *req = ctx; 736 struct spdk_fs_cb_args *args = &req->args; 737 struct spdk_filesystem *fs = args->fs; 738 uint64_t *length; 739 const char *name; 740 uint32_t *is_deleted; 741 size_t value_len; 742 743 if (rc < 0) { 744 args->fn.fs_op_with_handle(args->arg, fs, rc); 745 free_fs_request(req); 746 return; 747 } 748 749 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 750 if (rc < 0) { 751 args->fn.fs_op_with_handle(args->arg, fs, rc); 752 free_fs_request(req); 753 return; 754 } 755 756 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 757 if (rc < 0) { 758 args->fn.fs_op_with_handle(args->arg, fs, rc); 759 free_fs_request(req); 760 return; 761 } 762 763 assert(value_len == 8); 764 765 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 766 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 767 if (rc < 0) { 768 struct spdk_file *f; 769 770 f = file_alloc(fs); 771 if (f == NULL) { 772 SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n"); 773 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 774 free_fs_request(req); 775 return; 776 } 777 778 f->name = strdup(name); 779 _file_build_trace_arg_name(f); 780 f->blobid = spdk_blob_get_id(blob); 781 f->length = *length; 782 f->length_flushed = *length; 783 f->length_xattr = *length; 784 f->append_pos = *length; 785 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 786 } else { 787 struct spdk_deleted_file *deleted_file; 788 789 deleted_file = calloc(1, sizeof(*deleted_file)); 790 if (deleted_file == NULL) { 791 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 792 free_fs_request(req); 793 return; 794 } 795 deleted_file->id = spdk_blob_get_id(blob); 796 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 797 } 798 } 799 800 static void 801 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 802 { 803 struct spdk_fs_request *req = ctx; 804 struct spdk_fs_cb_args *args = &req->args; 805 struct spdk_filesystem *fs = args->fs; 806 struct spdk_bs_type bstype; 807 static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE}; 808 static const struct spdk_bs_type zeros; 809 810 if (bserrno != 0) { 811 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 812 free_fs_request(req); 813 fs_free_io_channels(fs); 814 fs_io_device_unregister(fs); 815 return; 816 } 817 818 bstype = spdk_bs_get_bstype(bs); 819 820 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 821 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "assigning bstype\n"); 822 spdk_bs_set_bstype(bs, blobfs_type); 823 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 824 SPDK_ERRLOG("not blobfs\n"); 825 SPDK_LOGDUMP(SPDK_LOG_BLOBFS, "bstype", &bstype, sizeof(bstype)); 826 args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL); 827 free_fs_request(req); 828 fs_free_io_channels(fs); 829 fs_io_device_unregister(fs); 830 return; 831 } 832 833 common_fs_bs_init(fs, bs); 834 fs_load_done(req, 0); 835 } 836 837 static void 838 fs_io_device_unregister(struct spdk_filesystem *fs) 839 { 840 assert(fs != NULL); 841 spdk_io_device_unregister(&fs->md_target, NULL); 842 spdk_io_device_unregister(&fs->sync_target, NULL); 843 spdk_io_device_unregister(&fs->io_target, NULL); 844 free(fs); 845 } 846 847 static void 848 fs_free_io_channels(struct spdk_filesystem *fs) 849 { 850 assert(fs != NULL); 851 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 852 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 853 } 854 855 void 856 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 857 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 858 { 859 struct spdk_filesystem *fs; 860 struct spdk_fs_cb_args *args; 861 struct spdk_fs_request *req; 862 struct spdk_bs_opts bs_opts; 863 864 fs = fs_alloc(dev, send_request_fn); 865 if (fs == NULL) { 866 cb_fn(cb_arg, NULL, -ENOMEM); 867 return; 868 } 869 870 fs_conf_parse(); 871 872 req = alloc_fs_request(fs->md_target.md_fs_channel); 873 if (req == NULL) { 874 fs_free_io_channels(fs); 875 fs_io_device_unregister(fs); 876 cb_fn(cb_arg, NULL, -ENOMEM); 877 return; 878 } 879 880 args = &req->args; 881 args->fn.fs_op_with_handle = cb_fn; 882 args->arg = cb_arg; 883 args->fs = fs; 884 TAILQ_INIT(&args->op.fs_load.deleted_files); 885 spdk_bs_opts_init(&bs_opts); 886 bs_opts.iter_cb_fn = iter_cb; 887 bs_opts.iter_cb_arg = req; 888 spdk_bs_load(dev, &bs_opts, load_cb, req); 889 } 890 891 static void 892 unload_cb(void *ctx, int bserrno) 893 { 894 struct spdk_fs_request *req = ctx; 895 struct spdk_fs_cb_args *args = &req->args; 896 struct spdk_filesystem *fs = args->fs; 897 struct spdk_file *file, *tmp; 898 899 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 900 TAILQ_REMOVE(&fs->files, file, tailq); 901 file_free(file); 902 } 903 904 free_global_cache(); 905 906 args->fn.fs_op(args->arg, bserrno); 907 free(req); 908 909 fs_io_device_unregister(fs); 910 } 911 912 void 913 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 914 { 915 struct spdk_fs_request *req; 916 struct spdk_fs_cb_args *args; 917 918 /* 919 * We must free the md_channel before unloading the blobstore, so just 920 * allocate this request from the general heap. 921 */ 922 req = calloc(1, sizeof(*req)); 923 if (req == NULL) { 924 cb_fn(cb_arg, -ENOMEM); 925 return; 926 } 927 928 args = &req->args; 929 args->fn.fs_op = cb_fn; 930 args->arg = cb_arg; 931 args->fs = fs; 932 933 fs_free_io_channels(fs); 934 spdk_bs_unload(fs->bs, unload_cb, req); 935 } 936 937 static struct spdk_file * 938 fs_find_file(struct spdk_filesystem *fs, const char *name) 939 { 940 struct spdk_file *file; 941 942 TAILQ_FOREACH(file, &fs->files, tailq) { 943 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 944 return file; 945 } 946 } 947 948 return NULL; 949 } 950 951 void 952 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 953 spdk_file_stat_op_complete cb_fn, void *cb_arg) 954 { 955 struct spdk_file_stat stat; 956 struct spdk_file *f = NULL; 957 958 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 959 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 960 return; 961 } 962 963 f = fs_find_file(fs, name); 964 if (f != NULL) { 965 stat.blobid = f->blobid; 966 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 967 cb_fn(cb_arg, &stat, 0); 968 return; 969 } 970 971 cb_fn(cb_arg, NULL, -ENOENT); 972 } 973 974 static void 975 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 976 { 977 struct spdk_fs_request *req = arg; 978 struct spdk_fs_cb_args *args = &req->args; 979 980 args->rc = fserrno; 981 if (fserrno == 0) { 982 memcpy(args->arg, stat, sizeof(*stat)); 983 } 984 sem_post(args->sem); 985 } 986 987 static void 988 __file_stat(void *arg) 989 { 990 struct spdk_fs_request *req = arg; 991 struct spdk_fs_cb_args *args = &req->args; 992 993 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 994 args->fn.stat_op, req); 995 } 996 997 int 998 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 999 const char *name, struct spdk_file_stat *stat) 1000 { 1001 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1002 struct spdk_fs_request *req; 1003 int rc; 1004 1005 req = alloc_fs_request(channel); 1006 if (req == NULL) { 1007 SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name); 1008 return -ENOMEM; 1009 } 1010 1011 req->args.fs = fs; 1012 req->args.op.stat.name = name; 1013 req->args.fn.stat_op = __copy_stat; 1014 req->args.arg = stat; 1015 req->args.sem = &channel->sem; 1016 channel->send_request(__file_stat, req); 1017 sem_wait(&channel->sem); 1018 1019 rc = req->args.rc; 1020 free_fs_request(req); 1021 1022 return rc; 1023 } 1024 1025 static void 1026 fs_create_blob_close_cb(void *ctx, int bserrno) 1027 { 1028 int rc; 1029 struct spdk_fs_request *req = ctx; 1030 struct spdk_fs_cb_args *args = &req->args; 1031 1032 rc = args->rc ? args->rc : bserrno; 1033 args->fn.file_op(args->arg, rc); 1034 free_fs_request(req); 1035 } 1036 1037 static void 1038 fs_create_blob_resize_cb(void *ctx, int bserrno) 1039 { 1040 struct spdk_fs_request *req = ctx; 1041 struct spdk_fs_cb_args *args = &req->args; 1042 struct spdk_file *f = args->file; 1043 struct spdk_blob *blob = args->op.create.blob; 1044 uint64_t length = 0; 1045 1046 args->rc = bserrno; 1047 if (bserrno) { 1048 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1049 return; 1050 } 1051 1052 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 1053 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 1054 1055 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1056 } 1057 1058 static void 1059 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1060 { 1061 struct spdk_fs_request *req = ctx; 1062 struct spdk_fs_cb_args *args = &req->args; 1063 1064 if (bserrno) { 1065 args->fn.file_op(args->arg, bserrno); 1066 free_fs_request(req); 1067 return; 1068 } 1069 1070 args->op.create.blob = blob; 1071 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 1072 } 1073 1074 static void 1075 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 1076 { 1077 struct spdk_fs_request *req = ctx; 1078 struct spdk_fs_cb_args *args = &req->args; 1079 struct spdk_file *f = args->file; 1080 1081 if (bserrno) { 1082 args->fn.file_op(args->arg, bserrno); 1083 free_fs_request(req); 1084 return; 1085 } 1086 1087 f->blobid = blobid; 1088 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 1089 } 1090 1091 void 1092 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 1093 spdk_file_op_complete cb_fn, void *cb_arg) 1094 { 1095 struct spdk_file *file; 1096 struct spdk_fs_request *req; 1097 struct spdk_fs_cb_args *args; 1098 1099 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1100 cb_fn(cb_arg, -ENAMETOOLONG); 1101 return; 1102 } 1103 1104 file = fs_find_file(fs, name); 1105 if (file != NULL) { 1106 cb_fn(cb_arg, -EEXIST); 1107 return; 1108 } 1109 1110 file = file_alloc(fs); 1111 if (file == NULL) { 1112 SPDK_ERRLOG("Cannot allocate new file for creation\n"); 1113 cb_fn(cb_arg, -ENOMEM); 1114 return; 1115 } 1116 1117 req = alloc_fs_request(fs->md_target.md_fs_channel); 1118 if (req == NULL) { 1119 SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name); 1120 cb_fn(cb_arg, -ENOMEM); 1121 return; 1122 } 1123 1124 args = &req->args; 1125 args->file = file; 1126 args->fn.file_op = cb_fn; 1127 args->arg = cb_arg; 1128 1129 file->name = strdup(name); 1130 _file_build_trace_arg_name(file); 1131 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 1132 } 1133 1134 static void 1135 __fs_create_file_done(void *arg, int fserrno) 1136 { 1137 struct spdk_fs_request *req = arg; 1138 struct spdk_fs_cb_args *args = &req->args; 1139 1140 __wake_caller(args, fserrno); 1141 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1142 } 1143 1144 static void 1145 __fs_create_file(void *arg) 1146 { 1147 struct spdk_fs_request *req = arg; 1148 struct spdk_fs_cb_args *args = &req->args; 1149 1150 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1151 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1152 } 1153 1154 int 1155 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1156 { 1157 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1158 struct spdk_fs_request *req; 1159 struct spdk_fs_cb_args *args; 1160 int rc; 1161 1162 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1163 1164 req = alloc_fs_request(channel); 1165 if (req == NULL) { 1166 SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name); 1167 return -ENOMEM; 1168 } 1169 1170 args = &req->args; 1171 args->fs = fs; 1172 args->op.create.name = name; 1173 args->sem = &channel->sem; 1174 fs->send_request(__fs_create_file, req); 1175 sem_wait(&channel->sem); 1176 rc = args->rc; 1177 free_fs_request(req); 1178 1179 return rc; 1180 } 1181 1182 static void 1183 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1184 { 1185 struct spdk_fs_request *req = ctx; 1186 struct spdk_fs_cb_args *args = &req->args; 1187 struct spdk_file *f = args->file; 1188 1189 f->blob = blob; 1190 while (!TAILQ_EMPTY(&f->open_requests)) { 1191 req = TAILQ_FIRST(&f->open_requests); 1192 args = &req->args; 1193 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1194 spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name); 1195 args->fn.file_op_with_handle(args->arg, f, bserrno); 1196 free_fs_request(req); 1197 } 1198 } 1199 1200 static void 1201 fs_open_blob_create_cb(void *ctx, int bserrno) 1202 { 1203 struct spdk_fs_request *req = ctx; 1204 struct spdk_fs_cb_args *args = &req->args; 1205 struct spdk_file *file = args->file; 1206 struct spdk_filesystem *fs = args->fs; 1207 1208 if (file == NULL) { 1209 /* 1210 * This is from an open with CREATE flag - the file 1211 * is now created so look it up in the file list for this 1212 * filesystem. 1213 */ 1214 file = fs_find_file(fs, args->op.open.name); 1215 assert(file != NULL); 1216 args->file = file; 1217 } 1218 1219 file->ref_count++; 1220 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1221 if (file->ref_count == 1) { 1222 assert(file->blob == NULL); 1223 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1224 } else if (file->blob != NULL) { 1225 fs_open_blob_done(req, file->blob, 0); 1226 } else { 1227 /* 1228 * The blob open for this file is in progress due to a previous 1229 * open request. When that open completes, it will invoke the 1230 * open callback for this request. 1231 */ 1232 } 1233 } 1234 1235 void 1236 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1237 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1238 { 1239 struct spdk_file *f = NULL; 1240 struct spdk_fs_request *req; 1241 struct spdk_fs_cb_args *args; 1242 1243 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1244 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1245 return; 1246 } 1247 1248 f = fs_find_file(fs, name); 1249 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1250 cb_fn(cb_arg, NULL, -ENOENT); 1251 return; 1252 } 1253 1254 if (f != NULL && f->is_deleted == true) { 1255 cb_fn(cb_arg, NULL, -ENOENT); 1256 return; 1257 } 1258 1259 req = alloc_fs_request(fs->md_target.md_fs_channel); 1260 if (req == NULL) { 1261 SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name); 1262 cb_fn(cb_arg, NULL, -ENOMEM); 1263 return; 1264 } 1265 1266 args = &req->args; 1267 args->fn.file_op_with_handle = cb_fn; 1268 args->arg = cb_arg; 1269 args->file = f; 1270 args->fs = fs; 1271 args->op.open.name = name; 1272 1273 if (f == NULL) { 1274 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1275 } else { 1276 fs_open_blob_create_cb(req, 0); 1277 } 1278 } 1279 1280 static void 1281 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1282 { 1283 struct spdk_fs_request *req = arg; 1284 struct spdk_fs_cb_args *args = &req->args; 1285 1286 args->file = file; 1287 __wake_caller(args, bserrno); 1288 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1289 } 1290 1291 static void 1292 __fs_open_file(void *arg) 1293 { 1294 struct spdk_fs_request *req = arg; 1295 struct spdk_fs_cb_args *args = &req->args; 1296 1297 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1298 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1299 __fs_open_file_done, req); 1300 } 1301 1302 int 1303 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1304 const char *name, uint32_t flags, struct spdk_file **file) 1305 { 1306 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1307 struct spdk_fs_request *req; 1308 struct spdk_fs_cb_args *args; 1309 int rc; 1310 1311 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1312 1313 req = alloc_fs_request(channel); 1314 if (req == NULL) { 1315 SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name); 1316 return -ENOMEM; 1317 } 1318 1319 args = &req->args; 1320 args->fs = fs; 1321 args->op.open.name = name; 1322 args->op.open.flags = flags; 1323 args->sem = &channel->sem; 1324 fs->send_request(__fs_open_file, req); 1325 sem_wait(&channel->sem); 1326 rc = args->rc; 1327 if (rc == 0) { 1328 *file = args->file; 1329 } else { 1330 *file = NULL; 1331 } 1332 free_fs_request(req); 1333 1334 return rc; 1335 } 1336 1337 static void 1338 fs_rename_blob_close_cb(void *ctx, int bserrno) 1339 { 1340 struct spdk_fs_request *req = ctx; 1341 struct spdk_fs_cb_args *args = &req->args; 1342 1343 args->fn.fs_op(args->arg, bserrno); 1344 free_fs_request(req); 1345 } 1346 1347 static void 1348 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1349 { 1350 struct spdk_fs_request *req = ctx; 1351 struct spdk_fs_cb_args *args = &req->args; 1352 const char *new_name = args->op.rename.new_name; 1353 1354 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1355 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1356 } 1357 1358 static void 1359 _fs_md_rename_file(struct spdk_fs_request *req) 1360 { 1361 struct spdk_fs_cb_args *args = &req->args; 1362 struct spdk_file *f; 1363 1364 f = fs_find_file(args->fs, args->op.rename.old_name); 1365 if (f == NULL) { 1366 args->fn.fs_op(args->arg, -ENOENT); 1367 free_fs_request(req); 1368 return; 1369 } 1370 1371 free(f->name); 1372 f->name = strdup(args->op.rename.new_name); 1373 _file_build_trace_arg_name(f); 1374 args->file = f; 1375 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1376 } 1377 1378 static void 1379 fs_rename_delete_done(void *arg, int fserrno) 1380 { 1381 _fs_md_rename_file(arg); 1382 } 1383 1384 void 1385 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1386 const char *old_name, const char *new_name, 1387 spdk_file_op_complete cb_fn, void *cb_arg) 1388 { 1389 struct spdk_file *f; 1390 struct spdk_fs_request *req; 1391 struct spdk_fs_cb_args *args; 1392 1393 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1394 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1395 cb_fn(cb_arg, -ENAMETOOLONG); 1396 return; 1397 } 1398 1399 req = alloc_fs_request(fs->md_target.md_fs_channel); 1400 if (req == NULL) { 1401 SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name, 1402 new_name); 1403 cb_fn(cb_arg, -ENOMEM); 1404 return; 1405 } 1406 1407 args = &req->args; 1408 args->fn.fs_op = cb_fn; 1409 args->fs = fs; 1410 args->arg = cb_arg; 1411 args->op.rename.old_name = old_name; 1412 args->op.rename.new_name = new_name; 1413 1414 f = fs_find_file(fs, new_name); 1415 if (f == NULL) { 1416 _fs_md_rename_file(req); 1417 return; 1418 } 1419 1420 /* 1421 * The rename overwrites an existing file. So delete the existing file, then 1422 * do the actual rename. 1423 */ 1424 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1425 } 1426 1427 static void 1428 __fs_rename_file_done(void *arg, int fserrno) 1429 { 1430 struct spdk_fs_request *req = arg; 1431 struct spdk_fs_cb_args *args = &req->args; 1432 1433 __wake_caller(args, fserrno); 1434 } 1435 1436 static void 1437 __fs_rename_file(void *arg) 1438 { 1439 struct spdk_fs_request *req = arg; 1440 struct spdk_fs_cb_args *args = &req->args; 1441 1442 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1443 __fs_rename_file_done, req); 1444 } 1445 1446 int 1447 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1448 const char *old_name, const char *new_name) 1449 { 1450 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1451 struct spdk_fs_request *req; 1452 struct spdk_fs_cb_args *args; 1453 int rc; 1454 1455 req = alloc_fs_request(channel); 1456 if (req == NULL) { 1457 SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name); 1458 return -ENOMEM; 1459 } 1460 1461 args = &req->args; 1462 1463 args->fs = fs; 1464 args->op.rename.old_name = old_name; 1465 args->op.rename.new_name = new_name; 1466 args->sem = &channel->sem; 1467 fs->send_request(__fs_rename_file, req); 1468 sem_wait(&channel->sem); 1469 rc = args->rc; 1470 free_fs_request(req); 1471 return rc; 1472 } 1473 1474 static void 1475 blob_delete_cb(void *ctx, int bserrno) 1476 { 1477 struct spdk_fs_request *req = ctx; 1478 struct spdk_fs_cb_args *args = &req->args; 1479 1480 args->fn.file_op(args->arg, bserrno); 1481 free_fs_request(req); 1482 } 1483 1484 void 1485 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1486 spdk_file_op_complete cb_fn, void *cb_arg) 1487 { 1488 struct spdk_file *f; 1489 spdk_blob_id blobid; 1490 struct spdk_fs_request *req; 1491 struct spdk_fs_cb_args *args; 1492 1493 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1494 1495 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1496 cb_fn(cb_arg, -ENAMETOOLONG); 1497 return; 1498 } 1499 1500 f = fs_find_file(fs, name); 1501 if (f == NULL) { 1502 SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name); 1503 cb_fn(cb_arg, -ENOENT); 1504 return; 1505 } 1506 1507 req = alloc_fs_request(fs->md_target.md_fs_channel); 1508 if (req == NULL) { 1509 SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name); 1510 cb_fn(cb_arg, -ENOMEM); 1511 return; 1512 } 1513 1514 args = &req->args; 1515 args->fn.file_op = cb_fn; 1516 args->arg = cb_arg; 1517 1518 if (f->ref_count > 0) { 1519 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1520 f->is_deleted = true; 1521 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1522 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1523 return; 1524 } 1525 1526 blobid = f->blobid; 1527 TAILQ_REMOVE(&fs->files, f, tailq); 1528 1529 file_free(f); 1530 1531 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1532 } 1533 1534 static uint64_t 1535 fs_name_to_uint64(const char *name) 1536 { 1537 uint64_t result = 0; 1538 memcpy(&result, name, spdk_min(sizeof(result), strlen(name))); 1539 return result; 1540 } 1541 1542 static void 1543 __fs_delete_file_done(void *arg, int fserrno) 1544 { 1545 struct spdk_fs_request *req = arg; 1546 struct spdk_fs_cb_args *args = &req->args; 1547 1548 spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name)); 1549 __wake_caller(args, fserrno); 1550 } 1551 1552 static void 1553 __fs_delete_file(void *arg) 1554 { 1555 struct spdk_fs_request *req = arg; 1556 struct spdk_fs_cb_args *args = &req->args; 1557 1558 spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name)); 1559 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1560 } 1561 1562 int 1563 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1564 const char *name) 1565 { 1566 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1567 struct spdk_fs_request *req; 1568 struct spdk_fs_cb_args *args; 1569 int rc; 1570 1571 req = alloc_fs_request(channel); 1572 if (req == NULL) { 1573 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name); 1574 return -ENOMEM; 1575 } 1576 1577 args = &req->args; 1578 args->fs = fs; 1579 args->op.delete.name = name; 1580 args->sem = &channel->sem; 1581 fs->send_request(__fs_delete_file, req); 1582 sem_wait(&channel->sem); 1583 rc = args->rc; 1584 free_fs_request(req); 1585 1586 return rc; 1587 } 1588 1589 spdk_fs_iter 1590 spdk_fs_iter_first(struct spdk_filesystem *fs) 1591 { 1592 struct spdk_file *f; 1593 1594 f = TAILQ_FIRST(&fs->files); 1595 return f; 1596 } 1597 1598 spdk_fs_iter 1599 spdk_fs_iter_next(spdk_fs_iter iter) 1600 { 1601 struct spdk_file *f = iter; 1602 1603 if (f == NULL) { 1604 return NULL; 1605 } 1606 1607 f = TAILQ_NEXT(f, tailq); 1608 return f; 1609 } 1610 1611 const char * 1612 spdk_file_get_name(struct spdk_file *file) 1613 { 1614 return file->name; 1615 } 1616 1617 uint64_t 1618 spdk_file_get_length(struct spdk_file *file) 1619 { 1620 uint64_t length; 1621 1622 assert(file != NULL); 1623 1624 length = file->append_pos >= file->length ? file->append_pos : file->length; 1625 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length); 1626 return length; 1627 } 1628 1629 static void 1630 fs_truncate_complete_cb(void *ctx, int bserrno) 1631 { 1632 struct spdk_fs_request *req = ctx; 1633 struct spdk_fs_cb_args *args = &req->args; 1634 1635 args->fn.file_op(args->arg, bserrno); 1636 free_fs_request(req); 1637 } 1638 1639 static void 1640 fs_truncate_resize_cb(void *ctx, int bserrno) 1641 { 1642 struct spdk_fs_request *req = ctx; 1643 struct spdk_fs_cb_args *args = &req->args; 1644 struct spdk_file *file = args->file; 1645 uint64_t *length = &args->op.truncate.length; 1646 1647 if (bserrno) { 1648 args->fn.file_op(args->arg, bserrno); 1649 free_fs_request(req); 1650 return; 1651 } 1652 1653 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1654 1655 file->length = *length; 1656 if (file->append_pos > file->length) { 1657 file->append_pos = file->length; 1658 } 1659 1660 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req); 1661 } 1662 1663 static uint64_t 1664 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1665 { 1666 return (length + cluster_sz - 1) / cluster_sz; 1667 } 1668 1669 void 1670 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1671 spdk_file_op_complete cb_fn, void *cb_arg) 1672 { 1673 struct spdk_filesystem *fs; 1674 size_t num_clusters; 1675 struct spdk_fs_request *req; 1676 struct spdk_fs_cb_args *args; 1677 1678 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1679 if (length == file->length) { 1680 cb_fn(cb_arg, 0); 1681 return; 1682 } 1683 1684 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1685 if (req == NULL) { 1686 cb_fn(cb_arg, -ENOMEM); 1687 return; 1688 } 1689 1690 args = &req->args; 1691 args->fn.file_op = cb_fn; 1692 args->arg = cb_arg; 1693 args->file = file; 1694 args->op.truncate.length = length; 1695 fs = file->fs; 1696 1697 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1698 1699 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1700 } 1701 1702 static void 1703 __truncate(void *arg) 1704 { 1705 struct spdk_fs_request *req = arg; 1706 struct spdk_fs_cb_args *args = &req->args; 1707 1708 spdk_file_truncate_async(args->file, args->op.truncate.length, 1709 args->fn.file_op, args); 1710 } 1711 1712 int 1713 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 1714 uint64_t length) 1715 { 1716 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1717 struct spdk_fs_request *req; 1718 struct spdk_fs_cb_args *args; 1719 int rc; 1720 1721 req = alloc_fs_request(channel); 1722 if (req == NULL) { 1723 return -ENOMEM; 1724 } 1725 1726 args = &req->args; 1727 1728 args->file = file; 1729 args->op.truncate.length = length; 1730 args->fn.file_op = __wake_caller; 1731 args->sem = &channel->sem; 1732 1733 channel->send_request(__truncate, req); 1734 sem_wait(&channel->sem); 1735 rc = args->rc; 1736 free_fs_request(req); 1737 1738 return rc; 1739 } 1740 1741 static void 1742 __rw_done(void *ctx, int bserrno) 1743 { 1744 struct spdk_fs_request *req = ctx; 1745 struct spdk_fs_cb_args *args = &req->args; 1746 1747 spdk_free(args->op.rw.pin_buf); 1748 args->fn.file_op(args->arg, bserrno); 1749 free_fs_request(req); 1750 } 1751 1752 static void 1753 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 1754 { 1755 int i; 1756 size_t len; 1757 1758 for (i = 0; i < iovcnt; i++) { 1759 len = spdk_min(iovs[i].iov_len, buf_len); 1760 memcpy(buf, iovs[i].iov_base, len); 1761 buf += len; 1762 assert(buf_len >= len); 1763 buf_len -= len; 1764 } 1765 } 1766 1767 static void 1768 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len) 1769 { 1770 int i; 1771 size_t len; 1772 1773 for (i = 0; i < iovcnt; i++) { 1774 len = spdk_min(iovs[i].iov_len, buf_len); 1775 memcpy(iovs[i].iov_base, buf, len); 1776 buf += len; 1777 assert(buf_len >= len); 1778 buf_len -= len; 1779 } 1780 } 1781 1782 static void 1783 __read_done(void *ctx, int bserrno) 1784 { 1785 struct spdk_fs_request *req = ctx; 1786 struct spdk_fs_cb_args *args = &req->args; 1787 void *buf; 1788 1789 assert(req != NULL); 1790 buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1))); 1791 if (args->op.rw.is_read) { 1792 _copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length); 1793 __rw_done(req, 0); 1794 } else { 1795 _copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt); 1796 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1797 args->op.rw.pin_buf, 1798 args->op.rw.start_lba, args->op.rw.num_lba, 1799 __rw_done, req); 1800 } 1801 } 1802 1803 static void 1804 __do_blob_read(void *ctx, int fserrno) 1805 { 1806 struct spdk_fs_request *req = ctx; 1807 struct spdk_fs_cb_args *args = &req->args; 1808 1809 if (fserrno) { 1810 __rw_done(req, fserrno); 1811 return; 1812 } 1813 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1814 args->op.rw.pin_buf, 1815 args->op.rw.start_lba, args->op.rw.num_lba, 1816 __read_done, req); 1817 } 1818 1819 static void 1820 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1821 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1822 { 1823 uint64_t end_lba; 1824 1825 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1826 *start_lba = offset / *lba_size; 1827 end_lba = (offset + length - 1) / *lba_size; 1828 *num_lba = (end_lba - *start_lba + 1); 1829 } 1830 1831 static bool 1832 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length) 1833 { 1834 uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1835 1836 if ((offset % lba_size == 0) && (length % lba_size == 0)) { 1837 return true; 1838 } 1839 1840 return false; 1841 } 1842 1843 static void 1844 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt) 1845 { 1846 uint32_t i; 1847 1848 for (i = 0; i < iovcnt; i++) { 1849 req->args.iovs[i].iov_base = iovs[i].iov_base; 1850 req->args.iovs[i].iov_len = iovs[i].iov_len; 1851 } 1852 } 1853 1854 static void 1855 __readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel, 1856 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1857 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1858 { 1859 struct spdk_fs_request *req; 1860 struct spdk_fs_cb_args *args; 1861 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1862 uint64_t start_lba, num_lba, pin_buf_length; 1863 uint32_t lba_size; 1864 1865 if (is_read && offset + length > file->length) { 1866 cb_fn(cb_arg, -EINVAL); 1867 return; 1868 } 1869 1870 req = alloc_fs_request_with_iov(channel, iovcnt); 1871 if (req == NULL) { 1872 cb_fn(cb_arg, -ENOMEM); 1873 return; 1874 } 1875 1876 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1877 1878 args = &req->args; 1879 args->fn.file_op = cb_fn; 1880 args->arg = cb_arg; 1881 args->file = file; 1882 args->op.rw.channel = channel->bs_channel; 1883 _fs_request_setup_iovs(req, iovs, iovcnt); 1884 args->op.rw.is_read = is_read; 1885 args->op.rw.offset = offset; 1886 args->op.rw.blocklen = lba_size; 1887 1888 pin_buf_length = num_lba * lba_size; 1889 args->op.rw.length = pin_buf_length; 1890 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL, 1891 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1892 if (args->op.rw.pin_buf == NULL) { 1893 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1894 file->name, offset, length); 1895 free_fs_request(req); 1896 cb_fn(cb_arg, -ENOMEM); 1897 return; 1898 } 1899 1900 args->op.rw.start_lba = start_lba; 1901 args->op.rw.num_lba = num_lba; 1902 1903 if (!is_read && file->length < offset + length) { 1904 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1905 } else if (!is_read && __is_lba_aligned(file, offset, length)) { 1906 _copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt); 1907 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1908 args->op.rw.pin_buf, 1909 args->op.rw.start_lba, args->op.rw.num_lba, 1910 __rw_done, req); 1911 } else { 1912 __do_blob_read(req, 0); 1913 } 1914 } 1915 1916 static void 1917 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel, 1918 void *payload, uint64_t offset, uint64_t length, 1919 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1920 { 1921 struct iovec iov; 1922 1923 iov.iov_base = payload; 1924 iov.iov_len = (size_t)length; 1925 1926 __readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read); 1927 } 1928 1929 void 1930 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1931 void *payload, uint64_t offset, uint64_t length, 1932 spdk_file_op_complete cb_fn, void *cb_arg) 1933 { 1934 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1935 } 1936 1937 void 1938 spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel, 1939 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1940 spdk_file_op_complete cb_fn, void *cb_arg) 1941 { 1942 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1943 file->name, offset, length); 1944 1945 __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0); 1946 } 1947 1948 void 1949 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1950 void *payload, uint64_t offset, uint64_t length, 1951 spdk_file_op_complete cb_fn, void *cb_arg) 1952 { 1953 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1954 file->name, offset, length); 1955 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1956 } 1957 1958 void 1959 spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel, 1960 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1961 spdk_file_op_complete cb_fn, void *cb_arg) 1962 { 1963 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1964 file->name, offset, length); 1965 1966 __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1); 1967 } 1968 1969 struct spdk_io_channel * 1970 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1971 { 1972 struct spdk_io_channel *io_channel; 1973 struct spdk_fs_channel *fs_channel; 1974 1975 io_channel = spdk_get_io_channel(&fs->io_target); 1976 fs_channel = spdk_io_channel_get_ctx(io_channel); 1977 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1978 fs_channel->send_request = __send_request_direct; 1979 1980 return io_channel; 1981 } 1982 1983 void 1984 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1985 { 1986 spdk_put_io_channel(channel); 1987 } 1988 1989 struct spdk_fs_thread_ctx * 1990 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs) 1991 { 1992 struct spdk_fs_thread_ctx *ctx; 1993 1994 ctx = calloc(1, sizeof(*ctx)); 1995 if (!ctx) { 1996 return NULL; 1997 } 1998 1999 fs_channel_create(fs, &ctx->ch, 512); 2000 2001 ctx->ch.send_request = fs->send_request; 2002 ctx->ch.sync = 1; 2003 pthread_spin_init(&ctx->ch.lock, 0); 2004 2005 return ctx; 2006 } 2007 2008 2009 void 2010 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx) 2011 { 2012 assert(ctx->ch.sync == 1); 2013 2014 while (true) { 2015 pthread_spin_lock(&ctx->ch.lock); 2016 if (ctx->ch.outstanding_reqs == 0) { 2017 pthread_spin_unlock(&ctx->ch.lock); 2018 break; 2019 } 2020 pthread_spin_unlock(&ctx->ch.lock); 2021 usleep(1000); 2022 } 2023 2024 fs_channel_destroy(NULL, &ctx->ch); 2025 free(ctx); 2026 } 2027 2028 int 2029 spdk_fs_set_cache_size(uint64_t size_in_mb) 2030 { 2031 /* setting g_fs_cache_size is only permitted if cache pool 2032 * is already freed or hasn't been initialized 2033 */ 2034 if (g_cache_pool != NULL) { 2035 return -EPERM; 2036 } 2037 2038 g_fs_cache_size = size_in_mb * 1024 * 1024; 2039 2040 return 0; 2041 } 2042 2043 uint64_t 2044 spdk_fs_get_cache_size(void) 2045 { 2046 return g_fs_cache_size / (1024 * 1024); 2047 } 2048 2049 static void __file_flush(void *ctx); 2050 2051 /* Try to free some cache buffers from this file. 2052 */ 2053 static int 2054 reclaim_cache_buffers(struct spdk_file *file) 2055 { 2056 int rc; 2057 2058 BLOBFS_TRACE(file, "free=%s\n", file->name); 2059 2060 /* The function is safe to be called with any threads, while the file 2061 * lock maybe locked by other thread for now, so try to get the file 2062 * lock here. 2063 */ 2064 rc = pthread_spin_trylock(&file->lock); 2065 if (rc != 0) { 2066 return -1; 2067 } 2068 2069 if (file->tree->present_mask == 0) { 2070 pthread_spin_unlock(&file->lock); 2071 return -1; 2072 } 2073 tree_free_buffers(file->tree); 2074 2075 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2076 /* If not freed, put it in the end of the queue */ 2077 if (file->tree->present_mask != 0) { 2078 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2079 } else { 2080 file->last = NULL; 2081 } 2082 pthread_spin_unlock(&file->lock); 2083 2084 return 0; 2085 } 2086 2087 static int 2088 _blobfs_cache_pool_reclaim(void *arg) 2089 { 2090 struct spdk_file *file, *tmp; 2091 int rc; 2092 2093 if (!blobfs_cache_pool_need_reclaim()) { 2094 return SPDK_POLLER_IDLE; 2095 } 2096 2097 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2098 if (!file->open_for_writing && 2099 file->priority == SPDK_FILE_PRIORITY_LOW) { 2100 rc = reclaim_cache_buffers(file); 2101 if (rc < 0) { 2102 continue; 2103 } 2104 if (!blobfs_cache_pool_need_reclaim()) { 2105 return SPDK_POLLER_BUSY; 2106 } 2107 break; 2108 } 2109 } 2110 2111 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2112 if (!file->open_for_writing) { 2113 rc = reclaim_cache_buffers(file); 2114 if (rc < 0) { 2115 continue; 2116 } 2117 if (!blobfs_cache_pool_need_reclaim()) { 2118 return SPDK_POLLER_BUSY; 2119 } 2120 break; 2121 } 2122 } 2123 2124 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2125 rc = reclaim_cache_buffers(file); 2126 if (rc < 0) { 2127 continue; 2128 } 2129 break; 2130 } 2131 2132 return SPDK_POLLER_BUSY; 2133 } 2134 2135 static void 2136 _add_file_to_cache_pool(void *ctx) 2137 { 2138 struct spdk_file *file = ctx; 2139 2140 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2141 } 2142 2143 static void 2144 _remove_file_from_cache_pool(void *ctx) 2145 { 2146 struct spdk_file *file = ctx; 2147 2148 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2149 } 2150 2151 static struct cache_buffer * 2152 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 2153 { 2154 struct cache_buffer *buf; 2155 int count = 0; 2156 bool need_update = false; 2157 2158 buf = calloc(1, sizeof(*buf)); 2159 if (buf == NULL) { 2160 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 2161 return NULL; 2162 } 2163 2164 do { 2165 buf->buf = spdk_mempool_get(g_cache_pool); 2166 if (buf->buf) { 2167 break; 2168 } 2169 if (count++ == 100) { 2170 SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", 2171 file, offset); 2172 free(buf); 2173 return NULL; 2174 } 2175 usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); 2176 } while (true); 2177 2178 buf->buf_size = CACHE_BUFFER_SIZE; 2179 buf->offset = offset; 2180 2181 if (file->tree->present_mask == 0) { 2182 need_update = true; 2183 } 2184 file->tree = tree_insert_buffer(file->tree, buf); 2185 2186 if (need_update) { 2187 spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file); 2188 } 2189 2190 return buf; 2191 } 2192 2193 static struct cache_buffer * 2194 cache_append_buffer(struct spdk_file *file) 2195 { 2196 struct cache_buffer *last; 2197 2198 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 2199 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 2200 2201 last = cache_insert_buffer(file, file->append_pos); 2202 if (last == NULL) { 2203 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 2204 return NULL; 2205 } 2206 2207 file->last = last; 2208 2209 return last; 2210 } 2211 2212 static void __check_sync_reqs(struct spdk_file *file); 2213 2214 static void 2215 __file_cache_finish_sync(void *ctx, int bserrno) 2216 { 2217 struct spdk_file *file; 2218 struct spdk_fs_request *sync_req = ctx; 2219 struct spdk_fs_cb_args *sync_args; 2220 2221 sync_args = &sync_req->args; 2222 file = sync_args->file; 2223 pthread_spin_lock(&file->lock); 2224 file->length_xattr = sync_args->op.sync.length; 2225 assert(sync_args->op.sync.offset <= file->length_flushed); 2226 spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset, 2227 0, file->trace_arg_name); 2228 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 2229 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 2230 pthread_spin_unlock(&file->lock); 2231 2232 sync_args->fn.file_op(sync_args->arg, bserrno); 2233 2234 free_fs_request(sync_req); 2235 __check_sync_reqs(file); 2236 } 2237 2238 static void 2239 __check_sync_reqs(struct spdk_file *file) 2240 { 2241 struct spdk_fs_request *sync_req; 2242 2243 pthread_spin_lock(&file->lock); 2244 2245 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 2246 if (sync_req->args.op.sync.offset <= file->length_flushed) { 2247 break; 2248 } 2249 } 2250 2251 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 2252 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 2253 sync_req->args.op.sync.xattr_in_progress = true; 2254 sync_req->args.op.sync.length = file->length_flushed; 2255 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 2256 sizeof(file->length_flushed)); 2257 2258 pthread_spin_unlock(&file->lock); 2259 spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed, 2260 0, file->trace_arg_name); 2261 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req); 2262 } else { 2263 pthread_spin_unlock(&file->lock); 2264 } 2265 } 2266 2267 static void 2268 __file_flush_done(void *ctx, int bserrno) 2269 { 2270 struct spdk_fs_request *req = ctx; 2271 struct spdk_fs_cb_args *args = &req->args; 2272 struct spdk_file *file = args->file; 2273 struct cache_buffer *next = args->op.flush.cache_buffer; 2274 2275 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 2276 2277 pthread_spin_lock(&file->lock); 2278 next->in_progress = false; 2279 next->bytes_flushed += args->op.flush.length; 2280 file->length_flushed += args->op.flush.length; 2281 if (file->length_flushed > file->length) { 2282 file->length = file->length_flushed; 2283 } 2284 if (next->bytes_flushed == next->buf_size) { 2285 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 2286 next = tree_find_buffer(file->tree, file->length_flushed); 2287 } 2288 2289 /* 2290 * Assert that there is no cached data that extends past the end of the underlying 2291 * blob. 2292 */ 2293 assert(next == NULL || next->offset < __file_get_blob_size(file) || 2294 next->bytes_filled == 0); 2295 2296 pthread_spin_unlock(&file->lock); 2297 2298 __check_sync_reqs(file); 2299 2300 __file_flush(req); 2301 } 2302 2303 static void 2304 __file_flush(void *ctx) 2305 { 2306 struct spdk_fs_request *req = ctx; 2307 struct spdk_fs_cb_args *args = &req->args; 2308 struct spdk_file *file = args->file; 2309 struct cache_buffer *next; 2310 uint64_t offset, length, start_lba, num_lba; 2311 uint32_t lba_size; 2312 2313 pthread_spin_lock(&file->lock); 2314 next = tree_find_buffer(file->tree, file->length_flushed); 2315 if (next == NULL || next->in_progress || 2316 ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) { 2317 /* 2318 * There is either no data to flush, a flush I/O is already in 2319 * progress, or the next buffer is partially filled but there's no 2320 * outstanding request to sync it. 2321 * So return immediately - if a flush I/O is in progress we will flush 2322 * more data after that is completed, or a partial buffer will get flushed 2323 * when it is either filled or the file is synced. 2324 */ 2325 free_fs_request(req); 2326 if (next == NULL) { 2327 /* 2328 * For cases where a file's cache was evicted, and then the 2329 * file was later appended, we will write the data directly 2330 * to disk and bypass cache. So just update length_flushed 2331 * here to reflect that all data was already written to disk. 2332 */ 2333 file->length_flushed = file->append_pos; 2334 } 2335 pthread_spin_unlock(&file->lock); 2336 if (next == NULL) { 2337 /* 2338 * There is no data to flush, but we still need to check for any 2339 * outstanding sync requests to make sure metadata gets updated. 2340 */ 2341 __check_sync_reqs(file); 2342 } 2343 return; 2344 } 2345 2346 offset = next->offset + next->bytes_flushed; 2347 length = next->bytes_filled - next->bytes_flushed; 2348 if (length == 0) { 2349 free_fs_request(req); 2350 pthread_spin_unlock(&file->lock); 2351 /* 2352 * There is no data to flush, but we still need to check for any 2353 * outstanding sync requests to make sure metadata gets updated. 2354 */ 2355 __check_sync_reqs(file); 2356 return; 2357 } 2358 args->op.flush.length = length; 2359 args->op.flush.cache_buffer = next; 2360 2361 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2362 2363 next->in_progress = true; 2364 BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n", 2365 offset, length, start_lba, num_lba); 2366 pthread_spin_unlock(&file->lock); 2367 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2368 next->buf + (start_lba * lba_size) - next->offset, 2369 start_lba, num_lba, __file_flush_done, req); 2370 } 2371 2372 static void 2373 __file_extend_done(void *arg, int bserrno) 2374 { 2375 struct spdk_fs_cb_args *args = arg; 2376 2377 __wake_caller(args, bserrno); 2378 } 2379 2380 static void 2381 __file_extend_resize_cb(void *_args, int bserrno) 2382 { 2383 struct spdk_fs_cb_args *args = _args; 2384 struct spdk_file *file = args->file; 2385 2386 if (bserrno) { 2387 __wake_caller(args, bserrno); 2388 return; 2389 } 2390 2391 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2392 } 2393 2394 static void 2395 __file_extend_blob(void *_args) 2396 { 2397 struct spdk_fs_cb_args *args = _args; 2398 struct spdk_file *file = args->file; 2399 2400 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2401 } 2402 2403 static void 2404 __rw_from_file_done(void *ctx, int bserrno) 2405 { 2406 struct spdk_fs_request *req = ctx; 2407 2408 __wake_caller(&req->args, bserrno); 2409 free_fs_request(req); 2410 } 2411 2412 static void 2413 __rw_from_file(void *ctx) 2414 { 2415 struct spdk_fs_request *req = ctx; 2416 struct spdk_fs_cb_args *args = &req->args; 2417 struct spdk_file *file = args->file; 2418 2419 if (args->op.rw.is_read) { 2420 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2421 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2422 __rw_from_file_done, req); 2423 } else { 2424 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2425 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2426 __rw_from_file_done, req); 2427 } 2428 } 2429 2430 static int 2431 __send_rw_from_file(struct spdk_file *file, void *payload, 2432 uint64_t offset, uint64_t length, bool is_read, 2433 struct spdk_fs_channel *channel) 2434 { 2435 struct spdk_fs_request *req; 2436 struct spdk_fs_cb_args *args; 2437 2438 req = alloc_fs_request_with_iov(channel, 1); 2439 if (req == NULL) { 2440 sem_post(&channel->sem); 2441 return -ENOMEM; 2442 } 2443 2444 args = &req->args; 2445 args->file = file; 2446 args->sem = &channel->sem; 2447 args->iovs[0].iov_base = payload; 2448 args->iovs[0].iov_len = (size_t)length; 2449 args->op.rw.offset = offset; 2450 args->op.rw.is_read = is_read; 2451 file->fs->send_request(__rw_from_file, req); 2452 return 0; 2453 } 2454 2455 int 2456 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2457 void *payload, uint64_t offset, uint64_t length) 2458 { 2459 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2460 struct spdk_fs_request *flush_req; 2461 uint64_t rem_length, copy, blob_size, cluster_sz; 2462 uint32_t cache_buffers_filled = 0; 2463 uint8_t *cur_payload; 2464 struct cache_buffer *last; 2465 2466 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2467 2468 if (length == 0) { 2469 return 0; 2470 } 2471 2472 if (offset != file->append_pos) { 2473 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2474 return -EINVAL; 2475 } 2476 2477 pthread_spin_lock(&file->lock); 2478 file->open_for_writing = true; 2479 2480 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2481 cache_append_buffer(file); 2482 } 2483 2484 if (file->last == NULL) { 2485 int rc; 2486 2487 file->append_pos += length; 2488 pthread_spin_unlock(&file->lock); 2489 rc = __send_rw_from_file(file, payload, offset, length, false, channel); 2490 sem_wait(&channel->sem); 2491 return rc; 2492 } 2493 2494 blob_size = __file_get_blob_size(file); 2495 2496 if ((offset + length) > blob_size) { 2497 struct spdk_fs_cb_args extend_args = {}; 2498 2499 cluster_sz = file->fs->bs_opts.cluster_sz; 2500 extend_args.sem = &channel->sem; 2501 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2502 extend_args.file = file; 2503 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2504 pthread_spin_unlock(&file->lock); 2505 file->fs->send_request(__file_extend_blob, &extend_args); 2506 sem_wait(&channel->sem); 2507 if (extend_args.rc) { 2508 return extend_args.rc; 2509 } 2510 } 2511 2512 flush_req = alloc_fs_request(channel); 2513 if (flush_req == NULL) { 2514 pthread_spin_unlock(&file->lock); 2515 return -ENOMEM; 2516 } 2517 2518 last = file->last; 2519 rem_length = length; 2520 cur_payload = payload; 2521 while (rem_length > 0) { 2522 copy = last->buf_size - last->bytes_filled; 2523 if (copy > rem_length) { 2524 copy = rem_length; 2525 } 2526 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2527 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2528 file->append_pos += copy; 2529 if (file->length < file->append_pos) { 2530 file->length = file->append_pos; 2531 } 2532 cur_payload += copy; 2533 last->bytes_filled += copy; 2534 rem_length -= copy; 2535 if (last->bytes_filled == last->buf_size) { 2536 cache_buffers_filled++; 2537 last = cache_append_buffer(file); 2538 if (last == NULL) { 2539 BLOBFS_TRACE(file, "nomem\n"); 2540 free_fs_request(flush_req); 2541 pthread_spin_unlock(&file->lock); 2542 return -ENOMEM; 2543 } 2544 } 2545 } 2546 2547 pthread_spin_unlock(&file->lock); 2548 2549 if (cache_buffers_filled == 0) { 2550 free_fs_request(flush_req); 2551 return 0; 2552 } 2553 2554 flush_req->args.file = file; 2555 file->fs->send_request(__file_flush, flush_req); 2556 return 0; 2557 } 2558 2559 static void 2560 __readahead_done(void *ctx, int bserrno) 2561 { 2562 struct spdk_fs_request *req = ctx; 2563 struct spdk_fs_cb_args *args = &req->args; 2564 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2565 struct spdk_file *file = args->file; 2566 2567 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2568 2569 pthread_spin_lock(&file->lock); 2570 cache_buffer->bytes_filled = args->op.readahead.length; 2571 cache_buffer->bytes_flushed = args->op.readahead.length; 2572 cache_buffer->in_progress = false; 2573 pthread_spin_unlock(&file->lock); 2574 2575 free_fs_request(req); 2576 } 2577 2578 static void 2579 __readahead(void *ctx) 2580 { 2581 struct spdk_fs_request *req = ctx; 2582 struct spdk_fs_cb_args *args = &req->args; 2583 struct spdk_file *file = args->file; 2584 uint64_t offset, length, start_lba, num_lba; 2585 uint32_t lba_size; 2586 2587 offset = args->op.readahead.offset; 2588 length = args->op.readahead.length; 2589 assert(length > 0); 2590 2591 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2592 2593 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2594 offset, length, start_lba, num_lba); 2595 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2596 args->op.readahead.cache_buffer->buf, 2597 start_lba, num_lba, __readahead_done, req); 2598 } 2599 2600 static uint64_t 2601 __next_cache_buffer_offset(uint64_t offset) 2602 { 2603 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2604 } 2605 2606 static void 2607 check_readahead(struct spdk_file *file, uint64_t offset, 2608 struct spdk_fs_channel *channel) 2609 { 2610 struct spdk_fs_request *req; 2611 struct spdk_fs_cb_args *args; 2612 2613 offset = __next_cache_buffer_offset(offset); 2614 if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2615 return; 2616 } 2617 2618 req = alloc_fs_request(channel); 2619 if (req == NULL) { 2620 return; 2621 } 2622 args = &req->args; 2623 2624 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2625 2626 args->file = file; 2627 args->op.readahead.offset = offset; 2628 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2629 if (!args->op.readahead.cache_buffer) { 2630 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2631 free_fs_request(req); 2632 return; 2633 } 2634 2635 args->op.readahead.cache_buffer->in_progress = true; 2636 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2637 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2638 } else { 2639 args->op.readahead.length = CACHE_BUFFER_SIZE; 2640 } 2641 file->fs->send_request(__readahead, req); 2642 } 2643 2644 int64_t 2645 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2646 void *payload, uint64_t offset, uint64_t length) 2647 { 2648 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2649 uint64_t final_offset, final_length; 2650 uint32_t sub_reads = 0; 2651 struct cache_buffer *buf; 2652 uint64_t read_len; 2653 int rc = 0; 2654 2655 pthread_spin_lock(&file->lock); 2656 2657 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2658 2659 file->open_for_writing = false; 2660 2661 if (length == 0 || offset >= file->append_pos) { 2662 pthread_spin_unlock(&file->lock); 2663 return 0; 2664 } 2665 2666 if (offset + length > file->append_pos) { 2667 length = file->append_pos - offset; 2668 } 2669 2670 if (offset != file->next_seq_offset) { 2671 file->seq_byte_count = 0; 2672 } 2673 file->seq_byte_count += length; 2674 file->next_seq_offset = offset + length; 2675 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2676 check_readahead(file, offset, channel); 2677 check_readahead(file, offset + CACHE_BUFFER_SIZE, channel); 2678 } 2679 2680 final_length = 0; 2681 final_offset = offset + length; 2682 while (offset < final_offset) { 2683 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2684 if (length > (final_offset - offset)) { 2685 length = final_offset - offset; 2686 } 2687 2688 buf = tree_find_filled_buffer(file->tree, offset); 2689 if (buf == NULL) { 2690 pthread_spin_unlock(&file->lock); 2691 rc = __send_rw_from_file(file, payload, offset, length, true, channel); 2692 pthread_spin_lock(&file->lock); 2693 if (rc == 0) { 2694 sub_reads++; 2695 } 2696 } else { 2697 read_len = length; 2698 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2699 read_len = buf->offset + buf->bytes_filled - offset; 2700 } 2701 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len); 2702 memcpy(payload, &buf->buf[offset - buf->offset], read_len); 2703 if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) { 2704 tree_remove_buffer(file->tree, buf); 2705 if (file->tree->present_mask == 0) { 2706 spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file); 2707 } 2708 } 2709 } 2710 2711 if (rc == 0) { 2712 final_length += length; 2713 } else { 2714 break; 2715 } 2716 payload += length; 2717 offset += length; 2718 } 2719 pthread_spin_unlock(&file->lock); 2720 while (sub_reads > 0) { 2721 sem_wait(&channel->sem); 2722 sub_reads--; 2723 } 2724 if (rc == 0) { 2725 return final_length; 2726 } else { 2727 return rc; 2728 } 2729 } 2730 2731 static void 2732 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2733 spdk_file_op_complete cb_fn, void *cb_arg) 2734 { 2735 struct spdk_fs_request *sync_req; 2736 struct spdk_fs_request *flush_req; 2737 struct spdk_fs_cb_args *sync_args; 2738 struct spdk_fs_cb_args *flush_args; 2739 2740 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2741 2742 pthread_spin_lock(&file->lock); 2743 if (file->append_pos <= file->length_xattr) { 2744 BLOBFS_TRACE(file, "done - file already synced\n"); 2745 pthread_spin_unlock(&file->lock); 2746 cb_fn(cb_arg, 0); 2747 return; 2748 } 2749 2750 sync_req = alloc_fs_request(channel); 2751 if (!sync_req) { 2752 SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name); 2753 pthread_spin_unlock(&file->lock); 2754 cb_fn(cb_arg, -ENOMEM); 2755 return; 2756 } 2757 sync_args = &sync_req->args; 2758 2759 flush_req = alloc_fs_request(channel); 2760 if (!flush_req) { 2761 SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name); 2762 free_fs_request(sync_req); 2763 pthread_spin_unlock(&file->lock); 2764 cb_fn(cb_arg, -ENOMEM); 2765 return; 2766 } 2767 flush_args = &flush_req->args; 2768 2769 sync_args->file = file; 2770 sync_args->fn.file_op = cb_fn; 2771 sync_args->arg = cb_arg; 2772 sync_args->op.sync.offset = file->append_pos; 2773 sync_args->op.sync.xattr_in_progress = false; 2774 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2775 pthread_spin_unlock(&file->lock); 2776 2777 flush_args->file = file; 2778 channel->send_request(__file_flush, flush_req); 2779 } 2780 2781 int 2782 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2783 { 2784 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2785 struct spdk_fs_cb_args args = {}; 2786 2787 args.sem = &channel->sem; 2788 _file_sync(file, channel, __wake_caller, &args); 2789 sem_wait(&channel->sem); 2790 2791 return args.rc; 2792 } 2793 2794 void 2795 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2796 spdk_file_op_complete cb_fn, void *cb_arg) 2797 { 2798 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2799 2800 _file_sync(file, channel, cb_fn, cb_arg); 2801 } 2802 2803 void 2804 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2805 { 2806 BLOBFS_TRACE(file, "priority=%u\n", priority); 2807 file->priority = priority; 2808 2809 } 2810 2811 /* 2812 * Close routines 2813 */ 2814 2815 static void 2816 __file_close_async_done(void *ctx, int bserrno) 2817 { 2818 struct spdk_fs_request *req = ctx; 2819 struct spdk_fs_cb_args *args = &req->args; 2820 struct spdk_file *file = args->file; 2821 2822 spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name); 2823 2824 if (file->is_deleted) { 2825 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2826 return; 2827 } 2828 2829 args->fn.file_op(args->arg, bserrno); 2830 free_fs_request(req); 2831 } 2832 2833 static void 2834 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2835 { 2836 struct spdk_blob *blob; 2837 2838 pthread_spin_lock(&file->lock); 2839 if (file->ref_count == 0) { 2840 pthread_spin_unlock(&file->lock); 2841 __file_close_async_done(req, -EBADF); 2842 return; 2843 } 2844 2845 file->ref_count--; 2846 if (file->ref_count > 0) { 2847 pthread_spin_unlock(&file->lock); 2848 req->args.fn.file_op(req->args.arg, 0); 2849 free_fs_request(req); 2850 return; 2851 } 2852 2853 pthread_spin_unlock(&file->lock); 2854 2855 blob = file->blob; 2856 file->blob = NULL; 2857 spdk_blob_close(blob, __file_close_async_done, req); 2858 } 2859 2860 static void 2861 __file_close_async__sync_done(void *arg, int fserrno) 2862 { 2863 struct spdk_fs_request *req = arg; 2864 struct spdk_fs_cb_args *args = &req->args; 2865 2866 __file_close_async(args->file, req); 2867 } 2868 2869 void 2870 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2871 { 2872 struct spdk_fs_request *req; 2873 struct spdk_fs_cb_args *args; 2874 2875 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2876 if (req == NULL) { 2877 SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name); 2878 cb_fn(cb_arg, -ENOMEM); 2879 return; 2880 } 2881 2882 args = &req->args; 2883 args->file = file; 2884 args->fn.file_op = cb_fn; 2885 args->arg = cb_arg; 2886 2887 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2888 } 2889 2890 static void 2891 __file_close(void *arg) 2892 { 2893 struct spdk_fs_request *req = arg; 2894 struct spdk_fs_cb_args *args = &req->args; 2895 struct spdk_file *file = args->file; 2896 2897 __file_close_async(file, req); 2898 } 2899 2900 int 2901 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2902 { 2903 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2904 struct spdk_fs_request *req; 2905 struct spdk_fs_cb_args *args; 2906 2907 req = alloc_fs_request(channel); 2908 if (req == NULL) { 2909 SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name); 2910 return -ENOMEM; 2911 } 2912 2913 args = &req->args; 2914 2915 spdk_file_sync(file, ctx); 2916 BLOBFS_TRACE(file, "name=%s\n", file->name); 2917 args->file = file; 2918 args->sem = &channel->sem; 2919 args->fn.file_op = __wake_caller; 2920 args->arg = args; 2921 channel->send_request(__file_close, req); 2922 sem_wait(&channel->sem); 2923 2924 return args->rc; 2925 } 2926 2927 int 2928 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2929 { 2930 if (size < sizeof(spdk_blob_id)) { 2931 return -EINVAL; 2932 } 2933 2934 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2935 2936 return sizeof(spdk_blob_id); 2937 } 2938 2939 static void 2940 _file_free(void *ctx) 2941 { 2942 struct spdk_file *file = ctx; 2943 2944 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2945 2946 free(file->name); 2947 free(file->tree); 2948 free(file); 2949 } 2950 2951 static void 2952 file_free(struct spdk_file *file) 2953 { 2954 BLOBFS_TRACE(file, "free=%s\n", file->name); 2955 pthread_spin_lock(&file->lock); 2956 if (file->tree->present_mask == 0) { 2957 pthread_spin_unlock(&file->lock); 2958 free(file->name); 2959 free(file->tree); 2960 free(file); 2961 return; 2962 } 2963 2964 tree_free_buffers(file->tree); 2965 assert(file->tree->present_mask == 0); 2966 spdk_thread_send_msg(g_cache_pool_thread, _file_free, file); 2967 pthread_spin_unlock(&file->lock); 2968 } 2969 2970 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2971 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2972