/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "blobfs_internal.h"

#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#define BLOBFS_TRACE(file, str, args...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args)

#define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)

static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches;
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_spinlock_t g_caches_lock;
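
/*
 * Note: the write cache is global to the process, not per-filesystem.  A single
 * mempool of CACHE_BUFFER_SIZE buffers (g_fs_cache_size bytes total, 4 GiB by
 * default) is created when the first filesystem is initialized and freed when
 * the last one is unloaded; g_fs_count and g_cache_init_lock track that
 * lifetime, while g_caches_lock protects g_caches, the list of files that
 * currently hold cache buffers.
 */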

static void
__sem_post(void *arg, int bserrno)
{
	sem_t *sem = arg;

	sem_post(sem);
}

void
spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

#define CACHE_READAHEAD_THRESHOLD (128 * 1024)

struct spdk_file {
	struct spdk_filesystem *fs;
	struct spdk_blob *blob;
	char *name;
	uint64_t length;
	bool open_for_writing;
	uint64_t length_flushed;
	uint64_t append_pos;
	uint64_t seq_byte_count;
	uint64_t next_seq_offset;
	uint32_t priority;
	TAILQ_ENTRY(spdk_file) tailq;
	spdk_blob_id blobid;
	uint32_t ref_count;
	pthread_spinlock_t lock;
	struct cache_buffer *last;
	struct cache_tree *tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file) cache_tailq;
};

struct spdk_filesystem {
	struct spdk_blob_store *bs;
	TAILQ_HEAD(, spdk_file) files;
	struct spdk_bs_opts bs_opts;
	struct spdk_bs_dev *bdev;
	fs_send_request_fn send_request;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *sync_io_channel;
		struct spdk_fs_channel *sync_fs_channel;
	} sync_target;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *md_io_channel;
		struct spdk_fs_channel *md_fs_channel;
	} md_target;

	struct {
		uint32_t max_ops;
	} io_target;
};

struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete fs_op_with_handle;
		spdk_fs_op_complete fs_op;
		spdk_file_op_with_handle_complete file_op_with_handle;
		spdk_file_op_complete file_op;
		spdk_file_stat_op_complete stat_op;
	} fn;
	void *arg;
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	bool from_request;
	union {
		struct {
			uint64_t length;
		} truncate;
		struct {
			struct spdk_io_channel *channel;
			void *user_buf;
			void *pin_buf;
			int is_read;
			off_t offset;
			size_t length;
			uint64_t start_page;
			uint64_t num_pages;
			uint32_t blocklen;
		} rw;
		struct {
			const char *old_name;
			const char *new_name;
		} rename;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
		} flush;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
			uint64_t offset;
		} readahead;
		struct {
			uint64_t offset;
			TAILQ_ENTRY(spdk_fs_request) tailq;
			bool xattr_in_progress;
		} sync;
		struct {
			uint32_t num_clusters;
		} resize;
		struct {
			const char *name;
			uint32_t flags;
			TAILQ_ENTRY(spdk_fs_request) tailq;
		} open;
		struct {
			const char *name;
		} create;
		struct {
			const char *name;
		} delete;
		struct {
			const char *name;
		} stat;
	} op;
};

static void cache_free_buffers(struct spdk_file *file);

static void
__initialize_cache(void)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY);
	TAILQ_INIT(&g_caches);
	pthread_spin_init(&g_caches_lock, 0);
}

static void
__free_cache(void)
{
	assert(g_cache_pool != NULL);

	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;
}

static uint64_t
__file_get_blob_size(struct spdk_file *file)
{
	uint64_t cluster_sz;

	cluster_sz = file->fs->bs_opts.cluster_sz;
	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
}

struct spdk_fs_request {
	struct spdk_fs_cb_args args;
	TAILQ_ENTRY(spdk_fs_request) link;
	struct spdk_fs_channel *channel;
};

struct spdk_fs_channel {
	struct spdk_fs_request *req_mem;
	TAILQ_HEAD(, spdk_fs_request) reqs;
	sem_t sem;
	struct spdk_filesystem *fs;
	struct spdk_io_channel *bs_channel;
	fs_send_request_fn send_request;
	bool sync;
	pthread_spinlock_t lock;
};

static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	req = TAILQ_FIRST(&channel->reqs);
	if (req) {
		TAILQ_REMOVE(&channel->reqs, req, link);
	}

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}

	if (req == NULL) {
		return NULL;
	}
	memset(req, 0, sizeof(*req));
	req->channel = channel;
	req->args.from_request = true;

	return req;
}

static void
free_fs_request(struct spdk_fs_request *req)
{
	struct spdk_fs_channel *channel = req->channel;

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}
}
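
/*
 * Requests are drawn from a per-channel array preallocated in
 * _spdk_fs_channel_create(), so alloc_fs_request()/free_fs_request() never
 * touch the heap on the I/O path.  Only channels created with
 * spdk_fs_alloc_io_channel_sync() set channel->sync, so only those pay for
 * the spinlock; channels used from a single thread skip it.
 */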

static int
_spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
			uint32_t max_ops)
{
	uint32_t i;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);
	sem_init(&channel->sem, 0, 0);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->fs = fs;

	return 0;
}

static int
_spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);

	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
}

static int
_spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);

	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
}

static int
_spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);

	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
}

static void
_spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_fs_channel *channel = ctx_buf;

	free(channel->req_mem);
	if (channel->bs_channel != NULL) {
		spdk_bs_free_io_channel(channel->bs_channel);
	}
}

static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}

static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		__initialize_cache();
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}

static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}

static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
				sizeof(struct spdk_fs_channel));
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
				sizeof(struct spdk_fs_channel));
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
				sizeof(struct spdk_fs_channel));

	return fs;
}

void
spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		spdk_put_io_channel(fs->md_target.md_io_channel);
		spdk_io_device_unregister(&fs->md_target, NULL);
		spdk_put_io_channel(fs->sync_target.sync_io_channel);
		spdk_io_device_unregister(&fs->sync_target, NULL);
		spdk_io_device_unregister(&fs->io_target, NULL);
		free(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_init(dev, NULL, init_cb, req);
}
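
/*
 * Typical usage sketch (illustrative only - my_bs_dev, my_send_request_fn and
 * my_ctx below are placeholders for whatever the application provides, e.g. a
 * bdev-backed spdk_bs_dev and a function that dispatches fs_request_fn
 * callbacks to the metadata thread):
 *
 *	static void
 *	init_done(void *ctx, struct spdk_filesystem *fs, int fserrno)
 *	{
 *		// fs is ready to use here (or fserrno reports the failure)
 *	}
 *
 *	spdk_fs_init(my_bs_dev, my_send_request_fn, init_done, my_ctx);
 */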

static struct spdk_file *
file_alloc(struct spdk_filesystem *fs)
{
	struct spdk_file *file;

	file = calloc(1, sizeof(*file));
	if (file == NULL) {
		return NULL;
	}

	file->tree = calloc(1, sizeof(*file->tree));
	if (file->tree == NULL) {
		free(file);
		return NULL;
	}

	file->fs = fs;
	TAILQ_INIT(&file->open_requests);
	TAILQ_INIT(&file->sync_requests);
	pthread_spin_init(&file->lock, 0);
	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
	file->priority = SPDK_FILE_PRIORITY_LOW;
	return file;
}

static void
iter_cb(void *ctx, struct spdk_blob *blob, int rc)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *f;
	uint64_t *length;
	const char *name;
	size_t value_len;

	if (rc == -ENOENT) {
		/* Finished iterating */
		args->fn.fs_op_with_handle(args->arg, fs, 0);
		free_fs_request(req);
		return;
	} else if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}
	assert(value_len == 8);

	f = file_alloc(fs);
	if (f == NULL) {
		args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
		free_fs_request(req);
		return;
	}

	f->name = strdup(name);
	f->blobid = spdk_blob_get_id(blob);
	f->length = *length;
	f->length_flushed = *length;
	f->append_pos = *length;
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length);

	spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req);
}

static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		free(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	spdk_bs_md_iter_first(fs->bs, iter_cb, req);
}

void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		spdk_put_io_channel(fs->md_target.md_io_channel);
		spdk_io_device_unregister(&fs->md_target, NULL);
		spdk_put_io_channel(fs->sync_target.sync_io_channel);
		spdk_io_device_unregister(&fs->sync_target, NULL);
		spdk_io_device_unregister(&fs->io_target, NULL);
		free(fs);
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_load(dev, load_cb, req);
}

static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		__free_cache();
	}
	pthread_mutex_unlock(&g_cache_init_lock);

	args->fn.fs_op(args->arg, bserrno);
	free(req);

	spdk_io_device_unregister(&fs->io_target, NULL);
	spdk_io_device_unregister(&fs->sync_target, NULL);
	spdk_io_device_unregister(&fs->md_target, NULL);

	free(fs);
}

void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 *  allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
	spdk_bs_unload(fs->bs, unload_cb, req);
}

static struct spdk_file *
fs_find_file(struct spdk_filesystem *fs, const char *name)
{
	struct spdk_file *file;

	TAILQ_FOREACH(file, &fs->files, tailq) {
		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
			return file;
		}
	}

	return NULL;
}

void
spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
			spdk_file_stat_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file_stat stat;
	struct spdk_file *f = NULL;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f != NULL) {
		stat.blobid = f->blobid;
		stat.size = f->length;
		cb_fn(cb_arg, &stat, 0);
		return;
	}

	cb_fn(cb_arg, NULL, -ENOENT);
}

static void
__copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	if (fserrno == 0) {
		memcpy(args->arg, stat, sizeof(*stat));
	}
	sem_post(args->sem);
}

static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}

int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}

static void
fs_create_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;
	uint64_t length = 0;

	f->blob = blob;
	spdk_bs_md_resize_blob(blob, 1);
	spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));

	spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args);
}

static void
fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blobid = blobid;
	spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
}

void
spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *file;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	file = fs_find_file(fs, name);
	if (file != NULL) {
		cb_fn(cb_arg, -EEXIST);
		return;
	}

	file = file_alloc(fs);
	if (file == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	file->name = strdup(name);
	spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args);
}

static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	sem_post(args->sem);
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
}

static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}

int
spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;
	args->fs = fs;
	args->op.create.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}

static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 *  is now created so look it up in the file list for this
		 *  filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 *  open request.  When that open completes, it will invoke the
		 *  open callback for this request.
		 */
	}
}

void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	args->rc = bserrno;
	sem_post(args->sem);
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
}

static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

int
spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		  const char *name, uint32_t flags, struct spdk_file **file)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;
	args->fs = fs;
	args->op.open.name = name;
	args->op.open.flags = flags;
	args->sem = &channel->sem;
	fs->send_request(__fs_open_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	if (rc == 0) {
		*file = args->file;
	} else {
		*file = NULL;
	}
	free_fs_request(req);

	return rc;
}

static void
fs_rename_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.fs_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;
	const char *new_name = args->op.rename.new_name;

	f->blob = blob;
	spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
	spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req);
}

static void
__spdk_fs_md_rename_file(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f;

	f = fs_find_file(args->fs, args->op.rename.old_name);
	if (f == NULL) {
		args->fn.fs_op(args->arg, -ENOENT);
		free_fs_request(req);
		return;
	}

	free(f->name);
	f->name = strdup(args->op.rename.new_name);
	args->file = f;
	spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
}

static void
fs_rename_delete_done(void *arg, int fserrno)
{
	__spdk_fs_md_rename_file(arg);
}

void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name);
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		__spdk_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file.  So delete the existing file, then
	 *  do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	sem_post(args->sem);
}

static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL) {
		cb_fn(cb_arg, -ENOENT);
		return;
	}

	if (f->ref_count > 0) {
		/* For now, do not allow deleting files with open references. */
		cb_fn(cb_arg, -EBUSY);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&fs->files, f, tailq);

	cache_free_buffers(f);

	blobid = f->blobid;

	free(f->name);
	free(f->tree);
	free(f);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req);
}

static void
__fs_delete_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	sem_post(args->sem);
}

static void
__fs_delete_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
}

int
spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		    const char *name)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;
	args->fs = fs;
	args->op.delete.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_delete_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

spdk_fs_iter
spdk_fs_iter_first(struct spdk_filesystem *fs)
{
	struct spdk_file *f;

	f = TAILQ_FIRST(&fs->files);
	return f;
}

spdk_fs_iter
spdk_fs_iter_next(spdk_fs_iter iter)
{
	struct spdk_file *f = iter;

	if (f == NULL) {
		return NULL;
	}

	f = TAILQ_NEXT(f, tailq);
	return f;
}

const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}

uint64_t
spdk_file_get_length(struct spdk_file *file)
{
	assert(file != NULL);
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
	return file->length;
}

static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_bs_md_resize_blob(file->blob, num_clusters);
	spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length));

	file->length = length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args);
}

static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args->arg);
}

void
spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __sem_post;
	args->arg = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	free_fs_request(req);
}

static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_dma_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (args->op.rw.is_read) {
		memcpy(args->op.rw.user_buf,
		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
		       args->op.rw.length);
		__rw_done(req, 0);
	} else {
		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
		       args->op.rw.user_buf,
		       args->op.rw.length);
		spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel,
				      args->op.rw.pin_buf,
				      args->op.rw.start_page, args->op.rw.num_pages,
				      __rw_done, req);
	}
}

static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel,
			     args->op.rw.pin_buf,
			     args->op.rw.start_page, args->op.rw.num_pages,
			     __read_done, req);
}

static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
{
	uint64_t end_page;

	*page_size = spdk_bs_get_page_size(file->fs->bs);
	*start_page = offset / *page_size;
	end_page = (offset + length - 1) / *page_size;
	*num_pages = (end_page - *start_page + 1);
}
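
/*
 * Example: with a 4 KiB blobstore page size, offset=5000 and length=3000 give
 * *start_page = 5000 / 4096 = 1 and end_page = (5000 + 3000 - 1) / 4096 = 1,
 * so *num_pages = 1; the 3000-byte request [5000, 8000) lies entirely within
 * page 1, which covers bytes [4096, 8192).
 */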

static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_page, num_pages, pin_buf_length;
	uint32_t page_size;

	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	args->op.rw.user_buf = payload;
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.length = length;

	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
	pin_buf_length = num_pages * page_size;
	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL);

	args->op.rw.start_page = start_page;
	args->op.rw.num_pages = num_pages;

	if (!is_read && file->length < offset + length) {
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else {
		__do_blob_read(req, 0);
	}
}

void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

struct spdk_io_channel *
spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->send_request = fs->send_request;
	fs_channel->sync = 1;
	pthread_spin_init(&fs_channel->lock, 0);

	return io_channel;
}

void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	g_fs_cache_size = size_in_mb * 1024 * 1024;
}

uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *_args);

static void *
alloc_cache_memory_buffer(struct spdk_file *context)
{
	struct spdk_file *file;
	void *buf;

	buf = spdk_mempool_get(g_cache_pool);
	if (buf != NULL) {
		return buf;
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW &&
		    file != context) {
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (!file->open_for_writing && file != context) {
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (file != context) {
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	return NULL;
}

static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n");
		return NULL;
	}

	buf->buf = alloc_cache_memory_buffer(file);
	while (buf->buf == NULL) {
		/*
		 * TODO: alloc_cache_memory_buffer() should eventually free
		 *  some buffers.  Need a more sophisticated check here, instead
		 *  of just bailing if 100 tries does not result in getting a
		 *  free buffer.  This will involve using the sync channel's
		 *  semaphore to block until a buffer becomes available.
		 */
		if (count++ == 100) {
			SPDK_ERRLOG("could not allocate cache buffer\n");
			assert(false);
			free(buf);
			return NULL;
		}
		buf->buf = alloc_cache_memory_buffer(file);
	}

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	pthread_spin_lock(&g_caches_lock);
	if (file->tree->present_mask == 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	}
	file->tree = spdk_tree_insert_buffer(file->tree, buf);
	pthread_spin_unlock(&g_caches_lock);

	return buf;
}

static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n");
		return NULL;
	}

	if (file->last != NULL) {
		file->last->next = last;
	}
	file->last = last;

	return last;
}

static void
__wake_caller(struct spdk_fs_cb_args *args)
{
	sem_post(args->sem);
}

static void __check_sync_reqs(struct spdk_file *file);

static void
__file_cache_finish_sync(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_cb_args *sync_args;

	pthread_spin_lock(&file->lock);
	sync_req = TAILQ_FIRST(&file->sync_requests);
	sync_args = &sync_req->args;
	assert(sync_args->op.sync.offset <= file->length_flushed);
	BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
	TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	sync_args->fn.file_op(sync_args->arg, 0);
	__check_sync_reqs(file);

	pthread_spin_lock(&file->lock);
	free_fs_request(sync_req);
	pthread_spin_unlock(&file->lock);
}

static void
__file_cache_finish_sync_bs_cb(void *ctx, int bserrno)
{
	struct spdk_file *file = ctx;

	__file_cache_finish_sync(file);
}

static void
__free_args(struct spdk_fs_cb_args *args)
{
	struct spdk_fs_request *req;

	if (!args->from_request) {
		free(args);
	} else {
		/* Depends on args being at the start of the spdk_fs_request structure. */
		req = (struct spdk_fs_request *)args;
		free_fs_request(req);
	}
}

static void
__check_sync_reqs(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;

	pthread_spin_lock(&file->lock);

	TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		sync_req->args.op.sync.xattr_in_progress = true;
		spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed,
				       sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file);
	} else {
		pthread_spin_unlock(&file->lock);
	}
}

static void
__file_flush_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	pthread_spin_unlock(&file->lock);

	__check_sync_reqs(file);

	__file_flush(args);
}

static void
__file_flush(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_page, num_pages;
	uint32_t page_size;

	pthread_spin_lock(&file->lock);
	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress) {
		/*
		 * There is either no data to flush, or a flush I/O is already in
		 *  progress.  So return immediately - if a flush I/O is in
		 *  progress we will flush more data after that is completed.
		 */
		__free_args(args);
		pthread_spin_unlock(&file->lock);
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		__free_args(args);
		pthread_spin_unlock(&file->lock);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_page, num_pages);
	pthread_spin_unlock(&file->lock);
	spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			      next->buf + (start_page * page_size) - next->offset,
			      start_page, num_pages,
			      __file_flush_done, args);
}

static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args);
}

static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters);

	spdk_bs_md_sync_blob(file->blob, __file_extend_done, args);
}

static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args);
	__free_args(args);
}

static void
__rw_from_file(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
				     args->op.rw.offset, args->op.rw.length,
				     __rw_from_file_done, args);
	} else {
		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
				      args->op.rw.offset, args->op.rw.length,
				      __rw_from_file_done, args);
	}
}

static int
__send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
		    uint64_t offset, uint64_t length, bool is_read)
{
	struct spdk_fs_cb_args *args;

	args = calloc(1, sizeof(*args));
	if (args == NULL) {
		sem_post(sem);
		return -ENOMEM;
	}

	args->file = file;
	args->sem = sem;
	args->op.rw.user_buf = payload;
	args->op.rw.offset = offset;
	args->op.rw.length = length;
	args->op.rw.is_read = is_read;
	file->fs->send_request(__rw_from_file, args);
	return 0;
}

int
spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
		void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_cb_args *args;
	uint64_t rem_length, copy, blob_size, cluster_sz;
	uint32_t cache_buffers_filled = 0;
	uint8_t *cur_payload;
	struct cache_buffer *last;

	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);

	if (length == 0) {
		return 0;
	}

	if (offset != file->append_pos) {
		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
		return -EINVAL;
	}

	pthread_spin_lock(&file->lock);
	file->open_for_writing = true;

	if (file->last == NULL) {
		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
			cache_append_buffer(file);
		} else {
			int rc;

			file->append_pos += length;
			pthread_spin_unlock(&file->lock);
			rc = __send_rw_from_file(file, &channel->sem, payload,
						 offset, length, false);
			sem_wait(&channel->sem);
			return rc;
		}
	}

	blob_size = __file_get_blob_size(file);

	if ((offset + length) > blob_size) {
		struct spdk_fs_cb_args extend_args = {};

		cluster_sz = file->fs->bs_opts.cluster_sz;
		extend_args.sem = &channel->sem;
		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
		extend_args.file = file;
		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
		pthread_spin_unlock(&file->lock);
		file->fs->send_request(__file_extend_blob, &extend_args);
		sem_wait(&channel->sem);
	}

	last = file->last;
	rem_length = length;
	cur_payload = payload;
	while (rem_length > 0) {
		copy = last->buf_size - last->bytes_filled;
		if (copy > rem_length) {
			copy = rem_length;
		}
		BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy);
		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
		file->append_pos += copy;
		if (file->length < file->append_pos) {
			file->length = file->append_pos;
		}
		cur_payload += copy;
		last->bytes_filled += copy;
		rem_length -= copy;
		if (last->bytes_filled == last->buf_size) {
			cache_buffers_filled++;
			last = cache_append_buffer(file);
			if (last == NULL) {
				BLOBFS_TRACE(file, "nomem\n");
				pthread_spin_unlock(&file->lock);
				return -ENOMEM;
			}
		}
	}

	if (cache_buffers_filled == 0) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	args = calloc(1, sizeof(*args));
	if (args == NULL) {
		pthread_spin_unlock(&file->lock);
		return -ENOMEM;
	}

	args->file = file;
	file->fs->send_request(__file_flush, args);
	pthread_spin_unlock(&file->lock);
	return 0;
}
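
/*
 * In short, spdk_file_write() is strictly append-only (offset must equal
 * append_pos): data is memcpy'd into CACHE_BUFFER_SIZE cache buffers, the blob
 * is resized first if the write would run past its current cluster allocation,
 * and an async __file_flush is kicked off only once at least one buffer has
 * been completely filled.  Writes that start mid-buffer with no cache state
 * fall back to __send_rw_from_file().  Flushed data is not made durable in the
 * file metadata until spdk_file_sync() completes.
 */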

static void
__readahead_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	__free_args(args);
}

static void
__readahead(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;
	uint64_t offset, length, start_page, num_pages;
	uint32_t page_size;

	offset = args->op.readahead.offset;
	length = args->op.readahead.length;
	assert(length > 0);

	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);

	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_page, num_pages);
	spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			     args->op.readahead.cache_buffer->buf,
			     start_page, num_pages,
			     __readahead_done, args);
}

static uint64_t
__next_cache_buffer_offset(uint64_t offset)
{
	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
}

static void
check_readahead(struct spdk_file *file, uint64_t offset)
{
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	args = calloc(1, sizeof(*args));
	if (args == NULL) {
		return;
	}

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	args->op.readahead.cache_buffer->in_progress = true;
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, args);
}

static int
__file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
{
	struct cache_buffer *buf;
	int rc;

	buf = spdk_tree_find_filled_buffer(file->tree, offset);
	if (buf == NULL) {
		pthread_spin_unlock(&file->lock);
		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
		pthread_spin_lock(&file->lock);
		return rc;
	}

	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
		length = buf->offset + buf->bytes_filled - offset;
	}
	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
	memcpy(payload, &buf->buf[offset - buf->offset], length);
	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
		pthread_spin_lock(&g_caches_lock);
		spdk_tree_remove_buffer(file->tree, buf);
		if (file->tree->present_mask == 0) {
			TAILQ_REMOVE(&g_caches, file, cache_tailq);
		}
		pthread_spin_unlock(&g_caches_lock);
	}

	sem_post(sem);
	return 0;
}
int64_t
spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel,
	       void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t final_offset, final_length;
	uint32_t sub_reads = 0;
	int rc = 0;

	pthread_spin_lock(&file->lock);

	BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length);

	file->open_for_writing = false;

	if (length == 0 || offset >= file->append_pos) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	if (offset + length > file->append_pos) {
		length = file->append_pos - offset;
	}

	if (offset != file->next_seq_offset) {
		file->seq_byte_count = 0;
	}
	file->seq_byte_count += length;
	file->next_seq_offset = offset + length;
	if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) {
		check_readahead(file, offset);
		check_readahead(file, offset + CACHE_BUFFER_SIZE);
	}

	final_length = 0;
	final_offset = offset + length;
	while (offset < final_offset) {
		length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset;
		if (length > (final_offset - offset)) {
			length = final_offset - offset;
		}
		rc = __file_read(file, payload, offset, length, &channel->sem);
		if (rc == 0) {
			final_length += length;
		} else {
			break;
		}
		payload += length;
		offset += length;
		sub_reads++;
	}
	pthread_spin_unlock(&file->lock);
	while (sub_reads-- > 0) {
		sem_wait(&channel->sem);
	}
	if (rc == 0) {
		return final_length;
	} else {
		return rc;
	}
}

static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_flushed || file->last == NULL) {
		BLOBFS_TRACE(file, "done - no data to flush\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	assert(sync_req != NULL);
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	assert(flush_req != NULL);
	flush_args = &flush_req->args;

	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	sync_args->op.sync.xattr_in_progress = false;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	channel->send_request(__file_flush, flush_args);
}

int
spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, __sem_post, &channel->sem);
	sem_wait(&channel->sem);

	return 0;
}

void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}

void
spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
{
	BLOBFS_TRACE(file, "priority=%u\n", priority);
	file->priority = priority;
}

/*
 * Close routines
 */

static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, 0);
		return;
	}

	pthread_spin_unlock(&file->lock);

	spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req);
}

static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}
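
/*
 * Close path.  spdk_file_close_async() first syncs any cached writes on the
 * metadata channel, then drops a reference on the file; the underlying blob
 * is only closed once the last reference is released.
 */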
void
spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
}

static void
__file_close_done(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	args->rc = fserrno;
	sem_post(args->sem);
}

static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}

int
spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;

	spdk_file_sync(file, _channel);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __file_close_done;
	args->arg = req;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}

static void
cache_free_buffers(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	pthread_spin_lock(&g_caches_lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&g_caches_lock);
		pthread_spin_unlock(&file->lock);
		return;
	}
	spdk_tree_free_buffers(file->tree);

	TAILQ_REMOVE(&g_caches, file, cache_tailq);
	/* If not all buffers could be freed, put the file back at the end of the queue. */
	if (file->tree->present_mask != 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	}
	file->last = NULL;
	pthread_spin_unlock(&g_caches_lock);
	pthread_spin_unlock(&file->lock);
}

SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS);
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW);
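
/*
 * Illustrative usage sketch (not part of the implementation).  It assumes the
 * caller has already opened a struct spdk_file and allocated an spdk_io_channel
 * for this filesystem; those calls are defined elsewhere in blobfs.  It only
 * shows the synchronous read/sync/close sequence implemented above:
 *
 *	uint8_t buf[4096];
 *	int64_t nread;
 *
 *	nread = spdk_file_read(file, channel, buf, 0, sizeof(buf));
 *	if (nread < 0) {
 *		// negative errno from the read path
 *	}
 *	spdk_file_sync(file, channel);
 *	spdk_file_close(file, channel);
 */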