1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 void 64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 65 { 66 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 67 free(cache_buffer); 68 } 69 70 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 71 72 struct spdk_file { 73 struct spdk_filesystem *fs; 74 struct spdk_blob *blob; 75 char *name; 76 uint64_t length; 77 bool is_deleted; 78 bool open_for_writing; 79 uint64_t length_flushed; 80 uint64_t append_pos; 81 uint64_t seq_byte_count; 82 uint64_t next_seq_offset; 83 uint32_t priority; 84 TAILQ_ENTRY(spdk_file) tailq; 85 spdk_blob_id blobid; 86 uint32_t ref_count; 87 pthread_spinlock_t lock; 88 struct cache_buffer *last; 89 struct cache_tree *tree; 90 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 91 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 92 TAILQ_ENTRY(spdk_file) cache_tailq; 93 }; 94 95 struct spdk_deleted_file { 96 spdk_blob_id id; 97 TAILQ_ENTRY(spdk_deleted_file) tailq; 98 }; 99 100 struct spdk_filesystem { 101 struct spdk_blob_store *bs; 102 TAILQ_HEAD(, spdk_file) files; 103 struct spdk_bs_opts bs_opts; 104 struct spdk_bs_dev *bdev; 105 fs_send_request_fn send_request; 106 107 struct { 108 uint32_t max_ops; 109 struct spdk_io_channel *sync_io_channel; 110 struct spdk_fs_channel *sync_fs_channel; 111 } sync_target; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *md_io_channel; 116 struct spdk_fs_channel *md_fs_channel; 117 } md_target; 118 119 struct { 120 uint32_t max_ops; 121 } io_target; 122 }; 123 124 struct spdk_fs_cb_args { 125 union { 126 spdk_fs_op_with_handle_complete fs_op_with_handle; 127 spdk_fs_op_complete fs_op; 128 spdk_file_op_with_handle_complete file_op_with_handle; 129 spdk_file_op_complete file_op; 130 spdk_file_stat_op_complete stat_op; 131 } fn; 132 void *arg; 133 sem_t *sem; 134 struct spdk_filesystem *fs; 135 struct spdk_file *file; 136 int rc; 137 bool from_request; 138 union { 139 struct { 140 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 141 } fs_load; 142 struct { 143 uint64_t length; 144 } truncate; 145 struct { 146 struct spdk_io_channel *channel; 147 void *user_buf; 148 void *pin_buf; 149 int is_read; 150 off_t offset; 151 size_t length; 152 uint64_t start_page; 153 uint64_t num_pages; 154 uint32_t blocklen; 155 } rw; 156 struct { 157 const char *old_name; 158 const char *new_name; 159 } rename; 160 struct { 161 struct cache_buffer *cache_buffer; 162 uint64_t length; 163 } flush; 164 struct { 165 struct cache_buffer *cache_buffer; 166 uint64_t length; 167 uint64_t offset; 168 } readahead; 169 struct { 170 uint64_t offset; 171 TAILQ_ENTRY(spdk_fs_request) tailq; 172 bool xattr_in_progress; 173 } sync; 174 struct { 175 uint32_t num_clusters; 176 } resize; 177 struct { 178 const char *name; 179 uint32_t flags; 180 TAILQ_ENTRY(spdk_fs_request) tailq; 181 } open; 182 struct { 183 const char *name; 184 struct spdk_blob *blob; 185 } create; 186 struct { 187 const char *name; 188 } delete; 189 struct { 190 const char *name; 191 } stat; 192 } op; 193 }; 194 195 static void cache_free_buffers(struct spdk_file *file); 196 197 void 198 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 199 { 200 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 201 } 202 203 static void 204 __initialize_cache(void) 205 { 206 assert(g_cache_pool == NULL); 207 208 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 209 g_fs_cache_size / CACHE_BUFFER_SIZE, 210 CACHE_BUFFER_SIZE, 211 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 212 SPDK_ENV_SOCKET_ID_ANY); 213 if (!g_cache_pool) { 214 SPDK_ERRLOG("Create mempool failed, you may " 215 "increase the memory and try again\n"); 216 assert(false); 217 } 218 TAILQ_INIT(&g_caches); 219 pthread_spin_init(&g_caches_lock, 0); 220 } 221 222 static void 223 __free_cache(void) 224 { 225 assert(g_cache_pool != NULL); 226 227 spdk_mempool_free(g_cache_pool); 228 g_cache_pool = NULL; 229 } 230 231 static uint64_t 232 __file_get_blob_size(struct spdk_file *file) 233 { 234 uint64_t cluster_sz; 235 236 cluster_sz = file->fs->bs_opts.cluster_sz; 237 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 238 } 239 240 struct spdk_fs_request { 241 struct spdk_fs_cb_args args; 242 TAILQ_ENTRY(spdk_fs_request) link; 243 struct spdk_fs_channel *channel; 244 }; 245 246 struct spdk_fs_channel { 247 struct spdk_fs_request *req_mem; 248 TAILQ_HEAD(, spdk_fs_request) reqs; 249 sem_t sem; 250 struct spdk_filesystem *fs; 251 struct spdk_io_channel *bs_channel; 252 fs_send_request_fn send_request; 253 bool sync; 254 pthread_spinlock_t lock; 255 }; 256 257 static struct spdk_fs_request * 258 alloc_fs_request(struct spdk_fs_channel *channel) 259 { 260 struct spdk_fs_request *req; 261 262 if (channel->sync) { 263 pthread_spin_lock(&channel->lock); 264 } 265 266 req = TAILQ_FIRST(&channel->reqs); 267 if (req) { 268 TAILQ_REMOVE(&channel->reqs, req, link); 269 } 270 271 if (channel->sync) { 272 pthread_spin_unlock(&channel->lock); 273 } 274 275 if (req == NULL) { 276 return NULL; 277 } 278 memset(req, 0, sizeof(*req)); 279 req->channel = channel; 280 req->args.from_request = true; 281 282 return req; 283 } 284 285 static void 286 free_fs_request(struct spdk_fs_request *req) 287 { 288 struct spdk_fs_channel *channel = req->channel; 289 290 if (channel->sync) { 291 pthread_spin_lock(&channel->lock); 292 } 293 294 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 295 296 if (channel->sync) { 297 pthread_spin_unlock(&channel->lock); 298 } 299 } 300 301 static int 302 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 303 uint32_t max_ops) 304 { 305 uint32_t i; 306 307 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 308 if (!channel->req_mem) { 309 return -1; 310 } 311 312 TAILQ_INIT(&channel->reqs); 313 sem_init(&channel->sem, 0, 0); 314 315 for (i = 0; i < max_ops; i++) { 316 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 317 } 318 319 channel->fs = fs; 320 321 return 0; 322 } 323 324 static int 325 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 326 { 327 struct spdk_filesystem *fs; 328 struct spdk_fs_channel *channel = ctx_buf; 329 330 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 331 332 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 333 } 334 335 static int 336 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 337 { 338 struct spdk_filesystem *fs; 339 struct spdk_fs_channel *channel = ctx_buf; 340 341 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 342 343 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 344 } 345 346 static int 347 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 348 { 349 struct spdk_filesystem *fs; 350 struct spdk_fs_channel *channel = ctx_buf; 351 352 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 353 354 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 355 } 356 357 static void 358 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 359 { 360 struct spdk_fs_channel *channel = ctx_buf; 361 362 free(channel->req_mem); 363 if (channel->bs_channel != NULL) { 364 spdk_bs_free_io_channel(channel->bs_channel); 365 } 366 } 367 368 static void 369 __send_request_direct(fs_request_fn fn, void *arg) 370 { 371 fn(arg); 372 } 373 374 static void 375 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 376 { 377 fs->bs = bs; 378 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 379 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 380 fs->md_target.md_fs_channel->send_request = __send_request_direct; 381 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 382 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 383 384 pthread_mutex_lock(&g_cache_init_lock); 385 if (g_fs_count == 0) { 386 __initialize_cache(); 387 } 388 g_fs_count++; 389 pthread_mutex_unlock(&g_cache_init_lock); 390 } 391 392 static void 393 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 394 { 395 struct spdk_fs_request *req = ctx; 396 struct spdk_fs_cb_args *args = &req->args; 397 struct spdk_filesystem *fs = args->fs; 398 399 if (bserrno == 0) { 400 common_fs_bs_init(fs, bs); 401 } else { 402 free(fs); 403 fs = NULL; 404 } 405 406 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 407 free_fs_request(req); 408 } 409 410 static void 411 fs_conf_parse(void) 412 { 413 struct spdk_conf_section *sp; 414 415 sp = spdk_conf_find_section(NULL, "Blobfs"); 416 if (sp == NULL) { 417 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 418 return; 419 } 420 421 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 422 if (g_fs_cache_buffer_shift <= 0) { 423 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 424 } 425 } 426 427 static struct spdk_filesystem * 428 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 429 { 430 struct spdk_filesystem *fs; 431 432 fs = calloc(1, sizeof(*fs)); 433 if (fs == NULL) { 434 return NULL; 435 } 436 437 fs->bdev = dev; 438 fs->send_request = send_request_fn; 439 TAILQ_INIT(&fs->files); 440 441 fs->md_target.max_ops = 512; 442 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 443 sizeof(struct spdk_fs_channel)); 444 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 445 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 446 447 fs->sync_target.max_ops = 512; 448 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 449 sizeof(struct spdk_fs_channel)); 450 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 451 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 452 453 fs->io_target.max_ops = 512; 454 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 455 sizeof(struct spdk_fs_channel)); 456 457 return fs; 458 } 459 460 static void 461 __wake_caller(void *arg, int fserrno) 462 { 463 struct spdk_fs_cb_args *args = arg; 464 465 args->rc = fserrno; 466 sem_post(args->sem); 467 } 468 469 void 470 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 471 fs_send_request_fn send_request_fn, 472 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 473 { 474 struct spdk_filesystem *fs; 475 struct spdk_fs_request *req; 476 struct spdk_fs_cb_args *args; 477 struct spdk_bs_opts opts = {}; 478 479 fs = fs_alloc(dev, send_request_fn); 480 if (fs == NULL) { 481 cb_fn(cb_arg, NULL, -ENOMEM); 482 return; 483 } 484 485 fs_conf_parse(); 486 487 req = alloc_fs_request(fs->md_target.md_fs_channel); 488 if (req == NULL) { 489 spdk_put_io_channel(fs->md_target.md_io_channel); 490 spdk_io_device_unregister(&fs->md_target, NULL); 491 spdk_put_io_channel(fs->sync_target.sync_io_channel); 492 spdk_io_device_unregister(&fs->sync_target, NULL); 493 spdk_io_device_unregister(&fs->io_target, NULL); 494 free(fs); 495 cb_fn(cb_arg, NULL, -ENOMEM); 496 return; 497 } 498 499 args = &req->args; 500 args->fn.fs_op_with_handle = cb_fn; 501 args->arg = cb_arg; 502 args->fs = fs; 503 504 spdk_bs_opts_init(&opts); 505 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 506 if (opt) { 507 opts.cluster_sz = opt->cluster_sz; 508 } 509 spdk_bs_init(dev, &opts, init_cb, req); 510 } 511 512 static struct spdk_file * 513 file_alloc(struct spdk_filesystem *fs) 514 { 515 struct spdk_file *file; 516 517 file = calloc(1, sizeof(*file)); 518 if (file == NULL) { 519 return NULL; 520 } 521 522 file->tree = calloc(1, sizeof(*file->tree)); 523 if (file->tree == NULL) { 524 free(file); 525 return NULL; 526 } 527 528 file->fs = fs; 529 TAILQ_INIT(&file->open_requests); 530 TAILQ_INIT(&file->sync_requests); 531 pthread_spin_init(&file->lock, 0); 532 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 533 file->priority = SPDK_FILE_PRIORITY_LOW; 534 return file; 535 } 536 537 static void fs_load_done(void *ctx, int bserrno); 538 539 static int 540 _handle_deleted_files(struct spdk_fs_request *req) 541 { 542 struct spdk_fs_cb_args *args = &req->args; 543 struct spdk_filesystem *fs = args->fs; 544 545 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 546 struct spdk_deleted_file *deleted_file; 547 548 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 549 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 550 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 551 free(deleted_file); 552 return 0; 553 } 554 555 return 1; 556 } 557 558 static void 559 fs_load_done(void *ctx, int bserrno) 560 { 561 struct spdk_fs_request *req = ctx; 562 struct spdk_fs_cb_args *args = &req->args; 563 struct spdk_filesystem *fs = args->fs; 564 565 /* The filesystem has been loaded. Now check if there are any files that 566 * were marked for deletion before last unload. Do not complete the 567 * fs_load callback until all of them have been deleted on disk. 568 */ 569 if (_handle_deleted_files(req) == 0) { 570 /* We found a file that's been marked for deleting but not actually 571 * deleted yet. This function will get called again once the delete 572 * operation is completed. 573 */ 574 return; 575 } 576 577 args->fn.fs_op_with_handle(args->arg, fs, 0); 578 free_fs_request(req); 579 580 } 581 582 static void 583 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 584 { 585 struct spdk_fs_request *req = ctx; 586 struct spdk_fs_cb_args *args = &req->args; 587 struct spdk_filesystem *fs = args->fs; 588 uint64_t *length; 589 const char *name; 590 uint32_t *is_deleted; 591 size_t value_len; 592 593 if (rc < 0) { 594 args->fn.fs_op_with_handle(args->arg, fs, rc); 595 free_fs_request(req); 596 return; 597 } 598 599 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 600 if (rc < 0) { 601 args->fn.fs_op_with_handle(args->arg, fs, rc); 602 free_fs_request(req); 603 return; 604 } 605 606 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 607 if (rc < 0) { 608 args->fn.fs_op_with_handle(args->arg, fs, rc); 609 free_fs_request(req); 610 return; 611 } 612 613 assert(value_len == 8); 614 615 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 616 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 617 if (rc < 0) { 618 struct spdk_file *f; 619 620 f = file_alloc(fs); 621 if (f == NULL) { 622 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 623 free_fs_request(req); 624 return; 625 } 626 627 f->name = strdup(name); 628 f->blobid = spdk_blob_get_id(blob); 629 f->length = *length; 630 f->length_flushed = *length; 631 f->append_pos = *length; 632 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 633 } else { 634 struct spdk_deleted_file *deleted_file; 635 636 deleted_file = calloc(1, sizeof(*deleted_file)); 637 if (deleted_file == NULL) { 638 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 639 free_fs_request(req); 640 return; 641 } 642 deleted_file->id = spdk_blob_get_id(blob); 643 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 644 } 645 } 646 647 static void 648 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 649 { 650 struct spdk_fs_request *req = ctx; 651 struct spdk_fs_cb_args *args = &req->args; 652 struct spdk_filesystem *fs = args->fs; 653 struct spdk_bs_type bstype; 654 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 655 static const struct spdk_bs_type zeros; 656 657 if (bserrno != 0) { 658 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 659 free_fs_request(req); 660 free(fs); 661 return; 662 } 663 664 bstype = spdk_bs_get_bstype(bs); 665 666 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 667 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 668 spdk_bs_set_bstype(bs, blobfs_type); 669 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 670 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 671 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 672 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 673 free_fs_request(req); 674 free(fs); 675 return; 676 } 677 678 common_fs_bs_init(fs, bs); 679 fs_load_done(req, 0); 680 } 681 682 static void 683 spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 684 { 685 assert(fs != NULL); 686 spdk_io_device_unregister(&fs->md_target, NULL); 687 spdk_io_device_unregister(&fs->sync_target, NULL); 688 spdk_io_device_unregister(&fs->io_target, NULL); 689 free(fs); 690 } 691 692 static void 693 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 694 { 695 assert(fs != NULL); 696 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 697 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 698 } 699 700 void 701 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 702 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 703 { 704 struct spdk_filesystem *fs; 705 struct spdk_fs_cb_args *args; 706 struct spdk_fs_request *req; 707 struct spdk_bs_opts bs_opts; 708 709 fs = fs_alloc(dev, send_request_fn); 710 if (fs == NULL) { 711 cb_fn(cb_arg, NULL, -ENOMEM); 712 return; 713 } 714 715 fs_conf_parse(); 716 717 req = alloc_fs_request(fs->md_target.md_fs_channel); 718 if (req == NULL) { 719 spdk_fs_free_io_channels(fs); 720 spdk_fs_io_device_unregister(fs); 721 cb_fn(cb_arg, NULL, -ENOMEM); 722 return; 723 } 724 725 args = &req->args; 726 args->fn.fs_op_with_handle = cb_fn; 727 args->arg = cb_arg; 728 args->fs = fs; 729 TAILQ_INIT(&args->op.fs_load.deleted_files); 730 spdk_bs_opts_init(&bs_opts); 731 bs_opts.iter_cb_fn = iter_cb; 732 bs_opts.iter_cb_arg = req; 733 spdk_bs_load(dev, &bs_opts, load_cb, req); 734 } 735 736 static void 737 unload_cb(void *ctx, int bserrno) 738 { 739 struct spdk_fs_request *req = ctx; 740 struct spdk_fs_cb_args *args = &req->args; 741 struct spdk_filesystem *fs = args->fs; 742 743 pthread_mutex_lock(&g_cache_init_lock); 744 g_fs_count--; 745 if (g_fs_count == 0) { 746 __free_cache(); 747 } 748 pthread_mutex_unlock(&g_cache_init_lock); 749 750 args->fn.fs_op(args->arg, bserrno); 751 free(req); 752 753 spdk_fs_io_device_unregister(fs); 754 } 755 756 void 757 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 758 { 759 struct spdk_fs_request *req; 760 struct spdk_fs_cb_args *args; 761 762 /* 763 * We must free the md_channel before unloading the blobstore, so just 764 * allocate this request from the general heap. 765 */ 766 req = calloc(1, sizeof(*req)); 767 if (req == NULL) { 768 cb_fn(cb_arg, -ENOMEM); 769 return; 770 } 771 772 args = &req->args; 773 args->fn.fs_op = cb_fn; 774 args->arg = cb_arg; 775 args->fs = fs; 776 777 spdk_fs_free_io_channels(fs); 778 spdk_bs_unload(fs->bs, unload_cb, req); 779 } 780 781 static struct spdk_file * 782 fs_find_file(struct spdk_filesystem *fs, const char *name) 783 { 784 struct spdk_file *file; 785 786 TAILQ_FOREACH(file, &fs->files, tailq) { 787 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 788 return file; 789 } 790 } 791 792 return NULL; 793 } 794 795 void 796 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 797 spdk_file_stat_op_complete cb_fn, void *cb_arg) 798 { 799 struct spdk_file_stat stat; 800 struct spdk_file *f = NULL; 801 802 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 803 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 804 return; 805 } 806 807 f = fs_find_file(fs, name); 808 if (f != NULL) { 809 stat.blobid = f->blobid; 810 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 811 cb_fn(cb_arg, &stat, 0); 812 return; 813 } 814 815 cb_fn(cb_arg, NULL, -ENOENT); 816 } 817 818 static void 819 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 820 { 821 struct spdk_fs_request *req = arg; 822 struct spdk_fs_cb_args *args = &req->args; 823 824 args->rc = fserrno; 825 if (fserrno == 0) { 826 memcpy(args->arg, stat, sizeof(*stat)); 827 } 828 sem_post(args->sem); 829 } 830 831 static void 832 __file_stat(void *arg) 833 { 834 struct spdk_fs_request *req = arg; 835 struct spdk_fs_cb_args *args = &req->args; 836 837 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 838 args->fn.stat_op, req); 839 } 840 841 int 842 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 843 const char *name, struct spdk_file_stat *stat) 844 { 845 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 846 struct spdk_fs_request *req; 847 int rc; 848 849 req = alloc_fs_request(channel); 850 if (req == NULL) { 851 return -ENOMEM; 852 } 853 854 req->args.fs = fs; 855 req->args.op.stat.name = name; 856 req->args.fn.stat_op = __copy_stat; 857 req->args.arg = stat; 858 req->args.sem = &channel->sem; 859 channel->send_request(__file_stat, req); 860 sem_wait(&channel->sem); 861 862 rc = req->args.rc; 863 free_fs_request(req); 864 865 return rc; 866 } 867 868 static void 869 fs_create_blob_close_cb(void *ctx, int bserrno) 870 { 871 int rc; 872 struct spdk_fs_request *req = ctx; 873 struct spdk_fs_cb_args *args = &req->args; 874 875 rc = args->rc ? args->rc : bserrno; 876 args->fn.file_op(args->arg, rc); 877 free_fs_request(req); 878 } 879 880 static void 881 fs_create_blob_resize_cb(void *ctx, int bserrno) 882 { 883 struct spdk_fs_request *req = ctx; 884 struct spdk_fs_cb_args *args = &req->args; 885 struct spdk_file *f = args->file; 886 struct spdk_blob *blob = args->op.create.blob; 887 uint64_t length = 0; 888 889 args->rc = bserrno; 890 if (bserrno) { 891 spdk_blob_close(blob, fs_create_blob_close_cb, args); 892 return; 893 } 894 895 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 896 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 897 898 spdk_blob_close(blob, fs_create_blob_close_cb, args); 899 } 900 901 static void 902 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 903 { 904 struct spdk_fs_request *req = ctx; 905 struct spdk_fs_cb_args *args = &req->args; 906 907 if (bserrno) { 908 args->fn.file_op(args->arg, bserrno); 909 free_fs_request(req); 910 return; 911 } 912 913 args->op.create.blob = blob; 914 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 915 } 916 917 static void 918 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 919 { 920 struct spdk_fs_request *req = ctx; 921 struct spdk_fs_cb_args *args = &req->args; 922 struct spdk_file *f = args->file; 923 924 if (bserrno) { 925 args->fn.file_op(args->arg, bserrno); 926 free_fs_request(req); 927 return; 928 } 929 930 f->blobid = blobid; 931 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 932 } 933 934 void 935 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 936 spdk_file_op_complete cb_fn, void *cb_arg) 937 { 938 struct spdk_file *file; 939 struct spdk_fs_request *req; 940 struct spdk_fs_cb_args *args; 941 942 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 943 cb_fn(cb_arg, -ENAMETOOLONG); 944 return; 945 } 946 947 file = fs_find_file(fs, name); 948 if (file != NULL) { 949 cb_fn(cb_arg, -EEXIST); 950 return; 951 } 952 953 file = file_alloc(fs); 954 if (file == NULL) { 955 cb_fn(cb_arg, -ENOMEM); 956 return; 957 } 958 959 req = alloc_fs_request(fs->md_target.md_fs_channel); 960 if (req == NULL) { 961 cb_fn(cb_arg, -ENOMEM); 962 return; 963 } 964 965 args = &req->args; 966 args->file = file; 967 args->fn.file_op = cb_fn; 968 args->arg = cb_arg; 969 970 file->name = strdup(name); 971 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 972 } 973 974 static void 975 __fs_create_file_done(void *arg, int fserrno) 976 { 977 struct spdk_fs_request *req = arg; 978 struct spdk_fs_cb_args *args = &req->args; 979 980 args->rc = fserrno; 981 sem_post(args->sem); 982 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 983 } 984 985 static void 986 __fs_create_file(void *arg) 987 { 988 struct spdk_fs_request *req = arg; 989 struct spdk_fs_cb_args *args = &req->args; 990 991 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 992 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 993 } 994 995 int 996 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 997 { 998 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 999 struct spdk_fs_request *req; 1000 struct spdk_fs_cb_args *args; 1001 int rc; 1002 1003 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1004 1005 req = alloc_fs_request(channel); 1006 if (req == NULL) { 1007 return -ENOMEM; 1008 } 1009 1010 args = &req->args; 1011 args->fs = fs; 1012 args->op.create.name = name; 1013 args->sem = &channel->sem; 1014 fs->send_request(__fs_create_file, req); 1015 sem_wait(&channel->sem); 1016 rc = args->rc; 1017 free_fs_request(req); 1018 1019 return rc; 1020 } 1021 1022 static void 1023 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1024 { 1025 struct spdk_fs_request *req = ctx; 1026 struct spdk_fs_cb_args *args = &req->args; 1027 struct spdk_file *f = args->file; 1028 1029 f->blob = blob; 1030 while (!TAILQ_EMPTY(&f->open_requests)) { 1031 req = TAILQ_FIRST(&f->open_requests); 1032 args = &req->args; 1033 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1034 args->fn.file_op_with_handle(args->arg, f, bserrno); 1035 free_fs_request(req); 1036 } 1037 } 1038 1039 static void 1040 fs_open_blob_create_cb(void *ctx, int bserrno) 1041 { 1042 struct spdk_fs_request *req = ctx; 1043 struct spdk_fs_cb_args *args = &req->args; 1044 struct spdk_file *file = args->file; 1045 struct spdk_filesystem *fs = args->fs; 1046 1047 if (file == NULL) { 1048 /* 1049 * This is from an open with CREATE flag - the file 1050 * is now created so look it up in the file list for this 1051 * filesystem. 1052 */ 1053 file = fs_find_file(fs, args->op.open.name); 1054 assert(file != NULL); 1055 args->file = file; 1056 } 1057 1058 file->ref_count++; 1059 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1060 if (file->ref_count == 1) { 1061 assert(file->blob == NULL); 1062 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1063 } else if (file->blob != NULL) { 1064 fs_open_blob_done(req, file->blob, 0); 1065 } else { 1066 /* 1067 * The blob open for this file is in progress due to a previous 1068 * open request. When that open completes, it will invoke the 1069 * open callback for this request. 1070 */ 1071 } 1072 } 1073 1074 void 1075 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1076 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1077 { 1078 struct spdk_file *f = NULL; 1079 struct spdk_fs_request *req; 1080 struct spdk_fs_cb_args *args; 1081 1082 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1083 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1084 return; 1085 } 1086 1087 f = fs_find_file(fs, name); 1088 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1089 cb_fn(cb_arg, NULL, -ENOENT); 1090 return; 1091 } 1092 1093 if (f != NULL && f->is_deleted == true) { 1094 cb_fn(cb_arg, NULL, -ENOENT); 1095 return; 1096 } 1097 1098 req = alloc_fs_request(fs->md_target.md_fs_channel); 1099 if (req == NULL) { 1100 cb_fn(cb_arg, NULL, -ENOMEM); 1101 return; 1102 } 1103 1104 args = &req->args; 1105 args->fn.file_op_with_handle = cb_fn; 1106 args->arg = cb_arg; 1107 args->file = f; 1108 args->fs = fs; 1109 args->op.open.name = name; 1110 1111 if (f == NULL) { 1112 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1113 } else { 1114 fs_open_blob_create_cb(req, 0); 1115 } 1116 } 1117 1118 static void 1119 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1120 { 1121 struct spdk_fs_request *req = arg; 1122 struct spdk_fs_cb_args *args = &req->args; 1123 1124 args->file = file; 1125 __wake_caller(args, bserrno); 1126 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1127 } 1128 1129 static void 1130 __fs_open_file(void *arg) 1131 { 1132 struct spdk_fs_request *req = arg; 1133 struct spdk_fs_cb_args *args = &req->args; 1134 1135 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1136 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1137 __fs_open_file_done, req); 1138 } 1139 1140 int 1141 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1142 const char *name, uint32_t flags, struct spdk_file **file) 1143 { 1144 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1145 struct spdk_fs_request *req; 1146 struct spdk_fs_cb_args *args; 1147 int rc; 1148 1149 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1150 1151 req = alloc_fs_request(channel); 1152 if (req == NULL) { 1153 return -ENOMEM; 1154 } 1155 1156 args = &req->args; 1157 args->fs = fs; 1158 args->op.open.name = name; 1159 args->op.open.flags = flags; 1160 args->sem = &channel->sem; 1161 fs->send_request(__fs_open_file, req); 1162 sem_wait(&channel->sem); 1163 rc = args->rc; 1164 if (rc == 0) { 1165 *file = args->file; 1166 } else { 1167 *file = NULL; 1168 } 1169 free_fs_request(req); 1170 1171 return rc; 1172 } 1173 1174 static void 1175 fs_rename_blob_close_cb(void *ctx, int bserrno) 1176 { 1177 struct spdk_fs_request *req = ctx; 1178 struct spdk_fs_cb_args *args = &req->args; 1179 1180 args->fn.fs_op(args->arg, bserrno); 1181 free_fs_request(req); 1182 } 1183 1184 static void 1185 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1186 { 1187 struct spdk_fs_request *req = ctx; 1188 struct spdk_fs_cb_args *args = &req->args; 1189 const char *new_name = args->op.rename.new_name; 1190 1191 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1192 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1193 } 1194 1195 static void 1196 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1197 { 1198 struct spdk_fs_cb_args *args = &req->args; 1199 struct spdk_file *f; 1200 1201 f = fs_find_file(args->fs, args->op.rename.old_name); 1202 if (f == NULL) { 1203 args->fn.fs_op(args->arg, -ENOENT); 1204 free_fs_request(req); 1205 return; 1206 } 1207 1208 free(f->name); 1209 f->name = strdup(args->op.rename.new_name); 1210 args->file = f; 1211 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1212 } 1213 1214 static void 1215 fs_rename_delete_done(void *arg, int fserrno) 1216 { 1217 __spdk_fs_md_rename_file(arg); 1218 } 1219 1220 void 1221 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1222 const char *old_name, const char *new_name, 1223 spdk_file_op_complete cb_fn, void *cb_arg) 1224 { 1225 struct spdk_file *f; 1226 struct spdk_fs_request *req; 1227 struct spdk_fs_cb_args *args; 1228 1229 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1230 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1231 cb_fn(cb_arg, -ENAMETOOLONG); 1232 return; 1233 } 1234 1235 req = alloc_fs_request(fs->md_target.md_fs_channel); 1236 if (req == NULL) { 1237 cb_fn(cb_arg, -ENOMEM); 1238 return; 1239 } 1240 1241 args = &req->args; 1242 args->fn.fs_op = cb_fn; 1243 args->fs = fs; 1244 args->arg = cb_arg; 1245 args->op.rename.old_name = old_name; 1246 args->op.rename.new_name = new_name; 1247 1248 f = fs_find_file(fs, new_name); 1249 if (f == NULL) { 1250 __spdk_fs_md_rename_file(req); 1251 return; 1252 } 1253 1254 /* 1255 * The rename overwrites an existing file. So delete the existing file, then 1256 * do the actual rename. 1257 */ 1258 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1259 } 1260 1261 static void 1262 __fs_rename_file_done(void *arg, int fserrno) 1263 { 1264 struct spdk_fs_request *req = arg; 1265 struct spdk_fs_cb_args *args = &req->args; 1266 1267 __wake_caller(args, fserrno); 1268 } 1269 1270 static void 1271 __fs_rename_file(void *arg) 1272 { 1273 struct spdk_fs_request *req = arg; 1274 struct spdk_fs_cb_args *args = &req->args; 1275 1276 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1277 __fs_rename_file_done, req); 1278 } 1279 1280 int 1281 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1282 const char *old_name, const char *new_name) 1283 { 1284 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1285 struct spdk_fs_request *req; 1286 struct spdk_fs_cb_args *args; 1287 int rc; 1288 1289 req = alloc_fs_request(channel); 1290 if (req == NULL) { 1291 return -ENOMEM; 1292 } 1293 1294 args = &req->args; 1295 1296 args->fs = fs; 1297 args->op.rename.old_name = old_name; 1298 args->op.rename.new_name = new_name; 1299 args->sem = &channel->sem; 1300 fs->send_request(__fs_rename_file, req); 1301 sem_wait(&channel->sem); 1302 rc = args->rc; 1303 free_fs_request(req); 1304 return rc; 1305 } 1306 1307 static void 1308 blob_delete_cb(void *ctx, int bserrno) 1309 { 1310 struct spdk_fs_request *req = ctx; 1311 struct spdk_fs_cb_args *args = &req->args; 1312 1313 args->fn.file_op(args->arg, bserrno); 1314 free_fs_request(req); 1315 } 1316 1317 void 1318 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1319 spdk_file_op_complete cb_fn, void *cb_arg) 1320 { 1321 struct spdk_file *f; 1322 spdk_blob_id blobid; 1323 struct spdk_fs_request *req; 1324 struct spdk_fs_cb_args *args; 1325 1326 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1327 1328 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1329 cb_fn(cb_arg, -ENAMETOOLONG); 1330 return; 1331 } 1332 1333 f = fs_find_file(fs, name); 1334 if (f == NULL) { 1335 cb_fn(cb_arg, -ENOENT); 1336 return; 1337 } 1338 1339 req = alloc_fs_request(fs->md_target.md_fs_channel); 1340 if (req == NULL) { 1341 cb_fn(cb_arg, -ENOMEM); 1342 return; 1343 } 1344 1345 args = &req->args; 1346 args->fn.file_op = cb_fn; 1347 args->arg = cb_arg; 1348 1349 if (f->ref_count > 0) { 1350 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1351 f->is_deleted = true; 1352 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1353 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1354 return; 1355 } 1356 1357 TAILQ_REMOVE(&fs->files, f, tailq); 1358 1359 cache_free_buffers(f); 1360 1361 blobid = f->blobid; 1362 1363 free(f->name); 1364 free(f->tree); 1365 free(f); 1366 1367 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1368 } 1369 1370 static void 1371 __fs_delete_file_done(void *arg, int fserrno) 1372 { 1373 struct spdk_fs_request *req = arg; 1374 struct spdk_fs_cb_args *args = &req->args; 1375 1376 __wake_caller(args, fserrno); 1377 } 1378 1379 static void 1380 __fs_delete_file(void *arg) 1381 { 1382 struct spdk_fs_request *req = arg; 1383 struct spdk_fs_cb_args *args = &req->args; 1384 1385 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1386 } 1387 1388 int 1389 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1390 const char *name) 1391 { 1392 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1393 struct spdk_fs_request *req; 1394 struct spdk_fs_cb_args *args; 1395 int rc; 1396 1397 req = alloc_fs_request(channel); 1398 if (req == NULL) { 1399 return -ENOMEM; 1400 } 1401 1402 args = &req->args; 1403 args->fs = fs; 1404 args->op.delete.name = name; 1405 args->sem = &channel->sem; 1406 fs->send_request(__fs_delete_file, req); 1407 sem_wait(&channel->sem); 1408 rc = args->rc; 1409 free_fs_request(req); 1410 1411 return rc; 1412 } 1413 1414 spdk_fs_iter 1415 spdk_fs_iter_first(struct spdk_filesystem *fs) 1416 { 1417 struct spdk_file *f; 1418 1419 f = TAILQ_FIRST(&fs->files); 1420 return f; 1421 } 1422 1423 spdk_fs_iter 1424 spdk_fs_iter_next(spdk_fs_iter iter) 1425 { 1426 struct spdk_file *f = iter; 1427 1428 if (f == NULL) { 1429 return NULL; 1430 } 1431 1432 f = TAILQ_NEXT(f, tailq); 1433 return f; 1434 } 1435 1436 const char * 1437 spdk_file_get_name(struct spdk_file *file) 1438 { 1439 return file->name; 1440 } 1441 1442 uint64_t 1443 spdk_file_get_length(struct spdk_file *file) 1444 { 1445 assert(file != NULL); 1446 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1447 return file->length; 1448 } 1449 1450 static void 1451 fs_truncate_complete_cb(void *ctx, int bserrno) 1452 { 1453 struct spdk_fs_request *req = ctx; 1454 struct spdk_fs_cb_args *args = &req->args; 1455 1456 args->fn.file_op(args->arg, bserrno); 1457 free_fs_request(req); 1458 } 1459 1460 static void 1461 fs_truncate_resize_cb(void *ctx, int bserrno) 1462 { 1463 struct spdk_fs_request *req = ctx; 1464 struct spdk_fs_cb_args *args = &req->args; 1465 struct spdk_file *file = args->file; 1466 uint64_t *length = &args->op.truncate.length; 1467 1468 if (bserrno) { 1469 args->fn.file_op(args->arg, bserrno); 1470 free_fs_request(req); 1471 return; 1472 } 1473 1474 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1475 1476 file->length = *length; 1477 if (file->append_pos > file->length) { 1478 file->append_pos = file->length; 1479 } 1480 1481 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1482 } 1483 1484 static uint64_t 1485 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1486 { 1487 return (length + cluster_sz - 1) / cluster_sz; 1488 } 1489 1490 void 1491 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1492 spdk_file_op_complete cb_fn, void *cb_arg) 1493 { 1494 struct spdk_filesystem *fs; 1495 size_t num_clusters; 1496 struct spdk_fs_request *req; 1497 struct spdk_fs_cb_args *args; 1498 1499 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1500 if (length == file->length) { 1501 cb_fn(cb_arg, 0); 1502 return; 1503 } 1504 1505 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1506 if (req == NULL) { 1507 cb_fn(cb_arg, -ENOMEM); 1508 return; 1509 } 1510 1511 args = &req->args; 1512 args->fn.file_op = cb_fn; 1513 args->arg = cb_arg; 1514 args->file = file; 1515 args->op.truncate.length = length; 1516 fs = file->fs; 1517 1518 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1519 1520 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1521 } 1522 1523 static void 1524 __truncate(void *arg) 1525 { 1526 struct spdk_fs_request *req = arg; 1527 struct spdk_fs_cb_args *args = &req->args; 1528 1529 spdk_file_truncate_async(args->file, args->op.truncate.length, 1530 args->fn.file_op, args); 1531 } 1532 1533 int 1534 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1535 uint64_t length) 1536 { 1537 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1538 struct spdk_fs_request *req; 1539 struct spdk_fs_cb_args *args; 1540 int rc; 1541 1542 req = alloc_fs_request(channel); 1543 if (req == NULL) { 1544 return -ENOMEM; 1545 } 1546 1547 args = &req->args; 1548 1549 args->file = file; 1550 args->op.truncate.length = length; 1551 args->fn.file_op = __wake_caller; 1552 args->sem = &channel->sem; 1553 1554 channel->send_request(__truncate, req); 1555 sem_wait(&channel->sem); 1556 rc = args->rc; 1557 free_fs_request(req); 1558 1559 return rc; 1560 } 1561 1562 static void 1563 __rw_done(void *ctx, int bserrno) 1564 { 1565 struct spdk_fs_request *req = ctx; 1566 struct spdk_fs_cb_args *args = &req->args; 1567 1568 spdk_dma_free(args->op.rw.pin_buf); 1569 args->fn.file_op(args->arg, bserrno); 1570 free_fs_request(req); 1571 } 1572 1573 static void 1574 __read_done(void *ctx, int bserrno) 1575 { 1576 struct spdk_fs_request *req = ctx; 1577 struct spdk_fs_cb_args *args = &req->args; 1578 1579 assert(req != NULL); 1580 if (args->op.rw.is_read) { 1581 memcpy(args->op.rw.user_buf, 1582 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1583 args->op.rw.length); 1584 __rw_done(req, 0); 1585 } else { 1586 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1587 args->op.rw.user_buf, 1588 args->op.rw.length); 1589 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1590 args->op.rw.pin_buf, 1591 args->op.rw.start_page, args->op.rw.num_pages, 1592 __rw_done, req); 1593 } 1594 } 1595 1596 static void 1597 __do_blob_read(void *ctx, int fserrno) 1598 { 1599 struct spdk_fs_request *req = ctx; 1600 struct spdk_fs_cb_args *args = &req->args; 1601 1602 if (fserrno) { 1603 __rw_done(req, fserrno); 1604 return; 1605 } 1606 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1607 args->op.rw.pin_buf, 1608 args->op.rw.start_page, args->op.rw.num_pages, 1609 __read_done, req); 1610 } 1611 1612 static void 1613 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1614 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1615 { 1616 uint64_t end_page; 1617 1618 *page_size = spdk_bs_get_page_size(file->fs->bs); 1619 *start_page = offset / *page_size; 1620 end_page = (offset + length - 1) / *page_size; 1621 *num_pages = (end_page - *start_page + 1); 1622 } 1623 1624 static void 1625 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1626 void *payload, uint64_t offset, uint64_t length, 1627 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1628 { 1629 struct spdk_fs_request *req; 1630 struct spdk_fs_cb_args *args; 1631 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1632 uint64_t start_page, num_pages, pin_buf_length; 1633 uint32_t page_size; 1634 1635 if (is_read && offset + length > file->length) { 1636 cb_fn(cb_arg, -EINVAL); 1637 return; 1638 } 1639 1640 req = alloc_fs_request(channel); 1641 if (req == NULL) { 1642 cb_fn(cb_arg, -ENOMEM); 1643 return; 1644 } 1645 1646 args = &req->args; 1647 args->fn.file_op = cb_fn; 1648 args->arg = cb_arg; 1649 args->file = file; 1650 args->op.rw.channel = channel->bs_channel; 1651 args->op.rw.user_buf = payload; 1652 args->op.rw.is_read = is_read; 1653 args->op.rw.offset = offset; 1654 args->op.rw.length = length; 1655 1656 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1657 pin_buf_length = num_pages * page_size; 1658 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1659 if (args->op.rw.pin_buf == NULL) { 1660 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1661 file->name, offset, length); 1662 free_fs_request(req); 1663 cb_fn(cb_arg, -ENOMEM); 1664 return; 1665 } 1666 1667 args->op.rw.start_page = start_page; 1668 args->op.rw.num_pages = num_pages; 1669 1670 if (!is_read && file->length < offset + length) { 1671 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1672 } else { 1673 __do_blob_read(req, 0); 1674 } 1675 } 1676 1677 void 1678 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1679 void *payload, uint64_t offset, uint64_t length, 1680 spdk_file_op_complete cb_fn, void *cb_arg) 1681 { 1682 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1683 } 1684 1685 void 1686 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1687 void *payload, uint64_t offset, uint64_t length, 1688 spdk_file_op_complete cb_fn, void *cb_arg) 1689 { 1690 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1691 file->name, offset, length); 1692 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1693 } 1694 1695 struct spdk_io_channel * 1696 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1697 { 1698 struct spdk_io_channel *io_channel; 1699 struct spdk_fs_channel *fs_channel; 1700 1701 io_channel = spdk_get_io_channel(&fs->io_target); 1702 fs_channel = spdk_io_channel_get_ctx(io_channel); 1703 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1704 fs_channel->send_request = __send_request_direct; 1705 1706 return io_channel; 1707 } 1708 1709 struct spdk_io_channel * 1710 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1711 { 1712 struct spdk_io_channel *io_channel; 1713 struct spdk_fs_channel *fs_channel; 1714 1715 io_channel = spdk_get_io_channel(&fs->io_target); 1716 fs_channel = spdk_io_channel_get_ctx(io_channel); 1717 fs_channel->send_request = fs->send_request; 1718 fs_channel->sync = 1; 1719 pthread_spin_init(&fs_channel->lock, 0); 1720 1721 return io_channel; 1722 } 1723 1724 void 1725 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1726 { 1727 spdk_put_io_channel(channel); 1728 } 1729 1730 void 1731 spdk_fs_set_cache_size(uint64_t size_in_mb) 1732 { 1733 g_fs_cache_size = size_in_mb * 1024 * 1024; 1734 } 1735 1736 uint64_t 1737 spdk_fs_get_cache_size(void) 1738 { 1739 return g_fs_cache_size / (1024 * 1024); 1740 } 1741 1742 static void __file_flush(void *_args); 1743 1744 static void * 1745 alloc_cache_memory_buffer(struct spdk_file *context) 1746 { 1747 struct spdk_file *file; 1748 void *buf; 1749 1750 buf = spdk_mempool_get(g_cache_pool); 1751 if (buf != NULL) { 1752 return buf; 1753 } 1754 1755 pthread_spin_lock(&g_caches_lock); 1756 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1757 if (!file->open_for_writing && 1758 file->priority == SPDK_FILE_PRIORITY_LOW && 1759 file != context) { 1760 break; 1761 } 1762 } 1763 pthread_spin_unlock(&g_caches_lock); 1764 if (file != NULL) { 1765 cache_free_buffers(file); 1766 buf = spdk_mempool_get(g_cache_pool); 1767 if (buf != NULL) { 1768 return buf; 1769 } 1770 } 1771 1772 pthread_spin_lock(&g_caches_lock); 1773 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1774 if (!file->open_for_writing && file != context) { 1775 break; 1776 } 1777 } 1778 pthread_spin_unlock(&g_caches_lock); 1779 if (file != NULL) { 1780 cache_free_buffers(file); 1781 buf = spdk_mempool_get(g_cache_pool); 1782 if (buf != NULL) { 1783 return buf; 1784 } 1785 } 1786 1787 pthread_spin_lock(&g_caches_lock); 1788 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1789 if (file != context) { 1790 break; 1791 } 1792 } 1793 pthread_spin_unlock(&g_caches_lock); 1794 if (file != NULL) { 1795 cache_free_buffers(file); 1796 buf = spdk_mempool_get(g_cache_pool); 1797 if (buf != NULL) { 1798 return buf; 1799 } 1800 } 1801 1802 return NULL; 1803 } 1804 1805 static struct cache_buffer * 1806 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1807 { 1808 struct cache_buffer *buf; 1809 int count = 0; 1810 1811 buf = calloc(1, sizeof(*buf)); 1812 if (buf == NULL) { 1813 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1814 return NULL; 1815 } 1816 1817 buf->buf = alloc_cache_memory_buffer(file); 1818 while (buf->buf == NULL) { 1819 /* 1820 * TODO: alloc_cache_memory_buffer() should eventually free 1821 * some buffers. Need a more sophisticated check here, instead 1822 * of just bailing if 100 tries does not result in getting a 1823 * free buffer. This will involve using the sync channel's 1824 * semaphore to block until a buffer becomes available. 1825 */ 1826 if (count++ == 100) { 1827 SPDK_ERRLOG("could not allocate cache buffer\n"); 1828 assert(false); 1829 free(buf); 1830 return NULL; 1831 } 1832 buf->buf = alloc_cache_memory_buffer(file); 1833 } 1834 1835 buf->buf_size = CACHE_BUFFER_SIZE; 1836 buf->offset = offset; 1837 1838 pthread_spin_lock(&g_caches_lock); 1839 if (file->tree->present_mask == 0) { 1840 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1841 } 1842 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1843 pthread_spin_unlock(&g_caches_lock); 1844 1845 return buf; 1846 } 1847 1848 static struct cache_buffer * 1849 cache_append_buffer(struct spdk_file *file) 1850 { 1851 struct cache_buffer *last; 1852 1853 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1854 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1855 1856 last = cache_insert_buffer(file, file->append_pos); 1857 if (last == NULL) { 1858 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1859 return NULL; 1860 } 1861 1862 file->last = last; 1863 1864 return last; 1865 } 1866 1867 static void __check_sync_reqs(struct spdk_file *file); 1868 1869 static void 1870 __file_cache_finish_sync(void *ctx, int bserrno) 1871 { 1872 struct spdk_file *file = ctx; 1873 struct spdk_fs_request *sync_req; 1874 struct spdk_fs_cb_args *sync_args; 1875 1876 pthread_spin_lock(&file->lock); 1877 sync_req = TAILQ_FIRST(&file->sync_requests); 1878 sync_args = &sync_req->args; 1879 assert(sync_args->op.sync.offset <= file->length_flushed); 1880 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1881 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1882 pthread_spin_unlock(&file->lock); 1883 1884 sync_args->fn.file_op(sync_args->arg, bserrno); 1885 __check_sync_reqs(file); 1886 1887 pthread_spin_lock(&file->lock); 1888 free_fs_request(sync_req); 1889 pthread_spin_unlock(&file->lock); 1890 } 1891 1892 static void 1893 __free_args(struct spdk_fs_cb_args *args) 1894 { 1895 struct spdk_fs_request *req; 1896 1897 if (!args->from_request) { 1898 free(args); 1899 } else { 1900 /* Depends on args being at the start of the spdk_fs_request structure. */ 1901 req = (struct spdk_fs_request *)args; 1902 free_fs_request(req); 1903 } 1904 } 1905 1906 static void 1907 __check_sync_reqs(struct spdk_file *file) 1908 { 1909 struct spdk_fs_request *sync_req; 1910 1911 pthread_spin_lock(&file->lock); 1912 1913 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1914 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1915 break; 1916 } 1917 } 1918 1919 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1920 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1921 sync_req->args.op.sync.xattr_in_progress = true; 1922 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1923 sizeof(file->length_flushed)); 1924 1925 pthread_spin_unlock(&file->lock); 1926 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file); 1927 } else { 1928 pthread_spin_unlock(&file->lock); 1929 } 1930 } 1931 1932 static void 1933 __file_flush_done(void *arg, int bserrno) 1934 { 1935 struct spdk_fs_cb_args *args = arg; 1936 struct spdk_file *file = args->file; 1937 struct cache_buffer *next = args->op.flush.cache_buffer; 1938 1939 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1940 1941 pthread_spin_lock(&file->lock); 1942 next->in_progress = false; 1943 next->bytes_flushed += args->op.flush.length; 1944 file->length_flushed += args->op.flush.length; 1945 if (file->length_flushed > file->length) { 1946 file->length = file->length_flushed; 1947 } 1948 if (next->bytes_flushed == next->buf_size) { 1949 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1950 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1951 } 1952 1953 /* 1954 * Assert that there is no cached data that extends past the end of the underlying 1955 * blob. 1956 */ 1957 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1958 next->bytes_filled == 0); 1959 1960 pthread_spin_unlock(&file->lock); 1961 1962 __check_sync_reqs(file); 1963 1964 __file_flush(args); 1965 } 1966 1967 static void 1968 __file_flush(void *_args) 1969 { 1970 struct spdk_fs_cb_args *args = _args; 1971 struct spdk_file *file = args->file; 1972 struct cache_buffer *next; 1973 uint64_t offset, length, start_page, num_pages; 1974 uint32_t page_size; 1975 1976 pthread_spin_lock(&file->lock); 1977 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1978 if (next == NULL || next->in_progress) { 1979 /* 1980 * There is either no data to flush, or a flush I/O is already in 1981 * progress. So return immediately - if a flush I/O is in 1982 * progress we will flush more data after that is completed. 1983 */ 1984 __free_args(args); 1985 if (next == NULL) { 1986 /* 1987 * For cases where a file's cache was evicted, and then the 1988 * file was later appended, we will write the data directly 1989 * to disk and bypass cache. So just update length_flushed 1990 * here to reflect that all data was already written to disk. 1991 */ 1992 file->length_flushed = file->append_pos; 1993 } 1994 pthread_spin_unlock(&file->lock); 1995 if (next == NULL) { 1996 /* 1997 * There is no data to flush, but we still need to check for any 1998 * outstanding sync requests to make sure metadata gets updated. 1999 */ 2000 __check_sync_reqs(file); 2001 } 2002 return; 2003 } 2004 2005 offset = next->offset + next->bytes_flushed; 2006 length = next->bytes_filled - next->bytes_flushed; 2007 if (length == 0) { 2008 __free_args(args); 2009 pthread_spin_unlock(&file->lock); 2010 return; 2011 } 2012 args->op.flush.length = length; 2013 args->op.flush.cache_buffer = next; 2014 2015 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2016 2017 next->in_progress = true; 2018 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2019 offset, length, start_page, num_pages); 2020 pthread_spin_unlock(&file->lock); 2021 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2022 next->buf + (start_page * page_size) - next->offset, 2023 start_page, num_pages, __file_flush_done, args); 2024 } 2025 2026 static void 2027 __file_extend_done(void *arg, int bserrno) 2028 { 2029 struct spdk_fs_cb_args *args = arg; 2030 2031 __wake_caller(args, bserrno); 2032 } 2033 2034 static void 2035 __file_extend_resize_cb(void *_args, int bserrno) 2036 { 2037 struct spdk_fs_cb_args *args = _args; 2038 struct spdk_file *file = args->file; 2039 2040 if (bserrno) { 2041 __wake_caller(args, bserrno); 2042 return; 2043 } 2044 2045 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2046 } 2047 2048 static void 2049 __file_extend_blob(void *_args) 2050 { 2051 struct spdk_fs_cb_args *args = _args; 2052 struct spdk_file *file = args->file; 2053 2054 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2055 } 2056 2057 static void 2058 __rw_from_file_done(void *arg, int bserrno) 2059 { 2060 struct spdk_fs_cb_args *args = arg; 2061 2062 __wake_caller(args, bserrno); 2063 __free_args(args); 2064 } 2065 2066 static void 2067 __rw_from_file(void *_args) 2068 { 2069 struct spdk_fs_cb_args *args = _args; 2070 struct spdk_file *file = args->file; 2071 2072 if (args->op.rw.is_read) { 2073 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2074 args->op.rw.offset, args->op.rw.length, 2075 __rw_from_file_done, args); 2076 } else { 2077 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2078 args->op.rw.offset, args->op.rw.length, 2079 __rw_from_file_done, args); 2080 } 2081 } 2082 2083 static int 2084 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 2085 uint64_t offset, uint64_t length, bool is_read) 2086 { 2087 struct spdk_fs_cb_args *args; 2088 2089 args = calloc(1, sizeof(*args)); 2090 if (args == NULL) { 2091 sem_post(sem); 2092 return -ENOMEM; 2093 } 2094 2095 args->file = file; 2096 args->sem = sem; 2097 args->op.rw.user_buf = payload; 2098 args->op.rw.offset = offset; 2099 args->op.rw.length = length; 2100 args->op.rw.is_read = is_read; 2101 file->fs->send_request(__rw_from_file, args); 2102 return 0; 2103 } 2104 2105 int 2106 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 2107 void *payload, uint64_t offset, uint64_t length) 2108 { 2109 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2110 struct spdk_fs_cb_args *args; 2111 uint64_t rem_length, copy, blob_size, cluster_sz; 2112 uint32_t cache_buffers_filled = 0; 2113 uint8_t *cur_payload; 2114 struct cache_buffer *last; 2115 2116 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2117 2118 if (length == 0) { 2119 return 0; 2120 } 2121 2122 if (offset != file->append_pos) { 2123 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2124 return -EINVAL; 2125 } 2126 2127 pthread_spin_lock(&file->lock); 2128 file->open_for_writing = true; 2129 2130 if (file->last == NULL) { 2131 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2132 cache_append_buffer(file); 2133 } else { 2134 int rc; 2135 2136 file->append_pos += length; 2137 pthread_spin_unlock(&file->lock); 2138 rc = __send_rw_from_file(file, &channel->sem, payload, 2139 offset, length, false); 2140 sem_wait(&channel->sem); 2141 return rc; 2142 } 2143 } 2144 2145 blob_size = __file_get_blob_size(file); 2146 2147 if ((offset + length) > blob_size) { 2148 struct spdk_fs_cb_args extend_args = {}; 2149 2150 cluster_sz = file->fs->bs_opts.cluster_sz; 2151 extend_args.sem = &channel->sem; 2152 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2153 extend_args.file = file; 2154 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2155 pthread_spin_unlock(&file->lock); 2156 file->fs->send_request(__file_extend_blob, &extend_args); 2157 sem_wait(&channel->sem); 2158 if (extend_args.rc) { 2159 return extend_args.rc; 2160 } 2161 } 2162 2163 last = file->last; 2164 rem_length = length; 2165 cur_payload = payload; 2166 while (rem_length > 0) { 2167 copy = last->buf_size - last->bytes_filled; 2168 if (copy > rem_length) { 2169 copy = rem_length; 2170 } 2171 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2172 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2173 file->append_pos += copy; 2174 if (file->length < file->append_pos) { 2175 file->length = file->append_pos; 2176 } 2177 cur_payload += copy; 2178 last->bytes_filled += copy; 2179 rem_length -= copy; 2180 if (last->bytes_filled == last->buf_size) { 2181 cache_buffers_filled++; 2182 last = cache_append_buffer(file); 2183 if (last == NULL) { 2184 BLOBFS_TRACE(file, "nomem\n"); 2185 pthread_spin_unlock(&file->lock); 2186 return -ENOMEM; 2187 } 2188 } 2189 } 2190 2191 pthread_spin_unlock(&file->lock); 2192 2193 if (cache_buffers_filled == 0) { 2194 return 0; 2195 } 2196 2197 args = calloc(1, sizeof(*args)); 2198 if (args == NULL) { 2199 return -ENOMEM; 2200 } 2201 2202 args->file = file; 2203 file->fs->send_request(__file_flush, args); 2204 return 0; 2205 } 2206 2207 static void 2208 __readahead_done(void *arg, int bserrno) 2209 { 2210 struct spdk_fs_cb_args *args = arg; 2211 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2212 struct spdk_file *file = args->file; 2213 2214 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2215 2216 pthread_spin_lock(&file->lock); 2217 cache_buffer->bytes_filled = args->op.readahead.length; 2218 cache_buffer->bytes_flushed = args->op.readahead.length; 2219 cache_buffer->in_progress = false; 2220 pthread_spin_unlock(&file->lock); 2221 2222 __free_args(args); 2223 } 2224 2225 static void 2226 __readahead(void *_args) 2227 { 2228 struct spdk_fs_cb_args *args = _args; 2229 struct spdk_file *file = args->file; 2230 uint64_t offset, length, start_page, num_pages; 2231 uint32_t page_size; 2232 2233 offset = args->op.readahead.offset; 2234 length = args->op.readahead.length; 2235 assert(length > 0); 2236 2237 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2238 2239 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2240 offset, length, start_page, num_pages); 2241 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2242 args->op.readahead.cache_buffer->buf, 2243 start_page, num_pages, __readahead_done, args); 2244 } 2245 2246 static uint64_t 2247 __next_cache_buffer_offset(uint64_t offset) 2248 { 2249 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2250 } 2251 2252 static void 2253 check_readahead(struct spdk_file *file, uint64_t offset) 2254 { 2255 struct spdk_fs_cb_args *args; 2256 2257 offset = __next_cache_buffer_offset(offset); 2258 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2259 return; 2260 } 2261 2262 args = calloc(1, sizeof(*args)); 2263 if (args == NULL) { 2264 return; 2265 } 2266 2267 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2268 2269 args->file = file; 2270 args->op.readahead.offset = offset; 2271 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2272 if (!args->op.readahead.cache_buffer) { 2273 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2274 free(args); 2275 return; 2276 } 2277 2278 args->op.readahead.cache_buffer->in_progress = true; 2279 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2280 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2281 } else { 2282 args->op.readahead.length = CACHE_BUFFER_SIZE; 2283 } 2284 file->fs->send_request(__readahead, args); 2285 } 2286 2287 static int 2288 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2289 { 2290 struct cache_buffer *buf; 2291 int rc; 2292 2293 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2294 if (buf == NULL) { 2295 pthread_spin_unlock(&file->lock); 2296 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2297 pthread_spin_lock(&file->lock); 2298 return rc; 2299 } 2300 2301 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2302 length = buf->offset + buf->bytes_filled - offset; 2303 } 2304 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2305 memcpy(payload, &buf->buf[offset - buf->offset], length); 2306 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2307 pthread_spin_lock(&g_caches_lock); 2308 spdk_tree_remove_buffer(file->tree, buf); 2309 if (file->tree->present_mask == 0) { 2310 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2311 } 2312 pthread_spin_unlock(&g_caches_lock); 2313 } 2314 2315 sem_post(sem); 2316 return 0; 2317 } 2318 2319 int64_t 2320 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2321 void *payload, uint64_t offset, uint64_t length) 2322 { 2323 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2324 uint64_t final_offset, final_length; 2325 uint32_t sub_reads = 0; 2326 int rc = 0; 2327 2328 pthread_spin_lock(&file->lock); 2329 2330 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2331 2332 file->open_for_writing = false; 2333 2334 if (length == 0 || offset >= file->append_pos) { 2335 pthread_spin_unlock(&file->lock); 2336 return 0; 2337 } 2338 2339 if (offset + length > file->append_pos) { 2340 length = file->append_pos - offset; 2341 } 2342 2343 if (offset != file->next_seq_offset) { 2344 file->seq_byte_count = 0; 2345 } 2346 file->seq_byte_count += length; 2347 file->next_seq_offset = offset + length; 2348 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2349 check_readahead(file, offset); 2350 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2351 } 2352 2353 final_length = 0; 2354 final_offset = offset + length; 2355 while (offset < final_offset) { 2356 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2357 if (length > (final_offset - offset)) { 2358 length = final_offset - offset; 2359 } 2360 rc = __file_read(file, payload, offset, length, &channel->sem); 2361 if (rc == 0) { 2362 final_length += length; 2363 } else { 2364 break; 2365 } 2366 payload += length; 2367 offset += length; 2368 sub_reads++; 2369 } 2370 pthread_spin_unlock(&file->lock); 2371 while (sub_reads-- > 0) { 2372 sem_wait(&channel->sem); 2373 } 2374 if (rc == 0) { 2375 return final_length; 2376 } else { 2377 return rc; 2378 } 2379 } 2380 2381 static void 2382 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2383 spdk_file_op_complete cb_fn, void *cb_arg) 2384 { 2385 struct spdk_fs_request *sync_req; 2386 struct spdk_fs_request *flush_req; 2387 struct spdk_fs_cb_args *sync_args; 2388 struct spdk_fs_cb_args *flush_args; 2389 2390 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2391 2392 pthread_spin_lock(&file->lock); 2393 if (file->append_pos <= file->length_flushed) { 2394 BLOBFS_TRACE(file, "done - no data to flush\n"); 2395 pthread_spin_unlock(&file->lock); 2396 cb_fn(cb_arg, 0); 2397 return; 2398 } 2399 2400 sync_req = alloc_fs_request(channel); 2401 if (!sync_req) { 2402 pthread_spin_unlock(&file->lock); 2403 cb_fn(cb_arg, -ENOMEM); 2404 return; 2405 } 2406 sync_args = &sync_req->args; 2407 2408 flush_req = alloc_fs_request(channel); 2409 if (!flush_req) { 2410 pthread_spin_unlock(&file->lock); 2411 cb_fn(cb_arg, -ENOMEM); 2412 return; 2413 } 2414 flush_args = &flush_req->args; 2415 2416 sync_args->file = file; 2417 sync_args->fn.file_op = cb_fn; 2418 sync_args->arg = cb_arg; 2419 sync_args->op.sync.offset = file->append_pos; 2420 sync_args->op.sync.xattr_in_progress = false; 2421 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2422 pthread_spin_unlock(&file->lock); 2423 2424 flush_args->file = file; 2425 channel->send_request(__file_flush, flush_args); 2426 } 2427 2428 int 2429 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2430 { 2431 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2432 struct spdk_fs_cb_args args = {}; 2433 2434 args.sem = &channel->sem; 2435 _file_sync(file, channel, __wake_caller, &args); 2436 sem_wait(&channel->sem); 2437 2438 return args.rc; 2439 } 2440 2441 void 2442 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2443 spdk_file_op_complete cb_fn, void *cb_arg) 2444 { 2445 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2446 2447 _file_sync(file, channel, cb_fn, cb_arg); 2448 } 2449 2450 void 2451 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2452 { 2453 BLOBFS_TRACE(file, "priority=%u\n", priority); 2454 file->priority = priority; 2455 2456 } 2457 2458 /* 2459 * Close routines 2460 */ 2461 2462 static void 2463 __file_close_async_done(void *ctx, int bserrno) 2464 { 2465 struct spdk_fs_request *req = ctx; 2466 struct spdk_fs_cb_args *args = &req->args; 2467 struct spdk_file *file = args->file; 2468 2469 if (file->is_deleted) { 2470 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2471 return; 2472 } 2473 2474 args->fn.file_op(args->arg, bserrno); 2475 free_fs_request(req); 2476 } 2477 2478 static void 2479 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2480 { 2481 struct spdk_blob *blob; 2482 2483 pthread_spin_lock(&file->lock); 2484 if (file->ref_count == 0) { 2485 pthread_spin_unlock(&file->lock); 2486 __file_close_async_done(req, -EBADF); 2487 return; 2488 } 2489 2490 file->ref_count--; 2491 if (file->ref_count > 0) { 2492 pthread_spin_unlock(&file->lock); 2493 req->args.fn.file_op(req->args.arg, 0); 2494 free_fs_request(req); 2495 return; 2496 } 2497 2498 pthread_spin_unlock(&file->lock); 2499 2500 blob = file->blob; 2501 file->blob = NULL; 2502 spdk_blob_close(blob, __file_close_async_done, req); 2503 } 2504 2505 static void 2506 __file_close_async__sync_done(void *arg, int fserrno) 2507 { 2508 struct spdk_fs_request *req = arg; 2509 struct spdk_fs_cb_args *args = &req->args; 2510 2511 __file_close_async(args->file, req); 2512 } 2513 2514 void 2515 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2516 { 2517 struct spdk_fs_request *req; 2518 struct spdk_fs_cb_args *args; 2519 2520 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2521 if (req == NULL) { 2522 cb_fn(cb_arg, -ENOMEM); 2523 return; 2524 } 2525 2526 args = &req->args; 2527 args->file = file; 2528 args->fn.file_op = cb_fn; 2529 args->arg = cb_arg; 2530 2531 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2532 } 2533 2534 static void 2535 __file_close_done(void *arg, int fserrno) 2536 { 2537 struct spdk_fs_cb_args *args = arg; 2538 2539 args->rc = fserrno; 2540 sem_post(args->sem); 2541 } 2542 2543 static void 2544 __file_close(void *arg) 2545 { 2546 struct spdk_fs_request *req = arg; 2547 struct spdk_fs_cb_args *args = &req->args; 2548 struct spdk_file *file = args->file; 2549 2550 __file_close_async(file, req); 2551 } 2552 2553 int 2554 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2555 { 2556 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2557 struct spdk_fs_request *req; 2558 struct spdk_fs_cb_args *args; 2559 2560 req = alloc_fs_request(channel); 2561 if (req == NULL) { 2562 return -ENOMEM; 2563 } 2564 2565 args = &req->args; 2566 2567 spdk_file_sync(file, _channel); 2568 BLOBFS_TRACE(file, "name=%s\n", file->name); 2569 args->file = file; 2570 args->sem = &channel->sem; 2571 args->fn.file_op = __file_close_done; 2572 args->arg = req; 2573 channel->send_request(__file_close, req); 2574 sem_wait(&channel->sem); 2575 2576 return args->rc; 2577 } 2578 2579 int 2580 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2581 { 2582 if (size < sizeof(spdk_blob_id)) { 2583 return -EINVAL; 2584 } 2585 2586 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2587 2588 return sizeof(spdk_blob_id); 2589 } 2590 2591 static void 2592 cache_free_buffers(struct spdk_file *file) 2593 { 2594 BLOBFS_TRACE(file, "free=%s\n", file->name); 2595 pthread_spin_lock(&file->lock); 2596 pthread_spin_lock(&g_caches_lock); 2597 if (file->tree->present_mask == 0) { 2598 pthread_spin_unlock(&g_caches_lock); 2599 pthread_spin_unlock(&file->lock); 2600 return; 2601 } 2602 spdk_tree_free_buffers(file->tree); 2603 2604 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2605 /* If not freed, put it in the end of the queue */ 2606 if (file->tree->present_mask != 0) { 2607 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2608 } 2609 file->last = NULL; 2610 pthread_spin_unlock(&g_caches_lock); 2611 pthread_spin_unlock(&file->lock); 2612 } 2613 2614 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2615 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2616