/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/blobfs.h"
#include "blobfs_internal.h"

#include "spdk/queue.h"
#include "spdk/io_channel.h"
#include "spdk/assert.h"
#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk_internal/log.h"

#define BLOBFS_TRACE(file, str, args...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args)

#define BLOBFS_TRACE_RW(file, str, args...) \
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args)

#define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024)

static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE;
static struct spdk_mempool *g_cache_pool;
static TAILQ_HEAD(, spdk_file) g_caches;
static int g_fs_count = 0;
static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_spinlock_t g_caches_lock;

static void
__sem_post(void *arg, int bserrno)
{
	sem_t *sem = arg;

	sem_post(sem);
}

void
spdk_cache_buffer_free(struct cache_buffer *cache_buffer)
{
	spdk_mempool_put(g_cache_pool, cache_buffer->buf);
	free(cache_buffer);
}

#define CACHE_READAHEAD_THRESHOLD (128 * 1024)

struct spdk_file {
	struct spdk_filesystem *fs;
	struct spdk_blob *blob;
	char *name;
	uint64_t length;
	bool open_for_writing;
	uint64_t length_flushed;
	uint64_t append_pos;
	uint64_t seq_byte_count;
	uint64_t next_seq_offset;
	uint32_t priority;
	TAILQ_ENTRY(spdk_file) tailq;
	spdk_blob_id blobid;
	uint32_t ref_count;
	pthread_spinlock_t lock;
	struct cache_buffer *last;
	struct cache_tree *tree;
	TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests;
	TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests;
	TAILQ_ENTRY(spdk_file) cache_tailq;
};

struct spdk_filesystem {
	struct spdk_blob_store *bs;
	TAILQ_HEAD(, spdk_file) files;
	struct spdk_bs_opts bs_opts;
	struct spdk_bs_dev *bdev;
	fs_send_request_fn send_request;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *sync_io_channel;
		struct spdk_fs_channel *sync_fs_channel;
	} sync_target;

	struct {
		uint32_t max_ops;
		struct spdk_io_channel *md_io_channel;
		struct spdk_fs_channel *md_fs_channel;
	} md_target;

	struct {
		uint32_t max_ops;
	} io_target;
};

struct spdk_fs_cb_args {
	union {
		spdk_fs_op_with_handle_complete fs_op_with_handle;
		spdk_fs_op_complete fs_op;
		spdk_file_op_with_handle_complete file_op_with_handle;
		spdk_file_op_complete file_op;
		spdk_file_stat_op_complete stat_op;
	} fn;
	void *arg;
	sem_t *sem;
	struct spdk_filesystem *fs;
	struct spdk_file *file;
	int rc;
	bool from_request;
	union {
		struct {
			uint64_t length;
		} truncate;
		struct {
			struct spdk_io_channel *channel;
			void *user_buf;
			void *pin_buf;
			int is_read;
			off_t offset;
			size_t length;
			uint64_t start_page;
			uint64_t num_pages;
			uint32_t blocklen;
		} rw;
		struct {
			const char *old_name;
			const char *new_name;
		} rename;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
		} flush;
		struct {
			struct cache_buffer *cache_buffer;
			uint64_t length;
			uint64_t offset;
		} readahead;
		struct {
			uint64_t offset;
			TAILQ_ENTRY(spdk_fs_request) tailq;
		} sync;
		struct {
			uint32_t num_clusters;
		} resize;
		struct {
			const char *name;
			uint32_t flags;
			TAILQ_ENTRY(spdk_fs_request) tailq;
		} open;
		struct {
			const char *name;
		} create;
		struct {
			const char *name;
		} delete;
		struct {
			const char *name;
		} stat;
	} op;
};

static void cache_free_buffers(struct spdk_file *file);

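/*
 * The cache buffer pool is shared by every filesystem loaded in this process.
 * It is created when the first filesystem is brought up and released when the
 * last one is unloaded; g_fs_count tracks how many filesystems are using it.
 */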
static void
__initialize_cache(void)
{
	assert(g_cache_pool == NULL);

	g_cache_pool = spdk_mempool_create("spdk_fs_cache",
					   g_fs_cache_size / CACHE_BUFFER_SIZE,
					   CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY);
	TAILQ_INIT(&g_caches);
	pthread_spin_init(&g_caches_lock, 0);
}

static void
__free_cache(void)
{
	assert(g_cache_pool != NULL);

	spdk_mempool_free(g_cache_pool);
	g_cache_pool = NULL;
}

static uint64_t
__file_get_blob_size(struct spdk_file *file)
{
	uint64_t cluster_sz;

	cluster_sz = file->fs->bs_opts.cluster_sz;
	return cluster_sz * spdk_blob_get_num_clusters(file->blob);
}

struct spdk_fs_request {
	struct spdk_fs_cb_args args;
	TAILQ_ENTRY(spdk_fs_request) link;
	struct spdk_fs_channel *channel;
};

struct spdk_fs_channel {
	struct spdk_fs_request *req_mem;
	TAILQ_HEAD(, spdk_fs_request) reqs;
	sem_t sem;
	struct spdk_filesystem *fs;
	struct spdk_io_channel *bs_channel;
	fs_send_request_fn send_request;
	bool sync;
	pthread_spinlock_t lock;
};

static struct spdk_fs_request *
alloc_fs_request(struct spdk_fs_channel *channel)
{
	struct spdk_fs_request *req;

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	req = TAILQ_FIRST(&channel->reqs);
	if (req) {
		TAILQ_REMOVE(&channel->reqs, req, link);
	}

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}

	if (req == NULL) {
		return NULL;
	}
	memset(req, 0, sizeof(*req));
	req->channel = channel;
	req->args.from_request = true;

	return req;
}

static void
free_fs_request(struct spdk_fs_request *req)
{
	struct spdk_fs_channel *channel = req->channel;

	if (channel->sync) {
		pthread_spin_lock(&channel->lock);
	}

	TAILQ_INSERT_HEAD(&req->channel->reqs, req, link);

	if (channel->sync) {
		pthread_spin_unlock(&channel->lock);
	}
}

static int
_spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel,
			uint32_t max_ops)
{
	uint32_t i;

	channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request));
	if (!channel->req_mem) {
		return -1;
	}

	TAILQ_INIT(&channel->reqs);
	sem_init(&channel->sem, 0, 0);

	for (i = 0; i < max_ops; i++) {
		TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link);
	}

	channel->fs = fs;

	return 0;
}

static int
_spdk_fs_md_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target);

	return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops);
}

static int
_spdk_fs_sync_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target);

	return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops);
}

static int
_spdk_fs_io_channel_create(void *io_device, void *ctx_buf)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_channel *channel = ctx_buf;

	fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target);

	return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops);
}

static void
_spdk_fs_channel_destroy(void *io_device, void *ctx_buf)
{
	struct spdk_fs_channel *channel = ctx_buf;

	free(channel->req_mem);
	if (channel->bs_channel != NULL) {
		spdk_bs_free_io_channel(channel->bs_channel);
	}
}

static void
__send_request_direct(fs_request_fn fn, void *arg)
{
	fn(arg);
}

static void
common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs)
{
	fs->bs = bs;
	fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs);
	fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->md_target.md_fs_channel->send_request = __send_request_direct;
	fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs->sync_target.sync_fs_channel->send_request = __send_request_direct;

	pthread_mutex_lock(&g_cache_init_lock);
	if (g_fs_count == 0) {
		__initialize_cache();
	}
	g_fs_count++;
	pthread_mutex_unlock(&g_cache_init_lock);
}

static void
init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno == 0) {
		common_fs_bs_init(fs, bs);
	} else {
		free(fs);
		fs = NULL;
	}

	args->fn.fs_op_with_handle(args->arg, fs, bserrno);
	free_fs_request(req);
}

static struct spdk_filesystem *
fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn)
{
	struct spdk_filesystem *fs;

	fs = calloc(1, sizeof(*fs));
	if (fs == NULL) {
		return NULL;
	}

	fs->bdev = dev;
	fs->send_request = send_request_fn;
	TAILQ_INIT(&fs->files);

	fs->md_target.max_ops = 512;
	spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy,
				sizeof(struct spdk_fs_channel));
	fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target);
	fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel);

	fs->sync_target.max_ops = 512;
	spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy,
				sizeof(struct spdk_fs_channel));
	fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target);
	fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel);

	fs->io_target.max_ops = 512;
	spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy,
				sizeof(struct spdk_fs_channel));

	return fs;
}

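/*
 * spdk_fs_init() creates a new blobstore on the given bs_dev; init_cb() then
 * wires up the metadata and sync channels via common_fs_bs_init().
 * spdk_fs_load() below follows the same pattern, but additionally iterates the
 * existing blobs to rebuild the in-memory file list.
 */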
void
spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_init(dev, NULL, init_cb, req);
}

static struct spdk_file *
file_alloc(struct spdk_filesystem *fs)
{
	struct spdk_file *file;

	file = calloc(1, sizeof(*file));
	if (file == NULL) {
		return NULL;
	}

	file->fs = fs;
	TAILQ_INIT(&file->open_requests);
	TAILQ_INIT(&file->sync_requests);
	pthread_spin_init(&file->lock, 0);
	file->tree = calloc(1, sizeof(*file->tree));
	TAILQ_INSERT_TAIL(&fs->files, file, tailq);
	file->priority = SPDK_FILE_PRIORITY_LOW;
	return file;
}

static void
iter_cb(void *ctx, struct spdk_blob *blob, int rc)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;
	struct spdk_file *f;
	uint64_t *length;
	const char *name;
	size_t value_len;

	if (rc == -ENOENT) {
		/* Finished iterating */
		args->fn.fs_op_with_handle(args->arg, fs, 0);
		free_fs_request(req);
		return;
	} else if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}

	rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len);
	if (rc < 0) {
		args->fn.fs_op_with_handle(args->arg, fs, rc);
		free_fs_request(req);
		return;
	}
	assert(value_len == 8);

	f = file_alloc(fs);
	if (f == NULL) {
		args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM);
		free_fs_request(req);
		return;
	}

	f->name = strdup(name);
	f->blobid = spdk_blob_get_id(blob);
	f->length = *length;
	f->length_flushed = *length;
	f->append_pos = *length;
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length);

	spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req);
}

static void
load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	if (bserrno != 0) {
		args->fn.fs_op_with_handle(args->arg, NULL, bserrno);
		free_fs_request(req);
		free(fs);
		return;
	}

	common_fs_bs_init(fs, bs);
	spdk_bs_md_iter_first(fs->bs, iter_cb, req);
}

void
spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn,
	     spdk_fs_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_request *req;

	fs = fs_alloc(dev, send_request_fn);
	if (fs == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_bs_load(dev, load_cb, req);
}

static void
unload_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_filesystem *fs = args->fs;

	pthread_mutex_lock(&g_cache_init_lock);
	g_fs_count--;
	if (g_fs_count == 0) {
		__free_cache();
	}
	pthread_mutex_unlock(&g_cache_init_lock);

	args->fn.fs_op(args->arg, bserrno);
	free(req);

	spdk_io_device_unregister(&fs->io_target);
	spdk_io_device_unregister(&fs->sync_target);
	spdk_io_device_unregister(&fs->md_target);

	free(fs);
}

void
spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	/*
	 * We must free the md_channel before unloading the blobstore, so just
	 * allocate this request from the general heap.
	 */
	req = calloc(1, sizeof(*req));
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->arg = cb_arg;
	args->fs = fs;

	spdk_fs_free_io_channel(fs->md_target.md_io_channel);
	spdk_fs_free_io_channel(fs->sync_target.sync_io_channel);
	spdk_bs_unload(fs->bs, unload_cb, req);
}

static struct spdk_file *
fs_find_file(struct spdk_filesystem *fs, const char *name)
{
	struct spdk_file *file;

	TAILQ_FOREACH(file, &fs->files, tailq) {
		if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) {
			return file;
		}
	}

	return NULL;
}

void
spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name,
			spdk_file_stat_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file_stat stat;
	struct spdk_file *f = NULL;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f != NULL) {
		stat.blobid = f->blobid;
		stat.size = f->length;
		cb_fn(cb_arg, &stat, 0);
		return;
	}

	cb_fn(cb_arg, NULL, -ENOENT);
}

static void
__copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	if (fserrno == 0) {
		memcpy(args->arg, stat, sizeof(*stat));
	}
	sem_post(args->sem);
}

static void
__file_stat(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_file_stat_async(args->fs, args->op.stat.name,
				args->fn.stat_op, req);
}

int
spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		  const char *name, struct spdk_file_stat *stat)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	int rc;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	req->args.fs = fs;
	req->args.op.stat.name = name;
	req->args.fn.stat_op = __copy_stat;
	req->args.arg = stat;
	req->args.sem = &channel->sem;
	channel->send_request(__file_stat, req);
	sem_wait(&channel->sem);

	rc = req->args.rc;
	free_fs_request(req);

	return rc;
}

static void
fs_create_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;
	uint64_t length = 0;

	f->blob = blob;
	spdk_bs_md_resize_blob(blob, 1);
	spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1);
	spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length));

	spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args);
}

static void
fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blobid = blobid;
	spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req);
}

void
spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *file;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	file = fs_find_file(fs, name);
	if (file != NULL) {
		cb_fn(cb_arg, -EEXIST);
		return;
	}

	file = file_alloc(fs);
	if (file == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	file->name = strdup(name);
	spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args);
}

static void
__fs_create_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	sem_post(args->sem);
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
}

static void
__fs_create_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name);
	spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req);
}

int
spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;
	args->fs = fs;
	args->op.create.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_create_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

static void
fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;

	f->blob = blob;
	while (!TAILQ_EMPTY(&f->open_requests)) {
		req = TAILQ_FIRST(&f->open_requests);
		args = &req->args;
		TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq);
		args->fn.file_op_with_handle(args->arg, f, bserrno);
		free_fs_request(req);
	}
}

static void
fs_open_blob_create_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;
	struct spdk_filesystem *fs = args->fs;

	if (file == NULL) {
		/*
		 * This is from an open with CREATE flag - the file
		 * is now created so look it up in the file list for this
		 * filesystem.
		 */
		file = fs_find_file(fs, args->op.open.name);
		assert(file != NULL);
		args->file = file;
	}

	file->ref_count++;
	TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq);
	if (file->ref_count == 1) {
		assert(file->blob == NULL);
		spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req);
	} else if (file->blob != NULL) {
		fs_open_blob_done(req, file->blob, 0);
	} else {
		/*
		 * The blob open for this file is in progress due to a previous
		 * open request. When that open completes, it will invoke the
		 * open callback for this request.
		 */
	}
}

void
spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags,
			spdk_file_op_with_handle_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f = NULL;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, NULL, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) {
		cb_fn(cb_arg, NULL, -ENOENT);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, NULL, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op_with_handle = cb_fn;
	args->arg = cb_arg;
	args->file = f;
	args->fs = fs;
	args->op.open.name = name;

	if (f == NULL) {
		spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req);
	} else {
		fs_open_blob_create_cb(req, 0);
	}
}

static void
__fs_open_file_done(void *arg, struct spdk_file *file, int bserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->file = file;
	args->rc = bserrno;
	sem_post(args->sem);
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
}

static void
__fs_open_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name);
	spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags,
				__fs_open_file_done, req);
}

int
spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		  const char *name, uint32_t flags, struct spdk_file **file)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;
	args->fs = fs;
	args->op.open.name = name;
	args->op.open.flags = flags;
	args->sem = &channel->sem;
	fs->send_request(__fs_open_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	if (rc == 0) {
		*file = args->file;
	} else {
		*file = NULL;
	}
	free_fs_request(req);

	return rc;
}

static void
fs_rename_blob_close_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.fs_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f = args->file;
	const char *new_name = args->op.rename.new_name;

	f->blob = blob;
	spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1);
	spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req);
}

static void
__spdk_fs_md_rename_file(struct spdk_fs_request *req)
{
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *f;

	f = fs_find_file(args->fs, args->op.rename.old_name);
	if (f == NULL) {
		args->fn.fs_op(args->arg, -ENOENT);
		free_fs_request(req);
		return;
	}

	free(f->name);
	f->name = strdup(args->op.rename.new_name);
	args->file = f;
	spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req);
}

static void
fs_rename_delete_done(void *arg, int fserrno)
{
	__spdk_fs_md_rename_file(arg);
}

void
spdk_fs_rename_file_async(struct spdk_filesystem *fs,
			  const char *old_name, const char *new_name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name);
	if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.fs_op = cb_fn;
	args->fs = fs;
	args->arg = cb_arg;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;

	f = fs_find_file(fs, new_name);
	if (f == NULL) {
		__spdk_fs_md_rename_file(req);
		return;
	}

	/*
	 * The rename overwrites an existing file. So delete the existing file, then
	 * do the actual rename.
	 */
	spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req);
}

static void
__fs_rename_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	sem_post(args->sem);
}

static void
__fs_rename_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name,
				  __fs_rename_file_done, req);
}

int
spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		    const char *old_name, const char *new_name)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;

	args->fs = fs;
	args->op.rename.old_name = old_name;
	args->op.rename.new_name = new_name;
	args->sem = &channel->sem;
	fs->send_request(__fs_rename_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);
	return rc;
}

static void
blob_delete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

void
spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name,
			  spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_file *f;
	spdk_blob_id blobid;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name);

	if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) {
		cb_fn(cb_arg, -ENAMETOOLONG);
		return;
	}

	f = fs_find_file(fs, name);
	if (f == NULL) {
		cb_fn(cb_arg, -ENOENT);
		return;
	}

	if (f->ref_count > 0) {
		/* For now, do not allow deleting files with open references. */
		cb_fn(cb_arg, -EBUSY);
		return;
	}

	req = alloc_fs_request(fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	TAILQ_REMOVE(&fs->files, f, tailq);

	cache_free_buffers(f);

	blobid = f->blobid;

	free(f->name);
	free(f->tree);
	free(f);

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req);
}

static void
__fs_delete_file_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	args->rc = fserrno;
	sem_post(args->sem);
}

static void
__fs_delete_file(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req);
}

int
spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel,
		    const char *name)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	int rc;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;
	args->fs = fs;
	args->op.delete.name = name;
	args->sem = &channel->sem;
	fs->send_request(__fs_delete_file, req);
	sem_wait(&channel->sem);
	rc = args->rc;
	free_fs_request(req);

	return rc;
}

spdk_fs_iter
spdk_fs_iter_first(struct spdk_filesystem *fs)
{
	struct spdk_file *f;

	f = TAILQ_FIRST(&fs->files);
	return f;
}

spdk_fs_iter
spdk_fs_iter_next(spdk_fs_iter iter)
{
	struct spdk_file *f = iter;

	if (f == NULL) {
		return NULL;
	}

	f = TAILQ_NEXT(f, tailq);
	return f;
}

const char *
spdk_file_get_name(struct spdk_file *file)
{
	return file->name;
}

uint64_t
spdk_file_get_length(struct spdk_file *file)
{
	assert(file != NULL);
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length);
	return file->length;
}

static void
fs_truncate_complete_cb(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static uint64_t
__bytes_to_clusters(uint64_t length, uint64_t cluster_sz)
{
	return (length + cluster_sz - 1) / cluster_sz;
}

void
spdk_file_truncate_async(struct spdk_file *file, uint64_t length,
			 spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_filesystem *fs;
	size_t num_clusters;
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length);
	if (length == file->length) {
		cb_fn(cb_arg, 0);
		return;
	}

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	fs = file->fs;

	num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz);

	spdk_bs_md_resize_blob(file->blob, num_clusters);
	spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length));

	file->length = length;
	if (file->append_pos > file->length) {
		file->append_pos = file->length;
	}

	spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args);
}

static void
__truncate(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_file_truncate_async(args->file, args->op.truncate.length,
				 args->fn.file_op, args->arg);
}

void
spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel,
		   uint64_t length)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;

	args->file = file;
	args->op.truncate.length = length;
	args->fn.file_op = __sem_post;
	args->arg = &channel->sem;

	channel->send_request(__truncate, req);
	sem_wait(&channel->sem);
	free_fs_request(req);
}

static void
__rw_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_dma_free(args->op.rw.pin_buf);
	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

static void
__read_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	if (args->op.rw.is_read) {
		memcpy(args->op.rw.user_buf,
		       args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
		       args->op.rw.length);
		__rw_done(req, 0);
	} else {
		memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF),
		       args->op.rw.user_buf,
		       args->op.rw.length);
		spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel,
				      args->op.rw.pin_buf,
				      args->op.rw.start_page, args->op.rw.num_pages,
				      __rw_done, req);
	}
}

static void
__do_blob_read(void *ctx, int fserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel,
			     args->op.rw.pin_buf,
			     args->op.rw.start_page, args->op.rw.num_pages,
			     __read_done, req);
}

static void
__get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length,
		      uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages)
{
	uint64_t end_page;

	*page_size = spdk_bs_get_page_size(file->fs->bs);
	*start_page = offset / *page_size;
	end_page = (offset + length - 1) / *page_size;
	*num_pages = (end_page - *start_page + 1);
}

static void
__readwrite(struct spdk_file *file, struct spdk_io_channel *_channel,
	    void *payload, uint64_t offset, uint64_t length,
	    spdk_file_op_complete cb_fn, void *cb_arg, int is_read)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	uint64_t start_page, num_pages, pin_buf_length;
	uint32_t page_size;

	if (is_read && offset + length > file->length) {
		cb_fn(cb_arg, -EINVAL);
		return;
	}

	req = alloc_fs_request(channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;
	args->file = file;
	args->op.rw.channel = channel->bs_channel;
	args->op.rw.user_buf = payload;
	args->op.rw.is_read = is_read;
	args->op.rw.offset = offset;
	args->op.rw.length = length;

	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);
	pin_buf_length = num_pages * page_size;
	args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL);

	args->op.rw.start_page = start_page;
	args->op.rw.num_pages = num_pages;

	if (!is_read && file->length < offset + length) {
		spdk_file_truncate_async(file, offset + length, __do_blob_read, req);
	} else {
		__do_blob_read(req, 0);
	}
}

void
spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel,
		      void *payload, uint64_t offset, uint64_t length,
		      spdk_file_op_complete cb_fn, void *cb_arg)
{
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0);
}

void
spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel,
		     void *payload, uint64_t offset, uint64_t length,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n",
		      file->name, offset, length);
	__readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1);
}

struct spdk_io_channel *
spdk_fs_alloc_io_channel(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs);
	fs_channel->send_request = __send_request_direct;

	return io_channel;
}

struct spdk_io_channel *
spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs)
{
	struct spdk_io_channel *io_channel;
	struct spdk_fs_channel *fs_channel;

	io_channel = spdk_get_io_channel(&fs->io_target);
	fs_channel = spdk_io_channel_get_ctx(io_channel);
	fs_channel->send_request = fs->send_request;
	fs_channel->sync = 1;
	pthread_spin_init(&fs_channel->lock, 0);

	return io_channel;
}

void
spdk_fs_free_io_channel(struct spdk_io_channel *channel)
{
	spdk_put_io_channel(channel);
}

void
spdk_fs_set_cache_size(uint64_t size_in_mb)
{
	g_fs_cache_size = size_in_mb * 1024 * 1024;
}

uint64_t
spdk_fs_get_cache_size(void)
{
	return g_fs_cache_size / (1024 * 1024);
}

static void __file_flush(void *_args);

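/*
 * Allocate a buffer from the global cache pool. If the pool is empty, evict
 * buffers from other files and retry, preferring (in order) low-priority files
 * that are not open for writing, then any file not open for writing, then any
 * file other than the one requesting the buffer.
 */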
static void *
alloc_cache_memory_buffer(struct spdk_file *context)
{
	struct spdk_file *file;
	void *buf;

	buf = spdk_mempool_get(g_cache_pool);
	if (buf != NULL) {
		return buf;
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (!file->open_for_writing &&
		    file->priority == SPDK_FILE_PRIORITY_LOW &&
		    file != context) {
			TAILQ_REMOVE(&g_caches, file, cache_tailq);
			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (!file->open_for_writing && file != context) {
			TAILQ_REMOVE(&g_caches, file, cache_tailq);
			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	pthread_spin_lock(&g_caches_lock);
	TAILQ_FOREACH(file, &g_caches, cache_tailq) {
		if (file != context) {
			TAILQ_REMOVE(&g_caches, file, cache_tailq);
			TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
			break;
		}
	}
	pthread_spin_unlock(&g_caches_lock);
	if (file != NULL) {
		cache_free_buffers(file);
		buf = spdk_mempool_get(g_cache_pool);
		if (buf != NULL) {
			return buf;
		}
	}

	assert(false);
	return NULL;
}

static struct cache_buffer *
cache_insert_buffer(struct spdk_file *file, uint64_t offset)
{
	struct cache_buffer *buf;
	int count = 0;

	buf = calloc(1, sizeof(*buf));
	if (buf == NULL) {
		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n");
		return NULL;
	}

	buf->buf = alloc_cache_memory_buffer(file);
	if (buf->buf == NULL) {
		while (buf->buf == NULL) {
			/*
			 * TODO: alloc_cache_memory_buffer() should eventually free
			 *  some buffers.  Need a more sophisticated check here, instead
			 *  of just bailing if 100 tries does not result in getting a
			 *  free buffer.  This will involve using the sync channel's
			 *  semaphore to block until a buffer becomes available.
			 */
			if (count++ == 100) {
				SPDK_ERRLOG("could not allocate cache buffer\n");
				assert(false);
				free(buf);
				return NULL;
			}
			buf->buf = alloc_cache_memory_buffer(file);
		}
	}

	buf->buf_size = CACHE_BUFFER_SIZE;
	buf->offset = offset;

	pthread_spin_lock(&g_caches_lock);
	if (file->tree->present_mask == 0) {
		TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq);
	}
	file->tree = spdk_tree_insert_buffer(file->tree, buf);
	pthread_spin_unlock(&g_caches_lock);

	return buf;
}

static struct cache_buffer *
cache_append_buffer(struct spdk_file *file)
{
	struct cache_buffer *last;

	assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size);
	assert((file->append_pos % CACHE_BUFFER_SIZE) == 0);

	last = cache_insert_buffer(file, file->append_pos);
	if (last == NULL) {
		SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n");
		return NULL;
	}

	if (file->last != NULL) {
		file->last->next = last;
	}
	file->last = last;

	return last;
}

static void
__wake_caller(struct spdk_fs_cb_args *args)
{
	sem_post(args->sem);
}

static void
__file_cache_finish_sync(struct spdk_file *file)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_cb_args *sync_args;

	pthread_spin_lock(&file->lock);
	while (!TAILQ_EMPTY(&file->sync_requests)) {
		sync_req = TAILQ_FIRST(&file->sync_requests);
		sync_args = &sync_req->args;
		if (sync_args->op.sync.offset > file->length_flushed) {
			break;
		}
		BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset);
		TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq);
		pthread_spin_unlock(&file->lock);
		sync_args->fn.file_op(sync_args->arg, 0);
		pthread_spin_lock(&file->lock);
		free_fs_request(sync_req);
	}
	pthread_spin_unlock(&file->lock);
}

static void
__file_cache_finish_sync_bs_cb(void *ctx, int bserrno)
{
	struct spdk_file *file = ctx;

	__file_cache_finish_sync(file);
}

static void
__free_args(struct spdk_fs_cb_args *args)
{
	struct spdk_fs_request *req;

	if (!args->from_request) {
		free(args);
	} else {
		/* Depends on args being at the start of the spdk_fs_request structure. */
		req = (struct spdk_fs_request *)args;
		free_fs_request(req);
	}
}

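/*
 * Write-back path: __file_flush() picks the cache buffer at length_flushed and
 * writes it to the blob on the sync channel; __file_flush_done() advances
 * length_flushed, persists the "length" xattr once an outstanding sync request
 * is covered, completes any satisfied sync requests and starts the next flush.
 */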
static void
__file_flush_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;
	struct spdk_fs_request *sync_req;
	struct spdk_file *file = args->file;
	struct cache_buffer *next = args->op.flush.cache_buffer;

	BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length);

	pthread_spin_lock(&file->lock);
	next->in_progress = false;
	next->bytes_flushed += args->op.flush.length;
	file->length_flushed += args->op.flush.length;
	if (file->length_flushed > file->length) {
		file->length = file->length_flushed;
	}
	if (next->bytes_flushed == next->buf_size) {
		BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed);
		next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	}

	TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) {
		if (sync_req->args.op.sync.offset <= file->length_flushed) {
			break;
		}
	}

	/*
	 * Assert that there is no cached data that extends past the end of the underlying
	 *  blob.
	 */
	assert(next == NULL || next->offset < __file_get_blob_size(file) ||
	       next->bytes_filled == 0);

	if (sync_req != NULL) {
		BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed);
		spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed,
				       sizeof(file->length_flushed));

		pthread_spin_unlock(&file->lock);
		spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file);
	} else {
		pthread_spin_unlock(&file->lock);
		__file_cache_finish_sync(file);
	}

	__file_flush(args);
}

static void
__file_flush(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;
	struct cache_buffer *next;
	uint64_t offset, length, start_page, num_pages;
	uint32_t page_size;

	pthread_spin_lock(&file->lock);
	next = spdk_tree_find_buffer(file->tree, file->length_flushed);
	if (next == NULL || next->in_progress) {
		/*
		 * There is either no data to flush, or a flush I/O is already in
		 *  progress.  So return immediately - if a flush I/O is in
		 *  progress we will flush more data after that is completed.
		 */
		__free_args(args);
		pthread_spin_unlock(&file->lock);
		return;
	}

	offset = next->offset + next->bytes_flushed;
	length = next->bytes_filled - next->bytes_flushed;
	if (length == 0) {
		__free_args(args);
		pthread_spin_unlock(&file->lock);
		return;
	}
	args->op.flush.length = length;
	args->op.flush.cache_buffer = next;

	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);

	next->in_progress = true;
	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_page, num_pages);
	pthread_spin_unlock(&file->lock);
	spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			      next->buf + (start_page * page_size) - next->offset,
			      start_page, num_pages,
			      __file_flush_done, args);
}

static void
__file_extend_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args);
}

static void
__file_extend_blob(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters);

	spdk_bs_md_sync_blob(file->blob, __file_extend_done, args);
}

static void
__rw_from_file_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;

	__wake_caller(args);
	__free_args(args);
}

static void
__rw_from_file(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;

	if (args->op.rw.is_read) {
		spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
				     args->op.rw.offset, args->op.rw.length,
				     __rw_from_file_done, args);
	} else {
		spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf,
				      args->op.rw.offset, args->op.rw.length,
				      __rw_from_file_done, args);
	}
}

static int
__send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload,
		    uint64_t offset, uint64_t length, bool is_read)
{
	struct spdk_fs_cb_args *args;

	args = calloc(1, sizeof(*args));
	if (args == NULL) {
		sem_post(sem);
		return -ENOMEM;
	}

	args->file = file;
	args->sem = sem;
	args->op.rw.user_buf = payload;
	args->op.rw.offset = offset;
	args->op.rw.length = length;
	args->op.rw.is_read = is_read;
	file->fs->send_request(__rw_from_file, args);
	return 0;
}

int
spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel,
		void *payload, uint64_t offset, uint64_t length)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_cb_args *args;
	uint64_t rem_length, copy, blob_size, cluster_sz;
	uint32_t cache_buffers_filled = 0;
	uint8_t *cur_payload;
	struct cache_buffer *last;

	BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length);

	if (length == 0) {
		return 0;
	}

	if (offset != file->append_pos) {
		BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos);
		return -EINVAL;
	}

	pthread_spin_lock(&file->lock);
	file->open_for_writing = true;

	if (file->last == NULL) {
		if (file->append_pos % CACHE_BUFFER_SIZE == 0) {
			cache_append_buffer(file);
		} else {
			int rc;

			file->append_pos += length;
			pthread_spin_unlock(&file->lock);
			rc = __send_rw_from_file(file, &channel->sem, payload,
						 offset, length, false);
			sem_wait(&channel->sem);
			return rc;
		}
	}

	blob_size = __file_get_blob_size(file);

	if ((offset + length) > blob_size) {
		struct spdk_fs_cb_args extend_args = {};

		cluster_sz = file->fs->bs_opts.cluster_sz;
		extend_args.sem = &channel->sem;
		extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz);
		extend_args.file = file;
		BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters);
		pthread_spin_unlock(&file->lock);
		file->fs->send_request(__file_extend_blob, &extend_args);
		sem_wait(&channel->sem);
	}

	last = file->last;
	rem_length = length;
	cur_payload = payload;
	while (rem_length > 0) {
		copy = last->buf_size - last->bytes_filled;
		if (copy > rem_length) {
			copy = rem_length;
		}
		BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy);
		memcpy(&last->buf[last->bytes_filled], cur_payload, copy);
		file->append_pos += copy;
		if (file->length < file->append_pos) {
			file->length = file->append_pos;
		}
		cur_payload += copy;
		last->bytes_filled += copy;
		rem_length -= copy;
		if (last->bytes_filled == last->buf_size) {
			cache_buffers_filled++;
			last = cache_append_buffer(file);
			if (last == NULL) {
				BLOBFS_TRACE(file, "nomem\n");
				pthread_spin_unlock(&file->lock);
				return -ENOMEM;
			}
		}
	}

	if (cache_buffers_filled == 0) {
		pthread_spin_unlock(&file->lock);
		return 0;
	}

	args = calloc(1, sizeof(*args));
	if (args == NULL) {
		pthread_spin_unlock(&file->lock);
		return -ENOMEM;
	}

	args->file = file;
	file->fs->send_request(__file_flush, args);
	pthread_spin_unlock(&file->lock);
	return 0;
}

static void
__readahead_done(void *arg, int bserrno)
{
	struct spdk_fs_cb_args *args = arg;
	struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer;
	struct spdk_file *file = args->file;

	BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset);

	pthread_spin_lock(&file->lock);
	cache_buffer->bytes_filled = args->op.readahead.length;
	cache_buffer->bytes_flushed = args->op.readahead.length;
	cache_buffer->in_progress = false;
	pthread_spin_unlock(&file->lock);

	__free_args(args);
}

static void
__readahead(void *_args)
{
	struct spdk_fs_cb_args *args = _args;
	struct spdk_file *file = args->file;
	uint64_t offset, length, start_page, num_pages;
	uint32_t page_size;

	offset = args->op.readahead.offset;
	length = args->op.readahead.length;
	assert(length > 0);

	__get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages);

	BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n",
		     offset, length, start_page, num_pages);
	spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel,
			     args->op.readahead.cache_buffer->buf,
			     start_page, num_pages,
			     __readahead_done, args);
}

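/*
 * Readahead: once a file sees CACHE_READAHEAD_THRESHOLD bytes of sequential
 * reads, check_readahead() inserts a cache buffer for the next aligned offset
 * and fills it asynchronously on the sync channel via __readahead().
 */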
static uint64_t
__next_cache_buffer_offset(uint64_t offset)
{
	return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0));
}

static void
check_readahead(struct spdk_file *file, uint64_t offset)
{
	struct spdk_fs_cb_args *args;

	offset = __next_cache_buffer_offset(offset);
	if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) {
		return;
	}

	args = calloc(1, sizeof(*args));
	if (args == NULL) {
		return;
	}

	BLOBFS_TRACE(file, "offset=%jx\n", offset);

	args->file = file;
	args->op.readahead.offset = offset;
	args->op.readahead.cache_buffer = cache_insert_buffer(file, offset);
	args->op.readahead.cache_buffer->in_progress = true;
	if (file->length < (offset + CACHE_BUFFER_SIZE)) {
		args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1);
	} else {
		args->op.readahead.length = CACHE_BUFFER_SIZE;
	}
	file->fs->send_request(__readahead, args);
}

static int
__file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem)
{
	struct cache_buffer *buf;
	int rc;

	buf = spdk_tree_find_filled_buffer(file->tree, offset);
	if (buf == NULL) {
		pthread_spin_unlock(&file->lock);
		rc = __send_rw_from_file(file, sem, payload, offset, length, true);
		pthread_spin_lock(&file->lock);
		return rc;
	}

	if ((offset + length) > (buf->offset + buf->bytes_filled)) {
		length = buf->offset + buf->bytes_filled - offset;
	}
	BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length);
	memcpy(payload, &buf->buf[offset - buf->offset], length);
	if ((offset + length) % CACHE_BUFFER_SIZE == 0) {
		pthread_spin_lock(&g_caches_lock);
		spdk_tree_remove_buffer(file->tree, buf);
		if (file->tree->present_mask == 0) {
			TAILQ_REMOVE(&g_caches, file, cache_tailq);
		}
		pthread_spin_unlock(&g_caches_lock);
	}

	sem_post(sem);
	return 0;
}

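/*
 * Synchronous read entry point: the request is split into per-cache-buffer
 * sub-reads; each sub-read is served from the cache when possible, otherwise
 * it is forwarded to the sync channel with __send_rw_from_file(), and the
 * caller waits on the channel semaphore for every sub-read to complete.
 */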
/* Queue a sync request that completes once everything written before this
 * call has been flushed to the blobstore, then kick off a flush.
 */
static void
_file_sync(struct spdk_file *file, struct spdk_fs_channel *channel,
	   spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *sync_req;
	struct spdk_fs_request *flush_req;
	struct spdk_fs_cb_args *sync_args;
	struct spdk_fs_cb_args *flush_args;

	BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos);

	pthread_spin_lock(&file->lock);
	if (file->append_pos <= file->length_flushed || file->last == NULL) {
		BLOBFS_TRACE(file, "done - no data to flush\n");
		pthread_spin_unlock(&file->lock);
		cb_fn(cb_arg, 0);
		return;
	}

	sync_req = alloc_fs_request(channel);
	assert(sync_req != NULL);
	sync_args = &sync_req->args;

	flush_req = alloc_fs_request(channel);
	assert(flush_req != NULL);
	flush_args = &flush_req->args;

	sync_args->file = file;
	sync_args->fn.file_op = cb_fn;
	sync_args->arg = cb_arg;
	sync_args->op.sync.offset = file->append_pos;
	TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq);
	pthread_spin_unlock(&file->lock);

	flush_args->file = file;
	channel->send_request(__file_flush, flush_args);
}

int
spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, __sem_post, &channel->sem);
	sem_wait(&channel->sem);

	return 0;
}

void
spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel,
		     spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);

	_file_sync(file, channel, cb_fn, cb_arg);
}

void
spdk_file_set_priority(struct spdk_file *file, uint32_t priority)
{
	BLOBFS_TRACE(file, "priority=%u\n", priority);
	file->priority = priority;
}

/*
 * Close routines
 */

static void
__file_close_async_done(void *ctx, int bserrno)
{
	struct spdk_fs_request *req = ctx;
	struct spdk_fs_cb_args *args = &req->args;

	args->fn.file_op(args->arg, bserrno);
	free_fs_request(req);
}

/* Drop a reference to the file; the blob is only closed when the last
 * reference goes away.
 */
static void
__file_close_async(struct spdk_file *file, struct spdk_fs_request *req)
{
	pthread_spin_lock(&file->lock);
	if (file->ref_count == 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, -EBADF);
		return;
	}

	file->ref_count--;
	if (file->ref_count > 0) {
		pthread_spin_unlock(&file->lock);
		__file_close_async_done(req, 0);
		return;
	}

	pthread_spin_unlock(&file->lock);

	spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req);
}

static void
__file_close_async__sync_done(void *arg, int fserrno)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;

	__file_close_async(args->file, req);
}

void
spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg)
{
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(file->fs->md_target.md_fs_channel);
	if (req == NULL) {
		cb_fn(cb_arg, -ENOMEM);
		return;
	}

	args = &req->args;
	args->file = file;
	args->fn.file_op = cb_fn;
	args->arg = cb_arg;

	spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req);
}
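/*
 * Minimal async-close sketch.  Illustrative only: the callback and context are
 * application-defined and not part of this file.
 *
 *	static void
 *	close_done(void *ctx, int fserrno)
 *	{
 *		// application-defined error handling on fserrno != 0
 *	}
 *
 *	spdk_file_close_async(file, close_done, app_ctx);
 *
 * Note that spdk_file_close_async() syncs any cached writes via
 * spdk_file_sync_async() before dropping its reference, so callers do not
 * need an explicit sync beforehand.
 */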
static void
__file_close_done(void *arg, int fserrno)
{
	struct spdk_fs_cb_args *args = arg;

	args->rc = fserrno;
	sem_post(args->sem);
}

static void
__file_close(void *arg)
{
	struct spdk_fs_request *req = arg;
	struct spdk_fs_cb_args *args = &req->args;
	struct spdk_file *file = args->file;

	__file_close_async(file, req);
}

int
spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel)
{
	struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel);
	struct spdk_fs_request *req;
	struct spdk_fs_cb_args *args;

	req = alloc_fs_request(channel);
	assert(req != NULL);

	args = &req->args;

	spdk_file_sync(file, _channel);
	BLOBFS_TRACE(file, "name=%s\n", file->name);
	args->file = file;
	args->sem = &channel->sem;
	args->fn.file_op = __file_close_done;
	args->arg = req;
	channel->send_request(__file_close, req);
	sem_wait(&channel->sem);

	return args->rc;
}

/* Free all of the file's cache buffers and, if the cache tree becomes empty,
 * remove the file from the global cache list.
 */
static void
cache_free_buffers(struct spdk_file *file)
{
	BLOBFS_TRACE(file, "free=%s\n", file->name);
	pthread_spin_lock(&file->lock);
	pthread_spin_lock(&g_caches_lock);
	if (file->tree->present_mask == 0) {
		pthread_spin_unlock(&g_caches_lock);
		pthread_spin_unlock(&file->lock);
		return;
	}
	spdk_tree_free_buffers(file->tree);
	if (file->tree->present_mask == 0) {
		TAILQ_REMOVE(&g_caches, file, cache_tailq);
	}
	file->last = NULL;
	pthread_spin_unlock(&g_caches_lock);
	pthread_spin_unlock(&file->lock);
}

SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS);
SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW);
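/*
 * The two trace flags registered above gate BLOBFS_TRACE() and
 * BLOBFS_TRACE_RW() output.  A sketch of enabling them programmatically,
 * assuming the spdk_log_set_trace_flag() helper from spdk/log.h is available
 * in this tree (SPDK applications typically expose the same control through a
 * command line trace-flag option):
 *
 *	spdk_log_set_trace_flag("blobfs");	// file-level trace messages
 *	spdk_log_set_trace_flag("blobfs_rw");	// per-read/write trace messages
 */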