1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 static void 64 __sem_post(void *arg, int bserrno) 65 { 66 sem_t *sem = arg; 67 68 sem_post(sem); 69 } 70 71 void 72 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 73 { 74 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 75 free(cache_buffer); 76 } 77 78 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 79 80 struct spdk_file { 81 struct spdk_filesystem *fs; 82 struct spdk_blob *blob; 83 char *name; 84 uint64_t length; 85 bool is_deleted; 86 bool open_for_writing; 87 uint64_t length_flushed; 88 uint64_t append_pos; 89 uint64_t seq_byte_count; 90 uint64_t next_seq_offset; 91 uint32_t priority; 92 TAILQ_ENTRY(spdk_file) tailq; 93 spdk_blob_id blobid; 94 uint32_t ref_count; 95 pthread_spinlock_t lock; 96 struct cache_buffer *last; 97 struct cache_tree *tree; 98 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 99 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 100 TAILQ_ENTRY(spdk_file) cache_tailq; 101 }; 102 103 struct spdk_deleted_file { 104 spdk_blob_id id; 105 TAILQ_ENTRY(spdk_deleted_file) tailq; 106 }; 107 108 struct spdk_filesystem { 109 struct spdk_blob_store *bs; 110 TAILQ_HEAD(, spdk_file) files; 111 struct spdk_bs_opts bs_opts; 112 struct spdk_bs_dev *bdev; 113 fs_send_request_fn send_request; 114 115 struct { 116 uint32_t max_ops; 117 struct spdk_io_channel *sync_io_channel; 118 struct spdk_fs_channel *sync_fs_channel; 119 } sync_target; 120 121 struct { 122 uint32_t max_ops; 123 struct spdk_io_channel *md_io_channel; 124 struct spdk_fs_channel *md_fs_channel; 125 } md_target; 126 127 struct { 128 uint32_t max_ops; 129 } io_target; 130 }; 131 132 struct spdk_fs_cb_args { 133 union { 134 spdk_fs_op_with_handle_complete fs_op_with_handle; 135 spdk_fs_op_complete fs_op; 136 spdk_file_op_with_handle_complete file_op_with_handle; 137 spdk_file_op_complete file_op; 138 spdk_file_stat_op_complete stat_op; 139 } fn; 140 void *arg; 141 sem_t *sem; 142 struct spdk_filesystem *fs; 143 struct spdk_file *file; 144 int rc; 145 bool from_request; 146 union { 147 struct { 148 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 149 } fs_load; 150 struct { 151 uint64_t length; 152 } truncate; 153 struct { 154 struct spdk_io_channel *channel; 155 void *user_buf; 156 void *pin_buf; 157 int is_read; 158 off_t offset; 159 size_t length; 160 uint64_t start_page; 161 uint64_t num_pages; 162 uint32_t blocklen; 163 } rw; 164 struct { 165 const char *old_name; 166 const char *new_name; 167 } rename; 168 struct { 169 struct cache_buffer *cache_buffer; 170 uint64_t length; 171 } flush; 172 struct { 173 struct cache_buffer *cache_buffer; 174 uint64_t length; 175 uint64_t offset; 176 } readahead; 177 struct { 178 uint64_t offset; 179 TAILQ_ENTRY(spdk_fs_request) tailq; 180 bool xattr_in_progress; 181 } sync; 182 struct { 183 uint32_t num_clusters; 184 } resize; 185 struct { 186 const char *name; 187 uint32_t flags; 188 TAILQ_ENTRY(spdk_fs_request) tailq; 189 } open; 190 struct { 191 const char *name; 192 struct spdk_blob *blob; 193 } create; 194 struct { 195 const char *name; 196 } delete; 197 struct { 198 const char *name; 199 } stat; 200 } op; 201 }; 202 203 static void cache_free_buffers(struct spdk_file *file); 204 205 void 206 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 207 { 208 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 209 } 210 211 static void 212 __initialize_cache(void) 213 { 214 assert(g_cache_pool == NULL); 215 216 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 217 g_fs_cache_size / CACHE_BUFFER_SIZE, 218 CACHE_BUFFER_SIZE, 219 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 220 SPDK_ENV_SOCKET_ID_ANY); 221 if (!g_cache_pool) { 222 SPDK_ERRLOG("Create mempool failed, you may " 223 "increase the memory and try again\n"); 224 assert(false); 225 } 226 TAILQ_INIT(&g_caches); 227 pthread_spin_init(&g_caches_lock, 0); 228 } 229 230 static void 231 __free_cache(void) 232 { 233 assert(g_cache_pool != NULL); 234 235 spdk_mempool_free(g_cache_pool); 236 g_cache_pool = NULL; 237 } 238 239 static uint64_t 240 __file_get_blob_size(struct spdk_file *file) 241 { 242 uint64_t cluster_sz; 243 244 cluster_sz = file->fs->bs_opts.cluster_sz; 245 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 246 } 247 248 struct spdk_fs_request { 249 struct spdk_fs_cb_args args; 250 TAILQ_ENTRY(spdk_fs_request) link; 251 struct spdk_fs_channel *channel; 252 }; 253 254 struct spdk_fs_channel { 255 struct spdk_fs_request *req_mem; 256 TAILQ_HEAD(, spdk_fs_request) reqs; 257 sem_t sem; 258 struct spdk_filesystem *fs; 259 struct spdk_io_channel *bs_channel; 260 fs_send_request_fn send_request; 261 bool sync; 262 pthread_spinlock_t lock; 263 }; 264 265 static struct spdk_fs_request * 266 alloc_fs_request(struct spdk_fs_channel *channel) 267 { 268 struct spdk_fs_request *req; 269 270 if (channel->sync) { 271 pthread_spin_lock(&channel->lock); 272 } 273 274 req = TAILQ_FIRST(&channel->reqs); 275 if (req) { 276 TAILQ_REMOVE(&channel->reqs, req, link); 277 } 278 279 if (channel->sync) { 280 pthread_spin_unlock(&channel->lock); 281 } 282 283 if (req == NULL) { 284 return NULL; 285 } 286 memset(req, 0, sizeof(*req)); 287 req->channel = channel; 288 req->args.from_request = true; 289 290 return req; 291 } 292 293 static void 294 free_fs_request(struct spdk_fs_request *req) 295 { 296 struct spdk_fs_channel *channel = req->channel; 297 298 if (channel->sync) { 299 pthread_spin_lock(&channel->lock); 300 } 301 302 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 303 304 if (channel->sync) { 305 pthread_spin_unlock(&channel->lock); 306 } 307 } 308 309 static int 310 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 311 uint32_t max_ops) 312 { 313 uint32_t i; 314 315 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 316 if (!channel->req_mem) { 317 return -1; 318 } 319 320 TAILQ_INIT(&channel->reqs); 321 sem_init(&channel->sem, 0, 0); 322 323 for (i = 0; i < max_ops; i++) { 324 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 325 } 326 327 channel->fs = fs; 328 329 return 0; 330 } 331 332 static int 333 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 334 { 335 struct spdk_filesystem *fs; 336 struct spdk_fs_channel *channel = ctx_buf; 337 338 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 339 340 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 341 } 342 343 static int 344 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 345 { 346 struct spdk_filesystem *fs; 347 struct spdk_fs_channel *channel = ctx_buf; 348 349 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 350 351 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 352 } 353 354 static int 355 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 356 { 357 struct spdk_filesystem *fs; 358 struct spdk_fs_channel *channel = ctx_buf; 359 360 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 361 362 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 363 } 364 365 static void 366 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 367 { 368 struct spdk_fs_channel *channel = ctx_buf; 369 370 free(channel->req_mem); 371 if (channel->bs_channel != NULL) { 372 spdk_bs_free_io_channel(channel->bs_channel); 373 } 374 } 375 376 static void 377 __send_request_direct(fs_request_fn fn, void *arg) 378 { 379 fn(arg); 380 } 381 382 static void 383 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 384 { 385 fs->bs = bs; 386 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 387 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 388 fs->md_target.md_fs_channel->send_request = __send_request_direct; 389 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 390 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 391 392 pthread_mutex_lock(&g_cache_init_lock); 393 if (g_fs_count == 0) { 394 __initialize_cache(); 395 } 396 g_fs_count++; 397 pthread_mutex_unlock(&g_cache_init_lock); 398 } 399 400 static void 401 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 402 { 403 struct spdk_fs_request *req = ctx; 404 struct spdk_fs_cb_args *args = &req->args; 405 struct spdk_filesystem *fs = args->fs; 406 407 if (bserrno == 0) { 408 common_fs_bs_init(fs, bs); 409 } else { 410 free(fs); 411 fs = NULL; 412 } 413 414 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 415 free_fs_request(req); 416 } 417 418 static void 419 fs_conf_parse(void) 420 { 421 struct spdk_conf_section *sp; 422 423 sp = spdk_conf_find_section(NULL, "Blobfs"); 424 if (sp == NULL) { 425 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 426 return; 427 } 428 429 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 430 if (g_fs_cache_buffer_shift <= 0) { 431 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 432 } 433 } 434 435 static struct spdk_filesystem * 436 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 437 { 438 struct spdk_filesystem *fs; 439 440 fs = calloc(1, sizeof(*fs)); 441 if (fs == NULL) { 442 return NULL; 443 } 444 445 fs->bdev = dev; 446 fs->send_request = send_request_fn; 447 TAILQ_INIT(&fs->files); 448 449 fs->md_target.max_ops = 512; 450 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 451 sizeof(struct spdk_fs_channel)); 452 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 453 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 454 455 fs->sync_target.max_ops = 512; 456 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 457 sizeof(struct spdk_fs_channel)); 458 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 459 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 460 461 fs->io_target.max_ops = 512; 462 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 463 sizeof(struct spdk_fs_channel)); 464 465 return fs; 466 } 467 468 void 469 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 470 fs_send_request_fn send_request_fn, 471 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 472 { 473 struct spdk_filesystem *fs; 474 struct spdk_fs_request *req; 475 struct spdk_fs_cb_args *args; 476 struct spdk_bs_opts opts = {}; 477 478 fs = fs_alloc(dev, send_request_fn); 479 if (fs == NULL) { 480 cb_fn(cb_arg, NULL, -ENOMEM); 481 return; 482 } 483 484 fs_conf_parse(); 485 486 req = alloc_fs_request(fs->md_target.md_fs_channel); 487 if (req == NULL) { 488 spdk_put_io_channel(fs->md_target.md_io_channel); 489 spdk_io_device_unregister(&fs->md_target, NULL); 490 spdk_put_io_channel(fs->sync_target.sync_io_channel); 491 spdk_io_device_unregister(&fs->sync_target, NULL); 492 spdk_io_device_unregister(&fs->io_target, NULL); 493 free(fs); 494 cb_fn(cb_arg, NULL, -ENOMEM); 495 return; 496 } 497 498 args = &req->args; 499 args->fn.fs_op_with_handle = cb_fn; 500 args->arg = cb_arg; 501 args->fs = fs; 502 503 spdk_bs_opts_init(&opts); 504 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 505 if (opt) { 506 opts.cluster_sz = opt->cluster_sz; 507 } 508 spdk_bs_init(dev, &opts, init_cb, req); 509 } 510 511 static struct spdk_file * 512 file_alloc(struct spdk_filesystem *fs) 513 { 514 struct spdk_file *file; 515 516 file = calloc(1, sizeof(*file)); 517 if (file == NULL) { 518 return NULL; 519 } 520 521 file->tree = calloc(1, sizeof(*file->tree)); 522 if (file->tree == NULL) { 523 free(file); 524 return NULL; 525 } 526 527 file->fs = fs; 528 TAILQ_INIT(&file->open_requests); 529 TAILQ_INIT(&file->sync_requests); 530 pthread_spin_init(&file->lock, 0); 531 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 532 file->priority = SPDK_FILE_PRIORITY_LOW; 533 return file; 534 } 535 536 static void fs_load_done(void *ctx, int bserrno); 537 538 static int 539 _handle_deleted_files(struct spdk_fs_request *req) 540 { 541 struct spdk_fs_cb_args *args = &req->args; 542 struct spdk_filesystem *fs = args->fs; 543 544 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 545 struct spdk_deleted_file *deleted_file; 546 547 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 548 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 549 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 550 free(deleted_file); 551 return 0; 552 } 553 554 return 1; 555 } 556 557 static void 558 fs_load_done(void *ctx, int bserrno) 559 { 560 struct spdk_fs_request *req = ctx; 561 struct spdk_fs_cb_args *args = &req->args; 562 struct spdk_filesystem *fs = args->fs; 563 564 /* The filesystem has been loaded. Now check if there are any files that 565 * were marked for deletion before last unload. Do not complete the 566 * fs_load callback until all of them have been deleted on disk. 567 */ 568 if (_handle_deleted_files(req) == 0) { 569 /* We found a file that's been marked for deleting but not actually 570 * deleted yet. This function will get called again once the delete 571 * operation is completed. 572 */ 573 return; 574 } 575 576 args->fn.fs_op_with_handle(args->arg, fs, 0); 577 free_fs_request(req); 578 579 } 580 581 static void 582 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 583 { 584 struct spdk_fs_request *req = ctx; 585 struct spdk_fs_cb_args *args = &req->args; 586 struct spdk_filesystem *fs = args->fs; 587 uint64_t *length; 588 const char *name; 589 uint32_t *is_deleted; 590 size_t value_len; 591 592 if (rc < 0) { 593 args->fn.fs_op_with_handle(args->arg, fs, rc); 594 free_fs_request(req); 595 return; 596 } 597 598 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 599 if (rc < 0) { 600 args->fn.fs_op_with_handle(args->arg, fs, rc); 601 free_fs_request(req); 602 return; 603 } 604 605 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 606 if (rc < 0) { 607 args->fn.fs_op_with_handle(args->arg, fs, rc); 608 free_fs_request(req); 609 return; 610 } 611 612 assert(value_len == 8); 613 614 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 615 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 616 if (rc < 0) { 617 struct spdk_file *f; 618 619 f = file_alloc(fs); 620 if (f == NULL) { 621 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 622 free_fs_request(req); 623 return; 624 } 625 626 f->name = strdup(name); 627 f->blobid = spdk_blob_get_id(blob); 628 f->length = *length; 629 f->length_flushed = *length; 630 f->append_pos = *length; 631 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 632 } else { 633 struct spdk_deleted_file *deleted_file; 634 635 deleted_file = calloc(1, sizeof(*deleted_file)); 636 if (deleted_file == NULL) { 637 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 638 free_fs_request(req); 639 return; 640 } 641 deleted_file->id = spdk_blob_get_id(blob); 642 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 643 } 644 } 645 646 static void 647 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 648 { 649 struct spdk_fs_request *req = ctx; 650 struct spdk_fs_cb_args *args = &req->args; 651 struct spdk_filesystem *fs = args->fs; 652 struct spdk_bs_type bstype; 653 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 654 static const struct spdk_bs_type zeros; 655 656 if (bserrno != 0) { 657 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 658 free_fs_request(req); 659 free(fs); 660 return; 661 } 662 663 bstype = spdk_bs_get_bstype(bs); 664 665 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 666 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 667 spdk_bs_set_bstype(bs, blobfs_type); 668 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 669 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 670 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 671 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 672 free_fs_request(req); 673 free(fs); 674 return; 675 } 676 677 common_fs_bs_init(fs, bs); 678 fs_load_done(req, 0); 679 } 680 681 static void 682 spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 683 { 684 assert(fs != NULL); 685 spdk_io_device_unregister(&fs->md_target, NULL); 686 spdk_io_device_unregister(&fs->sync_target, NULL); 687 spdk_io_device_unregister(&fs->io_target, NULL); 688 free(fs); 689 } 690 691 static void 692 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 693 { 694 assert(fs != NULL); 695 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 696 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 697 } 698 699 void 700 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 701 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 702 { 703 struct spdk_filesystem *fs; 704 struct spdk_fs_cb_args *args; 705 struct spdk_fs_request *req; 706 struct spdk_bs_opts bs_opts; 707 708 fs = fs_alloc(dev, send_request_fn); 709 if (fs == NULL) { 710 cb_fn(cb_arg, NULL, -ENOMEM); 711 return; 712 } 713 714 fs_conf_parse(); 715 716 req = alloc_fs_request(fs->md_target.md_fs_channel); 717 if (req == NULL) { 718 spdk_fs_free_io_channels(fs); 719 spdk_fs_io_device_unregister(fs); 720 cb_fn(cb_arg, NULL, -ENOMEM); 721 return; 722 } 723 724 args = &req->args; 725 args->fn.fs_op_with_handle = cb_fn; 726 args->arg = cb_arg; 727 args->fs = fs; 728 TAILQ_INIT(&args->op.fs_load.deleted_files); 729 spdk_bs_opts_init(&bs_opts); 730 bs_opts.iter_cb_fn = iter_cb; 731 bs_opts.iter_cb_arg = req; 732 spdk_bs_load(dev, &bs_opts, load_cb, req); 733 } 734 735 static void 736 unload_cb(void *ctx, int bserrno) 737 { 738 struct spdk_fs_request *req = ctx; 739 struct spdk_fs_cb_args *args = &req->args; 740 struct spdk_filesystem *fs = args->fs; 741 742 pthread_mutex_lock(&g_cache_init_lock); 743 g_fs_count--; 744 if (g_fs_count == 0) { 745 __free_cache(); 746 } 747 pthread_mutex_unlock(&g_cache_init_lock); 748 749 args->fn.fs_op(args->arg, bserrno); 750 free(req); 751 752 spdk_fs_io_device_unregister(fs); 753 } 754 755 void 756 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 757 { 758 struct spdk_fs_request *req; 759 struct spdk_fs_cb_args *args; 760 761 /* 762 * We must free the md_channel before unloading the blobstore, so just 763 * allocate this request from the general heap. 764 */ 765 req = calloc(1, sizeof(*req)); 766 if (req == NULL) { 767 cb_fn(cb_arg, -ENOMEM); 768 return; 769 } 770 771 args = &req->args; 772 args->fn.fs_op = cb_fn; 773 args->arg = cb_arg; 774 args->fs = fs; 775 776 spdk_fs_free_io_channels(fs); 777 spdk_bs_unload(fs->bs, unload_cb, req); 778 } 779 780 static struct spdk_file * 781 fs_find_file(struct spdk_filesystem *fs, const char *name) 782 { 783 struct spdk_file *file; 784 785 TAILQ_FOREACH(file, &fs->files, tailq) { 786 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 787 return file; 788 } 789 } 790 791 return NULL; 792 } 793 794 void 795 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 796 spdk_file_stat_op_complete cb_fn, void *cb_arg) 797 { 798 struct spdk_file_stat stat; 799 struct spdk_file *f = NULL; 800 801 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 802 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 803 return; 804 } 805 806 f = fs_find_file(fs, name); 807 if (f != NULL) { 808 stat.blobid = f->blobid; 809 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 810 cb_fn(cb_arg, &stat, 0); 811 return; 812 } 813 814 cb_fn(cb_arg, NULL, -ENOENT); 815 } 816 817 static void 818 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 819 { 820 struct spdk_fs_request *req = arg; 821 struct spdk_fs_cb_args *args = &req->args; 822 823 args->rc = fserrno; 824 if (fserrno == 0) { 825 memcpy(args->arg, stat, sizeof(*stat)); 826 } 827 sem_post(args->sem); 828 } 829 830 static void 831 __file_stat(void *arg) 832 { 833 struct spdk_fs_request *req = arg; 834 struct spdk_fs_cb_args *args = &req->args; 835 836 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 837 args->fn.stat_op, req); 838 } 839 840 int 841 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 842 const char *name, struct spdk_file_stat *stat) 843 { 844 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 845 struct spdk_fs_request *req; 846 int rc; 847 848 req = alloc_fs_request(channel); 849 if (req == NULL) { 850 return -ENOMEM; 851 } 852 853 req->args.fs = fs; 854 req->args.op.stat.name = name; 855 req->args.fn.stat_op = __copy_stat; 856 req->args.arg = stat; 857 req->args.sem = &channel->sem; 858 channel->send_request(__file_stat, req); 859 sem_wait(&channel->sem); 860 861 rc = req->args.rc; 862 free_fs_request(req); 863 864 return rc; 865 } 866 867 static void 868 fs_create_blob_close_cb(void *ctx, int bserrno) 869 { 870 struct spdk_fs_request *req = ctx; 871 struct spdk_fs_cb_args *args = &req->args; 872 873 args->fn.file_op(args->arg, bserrno); 874 free_fs_request(req); 875 } 876 877 static void 878 fs_create_blob_resize_cb(void *ctx, int bserrno) 879 { 880 struct spdk_fs_request *req = ctx; 881 struct spdk_fs_cb_args *args = &req->args; 882 struct spdk_file *f = args->file; 883 struct spdk_blob *blob = args->op.create.blob; 884 uint64_t length = 0; 885 886 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 887 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 888 889 spdk_blob_close(blob, fs_create_blob_close_cb, args); 890 } 891 892 static void 893 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 894 { 895 struct spdk_fs_request *req = ctx; 896 struct spdk_fs_cb_args *args = &req->args; 897 898 args->op.create.blob = blob; 899 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 900 } 901 902 static void 903 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 904 { 905 struct spdk_fs_request *req = ctx; 906 struct spdk_fs_cb_args *args = &req->args; 907 struct spdk_file *f = args->file; 908 909 f->blobid = blobid; 910 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 911 } 912 913 void 914 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 915 spdk_file_op_complete cb_fn, void *cb_arg) 916 { 917 struct spdk_file *file; 918 struct spdk_fs_request *req; 919 struct spdk_fs_cb_args *args; 920 921 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 922 cb_fn(cb_arg, -ENAMETOOLONG); 923 return; 924 } 925 926 file = fs_find_file(fs, name); 927 if (file != NULL) { 928 cb_fn(cb_arg, -EEXIST); 929 return; 930 } 931 932 file = file_alloc(fs); 933 if (file == NULL) { 934 cb_fn(cb_arg, -ENOMEM); 935 return; 936 } 937 938 req = alloc_fs_request(fs->md_target.md_fs_channel); 939 if (req == NULL) { 940 cb_fn(cb_arg, -ENOMEM); 941 return; 942 } 943 944 args = &req->args; 945 args->file = file; 946 args->fn.file_op = cb_fn; 947 args->arg = cb_arg; 948 949 file->name = strdup(name); 950 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 951 } 952 953 static void 954 __fs_create_file_done(void *arg, int fserrno) 955 { 956 struct spdk_fs_request *req = arg; 957 struct spdk_fs_cb_args *args = &req->args; 958 959 args->rc = fserrno; 960 sem_post(args->sem); 961 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 962 } 963 964 static void 965 __fs_create_file(void *arg) 966 { 967 struct spdk_fs_request *req = arg; 968 struct spdk_fs_cb_args *args = &req->args; 969 970 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 971 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 972 } 973 974 int 975 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 976 { 977 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 978 struct spdk_fs_request *req; 979 struct spdk_fs_cb_args *args; 980 int rc; 981 982 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 983 984 req = alloc_fs_request(channel); 985 if (req == NULL) { 986 return -ENOMEM; 987 } 988 989 args = &req->args; 990 args->fs = fs; 991 args->op.create.name = name; 992 args->sem = &channel->sem; 993 fs->send_request(__fs_create_file, req); 994 sem_wait(&channel->sem); 995 rc = args->rc; 996 free_fs_request(req); 997 998 return rc; 999 } 1000 1001 static void 1002 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1003 { 1004 struct spdk_fs_request *req = ctx; 1005 struct spdk_fs_cb_args *args = &req->args; 1006 struct spdk_file *f = args->file; 1007 1008 f->blob = blob; 1009 while (!TAILQ_EMPTY(&f->open_requests)) { 1010 req = TAILQ_FIRST(&f->open_requests); 1011 args = &req->args; 1012 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1013 args->fn.file_op_with_handle(args->arg, f, bserrno); 1014 free_fs_request(req); 1015 } 1016 } 1017 1018 static void 1019 fs_open_blob_create_cb(void *ctx, int bserrno) 1020 { 1021 struct spdk_fs_request *req = ctx; 1022 struct spdk_fs_cb_args *args = &req->args; 1023 struct spdk_file *file = args->file; 1024 struct spdk_filesystem *fs = args->fs; 1025 1026 if (file == NULL) { 1027 /* 1028 * This is from an open with CREATE flag - the file 1029 * is now created so look it up in the file list for this 1030 * filesystem. 1031 */ 1032 file = fs_find_file(fs, args->op.open.name); 1033 assert(file != NULL); 1034 args->file = file; 1035 } 1036 1037 file->ref_count++; 1038 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1039 if (file->ref_count == 1) { 1040 assert(file->blob == NULL); 1041 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1042 } else if (file->blob != NULL) { 1043 fs_open_blob_done(req, file->blob, 0); 1044 } else { 1045 /* 1046 * The blob open for this file is in progress due to a previous 1047 * open request. When that open completes, it will invoke the 1048 * open callback for this request. 1049 */ 1050 } 1051 } 1052 1053 void 1054 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1055 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1056 { 1057 struct spdk_file *f = NULL; 1058 struct spdk_fs_request *req; 1059 struct spdk_fs_cb_args *args; 1060 1061 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1062 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1063 return; 1064 } 1065 1066 f = fs_find_file(fs, name); 1067 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1068 cb_fn(cb_arg, NULL, -ENOENT); 1069 return; 1070 } 1071 1072 if (f != NULL && f->is_deleted == true) { 1073 cb_fn(cb_arg, NULL, -ENOENT); 1074 return; 1075 } 1076 1077 req = alloc_fs_request(fs->md_target.md_fs_channel); 1078 if (req == NULL) { 1079 cb_fn(cb_arg, NULL, -ENOMEM); 1080 return; 1081 } 1082 1083 args = &req->args; 1084 args->fn.file_op_with_handle = cb_fn; 1085 args->arg = cb_arg; 1086 args->file = f; 1087 args->fs = fs; 1088 args->op.open.name = name; 1089 1090 if (f == NULL) { 1091 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1092 } else { 1093 fs_open_blob_create_cb(req, 0); 1094 } 1095 } 1096 1097 static void 1098 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1099 { 1100 struct spdk_fs_request *req = arg; 1101 struct spdk_fs_cb_args *args = &req->args; 1102 1103 args->file = file; 1104 args->rc = bserrno; 1105 sem_post(args->sem); 1106 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1107 } 1108 1109 static void 1110 __fs_open_file(void *arg) 1111 { 1112 struct spdk_fs_request *req = arg; 1113 struct spdk_fs_cb_args *args = &req->args; 1114 1115 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1116 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1117 __fs_open_file_done, req); 1118 } 1119 1120 int 1121 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1122 const char *name, uint32_t flags, struct spdk_file **file) 1123 { 1124 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1125 struct spdk_fs_request *req; 1126 struct spdk_fs_cb_args *args; 1127 int rc; 1128 1129 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1130 1131 req = alloc_fs_request(channel); 1132 if (req == NULL) { 1133 return -ENOMEM; 1134 } 1135 1136 args = &req->args; 1137 args->fs = fs; 1138 args->op.open.name = name; 1139 args->op.open.flags = flags; 1140 args->sem = &channel->sem; 1141 fs->send_request(__fs_open_file, req); 1142 sem_wait(&channel->sem); 1143 rc = args->rc; 1144 if (rc == 0) { 1145 *file = args->file; 1146 } else { 1147 *file = NULL; 1148 } 1149 free_fs_request(req); 1150 1151 return rc; 1152 } 1153 1154 static void 1155 fs_rename_blob_close_cb(void *ctx, int bserrno) 1156 { 1157 struct spdk_fs_request *req = ctx; 1158 struct spdk_fs_cb_args *args = &req->args; 1159 1160 args->fn.fs_op(args->arg, bserrno); 1161 free_fs_request(req); 1162 } 1163 1164 static void 1165 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1166 { 1167 struct spdk_fs_request *req = ctx; 1168 struct spdk_fs_cb_args *args = &req->args; 1169 const char *new_name = args->op.rename.new_name; 1170 1171 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1172 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1173 } 1174 1175 static void 1176 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1177 { 1178 struct spdk_fs_cb_args *args = &req->args; 1179 struct spdk_file *f; 1180 1181 f = fs_find_file(args->fs, args->op.rename.old_name); 1182 if (f == NULL) { 1183 args->fn.fs_op(args->arg, -ENOENT); 1184 free_fs_request(req); 1185 return; 1186 } 1187 1188 free(f->name); 1189 f->name = strdup(args->op.rename.new_name); 1190 args->file = f; 1191 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1192 } 1193 1194 static void 1195 fs_rename_delete_done(void *arg, int fserrno) 1196 { 1197 __spdk_fs_md_rename_file(arg); 1198 } 1199 1200 void 1201 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1202 const char *old_name, const char *new_name, 1203 spdk_file_op_complete cb_fn, void *cb_arg) 1204 { 1205 struct spdk_file *f; 1206 struct spdk_fs_request *req; 1207 struct spdk_fs_cb_args *args; 1208 1209 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1210 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1211 cb_fn(cb_arg, -ENAMETOOLONG); 1212 return; 1213 } 1214 1215 req = alloc_fs_request(fs->md_target.md_fs_channel); 1216 if (req == NULL) { 1217 cb_fn(cb_arg, -ENOMEM); 1218 return; 1219 } 1220 1221 args = &req->args; 1222 args->fn.fs_op = cb_fn; 1223 args->fs = fs; 1224 args->arg = cb_arg; 1225 args->op.rename.old_name = old_name; 1226 args->op.rename.new_name = new_name; 1227 1228 f = fs_find_file(fs, new_name); 1229 if (f == NULL) { 1230 __spdk_fs_md_rename_file(req); 1231 return; 1232 } 1233 1234 /* 1235 * The rename overwrites an existing file. So delete the existing file, then 1236 * do the actual rename. 1237 */ 1238 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1239 } 1240 1241 static void 1242 __fs_rename_file_done(void *arg, int fserrno) 1243 { 1244 struct spdk_fs_request *req = arg; 1245 struct spdk_fs_cb_args *args = &req->args; 1246 1247 args->rc = fserrno; 1248 sem_post(args->sem); 1249 } 1250 1251 static void 1252 __fs_rename_file(void *arg) 1253 { 1254 struct spdk_fs_request *req = arg; 1255 struct spdk_fs_cb_args *args = &req->args; 1256 1257 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1258 __fs_rename_file_done, req); 1259 } 1260 1261 int 1262 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1263 const char *old_name, const char *new_name) 1264 { 1265 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1266 struct spdk_fs_request *req; 1267 struct spdk_fs_cb_args *args; 1268 int rc; 1269 1270 req = alloc_fs_request(channel); 1271 if (req == NULL) { 1272 return -ENOMEM; 1273 } 1274 1275 args = &req->args; 1276 1277 args->fs = fs; 1278 args->op.rename.old_name = old_name; 1279 args->op.rename.new_name = new_name; 1280 args->sem = &channel->sem; 1281 fs->send_request(__fs_rename_file, req); 1282 sem_wait(&channel->sem); 1283 rc = args->rc; 1284 free_fs_request(req); 1285 return rc; 1286 } 1287 1288 static void 1289 blob_delete_cb(void *ctx, int bserrno) 1290 { 1291 struct spdk_fs_request *req = ctx; 1292 struct spdk_fs_cb_args *args = &req->args; 1293 1294 args->fn.file_op(args->arg, bserrno); 1295 free_fs_request(req); 1296 } 1297 1298 void 1299 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1300 spdk_file_op_complete cb_fn, void *cb_arg) 1301 { 1302 struct spdk_file *f; 1303 spdk_blob_id blobid; 1304 struct spdk_fs_request *req; 1305 struct spdk_fs_cb_args *args; 1306 1307 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1308 1309 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1310 cb_fn(cb_arg, -ENAMETOOLONG); 1311 return; 1312 } 1313 1314 f = fs_find_file(fs, name); 1315 if (f == NULL) { 1316 cb_fn(cb_arg, -ENOENT); 1317 return; 1318 } 1319 1320 req = alloc_fs_request(fs->md_target.md_fs_channel); 1321 if (req == NULL) { 1322 cb_fn(cb_arg, -ENOMEM); 1323 return; 1324 } 1325 1326 args = &req->args; 1327 args->fn.file_op = cb_fn; 1328 args->arg = cb_arg; 1329 1330 if (f->ref_count > 0) { 1331 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1332 f->is_deleted = true; 1333 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1334 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1335 return; 1336 } 1337 1338 TAILQ_REMOVE(&fs->files, f, tailq); 1339 1340 cache_free_buffers(f); 1341 1342 blobid = f->blobid; 1343 1344 free(f->name); 1345 free(f->tree); 1346 free(f); 1347 1348 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1349 } 1350 1351 static void 1352 __fs_delete_file_done(void *arg, int fserrno) 1353 { 1354 struct spdk_fs_request *req = arg; 1355 struct spdk_fs_cb_args *args = &req->args; 1356 1357 args->rc = fserrno; 1358 sem_post(args->sem); 1359 } 1360 1361 static void 1362 __fs_delete_file(void *arg) 1363 { 1364 struct spdk_fs_request *req = arg; 1365 struct spdk_fs_cb_args *args = &req->args; 1366 1367 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1368 } 1369 1370 int 1371 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1372 const char *name) 1373 { 1374 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1375 struct spdk_fs_request *req; 1376 struct spdk_fs_cb_args *args; 1377 int rc; 1378 1379 req = alloc_fs_request(channel); 1380 if (req == NULL) { 1381 return -ENOMEM; 1382 } 1383 1384 args = &req->args; 1385 args->fs = fs; 1386 args->op.delete.name = name; 1387 args->sem = &channel->sem; 1388 fs->send_request(__fs_delete_file, req); 1389 sem_wait(&channel->sem); 1390 rc = args->rc; 1391 free_fs_request(req); 1392 1393 return rc; 1394 } 1395 1396 spdk_fs_iter 1397 spdk_fs_iter_first(struct spdk_filesystem *fs) 1398 { 1399 struct spdk_file *f; 1400 1401 f = TAILQ_FIRST(&fs->files); 1402 return f; 1403 } 1404 1405 spdk_fs_iter 1406 spdk_fs_iter_next(spdk_fs_iter iter) 1407 { 1408 struct spdk_file *f = iter; 1409 1410 if (f == NULL) { 1411 return NULL; 1412 } 1413 1414 f = TAILQ_NEXT(f, tailq); 1415 return f; 1416 } 1417 1418 const char * 1419 spdk_file_get_name(struct spdk_file *file) 1420 { 1421 return file->name; 1422 } 1423 1424 uint64_t 1425 spdk_file_get_length(struct spdk_file *file) 1426 { 1427 assert(file != NULL); 1428 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1429 return file->length; 1430 } 1431 1432 static void 1433 fs_truncate_complete_cb(void *ctx, int bserrno) 1434 { 1435 struct spdk_fs_request *req = ctx; 1436 struct spdk_fs_cb_args *args = &req->args; 1437 1438 args->fn.file_op(args->arg, bserrno); 1439 free_fs_request(req); 1440 } 1441 1442 static void 1443 fs_truncate_resize_cb(void *ctx, int bserrno) 1444 { 1445 struct spdk_fs_request *req = ctx; 1446 struct spdk_fs_cb_args *args = &req->args; 1447 struct spdk_file *file = args->file; 1448 uint64_t *length = &args->op.truncate.length; 1449 1450 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1451 1452 file->length = *length; 1453 if (file->append_pos > file->length) { 1454 file->append_pos = file->length; 1455 } 1456 1457 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1458 } 1459 1460 static uint64_t 1461 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1462 { 1463 return (length + cluster_sz - 1) / cluster_sz; 1464 } 1465 1466 void 1467 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1468 spdk_file_op_complete cb_fn, void *cb_arg) 1469 { 1470 struct spdk_filesystem *fs; 1471 size_t num_clusters; 1472 struct spdk_fs_request *req; 1473 struct spdk_fs_cb_args *args; 1474 1475 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1476 if (length == file->length) { 1477 cb_fn(cb_arg, 0); 1478 return; 1479 } 1480 1481 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1482 if (req == NULL) { 1483 cb_fn(cb_arg, -ENOMEM); 1484 return; 1485 } 1486 1487 args = &req->args; 1488 args->fn.file_op = cb_fn; 1489 args->arg = cb_arg; 1490 args->file = file; 1491 args->op.truncate.length = length; 1492 fs = file->fs; 1493 1494 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1495 1496 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1497 } 1498 1499 static void 1500 __truncate(void *arg) 1501 { 1502 struct spdk_fs_request *req = arg; 1503 struct spdk_fs_cb_args *args = &req->args; 1504 1505 spdk_file_truncate_async(args->file, args->op.truncate.length, 1506 args->fn.file_op, args->arg); 1507 } 1508 1509 int 1510 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1511 uint64_t length) 1512 { 1513 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1514 struct spdk_fs_request *req; 1515 struct spdk_fs_cb_args *args; 1516 1517 req = alloc_fs_request(channel); 1518 if (req == NULL) { 1519 return -ENOMEM; 1520 } 1521 1522 args = &req->args; 1523 1524 args->file = file; 1525 args->op.truncate.length = length; 1526 args->fn.file_op = __sem_post; 1527 args->arg = &channel->sem; 1528 1529 channel->send_request(__truncate, req); 1530 sem_wait(&channel->sem); 1531 free_fs_request(req); 1532 1533 return 0; 1534 } 1535 1536 static void 1537 __rw_done(void *ctx, int bserrno) 1538 { 1539 struct spdk_fs_request *req = ctx; 1540 struct spdk_fs_cb_args *args = &req->args; 1541 1542 spdk_dma_free(args->op.rw.pin_buf); 1543 args->fn.file_op(args->arg, bserrno); 1544 free_fs_request(req); 1545 } 1546 1547 static void 1548 __read_done(void *ctx, int bserrno) 1549 { 1550 struct spdk_fs_request *req = ctx; 1551 struct spdk_fs_cb_args *args = &req->args; 1552 1553 assert(req != NULL); 1554 if (args->op.rw.is_read) { 1555 memcpy(args->op.rw.user_buf, 1556 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1557 args->op.rw.length); 1558 __rw_done(req, 0); 1559 } else { 1560 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1561 args->op.rw.user_buf, 1562 args->op.rw.length); 1563 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1564 args->op.rw.pin_buf, 1565 args->op.rw.start_page, args->op.rw.num_pages, 1566 __rw_done, req); 1567 } 1568 } 1569 1570 static void 1571 __do_blob_read(void *ctx, int fserrno) 1572 { 1573 struct spdk_fs_request *req = ctx; 1574 struct spdk_fs_cb_args *args = &req->args; 1575 1576 if (fserrno) { 1577 __rw_done(req, fserrno); 1578 return; 1579 } 1580 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1581 args->op.rw.pin_buf, 1582 args->op.rw.start_page, args->op.rw.num_pages, 1583 __read_done, req); 1584 } 1585 1586 static void 1587 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1588 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1589 { 1590 uint64_t end_page; 1591 1592 *page_size = spdk_bs_get_page_size(file->fs->bs); 1593 *start_page = offset / *page_size; 1594 end_page = (offset + length - 1) / *page_size; 1595 *num_pages = (end_page - *start_page + 1); 1596 } 1597 1598 static void 1599 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1600 void *payload, uint64_t offset, uint64_t length, 1601 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1602 { 1603 struct spdk_fs_request *req; 1604 struct spdk_fs_cb_args *args; 1605 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1606 uint64_t start_page, num_pages, pin_buf_length; 1607 uint32_t page_size; 1608 1609 if (is_read && offset + length > file->length) { 1610 cb_fn(cb_arg, -EINVAL); 1611 return; 1612 } 1613 1614 req = alloc_fs_request(channel); 1615 if (req == NULL) { 1616 cb_fn(cb_arg, -ENOMEM); 1617 return; 1618 } 1619 1620 args = &req->args; 1621 args->fn.file_op = cb_fn; 1622 args->arg = cb_arg; 1623 args->file = file; 1624 args->op.rw.channel = channel->bs_channel; 1625 args->op.rw.user_buf = payload; 1626 args->op.rw.is_read = is_read; 1627 args->op.rw.offset = offset; 1628 args->op.rw.length = length; 1629 1630 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1631 pin_buf_length = num_pages * page_size; 1632 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1633 if (args->op.rw.pin_buf == NULL) { 1634 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1635 file->name, offset, length); 1636 free_fs_request(req); 1637 cb_fn(cb_arg, -ENOMEM); 1638 return; 1639 } 1640 1641 args->op.rw.start_page = start_page; 1642 args->op.rw.num_pages = num_pages; 1643 1644 if (!is_read && file->length < offset + length) { 1645 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1646 } else { 1647 __do_blob_read(req, 0); 1648 } 1649 } 1650 1651 void 1652 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1653 void *payload, uint64_t offset, uint64_t length, 1654 spdk_file_op_complete cb_fn, void *cb_arg) 1655 { 1656 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1657 } 1658 1659 void 1660 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1661 void *payload, uint64_t offset, uint64_t length, 1662 spdk_file_op_complete cb_fn, void *cb_arg) 1663 { 1664 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1665 file->name, offset, length); 1666 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1667 } 1668 1669 struct spdk_io_channel * 1670 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1671 { 1672 struct spdk_io_channel *io_channel; 1673 struct spdk_fs_channel *fs_channel; 1674 1675 io_channel = spdk_get_io_channel(&fs->io_target); 1676 fs_channel = spdk_io_channel_get_ctx(io_channel); 1677 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1678 fs_channel->send_request = __send_request_direct; 1679 1680 return io_channel; 1681 } 1682 1683 struct spdk_io_channel * 1684 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1685 { 1686 struct spdk_io_channel *io_channel; 1687 struct spdk_fs_channel *fs_channel; 1688 1689 io_channel = spdk_get_io_channel(&fs->io_target); 1690 fs_channel = spdk_io_channel_get_ctx(io_channel); 1691 fs_channel->send_request = fs->send_request; 1692 fs_channel->sync = 1; 1693 pthread_spin_init(&fs_channel->lock, 0); 1694 1695 return io_channel; 1696 } 1697 1698 void 1699 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1700 { 1701 spdk_put_io_channel(channel); 1702 } 1703 1704 void 1705 spdk_fs_set_cache_size(uint64_t size_in_mb) 1706 { 1707 g_fs_cache_size = size_in_mb * 1024 * 1024; 1708 } 1709 1710 uint64_t 1711 spdk_fs_get_cache_size(void) 1712 { 1713 return g_fs_cache_size / (1024 * 1024); 1714 } 1715 1716 static void __file_flush(void *_args); 1717 1718 static void * 1719 alloc_cache_memory_buffer(struct spdk_file *context) 1720 { 1721 struct spdk_file *file; 1722 void *buf; 1723 1724 buf = spdk_mempool_get(g_cache_pool); 1725 if (buf != NULL) { 1726 return buf; 1727 } 1728 1729 pthread_spin_lock(&g_caches_lock); 1730 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1731 if (!file->open_for_writing && 1732 file->priority == SPDK_FILE_PRIORITY_LOW && 1733 file != context) { 1734 break; 1735 } 1736 } 1737 pthread_spin_unlock(&g_caches_lock); 1738 if (file != NULL) { 1739 cache_free_buffers(file); 1740 buf = spdk_mempool_get(g_cache_pool); 1741 if (buf != NULL) { 1742 return buf; 1743 } 1744 } 1745 1746 pthread_spin_lock(&g_caches_lock); 1747 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1748 if (!file->open_for_writing && file != context) { 1749 break; 1750 } 1751 } 1752 pthread_spin_unlock(&g_caches_lock); 1753 if (file != NULL) { 1754 cache_free_buffers(file); 1755 buf = spdk_mempool_get(g_cache_pool); 1756 if (buf != NULL) { 1757 return buf; 1758 } 1759 } 1760 1761 pthread_spin_lock(&g_caches_lock); 1762 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1763 if (file != context) { 1764 break; 1765 } 1766 } 1767 pthread_spin_unlock(&g_caches_lock); 1768 if (file != NULL) { 1769 cache_free_buffers(file); 1770 buf = spdk_mempool_get(g_cache_pool); 1771 if (buf != NULL) { 1772 return buf; 1773 } 1774 } 1775 1776 return NULL; 1777 } 1778 1779 static struct cache_buffer * 1780 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1781 { 1782 struct cache_buffer *buf; 1783 int count = 0; 1784 1785 buf = calloc(1, sizeof(*buf)); 1786 if (buf == NULL) { 1787 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1788 return NULL; 1789 } 1790 1791 buf->buf = alloc_cache_memory_buffer(file); 1792 while (buf->buf == NULL) { 1793 /* 1794 * TODO: alloc_cache_memory_buffer() should eventually free 1795 * some buffers. Need a more sophisticated check here, instead 1796 * of just bailing if 100 tries does not result in getting a 1797 * free buffer. This will involve using the sync channel's 1798 * semaphore to block until a buffer becomes available. 1799 */ 1800 if (count++ == 100) { 1801 SPDK_ERRLOG("could not allocate cache buffer\n"); 1802 assert(false); 1803 free(buf); 1804 return NULL; 1805 } 1806 buf->buf = alloc_cache_memory_buffer(file); 1807 } 1808 1809 buf->buf_size = CACHE_BUFFER_SIZE; 1810 buf->offset = offset; 1811 1812 pthread_spin_lock(&g_caches_lock); 1813 if (file->tree->present_mask == 0) { 1814 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1815 } 1816 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1817 pthread_spin_unlock(&g_caches_lock); 1818 1819 return buf; 1820 } 1821 1822 static struct cache_buffer * 1823 cache_append_buffer(struct spdk_file *file) 1824 { 1825 struct cache_buffer *last; 1826 1827 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1828 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1829 1830 last = cache_insert_buffer(file, file->append_pos); 1831 if (last == NULL) { 1832 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1833 return NULL; 1834 } 1835 1836 file->last = last; 1837 1838 return last; 1839 } 1840 1841 static void 1842 __wake_caller(struct spdk_fs_cb_args *args) 1843 { 1844 sem_post(args->sem); 1845 } 1846 1847 static void __check_sync_reqs(struct spdk_file *file); 1848 1849 static void 1850 __file_cache_finish_sync(struct spdk_file *file) 1851 { 1852 struct spdk_fs_request *sync_req; 1853 struct spdk_fs_cb_args *sync_args; 1854 1855 pthread_spin_lock(&file->lock); 1856 sync_req = TAILQ_FIRST(&file->sync_requests); 1857 sync_args = &sync_req->args; 1858 assert(sync_args->op.sync.offset <= file->length_flushed); 1859 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1860 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1861 pthread_spin_unlock(&file->lock); 1862 1863 sync_args->fn.file_op(sync_args->arg, 0); 1864 __check_sync_reqs(file); 1865 1866 pthread_spin_lock(&file->lock); 1867 free_fs_request(sync_req); 1868 pthread_spin_unlock(&file->lock); 1869 } 1870 1871 static void 1872 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1873 { 1874 struct spdk_file *file = ctx; 1875 1876 __file_cache_finish_sync(file); 1877 } 1878 1879 static void 1880 __free_args(struct spdk_fs_cb_args *args) 1881 { 1882 struct spdk_fs_request *req; 1883 1884 if (!args->from_request) { 1885 free(args); 1886 } else { 1887 /* Depends on args being at the start of the spdk_fs_request structure. */ 1888 req = (struct spdk_fs_request *)args; 1889 free_fs_request(req); 1890 } 1891 } 1892 1893 static void 1894 __check_sync_reqs(struct spdk_file *file) 1895 { 1896 struct spdk_fs_request *sync_req; 1897 1898 pthread_spin_lock(&file->lock); 1899 1900 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1901 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1902 break; 1903 } 1904 } 1905 1906 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1907 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1908 sync_req->args.op.sync.xattr_in_progress = true; 1909 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1910 sizeof(file->length_flushed)); 1911 1912 pthread_spin_unlock(&file->lock); 1913 spdk_blob_sync_md(file->blob, __file_cache_finish_sync_bs_cb, file); 1914 } else { 1915 pthread_spin_unlock(&file->lock); 1916 } 1917 } 1918 1919 static void 1920 __file_flush_done(void *arg, int bserrno) 1921 { 1922 struct spdk_fs_cb_args *args = arg; 1923 struct spdk_file *file = args->file; 1924 struct cache_buffer *next = args->op.flush.cache_buffer; 1925 1926 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1927 1928 pthread_spin_lock(&file->lock); 1929 next->in_progress = false; 1930 next->bytes_flushed += args->op.flush.length; 1931 file->length_flushed += args->op.flush.length; 1932 if (file->length_flushed > file->length) { 1933 file->length = file->length_flushed; 1934 } 1935 if (next->bytes_flushed == next->buf_size) { 1936 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1937 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1938 } 1939 1940 /* 1941 * Assert that there is no cached data that extends past the end of the underlying 1942 * blob. 1943 */ 1944 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1945 next->bytes_filled == 0); 1946 1947 pthread_spin_unlock(&file->lock); 1948 1949 __check_sync_reqs(file); 1950 1951 __file_flush(args); 1952 } 1953 1954 static void 1955 __file_flush(void *_args) 1956 { 1957 struct spdk_fs_cb_args *args = _args; 1958 struct spdk_file *file = args->file; 1959 struct cache_buffer *next; 1960 uint64_t offset, length, start_page, num_pages; 1961 uint32_t page_size; 1962 1963 pthread_spin_lock(&file->lock); 1964 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1965 if (next == NULL || next->in_progress) { 1966 /* 1967 * There is either no data to flush, or a flush I/O is already in 1968 * progress. So return immediately - if a flush I/O is in 1969 * progress we will flush more data after that is completed. 1970 */ 1971 __free_args(args); 1972 if (next == NULL) { 1973 /* 1974 * For cases where a file's cache was evicted, and then the 1975 * file was later appended, we will write the data directly 1976 * to disk and bypass cache. So just update length_flushed 1977 * here to reflect that all data was already written to disk. 1978 */ 1979 file->length_flushed = file->append_pos; 1980 } 1981 pthread_spin_unlock(&file->lock); 1982 if (next == NULL) { 1983 /* 1984 * There is no data to flush, but we still need to check for any 1985 * outstanding sync requests to make sure metadata gets updated. 1986 */ 1987 __check_sync_reqs(file); 1988 } 1989 return; 1990 } 1991 1992 offset = next->offset + next->bytes_flushed; 1993 length = next->bytes_filled - next->bytes_flushed; 1994 if (length == 0) { 1995 __free_args(args); 1996 pthread_spin_unlock(&file->lock); 1997 return; 1998 } 1999 args->op.flush.length = length; 2000 args->op.flush.cache_buffer = next; 2001 2002 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2003 2004 next->in_progress = true; 2005 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2006 offset, length, start_page, num_pages); 2007 pthread_spin_unlock(&file->lock); 2008 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2009 next->buf + (start_page * page_size) - next->offset, 2010 start_page, num_pages, __file_flush_done, args); 2011 } 2012 2013 static void 2014 __file_extend_done(void *arg, int bserrno) 2015 { 2016 struct spdk_fs_cb_args *args = arg; 2017 2018 __wake_caller(args); 2019 } 2020 2021 static void 2022 __file_extend_resize_cb(void *_args, int bserrno) 2023 { 2024 struct spdk_fs_cb_args *args = _args; 2025 struct spdk_file *file = args->file; 2026 2027 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2028 } 2029 2030 static void 2031 __file_extend_blob(void *_args) 2032 { 2033 struct spdk_fs_cb_args *args = _args; 2034 struct spdk_file *file = args->file; 2035 2036 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2037 } 2038 2039 static void 2040 __rw_from_file_done(void *arg, int bserrno) 2041 { 2042 struct spdk_fs_cb_args *args = arg; 2043 2044 __wake_caller(args); 2045 __free_args(args); 2046 } 2047 2048 static void 2049 __rw_from_file(void *_args) 2050 { 2051 struct spdk_fs_cb_args *args = _args; 2052 struct spdk_file *file = args->file; 2053 2054 if (args->op.rw.is_read) { 2055 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2056 args->op.rw.offset, args->op.rw.length, 2057 __rw_from_file_done, args); 2058 } else { 2059 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2060 args->op.rw.offset, args->op.rw.length, 2061 __rw_from_file_done, args); 2062 } 2063 } 2064 2065 static int 2066 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 2067 uint64_t offset, uint64_t length, bool is_read) 2068 { 2069 struct spdk_fs_cb_args *args; 2070 2071 args = calloc(1, sizeof(*args)); 2072 if (args == NULL) { 2073 sem_post(sem); 2074 return -ENOMEM; 2075 } 2076 2077 args->file = file; 2078 args->sem = sem; 2079 args->op.rw.user_buf = payload; 2080 args->op.rw.offset = offset; 2081 args->op.rw.length = length; 2082 args->op.rw.is_read = is_read; 2083 file->fs->send_request(__rw_from_file, args); 2084 return 0; 2085 } 2086 2087 int 2088 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 2089 void *payload, uint64_t offset, uint64_t length) 2090 { 2091 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2092 struct spdk_fs_cb_args *args; 2093 uint64_t rem_length, copy, blob_size, cluster_sz; 2094 uint32_t cache_buffers_filled = 0; 2095 uint8_t *cur_payload; 2096 struct cache_buffer *last; 2097 2098 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2099 2100 if (length == 0) { 2101 return 0; 2102 } 2103 2104 if (offset != file->append_pos) { 2105 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2106 return -EINVAL; 2107 } 2108 2109 pthread_spin_lock(&file->lock); 2110 file->open_for_writing = true; 2111 2112 if (file->last == NULL) { 2113 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2114 cache_append_buffer(file); 2115 } else { 2116 int rc; 2117 2118 file->append_pos += length; 2119 pthread_spin_unlock(&file->lock); 2120 rc = __send_rw_from_file(file, &channel->sem, payload, 2121 offset, length, false); 2122 sem_wait(&channel->sem); 2123 return rc; 2124 } 2125 } 2126 2127 blob_size = __file_get_blob_size(file); 2128 2129 if ((offset + length) > blob_size) { 2130 struct spdk_fs_cb_args extend_args = {}; 2131 2132 cluster_sz = file->fs->bs_opts.cluster_sz; 2133 extend_args.sem = &channel->sem; 2134 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2135 extend_args.file = file; 2136 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2137 pthread_spin_unlock(&file->lock); 2138 file->fs->send_request(__file_extend_blob, &extend_args); 2139 sem_wait(&channel->sem); 2140 } 2141 2142 last = file->last; 2143 rem_length = length; 2144 cur_payload = payload; 2145 while (rem_length > 0) { 2146 copy = last->buf_size - last->bytes_filled; 2147 if (copy > rem_length) { 2148 copy = rem_length; 2149 } 2150 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2151 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2152 file->append_pos += copy; 2153 if (file->length < file->append_pos) { 2154 file->length = file->append_pos; 2155 } 2156 cur_payload += copy; 2157 last->bytes_filled += copy; 2158 rem_length -= copy; 2159 if (last->bytes_filled == last->buf_size) { 2160 cache_buffers_filled++; 2161 last = cache_append_buffer(file); 2162 if (last == NULL) { 2163 BLOBFS_TRACE(file, "nomem\n"); 2164 pthread_spin_unlock(&file->lock); 2165 return -ENOMEM; 2166 } 2167 } 2168 } 2169 2170 pthread_spin_unlock(&file->lock); 2171 2172 if (cache_buffers_filled == 0) { 2173 return 0; 2174 } 2175 2176 args = calloc(1, sizeof(*args)); 2177 if (args == NULL) { 2178 return -ENOMEM; 2179 } 2180 2181 args->file = file; 2182 file->fs->send_request(__file_flush, args); 2183 return 0; 2184 } 2185 2186 static void 2187 __readahead_done(void *arg, int bserrno) 2188 { 2189 struct spdk_fs_cb_args *args = arg; 2190 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2191 struct spdk_file *file = args->file; 2192 2193 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2194 2195 pthread_spin_lock(&file->lock); 2196 cache_buffer->bytes_filled = args->op.readahead.length; 2197 cache_buffer->bytes_flushed = args->op.readahead.length; 2198 cache_buffer->in_progress = false; 2199 pthread_spin_unlock(&file->lock); 2200 2201 __free_args(args); 2202 } 2203 2204 static void 2205 __readahead(void *_args) 2206 { 2207 struct spdk_fs_cb_args *args = _args; 2208 struct spdk_file *file = args->file; 2209 uint64_t offset, length, start_page, num_pages; 2210 uint32_t page_size; 2211 2212 offset = args->op.readahead.offset; 2213 length = args->op.readahead.length; 2214 assert(length > 0); 2215 2216 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2217 2218 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2219 offset, length, start_page, num_pages); 2220 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2221 args->op.readahead.cache_buffer->buf, 2222 start_page, num_pages, __readahead_done, args); 2223 } 2224 2225 static uint64_t 2226 __next_cache_buffer_offset(uint64_t offset) 2227 { 2228 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2229 } 2230 2231 static void 2232 check_readahead(struct spdk_file *file, uint64_t offset) 2233 { 2234 struct spdk_fs_cb_args *args; 2235 2236 offset = __next_cache_buffer_offset(offset); 2237 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2238 return; 2239 } 2240 2241 args = calloc(1, sizeof(*args)); 2242 if (args == NULL) { 2243 return; 2244 } 2245 2246 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2247 2248 args->file = file; 2249 args->op.readahead.offset = offset; 2250 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2251 if (!args->op.readahead.cache_buffer) { 2252 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2253 free(args); 2254 return; 2255 } 2256 2257 args->op.readahead.cache_buffer->in_progress = true; 2258 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2259 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2260 } else { 2261 args->op.readahead.length = CACHE_BUFFER_SIZE; 2262 } 2263 file->fs->send_request(__readahead, args); 2264 } 2265 2266 static int 2267 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2268 { 2269 struct cache_buffer *buf; 2270 int rc; 2271 2272 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2273 if (buf == NULL) { 2274 pthread_spin_unlock(&file->lock); 2275 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2276 pthread_spin_lock(&file->lock); 2277 return rc; 2278 } 2279 2280 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2281 length = buf->offset + buf->bytes_filled - offset; 2282 } 2283 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2284 memcpy(payload, &buf->buf[offset - buf->offset], length); 2285 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2286 pthread_spin_lock(&g_caches_lock); 2287 spdk_tree_remove_buffer(file->tree, buf); 2288 if (file->tree->present_mask == 0) { 2289 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2290 } 2291 pthread_spin_unlock(&g_caches_lock); 2292 } 2293 2294 sem_post(sem); 2295 return 0; 2296 } 2297 2298 int64_t 2299 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2300 void *payload, uint64_t offset, uint64_t length) 2301 { 2302 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2303 uint64_t final_offset, final_length; 2304 uint32_t sub_reads = 0; 2305 int rc = 0; 2306 2307 pthread_spin_lock(&file->lock); 2308 2309 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2310 2311 file->open_for_writing = false; 2312 2313 if (length == 0 || offset >= file->append_pos) { 2314 pthread_spin_unlock(&file->lock); 2315 return 0; 2316 } 2317 2318 if (offset + length > file->append_pos) { 2319 length = file->append_pos - offset; 2320 } 2321 2322 if (offset != file->next_seq_offset) { 2323 file->seq_byte_count = 0; 2324 } 2325 file->seq_byte_count += length; 2326 file->next_seq_offset = offset + length; 2327 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2328 check_readahead(file, offset); 2329 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2330 } 2331 2332 final_length = 0; 2333 final_offset = offset + length; 2334 while (offset < final_offset) { 2335 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2336 if (length > (final_offset - offset)) { 2337 length = final_offset - offset; 2338 } 2339 rc = __file_read(file, payload, offset, length, &channel->sem); 2340 if (rc == 0) { 2341 final_length += length; 2342 } else { 2343 break; 2344 } 2345 payload += length; 2346 offset += length; 2347 sub_reads++; 2348 } 2349 pthread_spin_unlock(&file->lock); 2350 while (sub_reads-- > 0) { 2351 sem_wait(&channel->sem); 2352 } 2353 if (rc == 0) { 2354 return final_length; 2355 } else { 2356 return rc; 2357 } 2358 } 2359 2360 static void 2361 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2362 spdk_file_op_complete cb_fn, void *cb_arg) 2363 { 2364 struct spdk_fs_request *sync_req; 2365 struct spdk_fs_request *flush_req; 2366 struct spdk_fs_cb_args *sync_args; 2367 struct spdk_fs_cb_args *flush_args; 2368 2369 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2370 2371 pthread_spin_lock(&file->lock); 2372 if (file->append_pos <= file->length_flushed) { 2373 BLOBFS_TRACE(file, "done - no data to flush\n"); 2374 pthread_spin_unlock(&file->lock); 2375 cb_fn(cb_arg, 0); 2376 return; 2377 } 2378 2379 sync_req = alloc_fs_request(channel); 2380 if (!sync_req) { 2381 pthread_spin_unlock(&file->lock); 2382 cb_fn(cb_arg, -ENOMEM); 2383 return; 2384 } 2385 sync_args = &sync_req->args; 2386 2387 flush_req = alloc_fs_request(channel); 2388 if (!flush_req) { 2389 pthread_spin_unlock(&file->lock); 2390 cb_fn(cb_arg, -ENOMEM); 2391 return; 2392 } 2393 flush_args = &flush_req->args; 2394 2395 sync_args->file = file; 2396 sync_args->fn.file_op = cb_fn; 2397 sync_args->arg = cb_arg; 2398 sync_args->op.sync.offset = file->append_pos; 2399 sync_args->op.sync.xattr_in_progress = false; 2400 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2401 pthread_spin_unlock(&file->lock); 2402 2403 flush_args->file = file; 2404 channel->send_request(__file_flush, flush_args); 2405 } 2406 2407 int 2408 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2409 { 2410 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2411 2412 _file_sync(file, channel, __sem_post, &channel->sem); 2413 sem_wait(&channel->sem); 2414 2415 return 0; 2416 } 2417 2418 void 2419 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2420 spdk_file_op_complete cb_fn, void *cb_arg) 2421 { 2422 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2423 2424 _file_sync(file, channel, cb_fn, cb_arg); 2425 } 2426 2427 void 2428 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2429 { 2430 BLOBFS_TRACE(file, "priority=%u\n", priority); 2431 file->priority = priority; 2432 2433 } 2434 2435 /* 2436 * Close routines 2437 */ 2438 2439 static void 2440 __file_close_async_done(void *ctx, int bserrno) 2441 { 2442 struct spdk_fs_request *req = ctx; 2443 struct spdk_fs_cb_args *args = &req->args; 2444 struct spdk_file *file = args->file; 2445 2446 if (file->is_deleted) { 2447 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2448 return; 2449 } 2450 2451 args->fn.file_op(args->arg, bserrno); 2452 free_fs_request(req); 2453 } 2454 2455 static void 2456 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2457 { 2458 struct spdk_blob *blob; 2459 2460 pthread_spin_lock(&file->lock); 2461 if (file->ref_count == 0) { 2462 pthread_spin_unlock(&file->lock); 2463 __file_close_async_done(req, -EBADF); 2464 return; 2465 } 2466 2467 file->ref_count--; 2468 if (file->ref_count > 0) { 2469 pthread_spin_unlock(&file->lock); 2470 req->args.fn.file_op(req->args.arg, 0); 2471 free_fs_request(req); 2472 return; 2473 } 2474 2475 pthread_spin_unlock(&file->lock); 2476 2477 blob = file->blob; 2478 file->blob = NULL; 2479 spdk_blob_close(blob, __file_close_async_done, req); 2480 } 2481 2482 static void 2483 __file_close_async__sync_done(void *arg, int fserrno) 2484 { 2485 struct spdk_fs_request *req = arg; 2486 struct spdk_fs_cb_args *args = &req->args; 2487 2488 __file_close_async(args->file, req); 2489 } 2490 2491 void 2492 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2493 { 2494 struct spdk_fs_request *req; 2495 struct spdk_fs_cb_args *args; 2496 2497 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2498 if (req == NULL) { 2499 cb_fn(cb_arg, -ENOMEM); 2500 return; 2501 } 2502 2503 args = &req->args; 2504 args->file = file; 2505 args->fn.file_op = cb_fn; 2506 args->arg = cb_arg; 2507 2508 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2509 } 2510 2511 static void 2512 __file_close_done(void *arg, int fserrno) 2513 { 2514 struct spdk_fs_cb_args *args = arg; 2515 2516 args->rc = fserrno; 2517 sem_post(args->sem); 2518 } 2519 2520 static void 2521 __file_close(void *arg) 2522 { 2523 struct spdk_fs_request *req = arg; 2524 struct spdk_fs_cb_args *args = &req->args; 2525 struct spdk_file *file = args->file; 2526 2527 __file_close_async(file, req); 2528 } 2529 2530 int 2531 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2532 { 2533 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2534 struct spdk_fs_request *req; 2535 struct spdk_fs_cb_args *args; 2536 2537 req = alloc_fs_request(channel); 2538 if (req == NULL) { 2539 return -ENOMEM; 2540 } 2541 2542 args = &req->args; 2543 2544 spdk_file_sync(file, _channel); 2545 BLOBFS_TRACE(file, "name=%s\n", file->name); 2546 args->file = file; 2547 args->sem = &channel->sem; 2548 args->fn.file_op = __file_close_done; 2549 args->arg = req; 2550 channel->send_request(__file_close, req); 2551 sem_wait(&channel->sem); 2552 2553 return args->rc; 2554 } 2555 2556 static void 2557 cache_free_buffers(struct spdk_file *file) 2558 { 2559 BLOBFS_TRACE(file, "free=%s\n", file->name); 2560 pthread_spin_lock(&file->lock); 2561 pthread_spin_lock(&g_caches_lock); 2562 if (file->tree->present_mask == 0) { 2563 pthread_spin_unlock(&g_caches_lock); 2564 pthread_spin_unlock(&file->lock); 2565 return; 2566 } 2567 spdk_tree_free_buffers(file->tree); 2568 2569 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2570 /* If not freed, put it in the end of the queue */ 2571 if (file->tree->present_mask != 0) { 2572 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2573 } 2574 file->last = NULL; 2575 pthread_spin_unlock(&g_caches_lock); 2576 pthread_spin_unlock(&file->lock); 2577 } 2578 2579 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2580 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2581