1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk/util.h" 44 #include "spdk_internal/log.h" 45 46 #define BLOBFS_TRACE(file, str, args...) \ 47 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 48 49 #define BLOBFS_TRACE_RW(file, str, args...) \ 50 SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 51 52 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 53 54 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 55 static struct spdk_mempool *g_cache_pool; 56 static TAILQ_HEAD(, spdk_file) g_caches; 57 static int g_fs_count = 0; 58 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 59 static pthread_spinlock_t g_caches_lock; 60 61 static void 62 __sem_post(void *arg, int bserrno) 63 { 64 sem_t *sem = arg; 65 66 sem_post(sem); 67 } 68 69 void 70 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 71 { 72 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 73 free(cache_buffer); 74 } 75 76 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 77 78 struct spdk_file { 79 struct spdk_filesystem *fs; 80 struct spdk_blob *blob; 81 char *name; 82 uint64_t length; 83 bool open_for_writing; 84 uint64_t length_flushed; 85 uint64_t append_pos; 86 uint64_t seq_byte_count; 87 uint64_t next_seq_offset; 88 uint32_t priority; 89 TAILQ_ENTRY(spdk_file) tailq; 90 spdk_blob_id blobid; 91 uint32_t ref_count; 92 pthread_spinlock_t lock; 93 struct cache_buffer *last; 94 struct cache_tree *tree; 95 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 96 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 97 TAILQ_ENTRY(spdk_file) cache_tailq; 98 }; 99 100 struct spdk_filesystem { 101 struct spdk_blob_store *bs; 102 TAILQ_HEAD(, spdk_file) files; 103 struct spdk_bs_opts bs_opts; 104 struct spdk_bs_dev *bdev; 105 fs_send_request_fn send_request; 106 107 struct { 108 uint32_t max_ops; 109 struct spdk_io_channel *sync_io_channel; 110 struct spdk_fs_channel *sync_fs_channel; 111 } sync_target; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *md_io_channel; 116 struct spdk_fs_channel *md_fs_channel; 117 } md_target; 118 119 struct { 120 uint32_t max_ops; 121 } io_target; 122 }; 123 124 struct spdk_fs_cb_args { 125 union { 126 spdk_fs_op_with_handle_complete fs_op_with_handle; 127 spdk_fs_op_complete fs_op; 128 spdk_file_op_with_handle_complete file_op_with_handle; 129 spdk_file_op_complete file_op; 130 spdk_file_stat_op_complete stat_op; 131 } fn; 132 void *arg; 133 sem_t *sem; 134 struct spdk_filesystem *fs; 135 struct spdk_file *file; 136 int rc; 137 bool from_request; 138 union { 139 struct { 140 uint64_t length; 141 } truncate; 142 struct { 143 struct spdk_io_channel *channel; 144 void *user_buf; 145 void *pin_buf; 146 int is_read; 147 off_t offset; 148 size_t length; 149 uint64_t start_page; 150 uint64_t num_pages; 151 uint32_t blocklen; 152 } rw; 153 struct { 154 const char *old_name; 155 const char *new_name; 156 } rename; 157 struct { 158 struct cache_buffer *cache_buffer; 159 uint64_t length; 160 } flush; 161 struct { 162 struct cache_buffer *cache_buffer; 163 uint64_t length; 164 uint64_t offset; 165 } readahead; 166 struct { 167 uint64_t offset; 168 TAILQ_ENTRY(spdk_fs_request) tailq; 169 bool xattr_in_progress; 170 } sync; 171 struct { 172 uint32_t num_clusters; 173 } resize; 174 struct { 175 const char *name; 176 uint32_t flags; 177 TAILQ_ENTRY(spdk_fs_request) tailq; 178 } open; 179 struct { 180 const char *name; 181 } create; 182 struct { 183 const char *name; 184 } delete; 185 struct { 186 const char *name; 187 } stat; 188 } op; 189 }; 190 191 static void cache_free_buffers(struct spdk_file *file); 192 193 static void 194 __initialize_cache(void) 195 { 196 assert(g_cache_pool == NULL); 197 198 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 199 g_fs_cache_size / CACHE_BUFFER_SIZE, 200 CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY); 201 TAILQ_INIT(&g_caches); 202 pthread_spin_init(&g_caches_lock, 0); 203 } 204 205 static void 206 __free_cache(void) 207 { 208 assert(g_cache_pool != NULL); 209 210 spdk_mempool_free(g_cache_pool); 211 g_cache_pool = NULL; 212 } 213 214 static uint64_t 215 __file_get_blob_size(struct spdk_file *file) 216 { 217 uint64_t cluster_sz; 218 219 cluster_sz = file->fs->bs_opts.cluster_sz; 220 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 221 } 222 223 struct spdk_fs_request { 224 struct spdk_fs_cb_args args; 225 TAILQ_ENTRY(spdk_fs_request) link; 226 struct spdk_fs_channel *channel; 227 }; 228 229 struct spdk_fs_channel { 230 struct spdk_fs_request *req_mem; 231 TAILQ_HEAD(, spdk_fs_request) reqs; 232 sem_t sem; 233 struct spdk_filesystem *fs; 234 struct spdk_io_channel *bs_channel; 235 fs_send_request_fn send_request; 236 bool sync; 237 pthread_spinlock_t lock; 238 }; 239 240 static struct spdk_fs_request * 241 alloc_fs_request(struct spdk_fs_channel *channel) 242 { 243 struct spdk_fs_request *req; 244 245 if (channel->sync) { 246 pthread_spin_lock(&channel->lock); 247 } 248 249 req = TAILQ_FIRST(&channel->reqs); 250 if (req) { 251 TAILQ_REMOVE(&channel->reqs, req, link); 252 } 253 254 if (channel->sync) { 255 pthread_spin_unlock(&channel->lock); 256 } 257 258 if (req == NULL) { 259 return NULL; 260 } 261 memset(req, 0, sizeof(*req)); 262 req->channel = channel; 263 req->args.from_request = true; 264 265 return req; 266 } 267 268 static void 269 free_fs_request(struct spdk_fs_request *req) 270 { 271 struct spdk_fs_channel *channel = req->channel; 272 273 if (channel->sync) { 274 pthread_spin_lock(&channel->lock); 275 } 276 277 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 278 279 if (channel->sync) { 280 pthread_spin_unlock(&channel->lock); 281 } 282 } 283 284 static int 285 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 286 uint32_t max_ops) 287 { 288 uint32_t i; 289 290 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 291 if (!channel->req_mem) { 292 return -1; 293 } 294 295 TAILQ_INIT(&channel->reqs); 296 sem_init(&channel->sem, 0, 0); 297 298 for (i = 0; i < max_ops; i++) { 299 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 300 } 301 302 channel->fs = fs; 303 304 return 0; 305 } 306 307 static int 308 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 309 { 310 struct spdk_filesystem *fs; 311 struct spdk_fs_channel *channel = ctx_buf; 312 313 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 314 315 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 316 } 317 318 static int 319 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 320 { 321 struct spdk_filesystem *fs; 322 struct spdk_fs_channel *channel = ctx_buf; 323 324 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 325 326 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 327 } 328 329 static int 330 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 331 { 332 struct spdk_filesystem *fs; 333 struct spdk_fs_channel *channel = ctx_buf; 334 335 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 336 337 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 338 } 339 340 static void 341 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 342 { 343 struct spdk_fs_channel *channel = ctx_buf; 344 345 free(channel->req_mem); 346 if (channel->bs_channel != NULL) { 347 spdk_bs_free_io_channel(channel->bs_channel); 348 } 349 } 350 351 static void 352 __send_request_direct(fs_request_fn fn, void *arg) 353 { 354 fn(arg); 355 } 356 357 static void 358 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 359 { 360 fs->bs = bs; 361 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 362 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 363 fs->md_target.md_fs_channel->send_request = __send_request_direct; 364 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 365 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 366 367 pthread_mutex_lock(&g_cache_init_lock); 368 if (g_fs_count == 0) { 369 __initialize_cache(); 370 } 371 g_fs_count++; 372 pthread_mutex_unlock(&g_cache_init_lock); 373 } 374 375 static void 376 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 377 { 378 struct spdk_fs_request *req = ctx; 379 struct spdk_fs_cb_args *args = &req->args; 380 struct spdk_filesystem *fs = args->fs; 381 382 if (bserrno == 0) { 383 common_fs_bs_init(fs, bs); 384 } else { 385 free(fs); 386 fs = NULL; 387 } 388 389 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 390 free_fs_request(req); 391 } 392 393 static struct spdk_filesystem * 394 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 395 { 396 struct spdk_filesystem *fs; 397 398 fs = calloc(1, sizeof(*fs)); 399 if (fs == NULL) { 400 return NULL; 401 } 402 403 fs->bdev = dev; 404 fs->send_request = send_request_fn; 405 TAILQ_INIT(&fs->files); 406 407 fs->md_target.max_ops = 512; 408 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 409 sizeof(struct spdk_fs_channel)); 410 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 411 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 412 413 fs->sync_target.max_ops = 512; 414 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 415 sizeof(struct spdk_fs_channel)); 416 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 417 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 418 419 fs->io_target.max_ops = 512; 420 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 421 sizeof(struct spdk_fs_channel)); 422 423 return fs; 424 } 425 426 void 427 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 428 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 429 { 430 struct spdk_filesystem *fs; 431 struct spdk_fs_request *req; 432 struct spdk_fs_cb_args *args; 433 434 fs = fs_alloc(dev, send_request_fn); 435 if (fs == NULL) { 436 cb_fn(cb_arg, NULL, -ENOMEM); 437 return; 438 } 439 440 req = alloc_fs_request(fs->md_target.md_fs_channel); 441 if (req == NULL) { 442 spdk_put_io_channel(fs->md_target.md_io_channel); 443 spdk_io_device_unregister(&fs->md_target); 444 spdk_put_io_channel(fs->sync_target.sync_io_channel); 445 spdk_io_device_unregister(&fs->sync_target); 446 spdk_io_device_unregister(&fs->io_target); 447 free(fs); 448 cb_fn(cb_arg, NULL, -ENOMEM); 449 return; 450 } 451 452 args = &req->args; 453 args->fn.fs_op_with_handle = cb_fn; 454 args->arg = cb_arg; 455 args->fs = fs; 456 457 spdk_bs_init(dev, NULL, init_cb, req); 458 } 459 460 static struct spdk_file * 461 file_alloc(struct spdk_filesystem *fs) 462 { 463 struct spdk_file *file; 464 465 file = calloc(1, sizeof(*file)); 466 if (file == NULL) { 467 return NULL; 468 } 469 470 file->fs = fs; 471 TAILQ_INIT(&file->open_requests); 472 TAILQ_INIT(&file->sync_requests); 473 pthread_spin_init(&file->lock, 0); 474 file->tree = calloc(1, sizeof(*file->tree)); 475 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 476 file->priority = SPDK_FILE_PRIORITY_LOW; 477 return file; 478 } 479 480 static void 481 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 482 { 483 struct spdk_fs_request *req = ctx; 484 struct spdk_fs_cb_args *args = &req->args; 485 struct spdk_filesystem *fs = args->fs; 486 struct spdk_file *f; 487 uint64_t *length; 488 const char *name; 489 size_t value_len; 490 491 if (rc == -ENOENT) { 492 /* Finished iterating */ 493 args->fn.fs_op_with_handle(args->arg, fs, 0); 494 free_fs_request(req); 495 return; 496 } else if (rc < 0) { 497 args->fn.fs_op_with_handle(args->arg, fs, rc); 498 free_fs_request(req); 499 return; 500 } 501 502 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 503 if (rc < 0) { 504 args->fn.fs_op_with_handle(args->arg, fs, rc); 505 free_fs_request(req); 506 return; 507 } 508 509 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 510 if (rc < 0) { 511 args->fn.fs_op_with_handle(args->arg, fs, rc); 512 free_fs_request(req); 513 return; 514 } 515 assert(value_len == 8); 516 517 f = file_alloc(fs); 518 if (f == NULL) { 519 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 520 free_fs_request(req); 521 return; 522 } 523 524 f->name = strdup(name); 525 f->blobid = spdk_blob_get_id(blob); 526 f->length = *length; 527 f->length_flushed = *length; 528 f->append_pos = *length; 529 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 530 531 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 532 } 533 534 static void 535 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 536 { 537 struct spdk_fs_request *req = ctx; 538 struct spdk_fs_cb_args *args = &req->args; 539 struct spdk_filesystem *fs = args->fs; 540 541 if (bserrno != 0) { 542 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 543 free_fs_request(req); 544 free(fs); 545 return; 546 } 547 548 common_fs_bs_init(fs, bs); 549 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 550 } 551 552 void 553 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 554 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 555 { 556 struct spdk_filesystem *fs; 557 struct spdk_fs_cb_args *args; 558 struct spdk_fs_request *req; 559 560 fs = fs_alloc(dev, send_request_fn); 561 if (fs == NULL) { 562 cb_fn(cb_arg, NULL, -ENOMEM); 563 return; 564 } 565 566 req = alloc_fs_request(fs->md_target.md_fs_channel); 567 if (req == NULL) { 568 spdk_put_io_channel(fs->md_target.md_io_channel); 569 spdk_io_device_unregister(&fs->md_target); 570 spdk_put_io_channel(fs->sync_target.sync_io_channel); 571 spdk_io_device_unregister(&fs->sync_target); 572 spdk_io_device_unregister(&fs->io_target); 573 free(fs); 574 cb_fn(cb_arg, NULL, -ENOMEM); 575 return; 576 } 577 578 args = &req->args; 579 args->fn.fs_op_with_handle = cb_fn; 580 args->arg = cb_arg; 581 args->fs = fs; 582 583 spdk_bs_load(dev, load_cb, req); 584 } 585 586 static void 587 unload_cb(void *ctx, int bserrno) 588 { 589 struct spdk_fs_request *req = ctx; 590 struct spdk_fs_cb_args *args = &req->args; 591 struct spdk_filesystem *fs = args->fs; 592 593 pthread_mutex_lock(&g_cache_init_lock); 594 g_fs_count--; 595 if (g_fs_count == 0) { 596 __free_cache(); 597 } 598 pthread_mutex_unlock(&g_cache_init_lock); 599 600 args->fn.fs_op(args->arg, bserrno); 601 free(req); 602 603 spdk_io_device_unregister(&fs->io_target); 604 spdk_io_device_unregister(&fs->sync_target); 605 spdk_io_device_unregister(&fs->md_target); 606 607 free(fs); 608 } 609 610 void 611 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 612 { 613 struct spdk_fs_request *req; 614 struct spdk_fs_cb_args *args; 615 616 /* 617 * We must free the md_channel before unloading the blobstore, so just 618 * allocate this request from the general heap. 619 */ 620 req = calloc(1, sizeof(*req)); 621 if (req == NULL) { 622 cb_fn(cb_arg, -ENOMEM); 623 return; 624 } 625 626 args = &req->args; 627 args->fn.fs_op = cb_fn; 628 args->arg = cb_arg; 629 args->fs = fs; 630 631 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 632 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 633 spdk_bs_unload(fs->bs, unload_cb, req); 634 } 635 636 static struct spdk_file * 637 fs_find_file(struct spdk_filesystem *fs, const char *name) 638 { 639 struct spdk_file *file; 640 641 TAILQ_FOREACH(file, &fs->files, tailq) { 642 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 643 return file; 644 } 645 } 646 647 return NULL; 648 } 649 650 void 651 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 652 spdk_file_stat_op_complete cb_fn, void *cb_arg) 653 { 654 struct spdk_file_stat stat; 655 struct spdk_file *f = NULL; 656 657 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 658 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 659 return; 660 } 661 662 f = fs_find_file(fs, name); 663 if (f != NULL) { 664 stat.blobid = f->blobid; 665 stat.size = f->length; 666 cb_fn(cb_arg, &stat, 0); 667 return; 668 } 669 670 cb_fn(cb_arg, NULL, -ENOENT); 671 } 672 673 static void 674 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 675 { 676 struct spdk_fs_request *req = arg; 677 struct spdk_fs_cb_args *args = &req->args; 678 679 args->rc = fserrno; 680 if (fserrno == 0) { 681 memcpy(args->arg, stat, sizeof(*stat)); 682 } 683 sem_post(args->sem); 684 } 685 686 static void 687 __file_stat(void *arg) 688 { 689 struct spdk_fs_request *req = arg; 690 struct spdk_fs_cb_args *args = &req->args; 691 692 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 693 args->fn.stat_op, req); 694 } 695 696 int 697 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 698 const char *name, struct spdk_file_stat *stat) 699 { 700 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 701 struct spdk_fs_request *req; 702 int rc; 703 704 req = alloc_fs_request(channel); 705 assert(req != NULL); 706 707 req->args.fs = fs; 708 req->args.op.stat.name = name; 709 req->args.fn.stat_op = __copy_stat; 710 req->args.arg = stat; 711 req->args.sem = &channel->sem; 712 channel->send_request(__file_stat, req); 713 sem_wait(&channel->sem); 714 715 rc = req->args.rc; 716 free_fs_request(req); 717 718 return rc; 719 } 720 721 static void 722 fs_create_blob_close_cb(void *ctx, int bserrno) 723 { 724 struct spdk_fs_request *req = ctx; 725 struct spdk_fs_cb_args *args = &req->args; 726 727 args->fn.file_op(args->arg, bserrno); 728 free_fs_request(req); 729 } 730 731 static void 732 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 733 { 734 struct spdk_fs_request *req = ctx; 735 struct spdk_fs_cb_args *args = &req->args; 736 struct spdk_file *f = args->file; 737 uint64_t length = 0; 738 739 f->blob = blob; 740 spdk_bs_md_resize_blob(blob, 1); 741 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 742 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 743 744 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 745 } 746 747 static void 748 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 749 { 750 struct spdk_fs_request *req = ctx; 751 struct spdk_fs_cb_args *args = &req->args; 752 struct spdk_file *f = args->file; 753 754 f->blobid = blobid; 755 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 756 } 757 758 void 759 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 760 spdk_file_op_complete cb_fn, void *cb_arg) 761 { 762 struct spdk_file *file; 763 struct spdk_fs_request *req; 764 struct spdk_fs_cb_args *args; 765 766 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 767 cb_fn(cb_arg, -ENAMETOOLONG); 768 return; 769 } 770 771 file = fs_find_file(fs, name); 772 if (file != NULL) { 773 cb_fn(cb_arg, -EEXIST); 774 return; 775 } 776 777 file = file_alloc(fs); 778 if (file == NULL) { 779 cb_fn(cb_arg, -ENOMEM); 780 return; 781 } 782 783 req = alloc_fs_request(fs->md_target.md_fs_channel); 784 if (req == NULL) { 785 cb_fn(cb_arg, -ENOMEM); 786 return; 787 } 788 789 args = &req->args; 790 args->file = file; 791 args->fn.file_op = cb_fn; 792 args->arg = cb_arg; 793 794 file->name = strdup(name); 795 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 796 } 797 798 static void 799 __fs_create_file_done(void *arg, int fserrno) 800 { 801 struct spdk_fs_request *req = arg; 802 struct spdk_fs_cb_args *args = &req->args; 803 804 args->rc = fserrno; 805 sem_post(args->sem); 806 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 807 } 808 809 static void 810 __fs_create_file(void *arg) 811 { 812 struct spdk_fs_request *req = arg; 813 struct spdk_fs_cb_args *args = &req->args; 814 815 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 816 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 817 } 818 819 int 820 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 821 { 822 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 823 struct spdk_fs_request *req; 824 struct spdk_fs_cb_args *args; 825 int rc; 826 827 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 828 829 req = alloc_fs_request(channel); 830 assert(req != NULL); 831 832 args = &req->args; 833 args->fs = fs; 834 args->op.create.name = name; 835 args->sem = &channel->sem; 836 fs->send_request(__fs_create_file, req); 837 sem_wait(&channel->sem); 838 rc = args->rc; 839 free_fs_request(req); 840 841 return rc; 842 } 843 844 static void 845 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 846 { 847 struct spdk_fs_request *req = ctx; 848 struct spdk_fs_cb_args *args = &req->args; 849 struct spdk_file *f = args->file; 850 851 f->blob = blob; 852 while (!TAILQ_EMPTY(&f->open_requests)) { 853 req = TAILQ_FIRST(&f->open_requests); 854 args = &req->args; 855 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 856 args->fn.file_op_with_handle(args->arg, f, bserrno); 857 free_fs_request(req); 858 } 859 } 860 861 static void 862 fs_open_blob_create_cb(void *ctx, int bserrno) 863 { 864 struct spdk_fs_request *req = ctx; 865 struct spdk_fs_cb_args *args = &req->args; 866 struct spdk_file *file = args->file; 867 struct spdk_filesystem *fs = args->fs; 868 869 if (file == NULL) { 870 /* 871 * This is from an open with CREATE flag - the file 872 * is now created so look it up in the file list for this 873 * filesystem. 874 */ 875 file = fs_find_file(fs, args->op.open.name); 876 assert(file != NULL); 877 args->file = file; 878 } 879 880 file->ref_count++; 881 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 882 if (file->ref_count == 1) { 883 assert(file->blob == NULL); 884 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 885 } else if (file->blob != NULL) { 886 fs_open_blob_done(req, file->blob, 0); 887 } else { 888 /* 889 * The blob open for this file is in progress due to a previous 890 * open request. When that open completes, it will invoke the 891 * open callback for this request. 892 */ 893 } 894 } 895 896 void 897 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 898 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 899 { 900 struct spdk_file *f = NULL; 901 struct spdk_fs_request *req; 902 struct spdk_fs_cb_args *args; 903 904 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 905 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 906 return; 907 } 908 909 f = fs_find_file(fs, name); 910 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 911 cb_fn(cb_arg, NULL, -ENOENT); 912 return; 913 } 914 915 req = alloc_fs_request(fs->md_target.md_fs_channel); 916 if (req == NULL) { 917 cb_fn(cb_arg, NULL, -ENOMEM); 918 return; 919 } 920 921 args = &req->args; 922 args->fn.file_op_with_handle = cb_fn; 923 args->arg = cb_arg; 924 args->file = f; 925 args->fs = fs; 926 args->op.open.name = name; 927 928 if (f == NULL) { 929 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 930 } else { 931 fs_open_blob_create_cb(req, 0); 932 } 933 } 934 935 static void 936 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 937 { 938 struct spdk_fs_request *req = arg; 939 struct spdk_fs_cb_args *args = &req->args; 940 941 args->file = file; 942 args->rc = bserrno; 943 sem_post(args->sem); 944 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 945 } 946 947 static void 948 __fs_open_file(void *arg) 949 { 950 struct spdk_fs_request *req = arg; 951 struct spdk_fs_cb_args *args = &req->args; 952 953 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 954 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 955 __fs_open_file_done, req); 956 } 957 958 int 959 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 960 const char *name, uint32_t flags, struct spdk_file **file) 961 { 962 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 963 struct spdk_fs_request *req; 964 struct spdk_fs_cb_args *args; 965 int rc; 966 967 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 968 969 req = alloc_fs_request(channel); 970 assert(req != NULL); 971 972 args = &req->args; 973 args->fs = fs; 974 args->op.open.name = name; 975 args->op.open.flags = flags; 976 args->sem = &channel->sem; 977 fs->send_request(__fs_open_file, req); 978 sem_wait(&channel->sem); 979 rc = args->rc; 980 if (rc == 0) { 981 *file = args->file; 982 } else { 983 *file = NULL; 984 } 985 free_fs_request(req); 986 987 return rc; 988 } 989 990 static void 991 fs_rename_blob_close_cb(void *ctx, int bserrno) 992 { 993 struct spdk_fs_request *req = ctx; 994 struct spdk_fs_cb_args *args = &req->args; 995 996 args->fn.fs_op(args->arg, bserrno); 997 free_fs_request(req); 998 } 999 1000 static void 1001 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1002 { 1003 struct spdk_fs_request *req = ctx; 1004 struct spdk_fs_cb_args *args = &req->args; 1005 struct spdk_file *f = args->file; 1006 const char *new_name = args->op.rename.new_name; 1007 1008 f->blob = blob; 1009 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1010 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 1011 } 1012 1013 static void 1014 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1015 { 1016 struct spdk_fs_cb_args *args = &req->args; 1017 struct spdk_file *f; 1018 1019 f = fs_find_file(args->fs, args->op.rename.old_name); 1020 if (f == NULL) { 1021 args->fn.fs_op(args->arg, -ENOENT); 1022 free_fs_request(req); 1023 return; 1024 } 1025 1026 free(f->name); 1027 f->name = strdup(args->op.rename.new_name); 1028 args->file = f; 1029 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1030 } 1031 1032 static void 1033 fs_rename_delete_done(void *arg, int fserrno) 1034 { 1035 __spdk_fs_md_rename_file(arg); 1036 } 1037 1038 void 1039 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1040 const char *old_name, const char *new_name, 1041 spdk_file_op_complete cb_fn, void *cb_arg) 1042 { 1043 struct spdk_file *f; 1044 struct spdk_fs_request *req; 1045 struct spdk_fs_cb_args *args; 1046 1047 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1048 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1049 cb_fn(cb_arg, -ENAMETOOLONG); 1050 return; 1051 } 1052 1053 req = alloc_fs_request(fs->md_target.md_fs_channel); 1054 if (req == NULL) { 1055 cb_fn(cb_arg, -ENOMEM); 1056 return; 1057 } 1058 1059 args = &req->args; 1060 args->fn.fs_op = cb_fn; 1061 args->fs = fs; 1062 args->arg = cb_arg; 1063 args->op.rename.old_name = old_name; 1064 args->op.rename.new_name = new_name; 1065 1066 f = fs_find_file(fs, new_name); 1067 if (f == NULL) { 1068 __spdk_fs_md_rename_file(req); 1069 return; 1070 } 1071 1072 /* 1073 * The rename overwrites an existing file. So delete the existing file, then 1074 * do the actual rename. 1075 */ 1076 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1077 } 1078 1079 static void 1080 __fs_rename_file_done(void *arg, int fserrno) 1081 { 1082 struct spdk_fs_request *req = arg; 1083 struct spdk_fs_cb_args *args = &req->args; 1084 1085 args->rc = fserrno; 1086 sem_post(args->sem); 1087 } 1088 1089 static void 1090 __fs_rename_file(void *arg) 1091 { 1092 struct spdk_fs_request *req = arg; 1093 struct spdk_fs_cb_args *args = &req->args; 1094 1095 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1096 __fs_rename_file_done, req); 1097 } 1098 1099 int 1100 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1101 const char *old_name, const char *new_name) 1102 { 1103 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1104 struct spdk_fs_request *req; 1105 struct spdk_fs_cb_args *args; 1106 int rc; 1107 1108 req = alloc_fs_request(channel); 1109 assert(req != NULL); 1110 1111 args = &req->args; 1112 1113 args->fs = fs; 1114 args->op.rename.old_name = old_name; 1115 args->op.rename.new_name = new_name; 1116 args->sem = &channel->sem; 1117 fs->send_request(__fs_rename_file, req); 1118 sem_wait(&channel->sem); 1119 rc = args->rc; 1120 free_fs_request(req); 1121 return rc; 1122 } 1123 1124 static void 1125 blob_delete_cb(void *ctx, int bserrno) 1126 { 1127 struct spdk_fs_request *req = ctx; 1128 struct spdk_fs_cb_args *args = &req->args; 1129 1130 args->fn.file_op(args->arg, bserrno); 1131 free_fs_request(req); 1132 } 1133 1134 void 1135 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1136 spdk_file_op_complete cb_fn, void *cb_arg) 1137 { 1138 struct spdk_file *f; 1139 spdk_blob_id blobid; 1140 struct spdk_fs_request *req; 1141 struct spdk_fs_cb_args *args; 1142 1143 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1144 1145 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1146 cb_fn(cb_arg, -ENAMETOOLONG); 1147 return; 1148 } 1149 1150 f = fs_find_file(fs, name); 1151 if (f == NULL) { 1152 cb_fn(cb_arg, -ENOENT); 1153 return; 1154 } 1155 1156 if (f->ref_count > 0) { 1157 /* For now, do not allow deleting files with open references. */ 1158 cb_fn(cb_arg, -EBUSY); 1159 return; 1160 } 1161 1162 req = alloc_fs_request(fs->md_target.md_fs_channel); 1163 if (req == NULL) { 1164 cb_fn(cb_arg, -ENOMEM); 1165 return; 1166 } 1167 1168 TAILQ_REMOVE(&fs->files, f, tailq); 1169 1170 cache_free_buffers(f); 1171 1172 blobid = f->blobid; 1173 1174 free(f->name); 1175 free(f->tree); 1176 free(f); 1177 1178 args = &req->args; 1179 args->fn.file_op = cb_fn; 1180 args->arg = cb_arg; 1181 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1182 } 1183 1184 static void 1185 __fs_delete_file_done(void *arg, int fserrno) 1186 { 1187 struct spdk_fs_request *req = arg; 1188 struct spdk_fs_cb_args *args = &req->args; 1189 1190 args->rc = fserrno; 1191 sem_post(args->sem); 1192 } 1193 1194 static void 1195 __fs_delete_file(void *arg) 1196 { 1197 struct spdk_fs_request *req = arg; 1198 struct spdk_fs_cb_args *args = &req->args; 1199 1200 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1201 } 1202 1203 int 1204 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1205 const char *name) 1206 { 1207 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1208 struct spdk_fs_request *req; 1209 struct spdk_fs_cb_args *args; 1210 int rc; 1211 1212 req = alloc_fs_request(channel); 1213 assert(req != NULL); 1214 1215 args = &req->args; 1216 args->fs = fs; 1217 args->op.delete.name = name; 1218 args->sem = &channel->sem; 1219 fs->send_request(__fs_delete_file, req); 1220 sem_wait(&channel->sem); 1221 rc = args->rc; 1222 free_fs_request(req); 1223 1224 return rc; 1225 } 1226 1227 spdk_fs_iter 1228 spdk_fs_iter_first(struct spdk_filesystem *fs) 1229 { 1230 struct spdk_file *f; 1231 1232 f = TAILQ_FIRST(&fs->files); 1233 return f; 1234 } 1235 1236 spdk_fs_iter 1237 spdk_fs_iter_next(spdk_fs_iter iter) 1238 { 1239 struct spdk_file *f = iter; 1240 1241 if (f == NULL) { 1242 return NULL; 1243 } 1244 1245 f = TAILQ_NEXT(f, tailq); 1246 return f; 1247 } 1248 1249 const char * 1250 spdk_file_get_name(struct spdk_file *file) 1251 { 1252 return file->name; 1253 } 1254 1255 uint64_t 1256 spdk_file_get_length(struct spdk_file *file) 1257 { 1258 assert(file != NULL); 1259 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1260 return file->length; 1261 } 1262 1263 static void 1264 fs_truncate_complete_cb(void *ctx, int bserrno) 1265 { 1266 struct spdk_fs_request *req = ctx; 1267 struct spdk_fs_cb_args *args = &req->args; 1268 1269 args->fn.file_op(args->arg, bserrno); 1270 free_fs_request(req); 1271 } 1272 1273 static uint64_t 1274 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1275 { 1276 return (length + cluster_sz - 1) / cluster_sz; 1277 } 1278 1279 void 1280 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1281 spdk_file_op_complete cb_fn, void *cb_arg) 1282 { 1283 struct spdk_filesystem *fs; 1284 size_t num_clusters; 1285 struct spdk_fs_request *req; 1286 struct spdk_fs_cb_args *args; 1287 1288 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1289 if (length == file->length) { 1290 cb_fn(cb_arg, 0); 1291 return; 1292 } 1293 1294 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1295 if (req == NULL) { 1296 cb_fn(cb_arg, -ENOMEM); 1297 return; 1298 } 1299 1300 args = &req->args; 1301 args->fn.file_op = cb_fn; 1302 args->arg = cb_arg; 1303 args->file = file; 1304 fs = file->fs; 1305 1306 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1307 1308 spdk_bs_md_resize_blob(file->blob, num_clusters); 1309 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1310 1311 file->length = length; 1312 if (file->append_pos > file->length) { 1313 file->append_pos = file->length; 1314 } 1315 1316 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1317 } 1318 1319 static void 1320 __truncate(void *arg) 1321 { 1322 struct spdk_fs_request *req = arg; 1323 struct spdk_fs_cb_args *args = &req->args; 1324 1325 spdk_file_truncate_async(args->file, args->op.truncate.length, 1326 args->fn.file_op, args->arg); 1327 } 1328 1329 void 1330 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1331 uint64_t length) 1332 { 1333 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1334 struct spdk_fs_request *req; 1335 struct spdk_fs_cb_args *args; 1336 1337 req = alloc_fs_request(channel); 1338 assert(req != NULL); 1339 1340 args = &req->args; 1341 1342 args->file = file; 1343 args->op.truncate.length = length; 1344 args->fn.file_op = __sem_post; 1345 args->arg = &channel->sem; 1346 1347 channel->send_request(__truncate, req); 1348 sem_wait(&channel->sem); 1349 free_fs_request(req); 1350 } 1351 1352 static void 1353 __rw_done(void *ctx, int bserrno) 1354 { 1355 struct spdk_fs_request *req = ctx; 1356 struct spdk_fs_cb_args *args = &req->args; 1357 1358 spdk_dma_free(args->op.rw.pin_buf); 1359 args->fn.file_op(args->arg, bserrno); 1360 free_fs_request(req); 1361 } 1362 1363 static void 1364 __read_done(void *ctx, int bserrno) 1365 { 1366 struct spdk_fs_request *req = ctx; 1367 struct spdk_fs_cb_args *args = &req->args; 1368 1369 if (args->op.rw.is_read) { 1370 memcpy(args->op.rw.user_buf, 1371 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1372 args->op.rw.length); 1373 __rw_done(req, 0); 1374 } else { 1375 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1376 args->op.rw.user_buf, 1377 args->op.rw.length); 1378 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1379 args->op.rw.pin_buf, 1380 args->op.rw.start_page, args->op.rw.num_pages, 1381 __rw_done, req); 1382 } 1383 } 1384 1385 static void 1386 __do_blob_read(void *ctx, int fserrno) 1387 { 1388 struct spdk_fs_request *req = ctx; 1389 struct spdk_fs_cb_args *args = &req->args; 1390 1391 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1392 args->op.rw.pin_buf, 1393 args->op.rw.start_page, args->op.rw.num_pages, 1394 __read_done, req); 1395 } 1396 1397 static void 1398 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1399 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1400 { 1401 uint64_t end_page; 1402 1403 *page_size = spdk_bs_get_page_size(file->fs->bs); 1404 *start_page = offset / *page_size; 1405 end_page = (offset + length - 1) / *page_size; 1406 *num_pages = (end_page - *start_page + 1); 1407 } 1408 1409 static void 1410 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1411 void *payload, uint64_t offset, uint64_t length, 1412 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1413 { 1414 struct spdk_fs_request *req; 1415 struct spdk_fs_cb_args *args; 1416 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1417 uint64_t start_page, num_pages, pin_buf_length; 1418 uint32_t page_size; 1419 1420 if (is_read && offset + length > file->length) { 1421 cb_fn(cb_arg, -EINVAL); 1422 return; 1423 } 1424 1425 req = alloc_fs_request(channel); 1426 if (req == NULL) { 1427 cb_fn(cb_arg, -ENOMEM); 1428 return; 1429 } 1430 1431 args = &req->args; 1432 args->fn.file_op = cb_fn; 1433 args->arg = cb_arg; 1434 args->file = file; 1435 args->op.rw.channel = channel->bs_channel; 1436 args->op.rw.user_buf = payload; 1437 args->op.rw.is_read = is_read; 1438 args->op.rw.offset = offset; 1439 args->op.rw.length = length; 1440 1441 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1442 pin_buf_length = num_pages * page_size; 1443 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1444 1445 args->op.rw.start_page = start_page; 1446 args->op.rw.num_pages = num_pages; 1447 1448 if (!is_read && file->length < offset + length) { 1449 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1450 } else { 1451 __do_blob_read(req, 0); 1452 } 1453 } 1454 1455 void 1456 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1457 void *payload, uint64_t offset, uint64_t length, 1458 spdk_file_op_complete cb_fn, void *cb_arg) 1459 { 1460 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1461 } 1462 1463 void 1464 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1465 void *payload, uint64_t offset, uint64_t length, 1466 spdk_file_op_complete cb_fn, void *cb_arg) 1467 { 1468 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1469 file->name, offset, length); 1470 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1471 } 1472 1473 struct spdk_io_channel * 1474 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1475 { 1476 struct spdk_io_channel *io_channel; 1477 struct spdk_fs_channel *fs_channel; 1478 1479 io_channel = spdk_get_io_channel(&fs->io_target); 1480 fs_channel = spdk_io_channel_get_ctx(io_channel); 1481 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1482 fs_channel->send_request = __send_request_direct; 1483 1484 return io_channel; 1485 } 1486 1487 struct spdk_io_channel * 1488 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1489 { 1490 struct spdk_io_channel *io_channel; 1491 struct spdk_fs_channel *fs_channel; 1492 1493 io_channel = spdk_get_io_channel(&fs->io_target); 1494 fs_channel = spdk_io_channel_get_ctx(io_channel); 1495 fs_channel->send_request = fs->send_request; 1496 fs_channel->sync = 1; 1497 pthread_spin_init(&fs_channel->lock, 0); 1498 1499 return io_channel; 1500 } 1501 1502 void 1503 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1504 { 1505 spdk_put_io_channel(channel); 1506 } 1507 1508 void 1509 spdk_fs_set_cache_size(uint64_t size_in_mb) 1510 { 1511 g_fs_cache_size = size_in_mb * 1024 * 1024; 1512 } 1513 1514 uint64_t 1515 spdk_fs_get_cache_size(void) 1516 { 1517 return g_fs_cache_size / (1024 * 1024); 1518 } 1519 1520 static void __file_flush(void *_args); 1521 1522 static void * 1523 alloc_cache_memory_buffer(struct spdk_file *context) 1524 { 1525 struct spdk_file *file; 1526 void *buf; 1527 1528 buf = spdk_mempool_get(g_cache_pool); 1529 if (buf != NULL) { 1530 return buf; 1531 } 1532 1533 pthread_spin_lock(&g_caches_lock); 1534 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1535 if (!file->open_for_writing && 1536 file->priority == SPDK_FILE_PRIORITY_LOW && 1537 file != context) { 1538 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1539 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1540 break; 1541 } 1542 } 1543 pthread_spin_unlock(&g_caches_lock); 1544 if (file != NULL) { 1545 cache_free_buffers(file); 1546 buf = spdk_mempool_get(g_cache_pool); 1547 if (buf != NULL) { 1548 return buf; 1549 } 1550 } 1551 1552 pthread_spin_lock(&g_caches_lock); 1553 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1554 if (!file->open_for_writing && file != context) { 1555 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1556 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1557 break; 1558 } 1559 } 1560 pthread_spin_unlock(&g_caches_lock); 1561 if (file != NULL) { 1562 cache_free_buffers(file); 1563 buf = spdk_mempool_get(g_cache_pool); 1564 if (buf != NULL) { 1565 return buf; 1566 } 1567 } 1568 1569 pthread_spin_lock(&g_caches_lock); 1570 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1571 if (file != context) { 1572 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1573 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1574 break; 1575 } 1576 } 1577 pthread_spin_unlock(&g_caches_lock); 1578 if (file != NULL) { 1579 cache_free_buffers(file); 1580 buf = spdk_mempool_get(g_cache_pool); 1581 if (buf != NULL) { 1582 return buf; 1583 } 1584 } 1585 1586 assert(false); 1587 return NULL; 1588 } 1589 1590 static struct cache_buffer * 1591 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1592 { 1593 struct cache_buffer *buf; 1594 int count = 0; 1595 1596 buf = calloc(1, sizeof(*buf)); 1597 if (buf == NULL) { 1598 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1599 return NULL; 1600 } 1601 1602 buf->buf = alloc_cache_memory_buffer(file); 1603 if (buf->buf == NULL) { 1604 while (buf->buf == NULL) { 1605 /* 1606 * TODO: alloc_cache_memory_buffer() should eventually free 1607 * some buffers. Need a more sophisticated check here, instead 1608 * of just bailing if 100 tries does not result in getting a 1609 * free buffer. This will involve using the sync channel's 1610 * semaphore to block until a buffer becomes available. 1611 */ 1612 if (count++ == 100) { 1613 SPDK_ERRLOG("could not allocate cache buffer\n"); 1614 assert(false); 1615 free(buf); 1616 return NULL; 1617 } 1618 buf->buf = alloc_cache_memory_buffer(file); 1619 } 1620 } 1621 1622 buf->buf_size = CACHE_BUFFER_SIZE; 1623 buf->offset = offset; 1624 1625 pthread_spin_lock(&g_caches_lock); 1626 if (file->tree->present_mask == 0) { 1627 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1628 } 1629 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1630 pthread_spin_unlock(&g_caches_lock); 1631 1632 return buf; 1633 } 1634 1635 static struct cache_buffer * 1636 cache_append_buffer(struct spdk_file *file) 1637 { 1638 struct cache_buffer *last; 1639 1640 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1641 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1642 1643 last = cache_insert_buffer(file, file->append_pos); 1644 if (last == NULL) { 1645 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1646 return NULL; 1647 } 1648 1649 if (file->last != NULL) { 1650 file->last->next = last; 1651 } 1652 file->last = last; 1653 1654 return last; 1655 } 1656 1657 static void 1658 __wake_caller(struct spdk_fs_cb_args *args) 1659 { 1660 sem_post(args->sem); 1661 } 1662 1663 static void __check_sync_reqs(struct spdk_file *file); 1664 1665 static void 1666 __file_cache_finish_sync(struct spdk_file *file) 1667 { 1668 struct spdk_fs_request *sync_req; 1669 struct spdk_fs_cb_args *sync_args; 1670 1671 pthread_spin_lock(&file->lock); 1672 sync_req = TAILQ_FIRST(&file->sync_requests); 1673 sync_args = &sync_req->args; 1674 assert(sync_args->op.sync.offset <= file->length_flushed); 1675 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1676 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1677 pthread_spin_unlock(&file->lock); 1678 1679 sync_args->fn.file_op(sync_args->arg, 0); 1680 __check_sync_reqs(file); 1681 1682 pthread_spin_lock(&file->lock); 1683 free_fs_request(sync_req); 1684 pthread_spin_unlock(&file->lock); 1685 } 1686 1687 static void 1688 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1689 { 1690 struct spdk_file *file = ctx; 1691 1692 __file_cache_finish_sync(file); 1693 } 1694 1695 static void 1696 __free_args(struct spdk_fs_cb_args *args) 1697 { 1698 struct spdk_fs_request *req; 1699 1700 if (!args->from_request) { 1701 free(args); 1702 } else { 1703 /* Depends on args being at the start of the spdk_fs_request structure. */ 1704 req = (struct spdk_fs_request *)args; 1705 free_fs_request(req); 1706 } 1707 } 1708 1709 static void 1710 __check_sync_reqs(struct spdk_file *file) 1711 { 1712 struct spdk_fs_request *sync_req; 1713 1714 pthread_spin_lock(&file->lock); 1715 1716 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1717 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1718 break; 1719 } 1720 } 1721 1722 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1723 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1724 sync_req->args.op.sync.xattr_in_progress = true; 1725 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1726 sizeof(file->length_flushed)); 1727 1728 pthread_spin_unlock(&file->lock); 1729 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1730 } else { 1731 pthread_spin_unlock(&file->lock); 1732 } 1733 } 1734 1735 static void 1736 __file_flush_done(void *arg, int bserrno) 1737 { 1738 struct spdk_fs_cb_args *args = arg; 1739 struct spdk_file *file = args->file; 1740 struct cache_buffer *next = args->op.flush.cache_buffer; 1741 1742 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1743 1744 pthread_spin_lock(&file->lock); 1745 next->in_progress = false; 1746 next->bytes_flushed += args->op.flush.length; 1747 file->length_flushed += args->op.flush.length; 1748 if (file->length_flushed > file->length) { 1749 file->length = file->length_flushed; 1750 } 1751 if (next->bytes_flushed == next->buf_size) { 1752 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1753 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1754 } 1755 1756 /* 1757 * Assert that there is no cached data that extends past the end of the underlying 1758 * blob. 1759 */ 1760 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1761 next->bytes_filled == 0); 1762 1763 pthread_spin_unlock(&file->lock); 1764 1765 __check_sync_reqs(file); 1766 1767 __file_flush(args); 1768 } 1769 1770 static void 1771 __file_flush(void *_args) 1772 { 1773 struct spdk_fs_cb_args *args = _args; 1774 struct spdk_file *file = args->file; 1775 struct cache_buffer *next; 1776 uint64_t offset, length, start_page, num_pages; 1777 uint32_t page_size; 1778 1779 pthread_spin_lock(&file->lock); 1780 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1781 if (next == NULL || next->in_progress) { 1782 /* 1783 * There is either no data to flush, or a flush I/O is already in 1784 * progress. So return immediately - if a flush I/O is in 1785 * progress we will flush more data after that is completed. 1786 */ 1787 __free_args(args); 1788 pthread_spin_unlock(&file->lock); 1789 return; 1790 } 1791 1792 offset = next->offset + next->bytes_flushed; 1793 length = next->bytes_filled - next->bytes_flushed; 1794 if (length == 0) { 1795 __free_args(args); 1796 pthread_spin_unlock(&file->lock); 1797 return; 1798 } 1799 args->op.flush.length = length; 1800 args->op.flush.cache_buffer = next; 1801 1802 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1803 1804 next->in_progress = true; 1805 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1806 offset, length, start_page, num_pages); 1807 pthread_spin_unlock(&file->lock); 1808 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1809 next->buf + (start_page * page_size) - next->offset, 1810 start_page, num_pages, 1811 __file_flush_done, args); 1812 } 1813 1814 static void 1815 __file_extend_done(void *arg, int bserrno) 1816 { 1817 struct spdk_fs_cb_args *args = arg; 1818 1819 __wake_caller(args); 1820 } 1821 1822 static void 1823 __file_extend_blob(void *_args) 1824 { 1825 struct spdk_fs_cb_args *args = _args; 1826 struct spdk_file *file = args->file; 1827 1828 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1829 1830 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1831 } 1832 1833 static void 1834 __rw_from_file_done(void *arg, int bserrno) 1835 { 1836 struct spdk_fs_cb_args *args = arg; 1837 1838 __wake_caller(args); 1839 __free_args(args); 1840 } 1841 1842 static void 1843 __rw_from_file(void *_args) 1844 { 1845 struct spdk_fs_cb_args *args = _args; 1846 struct spdk_file *file = args->file; 1847 1848 if (args->op.rw.is_read) { 1849 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1850 args->op.rw.offset, args->op.rw.length, 1851 __rw_from_file_done, args); 1852 } else { 1853 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1854 args->op.rw.offset, args->op.rw.length, 1855 __rw_from_file_done, args); 1856 } 1857 } 1858 1859 static int 1860 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1861 uint64_t offset, uint64_t length, bool is_read) 1862 { 1863 struct spdk_fs_cb_args *args; 1864 1865 args = calloc(1, sizeof(*args)); 1866 if (args == NULL) { 1867 sem_post(sem); 1868 return -ENOMEM; 1869 } 1870 1871 args->file = file; 1872 args->sem = sem; 1873 args->op.rw.user_buf = payload; 1874 args->op.rw.offset = offset; 1875 args->op.rw.length = length; 1876 args->op.rw.is_read = is_read; 1877 file->fs->send_request(__rw_from_file, args); 1878 return 0; 1879 } 1880 1881 int 1882 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1883 void *payload, uint64_t offset, uint64_t length) 1884 { 1885 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1886 struct spdk_fs_cb_args *args; 1887 uint64_t rem_length, copy, blob_size, cluster_sz; 1888 uint32_t cache_buffers_filled = 0; 1889 uint8_t *cur_payload; 1890 struct cache_buffer *last; 1891 1892 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1893 1894 if (length == 0) { 1895 return 0; 1896 } 1897 1898 if (offset != file->append_pos) { 1899 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1900 return -EINVAL; 1901 } 1902 1903 pthread_spin_lock(&file->lock); 1904 file->open_for_writing = true; 1905 1906 if (file->last == NULL) { 1907 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1908 cache_append_buffer(file); 1909 } else { 1910 int rc; 1911 1912 file->append_pos += length; 1913 pthread_spin_unlock(&file->lock); 1914 rc = __send_rw_from_file(file, &channel->sem, payload, 1915 offset, length, false); 1916 sem_wait(&channel->sem); 1917 return rc; 1918 } 1919 } 1920 1921 blob_size = __file_get_blob_size(file); 1922 1923 if ((offset + length) > blob_size) { 1924 struct spdk_fs_cb_args extend_args = {}; 1925 1926 cluster_sz = file->fs->bs_opts.cluster_sz; 1927 extend_args.sem = &channel->sem; 1928 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1929 extend_args.file = file; 1930 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1931 pthread_spin_unlock(&file->lock); 1932 file->fs->send_request(__file_extend_blob, &extend_args); 1933 sem_wait(&channel->sem); 1934 } 1935 1936 last = file->last; 1937 rem_length = length; 1938 cur_payload = payload; 1939 while (rem_length > 0) { 1940 copy = last->buf_size - last->bytes_filled; 1941 if (copy > rem_length) { 1942 copy = rem_length; 1943 } 1944 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 1945 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 1946 file->append_pos += copy; 1947 if (file->length < file->append_pos) { 1948 file->length = file->append_pos; 1949 } 1950 cur_payload += copy; 1951 last->bytes_filled += copy; 1952 rem_length -= copy; 1953 if (last->bytes_filled == last->buf_size) { 1954 cache_buffers_filled++; 1955 last = cache_append_buffer(file); 1956 if (last == NULL) { 1957 BLOBFS_TRACE(file, "nomem\n"); 1958 pthread_spin_unlock(&file->lock); 1959 return -ENOMEM; 1960 } 1961 } 1962 } 1963 1964 if (cache_buffers_filled == 0) { 1965 pthread_spin_unlock(&file->lock); 1966 return 0; 1967 } 1968 1969 args = calloc(1, sizeof(*args)); 1970 if (args == NULL) { 1971 pthread_spin_unlock(&file->lock); 1972 return -ENOMEM; 1973 } 1974 1975 args->file = file; 1976 file->fs->send_request(__file_flush, args); 1977 pthread_spin_unlock(&file->lock); 1978 return 0; 1979 } 1980 1981 static void 1982 __readahead_done(void *arg, int bserrno) 1983 { 1984 struct spdk_fs_cb_args *args = arg; 1985 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 1986 struct spdk_file *file = args->file; 1987 1988 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 1989 1990 pthread_spin_lock(&file->lock); 1991 cache_buffer->bytes_filled = args->op.readahead.length; 1992 cache_buffer->bytes_flushed = args->op.readahead.length; 1993 cache_buffer->in_progress = false; 1994 pthread_spin_unlock(&file->lock); 1995 1996 __free_args(args); 1997 } 1998 1999 static void 2000 __readahead(void *_args) 2001 { 2002 struct spdk_fs_cb_args *args = _args; 2003 struct spdk_file *file = args->file; 2004 uint64_t offset, length, start_page, num_pages; 2005 uint32_t page_size; 2006 2007 offset = args->op.readahead.offset; 2008 length = args->op.readahead.length; 2009 assert(length > 0); 2010 2011 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 2012 2013 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2014 offset, length, start_page, num_pages); 2015 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2016 args->op.readahead.cache_buffer->buf, 2017 start_page, num_pages, 2018 __readahead_done, args); 2019 } 2020 2021 static uint64_t 2022 __next_cache_buffer_offset(uint64_t offset) 2023 { 2024 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2025 } 2026 2027 static void 2028 check_readahead(struct spdk_file *file, uint64_t offset) 2029 { 2030 struct spdk_fs_cb_args *args; 2031 2032 offset = __next_cache_buffer_offset(offset); 2033 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2034 return; 2035 } 2036 2037 args = calloc(1, sizeof(*args)); 2038 if (args == NULL) { 2039 return; 2040 } 2041 2042 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2043 2044 args->file = file; 2045 args->op.readahead.offset = offset; 2046 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2047 args->op.readahead.cache_buffer->in_progress = true; 2048 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2049 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2050 } else { 2051 args->op.readahead.length = CACHE_BUFFER_SIZE; 2052 } 2053 file->fs->send_request(__readahead, args); 2054 } 2055 2056 static int 2057 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2058 { 2059 struct cache_buffer *buf; 2060 int rc; 2061 2062 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2063 if (buf == NULL) { 2064 pthread_spin_unlock(&file->lock); 2065 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2066 pthread_spin_lock(&file->lock); 2067 return rc; 2068 } 2069 2070 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2071 length = buf->offset + buf->bytes_filled - offset; 2072 } 2073 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2074 memcpy(payload, &buf->buf[offset - buf->offset], length); 2075 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2076 pthread_spin_lock(&g_caches_lock); 2077 spdk_tree_remove_buffer(file->tree, buf); 2078 if (file->tree->present_mask == 0) { 2079 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2080 } 2081 pthread_spin_unlock(&g_caches_lock); 2082 } 2083 2084 sem_post(sem); 2085 return 0; 2086 } 2087 2088 int64_t 2089 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2090 void *payload, uint64_t offset, uint64_t length) 2091 { 2092 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2093 uint64_t final_offset, final_length; 2094 uint32_t sub_reads = 0; 2095 int rc = 0; 2096 2097 pthread_spin_lock(&file->lock); 2098 2099 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2100 2101 file->open_for_writing = false; 2102 2103 if (length == 0 || offset >= file->append_pos) { 2104 pthread_spin_unlock(&file->lock); 2105 return 0; 2106 } 2107 2108 if (offset + length > file->append_pos) { 2109 length = file->append_pos - offset; 2110 } 2111 2112 if (offset != file->next_seq_offset) { 2113 file->seq_byte_count = 0; 2114 } 2115 file->seq_byte_count += length; 2116 file->next_seq_offset = offset + length; 2117 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2118 check_readahead(file, offset); 2119 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2120 } 2121 2122 final_length = 0; 2123 final_offset = offset + length; 2124 while (offset < final_offset) { 2125 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2126 if (length > (final_offset - offset)) { 2127 length = final_offset - offset; 2128 } 2129 rc = __file_read(file, payload, offset, length, &channel->sem); 2130 if (rc == 0) { 2131 final_length += length; 2132 } else { 2133 break; 2134 } 2135 payload += length; 2136 offset += length; 2137 sub_reads++; 2138 } 2139 pthread_spin_unlock(&file->lock); 2140 while (sub_reads-- > 0) { 2141 sem_wait(&channel->sem); 2142 } 2143 if (rc == 0) { 2144 return final_length; 2145 } else { 2146 return rc; 2147 } 2148 } 2149 2150 static void 2151 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2152 spdk_file_op_complete cb_fn, void *cb_arg) 2153 { 2154 struct spdk_fs_request *sync_req; 2155 struct spdk_fs_request *flush_req; 2156 struct spdk_fs_cb_args *sync_args; 2157 struct spdk_fs_cb_args *flush_args; 2158 2159 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2160 2161 pthread_spin_lock(&file->lock); 2162 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2163 BLOBFS_TRACE(file, "done - no data to flush\n"); 2164 pthread_spin_unlock(&file->lock); 2165 cb_fn(cb_arg, 0); 2166 return; 2167 } 2168 2169 sync_req = alloc_fs_request(channel); 2170 assert(sync_req != NULL); 2171 sync_args = &sync_req->args; 2172 2173 flush_req = alloc_fs_request(channel); 2174 assert(flush_req != NULL); 2175 flush_args = &flush_req->args; 2176 2177 sync_args->file = file; 2178 sync_args->fn.file_op = cb_fn; 2179 sync_args->arg = cb_arg; 2180 sync_args->op.sync.offset = file->append_pos; 2181 sync_args->op.sync.xattr_in_progress = false; 2182 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2183 pthread_spin_unlock(&file->lock); 2184 2185 flush_args->file = file; 2186 channel->send_request(__file_flush, flush_args); 2187 } 2188 2189 int 2190 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2191 { 2192 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2193 2194 _file_sync(file, channel, __sem_post, &channel->sem); 2195 sem_wait(&channel->sem); 2196 2197 return 0; 2198 } 2199 2200 void 2201 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2202 spdk_file_op_complete cb_fn, void *cb_arg) 2203 { 2204 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2205 2206 _file_sync(file, channel, cb_fn, cb_arg); 2207 } 2208 2209 void 2210 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2211 { 2212 BLOBFS_TRACE(file, "priority=%u\n", priority); 2213 file->priority = priority; 2214 2215 } 2216 2217 /* 2218 * Close routines 2219 */ 2220 2221 static void 2222 __file_close_async_done(void *ctx, int bserrno) 2223 { 2224 struct spdk_fs_request *req = ctx; 2225 struct spdk_fs_cb_args *args = &req->args; 2226 2227 args->fn.file_op(args->arg, bserrno); 2228 free_fs_request(req); 2229 } 2230 2231 static void 2232 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2233 { 2234 pthread_spin_lock(&file->lock); 2235 if (file->ref_count == 0) { 2236 pthread_spin_unlock(&file->lock); 2237 __file_close_async_done(req, -EBADF); 2238 return; 2239 } 2240 2241 file->ref_count--; 2242 if (file->ref_count > 0) { 2243 pthread_spin_unlock(&file->lock); 2244 __file_close_async_done(req, 0); 2245 return; 2246 } 2247 2248 pthread_spin_unlock(&file->lock); 2249 2250 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2251 } 2252 2253 static void 2254 __file_close_async__sync_done(void *arg, int fserrno) 2255 { 2256 struct spdk_fs_request *req = arg; 2257 struct spdk_fs_cb_args *args = &req->args; 2258 2259 __file_close_async(args->file, req); 2260 } 2261 2262 void 2263 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2264 { 2265 struct spdk_fs_request *req; 2266 struct spdk_fs_cb_args *args; 2267 2268 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2269 if (req == NULL) { 2270 cb_fn(cb_arg, -ENOMEM); 2271 return; 2272 } 2273 2274 args = &req->args; 2275 args->file = file; 2276 args->fn.file_op = cb_fn; 2277 args->arg = cb_arg; 2278 2279 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2280 } 2281 2282 static void 2283 __file_close_done(void *arg, int fserrno) 2284 { 2285 struct spdk_fs_cb_args *args = arg; 2286 2287 args->rc = fserrno; 2288 sem_post(args->sem); 2289 } 2290 2291 static void 2292 __file_close(void *arg) 2293 { 2294 struct spdk_fs_request *req = arg; 2295 struct spdk_fs_cb_args *args = &req->args; 2296 struct spdk_file *file = args->file; 2297 2298 __file_close_async(file, req); 2299 } 2300 2301 int 2302 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2303 { 2304 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2305 struct spdk_fs_request *req; 2306 struct spdk_fs_cb_args *args; 2307 2308 req = alloc_fs_request(channel); 2309 assert(req != NULL); 2310 2311 args = &req->args; 2312 2313 spdk_file_sync(file, _channel); 2314 BLOBFS_TRACE(file, "name=%s\n", file->name); 2315 args->file = file; 2316 args->sem = &channel->sem; 2317 args->fn.file_op = __file_close_done; 2318 args->arg = req; 2319 channel->send_request(__file_close, req); 2320 sem_wait(&channel->sem); 2321 2322 return args->rc; 2323 } 2324 2325 static void 2326 cache_free_buffers(struct spdk_file *file) 2327 { 2328 BLOBFS_TRACE(file, "free=%s\n", file->name); 2329 pthread_spin_lock(&file->lock); 2330 pthread_spin_lock(&g_caches_lock); 2331 if (file->tree->present_mask == 0) { 2332 pthread_spin_unlock(&g_caches_lock); 2333 pthread_spin_unlock(&file->lock); 2334 return; 2335 } 2336 spdk_tree_free_buffers(file->tree); 2337 if (file->tree->present_mask == 0) { 2338 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2339 } 2340 file->last = NULL; 2341 pthread_spin_unlock(&g_caches_lock); 2342 pthread_spin_unlock(&file->lock); 2343 } 2344 2345 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2346 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2347