1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/io_channel.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 static void 64 __sem_post(void *arg, int bserrno) 65 { 66 sem_t *sem = arg; 67 68 sem_post(sem); 69 } 70 71 void 72 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 73 { 74 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 75 free(cache_buffer); 76 } 77 78 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 79 80 struct spdk_file { 81 struct spdk_filesystem *fs; 82 struct spdk_blob *blob; 83 char *name; 84 uint64_t length; 85 bool is_deleted; 86 bool open_for_writing; 87 uint64_t length_flushed; 88 uint64_t append_pos; 89 uint64_t seq_byte_count; 90 uint64_t next_seq_offset; 91 uint32_t priority; 92 TAILQ_ENTRY(spdk_file) tailq; 93 spdk_blob_id blobid; 94 uint32_t ref_count; 95 pthread_spinlock_t lock; 96 struct cache_buffer *last; 97 struct cache_tree *tree; 98 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 99 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 100 TAILQ_ENTRY(spdk_file) cache_tailq; 101 }; 102 103 struct spdk_deleted_file { 104 spdk_blob_id id; 105 TAILQ_ENTRY(spdk_deleted_file) tailq; 106 }; 107 108 struct spdk_filesystem { 109 struct spdk_blob_store *bs; 110 TAILQ_HEAD(, spdk_file) files; 111 struct spdk_bs_opts bs_opts; 112 struct spdk_bs_dev *bdev; 113 fs_send_request_fn send_request; 114 115 struct { 116 uint32_t max_ops; 117 struct spdk_io_channel *sync_io_channel; 118 struct spdk_fs_channel *sync_fs_channel; 119 } sync_target; 120 121 struct { 122 uint32_t max_ops; 123 struct spdk_io_channel *md_io_channel; 124 struct spdk_fs_channel *md_fs_channel; 125 } md_target; 126 127 struct { 128 uint32_t max_ops; 129 } io_target; 130 }; 131 132 struct spdk_fs_cb_args { 133 union { 134 spdk_fs_op_with_handle_complete fs_op_with_handle; 135 spdk_fs_op_complete fs_op; 136 spdk_file_op_with_handle_complete file_op_with_handle; 137 spdk_file_op_complete file_op; 138 spdk_file_stat_op_complete stat_op; 139 } fn; 140 void *arg; 141 sem_t *sem; 142 struct spdk_filesystem *fs; 143 struct spdk_file *file; 144 int rc; 145 bool from_request; 146 union { 147 struct { 148 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 149 } fs_load; 150 struct { 151 uint64_t length; 152 } truncate; 153 struct { 154 struct spdk_io_channel *channel; 155 void *user_buf; 156 void *pin_buf; 157 int is_read; 158 off_t offset; 159 size_t length; 160 uint64_t start_page; 161 uint64_t num_pages; 162 uint32_t blocklen; 163 } rw; 164 struct { 165 const char *old_name; 166 const char *new_name; 167 } rename; 168 struct { 169 struct cache_buffer *cache_buffer; 170 uint64_t length; 171 } flush; 172 struct { 173 struct cache_buffer *cache_buffer; 174 uint64_t length; 175 uint64_t offset; 176 } readahead; 177 struct { 178 uint64_t offset; 179 TAILQ_ENTRY(spdk_fs_request) tailq; 180 bool xattr_in_progress; 181 } sync; 182 struct { 183 uint32_t num_clusters; 184 } resize; 185 struct { 186 const char *name; 187 uint32_t flags; 188 TAILQ_ENTRY(spdk_fs_request) tailq; 189 } open; 190 struct { 191 const char *name; 192 } create; 193 struct { 194 const char *name; 195 } delete; 196 struct { 197 const char *name; 198 } stat; 199 } op; 200 }; 201 202 static void cache_free_buffers(struct spdk_file *file); 203 204 void 205 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 206 { 207 opts->cluster_sz = SPDK_BLOBFS_OPTS_CLUSTER_SZ; 208 } 209 210 static void 211 __initialize_cache(void) 212 { 213 assert(g_cache_pool == NULL); 214 215 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 216 g_fs_cache_size / CACHE_BUFFER_SIZE, 217 CACHE_BUFFER_SIZE, 218 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 219 SPDK_ENV_SOCKET_ID_ANY); 220 TAILQ_INIT(&g_caches); 221 pthread_spin_init(&g_caches_lock, 0); 222 } 223 224 static void 225 __free_cache(void) 226 { 227 assert(g_cache_pool != NULL); 228 229 spdk_mempool_free(g_cache_pool); 230 g_cache_pool = NULL; 231 } 232 233 static uint64_t 234 __file_get_blob_size(struct spdk_file *file) 235 { 236 uint64_t cluster_sz; 237 238 cluster_sz = file->fs->bs_opts.cluster_sz; 239 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 240 } 241 242 struct spdk_fs_request { 243 struct spdk_fs_cb_args args; 244 TAILQ_ENTRY(spdk_fs_request) link; 245 struct spdk_fs_channel *channel; 246 }; 247 248 struct spdk_fs_channel { 249 struct spdk_fs_request *req_mem; 250 TAILQ_HEAD(, spdk_fs_request) reqs; 251 sem_t sem; 252 struct spdk_filesystem *fs; 253 struct spdk_io_channel *bs_channel; 254 fs_send_request_fn send_request; 255 bool sync; 256 pthread_spinlock_t lock; 257 }; 258 259 static struct spdk_fs_request * 260 alloc_fs_request(struct spdk_fs_channel *channel) 261 { 262 struct spdk_fs_request *req; 263 264 if (channel->sync) { 265 pthread_spin_lock(&channel->lock); 266 } 267 268 req = TAILQ_FIRST(&channel->reqs); 269 if (req) { 270 TAILQ_REMOVE(&channel->reqs, req, link); 271 } 272 273 if (channel->sync) { 274 pthread_spin_unlock(&channel->lock); 275 } 276 277 if (req == NULL) { 278 return NULL; 279 } 280 memset(req, 0, sizeof(*req)); 281 req->channel = channel; 282 req->args.from_request = true; 283 284 return req; 285 } 286 287 static void 288 free_fs_request(struct spdk_fs_request *req) 289 { 290 struct spdk_fs_channel *channel = req->channel; 291 292 if (channel->sync) { 293 pthread_spin_lock(&channel->lock); 294 } 295 296 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 297 298 if (channel->sync) { 299 pthread_spin_unlock(&channel->lock); 300 } 301 } 302 303 static int 304 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 305 uint32_t max_ops) 306 { 307 uint32_t i; 308 309 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 310 if (!channel->req_mem) { 311 return -1; 312 } 313 314 TAILQ_INIT(&channel->reqs); 315 sem_init(&channel->sem, 0, 0); 316 317 for (i = 0; i < max_ops; i++) { 318 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 319 } 320 321 channel->fs = fs; 322 323 return 0; 324 } 325 326 static int 327 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 328 { 329 struct spdk_filesystem *fs; 330 struct spdk_fs_channel *channel = ctx_buf; 331 332 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 333 334 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 335 } 336 337 static int 338 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 339 { 340 struct spdk_filesystem *fs; 341 struct spdk_fs_channel *channel = ctx_buf; 342 343 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 344 345 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 346 } 347 348 static int 349 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 350 { 351 struct spdk_filesystem *fs; 352 struct spdk_fs_channel *channel = ctx_buf; 353 354 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 355 356 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 357 } 358 359 static void 360 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 361 { 362 struct spdk_fs_channel *channel = ctx_buf; 363 364 free(channel->req_mem); 365 if (channel->bs_channel != NULL) { 366 spdk_bs_free_io_channel(channel->bs_channel); 367 } 368 } 369 370 static void 371 __send_request_direct(fs_request_fn fn, void *arg) 372 { 373 fn(arg); 374 } 375 376 static void 377 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 378 { 379 fs->bs = bs; 380 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 381 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 382 fs->md_target.md_fs_channel->send_request = __send_request_direct; 383 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 384 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 385 386 pthread_mutex_lock(&g_cache_init_lock); 387 if (g_fs_count == 0) { 388 __initialize_cache(); 389 } 390 g_fs_count++; 391 pthread_mutex_unlock(&g_cache_init_lock); 392 } 393 394 static void 395 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 396 { 397 struct spdk_fs_request *req = ctx; 398 struct spdk_fs_cb_args *args = &req->args; 399 struct spdk_filesystem *fs = args->fs; 400 401 if (bserrno == 0) { 402 common_fs_bs_init(fs, bs); 403 } else { 404 free(fs); 405 fs = NULL; 406 } 407 408 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 409 free_fs_request(req); 410 } 411 412 static void 413 fs_conf_parse(void) 414 { 415 struct spdk_conf_section *sp; 416 417 sp = spdk_conf_find_section(NULL, "Blobfs"); 418 if (sp == NULL) { 419 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 420 return; 421 } 422 423 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 424 if (g_fs_cache_buffer_shift <= 0) { 425 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 426 } 427 } 428 429 static struct spdk_filesystem * 430 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 431 { 432 struct spdk_filesystem *fs; 433 434 fs = calloc(1, sizeof(*fs)); 435 if (fs == NULL) { 436 return NULL; 437 } 438 439 fs->bdev = dev; 440 fs->send_request = send_request_fn; 441 TAILQ_INIT(&fs->files); 442 443 fs->md_target.max_ops = 512; 444 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 445 sizeof(struct spdk_fs_channel)); 446 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 447 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 448 449 fs->sync_target.max_ops = 512; 450 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 451 sizeof(struct spdk_fs_channel)); 452 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 453 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 454 455 fs->io_target.max_ops = 512; 456 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 457 sizeof(struct spdk_fs_channel)); 458 459 return fs; 460 } 461 462 void 463 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 464 fs_send_request_fn send_request_fn, 465 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 466 { 467 struct spdk_filesystem *fs; 468 struct spdk_fs_request *req; 469 struct spdk_fs_cb_args *args; 470 struct spdk_bs_opts opts = {}; 471 472 fs = fs_alloc(dev, send_request_fn); 473 if (fs == NULL) { 474 cb_fn(cb_arg, NULL, -ENOMEM); 475 return; 476 } 477 478 fs_conf_parse(); 479 480 req = alloc_fs_request(fs->md_target.md_fs_channel); 481 if (req == NULL) { 482 spdk_put_io_channel(fs->md_target.md_io_channel); 483 spdk_io_device_unregister(&fs->md_target, NULL); 484 spdk_put_io_channel(fs->sync_target.sync_io_channel); 485 spdk_io_device_unregister(&fs->sync_target, NULL); 486 spdk_io_device_unregister(&fs->io_target, NULL); 487 free(fs); 488 cb_fn(cb_arg, NULL, -ENOMEM); 489 return; 490 } 491 492 args = &req->args; 493 args->fn.fs_op_with_handle = cb_fn; 494 args->arg = cb_arg; 495 args->fs = fs; 496 497 spdk_bs_opts_init(&opts); 498 strncpy(opts.bstype.bstype, "BLOBFS", SPDK_BLOBSTORE_TYPE_LENGTH); 499 if (opt) { 500 opts.cluster_sz = opt->cluster_sz; 501 } 502 spdk_bs_init(dev, &opts, init_cb, req); 503 } 504 505 static struct spdk_file * 506 file_alloc(struct spdk_filesystem *fs) 507 { 508 struct spdk_file *file; 509 510 file = calloc(1, sizeof(*file)); 511 if (file == NULL) { 512 return NULL; 513 } 514 515 file->tree = calloc(1, sizeof(*file->tree)); 516 if (file->tree == NULL) { 517 free(file); 518 return NULL; 519 } 520 521 file->fs = fs; 522 TAILQ_INIT(&file->open_requests); 523 TAILQ_INIT(&file->sync_requests); 524 pthread_spin_init(&file->lock, 0); 525 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 526 file->priority = SPDK_FILE_PRIORITY_LOW; 527 return file; 528 } 529 530 static void iter_delete_cb(void *ctx, int bserrno); 531 532 static int 533 _handle_deleted_files(struct spdk_fs_request *req) 534 { 535 struct spdk_fs_cb_args *args = &req->args; 536 struct spdk_filesystem *fs = args->fs; 537 538 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 539 struct spdk_deleted_file *deleted_file; 540 541 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 542 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 543 spdk_bs_delete_blob(fs->bs, deleted_file->id, iter_delete_cb, req); 544 free(deleted_file); 545 return 0; 546 } 547 548 return 1; 549 } 550 551 static void 552 iter_delete_cb(void *ctx, int bserrno) 553 { 554 struct spdk_fs_request *req = ctx; 555 struct spdk_fs_cb_args *args = &req->args; 556 struct spdk_filesystem *fs = args->fs; 557 558 if (_handle_deleted_files(req) == 0) { 559 return; 560 } 561 562 args->fn.fs_op_with_handle(args->arg, fs, 0); 563 free_fs_request(req); 564 565 } 566 567 static void 568 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 569 { 570 struct spdk_fs_request *req = ctx; 571 struct spdk_fs_cb_args *args = &req->args; 572 struct spdk_filesystem *fs = args->fs; 573 uint64_t *length; 574 const char *name; 575 uint32_t *is_deleted; 576 size_t value_len; 577 578 if (rc < 0) { 579 args->fn.fs_op_with_handle(args->arg, fs, rc); 580 free_fs_request(req); 581 return; 582 } 583 584 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 585 if (rc < 0) { 586 args->fn.fs_op_with_handle(args->arg, fs, rc); 587 free_fs_request(req); 588 return; 589 } 590 591 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 592 if (rc < 0) { 593 args->fn.fs_op_with_handle(args->arg, fs, rc); 594 free_fs_request(req); 595 return; 596 } 597 598 assert(value_len == 8); 599 600 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 601 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 602 if (rc < 0) { 603 struct spdk_file *f; 604 605 f = file_alloc(fs); 606 if (f == NULL) { 607 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 608 free_fs_request(req); 609 return; 610 } 611 612 f->name = strdup(name); 613 f->blobid = spdk_blob_get_id(blob); 614 f->length = *length; 615 f->length_flushed = *length; 616 f->append_pos = *length; 617 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 618 } else { 619 struct spdk_deleted_file *deleted_file; 620 621 deleted_file = calloc(1, sizeof(*deleted_file)); 622 if (deleted_file == NULL) { 623 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 624 free_fs_request(req); 625 return; 626 } 627 deleted_file->id = spdk_blob_get_id(blob); 628 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 629 } 630 } 631 632 static void 633 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 634 { 635 struct spdk_fs_request *req = ctx; 636 struct spdk_fs_cb_args *args = &req->args; 637 struct spdk_filesystem *fs = args->fs; 638 struct spdk_bs_type bstype; 639 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 640 static const struct spdk_bs_type zeros; 641 642 if (bserrno != 0) { 643 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 644 free_fs_request(req); 645 free(fs); 646 return; 647 } 648 649 bstype = spdk_bs_get_bstype(bs); 650 651 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 652 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 653 spdk_bs_set_bstype(bs, blobfs_type); 654 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 655 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 656 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 657 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 658 free_fs_request(req); 659 free(fs); 660 return; 661 } 662 663 common_fs_bs_init(fs, bs); 664 if (_handle_deleted_files(req) == 0) { 665 return; 666 } 667 args->fn.fs_op_with_handle(args->arg, fs, 0); 668 free_fs_request(req); 669 } 670 671 void 672 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 673 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 674 { 675 struct spdk_filesystem *fs; 676 struct spdk_fs_cb_args *args; 677 struct spdk_fs_request *req; 678 struct spdk_bs_opts bs_opts; 679 680 fs = fs_alloc(dev, send_request_fn); 681 if (fs == NULL) { 682 cb_fn(cb_arg, NULL, -ENOMEM); 683 return; 684 } 685 686 fs_conf_parse(); 687 688 req = alloc_fs_request(fs->md_target.md_fs_channel); 689 if (req == NULL) { 690 spdk_put_io_channel(fs->md_target.md_io_channel); 691 spdk_io_device_unregister(&fs->md_target, NULL); 692 spdk_put_io_channel(fs->sync_target.sync_io_channel); 693 spdk_io_device_unregister(&fs->sync_target, NULL); 694 spdk_io_device_unregister(&fs->io_target, NULL); 695 free(fs); 696 cb_fn(cb_arg, NULL, -ENOMEM); 697 return; 698 } 699 700 args = &req->args; 701 args->fn.fs_op_with_handle = cb_fn; 702 args->arg = cb_arg; 703 args->fs = fs; 704 TAILQ_INIT(&args->op.fs_load.deleted_files); 705 spdk_bs_opts_init(&bs_opts); 706 bs_opts.iter_cb_fn = iter_cb; 707 bs_opts.iter_cb_arg = req; 708 spdk_bs_load(dev, &bs_opts, load_cb, req); 709 } 710 711 static void 712 unload_cb(void *ctx, int bserrno) 713 { 714 struct spdk_fs_request *req = ctx; 715 struct spdk_fs_cb_args *args = &req->args; 716 struct spdk_filesystem *fs = args->fs; 717 718 pthread_mutex_lock(&g_cache_init_lock); 719 g_fs_count--; 720 if (g_fs_count == 0) { 721 __free_cache(); 722 } 723 pthread_mutex_unlock(&g_cache_init_lock); 724 725 args->fn.fs_op(args->arg, bserrno); 726 free(req); 727 728 spdk_io_device_unregister(&fs->io_target, NULL); 729 spdk_io_device_unregister(&fs->sync_target, NULL); 730 spdk_io_device_unregister(&fs->md_target, NULL); 731 732 free(fs); 733 } 734 735 void 736 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 737 { 738 struct spdk_fs_request *req; 739 struct spdk_fs_cb_args *args; 740 741 /* 742 * We must free the md_channel before unloading the blobstore, so just 743 * allocate this request from the general heap. 744 */ 745 req = calloc(1, sizeof(*req)); 746 if (req == NULL) { 747 cb_fn(cb_arg, -ENOMEM); 748 return; 749 } 750 751 args = &req->args; 752 args->fn.fs_op = cb_fn; 753 args->arg = cb_arg; 754 args->fs = fs; 755 756 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 757 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 758 spdk_bs_unload(fs->bs, unload_cb, req); 759 } 760 761 static struct spdk_file * 762 fs_find_file(struct spdk_filesystem *fs, const char *name) 763 { 764 struct spdk_file *file; 765 766 TAILQ_FOREACH(file, &fs->files, tailq) { 767 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 768 return file; 769 } 770 } 771 772 return NULL; 773 } 774 775 void 776 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 777 spdk_file_stat_op_complete cb_fn, void *cb_arg) 778 { 779 struct spdk_file_stat stat; 780 struct spdk_file *f = NULL; 781 782 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 783 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 784 return; 785 } 786 787 f = fs_find_file(fs, name); 788 if (f != NULL) { 789 stat.blobid = f->blobid; 790 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 791 cb_fn(cb_arg, &stat, 0); 792 return; 793 } 794 795 cb_fn(cb_arg, NULL, -ENOENT); 796 } 797 798 static void 799 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 800 { 801 struct spdk_fs_request *req = arg; 802 struct spdk_fs_cb_args *args = &req->args; 803 804 args->rc = fserrno; 805 if (fserrno == 0) { 806 memcpy(args->arg, stat, sizeof(*stat)); 807 } 808 sem_post(args->sem); 809 } 810 811 static void 812 __file_stat(void *arg) 813 { 814 struct spdk_fs_request *req = arg; 815 struct spdk_fs_cb_args *args = &req->args; 816 817 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 818 args->fn.stat_op, req); 819 } 820 821 int 822 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 823 const char *name, struct spdk_file_stat *stat) 824 { 825 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 826 struct spdk_fs_request *req; 827 int rc; 828 829 req = alloc_fs_request(channel); 830 assert(req != NULL); 831 832 req->args.fs = fs; 833 req->args.op.stat.name = name; 834 req->args.fn.stat_op = __copy_stat; 835 req->args.arg = stat; 836 req->args.sem = &channel->sem; 837 channel->send_request(__file_stat, req); 838 sem_wait(&channel->sem); 839 840 rc = req->args.rc; 841 free_fs_request(req); 842 843 return rc; 844 } 845 846 static void 847 fs_create_blob_close_cb(void *ctx, int bserrno) 848 { 849 struct spdk_fs_request *req = ctx; 850 struct spdk_fs_cb_args *args = &req->args; 851 852 args->fn.file_op(args->arg, bserrno); 853 free_fs_request(req); 854 } 855 856 static void 857 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 858 { 859 struct spdk_fs_request *req = ctx; 860 struct spdk_fs_cb_args *args = &req->args; 861 struct spdk_file *f = args->file; 862 uint64_t length = 0; 863 864 spdk_blob_resize(blob, 1); 865 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 866 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 867 868 spdk_blob_close(blob, fs_create_blob_close_cb, args); 869 } 870 871 static void 872 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 873 { 874 struct spdk_fs_request *req = ctx; 875 struct spdk_fs_cb_args *args = &req->args; 876 struct spdk_file *f = args->file; 877 878 f->blobid = blobid; 879 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 880 } 881 882 void 883 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 884 spdk_file_op_complete cb_fn, void *cb_arg) 885 { 886 struct spdk_file *file; 887 struct spdk_fs_request *req; 888 struct spdk_fs_cb_args *args; 889 890 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 891 cb_fn(cb_arg, -ENAMETOOLONG); 892 return; 893 } 894 895 file = fs_find_file(fs, name); 896 if (file != NULL) { 897 cb_fn(cb_arg, -EEXIST); 898 return; 899 } 900 901 file = file_alloc(fs); 902 if (file == NULL) { 903 cb_fn(cb_arg, -ENOMEM); 904 return; 905 } 906 907 req = alloc_fs_request(fs->md_target.md_fs_channel); 908 if (req == NULL) { 909 cb_fn(cb_arg, -ENOMEM); 910 return; 911 } 912 913 args = &req->args; 914 args->file = file; 915 args->fn.file_op = cb_fn; 916 args->arg = cb_arg; 917 918 file->name = strdup(name); 919 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 920 } 921 922 static void 923 __fs_create_file_done(void *arg, int fserrno) 924 { 925 struct spdk_fs_request *req = arg; 926 struct spdk_fs_cb_args *args = &req->args; 927 928 args->rc = fserrno; 929 sem_post(args->sem); 930 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 931 } 932 933 static void 934 __fs_create_file(void *arg) 935 { 936 struct spdk_fs_request *req = arg; 937 struct spdk_fs_cb_args *args = &req->args; 938 939 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 940 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 941 } 942 943 int 944 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 945 { 946 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 947 struct spdk_fs_request *req; 948 struct spdk_fs_cb_args *args; 949 int rc; 950 951 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 952 953 req = alloc_fs_request(channel); 954 assert(req != NULL); 955 956 args = &req->args; 957 args->fs = fs; 958 args->op.create.name = name; 959 args->sem = &channel->sem; 960 fs->send_request(__fs_create_file, req); 961 sem_wait(&channel->sem); 962 rc = args->rc; 963 free_fs_request(req); 964 965 return rc; 966 } 967 968 static void 969 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 970 { 971 struct spdk_fs_request *req = ctx; 972 struct spdk_fs_cb_args *args = &req->args; 973 struct spdk_file *f = args->file; 974 975 f->blob = blob; 976 while (!TAILQ_EMPTY(&f->open_requests)) { 977 req = TAILQ_FIRST(&f->open_requests); 978 args = &req->args; 979 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 980 args->fn.file_op_with_handle(args->arg, f, bserrno); 981 free_fs_request(req); 982 } 983 } 984 985 static void 986 fs_open_blob_create_cb(void *ctx, int bserrno) 987 { 988 struct spdk_fs_request *req = ctx; 989 struct spdk_fs_cb_args *args = &req->args; 990 struct spdk_file *file = args->file; 991 struct spdk_filesystem *fs = args->fs; 992 993 if (file == NULL) { 994 /* 995 * This is from an open with CREATE flag - the file 996 * is now created so look it up in the file list for this 997 * filesystem. 998 */ 999 file = fs_find_file(fs, args->op.open.name); 1000 assert(file != NULL); 1001 args->file = file; 1002 } 1003 1004 file->ref_count++; 1005 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1006 if (file->ref_count == 1) { 1007 assert(file->blob == NULL); 1008 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1009 } else if (file->blob != NULL) { 1010 fs_open_blob_done(req, file->blob, 0); 1011 } else { 1012 /* 1013 * The blob open for this file is in progress due to a previous 1014 * open request. When that open completes, it will invoke the 1015 * open callback for this request. 1016 */ 1017 } 1018 } 1019 1020 void 1021 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1022 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1023 { 1024 struct spdk_file *f = NULL; 1025 struct spdk_fs_request *req; 1026 struct spdk_fs_cb_args *args; 1027 1028 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1029 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1030 return; 1031 } 1032 1033 f = fs_find_file(fs, name); 1034 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1035 cb_fn(cb_arg, NULL, -ENOENT); 1036 return; 1037 } 1038 1039 if (f != NULL && f->is_deleted == true) { 1040 cb_fn(cb_arg, NULL, -ENOENT); 1041 return; 1042 } 1043 1044 req = alloc_fs_request(fs->md_target.md_fs_channel); 1045 if (req == NULL) { 1046 cb_fn(cb_arg, NULL, -ENOMEM); 1047 return; 1048 } 1049 1050 args = &req->args; 1051 args->fn.file_op_with_handle = cb_fn; 1052 args->arg = cb_arg; 1053 args->file = f; 1054 args->fs = fs; 1055 args->op.open.name = name; 1056 1057 if (f == NULL) { 1058 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1059 } else { 1060 fs_open_blob_create_cb(req, 0); 1061 } 1062 } 1063 1064 static void 1065 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1066 { 1067 struct spdk_fs_request *req = arg; 1068 struct spdk_fs_cb_args *args = &req->args; 1069 1070 args->file = file; 1071 args->rc = bserrno; 1072 sem_post(args->sem); 1073 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1074 } 1075 1076 static void 1077 __fs_open_file(void *arg) 1078 { 1079 struct spdk_fs_request *req = arg; 1080 struct spdk_fs_cb_args *args = &req->args; 1081 1082 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1083 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1084 __fs_open_file_done, req); 1085 } 1086 1087 int 1088 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1089 const char *name, uint32_t flags, struct spdk_file **file) 1090 { 1091 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1092 struct spdk_fs_request *req; 1093 struct spdk_fs_cb_args *args; 1094 int rc; 1095 1096 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1097 1098 req = alloc_fs_request(channel); 1099 assert(req != NULL); 1100 1101 args = &req->args; 1102 args->fs = fs; 1103 args->op.open.name = name; 1104 args->op.open.flags = flags; 1105 args->sem = &channel->sem; 1106 fs->send_request(__fs_open_file, req); 1107 sem_wait(&channel->sem); 1108 rc = args->rc; 1109 if (rc == 0) { 1110 *file = args->file; 1111 } else { 1112 *file = NULL; 1113 } 1114 free_fs_request(req); 1115 1116 return rc; 1117 } 1118 1119 static void 1120 fs_rename_blob_close_cb(void *ctx, int bserrno) 1121 { 1122 struct spdk_fs_request *req = ctx; 1123 struct spdk_fs_cb_args *args = &req->args; 1124 1125 args->fn.fs_op(args->arg, bserrno); 1126 free_fs_request(req); 1127 } 1128 1129 static void 1130 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1131 { 1132 struct spdk_fs_request *req = ctx; 1133 struct spdk_fs_cb_args *args = &req->args; 1134 const char *new_name = args->op.rename.new_name; 1135 1136 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1137 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1138 } 1139 1140 static void 1141 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1142 { 1143 struct spdk_fs_cb_args *args = &req->args; 1144 struct spdk_file *f; 1145 1146 f = fs_find_file(args->fs, args->op.rename.old_name); 1147 if (f == NULL) { 1148 args->fn.fs_op(args->arg, -ENOENT); 1149 free_fs_request(req); 1150 return; 1151 } 1152 1153 free(f->name); 1154 f->name = strdup(args->op.rename.new_name); 1155 args->file = f; 1156 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1157 } 1158 1159 static void 1160 fs_rename_delete_done(void *arg, int fserrno) 1161 { 1162 __spdk_fs_md_rename_file(arg); 1163 } 1164 1165 void 1166 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1167 const char *old_name, const char *new_name, 1168 spdk_file_op_complete cb_fn, void *cb_arg) 1169 { 1170 struct spdk_file *f; 1171 struct spdk_fs_request *req; 1172 struct spdk_fs_cb_args *args; 1173 1174 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1175 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1176 cb_fn(cb_arg, -ENAMETOOLONG); 1177 return; 1178 } 1179 1180 req = alloc_fs_request(fs->md_target.md_fs_channel); 1181 if (req == NULL) { 1182 cb_fn(cb_arg, -ENOMEM); 1183 return; 1184 } 1185 1186 args = &req->args; 1187 args->fn.fs_op = cb_fn; 1188 args->fs = fs; 1189 args->arg = cb_arg; 1190 args->op.rename.old_name = old_name; 1191 args->op.rename.new_name = new_name; 1192 1193 f = fs_find_file(fs, new_name); 1194 if (f == NULL) { 1195 __spdk_fs_md_rename_file(req); 1196 return; 1197 } 1198 1199 /* 1200 * The rename overwrites an existing file. So delete the existing file, then 1201 * do the actual rename. 1202 */ 1203 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1204 } 1205 1206 static void 1207 __fs_rename_file_done(void *arg, int fserrno) 1208 { 1209 struct spdk_fs_request *req = arg; 1210 struct spdk_fs_cb_args *args = &req->args; 1211 1212 args->rc = fserrno; 1213 sem_post(args->sem); 1214 } 1215 1216 static void 1217 __fs_rename_file(void *arg) 1218 { 1219 struct spdk_fs_request *req = arg; 1220 struct spdk_fs_cb_args *args = &req->args; 1221 1222 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1223 __fs_rename_file_done, req); 1224 } 1225 1226 int 1227 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1228 const char *old_name, const char *new_name) 1229 { 1230 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1231 struct spdk_fs_request *req; 1232 struct spdk_fs_cb_args *args; 1233 int rc; 1234 1235 req = alloc_fs_request(channel); 1236 assert(req != NULL); 1237 1238 args = &req->args; 1239 1240 args->fs = fs; 1241 args->op.rename.old_name = old_name; 1242 args->op.rename.new_name = new_name; 1243 args->sem = &channel->sem; 1244 fs->send_request(__fs_rename_file, req); 1245 sem_wait(&channel->sem); 1246 rc = args->rc; 1247 free_fs_request(req); 1248 return rc; 1249 } 1250 1251 static void 1252 blob_delete_cb(void *ctx, int bserrno) 1253 { 1254 struct spdk_fs_request *req = ctx; 1255 struct spdk_fs_cb_args *args = &req->args; 1256 1257 args->fn.file_op(args->arg, bserrno); 1258 free_fs_request(req); 1259 } 1260 1261 void 1262 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1263 spdk_file_op_complete cb_fn, void *cb_arg) 1264 { 1265 struct spdk_file *f; 1266 spdk_blob_id blobid; 1267 struct spdk_fs_request *req; 1268 struct spdk_fs_cb_args *args; 1269 1270 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1271 1272 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1273 cb_fn(cb_arg, -ENAMETOOLONG); 1274 return; 1275 } 1276 1277 f = fs_find_file(fs, name); 1278 if (f == NULL) { 1279 cb_fn(cb_arg, -ENOENT); 1280 return; 1281 } 1282 1283 req = alloc_fs_request(fs->md_target.md_fs_channel); 1284 if (req == NULL) { 1285 cb_fn(cb_arg, -ENOMEM); 1286 return; 1287 } 1288 1289 args = &req->args; 1290 args->fn.file_op = cb_fn; 1291 args->arg = cb_arg; 1292 1293 if (f->ref_count > 0) { 1294 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1295 f->is_deleted = true; 1296 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1297 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1298 return; 1299 } 1300 1301 TAILQ_REMOVE(&fs->files, f, tailq); 1302 1303 cache_free_buffers(f); 1304 1305 blobid = f->blobid; 1306 1307 free(f->name); 1308 free(f->tree); 1309 free(f); 1310 1311 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1312 } 1313 1314 static void 1315 __fs_delete_file_done(void *arg, int fserrno) 1316 { 1317 struct spdk_fs_request *req = arg; 1318 struct spdk_fs_cb_args *args = &req->args; 1319 1320 args->rc = fserrno; 1321 sem_post(args->sem); 1322 } 1323 1324 static void 1325 __fs_delete_file(void *arg) 1326 { 1327 struct spdk_fs_request *req = arg; 1328 struct spdk_fs_cb_args *args = &req->args; 1329 1330 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1331 } 1332 1333 int 1334 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1335 const char *name) 1336 { 1337 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1338 struct spdk_fs_request *req; 1339 struct spdk_fs_cb_args *args; 1340 int rc; 1341 1342 req = alloc_fs_request(channel); 1343 assert(req != NULL); 1344 1345 args = &req->args; 1346 args->fs = fs; 1347 args->op.delete.name = name; 1348 args->sem = &channel->sem; 1349 fs->send_request(__fs_delete_file, req); 1350 sem_wait(&channel->sem); 1351 rc = args->rc; 1352 free_fs_request(req); 1353 1354 return rc; 1355 } 1356 1357 spdk_fs_iter 1358 spdk_fs_iter_first(struct spdk_filesystem *fs) 1359 { 1360 struct spdk_file *f; 1361 1362 f = TAILQ_FIRST(&fs->files); 1363 return f; 1364 } 1365 1366 spdk_fs_iter 1367 spdk_fs_iter_next(spdk_fs_iter iter) 1368 { 1369 struct spdk_file *f = iter; 1370 1371 if (f == NULL) { 1372 return NULL; 1373 } 1374 1375 f = TAILQ_NEXT(f, tailq); 1376 return f; 1377 } 1378 1379 const char * 1380 spdk_file_get_name(struct spdk_file *file) 1381 { 1382 return file->name; 1383 } 1384 1385 uint64_t 1386 spdk_file_get_length(struct spdk_file *file) 1387 { 1388 assert(file != NULL); 1389 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1390 return file->length; 1391 } 1392 1393 static void 1394 fs_truncate_complete_cb(void *ctx, int bserrno) 1395 { 1396 struct spdk_fs_request *req = ctx; 1397 struct spdk_fs_cb_args *args = &req->args; 1398 1399 args->fn.file_op(args->arg, bserrno); 1400 free_fs_request(req); 1401 } 1402 1403 static uint64_t 1404 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1405 { 1406 return (length + cluster_sz - 1) / cluster_sz; 1407 } 1408 1409 void 1410 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1411 spdk_file_op_complete cb_fn, void *cb_arg) 1412 { 1413 struct spdk_filesystem *fs; 1414 size_t num_clusters; 1415 struct spdk_fs_request *req; 1416 struct spdk_fs_cb_args *args; 1417 1418 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1419 if (length == file->length) { 1420 cb_fn(cb_arg, 0); 1421 return; 1422 } 1423 1424 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1425 if (req == NULL) { 1426 cb_fn(cb_arg, -ENOMEM); 1427 return; 1428 } 1429 1430 args = &req->args; 1431 args->fn.file_op = cb_fn; 1432 args->arg = cb_arg; 1433 args->file = file; 1434 fs = file->fs; 1435 1436 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1437 1438 spdk_blob_resize(file->blob, num_clusters); 1439 spdk_blob_set_xattr(file->blob, "length", &length, sizeof(length)); 1440 1441 file->length = length; 1442 if (file->append_pos > file->length) { 1443 file->append_pos = file->length; 1444 } 1445 1446 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1447 } 1448 1449 static void 1450 __truncate(void *arg) 1451 { 1452 struct spdk_fs_request *req = arg; 1453 struct spdk_fs_cb_args *args = &req->args; 1454 1455 spdk_file_truncate_async(args->file, args->op.truncate.length, 1456 args->fn.file_op, args->arg); 1457 } 1458 1459 void 1460 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1461 uint64_t length) 1462 { 1463 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1464 struct spdk_fs_request *req; 1465 struct spdk_fs_cb_args *args; 1466 1467 req = alloc_fs_request(channel); 1468 assert(req != NULL); 1469 1470 args = &req->args; 1471 1472 args->file = file; 1473 args->op.truncate.length = length; 1474 args->fn.file_op = __sem_post; 1475 args->arg = &channel->sem; 1476 1477 channel->send_request(__truncate, req); 1478 sem_wait(&channel->sem); 1479 free_fs_request(req); 1480 } 1481 1482 static void 1483 __rw_done(void *ctx, int bserrno) 1484 { 1485 struct spdk_fs_request *req = ctx; 1486 struct spdk_fs_cb_args *args = &req->args; 1487 1488 spdk_dma_free(args->op.rw.pin_buf); 1489 args->fn.file_op(args->arg, bserrno); 1490 free_fs_request(req); 1491 } 1492 1493 static void 1494 __read_done(void *ctx, int bserrno) 1495 { 1496 struct spdk_fs_request *req = ctx; 1497 struct spdk_fs_cb_args *args = &req->args; 1498 1499 if (args->op.rw.is_read) { 1500 memcpy(args->op.rw.user_buf, 1501 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1502 args->op.rw.length); 1503 __rw_done(req, 0); 1504 } else { 1505 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1506 args->op.rw.user_buf, 1507 args->op.rw.length); 1508 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1509 args->op.rw.pin_buf, 1510 args->op.rw.start_page, args->op.rw.num_pages, 1511 __rw_done, req); 1512 } 1513 } 1514 1515 static void 1516 __do_blob_read(void *ctx, int fserrno) 1517 { 1518 struct spdk_fs_request *req = ctx; 1519 struct spdk_fs_cb_args *args = &req->args; 1520 1521 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1522 args->op.rw.pin_buf, 1523 args->op.rw.start_page, args->op.rw.num_pages, 1524 __read_done, req); 1525 } 1526 1527 static void 1528 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1529 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1530 { 1531 uint64_t end_page; 1532 1533 *page_size = spdk_bs_get_page_size(file->fs->bs); 1534 *start_page = offset / *page_size; 1535 end_page = (offset + length - 1) / *page_size; 1536 *num_pages = (end_page - *start_page + 1); 1537 } 1538 1539 static void 1540 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1541 void *payload, uint64_t offset, uint64_t length, 1542 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1543 { 1544 struct spdk_fs_request *req; 1545 struct spdk_fs_cb_args *args; 1546 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1547 uint64_t start_page, num_pages, pin_buf_length; 1548 uint32_t page_size; 1549 1550 if (is_read && offset + length > file->length) { 1551 cb_fn(cb_arg, -EINVAL); 1552 return; 1553 } 1554 1555 req = alloc_fs_request(channel); 1556 if (req == NULL) { 1557 cb_fn(cb_arg, -ENOMEM); 1558 return; 1559 } 1560 1561 args = &req->args; 1562 args->fn.file_op = cb_fn; 1563 args->arg = cb_arg; 1564 args->file = file; 1565 args->op.rw.channel = channel->bs_channel; 1566 args->op.rw.user_buf = payload; 1567 args->op.rw.is_read = is_read; 1568 args->op.rw.offset = offset; 1569 args->op.rw.length = length; 1570 1571 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1572 pin_buf_length = num_pages * page_size; 1573 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1574 1575 args->op.rw.start_page = start_page; 1576 args->op.rw.num_pages = num_pages; 1577 1578 if (!is_read && file->length < offset + length) { 1579 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1580 } else { 1581 __do_blob_read(req, 0); 1582 } 1583 } 1584 1585 void 1586 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1587 void *payload, uint64_t offset, uint64_t length, 1588 spdk_file_op_complete cb_fn, void *cb_arg) 1589 { 1590 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1591 } 1592 1593 void 1594 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1595 void *payload, uint64_t offset, uint64_t length, 1596 spdk_file_op_complete cb_fn, void *cb_arg) 1597 { 1598 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1599 file->name, offset, length); 1600 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1601 } 1602 1603 struct spdk_io_channel * 1604 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1605 { 1606 struct spdk_io_channel *io_channel; 1607 struct spdk_fs_channel *fs_channel; 1608 1609 io_channel = spdk_get_io_channel(&fs->io_target); 1610 fs_channel = spdk_io_channel_get_ctx(io_channel); 1611 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1612 fs_channel->send_request = __send_request_direct; 1613 1614 return io_channel; 1615 } 1616 1617 struct spdk_io_channel * 1618 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1619 { 1620 struct spdk_io_channel *io_channel; 1621 struct spdk_fs_channel *fs_channel; 1622 1623 io_channel = spdk_get_io_channel(&fs->io_target); 1624 fs_channel = spdk_io_channel_get_ctx(io_channel); 1625 fs_channel->send_request = fs->send_request; 1626 fs_channel->sync = 1; 1627 pthread_spin_init(&fs_channel->lock, 0); 1628 1629 return io_channel; 1630 } 1631 1632 void 1633 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1634 { 1635 spdk_put_io_channel(channel); 1636 } 1637 1638 void 1639 spdk_fs_set_cache_size(uint64_t size_in_mb) 1640 { 1641 g_fs_cache_size = size_in_mb * 1024 * 1024; 1642 } 1643 1644 uint64_t 1645 spdk_fs_get_cache_size(void) 1646 { 1647 return g_fs_cache_size / (1024 * 1024); 1648 } 1649 1650 static void __file_flush(void *_args); 1651 1652 static void * 1653 alloc_cache_memory_buffer(struct spdk_file *context) 1654 { 1655 struct spdk_file *file; 1656 void *buf; 1657 1658 buf = spdk_mempool_get(g_cache_pool); 1659 if (buf != NULL) { 1660 return buf; 1661 } 1662 1663 pthread_spin_lock(&g_caches_lock); 1664 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1665 if (!file->open_for_writing && 1666 file->priority == SPDK_FILE_PRIORITY_LOW && 1667 file != context) { 1668 break; 1669 } 1670 } 1671 pthread_spin_unlock(&g_caches_lock); 1672 if (file != NULL) { 1673 cache_free_buffers(file); 1674 buf = spdk_mempool_get(g_cache_pool); 1675 if (buf != NULL) { 1676 return buf; 1677 } 1678 } 1679 1680 pthread_spin_lock(&g_caches_lock); 1681 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1682 if (!file->open_for_writing && file != context) { 1683 break; 1684 } 1685 } 1686 pthread_spin_unlock(&g_caches_lock); 1687 if (file != NULL) { 1688 cache_free_buffers(file); 1689 buf = spdk_mempool_get(g_cache_pool); 1690 if (buf != NULL) { 1691 return buf; 1692 } 1693 } 1694 1695 pthread_spin_lock(&g_caches_lock); 1696 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1697 if (file != context) { 1698 break; 1699 } 1700 } 1701 pthread_spin_unlock(&g_caches_lock); 1702 if (file != NULL) { 1703 cache_free_buffers(file); 1704 buf = spdk_mempool_get(g_cache_pool); 1705 if (buf != NULL) { 1706 return buf; 1707 } 1708 } 1709 1710 return NULL; 1711 } 1712 1713 static struct cache_buffer * 1714 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1715 { 1716 struct cache_buffer *buf; 1717 int count = 0; 1718 1719 buf = calloc(1, sizeof(*buf)); 1720 if (buf == NULL) { 1721 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1722 return NULL; 1723 } 1724 1725 buf->buf = alloc_cache_memory_buffer(file); 1726 while (buf->buf == NULL) { 1727 /* 1728 * TODO: alloc_cache_memory_buffer() should eventually free 1729 * some buffers. Need a more sophisticated check here, instead 1730 * of just bailing if 100 tries does not result in getting a 1731 * free buffer. This will involve using the sync channel's 1732 * semaphore to block until a buffer becomes available. 1733 */ 1734 if (count++ == 100) { 1735 SPDK_ERRLOG("could not allocate cache buffer\n"); 1736 assert(false); 1737 free(buf); 1738 return NULL; 1739 } 1740 buf->buf = alloc_cache_memory_buffer(file); 1741 } 1742 1743 buf->buf_size = CACHE_BUFFER_SIZE; 1744 buf->offset = offset; 1745 1746 pthread_spin_lock(&g_caches_lock); 1747 if (file->tree->present_mask == 0) { 1748 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1749 } 1750 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1751 pthread_spin_unlock(&g_caches_lock); 1752 1753 return buf; 1754 } 1755 1756 static struct cache_buffer * 1757 cache_append_buffer(struct spdk_file *file) 1758 { 1759 struct cache_buffer *last; 1760 1761 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1762 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1763 1764 last = cache_insert_buffer(file, file->append_pos); 1765 if (last == NULL) { 1766 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1767 return NULL; 1768 } 1769 1770 file->last = last; 1771 1772 return last; 1773 } 1774 1775 static void 1776 __wake_caller(struct spdk_fs_cb_args *args) 1777 { 1778 sem_post(args->sem); 1779 } 1780 1781 static void __check_sync_reqs(struct spdk_file *file); 1782 1783 static void 1784 __file_cache_finish_sync(struct spdk_file *file) 1785 { 1786 struct spdk_fs_request *sync_req; 1787 struct spdk_fs_cb_args *sync_args; 1788 1789 pthread_spin_lock(&file->lock); 1790 sync_req = TAILQ_FIRST(&file->sync_requests); 1791 sync_args = &sync_req->args; 1792 assert(sync_args->op.sync.offset <= file->length_flushed); 1793 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1794 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1795 pthread_spin_unlock(&file->lock); 1796 1797 sync_args->fn.file_op(sync_args->arg, 0); 1798 __check_sync_reqs(file); 1799 1800 pthread_spin_lock(&file->lock); 1801 free_fs_request(sync_req); 1802 pthread_spin_unlock(&file->lock); 1803 } 1804 1805 static void 1806 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1807 { 1808 struct spdk_file *file = ctx; 1809 1810 __file_cache_finish_sync(file); 1811 } 1812 1813 static void 1814 __free_args(struct spdk_fs_cb_args *args) 1815 { 1816 struct spdk_fs_request *req; 1817 1818 if (!args->from_request) { 1819 free(args); 1820 } else { 1821 /* Depends on args being at the start of the spdk_fs_request structure. */ 1822 req = (struct spdk_fs_request *)args; 1823 free_fs_request(req); 1824 } 1825 } 1826 1827 static void 1828 __check_sync_reqs(struct spdk_file *file) 1829 { 1830 struct spdk_fs_request *sync_req; 1831 1832 pthread_spin_lock(&file->lock); 1833 1834 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1835 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1836 break; 1837 } 1838 } 1839 1840 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1841 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1842 sync_req->args.op.sync.xattr_in_progress = true; 1843 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1844 sizeof(file->length_flushed)); 1845 1846 pthread_spin_unlock(&file->lock); 1847 spdk_blob_sync_md(file->blob, __file_cache_finish_sync_bs_cb, file); 1848 } else { 1849 pthread_spin_unlock(&file->lock); 1850 } 1851 } 1852 1853 static void 1854 __file_flush_done(void *arg, int bserrno) 1855 { 1856 struct spdk_fs_cb_args *args = arg; 1857 struct spdk_file *file = args->file; 1858 struct cache_buffer *next = args->op.flush.cache_buffer; 1859 1860 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1861 1862 pthread_spin_lock(&file->lock); 1863 next->in_progress = false; 1864 next->bytes_flushed += args->op.flush.length; 1865 file->length_flushed += args->op.flush.length; 1866 if (file->length_flushed > file->length) { 1867 file->length = file->length_flushed; 1868 } 1869 if (next->bytes_flushed == next->buf_size) { 1870 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1871 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1872 } 1873 1874 /* 1875 * Assert that there is no cached data that extends past the end of the underlying 1876 * blob. 1877 */ 1878 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1879 next->bytes_filled == 0); 1880 1881 pthread_spin_unlock(&file->lock); 1882 1883 __check_sync_reqs(file); 1884 1885 __file_flush(args); 1886 } 1887 1888 static void 1889 __file_flush(void *_args) 1890 { 1891 struct spdk_fs_cb_args *args = _args; 1892 struct spdk_file *file = args->file; 1893 struct cache_buffer *next; 1894 uint64_t offset, length, start_page, num_pages; 1895 uint32_t page_size; 1896 1897 pthread_spin_lock(&file->lock); 1898 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1899 if (next == NULL || next->in_progress) { 1900 /* 1901 * There is either no data to flush, or a flush I/O is already in 1902 * progress. So return immediately - if a flush I/O is in 1903 * progress we will flush more data after that is completed. 1904 */ 1905 __free_args(args); 1906 pthread_spin_unlock(&file->lock); 1907 return; 1908 } 1909 1910 offset = next->offset + next->bytes_flushed; 1911 length = next->bytes_filled - next->bytes_flushed; 1912 if (length == 0) { 1913 __free_args(args); 1914 pthread_spin_unlock(&file->lock); 1915 return; 1916 } 1917 args->op.flush.length = length; 1918 args->op.flush.cache_buffer = next; 1919 1920 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1921 1922 next->in_progress = true; 1923 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1924 offset, length, start_page, num_pages); 1925 pthread_spin_unlock(&file->lock); 1926 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1927 next->buf + (start_page * page_size) - next->offset, 1928 start_page, num_pages, __file_flush_done, args); 1929 } 1930 1931 static void 1932 __file_extend_done(void *arg, int bserrno) 1933 { 1934 struct spdk_fs_cb_args *args = arg; 1935 1936 __wake_caller(args); 1937 } 1938 1939 static void 1940 __file_extend_blob(void *_args) 1941 { 1942 struct spdk_fs_cb_args *args = _args; 1943 struct spdk_file *file = args->file; 1944 1945 spdk_blob_resize(file->blob, args->op.resize.num_clusters); 1946 1947 spdk_blob_sync_md(file->blob, __file_extend_done, args); 1948 } 1949 1950 static void 1951 __rw_from_file_done(void *arg, int bserrno) 1952 { 1953 struct spdk_fs_cb_args *args = arg; 1954 1955 __wake_caller(args); 1956 __free_args(args); 1957 } 1958 1959 static void 1960 __rw_from_file(void *_args) 1961 { 1962 struct spdk_fs_cb_args *args = _args; 1963 struct spdk_file *file = args->file; 1964 1965 if (args->op.rw.is_read) { 1966 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1967 args->op.rw.offset, args->op.rw.length, 1968 __rw_from_file_done, args); 1969 } else { 1970 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1971 args->op.rw.offset, args->op.rw.length, 1972 __rw_from_file_done, args); 1973 } 1974 } 1975 1976 static int 1977 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1978 uint64_t offset, uint64_t length, bool is_read) 1979 { 1980 struct spdk_fs_cb_args *args; 1981 1982 args = calloc(1, sizeof(*args)); 1983 if (args == NULL) { 1984 sem_post(sem); 1985 return -ENOMEM; 1986 } 1987 1988 args->file = file; 1989 args->sem = sem; 1990 args->op.rw.user_buf = payload; 1991 args->op.rw.offset = offset; 1992 args->op.rw.length = length; 1993 args->op.rw.is_read = is_read; 1994 file->fs->send_request(__rw_from_file, args); 1995 return 0; 1996 } 1997 1998 int 1999 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 2000 void *payload, uint64_t offset, uint64_t length) 2001 { 2002 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2003 struct spdk_fs_cb_args *args; 2004 uint64_t rem_length, copy, blob_size, cluster_sz; 2005 uint32_t cache_buffers_filled = 0; 2006 uint8_t *cur_payload; 2007 struct cache_buffer *last; 2008 2009 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2010 2011 if (length == 0) { 2012 return 0; 2013 } 2014 2015 if (offset != file->append_pos) { 2016 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2017 return -EINVAL; 2018 } 2019 2020 pthread_spin_lock(&file->lock); 2021 file->open_for_writing = true; 2022 2023 if (file->last == NULL) { 2024 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2025 cache_append_buffer(file); 2026 } else { 2027 int rc; 2028 2029 file->append_pos += length; 2030 pthread_spin_unlock(&file->lock); 2031 rc = __send_rw_from_file(file, &channel->sem, payload, 2032 offset, length, false); 2033 sem_wait(&channel->sem); 2034 return rc; 2035 } 2036 } 2037 2038 blob_size = __file_get_blob_size(file); 2039 2040 if ((offset + length) > blob_size) { 2041 struct spdk_fs_cb_args extend_args = {}; 2042 2043 cluster_sz = file->fs->bs_opts.cluster_sz; 2044 extend_args.sem = &channel->sem; 2045 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2046 extend_args.file = file; 2047 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2048 pthread_spin_unlock(&file->lock); 2049 file->fs->send_request(__file_extend_blob, &extend_args); 2050 sem_wait(&channel->sem); 2051 } 2052 2053 last = file->last; 2054 rem_length = length; 2055 cur_payload = payload; 2056 while (rem_length > 0) { 2057 copy = last->buf_size - last->bytes_filled; 2058 if (copy > rem_length) { 2059 copy = rem_length; 2060 } 2061 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2062 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2063 file->append_pos += copy; 2064 if (file->length < file->append_pos) { 2065 file->length = file->append_pos; 2066 } 2067 cur_payload += copy; 2068 last->bytes_filled += copy; 2069 rem_length -= copy; 2070 if (last->bytes_filled == last->buf_size) { 2071 cache_buffers_filled++; 2072 last = cache_append_buffer(file); 2073 if (last == NULL) { 2074 BLOBFS_TRACE(file, "nomem\n"); 2075 pthread_spin_unlock(&file->lock); 2076 return -ENOMEM; 2077 } 2078 } 2079 } 2080 2081 pthread_spin_unlock(&file->lock); 2082 2083 if (cache_buffers_filled == 0) { 2084 return 0; 2085 } 2086 2087 args = calloc(1, sizeof(*args)); 2088 if (args == NULL) { 2089 return -ENOMEM; 2090 } 2091 2092 args->file = file; 2093 file->fs->send_request(__file_flush, args); 2094 return 0; 2095 } 2096 2097 static void 2098 __readahead_done(void *arg, int bserrno) 2099 { 2100 struct spdk_fs_cb_args *args = arg; 2101 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2102 struct spdk_file *file = args->file; 2103 2104 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2105 2106 pthread_spin_lock(&file->lock); 2107 cache_buffer->bytes_filled = args->op.readahead.length; 2108 cache_buffer->bytes_flushed = args->op.readahead.length; 2109 cache_buffer->in_progress = false; 2110 pthread_spin_unlock(&file->lock); 2111 2112 __free_args(args); 2113 } 2114 2115 static void 2116 __readahead(void *_args) 2117 { 2118 struct spdk_fs_cb_args *args = _args; 2119 struct spdk_file *file = args->file; 2120 uint64_t offset, length, start_page, num_pages; 2121 uint32_t page_size; 2122 2123 offset = args->op.readahead.offset; 2124 length = args->op.readahead.length; 2125 assert(length > 0); 2126 2127 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2128 2129 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2130 offset, length, start_page, num_pages); 2131 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2132 args->op.readahead.cache_buffer->buf, 2133 start_page, num_pages, __readahead_done, args); 2134 } 2135 2136 static uint64_t 2137 __next_cache_buffer_offset(uint64_t offset) 2138 { 2139 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2140 } 2141 2142 static void 2143 check_readahead(struct spdk_file *file, uint64_t offset) 2144 { 2145 struct spdk_fs_cb_args *args; 2146 2147 offset = __next_cache_buffer_offset(offset); 2148 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2149 return; 2150 } 2151 2152 args = calloc(1, sizeof(*args)); 2153 if (args == NULL) { 2154 return; 2155 } 2156 2157 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2158 2159 args->file = file; 2160 args->op.readahead.offset = offset; 2161 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2162 args->op.readahead.cache_buffer->in_progress = true; 2163 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2164 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2165 } else { 2166 args->op.readahead.length = CACHE_BUFFER_SIZE; 2167 } 2168 file->fs->send_request(__readahead, args); 2169 } 2170 2171 static int 2172 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2173 { 2174 struct cache_buffer *buf; 2175 int rc; 2176 2177 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2178 if (buf == NULL) { 2179 pthread_spin_unlock(&file->lock); 2180 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2181 pthread_spin_lock(&file->lock); 2182 return rc; 2183 } 2184 2185 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2186 length = buf->offset + buf->bytes_filled - offset; 2187 } 2188 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2189 memcpy(payload, &buf->buf[offset - buf->offset], length); 2190 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2191 pthread_spin_lock(&g_caches_lock); 2192 spdk_tree_remove_buffer(file->tree, buf); 2193 if (file->tree->present_mask == 0) { 2194 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2195 } 2196 pthread_spin_unlock(&g_caches_lock); 2197 } 2198 2199 sem_post(sem); 2200 return 0; 2201 } 2202 2203 int64_t 2204 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2205 void *payload, uint64_t offset, uint64_t length) 2206 { 2207 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2208 uint64_t final_offset, final_length; 2209 uint32_t sub_reads = 0; 2210 int rc = 0; 2211 2212 pthread_spin_lock(&file->lock); 2213 2214 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2215 2216 file->open_for_writing = false; 2217 2218 if (length == 0 || offset >= file->append_pos) { 2219 pthread_spin_unlock(&file->lock); 2220 return 0; 2221 } 2222 2223 if (offset + length > file->append_pos) { 2224 length = file->append_pos - offset; 2225 } 2226 2227 if (offset != file->next_seq_offset) { 2228 file->seq_byte_count = 0; 2229 } 2230 file->seq_byte_count += length; 2231 file->next_seq_offset = offset + length; 2232 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2233 check_readahead(file, offset); 2234 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2235 } 2236 2237 final_length = 0; 2238 final_offset = offset + length; 2239 while (offset < final_offset) { 2240 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2241 if (length > (final_offset - offset)) { 2242 length = final_offset - offset; 2243 } 2244 rc = __file_read(file, payload, offset, length, &channel->sem); 2245 if (rc == 0) { 2246 final_length += length; 2247 } else { 2248 break; 2249 } 2250 payload += length; 2251 offset += length; 2252 sub_reads++; 2253 } 2254 pthread_spin_unlock(&file->lock); 2255 while (sub_reads-- > 0) { 2256 sem_wait(&channel->sem); 2257 } 2258 if (rc == 0) { 2259 return final_length; 2260 } else { 2261 return rc; 2262 } 2263 } 2264 2265 static void 2266 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2267 spdk_file_op_complete cb_fn, void *cb_arg) 2268 { 2269 struct spdk_fs_request *sync_req; 2270 struct spdk_fs_request *flush_req; 2271 struct spdk_fs_cb_args *sync_args; 2272 struct spdk_fs_cb_args *flush_args; 2273 2274 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2275 2276 pthread_spin_lock(&file->lock); 2277 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2278 BLOBFS_TRACE(file, "done - no data to flush\n"); 2279 pthread_spin_unlock(&file->lock); 2280 cb_fn(cb_arg, 0); 2281 return; 2282 } 2283 2284 sync_req = alloc_fs_request(channel); 2285 assert(sync_req != NULL); 2286 sync_args = &sync_req->args; 2287 2288 flush_req = alloc_fs_request(channel); 2289 assert(flush_req != NULL); 2290 flush_args = &flush_req->args; 2291 2292 sync_args->file = file; 2293 sync_args->fn.file_op = cb_fn; 2294 sync_args->arg = cb_arg; 2295 sync_args->op.sync.offset = file->append_pos; 2296 sync_args->op.sync.xattr_in_progress = false; 2297 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2298 pthread_spin_unlock(&file->lock); 2299 2300 flush_args->file = file; 2301 channel->send_request(__file_flush, flush_args); 2302 } 2303 2304 int 2305 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2306 { 2307 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2308 2309 _file_sync(file, channel, __sem_post, &channel->sem); 2310 sem_wait(&channel->sem); 2311 2312 return 0; 2313 } 2314 2315 void 2316 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2317 spdk_file_op_complete cb_fn, void *cb_arg) 2318 { 2319 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2320 2321 _file_sync(file, channel, cb_fn, cb_arg); 2322 } 2323 2324 void 2325 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2326 { 2327 BLOBFS_TRACE(file, "priority=%u\n", priority); 2328 file->priority = priority; 2329 2330 } 2331 2332 /* 2333 * Close routines 2334 */ 2335 2336 static void 2337 __file_close_async_done(void *ctx, int bserrno) 2338 { 2339 struct spdk_fs_request *req = ctx; 2340 struct spdk_fs_cb_args *args = &req->args; 2341 struct spdk_file *file = args->file; 2342 2343 if (file->is_deleted) { 2344 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2345 return; 2346 } 2347 2348 args->fn.file_op(args->arg, bserrno); 2349 free_fs_request(req); 2350 } 2351 2352 static void 2353 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2354 { 2355 struct spdk_blob *blob; 2356 2357 pthread_spin_lock(&file->lock); 2358 if (file->ref_count == 0) { 2359 pthread_spin_unlock(&file->lock); 2360 __file_close_async_done(req, -EBADF); 2361 return; 2362 } 2363 2364 file->ref_count--; 2365 if (file->ref_count > 0) { 2366 pthread_spin_unlock(&file->lock); 2367 req->args.fn.file_op(req->args.arg, 0); 2368 free_fs_request(req); 2369 return; 2370 } 2371 2372 pthread_spin_unlock(&file->lock); 2373 2374 blob = file->blob; 2375 file->blob = NULL; 2376 spdk_blob_close(blob, __file_close_async_done, req); 2377 } 2378 2379 static void 2380 __file_close_async__sync_done(void *arg, int fserrno) 2381 { 2382 struct spdk_fs_request *req = arg; 2383 struct spdk_fs_cb_args *args = &req->args; 2384 2385 __file_close_async(args->file, req); 2386 } 2387 2388 void 2389 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2390 { 2391 struct spdk_fs_request *req; 2392 struct spdk_fs_cb_args *args; 2393 2394 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2395 if (req == NULL) { 2396 cb_fn(cb_arg, -ENOMEM); 2397 return; 2398 } 2399 2400 args = &req->args; 2401 args->file = file; 2402 args->fn.file_op = cb_fn; 2403 args->arg = cb_arg; 2404 2405 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2406 } 2407 2408 static void 2409 __file_close_done(void *arg, int fserrno) 2410 { 2411 struct spdk_fs_cb_args *args = arg; 2412 2413 args->rc = fserrno; 2414 sem_post(args->sem); 2415 } 2416 2417 static void 2418 __file_close(void *arg) 2419 { 2420 struct spdk_fs_request *req = arg; 2421 struct spdk_fs_cb_args *args = &req->args; 2422 struct spdk_file *file = args->file; 2423 2424 __file_close_async(file, req); 2425 } 2426 2427 int 2428 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2429 { 2430 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2431 struct spdk_fs_request *req; 2432 struct spdk_fs_cb_args *args; 2433 2434 req = alloc_fs_request(channel); 2435 assert(req != NULL); 2436 2437 args = &req->args; 2438 2439 spdk_file_sync(file, _channel); 2440 BLOBFS_TRACE(file, "name=%s\n", file->name); 2441 args->file = file; 2442 args->sem = &channel->sem; 2443 args->fn.file_op = __file_close_done; 2444 args->arg = req; 2445 channel->send_request(__file_close, req); 2446 sem_wait(&channel->sem); 2447 2448 return args->rc; 2449 } 2450 2451 static void 2452 cache_free_buffers(struct spdk_file *file) 2453 { 2454 BLOBFS_TRACE(file, "free=%s\n", file->name); 2455 pthread_spin_lock(&file->lock); 2456 pthread_spin_lock(&g_caches_lock); 2457 if (file->tree->present_mask == 0) { 2458 pthread_spin_unlock(&g_caches_lock); 2459 pthread_spin_unlock(&file->lock); 2460 return; 2461 } 2462 spdk_tree_free_buffers(file->tree); 2463 2464 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2465 /* If not freed, put it in the end of the queue */ 2466 if (file->tree->present_mask != 0) { 2467 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2468 } 2469 file->last = NULL; 2470 pthread_spin_unlock(&g_caches_lock); 2471 pthread_spin_unlock(&file->lock); 2472 } 2473 2474 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2475 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2476