1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/io_channel.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 static void 64 __sem_post(void *arg, int bserrno) 65 { 66 sem_t *sem = arg; 67 68 sem_post(sem); 69 } 70 71 void 72 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 73 { 74 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 75 free(cache_buffer); 76 } 77 78 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 79 80 struct spdk_file { 81 struct spdk_filesystem *fs; 82 struct spdk_blob *blob; 83 char *name; 84 uint64_t length; 85 bool is_deleted; 86 bool open_for_writing; 87 uint64_t length_flushed; 88 uint64_t append_pos; 89 uint64_t seq_byte_count; 90 uint64_t next_seq_offset; 91 uint32_t priority; 92 TAILQ_ENTRY(spdk_file) tailq; 93 spdk_blob_id blobid; 94 uint32_t ref_count; 95 pthread_spinlock_t lock; 96 struct cache_buffer *last; 97 struct cache_tree *tree; 98 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 99 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 100 TAILQ_ENTRY(spdk_file) cache_tailq; 101 }; 102 103 struct spdk_deleted_file { 104 spdk_blob_id id; 105 TAILQ_ENTRY(spdk_deleted_file) tailq; 106 }; 107 108 struct spdk_filesystem { 109 struct spdk_blob_store *bs; 110 TAILQ_HEAD(, spdk_file) files; 111 struct spdk_bs_opts bs_opts; 112 struct spdk_bs_dev *bdev; 113 fs_send_request_fn send_request; 114 115 struct { 116 uint32_t max_ops; 117 struct spdk_io_channel *sync_io_channel; 118 struct spdk_fs_channel *sync_fs_channel; 119 } sync_target; 120 121 struct { 122 uint32_t max_ops; 123 struct spdk_io_channel *md_io_channel; 124 struct spdk_fs_channel *md_fs_channel; 125 } md_target; 126 127 struct { 128 uint32_t max_ops; 129 } io_target; 130 }; 131 132 struct spdk_fs_cb_args { 133 union { 134 spdk_fs_op_with_handle_complete fs_op_with_handle; 135 spdk_fs_op_complete fs_op; 136 spdk_file_op_with_handle_complete file_op_with_handle; 137 spdk_file_op_complete file_op; 138 spdk_file_stat_op_complete stat_op; 139 } fn; 140 void *arg; 141 sem_t *sem; 142 struct spdk_filesystem *fs; 143 struct spdk_file *file; 144 int rc; 145 bool from_request; 146 union { 147 struct { 148 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 149 } fs_load; 150 struct { 151 uint64_t length; 152 } truncate; 153 struct { 154 struct spdk_io_channel *channel; 155 void *user_buf; 156 void *pin_buf; 157 int is_read; 158 off_t offset; 159 size_t length; 160 uint64_t start_page; 161 uint64_t num_pages; 162 uint32_t blocklen; 163 } rw; 164 struct { 165 const char *old_name; 166 const char *new_name; 167 } rename; 168 struct { 169 struct cache_buffer *cache_buffer; 170 uint64_t length; 171 } flush; 172 struct { 173 struct cache_buffer *cache_buffer; 174 uint64_t length; 175 uint64_t offset; 176 } readahead; 177 struct { 178 uint64_t offset; 179 TAILQ_ENTRY(spdk_fs_request) tailq; 180 bool xattr_in_progress; 181 } sync; 182 struct { 183 uint32_t num_clusters; 184 } resize; 185 struct { 186 const char *name; 187 uint32_t flags; 188 TAILQ_ENTRY(spdk_fs_request) tailq; 189 } open; 190 struct { 191 const char *name; 192 } create; 193 struct { 194 const char *name; 195 } delete; 196 struct { 197 const char *name; 198 } stat; 199 } op; 200 }; 201 202 static void cache_free_buffers(struct spdk_file *file); 203 204 void 205 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 206 { 207 opts->cluster_sz = SPDK_BLOBFS_OPTS_CLUSTER_SZ; 208 } 209 210 static void 211 __initialize_cache(void) 212 { 213 assert(g_cache_pool == NULL); 214 215 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 216 g_fs_cache_size / CACHE_BUFFER_SIZE, 217 CACHE_BUFFER_SIZE, 218 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 219 SPDK_ENV_SOCKET_ID_ANY); 220 TAILQ_INIT(&g_caches); 221 pthread_spin_init(&g_caches_lock, 0); 222 } 223 224 static void 225 __free_cache(void) 226 { 227 assert(g_cache_pool != NULL); 228 229 spdk_mempool_free(g_cache_pool); 230 g_cache_pool = NULL; 231 } 232 233 static uint64_t 234 __file_get_blob_size(struct spdk_file *file) 235 { 236 uint64_t cluster_sz; 237 238 cluster_sz = file->fs->bs_opts.cluster_sz; 239 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 240 } 241 242 struct spdk_fs_request { 243 struct spdk_fs_cb_args args; 244 TAILQ_ENTRY(spdk_fs_request) link; 245 struct spdk_fs_channel *channel; 246 }; 247 248 struct spdk_fs_channel { 249 struct spdk_fs_request *req_mem; 250 TAILQ_HEAD(, spdk_fs_request) reqs; 251 sem_t sem; 252 struct spdk_filesystem *fs; 253 struct spdk_io_channel *bs_channel; 254 fs_send_request_fn send_request; 255 bool sync; 256 pthread_spinlock_t lock; 257 }; 258 259 static struct spdk_fs_request * 260 alloc_fs_request(struct spdk_fs_channel *channel) 261 { 262 struct spdk_fs_request *req; 263 264 if (channel->sync) { 265 pthread_spin_lock(&channel->lock); 266 } 267 268 req = TAILQ_FIRST(&channel->reqs); 269 if (req) { 270 TAILQ_REMOVE(&channel->reqs, req, link); 271 } 272 273 if (channel->sync) { 274 pthread_spin_unlock(&channel->lock); 275 } 276 277 if (req == NULL) { 278 return NULL; 279 } 280 memset(req, 0, sizeof(*req)); 281 req->channel = channel; 282 req->args.from_request = true; 283 284 return req; 285 } 286 287 static void 288 free_fs_request(struct spdk_fs_request *req) 289 { 290 struct spdk_fs_channel *channel = req->channel; 291 292 if (channel->sync) { 293 pthread_spin_lock(&channel->lock); 294 } 295 296 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 297 298 if (channel->sync) { 299 pthread_spin_unlock(&channel->lock); 300 } 301 } 302 303 static int 304 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 305 uint32_t max_ops) 306 { 307 uint32_t i; 308 309 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 310 if (!channel->req_mem) { 311 return -1; 312 } 313 314 TAILQ_INIT(&channel->reqs); 315 sem_init(&channel->sem, 0, 0); 316 317 for (i = 0; i < max_ops; i++) { 318 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 319 } 320 321 channel->fs = fs; 322 323 return 0; 324 } 325 326 static int 327 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 328 { 329 struct spdk_filesystem *fs; 330 struct spdk_fs_channel *channel = ctx_buf; 331 332 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 333 334 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 335 } 336 337 static int 338 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 339 { 340 struct spdk_filesystem *fs; 341 struct spdk_fs_channel *channel = ctx_buf; 342 343 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 344 345 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 346 } 347 348 static int 349 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 350 { 351 struct spdk_filesystem *fs; 352 struct spdk_fs_channel *channel = ctx_buf; 353 354 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 355 356 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 357 } 358 359 static void 360 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 361 { 362 struct spdk_fs_channel *channel = ctx_buf; 363 364 free(channel->req_mem); 365 if (channel->bs_channel != NULL) { 366 spdk_bs_free_io_channel(channel->bs_channel); 367 } 368 } 369 370 static void 371 __send_request_direct(fs_request_fn fn, void *arg) 372 { 373 fn(arg); 374 } 375 376 static void 377 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 378 { 379 fs->bs = bs; 380 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 381 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 382 fs->md_target.md_fs_channel->send_request = __send_request_direct; 383 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 384 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 385 386 pthread_mutex_lock(&g_cache_init_lock); 387 if (g_fs_count == 0) { 388 __initialize_cache(); 389 } 390 g_fs_count++; 391 pthread_mutex_unlock(&g_cache_init_lock); 392 } 393 394 static void 395 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 396 { 397 struct spdk_fs_request *req = ctx; 398 struct spdk_fs_cb_args *args = &req->args; 399 struct spdk_filesystem *fs = args->fs; 400 401 if (bserrno == 0) { 402 common_fs_bs_init(fs, bs); 403 } else { 404 free(fs); 405 fs = NULL; 406 } 407 408 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 409 free_fs_request(req); 410 } 411 412 static void 413 fs_conf_parse(void) 414 { 415 struct spdk_conf_section *sp; 416 417 sp = spdk_conf_find_section(NULL, "Blobfs"); 418 if (sp == NULL) { 419 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 420 return; 421 } 422 423 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 424 if (g_fs_cache_buffer_shift <= 0) { 425 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 426 } 427 } 428 429 static struct spdk_filesystem * 430 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 431 { 432 struct spdk_filesystem *fs; 433 434 fs = calloc(1, sizeof(*fs)); 435 if (fs == NULL) { 436 return NULL; 437 } 438 439 fs->bdev = dev; 440 fs->send_request = send_request_fn; 441 TAILQ_INIT(&fs->files); 442 443 fs->md_target.max_ops = 512; 444 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 445 sizeof(struct spdk_fs_channel)); 446 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 447 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 448 449 fs->sync_target.max_ops = 512; 450 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 451 sizeof(struct spdk_fs_channel)); 452 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 453 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 454 455 fs->io_target.max_ops = 512; 456 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 457 sizeof(struct spdk_fs_channel)); 458 459 return fs; 460 } 461 462 void 463 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 464 fs_send_request_fn send_request_fn, 465 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 466 { 467 struct spdk_filesystem *fs; 468 struct spdk_fs_request *req; 469 struct spdk_fs_cb_args *args; 470 struct spdk_bs_opts opts = {}; 471 472 fs = fs_alloc(dev, send_request_fn); 473 if (fs == NULL) { 474 cb_fn(cb_arg, NULL, -ENOMEM); 475 return; 476 } 477 478 fs_conf_parse(); 479 480 req = alloc_fs_request(fs->md_target.md_fs_channel); 481 if (req == NULL) { 482 spdk_put_io_channel(fs->md_target.md_io_channel); 483 spdk_io_device_unregister(&fs->md_target, NULL); 484 spdk_put_io_channel(fs->sync_target.sync_io_channel); 485 spdk_io_device_unregister(&fs->sync_target, NULL); 486 spdk_io_device_unregister(&fs->io_target, NULL); 487 free(fs); 488 cb_fn(cb_arg, NULL, -ENOMEM); 489 return; 490 } 491 492 args = &req->args; 493 args->fn.fs_op_with_handle = cb_fn; 494 args->arg = cb_arg; 495 args->fs = fs; 496 497 spdk_bs_opts_init(&opts); 498 strncpy(opts.bstype.bstype, "BLOBFS", SPDK_BLOBSTORE_TYPE_LENGTH); 499 if (opt) { 500 opts.cluster_sz = opt->cluster_sz; 501 } 502 spdk_bs_init(dev, &opts, init_cb, req); 503 } 504 505 static struct spdk_file * 506 file_alloc(struct spdk_filesystem *fs) 507 { 508 struct spdk_file *file; 509 510 file = calloc(1, sizeof(*file)); 511 if (file == NULL) { 512 return NULL; 513 } 514 515 file->tree = calloc(1, sizeof(*file->tree)); 516 if (file->tree == NULL) { 517 free(file); 518 return NULL; 519 } 520 521 file->fs = fs; 522 TAILQ_INIT(&file->open_requests); 523 TAILQ_INIT(&file->sync_requests); 524 pthread_spin_init(&file->lock, 0); 525 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 526 file->priority = SPDK_FILE_PRIORITY_LOW; 527 return file; 528 } 529 530 static void iter_delete_cb(void *ctx, int bserrno); 531 532 static int 533 _handle_deleted_files(struct spdk_fs_request *req) 534 { 535 struct spdk_fs_cb_args *args = &req->args; 536 struct spdk_filesystem *fs = args->fs; 537 538 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 539 struct spdk_deleted_file *deleted_file; 540 541 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 542 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 543 spdk_bs_delete_blob(fs->bs, deleted_file->id, iter_delete_cb, req); 544 free(deleted_file); 545 return 0; 546 } 547 548 return 1; 549 } 550 551 static void 552 iter_delete_cb(void *ctx, int bserrno) 553 { 554 struct spdk_fs_request *req = ctx; 555 struct spdk_fs_cb_args *args = &req->args; 556 struct spdk_filesystem *fs = args->fs; 557 558 if (_handle_deleted_files(req) == 0) { 559 return; 560 } 561 562 args->fn.fs_op_with_handle(args->arg, fs, 0); 563 free_fs_request(req); 564 565 } 566 567 static void 568 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 569 { 570 struct spdk_fs_request *req = ctx; 571 struct spdk_fs_cb_args *args = &req->args; 572 struct spdk_filesystem *fs = args->fs; 573 uint64_t *length; 574 const char *name; 575 uint32_t *is_deleted; 576 size_t value_len; 577 578 if (rc == -ENOENT) { 579 /* Finished iterating */ 580 if (_handle_deleted_files(req) == 0) { 581 return; 582 } 583 args->fn.fs_op_with_handle(args->arg, fs, 0); 584 free_fs_request(req); 585 return; 586 } else if (rc < 0) { 587 args->fn.fs_op_with_handle(args->arg, fs, rc); 588 free_fs_request(req); 589 return; 590 } 591 592 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 593 if (rc < 0) { 594 args->fn.fs_op_with_handle(args->arg, fs, rc); 595 free_fs_request(req); 596 return; 597 } 598 599 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 600 if (rc < 0) { 601 args->fn.fs_op_with_handle(args->arg, fs, rc); 602 free_fs_request(req); 603 return; 604 } 605 606 assert(value_len == 8); 607 608 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 609 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 610 if (rc < 0) { 611 struct spdk_file *f; 612 613 f = file_alloc(fs); 614 if (f == NULL) { 615 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 616 free_fs_request(req); 617 return; 618 } 619 620 f->name = strdup(name); 621 f->blobid = spdk_blob_get_id(blob); 622 f->length = *length; 623 f->length_flushed = *length; 624 f->append_pos = *length; 625 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 626 } else { 627 struct spdk_deleted_file *deleted_file; 628 629 deleted_file = calloc(1, sizeof(*deleted_file)); 630 if (deleted_file == NULL) { 631 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 632 free_fs_request(req); 633 return; 634 } 635 deleted_file->id = spdk_blob_get_id(blob); 636 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 637 } 638 639 spdk_bs_iter_next(fs->bs, blob, iter_cb, req); 640 } 641 642 static void 643 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 644 { 645 struct spdk_fs_request *req = ctx; 646 struct spdk_fs_cb_args *args = &req->args; 647 struct spdk_filesystem *fs = args->fs; 648 struct spdk_bs_type bstype; 649 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 650 static const struct spdk_bs_type zeros; 651 652 if (bserrno != 0) { 653 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 654 free_fs_request(req); 655 free(fs); 656 return; 657 } 658 659 bstype = spdk_bs_get_bstype(bs); 660 661 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 662 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 663 spdk_bs_set_bstype(bs, blobfs_type); 664 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 665 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 666 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 667 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 668 free_fs_request(req); 669 free(fs); 670 return; 671 } 672 673 common_fs_bs_init(fs, bs); 674 spdk_bs_iter_first(fs->bs, iter_cb, req); 675 } 676 677 void 678 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 679 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 680 { 681 struct spdk_filesystem *fs; 682 struct spdk_fs_cb_args *args; 683 struct spdk_fs_request *req; 684 struct spdk_bs_opts opts = {}; 685 686 fs = fs_alloc(dev, send_request_fn); 687 if (fs == NULL) { 688 cb_fn(cb_arg, NULL, -ENOMEM); 689 return; 690 } 691 692 fs_conf_parse(); 693 694 req = alloc_fs_request(fs->md_target.md_fs_channel); 695 if (req == NULL) { 696 spdk_put_io_channel(fs->md_target.md_io_channel); 697 spdk_io_device_unregister(&fs->md_target, NULL); 698 spdk_put_io_channel(fs->sync_target.sync_io_channel); 699 spdk_io_device_unregister(&fs->sync_target, NULL); 700 spdk_io_device_unregister(&fs->io_target, NULL); 701 free(fs); 702 cb_fn(cb_arg, NULL, -ENOMEM); 703 return; 704 } 705 706 args = &req->args; 707 args->fn.fs_op_with_handle = cb_fn; 708 args->arg = cb_arg; 709 args->fs = fs; 710 TAILQ_INIT(&args->op.fs_load.deleted_files); 711 712 spdk_bs_opts_init(&opts); 713 714 spdk_bs_load(dev, &opts, load_cb, req); 715 } 716 717 static void 718 unload_cb(void *ctx, int bserrno) 719 { 720 struct spdk_fs_request *req = ctx; 721 struct spdk_fs_cb_args *args = &req->args; 722 struct spdk_filesystem *fs = args->fs; 723 724 pthread_mutex_lock(&g_cache_init_lock); 725 g_fs_count--; 726 if (g_fs_count == 0) { 727 __free_cache(); 728 } 729 pthread_mutex_unlock(&g_cache_init_lock); 730 731 args->fn.fs_op(args->arg, bserrno); 732 free(req); 733 734 spdk_io_device_unregister(&fs->io_target, NULL); 735 spdk_io_device_unregister(&fs->sync_target, NULL); 736 spdk_io_device_unregister(&fs->md_target, NULL); 737 738 free(fs); 739 } 740 741 void 742 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 743 { 744 struct spdk_fs_request *req; 745 struct spdk_fs_cb_args *args; 746 747 /* 748 * We must free the md_channel before unloading the blobstore, so just 749 * allocate this request from the general heap. 750 */ 751 req = calloc(1, sizeof(*req)); 752 if (req == NULL) { 753 cb_fn(cb_arg, -ENOMEM); 754 return; 755 } 756 757 args = &req->args; 758 args->fn.fs_op = cb_fn; 759 args->arg = cb_arg; 760 args->fs = fs; 761 762 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 763 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 764 spdk_bs_unload(fs->bs, unload_cb, req); 765 } 766 767 static struct spdk_file * 768 fs_find_file(struct spdk_filesystem *fs, const char *name) 769 { 770 struct spdk_file *file; 771 772 TAILQ_FOREACH(file, &fs->files, tailq) { 773 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 774 return file; 775 } 776 } 777 778 return NULL; 779 } 780 781 void 782 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 783 spdk_file_stat_op_complete cb_fn, void *cb_arg) 784 { 785 struct spdk_file_stat stat; 786 struct spdk_file *f = NULL; 787 788 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 789 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 790 return; 791 } 792 793 f = fs_find_file(fs, name); 794 if (f != NULL) { 795 stat.blobid = f->blobid; 796 stat.size = f->length; 797 cb_fn(cb_arg, &stat, 0); 798 return; 799 } 800 801 cb_fn(cb_arg, NULL, -ENOENT); 802 } 803 804 static void 805 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 806 { 807 struct spdk_fs_request *req = arg; 808 struct spdk_fs_cb_args *args = &req->args; 809 810 args->rc = fserrno; 811 if (fserrno == 0) { 812 memcpy(args->arg, stat, sizeof(*stat)); 813 } 814 sem_post(args->sem); 815 } 816 817 static void 818 __file_stat(void *arg) 819 { 820 struct spdk_fs_request *req = arg; 821 struct spdk_fs_cb_args *args = &req->args; 822 823 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 824 args->fn.stat_op, req); 825 } 826 827 int 828 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 829 const char *name, struct spdk_file_stat *stat) 830 { 831 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 832 struct spdk_fs_request *req; 833 int rc; 834 835 req = alloc_fs_request(channel); 836 assert(req != NULL); 837 838 req->args.fs = fs; 839 req->args.op.stat.name = name; 840 req->args.fn.stat_op = __copy_stat; 841 req->args.arg = stat; 842 req->args.sem = &channel->sem; 843 channel->send_request(__file_stat, req); 844 sem_wait(&channel->sem); 845 846 rc = req->args.rc; 847 free_fs_request(req); 848 849 return rc; 850 } 851 852 static void 853 fs_create_blob_close_cb(void *ctx, int bserrno) 854 { 855 struct spdk_fs_request *req = ctx; 856 struct spdk_fs_cb_args *args = &req->args; 857 858 args->fn.file_op(args->arg, bserrno); 859 free_fs_request(req); 860 } 861 862 static void 863 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 864 { 865 struct spdk_fs_request *req = ctx; 866 struct spdk_fs_cb_args *args = &req->args; 867 struct spdk_file *f = args->file; 868 uint64_t length = 0; 869 870 spdk_blob_resize(blob, 1); 871 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 872 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 873 874 spdk_blob_close(blob, fs_create_blob_close_cb, args); 875 } 876 877 static void 878 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 879 { 880 struct spdk_fs_request *req = ctx; 881 struct spdk_fs_cb_args *args = &req->args; 882 struct spdk_file *f = args->file; 883 884 f->blobid = blobid; 885 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 886 } 887 888 void 889 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 890 spdk_file_op_complete cb_fn, void *cb_arg) 891 { 892 struct spdk_file *file; 893 struct spdk_fs_request *req; 894 struct spdk_fs_cb_args *args; 895 896 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 897 cb_fn(cb_arg, -ENAMETOOLONG); 898 return; 899 } 900 901 file = fs_find_file(fs, name); 902 if (file != NULL) { 903 cb_fn(cb_arg, -EEXIST); 904 return; 905 } 906 907 file = file_alloc(fs); 908 if (file == NULL) { 909 cb_fn(cb_arg, -ENOMEM); 910 return; 911 } 912 913 req = alloc_fs_request(fs->md_target.md_fs_channel); 914 if (req == NULL) { 915 cb_fn(cb_arg, -ENOMEM); 916 return; 917 } 918 919 args = &req->args; 920 args->file = file; 921 args->fn.file_op = cb_fn; 922 args->arg = cb_arg; 923 924 file->name = strdup(name); 925 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 926 } 927 928 static void 929 __fs_create_file_done(void *arg, int fserrno) 930 { 931 struct spdk_fs_request *req = arg; 932 struct spdk_fs_cb_args *args = &req->args; 933 934 args->rc = fserrno; 935 sem_post(args->sem); 936 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 937 } 938 939 static void 940 __fs_create_file(void *arg) 941 { 942 struct spdk_fs_request *req = arg; 943 struct spdk_fs_cb_args *args = &req->args; 944 945 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 946 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 947 } 948 949 int 950 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 951 { 952 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 953 struct spdk_fs_request *req; 954 struct spdk_fs_cb_args *args; 955 int rc; 956 957 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 958 959 req = alloc_fs_request(channel); 960 assert(req != NULL); 961 962 args = &req->args; 963 args->fs = fs; 964 args->op.create.name = name; 965 args->sem = &channel->sem; 966 fs->send_request(__fs_create_file, req); 967 sem_wait(&channel->sem); 968 rc = args->rc; 969 free_fs_request(req); 970 971 return rc; 972 } 973 974 static void 975 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 976 { 977 struct spdk_fs_request *req = ctx; 978 struct spdk_fs_cb_args *args = &req->args; 979 struct spdk_file *f = args->file; 980 981 f->blob = blob; 982 while (!TAILQ_EMPTY(&f->open_requests)) { 983 req = TAILQ_FIRST(&f->open_requests); 984 args = &req->args; 985 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 986 args->fn.file_op_with_handle(args->arg, f, bserrno); 987 free_fs_request(req); 988 } 989 } 990 991 static void 992 fs_open_blob_create_cb(void *ctx, int bserrno) 993 { 994 struct spdk_fs_request *req = ctx; 995 struct spdk_fs_cb_args *args = &req->args; 996 struct spdk_file *file = args->file; 997 struct spdk_filesystem *fs = args->fs; 998 999 if (file == NULL) { 1000 /* 1001 * This is from an open with CREATE flag - the file 1002 * is now created so look it up in the file list for this 1003 * filesystem. 1004 */ 1005 file = fs_find_file(fs, args->op.open.name); 1006 assert(file != NULL); 1007 args->file = file; 1008 } 1009 1010 file->ref_count++; 1011 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1012 if (file->ref_count == 1) { 1013 assert(file->blob == NULL); 1014 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1015 } else if (file->blob != NULL) { 1016 fs_open_blob_done(req, file->blob, 0); 1017 } else { 1018 /* 1019 * The blob open for this file is in progress due to a previous 1020 * open request. When that open completes, it will invoke the 1021 * open callback for this request. 1022 */ 1023 } 1024 } 1025 1026 void 1027 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1028 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1029 { 1030 struct spdk_file *f = NULL; 1031 struct spdk_fs_request *req; 1032 struct spdk_fs_cb_args *args; 1033 1034 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1035 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1036 return; 1037 } 1038 1039 f = fs_find_file(fs, name); 1040 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1041 cb_fn(cb_arg, NULL, -ENOENT); 1042 return; 1043 } 1044 1045 if (f != NULL && f->is_deleted == true) { 1046 cb_fn(cb_arg, NULL, -ENOENT); 1047 return; 1048 } 1049 1050 req = alloc_fs_request(fs->md_target.md_fs_channel); 1051 if (req == NULL) { 1052 cb_fn(cb_arg, NULL, -ENOMEM); 1053 return; 1054 } 1055 1056 args = &req->args; 1057 args->fn.file_op_with_handle = cb_fn; 1058 args->arg = cb_arg; 1059 args->file = f; 1060 args->fs = fs; 1061 args->op.open.name = name; 1062 1063 if (f == NULL) { 1064 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1065 } else { 1066 fs_open_blob_create_cb(req, 0); 1067 } 1068 } 1069 1070 static void 1071 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1072 { 1073 struct spdk_fs_request *req = arg; 1074 struct spdk_fs_cb_args *args = &req->args; 1075 1076 args->file = file; 1077 args->rc = bserrno; 1078 sem_post(args->sem); 1079 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1080 } 1081 1082 static void 1083 __fs_open_file(void *arg) 1084 { 1085 struct spdk_fs_request *req = arg; 1086 struct spdk_fs_cb_args *args = &req->args; 1087 1088 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1089 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1090 __fs_open_file_done, req); 1091 } 1092 1093 int 1094 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1095 const char *name, uint32_t flags, struct spdk_file **file) 1096 { 1097 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1098 struct spdk_fs_request *req; 1099 struct spdk_fs_cb_args *args; 1100 int rc; 1101 1102 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1103 1104 req = alloc_fs_request(channel); 1105 assert(req != NULL); 1106 1107 args = &req->args; 1108 args->fs = fs; 1109 args->op.open.name = name; 1110 args->op.open.flags = flags; 1111 args->sem = &channel->sem; 1112 fs->send_request(__fs_open_file, req); 1113 sem_wait(&channel->sem); 1114 rc = args->rc; 1115 if (rc == 0) { 1116 *file = args->file; 1117 } else { 1118 *file = NULL; 1119 } 1120 free_fs_request(req); 1121 1122 return rc; 1123 } 1124 1125 static void 1126 fs_rename_blob_close_cb(void *ctx, int bserrno) 1127 { 1128 struct spdk_fs_request *req = ctx; 1129 struct spdk_fs_cb_args *args = &req->args; 1130 1131 args->fn.fs_op(args->arg, bserrno); 1132 free_fs_request(req); 1133 } 1134 1135 static void 1136 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1137 { 1138 struct spdk_fs_request *req = ctx; 1139 struct spdk_fs_cb_args *args = &req->args; 1140 const char *new_name = args->op.rename.new_name; 1141 1142 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1143 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1144 } 1145 1146 static void 1147 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1148 { 1149 struct spdk_fs_cb_args *args = &req->args; 1150 struct spdk_file *f; 1151 1152 f = fs_find_file(args->fs, args->op.rename.old_name); 1153 if (f == NULL) { 1154 args->fn.fs_op(args->arg, -ENOENT); 1155 free_fs_request(req); 1156 return; 1157 } 1158 1159 free(f->name); 1160 f->name = strdup(args->op.rename.new_name); 1161 args->file = f; 1162 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1163 } 1164 1165 static void 1166 fs_rename_delete_done(void *arg, int fserrno) 1167 { 1168 __spdk_fs_md_rename_file(arg); 1169 } 1170 1171 void 1172 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1173 const char *old_name, const char *new_name, 1174 spdk_file_op_complete cb_fn, void *cb_arg) 1175 { 1176 struct spdk_file *f; 1177 struct spdk_fs_request *req; 1178 struct spdk_fs_cb_args *args; 1179 1180 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1181 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1182 cb_fn(cb_arg, -ENAMETOOLONG); 1183 return; 1184 } 1185 1186 req = alloc_fs_request(fs->md_target.md_fs_channel); 1187 if (req == NULL) { 1188 cb_fn(cb_arg, -ENOMEM); 1189 return; 1190 } 1191 1192 args = &req->args; 1193 args->fn.fs_op = cb_fn; 1194 args->fs = fs; 1195 args->arg = cb_arg; 1196 args->op.rename.old_name = old_name; 1197 args->op.rename.new_name = new_name; 1198 1199 f = fs_find_file(fs, new_name); 1200 if (f == NULL) { 1201 __spdk_fs_md_rename_file(req); 1202 return; 1203 } 1204 1205 /* 1206 * The rename overwrites an existing file. So delete the existing file, then 1207 * do the actual rename. 1208 */ 1209 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1210 } 1211 1212 static void 1213 __fs_rename_file_done(void *arg, int fserrno) 1214 { 1215 struct spdk_fs_request *req = arg; 1216 struct spdk_fs_cb_args *args = &req->args; 1217 1218 args->rc = fserrno; 1219 sem_post(args->sem); 1220 } 1221 1222 static void 1223 __fs_rename_file(void *arg) 1224 { 1225 struct spdk_fs_request *req = arg; 1226 struct spdk_fs_cb_args *args = &req->args; 1227 1228 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1229 __fs_rename_file_done, req); 1230 } 1231 1232 int 1233 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1234 const char *old_name, const char *new_name) 1235 { 1236 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1237 struct spdk_fs_request *req; 1238 struct spdk_fs_cb_args *args; 1239 int rc; 1240 1241 req = alloc_fs_request(channel); 1242 assert(req != NULL); 1243 1244 args = &req->args; 1245 1246 args->fs = fs; 1247 args->op.rename.old_name = old_name; 1248 args->op.rename.new_name = new_name; 1249 args->sem = &channel->sem; 1250 fs->send_request(__fs_rename_file, req); 1251 sem_wait(&channel->sem); 1252 rc = args->rc; 1253 free_fs_request(req); 1254 return rc; 1255 } 1256 1257 static void 1258 blob_delete_cb(void *ctx, int bserrno) 1259 { 1260 struct spdk_fs_request *req = ctx; 1261 struct spdk_fs_cb_args *args = &req->args; 1262 1263 args->fn.file_op(args->arg, bserrno); 1264 free_fs_request(req); 1265 } 1266 1267 void 1268 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1269 spdk_file_op_complete cb_fn, void *cb_arg) 1270 { 1271 struct spdk_file *f; 1272 spdk_blob_id blobid; 1273 struct spdk_fs_request *req; 1274 struct spdk_fs_cb_args *args; 1275 1276 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1277 1278 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1279 cb_fn(cb_arg, -ENAMETOOLONG); 1280 return; 1281 } 1282 1283 f = fs_find_file(fs, name); 1284 if (f == NULL) { 1285 cb_fn(cb_arg, -ENOENT); 1286 return; 1287 } 1288 1289 req = alloc_fs_request(fs->md_target.md_fs_channel); 1290 if (req == NULL) { 1291 cb_fn(cb_arg, -ENOMEM); 1292 return; 1293 } 1294 1295 args = &req->args; 1296 args->fn.file_op = cb_fn; 1297 args->arg = cb_arg; 1298 1299 if (f->ref_count > 0) { 1300 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1301 f->is_deleted = true; 1302 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1303 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1304 return; 1305 } 1306 1307 TAILQ_REMOVE(&fs->files, f, tailq); 1308 1309 cache_free_buffers(f); 1310 1311 blobid = f->blobid; 1312 1313 free(f->name); 1314 free(f->tree); 1315 free(f); 1316 1317 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1318 } 1319 1320 static void 1321 __fs_delete_file_done(void *arg, int fserrno) 1322 { 1323 struct spdk_fs_request *req = arg; 1324 struct spdk_fs_cb_args *args = &req->args; 1325 1326 args->rc = fserrno; 1327 sem_post(args->sem); 1328 } 1329 1330 static void 1331 __fs_delete_file(void *arg) 1332 { 1333 struct spdk_fs_request *req = arg; 1334 struct spdk_fs_cb_args *args = &req->args; 1335 1336 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1337 } 1338 1339 int 1340 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1341 const char *name) 1342 { 1343 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1344 struct spdk_fs_request *req; 1345 struct spdk_fs_cb_args *args; 1346 int rc; 1347 1348 req = alloc_fs_request(channel); 1349 assert(req != NULL); 1350 1351 args = &req->args; 1352 args->fs = fs; 1353 args->op.delete.name = name; 1354 args->sem = &channel->sem; 1355 fs->send_request(__fs_delete_file, req); 1356 sem_wait(&channel->sem); 1357 rc = args->rc; 1358 free_fs_request(req); 1359 1360 return rc; 1361 } 1362 1363 spdk_fs_iter 1364 spdk_fs_iter_first(struct spdk_filesystem *fs) 1365 { 1366 struct spdk_file *f; 1367 1368 f = TAILQ_FIRST(&fs->files); 1369 return f; 1370 } 1371 1372 spdk_fs_iter 1373 spdk_fs_iter_next(spdk_fs_iter iter) 1374 { 1375 struct spdk_file *f = iter; 1376 1377 if (f == NULL) { 1378 return NULL; 1379 } 1380 1381 f = TAILQ_NEXT(f, tailq); 1382 return f; 1383 } 1384 1385 const char * 1386 spdk_file_get_name(struct spdk_file *file) 1387 { 1388 return file->name; 1389 } 1390 1391 uint64_t 1392 spdk_file_get_length(struct spdk_file *file) 1393 { 1394 assert(file != NULL); 1395 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1396 return file->length; 1397 } 1398 1399 static void 1400 fs_truncate_complete_cb(void *ctx, int bserrno) 1401 { 1402 struct spdk_fs_request *req = ctx; 1403 struct spdk_fs_cb_args *args = &req->args; 1404 1405 args->fn.file_op(args->arg, bserrno); 1406 free_fs_request(req); 1407 } 1408 1409 static uint64_t 1410 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1411 { 1412 return (length + cluster_sz - 1) / cluster_sz; 1413 } 1414 1415 void 1416 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1417 spdk_file_op_complete cb_fn, void *cb_arg) 1418 { 1419 struct spdk_filesystem *fs; 1420 size_t num_clusters; 1421 struct spdk_fs_request *req; 1422 struct spdk_fs_cb_args *args; 1423 1424 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1425 if (length == file->length) { 1426 cb_fn(cb_arg, 0); 1427 return; 1428 } 1429 1430 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1431 if (req == NULL) { 1432 cb_fn(cb_arg, -ENOMEM); 1433 return; 1434 } 1435 1436 args = &req->args; 1437 args->fn.file_op = cb_fn; 1438 args->arg = cb_arg; 1439 args->file = file; 1440 fs = file->fs; 1441 1442 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1443 1444 spdk_blob_resize(file->blob, num_clusters); 1445 spdk_blob_set_xattr(file->blob, "length", &length, sizeof(length)); 1446 1447 file->length = length; 1448 if (file->append_pos > file->length) { 1449 file->append_pos = file->length; 1450 } 1451 1452 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1453 } 1454 1455 static void 1456 __truncate(void *arg) 1457 { 1458 struct spdk_fs_request *req = arg; 1459 struct spdk_fs_cb_args *args = &req->args; 1460 1461 spdk_file_truncate_async(args->file, args->op.truncate.length, 1462 args->fn.file_op, args->arg); 1463 } 1464 1465 void 1466 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1467 uint64_t length) 1468 { 1469 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1470 struct spdk_fs_request *req; 1471 struct spdk_fs_cb_args *args; 1472 1473 req = alloc_fs_request(channel); 1474 assert(req != NULL); 1475 1476 args = &req->args; 1477 1478 args->file = file; 1479 args->op.truncate.length = length; 1480 args->fn.file_op = __sem_post; 1481 args->arg = &channel->sem; 1482 1483 channel->send_request(__truncate, req); 1484 sem_wait(&channel->sem); 1485 free_fs_request(req); 1486 } 1487 1488 static void 1489 __rw_done(void *ctx, int bserrno) 1490 { 1491 struct spdk_fs_request *req = ctx; 1492 struct spdk_fs_cb_args *args = &req->args; 1493 1494 spdk_dma_free(args->op.rw.pin_buf); 1495 args->fn.file_op(args->arg, bserrno); 1496 free_fs_request(req); 1497 } 1498 1499 static void 1500 __read_done(void *ctx, int bserrno) 1501 { 1502 struct spdk_fs_request *req = ctx; 1503 struct spdk_fs_cb_args *args = &req->args; 1504 1505 if (args->op.rw.is_read) { 1506 memcpy(args->op.rw.user_buf, 1507 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1508 args->op.rw.length); 1509 __rw_done(req, 0); 1510 } else { 1511 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1512 args->op.rw.user_buf, 1513 args->op.rw.length); 1514 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1515 args->op.rw.pin_buf, 1516 args->op.rw.start_page, args->op.rw.num_pages, 1517 __rw_done, req); 1518 } 1519 } 1520 1521 static void 1522 __do_blob_read(void *ctx, int fserrno) 1523 { 1524 struct spdk_fs_request *req = ctx; 1525 struct spdk_fs_cb_args *args = &req->args; 1526 1527 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1528 args->op.rw.pin_buf, 1529 args->op.rw.start_page, args->op.rw.num_pages, 1530 __read_done, req); 1531 } 1532 1533 static void 1534 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1535 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1536 { 1537 uint64_t end_page; 1538 1539 *page_size = spdk_bs_get_page_size(file->fs->bs); 1540 *start_page = offset / *page_size; 1541 end_page = (offset + length - 1) / *page_size; 1542 *num_pages = (end_page - *start_page + 1); 1543 } 1544 1545 static void 1546 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1547 void *payload, uint64_t offset, uint64_t length, 1548 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1549 { 1550 struct spdk_fs_request *req; 1551 struct spdk_fs_cb_args *args; 1552 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1553 uint64_t start_page, num_pages, pin_buf_length; 1554 uint32_t page_size; 1555 1556 if (is_read && offset + length > file->length) { 1557 cb_fn(cb_arg, -EINVAL); 1558 return; 1559 } 1560 1561 req = alloc_fs_request(channel); 1562 if (req == NULL) { 1563 cb_fn(cb_arg, -ENOMEM); 1564 return; 1565 } 1566 1567 args = &req->args; 1568 args->fn.file_op = cb_fn; 1569 args->arg = cb_arg; 1570 args->file = file; 1571 args->op.rw.channel = channel->bs_channel; 1572 args->op.rw.user_buf = payload; 1573 args->op.rw.is_read = is_read; 1574 args->op.rw.offset = offset; 1575 args->op.rw.length = length; 1576 1577 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1578 pin_buf_length = num_pages * page_size; 1579 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1580 1581 args->op.rw.start_page = start_page; 1582 args->op.rw.num_pages = num_pages; 1583 1584 if (!is_read && file->length < offset + length) { 1585 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1586 } else { 1587 __do_blob_read(req, 0); 1588 } 1589 } 1590 1591 void 1592 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1593 void *payload, uint64_t offset, uint64_t length, 1594 spdk_file_op_complete cb_fn, void *cb_arg) 1595 { 1596 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1597 } 1598 1599 void 1600 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1601 void *payload, uint64_t offset, uint64_t length, 1602 spdk_file_op_complete cb_fn, void *cb_arg) 1603 { 1604 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1605 file->name, offset, length); 1606 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1607 } 1608 1609 struct spdk_io_channel * 1610 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1611 { 1612 struct spdk_io_channel *io_channel; 1613 struct spdk_fs_channel *fs_channel; 1614 1615 io_channel = spdk_get_io_channel(&fs->io_target); 1616 fs_channel = spdk_io_channel_get_ctx(io_channel); 1617 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1618 fs_channel->send_request = __send_request_direct; 1619 1620 return io_channel; 1621 } 1622 1623 struct spdk_io_channel * 1624 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1625 { 1626 struct spdk_io_channel *io_channel; 1627 struct spdk_fs_channel *fs_channel; 1628 1629 io_channel = spdk_get_io_channel(&fs->io_target); 1630 fs_channel = spdk_io_channel_get_ctx(io_channel); 1631 fs_channel->send_request = fs->send_request; 1632 fs_channel->sync = 1; 1633 pthread_spin_init(&fs_channel->lock, 0); 1634 1635 return io_channel; 1636 } 1637 1638 void 1639 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1640 { 1641 spdk_put_io_channel(channel); 1642 } 1643 1644 void 1645 spdk_fs_set_cache_size(uint64_t size_in_mb) 1646 { 1647 g_fs_cache_size = size_in_mb * 1024 * 1024; 1648 } 1649 1650 uint64_t 1651 spdk_fs_get_cache_size(void) 1652 { 1653 return g_fs_cache_size / (1024 * 1024); 1654 } 1655 1656 static void __file_flush(void *_args); 1657 1658 static void * 1659 alloc_cache_memory_buffer(struct spdk_file *context) 1660 { 1661 struct spdk_file *file; 1662 void *buf; 1663 1664 buf = spdk_mempool_get(g_cache_pool); 1665 if (buf != NULL) { 1666 return buf; 1667 } 1668 1669 pthread_spin_lock(&g_caches_lock); 1670 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1671 if (!file->open_for_writing && 1672 file->priority == SPDK_FILE_PRIORITY_LOW && 1673 file != context) { 1674 break; 1675 } 1676 } 1677 pthread_spin_unlock(&g_caches_lock); 1678 if (file != NULL) { 1679 cache_free_buffers(file); 1680 buf = spdk_mempool_get(g_cache_pool); 1681 if (buf != NULL) { 1682 return buf; 1683 } 1684 } 1685 1686 pthread_spin_lock(&g_caches_lock); 1687 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1688 if (!file->open_for_writing && file != context) { 1689 break; 1690 } 1691 } 1692 pthread_spin_unlock(&g_caches_lock); 1693 if (file != NULL) { 1694 cache_free_buffers(file); 1695 buf = spdk_mempool_get(g_cache_pool); 1696 if (buf != NULL) { 1697 return buf; 1698 } 1699 } 1700 1701 pthread_spin_lock(&g_caches_lock); 1702 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1703 if (file != context) { 1704 break; 1705 } 1706 } 1707 pthread_spin_unlock(&g_caches_lock); 1708 if (file != NULL) { 1709 cache_free_buffers(file); 1710 buf = spdk_mempool_get(g_cache_pool); 1711 if (buf != NULL) { 1712 return buf; 1713 } 1714 } 1715 1716 return NULL; 1717 } 1718 1719 static struct cache_buffer * 1720 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1721 { 1722 struct cache_buffer *buf; 1723 int count = 0; 1724 1725 buf = calloc(1, sizeof(*buf)); 1726 if (buf == NULL) { 1727 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1728 return NULL; 1729 } 1730 1731 buf->buf = alloc_cache_memory_buffer(file); 1732 while (buf->buf == NULL) { 1733 /* 1734 * TODO: alloc_cache_memory_buffer() should eventually free 1735 * some buffers. Need a more sophisticated check here, instead 1736 * of just bailing if 100 tries does not result in getting a 1737 * free buffer. This will involve using the sync channel's 1738 * semaphore to block until a buffer becomes available. 1739 */ 1740 if (count++ == 100) { 1741 SPDK_ERRLOG("could not allocate cache buffer\n"); 1742 assert(false); 1743 free(buf); 1744 return NULL; 1745 } 1746 buf->buf = alloc_cache_memory_buffer(file); 1747 } 1748 1749 buf->buf_size = CACHE_BUFFER_SIZE; 1750 buf->offset = offset; 1751 1752 pthread_spin_lock(&g_caches_lock); 1753 if (file->tree->present_mask == 0) { 1754 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1755 } 1756 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1757 pthread_spin_unlock(&g_caches_lock); 1758 1759 return buf; 1760 } 1761 1762 static struct cache_buffer * 1763 cache_append_buffer(struct spdk_file *file) 1764 { 1765 struct cache_buffer *last; 1766 1767 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1768 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1769 1770 last = cache_insert_buffer(file, file->append_pos); 1771 if (last == NULL) { 1772 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1773 return NULL; 1774 } 1775 1776 file->last = last; 1777 1778 return last; 1779 } 1780 1781 static void 1782 __wake_caller(struct spdk_fs_cb_args *args) 1783 { 1784 sem_post(args->sem); 1785 } 1786 1787 static void __check_sync_reqs(struct spdk_file *file); 1788 1789 static void 1790 __file_cache_finish_sync(struct spdk_file *file) 1791 { 1792 struct spdk_fs_request *sync_req; 1793 struct spdk_fs_cb_args *sync_args; 1794 1795 pthread_spin_lock(&file->lock); 1796 sync_req = TAILQ_FIRST(&file->sync_requests); 1797 sync_args = &sync_req->args; 1798 assert(sync_args->op.sync.offset <= file->length_flushed); 1799 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1800 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1801 pthread_spin_unlock(&file->lock); 1802 1803 sync_args->fn.file_op(sync_args->arg, 0); 1804 __check_sync_reqs(file); 1805 1806 pthread_spin_lock(&file->lock); 1807 free_fs_request(sync_req); 1808 pthread_spin_unlock(&file->lock); 1809 } 1810 1811 static void 1812 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1813 { 1814 struct spdk_file *file = ctx; 1815 1816 __file_cache_finish_sync(file); 1817 } 1818 1819 static void 1820 __free_args(struct spdk_fs_cb_args *args) 1821 { 1822 struct spdk_fs_request *req; 1823 1824 if (!args->from_request) { 1825 free(args); 1826 } else { 1827 /* Depends on args being at the start of the spdk_fs_request structure. */ 1828 req = (struct spdk_fs_request *)args; 1829 free_fs_request(req); 1830 } 1831 } 1832 1833 static void 1834 __check_sync_reqs(struct spdk_file *file) 1835 { 1836 struct spdk_fs_request *sync_req; 1837 1838 pthread_spin_lock(&file->lock); 1839 1840 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1841 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1842 break; 1843 } 1844 } 1845 1846 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1847 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1848 sync_req->args.op.sync.xattr_in_progress = true; 1849 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1850 sizeof(file->length_flushed)); 1851 1852 pthread_spin_unlock(&file->lock); 1853 spdk_blob_sync_md(file->blob, __file_cache_finish_sync_bs_cb, file); 1854 } else { 1855 pthread_spin_unlock(&file->lock); 1856 } 1857 } 1858 1859 static void 1860 __file_flush_done(void *arg, int bserrno) 1861 { 1862 struct spdk_fs_cb_args *args = arg; 1863 struct spdk_file *file = args->file; 1864 struct cache_buffer *next = args->op.flush.cache_buffer; 1865 1866 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1867 1868 pthread_spin_lock(&file->lock); 1869 next->in_progress = false; 1870 next->bytes_flushed += args->op.flush.length; 1871 file->length_flushed += args->op.flush.length; 1872 if (file->length_flushed > file->length) { 1873 file->length = file->length_flushed; 1874 } 1875 if (next->bytes_flushed == next->buf_size) { 1876 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1877 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1878 } 1879 1880 /* 1881 * Assert that there is no cached data that extends past the end of the underlying 1882 * blob. 1883 */ 1884 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1885 next->bytes_filled == 0); 1886 1887 pthread_spin_unlock(&file->lock); 1888 1889 __check_sync_reqs(file); 1890 1891 __file_flush(args); 1892 } 1893 1894 static void 1895 __file_flush(void *_args) 1896 { 1897 struct spdk_fs_cb_args *args = _args; 1898 struct spdk_file *file = args->file; 1899 struct cache_buffer *next; 1900 uint64_t offset, length, start_page, num_pages; 1901 uint32_t page_size; 1902 1903 pthread_spin_lock(&file->lock); 1904 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1905 if (next == NULL || next->in_progress) { 1906 /* 1907 * There is either no data to flush, or a flush I/O is already in 1908 * progress. So return immediately - if a flush I/O is in 1909 * progress we will flush more data after that is completed. 1910 */ 1911 __free_args(args); 1912 pthread_spin_unlock(&file->lock); 1913 return; 1914 } 1915 1916 offset = next->offset + next->bytes_flushed; 1917 length = next->bytes_filled - next->bytes_flushed; 1918 if (length == 0) { 1919 __free_args(args); 1920 pthread_spin_unlock(&file->lock); 1921 return; 1922 } 1923 args->op.flush.length = length; 1924 args->op.flush.cache_buffer = next; 1925 1926 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1927 1928 next->in_progress = true; 1929 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1930 offset, length, start_page, num_pages); 1931 pthread_spin_unlock(&file->lock); 1932 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1933 next->buf + (start_page * page_size) - next->offset, 1934 start_page, num_pages, 1935 __file_flush_done, args); 1936 } 1937 1938 static void 1939 __file_extend_done(void *arg, int bserrno) 1940 { 1941 struct spdk_fs_cb_args *args = arg; 1942 1943 __wake_caller(args); 1944 } 1945 1946 static void 1947 __file_extend_blob(void *_args) 1948 { 1949 struct spdk_fs_cb_args *args = _args; 1950 struct spdk_file *file = args->file; 1951 1952 spdk_blob_resize(file->blob, args->op.resize.num_clusters); 1953 1954 spdk_blob_sync_md(file->blob, __file_extend_done, args); 1955 } 1956 1957 static void 1958 __rw_from_file_done(void *arg, int bserrno) 1959 { 1960 struct spdk_fs_cb_args *args = arg; 1961 1962 __wake_caller(args); 1963 __free_args(args); 1964 } 1965 1966 static void 1967 __rw_from_file(void *_args) 1968 { 1969 struct spdk_fs_cb_args *args = _args; 1970 struct spdk_file *file = args->file; 1971 1972 if (args->op.rw.is_read) { 1973 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1974 args->op.rw.offset, args->op.rw.length, 1975 __rw_from_file_done, args); 1976 } else { 1977 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1978 args->op.rw.offset, args->op.rw.length, 1979 __rw_from_file_done, args); 1980 } 1981 } 1982 1983 static int 1984 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1985 uint64_t offset, uint64_t length, bool is_read) 1986 { 1987 struct spdk_fs_cb_args *args; 1988 1989 args = calloc(1, sizeof(*args)); 1990 if (args == NULL) { 1991 sem_post(sem); 1992 return -ENOMEM; 1993 } 1994 1995 args->file = file; 1996 args->sem = sem; 1997 args->op.rw.user_buf = payload; 1998 args->op.rw.offset = offset; 1999 args->op.rw.length = length; 2000 args->op.rw.is_read = is_read; 2001 file->fs->send_request(__rw_from_file, args); 2002 return 0; 2003 } 2004 2005 int 2006 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 2007 void *payload, uint64_t offset, uint64_t length) 2008 { 2009 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2010 struct spdk_fs_cb_args *args; 2011 uint64_t rem_length, copy, blob_size, cluster_sz; 2012 uint32_t cache_buffers_filled = 0; 2013 uint8_t *cur_payload; 2014 struct cache_buffer *last; 2015 2016 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2017 2018 if (length == 0) { 2019 return 0; 2020 } 2021 2022 if (offset != file->append_pos) { 2023 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2024 return -EINVAL; 2025 } 2026 2027 pthread_spin_lock(&file->lock); 2028 file->open_for_writing = true; 2029 2030 if (file->last == NULL) { 2031 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2032 cache_append_buffer(file); 2033 } else { 2034 int rc; 2035 2036 file->append_pos += length; 2037 pthread_spin_unlock(&file->lock); 2038 rc = __send_rw_from_file(file, &channel->sem, payload, 2039 offset, length, false); 2040 sem_wait(&channel->sem); 2041 return rc; 2042 } 2043 } 2044 2045 blob_size = __file_get_blob_size(file); 2046 2047 if ((offset + length) > blob_size) { 2048 struct spdk_fs_cb_args extend_args = {}; 2049 2050 cluster_sz = file->fs->bs_opts.cluster_sz; 2051 extend_args.sem = &channel->sem; 2052 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2053 extend_args.file = file; 2054 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2055 pthread_spin_unlock(&file->lock); 2056 file->fs->send_request(__file_extend_blob, &extend_args); 2057 sem_wait(&channel->sem); 2058 } 2059 2060 last = file->last; 2061 rem_length = length; 2062 cur_payload = payload; 2063 while (rem_length > 0) { 2064 copy = last->buf_size - last->bytes_filled; 2065 if (copy > rem_length) { 2066 copy = rem_length; 2067 } 2068 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2069 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2070 file->append_pos += copy; 2071 if (file->length < file->append_pos) { 2072 file->length = file->append_pos; 2073 } 2074 cur_payload += copy; 2075 last->bytes_filled += copy; 2076 rem_length -= copy; 2077 if (last->bytes_filled == last->buf_size) { 2078 cache_buffers_filled++; 2079 last = cache_append_buffer(file); 2080 if (last == NULL) { 2081 BLOBFS_TRACE(file, "nomem\n"); 2082 pthread_spin_unlock(&file->lock); 2083 return -ENOMEM; 2084 } 2085 } 2086 } 2087 2088 pthread_spin_unlock(&file->lock); 2089 2090 if (cache_buffers_filled == 0) { 2091 return 0; 2092 } 2093 2094 args = calloc(1, sizeof(*args)); 2095 if (args == NULL) { 2096 return -ENOMEM; 2097 } 2098 2099 args->file = file; 2100 file->fs->send_request(__file_flush, args); 2101 return 0; 2102 } 2103 2104 static void 2105 __readahead_done(void *arg, int bserrno) 2106 { 2107 struct spdk_fs_cb_args *args = arg; 2108 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2109 struct spdk_file *file = args->file; 2110 2111 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2112 2113 pthread_spin_lock(&file->lock); 2114 cache_buffer->bytes_filled = args->op.readahead.length; 2115 cache_buffer->bytes_flushed = args->op.readahead.length; 2116 cache_buffer->in_progress = false; 2117 pthread_spin_unlock(&file->lock); 2118 2119 __free_args(args); 2120 } 2121 2122 static void 2123 __readahead(void *_args) 2124 { 2125 struct spdk_fs_cb_args *args = _args; 2126 struct spdk_file *file = args->file; 2127 uint64_t offset, length, start_page, num_pages; 2128 uint32_t page_size; 2129 2130 offset = args->op.readahead.offset; 2131 length = args->op.readahead.length; 2132 assert(length > 0); 2133 2134 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2135 2136 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2137 offset, length, start_page, num_pages); 2138 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2139 args->op.readahead.cache_buffer->buf, 2140 start_page, num_pages, 2141 __readahead_done, args); 2142 } 2143 2144 static uint64_t 2145 __next_cache_buffer_offset(uint64_t offset) 2146 { 2147 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2148 } 2149 2150 static void 2151 check_readahead(struct spdk_file *file, uint64_t offset) 2152 { 2153 struct spdk_fs_cb_args *args; 2154 2155 offset = __next_cache_buffer_offset(offset); 2156 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2157 return; 2158 } 2159 2160 args = calloc(1, sizeof(*args)); 2161 if (args == NULL) { 2162 return; 2163 } 2164 2165 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2166 2167 args->file = file; 2168 args->op.readahead.offset = offset; 2169 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2170 args->op.readahead.cache_buffer->in_progress = true; 2171 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2172 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2173 } else { 2174 args->op.readahead.length = CACHE_BUFFER_SIZE; 2175 } 2176 file->fs->send_request(__readahead, args); 2177 } 2178 2179 static int 2180 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2181 { 2182 struct cache_buffer *buf; 2183 int rc; 2184 2185 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2186 if (buf == NULL) { 2187 pthread_spin_unlock(&file->lock); 2188 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2189 pthread_spin_lock(&file->lock); 2190 return rc; 2191 } 2192 2193 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2194 length = buf->offset + buf->bytes_filled - offset; 2195 } 2196 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2197 memcpy(payload, &buf->buf[offset - buf->offset], length); 2198 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2199 pthread_spin_lock(&g_caches_lock); 2200 spdk_tree_remove_buffer(file->tree, buf); 2201 if (file->tree->present_mask == 0) { 2202 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2203 } 2204 pthread_spin_unlock(&g_caches_lock); 2205 } 2206 2207 sem_post(sem); 2208 return 0; 2209 } 2210 2211 int64_t 2212 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2213 void *payload, uint64_t offset, uint64_t length) 2214 { 2215 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2216 uint64_t final_offset, final_length; 2217 uint32_t sub_reads = 0; 2218 int rc = 0; 2219 2220 pthread_spin_lock(&file->lock); 2221 2222 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2223 2224 file->open_for_writing = false; 2225 2226 if (length == 0 || offset >= file->append_pos) { 2227 pthread_spin_unlock(&file->lock); 2228 return 0; 2229 } 2230 2231 if (offset + length > file->append_pos) { 2232 length = file->append_pos - offset; 2233 } 2234 2235 if (offset != file->next_seq_offset) { 2236 file->seq_byte_count = 0; 2237 } 2238 file->seq_byte_count += length; 2239 file->next_seq_offset = offset + length; 2240 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2241 check_readahead(file, offset); 2242 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2243 } 2244 2245 final_length = 0; 2246 final_offset = offset + length; 2247 while (offset < final_offset) { 2248 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2249 if (length > (final_offset - offset)) { 2250 length = final_offset - offset; 2251 } 2252 rc = __file_read(file, payload, offset, length, &channel->sem); 2253 if (rc == 0) { 2254 final_length += length; 2255 } else { 2256 break; 2257 } 2258 payload += length; 2259 offset += length; 2260 sub_reads++; 2261 } 2262 pthread_spin_unlock(&file->lock); 2263 while (sub_reads-- > 0) { 2264 sem_wait(&channel->sem); 2265 } 2266 if (rc == 0) { 2267 return final_length; 2268 } else { 2269 return rc; 2270 } 2271 } 2272 2273 static void 2274 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2275 spdk_file_op_complete cb_fn, void *cb_arg) 2276 { 2277 struct spdk_fs_request *sync_req; 2278 struct spdk_fs_request *flush_req; 2279 struct spdk_fs_cb_args *sync_args; 2280 struct spdk_fs_cb_args *flush_args; 2281 2282 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2283 2284 pthread_spin_lock(&file->lock); 2285 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2286 BLOBFS_TRACE(file, "done - no data to flush\n"); 2287 pthread_spin_unlock(&file->lock); 2288 cb_fn(cb_arg, 0); 2289 return; 2290 } 2291 2292 sync_req = alloc_fs_request(channel); 2293 assert(sync_req != NULL); 2294 sync_args = &sync_req->args; 2295 2296 flush_req = alloc_fs_request(channel); 2297 assert(flush_req != NULL); 2298 flush_args = &flush_req->args; 2299 2300 sync_args->file = file; 2301 sync_args->fn.file_op = cb_fn; 2302 sync_args->arg = cb_arg; 2303 sync_args->op.sync.offset = file->append_pos; 2304 sync_args->op.sync.xattr_in_progress = false; 2305 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2306 pthread_spin_unlock(&file->lock); 2307 2308 flush_args->file = file; 2309 channel->send_request(__file_flush, flush_args); 2310 } 2311 2312 int 2313 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2314 { 2315 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2316 2317 _file_sync(file, channel, __sem_post, &channel->sem); 2318 sem_wait(&channel->sem); 2319 2320 return 0; 2321 } 2322 2323 void 2324 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2325 spdk_file_op_complete cb_fn, void *cb_arg) 2326 { 2327 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2328 2329 _file_sync(file, channel, cb_fn, cb_arg); 2330 } 2331 2332 void 2333 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2334 { 2335 BLOBFS_TRACE(file, "priority=%u\n", priority); 2336 file->priority = priority; 2337 2338 } 2339 2340 /* 2341 * Close routines 2342 */ 2343 2344 static void 2345 __file_close_async_done(void *ctx, int bserrno) 2346 { 2347 struct spdk_fs_request *req = ctx; 2348 struct spdk_fs_cb_args *args = &req->args; 2349 struct spdk_file *file = args->file; 2350 2351 if (file->is_deleted) { 2352 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2353 return; 2354 } 2355 args->fn.file_op(args->arg, bserrno); 2356 free_fs_request(req); 2357 } 2358 2359 static void 2360 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2361 { 2362 struct spdk_blob *blob; 2363 2364 pthread_spin_lock(&file->lock); 2365 if (file->ref_count == 0) { 2366 pthread_spin_unlock(&file->lock); 2367 __file_close_async_done(req, -EBADF); 2368 return; 2369 } 2370 2371 file->ref_count--; 2372 if (file->ref_count > 0) { 2373 pthread_spin_unlock(&file->lock); 2374 __file_close_async_done(req, 0); 2375 return; 2376 } 2377 2378 pthread_spin_unlock(&file->lock); 2379 2380 blob = file->blob; 2381 file->blob = NULL; 2382 spdk_blob_close(blob, __file_close_async_done, req); 2383 } 2384 2385 static void 2386 __file_close_async__sync_done(void *arg, int fserrno) 2387 { 2388 struct spdk_fs_request *req = arg; 2389 struct spdk_fs_cb_args *args = &req->args; 2390 2391 __file_close_async(args->file, req); 2392 } 2393 2394 void 2395 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2396 { 2397 struct spdk_fs_request *req; 2398 struct spdk_fs_cb_args *args; 2399 2400 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2401 if (req == NULL) { 2402 cb_fn(cb_arg, -ENOMEM); 2403 return; 2404 } 2405 2406 args = &req->args; 2407 args->file = file; 2408 args->fn.file_op = cb_fn; 2409 args->arg = cb_arg; 2410 2411 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2412 } 2413 2414 static void 2415 __file_close_done(void *arg, int fserrno) 2416 { 2417 struct spdk_fs_cb_args *args = arg; 2418 2419 args->rc = fserrno; 2420 sem_post(args->sem); 2421 } 2422 2423 static void 2424 __file_close(void *arg) 2425 { 2426 struct spdk_fs_request *req = arg; 2427 struct spdk_fs_cb_args *args = &req->args; 2428 struct spdk_file *file = args->file; 2429 2430 __file_close_async(file, req); 2431 } 2432 2433 int 2434 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2435 { 2436 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2437 struct spdk_fs_request *req; 2438 struct spdk_fs_cb_args *args; 2439 2440 req = alloc_fs_request(channel); 2441 assert(req != NULL); 2442 2443 args = &req->args; 2444 2445 spdk_file_sync(file, _channel); 2446 BLOBFS_TRACE(file, "name=%s\n", file->name); 2447 args->file = file; 2448 args->sem = &channel->sem; 2449 args->fn.file_op = __file_close_done; 2450 args->arg = req; 2451 channel->send_request(__file_close, req); 2452 sem_wait(&channel->sem); 2453 2454 return args->rc; 2455 } 2456 2457 static void 2458 cache_free_buffers(struct spdk_file *file) 2459 { 2460 BLOBFS_TRACE(file, "free=%s\n", file->name); 2461 pthread_spin_lock(&file->lock); 2462 pthread_spin_lock(&g_caches_lock); 2463 if (file->tree->present_mask == 0) { 2464 pthread_spin_unlock(&g_caches_lock); 2465 pthread_spin_unlock(&file->lock); 2466 return; 2467 } 2468 spdk_tree_free_buffers(file->tree); 2469 2470 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2471 /* If not freed, put it in the end of the queue */ 2472 if (file->tree->present_mask != 0) { 2473 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2474 } 2475 file->last = NULL; 2476 pthread_spin_unlock(&g_caches_lock); 2477 pthread_spin_unlock(&file->lock); 2478 } 2479 2480 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2481 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2482