1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk/util.h" 44 #include "spdk_internal/log.h" 45 46 #define BLOBFS_TRACE(file, str, args...) \ 47 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 48 49 #define BLOBFS_TRACE_RW(file, str, args...) \ 50 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 51 52 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 53 54 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 55 static struct spdk_mempool *g_cache_pool; 56 static TAILQ_HEAD(, spdk_file) g_caches; 57 static int g_fs_count = 0; 58 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 59 static pthread_spinlock_t g_caches_lock; 60 61 static void 62 __sem_post(void *arg, int bserrno) 63 { 64 sem_t *sem = arg; 65 66 sem_post(sem); 67 } 68 69 void 70 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 71 { 72 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 73 free(cache_buffer); 74 } 75 76 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 77 78 struct spdk_file { 79 struct spdk_filesystem *fs; 80 struct spdk_blob *blob; 81 char *name; 82 uint64_t length; 83 bool is_deleted; 84 bool open_for_writing; 85 uint64_t length_flushed; 86 uint64_t append_pos; 87 uint64_t seq_byte_count; 88 uint64_t next_seq_offset; 89 uint32_t priority; 90 TAILQ_ENTRY(spdk_file) tailq; 91 spdk_blob_id blobid; 92 uint32_t ref_count; 93 pthread_spinlock_t lock; 94 struct cache_buffer *last; 95 struct cache_tree *tree; 96 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 97 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 98 TAILQ_ENTRY(spdk_file) cache_tailq; 99 }; 100 101 struct spdk_deleted_file { 102 spdk_blob_id id; 103 TAILQ_ENTRY(spdk_deleted_file) tailq; 104 }; 105 106 struct spdk_filesystem { 107 struct spdk_blob_store *bs; 108 TAILQ_HEAD(, spdk_file) files; 109 struct spdk_bs_opts bs_opts; 110 struct spdk_bs_dev *bdev; 111 fs_send_request_fn send_request; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *sync_io_channel; 116 struct spdk_fs_channel *sync_fs_channel; 117 } sync_target; 118 119 struct { 120 uint32_t max_ops; 121 struct spdk_io_channel *md_io_channel; 122 struct spdk_fs_channel *md_fs_channel; 123 } md_target; 124 125 struct { 126 uint32_t max_ops; 127 } io_target; 128 }; 129 130 struct spdk_fs_cb_args { 131 union { 132 spdk_fs_op_with_handle_complete fs_op_with_handle; 133 spdk_fs_op_complete fs_op; 134 spdk_file_op_with_handle_complete file_op_with_handle; 135 spdk_file_op_complete file_op; 136 spdk_file_stat_op_complete stat_op; 137 } fn; 138 void *arg; 139 sem_t *sem; 140 struct spdk_filesystem *fs; 141 struct spdk_file *file; 142 int rc; 143 bool from_request; 144 union { 145 struct { 146 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 147 } fs_load; 148 struct { 149 uint64_t length; 150 } truncate; 151 struct { 152 struct spdk_io_channel *channel; 153 void *user_buf; 154 void *pin_buf; 155 int is_read; 156 off_t offset; 157 size_t length; 158 uint64_t start_page; 159 uint64_t num_pages; 160 uint32_t blocklen; 161 } rw; 162 struct { 163 const char *old_name; 164 const char *new_name; 165 } rename; 166 struct { 167 struct cache_buffer *cache_buffer; 168 uint64_t length; 169 } flush; 170 struct { 171 struct cache_buffer *cache_buffer; 172 uint64_t length; 173 uint64_t offset; 174 } readahead; 175 struct { 176 uint64_t offset; 177 TAILQ_ENTRY(spdk_fs_request) tailq; 178 bool xattr_in_progress; 179 } sync; 180 struct { 181 uint32_t num_clusters; 182 } resize; 183 struct { 184 const char *name; 185 uint32_t flags; 186 TAILQ_ENTRY(spdk_fs_request) tailq; 187 } open; 188 struct { 189 const char *name; 190 } create; 191 struct { 192 const char *name; 193 } delete; 194 struct { 195 const char *name; 196 } stat; 197 } op; 198 }; 199 200 static void cache_free_buffers(struct spdk_file *file); 201 202 static void 203 __initialize_cache(void) 204 { 205 assert(g_cache_pool == NULL); 206 207 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 208 g_fs_cache_size / CACHE_BUFFER_SIZE, 209 CACHE_BUFFER_SIZE, 210 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 211 SPDK_ENV_SOCKET_ID_ANY); 212 TAILQ_INIT(&g_caches); 213 pthread_spin_init(&g_caches_lock, 0); 214 } 215 216 static void 217 __free_cache(void) 218 { 219 assert(g_cache_pool != NULL); 220 221 spdk_mempool_free(g_cache_pool); 222 g_cache_pool = NULL; 223 } 224 225 static uint64_t 226 __file_get_blob_size(struct spdk_file *file) 227 { 228 uint64_t cluster_sz; 229 230 cluster_sz = file->fs->bs_opts.cluster_sz; 231 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 232 } 233 234 struct spdk_fs_request { 235 struct spdk_fs_cb_args args; 236 TAILQ_ENTRY(spdk_fs_request) link; 237 struct spdk_fs_channel *channel; 238 }; 239 240 struct spdk_fs_channel { 241 struct spdk_fs_request *req_mem; 242 TAILQ_HEAD(, spdk_fs_request) reqs; 243 sem_t sem; 244 struct spdk_filesystem *fs; 245 struct spdk_io_channel *bs_channel; 246 fs_send_request_fn send_request; 247 bool sync; 248 pthread_spinlock_t lock; 249 }; 250 251 static struct spdk_fs_request * 252 alloc_fs_request(struct spdk_fs_channel *channel) 253 { 254 struct spdk_fs_request *req; 255 256 if (channel->sync) { 257 pthread_spin_lock(&channel->lock); 258 } 259 260 req = TAILQ_FIRST(&channel->reqs); 261 if (req) { 262 TAILQ_REMOVE(&channel->reqs, req, link); 263 } 264 265 if (channel->sync) { 266 pthread_spin_unlock(&channel->lock); 267 } 268 269 if (req == NULL) { 270 return NULL; 271 } 272 memset(req, 0, sizeof(*req)); 273 req->channel = channel; 274 req->args.from_request = true; 275 276 return req; 277 } 278 279 static void 280 free_fs_request(struct spdk_fs_request *req) 281 { 282 struct spdk_fs_channel *channel = req->channel; 283 284 if (channel->sync) { 285 pthread_spin_lock(&channel->lock); 286 } 287 288 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 289 290 if (channel->sync) { 291 pthread_spin_unlock(&channel->lock); 292 } 293 } 294 295 static int 296 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 297 uint32_t max_ops) 298 { 299 uint32_t i; 300 301 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 302 if (!channel->req_mem) { 303 return -1; 304 } 305 306 TAILQ_INIT(&channel->reqs); 307 sem_init(&channel->sem, 0, 0); 308 309 for (i = 0; i < max_ops; i++) { 310 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 311 } 312 313 channel->fs = fs; 314 315 return 0; 316 } 317 318 static int 319 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 320 { 321 struct spdk_filesystem *fs; 322 struct spdk_fs_channel *channel = ctx_buf; 323 324 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 325 326 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 327 } 328 329 static int 330 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 331 { 332 struct spdk_filesystem *fs; 333 struct spdk_fs_channel *channel = ctx_buf; 334 335 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 336 337 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 338 } 339 340 static int 341 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 342 { 343 struct spdk_filesystem *fs; 344 struct spdk_fs_channel *channel = ctx_buf; 345 346 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 347 348 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 349 } 350 351 static void 352 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 353 { 354 struct spdk_fs_channel *channel = ctx_buf; 355 356 free(channel->req_mem); 357 if (channel->bs_channel != NULL) { 358 spdk_bs_free_io_channel(channel->bs_channel); 359 } 360 } 361 362 static void 363 __send_request_direct(fs_request_fn fn, void *arg) 364 { 365 fn(arg); 366 } 367 368 static void 369 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 370 { 371 fs->bs = bs; 372 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 373 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 374 fs->md_target.md_fs_channel->send_request = __send_request_direct; 375 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 376 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 377 378 pthread_mutex_lock(&g_cache_init_lock); 379 if (g_fs_count == 0) { 380 __initialize_cache(); 381 } 382 g_fs_count++; 383 pthread_mutex_unlock(&g_cache_init_lock); 384 } 385 386 static void 387 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 388 { 389 struct spdk_fs_request *req = ctx; 390 struct spdk_fs_cb_args *args = &req->args; 391 struct spdk_filesystem *fs = args->fs; 392 393 if (bserrno == 0) { 394 common_fs_bs_init(fs, bs); 395 } else { 396 free(fs); 397 fs = NULL; 398 } 399 400 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 401 free_fs_request(req); 402 } 403 404 static struct spdk_filesystem * 405 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 406 { 407 struct spdk_filesystem *fs; 408 409 fs = calloc(1, sizeof(*fs)); 410 if (fs == NULL) { 411 return NULL; 412 } 413 414 fs->bdev = dev; 415 fs->send_request = send_request_fn; 416 TAILQ_INIT(&fs->files); 417 418 fs->md_target.max_ops = 512; 419 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 420 sizeof(struct spdk_fs_channel)); 421 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 422 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 423 424 fs->sync_target.max_ops = 512; 425 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 426 sizeof(struct spdk_fs_channel)); 427 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 428 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 429 430 fs->io_target.max_ops = 512; 431 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 432 sizeof(struct spdk_fs_channel)); 433 434 return fs; 435 } 436 437 void 438 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 439 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 440 { 441 struct spdk_filesystem *fs; 442 struct spdk_fs_request *req; 443 struct spdk_fs_cb_args *args; 444 struct spdk_bs_opts opts = {}; 445 446 fs = fs_alloc(dev, send_request_fn); 447 if (fs == NULL) { 448 cb_fn(cb_arg, NULL, -ENOMEM); 449 return; 450 } 451 452 req = alloc_fs_request(fs->md_target.md_fs_channel); 453 if (req == NULL) { 454 spdk_put_io_channel(fs->md_target.md_io_channel); 455 spdk_io_device_unregister(&fs->md_target, NULL); 456 spdk_put_io_channel(fs->sync_target.sync_io_channel); 457 spdk_io_device_unregister(&fs->sync_target, NULL); 458 spdk_io_device_unregister(&fs->io_target, NULL); 459 free(fs); 460 cb_fn(cb_arg, NULL, -ENOMEM); 461 return; 462 } 463 464 args = &req->args; 465 args->fn.fs_op_with_handle = cb_fn; 466 args->arg = cb_arg; 467 args->fs = fs; 468 469 spdk_bs_opts_init(&opts); 470 strncpy(opts.bstype.bstype, "BLOBFS", SPDK_BLOBSTORE_TYPE_LENGTH); 471 472 spdk_bs_init(dev, &opts, init_cb, req); 473 } 474 475 static struct spdk_file * 476 file_alloc(struct spdk_filesystem *fs) 477 { 478 struct spdk_file *file; 479 480 file = calloc(1, sizeof(*file)); 481 if (file == NULL) { 482 return NULL; 483 } 484 485 file->tree = calloc(1, sizeof(*file->tree)); 486 if (file->tree == NULL) { 487 free(file); 488 return NULL; 489 } 490 491 file->fs = fs; 492 TAILQ_INIT(&file->open_requests); 493 TAILQ_INIT(&file->sync_requests); 494 pthread_spin_init(&file->lock, 0); 495 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 496 file->priority = SPDK_FILE_PRIORITY_LOW; 497 return file; 498 } 499 500 static void iter_delete_cb(void *ctx, int bserrno); 501 502 static int 503 _handle_deleted_files(struct spdk_fs_request *req) 504 { 505 struct spdk_fs_cb_args *args = &req->args; 506 struct spdk_filesystem *fs = args->fs; 507 508 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 509 struct spdk_deleted_file *deleted_file; 510 511 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 512 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 513 spdk_bs_md_delete_blob(fs->bs, deleted_file->id, iter_delete_cb, req); 514 free(deleted_file); 515 return 0; 516 } 517 518 return 1; 519 } 520 521 static void 522 iter_delete_cb(void *ctx, int bserrno) 523 { 524 struct spdk_fs_request *req = ctx; 525 struct spdk_fs_cb_args *args = &req->args; 526 struct spdk_filesystem *fs = args->fs; 527 528 if (_handle_deleted_files(req) == 0) 529 return; 530 531 args->fn.fs_op_with_handle(args->arg, fs, 0); 532 free_fs_request(req); 533 534 } 535 536 static void 537 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 538 { 539 struct spdk_fs_request *req = ctx; 540 struct spdk_fs_cb_args *args = &req->args; 541 struct spdk_filesystem *fs = args->fs; 542 uint64_t *length; 543 const char *name; 544 uint32_t *is_deleted; 545 size_t value_len; 546 547 if (rc == -ENOENT) { 548 /* Finished iterating */ 549 if (_handle_deleted_files(req) == 0) 550 return; 551 args->fn.fs_op_with_handle(args->arg, fs, 0); 552 free_fs_request(req); 553 return; 554 } else if (rc < 0) { 555 args->fn.fs_op_with_handle(args->arg, fs, rc); 556 free_fs_request(req); 557 return; 558 } 559 560 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 561 if (rc < 0) { 562 args->fn.fs_op_with_handle(args->arg, fs, rc); 563 free_fs_request(req); 564 return; 565 } 566 567 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 568 if (rc < 0) { 569 args->fn.fs_op_with_handle(args->arg, fs, rc); 570 free_fs_request(req); 571 return; 572 } 573 574 assert(value_len == 8); 575 576 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 577 rc = spdk_bs_md_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 578 if (rc < 0) { 579 struct spdk_file *f; 580 581 f = file_alloc(fs); 582 if (f == NULL) { 583 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 584 free_fs_request(req); 585 return; 586 } 587 588 f->name = strdup(name); 589 f->blobid = spdk_blob_get_id(blob); 590 f->length = *length; 591 f->length_flushed = *length; 592 f->append_pos = *length; 593 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 594 } else { 595 struct spdk_deleted_file *deleted_file; 596 597 deleted_file = calloc(1, sizeof(*deleted_file)); 598 if (deleted_file == NULL) { 599 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 600 free_fs_request(req); 601 return; 602 } 603 deleted_file->id = spdk_blob_get_id(blob); 604 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 605 } 606 607 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 608 } 609 610 static void 611 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 612 { 613 struct spdk_fs_request *req = ctx; 614 struct spdk_fs_cb_args *args = &req->args; 615 struct spdk_filesystem *fs = args->fs; 616 struct spdk_bs_type bstype; 617 static const char blobfs_type[SPDK_BLOBSTORE_TYPE_LENGTH] = {"BLOBFS"}; 618 static const char zeros[SPDK_BLOBSTORE_TYPE_LENGTH]; 619 620 if (bserrno != 0) { 621 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 622 free_fs_request(req); 623 free(fs); 624 return; 625 } 626 627 bstype = spdk_bs_get_bstype(bs); 628 629 if (!memcmp(&bstype, zeros, SPDK_BLOBSTORE_TYPE_LENGTH)) { 630 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "assigning bstype"); 631 snprintf(bstype.bstype, SPDK_BLOBSTORE_TYPE_LENGTH, blobfs_type); 632 spdk_bs_set_bstype(bs, bstype); 633 } else if (strncmp(bstype.bstype, blobfs_type, SPDK_BLOBSTORE_TYPE_LENGTH)) { 634 SPDK_DEBUGLOG(SPDK_TRACE_BLOB, "not blobfs: %s", bstype.bstype); 635 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 636 free_fs_request(req); 637 free(fs); 638 return; 639 } 640 641 common_fs_bs_init(fs, bs); 642 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 643 } 644 645 void 646 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 647 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 648 { 649 struct spdk_filesystem *fs; 650 struct spdk_fs_cb_args *args; 651 struct spdk_fs_request *req; 652 struct spdk_bs_opts opts = {}; 653 654 fs = fs_alloc(dev, send_request_fn); 655 if (fs == NULL) { 656 cb_fn(cb_arg, NULL, -ENOMEM); 657 return; 658 } 659 660 req = alloc_fs_request(fs->md_target.md_fs_channel); 661 if (req == NULL) { 662 spdk_put_io_channel(fs->md_target.md_io_channel); 663 spdk_io_device_unregister(&fs->md_target, NULL); 664 spdk_put_io_channel(fs->sync_target.sync_io_channel); 665 spdk_io_device_unregister(&fs->sync_target, NULL); 666 spdk_io_device_unregister(&fs->io_target, NULL); 667 free(fs); 668 cb_fn(cb_arg, NULL, -ENOMEM); 669 return; 670 } 671 672 args = &req->args; 673 args->fn.fs_op_with_handle = cb_fn; 674 args->arg = cb_arg; 675 args->fs = fs; 676 TAILQ_INIT(&args->op.fs_load.deleted_files); 677 678 spdk_bs_opts_init(&opts); 679 680 spdk_bs_load(dev, &opts, load_cb, req); 681 } 682 683 static void 684 unload_cb(void *ctx, int bserrno) 685 { 686 struct spdk_fs_request *req = ctx; 687 struct spdk_fs_cb_args *args = &req->args; 688 struct spdk_filesystem *fs = args->fs; 689 690 pthread_mutex_lock(&g_cache_init_lock); 691 g_fs_count--; 692 if (g_fs_count == 0) { 693 __free_cache(); 694 } 695 pthread_mutex_unlock(&g_cache_init_lock); 696 697 args->fn.fs_op(args->arg, bserrno); 698 free(req); 699 700 spdk_io_device_unregister(&fs->io_target, NULL); 701 spdk_io_device_unregister(&fs->sync_target, NULL); 702 spdk_io_device_unregister(&fs->md_target, NULL); 703 704 free(fs); 705 } 706 707 void 708 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 709 { 710 struct spdk_fs_request *req; 711 struct spdk_fs_cb_args *args; 712 713 /* 714 * We must free the md_channel before unloading the blobstore, so just 715 * allocate this request from the general heap. 716 */ 717 req = calloc(1, sizeof(*req)); 718 if (req == NULL) { 719 cb_fn(cb_arg, -ENOMEM); 720 return; 721 } 722 723 args = &req->args; 724 args->fn.fs_op = cb_fn; 725 args->arg = cb_arg; 726 args->fs = fs; 727 728 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 729 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 730 spdk_bs_unload(fs->bs, unload_cb, req); 731 } 732 733 static struct spdk_file * 734 fs_find_file(struct spdk_filesystem *fs, const char *name) 735 { 736 struct spdk_file *file; 737 738 TAILQ_FOREACH(file, &fs->files, tailq) { 739 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 740 return file; 741 } 742 } 743 744 return NULL; 745 } 746 747 void 748 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 749 spdk_file_stat_op_complete cb_fn, void *cb_arg) 750 { 751 struct spdk_file_stat stat; 752 struct spdk_file *f = NULL; 753 754 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 755 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 756 return; 757 } 758 759 f = fs_find_file(fs, name); 760 if (f != NULL) { 761 stat.blobid = f->blobid; 762 stat.size = f->length; 763 cb_fn(cb_arg, &stat, 0); 764 return; 765 } 766 767 cb_fn(cb_arg, NULL, -ENOENT); 768 } 769 770 static void 771 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 772 { 773 struct spdk_fs_request *req = arg; 774 struct spdk_fs_cb_args *args = &req->args; 775 776 args->rc = fserrno; 777 if (fserrno == 0) { 778 memcpy(args->arg, stat, sizeof(*stat)); 779 } 780 sem_post(args->sem); 781 } 782 783 static void 784 __file_stat(void *arg) 785 { 786 struct spdk_fs_request *req = arg; 787 struct spdk_fs_cb_args *args = &req->args; 788 789 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 790 args->fn.stat_op, req); 791 } 792 793 int 794 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 795 const char *name, struct spdk_file_stat *stat) 796 { 797 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 798 struct spdk_fs_request *req; 799 int rc; 800 801 req = alloc_fs_request(channel); 802 assert(req != NULL); 803 804 req->args.fs = fs; 805 req->args.op.stat.name = name; 806 req->args.fn.stat_op = __copy_stat; 807 req->args.arg = stat; 808 req->args.sem = &channel->sem; 809 channel->send_request(__file_stat, req); 810 sem_wait(&channel->sem); 811 812 rc = req->args.rc; 813 free_fs_request(req); 814 815 return rc; 816 } 817 818 static void 819 fs_create_blob_close_cb(void *ctx, int bserrno) 820 { 821 struct spdk_fs_request *req = ctx; 822 struct spdk_fs_cb_args *args = &req->args; 823 824 args->fn.file_op(args->arg, bserrno); 825 free_fs_request(req); 826 } 827 828 static void 829 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 830 { 831 struct spdk_fs_request *req = ctx; 832 struct spdk_fs_cb_args *args = &req->args; 833 struct spdk_file *f = args->file; 834 uint64_t length = 0; 835 836 f->blob = blob; 837 spdk_bs_md_resize_blob(blob, 1); 838 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 839 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 840 841 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 842 } 843 844 static void 845 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 846 { 847 struct spdk_fs_request *req = ctx; 848 struct spdk_fs_cb_args *args = &req->args; 849 struct spdk_file *f = args->file; 850 851 f->blobid = blobid; 852 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 853 } 854 855 void 856 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 857 spdk_file_op_complete cb_fn, void *cb_arg) 858 { 859 struct spdk_file *file; 860 struct spdk_fs_request *req; 861 struct spdk_fs_cb_args *args; 862 863 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 864 cb_fn(cb_arg, -ENAMETOOLONG); 865 return; 866 } 867 868 file = fs_find_file(fs, name); 869 if (file != NULL) { 870 cb_fn(cb_arg, -EEXIST); 871 return; 872 } 873 874 file = file_alloc(fs); 875 if (file == NULL) { 876 cb_fn(cb_arg, -ENOMEM); 877 return; 878 } 879 880 req = alloc_fs_request(fs->md_target.md_fs_channel); 881 if (req == NULL) { 882 cb_fn(cb_arg, -ENOMEM); 883 return; 884 } 885 886 args = &req->args; 887 args->file = file; 888 args->fn.file_op = cb_fn; 889 args->arg = cb_arg; 890 891 file->name = strdup(name); 892 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 893 } 894 895 static void 896 __fs_create_file_done(void *arg, int fserrno) 897 { 898 struct spdk_fs_request *req = arg; 899 struct spdk_fs_cb_args *args = &req->args; 900 901 args->rc = fserrno; 902 sem_post(args->sem); 903 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 904 } 905 906 static void 907 __fs_create_file(void *arg) 908 { 909 struct spdk_fs_request *req = arg; 910 struct spdk_fs_cb_args *args = &req->args; 911 912 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 913 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 914 } 915 916 int 917 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 918 { 919 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 920 struct spdk_fs_request *req; 921 struct spdk_fs_cb_args *args; 922 int rc; 923 924 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 925 926 req = alloc_fs_request(channel); 927 assert(req != NULL); 928 929 args = &req->args; 930 args->fs = fs; 931 args->op.create.name = name; 932 args->sem = &channel->sem; 933 fs->send_request(__fs_create_file, req); 934 sem_wait(&channel->sem); 935 rc = args->rc; 936 free_fs_request(req); 937 938 return rc; 939 } 940 941 static void 942 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 943 { 944 struct spdk_fs_request *req = ctx; 945 struct spdk_fs_cb_args *args = &req->args; 946 struct spdk_file *f = args->file; 947 948 f->blob = blob; 949 while (!TAILQ_EMPTY(&f->open_requests)) { 950 req = TAILQ_FIRST(&f->open_requests); 951 args = &req->args; 952 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 953 args->fn.file_op_with_handle(args->arg, f, bserrno); 954 free_fs_request(req); 955 } 956 } 957 958 static void 959 fs_open_blob_create_cb(void *ctx, int bserrno) 960 { 961 struct spdk_fs_request *req = ctx; 962 struct spdk_fs_cb_args *args = &req->args; 963 struct spdk_file *file = args->file; 964 struct spdk_filesystem *fs = args->fs; 965 966 if (file == NULL) { 967 /* 968 * This is from an open with CREATE flag - the file 969 * is now created so look it up in the file list for this 970 * filesystem. 971 */ 972 file = fs_find_file(fs, args->op.open.name); 973 assert(file != NULL); 974 args->file = file; 975 } 976 977 file->ref_count++; 978 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 979 if (file->ref_count == 1) { 980 assert(file->blob == NULL); 981 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 982 } else if (file->blob != NULL) { 983 fs_open_blob_done(req, file->blob, 0); 984 } else { 985 /* 986 * The blob open for this file is in progress due to a previous 987 * open request. When that open completes, it will invoke the 988 * open callback for this request. 989 */ 990 } 991 } 992 993 void 994 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 995 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 996 { 997 struct spdk_file *f = NULL; 998 struct spdk_fs_request *req; 999 struct spdk_fs_cb_args *args; 1000 1001 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1002 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1003 return; 1004 } 1005 1006 f = fs_find_file(fs, name); 1007 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1008 cb_fn(cb_arg, NULL, -ENOENT); 1009 return; 1010 } 1011 1012 if (f != NULL && f->is_deleted == true) { 1013 cb_fn(cb_arg, NULL, -ENOENT); 1014 return; 1015 } 1016 1017 req = alloc_fs_request(fs->md_target.md_fs_channel); 1018 if (req == NULL) { 1019 cb_fn(cb_arg, NULL, -ENOMEM); 1020 return; 1021 } 1022 1023 args = &req->args; 1024 args->fn.file_op_with_handle = cb_fn; 1025 args->arg = cb_arg; 1026 args->file = f; 1027 args->fs = fs; 1028 args->op.open.name = name; 1029 1030 if (f == NULL) { 1031 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1032 } else { 1033 fs_open_blob_create_cb(req, 0); 1034 } 1035 } 1036 1037 static void 1038 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1039 { 1040 struct spdk_fs_request *req = arg; 1041 struct spdk_fs_cb_args *args = &req->args; 1042 1043 args->file = file; 1044 args->rc = bserrno; 1045 sem_post(args->sem); 1046 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 1047 } 1048 1049 static void 1050 __fs_open_file(void *arg) 1051 { 1052 struct spdk_fs_request *req = arg; 1053 struct spdk_fs_cb_args *args = &req->args; 1054 1055 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 1056 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1057 __fs_open_file_done, req); 1058 } 1059 1060 int 1061 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1062 const char *name, uint32_t flags, struct spdk_file **file) 1063 { 1064 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1065 struct spdk_fs_request *req; 1066 struct spdk_fs_cb_args *args; 1067 int rc; 1068 1069 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1070 1071 req = alloc_fs_request(channel); 1072 assert(req != NULL); 1073 1074 args = &req->args; 1075 args->fs = fs; 1076 args->op.open.name = name; 1077 args->op.open.flags = flags; 1078 args->sem = &channel->sem; 1079 fs->send_request(__fs_open_file, req); 1080 sem_wait(&channel->sem); 1081 rc = args->rc; 1082 if (rc == 0) { 1083 *file = args->file; 1084 } else { 1085 *file = NULL; 1086 } 1087 free_fs_request(req); 1088 1089 return rc; 1090 } 1091 1092 static void 1093 fs_rename_blob_close_cb(void *ctx, int bserrno) 1094 { 1095 struct spdk_fs_request *req = ctx; 1096 struct spdk_fs_cb_args *args = &req->args; 1097 1098 args->fn.fs_op(args->arg, bserrno); 1099 free_fs_request(req); 1100 } 1101 1102 static void 1103 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1104 { 1105 struct spdk_fs_request *req = ctx; 1106 struct spdk_fs_cb_args *args = &req->args; 1107 struct spdk_file *f = args->file; 1108 const char *new_name = args->op.rename.new_name; 1109 1110 f->blob = blob; 1111 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1112 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 1113 } 1114 1115 static void 1116 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1117 { 1118 struct spdk_fs_cb_args *args = &req->args; 1119 struct spdk_file *f; 1120 1121 f = fs_find_file(args->fs, args->op.rename.old_name); 1122 if (f == NULL) { 1123 args->fn.fs_op(args->arg, -ENOENT); 1124 free_fs_request(req); 1125 return; 1126 } 1127 1128 free(f->name); 1129 f->name = strdup(args->op.rename.new_name); 1130 args->file = f; 1131 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1132 } 1133 1134 static void 1135 fs_rename_delete_done(void *arg, int fserrno) 1136 { 1137 __spdk_fs_md_rename_file(arg); 1138 } 1139 1140 void 1141 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1142 const char *old_name, const char *new_name, 1143 spdk_file_op_complete cb_fn, void *cb_arg) 1144 { 1145 struct spdk_file *f; 1146 struct spdk_fs_request *req; 1147 struct spdk_fs_cb_args *args; 1148 1149 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1150 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1151 cb_fn(cb_arg, -ENAMETOOLONG); 1152 return; 1153 } 1154 1155 req = alloc_fs_request(fs->md_target.md_fs_channel); 1156 if (req == NULL) { 1157 cb_fn(cb_arg, -ENOMEM); 1158 return; 1159 } 1160 1161 args = &req->args; 1162 args->fn.fs_op = cb_fn; 1163 args->fs = fs; 1164 args->arg = cb_arg; 1165 args->op.rename.old_name = old_name; 1166 args->op.rename.new_name = new_name; 1167 1168 f = fs_find_file(fs, new_name); 1169 if (f == NULL) { 1170 __spdk_fs_md_rename_file(req); 1171 return; 1172 } 1173 1174 /* 1175 * The rename overwrites an existing file. So delete the existing file, then 1176 * do the actual rename. 1177 */ 1178 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1179 } 1180 1181 static void 1182 __fs_rename_file_done(void *arg, int fserrno) 1183 { 1184 struct spdk_fs_request *req = arg; 1185 struct spdk_fs_cb_args *args = &req->args; 1186 1187 args->rc = fserrno; 1188 sem_post(args->sem); 1189 } 1190 1191 static void 1192 __fs_rename_file(void *arg) 1193 { 1194 struct spdk_fs_request *req = arg; 1195 struct spdk_fs_cb_args *args = &req->args; 1196 1197 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1198 __fs_rename_file_done, req); 1199 } 1200 1201 int 1202 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1203 const char *old_name, const char *new_name) 1204 { 1205 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1206 struct spdk_fs_request *req; 1207 struct spdk_fs_cb_args *args; 1208 int rc; 1209 1210 req = alloc_fs_request(channel); 1211 assert(req != NULL); 1212 1213 args = &req->args; 1214 1215 args->fs = fs; 1216 args->op.rename.old_name = old_name; 1217 args->op.rename.new_name = new_name; 1218 args->sem = &channel->sem; 1219 fs->send_request(__fs_rename_file, req); 1220 sem_wait(&channel->sem); 1221 rc = args->rc; 1222 free_fs_request(req); 1223 return rc; 1224 } 1225 1226 static void 1227 blob_delete_cb(void *ctx, int bserrno) 1228 { 1229 struct spdk_fs_request *req = ctx; 1230 struct spdk_fs_cb_args *args = &req->args; 1231 1232 args->fn.file_op(args->arg, bserrno); 1233 free_fs_request(req); 1234 } 1235 1236 void 1237 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1238 spdk_file_op_complete cb_fn, void *cb_arg) 1239 { 1240 struct spdk_file *f; 1241 spdk_blob_id blobid; 1242 struct spdk_fs_request *req; 1243 struct spdk_fs_cb_args *args; 1244 1245 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1246 1247 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1248 cb_fn(cb_arg, -ENAMETOOLONG); 1249 return; 1250 } 1251 1252 f = fs_find_file(fs, name); 1253 if (f == NULL) { 1254 cb_fn(cb_arg, -ENOENT); 1255 return; 1256 } 1257 1258 req = alloc_fs_request(fs->md_target.md_fs_channel); 1259 if (req == NULL) { 1260 cb_fn(cb_arg, -ENOMEM); 1261 return; 1262 } 1263 1264 args = &req->args; 1265 args->fn.file_op = cb_fn; 1266 args->arg = cb_arg; 1267 1268 if (f->ref_count > 0) { 1269 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1270 f->is_deleted = true; 1271 spdk_blob_md_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1272 spdk_bs_md_sync_blob(f->blob, blob_delete_cb, args); 1273 return; 1274 } 1275 1276 TAILQ_REMOVE(&fs->files, f, tailq); 1277 1278 cache_free_buffers(f); 1279 1280 blobid = f->blobid; 1281 1282 free(f->name); 1283 free(f->tree); 1284 free(f); 1285 1286 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1287 } 1288 1289 static void 1290 __fs_delete_file_done(void *arg, int fserrno) 1291 { 1292 struct spdk_fs_request *req = arg; 1293 struct spdk_fs_cb_args *args = &req->args; 1294 1295 args->rc = fserrno; 1296 sem_post(args->sem); 1297 } 1298 1299 static void 1300 __fs_delete_file(void *arg) 1301 { 1302 struct spdk_fs_request *req = arg; 1303 struct spdk_fs_cb_args *args = &req->args; 1304 1305 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1306 } 1307 1308 int 1309 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1310 const char *name) 1311 { 1312 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1313 struct spdk_fs_request *req; 1314 struct spdk_fs_cb_args *args; 1315 int rc; 1316 1317 req = alloc_fs_request(channel); 1318 assert(req != NULL); 1319 1320 args = &req->args; 1321 args->fs = fs; 1322 args->op.delete.name = name; 1323 args->sem = &channel->sem; 1324 fs->send_request(__fs_delete_file, req); 1325 sem_wait(&channel->sem); 1326 rc = args->rc; 1327 free_fs_request(req); 1328 1329 return rc; 1330 } 1331 1332 spdk_fs_iter 1333 spdk_fs_iter_first(struct spdk_filesystem *fs) 1334 { 1335 struct spdk_file *f; 1336 1337 f = TAILQ_FIRST(&fs->files); 1338 return f; 1339 } 1340 1341 spdk_fs_iter 1342 spdk_fs_iter_next(spdk_fs_iter iter) 1343 { 1344 struct spdk_file *f = iter; 1345 1346 if (f == NULL) { 1347 return NULL; 1348 } 1349 1350 f = TAILQ_NEXT(f, tailq); 1351 return f; 1352 } 1353 1354 const char * 1355 spdk_file_get_name(struct spdk_file *file) 1356 { 1357 return file->name; 1358 } 1359 1360 uint64_t 1361 spdk_file_get_length(struct spdk_file *file) 1362 { 1363 assert(file != NULL); 1364 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1365 return file->length; 1366 } 1367 1368 static void 1369 fs_truncate_complete_cb(void *ctx, int bserrno) 1370 { 1371 struct spdk_fs_request *req = ctx; 1372 struct spdk_fs_cb_args *args = &req->args; 1373 1374 args->fn.file_op(args->arg, bserrno); 1375 free_fs_request(req); 1376 } 1377 1378 static uint64_t 1379 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1380 { 1381 return (length + cluster_sz - 1) / cluster_sz; 1382 } 1383 1384 void 1385 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1386 spdk_file_op_complete cb_fn, void *cb_arg) 1387 { 1388 struct spdk_filesystem *fs; 1389 size_t num_clusters; 1390 struct spdk_fs_request *req; 1391 struct spdk_fs_cb_args *args; 1392 1393 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1394 if (length == file->length) { 1395 cb_fn(cb_arg, 0); 1396 return; 1397 } 1398 1399 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1400 if (req == NULL) { 1401 cb_fn(cb_arg, -ENOMEM); 1402 return; 1403 } 1404 1405 args = &req->args; 1406 args->fn.file_op = cb_fn; 1407 args->arg = cb_arg; 1408 args->file = file; 1409 fs = file->fs; 1410 1411 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1412 1413 spdk_bs_md_resize_blob(file->blob, num_clusters); 1414 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1415 1416 file->length = length; 1417 if (file->append_pos > file->length) { 1418 file->append_pos = file->length; 1419 } 1420 1421 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1422 } 1423 1424 static void 1425 __truncate(void *arg) 1426 { 1427 struct spdk_fs_request *req = arg; 1428 struct spdk_fs_cb_args *args = &req->args; 1429 1430 spdk_file_truncate_async(args->file, args->op.truncate.length, 1431 args->fn.file_op, args->arg); 1432 } 1433 1434 void 1435 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1436 uint64_t length) 1437 { 1438 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1439 struct spdk_fs_request *req; 1440 struct spdk_fs_cb_args *args; 1441 1442 req = alloc_fs_request(channel); 1443 assert(req != NULL); 1444 1445 args = &req->args; 1446 1447 args->file = file; 1448 args->op.truncate.length = length; 1449 args->fn.file_op = __sem_post; 1450 args->arg = &channel->sem; 1451 1452 channel->send_request(__truncate, req); 1453 sem_wait(&channel->sem); 1454 free_fs_request(req); 1455 } 1456 1457 static void 1458 __rw_done(void *ctx, int bserrno) 1459 { 1460 struct spdk_fs_request *req = ctx; 1461 struct spdk_fs_cb_args *args = &req->args; 1462 1463 spdk_dma_free(args->op.rw.pin_buf); 1464 args->fn.file_op(args->arg, bserrno); 1465 free_fs_request(req); 1466 } 1467 1468 static void 1469 __read_done(void *ctx, int bserrno) 1470 { 1471 struct spdk_fs_request *req = ctx; 1472 struct spdk_fs_cb_args *args = &req->args; 1473 1474 if (args->op.rw.is_read) { 1475 memcpy(args->op.rw.user_buf, 1476 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1477 args->op.rw.length); 1478 __rw_done(req, 0); 1479 } else { 1480 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1481 args->op.rw.user_buf, 1482 args->op.rw.length); 1483 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1484 args->op.rw.pin_buf, 1485 args->op.rw.start_page, args->op.rw.num_pages, 1486 __rw_done, req); 1487 } 1488 } 1489 1490 static void 1491 __do_blob_read(void *ctx, int fserrno) 1492 { 1493 struct spdk_fs_request *req = ctx; 1494 struct spdk_fs_cb_args *args = &req->args; 1495 1496 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1497 args->op.rw.pin_buf, 1498 args->op.rw.start_page, args->op.rw.num_pages, 1499 __read_done, req); 1500 } 1501 1502 static void 1503 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1504 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1505 { 1506 uint64_t end_page; 1507 1508 *page_size = spdk_bs_get_page_size(file->fs->bs); 1509 *start_page = offset / *page_size; 1510 end_page = (offset + length - 1) / *page_size; 1511 *num_pages = (end_page - *start_page + 1); 1512 } 1513 1514 static void 1515 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1516 void *payload, uint64_t offset, uint64_t length, 1517 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1518 { 1519 struct spdk_fs_request *req; 1520 struct spdk_fs_cb_args *args; 1521 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1522 uint64_t start_page, num_pages, pin_buf_length; 1523 uint32_t page_size; 1524 1525 if (is_read && offset + length > file->length) { 1526 cb_fn(cb_arg, -EINVAL); 1527 return; 1528 } 1529 1530 req = alloc_fs_request(channel); 1531 if (req == NULL) { 1532 cb_fn(cb_arg, -ENOMEM); 1533 return; 1534 } 1535 1536 args = &req->args; 1537 args->fn.file_op = cb_fn; 1538 args->arg = cb_arg; 1539 args->file = file; 1540 args->op.rw.channel = channel->bs_channel; 1541 args->op.rw.user_buf = payload; 1542 args->op.rw.is_read = is_read; 1543 args->op.rw.offset = offset; 1544 args->op.rw.length = length; 1545 1546 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1547 pin_buf_length = num_pages * page_size; 1548 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1549 1550 args->op.rw.start_page = start_page; 1551 args->op.rw.num_pages = num_pages; 1552 1553 if (!is_read && file->length < offset + length) { 1554 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1555 } else { 1556 __do_blob_read(req, 0); 1557 } 1558 } 1559 1560 void 1561 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1562 void *payload, uint64_t offset, uint64_t length, 1563 spdk_file_op_complete cb_fn, void *cb_arg) 1564 { 1565 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1566 } 1567 1568 void 1569 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1570 void *payload, uint64_t offset, uint64_t length, 1571 spdk_file_op_complete cb_fn, void *cb_arg) 1572 { 1573 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1574 file->name, offset, length); 1575 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1576 } 1577 1578 struct spdk_io_channel * 1579 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1580 { 1581 struct spdk_io_channel *io_channel; 1582 struct spdk_fs_channel *fs_channel; 1583 1584 io_channel = spdk_get_io_channel(&fs->io_target); 1585 fs_channel = spdk_io_channel_get_ctx(io_channel); 1586 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1587 fs_channel->send_request = __send_request_direct; 1588 1589 return io_channel; 1590 } 1591 1592 struct spdk_io_channel * 1593 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1594 { 1595 struct spdk_io_channel *io_channel; 1596 struct spdk_fs_channel *fs_channel; 1597 1598 io_channel = spdk_get_io_channel(&fs->io_target); 1599 fs_channel = spdk_io_channel_get_ctx(io_channel); 1600 fs_channel->send_request = fs->send_request; 1601 fs_channel->sync = 1; 1602 pthread_spin_init(&fs_channel->lock, 0); 1603 1604 return io_channel; 1605 } 1606 1607 void 1608 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1609 { 1610 spdk_put_io_channel(channel); 1611 } 1612 1613 void 1614 spdk_fs_set_cache_size(uint64_t size_in_mb) 1615 { 1616 g_fs_cache_size = size_in_mb * 1024 * 1024; 1617 } 1618 1619 uint64_t 1620 spdk_fs_get_cache_size(void) 1621 { 1622 return g_fs_cache_size / (1024 * 1024); 1623 } 1624 1625 static void __file_flush(void *_args); 1626 1627 static void * 1628 alloc_cache_memory_buffer(struct spdk_file *context) 1629 { 1630 struct spdk_file *file; 1631 void *buf; 1632 1633 buf = spdk_mempool_get(g_cache_pool); 1634 if (buf != NULL) { 1635 return buf; 1636 } 1637 1638 pthread_spin_lock(&g_caches_lock); 1639 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1640 if (!file->open_for_writing && 1641 file->priority == SPDK_FILE_PRIORITY_LOW && 1642 file != context) { 1643 break; 1644 } 1645 } 1646 pthread_spin_unlock(&g_caches_lock); 1647 if (file != NULL) { 1648 cache_free_buffers(file); 1649 buf = spdk_mempool_get(g_cache_pool); 1650 if (buf != NULL) { 1651 return buf; 1652 } 1653 } 1654 1655 pthread_spin_lock(&g_caches_lock); 1656 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1657 if (!file->open_for_writing && file != context) { 1658 break; 1659 } 1660 } 1661 pthread_spin_unlock(&g_caches_lock); 1662 if (file != NULL) { 1663 cache_free_buffers(file); 1664 buf = spdk_mempool_get(g_cache_pool); 1665 if (buf != NULL) { 1666 return buf; 1667 } 1668 } 1669 1670 pthread_spin_lock(&g_caches_lock); 1671 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1672 if (file != context) { 1673 break; 1674 } 1675 } 1676 pthread_spin_unlock(&g_caches_lock); 1677 if (file != NULL) { 1678 cache_free_buffers(file); 1679 buf = spdk_mempool_get(g_cache_pool); 1680 if (buf != NULL) { 1681 return buf; 1682 } 1683 } 1684 1685 return NULL; 1686 } 1687 1688 static struct cache_buffer * 1689 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1690 { 1691 struct cache_buffer *buf; 1692 int count = 0; 1693 1694 buf = calloc(1, sizeof(*buf)); 1695 if (buf == NULL) { 1696 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1697 return NULL; 1698 } 1699 1700 buf->buf = alloc_cache_memory_buffer(file); 1701 while (buf->buf == NULL) { 1702 /* 1703 * TODO: alloc_cache_memory_buffer() should eventually free 1704 * some buffers. Need a more sophisticated check here, instead 1705 * of just bailing if 100 tries does not result in getting a 1706 * free buffer. This will involve using the sync channel's 1707 * semaphore to block until a buffer becomes available. 1708 */ 1709 if (count++ == 100) { 1710 SPDK_ERRLOG("could not allocate cache buffer\n"); 1711 assert(false); 1712 free(buf); 1713 return NULL; 1714 } 1715 buf->buf = alloc_cache_memory_buffer(file); 1716 } 1717 1718 buf->buf_size = CACHE_BUFFER_SIZE; 1719 buf->offset = offset; 1720 1721 pthread_spin_lock(&g_caches_lock); 1722 if (file->tree->present_mask == 0) { 1723 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1724 } 1725 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1726 pthread_spin_unlock(&g_caches_lock); 1727 1728 return buf; 1729 } 1730 1731 static struct cache_buffer * 1732 cache_append_buffer(struct spdk_file *file) 1733 { 1734 struct cache_buffer *last; 1735 1736 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1737 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1738 1739 last = cache_insert_buffer(file, file->append_pos); 1740 if (last == NULL) { 1741 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1742 return NULL; 1743 } 1744 1745 file->last = last; 1746 1747 return last; 1748 } 1749 1750 static void 1751 __wake_caller(struct spdk_fs_cb_args *args) 1752 { 1753 sem_post(args->sem); 1754 } 1755 1756 static void __check_sync_reqs(struct spdk_file *file); 1757 1758 static void 1759 __file_cache_finish_sync(struct spdk_file *file) 1760 { 1761 struct spdk_fs_request *sync_req; 1762 struct spdk_fs_cb_args *sync_args; 1763 1764 pthread_spin_lock(&file->lock); 1765 sync_req = TAILQ_FIRST(&file->sync_requests); 1766 sync_args = &sync_req->args; 1767 assert(sync_args->op.sync.offset <= file->length_flushed); 1768 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1769 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1770 pthread_spin_unlock(&file->lock); 1771 1772 sync_args->fn.file_op(sync_args->arg, 0); 1773 __check_sync_reqs(file); 1774 1775 pthread_spin_lock(&file->lock); 1776 free_fs_request(sync_req); 1777 pthread_spin_unlock(&file->lock); 1778 } 1779 1780 static void 1781 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1782 { 1783 struct spdk_file *file = ctx; 1784 1785 __file_cache_finish_sync(file); 1786 } 1787 1788 static void 1789 __free_args(struct spdk_fs_cb_args *args) 1790 { 1791 struct spdk_fs_request *req; 1792 1793 if (!args->from_request) { 1794 free(args); 1795 } else { 1796 /* Depends on args being at the start of the spdk_fs_request structure. */ 1797 req = (struct spdk_fs_request *)args; 1798 free_fs_request(req); 1799 } 1800 } 1801 1802 static void 1803 __check_sync_reqs(struct spdk_file *file) 1804 { 1805 struct spdk_fs_request *sync_req; 1806 1807 pthread_spin_lock(&file->lock); 1808 1809 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1810 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1811 break; 1812 } 1813 } 1814 1815 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1816 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1817 sync_req->args.op.sync.xattr_in_progress = true; 1818 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1819 sizeof(file->length_flushed)); 1820 1821 pthread_spin_unlock(&file->lock); 1822 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1823 } else { 1824 pthread_spin_unlock(&file->lock); 1825 } 1826 } 1827 1828 static void 1829 __file_flush_done(void *arg, int bserrno) 1830 { 1831 struct spdk_fs_cb_args *args = arg; 1832 struct spdk_file *file = args->file; 1833 struct cache_buffer *next = args->op.flush.cache_buffer; 1834 1835 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1836 1837 pthread_spin_lock(&file->lock); 1838 next->in_progress = false; 1839 next->bytes_flushed += args->op.flush.length; 1840 file->length_flushed += args->op.flush.length; 1841 if (file->length_flushed > file->length) { 1842 file->length = file->length_flushed; 1843 } 1844 if (next->bytes_flushed == next->buf_size) { 1845 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1846 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1847 } 1848 1849 /* 1850 * Assert that there is no cached data that extends past the end of the underlying 1851 * blob. 1852 */ 1853 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1854 next->bytes_filled == 0); 1855 1856 pthread_spin_unlock(&file->lock); 1857 1858 __check_sync_reqs(file); 1859 1860 __file_flush(args); 1861 } 1862 1863 static void 1864 __file_flush(void *_args) 1865 { 1866 struct spdk_fs_cb_args *args = _args; 1867 struct spdk_file *file = args->file; 1868 struct cache_buffer *next; 1869 uint64_t offset, length, start_page, num_pages; 1870 uint32_t page_size; 1871 1872 pthread_spin_lock(&file->lock); 1873 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1874 if (next == NULL || next->in_progress) { 1875 /* 1876 * There is either no data to flush, or a flush I/O is already in 1877 * progress. So return immediately - if a flush I/O is in 1878 * progress we will flush more data after that is completed. 1879 */ 1880 __free_args(args); 1881 pthread_spin_unlock(&file->lock); 1882 return; 1883 } 1884 1885 offset = next->offset + next->bytes_flushed; 1886 length = next->bytes_filled - next->bytes_flushed; 1887 if (length == 0) { 1888 __free_args(args); 1889 pthread_spin_unlock(&file->lock); 1890 return; 1891 } 1892 args->op.flush.length = length; 1893 args->op.flush.cache_buffer = next; 1894 1895 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1896 1897 next->in_progress = true; 1898 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1899 offset, length, start_page, num_pages); 1900 pthread_spin_unlock(&file->lock); 1901 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1902 next->buf + (start_page * page_size) - next->offset, 1903 start_page, num_pages, 1904 __file_flush_done, args); 1905 } 1906 1907 static void 1908 __file_extend_done(void *arg, int bserrno) 1909 { 1910 struct spdk_fs_cb_args *args = arg; 1911 1912 __wake_caller(args); 1913 } 1914 1915 static void 1916 __file_extend_blob(void *_args) 1917 { 1918 struct spdk_fs_cb_args *args = _args; 1919 struct spdk_file *file = args->file; 1920 1921 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1922 1923 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1924 } 1925 1926 static void 1927 __rw_from_file_done(void *arg, int bserrno) 1928 { 1929 struct spdk_fs_cb_args *args = arg; 1930 1931 __wake_caller(args); 1932 __free_args(args); 1933 } 1934 1935 static void 1936 __rw_from_file(void *_args) 1937 { 1938 struct spdk_fs_cb_args *args = _args; 1939 struct spdk_file *file = args->file; 1940 1941 if (args->op.rw.is_read) { 1942 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1943 args->op.rw.offset, args->op.rw.length, 1944 __rw_from_file_done, args); 1945 } else { 1946 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1947 args->op.rw.offset, args->op.rw.length, 1948 __rw_from_file_done, args); 1949 } 1950 } 1951 1952 static int 1953 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1954 uint64_t offset, uint64_t length, bool is_read) 1955 { 1956 struct spdk_fs_cb_args *args; 1957 1958 args = calloc(1, sizeof(*args)); 1959 if (args == NULL) { 1960 sem_post(sem); 1961 return -ENOMEM; 1962 } 1963 1964 args->file = file; 1965 args->sem = sem; 1966 args->op.rw.user_buf = payload; 1967 args->op.rw.offset = offset; 1968 args->op.rw.length = length; 1969 args->op.rw.is_read = is_read; 1970 file->fs->send_request(__rw_from_file, args); 1971 return 0; 1972 } 1973 1974 int 1975 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1976 void *payload, uint64_t offset, uint64_t length) 1977 { 1978 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1979 struct spdk_fs_cb_args *args; 1980 uint64_t rem_length, copy, blob_size, cluster_sz; 1981 uint32_t cache_buffers_filled = 0; 1982 uint8_t *cur_payload; 1983 struct cache_buffer *last; 1984 1985 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1986 1987 if (length == 0) { 1988 return 0; 1989 } 1990 1991 if (offset != file->append_pos) { 1992 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1993 return -EINVAL; 1994 } 1995 1996 pthread_spin_lock(&file->lock); 1997 file->open_for_writing = true; 1998 1999 if (file->last == NULL) { 2000 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2001 cache_append_buffer(file); 2002 } else { 2003 int rc; 2004 2005 file->append_pos += length; 2006 pthread_spin_unlock(&file->lock); 2007 rc = __send_rw_from_file(file, &channel->sem, payload, 2008 offset, length, false); 2009 sem_wait(&channel->sem); 2010 return rc; 2011 } 2012 } 2013 2014 blob_size = __file_get_blob_size(file); 2015 2016 if ((offset + length) > blob_size) { 2017 struct spdk_fs_cb_args extend_args = {}; 2018 2019 cluster_sz = file->fs->bs_opts.cluster_sz; 2020 extend_args.sem = &channel->sem; 2021 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2022 extend_args.file = file; 2023 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2024 pthread_spin_unlock(&file->lock); 2025 file->fs->send_request(__file_extend_blob, &extend_args); 2026 sem_wait(&channel->sem); 2027 } 2028 2029 last = file->last; 2030 rem_length = length; 2031 cur_payload = payload; 2032 while (rem_length > 0) { 2033 copy = last->buf_size - last->bytes_filled; 2034 if (copy > rem_length) { 2035 copy = rem_length; 2036 } 2037 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2038 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2039 file->append_pos += copy; 2040 if (file->length < file->append_pos) { 2041 file->length = file->append_pos; 2042 } 2043 cur_payload += copy; 2044 last->bytes_filled += copy; 2045 rem_length -= copy; 2046 if (last->bytes_filled == last->buf_size) { 2047 cache_buffers_filled++; 2048 last = cache_append_buffer(file); 2049 if (last == NULL) { 2050 BLOBFS_TRACE(file, "nomem\n"); 2051 pthread_spin_unlock(&file->lock); 2052 return -ENOMEM; 2053 } 2054 } 2055 } 2056 2057 if (cache_buffers_filled == 0) { 2058 pthread_spin_unlock(&file->lock); 2059 return 0; 2060 } 2061 2062 args = calloc(1, sizeof(*args)); 2063 if (args == NULL) { 2064 pthread_spin_unlock(&file->lock); 2065 return -ENOMEM; 2066 } 2067 2068 args->file = file; 2069 file->fs->send_request(__file_flush, args); 2070 pthread_spin_unlock(&file->lock); 2071 return 0; 2072 } 2073 2074 static void 2075 __readahead_done(void *arg, int bserrno) 2076 { 2077 struct spdk_fs_cb_args *args = arg; 2078 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2079 struct spdk_file *file = args->file; 2080 2081 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2082 2083 pthread_spin_lock(&file->lock); 2084 cache_buffer->bytes_filled = args->op.readahead.length; 2085 cache_buffer->bytes_flushed = args->op.readahead.length; 2086 cache_buffer->in_progress = false; 2087 pthread_spin_unlock(&file->lock); 2088 2089 __free_args(args); 2090 } 2091 2092 static void 2093 __readahead(void *_args) 2094 { 2095 struct spdk_fs_cb_args *args = _args; 2096 struct spdk_file *file = args->file; 2097 uint64_t offset, length, start_page, num_pages; 2098 uint32_t page_size; 2099 2100 offset = args->op.readahead.offset; 2101 length = args->op.readahead.length; 2102 assert(length > 0); 2103 2104 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2105 2106 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2107 offset, length, start_page, num_pages); 2108 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2109 args->op.readahead.cache_buffer->buf, 2110 start_page, num_pages, 2111 __readahead_done, args); 2112 } 2113 2114 static uint64_t 2115 __next_cache_buffer_offset(uint64_t offset) 2116 { 2117 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2118 } 2119 2120 static void 2121 check_readahead(struct spdk_file *file, uint64_t offset) 2122 { 2123 struct spdk_fs_cb_args *args; 2124 2125 offset = __next_cache_buffer_offset(offset); 2126 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2127 return; 2128 } 2129 2130 args = calloc(1, sizeof(*args)); 2131 if (args == NULL) { 2132 return; 2133 } 2134 2135 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2136 2137 args->file = file; 2138 args->op.readahead.offset = offset; 2139 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2140 args->op.readahead.cache_buffer->in_progress = true; 2141 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2142 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2143 } else { 2144 args->op.readahead.length = CACHE_BUFFER_SIZE; 2145 } 2146 file->fs->send_request(__readahead, args); 2147 } 2148 2149 static int 2150 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2151 { 2152 struct cache_buffer *buf; 2153 int rc; 2154 2155 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2156 if (buf == NULL) { 2157 pthread_spin_unlock(&file->lock); 2158 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2159 pthread_spin_lock(&file->lock); 2160 return rc; 2161 } 2162 2163 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2164 length = buf->offset + buf->bytes_filled - offset; 2165 } 2166 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2167 memcpy(payload, &buf->buf[offset - buf->offset], length); 2168 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2169 pthread_spin_lock(&g_caches_lock); 2170 spdk_tree_remove_buffer(file->tree, buf); 2171 if (file->tree->present_mask == 0) { 2172 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2173 } 2174 pthread_spin_unlock(&g_caches_lock); 2175 } 2176 2177 sem_post(sem); 2178 return 0; 2179 } 2180 2181 int64_t 2182 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2183 void *payload, uint64_t offset, uint64_t length) 2184 { 2185 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2186 uint64_t final_offset, final_length; 2187 uint32_t sub_reads = 0; 2188 int rc = 0; 2189 2190 pthread_spin_lock(&file->lock); 2191 2192 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2193 2194 file->open_for_writing = false; 2195 2196 if (length == 0 || offset >= file->append_pos) { 2197 pthread_spin_unlock(&file->lock); 2198 return 0; 2199 } 2200 2201 if (offset + length > file->append_pos) { 2202 length = file->append_pos - offset; 2203 } 2204 2205 if (offset != file->next_seq_offset) { 2206 file->seq_byte_count = 0; 2207 } 2208 file->seq_byte_count += length; 2209 file->next_seq_offset = offset + length; 2210 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2211 check_readahead(file, offset); 2212 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2213 } 2214 2215 final_length = 0; 2216 final_offset = offset + length; 2217 while (offset < final_offset) { 2218 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2219 if (length > (final_offset - offset)) { 2220 length = final_offset - offset; 2221 } 2222 rc = __file_read(file, payload, offset, length, &channel->sem); 2223 if (rc == 0) { 2224 final_length += length; 2225 } else { 2226 break; 2227 } 2228 payload += length; 2229 offset += length; 2230 sub_reads++; 2231 } 2232 pthread_spin_unlock(&file->lock); 2233 while (sub_reads-- > 0) { 2234 sem_wait(&channel->sem); 2235 } 2236 if (rc == 0) { 2237 return final_length; 2238 } else { 2239 return rc; 2240 } 2241 } 2242 2243 static void 2244 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2245 spdk_file_op_complete cb_fn, void *cb_arg) 2246 { 2247 struct spdk_fs_request *sync_req; 2248 struct spdk_fs_request *flush_req; 2249 struct spdk_fs_cb_args *sync_args; 2250 struct spdk_fs_cb_args *flush_args; 2251 2252 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2253 2254 pthread_spin_lock(&file->lock); 2255 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2256 BLOBFS_TRACE(file, "done - no data to flush\n"); 2257 pthread_spin_unlock(&file->lock); 2258 cb_fn(cb_arg, 0); 2259 return; 2260 } 2261 2262 sync_req = alloc_fs_request(channel); 2263 assert(sync_req != NULL); 2264 sync_args = &sync_req->args; 2265 2266 flush_req = alloc_fs_request(channel); 2267 assert(flush_req != NULL); 2268 flush_args = &flush_req->args; 2269 2270 sync_args->file = file; 2271 sync_args->fn.file_op = cb_fn; 2272 sync_args->arg = cb_arg; 2273 sync_args->op.sync.offset = file->append_pos; 2274 sync_args->op.sync.xattr_in_progress = false; 2275 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2276 pthread_spin_unlock(&file->lock); 2277 2278 flush_args->file = file; 2279 channel->send_request(__file_flush, flush_args); 2280 } 2281 2282 int 2283 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2284 { 2285 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2286 2287 _file_sync(file, channel, __sem_post, &channel->sem); 2288 sem_wait(&channel->sem); 2289 2290 return 0; 2291 } 2292 2293 void 2294 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2295 spdk_file_op_complete cb_fn, void *cb_arg) 2296 { 2297 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2298 2299 _file_sync(file, channel, cb_fn, cb_arg); 2300 } 2301 2302 void 2303 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2304 { 2305 BLOBFS_TRACE(file, "priority=%u\n", priority); 2306 file->priority = priority; 2307 2308 } 2309 2310 /* 2311 * Close routines 2312 */ 2313 2314 static void 2315 __file_close_async_done(void *ctx, int bserrno) 2316 { 2317 struct spdk_fs_request *req = ctx; 2318 struct spdk_fs_cb_args *args = &req->args; 2319 struct spdk_file *file = args->file; 2320 2321 if (file->is_deleted) { 2322 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2323 return; 2324 } 2325 args->fn.file_op(args->arg, bserrno); 2326 free_fs_request(req); 2327 } 2328 2329 static void 2330 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2331 { 2332 pthread_spin_lock(&file->lock); 2333 if (file->ref_count == 0) { 2334 pthread_spin_unlock(&file->lock); 2335 __file_close_async_done(req, -EBADF); 2336 return; 2337 } 2338 2339 file->ref_count--; 2340 if (file->ref_count > 0) { 2341 pthread_spin_unlock(&file->lock); 2342 __file_close_async_done(req, 0); 2343 return; 2344 } 2345 2346 pthread_spin_unlock(&file->lock); 2347 2348 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2349 } 2350 2351 static void 2352 __file_close_async__sync_done(void *arg, int fserrno) 2353 { 2354 struct spdk_fs_request *req = arg; 2355 struct spdk_fs_cb_args *args = &req->args; 2356 2357 __file_close_async(args->file, req); 2358 } 2359 2360 void 2361 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2362 { 2363 struct spdk_fs_request *req; 2364 struct spdk_fs_cb_args *args; 2365 2366 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2367 if (req == NULL) { 2368 cb_fn(cb_arg, -ENOMEM); 2369 return; 2370 } 2371 2372 args = &req->args; 2373 args->file = file; 2374 args->fn.file_op = cb_fn; 2375 args->arg = cb_arg; 2376 2377 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2378 } 2379 2380 static void 2381 __file_close_done(void *arg, int fserrno) 2382 { 2383 struct spdk_fs_cb_args *args = arg; 2384 2385 args->rc = fserrno; 2386 sem_post(args->sem); 2387 } 2388 2389 static void 2390 __file_close(void *arg) 2391 { 2392 struct spdk_fs_request *req = arg; 2393 struct spdk_fs_cb_args *args = &req->args; 2394 struct spdk_file *file = args->file; 2395 2396 __file_close_async(file, req); 2397 } 2398 2399 int 2400 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2401 { 2402 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2403 struct spdk_fs_request *req; 2404 struct spdk_fs_cb_args *args; 2405 2406 req = alloc_fs_request(channel); 2407 assert(req != NULL); 2408 2409 args = &req->args; 2410 2411 spdk_file_sync(file, _channel); 2412 BLOBFS_TRACE(file, "name=%s\n", file->name); 2413 args->file = file; 2414 args->sem = &channel->sem; 2415 args->fn.file_op = __file_close_done; 2416 args->arg = req; 2417 channel->send_request(__file_close, req); 2418 sem_wait(&channel->sem); 2419 2420 return args->rc; 2421 } 2422 2423 static void 2424 cache_free_buffers(struct spdk_file *file) 2425 { 2426 BLOBFS_TRACE(file, "free=%s\n", file->name); 2427 pthread_spin_lock(&file->lock); 2428 pthread_spin_lock(&g_caches_lock); 2429 if (file->tree->present_mask == 0) { 2430 pthread_spin_unlock(&g_caches_lock); 2431 pthread_spin_unlock(&file->lock); 2432 return; 2433 } 2434 spdk_tree_free_buffers(file->tree); 2435 2436 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2437 /* If not freed, put it in the end of the queue */ 2438 if (file->tree->present_mask != 0) { 2439 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2440 } 2441 file->last = NULL; 2442 pthread_spin_unlock(&g_caches_lock); 2443 pthread_spin_unlock(&file->lock); 2444 } 2445 2446 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2447 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2448