1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk/util.h" 44 #include "spdk_internal/log.h" 45 46 #define BLOBFS_TRACE(file, str, args...) \ 47 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 48 49 #define BLOBFS_TRACE_RW(file, str, args...) \ 50 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 51 52 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 53 54 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 55 static struct spdk_mempool *g_cache_pool; 56 static TAILQ_HEAD(, spdk_file) g_caches; 57 static int g_fs_count = 0; 58 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 59 static pthread_spinlock_t g_caches_lock; 60 61 static void 62 __sem_post(void *arg, int bserrno) 63 { 64 sem_t *sem = arg; 65 66 sem_post(sem); 67 } 68 69 void 70 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 71 { 72 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 73 free(cache_buffer); 74 } 75 76 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 77 78 struct spdk_file { 79 struct spdk_filesystem *fs; 80 struct spdk_blob *blob; 81 char *name; 82 uint64_t length; 83 bool open_for_writing; 84 uint64_t length_flushed; 85 uint64_t append_pos; 86 uint64_t seq_byte_count; 87 uint64_t next_seq_offset; 88 uint32_t priority; 89 TAILQ_ENTRY(spdk_file) tailq; 90 spdk_blob_id blobid; 91 uint32_t ref_count; 92 pthread_spinlock_t lock; 93 struct cache_buffer *last; 94 struct cache_tree *tree; 95 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 96 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 97 TAILQ_ENTRY(spdk_file) cache_tailq; 98 }; 99 100 struct spdk_filesystem { 101 struct spdk_blob_store *bs; 102 TAILQ_HEAD(, spdk_file) files; 103 struct spdk_bs_opts bs_opts; 104 struct spdk_bs_dev *bdev; 105 fs_send_request_fn send_request; 106 107 struct { 108 uint32_t max_ops; 109 struct spdk_io_channel *sync_io_channel; 110 struct spdk_fs_channel *sync_fs_channel; 111 } sync_target; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *md_io_channel; 116 struct spdk_fs_channel *md_fs_channel; 117 } md_target; 118 119 struct { 120 uint32_t max_ops; 121 } io_target; 122 }; 123 124 struct spdk_fs_cb_args { 125 union { 126 spdk_fs_op_with_handle_complete fs_op_with_handle; 127 spdk_fs_op_complete fs_op; 128 spdk_file_op_with_handle_complete file_op_with_handle; 129 spdk_file_op_complete file_op; 130 spdk_file_stat_op_complete stat_op; 131 } fn; 132 void *arg; 133 sem_t *sem; 134 struct spdk_filesystem *fs; 135 struct spdk_file *file; 136 int rc; 137 bool from_request; 138 union { 139 struct { 140 uint64_t length; 141 } truncate; 142 struct { 143 struct spdk_io_channel *channel; 144 void *user_buf; 145 void *pin_buf; 146 int is_read; 147 off_t offset; 148 size_t length; 149 uint64_t start_page; 150 uint64_t num_pages; 151 uint32_t blocklen; 152 } rw; 153 struct { 154 const char *old_name; 155 const char *new_name; 156 } rename; 157 struct { 158 struct cache_buffer *cache_buffer; 159 uint64_t length; 160 } flush; 161 struct { 162 struct cache_buffer *cache_buffer; 163 uint64_t length; 164 uint64_t offset; 165 } readahead; 166 struct { 167 uint64_t offset; 168 TAILQ_ENTRY(spdk_fs_request) tailq; 169 bool xattr_in_progress; 170 } sync; 171 struct { 172 uint32_t num_clusters; 173 } resize; 174 struct { 175 const char *name; 176 uint32_t flags; 177 TAILQ_ENTRY(spdk_fs_request) tailq; 178 } open; 179 struct { 180 const char *name; 181 } create; 182 struct { 183 const char *name; 184 } delete; 185 struct { 186 const char *name; 187 } stat; 188 } op; 189 }; 190 191 static void cache_free_buffers(struct spdk_file *file); 192 193 static void 194 __initialize_cache(void) 195 { 196 assert(g_cache_pool == NULL); 197 198 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 199 g_fs_cache_size / CACHE_BUFFER_SIZE, 200 CACHE_BUFFER_SIZE, 201 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 202 SPDK_ENV_SOCKET_ID_ANY); 203 TAILQ_INIT(&g_caches); 204 pthread_spin_init(&g_caches_lock, 0); 205 } 206 207 static void 208 __free_cache(void) 209 { 210 assert(g_cache_pool != NULL); 211 212 spdk_mempool_free(g_cache_pool); 213 g_cache_pool = NULL; 214 } 215 216 static uint64_t 217 __file_get_blob_size(struct spdk_file *file) 218 { 219 uint64_t cluster_sz; 220 221 cluster_sz = file->fs->bs_opts.cluster_sz; 222 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 223 } 224 225 struct spdk_fs_request { 226 struct spdk_fs_cb_args args; 227 TAILQ_ENTRY(spdk_fs_request) link; 228 struct spdk_fs_channel *channel; 229 }; 230 231 struct spdk_fs_channel { 232 struct spdk_fs_request *req_mem; 233 TAILQ_HEAD(, spdk_fs_request) reqs; 234 sem_t sem; 235 struct spdk_filesystem *fs; 236 struct spdk_io_channel *bs_channel; 237 fs_send_request_fn send_request; 238 bool sync; 239 pthread_spinlock_t lock; 240 }; 241 242 static struct spdk_fs_request * 243 alloc_fs_request(struct spdk_fs_channel *channel) 244 { 245 struct spdk_fs_request *req; 246 247 if (channel->sync) { 248 pthread_spin_lock(&channel->lock); 249 } 250 251 req = TAILQ_FIRST(&channel->reqs); 252 if (req) { 253 TAILQ_REMOVE(&channel->reqs, req, link); 254 } 255 256 if (channel->sync) { 257 pthread_spin_unlock(&channel->lock); 258 } 259 260 if (req == NULL) { 261 return NULL; 262 } 263 memset(req, 0, sizeof(*req)); 264 req->channel = channel; 265 req->args.from_request = true; 266 267 return req; 268 } 269 270 static void 271 free_fs_request(struct spdk_fs_request *req) 272 { 273 struct spdk_fs_channel *channel = req->channel; 274 275 if (channel->sync) { 276 pthread_spin_lock(&channel->lock); 277 } 278 279 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 280 281 if (channel->sync) { 282 pthread_spin_unlock(&channel->lock); 283 } 284 } 285 286 static int 287 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 288 uint32_t max_ops) 289 { 290 uint32_t i; 291 292 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 293 if (!channel->req_mem) { 294 return -1; 295 } 296 297 TAILQ_INIT(&channel->reqs); 298 sem_init(&channel->sem, 0, 0); 299 300 for (i = 0; i < max_ops; i++) { 301 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 302 } 303 304 channel->fs = fs; 305 306 return 0; 307 } 308 309 static int 310 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 311 { 312 struct spdk_filesystem *fs; 313 struct spdk_fs_channel *channel = ctx_buf; 314 315 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 316 317 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 318 } 319 320 static int 321 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 322 { 323 struct spdk_filesystem *fs; 324 struct spdk_fs_channel *channel = ctx_buf; 325 326 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 327 328 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 329 } 330 331 static int 332 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 333 { 334 struct spdk_filesystem *fs; 335 struct spdk_fs_channel *channel = ctx_buf; 336 337 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 338 339 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 340 } 341 342 static void 343 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 344 { 345 struct spdk_fs_channel *channel = ctx_buf; 346 347 free(channel->req_mem); 348 if (channel->bs_channel != NULL) { 349 spdk_bs_free_io_channel(channel->bs_channel); 350 } 351 } 352 353 static void 354 __send_request_direct(fs_request_fn fn, void *arg) 355 { 356 fn(arg); 357 } 358 359 static void 360 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 361 { 362 fs->bs = bs; 363 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 364 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 365 fs->md_target.md_fs_channel->send_request = __send_request_direct; 366 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 367 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 368 369 pthread_mutex_lock(&g_cache_init_lock); 370 if (g_fs_count == 0) { 371 __initialize_cache(); 372 } 373 g_fs_count++; 374 pthread_mutex_unlock(&g_cache_init_lock); 375 } 376 377 static void 378 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 379 { 380 struct spdk_fs_request *req = ctx; 381 struct spdk_fs_cb_args *args = &req->args; 382 struct spdk_filesystem *fs = args->fs; 383 384 if (bserrno == 0) { 385 common_fs_bs_init(fs, bs); 386 } else { 387 free(fs); 388 fs = NULL; 389 } 390 391 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 392 free_fs_request(req); 393 } 394 395 static struct spdk_filesystem * 396 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 397 { 398 struct spdk_filesystem *fs; 399 400 fs = calloc(1, sizeof(*fs)); 401 if (fs == NULL) { 402 return NULL; 403 } 404 405 fs->bdev = dev; 406 fs->send_request = send_request_fn; 407 TAILQ_INIT(&fs->files); 408 409 fs->md_target.max_ops = 512; 410 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 411 sizeof(struct spdk_fs_channel)); 412 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 413 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 414 415 fs->sync_target.max_ops = 512; 416 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 417 sizeof(struct spdk_fs_channel)); 418 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 419 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 420 421 fs->io_target.max_ops = 512; 422 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 423 sizeof(struct spdk_fs_channel)); 424 425 return fs; 426 } 427 428 void 429 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 430 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 431 { 432 struct spdk_filesystem *fs; 433 struct spdk_fs_request *req; 434 struct spdk_fs_cb_args *args; 435 436 fs = fs_alloc(dev, send_request_fn); 437 if (fs == NULL) { 438 cb_fn(cb_arg, NULL, -ENOMEM); 439 return; 440 } 441 442 req = alloc_fs_request(fs->md_target.md_fs_channel); 443 if (req == NULL) { 444 spdk_put_io_channel(fs->md_target.md_io_channel); 445 spdk_io_device_unregister(&fs->md_target, NULL); 446 spdk_put_io_channel(fs->sync_target.sync_io_channel); 447 spdk_io_device_unregister(&fs->sync_target, NULL); 448 spdk_io_device_unregister(&fs->io_target, NULL); 449 free(fs); 450 cb_fn(cb_arg, NULL, -ENOMEM); 451 return; 452 } 453 454 args = &req->args; 455 args->fn.fs_op_with_handle = cb_fn; 456 args->arg = cb_arg; 457 args->fs = fs; 458 459 spdk_bs_init(dev, NULL, init_cb, req); 460 } 461 462 static struct spdk_file * 463 file_alloc(struct spdk_filesystem *fs) 464 { 465 struct spdk_file *file; 466 467 file = calloc(1, sizeof(*file)); 468 if (file == NULL) { 469 return NULL; 470 } 471 472 file->tree = calloc(1, sizeof(*file->tree)); 473 if (file->tree == NULL) { 474 free(file); 475 return NULL; 476 } 477 478 file->fs = fs; 479 TAILQ_INIT(&file->open_requests); 480 TAILQ_INIT(&file->sync_requests); 481 pthread_spin_init(&file->lock, 0); 482 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 483 file->priority = SPDK_FILE_PRIORITY_LOW; 484 return file; 485 } 486 487 static void 488 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 489 { 490 struct spdk_fs_request *req = ctx; 491 struct spdk_fs_cb_args *args = &req->args; 492 struct spdk_filesystem *fs = args->fs; 493 struct spdk_file *f; 494 uint64_t *length; 495 const char *name; 496 size_t value_len; 497 498 if (rc == -ENOENT) { 499 /* Finished iterating */ 500 args->fn.fs_op_with_handle(args->arg, fs, 0); 501 free_fs_request(req); 502 return; 503 } else if (rc < 0) { 504 args->fn.fs_op_with_handle(args->arg, fs, rc); 505 free_fs_request(req); 506 return; 507 } 508 509 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 510 if (rc < 0) { 511 args->fn.fs_op_with_handle(args->arg, fs, rc); 512 free_fs_request(req); 513 return; 514 } 515 516 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 517 if (rc < 0) { 518 args->fn.fs_op_with_handle(args->arg, fs, rc); 519 free_fs_request(req); 520 return; 521 } 522 assert(value_len == 8); 523 524 f = file_alloc(fs); 525 if (f == NULL) { 526 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 527 free_fs_request(req); 528 return; 529 } 530 531 f->name = strdup(name); 532 f->blobid = spdk_blob_get_id(blob); 533 f->length = *length; 534 f->length_flushed = *length; 535 f->append_pos = *length; 536 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 537 538 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 539 } 540 541 static void 542 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 543 { 544 struct spdk_fs_request *req = ctx; 545 struct spdk_fs_cb_args *args = &req->args; 546 struct spdk_filesystem *fs = args->fs; 547 548 if (bserrno != 0) { 549 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 550 free_fs_request(req); 551 free(fs); 552 return; 553 } 554 555 common_fs_bs_init(fs, bs); 556 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 557 } 558 559 void 560 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 561 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 562 { 563 struct spdk_filesystem *fs; 564 struct spdk_fs_cb_args *args; 565 struct spdk_fs_request *req; 566 567 fs = fs_alloc(dev, send_request_fn); 568 if (fs == NULL) { 569 cb_fn(cb_arg, NULL, -ENOMEM); 570 return; 571 } 572 573 req = alloc_fs_request(fs->md_target.md_fs_channel); 574 if (req == NULL) { 575 spdk_put_io_channel(fs->md_target.md_io_channel); 576 spdk_io_device_unregister(&fs->md_target, NULL); 577 spdk_put_io_channel(fs->sync_target.sync_io_channel); 578 spdk_io_device_unregister(&fs->sync_target, NULL); 579 spdk_io_device_unregister(&fs->io_target, NULL); 580 free(fs); 581 cb_fn(cb_arg, NULL, -ENOMEM); 582 return; 583 } 584 585 args = &req->args; 586 args->fn.fs_op_with_handle = cb_fn; 587 args->arg = cb_arg; 588 args->fs = fs; 589 590 spdk_bs_load(dev, load_cb, req); 591 } 592 593 static void 594 unload_cb(void *ctx, int bserrno) 595 { 596 struct spdk_fs_request *req = ctx; 597 struct spdk_fs_cb_args *args = &req->args; 598 struct spdk_filesystem *fs = args->fs; 599 600 pthread_mutex_lock(&g_cache_init_lock); 601 g_fs_count--; 602 if (g_fs_count == 0) { 603 __free_cache(); 604 } 605 pthread_mutex_unlock(&g_cache_init_lock); 606 607 args->fn.fs_op(args->arg, bserrno); 608 free(req); 609 610 spdk_io_device_unregister(&fs->io_target, NULL); 611 spdk_io_device_unregister(&fs->sync_target, NULL); 612 spdk_io_device_unregister(&fs->md_target, NULL); 613 614 free(fs); 615 } 616 617 void 618 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 619 { 620 struct spdk_fs_request *req; 621 struct spdk_fs_cb_args *args; 622 623 /* 624 * We must free the md_channel before unloading the blobstore, so just 625 * allocate this request from the general heap. 626 */ 627 req = calloc(1, sizeof(*req)); 628 if (req == NULL) { 629 cb_fn(cb_arg, -ENOMEM); 630 return; 631 } 632 633 args = &req->args; 634 args->fn.fs_op = cb_fn; 635 args->arg = cb_arg; 636 args->fs = fs; 637 638 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 639 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 640 spdk_bs_unload(fs->bs, unload_cb, req); 641 } 642 643 static struct spdk_file * 644 fs_find_file(struct spdk_filesystem *fs, const char *name) 645 { 646 struct spdk_file *file; 647 648 TAILQ_FOREACH(file, &fs->files, tailq) { 649 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 650 return file; 651 } 652 } 653 654 return NULL; 655 } 656 657 void 658 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 659 spdk_file_stat_op_complete cb_fn, void *cb_arg) 660 { 661 struct spdk_file_stat stat; 662 struct spdk_file *f = NULL; 663 664 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 665 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 666 return; 667 } 668 669 f = fs_find_file(fs, name); 670 if (f != NULL) { 671 stat.blobid = f->blobid; 672 stat.size = f->length; 673 cb_fn(cb_arg, &stat, 0); 674 return; 675 } 676 677 cb_fn(cb_arg, NULL, -ENOENT); 678 } 679 680 static void 681 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 682 { 683 struct spdk_fs_request *req = arg; 684 struct spdk_fs_cb_args *args = &req->args; 685 686 args->rc = fserrno; 687 if (fserrno == 0) { 688 memcpy(args->arg, stat, sizeof(*stat)); 689 } 690 sem_post(args->sem); 691 } 692 693 static void 694 __file_stat(void *arg) 695 { 696 struct spdk_fs_request *req = arg; 697 struct spdk_fs_cb_args *args = &req->args; 698 699 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 700 args->fn.stat_op, req); 701 } 702 703 int 704 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 705 const char *name, struct spdk_file_stat *stat) 706 { 707 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 708 struct spdk_fs_request *req; 709 int rc; 710 711 req = alloc_fs_request(channel); 712 assert(req != NULL); 713 714 req->args.fs = fs; 715 req->args.op.stat.name = name; 716 req->args.fn.stat_op = __copy_stat; 717 req->args.arg = stat; 718 req->args.sem = &channel->sem; 719 channel->send_request(__file_stat, req); 720 sem_wait(&channel->sem); 721 722 rc = req->args.rc; 723 free_fs_request(req); 724 725 return rc; 726 } 727 728 static void 729 fs_create_blob_close_cb(void *ctx, int bserrno) 730 { 731 struct spdk_fs_request *req = ctx; 732 struct spdk_fs_cb_args *args = &req->args; 733 734 args->fn.file_op(args->arg, bserrno); 735 free_fs_request(req); 736 } 737 738 static void 739 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 740 { 741 struct spdk_fs_request *req = ctx; 742 struct spdk_fs_cb_args *args = &req->args; 743 struct spdk_file *f = args->file; 744 uint64_t length = 0; 745 746 f->blob = blob; 747 spdk_bs_md_resize_blob(blob, 1); 748 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 749 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 750 751 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 752 } 753 754 static void 755 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 756 { 757 struct spdk_fs_request *req = ctx; 758 struct spdk_fs_cb_args *args = &req->args; 759 struct spdk_file *f = args->file; 760 761 f->blobid = blobid; 762 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 763 } 764 765 void 766 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 767 spdk_file_op_complete cb_fn, void *cb_arg) 768 { 769 struct spdk_file *file; 770 struct spdk_fs_request *req; 771 struct spdk_fs_cb_args *args; 772 773 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 774 cb_fn(cb_arg, -ENAMETOOLONG); 775 return; 776 } 777 778 file = fs_find_file(fs, name); 779 if (file != NULL) { 780 cb_fn(cb_arg, -EEXIST); 781 return; 782 } 783 784 file = file_alloc(fs); 785 if (file == NULL) { 786 cb_fn(cb_arg, -ENOMEM); 787 return; 788 } 789 790 req = alloc_fs_request(fs->md_target.md_fs_channel); 791 if (req == NULL) { 792 cb_fn(cb_arg, -ENOMEM); 793 return; 794 } 795 796 args = &req->args; 797 args->file = file; 798 args->fn.file_op = cb_fn; 799 args->arg = cb_arg; 800 801 file->name = strdup(name); 802 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 803 } 804 805 static void 806 __fs_create_file_done(void *arg, int fserrno) 807 { 808 struct spdk_fs_request *req = arg; 809 struct spdk_fs_cb_args *args = &req->args; 810 811 args->rc = fserrno; 812 sem_post(args->sem); 813 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 814 } 815 816 static void 817 __fs_create_file(void *arg) 818 { 819 struct spdk_fs_request *req = arg; 820 struct spdk_fs_cb_args *args = &req->args; 821 822 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 823 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 824 } 825 826 int 827 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 828 { 829 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 830 struct spdk_fs_request *req; 831 struct spdk_fs_cb_args *args; 832 int rc; 833 834 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 835 836 req = alloc_fs_request(channel); 837 assert(req != NULL); 838 839 args = &req->args; 840 args->fs = fs; 841 args->op.create.name = name; 842 args->sem = &channel->sem; 843 fs->send_request(__fs_create_file, req); 844 sem_wait(&channel->sem); 845 rc = args->rc; 846 free_fs_request(req); 847 848 return rc; 849 } 850 851 static void 852 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 853 { 854 struct spdk_fs_request *req = ctx; 855 struct spdk_fs_cb_args *args = &req->args; 856 struct spdk_file *f = args->file; 857 858 f->blob = blob; 859 while (!TAILQ_EMPTY(&f->open_requests)) { 860 req = TAILQ_FIRST(&f->open_requests); 861 args = &req->args; 862 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 863 args->fn.file_op_with_handle(args->arg, f, bserrno); 864 free_fs_request(req); 865 } 866 } 867 868 static void 869 fs_open_blob_create_cb(void *ctx, int bserrno) 870 { 871 struct spdk_fs_request *req = ctx; 872 struct spdk_fs_cb_args *args = &req->args; 873 struct spdk_file *file = args->file; 874 struct spdk_filesystem *fs = args->fs; 875 876 if (file == NULL) { 877 /* 878 * This is from an open with CREATE flag - the file 879 * is now created so look it up in the file list for this 880 * filesystem. 881 */ 882 file = fs_find_file(fs, args->op.open.name); 883 assert(file != NULL); 884 args->file = file; 885 } 886 887 file->ref_count++; 888 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 889 if (file->ref_count == 1) { 890 assert(file->blob == NULL); 891 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 892 } else if (file->blob != NULL) { 893 fs_open_blob_done(req, file->blob, 0); 894 } else { 895 /* 896 * The blob open for this file is in progress due to a previous 897 * open request. When that open completes, it will invoke the 898 * open callback for this request. 899 */ 900 } 901 } 902 903 void 904 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 905 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 906 { 907 struct spdk_file *f = NULL; 908 struct spdk_fs_request *req; 909 struct spdk_fs_cb_args *args; 910 911 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 912 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 913 return; 914 } 915 916 f = fs_find_file(fs, name); 917 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 918 cb_fn(cb_arg, NULL, -ENOENT); 919 return; 920 } 921 922 req = alloc_fs_request(fs->md_target.md_fs_channel); 923 if (req == NULL) { 924 cb_fn(cb_arg, NULL, -ENOMEM); 925 return; 926 } 927 928 args = &req->args; 929 args->fn.file_op_with_handle = cb_fn; 930 args->arg = cb_arg; 931 args->file = f; 932 args->fs = fs; 933 args->op.open.name = name; 934 935 if (f == NULL) { 936 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 937 } else { 938 fs_open_blob_create_cb(req, 0); 939 } 940 } 941 942 static void 943 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 944 { 945 struct spdk_fs_request *req = arg; 946 struct spdk_fs_cb_args *args = &req->args; 947 948 args->file = file; 949 args->rc = bserrno; 950 sem_post(args->sem); 951 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 952 } 953 954 static void 955 __fs_open_file(void *arg) 956 { 957 struct spdk_fs_request *req = arg; 958 struct spdk_fs_cb_args *args = &req->args; 959 960 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 961 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 962 __fs_open_file_done, req); 963 } 964 965 int 966 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 967 const char *name, uint32_t flags, struct spdk_file **file) 968 { 969 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 970 struct spdk_fs_request *req; 971 struct spdk_fs_cb_args *args; 972 int rc; 973 974 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 975 976 req = alloc_fs_request(channel); 977 assert(req != NULL); 978 979 args = &req->args; 980 args->fs = fs; 981 args->op.open.name = name; 982 args->op.open.flags = flags; 983 args->sem = &channel->sem; 984 fs->send_request(__fs_open_file, req); 985 sem_wait(&channel->sem); 986 rc = args->rc; 987 if (rc == 0) { 988 *file = args->file; 989 } else { 990 *file = NULL; 991 } 992 free_fs_request(req); 993 994 return rc; 995 } 996 997 static void 998 fs_rename_blob_close_cb(void *ctx, int bserrno) 999 { 1000 struct spdk_fs_request *req = ctx; 1001 struct spdk_fs_cb_args *args = &req->args; 1002 1003 args->fn.fs_op(args->arg, bserrno); 1004 free_fs_request(req); 1005 } 1006 1007 static void 1008 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1009 { 1010 struct spdk_fs_request *req = ctx; 1011 struct spdk_fs_cb_args *args = &req->args; 1012 struct spdk_file *f = args->file; 1013 const char *new_name = args->op.rename.new_name; 1014 1015 f->blob = blob; 1016 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1017 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 1018 } 1019 1020 static void 1021 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1022 { 1023 struct spdk_fs_cb_args *args = &req->args; 1024 struct spdk_file *f; 1025 1026 f = fs_find_file(args->fs, args->op.rename.old_name); 1027 if (f == NULL) { 1028 args->fn.fs_op(args->arg, -ENOENT); 1029 free_fs_request(req); 1030 return; 1031 } 1032 1033 free(f->name); 1034 f->name = strdup(args->op.rename.new_name); 1035 args->file = f; 1036 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1037 } 1038 1039 static void 1040 fs_rename_delete_done(void *arg, int fserrno) 1041 { 1042 __spdk_fs_md_rename_file(arg); 1043 } 1044 1045 void 1046 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1047 const char *old_name, const char *new_name, 1048 spdk_file_op_complete cb_fn, void *cb_arg) 1049 { 1050 struct spdk_file *f; 1051 struct spdk_fs_request *req; 1052 struct spdk_fs_cb_args *args; 1053 1054 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1055 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1056 cb_fn(cb_arg, -ENAMETOOLONG); 1057 return; 1058 } 1059 1060 req = alloc_fs_request(fs->md_target.md_fs_channel); 1061 if (req == NULL) { 1062 cb_fn(cb_arg, -ENOMEM); 1063 return; 1064 } 1065 1066 args = &req->args; 1067 args->fn.fs_op = cb_fn; 1068 args->fs = fs; 1069 args->arg = cb_arg; 1070 args->op.rename.old_name = old_name; 1071 args->op.rename.new_name = new_name; 1072 1073 f = fs_find_file(fs, new_name); 1074 if (f == NULL) { 1075 __spdk_fs_md_rename_file(req); 1076 return; 1077 } 1078 1079 /* 1080 * The rename overwrites an existing file. So delete the existing file, then 1081 * do the actual rename. 1082 */ 1083 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1084 } 1085 1086 static void 1087 __fs_rename_file_done(void *arg, int fserrno) 1088 { 1089 struct spdk_fs_request *req = arg; 1090 struct spdk_fs_cb_args *args = &req->args; 1091 1092 args->rc = fserrno; 1093 sem_post(args->sem); 1094 } 1095 1096 static void 1097 __fs_rename_file(void *arg) 1098 { 1099 struct spdk_fs_request *req = arg; 1100 struct spdk_fs_cb_args *args = &req->args; 1101 1102 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1103 __fs_rename_file_done, req); 1104 } 1105 1106 int 1107 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1108 const char *old_name, const char *new_name) 1109 { 1110 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1111 struct spdk_fs_request *req; 1112 struct spdk_fs_cb_args *args; 1113 int rc; 1114 1115 req = alloc_fs_request(channel); 1116 assert(req != NULL); 1117 1118 args = &req->args; 1119 1120 args->fs = fs; 1121 args->op.rename.old_name = old_name; 1122 args->op.rename.new_name = new_name; 1123 args->sem = &channel->sem; 1124 fs->send_request(__fs_rename_file, req); 1125 sem_wait(&channel->sem); 1126 rc = args->rc; 1127 free_fs_request(req); 1128 return rc; 1129 } 1130 1131 static void 1132 blob_delete_cb(void *ctx, int bserrno) 1133 { 1134 struct spdk_fs_request *req = ctx; 1135 struct spdk_fs_cb_args *args = &req->args; 1136 1137 args->fn.file_op(args->arg, bserrno); 1138 free_fs_request(req); 1139 } 1140 1141 void 1142 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1143 spdk_file_op_complete cb_fn, void *cb_arg) 1144 { 1145 struct spdk_file *f; 1146 spdk_blob_id blobid; 1147 struct spdk_fs_request *req; 1148 struct spdk_fs_cb_args *args; 1149 1150 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1151 1152 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1153 cb_fn(cb_arg, -ENAMETOOLONG); 1154 return; 1155 } 1156 1157 f = fs_find_file(fs, name); 1158 if (f == NULL) { 1159 cb_fn(cb_arg, -ENOENT); 1160 return; 1161 } 1162 1163 if (f->ref_count > 0) { 1164 /* For now, do not allow deleting files with open references. */ 1165 cb_fn(cb_arg, -EBUSY); 1166 return; 1167 } 1168 1169 req = alloc_fs_request(fs->md_target.md_fs_channel); 1170 if (req == NULL) { 1171 cb_fn(cb_arg, -ENOMEM); 1172 return; 1173 } 1174 1175 TAILQ_REMOVE(&fs->files, f, tailq); 1176 1177 cache_free_buffers(f); 1178 1179 blobid = f->blobid; 1180 1181 free(f->name); 1182 free(f->tree); 1183 free(f); 1184 1185 args = &req->args; 1186 args->fn.file_op = cb_fn; 1187 args->arg = cb_arg; 1188 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1189 } 1190 1191 static void 1192 __fs_delete_file_done(void *arg, int fserrno) 1193 { 1194 struct spdk_fs_request *req = arg; 1195 struct spdk_fs_cb_args *args = &req->args; 1196 1197 args->rc = fserrno; 1198 sem_post(args->sem); 1199 } 1200 1201 static void 1202 __fs_delete_file(void *arg) 1203 { 1204 struct spdk_fs_request *req = arg; 1205 struct spdk_fs_cb_args *args = &req->args; 1206 1207 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1208 } 1209 1210 int 1211 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1212 const char *name) 1213 { 1214 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1215 struct spdk_fs_request *req; 1216 struct spdk_fs_cb_args *args; 1217 int rc; 1218 1219 req = alloc_fs_request(channel); 1220 assert(req != NULL); 1221 1222 args = &req->args; 1223 args->fs = fs; 1224 args->op.delete.name = name; 1225 args->sem = &channel->sem; 1226 fs->send_request(__fs_delete_file, req); 1227 sem_wait(&channel->sem); 1228 rc = args->rc; 1229 free_fs_request(req); 1230 1231 return rc; 1232 } 1233 1234 spdk_fs_iter 1235 spdk_fs_iter_first(struct spdk_filesystem *fs) 1236 { 1237 struct spdk_file *f; 1238 1239 f = TAILQ_FIRST(&fs->files); 1240 return f; 1241 } 1242 1243 spdk_fs_iter 1244 spdk_fs_iter_next(spdk_fs_iter iter) 1245 { 1246 struct spdk_file *f = iter; 1247 1248 if (f == NULL) { 1249 return NULL; 1250 } 1251 1252 f = TAILQ_NEXT(f, tailq); 1253 return f; 1254 } 1255 1256 const char * 1257 spdk_file_get_name(struct spdk_file *file) 1258 { 1259 return file->name; 1260 } 1261 1262 uint64_t 1263 spdk_file_get_length(struct spdk_file *file) 1264 { 1265 assert(file != NULL); 1266 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1267 return file->length; 1268 } 1269 1270 static void 1271 fs_truncate_complete_cb(void *ctx, int bserrno) 1272 { 1273 struct spdk_fs_request *req = ctx; 1274 struct spdk_fs_cb_args *args = &req->args; 1275 1276 args->fn.file_op(args->arg, bserrno); 1277 free_fs_request(req); 1278 } 1279 1280 static uint64_t 1281 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1282 { 1283 return (length + cluster_sz - 1) / cluster_sz; 1284 } 1285 1286 void 1287 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1288 spdk_file_op_complete cb_fn, void *cb_arg) 1289 { 1290 struct spdk_filesystem *fs; 1291 size_t num_clusters; 1292 struct spdk_fs_request *req; 1293 struct spdk_fs_cb_args *args; 1294 1295 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1296 if (length == file->length) { 1297 cb_fn(cb_arg, 0); 1298 return; 1299 } 1300 1301 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1302 if (req == NULL) { 1303 cb_fn(cb_arg, -ENOMEM); 1304 return; 1305 } 1306 1307 args = &req->args; 1308 args->fn.file_op = cb_fn; 1309 args->arg = cb_arg; 1310 args->file = file; 1311 fs = file->fs; 1312 1313 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1314 1315 spdk_bs_md_resize_blob(file->blob, num_clusters); 1316 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1317 1318 file->length = length; 1319 if (file->append_pos > file->length) { 1320 file->append_pos = file->length; 1321 } 1322 1323 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1324 } 1325 1326 static void 1327 __truncate(void *arg) 1328 { 1329 struct spdk_fs_request *req = arg; 1330 struct spdk_fs_cb_args *args = &req->args; 1331 1332 spdk_file_truncate_async(args->file, args->op.truncate.length, 1333 args->fn.file_op, args->arg); 1334 } 1335 1336 void 1337 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1338 uint64_t length) 1339 { 1340 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1341 struct spdk_fs_request *req; 1342 struct spdk_fs_cb_args *args; 1343 1344 req = alloc_fs_request(channel); 1345 assert(req != NULL); 1346 1347 args = &req->args; 1348 1349 args->file = file; 1350 args->op.truncate.length = length; 1351 args->fn.file_op = __sem_post; 1352 args->arg = &channel->sem; 1353 1354 channel->send_request(__truncate, req); 1355 sem_wait(&channel->sem); 1356 free_fs_request(req); 1357 } 1358 1359 static void 1360 __rw_done(void *ctx, int bserrno) 1361 { 1362 struct spdk_fs_request *req = ctx; 1363 struct spdk_fs_cb_args *args = &req->args; 1364 1365 spdk_dma_free(args->op.rw.pin_buf); 1366 args->fn.file_op(args->arg, bserrno); 1367 free_fs_request(req); 1368 } 1369 1370 static void 1371 __read_done(void *ctx, int bserrno) 1372 { 1373 struct spdk_fs_request *req = ctx; 1374 struct spdk_fs_cb_args *args = &req->args; 1375 1376 if (args->op.rw.is_read) { 1377 memcpy(args->op.rw.user_buf, 1378 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1379 args->op.rw.length); 1380 __rw_done(req, 0); 1381 } else { 1382 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1383 args->op.rw.user_buf, 1384 args->op.rw.length); 1385 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1386 args->op.rw.pin_buf, 1387 args->op.rw.start_page, args->op.rw.num_pages, 1388 __rw_done, req); 1389 } 1390 } 1391 1392 static void 1393 __do_blob_read(void *ctx, int fserrno) 1394 { 1395 struct spdk_fs_request *req = ctx; 1396 struct spdk_fs_cb_args *args = &req->args; 1397 1398 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1399 args->op.rw.pin_buf, 1400 args->op.rw.start_page, args->op.rw.num_pages, 1401 __read_done, req); 1402 } 1403 1404 static void 1405 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1406 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1407 { 1408 uint64_t end_page; 1409 1410 *page_size = spdk_bs_get_page_size(file->fs->bs); 1411 *start_page = offset / *page_size; 1412 end_page = (offset + length - 1) / *page_size; 1413 *num_pages = (end_page - *start_page + 1); 1414 } 1415 1416 static void 1417 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1418 void *payload, uint64_t offset, uint64_t length, 1419 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1420 { 1421 struct spdk_fs_request *req; 1422 struct spdk_fs_cb_args *args; 1423 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1424 uint64_t start_page, num_pages, pin_buf_length; 1425 uint32_t page_size; 1426 1427 if (is_read && offset + length > file->length) { 1428 cb_fn(cb_arg, -EINVAL); 1429 return; 1430 } 1431 1432 req = alloc_fs_request(channel); 1433 if (req == NULL) { 1434 cb_fn(cb_arg, -ENOMEM); 1435 return; 1436 } 1437 1438 args = &req->args; 1439 args->fn.file_op = cb_fn; 1440 args->arg = cb_arg; 1441 args->file = file; 1442 args->op.rw.channel = channel->bs_channel; 1443 args->op.rw.user_buf = payload; 1444 args->op.rw.is_read = is_read; 1445 args->op.rw.offset = offset; 1446 args->op.rw.length = length; 1447 1448 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1449 pin_buf_length = num_pages * page_size; 1450 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1451 1452 args->op.rw.start_page = start_page; 1453 args->op.rw.num_pages = num_pages; 1454 1455 if (!is_read && file->length < offset + length) { 1456 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1457 } else { 1458 __do_blob_read(req, 0); 1459 } 1460 } 1461 1462 void 1463 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1464 void *payload, uint64_t offset, uint64_t length, 1465 spdk_file_op_complete cb_fn, void *cb_arg) 1466 { 1467 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1468 } 1469 1470 void 1471 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1472 void *payload, uint64_t offset, uint64_t length, 1473 spdk_file_op_complete cb_fn, void *cb_arg) 1474 { 1475 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1476 file->name, offset, length); 1477 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1478 } 1479 1480 struct spdk_io_channel * 1481 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1482 { 1483 struct spdk_io_channel *io_channel; 1484 struct spdk_fs_channel *fs_channel; 1485 1486 io_channel = spdk_get_io_channel(&fs->io_target); 1487 fs_channel = spdk_io_channel_get_ctx(io_channel); 1488 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1489 fs_channel->send_request = __send_request_direct; 1490 1491 return io_channel; 1492 } 1493 1494 struct spdk_io_channel * 1495 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1496 { 1497 struct spdk_io_channel *io_channel; 1498 struct spdk_fs_channel *fs_channel; 1499 1500 io_channel = spdk_get_io_channel(&fs->io_target); 1501 fs_channel = spdk_io_channel_get_ctx(io_channel); 1502 fs_channel->send_request = fs->send_request; 1503 fs_channel->sync = 1; 1504 pthread_spin_init(&fs_channel->lock, 0); 1505 1506 return io_channel; 1507 } 1508 1509 void 1510 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1511 { 1512 spdk_put_io_channel(channel); 1513 } 1514 1515 void 1516 spdk_fs_set_cache_size(uint64_t size_in_mb) 1517 { 1518 g_fs_cache_size = size_in_mb * 1024 * 1024; 1519 } 1520 1521 uint64_t 1522 spdk_fs_get_cache_size(void) 1523 { 1524 return g_fs_cache_size / (1024 * 1024); 1525 } 1526 1527 static void __file_flush(void *_args); 1528 1529 static void * 1530 alloc_cache_memory_buffer(struct spdk_file *context) 1531 { 1532 struct spdk_file *file; 1533 void *buf; 1534 1535 buf = spdk_mempool_get(g_cache_pool); 1536 if (buf != NULL) { 1537 return buf; 1538 } 1539 1540 pthread_spin_lock(&g_caches_lock); 1541 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1542 if (!file->open_for_writing && 1543 file->priority == SPDK_FILE_PRIORITY_LOW && 1544 file != context) { 1545 break; 1546 } 1547 } 1548 pthread_spin_unlock(&g_caches_lock); 1549 if (file != NULL) { 1550 cache_free_buffers(file); 1551 buf = spdk_mempool_get(g_cache_pool); 1552 if (buf != NULL) { 1553 return buf; 1554 } 1555 } 1556 1557 pthread_spin_lock(&g_caches_lock); 1558 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1559 if (!file->open_for_writing && file != context) { 1560 break; 1561 } 1562 } 1563 pthread_spin_unlock(&g_caches_lock); 1564 if (file != NULL) { 1565 cache_free_buffers(file); 1566 buf = spdk_mempool_get(g_cache_pool); 1567 if (buf != NULL) { 1568 return buf; 1569 } 1570 } 1571 1572 pthread_spin_lock(&g_caches_lock); 1573 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1574 if (file != context) { 1575 break; 1576 } 1577 } 1578 pthread_spin_unlock(&g_caches_lock); 1579 if (file != NULL) { 1580 cache_free_buffers(file); 1581 buf = spdk_mempool_get(g_cache_pool); 1582 if (buf != NULL) { 1583 return buf; 1584 } 1585 } 1586 1587 return NULL; 1588 } 1589 1590 static struct cache_buffer * 1591 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1592 { 1593 struct cache_buffer *buf; 1594 int count = 0; 1595 1596 buf = calloc(1, sizeof(*buf)); 1597 if (buf == NULL) { 1598 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1599 return NULL; 1600 } 1601 1602 buf->buf = alloc_cache_memory_buffer(file); 1603 while (buf->buf == NULL) { 1604 /* 1605 * TODO: alloc_cache_memory_buffer() should eventually free 1606 * some buffers. Need a more sophisticated check here, instead 1607 * of just bailing if 100 tries does not result in getting a 1608 * free buffer. This will involve using the sync channel's 1609 * semaphore to block until a buffer becomes available. 1610 */ 1611 if (count++ == 100) { 1612 SPDK_ERRLOG("could not allocate cache buffer\n"); 1613 assert(false); 1614 free(buf); 1615 return NULL; 1616 } 1617 buf->buf = alloc_cache_memory_buffer(file); 1618 } 1619 1620 buf->buf_size = CACHE_BUFFER_SIZE; 1621 buf->offset = offset; 1622 1623 pthread_spin_lock(&g_caches_lock); 1624 if (file->tree->present_mask == 0) { 1625 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1626 } 1627 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1628 pthread_spin_unlock(&g_caches_lock); 1629 1630 return buf; 1631 } 1632 1633 static struct cache_buffer * 1634 cache_append_buffer(struct spdk_file *file) 1635 { 1636 struct cache_buffer *last; 1637 1638 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1639 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1640 1641 last = cache_insert_buffer(file, file->append_pos); 1642 if (last == NULL) { 1643 SPDK_DEBUGLOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1644 return NULL; 1645 } 1646 1647 file->last = last; 1648 1649 return last; 1650 } 1651 1652 static void 1653 __wake_caller(struct spdk_fs_cb_args *args) 1654 { 1655 sem_post(args->sem); 1656 } 1657 1658 static void __check_sync_reqs(struct spdk_file *file); 1659 1660 static void 1661 __file_cache_finish_sync(struct spdk_file *file) 1662 { 1663 struct spdk_fs_request *sync_req; 1664 struct spdk_fs_cb_args *sync_args; 1665 1666 pthread_spin_lock(&file->lock); 1667 sync_req = TAILQ_FIRST(&file->sync_requests); 1668 sync_args = &sync_req->args; 1669 assert(sync_args->op.sync.offset <= file->length_flushed); 1670 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1671 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1672 pthread_spin_unlock(&file->lock); 1673 1674 sync_args->fn.file_op(sync_args->arg, 0); 1675 __check_sync_reqs(file); 1676 1677 pthread_spin_lock(&file->lock); 1678 free_fs_request(sync_req); 1679 pthread_spin_unlock(&file->lock); 1680 } 1681 1682 static void 1683 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1684 { 1685 struct spdk_file *file = ctx; 1686 1687 __file_cache_finish_sync(file); 1688 } 1689 1690 static void 1691 __free_args(struct spdk_fs_cb_args *args) 1692 { 1693 struct spdk_fs_request *req; 1694 1695 if (!args->from_request) { 1696 free(args); 1697 } else { 1698 /* Depends on args being at the start of the spdk_fs_request structure. */ 1699 req = (struct spdk_fs_request *)args; 1700 free_fs_request(req); 1701 } 1702 } 1703 1704 static void 1705 __check_sync_reqs(struct spdk_file *file) 1706 { 1707 struct spdk_fs_request *sync_req; 1708 1709 pthread_spin_lock(&file->lock); 1710 1711 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1712 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1713 break; 1714 } 1715 } 1716 1717 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1718 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1719 sync_req->args.op.sync.xattr_in_progress = true; 1720 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1721 sizeof(file->length_flushed)); 1722 1723 pthread_spin_unlock(&file->lock); 1724 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1725 } else { 1726 pthread_spin_unlock(&file->lock); 1727 } 1728 } 1729 1730 static void 1731 __file_flush_done(void *arg, int bserrno) 1732 { 1733 struct spdk_fs_cb_args *args = arg; 1734 struct spdk_file *file = args->file; 1735 struct cache_buffer *next = args->op.flush.cache_buffer; 1736 1737 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1738 1739 pthread_spin_lock(&file->lock); 1740 next->in_progress = false; 1741 next->bytes_flushed += args->op.flush.length; 1742 file->length_flushed += args->op.flush.length; 1743 if (file->length_flushed > file->length) { 1744 file->length = file->length_flushed; 1745 } 1746 if (next->bytes_flushed == next->buf_size) { 1747 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1748 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1749 } 1750 1751 /* 1752 * Assert that there is no cached data that extends past the end of the underlying 1753 * blob. 1754 */ 1755 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1756 next->bytes_filled == 0); 1757 1758 pthread_spin_unlock(&file->lock); 1759 1760 __check_sync_reqs(file); 1761 1762 __file_flush(args); 1763 } 1764 1765 static void 1766 __file_flush(void *_args) 1767 { 1768 struct spdk_fs_cb_args *args = _args; 1769 struct spdk_file *file = args->file; 1770 struct cache_buffer *next; 1771 uint64_t offset, length, start_page, num_pages; 1772 uint32_t page_size; 1773 1774 pthread_spin_lock(&file->lock); 1775 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1776 if (next == NULL || next->in_progress) { 1777 /* 1778 * There is either no data to flush, or a flush I/O is already in 1779 * progress. So return immediately - if a flush I/O is in 1780 * progress we will flush more data after that is completed. 1781 */ 1782 __free_args(args); 1783 pthread_spin_unlock(&file->lock); 1784 return; 1785 } 1786 1787 offset = next->offset + next->bytes_flushed; 1788 length = next->bytes_filled - next->bytes_flushed; 1789 if (length == 0) { 1790 __free_args(args); 1791 pthread_spin_unlock(&file->lock); 1792 return; 1793 } 1794 args->op.flush.length = length; 1795 args->op.flush.cache_buffer = next; 1796 1797 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1798 1799 next->in_progress = true; 1800 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1801 offset, length, start_page, num_pages); 1802 pthread_spin_unlock(&file->lock); 1803 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1804 next->buf + (start_page * page_size) - next->offset, 1805 start_page, num_pages, 1806 __file_flush_done, args); 1807 } 1808 1809 static void 1810 __file_extend_done(void *arg, int bserrno) 1811 { 1812 struct spdk_fs_cb_args *args = arg; 1813 1814 __wake_caller(args); 1815 } 1816 1817 static void 1818 __file_extend_blob(void *_args) 1819 { 1820 struct spdk_fs_cb_args *args = _args; 1821 struct spdk_file *file = args->file; 1822 1823 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1824 1825 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1826 } 1827 1828 static void 1829 __rw_from_file_done(void *arg, int bserrno) 1830 { 1831 struct spdk_fs_cb_args *args = arg; 1832 1833 __wake_caller(args); 1834 __free_args(args); 1835 } 1836 1837 static void 1838 __rw_from_file(void *_args) 1839 { 1840 struct spdk_fs_cb_args *args = _args; 1841 struct spdk_file *file = args->file; 1842 1843 if (args->op.rw.is_read) { 1844 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1845 args->op.rw.offset, args->op.rw.length, 1846 __rw_from_file_done, args); 1847 } else { 1848 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1849 args->op.rw.offset, args->op.rw.length, 1850 __rw_from_file_done, args); 1851 } 1852 } 1853 1854 static int 1855 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1856 uint64_t offset, uint64_t length, bool is_read) 1857 { 1858 struct spdk_fs_cb_args *args; 1859 1860 args = calloc(1, sizeof(*args)); 1861 if (args == NULL) { 1862 sem_post(sem); 1863 return -ENOMEM; 1864 } 1865 1866 args->file = file; 1867 args->sem = sem; 1868 args->op.rw.user_buf = payload; 1869 args->op.rw.offset = offset; 1870 args->op.rw.length = length; 1871 args->op.rw.is_read = is_read; 1872 file->fs->send_request(__rw_from_file, args); 1873 return 0; 1874 } 1875 1876 int 1877 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1878 void *payload, uint64_t offset, uint64_t length) 1879 { 1880 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1881 struct spdk_fs_cb_args *args; 1882 uint64_t rem_length, copy, blob_size, cluster_sz; 1883 uint32_t cache_buffers_filled = 0; 1884 uint8_t *cur_payload; 1885 struct cache_buffer *last; 1886 1887 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1888 1889 if (length == 0) { 1890 return 0; 1891 } 1892 1893 if (offset != file->append_pos) { 1894 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1895 return -EINVAL; 1896 } 1897 1898 pthread_spin_lock(&file->lock); 1899 file->open_for_writing = true; 1900 1901 if (file->last == NULL) { 1902 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1903 cache_append_buffer(file); 1904 } else { 1905 int rc; 1906 1907 file->append_pos += length; 1908 pthread_spin_unlock(&file->lock); 1909 rc = __send_rw_from_file(file, &channel->sem, payload, 1910 offset, length, false); 1911 sem_wait(&channel->sem); 1912 return rc; 1913 } 1914 } 1915 1916 blob_size = __file_get_blob_size(file); 1917 1918 if ((offset + length) > blob_size) { 1919 struct spdk_fs_cb_args extend_args = {}; 1920 1921 cluster_sz = file->fs->bs_opts.cluster_sz; 1922 extend_args.sem = &channel->sem; 1923 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1924 extend_args.file = file; 1925 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1926 pthread_spin_unlock(&file->lock); 1927 file->fs->send_request(__file_extend_blob, &extend_args); 1928 sem_wait(&channel->sem); 1929 } 1930 1931 last = file->last; 1932 rem_length = length; 1933 cur_payload = payload; 1934 while (rem_length > 0) { 1935 copy = last->buf_size - last->bytes_filled; 1936 if (copy > rem_length) { 1937 copy = rem_length; 1938 } 1939 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 1940 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 1941 file->append_pos += copy; 1942 if (file->length < file->append_pos) { 1943 file->length = file->append_pos; 1944 } 1945 cur_payload += copy; 1946 last->bytes_filled += copy; 1947 rem_length -= copy; 1948 if (last->bytes_filled == last->buf_size) { 1949 cache_buffers_filled++; 1950 last = cache_append_buffer(file); 1951 if (last == NULL) { 1952 BLOBFS_TRACE(file, "nomem\n"); 1953 pthread_spin_unlock(&file->lock); 1954 return -ENOMEM; 1955 } 1956 } 1957 } 1958 1959 if (cache_buffers_filled == 0) { 1960 pthread_spin_unlock(&file->lock); 1961 return 0; 1962 } 1963 1964 args = calloc(1, sizeof(*args)); 1965 if (args == NULL) { 1966 pthread_spin_unlock(&file->lock); 1967 return -ENOMEM; 1968 } 1969 1970 args->file = file; 1971 file->fs->send_request(__file_flush, args); 1972 pthread_spin_unlock(&file->lock); 1973 return 0; 1974 } 1975 1976 static void 1977 __readahead_done(void *arg, int bserrno) 1978 { 1979 struct spdk_fs_cb_args *args = arg; 1980 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 1981 struct spdk_file *file = args->file; 1982 1983 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 1984 1985 pthread_spin_lock(&file->lock); 1986 cache_buffer->bytes_filled = args->op.readahead.length; 1987 cache_buffer->bytes_flushed = args->op.readahead.length; 1988 cache_buffer->in_progress = false; 1989 pthread_spin_unlock(&file->lock); 1990 1991 __free_args(args); 1992 } 1993 1994 static void 1995 __readahead(void *_args) 1996 { 1997 struct spdk_fs_cb_args *args = _args; 1998 struct spdk_file *file = args->file; 1999 uint64_t offset, length, start_page, num_pages; 2000 uint32_t page_size; 2001 2002 offset = args->op.readahead.offset; 2003 length = args->op.readahead.length; 2004 assert(length > 0); 2005 2006 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2007 2008 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2009 offset, length, start_page, num_pages); 2010 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2011 args->op.readahead.cache_buffer->buf, 2012 start_page, num_pages, 2013 __readahead_done, args); 2014 } 2015 2016 static uint64_t 2017 __next_cache_buffer_offset(uint64_t offset) 2018 { 2019 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2020 } 2021 2022 static void 2023 check_readahead(struct spdk_file *file, uint64_t offset) 2024 { 2025 struct spdk_fs_cb_args *args; 2026 2027 offset = __next_cache_buffer_offset(offset); 2028 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2029 return; 2030 } 2031 2032 args = calloc(1, sizeof(*args)); 2033 if (args == NULL) { 2034 return; 2035 } 2036 2037 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2038 2039 args->file = file; 2040 args->op.readahead.offset = offset; 2041 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2042 args->op.readahead.cache_buffer->in_progress = true; 2043 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2044 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2045 } else { 2046 args->op.readahead.length = CACHE_BUFFER_SIZE; 2047 } 2048 file->fs->send_request(__readahead, args); 2049 } 2050 2051 static int 2052 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2053 { 2054 struct cache_buffer *buf; 2055 int rc; 2056 2057 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2058 if (buf == NULL) { 2059 pthread_spin_unlock(&file->lock); 2060 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2061 pthread_spin_lock(&file->lock); 2062 return rc; 2063 } 2064 2065 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2066 length = buf->offset + buf->bytes_filled - offset; 2067 } 2068 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2069 memcpy(payload, &buf->buf[offset - buf->offset], length); 2070 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2071 pthread_spin_lock(&g_caches_lock); 2072 spdk_tree_remove_buffer(file->tree, buf); 2073 if (file->tree->present_mask == 0) { 2074 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2075 } 2076 pthread_spin_unlock(&g_caches_lock); 2077 } 2078 2079 sem_post(sem); 2080 return 0; 2081 } 2082 2083 int64_t 2084 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2085 void *payload, uint64_t offset, uint64_t length) 2086 { 2087 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2088 uint64_t final_offset, final_length; 2089 uint32_t sub_reads = 0; 2090 int rc = 0; 2091 2092 pthread_spin_lock(&file->lock); 2093 2094 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2095 2096 file->open_for_writing = false; 2097 2098 if (length == 0 || offset >= file->append_pos) { 2099 pthread_spin_unlock(&file->lock); 2100 return 0; 2101 } 2102 2103 if (offset + length > file->append_pos) { 2104 length = file->append_pos - offset; 2105 } 2106 2107 if (offset != file->next_seq_offset) { 2108 file->seq_byte_count = 0; 2109 } 2110 file->seq_byte_count += length; 2111 file->next_seq_offset = offset + length; 2112 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2113 check_readahead(file, offset); 2114 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2115 } 2116 2117 final_length = 0; 2118 final_offset = offset + length; 2119 while (offset < final_offset) { 2120 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2121 if (length > (final_offset - offset)) { 2122 length = final_offset - offset; 2123 } 2124 rc = __file_read(file, payload, offset, length, &channel->sem); 2125 if (rc == 0) { 2126 final_length += length; 2127 } else { 2128 break; 2129 } 2130 payload += length; 2131 offset += length; 2132 sub_reads++; 2133 } 2134 pthread_spin_unlock(&file->lock); 2135 while (sub_reads-- > 0) { 2136 sem_wait(&channel->sem); 2137 } 2138 if (rc == 0) { 2139 return final_length; 2140 } else { 2141 return rc; 2142 } 2143 } 2144 2145 static void 2146 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2147 spdk_file_op_complete cb_fn, void *cb_arg) 2148 { 2149 struct spdk_fs_request *sync_req; 2150 struct spdk_fs_request *flush_req; 2151 struct spdk_fs_cb_args *sync_args; 2152 struct spdk_fs_cb_args *flush_args; 2153 2154 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2155 2156 pthread_spin_lock(&file->lock); 2157 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2158 BLOBFS_TRACE(file, "done - no data to flush\n"); 2159 pthread_spin_unlock(&file->lock); 2160 cb_fn(cb_arg, 0); 2161 return; 2162 } 2163 2164 sync_req = alloc_fs_request(channel); 2165 assert(sync_req != NULL); 2166 sync_args = &sync_req->args; 2167 2168 flush_req = alloc_fs_request(channel); 2169 assert(flush_req != NULL); 2170 flush_args = &flush_req->args; 2171 2172 sync_args->file = file; 2173 sync_args->fn.file_op = cb_fn; 2174 sync_args->arg = cb_arg; 2175 sync_args->op.sync.offset = file->append_pos; 2176 sync_args->op.sync.xattr_in_progress = false; 2177 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2178 pthread_spin_unlock(&file->lock); 2179 2180 flush_args->file = file; 2181 channel->send_request(__file_flush, flush_args); 2182 } 2183 2184 int 2185 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2186 { 2187 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2188 2189 _file_sync(file, channel, __sem_post, &channel->sem); 2190 sem_wait(&channel->sem); 2191 2192 return 0; 2193 } 2194 2195 void 2196 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2197 spdk_file_op_complete cb_fn, void *cb_arg) 2198 { 2199 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2200 2201 _file_sync(file, channel, cb_fn, cb_arg); 2202 } 2203 2204 void 2205 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2206 { 2207 BLOBFS_TRACE(file, "priority=%u\n", priority); 2208 file->priority = priority; 2209 2210 } 2211 2212 /* 2213 * Close routines 2214 */ 2215 2216 static void 2217 __file_close_async_done(void *ctx, int bserrno) 2218 { 2219 struct spdk_fs_request *req = ctx; 2220 struct spdk_fs_cb_args *args = &req->args; 2221 2222 args->fn.file_op(args->arg, bserrno); 2223 free_fs_request(req); 2224 } 2225 2226 static void 2227 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2228 { 2229 pthread_spin_lock(&file->lock); 2230 if (file->ref_count == 0) { 2231 pthread_spin_unlock(&file->lock); 2232 __file_close_async_done(req, -EBADF); 2233 return; 2234 } 2235 2236 file->ref_count--; 2237 if (file->ref_count > 0) { 2238 pthread_spin_unlock(&file->lock); 2239 __file_close_async_done(req, 0); 2240 return; 2241 } 2242 2243 pthread_spin_unlock(&file->lock); 2244 2245 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2246 } 2247 2248 static void 2249 __file_close_async__sync_done(void *arg, int fserrno) 2250 { 2251 struct spdk_fs_request *req = arg; 2252 struct spdk_fs_cb_args *args = &req->args; 2253 2254 __file_close_async(args->file, req); 2255 } 2256 2257 void 2258 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2259 { 2260 struct spdk_fs_request *req; 2261 struct spdk_fs_cb_args *args; 2262 2263 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2264 if (req == NULL) { 2265 cb_fn(cb_arg, -ENOMEM); 2266 return; 2267 } 2268 2269 args = &req->args; 2270 args->file = file; 2271 args->fn.file_op = cb_fn; 2272 args->arg = cb_arg; 2273 2274 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2275 } 2276 2277 static void 2278 __file_close_done(void *arg, int fserrno) 2279 { 2280 struct spdk_fs_cb_args *args = arg; 2281 2282 args->rc = fserrno; 2283 sem_post(args->sem); 2284 } 2285 2286 static void 2287 __file_close(void *arg) 2288 { 2289 struct spdk_fs_request *req = arg; 2290 struct spdk_fs_cb_args *args = &req->args; 2291 struct spdk_file *file = args->file; 2292 2293 __file_close_async(file, req); 2294 } 2295 2296 int 2297 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2298 { 2299 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2300 struct spdk_fs_request *req; 2301 struct spdk_fs_cb_args *args; 2302 2303 req = alloc_fs_request(channel); 2304 assert(req != NULL); 2305 2306 args = &req->args; 2307 2308 spdk_file_sync(file, _channel); 2309 BLOBFS_TRACE(file, "name=%s\n", file->name); 2310 args->file = file; 2311 args->sem = &channel->sem; 2312 args->fn.file_op = __file_close_done; 2313 args->arg = req; 2314 channel->send_request(__file_close, req); 2315 sem_wait(&channel->sem); 2316 2317 return args->rc; 2318 } 2319 2320 static void 2321 cache_free_buffers(struct spdk_file *file) 2322 { 2323 BLOBFS_TRACE(file, "free=%s\n", file->name); 2324 pthread_spin_lock(&file->lock); 2325 pthread_spin_lock(&g_caches_lock); 2326 if (file->tree->present_mask == 0) { 2327 pthread_spin_unlock(&g_caches_lock); 2328 pthread_spin_unlock(&file->lock); 2329 return; 2330 } 2331 spdk_tree_free_buffers(file->tree); 2332 2333 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2334 /* If not freed, put it in the end of the queue */ 2335 if (file->tree->present_mask != 0) { 2336 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2337 } 2338 file->last = NULL; 2339 pthread_spin_unlock(&g_caches_lock); 2340 pthread_spin_unlock(&file->lock); 2341 } 2342 2343 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2344 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2345