1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/io_channel.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 55 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 56 static struct spdk_mempool *g_cache_pool; 57 static TAILQ_HEAD(, spdk_file) g_caches; 58 static int g_fs_count = 0; 59 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 60 static pthread_spinlock_t g_caches_lock; 61 62 static void 63 __sem_post(void *arg, int bserrno) 64 { 65 sem_t *sem = arg; 66 67 sem_post(sem); 68 } 69 70 void 71 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 72 { 73 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 74 free(cache_buffer); 75 } 76 77 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 78 79 struct spdk_file { 80 struct spdk_filesystem *fs; 81 struct spdk_blob *blob; 82 char *name; 83 uint64_t length; 84 bool is_deleted; 85 bool open_for_writing; 86 uint64_t length_flushed; 87 uint64_t append_pos; 88 uint64_t seq_byte_count; 89 uint64_t next_seq_offset; 90 uint32_t priority; 91 TAILQ_ENTRY(spdk_file) tailq; 92 spdk_blob_id blobid; 93 uint32_t ref_count; 94 pthread_spinlock_t lock; 95 struct cache_buffer *last; 96 struct cache_tree *tree; 97 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 98 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 99 TAILQ_ENTRY(spdk_file) cache_tailq; 100 }; 101 102 struct spdk_deleted_file { 103 spdk_blob_id id; 104 TAILQ_ENTRY(spdk_deleted_file) tailq; 105 }; 106 107 struct spdk_filesystem { 108 struct spdk_blob_store *bs; 109 TAILQ_HEAD(, spdk_file) files; 110 struct spdk_bs_opts bs_opts; 111 struct spdk_bs_dev *bdev; 112 fs_send_request_fn send_request; 113 114 struct { 115 uint32_t max_ops; 116 struct spdk_io_channel *sync_io_channel; 117 struct spdk_fs_channel *sync_fs_channel; 118 } sync_target; 119 120 struct { 121 uint32_t max_ops; 122 struct spdk_io_channel *md_io_channel; 123 struct spdk_fs_channel *md_fs_channel; 124 } md_target; 125 126 struct { 127 uint32_t max_ops; 128 } io_target; 129 }; 130 131 struct spdk_fs_cb_args { 132 union { 133 spdk_fs_op_with_handle_complete fs_op_with_handle; 134 spdk_fs_op_complete fs_op; 135 spdk_file_op_with_handle_complete file_op_with_handle; 136 spdk_file_op_complete file_op; 137 spdk_file_stat_op_complete stat_op; 138 } fn; 139 void *arg; 140 sem_t *sem; 141 struct spdk_filesystem *fs; 142 struct spdk_file *file; 143 int rc; 144 bool from_request; 145 union { 146 struct { 147 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 148 } fs_load; 149 struct { 150 uint64_t length; 151 } truncate; 152 struct { 153 struct spdk_io_channel *channel; 154 void *user_buf; 155 void *pin_buf; 156 int is_read; 157 off_t offset; 158 size_t length; 159 uint64_t start_page; 160 uint64_t num_pages; 161 uint32_t blocklen; 162 } rw; 163 struct { 164 const char *old_name; 165 const char *new_name; 166 } rename; 167 struct { 168 struct cache_buffer *cache_buffer; 169 uint64_t length; 170 } flush; 171 struct { 172 struct cache_buffer *cache_buffer; 173 uint64_t length; 174 uint64_t offset; 175 } readahead; 176 struct { 177 uint64_t offset; 178 TAILQ_ENTRY(spdk_fs_request) tailq; 179 bool xattr_in_progress; 180 } sync; 181 struct { 182 uint32_t num_clusters; 183 } resize; 184 struct { 185 const char *name; 186 uint32_t flags; 187 TAILQ_ENTRY(spdk_fs_request) tailq; 188 } open; 189 struct { 190 const char *name; 191 } create; 192 struct { 193 const char *name; 194 } delete; 195 struct { 196 const char *name; 197 } stat; 198 } op; 199 }; 200 201 static void cache_free_buffers(struct spdk_file *file); 202 203 static void 204 __initialize_cache(void) 205 { 206 assert(g_cache_pool == NULL); 207 208 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 209 g_fs_cache_size / CACHE_BUFFER_SIZE, 210 CACHE_BUFFER_SIZE, 211 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 212 SPDK_ENV_SOCKET_ID_ANY); 213 TAILQ_INIT(&g_caches); 214 pthread_spin_init(&g_caches_lock, 0); 215 } 216 217 static void 218 __free_cache(void) 219 { 220 assert(g_cache_pool != NULL); 221 222 spdk_mempool_free(g_cache_pool); 223 g_cache_pool = NULL; 224 } 225 226 static uint64_t 227 __file_get_blob_size(struct spdk_file *file) 228 { 229 uint64_t cluster_sz; 230 231 cluster_sz = file->fs->bs_opts.cluster_sz; 232 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 233 } 234 235 struct spdk_fs_request { 236 struct spdk_fs_cb_args args; 237 TAILQ_ENTRY(spdk_fs_request) link; 238 struct spdk_fs_channel *channel; 239 }; 240 241 struct spdk_fs_channel { 242 struct spdk_fs_request *req_mem; 243 TAILQ_HEAD(, spdk_fs_request) reqs; 244 sem_t sem; 245 struct spdk_filesystem *fs; 246 struct spdk_io_channel *bs_channel; 247 fs_send_request_fn send_request; 248 bool sync; 249 pthread_spinlock_t lock; 250 }; 251 252 static struct spdk_fs_request * 253 alloc_fs_request(struct spdk_fs_channel *channel) 254 { 255 struct spdk_fs_request *req; 256 257 if (channel->sync) { 258 pthread_spin_lock(&channel->lock); 259 } 260 261 req = TAILQ_FIRST(&channel->reqs); 262 if (req) { 263 TAILQ_REMOVE(&channel->reqs, req, link); 264 } 265 266 if (channel->sync) { 267 pthread_spin_unlock(&channel->lock); 268 } 269 270 if (req == NULL) { 271 return NULL; 272 } 273 memset(req, 0, sizeof(*req)); 274 req->channel = channel; 275 req->args.from_request = true; 276 277 return req; 278 } 279 280 static void 281 free_fs_request(struct spdk_fs_request *req) 282 { 283 struct spdk_fs_channel *channel = req->channel; 284 285 if (channel->sync) { 286 pthread_spin_lock(&channel->lock); 287 } 288 289 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 290 291 if (channel->sync) { 292 pthread_spin_unlock(&channel->lock); 293 } 294 } 295 296 static int 297 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 298 uint32_t max_ops) 299 { 300 uint32_t i; 301 302 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 303 if (!channel->req_mem) { 304 return -1; 305 } 306 307 TAILQ_INIT(&channel->reqs); 308 sem_init(&channel->sem, 0, 0); 309 310 for (i = 0; i < max_ops; i++) { 311 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 312 } 313 314 channel->fs = fs; 315 316 return 0; 317 } 318 319 static int 320 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 321 { 322 struct spdk_filesystem *fs; 323 struct spdk_fs_channel *channel = ctx_buf; 324 325 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 326 327 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 328 } 329 330 static int 331 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 332 { 333 struct spdk_filesystem *fs; 334 struct spdk_fs_channel *channel = ctx_buf; 335 336 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 337 338 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 339 } 340 341 static int 342 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 343 { 344 struct spdk_filesystem *fs; 345 struct spdk_fs_channel *channel = ctx_buf; 346 347 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 348 349 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 350 } 351 352 static void 353 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 354 { 355 struct spdk_fs_channel *channel = ctx_buf; 356 357 free(channel->req_mem); 358 if (channel->bs_channel != NULL) { 359 spdk_bs_free_io_channel(channel->bs_channel); 360 } 361 } 362 363 static void 364 __send_request_direct(fs_request_fn fn, void *arg) 365 { 366 fn(arg); 367 } 368 369 static void 370 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 371 { 372 fs->bs = bs; 373 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 374 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 375 fs->md_target.md_fs_channel->send_request = __send_request_direct; 376 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 377 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 378 379 pthread_mutex_lock(&g_cache_init_lock); 380 if (g_fs_count == 0) { 381 __initialize_cache(); 382 } 383 g_fs_count++; 384 pthread_mutex_unlock(&g_cache_init_lock); 385 } 386 387 static void 388 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 389 { 390 struct spdk_fs_request *req = ctx; 391 struct spdk_fs_cb_args *args = &req->args; 392 struct spdk_filesystem *fs = args->fs; 393 394 if (bserrno == 0) { 395 common_fs_bs_init(fs, bs); 396 } else { 397 free(fs); 398 fs = NULL; 399 } 400 401 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 402 free_fs_request(req); 403 } 404 405 static void 406 fs_conf_parse(void) 407 { 408 struct spdk_conf_section *sp; 409 410 sp = spdk_conf_find_section(NULL, "Blobfs"); 411 if (sp == NULL) { 412 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 413 return; 414 } 415 416 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 417 if (g_fs_cache_buffer_shift <= 0) { 418 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 419 } 420 } 421 422 static struct spdk_filesystem * 423 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 424 { 425 struct spdk_filesystem *fs; 426 427 fs = calloc(1, sizeof(*fs)); 428 if (fs == NULL) { 429 return NULL; 430 } 431 432 fs->bdev = dev; 433 fs->send_request = send_request_fn; 434 TAILQ_INIT(&fs->files); 435 436 fs->md_target.max_ops = 512; 437 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 438 sizeof(struct spdk_fs_channel)); 439 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 440 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 441 442 fs->sync_target.max_ops = 512; 443 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 444 sizeof(struct spdk_fs_channel)); 445 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 446 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 447 448 fs->io_target.max_ops = 512; 449 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 450 sizeof(struct spdk_fs_channel)); 451 452 return fs; 453 } 454 455 void 456 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 457 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 458 { 459 struct spdk_filesystem *fs; 460 struct spdk_fs_request *req; 461 struct spdk_fs_cb_args *args; 462 struct spdk_bs_opts opts = {}; 463 464 fs = fs_alloc(dev, send_request_fn); 465 if (fs == NULL) { 466 cb_fn(cb_arg, NULL, -ENOMEM); 467 return; 468 } 469 470 fs_conf_parse(); 471 472 req = alloc_fs_request(fs->md_target.md_fs_channel); 473 if (req == NULL) { 474 spdk_put_io_channel(fs->md_target.md_io_channel); 475 spdk_io_device_unregister(&fs->md_target, NULL); 476 spdk_put_io_channel(fs->sync_target.sync_io_channel); 477 spdk_io_device_unregister(&fs->sync_target, NULL); 478 spdk_io_device_unregister(&fs->io_target, NULL); 479 free(fs); 480 cb_fn(cb_arg, NULL, -ENOMEM); 481 return; 482 } 483 484 args = &req->args; 485 args->fn.fs_op_with_handle = cb_fn; 486 args->arg = cb_arg; 487 args->fs = fs; 488 489 spdk_bs_opts_init(&opts); 490 strncpy(opts.bstype.bstype, "BLOBFS", SPDK_BLOBSTORE_TYPE_LENGTH); 491 492 spdk_bs_init(dev, &opts, init_cb, req); 493 } 494 495 static struct spdk_file * 496 file_alloc(struct spdk_filesystem *fs) 497 { 498 struct spdk_file *file; 499 500 file = calloc(1, sizeof(*file)); 501 if (file == NULL) { 502 return NULL; 503 } 504 505 file->tree = calloc(1, sizeof(*file->tree)); 506 if (file->tree == NULL) { 507 free(file); 508 return NULL; 509 } 510 511 file->fs = fs; 512 TAILQ_INIT(&file->open_requests); 513 TAILQ_INIT(&file->sync_requests); 514 pthread_spin_init(&file->lock, 0); 515 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 516 file->priority = SPDK_FILE_PRIORITY_LOW; 517 return file; 518 } 519 520 static void iter_delete_cb(void *ctx, int bserrno); 521 522 static int 523 _handle_deleted_files(struct spdk_fs_request *req) 524 { 525 struct spdk_fs_cb_args *args = &req->args; 526 struct spdk_filesystem *fs = args->fs; 527 528 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 529 struct spdk_deleted_file *deleted_file; 530 531 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 532 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 533 spdk_bs_md_delete_blob(fs->bs, deleted_file->id, iter_delete_cb, req); 534 free(deleted_file); 535 return 0; 536 } 537 538 return 1; 539 } 540 541 static void 542 iter_delete_cb(void *ctx, int bserrno) 543 { 544 struct spdk_fs_request *req = ctx; 545 struct spdk_fs_cb_args *args = &req->args; 546 struct spdk_filesystem *fs = args->fs; 547 548 if (_handle_deleted_files(req) == 0) { 549 return; 550 } 551 552 args->fn.fs_op_with_handle(args->arg, fs, 0); 553 free_fs_request(req); 554 555 } 556 557 static void 558 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 559 { 560 struct spdk_fs_request *req = ctx; 561 struct spdk_fs_cb_args *args = &req->args; 562 struct spdk_filesystem *fs = args->fs; 563 uint64_t *length; 564 const char *name; 565 uint32_t *is_deleted; 566 size_t value_len; 567 568 if (rc == -ENOENT) { 569 /* Finished iterating */ 570 if (_handle_deleted_files(req) == 0) { 571 return; 572 } 573 args->fn.fs_op_with_handle(args->arg, fs, 0); 574 free_fs_request(req); 575 return; 576 } else if (rc < 0) { 577 args->fn.fs_op_with_handle(args->arg, fs, rc); 578 free_fs_request(req); 579 return; 580 } 581 582 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 583 if (rc < 0) { 584 args->fn.fs_op_with_handle(args->arg, fs, rc); 585 free_fs_request(req); 586 return; 587 } 588 589 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 590 if (rc < 0) { 591 args->fn.fs_op_with_handle(args->arg, fs, rc); 592 free_fs_request(req); 593 return; 594 } 595 596 assert(value_len == 8); 597 598 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 599 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 600 if (rc < 0) { 601 struct spdk_file *f; 602 603 f = file_alloc(fs); 604 if (f == NULL) { 605 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 606 free_fs_request(req); 607 return; 608 } 609 610 f->name = strdup(name); 611 f->blobid = spdk_blob_get_id(blob); 612 f->length = *length; 613 f->length_flushed = *length; 614 f->append_pos = *length; 615 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 616 } else { 617 struct spdk_deleted_file *deleted_file; 618 619 deleted_file = calloc(1, sizeof(*deleted_file)); 620 if (deleted_file == NULL) { 621 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 622 free_fs_request(req); 623 return; 624 } 625 deleted_file->id = spdk_blob_get_id(blob); 626 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 627 } 628 629 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 630 } 631 632 static void 633 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 634 { 635 struct spdk_fs_request *req = ctx; 636 struct spdk_fs_cb_args *args = &req->args; 637 struct spdk_filesystem *fs = args->fs; 638 struct spdk_bs_type bstype; 639 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 640 static const struct spdk_bs_type zeros; 641 642 if (bserrno != 0) { 643 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 644 free_fs_request(req); 645 free(fs); 646 return; 647 } 648 649 bstype = spdk_bs_get_bstype(bs); 650 651 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 652 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 653 spdk_bs_set_bstype(bs, blobfs_type); 654 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 655 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 656 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 657 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 658 free_fs_request(req); 659 free(fs); 660 return; 661 } 662 663 common_fs_bs_init(fs, bs); 664 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 665 } 666 667 void 668 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 669 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 670 { 671 struct spdk_filesystem *fs; 672 struct spdk_fs_cb_args *args; 673 struct spdk_fs_request *req; 674 struct spdk_bs_opts opts = {}; 675 676 fs = fs_alloc(dev, send_request_fn); 677 if (fs == NULL) { 678 cb_fn(cb_arg, NULL, -ENOMEM); 679 return; 680 } 681 682 fs_conf_parse(); 683 684 req = alloc_fs_request(fs->md_target.md_fs_channel); 685 if (req == NULL) { 686 spdk_put_io_channel(fs->md_target.md_io_channel); 687 spdk_io_device_unregister(&fs->md_target, NULL); 688 spdk_put_io_channel(fs->sync_target.sync_io_channel); 689 spdk_io_device_unregister(&fs->sync_target, NULL); 690 spdk_io_device_unregister(&fs->io_target, NULL); 691 free(fs); 692 cb_fn(cb_arg, NULL, -ENOMEM); 693 return; 694 } 695 696 args = &req->args; 697 args->fn.fs_op_with_handle = cb_fn; 698 args->arg = cb_arg; 699 args->fs = fs; 700 TAILQ_INIT(&args->op.fs_load.deleted_files); 701 702 spdk_bs_opts_init(&opts); 703 704 spdk_bs_load(dev, &opts, load_cb, req); 705 } 706 707 static void 708 unload_cb(void *ctx, int bserrno) 709 { 710 struct spdk_fs_request *req = ctx; 711 struct spdk_fs_cb_args *args = &req->args; 712 struct spdk_filesystem *fs = args->fs; 713 714 pthread_mutex_lock(&g_cache_init_lock); 715 g_fs_count--; 716 if (g_fs_count == 0) { 717 __free_cache(); 718 } 719 pthread_mutex_unlock(&g_cache_init_lock); 720 721 args->fn.fs_op(args->arg, bserrno); 722 free(req); 723 724 spdk_io_device_unregister(&fs->io_target, NULL); 725 spdk_io_device_unregister(&fs->sync_target, NULL); 726 spdk_io_device_unregister(&fs->md_target, NULL); 727 728 free(fs); 729 } 730 731 void 732 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 733 { 734 struct spdk_fs_request *req; 735 struct spdk_fs_cb_args *args; 736 737 /* 738 * We must free the md_channel before unloading the blobstore, so just 739 * allocate this request from the general heap. 740 */ 741 req = calloc(1, sizeof(*req)); 742 if (req == NULL) { 743 cb_fn(cb_arg, -ENOMEM); 744 return; 745 } 746 747 args = &req->args; 748 args->fn.fs_op = cb_fn; 749 args->arg = cb_arg; 750 args->fs = fs; 751 752 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 753 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 754 spdk_bs_unload(fs->bs, unload_cb, req); 755 } 756 757 static struct spdk_file * 758 fs_find_file(struct spdk_filesystem *fs, const char *name) 759 { 760 struct spdk_file *file; 761 762 TAILQ_FOREACH(file, &fs->files, tailq) { 763 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 764 return file; 765 } 766 } 767 768 return NULL; 769 } 770 771 void 772 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 773 spdk_file_stat_op_complete cb_fn, void *cb_arg) 774 { 775 struct spdk_file_stat stat; 776 struct spdk_file *f = NULL; 777 778 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 779 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 780 return; 781 } 782 783 f = fs_find_file(fs, name); 784 if (f != NULL) { 785 stat.blobid = f->blobid; 786 stat.size = f->length; 787 cb_fn(cb_arg, &stat, 0); 788 return; 789 } 790 791 cb_fn(cb_arg, NULL, -ENOENT); 792 } 793 794 static void 795 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 796 { 797 struct spdk_fs_request *req = arg; 798 struct spdk_fs_cb_args *args = &req->args; 799 800 args->rc = fserrno; 801 if (fserrno == 0) { 802 memcpy(args->arg, stat, sizeof(*stat)); 803 } 804 sem_post(args->sem); 805 } 806 807 static void 808 __file_stat(void *arg) 809 { 810 struct spdk_fs_request *req = arg; 811 struct spdk_fs_cb_args *args = &req->args; 812 813 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 814 args->fn.stat_op, req); 815 } 816 817 int 818 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 819 const char *name, struct spdk_file_stat *stat) 820 { 821 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 822 struct spdk_fs_request *req; 823 int rc; 824 825 req = alloc_fs_request(channel); 826 assert(req != NULL); 827 828 req->args.fs = fs; 829 req->args.op.stat.name = name; 830 req->args.fn.stat_op = __copy_stat; 831 req->args.arg = stat; 832 req->args.sem = &channel->sem; 833 channel->send_request(__file_stat, req); 834 sem_wait(&channel->sem); 835 836 rc = req->args.rc; 837 free_fs_request(req); 838 839 return rc; 840 } 841 842 static void 843 fs_create_blob_close_cb(void *ctx, int bserrno) 844 { 845 struct spdk_fs_request *req = ctx; 846 struct spdk_fs_cb_args *args = &req->args; 847 848 args->fn.file_op(args->arg, bserrno); 849 free_fs_request(req); 850 } 851 852 static void 853 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 854 { 855 struct spdk_fs_request *req = ctx; 856 struct spdk_fs_cb_args *args = &req->args; 857 struct spdk_file *f = args->file; 858 uint64_t length = 0; 859 860 f->blob = blob; 861 spdk_blob_resize(blob, 1); 862 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 863 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 864 865 spdk_blob_close(&f->blob, fs_create_blob_close_cb, args); 866 } 867 868 static void 869 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 870 { 871 struct spdk_fs_request *req = ctx; 872 struct spdk_fs_cb_args *args = &req->args; 873 struct spdk_file *f = args->file; 874 875 f->blobid = blobid; 876 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 877 } 878 879 void 880 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 881 spdk_file_op_complete cb_fn, void *cb_arg) 882 { 883 struct spdk_file *file; 884 struct spdk_fs_request *req; 885 struct spdk_fs_cb_args *args; 886 887 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 888 cb_fn(cb_arg, -ENAMETOOLONG); 889 return; 890 } 891 892 file = fs_find_file(fs, name); 893 if (file != NULL) { 894 cb_fn(cb_arg, -EEXIST); 895 return; 896 } 897 898 file = file_alloc(fs); 899 if (file == NULL) { 900 cb_fn(cb_arg, -ENOMEM); 901 return; 902 } 903 904 req = alloc_fs_request(fs->md_target.md_fs_channel); 905 if (req == NULL) { 906 cb_fn(cb_arg, -ENOMEM); 907 return; 908 } 909 910 args = &req->args; 911 args->file = file; 912 args->fn.file_op = cb_fn; 913 args->arg = cb_arg; 914 915 file->name = strdup(name); 916 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 917 } 918 919 static void 920 __fs_create_file_done(void *arg, int fserrno) 921 { 922 struct spdk_fs_request *req = arg; 923 struct spdk_fs_cb_args *args = &req->args; 924 925 args->rc = fserrno; 926 sem_post(args->sem); 927 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 928 } 929 930 static void 931 __fs_create_file(void *arg) 932 { 933 struct spdk_fs_request *req = arg; 934 struct spdk_fs_cb_args *args = &req->args; 935 936 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 937 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 938 } 939 940 int 941 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 942 { 943 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 944 struct spdk_fs_request *req; 945 struct spdk_fs_cb_args *args; 946 int rc; 947 948 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 949 950 req = alloc_fs_request(channel); 951 assert(req != NULL); 952 953 args = &req->args; 954 args->fs = fs; 955 args->op.create.name = name; 956 args->sem = &channel->sem; 957 fs->send_request(__fs_create_file, req); 958 sem_wait(&channel->sem); 959 rc = args->rc; 960 free_fs_request(req); 961 962 return rc; 963 } 964 965 static void 966 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 967 { 968 struct spdk_fs_request *req = ctx; 969 struct spdk_fs_cb_args *args = &req->args; 970 struct spdk_file *f = args->file; 971 972 f->blob = blob; 973 while (!TAILQ_EMPTY(&f->open_requests)) { 974 req = TAILQ_FIRST(&f->open_requests); 975 args = &req->args; 976 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 977 args->fn.file_op_with_handle(args->arg, f, bserrno); 978 free_fs_request(req); 979 } 980 } 981 982 static void 983 fs_open_blob_create_cb(void *ctx, int bserrno) 984 { 985 struct spdk_fs_request *req = ctx; 986 struct spdk_fs_cb_args *args = &req->args; 987 struct spdk_file *file = args->file; 988 struct spdk_filesystem *fs = args->fs; 989 990 if (file == NULL) { 991 /* 992 * This is from an open with CREATE flag - the file 993 * is now created so look it up in the file list for this 994 * filesystem. 995 */ 996 file = fs_find_file(fs, args->op.open.name); 997 assert(file != NULL); 998 args->file = file; 999 } 1000 1001 file->ref_count++; 1002 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1003 if (file->ref_count == 1) { 1004 assert(file->blob == NULL); 1005 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1006 } else if (file->blob != NULL) { 1007 fs_open_blob_done(req, file->blob, 0); 1008 } else { 1009 /* 1010 * The blob open for this file is in progress due to a previous 1011 * open request. When that open completes, it will invoke the 1012 * open callback for this request. 1013 */ 1014 } 1015 } 1016 1017 void 1018 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1019 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1020 { 1021 struct spdk_file *f = NULL; 1022 struct spdk_fs_request *req; 1023 struct spdk_fs_cb_args *args; 1024 1025 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1026 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1027 return; 1028 } 1029 1030 f = fs_find_file(fs, name); 1031 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1032 cb_fn(cb_arg, NULL, -ENOENT); 1033 return; 1034 } 1035 1036 if (f != NULL && f->is_deleted == true) { 1037 cb_fn(cb_arg, NULL, -ENOENT); 1038 return; 1039 } 1040 1041 req = alloc_fs_request(fs->md_target.md_fs_channel); 1042 if (req == NULL) { 1043 cb_fn(cb_arg, NULL, -ENOMEM); 1044 return; 1045 } 1046 1047 args = &req->args; 1048 args->fn.file_op_with_handle = cb_fn; 1049 args->arg = cb_arg; 1050 args->file = f; 1051 args->fs = fs; 1052 args->op.open.name = name; 1053 1054 if (f == NULL) { 1055 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1056 } else { 1057 fs_open_blob_create_cb(req, 0); 1058 } 1059 } 1060 1061 static void 1062 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1063 { 1064 struct spdk_fs_request *req = arg; 1065 struct spdk_fs_cb_args *args = &req->args; 1066 1067 args->file = file; 1068 args->rc = bserrno; 1069 sem_post(args->sem); 1070 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1071 } 1072 1073 static void 1074 __fs_open_file(void *arg) 1075 { 1076 struct spdk_fs_request *req = arg; 1077 struct spdk_fs_cb_args *args = &req->args; 1078 1079 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1080 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1081 __fs_open_file_done, req); 1082 } 1083 1084 int 1085 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1086 const char *name, uint32_t flags, struct spdk_file **file) 1087 { 1088 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1089 struct spdk_fs_request *req; 1090 struct spdk_fs_cb_args *args; 1091 int rc; 1092 1093 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1094 1095 req = alloc_fs_request(channel); 1096 assert(req != NULL); 1097 1098 args = &req->args; 1099 args->fs = fs; 1100 args->op.open.name = name; 1101 args->op.open.flags = flags; 1102 args->sem = &channel->sem; 1103 fs->send_request(__fs_open_file, req); 1104 sem_wait(&channel->sem); 1105 rc = args->rc; 1106 if (rc == 0) { 1107 *file = args->file; 1108 } else { 1109 *file = NULL; 1110 } 1111 free_fs_request(req); 1112 1113 return rc; 1114 } 1115 1116 static void 1117 fs_rename_blob_close_cb(void *ctx, int bserrno) 1118 { 1119 struct spdk_fs_request *req = ctx; 1120 struct spdk_fs_cb_args *args = &req->args; 1121 1122 args->fn.fs_op(args->arg, bserrno); 1123 free_fs_request(req); 1124 } 1125 1126 static void 1127 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1128 { 1129 struct spdk_fs_request *req = ctx; 1130 struct spdk_fs_cb_args *args = &req->args; 1131 struct spdk_file *f = args->file; 1132 const char *new_name = args->op.rename.new_name; 1133 1134 f->blob = blob; 1135 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1136 spdk_blob_close(&f->blob, fs_rename_blob_close_cb, req); 1137 } 1138 1139 static void 1140 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1141 { 1142 struct spdk_fs_cb_args *args = &req->args; 1143 struct spdk_file *f; 1144 1145 f = fs_find_file(args->fs, args->op.rename.old_name); 1146 if (f == NULL) { 1147 args->fn.fs_op(args->arg, -ENOENT); 1148 free_fs_request(req); 1149 return; 1150 } 1151 1152 free(f->name); 1153 f->name = strdup(args->op.rename.new_name); 1154 args->file = f; 1155 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1156 } 1157 1158 static void 1159 fs_rename_delete_done(void *arg, int fserrno) 1160 { 1161 __spdk_fs_md_rename_file(arg); 1162 } 1163 1164 void 1165 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1166 const char *old_name, const char *new_name, 1167 spdk_file_op_complete cb_fn, void *cb_arg) 1168 { 1169 struct spdk_file *f; 1170 struct spdk_fs_request *req; 1171 struct spdk_fs_cb_args *args; 1172 1173 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1174 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1175 cb_fn(cb_arg, -ENAMETOOLONG); 1176 return; 1177 } 1178 1179 req = alloc_fs_request(fs->md_target.md_fs_channel); 1180 if (req == NULL) { 1181 cb_fn(cb_arg, -ENOMEM); 1182 return; 1183 } 1184 1185 args = &req->args; 1186 args->fn.fs_op = cb_fn; 1187 args->fs = fs; 1188 args->arg = cb_arg; 1189 args->op.rename.old_name = old_name; 1190 args->op.rename.new_name = new_name; 1191 1192 f = fs_find_file(fs, new_name); 1193 if (f == NULL) { 1194 __spdk_fs_md_rename_file(req); 1195 return; 1196 } 1197 1198 /* 1199 * The rename overwrites an existing file. So delete the existing file, then 1200 * do the actual rename. 1201 */ 1202 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1203 } 1204 1205 static void 1206 __fs_rename_file_done(void *arg, int fserrno) 1207 { 1208 struct spdk_fs_request *req = arg; 1209 struct spdk_fs_cb_args *args = &req->args; 1210 1211 args->rc = fserrno; 1212 sem_post(args->sem); 1213 } 1214 1215 static void 1216 __fs_rename_file(void *arg) 1217 { 1218 struct spdk_fs_request *req = arg; 1219 struct spdk_fs_cb_args *args = &req->args; 1220 1221 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1222 __fs_rename_file_done, req); 1223 } 1224 1225 int 1226 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1227 const char *old_name, const char *new_name) 1228 { 1229 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1230 struct spdk_fs_request *req; 1231 struct spdk_fs_cb_args *args; 1232 int rc; 1233 1234 req = alloc_fs_request(channel); 1235 assert(req != NULL); 1236 1237 args = &req->args; 1238 1239 args->fs = fs; 1240 args->op.rename.old_name = old_name; 1241 args->op.rename.new_name = new_name; 1242 args->sem = &channel->sem; 1243 fs->send_request(__fs_rename_file, req); 1244 sem_wait(&channel->sem); 1245 rc = args->rc; 1246 free_fs_request(req); 1247 return rc; 1248 } 1249 1250 static void 1251 blob_delete_cb(void *ctx, int bserrno) 1252 { 1253 struct spdk_fs_request *req = ctx; 1254 struct spdk_fs_cb_args *args = &req->args; 1255 1256 args->fn.file_op(args->arg, bserrno); 1257 free_fs_request(req); 1258 } 1259 1260 void 1261 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1262 spdk_file_op_complete cb_fn, void *cb_arg) 1263 { 1264 struct spdk_file *f; 1265 spdk_blob_id blobid; 1266 struct spdk_fs_request *req; 1267 struct spdk_fs_cb_args *args; 1268 1269 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1270 1271 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1272 cb_fn(cb_arg, -ENAMETOOLONG); 1273 return; 1274 } 1275 1276 f = fs_find_file(fs, name); 1277 if (f == NULL) { 1278 cb_fn(cb_arg, -ENOENT); 1279 return; 1280 } 1281 1282 req = alloc_fs_request(fs->md_target.md_fs_channel); 1283 if (req == NULL) { 1284 cb_fn(cb_arg, -ENOMEM); 1285 return; 1286 } 1287 1288 args = &req->args; 1289 args->fn.file_op = cb_fn; 1290 args->arg = cb_arg; 1291 1292 if (f->ref_count > 0) { 1293 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1294 f->is_deleted = true; 1295 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1296 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1297 return; 1298 } 1299 1300 TAILQ_REMOVE(&fs->files, f, tailq); 1301 1302 cache_free_buffers(f); 1303 1304 blobid = f->blobid; 1305 1306 free(f->name); 1307 free(f->tree); 1308 free(f); 1309 1310 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1311 } 1312 1313 static void 1314 __fs_delete_file_done(void *arg, int fserrno) 1315 { 1316 struct spdk_fs_request *req = arg; 1317 struct spdk_fs_cb_args *args = &req->args; 1318 1319 args->rc = fserrno; 1320 sem_post(args->sem); 1321 } 1322 1323 static void 1324 __fs_delete_file(void *arg) 1325 { 1326 struct spdk_fs_request *req = arg; 1327 struct spdk_fs_cb_args *args = &req->args; 1328 1329 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1330 } 1331 1332 int 1333 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1334 const char *name) 1335 { 1336 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1337 struct spdk_fs_request *req; 1338 struct spdk_fs_cb_args *args; 1339 int rc; 1340 1341 req = alloc_fs_request(channel); 1342 assert(req != NULL); 1343 1344 args = &req->args; 1345 args->fs = fs; 1346 args->op.delete.name = name; 1347 args->sem = &channel->sem; 1348 fs->send_request(__fs_delete_file, req); 1349 sem_wait(&channel->sem); 1350 rc = args->rc; 1351 free_fs_request(req); 1352 1353 return rc; 1354 } 1355 1356 spdk_fs_iter 1357 spdk_fs_iter_first(struct spdk_filesystem *fs) 1358 { 1359 struct spdk_file *f; 1360 1361 f = TAILQ_FIRST(&fs->files); 1362 return f; 1363 } 1364 1365 spdk_fs_iter 1366 spdk_fs_iter_next(spdk_fs_iter iter) 1367 { 1368 struct spdk_file *f = iter; 1369 1370 if (f == NULL) { 1371 return NULL; 1372 } 1373 1374 f = TAILQ_NEXT(f, tailq); 1375 return f; 1376 } 1377 1378 const char * 1379 spdk_file_get_name(struct spdk_file *file) 1380 { 1381 return file->name; 1382 } 1383 1384 uint64_t 1385 spdk_file_get_length(struct spdk_file *file) 1386 { 1387 assert(file != NULL); 1388 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1389 return file->length; 1390 } 1391 1392 static void 1393 fs_truncate_complete_cb(void *ctx, int bserrno) 1394 { 1395 struct spdk_fs_request *req = ctx; 1396 struct spdk_fs_cb_args *args = &req->args; 1397 1398 args->fn.file_op(args->arg, bserrno); 1399 free_fs_request(req); 1400 } 1401 1402 static uint64_t 1403 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1404 { 1405 return (length + cluster_sz - 1) / cluster_sz; 1406 } 1407 1408 void 1409 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1410 spdk_file_op_complete cb_fn, void *cb_arg) 1411 { 1412 struct spdk_filesystem *fs; 1413 size_t num_clusters; 1414 struct spdk_fs_request *req; 1415 struct spdk_fs_cb_args *args; 1416 1417 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1418 if (length == file->length) { 1419 cb_fn(cb_arg, 0); 1420 return; 1421 } 1422 1423 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1424 if (req == NULL) { 1425 cb_fn(cb_arg, -ENOMEM); 1426 return; 1427 } 1428 1429 args = &req->args; 1430 args->fn.file_op = cb_fn; 1431 args->arg = cb_arg; 1432 args->file = file; 1433 fs = file->fs; 1434 1435 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1436 1437 spdk_blob_resize(file->blob, num_clusters); 1438 spdk_blob_set_xattr(file->blob, "length", &length, sizeof(length)); 1439 1440 file->length = length; 1441 if (file->append_pos > file->length) { 1442 file->append_pos = file->length; 1443 } 1444 1445 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1446 } 1447 1448 static void 1449 __truncate(void *arg) 1450 { 1451 struct spdk_fs_request *req = arg; 1452 struct spdk_fs_cb_args *args = &req->args; 1453 1454 spdk_file_truncate_async(args->file, args->op.truncate.length, 1455 args->fn.file_op, args->arg); 1456 } 1457 1458 void 1459 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1460 uint64_t length) 1461 { 1462 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1463 struct spdk_fs_request *req; 1464 struct spdk_fs_cb_args *args; 1465 1466 req = alloc_fs_request(channel); 1467 assert(req != NULL); 1468 1469 args = &req->args; 1470 1471 args->file = file; 1472 args->op.truncate.length = length; 1473 args->fn.file_op = __sem_post; 1474 args->arg = &channel->sem; 1475 1476 channel->send_request(__truncate, req); 1477 sem_wait(&channel->sem); 1478 free_fs_request(req); 1479 } 1480 1481 static void 1482 __rw_done(void *ctx, int bserrno) 1483 { 1484 struct spdk_fs_request *req = ctx; 1485 struct spdk_fs_cb_args *args = &req->args; 1486 1487 spdk_dma_free(args->op.rw.pin_buf); 1488 args->fn.file_op(args->arg, bserrno); 1489 free_fs_request(req); 1490 } 1491 1492 static void 1493 __read_done(void *ctx, int bserrno) 1494 { 1495 struct spdk_fs_request *req = ctx; 1496 struct spdk_fs_cb_args *args = &req->args; 1497 1498 if (args->op.rw.is_read) { 1499 memcpy(args->op.rw.user_buf, 1500 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1501 args->op.rw.length); 1502 __rw_done(req, 0); 1503 } else { 1504 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1505 args->op.rw.user_buf, 1506 args->op.rw.length); 1507 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1508 args->op.rw.pin_buf, 1509 args->op.rw.start_page, args->op.rw.num_pages, 1510 __rw_done, req); 1511 } 1512 } 1513 1514 static void 1515 __do_blob_read(void *ctx, int fserrno) 1516 { 1517 struct spdk_fs_request *req = ctx; 1518 struct spdk_fs_cb_args *args = &req->args; 1519 1520 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1521 args->op.rw.pin_buf, 1522 args->op.rw.start_page, args->op.rw.num_pages, 1523 __read_done, req); 1524 } 1525 1526 static void 1527 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1528 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1529 { 1530 uint64_t end_page; 1531 1532 *page_size = spdk_bs_get_page_size(file->fs->bs); 1533 *start_page = offset / *page_size; 1534 end_page = (offset + length - 1) / *page_size; 1535 *num_pages = (end_page - *start_page + 1); 1536 } 1537 1538 static void 1539 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1540 void *payload, uint64_t offset, uint64_t length, 1541 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1542 { 1543 struct spdk_fs_request *req; 1544 struct spdk_fs_cb_args *args; 1545 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1546 uint64_t start_page, num_pages, pin_buf_length; 1547 uint32_t page_size; 1548 1549 if (is_read && offset + length > file->length) { 1550 cb_fn(cb_arg, -EINVAL); 1551 return; 1552 } 1553 1554 req = alloc_fs_request(channel); 1555 if (req == NULL) { 1556 cb_fn(cb_arg, -ENOMEM); 1557 return; 1558 } 1559 1560 args = &req->args; 1561 args->fn.file_op = cb_fn; 1562 args->arg = cb_arg; 1563 args->file = file; 1564 args->op.rw.channel = channel->bs_channel; 1565 args->op.rw.user_buf = payload; 1566 args->op.rw.is_read = is_read; 1567 args->op.rw.offset = offset; 1568 args->op.rw.length = length; 1569 1570 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1571 pin_buf_length = num_pages * page_size; 1572 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1573 1574 args->op.rw.start_page = start_page; 1575 args->op.rw.num_pages = num_pages; 1576 1577 if (!is_read && file->length < offset + length) { 1578 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1579 } else { 1580 __do_blob_read(req, 0); 1581 } 1582 } 1583 1584 void 1585 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1586 void *payload, uint64_t offset, uint64_t length, 1587 spdk_file_op_complete cb_fn, void *cb_arg) 1588 { 1589 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1590 } 1591 1592 void 1593 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1594 void *payload, uint64_t offset, uint64_t length, 1595 spdk_file_op_complete cb_fn, void *cb_arg) 1596 { 1597 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1598 file->name, offset, length); 1599 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1600 } 1601 1602 struct spdk_io_channel * 1603 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1604 { 1605 struct spdk_io_channel *io_channel; 1606 struct spdk_fs_channel *fs_channel; 1607 1608 io_channel = spdk_get_io_channel(&fs->io_target); 1609 fs_channel = spdk_io_channel_get_ctx(io_channel); 1610 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1611 fs_channel->send_request = __send_request_direct; 1612 1613 return io_channel; 1614 } 1615 1616 struct spdk_io_channel * 1617 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1618 { 1619 struct spdk_io_channel *io_channel; 1620 struct spdk_fs_channel *fs_channel; 1621 1622 io_channel = spdk_get_io_channel(&fs->io_target); 1623 fs_channel = spdk_io_channel_get_ctx(io_channel); 1624 fs_channel->send_request = fs->send_request; 1625 fs_channel->sync = 1; 1626 pthread_spin_init(&fs_channel->lock, 0); 1627 1628 return io_channel; 1629 } 1630 1631 void 1632 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1633 { 1634 spdk_put_io_channel(channel); 1635 } 1636 1637 void 1638 spdk_fs_set_cache_size(uint64_t size_in_mb) 1639 { 1640 g_fs_cache_size = size_in_mb * 1024 * 1024; 1641 } 1642 1643 uint64_t 1644 spdk_fs_get_cache_size(void) 1645 { 1646 return g_fs_cache_size / (1024 * 1024); 1647 } 1648 1649 static void __file_flush(void *_args); 1650 1651 static void * 1652 alloc_cache_memory_buffer(struct spdk_file *context) 1653 { 1654 struct spdk_file *file; 1655 void *buf; 1656 1657 buf = spdk_mempool_get(g_cache_pool); 1658 if (buf != NULL) { 1659 return buf; 1660 } 1661 1662 pthread_spin_lock(&g_caches_lock); 1663 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1664 if (!file->open_for_writing && 1665 file->priority == SPDK_FILE_PRIORITY_LOW && 1666 file != context) { 1667 break; 1668 } 1669 } 1670 pthread_spin_unlock(&g_caches_lock); 1671 if (file != NULL) { 1672 cache_free_buffers(file); 1673 buf = spdk_mempool_get(g_cache_pool); 1674 if (buf != NULL) { 1675 return buf; 1676 } 1677 } 1678 1679 pthread_spin_lock(&g_caches_lock); 1680 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1681 if (!file->open_for_writing && file != context) { 1682 break; 1683 } 1684 } 1685 pthread_spin_unlock(&g_caches_lock); 1686 if (file != NULL) { 1687 cache_free_buffers(file); 1688 buf = spdk_mempool_get(g_cache_pool); 1689 if (buf != NULL) { 1690 return buf; 1691 } 1692 } 1693 1694 pthread_spin_lock(&g_caches_lock); 1695 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1696 if (file != context) { 1697 break; 1698 } 1699 } 1700 pthread_spin_unlock(&g_caches_lock); 1701 if (file != NULL) { 1702 cache_free_buffers(file); 1703 buf = spdk_mempool_get(g_cache_pool); 1704 if (buf != NULL) { 1705 return buf; 1706 } 1707 } 1708 1709 return NULL; 1710 } 1711 1712 static struct cache_buffer * 1713 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1714 { 1715 struct cache_buffer *buf; 1716 int count = 0; 1717 1718 buf = calloc(1, sizeof(*buf)); 1719 if (buf == NULL) { 1720 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1721 return NULL; 1722 } 1723 1724 buf->buf = alloc_cache_memory_buffer(file); 1725 while (buf->buf == NULL) { 1726 /* 1727 * TODO: alloc_cache_memory_buffer() should eventually free 1728 * some buffers. Need a more sophisticated check here, instead 1729 * of just bailing if 100 tries does not result in getting a 1730 * free buffer. This will involve using the sync channel's 1731 * semaphore to block until a buffer becomes available. 1732 */ 1733 if (count++ == 100) { 1734 SPDK_ERRLOG("could not allocate cache buffer\n"); 1735 assert(false); 1736 free(buf); 1737 return NULL; 1738 } 1739 buf->buf = alloc_cache_memory_buffer(file); 1740 } 1741 1742 buf->buf_size = CACHE_BUFFER_SIZE; 1743 buf->offset = offset; 1744 1745 pthread_spin_lock(&g_caches_lock); 1746 if (file->tree->present_mask == 0) { 1747 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1748 } 1749 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1750 pthread_spin_unlock(&g_caches_lock); 1751 1752 return buf; 1753 } 1754 1755 static struct cache_buffer * 1756 cache_append_buffer(struct spdk_file *file) 1757 { 1758 struct cache_buffer *last; 1759 1760 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1761 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1762 1763 last = cache_insert_buffer(file, file->append_pos); 1764 if (last == NULL) { 1765 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1766 return NULL; 1767 } 1768 1769 file->last = last; 1770 1771 return last; 1772 } 1773 1774 static void 1775 __wake_caller(struct spdk_fs_cb_args *args) 1776 { 1777 sem_post(args->sem); 1778 } 1779 1780 static void __check_sync_reqs(struct spdk_file *file); 1781 1782 static void 1783 __file_cache_finish_sync(struct spdk_file *file) 1784 { 1785 struct spdk_fs_request *sync_req; 1786 struct spdk_fs_cb_args *sync_args; 1787 1788 pthread_spin_lock(&file->lock); 1789 sync_req = TAILQ_FIRST(&file->sync_requests); 1790 sync_args = &sync_req->args; 1791 assert(sync_args->op.sync.offset <= file->length_flushed); 1792 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1793 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1794 pthread_spin_unlock(&file->lock); 1795 1796 sync_args->fn.file_op(sync_args->arg, 0); 1797 __check_sync_reqs(file); 1798 1799 pthread_spin_lock(&file->lock); 1800 free_fs_request(sync_req); 1801 pthread_spin_unlock(&file->lock); 1802 } 1803 1804 static void 1805 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1806 { 1807 struct spdk_file *file = ctx; 1808 1809 __file_cache_finish_sync(file); 1810 } 1811 1812 static void 1813 __free_args(struct spdk_fs_cb_args *args) 1814 { 1815 struct spdk_fs_request *req; 1816 1817 if (!args->from_request) { 1818 free(args); 1819 } else { 1820 /* Depends on args being at the start of the spdk_fs_request structure. */ 1821 req = (struct spdk_fs_request *)args; 1822 free_fs_request(req); 1823 } 1824 } 1825 1826 static void 1827 __check_sync_reqs(struct spdk_file *file) 1828 { 1829 struct spdk_fs_request *sync_req; 1830 1831 pthread_spin_lock(&file->lock); 1832 1833 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1834 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1835 break; 1836 } 1837 } 1838 1839 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1840 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1841 sync_req->args.op.sync.xattr_in_progress = true; 1842 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1843 sizeof(file->length_flushed)); 1844 1845 pthread_spin_unlock(&file->lock); 1846 spdk_blob_sync_md(file->blob, __file_cache_finish_sync_bs_cb, file); 1847 } else { 1848 pthread_spin_unlock(&file->lock); 1849 } 1850 } 1851 1852 static void 1853 __file_flush_done(void *arg, int bserrno) 1854 { 1855 struct spdk_fs_cb_args *args = arg; 1856 struct spdk_file *file = args->file; 1857 struct cache_buffer *next = args->op.flush.cache_buffer; 1858 1859 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1860 1861 pthread_spin_lock(&file->lock); 1862 next->in_progress = false; 1863 next->bytes_flushed += args->op.flush.length; 1864 file->length_flushed += args->op.flush.length; 1865 if (file->length_flushed > file->length) { 1866 file->length = file->length_flushed; 1867 } 1868 if (next->bytes_flushed == next->buf_size) { 1869 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1870 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1871 } 1872 1873 /* 1874 * Assert that there is no cached data that extends past the end of the underlying 1875 * blob. 1876 */ 1877 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1878 next->bytes_filled == 0); 1879 1880 pthread_spin_unlock(&file->lock); 1881 1882 __check_sync_reqs(file); 1883 1884 __file_flush(args); 1885 } 1886 1887 static void 1888 __file_flush(void *_args) 1889 { 1890 struct spdk_fs_cb_args *args = _args; 1891 struct spdk_file *file = args->file; 1892 struct cache_buffer *next; 1893 uint64_t offset, length, start_page, num_pages; 1894 uint32_t page_size; 1895 1896 pthread_spin_lock(&file->lock); 1897 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1898 if (next == NULL || next->in_progress) { 1899 /* 1900 * There is either no data to flush, or a flush I/O is already in 1901 * progress. So return immediately - if a flush I/O is in 1902 * progress we will flush more data after that is completed. 1903 */ 1904 __free_args(args); 1905 pthread_spin_unlock(&file->lock); 1906 return; 1907 } 1908 1909 offset = next->offset + next->bytes_flushed; 1910 length = next->bytes_filled - next->bytes_flushed; 1911 if (length == 0) { 1912 __free_args(args); 1913 pthread_spin_unlock(&file->lock); 1914 return; 1915 } 1916 args->op.flush.length = length; 1917 args->op.flush.cache_buffer = next; 1918 1919 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1920 1921 next->in_progress = true; 1922 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1923 offset, length, start_page, num_pages); 1924 pthread_spin_unlock(&file->lock); 1925 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1926 next->buf + (start_page * page_size) - next->offset, 1927 start_page, num_pages, 1928 __file_flush_done, args); 1929 } 1930 1931 static void 1932 __file_extend_done(void *arg, int bserrno) 1933 { 1934 struct spdk_fs_cb_args *args = arg; 1935 1936 __wake_caller(args); 1937 } 1938 1939 static void 1940 __file_extend_blob(void *_args) 1941 { 1942 struct spdk_fs_cb_args *args = _args; 1943 struct spdk_file *file = args->file; 1944 1945 spdk_blob_resize(file->blob, args->op.resize.num_clusters); 1946 1947 spdk_blob_sync_md(file->blob, __file_extend_done, args); 1948 } 1949 1950 static void 1951 __rw_from_file_done(void *arg, int bserrno) 1952 { 1953 struct spdk_fs_cb_args *args = arg; 1954 1955 __wake_caller(args); 1956 __free_args(args); 1957 } 1958 1959 static void 1960 __rw_from_file(void *_args) 1961 { 1962 struct spdk_fs_cb_args *args = _args; 1963 struct spdk_file *file = args->file; 1964 1965 if (args->op.rw.is_read) { 1966 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1967 args->op.rw.offset, args->op.rw.length, 1968 __rw_from_file_done, args); 1969 } else { 1970 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1971 args->op.rw.offset, args->op.rw.length, 1972 __rw_from_file_done, args); 1973 } 1974 } 1975 1976 static int 1977 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1978 uint64_t offset, uint64_t length, bool is_read) 1979 { 1980 struct spdk_fs_cb_args *args; 1981 1982 args = calloc(1, sizeof(*args)); 1983 if (args == NULL) { 1984 sem_post(sem); 1985 return -ENOMEM; 1986 } 1987 1988 args->file = file; 1989 args->sem = sem; 1990 args->op.rw.user_buf = payload; 1991 args->op.rw.offset = offset; 1992 args->op.rw.length = length; 1993 args->op.rw.is_read = is_read; 1994 file->fs->send_request(__rw_from_file, args); 1995 return 0; 1996 } 1997 1998 int 1999 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 2000 void *payload, uint64_t offset, uint64_t length) 2001 { 2002 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2003 struct spdk_fs_cb_args *args; 2004 uint64_t rem_length, copy, blob_size, cluster_sz; 2005 uint32_t cache_buffers_filled = 0; 2006 uint8_t *cur_payload; 2007 struct cache_buffer *last; 2008 2009 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2010 2011 if (length == 0) { 2012 return 0; 2013 } 2014 2015 if (offset != file->append_pos) { 2016 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2017 return -EINVAL; 2018 } 2019 2020 pthread_spin_lock(&file->lock); 2021 file->open_for_writing = true; 2022 2023 if (file->last == NULL) { 2024 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2025 cache_append_buffer(file); 2026 } else { 2027 int rc; 2028 2029 file->append_pos += length; 2030 pthread_spin_unlock(&file->lock); 2031 rc = __send_rw_from_file(file, &channel->sem, payload, 2032 offset, length, false); 2033 sem_wait(&channel->sem); 2034 return rc; 2035 } 2036 } 2037 2038 blob_size = __file_get_blob_size(file); 2039 2040 if ((offset + length) > blob_size) { 2041 struct spdk_fs_cb_args extend_args = {}; 2042 2043 cluster_sz = file->fs->bs_opts.cluster_sz; 2044 extend_args.sem = &channel->sem; 2045 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2046 extend_args.file = file; 2047 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2048 pthread_spin_unlock(&file->lock); 2049 file->fs->send_request(__file_extend_blob, &extend_args); 2050 sem_wait(&channel->sem); 2051 } 2052 2053 last = file->last; 2054 rem_length = length; 2055 cur_payload = payload; 2056 while (rem_length > 0) { 2057 copy = last->buf_size - last->bytes_filled; 2058 if (copy > rem_length) { 2059 copy = rem_length; 2060 } 2061 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2062 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2063 file->append_pos += copy; 2064 if (file->length < file->append_pos) { 2065 file->length = file->append_pos; 2066 } 2067 cur_payload += copy; 2068 last->bytes_filled += copy; 2069 rem_length -= copy; 2070 if (last->bytes_filled == last->buf_size) { 2071 cache_buffers_filled++; 2072 last = cache_append_buffer(file); 2073 if (last == NULL) { 2074 BLOBFS_TRACE(file, "nomem\n"); 2075 pthread_spin_unlock(&file->lock); 2076 return -ENOMEM; 2077 } 2078 } 2079 } 2080 2081 if (cache_buffers_filled == 0) { 2082 pthread_spin_unlock(&file->lock); 2083 return 0; 2084 } 2085 2086 args = calloc(1, sizeof(*args)); 2087 if (args == NULL) { 2088 pthread_spin_unlock(&file->lock); 2089 return -ENOMEM; 2090 } 2091 2092 args->file = file; 2093 file->fs->send_request(__file_flush, args); 2094 pthread_spin_unlock(&file->lock); 2095 return 0; 2096 } 2097 2098 static void 2099 __readahead_done(void *arg, int bserrno) 2100 { 2101 struct spdk_fs_cb_args *args = arg; 2102 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2103 struct spdk_file *file = args->file; 2104 2105 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2106 2107 pthread_spin_lock(&file->lock); 2108 cache_buffer->bytes_filled = args->op.readahead.length; 2109 cache_buffer->bytes_flushed = args->op.readahead.length; 2110 cache_buffer->in_progress = false; 2111 pthread_spin_unlock(&file->lock); 2112 2113 __free_args(args); 2114 } 2115 2116 static void 2117 __readahead(void *_args) 2118 { 2119 struct spdk_fs_cb_args *args = _args; 2120 struct spdk_file *file = args->file; 2121 uint64_t offset, length, start_page, num_pages; 2122 uint32_t page_size; 2123 2124 offset = args->op.readahead.offset; 2125 length = args->op.readahead.length; 2126 assert(length > 0); 2127 2128 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2129 2130 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2131 offset, length, start_page, num_pages); 2132 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2133 args->op.readahead.cache_buffer->buf, 2134 start_page, num_pages, 2135 __readahead_done, args); 2136 } 2137 2138 static uint64_t 2139 __next_cache_buffer_offset(uint64_t offset) 2140 { 2141 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2142 } 2143 2144 static void 2145 check_readahead(struct spdk_file *file, uint64_t offset) 2146 { 2147 struct spdk_fs_cb_args *args; 2148 2149 offset = __next_cache_buffer_offset(offset); 2150 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2151 return; 2152 } 2153 2154 args = calloc(1, sizeof(*args)); 2155 if (args == NULL) { 2156 return; 2157 } 2158 2159 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2160 2161 args->file = file; 2162 args->op.readahead.offset = offset; 2163 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2164 args->op.readahead.cache_buffer->in_progress = true; 2165 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2166 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2167 } else { 2168 args->op.readahead.length = CACHE_BUFFER_SIZE; 2169 } 2170 file->fs->send_request(__readahead, args); 2171 } 2172 2173 static int 2174 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2175 { 2176 struct cache_buffer *buf; 2177 int rc; 2178 2179 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2180 if (buf == NULL) { 2181 pthread_spin_unlock(&file->lock); 2182 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2183 pthread_spin_lock(&file->lock); 2184 return rc; 2185 } 2186 2187 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2188 length = buf->offset + buf->bytes_filled - offset; 2189 } 2190 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2191 memcpy(payload, &buf->buf[offset - buf->offset], length); 2192 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2193 pthread_spin_lock(&g_caches_lock); 2194 spdk_tree_remove_buffer(file->tree, buf); 2195 if (file->tree->present_mask == 0) { 2196 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2197 } 2198 pthread_spin_unlock(&g_caches_lock); 2199 } 2200 2201 sem_post(sem); 2202 return 0; 2203 } 2204 2205 int64_t 2206 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2207 void *payload, uint64_t offset, uint64_t length) 2208 { 2209 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2210 uint64_t final_offset, final_length; 2211 uint32_t sub_reads = 0; 2212 int rc = 0; 2213 2214 pthread_spin_lock(&file->lock); 2215 2216 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2217 2218 file->open_for_writing = false; 2219 2220 if (length == 0 || offset >= file->append_pos) { 2221 pthread_spin_unlock(&file->lock); 2222 return 0; 2223 } 2224 2225 if (offset + length > file->append_pos) { 2226 length = file->append_pos - offset; 2227 } 2228 2229 if (offset != file->next_seq_offset) { 2230 file->seq_byte_count = 0; 2231 } 2232 file->seq_byte_count += length; 2233 file->next_seq_offset = offset + length; 2234 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2235 check_readahead(file, offset); 2236 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2237 } 2238 2239 final_length = 0; 2240 final_offset = offset + length; 2241 while (offset < final_offset) { 2242 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2243 if (length > (final_offset - offset)) { 2244 length = final_offset - offset; 2245 } 2246 rc = __file_read(file, payload, offset, length, &channel->sem); 2247 if (rc == 0) { 2248 final_length += length; 2249 } else { 2250 break; 2251 } 2252 payload += length; 2253 offset += length; 2254 sub_reads++; 2255 } 2256 pthread_spin_unlock(&file->lock); 2257 while (sub_reads-- > 0) { 2258 sem_wait(&channel->sem); 2259 } 2260 if (rc == 0) { 2261 return final_length; 2262 } else { 2263 return rc; 2264 } 2265 } 2266 2267 static void 2268 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2269 spdk_file_op_complete cb_fn, void *cb_arg) 2270 { 2271 struct spdk_fs_request *sync_req; 2272 struct spdk_fs_request *flush_req; 2273 struct spdk_fs_cb_args *sync_args; 2274 struct spdk_fs_cb_args *flush_args; 2275 2276 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2277 2278 pthread_spin_lock(&file->lock); 2279 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2280 BLOBFS_TRACE(file, "done - no data to flush\n"); 2281 pthread_spin_unlock(&file->lock); 2282 cb_fn(cb_arg, 0); 2283 return; 2284 } 2285 2286 sync_req = alloc_fs_request(channel); 2287 assert(sync_req != NULL); 2288 sync_args = &sync_req->args; 2289 2290 flush_req = alloc_fs_request(channel); 2291 assert(flush_req != NULL); 2292 flush_args = &flush_req->args; 2293 2294 sync_args->file = file; 2295 sync_args->fn.file_op = cb_fn; 2296 sync_args->arg = cb_arg; 2297 sync_args->op.sync.offset = file->append_pos; 2298 sync_args->op.sync.xattr_in_progress = false; 2299 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2300 pthread_spin_unlock(&file->lock); 2301 2302 flush_args->file = file; 2303 channel->send_request(__file_flush, flush_args); 2304 } 2305 2306 int 2307 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2308 { 2309 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2310 2311 _file_sync(file, channel, __sem_post, &channel->sem); 2312 sem_wait(&channel->sem); 2313 2314 return 0; 2315 } 2316 2317 void 2318 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2319 spdk_file_op_complete cb_fn, void *cb_arg) 2320 { 2321 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2322 2323 _file_sync(file, channel, cb_fn, cb_arg); 2324 } 2325 2326 void 2327 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2328 { 2329 BLOBFS_TRACE(file, "priority=%u\n", priority); 2330 file->priority = priority; 2331 2332 } 2333 2334 /* 2335 * Close routines 2336 */ 2337 2338 static void 2339 __file_close_async_done(void *ctx, int bserrno) 2340 { 2341 struct spdk_fs_request *req = ctx; 2342 struct spdk_fs_cb_args *args = &req->args; 2343 struct spdk_file *file = args->file; 2344 2345 if (file->is_deleted) { 2346 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2347 return; 2348 } 2349 args->fn.file_op(args->arg, bserrno); 2350 free_fs_request(req); 2351 } 2352 2353 static void 2354 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2355 { 2356 pthread_spin_lock(&file->lock); 2357 if (file->ref_count == 0) { 2358 pthread_spin_unlock(&file->lock); 2359 __file_close_async_done(req, -EBADF); 2360 return; 2361 } 2362 2363 file->ref_count--; 2364 if (file->ref_count > 0) { 2365 pthread_spin_unlock(&file->lock); 2366 __file_close_async_done(req, 0); 2367 return; 2368 } 2369 2370 pthread_spin_unlock(&file->lock); 2371 2372 spdk_blob_close(&file->blob, __file_close_async_done, req); 2373 } 2374 2375 static void 2376 __file_close_async__sync_done(void *arg, int fserrno) 2377 { 2378 struct spdk_fs_request *req = arg; 2379 struct spdk_fs_cb_args *args = &req->args; 2380 2381 __file_close_async(args->file, req); 2382 } 2383 2384 void 2385 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2386 { 2387 struct spdk_fs_request *req; 2388 struct spdk_fs_cb_args *args; 2389 2390 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2391 if (req == NULL) { 2392 cb_fn(cb_arg, -ENOMEM); 2393 return; 2394 } 2395 2396 args = &req->args; 2397 args->file = file; 2398 args->fn.file_op = cb_fn; 2399 args->arg = cb_arg; 2400 2401 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2402 } 2403 2404 static void 2405 __file_close_done(void *arg, int fserrno) 2406 { 2407 struct spdk_fs_cb_args *args = arg; 2408 2409 args->rc = fserrno; 2410 sem_post(args->sem); 2411 } 2412 2413 static void 2414 __file_close(void *arg) 2415 { 2416 struct spdk_fs_request *req = arg; 2417 struct spdk_fs_cb_args *args = &req->args; 2418 struct spdk_file *file = args->file; 2419 2420 __file_close_async(file, req); 2421 } 2422 2423 int 2424 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2425 { 2426 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2427 struct spdk_fs_request *req; 2428 struct spdk_fs_cb_args *args; 2429 2430 req = alloc_fs_request(channel); 2431 assert(req != NULL); 2432 2433 args = &req->args; 2434 2435 spdk_file_sync(file, _channel); 2436 BLOBFS_TRACE(file, "name=%s\n", file->name); 2437 args->file = file; 2438 args->sem = &channel->sem; 2439 args->fn.file_op = __file_close_done; 2440 args->arg = req; 2441 channel->send_request(__file_close, req); 2442 sem_wait(&channel->sem); 2443 2444 return args->rc; 2445 } 2446 2447 static void 2448 cache_free_buffers(struct spdk_file *file) 2449 { 2450 BLOBFS_TRACE(file, "free=%s\n", file->name); 2451 pthread_spin_lock(&file->lock); 2452 pthread_spin_lock(&g_caches_lock); 2453 if (file->tree->present_mask == 0) { 2454 pthread_spin_unlock(&g_caches_lock); 2455 pthread_spin_unlock(&file->lock); 2456 return; 2457 } 2458 spdk_tree_free_buffers(file->tree); 2459 2460 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2461 /* If not freed, put it in the end of the queue */ 2462 if (file->tree->present_mask != 0) { 2463 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2464 } 2465 file->last = NULL; 2466 pthread_spin_unlock(&g_caches_lock); 2467 pthread_spin_unlock(&file->lock); 2468 } 2469 2470 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2471 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2472