1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk/util.h" 44 #include "spdk_internal/log.h" 45 46 #define BLOBFS_TRACE(file, str, args...) \ 47 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 48 49 #define BLOBFS_TRACE_RW(file, str, args...) \ 50 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 51 52 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 53 54 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 55 static struct spdk_mempool *g_cache_pool; 56 static TAILQ_HEAD(, spdk_file) g_caches; 57 static int g_fs_count = 0; 58 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 59 static pthread_spinlock_t g_caches_lock; 60 61 static void 62 __sem_post(void *arg, int bserrno) 63 { 64 sem_t *sem = arg; 65 66 sem_post(sem); 67 } 68 69 void 70 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 71 { 72 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 73 free(cache_buffer); 74 } 75 76 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 77 78 struct spdk_file { 79 struct spdk_filesystem *fs; 80 struct spdk_blob *blob; 81 char *name; 82 uint64_t length; 83 bool is_deleted; 84 bool open_for_writing; 85 uint64_t length_flushed; 86 uint64_t append_pos; 87 uint64_t seq_byte_count; 88 uint64_t next_seq_offset; 89 uint32_t priority; 90 TAILQ_ENTRY(spdk_file) tailq; 91 spdk_blob_id blobid; 92 uint32_t ref_count; 93 pthread_spinlock_t lock; 94 struct cache_buffer *last; 95 struct cache_tree *tree; 96 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 97 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 98 TAILQ_ENTRY(spdk_file) cache_tailq; 99 }; 100 101 struct spdk_deleted_file { 102 spdk_blob_id id; 103 TAILQ_ENTRY(spdk_deleted_file) tailq; 104 }; 105 106 struct spdk_filesystem { 107 struct spdk_blob_store *bs; 108 TAILQ_HEAD(, spdk_file) files; 109 struct spdk_bs_opts bs_opts; 110 struct spdk_bs_dev *bdev; 111 fs_send_request_fn send_request; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *sync_io_channel; 116 struct spdk_fs_channel *sync_fs_channel; 117 } sync_target; 118 119 struct { 120 uint32_t max_ops; 121 struct spdk_io_channel *md_io_channel; 122 struct spdk_fs_channel *md_fs_channel; 123 } md_target; 124 125 struct { 126 uint32_t max_ops; 127 } io_target; 128 }; 129 130 struct spdk_fs_cb_args { 131 union { 132 spdk_fs_op_with_handle_complete fs_op_with_handle; 133 spdk_fs_op_complete fs_op; 134 spdk_file_op_with_handle_complete file_op_with_handle; 135 spdk_file_op_complete file_op; 136 spdk_file_stat_op_complete stat_op; 137 } fn; 138 void *arg; 139 sem_t *sem; 140 struct spdk_filesystem *fs; 141 struct spdk_file *file; 142 int rc; 143 bool from_request; 144 union { 145 struct { 146 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 147 } fs_load; 148 struct { 149 uint64_t length; 150 } truncate; 151 struct { 152 struct spdk_io_channel *channel; 153 void *user_buf; 154 void *pin_buf; 155 int is_read; 156 off_t offset; 157 size_t length; 158 uint64_t start_page; 159 uint64_t num_pages; 160 uint32_t blocklen; 161 } rw; 162 struct { 163 const char *old_name; 164 const char *new_name; 165 } rename; 166 struct { 167 struct cache_buffer *cache_buffer; 168 uint64_t length; 169 } flush; 170 struct { 171 struct cache_buffer *cache_buffer; 172 uint64_t length; 173 uint64_t offset; 174 } readahead; 175 struct { 176 uint64_t offset; 177 TAILQ_ENTRY(spdk_fs_request) tailq; 178 bool xattr_in_progress; 179 } sync; 180 struct { 181 uint32_t num_clusters; 182 } resize; 183 struct { 184 const char *name; 185 uint32_t flags; 186 TAILQ_ENTRY(spdk_fs_request) tailq; 187 } open; 188 struct { 189 const char *name; 190 } create; 191 struct { 192 const char *name; 193 } delete; 194 struct { 195 const char *name; 196 } stat; 197 } op; 198 }; 199 200 static void cache_free_buffers(struct spdk_file *file); 201 202 static void 203 __initialize_cache(void) 204 { 205 assert(g_cache_pool == NULL); 206 207 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 208 g_fs_cache_size / CACHE_BUFFER_SIZE, 209 CACHE_BUFFER_SIZE, 210 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 211 SPDK_ENV_SOCKET_ID_ANY); 212 TAILQ_INIT(&g_caches); 213 pthread_spin_init(&g_caches_lock, 0); 214 } 215 216 static void 217 __free_cache(void) 218 { 219 assert(g_cache_pool != NULL); 220 221 spdk_mempool_free(g_cache_pool); 222 g_cache_pool = NULL; 223 } 224 225 static uint64_t 226 __file_get_blob_size(struct spdk_file *file) 227 { 228 uint64_t cluster_sz; 229 230 cluster_sz = file->fs->bs_opts.cluster_sz; 231 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 232 } 233 234 struct spdk_fs_request { 235 struct spdk_fs_cb_args args; 236 TAILQ_ENTRY(spdk_fs_request) link; 237 struct spdk_fs_channel *channel; 238 }; 239 240 struct spdk_fs_channel { 241 struct spdk_fs_request *req_mem; 242 TAILQ_HEAD(, spdk_fs_request) reqs; 243 sem_t sem; 244 struct spdk_filesystem *fs; 245 struct spdk_io_channel *bs_channel; 246 fs_send_request_fn send_request; 247 bool sync; 248 pthread_spinlock_t lock; 249 }; 250 251 static struct spdk_fs_request * 252 alloc_fs_request(struct spdk_fs_channel *channel) 253 { 254 struct spdk_fs_request *req; 255 256 if (channel->sync) { 257 pthread_spin_lock(&channel->lock); 258 } 259 260 req = TAILQ_FIRST(&channel->reqs); 261 if (req) { 262 TAILQ_REMOVE(&channel->reqs, req, link); 263 } 264 265 if (channel->sync) { 266 pthread_spin_unlock(&channel->lock); 267 } 268 269 if (req == NULL) { 270 return NULL; 271 } 272 memset(req, 0, sizeof(*req)); 273 req->channel = channel; 274 req->args.from_request = true; 275 276 return req; 277 } 278 279 static void 280 free_fs_request(struct spdk_fs_request *req) 281 { 282 struct spdk_fs_channel *channel = req->channel; 283 284 if (channel->sync) { 285 pthread_spin_lock(&channel->lock); 286 } 287 288 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 289 290 if (channel->sync) { 291 pthread_spin_unlock(&channel->lock); 292 } 293 } 294 295 static int 296 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 297 uint32_t max_ops) 298 { 299 uint32_t i; 300 301 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 302 if (!channel->req_mem) { 303 return -1; 304 } 305 306 TAILQ_INIT(&channel->reqs); 307 sem_init(&channel->sem, 0, 0); 308 309 for (i = 0; i < max_ops; i++) { 310 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 311 } 312 313 channel->fs = fs; 314 315 return 0; 316 } 317 318 static int 319 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 320 { 321 struct spdk_filesystem *fs; 322 struct spdk_fs_channel *channel = ctx_buf; 323 324 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 325 326 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 327 } 328 329 static int 330 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 331 { 332 struct spdk_filesystem *fs; 333 struct spdk_fs_channel *channel = ctx_buf; 334 335 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 336 337 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 338 } 339 340 static int 341 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 342 { 343 struct spdk_filesystem *fs; 344 struct spdk_fs_channel *channel = ctx_buf; 345 346 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 347 348 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 349 } 350 351 static void 352 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 353 { 354 struct spdk_fs_channel *channel = ctx_buf; 355 356 free(channel->req_mem); 357 if (channel->bs_channel != NULL) { 358 spdk_bs_free_io_channel(channel->bs_channel); 359 } 360 } 361 362 static void 363 __send_request_direct(fs_request_fn fn, void *arg) 364 { 365 fn(arg); 366 } 367 368 static void 369 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 370 { 371 fs->bs = bs; 372 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 373 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 374 fs->md_target.md_fs_channel->send_request = __send_request_direct; 375 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 376 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 377 378 pthread_mutex_lock(&g_cache_init_lock); 379 if (g_fs_count == 0) { 380 __initialize_cache(); 381 } 382 g_fs_count++; 383 pthread_mutex_unlock(&g_cache_init_lock); 384 } 385 386 static void 387 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 388 { 389 struct spdk_fs_request *req = ctx; 390 struct spdk_fs_cb_args *args = &req->args; 391 struct spdk_filesystem *fs = args->fs; 392 393 if (bserrno == 0) { 394 common_fs_bs_init(fs, bs); 395 } else { 396 free(fs); 397 fs = NULL; 398 } 399 400 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 401 free_fs_request(req); 402 } 403 404 static struct spdk_filesystem * 405 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 406 { 407 struct spdk_filesystem *fs; 408 409 fs = calloc(1, sizeof(*fs)); 410 if (fs == NULL) { 411 return NULL; 412 } 413 414 fs->bdev = dev; 415 fs->send_request = send_request_fn; 416 TAILQ_INIT(&fs->files); 417 418 fs->md_target.max_ops = 512; 419 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 420 sizeof(struct spdk_fs_channel)); 421 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 422 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 423 424 fs->sync_target.max_ops = 512; 425 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 426 sizeof(struct spdk_fs_channel)); 427 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 428 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 429 430 fs->io_target.max_ops = 512; 431 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 432 sizeof(struct spdk_fs_channel)); 433 434 return fs; 435 } 436 437 void 438 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 439 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 440 { 441 struct spdk_filesystem *fs; 442 struct spdk_fs_request *req; 443 struct spdk_fs_cb_args *args; 444 445 fs = fs_alloc(dev, send_request_fn); 446 if (fs == NULL) { 447 cb_fn(cb_arg, NULL, -ENOMEM); 448 return; 449 } 450 451 req = alloc_fs_request(fs->md_target.md_fs_channel); 452 if (req == NULL) { 453 spdk_put_io_channel(fs->md_target.md_io_channel); 454 spdk_io_device_unregister(&fs->md_target, NULL); 455 spdk_put_io_channel(fs->sync_target.sync_io_channel); 456 spdk_io_device_unregister(&fs->sync_target, NULL); 457 spdk_io_device_unregister(&fs->io_target, NULL); 458 free(fs); 459 cb_fn(cb_arg, NULL, -ENOMEM); 460 return; 461 } 462 463 args = &req->args; 464 args->fn.fs_op_with_handle = cb_fn; 465 args->arg = cb_arg; 466 args->fs = fs; 467 468 spdk_bs_init(dev, NULL, init_cb, req); 469 } 470 471 static struct spdk_file * 472 file_alloc(struct spdk_filesystem *fs) 473 { 474 struct spdk_file *file; 475 476 file = calloc(1, sizeof(*file)); 477 if (file == NULL) { 478 return NULL; 479 } 480 481 file->tree = calloc(1, sizeof(*file->tree)); 482 if (file->tree == NULL) { 483 free(file); 484 return NULL; 485 } 486 487 file->fs = fs; 488 TAILQ_INIT(&file->open_requests); 489 TAILQ_INIT(&file->sync_requests); 490 pthread_spin_init(&file->lock, 0); 491 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 492 file->priority = SPDK_FILE_PRIORITY_LOW; 493 return file; 494 } 495 496 static void iter_delete_cb(void *ctx, int bserrno); 497 498 static int 499 _handle_deleted_files(struct spdk_fs_request *req) 500 { 501 struct spdk_fs_cb_args *args = &req->args; 502 struct spdk_filesystem *fs = args->fs; 503 504 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 505 struct spdk_deleted_file *deleted_file; 506 507 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 508 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 509 spdk_bs_md_delete_blob(fs->bs, deleted_file->id, iter_delete_cb, req); 510 free(deleted_file); 511 return 0; 512 } 513 514 return 1; 515 } 516 517 static void 518 iter_delete_cb(void *ctx, int bserrno) 519 { 520 struct spdk_fs_request *req = ctx; 521 struct spdk_fs_cb_args *args = &req->args; 522 struct spdk_filesystem *fs = args->fs; 523 524 if (_handle_deleted_files(req) == 0) 525 return; 526 527 args->fn.fs_op_with_handle(args->arg, fs, 0); 528 free_fs_request(req); 529 530 } 531 532 static void 533 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 534 { 535 struct spdk_fs_request *req = ctx; 536 struct spdk_fs_cb_args *args = &req->args; 537 struct spdk_filesystem *fs = args->fs; 538 uint64_t *length; 539 const char *name; 540 uint32_t *is_deleted; 541 size_t value_len; 542 543 if (rc == -ENOENT) { 544 /* Finished iterating */ 545 if (_handle_deleted_files(req) == 0) 546 return; 547 args->fn.fs_op_with_handle(args->arg, fs, 0); 548 free_fs_request(req); 549 return; 550 } else if (rc < 0) { 551 args->fn.fs_op_with_handle(args->arg, fs, rc); 552 free_fs_request(req); 553 return; 554 } 555 556 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 557 if (rc < 0) { 558 args->fn.fs_op_with_handle(args->arg, fs, rc); 559 free_fs_request(req); 560 return; 561 } 562 563 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 564 if (rc < 0) { 565 args->fn.fs_op_with_handle(args->arg, fs, rc); 566 free_fs_request(req); 567 return; 568 } 569 570 assert(value_len == 8); 571 572 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 573 rc = spdk_bs_md_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 574 if (rc < 0) { 575 struct spdk_file *f; 576 577 f = file_alloc(fs); 578 if (f == NULL) { 579 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 580 free_fs_request(req); 581 return; 582 } 583 584 f->name = strdup(name); 585 f->blobid = spdk_blob_get_id(blob); 586 f->length = *length; 587 f->length_flushed = *length; 588 f->append_pos = *length; 589 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 590 } else { 591 struct spdk_deleted_file *deleted_file; 592 593 deleted_file = calloc(1, sizeof(*deleted_file)); 594 if (deleted_file == NULL) { 595 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 596 free_fs_request(req); 597 return; 598 } 599 deleted_file->id = spdk_blob_get_id(blob); 600 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 601 } 602 603 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 604 } 605 606 static void 607 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 608 { 609 struct spdk_fs_request *req = ctx; 610 struct spdk_fs_cb_args *args = &req->args; 611 struct spdk_filesystem *fs = args->fs; 612 613 if (bserrno != 0) { 614 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 615 free_fs_request(req); 616 free(fs); 617 return; 618 } 619 620 common_fs_bs_init(fs, bs); 621 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 622 } 623 624 void 625 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 626 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 627 { 628 struct spdk_filesystem *fs; 629 struct spdk_fs_cb_args *args; 630 struct spdk_fs_request *req; 631 632 fs = fs_alloc(dev, send_request_fn); 633 if (fs == NULL) { 634 cb_fn(cb_arg, NULL, -ENOMEM); 635 return; 636 } 637 638 req = alloc_fs_request(fs->md_target.md_fs_channel); 639 if (req == NULL) { 640 spdk_put_io_channel(fs->md_target.md_io_channel); 641 spdk_io_device_unregister(&fs->md_target, NULL); 642 spdk_put_io_channel(fs->sync_target.sync_io_channel); 643 spdk_io_device_unregister(&fs->sync_target, NULL); 644 spdk_io_device_unregister(&fs->io_target, NULL); 645 free(fs); 646 cb_fn(cb_arg, NULL, -ENOMEM); 647 return; 648 } 649 650 args = &req->args; 651 args->fn.fs_op_with_handle = cb_fn; 652 args->arg = cb_arg; 653 args->fs = fs; 654 TAILQ_INIT(&args->op.fs_load.deleted_files); 655 spdk_bs_load(dev, load_cb, req); 656 } 657 658 static void 659 unload_cb(void *ctx, int bserrno) 660 { 661 struct spdk_fs_request *req = ctx; 662 struct spdk_fs_cb_args *args = &req->args; 663 struct spdk_filesystem *fs = args->fs; 664 665 pthread_mutex_lock(&g_cache_init_lock); 666 g_fs_count--; 667 if (g_fs_count == 0) { 668 __free_cache(); 669 } 670 pthread_mutex_unlock(&g_cache_init_lock); 671 672 args->fn.fs_op(args->arg, bserrno); 673 free(req); 674 675 spdk_io_device_unregister(&fs->io_target, NULL); 676 spdk_io_device_unregister(&fs->sync_target, NULL); 677 spdk_io_device_unregister(&fs->md_target, NULL); 678 679 free(fs); 680 } 681 682 void 683 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 684 { 685 struct spdk_fs_request *req; 686 struct spdk_fs_cb_args *args; 687 688 /* 689 * We must free the md_channel before unloading the blobstore, so just 690 * allocate this request from the general heap. 691 */ 692 req = calloc(1, sizeof(*req)); 693 if (req == NULL) { 694 cb_fn(cb_arg, -ENOMEM); 695 return; 696 } 697 698 args = &req->args; 699 args->fn.fs_op = cb_fn; 700 args->arg = cb_arg; 701 args->fs = fs; 702 703 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 704 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 705 spdk_bs_unload(fs->bs, unload_cb, req); 706 } 707 708 static struct spdk_file * 709 fs_find_file(struct spdk_filesystem *fs, const char *name) 710 { 711 struct spdk_file *file; 712 713 TAILQ_FOREACH(file, &fs->files, tailq) { 714 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 715 return file; 716 } 717 } 718 719 return NULL; 720 } 721 722 void 723 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 724 spdk_file_stat_op_complete cb_fn, void *cb_arg) 725 { 726 struct spdk_file_stat stat; 727 struct spdk_file *f = NULL; 728 729 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 730 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 731 return; 732 } 733 734 f = fs_find_file(fs, name); 735 if (f != NULL) { 736 stat.blobid = f->blobid; 737 stat.size = f->length; 738 cb_fn(cb_arg, &stat, 0); 739 return; 740 } 741 742 cb_fn(cb_arg, NULL, -ENOENT); 743 } 744 745 static void 746 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 747 { 748 struct spdk_fs_request *req = arg; 749 struct spdk_fs_cb_args *args = &req->args; 750 751 args->rc = fserrno; 752 if (fserrno == 0) { 753 memcpy(args->arg, stat, sizeof(*stat)); 754 } 755 sem_post(args->sem); 756 } 757 758 static void 759 __file_stat(void *arg) 760 { 761 struct spdk_fs_request *req = arg; 762 struct spdk_fs_cb_args *args = &req->args; 763 764 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 765 args->fn.stat_op, req); 766 } 767 768 int 769 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 770 const char *name, struct spdk_file_stat *stat) 771 { 772 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 773 struct spdk_fs_request *req; 774 int rc; 775 776 req = alloc_fs_request(channel); 777 assert(req != NULL); 778 779 req->args.fs = fs; 780 req->args.op.stat.name = name; 781 req->args.fn.stat_op = __copy_stat; 782 req->args.arg = stat; 783 req->args.sem = &channel->sem; 784 channel->send_request(__file_stat, req); 785 sem_wait(&channel->sem); 786 787 rc = req->args.rc; 788 free_fs_request(req); 789 790 return rc; 791 } 792 793 static void 794 fs_create_blob_close_cb(void *ctx, int bserrno) 795 { 796 struct spdk_fs_request *req = ctx; 797 struct spdk_fs_cb_args *args = &req->args; 798 799 args->fn.file_op(args->arg, bserrno); 800 free_fs_request(req); 801 } 802 803 static void 804 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 805 { 806 struct spdk_fs_request *req = ctx; 807 struct spdk_fs_cb_args *args = &req->args; 808 struct spdk_file *f = args->file; 809 uint64_t length = 0; 810 811 f->blob = blob; 812 spdk_bs_md_resize_blob(blob, 1); 813 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 814 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 815 816 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 817 } 818 819 static void 820 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 821 { 822 struct spdk_fs_request *req = ctx; 823 struct spdk_fs_cb_args *args = &req->args; 824 struct spdk_file *f = args->file; 825 826 f->blobid = blobid; 827 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 828 } 829 830 void 831 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 832 spdk_file_op_complete cb_fn, void *cb_arg) 833 { 834 struct spdk_file *file; 835 struct spdk_fs_request *req; 836 struct spdk_fs_cb_args *args; 837 838 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 839 cb_fn(cb_arg, -ENAMETOOLONG); 840 return; 841 } 842 843 file = fs_find_file(fs, name); 844 if (file != NULL) { 845 cb_fn(cb_arg, -EEXIST); 846 return; 847 } 848 849 file = file_alloc(fs); 850 if (file == NULL) { 851 cb_fn(cb_arg, -ENOMEM); 852 return; 853 } 854 855 req = alloc_fs_request(fs->md_target.md_fs_channel); 856 if (req == NULL) { 857 cb_fn(cb_arg, -ENOMEM); 858 return; 859 } 860 861 args = &req->args; 862 args->file = file; 863 args->fn.file_op = cb_fn; 864 args->arg = cb_arg; 865 866 file->name = strdup(name); 867 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 868 } 869 870 static void 871 __fs_create_file_done(void *arg, int fserrno) 872 { 873 struct spdk_fs_request *req = arg; 874 struct spdk_fs_cb_args *args = &req->args; 875 876 args->rc = fserrno; 877 sem_post(args->sem); 878 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 879 } 880 881 static void 882 __fs_create_file(void *arg) 883 { 884 struct spdk_fs_request *req = arg; 885 struct spdk_fs_cb_args *args = &req->args; 886 887 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 888 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 889 } 890 891 int 892 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 893 { 894 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 895 struct spdk_fs_request *req; 896 struct spdk_fs_cb_args *args; 897 int rc; 898 899 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 900 901 req = alloc_fs_request(channel); 902 assert(req != NULL); 903 904 args = &req->args; 905 args->fs = fs; 906 args->op.create.name = name; 907 args->sem = &channel->sem; 908 fs->send_request(__fs_create_file, req); 909 sem_wait(&channel->sem); 910 rc = args->rc; 911 free_fs_request(req); 912 913 return rc; 914 } 915 916 static void 917 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 918 { 919 struct spdk_fs_request *req = ctx; 920 struct spdk_fs_cb_args *args = &req->args; 921 struct spdk_file *f = args->file; 922 923 f->blob = blob; 924 while (!TAILQ_EMPTY(&f->open_requests)) { 925 req = TAILQ_FIRST(&f->open_requests); 926 args = &req->args; 927 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 928 args->fn.file_op_with_handle(args->arg, f, bserrno); 929 free_fs_request(req); 930 } 931 } 932 933 static void 934 fs_open_blob_create_cb(void *ctx, int bserrno) 935 { 936 struct spdk_fs_request *req = ctx; 937 struct spdk_fs_cb_args *args = &req->args; 938 struct spdk_file *file = args->file; 939 struct spdk_filesystem *fs = args->fs; 940 941 if (file == NULL) { 942 /* 943 * This is from an open with CREATE flag - the file 944 * is now created so look it up in the file list for this 945 * filesystem. 946 */ 947 file = fs_find_file(fs, args->op.open.name); 948 assert(file != NULL); 949 args->file = file; 950 } 951 952 file->ref_count++; 953 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 954 if (file->ref_count == 1) { 955 assert(file->blob == NULL); 956 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 957 } else if (file->blob != NULL) { 958 fs_open_blob_done(req, file->blob, 0); 959 } else { 960 /* 961 * The blob open for this file is in progress due to a previous 962 * open request. When that open completes, it will invoke the 963 * open callback for this request. 964 */ 965 } 966 } 967 968 void 969 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 970 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 971 { 972 struct spdk_file *f = NULL; 973 struct spdk_fs_request *req; 974 struct spdk_fs_cb_args *args; 975 976 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 977 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 978 return; 979 } 980 981 f = fs_find_file(fs, name); 982 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 983 cb_fn(cb_arg, NULL, -ENOENT); 984 return; 985 } 986 987 if (f != NULL && f->is_deleted == true) { 988 cb_fn(cb_arg, NULL, -ENOENT); 989 return; 990 } 991 992 req = alloc_fs_request(fs->md_target.md_fs_channel); 993 if (req == NULL) { 994 cb_fn(cb_arg, NULL, -ENOMEM); 995 return; 996 } 997 998 args = &req->args; 999 args->fn.file_op_with_handle = cb_fn; 1000 args->arg = cb_arg; 1001 args->file = f; 1002 args->fs = fs; 1003 args->op.open.name = name; 1004 1005 if (f == NULL) { 1006 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1007 } else { 1008 fs_open_blob_create_cb(req, 0); 1009 } 1010 } 1011 1012 static void 1013 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1014 { 1015 struct spdk_fs_request *req = arg; 1016 struct spdk_fs_cb_args *args = &req->args; 1017 1018 args->file = file; 1019 args->rc = bserrno; 1020 sem_post(args->sem); 1021 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 1022 } 1023 1024 static void 1025 __fs_open_file(void *arg) 1026 { 1027 struct spdk_fs_request *req = arg; 1028 struct spdk_fs_cb_args *args = &req->args; 1029 1030 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 1031 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1032 __fs_open_file_done, req); 1033 } 1034 1035 int 1036 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1037 const char *name, uint32_t flags, struct spdk_file **file) 1038 { 1039 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1040 struct spdk_fs_request *req; 1041 struct spdk_fs_cb_args *args; 1042 int rc; 1043 1044 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1045 1046 req = alloc_fs_request(channel); 1047 assert(req != NULL); 1048 1049 args = &req->args; 1050 args->fs = fs; 1051 args->op.open.name = name; 1052 args->op.open.flags = flags; 1053 args->sem = &channel->sem; 1054 fs->send_request(__fs_open_file, req); 1055 sem_wait(&channel->sem); 1056 rc = args->rc; 1057 if (rc == 0) { 1058 *file = args->file; 1059 } else { 1060 *file = NULL; 1061 } 1062 free_fs_request(req); 1063 1064 return rc; 1065 } 1066 1067 static void 1068 fs_rename_blob_close_cb(void *ctx, int bserrno) 1069 { 1070 struct spdk_fs_request *req = ctx; 1071 struct spdk_fs_cb_args *args = &req->args; 1072 1073 args->fn.fs_op(args->arg, bserrno); 1074 free_fs_request(req); 1075 } 1076 1077 static void 1078 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1079 { 1080 struct spdk_fs_request *req = ctx; 1081 struct spdk_fs_cb_args *args = &req->args; 1082 struct spdk_file *f = args->file; 1083 const char *new_name = args->op.rename.new_name; 1084 1085 f->blob = blob; 1086 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1087 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 1088 } 1089 1090 static void 1091 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1092 { 1093 struct spdk_fs_cb_args *args = &req->args; 1094 struct spdk_file *f; 1095 1096 f = fs_find_file(args->fs, args->op.rename.old_name); 1097 if (f == NULL) { 1098 args->fn.fs_op(args->arg, -ENOENT); 1099 free_fs_request(req); 1100 return; 1101 } 1102 1103 free(f->name); 1104 f->name = strdup(args->op.rename.new_name); 1105 args->file = f; 1106 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1107 } 1108 1109 static void 1110 fs_rename_delete_done(void *arg, int fserrno) 1111 { 1112 __spdk_fs_md_rename_file(arg); 1113 } 1114 1115 void 1116 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1117 const char *old_name, const char *new_name, 1118 spdk_file_op_complete cb_fn, void *cb_arg) 1119 { 1120 struct spdk_file *f; 1121 struct spdk_fs_request *req; 1122 struct spdk_fs_cb_args *args; 1123 1124 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1125 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1126 cb_fn(cb_arg, -ENAMETOOLONG); 1127 return; 1128 } 1129 1130 req = alloc_fs_request(fs->md_target.md_fs_channel); 1131 if (req == NULL) { 1132 cb_fn(cb_arg, -ENOMEM); 1133 return; 1134 } 1135 1136 args = &req->args; 1137 args->fn.fs_op = cb_fn; 1138 args->fs = fs; 1139 args->arg = cb_arg; 1140 args->op.rename.old_name = old_name; 1141 args->op.rename.new_name = new_name; 1142 1143 f = fs_find_file(fs, new_name); 1144 if (f == NULL) { 1145 __spdk_fs_md_rename_file(req); 1146 return; 1147 } 1148 1149 /* 1150 * The rename overwrites an existing file. So delete the existing file, then 1151 * do the actual rename. 1152 */ 1153 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1154 } 1155 1156 static void 1157 __fs_rename_file_done(void *arg, int fserrno) 1158 { 1159 struct spdk_fs_request *req = arg; 1160 struct spdk_fs_cb_args *args = &req->args; 1161 1162 args->rc = fserrno; 1163 sem_post(args->sem); 1164 } 1165 1166 static void 1167 __fs_rename_file(void *arg) 1168 { 1169 struct spdk_fs_request *req = arg; 1170 struct spdk_fs_cb_args *args = &req->args; 1171 1172 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1173 __fs_rename_file_done, req); 1174 } 1175 1176 int 1177 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1178 const char *old_name, const char *new_name) 1179 { 1180 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1181 struct spdk_fs_request *req; 1182 struct spdk_fs_cb_args *args; 1183 int rc; 1184 1185 req = alloc_fs_request(channel); 1186 assert(req != NULL); 1187 1188 args = &req->args; 1189 1190 args->fs = fs; 1191 args->op.rename.old_name = old_name; 1192 args->op.rename.new_name = new_name; 1193 args->sem = &channel->sem; 1194 fs->send_request(__fs_rename_file, req); 1195 sem_wait(&channel->sem); 1196 rc = args->rc; 1197 free_fs_request(req); 1198 return rc; 1199 } 1200 1201 static void 1202 blob_delete_cb(void *ctx, int bserrno) 1203 { 1204 struct spdk_fs_request *req = ctx; 1205 struct spdk_fs_cb_args *args = &req->args; 1206 1207 args->fn.file_op(args->arg, bserrno); 1208 free_fs_request(req); 1209 } 1210 1211 void 1212 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1213 spdk_file_op_complete cb_fn, void *cb_arg) 1214 { 1215 struct spdk_file *f; 1216 spdk_blob_id blobid; 1217 struct spdk_fs_request *req; 1218 struct spdk_fs_cb_args *args; 1219 1220 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1221 1222 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1223 cb_fn(cb_arg, -ENAMETOOLONG); 1224 return; 1225 } 1226 1227 f = fs_find_file(fs, name); 1228 if (f == NULL) { 1229 cb_fn(cb_arg, -ENOENT); 1230 return; 1231 } 1232 1233 req = alloc_fs_request(fs->md_target.md_fs_channel); 1234 if (req == NULL) { 1235 cb_fn(cb_arg, -ENOMEM); 1236 return; 1237 } 1238 1239 args = &req->args; 1240 args->fn.file_op = cb_fn; 1241 args->arg = cb_arg; 1242 1243 if (f->ref_count > 0) { 1244 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1245 f->is_deleted = true; 1246 spdk_blob_md_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1247 spdk_bs_md_sync_blob(f->blob, blob_delete_cb, args); 1248 return; 1249 } 1250 1251 TAILQ_REMOVE(&fs->files, f, tailq); 1252 1253 cache_free_buffers(f); 1254 1255 blobid = f->blobid; 1256 1257 free(f->name); 1258 free(f->tree); 1259 free(f); 1260 1261 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1262 } 1263 1264 static void 1265 __fs_delete_file_done(void *arg, int fserrno) 1266 { 1267 struct spdk_fs_request *req = arg; 1268 struct spdk_fs_cb_args *args = &req->args; 1269 1270 args->rc = fserrno; 1271 sem_post(args->sem); 1272 } 1273 1274 static void 1275 __fs_delete_file(void *arg) 1276 { 1277 struct spdk_fs_request *req = arg; 1278 struct spdk_fs_cb_args *args = &req->args; 1279 1280 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1281 } 1282 1283 int 1284 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1285 const char *name) 1286 { 1287 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1288 struct spdk_fs_request *req; 1289 struct spdk_fs_cb_args *args; 1290 int rc; 1291 1292 req = alloc_fs_request(channel); 1293 assert(req != NULL); 1294 1295 args = &req->args; 1296 args->fs = fs; 1297 args->op.delete.name = name; 1298 args->sem = &channel->sem; 1299 fs->send_request(__fs_delete_file, req); 1300 sem_wait(&channel->sem); 1301 rc = args->rc; 1302 free_fs_request(req); 1303 1304 return rc; 1305 } 1306 1307 spdk_fs_iter 1308 spdk_fs_iter_first(struct spdk_filesystem *fs) 1309 { 1310 struct spdk_file *f; 1311 1312 f = TAILQ_FIRST(&fs->files); 1313 return f; 1314 } 1315 1316 spdk_fs_iter 1317 spdk_fs_iter_next(spdk_fs_iter iter) 1318 { 1319 struct spdk_file *f = iter; 1320 1321 if (f == NULL) { 1322 return NULL; 1323 } 1324 1325 f = TAILQ_NEXT(f, tailq); 1326 return f; 1327 } 1328 1329 const char * 1330 spdk_file_get_name(struct spdk_file *file) 1331 { 1332 return file->name; 1333 } 1334 1335 uint64_t 1336 spdk_file_get_length(struct spdk_file *file) 1337 { 1338 assert(file != NULL); 1339 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1340 return file->length; 1341 } 1342 1343 static void 1344 fs_truncate_complete_cb(void *ctx, int bserrno) 1345 { 1346 struct spdk_fs_request *req = ctx; 1347 struct spdk_fs_cb_args *args = &req->args; 1348 1349 args->fn.file_op(args->arg, bserrno); 1350 free_fs_request(req); 1351 } 1352 1353 static uint64_t 1354 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1355 { 1356 return (length + cluster_sz - 1) / cluster_sz; 1357 } 1358 1359 void 1360 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1361 spdk_file_op_complete cb_fn, void *cb_arg) 1362 { 1363 struct spdk_filesystem *fs; 1364 size_t num_clusters; 1365 struct spdk_fs_request *req; 1366 struct spdk_fs_cb_args *args; 1367 1368 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1369 if (length == file->length) { 1370 cb_fn(cb_arg, 0); 1371 return; 1372 } 1373 1374 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1375 if (req == NULL) { 1376 cb_fn(cb_arg, -ENOMEM); 1377 return; 1378 } 1379 1380 args = &req->args; 1381 args->fn.file_op = cb_fn; 1382 args->arg = cb_arg; 1383 args->file = file; 1384 fs = file->fs; 1385 1386 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1387 1388 spdk_bs_md_resize_blob(file->blob, num_clusters); 1389 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1390 1391 file->length = length; 1392 if (file->append_pos > file->length) { 1393 file->append_pos = file->length; 1394 } 1395 1396 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1397 } 1398 1399 static void 1400 __truncate(void *arg) 1401 { 1402 struct spdk_fs_request *req = arg; 1403 struct spdk_fs_cb_args *args = &req->args; 1404 1405 spdk_file_truncate_async(args->file, args->op.truncate.length, 1406 args->fn.file_op, args->arg); 1407 } 1408 1409 void 1410 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1411 uint64_t length) 1412 { 1413 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1414 struct spdk_fs_request *req; 1415 struct spdk_fs_cb_args *args; 1416 1417 req = alloc_fs_request(channel); 1418 assert(req != NULL); 1419 1420 args = &req->args; 1421 1422 args->file = file; 1423 args->op.truncate.length = length; 1424 args->fn.file_op = __sem_post; 1425 args->arg = &channel->sem; 1426 1427 channel->send_request(__truncate, req); 1428 sem_wait(&channel->sem); 1429 free_fs_request(req); 1430 } 1431 1432 static void 1433 __rw_done(void *ctx, int bserrno) 1434 { 1435 struct spdk_fs_request *req = ctx; 1436 struct spdk_fs_cb_args *args = &req->args; 1437 1438 spdk_dma_free(args->op.rw.pin_buf); 1439 args->fn.file_op(args->arg, bserrno); 1440 free_fs_request(req); 1441 } 1442 1443 static void 1444 __read_done(void *ctx, int bserrno) 1445 { 1446 struct spdk_fs_request *req = ctx; 1447 struct spdk_fs_cb_args *args = &req->args; 1448 1449 if (args->op.rw.is_read) { 1450 memcpy(args->op.rw.user_buf, 1451 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1452 args->op.rw.length); 1453 __rw_done(req, 0); 1454 } else { 1455 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1456 args->op.rw.user_buf, 1457 args->op.rw.length); 1458 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1459 args->op.rw.pin_buf, 1460 args->op.rw.start_page, args->op.rw.num_pages, 1461 __rw_done, req); 1462 } 1463 } 1464 1465 static void 1466 __do_blob_read(void *ctx, int fserrno) 1467 { 1468 struct spdk_fs_request *req = ctx; 1469 struct spdk_fs_cb_args *args = &req->args; 1470 1471 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1472 args->op.rw.pin_buf, 1473 args->op.rw.start_page, args->op.rw.num_pages, 1474 __read_done, req); 1475 } 1476 1477 static void 1478 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1479 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1480 { 1481 uint64_t end_page; 1482 1483 *page_size = spdk_bs_get_page_size(file->fs->bs); 1484 *start_page = offset / *page_size; 1485 end_page = (offset + length - 1) / *page_size; 1486 *num_pages = (end_page - *start_page + 1); 1487 } 1488 1489 static void 1490 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1491 void *payload, uint64_t offset, uint64_t length, 1492 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1493 { 1494 struct spdk_fs_request *req; 1495 struct spdk_fs_cb_args *args; 1496 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1497 uint64_t start_page, num_pages, pin_buf_length; 1498 uint32_t page_size; 1499 1500 if (is_read && offset + length > file->length) { 1501 cb_fn(cb_arg, -EINVAL); 1502 return; 1503 } 1504 1505 req = alloc_fs_request(channel); 1506 if (req == NULL) { 1507 cb_fn(cb_arg, -ENOMEM); 1508 return; 1509 } 1510 1511 args = &req->args; 1512 args->fn.file_op = cb_fn; 1513 args->arg = cb_arg; 1514 args->file = file; 1515 args->op.rw.channel = channel->bs_channel; 1516 args->op.rw.user_buf = payload; 1517 args->op.rw.is_read = is_read; 1518 args->op.rw.offset = offset; 1519 args->op.rw.length = length; 1520 1521 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1522 pin_buf_length = num_pages * page_size; 1523 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1524 1525 args->op.rw.start_page = start_page; 1526 args->op.rw.num_pages = num_pages; 1527 1528 if (!is_read && file->length < offset + length) { 1529 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1530 } else { 1531 __do_blob_read(req, 0); 1532 } 1533 } 1534 1535 void 1536 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1537 void *payload, uint64_t offset, uint64_t length, 1538 spdk_file_op_complete cb_fn, void *cb_arg) 1539 { 1540 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1541 } 1542 1543 void 1544 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1545 void *payload, uint64_t offset, uint64_t length, 1546 spdk_file_op_complete cb_fn, void *cb_arg) 1547 { 1548 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1549 file->name, offset, length); 1550 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1551 } 1552 1553 struct spdk_io_channel * 1554 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1555 { 1556 struct spdk_io_channel *io_channel; 1557 struct spdk_fs_channel *fs_channel; 1558 1559 io_channel = spdk_get_io_channel(&fs->io_target); 1560 fs_channel = spdk_io_channel_get_ctx(io_channel); 1561 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1562 fs_channel->send_request = __send_request_direct; 1563 1564 return io_channel; 1565 } 1566 1567 struct spdk_io_channel * 1568 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1569 { 1570 struct spdk_io_channel *io_channel; 1571 struct spdk_fs_channel *fs_channel; 1572 1573 io_channel = spdk_get_io_channel(&fs->io_target); 1574 fs_channel = spdk_io_channel_get_ctx(io_channel); 1575 fs_channel->send_request = fs->send_request; 1576 fs_channel->sync = 1; 1577 pthread_spin_init(&fs_channel->lock, 0); 1578 1579 return io_channel; 1580 } 1581 1582 void 1583 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1584 { 1585 spdk_put_io_channel(channel); 1586 } 1587 1588 void 1589 spdk_fs_set_cache_size(uint64_t size_in_mb) 1590 { 1591 g_fs_cache_size = size_in_mb * 1024 * 1024; 1592 } 1593 1594 uint64_t 1595 spdk_fs_get_cache_size(void) 1596 { 1597 return g_fs_cache_size / (1024 * 1024); 1598 } 1599 1600 static void __file_flush(void *_args); 1601 1602 static void * 1603 alloc_cache_memory_buffer(struct spdk_file *context) 1604 { 1605 struct spdk_file *file; 1606 void *buf; 1607 1608 buf = spdk_mempool_get(g_cache_pool); 1609 if (buf != NULL) { 1610 return buf; 1611 } 1612 1613 pthread_spin_lock(&g_caches_lock); 1614 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1615 if (!file->open_for_writing && 1616 file->priority == SPDK_FILE_PRIORITY_LOW && 1617 file != context) { 1618 break; 1619 } 1620 } 1621 pthread_spin_unlock(&g_caches_lock); 1622 if (file != NULL) { 1623 cache_free_buffers(file); 1624 buf = spdk_mempool_get(g_cache_pool); 1625 if (buf != NULL) { 1626 return buf; 1627 } 1628 } 1629 1630 pthread_spin_lock(&g_caches_lock); 1631 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1632 if (!file->open_for_writing && file != context) { 1633 break; 1634 } 1635 } 1636 pthread_spin_unlock(&g_caches_lock); 1637 if (file != NULL) { 1638 cache_free_buffers(file); 1639 buf = spdk_mempool_get(g_cache_pool); 1640 if (buf != NULL) { 1641 return buf; 1642 } 1643 } 1644 1645 pthread_spin_lock(&g_caches_lock); 1646 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1647 if (file != context) { 1648 break; 1649 } 1650 } 1651 pthread_spin_unlock(&g_caches_lock); 1652 if (file != NULL) { 1653 cache_free_buffers(file); 1654 buf = spdk_mempool_get(g_cache_pool); 1655 if (buf != NULL) { 1656 return buf; 1657 } 1658 } 1659 1660 return NULL; 1661 } 1662 1663 static struct cache_buffer * 1664 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1665 { 1666 struct cache_buffer *buf; 1667 int count = 0; 1668 1669 buf = calloc(1, sizeof(*buf)); 1670 if (buf == NULL) { 1671 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1672 return NULL; 1673 } 1674 1675 buf->buf = alloc_cache_memory_buffer(file); 1676 while (buf->buf == NULL) { 1677 /* 1678 * TODO: alloc_cache_memory_buffer() should eventually free 1679 * some buffers. Need a more sophisticated check here, instead 1680 * of just bailing if 100 tries does not result in getting a 1681 * free buffer. This will involve using the sync channel's 1682 * semaphore to block until a buffer becomes available. 1683 */ 1684 if (count++ == 100) { 1685 SPDK_ERRLOG("could not allocate cache buffer\n"); 1686 assert(false); 1687 free(buf); 1688 return NULL; 1689 } 1690 buf->buf = alloc_cache_memory_buffer(file); 1691 } 1692 1693 buf->buf_size = CACHE_BUFFER_SIZE; 1694 buf->offset = offset; 1695 1696 pthread_spin_lock(&g_caches_lock); 1697 if (file->tree->present_mask == 0) { 1698 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1699 } 1700 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1701 pthread_spin_unlock(&g_caches_lock); 1702 1703 return buf; 1704 } 1705 1706 static struct cache_buffer * 1707 cache_append_buffer(struct spdk_file *file) 1708 { 1709 struct cache_buffer *last; 1710 1711 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1712 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1713 1714 last = cache_insert_buffer(file, file->append_pos); 1715 if (last == NULL) { 1716 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1717 return NULL; 1718 } 1719 1720 file->last = last; 1721 1722 return last; 1723 } 1724 1725 static void 1726 __wake_caller(struct spdk_fs_cb_args *args) 1727 { 1728 sem_post(args->sem); 1729 } 1730 1731 static void __check_sync_reqs(struct spdk_file *file); 1732 1733 static void 1734 __file_cache_finish_sync(struct spdk_file *file) 1735 { 1736 struct spdk_fs_request *sync_req; 1737 struct spdk_fs_cb_args *sync_args; 1738 1739 pthread_spin_lock(&file->lock); 1740 sync_req = TAILQ_FIRST(&file->sync_requests); 1741 sync_args = &sync_req->args; 1742 assert(sync_args->op.sync.offset <= file->length_flushed); 1743 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1744 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1745 pthread_spin_unlock(&file->lock); 1746 1747 sync_args->fn.file_op(sync_args->arg, 0); 1748 __check_sync_reqs(file); 1749 1750 pthread_spin_lock(&file->lock); 1751 free_fs_request(sync_req); 1752 pthread_spin_unlock(&file->lock); 1753 } 1754 1755 static void 1756 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1757 { 1758 struct spdk_file *file = ctx; 1759 1760 __file_cache_finish_sync(file); 1761 } 1762 1763 static void 1764 __free_args(struct spdk_fs_cb_args *args) 1765 { 1766 struct spdk_fs_request *req; 1767 1768 if (!args->from_request) { 1769 free(args); 1770 } else { 1771 /* Depends on args being at the start of the spdk_fs_request structure. */ 1772 req = (struct spdk_fs_request *)args; 1773 free_fs_request(req); 1774 } 1775 } 1776 1777 static void 1778 __check_sync_reqs(struct spdk_file *file) 1779 { 1780 struct spdk_fs_request *sync_req; 1781 1782 pthread_spin_lock(&file->lock); 1783 1784 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1785 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1786 break; 1787 } 1788 } 1789 1790 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1791 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1792 sync_req->args.op.sync.xattr_in_progress = true; 1793 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1794 sizeof(file->length_flushed)); 1795 1796 pthread_spin_unlock(&file->lock); 1797 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1798 } else { 1799 pthread_spin_unlock(&file->lock); 1800 } 1801 } 1802 1803 static void 1804 __file_flush_done(void *arg, int bserrno) 1805 { 1806 struct spdk_fs_cb_args *args = arg; 1807 struct spdk_file *file = args->file; 1808 struct cache_buffer *next = args->op.flush.cache_buffer; 1809 1810 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1811 1812 pthread_spin_lock(&file->lock); 1813 next->in_progress = false; 1814 next->bytes_flushed += args->op.flush.length; 1815 file->length_flushed += args->op.flush.length; 1816 if (file->length_flushed > file->length) { 1817 file->length = file->length_flushed; 1818 } 1819 if (next->bytes_flushed == next->buf_size) { 1820 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1821 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1822 } 1823 1824 /* 1825 * Assert that there is no cached data that extends past the end of the underlying 1826 * blob. 1827 */ 1828 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1829 next->bytes_filled == 0); 1830 1831 pthread_spin_unlock(&file->lock); 1832 1833 __check_sync_reqs(file); 1834 1835 __file_flush(args); 1836 } 1837 1838 static void 1839 __file_flush(void *_args) 1840 { 1841 struct spdk_fs_cb_args *args = _args; 1842 struct spdk_file *file = args->file; 1843 struct cache_buffer *next; 1844 uint64_t offset, length, start_page, num_pages; 1845 uint32_t page_size; 1846 1847 pthread_spin_lock(&file->lock); 1848 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1849 if (next == NULL || next->in_progress) { 1850 /* 1851 * There is either no data to flush, or a flush I/O is already in 1852 * progress. So return immediately - if a flush I/O is in 1853 * progress we will flush more data after that is completed. 1854 */ 1855 __free_args(args); 1856 pthread_spin_unlock(&file->lock); 1857 return; 1858 } 1859 1860 offset = next->offset + next->bytes_flushed; 1861 length = next->bytes_filled - next->bytes_flushed; 1862 if (length == 0) { 1863 __free_args(args); 1864 pthread_spin_unlock(&file->lock); 1865 return; 1866 } 1867 args->op.flush.length = length; 1868 args->op.flush.cache_buffer = next; 1869 1870 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1871 1872 next->in_progress = true; 1873 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1874 offset, length, start_page, num_pages); 1875 pthread_spin_unlock(&file->lock); 1876 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1877 next->buf + (start_page * page_size) - next->offset, 1878 start_page, num_pages, 1879 __file_flush_done, args); 1880 } 1881 1882 static void 1883 __file_extend_done(void *arg, int bserrno) 1884 { 1885 struct spdk_fs_cb_args *args = arg; 1886 1887 __wake_caller(args); 1888 } 1889 1890 static void 1891 __file_extend_blob(void *_args) 1892 { 1893 struct spdk_fs_cb_args *args = _args; 1894 struct spdk_file *file = args->file; 1895 1896 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1897 1898 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1899 } 1900 1901 static void 1902 __rw_from_file_done(void *arg, int bserrno) 1903 { 1904 struct spdk_fs_cb_args *args = arg; 1905 1906 __wake_caller(args); 1907 __free_args(args); 1908 } 1909 1910 static void 1911 __rw_from_file(void *_args) 1912 { 1913 struct spdk_fs_cb_args *args = _args; 1914 struct spdk_file *file = args->file; 1915 1916 if (args->op.rw.is_read) { 1917 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1918 args->op.rw.offset, args->op.rw.length, 1919 __rw_from_file_done, args); 1920 } else { 1921 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1922 args->op.rw.offset, args->op.rw.length, 1923 __rw_from_file_done, args); 1924 } 1925 } 1926 1927 static int 1928 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1929 uint64_t offset, uint64_t length, bool is_read) 1930 { 1931 struct spdk_fs_cb_args *args; 1932 1933 args = calloc(1, sizeof(*args)); 1934 if (args == NULL) { 1935 sem_post(sem); 1936 return -ENOMEM; 1937 } 1938 1939 args->file = file; 1940 args->sem = sem; 1941 args->op.rw.user_buf = payload; 1942 args->op.rw.offset = offset; 1943 args->op.rw.length = length; 1944 args->op.rw.is_read = is_read; 1945 file->fs->send_request(__rw_from_file, args); 1946 return 0; 1947 } 1948 1949 int 1950 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1951 void *payload, uint64_t offset, uint64_t length) 1952 { 1953 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1954 struct spdk_fs_cb_args *args; 1955 uint64_t rem_length, copy, blob_size, cluster_sz; 1956 uint32_t cache_buffers_filled = 0; 1957 uint8_t *cur_payload; 1958 struct cache_buffer *last; 1959 1960 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1961 1962 if (length == 0) { 1963 return 0; 1964 } 1965 1966 if (offset != file->append_pos) { 1967 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1968 return -EINVAL; 1969 } 1970 1971 pthread_spin_lock(&file->lock); 1972 file->open_for_writing = true; 1973 1974 if (file->last == NULL) { 1975 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1976 cache_append_buffer(file); 1977 } else { 1978 int rc; 1979 1980 file->append_pos += length; 1981 pthread_spin_unlock(&file->lock); 1982 rc = __send_rw_from_file(file, &channel->sem, payload, 1983 offset, length, false); 1984 sem_wait(&channel->sem); 1985 return rc; 1986 } 1987 } 1988 1989 blob_size = __file_get_blob_size(file); 1990 1991 if ((offset + length) > blob_size) { 1992 struct spdk_fs_cb_args extend_args = {}; 1993 1994 cluster_sz = file->fs->bs_opts.cluster_sz; 1995 extend_args.sem = &channel->sem; 1996 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1997 extend_args.file = file; 1998 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1999 pthread_spin_unlock(&file->lock); 2000 file->fs->send_request(__file_extend_blob, &extend_args); 2001 sem_wait(&channel->sem); 2002 } 2003 2004 last = file->last; 2005 rem_length = length; 2006 cur_payload = payload; 2007 while (rem_length > 0) { 2008 copy = last->buf_size - last->bytes_filled; 2009 if (copy > rem_length) { 2010 copy = rem_length; 2011 } 2012 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2013 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2014 file->append_pos += copy; 2015 if (file->length < file->append_pos) { 2016 file->length = file->append_pos; 2017 } 2018 cur_payload += copy; 2019 last->bytes_filled += copy; 2020 rem_length -= copy; 2021 if (last->bytes_filled == last->buf_size) { 2022 cache_buffers_filled++; 2023 last = cache_append_buffer(file); 2024 if (last == NULL) { 2025 BLOBFS_TRACE(file, "nomem\n"); 2026 pthread_spin_unlock(&file->lock); 2027 return -ENOMEM; 2028 } 2029 } 2030 } 2031 2032 if (cache_buffers_filled == 0) { 2033 pthread_spin_unlock(&file->lock); 2034 return 0; 2035 } 2036 2037 args = calloc(1, sizeof(*args)); 2038 if (args == NULL) { 2039 pthread_spin_unlock(&file->lock); 2040 return -ENOMEM; 2041 } 2042 2043 args->file = file; 2044 file->fs->send_request(__file_flush, args); 2045 pthread_spin_unlock(&file->lock); 2046 return 0; 2047 } 2048 2049 static void 2050 __readahead_done(void *arg, int bserrno) 2051 { 2052 struct spdk_fs_cb_args *args = arg; 2053 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2054 struct spdk_file *file = args->file; 2055 2056 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2057 2058 pthread_spin_lock(&file->lock); 2059 cache_buffer->bytes_filled = args->op.readahead.length; 2060 cache_buffer->bytes_flushed = args->op.readahead.length; 2061 cache_buffer->in_progress = false; 2062 pthread_spin_unlock(&file->lock); 2063 2064 __free_args(args); 2065 } 2066 2067 static void 2068 __readahead(void *_args) 2069 { 2070 struct spdk_fs_cb_args *args = _args; 2071 struct spdk_file *file = args->file; 2072 uint64_t offset, length, start_page, num_pages; 2073 uint32_t page_size; 2074 2075 offset = args->op.readahead.offset; 2076 length = args->op.readahead.length; 2077 assert(length > 0); 2078 2079 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2080 2081 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2082 offset, length, start_page, num_pages); 2083 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2084 args->op.readahead.cache_buffer->buf, 2085 start_page, num_pages, 2086 __readahead_done, args); 2087 } 2088 2089 static uint64_t 2090 __next_cache_buffer_offset(uint64_t offset) 2091 { 2092 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2093 } 2094 2095 static void 2096 check_readahead(struct spdk_file *file, uint64_t offset) 2097 { 2098 struct spdk_fs_cb_args *args; 2099 2100 offset = __next_cache_buffer_offset(offset); 2101 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2102 return; 2103 } 2104 2105 args = calloc(1, sizeof(*args)); 2106 if (args == NULL) { 2107 return; 2108 } 2109 2110 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2111 2112 args->file = file; 2113 args->op.readahead.offset = offset; 2114 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2115 args->op.readahead.cache_buffer->in_progress = true; 2116 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2117 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2118 } else { 2119 args->op.readahead.length = CACHE_BUFFER_SIZE; 2120 } 2121 file->fs->send_request(__readahead, args); 2122 } 2123 2124 static int 2125 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2126 { 2127 struct cache_buffer *buf; 2128 int rc; 2129 2130 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2131 if (buf == NULL) { 2132 pthread_spin_unlock(&file->lock); 2133 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2134 pthread_spin_lock(&file->lock); 2135 return rc; 2136 } 2137 2138 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2139 length = buf->offset + buf->bytes_filled - offset; 2140 } 2141 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2142 memcpy(payload, &buf->buf[offset - buf->offset], length); 2143 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2144 pthread_spin_lock(&g_caches_lock); 2145 spdk_tree_remove_buffer(file->tree, buf); 2146 if (file->tree->present_mask == 0) { 2147 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2148 } 2149 pthread_spin_unlock(&g_caches_lock); 2150 } 2151 2152 sem_post(sem); 2153 return 0; 2154 } 2155 2156 int64_t 2157 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2158 void *payload, uint64_t offset, uint64_t length) 2159 { 2160 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2161 uint64_t final_offset, final_length; 2162 uint32_t sub_reads = 0; 2163 int rc = 0; 2164 2165 pthread_spin_lock(&file->lock); 2166 2167 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2168 2169 file->open_for_writing = false; 2170 2171 if (length == 0 || offset >= file->append_pos) { 2172 pthread_spin_unlock(&file->lock); 2173 return 0; 2174 } 2175 2176 if (offset + length > file->append_pos) { 2177 length = file->append_pos - offset; 2178 } 2179 2180 if (offset != file->next_seq_offset) { 2181 file->seq_byte_count = 0; 2182 } 2183 file->seq_byte_count += length; 2184 file->next_seq_offset = offset + length; 2185 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2186 check_readahead(file, offset); 2187 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2188 } 2189 2190 final_length = 0; 2191 final_offset = offset + length; 2192 while (offset < final_offset) { 2193 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2194 if (length > (final_offset - offset)) { 2195 length = final_offset - offset; 2196 } 2197 rc = __file_read(file, payload, offset, length, &channel->sem); 2198 if (rc == 0) { 2199 final_length += length; 2200 } else { 2201 break; 2202 } 2203 payload += length; 2204 offset += length; 2205 sub_reads++; 2206 } 2207 pthread_spin_unlock(&file->lock); 2208 while (sub_reads-- > 0) { 2209 sem_wait(&channel->sem); 2210 } 2211 if (rc == 0) { 2212 return final_length; 2213 } else { 2214 return rc; 2215 } 2216 } 2217 2218 static void 2219 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2220 spdk_file_op_complete cb_fn, void *cb_arg) 2221 { 2222 struct spdk_fs_request *sync_req; 2223 struct spdk_fs_request *flush_req; 2224 struct spdk_fs_cb_args *sync_args; 2225 struct spdk_fs_cb_args *flush_args; 2226 2227 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2228 2229 pthread_spin_lock(&file->lock); 2230 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2231 BLOBFS_TRACE(file, "done - no data to flush\n"); 2232 pthread_spin_unlock(&file->lock); 2233 cb_fn(cb_arg, 0); 2234 return; 2235 } 2236 2237 sync_req = alloc_fs_request(channel); 2238 assert(sync_req != NULL); 2239 sync_args = &sync_req->args; 2240 2241 flush_req = alloc_fs_request(channel); 2242 assert(flush_req != NULL); 2243 flush_args = &flush_req->args; 2244 2245 sync_args->file = file; 2246 sync_args->fn.file_op = cb_fn; 2247 sync_args->arg = cb_arg; 2248 sync_args->op.sync.offset = file->append_pos; 2249 sync_args->op.sync.xattr_in_progress = false; 2250 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2251 pthread_spin_unlock(&file->lock); 2252 2253 flush_args->file = file; 2254 channel->send_request(__file_flush, flush_args); 2255 } 2256 2257 int 2258 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2259 { 2260 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2261 2262 _file_sync(file, channel, __sem_post, &channel->sem); 2263 sem_wait(&channel->sem); 2264 2265 return 0; 2266 } 2267 2268 void 2269 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2270 spdk_file_op_complete cb_fn, void *cb_arg) 2271 { 2272 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2273 2274 _file_sync(file, channel, cb_fn, cb_arg); 2275 } 2276 2277 void 2278 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2279 { 2280 BLOBFS_TRACE(file, "priority=%u\n", priority); 2281 file->priority = priority; 2282 2283 } 2284 2285 /* 2286 * Close routines 2287 */ 2288 2289 static void 2290 __file_close_async_done(void *ctx, int bserrno) 2291 { 2292 struct spdk_fs_request *req = ctx; 2293 struct spdk_fs_cb_args *args = &req->args; 2294 struct spdk_file *file = args->file; 2295 2296 if (file->is_deleted) { 2297 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2298 return; 2299 } 2300 args->fn.file_op(args->arg, bserrno); 2301 free_fs_request(req); 2302 } 2303 2304 static void 2305 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2306 { 2307 pthread_spin_lock(&file->lock); 2308 if (file->ref_count == 0) { 2309 pthread_spin_unlock(&file->lock); 2310 __file_close_async_done(req, -EBADF); 2311 return; 2312 } 2313 2314 file->ref_count--; 2315 if (file->ref_count > 0) { 2316 pthread_spin_unlock(&file->lock); 2317 __file_close_async_done(req, 0); 2318 return; 2319 } 2320 2321 pthread_spin_unlock(&file->lock); 2322 2323 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2324 } 2325 2326 static void 2327 __file_close_async__sync_done(void *arg, int fserrno) 2328 { 2329 struct spdk_fs_request *req = arg; 2330 struct spdk_fs_cb_args *args = &req->args; 2331 2332 __file_close_async(args->file, req); 2333 } 2334 2335 void 2336 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2337 { 2338 struct spdk_fs_request *req; 2339 struct spdk_fs_cb_args *args; 2340 2341 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2342 if (req == NULL) { 2343 cb_fn(cb_arg, -ENOMEM); 2344 return; 2345 } 2346 2347 args = &req->args; 2348 args->file = file; 2349 args->fn.file_op = cb_fn; 2350 args->arg = cb_arg; 2351 2352 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2353 } 2354 2355 static void 2356 __file_close_done(void *arg, int fserrno) 2357 { 2358 struct spdk_fs_cb_args *args = arg; 2359 2360 args->rc = fserrno; 2361 sem_post(args->sem); 2362 } 2363 2364 static void 2365 __file_close(void *arg) 2366 { 2367 struct spdk_fs_request *req = arg; 2368 struct spdk_fs_cb_args *args = &req->args; 2369 struct spdk_file *file = args->file; 2370 2371 __file_close_async(file, req); 2372 } 2373 2374 int 2375 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2376 { 2377 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2378 struct spdk_fs_request *req; 2379 struct spdk_fs_cb_args *args; 2380 2381 req = alloc_fs_request(channel); 2382 assert(req != NULL); 2383 2384 args = &req->args; 2385 2386 spdk_file_sync(file, _channel); 2387 BLOBFS_TRACE(file, "name=%s\n", file->name); 2388 args->file = file; 2389 args->sem = &channel->sem; 2390 args->fn.file_op = __file_close_done; 2391 args->arg = req; 2392 channel->send_request(__file_close, req); 2393 sem_wait(&channel->sem); 2394 2395 return args->rc; 2396 } 2397 2398 static void 2399 cache_free_buffers(struct spdk_file *file) 2400 { 2401 BLOBFS_TRACE(file, "free=%s\n", file->name); 2402 pthread_spin_lock(&file->lock); 2403 pthread_spin_lock(&g_caches_lock); 2404 if (file->tree->present_mask == 0) { 2405 pthread_spin_unlock(&g_caches_lock); 2406 pthread_spin_unlock(&file->lock); 2407 return; 2408 } 2409 spdk_tree_free_buffers(file->tree); 2410 2411 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2412 /* If not freed, put it in the end of the queue */ 2413 if (file->tree->present_mask != 0) { 2414 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2415 } 2416 file->last = NULL; 2417 pthread_spin_unlock(&g_caches_lock); 2418 pthread_spin_unlock(&file->lock); 2419 } 2420 2421 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2422 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2423