1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 #include "spdk/trace.h" 47 48 #define BLOBFS_TRACE(file, str, args...) \ 49 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 50 51 #define BLOBFS_TRACE_RW(file, str, args...) \ 52 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 53 54 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 55 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 56 57 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 58 static struct spdk_mempool *g_cache_pool; 59 static TAILQ_HEAD(, spdk_file) g_caches; 60 static int g_fs_count = 0; 61 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 62 static pthread_spinlock_t g_caches_lock; 63 64 #define TRACE_GROUP_BLOBFS 0x7 65 #define TRACE_BLOBFS_XATTR_START SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x0) 66 #define TRACE_BLOBFS_XATTR_END SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x1) 67 #define TRACE_BLOBFS_OPEN SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x2) 68 #define TRACE_BLOBFS_CLOSE SPDK_TPOINT_ID(TRACE_GROUP_BLOBFS, 0x3) 69 70 SPDK_TRACE_REGISTER_FN(blobfs_trace, "blobfs", TRACE_GROUP_BLOBFS) 71 { 72 spdk_trace_register_description("BLOBFS_XATTR_START", "", 73 TRACE_BLOBFS_XATTR_START, 74 OWNER_NONE, OBJECT_NONE, 0, 75 SPDK_TRACE_ARG_TYPE_STR, 76 "file: "); 77 spdk_trace_register_description("BLOBFS_XATTR_END", "", 78 TRACE_BLOBFS_XATTR_END, 79 OWNER_NONE, OBJECT_NONE, 0, 80 SPDK_TRACE_ARG_TYPE_STR, 81 "file: "); 82 spdk_trace_register_description("BLOBFS_OPEN", "", 83 TRACE_BLOBFS_OPEN, 84 OWNER_NONE, OBJECT_NONE, 0, 85 SPDK_TRACE_ARG_TYPE_STR, 86 "file: "); 87 spdk_trace_register_description("BLOBFS_CLOSE", "", 88 TRACE_BLOBFS_CLOSE, 89 OWNER_NONE, OBJECT_NONE, 0, 90 
SPDK_TRACE_ARG_TYPE_STR, 91 "file: "); 92 } 93 94 void 95 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 96 { 97 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 98 free(cache_buffer); 99 } 100 101 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 102 103 struct spdk_file { 104 struct spdk_filesystem *fs; 105 struct spdk_blob *blob; 106 char *name; 107 uint64_t trace_arg_name; 108 uint64_t length; 109 bool is_deleted; 110 bool open_for_writing; 111 uint64_t length_flushed; 112 uint64_t append_pos; 113 uint64_t seq_byte_count; 114 uint64_t next_seq_offset; 115 uint32_t priority; 116 TAILQ_ENTRY(spdk_file) tailq; 117 spdk_blob_id blobid; 118 uint32_t ref_count; 119 pthread_spinlock_t lock; 120 struct cache_buffer *last; 121 struct cache_tree *tree; 122 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 123 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 124 TAILQ_ENTRY(spdk_file) cache_tailq; 125 }; 126 127 struct spdk_deleted_file { 128 spdk_blob_id id; 129 TAILQ_ENTRY(spdk_deleted_file) tailq; 130 }; 131 132 struct spdk_filesystem { 133 struct spdk_blob_store *bs; 134 TAILQ_HEAD(, spdk_file) files; 135 struct spdk_bs_opts bs_opts; 136 struct spdk_bs_dev *bdev; 137 fs_send_request_fn send_request; 138 139 struct { 140 uint32_t max_ops; 141 struct spdk_io_channel *sync_io_channel; 142 struct spdk_fs_channel *sync_fs_channel; 143 } sync_target; 144 145 struct { 146 uint32_t max_ops; 147 struct spdk_io_channel *md_io_channel; 148 struct spdk_fs_channel *md_fs_channel; 149 } md_target; 150 151 struct { 152 uint32_t max_ops; 153 } io_target; 154 }; 155 156 struct spdk_fs_cb_args { 157 union { 158 spdk_fs_op_with_handle_complete fs_op_with_handle; 159 spdk_fs_op_complete fs_op; 160 spdk_file_op_with_handle_complete file_op_with_handle; 161 spdk_file_op_complete file_op; 162 spdk_file_stat_op_complete stat_op; 163 } fn; 164 void *arg; 165 sem_t *sem; 166 struct spdk_filesystem *fs; 167 struct spdk_file *file; 168 int rc; 169 struct 
iovec *iovs; 170 uint32_t iovcnt; 171 struct iovec iov; 172 union { 173 struct { 174 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 175 } fs_load; 176 struct { 177 uint64_t length; 178 } truncate; 179 struct { 180 struct spdk_io_channel *channel; 181 void *pin_buf; 182 int is_read; 183 off_t offset; 184 size_t length; 185 uint64_t start_lba; 186 uint64_t num_lba; 187 uint32_t blocklen; 188 } rw; 189 struct { 190 const char *old_name; 191 const char *new_name; 192 } rename; 193 struct { 194 struct cache_buffer *cache_buffer; 195 uint64_t length; 196 } flush; 197 struct { 198 struct cache_buffer *cache_buffer; 199 uint64_t length; 200 uint64_t offset; 201 } readahead; 202 struct { 203 uint64_t offset; 204 TAILQ_ENTRY(spdk_fs_request) tailq; 205 bool xattr_in_progress; 206 } sync; 207 struct { 208 uint32_t num_clusters; 209 } resize; 210 struct { 211 const char *name; 212 uint32_t flags; 213 TAILQ_ENTRY(spdk_fs_request) tailq; 214 } open; 215 struct { 216 const char *name; 217 struct spdk_blob *blob; 218 } create; 219 struct { 220 const char *name; 221 } delete; 222 struct { 223 const char *name; 224 } stat; 225 } op; 226 }; 227 228 static void cache_free_buffers(struct spdk_file *file); 229 static void spdk_fs_io_device_unregister(struct spdk_filesystem *fs); 230 static void spdk_fs_free_io_channels(struct spdk_filesystem *fs); 231 232 void 233 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 234 { 235 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 236 } 237 238 static void 239 __initialize_cache(void) 240 { 241 assert(g_cache_pool == NULL); 242 243 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 244 g_fs_cache_size / CACHE_BUFFER_SIZE, 245 CACHE_BUFFER_SIZE, 246 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 247 SPDK_ENV_SOCKET_ID_ANY); 248 if (!g_cache_pool) { 249 SPDK_ERRLOG("Create mempool failed, you may " 250 "increase the memory and try again\n"); 251 assert(false); 252 } 253 TAILQ_INIT(&g_caches); 254 pthread_spin_init(&g_caches_lock, 0); 255 } 256 257 static 
void 258 __free_cache(void) 259 { 260 assert(g_cache_pool != NULL); 261 262 spdk_mempool_free(g_cache_pool); 263 g_cache_pool = NULL; 264 } 265 266 static uint64_t 267 __file_get_blob_size(struct spdk_file *file) 268 { 269 uint64_t cluster_sz; 270 271 cluster_sz = file->fs->bs_opts.cluster_sz; 272 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 273 } 274 275 struct spdk_fs_request { 276 struct spdk_fs_cb_args args; 277 TAILQ_ENTRY(spdk_fs_request) link; 278 struct spdk_fs_channel *channel; 279 }; 280 281 struct spdk_fs_channel { 282 struct spdk_fs_request *req_mem; 283 TAILQ_HEAD(, spdk_fs_request) reqs; 284 sem_t sem; 285 struct spdk_filesystem *fs; 286 struct spdk_io_channel *bs_channel; 287 fs_send_request_fn send_request; 288 bool sync; 289 uint32_t outstanding_reqs; 290 pthread_spinlock_t lock; 291 }; 292 293 /* For now, this is effectively an alias. But eventually we'll shift 294 * some data members over. */ 295 struct spdk_fs_thread_ctx { 296 struct spdk_fs_channel ch; 297 }; 298 299 static struct spdk_fs_request * 300 alloc_fs_request_with_iov(struct spdk_fs_channel *channel, uint32_t iovcnt) 301 { 302 struct spdk_fs_request *req; 303 struct iovec *iovs = NULL; 304 305 if (iovcnt > 1) { 306 iovs = calloc(iovcnt, sizeof(struct iovec)); 307 if (!iovs) { 308 return NULL; 309 } 310 } 311 312 if (channel->sync) { 313 pthread_spin_lock(&channel->lock); 314 } 315 316 req = TAILQ_FIRST(&channel->reqs); 317 if (req) { 318 channel->outstanding_reqs++; 319 TAILQ_REMOVE(&channel->reqs, req, link); 320 } 321 322 if (channel->sync) { 323 pthread_spin_unlock(&channel->lock); 324 } 325 326 if (req == NULL) { 327 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 328 free(iovs); 329 return NULL; 330 } 331 memset(req, 0, sizeof(*req)); 332 req->channel = channel; 333 if (iovcnt > 1) { 334 req->args.iovs = iovs; 335 } else { 336 req->args.iovs = &req->args.iov; 337 } 338 req->args.iovcnt = iovcnt; 339 340 return req; 341 } 342 343 static struct 
spdk_fs_request * 344 alloc_fs_request(struct spdk_fs_channel *channel) 345 { 346 return alloc_fs_request_with_iov(channel, 0); 347 } 348 349 static void 350 free_fs_request(struct spdk_fs_request *req) 351 { 352 struct spdk_fs_channel *channel = req->channel; 353 354 if (req->args.iovcnt > 1) { 355 free(req->args.iovs); 356 } 357 358 if (channel->sync) { 359 pthread_spin_lock(&channel->lock); 360 } 361 362 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 363 channel->outstanding_reqs--; 364 365 if (channel->sync) { 366 pthread_spin_unlock(&channel->lock); 367 } 368 } 369 370 static int 371 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 372 uint32_t max_ops) 373 { 374 uint32_t i; 375 376 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 377 if (!channel->req_mem) { 378 return -1; 379 } 380 381 channel->outstanding_reqs = 0; 382 TAILQ_INIT(&channel->reqs); 383 sem_init(&channel->sem, 0, 0); 384 385 for (i = 0; i < max_ops; i++) { 386 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 387 } 388 389 channel->fs = fs; 390 391 return 0; 392 } 393 394 static int 395 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 396 { 397 struct spdk_filesystem *fs; 398 struct spdk_fs_channel *channel = ctx_buf; 399 400 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 401 402 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 403 } 404 405 static int 406 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 407 { 408 struct spdk_filesystem *fs; 409 struct spdk_fs_channel *channel = ctx_buf; 410 411 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 412 413 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 414 } 415 416 static int 417 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 418 { 419 struct spdk_filesystem *fs; 420 struct spdk_fs_channel *channel = ctx_buf; 421 422 fs = SPDK_CONTAINEROF(io_device, struct 
spdk_filesystem, io_target); 423 424 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 425 } 426 427 static void 428 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 429 { 430 struct spdk_fs_channel *channel = ctx_buf; 431 432 if (channel->outstanding_reqs > 0) { 433 SPDK_ERRLOG("channel freed with %" PRIu32 " outstanding requests!\n", 434 channel->outstanding_reqs); 435 } 436 437 free(channel->req_mem); 438 if (channel->bs_channel != NULL) { 439 spdk_bs_free_io_channel(channel->bs_channel); 440 } 441 } 442 443 static void 444 __send_request_direct(fs_request_fn fn, void *arg) 445 { 446 fn(arg); 447 } 448 449 static void 450 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 451 { 452 fs->bs = bs; 453 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 454 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 455 fs->md_target.md_fs_channel->send_request = __send_request_direct; 456 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 457 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 458 459 pthread_mutex_lock(&g_cache_init_lock); 460 if (g_fs_count == 0) { 461 __initialize_cache(); 462 } 463 g_fs_count++; 464 pthread_mutex_unlock(&g_cache_init_lock); 465 } 466 467 static void 468 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 469 { 470 struct spdk_fs_request *req = ctx; 471 struct spdk_fs_cb_args *args = &req->args; 472 struct spdk_filesystem *fs = args->fs; 473 474 if (bserrno == 0) { 475 common_fs_bs_init(fs, bs); 476 } else { 477 free(fs); 478 fs = NULL; 479 } 480 481 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 482 free_fs_request(req); 483 } 484 485 static void 486 fs_conf_parse(void) 487 { 488 struct spdk_conf_section *sp; 489 490 sp = spdk_conf_find_section(NULL, "Blobfs"); 491 if (sp == NULL) { 492 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 493 return; 494 } 495 496 g_fs_cache_buffer_shift = 
spdk_conf_section_get_intval(sp, "CacheBufferShift"); 497 if (g_fs_cache_buffer_shift <= 0) { 498 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 499 } 500 } 501 502 static struct spdk_filesystem * 503 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 504 { 505 struct spdk_filesystem *fs; 506 507 fs = calloc(1, sizeof(*fs)); 508 if (fs == NULL) { 509 return NULL; 510 } 511 512 fs->bdev = dev; 513 fs->send_request = send_request_fn; 514 TAILQ_INIT(&fs->files); 515 516 fs->md_target.max_ops = 512; 517 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 518 sizeof(struct spdk_fs_channel), "blobfs_md"); 519 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 520 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 521 522 fs->sync_target.max_ops = 512; 523 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 524 sizeof(struct spdk_fs_channel), "blobfs_sync"); 525 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 526 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 527 528 fs->io_target.max_ops = 512; 529 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 530 sizeof(struct spdk_fs_channel), "blobfs_io"); 531 532 return fs; 533 } 534 535 static void 536 __wake_caller(void *arg, int fserrno) 537 { 538 struct spdk_fs_cb_args *args = arg; 539 540 args->rc = fserrno; 541 sem_post(args->sem); 542 } 543 544 void 545 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 546 fs_send_request_fn send_request_fn, 547 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 548 { 549 struct spdk_filesystem *fs; 550 struct spdk_fs_request *req; 551 struct spdk_fs_cb_args *args; 552 struct spdk_bs_opts opts = {}; 553 554 fs = fs_alloc(dev, send_request_fn); 555 if (fs == NULL) { 556 cb_fn(cb_arg, NULL, -ENOMEM); 
557 return; 558 } 559 560 fs_conf_parse(); 561 562 req = alloc_fs_request(fs->md_target.md_fs_channel); 563 if (req == NULL) { 564 spdk_fs_free_io_channels(fs); 565 spdk_fs_io_device_unregister(fs); 566 cb_fn(cb_arg, NULL, -ENOMEM); 567 return; 568 } 569 570 args = &req->args; 571 args->fn.fs_op_with_handle = cb_fn; 572 args->arg = cb_arg; 573 args->fs = fs; 574 575 spdk_bs_opts_init(&opts); 576 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 577 if (opt) { 578 opts.cluster_sz = opt->cluster_sz; 579 } 580 spdk_bs_init(dev, &opts, init_cb, req); 581 } 582 583 static struct spdk_file * 584 file_alloc(struct spdk_filesystem *fs) 585 { 586 struct spdk_file *file; 587 588 file = calloc(1, sizeof(*file)); 589 if (file == NULL) { 590 return NULL; 591 } 592 593 file->tree = calloc(1, sizeof(*file->tree)); 594 if (file->tree == NULL) { 595 free(file); 596 return NULL; 597 } 598 599 file->fs = fs; 600 TAILQ_INIT(&file->open_requests); 601 TAILQ_INIT(&file->sync_requests); 602 pthread_spin_init(&file->lock, 0); 603 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 604 file->priority = SPDK_FILE_PRIORITY_LOW; 605 return file; 606 } 607 608 static void fs_load_done(void *ctx, int bserrno); 609 610 static int 611 _handle_deleted_files(struct spdk_fs_request *req) 612 { 613 struct spdk_fs_cb_args *args = &req->args; 614 struct spdk_filesystem *fs = args->fs; 615 616 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 617 struct spdk_deleted_file *deleted_file; 618 619 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 620 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 621 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 622 free(deleted_file); 623 return 0; 624 } 625 626 return 1; 627 } 628 629 static void 630 fs_load_done(void *ctx, int bserrno) 631 { 632 struct spdk_fs_request *req = ctx; 633 struct spdk_fs_cb_args *args = &req->args; 634 struct spdk_filesystem *fs = args->fs; 635 636 /* The filesystem has been 
loaded. Now check if there are any files that 637 * were marked for deletion before last unload. Do not complete the 638 * fs_load callback until all of them have been deleted on disk. 639 */ 640 if (_handle_deleted_files(req) == 0) { 641 /* We found a file that's been marked for deleting but not actually 642 * deleted yet. This function will get called again once the delete 643 * operation is completed. 644 */ 645 return; 646 } 647 648 args->fn.fs_op_with_handle(args->arg, fs, 0); 649 free_fs_request(req); 650 651 } 652 653 static void 654 _file_build_trace_arg_name(struct spdk_file *f) 655 { 656 f->trace_arg_name = 0; 657 memcpy(&f->trace_arg_name, f->name, 658 spdk_min(sizeof(f->trace_arg_name), strlen(f->name))); 659 } 660 661 static void 662 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 663 { 664 struct spdk_fs_request *req = ctx; 665 struct spdk_fs_cb_args *args = &req->args; 666 struct spdk_filesystem *fs = args->fs; 667 uint64_t *length; 668 const char *name; 669 uint32_t *is_deleted; 670 size_t value_len; 671 672 if (rc < 0) { 673 args->fn.fs_op_with_handle(args->arg, fs, rc); 674 free_fs_request(req); 675 return; 676 } 677 678 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 679 if (rc < 0) { 680 args->fn.fs_op_with_handle(args->arg, fs, rc); 681 free_fs_request(req); 682 return; 683 } 684 685 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 686 if (rc < 0) { 687 args->fn.fs_op_with_handle(args->arg, fs, rc); 688 free_fs_request(req); 689 return; 690 } 691 692 assert(value_len == 8); 693 694 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 695 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 696 if (rc < 0) { 697 struct spdk_file *f; 698 699 f = file_alloc(fs); 700 if (f == NULL) { 701 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 702 free_fs_request(req); 703 return; 704 } 705 706 
f->name = strdup(name); 707 _file_build_trace_arg_name(f); 708 f->blobid = spdk_blob_get_id(blob); 709 f->length = *length; 710 f->length_flushed = *length; 711 f->append_pos = *length; 712 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 713 } else { 714 struct spdk_deleted_file *deleted_file; 715 716 deleted_file = calloc(1, sizeof(*deleted_file)); 717 if (deleted_file == NULL) { 718 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 719 free_fs_request(req); 720 return; 721 } 722 deleted_file->id = spdk_blob_get_id(blob); 723 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 724 } 725 } 726 727 static void 728 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 729 { 730 struct spdk_fs_request *req = ctx; 731 struct spdk_fs_cb_args *args = &req->args; 732 struct spdk_filesystem *fs = args->fs; 733 struct spdk_bs_type bstype; 734 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 735 static const struct spdk_bs_type zeros; 736 737 if (bserrno != 0) { 738 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 739 free_fs_request(req); 740 free(fs); 741 return; 742 } 743 744 bstype = spdk_bs_get_bstype(bs); 745 746 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 747 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 748 spdk_bs_set_bstype(bs, blobfs_type); 749 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 750 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 751 SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 752 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 753 free_fs_request(req); 754 free(fs); 755 return; 756 } 757 758 common_fs_bs_init(fs, bs); 759 fs_load_done(req, 0); 760 } 761 762 static void 763 spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 764 { 765 assert(fs != NULL); 766 spdk_io_device_unregister(&fs->md_target, NULL); 767 spdk_io_device_unregister(&fs->sync_target, NULL); 768 spdk_io_device_unregister(&fs->io_target, NULL); 769 
free(fs); 770 } 771 772 static void 773 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 774 { 775 assert(fs != NULL); 776 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 777 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 778 } 779 780 void 781 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 782 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 783 { 784 struct spdk_filesystem *fs; 785 struct spdk_fs_cb_args *args; 786 struct spdk_fs_request *req; 787 struct spdk_bs_opts bs_opts; 788 789 fs = fs_alloc(dev, send_request_fn); 790 if (fs == NULL) { 791 cb_fn(cb_arg, NULL, -ENOMEM); 792 return; 793 } 794 795 fs_conf_parse(); 796 797 req = alloc_fs_request(fs->md_target.md_fs_channel); 798 if (req == NULL) { 799 spdk_fs_free_io_channels(fs); 800 spdk_fs_io_device_unregister(fs); 801 cb_fn(cb_arg, NULL, -ENOMEM); 802 return; 803 } 804 805 args = &req->args; 806 args->fn.fs_op_with_handle = cb_fn; 807 args->arg = cb_arg; 808 args->fs = fs; 809 TAILQ_INIT(&args->op.fs_load.deleted_files); 810 spdk_bs_opts_init(&bs_opts); 811 bs_opts.iter_cb_fn = iter_cb; 812 bs_opts.iter_cb_arg = req; 813 spdk_bs_load(dev, &bs_opts, load_cb, req); 814 } 815 816 static void 817 unload_cb(void *ctx, int bserrno) 818 { 819 struct spdk_fs_request *req = ctx; 820 struct spdk_fs_cb_args *args = &req->args; 821 struct spdk_filesystem *fs = args->fs; 822 struct spdk_file *file, *tmp; 823 824 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 825 TAILQ_REMOVE(&fs->files, file, tailq); 826 cache_free_buffers(file); 827 free(file->name); 828 free(file->tree); 829 free(file); 830 } 831 832 pthread_mutex_lock(&g_cache_init_lock); 833 g_fs_count--; 834 if (g_fs_count == 0) { 835 __free_cache(); 836 } 837 pthread_mutex_unlock(&g_cache_init_lock); 838 839 args->fn.fs_op(args->arg, bserrno); 840 free(req); 841 842 spdk_fs_io_device_unregister(fs); 843 } 844 845 void 846 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 847 
{ 848 struct spdk_fs_request *req; 849 struct spdk_fs_cb_args *args; 850 851 /* 852 * We must free the md_channel before unloading the blobstore, so just 853 * allocate this request from the general heap. 854 */ 855 req = calloc(1, sizeof(*req)); 856 if (req == NULL) { 857 cb_fn(cb_arg, -ENOMEM); 858 return; 859 } 860 861 args = &req->args; 862 args->fn.fs_op = cb_fn; 863 args->arg = cb_arg; 864 args->fs = fs; 865 866 spdk_fs_free_io_channels(fs); 867 spdk_bs_unload(fs->bs, unload_cb, req); 868 } 869 870 static struct spdk_file * 871 fs_find_file(struct spdk_filesystem *fs, const char *name) 872 { 873 struct spdk_file *file; 874 875 TAILQ_FOREACH(file, &fs->files, tailq) { 876 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 877 return file; 878 } 879 } 880 881 return NULL; 882 } 883 884 void 885 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 886 spdk_file_stat_op_complete cb_fn, void *cb_arg) 887 { 888 struct spdk_file_stat stat; 889 struct spdk_file *f = NULL; 890 891 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 892 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 893 return; 894 } 895 896 f = fs_find_file(fs, name); 897 if (f != NULL) { 898 stat.blobid = f->blobid; 899 stat.size = f->append_pos >= f->length ? 
f->append_pos : f->length; 900 cb_fn(cb_arg, &stat, 0); 901 return; 902 } 903 904 cb_fn(cb_arg, NULL, -ENOENT); 905 } 906 907 static void 908 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 909 { 910 struct spdk_fs_request *req = arg; 911 struct spdk_fs_cb_args *args = &req->args; 912 913 args->rc = fserrno; 914 if (fserrno == 0) { 915 memcpy(args->arg, stat, sizeof(*stat)); 916 } 917 sem_post(args->sem); 918 } 919 920 static void 921 __file_stat(void *arg) 922 { 923 struct spdk_fs_request *req = arg; 924 struct spdk_fs_cb_args *args = &req->args; 925 926 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 927 args->fn.stat_op, req); 928 } 929 930 int 931 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 932 const char *name, struct spdk_file_stat *stat) 933 { 934 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 935 struct spdk_fs_request *req; 936 int rc; 937 938 req = alloc_fs_request(channel); 939 if (req == NULL) { 940 return -ENOMEM; 941 } 942 943 req->args.fs = fs; 944 req->args.op.stat.name = name; 945 req->args.fn.stat_op = __copy_stat; 946 req->args.arg = stat; 947 req->args.sem = &channel->sem; 948 channel->send_request(__file_stat, req); 949 sem_wait(&channel->sem); 950 951 rc = req->args.rc; 952 free_fs_request(req); 953 954 return rc; 955 } 956 957 static void 958 fs_create_blob_close_cb(void *ctx, int bserrno) 959 { 960 int rc; 961 struct spdk_fs_request *req = ctx; 962 struct spdk_fs_cb_args *args = &req->args; 963 964 rc = args->rc ? 
args->rc : bserrno; 965 args->fn.file_op(args->arg, rc); 966 free_fs_request(req); 967 } 968 969 static void 970 fs_create_blob_resize_cb(void *ctx, int bserrno) 971 { 972 struct spdk_fs_request *req = ctx; 973 struct spdk_fs_cb_args *args = &req->args; 974 struct spdk_file *f = args->file; 975 struct spdk_blob *blob = args->op.create.blob; 976 uint64_t length = 0; 977 978 args->rc = bserrno; 979 if (bserrno) { 980 spdk_blob_close(blob, fs_create_blob_close_cb, args); 981 return; 982 } 983 984 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 985 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 986 987 spdk_blob_close(blob, fs_create_blob_close_cb, args); 988 } 989 990 static void 991 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 992 { 993 struct spdk_fs_request *req = ctx; 994 struct spdk_fs_cb_args *args = &req->args; 995 996 if (bserrno) { 997 args->fn.file_op(args->arg, bserrno); 998 free_fs_request(req); 999 return; 1000 } 1001 1002 args->op.create.blob = blob; 1003 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 1004 } 1005 1006 static void 1007 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 1008 { 1009 struct spdk_fs_request *req = ctx; 1010 struct spdk_fs_cb_args *args = &req->args; 1011 struct spdk_file *f = args->file; 1012 1013 if (bserrno) { 1014 args->fn.file_op(args->arg, bserrno); 1015 free_fs_request(req); 1016 return; 1017 } 1018 1019 f->blobid = blobid; 1020 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 1021 } 1022 1023 void 1024 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 1025 spdk_file_op_complete cb_fn, void *cb_arg) 1026 { 1027 struct spdk_file *file; 1028 struct spdk_fs_request *req; 1029 struct spdk_fs_cb_args *args; 1030 1031 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1032 cb_fn(cb_arg, -ENAMETOOLONG); 1033 return; 1034 } 1035 1036 file = fs_find_file(fs, name); 1037 if (file != 
NULL) { 1038 cb_fn(cb_arg, -EEXIST); 1039 return; 1040 } 1041 1042 file = file_alloc(fs); 1043 if (file == NULL) { 1044 cb_fn(cb_arg, -ENOMEM); 1045 return; 1046 } 1047 1048 req = alloc_fs_request(fs->md_target.md_fs_channel); 1049 if (req == NULL) { 1050 cb_fn(cb_arg, -ENOMEM); 1051 return; 1052 } 1053 1054 args = &req->args; 1055 args->file = file; 1056 args->fn.file_op = cb_fn; 1057 args->arg = cb_arg; 1058 1059 file->name = strdup(name); 1060 _file_build_trace_arg_name(file); 1061 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 1062 } 1063 1064 static void 1065 __fs_create_file_done(void *arg, int fserrno) 1066 { 1067 struct spdk_fs_request *req = arg; 1068 struct spdk_fs_cb_args *args = &req->args; 1069 1070 args->rc = fserrno; 1071 sem_post(args->sem); 1072 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1073 } 1074 1075 static void 1076 __fs_create_file(void *arg) 1077 { 1078 struct spdk_fs_request *req = arg; 1079 struct spdk_fs_cb_args *args = &req->args; 1080 1081 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1082 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1083 } 1084 1085 int 1086 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1087 { 1088 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1089 struct spdk_fs_request *req; 1090 struct spdk_fs_cb_args *args; 1091 int rc; 1092 1093 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1094 1095 req = alloc_fs_request(channel); 1096 if (req == NULL) { 1097 return -ENOMEM; 1098 } 1099 1100 args = &req->args; 1101 args->fs = fs; 1102 args->op.create.name = name; 1103 args->sem = &channel->sem; 1104 fs->send_request(__fs_create_file, req); 1105 sem_wait(&channel->sem); 1106 rc = args->rc; 1107 free_fs_request(req); 1108 1109 return rc; 1110 } 1111 1112 static void 1113 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1114 { 1115 struct 
spdk_fs_request *req = ctx; 1116 struct spdk_fs_cb_args *args = &req->args; 1117 struct spdk_file *f = args->file; 1118 1119 f->blob = blob; 1120 while (!TAILQ_EMPTY(&f->open_requests)) { 1121 req = TAILQ_FIRST(&f->open_requests); 1122 args = &req->args; 1123 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1124 spdk_trace_record(TRACE_BLOBFS_OPEN, 0, 0, 0, f->trace_arg_name); 1125 args->fn.file_op_with_handle(args->arg, f, bserrno); 1126 free_fs_request(req); 1127 } 1128 } 1129 1130 static void 1131 fs_open_blob_create_cb(void *ctx, int bserrno) 1132 { 1133 struct spdk_fs_request *req = ctx; 1134 struct spdk_fs_cb_args *args = &req->args; 1135 struct spdk_file *file = args->file; 1136 struct spdk_filesystem *fs = args->fs; 1137 1138 if (file == NULL) { 1139 /* 1140 * This is from an open with CREATE flag - the file 1141 * is now created so look it up in the file list for this 1142 * filesystem. 1143 */ 1144 file = fs_find_file(fs, args->op.open.name); 1145 assert(file != NULL); 1146 args->file = file; 1147 } 1148 1149 file->ref_count++; 1150 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1151 if (file->ref_count == 1) { 1152 assert(file->blob == NULL); 1153 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1154 } else if (file->blob != NULL) { 1155 fs_open_blob_done(req, file->blob, 0); 1156 } else { 1157 /* 1158 * The blob open for this file is in progress due to a previous 1159 * open request. When that open completes, it will invoke the 1160 * open callback for this request. 
1161 */ 1162 } 1163 } 1164 1165 void 1166 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1167 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1168 { 1169 struct spdk_file *f = NULL; 1170 struct spdk_fs_request *req; 1171 struct spdk_fs_cb_args *args; 1172 1173 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1174 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1175 return; 1176 } 1177 1178 f = fs_find_file(fs, name); 1179 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1180 cb_fn(cb_arg, NULL, -ENOENT); 1181 return; 1182 } 1183 1184 if (f != NULL && f->is_deleted == true) { 1185 cb_fn(cb_arg, NULL, -ENOENT); 1186 return; 1187 } 1188 1189 req = alloc_fs_request(fs->md_target.md_fs_channel); 1190 if (req == NULL) { 1191 cb_fn(cb_arg, NULL, -ENOMEM); 1192 return; 1193 } 1194 1195 args = &req->args; 1196 args->fn.file_op_with_handle = cb_fn; 1197 args->arg = cb_arg; 1198 args->file = f; 1199 args->fs = fs; 1200 args->op.open.name = name; 1201 1202 if (f == NULL) { 1203 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1204 } else { 1205 fs_open_blob_create_cb(req, 0); 1206 } 1207 } 1208 1209 static void 1210 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1211 { 1212 struct spdk_fs_request *req = arg; 1213 struct spdk_fs_cb_args *args = &req->args; 1214 1215 args->file = file; 1216 __wake_caller(args, bserrno); 1217 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1218 } 1219 1220 static void 1221 __fs_open_file(void *arg) 1222 { 1223 struct spdk_fs_request *req = arg; 1224 struct spdk_fs_cb_args *args = &req->args; 1225 1226 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1227 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1228 __fs_open_file_done, req); 1229 } 1230 1231 int 1232 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1233 const char *name, uint32_t flags, struct spdk_file 
**file) 1234 { 1235 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1236 struct spdk_fs_request *req; 1237 struct spdk_fs_cb_args *args; 1238 int rc; 1239 1240 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1241 1242 req = alloc_fs_request(channel); 1243 if (req == NULL) { 1244 return -ENOMEM; 1245 } 1246 1247 args = &req->args; 1248 args->fs = fs; 1249 args->op.open.name = name; 1250 args->op.open.flags = flags; 1251 args->sem = &channel->sem; 1252 fs->send_request(__fs_open_file, req); 1253 sem_wait(&channel->sem); 1254 rc = args->rc; 1255 if (rc == 0) { 1256 *file = args->file; 1257 } else { 1258 *file = NULL; 1259 } 1260 free_fs_request(req); 1261 1262 return rc; 1263 } 1264 1265 static void 1266 fs_rename_blob_close_cb(void *ctx, int bserrno) 1267 { 1268 struct spdk_fs_request *req = ctx; 1269 struct spdk_fs_cb_args *args = &req->args; 1270 1271 args->fn.fs_op(args->arg, bserrno); 1272 free_fs_request(req); 1273 } 1274 1275 static void 1276 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1277 { 1278 struct spdk_fs_request *req = ctx; 1279 struct spdk_fs_cb_args *args = &req->args; 1280 const char *new_name = args->op.rename.new_name; 1281 1282 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1283 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1284 } 1285 1286 static void 1287 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1288 { 1289 struct spdk_fs_cb_args *args = &req->args; 1290 struct spdk_file *f; 1291 1292 f = fs_find_file(args->fs, args->op.rename.old_name); 1293 if (f == NULL) { 1294 args->fn.fs_op(args->arg, -ENOENT); 1295 free_fs_request(req); 1296 return; 1297 } 1298 1299 free(f->name); 1300 f->name = strdup(args->op.rename.new_name); 1301 _file_build_trace_arg_name(f); 1302 args->file = f; 1303 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1304 } 1305 1306 static void 1307 fs_rename_delete_done(void *arg, int fserrno) 1308 { 1309 
__spdk_fs_md_rename_file(arg); 1310 } 1311 1312 void 1313 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1314 const char *old_name, const char *new_name, 1315 spdk_file_op_complete cb_fn, void *cb_arg) 1316 { 1317 struct spdk_file *f; 1318 struct spdk_fs_request *req; 1319 struct spdk_fs_cb_args *args; 1320 1321 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1322 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1323 cb_fn(cb_arg, -ENAMETOOLONG); 1324 return; 1325 } 1326 1327 req = alloc_fs_request(fs->md_target.md_fs_channel); 1328 if (req == NULL) { 1329 cb_fn(cb_arg, -ENOMEM); 1330 return; 1331 } 1332 1333 args = &req->args; 1334 args->fn.fs_op = cb_fn; 1335 args->fs = fs; 1336 args->arg = cb_arg; 1337 args->op.rename.old_name = old_name; 1338 args->op.rename.new_name = new_name; 1339 1340 f = fs_find_file(fs, new_name); 1341 if (f == NULL) { 1342 __spdk_fs_md_rename_file(req); 1343 return; 1344 } 1345 1346 /* 1347 * The rename overwrites an existing file. So delete the existing file, then 1348 * do the actual rename. 
1349 */ 1350 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1351 } 1352 1353 static void 1354 __fs_rename_file_done(void *arg, int fserrno) 1355 { 1356 struct spdk_fs_request *req = arg; 1357 struct spdk_fs_cb_args *args = &req->args; 1358 1359 __wake_caller(args, fserrno); 1360 } 1361 1362 static void 1363 __fs_rename_file(void *arg) 1364 { 1365 struct spdk_fs_request *req = arg; 1366 struct spdk_fs_cb_args *args = &req->args; 1367 1368 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1369 __fs_rename_file_done, req); 1370 } 1371 1372 int 1373 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1374 const char *old_name, const char *new_name) 1375 { 1376 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1377 struct spdk_fs_request *req; 1378 struct spdk_fs_cb_args *args; 1379 int rc; 1380 1381 req = alloc_fs_request(channel); 1382 if (req == NULL) { 1383 return -ENOMEM; 1384 } 1385 1386 args = &req->args; 1387 1388 args->fs = fs; 1389 args->op.rename.old_name = old_name; 1390 args->op.rename.new_name = new_name; 1391 args->sem = &channel->sem; 1392 fs->send_request(__fs_rename_file, req); 1393 sem_wait(&channel->sem); 1394 rc = args->rc; 1395 free_fs_request(req); 1396 return rc; 1397 } 1398 1399 static void 1400 blob_delete_cb(void *ctx, int bserrno) 1401 { 1402 struct spdk_fs_request *req = ctx; 1403 struct spdk_fs_cb_args *args = &req->args; 1404 1405 args->fn.file_op(args->arg, bserrno); 1406 free_fs_request(req); 1407 } 1408 1409 void 1410 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1411 spdk_file_op_complete cb_fn, void *cb_arg) 1412 { 1413 struct spdk_file *f; 1414 spdk_blob_id blobid; 1415 struct spdk_fs_request *req; 1416 struct spdk_fs_cb_args *args; 1417 1418 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1419 1420 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1421 cb_fn(cb_arg, -ENAMETOOLONG); 
1422 return; 1423 } 1424 1425 f = fs_find_file(fs, name); 1426 if (f == NULL) { 1427 cb_fn(cb_arg, -ENOENT); 1428 return; 1429 } 1430 1431 req = alloc_fs_request(fs->md_target.md_fs_channel); 1432 if (req == NULL) { 1433 cb_fn(cb_arg, -ENOMEM); 1434 return; 1435 } 1436 1437 args = &req->args; 1438 args->fn.file_op = cb_fn; 1439 args->arg = cb_arg; 1440 1441 if (f->ref_count > 0) { 1442 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1443 f->is_deleted = true; 1444 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1445 spdk_blob_sync_md(f->blob, blob_delete_cb, req); 1446 return; 1447 } 1448 1449 TAILQ_REMOVE(&fs->files, f, tailq); 1450 1451 cache_free_buffers(f); 1452 1453 blobid = f->blobid; 1454 1455 free(f->name); 1456 free(f->tree); 1457 free(f); 1458 1459 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1460 } 1461 1462 static void 1463 __fs_delete_file_done(void *arg, int fserrno) 1464 { 1465 struct spdk_fs_request *req = arg; 1466 struct spdk_fs_cb_args *args = &req->args; 1467 1468 __wake_caller(args, fserrno); 1469 } 1470 1471 static void 1472 __fs_delete_file(void *arg) 1473 { 1474 struct spdk_fs_request *req = arg; 1475 struct spdk_fs_cb_args *args = &req->args; 1476 1477 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1478 } 1479 1480 int 1481 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1482 const char *name) 1483 { 1484 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1485 struct spdk_fs_request *req; 1486 struct spdk_fs_cb_args *args; 1487 int rc; 1488 1489 req = alloc_fs_request(channel); 1490 if (req == NULL) { 1491 return -ENOMEM; 1492 } 1493 1494 args = &req->args; 1495 args->fs = fs; 1496 args->op.delete.name = name; 1497 args->sem = &channel->sem; 1498 fs->send_request(__fs_delete_file, req); 1499 sem_wait(&channel->sem); 1500 rc = args->rc; 1501 free_fs_request(req); 1502 1503 return rc; 
1504 } 1505 1506 spdk_fs_iter 1507 spdk_fs_iter_first(struct spdk_filesystem *fs) 1508 { 1509 struct spdk_file *f; 1510 1511 f = TAILQ_FIRST(&fs->files); 1512 return f; 1513 } 1514 1515 spdk_fs_iter 1516 spdk_fs_iter_next(spdk_fs_iter iter) 1517 { 1518 struct spdk_file *f = iter; 1519 1520 if (f == NULL) { 1521 return NULL; 1522 } 1523 1524 f = TAILQ_NEXT(f, tailq); 1525 return f; 1526 } 1527 1528 const char * 1529 spdk_file_get_name(struct spdk_file *file) 1530 { 1531 return file->name; 1532 } 1533 1534 uint64_t 1535 spdk_file_get_length(struct spdk_file *file) 1536 { 1537 uint64_t length; 1538 1539 assert(file != NULL); 1540 1541 length = file->append_pos >= file->length ? file->append_pos : file->length; 1542 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length); 1543 return length; 1544 } 1545 1546 static void 1547 fs_truncate_complete_cb(void *ctx, int bserrno) 1548 { 1549 struct spdk_fs_request *req = ctx; 1550 struct spdk_fs_cb_args *args = &req->args; 1551 1552 args->fn.file_op(args->arg, bserrno); 1553 free_fs_request(req); 1554 } 1555 1556 static void 1557 fs_truncate_resize_cb(void *ctx, int bserrno) 1558 { 1559 struct spdk_fs_request *req = ctx; 1560 struct spdk_fs_cb_args *args = &req->args; 1561 struct spdk_file *file = args->file; 1562 uint64_t *length = &args->op.truncate.length; 1563 1564 if (bserrno) { 1565 args->fn.file_op(args->arg, bserrno); 1566 free_fs_request(req); 1567 return; 1568 } 1569 1570 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1571 1572 file->length = *length; 1573 if (file->append_pos > file->length) { 1574 file->append_pos = file->length; 1575 } 1576 1577 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, req); 1578 } 1579 1580 static uint64_t 1581 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1582 { 1583 return (length + cluster_sz - 1) / cluster_sz; 1584 } 1585 1586 void 1587 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1588 
spdk_file_op_complete cb_fn, void *cb_arg) 1589 { 1590 struct spdk_filesystem *fs; 1591 size_t num_clusters; 1592 struct spdk_fs_request *req; 1593 struct spdk_fs_cb_args *args; 1594 1595 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1596 if (length == file->length) { 1597 cb_fn(cb_arg, 0); 1598 return; 1599 } 1600 1601 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1602 if (req == NULL) { 1603 cb_fn(cb_arg, -ENOMEM); 1604 return; 1605 } 1606 1607 args = &req->args; 1608 args->fn.file_op = cb_fn; 1609 args->arg = cb_arg; 1610 args->file = file; 1611 args->op.truncate.length = length; 1612 fs = file->fs; 1613 1614 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1615 1616 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1617 } 1618 1619 static void 1620 __truncate(void *arg) 1621 { 1622 struct spdk_fs_request *req = arg; 1623 struct spdk_fs_cb_args *args = &req->args; 1624 1625 spdk_file_truncate_async(args->file, args->op.truncate.length, 1626 args->fn.file_op, args); 1627 } 1628 1629 int 1630 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 1631 uint64_t length) 1632 { 1633 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1634 struct spdk_fs_request *req; 1635 struct spdk_fs_cb_args *args; 1636 int rc; 1637 1638 req = alloc_fs_request(channel); 1639 if (req == NULL) { 1640 return -ENOMEM; 1641 } 1642 1643 args = &req->args; 1644 1645 args->file = file; 1646 args->op.truncate.length = length; 1647 args->fn.file_op = __wake_caller; 1648 args->sem = &channel->sem; 1649 1650 channel->send_request(__truncate, req); 1651 sem_wait(&channel->sem); 1652 rc = args->rc; 1653 free_fs_request(req); 1654 1655 return rc; 1656 } 1657 1658 static void 1659 __rw_done(void *ctx, int bserrno) 1660 { 1661 struct spdk_fs_request *req = ctx; 1662 struct spdk_fs_cb_args *args = &req->args; 1663 1664 spdk_free(args->op.rw.pin_buf); 1665 
args->fn.file_op(args->arg, bserrno); 1666 free_fs_request(req); 1667 } 1668 1669 static void 1670 __read_done(void *ctx, int bserrno) 1671 { 1672 struct spdk_fs_request *req = ctx; 1673 struct spdk_fs_cb_args *args = &req->args; 1674 1675 assert(req != NULL); 1676 if (args->op.rw.is_read) { 1677 memcpy(args->iovs[0].iov_base, 1678 args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1679 args->iovs[0].iov_len); 1680 __rw_done(req, 0); 1681 } else { 1682 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1683 args->iovs[0].iov_base, 1684 args->iovs[0].iov_len); 1685 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1686 args->op.rw.pin_buf, 1687 args->op.rw.start_lba, args->op.rw.num_lba, 1688 __rw_done, req); 1689 } 1690 } 1691 1692 static void 1693 __do_blob_read(void *ctx, int fserrno) 1694 { 1695 struct spdk_fs_request *req = ctx; 1696 struct spdk_fs_cb_args *args = &req->args; 1697 1698 if (fserrno) { 1699 __rw_done(req, fserrno); 1700 return; 1701 } 1702 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1703 args->op.rw.pin_buf, 1704 args->op.rw.start_lba, args->op.rw.num_lba, 1705 __read_done, req); 1706 } 1707 1708 static void 1709 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1710 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1711 { 1712 uint64_t end_lba; 1713 1714 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1715 *start_lba = offset / *lba_size; 1716 end_lba = (offset + length - 1) / *lba_size; 1717 *num_lba = (end_lba - *start_lba + 1); 1718 } 1719 1720 static void 1721 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1722 void *payload, uint64_t offset, uint64_t length, 1723 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1724 { 1725 struct spdk_fs_request *req; 1726 struct spdk_fs_cb_args *args; 1727 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1728 uint64_t start_lba, num_lba, 
pin_buf_length; 1729 uint32_t lba_size; 1730 1731 if (is_read && offset + length > file->length) { 1732 cb_fn(cb_arg, -EINVAL); 1733 return; 1734 } 1735 1736 req = alloc_fs_request_with_iov(channel, 1); 1737 if (req == NULL) { 1738 cb_fn(cb_arg, -ENOMEM); 1739 return; 1740 } 1741 1742 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1743 1744 args = &req->args; 1745 args->fn.file_op = cb_fn; 1746 args->arg = cb_arg; 1747 args->file = file; 1748 args->op.rw.channel = channel->bs_channel; 1749 args->iovs[0].iov_base = payload; 1750 args->iovs[0].iov_len = (size_t)length; 1751 args->op.rw.is_read = is_read; 1752 args->op.rw.offset = offset; 1753 args->op.rw.blocklen = lba_size; 1754 1755 pin_buf_length = num_lba * lba_size; 1756 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL, 1757 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1758 if (args->op.rw.pin_buf == NULL) { 1759 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1760 file->name, offset, length); 1761 free_fs_request(req); 1762 cb_fn(cb_arg, -ENOMEM); 1763 return; 1764 } 1765 1766 args->op.rw.start_lba = start_lba; 1767 args->op.rw.num_lba = num_lba; 1768 1769 if (!is_read && file->length < offset + length) { 1770 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1771 } else { 1772 __do_blob_read(req, 0); 1773 } 1774 } 1775 1776 void 1777 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1778 void *payload, uint64_t offset, uint64_t length, 1779 spdk_file_op_complete cb_fn, void *cb_arg) 1780 { 1781 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1782 } 1783 1784 void 1785 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1786 void *payload, uint64_t offset, uint64_t length, 1787 spdk_file_op_complete cb_fn, void *cb_arg) 1788 { 1789 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1790 file->name, offset, 
length); 1791 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1792 } 1793 1794 struct spdk_io_channel * 1795 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1796 { 1797 struct spdk_io_channel *io_channel; 1798 struct spdk_fs_channel *fs_channel; 1799 1800 io_channel = spdk_get_io_channel(&fs->io_target); 1801 fs_channel = spdk_io_channel_get_ctx(io_channel); 1802 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1803 fs_channel->send_request = __send_request_direct; 1804 1805 return io_channel; 1806 } 1807 1808 void 1809 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1810 { 1811 spdk_put_io_channel(channel); 1812 } 1813 1814 struct spdk_fs_thread_ctx * 1815 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs) 1816 { 1817 struct spdk_fs_thread_ctx *ctx; 1818 1819 ctx = calloc(1, sizeof(*ctx)); 1820 if (!ctx) { 1821 return NULL; 1822 } 1823 1824 _spdk_fs_channel_create(fs, &ctx->ch, 512); 1825 1826 ctx->ch.send_request = fs->send_request; 1827 ctx->ch.sync = 1; 1828 pthread_spin_init(&ctx->ch.lock, 0); 1829 1830 return ctx; 1831 } 1832 1833 1834 void 1835 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx) 1836 { 1837 assert(ctx->ch.sync == 1); 1838 1839 while (true) { 1840 pthread_spin_lock(&ctx->ch.lock); 1841 if (ctx->ch.outstanding_reqs == 0) { 1842 pthread_spin_unlock(&ctx->ch.lock); 1843 break; 1844 } 1845 pthread_spin_unlock(&ctx->ch.lock); 1846 usleep(1000); 1847 } 1848 1849 _spdk_fs_channel_destroy(NULL, &ctx->ch); 1850 free(ctx); 1851 } 1852 1853 void 1854 spdk_fs_set_cache_size(uint64_t size_in_mb) 1855 { 1856 g_fs_cache_size = size_in_mb * 1024 * 1024; 1857 } 1858 1859 uint64_t 1860 spdk_fs_get_cache_size(void) 1861 { 1862 return g_fs_cache_size / (1024 * 1024); 1863 } 1864 1865 static void __file_flush(void *ctx); 1866 1867 static void * 1868 alloc_cache_memory_buffer(struct spdk_file *context) 1869 { 1870 struct spdk_file *file; 1871 void *buf; 1872 1873 buf = spdk_mempool_get(g_cache_pool); 1874 if 
(buf != NULL) { 1875 return buf; 1876 } 1877 1878 pthread_spin_lock(&g_caches_lock); 1879 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1880 if (!file->open_for_writing && 1881 file->priority == SPDK_FILE_PRIORITY_LOW && 1882 file != context) { 1883 break; 1884 } 1885 } 1886 pthread_spin_unlock(&g_caches_lock); 1887 if (file != NULL) { 1888 cache_free_buffers(file); 1889 buf = spdk_mempool_get(g_cache_pool); 1890 if (buf != NULL) { 1891 return buf; 1892 } 1893 } 1894 1895 pthread_spin_lock(&g_caches_lock); 1896 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1897 if (!file->open_for_writing && file != context) { 1898 break; 1899 } 1900 } 1901 pthread_spin_unlock(&g_caches_lock); 1902 if (file != NULL) { 1903 cache_free_buffers(file); 1904 buf = spdk_mempool_get(g_cache_pool); 1905 if (buf != NULL) { 1906 return buf; 1907 } 1908 } 1909 1910 pthread_spin_lock(&g_caches_lock); 1911 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1912 if (file != context) { 1913 break; 1914 } 1915 } 1916 pthread_spin_unlock(&g_caches_lock); 1917 if (file != NULL) { 1918 cache_free_buffers(file); 1919 buf = spdk_mempool_get(g_cache_pool); 1920 if (buf != NULL) { 1921 return buf; 1922 } 1923 } 1924 1925 return NULL; 1926 } 1927 1928 static struct cache_buffer * 1929 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1930 { 1931 struct cache_buffer *buf; 1932 int count = 0; 1933 1934 buf = calloc(1, sizeof(*buf)); 1935 if (buf == NULL) { 1936 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1937 return NULL; 1938 } 1939 1940 buf->buf = alloc_cache_memory_buffer(file); 1941 while (buf->buf == NULL) { 1942 /* 1943 * TODO: alloc_cache_memory_buffer() should eventually free 1944 * some buffers. Need a more sophisticated check here, instead 1945 * of just bailing if 100 tries does not result in getting a 1946 * free buffer. This will involve using the sync channel's 1947 * semaphore to block until a buffer becomes available. 
1948 */ 1949 if (count++ == 100) { 1950 SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", 1951 file, offset); 1952 free(buf); 1953 return NULL; 1954 } 1955 buf->buf = alloc_cache_memory_buffer(file); 1956 } 1957 1958 buf->buf_size = CACHE_BUFFER_SIZE; 1959 buf->offset = offset; 1960 1961 pthread_spin_lock(&g_caches_lock); 1962 if (file->tree->present_mask == 0) { 1963 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1964 } 1965 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1966 pthread_spin_unlock(&g_caches_lock); 1967 1968 return buf; 1969 } 1970 1971 static struct cache_buffer * 1972 cache_append_buffer(struct spdk_file *file) 1973 { 1974 struct cache_buffer *last; 1975 1976 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1977 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1978 1979 last = cache_insert_buffer(file, file->append_pos); 1980 if (last == NULL) { 1981 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1982 return NULL; 1983 } 1984 1985 file->last = last; 1986 1987 return last; 1988 } 1989 1990 static void __check_sync_reqs(struct spdk_file *file); 1991 1992 static void 1993 __file_cache_finish_sync(void *ctx, int bserrno) 1994 { 1995 struct spdk_file *file = ctx; 1996 struct spdk_fs_request *sync_req; 1997 struct spdk_fs_cb_args *sync_args; 1998 1999 pthread_spin_lock(&file->lock); 2000 sync_req = TAILQ_FIRST(&file->sync_requests); 2001 sync_args = &sync_req->args; 2002 assert(sync_args->op.sync.offset <= file->length_flushed); 2003 spdk_trace_record(TRACE_BLOBFS_XATTR_END, 0, sync_args->op.sync.offset, 2004 0, file->trace_arg_name); 2005 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 2006 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 2007 pthread_spin_unlock(&file->lock); 2008 2009 sync_args->fn.file_op(sync_args->arg, bserrno); 2010 __check_sync_reqs(file); 2011 2012 pthread_spin_lock(&file->lock); 2013 
free_fs_request(sync_req); 2014 pthread_spin_unlock(&file->lock); 2015 } 2016 2017 static void 2018 __check_sync_reqs(struct spdk_file *file) 2019 { 2020 struct spdk_fs_request *sync_req; 2021 2022 pthread_spin_lock(&file->lock); 2023 2024 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 2025 if (sync_req->args.op.sync.offset <= file->length_flushed) { 2026 break; 2027 } 2028 } 2029 2030 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 2031 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 2032 sync_req->args.op.sync.xattr_in_progress = true; 2033 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 2034 sizeof(file->length_flushed)); 2035 2036 pthread_spin_unlock(&file->lock); 2037 spdk_trace_record(TRACE_BLOBFS_XATTR_START, 0, file->length_flushed, 2038 0, file->trace_arg_name); 2039 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file); 2040 } else { 2041 pthread_spin_unlock(&file->lock); 2042 } 2043 } 2044 2045 static void 2046 __file_flush_done(void *ctx, int bserrno) 2047 { 2048 struct spdk_fs_request *req = ctx; 2049 struct spdk_fs_cb_args *args = &req->args; 2050 struct spdk_file *file = args->file; 2051 struct cache_buffer *next = args->op.flush.cache_buffer; 2052 2053 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 2054 2055 pthread_spin_lock(&file->lock); 2056 next->in_progress = false; 2057 next->bytes_flushed += args->op.flush.length; 2058 file->length_flushed += args->op.flush.length; 2059 if (file->length_flushed > file->length) { 2060 file->length = file->length_flushed; 2061 } 2062 if (next->bytes_flushed == next->buf_size) { 2063 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 2064 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 2065 } 2066 2067 /* 2068 * Assert that there is no cached data that extends past the end of the underlying 2069 * blob. 
2070 */ 2071 assert(next == NULL || next->offset < __file_get_blob_size(file) || 2072 next->bytes_filled == 0); 2073 2074 pthread_spin_unlock(&file->lock); 2075 2076 __check_sync_reqs(file); 2077 2078 __file_flush(req); 2079 } 2080 2081 static void 2082 __file_flush(void *ctx) 2083 { 2084 struct spdk_fs_request *req = ctx; 2085 struct spdk_fs_cb_args *args = &req->args; 2086 struct spdk_file *file = args->file; 2087 struct cache_buffer *next; 2088 uint64_t offset, length, start_lba, num_lba; 2089 uint32_t lba_size; 2090 2091 pthread_spin_lock(&file->lock); 2092 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 2093 if (next == NULL || next->in_progress) { 2094 /* 2095 * There is either no data to flush, or a flush I/O is already in 2096 * progress. So return immediately - if a flush I/O is in 2097 * progress we will flush more data after that is completed. 2098 */ 2099 free_fs_request(req); 2100 if (next == NULL) { 2101 /* 2102 * For cases where a file's cache was evicted, and then the 2103 * file was later appended, we will write the data directly 2104 * to disk and bypass cache. So just update length_flushed 2105 * here to reflect that all data was already written to disk. 2106 */ 2107 file->length_flushed = file->append_pos; 2108 } 2109 pthread_spin_unlock(&file->lock); 2110 if (next == NULL) { 2111 /* 2112 * There is no data to flush, but we still need to check for any 2113 * outstanding sync requests to make sure metadata gets updated. 
2114 */ 2115 __check_sync_reqs(file); 2116 } 2117 return; 2118 } 2119 2120 offset = next->offset + next->bytes_flushed; 2121 length = next->bytes_filled - next->bytes_flushed; 2122 if (length == 0) { 2123 free_fs_request(req); 2124 pthread_spin_unlock(&file->lock); 2125 return; 2126 } 2127 args->op.flush.length = length; 2128 args->op.flush.cache_buffer = next; 2129 2130 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2131 2132 next->in_progress = true; 2133 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2134 offset, length, start_lba, num_lba); 2135 pthread_spin_unlock(&file->lock); 2136 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2137 next->buf + (start_lba * lba_size) - next->offset, 2138 start_lba, num_lba, __file_flush_done, req); 2139 } 2140 2141 static void 2142 __file_extend_done(void *arg, int bserrno) 2143 { 2144 struct spdk_fs_cb_args *args = arg; 2145 2146 __wake_caller(args, bserrno); 2147 } 2148 2149 static void 2150 __file_extend_resize_cb(void *_args, int bserrno) 2151 { 2152 struct spdk_fs_cb_args *args = _args; 2153 struct spdk_file *file = args->file; 2154 2155 if (bserrno) { 2156 __wake_caller(args, bserrno); 2157 return; 2158 } 2159 2160 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2161 } 2162 2163 static void 2164 __file_extend_blob(void *_args) 2165 { 2166 struct spdk_fs_cb_args *args = _args; 2167 struct spdk_file *file = args->file; 2168 2169 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2170 } 2171 2172 static void 2173 __rw_from_file_done(void *ctx, int bserrno) 2174 { 2175 struct spdk_fs_request *req = ctx; 2176 2177 __wake_caller(&req->args, bserrno); 2178 free_fs_request(req); 2179 } 2180 2181 static void 2182 __rw_from_file(void *ctx) 2183 { 2184 struct spdk_fs_request *req = ctx; 2185 struct spdk_fs_cb_args *args = &req->args; 2186 struct spdk_file *file = args->file; 2187 2188 if 
(args->op.rw.is_read) { 2189 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2190 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2191 __rw_from_file_done, req); 2192 } else { 2193 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->iovs[0].iov_base, 2194 args->op.rw.offset, (uint64_t)args->iovs[0].iov_len, 2195 __rw_from_file_done, req); 2196 } 2197 } 2198 2199 static int 2200 __send_rw_from_file(struct spdk_file *file, void *payload, 2201 uint64_t offset, uint64_t length, bool is_read, 2202 struct spdk_fs_channel *channel) 2203 { 2204 struct spdk_fs_request *req; 2205 struct spdk_fs_cb_args *args; 2206 2207 req = alloc_fs_request_with_iov(channel, 1); 2208 if (req == NULL) { 2209 sem_post(&channel->sem); 2210 return -ENOMEM; 2211 } 2212 2213 args = &req->args; 2214 args->file = file; 2215 args->sem = &channel->sem; 2216 args->iovs[0].iov_base = payload; 2217 args->iovs[0].iov_len = (size_t)length; 2218 args->op.rw.offset = offset; 2219 args->op.rw.is_read = is_read; 2220 file->fs->send_request(__rw_from_file, req); 2221 return 0; 2222 } 2223 2224 int 2225 spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2226 void *payload, uint64_t offset, uint64_t length) 2227 { 2228 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2229 struct spdk_fs_request *flush_req; 2230 uint64_t rem_length, copy, blob_size, cluster_sz; 2231 uint32_t cache_buffers_filled = 0; 2232 uint8_t *cur_payload; 2233 struct cache_buffer *last; 2234 2235 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2236 2237 if (length == 0) { 2238 return 0; 2239 } 2240 2241 if (offset != file->append_pos) { 2242 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2243 return -EINVAL; 2244 } 2245 2246 pthread_spin_lock(&file->lock); 2247 file->open_for_writing = true; 2248 2249 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2250 
cache_append_buffer(file); 2251 } 2252 2253 if (file->last == NULL) { 2254 int rc; 2255 2256 file->append_pos += length; 2257 pthread_spin_unlock(&file->lock); 2258 rc = __send_rw_from_file(file, payload, offset, length, false, channel); 2259 sem_wait(&channel->sem); 2260 return rc; 2261 } 2262 2263 blob_size = __file_get_blob_size(file); 2264 2265 if ((offset + length) > blob_size) { 2266 struct spdk_fs_cb_args extend_args = {}; 2267 2268 cluster_sz = file->fs->bs_opts.cluster_sz; 2269 extend_args.sem = &channel->sem; 2270 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2271 extend_args.file = file; 2272 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2273 pthread_spin_unlock(&file->lock); 2274 file->fs->send_request(__file_extend_blob, &extend_args); 2275 sem_wait(&channel->sem); 2276 if (extend_args.rc) { 2277 return extend_args.rc; 2278 } 2279 } 2280 2281 flush_req = alloc_fs_request(channel); 2282 if (flush_req == NULL) { 2283 pthread_spin_unlock(&file->lock); 2284 return -ENOMEM; 2285 } 2286 2287 last = file->last; 2288 rem_length = length; 2289 cur_payload = payload; 2290 while (rem_length > 0) { 2291 copy = last->buf_size - last->bytes_filled; 2292 if (copy > rem_length) { 2293 copy = rem_length; 2294 } 2295 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2296 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2297 file->append_pos += copy; 2298 if (file->length < file->append_pos) { 2299 file->length = file->append_pos; 2300 } 2301 cur_payload += copy; 2302 last->bytes_filled += copy; 2303 rem_length -= copy; 2304 if (last->bytes_filled == last->buf_size) { 2305 cache_buffers_filled++; 2306 last = cache_append_buffer(file); 2307 if (last == NULL) { 2308 BLOBFS_TRACE(file, "nomem\n"); 2309 free_fs_request(flush_req); 2310 pthread_spin_unlock(&file->lock); 2311 return -ENOMEM; 2312 } 2313 } 2314 } 2315 2316 
pthread_spin_unlock(&file->lock); 2317 2318 if (cache_buffers_filled == 0) { 2319 free_fs_request(flush_req); 2320 return 0; 2321 } 2322 2323 flush_req->args.file = file; 2324 file->fs->send_request(__file_flush, flush_req); 2325 return 0; 2326 } 2327 2328 static void 2329 __readahead_done(void *ctx, int bserrno) 2330 { 2331 struct spdk_fs_request *req = ctx; 2332 struct spdk_fs_cb_args *args = &req->args; 2333 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2334 struct spdk_file *file = args->file; 2335 2336 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2337 2338 pthread_spin_lock(&file->lock); 2339 cache_buffer->bytes_filled = args->op.readahead.length; 2340 cache_buffer->bytes_flushed = args->op.readahead.length; 2341 cache_buffer->in_progress = false; 2342 pthread_spin_unlock(&file->lock); 2343 2344 free_fs_request(req); 2345 } 2346 2347 static void 2348 __readahead(void *ctx) 2349 { 2350 struct spdk_fs_request *req = ctx; 2351 struct spdk_fs_cb_args *args = &req->args; 2352 struct spdk_file *file = args->file; 2353 uint64_t offset, length, start_lba, num_lba; 2354 uint32_t lba_size; 2355 2356 offset = args->op.readahead.offset; 2357 length = args->op.readahead.length; 2358 assert(length > 0); 2359 2360 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2361 2362 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2363 offset, length, start_lba, num_lba); 2364 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2365 args->op.readahead.cache_buffer->buf, 2366 start_lba, num_lba, __readahead_done, req); 2367 } 2368 2369 static uint64_t 2370 __next_cache_buffer_offset(uint64_t offset) 2371 { 2372 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2373 } 2374 2375 static void 2376 check_readahead(struct spdk_file *file, uint64_t offset, 2377 struct spdk_fs_channel *channel) 2378 { 2379 struct spdk_fs_request *req; 2380 struct spdk_fs_cb_args 
*args; 2381 2382 offset = __next_cache_buffer_offset(offset); 2383 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2384 return; 2385 } 2386 2387 req = alloc_fs_request(channel); 2388 if (req == NULL) { 2389 return; 2390 } 2391 args = &req->args; 2392 2393 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2394 2395 args->file = file; 2396 args->op.readahead.offset = offset; 2397 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2398 if (!args->op.readahead.cache_buffer) { 2399 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2400 free_fs_request(req); 2401 return; 2402 } 2403 2404 args->op.readahead.cache_buffer->in_progress = true; 2405 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2406 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2407 } else { 2408 args->op.readahead.length = CACHE_BUFFER_SIZE; 2409 } 2410 file->fs->send_request(__readahead, req); 2411 } 2412 2413 static int 2414 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, 2415 struct spdk_fs_channel *channel) 2416 { 2417 struct cache_buffer *buf; 2418 int rc; 2419 2420 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2421 if (buf == NULL) { 2422 pthread_spin_unlock(&file->lock); 2423 rc = __send_rw_from_file(file, payload, offset, length, true, channel); 2424 pthread_spin_lock(&file->lock); 2425 return rc; 2426 } 2427 2428 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2429 length = buf->offset + buf->bytes_filled - offset; 2430 } 2431 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2432 memcpy(payload, &buf->buf[offset - buf->offset], length); 2433 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2434 pthread_spin_lock(&g_caches_lock); 2435 spdk_tree_remove_buffer(file->tree, buf); 2436 if (file->tree->present_mask == 0) { 2437 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2438 } 2439 pthread_spin_unlock(&g_caches_lock); 2440 } 
2441 2442 sem_post(&channel->sem); 2443 return 0; 2444 } 2445 2446 int64_t 2447 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2448 void *payload, uint64_t offset, uint64_t length) 2449 { 2450 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2451 uint64_t final_offset, final_length; 2452 uint32_t sub_reads = 0; 2453 int rc = 0; 2454 2455 pthread_spin_lock(&file->lock); 2456 2457 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2458 2459 file->open_for_writing = false; 2460 2461 if (length == 0 || offset >= file->append_pos) { 2462 pthread_spin_unlock(&file->lock); 2463 return 0; 2464 } 2465 2466 if (offset + length > file->append_pos) { 2467 length = file->append_pos - offset; 2468 } 2469 2470 if (offset != file->next_seq_offset) { 2471 file->seq_byte_count = 0; 2472 } 2473 file->seq_byte_count += length; 2474 file->next_seq_offset = offset + length; 2475 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2476 check_readahead(file, offset, channel); 2477 check_readahead(file, offset + CACHE_BUFFER_SIZE, channel); 2478 } 2479 2480 final_length = 0; 2481 final_offset = offset + length; 2482 while (offset < final_offset) { 2483 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2484 if (length > (final_offset - offset)) { 2485 length = final_offset - offset; 2486 } 2487 rc = __file_read(file, payload, offset, length, channel); 2488 if (rc == 0) { 2489 final_length += length; 2490 } else { 2491 break; 2492 } 2493 payload += length; 2494 offset += length; 2495 sub_reads++; 2496 } 2497 pthread_spin_unlock(&file->lock); 2498 while (sub_reads-- > 0) { 2499 sem_wait(&channel->sem); 2500 } 2501 if (rc == 0) { 2502 return final_length; 2503 } else { 2504 return rc; 2505 } 2506 } 2507 2508 static void 2509 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2510 spdk_file_op_complete cb_fn, void *cb_arg) 2511 { 2512 struct spdk_fs_request *sync_req; 2513 struct spdk_fs_request *flush_req; 2514 
struct spdk_fs_cb_args *sync_args; 2515 struct spdk_fs_cb_args *flush_args; 2516 2517 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2518 2519 pthread_spin_lock(&file->lock); 2520 if (file->append_pos <= file->length_flushed) { 2521 BLOBFS_TRACE(file, "done - no data to flush\n"); 2522 pthread_spin_unlock(&file->lock); 2523 cb_fn(cb_arg, 0); 2524 return; 2525 } 2526 2527 sync_req = alloc_fs_request(channel); 2528 if (!sync_req) { 2529 pthread_spin_unlock(&file->lock); 2530 cb_fn(cb_arg, -ENOMEM); 2531 return; 2532 } 2533 sync_args = &sync_req->args; 2534 2535 flush_req = alloc_fs_request(channel); 2536 if (!flush_req) { 2537 pthread_spin_unlock(&file->lock); 2538 cb_fn(cb_arg, -ENOMEM); 2539 return; 2540 } 2541 flush_args = &flush_req->args; 2542 2543 sync_args->file = file; 2544 sync_args->fn.file_op = cb_fn; 2545 sync_args->arg = cb_arg; 2546 sync_args->op.sync.offset = file->append_pos; 2547 sync_args->op.sync.xattr_in_progress = false; 2548 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2549 pthread_spin_unlock(&file->lock); 2550 2551 flush_args->file = file; 2552 channel->send_request(__file_flush, flush_req); 2553 } 2554 2555 int 2556 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2557 { 2558 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2559 struct spdk_fs_cb_args args = {}; 2560 2561 args.sem = &channel->sem; 2562 _file_sync(file, channel, __wake_caller, &args); 2563 sem_wait(&channel->sem); 2564 2565 return args.rc; 2566 } 2567 2568 void 2569 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2570 spdk_file_op_complete cb_fn, void *cb_arg) 2571 { 2572 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2573 2574 _file_sync(file, channel, cb_fn, cb_arg); 2575 } 2576 2577 void 2578 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2579 { 2580 BLOBFS_TRACE(file, "priority=%u\n", priority); 2581 file->priority = priority; 2582 
2583 } 2584 2585 /* 2586 * Close routines 2587 */ 2588 2589 static void 2590 __file_close_async_done(void *ctx, int bserrno) 2591 { 2592 struct spdk_fs_request *req = ctx; 2593 struct spdk_fs_cb_args *args = &req->args; 2594 struct spdk_file *file = args->file; 2595 2596 spdk_trace_record(TRACE_BLOBFS_CLOSE, 0, 0, 0, file->trace_arg_name); 2597 2598 if (file->is_deleted) { 2599 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2600 return; 2601 } 2602 2603 args->fn.file_op(args->arg, bserrno); 2604 free_fs_request(req); 2605 } 2606 2607 static void 2608 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2609 { 2610 struct spdk_blob *blob; 2611 2612 pthread_spin_lock(&file->lock); 2613 if (file->ref_count == 0) { 2614 pthread_spin_unlock(&file->lock); 2615 __file_close_async_done(req, -EBADF); 2616 return; 2617 } 2618 2619 file->ref_count--; 2620 if (file->ref_count > 0) { 2621 pthread_spin_unlock(&file->lock); 2622 req->args.fn.file_op(req->args.arg, 0); 2623 free_fs_request(req); 2624 return; 2625 } 2626 2627 pthread_spin_unlock(&file->lock); 2628 2629 blob = file->blob; 2630 file->blob = NULL; 2631 spdk_blob_close(blob, __file_close_async_done, req); 2632 } 2633 2634 static void 2635 __file_close_async__sync_done(void *arg, int fserrno) 2636 { 2637 struct spdk_fs_request *req = arg; 2638 struct spdk_fs_cb_args *args = &req->args; 2639 2640 __file_close_async(args->file, req); 2641 } 2642 2643 void 2644 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2645 { 2646 struct spdk_fs_request *req; 2647 struct spdk_fs_cb_args *args; 2648 2649 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2650 if (req == NULL) { 2651 cb_fn(cb_arg, -ENOMEM); 2652 return; 2653 } 2654 2655 args = &req->args; 2656 args->file = file; 2657 args->fn.file_op = cb_fn; 2658 args->arg = cb_arg; 2659 2660 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2661 
} 2662 2663 static void 2664 __file_close(void *arg) 2665 { 2666 struct spdk_fs_request *req = arg; 2667 struct spdk_fs_cb_args *args = &req->args; 2668 struct spdk_file *file = args->file; 2669 2670 __file_close_async(file, req); 2671 } 2672 2673 int 2674 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2675 { 2676 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2677 struct spdk_fs_request *req; 2678 struct spdk_fs_cb_args *args; 2679 2680 req = alloc_fs_request(channel); 2681 if (req == NULL) { 2682 return -ENOMEM; 2683 } 2684 2685 args = &req->args; 2686 2687 spdk_file_sync(file, ctx); 2688 BLOBFS_TRACE(file, "name=%s\n", file->name); 2689 args->file = file; 2690 args->sem = &channel->sem; 2691 args->fn.file_op = __wake_caller; 2692 args->arg = req; 2693 channel->send_request(__file_close, req); 2694 sem_wait(&channel->sem); 2695 2696 return args->rc; 2697 } 2698 2699 int 2700 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2701 { 2702 if (size < sizeof(spdk_blob_id)) { 2703 return -EINVAL; 2704 } 2705 2706 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2707 2708 return sizeof(spdk_blob_id); 2709 } 2710 2711 static void 2712 cache_free_buffers(struct spdk_file *file) 2713 { 2714 BLOBFS_TRACE(file, "free=%s\n", file->name); 2715 pthread_spin_lock(&file->lock); 2716 pthread_spin_lock(&g_caches_lock); 2717 if (file->tree->present_mask == 0) { 2718 pthread_spin_unlock(&g_caches_lock); 2719 pthread_spin_unlock(&file->lock); 2720 return; 2721 } 2722 spdk_tree_free_buffers(file->tree); 2723 2724 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2725 /* If not freed, put it in the end of the queue */ 2726 if (file->tree->present_mask != 0) { 2727 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2728 } 2729 file->last = NULL; 2730 pthread_spin_unlock(&g_caches_lock); 2731 pthread_spin_unlock(&file->lock); 2732 } 2733 2734 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2735 
/* Log flag used by the BLOBFS_TRACE_RW() macro. */
SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW)