1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 static void 64 __sem_post(void *arg, int bserrno) 65 { 66 sem_t *sem = arg; 67 68 sem_post(sem); 69 } 70 71 void 72 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 73 { 74 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 75 free(cache_buffer); 76 } 77 78 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 79 80 struct spdk_file { 81 struct spdk_filesystem *fs; 82 struct spdk_blob *blob; 83 char *name; 84 uint64_t length; 85 bool is_deleted; 86 bool open_for_writing; 87 uint64_t length_flushed; 88 uint64_t append_pos; 89 uint64_t seq_byte_count; 90 uint64_t next_seq_offset; 91 uint32_t priority; 92 TAILQ_ENTRY(spdk_file) tailq; 93 spdk_blob_id blobid; 94 uint32_t ref_count; 95 pthread_spinlock_t lock; 96 struct cache_buffer *last; 97 struct cache_tree *tree; 98 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 99 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 100 TAILQ_ENTRY(spdk_file) cache_tailq; 101 }; 102 103 struct spdk_deleted_file { 104 spdk_blob_id id; 105 TAILQ_ENTRY(spdk_deleted_file) tailq; 106 }; 107 108 struct spdk_filesystem { 109 struct spdk_blob_store *bs; 110 TAILQ_HEAD(, spdk_file) files; 111 struct spdk_bs_opts bs_opts; 112 struct spdk_bs_dev *bdev; 113 fs_send_request_fn send_request; 114 115 struct { 116 uint32_t max_ops; 117 struct spdk_io_channel *sync_io_channel; 118 struct spdk_fs_channel *sync_fs_channel; 119 } sync_target; 120 121 struct { 122 uint32_t max_ops; 123 struct spdk_io_channel *md_io_channel; 124 struct spdk_fs_channel *md_fs_channel; 125 } md_target; 126 127 struct { 128 uint32_t max_ops; 129 } io_target; 130 }; 131 132 struct spdk_fs_cb_args { 133 union { 134 spdk_fs_op_with_handle_complete fs_op_with_handle; 135 spdk_fs_op_complete fs_op; 136 spdk_file_op_with_handle_complete file_op_with_handle; 137 spdk_file_op_complete file_op; 138 spdk_file_stat_op_complete stat_op; 139 } fn; 140 void *arg; 141 sem_t *sem; 142 struct spdk_filesystem *fs; 143 struct spdk_file *file; 144 int rc; 145 bool from_request; 146 union { 147 struct { 148 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 149 } fs_load; 150 struct { 151 uint64_t length; 152 } truncate; 153 struct { 154 struct spdk_io_channel *channel; 155 void *user_buf; 156 void *pin_buf; 157 int is_read; 158 off_t offset; 159 size_t length; 160 uint64_t start_page; 161 uint64_t num_pages; 162 uint32_t blocklen; 163 } rw; 164 struct { 165 const char *old_name; 166 const char *new_name; 167 } rename; 168 struct { 169 struct cache_buffer *cache_buffer; 170 uint64_t length; 171 } flush; 172 struct { 173 struct cache_buffer *cache_buffer; 174 uint64_t length; 175 uint64_t offset; 176 } readahead; 177 struct { 178 uint64_t offset; 179 TAILQ_ENTRY(spdk_fs_request) tailq; 180 bool xattr_in_progress; 181 } sync; 182 struct { 183 uint32_t num_clusters; 184 } resize; 185 struct { 186 const char *name; 187 uint32_t flags; 188 TAILQ_ENTRY(spdk_fs_request) tailq; 189 } open; 190 struct { 191 const char *name; 192 struct spdk_blob *blob; 193 } create; 194 struct { 195 const char *name; 196 } delete; 197 struct { 198 const char *name; 199 } stat; 200 } op; 201 }; 202 203 static void cache_free_buffers(struct spdk_file *file); 204 205 void 206 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 207 { 208 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 209 } 210 211 static void 212 __initialize_cache(void) 213 { 214 assert(g_cache_pool == NULL); 215 216 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 217 g_fs_cache_size / CACHE_BUFFER_SIZE, 218 CACHE_BUFFER_SIZE, 219 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 220 SPDK_ENV_SOCKET_ID_ANY); 221 TAILQ_INIT(&g_caches); 222 pthread_spin_init(&g_caches_lock, 0); 223 } 224 225 static void 226 __free_cache(void) 227 { 228 assert(g_cache_pool != NULL); 229 230 spdk_mempool_free(g_cache_pool); 231 g_cache_pool = NULL; 232 } 233 234 static uint64_t 235 __file_get_blob_size(struct spdk_file *file) 236 { 237 uint64_t cluster_sz; 238 239 cluster_sz = file->fs->bs_opts.cluster_sz; 240 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 241 } 242 243 struct spdk_fs_request { 244 struct spdk_fs_cb_args args; 245 TAILQ_ENTRY(spdk_fs_request) link; 246 struct spdk_fs_channel *channel; 247 }; 248 249 struct spdk_fs_channel { 250 struct spdk_fs_request *req_mem; 251 TAILQ_HEAD(, spdk_fs_request) reqs; 252 sem_t sem; 253 struct spdk_filesystem *fs; 254 struct spdk_io_channel *bs_channel; 255 fs_send_request_fn send_request; 256 bool sync; 257 pthread_spinlock_t lock; 258 }; 259 260 static struct spdk_fs_request * 261 alloc_fs_request(struct spdk_fs_channel *channel) 262 { 263 struct spdk_fs_request *req; 264 265 if (channel->sync) { 266 pthread_spin_lock(&channel->lock); 267 } 268 269 req = TAILQ_FIRST(&channel->reqs); 270 if (req) { 271 TAILQ_REMOVE(&channel->reqs, req, link); 272 } 273 274 if (channel->sync) { 275 pthread_spin_unlock(&channel->lock); 276 } 277 278 if (req == NULL) { 279 return NULL; 280 } 281 memset(req, 0, sizeof(*req)); 282 req->channel = channel; 283 req->args.from_request = true; 284 285 return req; 286 } 287 288 static void 289 free_fs_request(struct spdk_fs_request *req) 290 { 291 struct spdk_fs_channel *channel = req->channel; 292 293 if (channel->sync) { 294 pthread_spin_lock(&channel->lock); 295 } 296 297 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 298 299 if (channel->sync) { 300 pthread_spin_unlock(&channel->lock); 301 } 302 } 303 304 static int 305 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 306 uint32_t max_ops) 307 { 308 uint32_t i; 309 310 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 311 if (!channel->req_mem) { 312 return -1; 313 } 314 315 TAILQ_INIT(&channel->reqs); 316 sem_init(&channel->sem, 0, 0); 317 318 for (i = 0; i < max_ops; i++) { 319 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 320 } 321 322 channel->fs = fs; 323 324 return 0; 325 } 326 327 static int 328 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 329 { 330 struct spdk_filesystem *fs; 331 struct spdk_fs_channel *channel = ctx_buf; 332 333 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 334 335 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 336 } 337 338 static int 339 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 340 { 341 struct spdk_filesystem *fs; 342 struct spdk_fs_channel *channel = ctx_buf; 343 344 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 345 346 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 347 } 348 349 static int 350 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 351 { 352 struct spdk_filesystem *fs; 353 struct spdk_fs_channel *channel = ctx_buf; 354 355 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 356 357 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 358 } 359 360 static void 361 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 362 { 363 struct spdk_fs_channel *channel = ctx_buf; 364 365 free(channel->req_mem); 366 if (channel->bs_channel != NULL) { 367 spdk_bs_free_io_channel(channel->bs_channel); 368 } 369 } 370 371 static void 372 __send_request_direct(fs_request_fn fn, void *arg) 373 { 374 fn(arg); 375 } 376 377 static void 378 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 379 { 380 fs->bs = bs; 381 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 382 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 383 fs->md_target.md_fs_channel->send_request = __send_request_direct; 384 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 385 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 386 387 pthread_mutex_lock(&g_cache_init_lock); 388 if (g_fs_count == 0) { 389 __initialize_cache(); 390 } 391 g_fs_count++; 392 pthread_mutex_unlock(&g_cache_init_lock); 393 } 394 395 static void 396 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 397 { 398 struct spdk_fs_request *req = ctx; 399 struct spdk_fs_cb_args *args = &req->args; 400 struct spdk_filesystem *fs = args->fs; 401 402 if (bserrno == 0) { 403 common_fs_bs_init(fs, bs); 404 } else { 405 free(fs); 406 fs = NULL; 407 } 408 409 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 410 free_fs_request(req); 411 } 412 413 static void 414 fs_conf_parse(void) 415 { 416 struct spdk_conf_section *sp; 417 418 sp = spdk_conf_find_section(NULL, "Blobfs"); 419 if (sp == NULL) { 420 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 421 return; 422 } 423 424 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 425 if (g_fs_cache_buffer_shift <= 0) { 426 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 427 } 428 } 429 430 static struct spdk_filesystem * 431 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 432 { 433 struct spdk_filesystem *fs; 434 435 fs = calloc(1, sizeof(*fs)); 436 if (fs == NULL) { 437 return NULL; 438 } 439 440 fs->bdev = dev; 441 fs->send_request = send_request_fn; 442 TAILQ_INIT(&fs->files); 443 444 fs->md_target.max_ops = 512; 445 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 446 sizeof(struct spdk_fs_channel)); 447 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 448 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 449 450 fs->sync_target.max_ops = 512; 451 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 452 sizeof(struct spdk_fs_channel)); 453 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 454 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 455 456 fs->io_target.max_ops = 512; 457 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 458 sizeof(struct spdk_fs_channel)); 459 460 return fs; 461 } 462 463 void 464 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 465 fs_send_request_fn send_request_fn, 466 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 467 { 468 struct spdk_filesystem *fs; 469 struct spdk_fs_request *req; 470 struct spdk_fs_cb_args *args; 471 struct spdk_bs_opts opts = {}; 472 473 fs = fs_alloc(dev, send_request_fn); 474 if (fs == NULL) { 475 cb_fn(cb_arg, NULL, -ENOMEM); 476 return; 477 } 478 479 fs_conf_parse(); 480 481 req = alloc_fs_request(fs->md_target.md_fs_channel); 482 if (req == NULL) { 483 spdk_put_io_channel(fs->md_target.md_io_channel); 484 spdk_io_device_unregister(&fs->md_target, NULL); 485 spdk_put_io_channel(fs->sync_target.sync_io_channel); 486 spdk_io_device_unregister(&fs->sync_target, NULL); 487 spdk_io_device_unregister(&fs->io_target, NULL); 488 free(fs); 489 cb_fn(cb_arg, NULL, -ENOMEM); 490 return; 491 } 492 493 args = &req->args; 494 args->fn.fs_op_with_handle = cb_fn; 495 args->arg = cb_arg; 496 args->fs = fs; 497 498 spdk_bs_opts_init(&opts); 499 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 500 if (opt) { 501 opts.cluster_sz = opt->cluster_sz; 502 } 503 spdk_bs_init(dev, &opts, init_cb, req); 504 } 505 506 static struct spdk_file * 507 file_alloc(struct spdk_filesystem *fs) 508 { 509 struct spdk_file *file; 510 511 file = calloc(1, sizeof(*file)); 512 if (file == NULL) { 513 return NULL; 514 } 515 516 file->tree = calloc(1, sizeof(*file->tree)); 517 if (file->tree == NULL) { 518 free(file); 519 return NULL; 520 } 521 522 file->fs = fs; 523 TAILQ_INIT(&file->open_requests); 524 TAILQ_INIT(&file->sync_requests); 525 pthread_spin_init(&file->lock, 0); 526 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 527 file->priority = SPDK_FILE_PRIORITY_LOW; 528 return file; 529 } 530 531 static void fs_load_done(void *ctx, int bserrno); 532 533 static int 534 _handle_deleted_files(struct spdk_fs_request *req) 535 { 536 struct spdk_fs_cb_args *args = &req->args; 537 struct spdk_filesystem *fs = args->fs; 538 539 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 540 struct spdk_deleted_file *deleted_file; 541 542 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 543 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 544 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 545 free(deleted_file); 546 return 0; 547 } 548 549 return 1; 550 } 551 552 static void 553 fs_load_done(void *ctx, int bserrno) 554 { 555 struct spdk_fs_request *req = ctx; 556 struct spdk_fs_cb_args *args = &req->args; 557 struct spdk_filesystem *fs = args->fs; 558 559 /* The filesystem has been loaded. Now check if there are any files that 560 * were marked for deletion before last unload. Do not complete the 561 * fs_load callback until all of them have been deleted on disk. 562 */ 563 if (_handle_deleted_files(req) == 0) { 564 /* We found a file that's been marked for deleting but not actually 565 * deleted yet. This function will get called again once the delete 566 * operation is completed. 567 */ 568 return; 569 } 570 571 args->fn.fs_op_with_handle(args->arg, fs, 0); 572 free_fs_request(req); 573 574 } 575 576 static void 577 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 578 { 579 struct spdk_fs_request *req = ctx; 580 struct spdk_fs_cb_args *args = &req->args; 581 struct spdk_filesystem *fs = args->fs; 582 uint64_t *length; 583 const char *name; 584 uint32_t *is_deleted; 585 size_t value_len; 586 587 if (rc < 0) { 588 args->fn.fs_op_with_handle(args->arg, fs, rc); 589 free_fs_request(req); 590 return; 591 } 592 593 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 594 if (rc < 0) { 595 args->fn.fs_op_with_handle(args->arg, fs, rc); 596 free_fs_request(req); 597 return; 598 } 599 600 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 601 if (rc < 0) { 602 args->fn.fs_op_with_handle(args->arg, fs, rc); 603 free_fs_request(req); 604 return; 605 } 606 607 assert(value_len == 8); 608 609 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 610 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 611 if (rc < 0) { 612 struct spdk_file *f; 613 614 f = file_alloc(fs); 615 if (f == NULL) { 616 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 617 free_fs_request(req); 618 return; 619 } 620 621 f->name = strdup(name); 622 f->blobid = spdk_blob_get_id(blob); 623 f->length = *length; 624 f->length_flushed = *length; 625 f->append_pos = *length; 626 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 627 } else { 628 struct spdk_deleted_file *deleted_file; 629 630 deleted_file = calloc(1, sizeof(*deleted_file)); 631 if (deleted_file == NULL) { 632 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 633 free_fs_request(req); 634 return; 635 } 636 deleted_file->id = spdk_blob_get_id(blob); 637 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 638 } 639 } 640 641 static void 642 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 643 { 644 struct spdk_fs_request *req = ctx; 645 struct spdk_fs_cb_args *args = &req->args; 646 struct spdk_filesystem *fs = args->fs; 647 struct spdk_bs_type bstype; 648 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 649 static const struct spdk_bs_type zeros; 650 651 if (bserrno != 0) { 652 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 653 free_fs_request(req); 654 free(fs); 655 return; 656 } 657 658 bstype = spdk_bs_get_bstype(bs); 659 660 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 661 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 662 spdk_bs_set_bstype(bs, blobfs_type); 663 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 664 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 665 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 666 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 667 free_fs_request(req); 668 free(fs); 669 return; 670 } 671 672 common_fs_bs_init(fs, bs); 673 fs_load_done(req, 0); 674 } 675 676 static void 677 spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 678 { 679 assert(fs != NULL); 680 spdk_io_device_unregister(&fs->md_target, NULL); 681 spdk_io_device_unregister(&fs->sync_target, NULL); 682 spdk_io_device_unregister(&fs->io_target, NULL); 683 free(fs); 684 } 685 686 static void 687 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 688 { 689 assert(fs != NULL); 690 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 691 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 692 } 693 694 void 695 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 696 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 697 { 698 struct spdk_filesystem *fs; 699 struct spdk_fs_cb_args *args; 700 struct spdk_fs_request *req; 701 struct spdk_bs_opts bs_opts; 702 703 fs = fs_alloc(dev, send_request_fn); 704 if (fs == NULL) { 705 cb_fn(cb_arg, NULL, -ENOMEM); 706 return; 707 } 708 709 fs_conf_parse(); 710 711 req = alloc_fs_request(fs->md_target.md_fs_channel); 712 if (req == NULL) { 713 spdk_fs_free_io_channels(fs); 714 spdk_fs_io_device_unregister(fs); 715 cb_fn(cb_arg, NULL, -ENOMEM); 716 return; 717 } 718 719 args = &req->args; 720 args->fn.fs_op_with_handle = cb_fn; 721 args->arg = cb_arg; 722 args->fs = fs; 723 TAILQ_INIT(&args->op.fs_load.deleted_files); 724 spdk_bs_opts_init(&bs_opts); 725 bs_opts.iter_cb_fn = iter_cb; 726 bs_opts.iter_cb_arg = req; 727 spdk_bs_load(dev, &bs_opts, load_cb, req); 728 } 729 730 static void 731 unload_cb(void *ctx, int bserrno) 732 { 733 struct spdk_fs_request *req = ctx; 734 struct spdk_fs_cb_args *args = &req->args; 735 struct spdk_filesystem *fs = args->fs; 736 737 pthread_mutex_lock(&g_cache_init_lock); 738 g_fs_count--; 739 if (g_fs_count == 0) { 740 __free_cache(); 741 } 742 pthread_mutex_unlock(&g_cache_init_lock); 743 744 args->fn.fs_op(args->arg, bserrno); 745 free(req); 746 747 spdk_fs_io_device_unregister(fs); 748 } 749 750 void 751 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 752 { 753 struct spdk_fs_request *req; 754 struct spdk_fs_cb_args *args; 755 756 /* 757 * We must free the md_channel before unloading the blobstore, so just 758 * allocate this request from the general heap. 759 */ 760 req = calloc(1, sizeof(*req)); 761 if (req == NULL) { 762 cb_fn(cb_arg, -ENOMEM); 763 return; 764 } 765 766 args = &req->args; 767 args->fn.fs_op = cb_fn; 768 args->arg = cb_arg; 769 args->fs = fs; 770 771 spdk_fs_free_io_channels(fs); 772 spdk_bs_unload(fs->bs, unload_cb, req); 773 } 774 775 static struct spdk_file * 776 fs_find_file(struct spdk_filesystem *fs, const char *name) 777 { 778 struct spdk_file *file; 779 780 TAILQ_FOREACH(file, &fs->files, tailq) { 781 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 782 return file; 783 } 784 } 785 786 return NULL; 787 } 788 789 void 790 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 791 spdk_file_stat_op_complete cb_fn, void *cb_arg) 792 { 793 struct spdk_file_stat stat; 794 struct spdk_file *f = NULL; 795 796 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 797 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 798 return; 799 } 800 801 f = fs_find_file(fs, name); 802 if (f != NULL) { 803 stat.blobid = f->blobid; 804 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 805 cb_fn(cb_arg, &stat, 0); 806 return; 807 } 808 809 cb_fn(cb_arg, NULL, -ENOENT); 810 } 811 812 static void 813 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 814 { 815 struct spdk_fs_request *req = arg; 816 struct spdk_fs_cb_args *args = &req->args; 817 818 args->rc = fserrno; 819 if (fserrno == 0) { 820 memcpy(args->arg, stat, sizeof(*stat)); 821 } 822 sem_post(args->sem); 823 } 824 825 static void 826 __file_stat(void *arg) 827 { 828 struct spdk_fs_request *req = arg; 829 struct spdk_fs_cb_args *args = &req->args; 830 831 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 832 args->fn.stat_op, req); 833 } 834 835 int 836 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 837 const char *name, struct spdk_file_stat *stat) 838 { 839 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 840 struct spdk_fs_request *req; 841 int rc; 842 843 req = alloc_fs_request(channel); 844 if (req == NULL) { 845 return -ENOMEM; 846 } 847 848 req->args.fs = fs; 849 req->args.op.stat.name = name; 850 req->args.fn.stat_op = __copy_stat; 851 req->args.arg = stat; 852 req->args.sem = &channel->sem; 853 channel->send_request(__file_stat, req); 854 sem_wait(&channel->sem); 855 856 rc = req->args.rc; 857 free_fs_request(req); 858 859 return rc; 860 } 861 862 static void 863 fs_create_blob_close_cb(void *ctx, int bserrno) 864 { 865 struct spdk_fs_request *req = ctx; 866 struct spdk_fs_cb_args *args = &req->args; 867 868 args->fn.file_op(args->arg, bserrno); 869 free_fs_request(req); 870 } 871 872 static void 873 fs_create_blob_resize_cb(void *ctx, int bserrno) 874 { 875 struct spdk_fs_request *req = ctx; 876 struct spdk_fs_cb_args *args = &req->args; 877 struct spdk_file *f = args->file; 878 struct spdk_blob *blob = args->op.create.blob; 879 uint64_t length = 0; 880 881 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 882 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 883 884 spdk_blob_close(blob, fs_create_blob_close_cb, args); 885 } 886 887 static void 888 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 889 { 890 struct spdk_fs_request *req = ctx; 891 struct spdk_fs_cb_args *args = &req->args; 892 893 args->op.create.blob = blob; 894 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 895 } 896 897 static void 898 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 899 { 900 struct spdk_fs_request *req = ctx; 901 struct spdk_fs_cb_args *args = &req->args; 902 struct spdk_file *f = args->file; 903 904 f->blobid = blobid; 905 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 906 } 907 908 void 909 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 910 spdk_file_op_complete cb_fn, void *cb_arg) 911 { 912 struct spdk_file *file; 913 struct spdk_fs_request *req; 914 struct spdk_fs_cb_args *args; 915 916 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 917 cb_fn(cb_arg, -ENAMETOOLONG); 918 return; 919 } 920 921 file = fs_find_file(fs, name); 922 if (file != NULL) { 923 cb_fn(cb_arg, -EEXIST); 924 return; 925 } 926 927 file = file_alloc(fs); 928 if (file == NULL) { 929 cb_fn(cb_arg, -ENOMEM); 930 return; 931 } 932 933 req = alloc_fs_request(fs->md_target.md_fs_channel); 934 if (req == NULL) { 935 cb_fn(cb_arg, -ENOMEM); 936 return; 937 } 938 939 args = &req->args; 940 args->file = file; 941 args->fn.file_op = cb_fn; 942 args->arg = cb_arg; 943 944 file->name = strdup(name); 945 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 946 } 947 948 static void 949 __fs_create_file_done(void *arg, int fserrno) 950 { 951 struct spdk_fs_request *req = arg; 952 struct spdk_fs_cb_args *args = &req->args; 953 954 args->rc = fserrno; 955 sem_post(args->sem); 956 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 957 } 958 959 static void 960 __fs_create_file(void *arg) 961 { 962 struct spdk_fs_request *req = arg; 963 struct spdk_fs_cb_args *args = &req->args; 964 965 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 966 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 967 } 968 969 int 970 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 971 { 972 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 973 struct spdk_fs_request *req; 974 struct spdk_fs_cb_args *args; 975 int rc; 976 977 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 978 979 req = alloc_fs_request(channel); 980 if (req == NULL) { 981 return -ENOMEM; 982 } 983 984 args = &req->args; 985 args->fs = fs; 986 args->op.create.name = name; 987 args->sem = &channel->sem; 988 fs->send_request(__fs_create_file, req); 989 sem_wait(&channel->sem); 990 rc = args->rc; 991 free_fs_request(req); 992 993 return rc; 994 } 995 996 static void 997 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 998 { 999 struct spdk_fs_request *req = ctx; 1000 struct spdk_fs_cb_args *args = &req->args; 1001 struct spdk_file *f = args->file; 1002 1003 f->blob = blob; 1004 while (!TAILQ_EMPTY(&f->open_requests)) { 1005 req = TAILQ_FIRST(&f->open_requests); 1006 args = &req->args; 1007 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1008 args->fn.file_op_with_handle(args->arg, f, bserrno); 1009 free_fs_request(req); 1010 } 1011 } 1012 1013 static void 1014 fs_open_blob_create_cb(void *ctx, int bserrno) 1015 { 1016 struct spdk_fs_request *req = ctx; 1017 struct spdk_fs_cb_args *args = &req->args; 1018 struct spdk_file *file = args->file; 1019 struct spdk_filesystem *fs = args->fs; 1020 1021 if (file == NULL) { 1022 /* 1023 * This is from an open with CREATE flag - the file 1024 * is now created so look it up in the file list for this 1025 * filesystem. 1026 */ 1027 file = fs_find_file(fs, args->op.open.name); 1028 assert(file != NULL); 1029 args->file = file; 1030 } 1031 1032 file->ref_count++; 1033 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1034 if (file->ref_count == 1) { 1035 assert(file->blob == NULL); 1036 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1037 } else if (file->blob != NULL) { 1038 fs_open_blob_done(req, file->blob, 0); 1039 } else { 1040 /* 1041 * The blob open for this file is in progress due to a previous 1042 * open request. When that open completes, it will invoke the 1043 * open callback for this request. 1044 */ 1045 } 1046 } 1047 1048 void 1049 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1050 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1051 { 1052 struct spdk_file *f = NULL; 1053 struct spdk_fs_request *req; 1054 struct spdk_fs_cb_args *args; 1055 1056 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1057 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1058 return; 1059 } 1060 1061 f = fs_find_file(fs, name); 1062 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1063 cb_fn(cb_arg, NULL, -ENOENT); 1064 return; 1065 } 1066 1067 if (f != NULL && f->is_deleted == true) { 1068 cb_fn(cb_arg, NULL, -ENOENT); 1069 return; 1070 } 1071 1072 req = alloc_fs_request(fs->md_target.md_fs_channel); 1073 if (req == NULL) { 1074 cb_fn(cb_arg, NULL, -ENOMEM); 1075 return; 1076 } 1077 1078 args = &req->args; 1079 args->fn.file_op_with_handle = cb_fn; 1080 args->arg = cb_arg; 1081 args->file = f; 1082 args->fs = fs; 1083 args->op.open.name = name; 1084 1085 if (f == NULL) { 1086 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1087 } else { 1088 fs_open_blob_create_cb(req, 0); 1089 } 1090 } 1091 1092 static void 1093 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1094 { 1095 struct spdk_fs_request *req = arg; 1096 struct spdk_fs_cb_args *args = &req->args; 1097 1098 args->file = file; 1099 args->rc = bserrno; 1100 sem_post(args->sem); 1101 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1102 } 1103 1104 static void 1105 __fs_open_file(void *arg) 1106 { 1107 struct spdk_fs_request *req = arg; 1108 struct spdk_fs_cb_args *args = &req->args; 1109 1110 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1111 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1112 __fs_open_file_done, req); 1113 } 1114 1115 int 1116 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1117 const char *name, uint32_t flags, struct spdk_file **file) 1118 { 1119 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1120 struct spdk_fs_request *req; 1121 struct spdk_fs_cb_args *args; 1122 int rc; 1123 1124 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1125 1126 req = alloc_fs_request(channel); 1127 if (req == NULL) { 1128 return -ENOMEM; 1129 } 1130 1131 args = &req->args; 1132 args->fs = fs; 1133 args->op.open.name = name; 1134 args->op.open.flags = flags; 1135 args->sem = &channel->sem; 1136 fs->send_request(__fs_open_file, req); 1137 sem_wait(&channel->sem); 1138 rc = args->rc; 1139 if (rc == 0) { 1140 *file = args->file; 1141 } else { 1142 *file = NULL; 1143 } 1144 free_fs_request(req); 1145 1146 return rc; 1147 } 1148 1149 static void 1150 fs_rename_blob_close_cb(void *ctx, int bserrno) 1151 { 1152 struct spdk_fs_request *req = ctx; 1153 struct spdk_fs_cb_args *args = &req->args; 1154 1155 args->fn.fs_op(args->arg, bserrno); 1156 free_fs_request(req); 1157 } 1158 1159 static void 1160 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1161 { 1162 struct spdk_fs_request *req = ctx; 1163 struct spdk_fs_cb_args *args = &req->args; 1164 const char *new_name = args->op.rename.new_name; 1165 1166 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1167 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1168 } 1169 1170 static void 1171 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1172 { 1173 struct spdk_fs_cb_args *args = &req->args; 1174 struct spdk_file *f; 1175 1176 f = fs_find_file(args->fs, args->op.rename.old_name); 1177 if (f == NULL) { 1178 args->fn.fs_op(args->arg, -ENOENT); 1179 free_fs_request(req); 1180 return; 1181 } 1182 1183 free(f->name); 1184 f->name = strdup(args->op.rename.new_name); 1185 args->file = f; 1186 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1187 } 1188 1189 static void 1190 fs_rename_delete_done(void *arg, int fserrno) 1191 { 1192 __spdk_fs_md_rename_file(arg); 1193 } 1194 1195 void 1196 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1197 const char *old_name, const char *new_name, 1198 spdk_file_op_complete cb_fn, void *cb_arg) 1199 { 1200 struct spdk_file *f; 1201 struct spdk_fs_request *req; 1202 struct spdk_fs_cb_args *args; 1203 1204 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1205 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1206 cb_fn(cb_arg, -ENAMETOOLONG); 1207 return; 1208 } 1209 1210 req = alloc_fs_request(fs->md_target.md_fs_channel); 1211 if (req == NULL) { 1212 cb_fn(cb_arg, -ENOMEM); 1213 return; 1214 } 1215 1216 args = &req->args; 1217 args->fn.fs_op = cb_fn; 1218 args->fs = fs; 1219 args->arg = cb_arg; 1220 args->op.rename.old_name = old_name; 1221 args->op.rename.new_name = new_name; 1222 1223 f = fs_find_file(fs, new_name); 1224 if (f == NULL) { 1225 __spdk_fs_md_rename_file(req); 1226 return; 1227 } 1228 1229 /* 1230 * The rename overwrites an existing file. So delete the existing file, then 1231 * do the actual rename. 1232 */ 1233 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1234 } 1235 1236 static void 1237 __fs_rename_file_done(void *arg, int fserrno) 1238 { 1239 struct spdk_fs_request *req = arg; 1240 struct spdk_fs_cb_args *args = &req->args; 1241 1242 args->rc = fserrno; 1243 sem_post(args->sem); 1244 } 1245 1246 static void 1247 __fs_rename_file(void *arg) 1248 { 1249 struct spdk_fs_request *req = arg; 1250 struct spdk_fs_cb_args *args = &req->args; 1251 1252 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1253 __fs_rename_file_done, req); 1254 } 1255 1256 int 1257 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1258 const char *old_name, const char *new_name) 1259 { 1260 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1261 struct spdk_fs_request *req; 1262 struct spdk_fs_cb_args *args; 1263 int rc; 1264 1265 req = alloc_fs_request(channel); 1266 if (req == NULL) { 1267 return -ENOMEM; 1268 } 1269 1270 args = &req->args; 1271 1272 args->fs = fs; 1273 args->op.rename.old_name = old_name; 1274 args->op.rename.new_name = new_name; 1275 args->sem = &channel->sem; 1276 fs->send_request(__fs_rename_file, req); 1277 sem_wait(&channel->sem); 1278 rc = args->rc; 1279 free_fs_request(req); 1280 return rc; 1281 } 1282 1283 static void 1284 blob_delete_cb(void *ctx, int bserrno) 1285 { 1286 struct spdk_fs_request *req = ctx; 1287 struct spdk_fs_cb_args *args = &req->args; 1288 1289 args->fn.file_op(args->arg, bserrno); 1290 free_fs_request(req); 1291 } 1292 1293 void 1294 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1295 spdk_file_op_complete cb_fn, void *cb_arg) 1296 { 1297 struct spdk_file *f; 1298 spdk_blob_id blobid; 1299 struct spdk_fs_request *req; 1300 struct spdk_fs_cb_args *args; 1301 1302 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1303 1304 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1305 cb_fn(cb_arg, -ENAMETOOLONG); 1306 return; 1307 } 1308 1309 f = fs_find_file(fs, name); 1310 if (f == NULL) { 1311 cb_fn(cb_arg, -ENOENT); 1312 return; 1313 } 1314 1315 req = alloc_fs_request(fs->md_target.md_fs_channel); 1316 if (req == NULL) { 1317 cb_fn(cb_arg, -ENOMEM); 1318 return; 1319 } 1320 1321 args = &req->args; 1322 args->fn.file_op = cb_fn; 1323 args->arg = cb_arg; 1324 1325 if (f->ref_count > 0) { 1326 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1327 f->is_deleted = true; 1328 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1329 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1330 return; 1331 } 1332 1333 TAILQ_REMOVE(&fs->files, f, tailq); 1334 1335 cache_free_buffers(f); 1336 1337 blobid = f->blobid; 1338 1339 free(f->name); 1340 free(f->tree); 1341 free(f); 1342 1343 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1344 } 1345 1346 static void 1347 __fs_delete_file_done(void *arg, int fserrno) 1348 { 1349 struct spdk_fs_request *req = arg; 1350 struct spdk_fs_cb_args *args = &req->args; 1351 1352 args->rc = fserrno; 1353 sem_post(args->sem); 1354 } 1355 1356 static void 1357 __fs_delete_file(void *arg) 1358 { 1359 struct spdk_fs_request *req = arg; 1360 struct spdk_fs_cb_args *args = &req->args; 1361 1362 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1363 } 1364 1365 int 1366 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1367 const char *name) 1368 { 1369 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1370 struct spdk_fs_request *req; 1371 struct spdk_fs_cb_args *args; 1372 int rc; 1373 1374 req = alloc_fs_request(channel); 1375 if (req == NULL) { 1376 return -ENOMEM; 1377 } 1378 1379 args = &req->args; 1380 args->fs = fs; 1381 args->op.delete.name = name; 1382 args->sem = &channel->sem; 1383 fs->send_request(__fs_delete_file, req); 1384 sem_wait(&channel->sem); 1385 rc = args->rc; 1386 free_fs_request(req); 1387 1388 return rc; 1389 } 1390 1391 spdk_fs_iter 1392 spdk_fs_iter_first(struct spdk_filesystem *fs) 1393 { 1394 struct spdk_file *f; 1395 1396 f = TAILQ_FIRST(&fs->files); 1397 return f; 1398 } 1399 1400 spdk_fs_iter 1401 spdk_fs_iter_next(spdk_fs_iter iter) 1402 { 1403 struct spdk_file *f = iter; 1404 1405 if (f == NULL) { 1406 return NULL; 1407 } 1408 1409 f = TAILQ_NEXT(f, tailq); 1410 return f; 1411 } 1412 1413 const char * 1414 spdk_file_get_name(struct spdk_file *file) 1415 { 1416 return file->name; 1417 } 1418 1419 uint64_t 1420 spdk_file_get_length(struct spdk_file *file) 1421 { 1422 assert(file != NULL); 1423 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1424 return file->length; 1425 } 1426 1427 static void 1428 fs_truncate_complete_cb(void *ctx, int bserrno) 1429 { 1430 struct spdk_fs_request *req = ctx; 1431 struct spdk_fs_cb_args *args = &req->args; 1432 1433 args->fn.file_op(args->arg, bserrno); 1434 free_fs_request(req); 1435 } 1436 1437 static void 1438 fs_truncate_resize_cb(void *ctx, int bserrno) 1439 { 1440 struct spdk_fs_request *req = ctx; 1441 struct spdk_fs_cb_args *args = &req->args; 1442 struct spdk_file *file = args->file; 1443 uint64_t *length = &args->op.truncate.length; 1444 1445 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1446 1447 file->length = *length; 1448 if (file->append_pos > file->length) { 1449 file->append_pos = file->length; 1450 } 1451 1452 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1453 } 1454 1455 static uint64_t 1456 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1457 { 1458 return (length + cluster_sz - 1) / cluster_sz; 1459 } 1460 1461 void 1462 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1463 spdk_file_op_complete cb_fn, void *cb_arg) 1464 { 1465 struct spdk_filesystem *fs; 1466 size_t num_clusters; 1467 struct spdk_fs_request *req; 1468 struct spdk_fs_cb_args *args; 1469 1470 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1471 if (length == file->length) { 1472 cb_fn(cb_arg, 0); 1473 return; 1474 } 1475 1476 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1477 if (req == NULL) { 1478 cb_fn(cb_arg, -ENOMEM); 1479 return; 1480 } 1481 1482 args = &req->args; 1483 args->fn.file_op = cb_fn; 1484 args->arg = cb_arg; 1485 args->file = file; 1486 args->op.truncate.length = length; 1487 fs = file->fs; 1488 1489 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1490 1491 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1492 } 1493 1494 static void 1495 __truncate(void *arg) 1496 { 1497 struct spdk_fs_request *req = arg; 1498 struct spdk_fs_cb_args *args = &req->args; 1499 1500 spdk_file_truncate_async(args->file, args->op.truncate.length, 1501 args->fn.file_op, args->arg); 1502 } 1503 1504 int 1505 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1506 uint64_t length) 1507 { 1508 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1509 struct spdk_fs_request *req; 1510 struct spdk_fs_cb_args *args; 1511 1512 req = alloc_fs_request(channel); 1513 if (req == NULL) { 1514 return -ENOMEM; 1515 } 1516 1517 args = &req->args; 1518 1519 args->file = file; 1520 args->op.truncate.length = length; 1521 args->fn.file_op = __sem_post; 1522 args->arg = &channel->sem; 1523 1524 channel->send_request(__truncate, req); 1525 sem_wait(&channel->sem); 1526 free_fs_request(req); 1527 1528 return 0; 1529 } 1530 1531 static void 1532 __rw_done(void *ctx, int bserrno) 1533 { 1534 struct spdk_fs_request *req = ctx; 1535 struct spdk_fs_cb_args *args = &req->args; 1536 1537 spdk_dma_free(args->op.rw.pin_buf); 1538 args->fn.file_op(args->arg, bserrno); 1539 free_fs_request(req); 1540 } 1541 1542 static void 1543 __read_done(void *ctx, int bserrno) 1544 { 1545 struct spdk_fs_request *req = ctx; 1546 struct spdk_fs_cb_args *args = &req->args; 1547 1548 assert(req != NULL); 1549 if (args->op.rw.is_read) { 1550 memcpy(args->op.rw.user_buf, 1551 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1552 args->op.rw.length); 1553 __rw_done(req, 0); 1554 } else { 1555 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1556 args->op.rw.user_buf, 1557 args->op.rw.length); 1558 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1559 args->op.rw.pin_buf, 1560 args->op.rw.start_page, args->op.rw.num_pages, 1561 __rw_done, req); 1562 } 1563 } 1564 1565 static void 1566 __do_blob_read(void *ctx, int fserrno) 1567 { 1568 struct spdk_fs_request *req = ctx; 1569 struct spdk_fs_cb_args *args = &req->args; 1570 1571 if (fserrno) { 1572 __rw_done(req, fserrno); 1573 return; 1574 } 1575 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1576 args->op.rw.pin_buf, 1577 args->op.rw.start_page, args->op.rw.num_pages, 1578 __read_done, req); 1579 } 1580 1581 static void 1582 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1583 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1584 { 1585 uint64_t end_page; 1586 1587 *page_size = spdk_bs_get_page_size(file->fs->bs); 1588 *start_page = offset / *page_size; 1589 end_page = (offset + length - 1) / *page_size; 1590 *num_pages = (end_page - *start_page + 1); 1591 } 1592 1593 static void 1594 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1595 void *payload, uint64_t offset, uint64_t length, 1596 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1597 { 1598 struct spdk_fs_request *req; 1599 struct spdk_fs_cb_args *args; 1600 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1601 uint64_t start_page, num_pages, pin_buf_length; 1602 uint32_t page_size; 1603 1604 if (is_read && offset + length > file->length) { 1605 cb_fn(cb_arg, -EINVAL); 1606 return; 1607 } 1608 1609 req = alloc_fs_request(channel); 1610 if (req == NULL) { 1611 cb_fn(cb_arg, -ENOMEM); 1612 return; 1613 } 1614 1615 args = &req->args; 1616 args->fn.file_op = cb_fn; 1617 args->arg = cb_arg; 1618 args->file = file; 1619 args->op.rw.channel = channel->bs_channel; 1620 args->op.rw.user_buf = payload; 1621 args->op.rw.is_read = is_read; 1622 args->op.rw.offset = offset; 1623 args->op.rw.length = length; 1624 1625 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1626 pin_buf_length = num_pages * page_size; 1627 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1628 if (args->op.rw.pin_buf == NULL) { 1629 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1630 file->name, offset, length); 1631 free_fs_request(req); 1632 cb_fn(cb_arg, -ENOMEM); 1633 return; 1634 } 1635 1636 args->op.rw.start_page = start_page; 1637 args->op.rw.num_pages = num_pages; 1638 1639 if (!is_read && file->length < offset + length) { 1640 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1641 } else { 1642 __do_blob_read(req, 0); 1643 } 1644 } 1645 1646 void 1647 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1648 void *payload, uint64_t offset, uint64_t length, 1649 spdk_file_op_complete cb_fn, void *cb_arg) 1650 { 1651 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1652 } 1653 1654 void 1655 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1656 void *payload, uint64_t offset, uint64_t length, 1657 spdk_file_op_complete cb_fn, void *cb_arg) 1658 { 1659 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1660 file->name, offset, length); 1661 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1662 } 1663 1664 struct spdk_io_channel * 1665 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1666 { 1667 struct spdk_io_channel *io_channel; 1668 struct spdk_fs_channel *fs_channel; 1669 1670 io_channel = spdk_get_io_channel(&fs->io_target); 1671 fs_channel = spdk_io_channel_get_ctx(io_channel); 1672 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1673 fs_channel->send_request = __send_request_direct; 1674 1675 return io_channel; 1676 } 1677 1678 struct spdk_io_channel * 1679 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1680 { 1681 struct spdk_io_channel *io_channel; 1682 struct spdk_fs_channel *fs_channel; 1683 1684 io_channel = spdk_get_io_channel(&fs->io_target); 1685 fs_channel = spdk_io_channel_get_ctx(io_channel); 1686 fs_channel->send_request = fs->send_request; 1687 fs_channel->sync = 1; 1688 pthread_spin_init(&fs_channel->lock, 0); 1689 1690 return io_channel; 1691 } 1692 1693 void 1694 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1695 { 1696 spdk_put_io_channel(channel); 1697 } 1698 1699 void 1700 spdk_fs_set_cache_size(uint64_t size_in_mb) 1701 { 1702 g_fs_cache_size = size_in_mb * 1024 * 1024; 1703 } 1704 1705 uint64_t 1706 spdk_fs_get_cache_size(void) 1707 { 1708 return g_fs_cache_size / (1024 * 1024); 1709 } 1710 1711 static void __file_flush(void *_args); 1712 1713 static void * 1714 alloc_cache_memory_buffer(struct spdk_file *context) 1715 { 1716 struct spdk_file *file; 1717 void *buf; 1718 1719 buf = spdk_mempool_get(g_cache_pool); 1720 if (buf != NULL) { 1721 return buf; 1722 } 1723 1724 pthread_spin_lock(&g_caches_lock); 1725 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1726 if (!file->open_for_writing && 1727 file->priority == SPDK_FILE_PRIORITY_LOW && 1728 file != context) { 1729 break; 1730 } 1731 } 1732 pthread_spin_unlock(&g_caches_lock); 1733 if (file != NULL) { 1734 cache_free_buffers(file); 1735 buf = spdk_mempool_get(g_cache_pool); 1736 if (buf != NULL) { 1737 return buf; 1738 } 1739 } 1740 1741 pthread_spin_lock(&g_caches_lock); 1742 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1743 if (!file->open_for_writing && file != context) { 1744 break; 1745 } 1746 } 1747 pthread_spin_unlock(&g_caches_lock); 1748 if (file != NULL) { 1749 cache_free_buffers(file); 1750 buf = spdk_mempool_get(g_cache_pool); 1751 if (buf != NULL) { 1752 return buf; 1753 } 1754 } 1755 1756 pthread_spin_lock(&g_caches_lock); 1757 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1758 if (file != context) { 1759 break; 1760 } 1761 } 1762 pthread_spin_unlock(&g_caches_lock); 1763 if (file != NULL) { 1764 cache_free_buffers(file); 1765 buf = spdk_mempool_get(g_cache_pool); 1766 if (buf != NULL) { 1767 return buf; 1768 } 1769 } 1770 1771 return NULL; 1772 } 1773 1774 static struct cache_buffer * 1775 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1776 { 1777 struct cache_buffer *buf; 1778 int count = 0; 1779 1780 buf = calloc(1, sizeof(*buf)); 1781 if (buf == NULL) { 1782 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1783 return NULL; 1784 } 1785 1786 buf->buf = alloc_cache_memory_buffer(file); 1787 while (buf->buf == NULL) { 1788 /* 1789 * TODO: alloc_cache_memory_buffer() should eventually free 1790 * some buffers. Need a more sophisticated check here, instead 1791 * of just bailing if 100 tries does not result in getting a 1792 * free buffer. This will involve using the sync channel's 1793 * semaphore to block until a buffer becomes available. 1794 */ 1795 if (count++ == 100) { 1796 SPDK_ERRLOG("could not allocate cache buffer\n"); 1797 assert(false); 1798 free(buf); 1799 return NULL; 1800 } 1801 buf->buf = alloc_cache_memory_buffer(file); 1802 } 1803 1804 buf->buf_size = CACHE_BUFFER_SIZE; 1805 buf->offset = offset; 1806 1807 pthread_spin_lock(&g_caches_lock); 1808 if (file->tree->present_mask == 0) { 1809 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1810 } 1811 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1812 pthread_spin_unlock(&g_caches_lock); 1813 1814 return buf; 1815 } 1816 1817 static struct cache_buffer * 1818 cache_append_buffer(struct spdk_file *file) 1819 { 1820 struct cache_buffer *last; 1821 1822 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1823 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1824 1825 last = cache_insert_buffer(file, file->append_pos); 1826 if (last == NULL) { 1827 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1828 return NULL; 1829 } 1830 1831 file->last = last; 1832 1833 return last; 1834 } 1835 1836 static void 1837 __wake_caller(struct spdk_fs_cb_args *args) 1838 { 1839 sem_post(args->sem); 1840 } 1841 1842 static void __check_sync_reqs(struct spdk_file *file); 1843 1844 static void 1845 __file_cache_finish_sync(struct spdk_file *file) 1846 { 1847 struct spdk_fs_request *sync_req; 1848 struct spdk_fs_cb_args *sync_args; 1849 1850 pthread_spin_lock(&file->lock); 1851 sync_req = TAILQ_FIRST(&file->sync_requests); 1852 sync_args = &sync_req->args; 1853 assert(sync_args->op.sync.offset <= file->length_flushed); 1854 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1855 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1856 pthread_spin_unlock(&file->lock); 1857 1858 sync_args->fn.file_op(sync_args->arg, 0); 1859 __check_sync_reqs(file); 1860 1861 pthread_spin_lock(&file->lock); 1862 free_fs_request(sync_req); 1863 pthread_spin_unlock(&file->lock); 1864 } 1865 1866 static void 1867 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1868 { 1869 struct spdk_file *file = ctx; 1870 1871 __file_cache_finish_sync(file); 1872 } 1873 1874 static void 1875 __free_args(struct spdk_fs_cb_args *args) 1876 { 1877 struct spdk_fs_request *req; 1878 1879 if (!args->from_request) { 1880 free(args); 1881 } else { 1882 /* Depends on args being at the start of the spdk_fs_request structure. */ 1883 req = (struct spdk_fs_request *)args; 1884 free_fs_request(req); 1885 } 1886 } 1887 1888 static void 1889 __check_sync_reqs(struct spdk_file *file) 1890 { 1891 struct spdk_fs_request *sync_req; 1892 1893 pthread_spin_lock(&file->lock); 1894 1895 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1896 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1897 break; 1898 } 1899 } 1900 1901 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1902 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1903 sync_req->args.op.sync.xattr_in_progress = true; 1904 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1905 sizeof(file->length_flushed)); 1906 1907 pthread_spin_unlock(&file->lock); 1908 spdk_blob_sync_md(file->blob, __file_cache_finish_sync_bs_cb, file); 1909 } else { 1910 pthread_spin_unlock(&file->lock); 1911 } 1912 } 1913 1914 static void 1915 __file_flush_done(void *arg, int bserrno) 1916 { 1917 struct spdk_fs_cb_args *args = arg; 1918 struct spdk_file *file = args->file; 1919 struct cache_buffer *next = args->op.flush.cache_buffer; 1920 1921 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1922 1923 pthread_spin_lock(&file->lock); 1924 next->in_progress = false; 1925 next->bytes_flushed += args->op.flush.length; 1926 file->length_flushed += args->op.flush.length; 1927 if (file->length_flushed > file->length) { 1928 file->length = file->length_flushed; 1929 } 1930 if (next->bytes_flushed == next->buf_size) { 1931 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1932 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1933 } 1934 1935 /* 1936 * Assert that there is no cached data that extends past the end of the underlying 1937 * blob. 1938 */ 1939 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1940 next->bytes_filled == 0); 1941 1942 pthread_spin_unlock(&file->lock); 1943 1944 __check_sync_reqs(file); 1945 1946 __file_flush(args); 1947 } 1948 1949 static void 1950 __file_flush(void *_args) 1951 { 1952 struct spdk_fs_cb_args *args = _args; 1953 struct spdk_file *file = args->file; 1954 struct cache_buffer *next; 1955 uint64_t offset, length, start_page, num_pages; 1956 uint32_t page_size; 1957 1958 pthread_spin_lock(&file->lock); 1959 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1960 if (next == NULL || next->in_progress) { 1961 /* 1962 * There is either no data to flush, or a flush I/O is already in 1963 * progress. So return immediately - if a flush I/O is in 1964 * progress we will flush more data after that is completed. 1965 */ 1966 __free_args(args); 1967 if (next == NULL) { 1968 /* 1969 * For cases where a file's cache was evicted, and then the 1970 * file was later appended, we will write the data directly 1971 * to disk and bypass cache. So just update length_flushed 1972 * here to reflect that all data was already written to disk. 1973 */ 1974 file->length_flushed = file->append_pos; 1975 } 1976 pthread_spin_unlock(&file->lock); 1977 if (next == NULL) { 1978 /* 1979 * There is no data to flush, but we still need to check for any 1980 * outstanding sync requests to make sure metadata gets updated. 1981 */ 1982 __check_sync_reqs(file); 1983 } 1984 return; 1985 } 1986 1987 offset = next->offset + next->bytes_flushed; 1988 length = next->bytes_filled - next->bytes_flushed; 1989 if (length == 0) { 1990 __free_args(args); 1991 pthread_spin_unlock(&file->lock); 1992 return; 1993 } 1994 args->op.flush.length = length; 1995 args->op.flush.cache_buffer = next; 1996 1997 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1998 1999 next->in_progress = true; 2000 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2001 offset, length, start_page, num_pages); 2002 pthread_spin_unlock(&file->lock); 2003 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2004 next->buf + (start_page * page_size) - next->offset, 2005 start_page, num_pages, __file_flush_done, args); 2006 } 2007 2008 static void 2009 __file_extend_done(void *arg, int bserrno) 2010 { 2011 struct spdk_fs_cb_args *args = arg; 2012 2013 __wake_caller(args); 2014 } 2015 2016 static void 2017 __file_extend_resize_cb(void *_args, int bserrno) 2018 { 2019 struct spdk_fs_cb_args *args = _args; 2020 struct spdk_file *file = args->file; 2021 2022 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2023 } 2024 2025 static void 2026 __file_extend_blob(void *_args) 2027 { 2028 struct spdk_fs_cb_args *args = _args; 2029 struct spdk_file *file = args->file; 2030 2031 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2032 } 2033 2034 static void 2035 __rw_from_file_done(void *arg, int bserrno) 2036 { 2037 struct spdk_fs_cb_args *args = arg; 2038 2039 __wake_caller(args); 2040 __free_args(args); 2041 } 2042 2043 static void 2044 __rw_from_file(void *_args) 2045 { 2046 struct spdk_fs_cb_args *args = _args; 2047 struct spdk_file *file = args->file; 2048 2049 if (args->op.rw.is_read) { 2050 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2051 args->op.rw.offset, args->op.rw.length, 2052 __rw_from_file_done, args); 2053 } else { 2054 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2055 args->op.rw.offset, args->op.rw.length, 2056 __rw_from_file_done, args); 2057 } 2058 } 2059 2060 static int 2061 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 2062 uint64_t offset, uint64_t length, bool is_read) 2063 { 2064 struct spdk_fs_cb_args *args; 2065 2066 args = calloc(1, sizeof(*args)); 2067 if (args == NULL) { 2068 sem_post(sem); 2069 return -ENOMEM; 2070 } 2071 2072 args->file = file; 2073 args->sem = sem; 2074 args->op.rw.user_buf = payload; 2075 args->op.rw.offset = offset; 2076 args->op.rw.length = length; 2077 args->op.rw.is_read = is_read; 2078 file->fs->send_request(__rw_from_file, args); 2079 return 0; 2080 } 2081 2082 int 2083 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 2084 void *payload, uint64_t offset, uint64_t length) 2085 { 2086 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2087 struct spdk_fs_cb_args *args; 2088 uint64_t rem_length, copy, blob_size, cluster_sz; 2089 uint32_t cache_buffers_filled = 0; 2090 uint8_t *cur_payload; 2091 struct cache_buffer *last; 2092 2093 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2094 2095 if (length == 0) { 2096 return 0; 2097 } 2098 2099 if (offset != file->append_pos) { 2100 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2101 return -EINVAL; 2102 } 2103 2104 pthread_spin_lock(&file->lock); 2105 file->open_for_writing = true; 2106 2107 if (file->last == NULL) { 2108 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2109 cache_append_buffer(file); 2110 } else { 2111 int rc; 2112 2113 file->append_pos += length; 2114 pthread_spin_unlock(&file->lock); 2115 rc = __send_rw_from_file(file, &channel->sem, payload, 2116 offset, length, false); 2117 sem_wait(&channel->sem); 2118 return rc; 2119 } 2120 } 2121 2122 blob_size = __file_get_blob_size(file); 2123 2124 if ((offset + length) > blob_size) { 2125 struct spdk_fs_cb_args extend_args = {}; 2126 2127 cluster_sz = file->fs->bs_opts.cluster_sz; 2128 extend_args.sem = &channel->sem; 2129 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2130 extend_args.file = file; 2131 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2132 pthread_spin_unlock(&file->lock); 2133 file->fs->send_request(__file_extend_blob, &extend_args); 2134 sem_wait(&channel->sem); 2135 } 2136 2137 last = file->last; 2138 rem_length = length; 2139 cur_payload = payload; 2140 while (rem_length > 0) { 2141 copy = last->buf_size - last->bytes_filled; 2142 if (copy > rem_length) { 2143 copy = rem_length; 2144 } 2145 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2146 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2147 file->append_pos += copy; 2148 if (file->length < file->append_pos) { 2149 file->length = file->append_pos; 2150 } 2151 cur_payload += copy; 2152 last->bytes_filled += copy; 2153 rem_length -= copy; 2154 if (last->bytes_filled == last->buf_size) { 2155 cache_buffers_filled++; 2156 last = cache_append_buffer(file); 2157 if (last == NULL) { 2158 BLOBFS_TRACE(file, "nomem\n"); 2159 pthread_spin_unlock(&file->lock); 2160 return -ENOMEM; 2161 } 2162 } 2163 } 2164 2165 pthread_spin_unlock(&file->lock); 2166 2167 if (cache_buffers_filled == 0) { 2168 return 0; 2169 } 2170 2171 args = calloc(1, sizeof(*args)); 2172 if (args == NULL) { 2173 return -ENOMEM; 2174 } 2175 2176 args->file = file; 2177 file->fs->send_request(__file_flush, args); 2178 return 0; 2179 } 2180 2181 static void 2182 __readahead_done(void *arg, int bserrno) 2183 { 2184 struct spdk_fs_cb_args *args = arg; 2185 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2186 struct spdk_file *file = args->file; 2187 2188 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2189 2190 pthread_spin_lock(&file->lock); 2191 cache_buffer->bytes_filled = args->op.readahead.length; 2192 cache_buffer->bytes_flushed = args->op.readahead.length; 2193 cache_buffer->in_progress = false; 2194 pthread_spin_unlock(&file->lock); 2195 2196 __free_args(args); 2197 } 2198 2199 static void 2200 __readahead(void *_args) 2201 { 2202 struct spdk_fs_cb_args *args = _args; 2203 struct spdk_file *file = args->file; 2204 uint64_t offset, length, start_page, num_pages; 2205 uint32_t page_size; 2206 2207 offset = args->op.readahead.offset; 2208 length = args->op.readahead.length; 2209 assert(length > 0); 2210 2211 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2212 2213 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2214 offset, length, start_page, num_pages); 2215 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2216 args->op.readahead.cache_buffer->buf, 2217 start_page, num_pages, __readahead_done, args); 2218 } 2219 2220 static uint64_t 2221 __next_cache_buffer_offset(uint64_t offset) 2222 { 2223 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2224 } 2225 2226 static void 2227 check_readahead(struct spdk_file *file, uint64_t offset) 2228 { 2229 struct spdk_fs_cb_args *args; 2230 2231 offset = __next_cache_buffer_offset(offset); 2232 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2233 return; 2234 } 2235 2236 args = calloc(1, sizeof(*args)); 2237 if (args == NULL) { 2238 return; 2239 } 2240 2241 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2242 2243 args->file = file; 2244 args->op.readahead.offset = offset; 2245 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2246 if (!args->op.readahead.cache_buffer) { 2247 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2248 free(args); 2249 return; 2250 } 2251 2252 args->op.readahead.cache_buffer->in_progress = true; 2253 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2254 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2255 } else { 2256 args->op.readahead.length = CACHE_BUFFER_SIZE; 2257 } 2258 file->fs->send_request(__readahead, args); 2259 } 2260 2261 static int 2262 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2263 { 2264 struct cache_buffer *buf; 2265 int rc; 2266 2267 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2268 if (buf == NULL) { 2269 pthread_spin_unlock(&file->lock); 2270 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2271 pthread_spin_lock(&file->lock); 2272 return rc; 2273 } 2274 2275 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2276 length = buf->offset + buf->bytes_filled - offset; 2277 } 2278 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2279 memcpy(payload, &buf->buf[offset - buf->offset], length); 2280 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2281 pthread_spin_lock(&g_caches_lock); 2282 spdk_tree_remove_buffer(file->tree, buf); 2283 if (file->tree->present_mask == 0) { 2284 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2285 } 2286 pthread_spin_unlock(&g_caches_lock); 2287 } 2288 2289 sem_post(sem); 2290 return 0; 2291 } 2292 2293 int64_t 2294 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2295 void *payload, uint64_t offset, uint64_t length) 2296 { 2297 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2298 uint64_t final_offset, final_length; 2299 uint32_t sub_reads = 0; 2300 int rc = 0; 2301 2302 pthread_spin_lock(&file->lock); 2303 2304 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2305 2306 file->open_for_writing = false; 2307 2308 if (length == 0 || offset >= file->append_pos) { 2309 pthread_spin_unlock(&file->lock); 2310 return 0; 2311 } 2312 2313 if (offset + length > file->append_pos) { 2314 length = file->append_pos - offset; 2315 } 2316 2317 if (offset != file->next_seq_offset) { 2318 file->seq_byte_count = 0; 2319 } 2320 file->seq_byte_count += length; 2321 file->next_seq_offset = offset + length; 2322 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2323 check_readahead(file, offset); 2324 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2325 } 2326 2327 final_length = 0; 2328 final_offset = offset + length; 2329 while (offset < final_offset) { 2330 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2331 if (length > (final_offset - offset)) { 2332 length = final_offset - offset; 2333 } 2334 rc = __file_read(file, payload, offset, length, &channel->sem); 2335 if (rc == 0) { 2336 final_length += length; 2337 } else { 2338 break; 2339 } 2340 payload += length; 2341 offset += length; 2342 sub_reads++; 2343 } 2344 pthread_spin_unlock(&file->lock); 2345 while (sub_reads-- > 0) { 2346 sem_wait(&channel->sem); 2347 } 2348 if (rc == 0) { 2349 return final_length; 2350 } else { 2351 return rc; 2352 } 2353 } 2354 2355 static void 2356 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2357 spdk_file_op_complete cb_fn, void *cb_arg) 2358 { 2359 struct spdk_fs_request *sync_req; 2360 struct spdk_fs_request *flush_req; 2361 struct spdk_fs_cb_args *sync_args; 2362 struct spdk_fs_cb_args *flush_args; 2363 2364 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2365 2366 pthread_spin_lock(&file->lock); 2367 if (file->append_pos <= file->length_flushed) { 2368 BLOBFS_TRACE(file, "done - no data to flush\n"); 2369 pthread_spin_unlock(&file->lock); 2370 cb_fn(cb_arg, 0); 2371 return; 2372 } 2373 2374 sync_req = alloc_fs_request(channel); 2375 if (!sync_req) { 2376 pthread_spin_unlock(&file->lock); 2377 cb_fn(cb_arg, -ENOMEM); 2378 return; 2379 } 2380 sync_args = &sync_req->args; 2381 2382 flush_req = alloc_fs_request(channel); 2383 if (!flush_req) { 2384 pthread_spin_unlock(&file->lock); 2385 cb_fn(cb_arg, -ENOMEM); 2386 return; 2387 } 2388 flush_args = &flush_req->args; 2389 2390 sync_args->file = file; 2391 sync_args->fn.file_op = cb_fn; 2392 sync_args->arg = cb_arg; 2393 sync_args->op.sync.offset = file->append_pos; 2394 sync_args->op.sync.xattr_in_progress = false; 2395 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2396 pthread_spin_unlock(&file->lock); 2397 2398 flush_args->file = file; 2399 channel->send_request(__file_flush, flush_args); 2400 } 2401 2402 int 2403 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2404 { 2405 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2406 2407 _file_sync(file, channel, __sem_post, &channel->sem); 2408 sem_wait(&channel->sem); 2409 2410 return 0; 2411 } 2412 2413 void 2414 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2415 spdk_file_op_complete cb_fn, void *cb_arg) 2416 { 2417 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2418 2419 _file_sync(file, channel, cb_fn, cb_arg); 2420 } 2421 2422 void 2423 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2424 { 2425 BLOBFS_TRACE(file, "priority=%u\n", priority); 2426 file->priority = priority; 2427 2428 } 2429 2430 /* 2431 * Close routines 2432 */ 2433 2434 static void 2435 __file_close_async_done(void *ctx, int bserrno) 2436 { 2437 struct spdk_fs_request *req = ctx; 2438 struct spdk_fs_cb_args *args = &req->args; 2439 struct spdk_file *file = args->file; 2440 2441 if (file->is_deleted) { 2442 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2443 return; 2444 } 2445 2446 args->fn.file_op(args->arg, bserrno); 2447 free_fs_request(req); 2448 } 2449 2450 static void 2451 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2452 { 2453 struct spdk_blob *blob; 2454 2455 pthread_spin_lock(&file->lock); 2456 if (file->ref_count == 0) { 2457 pthread_spin_unlock(&file->lock); 2458 __file_close_async_done(req, -EBADF); 2459 return; 2460 } 2461 2462 file->ref_count--; 2463 if (file->ref_count > 0) { 2464 pthread_spin_unlock(&file->lock); 2465 req->args.fn.file_op(req->args.arg, 0); 2466 free_fs_request(req); 2467 return; 2468 } 2469 2470 pthread_spin_unlock(&file->lock); 2471 2472 blob = file->blob; 2473 file->blob = NULL; 2474 spdk_blob_close(blob, __file_close_async_done, req); 2475 } 2476 2477 static void 2478 __file_close_async__sync_done(void *arg, int fserrno) 2479 { 2480 struct spdk_fs_request *req = arg; 2481 struct spdk_fs_cb_args *args = &req->args; 2482 2483 __file_close_async(args->file, req); 2484 } 2485 2486 void 2487 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2488 { 2489 struct spdk_fs_request *req; 2490 struct spdk_fs_cb_args *args; 2491 2492 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2493 if (req == NULL) { 2494 cb_fn(cb_arg, -ENOMEM); 2495 return; 2496 } 2497 2498 args = &req->args; 2499 args->file = file; 2500 args->fn.file_op = cb_fn; 2501 args->arg = cb_arg; 2502 2503 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2504 } 2505 2506 static void 2507 __file_close_done(void *arg, int fserrno) 2508 { 2509 struct spdk_fs_cb_args *args = arg; 2510 2511 args->rc = fserrno; 2512 sem_post(args->sem); 2513 } 2514 2515 static void 2516 __file_close(void *arg) 2517 { 2518 struct spdk_fs_request *req = arg; 2519 struct spdk_fs_cb_args *args = &req->args; 2520 struct spdk_file *file = args->file; 2521 2522 __file_close_async(file, req); 2523 } 2524 2525 int 2526 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2527 { 2528 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2529 struct spdk_fs_request *req; 2530 struct spdk_fs_cb_args *args; 2531 2532 req = alloc_fs_request(channel); 2533 if (req == NULL) { 2534 return -ENOMEM; 2535 } 2536 2537 args = &req->args; 2538 2539 spdk_file_sync(file, _channel); 2540 BLOBFS_TRACE(file, "name=%s\n", file->name); 2541 args->file = file; 2542 args->sem = &channel->sem; 2543 args->fn.file_op = __file_close_done; 2544 args->arg = req; 2545 channel->send_request(__file_close, req); 2546 sem_wait(&channel->sem); 2547 2548 return args->rc; 2549 } 2550 2551 static void 2552 cache_free_buffers(struct spdk_file *file) 2553 { 2554 BLOBFS_TRACE(file, "free=%s\n", file->name); 2555 pthread_spin_lock(&file->lock); 2556 pthread_spin_lock(&g_caches_lock); 2557 if (file->tree->present_mask == 0) { 2558 pthread_spin_unlock(&g_caches_lock); 2559 pthread_spin_unlock(&file->lock); 2560 return; 2561 } 2562 spdk_tree_free_buffers(file->tree); 2563 2564 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2565 /* If not freed, put it in the end of the queue */ 2566 if (file->tree->present_mask != 0) { 2567 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2568 } 2569 file->last = NULL; 2570 pthread_spin_unlock(&g_caches_lock); 2571 pthread_spin_unlock(&file->lock); 2572 } 2573 2574 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2575 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2576