1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/io_channel.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 55 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 56 static struct spdk_mempool *g_cache_pool; 57 static TAILQ_HEAD(, spdk_file) g_caches; 58 static int g_fs_count = 0; 59 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 60 static pthread_spinlock_t g_caches_lock; 61 62 static void 63 __sem_post(void *arg, int bserrno) 64 { 65 sem_t *sem = arg; 66 67 sem_post(sem); 68 } 69 70 void 71 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 72 { 73 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 74 free(cache_buffer); 75 } 76 77 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 78 79 struct spdk_file { 80 struct spdk_filesystem *fs; 81 struct spdk_blob *blob; 82 char *name; 83 uint64_t length; 84 bool is_deleted; 85 bool open_for_writing; 86 uint64_t length_flushed; 87 uint64_t append_pos; 88 uint64_t seq_byte_count; 89 uint64_t next_seq_offset; 90 uint32_t priority; 91 TAILQ_ENTRY(spdk_file) tailq; 92 spdk_blob_id blobid; 93 uint32_t ref_count; 94 pthread_spinlock_t lock; 95 struct cache_buffer *last; 96 struct cache_tree *tree; 97 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 98 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 99 TAILQ_ENTRY(spdk_file) cache_tailq; 100 }; 101 102 struct spdk_deleted_file { 103 spdk_blob_id id; 104 TAILQ_ENTRY(spdk_deleted_file) tailq; 105 }; 106 107 struct spdk_filesystem { 108 struct spdk_blob_store *bs; 109 TAILQ_HEAD(, spdk_file) files; 110 struct spdk_bs_opts bs_opts; 111 struct spdk_bs_dev *bdev; 112 fs_send_request_fn send_request; 113 114 struct { 115 uint32_t max_ops; 116 struct spdk_io_channel *sync_io_channel; 117 struct spdk_fs_channel *sync_fs_channel; 118 } sync_target; 119 120 struct { 121 uint32_t max_ops; 122 struct spdk_io_channel *md_io_channel; 123 struct spdk_fs_channel *md_fs_channel; 124 } md_target; 125 126 struct { 127 uint32_t max_ops; 128 } io_target; 129 }; 130 131 struct spdk_fs_cb_args { 132 union { 133 spdk_fs_op_with_handle_complete fs_op_with_handle; 134 spdk_fs_op_complete fs_op; 135 spdk_file_op_with_handle_complete file_op_with_handle; 136 spdk_file_op_complete file_op; 137 spdk_file_stat_op_complete stat_op; 138 } fn; 139 void *arg; 140 sem_t *sem; 141 struct spdk_filesystem *fs; 142 struct spdk_file *file; 143 int rc; 144 bool from_request; 145 union { 146 struct { 147 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 148 } fs_load; 149 struct { 150 uint64_t length; 151 } truncate; 152 struct { 153 struct spdk_io_channel *channel; 154 void *user_buf; 155 void *pin_buf; 156 int is_read; 157 off_t offset; 158 size_t length; 159 uint64_t start_page; 160 uint64_t num_pages; 161 uint32_t blocklen; 162 } rw; 163 struct { 164 const char *old_name; 165 const char *new_name; 166 } rename; 167 struct { 168 struct cache_buffer *cache_buffer; 169 uint64_t length; 170 } flush; 171 struct { 172 struct cache_buffer *cache_buffer; 173 uint64_t length; 174 uint64_t offset; 175 } readahead; 176 struct { 177 uint64_t offset; 178 TAILQ_ENTRY(spdk_fs_request) tailq; 179 bool xattr_in_progress; 180 } sync; 181 struct { 182 uint32_t num_clusters; 183 } resize; 184 struct { 185 const char *name; 186 uint32_t flags; 187 TAILQ_ENTRY(spdk_fs_request) tailq; 188 } open; 189 struct { 190 const char *name; 191 } create; 192 struct { 193 const char *name; 194 } delete; 195 struct { 196 const char *name; 197 } stat; 198 } op; 199 }; 200 201 static void cache_free_buffers(struct spdk_file *file); 202 203 static void 204 __initialize_cache(void) 205 { 206 assert(g_cache_pool == NULL); 207 208 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 209 g_fs_cache_size / CACHE_BUFFER_SIZE, 210 CACHE_BUFFER_SIZE, 211 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 212 SPDK_ENV_SOCKET_ID_ANY); 213 TAILQ_INIT(&g_caches); 214 pthread_spin_init(&g_caches_lock, 0); 215 } 216 217 static void 218 __free_cache(void) 219 { 220 assert(g_cache_pool != NULL); 221 222 spdk_mempool_free(g_cache_pool); 223 g_cache_pool = NULL; 224 } 225 226 static uint64_t 227 __file_get_blob_size(struct spdk_file *file) 228 { 229 uint64_t cluster_sz; 230 231 cluster_sz = file->fs->bs_opts.cluster_sz; 232 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 233 } 234 235 struct spdk_fs_request { 236 struct spdk_fs_cb_args args; 237 TAILQ_ENTRY(spdk_fs_request) link; 238 struct spdk_fs_channel *channel; 239 }; 240 241 struct spdk_fs_channel { 242 struct spdk_fs_request *req_mem; 243 TAILQ_HEAD(, spdk_fs_request) reqs; 244 sem_t sem; 245 struct spdk_filesystem *fs; 246 struct spdk_io_channel *bs_channel; 247 fs_send_request_fn send_request; 248 bool sync; 249 pthread_spinlock_t lock; 250 }; 251 252 static struct spdk_fs_request * 253 alloc_fs_request(struct spdk_fs_channel *channel) 254 { 255 struct spdk_fs_request *req; 256 257 if (channel->sync) { 258 pthread_spin_lock(&channel->lock); 259 } 260 261 req = TAILQ_FIRST(&channel->reqs); 262 if (req) { 263 TAILQ_REMOVE(&channel->reqs, req, link); 264 } 265 266 if (channel->sync) { 267 pthread_spin_unlock(&channel->lock); 268 } 269 270 if (req == NULL) { 271 return NULL; 272 } 273 memset(req, 0, sizeof(*req)); 274 req->channel = channel; 275 req->args.from_request = true; 276 277 return req; 278 } 279 280 static void 281 free_fs_request(struct spdk_fs_request *req) 282 { 283 struct spdk_fs_channel *channel = req->channel; 284 285 if (channel->sync) { 286 pthread_spin_lock(&channel->lock); 287 } 288 289 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 290 291 if (channel->sync) { 292 pthread_spin_unlock(&channel->lock); 293 } 294 } 295 296 static int 297 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 298 uint32_t max_ops) 299 { 300 uint32_t i; 301 302 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 303 if (!channel->req_mem) { 304 return -1; 305 } 306 307 TAILQ_INIT(&channel->reqs); 308 sem_init(&channel->sem, 0, 0); 309 310 for (i = 0; i < max_ops; i++) { 311 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 312 } 313 314 channel->fs = fs; 315 316 return 0; 317 } 318 319 static int 320 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 321 { 322 struct spdk_filesystem *fs; 323 struct spdk_fs_channel *channel = ctx_buf; 324 325 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 326 327 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 328 } 329 330 static int 331 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 332 { 333 struct spdk_filesystem *fs; 334 struct spdk_fs_channel *channel = ctx_buf; 335 336 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 337 338 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 339 } 340 341 static int 342 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 343 { 344 struct spdk_filesystem *fs; 345 struct spdk_fs_channel *channel = ctx_buf; 346 347 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 348 349 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 350 } 351 352 static void 353 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 354 { 355 struct spdk_fs_channel *channel = ctx_buf; 356 357 free(channel->req_mem); 358 if (channel->bs_channel != NULL) { 359 spdk_bs_free_io_channel(channel->bs_channel); 360 } 361 } 362 363 static void 364 __send_request_direct(fs_request_fn fn, void *arg) 365 { 366 fn(arg); 367 } 368 369 static void 370 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 371 { 372 fs->bs = bs; 373 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 374 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 375 fs->md_target.md_fs_channel->send_request = __send_request_direct; 376 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 377 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 378 379 pthread_mutex_lock(&g_cache_init_lock); 380 if (g_fs_count == 0) { 381 __initialize_cache(); 382 } 383 g_fs_count++; 384 pthread_mutex_unlock(&g_cache_init_lock); 385 } 386 387 static void 388 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 389 { 390 struct spdk_fs_request *req = ctx; 391 struct spdk_fs_cb_args *args = &req->args; 392 struct spdk_filesystem *fs = args->fs; 393 394 if (bserrno == 0) { 395 common_fs_bs_init(fs, bs); 396 } else { 397 free(fs); 398 fs = NULL; 399 } 400 401 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 402 free_fs_request(req); 403 } 404 405 static void 406 fs_conf_parse(void) 407 { 408 struct spdk_conf_section *sp; 409 410 sp = spdk_conf_find_section(NULL, "Blobfs"); 411 if (sp == NULL) { 412 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 413 return; 414 } 415 416 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 417 if (g_fs_cache_buffer_shift <= 0) { 418 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 419 } 420 } 421 422 static struct spdk_filesystem * 423 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 424 { 425 struct spdk_filesystem *fs; 426 427 fs = calloc(1, sizeof(*fs)); 428 if (fs == NULL) { 429 return NULL; 430 } 431 432 fs->bdev = dev; 433 fs->send_request = send_request_fn; 434 TAILQ_INIT(&fs->files); 435 436 fs->md_target.max_ops = 512; 437 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 438 sizeof(struct spdk_fs_channel)); 439 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 440 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 441 442 fs->sync_target.max_ops = 512; 443 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 444 sizeof(struct spdk_fs_channel)); 445 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 446 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 447 448 fs->io_target.max_ops = 512; 449 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 450 sizeof(struct spdk_fs_channel)); 451 452 return fs; 453 } 454 455 void 456 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 457 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 458 { 459 struct spdk_filesystem *fs; 460 struct spdk_fs_request *req; 461 struct spdk_fs_cb_args *args; 462 struct spdk_bs_opts opts = {}; 463 464 fs = fs_alloc(dev, send_request_fn); 465 if (fs == NULL) { 466 cb_fn(cb_arg, NULL, -ENOMEM); 467 return; 468 } 469 470 fs_conf_parse(); 471 472 req = alloc_fs_request(fs->md_target.md_fs_channel); 473 if (req == NULL) { 474 spdk_put_io_channel(fs->md_target.md_io_channel); 475 spdk_io_device_unregister(&fs->md_target, NULL); 476 spdk_put_io_channel(fs->sync_target.sync_io_channel); 477 spdk_io_device_unregister(&fs->sync_target, NULL); 478 spdk_io_device_unregister(&fs->io_target, NULL); 479 free(fs); 480 cb_fn(cb_arg, NULL, -ENOMEM); 481 return; 482 } 483 484 args = &req->args; 485 args->fn.fs_op_with_handle = cb_fn; 486 args->arg = cb_arg; 487 args->fs = fs; 488 489 spdk_bs_opts_init(&opts); 490 strncpy(opts.bstype.bstype, "BLOBFS", SPDK_BLOBSTORE_TYPE_LENGTH); 491 492 spdk_bs_init(dev, &opts, init_cb, req); 493 } 494 495 static struct spdk_file * 496 file_alloc(struct spdk_filesystem *fs) 497 { 498 struct spdk_file *file; 499 500 file = calloc(1, sizeof(*file)); 501 if (file == NULL) { 502 return NULL; 503 } 504 505 file->tree = calloc(1, sizeof(*file->tree)); 506 if (file->tree == NULL) { 507 free(file); 508 return NULL; 509 } 510 511 file->fs = fs; 512 TAILQ_INIT(&file->open_requests); 513 TAILQ_INIT(&file->sync_requests); 514 pthread_spin_init(&file->lock, 0); 515 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 516 file->priority = SPDK_FILE_PRIORITY_LOW; 517 return file; 518 } 519 520 static void iter_delete_cb(void *ctx, int bserrno); 521 522 static int 523 _handle_deleted_files(struct spdk_fs_request *req) 524 { 525 struct spdk_fs_cb_args *args = &req->args; 526 struct spdk_filesystem *fs = args->fs; 527 528 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 529 struct spdk_deleted_file *deleted_file; 530 531 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 532 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 533 spdk_bs_md_delete_blob(fs->bs, deleted_file->id, iter_delete_cb, req); 534 free(deleted_file); 535 return 0; 536 } 537 538 return 1; 539 } 540 541 static void 542 iter_delete_cb(void *ctx, int bserrno) 543 { 544 struct spdk_fs_request *req = ctx; 545 struct spdk_fs_cb_args *args = &req->args; 546 struct spdk_filesystem *fs = args->fs; 547 548 if (_handle_deleted_files(req) == 0) 549 return; 550 551 args->fn.fs_op_with_handle(args->arg, fs, 0); 552 free_fs_request(req); 553 554 } 555 556 static void 557 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 558 { 559 struct spdk_fs_request *req = ctx; 560 struct spdk_fs_cb_args *args = &req->args; 561 struct spdk_filesystem *fs = args->fs; 562 uint64_t *length; 563 const char *name; 564 uint32_t *is_deleted; 565 size_t value_len; 566 567 if (rc == -ENOENT) { 568 /* Finished iterating */ 569 if (_handle_deleted_files(req) == 0) 570 return; 571 args->fn.fs_op_with_handle(args->arg, fs, 0); 572 free_fs_request(req); 573 return; 574 } else if (rc < 0) { 575 args->fn.fs_op_with_handle(args->arg, fs, rc); 576 free_fs_request(req); 577 return; 578 } 579 580 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 581 if (rc < 0) { 582 args->fn.fs_op_with_handle(args->arg, fs, rc); 583 free_fs_request(req); 584 return; 585 } 586 587 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 588 if (rc < 0) { 589 args->fn.fs_op_with_handle(args->arg, fs, rc); 590 free_fs_request(req); 591 return; 592 } 593 594 assert(value_len == 8); 595 596 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 597 rc = spdk_bs_md_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 598 if (rc < 0) { 599 struct spdk_file *f; 600 601 f = file_alloc(fs); 602 if (f == NULL) { 603 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 604 free_fs_request(req); 605 return; 606 } 607 608 f->name = strdup(name); 609 f->blobid = spdk_blob_get_id(blob); 610 f->length = *length; 611 f->length_flushed = *length; 612 f->append_pos = *length; 613 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 614 } else { 615 struct spdk_deleted_file *deleted_file; 616 617 deleted_file = calloc(1, sizeof(*deleted_file)); 618 if (deleted_file == NULL) { 619 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 620 free_fs_request(req); 621 return; 622 } 623 deleted_file->id = spdk_blob_get_id(blob); 624 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 625 } 626 627 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 628 } 629 630 static void 631 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 632 { 633 struct spdk_fs_request *req = ctx; 634 struct spdk_fs_cb_args *args = &req->args; 635 struct spdk_filesystem *fs = args->fs; 636 struct spdk_bs_type bstype; 637 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 638 static const struct spdk_bs_type zeros; 639 640 if (bserrno != 0) { 641 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 642 free_fs_request(req); 643 free(fs); 644 return; 645 } 646 647 bstype = spdk_bs_get_bstype(bs); 648 649 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 650 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "assigning bstype\n"); 651 spdk_bs_set_bstype(bs, blobfs_type); 652 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 653 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "not blobfs\n"); 654 SPDK_TRACEDUMP(SPDK_TRACE_BLOB, "bstype", &bstype, sizeof(bstype)); 655 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 656 free_fs_request(req); 657 free(fs); 658 return; 659 } 660 661 common_fs_bs_init(fs, bs); 662 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 663 } 664 665 void 666 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 667 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 668 { 669 struct spdk_filesystem *fs; 670 struct spdk_fs_cb_args *args; 671 struct spdk_fs_request *req; 672 struct spdk_bs_opts opts = {}; 673 674 fs = fs_alloc(dev, send_request_fn); 675 if (fs == NULL) { 676 cb_fn(cb_arg, NULL, -ENOMEM); 677 return; 678 } 679 680 fs_conf_parse(); 681 682 req = alloc_fs_request(fs->md_target.md_fs_channel); 683 if (req == NULL) { 684 spdk_put_io_channel(fs->md_target.md_io_channel); 685 spdk_io_device_unregister(&fs->md_target, NULL); 686 spdk_put_io_channel(fs->sync_target.sync_io_channel); 687 spdk_io_device_unregister(&fs->sync_target, NULL); 688 spdk_io_device_unregister(&fs->io_target, NULL); 689 free(fs); 690 cb_fn(cb_arg, NULL, -ENOMEM); 691 return; 692 } 693 694 args = &req->args; 695 args->fn.fs_op_with_handle = cb_fn; 696 args->arg = cb_arg; 697 args->fs = fs; 698 TAILQ_INIT(&args->op.fs_load.deleted_files); 699 700 spdk_bs_opts_init(&opts); 701 702 spdk_bs_load(dev, &opts, load_cb, req); 703 } 704 705 static void 706 unload_cb(void *ctx, int bserrno) 707 { 708 struct spdk_fs_request *req = ctx; 709 struct spdk_fs_cb_args *args = &req->args; 710 struct spdk_filesystem *fs = args->fs; 711 712 pthread_mutex_lock(&g_cache_init_lock); 713 g_fs_count--; 714 if (g_fs_count == 0) { 715 __free_cache(); 716 } 717 pthread_mutex_unlock(&g_cache_init_lock); 718 719 args->fn.fs_op(args->arg, bserrno); 720 free(req); 721 722 spdk_io_device_unregister(&fs->io_target, NULL); 723 spdk_io_device_unregister(&fs->sync_target, NULL); 724 spdk_io_device_unregister(&fs->md_target, NULL); 725 726 free(fs); 727 } 728 729 void 730 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 731 { 732 struct spdk_fs_request *req; 733 struct spdk_fs_cb_args *args; 734 735 /* 736 * We must free the md_channel before unloading the blobstore, so just 737 * allocate this request from the general heap. 738 */ 739 req = calloc(1, sizeof(*req)); 740 if (req == NULL) { 741 cb_fn(cb_arg, -ENOMEM); 742 return; 743 } 744 745 args = &req->args; 746 args->fn.fs_op = cb_fn; 747 args->arg = cb_arg; 748 args->fs = fs; 749 750 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 751 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 752 spdk_bs_unload(fs->bs, unload_cb, req); 753 } 754 755 static struct spdk_file * 756 fs_find_file(struct spdk_filesystem *fs, const char *name) 757 { 758 struct spdk_file *file; 759 760 TAILQ_FOREACH(file, &fs->files, tailq) { 761 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 762 return file; 763 } 764 } 765 766 return NULL; 767 } 768 769 void 770 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 771 spdk_file_stat_op_complete cb_fn, void *cb_arg) 772 { 773 struct spdk_file_stat stat; 774 struct spdk_file *f = NULL; 775 776 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 777 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 778 return; 779 } 780 781 f = fs_find_file(fs, name); 782 if (f != NULL) { 783 stat.blobid = f->blobid; 784 stat.size = f->length; 785 cb_fn(cb_arg, &stat, 0); 786 return; 787 } 788 789 cb_fn(cb_arg, NULL, -ENOENT); 790 } 791 792 static void 793 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 794 { 795 struct spdk_fs_request *req = arg; 796 struct spdk_fs_cb_args *args = &req->args; 797 798 args->rc = fserrno; 799 if (fserrno == 0) { 800 memcpy(args->arg, stat, sizeof(*stat)); 801 } 802 sem_post(args->sem); 803 } 804 805 static void 806 __file_stat(void *arg) 807 { 808 struct spdk_fs_request *req = arg; 809 struct spdk_fs_cb_args *args = &req->args; 810 811 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 812 args->fn.stat_op, req); 813 } 814 815 int 816 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 817 const char *name, struct spdk_file_stat *stat) 818 { 819 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 820 struct spdk_fs_request *req; 821 int rc; 822 823 req = alloc_fs_request(channel); 824 assert(req != NULL); 825 826 req->args.fs = fs; 827 req->args.op.stat.name = name; 828 req->args.fn.stat_op = __copy_stat; 829 req->args.arg = stat; 830 req->args.sem = &channel->sem; 831 channel->send_request(__file_stat, req); 832 sem_wait(&channel->sem); 833 834 rc = req->args.rc; 835 free_fs_request(req); 836 837 return rc; 838 } 839 840 static void 841 fs_create_blob_close_cb(void *ctx, int bserrno) 842 { 843 struct spdk_fs_request *req = ctx; 844 struct spdk_fs_cb_args *args = &req->args; 845 846 args->fn.file_op(args->arg, bserrno); 847 free_fs_request(req); 848 } 849 850 static void 851 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 852 { 853 struct spdk_fs_request *req = ctx; 854 struct spdk_fs_cb_args *args = &req->args; 855 struct spdk_file *f = args->file; 856 uint64_t length = 0; 857 858 f->blob = blob; 859 spdk_bs_md_resize_blob(blob, 1); 860 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 861 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 862 863 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 864 } 865 866 static void 867 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 868 { 869 struct spdk_fs_request *req = ctx; 870 struct spdk_fs_cb_args *args = &req->args; 871 struct spdk_file *f = args->file; 872 873 f->blobid = blobid; 874 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 875 } 876 877 void 878 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 879 spdk_file_op_complete cb_fn, void *cb_arg) 880 { 881 struct spdk_file *file; 882 struct spdk_fs_request *req; 883 struct spdk_fs_cb_args *args; 884 885 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 886 cb_fn(cb_arg, -ENAMETOOLONG); 887 return; 888 } 889 890 file = fs_find_file(fs, name); 891 if (file != NULL) { 892 cb_fn(cb_arg, -EEXIST); 893 return; 894 } 895 896 file = file_alloc(fs); 897 if (file == NULL) { 898 cb_fn(cb_arg, -ENOMEM); 899 return; 900 } 901 902 req = alloc_fs_request(fs->md_target.md_fs_channel); 903 if (req == NULL) { 904 cb_fn(cb_arg, -ENOMEM); 905 return; 906 } 907 908 args = &req->args; 909 args->file = file; 910 args->fn.file_op = cb_fn; 911 args->arg = cb_arg; 912 913 file->name = strdup(name); 914 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 915 } 916 917 static void 918 __fs_create_file_done(void *arg, int fserrno) 919 { 920 struct spdk_fs_request *req = arg; 921 struct spdk_fs_cb_args *args = &req->args; 922 923 args->rc = fserrno; 924 sem_post(args->sem); 925 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 926 } 927 928 static void 929 __fs_create_file(void *arg) 930 { 931 struct spdk_fs_request *req = arg; 932 struct spdk_fs_cb_args *args = &req->args; 933 934 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 935 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 936 } 937 938 int 939 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 940 { 941 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 942 struct spdk_fs_request *req; 943 struct spdk_fs_cb_args *args; 944 int rc; 945 946 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 947 948 req = alloc_fs_request(channel); 949 assert(req != NULL); 950 951 args = &req->args; 952 args->fs = fs; 953 args->op.create.name = name; 954 args->sem = &channel->sem; 955 fs->send_request(__fs_create_file, req); 956 sem_wait(&channel->sem); 957 rc = args->rc; 958 free_fs_request(req); 959 960 return rc; 961 } 962 963 static void 964 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 965 { 966 struct spdk_fs_request *req = ctx; 967 struct spdk_fs_cb_args *args = &req->args; 968 struct spdk_file *f = args->file; 969 970 f->blob = blob; 971 while (!TAILQ_EMPTY(&f->open_requests)) { 972 req = TAILQ_FIRST(&f->open_requests); 973 args = &req->args; 974 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 975 args->fn.file_op_with_handle(args->arg, f, bserrno); 976 free_fs_request(req); 977 } 978 } 979 980 static void 981 fs_open_blob_create_cb(void *ctx, int bserrno) 982 { 983 struct spdk_fs_request *req = ctx; 984 struct spdk_fs_cb_args *args = &req->args; 985 struct spdk_file *file = args->file; 986 struct spdk_filesystem *fs = args->fs; 987 988 if (file == NULL) { 989 /* 990 * This is from an open with CREATE flag - the file 991 * is now created so look it up in the file list for this 992 * filesystem. 993 */ 994 file = fs_find_file(fs, args->op.open.name); 995 assert(file != NULL); 996 args->file = file; 997 } 998 999 file->ref_count++; 1000 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1001 if (file->ref_count == 1) { 1002 assert(file->blob == NULL); 1003 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1004 } else if (file->blob != NULL) { 1005 fs_open_blob_done(req, file->blob, 0); 1006 } else { 1007 /* 1008 * The blob open for this file is in progress due to a previous 1009 * open request. When that open completes, it will invoke the 1010 * open callback for this request. 1011 */ 1012 } 1013 } 1014 1015 void 1016 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1017 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1018 { 1019 struct spdk_file *f = NULL; 1020 struct spdk_fs_request *req; 1021 struct spdk_fs_cb_args *args; 1022 1023 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1024 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1025 return; 1026 } 1027 1028 f = fs_find_file(fs, name); 1029 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1030 cb_fn(cb_arg, NULL, -ENOENT); 1031 return; 1032 } 1033 1034 if (f != NULL && f->is_deleted == true) { 1035 cb_fn(cb_arg, NULL, -ENOENT); 1036 return; 1037 } 1038 1039 req = alloc_fs_request(fs->md_target.md_fs_channel); 1040 if (req == NULL) { 1041 cb_fn(cb_arg, NULL, -ENOMEM); 1042 return; 1043 } 1044 1045 args = &req->args; 1046 args->fn.file_op_with_handle = cb_fn; 1047 args->arg = cb_arg; 1048 args->file = f; 1049 args->fs = fs; 1050 args->op.open.name = name; 1051 1052 if (f == NULL) { 1053 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1054 } else { 1055 fs_open_blob_create_cb(req, 0); 1056 } 1057 } 1058 1059 static void 1060 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1061 { 1062 struct spdk_fs_request *req = arg; 1063 struct spdk_fs_cb_args *args = &req->args; 1064 1065 args->file = file; 1066 args->rc = bserrno; 1067 sem_post(args->sem); 1068 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 1069 } 1070 1071 static void 1072 __fs_open_file(void *arg) 1073 { 1074 struct spdk_fs_request *req = arg; 1075 struct spdk_fs_cb_args *args = &req->args; 1076 1077 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 1078 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1079 __fs_open_file_done, req); 1080 } 1081 1082 int 1083 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1084 const char *name, uint32_t flags, struct spdk_file **file) 1085 { 1086 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1087 struct spdk_fs_request *req; 1088 struct spdk_fs_cb_args *args; 1089 int rc; 1090 1091 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1092 1093 req = alloc_fs_request(channel); 1094 assert(req != NULL); 1095 1096 args = &req->args; 1097 args->fs = fs; 1098 args->op.open.name = name; 1099 args->op.open.flags = flags; 1100 args->sem = &channel->sem; 1101 fs->send_request(__fs_open_file, req); 1102 sem_wait(&channel->sem); 1103 rc = args->rc; 1104 if (rc == 0) { 1105 *file = args->file; 1106 } else { 1107 *file = NULL; 1108 } 1109 free_fs_request(req); 1110 1111 return rc; 1112 } 1113 1114 static void 1115 fs_rename_blob_close_cb(void *ctx, int bserrno) 1116 { 1117 struct spdk_fs_request *req = ctx; 1118 struct spdk_fs_cb_args *args = &req->args; 1119 1120 args->fn.fs_op(args->arg, bserrno); 1121 free_fs_request(req); 1122 } 1123 1124 static void 1125 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1126 { 1127 struct spdk_fs_request *req = ctx; 1128 struct spdk_fs_cb_args *args = &req->args; 1129 struct spdk_file *f = args->file; 1130 const char *new_name = args->op.rename.new_name; 1131 1132 f->blob = blob; 1133 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1134 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 1135 } 1136 1137 static void 1138 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1139 { 1140 struct spdk_fs_cb_args *args = &req->args; 1141 struct spdk_file *f; 1142 1143 f = fs_find_file(args->fs, args->op.rename.old_name); 1144 if (f == NULL) { 1145 args->fn.fs_op(args->arg, -ENOENT); 1146 free_fs_request(req); 1147 return; 1148 } 1149 1150 free(f->name); 1151 f->name = strdup(args->op.rename.new_name); 1152 args->file = f; 1153 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1154 } 1155 1156 static void 1157 fs_rename_delete_done(void *arg, int fserrno) 1158 { 1159 __spdk_fs_md_rename_file(arg); 1160 } 1161 1162 void 1163 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1164 const char *old_name, const char *new_name, 1165 spdk_file_op_complete cb_fn, void *cb_arg) 1166 { 1167 struct spdk_file *f; 1168 struct spdk_fs_request *req; 1169 struct spdk_fs_cb_args *args; 1170 1171 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1172 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1173 cb_fn(cb_arg, -ENAMETOOLONG); 1174 return; 1175 } 1176 1177 req = alloc_fs_request(fs->md_target.md_fs_channel); 1178 if (req == NULL) { 1179 cb_fn(cb_arg, -ENOMEM); 1180 return; 1181 } 1182 1183 args = &req->args; 1184 args->fn.fs_op = cb_fn; 1185 args->fs = fs; 1186 args->arg = cb_arg; 1187 args->op.rename.old_name = old_name; 1188 args->op.rename.new_name = new_name; 1189 1190 f = fs_find_file(fs, new_name); 1191 if (f == NULL) { 1192 __spdk_fs_md_rename_file(req); 1193 return; 1194 } 1195 1196 /* 1197 * The rename overwrites an existing file. So delete the existing file, then 1198 * do the actual rename. 1199 */ 1200 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1201 } 1202 1203 static void 1204 __fs_rename_file_done(void *arg, int fserrno) 1205 { 1206 struct spdk_fs_request *req = arg; 1207 struct spdk_fs_cb_args *args = &req->args; 1208 1209 args->rc = fserrno; 1210 sem_post(args->sem); 1211 } 1212 1213 static void 1214 __fs_rename_file(void *arg) 1215 { 1216 struct spdk_fs_request *req = arg; 1217 struct spdk_fs_cb_args *args = &req->args; 1218 1219 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1220 __fs_rename_file_done, req); 1221 } 1222 1223 int 1224 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1225 const char *old_name, const char *new_name) 1226 { 1227 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1228 struct spdk_fs_request *req; 1229 struct spdk_fs_cb_args *args; 1230 int rc; 1231 1232 req = alloc_fs_request(channel); 1233 assert(req != NULL); 1234 1235 args = &req->args; 1236 1237 args->fs = fs; 1238 args->op.rename.old_name = old_name; 1239 args->op.rename.new_name = new_name; 1240 args->sem = &channel->sem; 1241 fs->send_request(__fs_rename_file, req); 1242 sem_wait(&channel->sem); 1243 rc = args->rc; 1244 free_fs_request(req); 1245 return rc; 1246 } 1247 1248 static void 1249 blob_delete_cb(void *ctx, int bserrno) 1250 { 1251 struct spdk_fs_request *req = ctx; 1252 struct spdk_fs_cb_args *args = &req->args; 1253 1254 args->fn.file_op(args->arg, bserrno); 1255 free_fs_request(req); 1256 } 1257 1258 void 1259 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1260 spdk_file_op_complete cb_fn, void *cb_arg) 1261 { 1262 struct spdk_file *f; 1263 spdk_blob_id blobid; 1264 struct spdk_fs_request *req; 1265 struct spdk_fs_cb_args *args; 1266 1267 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1268 1269 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1270 cb_fn(cb_arg, -ENAMETOOLONG); 1271 return; 1272 } 1273 1274 f = fs_find_file(fs, name); 1275 if (f == NULL) { 1276 cb_fn(cb_arg, -ENOENT); 1277 return; 1278 } 1279 1280 req = alloc_fs_request(fs->md_target.md_fs_channel); 1281 if (req == NULL) { 1282 cb_fn(cb_arg, -ENOMEM); 1283 return; 1284 } 1285 1286 args = &req->args; 1287 args->fn.file_op = cb_fn; 1288 args->arg = cb_arg; 1289 1290 if (f->ref_count > 0) { 1291 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1292 f->is_deleted = true; 1293 spdk_blob_md_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1294 spdk_bs_md_sync_blob(f->blob, blob_delete_cb, args); 1295 return; 1296 } 1297 1298 TAILQ_REMOVE(&fs->files, f, tailq); 1299 1300 cache_free_buffers(f); 1301 1302 blobid = f->blobid; 1303 1304 free(f->name); 1305 free(f->tree); 1306 free(f); 1307 1308 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1309 } 1310 1311 static void 1312 __fs_delete_file_done(void *arg, int fserrno) 1313 { 1314 struct spdk_fs_request *req = arg; 1315 struct spdk_fs_cb_args *args = &req->args; 1316 1317 args->rc = fserrno; 1318 sem_post(args->sem); 1319 } 1320 1321 static void 1322 __fs_delete_file(void *arg) 1323 { 1324 struct spdk_fs_request *req = arg; 1325 struct spdk_fs_cb_args *args = &req->args; 1326 1327 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1328 } 1329 1330 int 1331 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1332 const char *name) 1333 { 1334 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1335 struct spdk_fs_request *req; 1336 struct spdk_fs_cb_args *args; 1337 int rc; 1338 1339 req = alloc_fs_request(channel); 1340 assert(req != NULL); 1341 1342 args = &req->args; 1343 args->fs = fs; 1344 args->op.delete.name = name; 1345 args->sem = &channel->sem; 1346 fs->send_request(__fs_delete_file, req); 1347 sem_wait(&channel->sem); 1348 rc = args->rc; 1349 free_fs_request(req); 1350 1351 return rc; 1352 } 1353 1354 spdk_fs_iter 1355 spdk_fs_iter_first(struct spdk_filesystem *fs) 1356 { 1357 struct spdk_file *f; 1358 1359 f = TAILQ_FIRST(&fs->files); 1360 return f; 1361 } 1362 1363 spdk_fs_iter 1364 spdk_fs_iter_next(spdk_fs_iter iter) 1365 { 1366 struct spdk_file *f = iter; 1367 1368 if (f == NULL) { 1369 return NULL; 1370 } 1371 1372 f = TAILQ_NEXT(f, tailq); 1373 return f; 1374 } 1375 1376 const char * 1377 spdk_file_get_name(struct spdk_file *file) 1378 { 1379 return file->name; 1380 } 1381 1382 uint64_t 1383 spdk_file_get_length(struct spdk_file *file) 1384 { 1385 assert(file != NULL); 1386 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1387 return file->length; 1388 } 1389 1390 static void 1391 fs_truncate_complete_cb(void *ctx, int bserrno) 1392 { 1393 struct spdk_fs_request *req = ctx; 1394 struct spdk_fs_cb_args *args = &req->args; 1395 1396 args->fn.file_op(args->arg, bserrno); 1397 free_fs_request(req); 1398 } 1399 1400 static uint64_t 1401 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1402 { 1403 return (length + cluster_sz - 1) / cluster_sz; 1404 } 1405 1406 void 1407 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1408 spdk_file_op_complete cb_fn, void *cb_arg) 1409 { 1410 struct spdk_filesystem *fs; 1411 size_t num_clusters; 1412 struct spdk_fs_request *req; 1413 struct spdk_fs_cb_args *args; 1414 1415 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1416 if (length == file->length) { 1417 cb_fn(cb_arg, 0); 1418 return; 1419 } 1420 1421 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1422 if (req == NULL) { 1423 cb_fn(cb_arg, -ENOMEM); 1424 return; 1425 } 1426 1427 args = &req->args; 1428 args->fn.file_op = cb_fn; 1429 args->arg = cb_arg; 1430 args->file = file; 1431 fs = file->fs; 1432 1433 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1434 1435 spdk_bs_md_resize_blob(file->blob, num_clusters); 1436 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1437 1438 file->length = length; 1439 if (file->append_pos > file->length) { 1440 file->append_pos = file->length; 1441 } 1442 1443 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1444 } 1445 1446 static void 1447 __truncate(void *arg) 1448 { 1449 struct spdk_fs_request *req = arg; 1450 struct spdk_fs_cb_args *args = &req->args; 1451 1452 spdk_file_truncate_async(args->file, args->op.truncate.length, 1453 args->fn.file_op, args->arg); 1454 } 1455 1456 void 1457 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1458 uint64_t length) 1459 { 1460 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1461 struct spdk_fs_request *req; 1462 struct spdk_fs_cb_args *args; 1463 1464 req = alloc_fs_request(channel); 1465 assert(req != NULL); 1466 1467 args = &req->args; 1468 1469 args->file = file; 1470 args->op.truncate.length = length; 1471 args->fn.file_op = __sem_post; 1472 args->arg = &channel->sem; 1473 1474 channel->send_request(__truncate, req); 1475 sem_wait(&channel->sem); 1476 free_fs_request(req); 1477 } 1478 1479 static void 1480 __rw_done(void *ctx, int bserrno) 1481 { 1482 struct spdk_fs_request *req = ctx; 1483 struct spdk_fs_cb_args *args = &req->args; 1484 1485 spdk_dma_free(args->op.rw.pin_buf); 1486 args->fn.file_op(args->arg, bserrno); 1487 free_fs_request(req); 1488 } 1489 1490 static void 1491 __read_done(void *ctx, int bserrno) 1492 { 1493 struct spdk_fs_request *req = ctx; 1494 struct spdk_fs_cb_args *args = &req->args; 1495 1496 if (args->op.rw.is_read) { 1497 memcpy(args->op.rw.user_buf, 1498 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1499 args->op.rw.length); 1500 __rw_done(req, 0); 1501 } else { 1502 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1503 args->op.rw.user_buf, 1504 args->op.rw.length); 1505 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1506 args->op.rw.pin_buf, 1507 args->op.rw.start_page, args->op.rw.num_pages, 1508 __rw_done, req); 1509 } 1510 } 1511 1512 static void 1513 __do_blob_read(void *ctx, int fserrno) 1514 { 1515 struct spdk_fs_request *req = ctx; 1516 struct spdk_fs_cb_args *args = &req->args; 1517 1518 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1519 args->op.rw.pin_buf, 1520 args->op.rw.start_page, args->op.rw.num_pages, 1521 __read_done, req); 1522 } 1523 1524 static void 1525 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1526 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1527 { 1528 uint64_t end_page; 1529 1530 *page_size = spdk_bs_get_page_size(file->fs->bs); 1531 *start_page = offset / *page_size; 1532 end_page = (offset + length - 1) / *page_size; 1533 *num_pages = (end_page - *start_page + 1); 1534 } 1535 1536 static void 1537 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1538 void *payload, uint64_t offset, uint64_t length, 1539 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1540 { 1541 struct spdk_fs_request *req; 1542 struct spdk_fs_cb_args *args; 1543 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1544 uint64_t start_page, num_pages, pin_buf_length; 1545 uint32_t page_size; 1546 1547 if (is_read && offset + length > file->length) { 1548 cb_fn(cb_arg, -EINVAL); 1549 return; 1550 } 1551 1552 req = alloc_fs_request(channel); 1553 if (req == NULL) { 1554 cb_fn(cb_arg, -ENOMEM); 1555 return; 1556 } 1557 1558 args = &req->args; 1559 args->fn.file_op = cb_fn; 1560 args->arg = cb_arg; 1561 args->file = file; 1562 args->op.rw.channel = channel->bs_channel; 1563 args->op.rw.user_buf = payload; 1564 args->op.rw.is_read = is_read; 1565 args->op.rw.offset = offset; 1566 args->op.rw.length = length; 1567 1568 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1569 pin_buf_length = num_pages * page_size; 1570 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1571 1572 args->op.rw.start_page = start_page; 1573 args->op.rw.num_pages = num_pages; 1574 1575 if (!is_read && file->length < offset + length) { 1576 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1577 } else { 1578 __do_blob_read(req, 0); 1579 } 1580 } 1581 1582 void 1583 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1584 void *payload, uint64_t offset, uint64_t length, 1585 spdk_file_op_complete cb_fn, void *cb_arg) 1586 { 1587 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1588 } 1589 1590 void 1591 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1592 void *payload, uint64_t offset, uint64_t length, 1593 spdk_file_op_complete cb_fn, void *cb_arg) 1594 { 1595 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1596 file->name, offset, length); 1597 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1598 } 1599 1600 struct spdk_io_channel * 1601 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1602 { 1603 struct spdk_io_channel *io_channel; 1604 struct spdk_fs_channel *fs_channel; 1605 1606 io_channel = spdk_get_io_channel(&fs->io_target); 1607 fs_channel = spdk_io_channel_get_ctx(io_channel); 1608 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1609 fs_channel->send_request = __send_request_direct; 1610 1611 return io_channel; 1612 } 1613 1614 struct spdk_io_channel * 1615 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1616 { 1617 struct spdk_io_channel *io_channel; 1618 struct spdk_fs_channel *fs_channel; 1619 1620 io_channel = spdk_get_io_channel(&fs->io_target); 1621 fs_channel = spdk_io_channel_get_ctx(io_channel); 1622 fs_channel->send_request = fs->send_request; 1623 fs_channel->sync = 1; 1624 pthread_spin_init(&fs_channel->lock, 0); 1625 1626 return io_channel; 1627 } 1628 1629 void 1630 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1631 { 1632 spdk_put_io_channel(channel); 1633 } 1634 1635 void 1636 spdk_fs_set_cache_size(uint64_t size_in_mb) 1637 { 1638 g_fs_cache_size = size_in_mb * 1024 * 1024; 1639 } 1640 1641 uint64_t 1642 spdk_fs_get_cache_size(void) 1643 { 1644 return g_fs_cache_size / (1024 * 1024); 1645 } 1646 1647 static void __file_flush(void *_args); 1648 1649 static void * 1650 alloc_cache_memory_buffer(struct spdk_file *context) 1651 { 1652 struct spdk_file *file; 1653 void *buf; 1654 1655 buf = spdk_mempool_get(g_cache_pool); 1656 if (buf != NULL) { 1657 return buf; 1658 } 1659 1660 pthread_spin_lock(&g_caches_lock); 1661 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1662 if (!file->open_for_writing && 1663 file->priority == SPDK_FILE_PRIORITY_LOW && 1664 file != context) { 1665 break; 1666 } 1667 } 1668 pthread_spin_unlock(&g_caches_lock); 1669 if (file != NULL) { 1670 cache_free_buffers(file); 1671 buf = spdk_mempool_get(g_cache_pool); 1672 if (buf != NULL) { 1673 return buf; 1674 } 1675 } 1676 1677 pthread_spin_lock(&g_caches_lock); 1678 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1679 if (!file->open_for_writing && file != context) { 1680 break; 1681 } 1682 } 1683 pthread_spin_unlock(&g_caches_lock); 1684 if (file != NULL) { 1685 cache_free_buffers(file); 1686 buf = spdk_mempool_get(g_cache_pool); 1687 if (buf != NULL) { 1688 return buf; 1689 } 1690 } 1691 1692 pthread_spin_lock(&g_caches_lock); 1693 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1694 if (file != context) { 1695 break; 1696 } 1697 } 1698 pthread_spin_unlock(&g_caches_lock); 1699 if (file != NULL) { 1700 cache_free_buffers(file); 1701 buf = spdk_mempool_get(g_cache_pool); 1702 if (buf != NULL) { 1703 return buf; 1704 } 1705 } 1706 1707 return NULL; 1708 } 1709 1710 static struct cache_buffer * 1711 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1712 { 1713 struct cache_buffer *buf; 1714 int count = 0; 1715 1716 buf = calloc(1, sizeof(*buf)); 1717 if (buf == NULL) { 1718 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1719 return NULL; 1720 } 1721 1722 buf->buf = alloc_cache_memory_buffer(file); 1723 while (buf->buf == NULL) { 1724 /* 1725 * TODO: alloc_cache_memory_buffer() should eventually free 1726 * some buffers. Need a more sophisticated check here, instead 1727 * of just bailing if 100 tries does not result in getting a 1728 * free buffer. This will involve using the sync channel's 1729 * semaphore to block until a buffer becomes available. 1730 */ 1731 if (count++ == 100) { 1732 SPDK_ERRLOG("could not allocate cache buffer\n"); 1733 assert(false); 1734 free(buf); 1735 return NULL; 1736 } 1737 buf->buf = alloc_cache_memory_buffer(file); 1738 } 1739 1740 buf->buf_size = CACHE_BUFFER_SIZE; 1741 buf->offset = offset; 1742 1743 pthread_spin_lock(&g_caches_lock); 1744 if (file->tree->present_mask == 0) { 1745 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1746 } 1747 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1748 pthread_spin_unlock(&g_caches_lock); 1749 1750 return buf; 1751 } 1752 1753 static struct cache_buffer * 1754 cache_append_buffer(struct spdk_file *file) 1755 { 1756 struct cache_buffer *last; 1757 1758 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1759 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1760 1761 last = cache_insert_buffer(file, file->append_pos); 1762 if (last == NULL) { 1763 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1764 return NULL; 1765 } 1766 1767 file->last = last; 1768 1769 return last; 1770 } 1771 1772 static void 1773 __wake_caller(struct spdk_fs_cb_args *args) 1774 { 1775 sem_post(args->sem); 1776 } 1777 1778 static void __check_sync_reqs(struct spdk_file *file); 1779 1780 static void 1781 __file_cache_finish_sync(struct spdk_file *file) 1782 { 1783 struct spdk_fs_request *sync_req; 1784 struct spdk_fs_cb_args *sync_args; 1785 1786 pthread_spin_lock(&file->lock); 1787 sync_req = TAILQ_FIRST(&file->sync_requests); 1788 sync_args = &sync_req->args; 1789 assert(sync_args->op.sync.offset <= file->length_flushed); 1790 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1791 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1792 pthread_spin_unlock(&file->lock); 1793 1794 sync_args->fn.file_op(sync_args->arg, 0); 1795 __check_sync_reqs(file); 1796 1797 pthread_spin_lock(&file->lock); 1798 free_fs_request(sync_req); 1799 pthread_spin_unlock(&file->lock); 1800 } 1801 1802 static void 1803 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1804 { 1805 struct spdk_file *file = ctx; 1806 1807 __file_cache_finish_sync(file); 1808 } 1809 1810 static void 1811 __free_args(struct spdk_fs_cb_args *args) 1812 { 1813 struct spdk_fs_request *req; 1814 1815 if (!args->from_request) { 1816 free(args); 1817 } else { 1818 /* Depends on args being at the start of the spdk_fs_request structure. */ 1819 req = (struct spdk_fs_request *)args; 1820 free_fs_request(req); 1821 } 1822 } 1823 1824 static void 1825 __check_sync_reqs(struct spdk_file *file) 1826 { 1827 struct spdk_fs_request *sync_req; 1828 1829 pthread_spin_lock(&file->lock); 1830 1831 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1832 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1833 break; 1834 } 1835 } 1836 1837 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1838 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1839 sync_req->args.op.sync.xattr_in_progress = true; 1840 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1841 sizeof(file->length_flushed)); 1842 1843 pthread_spin_unlock(&file->lock); 1844 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1845 } else { 1846 pthread_spin_unlock(&file->lock); 1847 } 1848 } 1849 1850 static void 1851 __file_flush_done(void *arg, int bserrno) 1852 { 1853 struct spdk_fs_cb_args *args = arg; 1854 struct spdk_file *file = args->file; 1855 struct cache_buffer *next = args->op.flush.cache_buffer; 1856 1857 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1858 1859 pthread_spin_lock(&file->lock); 1860 next->in_progress = false; 1861 next->bytes_flushed += args->op.flush.length; 1862 file->length_flushed += args->op.flush.length; 1863 if (file->length_flushed > file->length) { 1864 file->length = file->length_flushed; 1865 } 1866 if (next->bytes_flushed == next->buf_size) { 1867 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1868 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1869 } 1870 1871 /* 1872 * Assert that there is no cached data that extends past the end of the underlying 1873 * blob. 1874 */ 1875 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1876 next->bytes_filled == 0); 1877 1878 pthread_spin_unlock(&file->lock); 1879 1880 __check_sync_reqs(file); 1881 1882 __file_flush(args); 1883 } 1884 1885 static void 1886 __file_flush(void *_args) 1887 { 1888 struct spdk_fs_cb_args *args = _args; 1889 struct spdk_file *file = args->file; 1890 struct cache_buffer *next; 1891 uint64_t offset, length, start_page, num_pages; 1892 uint32_t page_size; 1893 1894 pthread_spin_lock(&file->lock); 1895 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1896 if (next == NULL || next->in_progress) { 1897 /* 1898 * There is either no data to flush, or a flush I/O is already in 1899 * progress. So return immediately - if a flush I/O is in 1900 * progress we will flush more data after that is completed. 1901 */ 1902 __free_args(args); 1903 pthread_spin_unlock(&file->lock); 1904 return; 1905 } 1906 1907 offset = next->offset + next->bytes_flushed; 1908 length = next->bytes_filled - next->bytes_flushed; 1909 if (length == 0) { 1910 __free_args(args); 1911 pthread_spin_unlock(&file->lock); 1912 return; 1913 } 1914 args->op.flush.length = length; 1915 args->op.flush.cache_buffer = next; 1916 1917 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1918 1919 next->in_progress = true; 1920 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1921 offset, length, start_page, num_pages); 1922 pthread_spin_unlock(&file->lock); 1923 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1924 next->buf + (start_page * page_size) - next->offset, 1925 start_page, num_pages, 1926 __file_flush_done, args); 1927 } 1928 1929 static void 1930 __file_extend_done(void *arg, int bserrno) 1931 { 1932 struct spdk_fs_cb_args *args = arg; 1933 1934 __wake_caller(args); 1935 } 1936 1937 static void 1938 __file_extend_blob(void *_args) 1939 { 1940 struct spdk_fs_cb_args *args = _args; 1941 struct spdk_file *file = args->file; 1942 1943 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1944 1945 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1946 } 1947 1948 static void 1949 __rw_from_file_done(void *arg, int bserrno) 1950 { 1951 struct spdk_fs_cb_args *args = arg; 1952 1953 __wake_caller(args); 1954 __free_args(args); 1955 } 1956 1957 static void 1958 __rw_from_file(void *_args) 1959 { 1960 struct spdk_fs_cb_args *args = _args; 1961 struct spdk_file *file = args->file; 1962 1963 if (args->op.rw.is_read) { 1964 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1965 args->op.rw.offset, args->op.rw.length, 1966 __rw_from_file_done, args); 1967 } else { 1968 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1969 args->op.rw.offset, args->op.rw.length, 1970 __rw_from_file_done, args); 1971 } 1972 } 1973 1974 static int 1975 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1976 uint64_t offset, uint64_t length, bool is_read) 1977 { 1978 struct spdk_fs_cb_args *args; 1979 1980 args = calloc(1, sizeof(*args)); 1981 if (args == NULL) { 1982 sem_post(sem); 1983 return -ENOMEM; 1984 } 1985 1986 args->file = file; 1987 args->sem = sem; 1988 args->op.rw.user_buf = payload; 1989 args->op.rw.offset = offset; 1990 args->op.rw.length = length; 1991 args->op.rw.is_read = is_read; 1992 file->fs->send_request(__rw_from_file, args); 1993 return 0; 1994 } 1995 1996 int 1997 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1998 void *payload, uint64_t offset, uint64_t length) 1999 { 2000 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2001 struct spdk_fs_cb_args *args; 2002 uint64_t rem_length, copy, blob_size, cluster_sz; 2003 uint32_t cache_buffers_filled = 0; 2004 uint8_t *cur_payload; 2005 struct cache_buffer *last; 2006 2007 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2008 2009 if (length == 0) { 2010 return 0; 2011 } 2012 2013 if (offset != file->append_pos) { 2014 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2015 return -EINVAL; 2016 } 2017 2018 pthread_spin_lock(&file->lock); 2019 file->open_for_writing = true; 2020 2021 if (file->last == NULL) { 2022 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2023 cache_append_buffer(file); 2024 } else { 2025 int rc; 2026 2027 file->append_pos += length; 2028 pthread_spin_unlock(&file->lock); 2029 rc = __send_rw_from_file(file, &channel->sem, payload, 2030 offset, length, false); 2031 sem_wait(&channel->sem); 2032 return rc; 2033 } 2034 } 2035 2036 blob_size = __file_get_blob_size(file); 2037 2038 if ((offset + length) > blob_size) { 2039 struct spdk_fs_cb_args extend_args = {}; 2040 2041 cluster_sz = file->fs->bs_opts.cluster_sz; 2042 extend_args.sem = &channel->sem; 2043 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2044 extend_args.file = file; 2045 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2046 pthread_spin_unlock(&file->lock); 2047 file->fs->send_request(__file_extend_blob, &extend_args); 2048 sem_wait(&channel->sem); 2049 } 2050 2051 last = file->last; 2052 rem_length = length; 2053 cur_payload = payload; 2054 while (rem_length > 0) { 2055 copy = last->buf_size - last->bytes_filled; 2056 if (copy > rem_length) { 2057 copy = rem_length; 2058 } 2059 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2060 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2061 file->append_pos += copy; 2062 if (file->length < file->append_pos) { 2063 file->length = file->append_pos; 2064 } 2065 cur_payload += copy; 2066 last->bytes_filled += copy; 2067 rem_length -= copy; 2068 if (last->bytes_filled == last->buf_size) { 2069 cache_buffers_filled++; 2070 last = cache_append_buffer(file); 2071 if (last == NULL) { 2072 BLOBFS_TRACE(file, "nomem\n"); 2073 pthread_spin_unlock(&file->lock); 2074 return -ENOMEM; 2075 } 2076 } 2077 } 2078 2079 if (cache_buffers_filled == 0) { 2080 pthread_spin_unlock(&file->lock); 2081 return 0; 2082 } 2083 2084 args = calloc(1, sizeof(*args)); 2085 if (args == NULL) { 2086 pthread_spin_unlock(&file->lock); 2087 return -ENOMEM; 2088 } 2089 2090 args->file = file; 2091 file->fs->send_request(__file_flush, args); 2092 pthread_spin_unlock(&file->lock); 2093 return 0; 2094 } 2095 2096 static void 2097 __readahead_done(void *arg, int bserrno) 2098 { 2099 struct spdk_fs_cb_args *args = arg; 2100 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2101 struct spdk_file *file = args->file; 2102 2103 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2104 2105 pthread_spin_lock(&file->lock); 2106 cache_buffer->bytes_filled = args->op.readahead.length; 2107 cache_buffer->bytes_flushed = args->op.readahead.length; 2108 cache_buffer->in_progress = false; 2109 pthread_spin_unlock(&file->lock); 2110 2111 __free_args(args); 2112 } 2113 2114 static void 2115 __readahead(void *_args) 2116 { 2117 struct spdk_fs_cb_args *args = _args; 2118 struct spdk_file *file = args->file; 2119 uint64_t offset, length, start_page, num_pages; 2120 uint32_t page_size; 2121 2122 offset = args->op.readahead.offset; 2123 length = args->op.readahead.length; 2124 assert(length > 0); 2125 2126 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2127 2128 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2129 offset, length, start_page, num_pages); 2130 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2131 args->op.readahead.cache_buffer->buf, 2132 start_page, num_pages, 2133 __readahead_done, args); 2134 } 2135 2136 static uint64_t 2137 __next_cache_buffer_offset(uint64_t offset) 2138 { 2139 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2140 } 2141 2142 static void 2143 check_readahead(struct spdk_file *file, uint64_t offset) 2144 { 2145 struct spdk_fs_cb_args *args; 2146 2147 offset = __next_cache_buffer_offset(offset); 2148 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2149 return; 2150 } 2151 2152 args = calloc(1, sizeof(*args)); 2153 if (args == NULL) { 2154 return; 2155 } 2156 2157 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2158 2159 args->file = file; 2160 args->op.readahead.offset = offset; 2161 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2162 args->op.readahead.cache_buffer->in_progress = true; 2163 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2164 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2165 } else { 2166 args->op.readahead.length = CACHE_BUFFER_SIZE; 2167 } 2168 file->fs->send_request(__readahead, args); 2169 } 2170 2171 static int 2172 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2173 { 2174 struct cache_buffer *buf; 2175 int rc; 2176 2177 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2178 if (buf == NULL) { 2179 pthread_spin_unlock(&file->lock); 2180 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2181 pthread_spin_lock(&file->lock); 2182 return rc; 2183 } 2184 2185 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2186 length = buf->offset + buf->bytes_filled - offset; 2187 } 2188 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2189 memcpy(payload, &buf->buf[offset - buf->offset], length); 2190 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2191 pthread_spin_lock(&g_caches_lock); 2192 spdk_tree_remove_buffer(file->tree, buf); 2193 if (file->tree->present_mask == 0) { 2194 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2195 } 2196 pthread_spin_unlock(&g_caches_lock); 2197 } 2198 2199 sem_post(sem); 2200 return 0; 2201 } 2202 2203 int64_t 2204 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2205 void *payload, uint64_t offset, uint64_t length) 2206 { 2207 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2208 uint64_t final_offset, final_length; 2209 uint32_t sub_reads = 0; 2210 int rc = 0; 2211 2212 pthread_spin_lock(&file->lock); 2213 2214 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2215 2216 file->open_for_writing = false; 2217 2218 if (length == 0 || offset >= file->append_pos) { 2219 pthread_spin_unlock(&file->lock); 2220 return 0; 2221 } 2222 2223 if (offset + length > file->append_pos) { 2224 length = file->append_pos - offset; 2225 } 2226 2227 if (offset != file->next_seq_offset) { 2228 file->seq_byte_count = 0; 2229 } 2230 file->seq_byte_count += length; 2231 file->next_seq_offset = offset + length; 2232 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2233 check_readahead(file, offset); 2234 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2235 } 2236 2237 final_length = 0; 2238 final_offset = offset + length; 2239 while (offset < final_offset) { 2240 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2241 if (length > (final_offset - offset)) { 2242 length = final_offset - offset; 2243 } 2244 rc = __file_read(file, payload, offset, length, &channel->sem); 2245 if (rc == 0) { 2246 final_length += length; 2247 } else { 2248 break; 2249 } 2250 payload += length; 2251 offset += length; 2252 sub_reads++; 2253 } 2254 pthread_spin_unlock(&file->lock); 2255 while (sub_reads-- > 0) { 2256 sem_wait(&channel->sem); 2257 } 2258 if (rc == 0) { 2259 return final_length; 2260 } else { 2261 return rc; 2262 } 2263 } 2264 2265 static void 2266 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2267 spdk_file_op_complete cb_fn, void *cb_arg) 2268 { 2269 struct spdk_fs_request *sync_req; 2270 struct spdk_fs_request *flush_req; 2271 struct spdk_fs_cb_args *sync_args; 2272 struct spdk_fs_cb_args *flush_args; 2273 2274 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2275 2276 pthread_spin_lock(&file->lock); 2277 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2278 BLOBFS_TRACE(file, "done - no data to flush\n"); 2279 pthread_spin_unlock(&file->lock); 2280 cb_fn(cb_arg, 0); 2281 return; 2282 } 2283 2284 sync_req = alloc_fs_request(channel); 2285 assert(sync_req != NULL); 2286 sync_args = &sync_req->args; 2287 2288 flush_req = alloc_fs_request(channel); 2289 assert(flush_req != NULL); 2290 flush_args = &flush_req->args; 2291 2292 sync_args->file = file; 2293 sync_args->fn.file_op = cb_fn; 2294 sync_args->arg = cb_arg; 2295 sync_args->op.sync.offset = file->append_pos; 2296 sync_args->op.sync.xattr_in_progress = false; 2297 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2298 pthread_spin_unlock(&file->lock); 2299 2300 flush_args->file = file; 2301 channel->send_request(__file_flush, flush_args); 2302 } 2303 2304 int 2305 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2306 { 2307 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2308 2309 _file_sync(file, channel, __sem_post, &channel->sem); 2310 sem_wait(&channel->sem); 2311 2312 return 0; 2313 } 2314 2315 void 2316 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2317 spdk_file_op_complete cb_fn, void *cb_arg) 2318 { 2319 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2320 2321 _file_sync(file, channel, cb_fn, cb_arg); 2322 } 2323 2324 void 2325 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2326 { 2327 BLOBFS_TRACE(file, "priority=%u\n", priority); 2328 file->priority = priority; 2329 2330 } 2331 2332 /* 2333 * Close routines 2334 */ 2335 2336 static void 2337 __file_close_async_done(void *ctx, int bserrno) 2338 { 2339 struct spdk_fs_request *req = ctx; 2340 struct spdk_fs_cb_args *args = &req->args; 2341 struct spdk_file *file = args->file; 2342 2343 if (file->is_deleted) { 2344 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2345 return; 2346 } 2347 args->fn.file_op(args->arg, bserrno); 2348 free_fs_request(req); 2349 } 2350 2351 static void 2352 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2353 { 2354 pthread_spin_lock(&file->lock); 2355 if (file->ref_count == 0) { 2356 pthread_spin_unlock(&file->lock); 2357 __file_close_async_done(req, -EBADF); 2358 return; 2359 } 2360 2361 file->ref_count--; 2362 if (file->ref_count > 0) { 2363 pthread_spin_unlock(&file->lock); 2364 __file_close_async_done(req, 0); 2365 return; 2366 } 2367 2368 pthread_spin_unlock(&file->lock); 2369 2370 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2371 } 2372 2373 static void 2374 __file_close_async__sync_done(void *arg, int fserrno) 2375 { 2376 struct spdk_fs_request *req = arg; 2377 struct spdk_fs_cb_args *args = &req->args; 2378 2379 __file_close_async(args->file, req); 2380 } 2381 2382 void 2383 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2384 { 2385 struct spdk_fs_request *req; 2386 struct spdk_fs_cb_args *args; 2387 2388 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2389 if (req == NULL) { 2390 cb_fn(cb_arg, -ENOMEM); 2391 return; 2392 } 2393 2394 args = &req->args; 2395 args->file = file; 2396 args->fn.file_op = cb_fn; 2397 args->arg = cb_arg; 2398 2399 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2400 } 2401 2402 static void 2403 __file_close_done(void *arg, int fserrno) 2404 { 2405 struct spdk_fs_cb_args *args = arg; 2406 2407 args->rc = fserrno; 2408 sem_post(args->sem); 2409 } 2410 2411 static void 2412 __file_close(void *arg) 2413 { 2414 struct spdk_fs_request *req = arg; 2415 struct spdk_fs_cb_args *args = &req->args; 2416 struct spdk_file *file = args->file; 2417 2418 __file_close_async(file, req); 2419 } 2420 2421 int 2422 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2423 { 2424 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2425 struct spdk_fs_request *req; 2426 struct spdk_fs_cb_args *args; 2427 2428 req = alloc_fs_request(channel); 2429 assert(req != NULL); 2430 2431 args = &req->args; 2432 2433 spdk_file_sync(file, _channel); 2434 BLOBFS_TRACE(file, "name=%s\n", file->name); 2435 args->file = file; 2436 args->sem = &channel->sem; 2437 args->fn.file_op = __file_close_done; 2438 args->arg = req; 2439 channel->send_request(__file_close, req); 2440 sem_wait(&channel->sem); 2441 2442 return args->rc; 2443 } 2444 2445 static void 2446 cache_free_buffers(struct spdk_file *file) 2447 { 2448 BLOBFS_TRACE(file, "free=%s\n", file->name); 2449 pthread_spin_lock(&file->lock); 2450 pthread_spin_lock(&g_caches_lock); 2451 if (file->tree->present_mask == 0) { 2452 pthread_spin_unlock(&g_caches_lock); 2453 pthread_spin_unlock(&file->lock); 2454 return; 2455 } 2456 spdk_tree_free_buffers(file->tree); 2457 2458 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2459 /* If not freed, put it in the end of the queue */ 2460 if (file->tree->present_mask != 0) { 2461 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2462 } 2463 file->last = NULL; 2464 pthread_spin_unlock(&g_caches_lock); 2465 pthread_spin_unlock(&file->lock); 2466 } 2467 2468 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2469 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2470