1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "tree.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 #include "spdk/trace.h" 47 48 #define BLOBFS_TRACE(file, str, args...) \ 49 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 50 51 #define BLOBFS_TRACE_RW(file, str, args...) \ 52 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 53 54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 56 57 #define SPDK_BLOBFS_SIGNATURE "BLOBFS" 58 59 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 60 static struct spdk_mempool *g_cache_pool; 61 static TAILQ_HEAD(, spdk_file) g_caches; 62 static struct spdk_poller *g_cache_pool_mgmt_poller; 63 static struct spdk_thread *g_cache_pool_thread; 64 #define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL 65 static int g_fs_count = 0; 66 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 67 68 #define TRACE_GROUP_BLOBFS 0x7 69 #define TRACE_BLOBFS_XATTR_START SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0) 70 #define TRACE_BLOBFS_XATTR_END SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1) 71 #define TRACE_BLOBFS_OPEN SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2) 72 #define TRACE_BLOBFS_CLOSE SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3) 73 #define TRACE_BLOBFS_DELETE_START SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4) 74 #define TRACE_BLOBFS_DELETE_DONE SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5) 75 76 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS) 77 { 78 spdk_trace_register_description("BLOBFS_XATTR_START", 79 TRACE_BLOBFS_XATTR_START, 80 OWNER_NONE, OBJECT_NONE, 0, 81 SPDK_TRACE_ARG_TYPE_STR, 82 "file: "); 83 spdk_trace_register_description("BLOBFS_XATTR_END", 84 TRACE_BLOBFS_XATTR_END, 85 OWNER_NONE, OBJECT_NONE, 0, 86 SPDK_TRACE_ARG_TYPE_STR, 87 "file: "); 88 spdk_trace_register_description("BLOBFS_OPEN", 89 TRACE_BLOBFS_OPEN, 90 OWNER_NONE, OBJECT_NONE, 0, 91 SPDK_TRACE_ARG_TYPE_STR, 92 "file: "); 93 spdk_trace_register_description("BLOBFS_CLOSE", 94 TRACE_BLOBFS_CLOSE, 95 OWNER_NONE, OBJECT_NONE, 0, 96 SPDK_TRACE_ARG_TYPE_STR, 97 "file: "); 98 spdk_trace_register_description("BLOBFS_DELETE_START", 99 TRACE_BLOBFS_DELETE_START, 100 OWNER_NONE, OBJECT_NONE, 0, 101 SPDK_TRACE_ARG_TYPE_STR, 102 "file: "); 103 spdk_trace_register_description("BLOBFS_DELETE_DONE", 104 TRACE_BLOBFS_DELETE_DONE, 105 OWNER_NONE, OBJECT_NONE, 0, 106 SPDK_TRACE_ARG_TYPE_STR, 107 "file: "); 108 } 109 110 void 111 cache_buffer_free(struct cache_buffer *cache_buffer) 112 { 113 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 114 free(cache_buffer); 115 } 116 117 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 118 119 struct spdk_file { 120 struct spdk_filesystem *fs; 121 struct spdk_blob *blob; 122 char *name; 123 uint64_t trace_arg_name; 124 uint64_t length; 125 bool is_deleted; 126 bool open_for_writing; 127 uint64_t length_flushed; 128 uint64_t length_xattr; 129 uint64_t append_pos; 130 uint64_t seq_byte_count; 131 uint64_t next_seq_offset; 132 uint32_t priority; 133 TAILQ_ENTRY(spdk_file) tailq; 134 spdk_blob_id blobid; 135 uint32_t ref_count; 136 pthread_spinlock_t lock; 137 struct cache_buffer *last; 138 struct cache_tree *tree; 139 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 140 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 141 TAILQ_ENTRY(spdk_file) cache_tailq; 142 }; 143 144 struct spdk_deleted_file { 145 spdk_blob_id id; 146 TAILQ_ENTRY(spdk_deleted_file) tailq; 147 }; 148 149 struct spdk_filesystem { 150 struct spdk_blob_store *bs; 151 TAILQ_HEAD(, spdk_file) files; 152 struct spdk_bs_opts bs_opts; 153 struct spdk_bs_dev *bdev; 154 fs_send_request_fn send_request; 155 156 struct { 157 uint32_t max_ops; 158 struct spdk_io_channel *sync_io_channel; 159 struct spdk_fs_channel *sync_fs_channel; 160 } sync_target; 161 162 struct { 163 uint32_t max_ops; 164 struct spdk_io_channel *md_io_channel; 165 struct spdk_fs_channel *md_fs_channel; 166 } md_target; 167 168 struct { 169 uint32_t max_ops; 170 } io_target; 171 }; 172 173 struct spdk_fs_cb_args { 174 union { 175 spdk_fs_op_with_handle_complete fs_op_with_handle; 176 spdk_fs_op_complete fs_op; 177 spdk_file_op_with_handle_complete file_op_with_handle; 178 spdk_file_op_complete file_op; 179 spdk_file_stat_op_complete stat_op; 180 } fn; 181 void *arg; 182 sem_t *sem; 183 struct spdk_filesystem *fs; 184 struct spdk_file *file; 185 int rc; 186 struct iovec *iovs; 187 uint32_t iovcnt; 188 struct iovec iov; 189 union { 190 struct { 191 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 192 } fs_load; 193 struct { 194 uint64_t length; 195 } truncate; 196 struct { 197 struct spdk_io_channel *channel; 198 void *pin_buf; 199 int is_read; 200 off_t offset; 201 size_t length; 202 uint64_t start_lba; 203 uint64_t num_lba; 204 uint32_t blocklen; 205 } rw; 206 struct { 207 const char *old_name; 208 const char *new_name; 209 } rename; 210 struct { 211 struct cache_buffer *cache_buffer; 212 uint64_t length; 213 } flush; 214 struct { 215 struct cache_buffer *cache_buffer; 216 uint64_t length; 217 uint64_t offset; 218 } readahead; 219 struct { 220 /* offset of the file when the sync request was made */ 221 uint64_t offset; 222 TAILQ_ENTRY(spdk_fs_request) tailq; 223 bool xattr_in_progress; 224 /* length written to the xattr for this file - this should 225 * always be the same as the offset if only one thread is 226 * writing to the file, but could differ if multiple threads 227 * are appending 228 */ 229 uint64_t length; 230 } sync; 231 struct { 232 uint32_t num_clusters; 233 } resize; 234 struct { 235 const char *name; 236 uint32_t flags; 237 TAILQ_ENTRY(spdk_fs_request) tailq; 238 } open; 239 struct { 240 const char *name; 241 struct spdk_blob *blob; 242 } create; 243 struct { 244 const char *name; 245 } delete; 246 struct { 247 const char *name; 248 } stat; 249 } op; 250 }; 251 252 static void file_free(struct spdk_file *file); 253 static void fs_io_device_unregister(struct spdk_filesystem *fs); 254 static void fs_free_io_channels(struct spdk_filesystem *fs); 255 256 void 257 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 258 { 259 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 260 } 261 262 static int _blobfs_cache_pool_reclaim(void *arg); 263 264 static bool 265 blobfs_cache_pool_need_reclaim(void) 266 { 267 size_t count; 268 269 count = spdk_mempool_count(g_cache_pool); 270 /* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller 271 * when the number of available cache buffer is less than 1/5 of total buffers. 272 */ 273 if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) { 274 return false; 275 } 276 277 return true; 278 } 279 280 static void 281 __start_cache_pool_mgmt(void *ctx) 282 { 283 assert(g_cache_pool == NULL); 284 285 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 286 g_fs_cache_size / CACHE_BUFFER_SIZE, 287 CACHE_BUFFER_SIZE, 288 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 289 SPDK_ENV_SOCKET_ID_ANY); 290 if (!g_cache_pool) { 291 SPDK_ERRLOG("Create mempool failed, you may " 292 "increase the memory and try again\n"); 293 assert(false); 294 } 295 TAILQ_INIT(&g_caches); 296 297 assert(g_cache_pool_mgmt_poller == NULL); 298 g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL, 299 BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); 300 } 301 302 static void 303 __stop_cache_pool_mgmt(void *ctx) 304 { 305 spdk_poller_unregister(&g_cache_pool_mgmt_poller); 306 307 assert(g_cache_pool != NULL); 308 assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE); 309 spdk_mempool_free(g_cache_pool); 310 g_cache_pool = NULL; 311 312 spdk_thread_exit(g_cache_pool_thread); 313 } 314 315 static void 316 initialize_global_cache(void) 317 { 318 pthread_mutex_lock(&g_cache_init_lock); 319 if (g_fs_count == 0) { 320 g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL); 321 assert(g_cache_pool_thread != NULL); 322 spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL); 323 } 324 g_fs_count++; 325 pthread_mutex_unlock(&g_cache_init_lock); 326 } 327 328 static void 329 free_global_cache(void) 330 { 331 pthread_mutex_lock(&g_cache_init_lock); 332 g_fs_count--; 333 if (g_fs_count == 0) { 334 spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL); 335 } 336 pthread_mutex_unlock(&g_cache_init_lock); 337 } 338 339 static uint64_t 340 __file_get_blob_size(struct spdk_file *file) 341 { 342 uint64_t cluster_sz; 343 344 cluster_sz = file->fs->bs_opts.cluster_sz; 345 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 346 } 347 348 struct spdk_fs_request { 349 struct spdk_fs_cb_args args; 350 TAILQ_ENTRY(spdk_fs_request) link; 351 struct spdk_fs_channel *channel; 352 }; 353 354 struct spdk_fs_channel { 355 struct spdk_fs_request *req_mem; 356 TAILQ_HEAD(, spdk_fs_request) reqs; 357 sem_t sem; 358 struct spdk_filesystem *fs; 359 struct spdk_io_channel *bs_channel; 360 fs_send_request_fn send_request; 361 bool sync; 362 uint32_t outstanding_reqs; 363 pthread_spinlock_t lock; 364 }; 365 366 /* For now, this is effectively an alias. But eventually we'll shift 367 * some data members over. */ 368 struct spdk_fs_thread_ctx { 369 struct spdk_fs_channel ch; 370 }; 371 372 static struct spdk_fs_request * 373 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 374 { 375 struct spdk_fs_request *req; 376 struct iovec *iovs = NULL; 377 378 if (iovcnt > 1) { 379 iovs = calloc(iovcnt, sizeof(struct iovec)); 380 if (!iovs) { 381 return NULL; 382 } 383 } 384 385 if (channel->sync) { 386 pthread_spin_lock(&channel->lock); 387 } 388 389 req = TAILQ_FIRST(&channel->reqs); 390 if (req) { 391 channel->outstanding_reqs++; 392 TAILQ_REMOVE(&channel->reqs, req, link); 393 } 394 395 if (channel->sync) { 396 pthread_spin_unlock(&channel->lock); 397 } 398 399 if (req == NULL) { 400 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 401 free(iovs); 402 return NULL; 403 } 404 memset(req, 0, sizeof(*req)); 405 req->channel = channel; 406 if (iovcnt > 1) { 407 req->args.iovs = iovs; 408 } else { 409 req->args.iovs = &req->args.iov; 410 } 411 req->args.iovcnt = iovcnt; 412 413 return req; 414 } 415 416 static struct spdk_fs_request * 417 alloc_fs_request(struct spdk_fs_channel *channel) 418 { 419 return alloc_fs_request_with_iov(channel, 0); 420 } 421 422 static void 423 free_fs_request(struct spdk_fs_request *req) 424 { 425 struct spdk_fs_channel *channel = req->channel; 426 427 if (req->args.iovcnt > 1) { 428 free(req->args.iovs); 429 } 430 431 if (channel->sync) { 432 pthread_spin_lock(&channel->lock); 433 } 434 435 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 436 channel->outstanding_reqs--; 437 438 if (channel->sync) { 439 pthread_spin_unlock(&channel->lock); 440 } 441 } 442 443 static int 444 fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 445 uint32_t max_ops) 446 { 447 uint32_t i; 448 449 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 450 if (!channel->req_mem) { 451 return -1; 452 } 453 454 channel->outstanding_reqs = 0; 455 TAILQ_INIT(&channel->reqs); 456 sem_init(&channel->sem, 0, 0); 457 458 for (i = 0; i < max_ops; i++) { 459 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 460 } 461 462 channel->fs = fs; 463 464 return 0; 465 } 466 467 static int 468 fs_md_channel_create(void *io_device, void *ctx_buf) 469 { 470 struct spdk_filesystem *fs; 471 struct spdk_fs_channel *channel = ctx_buf; 472 473 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 474 475 return fs_channel_create(fs, channel, fs->md_target.max_ops); 476 } 477 478 static int 479 fs_sync_channel_create(void *io_device, void *ctx_buf) 480 { 481 struct spdk_filesystem *fs; 482 struct spdk_fs_channel *channel = ctx_buf; 483 484 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 485 486 return fs_channel_create(fs, channel, fs->sync_target.max_ops); 487 } 488 489 static int 490 fs_io_channel_create(void *io_device, void *ctx_buf) 491 { 492 struct spdk_filesystem *fs; 493 struct spdk_fs_channel *channel = ctx_buf; 494 495 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 496 497 return fs_channel_create(fs, channel, fs->io_target.max_ops); 498 } 499 500 static void 501 fs_channel_destroy(void *io_device, void *ctx_buf) 502 { 503 struct spdk_fs_channel *channel = ctx_buf; 504 505 if (channel->outstanding_reqs > 0) { 506 SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n", 507 channel->outstanding_reqs); 508 } 509 510 free(channel->req_mem); 511 if (channel->bs_channel != NULL) { 512 spdk_bs_free_io_channel(channel->bs_channel); 513 } 514 } 515 516 static void 517 __send_request_direct(fs_request_fn fn, void *arg) 518 { 519 fn(arg); 520 } 521 522 static void 523 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 524 { 525 fs->bs = bs; 526 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 527 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 528 fs->md_target.md_fs_channel->send_request = __send_request_direct; 529 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 530 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 531 532 initialize_global_cache(); 533 } 534 535 static void 536 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 537 { 538 struct spdk_fs_request *req = ctx; 539 struct spdk_fs_cb_args *args = &req->args; 540 struct spdk_filesystem *fs = args->fs; 541 542 if (bserrno == 0) { 543 common_fs_bs_init(fs, bs); 544 } else { 545 free(fs); 546 fs = NULL; 547 } 548 549 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 550 free_fs_request(req); 551 } 552 553 static void 554 fs_conf_parse(void) 555 { 556 struct spdk_conf_section *sp; 557 int cache_buffer_shift; 558 559 sp = spdk_conf_find_section(NULL, "Blobfs"); 560 if (sp == NULL) { 561 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 562 return; 563 } 564 565 cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 566 if (cache_buffer_shift <= 0) { 567 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 568 } else { 569 g_fs_cache_buffer_shift = cache_buffer_shift; 570 } 571 } 572 573 static struct spdk_filesystem * 574 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 575 { 576 struct spdk_filesystem *fs; 577 578 fs = calloc(1, sizeof(*fs)); 579 if (fs == NULL) { 580 return NULL; 581 } 582 583 fs->bdev = dev; 584 fs->send_request = send_request_fn; 585 TAILQ_INIT(&fs->files); 586 587 fs->md_target.max_ops = 512; 588 spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy, 589 sizeof(struct spdk_fs_channel), "blobfs_md"); 590 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 591 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 592 593 fs->sync_target.max_ops = 512; 594 spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy, 595 sizeof(struct spdk_fs_channel), "blobfs_sync"); 596 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 597 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 598 599 fs->io_target.max_ops = 512; 600 spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy, 601 sizeof(struct spdk_fs_channel), "blobfs_io"); 602 603 return fs; 604 } 605 606 static void 607 __wake_caller(void *arg, int fserrno) 608 { 609 struct spdk_fs_cb_args *args = arg; 610 611 args->rc = fserrno; 612 sem_post(args->sem); 613 } 614 615 void 616 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 617 fs_send_request_fn send_request_fn, 618 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 619 { 620 struct spdk_filesystem *fs; 621 struct spdk_fs_request *req; 622 struct spdk_fs_cb_args *args; 623 struct spdk_bs_opts opts = {}; 624 625 fs = fs_alloc(dev, send_request_fn); 626 if (fs == NULL) { 627 cb_fn(cb_arg, NULL, -ENOMEM); 628 return; 629 } 630 631 fs_conf_parse(); 632 633 req = alloc_fs_request(fs->md_target.md_fs_channel); 634 if (req == NULL) { 635 fs_free_io_channels(fs); 636 fs_io_device_unregister(fs); 637 cb_fn(cb_arg, NULL, -ENOMEM); 638 return; 639 } 640 641 args = &req->args; 642 args->fn.fs_op_with_handle = cb_fn; 643 args->arg = cb_arg; 644 args->fs = fs; 645 646 spdk_bs_opts_init(&opts); 647 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE); 648 if (opt) { 649 opts.cluster_sz = opt->cluster_sz; 650 } 651 spdk_bs_init(dev, &opts, init_cb, req); 652 } 653 654 static struct spdk_file * 655 file_alloc(struct spdk_filesystem *fs) 656 { 657 struct spdk_file *file; 658 659 file = calloc(1, sizeof(*file)); 660 if (file == NULL) { 661 return NULL; 662 } 663 664 file->tree = calloc(1, sizeof(*file->tree)); 665 if (file->tree == NULL) { 666 free(file); 667 return NULL; 668 } 669 670 if (pthread_spin_init(&file->lock, 0)) { 671 free(file->tree); 672 free(file); 673 return NULL; 674 } 675 676 file->fs = fs; 677 TAILQ_INIT(&file->open_requests); 678 TAILQ_INIT(&file->sync_requests); 679 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 680 file->priority = SPDK_FILE_PRIORITY_LOW; 681 return file; 682 } 683 684 static void fs_load_done(void *ctx, int bserrno); 685 686 static int 687 _handle_deleted_files(struct spdk_fs_request *req) 688 { 689 struct spdk_fs_cb_args *args = &req->args; 690 struct spdk_filesystem *fs = args->fs; 691 692 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 693 struct spdk_deleted_file *deleted_file; 694 695 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 696 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 697 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 698 free(deleted_file); 699 return 0; 700 } 701 702 return 1; 703 } 704 705 static void 706 fs_load_done(void *ctx, int bserrno) 707 { 708 struct spdk_fs_request *req = ctx; 709 struct spdk_fs_cb_args *args = &req->args; 710 struct spdk_filesystem *fs = args->fs; 711 712 /* The filesystem has been loaded. Now check if there are any files that 713 * were marked for deletion before last unload. Do not complete the 714 * fs_load callback until all of them have been deleted on disk. 715 */ 716 if (_handle_deleted_files(req) == 0) { 717 /* We found a file that's been marked for deleting but not actually 718 * deleted yet. This function will get called again once the delete 719 * operation is completed. 720 */ 721 return; 722 } 723 724 args->fn.fs_op_with_handle(args->arg, fs, 0); 725 free_fs_request(req); 726 727 } 728 729 static void 730 _file_build_trace_arg_name(struct spdk_file *f) 731 { 732 f->trace_arg_name = 0; 733 memcpy(&f->trace_arg_name, f->name, 734 spdk_min(sizeof(f->trace_arg_name), strlen(f->name))); 735 } 736 737 static void 738 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 739 { 740 struct spdk_fs_request *req = ctx; 741 struct spdk_fs_cb_args *args = &req->args; 742 struct spdk_filesystem *fs = args->fs; 743 uint64_t *length; 744 const char *name; 745 uint32_t *is_deleted; 746 size_t value_len; 747 748 if (rc < 0) { 749 args->fn.fs_op_with_handle(args->arg, fs, rc); 750 free_fs_request(req); 751 return; 752 } 753 754 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 755 if (rc < 0) { 756 args->fn.fs_op_with_handle(args->arg, fs, rc); 757 free_fs_request(req); 758 return; 759 } 760 761 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 762 if (rc < 0) { 763 args->fn.fs_op_with_handle(args->arg, fs, rc); 764 free_fs_request(req); 765 return; 766 } 767 768 assert(value_len == 8); 769 770 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 771 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 772 if (rc < 0) { 773 struct spdk_file *f; 774 775 f = file_alloc(fs); 776 if (f == NULL) { 777 SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n"); 778 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 779 free_fs_request(req); 780 return; 781 } 782 783 f->name = strdup(name); 784 _file_build_trace_arg_name(f); 785 f->blobid = spdk_blob_get_id(blob); 786 f->length = *length; 787 f->length_flushed = *length; 788 f->length_xattr = *length; 789 f->append_pos = *length; 790 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 791 } else { 792 struct spdk_deleted_file *deleted_file; 793 794 deleted_file = calloc(1, sizeof(*deleted_file)); 795 if (deleted_file == NULL) { 796 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 797 free_fs_request(req); 798 return; 799 } 800 deleted_file->id = spdk_blob_get_id(blob); 801 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 802 } 803 } 804 805 static void 806 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 807 { 808 struct spdk_fs_request *req = ctx; 809 struct spdk_fs_cb_args *args = &req->args; 810 struct spdk_filesystem *fs = args->fs; 811 struct spdk_bs_type bstype; 812 static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE}; 813 static const struct spdk_bs_type zeros; 814 815 if (bserrno != 0) { 816 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 817 free_fs_request(req); 818 fs_free_io_channels(fs); 819 fs_io_device_unregister(fs); 820 return; 821 } 822 823 bstype = spdk_bs_get_bstype(bs); 824 825 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 826 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "assigning bstype\n"); 827 spdk_bs_set_bstype(bs, blobfs_type); 828 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 829 SPDK_ERRLOG("not blobfs\n"); 830 SPDK_LOGDUMP(SPDK_LOG_BLOBFS, "bstype", &bstype, sizeof(bstype)); 831 args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL); 832 free_fs_request(req); 833 fs_free_io_channels(fs); 834 fs_io_device_unregister(fs); 835 return; 836 } 837 838 common_fs_bs_init(fs, bs); 839 fs_load_done(req, 0); 840 } 841 842 static void 843 fs_io_device_unregister(struct spdk_filesystem *fs) 844 { 845 assert(fs != NULL); 846 spdk_io_device_unregister(&fs->md_target, NULL); 847 spdk_io_device_unregister(&fs->sync_target, NULL); 848 spdk_io_device_unregister(&fs->io_target, NULL); 849 free(fs); 850 } 851 852 static void 853 fs_free_io_channels(struct spdk_filesystem *fs) 854 { 855 assert(fs != NULL); 856 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 857 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 858 } 859 860 void 861 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 862 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 863 { 864 struct spdk_filesystem *fs; 865 struct spdk_fs_cb_args *args; 866 struct spdk_fs_request *req; 867 struct spdk_bs_opts bs_opts; 868 869 fs = fs_alloc(dev, send_request_fn); 870 if (fs == NULL) { 871 cb_fn(cb_arg, NULL, -ENOMEM); 872 return; 873 } 874 875 fs_conf_parse(); 876 877 req = alloc_fs_request(fs->md_target.md_fs_channel); 878 if (req == NULL) { 879 fs_free_io_channels(fs); 880 fs_io_device_unregister(fs); 881 cb_fn(cb_arg, NULL, -ENOMEM); 882 return; 883 } 884 885 args = &req->args; 886 args->fn.fs_op_with_handle = cb_fn; 887 args->arg = cb_arg; 888 args->fs = fs; 889 TAILQ_INIT(&args->op.fs_load.deleted_files); 890 spdk_bs_opts_init(&bs_opts); 891 bs_opts.iter_cb_fn = iter_cb; 892 bs_opts.iter_cb_arg = req; 893 spdk_bs_load(dev, &bs_opts, load_cb, req); 894 } 895 896 static void 897 unload_cb(void *ctx, int bserrno) 898 { 899 struct spdk_fs_request *req = ctx; 900 struct spdk_fs_cb_args *args = &req->args; 901 struct spdk_filesystem *fs = args->fs; 902 struct spdk_file *file, *tmp; 903 904 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 905 TAILQ_REMOVE(&fs->files, file, tailq); 906 file_free(file); 907 } 908 909 free_global_cache(); 910 911 args->fn.fs_op(args->arg, bserrno); 912 free(req); 913 914 fs_io_device_unregister(fs); 915 } 916 917 void 918 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 919 { 920 struct spdk_fs_request *req; 921 struct spdk_fs_cb_args *args; 922 923 /* 924 * We must free the md_channel before unloading the blobstore, so just 925 * allocate this request from the general heap. 926 */ 927 req = calloc(1, sizeof(*req)); 928 if (req == NULL) { 929 cb_fn(cb_arg, -ENOMEM); 930 return; 931 } 932 933 args = &req->args; 934 args->fn.fs_op = cb_fn; 935 args->arg = cb_arg; 936 args->fs = fs; 937 938 fs_free_io_channels(fs); 939 spdk_bs_unload(fs->bs, unload_cb, req); 940 } 941 942 static struct spdk_file * 943 fs_find_file(struct spdk_filesystem *fs, const char *name) 944 { 945 struct spdk_file *file; 946 947 TAILQ_FOREACH(file, &fs->files, tailq) { 948 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 949 return file; 950 } 951 } 952 953 return NULL; 954 } 955 956 void 957 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 958 spdk_file_stat_op_complete cb_fn, void *cb_arg) 959 { 960 struct spdk_file_stat stat; 961 struct spdk_file *f = NULL; 962 963 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 964 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 965 return; 966 } 967 968 f = fs_find_file(fs, name); 969 if (f != NULL) { 970 stat.blobid = f->blobid; 971 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 972 cb_fn(cb_arg, &stat, 0); 973 return; 974 } 975 976 cb_fn(cb_arg, NULL, -ENOENT); 977 } 978 979 static void 980 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 981 { 982 struct spdk_fs_request *req = arg; 983 struct spdk_fs_cb_args *args = &req->args; 984 985 args->rc = fserrno; 986 if (fserrno == 0) { 987 memcpy(args->arg, stat, sizeof(*stat)); 988 } 989 sem_post(args->sem); 990 } 991 992 static void 993 __file_stat(void *arg) 994 { 995 struct spdk_fs_request *req = arg; 996 struct spdk_fs_cb_args *args = &req->args; 997 998 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 999 args->fn.stat_op, req); 1000 } 1001 1002 int 1003 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1004 const char *name, struct spdk_file_stat *stat) 1005 { 1006 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1007 struct spdk_fs_request *req; 1008 int rc; 1009 1010 req = alloc_fs_request(channel); 1011 if (req == NULL) { 1012 SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name); 1013 return -ENOMEM; 1014 } 1015 1016 req->args.fs = fs; 1017 req->args.op.stat.name = name; 1018 req->args.fn.stat_op = __copy_stat; 1019 req->args.arg = stat; 1020 req->args.sem = &channel->sem; 1021 channel->send_request(__file_stat, req); 1022 sem_wait(&channel->sem); 1023 1024 rc = req->args.rc; 1025 free_fs_request(req); 1026 1027 return rc; 1028 } 1029 1030 static void 1031 fs_create_blob_close_cb(void *ctx, int bserrno) 1032 { 1033 int rc; 1034 struct spdk_fs_request *req = ctx; 1035 struct spdk_fs_cb_args *args = &req->args; 1036 1037 rc = args->rc ? args->rc : bserrno; 1038 args->fn.file_op(args->arg, rc); 1039 free_fs_request(req); 1040 } 1041 1042 static void 1043 fs_create_blob_resize_cb(void *ctx, int bserrno) 1044 { 1045 struct spdk_fs_request *req = ctx; 1046 struct spdk_fs_cb_args *args = &req->args; 1047 struct spdk_file *f = args->file; 1048 struct spdk_blob *blob = args->op.create.blob; 1049 uint64_t length = 0; 1050 1051 args->rc = bserrno; 1052 if (bserrno) { 1053 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1054 return; 1055 } 1056 1057 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 1058 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 1059 1060 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1061 } 1062 1063 static void 1064 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1065 { 1066 struct spdk_fs_request *req = ctx; 1067 struct spdk_fs_cb_args *args = &req->args; 1068 1069 if (bserrno) { 1070 args->fn.file_op(args->arg, bserrno); 1071 free_fs_request(req); 1072 return; 1073 } 1074 1075 args->op.create.blob = blob; 1076 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 1077 } 1078 1079 static void 1080 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 1081 { 1082 struct spdk_fs_request *req = ctx; 1083 struct spdk_fs_cb_args *args = &req->args; 1084 struct spdk_file *f = args->file; 1085 1086 if (bserrno) { 1087 args->fn.file_op(args->arg, bserrno); 1088 free_fs_request(req); 1089 return; 1090 } 1091 1092 f->blobid = blobid; 1093 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 1094 } 1095 1096 void 1097 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 1098 spdk_file_op_complete cb_fn, void *cb_arg) 1099 { 1100 struct spdk_file *file; 1101 struct spdk_fs_request *req; 1102 struct spdk_fs_cb_args *args; 1103 1104 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1105 cb_fn(cb_arg, -ENAMETOOLONG); 1106 return; 1107 } 1108 1109 file = fs_find_file(fs, name); 1110 if (file != NULL) { 1111 cb_fn(cb_arg, -EEXIST); 1112 return; 1113 } 1114 1115 file = file_alloc(fs); 1116 if (file == NULL) { 1117 SPDK_ERRLOG("Cannot allocate new file for creation\n"); 1118 cb_fn(cb_arg, -ENOMEM); 1119 return; 1120 } 1121 1122 req = alloc_fs_request(fs->md_target.md_fs_channel); 1123 if (req == NULL) { 1124 SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name); 1125 cb_fn(cb_arg, -ENOMEM); 1126 return; 1127 } 1128 1129 args = &req->args; 1130 args->file = file; 1131 args->fn.file_op = cb_fn; 1132 args->arg = cb_arg; 1133 1134 file->name = strdup(name); 1135 _file_build_trace_arg_name(file); 1136 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 1137 } 1138 1139 static void 1140 __fs_create_file_done(void *arg, int fserrno) 1141 { 1142 struct spdk_fs_request *req = arg; 1143 struct spdk_fs_cb_args *args = &req->args; 1144 1145 __wake_caller(args, fserrno); 1146 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1147 } 1148 1149 static void 1150 __fs_create_file(void *arg) 1151 { 1152 struct spdk_fs_request *req = arg; 1153 struct spdk_fs_cb_args *args = &req->args; 1154 1155 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1156 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1157 } 1158 1159 int 1160 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1161 { 1162 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1163 struct spdk_fs_request *req; 1164 struct spdk_fs_cb_args *args; 1165 int rc; 1166 1167 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1168 1169 req = alloc_fs_request(channel); 1170 if (req == NULL) { 1171 SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name); 1172 return -ENOMEM; 1173 } 1174 1175 args = &req->args; 1176 args->fs = fs; 1177 args->op.create.name = name; 1178 args->sem = &channel->sem; 1179 fs->send_request(__fs_create_file, req); 1180 sem_wait(&channel->sem); 1181 rc = args->rc; 1182 free_fs_request(req); 1183 1184 return rc; 1185 } 1186 1187 static void 1188 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1189 { 1190 struct spdk_fs_request *req = ctx; 1191 struct spdk_fs_cb_args *args = &req->args; 1192 struct spdk_file *f = args->file; 1193 1194 f->blob = blob; 1195 while (!TAILQ_EMPTY(&f->open_requests)) { 1196 req = TAILQ_FIRST(&f->open_requests); 1197 args = &req->args; 1198 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1199 spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name); 1200 args->fn.file_op_with_handle(args->arg, f, bserrno); 1201 free_fs_request(req); 1202 } 1203 } 1204 1205 static void 1206 fs_open_blob_create_cb(void *ctx, int bserrno) 1207 { 1208 struct spdk_fs_request *req = ctx; 1209 struct spdk_fs_cb_args *args = &req->args; 1210 struct spdk_file *file = args->file; 1211 struct spdk_filesystem *fs = args->fs; 1212 1213 if (file == NULL) { 1214 /* 1215 * This is from an open with CREATE flag - the file 1216 * is now created so look it up in the file list for this 1217 * filesystem. 1218 */ 1219 file = fs_find_file(fs, args->op.open.name); 1220 assert(file != NULL); 1221 args->file = file; 1222 } 1223 1224 file->ref_count++; 1225 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1226 if (file->ref_count == 1) { 1227 assert(file->blob == NULL); 1228 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1229 } else if (file->blob != NULL) { 1230 fs_open_blob_done(req, file->blob, 0); 1231 } else { 1232 /* 1233 * The blob open for this file is in progress due to a previous 1234 * open request. When that open completes, it will invoke the 1235 * open callback for this request. 1236 */ 1237 } 1238 } 1239 1240 void 1241 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1242 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1243 { 1244 struct spdk_file *f = NULL; 1245 struct spdk_fs_request *req; 1246 struct spdk_fs_cb_args *args; 1247 1248 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1249 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1250 return; 1251 } 1252 1253 f = fs_find_file(fs, name); 1254 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1255 cb_fn(cb_arg, NULL, -ENOENT); 1256 return; 1257 } 1258 1259 if (f != NULL && f->is_deleted == true) { 1260 cb_fn(cb_arg, NULL, -ENOENT); 1261 return; 1262 } 1263 1264 req = alloc_fs_request(fs->md_target.md_fs_channel); 1265 if (req == NULL) { 1266 SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name); 1267 cb_fn(cb_arg, NULL, -ENOMEM); 1268 return; 1269 } 1270 1271 args = &req->args; 1272 args->fn.file_op_with_handle = cb_fn; 1273 args->arg = cb_arg; 1274 args->file = f; 1275 args->fs = fs; 1276 args->op.open.name = name; 1277 1278 if (f == NULL) { 1279 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1280 } else { 1281 fs_open_blob_create_cb(req, 0); 1282 } 1283 } 1284 1285 static void 1286 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1287 { 1288 struct spdk_fs_request *req = arg; 1289 struct spdk_fs_cb_args *args = &req->args; 1290 1291 args->file = file; 1292 __wake_caller(args, bserrno); 1293 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1294 } 1295 1296 static void 1297 __fs_open_file(void *arg) 1298 { 1299 struct spdk_fs_request *req = arg; 1300 struct spdk_fs_cb_args *args = &req->args; 1301 1302 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1303 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1304 __fs_open_file_done, req); 1305 } 1306 1307 int 1308 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1309 const char *name, uint32_t flags, struct spdk_file **file) 1310 { 1311 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1312 struct spdk_fs_request *req; 1313 struct spdk_fs_cb_args *args; 1314 int rc; 1315 1316 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1317 1318 req = alloc_fs_request(channel); 1319 if (req == NULL) { 1320 SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name); 1321 return -ENOMEM; 1322 } 1323 1324 args = &req->args; 1325 args->fs = fs; 1326 args->op.open.name = name; 1327 args->op.open.flags = flags; 1328 args->sem = &channel->sem; 1329 fs->send_request(__fs_open_file, req); 1330 sem_wait(&channel->sem); 1331 rc = args->rc; 1332 if (rc == 0) { 1333 *file = args->file; 1334 } else { 1335 *file = NULL; 1336 } 1337 free_fs_request(req); 1338 1339 return rc; 1340 } 1341 1342 static void 1343 fs_rename_blob_close_cb(void *ctx, int bserrno) 1344 { 1345 struct spdk_fs_request *req = ctx; 1346 struct spdk_fs_cb_args *args = &req->args; 1347 1348 args->fn.fs_op(args->arg, bserrno); 1349 free_fs_request(req); 1350 } 1351 1352 static void 1353 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1354 { 1355 struct spdk_fs_request *req = ctx; 1356 struct spdk_fs_cb_args *args = &req->args; 1357 const char *new_name = args->op.rename.new_name; 1358 1359 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1360 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1361 } 1362 1363 static void 1364 _fs_md_rename_file(struct spdk_fs_request *req) 1365 { 1366 struct spdk_fs_cb_args *args = &req->args; 1367 struct spdk_file *f; 1368 1369 f = fs_find_file(args->fs, args->op.rename.old_name); 1370 if (f == NULL) { 1371 args->fn.fs_op(args->arg, -ENOENT); 1372 free_fs_request(req); 1373 return; 1374 } 1375 1376 free(f->name); 1377 f->name = strdup(args->op.rename.new_name); 1378 _file_build_trace_arg_name(f); 1379 args->file = f; 1380 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1381 } 1382 1383 static void 1384 fs_rename_delete_done(void *arg, int fserrno) 1385 { 1386 _fs_md_rename_file(arg); 1387 } 1388 1389 void 1390 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1391 const char *old_name, const char *new_name, 1392 spdk_file_op_complete cb_fn, void *cb_arg) 1393 { 1394 struct spdk_file *f; 1395 struct spdk_fs_request *req; 1396 struct spdk_fs_cb_args *args; 1397 1398 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1399 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1400 cb_fn(cb_arg, -ENAMETOOLONG); 1401 return; 1402 } 1403 1404 req = alloc_fs_request(fs->md_target.md_fs_channel); 1405 if (req == NULL) { 1406 SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name, 1407 new_name); 1408 cb_fn(cb_arg, -ENOMEM); 1409 return; 1410 } 1411 1412 args = &req->args; 1413 args->fn.fs_op = cb_fn; 1414 args->fs = fs; 1415 args->arg = cb_arg; 1416 args->op.rename.old_name = old_name; 1417 args->op.rename.new_name = new_name; 1418 1419 f = fs_find_file(fs, new_name); 1420 if (f == NULL) { 1421 _fs_md_rename_file(req); 1422 return; 1423 } 1424 1425 /* 1426 * The rename overwrites an existing file. So delete the existing file, then 1427 * do the actual rename. 1428 */ 1429 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1430 } 1431 1432 static void 1433 __fs_rename_file_done(void *arg, int fserrno) 1434 { 1435 struct spdk_fs_request *req = arg; 1436 struct spdk_fs_cb_args *args = &req->args; 1437 1438 __wake_caller(args, fserrno); 1439 } 1440 1441 static void 1442 __fs_rename_file(void *arg) 1443 { 1444 struct spdk_fs_request *req = arg; 1445 struct spdk_fs_cb_args *args = &req->args; 1446 1447 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1448 __fs_rename_file_done, req); 1449 } 1450 1451 int 1452 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1453 const char *old_name, const char *new_name) 1454 { 1455 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1456 struct spdk_fs_request *req; 1457 struct spdk_fs_cb_args *args; 1458 int rc; 1459 1460 req = alloc_fs_request(channel); 1461 if (req == NULL) { 1462 SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name); 1463 return -ENOMEM; 1464 } 1465 1466 args = &req->args; 1467 1468 args->fs = fs; 1469 args->op.rename.old_name = old_name; 1470 args->op.rename.new_name = new_name; 1471 args->sem = &channel->sem; 1472 fs->send_request(__fs_rename_file, req); 1473 sem_wait(&channel->sem); 1474 rc = args->rc; 1475 free_fs_request(req); 1476 return rc; 1477 } 1478 1479 static void 1480 blob_delete_cb(void *ctx, int bserrno) 1481 { 1482 struct spdk_fs_request *req = ctx; 1483 struct spdk_fs_cb_args *args = &req->args; 1484 1485 args->fn.file_op(args->arg, bserrno); 1486 free_fs_request(req); 1487 } 1488 1489 void 1490 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1491 spdk_file_op_complete cb_fn, void *cb_arg) 1492 { 1493 struct spdk_file *f; 1494 spdk_blob_id blobid; 1495 struct spdk_fs_request *req; 1496 struct spdk_fs_cb_args *args; 1497 1498 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1499 1500 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1501 cb_fn(cb_arg, -ENAMETOOLONG); 1502 return; 1503 } 1504 1505 f = fs_find_file(fs, name); 1506 if (f == NULL) { 1507 SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name); 1508 cb_fn(cb_arg, -ENOENT); 1509 return; 1510 } 1511 1512 req = alloc_fs_request(fs->md_target.md_fs_channel); 1513 if (req == NULL) { 1514 SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name); 1515 cb_fn(cb_arg, -ENOMEM); 1516 return; 1517 } 1518 1519 args = &req->args; 1520 args->fn.file_op = cb_fn; 1521 args->arg = cb_arg; 1522 1523 if (f->ref_count > 0) { 1524 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1525 f->is_deleted = true; 1526 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1527 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1528 return; 1529 } 1530 1531 blobid = f->blobid; 1532 TAILQ_REMOVE(&fs->files, f, tailq); 1533 1534 file_free(f); 1535 1536 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1537 } 1538 1539 static uint64_t 1540 fs_name_to_uint64(const char *name) 1541 { 1542 uint64_t result = 0; 1543 memcpy(&result, name, spdk_min(sizeof(result), strlen(name))); 1544 return result; 1545 } 1546 1547 static void 1548 __fs_delete_file_done(void *arg, int fserrno) 1549 { 1550 struct spdk_fs_request *req = arg; 1551 struct spdk_fs_cb_args *args = &req->args; 1552 1553 spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name)); 1554 __wake_caller(args, fserrno); 1555 } 1556 1557 static void 1558 __fs_delete_file(void *arg) 1559 { 1560 struct spdk_fs_request *req = arg; 1561 struct spdk_fs_cb_args *args = &req->args; 1562 1563 spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name)); 1564 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1565 } 1566 1567 int 1568 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1569 const char *name) 1570 { 1571 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1572 struct spdk_fs_request *req; 1573 struct spdk_fs_cb_args *args; 1574 int rc; 1575 1576 req = alloc_fs_request(channel); 1577 if (req == NULL) { 1578 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name); 1579 return -ENOMEM; 1580 } 1581 1582 args = &req->args; 1583 args->fs = fs; 1584 args->op.delete.name = name; 1585 args->sem = &channel->sem; 1586 fs->send_request(__fs_delete_file, req); 1587 sem_wait(&channel->sem); 1588 rc = args->rc; 1589 free_fs_request(req); 1590 1591 return rc; 1592 } 1593 1594 spdk_fs_iter 1595 spdk_fs_iter_first(struct spdk_filesystem *fs) 1596 { 1597 struct spdk_file *f; 1598 1599 f = TAILQ_FIRST(&fs->files); 1600 return f; 1601 } 1602 1603 spdk_fs_iter 1604 spdk_fs_iter_next(spdk_fs_iter iter) 1605 { 1606 struct spdk_file *f = iter; 1607 1608 if (f == NULL) { 1609 return NULL; 1610 } 1611 1612 f = TAILQ_NEXT(f, tailq); 1613 return f; 1614 } 1615 1616 const char * 1617 spdk_file_get_name(struct spdk_file *file) 1618 { 1619 return file->name; 1620 } 1621 1622 uint64_t 1623 spdk_file_get_length(struct spdk_file *file) 1624 { 1625 uint64_t length; 1626 1627 assert(file != NULL); 1628 1629 length = file->append_pos >= file->length ? file->append_pos : file->length; 1630 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length); 1631 return length; 1632 } 1633 1634 static void 1635 fs_truncate_complete_cb(void *ctx, int bserrno) 1636 { 1637 struct spdk_fs_request *req = ctx; 1638 struct spdk_fs_cb_args *args = &req->args; 1639 1640 args->fn.file_op(args->arg, bserrno); 1641 free_fs_request(req); 1642 } 1643 1644 static void 1645 fs_truncate_resize_cb(void *ctx, int bserrno) 1646 { 1647 struct spdk_fs_request *req = ctx; 1648 struct spdk_fs_cb_args *args = &req->args; 1649 struct spdk_file *file = args->file; 1650 uint64_t *length = &args->op.truncate.length; 1651 1652 if (bserrno) { 1653 args->fn.file_op(args->arg, bserrno); 1654 free_fs_request(req); 1655 return; 1656 } 1657 1658 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1659 1660 file->length = *length; 1661 if (file->append_pos > file->length) { 1662 file->append_pos = file->length; 1663 } 1664 1665 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req); 1666 } 1667 1668 static uint64_t 1669 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1670 { 1671 return (length + cluster_sz - 1) / cluster_sz; 1672 } 1673 1674 void 1675 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1676 spdk_file_op_complete cb_fn, void *cb_arg) 1677 { 1678 struct spdk_filesystem *fs; 1679 size_t num_clusters; 1680 struct spdk_fs_request *req; 1681 struct spdk_fs_cb_args *args; 1682 1683 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1684 if (length == file->length) { 1685 cb_fn(cb_arg, 0); 1686 return; 1687 } 1688 1689 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1690 if (req == NULL) { 1691 cb_fn(cb_arg, -ENOMEM); 1692 return; 1693 } 1694 1695 args = &req->args; 1696 args->fn.file_op = cb_fn; 1697 args->arg = cb_arg; 1698 args->file = file; 1699 args->op.truncate.length = length; 1700 fs = file->fs; 1701 1702 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1703 1704 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1705 } 1706 1707 static void 1708 __truncate(void *arg) 1709 { 1710 struct spdk_fs_request *req = arg; 1711 struct spdk_fs_cb_args *args = &req->args; 1712 1713 spdk_file_truncate_async(args->file, args->op.truncate.length, 1714 args->fn.file_op, args); 1715 } 1716 1717 int 1718 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 1719 uint64_t length) 1720 { 1721 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1722 struct spdk_fs_request *req; 1723 struct spdk_fs_cb_args *args; 1724 int rc; 1725 1726 req = alloc_fs_request(channel); 1727 if (req == NULL) { 1728 return -ENOMEM; 1729 } 1730 1731 args = &req->args; 1732 1733 args->file = file; 1734 args->op.truncate.length = length; 1735 args->fn.file_op = __wake_caller; 1736 args->sem = &channel->sem; 1737 1738 channel->send_request(__truncate, req); 1739 sem_wait(&channel->sem); 1740 rc = args->rc; 1741 free_fs_request(req); 1742 1743 return rc; 1744 } 1745 1746 static void 1747 __rw_done(void *ctx, int bserrno) 1748 { 1749 struct spdk_fs_request *req = ctx; 1750 struct spdk_fs_cb_args *args = &req->args; 1751 1752 spdk_free(args->op.rw.pin_buf); 1753 args->fn.file_op(args->arg, bserrno); 1754 free_fs_request(req); 1755 } 1756 1757 static void 1758 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 1759 { 1760 int i; 1761 size_t len; 1762 1763 for (i = 0; i < iovcnt; i++) { 1764 len = spdk_min(iovs[i].iov_len, buf_len); 1765 memcpy(buf, iovs[i].iov_base, len); 1766 buf += len; 1767 assert(buf_len >= len); 1768 buf_len -= len; 1769 } 1770 } 1771 1772 static void 1773 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len) 1774 { 1775 int i; 1776 size_t len; 1777 1778 for (i = 0; i < iovcnt; i++) { 1779 len = spdk_min(iovs[i].iov_len, buf_len); 1780 memcpy(iovs[i].iov_base, buf, len); 1781 buf += len; 1782 assert(buf_len >= len); 1783 buf_len -= len; 1784 } 1785 } 1786 1787 static void 1788 __read_done(void *ctx, int bserrno) 1789 { 1790 struct spdk_fs_request *req = ctx; 1791 struct spdk_fs_cb_args *args = &req->args; 1792 void *buf; 1793 1794 assert(req != NULL); 1795 buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1))); 1796 if (args->op.rw.is_read) { 1797 _copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length); 1798 __rw_done(req, 0); 1799 } else { 1800 _copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt); 1801 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1802 args->op.rw.pin_buf, 1803 args->op.rw.start_lba, args->op.rw.num_lba, 1804 __rw_done, req); 1805 } 1806 } 1807 1808 static void 1809 __do_blob_read(void *ctx, int fserrno) 1810 { 1811 struct spdk_fs_request *req = ctx; 1812 struct spdk_fs_cb_args *args = &req->args; 1813 1814 if (fserrno) { 1815 __rw_done(req, fserrno); 1816 return; 1817 } 1818 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1819 args->op.rw.pin_buf, 1820 args->op.rw.start_lba, args->op.rw.num_lba, 1821 __read_done, req); 1822 } 1823 1824 static void 1825 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1826 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1827 { 1828 uint64_t end_lba; 1829 1830 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1831 *start_lba = offset / *lba_size; 1832 end_lba = (offset + length - 1) / *lba_size; 1833 *num_lba = (end_lba - *start_lba + 1); 1834 } 1835 1836 static bool 1837 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length) 1838 { 1839 uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1840 1841 if ((offset % lba_size == 0) && (length % lba_size == 0)) { 1842 return true; 1843 } 1844 1845 return false; 1846 } 1847 1848 static void 1849 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt) 1850 { 1851 uint32_t i; 1852 1853 for (i = 0; i < iovcnt; i++) { 1854 req->args.iovs[i].iov_base = iovs[i].iov_base; 1855 req->args.iovs[i].iov_len = iovs[i].iov_len; 1856 } 1857 } 1858 1859 static void 1860 __readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel, 1861 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1862 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1863 { 1864 struct spdk_fs_request *req; 1865 struct spdk_fs_cb_args *args; 1866 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1867 uint64_t start_lba, num_lba, pin_buf_length; 1868 uint32_t lba_size; 1869 1870 if (is_read && offset + length > file->length) { 1871 cb_fn(cb_arg, -EINVAL); 1872 return; 1873 } 1874 1875 req = alloc_fs_request_with_iov(channel, iovcnt); 1876 if (req == NULL) { 1877 cb_fn(cb_arg, -ENOMEM); 1878 return; 1879 } 1880 1881 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1882 1883 args = &req->args; 1884 args->fn.file_op = cb_fn; 1885 args->arg = cb_arg; 1886 args->file = file; 1887 args->op.rw.channel = channel->bs_channel; 1888 _fs_request_setup_iovs(req, iovs, iovcnt); 1889 args->op.rw.is_read = is_read; 1890 args->op.rw.offset = offset; 1891 args->op.rw.blocklen = lba_size; 1892 1893 pin_buf_length = num_lba * lba_size; 1894 args->op.rw.length = pin_buf_length; 1895 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL, 1896 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1897 if (args->op.rw.pin_buf == NULL) { 1898 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1899 file->name, offset, length); 1900 free_fs_request(req); 1901 cb_fn(cb_arg, -ENOMEM); 1902 return; 1903 } 1904 1905 args->op.rw.start_lba = start_lba; 1906 args->op.rw.num_lba = num_lba; 1907 1908 if (!is_read && file->length < offset + length) { 1909 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1910 } else if (!is_read && __is_lba_aligned(file, offset, length)) { 1911 _copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt); 1912 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1913 args->op.rw.pin_buf, 1914 args->op.rw.start_lba, args->op.rw.num_lba, 1915 __rw_done, req); 1916 } else { 1917 __do_blob_read(req, 0); 1918 } 1919 } 1920 1921 static void 1922 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel, 1923 void *payload, uint64_t offset, uint64_t length, 1924 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1925 { 1926 struct iovec iov; 1927 1928 iov.iov_base = payload; 1929 iov.iov_len = (size_t)length; 1930 1931 __readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read); 1932 } 1933 1934 void 1935 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1936 void *payload, uint64_t offset, uint64_t length, 1937 spdk_file_op_complete cb_fn, void *cb_arg) 1938 { 1939 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1940 } 1941 1942 void 1943 spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel, 1944 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1945 spdk_file_op_complete cb_fn, void *cb_arg) 1946 { 1947 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1948 file->name, offset, length); 1949 1950 __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0); 1951 } 1952 1953 void 1954 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1955 void *payload, uint64_t offset, uint64_t length, 1956 spdk_file_op_complete cb_fn, void *cb_arg) 1957 { 1958 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1959 file->name, offset, length); 1960 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1961 } 1962 1963 void 1964 spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel, 1965 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1966 spdk_file_op_complete cb_fn, void *cb_arg) 1967 { 1968 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1969 file->name, offset, length); 1970 1971 __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1); 1972 } 1973 1974 struct spdk_io_channel * 1975 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1976 { 1977 struct spdk_io_channel *io_channel; 1978 struct spdk_fs_channel *fs_channel; 1979 1980 io_channel = spdk_get_io_channel(&fs->io_target); 1981 fs_channel = spdk_io_channel_get_ctx(io_channel); 1982 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1983 fs_channel->send_request = __send_request_direct; 1984 1985 return io_channel; 1986 } 1987 1988 void 1989 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1990 { 1991 spdk_put_io_channel(channel); 1992 } 1993 1994 struct spdk_fs_thread_ctx * 1995 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs) 1996 { 1997 struct spdk_fs_thread_ctx *ctx; 1998 1999 ctx = calloc(1, sizeof(*ctx)); 2000 if (!ctx) { 2001 return NULL; 2002 } 2003 2004 if (pthread_spin_init(&ctx->ch.lock, 0)) { 2005 free(ctx); 2006 return NULL; 2007 } 2008 2009 fs_channel_create(fs, &ctx->ch, 512); 2010 2011 ctx->ch.send_request = fs->send_request; 2012 ctx->ch.sync = 1; 2013 2014 return ctx; 2015 } 2016 2017 2018 void 2019 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx) 2020 { 2021 assert(ctx->ch.sync == 1); 2022 2023 while (true) { 2024 pthread_spin_lock(&ctx->ch.lock); 2025 if (ctx->ch.outstanding_reqs == 0) { 2026 pthread_spin_unlock(&ctx->ch.lock); 2027 break; 2028 } 2029 pthread_spin_unlock(&ctx->ch.lock); 2030 usleep(1000); 2031 } 2032 2033 fs_channel_destroy(NULL, &ctx->ch); 2034 free(ctx); 2035 } 2036 2037 int 2038 spdk_fs_set_cache_size(uint64_t size_in_mb) 2039 { 2040 /* setting g_fs_cache_size is only permitted if cache pool 2041 * is already freed or hasn't been initialized 2042 */ 2043 if (g_cache_pool != NULL) { 2044 return -EPERM; 2045 } 2046 2047 g_fs_cache_size = size_in_mb * 1024 * 1024; 2048 2049 return 0; 2050 } 2051 2052 uint64_t 2053 spdk_fs_get_cache_size(void) 2054 { 2055 return g_fs_cache_size / (1024 * 1024); 2056 } 2057 2058 static void __file_flush(void *ctx); 2059 2060 /* Try to free some cache buffers from this file. 2061 */ 2062 static int 2063 reclaim_cache_buffers(struct spdk_file *file) 2064 { 2065 int rc; 2066 2067 BLOBFS_TRACE(file, "free=%s\n", file->name); 2068 2069 /* The function is safe to be called with any threads, while the file 2070 * lock maybe locked by other thread for now, so try to get the file 2071 * lock here. 2072 */ 2073 rc = pthread_spin_trylock(&file->lock); 2074 if (rc != 0) { 2075 return -1; 2076 } 2077 2078 if (file->tree->present_mask == 0) { 2079 pthread_spin_unlock(&file->lock); 2080 return -1; 2081 } 2082 tree_free_buffers(file->tree); 2083 2084 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2085 /* If not freed, put it in the end of the queue */ 2086 if (file->tree->present_mask != 0) { 2087 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2088 } else { 2089 file->last = NULL; 2090 } 2091 pthread_spin_unlock(&file->lock); 2092 2093 return 0; 2094 } 2095 2096 static int 2097 _blobfs_cache_pool_reclaim(void *arg) 2098 { 2099 struct spdk_file *file, *tmp; 2100 int rc; 2101 2102 if (!blobfs_cache_pool_need_reclaim()) { 2103 return SPDK_POLLER_IDLE; 2104 } 2105 2106 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2107 if (!file->open_for_writing && 2108 file->priority == SPDK_FILE_PRIORITY_LOW) { 2109 rc = reclaim_cache_buffers(file); 2110 if (rc < 0) { 2111 continue; 2112 } 2113 if (!blobfs_cache_pool_need_reclaim()) { 2114 return SPDK_POLLER_BUSY; 2115 } 2116 break; 2117 } 2118 } 2119 2120 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2121 if (!file->open_for_writing) { 2122 rc = reclaim_cache_buffers(file); 2123 if (rc < 0) { 2124 continue; 2125 } 2126 if (!blobfs_cache_pool_need_reclaim()) { 2127 return SPDK_POLLER_BUSY; 2128 } 2129 break; 2130 } 2131 } 2132 2133 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2134 rc = reclaim_cache_buffers(file); 2135 if (rc < 0) { 2136 continue; 2137 } 2138 break; 2139 } 2140 2141 return SPDK_POLLER_BUSY; 2142 } 2143 2144 static void 2145 _add_file_to_cache_pool(void *ctx) 2146 { 2147 struct spdk_file *file = ctx; 2148 2149 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2150 } 2151 2152 static void 2153 _remove_file_from_cache_pool(void *ctx) 2154 { 2155 struct spdk_file *file = ctx; 2156 2157 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2158 } 2159 2160 static struct cache_buffer * 2161 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 2162 { 2163 struct cache_buffer *buf; 2164 int count = 0; 2165 bool need_update = false; 2166 2167 buf = calloc(1, sizeof(*buf)); 2168 if (buf == NULL) { 2169 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 2170 return NULL; 2171 } 2172 2173 do { 2174 buf->buf = spdk_mempool_get(g_cache_pool); 2175 if (buf->buf) { 2176 break; 2177 } 2178 if (count++ == 100) { 2179 SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", 2180 file, offset); 2181 free(buf); 2182 return NULL; 2183 } 2184 usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); 2185 } while (true); 2186 2187 buf->buf_size = CACHE_BUFFER_SIZE; 2188 buf->offset = offset; 2189 2190 if (file->tree->present_mask == 0) { 2191 need_update = true; 2192 } 2193 file->tree = tree_insert_buffer(file->tree, buf); 2194 2195 if (need_update) { 2196 spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file); 2197 } 2198 2199 return buf; 2200 } 2201 2202 static struct cache_buffer * 2203 cache_append_buffer(struct spdk_file *file) 2204 { 2205 struct cache_buffer *last; 2206 2207 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 2208 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 2209 2210 last = cache_insert_buffer(file, file->append_pos); 2211 if (last == NULL) { 2212 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 2213 return NULL; 2214 } 2215 2216 file->last = last; 2217 2218 return last; 2219 } 2220 2221 static void __check_sync_reqs(struct spdk_file *file); 2222 2223 static void 2224 __file_cache_finish_sync(void *ctx, int bserrno) 2225 { 2226 struct spdk_file *file; 2227 struct spdk_fs_request *sync_req = ctx; 2228 struct spdk_fs_cb_args *sync_args; 2229 2230 sync_args = &sync_req->args; 2231 file = sync_args->file; 2232 pthread_spin_lock(&file->lock); 2233 file->length_xattr = sync_args->op.sync.length; 2234 assert(sync_args->op.sync.offset <= file->length_flushed); 2235 spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset, 2236 0, file->trace_arg_name); 2237 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 2238 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 2239 pthread_spin_unlock(&file->lock); 2240 2241 sync_args->fn.file_op(sync_args->arg, bserrno); 2242 2243 free_fs_request(sync_req); 2244 __check_sync_reqs(file); 2245 } 2246 2247 static void 2248 __check_sync_reqs(struct spdk_file *file) 2249 { 2250 struct spdk_fs_request *sync_req; 2251 2252 pthread_spin_lock(&file->lock); 2253 2254 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 2255 if (sync_req->args.op.sync.offset <= file->length_flushed) { 2256 break; 2257 } 2258 } 2259 2260 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 2261 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 2262 sync_req->args.op.sync.xattr_in_progress = true; 2263 sync_req->args.op.sync.length = file->length_flushed; 2264 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 2265 sizeof(file->length_flushed)); 2266 2267 pthread_spin_unlock(&file->lock); 2268 spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed, 2269 0, file->trace_arg_name); 2270 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req); 2271 } else { 2272 pthread_spin_unlock(&file->lock); 2273 } 2274 } 2275 2276 static void 2277 __file_flush_done(void *ctx, int bserrno) 2278 { 2279 struct spdk_fs_request *req = ctx; 2280 struct spdk_fs_cb_args *args = &req->args; 2281 struct spdk_file *file = args->file; 2282 struct cache_buffer *next = args->op.flush.cache_buffer; 2283 2284 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 2285 2286 pthread_spin_lock(&file->lock); 2287 next->in_progress = false; 2288 next->bytes_flushed += args->op.flush.length; 2289 file->length_flushed += args->op.flush.length; 2290 if (file->length_flushed > file->length) { 2291 file->length = file->length_flushed; 2292 } 2293 if (next->bytes_flushed == next->buf_size) { 2294 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 2295 next = tree_find_buffer(file->tree, file->length_flushed); 2296 } 2297 2298 /* 2299 * Assert that there is no cached data that extends past the end of the underlying 2300 * blob. 2301 */ 2302 assert(next == NULL || next->offset < __file_get_blob_size(file) || 2303 next->bytes_filled == 0); 2304 2305 pthread_spin_unlock(&file->lock); 2306 2307 __check_sync_reqs(file); 2308 2309 __file_flush(req); 2310 } 2311 2312 static void 2313 __file_flush(void *ctx) 2314 { 2315 struct spdk_fs_request *req = ctx; 2316 struct spdk_fs_cb_args *args = &req->args; 2317 struct spdk_file *file = args->file; 2318 struct cache_buffer *next; 2319 uint64_t offset, length, start_lba, num_lba; 2320 uint32_t lba_size; 2321 2322 pthread_spin_lock(&file->lock); 2323 next = tree_find_buffer(file->tree, file->length_flushed); 2324 if (next == NULL || next->in_progress || 2325 ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) { 2326 /* 2327 * There is either no data to flush, a flush I/O is already in 2328 * progress, or the next buffer is partially filled but there's no 2329 * outstanding request to sync it. 2330 * So return immediately - if a flush I/O is in progress we will flush 2331 * more data after that is completed, or a partial buffer will get flushed 2332 * when it is either filled or the file is synced. 2333 */ 2334 free_fs_request(req); 2335 if (next == NULL) { 2336 /* 2337 * For cases where a file's cache was evicted, and then the 2338 * file was later appended, we will write the data directly 2339 * to disk and bypass cache. So just update length_flushed 2340 * here to reflect that all data was already written to disk. 2341 */ 2342 file->length_flushed = file->append_pos; 2343 } 2344 pthread_spin_unlock(&file->lock); 2345 if (next == NULL) { 2346 /* 2347 * There is no data to flush, but we still need to check for any 2348 * outstanding sync requests to make sure metadata gets updated. 2349 */ 2350 __check_sync_reqs(file); 2351 } 2352 return; 2353 } 2354 2355 offset = next->offset + next->bytes_flushed; 2356 length = next->bytes_filled - next->bytes_flushed; 2357 if (length == 0) { 2358 free_fs_request(req); 2359 pthread_spin_unlock(&file->lock); 2360 /* 2361 * There is no data to flush, but we still need to check for any 2362 * outstanding sync requests to make sure metadata gets updated. 2363 */ 2364 __check_sync_reqs(file); 2365 return; 2366 } 2367 args->op.flush.length = length; 2368 args->op.flush.cache_buffer = next; 2369 2370 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2371 2372 next->in_progress = true; 2373 BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n", 2374 offset, length, start_lba, num_lba); 2375 pthread_spin_unlock(&file->lock); 2376 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2377 next->buf + (start_lba * lba_size) - next->offset, 2378 start_lba, num_lba, __file_flush_done, req); 2379 } 2380 2381 static void 2382 __file_extend_done(void *arg, int bserrno) 2383 { 2384 struct spdk_fs_cb_args *args = arg; 2385 2386 __wake_caller(args, bserrno); 2387 } 2388 2389 static void 2390 __file_extend_resize_cb(void *_args, int bserrno) 2391 { 2392 struct spdk_fs_cb_args *args = _args; 2393 struct spdk_file *file = args->file; 2394 2395 if (bserrno) { 2396 __wake_caller(args, bserrno); 2397 return; 2398 } 2399 2400 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2401 } 2402 2403 static void 2404 __file_extend_blob(void *_args) 2405 { 2406 struct spdk_fs_cb_args *args = _args; 2407 struct spdk_file *file = args->file; 2408 2409 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2410 } 2411 2412 static void 2413 __rw_from_file_done(void *ctx, int bserrno) 2414 { 2415 struct spdk_fs_request *req = ctx; 2416 2417 __wake_caller(&req->args, bserrno); 2418 free_fs_request(req); 2419 } 2420 2421 static void 2422 __rw_from_file(void *ctx) 2423 { 2424 struct spdk_fs_request *req = ctx; 2425 struct spdk_fs_cb_args *args = &req->args; 2426 struct spdk_file *file = args->file; 2427 2428 if (args->op.rw.is_read) { 2429 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2430 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2431 __rw_from_file_done, req); 2432 } else { 2433 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2434 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2435 __rw_from_file_done, req); 2436 } 2437 } 2438 2439 static int 2440 __send_rw_from_file(struct spdk_file *file, void *payload, 2441 uint64_t offset, uint64_t length, bool is_read, 2442 struct spdk_fs_channel *channel) 2443 { 2444 struct spdk_fs_request *req; 2445 struct spdk_fs_cb_args *args; 2446 2447 req = alloc_fs_request_with_iov(channel, 1); 2448 if (req == NULL) { 2449 sem_post(&channel->sem); 2450 return -ENOMEM; 2451 } 2452 2453 args = &req->args; 2454 args->file = file; 2455 args->sem = &channel->sem; 2456 args->iovs[0].iov_base = payload; 2457 args->iovs[0].iov_len = (size_t)length; 2458 args->op.rw.offset = offset; 2459 args->op.rw.is_read = is_read; 2460 file->fs->send_request(__rw_from_file, req); 2461 return 0; 2462 } 2463 2464 int 2465 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2466 void *payload, uint64_t offset, uint64_t length) 2467 { 2468 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2469 struct spdk_fs_request *flush_req; 2470 uint64_t rem_length, copy, blob_size, cluster_sz; 2471 uint32_t cache_buffers_filled = 0; 2472 uint8_t *cur_payload; 2473 struct cache_buffer *last; 2474 2475 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2476 2477 if (length == 0) { 2478 return 0; 2479 } 2480 2481 if (offset != file->append_pos) { 2482 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2483 return -EINVAL; 2484 } 2485 2486 pthread_spin_lock(&file->lock); 2487 file->open_for_writing = true; 2488 2489 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2490 cache_append_buffer(file); 2491 } 2492 2493 if (file->last == NULL) { 2494 int rc; 2495 2496 file->append_pos += length; 2497 pthread_spin_unlock(&file->lock); 2498 rc = __send_rw_from_file(file, payload, offset, length, false, channel); 2499 sem_wait(&channel->sem); 2500 return rc; 2501 } 2502 2503 blob_size = __file_get_blob_size(file); 2504 2505 if ((offset + length) > blob_size) { 2506 struct spdk_fs_cb_args extend_args = {}; 2507 2508 cluster_sz = file->fs->bs_opts.cluster_sz; 2509 extend_args.sem = &channel->sem; 2510 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2511 extend_args.file = file; 2512 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2513 pthread_spin_unlock(&file->lock); 2514 file->fs->send_request(__file_extend_blob, &extend_args); 2515 sem_wait(&channel->sem); 2516 if (extend_args.rc) { 2517 return extend_args.rc; 2518 } 2519 } 2520 2521 flush_req = alloc_fs_request(channel); 2522 if (flush_req == NULL) { 2523 pthread_spin_unlock(&file->lock); 2524 return -ENOMEM; 2525 } 2526 2527 last = file->last; 2528 rem_length = length; 2529 cur_payload = payload; 2530 while (rem_length > 0) { 2531 copy = last->buf_size - last->bytes_filled; 2532 if (copy > rem_length) { 2533 copy = rem_length; 2534 } 2535 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2536 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2537 file->append_pos += copy; 2538 if (file->length < file->append_pos) { 2539 file->length = file->append_pos; 2540 } 2541 cur_payload += copy; 2542 last->bytes_filled += copy; 2543 rem_length -= copy; 2544 if (last->bytes_filled == last->buf_size) { 2545 cache_buffers_filled++; 2546 last = cache_append_buffer(file); 2547 if (last == NULL) { 2548 BLOBFS_TRACE(file, "nomem\n"); 2549 free_fs_request(flush_req); 2550 pthread_spin_unlock(&file->lock); 2551 return -ENOMEM; 2552 } 2553 } 2554 } 2555 2556 pthread_spin_unlock(&file->lock); 2557 2558 if (cache_buffers_filled == 0) { 2559 free_fs_request(flush_req); 2560 return 0; 2561 } 2562 2563 flush_req->args.file = file; 2564 file->fs->send_request(__file_flush, flush_req); 2565 return 0; 2566 } 2567 2568 static void 2569 __readahead_done(void *ctx, int bserrno) 2570 { 2571 struct spdk_fs_request *req = ctx; 2572 struct spdk_fs_cb_args *args = &req->args; 2573 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2574 struct spdk_file *file = args->file; 2575 2576 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2577 2578 pthread_spin_lock(&file->lock); 2579 cache_buffer->bytes_filled = args->op.readahead.length; 2580 cache_buffer->bytes_flushed = args->op.readahead.length; 2581 cache_buffer->in_progress = false; 2582 pthread_spin_unlock(&file->lock); 2583 2584 free_fs_request(req); 2585 } 2586 2587 static void 2588 __readahead(void *ctx) 2589 { 2590 struct spdk_fs_request *req = ctx; 2591 struct spdk_fs_cb_args *args = &req->args; 2592 struct spdk_file *file = args->file; 2593 uint64_t offset, length, start_lba, num_lba; 2594 uint32_t lba_size; 2595 2596 offset = args->op.readahead.offset; 2597 length = args->op.readahead.length; 2598 assert(length > 0); 2599 2600 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2601 2602 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2603 offset, length, start_lba, num_lba); 2604 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2605 args->op.readahead.cache_buffer->buf, 2606 start_lba, num_lba, __readahead_done, req); 2607 } 2608 2609 static uint64_t 2610 __next_cache_buffer_offset(uint64_t offset) 2611 { 2612 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2613 } 2614 2615 static void 2616 check_readahead(struct spdk_file *file, uint64_t offset, 2617 struct spdk_fs_channel *channel) 2618 { 2619 struct spdk_fs_request *req; 2620 struct spdk_fs_cb_args *args; 2621 2622 offset = __next_cache_buffer_offset(offset); 2623 if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2624 return; 2625 } 2626 2627 req = alloc_fs_request(channel); 2628 if (req == NULL) { 2629 return; 2630 } 2631 args = &req->args; 2632 2633 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2634 2635 args->file = file; 2636 args->op.readahead.offset = offset; 2637 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2638 if (!args->op.readahead.cache_buffer) { 2639 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2640 free_fs_request(req); 2641 return; 2642 } 2643 2644 args->op.readahead.cache_buffer->in_progress = true; 2645 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2646 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2647 } else { 2648 args->op.readahead.length = CACHE_BUFFER_SIZE; 2649 } 2650 file->fs->send_request(__readahead, req); 2651 } 2652 2653 int64_t 2654 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2655 void *payload, uint64_t offset, uint64_t length) 2656 { 2657 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2658 uint64_t final_offset, final_length; 2659 uint32_t sub_reads = 0; 2660 struct cache_buffer *buf; 2661 uint64_t read_len; 2662 int rc = 0; 2663 2664 pthread_spin_lock(&file->lock); 2665 2666 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2667 2668 file->open_for_writing = false; 2669 2670 if (length == 0 || offset >= file->append_pos) { 2671 pthread_spin_unlock(&file->lock); 2672 return 0; 2673 } 2674 2675 if (offset + length > file->append_pos) { 2676 length = file->append_pos - offset; 2677 } 2678 2679 if (offset != file->next_seq_offset) { 2680 file->seq_byte_count = 0; 2681 } 2682 file->seq_byte_count += length; 2683 file->next_seq_offset = offset + length; 2684 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2685 check_readahead(file, offset, channel); 2686 check_readahead(file, offset + CACHE_BUFFER_SIZE, channel); 2687 } 2688 2689 final_length = 0; 2690 final_offset = offset + length; 2691 while (offset < final_offset) { 2692 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2693 if (length > (final_offset - offset)) { 2694 length = final_offset - offset; 2695 } 2696 2697 buf = tree_find_filled_buffer(file->tree, offset); 2698 if (buf == NULL) { 2699 pthread_spin_unlock(&file->lock); 2700 rc = __send_rw_from_file(file, payload, offset, length, true, channel); 2701 pthread_spin_lock(&file->lock); 2702 if (rc == 0) { 2703 sub_reads++; 2704 } 2705 } else { 2706 read_len = length; 2707 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2708 read_len = buf->offset + buf->bytes_filled - offset; 2709 } 2710 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len); 2711 memcpy(payload, &buf->buf[offset - buf->offset], read_len); 2712 if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) { 2713 tree_remove_buffer(file->tree, buf); 2714 if (file->tree->present_mask == 0) { 2715 spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file); 2716 } 2717 } 2718 } 2719 2720 if (rc == 0) { 2721 final_length += length; 2722 } else { 2723 break; 2724 } 2725 payload += length; 2726 offset += length; 2727 } 2728 pthread_spin_unlock(&file->lock); 2729 while (sub_reads > 0) { 2730 sem_wait(&channel->sem); 2731 sub_reads--; 2732 } 2733 if (rc == 0) { 2734 return final_length; 2735 } else { 2736 return rc; 2737 } 2738 } 2739 2740 static void 2741 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2742 spdk_file_op_complete cb_fn, void *cb_arg) 2743 { 2744 struct spdk_fs_request *sync_req; 2745 struct spdk_fs_request *flush_req; 2746 struct spdk_fs_cb_args *sync_args; 2747 struct spdk_fs_cb_args *flush_args; 2748 2749 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2750 2751 pthread_spin_lock(&file->lock); 2752 if (file->append_pos <= file->length_xattr) { 2753 BLOBFS_TRACE(file, "done - file already synced\n"); 2754 pthread_spin_unlock(&file->lock); 2755 cb_fn(cb_arg, 0); 2756 return; 2757 } 2758 2759 sync_req = alloc_fs_request(channel); 2760 if (!sync_req) { 2761 SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name); 2762 pthread_spin_unlock(&file->lock); 2763 cb_fn(cb_arg, -ENOMEM); 2764 return; 2765 } 2766 sync_args = &sync_req->args; 2767 2768 flush_req = alloc_fs_request(channel); 2769 if (!flush_req) { 2770 SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name); 2771 free_fs_request(sync_req); 2772 pthread_spin_unlock(&file->lock); 2773 cb_fn(cb_arg, -ENOMEM); 2774 return; 2775 } 2776 flush_args = &flush_req->args; 2777 2778 sync_args->file = file; 2779 sync_args->fn.file_op = cb_fn; 2780 sync_args->arg = cb_arg; 2781 sync_args->op.sync.offset = file->append_pos; 2782 sync_args->op.sync.xattr_in_progress = false; 2783 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2784 pthread_spin_unlock(&file->lock); 2785 2786 flush_args->file = file; 2787 channel->send_request(__file_flush, flush_req); 2788 } 2789 2790 int 2791 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2792 { 2793 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2794 struct spdk_fs_cb_args args = {}; 2795 2796 args.sem = &channel->sem; 2797 _file_sync(file, channel, __wake_caller, &args); 2798 sem_wait(&channel->sem); 2799 2800 return args.rc; 2801 } 2802 2803 void 2804 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2805 spdk_file_op_complete cb_fn, void *cb_arg) 2806 { 2807 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2808 2809 _file_sync(file, channel, cb_fn, cb_arg); 2810 } 2811 2812 void 2813 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2814 { 2815 BLOBFS_TRACE(file, "priority=%u\n", priority); 2816 file->priority = priority; 2817 2818 } 2819 2820 /* 2821 * Close routines 2822 */ 2823 2824 static void 2825 __file_close_async_done(void *ctx, int bserrno) 2826 { 2827 struct spdk_fs_request *req = ctx; 2828 struct spdk_fs_cb_args *args = &req->args; 2829 struct spdk_file *file = args->file; 2830 2831 spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name); 2832 2833 if (file->is_deleted) { 2834 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2835 return; 2836 } 2837 2838 args->fn.file_op(args->arg, bserrno); 2839 free_fs_request(req); 2840 } 2841 2842 static void 2843 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2844 { 2845 struct spdk_blob *blob; 2846 2847 pthread_spin_lock(&file->lock); 2848 if (file->ref_count == 0) { 2849 pthread_spin_unlock(&file->lock); 2850 __file_close_async_done(req, -EBADF); 2851 return; 2852 } 2853 2854 file->ref_count--; 2855 if (file->ref_count > 0) { 2856 pthread_spin_unlock(&file->lock); 2857 req->args.fn.file_op(req->args.arg, 0); 2858 free_fs_request(req); 2859 return; 2860 } 2861 2862 pthread_spin_unlock(&file->lock); 2863 2864 blob = file->blob; 2865 file->blob = NULL; 2866 spdk_blob_close(blob, __file_close_async_done, req); 2867 } 2868 2869 static void 2870 __file_close_async__sync_done(void *arg, int fserrno) 2871 { 2872 struct spdk_fs_request *req = arg; 2873 struct spdk_fs_cb_args *args = &req->args; 2874 2875 __file_close_async(args->file, req); 2876 } 2877 2878 void 2879 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2880 { 2881 struct spdk_fs_request *req; 2882 struct spdk_fs_cb_args *args; 2883 2884 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2885 if (req == NULL) { 2886 SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name); 2887 cb_fn(cb_arg, -ENOMEM); 2888 return; 2889 } 2890 2891 args = &req->args; 2892 args->file = file; 2893 args->fn.file_op = cb_fn; 2894 args->arg = cb_arg; 2895 2896 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2897 } 2898 2899 static void 2900 __file_close(void *arg) 2901 { 2902 struct spdk_fs_request *req = arg; 2903 struct spdk_fs_cb_args *args = &req->args; 2904 struct spdk_file *file = args->file; 2905 2906 __file_close_async(file, req); 2907 } 2908 2909 int 2910 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2911 { 2912 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2913 struct spdk_fs_request *req; 2914 struct spdk_fs_cb_args *args; 2915 2916 req = alloc_fs_request(channel); 2917 if (req == NULL) { 2918 SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name); 2919 return -ENOMEM; 2920 } 2921 2922 args = &req->args; 2923 2924 spdk_file_sync(file, ctx); 2925 BLOBFS_TRACE(file, "name=%s\n", file->name); 2926 args->file = file; 2927 args->sem = &channel->sem; 2928 args->fn.file_op = __wake_caller; 2929 args->arg = args; 2930 channel->send_request(__file_close, req); 2931 sem_wait(&channel->sem); 2932 2933 return args->rc; 2934 } 2935 2936 int 2937 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2938 { 2939 if (size < sizeof(spdk_blob_id)) { 2940 return -EINVAL; 2941 } 2942 2943 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2944 2945 return sizeof(spdk_blob_id); 2946 } 2947 2948 static void 2949 _file_free(void *ctx) 2950 { 2951 struct spdk_file *file = ctx; 2952 2953 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2954 2955 free(file->name); 2956 free(file->tree); 2957 free(file); 2958 } 2959 2960 static void 2961 file_free(struct spdk_file *file) 2962 { 2963 BLOBFS_TRACE(file, "free=%s\n", file->name); 2964 pthread_spin_lock(&file->lock); 2965 if (file->tree->present_mask == 0) { 2966 pthread_spin_unlock(&file->lock); 2967 free(file->name); 2968 free(file->tree); 2969 free(file); 2970 return; 2971 } 2972 2973 tree_free_buffers(file->tree); 2974 assert(file->tree->present_mask == 0); 2975 spdk_thread_send_msg(g_cache_pool_thread, _file_free, file); 2976 pthread_spin_unlock(&file->lock); 2977 } 2978 2979 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2980 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2981