1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 void 64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 65 { 66 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 67 free(cache_buffer); 68 } 69 70 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 71 72 struct spdk_file { 73 struct spdk_filesystem *fs; 74 struct spdk_blob *blob; 75 char *name; 76 uint64_t length; 77 bool is_deleted; 78 bool open_for_writing; 79 uint64_t length_flushed; 80 uint64_t append_pos; 81 uint64_t seq_byte_count; 82 uint64_t next_seq_offset; 83 uint32_t priority; 84 TAILQ_ENTRY(spdk_file) tailq; 85 spdk_blob_id blobid; 86 uint32_t ref_count; 87 pthread_spinlock_t lock; 88 struct cache_buffer *last; 89 struct cache_tree *tree; 90 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 91 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 92 TAILQ_ENTRY(spdk_file) cache_tailq; 93 }; 94 95 struct spdk_deleted_file { 96 spdk_blob_id id; 97 TAILQ_ENTRY(spdk_deleted_file) tailq; 98 }; 99 100 struct spdk_filesystem { 101 struct spdk_blob_store *bs; 102 TAILQ_HEAD(, spdk_file) files; 103 struct spdk_bs_opts bs_opts; 104 struct spdk_bs_dev *bdev; 105 fs_send_request_fn send_request; 106 107 struct { 108 uint32_t max_ops; 109 struct spdk_io_channel *sync_io_channel; 110 struct spdk_fs_channel *sync_fs_channel; 111 } sync_target; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *md_io_channel; 116 struct spdk_fs_channel *md_fs_channel; 117 } md_target; 118 119 struct { 120 uint32_t max_ops; 121 } io_target; 122 }; 123 124 struct spdk_fs_cb_args { 125 union { 126 spdk_fs_op_with_handle_complete fs_op_with_handle; 127 spdk_fs_op_complete fs_op; 128 spdk_file_op_with_handle_complete file_op_with_handle; 129 spdk_file_op_complete file_op; 130 spdk_file_stat_op_complete stat_op; 131 } fn; 132 void *arg; 133 sem_t *sem; 134 struct spdk_filesystem *fs; 135 struct spdk_file *file; 136 int rc; 137 union { 138 struct { 139 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 140 } fs_load; 141 struct { 142 uint64_t length; 143 } truncate; 144 struct { 145 struct spdk_io_channel *channel; 146 void *user_buf; 147 void *pin_buf; 148 int is_read; 149 off_t offset; 150 size_t length; 151 uint64_t start_lba; 152 uint64_t num_lba; 153 uint32_t blocklen; 154 } rw; 155 struct { 156 const char *old_name; 157 const char *new_name; 158 } rename; 159 struct { 160 struct cache_buffer *cache_buffer; 161 uint64_t length; 162 } flush; 163 struct { 164 struct cache_buffer *cache_buffer; 165 uint64_t length; 166 uint64_t offset; 167 } readahead; 168 struct { 169 uint64_t offset; 170 TAILQ_ENTRY(spdk_fs_request) tailq; 171 bool xattr_in_progress; 172 } sync; 173 struct { 174 uint32_t num_clusters; 175 } resize; 176 struct { 177 const char *name; 178 uint32_t flags; 179 TAILQ_ENTRY(spdk_fs_request) tailq; 180 } open; 181 struct { 182 const char *name; 183 struct spdk_blob *blob; 184 } create; 185 struct { 186 const char *name; 187 } delete; 188 struct { 189 const char *name; 190 } stat; 191 } op; 192 }; 193 194 static void cache_free_buffers(struct spdk_file *file); 195 196 void 197 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 198 { 199 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 200 } 201 202 static void 203 __initialize_cache(void) 204 { 205 assert(g_cache_pool == NULL); 206 207 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 208 g_fs_cache_size / CACHE_BUFFER_SIZE, 209 CACHE_BUFFER_SIZE, 210 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 211 SPDK_ENV_SOCKET_ID_ANY); 212 if (!g_cache_pool) { 213 SPDK_ERRLOG("Create mempool failed, you may " 214 "increase the memory and try again\n"); 215 assert(false); 216 } 217 TAILQ_INIT(&g_caches); 218 pthread_spin_init(&g_caches_lock, 0); 219 } 220 221 static void 222 __free_cache(void) 223 { 224 assert(g_cache_pool != NULL); 225 226 spdk_mempool_free(g_cache_pool); 227 g_cache_pool = NULL; 228 } 229 230 static uint64_t 231 __file_get_blob_size(struct spdk_file *file) 232 { 233 uint64_t cluster_sz; 234 235 cluster_sz = file->fs->bs_opts.cluster_sz; 236 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 237 } 238 239 struct spdk_fs_request { 240 struct spdk_fs_cb_args args; 241 TAILQ_ENTRY(spdk_fs_request) link; 242 struct spdk_fs_channel *channel; 243 }; 244 245 struct spdk_fs_channel { 246 struct spdk_fs_request *req_mem; 247 TAILQ_HEAD(, spdk_fs_request) reqs; 248 sem_t sem; 249 struct spdk_filesystem *fs; 250 struct spdk_io_channel *bs_channel; 251 fs_send_request_fn send_request; 252 bool sync; 253 pthread_spinlock_t lock; 254 }; 255 256 /* For now, this is effectively an alias. But eventually we'll shift 257 * some data members over. */ 258 struct spdk_fs_thread_ctx { 259 struct spdk_fs_channel ch; 260 }; 261 262 static struct spdk_fs_request * 263 alloc_fs_request(struct spdk_fs_channel *channel) 264 { 265 struct spdk_fs_request *req; 266 267 if (channel->sync) { 268 pthread_spin_lock(&channel->lock); 269 } 270 271 req = TAILQ_FIRST(&channel->reqs); 272 if (req) { 273 TAILQ_REMOVE(&channel->reqs, req, link); 274 } 275 276 if (channel->sync) { 277 pthread_spin_unlock(&channel->lock); 278 } 279 280 if (req == NULL) { 281 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 282 return NULL; 283 } 284 memset(req, 0, sizeof(*req)); 285 req->channel = channel; 286 287 return req; 288 } 289 290 static void 291 free_fs_request(struct spdk_fs_request *req) 292 { 293 struct spdk_fs_channel *channel = req->channel; 294 295 if (channel->sync) { 296 pthread_spin_lock(&channel->lock); 297 } 298 299 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 300 301 if (channel->sync) { 302 pthread_spin_unlock(&channel->lock); 303 } 304 } 305 306 static int 307 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 308 uint32_t max_ops) 309 { 310 uint32_t i; 311 312 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 313 if (!channel->req_mem) { 314 return -1; 315 } 316 317 TAILQ_INIT(&channel->reqs); 318 sem_init(&channel->sem, 0, 0); 319 320 for (i = 0; i < max_ops; i++) { 321 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 322 } 323 324 channel->fs = fs; 325 326 return 0; 327 } 328 329 static int 330 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 331 { 332 struct spdk_filesystem *fs; 333 struct spdk_fs_channel *channel = ctx_buf; 334 335 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 336 337 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 338 } 339 340 static int 341 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 342 { 343 struct spdk_filesystem *fs; 344 struct spdk_fs_channel *channel = ctx_buf; 345 346 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 347 348 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 349 } 350 351 static int 352 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 353 { 354 struct spdk_filesystem *fs; 355 struct spdk_fs_channel *channel = ctx_buf; 356 357 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 358 359 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 360 } 361 362 static void 363 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 364 { 365 struct spdk_fs_channel *channel = ctx_buf; 366 367 free(channel->req_mem); 368 if (channel->bs_channel != NULL) { 369 spdk_bs_free_io_channel(channel->bs_channel); 370 } 371 } 372 373 static void 374 __send_request_direct(fs_request_fn fn, void *arg) 375 { 376 fn(arg); 377 } 378 379 static void 380 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 381 { 382 fs->bs = bs; 383 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 384 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 385 fs->md_target.md_fs_channel->send_request = __send_request_direct; 386 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 387 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 388 389 pthread_mutex_lock(&g_cache_init_lock); 390 if (g_fs_count == 0) { 391 __initialize_cache(); 392 } 393 g_fs_count++; 394 pthread_mutex_unlock(&g_cache_init_lock); 395 } 396 397 static void 398 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 399 { 400 struct spdk_fs_request *req = ctx; 401 struct spdk_fs_cb_args *args = &req->args; 402 struct spdk_filesystem *fs = args->fs; 403 404 if (bserrno == 0) { 405 common_fs_bs_init(fs, bs); 406 } else { 407 free(fs); 408 fs = NULL; 409 } 410 411 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 412 free_fs_request(req); 413 } 414 415 static void 416 fs_conf_parse(void) 417 { 418 struct spdk_conf_section *sp; 419 420 sp = spdk_conf_find_section(NULL, "Blobfs"); 421 if (sp == NULL) { 422 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 423 return; 424 } 425 426 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 427 if (g_fs_cache_buffer_shift <= 0) { 428 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 429 } 430 } 431 432 static struct spdk_filesystem * 433 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 434 { 435 struct spdk_filesystem *fs; 436 437 fs = calloc(1, sizeof(*fs)); 438 if (fs == NULL) { 439 return NULL; 440 } 441 442 fs->bdev = dev; 443 fs->send_request = send_request_fn; 444 TAILQ_INIT(&fs->files); 445 446 fs->md_target.max_ops = 512; 447 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 448 sizeof(struct spdk_fs_channel), "blobfs_md"); 449 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 450 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 451 452 fs->sync_target.max_ops = 512; 453 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 454 sizeof(struct spdk_fs_channel), "blobfs_sync"); 455 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 456 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 457 458 fs->io_target.max_ops = 512; 459 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 460 sizeof(struct spdk_fs_channel), "blobfs_io"); 461 462 return fs; 463 } 464 465 static void 466 __wake_caller(void *arg, int fserrno) 467 { 468 struct spdk_fs_cb_args *args = arg; 469 470 args->rc = fserrno; 471 sem_post(args->sem); 472 } 473 474 void 475 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 476 fs_send_request_fn send_request_fn, 477 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 478 { 479 struct spdk_filesystem *fs; 480 struct spdk_fs_request *req; 481 struct spdk_fs_cb_args *args; 482 struct spdk_bs_opts opts = {}; 483 484 fs = fs_alloc(dev, send_request_fn); 485 if (fs == NULL) { 486 cb_fn(cb_arg, NULL, -ENOMEM); 487 return; 488 } 489 490 fs_conf_parse(); 491 492 req = alloc_fs_request(fs->md_target.md_fs_channel); 493 if (req == NULL) { 494 spdk_put_io_channel(fs->md_target.md_io_channel); 495 spdk_io_device_unregister(&fs->md_target, NULL); 496 spdk_put_io_channel(fs->sync_target.sync_io_channel); 497 spdk_io_device_unregister(&fs->sync_target, NULL); 498 spdk_io_device_unregister(&fs->io_target, NULL); 499 free(fs); 500 cb_fn(cb_arg, NULL, -ENOMEM); 501 return; 502 } 503 504 args = &req->args; 505 args->fn.fs_op_with_handle = cb_fn; 506 args->arg = cb_arg; 507 args->fs = fs; 508 509 spdk_bs_opts_init(&opts); 510 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 511 if (opt) { 512 opts.cluster_sz = opt->cluster_sz; 513 } 514 spdk_bs_init(dev, &opts, init_cb, req); 515 } 516 517 static struct spdk_file * 518 file_alloc(struct spdk_filesystem *fs) 519 { 520 struct spdk_file *file; 521 522 file = calloc(1, sizeof(*file)); 523 if (file == NULL) { 524 return NULL; 525 } 526 527 file->tree = calloc(1, sizeof(*file->tree)); 528 if (file->tree == NULL) { 529 free(file); 530 return NULL; 531 } 532 533 file->fs = fs; 534 TAILQ_INIT(&file->open_requests); 535 TAILQ_INIT(&file->sync_requests); 536 pthread_spin_init(&file->lock, 0); 537 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 538 file->priority = SPDK_FILE_PRIORITY_LOW; 539 return file; 540 } 541 542 static void fs_load_done(void *ctx, int bserrno); 543 544 static int 545 _handle_deleted_files(struct spdk_fs_request *req) 546 { 547 struct spdk_fs_cb_args *args = &req->args; 548 struct spdk_filesystem *fs = args->fs; 549 550 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 551 struct spdk_deleted_file *deleted_file; 552 553 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 554 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 555 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 556 free(deleted_file); 557 return 0; 558 } 559 560 return 1; 561 } 562 563 static void 564 fs_load_done(void *ctx, int bserrno) 565 { 566 struct spdk_fs_request *req = ctx; 567 struct spdk_fs_cb_args *args = &req->args; 568 struct spdk_filesystem *fs = args->fs; 569 570 /* The filesystem has been loaded. Now check if there are any files that 571 * were marked for deletion before last unload. Do not complete the 572 * fs_load callback until all of them have been deleted on disk. 573 */ 574 if (_handle_deleted_files(req) == 0) { 575 /* We found a file that's been marked for deleting but not actually 576 * deleted yet. This function will get called again once the delete 577 * operation is completed. 578 */ 579 return; 580 } 581 582 args->fn.fs_op_with_handle(args->arg, fs, 0); 583 free_fs_request(req); 584 585 } 586 587 static void 588 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 589 { 590 struct spdk_fs_request *req = ctx; 591 struct spdk_fs_cb_args *args = &req->args; 592 struct spdk_filesystem *fs = args->fs; 593 uint64_t *length; 594 const char *name; 595 uint32_t *is_deleted; 596 size_t value_len; 597 598 if (rc < 0) { 599 args->fn.fs_op_with_handle(args->arg, fs, rc); 600 free_fs_request(req); 601 return; 602 } 603 604 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 605 if (rc < 0) { 606 args->fn.fs_op_with_handle(args->arg, fs, rc); 607 free_fs_request(req); 608 return; 609 } 610 611 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 612 if (rc < 0) { 613 args->fn.fs_op_with_handle(args->arg, fs, rc); 614 free_fs_request(req); 615 return; 616 } 617 618 assert(value_len == 8); 619 620 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 621 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 622 if (rc < 0) { 623 struct spdk_file *f; 624 625 f = file_alloc(fs); 626 if (f == NULL) { 627 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 628 free_fs_request(req); 629 return; 630 } 631 632 f->name = strdup(name); 633 f->blobid = spdk_blob_get_id(blob); 634 f->length = *length; 635 f->length_flushed = *length; 636 f->append_pos = *length; 637 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 638 } else { 639 struct spdk_deleted_file *deleted_file; 640 641 deleted_file = calloc(1, sizeof(*deleted_file)); 642 if (deleted_file == NULL) { 643 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 644 free_fs_request(req); 645 return; 646 } 647 deleted_file->id = spdk_blob_get_id(blob); 648 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 649 } 650 } 651 652 static void 653 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 654 { 655 struct spdk_fs_request *req = ctx; 656 struct spdk_fs_cb_args *args = &req->args; 657 struct spdk_filesystem *fs = args->fs; 658 struct spdk_bs_type bstype; 659 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 660 static const struct spdk_bs_type zeros; 661 662 if (bserrno != 0) { 663 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 664 free_fs_request(req); 665 free(fs); 666 return; 667 } 668 669 bstype = spdk_bs_get_bstype(bs); 670 671 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 672 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 673 spdk_bs_set_bstype(bs, blobfs_type); 674 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 675 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 676 SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 677 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 678 free_fs_request(req); 679 free(fs); 680 return; 681 } 682 683 common_fs_bs_init(fs, bs); 684 fs_load_done(req, 0); 685 } 686 687 static void 688 spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 689 { 690 assert(fs != NULL); 691 spdk_io_device_unregister(&fs->md_target, NULL); 692 spdk_io_device_unregister(&fs->sync_target, NULL); 693 spdk_io_device_unregister(&fs->io_target, NULL); 694 free(fs); 695 } 696 697 static void 698 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 699 { 700 assert(fs != NULL); 701 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 702 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 703 } 704 705 void 706 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 707 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 708 { 709 struct spdk_filesystem *fs; 710 struct spdk_fs_cb_args *args; 711 struct spdk_fs_request *req; 712 struct spdk_bs_opts bs_opts; 713 714 fs = fs_alloc(dev, send_request_fn); 715 if (fs == NULL) { 716 cb_fn(cb_arg, NULL, -ENOMEM); 717 return; 718 } 719 720 fs_conf_parse(); 721 722 req = alloc_fs_request(fs->md_target.md_fs_channel); 723 if (req == NULL) { 724 spdk_fs_free_io_channels(fs); 725 spdk_fs_io_device_unregister(fs); 726 cb_fn(cb_arg, NULL, -ENOMEM); 727 return; 728 } 729 730 args = &req->args; 731 args->fn.fs_op_with_handle = cb_fn; 732 args->arg = cb_arg; 733 args->fs = fs; 734 TAILQ_INIT(&args->op.fs_load.deleted_files); 735 spdk_bs_opts_init(&bs_opts); 736 bs_opts.iter_cb_fn = iter_cb; 737 bs_opts.iter_cb_arg = req; 738 spdk_bs_load(dev, &bs_opts, load_cb, req); 739 } 740 741 static void 742 unload_cb(void *ctx, int bserrno) 743 { 744 struct spdk_fs_request *req = ctx; 745 struct spdk_fs_cb_args *args = &req->args; 746 struct spdk_filesystem *fs = args->fs; 747 struct spdk_file *file, *tmp; 748 749 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 750 TAILQ_REMOVE(&fs->files, file, tailq); 751 cache_free_buffers(file); 752 free(file->name); 753 free(file->tree); 754 free(file); 755 } 756 757 pthread_mutex_lock(&g_cache_init_lock); 758 g_fs_count--; 759 if (g_fs_count == 0) { 760 __free_cache(); 761 } 762 pthread_mutex_unlock(&g_cache_init_lock); 763 764 args->fn.fs_op(args->arg, bserrno); 765 free(req); 766 767 spdk_fs_io_device_unregister(fs); 768 } 769 770 void 771 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 772 { 773 struct spdk_fs_request *req; 774 struct spdk_fs_cb_args *args; 775 776 /* 777 * We must free the md_channel before unloading the blobstore, so just 778 * allocate this request from the general heap. 779 */ 780 req = calloc(1, sizeof(*req)); 781 if (req == NULL) { 782 cb_fn(cb_arg, -ENOMEM); 783 return; 784 } 785 786 args = &req->args; 787 args->fn.fs_op = cb_fn; 788 args->arg = cb_arg; 789 args->fs = fs; 790 791 spdk_fs_free_io_channels(fs); 792 spdk_bs_unload(fs->bs, unload_cb, req); 793 } 794 795 static struct spdk_file * 796 fs_find_file(struct spdk_filesystem *fs, const char *name) 797 { 798 struct spdk_file *file; 799 800 TAILQ_FOREACH(file, &fs->files, tailq) { 801 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 802 return file; 803 } 804 } 805 806 return NULL; 807 } 808 809 void 810 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 811 spdk_file_stat_op_complete cb_fn, void *cb_arg) 812 { 813 struct spdk_file_stat stat; 814 struct spdk_file *f = NULL; 815 816 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 817 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 818 return; 819 } 820 821 f = fs_find_file(fs, name); 822 if (f != NULL) { 823 stat.blobid = f->blobid; 824 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 825 cb_fn(cb_arg, &stat, 0); 826 return; 827 } 828 829 cb_fn(cb_arg, NULL, -ENOENT); 830 } 831 832 static void 833 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 834 { 835 struct spdk_fs_request *req = arg; 836 struct spdk_fs_cb_args *args = &req->args; 837 838 args->rc = fserrno; 839 if (fserrno == 0) { 840 memcpy(args->arg, stat, sizeof(*stat)); 841 } 842 sem_post(args->sem); 843 } 844 845 static void 846 __file_stat(void *arg) 847 { 848 struct spdk_fs_request *req = arg; 849 struct spdk_fs_cb_args *args = &req->args; 850 851 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 852 args->fn.stat_op, req); 853 } 854 855 int 856 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 857 const char *name, struct spdk_file_stat *stat) 858 { 859 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 860 struct spdk_fs_request *req; 861 int rc; 862 863 req = alloc_fs_request(channel); 864 if (req == NULL) { 865 return -ENOMEM; 866 } 867 868 req->args.fs = fs; 869 req->args.op.stat.name = name; 870 req->args.fn.stat_op = __copy_stat; 871 req->args.arg = stat; 872 req->args.sem = &channel->sem; 873 channel->send_request(__file_stat, req); 874 sem_wait(&channel->sem); 875 876 rc = req->args.rc; 877 free_fs_request(req); 878 879 return rc; 880 } 881 882 static void 883 fs_create_blob_close_cb(void *ctx, int bserrno) 884 { 885 int rc; 886 struct spdk_fs_request *req = ctx; 887 struct spdk_fs_cb_args *args = &req->args; 888 889 rc = args->rc ? args->rc : bserrno; 890 args->fn.file_op(args->arg, rc); 891 free_fs_request(req); 892 } 893 894 static void 895 fs_create_blob_resize_cb(void *ctx, int bserrno) 896 { 897 struct spdk_fs_request *req = ctx; 898 struct spdk_fs_cb_args *args = &req->args; 899 struct spdk_file *f = args->file; 900 struct spdk_blob *blob = args->op.create.blob; 901 uint64_t length = 0; 902 903 args->rc = bserrno; 904 if (bserrno) { 905 spdk_blob_close(blob, fs_create_blob_close_cb, args); 906 return; 907 } 908 909 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 910 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 911 912 spdk_blob_close(blob, fs_create_blob_close_cb, args); 913 } 914 915 static void 916 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 917 { 918 struct spdk_fs_request *req = ctx; 919 struct spdk_fs_cb_args *args = &req->args; 920 921 if (bserrno) { 922 args->fn.file_op(args->arg, bserrno); 923 free_fs_request(req); 924 return; 925 } 926 927 args->op.create.blob = blob; 928 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 929 } 930 931 static void 932 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 933 { 934 struct spdk_fs_request *req = ctx; 935 struct spdk_fs_cb_args *args = &req->args; 936 struct spdk_file *f = args->file; 937 938 if (bserrno) { 939 args->fn.file_op(args->arg, bserrno); 940 free_fs_request(req); 941 return; 942 } 943 944 f->blobid = blobid; 945 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 946 } 947 948 void 949 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 950 spdk_file_op_complete cb_fn, void *cb_arg) 951 { 952 struct spdk_file *file; 953 struct spdk_fs_request *req; 954 struct spdk_fs_cb_args *args; 955 956 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 957 cb_fn(cb_arg, -ENAMETOOLONG); 958 return; 959 } 960 961 file = fs_find_file(fs, name); 962 if (file != NULL) { 963 cb_fn(cb_arg, -EEXIST); 964 return; 965 } 966 967 file = file_alloc(fs); 968 if (file == NULL) { 969 cb_fn(cb_arg, -ENOMEM); 970 return; 971 } 972 973 req = alloc_fs_request(fs->md_target.md_fs_channel); 974 if (req == NULL) { 975 cb_fn(cb_arg, -ENOMEM); 976 return; 977 } 978 979 args = &req->args; 980 args->file = file; 981 args->fn.file_op = cb_fn; 982 args->arg = cb_arg; 983 984 file->name = strdup(name); 985 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 986 } 987 988 static void 989 __fs_create_file_done(void *arg, int fserrno) 990 { 991 struct spdk_fs_request *req = arg; 992 struct spdk_fs_cb_args *args = &req->args; 993 994 args->rc = fserrno; 995 sem_post(args->sem); 996 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 997 } 998 999 static void 1000 __fs_create_file(void *arg) 1001 { 1002 struct spdk_fs_request *req = arg; 1003 struct spdk_fs_cb_args *args = &req->args; 1004 1005 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1006 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1007 } 1008 1009 int 1010 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1011 { 1012 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1013 struct spdk_fs_request *req; 1014 struct spdk_fs_cb_args *args; 1015 int rc; 1016 1017 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1018 1019 req = alloc_fs_request(channel); 1020 if (req == NULL) { 1021 return -ENOMEM; 1022 } 1023 1024 args = &req->args; 1025 args->fs = fs; 1026 args->op.create.name = name; 1027 args->sem = &channel->sem; 1028 fs->send_request(__fs_create_file, req); 1029 sem_wait(&channel->sem); 1030 rc = args->rc; 1031 free_fs_request(req); 1032 1033 return rc; 1034 } 1035 1036 static void 1037 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1038 { 1039 struct spdk_fs_request *req = ctx; 1040 struct spdk_fs_cb_args *args = &req->args; 1041 struct spdk_file *f = args->file; 1042 1043 f->blob = blob; 1044 while (!TAILQ_EMPTY(&f->open_requests)) { 1045 req = TAILQ_FIRST(&f->open_requests); 1046 args = &req->args; 1047 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1048 args->fn.file_op_with_handle(args->arg, f, bserrno); 1049 free_fs_request(req); 1050 } 1051 } 1052 1053 static void 1054 fs_open_blob_create_cb(void *ctx, int bserrno) 1055 { 1056 struct spdk_fs_request *req = ctx; 1057 struct spdk_fs_cb_args *args = &req->args; 1058 struct spdk_file *file = args->file; 1059 struct spdk_filesystem *fs = args->fs; 1060 1061 if (file == NULL) { 1062 /* 1063 * This is from an open with CREATE flag - the file 1064 * is now created so look it up in the file list for this 1065 * filesystem. 1066 */ 1067 file = fs_find_file(fs, args->op.open.name); 1068 assert(file != NULL); 1069 args->file = file; 1070 } 1071 1072 file->ref_count++; 1073 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1074 if (file->ref_count == 1) { 1075 assert(file->blob == NULL); 1076 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1077 } else if (file->blob != NULL) { 1078 fs_open_blob_done(req, file->blob, 0); 1079 } else { 1080 /* 1081 * The blob open for this file is in progress due to a previous 1082 * open request. When that open completes, it will invoke the 1083 * open callback for this request. 1084 */ 1085 } 1086 } 1087 1088 void 1089 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1090 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1091 { 1092 struct spdk_file *f = NULL; 1093 struct spdk_fs_request *req; 1094 struct spdk_fs_cb_args *args; 1095 1096 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1097 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1098 return; 1099 } 1100 1101 f = fs_find_file(fs, name); 1102 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1103 cb_fn(cb_arg, NULL, -ENOENT); 1104 return; 1105 } 1106 1107 if (f != NULL && f->is_deleted == true) { 1108 cb_fn(cb_arg, NULL, -ENOENT); 1109 return; 1110 } 1111 1112 req = alloc_fs_request(fs->md_target.md_fs_channel); 1113 if (req == NULL) { 1114 cb_fn(cb_arg, NULL, -ENOMEM); 1115 return; 1116 } 1117 1118 args = &req->args; 1119 args->fn.file_op_with_handle = cb_fn; 1120 args->arg = cb_arg; 1121 args->file = f; 1122 args->fs = fs; 1123 args->op.open.name = name; 1124 1125 if (f == NULL) { 1126 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1127 } else { 1128 fs_open_blob_create_cb(req, 0); 1129 } 1130 } 1131 1132 static void 1133 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1134 { 1135 struct spdk_fs_request *req = arg; 1136 struct spdk_fs_cb_args *args = &req->args; 1137 1138 args->file = file; 1139 __wake_caller(args, bserrno); 1140 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1141 } 1142 1143 static void 1144 __fs_open_file(void *arg) 1145 { 1146 struct spdk_fs_request *req = arg; 1147 struct spdk_fs_cb_args *args = &req->args; 1148 1149 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1150 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1151 __fs_open_file_done, req); 1152 } 1153 1154 int 1155 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1156 const char *name, uint32_t flags, struct spdk_file **file) 1157 { 1158 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1159 struct spdk_fs_request *req; 1160 struct spdk_fs_cb_args *args; 1161 int rc; 1162 1163 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1164 1165 req = alloc_fs_request(channel); 1166 if (req == NULL) { 1167 return -ENOMEM; 1168 } 1169 1170 args = &req->args; 1171 args->fs = fs; 1172 args->op.open.name = name; 1173 args->op.open.flags = flags; 1174 args->sem = &channel->sem; 1175 fs->send_request(__fs_open_file, req); 1176 sem_wait(&channel->sem); 1177 rc = args->rc; 1178 if (rc == 0) { 1179 *file = args->file; 1180 } else { 1181 *file = NULL; 1182 } 1183 free_fs_request(req); 1184 1185 return rc; 1186 } 1187 1188 static void 1189 fs_rename_blob_close_cb(void *ctx, int bserrno) 1190 { 1191 struct spdk_fs_request *req = ctx; 1192 struct spdk_fs_cb_args *args = &req->args; 1193 1194 args->fn.fs_op(args->arg, bserrno); 1195 free_fs_request(req); 1196 } 1197 1198 static void 1199 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1200 { 1201 struct spdk_fs_request *req = ctx; 1202 struct spdk_fs_cb_args *args = &req->args; 1203 const char *new_name = args->op.rename.new_name; 1204 1205 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1206 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1207 } 1208 1209 static void 1210 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1211 { 1212 struct spdk_fs_cb_args *args = &req->args; 1213 struct spdk_file *f; 1214 1215 f = fs_find_file(args->fs, args->op.rename.old_name); 1216 if (f == NULL) { 1217 args->fn.fs_op(args->arg, -ENOENT); 1218 free_fs_request(req); 1219 return; 1220 } 1221 1222 free(f->name); 1223 f->name = strdup(args->op.rename.new_name); 1224 args->file = f; 1225 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1226 } 1227 1228 static void 1229 fs_rename_delete_done(void *arg, int fserrno) 1230 { 1231 __spdk_fs_md_rename_file(arg); 1232 } 1233 1234 void 1235 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1236 const char *old_name, const char *new_name, 1237 spdk_file_op_complete cb_fn, void *cb_arg) 1238 { 1239 struct spdk_file *f; 1240 struct spdk_fs_request *req; 1241 struct spdk_fs_cb_args *args; 1242 1243 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1244 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1245 cb_fn(cb_arg, -ENAMETOOLONG); 1246 return; 1247 } 1248 1249 req = alloc_fs_request(fs->md_target.md_fs_channel); 1250 if (req == NULL) { 1251 cb_fn(cb_arg, -ENOMEM); 1252 return; 1253 } 1254 1255 args = &req->args; 1256 args->fn.fs_op = cb_fn; 1257 args->fs = fs; 1258 args->arg = cb_arg; 1259 args->op.rename.old_name = old_name; 1260 args->op.rename.new_name = new_name; 1261 1262 f = fs_find_file(fs, new_name); 1263 if (f == NULL) { 1264 __spdk_fs_md_rename_file(req); 1265 return; 1266 } 1267 1268 /* 1269 * The rename overwrites an existing file. So delete the existing file, then 1270 * do the actual rename. 1271 */ 1272 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1273 } 1274 1275 static void 1276 __fs_rename_file_done(void *arg, int fserrno) 1277 { 1278 struct spdk_fs_request *req = arg; 1279 struct spdk_fs_cb_args *args = &req->args; 1280 1281 __wake_caller(args, fserrno); 1282 } 1283 1284 static void 1285 __fs_rename_file(void *arg) 1286 { 1287 struct spdk_fs_request *req = arg; 1288 struct spdk_fs_cb_args *args = &req->args; 1289 1290 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1291 __fs_rename_file_done, req); 1292 } 1293 1294 int 1295 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1296 const char *old_name, const char *new_name) 1297 { 1298 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1299 struct spdk_fs_request *req; 1300 struct spdk_fs_cb_args *args; 1301 int rc; 1302 1303 req = alloc_fs_request(channel); 1304 if (req == NULL) { 1305 return -ENOMEM; 1306 } 1307 1308 args = &req->args; 1309 1310 args->fs = fs; 1311 args->op.rename.old_name = old_name; 1312 args->op.rename.new_name = new_name; 1313 args->sem = &channel->sem; 1314 fs->send_request(__fs_rename_file, req); 1315 sem_wait(&channel->sem); 1316 rc = args->rc; 1317 free_fs_request(req); 1318 return rc; 1319 } 1320 1321 static void 1322 blob_delete_cb(void *ctx, int bserrno) 1323 { 1324 struct spdk_fs_request *req = ctx; 1325 struct spdk_fs_cb_args *args = &req->args; 1326 1327 args->fn.file_op(args->arg, bserrno); 1328 free_fs_request(req); 1329 } 1330 1331 void 1332 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1333 spdk_file_op_complete cb_fn, void *cb_arg) 1334 { 1335 struct spdk_file *f; 1336 spdk_blob_id blobid; 1337 struct spdk_fs_request *req; 1338 struct spdk_fs_cb_args *args; 1339 1340 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1341 1342 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1343 cb_fn(cb_arg, -ENAMETOOLONG); 1344 return; 1345 } 1346 1347 f = fs_find_file(fs, name); 1348 if (f == NULL) { 1349 cb_fn(cb_arg, -ENOENT); 1350 return; 1351 } 1352 1353 req = alloc_fs_request(fs->md_target.md_fs_channel); 1354 if (req == NULL) { 1355 cb_fn(cb_arg, -ENOMEM); 1356 return; 1357 } 1358 1359 args = &req->args; 1360 args->fn.file_op = cb_fn; 1361 args->arg = cb_arg; 1362 1363 if (f->ref_count > 0) { 1364 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1365 f->is_deleted = true; 1366 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1367 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1368 return; 1369 } 1370 1371 TAILQ_REMOVE(&fs->files, f, tailq); 1372 1373 cache_free_buffers(f); 1374 1375 blobid = f->blobid; 1376 1377 free(f->name); 1378 free(f->tree); 1379 free(f); 1380 1381 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1382 } 1383 1384 static void 1385 __fs_delete_file_done(void *arg, int fserrno) 1386 { 1387 struct spdk_fs_request *req = arg; 1388 struct spdk_fs_cb_args *args = &req->args; 1389 1390 __wake_caller(args, fserrno); 1391 } 1392 1393 static void 1394 __fs_delete_file(void *arg) 1395 { 1396 struct spdk_fs_request *req = arg; 1397 struct spdk_fs_cb_args *args = &req->args; 1398 1399 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1400 } 1401 1402 int 1403 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1404 const char *name) 1405 { 1406 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1407 struct spdk_fs_request *req; 1408 struct spdk_fs_cb_args *args; 1409 int rc; 1410 1411 req = alloc_fs_request(channel); 1412 if (req == NULL) { 1413 return -ENOMEM; 1414 } 1415 1416 args = &req->args; 1417 args->fs = fs; 1418 args->op.delete.name = name; 1419 args->sem = &channel->sem; 1420 fs->send_request(__fs_delete_file, req); 1421 sem_wait(&channel->sem); 1422 rc = args->rc; 1423 free_fs_request(req); 1424 1425 return rc; 1426 } 1427 1428 spdk_fs_iter 1429 spdk_fs_iter_first(struct spdk_filesystem *fs) 1430 { 1431 struct spdk_file *f; 1432 1433 f = TAILQ_FIRST(&fs->files); 1434 return f; 1435 } 1436 1437 spdk_fs_iter 1438 spdk_fs_iter_next(spdk_fs_iter iter) 1439 { 1440 struct spdk_file *f = iter; 1441 1442 if (f == NULL) { 1443 return NULL; 1444 } 1445 1446 f = TAILQ_NEXT(f, tailq); 1447 return f; 1448 } 1449 1450 const char * 1451 spdk_file_get_name(struct spdk_file *file) 1452 { 1453 return file->name; 1454 } 1455 1456 uint64_t 1457 spdk_file_get_length(struct spdk_file *file) 1458 { 1459 uint64_t length; 1460 1461 assert(file != NULL); 1462 1463 length = file->append_pos >= file->length ? file->append_pos : file->length; 1464 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length); 1465 return length; 1466 } 1467 1468 static void 1469 fs_truncate_complete_cb(void *ctx, int bserrno) 1470 { 1471 struct spdk_fs_request *req = ctx; 1472 struct spdk_fs_cb_args *args = &req->args; 1473 1474 args->fn.file_op(args->arg, bserrno); 1475 free_fs_request(req); 1476 } 1477 1478 static void 1479 fs_truncate_resize_cb(void *ctx, int bserrno) 1480 { 1481 struct spdk_fs_request *req = ctx; 1482 struct spdk_fs_cb_args *args = &req->args; 1483 struct spdk_file *file = args->file; 1484 uint64_t *length = &args->op.truncate.length; 1485 1486 if (bserrno) { 1487 args->fn.file_op(args->arg, bserrno); 1488 free_fs_request(req); 1489 return; 1490 } 1491 1492 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1493 1494 file->length = *length; 1495 if (file->append_pos > file->length) { 1496 file->append_pos = file->length; 1497 } 1498 1499 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1500 } 1501 1502 static uint64_t 1503 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1504 { 1505 return (length + cluster_sz - 1) / cluster_sz; 1506 } 1507 1508 void 1509 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1510 spdk_file_op_complete cb_fn, void *cb_arg) 1511 { 1512 struct spdk_filesystem *fs; 1513 size_t num_clusters; 1514 struct spdk_fs_request *req; 1515 struct spdk_fs_cb_args *args; 1516 1517 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1518 if (length == file->length) { 1519 cb_fn(cb_arg, 0); 1520 return; 1521 } 1522 1523 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1524 if (req == NULL) { 1525 cb_fn(cb_arg, -ENOMEM); 1526 return; 1527 } 1528 1529 args = &req->args; 1530 args->fn.file_op = cb_fn; 1531 args->arg = cb_arg; 1532 args->file = file; 1533 args->op.truncate.length = length; 1534 fs = file->fs; 1535 1536 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1537 1538 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1539 } 1540 1541 static void 1542 __truncate(void *arg) 1543 { 1544 struct spdk_fs_request *req = arg; 1545 struct spdk_fs_cb_args *args = &req->args; 1546 1547 spdk_file_truncate_async(args->file, args->op.truncate.length, 1548 args->fn.file_op, args); 1549 } 1550 1551 int 1552 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 1553 uint64_t length) 1554 { 1555 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1556 struct spdk_fs_request *req; 1557 struct spdk_fs_cb_args *args; 1558 int rc; 1559 1560 req = alloc_fs_request(channel); 1561 if (req == NULL) { 1562 return -ENOMEM; 1563 } 1564 1565 args = &req->args; 1566 1567 args->file = file; 1568 args->op.truncate.length = length; 1569 args->fn.file_op = __wake_caller; 1570 args->sem = &channel->sem; 1571 1572 channel->send_request(__truncate, req); 1573 sem_wait(&channel->sem); 1574 rc = args->rc; 1575 free_fs_request(req); 1576 1577 return rc; 1578 } 1579 1580 static void 1581 __rw_done(void *ctx, int bserrno) 1582 { 1583 struct spdk_fs_request *req = ctx; 1584 struct spdk_fs_cb_args *args = &req->args; 1585 1586 spdk_free(args->op.rw.pin_buf); 1587 args->fn.file_op(args->arg, bserrno); 1588 free_fs_request(req); 1589 } 1590 1591 static void 1592 __read_done(void *ctx, int bserrno) 1593 { 1594 struct spdk_fs_request *req = ctx; 1595 struct spdk_fs_cb_args *args = &req->args; 1596 1597 assert(req != NULL); 1598 if (args->op.rw.is_read) { 1599 memcpy(args->op.rw.user_buf, 1600 args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1601 args->op.rw.length); 1602 __rw_done(req, 0); 1603 } else { 1604 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1605 args->op.rw.user_buf, 1606 args->op.rw.length); 1607 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1608 args->op.rw.pin_buf, 1609 args->op.rw.start_lba, args->op.rw.num_lba, 1610 __rw_done, req); 1611 } 1612 } 1613 1614 static void 1615 __do_blob_read(void *ctx, int fserrno) 1616 { 1617 struct spdk_fs_request *req = ctx; 1618 struct spdk_fs_cb_args *args = &req->args; 1619 1620 if (fserrno) { 1621 __rw_done(req, fserrno); 1622 return; 1623 } 1624 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1625 args->op.rw.pin_buf, 1626 args->op.rw.start_lba, args->op.rw.num_lba, 1627 __read_done, req); 1628 } 1629 1630 static void 1631 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1632 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1633 { 1634 uint64_t end_lba; 1635 1636 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1637 *start_lba = offset / *lba_size; 1638 end_lba = (offset + length - 1) / *lba_size; 1639 *num_lba = (end_lba - *start_lba + 1); 1640 } 1641 1642 static void 1643 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1644 void *payload, uint64_t offset, uint64_t length, 1645 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1646 { 1647 struct spdk_fs_request *req; 1648 struct spdk_fs_cb_args *args; 1649 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1650 uint64_t start_lba, num_lba, pin_buf_length; 1651 uint32_t lba_size; 1652 1653 if (is_read && offset + length > file->length) { 1654 cb_fn(cb_arg, -EINVAL); 1655 return; 1656 } 1657 1658 req = alloc_fs_request(channel); 1659 if (req == NULL) { 1660 cb_fn(cb_arg, -ENOMEM); 1661 return; 1662 } 1663 1664 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1665 1666 args = &req->args; 1667 args->fn.file_op = cb_fn; 1668 args->arg = cb_arg; 1669 args->file = file; 1670 args->op.rw.channel = channel->bs_channel; 1671 args->op.rw.user_buf = payload; 1672 args->op.rw.is_read = is_read; 1673 args->op.rw.offset = offset; 1674 args->op.rw.length = length; 1675 args->op.rw.blocklen = lba_size; 1676 1677 pin_buf_length = num_lba * lba_size; 1678 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL, 1679 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1680 if (args->op.rw.pin_buf == NULL) { 1681 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1682 file->name, offset, length); 1683 free_fs_request(req); 1684 cb_fn(cb_arg, -ENOMEM); 1685 return; 1686 } 1687 1688 args->op.rw.start_lba = start_lba; 1689 args->op.rw.num_lba = num_lba; 1690 1691 if (!is_read && file->length < offset + length) { 1692 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1693 } else { 1694 __do_blob_read(req, 0); 1695 } 1696 } 1697 1698 void 1699 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1700 void *payload, uint64_t offset, uint64_t length, 1701 spdk_file_op_complete cb_fn, void *cb_arg) 1702 { 1703 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1704 } 1705 1706 void 1707 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1708 void *payload, uint64_t offset, uint64_t length, 1709 spdk_file_op_complete cb_fn, void *cb_arg) 1710 { 1711 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1712 file->name, offset, length); 1713 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1714 } 1715 1716 struct spdk_io_channel * 1717 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1718 { 1719 struct spdk_io_channel *io_channel; 1720 struct spdk_fs_channel *fs_channel; 1721 1722 io_channel = spdk_get_io_channel(&fs->io_target); 1723 fs_channel = spdk_io_channel_get_ctx(io_channel); 1724 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1725 fs_channel->send_request = __send_request_direct; 1726 1727 return io_channel; 1728 } 1729 1730 void 1731 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1732 { 1733 spdk_put_io_channel(channel); 1734 } 1735 1736 struct spdk_fs_thread_ctx * 1737 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs) 1738 { 1739 struct spdk_fs_thread_ctx *ctx; 1740 1741 ctx = calloc(1, sizeof(*ctx)); 1742 if (!ctx) { 1743 return NULL; 1744 } 1745 1746 _spdk_fs_channel_create(fs, &ctx->ch, 512); 1747 1748 ctx->ch.send_request = fs->send_request; 1749 ctx->ch.sync = 1; 1750 pthread_spin_init(&ctx->ch.lock, 0); 1751 1752 return ctx; 1753 } 1754 1755 1756 void 1757 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx) 1758 { 1759 _spdk_fs_channel_destroy(NULL, &ctx->ch); 1760 free(ctx); 1761 } 1762 1763 void 1764 spdk_fs_set_cache_size(uint64_t size_in_mb) 1765 { 1766 g_fs_cache_size = size_in_mb * 1024 * 1024; 1767 } 1768 1769 uint64_t 1770 spdk_fs_get_cache_size(void) 1771 { 1772 return g_fs_cache_size / (1024 * 1024); 1773 } 1774 1775 static void __file_flush(void *ctx); 1776 1777 static void * 1778 alloc_cache_memory_buffer(struct spdk_file *context) 1779 { 1780 struct spdk_file *file; 1781 void *buf; 1782 1783 buf = spdk_mempool_get(g_cache_pool); 1784 if (buf != NULL) { 1785 return buf; 1786 } 1787 1788 pthread_spin_lock(&g_caches_lock); 1789 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1790 if (!file->open_for_writing && 1791 file->priority == SPDK_FILE_PRIORITY_LOW && 1792 file != context) { 1793 break; 1794 } 1795 } 1796 pthread_spin_unlock(&g_caches_lock); 1797 if (file != NULL) { 1798 cache_free_buffers(file); 1799 buf = spdk_mempool_get(g_cache_pool); 1800 if (buf != NULL) { 1801 return buf; 1802 } 1803 } 1804 1805 pthread_spin_lock(&g_caches_lock); 1806 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1807 if (!file->open_for_writing && file != context) { 1808 break; 1809 } 1810 } 1811 pthread_spin_unlock(&g_caches_lock); 1812 if (file != NULL) { 1813 cache_free_buffers(file); 1814 buf = spdk_mempool_get(g_cache_pool); 1815 if (buf != NULL) { 1816 return buf; 1817 } 1818 } 1819 1820 pthread_spin_lock(&g_caches_lock); 1821 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1822 if (file != context) { 1823 break; 1824 } 1825 } 1826 pthread_spin_unlock(&g_caches_lock); 1827 if (file != NULL) { 1828 cache_free_buffers(file); 1829 buf = spdk_mempool_get(g_cache_pool); 1830 if (buf != NULL) { 1831 return buf; 1832 } 1833 } 1834 1835 return NULL; 1836 } 1837 1838 static struct cache_buffer * 1839 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1840 { 1841 struct cache_buffer *buf; 1842 int count = 0; 1843 1844 buf = calloc(1, sizeof(*buf)); 1845 if (buf == NULL) { 1846 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1847 return NULL; 1848 } 1849 1850 buf->buf = alloc_cache_memory_buffer(file); 1851 while (buf->buf == NULL) { 1852 /* 1853 * TODO: alloc_cache_memory_buffer() should eventually free 1854 * some buffers. Need a more sophisticated check here, instead 1855 * of just bailing if 100 tries does not result in getting a 1856 * free buffer. This will involve using the sync channel's 1857 * semaphore to block until a buffer becomes available. 1858 */ 1859 if (count++ == 100) { 1860 SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", 1861 file, offset); 1862 free(buf); 1863 return NULL; 1864 } 1865 buf->buf = alloc_cache_memory_buffer(file); 1866 } 1867 1868 buf->buf_size = CACHE_BUFFER_SIZE; 1869 buf->offset = offset; 1870 1871 pthread_spin_lock(&g_caches_lock); 1872 if (file->tree->present_mask == 0) { 1873 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1874 } 1875 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1876 pthread_spin_unlock(&g_caches_lock); 1877 1878 return buf; 1879 } 1880 1881 static struct cache_buffer * 1882 cache_append_buffer(struct spdk_file *file) 1883 { 1884 struct cache_buffer *last; 1885 1886 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1887 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1888 1889 last = cache_insert_buffer(file, file->append_pos); 1890 if (last == NULL) { 1891 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1892 return NULL; 1893 } 1894 1895 file->last = last; 1896 1897 return last; 1898 } 1899 1900 static void __check_sync_reqs(struct spdk_file *file); 1901 1902 static void 1903 __file_cache_finish_sync(void *ctx, int bserrno) 1904 { 1905 struct spdk_file *file = ctx; 1906 struct spdk_fs_request *sync_req; 1907 struct spdk_fs_cb_args *sync_args; 1908 1909 pthread_spin_lock(&file->lock); 1910 sync_req = TAILQ_FIRST(&file->sync_requests); 1911 sync_args = &sync_req->args; 1912 assert(sync_args->op.sync.offset <= file->length_flushed); 1913 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1914 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1915 pthread_spin_unlock(&file->lock); 1916 1917 sync_args->fn.file_op(sync_args->arg, bserrno); 1918 __check_sync_reqs(file); 1919 1920 pthread_spin_lock(&file->lock); 1921 free_fs_request(sync_req); 1922 pthread_spin_unlock(&file->lock); 1923 } 1924 1925 static void 1926 __check_sync_reqs(struct spdk_file *file) 1927 { 1928 struct spdk_fs_request *sync_req; 1929 1930 pthread_spin_lock(&file->lock); 1931 1932 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1933 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1934 break; 1935 } 1936 } 1937 1938 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1939 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1940 sync_req->args.op.sync.xattr_in_progress = true; 1941 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1942 sizeof(file->length_flushed)); 1943 1944 pthread_spin_unlock(&file->lock); 1945 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file); 1946 } else { 1947 pthread_spin_unlock(&file->lock); 1948 } 1949 } 1950 1951 static void 1952 __file_flush_done(void *ctx, int bserrno) 1953 { 1954 struct spdk_fs_request *req = ctx; 1955 struct spdk_fs_cb_args *args = &req->args; 1956 struct spdk_file *file = args->file; 1957 struct cache_buffer *next = args->op.flush.cache_buffer; 1958 1959 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1960 1961 pthread_spin_lock(&file->lock); 1962 next->in_progress = false; 1963 next->bytes_flushed += args->op.flush.length; 1964 file->length_flushed += args->op.flush.length; 1965 if (file->length_flushed > file->length) { 1966 file->length = file->length_flushed; 1967 } 1968 if (next->bytes_flushed == next->buf_size) { 1969 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1970 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1971 } 1972 1973 /* 1974 * Assert that there is no cached data that extends past the end of the underlying 1975 * blob. 1976 */ 1977 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1978 next->bytes_filled == 0); 1979 1980 pthread_spin_unlock(&file->lock); 1981 1982 __check_sync_reqs(file); 1983 1984 __file_flush(req); 1985 } 1986 1987 static void 1988 __file_flush(void *ctx) 1989 { 1990 struct spdk_fs_request *req = ctx; 1991 struct spdk_fs_cb_args *args = &req->args; 1992 struct spdk_file *file = args->file; 1993 struct cache_buffer *next; 1994 uint64_t offset, length, start_lba, num_lba; 1995 uint32_t lba_size; 1996 1997 pthread_spin_lock(&file->lock); 1998 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1999 if (next == NULL || next->in_progress) { 2000 /* 2001 * There is either no data to flush, or a flush I/O is already in 2002 * progress. So return immediately - if a flush I/O is in 2003 * progress we will flush more data after that is completed. 2004 */ 2005 free_fs_request(req); 2006 if (next == NULL) { 2007 /* 2008 * For cases where a file's cache was evicted, and then the 2009 * file was later appended, we will write the data directly 2010 * to disk and bypass cache. So just update length_flushed 2011 * here to reflect that all data was already written to disk. 2012 */ 2013 file->length_flushed = file->append_pos; 2014 } 2015 pthread_spin_unlock(&file->lock); 2016 if (next == NULL) { 2017 /* 2018 * There is no data to flush, but we still need to check for any 2019 * outstanding sync requests to make sure metadata gets updated. 2020 */ 2021 __check_sync_reqs(file); 2022 } 2023 return; 2024 } 2025 2026 offset = next->offset + next->bytes_flushed; 2027 length = next->bytes_filled - next->bytes_flushed; 2028 if (length == 0) { 2029 free_fs_request(req); 2030 pthread_spin_unlock(&file->lock); 2031 return; 2032 } 2033 args->op.flush.length = length; 2034 args->op.flush.cache_buffer = next; 2035 2036 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2037 2038 next->in_progress = true; 2039 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2040 offset, length, start_lba, num_lba); 2041 pthread_spin_unlock(&file->lock); 2042 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2043 next->buf + (start_lba * lba_size) - next->offset, 2044 start_lba, num_lba, __file_flush_done, req); 2045 } 2046 2047 static void 2048 __file_extend_done(void *arg, int bserrno) 2049 { 2050 struct spdk_fs_cb_args *args = arg; 2051 2052 __wake_caller(args, bserrno); 2053 } 2054 2055 static void 2056 __file_extend_resize_cb(void *_args, int bserrno) 2057 { 2058 struct spdk_fs_cb_args *args = _args; 2059 struct spdk_file *file = args->file; 2060 2061 if (bserrno) { 2062 __wake_caller(args, bserrno); 2063 return; 2064 } 2065 2066 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2067 } 2068 2069 static void 2070 __file_extend_blob(void *_args) 2071 { 2072 struct spdk_fs_cb_args *args = _args; 2073 struct spdk_file *file = args->file; 2074 2075 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2076 } 2077 2078 static void 2079 __rw_from_file_done(void *ctx, int bserrno) 2080 { 2081 struct spdk_fs_request *req = ctx; 2082 2083 __wake_caller(&req->args, bserrno); 2084 free_fs_request(req); 2085 } 2086 2087 static void 2088 __rw_from_file(void *ctx) 2089 { 2090 struct spdk_fs_request *req = ctx; 2091 struct spdk_fs_cb_args *args = &req->args; 2092 struct spdk_file *file = args->file; 2093 2094 if (args->op.rw.is_read) { 2095 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2096 args->op.rw.offset, args->op.rw.length, 2097 __rw_from_file_done, req); 2098 } else { 2099 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2100 args->op.rw.offset, args->op.rw.length, 2101 __rw_from_file_done, req); 2102 } 2103 } 2104 2105 static int 2106 __send_rw_from_file(struct spdk_file *file, void *payload, 2107 uint64_t offset, uint64_t length, bool is_read, 2108 struct spdk_fs_channel *channel) 2109 { 2110 struct spdk_fs_request *req; 2111 struct spdk_fs_cb_args *args; 2112 2113 req = alloc_fs_request(channel); 2114 if (req == NULL) { 2115 sem_post(&channel->sem); 2116 return -ENOMEM; 2117 } 2118 2119 args = &req->args; 2120 args->file = file; 2121 args->sem = &channel->sem; 2122 args->op.rw.user_buf = payload; 2123 args->op.rw.offset = offset; 2124 args->op.rw.length = length; 2125 args->op.rw.is_read = is_read; 2126 file->fs->send_request(__rw_from_file, req); 2127 return 0; 2128 } 2129 2130 int 2131 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2132 void *payload, uint64_t offset, uint64_t length) 2133 { 2134 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2135 struct spdk_fs_request *flush_req; 2136 uint64_t rem_length, copy, blob_size, cluster_sz; 2137 uint32_t cache_buffers_filled = 0; 2138 uint8_t *cur_payload; 2139 struct cache_buffer *last; 2140 2141 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2142 2143 if (length == 0) { 2144 return 0; 2145 } 2146 2147 if (offset != file->append_pos) { 2148 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2149 return -EINVAL; 2150 } 2151 2152 pthread_spin_lock(&file->lock); 2153 file->open_for_writing = true; 2154 2155 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2156 cache_append_buffer(file); 2157 } 2158 2159 if (file->last == NULL) { 2160 int rc; 2161 2162 file->append_pos += length; 2163 pthread_spin_unlock(&file->lock); 2164 rc = __send_rw_from_file(file, payload, offset, length, false, channel); 2165 sem_wait(&channel->sem); 2166 return rc; 2167 } 2168 2169 blob_size = __file_get_blob_size(file); 2170 2171 if ((offset + length) > blob_size) { 2172 struct spdk_fs_cb_args extend_args = {}; 2173 2174 cluster_sz = file->fs->bs_opts.cluster_sz; 2175 extend_args.sem = &channel->sem; 2176 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2177 extend_args.file = file; 2178 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2179 pthread_spin_unlock(&file->lock); 2180 file->fs->send_request(__file_extend_blob, &extend_args); 2181 sem_wait(&channel->sem); 2182 if (extend_args.rc) { 2183 return extend_args.rc; 2184 } 2185 } 2186 2187 flush_req = alloc_fs_request(channel); 2188 if (flush_req == NULL) { 2189 pthread_spin_unlock(&file->lock); 2190 return -ENOMEM; 2191 } 2192 2193 last = file->last; 2194 rem_length = length; 2195 cur_payload = payload; 2196 while (rem_length > 0) { 2197 copy = last->buf_size - last->bytes_filled; 2198 if (copy > rem_length) { 2199 copy = rem_length; 2200 } 2201 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2202 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2203 file->append_pos += copy; 2204 if (file->length < file->append_pos) { 2205 file->length = file->append_pos; 2206 } 2207 cur_payload += copy; 2208 last->bytes_filled += copy; 2209 rem_length -= copy; 2210 if (last->bytes_filled == last->buf_size) { 2211 cache_buffers_filled++; 2212 last = cache_append_buffer(file); 2213 if (last == NULL) { 2214 BLOBFS_TRACE(file, "nomem\n"); 2215 free_fs_request(flush_req); 2216 pthread_spin_unlock(&file->lock); 2217 return -ENOMEM; 2218 } 2219 } 2220 } 2221 2222 pthread_spin_unlock(&file->lock); 2223 2224 if (cache_buffers_filled == 0) { 2225 free_fs_request(flush_req); 2226 return 0; 2227 } 2228 2229 flush_req->args.file = file; 2230 file->fs->send_request(__file_flush, flush_req); 2231 return 0; 2232 } 2233 2234 static void 2235 __readahead_done(void *ctx, int bserrno) 2236 { 2237 struct spdk_fs_request *req = ctx; 2238 struct spdk_fs_cb_args *args = &req->args; 2239 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2240 struct spdk_file *file = args->file; 2241 2242 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2243 2244 pthread_spin_lock(&file->lock); 2245 cache_buffer->bytes_filled = args->op.readahead.length; 2246 cache_buffer->bytes_flushed = args->op.readahead.length; 2247 cache_buffer->in_progress = false; 2248 pthread_spin_unlock(&file->lock); 2249 2250 free_fs_request(req); 2251 } 2252 2253 static void 2254 __readahead(void *ctx) 2255 { 2256 struct spdk_fs_request *req = ctx; 2257 struct spdk_fs_cb_args *args = &req->args; 2258 struct spdk_file *file = args->file; 2259 uint64_t offset, length, start_lba, num_lba; 2260 uint32_t lba_size; 2261 2262 offset = args->op.readahead.offset; 2263 length = args->op.readahead.length; 2264 assert(length > 0); 2265 2266 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2267 2268 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2269 offset, length, start_lba, num_lba); 2270 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2271 args->op.readahead.cache_buffer->buf, 2272 start_lba, num_lba, __readahead_done, req); 2273 } 2274 2275 static uint64_t 2276 __next_cache_buffer_offset(uint64_t offset) 2277 { 2278 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2279 } 2280 2281 static void 2282 check_readahead(struct spdk_file *file, uint64_t offset, 2283 struct spdk_fs_channel *channel) 2284 { 2285 struct spdk_fs_request *req; 2286 struct spdk_fs_cb_args *args; 2287 2288 offset = __next_cache_buffer_offset(offset); 2289 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2290 return; 2291 } 2292 2293 req = alloc_fs_request(channel); 2294 if (req == NULL) { 2295 return; 2296 } 2297 args = &req->args; 2298 2299 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2300 2301 args->file = file; 2302 args->op.readahead.offset = offset; 2303 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2304 if (!args->op.readahead.cache_buffer) { 2305 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2306 free_fs_request(req); 2307 return; 2308 } 2309 2310 args->op.readahead.cache_buffer->in_progress = true; 2311 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2312 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2313 } else { 2314 args->op.readahead.length = CACHE_BUFFER_SIZE; 2315 } 2316 file->fs->send_request(__readahead, req); 2317 } 2318 2319 static int 2320 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, 2321 struct spdk_fs_channel *channel) 2322 { 2323 struct cache_buffer *buf; 2324 int rc; 2325 2326 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2327 if (buf == NULL) { 2328 pthread_spin_unlock(&file->lock); 2329 rc = __send_rw_from_file(file, payload, offset, length, true, channel); 2330 pthread_spin_lock(&file->lock); 2331 return rc; 2332 } 2333 2334 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2335 length = buf->offset + buf->bytes_filled - offset; 2336 } 2337 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2338 memcpy(payload, &buf->buf[offset - buf->offset], length); 2339 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2340 pthread_spin_lock(&g_caches_lock); 2341 spdk_tree_remove_buffer(file->tree, buf); 2342 if (file->tree->present_mask == 0) { 2343 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2344 } 2345 pthread_spin_unlock(&g_caches_lock); 2346 } 2347 2348 sem_post(&channel->sem); 2349 return 0; 2350 } 2351 2352 int64_t 2353 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2354 void *payload, uint64_t offset, uint64_t length) 2355 { 2356 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2357 uint64_t final_offset, final_length; 2358 uint32_t sub_reads = 0; 2359 int rc = 0; 2360 2361 pthread_spin_lock(&file->lock); 2362 2363 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2364 2365 file->open_for_writing = false; 2366 2367 if (length == 0 || offset >= file->append_pos) { 2368 pthread_spin_unlock(&file->lock); 2369 return 0; 2370 } 2371 2372 if (offset + length > file->append_pos) { 2373 length = file->append_pos - offset; 2374 } 2375 2376 if (offset != file->next_seq_offset) { 2377 file->seq_byte_count = 0; 2378 } 2379 file->seq_byte_count += length; 2380 file->next_seq_offset = offset + length; 2381 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2382 check_readahead(file, offset, channel); 2383 check_readahead(file, offset + CACHE_BUFFER_SIZE, channel); 2384 } 2385 2386 final_length = 0; 2387 final_offset = offset + length; 2388 while (offset < final_offset) { 2389 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2390 if (length > (final_offset - offset)) { 2391 length = final_offset - offset; 2392 } 2393 rc = __file_read(file, payload, offset, length, channel); 2394 if (rc == 0) { 2395 final_length += length; 2396 } else { 2397 break; 2398 } 2399 payload += length; 2400 offset += length; 2401 sub_reads++; 2402 } 2403 pthread_spin_unlock(&file->lock); 2404 while (sub_reads-- > 0) { 2405 sem_wait(&channel->sem); 2406 } 2407 if (rc == 0) { 2408 return final_length; 2409 } else { 2410 return rc; 2411 } 2412 } 2413 2414 static void 2415 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2416 spdk_file_op_complete cb_fn, void *cb_arg) 2417 { 2418 struct spdk_fs_request *sync_req; 2419 struct spdk_fs_request *flush_req; 2420 struct spdk_fs_cb_args *sync_args; 2421 struct spdk_fs_cb_args *flush_args; 2422 2423 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2424 2425 pthread_spin_lock(&file->lock); 2426 if (file->append_pos <= file->length_flushed) { 2427 BLOBFS_TRACE(file, "done - no data to flush\n"); 2428 pthread_spin_unlock(&file->lock); 2429 cb_fn(cb_arg, 0); 2430 return; 2431 } 2432 2433 sync_req = alloc_fs_request(channel); 2434 if (!sync_req) { 2435 pthread_spin_unlock(&file->lock); 2436 cb_fn(cb_arg, -ENOMEM); 2437 return; 2438 } 2439 sync_args = &sync_req->args; 2440 2441 flush_req = alloc_fs_request(channel); 2442 if (!flush_req) { 2443 pthread_spin_unlock(&file->lock); 2444 cb_fn(cb_arg, -ENOMEM); 2445 return; 2446 } 2447 flush_args = &flush_req->args; 2448 2449 sync_args->file = file; 2450 sync_args->fn.file_op = cb_fn; 2451 sync_args->arg = cb_arg; 2452 sync_args->op.sync.offset = file->append_pos; 2453 sync_args->op.sync.xattr_in_progress = false; 2454 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2455 pthread_spin_unlock(&file->lock); 2456 2457 flush_args->file = file; 2458 channel->send_request(__file_flush, flush_req); 2459 } 2460 2461 int 2462 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2463 { 2464 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2465 struct spdk_fs_cb_args args = {}; 2466 2467 args.sem = &channel->sem; 2468 _file_sync(file, channel, __wake_caller, &args); 2469 sem_wait(&channel->sem); 2470 2471 return args.rc; 2472 } 2473 2474 void 2475 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2476 spdk_file_op_complete cb_fn, void *cb_arg) 2477 { 2478 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2479 2480 _file_sync(file, channel, cb_fn, cb_arg); 2481 } 2482 2483 void 2484 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2485 { 2486 BLOBFS_TRACE(file, "priority=%u\n", priority); 2487 file->priority = priority; 2488 2489 } 2490 2491 /* 2492 * Close routines 2493 */ 2494 2495 static void 2496 __file_close_async_done(void *ctx, int bserrno) 2497 { 2498 struct spdk_fs_request *req = ctx; 2499 struct spdk_fs_cb_args *args = &req->args; 2500 struct spdk_file *file = args->file; 2501 2502 if (file->is_deleted) { 2503 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2504 return; 2505 } 2506 2507 args->fn.file_op(args->arg, bserrno); 2508 free_fs_request(req); 2509 } 2510 2511 static void 2512 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2513 { 2514 struct spdk_blob *blob; 2515 2516 pthread_spin_lock(&file->lock); 2517 if (file->ref_count == 0) { 2518 pthread_spin_unlock(&file->lock); 2519 __file_close_async_done(req, -EBADF); 2520 return; 2521 } 2522 2523 file->ref_count--; 2524 if (file->ref_count > 0) { 2525 pthread_spin_unlock(&file->lock); 2526 req->args.fn.file_op(req->args.arg, 0); 2527 free_fs_request(req); 2528 return; 2529 } 2530 2531 pthread_spin_unlock(&file->lock); 2532 2533 blob = file->blob; 2534 file->blob = NULL; 2535 spdk_blob_close(blob, __file_close_async_done, req); 2536 } 2537 2538 static void 2539 __file_close_async__sync_done(void *arg, int fserrno) 2540 { 2541 struct spdk_fs_request *req = arg; 2542 struct spdk_fs_cb_args *args = &req->args; 2543 2544 __file_close_async(args->file, req); 2545 } 2546 2547 void 2548 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2549 { 2550 struct spdk_fs_request *req; 2551 struct spdk_fs_cb_args *args; 2552 2553 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2554 if (req == NULL) { 2555 cb_fn(cb_arg, -ENOMEM); 2556 return; 2557 } 2558 2559 args = &req->args; 2560 args->file = file; 2561 args->fn.file_op = cb_fn; 2562 args->arg = cb_arg; 2563 2564 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2565 } 2566 2567 static void 2568 __file_close(void *arg) 2569 { 2570 struct spdk_fs_request *req = arg; 2571 struct spdk_fs_cb_args *args = &req->args; 2572 struct spdk_file *file = args->file; 2573 2574 __file_close_async(file, req); 2575 } 2576 2577 int 2578 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2579 { 2580 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2581 struct spdk_fs_request *req; 2582 struct spdk_fs_cb_args *args; 2583 2584 req = alloc_fs_request(channel); 2585 if (req == NULL) { 2586 return -ENOMEM; 2587 } 2588 2589 args = &req->args; 2590 2591 spdk_file_sync(file, ctx); 2592 BLOBFS_TRACE(file, "name=%s\n", file->name); 2593 args->file = file; 2594 args->sem = &channel->sem; 2595 args->fn.file_op = __wake_caller; 2596 args->arg = req; 2597 channel->send_request(__file_close, req); 2598 sem_wait(&channel->sem); 2599 2600 return args->rc; 2601 } 2602 2603 int 2604 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2605 { 2606 if (size < sizeof(spdk_blob_id)) { 2607 return -EINVAL; 2608 } 2609 2610 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2611 2612 return sizeof(spdk_blob_id); 2613 } 2614 2615 static void 2616 cache_free_buffers(struct spdk_file *file) 2617 { 2618 BLOBFS_TRACE(file, "free=%s\n", file->name); 2619 pthread_spin_lock(&file->lock); 2620 pthread_spin_lock(&g_caches_lock); 2621 if (file->tree->present_mask == 0) { 2622 pthread_spin_unlock(&g_caches_lock); 2623 pthread_spin_unlock(&file->lock); 2624 return; 2625 } 2626 spdk_tree_free_buffers(file->tree); 2627 2628 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2629 /* If not freed, put it in the end of the queue */ 2630 if (file->tree->present_mask != 0) { 2631 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2632 } 2633 file->last = NULL; 2634 pthread_spin_unlock(&g_caches_lock); 2635 pthread_spin_unlock(&file->lock); 2636 } 2637 2638 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2639 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2640