/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "spdk/conf.h"
#include "tree.h"

#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"
#include "spdk/trace.h"

#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)

#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

#define SPDK_BLOBFS_SIGNATURE "BLOBFS"

static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches;
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_spinlock_t g_caches_lock;

#define TRACE_GROUP_BLOBFS 0x7
#define TRACE_BLOBFS_XATTR_START SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
#define TRACE_BLOBFS_DELETE_START SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
#define TRACE_BLOBFS_DELETE_DONE SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)

SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	spdk_trace_register_description("BLOBFS_XATTR_START",
					TRACE_BLOBFS_XATTR_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_XATTR_END",
					TRACE_BLOBFS_XATTR_END,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_OPEN",
					TRACE_BLOBFS_OPEN,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_CLOSE",
					TRACE_BLOBFS_CLOSE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_DELETE_START",
					TRACE_BLOBFS_DELETE_START,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
	spdk_trace_register_description("BLOBFS_DELETE_DONE",
					TRACE_BLOBFS_DELETE_DONE,
					OWNER_NONE, OBJECT_NONE, 0,
					SPDK_TRACE_ARG_TYPE_STR,
					"file: ");
}

void
spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

#define CACHE_READAHEAD_THRESHOLD (128 * 1024)

struct spdk_file {
	struct spdk_filesystem *fs;
	struct spdk_blob *blob;
	char *name;
	uint64_t trace_arg_name;
	uint64_t length;
	bool is_deleted;
	bool open_for_writing;
	uint64_t length_flushed;
	uint64_t length_xattr;
	uint64_t append_pos;
	uint64_t seq_byte_count;
	uint64_t next_seq_offset;
	uint32_t priority;
	TAILQ_ENTRY(spdk_file) tailq;
	spdk_blob_id blobid;
	uint32_t ref_count;
	pthread_spinlock_t lock;
	struct cache_buffer *last;
	struct cache_tree *tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file) cache_tailq;
};

struct spdk_deleted_file {
	spdk_blob_id id;
	TAILQ_ENTRY(spdk_deleted_file) tailq;
};

struct spdk_filesystem {
	struct spdk_blob_store *bs;
	TAILQ_HEAD(, spdk_file) files;
	struct spdk_bs_opts bs_opts;
	struct spdk_bs_dev *bdev;
	fs_send_request_fn send_request;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *sync_io_channel;
		struct spdk_fs_channel *sync_fs_channel;
	} sync_target;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *md_io_channel;
		struct spdk_fs_channel *md_fs_channel;
	} md_target;

	struct {
		uint32_t max_ops;
	} io_target;
};

struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete fs_op_with_handle;
		spdk_fs_op_complete fs_op;
		spdk_file_op_with_handle_complete file_op_with_handle;
		spdk_file_op_complete file_op;
		spdk_file_stat_op_complete stat_op;
	} fn;
	void *arg;
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	struct iovec *iovs;
	uint32_t iovcnt;
	struct iovec iov;
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file) deleted_files;
		} fs_load;
		struct {
			uint64_t length;
		} truncate;
		struct {
			struct spdk_io_channel *channel;
			void *pin_buf;
			int is_read;
			off_t offset;
			size_t length;
			uint64_t start_lba;
			uint64_t num_lba;
			uint32_t blocklen;
		} rw;
		struct {
			const char *old_name;
			const char *new_name;
		} rename;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
		} flush;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
			uint64_t offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t offset;
			TAILQ_ENTRY(spdk_fs_request) tailq;
			bool xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t length;
		} sync;
		struct {
			uint32_t num_clusters;
		} resize;
		struct {
			const char *name;
			uint32_t flags;
			TAILQ_ENTRY(spdk_fs_request) tailq;
		} open;
		struct {
			const char *name;
			struct spdk_blob *blob;
		} create;
		struct {
			const char *name;
		} delete;
		struct {
			const char *name;
		} stat;
	} op;
};

static void cache_free_buffers(struct spdk_file *file);
static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs);
static void spdk_fs_free_io_channels(struct spdk_filesystem *fs);

void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}

static int _blobfs_cache_pool_reclaim(void *arg);

static bool
blobfs_cache_pool_need_reclaim(void)
{
	size_t count;

	count = spdk_mempool_count(g_cache_pool);
	/* We define an aggressive policy here, since requests from db_bench arrive in batches:
	 * start the poller when the number of available cache buffers is less than 1/5 of the total buffers.
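	 * In other words, reclaim starts once roughly 80% or more of the cache buffers are in use.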
273 */ 274 if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) { 275 return false; 276 } 277 278 return true; 279 } 280 281 static void 282 __start_cache_pool_mgmt(void *ctx) 283 { 284 assert(g_cache_pool == NULL); 285 286 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 287 g_fs_cache_size / CACHE_BUFFER_SIZE, 288 CACHE_BUFFER_SIZE, 289 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 290 SPDK_ENV_SOCKET_ID_ANY); 291 if (!g_cache_pool) { 292 SPDK_ERRLOG("Create mempool failed, you may " 293 "increase the memory and try again\n"); 294 assert(false); 295 } 296 TAILQ_INIT(&g_caches); 297 pthread_spin_init(&g_caches_lock, 0); 298 299 assert(g_cache_pool_mgmt_poller == NULL); 300 g_cache_pool_mgmt_poller = spdk_poller_register(_blobfs_cache_pool_reclaim, NULL, 301 BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); 302 } 303 304 static void 305 __stop_cache_pool_mgmt(void *ctx) 306 { 307 spdk_poller_unregister(&g_cache_pool_mgmt_poller); 308 309 assert(g_cache_pool != NULL); 310 assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE); 311 spdk_mempool_free(g_cache_pool); 312 g_cache_pool = NULL; 313 314 spdk_thread_exit(g_cache_pool_thread); 315 } 316 317 static void 318 initialize_global_cache(void) 319 { 320 pthread_mutex_lock(&g_cache_init_lock); 321 if (g_fs_count == 0) { 322 g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL); 323 assert(g_cache_pool_thread != NULL); 324 spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL); 325 } 326 g_fs_count++; 327 pthread_mutex_unlock(&g_cache_init_lock); 328 } 329 330 static void 331 free_global_cache(void) 332 { 333 pthread_mutex_lock(&g_cache_init_lock); 334 g_fs_count--; 335 if (g_fs_count == 0) { 336 spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL); 337 } 338 pthread_mutex_unlock(&g_cache_init_lock); 339 } 340 341 static uint64_t 342 __file_get_blob_size(struct spdk_file *file) 343 { 344 uint64_t cluster_sz; 345 346 cluster_sz = file->fs->bs_opts.cluster_sz; 347 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 348 } 349 350 struct spdk_fs_request { 351 struct spdk_fs_cb_args args; 352 TAILQ_ENTRY(spdk_fs_request) link; 353 struct spdk_fs_channel *channel; 354 }; 355 356 struct spdk_fs_channel { 357 struct spdk_fs_request *req_mem; 358 TAILQ_HEAD(, spdk_fs_request) reqs; 359 sem_t sem; 360 struct spdk_filesystem *fs; 361 struct spdk_io_channel *bs_channel; 362 fs_send_request_fn send_request; 363 bool sync; 364 uint32_t outstanding_reqs; 365 pthread_spinlock_t lock; 366 }; 367 368 /* For now, this is effectively an alias. But eventually we'll shift 369 * some data members over. 
*/ 370 struct spdk_fs_thread_ctx { 371 struct spdk_fs_channel ch; 372 }; 373 374 static struct spdk_fs_request * 375 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 376 { 377 struct spdk_fs_request *req; 378 struct iovec *iovs = NULL; 379 380 if (iovcnt > 1) { 381 iovs = calloc(iovcnt, sizeof(struct iovec)); 382 if (!iovs) { 383 return NULL; 384 } 385 } 386 387 if (channel->sync) { 388 pthread_spin_lock(&channel->lock); 389 } 390 391 req = TAILQ_FIRST(&channel->reqs); 392 if (req) { 393 channel->outstanding_reqs++; 394 TAILQ_REMOVE(&channel->reqs, req, link); 395 } 396 397 if (channel->sync) { 398 pthread_spin_unlock(&channel->lock); 399 } 400 401 if (req == NULL) { 402 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 403 free(iovs); 404 return NULL; 405 } 406 memset(req, 0, sizeof(*req)); 407 req->channel = channel; 408 if (iovcnt > 1) { 409 req->args.iovs = iovs; 410 } else { 411 req->args.iovs = &req->args.iov; 412 } 413 req->args.iovcnt = iovcnt; 414 415 return req; 416 } 417 418 static struct spdk_fs_request * 419 alloc_fs_request(struct spdk_fs_channel *channel) 420 { 421 return alloc_fs_request_with_iov(channel, 0); 422 } 423 424 static void 425 free_fs_request(struct spdk_fs_request *req) 426 { 427 struct spdk_fs_channel *channel = req->channel; 428 429 if (req->args.iovcnt > 1) { 430 free(req->args.iovs); 431 } 432 433 if (channel->sync) { 434 pthread_spin_lock(&channel->lock); 435 } 436 437 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 438 channel->outstanding_reqs--; 439 440 if (channel->sync) { 441 pthread_spin_unlock(&channel->lock); 442 } 443 } 444 445 static int 446 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 447 uint32_t max_ops) 448 { 449 uint32_t i; 450 451 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 452 if (!channel->req_mem) { 453 return -1; 454 } 455 456 channel->outstanding_reqs = 0; 457 TAILQ_INIT(&channel->reqs); 458 sem_init(&channel->sem, 0, 0); 459 460 for (i = 0; i < max_ops; i++) { 461 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 462 } 463 464 channel->fs = fs; 465 466 return 0; 467 } 468 469 static int 470 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 471 { 472 struct spdk_filesystem *fs; 473 struct spdk_fs_channel *channel = ctx_buf; 474 475 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 476 477 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 478 } 479 480 static int 481 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 482 { 483 struct spdk_filesystem *fs; 484 struct spdk_fs_channel *channel = ctx_buf; 485 486 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 487 488 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 489 } 490 491 static int 492 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 493 { 494 struct spdk_filesystem *fs; 495 struct spdk_fs_channel *channel = ctx_buf; 496 497 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 498 499 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 500 } 501 502 static void 503 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 504 { 505 struct spdk_fs_channel *channel = ctx_buf; 506 507 if (channel->outstanding_reqs > 0) { 508 SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n", 509 channel->outstanding_reqs); 510 } 511 512 free(channel->req_mem); 513 if (channel->bs_channel != NULL) { 514 
spdk_bs_free_io_channel(channel->bs_channel); 515 } 516 } 517 518 static void 519 __send_request_direct(fs_request_fn fn, void *arg) 520 { 521 fn(arg); 522 } 523 524 static void 525 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 526 { 527 fs->bs = bs; 528 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 529 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 530 fs->md_target.md_fs_channel->send_request = __send_request_direct; 531 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 532 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 533 534 initialize_global_cache(); 535 } 536 537 static void 538 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 539 { 540 struct spdk_fs_request *req = ctx; 541 struct spdk_fs_cb_args *args = &req->args; 542 struct spdk_filesystem *fs = args->fs; 543 544 if (bserrno == 0) { 545 common_fs_bs_init(fs, bs); 546 } else { 547 free(fs); 548 fs = NULL; 549 } 550 551 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 552 free_fs_request(req); 553 } 554 555 static void 556 fs_conf_parse(void) 557 { 558 struct spdk_conf_section *sp; 559 560 sp = spdk_conf_find_section(NULL, "Blobfs"); 561 if (sp == NULL) { 562 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 563 return; 564 } 565 566 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 567 if (g_fs_cache_buffer_shift <= 0) { 568 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 569 } 570 } 571 572 static struct spdk_filesystem * 573 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 574 { 575 struct spdk_filesystem *fs; 576 577 fs = calloc(1, sizeof(*fs)); 578 if (fs == NULL) { 579 return NULL; 580 } 581 582 fs->bdev = dev; 583 fs->send_request = send_request_fn; 584 TAILQ_INIT(&fs->files); 585 586 fs->md_target.max_ops = 512; 587 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 588 sizeof(struct spdk_fs_channel), "blobfs_md"); 589 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 590 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 591 592 fs->sync_target.max_ops = 512; 593 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 594 sizeof(struct spdk_fs_channel), "blobfs_sync"); 595 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 596 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 597 598 fs->io_target.max_ops = 512; 599 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 600 sizeof(struct spdk_fs_channel), "blobfs_io"); 601 602 return fs; 603 } 604 605 static void 606 __wake_caller(void *arg, int fserrno) 607 { 608 struct spdk_fs_cb_args *args = arg; 609 610 args->rc = fserrno; 611 sem_post(args->sem); 612 } 613 614 void 615 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 616 fs_send_request_fn send_request_fn, 617 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 618 { 619 struct spdk_filesystem *fs; 620 struct spdk_fs_request *req; 621 struct spdk_fs_cb_args *args; 622 struct spdk_bs_opts opts = {}; 623 624 fs = fs_alloc(dev, send_request_fn); 625 if (fs == NULL) { 626 cb_fn(cb_arg, NULL, -ENOMEM); 627 return; 628 } 629 630 fs_conf_parse(); 631 632 req = alloc_fs_request(fs->md_target.md_fs_channel); 633 if (req == NULL) { 634 spdk_fs_free_io_channels(fs); 635 
spdk_fs_io_device_unregister(fs); 636 cb_fn(cb_arg, NULL, -ENOMEM); 637 return; 638 } 639 640 args = &req->args; 641 args->fn.fs_op_with_handle = cb_fn; 642 args->arg = cb_arg; 643 args->fs = fs; 644 645 spdk_bs_opts_init(&opts); 646 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE); 647 if (opt) { 648 opts.cluster_sz = opt->cluster_sz; 649 } 650 spdk_bs_init(dev, &opts, init_cb, req); 651 } 652 653 static struct spdk_file * 654 file_alloc(struct spdk_filesystem *fs) 655 { 656 struct spdk_file *file; 657 658 file = calloc(1, sizeof(*file)); 659 if (file == NULL) { 660 return NULL; 661 } 662 663 file->tree = calloc(1, sizeof(*file->tree)); 664 if (file->tree == NULL) { 665 free(file); 666 return NULL; 667 } 668 669 file->fs = fs; 670 TAILQ_INIT(&file->open_requests); 671 TAILQ_INIT(&file->sync_requests); 672 pthread_spin_init(&file->lock, 0); 673 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 674 file->priority = SPDK_FILE_PRIORITY_LOW; 675 return file; 676 } 677 678 static void fs_load_done(void *ctx, int bserrno); 679 680 static int 681 _handle_deleted_files(struct spdk_fs_request *req) 682 { 683 struct spdk_fs_cb_args *args = &req->args; 684 struct spdk_filesystem *fs = args->fs; 685 686 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 687 struct spdk_deleted_file *deleted_file; 688 689 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 690 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 691 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 692 free(deleted_file); 693 return 0; 694 } 695 696 return 1; 697 } 698 699 static void 700 fs_load_done(void *ctx, int bserrno) 701 { 702 struct spdk_fs_request *req = ctx; 703 struct spdk_fs_cb_args *args = &req->args; 704 struct spdk_filesystem *fs = args->fs; 705 706 /* The filesystem has been loaded. Now check if there are any files that 707 * were marked for deletion before last unload. Do not complete the 708 * fs_load callback until all of them have been deleted on disk. 709 */ 710 if (_handle_deleted_files(req) == 0) { 711 /* We found a file that's been marked for deleting but not actually 712 * deleted yet. This function will get called again once the delete 713 * operation is completed. 
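 * (_handle_deleted_files() issues one blob delete per call, with fs_load_done() as the
 * completion callback, so this repeats until the deleted_files list is empty.)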
714 */ 715 return; 716 } 717 718 args->fn.fs_op_with_handle(args->arg, fs, 0); 719 free_fs_request(req); 720 721 } 722 723 static void 724 _file_build_trace_arg_name(struct spdk_file *f) 725 { 726 f->trace_arg_name = 0; 727 memcpy(&f->trace_arg_name, f->name, 728 spdk_min(sizeof(f->trace_arg_name), strlen(f->name))); 729 } 730 731 static void 732 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 733 { 734 struct spdk_fs_request *req = ctx; 735 struct spdk_fs_cb_args *args = &req->args; 736 struct spdk_filesystem *fs = args->fs; 737 uint64_t *length; 738 const char *name; 739 uint32_t *is_deleted; 740 size_t value_len; 741 742 if (rc < 0) { 743 args->fn.fs_op_with_handle(args->arg, fs, rc); 744 free_fs_request(req); 745 return; 746 } 747 748 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 749 if (rc < 0) { 750 args->fn.fs_op_with_handle(args->arg, fs, rc); 751 free_fs_request(req); 752 return; 753 } 754 755 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 756 if (rc < 0) { 757 args->fn.fs_op_with_handle(args->arg, fs, rc); 758 free_fs_request(req); 759 return; 760 } 761 762 assert(value_len == 8); 763 764 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 765 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 766 if (rc < 0) { 767 struct spdk_file *f; 768 769 f = file_alloc(fs); 770 if (f == NULL) { 771 SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n"); 772 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 773 free_fs_request(req); 774 return; 775 } 776 777 f->name = strdup(name); 778 _file_build_trace_arg_name(f); 779 f->blobid = spdk_blob_get_id(blob); 780 f->length = *length; 781 f->length_flushed = *length; 782 f->length_xattr = *length; 783 f->append_pos = *length; 784 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 785 } else { 786 struct spdk_deleted_file *deleted_file; 787 788 deleted_file = calloc(1, sizeof(*deleted_file)); 789 if (deleted_file == NULL) { 790 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 791 free_fs_request(req); 792 return; 793 } 794 deleted_file->id = spdk_blob_get_id(blob); 795 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 796 } 797 } 798 799 static void 800 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 801 { 802 struct spdk_fs_request *req = ctx; 803 struct spdk_fs_cb_args *args = &req->args; 804 struct spdk_filesystem *fs = args->fs; 805 struct spdk_bs_type bstype; 806 static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE}; 807 static const struct spdk_bs_type zeros; 808 809 if (bserrno != 0) { 810 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 811 free_fs_request(req); 812 spdk_fs_free_io_channels(fs); 813 spdk_fs_io_device_unregister(fs); 814 return; 815 } 816 817 bstype = spdk_bs_get_bstype(bs); 818 819 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 820 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 821 spdk_bs_set_bstype(bs, blobfs_type); 822 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 823 SPDK_ERRLOG("not blobfs\n"); 824 SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 825 args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL); 826 free_fs_request(req); 827 spdk_fs_free_io_channels(fs); 828 spdk_fs_io_device_unregister(fs); 829 return; 830 } 831 832 common_fs_bs_init(fs, bs); 833 fs_load_done(req, 0); 834 } 835 836 static void 837 
spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 838 { 839 assert(fs != NULL); 840 spdk_io_device_unregister(&fs->md_target, NULL); 841 spdk_io_device_unregister(&fs->sync_target, NULL); 842 spdk_io_device_unregister(&fs->io_target, NULL); 843 free(fs); 844 } 845 846 static void 847 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 848 { 849 assert(fs != NULL); 850 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 851 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 852 } 853 854 void 855 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 856 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 857 { 858 struct spdk_filesystem *fs; 859 struct spdk_fs_cb_args *args; 860 struct spdk_fs_request *req; 861 struct spdk_bs_opts bs_opts; 862 863 fs = fs_alloc(dev, send_request_fn); 864 if (fs == NULL) { 865 cb_fn(cb_arg, NULL, -ENOMEM); 866 return; 867 } 868 869 fs_conf_parse(); 870 871 req = alloc_fs_request(fs->md_target.md_fs_channel); 872 if (req == NULL) { 873 spdk_fs_free_io_channels(fs); 874 spdk_fs_io_device_unregister(fs); 875 cb_fn(cb_arg, NULL, -ENOMEM); 876 return; 877 } 878 879 args = &req->args; 880 args->fn.fs_op_with_handle = cb_fn; 881 args->arg = cb_arg; 882 args->fs = fs; 883 TAILQ_INIT(&args->op.fs_load.deleted_files); 884 spdk_bs_opts_init(&bs_opts); 885 bs_opts.iter_cb_fn = iter_cb; 886 bs_opts.iter_cb_arg = req; 887 spdk_bs_load(dev, &bs_opts, load_cb, req); 888 } 889 890 static void 891 unload_cb(void *ctx, int bserrno) 892 { 893 struct spdk_fs_request *req = ctx; 894 struct spdk_fs_cb_args *args = &req->args; 895 struct spdk_filesystem *fs = args->fs; 896 struct spdk_file *file, *tmp; 897 898 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 899 TAILQ_REMOVE(&fs->files, file, tailq); 900 cache_free_buffers(file); 901 free(file->name); 902 free(file->tree); 903 free(file); 904 } 905 906 free_global_cache(); 907 908 args->fn.fs_op(args->arg, bserrno); 909 free(req); 910 911 spdk_fs_io_device_unregister(fs); 912 } 913 914 void 915 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 916 { 917 struct spdk_fs_request *req; 918 struct spdk_fs_cb_args *args; 919 920 /* 921 * We must free the md_channel before unloading the blobstore, so just 922 * allocate this request from the general heap. 923 */ 924 req = calloc(1, sizeof(*req)); 925 if (req == NULL) { 926 cb_fn(cb_arg, -ENOMEM); 927 return; 928 } 929 930 args = &req->args; 931 args->fn.fs_op = cb_fn; 932 args->arg = cb_arg; 933 args->fs = fs; 934 935 spdk_fs_free_io_channels(fs); 936 spdk_bs_unload(fs->bs, unload_cb, req); 937 } 938 939 static struct spdk_file * 940 fs_find_file(struct spdk_filesystem *fs, const char *name) 941 { 942 struct spdk_file *file; 943 944 TAILQ_FOREACH(file, &fs->files, tailq) { 945 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 946 return file; 947 } 948 } 949 950 return NULL; 951 } 952 953 void 954 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 955 spdk_file_stat_op_complete cb_fn, void *cb_arg) 956 { 957 struct spdk_file_stat stat; 958 struct spdk_file *f = NULL; 959 960 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 961 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 962 return; 963 } 964 965 f = fs_find_file(fs, name); 966 if (f != NULL) { 967 stat.blobid = f->blobid; 968 stat.size = f->append_pos >= f->length ? 
f->append_pos : f->length; 969 cb_fn(cb_arg, &stat, 0); 970 return; 971 } 972 973 cb_fn(cb_arg, NULL, -ENOENT); 974 } 975 976 static void 977 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 978 { 979 struct spdk_fs_request *req = arg; 980 struct spdk_fs_cb_args *args = &req->args; 981 982 args->rc = fserrno; 983 if (fserrno == 0) { 984 memcpy(args->arg, stat, sizeof(*stat)); 985 } 986 sem_post(args->sem); 987 } 988 989 static void 990 __file_stat(void *arg) 991 { 992 struct spdk_fs_request *req = arg; 993 struct spdk_fs_cb_args *args = &req->args; 994 995 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 996 args->fn.stat_op, req); 997 } 998 999 int 1000 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1001 const char *name, struct spdk_file_stat *stat) 1002 { 1003 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1004 struct spdk_fs_request *req; 1005 int rc; 1006 1007 req = alloc_fs_request(channel); 1008 if (req == NULL) { 1009 SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name); 1010 return -ENOMEM; 1011 } 1012 1013 req->args.fs = fs; 1014 req->args.op.stat.name = name; 1015 req->args.fn.stat_op = __copy_stat; 1016 req->args.arg = stat; 1017 req->args.sem = &channel->sem; 1018 channel->send_request(__file_stat, req); 1019 sem_wait(&channel->sem); 1020 1021 rc = req->args.rc; 1022 free_fs_request(req); 1023 1024 return rc; 1025 } 1026 1027 static void 1028 fs_create_blob_close_cb(void *ctx, int bserrno) 1029 { 1030 int rc; 1031 struct spdk_fs_request *req = ctx; 1032 struct spdk_fs_cb_args *args = &req->args; 1033 1034 rc = args->rc ? args->rc : bserrno; 1035 args->fn.file_op(args->arg, rc); 1036 free_fs_request(req); 1037 } 1038 1039 static void 1040 fs_create_blob_resize_cb(void *ctx, int bserrno) 1041 { 1042 struct spdk_fs_request *req = ctx; 1043 struct spdk_fs_cb_args *args = &req->args; 1044 struct spdk_file *f = args->file; 1045 struct spdk_blob *blob = args->op.create.blob; 1046 uint64_t length = 0; 1047 1048 args->rc = bserrno; 1049 if (bserrno) { 1050 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1051 return; 1052 } 1053 1054 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 1055 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 1056 1057 spdk_blob_close(blob, fs_create_blob_close_cb, args); 1058 } 1059 1060 static void 1061 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1062 { 1063 struct spdk_fs_request *req = ctx; 1064 struct spdk_fs_cb_args *args = &req->args; 1065 1066 if (bserrno) { 1067 args->fn.file_op(args->arg, bserrno); 1068 free_fs_request(req); 1069 return; 1070 } 1071 1072 args->op.create.blob = blob; 1073 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 1074 } 1075 1076 static void 1077 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 1078 { 1079 struct spdk_fs_request *req = ctx; 1080 struct spdk_fs_cb_args *args = &req->args; 1081 struct spdk_file *f = args->file; 1082 1083 if (bserrno) { 1084 args->fn.file_op(args->arg, bserrno); 1085 free_fs_request(req); 1086 return; 1087 } 1088 1089 f->blobid = blobid; 1090 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 1091 } 1092 1093 void 1094 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 1095 spdk_file_op_complete cb_fn, void *cb_arg) 1096 { 1097 struct spdk_file *file; 1098 struct spdk_fs_request *req; 1099 struct spdk_fs_cb_args *args; 1100 1101 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) 
{ 1102 cb_fn(cb_arg, -ENAMETOOLONG); 1103 return; 1104 } 1105 1106 file = fs_find_file(fs, name); 1107 if (file != NULL) { 1108 cb_fn(cb_arg, -EEXIST); 1109 return; 1110 } 1111 1112 file = file_alloc(fs); 1113 if (file == NULL) { 1114 SPDK_ERRLOG("Cannot allocate new file for creation\n"); 1115 cb_fn(cb_arg, -ENOMEM); 1116 return; 1117 } 1118 1119 req = alloc_fs_request(fs->md_target.md_fs_channel); 1120 if (req == NULL) { 1121 SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name); 1122 cb_fn(cb_arg, -ENOMEM); 1123 return; 1124 } 1125 1126 args = &req->args; 1127 args->file = file; 1128 args->fn.file_op = cb_fn; 1129 args->arg = cb_arg; 1130 1131 file->name = strdup(name); 1132 _file_build_trace_arg_name(file); 1133 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 1134 } 1135 1136 static void 1137 __fs_create_file_done(void *arg, int fserrno) 1138 { 1139 struct spdk_fs_request *req = arg; 1140 struct spdk_fs_cb_args *args = &req->args; 1141 1142 __wake_caller(args, fserrno); 1143 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1144 } 1145 1146 static void 1147 __fs_create_file(void *arg) 1148 { 1149 struct spdk_fs_request *req = arg; 1150 struct spdk_fs_cb_args *args = &req->args; 1151 1152 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1153 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1154 } 1155 1156 int 1157 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1158 { 1159 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1160 struct spdk_fs_request *req; 1161 struct spdk_fs_cb_args *args; 1162 int rc; 1163 1164 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1165 1166 req = alloc_fs_request(channel); 1167 if (req == NULL) { 1168 SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name); 1169 return -ENOMEM; 1170 } 1171 1172 args = &req->args; 1173 args->fs = fs; 1174 args->op.create.name = name; 1175 args->sem = &channel->sem; 1176 fs->send_request(__fs_create_file, req); 1177 sem_wait(&channel->sem); 1178 rc = args->rc; 1179 free_fs_request(req); 1180 1181 return rc; 1182 } 1183 1184 static void 1185 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1186 { 1187 struct spdk_fs_request *req = ctx; 1188 struct spdk_fs_cb_args *args = &req->args; 1189 struct spdk_file *f = args->file; 1190 1191 f->blob = blob; 1192 while (!TAILQ_EMPTY(&f->open_requests)) { 1193 req = TAILQ_FIRST(&f->open_requests); 1194 args = &req->args; 1195 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1196 spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name); 1197 args->fn.file_op_with_handle(args->arg, f, bserrno); 1198 free_fs_request(req); 1199 } 1200 } 1201 1202 static void 1203 fs_open_blob_create_cb(void *ctx, int bserrno) 1204 { 1205 struct spdk_fs_request *req = ctx; 1206 struct spdk_fs_cb_args *args = &req->args; 1207 struct spdk_file *file = args->file; 1208 struct spdk_filesystem *fs = args->fs; 1209 1210 if (file == NULL) { 1211 /* 1212 * This is from an open with CREATE flag - the file 1213 * is now created so look it up in the file list for this 1214 * filesystem. 
1215 */ 1216 file = fs_find_file(fs, args->op.open.name); 1217 assert(file != NULL); 1218 args->file = file; 1219 } 1220 1221 file->ref_count++; 1222 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1223 if (file->ref_count == 1) { 1224 assert(file->blob == NULL); 1225 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1226 } else if (file->blob != NULL) { 1227 fs_open_blob_done(req, file->blob, 0); 1228 } else { 1229 /* 1230 * The blob open for this file is in progress due to a previous 1231 * open request. When that open completes, it will invoke the 1232 * open callback for this request. 1233 */ 1234 } 1235 } 1236 1237 void 1238 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1239 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1240 { 1241 struct spdk_file *f = NULL; 1242 struct spdk_fs_request *req; 1243 struct spdk_fs_cb_args *args; 1244 1245 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1246 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1247 return; 1248 } 1249 1250 f = fs_find_file(fs, name); 1251 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1252 cb_fn(cb_arg, NULL, -ENOENT); 1253 return; 1254 } 1255 1256 if (f != NULL && f->is_deleted == true) { 1257 cb_fn(cb_arg, NULL, -ENOENT); 1258 return; 1259 } 1260 1261 req = alloc_fs_request(fs->md_target.md_fs_channel); 1262 if (req == NULL) { 1263 SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name); 1264 cb_fn(cb_arg, NULL, -ENOMEM); 1265 return; 1266 } 1267 1268 args = &req->args; 1269 args->fn.file_op_with_handle = cb_fn; 1270 args->arg = cb_arg; 1271 args->file = f; 1272 args->fs = fs; 1273 args->op.open.name = name; 1274 1275 if (f == NULL) { 1276 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1277 } else { 1278 fs_open_blob_create_cb(req, 0); 1279 } 1280 } 1281 1282 static void 1283 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1284 { 1285 struct spdk_fs_request *req = arg; 1286 struct spdk_fs_cb_args *args = &req->args; 1287 1288 args->file = file; 1289 __wake_caller(args, bserrno); 1290 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1291 } 1292 1293 static void 1294 __fs_open_file(void *arg) 1295 { 1296 struct spdk_fs_request *req = arg; 1297 struct spdk_fs_cb_args *args = &req->args; 1298 1299 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1300 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1301 __fs_open_file_done, req); 1302 } 1303 1304 int 1305 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1306 const char *name, uint32_t flags, struct spdk_file **file) 1307 { 1308 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1309 struct spdk_fs_request *req; 1310 struct spdk_fs_cb_args *args; 1311 int rc; 1312 1313 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1314 1315 req = alloc_fs_request(channel); 1316 if (req == NULL) { 1317 SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name); 1318 return -ENOMEM; 1319 } 1320 1321 args = &req->args; 1322 args->fs = fs; 1323 args->op.open.name = name; 1324 args->op.open.flags = flags; 1325 args->sem = &channel->sem; 1326 fs->send_request(__fs_open_file, req); 1327 sem_wait(&channel->sem); 1328 rc = args->rc; 1329 if (rc == 0) { 1330 *file = args->file; 1331 } else { 1332 *file = NULL; 1333 } 1334 free_fs_request(req); 1335 1336 return rc; 1337 } 1338 1339 static void 1340 fs_rename_blob_close_cb(void *ctx, int bserrno) 1341 
{ 1342 struct spdk_fs_request *req = ctx; 1343 struct spdk_fs_cb_args *args = &req->args; 1344 1345 args->fn.fs_op(args->arg, bserrno); 1346 free_fs_request(req); 1347 } 1348 1349 static void 1350 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1351 { 1352 struct spdk_fs_request *req = ctx; 1353 struct spdk_fs_cb_args *args = &req->args; 1354 const char *new_name = args->op.rename.new_name; 1355 1356 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1357 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1358 } 1359 1360 static void 1361 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1362 { 1363 struct spdk_fs_cb_args *args = &req->args; 1364 struct spdk_file *f; 1365 1366 f = fs_find_file(args->fs, args->op.rename.old_name); 1367 if (f == NULL) { 1368 args->fn.fs_op(args->arg, -ENOENT); 1369 free_fs_request(req); 1370 return; 1371 } 1372 1373 free(f->name); 1374 f->name = strdup(args->op.rename.new_name); 1375 _file_build_trace_arg_name(f); 1376 args->file = f; 1377 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1378 } 1379 1380 static void 1381 fs_rename_delete_done(void *arg, int fserrno) 1382 { 1383 __spdk_fs_md_rename_file(arg); 1384 } 1385 1386 void 1387 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1388 const char *old_name, const char *new_name, 1389 spdk_file_op_complete cb_fn, void *cb_arg) 1390 { 1391 struct spdk_file *f; 1392 struct spdk_fs_request *req; 1393 struct spdk_fs_cb_args *args; 1394 1395 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1396 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1397 cb_fn(cb_arg, -ENAMETOOLONG); 1398 return; 1399 } 1400 1401 req = alloc_fs_request(fs->md_target.md_fs_channel); 1402 if (req == NULL) { 1403 SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name, 1404 new_name); 1405 cb_fn(cb_arg, -ENOMEM); 1406 return; 1407 } 1408 1409 args = &req->args; 1410 args->fn.fs_op = cb_fn; 1411 args->fs = fs; 1412 args->arg = cb_arg; 1413 args->op.rename.old_name = old_name; 1414 args->op.rename.new_name = new_name; 1415 1416 f = fs_find_file(fs, new_name); 1417 if (f == NULL) { 1418 __spdk_fs_md_rename_file(req); 1419 return; 1420 } 1421 1422 /* 1423 * The rename overwrites an existing file. So delete the existing file, then 1424 * do the actual rename. 
1425 */ 1426 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1427 } 1428 1429 static void 1430 __fs_rename_file_done(void *arg, int fserrno) 1431 { 1432 struct spdk_fs_request *req = arg; 1433 struct spdk_fs_cb_args *args = &req->args; 1434 1435 __wake_caller(args, fserrno); 1436 } 1437 1438 static void 1439 __fs_rename_file(void *arg) 1440 { 1441 struct spdk_fs_request *req = arg; 1442 struct spdk_fs_cb_args *args = &req->args; 1443 1444 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1445 __fs_rename_file_done, req); 1446 } 1447 1448 int 1449 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1450 const char *old_name, const char *new_name) 1451 { 1452 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1453 struct spdk_fs_request *req; 1454 struct spdk_fs_cb_args *args; 1455 int rc; 1456 1457 req = alloc_fs_request(channel); 1458 if (req == NULL) { 1459 SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name); 1460 return -ENOMEM; 1461 } 1462 1463 args = &req->args; 1464 1465 args->fs = fs; 1466 args->op.rename.old_name = old_name; 1467 args->op.rename.new_name = new_name; 1468 args->sem = &channel->sem; 1469 fs->send_request(__fs_rename_file, req); 1470 sem_wait(&channel->sem); 1471 rc = args->rc; 1472 free_fs_request(req); 1473 return rc; 1474 } 1475 1476 static void 1477 blob_delete_cb(void *ctx, int bserrno) 1478 { 1479 struct spdk_fs_request *req = ctx; 1480 struct spdk_fs_cb_args *args = &req->args; 1481 1482 args->fn.file_op(args->arg, bserrno); 1483 free_fs_request(req); 1484 } 1485 1486 void 1487 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1488 spdk_file_op_complete cb_fn, void *cb_arg) 1489 { 1490 struct spdk_file *f; 1491 spdk_blob_id blobid; 1492 struct spdk_fs_request *req; 1493 struct spdk_fs_cb_args *args; 1494 1495 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1496 1497 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1498 cb_fn(cb_arg, -ENAMETOOLONG); 1499 return; 1500 } 1501 1502 f = fs_find_file(fs, name); 1503 if (f == NULL) { 1504 SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name); 1505 cb_fn(cb_arg, -ENOENT); 1506 return; 1507 } 1508 1509 req = alloc_fs_request(fs->md_target.md_fs_channel); 1510 if (req == NULL) { 1511 SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name); 1512 cb_fn(cb_arg, -ENOMEM); 1513 return; 1514 } 1515 1516 args = &req->args; 1517 args->fn.file_op = cb_fn; 1518 args->arg = cb_arg; 1519 1520 if (f->ref_count > 0) { 1521 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1522 f->is_deleted = true; 1523 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1524 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1525 return; 1526 } 1527 1528 TAILQ_REMOVE(&fs->files, f, tailq); 1529 1530 /* It's safe to free cache buffers here while another thread 1531 * is trying to free the same file cache buffers, because each 1532 * thread will get the g_caches_lock first. 
1533 */ 1534 cache_free_buffers(f); 1535 1536 blobid = f->blobid; 1537 1538 free(f->name); 1539 free(f->tree); 1540 free(f); 1541 1542 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1543 } 1544 1545 static uint64_t 1546 fs_name_to_uint64(const char *name) 1547 { 1548 uint64_t result = 0; 1549 memcpy(&result, name, spdk_min(sizeof(result), strlen(name))); 1550 return result; 1551 } 1552 1553 static void 1554 __fs_delete_file_done(void *arg, int fserrno) 1555 { 1556 struct spdk_fs_request *req = arg; 1557 struct spdk_fs_cb_args *args = &req->args; 1558 1559 spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, fs_name_to_uint64(args->op.delete.name)); 1560 __wake_caller(args, fserrno); 1561 } 1562 1563 static void 1564 __fs_delete_file(void *arg) 1565 { 1566 struct spdk_fs_request *req = arg; 1567 struct spdk_fs_cb_args *args = &req->args; 1568 1569 spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, fs_name_to_uint64(args->op.delete.name)); 1570 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1571 } 1572 1573 int 1574 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1575 const char *name) 1576 { 1577 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1578 struct spdk_fs_request *req; 1579 struct spdk_fs_cb_args *args; 1580 int rc; 1581 1582 req = alloc_fs_request(channel); 1583 if (req == NULL) { 1584 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Cannot allocate req to delete file=%s\n", name); 1585 return -ENOMEM; 1586 } 1587 1588 args = &req->args; 1589 args->fs = fs; 1590 args->op.delete.name = name; 1591 args->sem = &channel->sem; 1592 fs->send_request(__fs_delete_file, req); 1593 sem_wait(&channel->sem); 1594 rc = args->rc; 1595 free_fs_request(req); 1596 1597 return rc; 1598 } 1599 1600 spdk_fs_iter 1601 spdk_fs_iter_first(struct spdk_filesystem *fs) 1602 { 1603 struct spdk_file *f; 1604 1605 f = TAILQ_FIRST(&fs->files); 1606 return f; 1607 } 1608 1609 spdk_fs_iter 1610 spdk_fs_iter_next(spdk_fs_iter iter) 1611 { 1612 struct spdk_file *f = iter; 1613 1614 if (f == NULL) { 1615 return NULL; 1616 } 1617 1618 f = TAILQ_NEXT(f, tailq); 1619 return f; 1620 } 1621 1622 const char * 1623 spdk_file_get_name(struct spdk_file *file) 1624 { 1625 return file->name; 1626 } 1627 1628 uint64_t 1629 spdk_file_get_length(struct spdk_file *file) 1630 { 1631 uint64_t length; 1632 1633 assert(file != NULL); 1634 1635 length = file->append_pos >= file->length ? 
file->append_pos : file->length; 1636 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length); 1637 return length; 1638 } 1639 1640 static void 1641 fs_truncate_complete_cb(void *ctx, int bserrno) 1642 { 1643 struct spdk_fs_request *req = ctx; 1644 struct spdk_fs_cb_args *args = &req->args; 1645 1646 args->fn.file_op(args->arg, bserrno); 1647 free_fs_request(req); 1648 } 1649 1650 static void 1651 fs_truncate_resize_cb(void *ctx, int bserrno) 1652 { 1653 struct spdk_fs_request *req = ctx; 1654 struct spdk_fs_cb_args *args = &req->args; 1655 struct spdk_file *file = args->file; 1656 uint64_t *length = &args->op.truncate.length; 1657 1658 if (bserrno) { 1659 args->fn.file_op(args->arg, bserrno); 1660 free_fs_request(req); 1661 return; 1662 } 1663 1664 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1665 1666 file->length = *length; 1667 if (file->append_pos > file->length) { 1668 file->append_pos = file->length; 1669 } 1670 1671 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req); 1672 } 1673 1674 static uint64_t 1675 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1676 { 1677 return (length + cluster_sz - 1) / cluster_sz; 1678 } 1679 1680 void 1681 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1682 spdk_file_op_complete cb_fn, void *cb_arg) 1683 { 1684 struct spdk_filesystem *fs; 1685 size_t num_clusters; 1686 struct spdk_fs_request *req; 1687 struct spdk_fs_cb_args *args; 1688 1689 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1690 if (length == file->length) { 1691 cb_fn(cb_arg, 0); 1692 return; 1693 } 1694 1695 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1696 if (req == NULL) { 1697 cb_fn(cb_arg, -ENOMEM); 1698 return; 1699 } 1700 1701 args = &req->args; 1702 args->fn.file_op = cb_fn; 1703 args->arg = cb_arg; 1704 args->file = file; 1705 args->op.truncate.length = length; 1706 fs = file->fs; 1707 1708 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1709 1710 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1711 } 1712 1713 static void 1714 __truncate(void *arg) 1715 { 1716 struct spdk_fs_request *req = arg; 1717 struct spdk_fs_cb_args *args = &req->args; 1718 1719 spdk_file_truncate_async(args->file, args->op.truncate.length, 1720 args->fn.file_op, args); 1721 } 1722 1723 int 1724 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 1725 uint64_t length) 1726 { 1727 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1728 struct spdk_fs_request *req; 1729 struct spdk_fs_cb_args *args; 1730 int rc; 1731 1732 req = alloc_fs_request(channel); 1733 if (req == NULL) { 1734 return -ENOMEM; 1735 } 1736 1737 args = &req->args; 1738 1739 args->file = file; 1740 args->op.truncate.length = length; 1741 args->fn.file_op = __wake_caller; 1742 args->sem = &channel->sem; 1743 1744 channel->send_request(__truncate, req); 1745 sem_wait(&channel->sem); 1746 rc = args->rc; 1747 free_fs_request(req); 1748 1749 return rc; 1750 } 1751 1752 static void 1753 __rw_done(void *ctx, int bserrno) 1754 { 1755 struct spdk_fs_request *req = ctx; 1756 struct spdk_fs_cb_args *args = &req->args; 1757 1758 spdk_free(args->op.rw.pin_buf); 1759 args->fn.file_op(args->arg, bserrno); 1760 free_fs_request(req); 1761 } 1762 1763 static void 1764 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 1765 { 1766 int i; 1767 size_t len; 1768 1769 for (i = 0; i < iovcnt; i++) { 1770 len = 
spdk_min(iovs[i].iov_len, buf_len); 1771 memcpy(buf, iovs[i].iov_base, len); 1772 buf += len; 1773 assert(buf_len >= len); 1774 buf_len -= len; 1775 } 1776 } 1777 1778 static void 1779 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len) 1780 { 1781 int i; 1782 size_t len; 1783 1784 for (i = 0; i < iovcnt; i++) { 1785 len = spdk_min(iovs[i].iov_len, buf_len); 1786 memcpy(iovs[i].iov_base, buf, len); 1787 buf += len; 1788 assert(buf_len >= len); 1789 buf_len -= len; 1790 } 1791 } 1792 1793 static void 1794 __read_done(void *ctx, int bserrno) 1795 { 1796 struct spdk_fs_request *req = ctx; 1797 struct spdk_fs_cb_args *args = &req->args; 1798 void *buf; 1799 1800 assert(req != NULL); 1801 buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1))); 1802 if (args->op.rw.is_read) { 1803 _copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length); 1804 __rw_done(req, 0); 1805 } else { 1806 _copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt); 1807 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1808 args->op.rw.pin_buf, 1809 args->op.rw.start_lba, args->op.rw.num_lba, 1810 __rw_done, req); 1811 } 1812 } 1813 1814 static void 1815 __do_blob_read(void *ctx, int fserrno) 1816 { 1817 struct spdk_fs_request *req = ctx; 1818 struct spdk_fs_cb_args *args = &req->args; 1819 1820 if (fserrno) { 1821 __rw_done(req, fserrno); 1822 return; 1823 } 1824 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1825 args->op.rw.pin_buf, 1826 args->op.rw.start_lba, args->op.rw.num_lba, 1827 __read_done, req); 1828 } 1829 1830 static void 1831 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1832 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1833 { 1834 uint64_t end_lba; 1835 1836 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1837 *start_lba = offset / *lba_size; 1838 end_lba = (offset + length - 1) / *lba_size; 1839 *num_lba = (end_lba - *start_lba + 1); 1840 } 1841 1842 static bool 1843 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length) 1844 { 1845 uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1846 1847 if ((offset % lba_size == 0) && (length % lba_size == 0)) { 1848 return true; 1849 } 1850 1851 return false; 1852 } 1853 1854 static void 1855 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt) 1856 { 1857 uint32_t i; 1858 1859 for (i = 0; i < iovcnt; i++) { 1860 req->args.iovs[i].iov_base = iovs[i].iov_base; 1861 req->args.iovs[i].iov_len = iovs[i].iov_len; 1862 } 1863 } 1864 1865 static void 1866 __readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel, 1867 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1868 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1869 { 1870 struct spdk_fs_request *req; 1871 struct spdk_fs_cb_args *args; 1872 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1873 uint64_t start_lba, num_lba, pin_buf_length; 1874 uint32_t lba_size; 1875 1876 if (is_read && offset + length > file->length) { 1877 cb_fn(cb_arg, -EINVAL); 1878 return; 1879 } 1880 1881 req = alloc_fs_request_with_iov(channel, iovcnt); 1882 if (req == NULL) { 1883 cb_fn(cb_arg, -ENOMEM); 1884 return; 1885 } 1886 1887 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1888 1889 args = &req->args; 1890 args->fn.file_op = cb_fn; 1891 args->arg = cb_arg; 1892 args->file = file; 1893 args->op.rw.channel = 
channel->bs_channel; 1894 _fs_request_setup_iovs(req, iovs, iovcnt); 1895 args->op.rw.is_read = is_read; 1896 args->op.rw.offset = offset; 1897 args->op.rw.blocklen = lba_size; 1898 1899 pin_buf_length = num_lba * lba_size; 1900 args->op.rw.length = pin_buf_length; 1901 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL, 1902 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1903 if (args->op.rw.pin_buf == NULL) { 1904 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1905 file->name, offset, length); 1906 free_fs_request(req); 1907 cb_fn(cb_arg, -ENOMEM); 1908 return; 1909 } 1910 1911 args->op.rw.start_lba = start_lba; 1912 args->op.rw.num_lba = num_lba; 1913 1914 if (!is_read && file->length < offset + length) { 1915 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1916 } else if (!is_read && __is_lba_aligned(file, offset, length)) { 1917 _copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt); 1918 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1919 args->op.rw.pin_buf, 1920 args->op.rw.start_lba, args->op.rw.num_lba, 1921 __rw_done, req); 1922 } else { 1923 __do_blob_read(req, 0); 1924 } 1925 } 1926 1927 static void 1928 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel, 1929 void *payload, uint64_t offset, uint64_t length, 1930 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1931 { 1932 struct iovec iov; 1933 1934 iov.iov_base = payload; 1935 iov.iov_len = (size_t)length; 1936 1937 __readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read); 1938 } 1939 1940 void 1941 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1942 void *payload, uint64_t offset, uint64_t length, 1943 spdk_file_op_complete cb_fn, void *cb_arg) 1944 { 1945 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1946 } 1947 1948 void 1949 spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel, 1950 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1951 spdk_file_op_complete cb_fn, void *cb_arg) 1952 { 1953 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1954 file->name, offset, length); 1955 1956 __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0); 1957 } 1958 1959 void 1960 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1961 void *payload, uint64_t offset, uint64_t length, 1962 spdk_file_op_complete cb_fn, void *cb_arg) 1963 { 1964 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1965 file->name, offset, length); 1966 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1967 } 1968 1969 void 1970 spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel, 1971 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1972 spdk_file_op_complete cb_fn, void *cb_arg) 1973 { 1974 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1975 file->name, offset, length); 1976 1977 __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1); 1978 } 1979 1980 struct spdk_io_channel * 1981 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1982 { 1983 struct spdk_io_channel *io_channel; 1984 struct spdk_fs_channel *fs_channel; 1985 1986 io_channel = spdk_get_io_channel(&fs->io_target); 1987 fs_channel = spdk_io_channel_get_ctx(io_channel); 1988 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1989 
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

struct spdk_fs_thread_ctx *
spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs)
{
	struct spdk_fs_thread_ctx *ctx;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return NULL;
	}

	_spdk_fs_channel_create(fs, &ctx->ch, 512);

	ctx->ch.send_request = fs->send_request;
	ctx->ch.sync = 1;
	pthread_spin_init(&ctx->ch.lock, 0);

	return ctx;
}

void
spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx)
{
	assert(ctx->ch.sync == 1);

	while (true) {
		pthread_spin_lock(&ctx->ch.lock);
		if (ctx->ch.outstanding_reqs == 0) {
			pthread_spin_unlock(&ctx->ch.lock);
			break;
		}
		pthread_spin_unlock(&ctx->ch.lock);
		usleep(1000);
	}

	_spdk_fs_channel_destroy(NULL, &ctx->ch);
	free(ctx);
}

int
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	/* Setting g_fs_cache_size is only permitted if the cache pool
	 * has already been freed or has not been initialized yet.
	 */
	if (g_cache_pool != NULL) {
		return -EPERM;
	}

	g_fs_cache_size = size_in_mb * 1024 * 1024;

	return 0;
}

uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *ctx);

/* Try to free some of this file's cache buffers; this function must
 * be called while holding g_caches_lock.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* This function is safe to call from any thread. The file lock may be
	 * held by another thread at the moment, so only try to take the file
	 * lock here.
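	 * (If the trylock fails, skip this file; the reclaim poller will try again on its next pass.)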
static void __file_flush(void *ctx);

/* Try to free some cache buffers of this file; this function must
 * be called while holding g_caches_lock.
 */
static int
reclaim_cache_buffers(struct spdk_file *file)
{
	int rc;

	BLOBFS_TRACE(file, "free=%s\n", file->name);

	/* This function may run on any thread, and the file lock may already
	 * be held by another thread, so only try-lock the file lock here.
	 */
	rc = pthread_spin_trylock(&file->lock);
	if (rc != 0) {
		return -1;
	}

	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		return -1;
	}
	spdk_tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not fully freed, put the file back at the end of the queue */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	} else {
		file->last = NULL;
	}
	pthread_spin_unlock(&file->lock);

	return 0;
}

/* Poller callback: when the cache pool needs reclaiming, free buffers in three
 * passes - low-priority read-only files first, then any read-only file, then
 * any file at all - stopping as soon as enough space has been recovered.
 */
static int
_blobfs_cache_pool_reclaim(void *arg)
{
	struct spdk_file *file, *tmp;
	int rc;

	if (!blobfs_cache_pool_need_reclaim()) {
		return 0;
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				pthread_spin_unlock(&g_caches_lock);
				return 1;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		if (!file->open_for_writing) {
			rc = reclaim_cache_buffers(file);
			if (rc < 0) {
				continue;
			}
			if (!blobfs_cache_pool_need_reclaim()) {
				pthread_spin_unlock(&g_caches_lock);
				return 1;
			}
			break;
		}
	}

	TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) {
		rc = reclaim_cache_buffers(file);
		if (rc < 0) {
			continue;
		}
		break;
	}
	pthread_spin_unlock(&g_caches_lock);

	return 1;
}

static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
		return NULL;
	}

	do {
		buf->buf = spdk_mempool_get(g_cache_pool);
		if (buf->buf) {
			break;
		}
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
	} while (true);

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	pthread_spin_lock(&g_caches_lock);
	if (file->tree->present_mask == 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	}
	file->tree = spdk_tree_insert_buffer(file->tree, buf);
	pthread_spin_unlock(&g_caches_lock);

	return buf;
}

static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);
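
/*
 * Sync path overview (descriptive only): a sync queues a request on
 * file->sync_requests with op.sync.offset set to the current append position
 * and kicks __file_flush(). Once __check_sync_reqs() sees that length_flushed
 * has caught up with a queued request, it persists length_flushed as the
 * blob's "length" xattr and syncs the blob metadata; __file_cache_finish_sync()
 * below then completes that request and re-checks for further pending syncs.
 */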
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file;
	struct spdk_fs_request *sync_req = ctx;
	struct spdk_fs_cb_args *sync_args;

	sync_args = &sync_req->args;
	file = sync_args->file;
	pthread_spin_lock(&file->lock);
	file->length_xattr = sync_args->op.sync.length;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset,
			  0, file->trace_arg_name);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	sync_args->fn.file_op(sync_args->arg, bserrno);
	pthread_spin_lock(&file->lock);
	free_fs_request(sync_req);
	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);
}

static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		sync_req->args.op.sync.length = file->length_flushed;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed,
				  0, file->trace_arg_name);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

static void
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 * blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	__file_flush(req);
}
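
/*
 * __file_flush() below writes the next flushable cache buffer, starting at
 * length_flushed, and __file_flush_done() re-invokes it so the chain keeps
 * going until nothing more can be flushed; each completion also calls
 * __check_sync_reqs() so pending sync requests can update the metadata.
 */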
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 * progress, or the next buffer is partially filled but there's no
		 * outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 * more data after that is completed, or a partial buffer will get flushed
		 * when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 * file was later appended, we will write the data directly
			 * to disk and bypass cache. So just update length_flushed
			 * here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 * outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 * outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}

static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

static void
__rw_from_file_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;

	__wake_caller(&req->args, bserrno);
	free_fs_request(req);
}

static void
__rw_from_file(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				     __rw_from_file_done, req);
	} else {
		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				      args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				      __rw_from_file_done, req);
	}
}
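
/*
 * __send_rw_from_file() below implements the uncached, blocking I/O path: it
 * packages the caller's buffer into a single-iov request, forwards it to the
 * SPDK thread via send_request (which dispatches __rw_from_file above), and
 * the caller then waits on the channel semaphore that __rw_from_file_done()
 * posts through __wake_caller().
 */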
static int
__send_rw_from_file(struct spdk_file *file, void *payload,
		    uint64_t offset, uint64_t length, bool is_read,
		    struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request_with_iov(channel, 1);
	if (req == NULL) {
		sem_post(&channel->sem);
		return -ENOMEM;
	}

	args = &req->args;
	args->file = file;
	args->sem = &channel->sem;
	args->iovs[0].iov_base = payload;
	args->iovs[0].iov_len = (size_t)length;
	args->op.rw.offset = offset;
	args->op.rw.is_read = is_read;
	file->fs->send_request(__rw_from_file, req);
	return 0;
}
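
/*
 * Illustrative blocking append sketch for spdk_file_write() below. Writes must
 * be strict appends: "offset" has to equal the file's current append position,
 * otherwise -EINVAL is returned. "file", "ctx" and "append_offset" are
 * hypothetical names the caller tracks itself.
 *
 *	uint8_t buf[4096];
 *	memset(buf, 0xa5, sizeof(buf));
 *	int rc = spdk_file_write(file, ctx, buf, append_offset, sizeof(buf));
 *	if (rc != 0) {
 *		SPDK_ERRLOG("append failed: %d\n", rc);
 *	}
 */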
int
spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *flush_req;
	uint64_t rem_length, copy, blob_size, cluster_sz;
	uint32_t cache_buffers_filled = 0;
	uint8_t *cur_payload;
	struct cache_buffer *last;

	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);

	if (length == 0) {
		return 0;
	}

	if (offset != file->append_pos) {
		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
		return -EINVAL;
	}

	pthread_spin_lock(&file->lock);
	file->open_for_writing = true;

	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
		cache_append_buffer(file);
	}

	if (file->last == NULL) {
		int rc;

		file->append_pos += length;
		pthread_spin_unlock(&file->lock);
		rc = __send_rw_from_file(file, payload, offset, length, false, channel);
		sem_wait(&channel->sem);
		return rc;
	}

	blob_size = __file_get_blob_size(file);

	if ((offset + length) > blob_size) {
		struct spdk_fs_cb_args extend_args = {};

		cluster_sz = file->fs->bs_opts.cluster_sz;
		extend_args.sem = &channel->sem;
		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
		extend_args.file = file;
		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
		pthread_spin_unlock(&file->lock);
		file->fs->send_request(__file_extend_blob, &extend_args);
		sem_wait(&channel->sem);
		if (extend_args.rc) {
			return extend_args.rc;
		}
		/* Re-take the file lock that was dropped before sending the
		 * resize request; the cache-append loop below expects it held.
		 */
		pthread_spin_lock(&file->lock);
	}

	flush_req = alloc_fs_request(channel);
	if (flush_req == NULL) {
		pthread_spin_unlock(&file->lock);
		return -ENOMEM;
	}

	last = file->last;
	rem_length = length;
	cur_payload = payload;
	while (rem_length > 0) {
		copy = last->buf_size - last->bytes_filled;
		if (copy > rem_length) {
			copy = rem_length;
		}
		BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy);
		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
		file->append_pos += copy;
		if (file->length < file->append_pos) {
			file->length = file->append_pos;
		}
		cur_payload += copy;
		last->bytes_filled += copy;
		rem_length -= copy;
		if (last->bytes_filled == last->buf_size) {
			cache_buffers_filled++;
			last = cache_append_buffer(file);
			if (last == NULL) {
				BLOBFS_TRACE(file, "nomem\n");
				free_fs_request(flush_req);
				pthread_spin_unlock(&file->lock);
				return -ENOMEM;
			}
		}
	}

	pthread_spin_unlock(&file->lock);

	if (cache_buffers_filled == 0) {
		free_fs_request(flush_req);
		return 0;
	}

	flush_req->args.file = file;
	file->fs->send_request(__file_flush, flush_req);
	return 0;
}

static void
__readahead_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	free_fs_request(req);
}

static void
__readahead(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	offset = args->op.readahead.offset;
	length = args->op.readahead.length;
	assert(length > 0);

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			  args->op.readahead.cache_buffer->buf,
			  start_lba, num_lba, __readahead_done, req);
}

static uint64_t
__next_cache_buffer_offset(uint64_t offset)
{
	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
}

static void
check_readahead(struct spdk_file *file, uint64_t offset,
		struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return;
	}
	args = &req->args;

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free_fs_request(req);
		return;
	}

	args->op.readahead.cache_buffer->in_progress = true;
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, req);
}
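
/*
 * Readahead heuristic used by spdk_file_read() below: consecutive reads grow
 * seq_byte_count, and once it reaches CACHE_READAHEAD_THRESHOLD the next one
 * or two cache-buffer-sized regions are prefetched into the cache tree via
 * check_readahead(), so a sequential reader mostly hits memcpy from cache.
 */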
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	int rc = 0;

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	final_length = 0;
	final_offset = offset + length;
	while (offset < final_offset) {
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = spdk_tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			pthread_spin_unlock(&file->lock);
			rc = __send_rw_from_file(file, payload, offset, length, true, channel);
			pthread_spin_lock(&file->lock);
			if (rc == 0) {
				sub_reads++;
			}
		} else {
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				pthread_spin_lock(&g_caches_lock);
				spdk_tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					TAILQ_REMOVE(&g_caches, file, cache_tailq);
				}
				pthread_spin_unlock(&g_caches_lock);
			}
		}

		if (rc == 0) {
			final_length += length;
		} else {
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (rc == 0) {
		return final_length;
	} else {
		return rc;
	}
}

static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_xattr) {
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}
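
/*
 * Illustrative durability sketch: spdk_file_sync() below blocks the caller on
 * the channel semaphore until everything written up to the current append
 * position has been flushed to the blob and the "length" xattr has been
 * synced; spdk_file_sync_async() is the non-blocking variant for SPDK threads.
 * "file" and "ctx" are hypothetical caller-provided handles.
 *
 *	int rc = spdk_file_sync(file, ctx);
 *	if (rc != 0) {
 *		SPDK_ERRLOG("sync failed: %d\n", rc);
 *	}
 */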
int
spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_cb_args args = {};

	args.sem = &channel->sem;
	_file_sync(file, channel, __wake_caller, &args);
	sem_wait(&channel->sem);

	return args.rc;
}

void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}

void
spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
{
	BLOBFS_TRACE(file, "priority=%u\n", priority);
	file->priority = priority;
}

/*
 * Close routines
 */

static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name);

	if (file->is_deleted) {
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}

static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}

void
spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
}

static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}
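
/*
 * spdk_file_close() below first syncs the file, then hands __file_close to the
 * channel's SPDK thread and waits on the channel semaphore. The underlying
 * blob is only closed once ref_count drops to zero, and a file that was marked
 * is_deleted while still open is deleted right after its blob is closed (see
 * __file_close_async_done above).
 */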
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}

int
spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
{
	if (size < sizeof(spdk_blob_id)) {
		return -EINVAL;
	}

	memcpy(id, &file->blobid, sizeof(spdk_blob_id));

	return sizeof(spdk_blob_id);
}

static void
cache_free_buffers(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	pthread_spin_lock(&g_caches_lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&g_caches_lock);
		pthread_spin_unlock(&file->lock);
		return;
	}
	spdk_tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	assert(file->tree->present_mask == 0);
	file->last = NULL;
	pthread_spin_unlock(&g_caches_lock);
	pthread_spin_unlock(&file->lock);
}

SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS)
SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)