1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 static void 64 __sem_post(void *arg, int bserrno) 65 { 66 sem_t *sem = arg; 67 68 sem_post(sem); 69 } 70 71 void 72 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 73 { 74 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 75 free(cache_buffer); 76 } 77 78 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 79 80 struct spdk_file { 81 struct spdk_filesystem *fs; 82 struct spdk_blob *blob; 83 char *name; 84 uint64_t length; 85 bool is_deleted; 86 bool open_for_writing; 87 uint64_t length_flushed; 88 uint64_t append_pos; 89 uint64_t seq_byte_count; 90 uint64_t next_seq_offset; 91 uint32_t priority; 92 TAILQ_ENTRY(spdk_file) tailq; 93 spdk_blob_id blobid; 94 uint32_t ref_count; 95 pthread_spinlock_t lock; 96 struct cache_buffer *last; 97 struct cache_tree *tree; 98 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 99 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 100 TAILQ_ENTRY(spdk_file) cache_tailq; 101 }; 102 103 struct spdk_deleted_file { 104 spdk_blob_id id; 105 TAILQ_ENTRY(spdk_deleted_file) tailq; 106 }; 107 108 struct spdk_filesystem { 109 struct spdk_blob_store *bs; 110 TAILQ_HEAD(, spdk_file) files; 111 struct spdk_bs_opts bs_opts; 112 struct spdk_bs_dev *bdev; 113 fs_send_request_fn send_request; 114 115 struct { 116 uint32_t max_ops; 117 struct spdk_io_channel *sync_io_channel; 118 struct spdk_fs_channel *sync_fs_channel; 119 } sync_target; 120 121 struct { 122 uint32_t max_ops; 123 struct spdk_io_channel *md_io_channel; 124 struct spdk_fs_channel *md_fs_channel; 125 } md_target; 126 127 struct { 128 uint32_t max_ops; 129 } io_target; 130 }; 131 132 struct spdk_fs_cb_args { 133 union { 134 spdk_fs_op_with_handle_complete fs_op_with_handle; 135 spdk_fs_op_complete fs_op; 136 spdk_file_op_with_handle_complete file_op_with_handle; 137 spdk_file_op_complete file_op; 138 spdk_file_stat_op_complete stat_op; 139 } fn; 140 void *arg; 141 sem_t *sem; 142 struct spdk_filesystem *fs; 143 struct spdk_file *file; 144 int rc; 145 bool from_request; 146 union { 147 struct { 148 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 149 } fs_load; 150 struct { 151 uint64_t length; 152 } truncate; 153 struct { 154 struct spdk_io_channel *channel; 155 void *user_buf; 156 void *pin_buf; 157 int is_read; 158 off_t offset; 159 size_t length; 160 uint64_t start_page; 161 uint64_t num_pages; 162 uint32_t blocklen; 163 } rw; 164 struct { 165 const char *old_name; 166 const char *new_name; 167 } rename; 168 struct { 169 struct cache_buffer *cache_buffer; 170 uint64_t length; 171 } flush; 172 struct { 173 struct cache_buffer *cache_buffer; 174 uint64_t length; 175 uint64_t offset; 176 } readahead; 177 struct { 178 uint64_t offset; 179 TAILQ_ENTRY(spdk_fs_request) tailq; 180 bool xattr_in_progress; 181 } sync; 182 struct { 183 uint32_t num_clusters; 184 } resize; 185 struct { 186 const char *name; 187 uint32_t flags; 188 TAILQ_ENTRY(spdk_fs_request) tailq; 189 } open; 190 struct { 191 const char *name; 192 struct spdk_blob *blob; 193 } create; 194 struct { 195 const char *name; 196 } delete; 197 struct { 198 const char *name; 199 } stat; 200 } op; 201 }; 202 203 static void cache_free_buffers(struct spdk_file *file); 204 205 void 206 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 207 { 208 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 209 } 210 211 static void 212 __initialize_cache(void) 213 { 214 assert(g_cache_pool == NULL); 215 216 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 217 g_fs_cache_size / CACHE_BUFFER_SIZE, 218 CACHE_BUFFER_SIZE, 219 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 220 SPDK_ENV_SOCKET_ID_ANY); 221 TAILQ_INIT(&g_caches); 222 pthread_spin_init(&g_caches_lock, 0); 223 } 224 225 static void 226 __free_cache(void) 227 { 228 assert(g_cache_pool != NULL); 229 230 spdk_mempool_free(g_cache_pool); 231 g_cache_pool = NULL; 232 } 233 234 static uint64_t 235 __file_get_blob_size(struct spdk_file *file) 236 { 237 uint64_t cluster_sz; 238 239 cluster_sz = file->fs->bs_opts.cluster_sz; 240 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 241 } 242 243 struct spdk_fs_request { 244 struct spdk_fs_cb_args args; 245 TAILQ_ENTRY(spdk_fs_request) link; 246 struct spdk_fs_channel *channel; 247 }; 248 249 struct spdk_fs_channel { 250 struct spdk_fs_request *req_mem; 251 TAILQ_HEAD(, spdk_fs_request) reqs; 252 sem_t sem; 253 struct spdk_filesystem *fs; 254 struct spdk_io_channel *bs_channel; 255 fs_send_request_fn send_request; 256 bool sync; 257 pthread_spinlock_t lock; 258 }; 259 260 static struct spdk_fs_request * 261 alloc_fs_request(struct spdk_fs_channel *channel) 262 { 263 struct spdk_fs_request *req; 264 265 if (channel->sync) { 266 pthread_spin_lock(&channel->lock); 267 } 268 269 req = TAILQ_FIRST(&channel->reqs); 270 if (req) { 271 TAILQ_REMOVE(&channel->reqs, req, link); 272 } 273 274 if (channel->sync) { 275 pthread_spin_unlock(&channel->lock); 276 } 277 278 if (req == NULL) { 279 return NULL; 280 } 281 memset(req, 0, sizeof(*req)); 282 req->channel = channel; 283 req->args.from_request = true; 284 285 return req; 286 } 287 288 static void 289 free_fs_request(struct spdk_fs_request *req) 290 { 291 struct spdk_fs_channel *channel = req->channel; 292 293 if (channel->sync) { 294 pthread_spin_lock(&channel->lock); 295 } 296 297 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 298 299 if (channel->sync) { 300 pthread_spin_unlock(&channel->lock); 301 } 302 } 303 304 static int 305 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 306 uint32_t max_ops) 307 { 308 uint32_t i; 309 310 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 311 if (!channel->req_mem) { 312 return -1; 313 } 314 315 TAILQ_INIT(&channel->reqs); 316 sem_init(&channel->sem, 0, 0); 317 318 for (i = 0; i < max_ops; i++) { 319 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 320 } 321 322 channel->fs = fs; 323 324 return 0; 325 } 326 327 static int 328 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 329 { 330 struct spdk_filesystem *fs; 331 struct spdk_fs_channel *channel = ctx_buf; 332 333 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 334 335 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 336 } 337 338 static int 339 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 340 { 341 struct spdk_filesystem *fs; 342 struct spdk_fs_channel *channel = ctx_buf; 343 344 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 345 346 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 347 } 348 349 static int 350 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 351 { 352 struct spdk_filesystem *fs; 353 struct spdk_fs_channel *channel = ctx_buf; 354 355 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 356 357 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 358 } 359 360 static void 361 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 362 { 363 struct spdk_fs_channel *channel = ctx_buf; 364 365 free(channel->req_mem); 366 if (channel->bs_channel != NULL) { 367 spdk_bs_free_io_channel(channel->bs_channel); 368 } 369 } 370 371 static void 372 __send_request_direct(fs_request_fn fn, void *arg) 373 { 374 fn(arg); 375 } 376 377 static void 378 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 379 { 380 fs->bs = bs; 381 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 382 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 383 fs->md_target.md_fs_channel->send_request = __send_request_direct; 384 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 385 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 386 387 pthread_mutex_lock(&g_cache_init_lock); 388 if (g_fs_count == 0) { 389 __initialize_cache(); 390 } 391 g_fs_count++; 392 pthread_mutex_unlock(&g_cache_init_lock); 393 } 394 395 static void 396 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 397 { 398 struct spdk_fs_request *req = ctx; 399 struct spdk_fs_cb_args *args = &req->args; 400 struct spdk_filesystem *fs = args->fs; 401 402 if (bserrno == 0) { 403 common_fs_bs_init(fs, bs); 404 } else { 405 free(fs); 406 fs = NULL; 407 } 408 409 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 410 free_fs_request(req); 411 } 412 413 static void 414 fs_conf_parse(void) 415 { 416 struct spdk_conf_section *sp; 417 418 sp = spdk_conf_find_section(NULL, "Blobfs"); 419 if (sp == NULL) { 420 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 421 return; 422 } 423 424 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 425 if (g_fs_cache_buffer_shift <= 0) { 426 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 427 } 428 } 429 430 static struct spdk_filesystem * 431 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 432 { 433 struct spdk_filesystem *fs; 434 435 fs = calloc(1, sizeof(*fs)); 436 if (fs == NULL) { 437 return NULL; 438 } 439 440 fs->bdev = dev; 441 fs->send_request = send_request_fn; 442 TAILQ_INIT(&fs->files); 443 444 fs->md_target.max_ops = 512; 445 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 446 sizeof(struct spdk_fs_channel)); 447 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 448 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 449 450 fs->sync_target.max_ops = 512; 451 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 452 sizeof(struct spdk_fs_channel)); 453 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 454 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 455 456 fs->io_target.max_ops = 512; 457 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 458 sizeof(struct spdk_fs_channel)); 459 460 return fs; 461 } 462 463 void 464 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 465 fs_send_request_fn send_request_fn, 466 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 467 { 468 struct spdk_filesystem *fs; 469 struct spdk_fs_request *req; 470 struct spdk_fs_cb_args *args; 471 struct spdk_bs_opts opts = {}; 472 473 fs = fs_alloc(dev, send_request_fn); 474 if (fs == NULL) { 475 cb_fn(cb_arg, NULL, -ENOMEM); 476 return; 477 } 478 479 fs_conf_parse(); 480 481 req = alloc_fs_request(fs->md_target.md_fs_channel); 482 if (req == NULL) { 483 spdk_put_io_channel(fs->md_target.md_io_channel); 484 spdk_io_device_unregister(&fs->md_target, NULL); 485 spdk_put_io_channel(fs->sync_target.sync_io_channel); 486 spdk_io_device_unregister(&fs->sync_target, NULL); 487 spdk_io_device_unregister(&fs->io_target, NULL); 488 free(fs); 489 cb_fn(cb_arg, NULL, -ENOMEM); 490 return; 491 } 492 493 args = &req->args; 494 args->fn.fs_op_with_handle = cb_fn; 495 args->arg = cb_arg; 496 args->fs = fs; 497 498 spdk_bs_opts_init(&opts); 499 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 500 if (opt) { 501 opts.cluster_sz = opt->cluster_sz; 502 } 503 spdk_bs_init(dev, &opts, init_cb, req); 504 } 505 506 static struct spdk_file * 507 file_alloc(struct spdk_filesystem *fs) 508 { 509 struct spdk_file *file; 510 511 file = calloc(1, sizeof(*file)); 512 if (file == NULL) { 513 return NULL; 514 } 515 516 file->tree = calloc(1, sizeof(*file->tree)); 517 if (file->tree == NULL) { 518 free(file); 519 return NULL; 520 } 521 522 file->fs = fs; 523 TAILQ_INIT(&file->open_requests); 524 TAILQ_INIT(&file->sync_requests); 525 pthread_spin_init(&file->lock, 0); 526 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 527 file->priority = SPDK_FILE_PRIORITY_LOW; 528 return file; 529 } 530 531 static void fs_load_done(void *ctx, int bserrno); 532 533 static int 534 _handle_deleted_files(struct spdk_fs_request *req) 535 { 536 struct spdk_fs_cb_args *args = &req->args; 537 struct spdk_filesystem *fs = args->fs; 538 539 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 540 struct spdk_deleted_file *deleted_file; 541 542 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 543 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 544 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 545 free(deleted_file); 546 return 0; 547 } 548 549 return 1; 550 } 551 552 static void 553 fs_load_done(void *ctx, int bserrno) 554 { 555 struct spdk_fs_request *req = ctx; 556 struct spdk_fs_cb_args *args = &req->args; 557 struct spdk_filesystem *fs = args->fs; 558 559 /* The filesystem has been loaded. Now check if there are any files that 560 * were marked for deletion before last unload. Do not complete the 561 * fs_load callback until all of them have been deleted on disk. 562 */ 563 if (_handle_deleted_files(req) == 0) { 564 /* We found a file that's been marked for deleting but not actually 565 * deleted yet. This function will get called again once the delete 566 * operation is completed. 567 */ 568 return; 569 } 570 571 args->fn.fs_op_with_handle(args->arg, fs, 0); 572 free_fs_request(req); 573 574 } 575 576 static void 577 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 578 { 579 struct spdk_fs_request *req = ctx; 580 struct spdk_fs_cb_args *args = &req->args; 581 struct spdk_filesystem *fs = args->fs; 582 uint64_t *length; 583 const char *name; 584 uint32_t *is_deleted; 585 size_t value_len; 586 587 if (rc < 0) { 588 args->fn.fs_op_with_handle(args->arg, fs, rc); 589 free_fs_request(req); 590 return; 591 } 592 593 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 594 if (rc < 0) { 595 args->fn.fs_op_with_handle(args->arg, fs, rc); 596 free_fs_request(req); 597 return; 598 } 599 600 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 601 if (rc < 0) { 602 args->fn.fs_op_with_handle(args->arg, fs, rc); 603 free_fs_request(req); 604 return; 605 } 606 607 assert(value_len == 8); 608 609 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 610 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 611 if (rc < 0) { 612 struct spdk_file *f; 613 614 f = file_alloc(fs); 615 if (f == NULL) { 616 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 617 free_fs_request(req); 618 return; 619 } 620 621 f->name = strdup(name); 622 f->blobid = spdk_blob_get_id(blob); 623 f->length = *length; 624 f->length_flushed = *length; 625 f->append_pos = *length; 626 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 627 } else { 628 struct spdk_deleted_file *deleted_file; 629 630 deleted_file = calloc(1, sizeof(*deleted_file)); 631 if (deleted_file == NULL) { 632 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 633 free_fs_request(req); 634 return; 635 } 636 deleted_file->id = spdk_blob_get_id(blob); 637 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 638 } 639 } 640 641 static void 642 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 643 { 644 struct spdk_fs_request *req = ctx; 645 struct spdk_fs_cb_args *args = &req->args; 646 struct spdk_filesystem *fs = args->fs; 647 struct spdk_bs_type bstype; 648 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 649 static const struct spdk_bs_type zeros; 650 651 if (bserrno != 0) { 652 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 653 free_fs_request(req); 654 free(fs); 655 return; 656 } 657 658 bstype = spdk_bs_get_bstype(bs); 659 660 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 661 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 662 spdk_bs_set_bstype(bs, blobfs_type); 663 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 664 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 665 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 666 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 667 free_fs_request(req); 668 free(fs); 669 return; 670 } 671 672 common_fs_bs_init(fs, bs); 673 fs_load_done(req, 0); 674 } 675 676 void 677 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 678 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 679 { 680 struct spdk_filesystem *fs; 681 struct spdk_fs_cb_args *args; 682 struct spdk_fs_request *req; 683 struct spdk_bs_opts bs_opts; 684 685 fs = fs_alloc(dev, send_request_fn); 686 if (fs == NULL) { 687 cb_fn(cb_arg, NULL, -ENOMEM); 688 return; 689 } 690 691 fs_conf_parse(); 692 693 req = alloc_fs_request(fs->md_target.md_fs_channel); 694 if (req == NULL) { 695 spdk_put_io_channel(fs->md_target.md_io_channel); 696 spdk_io_device_unregister(&fs->md_target, NULL); 697 spdk_put_io_channel(fs->sync_target.sync_io_channel); 698 spdk_io_device_unregister(&fs->sync_target, NULL); 699 spdk_io_device_unregister(&fs->io_target, NULL); 700 free(fs); 701 cb_fn(cb_arg, NULL, -ENOMEM); 702 return; 703 } 704 705 args = &req->args; 706 args->fn.fs_op_with_handle = cb_fn; 707 args->arg = cb_arg; 708 args->fs = fs; 709 TAILQ_INIT(&args->op.fs_load.deleted_files); 710 spdk_bs_opts_init(&bs_opts); 711 bs_opts.iter_cb_fn = iter_cb; 712 bs_opts.iter_cb_arg = req; 713 spdk_bs_load(dev, &bs_opts, load_cb, req); 714 } 715 716 static void 717 unload_cb(void *ctx, int bserrno) 718 { 719 struct spdk_fs_request *req = ctx; 720 struct spdk_fs_cb_args *args = &req->args; 721 struct spdk_filesystem *fs = args->fs; 722 723 pthread_mutex_lock(&g_cache_init_lock); 724 g_fs_count--; 725 if (g_fs_count == 0) { 726 __free_cache(); 727 } 728 pthread_mutex_unlock(&g_cache_init_lock); 729 730 args->fn.fs_op(args->arg, bserrno); 731 free(req); 732 733 spdk_io_device_unregister(&fs->io_target, NULL); 734 spdk_io_device_unregister(&fs->sync_target, NULL); 735 spdk_io_device_unregister(&fs->md_target, NULL); 736 737 free(fs); 738 } 739 740 void 741 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 742 { 743 struct spdk_fs_request *req; 744 struct spdk_fs_cb_args *args; 745 746 /* 747 * We must free the md_channel before unloading the blobstore, so just 748 * allocate this request from the general heap. 749 */ 750 req = calloc(1, sizeof(*req)); 751 if (req == NULL) { 752 cb_fn(cb_arg, -ENOMEM); 753 return; 754 } 755 756 args = &req->args; 757 args->fn.fs_op = cb_fn; 758 args->arg = cb_arg; 759 args->fs = fs; 760 761 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 762 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 763 spdk_bs_unload(fs->bs, unload_cb, req); 764 } 765 766 static struct spdk_file * 767 fs_find_file(struct spdk_filesystem *fs, const char *name) 768 { 769 struct spdk_file *file; 770 771 TAILQ_FOREACH(file, &fs->files, tailq) { 772 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 773 return file; 774 } 775 } 776 777 return NULL; 778 } 779 780 void 781 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 782 spdk_file_stat_op_complete cb_fn, void *cb_arg) 783 { 784 struct spdk_file_stat stat; 785 struct spdk_file *f = NULL; 786 787 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 788 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 789 return; 790 } 791 792 f = fs_find_file(fs, name); 793 if (f != NULL) { 794 stat.blobid = f->blobid; 795 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 796 cb_fn(cb_arg, &stat, 0); 797 return; 798 } 799 800 cb_fn(cb_arg, NULL, -ENOENT); 801 } 802 803 static void 804 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 805 { 806 struct spdk_fs_request *req = arg; 807 struct spdk_fs_cb_args *args = &req->args; 808 809 args->rc = fserrno; 810 if (fserrno == 0) { 811 memcpy(args->arg, stat, sizeof(*stat)); 812 } 813 sem_post(args->sem); 814 } 815 816 static void 817 __file_stat(void *arg) 818 { 819 struct spdk_fs_request *req = arg; 820 struct spdk_fs_cb_args *args = &req->args; 821 822 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 823 args->fn.stat_op, req); 824 } 825 826 int 827 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 828 const char *name, struct spdk_file_stat *stat) 829 { 830 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 831 struct spdk_fs_request *req; 832 int rc; 833 834 req = alloc_fs_request(channel); 835 if (req == NULL) { 836 return -ENOMEM; 837 } 838 839 req->args.fs = fs; 840 req->args.op.stat.name = name; 841 req->args.fn.stat_op = __copy_stat; 842 req->args.arg = stat; 843 req->args.sem = &channel->sem; 844 channel->send_request(__file_stat, req); 845 sem_wait(&channel->sem); 846 847 rc = req->args.rc; 848 free_fs_request(req); 849 850 return rc; 851 } 852 853 static void 854 fs_create_blob_close_cb(void *ctx, int bserrno) 855 { 856 struct spdk_fs_request *req = ctx; 857 struct spdk_fs_cb_args *args = &req->args; 858 859 args->fn.file_op(args->arg, bserrno); 860 free_fs_request(req); 861 } 862 863 static void 864 fs_create_blob_resize_cb(void *ctx, int bserrno) 865 { 866 struct spdk_fs_request *req = ctx; 867 struct spdk_fs_cb_args *args = &req->args; 868 struct spdk_file *f = args->file; 869 struct spdk_blob *blob = args->op.create.blob; 870 uint64_t length = 0; 871 872 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 873 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 874 875 spdk_blob_close(blob, fs_create_blob_close_cb, args); 876 } 877 878 static void 879 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 880 { 881 struct spdk_fs_request *req = ctx; 882 struct spdk_fs_cb_args *args = &req->args; 883 884 args->op.create.blob = blob; 885 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 886 } 887 888 static void 889 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 890 { 891 struct spdk_fs_request *req = ctx; 892 struct spdk_fs_cb_args *args = &req->args; 893 struct spdk_file *f = args->file; 894 895 f->blobid = blobid; 896 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 897 } 898 899 void 900 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 901 spdk_file_op_complete cb_fn, void *cb_arg) 902 { 903 struct spdk_file *file; 904 struct spdk_fs_request *req; 905 struct spdk_fs_cb_args *args; 906 907 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 908 cb_fn(cb_arg, -ENAMETOOLONG); 909 return; 910 } 911 912 file = fs_find_file(fs, name); 913 if (file != NULL) { 914 cb_fn(cb_arg, -EEXIST); 915 return; 916 } 917 918 file = file_alloc(fs); 919 if (file == NULL) { 920 cb_fn(cb_arg, -ENOMEM); 921 return; 922 } 923 924 req = alloc_fs_request(fs->md_target.md_fs_channel); 925 if (req == NULL) { 926 cb_fn(cb_arg, -ENOMEM); 927 return; 928 } 929 930 args = &req->args; 931 args->file = file; 932 args->fn.file_op = cb_fn; 933 args->arg = cb_arg; 934 935 file->name = strdup(name); 936 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 937 } 938 939 static void 940 __fs_create_file_done(void *arg, int fserrno) 941 { 942 struct spdk_fs_request *req = arg; 943 struct spdk_fs_cb_args *args = &req->args; 944 945 args->rc = fserrno; 946 sem_post(args->sem); 947 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 948 } 949 950 static void 951 __fs_create_file(void *arg) 952 { 953 struct spdk_fs_request *req = arg; 954 struct spdk_fs_cb_args *args = &req->args; 955 956 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 957 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 958 } 959 960 int 961 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 962 { 963 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 964 struct spdk_fs_request *req; 965 struct spdk_fs_cb_args *args; 966 int rc; 967 968 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 969 970 req = alloc_fs_request(channel); 971 if (req == NULL) { 972 return -ENOMEM; 973 } 974 975 args = &req->args; 976 args->fs = fs; 977 args->op.create.name = name; 978 args->sem = &channel->sem; 979 fs->send_request(__fs_create_file, req); 980 sem_wait(&channel->sem); 981 rc = args->rc; 982 free_fs_request(req); 983 984 return rc; 985 } 986 987 static void 988 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 989 { 990 struct spdk_fs_request *req = ctx; 991 struct spdk_fs_cb_args *args = &req->args; 992 struct spdk_file *f = args->file; 993 994 f->blob = blob; 995 while (!TAILQ_EMPTY(&f->open_requests)) { 996 req = TAILQ_FIRST(&f->open_requests); 997 args = &req->args; 998 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 999 args->fn.file_op_with_handle(args->arg, f, bserrno); 1000 free_fs_request(req); 1001 } 1002 } 1003 1004 static void 1005 fs_open_blob_create_cb(void *ctx, int bserrno) 1006 { 1007 struct spdk_fs_request *req = ctx; 1008 struct spdk_fs_cb_args *args = &req->args; 1009 struct spdk_file *file = args->file; 1010 struct spdk_filesystem *fs = args->fs; 1011 1012 if (file == NULL) { 1013 /* 1014 * This is from an open with CREATE flag - the file 1015 * is now created so look it up in the file list for this 1016 * filesystem. 1017 */ 1018 file = fs_find_file(fs, args->op.open.name); 1019 assert(file != NULL); 1020 args->file = file; 1021 } 1022 1023 file->ref_count++; 1024 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1025 if (file->ref_count == 1) { 1026 assert(file->blob == NULL); 1027 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1028 } else if (file->blob != NULL) { 1029 fs_open_blob_done(req, file->blob, 0); 1030 } else { 1031 /* 1032 * The blob open for this file is in progress due to a previous 1033 * open request. When that open completes, it will invoke the 1034 * open callback for this request. 1035 */ 1036 } 1037 } 1038 1039 void 1040 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1041 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1042 { 1043 struct spdk_file *f = NULL; 1044 struct spdk_fs_request *req; 1045 struct spdk_fs_cb_args *args; 1046 1047 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1048 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1049 return; 1050 } 1051 1052 f = fs_find_file(fs, name); 1053 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1054 cb_fn(cb_arg, NULL, -ENOENT); 1055 return; 1056 } 1057 1058 if (f != NULL && f->is_deleted == true) { 1059 cb_fn(cb_arg, NULL, -ENOENT); 1060 return; 1061 } 1062 1063 req = alloc_fs_request(fs->md_target.md_fs_channel); 1064 if (req == NULL) { 1065 cb_fn(cb_arg, NULL, -ENOMEM); 1066 return; 1067 } 1068 1069 args = &req->args; 1070 args->fn.file_op_with_handle = cb_fn; 1071 args->arg = cb_arg; 1072 args->file = f; 1073 args->fs = fs; 1074 args->op.open.name = name; 1075 1076 if (f == NULL) { 1077 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1078 } else { 1079 fs_open_blob_create_cb(req, 0); 1080 } 1081 } 1082 1083 static void 1084 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1085 { 1086 struct spdk_fs_request *req = arg; 1087 struct spdk_fs_cb_args *args = &req->args; 1088 1089 args->file = file; 1090 args->rc = bserrno; 1091 sem_post(args->sem); 1092 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1093 } 1094 1095 static void 1096 __fs_open_file(void *arg) 1097 { 1098 struct spdk_fs_request *req = arg; 1099 struct spdk_fs_cb_args *args = &req->args; 1100 1101 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1102 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1103 __fs_open_file_done, req); 1104 } 1105 1106 int 1107 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1108 const char *name, uint32_t flags, struct spdk_file **file) 1109 { 1110 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1111 struct spdk_fs_request *req; 1112 struct spdk_fs_cb_args *args; 1113 int rc; 1114 1115 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1116 1117 req = alloc_fs_request(channel); 1118 if (req == NULL) { 1119 return -ENOMEM; 1120 } 1121 1122 args = &req->args; 1123 args->fs = fs; 1124 args->op.open.name = name; 1125 args->op.open.flags = flags; 1126 args->sem = &channel->sem; 1127 fs->send_request(__fs_open_file, req); 1128 sem_wait(&channel->sem); 1129 rc = args->rc; 1130 if (rc == 0) { 1131 *file = args->file; 1132 } else { 1133 *file = NULL; 1134 } 1135 free_fs_request(req); 1136 1137 return rc; 1138 } 1139 1140 static void 1141 fs_rename_blob_close_cb(void *ctx, int bserrno) 1142 { 1143 struct spdk_fs_request *req = ctx; 1144 struct spdk_fs_cb_args *args = &req->args; 1145 1146 args->fn.fs_op(args->arg, bserrno); 1147 free_fs_request(req); 1148 } 1149 1150 static void 1151 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1152 { 1153 struct spdk_fs_request *req = ctx; 1154 struct spdk_fs_cb_args *args = &req->args; 1155 const char *new_name = args->op.rename.new_name; 1156 1157 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1158 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1159 } 1160 1161 static void 1162 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1163 { 1164 struct spdk_fs_cb_args *args = &req->args; 1165 struct spdk_file *f; 1166 1167 f = fs_find_file(args->fs, args->op.rename.old_name); 1168 if (f == NULL) { 1169 args->fn.fs_op(args->arg, -ENOENT); 1170 free_fs_request(req); 1171 return; 1172 } 1173 1174 free(f->name); 1175 f->name = strdup(args->op.rename.new_name); 1176 args->file = f; 1177 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1178 } 1179 1180 static void 1181 fs_rename_delete_done(void *arg, int fserrno) 1182 { 1183 __spdk_fs_md_rename_file(arg); 1184 } 1185 1186 void 1187 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1188 const char *old_name, const char *new_name, 1189 spdk_file_op_complete cb_fn, void *cb_arg) 1190 { 1191 struct spdk_file *f; 1192 struct spdk_fs_request *req; 1193 struct spdk_fs_cb_args *args; 1194 1195 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1196 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1197 cb_fn(cb_arg, -ENAMETOOLONG); 1198 return; 1199 } 1200 1201 req = alloc_fs_request(fs->md_target.md_fs_channel); 1202 if (req == NULL) { 1203 cb_fn(cb_arg, -ENOMEM); 1204 return; 1205 } 1206 1207 args = &req->args; 1208 args->fn.fs_op = cb_fn; 1209 args->fs = fs; 1210 args->arg = cb_arg; 1211 args->op.rename.old_name = old_name; 1212 args->op.rename.new_name = new_name; 1213 1214 f = fs_find_file(fs, new_name); 1215 if (f == NULL) { 1216 __spdk_fs_md_rename_file(req); 1217 return; 1218 } 1219 1220 /* 1221 * The rename overwrites an existing file. So delete the existing file, then 1222 * do the actual rename. 1223 */ 1224 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1225 } 1226 1227 static void 1228 __fs_rename_file_done(void *arg, int fserrno) 1229 { 1230 struct spdk_fs_request *req = arg; 1231 struct spdk_fs_cb_args *args = &req->args; 1232 1233 args->rc = fserrno; 1234 sem_post(args->sem); 1235 } 1236 1237 static void 1238 __fs_rename_file(void *arg) 1239 { 1240 struct spdk_fs_request *req = arg; 1241 struct spdk_fs_cb_args *args = &req->args; 1242 1243 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1244 __fs_rename_file_done, req); 1245 } 1246 1247 int 1248 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1249 const char *old_name, const char *new_name) 1250 { 1251 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1252 struct spdk_fs_request *req; 1253 struct spdk_fs_cb_args *args; 1254 int rc; 1255 1256 req = alloc_fs_request(channel); 1257 if (req == NULL) { 1258 return -ENOMEM; 1259 } 1260 1261 args = &req->args; 1262 1263 args->fs = fs; 1264 args->op.rename.old_name = old_name; 1265 args->op.rename.new_name = new_name; 1266 args->sem = &channel->sem; 1267 fs->send_request(__fs_rename_file, req); 1268 sem_wait(&channel->sem); 1269 rc = args->rc; 1270 free_fs_request(req); 1271 return rc; 1272 } 1273 1274 static void 1275 blob_delete_cb(void *ctx, int bserrno) 1276 { 1277 struct spdk_fs_request *req = ctx; 1278 struct spdk_fs_cb_args *args = &req->args; 1279 1280 args->fn.file_op(args->arg, bserrno); 1281 free_fs_request(req); 1282 } 1283 1284 void 1285 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1286 spdk_file_op_complete cb_fn, void *cb_arg) 1287 { 1288 struct spdk_file *f; 1289 spdk_blob_id blobid; 1290 struct spdk_fs_request *req; 1291 struct spdk_fs_cb_args *args; 1292 1293 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1294 1295 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1296 cb_fn(cb_arg, -ENAMETOOLONG); 1297 return; 1298 } 1299 1300 f = fs_find_file(fs, name); 1301 if (f == NULL) { 1302 cb_fn(cb_arg, -ENOENT); 1303 return; 1304 } 1305 1306 req = alloc_fs_request(fs->md_target.md_fs_channel); 1307 if (req == NULL) { 1308 cb_fn(cb_arg, -ENOMEM); 1309 return; 1310 } 1311 1312 args = &req->args; 1313 args->fn.file_op = cb_fn; 1314 args->arg = cb_arg; 1315 1316 if (f->ref_count > 0) { 1317 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1318 f->is_deleted = true; 1319 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1320 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1321 return; 1322 } 1323 1324 TAILQ_REMOVE(&fs->files, f, tailq); 1325 1326 cache_free_buffers(f); 1327 1328 blobid = f->blobid; 1329 1330 free(f->name); 1331 free(f->tree); 1332 free(f); 1333 1334 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1335 } 1336 1337 static void 1338 __fs_delete_file_done(void *arg, int fserrno) 1339 { 1340 struct spdk_fs_request *req = arg; 1341 struct spdk_fs_cb_args *args = &req->args; 1342 1343 args->rc = fserrno; 1344 sem_post(args->sem); 1345 } 1346 1347 static void 1348 __fs_delete_file(void *arg) 1349 { 1350 struct spdk_fs_request *req = arg; 1351 struct spdk_fs_cb_args *args = &req->args; 1352 1353 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1354 } 1355 1356 int 1357 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1358 const char *name) 1359 { 1360 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1361 struct spdk_fs_request *req; 1362 struct spdk_fs_cb_args *args; 1363 int rc; 1364 1365 req = alloc_fs_request(channel); 1366 if (req == NULL) { 1367 return -ENOMEM; 1368 } 1369 1370 args = &req->args; 1371 args->fs = fs; 1372 args->op.delete.name = name; 1373 args->sem = &channel->sem; 1374 fs->send_request(__fs_delete_file, req); 1375 sem_wait(&channel->sem); 1376 rc = args->rc; 1377 free_fs_request(req); 1378 1379 return rc; 1380 } 1381 1382 spdk_fs_iter 1383 spdk_fs_iter_first(struct spdk_filesystem *fs) 1384 { 1385 struct spdk_file *f; 1386 1387 f = TAILQ_FIRST(&fs->files); 1388 return f; 1389 } 1390 1391 spdk_fs_iter 1392 spdk_fs_iter_next(spdk_fs_iter iter) 1393 { 1394 struct spdk_file *f = iter; 1395 1396 if (f == NULL) { 1397 return NULL; 1398 } 1399 1400 f = TAILQ_NEXT(f, tailq); 1401 return f; 1402 } 1403 1404 const char * 1405 spdk_file_get_name(struct spdk_file *file) 1406 { 1407 return file->name; 1408 } 1409 1410 uint64_t 1411 spdk_file_get_length(struct spdk_file *file) 1412 { 1413 assert(file != NULL); 1414 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1415 return file->length; 1416 } 1417 1418 static void 1419 fs_truncate_complete_cb(void *ctx, int bserrno) 1420 { 1421 struct spdk_fs_request *req = ctx; 1422 struct spdk_fs_cb_args *args = &req->args; 1423 1424 args->fn.file_op(args->arg, bserrno); 1425 free_fs_request(req); 1426 } 1427 1428 static void 1429 fs_truncate_resize_cb(void *ctx, int bserrno) 1430 { 1431 struct spdk_fs_request *req = ctx; 1432 struct spdk_fs_cb_args *args = &req->args; 1433 struct spdk_file *file = args->file; 1434 uint64_t *length = &args->op.truncate.length; 1435 1436 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1437 1438 file->length = *length; 1439 if (file->append_pos > file->length) { 1440 file->append_pos = file->length; 1441 } 1442 1443 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1444 } 1445 1446 static uint64_t 1447 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1448 { 1449 return (length + cluster_sz - 1) / cluster_sz; 1450 } 1451 1452 void 1453 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1454 spdk_file_op_complete cb_fn, void *cb_arg) 1455 { 1456 struct spdk_filesystem *fs; 1457 size_t num_clusters; 1458 struct spdk_fs_request *req; 1459 struct spdk_fs_cb_args *args; 1460 1461 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1462 if (length == file->length) { 1463 cb_fn(cb_arg, 0); 1464 return; 1465 } 1466 1467 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1468 if (req == NULL) { 1469 cb_fn(cb_arg, -ENOMEM); 1470 return; 1471 } 1472 1473 args = &req->args; 1474 args->fn.file_op = cb_fn; 1475 args->arg = cb_arg; 1476 args->file = file; 1477 args->op.truncate.length = length; 1478 fs = file->fs; 1479 1480 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1481 1482 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1483 } 1484 1485 static void 1486 __truncate(void *arg) 1487 { 1488 struct spdk_fs_request *req = arg; 1489 struct spdk_fs_cb_args *args = &req->args; 1490 1491 spdk_file_truncate_async(args->file, args->op.truncate.length, 1492 args->fn.file_op, args->arg); 1493 } 1494 1495 void 1496 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1497 uint64_t length) 1498 { 1499 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1500 struct spdk_fs_request *req; 1501 struct spdk_fs_cb_args *args; 1502 1503 req = alloc_fs_request(channel); 1504 assert(req != NULL); 1505 1506 args = &req->args; 1507 1508 args->file = file; 1509 args->op.truncate.length = length; 1510 args->fn.file_op = __sem_post; 1511 args->arg = &channel->sem; 1512 1513 channel->send_request(__truncate, req); 1514 sem_wait(&channel->sem); 1515 free_fs_request(req); 1516 } 1517 1518 static void 1519 __rw_done(void *ctx, int bserrno) 1520 { 1521 struct spdk_fs_request *req = ctx; 1522 struct spdk_fs_cb_args *args = &req->args; 1523 1524 spdk_dma_free(args->op.rw.pin_buf); 1525 args->fn.file_op(args->arg, bserrno); 1526 free_fs_request(req); 1527 } 1528 1529 static void 1530 __read_done(void *ctx, int bserrno) 1531 { 1532 struct spdk_fs_request *req = ctx; 1533 struct spdk_fs_cb_args *args = &req->args; 1534 1535 assert(req != NULL); 1536 if (args->op.rw.is_read) { 1537 memcpy(args->op.rw.user_buf, 1538 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1539 args->op.rw.length); 1540 __rw_done(req, 0); 1541 } else { 1542 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1543 args->op.rw.user_buf, 1544 args->op.rw.length); 1545 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1546 args->op.rw.pin_buf, 1547 args->op.rw.start_page, args->op.rw.num_pages, 1548 __rw_done, req); 1549 } 1550 } 1551 1552 static void 1553 __do_blob_read(void *ctx, int fserrno) 1554 { 1555 struct spdk_fs_request *req = ctx; 1556 struct spdk_fs_cb_args *args = &req->args; 1557 1558 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1559 args->op.rw.pin_buf, 1560 args->op.rw.start_page, args->op.rw.num_pages, 1561 __read_done, req); 1562 } 1563 1564 static void 1565 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1566 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1567 { 1568 uint64_t end_page; 1569 1570 *page_size = spdk_bs_get_page_size(file->fs->bs); 1571 *start_page = offset / *page_size; 1572 end_page = (offset + length - 1) / *page_size; 1573 *num_pages = (end_page - *start_page + 1); 1574 } 1575 1576 static void 1577 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1578 void *payload, uint64_t offset, uint64_t length, 1579 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1580 { 1581 struct spdk_fs_request *req; 1582 struct spdk_fs_cb_args *args; 1583 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1584 uint64_t start_page, num_pages, pin_buf_length; 1585 uint32_t page_size; 1586 1587 if (is_read && offset + length > file->length) { 1588 cb_fn(cb_arg, -EINVAL); 1589 return; 1590 } 1591 1592 req = alloc_fs_request(channel); 1593 if (req == NULL) { 1594 cb_fn(cb_arg, -ENOMEM); 1595 return; 1596 } 1597 1598 args = &req->args; 1599 args->fn.file_op = cb_fn; 1600 args->arg = cb_arg; 1601 args->file = file; 1602 args->op.rw.channel = channel->bs_channel; 1603 args->op.rw.user_buf = payload; 1604 args->op.rw.is_read = is_read; 1605 args->op.rw.offset = offset; 1606 args->op.rw.length = length; 1607 1608 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1609 pin_buf_length = num_pages * page_size; 1610 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1611 if (args->op.rw.pin_buf == NULL) { 1612 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1613 file->name, offset, length); 1614 free_fs_request(req); 1615 cb_fn(cb_arg, -ENOMEM); 1616 return; 1617 } 1618 1619 args->op.rw.start_page = start_page; 1620 args->op.rw.num_pages = num_pages; 1621 1622 if (!is_read && file->length < offset + length) { 1623 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1624 } else { 1625 __do_blob_read(req, 0); 1626 } 1627 } 1628 1629 void 1630 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1631 void *payload, uint64_t offset, uint64_t length, 1632 spdk_file_op_complete cb_fn, void *cb_arg) 1633 { 1634 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1635 } 1636 1637 void 1638 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1639 void *payload, uint64_t offset, uint64_t length, 1640 spdk_file_op_complete cb_fn, void *cb_arg) 1641 { 1642 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1643 file->name, offset, length); 1644 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1645 } 1646 1647 struct spdk_io_channel * 1648 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1649 { 1650 struct spdk_io_channel *io_channel; 1651 struct spdk_fs_channel *fs_channel; 1652 1653 io_channel = spdk_get_io_channel(&fs->io_target); 1654 fs_channel = spdk_io_channel_get_ctx(io_channel); 1655 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1656 fs_channel->send_request = __send_request_direct; 1657 1658 return io_channel; 1659 } 1660 1661 struct spdk_io_channel * 1662 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1663 { 1664 struct spdk_io_channel *io_channel; 1665 struct spdk_fs_channel *fs_channel; 1666 1667 io_channel = spdk_get_io_channel(&fs->io_target); 1668 fs_channel = spdk_io_channel_get_ctx(io_channel); 1669 fs_channel->send_request = fs->send_request; 1670 fs_channel->sync = 1; 1671 pthread_spin_init(&fs_channel->lock, 0); 1672 1673 return io_channel; 1674 } 1675 1676 void 1677 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1678 { 1679 spdk_put_io_channel(channel); 1680 } 1681 1682 void 1683 spdk_fs_set_cache_size(uint64_t size_in_mb) 1684 { 1685 g_fs_cache_size = size_in_mb * 1024 * 1024; 1686 } 1687 1688 uint64_t 1689 spdk_fs_get_cache_size(void) 1690 { 1691 return g_fs_cache_size / (1024 * 1024); 1692 } 1693 1694 static void __file_flush(void *_args); 1695 1696 static void * 1697 alloc_cache_memory_buffer(struct spdk_file *context) 1698 { 1699 struct spdk_file *file; 1700 void *buf; 1701 1702 buf = spdk_mempool_get(g_cache_pool); 1703 if (buf != NULL) { 1704 return buf; 1705 } 1706 1707 pthread_spin_lock(&g_caches_lock); 1708 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1709 if (!file->open_for_writing && 1710 file->priority == SPDK_FILE_PRIORITY_LOW && 1711 file != context) { 1712 break; 1713 } 1714 } 1715 pthread_spin_unlock(&g_caches_lock); 1716 if (file != NULL) { 1717 cache_free_buffers(file); 1718 buf = spdk_mempool_get(g_cache_pool); 1719 if (buf != NULL) { 1720 return buf; 1721 } 1722 } 1723 1724 pthread_spin_lock(&g_caches_lock); 1725 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1726 if (!file->open_for_writing && file != context) { 1727 break; 1728 } 1729 } 1730 pthread_spin_unlock(&g_caches_lock); 1731 if (file != NULL) { 1732 cache_free_buffers(file); 1733 buf = spdk_mempool_get(g_cache_pool); 1734 if (buf != NULL) { 1735 return buf; 1736 } 1737 } 1738 1739 pthread_spin_lock(&g_caches_lock); 1740 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1741 if (file != context) { 1742 break; 1743 } 1744 } 1745 pthread_spin_unlock(&g_caches_lock); 1746 if (file != NULL) { 1747 cache_free_buffers(file); 1748 buf = spdk_mempool_get(g_cache_pool); 1749 if (buf != NULL) { 1750 return buf; 1751 } 1752 } 1753 1754 return NULL; 1755 } 1756 1757 static struct cache_buffer * 1758 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1759 { 1760 struct cache_buffer *buf; 1761 int count = 0; 1762 1763 buf = calloc(1, sizeof(*buf)); 1764 if (buf == NULL) { 1765 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1766 return NULL; 1767 } 1768 1769 buf->buf = alloc_cache_memory_buffer(file); 1770 while (buf->buf == NULL) { 1771 /* 1772 * TODO: alloc_cache_memory_buffer() should eventually free 1773 * some buffers. Need a more sophisticated check here, instead 1774 * of just bailing if 100 tries does not result in getting a 1775 * free buffer. This will involve using the sync channel's 1776 * semaphore to block until a buffer becomes available. 1777 */ 1778 if (count++ == 100) { 1779 SPDK_ERRLOG("could not allocate cache buffer\n"); 1780 assert(false); 1781 free(buf); 1782 return NULL; 1783 } 1784 buf->buf = alloc_cache_memory_buffer(file); 1785 } 1786 1787 buf->buf_size = CACHE_BUFFER_SIZE; 1788 buf->offset = offset; 1789 1790 pthread_spin_lock(&g_caches_lock); 1791 if (file->tree->present_mask == 0) { 1792 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1793 } 1794 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1795 pthread_spin_unlock(&g_caches_lock); 1796 1797 return buf; 1798 } 1799 1800 static struct cache_buffer * 1801 cache_append_buffer(struct spdk_file *file) 1802 { 1803 struct cache_buffer *last; 1804 1805 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1806 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1807 1808 last = cache_insert_buffer(file, file->append_pos); 1809 if (last == NULL) { 1810 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1811 return NULL; 1812 } 1813 1814 file->last = last; 1815 1816 return last; 1817 } 1818 1819 static void 1820 __wake_caller(struct spdk_fs_cb_args *args) 1821 { 1822 sem_post(args->sem); 1823 } 1824 1825 static void __check_sync_reqs(struct spdk_file *file); 1826 1827 static void 1828 __file_cache_finish_sync(struct spdk_file *file) 1829 { 1830 struct spdk_fs_request *sync_req; 1831 struct spdk_fs_cb_args *sync_args; 1832 1833 pthread_spin_lock(&file->lock); 1834 sync_req = TAILQ_FIRST(&file->sync_requests); 1835 sync_args = &sync_req->args; 1836 assert(sync_args->op.sync.offset <= file->length_flushed); 1837 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1838 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1839 pthread_spin_unlock(&file->lock); 1840 1841 sync_args->fn.file_op(sync_args->arg, 0); 1842 __check_sync_reqs(file); 1843 1844 pthread_spin_lock(&file->lock); 1845 free_fs_request(sync_req); 1846 pthread_spin_unlock(&file->lock); 1847 } 1848 1849 static void 1850 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1851 { 1852 struct spdk_file *file = ctx; 1853 1854 __file_cache_finish_sync(file); 1855 } 1856 1857 static void 1858 __free_args(struct spdk_fs_cb_args *args) 1859 { 1860 struct spdk_fs_request *req; 1861 1862 if (!args->from_request) { 1863 free(args); 1864 } else { 1865 /* Depends on args being at the start of the spdk_fs_request structure. */ 1866 req = (struct spdk_fs_request *)args; 1867 free_fs_request(req); 1868 } 1869 } 1870 1871 static void 1872 __check_sync_reqs(struct spdk_file *file) 1873 { 1874 struct spdk_fs_request *sync_req; 1875 1876 pthread_spin_lock(&file->lock); 1877 1878 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1879 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1880 break; 1881 } 1882 } 1883 1884 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1885 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1886 sync_req->args.op.sync.xattr_in_progress = true; 1887 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1888 sizeof(file->length_flushed)); 1889 1890 pthread_spin_unlock(&file->lock); 1891 spdk_blob_sync_md(file->blob, __file_cache_finish_sync_bs_cb, file); 1892 } else { 1893 pthread_spin_unlock(&file->lock); 1894 } 1895 } 1896 1897 static void 1898 __file_flush_done(void *arg, int bserrno) 1899 { 1900 struct spdk_fs_cb_args *args = arg; 1901 struct spdk_file *file = args->file; 1902 struct cache_buffer *next = args->op.flush.cache_buffer; 1903 1904 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1905 1906 pthread_spin_lock(&file->lock); 1907 next->in_progress = false; 1908 next->bytes_flushed += args->op.flush.length; 1909 file->length_flushed += args->op.flush.length; 1910 if (file->length_flushed > file->length) { 1911 file->length = file->length_flushed; 1912 } 1913 if (next->bytes_flushed == next->buf_size) { 1914 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1915 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1916 } 1917 1918 /* 1919 * Assert that there is no cached data that extends past the end of the underlying 1920 * blob. 1921 */ 1922 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1923 next->bytes_filled == 0); 1924 1925 pthread_spin_unlock(&file->lock); 1926 1927 __check_sync_reqs(file); 1928 1929 __file_flush(args); 1930 } 1931 1932 static void 1933 __file_flush(void *_args) 1934 { 1935 struct spdk_fs_cb_args *args = _args; 1936 struct spdk_file *file = args->file; 1937 struct cache_buffer *next; 1938 uint64_t offset, length, start_page, num_pages; 1939 uint32_t page_size; 1940 1941 pthread_spin_lock(&file->lock); 1942 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1943 if (next == NULL || next->in_progress) { 1944 /* 1945 * There is either no data to flush, or a flush I/O is already in 1946 * progress. So return immediately - if a flush I/O is in 1947 * progress we will flush more data after that is completed. 1948 */ 1949 __free_args(args); 1950 if (next == NULL) { 1951 /* 1952 * For cases where a file's cache was evicted, and then the 1953 * file was later appended, we will write the data directly 1954 * to disk and bypass cache. So just update length_flushed 1955 * here to reflect that all data was already written to disk. 1956 */ 1957 file->length_flushed = file->append_pos; 1958 } 1959 pthread_spin_unlock(&file->lock); 1960 if (next == NULL) { 1961 /* 1962 * There is no data to flush, but we still need to check for any 1963 * outstanding sync requests to make sure metadata gets updated. 1964 */ 1965 __check_sync_reqs(file); 1966 } 1967 return; 1968 } 1969 1970 offset = next->offset + next->bytes_flushed; 1971 length = next->bytes_filled - next->bytes_flushed; 1972 if (length == 0) { 1973 __free_args(args); 1974 pthread_spin_unlock(&file->lock); 1975 return; 1976 } 1977 args->op.flush.length = length; 1978 args->op.flush.cache_buffer = next; 1979 1980 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1981 1982 next->in_progress = true; 1983 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1984 offset, length, start_page, num_pages); 1985 pthread_spin_unlock(&file->lock); 1986 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1987 next->buf + (start_page * page_size) - next->offset, 1988 start_page, num_pages, __file_flush_done, args); 1989 } 1990 1991 static void 1992 __file_extend_done(void *arg, int bserrno) 1993 { 1994 struct spdk_fs_cb_args *args = arg; 1995 1996 __wake_caller(args); 1997 } 1998 1999 static void 2000 __file_extend_resize_cb(void *_args, int bserrno) 2001 { 2002 struct spdk_fs_cb_args *args = _args; 2003 struct spdk_file *file = args->file; 2004 2005 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2006 } 2007 2008 static void 2009 __file_extend_blob(void *_args) 2010 { 2011 struct spdk_fs_cb_args *args = _args; 2012 struct spdk_file *file = args->file; 2013 2014 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2015 } 2016 2017 static void 2018 __rw_from_file_done(void *arg, int bserrno) 2019 { 2020 struct spdk_fs_cb_args *args = arg; 2021 2022 __wake_caller(args); 2023 __free_args(args); 2024 } 2025 2026 static void 2027 __rw_from_file(void *_args) 2028 { 2029 struct spdk_fs_cb_args *args = _args; 2030 struct spdk_file *file = args->file; 2031 2032 if (args->op.rw.is_read) { 2033 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2034 args->op.rw.offset, args->op.rw.length, 2035 __rw_from_file_done, args); 2036 } else { 2037 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2038 args->op.rw.offset, args->op.rw.length, 2039 __rw_from_file_done, args); 2040 } 2041 } 2042 2043 static int 2044 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 2045 uint64_t offset, uint64_t length, bool is_read) 2046 { 2047 struct spdk_fs_cb_args *args; 2048 2049 args = calloc(1, sizeof(*args)); 2050 if (args == NULL) { 2051 sem_post(sem); 2052 return -ENOMEM; 2053 } 2054 2055 args->file = file; 2056 args->sem = sem; 2057 args->op.rw.user_buf = payload; 2058 args->op.rw.offset = offset; 2059 args->op.rw.length = length; 2060 args->op.rw.is_read = is_read; 2061 file->fs->send_request(__rw_from_file, args); 2062 return 0; 2063 } 2064 2065 int 2066 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 2067 void *payload, uint64_t offset, uint64_t length) 2068 { 2069 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2070 struct spdk_fs_cb_args *args; 2071 uint64_t rem_length, copy, blob_size, cluster_sz; 2072 uint32_t cache_buffers_filled = 0; 2073 uint8_t *cur_payload; 2074 struct cache_buffer *last; 2075 2076 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2077 2078 if (length == 0) { 2079 return 0; 2080 } 2081 2082 if (offset != file->append_pos) { 2083 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2084 return -EINVAL; 2085 } 2086 2087 pthread_spin_lock(&file->lock); 2088 file->open_for_writing = true; 2089 2090 if (file->last == NULL) { 2091 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2092 cache_append_buffer(file); 2093 } else { 2094 int rc; 2095 2096 file->append_pos += length; 2097 pthread_spin_unlock(&file->lock); 2098 rc = __send_rw_from_file(file, &channel->sem, payload, 2099 offset, length, false); 2100 sem_wait(&channel->sem); 2101 return rc; 2102 } 2103 } 2104 2105 blob_size = __file_get_blob_size(file); 2106 2107 if ((offset + length) > blob_size) { 2108 struct spdk_fs_cb_args extend_args = {}; 2109 2110 cluster_sz = file->fs->bs_opts.cluster_sz; 2111 extend_args.sem = &channel->sem; 2112 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2113 extend_args.file = file; 2114 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2115 pthread_spin_unlock(&file->lock); 2116 file->fs->send_request(__file_extend_blob, &extend_args); 2117 sem_wait(&channel->sem); 2118 } 2119 2120 last = file->last; 2121 rem_length = length; 2122 cur_payload = payload; 2123 while (rem_length > 0) { 2124 copy = last->buf_size - last->bytes_filled; 2125 if (copy > rem_length) { 2126 copy = rem_length; 2127 } 2128 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2129 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2130 file->append_pos += copy; 2131 if (file->length < file->append_pos) { 2132 file->length = file->append_pos; 2133 } 2134 cur_payload += copy; 2135 last->bytes_filled += copy; 2136 rem_length -= copy; 2137 if (last->bytes_filled == last->buf_size) { 2138 cache_buffers_filled++; 2139 last = cache_append_buffer(file); 2140 if (last == NULL) { 2141 BLOBFS_TRACE(file, "nomem\n"); 2142 pthread_spin_unlock(&file->lock); 2143 return -ENOMEM; 2144 } 2145 } 2146 } 2147 2148 pthread_spin_unlock(&file->lock); 2149 2150 if (cache_buffers_filled == 0) { 2151 return 0; 2152 } 2153 2154 args = calloc(1, sizeof(*args)); 2155 if (args == NULL) { 2156 return -ENOMEM; 2157 } 2158 2159 args->file = file; 2160 file->fs->send_request(__file_flush, args); 2161 return 0; 2162 } 2163 2164 static void 2165 __readahead_done(void *arg, int bserrno) 2166 { 2167 struct spdk_fs_cb_args *args = arg; 2168 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2169 struct spdk_file *file = args->file; 2170 2171 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2172 2173 pthread_spin_lock(&file->lock); 2174 cache_buffer->bytes_filled = args->op.readahead.length; 2175 cache_buffer->bytes_flushed = args->op.readahead.length; 2176 cache_buffer->in_progress = false; 2177 pthread_spin_unlock(&file->lock); 2178 2179 __free_args(args); 2180 } 2181 2182 static void 2183 __readahead(void *_args) 2184 { 2185 struct spdk_fs_cb_args *args = _args; 2186 struct spdk_file *file = args->file; 2187 uint64_t offset, length, start_page, num_pages; 2188 uint32_t page_size; 2189 2190 offset = args->op.readahead.offset; 2191 length = args->op.readahead.length; 2192 assert(length > 0); 2193 2194 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2195 2196 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2197 offset, length, start_page, num_pages); 2198 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2199 args->op.readahead.cache_buffer->buf, 2200 start_page, num_pages, __readahead_done, args); 2201 } 2202 2203 static uint64_t 2204 __next_cache_buffer_offset(uint64_t offset) 2205 { 2206 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2207 } 2208 2209 static void 2210 check_readahead(struct spdk_file *file, uint64_t offset) 2211 { 2212 struct spdk_fs_cb_args *args; 2213 2214 offset = __next_cache_buffer_offset(offset); 2215 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2216 return; 2217 } 2218 2219 args = calloc(1, sizeof(*args)); 2220 if (args == NULL) { 2221 return; 2222 } 2223 2224 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2225 2226 args->file = file; 2227 args->op.readahead.offset = offset; 2228 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2229 args->op.readahead.cache_buffer->in_progress = true; 2230 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2231 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2232 } else { 2233 args->op.readahead.length = CACHE_BUFFER_SIZE; 2234 } 2235 file->fs->send_request(__readahead, args); 2236 } 2237 2238 static int 2239 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2240 { 2241 struct cache_buffer *buf; 2242 int rc; 2243 2244 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2245 if (buf == NULL) { 2246 pthread_spin_unlock(&file->lock); 2247 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2248 pthread_spin_lock(&file->lock); 2249 return rc; 2250 } 2251 2252 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2253 length = buf->offset + buf->bytes_filled - offset; 2254 } 2255 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2256 memcpy(payload, &buf->buf[offset - buf->offset], length); 2257 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2258 pthread_spin_lock(&g_caches_lock); 2259 spdk_tree_remove_buffer(file->tree, buf); 2260 if (file->tree->present_mask == 0) { 2261 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2262 } 2263 pthread_spin_unlock(&g_caches_lock); 2264 } 2265 2266 sem_post(sem); 2267 return 0; 2268 } 2269 2270 int64_t 2271 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2272 void *payload, uint64_t offset, uint64_t length) 2273 { 2274 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2275 uint64_t final_offset, final_length; 2276 uint32_t sub_reads = 0; 2277 int rc = 0; 2278 2279 pthread_spin_lock(&file->lock); 2280 2281 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2282 2283 file->open_for_writing = false; 2284 2285 if (length == 0 || offset >= file->append_pos) { 2286 pthread_spin_unlock(&file->lock); 2287 return 0; 2288 } 2289 2290 if (offset + length > file->append_pos) { 2291 length = file->append_pos - offset; 2292 } 2293 2294 if (offset != file->next_seq_offset) { 2295 file->seq_byte_count = 0; 2296 } 2297 file->seq_byte_count += length; 2298 file->next_seq_offset = offset + length; 2299 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2300 check_readahead(file, offset); 2301 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2302 } 2303 2304 final_length = 0; 2305 final_offset = offset + length; 2306 while (offset < final_offset) { 2307 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2308 if (length > (final_offset - offset)) { 2309 length = final_offset - offset; 2310 } 2311 rc = __file_read(file, payload, offset, length, &channel->sem); 2312 if (rc == 0) { 2313 final_length += length; 2314 } else { 2315 break; 2316 } 2317 payload += length; 2318 offset += length; 2319 sub_reads++; 2320 } 2321 pthread_spin_unlock(&file->lock); 2322 while (sub_reads-- > 0) { 2323 sem_wait(&channel->sem); 2324 } 2325 if (rc == 0) { 2326 return final_length; 2327 } else { 2328 return rc; 2329 } 2330 } 2331 2332 static void 2333 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2334 spdk_file_op_complete cb_fn, void *cb_arg) 2335 { 2336 struct spdk_fs_request *sync_req; 2337 struct spdk_fs_request *flush_req; 2338 struct spdk_fs_cb_args *sync_args; 2339 struct spdk_fs_cb_args *flush_args; 2340 2341 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2342 2343 pthread_spin_lock(&file->lock); 2344 if (file->append_pos <= file->length_flushed) { 2345 BLOBFS_TRACE(file, "done - no data to flush\n"); 2346 pthread_spin_unlock(&file->lock); 2347 cb_fn(cb_arg, 0); 2348 return; 2349 } 2350 2351 sync_req = alloc_fs_request(channel); 2352 if (!sync_req) { 2353 pthread_spin_unlock(&file->lock); 2354 cb_fn(cb_arg, -ENOMEM); 2355 return; 2356 } 2357 sync_args = &sync_req->args; 2358 2359 flush_req = alloc_fs_request(channel); 2360 if (!flush_req) { 2361 pthread_spin_unlock(&file->lock); 2362 cb_fn(cb_arg, -ENOMEM); 2363 return; 2364 } 2365 flush_args = &flush_req->args; 2366 2367 sync_args->file = file; 2368 sync_args->fn.file_op = cb_fn; 2369 sync_args->arg = cb_arg; 2370 sync_args->op.sync.offset = file->append_pos; 2371 sync_args->op.sync.xattr_in_progress = false; 2372 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2373 pthread_spin_unlock(&file->lock); 2374 2375 flush_args->file = file; 2376 channel->send_request(__file_flush, flush_args); 2377 } 2378 2379 int 2380 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2381 { 2382 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2383 2384 _file_sync(file, channel, __sem_post, &channel->sem); 2385 sem_wait(&channel->sem); 2386 2387 return 0; 2388 } 2389 2390 void 2391 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2392 spdk_file_op_complete cb_fn, void *cb_arg) 2393 { 2394 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2395 2396 _file_sync(file, channel, cb_fn, cb_arg); 2397 } 2398 2399 void 2400 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2401 { 2402 BLOBFS_TRACE(file, "priority=%u\n", priority); 2403 file->priority = priority; 2404 2405 } 2406 2407 /* 2408 * Close routines 2409 */ 2410 2411 static void 2412 __file_close_async_done(void *ctx, int bserrno) 2413 { 2414 struct spdk_fs_request *req = ctx; 2415 struct spdk_fs_cb_args *args = &req->args; 2416 struct spdk_file *file = args->file; 2417 2418 if (file->is_deleted) { 2419 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2420 return; 2421 } 2422 2423 args->fn.file_op(args->arg, bserrno); 2424 free_fs_request(req); 2425 } 2426 2427 static void 2428 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2429 { 2430 struct spdk_blob *blob; 2431 2432 pthread_spin_lock(&file->lock); 2433 if (file->ref_count == 0) { 2434 pthread_spin_unlock(&file->lock); 2435 __file_close_async_done(req, -EBADF); 2436 return; 2437 } 2438 2439 file->ref_count--; 2440 if (file->ref_count > 0) { 2441 pthread_spin_unlock(&file->lock); 2442 req->args.fn.file_op(req->args.arg, 0); 2443 free_fs_request(req); 2444 return; 2445 } 2446 2447 pthread_spin_unlock(&file->lock); 2448 2449 blob = file->blob; 2450 file->blob = NULL; 2451 spdk_blob_close(blob, __file_close_async_done, req); 2452 } 2453 2454 static void 2455 __file_close_async__sync_done(void *arg, int fserrno) 2456 { 2457 struct spdk_fs_request *req = arg; 2458 struct spdk_fs_cb_args *args = &req->args; 2459 2460 __file_close_async(args->file, req); 2461 } 2462 2463 void 2464 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2465 { 2466 struct spdk_fs_request *req; 2467 struct spdk_fs_cb_args *args; 2468 2469 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2470 if (req == NULL) { 2471 cb_fn(cb_arg, -ENOMEM); 2472 return; 2473 } 2474 2475 args = &req->args; 2476 args->file = file; 2477 args->fn.file_op = cb_fn; 2478 args->arg = cb_arg; 2479 2480 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2481 } 2482 2483 static void 2484 __file_close_done(void *arg, int fserrno) 2485 { 2486 struct spdk_fs_cb_args *args = arg; 2487 2488 args->rc = fserrno; 2489 sem_post(args->sem); 2490 } 2491 2492 static void 2493 __file_close(void *arg) 2494 { 2495 struct spdk_fs_request *req = arg; 2496 struct spdk_fs_cb_args *args = &req->args; 2497 struct spdk_file *file = args->file; 2498 2499 __file_close_async(file, req); 2500 } 2501 2502 int 2503 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2504 { 2505 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2506 struct spdk_fs_request *req; 2507 struct spdk_fs_cb_args *args; 2508 2509 req = alloc_fs_request(channel); 2510 if (req == NULL) { 2511 return -ENOMEM; 2512 } 2513 2514 args = &req->args; 2515 2516 spdk_file_sync(file, _channel); 2517 BLOBFS_TRACE(file, "name=%s\n", file->name); 2518 args->file = file; 2519 args->sem = &channel->sem; 2520 args->fn.file_op = __file_close_done; 2521 args->arg = req; 2522 channel->send_request(__file_close, req); 2523 sem_wait(&channel->sem); 2524 2525 return args->rc; 2526 } 2527 2528 static void 2529 cache_free_buffers(struct spdk_file *file) 2530 { 2531 BLOBFS_TRACE(file, "free=%s\n", file->name); 2532 pthread_spin_lock(&file->lock); 2533 pthread_spin_lock(&g_caches_lock); 2534 if (file->tree->present_mask == 0) { 2535 pthread_spin_unlock(&g_caches_lock); 2536 pthread_spin_unlock(&file->lock); 2537 return; 2538 } 2539 spdk_tree_free_buffers(file->tree); 2540 2541 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2542 /* If not freed, put it in the end of the queue */ 2543 if (file->tree->present_mask != 0) { 2544 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2545 } 2546 file->last = NULL; 2547 pthread_spin_unlock(&g_caches_lock); 2548 pthread_spin_unlock(&file->lock); 2549 } 2550 2551 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2552 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2553