1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) 
\ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 void 64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 65 { 66 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 67 free(cache_buffer); 68 } 69 70 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 71 72 struct spdk_file { 73 struct spdk_filesystem *fs; 74 struct spdk_blob *blob; 75 char *name; 76 uint64_t length; 77 bool is_deleted; 78 bool open_for_writing; 79 uint64_t length_flushed; 80 uint64_t append_pos; 81 uint64_t seq_byte_count; 82 uint64_t next_seq_offset; 83 uint32_t priority; 84 TAILQ_ENTRY(spdk_file) tailq; 85 spdk_blob_id blobid; 86 uint32_t ref_count; 87 pthread_spinlock_t lock; 88 struct cache_buffer *last; 89 struct cache_tree *tree; 90 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 91 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 92 TAILQ_ENTRY(spdk_file) cache_tailq; 93 }; 94 95 struct spdk_deleted_file { 96 spdk_blob_id id; 97 TAILQ_ENTRY(spdk_deleted_file) tailq; 98 }; 99 100 struct spdk_filesystem { 101 struct spdk_blob_store *bs; 102 TAILQ_HEAD(, spdk_file) files; 103 struct spdk_bs_opts bs_opts; 104 struct spdk_bs_dev *bdev; 105 fs_send_request_fn send_request; 106 107 struct { 108 uint32_t max_ops; 109 struct spdk_io_channel *sync_io_channel; 110 struct spdk_fs_channel *sync_fs_channel; 111 } sync_target; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel 
*md_io_channel; 116 struct spdk_fs_channel *md_fs_channel; 117 } md_target; 118 119 struct { 120 uint32_t max_ops; 121 } io_target; 122 }; 123 124 struct spdk_fs_cb_args { 125 union { 126 spdk_fs_op_with_handle_complete fs_op_with_handle; 127 spdk_fs_op_complete fs_op; 128 spdk_file_op_with_handle_complete file_op_with_handle; 129 spdk_file_op_complete file_op; 130 spdk_file_stat_op_complete stat_op; 131 } fn; 132 void *arg; 133 sem_t *sem; 134 struct spdk_filesystem *fs; 135 struct spdk_file *file; 136 int rc; 137 bool from_request; 138 union { 139 struct { 140 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 141 } fs_load; 142 struct { 143 uint64_t length; 144 } truncate; 145 struct { 146 struct spdk_io_channel *channel; 147 void *user_buf; 148 void *pin_buf; 149 int is_read; 150 off_t offset; 151 size_t length; 152 uint64_t start_lba; 153 uint64_t num_lba; 154 uint32_t blocklen; 155 } rw; 156 struct { 157 const char *old_name; 158 const char *new_name; 159 } rename; 160 struct { 161 struct cache_buffer *cache_buffer; 162 uint64_t length; 163 } flush; 164 struct { 165 struct cache_buffer *cache_buffer; 166 uint64_t length; 167 uint64_t offset; 168 } readahead; 169 struct { 170 uint64_t offset; 171 TAILQ_ENTRY(spdk_fs_request) tailq; 172 bool xattr_in_progress; 173 } sync; 174 struct { 175 uint32_t num_clusters; 176 } resize; 177 struct { 178 const char *name; 179 uint32_t flags; 180 TAILQ_ENTRY(spdk_fs_request) tailq; 181 } open; 182 struct { 183 const char *name; 184 struct spdk_blob *blob; 185 } create; 186 struct { 187 const char *name; 188 } delete; 189 struct { 190 const char *name; 191 } stat; 192 } op; 193 }; 194 195 static void cache_free_buffers(struct spdk_file *file); 196 197 void 198 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 199 { 200 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 201 } 202 203 static void 204 __initialize_cache(void) 205 { 206 assert(g_cache_pool == NULL); 207 208 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 209 
g_fs_cache_size / CACHE_BUFFER_SIZE, 210 CACHE_BUFFER_SIZE, 211 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 212 SPDK_ENV_SOCKET_ID_ANY); 213 if (!g_cache_pool) { 214 SPDK_ERRLOG("Create mempool failed, you may " 215 "increase the memory and try again\n"); 216 assert(false); 217 } 218 TAILQ_INIT(&g_caches); 219 pthread_spin_init(&g_caches_lock, 0); 220 } 221 222 static void 223 __free_cache(void) 224 { 225 assert(g_cache_pool != NULL); 226 227 spdk_mempool_free(g_cache_pool); 228 g_cache_pool = NULL; 229 } 230 231 static uint64_t 232 __file_get_blob_size(struct spdk_file *file) 233 { 234 uint64_t cluster_sz; 235 236 cluster_sz = file->fs->bs_opts.cluster_sz; 237 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 238 } 239 240 struct spdk_fs_request { 241 struct spdk_fs_cb_args args; 242 TAILQ_ENTRY(spdk_fs_request) link; 243 struct spdk_fs_channel *channel; 244 }; 245 246 struct spdk_fs_channel { 247 struct spdk_fs_request *req_mem; 248 TAILQ_HEAD(, spdk_fs_request) reqs; 249 sem_t sem; 250 struct spdk_filesystem *fs; 251 struct spdk_io_channel *bs_channel; 252 fs_send_request_fn send_request; 253 bool sync; 254 pthread_spinlock_t lock; 255 }; 256 257 static struct spdk_fs_request * 258 alloc_fs_request(struct spdk_fs_channel *channel) 259 { 260 struct spdk_fs_request *req; 261 262 if (channel->sync) { 263 pthread_spin_lock(&channel->lock); 264 } 265 266 req = TAILQ_FIRST(&channel->reqs); 267 if (req) { 268 TAILQ_REMOVE(&channel->reqs, req, link); 269 } 270 271 if (channel->sync) { 272 pthread_spin_unlock(&channel->lock); 273 } 274 275 if (req == NULL) { 276 return NULL; 277 } 278 memset(req, 0, sizeof(*req)); 279 req->channel = channel; 280 req->args.from_request = true; 281 282 return req; 283 } 284 285 static void 286 free_fs_request(struct spdk_fs_request *req) 287 { 288 struct spdk_fs_channel *channel = req->channel; 289 290 if (channel->sync) { 291 pthread_spin_lock(&channel->lock); 292 } 293 294 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 295 296 if 
(channel->sync) { 297 pthread_spin_unlock(&channel->lock); 298 } 299 } 300 301 static int 302 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 303 uint32_t max_ops) 304 { 305 uint32_t i; 306 307 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 308 if (!channel->req_mem) { 309 return -1; 310 } 311 312 TAILQ_INIT(&channel->reqs); 313 sem_init(&channel->sem, 0, 0); 314 315 for (i = 0; i < max_ops; i++) { 316 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 317 } 318 319 channel->fs = fs; 320 321 return 0; 322 } 323 324 static int 325 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 326 { 327 struct spdk_filesystem *fs; 328 struct spdk_fs_channel *channel = ctx_buf; 329 330 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 331 332 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 333 } 334 335 static int 336 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 337 { 338 struct spdk_filesystem *fs; 339 struct spdk_fs_channel *channel = ctx_buf; 340 341 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 342 343 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 344 } 345 346 static int 347 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 348 { 349 struct spdk_filesystem *fs; 350 struct spdk_fs_channel *channel = ctx_buf; 351 352 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 353 354 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 355 } 356 357 static void 358 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 359 { 360 struct spdk_fs_channel *channel = ctx_buf; 361 362 free(channel->req_mem); 363 if (channel->bs_channel != NULL) { 364 spdk_bs_free_io_channel(channel->bs_channel); 365 } 366 } 367 368 static void 369 __send_request_direct(fs_request_fn fn, void *arg) 370 { 371 fn(arg); 372 } 373 374 static void 375 common_fs_bs_init(struct spdk_filesystem 
*fs, struct spdk_blob_store *bs) 376 { 377 fs->bs = bs; 378 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 379 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 380 fs->md_target.md_fs_channel->send_request = __send_request_direct; 381 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 382 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 383 384 pthread_mutex_lock(&g_cache_init_lock); 385 if (g_fs_count == 0) { 386 __initialize_cache(); 387 } 388 g_fs_count++; 389 pthread_mutex_unlock(&g_cache_init_lock); 390 } 391 392 static void 393 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 394 { 395 struct spdk_fs_request *req = ctx; 396 struct spdk_fs_cb_args *args = &req->args; 397 struct spdk_filesystem *fs = args->fs; 398 399 if (bserrno == 0) { 400 common_fs_bs_init(fs, bs); 401 } else { 402 free(fs); 403 fs = NULL; 404 } 405 406 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 407 free_fs_request(req); 408 } 409 410 static void 411 fs_conf_parse(void) 412 { 413 struct spdk_conf_section *sp; 414 415 sp = spdk_conf_find_section(NULL, "Blobfs"); 416 if (sp == NULL) { 417 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 418 return; 419 } 420 421 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 422 if (g_fs_cache_buffer_shift <= 0) { 423 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 424 } 425 } 426 427 static struct spdk_filesystem * 428 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 429 { 430 struct spdk_filesystem *fs; 431 432 fs = calloc(1, sizeof(*fs)); 433 if (fs == NULL) { 434 return NULL; 435 } 436 437 fs->bdev = dev; 438 fs->send_request = send_request_fn; 439 TAILQ_INIT(&fs->files); 440 441 fs->md_target.max_ops = 512; 442 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 443 sizeof(struct spdk_fs_channel), "blobfs_md"); 444 fs->md_target.md_io_channel = 
spdk_get_io_channel(&fs->md_target); 445 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 446 447 fs->sync_target.max_ops = 512; 448 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 449 sizeof(struct spdk_fs_channel), "blobfs_sync"); 450 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 451 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 452 453 fs->io_target.max_ops = 512; 454 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 455 sizeof(struct spdk_fs_channel), "blobfs_io"); 456 457 return fs; 458 } 459 460 static void 461 __wake_caller(void *arg, int fserrno) 462 { 463 struct spdk_fs_cb_args *args = arg; 464 465 args->rc = fserrno; 466 sem_post(args->sem); 467 } 468 469 void 470 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 471 fs_send_request_fn send_request_fn, 472 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 473 { 474 struct spdk_filesystem *fs; 475 struct spdk_fs_request *req; 476 struct spdk_fs_cb_args *args; 477 struct spdk_bs_opts opts = {}; 478 479 fs = fs_alloc(dev, send_request_fn); 480 if (fs == NULL) { 481 cb_fn(cb_arg, NULL, -ENOMEM); 482 return; 483 } 484 485 fs_conf_parse(); 486 487 req = alloc_fs_request(fs->md_target.md_fs_channel); 488 if (req == NULL) { 489 spdk_put_io_channel(fs->md_target.md_io_channel); 490 spdk_io_device_unregister(&fs->md_target, NULL); 491 spdk_put_io_channel(fs->sync_target.sync_io_channel); 492 spdk_io_device_unregister(&fs->sync_target, NULL); 493 spdk_io_device_unregister(&fs->io_target, NULL); 494 free(fs); 495 cb_fn(cb_arg, NULL, -ENOMEM); 496 return; 497 } 498 499 args = &req->args; 500 args->fn.fs_op_with_handle = cb_fn; 501 args->arg = cb_arg; 502 args->fs = fs; 503 504 spdk_bs_opts_init(&opts); 505 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 506 if (opt) { 507 
opts.cluster_sz = opt->cluster_sz; 508 } 509 spdk_bs_init(dev, &opts, init_cb, req); 510 } 511 512 static struct spdk_file * 513 file_alloc(struct spdk_filesystem *fs) 514 { 515 struct spdk_file *file; 516 517 file = calloc(1, sizeof(*file)); 518 if (file == NULL) { 519 return NULL; 520 } 521 522 file->tree = calloc(1, sizeof(*file->tree)); 523 if (file->tree == NULL) { 524 free(file); 525 return NULL; 526 } 527 528 file->fs = fs; 529 TAILQ_INIT(&file->open_requests); 530 TAILQ_INIT(&file->sync_requests); 531 pthread_spin_init(&file->lock, 0); 532 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 533 file->priority = SPDK_FILE_PRIORITY_LOW; 534 return file; 535 } 536 537 static void fs_load_done(void *ctx, int bserrno); 538 539 static int 540 _handle_deleted_files(struct spdk_fs_request *req) 541 { 542 struct spdk_fs_cb_args *args = &req->args; 543 struct spdk_filesystem *fs = args->fs; 544 545 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 546 struct spdk_deleted_file *deleted_file; 547 548 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 549 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 550 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 551 free(deleted_file); 552 return 0; 553 } 554 555 return 1; 556 } 557 558 static void 559 fs_load_done(void *ctx, int bserrno) 560 { 561 struct spdk_fs_request *req = ctx; 562 struct spdk_fs_cb_args *args = &req->args; 563 struct spdk_filesystem *fs = args->fs; 564 565 /* The filesystem has been loaded. Now check if there are any files that 566 * were marked for deletion before last unload. Do not complete the 567 * fs_load callback until all of them have been deleted on disk. 568 */ 569 if (_handle_deleted_files(req) == 0) { 570 /* We found a file that's been marked for deleting but not actually 571 * deleted yet. This function will get called again once the delete 572 * operation is completed. 
573 */ 574 return; 575 } 576 577 args->fn.fs_op_with_handle(args->arg, fs, 0); 578 free_fs_request(req); 579 580 } 581 582 static void 583 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 584 { 585 struct spdk_fs_request *req = ctx; 586 struct spdk_fs_cb_args *args = &req->args; 587 struct spdk_filesystem *fs = args->fs; 588 uint64_t *length; 589 const char *name; 590 uint32_t *is_deleted; 591 size_t value_len; 592 593 if (rc < 0) { 594 args->fn.fs_op_with_handle(args->arg, fs, rc); 595 free_fs_request(req); 596 return; 597 } 598 599 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 600 if (rc < 0) { 601 args->fn.fs_op_with_handle(args->arg, fs, rc); 602 free_fs_request(req); 603 return; 604 } 605 606 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 607 if (rc < 0) { 608 args->fn.fs_op_with_handle(args->arg, fs, rc); 609 free_fs_request(req); 610 return; 611 } 612 613 assert(value_len == 8); 614 615 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 616 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 617 if (rc < 0) { 618 struct spdk_file *f; 619 620 f = file_alloc(fs); 621 if (f == NULL) { 622 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 623 free_fs_request(req); 624 return; 625 } 626 627 f->name = strdup(name); 628 f->blobid = spdk_blob_get_id(blob); 629 f->length = *length; 630 f->length_flushed = *length; 631 f->append_pos = *length; 632 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 633 } else { 634 struct spdk_deleted_file *deleted_file; 635 636 deleted_file = calloc(1, sizeof(*deleted_file)); 637 if (deleted_file == NULL) { 638 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 639 free_fs_request(req); 640 return; 641 } 642 deleted_file->id = spdk_blob_get_id(blob); 643 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 644 } 645 } 646 
647 static void 648 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 649 { 650 struct spdk_fs_request *req = ctx; 651 struct spdk_fs_cb_args *args = &req->args; 652 struct spdk_filesystem *fs = args->fs; 653 struct spdk_bs_type bstype; 654 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 655 static const struct spdk_bs_type zeros; 656 657 if (bserrno != 0) { 658 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 659 free_fs_request(req); 660 free(fs); 661 return; 662 } 663 664 bstype = spdk_bs_get_bstype(bs); 665 666 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 667 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 668 spdk_bs_set_bstype(bs, blobfs_type); 669 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 670 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 671 SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 672 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 673 free_fs_request(req); 674 free(fs); 675 return; 676 } 677 678 common_fs_bs_init(fs, bs); 679 fs_load_done(req, 0); 680 } 681 682 static void 683 spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 684 { 685 assert(fs != NULL); 686 spdk_io_device_unregister(&fs->md_target, NULL); 687 spdk_io_device_unregister(&fs->sync_target, NULL); 688 spdk_io_device_unregister(&fs->io_target, NULL); 689 free(fs); 690 } 691 692 static void 693 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 694 { 695 assert(fs != NULL); 696 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 697 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 698 } 699 700 void 701 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 702 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 703 { 704 struct spdk_filesystem *fs; 705 struct spdk_fs_cb_args *args; 706 struct spdk_fs_request *req; 707 struct spdk_bs_opts bs_opts; 708 709 fs = fs_alloc(dev, send_request_fn); 710 if (fs == NULL) { 711 cb_fn(cb_arg, NULL, -ENOMEM); 712 return; 713 } 714 715 
fs_conf_parse(); 716 717 req = alloc_fs_request(fs->md_target.md_fs_channel); 718 if (req == NULL) { 719 spdk_fs_free_io_channels(fs); 720 spdk_fs_io_device_unregister(fs); 721 cb_fn(cb_arg, NULL, -ENOMEM); 722 return; 723 } 724 725 args = &req->args; 726 args->fn.fs_op_with_handle = cb_fn; 727 args->arg = cb_arg; 728 args->fs = fs; 729 TAILQ_INIT(&args->op.fs_load.deleted_files); 730 spdk_bs_opts_init(&bs_opts); 731 bs_opts.iter_cb_fn = iter_cb; 732 bs_opts.iter_cb_arg = req; 733 spdk_bs_load(dev, &bs_opts, load_cb, req); 734 } 735 736 static void 737 unload_cb(void *ctx, int bserrno) 738 { 739 struct spdk_fs_request *req = ctx; 740 struct spdk_fs_cb_args *args = &req->args; 741 struct spdk_filesystem *fs = args->fs; 742 struct spdk_file *file, *tmp; 743 744 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 745 TAILQ_REMOVE(&fs->files, file, tailq); 746 cache_free_buffers(file); 747 free(file->name); 748 free(file->tree); 749 free(file); 750 } 751 752 pthread_mutex_lock(&g_cache_init_lock); 753 g_fs_count--; 754 if (g_fs_count == 0) { 755 __free_cache(); 756 } 757 pthread_mutex_unlock(&g_cache_init_lock); 758 759 args->fn.fs_op(args->arg, bserrno); 760 free(req); 761 762 spdk_fs_io_device_unregister(fs); 763 } 764 765 void 766 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 767 { 768 struct spdk_fs_request *req; 769 struct spdk_fs_cb_args *args; 770 771 /* 772 * We must free the md_channel before unloading the blobstore, so just 773 * allocate this request from the general heap. 
774 */ 775 req = calloc(1, sizeof(*req)); 776 if (req == NULL) { 777 cb_fn(cb_arg, -ENOMEM); 778 return; 779 } 780 781 args = &req->args; 782 args->fn.fs_op = cb_fn; 783 args->arg = cb_arg; 784 args->fs = fs; 785 786 spdk_fs_free_io_channels(fs); 787 spdk_bs_unload(fs->bs, unload_cb, req); 788 } 789 790 static struct spdk_file * 791 fs_find_file(struct spdk_filesystem *fs, const char *name) 792 { 793 struct spdk_file *file; 794 795 TAILQ_FOREACH(file, &fs->files, tailq) { 796 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 797 return file; 798 } 799 } 800 801 return NULL; 802 } 803 804 void 805 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 806 spdk_file_stat_op_complete cb_fn, void *cb_arg) 807 { 808 struct spdk_file_stat stat; 809 struct spdk_file *f = NULL; 810 811 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 812 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 813 return; 814 } 815 816 f = fs_find_file(fs, name); 817 if (f != NULL) { 818 stat.blobid = f->blobid; 819 stat.size = f->append_pos >= f->length ? 
f->append_pos : f->length; 820 cb_fn(cb_arg, &stat, 0); 821 return; 822 } 823 824 cb_fn(cb_arg, NULL, -ENOENT); 825 } 826 827 static void 828 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 829 { 830 struct spdk_fs_request *req = arg; 831 struct spdk_fs_cb_args *args = &req->args; 832 833 args->rc = fserrno; 834 if (fserrno == 0) { 835 memcpy(args->arg, stat, sizeof(*stat)); 836 } 837 sem_post(args->sem); 838 } 839 840 static void 841 __file_stat(void *arg) 842 { 843 struct spdk_fs_request *req = arg; 844 struct spdk_fs_cb_args *args = &req->args; 845 846 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 847 args->fn.stat_op, req); 848 } 849 850 int 851 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 852 const char *name, struct spdk_file_stat *stat) 853 { 854 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 855 struct spdk_fs_request *req; 856 int rc; 857 858 req = alloc_fs_request(channel); 859 if (req == NULL) { 860 return -ENOMEM; 861 } 862 863 req->args.fs = fs; 864 req->args.op.stat.name = name; 865 req->args.fn.stat_op = __copy_stat; 866 req->args.arg = stat; 867 req->args.sem = &channel->sem; 868 channel->send_request(__file_stat, req); 869 sem_wait(&channel->sem); 870 871 rc = req->args.rc; 872 free_fs_request(req); 873 874 return rc; 875 } 876 877 static void 878 fs_create_blob_close_cb(void *ctx, int bserrno) 879 { 880 int rc; 881 struct spdk_fs_request *req = ctx; 882 struct spdk_fs_cb_args *args = &req->args; 883 884 rc = args->rc ? 
args->rc : bserrno; 885 args->fn.file_op(args->arg, rc); 886 free_fs_request(req); 887 } 888 889 static void 890 fs_create_blob_resize_cb(void *ctx, int bserrno) 891 { 892 struct spdk_fs_request *req = ctx; 893 struct spdk_fs_cb_args *args = &req->args; 894 struct spdk_file *f = args->file; 895 struct spdk_blob *blob = args->op.create.blob; 896 uint64_t length = 0; 897 898 args->rc = bserrno; 899 if (bserrno) { 900 spdk_blob_close(blob, fs_create_blob_close_cb, args); 901 return; 902 } 903 904 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 905 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 906 907 spdk_blob_close(blob, fs_create_blob_close_cb, args); 908 } 909 910 static void 911 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 912 { 913 struct spdk_fs_request *req = ctx; 914 struct spdk_fs_cb_args *args = &req->args; 915 916 if (bserrno) { 917 args->fn.file_op(args->arg, bserrno); 918 free_fs_request(req); 919 return; 920 } 921 922 args->op.create.blob = blob; 923 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 924 } 925 926 static void 927 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 928 { 929 struct spdk_fs_request *req = ctx; 930 struct spdk_fs_cb_args *args = &req->args; 931 struct spdk_file *f = args->file; 932 933 if (bserrno) { 934 args->fn.file_op(args->arg, bserrno); 935 free_fs_request(req); 936 return; 937 } 938 939 f->blobid = blobid; 940 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 941 } 942 943 void 944 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 945 spdk_file_op_complete cb_fn, void *cb_arg) 946 { 947 struct spdk_file *file; 948 struct spdk_fs_request *req; 949 struct spdk_fs_cb_args *args; 950 951 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 952 cb_fn(cb_arg, -ENAMETOOLONG); 953 return; 954 } 955 956 file = fs_find_file(fs, name); 957 if (file != NULL) { 958 cb_fn(cb_arg, -EEXIST); 959 
return; 960 } 961 962 file = file_alloc(fs); 963 if (file == NULL) { 964 cb_fn(cb_arg, -ENOMEM); 965 return; 966 } 967 968 req = alloc_fs_request(fs->md_target.md_fs_channel); 969 if (req == NULL) { 970 cb_fn(cb_arg, -ENOMEM); 971 return; 972 } 973 974 args = &req->args; 975 args->file = file; 976 args->fn.file_op = cb_fn; 977 args->arg = cb_arg; 978 979 file->name = strdup(name); 980 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 981 } 982 983 static void 984 __fs_create_file_done(void *arg, int fserrno) 985 { 986 struct spdk_fs_request *req = arg; 987 struct spdk_fs_cb_args *args = &req->args; 988 989 args->rc = fserrno; 990 sem_post(args->sem); 991 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 992 } 993 994 static void 995 __fs_create_file(void *arg) 996 { 997 struct spdk_fs_request *req = arg; 998 struct spdk_fs_cb_args *args = &req->args; 999 1000 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1001 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1002 } 1003 1004 int 1005 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 1006 { 1007 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1008 struct spdk_fs_request *req; 1009 struct spdk_fs_cb_args *args; 1010 int rc; 1011 1012 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1013 1014 req = alloc_fs_request(channel); 1015 if (req == NULL) { 1016 return -ENOMEM; 1017 } 1018 1019 args = &req->args; 1020 args->fs = fs; 1021 args->op.create.name = name; 1022 args->sem = &channel->sem; 1023 fs->send_request(__fs_create_file, req); 1024 sem_wait(&channel->sem); 1025 rc = args->rc; 1026 free_fs_request(req); 1027 1028 return rc; 1029 } 1030 1031 static void 1032 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1033 { 1034 struct spdk_fs_request *req = ctx; 1035 struct spdk_fs_cb_args *args = &req->args; 1036 struct spdk_file *f = args->file; 
1037 1038 f->blob = blob; 1039 while (!TAILQ_EMPTY(&f->open_requests)) { 1040 req = TAILQ_FIRST(&f->open_requests); 1041 args = &req->args; 1042 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1043 args->fn.file_op_with_handle(args->arg, f, bserrno); 1044 free_fs_request(req); 1045 } 1046 } 1047 1048 static void 1049 fs_open_blob_create_cb(void *ctx, int bserrno) 1050 { 1051 struct spdk_fs_request *req = ctx; 1052 struct spdk_fs_cb_args *args = &req->args; 1053 struct spdk_file *file = args->file; 1054 struct spdk_filesystem *fs = args->fs; 1055 1056 if (file == NULL) { 1057 /* 1058 * This is from an open with CREATE flag - the file 1059 * is now created so look it up in the file list for this 1060 * filesystem. 1061 */ 1062 file = fs_find_file(fs, args->op.open.name); 1063 assert(file != NULL); 1064 args->file = file; 1065 } 1066 1067 file->ref_count++; 1068 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1069 if (file->ref_count == 1) { 1070 assert(file->blob == NULL); 1071 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1072 } else if (file->blob != NULL) { 1073 fs_open_blob_done(req, file->blob, 0); 1074 } else { 1075 /* 1076 * The blob open for this file is in progress due to a previous 1077 * open request. When that open completes, it will invoke the 1078 * open callback for this request. 
1079 */ 1080 } 1081 } 1082 1083 void 1084 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1085 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1086 { 1087 struct spdk_file *f = NULL; 1088 struct spdk_fs_request *req; 1089 struct spdk_fs_cb_args *args; 1090 1091 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1092 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1093 return; 1094 } 1095 1096 f = fs_find_file(fs, name); 1097 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1098 cb_fn(cb_arg, NULL, -ENOENT); 1099 return; 1100 } 1101 1102 if (f != NULL && f->is_deleted == true) { 1103 cb_fn(cb_arg, NULL, -ENOENT); 1104 return; 1105 } 1106 1107 req = alloc_fs_request(fs->md_target.md_fs_channel); 1108 if (req == NULL) { 1109 cb_fn(cb_arg, NULL, -ENOMEM); 1110 return; 1111 } 1112 1113 args = &req->args; 1114 args->fn.file_op_with_handle = cb_fn; 1115 args->arg = cb_arg; 1116 args->file = f; 1117 args->fs = fs; 1118 args->op.open.name = name; 1119 1120 if (f == NULL) { 1121 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1122 } else { 1123 fs_open_blob_create_cb(req, 0); 1124 } 1125 } 1126 1127 static void 1128 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1129 { 1130 struct spdk_fs_request *req = arg; 1131 struct spdk_fs_cb_args *args = &req->args; 1132 1133 args->file = file; 1134 __wake_caller(args, bserrno); 1135 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1136 } 1137 1138 static void 1139 __fs_open_file(void *arg) 1140 { 1141 struct spdk_fs_request *req = arg; 1142 struct spdk_fs_cb_args *args = &req->args; 1143 1144 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1145 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1146 __fs_open_file_done, req); 1147 } 1148 1149 int 1150 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1151 const char *name, uint32_t flags, struct 
spdk_file **file) 1152 { 1153 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1154 struct spdk_fs_request *req; 1155 struct spdk_fs_cb_args *args; 1156 int rc; 1157 1158 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1159 1160 req = alloc_fs_request(channel); 1161 if (req == NULL) { 1162 return -ENOMEM; 1163 } 1164 1165 args = &req->args; 1166 args->fs = fs; 1167 args->op.open.name = name; 1168 args->op.open.flags = flags; 1169 args->sem = &channel->sem; 1170 fs->send_request(__fs_open_file, req); 1171 sem_wait(&channel->sem); 1172 rc = args->rc; 1173 if (rc == 0) { 1174 *file = args->file; 1175 } else { 1176 *file = NULL; 1177 } 1178 free_fs_request(req); 1179 1180 return rc; 1181 } 1182 1183 static void 1184 fs_rename_blob_close_cb(void *ctx, int bserrno) 1185 { 1186 struct spdk_fs_request *req = ctx; 1187 struct spdk_fs_cb_args *args = &req->args; 1188 1189 args->fn.fs_op(args->arg, bserrno); 1190 free_fs_request(req); 1191 } 1192 1193 static void 1194 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1195 { 1196 struct spdk_fs_request *req = ctx; 1197 struct spdk_fs_cb_args *args = &req->args; 1198 const char *new_name = args->op.rename.new_name; 1199 1200 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1201 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1202 } 1203 1204 static void 1205 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1206 { 1207 struct spdk_fs_cb_args *args = &req->args; 1208 struct spdk_file *f; 1209 1210 f = fs_find_file(args->fs, args->op.rename.old_name); 1211 if (f == NULL) { 1212 args->fn.fs_op(args->arg, -ENOENT); 1213 free_fs_request(req); 1214 return; 1215 } 1216 1217 free(f->name); 1218 f->name = strdup(args->op.rename.new_name); 1219 args->file = f; 1220 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1221 } 1222 1223 static void 1224 fs_rename_delete_done(void *arg, int fserrno) 1225 { 1226 __spdk_fs_md_rename_file(arg); 1227 } 
1228 1229 void 1230 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1231 const char *old_name, const char *new_name, 1232 spdk_file_op_complete cb_fn, void *cb_arg) 1233 { 1234 struct spdk_file *f; 1235 struct spdk_fs_request *req; 1236 struct spdk_fs_cb_args *args; 1237 1238 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1239 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1240 cb_fn(cb_arg, -ENAMETOOLONG); 1241 return; 1242 } 1243 1244 req = alloc_fs_request(fs->md_target.md_fs_channel); 1245 if (req == NULL) { 1246 cb_fn(cb_arg, -ENOMEM); 1247 return; 1248 } 1249 1250 args = &req->args; 1251 args->fn.fs_op = cb_fn; 1252 args->fs = fs; 1253 args->arg = cb_arg; 1254 args->op.rename.old_name = old_name; 1255 args->op.rename.new_name = new_name; 1256 1257 f = fs_find_file(fs, new_name); 1258 if (f == NULL) { 1259 __spdk_fs_md_rename_file(req); 1260 return; 1261 } 1262 1263 /* 1264 * The rename overwrites an existing file. So delete the existing file, then 1265 * do the actual rename. 
1266 */ 1267 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1268 } 1269 1270 static void 1271 __fs_rename_file_done(void *arg, int fserrno) 1272 { 1273 struct spdk_fs_request *req = arg; 1274 struct spdk_fs_cb_args *args = &req->args; 1275 1276 __wake_caller(args, fserrno); 1277 } 1278 1279 static void 1280 __fs_rename_file(void *arg) 1281 { 1282 struct spdk_fs_request *req = arg; 1283 struct spdk_fs_cb_args *args = &req->args; 1284 1285 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1286 __fs_rename_file_done, req); 1287 } 1288 1289 int 1290 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1291 const char *old_name, const char *new_name) 1292 { 1293 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1294 struct spdk_fs_request *req; 1295 struct spdk_fs_cb_args *args; 1296 int rc; 1297 1298 req = alloc_fs_request(channel); 1299 if (req == NULL) { 1300 return -ENOMEM; 1301 } 1302 1303 args = &req->args; 1304 1305 args->fs = fs; 1306 args->op.rename.old_name = old_name; 1307 args->op.rename.new_name = new_name; 1308 args->sem = &channel->sem; 1309 fs->send_request(__fs_rename_file, req); 1310 sem_wait(&channel->sem); 1311 rc = args->rc; 1312 free_fs_request(req); 1313 return rc; 1314 } 1315 1316 static void 1317 blob_delete_cb(void *ctx, int bserrno) 1318 { 1319 struct spdk_fs_request *req = ctx; 1320 struct spdk_fs_cb_args *args = &req->args; 1321 1322 args->fn.file_op(args->arg, bserrno); 1323 free_fs_request(req); 1324 } 1325 1326 void 1327 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1328 spdk_file_op_complete cb_fn, void *cb_arg) 1329 { 1330 struct spdk_file *f; 1331 spdk_blob_id blobid; 1332 struct spdk_fs_request *req; 1333 struct spdk_fs_cb_args *args; 1334 1335 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1336 1337 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1338 cb_fn(cb_arg, 
-ENAMETOOLONG); 1339 return; 1340 } 1341 1342 f = fs_find_file(fs, name); 1343 if (f == NULL) { 1344 cb_fn(cb_arg, -ENOENT); 1345 return; 1346 } 1347 1348 req = alloc_fs_request(fs->md_target.md_fs_channel); 1349 if (req == NULL) { 1350 cb_fn(cb_arg, -ENOMEM); 1351 return; 1352 } 1353 1354 args = &req->args; 1355 args->fn.file_op = cb_fn; 1356 args->arg = cb_arg; 1357 1358 if (f->ref_count > 0) { 1359 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1360 f->is_deleted = true; 1361 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1362 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1363 return; 1364 } 1365 1366 TAILQ_REMOVE(&fs->files, f, tailq); 1367 1368 cache_free_buffers(f); 1369 1370 blobid = f->blobid; 1371 1372 free(f->name); 1373 free(f->tree); 1374 free(f); 1375 1376 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1377 } 1378 1379 static void 1380 __fs_delete_file_done(void *arg, int fserrno) 1381 { 1382 struct spdk_fs_request *req = arg; 1383 struct spdk_fs_cb_args *args = &req->args; 1384 1385 __wake_caller(args, fserrno); 1386 } 1387 1388 static void 1389 __fs_delete_file(void *arg) 1390 { 1391 struct spdk_fs_request *req = arg; 1392 struct spdk_fs_cb_args *args = &req->args; 1393 1394 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1395 } 1396 1397 int 1398 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1399 const char *name) 1400 { 1401 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1402 struct spdk_fs_request *req; 1403 struct spdk_fs_cb_args *args; 1404 int rc; 1405 1406 req = alloc_fs_request(channel); 1407 if (req == NULL) { 1408 return -ENOMEM; 1409 } 1410 1411 args = &req->args; 1412 args->fs = fs; 1413 args->op.delete.name = name; 1414 args->sem = &channel->sem; 1415 fs->send_request(__fs_delete_file, req); 1416 sem_wait(&channel->sem); 1417 rc = args->rc; 1418 free_fs_request(req); 
1419 1420 return rc; 1421 } 1422 1423 spdk_fs_iter 1424 spdk_fs_iter_first(struct spdk_filesystem *fs) 1425 { 1426 struct spdk_file *f; 1427 1428 f = TAILQ_FIRST(&fs->files); 1429 return f; 1430 } 1431 1432 spdk_fs_iter 1433 spdk_fs_iter_next(spdk_fs_iter iter) 1434 { 1435 struct spdk_file *f = iter; 1436 1437 if (f == NULL) { 1438 return NULL; 1439 } 1440 1441 f = TAILQ_NEXT(f, tailq); 1442 return f; 1443 } 1444 1445 const char * 1446 spdk_file_get_name(struct spdk_file *file) 1447 { 1448 return file->name; 1449 } 1450 1451 uint64_t 1452 spdk_file_get_length(struct spdk_file *file) 1453 { 1454 uint64_t length; 1455 1456 assert(file != NULL); 1457 1458 length = file->append_pos >= file->length ? file->append_pos : file->length; 1459 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length); 1460 return length; 1461 } 1462 1463 static void 1464 fs_truncate_complete_cb(void *ctx, int bserrno) 1465 { 1466 struct spdk_fs_request *req = ctx; 1467 struct spdk_fs_cb_args *args = &req->args; 1468 1469 args->fn.file_op(args->arg, bserrno); 1470 free_fs_request(req); 1471 } 1472 1473 static void 1474 fs_truncate_resize_cb(void *ctx, int bserrno) 1475 { 1476 struct spdk_fs_request *req = ctx; 1477 struct spdk_fs_cb_args *args = &req->args; 1478 struct spdk_file *file = args->file; 1479 uint64_t *length = &args->op.truncate.length; 1480 1481 if (bserrno) { 1482 args->fn.file_op(args->arg, bserrno); 1483 free_fs_request(req); 1484 return; 1485 } 1486 1487 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1488 1489 file->length = *length; 1490 if (file->append_pos > file->length) { 1491 file->append_pos = file->length; 1492 } 1493 1494 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1495 } 1496 1497 static uint64_t 1498 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1499 { 1500 return (length + cluster_sz - 1) / cluster_sz; 1501 } 1502 1503 void 1504 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 
1505 spdk_file_op_complete cb_fn, void *cb_arg) 1506 { 1507 struct spdk_filesystem *fs; 1508 size_t num_clusters; 1509 struct spdk_fs_request *req; 1510 struct spdk_fs_cb_args *args; 1511 1512 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1513 if (length == file->length) { 1514 cb_fn(cb_arg, 0); 1515 return; 1516 } 1517 1518 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1519 if (req == NULL) { 1520 cb_fn(cb_arg, -ENOMEM); 1521 return; 1522 } 1523 1524 args = &req->args; 1525 args->fn.file_op = cb_fn; 1526 args->arg = cb_arg; 1527 args->file = file; 1528 args->op.truncate.length = length; 1529 fs = file->fs; 1530 1531 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1532 1533 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1534 } 1535 1536 static void 1537 __truncate(void *arg) 1538 { 1539 struct spdk_fs_request *req = arg; 1540 struct spdk_fs_cb_args *args = &req->args; 1541 1542 spdk_file_truncate_async(args->file, args->op.truncate.length, 1543 args->fn.file_op, args); 1544 } 1545 1546 int 1547 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1548 uint64_t length) 1549 { 1550 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1551 struct spdk_fs_request *req; 1552 struct spdk_fs_cb_args *args; 1553 int rc; 1554 1555 req = alloc_fs_request(channel); 1556 if (req == NULL) { 1557 return -ENOMEM; 1558 } 1559 1560 args = &req->args; 1561 1562 args->file = file; 1563 args->op.truncate.length = length; 1564 args->fn.file_op = __wake_caller; 1565 args->sem = &channel->sem; 1566 1567 channel->send_request(__truncate, req); 1568 sem_wait(&channel->sem); 1569 rc = args->rc; 1570 free_fs_request(req); 1571 1572 return rc; 1573 } 1574 1575 static void 1576 __rw_done(void *ctx, int bserrno) 1577 { 1578 struct spdk_fs_request *req = ctx; 1579 struct spdk_fs_cb_args *args = &req->args; 1580 1581 
spdk_dma_free(args->op.rw.pin_buf); 1582 args->fn.file_op(args->arg, bserrno); 1583 free_fs_request(req); 1584 } 1585 1586 static void 1587 __read_done(void *ctx, int bserrno) 1588 { 1589 struct spdk_fs_request *req = ctx; 1590 struct spdk_fs_cb_args *args = &req->args; 1591 1592 assert(req != NULL); 1593 if (args->op.rw.is_read) { 1594 memcpy(args->op.rw.user_buf, 1595 args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1596 args->op.rw.length); 1597 __rw_done(req, 0); 1598 } else { 1599 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1600 args->op.rw.user_buf, 1601 args->op.rw.length); 1602 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1603 args->op.rw.pin_buf, 1604 args->op.rw.start_lba, args->op.rw.num_lba, 1605 __rw_done, req); 1606 } 1607 } 1608 1609 static void 1610 __do_blob_read(void *ctx, int fserrno) 1611 { 1612 struct spdk_fs_request *req = ctx; 1613 struct spdk_fs_cb_args *args = &req->args; 1614 1615 if (fserrno) { 1616 __rw_done(req, fserrno); 1617 return; 1618 } 1619 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1620 args->op.rw.pin_buf, 1621 args->op.rw.start_lba, args->op.rw.num_lba, 1622 __read_done, req); 1623 } 1624 1625 static void 1626 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1627 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1628 { 1629 uint64_t end_lba; 1630 1631 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1632 *start_lba = offset / *lba_size; 1633 end_lba = (offset + length - 1) / *lba_size; 1634 *num_lba = (end_lba - *start_lba + 1); 1635 } 1636 1637 static void 1638 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1639 void *payload, uint64_t offset, uint64_t length, 1640 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1641 { 1642 struct spdk_fs_request *req; 1643 struct spdk_fs_cb_args *args; 1644 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1645 
uint64_t start_lba, num_lba, pin_buf_length; 1646 uint32_t lba_size; 1647 1648 if (is_read && offset + length > file->length) { 1649 cb_fn(cb_arg, -EINVAL); 1650 return; 1651 } 1652 1653 req = alloc_fs_request(channel); 1654 if (req == NULL) { 1655 cb_fn(cb_arg, -ENOMEM); 1656 return; 1657 } 1658 1659 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1660 1661 args = &req->args; 1662 args->fn.file_op = cb_fn; 1663 args->arg = cb_arg; 1664 args->file = file; 1665 args->op.rw.channel = channel->bs_channel; 1666 args->op.rw.user_buf = payload; 1667 args->op.rw.is_read = is_read; 1668 args->op.rw.offset = offset; 1669 args->op.rw.length = length; 1670 args->op.rw.blocklen = lba_size; 1671 1672 pin_buf_length = num_lba * lba_size; 1673 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, lba_size, NULL); 1674 if (args->op.rw.pin_buf == NULL) { 1675 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1676 file->name, offset, length); 1677 free_fs_request(req); 1678 cb_fn(cb_arg, -ENOMEM); 1679 return; 1680 } 1681 1682 args->op.rw.start_lba = start_lba; 1683 args->op.rw.num_lba = num_lba; 1684 1685 if (!is_read && file->length < offset + length) { 1686 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1687 } else { 1688 __do_blob_read(req, 0); 1689 } 1690 } 1691 1692 void 1693 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1694 void *payload, uint64_t offset, uint64_t length, 1695 spdk_file_op_complete cb_fn, void *cb_arg) 1696 { 1697 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1698 } 1699 1700 void 1701 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1702 void *payload, uint64_t offset, uint64_t length, 1703 spdk_file_op_complete cb_fn, void *cb_arg) 1704 { 1705 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1706 file->name, offset, length); 1707 __readwrite(file, channel, 
payload, offset, length, cb_fn, cb_arg, 1);
}

/*
 * Get an I/O channel for fs whose requests are sent with
 * __send_request_direct and which owns its own blobstore I/O channel.
 */
struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

/*
 * Get an I/O channel for the blocking API: requests are forwarded with the
 * filesystem's send_request function, the channel is flagged as sync and its
 * spinlock is initialized.  No blobstore channel is allocated here.
 */
struct spdk_io_channel *
spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->send_request = fs->send_request;
	fs_channel->sync = 1;
	pthread_spin_init(&fs_channel->lock, 0);

	return io_channel;
}

/* Release a channel obtained from either channel allocation function above. */
void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

/* Set the global cache size; the argument is in MiB and is stored in bytes. */
void
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	g_fs_cache_size = size_in_mb * 1024 * 1024;
}

/* Return the global cache size in MiB. */
uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *_args);

/*
 * Get one buffer from the global cache pool.
 *
 * If the pool is empty, the cached buffers of another file are freed and the
 * pool is retried, with progressively weaker victim criteria:
 *   1. a low-priority file that is not open for writing,
 *   2. any file that is not open for writing,
 *   3. any file at all.
 * The file passed as context is never chosen as the victim.
 * Returns NULL if no buffer could be obtained.
 */
static void *
alloc_cache_memory_buffer(struct spdk_file *context)
{
	struct spdk_file *file;
	void *buf;

	buf = spdk_mempool_get(g_cache_pool);
	if (buf != NULL) {
		return buf;
	}

	/* Pass 1: low-priority files not open for writing. */
	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW &&
		    file != context) {
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	/* file is NULL here when the loop ran to the end without a match. */
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	/* Pass 2: any file not open for writing. */
	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (!file->open_for_writing && file != context) {
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	/* Pass 3: any other file. */
	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (file != context) {
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	return NULL;
}

/*
 * Allocate a cache buffer for the given file offset and insert it into the
 * file's cache tree.  The file is added to the global g_caches list when its
 * tree was empty before the insert.
 * Returns NULL if the descriptor cannot be allocated or if no cache memory
 * could be obtained after repeated attempts.
 */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
		return NULL;
	}

	buf->buf = alloc_cache_memory_buffer(file);
	while (buf->buf == NULL) {
		/*
		 * TODO: alloc_cache_memory_buffer() should eventually free
		 *  some buffers.  Need a more sophisticated check here, instead
		 *  of just bailing if 100 tries does not result in getting a
		 *  free buffer.  This will involve using the sync channel's
		 *  semaphore to block until a buffer becomes available.
		 */
		if (count++ == 100) {
			SPDK_ERRLOG("could not allocate cache buffer\n");
			/* Fatal when assertions are enabled; otherwise fail the insert. */
			assert(false);
			free(buf);
			return NULL;
		}
		buf->buf = alloc_cache_memory_buffer(file);
	}

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	pthread_spin_lock(&g_caches_lock);
	/* An empty tree means this file is not on the g_caches list yet. */
	if (file->tree->present_mask == 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	}
	file->tree = spdk_tree_insert_buffer(file->tree, buf);
	pthread_spin_unlock(&g_caches_lock);

	return buf;
}

/*
 * Add a new cache buffer at the file's append position and make it the
 * file's current ("last") write buffer.  The append position must be aligned
 * to CACHE_BUFFER_SIZE and the previous last buffer, if any, must be full.
 * Returns NULL (leaving file->last unchanged) if the insert fails.
 */
static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);

/*
 * Completion of the metadata sync started by __check_sync_reqs(): dequeue the
 * first pending sync request, invoke its callback, look for further sync
 * requests that can now be completed, and release the request.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file = ctx;
	struct spdk_fs_request *sync_req;
	struct spdk_fs_cb_args *sync_args;

	pthread_spin_lock(&file->lock);
	/* NOTE(review): assumes the sync request list is non-empty here — confirm. */
	sync_req = TAILQ_FIRST(&file->sync_requests);
	sync_args = &sync_req->args;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* The callback and the re-check run without the file lock held. */
	sync_args->fn.file_op(sync_args->arg, bserrno);
	__check_sync_reqs(file);

	pthread_spin_lock(&file->lock);
	free_fs_request(sync_req);
	pthread_spin_unlock(&file->lock);
}

/*
 * Release callback args: free() them when they were allocated on their own,
 * or return the enclosing request when args->from_request is set.
 */
static void
__free_args(struct spdk_fs_cb_args *args)
{
struct spdk_fs_request *req;

	if (!args->from_request) {
		free(args);
	} else {
		/* Depends on args being at the start of the spdk_fs_request structure. */
		req = (struct spdk_fs_request *)args;
		free_fs_request(req);
	}
}

/*
 * Look for a pending sync request whose offset has already been flushed.
 * If one exists and no xattr update is in progress for it, record the flushed
 * length in the blob's "length" xattr and start a metadata sync;
 * __file_cache_finish_sync() completes the request.
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	/* sync_req is NULL after the loop when no request qualifies. */
	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		/* Drop the lock before issuing the metadata sync. */
		pthread_spin_unlock(&file->lock);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

/*
 * Completion of the blob write issued by __file_flush(): account for the
 * flushed bytes, then check for completable sync requests and continue
 * flushing with the same args.
 */
static void
__file_flush_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	/* NOTE(review): bserrno is not examined; the bytes are counted as flushed regardless. */
	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	__file_flush(args);
}

/*
 * Write the unflushed part of the cache buffer at the file's flushed length
 * to the blob.  args is released here when there is nothing to write or a
 * flush is already in progress; otherwise it is handed on to
 * __file_flush_done().
 */
static void
__file_flush(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress) {
		/*
		 * There is either no data to flush, or a flush I/O is already in
		 *  progress.  So return immediately - if a flush I/O is in
		 *  progress we will flush more data after that is completed.
		 */
		__free_args(args);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 *  file was later appended, we will write the data directly
			 *  to disk and bypass cache.  So just update length_flushed
			 *  here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 *  outstanding sync requests to make sure metadata gets updated.
			 */
			__check_sync_reqs(file);
		}
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		/* Everything filled so far in this buffer has already been flushed. */
		__free_args(args);
		pthread_spin_unlock(&file->lock);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	/*
	 * The source pointer is the position inside the cache buffer that
	 * corresponds to the byte offset of start_lba in the file.
	 */
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, args);
}

/* Completion of the metadata sync that follows a blob extension: wake the waiter. */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

/*
 * The blob resize has finished: wake the waiter on failure, otherwise sync
 * the blob metadata first.
 */
static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

/* Request handler: resize the file's blob to op.resize.num_clusters clusters. */
static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

/* Completion of a direct read/write sent by __rw_from_file(): wake the waiter and release args. */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
	__free_args(args);
}

/*
 * Request handler: perform the read or write described by args directly
 * against the blob, using the filesystem's sync I/O channel.
 */
static void
__rw_from_file(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file,
file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2089 args->op.rw.offset, args->op.rw.length, 2090 __rw_from_file_done, args); 2091 } else { 2092 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2093 args->op.rw.offset, args->op.rw.length, 2094 __rw_from_file_done, args); 2095 } 2096 } 2097 2098 static int 2099 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 2100 uint64_t offset, uint64_t length, bool is_read) 2101 { 2102 struct spdk_fs_cb_args *args; 2103 2104 args = calloc(1, sizeof(*args)); 2105 if (args == NULL) { 2106 sem_post(sem); 2107 return -ENOMEM; 2108 } 2109 2110 args->file = file; 2111 args->sem = sem; 2112 args->op.rw.user_buf = payload; 2113 args->op.rw.offset = offset; 2114 args->op.rw.length = length; 2115 args->op.rw.is_read = is_read; 2116 file->fs->send_request(__rw_from_file, args); 2117 return 0; 2118 } 2119 2120 int 2121 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 2122 void *payload, uint64_t offset, uint64_t length) 2123 { 2124 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2125 struct spdk_fs_cb_args *args; 2126 uint64_t rem_length, copy, blob_size, cluster_sz; 2127 uint32_t cache_buffers_filled = 0; 2128 uint8_t *cur_payload; 2129 struct cache_buffer *last; 2130 2131 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2132 2133 if (length == 0) { 2134 return 0; 2135 } 2136 2137 if (offset != file->append_pos) { 2138 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2139 return -EINVAL; 2140 } 2141 2142 pthread_spin_lock(&file->lock); 2143 file->open_for_writing = true; 2144 2145 if (file->last == NULL) { 2146 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2147 cache_append_buffer(file); 2148 } else { 2149 int rc; 2150 2151 file->append_pos += length; 2152 pthread_spin_unlock(&file->lock); 2153 rc = __send_rw_from_file(file, &channel->sem, payload, 2154 offset, 
length, false); 2155 sem_wait(&channel->sem); 2156 return rc; 2157 } 2158 } 2159 2160 blob_size = __file_get_blob_size(file); 2161 2162 if ((offset + length) > blob_size) { 2163 struct spdk_fs_cb_args extend_args = {}; 2164 2165 cluster_sz = file->fs->bs_opts.cluster_sz; 2166 extend_args.sem = &channel->sem; 2167 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2168 extend_args.file = file; 2169 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2170 pthread_spin_unlock(&file->lock); 2171 file->fs->send_request(__file_extend_blob, &extend_args); 2172 sem_wait(&channel->sem); 2173 if (extend_args.rc) { 2174 return extend_args.rc; 2175 } 2176 } 2177 2178 last = file->last; 2179 rem_length = length; 2180 cur_payload = payload; 2181 while (rem_length > 0) { 2182 copy = last->buf_size - last->bytes_filled; 2183 if (copy > rem_length) { 2184 copy = rem_length; 2185 } 2186 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2187 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2188 file->append_pos += copy; 2189 if (file->length < file->append_pos) { 2190 file->length = file->append_pos; 2191 } 2192 cur_payload += copy; 2193 last->bytes_filled += copy; 2194 rem_length -= copy; 2195 if (last->bytes_filled == last->buf_size) { 2196 cache_buffers_filled++; 2197 last = cache_append_buffer(file); 2198 if (last == NULL) { 2199 BLOBFS_TRACE(file, "nomem\n"); 2200 pthread_spin_unlock(&file->lock); 2201 return -ENOMEM; 2202 } 2203 } 2204 } 2205 2206 pthread_spin_unlock(&file->lock); 2207 2208 if (cache_buffers_filled == 0) { 2209 return 0; 2210 } 2211 2212 args = calloc(1, sizeof(*args)); 2213 if (args == NULL) { 2214 return -ENOMEM; 2215 } 2216 2217 args->file = file; 2218 file->fs->send_request(__file_flush, args); 2219 return 0; 2220 } 2221 2222 static void 2223 __readahead_done(void *arg, int bserrno) 2224 { 2225 struct spdk_fs_cb_args *args = arg; 2226 struct 
cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	/* NOTE(review): bserrno is not examined; the buffer is marked filled regardless. */
	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	/* Read-ahead data came from the blob, so it counts as already flushed. */
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	__free_args(args);
}

/*
 * Request handler: read the range described by op.readahead from the blob
 * into the pre-allocated cache buffer; __readahead_done() completes it.
 */
static void
__readahead(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	offset = args->op.readahead.offset;
	length = args->op.readahead.length;
	assert(length > 0);

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			  args->op.readahead.cache_buffer->buf,
			  start_lba, num_lba, __readahead_done, args);
}

/*
 * Return offset advanced by CACHE_BUFFER_SIZE with the bits of
 * CACHE_TREE_LEVEL_MASK(0) cleared — presumably the start of the next cache
 * buffer; confirm against the definition of the mask.
 */
static uint64_t
__next_cache_buffer_offset(uint64_t offset)
{
	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
}

/*
 * Start a read-ahead of the cache buffer following offset, unless that buffer
 * is already in the cache tree or lies at or past the end of the file.
 * Allocation failures are silently ignored; read-ahead is best effort.
 */
static void
check_readahead(struct spdk_file *file, uint64_t offset)
{
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	args = calloc(1, sizeof(*args));
	if (args == NULL) {
		return;
	}

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	if (!args->op.readahead.cache_buffer) {
		BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset);
		free(args);
		return;
	}

	args->op.readahead.cache_buffer->in_progress = true;
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		/* Last, partial buffer of the file: read only up to the file length. */
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, args);
}

/*
 * Read one range (the caller keeps it within a single cache buffer) into
 * payload.  Called with file->lock held.
 *
 * If a filled cache buffer covers offset, the data is copied from it, the
 * buffer is removed from the tree once it has been read to its end, and sem
 * is posted.  Otherwise the lock is dropped while a direct read is queued
 * through __send_rw_from_file(), whose result is returned.
 */
static int
__file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
{
	struct cache_buffer *buf;
	int rc;

	buf = spdk_tree_find_filled_buffer(file->tree, offset);
	if (buf == NULL) {
		pthread_spin_unlock(&file->lock);
		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
		pthread_spin_lock(&file->lock);
		return rc;
	}

	/* Never copy more than the buffer actually holds. */
	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
		length = buf->offset + buf->bytes_filled - offset;
	}
	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
	memcpy(payload, &buf->buf[offset - buf->offset], length);
	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
		/* The read reached the end of this buffer: drop it from the cache. */
		pthread_spin_lock(&g_caches_lock);
		spdk_tree_remove_buffer(file->tree, buf);
		if (file->tree->present_mask == 0) {
			TAILQ_REMOVE(&g_caches, file, cache_tailq);
		}
		pthread_spin_unlock(&g_caches_lock);
	}

	sem_post(sem);
	return 0;
}

/*
 * Blocking read of up to length bytes at offset into payload.
 */
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	int rc = 0;

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
pthread_spin_unlock(&file->lock); 2351 return 0; 2352 } 2353 2354 if (offset + length > file->append_pos) { 2355 length = file->append_pos - offset; 2356 } 2357 2358 if (offset != file->next_seq_offset) { 2359 file->seq_byte_count = 0; 2360 } 2361 file->seq_byte_count += length; 2362 file->next_seq_offset = offset + length; 2363 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2364 check_readahead(file, offset); 2365 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2366 } 2367 2368 final_length = 0; 2369 final_offset = offset + length; 2370 while (offset < final_offset) { 2371 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2372 if (length > (final_offset - offset)) { 2373 length = final_offset - offset; 2374 } 2375 rc = __file_read(file, payload, offset, length, &channel->sem); 2376 if (rc == 0) { 2377 final_length += length; 2378 } else { 2379 break; 2380 } 2381 payload += length; 2382 offset += length; 2383 sub_reads++; 2384 } 2385 pthread_spin_unlock(&file->lock); 2386 while (sub_reads-- > 0) { 2387 sem_wait(&channel->sem); 2388 } 2389 if (rc == 0) { 2390 return final_length; 2391 } else { 2392 return rc; 2393 } 2394 } 2395 2396 static void 2397 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2398 spdk_file_op_complete cb_fn, void *cb_arg) 2399 { 2400 struct spdk_fs_request *sync_req; 2401 struct spdk_fs_request *flush_req; 2402 struct spdk_fs_cb_args *sync_args; 2403 struct spdk_fs_cb_args *flush_args; 2404 2405 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2406 2407 pthread_spin_lock(&file->lock); 2408 if (file->append_pos <= file->length_flushed) { 2409 BLOBFS_TRACE(file, "done - no data to flush\n"); 2410 pthread_spin_unlock(&file->lock); 2411 cb_fn(cb_arg, 0); 2412 return; 2413 } 2414 2415 sync_req = alloc_fs_request(channel); 2416 if (!sync_req) { 2417 pthread_spin_unlock(&file->lock); 2418 cb_fn(cb_arg, -ENOMEM); 2419 return; 2420 } 2421 sync_args = &sync_req->args; 2422 2423 flush_req = 
alloc_fs_request(channel); 2424 if (!flush_req) { 2425 pthread_spin_unlock(&file->lock); 2426 cb_fn(cb_arg, -ENOMEM); 2427 return; 2428 } 2429 flush_args = &flush_req->args; 2430 2431 sync_args->file = file; 2432 sync_args->fn.file_op = cb_fn; 2433 sync_args->arg = cb_arg; 2434 sync_args->op.sync.offset = file->append_pos; 2435 sync_args->op.sync.xattr_in_progress = false; 2436 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2437 pthread_spin_unlock(&file->lock); 2438 2439 flush_args->file = file; 2440 channel->send_request(__file_flush, flush_args); 2441 } 2442 2443 int 2444 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2445 { 2446 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2447 struct spdk_fs_cb_args args = {}; 2448 2449 args.sem = &channel->sem; 2450 _file_sync(file, channel, __wake_caller, &args); 2451 sem_wait(&channel->sem); 2452 2453 return args.rc; 2454 } 2455 2456 void 2457 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2458 spdk_file_op_complete cb_fn, void *cb_arg) 2459 { 2460 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2461 2462 _file_sync(file, channel, cb_fn, cb_arg); 2463 } 2464 2465 void 2466 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2467 { 2468 BLOBFS_TRACE(file, "priority=%u\n", priority); 2469 file->priority = priority; 2470 2471 } 2472 2473 /* 2474 * Close routines 2475 */ 2476 2477 static void 2478 __file_close_async_done(void *ctx, int bserrno) 2479 { 2480 struct spdk_fs_request *req = ctx; 2481 struct spdk_fs_cb_args *args = &req->args; 2482 struct spdk_file *file = args->file; 2483 2484 if (file->is_deleted) { 2485 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2486 return; 2487 } 2488 2489 args->fn.file_op(args->arg, bserrno); 2490 free_fs_request(req); 2491 } 2492 2493 static void 2494 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2495 { 
2496 struct spdk_blob *blob; 2497 2498 pthread_spin_lock(&file->lock); 2499 if (file->ref_count == 0) { 2500 pthread_spin_unlock(&file->lock); 2501 __file_close_async_done(req, -EBADF); 2502 return; 2503 } 2504 2505 file->ref_count--; 2506 if (file->ref_count > 0) { 2507 pthread_spin_unlock(&file->lock); 2508 req->args.fn.file_op(req->args.arg, 0); 2509 free_fs_request(req); 2510 return; 2511 } 2512 2513 pthread_spin_unlock(&file->lock); 2514 2515 blob = file->blob; 2516 file->blob = NULL; 2517 spdk_blob_close(blob, __file_close_async_done, req); 2518 } 2519 2520 static void 2521 __file_close_async__sync_done(void *arg, int fserrno) 2522 { 2523 struct spdk_fs_request *req = arg; 2524 struct spdk_fs_cb_args *args = &req->args; 2525 2526 __file_close_async(args->file, req); 2527 } 2528 2529 void 2530 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2531 { 2532 struct spdk_fs_request *req; 2533 struct spdk_fs_cb_args *args; 2534 2535 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2536 if (req == NULL) { 2537 cb_fn(cb_arg, -ENOMEM); 2538 return; 2539 } 2540 2541 args = &req->args; 2542 args->file = file; 2543 args->fn.file_op = cb_fn; 2544 args->arg = cb_arg; 2545 2546 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2547 } 2548 2549 static void 2550 __file_close(void *arg) 2551 { 2552 struct spdk_fs_request *req = arg; 2553 struct spdk_fs_cb_args *args = &req->args; 2554 struct spdk_file *file = args->file; 2555 2556 __file_close_async(file, req); 2557 } 2558 2559 int 2560 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2561 { 2562 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2563 struct spdk_fs_request *req; 2564 struct spdk_fs_cb_args *args; 2565 2566 req = alloc_fs_request(channel); 2567 if (req == NULL) { 2568 return -ENOMEM; 2569 } 2570 2571 args = &req->args; 2572 2573 spdk_file_sync(file, _channel); 2574 
BLOBFS_TRACE(file, "name=%s\n", file->name); 2575 args->file = file; 2576 args->sem = &channel->sem; 2577 args->fn.file_op = __wake_caller; 2578 args->arg = req; 2579 channel->send_request(__file_close, req); 2580 sem_wait(&channel->sem); 2581 2582 return args->rc; 2583 } 2584 2585 int 2586 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2587 { 2588 if (size < sizeof(spdk_blob_id)) { 2589 return -EINVAL; 2590 } 2591 2592 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2593 2594 return sizeof(spdk_blob_id); 2595 } 2596 2597 static void 2598 cache_free_buffers(struct spdk_file *file) 2599 { 2600 BLOBFS_TRACE(file, "free=%s\n", file->name); 2601 pthread_spin_lock(&file->lock); 2602 pthread_spin_lock(&g_caches_lock); 2603 if (file->tree->present_mask == 0) { 2604 pthread_spin_unlock(&g_caches_lock); 2605 pthread_spin_unlock(&file->lock); 2606 return; 2607 } 2608 spdk_tree_free_buffers(file->tree); 2609 2610 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2611 /* If not freed, put it in the end of the queue */ 2612 if (file->tree->present_mask != 0) { 2613 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2614 } 2615 file->last = NULL; 2616 pthread_spin_unlock(&g_caches_lock); 2617 pthread_spin_unlock(&file->lock); 2618 } 2619 2620 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2621 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2622