1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk/util.h" 44 #include "spdk_internal/log.h" 45 46 #define BLOBFS_TRACE(file, str, args...) 
\ 47 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 48 49 #define BLOBFS_TRACE_RW(file, str, args...) \ 50 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 51 52 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 53 54 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 55 static struct spdk_mempool *g_cache_pool; 56 static TAILQ_HEAD(, spdk_file) g_caches; 57 static int g_fs_count = 0; 58 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 59 static pthread_spinlock_t g_caches_lock; 60 61 static void 62 __sem_post(void *arg, int bserrno) 63 { 64 sem_t *sem = arg; 65 66 sem_post(sem); 67 } 68 69 void 70 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 71 { 72 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 73 free(cache_buffer); 74 } 75 76 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 77 78 struct spdk_file { 79 struct spdk_filesystem *fs; 80 struct spdk_blob *blob; 81 char *name; 82 uint64_t length; 83 bool is_deleted; 84 bool open_for_writing; 85 uint64_t length_flushed; 86 uint64_t append_pos; 87 uint64_t seq_byte_count; 88 uint64_t next_seq_offset; 89 uint32_t priority; 90 TAILQ_ENTRY(spdk_file) tailq; 91 spdk_blob_id blobid; 92 uint32_t ref_count; 93 pthread_spinlock_t lock; 94 struct cache_buffer *last; 95 struct cache_tree *tree; 96 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 97 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 98 TAILQ_ENTRY(spdk_file) cache_tailq; 99 }; 100 101 struct spdk_deleted_file { 102 spdk_blob_id id; 103 TAILQ_ENTRY(spdk_deleted_file) tailq; 104 }; 105 106 struct spdk_filesystem { 107 struct spdk_blob_store *bs; 108 TAILQ_HEAD(, spdk_file) files; 109 struct spdk_bs_opts bs_opts; 110 struct spdk_bs_dev *bdev; 111 fs_send_request_fn send_request; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *sync_io_channel; 116 struct spdk_fs_channel *sync_fs_channel; 117 } sync_target; 118 119 struct { 120 uint32_t 
max_ops; 121 struct spdk_io_channel *md_io_channel; 122 struct spdk_fs_channel *md_fs_channel; 123 } md_target; 124 125 struct { 126 uint32_t max_ops; 127 } io_target; 128 }; 129 130 struct spdk_fs_cb_args { 131 union { 132 spdk_fs_op_with_handle_complete fs_op_with_handle; 133 spdk_fs_op_complete fs_op; 134 spdk_file_op_with_handle_complete file_op_with_handle; 135 spdk_file_op_complete file_op; 136 spdk_file_stat_op_complete stat_op; 137 } fn; 138 void *arg; 139 sem_t *sem; 140 struct spdk_filesystem *fs; 141 struct spdk_file *file; 142 int rc; 143 bool from_request; 144 union { 145 struct { 146 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 147 } fs_load; 148 struct { 149 uint64_t length; 150 } truncate; 151 struct { 152 struct spdk_io_channel *channel; 153 void *user_buf; 154 void *pin_buf; 155 int is_read; 156 off_t offset; 157 size_t length; 158 uint64_t start_page; 159 uint64_t num_pages; 160 uint32_t blocklen; 161 } rw; 162 struct { 163 const char *old_name; 164 const char *new_name; 165 } rename; 166 struct { 167 struct cache_buffer *cache_buffer; 168 uint64_t length; 169 } flush; 170 struct { 171 struct cache_buffer *cache_buffer; 172 uint64_t length; 173 uint64_t offset; 174 } readahead; 175 struct { 176 uint64_t offset; 177 TAILQ_ENTRY(spdk_fs_request) tailq; 178 bool xattr_in_progress; 179 } sync; 180 struct { 181 uint32_t num_clusters; 182 } resize; 183 struct { 184 const char *name; 185 uint32_t flags; 186 TAILQ_ENTRY(spdk_fs_request) tailq; 187 } open; 188 struct { 189 const char *name; 190 } create; 191 struct { 192 const char *name; 193 } delete; 194 struct { 195 const char *name; 196 } stat; 197 } op; 198 }; 199 200 static void cache_free_buffers(struct spdk_file *file); 201 202 static void 203 __initialize_cache(void) 204 { 205 assert(g_cache_pool == NULL); 206 207 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 208 g_fs_cache_size / CACHE_BUFFER_SIZE, 209 CACHE_BUFFER_SIZE, 210 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 211 SPDK_ENV_SOCKET_ID_ANY); 
212 TAILQ_INIT(&g_caches); 213 pthread_spin_init(&g_caches_lock, 0); 214 } 215 216 static void 217 __free_cache(void) 218 { 219 assert(g_cache_pool != NULL); 220 221 spdk_mempool_free(g_cache_pool); 222 g_cache_pool = NULL; 223 } 224 225 static uint64_t 226 __file_get_blob_size(struct spdk_file *file) 227 { 228 uint64_t cluster_sz; 229 230 cluster_sz = file->fs->bs_opts.cluster_sz; 231 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 232 } 233 234 struct spdk_fs_request { 235 struct spdk_fs_cb_args args; 236 TAILQ_ENTRY(spdk_fs_request) link; 237 struct spdk_fs_channel *channel; 238 }; 239 240 struct spdk_fs_channel { 241 struct spdk_fs_request *req_mem; 242 TAILQ_HEAD(, spdk_fs_request) reqs; 243 sem_t sem; 244 struct spdk_filesystem *fs; 245 struct spdk_io_channel *bs_channel; 246 fs_send_request_fn send_request; 247 bool sync; 248 pthread_spinlock_t lock; 249 }; 250 251 static struct spdk_fs_request * 252 alloc_fs_request(struct spdk_fs_channel *channel) 253 { 254 struct spdk_fs_request *req; 255 256 if (channel->sync) { 257 pthread_spin_lock(&channel->lock); 258 } 259 260 req = TAILQ_FIRST(&channel->reqs); 261 if (req) { 262 TAILQ_REMOVE(&channel->reqs, req, link); 263 } 264 265 if (channel->sync) { 266 pthread_spin_unlock(&channel->lock); 267 } 268 269 if (req == NULL) { 270 return NULL; 271 } 272 memset(req, 0, sizeof(*req)); 273 req->channel = channel; 274 req->args.from_request = true; 275 276 return req; 277 } 278 279 static void 280 free_fs_request(struct spdk_fs_request *req) 281 { 282 struct spdk_fs_channel *channel = req->channel; 283 284 if (channel->sync) { 285 pthread_spin_lock(&channel->lock); 286 } 287 288 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 289 290 if (channel->sync) { 291 pthread_spin_unlock(&channel->lock); 292 } 293 } 294 295 static int 296 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 297 uint32_t max_ops) 298 { 299 uint32_t i; 300 301 channel->req_mem = calloc(max_ops, 
sizeof(struct spdk_fs_request)); 302 if (!channel->req_mem) { 303 return -1; 304 } 305 306 TAILQ_INIT(&channel->reqs); 307 sem_init(&channel->sem, 0, 0); 308 309 for (i = 0; i < max_ops; i++) { 310 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 311 } 312 313 channel->fs = fs; 314 315 return 0; 316 } 317 318 static int 319 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 320 { 321 struct spdk_filesystem *fs; 322 struct spdk_fs_channel *channel = ctx_buf; 323 324 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 325 326 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 327 } 328 329 static int 330 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 331 { 332 struct spdk_filesystem *fs; 333 struct spdk_fs_channel *channel = ctx_buf; 334 335 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 336 337 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 338 } 339 340 static int 341 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 342 { 343 struct spdk_filesystem *fs; 344 struct spdk_fs_channel *channel = ctx_buf; 345 346 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 347 348 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 349 } 350 351 static void 352 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 353 { 354 struct spdk_fs_channel *channel = ctx_buf; 355 356 free(channel->req_mem); 357 if (channel->bs_channel != NULL) { 358 spdk_bs_free_io_channel(channel->bs_channel); 359 } 360 } 361 362 static void 363 __send_request_direct(fs_request_fn fn, void *arg) 364 { 365 fn(arg); 366 } 367 368 static void 369 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 370 { 371 fs->bs = bs; 372 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 373 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 374 fs->md_target.md_fs_channel->send_request = __send_request_direct; 
375 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 376 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 377 378 pthread_mutex_lock(&g_cache_init_lock); 379 if (g_fs_count == 0) { 380 __initialize_cache(); 381 } 382 g_fs_count++; 383 pthread_mutex_unlock(&g_cache_init_lock); 384 } 385 386 static void 387 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 388 { 389 struct spdk_fs_request *req = ctx; 390 struct spdk_fs_cb_args *args = &req->args; 391 struct spdk_filesystem *fs = args->fs; 392 393 if (bserrno == 0) { 394 common_fs_bs_init(fs, bs); 395 } else { 396 free(fs); 397 fs = NULL; 398 } 399 400 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 401 free_fs_request(req); 402 } 403 404 static struct spdk_filesystem * 405 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 406 { 407 struct spdk_filesystem *fs; 408 409 fs = calloc(1, sizeof(*fs)); 410 if (fs == NULL) { 411 return NULL; 412 } 413 414 fs->bdev = dev; 415 fs->send_request = send_request_fn; 416 TAILQ_INIT(&fs->files); 417 418 fs->md_target.max_ops = 512; 419 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 420 sizeof(struct spdk_fs_channel)); 421 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 422 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 423 424 fs->sync_target.max_ops = 512; 425 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 426 sizeof(struct spdk_fs_channel)); 427 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 428 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 429 430 fs->io_target.max_ops = 512; 431 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 432 sizeof(struct spdk_fs_channel)); 433 434 return fs; 435 } 436 437 void 438 spdk_fs_init(struct 
spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 439 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 440 { 441 struct spdk_filesystem *fs; 442 struct spdk_fs_request *req; 443 struct spdk_fs_cb_args *args; 444 struct spdk_bs_opts opts = {}; 445 446 fs = fs_alloc(dev, send_request_fn); 447 if (fs == NULL) { 448 cb_fn(cb_arg, NULL, -ENOMEM); 449 return; 450 } 451 452 req = alloc_fs_request(fs->md_target.md_fs_channel); 453 if (req == NULL) { 454 spdk_put_io_channel(fs->md_target.md_io_channel); 455 spdk_io_device_unregister(&fs->md_target, NULL); 456 spdk_put_io_channel(fs->sync_target.sync_io_channel); 457 spdk_io_device_unregister(&fs->sync_target, NULL); 458 spdk_io_device_unregister(&fs->io_target, NULL); 459 free(fs); 460 cb_fn(cb_arg, NULL, -ENOMEM); 461 return; 462 } 463 464 args = &req->args; 465 args->fn.fs_op_with_handle = cb_fn; 466 args->arg = cb_arg; 467 args->fs = fs; 468 469 spdk_bs_opts_init(&opts); 470 strncpy(opts.bstype.bstype, "BLOBFS", SPDK_BLOBSTORE_TYPE_LENGTH); 471 472 spdk_bs_init(dev, &opts, init_cb, req); 473 } 474 475 static struct spdk_file * 476 file_alloc(struct spdk_filesystem *fs) 477 { 478 struct spdk_file *file; 479 480 file = calloc(1, sizeof(*file)); 481 if (file == NULL) { 482 return NULL; 483 } 484 485 file->tree = calloc(1, sizeof(*file->tree)); 486 if (file->tree == NULL) { 487 free(file); 488 return NULL; 489 } 490 491 file->fs = fs; 492 TAILQ_INIT(&file->open_requests); 493 TAILQ_INIT(&file->sync_requests); 494 pthread_spin_init(&file->lock, 0); 495 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 496 file->priority = SPDK_FILE_PRIORITY_LOW; 497 return file; 498 } 499 500 static void iter_delete_cb(void *ctx, int bserrno); 501 502 static int 503 _handle_deleted_files(struct spdk_fs_request *req) 504 { 505 struct spdk_fs_cb_args *args = &req->args; 506 struct spdk_filesystem *fs = args->fs; 507 508 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 509 struct spdk_deleted_file *deleted_file; 510 511 deleted_file = 
TAILQ_FIRST(&args->op.fs_load.deleted_files); 512 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 513 spdk_bs_md_delete_blob(fs->bs, deleted_file->id, iter_delete_cb, req); 514 free(deleted_file); 515 return 0; 516 } 517 518 return 1; 519 } 520 521 static void 522 iter_delete_cb(void *ctx, int bserrno) 523 { 524 struct spdk_fs_request *req = ctx; 525 struct spdk_fs_cb_args *args = &req->args; 526 struct spdk_filesystem *fs = args->fs; 527 528 if (_handle_deleted_files(req) == 0) 529 return; 530 531 args->fn.fs_op_with_handle(args->arg, fs, 0); 532 free_fs_request(req); 533 534 } 535 536 static void 537 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 538 { 539 struct spdk_fs_request *req = ctx; 540 struct spdk_fs_cb_args *args = &req->args; 541 struct spdk_filesystem *fs = args->fs; 542 uint64_t *length; 543 const char *name; 544 uint32_t *is_deleted; 545 size_t value_len; 546 547 if (rc == -ENOENT) { 548 /* Finished iterating */ 549 if (_handle_deleted_files(req) == 0) 550 return; 551 args->fn.fs_op_with_handle(args->arg, fs, 0); 552 free_fs_request(req); 553 return; 554 } else if (rc < 0) { 555 args->fn.fs_op_with_handle(args->arg, fs, rc); 556 free_fs_request(req); 557 return; 558 } 559 560 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 561 if (rc < 0) { 562 args->fn.fs_op_with_handle(args->arg, fs, rc); 563 free_fs_request(req); 564 return; 565 } 566 567 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 568 if (rc < 0) { 569 args->fn.fs_op_with_handle(args->arg, fs, rc); 570 free_fs_request(req); 571 return; 572 } 573 574 assert(value_len == 8); 575 576 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 577 rc = spdk_bs_md_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 578 if (rc < 0) { 579 struct spdk_file *f; 580 581 f = file_alloc(fs); 582 if (f == NULL) { 583 
args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 584 free_fs_request(req); 585 return; 586 } 587 588 f->name = strdup(name); 589 f->blobid = spdk_blob_get_id(blob); 590 f->length = *length; 591 f->length_flushed = *length; 592 f->append_pos = *length; 593 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 594 } else { 595 struct spdk_deleted_file *deleted_file; 596 597 deleted_file = calloc(1, sizeof(*deleted_file)); 598 if (deleted_file == NULL) { 599 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 600 free_fs_request(req); 601 return; 602 } 603 deleted_file->id = spdk_blob_get_id(blob); 604 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 605 } 606 607 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 608 } 609 610 static void 611 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 612 { 613 struct spdk_fs_request *req = ctx; 614 struct spdk_fs_cb_args *args = &req->args; 615 struct spdk_filesystem *fs = args->fs; 616 struct spdk_bs_type bstype; 617 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 618 static const struct spdk_bs_type zeros; 619 620 if (bserrno != 0) { 621 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 622 free_fs_request(req); 623 free(fs); 624 return; 625 } 626 627 bstype = spdk_bs_get_bstype(bs); 628 629 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 630 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "assigning bstype"); 631 spdk_bs_set_bstype(bs, blobfs_type); 632 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 633 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "not blobfs: %s", bstype.bstype); 634 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 635 free_fs_request(req); 636 free(fs); 637 return; 638 } 639 640 common_fs_bs_init(fs, bs); 641 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 642 } 643 644 void 645 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 646 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 647 { 648 struct spdk_filesystem 
*fs; 649 struct spdk_fs_cb_args *args; 650 struct spdk_fs_request *req; 651 struct spdk_bs_opts opts = {}; 652 653 fs = fs_alloc(dev, send_request_fn); 654 if (fs == NULL) { 655 cb_fn(cb_arg, NULL, -ENOMEM); 656 return; 657 } 658 659 req = alloc_fs_request(fs->md_target.md_fs_channel); 660 if (req == NULL) { 661 spdk_put_io_channel(fs->md_target.md_io_channel); 662 spdk_io_device_unregister(&fs->md_target, NULL); 663 spdk_put_io_channel(fs->sync_target.sync_io_channel); 664 spdk_io_device_unregister(&fs->sync_target, NULL); 665 spdk_io_device_unregister(&fs->io_target, NULL); 666 free(fs); 667 cb_fn(cb_arg, NULL, -ENOMEM); 668 return; 669 } 670 671 args = &req->args; 672 args->fn.fs_op_with_handle = cb_fn; 673 args->arg = cb_arg; 674 args->fs = fs; 675 TAILQ_INIT(&args->op.fs_load.deleted_files); 676 677 spdk_bs_opts_init(&opts); 678 679 spdk_bs_load(dev, &opts, load_cb, req); 680 } 681 682 static void 683 unload_cb(void *ctx, int bserrno) 684 { 685 struct spdk_fs_request *req = ctx; 686 struct spdk_fs_cb_args *args = &req->args; 687 struct spdk_filesystem *fs = args->fs; 688 689 pthread_mutex_lock(&g_cache_init_lock); 690 g_fs_count--; 691 if (g_fs_count == 0) { 692 __free_cache(); 693 } 694 pthread_mutex_unlock(&g_cache_init_lock); 695 696 args->fn.fs_op(args->arg, bserrno); 697 free(req); 698 699 spdk_io_device_unregister(&fs->io_target, NULL); 700 spdk_io_device_unregister(&fs->sync_target, NULL); 701 spdk_io_device_unregister(&fs->md_target, NULL); 702 703 free(fs); 704 } 705 706 void 707 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 708 { 709 struct spdk_fs_request *req; 710 struct spdk_fs_cb_args *args; 711 712 /* 713 * We must free the md_channel before unloading the blobstore, so just 714 * allocate this request from the general heap. 
715 */ 716 req = calloc(1, sizeof(*req)); 717 if (req == NULL) { 718 cb_fn(cb_arg, -ENOMEM); 719 return; 720 } 721 722 args = &req->args; 723 args->fn.fs_op = cb_fn; 724 args->arg = cb_arg; 725 args->fs = fs; 726 727 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 728 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 729 spdk_bs_unload(fs->bs, unload_cb, req); 730 } 731 732 static struct spdk_file * 733 fs_find_file(struct spdk_filesystem *fs, const char *name) 734 { 735 struct spdk_file *file; 736 737 TAILQ_FOREACH(file, &fs->files, tailq) { 738 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 739 return file; 740 } 741 } 742 743 return NULL; 744 } 745 746 void 747 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 748 spdk_file_stat_op_complete cb_fn, void *cb_arg) 749 { 750 struct spdk_file_stat stat; 751 struct spdk_file *f = NULL; 752 753 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 754 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 755 return; 756 } 757 758 f = fs_find_file(fs, name); 759 if (f != NULL) { 760 stat.blobid = f->blobid; 761 stat.size = f->length; 762 cb_fn(cb_arg, &stat, 0); 763 return; 764 } 765 766 cb_fn(cb_arg, NULL, -ENOENT); 767 } 768 769 static void 770 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 771 { 772 struct spdk_fs_request *req = arg; 773 struct spdk_fs_cb_args *args = &req->args; 774 775 args->rc = fserrno; 776 if (fserrno == 0) { 777 memcpy(args->arg, stat, sizeof(*stat)); 778 } 779 sem_post(args->sem); 780 } 781 782 static void 783 __file_stat(void *arg) 784 { 785 struct spdk_fs_request *req = arg; 786 struct spdk_fs_cb_args *args = &req->args; 787 788 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 789 args->fn.stat_op, req); 790 } 791 792 int 793 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 794 const char *name, struct spdk_file_stat *stat) 795 { 796 struct spdk_fs_channel *channel = 
spdk_io_channel_get_ctx(_channel); 797 struct spdk_fs_request *req; 798 int rc; 799 800 req = alloc_fs_request(channel); 801 assert(req != NULL); 802 803 req->args.fs = fs; 804 req->args.op.stat.name = name; 805 req->args.fn.stat_op = __copy_stat; 806 req->args.arg = stat; 807 req->args.sem = &channel->sem; 808 channel->send_request(__file_stat, req); 809 sem_wait(&channel->sem); 810 811 rc = req->args.rc; 812 free_fs_request(req); 813 814 return rc; 815 } 816 817 static void 818 fs_create_blob_close_cb(void *ctx, int bserrno) 819 { 820 struct spdk_fs_request *req = ctx; 821 struct spdk_fs_cb_args *args = &req->args; 822 823 args->fn.file_op(args->arg, bserrno); 824 free_fs_request(req); 825 } 826 827 static void 828 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 829 { 830 struct spdk_fs_request *req = ctx; 831 struct spdk_fs_cb_args *args = &req->args; 832 struct spdk_file *f = args->file; 833 uint64_t length = 0; 834 835 f->blob = blob; 836 spdk_bs_md_resize_blob(blob, 1); 837 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 838 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 839 840 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 841 } 842 843 static void 844 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 845 { 846 struct spdk_fs_request *req = ctx; 847 struct spdk_fs_cb_args *args = &req->args; 848 struct spdk_file *f = args->file; 849 850 f->blobid = blobid; 851 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 852 } 853 854 void 855 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 856 spdk_file_op_complete cb_fn, void *cb_arg) 857 { 858 struct spdk_file *file; 859 struct spdk_fs_request *req; 860 struct spdk_fs_cb_args *args; 861 862 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 863 cb_fn(cb_arg, -ENAMETOOLONG); 864 return; 865 } 866 867 file = fs_find_file(fs, name); 868 if (file != NULL) { 869 
cb_fn(cb_arg, -EEXIST); 870 return; 871 } 872 873 file = file_alloc(fs); 874 if (file == NULL) { 875 cb_fn(cb_arg, -ENOMEM); 876 return; 877 } 878 879 req = alloc_fs_request(fs->md_target.md_fs_channel); 880 if (req == NULL) { 881 cb_fn(cb_arg, -ENOMEM); 882 return; 883 } 884 885 args = &req->args; 886 args->file = file; 887 args->fn.file_op = cb_fn; 888 args->arg = cb_arg; 889 890 file->name = strdup(name); 891 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 892 } 893 894 static void 895 __fs_create_file_done(void *arg, int fserrno) 896 { 897 struct spdk_fs_request *req = arg; 898 struct spdk_fs_cb_args *args = &req->args; 899 900 args->rc = fserrno; 901 sem_post(args->sem); 902 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 903 } 904 905 static void 906 __fs_create_file(void *arg) 907 { 908 struct spdk_fs_request *req = arg; 909 struct spdk_fs_cb_args *args = &req->args; 910 911 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 912 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 913 } 914 915 int 916 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 917 { 918 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 919 struct spdk_fs_request *req; 920 struct spdk_fs_cb_args *args; 921 int rc; 922 923 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 924 925 req = alloc_fs_request(channel); 926 assert(req != NULL); 927 928 args = &req->args; 929 args->fs = fs; 930 args->op.create.name = name; 931 args->sem = &channel->sem; 932 fs->send_request(__fs_create_file, req); 933 sem_wait(&channel->sem); 934 rc = args->rc; 935 free_fs_request(req); 936 937 return rc; 938 } 939 940 static void 941 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 942 { 943 struct spdk_fs_request *req = ctx; 944 struct spdk_fs_cb_args *args = &req->args; 945 struct spdk_file *f = args->file; 946 947 f->blob = blob; 
948 while (!TAILQ_EMPTY(&f->open_requests)) { 949 req = TAILQ_FIRST(&f->open_requests); 950 args = &req->args; 951 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 952 args->fn.file_op_with_handle(args->arg, f, bserrno); 953 free_fs_request(req); 954 } 955 } 956 957 static void 958 fs_open_blob_create_cb(void *ctx, int bserrno) 959 { 960 struct spdk_fs_request *req = ctx; 961 struct spdk_fs_cb_args *args = &req->args; 962 struct spdk_file *file = args->file; 963 struct spdk_filesystem *fs = args->fs; 964 965 if (file == NULL) { 966 /* 967 * This is from an open with CREATE flag - the file 968 * is now created so look it up in the file list for this 969 * filesystem. 970 */ 971 file = fs_find_file(fs, args->op.open.name); 972 assert(file != NULL); 973 args->file = file; 974 } 975 976 file->ref_count++; 977 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 978 if (file->ref_count == 1) { 979 assert(file->blob == NULL); 980 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 981 } else if (file->blob != NULL) { 982 fs_open_blob_done(req, file->blob, 0); 983 } else { 984 /* 985 * The blob open for this file is in progress due to a previous 986 * open request. When that open completes, it will invoke the 987 * open callback for this request. 
988 */ 989 } 990 } 991 992 void 993 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 994 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 995 { 996 struct spdk_file *f = NULL; 997 struct spdk_fs_request *req; 998 struct spdk_fs_cb_args *args; 999 1000 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1001 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1002 return; 1003 } 1004 1005 f = fs_find_file(fs, name); 1006 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1007 cb_fn(cb_arg, NULL, -ENOENT); 1008 return; 1009 } 1010 1011 if (f != NULL && f->is_deleted == true) { 1012 cb_fn(cb_arg, NULL, -ENOENT); 1013 return; 1014 } 1015 1016 req = alloc_fs_request(fs->md_target.md_fs_channel); 1017 if (req == NULL) { 1018 cb_fn(cb_arg, NULL, -ENOMEM); 1019 return; 1020 } 1021 1022 args = &req->args; 1023 args->fn.file_op_with_handle = cb_fn; 1024 args->arg = cb_arg; 1025 args->file = f; 1026 args->fs = fs; 1027 args->op.open.name = name; 1028 1029 if (f == NULL) { 1030 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1031 } else { 1032 fs_open_blob_create_cb(req, 0); 1033 } 1034 } 1035 1036 static void 1037 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1038 { 1039 struct spdk_fs_request *req = arg; 1040 struct spdk_fs_cb_args *args = &req->args; 1041 1042 args->file = file; 1043 args->rc = bserrno; 1044 sem_post(args->sem); 1045 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 1046 } 1047 1048 static void 1049 __fs_open_file(void *arg) 1050 { 1051 struct spdk_fs_request *req = arg; 1052 struct spdk_fs_cb_args *args = &req->args; 1053 1054 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 1055 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1056 __fs_open_file_done, req); 1057 } 1058 1059 int 1060 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1061 const char *name, uint32_t flags, struct 
spdk_file **file) 1062 { 1063 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1064 struct spdk_fs_request *req; 1065 struct spdk_fs_cb_args *args; 1066 int rc; 1067 1068 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1069 1070 req = alloc_fs_request(channel); 1071 assert(req != NULL); 1072 1073 args = &req->args; 1074 args->fs = fs; 1075 args->op.open.name = name; 1076 args->op.open.flags = flags; 1077 args->sem = &channel->sem; 1078 fs->send_request(__fs_open_file, req); 1079 sem_wait(&channel->sem); 1080 rc = args->rc; 1081 if (rc == 0) { 1082 *file = args->file; 1083 } else { 1084 *file = NULL; 1085 } 1086 free_fs_request(req); 1087 1088 return rc; 1089 } 1090 1091 static void 1092 fs_rename_blob_close_cb(void *ctx, int bserrno) 1093 { 1094 struct spdk_fs_request *req = ctx; 1095 struct spdk_fs_cb_args *args = &req->args; 1096 1097 args->fn.fs_op(args->arg, bserrno); 1098 free_fs_request(req); 1099 } 1100 1101 static void 1102 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1103 { 1104 struct spdk_fs_request *req = ctx; 1105 struct spdk_fs_cb_args *args = &req->args; 1106 struct spdk_file *f = args->file; 1107 const char *new_name = args->op.rename.new_name; 1108 1109 f->blob = blob; 1110 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1111 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 1112 } 1113 1114 static void 1115 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1116 { 1117 struct spdk_fs_cb_args *args = &req->args; 1118 struct spdk_file *f; 1119 1120 f = fs_find_file(args->fs, args->op.rename.old_name); 1121 if (f == NULL) { 1122 args->fn.fs_op(args->arg, -ENOENT); 1123 free_fs_request(req); 1124 return; 1125 } 1126 1127 free(f->name); 1128 f->name = strdup(args->op.rename.new_name); 1129 args->file = f; 1130 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1131 } 1132 1133 static void 1134 fs_rename_delete_done(void *arg, int fserrno) 
1135 { 1136 __spdk_fs_md_rename_file(arg); 1137 } 1138 1139 void 1140 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1141 const char *old_name, const char *new_name, 1142 spdk_file_op_complete cb_fn, void *cb_arg) 1143 { 1144 struct spdk_file *f; 1145 struct spdk_fs_request *req; 1146 struct spdk_fs_cb_args *args; 1147 1148 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1149 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1150 cb_fn(cb_arg, -ENAMETOOLONG); 1151 return; 1152 } 1153 1154 req = alloc_fs_request(fs->md_target.md_fs_channel); 1155 if (req == NULL) { 1156 cb_fn(cb_arg, -ENOMEM); 1157 return; 1158 } 1159 1160 args = &req->args; 1161 args->fn.fs_op = cb_fn; 1162 args->fs = fs; 1163 args->arg = cb_arg; 1164 args->op.rename.old_name = old_name; 1165 args->op.rename.new_name = new_name; 1166 1167 f = fs_find_file(fs, new_name); 1168 if (f == NULL) { 1169 __spdk_fs_md_rename_file(req); 1170 return; 1171 } 1172 1173 /* 1174 * The rename overwrites an existing file. So delete the existing file, then 1175 * do the actual rename. 
1176 */ 1177 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1178 } 1179 1180 static void 1181 __fs_rename_file_done(void *arg, int fserrno) 1182 { 1183 struct spdk_fs_request *req = arg; 1184 struct spdk_fs_cb_args *args = &req->args; 1185 1186 args->rc = fserrno; 1187 sem_post(args->sem); 1188 } 1189 1190 static void 1191 __fs_rename_file(void *arg) 1192 { 1193 struct spdk_fs_request *req = arg; 1194 struct spdk_fs_cb_args *args = &req->args; 1195 1196 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1197 __fs_rename_file_done, req); 1198 } 1199 1200 int 1201 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1202 const char *old_name, const char *new_name) 1203 { 1204 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1205 struct spdk_fs_request *req; 1206 struct spdk_fs_cb_args *args; 1207 int rc; 1208 1209 req = alloc_fs_request(channel); 1210 assert(req != NULL); 1211 1212 args = &req->args; 1213 1214 args->fs = fs; 1215 args->op.rename.old_name = old_name; 1216 args->op.rename.new_name = new_name; 1217 args->sem = &channel->sem; 1218 fs->send_request(__fs_rename_file, req); 1219 sem_wait(&channel->sem); 1220 rc = args->rc; 1221 free_fs_request(req); 1222 return rc; 1223 } 1224 1225 static void 1226 blob_delete_cb(void *ctx, int bserrno) 1227 { 1228 struct spdk_fs_request *req = ctx; 1229 struct spdk_fs_cb_args *args = &req->args; 1230 1231 args->fn.file_op(args->arg, bserrno); 1232 free_fs_request(req); 1233 } 1234 1235 void 1236 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1237 spdk_file_op_complete cb_fn, void *cb_arg) 1238 { 1239 struct spdk_file *f; 1240 spdk_blob_id blobid; 1241 struct spdk_fs_request *req; 1242 struct spdk_fs_cb_args *args; 1243 1244 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1245 1246 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1247 cb_fn(cb_arg, -ENAMETOOLONG); 
1248 return; 1249 } 1250 1251 f = fs_find_file(fs, name); 1252 if (f == NULL) { 1253 cb_fn(cb_arg, -ENOENT); 1254 return; 1255 } 1256 1257 req = alloc_fs_request(fs->md_target.md_fs_channel); 1258 if (req == NULL) { 1259 cb_fn(cb_arg, -ENOMEM); 1260 return; 1261 } 1262 1263 args = &req->args; 1264 args->fn.file_op = cb_fn; 1265 args->arg = cb_arg; 1266 1267 if (f->ref_count > 0) { 1268 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1269 f->is_deleted = true; 1270 spdk_blob_md_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1271 spdk_bs_md_sync_blob(f->blob, blob_delete_cb, args); 1272 return; 1273 } 1274 1275 TAILQ_REMOVE(&fs->files, f, tailq); 1276 1277 cache_free_buffers(f); 1278 1279 blobid = f->blobid; 1280 1281 free(f->name); 1282 free(f->tree); 1283 free(f); 1284 1285 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1286 } 1287 1288 static void 1289 __fs_delete_file_done(void *arg, int fserrno) 1290 { 1291 struct spdk_fs_request *req = arg; 1292 struct spdk_fs_cb_args *args = &req->args; 1293 1294 args->rc = fserrno; 1295 sem_post(args->sem); 1296 } 1297 1298 static void 1299 __fs_delete_file(void *arg) 1300 { 1301 struct spdk_fs_request *req = arg; 1302 struct spdk_fs_cb_args *args = &req->args; 1303 1304 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1305 } 1306 1307 int 1308 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1309 const char *name) 1310 { 1311 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1312 struct spdk_fs_request *req; 1313 struct spdk_fs_cb_args *args; 1314 int rc; 1315 1316 req = alloc_fs_request(channel); 1317 assert(req != NULL); 1318 1319 args = &req->args; 1320 args->fs = fs; 1321 args->op.delete.name = name; 1322 args->sem = &channel->sem; 1323 fs->send_request(__fs_delete_file, req); 1324 sem_wait(&channel->sem); 1325 rc = args->rc; 1326 free_fs_request(req); 1327 1328 return 
/*
 * Number of clusters of size cluster_sz needed to hold length bytes, i.e.
 * length / cluster_sz rounded up.
 *
 * Written as quotient plus a remainder test rather than
 * (length + cluster_sz - 1) / cluster_sz, because that sum wraps around for
 * lengths close to UINT64_MAX and would yield a far too small cluster count.
 * cluster_sz must be non-zero.
 */
static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return length / cluster_sz + (length % cluster_sz != 0 ? 1 : 0);
}
num_clusters); 1413 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1414 1415 file->length = length; 1416 if (file->append_pos > file->length) { 1417 file->append_pos = file->length; 1418 } 1419 1420 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1421 } 1422 1423 static void 1424 __truncate(void *arg) 1425 { 1426 struct spdk_fs_request *req = arg; 1427 struct spdk_fs_cb_args *args = &req->args; 1428 1429 spdk_file_truncate_async(args->file, args->op.truncate.length, 1430 args->fn.file_op, args->arg); 1431 } 1432 1433 void 1434 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1435 uint64_t length) 1436 { 1437 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1438 struct spdk_fs_request *req; 1439 struct spdk_fs_cb_args *args; 1440 1441 req = alloc_fs_request(channel); 1442 assert(req != NULL); 1443 1444 args = &req->args; 1445 1446 args->file = file; 1447 args->op.truncate.length = length; 1448 args->fn.file_op = __sem_post; 1449 args->arg = &channel->sem; 1450 1451 channel->send_request(__truncate, req); 1452 sem_wait(&channel->sem); 1453 free_fs_request(req); 1454 } 1455 1456 static void 1457 __rw_done(void *ctx, int bserrno) 1458 { 1459 struct spdk_fs_request *req = ctx; 1460 struct spdk_fs_cb_args *args = &req->args; 1461 1462 spdk_dma_free(args->op.rw.pin_buf); 1463 args->fn.file_op(args->arg, bserrno); 1464 free_fs_request(req); 1465 } 1466 1467 static void 1468 __read_done(void *ctx, int bserrno) 1469 { 1470 struct spdk_fs_request *req = ctx; 1471 struct spdk_fs_cb_args *args = &req->args; 1472 1473 if (args->op.rw.is_read) { 1474 memcpy(args->op.rw.user_buf, 1475 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1476 args->op.rw.length); 1477 __rw_done(req, 0); 1478 } else { 1479 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1480 args->op.rw.user_buf, 1481 args->op.rw.length); 1482 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1483 
args->op.rw.pin_buf, 1484 args->op.rw.start_page, args->op.rw.num_pages, 1485 __rw_done, req); 1486 } 1487 } 1488 1489 static void 1490 __do_blob_read(void *ctx, int fserrno) 1491 { 1492 struct spdk_fs_request *req = ctx; 1493 struct spdk_fs_cb_args *args = &req->args; 1494 1495 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1496 args->op.rw.pin_buf, 1497 args->op.rw.start_page, args->op.rw.num_pages, 1498 __read_done, req); 1499 } 1500 1501 static void 1502 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1503 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1504 { 1505 uint64_t end_page; 1506 1507 *page_size = spdk_bs_get_page_size(file->fs->bs); 1508 *start_page = offset / *page_size; 1509 end_page = (offset + length - 1) / *page_size; 1510 *num_pages = (end_page - *start_page + 1); 1511 } 1512 1513 static void 1514 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1515 void *payload, uint64_t offset, uint64_t length, 1516 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1517 { 1518 struct spdk_fs_request *req; 1519 struct spdk_fs_cb_args *args; 1520 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1521 uint64_t start_page, num_pages, pin_buf_length; 1522 uint32_t page_size; 1523 1524 if (is_read && offset + length > file->length) { 1525 cb_fn(cb_arg, -EINVAL); 1526 return; 1527 } 1528 1529 req = alloc_fs_request(channel); 1530 if (req == NULL) { 1531 cb_fn(cb_arg, -ENOMEM); 1532 return; 1533 } 1534 1535 args = &req->args; 1536 args->fn.file_op = cb_fn; 1537 args->arg = cb_arg; 1538 args->file = file; 1539 args->op.rw.channel = channel->bs_channel; 1540 args->op.rw.user_buf = payload; 1541 args->op.rw.is_read = is_read; 1542 args->op.rw.offset = offset; 1543 args->op.rw.length = length; 1544 1545 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1546 pin_buf_length = num_pages * page_size; 1547 args->op.rw.pin_buf = 
spdk_dma_malloc(pin_buf_length, 4096, NULL); 1548 1549 args->op.rw.start_page = start_page; 1550 args->op.rw.num_pages = num_pages; 1551 1552 if (!is_read && file->length < offset + length) { 1553 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1554 } else { 1555 __do_blob_read(req, 0); 1556 } 1557 } 1558 1559 void 1560 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1561 void *payload, uint64_t offset, uint64_t length, 1562 spdk_file_op_complete cb_fn, void *cb_arg) 1563 { 1564 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1565 } 1566 1567 void 1568 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1569 void *payload, uint64_t offset, uint64_t length, 1570 spdk_file_op_complete cb_fn, void *cb_arg) 1571 { 1572 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1573 file->name, offset, length); 1574 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1575 } 1576 1577 struct spdk_io_channel * 1578 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1579 { 1580 struct spdk_io_channel *io_channel; 1581 struct spdk_fs_channel *fs_channel; 1582 1583 io_channel = spdk_get_io_channel(&fs->io_target); 1584 fs_channel = spdk_io_channel_get_ctx(io_channel); 1585 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1586 fs_channel->send_request = __send_request_direct; 1587 1588 return io_channel; 1589 } 1590 1591 struct spdk_io_channel * 1592 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1593 { 1594 struct spdk_io_channel *io_channel; 1595 struct spdk_fs_channel *fs_channel; 1596 1597 io_channel = spdk_get_io_channel(&fs->io_target); 1598 fs_channel = spdk_io_channel_get_ctx(io_channel); 1599 fs_channel->send_request = fs->send_request; 1600 fs_channel->sync = 1; 1601 pthread_spin_init(&fs_channel->lock, 0); 1602 1603 return io_channel; 1604 } 1605 1606 void 1607 spdk_fs_free_io_channel(struct spdk_io_channel 
*channel) 1608 { 1609 spdk_put_io_channel(channel); 1610 } 1611 1612 void 1613 spdk_fs_set_cache_size(uint64_t size_in_mb) 1614 { 1615 g_fs_cache_size = size_in_mb * 1024 * 1024; 1616 } 1617 1618 uint64_t 1619 spdk_fs_get_cache_size(void) 1620 { 1621 return g_fs_cache_size / (1024 * 1024); 1622 } 1623 1624 static void __file_flush(void *_args); 1625 1626 static void * 1627 alloc_cache_memory_buffer(struct spdk_file *context) 1628 { 1629 struct spdk_file *file; 1630 void *buf; 1631 1632 buf = spdk_mempool_get(g_cache_pool); 1633 if (buf != NULL) { 1634 return buf; 1635 } 1636 1637 pthread_spin_lock(&g_caches_lock); 1638 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1639 if (!file->open_for_writing && 1640 file->priority == SPDK_FILE_PRIORITY_LOW && 1641 file != context) { 1642 break; 1643 } 1644 } 1645 pthread_spin_unlock(&g_caches_lock); 1646 if (file != NULL) { 1647 cache_free_buffers(file); 1648 buf = spdk_mempool_get(g_cache_pool); 1649 if (buf != NULL) { 1650 return buf; 1651 } 1652 } 1653 1654 pthread_spin_lock(&g_caches_lock); 1655 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1656 if (!file->open_for_writing && file != context) { 1657 break; 1658 } 1659 } 1660 pthread_spin_unlock(&g_caches_lock); 1661 if (file != NULL) { 1662 cache_free_buffers(file); 1663 buf = spdk_mempool_get(g_cache_pool); 1664 if (buf != NULL) { 1665 return buf; 1666 } 1667 } 1668 1669 pthread_spin_lock(&g_caches_lock); 1670 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1671 if (file != context) { 1672 break; 1673 } 1674 } 1675 pthread_spin_unlock(&g_caches_lock); 1676 if (file != NULL) { 1677 cache_free_buffers(file); 1678 buf = spdk_mempool_get(g_cache_pool); 1679 if (buf != NULL) { 1680 return buf; 1681 } 1682 } 1683 1684 return NULL; 1685 } 1686 1687 static struct cache_buffer * 1688 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1689 { 1690 struct cache_buffer *buf; 1691 int count = 0; 1692 1693 buf = calloc(1, sizeof(*buf)); 1694 if (buf == NULL) { 1695 
SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1696 return NULL; 1697 } 1698 1699 buf->buf = alloc_cache_memory_buffer(file); 1700 while (buf->buf == NULL) { 1701 /* 1702 * TODO: alloc_cache_memory_buffer() should eventually free 1703 * some buffers. Need a more sophisticated check here, instead 1704 * of just bailing if 100 tries does not result in getting a 1705 * free buffer. This will involve using the sync channel's 1706 * semaphore to block until a buffer becomes available. 1707 */ 1708 if (count++ == 100) { 1709 SPDK_ERRLOG("could not allocate cache buffer\n"); 1710 assert(false); 1711 free(buf); 1712 return NULL; 1713 } 1714 buf->buf = alloc_cache_memory_buffer(file); 1715 } 1716 1717 buf->buf_size = CACHE_BUFFER_SIZE; 1718 buf->offset = offset; 1719 1720 pthread_spin_lock(&g_caches_lock); 1721 if (file->tree->present_mask == 0) { 1722 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1723 } 1724 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1725 pthread_spin_unlock(&g_caches_lock); 1726 1727 return buf; 1728 } 1729 1730 static struct cache_buffer * 1731 cache_append_buffer(struct spdk_file *file) 1732 { 1733 struct cache_buffer *last; 1734 1735 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1736 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1737 1738 last = cache_insert_buffer(file, file->append_pos); 1739 if (last == NULL) { 1740 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1741 return NULL; 1742 } 1743 1744 file->last = last; 1745 1746 return last; 1747 } 1748 1749 static void 1750 __wake_caller(struct spdk_fs_cb_args *args) 1751 { 1752 sem_post(args->sem); 1753 } 1754 1755 static void __check_sync_reqs(struct spdk_file *file); 1756 1757 static void 1758 __file_cache_finish_sync(struct spdk_file *file) 1759 { 1760 struct spdk_fs_request *sync_req; 1761 struct spdk_fs_cb_args *sync_args; 1762 1763 pthread_spin_lock(&file->lock); 1764 sync_req = TAILQ_FIRST(&file->sync_requests); 
1765 sync_args = &sync_req->args; 1766 assert(sync_args->op.sync.offset <= file->length_flushed); 1767 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1768 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1769 pthread_spin_unlock(&file->lock); 1770 1771 sync_args->fn.file_op(sync_args->arg, 0); 1772 __check_sync_reqs(file); 1773 1774 pthread_spin_lock(&file->lock); 1775 free_fs_request(sync_req); 1776 pthread_spin_unlock(&file->lock); 1777 } 1778 1779 static void 1780 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1781 { 1782 struct spdk_file *file = ctx; 1783 1784 __file_cache_finish_sync(file); 1785 } 1786 1787 static void 1788 __free_args(struct spdk_fs_cb_args *args) 1789 { 1790 struct spdk_fs_request *req; 1791 1792 if (!args->from_request) { 1793 free(args); 1794 } else { 1795 /* Depends on args being at the start of the spdk_fs_request structure. */ 1796 req = (struct spdk_fs_request *)args; 1797 free_fs_request(req); 1798 } 1799 } 1800 1801 static void 1802 __check_sync_reqs(struct spdk_file *file) 1803 { 1804 struct spdk_fs_request *sync_req; 1805 1806 pthread_spin_lock(&file->lock); 1807 1808 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1809 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1810 break; 1811 } 1812 } 1813 1814 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1815 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1816 sync_req->args.op.sync.xattr_in_progress = true; 1817 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1818 sizeof(file->length_flushed)); 1819 1820 pthread_spin_unlock(&file->lock); 1821 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1822 } else { 1823 pthread_spin_unlock(&file->lock); 1824 } 1825 } 1826 1827 static void 1828 __file_flush_done(void *arg, int bserrno) 1829 { 1830 struct spdk_fs_cb_args *args = arg; 1831 struct spdk_file *file = args->file; 1832 
struct cache_buffer *next = args->op.flush.cache_buffer; 1833 1834 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1835 1836 pthread_spin_lock(&file->lock); 1837 next->in_progress = false; 1838 next->bytes_flushed += args->op.flush.length; 1839 file->length_flushed += args->op.flush.length; 1840 if (file->length_flushed > file->length) { 1841 file->length = file->length_flushed; 1842 } 1843 if (next->bytes_flushed == next->buf_size) { 1844 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1845 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1846 } 1847 1848 /* 1849 * Assert that there is no cached data that extends past the end of the underlying 1850 * blob. 1851 */ 1852 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1853 next->bytes_filled == 0); 1854 1855 pthread_spin_unlock(&file->lock); 1856 1857 __check_sync_reqs(file); 1858 1859 __file_flush(args); 1860 } 1861 1862 static void 1863 __file_flush(void *_args) 1864 { 1865 struct spdk_fs_cb_args *args = _args; 1866 struct spdk_file *file = args->file; 1867 struct cache_buffer *next; 1868 uint64_t offset, length, start_page, num_pages; 1869 uint32_t page_size; 1870 1871 pthread_spin_lock(&file->lock); 1872 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1873 if (next == NULL || next->in_progress) { 1874 /* 1875 * There is either no data to flush, or a flush I/O is already in 1876 * progress. So return immediately - if a flush I/O is in 1877 * progress we will flush more data after that is completed. 
1878 */ 1879 __free_args(args); 1880 pthread_spin_unlock(&file->lock); 1881 return; 1882 } 1883 1884 offset = next->offset + next->bytes_flushed; 1885 length = next->bytes_filled - next->bytes_flushed; 1886 if (length == 0) { 1887 __free_args(args); 1888 pthread_spin_unlock(&file->lock); 1889 return; 1890 } 1891 args->op.flush.length = length; 1892 args->op.flush.cache_buffer = next; 1893 1894 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1895 1896 next->in_progress = true; 1897 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1898 offset, length, start_page, num_pages); 1899 pthread_spin_unlock(&file->lock); 1900 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1901 next->buf + (start_page * page_size) - next->offset, 1902 start_page, num_pages, 1903 __file_flush_done, args); 1904 } 1905 1906 static void 1907 __file_extend_done(void *arg, int bserrno) 1908 { 1909 struct spdk_fs_cb_args *args = arg; 1910 1911 __wake_caller(args); 1912 } 1913 1914 static void 1915 __file_extend_blob(void *_args) 1916 { 1917 struct spdk_fs_cb_args *args = _args; 1918 struct spdk_file *file = args->file; 1919 1920 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1921 1922 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1923 } 1924 1925 static void 1926 __rw_from_file_done(void *arg, int bserrno) 1927 { 1928 struct spdk_fs_cb_args *args = arg; 1929 1930 __wake_caller(args); 1931 __free_args(args); 1932 } 1933 1934 static void 1935 __rw_from_file(void *_args) 1936 { 1937 struct spdk_fs_cb_args *args = _args; 1938 struct spdk_file *file = args->file; 1939 1940 if (args->op.rw.is_read) { 1941 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1942 args->op.rw.offset, args->op.rw.length, 1943 __rw_from_file_done, args); 1944 } else { 1945 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1946 
args->op.rw.offset, args->op.rw.length, 1947 __rw_from_file_done, args); 1948 } 1949 } 1950 1951 static int 1952 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1953 uint64_t offset, uint64_t length, bool is_read) 1954 { 1955 struct spdk_fs_cb_args *args; 1956 1957 args = calloc(1, sizeof(*args)); 1958 if (args == NULL) { 1959 sem_post(sem); 1960 return -ENOMEM; 1961 } 1962 1963 args->file = file; 1964 args->sem = sem; 1965 args->op.rw.user_buf = payload; 1966 args->op.rw.offset = offset; 1967 args->op.rw.length = length; 1968 args->op.rw.is_read = is_read; 1969 file->fs->send_request(__rw_from_file, args); 1970 return 0; 1971 } 1972 1973 int 1974 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1975 void *payload, uint64_t offset, uint64_t length) 1976 { 1977 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1978 struct spdk_fs_cb_args *args; 1979 uint64_t rem_length, copy, blob_size, cluster_sz; 1980 uint32_t cache_buffers_filled = 0; 1981 uint8_t *cur_payload; 1982 struct cache_buffer *last; 1983 1984 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1985 1986 if (length == 0) { 1987 return 0; 1988 } 1989 1990 if (offset != file->append_pos) { 1991 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1992 return -EINVAL; 1993 } 1994 1995 pthread_spin_lock(&file->lock); 1996 file->open_for_writing = true; 1997 1998 if (file->last == NULL) { 1999 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2000 cache_append_buffer(file); 2001 } else { 2002 int rc; 2003 2004 file->append_pos += length; 2005 pthread_spin_unlock(&file->lock); 2006 rc = __send_rw_from_file(file, &channel->sem, payload, 2007 offset, length, false); 2008 sem_wait(&channel->sem); 2009 return rc; 2010 } 2011 } 2012 2013 blob_size = __file_get_blob_size(file); 2014 2015 if ((offset + length) > blob_size) { 2016 struct spdk_fs_cb_args extend_args = {}; 2017 2018 cluster_sz = 
file->fs->bs_opts.cluster_sz; 2019 extend_args.sem = &channel->sem; 2020 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2021 extend_args.file = file; 2022 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2023 pthread_spin_unlock(&file->lock); 2024 file->fs->send_request(__file_extend_blob, &extend_args); 2025 sem_wait(&channel->sem); 2026 } 2027 2028 last = file->last; 2029 rem_length = length; 2030 cur_payload = payload; 2031 while (rem_length > 0) { 2032 copy = last->buf_size - last->bytes_filled; 2033 if (copy > rem_length) { 2034 copy = rem_length; 2035 } 2036 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2037 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2038 file->append_pos += copy; 2039 if (file->length < file->append_pos) { 2040 file->length = file->append_pos; 2041 } 2042 cur_payload += copy; 2043 last->bytes_filled += copy; 2044 rem_length -= copy; 2045 if (last->bytes_filled == last->buf_size) { 2046 cache_buffers_filled++; 2047 last = cache_append_buffer(file); 2048 if (last == NULL) { 2049 BLOBFS_TRACE(file, "nomem\n"); 2050 pthread_spin_unlock(&file->lock); 2051 return -ENOMEM; 2052 } 2053 } 2054 } 2055 2056 if (cache_buffers_filled == 0) { 2057 pthread_spin_unlock(&file->lock); 2058 return 0; 2059 } 2060 2061 args = calloc(1, sizeof(*args)); 2062 if (args == NULL) { 2063 pthread_spin_unlock(&file->lock); 2064 return -ENOMEM; 2065 } 2066 2067 args->file = file; 2068 file->fs->send_request(__file_flush, args); 2069 pthread_spin_unlock(&file->lock); 2070 return 0; 2071 } 2072 2073 static void 2074 __readahead_done(void *arg, int bserrno) 2075 { 2076 struct spdk_fs_cb_args *args = arg; 2077 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2078 struct spdk_file *file = args->file; 2079 2080 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2081 2082 pthread_spin_lock(&file->lock); 2083 
cache_buffer->bytes_filled = args->op.readahead.length; 2084 cache_buffer->bytes_flushed = args->op.readahead.length; 2085 cache_buffer->in_progress = false; 2086 pthread_spin_unlock(&file->lock); 2087 2088 __free_args(args); 2089 } 2090 2091 static void 2092 __readahead(void *_args) 2093 { 2094 struct spdk_fs_cb_args *args = _args; 2095 struct spdk_file *file = args->file; 2096 uint64_t offset, length, start_page, num_pages; 2097 uint32_t page_size; 2098 2099 offset = args->op.readahead.offset; 2100 length = args->op.readahead.length; 2101 assert(length > 0); 2102 2103 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2104 2105 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2106 offset, length, start_page, num_pages); 2107 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2108 args->op.readahead.cache_buffer->buf, 2109 start_page, num_pages, 2110 __readahead_done, args); 2111 } 2112 2113 static uint64_t 2114 __next_cache_buffer_offset(uint64_t offset) 2115 { 2116 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2117 } 2118 2119 static void 2120 check_readahead(struct spdk_file *file, uint64_t offset) 2121 { 2122 struct spdk_fs_cb_args *args; 2123 2124 offset = __next_cache_buffer_offset(offset); 2125 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2126 return; 2127 } 2128 2129 args = calloc(1, sizeof(*args)); 2130 if (args == NULL) { 2131 return; 2132 } 2133 2134 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2135 2136 args->file = file; 2137 args->op.readahead.offset = offset; 2138 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2139 args->op.readahead.cache_buffer->in_progress = true; 2140 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2141 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2142 } else { 2143 args->op.readahead.length = CACHE_BUFFER_SIZE; 2144 } 2145 
file->fs->send_request(__readahead, args); 2146 } 2147 2148 static int 2149 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2150 { 2151 struct cache_buffer *buf; 2152 int rc; 2153 2154 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2155 if (buf == NULL) { 2156 pthread_spin_unlock(&file->lock); 2157 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2158 pthread_spin_lock(&file->lock); 2159 return rc; 2160 } 2161 2162 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2163 length = buf->offset + buf->bytes_filled - offset; 2164 } 2165 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2166 memcpy(payload, &buf->buf[offset - buf->offset], length); 2167 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2168 pthread_spin_lock(&g_caches_lock); 2169 spdk_tree_remove_buffer(file->tree, buf); 2170 if (file->tree->present_mask == 0) { 2171 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2172 } 2173 pthread_spin_unlock(&g_caches_lock); 2174 } 2175 2176 sem_post(sem); 2177 return 0; 2178 } 2179 2180 int64_t 2181 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2182 void *payload, uint64_t offset, uint64_t length) 2183 { 2184 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2185 uint64_t final_offset, final_length; 2186 uint32_t sub_reads = 0; 2187 int rc = 0; 2188 2189 pthread_spin_lock(&file->lock); 2190 2191 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2192 2193 file->open_for_writing = false; 2194 2195 if (length == 0 || offset >= file->append_pos) { 2196 pthread_spin_unlock(&file->lock); 2197 return 0; 2198 } 2199 2200 if (offset + length > file->append_pos) { 2201 length = file->append_pos - offset; 2202 } 2203 2204 if (offset != file->next_seq_offset) { 2205 file->seq_byte_count = 0; 2206 } 2207 file->seq_byte_count += length; 2208 file->next_seq_offset = offset + length; 2209 if 
(file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2210 check_readahead(file, offset); 2211 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2212 } 2213 2214 final_length = 0; 2215 final_offset = offset + length; 2216 while (offset < final_offset) { 2217 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2218 if (length > (final_offset - offset)) { 2219 length = final_offset - offset; 2220 } 2221 rc = __file_read(file, payload, offset, length, &channel->sem); 2222 if (rc == 0) { 2223 final_length += length; 2224 } else { 2225 break; 2226 } 2227 payload += length; 2228 offset += length; 2229 sub_reads++; 2230 } 2231 pthread_spin_unlock(&file->lock); 2232 while (sub_reads-- > 0) { 2233 sem_wait(&channel->sem); 2234 } 2235 if (rc == 0) { 2236 return final_length; 2237 } else { 2238 return rc; 2239 } 2240 } 2241 2242 static void 2243 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2244 spdk_file_op_complete cb_fn, void *cb_arg) 2245 { 2246 struct spdk_fs_request *sync_req; 2247 struct spdk_fs_request *flush_req; 2248 struct spdk_fs_cb_args *sync_args; 2249 struct spdk_fs_cb_args *flush_args; 2250 2251 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2252 2253 pthread_spin_lock(&file->lock); 2254 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2255 BLOBFS_TRACE(file, "done - no data to flush\n"); 2256 pthread_spin_unlock(&file->lock); 2257 cb_fn(cb_arg, 0); 2258 return; 2259 } 2260 2261 sync_req = alloc_fs_request(channel); 2262 assert(sync_req != NULL); 2263 sync_args = &sync_req->args; 2264 2265 flush_req = alloc_fs_request(channel); 2266 assert(flush_req != NULL); 2267 flush_args = &flush_req->args; 2268 2269 sync_args->file = file; 2270 sync_args->fn.file_op = cb_fn; 2271 sync_args->arg = cb_arg; 2272 sync_args->op.sync.offset = file->append_pos; 2273 sync_args->op.sync.xattr_in_progress = false; 2274 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2275 pthread_spin_unlock(&file->lock); 2276 
2277 flush_args->file = file; 2278 channel->send_request(__file_flush, flush_args); 2279 } 2280 2281 int 2282 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2283 { 2284 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2285 2286 _file_sync(file, channel, __sem_post, &channel->sem); 2287 sem_wait(&channel->sem); 2288 2289 return 0; 2290 } 2291 2292 void 2293 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2294 spdk_file_op_complete cb_fn, void *cb_arg) 2295 { 2296 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2297 2298 _file_sync(file, channel, cb_fn, cb_arg); 2299 } 2300 2301 void 2302 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2303 { 2304 BLOBFS_TRACE(file, "priority=%u\n", priority); 2305 file->priority = priority; 2306 2307 } 2308 2309 /* 2310 * Close routines 2311 */ 2312 2313 static void 2314 __file_close_async_done(void *ctx, int bserrno) 2315 { 2316 struct spdk_fs_request *req = ctx; 2317 struct spdk_fs_cb_args *args = &req->args; 2318 struct spdk_file *file = args->file; 2319 2320 if (file->is_deleted) { 2321 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2322 return; 2323 } 2324 args->fn.file_op(args->arg, bserrno); 2325 free_fs_request(req); 2326 } 2327 2328 static void 2329 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2330 { 2331 pthread_spin_lock(&file->lock); 2332 if (file->ref_count == 0) { 2333 pthread_spin_unlock(&file->lock); 2334 __file_close_async_done(req, -EBADF); 2335 return; 2336 } 2337 2338 file->ref_count--; 2339 if (file->ref_count > 0) { 2340 pthread_spin_unlock(&file->lock); 2341 __file_close_async_done(req, 0); 2342 return; 2343 } 2344 2345 pthread_spin_unlock(&file->lock); 2346 2347 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2348 } 2349 2350 static void 2351 __file_close_async__sync_done(void *arg, int fserrno) 2352 { 2353 struct 
spdk_fs_request *req = arg; 2354 struct spdk_fs_cb_args *args = &req->args; 2355 2356 __file_close_async(args->file, req); 2357 } 2358 2359 void 2360 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2361 { 2362 struct spdk_fs_request *req; 2363 struct spdk_fs_cb_args *args; 2364 2365 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2366 if (req == NULL) { 2367 cb_fn(cb_arg, -ENOMEM); 2368 return; 2369 } 2370 2371 args = &req->args; 2372 args->file = file; 2373 args->fn.file_op = cb_fn; 2374 args->arg = cb_arg; 2375 2376 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2377 } 2378 2379 static void 2380 __file_close_done(void *arg, int fserrno) 2381 { 2382 struct spdk_fs_cb_args *args = arg; 2383 2384 args->rc = fserrno; 2385 sem_post(args->sem); 2386 } 2387 2388 static void 2389 __file_close(void *arg) 2390 { 2391 struct spdk_fs_request *req = arg; 2392 struct spdk_fs_cb_args *args = &req->args; 2393 struct spdk_file *file = args->file; 2394 2395 __file_close_async(file, req); 2396 } 2397 2398 int 2399 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2400 { 2401 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2402 struct spdk_fs_request *req; 2403 struct spdk_fs_cb_args *args; 2404 2405 req = alloc_fs_request(channel); 2406 assert(req != NULL); 2407 2408 args = &req->args; 2409 2410 spdk_file_sync(file, _channel); 2411 BLOBFS_TRACE(file, "name=%s\n", file->name); 2412 args->file = file; 2413 args->sem = &channel->sem; 2414 args->fn.file_op = __file_close_done; 2415 args->arg = req; 2416 channel->send_request(__file_close, req); 2417 sem_wait(&channel->sem); 2418 2419 return args->rc; 2420 } 2421 2422 static void 2423 cache_free_buffers(struct spdk_file *file) 2424 { 2425 BLOBFS_TRACE(file, "free=%s\n", file->name); 2426 pthread_spin_lock(&file->lock); 2427 pthread_spin_lock(&g_caches_lock); 2428 if 
(file->tree->present_mask == 0) { 2429 pthread_spin_unlock(&g_caches_lock); 2430 pthread_spin_unlock(&file->lock); 2431 return; 2432 } 2433 spdk_tree_free_buffers(file->tree); 2434 2435 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2436 /* If not freed, put it in the end of the queue */ 2437 if (file->tree->present_mask != 0) { 2438 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2439 } 2440 file->last = NULL; 2441 pthread_spin_unlock(&g_caches_lock); 2442 pthread_spin_unlock(&file->lock); 2443 } 2444 2445 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2446 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2447