/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "tree.h"

#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/log.h"
#include "spdk/trace.h"

/* Debug-log helpers that prefix every message with the file's name. */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(blobfs, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(blobfs_rw, "file=%s " str, file->name, ##args)

/* Default size of the global cache buffer pool: 4 GiB. */
#define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024)

/* bstype signature stamped into the blobstore so blobfs volumes can be
 * recognized at load time (see load_cb).
 */
#define SPDK_BLOBFS_SIGNATURE "BLOBFS"

/* Global cache-pool state, shared by every loaded blobfs instance.
 * The pool is created lazily on the first fs load and torn down when the
 * last fs unloads (see initialize_global_cache / free_global_cache).
 */
static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches = TAILQ_HEAD_INITIALIZER(g_caches);
static struct spdk_poller *g_cache_pool_mgmt_poller;
static struct spdk_thread *g_cache_pool_thread;
#define BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US 1000ULL
/* Number of currently loaded filesystems; guarded by g_cache_init_lock. */
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;

/* Tracepoint IDs for the blobfs trace group. */
#define TRACE_GROUP_BLOBFS	0x7
#define TRACE_BLOBFS_XATTR_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0)
#define TRACE_BLOBFS_XATTR_END		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1)
#define TRACE_BLOBFS_OPEN		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2)
#define TRACE_BLOBFS_CLOSE		SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3)
#define TRACE_BLOBFS_DELETE_START	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x4)
#define TRACE_BLOBFS_DELETE_DONE	SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x5)

/* Register the blobfs tracepoints; each records the file name (up to 40 chars). */
SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS)
{
	struct spdk_trace_tpoint_opts opts[] = {
		{
			"BLOBFS_XATTR_START", TRACE_BLOBFS_XATTR_START,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_XATTR_END", TRACE_BLOBFS_XATTR_END,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_OPEN", TRACE_BLOBFS_OPEN,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_CLOSE", TRACE_BLOBFS_CLOSE,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_START", TRACE_BLOBFS_DELETE_START,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		},
		{
			"BLOBFS_DELETE_DONE", TRACE_BLOBFS_DELETE_DONE,
			OWNER_NONE, OBJECT_NONE, 0,
			{{ "file", SPDK_TRACE_ARG_TYPE_STR, 40 }},
		}
	};

	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
}

/* Return a cache buffer's data area to the global pool and free its descriptor. */
void
cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

#define CACHE_READAHEAD_THRESHOLD (128 * 1024)

/* In-memory representation of one blobfs file (one blob). */
struct spdk_file {
	struct spdk_filesystem	*fs;
	struct spdk_blob	*blob;
	char			*name;
	uint64_t		length;		/* current logical file size */
	bool			is_deleted;	/* deletion deferred until last close */
	bool			open_for_writing;
	uint64_t		length_flushed;	/* bytes persisted to the blob */
	uint64_t		length_xattr;	/* length recorded in the "length" xattr */
	uint64_t		append_pos;
	uint64_t		seq_byte_count;	/* sequential-read tracking for readahead */
	uint64_t		next_seq_offset;
	uint32_t		priority;
	TAILQ_ENTRY(spdk_file)	tailq;
	spdk_blob_id		blobid;
	uint32_t		ref_count;	/* open count; blob closed when it drops to 0 */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	struct cache_tree	*tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};

/* Blob marked "is_deleted" found during load; deleted before load completes. */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};

/* A blobfs instance layered on one blobstore. The three targets (md, sync, io)
 * are separate io_devices so each gets its own channel/request pools.
 */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	fs_send_request_fn	send_request;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		uint32_t		max_ops;
	} io_target;
};

/* Per-request callback context; the op union carries state for exactly one
 * operation type at a time.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void			*arg;
	sem_t			*sem;	/* posted to wake a blocked sync caller */
	struct spdk_filesystem	*fs;
	struct spdk_file	*file;
	int			rc;
	int			*rwerrno;
	struct iovec		*iovs;
	uint32_t		iovcnt;
	struct iovec		iov;	/* used in place of an allocation when iovcnt <= 1 */
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file)	deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			/* offset of the file when the sync request was made */
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
			/* length written to the xattr for this file - this should
			 * always be the same as the offset if only one thread is
			 * writing to the file, but could differ if multiple threads
			 * are appending
			 */
			uint64_t			length;
		} sync;
		struct {
			uint32_t			num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char		*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};

static void file_free(struct spdk_file *file);
static void fs_io_device_unregister(struct spdk_filesystem *fs);
static void fs_free_io_channels(struct spdk_filesystem *fs);

/* Initialize caller-provided blobfs options to their defaults. */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}

static int _blobfs_cache_pool_reclaim(void *arg);

/* Return true when the free-buffer count has dropped low enough that the
 * reclaim poller should start evicting cache buffers.
 */
static bool
blobfs_cache_pool_need_reclaim(void)
{
	size_t count;

	count = spdk_mempool_count(g_cache_pool);
	/* We define a aggressive policy here as the requirements from db_bench are batched, so start the poller
	 * when the number of available cache buffer is less than 1/5 of total buffers.
	 */
	if (count > (size_t)g_fs_cache_size / CACHE_BUFFER_SIZE / 5) {
		return false;
	}

	return true;
}

/* Runs on g_cache_pool_thread: create the global buffer pool and start the
 * reclaim poller. Allocation failure here is fatal (assert).
 */
static void
__start_cache_pool_mgmt(void *ctx)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		SPDK_ERRLOG("Create mempool failed, you may "
			    "increase the memory and try again\n");
		assert(false);
	}

	assert(g_cache_pool_mgmt_poller == NULL);
	g_cache_pool_mgmt_poller = SPDK_POLLER_REGISTER(_blobfs_cache_pool_reclaim, NULL,
				   BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US);
}

/* Runs on g_cache_pool_thread: tear down the pool, then exit the thread.
 * The assert requires every cache buffer to have been returned already.
 */
static void
__stop_cache_pool_mgmt(void *ctx)
{
	spdk_poller_unregister(&g_cache_pool_mgmt_poller);

	assert(g_cache_pool != NULL);
	assert(spdk_mempool_count(g_cache_pool) == g_fs_cache_size / CACHE_BUFFER_SIZE);
	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;

	spdk_thread_exit(g_cache_pool_thread);
}

/* Called once per fs load; the first caller spawns the cache-management thread. */
static void
initialize_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		g_cache_pool_thread = spdk_thread_create("cache_pool_mgmt", NULL);
		assert(g_cache_pool_thread != NULL);
		spdk_thread_send_msg(g_cache_pool_thread, __start_cache_pool_mgmt, NULL);
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}

/* Called once per fs unload; the last caller tears the cache pool down. */
static void
free_global_cache(void)
{
	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		spdk_thread_send_msg(g_cache_pool_thread, __stop_cache_pool_mgmt, NULL);
	}
	pthread_mutex_unlock(&g_cache_init_lock);
}
/* Size of the file's blob in bytes (clusters * cluster size). */
static uint64_t
__file_get_blob_size(struct spdk_file *file)
{
	uint64_t cluster_sz;

	cluster_sz = file->fs->bs_opts.cluster_sz;
	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
}

/* One pre-allocated request from a channel's pool. */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;
	TAILQ_ENTRY(spdk_fs_request)	link;
	struct spdk_fs_channel		*channel;
};

/* Per-channel state: a fixed pool of requests plus an optional blobstore
 * channel. When 'sync' is set the channel may be used from a non-SPDK
 * thread, so the request pool is protected by 'lock'.
 */
struct spdk_fs_channel {
	struct spdk_fs_request		*req_mem;
	TAILQ_HEAD(, spdk_fs_request)	reqs;
	sem_t				sem;
	struct spdk_filesystem		*fs;
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	bool				sync;
	uint32_t			outstanding_reqs;
	pthread_spinlock_t		lock;
};

/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over.
 */
struct spdk_fs_thread_ctx {
	struct spdk_fs_channel	ch;
};

/* Take a request from the channel pool and attach an iovec array.
 * iovcnt <= 1 reuses the embedded args.iov; larger counts allocate.
 * Returns NULL if the pool is exhausted or the iovec allocation fails.
 */
static struct spdk_fs_request *
alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt)
{
	struct spdk_fs_request *req;
	struct iovec *iovs = NULL;

	if (iovcnt > 1) {
		iovs = calloc(iovcnt, sizeof(struct iovec));
		if (!iovs) {
			return NULL;
		}
	}

	/* Only sync channels can be touched from foreign threads; skip the
	 * spinlock otherwise.
	 */
	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	req = TAILQ_FIRST(&channel->reqs);
	if (req) {
		channel->outstanding_reqs++;
		TAILQ_REMOVE(&channel->reqs, req, link);
	}

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}

	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel);
		free(iovs);
		return NULL;
	}
	memset(req, 0, sizeof(*req));
	req->channel = channel;
	if (iovcnt > 1) {
		req->args.iovs = iovs;
	} else {
		req->args.iovs = &req->args.iov;
	}
	req->args.iovcnt = iovcnt;

	return req;
}

/* Common case: request with no separate iovec array. */
static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	return alloc_fs_request_with_iov(channel, 0);
}

/* Return a request to its owning channel's pool, freeing any heap iovecs. */
static void
free_fs_request(struct spdk_fs_request *req)
{
	struct spdk_fs_channel *channel = req->channel;

	if (req->args.iovcnt > 1) {
		free(req->args.iovs);
	}

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);
	channel->outstanding_reqs--;

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}
}

/* Allocate and thread the channel's request pool (max_ops entries). */
static int
fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
		  uint32_t max_ops)
{
	uint32_t i;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
	if (!channel->req_mem) {
		return -1;
	}

	channel->outstanding_reqs = 0;
	TAILQ_INIT(&channel->reqs);
	sem_init(&channel->sem, 0, 0);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->fs = fs;

	return 0;
}

/* io_device channel constructors: recover the fs from the embedded target. */
static int
fs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);

	return fs_channel_create(fs, channel, fs->md_target.max_ops);
}

static int
fs_sync_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);

	return fs_channel_create(fs, channel, fs->sync_target.max_ops);
}

static int
fs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem		*fs;
	struct spdk_fs_channel		*channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);

	return fs_channel_create(fs, channel, fs->io_target.max_ops);
}

/* Destructor shared by all three targets; warns if requests are still in flight. */
static void
fs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_fs_channel *channel = ctx_buf;

	if (channel->outstanding_reqs > 0) {
		SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n",
			    channel->outstanding_reqs);
	}

	free(channel->req_mem);
	if (channel->bs_channel != NULL) {
		spdk_bs_free_io_channel(channel->bs_channel);
	}
}

/* send_request implementation for channels already on the right thread. */
static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}

/* Wire a freshly created/loaded blobstore into the fs: record the cluster
 * size, give md and sync channels their bs channels, and bump the global
 * cache refcount.
 */
static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	initialize_global_cache();
}
fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 532 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 533 534 initialize_global_cache(); 535 } 536 537 static void 538 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 539 { 540 struct spdk_fs_request *req = ctx; 541 struct spdk_fs_cb_args *args = &req->args; 542 struct spdk_filesystem *fs = args->fs; 543 544 if (bserrno == 0) { 545 common_fs_bs_init(fs, bs); 546 } else { 547 free(fs); 548 fs = NULL; 549 } 550 551 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 552 free_fs_request(req); 553 } 554 555 static struct spdk_filesystem * 556 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 557 { 558 struct spdk_filesystem *fs; 559 560 fs = calloc(1, sizeof(*fs)); 561 if (fs == NULL) { 562 return NULL; 563 } 564 565 fs->bdev = dev; 566 fs->send_request = send_request_fn; 567 TAILQ_INIT(&fs->files); 568 569 fs->md_target.max_ops = 512; 570 spdk_io_device_register(&fs->md_target, fs_md_channel_create, fs_channel_destroy, 571 sizeof(struct spdk_fs_channel), "blobfs_md"); 572 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 573 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 574 575 fs->sync_target.max_ops = 512; 576 spdk_io_device_register(&fs->sync_target, fs_sync_channel_create, fs_channel_destroy, 577 sizeof(struct spdk_fs_channel), "blobfs_sync"); 578 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 579 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 580 581 fs->io_target.max_ops = 512; 582 spdk_io_device_register(&fs->io_target, fs_io_channel_create, fs_channel_destroy, 583 sizeof(struct spdk_fs_channel), "blobfs_io"); 584 585 return fs; 586 } 587 588 static void 589 __wake_caller(void *arg, int fserrno) 590 { 591 struct spdk_fs_cb_args *args = arg; 592 593 if ((args->rwerrno != NULL) && (*(args->rwerrno) == 0) && 
fserrno) { 594 *(args->rwerrno) = fserrno; 595 } 596 args->rc = fserrno; 597 sem_post(args->sem); 598 } 599 600 void 601 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 602 fs_send_request_fn send_request_fn, 603 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 604 { 605 struct spdk_filesystem *fs; 606 struct spdk_fs_request *req; 607 struct spdk_fs_cb_args *args; 608 struct spdk_bs_opts opts = {}; 609 610 fs = fs_alloc(dev, send_request_fn); 611 if (fs == NULL) { 612 cb_fn(cb_arg, NULL, -ENOMEM); 613 return; 614 } 615 616 req = alloc_fs_request(fs->md_target.md_fs_channel); 617 if (req == NULL) { 618 fs_free_io_channels(fs); 619 fs_io_device_unregister(fs); 620 cb_fn(cb_arg, NULL, -ENOMEM); 621 return; 622 } 623 624 args = &req->args; 625 args->fn.fs_op_with_handle = cb_fn; 626 args->arg = cb_arg; 627 args->fs = fs; 628 629 spdk_bs_opts_init(&opts, sizeof(opts)); 630 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), SPDK_BLOBFS_SIGNATURE); 631 if (opt) { 632 opts.cluster_sz = opt->cluster_sz; 633 } 634 spdk_bs_init(dev, &opts, init_cb, req); 635 } 636 637 static struct spdk_file * 638 file_alloc(struct spdk_filesystem *fs) 639 { 640 struct spdk_file *file; 641 642 file = calloc(1, sizeof(*file)); 643 if (file == NULL) { 644 return NULL; 645 } 646 647 file->tree = calloc(1, sizeof(*file->tree)); 648 if (file->tree == NULL) { 649 free(file); 650 return NULL; 651 } 652 653 if (pthread_spin_init(&file->lock, 0)) { 654 free(file->tree); 655 free(file); 656 return NULL; 657 } 658 659 file->fs = fs; 660 TAILQ_INIT(&file->open_requests); 661 TAILQ_INIT(&file->sync_requests); 662 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 663 file->priority = SPDK_FILE_PRIORITY_LOW; 664 return file; 665 } 666 667 static void fs_load_done(void *ctx, int bserrno); 668 669 static int 670 _handle_deleted_files(struct spdk_fs_request *req) 671 { 672 struct spdk_fs_cb_args *args = &req->args; 673 struct spdk_filesystem *fs = args->fs; 674 675 if 
(!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 676 struct spdk_deleted_file *deleted_file; 677 678 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 679 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 680 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 681 free(deleted_file); 682 return 0; 683 } 684 685 return 1; 686 } 687 688 static void 689 fs_load_done(void *ctx, int bserrno) 690 { 691 struct spdk_fs_request *req = ctx; 692 struct spdk_fs_cb_args *args = &req->args; 693 struct spdk_filesystem *fs = args->fs; 694 695 /* The filesystem has been loaded. Now check if there are any files that 696 * were marked for deletion before last unload. Do not complete the 697 * fs_load callback until all of them have been deleted on disk. 698 */ 699 if (_handle_deleted_files(req) == 0) { 700 /* We found a file that's been marked for deleting but not actually 701 * deleted yet. This function will get called again once the delete 702 * operation is completed. 
703 */ 704 return; 705 } 706 707 args->fn.fs_op_with_handle(args->arg, fs, 0); 708 free_fs_request(req); 709 710 } 711 712 static void 713 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 714 { 715 struct spdk_fs_request *req = ctx; 716 struct spdk_fs_cb_args *args = &req->args; 717 struct spdk_filesystem *fs = args->fs; 718 uint64_t *length; 719 const char *name; 720 uint32_t *is_deleted; 721 size_t value_len; 722 723 if (rc < 0) { 724 args->fn.fs_op_with_handle(args->arg, fs, rc); 725 free_fs_request(req); 726 return; 727 } 728 729 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 730 if (rc < 0) { 731 args->fn.fs_op_with_handle(args->arg, fs, rc); 732 free_fs_request(req); 733 return; 734 } 735 736 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 737 if (rc < 0) { 738 args->fn.fs_op_with_handle(args->arg, fs, rc); 739 free_fs_request(req); 740 return; 741 } 742 743 assert(value_len == 8); 744 745 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 746 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 747 if (rc < 0) { 748 struct spdk_file *f; 749 750 f = file_alloc(fs); 751 if (f == NULL) { 752 SPDK_ERRLOG("Cannot allocate file to handle deleted file on disk\n"); 753 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 754 free_fs_request(req); 755 return; 756 } 757 758 f->name = strdup(name); 759 f->blobid = spdk_blob_get_id(blob); 760 f->length = *length; 761 f->length_flushed = *length; 762 f->length_xattr = *length; 763 f->append_pos = *length; 764 SPDK_DEBUGLOG(blobfs, "added file %s length=%ju\n", f->name, f->length); 765 } else { 766 struct spdk_deleted_file *deleted_file; 767 768 deleted_file = calloc(1, sizeof(*deleted_file)); 769 if (deleted_file == NULL) { 770 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 771 free_fs_request(req); 772 return; 773 } 774 deleted_file->id = 
/* Completion for spdk_bs_load(): validate the bstype signature (stamping it
 * if the blobstore predates signatures), then finish wiring up the fs and
 * proceed to deleted-file handling via fs_load_done().
 */
static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_bs_type bstype;
	static const struct spdk_bs_type blobfs_type = {SPDK_BLOBFS_SIGNATURE};
	static const struct spdk_bs_type zeros;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	bstype = spdk_bs_get_bstype(bs);

	if (!memcmp(&bstype, &zeros, sizeof(bstype))) {
		/* Unlabeled blobstore: claim it for blobfs. */
		SPDK_DEBUGLOG(blobfs, "assigning bstype\n");
		spdk_bs_set_bstype(bs, blobfs_type);
	} else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) {
		/* Labeled as something else: refuse to mount. */
		SPDK_ERRLOG("not blobfs\n");
		SPDK_LOGDUMP(blobfs, "bstype", &bstype, sizeof(bstype));
		args->fn.fs_op_with_handle(args->arg, NULL, -EINVAL);
		free_fs_request(req);
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	fs_load_done(req, 0);
}

/* Unregister the fs's three io_devices and free the fs itself. */
static void
fs_io_device_unregister(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->io_target, NULL);
	free(fs);
}

/* Release the md and sync io channels acquired in fs_alloc(). */
static void
fs_free_io_channels(struct spdk_filesystem *fs)
{
	assert(fs != NULL);
	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
}

/* Load an existing blobfs from dev. Each existing blob is visited via
 * iter_cb; completion (including deferred deletions) is reported through
 * load_cb -> fs_load_done.
 */
void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;
	struct spdk_bs_opts bs_opts;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		fs_free_io_channels(fs);
		fs_io_device_unregister(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;
	TAILQ_INIT(&args->op.fs_load.deleted_files);
	spdk_bs_opts_init(&bs_opts, sizeof(bs_opts));
	bs_opts.iter_cb_fn = iter_cb;
	bs_opts.iter_cb_arg = req;
	spdk_bs_load(dev, &bs_opts, load_cb, req);
}

/* Completion for spdk_bs_unload(): free all remaining file objects, drop the
 * global cache reference, report to the caller, then unregister io_devices.
 */
static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *file, *tmp;

	TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) {
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
	}

	free_global_cache();

	args->fn.fs_op(args->arg, bserrno);
	/* Heap-allocated in spdk_fs_unload (not from a channel pool), so plain free. */
	free(req);

	fs_io_device_unregister(fs);
}

/* Unload the filesystem and its blobstore. */
void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 * allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	fs_free_io_channels(fs);
	spdk_bs_unload(fs->bs, unload_cb, req);
}

/* Linear scan of fs->files for an exact name match; NULL if not found. */
static struct spdk_file *
fs_find_file(struct spdk_filesystem *fs, const char *name)
{
	struct spdk_file *file;

	TAILQ_FOREACH(file, &fs->files, tailq) {
		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
			return file;
		}
	}

	return NULL;
}

/* Asynchronously stat a file by name. Size reported is the larger of the
 * append position and the recorded length (append_pos can run ahead of
 * length before a flush).
 */
void
spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
			spdk_file_stat_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file_stat stat;
	struct spdk_file *f = NULL;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f != NULL) {
		stat.blobid = f->blobid;
		stat.size = f->append_pos >= f->length ? f->append_pos : f->length;
		cb_fn(cb_arg, &stat, 0);
		return;
	}

	cb_fn(cb_arg, NULL, -ENOENT);
}

/* Bridge callback for the synchronous stat path: copy the result into the
 * caller's buffer and wake the waiting thread.
 */
static void
__copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	if (fserrno == 0) {
		memcpy(args->arg, stat, sizeof(*stat));
	}
	sem_post(args->sem);
}

/* Runs on the fs thread: issue the async stat for a synchronous caller. */
static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}

/* Synchronous stat: dispatch to the fs thread and block on the channel
 * semaphore until __copy_stat posts it. Returns 0 or a negative errno.
 */
int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate stat req on file=%s\n", name);
		return -ENOMEM;
	}

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}

/* Final step of file creation: report the first error seen (resize error in
 * args->rc takes precedence over the close status) and release the request.
 */
static void
fs_create_blob_close_cb(void *ctx, int bserrno)
{
	int rc;
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	rc = args->rc ? args->rc : bserrno;
	args->fn.file_op(args->arg, rc);
	free_fs_request(req);
}
/* After resizing the new blob to one cluster: record the name/length xattrs
 * (length starts at 0) and close the blob. Any resize error is stashed in
 * args->rc and reported by fs_create_blob_close_cb.
 */
static void
fs_create_blob_resize_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;
	struct spdk_blob *blob = args->op.create.blob;
	uint64_t length = 0;

	args->rc = bserrno;
	if (bserrno) {
		spdk_blob_close(blob, fs_create_blob_close_cb, args);
		return;
	}

	spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
	spdk_blob_set_xattr(blob, "length", &length, sizeof(length));

	spdk_blob_close(blob, fs_create_blob_close_cb, args);
}

/* New blob opened: give it an initial size of one cluster. */
static void
fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	args->op.create.blob = blob;
	spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req);
}

/* Blob created: remember its id on the file and open it to set xattrs. */
static void
fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	if (bserrno) {
		args->fn.file_op(args->arg, bserrno);
		free_fs_request(req);
		return;
	}

	f->blobid = blobid;
	spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
}

/* Asynchronously create a new file: allocate the in-memory file object,
 * then create/open/resize/label/close its backing blob. Fails with
 * -ENAMETOOLONG, -EEXIST, or -ENOMEM before touching the blobstore.
 */
void
spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *file;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	file = fs_find_file(fs, name);
	if (file != NULL) {
		cb_fn(cb_arg, -EEXIST);
		return;
	}

	file = file_alloc(fs);
	if (file == NULL) {
		SPDK_ERRLOG("Cannot allocate new file for creation\n");
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate create async req for file=%s\n", name);
		/* Undo file_alloc: it linked the file into fs->files. */
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	file->name = strdup(name);
	if (!file->name) {
		SPDK_ERRLOG("Cannot allocate file->name for file=%s\n", name);
		free_fs_request(req);
		TAILQ_REMOVE(&fs->files, file, tailq);
		file_free(file);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args);
}

/* Async completion for the synchronous create path: wake the blocked caller. */
static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__wake_caller(args, fserrno);
	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
}

/* Runs on the fs thread: start the async create for a synchronous caller. */
static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}

/* Synchronous file create: dispatch to the fs thread and block until done.
 * Returns 0 or a negative errno.
 */
int
spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", name);

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate req to create file=%s\n", name);
		return -ENOMEM;
	}

	args = &req->args;
	args->fs = fs;
	args->op.create.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

/* Blob open finished: record the blob on the file and complete every open
 * request that queued up while the open was in flight (all of them share
 * this one blob open — see fs_open_blob_create_cb).
 */
static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->name);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}

/* Continue an open after optional creation: bump the ref count and either
 * start the blob open (first opener), complete immediately (blob already
 * open), or wait on the in-flight open.
 */
static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 *  is now created so look it up in the file list for this
		 *  filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 *  open request.  When that open completes, it will invoke the
		 *  open callback for this request.
		 */
	}
}

/* Asynchronously open a file, optionally creating it when
 * SPDK_BLOBFS_OPEN_CREATE is set. Files pending deletion report -ENOENT.
 */
void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	if (f != NULL && f->is_deleted == true) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate async open req for file=%s\n", name);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

/* Async completion for the synchronous open path: stash the handle and wake
 * the blocked caller.
 */
static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	__wake_caller(args, bserrno);
	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
}

/* Runs on the fs thread: start the async open for a synchronous caller. */
static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_DEBUGLOG(blobfs, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

int
1289 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1290 const char *name, uint32_t flags, struct spdk_file **file) 1291 { 1292 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1293 struct spdk_fs_request *req; 1294 struct spdk_fs_cb_args *args; 1295 int rc; 1296 1297 SPDK_DEBUGLOG(blobfs, "file=%s\n", name); 1298 1299 req = alloc_fs_request(channel); 1300 if (req == NULL) { 1301 SPDK_ERRLOG("Cannot allocate req for opening file=%s\n", name); 1302 return -ENOMEM; 1303 } 1304 1305 args = &req->args; 1306 args->fs = fs; 1307 args->op.open.name = name; 1308 args->op.open.flags = flags; 1309 args->sem = &channel->sem; 1310 fs->send_request(__fs_open_file, req); 1311 sem_wait(&channel->sem); 1312 rc = args->rc; 1313 if (rc == 0) { 1314 *file = args->file; 1315 } else { 1316 *file = NULL; 1317 } 1318 free_fs_request(req); 1319 1320 return rc; 1321 } 1322 1323 static void 1324 fs_rename_blob_close_cb(void *ctx, int bserrno) 1325 { 1326 struct spdk_fs_request *req = ctx; 1327 struct spdk_fs_cb_args *args = &req->args; 1328 1329 args->fn.fs_op(args->arg, bserrno); 1330 free_fs_request(req); 1331 } 1332 1333 static void 1334 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1335 { 1336 struct spdk_fs_request *req = ctx; 1337 struct spdk_fs_cb_args *args = &req->args; 1338 const char *new_name = args->op.rename.new_name; 1339 1340 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1341 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1342 } 1343 1344 static void 1345 _fs_md_rename_file(struct spdk_fs_request *req) 1346 { 1347 struct spdk_fs_cb_args *args = &req->args; 1348 struct spdk_file *f; 1349 1350 f = fs_find_file(args->fs, args->op.rename.old_name); 1351 if (f == NULL) { 1352 args->fn.fs_op(args->arg, -ENOENT); 1353 free_fs_request(req); 1354 return; 1355 } 1356 1357 free(f->name); 1358 f->name = strdup(args->op.rename.new_name); 1359 args->file = f; 1360 
spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1361 } 1362 1363 static void 1364 fs_rename_delete_done(void *arg, int fserrno) 1365 { 1366 _fs_md_rename_file(arg); 1367 } 1368 1369 void 1370 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1371 const char *old_name, const char *new_name, 1372 spdk_file_op_complete cb_fn, void *cb_arg) 1373 { 1374 struct spdk_file *f; 1375 struct spdk_fs_request *req; 1376 struct spdk_fs_cb_args *args; 1377 1378 SPDK_DEBUGLOG(blobfs, "old=%s new=%s\n", old_name, new_name); 1379 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1380 cb_fn(cb_arg, -ENAMETOOLONG); 1381 return; 1382 } 1383 1384 req = alloc_fs_request(fs->md_target.md_fs_channel); 1385 if (req == NULL) { 1386 SPDK_ERRLOG("Cannot allocate rename async req for renaming file from %s to %s\n", old_name, 1387 new_name); 1388 cb_fn(cb_arg, -ENOMEM); 1389 return; 1390 } 1391 1392 args = &req->args; 1393 args->fn.fs_op = cb_fn; 1394 args->fs = fs; 1395 args->arg = cb_arg; 1396 args->op.rename.old_name = old_name; 1397 args->op.rename.new_name = new_name; 1398 1399 f = fs_find_file(fs, new_name); 1400 if (f == NULL) { 1401 _fs_md_rename_file(req); 1402 return; 1403 } 1404 1405 /* 1406 * The rename overwrites an existing file. So delete the existing file, then 1407 * do the actual rename. 
1408 */ 1409 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1410 } 1411 1412 static void 1413 __fs_rename_file_done(void *arg, int fserrno) 1414 { 1415 struct spdk_fs_request *req = arg; 1416 struct spdk_fs_cb_args *args = &req->args; 1417 1418 __wake_caller(args, fserrno); 1419 } 1420 1421 static void 1422 __fs_rename_file(void *arg) 1423 { 1424 struct spdk_fs_request *req = arg; 1425 struct spdk_fs_cb_args *args = &req->args; 1426 1427 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1428 __fs_rename_file_done, req); 1429 } 1430 1431 int 1432 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1433 const char *old_name, const char *new_name) 1434 { 1435 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1436 struct spdk_fs_request *req; 1437 struct spdk_fs_cb_args *args; 1438 int rc; 1439 1440 req = alloc_fs_request(channel); 1441 if (req == NULL) { 1442 SPDK_ERRLOG("Cannot allocate rename req for file=%s\n", old_name); 1443 return -ENOMEM; 1444 } 1445 1446 args = &req->args; 1447 1448 args->fs = fs; 1449 args->op.rename.old_name = old_name; 1450 args->op.rename.new_name = new_name; 1451 args->sem = &channel->sem; 1452 fs->send_request(__fs_rename_file, req); 1453 sem_wait(&channel->sem); 1454 rc = args->rc; 1455 free_fs_request(req); 1456 return rc; 1457 } 1458 1459 static void 1460 blob_delete_cb(void *ctx, int bserrno) 1461 { 1462 struct spdk_fs_request *req = ctx; 1463 struct spdk_fs_cb_args *args = &req->args; 1464 1465 args->fn.file_op(args->arg, bserrno); 1466 free_fs_request(req); 1467 } 1468 1469 void 1470 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1471 spdk_file_op_complete cb_fn, void *cb_arg) 1472 { 1473 struct spdk_file *f; 1474 spdk_blob_id blobid; 1475 struct spdk_fs_request *req; 1476 struct spdk_fs_cb_args *args; 1477 1478 SPDK_DEBUGLOG(blobfs, "file=%s\n", name); 1479 1480 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) 
== SPDK_FILE_NAME_MAX + 1) { 1481 cb_fn(cb_arg, -ENAMETOOLONG); 1482 return; 1483 } 1484 1485 f = fs_find_file(fs, name); 1486 if (f == NULL) { 1487 SPDK_ERRLOG("Cannot find the file=%s to deleted\n", name); 1488 cb_fn(cb_arg, -ENOENT); 1489 return; 1490 } 1491 1492 req = alloc_fs_request(fs->md_target.md_fs_channel); 1493 if (req == NULL) { 1494 SPDK_ERRLOG("Cannot allocate the req for the file=%s to deleted\n", name); 1495 cb_fn(cb_arg, -ENOMEM); 1496 return; 1497 } 1498 1499 args = &req->args; 1500 args->fn.file_op = cb_fn; 1501 args->arg = cb_arg; 1502 1503 if (f->ref_count > 0) { 1504 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1505 f->is_deleted = true; 1506 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1507 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1508 return; 1509 } 1510 1511 blobid = f->blobid; 1512 TAILQ_REMOVE(&fs->files, f, tailq); 1513 1514 file_free(f); 1515 1516 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1517 } 1518 1519 static void 1520 __fs_delete_file_done(void *arg, int fserrno) 1521 { 1522 struct spdk_fs_request *req = arg; 1523 struct spdk_fs_cb_args *args = &req->args; 1524 1525 spdk_trace_record(TRACE_BLOBFS_DELETE_DONE, 0, 0, 0, args->op.delete.name); 1526 __wake_caller(args, fserrno); 1527 } 1528 1529 static void 1530 __fs_delete_file(void *arg) 1531 { 1532 struct spdk_fs_request *req = arg; 1533 struct spdk_fs_cb_args *args = &req->args; 1534 1535 spdk_trace_record(TRACE_BLOBFS_DELETE_START, 0, 0, 0, args->op.delete.name); 1536 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1537 } 1538 1539 int 1540 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1541 const char *name) 1542 { 1543 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1544 struct spdk_fs_request *req; 1545 struct spdk_fs_cb_args *args; 1546 int rc; 1547 1548 req = alloc_fs_request(channel); 1549 if (req 
== NULL) { 1550 SPDK_DEBUGLOG(blobfs, "Cannot allocate req to delete file=%s\n", name); 1551 return -ENOMEM; 1552 } 1553 1554 args = &req->args; 1555 args->fs = fs; 1556 args->op.delete.name = name; 1557 args->sem = &channel->sem; 1558 fs->send_request(__fs_delete_file, req); 1559 sem_wait(&channel->sem); 1560 rc = args->rc; 1561 free_fs_request(req); 1562 1563 return rc; 1564 } 1565 1566 spdk_fs_iter 1567 spdk_fs_iter_first(struct spdk_filesystem *fs) 1568 { 1569 struct spdk_file *f; 1570 1571 f = TAILQ_FIRST(&fs->files); 1572 return f; 1573 } 1574 1575 spdk_fs_iter 1576 spdk_fs_iter_next(spdk_fs_iter iter) 1577 { 1578 struct spdk_file *f = iter; 1579 1580 if (f == NULL) { 1581 return NULL; 1582 } 1583 1584 f = TAILQ_NEXT(f, tailq); 1585 return f; 1586 } 1587 1588 const char * 1589 spdk_file_get_name(struct spdk_file *file) 1590 { 1591 return file->name; 1592 } 1593 1594 uint64_t 1595 spdk_file_get_length(struct spdk_file *file) 1596 { 1597 uint64_t length; 1598 1599 assert(file != NULL); 1600 1601 length = file->append_pos >= file->length ? 
file->append_pos : file->length; 1602 SPDK_DEBUGLOG(blobfs, "file=%s length=0x%jx\n", file->name, length); 1603 return length; 1604 } 1605 1606 static void 1607 fs_truncate_complete_cb(void *ctx, int bserrno) 1608 { 1609 struct spdk_fs_request *req = ctx; 1610 struct spdk_fs_cb_args *args = &req->args; 1611 1612 args->fn.file_op(args->arg, bserrno); 1613 free_fs_request(req); 1614 } 1615 1616 static void 1617 fs_truncate_resize_cb(void *ctx, int bserrno) 1618 { 1619 struct spdk_fs_request *req = ctx; 1620 struct spdk_fs_cb_args *args = &req->args; 1621 struct spdk_file *file = args->file; 1622 uint64_t *length = &args->op.truncate.length; 1623 1624 if (bserrno) { 1625 args->fn.file_op(args->arg, bserrno); 1626 free_fs_request(req); 1627 return; 1628 } 1629 1630 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1631 1632 file->length = *length; 1633 if (file->append_pos > file->length) { 1634 file->append_pos = file->length; 1635 } 1636 1637 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req); 1638 } 1639 1640 static uint64_t 1641 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1642 { 1643 return (length + cluster_sz - 1) / cluster_sz; 1644 } 1645 1646 void 1647 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1648 spdk_file_op_complete cb_fn, void *cb_arg) 1649 { 1650 struct spdk_filesystem *fs; 1651 size_t num_clusters; 1652 struct spdk_fs_request *req; 1653 struct spdk_fs_cb_args *args; 1654 1655 SPDK_DEBUGLOG(blobfs, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1656 if (length == file->length) { 1657 cb_fn(cb_arg, 0); 1658 return; 1659 } 1660 1661 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1662 if (req == NULL) { 1663 cb_fn(cb_arg, -ENOMEM); 1664 return; 1665 } 1666 1667 args = &req->args; 1668 args->fn.file_op = cb_fn; 1669 args->arg = cb_arg; 1670 args->file = file; 1671 args->op.truncate.length = length; 1672 fs = file->fs; 1673 1674 num_clusters = 
__bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1675 1676 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1677 } 1678 1679 static void 1680 __truncate(void *arg) 1681 { 1682 struct spdk_fs_request *req = arg; 1683 struct spdk_fs_cb_args *args = &req->args; 1684 1685 spdk_file_truncate_async(args->file, args->op.truncate.length, 1686 args->fn.file_op, args); 1687 } 1688 1689 int 1690 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 1691 uint64_t length) 1692 { 1693 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1694 struct spdk_fs_request *req; 1695 struct spdk_fs_cb_args *args; 1696 int rc; 1697 1698 req = alloc_fs_request(channel); 1699 if (req == NULL) { 1700 return -ENOMEM; 1701 } 1702 1703 args = &req->args; 1704 1705 args->file = file; 1706 args->op.truncate.length = length; 1707 args->fn.file_op = __wake_caller; 1708 args->sem = &channel->sem; 1709 1710 channel->send_request(__truncate, req); 1711 sem_wait(&channel->sem); 1712 rc = args->rc; 1713 free_fs_request(req); 1714 1715 return rc; 1716 } 1717 1718 static void 1719 __rw_done(void *ctx, int bserrno) 1720 { 1721 struct spdk_fs_request *req = ctx; 1722 struct spdk_fs_cb_args *args = &req->args; 1723 1724 spdk_free(args->op.rw.pin_buf); 1725 args->fn.file_op(args->arg, bserrno); 1726 free_fs_request(req); 1727 } 1728 1729 static void 1730 _copy_iovs_to_buf(void *buf, size_t buf_len, struct iovec *iovs, int iovcnt) 1731 { 1732 int i; 1733 size_t len; 1734 1735 for (i = 0; i < iovcnt; i++) { 1736 len = spdk_min(iovs[i].iov_len, buf_len); 1737 memcpy(buf, iovs[i].iov_base, len); 1738 buf += len; 1739 assert(buf_len >= len); 1740 buf_len -= len; 1741 } 1742 } 1743 1744 static void 1745 _copy_buf_to_iovs(struct iovec *iovs, int iovcnt, void *buf, size_t buf_len) 1746 { 1747 int i; 1748 size_t len; 1749 1750 for (i = 0; i < iovcnt; i++) { 1751 len = spdk_min(iovs[i].iov_len, buf_len); 1752 memcpy(iovs[i].iov_base, buf, len); 1753 buf += 
len; 1754 assert(buf_len >= len); 1755 buf_len -= len; 1756 } 1757 } 1758 1759 static void 1760 __read_done(void *ctx, int bserrno) 1761 { 1762 struct spdk_fs_request *req = ctx; 1763 struct spdk_fs_cb_args *args = &req->args; 1764 void *buf; 1765 1766 assert(req != NULL); 1767 buf = (void *)((uintptr_t)args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1))); 1768 if (args->op.rw.is_read) { 1769 _copy_buf_to_iovs(args->iovs, args->iovcnt, buf, args->op.rw.length); 1770 __rw_done(req, 0); 1771 } else { 1772 _copy_iovs_to_buf(buf, args->op.rw.length, args->iovs, args->iovcnt); 1773 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1774 args->op.rw.pin_buf, 1775 args->op.rw.start_lba, args->op.rw.num_lba, 1776 __rw_done, req); 1777 } 1778 } 1779 1780 static void 1781 __do_blob_read(void *ctx, int fserrno) 1782 { 1783 struct spdk_fs_request *req = ctx; 1784 struct spdk_fs_cb_args *args = &req->args; 1785 1786 if (fserrno) { 1787 __rw_done(req, fserrno); 1788 return; 1789 } 1790 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1791 args->op.rw.pin_buf, 1792 args->op.rw.start_lba, args->op.rw.num_lba, 1793 __read_done, req); 1794 } 1795 1796 static void 1797 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1798 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1799 { 1800 uint64_t end_lba; 1801 1802 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1803 *start_lba = offset / *lba_size; 1804 end_lba = (offset + length - 1) / *lba_size; 1805 *num_lba = (end_lba - *start_lba + 1); 1806 } 1807 1808 static bool 1809 __is_lba_aligned(struct spdk_file *file, uint64_t offset, uint64_t length) 1810 { 1811 uint32_t lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1812 1813 if ((offset % lba_size == 0) && (length % lba_size == 0)) { 1814 return true; 1815 } 1816 1817 return false; 1818 } 1819 1820 static void 1821 _fs_request_setup_iovs(struct spdk_fs_request *req, struct iovec *iovs, uint32_t iovcnt) 
1822 { 1823 uint32_t i; 1824 1825 for (i = 0; i < iovcnt; i++) { 1826 req->args.iovs[i].iov_base = iovs[i].iov_base; 1827 req->args.iovs[i].iov_len = iovs[i].iov_len; 1828 } 1829 } 1830 1831 static void 1832 __readvwritev(struct spdk_file *file, struct spdk_io_channel *_channel, 1833 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1834 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1835 { 1836 struct spdk_fs_request *req; 1837 struct spdk_fs_cb_args *args; 1838 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1839 uint64_t start_lba, num_lba, pin_buf_length; 1840 uint32_t lba_size; 1841 1842 if (is_read && offset + length > file->length) { 1843 cb_fn(cb_arg, -EINVAL); 1844 return; 1845 } 1846 1847 req = alloc_fs_request_with_iov(channel, iovcnt); 1848 if (req == NULL) { 1849 cb_fn(cb_arg, -ENOMEM); 1850 return; 1851 } 1852 1853 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1854 1855 args = &req->args; 1856 args->fn.file_op = cb_fn; 1857 args->arg = cb_arg; 1858 args->file = file; 1859 args->op.rw.channel = channel->bs_channel; 1860 _fs_request_setup_iovs(req, iovs, iovcnt); 1861 args->op.rw.is_read = is_read; 1862 args->op.rw.offset = offset; 1863 args->op.rw.blocklen = lba_size; 1864 1865 pin_buf_length = num_lba * lba_size; 1866 args->op.rw.length = pin_buf_length; 1867 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL, 1868 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1869 if (args->op.rw.pin_buf == NULL) { 1870 SPDK_DEBUGLOG(blobfs, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1871 file->name, offset, length); 1872 free_fs_request(req); 1873 cb_fn(cb_arg, -ENOMEM); 1874 return; 1875 } 1876 1877 args->op.rw.start_lba = start_lba; 1878 args->op.rw.num_lba = num_lba; 1879 1880 if (!is_read && file->length < offset + length) { 1881 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1882 } else if (!is_read && __is_lba_aligned(file, 
offset, length)) { 1883 _copy_iovs_to_buf(args->op.rw.pin_buf, args->op.rw.length, args->iovs, args->iovcnt); 1884 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1885 args->op.rw.pin_buf, 1886 args->op.rw.start_lba, args->op.rw.num_lba, 1887 __rw_done, req); 1888 } else { 1889 __do_blob_read(req, 0); 1890 } 1891 } 1892 1893 static void 1894 __readwrite(struct spdk_file *file, struct spdk_io_channel *channel, 1895 void *payload, uint64_t offset, uint64_t length, 1896 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1897 { 1898 struct iovec iov; 1899 1900 iov.iov_base = payload; 1901 iov.iov_len = (size_t)length; 1902 1903 __readvwritev(file, channel, &iov, 1, offset, length, cb_fn, cb_arg, is_read); 1904 } 1905 1906 void 1907 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1908 void *payload, uint64_t offset, uint64_t length, 1909 spdk_file_op_complete cb_fn, void *cb_arg) 1910 { 1911 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1912 } 1913 1914 void 1915 spdk_file_writev_async(struct spdk_file *file, struct spdk_io_channel *channel, 1916 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t length, 1917 spdk_file_op_complete cb_fn, void *cb_arg) 1918 { 1919 SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n", 1920 file->name, offset, length); 1921 1922 __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 0); 1923 } 1924 1925 void 1926 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1927 void *payload, uint64_t offset, uint64_t length, 1928 spdk_file_op_complete cb_fn, void *cb_arg) 1929 { 1930 SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n", 1931 file->name, offset, length); 1932 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1933 } 1934 1935 void 1936 spdk_file_readv_async(struct spdk_file *file, struct spdk_io_channel *channel, 1937 struct iovec *iovs, uint32_t iovcnt, uint64_t offset, uint64_t 
length, 1938 spdk_file_op_complete cb_fn, void *cb_arg) 1939 { 1940 SPDK_DEBUGLOG(blobfs, "file=%s offset=%jx length=%jx\n", 1941 file->name, offset, length); 1942 1943 __readvwritev(file, channel, iovs, iovcnt, offset, length, cb_fn, cb_arg, 1); 1944 } 1945 1946 struct spdk_io_channel * 1947 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1948 { 1949 struct spdk_io_channel *io_channel; 1950 struct spdk_fs_channel *fs_channel; 1951 1952 io_channel = spdk_get_io_channel(&fs->io_target); 1953 fs_channel = spdk_io_channel_get_ctx(io_channel); 1954 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1955 fs_channel->send_request = __send_request_direct; 1956 1957 return io_channel; 1958 } 1959 1960 void 1961 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1962 { 1963 spdk_put_io_channel(channel); 1964 } 1965 1966 struct spdk_fs_thread_ctx * 1967 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs) 1968 { 1969 struct spdk_fs_thread_ctx *ctx; 1970 1971 ctx = calloc(1, sizeof(*ctx)); 1972 if (!ctx) { 1973 return NULL; 1974 } 1975 1976 if (pthread_spin_init(&ctx->ch.lock, 0)) { 1977 free(ctx); 1978 return NULL; 1979 } 1980 1981 fs_channel_create(fs, &ctx->ch, 512); 1982 1983 ctx->ch.send_request = fs->send_request; 1984 ctx->ch.sync = 1; 1985 1986 return ctx; 1987 } 1988 1989 1990 void 1991 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx) 1992 { 1993 assert(ctx->ch.sync == 1); 1994 1995 while (true) { 1996 pthread_spin_lock(&ctx->ch.lock); 1997 if (ctx->ch.outstanding_reqs == 0) { 1998 pthread_spin_unlock(&ctx->ch.lock); 1999 break; 2000 } 2001 pthread_spin_unlock(&ctx->ch.lock); 2002 usleep(1000); 2003 } 2004 2005 fs_channel_destroy(NULL, &ctx->ch); 2006 free(ctx); 2007 } 2008 2009 int 2010 spdk_fs_set_cache_size(uint64_t size_in_mb) 2011 { 2012 /* setting g_fs_cache_size is only permitted if cache pool 2013 * is already freed or hasn't been initialized 2014 */ 2015 if (g_cache_pool != NULL) { 2016 return -EPERM; 2017 } 2018 2019 
g_fs_cache_size = size_in_mb * 1024 * 1024; 2020 2021 return 0; 2022 } 2023 2024 uint64_t 2025 spdk_fs_get_cache_size(void) 2026 { 2027 return g_fs_cache_size / (1024 * 1024); 2028 } 2029 2030 static void __file_flush(void *ctx); 2031 2032 /* Try to free some cache buffers from this file. 2033 */ 2034 static int 2035 reclaim_cache_buffers(struct spdk_file *file) 2036 { 2037 int rc; 2038 2039 BLOBFS_TRACE(file, "free=%s\n", file->name); 2040 2041 /* The function is safe to be called with any threads, while the file 2042 * lock maybe locked by other thread for now, so try to get the file 2043 * lock here. 2044 */ 2045 rc = pthread_spin_trylock(&file->lock); 2046 if (rc != 0) { 2047 return -1; 2048 } 2049 2050 if (file->tree->present_mask == 0) { 2051 pthread_spin_unlock(&file->lock); 2052 return -1; 2053 } 2054 tree_free_buffers(file->tree); 2055 2056 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2057 /* If not freed, put it in the end of the queue */ 2058 if (file->tree->present_mask != 0) { 2059 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2060 } else { 2061 file->last = NULL; 2062 } 2063 pthread_spin_unlock(&file->lock); 2064 2065 return 0; 2066 } 2067 2068 static int 2069 _blobfs_cache_pool_reclaim(void *arg) 2070 { 2071 struct spdk_file *file, *tmp; 2072 int rc; 2073 2074 if (!blobfs_cache_pool_need_reclaim()) { 2075 return SPDK_POLLER_IDLE; 2076 } 2077 2078 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2079 if (!file->open_for_writing && 2080 file->priority == SPDK_FILE_PRIORITY_LOW) { 2081 rc = reclaim_cache_buffers(file); 2082 if (rc < 0) { 2083 continue; 2084 } 2085 if (!blobfs_cache_pool_need_reclaim()) { 2086 return SPDK_POLLER_BUSY; 2087 } 2088 break; 2089 } 2090 } 2091 2092 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2093 if (!file->open_for_writing) { 2094 rc = reclaim_cache_buffers(file); 2095 if (rc < 0) { 2096 continue; 2097 } 2098 if (!blobfs_cache_pool_need_reclaim()) { 2099 return SPDK_POLLER_BUSY; 2100 } 2101 break; 2102 } 
2103 } 2104 2105 TAILQ_FOREACH_SAFE(file, &g_caches, cache_tailq, tmp) { 2106 rc = reclaim_cache_buffers(file); 2107 if (rc < 0) { 2108 continue; 2109 } 2110 break; 2111 } 2112 2113 return SPDK_POLLER_BUSY; 2114 } 2115 2116 static void 2117 _add_file_to_cache_pool(void *ctx) 2118 { 2119 struct spdk_file *file = ctx; 2120 2121 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2122 } 2123 2124 static void 2125 _remove_file_from_cache_pool(void *ctx) 2126 { 2127 struct spdk_file *file = ctx; 2128 2129 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2130 } 2131 2132 static struct cache_buffer * 2133 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 2134 { 2135 struct cache_buffer *buf; 2136 int count = 0; 2137 bool need_update = false; 2138 2139 buf = calloc(1, sizeof(*buf)); 2140 if (buf == NULL) { 2141 SPDK_DEBUGLOG(blobfs, "calloc failed\n"); 2142 return NULL; 2143 } 2144 2145 do { 2146 buf->buf = spdk_mempool_get(g_cache_pool); 2147 if (buf->buf) { 2148 break; 2149 } 2150 if (count++ == 100) { 2151 SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", 2152 file, offset); 2153 free(buf); 2154 return NULL; 2155 } 2156 usleep(BLOBFS_CACHE_POOL_POLL_PERIOD_IN_US); 2157 } while (true); 2158 2159 buf->buf_size = CACHE_BUFFER_SIZE; 2160 buf->offset = offset; 2161 2162 if (file->tree->present_mask == 0) { 2163 need_update = true; 2164 } 2165 file->tree = tree_insert_buffer(file->tree, buf); 2166 2167 if (need_update) { 2168 spdk_thread_send_msg(g_cache_pool_thread, _add_file_to_cache_pool, file); 2169 } 2170 2171 return buf; 2172 } 2173 2174 static struct cache_buffer * 2175 cache_append_buffer(struct spdk_file *file) 2176 { 2177 struct cache_buffer *last; 2178 2179 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 2180 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 2181 2182 last = cache_insert_buffer(file, file->append_pos); 2183 if (last == NULL) { 2184 SPDK_DEBUGLOG(blobfs, "cache_insert_buffer failed\n"); 
2185 return NULL; 2186 } 2187 2188 file->last = last; 2189 2190 return last; 2191 } 2192 2193 static void __check_sync_reqs(struct spdk_file *file); 2194 2195 static void 2196 __file_cache_finish_sync(void *ctx, int bserrno) 2197 { 2198 struct spdk_file *file; 2199 struct spdk_fs_request *sync_req = ctx; 2200 struct spdk_fs_cb_args *sync_args; 2201 2202 sync_args = &sync_req->args; 2203 file = sync_args->file; 2204 pthread_spin_lock(&file->lock); 2205 file->length_xattr = sync_args->op.sync.length; 2206 assert(sync_args->op.sync.offset <= file->length_flushed); 2207 spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset, 2208 0, file->name); 2209 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 2210 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 2211 pthread_spin_unlock(&file->lock); 2212 2213 sync_args->fn.file_op(sync_args->arg, bserrno); 2214 2215 free_fs_request(sync_req); 2216 __check_sync_reqs(file); 2217 } 2218 2219 static void 2220 __check_sync_reqs(struct spdk_file *file) 2221 { 2222 struct spdk_fs_request *sync_req; 2223 2224 pthread_spin_lock(&file->lock); 2225 2226 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 2227 if (sync_req->args.op.sync.offset <= file->length_flushed) { 2228 break; 2229 } 2230 } 2231 2232 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 2233 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 2234 sync_req->args.op.sync.xattr_in_progress = true; 2235 sync_req->args.op.sync.length = file->length_flushed; 2236 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 2237 sizeof(file->length_flushed)); 2238 2239 pthread_spin_unlock(&file->lock); 2240 spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed, 2241 0, file->name); 2242 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, sync_req); 2243 } else { 2244 pthread_spin_unlock(&file->lock); 2245 } 2246 } 2247 2248 static void 2249 
/*
 * Completion callback for the blob write issued by __file_flush().
 * Accounts the flushed bytes, advances length_flushed, wakes any sync
 * requests that are now satisfied, and chains the next flush.
 */
__file_flush_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 * blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	/* Re-enter the flush loop; it frees req when there is nothing left to do. */
	__file_flush(req);
}

/*
 * Flush the next cache buffer (at length_flushed) to the blob, if there is
 * flushable data. Partially filled buffers are flushed only when a sync
 * request is outstanding; otherwise they wait until filled. Owns req: frees
 * it on every path that does not issue an I/O.
 */
static void
__file_flush(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress ||
	    ((next->bytes_filled < next->buf_size) && TAILQ_EMPTY(&file->sync_requests))) {
		/*
		 * There is either no data to flush, a flush I/O is already in
		 * progress, or the next buffer is partially filled but there's no
		 * outstanding request to sync it.
		 * So return immediately - if a flush I/O is in progress we will flush
		 * more data after that is completed, or a partial buffer will get flushed
		 * when it is either filled or the file is synced.
		 */
		free_fs_request(req);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 * file was later appended, we will write the data directly
			 * to disk and bypass cache. So just update length_flushed
			 * here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 * outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		free_fs_request(req);
		pthread_spin_unlock(&file->lock);
		/*
		 * There is no data to flush, but we still need to check for any
		 * outstanding sync requests to make sure metadata gets updated.
		 */
		__check_sync_reqs(file);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=0x%jx length=0x%jx page start=0x%jx num=0x%jx\n",
		     offset, length, start_lba, num_lba);
	/* Drop the lock before issuing the async blob write. */
	pthread_spin_unlock(&file->lock);
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, req);
}

/* Final step of the extend sequence: wake the blocked caller with the
 * metadata-sync result. */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

/* spdk_blob_resize() callback: on success, persist the new cluster count by
 * syncing blob metadata; on failure, wake the caller with the error. */
static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

/* Runs on the fs request thread: start resizing the blob to
 * args->op.resize.num_clusters clusters. */
static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

/* Completion for the async read/write issued by __rw_from_file(); wakes the
 * blocked caller (posts its semaphore) and releases the request. */
static void
__rw_from_file_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;

	__wake_caller(&req->args, bserrno);
	free_fs_request(req);
}

/* Runs on the fs request thread: dispatch the queued cache-bypass I/O as an
 * async file read or write, per args->op.rw.is_read. */
static void
__rw_from_file(void *ctx)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base,
				     args->op.rw.offset, (uint64_t)args->iovs[0].iov_len,
				     __rw_from_file_done, req);
	} else {
spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2406 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2407 __rw_from_file_done, req); 2408 } 2409 } 2410 2411 struct rw_from_file_arg { 2412 struct spdk_fs_channel *channel; 2413 int rwerrno; 2414 }; 2415 2416 static int 2417 __send_rw_from_file(struct spdk_file *file, void *payload, 2418 uint64_t offset, uint64_t length, bool is_read, 2419 struct rw_from_file_arg *arg) 2420 { 2421 struct spdk_fs_request *req; 2422 struct spdk_fs_cb_args *args; 2423 2424 req = alloc_fs_request_with_iov(arg->channel, 1); 2425 if (req == NULL) { 2426 sem_post(&arg->channel->sem); 2427 return -ENOMEM; 2428 } 2429 2430 args = &req->args; 2431 args->file = file; 2432 args->sem = &arg->channel->sem; 2433 args->iovs[0].iov_base = payload; 2434 args->iovs[0].iov_len = (size_t)length; 2435 args->op.rw.offset = offset; 2436 args->op.rw.is_read = is_read; 2437 args->rwerrno = &arg->rwerrno; 2438 file->fs->send_request(__rw_from_file, req); 2439 return 0; 2440 } 2441 2442 int 2443 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2444 void *payload, uint64_t offset, uint64_t length) 2445 { 2446 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2447 struct spdk_fs_request *flush_req; 2448 uint64_t rem_length, copy, blob_size, cluster_sz; 2449 uint32_t cache_buffers_filled = 0; 2450 uint8_t *cur_payload; 2451 struct cache_buffer *last; 2452 2453 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2454 2455 if (length == 0) { 2456 return 0; 2457 } 2458 2459 if (offset != file->append_pos) { 2460 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2461 return -EINVAL; 2462 } 2463 2464 pthread_spin_lock(&file->lock); 2465 file->open_for_writing = true; 2466 2467 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2468 cache_append_buffer(file); 2469 } 2470 2471 if (file->last == NULL) { 2472 struct 
rw_from_file_arg arg = {}; 2473 int rc; 2474 2475 arg.channel = channel; 2476 arg.rwerrno = 0; 2477 file->append_pos += length; 2478 pthread_spin_unlock(&file->lock); 2479 rc = __send_rw_from_file(file, payload, offset, length, false, &arg); 2480 if (rc != 0) { 2481 return rc; 2482 } 2483 sem_wait(&channel->sem); 2484 return arg.rwerrno; 2485 } 2486 2487 blob_size = __file_get_blob_size(file); 2488 2489 if ((offset + length) > blob_size) { 2490 struct spdk_fs_cb_args extend_args = {}; 2491 2492 cluster_sz = file->fs->bs_opts.cluster_sz; 2493 extend_args.sem = &channel->sem; 2494 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2495 extend_args.file = file; 2496 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2497 pthread_spin_unlock(&file->lock); 2498 file->fs->send_request(__file_extend_blob, &extend_args); 2499 sem_wait(&channel->sem); 2500 if (extend_args.rc) { 2501 return extend_args.rc; 2502 } 2503 } 2504 2505 flush_req = alloc_fs_request(channel); 2506 if (flush_req == NULL) { 2507 pthread_spin_unlock(&file->lock); 2508 return -ENOMEM; 2509 } 2510 2511 last = file->last; 2512 rem_length = length; 2513 cur_payload = payload; 2514 while (rem_length > 0) { 2515 copy = last->buf_size - last->bytes_filled; 2516 if (copy > rem_length) { 2517 copy = rem_length; 2518 } 2519 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2520 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2521 file->append_pos += copy; 2522 if (file->length < file->append_pos) { 2523 file->length = file->append_pos; 2524 } 2525 cur_payload += copy; 2526 last->bytes_filled += copy; 2527 rem_length -= copy; 2528 if (last->bytes_filled == last->buf_size) { 2529 cache_buffers_filled++; 2530 last = cache_append_buffer(file); 2531 if (last == NULL) { 2532 BLOBFS_TRACE(file, "nomem\n"); 2533 free_fs_request(flush_req); 2534 pthread_spin_unlock(&file->lock); 2535 return -ENOMEM; 2536 } 
2537 } 2538 } 2539 2540 pthread_spin_unlock(&file->lock); 2541 2542 if (cache_buffers_filled == 0) { 2543 free_fs_request(flush_req); 2544 return 0; 2545 } 2546 2547 flush_req->args.file = file; 2548 file->fs->send_request(__file_flush, flush_req); 2549 return 0; 2550 } 2551 2552 static void 2553 __readahead_done(void *ctx, int bserrno) 2554 { 2555 struct spdk_fs_request *req = ctx; 2556 struct spdk_fs_cb_args *args = &req->args; 2557 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2558 struct spdk_file *file = args->file; 2559 2560 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2561 2562 pthread_spin_lock(&file->lock); 2563 cache_buffer->bytes_filled = args->op.readahead.length; 2564 cache_buffer->bytes_flushed = args->op.readahead.length; 2565 cache_buffer->in_progress = false; 2566 pthread_spin_unlock(&file->lock); 2567 2568 free_fs_request(req); 2569 } 2570 2571 static void 2572 __readahead(void *ctx) 2573 { 2574 struct spdk_fs_request *req = ctx; 2575 struct spdk_fs_cb_args *args = &req->args; 2576 struct spdk_file *file = args->file; 2577 uint64_t offset, length, start_lba, num_lba; 2578 uint32_t lba_size; 2579 2580 offset = args->op.readahead.offset; 2581 length = args->op.readahead.length; 2582 assert(length > 0); 2583 2584 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2585 2586 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2587 offset, length, start_lba, num_lba); 2588 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2589 args->op.readahead.cache_buffer->buf, 2590 start_lba, num_lba, __readahead_done, req); 2591 } 2592 2593 static uint64_t 2594 __next_cache_buffer_offset(uint64_t offset) 2595 { 2596 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2597 } 2598 2599 static void 2600 check_readahead(struct spdk_file *file, uint64_t offset, 2601 struct spdk_fs_channel *channel) 2602 { 2603 struct spdk_fs_request *req; 2604 
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	/* Skip if already cached or past EOF - readahead is best-effort. */
	if (tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		return;
	}
	args = &req->args;

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free_fs_request(req);
		return;
	}

	args->op.readahead.cache_buffer->in_progress = true;
	/* Read only up to EOF for the last (partial) buffer. */
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, req);
}

/*
 * Blocking read. Serves each cache-buffer-sized piece from the cache when
 * present (memcpy), otherwise issues a cache-bypass read and waits for it at
 * the end. Sequential access (tracked via next_seq_offset/seq_byte_count)
 * triggers readahead. Returns the number of bytes read, or a negative errno.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	struct cache_buffer *buf;
	uint64_t read_len;
	struct rw_from_file_arg arg = {};

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	/* Clamp the read to the current end of file. */
	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	/* Sequential-access detection: reset the byte counter on any seek. */
	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset, channel);
		check_readahead(file, offset + CACHE_BUFFER_SIZE, channel);
	}

	arg.channel = channel;
	arg.rwerrno = 0;
	final_length = 0;
	final_offset = offset + length;
	/* Process the range one cache-buffer-aligned piece at a time. */
	while (offset < final_offset) {
		int ret = 0;
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}

		buf = tree_find_filled_buffer(file->tree, offset);
		if (buf == NULL) {
			/* Cache miss: read directly from the blob. The lock is
			 * dropped around the dispatch; the wait happens below. */
			pthread_spin_unlock(&file->lock);
			ret = __send_rw_from_file(file, payload, offset, length, true, &arg);
			pthread_spin_lock(&file->lock);
			if (ret == 0) {
				sub_reads++;
			}
		} else {
			read_len = length;
			if ((offset + length) > (buf->offset + buf->bytes_filled)) {
				read_len = buf->offset + buf->bytes_filled - offset;
			}
			BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, read_len);
			memcpy(payload, &buf->buf[offset - buf->offset], read_len);
			/* Once a buffer has been fully consumed, drop it; when the
			 * tree becomes empty, detach the file from the cache pool
			 * on its owning thread. */
			if ((offset + read_len) % CACHE_BUFFER_SIZE == 0) {
				tree_remove_buffer(file->tree, buf);
				if (file->tree->present_mask == 0) {
					spdk_thread_send_msg(g_cache_pool_thread, _remove_file_from_cache_pool, file);
				}
			}
		}

		if (ret == 0) {
			final_length += length;
		} else {
			arg.rwerrno = ret;
			break;
		}
		payload += length;
		offset += length;
	}
	pthread_spin_unlock(&file->lock);
	/* Wait for every cache-bypass read issued above. */
	while (sub_reads > 0) {
		sem_wait(&channel->sem);
		sub_reads--;
	}
	if (arg.rwerrno == 0) {
		return final_length;
	} else {
		return arg.rwerrno;
	}
}

/*
 * Common sync implementation: queue a sync request for the current
 * append_pos and kick a flush; the sync completes (via __check_sync_reqs /
 * __file_cache_finish_sync) once the data and the "length" xattr are durable.
 * Completes immediately if append_pos is already covered by length_xattr.
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	/* Everything up to append_pos is already durable - nothing to do. */
	if (file->append_pos <= file->length_xattr) {
		BLOBFS_TRACE(file, "done - file already synced\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	if (!sync_req) {
		SPDK_ERRLOG("Cannot allocate sync req for file=%s\n", file->name);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	if (!flush_req) {
		SPDK_ERRLOG("Cannot allocate flush req for file=%s\n", file->name);
		free_fs_request(sync_req);
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}
	flush_args = &flush_req->args;

	/* Queue the sync request; it completes once length_flushed reaches
	 * op.sync.offset and the xattr update lands. */
	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* Kick the flush pipeline so partially filled buffers get written. */
	flush_args->file = file;
	channel->send_request(__file_flush, flush_req);
}

/*
 * Blocking sync: wraps _file_sync() with a semaphore wait on the caller's
 * channel. Returns 0 or a negative errno.
 */
int
spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_cb_args args = {};

	args.sem = &channel->sem;
	_file_sync(file, channel, __wake_caller, &args);
	sem_wait(&channel->sem);

	return args.rc;
}

/* Async sync: cb_fn is invoked with 0 or a negative errno when the file's
 * data and length metadata are durable. */
void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}

/* Record the file's cache priority hint. */
void
spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
{
	BLOBFS_TRACE(file, "priority=%u\n", priority);
	file->priority = priority;

}

/*
 * Close routines
 */

/*
 * spdk_blob_close() callback for an async file close. If the file was
 * deleted while still open, the actual blob deletion was deferred until now;
 * in that case ownership of req passes to the delete path (blob_delete_cb).
 */
static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->name);

	if (file->is_deleted) {
		/* Deferred delete: blob_delete_cb completes and frees req. */
		spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx);
		return;
	}

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/*
 * Drop one reference on the file. The blob is closed only when the last
 * reference goes away; earlier closes complete immediately. Closing an
 * unopened file (ref_count == 0) completes with -EBADF.
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	struct spdk_blob *blob;

	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		pthread_spin_unlock(&file->lock);
		req->args.fn.file_op(req->args.arg, 0);
		free_fs_request(req);
		return;
	}

	pthread_spin_unlock(&file->lock);

	/* Last reference: close the underlying blob. */
	blob = file->blob;
	file->blob = NULL;
	spdk_blob_close(blob, __file_close_async_done, req);
}

/* Continuation of spdk_file_close_async(): runs after the pre-close sync
 * finishes, then performs the actual close. */
static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}

/* Async close: syncs the file first (on the metadata channel), then drops a
 * reference / closes the blob. cb_fn receives 0 or a negative errno. */
void
spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close async req for file=%s\n", file->name);
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	/* Sync first so the close never loses buffered data. */
	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
}

/* Runs on the fs request thread: perform the close for spdk_file_close(). */
static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}

/*
 * Blocking close: syncs the file, then dispatches the close to the fs
 * request thread and waits for it. Returns 0 or a negative errno.
 */
int
spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	if (req == NULL) {
		SPDK_ERRLOG("Cannot allocate close req for file=%s\n", file->name);
		return -ENOMEM;
	}

	args = &req->args;

	spdk_file_sync(file, ctx);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __wake_caller;
	args->arg = args;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}

/*
 * Copy the file's unique identifier (its blob ID) into `id`.
 * Returns the number of bytes written, or -EINVAL if `size` is too small.
 */
int
spdk_file_get_id(struct spdk_file *file, void *id, size_t size)
{
	if (size < sizeof(spdk_blob_id)) {
		return -EINVAL;
	}

	memcpy(id, &file->blobid, sizeof(spdk_blob_id));

	return sizeof(spdk_blob_id);
}

/* Runs on the cache pool thread: unlink the file from the global cache list
 * and release its memory. */
static void
_file_free(void *ctx)
{
	struct spdk_file *file = ctx;

	TAILQ_REMOVE(&g_caches, file, cache_tailq);

	free(file->name);
	free(file->tree);
	free(file);
}

/*
 * Free a file object. With no cached buffers (present_mask == 0) the file is
 * freed directly - presumably it was never linked into g_caches; verify
 * against the insertion path. Otherwise the buffers are released here and
 * the final free is deferred to the cache pool thread, which owns g_caches.
 */
static void
file_free(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&file->lock);
		free(file->name);
		free(file->tree);
		free(file);
		return;
	}

	tree_free_buffers(file->tree);
	assert(file->tree->present_mask == 0);
	spdk_thread_send_msg(g_cache_pool_thread, _file_free, file);
	pthread_spin_unlock(&file->lock);
}

SPDK_LOG_REGISTER_COMPONENT(blobfs)
SPDK_LOG_REGISTER_COMPONENT(blobfs_rw)