/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "spdk/conf.h"
#include "blobfs_internal.h"

#include "spdk/queue.h"
#include "spdk/thread.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

/* Debug-log helpers that prefix every message with the file's name. */
#define BLOBFS_TRACE(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args)

/* Total bytes handed to the cache mempool (4 GiB) and default cluster size (1 MiB). */
#define BLOBFS_DEFAULT_CACHE_SIZE	(4ULL * 1024 * 1024 * 1024)
#define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ	(1024 * 1024)

static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE;
/* Pool of CACHE_BUFFER_SIZE buffers; created by __initialize_cache(), freed by __free_cache(). */
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches;
/*
 * Number of filesystems currently initialized/loaded.  The cache pool is
 * created when this goes 0 -> 1 and freed when it returns to 0; both
 * transitions happen under g_cache_init_lock.
 */
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
/* NOTE(review): presumably guards g_caches; its users are outside this part of the file. */
static pthread_spinlock_t g_caches_lock;

/*
 * Return a cache buffer's data area to the global cache pool and free the
 * cache_buffer descriptor itself.
 */
void
spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

#define CACHE_READAHEAD_THRESHOLD	(128 * 1024)

/* In-memory state for one file of a blobfs filesystem. */
struct spdk_file {
	struct spdk_filesystem	*fs;		/* owning filesystem */
	struct spdk_blob	*blob;		/* set once the blob open completes */
	char			*name;		/* heap copy (strdup) of the file name */
	uint64_t		length;
	bool			is_deleted;	/* opens of such a file fail with -ENOENT */
	bool			open_for_writing;
	uint64_t		length_flushed;
	uint64_t		append_pos;
	uint64_t		seq_byte_count;
	uint64_t		next_seq_offset;
	uint32_t		priority;
	TAILQ_ENTRY(spdk_file)	tailq;		/* link in spdk_filesystem::files */
	spdk_blob_id		blobid;
	uint32_t		ref_count;	/* bumped once per open request */
	pthread_spinlock_t	lock;
	struct cache_buffer	*last;
	struct cache_tree	*tree;
	/* Open requests waiting for the blob open to complete. */
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	/* NOTE(review): presumably the link in g_caches; confirm against the cache code. */
	TAILQ_ENTRY(spdk_file)	cache_tailq;
};

/* Blob found at load time carrying the "is_deleted" xattr; deleted before load completes. */
struct spdk_deleted_file {
	spdk_blob_id	id;
	TAILQ_ENTRY(spdk_deleted_file)	tailq;
};

/*
 * One blobfs instance.  The addresses of md_target, sync_target and io_target
 * are each registered as a separate io_device; the channel-create callbacks
 * recover the filesystem from them with SPDK_CONTAINEROF.
 */
struct spdk_filesystem {
	struct spdk_blob_store	*bs;
	TAILQ_HEAD(, spdk_file)	files;
	struct spdk_bs_opts	bs_opts;
	struct spdk_bs_dev	*bdev;
	fs_send_request_fn	send_request;

	struct {
		uint32_t		max_ops;	/* size of the channel's request pool */
		struct spdk_io_channel	*sync_io_channel;
		struct spdk_fs_channel	*sync_fs_channel;
	} sync_target;

	struct {
		uint32_t		max_ops;
		struct spdk_io_channel	*md_io_channel;
		struct spdk_fs_channel	*md_fs_channel;
	} md_target;

	struct {
		uint32_t		max_ops;
	} io_target;
};

/*
 * Per-request context: the completion callback and its argument, plus
 * operation-specific state in the 'op' union.
 */
struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete		fs_op_with_handle;
		spdk_fs_op_complete			fs_op;
		spdk_file_op_with_handle_complete	file_op_with_handle;
		spdk_file_op_complete			file_op;
		spdk_file_stat_op_complete		stat_op;
	} fn;
	void *arg;
	sem_t *sem;		/* posted to wake a caller blocked in a synchronous API */
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;			/* result handed back to a synchronous caller */
	bool from_request;	/* set to true by alloc_fs_request() */
	union {
		struct {
			TAILQ_HEAD(, spdk_deleted_file) deleted_files;
		} fs_load;
		struct {
			uint64_t	length;
		} truncate;
		struct {
			struct spdk_io_channel	*channel;
			void		*user_buf;
			void		*pin_buf;
			int		is_read;
			off_t		offset;
			size_t		length;
			uint64_t	start_lba;
			uint64_t	num_lba;
			uint32_t	blocklen;
		} rw;
		struct {
			const char	*old_name;
			const char	*new_name;
		} rename;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
		} flush;
		struct {
			struct cache_buffer	*cache_buffer;
			uint64_t		length;
			uint64_t		offset;
		} readahead;
		struct {
			uint64_t			offset;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
			bool				xattr_in_progress;
		} sync;
		struct {
			uint32_t	num_clusters;
		} resize;
		struct {
			const char	*name;
			uint32_t	flags;
			TAILQ_ENTRY(spdk_fs_request)	tailq;
		} open;
		struct {
			const char	*name;
			struct spdk_blob	*blob;
		} create;
		struct {
			const char	*name;
		} delete;
		struct {
			const char	*name;
		} stat;
	} op;
};

static void cache_free_buffers(struct spdk_file *file);

/* Fill 'opts' with the default blobfs options (currently only the cluster size). */
void
spdk_fs_opts_init(struct spdk_blobfs_opts *opts)
{
	opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ;
}

/*
 * Create the global cache buffer pool and initialize the cache list and its
 * spinlock.  Called with g_cache_init_lock held when the first filesystem
 * comes up.
 */
static void
__initialize_cache(void)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE,
					   SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
					   SPDK_ENV_SOCKET_ID_ANY);
	if (!g_cache_pool) {
		SPDK_ERRLOG("Create mempool failed, you may "
			    "increase the memory and try again\n");
		/* NOTE(review): with NDEBUG this falls through with a NULL pool. */
		assert(false);
	}
	TAILQ_INIT(&g_caches);
	pthread_spin_init(&g_caches_lock, 0);
}

/* Destroy the global cache buffer pool; called when the last filesystem is unloaded. */
static void
__free_cache(void)
{
	assert(g_cache_pool != NULL);

	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;
}

/* Return the size in bytes of the blob backing 'file' (cluster count * cluster size). */
static uint64_t
__file_get_blob_size(struct spdk_file *file)
{
	uint64_t cluster_sz;

	cluster_sz = file->fs->bs_opts.cluster_sz;
	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
}

/*
 * A pooled request.  'args' is the first member, so a pointer to the request
 * and a pointer to its args share the same address.
 */
struct spdk_fs_request {
	struct spdk_fs_cb_args		args;
	TAILQ_ENTRY(spdk_fs_request)	link;		/* link in the channel's free list */
	struct spdk_fs_channel		*channel;	/* channel the request is returned to */
};

/* Per-channel state: a preallocated pool of requests and the blobstore channel. */
struct spdk_fs_channel {
	struct spdk_fs_request		*req_mem;	/* backing array for 'reqs' */
	TAILQ_HEAD(, spdk_fs_request)	reqs;		/* free requests */
	sem_t				sem;
	struct spdk_filesystem		*fs;
	struct spdk_io_channel		*bs_channel;
	fs_send_request_fn		send_request;
	bool				sync;		/* when true, 'reqs' is accessed under 'lock' */
	pthread_spinlock_t		lock;
};

/* For now, this is effectively an alias. But eventually we'll shift
 * some data members over.
*/ 259 struct spdk_fs_thread_ctx { 260 struct spdk_fs_channel ch; 261 }; 262 263 static struct spdk_fs_request * 264 alloc_fs_request(struct spdk_fs_channel *channel) 265 { 266 struct spdk_fs_request *req; 267 268 if (channel->sync) { 269 pthread_spin_lock(&channel->lock); 270 } 271 272 req = TAILQ_FIRST(&channel->reqs); 273 if (req) { 274 TAILQ_REMOVE(&channel->reqs, req, link); 275 } 276 277 if (channel->sync) { 278 pthread_spin_unlock(&channel->lock); 279 } 280 281 if (req == NULL) { 282 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 283 return NULL; 284 } 285 memset(req, 0, sizeof(*req)); 286 req->channel = channel; 287 req->args.from_request = true; 288 289 return req; 290 } 291 292 static void 293 free_fs_request(struct spdk_fs_request *req) 294 { 295 struct spdk_fs_channel *channel = req->channel; 296 297 if (channel->sync) { 298 pthread_spin_lock(&channel->lock); 299 } 300 301 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 302 303 if (channel->sync) { 304 pthread_spin_unlock(&channel->lock); 305 } 306 } 307 308 static int 309 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 310 uint32_t max_ops) 311 { 312 uint32_t i; 313 314 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 315 if (!channel->req_mem) { 316 return -1; 317 } 318 319 TAILQ_INIT(&channel->reqs); 320 sem_init(&channel->sem, 0, 0); 321 322 for (i = 0; i < max_ops; i++) { 323 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 324 } 325 326 channel->fs = fs; 327 328 return 0; 329 } 330 331 static int 332 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 333 { 334 struct spdk_filesystem *fs; 335 struct spdk_fs_channel *channel = ctx_buf; 336 337 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 338 339 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 340 } 341 342 static int 343 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 344 { 345 struct 
spdk_filesystem *fs; 346 struct spdk_fs_channel *channel = ctx_buf; 347 348 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 349 350 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 351 } 352 353 static int 354 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 355 { 356 struct spdk_filesystem *fs; 357 struct spdk_fs_channel *channel = ctx_buf; 358 359 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 360 361 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 362 } 363 364 static void 365 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 366 { 367 struct spdk_fs_channel *channel = ctx_buf; 368 369 free(channel->req_mem); 370 if (channel->bs_channel != NULL) { 371 spdk_bs_free_io_channel(channel->bs_channel); 372 } 373 } 374 375 static void 376 __send_request_direct(fs_request_fn fn, void *arg) 377 { 378 fn(arg); 379 } 380 381 static void 382 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 383 { 384 fs->bs = bs; 385 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 386 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 387 fs->md_target.md_fs_channel->send_request = __send_request_direct; 388 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 389 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 390 391 pthread_mutex_lock(&g_cache_init_lock); 392 if (g_fs_count == 0) { 393 __initialize_cache(); 394 } 395 g_fs_count++; 396 pthread_mutex_unlock(&g_cache_init_lock); 397 } 398 399 static void 400 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 401 { 402 struct spdk_fs_request *req = ctx; 403 struct spdk_fs_cb_args *args = &req->args; 404 struct spdk_filesystem *fs = args->fs; 405 406 if (bserrno == 0) { 407 common_fs_bs_init(fs, bs); 408 } else { 409 free(fs); 410 fs = NULL; 411 } 412 413 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 414 
free_fs_request(req); 415 } 416 417 static void 418 fs_conf_parse(void) 419 { 420 struct spdk_conf_section *sp; 421 422 sp = spdk_conf_find_section(NULL, "Blobfs"); 423 if (sp == NULL) { 424 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 425 return; 426 } 427 428 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 429 if (g_fs_cache_buffer_shift <= 0) { 430 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 431 } 432 } 433 434 static struct spdk_filesystem * 435 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 436 { 437 struct spdk_filesystem *fs; 438 439 fs = calloc(1, sizeof(*fs)); 440 if (fs == NULL) { 441 return NULL; 442 } 443 444 fs->bdev = dev; 445 fs->send_request = send_request_fn; 446 TAILQ_INIT(&fs->files); 447 448 fs->md_target.max_ops = 512; 449 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 450 sizeof(struct spdk_fs_channel), "blobfs_md"); 451 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 452 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 453 454 fs->sync_target.max_ops = 512; 455 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 456 sizeof(struct spdk_fs_channel), "blobfs_sync"); 457 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 458 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 459 460 fs->io_target.max_ops = 512; 461 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 462 sizeof(struct spdk_fs_channel), "blobfs_io"); 463 464 return fs; 465 } 466 467 static void 468 __wake_caller(void *arg, int fserrno) 469 { 470 struct spdk_fs_cb_args *args = arg; 471 472 args->rc = fserrno; 473 sem_post(args->sem); 474 } 475 476 void 477 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 478 fs_send_request_fn send_request_fn, 
479 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 480 { 481 struct spdk_filesystem *fs; 482 struct spdk_fs_request *req; 483 struct spdk_fs_cb_args *args; 484 struct spdk_bs_opts opts = {}; 485 486 fs = fs_alloc(dev, send_request_fn); 487 if (fs == NULL) { 488 cb_fn(cb_arg, NULL, -ENOMEM); 489 return; 490 } 491 492 fs_conf_parse(); 493 494 req = alloc_fs_request(fs->md_target.md_fs_channel); 495 if (req == NULL) { 496 spdk_put_io_channel(fs->md_target.md_io_channel); 497 spdk_io_device_unregister(&fs->md_target, NULL); 498 spdk_put_io_channel(fs->sync_target.sync_io_channel); 499 spdk_io_device_unregister(&fs->sync_target, NULL); 500 spdk_io_device_unregister(&fs->io_target, NULL); 501 free(fs); 502 cb_fn(cb_arg, NULL, -ENOMEM); 503 return; 504 } 505 506 args = &req->args; 507 args->fn.fs_op_with_handle = cb_fn; 508 args->arg = cb_arg; 509 args->fs = fs; 510 511 spdk_bs_opts_init(&opts); 512 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 513 if (opt) { 514 opts.cluster_sz = opt->cluster_sz; 515 } 516 spdk_bs_init(dev, &opts, init_cb, req); 517 } 518 519 static struct spdk_file * 520 file_alloc(struct spdk_filesystem *fs) 521 { 522 struct spdk_file *file; 523 524 file = calloc(1, sizeof(*file)); 525 if (file == NULL) { 526 return NULL; 527 } 528 529 file->tree = calloc(1, sizeof(*file->tree)); 530 if (file->tree == NULL) { 531 free(file); 532 return NULL; 533 } 534 535 file->fs = fs; 536 TAILQ_INIT(&file->open_requests); 537 TAILQ_INIT(&file->sync_requests); 538 pthread_spin_init(&file->lock, 0); 539 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 540 file->priority = SPDK_FILE_PRIORITY_LOW; 541 return file; 542 } 543 544 static void fs_load_done(void *ctx, int bserrno); 545 546 static int 547 _handle_deleted_files(struct spdk_fs_request *req) 548 { 549 struct spdk_fs_cb_args *args = &req->args; 550 struct spdk_filesystem *fs = args->fs; 551 552 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 553 struct spdk_deleted_file *deleted_file; 
554 555 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 556 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 557 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 558 free(deleted_file); 559 return 0; 560 } 561 562 return 1; 563 } 564 565 static void 566 fs_load_done(void *ctx, int bserrno) 567 { 568 struct spdk_fs_request *req = ctx; 569 struct spdk_fs_cb_args *args = &req->args; 570 struct spdk_filesystem *fs = args->fs; 571 572 /* The filesystem has been loaded. Now check if there are any files that 573 * were marked for deletion before last unload. Do not complete the 574 * fs_load callback until all of them have been deleted on disk. 575 */ 576 if (_handle_deleted_files(req) == 0) { 577 /* We found a file that's been marked for deleting but not actually 578 * deleted yet. This function will get called again once the delete 579 * operation is completed. 580 */ 581 return; 582 } 583 584 args->fn.fs_op_with_handle(args->arg, fs, 0); 585 free_fs_request(req); 586 587 } 588 589 static void 590 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 591 { 592 struct spdk_fs_request *req = ctx; 593 struct spdk_fs_cb_args *args = &req->args; 594 struct spdk_filesystem *fs = args->fs; 595 uint64_t *length; 596 const char *name; 597 uint32_t *is_deleted; 598 size_t value_len; 599 600 if (rc < 0) { 601 args->fn.fs_op_with_handle(args->arg, fs, rc); 602 free_fs_request(req); 603 return; 604 } 605 606 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 607 if (rc < 0) { 608 args->fn.fs_op_with_handle(args->arg, fs, rc); 609 free_fs_request(req); 610 return; 611 } 612 613 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 614 if (rc < 0) { 615 args->fn.fs_op_with_handle(args->arg, fs, rc); 616 free_fs_request(req); 617 return; 618 } 619 620 assert(value_len == 8); 621 622 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 
623 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 624 if (rc < 0) { 625 struct spdk_file *f; 626 627 f = file_alloc(fs); 628 if (f == NULL) { 629 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 630 free_fs_request(req); 631 return; 632 } 633 634 f->name = strdup(name); 635 f->blobid = spdk_blob_get_id(blob); 636 f->length = *length; 637 f->length_flushed = *length; 638 f->append_pos = *length; 639 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 640 } else { 641 struct spdk_deleted_file *deleted_file; 642 643 deleted_file = calloc(1, sizeof(*deleted_file)); 644 if (deleted_file == NULL) { 645 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 646 free_fs_request(req); 647 return; 648 } 649 deleted_file->id = spdk_blob_get_id(blob); 650 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 651 } 652 } 653 654 static void 655 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 656 { 657 struct spdk_fs_request *req = ctx; 658 struct spdk_fs_cb_args *args = &req->args; 659 struct spdk_filesystem *fs = args->fs; 660 struct spdk_bs_type bstype; 661 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 662 static const struct spdk_bs_type zeros; 663 664 if (bserrno != 0) { 665 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 666 free_fs_request(req); 667 free(fs); 668 return; 669 } 670 671 bstype = spdk_bs_get_bstype(bs); 672 673 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 674 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 675 spdk_bs_set_bstype(bs, blobfs_type); 676 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 677 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 678 SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 679 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 680 free_fs_request(req); 681 free(fs); 682 return; 683 } 684 685 common_fs_bs_init(fs, bs); 686 fs_load_done(req, 0); 687 } 688 689 static void 690 
spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 691 { 692 assert(fs != NULL); 693 spdk_io_device_unregister(&fs->md_target, NULL); 694 spdk_io_device_unregister(&fs->sync_target, NULL); 695 spdk_io_device_unregister(&fs->io_target, NULL); 696 free(fs); 697 } 698 699 static void 700 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 701 { 702 assert(fs != NULL); 703 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 704 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 705 } 706 707 void 708 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 709 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 710 { 711 struct spdk_filesystem *fs; 712 struct spdk_fs_cb_args *args; 713 struct spdk_fs_request *req; 714 struct spdk_bs_opts bs_opts; 715 716 fs = fs_alloc(dev, send_request_fn); 717 if (fs == NULL) { 718 cb_fn(cb_arg, NULL, -ENOMEM); 719 return; 720 } 721 722 fs_conf_parse(); 723 724 req = alloc_fs_request(fs->md_target.md_fs_channel); 725 if (req == NULL) { 726 spdk_fs_free_io_channels(fs); 727 spdk_fs_io_device_unregister(fs); 728 cb_fn(cb_arg, NULL, -ENOMEM); 729 return; 730 } 731 732 args = &req->args; 733 args->fn.fs_op_with_handle = cb_fn; 734 args->arg = cb_arg; 735 args->fs = fs; 736 TAILQ_INIT(&args->op.fs_load.deleted_files); 737 spdk_bs_opts_init(&bs_opts); 738 bs_opts.iter_cb_fn = iter_cb; 739 bs_opts.iter_cb_arg = req; 740 spdk_bs_load(dev, &bs_opts, load_cb, req); 741 } 742 743 static void 744 unload_cb(void *ctx, int bserrno) 745 { 746 struct spdk_fs_request *req = ctx; 747 struct spdk_fs_cb_args *args = &req->args; 748 struct spdk_filesystem *fs = args->fs; 749 struct spdk_file *file, *tmp; 750 751 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 752 TAILQ_REMOVE(&fs->files, file, tailq); 753 cache_free_buffers(file); 754 free(file->name); 755 free(file->tree); 756 free(file); 757 } 758 759 pthread_mutex_lock(&g_cache_init_lock); 760 g_fs_count--; 761 if (g_fs_count == 0) { 762 __free_cache(); 763 } 764 
pthread_mutex_unlock(&g_cache_init_lock); 765 766 args->fn.fs_op(args->arg, bserrno); 767 free(req); 768 769 spdk_fs_io_device_unregister(fs); 770 } 771 772 void 773 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 774 { 775 struct spdk_fs_request *req; 776 struct spdk_fs_cb_args *args; 777 778 /* 779 * We must free the md_channel before unloading the blobstore, so just 780 * allocate this request from the general heap. 781 */ 782 req = calloc(1, sizeof(*req)); 783 if (req == NULL) { 784 cb_fn(cb_arg, -ENOMEM); 785 return; 786 } 787 788 args = &req->args; 789 args->fn.fs_op = cb_fn; 790 args->arg = cb_arg; 791 args->fs = fs; 792 793 spdk_fs_free_io_channels(fs); 794 spdk_bs_unload(fs->bs, unload_cb, req); 795 } 796 797 static struct spdk_file * 798 fs_find_file(struct spdk_filesystem *fs, const char *name) 799 { 800 struct spdk_file *file; 801 802 TAILQ_FOREACH(file, &fs->files, tailq) { 803 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 804 return file; 805 } 806 } 807 808 return NULL; 809 } 810 811 void 812 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 813 spdk_file_stat_op_complete cb_fn, void *cb_arg) 814 { 815 struct spdk_file_stat stat; 816 struct spdk_file *f = NULL; 817 818 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 819 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 820 return; 821 } 822 823 f = fs_find_file(fs, name); 824 if (f != NULL) { 825 stat.blobid = f->blobid; 826 stat.size = f->append_pos >= f->length ? 
f->append_pos : f->length; 827 cb_fn(cb_arg, &stat, 0); 828 return; 829 } 830 831 cb_fn(cb_arg, NULL, -ENOENT); 832 } 833 834 static void 835 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 836 { 837 struct spdk_fs_request *req = arg; 838 struct spdk_fs_cb_args *args = &req->args; 839 840 args->rc = fserrno; 841 if (fserrno == 0) { 842 memcpy(args->arg, stat, sizeof(*stat)); 843 } 844 sem_post(args->sem); 845 } 846 847 static void 848 __file_stat(void *arg) 849 { 850 struct spdk_fs_request *req = arg; 851 struct spdk_fs_cb_args *args = &req->args; 852 853 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 854 args->fn.stat_op, req); 855 } 856 857 int 858 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 859 const char *name, struct spdk_file_stat *stat) 860 { 861 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 862 struct spdk_fs_request *req; 863 int rc; 864 865 req = alloc_fs_request(channel); 866 if (req == NULL) { 867 return -ENOMEM; 868 } 869 870 req->args.fs = fs; 871 req->args.op.stat.name = name; 872 req->args.fn.stat_op = __copy_stat; 873 req->args.arg = stat; 874 req->args.sem = &channel->sem; 875 channel->send_request(__file_stat, req); 876 sem_wait(&channel->sem); 877 878 rc = req->args.rc; 879 free_fs_request(req); 880 881 return rc; 882 } 883 884 static void 885 fs_create_blob_close_cb(void *ctx, int bserrno) 886 { 887 int rc; 888 struct spdk_fs_request *req = ctx; 889 struct spdk_fs_cb_args *args = &req->args; 890 891 rc = args->rc ? 
args->rc : bserrno; 892 args->fn.file_op(args->arg, rc); 893 free_fs_request(req); 894 } 895 896 static void 897 fs_create_blob_resize_cb(void *ctx, int bserrno) 898 { 899 struct spdk_fs_request *req = ctx; 900 struct spdk_fs_cb_args *args = &req->args; 901 struct spdk_file *f = args->file; 902 struct spdk_blob *blob = args->op.create.blob; 903 uint64_t length = 0; 904 905 args->rc = bserrno; 906 if (bserrno) { 907 spdk_blob_close(blob, fs_create_blob_close_cb, args); 908 return; 909 } 910 911 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 912 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 913 914 spdk_blob_close(blob, fs_create_blob_close_cb, args); 915 } 916 917 static void 918 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 919 { 920 struct spdk_fs_request *req = ctx; 921 struct spdk_fs_cb_args *args = &req->args; 922 923 if (bserrno) { 924 args->fn.file_op(args->arg, bserrno); 925 free_fs_request(req); 926 return; 927 } 928 929 args->op.create.blob = blob; 930 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 931 } 932 933 static void 934 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 935 { 936 struct spdk_fs_request *req = ctx; 937 struct spdk_fs_cb_args *args = &req->args; 938 struct spdk_file *f = args->file; 939 940 if (bserrno) { 941 args->fn.file_op(args->arg, bserrno); 942 free_fs_request(req); 943 return; 944 } 945 946 f->blobid = blobid; 947 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 948 } 949 950 void 951 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 952 spdk_file_op_complete cb_fn, void *cb_arg) 953 { 954 struct spdk_file *file; 955 struct spdk_fs_request *req; 956 struct spdk_fs_cb_args *args; 957 958 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 959 cb_fn(cb_arg, -ENAMETOOLONG); 960 return; 961 } 962 963 file = fs_find_file(fs, name); 964 if (file != NULL) { 965 cb_fn(cb_arg, -EEXIST); 966 
return; 967 } 968 969 file = file_alloc(fs); 970 if (file == NULL) { 971 cb_fn(cb_arg, -ENOMEM); 972 return; 973 } 974 975 req = alloc_fs_request(fs->md_target.md_fs_channel); 976 if (req == NULL) { 977 cb_fn(cb_arg, -ENOMEM); 978 return; 979 } 980 981 args = &req->args; 982 args->file = file; 983 args->fn.file_op = cb_fn; 984 args->arg = cb_arg; 985 986 file->name = strdup(name); 987 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 988 } 989 990 static void 991 __fs_create_file_done(void *arg, int fserrno) 992 { 993 struct spdk_fs_request *req = arg; 994 struct spdk_fs_cb_args *args = &req->args; 995 996 args->rc = fserrno; 997 sem_post(args->sem); 998 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 999 } 1000 1001 static void 1002 __fs_create_file(void *arg) 1003 { 1004 struct spdk_fs_request *req = arg; 1005 struct spdk_fs_cb_args *args = &req->args; 1006 1007 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1008 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1009 } 1010 1011 int 1012 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, const char *name) 1013 { 1014 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1015 struct spdk_fs_request *req; 1016 struct spdk_fs_cb_args *args; 1017 int rc; 1018 1019 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1020 1021 req = alloc_fs_request(channel); 1022 if (req == NULL) { 1023 return -ENOMEM; 1024 } 1025 1026 args = &req->args; 1027 args->fs = fs; 1028 args->op.create.name = name; 1029 args->sem = &channel->sem; 1030 fs->send_request(__fs_create_file, req); 1031 sem_wait(&channel->sem); 1032 rc = args->rc; 1033 free_fs_request(req); 1034 1035 return rc; 1036 } 1037 1038 static void 1039 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1040 { 1041 struct spdk_fs_request *req = ctx; 1042 struct spdk_fs_cb_args *args = &req->args; 1043 struct spdk_file *f = args->file; 
1044 1045 f->blob = blob; 1046 while (!TAILQ_EMPTY(&f->open_requests)) { 1047 req = TAILQ_FIRST(&f->open_requests); 1048 args = &req->args; 1049 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1050 args->fn.file_op_with_handle(args->arg, f, bserrno); 1051 free_fs_request(req); 1052 } 1053 } 1054 1055 static void 1056 fs_open_blob_create_cb(void *ctx, int bserrno) 1057 { 1058 struct spdk_fs_request *req = ctx; 1059 struct spdk_fs_cb_args *args = &req->args; 1060 struct spdk_file *file = args->file; 1061 struct spdk_filesystem *fs = args->fs; 1062 1063 if (file == NULL) { 1064 /* 1065 * This is from an open with CREATE flag - the file 1066 * is now created so look it up in the file list for this 1067 * filesystem. 1068 */ 1069 file = fs_find_file(fs, args->op.open.name); 1070 assert(file != NULL); 1071 args->file = file; 1072 } 1073 1074 file->ref_count++; 1075 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1076 if (file->ref_count == 1) { 1077 assert(file->blob == NULL); 1078 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1079 } else if (file->blob != NULL) { 1080 fs_open_blob_done(req, file->blob, 0); 1081 } else { 1082 /* 1083 * The blob open for this file is in progress due to a previous 1084 * open request. When that open completes, it will invoke the 1085 * open callback for this request. 
1086 */ 1087 } 1088 } 1089 1090 void 1091 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1092 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1093 { 1094 struct spdk_file *f = NULL; 1095 struct spdk_fs_request *req; 1096 struct spdk_fs_cb_args *args; 1097 1098 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1099 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1100 return; 1101 } 1102 1103 f = fs_find_file(fs, name); 1104 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1105 cb_fn(cb_arg, NULL, -ENOENT); 1106 return; 1107 } 1108 1109 if (f != NULL && f->is_deleted == true) { 1110 cb_fn(cb_arg, NULL, -ENOENT); 1111 return; 1112 } 1113 1114 req = alloc_fs_request(fs->md_target.md_fs_channel); 1115 if (req == NULL) { 1116 cb_fn(cb_arg, NULL, -ENOMEM); 1117 return; 1118 } 1119 1120 args = &req->args; 1121 args->fn.file_op_with_handle = cb_fn; 1122 args->arg = cb_arg; 1123 args->file = f; 1124 args->fs = fs; 1125 args->op.open.name = name; 1126 1127 if (f == NULL) { 1128 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1129 } else { 1130 fs_open_blob_create_cb(req, 0); 1131 } 1132 } 1133 1134 static void 1135 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1136 { 1137 struct spdk_fs_request *req = arg; 1138 struct spdk_fs_cb_args *args = &req->args; 1139 1140 args->file = file; 1141 __wake_caller(args, bserrno); 1142 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1143 } 1144 1145 static void 1146 __fs_open_file(void *arg) 1147 { 1148 struct spdk_fs_request *req = arg; 1149 struct spdk_fs_cb_args *args = &req->args; 1150 1151 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1152 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1153 __fs_open_file_done, req); 1154 } 1155 1156 int 1157 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1158 const char *name, uint32_t flags, struct spdk_file 
**file) 1159 { 1160 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1161 struct spdk_fs_request *req; 1162 struct spdk_fs_cb_args *args; 1163 int rc; 1164 1165 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1166 1167 req = alloc_fs_request(channel); 1168 if (req == NULL) { 1169 return -ENOMEM; 1170 } 1171 1172 args = &req->args; 1173 args->fs = fs; 1174 args->op.open.name = name; 1175 args->op.open.flags = flags; 1176 args->sem = &channel->sem; 1177 fs->send_request(__fs_open_file, req); 1178 sem_wait(&channel->sem); 1179 rc = args->rc; 1180 if (rc == 0) { 1181 *file = args->file; 1182 } else { 1183 *file = NULL; 1184 } 1185 free_fs_request(req); 1186 1187 return rc; 1188 } 1189 1190 static void 1191 fs_rename_blob_close_cb(void *ctx, int bserrno) 1192 { 1193 struct spdk_fs_request *req = ctx; 1194 struct spdk_fs_cb_args *args = &req->args; 1195 1196 args->fn.fs_op(args->arg, bserrno); 1197 free_fs_request(req); 1198 } 1199 1200 static void 1201 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1202 { 1203 struct spdk_fs_request *req = ctx; 1204 struct spdk_fs_cb_args *args = &req->args; 1205 const char *new_name = args->op.rename.new_name; 1206 1207 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1208 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1209 } 1210 1211 static void 1212 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1213 { 1214 struct spdk_fs_cb_args *args = &req->args; 1215 struct spdk_file *f; 1216 1217 f = fs_find_file(args->fs, args->op.rename.old_name); 1218 if (f == NULL) { 1219 args->fn.fs_op(args->arg, -ENOENT); 1220 free_fs_request(req); 1221 return; 1222 } 1223 1224 free(f->name); 1225 f->name = strdup(args->op.rename.new_name); 1226 args->file = f; 1227 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1228 } 1229 1230 static void 1231 fs_rename_delete_done(void *arg, int fserrno) 1232 { 1233 __spdk_fs_md_rename_file(arg); 1234 } 1235 1236 void 
1237 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1238 const char *old_name, const char *new_name, 1239 spdk_file_op_complete cb_fn, void *cb_arg) 1240 { 1241 struct spdk_file *f; 1242 struct spdk_fs_request *req; 1243 struct spdk_fs_cb_args *args; 1244 1245 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1246 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1247 cb_fn(cb_arg, -ENAMETOOLONG); 1248 return; 1249 } 1250 1251 req = alloc_fs_request(fs->md_target.md_fs_channel); 1252 if (req == NULL) { 1253 cb_fn(cb_arg, -ENOMEM); 1254 return; 1255 } 1256 1257 args = &req->args; 1258 args->fn.fs_op = cb_fn; 1259 args->fs = fs; 1260 args->arg = cb_arg; 1261 args->op.rename.old_name = old_name; 1262 args->op.rename.new_name = new_name; 1263 1264 f = fs_find_file(fs, new_name); 1265 if (f == NULL) { 1266 __spdk_fs_md_rename_file(req); 1267 return; 1268 } 1269 1270 /* 1271 * The rename overwrites an existing file. So delete the existing file, then 1272 * do the actual rename. 
1273 */ 1274 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1275 } 1276 1277 static void 1278 __fs_rename_file_done(void *arg, int fserrno) 1279 { 1280 struct spdk_fs_request *req = arg; 1281 struct spdk_fs_cb_args *args = &req->args; 1282 1283 __wake_caller(args, fserrno); 1284 } 1285 1286 static void 1287 __fs_rename_file(void *arg) 1288 { 1289 struct spdk_fs_request *req = arg; 1290 struct spdk_fs_cb_args *args = &req->args; 1291 1292 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1293 __fs_rename_file_done, req); 1294 } 1295 1296 int 1297 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1298 const char *old_name, const char *new_name) 1299 { 1300 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1301 struct spdk_fs_request *req; 1302 struct spdk_fs_cb_args *args; 1303 int rc; 1304 1305 req = alloc_fs_request(channel); 1306 if (req == NULL) { 1307 return -ENOMEM; 1308 } 1309 1310 args = &req->args; 1311 1312 args->fs = fs; 1313 args->op.rename.old_name = old_name; 1314 args->op.rename.new_name = new_name; 1315 args->sem = &channel->sem; 1316 fs->send_request(__fs_rename_file, req); 1317 sem_wait(&channel->sem); 1318 rc = args->rc; 1319 free_fs_request(req); 1320 return rc; 1321 } 1322 1323 static void 1324 blob_delete_cb(void *ctx, int bserrno) 1325 { 1326 struct spdk_fs_request *req = ctx; 1327 struct spdk_fs_cb_args *args = &req->args; 1328 1329 args->fn.file_op(args->arg, bserrno); 1330 free_fs_request(req); 1331 } 1332 1333 void 1334 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1335 spdk_file_op_complete cb_fn, void *cb_arg) 1336 { 1337 struct spdk_file *f; 1338 spdk_blob_id blobid; 1339 struct spdk_fs_request *req; 1340 struct spdk_fs_cb_args *args; 1341 1342 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1343 1344 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1345 cb_fn(cb_arg, -ENAMETOOLONG); 
1346 return; 1347 } 1348 1349 f = fs_find_file(fs, name); 1350 if (f == NULL) { 1351 cb_fn(cb_arg, -ENOENT); 1352 return; 1353 } 1354 1355 req = alloc_fs_request(fs->md_target.md_fs_channel); 1356 if (req == NULL) { 1357 cb_fn(cb_arg, -ENOMEM); 1358 return; 1359 } 1360 1361 args = &req->args; 1362 args->fn.file_op = cb_fn; 1363 args->arg = cb_arg; 1364 1365 if (f->ref_count > 0) { 1366 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1367 f->is_deleted = true; 1368 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1369 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1370 return; 1371 } 1372 1373 TAILQ_REMOVE(&fs->files, f, tailq); 1374 1375 cache_free_buffers(f); 1376 1377 blobid = f->blobid; 1378 1379 free(f->name); 1380 free(f->tree); 1381 free(f); 1382 1383 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1384 } 1385 1386 static void 1387 __fs_delete_file_done(void *arg, int fserrno) 1388 { 1389 struct spdk_fs_request *req = arg; 1390 struct spdk_fs_cb_args *args = &req->args; 1391 1392 __wake_caller(args, fserrno); 1393 } 1394 1395 static void 1396 __fs_delete_file(void *arg) 1397 { 1398 struct spdk_fs_request *req = arg; 1399 struct spdk_fs_cb_args *args = &req->args; 1400 1401 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1402 } 1403 1404 int 1405 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_fs_thread_ctx *ctx, 1406 const char *name) 1407 { 1408 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1409 struct spdk_fs_request *req; 1410 struct spdk_fs_cb_args *args; 1411 int rc; 1412 1413 req = alloc_fs_request(channel); 1414 if (req == NULL) { 1415 return -ENOMEM; 1416 } 1417 1418 args = &req->args; 1419 args->fs = fs; 1420 args->op.delete.name = name; 1421 args->sem = &channel->sem; 1422 fs->send_request(__fs_delete_file, req); 1423 sem_wait(&channel->sem); 1424 rc = args->rc; 1425 free_fs_request(req); 1426 1427 return rc; 
1428 } 1429 1430 spdk_fs_iter 1431 spdk_fs_iter_first(struct spdk_filesystem *fs) 1432 { 1433 struct spdk_file *f; 1434 1435 f = TAILQ_FIRST(&fs->files); 1436 return f; 1437 } 1438 1439 spdk_fs_iter 1440 spdk_fs_iter_next(spdk_fs_iter iter) 1441 { 1442 struct spdk_file *f = iter; 1443 1444 if (f == NULL) { 1445 return NULL; 1446 } 1447 1448 f = TAILQ_NEXT(f, tailq); 1449 return f; 1450 } 1451 1452 const char * 1453 spdk_file_get_name(struct spdk_file *file) 1454 { 1455 return file->name; 1456 } 1457 1458 uint64_t 1459 spdk_file_get_length(struct spdk_file *file) 1460 { 1461 uint64_t length; 1462 1463 assert(file != NULL); 1464 1465 length = file->append_pos >= file->length ? file->append_pos : file->length; 1466 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length); 1467 return length; 1468 } 1469 1470 static void 1471 fs_truncate_complete_cb(void *ctx, int bserrno) 1472 { 1473 struct spdk_fs_request *req = ctx; 1474 struct spdk_fs_cb_args *args = &req->args; 1475 1476 args->fn.file_op(args->arg, bserrno); 1477 free_fs_request(req); 1478 } 1479 1480 static void 1481 fs_truncate_resize_cb(void *ctx, int bserrno) 1482 { 1483 struct spdk_fs_request *req = ctx; 1484 struct spdk_fs_cb_args *args = &req->args; 1485 struct spdk_file *file = args->file; 1486 uint64_t *length = &args->op.truncate.length; 1487 1488 if (bserrno) { 1489 args->fn.file_op(args->arg, bserrno); 1490 free_fs_request(req); 1491 return; 1492 } 1493 1494 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1495 1496 file->length = *length; 1497 if (file->append_pos > file->length) { 1498 file->append_pos = file->length; 1499 } 1500 1501 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1502 } 1503 1504 static uint64_t 1505 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1506 { 1507 return (length + cluster_sz - 1) / cluster_sz; 1508 } 1509 1510 void 1511 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1512 
spdk_file_op_complete cb_fn, void *cb_arg) 1513 { 1514 struct spdk_filesystem *fs; 1515 size_t num_clusters; 1516 struct spdk_fs_request *req; 1517 struct spdk_fs_cb_args *args; 1518 1519 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1520 if (length == file->length) { 1521 cb_fn(cb_arg, 0); 1522 return; 1523 } 1524 1525 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1526 if (req == NULL) { 1527 cb_fn(cb_arg, -ENOMEM); 1528 return; 1529 } 1530 1531 args = &req->args; 1532 args->fn.file_op = cb_fn; 1533 args->arg = cb_arg; 1534 args->file = file; 1535 args->op.truncate.length = length; 1536 fs = file->fs; 1537 1538 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1539 1540 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1541 } 1542 1543 static void 1544 __truncate(void *arg) 1545 { 1546 struct spdk_fs_request *req = arg; 1547 struct spdk_fs_cb_args *args = &req->args; 1548 1549 spdk_file_truncate_async(args->file, args->op.truncate.length, 1550 args->fn.file_op, args); 1551 } 1552 1553 int 1554 spdk_file_truncate(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 1555 uint64_t length) 1556 { 1557 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 1558 struct spdk_fs_request *req; 1559 struct spdk_fs_cb_args *args; 1560 int rc; 1561 1562 req = alloc_fs_request(channel); 1563 if (req == NULL) { 1564 return -ENOMEM; 1565 } 1566 1567 args = &req->args; 1568 1569 args->file = file; 1570 args->op.truncate.length = length; 1571 args->fn.file_op = __wake_caller; 1572 args->sem = &channel->sem; 1573 1574 channel->send_request(__truncate, req); 1575 sem_wait(&channel->sem); 1576 rc = args->rc; 1577 free_fs_request(req); 1578 1579 return rc; 1580 } 1581 1582 static void 1583 __rw_done(void *ctx, int bserrno) 1584 { 1585 struct spdk_fs_request *req = ctx; 1586 struct spdk_fs_cb_args *args = &req->args; 1587 1588 spdk_free(args->op.rw.pin_buf); 1589 
args->fn.file_op(args->arg, bserrno); 1590 free_fs_request(req); 1591 } 1592 1593 static void 1594 __read_done(void *ctx, int bserrno) 1595 { 1596 struct spdk_fs_request *req = ctx; 1597 struct spdk_fs_cb_args *args = &req->args; 1598 1599 assert(req != NULL); 1600 if (args->op.rw.is_read) { 1601 memcpy(args->op.rw.user_buf, 1602 args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1603 args->op.rw.length); 1604 __rw_done(req, 0); 1605 } else { 1606 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1607 args->op.rw.user_buf, 1608 args->op.rw.length); 1609 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1610 args->op.rw.pin_buf, 1611 args->op.rw.start_lba, args->op.rw.num_lba, 1612 __rw_done, req); 1613 } 1614 } 1615 1616 static void 1617 __do_blob_read(void *ctx, int fserrno) 1618 { 1619 struct spdk_fs_request *req = ctx; 1620 struct spdk_fs_cb_args *args = &req->args; 1621 1622 if (fserrno) { 1623 __rw_done(req, fserrno); 1624 return; 1625 } 1626 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1627 args->op.rw.pin_buf, 1628 args->op.rw.start_lba, args->op.rw.num_lba, 1629 __read_done, req); 1630 } 1631 1632 static void 1633 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1634 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1635 { 1636 uint64_t end_lba; 1637 1638 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1639 *start_lba = offset / *lba_size; 1640 end_lba = (offset + length - 1) / *lba_size; 1641 *num_lba = (end_lba - *start_lba + 1); 1642 } 1643 1644 static void 1645 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1646 void *payload, uint64_t offset, uint64_t length, 1647 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1648 { 1649 struct spdk_fs_request *req; 1650 struct spdk_fs_cb_args *args; 1651 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1652 uint64_t start_lba, num_lba, pin_buf_length; 
1653 uint32_t lba_size; 1654 1655 if (is_read && offset + length > file->length) { 1656 cb_fn(cb_arg, -EINVAL); 1657 return; 1658 } 1659 1660 req = alloc_fs_request(channel); 1661 if (req == NULL) { 1662 cb_fn(cb_arg, -ENOMEM); 1663 return; 1664 } 1665 1666 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1667 1668 args = &req->args; 1669 args->fn.file_op = cb_fn; 1670 args->arg = cb_arg; 1671 args->file = file; 1672 args->op.rw.channel = channel->bs_channel; 1673 args->op.rw.user_buf = payload; 1674 args->op.rw.is_read = is_read; 1675 args->op.rw.offset = offset; 1676 args->op.rw.length = length; 1677 args->op.rw.blocklen = lba_size; 1678 1679 pin_buf_length = num_lba * lba_size; 1680 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, lba_size, NULL, 1681 SPDK_ENV_SOCKET_ID_ANY, SPDK_MALLOC_DMA); 1682 if (args->op.rw.pin_buf == NULL) { 1683 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1684 file->name, offset, length); 1685 free_fs_request(req); 1686 cb_fn(cb_arg, -ENOMEM); 1687 return; 1688 } 1689 1690 args->op.rw.start_lba = start_lba; 1691 args->op.rw.num_lba = num_lba; 1692 1693 if (!is_read && file->length < offset + length) { 1694 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1695 } else { 1696 __do_blob_read(req, 0); 1697 } 1698 } 1699 1700 void 1701 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1702 void *payload, uint64_t offset, uint64_t length, 1703 spdk_file_op_complete cb_fn, void *cb_arg) 1704 { 1705 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1706 } 1707 1708 void 1709 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1710 void *payload, uint64_t offset, uint64_t length, 1711 spdk_file_op_complete cb_fn, void *cb_arg) 1712 { 1713 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1714 file->name, offset, length); 1715 __readwrite(file, channel, 
payload, offset, length, cb_fn, cb_arg, 1); 1716 } 1717 1718 struct spdk_io_channel * 1719 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1720 { 1721 struct spdk_io_channel *io_channel; 1722 struct spdk_fs_channel *fs_channel; 1723 1724 io_channel = spdk_get_io_channel(&fs->io_target); 1725 fs_channel = spdk_io_channel_get_ctx(io_channel); 1726 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1727 fs_channel->send_request = __send_request_direct; 1728 1729 return io_channel; 1730 } 1731 1732 void 1733 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1734 { 1735 spdk_put_io_channel(channel); 1736 } 1737 1738 struct spdk_fs_thread_ctx * 1739 spdk_fs_alloc_thread_ctx(struct spdk_filesystem *fs) 1740 { 1741 struct spdk_io_channel *io_channel; 1742 struct spdk_fs_channel *fs_channel; 1743 1744 io_channel = spdk_get_io_channel(&fs->io_target); 1745 fs_channel = spdk_io_channel_get_ctx(io_channel); 1746 fs_channel->send_request = fs->send_request; 1747 fs_channel->sync = 1; 1748 pthread_spin_init(&fs_channel->lock, 0); 1749 1750 /* These two types are currently equivalent */ 1751 return (struct spdk_fs_thread_ctx *)fs_channel; 1752 } 1753 1754 1755 void 1756 spdk_fs_free_thread_ctx(struct spdk_fs_thread_ctx *ctx) 1757 { 1758 spdk_put_io_channel(spdk_io_channel_from_ctx(ctx)); 1759 } 1760 1761 void 1762 spdk_fs_set_cache_size(uint64_t size_in_mb) 1763 { 1764 g_fs_cache_size = size_in_mb * 1024 * 1024; 1765 } 1766 1767 uint64_t 1768 spdk_fs_get_cache_size(void) 1769 { 1770 return g_fs_cache_size / (1024 * 1024); 1771 } 1772 1773 static void __file_flush(void *_args); 1774 1775 static void * 1776 alloc_cache_memory_buffer(struct spdk_file *context) 1777 { 1778 struct spdk_file *file; 1779 void *buf; 1780 1781 buf = spdk_mempool_get(g_cache_pool); 1782 if (buf != NULL) { 1783 return buf; 1784 } 1785 1786 pthread_spin_lock(&g_caches_lock); 1787 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1788 if (!file->open_for_writing && 1789 file->priority == 
SPDK_FILE_PRIORITY_LOW &&
		    file != context) {
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	/* file is NULL here when the loop ran to the end without a match. */
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	/* Next pass: any other file that is not open for writing. */
	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (!file->open_for_writing && file != context) {
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	/* Last pass: any file other than the one asking for the buffer. */
	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (file != context) {
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	return NULL;
}

/*
 * Allocate a cache buffer for the given offset of file and insert it into
 * the file's cache tree. Returns NULL when either the descriptor or the data
 * buffer cannot be allocated.
 */
static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n");
		return NULL;
	}

	buf->buf = alloc_cache_memory_buffer(file);
	while (buf->buf == NULL) {
		/*
		 * TODO: alloc_cache_memory_buffer() should eventually free
		 * some buffers. Need a more sophisticated check here, instead
		 * of just bailing if 100 tries does not result in getting a
		 * free buffer. This will involve using the sync channel's
		 * semaphore to block until a buffer becomes available.
*/
		if (count++ == 100) {
			SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n",
				    file, offset);
			free(buf);
			return NULL;
		}
		buf->buf = alloc_cache_memory_buffer(file);
	}

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	pthread_spin_lock(&g_caches_lock);
	/* A file whose tree holds no buffers yet is added to the global cache list. */
	if (file->tree->present_mask == 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	}
	/* spdk_tree_insert_buffer() returns the tree pointer to use from now on. */
	file->tree = spdk_tree_insert_buffer(file->tree, buf);
	pthread_spin_unlock(&g_caches_lock);

	return buf;
}

/*
 * Add a new cache buffer at the file's append position and make it the
 * file's current ("last") buffer. Returns NULL when no buffer is available.
 *
 * The asserts require the previous last buffer to be completely filled and
 * the append position to be aligned to CACHE_BUFFER_SIZE.
 */
static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n");
		return NULL;
	}

	file->last = last;

	return last;
}

static void __check_sync_reqs(struct spdk_file *file);

/*
 * Completion of the metadata sync started by __check_sync_reqs(): removes the
 * first pending sync request from the file, completes it with bserrno, then
 * looks for further sync requests that can be satisfied.
 *
 * NOTE(review): this always takes TAILQ_FIRST, while __check_sync_reqs()
 * selects the first request whose offset is already flushed; the assert below
 * presumes these are the same request - confirm.
 */
static void
__file_cache_finish_sync(void *ctx, int bserrno)
{
	struct spdk_file *file = ctx;
	struct spdk_fs_request *sync_req;
	struct spdk_fs_cb_args *sync_args;

	pthread_spin_lock(&file->lock);
	sync_req = TAILQ_FIRST(&file->sync_requests);
	sync_args = &sync_req->args;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	/* The user callback and the re-check both run without the file lock held. */
	sync_args->fn.file_op(sync_args->arg, bserrno);
	__check_sync_reqs(file);

	pthread_spin_lock(&file->lock);
	free_fs_request(sync_req);
	pthread_spin_unlock(&file->lock);
}

/*
 * Release callback arguments, either as a standalone allocation or as part
 * of the request they are embedded in.
 */
static void
__free_args(struct
spdk_fs_cb_args *args)
{
	struct spdk_fs_request *req;

	if (!args->from_request) {
		/* Standalone arguments are plain heap memory. */
		free(args);
	} else {
		/* Depends on args being at the start of the spdk_fs_request structure. */
		req = (struct spdk_fs_request *)args;
		free_fs_request(req);
	}
}

/*
 * Look for a pending sync request whose offset has already been flushed. If
 * one exists and no xattr update is in flight for it, store the flushed
 * length in the blob's "length" xattr and start a metadata sync, which
 * completes in __file_cache_finish_sync().
 */
static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	/* sync_req is NULL after the loop when no request qualifies. */
	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		spdk_blob_set_xattr(file->blob, "length", &file->length_flushed,
				    sizeof(file->length_flushed));

		/* The lock is dropped before the metadata sync is issued. */
		pthread_spin_unlock(&file->lock);
		spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

/*
 * Completion of the blob write issued by __file_flush(): accounts for the
 * flushed bytes, then checks pending sync requests and continues flushing.
 *
 * NOTE(review): bserrno is not examined here, so the flushed length advances
 * even when the write reported an error - confirm this is intended.
 */
static void
__file_flush_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		/* Move on to the buffer that holds the next unflushed offset, if any. */
		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 * blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	/* The same args are reused for the next flush step. */
	__file_flush(args);
}

/*
 * Write the not yet flushed part of the cache buffer at the file's flushed
 * length to the blob. Takes ownership of _args: they are either released
 * here or handed on to __file_flush_done().
 */
static void
__file_flush(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_lba, num_lba;
	uint32_t lba_size;

	pthread_spin_lock(&file->lock);
	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress) {
		/*
		 * There is either no data to flush, or a flush I/O is already in
		 * progress. So return immediately - if a flush I/O is in
		 * progress we will flush more data after that is completed.
		 */
		__free_args(args);
		if (next == NULL) {
			/*
			 * For cases where a file's cache was evicted, and then the
			 * file was later appended, we will write the data directly
			 * to disk and bypass cache. So just update length_flushed
			 * here to reflect that all data was already written to disk.
			 */
			file->length_flushed = file->append_pos;
		}
		pthread_spin_unlock(&file->lock);
		if (next == NULL) {
			/*
			 * There is no data to flush, but we still need to check for any
			 * outstanding sync requests to make sure metadata gets updated.
*/
			__check_sync_reqs(file);
		}
		return;
	}

	/* Byte range of this buffer that is filled but not yet flushed. */
	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		/* Nothing new in the buffer: release the args and stop. */
		__free_args(args);
		pthread_spin_unlock(&file->lock);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba);

	/* Mark the buffer busy so that a concurrent flush backs off. */
	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_lba, num_lba);
	pthread_spin_unlock(&file->lock);
	/* The write starts at the io unit boundary at or below offset. */
	spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			   next->buf + (start_lba * lba_size) - next->offset,
			   start_lba, num_lba, __file_flush_done, args);
}

/* Last step of extending a blob: wake the waiting caller with the status. */
static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
}

/*
 * The blob resize has finished: on error wake the caller right away,
 * otherwise persist the blob metadata first.
 */
static void
__file_extend_resize_cb(void *_args, int bserrno)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (bserrno) {
		__wake_caller(args, bserrno);
		return;
	}

	spdk_blob_sync_md(file->blob, __file_extend_done, args);
}

/* Message handler that resizes the file's blob to op.resize.num_clusters clusters. */
static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args);
}

/*
 * Completion of a read/write sent by __send_rw_from_file(): wakes the
 * waiting caller and then releases the arguments.
 */
static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args, bserrno);
	__free_args(args);
}

/*
 * Message handler that performs the read or write described by _args through
 * the async file API on the filesystem's sync I/O channel.
 */
static void
__rw_from_file(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file,
file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
				     args->op.rw.offset, args->op.rw.length,
				     __rw_from_file_done, args);
	} else {
		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
				      args->op.rw.offset, args->op.rw.length,
				      __rw_from_file_done, args);
	}
}

/*
 * Queue a read or write of [offset, offset + length) that bypasses the cache.
 *
 * Returns 0 once the request has been handed to fs->send_request(), or
 * -ENOMEM when the arguments cannot be allocated. In the -ENOMEM case sem is
 * posted here, so a caller that waits on sem afterwards does not block.
 */
static int
__send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
		    uint64_t offset, uint64_t length, bool is_read)
{
	struct spdk_fs_cb_args *args;

	args = calloc(1, sizeof(*args));
	if (args == NULL) {
		sem_post(sem);
		return -ENOMEM;
	}

	args->file = file;
	args->sem = sem;
	args->op.rw.user_buf = payload;
	args->op.rw.offset = offset;
	args->op.rw.length = length;
	args->op.rw.is_read = is_read;
	file->fs->send_request(__rw_from_file, args);
	return 0;
}

/*
 * Blocking append of length bytes from payload to file.
 *
 * Only appends are accepted: offset must equal the file's current append
 * position, otherwise -EINVAL is returned. A zero length write returns 0
 * without touching the file.
 */
int
spdk_file_write(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx,
		void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx;
	struct spdk_fs_cb_args *args;
	uint64_t rem_length, copy, blob_size, cluster_sz;
	uint32_t cache_buffers_filled = 0;
	uint8_t *cur_payload;
	struct cache_buffer *last;

	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);

	if (length == 0) {
		return 0;
	}

	if (offset != file->append_pos) {
		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
		return -EINVAL;
	}

	pthread_spin_lock(&file->lock);
	file->open_for_writing = true;

	/* Start a new cache buffer when none is active and the append position is aligned. */
	if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) {
		cache_append_buffer(file);
	}

	if (file->last == NULL) {
		/* No cache buffer is available: send the write without using the cache. */
		int rc;

		file->append_pos += length;
		pthread_spin_unlock(&file->lock);
		rc = __send_rw_from_file(file, &channel->sem, payload,
2171 offset, length, false); 2172 sem_wait(&channel->sem); 2173 return rc; 2174 } 2175 2176 blob_size = __file_get_blob_size(file); 2177 2178 if ((offset + length) > blob_size) { 2179 struct spdk_fs_cb_args extend_args = {}; 2180 2181 cluster_sz = file->fs->bs_opts.cluster_sz; 2182 extend_args.sem = &channel->sem; 2183 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2184 extend_args.file = file; 2185 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2186 pthread_spin_unlock(&file->lock); 2187 file->fs->send_request(__file_extend_blob, &extend_args); 2188 sem_wait(&channel->sem); 2189 if (extend_args.rc) { 2190 return extend_args.rc; 2191 } 2192 } 2193 2194 last = file->last; 2195 rem_length = length; 2196 cur_payload = payload; 2197 while (rem_length > 0) { 2198 copy = last->buf_size - last->bytes_filled; 2199 if (copy > rem_length) { 2200 copy = rem_length; 2201 } 2202 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2203 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2204 file->append_pos += copy; 2205 if (file->length < file->append_pos) { 2206 file->length = file->append_pos; 2207 } 2208 cur_payload += copy; 2209 last->bytes_filled += copy; 2210 rem_length -= copy; 2211 if (last->bytes_filled == last->buf_size) { 2212 cache_buffers_filled++; 2213 last = cache_append_buffer(file); 2214 if (last == NULL) { 2215 BLOBFS_TRACE(file, "nomem\n"); 2216 pthread_spin_unlock(&file->lock); 2217 return -ENOMEM; 2218 } 2219 } 2220 } 2221 2222 pthread_spin_unlock(&file->lock); 2223 2224 if (cache_buffers_filled == 0) { 2225 return 0; 2226 } 2227 2228 args = calloc(1, sizeof(*args)); 2229 if (args == NULL) { 2230 return -ENOMEM; 2231 } 2232 2233 args->file = file; 2234 file->fs->send_request(__file_flush, args); 2235 return 0; 2236 } 2237 2238 static void 2239 __readahead_done(void *arg, int bserrno) 2240 { 2241 struct spdk_fs_cb_args *args = arg; 2242 
struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2243 struct spdk_file *file = args->file; 2244 2245 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2246 2247 pthread_spin_lock(&file->lock); 2248 cache_buffer->bytes_filled = args->op.readahead.length; 2249 cache_buffer->bytes_flushed = args->op.readahead.length; 2250 cache_buffer->in_progress = false; 2251 pthread_spin_unlock(&file->lock); 2252 2253 __free_args(args); 2254 } 2255 2256 static void 2257 __readahead(void *_args) 2258 { 2259 struct spdk_fs_cb_args *args = _args; 2260 struct spdk_file *file = args->file; 2261 uint64_t offset, length, start_lba, num_lba; 2262 uint32_t lba_size; 2263 2264 offset = args->op.readahead.offset; 2265 length = args->op.readahead.length; 2266 assert(length > 0); 2267 2268 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2269 2270 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2271 offset, length, start_lba, num_lba); 2272 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2273 args->op.readahead.cache_buffer->buf, 2274 start_lba, num_lba, __readahead_done, args); 2275 } 2276 2277 static uint64_t 2278 __next_cache_buffer_offset(uint64_t offset) 2279 { 2280 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2281 } 2282 2283 static void 2284 check_readahead(struct spdk_file *file, uint64_t offset) 2285 { 2286 struct spdk_fs_cb_args *args; 2287 2288 offset = __next_cache_buffer_offset(offset); 2289 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2290 return; 2291 } 2292 2293 args = calloc(1, sizeof(*args)); 2294 if (args == NULL) { 2295 return; 2296 } 2297 2298 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2299 2300 args->file = file; 2301 args->op.readahead.offset = offset; 2302 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2303 if (!args->op.readahead.cache_buffer) { 2304 BLOBFS_TRACE(file, "Cannot 
allocate buf for offset=%jx\n", offset); 2305 free(args); 2306 return; 2307 } 2308 2309 args->op.readahead.cache_buffer->in_progress = true; 2310 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2311 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2312 } else { 2313 args->op.readahead.length = CACHE_BUFFER_SIZE; 2314 } 2315 file->fs->send_request(__readahead, args); 2316 } 2317 2318 static int 2319 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2320 { 2321 struct cache_buffer *buf; 2322 int rc; 2323 2324 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2325 if (buf == NULL) { 2326 pthread_spin_unlock(&file->lock); 2327 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2328 pthread_spin_lock(&file->lock); 2329 return rc; 2330 } 2331 2332 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2333 length = buf->offset + buf->bytes_filled - offset; 2334 } 2335 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2336 memcpy(payload, &buf->buf[offset - buf->offset], length); 2337 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2338 pthread_spin_lock(&g_caches_lock); 2339 spdk_tree_remove_buffer(file->tree, buf); 2340 if (file->tree->present_mask == 0) { 2341 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2342 } 2343 pthread_spin_unlock(&g_caches_lock); 2344 } 2345 2346 sem_post(sem); 2347 return 0; 2348 } 2349 2350 int64_t 2351 spdk_file_read(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx, 2352 void *payload, uint64_t offset, uint64_t length) 2353 { 2354 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2355 uint64_t final_offset, final_length; 2356 uint32_t sub_reads = 0; 2357 int rc = 0; 2358 2359 pthread_spin_lock(&file->lock); 2360 2361 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2362 2363 file->open_for_writing = false; 2364 2365 if (length == 0 || offset >= file->append_pos) { 2366 
pthread_spin_unlock(&file->lock); 2367 return 0; 2368 } 2369 2370 if (offset + length > file->append_pos) { 2371 length = file->append_pos - offset; 2372 } 2373 2374 if (offset != file->next_seq_offset) { 2375 file->seq_byte_count = 0; 2376 } 2377 file->seq_byte_count += length; 2378 file->next_seq_offset = offset + length; 2379 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2380 check_readahead(file, offset); 2381 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2382 } 2383 2384 final_length = 0; 2385 final_offset = offset + length; 2386 while (offset < final_offset) { 2387 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2388 if (length > (final_offset - offset)) { 2389 length = final_offset - offset; 2390 } 2391 rc = __file_read(file, payload, offset, length, &channel->sem); 2392 if (rc == 0) { 2393 final_length += length; 2394 } else { 2395 break; 2396 } 2397 payload += length; 2398 offset += length; 2399 sub_reads++; 2400 } 2401 pthread_spin_unlock(&file->lock); 2402 while (sub_reads-- > 0) { 2403 sem_wait(&channel->sem); 2404 } 2405 if (rc == 0) { 2406 return final_length; 2407 } else { 2408 return rc; 2409 } 2410 } 2411 2412 static void 2413 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2414 spdk_file_op_complete cb_fn, void *cb_arg) 2415 { 2416 struct spdk_fs_request *sync_req; 2417 struct spdk_fs_request *flush_req; 2418 struct spdk_fs_cb_args *sync_args; 2419 struct spdk_fs_cb_args *flush_args; 2420 2421 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2422 2423 pthread_spin_lock(&file->lock); 2424 if (file->append_pos <= file->length_flushed) { 2425 BLOBFS_TRACE(file, "done - no data to flush\n"); 2426 pthread_spin_unlock(&file->lock); 2427 cb_fn(cb_arg, 0); 2428 return; 2429 } 2430 2431 sync_req = alloc_fs_request(channel); 2432 if (!sync_req) { 2433 pthread_spin_unlock(&file->lock); 2434 cb_fn(cb_arg, -ENOMEM); 2435 return; 2436 } 2437 sync_args = &sync_req->args; 2438 2439 flush_req = 
alloc_fs_request(channel); 2440 if (!flush_req) { 2441 pthread_spin_unlock(&file->lock); 2442 cb_fn(cb_arg, -ENOMEM); 2443 return; 2444 } 2445 flush_args = &flush_req->args; 2446 2447 sync_args->file = file; 2448 sync_args->fn.file_op = cb_fn; 2449 sync_args->arg = cb_arg; 2450 sync_args->op.sync.offset = file->append_pos; 2451 sync_args->op.sync.xattr_in_progress = false; 2452 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2453 pthread_spin_unlock(&file->lock); 2454 2455 flush_args->file = file; 2456 channel->send_request(__file_flush, flush_args); 2457 } 2458 2459 int 2460 spdk_file_sync(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2461 { 2462 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2463 struct spdk_fs_cb_args args = {}; 2464 2465 args.sem = &channel->sem; 2466 _file_sync(file, channel, __wake_caller, &args); 2467 sem_wait(&channel->sem); 2468 2469 return args.rc; 2470 } 2471 2472 void 2473 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2474 spdk_file_op_complete cb_fn, void *cb_arg) 2475 { 2476 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2477 2478 _file_sync(file, channel, cb_fn, cb_arg); 2479 } 2480 2481 void 2482 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2483 { 2484 BLOBFS_TRACE(file, "priority=%u\n", priority); 2485 file->priority = priority; 2486 2487 } 2488 2489 /* 2490 * Close routines 2491 */ 2492 2493 static void 2494 __file_close_async_done(void *ctx, int bserrno) 2495 { 2496 struct spdk_fs_request *req = ctx; 2497 struct spdk_fs_cb_args *args = &req->args; 2498 struct spdk_file *file = args->file; 2499 2500 if (file->is_deleted) { 2501 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2502 return; 2503 } 2504 2505 args->fn.file_op(args->arg, bserrno); 2506 free_fs_request(req); 2507 } 2508 2509 static void 2510 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2511 { 2512 
struct spdk_blob *blob; 2513 2514 pthread_spin_lock(&file->lock); 2515 if (file->ref_count == 0) { 2516 pthread_spin_unlock(&file->lock); 2517 __file_close_async_done(req, -EBADF); 2518 return; 2519 } 2520 2521 file->ref_count--; 2522 if (file->ref_count > 0) { 2523 pthread_spin_unlock(&file->lock); 2524 req->args.fn.file_op(req->args.arg, 0); 2525 free_fs_request(req); 2526 return; 2527 } 2528 2529 pthread_spin_unlock(&file->lock); 2530 2531 blob = file->blob; 2532 file->blob = NULL; 2533 spdk_blob_close(blob, __file_close_async_done, req); 2534 } 2535 2536 static void 2537 __file_close_async__sync_done(void *arg, int fserrno) 2538 { 2539 struct spdk_fs_request *req = arg; 2540 struct spdk_fs_cb_args *args = &req->args; 2541 2542 __file_close_async(args->file, req); 2543 } 2544 2545 void 2546 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2547 { 2548 struct spdk_fs_request *req; 2549 struct spdk_fs_cb_args *args; 2550 2551 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2552 if (req == NULL) { 2553 cb_fn(cb_arg, -ENOMEM); 2554 return; 2555 } 2556 2557 args = &req->args; 2558 args->file = file; 2559 args->fn.file_op = cb_fn; 2560 args->arg = cb_arg; 2561 2562 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2563 } 2564 2565 static void 2566 __file_close(void *arg) 2567 { 2568 struct spdk_fs_request *req = arg; 2569 struct spdk_fs_cb_args *args = &req->args; 2570 struct spdk_file *file = args->file; 2571 2572 __file_close_async(file, req); 2573 } 2574 2575 int 2576 spdk_file_close(struct spdk_file *file, struct spdk_fs_thread_ctx *ctx) 2577 { 2578 struct spdk_fs_channel *channel = (struct spdk_fs_channel *)ctx; 2579 struct spdk_fs_request *req; 2580 struct spdk_fs_cb_args *args; 2581 2582 req = alloc_fs_request(channel); 2583 if (req == NULL) { 2584 return -ENOMEM; 2585 } 2586 2587 args = &req->args; 2588 2589 spdk_file_sync(file, ctx); 2590 BLOBFS_TRACE(file, 
"name=%s\n", file->name); 2591 args->file = file; 2592 args->sem = &channel->sem; 2593 args->fn.file_op = __wake_caller; 2594 args->arg = req; 2595 channel->send_request(__file_close, req); 2596 sem_wait(&channel->sem); 2597 2598 return args->rc; 2599 } 2600 2601 int 2602 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2603 { 2604 if (size < sizeof(spdk_blob_id)) { 2605 return -EINVAL; 2606 } 2607 2608 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2609 2610 return sizeof(spdk_blob_id); 2611 } 2612 2613 static void 2614 cache_free_buffers(struct spdk_file *file) 2615 { 2616 BLOBFS_TRACE(file, "free=%s\n", file->name); 2617 pthread_spin_lock(&file->lock); 2618 pthread_spin_lock(&g_caches_lock); 2619 if (file->tree->present_mask == 0) { 2620 pthread_spin_unlock(&g_caches_lock); 2621 pthread_spin_unlock(&file->lock); 2622 return; 2623 } 2624 spdk_tree_free_buffers(file->tree); 2625 2626 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2627 /* If not freed, put it in the end of the queue */ 2628 if (file->tree->present_mask != 0) { 2629 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2630 } 2631 file->last = NULL; 2632 pthread_spin_unlock(&g_caches_lock); 2633 pthread_spin_unlock(&file->lock); 2634 } 2635 2636 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2637 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2638