/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "spdk/conf.h"
#include "tree.h"

#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"
#include "spdk/trace.h"

#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)

#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

#define SPDK_BLOBFS_SIGNATURE	"BLOBFS"

static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches;
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_spinlock_t g_caches_lock;

#define TRACE_GROUP_BLOBFS		0x7
#define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
#define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
#define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)

SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	spdk_trace_register_description("BLOBFS_XATTR_START",
					TRACE_BLOBFS_XATTR_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_XATTR_END",
					TRACE_BLOBFS_XATTR_END,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_OPEN",
					TRACE_BLOBFS_OPEN,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_CLOSE",
					TRACE_BLOBFS_CLOSE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_DELETE_START",
					TRACE_BLOBFS_DELETE_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_DELETE_DONE",
					TRACE_BLOBFS_DELETE_DONE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
}

void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)

struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;
	char			*name;
	uint64_t		trace_arg_name;
	uint64_t		length;
	bool			is_deleted;
	bool			open_for_writing;
	uint64_t		length_flushed;
	uint64_t		length_xattr;
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	uint32_t		priority;
	TAILQ_ENTRY(spdk_file)	tailq;
	spdk_blob_id		blobid;
	uint32_t		ref_count;
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	struct cache_tree	*tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};

struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};
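/*
 * A blobfs instance wraps a single blobstore.  Metadata operations and cache
 * sync operations are funneled through two dedicated io_devices (md_target
 * and sync_target), while file data I/O goes through io_target channels that
 * the application allocates with spdk_fs_alloc_io_channel().
 */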
struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	fs_send_request_fn	send_request;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *sync_io_channel;
		struct spdk_fs_channel *sync_fs_channel;
	} sync_target;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *md_io_channel;
		struct spdk_fs_channel *md_fs_channel;
	} md_target;

	struct {
		uint32_t max_ops;
	} io_target;
};

struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete fs_op_with_handle;
		spdk_fs_op_complete fs_op;
		spdk_file_op_with_handle_complete file_op_with_handle;
		spdk_file_op_complete file_op;
		spdk_file_stat_op_complete stat_op;
	} fn;
	void *arg;
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	struct iovec *iovs;
	uint32_t iovcnt;
	struct iovec iov;
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file) deleted_files;
		} fs_load;
		struct {
			uint64_t length;
		} truncate;
		struct {
			struct spdk_io_channel *channel;
			void *pin_buf;
			int is_read;
			off_t offset;
			size_t length;
			uint64_t start_lba;
			uint64_t num_lba;
			uint32_t blocklen;
		} rw;
		struct {
			const char *old_name;
			const char *new_name;
		} rename;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
		} flush;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
			uint64_t offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t offset;
			TAILQ_ENTRY(spdk_fs_request) tailq;
			bool xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t length;
		} sync;
		struct {
			uint32_t num_clusters;
		} resize;
		struct {
			const char *name;
			uint32_t flags;
			TAILQ_ENTRY(spdk_fs_request) tailq;
		} open;
		struct {
			const char *name;
			struct spdk_blob *blob;
		} create;
		struct {
			const char *name;
		} delete;
		struct {
			const char *name;
		} stat;
	} op;
};

static void file_free(struct spdk_file *file);
static void fs_io_device_unregister(struct spdk_filesystem *fs);
static void fs_free_io_channels(struct spdk_filesystem *fs);

void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}

static int _blobfs_cache_pool_reclaim(void *arg);

static bool
blobfs_cache_pool_need_reclaim(void)
{
	size_t count;

	count = spdk_mempool_count(g_cache_pool);
	/* Use an aggressive policy here because requests from db_bench arrive in
	 * batches: start reclaiming as soon as the number of available cache
	 * buffers drops below 1/5 of the total.
	 */
	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
		return false;
	}

	return true;
}

static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		SPDK_ERRLOG("Create mempool failed, you may "
			    "increase the memory and try again\n");
		assert(false);
	}
	TAILQ_INIT(&g_caches);
	pthread_spin_init(&g_caches_lock, 0);

	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}

static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}

static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}

static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}

static uint64_t
__file_get_blob_size(struct spdk_file *file)
{
	uint64_t cluster_sz;

	cluster_sz = file->fs->bs_opts.cluster_sz;
	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
}

struct spdk_fs_request {
	struct spdk_fs_cb_args args;
	TAILQ_ENTRY(spdk_fs_request) link;
	struct spdk_fs_channel *channel;
};

struct spdk_fs_channel {
	struct spdk_fs_request *req_mem;
	TAILQ_HEAD(, spdk_fs_request) reqs;
	sem_t sem;
	struct spdk_filesystem *fs;
	struct spdk_io_channel *bs_channel;
	fs_send_request_fn send_request;
	bool sync;
	uint32_t outstanding_reqs;
	pthread_spinlock_t lock;
};

/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over. */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel ch;
};

static struct spdk_fs_request *
alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
{
	struct spdk_fs_request *req;
	struct iovec *iovs = NULL;

	if (iovcnt > 1) {
		iovs = calloc(iovcnt, sizeof(struct iovec));
		if (!iovs) {
			return NULL;
		}
	}

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	req = TAILQ_FIRST(&channel->reqs);
	if (req) {
		channel->outstanding_reqs++;
		TAILQ_REMOVE(&channel->reqs, req, link);
	}

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}

	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
		free(iovs);
		return NULL;
	}
	memset(req, 0, sizeof(*req));
	req->channel = channel;
	if (iovcnt > 1) {
		req->args.iovs = iovs;
	} else {
		req->args.iovs = &req->args.iov;
	}
	req->args.iovcnt = iovcnt;

	return req;
}

static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}

static void
free_fs_request(struct spdk_fs_request *req)
{
	struct spdk_fs_channel *channel = req->channel;

	if (req->args.iovcnt > 1) {
		free(req->args.iovs);
	}

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
	channel->outstanding_reqs--;

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}
}

static int
fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
		  uint32_t max_ops)
{
	uint32_t i;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
	if (!channel->req_mem) {
		return -1;
	}

	channel->outstanding_reqs = 0;
	TAILQ_INIT(&channel->reqs);
	sem_init(&channel->sem, 0, 0);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->fs = fs;

	return 0;
}

static int
fs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);

	return fs_channel_create(fs, channel, fs->md_target.max_ops);
}

static int
fs_sync_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);

	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
}

static int
fs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);

	return fs_channel_create(fs, channel, fs->io_target.max_ops);
}

static void
fs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_fs_channel *channel = ctx_buf;

	if (channel->outstanding_reqs > 0) {
		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
			    channel->outstanding_reqs);
	}

	free(channel->req_mem);
	if (channel->bs_channel != NULL) {
		spdk_bs_free_io_channel(channel->bs_channel);
	}
}
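/*
 * Request dispatch: the md and sync channels set up in common_fs_bs_init()
 * use __send_request_direct(), i.e. the request function runs inline on the
 * calling thread.  The synchronous wrappers instead go through
 * fs->send_request (supplied by the application in spdk_fs_init() or
 * spdk_fs_load()) so the work executes on the thread that owns the metadata
 * channel while the caller blocks on a semaphore.
 */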
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}

static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	initialize_global_cache();
}

static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}

static void
fs_conf_parse(void)
{
	struct spdk_conf_section *sp;

	sp = spdk_conf_find_section(NULL, "Blobfs");
	if (sp == NULL) {
		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
		return;
	}

	g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift");
	if (g_fs_cache_buffer_shift <= 0) {
		g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT;
	}
}

static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_md");
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_sync");
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy,
				sizeof(struct spdk_fs_channel), "blobfs_io");

	return fs;
}

static void
__wake_caller(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	args->rc = fserrno;
	sem_post(args->sem);
}
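/*
 * Filesystem initialization: allocate the filesystem and its io_devices,
 * then initialize a fresh blobstore on the bs_dev, stamping it with the
 * "BLOBFS" bstype signature.  A typical call sequence from an application
 * looks roughly like the following (illustrative only, the names are
 * placeholders):
 *
 *	struct spdk_blobfs_opts opts;
 *
 *	spdk_fs_opts_init(&opts);
 *	spdk_fs_init(bs_dev, &opts, send_request_fn, init_done_cb, cb_arg);
 */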
void
spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt,
	     fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_bs_opts opts = {};

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	fs_conf_parse();

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_opts_init(&opts);
	snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE);
	if (opt) {
		opts.cluster_sz = opt->cluster_sz;
	}
	spdk_bs_init(dev, &opts, init_cb, req);
}

static struct spdk_file *
file_alloc(struct spdk_filesystem *fs)
{
	struct spdk_file *file;

	file = calloc(1, sizeof(*file));
	if (file == NULL) {
		return NULL;
	}

	file->tree = calloc(1, sizeof(*file->tree));
	if (file->tree == NULL) {
		free(file);
		return NULL;
	}

	file->fs = fs;
	TAILQ_INIT(&file->open_requests);
	TAILQ_INIT(&file->sync_requests);
	pthread_spin_init(&file->lock, 0);
	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
	file->priority = SPDK_FILE_PRIORITY_LOW;
	return file;
}

static void fs_load_done(void *ctx, int bserrno);

static int
_handle_deleted_files(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) {
		struct spdk_deleted_file *deleted_file;

		deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files);
		TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq);
		spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req);
		free(deleted_file);
		return 0;
	}

	return 1;
}

static void
fs_load_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	/* The filesystem has been loaded.  Now check if there are any files that
	 * were marked for deletion before last unload.  Do not complete the
	 * fs_load callback until all of them have been deleted on disk.
	 */
	if (_handle_deleted_files(req) == 0) {
		/* We found a file that had been marked for deletion but not actually
		 * deleted yet.  This function will get called again once the delete
		 * operation is completed.
		 */
		return;
	}

	args->fn.fs_op_with_handle(args->arg, fs, 0);
	free_fs_request(req);

}

static void
_file_build_trace_arg_name(struct spdk_file *f)
{
	f->trace_arg_name = 0;
	memcpy(&f->trace_arg_name, f->name,
	       spdk_min(sizeof(f->trace_arg_name), strlen(f->name)));
}

static void
iter_cb(void *ctx, struct spdk_blob *blob, int rc)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	uint64_t *length;
	const char *name;
	uint32_t *is_deleted;
	size_t value_len;

	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	assert(value_len == 8);

	/* The file may have been marked as deleted without being closed (e.g. the
	 * app crashed before the close), so delete it now.
	 */
	rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len);
	if (rc < 0) {
		struct spdk_file *f;

		f = file_alloc(fs);
		if (f == NULL) {
			SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n");
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}

		f->name = strdup(name);
		_file_build_trace_arg_name(f);
		f->blobid = spdk_blob_get_id(blob);
		f->length = *length;
		f->length_flushed = *length;
		f->length_xattr = *length;
		f->append_pos = *length;
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length);
	} else {
		struct spdk_deleted_file *deleted_file;

		deleted_file = calloc(1, sizeof(*deleted_file));
		if (deleted_file == NULL) {
			args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
			free_fs_request(req);
			return;
		}
		deleted_file->id = spdk_blob_get_id(blob);
		TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq);
	}
}

static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(SPDK_LOG_BLOBFS, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}

static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}

static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}
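/*
 * Filesystem load: spdk_bs_load() iterates every blob and calls iter_cb(),
 * which rebuilds the in-memory file list from the "name" and "length"
 * xattrs.  Blobs still carrying the "is_deleted" xattr are queued on
 * fs_load.deleted_files and deleted one at a time in fs_load_done() before
 * the load callback is invoked.
 */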
void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;
	struct spdk_bs_opts bs_opts;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	fs_conf_parse();

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;
	TAILQ_INIT(&args->op.fs_load.deleted_files);
	spdk_bs_opts_init(&bs_opts);
	bs_opts.iter_cb_fn = iter_cb;
	bs_opts.iter_cb_arg = req;
	spdk_bs_load(dev, &bs_opts, load_cb, req);
}

static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *file, *tmp;

	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
	}

	free_global_cache();

	args->fn.fs_op(args->arg, bserrno);
	free(req);

	fs_io_device_unregister(fs);
}

void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 * allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	fs_free_io_channels(fs);
	spdk_bs_unload(fs->bs, unload_cb, req);
}

static struct spdk_file *
fs_find_file(struct spdk_filesystem *fs, const char *name)
{
	struct spdk_file *file;

	TAILQ_FOREACH(file, &fs->files, tailq) {
		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
			return file;
		}
	}

	return NULL;
}

void
spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
			spdk_file_stat_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file_stat stat;
	struct spdk_file *f = NULL;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f != NULL) {
		stat.blobid = f->blobid;
		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
		cb_fn(cb_arg, &stat, 0);
		return;
	}

	cb_fn(cb_arg, NULL, -ENOENT);
}
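/*
 * The synchronous wrappers below share a common pattern: allocate a request
 * from the caller's channel, forward it via send_request so the async
 * variant runs on the metadata thread, then block on the channel semaphore
 * until the completion callback posts it (see __wake_caller() and
 * __copy_stat()).
 */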
static void
__copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	if (fserrno == 0) {
		memcpy(args->arg, stat, sizeof(*stat));
	}
	sem_post(args->sem);
}

static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}

int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
		return -ENOMEM;
	}

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}

static void
fs_create_blob_close_cb(void *ctx, int bserrno)
{
	int rc;
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	rc = args->rc ? args->rc : bserrno;
	args->fn.file_op(args->arg, rc);
	free_fs_request(req);
}

static void
fs_create_blob_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;
	struct spdk_blob *blob = args->op.create.blob;
	uint64_t length = 0;

	args->rc = bserrno;
	if (bserrno) {
		spdk_blob_close(blob, fs_create_blob_close_cb, args);
		return;
	}

	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));

	spdk_blob_close(blob, fs_create_blob_close_cb, args);
}

static void
fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	args->op.create.blob = blob;
	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
}

static void
fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	f->blobid = blobid;
	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
}
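/*
 * File creation allocates a blob, resizes it to one cluster, records the
 * file name and an initial length of 0 as blob xattrs, and then closes the
 * blob again (see the fs_create_blob_* callbacks above).  Opening a
 * non-existent file with SPDK_BLOBFS_OPEN_CREATE goes through the same path
 * before the open continues.
 */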
void
spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *file;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	file = fs_find_file(fs, name);
	if (file != NULL) {
		cb_fn(cb_arg, -EEXIST);
		return;
	}

	file = file_alloc(fs);
	if (file == NULL) {
		SPDK_ERRLOG("Cannot allocate new file for creation\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	file->name = strdup(name);
	_file_build_trace_arg_name(file);
	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
}

static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
}

static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}

int
spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.create.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}

static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 * is now created so look it up in the file list for this
		 * filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 * open request.  When that open completes, it will invoke the
		 * open callback for this request.
		 */
	}
}

void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	__wake_caller(args, bserrno);
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
}

static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

int
spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, uint32_t flags, struct spdk_file **file)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.open.name = name;
	args->op.open.flags = flags;
	args->sem = &channel->sem;
	fs->send_request(__fs_open_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	if (rc == 0) {
		*file = args->file;
	} else {
		*file = NULL;
	}
	free_fs_request(req);

	return rc;
}
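/*
 * Rename is a metadata-only operation: the in-memory name is replaced and
 * the blob's "name" xattr is rewritten.  If a file with the new name already
 * exists it is deleted first, and fs_rename_delete_done() resumes the rename
 * once that delete completes.
 */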
static void
fs_rename_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.fs_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	const char *new_name = args->op.rename.new_name;

	spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
	spdk_blob_close(blob, fs_rename_blob_close_cb, req);
}

static void
_fs_md_rename_file(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f;

	f = fs_find_file(args->fs, args->op.rename.old_name);
	if (f == NULL) {
		args->fn.fs_op(args->arg, -ENOENT);
		free_fs_request(req);
		return;
	}

	free(f->name);
	f->name = strdup(args->op.rename.new_name);
	_file_build_trace_arg_name(f);
	args->file = f;
	spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
}

static void
fs_rename_delete_done(void *arg, int fserrno)
{
	_fs_md_rename_file(arg);
}

void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name);
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name,
			    new_name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file.  So delete the existing file, then
	 * do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
}

static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name);
		return -ENOMEM;
	}

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}
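/*
 * Deletion is deferred for files that are still open: the blob is only
 * tagged with an "is_deleted" xattr, and the blob itself is deleted once the
 * last reference is closed.  Files with no open references are unlinked from
 * the list and their blob is deleted immediately.
 */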
void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL) {
		SPDK_ERRLOG("Cannot find the file=%s to be deleted\n", name);
		cb_fn(cb_arg, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate the req for the file=%s to be deleted\n", name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	if (f->ref_count > 0) {
		/* If the ref count is > 0, mark the file as deleted and delete it
		 * when it is closed.
		 */
		f->is_deleted = true;
		spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool));
		spdk_blob_sync_md(f->blob, blob_delete_cb, req);
		return;
	}

	blobid = f->blobid;
	TAILQ_REMOVE(&fs->files, f, tailq);

	file_free(f);

	spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req);
}

static uint64_t
fs_name_to_uint64(const char *name)
{
	uint64_t result = 0;
	memcpy(&result, name, spdk_min(sizeof(result), strlen(name)));
	return result;
}

static void
__fs_delete_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
	__wake_caller(args, fserrno);
}

static void
__fs_delete_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name));
	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
}

int
spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		    const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.delete.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_delete_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

spdk_fs_iter
spdk_fs_iter_first(struct spdk_filesystem *fs)
{
	struct spdk_file *f;

	f = TAILQ_FIRST(&fs->files);
	return f;
}

spdk_fs_iter
spdk_fs_iter_next(spdk_fs_iter iter)
{
	struct spdk_file *f = iter;

	if (f == NULL) {
		return NULL;
	}

	f = TAILQ_NEXT(f, tailq);
	return f;
}

const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}

uint64_t
spdk_file_get_length(struct spdk_file *file)
{
	uint64_t length;

	assert(file != NULL);

	length = file->append_pos >= file->length ? file->append_pos : file->length;
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length);
	return length;
}
static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_truncate_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t *length = &args->op.truncate.length;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length));

	file->length = *length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req);
}

static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.truncate.length = length;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req);
}

static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args);
}

int
spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return -ENOMEM;
	}

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __wake_caller;
	args->sem = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}
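/*
 * Data path helpers: __readvwritev() pins a DMA-able bounce buffer covering
 * the LBA-aligned extent of the request.  Reads land in the bounce buffer
 * and are copied out to the caller's iovecs; writes that are not LBA-aligned
 * are handled as read-modify-write: the covering blocks are read first, the
 * user data is merged in, and the whole buffer is written back.
 */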
static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
_copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(buf, iovs[i].iov_base, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

static void
_copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len)
{
	int i;
	size_t len;

	for (i = 0; i < iovcnt; i++) {
		len = spdk_min(iovs[i].iov_len, buf_len);
		memcpy(iovs[i].iov_base, buf, len);
		buf += len;
		assert(buf_len >= len);
		buf_len -= len;
	}
}

static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	void *buf;

	assert(req != NULL);
	buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)));
	if (args->op.rw.is_read) {
		_copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length);
		__rw_done(req, 0);
	} else {
		_copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	}
}

static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (fserrno) {
		__rw_done(req, fserrno);
		return;
	}
	spdk_blob_io_read(args->file->blob, args->op.rw.channel,
			  args->op.rw.pin_buf,
			  args->op.rw.start_lba, args->op.rw.num_lba,
			  __read_done, req);
}

static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba)
{
	uint64_t end_lba;

	*lba_size = spdk_bs_get_io_unit_size(file->fs->bs);
	*start_lba = offset / *lba_size;
	end_lba = (offset + length - 1) / *lba_size;
	*num_lba = (end_lba - *start_lba + 1);
}

static bool
__is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length)
{
	uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs);

	if ((offset % lba_size == 0) && (length % lba_size == 0)) {
		return true;
	}

	return false;
}

static void
_fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt)
{
	uint32_t i;

	for (i = 0; i < iovcnt; i++) {
		req->args.iovs[i].iov_base = iovs[i].iov_base;
		req->args.iovs[i].iov_len = iovs[i].iov_len;
	}
}

static void
__readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel,
	      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
	      spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_lba, num_lba, pin_buf_length;
	uint32_t lba_size;

	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request_with_iov(channel, iovcnt);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	_fs_request_setup_iovs(req, iovs, iovcnt);
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.blocklen = lba_size;

	pin_buf_length = num_lba * lba_size;
	args->op.rw.length = pin_buf_length;
	args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL,
					  SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA);
	if (args->op.rw.pin_buf == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n",
			      file->name, offset, length);
		free_fs_request(req);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args->op.rw.start_lba = start_lba;
	args->op.rw.num_lba = num_lba;

	if (!is_read && file->length < offset + length) {
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else if (!is_read && __is_lba_aligned(file, offset, length)) {
		_copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt);
		spdk_blob_io_write(args->file->blob, args->op.rw.channel,
				   args->op.rw.pin_buf,
				   args->op.rw.start_lba, args->op.rw.num_lba,
				   __rw_done, req);
	} else {
		__do_blob_read(req, 0);
	}
}
static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct iovec iov;

	iov.iov_base = payload;
	iov.iov_len = (size_t)length;

	__readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read);
}

void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel,
		       struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		       spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

void
spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);

	__readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1);
}
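/*
 * Channel helpers: spdk_fs_alloc_io_channel() returns an io_channel bound to
 * the blobstore for asynchronous file I/O from an SPDK thread, while
 * spdk_fs_alloc_thread_ctx() builds a private, semaphore-based channel that
 * lets a plain pthread use the synchronous file API.
 */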
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;
	pthread_spin_init(&ctx->ch.lock, 0);

	return ctx;
}


void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}

int
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	/* setting g_fs_cache_size is only permitted if cache pool
	 * is already freed or hasn't been initialized
	 */
	if (g_cache_pool != NULL) {
		return -EPERM;
	}

	g_fs_cache_size = size_in_mb * 1024 * 1024;

	return 0;
}

uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}
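/*
 * Cache reclaim runs on the dedicated cache_pool_mgmt thread.  When the
 * mempool falls below the reclaim watermark, buffers are freed from files in
 * rough priority order: files not open for writing with low priority first,
 * then any file not open for writing, and finally any cached file at all.
 */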
static void __file_flush(void *ctx);

/* Try to free some cache buffers of this file.  This function must
 * be called while holding g_caches_lock.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* This function may be called from any thread, and the file lock
	 * may currently be held by another thread, so only try-lock it here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not fully freed, put it at the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}

static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return 0;
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				pthread_spin_unlock(&g_caches_lock);
				return 1;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				pthread_spin_unlock(&g_caches_lock);
				return 1;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}
	pthread_spin_unlock(&g_caches_lock);

	return 1;
}

static void
_add_file_to_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	pthread_spin_lock(&g_caches_lock);
	TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	pthread_spin_unlock(&g_caches_lock);
}

static void
_remove_file_from_cache_pool(void *ctx)
{
	struct spdk_file *file = ctx;

	pthread_spin_lock(&g_caches_lock);
	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	pthread_spin_unlock(&g_caches_lock);
}

static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;
	bool need_update = false;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	if (file->tree->present_mask == 0) {
		need_update = true;
	}
	file->tree = tree_insert_buffer(file->tree, buf);

	if (need_update) {
		spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file);
	}

	return buf;
}
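/*
 * cache_append_buffer() below extends a file's write cache by one
 * CACHE_BUFFER_SIZE buffer positioned at file->append_pos.  It is only
 * called when the previous buffer (file->last) is either absent or
 * completely filled, which is what the asserts encode.
 */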
static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);

static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->trace_arg_name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	sync_args->fn.file_op(sync_args->arg, bserrno);

	free_fs_request(sync_req);
	__check_sync_reqs(file);
}

static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->trace_arg_name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}
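/*
 * Flush completion path: __file_flush_done() advances length_flushed and,
 * via __check_sync_reqs(), persists the new length as a "length" xattr on
 * the blob for any sync request whose offset is now covered.  That xattr
 * update is what spdk_file_sync() callers ultimately wait on.
 */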
static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 * blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	__file_flush(req);
}
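/*
 * __file_flush() writes the next cache buffer (starting at length_flushed)
 * to the blob on the sync channel.  A partially filled buffer is only
 * flushed when a sync request is pending; otherwise it is left to fill up,
 * so sequential appends are written out in full CACHE_BUFFER_SIZE units.
 */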
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 * progress, or the next buffer is partially filled but there's no
		 * outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 * more data after that is completed, or a partial buffer will get flushed
		 * when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 * file was later appended, we will write the data directly
			 * to disk and bypass cache.  So just update length_flushed
			 * here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 * outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 * outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}

static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

static void
__rw_from_file_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;

	__wake_caller(&req->args, bserrno);
	free_fs_request(req);
}

static void
__rw_from_file(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				     __rw_from_file_done, req);
	} else {
		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				      __rw_from_file_done, req);
	}
}

static int
__send_rw_from_file(struct spdk_file *file, void *payload,
		    uint64_t offset, uint64_t length, bool is_read,
		    struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request_with_iov(channel, 1);
	if (req == NULL) {
		sem_post(&channel->sem);
		return -ENOMEM;
	}

	args = &req->args;
	args->file = file;
	args->sem = &channel->sem;
	args->iovs[0].iov_base = payload;
	args->iovs[0].iov_len = (size_t)length;
	args->op.rw.offset = offset;
	args->op.rw.is_read = is_read;
	file->fs->send_request(__rw_from_file, req);
	return 0;
}
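/*
 * Illustrative sketch (not part of the library): the synchronous
 * spdk_file_write()/spdk_file_read()/spdk_file_sync() below are meant to be
 * called from a non-SPDK thread through a per-thread context, and they block
 * the calling thread.  Writes are append-only (offset must equal the current
 * append position).  Names other than the spdk_* calls are hypothetical.
 *
 *	struct spdk_fs_thread_ctx *ctx = spdk_fs_alloc_thread_ctx(fs);
 *
 *	rc = spdk_file_write(file, ctx, buf, 0, buf_len);   // append from offset 0 of a new file
 *	rc = spdk_file_sync(file, ctx);                     // persist data and length metadata
 *	spdk_fs_free_thread_ctx(ctx);
 */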
int
spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *flush_req;
	uint64_t rem_length, copy, blob_size, cluster_sz;
	uint32_t cache_buffers_filled = 0;
	uint8_t *cur_payload;
	struct cache_buffer *last;

	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);

	if (length == 0) {
		return 0;
	}

	if (offset != file->append_pos) {
		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
		return -EINVAL;
	}

	pthread_spin_lock(&file->lock);
	file->open_for_writing = true;

	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
		cache_append_buffer(file);
	}

	if (file->last == NULL) {
		int rc;

		file->append_pos += length;
		pthread_spin_unlock(&file->lock);
		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
		sem_wait(&channel->sem);
		return rc;
	}

	blob_size = __file_get_blob_size(file);

	if ((offset + length) > blob_size) {
		struct spdk_fs_cb_args extend_args = {};

		cluster_sz = file->fs->bs_opts.cluster_sz;
		extend_args.sem = &channel->sem;
		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
		extend_args.file = file;
		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
		pthread_spin_unlock(&file->lock);
		file->fs->send_request(__file_extend_blob, &extend_args);
		sem_wait(&channel->sem);
		if (extend_args.rc) {
			return extend_args.rc;
		}
	}

	flush_req = alloc_fs_request(channel);
	if (flush_req == NULL) {
		pthread_spin_unlock(&file->lock);
		return -ENOMEM;
	}

	last = file->last;
	rem_length = length;
	cur_payload = payload;
	while (rem_length > 0) {
		copy = last->buf_size - last->bytes_filled;
		if (copy > rem_length) {
			copy = rem_length;
		}
		BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy);
		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
		file->append_pos += copy;
		if (file->length < file->append_pos) {
			file->length = file->append_pos;
		}
		cur_payload += copy;
		last->bytes_filled += copy;
		rem_length -= copy;
		if (last->bytes_filled == last->buf_size) {
			cache_buffers_filled++;
			last = cache_append_buffer(file);
			if (last == NULL) {
				BLOBFS_TRACE(file, "nomem\n");
				free_fs_request(flush_req);
				pthread_spin_unlock(&file->lock);
				return -ENOMEM;
			}
		}
	}

	pthread_spin_unlock(&file->lock);

	if (cache_buffers_filled == 0) {
		free_fs_request(flush_req);
		return 0;
	}

	flush_req->args.file = file;
	file->fs->send_request(__file_flush, flush_req);
	return 0;
}

static void
__readahead_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	free_fs_request(req);
}

static void
__readahead(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	offset = args->op.readahead.offset;
	length = args->op.readahead.length;
	assert(length > 0);

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			  args->op.readahead.cache_buffer->buf,
			  start_lba, num_lba, __readahead_done, req);
}

static uint64_t
__next_cache_buffer_offset(uint64_t offset)
{
	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
}

static void
check_readahead(struct spdk_file *file, uint64_t offset,
		struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return;
	}
	args = &req->args;

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free_fs_request(req);
		return;
	}

	args->op.readahead.cache_buffer->in_progress = true;
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, req);
}
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	int rc = 0;

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	final_length = 0;
	final_offset = offset + length;
	while (offset < final_offset) {
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			pthread_spin_unlock(&file->lock);
			rc = __send_rw_from_file(file, payload, offset, length, true, channel);
			pthread_spin_lock(&file->lock);
			if (rc == 0) {
				sub_reads++;
			}
		} else {
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (rc == 0) {
			final_length += length;
		} else {
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (rc == 0) {
		return final_length;
	} else {
		return rc;
	}
}

static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_xattr) {
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}

int
spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_cb_args args = {};

	args.sem = &channel->sem;
	_file_sync(file, channel, __wake_caller, &args);
	sem_wait(&channel->sem);

	return args.rc;
}

void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}

void
spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
{
	BLOBFS_TRACE(file, "priority=%u\n", priority);
	file->priority = priority;
}

/*
 * Close routines
 */

static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}
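/*
 * Closing drops one reference; only the last close releases the underlying
 * blob, and a file marked is_deleted is actually deleted once that final
 * close completes.  spdk_file_close() syncs the file first so the length
 * xattr is up to date before the blob is closed.
 */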
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}

static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}

void
spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
}

static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}

int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}

int
spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
{
	if (size < sizeof(spdk_blob_id)) {
		return -EINVAL;
	}

	memcpy(id, &file->blobid, sizeof(spdk_blob_id));

	return sizeof(spdk_blob_id);
}

static void
_file_free(void *ctx)
{
	struct spdk_file *file = ctx;

	pthread_spin_lock(&g_caches_lock);
	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	pthread_spin_unlock(&g_caches_lock);

	free(file->name);
	free(file->tree);
	free(file);
}
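/*
 * file_free() frees the file immediately when it has no cached buffers;
 * otherwise it releases the buffers and defers the final free to the cache
 * pool thread (_file_free above), which also unlinks the file from g_caches
 * under g_caches_lock.
 */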
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}

SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)
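/*
 * Note: the "blobfs" and "blobfs_rw" log components registered above gate the
 * BLOBFS_TRACE/BLOBFS_TRACE_RW output.  SPDK_DEBUGLOG() is compiled in only
 * for debug builds (./configure --enable-debug); the flags can then be turned
 * on at runtime, e.g. via the standard -L option of SPDK applications.
 */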