1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/io_channel.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 static void 64 __sem_post(void *arg, int bserrno) 65 { 66 sem_t *sem = arg; 67 68 sem_post(sem); 69 } 70 71 void 72 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 73 { 74 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 75 free(cache_buffer); 76 } 77 78 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 79 80 struct spdk_file { 81 struct spdk_filesystem *fs; 82 struct spdk_blob *blob; 83 char *name; 84 uint64_t length; 85 bool is_deleted; 86 bool open_for_writing; 87 uint64_t length_flushed; 88 uint64_t append_pos; 89 uint64_t seq_byte_count; 90 uint64_t next_seq_offset; 91 uint32_t priority; 92 TAILQ_ENTRY(spdk_file) tailq; 93 spdk_blob_id blobid; 94 uint32_t ref_count; 95 pthread_spinlock_t lock; 96 struct cache_buffer *last; 97 struct cache_tree *tree; 98 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 99 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 100 TAILQ_ENTRY(spdk_file) cache_tailq; 101 }; 102 103 struct spdk_deleted_file { 104 spdk_blob_id id; 105 TAILQ_ENTRY(spdk_deleted_file) tailq; 106 }; 107 108 struct spdk_filesystem { 109 struct spdk_blob_store *bs; 110 TAILQ_HEAD(, spdk_file) files; 111 struct spdk_bs_opts bs_opts; 112 struct spdk_bs_dev *bdev; 113 fs_send_request_fn send_request; 114 115 struct { 116 uint32_t max_ops; 117 struct spdk_io_channel *sync_io_channel; 118 struct spdk_fs_channel *sync_fs_channel; 119 } sync_target; 120 121 struct { 122 uint32_t max_ops; 123 struct spdk_io_channel *md_io_channel; 124 struct spdk_fs_channel *md_fs_channel; 125 } md_target; 126 127 struct { 128 uint32_t max_ops; 129 } io_target; 130 }; 131 132 struct spdk_fs_cb_args { 133 union { 134 spdk_fs_op_with_handle_complete fs_op_with_handle; 135 spdk_fs_op_complete fs_op; 136 spdk_file_op_with_handle_complete file_op_with_handle; 137 spdk_file_op_complete file_op; 138 spdk_file_stat_op_complete stat_op; 139 } fn; 140 void *arg; 141 sem_t *sem; 142 struct spdk_filesystem *fs; 143 struct spdk_file *file; 144 int rc; 145 bool from_request; 146 union { 147 struct { 148 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 149 } fs_load; 150 struct { 151 uint64_t length; 152 } truncate; 153 struct { 154 struct spdk_io_channel *channel; 155 void *user_buf; 156 void *pin_buf; 157 int is_read; 158 off_t offset; 159 size_t length; 160 uint64_t start_page; 161 uint64_t num_pages; 162 uint32_t blocklen; 163 } rw; 164 struct { 165 const char *old_name; 166 const char *new_name; 167 } rename; 168 struct { 169 struct cache_buffer *cache_buffer; 170 uint64_t length; 171 } flush; 172 struct { 173 struct cache_buffer *cache_buffer; 174 uint64_t length; 175 uint64_t offset; 176 } readahead; 177 struct { 178 uint64_t offset; 179 TAILQ_ENTRY(spdk_fs_request) tailq; 180 bool xattr_in_progress; 181 } sync; 182 struct { 183 uint32_t num_clusters; 184 } resize; 185 struct { 186 const char *name; 187 uint32_t flags; 188 TAILQ_ENTRY(spdk_fs_request) tailq; 189 } open; 190 struct { 191 const char *name; 192 } create; 193 struct { 194 const char *name; 195 } delete; 196 struct { 197 const char *name; 198 } stat; 199 } op; 200 }; 201 202 static void cache_free_buffers(struct spdk_file *file); 203 204 void 205 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 206 { 207 opts->cluster_sz = SPDK_BLOBFS_OPTS_CLUSTER_SZ; 208 } 209 210 static void 211 __initialize_cache(void) 212 { 213 assert(g_cache_pool == NULL); 214 215 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 216 g_fs_cache_size / CACHE_BUFFER_SIZE, 217 CACHE_BUFFER_SIZE, 218 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 219 SPDK_ENV_SOCKET_ID_ANY); 220 TAILQ_INIT(&g_caches); 221 pthread_spin_init(&g_caches_lock, 0); 222 } 223 224 static void 225 __free_cache(void) 226 { 227 assert(g_cache_pool != NULL); 228 229 spdk_mempool_free(g_cache_pool); 230 g_cache_pool = NULL; 231 } 232 233 static uint64_t 234 __file_get_blob_size(struct spdk_file *file) 235 { 236 uint64_t cluster_sz; 237 238 cluster_sz = file->fs->bs_opts.cluster_sz; 239 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 240 } 241 242 struct spdk_fs_request { 243 struct spdk_fs_cb_args args; 244 TAILQ_ENTRY(spdk_fs_request) link; 245 struct spdk_fs_channel *channel; 246 }; 247 248 struct spdk_fs_channel { 249 struct spdk_fs_request *req_mem; 250 TAILQ_HEAD(, spdk_fs_request) reqs; 251 sem_t sem; 252 struct spdk_filesystem *fs; 253 struct spdk_io_channel *bs_channel; 254 fs_send_request_fn send_request; 255 bool sync; 256 pthread_spinlock_t lock; 257 }; 258 259 static struct spdk_fs_request * 260 alloc_fs_request(struct spdk_fs_channel *channel) 261 { 262 struct spdk_fs_request *req; 263 264 if (channel->sync) { 265 pthread_spin_lock(&channel->lock); 266 } 267 268 req = TAILQ_FIRST(&channel->reqs); 269 if (req) { 270 TAILQ_REMOVE(&channel->reqs, req, link); 271 } 272 273 if (channel->sync) { 274 pthread_spin_unlock(&channel->lock); 275 } 276 277 if (req == NULL) { 278 return NULL; 279 } 280 memset(req, 0, sizeof(*req)); 281 req->channel = channel; 282 req->args.from_request = true; 283 284 return req; 285 } 286 287 static void 288 free_fs_request(struct spdk_fs_request *req) 289 { 290 struct spdk_fs_channel *channel = req->channel; 291 292 if (channel->sync) { 293 pthread_spin_lock(&channel->lock); 294 } 295 296 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 297 298 if (channel->sync) { 299 pthread_spin_unlock(&channel->lock); 300 } 301 } 302 303 static int 304 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 305 uint32_t max_ops) 306 { 307 uint32_t i; 308 309 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 310 if (!channel->req_mem) { 311 return -1; 312 } 313 314 TAILQ_INIT(&channel->reqs); 315 sem_init(&channel->sem, 0, 0); 316 317 for (i = 0; i < max_ops; i++) { 318 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 319 } 320 321 channel->fs = fs; 322 323 return 0; 324 } 325 326 static int 327 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 328 { 329 struct spdk_filesystem *fs; 330 struct spdk_fs_channel *channel = ctx_buf; 331 332 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 333 334 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 335 } 336 337 static int 338 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 339 { 340 struct spdk_filesystem *fs; 341 struct spdk_fs_channel *channel = ctx_buf; 342 343 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 344 345 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 346 } 347 348 static int 349 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 350 { 351 struct spdk_filesystem *fs; 352 struct spdk_fs_channel *channel = ctx_buf; 353 354 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 355 356 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 357 } 358 359 static void 360 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 361 { 362 struct spdk_fs_channel *channel = ctx_buf; 363 364 free(channel->req_mem); 365 if (channel->bs_channel != NULL) { 366 spdk_bs_free_io_channel(channel->bs_channel); 367 } 368 } 369 370 static void 371 __send_request_direct(fs_request_fn fn, void *arg) 372 { 373 fn(arg); 374 } 375 376 static void 377 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 378 { 379 fs->bs = bs; 380 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 381 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 382 fs->md_target.md_fs_channel->send_request = __send_request_direct; 383 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 384 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 385 386 pthread_mutex_lock(&g_cache_init_lock); 387 if (g_fs_count == 0) { 388 __initialize_cache(); 389 } 390 g_fs_count++; 391 pthread_mutex_unlock(&g_cache_init_lock); 392 } 393 394 static void 395 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 396 { 397 struct spdk_fs_request *req = ctx; 398 struct spdk_fs_cb_args *args = &req->args; 399 struct spdk_filesystem *fs = args->fs; 400 401 if (bserrno == 0) { 402 common_fs_bs_init(fs, bs); 403 } else { 404 free(fs); 405 fs = NULL; 406 } 407 408 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 409 free_fs_request(req); 410 } 411 412 static void 413 fs_conf_parse(void) 414 { 415 struct spdk_conf_section *sp; 416 417 sp = spdk_conf_find_section(NULL, "Blobfs"); 418 if (sp == NULL) { 419 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 420 return; 421 } 422 423 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 424 if (g_fs_cache_buffer_shift <= 0) { 425 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 426 } 427 } 428 429 static struct spdk_filesystem * 430 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 431 { 432 struct spdk_filesystem *fs; 433 434 fs = calloc(1, sizeof(*fs)); 435 if (fs == NULL) { 436 return NULL; 437 } 438 439 fs->bdev = dev; 440 fs->send_request = send_request_fn; 441 TAILQ_INIT(&fs->files); 442 443 fs->md_target.max_ops = 512; 444 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 445 sizeof(struct spdk_fs_channel)); 446 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 447 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 448 449 fs->sync_target.max_ops = 512; 450 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 451 sizeof(struct spdk_fs_channel)); 452 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 453 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 454 455 fs->io_target.max_ops = 512; 456 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 457 sizeof(struct spdk_fs_channel)); 458 459 return fs; 460 } 461 462 void 463 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 464 fs_send_request_fn send_request_fn, 465 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 466 { 467 struct spdk_filesystem *fs; 468 struct spdk_fs_request *req; 469 struct spdk_fs_cb_args *args; 470 struct spdk_bs_opts opts = {}; 471 472 fs = fs_alloc(dev, send_request_fn); 473 if (fs == NULL) { 474 cb_fn(cb_arg, NULL, -ENOMEM); 475 return; 476 } 477 478 fs_conf_parse(); 479 480 req = alloc_fs_request(fs->md_target.md_fs_channel); 481 if (req == NULL) { 482 spdk_put_io_channel(fs->md_target.md_io_channel); 483 spdk_io_device_unregister(&fs->md_target, NULL); 484 spdk_put_io_channel(fs->sync_target.sync_io_channel); 485 spdk_io_device_unregister(&fs->sync_target, NULL); 486 spdk_io_device_unregister(&fs->io_target, NULL); 487 free(fs); 488 cb_fn(cb_arg, NULL, -ENOMEM); 489 return; 490 } 491 492 args = &req->args; 493 args->fn.fs_op_with_handle = cb_fn; 494 args->arg = cb_arg; 495 args->fs = fs; 496 497 spdk_bs_opts_init(&opts); 498 strncpy(opts.bstype.bstype, "BLOBFS", SPDK_BLOBSTORE_TYPE_LENGTH); 499 if (opt) { 500 opts.cluster_sz = opt->cluster_sz; 501 } 502 spdk_bs_init(dev, &opts, init_cb, req); 503 } 504 505 static struct spdk_file * 506 file_alloc(struct spdk_filesystem *fs) 507 { 508 struct spdk_file *file; 509 510 file = calloc(1, sizeof(*file)); 511 if (file == NULL) { 512 return NULL; 513 } 514 515 file->tree = calloc(1, sizeof(*file->tree)); 516 if (file->tree == NULL) { 517 free(file); 518 return NULL; 519 } 520 521 file->fs = fs; 522 TAILQ_INIT(&file->open_requests); 523 TAILQ_INIT(&file->sync_requests); 524 pthread_spin_init(&file->lock, 0); 525 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 526 file->priority = SPDK_FILE_PRIORITY_LOW; 527 return file; 528 } 529 530 static void fs_load_done(void *ctx, int bserrno); 531 532 static int 533 _handle_deleted_files(struct spdk_fs_request *req) 534 { 535 struct spdk_fs_cb_args *args = &req->args; 536 struct spdk_filesystem *fs = args->fs; 537 538 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 539 struct spdk_deleted_file *deleted_file; 540 541 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 542 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 543 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 544 free(deleted_file); 545 return 0; 546 } 547 548 return 1; 549 } 550 551 static void 552 fs_load_done(void *ctx, int bserrno) 553 { 554 struct spdk_fs_request *req = ctx; 555 struct spdk_fs_cb_args *args = &req->args; 556 struct spdk_filesystem *fs = args->fs; 557 558 /* The filesystem has been loaded. Now check if there are any files that 559 * were marked for deletion before last unload. Do not complete the 560 * fs_load callback until all of them have been deleted on disk. 561 */ 562 if (_handle_deleted_files(req) == 0) { 563 /* We found a file that's been marked for deleting but not actually 564 * deleted yet. This function will get called again once the delete 565 * operation is completed. 566 */ 567 return; 568 } 569 570 args->fn.fs_op_with_handle(args->arg, fs, 0); 571 free_fs_request(req); 572 573 } 574 575 static void 576 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 577 { 578 struct spdk_fs_request *req = ctx; 579 struct spdk_fs_cb_args *args = &req->args; 580 struct spdk_filesystem *fs = args->fs; 581 uint64_t *length; 582 const char *name; 583 uint32_t *is_deleted; 584 size_t value_len; 585 586 if (rc < 0) { 587 args->fn.fs_op_with_handle(args->arg, fs, rc); 588 free_fs_request(req); 589 return; 590 } 591 592 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 593 if (rc < 0) { 594 args->fn.fs_op_with_handle(args->arg, fs, rc); 595 free_fs_request(req); 596 return; 597 } 598 599 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 600 if (rc < 0) { 601 args->fn.fs_op_with_handle(args->arg, fs, rc); 602 free_fs_request(req); 603 return; 604 } 605 606 assert(value_len == 8); 607 608 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 609 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 610 if (rc < 0) { 611 struct spdk_file *f; 612 613 f = file_alloc(fs); 614 if (f == NULL) { 615 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 616 free_fs_request(req); 617 return; 618 } 619 620 f->name = strdup(name); 621 f->blobid = spdk_blob_get_id(blob); 622 f->length = *length; 623 f->length_flushed = *length; 624 f->append_pos = *length; 625 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 626 } else { 627 struct spdk_deleted_file *deleted_file; 628 629 deleted_file = calloc(1, sizeof(*deleted_file)); 630 if (deleted_file == NULL) { 631 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 632 free_fs_request(req); 633 return; 634 } 635 deleted_file->id = spdk_blob_get_id(blob); 636 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 637 } 638 } 639 640 static void 641 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 642 { 643 struct spdk_fs_request *req = ctx; 644 struct spdk_fs_cb_args *args = &req->args; 645 struct spdk_filesystem *fs = args->fs; 646 struct spdk_bs_type bstype; 647 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 648 static const struct spdk_bs_type zeros; 649 650 if (bserrno != 0) { 651 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 652 free_fs_request(req); 653 free(fs); 654 return; 655 } 656 657 bstype = spdk_bs_get_bstype(bs); 658 659 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 660 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 661 spdk_bs_set_bstype(bs, blobfs_type); 662 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 663 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 664 SPDK_TRACEDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 665 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 666 free_fs_request(req); 667 free(fs); 668 return; 669 } 670 671 common_fs_bs_init(fs, bs); 672 fs_load_done(req, 0); 673 } 674 675 void 676 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 677 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 678 { 679 struct spdk_filesystem *fs; 680 struct spdk_fs_cb_args *args; 681 struct spdk_fs_request *req; 682 struct spdk_bs_opts bs_opts; 683 684 fs = fs_alloc(dev, send_request_fn); 685 if (fs == NULL) { 686 cb_fn(cb_arg, NULL, -ENOMEM); 687 return; 688 } 689 690 fs_conf_parse(); 691 692 req = alloc_fs_request(fs->md_target.md_fs_channel); 693 if (req == NULL) { 694 spdk_put_io_channel(fs->md_target.md_io_channel); 695 spdk_io_device_unregister(&fs->md_target, NULL); 696 spdk_put_io_channel(fs->sync_target.sync_io_channel); 697 spdk_io_device_unregister(&fs->sync_target, NULL); 698 spdk_io_device_unregister(&fs->io_target, NULL); 699 free(fs); 700 cb_fn(cb_arg, NULL, -ENOMEM); 701 return; 702 } 703 704 args = &req->args; 705 args->fn.fs_op_with_handle = cb_fn; 706 args->arg = cb_arg; 707 args->fs = fs; 708 TAILQ_INIT(&args->op.fs_load.deleted_files); 709 spdk_bs_opts_init(&bs_opts); 710 bs_opts.iter_cb_fn = iter_cb; 711 bs_opts.iter_cb_arg = req; 712 spdk_bs_load(dev, &bs_opts, load_cb, req); 713 } 714 715 static void 716 unload_cb(void *ctx, int bserrno) 717 { 718 struct spdk_fs_request *req = ctx; 719 struct spdk_fs_cb_args *args = &req->args; 720 struct spdk_filesystem *fs = args->fs; 721 722 pthread_mutex_lock(&g_cache_init_lock); 723 g_fs_count--; 724 if (g_fs_count == 0) { 725 __free_cache(); 726 } 727 pthread_mutex_unlock(&g_cache_init_lock); 728 729 args->fn.fs_op(args->arg, bserrno); 730 free(req); 731 732 spdk_io_device_unregister(&fs->io_target, NULL); 733 spdk_io_device_unregister(&fs->sync_target, NULL); 734 spdk_io_device_unregister(&fs->md_target, NULL); 735 736 free(fs); 737 } 738 739 void 740 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 741 { 742 struct spdk_fs_request *req; 743 struct spdk_fs_cb_args *args; 744 745 /* 746 * We must free the md_channel before unloading the blobstore, so just 747 * allocate this request from the general heap. 748 */ 749 req = calloc(1, sizeof(*req)); 750 if (req == NULL) { 751 cb_fn(cb_arg, -ENOMEM); 752 return; 753 } 754 755 args = &req->args; 756 args->fn.fs_op = cb_fn; 757 args->arg = cb_arg; 758 args->fs = fs; 759 760 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 761 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 762 spdk_bs_unload(fs->bs, unload_cb, req); 763 } 764 765 static struct spdk_file * 766 fs_find_file(struct spdk_filesystem *fs, const char *name) 767 { 768 struct spdk_file *file; 769 770 TAILQ_FOREACH(file, &fs->files, tailq) { 771 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 772 return file; 773 } 774 } 775 776 return NULL; 777 } 778 779 void 780 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 781 spdk_file_stat_op_complete cb_fn, void *cb_arg) 782 { 783 struct spdk_file_stat stat; 784 struct spdk_file *f = NULL; 785 786 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 787 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 788 return; 789 } 790 791 f = fs_find_file(fs, name); 792 if (f != NULL) { 793 stat.blobid = f->blobid; 794 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 795 cb_fn(cb_arg, &stat, 0); 796 return; 797 } 798 799 cb_fn(cb_arg, NULL, -ENOENT); 800 } 801 802 static void 803 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 804 { 805 struct spdk_fs_request *req = arg; 806 struct spdk_fs_cb_args *args = &req->args; 807 808 args->rc = fserrno; 809 if (fserrno == 0) { 810 memcpy(args->arg, stat, sizeof(*stat)); 811 } 812 sem_post(args->sem); 813 } 814 815 static void 816 __file_stat(void *arg) 817 { 818 struct spdk_fs_request *req = arg; 819 struct spdk_fs_cb_args *args = &req->args; 820 821 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 822 args->fn.stat_op, req); 823 } 824 825 int 826 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 827 const char *name, struct spdk_file_stat *stat) 828 { 829 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 830 struct spdk_fs_request *req; 831 int rc; 832 833 req = alloc_fs_request(channel); 834 assert(req != NULL); 835 836 req->args.fs = fs; 837 req->args.op.stat.name = name; 838 req->args.fn.stat_op = __copy_stat; 839 req->args.arg = stat; 840 req->args.sem = &channel->sem; 841 channel->send_request(__file_stat, req); 842 sem_wait(&channel->sem); 843 844 rc = req->args.rc; 845 free_fs_request(req); 846 847 return rc; 848 } 849 850 static void 851 fs_create_blob_close_cb(void *ctx, int bserrno) 852 { 853 struct spdk_fs_request *req = ctx; 854 struct spdk_fs_cb_args *args = &req->args; 855 856 args->fn.file_op(args->arg, bserrno); 857 free_fs_request(req); 858 } 859 860 static void 861 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 862 { 863 struct spdk_fs_request *req = ctx; 864 struct spdk_fs_cb_args *args = &req->args; 865 struct spdk_file *f = args->file; 866 uint64_t length = 0; 867 868 spdk_blob_resize(blob, 1); 869 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 870 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 871 872 spdk_blob_close(blob, fs_create_blob_close_cb, args); 873 } 874 875 static void 876 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 877 { 878 struct spdk_fs_request *req = ctx; 879 struct spdk_fs_cb_args *args = &req->args; 880 struct spdk_file *f = args->file; 881 882 f->blobid = blobid; 883 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 884 } 885 886 void 887 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 888 spdk_file_op_complete cb_fn, void *cb_arg) 889 { 890 struct spdk_file *file; 891 struct spdk_fs_request *req; 892 struct spdk_fs_cb_args *args; 893 894 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 895 cb_fn(cb_arg, -ENAMETOOLONG); 896 return; 897 } 898 899 file = fs_find_file(fs, name); 900 if (file != NULL) { 901 cb_fn(cb_arg, -EEXIST); 902 return; 903 } 904 905 file = file_alloc(fs); 906 if (file == NULL) { 907 cb_fn(cb_arg, -ENOMEM); 908 return; 909 } 910 911 req = alloc_fs_request(fs->md_target.md_fs_channel); 912 if (req == NULL) { 913 cb_fn(cb_arg, -ENOMEM); 914 return; 915 } 916 917 args = &req->args; 918 args->file = file; 919 args->fn.file_op = cb_fn; 920 args->arg = cb_arg; 921 922 file->name = strdup(name); 923 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 924 } 925 926 static void 927 __fs_create_file_done(void *arg, int fserrno) 928 { 929 struct spdk_fs_request *req = arg; 930 struct spdk_fs_cb_args *args = &req->args; 931 932 args->rc = fserrno; 933 sem_post(args->sem); 934 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 935 } 936 937 static void 938 __fs_create_file(void *arg) 939 { 940 struct spdk_fs_request *req = arg; 941 struct spdk_fs_cb_args *args = &req->args; 942 943 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 944 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 945 } 946 947 int 948 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 949 { 950 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 951 struct spdk_fs_request *req; 952 struct spdk_fs_cb_args *args; 953 int rc; 954 955 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 956 957 req = alloc_fs_request(channel); 958 assert(req != NULL); 959 960 args = &req->args; 961 args->fs = fs; 962 args->op.create.name = name; 963 args->sem = &channel->sem; 964 fs->send_request(__fs_create_file, req); 965 sem_wait(&channel->sem); 966 rc = args->rc; 967 free_fs_request(req); 968 969 return rc; 970 } 971 972 static void 973 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 974 { 975 struct spdk_fs_request *req = ctx; 976 struct spdk_fs_cb_args *args = &req->args; 977 struct spdk_file *f = args->file; 978 979 f->blob = blob; 980 while (!TAILQ_EMPTY(&f->open_requests)) { 981 req = TAILQ_FIRST(&f->open_requests); 982 args = &req->args; 983 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 984 args->fn.file_op_with_handle(args->arg, f, bserrno); 985 free_fs_request(req); 986 } 987 } 988 989 static void 990 fs_open_blob_create_cb(void *ctx, int bserrno) 991 { 992 struct spdk_fs_request *req = ctx; 993 struct spdk_fs_cb_args *args = &req->args; 994 struct spdk_file *file = args->file; 995 struct spdk_filesystem *fs = args->fs; 996 997 if (file == NULL) { 998 /* 999 * This is from an open with CREATE flag - the file 1000 * is now created so look it up in the file list for this 1001 * filesystem. 1002 */ 1003 file = fs_find_file(fs, args->op.open.name); 1004 assert(file != NULL); 1005 args->file = file; 1006 } 1007 1008 file->ref_count++; 1009 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1010 if (file->ref_count == 1) { 1011 assert(file->blob == NULL); 1012 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1013 } else if (file->blob != NULL) { 1014 fs_open_blob_done(req, file->blob, 0); 1015 } else { 1016 /* 1017 * The blob open for this file is in progress due to a previous 1018 * open request. When that open completes, it will invoke the 1019 * open callback for this request. 1020 */ 1021 } 1022 } 1023 1024 void 1025 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1026 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1027 { 1028 struct spdk_file *f = NULL; 1029 struct spdk_fs_request *req; 1030 struct spdk_fs_cb_args *args; 1031 1032 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1033 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1034 return; 1035 } 1036 1037 f = fs_find_file(fs, name); 1038 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1039 cb_fn(cb_arg, NULL, -ENOENT); 1040 return; 1041 } 1042 1043 if (f != NULL && f->is_deleted == true) { 1044 cb_fn(cb_arg, NULL, -ENOENT); 1045 return; 1046 } 1047 1048 req = alloc_fs_request(fs->md_target.md_fs_channel); 1049 if (req == NULL) { 1050 cb_fn(cb_arg, NULL, -ENOMEM); 1051 return; 1052 } 1053 1054 args = &req->args; 1055 args->fn.file_op_with_handle = cb_fn; 1056 args->arg = cb_arg; 1057 args->file = f; 1058 args->fs = fs; 1059 args->op.open.name = name; 1060 1061 if (f == NULL) { 1062 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1063 } else { 1064 fs_open_blob_create_cb(req, 0); 1065 } 1066 } 1067 1068 static void 1069 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1070 { 1071 struct spdk_fs_request *req = arg; 1072 struct spdk_fs_cb_args *args = &req->args; 1073 1074 args->file = file; 1075 args->rc = bserrno; 1076 sem_post(args->sem); 1077 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1078 } 1079 1080 static void 1081 __fs_open_file(void *arg) 1082 { 1083 struct spdk_fs_request *req = arg; 1084 struct spdk_fs_cb_args *args = &req->args; 1085 1086 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1087 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1088 __fs_open_file_done, req); 1089 } 1090 1091 int 1092 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1093 const char *name, uint32_t flags, struct spdk_file **file) 1094 { 1095 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1096 struct spdk_fs_request *req; 1097 struct spdk_fs_cb_args *args; 1098 int rc; 1099 1100 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1101 1102 req = alloc_fs_request(channel); 1103 assert(req != NULL); 1104 1105 args = &req->args; 1106 args->fs = fs; 1107 args->op.open.name = name; 1108 args->op.open.flags = flags; 1109 args->sem = &channel->sem; 1110 fs->send_request(__fs_open_file, req); 1111 sem_wait(&channel->sem); 1112 rc = args->rc; 1113 if (rc == 0) { 1114 *file = args->file; 1115 } else { 1116 *file = NULL; 1117 } 1118 free_fs_request(req); 1119 1120 return rc; 1121 } 1122 1123 static void 1124 fs_rename_blob_close_cb(void *ctx, int bserrno) 1125 { 1126 struct spdk_fs_request *req = ctx; 1127 struct spdk_fs_cb_args *args = &req->args; 1128 1129 args->fn.fs_op(args->arg, bserrno); 1130 free_fs_request(req); 1131 } 1132 1133 static void 1134 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1135 { 1136 struct spdk_fs_request *req = ctx; 1137 struct spdk_fs_cb_args *args = &req->args; 1138 const char *new_name = args->op.rename.new_name; 1139 1140 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1141 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1142 } 1143 1144 static void 1145 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1146 { 1147 struct spdk_fs_cb_args *args = &req->args; 1148 struct spdk_file *f; 1149 1150 f = fs_find_file(args->fs, args->op.rename.old_name); 1151 if (f == NULL) { 1152 args->fn.fs_op(args->arg, -ENOENT); 1153 free_fs_request(req); 1154 return; 1155 } 1156 1157 free(f->name); 1158 f->name = strdup(args->op.rename.new_name); 1159 args->file = f; 1160 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1161 } 1162 1163 static void 1164 fs_rename_delete_done(void *arg, int fserrno) 1165 { 1166 __spdk_fs_md_rename_file(arg); 1167 } 1168 1169 void 1170 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1171 const char *old_name, const char *new_name, 1172 spdk_file_op_complete cb_fn, void *cb_arg) 1173 { 1174 struct spdk_file *f; 1175 struct spdk_fs_request *req; 1176 struct spdk_fs_cb_args *args; 1177 1178 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1179 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1180 cb_fn(cb_arg, -ENAMETOOLONG); 1181 return; 1182 } 1183 1184 req = alloc_fs_request(fs->md_target.md_fs_channel); 1185 if (req == NULL) { 1186 cb_fn(cb_arg, -ENOMEM); 1187 return; 1188 } 1189 1190 args = &req->args; 1191 args->fn.fs_op = cb_fn; 1192 args->fs = fs; 1193 args->arg = cb_arg; 1194 args->op.rename.old_name = old_name; 1195 args->op.rename.new_name = new_name; 1196 1197 f = fs_find_file(fs, new_name); 1198 if (f == NULL) { 1199 __spdk_fs_md_rename_file(req); 1200 return; 1201 } 1202 1203 /* 1204 * The rename overwrites an existing file. So delete the existing file, then 1205 * do the actual rename. 1206 */ 1207 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1208 } 1209 1210 static void 1211 __fs_rename_file_done(void *arg, int fserrno) 1212 { 1213 struct spdk_fs_request *req = arg; 1214 struct spdk_fs_cb_args *args = &req->args; 1215 1216 args->rc = fserrno; 1217 sem_post(args->sem); 1218 } 1219 1220 static void 1221 __fs_rename_file(void *arg) 1222 { 1223 struct spdk_fs_request *req = arg; 1224 struct spdk_fs_cb_args *args = &req->args; 1225 1226 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1227 __fs_rename_file_done, req); 1228 } 1229 1230 int 1231 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1232 const char *old_name, const char *new_name) 1233 { 1234 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1235 struct spdk_fs_request *req; 1236 struct spdk_fs_cb_args *args; 1237 int rc; 1238 1239 req = alloc_fs_request(channel); 1240 assert(req != NULL); 1241 1242 args = &req->args; 1243 1244 args->fs = fs; 1245 args->op.rename.old_name = old_name; 1246 args->op.rename.new_name = new_name; 1247 args->sem = &channel->sem; 1248 fs->send_request(__fs_rename_file, req); 1249 sem_wait(&channel->sem); 1250 rc = args->rc; 1251 free_fs_request(req); 1252 return rc; 1253 } 1254 1255 static void 1256 blob_delete_cb(void *ctx, int bserrno) 1257 { 1258 struct spdk_fs_request *req = ctx; 1259 struct spdk_fs_cb_args *args = &req->args; 1260 1261 args->fn.file_op(args->arg, bserrno); 1262 free_fs_request(req); 1263 } 1264 1265 void 1266 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1267 spdk_file_op_complete cb_fn, void *cb_arg) 1268 { 1269 struct spdk_file *f; 1270 spdk_blob_id blobid; 1271 struct spdk_fs_request *req; 1272 struct spdk_fs_cb_args *args; 1273 1274 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1275 1276 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1277 cb_fn(cb_arg, -ENAMETOOLONG); 1278 return; 1279 } 1280 1281 f = fs_find_file(fs, name); 1282 if (f == NULL) { 1283 cb_fn(cb_arg, -ENOENT); 1284 return; 1285 } 1286 1287 req = alloc_fs_request(fs->md_target.md_fs_channel); 1288 if (req == NULL) { 1289 cb_fn(cb_arg, -ENOMEM); 1290 return; 1291 } 1292 1293 args = &req->args; 1294 args->fn.file_op = cb_fn; 1295 args->arg = cb_arg; 1296 1297 if (f->ref_count > 0) { 1298 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1299 f->is_deleted = true; 1300 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1301 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1302 return; 1303 } 1304 1305 TAILQ_REMOVE(&fs->files, f, tailq); 1306 1307 cache_free_buffers(f); 1308 1309 blobid = f->blobid; 1310 1311 free(f->name); 1312 free(f->tree); 1313 free(f); 1314 1315 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1316 } 1317 1318 static void 1319 __fs_delete_file_done(void *arg, int fserrno) 1320 { 1321 struct spdk_fs_request *req = arg; 1322 struct spdk_fs_cb_args *args = &req->args; 1323 1324 args->rc = fserrno; 1325 sem_post(args->sem); 1326 } 1327 1328 static void 1329 __fs_delete_file(void *arg) 1330 { 1331 struct spdk_fs_request *req = arg; 1332 struct spdk_fs_cb_args *args = &req->args; 1333 1334 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1335 } 1336 1337 int 1338 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1339 const char *name) 1340 { 1341 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1342 struct spdk_fs_request *req; 1343 struct spdk_fs_cb_args *args; 1344 int rc; 1345 1346 req = alloc_fs_request(channel); 1347 assert(req != NULL); 1348 1349 args = &req->args; 1350 args->fs = fs; 1351 args->op.delete.name = name; 1352 args->sem = &channel->sem; 1353 fs->send_request(__fs_delete_file, req); 1354 sem_wait(&channel->sem); 1355 rc = args->rc; 1356 free_fs_request(req); 1357 1358 return rc; 1359 } 1360 1361 spdk_fs_iter 1362 spdk_fs_iter_first(struct spdk_filesystem *fs) 1363 { 1364 struct spdk_file *f; 1365 1366 f = TAILQ_FIRST(&fs->files); 1367 return f; 1368 } 1369 1370 spdk_fs_iter 1371 spdk_fs_iter_next(spdk_fs_iter iter) 1372 { 1373 struct spdk_file *f = iter; 1374 1375 if (f == NULL) { 1376 return NULL; 1377 } 1378 1379 f = TAILQ_NEXT(f, tailq); 1380 return f; 1381 } 1382 1383 const char * 1384 spdk_file_get_name(struct spdk_file *file) 1385 { 1386 return file->name; 1387 } 1388 1389 uint64_t 1390 spdk_file_get_length(struct spdk_file *file) 1391 { 1392 assert(file != NULL); 1393 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1394 return file->length; 1395 } 1396 1397 static void 1398 fs_truncate_complete_cb(void *ctx, int bserrno) 1399 { 1400 struct spdk_fs_request *req = ctx; 1401 struct spdk_fs_cb_args *args = &req->args; 1402 1403 args->fn.file_op(args->arg, bserrno); 1404 free_fs_request(req); 1405 } 1406 1407 static uint64_t 1408 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1409 { 1410 return (length + cluster_sz - 1) / cluster_sz; 1411 } 1412 1413 void 1414 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1415 spdk_file_op_complete cb_fn, void *cb_arg) 1416 { 1417 struct spdk_filesystem *fs; 1418 size_t num_clusters; 1419 struct spdk_fs_request *req; 1420 struct spdk_fs_cb_args *args; 1421 1422 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1423 if (length == file->length) { 1424 cb_fn(cb_arg, 0); 1425 return; 1426 } 1427 1428 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1429 if (req == NULL) { 1430 cb_fn(cb_arg, -ENOMEM); 1431 return; 1432 } 1433 1434 args = &req->args; 1435 args->fn.file_op = cb_fn; 1436 args->arg = cb_arg; 1437 args->file = file; 1438 fs = file->fs; 1439 1440 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1441 1442 spdk_blob_resize(file->blob, num_clusters); 1443 spdk_blob_set_xattr(file->blob, "length", &length, sizeof(length)); 1444 1445 file->length = length; 1446 if (file->append_pos > file->length) { 1447 file->append_pos = file->length; 1448 } 1449 1450 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1451 } 1452 1453 static void 1454 __truncate(void *arg) 1455 { 1456 struct spdk_fs_request *req = arg; 1457 struct spdk_fs_cb_args *args = &req->args; 1458 1459 spdk_file_truncate_async(args->file, args->op.truncate.length, 1460 args->fn.file_op, args->arg); 1461 } 1462 1463 void 1464 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1465 uint64_t length) 1466 { 1467 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1468 struct spdk_fs_request *req; 1469 struct spdk_fs_cb_args *args; 1470 1471 req = alloc_fs_request(channel); 1472 assert(req != NULL); 1473 1474 args = &req->args; 1475 1476 args->file = file; 1477 args->op.truncate.length = length; 1478 args->fn.file_op = __sem_post; 1479 args->arg = &channel->sem; 1480 1481 channel->send_request(__truncate, req); 1482 sem_wait(&channel->sem); 1483 free_fs_request(req); 1484 } 1485 1486 static void 1487 __rw_done(void *ctx, int bserrno) 1488 { 1489 struct spdk_fs_request *req = ctx; 1490 struct spdk_fs_cb_args *args = &req->args; 1491 1492 spdk_dma_free(args->op.rw.pin_buf); 1493 args->fn.file_op(args->arg, bserrno); 1494 free_fs_request(req); 1495 } 1496 1497 static void 1498 __read_done(void *ctx, int bserrno) 1499 { 1500 struct spdk_fs_request *req = ctx; 1501 struct spdk_fs_cb_args *args = &req->args; 1502 1503 if (args->op.rw.is_read) { 1504 memcpy(args->op.rw.user_buf, 1505 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1506 args->op.rw.length); 1507 __rw_done(req, 0); 1508 } else { 1509 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1510 args->op.rw.user_buf, 1511 args->op.rw.length); 1512 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1513 args->op.rw.pin_buf, 1514 args->op.rw.start_page, args->op.rw.num_pages, 1515 __rw_done, req); 1516 } 1517 } 1518 1519 static void 1520 __do_blob_read(void *ctx, int fserrno) 1521 { 1522 struct spdk_fs_request *req = ctx; 1523 struct spdk_fs_cb_args *args = &req->args; 1524 1525 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1526 args->op.rw.pin_buf, 1527 args->op.rw.start_page, args->op.rw.num_pages, 1528 __read_done, req); 1529 } 1530 1531 static void 1532 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1533 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1534 { 1535 uint64_t end_page; 1536 1537 *page_size = spdk_bs_get_page_size(file->fs->bs); 1538 *start_page = offset / *page_size; 1539 end_page = (offset + length - 1) / *page_size; 1540 *num_pages = (end_page - *start_page + 1); 1541 } 1542 1543 static void 1544 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1545 void *payload, uint64_t offset, uint64_t length, 1546 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1547 { 1548 struct spdk_fs_request *req; 1549 struct spdk_fs_cb_args *args; 1550 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1551 uint64_t start_page, num_pages, pin_buf_length; 1552 uint32_t page_size; 1553 1554 if (is_read && offset + length > file->length) { 1555 cb_fn(cb_arg, -EINVAL); 1556 return; 1557 } 1558 1559 req = alloc_fs_request(channel); 1560 if (req == NULL) { 1561 cb_fn(cb_arg, -ENOMEM); 1562 return; 1563 } 1564 1565 args = &req->args; 1566 args->fn.file_op = cb_fn; 1567 args->arg = cb_arg; 1568 args->file = file; 1569 args->op.rw.channel = channel->bs_channel; 1570 args->op.rw.user_buf = payload; 1571 args->op.rw.is_read = is_read; 1572 args->op.rw.offset = offset; 1573 args->op.rw.length = length; 1574 1575 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1576 pin_buf_length = num_pages * page_size; 1577 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1578 1579 args->op.rw.start_page = start_page; 1580 args->op.rw.num_pages = num_pages; 1581 1582 if (!is_read && file->length < offset + length) { 1583 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1584 } else { 1585 __do_blob_read(req, 0); 1586 } 1587 } 1588 1589 void 1590 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1591 void *payload, uint64_t offset, uint64_t length, 1592 spdk_file_op_complete cb_fn, void *cb_arg) 1593 { 1594 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1595 } 1596 1597 void 1598 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1599 void *payload, uint64_t offset, uint64_t length, 1600 spdk_file_op_complete cb_fn, void *cb_arg) 1601 { 1602 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1603 file->name, offset, length); 1604 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1605 } 1606 1607 struct spdk_io_channel * 1608 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1609 { 1610 struct spdk_io_channel *io_channel; 1611 struct spdk_fs_channel *fs_channel; 1612 1613 io_channel = spdk_get_io_channel(&fs->io_target); 1614 fs_channel = spdk_io_channel_get_ctx(io_channel); 1615 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1616 fs_channel->send_request = __send_request_direct; 1617 1618 return io_channel; 1619 } 1620 1621 struct spdk_io_channel * 1622 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1623 { 1624 struct spdk_io_channel *io_channel; 1625 struct spdk_fs_channel *fs_channel; 1626 1627 io_channel = spdk_get_io_channel(&fs->io_target); 1628 fs_channel = spdk_io_channel_get_ctx(io_channel); 1629 fs_channel->send_request = fs->send_request; 1630 fs_channel->sync = 1; 1631 pthread_spin_init(&fs_channel->lock, 0); 1632 1633 return io_channel; 1634 } 1635 1636 void 1637 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1638 { 1639 spdk_put_io_channel(channel); 1640 } 1641 1642 void 1643 spdk_fs_set_cache_size(uint64_t size_in_mb) 1644 { 1645 g_fs_cache_size = size_in_mb * 1024 * 1024; 1646 } 1647 1648 uint64_t 1649 spdk_fs_get_cache_size(void) 1650 { 1651 return g_fs_cache_size / (1024 * 1024); 1652 } 1653 1654 static void __file_flush(void *_args); 1655 1656 static void * 1657 alloc_cache_memory_buffer(struct spdk_file *context) 1658 { 1659 struct spdk_file *file; 1660 void *buf; 1661 1662 buf = spdk_mempool_get(g_cache_pool); 1663 if (buf != NULL) { 1664 return buf; 1665 } 1666 1667 pthread_spin_lock(&g_caches_lock); 1668 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1669 if (!file->open_for_writing && 1670 file->priority == SPDK_FILE_PRIORITY_LOW && 1671 file != context) { 1672 break; 1673 } 1674 } 1675 pthread_spin_unlock(&g_caches_lock); 1676 if (file != NULL) { 1677 cache_free_buffers(file); 1678 buf = spdk_mempool_get(g_cache_pool); 1679 if (buf != NULL) { 1680 return buf; 1681 } 1682 } 1683 1684 pthread_spin_lock(&g_caches_lock); 1685 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1686 if (!file->open_for_writing && file != context) { 1687 break; 1688 } 1689 } 1690 pthread_spin_unlock(&g_caches_lock); 1691 if (file != NULL) { 1692 cache_free_buffers(file); 1693 buf = spdk_mempool_get(g_cache_pool); 1694 if (buf != NULL) { 1695 return buf; 1696 } 1697 } 1698 1699 pthread_spin_lock(&g_caches_lock); 1700 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1701 if (file != context) { 1702 break; 1703 } 1704 } 1705 pthread_spin_unlock(&g_caches_lock); 1706 if (file != NULL) { 1707 cache_free_buffers(file); 1708 buf = spdk_mempool_get(g_cache_pool); 1709 if (buf != NULL) { 1710 return buf; 1711 } 1712 } 1713 1714 return NULL; 1715 } 1716 1717 static struct cache_buffer * 1718 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1719 { 1720 struct cache_buffer *buf; 1721 int count = 0; 1722 1723 buf = calloc(1, sizeof(*buf)); 1724 if (buf == NULL) { 1725 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1726 return NULL; 1727 } 1728 1729 buf->buf = alloc_cache_memory_buffer(file); 1730 while (buf->buf == NULL) { 1731 /* 1732 * TODO: alloc_cache_memory_buffer() should eventually free 1733 * some buffers. Need a more sophisticated check here, instead 1734 * of just bailing if 100 tries does not result in getting a 1735 * free buffer. This will involve using the sync channel's 1736 * semaphore to block until a buffer becomes available. 1737 */ 1738 if (count++ == 100) { 1739 SPDK_ERRLOG("could not allocate cache buffer\n"); 1740 assert(false); 1741 free(buf); 1742 return NULL; 1743 } 1744 buf->buf = alloc_cache_memory_buffer(file); 1745 } 1746 1747 buf->buf_size = CACHE_BUFFER_SIZE; 1748 buf->offset = offset; 1749 1750 pthread_spin_lock(&g_caches_lock); 1751 if (file->tree->present_mask == 0) { 1752 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1753 } 1754 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1755 pthread_spin_unlock(&g_caches_lock); 1756 1757 return buf; 1758 } 1759 1760 static struct cache_buffer * 1761 cache_append_buffer(struct spdk_file *file) 1762 { 1763 struct cache_buffer *last; 1764 1765 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1766 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1767 1768 last = cache_insert_buffer(file, file->append_pos); 1769 if (last == NULL) { 1770 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1771 return NULL; 1772 } 1773 1774 file->last = last; 1775 1776 return last; 1777 } 1778 1779 static void 1780 __wake_caller(struct spdk_fs_cb_args *args) 1781 { 1782 sem_post(args->sem); 1783 } 1784 1785 static void __check_sync_reqs(struct spdk_file *file); 1786 1787 static void 1788 __file_cache_finish_sync(struct spdk_file *file) 1789 { 1790 struct spdk_fs_request *sync_req; 1791 struct spdk_fs_cb_args *sync_args; 1792 1793 pthread_spin_lock(&file->lock); 1794 sync_req = TAILQ_FIRST(&file->sync_requests); 1795 sync_args = &sync_req->args; 1796 assert(sync_args->op.sync.offset <= file->length_flushed); 1797 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1798 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1799 pthread_spin_unlock(&file->lock); 1800 1801 sync_args->fn.file_op(sync_args->arg, 0); 1802 __check_sync_reqs(file); 1803 1804 pthread_spin_lock(&file->lock); 1805 free_fs_request(sync_req); 1806 pthread_spin_unlock(&file->lock); 1807 } 1808 1809 static void 1810 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1811 { 1812 struct spdk_file *file = ctx; 1813 1814 __file_cache_finish_sync(file); 1815 } 1816 1817 static void 1818 __free_args(struct spdk_fs_cb_args *args) 1819 { 1820 struct spdk_fs_request *req; 1821 1822 if (!args->from_request) { 1823 free(args); 1824 } else { 1825 /* Depends on args being at the start of the spdk_fs_request structure. */ 1826 req = (struct spdk_fs_request *)args; 1827 free_fs_request(req); 1828 } 1829 } 1830 1831 static void 1832 __check_sync_reqs(struct spdk_file *file) 1833 { 1834 struct spdk_fs_request *sync_req; 1835 1836 pthread_spin_lock(&file->lock); 1837 1838 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1839 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1840 break; 1841 } 1842 } 1843 1844 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1845 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1846 sync_req->args.op.sync.xattr_in_progress = true; 1847 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1848 sizeof(file->length_flushed)); 1849 1850 pthread_spin_unlock(&file->lock); 1851 spdk_blob_sync_md(file->blob, __file_cache_finish_sync_bs_cb, file); 1852 } else { 1853 pthread_spin_unlock(&file->lock); 1854 } 1855 } 1856 1857 static void 1858 __file_flush_done(void *arg, int bserrno) 1859 { 1860 struct spdk_fs_cb_args *args = arg; 1861 struct spdk_file *file = args->file; 1862 struct cache_buffer *next = args->op.flush.cache_buffer; 1863 1864 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1865 1866 pthread_spin_lock(&file->lock); 1867 next->in_progress = false; 1868 next->bytes_flushed += args->op.flush.length; 1869 file->length_flushed += args->op.flush.length; 1870 if (file->length_flushed > file->length) { 1871 file->length = file->length_flushed; 1872 } 1873 if (next->bytes_flushed == next->buf_size) { 1874 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1875 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1876 } 1877 1878 /* 1879 * Assert that there is no cached data that extends past the end of the underlying 1880 * blob. 1881 */ 1882 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1883 next->bytes_filled == 0); 1884 1885 pthread_spin_unlock(&file->lock); 1886 1887 __check_sync_reqs(file); 1888 1889 __file_flush(args); 1890 } 1891 1892 static void 1893 __file_flush(void *_args) 1894 { 1895 struct spdk_fs_cb_args *args = _args; 1896 struct spdk_file *file = args->file; 1897 struct cache_buffer *next; 1898 uint64_t offset, length, start_page, num_pages; 1899 uint32_t page_size; 1900 1901 pthread_spin_lock(&file->lock); 1902 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1903 if (next == NULL || next->in_progress) { 1904 /* 1905 * There is either no data to flush, or a flush I/O is already in 1906 * progress. So return immediately - if a flush I/O is in 1907 * progress we will flush more data after that is completed. 1908 */ 1909 __free_args(args); 1910 pthread_spin_unlock(&file->lock); 1911 return; 1912 } 1913 1914 offset = next->offset + next->bytes_flushed; 1915 length = next->bytes_filled - next->bytes_flushed; 1916 if (length == 0) { 1917 __free_args(args); 1918 pthread_spin_unlock(&file->lock); 1919 return; 1920 } 1921 args->op.flush.length = length; 1922 args->op.flush.cache_buffer = next; 1923 1924 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1925 1926 next->in_progress = true; 1927 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1928 offset, length, start_page, num_pages); 1929 pthread_spin_unlock(&file->lock); 1930 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1931 next->buf + (start_page * page_size) - next->offset, 1932 start_page, num_pages, __file_flush_done, args); 1933 } 1934 1935 static void 1936 __file_extend_done(void *arg, int bserrno) 1937 { 1938 struct spdk_fs_cb_args *args = arg; 1939 1940 __wake_caller(args); 1941 } 1942 1943 static void 1944 __file_extend_blob(void *_args) 1945 { 1946 struct spdk_fs_cb_args *args = _args; 1947 struct spdk_file *file = args->file; 1948 1949 spdk_blob_resize(file->blob, args->op.resize.num_clusters); 1950 1951 spdk_blob_sync_md(file->blob, __file_extend_done, args); 1952 } 1953 1954 static void 1955 __rw_from_file_done(void *arg, int bserrno) 1956 { 1957 struct spdk_fs_cb_args *args = arg; 1958 1959 __wake_caller(args); 1960 __free_args(args); 1961 } 1962 1963 static void 1964 __rw_from_file(void *_args) 1965 { 1966 struct spdk_fs_cb_args *args = _args; 1967 struct spdk_file *file = args->file; 1968 1969 if (args->op.rw.is_read) { 1970 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1971 args->op.rw.offset, args->op.rw.length, 1972 __rw_from_file_done, args); 1973 } else { 1974 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1975 args->op.rw.offset, args->op.rw.length, 1976 __rw_from_file_done, args); 1977 } 1978 } 1979 1980 static int 1981 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1982 uint64_t offset, uint64_t length, bool is_read) 1983 { 1984 struct spdk_fs_cb_args *args; 1985 1986 args = calloc(1, sizeof(*args)); 1987 if (args == NULL) { 1988 sem_post(sem); 1989 return -ENOMEM; 1990 } 1991 1992 args->file = file; 1993 args->sem = sem; 1994 args->op.rw.user_buf = payload; 1995 args->op.rw.offset = offset; 1996 args->op.rw.length = length; 1997 args->op.rw.is_read = is_read; 1998 file->fs->send_request(__rw_from_file, args); 1999 return 0; 2000 } 2001 2002 int 2003 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 2004 void *payload, uint64_t offset, uint64_t length) 2005 { 2006 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2007 struct spdk_fs_cb_args *args; 2008 uint64_t rem_length, copy, blob_size, cluster_sz; 2009 uint32_t cache_buffers_filled = 0; 2010 uint8_t *cur_payload; 2011 struct cache_buffer *last; 2012 2013 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2014 2015 if (length == 0) { 2016 return 0; 2017 } 2018 2019 if (offset != file->append_pos) { 2020 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2021 return -EINVAL; 2022 } 2023 2024 pthread_spin_lock(&file->lock); 2025 file->open_for_writing = true; 2026 2027 if (file->last == NULL) { 2028 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 2029 cache_append_buffer(file); 2030 } else { 2031 int rc; 2032 2033 file->append_pos += length; 2034 pthread_spin_unlock(&file->lock); 2035 rc = __send_rw_from_file(file, &channel->sem, payload, 2036 offset, length, false); 2037 sem_wait(&channel->sem); 2038 return rc; 2039 } 2040 } 2041 2042 blob_size = __file_get_blob_size(file); 2043 2044 if ((offset + length) > blob_size) { 2045 struct spdk_fs_cb_args extend_args = {}; 2046 2047 cluster_sz = file->fs->bs_opts.cluster_sz; 2048 extend_args.sem = &channel->sem; 2049 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2050 extend_args.file = file; 2051 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2052 pthread_spin_unlock(&file->lock); 2053 file->fs->send_request(__file_extend_blob, &extend_args); 2054 sem_wait(&channel->sem); 2055 } 2056 2057 last = file->last; 2058 rem_length = length; 2059 cur_payload = payload; 2060 while (rem_length > 0) { 2061 copy = last->buf_size - last->bytes_filled; 2062 if (copy > rem_length) { 2063 copy = rem_length; 2064 } 2065 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2066 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2067 file->append_pos += copy; 2068 if (file->length < file->append_pos) { 2069 file->length = file->append_pos; 2070 } 2071 cur_payload += copy; 2072 last->bytes_filled += copy; 2073 rem_length -= copy; 2074 if (last->bytes_filled == last->buf_size) { 2075 cache_buffers_filled++; 2076 last = cache_append_buffer(file); 2077 if (last == NULL) { 2078 BLOBFS_TRACE(file, "nomem\n"); 2079 pthread_spin_unlock(&file->lock); 2080 return -ENOMEM; 2081 } 2082 } 2083 } 2084 2085 pthread_spin_unlock(&file->lock); 2086 2087 if (cache_buffers_filled == 0) { 2088 return 0; 2089 } 2090 2091 args = calloc(1, sizeof(*args)); 2092 if (args == NULL) { 2093 return -ENOMEM; 2094 } 2095 2096 args->file = file; 2097 file->fs->send_request(__file_flush, args); 2098 return 0; 2099 } 2100 2101 static void 2102 __readahead_done(void *arg, int bserrno) 2103 { 2104 struct spdk_fs_cb_args *args = arg; 2105 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2106 struct spdk_file *file = args->file; 2107 2108 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2109 2110 pthread_spin_lock(&file->lock); 2111 cache_buffer->bytes_filled = args->op.readahead.length; 2112 cache_buffer->bytes_flushed = args->op.readahead.length; 2113 cache_buffer->in_progress = false; 2114 pthread_spin_unlock(&file->lock); 2115 2116 __free_args(args); 2117 } 2118 2119 static void 2120 __readahead(void *_args) 2121 { 2122 struct spdk_fs_cb_args *args = _args; 2123 struct spdk_file *file = args->file; 2124 uint64_t offset, length, start_page, num_pages; 2125 uint32_t page_size; 2126 2127 offset = args->op.readahead.offset; 2128 length = args->op.readahead.length; 2129 assert(length > 0); 2130 2131 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2132 2133 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2134 offset, length, start_page, num_pages); 2135 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2136 args->op.readahead.cache_buffer->buf, 2137 start_page, num_pages, __readahead_done, args); 2138 } 2139 2140 static uint64_t 2141 __next_cache_buffer_offset(uint64_t offset) 2142 { 2143 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2144 } 2145 2146 static void 2147 check_readahead(struct spdk_file *file, uint64_t offset) 2148 { 2149 struct spdk_fs_cb_args *args; 2150 2151 offset = __next_cache_buffer_offset(offset); 2152 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2153 return; 2154 } 2155 2156 args = calloc(1, sizeof(*args)); 2157 if (args == NULL) { 2158 return; 2159 } 2160 2161 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2162 2163 args->file = file; 2164 args->op.readahead.offset = offset; 2165 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2166 args->op.readahead.cache_buffer->in_progress = true; 2167 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2168 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2169 } else { 2170 args->op.readahead.length = CACHE_BUFFER_SIZE; 2171 } 2172 file->fs->send_request(__readahead, args); 2173 } 2174 2175 static int 2176 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2177 { 2178 struct cache_buffer *buf; 2179 int rc; 2180 2181 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2182 if (buf == NULL) { 2183 pthread_spin_unlock(&file->lock); 2184 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2185 pthread_spin_lock(&file->lock); 2186 return rc; 2187 } 2188 2189 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2190 length = buf->offset + buf->bytes_filled - offset; 2191 } 2192 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2193 memcpy(payload, &buf->buf[offset - buf->offset], length); 2194 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2195 pthread_spin_lock(&g_caches_lock); 2196 spdk_tree_remove_buffer(file->tree, buf); 2197 if (file->tree->present_mask == 0) { 2198 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2199 } 2200 pthread_spin_unlock(&g_caches_lock); 2201 } 2202 2203 sem_post(sem); 2204 return 0; 2205 } 2206 2207 int64_t 2208 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2209 void *payload, uint64_t offset, uint64_t length) 2210 { 2211 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2212 uint64_t final_offset, final_length; 2213 uint32_t sub_reads = 0; 2214 int rc = 0; 2215 2216 pthread_spin_lock(&file->lock); 2217 2218 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2219 2220 file->open_for_writing = false; 2221 2222 if (length == 0 || offset >= file->append_pos) { 2223 pthread_spin_unlock(&file->lock); 2224 return 0; 2225 } 2226 2227 if (offset + length > file->append_pos) { 2228 length = file->append_pos - offset; 2229 } 2230 2231 if (offset != file->next_seq_offset) { 2232 file->seq_byte_count = 0; 2233 } 2234 file->seq_byte_count += length; 2235 file->next_seq_offset = offset + length; 2236 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2237 check_readahead(file, offset); 2238 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2239 } 2240 2241 final_length = 0; 2242 final_offset = offset + length; 2243 while (offset < final_offset) { 2244 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2245 if (length > (final_offset - offset)) { 2246 length = final_offset - offset; 2247 } 2248 rc = __file_read(file, payload, offset, length, &channel->sem); 2249 if (rc == 0) { 2250 final_length += length; 2251 } else { 2252 break; 2253 } 2254 payload += length; 2255 offset += length; 2256 sub_reads++; 2257 } 2258 pthread_spin_unlock(&file->lock); 2259 while (sub_reads-- > 0) { 2260 sem_wait(&channel->sem); 2261 } 2262 if (rc == 0) { 2263 return final_length; 2264 } else { 2265 return rc; 2266 } 2267 } 2268 2269 static void 2270 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2271 spdk_file_op_complete cb_fn, void *cb_arg) 2272 { 2273 struct spdk_fs_request *sync_req; 2274 struct spdk_fs_request *flush_req; 2275 struct spdk_fs_cb_args *sync_args; 2276 struct spdk_fs_cb_args *flush_args; 2277 2278 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2279 2280 pthread_spin_lock(&file->lock); 2281 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2282 BLOBFS_TRACE(file, "done - no data to flush\n"); 2283 pthread_spin_unlock(&file->lock); 2284 cb_fn(cb_arg, 0); 2285 return; 2286 } 2287 2288 sync_req = alloc_fs_request(channel); 2289 assert(sync_req != NULL); 2290 sync_args = &sync_req->args; 2291 2292 flush_req = alloc_fs_request(channel); 2293 assert(flush_req != NULL); 2294 flush_args = &flush_req->args; 2295 2296 sync_args->file = file; 2297 sync_args->fn.file_op = cb_fn; 2298 sync_args->arg = cb_arg; 2299 sync_args->op.sync.offset = file->append_pos; 2300 sync_args->op.sync.xattr_in_progress = false; 2301 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2302 pthread_spin_unlock(&file->lock); 2303 2304 flush_args->file = file; 2305 channel->send_request(__file_flush, flush_args); 2306 } 2307 2308 int 2309 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2310 { 2311 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2312 2313 _file_sync(file, channel, __sem_post, &channel->sem); 2314 sem_wait(&channel->sem); 2315 2316 return 0; 2317 } 2318 2319 void 2320 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2321 spdk_file_op_complete cb_fn, void *cb_arg) 2322 { 2323 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2324 2325 _file_sync(file, channel, cb_fn, cb_arg); 2326 } 2327 2328 void 2329 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2330 { 2331 BLOBFS_TRACE(file, "priority=%u\n", priority); 2332 file->priority = priority; 2333 2334 } 2335 2336 /* 2337 * Close routines 2338 */ 2339 2340 static void 2341 __file_close_async_done(void *ctx, int bserrno) 2342 { 2343 struct spdk_fs_request *req = ctx; 2344 struct spdk_fs_cb_args *args = &req->args; 2345 struct spdk_file *file = args->file; 2346 2347 if (file->is_deleted) { 2348 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2349 return; 2350 } 2351 2352 args->fn.file_op(args->arg, bserrno); 2353 free_fs_request(req); 2354 } 2355 2356 static void 2357 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2358 { 2359 struct spdk_blob *blob; 2360 2361 pthread_spin_lock(&file->lock); 2362 if (file->ref_count == 0) { 2363 pthread_spin_unlock(&file->lock); 2364 __file_close_async_done(req, -EBADF); 2365 return; 2366 } 2367 2368 file->ref_count--; 2369 if (file->ref_count > 0) { 2370 pthread_spin_unlock(&file->lock); 2371 req->args.fn.file_op(req->args.arg, 0); 2372 free_fs_request(req); 2373 return; 2374 } 2375 2376 pthread_spin_unlock(&file->lock); 2377 2378 blob = file->blob; 2379 file->blob = NULL; 2380 spdk_blob_close(blob, __file_close_async_done, req); 2381 } 2382 2383 static void 2384 __file_close_async__sync_done(void *arg, int fserrno) 2385 { 2386 struct spdk_fs_request *req = arg; 2387 struct spdk_fs_cb_args *args = &req->args; 2388 2389 __file_close_async(args->file, req); 2390 } 2391 2392 void 2393 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2394 { 2395 struct spdk_fs_request *req; 2396 struct spdk_fs_cb_args *args; 2397 2398 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2399 if (req == NULL) { 2400 cb_fn(cb_arg, -ENOMEM); 2401 return; 2402 } 2403 2404 args = &req->args; 2405 args->file = file; 2406 args->fn.file_op = cb_fn; 2407 args->arg = cb_arg; 2408 2409 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2410 } 2411 2412 static void 2413 __file_close_done(void *arg, int fserrno) 2414 { 2415 struct spdk_fs_cb_args *args = arg; 2416 2417 args->rc = fserrno; 2418 sem_post(args->sem); 2419 } 2420 2421 static void 2422 __file_close(void *arg) 2423 { 2424 struct spdk_fs_request *req = arg; 2425 struct spdk_fs_cb_args *args = &req->args; 2426 struct spdk_file *file = args->file; 2427 2428 __file_close_async(file, req); 2429 } 2430 2431 int 2432 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2433 { 2434 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2435 struct spdk_fs_request *req; 2436 struct spdk_fs_cb_args *args; 2437 2438 req = alloc_fs_request(channel); 2439 assert(req != NULL); 2440 2441 args = &req->args; 2442 2443 spdk_file_sync(file, _channel); 2444 BLOBFS_TRACE(file, "name=%s\n", file->name); 2445 args->file = file; 2446 args->sem = &channel->sem; 2447 args->fn.file_op = __file_close_done; 2448 args->arg = req; 2449 channel->send_request(__file_close, req); 2450 sem_wait(&channel->sem); 2451 2452 return args->rc; 2453 } 2454 2455 static void 2456 cache_free_buffers(struct spdk_file *file) 2457 { 2458 BLOBFS_TRACE(file, "free=%s\n", file->name); 2459 pthread_spin_lock(&file->lock); 2460 pthread_spin_lock(&g_caches_lock); 2461 if (file->tree->present_mask == 0) { 2462 pthread_spin_unlock(&g_caches_lock); 2463 pthread_spin_unlock(&file->lock); 2464 return; 2465 } 2466 spdk_tree_free_buffers(file->tree); 2467 2468 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2469 /* If not freed, put it in the end of the queue */ 2470 if (file->tree->present_mask != 0) { 2471 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2472 } 2473 file->last = NULL; 2474 pthread_spin_unlock(&g_caches_lock); 2475 pthread_spin_unlock(&file->lock); 2476 } 2477 2478 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2479 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2480