/*-
 * BSD LICENSE
 *
 * Copyright (c) Intel Corporation.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   * Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in
 *     the documentation and/or other materials provided with the
 *     distribution.
 *   * Neither the name of Intel Corporation nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "blobfs_internal.h"

#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#define BLOBFS_TRACE(file, str, args...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args)

#define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)

static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches;
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_spinlock_t g_caches_lock;

static void
__sem_post(void *arg, int bserrno)
{
	sem_t *sem = arg;

	sem_post(sem);
}

void
spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

#define CACHE_READAHEAD_THRESHOLD (128 * 1024)

struct spdk_file {
	struct spdk_filesystem *fs;
	struct spdk_blob *blob;
	char *name;
	uint64_t length;
	bool open_for_writing;
	uint64_t length_flushed;
	uint64_t append_pos;
	uint64_t seq_byte_count;
	uint64_t next_seq_offset;
	uint32_t priority;
	TAILQ_ENTRY(spdk_file) tailq;
	spdk_blob_id blobid;
	uint32_t ref_count;
	pthread_spinlock_t lock;
	struct cache_buffer *last;
	struct cache_tree *tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file) cache_tailq;
};

struct spdk_filesystem {
	struct spdk_blob_store *bs;
	TAILQ_HEAD(, spdk_file) files;
	struct spdk_bs_opts bs_opts;
	struct spdk_bs_dev *bdev;
	fs_send_request_fn send_request;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *sync_io_channel;
		struct spdk_fs_channel *sync_fs_channel;
	} sync_target;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *md_io_channel;
		struct spdk_fs_channel *md_fs_channel;
	} md_target;

	struct {
		uint32_t max_ops;
	} io_target;
};

struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete fs_op_with_handle;
		spdk_fs_op_complete fs_op;
		spdk_file_op_with_handle_complete file_op_with_handle;
		spdk_file_op_complete file_op;
		spdk_file_stat_op_complete stat_op;
	} fn;
	void *arg;
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	bool from_request;
	union {
		struct {
			uint64_t length;
		} truncate;
		struct {
			struct spdk_io_channel *channel;
			void *user_buf;
			void *pin_buf;
			int is_read;
			off_t offset;
			size_t length;
			uint64_t start_page;
			uint64_t num_pages;
			uint32_t blocklen;
		} rw;
		struct {
			const char *old_name;
			const char *new_name;
		} rename;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
		} flush;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
			uint64_t offset;
		} readahead;
		struct {
			uint64_t offset;
			TAILQ_ENTRY(spdk_fs_request) tailq;
			bool xattr_in_progress;
		} sync;
		struct {
			uint32_t num_clusters;
		} resize;
		struct {
			const char *name;
			uint32_t flags;
			TAILQ_ENTRY(spdk_fs_request) tailq;
		} open;
		struct {
			const char *name;
		} create;
		struct {
			const char *name;
		} delete;
		struct {
			const char *name;
		} stat;
	} op;
};

static void cache_free_buffers(struct spdk_file *file);

static void
__initialize_cache(void)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY);
	TAILQ_INIT(&g_caches);
	pthread_spin_init(&g_caches_lock, 0);
}

static void
__free_cache(void)
{
	assert(g_cache_pool != NULL);

	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;
}

static uint64_t
__file_get_blob_size(struct spdk_file *file)
{
	uint64_t cluster_sz;

	cluster_sz = file->fs->bs_opts.cluster_sz;
	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
}

struct spdk_fs_request {
	struct spdk_fs_cb_args args;
	TAILQ_ENTRY(spdk_fs_request) link;
	struct spdk_fs_channel *channel;
};

struct spdk_fs_channel {
	struct spdk_fs_request *req_mem;
	TAILQ_HEAD(, spdk_fs_request) reqs;
	sem_t sem;
	struct spdk_filesystem *fs;
	struct spdk_io_channel *bs_channel;
	fs_send_request_fn send_request;
	bool sync;
	pthread_spinlock_t lock;
};

static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	req = TAILQ_FIRST(&channel->reqs);
	if (req) {
		TAILQ_REMOVE(&channel->reqs, req, link);
	}

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}

	if (req == NULL) {
		return NULL;
	}
	memset(req, 0, sizeof(*req));
	req->channel = channel;
	req->args.from_request = true;

	return req;
}

static void
free_fs_request(struct spdk_fs_request *req)
{
	struct spdk_fs_channel *channel = req->channel;

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}
}

static int
_spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
			uint32_t max_ops)
{
	uint32_t i;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);
	sem_init(&channel->sem, 0, 0);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->fs = fs;

	return 0;
}

static int
_spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);

	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
}

static int
_spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);

	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
}

static int
_spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);

	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
}

static void
_spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_fs_channel *channel = ctx_buf;

	free(channel->req_mem);
	if (channel->bs_channel != NULL) {
		spdk_bs_free_io_channel(channel->bs_channel);
	}
}

static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}

static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		__initialize_cache();
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}

static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}

static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
				sizeof(struct spdk_fs_channel));
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
				sizeof(struct spdk_fs_channel));
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
				sizeof(struct spdk_fs_channel));

	return fs;
}

void
spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_init(dev, NULL, init_cb, req);
}

static struct spdk_file *
file_alloc(struct spdk_filesystem *fs)
{
	struct spdk_file *file;

	file = calloc(1, sizeof(*file));
	if (file == NULL) {
		return NULL;
	}

	file->fs = fs;
	TAILQ_INIT(&file->open_requests);
	TAILQ_INIT(&file->sync_requests);
	pthread_spin_init(&file->lock, 0);
	file->tree = calloc(1, sizeof(*file->tree));
	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
	file->priority = SPDK_FILE_PRIORITY_LOW;
	return file;
}

static void
iter_cb(void *ctx, struct spdk_blob *blob, int rc)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *f;
	uint64_t *length;
	const char *name;
	size_t value_len;

	if (rc == -ENOENT) {
		/* Finished iterating */
		args->fn.fs_op_with_handle(args->arg, fs, 0);
		free_fs_request(req);
		return;
	} else if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}
	assert(value_len == 8);

	f = file_alloc(fs);
	if (f == NULL) {
		args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
		free_fs_request(req);
		return;
	}

	f->name = strdup(name);
	f->blobid = spdk_blob_get_id(blob);
	f->length = *length;
	f->length_flushed = *length;
	f->append_pos = *length;
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length);

	spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req);
}

static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		free(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	spdk_bs_md_iter_first(fs->bs, iter_cb, req);
}

void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_load(dev, load_cb, req);
}

static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		__free_cache();
	}
	pthread_mutex_unlock(&g_cache_init_lock);

	args->fn.fs_op(args->arg, bserrno);
	free(req);

	spdk_io_device_unregister(&fs->io_target);
	spdk_io_device_unregister(&fs->sync_target);
	spdk_io_device_unregister(&fs->md_target);

	free(fs);
}

void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 * allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
	spdk_bs_unload(fs->bs, unload_cb, req);
}

static struct spdk_file *
fs_find_file(struct spdk_filesystem *fs, const char *name)
{
	struct spdk_file *file;

	TAILQ_FOREACH(file, &fs->files, tailq) {
		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
			return file;
		}
	}

	return NULL;
}

void
spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
			spdk_file_stat_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file_stat stat;
	struct spdk_file *f = NULL;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f != NULL) {
		stat.blobid = f->blobid;
		stat.size = f->length;
		cb_fn(cb_arg, &stat, 0);
		return;
	}

	cb_fn(cb_arg, NULL, -ENOENT);
}

static void
__copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	if (fserrno == 0) {
		memcpy(args->arg, stat, sizeof(*stat));
	}
	sem_post(args->sem);
}

static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}

int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}

static void
fs_create_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;
	uint64_t length = 0;

	f->blob = blob;
	spdk_bs_md_resize_blob(blob, 1);
	spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));

	spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args);
}

static void
fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blobid = blobid;
	spdk_bs_md_open_blob(f->fs->bs, blobid,
			     fs_create_blob_open_cb, req);
}

void
spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *file;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	file = fs_find_file(fs, name);
	if (file != NULL) {
		cb_fn(cb_arg, -EEXIST);
		return;
	}

	file = file_alloc(fs);
	if (file == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	file->name = strdup(name);
	spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args);
}

static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	sem_post(args->sem);
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
}

static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}

int
spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;
	args->fs = fs;
	args->op.create.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}

static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 * is now created so look it up in the file list for this
		 * filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 * open request. When that open completes, it will invoke the
		 * open callback for this request.
		 */
	}
}

void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	args->rc = bserrno;
	sem_post(args->sem);
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
}

static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

int
spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		  const char *name, uint32_t flags, struct spdk_file **file)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;
	args->fs = fs;
	args->op.open.name = name;
	args->op.open.flags = flags;
	args->sem = &channel->sem;
	fs->send_request(__fs_open_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	if (rc == 0) {
		*file = args->file;
	} else {
		*file = NULL;
	}
	free_fs_request(req);

	return rc;
}

static void
fs_rename_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.fs_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;
	const char *new_name = args->op.rename.new_name;

	f->blob = blob;
	spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
	spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req);
}

static void
__spdk_fs_md_rename_file(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f;

	f = fs_find_file(args->fs, args->op.rename.old_name);
	if (f == NULL) {
		args->fn.fs_op(args->arg, -ENOENT);
		free_fs_request(req);
		return;
	}

	free(f->name);
	f->name = strdup(args->op.rename.new_name);
	args->file = f;
	spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
}

static void
fs_rename_delete_done(void *arg, int fserrno)
{
	__spdk_fs_md_rename_file(arg);
}

void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name);
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		__spdk_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file. So delete the existing file, then
	 * do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	sem_post(args->sem);
}

static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL) {
		cb_fn(cb_arg, -ENOENT);
		return;
	}

	if (f->ref_count > 0) {
		/* For now, do not allow deleting files with open references. */
		cb_fn(cb_arg, -EBUSY);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&fs->files, f, tailq);

	cache_free_buffers(f);

	blobid = f->blobid;

	free(f->name);
	free(f->tree);
	free(f);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req);
}

static void
__fs_delete_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	sem_post(args->sem);
}

static void
__fs_delete_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
}

int
spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		    const char *name)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;
	args->fs = fs;
	args->op.delete.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_delete_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

spdk_fs_iter
spdk_fs_iter_first(struct spdk_filesystem *fs)
{
	struct spdk_file *f;

	f = TAILQ_FIRST(&fs->files);
	return f;
}

spdk_fs_iter
spdk_fs_iter_next(spdk_fs_iter iter)
{
	struct spdk_file *f = iter;

	if (f == NULL) {
		return NULL;
	}

	f = TAILQ_NEXT(f, tailq);
	return f;
}

const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}

uint64_t
spdk_file_get_length(struct spdk_file *file)
{
	assert(file != NULL);
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
	return file->length;
}

static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);
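
	/*
	 * Resize the blob to the new cluster count, record the new logical
	 * length in the "length" xattr, and persist both with a metadata sync.
	 */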
	spdk_bs_md_resize_blob(file->blob, num_clusters);
	spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length));

	file->length = length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args);
}

static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args->arg);
}

void
spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __sem_post;
	args->arg = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	free_fs_request(req);
}

static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_dma_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (args->op.rw.is_read) {
		memcpy(args->op.rw.user_buf,
		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
		       args->op.rw.length);
		__rw_done(req, 0);
	} else {
		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
		       args->op.rw.user_buf,
		       args->op.rw.length);
		spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel,
				      args->op.rw.pin_buf,
				      args->op.rw.start_page, args->op.rw.num_pages,
				      __rw_done, req);
	}
}

static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel,
			     args->op.rw.pin_buf,
			     args->op.rw.start_page, args->op.rw.num_pages,
			     __read_done, req);
}

static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
{
	uint64_t end_page;

	*page_size = spdk_bs_get_page_size(file->fs->bs);
	*start_page = offset / *page_size;
	end_page = (offset + length - 1) / *page_size;
	*num_pages = (end_page - *start_page + 1);
}

static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_page, num_pages, pin_buf_length;
	uint32_t page_size;

	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	args->op.rw.user_buf = payload;
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.length = length;

	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
	pin_buf_length = num_pages * page_size;
	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL);

	args->op.rw.start_page = start_page;
	args->op.rw.num_pages = num_pages;

	if (!is_read && file->length < offset + length) {
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else {
		__do_blob_read(req, 0);
	}
}

void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

struct spdk_io_channel *
spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->send_request = fs->send_request;
	fs_channel->sync = 1;
	pthread_spin_init(&fs_channel->lock, 0);

	return io_channel;
}

void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	g_fs_cache_size = size_in_mb * 1024 * 1024;
}

uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *_args);

static void *
alloc_cache_memory_buffer(struct spdk_file *context)
{
	struct spdk_file *file;
	void *buf;

	buf = spdk_mempool_get(g_cache_pool);
	if (buf != NULL) {
		return buf;
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW &&
		    file != context) {
			TAILQ_REMOVE(&g_caches, file, cache_tailq);
			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (!file->open_for_writing && file != context) {
			TAILQ_REMOVE(&g_caches, file, cache_tailq);
			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (file != context) {
			TAILQ_REMOVE(&g_caches, file, cache_tailq);
			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	assert(false);
	return NULL;
}

static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n");
		return NULL;
	}

	buf->buf = alloc_cache_memory_buffer(file);
	if (buf->buf == NULL) {
		while (buf->buf == NULL) {
			/*
			 * TODO: alloc_cache_memory_buffer() should eventually free
			 * some buffers. Need a more sophisticated check here, instead
			 * of just bailing if 100 tries does not result in getting a
			 * free buffer. This will involve using the sync channel's
			 * semaphore to block until a buffer becomes available.
			 */
			if (count++ == 100) {
				SPDK_ERRLOG("could not allocate cache buffer\n");
				assert(false);
				free(buf);
				return NULL;
			}
			buf->buf = alloc_cache_memory_buffer(file);
		}
	}

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	pthread_spin_lock(&g_caches_lock);
	if (file->tree->present_mask == 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	}
	file->tree = spdk_tree_insert_buffer(file->tree, buf);
	pthread_spin_unlock(&g_caches_lock);

	return buf;
}

static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n");
		return NULL;
	}

	if (file->last != NULL) {
		file->last->next = last;
	}
	file->last = last;

	return last;
}

static void
__wake_caller(struct spdk_fs_cb_args *args)
{
	sem_post(args->sem);
}

static void __check_sync_reqs(struct spdk_file *file);

static void
__file_cache_finish_sync(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_cb_args *sync_args;

	pthread_spin_lock(&file->lock);
	sync_req = TAILQ_FIRST(&file->sync_requests);
	sync_args = &sync_req->args;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	sync_args->fn.file_op(sync_args->arg, 0);
	__check_sync_reqs(file);

	pthread_spin_lock(&file->lock);
	free_fs_request(sync_req);
	pthread_spin_unlock(&file->lock);
}

static void
__file_cache_finish_sync_bs_cb(void *ctx, int bserrno)
{
	struct spdk_file *file = ctx;

	__file_cache_finish_sync(file);
}

static void
__free_args(struct spdk_fs_cb_args *args)
{
	struct spdk_fs_request *req;

	if (!args->from_request) {
		free(args);
	} else {
		/* Depends on args being at the start of the spdk_fs_request structure. */
		req = (struct spdk_fs_request *)args;
		free_fs_request(req);
	}
}

static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed,
				       sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

static void
__file_flush_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 * blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	__file_flush(args);
}

static void
__file_flush(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_page, num_pages;
	uint32_t page_size;

	pthread_spin_lock(&file->lock);
	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress) {
		/*
		 * There is either no data to flush, or a flush I/O is already in
		 * progress. So return immediately - if a flush I/O is in
		 * progress we will flush more data after that is completed.
		 */
		__free_args(args);
		pthread_spin_unlock(&file->lock);
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		__free_args(args);
		pthread_spin_unlock(&file->lock);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_page, num_pages);
	pthread_spin_unlock(&file->lock);
	spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			      next->buf + (start_page * page_size) - next->offset,
			      start_page, num_pages,
			      __file_flush_done, args);
}

static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args);
}

static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters);

	spdk_bs_md_sync_blob(file->blob, __file_extend_done, args);
}

static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args);
	__free_args(args);
}

static void
__rw_from_file(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
				     args->op.rw.offset, args->op.rw.length,
				     __rw_from_file_done, args);
	} else {
		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
				      args->op.rw.offset, args->op.rw.length,
				      __rw_from_file_done, args);
	}
}

static int
__send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
		    uint64_t offset, uint64_t length, bool is_read)
{
	struct spdk_fs_cb_args *args;

	args = calloc(1, sizeof(*args));
	if (args == NULL) {
		sem_post(sem);
		return -ENOMEM;
	}

	args->file = file;
	args->sem = sem;
	args->op.rw.user_buf = payload;
	args->op.rw.offset = offset;
	args->op.rw.length = length;
	args->op.rw.is_read = is_read;
	file->fs->send_request(__rw_from_file, args);
	return 0;
}

int
spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
		void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_cb_args *args;
	uint64_t rem_length, copy, blob_size, cluster_sz;
	uint32_t cache_buffers_filled = 0;
	uint8_t *cur_payload;
	struct cache_buffer *last;

	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);

	if (length == 0) {
		return 0;
	}

	if (offset != file->append_pos) {
		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
		return -EINVAL;
	}

	pthread_spin_lock(&file->lock);
	file->open_for_writing = true;

	if (file->last == NULL) {
		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
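			/* Append position is cache-buffer aligned, so start a fresh cache buffer. */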
			cache_append_buffer(file);
		} else {
			int rc;

			file->append_pos += length;
			pthread_spin_unlock(&file->lock);
			rc = __send_rw_from_file(file, &channel->sem, payload,
						 offset, length, false);
			sem_wait(&channel->sem);
			return rc;
		}
	}

	blob_size = __file_get_blob_size(file);

	if ((offset + length) > blob_size) {
		struct spdk_fs_cb_args extend_args = {};

		cluster_sz = file->fs->bs_opts.cluster_sz;
		extend_args.sem = &channel->sem;
		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
		extend_args.file = file;
		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
		pthread_spin_unlock(&file->lock);
		file->fs->send_request(__file_extend_blob, &extend_args);
		sem_wait(&channel->sem);
	}

	last = file->last;
	rem_length = length;
	cur_payload = payload;
	while (rem_length > 0) {
		copy = last->buf_size - last->bytes_filled;
		if (copy > rem_length) {
			copy = rem_length;
		}
		BLOBFS_TRACE_RW(file, "  fill offset=%jx length=%jx\n", file->append_pos, copy);
		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
		file->append_pos += copy;
		if (file->length < file->append_pos) {
			file->length = file->append_pos;
		}
		cur_payload += copy;
		last->bytes_filled += copy;
		rem_length -= copy;
		if (last->bytes_filled == last->buf_size) {
			cache_buffers_filled++;
			last = cache_append_buffer(file);
			if (last == NULL) {
				BLOBFS_TRACE(file, "nomem\n");
				pthread_spin_unlock(&file->lock);
				return -ENOMEM;
			}
		}
	}

	if (cache_buffers_filled == 0) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	args = calloc(1, sizeof(*args));
	if (args == NULL) {
		pthread_spin_unlock(&file->lock);
		return -ENOMEM;
	}

	args->file = file;
	file->fs->send_request(__file_flush, args);
	pthread_spin_unlock(&file->lock);
	return 0;
}

static void
__readahead_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	__free_args(args);
}

static void
__readahead(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;
	uint64_t offset, length, start_page, num_pages;
	uint32_t page_size;

	offset = args->op.readahead.offset;
	length = args->op.readahead.length;
	assert(length > 0);

	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);

	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_page, num_pages);
	spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			     args->op.readahead.cache_buffer->buf,
			     start_page, num_pages,
			     __readahead_done, args);
}

static uint64_t
__next_cache_buffer_offset(uint64_t offset)
{
	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
}

static void
check_readahead(struct spdk_file *file, uint64_t offset)
{
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	args = calloc(1, sizeof(*args));
	if (args == NULL) {
		return;
	}

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	args->op.readahead.cache_buffer->in_progress = true;
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, args);
}

static int
__file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
{
	struct cache_buffer *buf;
	int rc;

	buf = spdk_tree_find_filled_buffer(file->tree, offset);
	if (buf == NULL) {
		pthread_spin_unlock(&file->lock);
		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
		pthread_spin_lock(&file->lock);
		return rc;
	}

	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
		length = buf->offset + buf->bytes_filled - offset;
	}
	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
	memcpy(payload, &buf->buf[offset - buf->offset], length);
	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
		pthread_spin_lock(&g_caches_lock);
		spdk_tree_remove_buffer(file->tree, buf);
		if (file->tree->present_mask == 0) {
			TAILQ_REMOVE(&g_caches, file, cache_tailq);
		}
		pthread_spin_unlock(&g_caches_lock);
	}

	sem_post(sem);
	return 0;
}

int64_t
spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	int rc = 0;

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset);
		check_readahead(file, offset + CACHE_BUFFER_SIZE);
	}

	final_length = 0;
	final_offset = offset + length;
	while (offset < final_offset) {
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}
		rc = __file_read(file, payload, offset, length, &channel->sem);
		if (rc == 0) {
			final_length += length;
		} else {
			break;
		}
		payload += length;
		offset += length;
		sub_reads++;
	}
	pthread_spin_unlock(&file->lock);
	while (sub_reads-- > 0) {
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	int rc = 0;

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset);
		check_readahead(file, offset + CACHE_BUFFER_SIZE);
	}

	final_length = 0;
	final_offset = offset + length;
	while (offset < final_offset) {
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}
		rc = __file_read(file, payload, offset, length, &channel->sem);
		if (rc == 0) {
			final_length += length;
		} else {
			break;
		}
		payload += length;
		offset += length;
		sub_reads++;
	}
	pthread_spin_unlock(&file->lock);
	while (sub_reads-- > 0) {
		sem_wait(&channel->sem);
	}
	if (rc == 0) {
		return final_length;
	} else {
		return rc;
	}
}

static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_flushed || file->last == NULL) {
		BLOBFS_TRACE(file, "done - no data to flush\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	assert(sync_req != NULL);
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	assert(flush_req != NULL);
	flush_args = &flush_req->args;

	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	channel->send_request(__file_flush, flush_args);
}

int
spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, __sem_post, &channel->sem);
	sem_wait(&channel->sem);

	return 0;
}

void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}

void
spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
{
	BLOBFS_TRACE(file, "priority=%u\n", priority);
	file->priority = priority;
}

/*
 * Close routines
 */

static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, 0);
		return;
	}

	pthread_spin_unlock(&file->lock);

	spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req);
}

static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}
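/*
 * Sketch of the asynchronous close sequence implemented below (derived from
 * the helpers above; callback names refer to the static functions in this
 * file):
 *
 *   spdk_file_close_async(file, cb_fn, cb_arg)
 *     -> spdk_file_sync_async()            // flush any cached, unsynced data
 *       -> __file_close_async__sync_done()
 *         -> __file_close_async()          // drop one reference
 *           -> spdk_bs_md_close_blob()     // only when ref_count reaches 0
 *             -> __file_close_async_done() // invokes cb_fn(cb_arg, bserrno)
 *
 * If the reference count is already 0, __file_close_async() completes the
 * request immediately with -EBADF.
 */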
void
spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
}

static void
__file_close_done(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	args->rc = fserrno;
	sem_post(args->sem);
}

static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}

int
spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;

	spdk_file_sync(file, _channel);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __file_close_done;
	args->arg = req;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}

static void
cache_free_buffers(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	pthread_spin_lock(&g_caches_lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&g_caches_lock);
		pthread_spin_unlock(&file->lock);
		return;
	}
	spdk_tree_free_buffers(file->tree);
	if (file->tree->present_mask == 0) {
		TAILQ_REMOVE(&g_caches, file, cache_tailq);
	}
	file->last = NULL;
	pthread_spin_unlock(&g_caches_lock);
	pthread_spin_unlock(&file->lock);
}

SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS);
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW);
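/*
 * Usage sketch for the synchronous close path above (a sketch only; it
 * assumes the caller runs on a thread that may block rather than on an SPDK
 * reactor, and that "channel" belongs to this filesystem):
 *
 *   int rc = spdk_file_close(file, channel);
 *
 * spdk_file_close() first calls spdk_file_sync() to flush cached writes, then
 * dispatches a __file_close request through the channel's send_request
 * callback and blocks on the channel semaphore until __file_close_done()
 * stores the result in args->rc.
 */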