1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include <stdbool.h> 35 #include <assert.h> 36 #include <errno.h> 37 #include <stdlib.h> 38 #include <string.h> 39 #include <pthread.h> 40 41 #include "spdk/blobfs.h" 42 #include "blobfs_internal.h" 43 44 #include "spdk/queue.h" 45 #include "spdk/io_channel.h" 46 #include "spdk/assert.h" 47 #include "spdk/env.h" 48 #include "spdk_internal/log.h" 49 50 #define BLOBFS_TRACE(file, str, args...) \ 51 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_TRACE_RW(file, str, args...) \ 54 SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 55 56 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 57 58 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 59 static struct spdk_mempool *g_cache_pool; 60 static TAILQ_HEAD(, spdk_file) g_caches; 61 static pthread_spinlock_t g_caches_lock; 62 63 static void 64 __sem_post(void *arg, int bserrno) 65 { 66 sem_t *sem = arg; 67 68 sem_post(sem); 69 } 70 71 void 72 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 73 { 74 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 75 free(cache_buffer); 76 } 77 78 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 79 80 struct spdk_file { 81 struct spdk_filesystem *fs; 82 struct spdk_blob *blob; 83 char *name; 84 uint64_t length; 85 bool open_for_writing; 86 uint64_t length_flushed; 87 uint64_t append_pos; 88 uint64_t seq_byte_count; 89 uint64_t next_seq_offset; 90 uint32_t priority; 91 TAILQ_ENTRY(spdk_file) tailq; 92 spdk_blob_id blobid; 93 uint32_t ref_count; 94 pthread_spinlock_t lock; 95 struct cache_buffer *last; 96 struct cache_tree *tree; 97 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 98 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 99 TAILQ_ENTRY(spdk_file) cache_tailq; 100 }; 101 102 struct spdk_filesystem { 103 struct spdk_blob_store *bs; 104 TAILQ_HEAD(, spdk_file) files; 105 struct spdk_bs_opts bs_opts; 106 struct spdk_bs_dev *bdev; 107 fs_send_request_fn send_request; 108 struct spdk_io_channel *sync_io_channel; 109 struct spdk_fs_channel *sync_fs_channel; 110 struct spdk_io_channel *md_io_channel; 111 struct spdk_fs_channel *md_fs_channel; 112 }; 113 114 struct spdk_fs_cb_args { 115 union { 116 spdk_fs_op_with_handle_complete fs_op_with_handle; 117 spdk_fs_op_complete fs_op; 118 spdk_file_op_with_handle_complete file_op_with_handle; 119 spdk_file_op_complete file_op; 120 spdk_file_stat_op_complete stat_op; 121 } fn; 122 void *arg; 123 sem_t *sem; 124 struct spdk_filesystem *fs; 125 struct spdk_file *file; 126 int rc; 127 bool from_request; 128 union { 129 struct { 130 uint64_t length; 131 } truncate; 132 struct { 133 struct spdk_io_channel *channel; 134 void *user_buf; 135 void *pin_buf; 136 int is_read; 137 off_t offset; 138 size_t length; 139 uint64_t start_page; 140 uint64_t num_pages; 141 uint32_t blocklen; 142 } rw; 143 struct { 144 const char *old_name; 145 const char *new_name; 146 } rename; 147 struct { 148 struct cache_buffer *cache_buffer; 149 uint64_t length; 150 } flush; 151 struct { 152 struct cache_buffer *cache_buffer; 153 uint64_t length; 154 uint64_t offset; 155 } readahead; 156 struct { 157 uint64_t offset; 158 TAILQ_ENTRY(spdk_fs_request) tailq; 159 } sync; 160 struct { 161 uint32_t num_clusters; 162 } resize; 163 struct { 164 const char *name; 165 uint32_t flags; 166 TAILQ_ENTRY(spdk_fs_request) tailq; 167 } open; 168 struct { 169 const char *name; 170 } create; 171 struct { 172 const char *name; 173 } delete; 174 struct { 175 const char *name; 176 } stat; 177 } op; 178 }; 179 180 static void cache_free_buffers(struct spdk_file *file); 181 182 static void 183 __initialize_cache(void) 184 { 185 if (g_cache_pool != NULL) { 186 return; 187 } 188 189 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 190 g_fs_cache_size / CACHE_BUFFER_SIZE, 191 CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY); 192 TAILQ_INIT(&g_caches); 193 pthread_spin_init(&g_caches_lock, 0); 194 } 195 196 static uint64_t 197 __file_get_blob_size(struct spdk_file *file) 198 { 199 uint64_t cluster_sz; 200 201 cluster_sz = file->fs->bs_opts.cluster_sz; 202 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 203 } 204 205 struct spdk_fs_request { 206 struct spdk_fs_cb_args args; 207 TAILQ_ENTRY(spdk_fs_request) link; 208 struct spdk_fs_channel *channel; 209 }; 210 211 struct spdk_fs_channel { 212 struct spdk_fs_request *req_mem; 213 TAILQ_HEAD(, spdk_fs_request) reqs; 214 sem_t sem; 215 struct spdk_filesystem *fs; 216 struct spdk_io_channel *bs_channel; 217 fs_send_request_fn send_request; 218 }; 219 220 static struct spdk_fs_request * 221 alloc_fs_request(struct spdk_fs_channel *channel) 222 { 223 struct spdk_fs_request *req; 224 225 req = TAILQ_FIRST(&channel->reqs); 226 if (!req) { 227 return NULL; 228 } 229 TAILQ_REMOVE(&channel->reqs, req, link); 230 memset(req, 0, sizeof(*req)); 231 req->channel = channel; 232 req->args.from_request = true; 233 234 return req; 235 } 236 237 static void 238 free_fs_request(struct spdk_fs_request *req) 239 { 240 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 241 } 242 243 static int 244 _spdk_fs_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 245 { 246 struct spdk_filesystem *fs = io_device; 247 struct spdk_fs_channel *channel = ctx_buf; 248 uint32_t max_ops = *(uint32_t *)unique_ctx; 249 uint32_t i; 250 251 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 252 if (!channel->req_mem) { 253 free(channel); 254 return -1; 255 } 256 257 TAILQ_INIT(&channel->reqs); 258 sem_init(&channel->sem, 0, 0); 259 260 for (i = 0; i < max_ops; i++) { 261 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 262 } 263 264 channel->fs = fs; 265 266 return 0; 267 } 268 269 static void 270 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 271 { 272 struct spdk_fs_channel *channel = ctx_buf; 273 274 free(channel->req_mem); 275 if (channel->bs_channel != NULL) { 276 spdk_bs_free_io_channel(channel->bs_channel); 277 } 278 } 279 280 static void 281 __send_request_direct(fs_request_fn fn, void *arg) 282 { 283 fn(arg); 284 } 285 286 static void 287 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 288 { 289 fs->bs = bs; 290 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 291 fs->md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT, 512); 292 fs->md_fs_channel->send_request = __send_request_direct; 293 fs->sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT, 512); 294 fs->sync_fs_channel->send_request = __send_request_direct; 295 } 296 297 static void 298 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 299 { 300 struct spdk_fs_request *req = ctx; 301 struct spdk_fs_cb_args *args = &req->args; 302 struct spdk_filesystem *fs = args->fs; 303 304 if (bserrno == 0) { 305 common_fs_bs_init(fs, bs); 306 } else { 307 free(fs); 308 fs = NULL; 309 } 310 311 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 312 free_fs_request(req); 313 } 314 315 static struct spdk_filesystem * 316 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 317 { 318 struct spdk_filesystem *fs; 319 uint32_t max_ops = 512; 320 321 fs = calloc(1, sizeof(*fs)); 322 if (fs == NULL) { 323 return NULL; 324 } 325 326 fs->bdev = dev; 327 fs->send_request = send_request_fn; 328 TAILQ_INIT(&fs->files); 329 spdk_io_device_register(fs, _spdk_fs_channel_create, _spdk_fs_channel_destroy, 330 sizeof(struct spdk_fs_channel)); 331 332 fs->md_io_channel = spdk_get_io_channel(fs, SPDK_IO_PRIORITY_DEFAULT, true, (void *)&max_ops); 333 fs->md_fs_channel = spdk_io_channel_get_ctx(fs->md_io_channel); 334 335 fs->sync_io_channel = spdk_get_io_channel(fs, SPDK_IO_PRIORITY_DEFAULT, true, (void *)&max_ops); 336 fs->sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_io_channel); 337 338 __initialize_cache(); 339 340 return fs; 341 } 342 343 void 344 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 345 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 346 { 347 struct spdk_filesystem *fs; 348 struct spdk_fs_request *req; 349 struct spdk_fs_cb_args *args; 350 351 fs = fs_alloc(dev, send_request_fn); 352 if (fs == NULL) { 353 cb_fn(cb_arg, NULL, -ENOMEM); 354 return; 355 } 356 357 req = alloc_fs_request(fs->md_fs_channel); 358 if (req == NULL) { 359 cb_fn(cb_arg, NULL, -ENOMEM); 360 return; 361 } 362 363 args = &req->args; 364 args->fn.fs_op_with_handle = cb_fn; 365 args->arg = cb_arg; 366 args->fs = fs; 367 368 spdk_bs_init(dev, NULL, init_cb, req); 369 } 370 371 static struct spdk_file * 372 file_alloc(struct spdk_filesystem *fs) 373 { 374 struct spdk_file *file; 375 376 file = calloc(1, sizeof(*file)); 377 if (file == NULL) { 378 return NULL; 379 } 380 381 file->fs = fs; 382 TAILQ_INIT(&file->open_requests); 383 TAILQ_INIT(&file->sync_requests); 384 pthread_spin_init(&file->lock, 0); 385 file->tree = calloc(1, sizeof(*file->tree)); 386 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 387 file->priority = SPDK_FILE_PRIORITY_LOW; 388 return file; 389 } 390 391 static void 392 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 393 { 394 struct spdk_fs_request *req = ctx; 395 struct spdk_fs_cb_args *args = &req->args; 396 struct spdk_filesystem *fs = args->fs; 397 struct spdk_file *f; 398 uint64_t *length; 399 const char *name; 400 size_t value_len; 401 402 if (rc == -ENOENT) { 403 /* Finished iterating */ 404 args->fn.fs_op_with_handle(args->arg, fs, 0); 405 free_fs_request(req); 406 return; 407 } else if (rc < 0) { 408 args->fn.fs_op_with_handle(args->arg, fs, rc); 409 free_fs_request(req); 410 return; 411 } 412 413 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 414 if (rc < 0) { 415 args->fn.fs_op_with_handle(args->arg, fs, rc); 416 free_fs_request(req); 417 return; 418 } 419 420 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 421 if (rc < 0) { 422 args->fn.fs_op_with_handle(args->arg, fs, rc); 423 free_fs_request(req); 424 return; 425 } 426 assert(value_len == 8); 427 428 f = file_alloc(fs); 429 if (f == NULL) { 430 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 431 free_fs_request(req); 432 return; 433 } 434 435 f->name = strdup(name); 436 f->blobid = spdk_blob_get_id(blob); 437 f->length = *length; 438 f->length_flushed = *length; 439 f->append_pos = *length; 440 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 441 442 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 443 } 444 445 static void 446 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 447 { 448 struct spdk_fs_request *req = ctx; 449 struct spdk_fs_cb_args *args = &req->args; 450 struct spdk_filesystem *fs = args->fs; 451 452 if (bserrno != 0) { 453 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 454 free_fs_request(req); 455 free(fs); 456 return; 457 } 458 459 common_fs_bs_init(fs, bs); 460 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 461 } 462 463 void 464 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 465 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 466 { 467 struct spdk_filesystem *fs; 468 struct spdk_fs_cb_args *args; 469 struct spdk_fs_request *req; 470 471 fs = fs_alloc(dev, send_request_fn); 472 if (fs == NULL) { 473 cb_fn(cb_arg, NULL, -ENOMEM); 474 return; 475 } 476 477 req = alloc_fs_request(fs->md_fs_channel); 478 if (req == NULL) { 479 cb_fn(cb_arg, NULL, -ENOMEM); 480 return; 481 } 482 483 args = &req->args; 484 args->fn.fs_op_with_handle = cb_fn; 485 args->arg = cb_arg; 486 args->fs = fs; 487 488 spdk_bs_load(dev, load_cb, req); 489 } 490 491 static void 492 unload_cb(void *ctx, int bserrno) 493 { 494 struct spdk_fs_request *req = ctx; 495 struct spdk_fs_cb_args *args = &req->args; 496 struct spdk_filesystem *fs = args->fs; 497 498 args->fn.fs_op(args->arg, bserrno); 499 free(req); 500 spdk_io_device_unregister(fs); 501 free(fs); 502 } 503 504 void 505 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 506 { 507 struct spdk_fs_request *req; 508 struct spdk_fs_cb_args *args; 509 510 /* 511 * We must free the md_channel before unloading the blobstore, so just 512 * allocate this request from the general heap. 513 */ 514 req = calloc(1, sizeof(*req)); 515 if (req == NULL) { 516 cb_fn(cb_arg, -ENOMEM); 517 return; 518 } 519 520 args = &req->args; 521 args->fn.fs_op = cb_fn; 522 args->arg = cb_arg; 523 args->fs = fs; 524 525 spdk_fs_free_io_channel(fs->md_io_channel); 526 spdk_fs_free_io_channel(fs->sync_io_channel); 527 spdk_bs_unload(fs->bs, unload_cb, req); 528 } 529 530 static struct spdk_file * 531 fs_find_file(struct spdk_filesystem *fs, const char *name) 532 { 533 struct spdk_file *file; 534 535 TAILQ_FOREACH(file, &fs->files, tailq) { 536 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 537 return file; 538 } 539 } 540 541 return NULL; 542 } 543 544 void 545 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 546 spdk_file_stat_op_complete cb_fn, void *cb_arg) 547 { 548 struct spdk_file_stat stat; 549 struct spdk_file *f = NULL; 550 551 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 552 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 553 return; 554 } 555 556 f = fs_find_file(fs, name); 557 if (f != NULL) { 558 stat.blobid = f->blobid; 559 stat.size = f->length; 560 cb_fn(cb_arg, &stat, 0); 561 return; 562 } 563 564 cb_fn(cb_arg, NULL, -ENOENT); 565 } 566 567 static void 568 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 569 { 570 struct spdk_fs_request *req = arg; 571 struct spdk_fs_cb_args *args = &req->args; 572 573 args->rc = fserrno; 574 if (fserrno == 0) { 575 memcpy(args->arg, stat, sizeof(*stat)); 576 } 577 sem_post(args->sem); 578 } 579 580 static void 581 __file_stat(void *arg) 582 { 583 struct spdk_fs_request *req = arg; 584 struct spdk_fs_cb_args *args = &req->args; 585 586 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 587 args->fn.stat_op, req); 588 } 589 590 int 591 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 592 const char *name, struct spdk_file_stat *stat) 593 { 594 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 595 struct spdk_fs_request *req; 596 int rc; 597 598 req = alloc_fs_request(channel); 599 assert(req != NULL); 600 601 req->args.fs = fs; 602 req->args.op.stat.name = name; 603 req->args.fn.stat_op = __copy_stat; 604 req->args.arg = stat; 605 req->args.sem = &channel->sem; 606 channel->send_request(__file_stat, req); 607 sem_wait(&channel->sem); 608 609 rc = req->args.rc; 610 free_fs_request(req); 611 612 return rc; 613 } 614 615 static void 616 fs_create_blob_close_cb(void *ctx, int bserrno) 617 { 618 struct spdk_fs_request *req = ctx; 619 struct spdk_fs_cb_args *args = &req->args; 620 621 args->fn.file_op(args->arg, bserrno); 622 free_fs_request(req); 623 } 624 625 static void 626 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 627 { 628 struct spdk_fs_request *req = ctx; 629 struct spdk_fs_cb_args *args = &req->args; 630 struct spdk_file *f = args->file; 631 uint64_t length = 0; 632 633 f->blob = blob; 634 spdk_bs_md_resize_blob(blob, 1); 635 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 636 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 637 638 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 639 } 640 641 static void 642 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 643 { 644 struct spdk_fs_request *req = ctx; 645 struct spdk_fs_cb_args *args = &req->args; 646 struct spdk_file *f = args->file; 647 648 f->blobid = blobid; 649 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 650 } 651 652 void 653 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 654 spdk_file_op_complete cb_fn, void *cb_arg) 655 { 656 struct spdk_file *file; 657 struct spdk_fs_request *req; 658 struct spdk_fs_cb_args *args; 659 660 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 661 cb_fn(cb_arg, -ENAMETOOLONG); 662 return; 663 } 664 665 file = fs_find_file(fs, name); 666 if (file != NULL) { 667 cb_fn(cb_arg, -EEXIST); 668 return; 669 } 670 671 file = file_alloc(fs); 672 if (file == NULL) { 673 cb_fn(cb_arg, -ENOMEM); 674 return; 675 } 676 677 req = alloc_fs_request(fs->md_fs_channel); 678 if (req == NULL) { 679 cb_fn(cb_arg, -ENOMEM); 680 return; 681 } 682 683 args = &req->args; 684 args->file = file; 685 args->fn.file_op = cb_fn; 686 args->arg = cb_arg; 687 688 file->name = strdup(name); 689 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 690 } 691 692 static void 693 __fs_create_file_done(void *arg, int fserrno) 694 { 695 struct spdk_fs_request *req = arg; 696 struct spdk_fs_cb_args *args = &req->args; 697 698 args->rc = fserrno; 699 sem_post(args->sem); 700 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 701 } 702 703 static void 704 __fs_create_file(void *arg) 705 { 706 struct spdk_fs_request *req = arg; 707 struct spdk_fs_cb_args *args = &req->args; 708 709 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 710 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 711 } 712 713 int 714 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 715 { 716 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 717 struct spdk_fs_request *req; 718 struct spdk_fs_cb_args *args; 719 int rc; 720 721 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 722 723 req = alloc_fs_request(channel); 724 assert(req != NULL); 725 726 args = &req->args; 727 args->fs = fs; 728 args->op.create.name = name; 729 args->sem = &channel->sem; 730 fs->send_request(__fs_create_file, req); 731 sem_wait(&channel->sem); 732 rc = args->rc; 733 free_fs_request(req); 734 735 return rc; 736 } 737 738 static void 739 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 740 { 741 struct spdk_fs_request *req = ctx; 742 struct spdk_fs_cb_args *args = &req->args; 743 struct spdk_file *f = args->file; 744 745 f->blob = blob; 746 while (!TAILQ_EMPTY(&f->open_requests)) { 747 req = TAILQ_FIRST(&f->open_requests); 748 args = &req->args; 749 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 750 args->fn.file_op_with_handle(args->arg, f, bserrno); 751 free_fs_request(req); 752 } 753 } 754 755 static void 756 fs_open_blob_create_cb(void *ctx, int bserrno) 757 { 758 struct spdk_fs_request *req = ctx; 759 struct spdk_fs_cb_args *args = &req->args; 760 struct spdk_file *file = args->file; 761 struct spdk_filesystem *fs = args->fs; 762 763 if (file == NULL) { 764 /* 765 * This is from an open with CREATE flag - the file 766 * is now created so look it up in the file list for this 767 * filesystem. 768 */ 769 file = fs_find_file(fs, args->op.open.name); 770 assert(file != NULL); 771 args->file = file; 772 } 773 774 file->ref_count++; 775 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 776 if (file->ref_count == 1) { 777 assert(file->blob == NULL); 778 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 779 } else if (file->blob != NULL) { 780 fs_open_blob_done(req, file->blob, 0); 781 } else { 782 /* 783 * The blob open for this file is in progress due to a previous 784 * open request. When that open completes, it will invoke the 785 * open callback for this request. 786 */ 787 } 788 } 789 790 void 791 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 792 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 793 { 794 struct spdk_file *f = NULL; 795 struct spdk_fs_request *req; 796 struct spdk_fs_cb_args *args; 797 798 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 799 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 800 return; 801 } 802 803 f = fs_find_file(fs, name); 804 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 805 cb_fn(cb_arg, NULL, -ENOENT); 806 return; 807 } 808 809 req = alloc_fs_request(fs->md_fs_channel); 810 if (req == NULL) { 811 cb_fn(cb_arg, NULL, -ENOMEM); 812 return; 813 } 814 815 args = &req->args; 816 args->fn.file_op_with_handle = cb_fn; 817 args->arg = cb_arg; 818 args->file = f; 819 args->fs = fs; 820 args->op.open.name = name; 821 822 if (f == NULL) { 823 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 824 } else { 825 fs_open_blob_create_cb(req, 0); 826 } 827 } 828 829 static void 830 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 831 { 832 struct spdk_fs_request *req = arg; 833 struct spdk_fs_cb_args *args = &req->args; 834 835 args->file = file; 836 args->rc = bserrno; 837 sem_post(args->sem); 838 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 839 } 840 841 static void 842 __fs_open_file(void *arg) 843 { 844 struct spdk_fs_request *req = arg; 845 struct spdk_fs_cb_args *args = &req->args; 846 847 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 848 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 849 __fs_open_file_done, req); 850 } 851 852 int 853 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 854 const char *name, uint32_t flags, struct spdk_file **file) 855 { 856 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 857 struct spdk_fs_request *req; 858 struct spdk_fs_cb_args *args; 859 int rc; 860 861 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 862 863 req = alloc_fs_request(channel); 864 assert(req != NULL); 865 866 args = &req->args; 867 args->fs = fs; 868 args->op.open.name = name; 869 args->op.open.flags = flags; 870 args->sem = &channel->sem; 871 fs->send_request(__fs_open_file, req); 872 sem_wait(&channel->sem); 873 rc = args->rc; 874 if (rc == 0) { 875 *file = args->file; 876 } else { 877 *file = NULL; 878 } 879 free_fs_request(req); 880 881 return rc; 882 } 883 884 static void 885 fs_rename_blob_close_cb(void *ctx, int bserrno) 886 { 887 struct spdk_fs_request *req = ctx; 888 struct spdk_fs_cb_args *args = &req->args; 889 890 args->fn.fs_op(args->arg, bserrno); 891 free_fs_request(req); 892 } 893 894 static void 895 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 896 { 897 struct spdk_fs_request *req = ctx; 898 struct spdk_fs_cb_args *args = &req->args; 899 struct spdk_file *f = args->file; 900 const char *new_name = args->op.rename.new_name; 901 902 f->blob = blob; 903 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 904 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 905 } 906 907 static void 908 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 909 { 910 struct spdk_fs_cb_args *args = &req->args; 911 struct spdk_file *f; 912 913 f = fs_find_file(args->fs, args->op.rename.old_name); 914 if (f == NULL) { 915 args->fn.fs_op(args->arg, -ENOENT); 916 free_fs_request(req); 917 return; 918 } 919 920 free(f->name); 921 f->name = strdup(args->op.rename.new_name); 922 args->file = f; 923 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 924 } 925 926 static void 927 fs_rename_delete_done(void *arg, int fserrno) 928 { 929 __spdk_fs_md_rename_file(arg); 930 } 931 932 void 933 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 934 const char *old_name, const char *new_name, 935 spdk_file_op_complete cb_fn, void *cb_arg) 936 { 937 struct spdk_file *f; 938 struct spdk_fs_request *req; 939 struct spdk_fs_cb_args *args; 940 941 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 942 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 943 cb_fn(cb_arg, -ENAMETOOLONG); 944 return; 945 } 946 947 req = alloc_fs_request(fs->md_fs_channel); 948 if (req == NULL) { 949 cb_fn(cb_arg, -ENOMEM); 950 return; 951 } 952 953 args = &req->args; 954 args->fn.fs_op = cb_fn; 955 args->fs = fs; 956 args->arg = cb_arg; 957 args->op.rename.old_name = old_name; 958 args->op.rename.new_name = new_name; 959 960 f = fs_find_file(fs, new_name); 961 if (f == NULL) { 962 __spdk_fs_md_rename_file(req); 963 return; 964 } 965 966 /* 967 * The rename overwrites an existing file. So delete the existing file, then 968 * do the actual rename. 969 */ 970 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 971 } 972 973 static void 974 __fs_rename_file_done(void *arg, int fserrno) 975 { 976 struct spdk_fs_request *req = arg; 977 struct spdk_fs_cb_args *args = &req->args; 978 979 args->rc = fserrno; 980 sem_post(args->sem); 981 } 982 983 static void 984 __fs_rename_file(void *arg) 985 { 986 struct spdk_fs_request *req = arg; 987 struct spdk_fs_cb_args *args = &req->args; 988 989 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 990 __fs_rename_file_done, req); 991 } 992 993 int 994 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 995 const char *old_name, const char *new_name) 996 { 997 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 998 struct spdk_fs_request *req; 999 struct spdk_fs_cb_args *args; 1000 int rc; 1001 1002 req = alloc_fs_request(channel); 1003 assert(req != NULL); 1004 1005 args = &req->args; 1006 1007 args->fs = fs; 1008 args->op.rename.old_name = old_name; 1009 args->op.rename.new_name = new_name; 1010 args->sem = &channel->sem; 1011 fs->send_request(__fs_rename_file, req); 1012 sem_wait(&channel->sem); 1013 rc = args->rc; 1014 free_fs_request(req); 1015 return rc; 1016 } 1017 1018 static void 1019 blob_delete_cb(void *ctx, int bserrno) 1020 { 1021 struct spdk_fs_request *req = ctx; 1022 struct spdk_fs_cb_args *args = &req->args; 1023 1024 args->fn.file_op(args->arg, bserrno); 1025 free_fs_request(req); 1026 } 1027 1028 void 1029 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1030 spdk_file_op_complete cb_fn, void *cb_arg) 1031 { 1032 struct spdk_file *f; 1033 spdk_blob_id blobid; 1034 struct spdk_fs_request *req; 1035 struct spdk_fs_cb_args *args; 1036 1037 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1038 1039 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1040 cb_fn(cb_arg, -ENAMETOOLONG); 1041 return; 1042 } 1043 1044 f = fs_find_file(fs, name); 1045 if (f == NULL) { 1046 cb_fn(cb_arg, -ENOENT); 1047 return; 1048 } 1049 1050 if (f->ref_count > 0) { 1051 /* For now, do not allow deleting files with open references. */ 1052 cb_fn(cb_arg, -EBUSY); 1053 return; 1054 } 1055 1056 req = alloc_fs_request(fs->md_fs_channel); 1057 if (req == NULL) { 1058 cb_fn(cb_arg, -ENOMEM); 1059 return; 1060 } 1061 1062 TAILQ_REMOVE(&fs->files, f, tailq); 1063 1064 cache_free_buffers(f); 1065 1066 blobid = f->blobid; 1067 1068 free(f->name); 1069 free(f->tree); 1070 free(f); 1071 1072 args = &req->args; 1073 args->fn.file_op = cb_fn; 1074 args->arg = cb_arg; 1075 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1076 } 1077 1078 static void 1079 __fs_delete_file_done(void *arg, int fserrno) 1080 { 1081 struct spdk_fs_request *req = arg; 1082 struct spdk_fs_cb_args *args = &req->args; 1083 1084 args->rc = fserrno; 1085 sem_post(args->sem); 1086 } 1087 1088 static void 1089 __fs_delete_file(void *arg) 1090 { 1091 struct spdk_fs_request *req = arg; 1092 struct spdk_fs_cb_args *args = &req->args; 1093 1094 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1095 } 1096 1097 int 1098 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1099 const char *name) 1100 { 1101 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1102 struct spdk_fs_request *req; 1103 struct spdk_fs_cb_args *args; 1104 int rc; 1105 1106 req = alloc_fs_request(channel); 1107 assert(req != NULL); 1108 1109 args = &req->args; 1110 args->fs = fs; 1111 args->op.delete.name = name; 1112 args->sem = &channel->sem; 1113 fs->send_request(__fs_delete_file, req); 1114 sem_wait(&channel->sem); 1115 rc = args->rc; 1116 free_fs_request(req); 1117 1118 return rc; 1119 } 1120 1121 spdk_fs_iter 1122 spdk_fs_iter_first(struct spdk_filesystem *fs) 1123 { 1124 struct spdk_file *f; 1125 1126 f = TAILQ_FIRST(&fs->files); 1127 return f; 1128 } 1129 1130 spdk_fs_iter 1131 spdk_fs_iter_next(spdk_fs_iter iter) 1132 { 1133 struct spdk_file *f = iter; 1134 1135 if (f == NULL) { 1136 return NULL; 1137 } 1138 1139 f = TAILQ_NEXT(f, tailq); 1140 return f; 1141 } 1142 1143 const char * 1144 spdk_file_get_name(struct spdk_file *file) 1145 { 1146 return file->name; 1147 } 1148 1149 uint64_t 1150 spdk_file_get_length(struct spdk_file *file) 1151 { 1152 assert(file != NULL); 1153 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1154 return file->length; 1155 } 1156 1157 static void 1158 fs_truncate_complete_cb(void *ctx, int bserrno) 1159 { 1160 struct spdk_fs_request *req = ctx; 1161 struct spdk_fs_cb_args *args = &req->args; 1162 1163 args->fn.file_op(args->arg, bserrno); 1164 free_fs_request(req); 1165 } 1166 1167 static uint64_t 1168 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1169 { 1170 return (length + cluster_sz - 1) / cluster_sz; 1171 } 1172 1173 void 1174 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1175 spdk_file_op_complete cb_fn, void *cb_arg) 1176 { 1177 struct spdk_filesystem *fs; 1178 size_t num_clusters; 1179 struct spdk_fs_request *req; 1180 struct spdk_fs_cb_args *args; 1181 1182 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1183 if (length == file->length) { 1184 cb_fn(cb_arg, 0); 1185 return; 1186 } 1187 1188 req = alloc_fs_request(file->fs->md_fs_channel); 1189 if (req == NULL) { 1190 cb_fn(cb_arg, -ENOMEM); 1191 return; 1192 } 1193 1194 args = &req->args; 1195 args->fn.file_op = cb_fn; 1196 args->arg = cb_arg; 1197 args->file = file; 1198 fs = file->fs; 1199 1200 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1201 1202 spdk_bs_md_resize_blob(file->blob, num_clusters); 1203 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1204 1205 file->length = length; 1206 if (file->append_pos > file->length) { 1207 file->append_pos = file->length; 1208 } 1209 1210 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1211 } 1212 1213 static void 1214 __truncate(void *arg) 1215 { 1216 struct spdk_fs_request *req = arg; 1217 struct spdk_fs_cb_args *args = &req->args; 1218 1219 spdk_file_truncate_async(args->file, args->op.truncate.length, 1220 args->fn.file_op, args->arg); 1221 } 1222 1223 void 1224 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1225 uint64_t length) 1226 { 1227 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1228 struct spdk_fs_request *req; 1229 struct spdk_fs_cb_args *args; 1230 1231 req = alloc_fs_request(channel); 1232 assert(req != NULL); 1233 1234 args = &req->args; 1235 1236 args->file = file; 1237 args->op.truncate.length = length; 1238 args->fn.file_op = __sem_post; 1239 args->arg = &channel->sem; 1240 1241 channel->send_request(__truncate, req); 1242 sem_wait(&channel->sem); 1243 free_fs_request(req); 1244 } 1245 1246 static void 1247 __rw_done(void *ctx, int bserrno) 1248 { 1249 struct spdk_fs_request *req = ctx; 1250 struct spdk_fs_cb_args *args = &req->args; 1251 1252 spdk_free(args->op.rw.pin_buf); 1253 args->fn.file_op(args->arg, bserrno); 1254 free_fs_request(req); 1255 } 1256 1257 static void 1258 __read_done(void *ctx, int bserrno) 1259 { 1260 struct spdk_fs_request *req = ctx; 1261 struct spdk_fs_cb_args *args = &req->args; 1262 1263 if (args->op.rw.is_read) { 1264 memcpy(args->op.rw.user_buf, 1265 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1266 args->op.rw.length); 1267 __rw_done(req, 0); 1268 } else { 1269 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1270 args->op.rw.user_buf, 1271 args->op.rw.length); 1272 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1273 args->op.rw.pin_buf, 1274 args->op.rw.start_page, args->op.rw.num_pages, 1275 __rw_done, req); 1276 } 1277 } 1278 1279 static void 1280 __do_blob_read(void *ctx, int fserrno) 1281 { 1282 struct spdk_fs_request *req = ctx; 1283 struct spdk_fs_cb_args *args = &req->args; 1284 1285 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1286 args->op.rw.pin_buf, 1287 args->op.rw.start_page, args->op.rw.num_pages, 1288 __read_done, req); 1289 } 1290 1291 static void 1292 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1293 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1294 { 1295 uint64_t end_page; 1296 1297 *page_size = spdk_bs_get_page_size(file->fs->bs); 1298 *start_page = offset / *page_size; 1299 end_page = (offset + length - 1) / *page_size; 1300 *num_pages = (end_page - *start_page + 1); 1301 } 1302 1303 static void 1304 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1305 void *payload, uint64_t offset, uint64_t length, 1306 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1307 { 1308 struct spdk_fs_request *req; 1309 struct spdk_fs_cb_args *args; 1310 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1311 uint64_t start_page, num_pages, pin_buf_length; 1312 uint32_t page_size; 1313 1314 if (is_read && offset + length > file->length) { 1315 cb_fn(cb_arg, -EINVAL); 1316 return; 1317 } 1318 1319 req = alloc_fs_request(channel); 1320 if (req == NULL) { 1321 cb_fn(cb_arg, -ENOMEM); 1322 return; 1323 } 1324 1325 args = &req->args; 1326 args->fn.file_op = cb_fn; 1327 args->arg = cb_arg; 1328 args->file = file; 1329 args->op.rw.channel = channel->bs_channel; 1330 args->op.rw.user_buf = payload; 1331 args->op.rw.is_read = is_read; 1332 args->op.rw.offset = offset; 1333 args->op.rw.length = length; 1334 1335 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1336 pin_buf_length = num_pages * page_size; 1337 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, 4096, NULL); 1338 1339 args->op.rw.start_page = start_page; 1340 args->op.rw.num_pages = num_pages; 1341 1342 if (!is_read && file->length < offset + length) { 1343 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1344 } else { 1345 __do_blob_read(req, 0); 1346 } 1347 } 1348 1349 void 1350 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1351 void *payload, uint64_t offset, uint64_t length, 1352 spdk_file_op_complete cb_fn, void *cb_arg) 1353 { 1354 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1355 } 1356 1357 void 1358 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1359 void *payload, uint64_t offset, uint64_t length, 1360 spdk_file_op_complete cb_fn, void *cb_arg) 1361 { 1362 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1363 file->name, offset, length); 1364 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1365 } 1366 1367 struct spdk_io_channel * 1368 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs, uint32_t priority) 1369 { 1370 struct spdk_io_channel *io_channel; 1371 struct spdk_fs_channel *fs_channel; 1372 uint32_t max_ops = 512; 1373 1374 io_channel = spdk_get_io_channel(fs, priority, true, (void *)&max_ops); 1375 fs_channel = spdk_io_channel_get_ctx(io_channel); 1376 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT, 512); 1377 fs_channel->send_request = __send_request_direct; 1378 1379 return io_channel; 1380 } 1381 1382 struct spdk_io_channel * 1383 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs, uint32_t priority) 1384 { 1385 struct spdk_io_channel *io_channel; 1386 struct spdk_fs_channel *fs_channel; 1387 uint32_t max_ops = 16; 1388 1389 io_channel = spdk_get_io_channel(fs, priority, true, (void *)&max_ops); 1390 fs_channel = spdk_io_channel_get_ctx(io_channel); 1391 fs_channel->send_request = fs->send_request; 1392 1393 return io_channel; 1394 } 1395 1396 void 1397 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1398 { 1399 spdk_put_io_channel(channel); 1400 } 1401 1402 void 1403 spdk_fs_set_cache_size(uint64_t size_in_mb) 1404 { 1405 g_fs_cache_size = size_in_mb * 1024 * 1024; 1406 } 1407 1408 uint64_t 1409 spdk_fs_get_cache_size(void) 1410 { 1411 return g_fs_cache_size / (1024 * 1024); 1412 } 1413 1414 static void __file_flush(void *_args); 1415 1416 static void * 1417 alloc_cache_memory_buffer(struct spdk_file *context) 1418 { 1419 struct spdk_file *file; 1420 void *buf; 1421 1422 buf = spdk_mempool_get(g_cache_pool); 1423 if (buf != NULL) { 1424 return buf; 1425 } 1426 1427 pthread_spin_lock(&g_caches_lock); 1428 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1429 if (!file->open_for_writing && 1430 file->priority == SPDK_FILE_PRIORITY_LOW && 1431 file != context) { 1432 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1433 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1434 break; 1435 } 1436 } 1437 pthread_spin_unlock(&g_caches_lock); 1438 if (file != NULL) { 1439 cache_free_buffers(file); 1440 buf = spdk_mempool_get(g_cache_pool); 1441 if (buf != NULL) { 1442 return buf; 1443 } 1444 } 1445 1446 pthread_spin_lock(&g_caches_lock); 1447 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1448 if (!file->open_for_writing && file != context) { 1449 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1450 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1451 break; 1452 } 1453 } 1454 pthread_spin_unlock(&g_caches_lock); 1455 if (file != NULL) { 1456 cache_free_buffers(file); 1457 buf = spdk_mempool_get(g_cache_pool); 1458 if (buf != NULL) { 1459 return buf; 1460 } 1461 } 1462 1463 pthread_spin_lock(&g_caches_lock); 1464 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1465 if (file != context) { 1466 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1467 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1468 break; 1469 } 1470 } 1471 pthread_spin_unlock(&g_caches_lock); 1472 if (file != NULL) { 1473 cache_free_buffers(file); 1474 buf = spdk_mempool_get(g_cache_pool); 1475 if (buf != NULL) { 1476 return buf; 1477 } 1478 } 1479 1480 assert(false); 1481 return NULL; 1482 } 1483 1484 static struct cache_buffer * 1485 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1486 { 1487 struct cache_buffer *buf; 1488 int count = 0; 1489 1490 buf = calloc(1, sizeof(*buf)); 1491 if (buf == NULL) { 1492 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1493 return NULL; 1494 } 1495 1496 buf->buf = alloc_cache_memory_buffer(file); 1497 if (buf->buf == NULL) { 1498 while (buf->buf == NULL) { 1499 /* 1500 * TODO: alloc_cache_memory_buffer() should eventually free 1501 * some buffers. Need a more sophisticated check here, instead 1502 * of just bailing if 100 tries does not result in getting a 1503 * free buffer. This will involve using the sync channel's 1504 * semaphore to block until a buffer becomes available. 1505 */ 1506 if (count++ == 100) { 1507 SPDK_ERRLOG("could not allocate cache buffer\n"); 1508 assert(false); 1509 free(buf); 1510 return NULL; 1511 } 1512 buf->buf = alloc_cache_memory_buffer(file); 1513 } 1514 } 1515 1516 buf->buf_size = CACHE_BUFFER_SIZE; 1517 buf->offset = offset; 1518 1519 pthread_spin_lock(&g_caches_lock); 1520 if (file->tree->present_mask == 0) { 1521 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1522 } 1523 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1524 pthread_spin_unlock(&g_caches_lock); 1525 1526 return buf; 1527 } 1528 1529 static struct cache_buffer * 1530 cache_append_buffer(struct spdk_file *file) 1531 { 1532 struct cache_buffer *last; 1533 1534 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1535 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1536 1537 last = cache_insert_buffer(file, file->append_pos); 1538 if (last == NULL) { 1539 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1540 return NULL; 1541 } 1542 1543 if (file->last != NULL) { 1544 file->last->next = last; 1545 } 1546 file->last = last; 1547 1548 return last; 1549 } 1550 1551 static void 1552 __wake_caller(struct spdk_fs_cb_args *args) 1553 { 1554 sem_post(args->sem); 1555 } 1556 1557 static void 1558 __file_cache_finish_sync(struct spdk_file *file) 1559 { 1560 struct spdk_fs_request *sync_req; 1561 struct spdk_fs_cb_args *sync_args; 1562 1563 pthread_spin_lock(&file->lock); 1564 while (!TAILQ_EMPTY(&file->sync_requests)) { 1565 sync_req = TAILQ_FIRST(&file->sync_requests); 1566 sync_args = &sync_req->args; 1567 if (sync_args->op.sync.offset > file->length_flushed) { 1568 break; 1569 } 1570 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1571 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1572 pthread_spin_unlock(&file->lock); 1573 sync_args->fn.file_op(sync_args->arg, 0); 1574 pthread_spin_lock(&file->lock); 1575 free_fs_request(sync_req); 1576 } 1577 pthread_spin_unlock(&file->lock); 1578 } 1579 1580 static void 1581 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1582 { 1583 struct spdk_file *file = ctx; 1584 1585 __file_cache_finish_sync(file); 1586 } 1587 1588 static void 1589 __free_args(struct spdk_fs_cb_args *args) 1590 { 1591 struct spdk_fs_request *req; 1592 1593 if (!args->from_request) { 1594 free(args); 1595 } else { 1596 /* Depends on args being at the start of the spdk_fs_request structure. */ 1597 req = (struct spdk_fs_request *)args; 1598 free_fs_request(req); 1599 } 1600 } 1601 1602 static void 1603 __file_flush_done(void *arg, int bserrno) 1604 { 1605 struct spdk_fs_cb_args *args = arg; 1606 struct spdk_fs_request *sync_req; 1607 struct spdk_file *file = args->file; 1608 struct cache_buffer *next = args->op.flush.cache_buffer; 1609 1610 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1611 1612 pthread_spin_lock(&file->lock); 1613 next->in_progress = false; 1614 next->bytes_flushed += args->op.flush.length; 1615 file->length_flushed += args->op.flush.length; 1616 if (file->length_flushed > file->length) { 1617 file->length = file->length_flushed; 1618 } 1619 if (next->bytes_flushed == next->buf_size) { 1620 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1621 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1622 } 1623 1624 TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) { 1625 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1626 break; 1627 } 1628 } 1629 1630 if (sync_req != NULL) { 1631 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1632 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1633 sizeof(file->length_flushed)); 1634 1635 pthread_spin_unlock(&file->lock); 1636 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1637 } else { 1638 pthread_spin_unlock(&file->lock); 1639 __file_cache_finish_sync(file); 1640 } 1641 1642 /* 1643 * Assert that there is no cached data that extends past the end of the underlying 1644 * blob. 1645 */ 1646 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1647 next->bytes_filled == 0); 1648 1649 __file_flush(args); 1650 } 1651 1652 static void 1653 __file_flush(void *_args) 1654 { 1655 struct spdk_fs_cb_args *args = _args; 1656 struct spdk_file *file = args->file; 1657 struct cache_buffer *next; 1658 uint64_t offset, length, start_page, num_pages; 1659 uint32_t page_size; 1660 1661 pthread_spin_lock(&file->lock); 1662 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1663 if (next == NULL || next->in_progress) { 1664 /* 1665 * There is either no data to flush, or a flush I/O is already in 1666 * progress. So return immediately - if a flush I/O is in 1667 * progress we will flush more data after that is completed. 1668 */ 1669 __free_args(args); 1670 pthread_spin_unlock(&file->lock); 1671 return; 1672 } 1673 1674 offset = next->offset + next->bytes_flushed; 1675 length = next->bytes_filled - next->bytes_flushed; 1676 if (length == 0) { 1677 __free_args(args); 1678 pthread_spin_unlock(&file->lock); 1679 return; 1680 } 1681 args->op.flush.length = length; 1682 args->op.flush.cache_buffer = next; 1683 1684 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1685 1686 next->in_progress = true; 1687 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1688 offset, length, start_page, num_pages); 1689 pthread_spin_unlock(&file->lock); 1690 spdk_bs_io_write_blob(file->blob, file->fs->sync_fs_channel->bs_channel, 1691 next->buf + (start_page * page_size) - next->offset, 1692 start_page, num_pages, 1693 __file_flush_done, args); 1694 } 1695 1696 static void 1697 __file_extend_done(void *arg, int bserrno) 1698 { 1699 struct spdk_fs_cb_args *args = arg; 1700 1701 __wake_caller(args); 1702 } 1703 1704 static void 1705 __file_extend_blob(void *_args) 1706 { 1707 struct spdk_fs_cb_args *args = _args; 1708 struct spdk_file *file = args->file; 1709 1710 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1711 1712 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1713 } 1714 1715 static void 1716 __rw_from_file_done(void *arg, int bserrno) 1717 { 1718 struct spdk_fs_cb_args *args = arg; 1719 1720 __wake_caller(args); 1721 __free_args(args); 1722 } 1723 1724 static void 1725 __rw_from_file(void *_args) 1726 { 1727 struct spdk_fs_cb_args *args = _args; 1728 struct spdk_file *file = args->file; 1729 1730 if (args->op.rw.is_read) { 1731 spdk_file_read_async(file, file->fs->sync_io_channel, args->op.rw.user_buf, 1732 args->op.rw.offset, args->op.rw.length, 1733 __rw_from_file_done, args); 1734 } else { 1735 spdk_file_write_async(file, file->fs->sync_io_channel, args->op.rw.user_buf, 1736 args->op.rw.offset, args->op.rw.length, 1737 __rw_from_file_done, args); 1738 } 1739 } 1740 1741 static int 1742 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1743 uint64_t offset, uint64_t length, bool is_read) 1744 { 1745 struct spdk_fs_cb_args *args; 1746 1747 args = calloc(1, sizeof(*args)); 1748 if (args == NULL) { 1749 sem_post(sem); 1750 return -ENOMEM; 1751 } 1752 1753 args->file = file; 1754 args->sem = sem; 1755 args->op.rw.user_buf = payload; 1756 args->op.rw.offset = offset; 1757 args->op.rw.length = length; 1758 args->op.rw.is_read = is_read; 1759 file->fs->send_request(__rw_from_file, args); 1760 return 0; 1761 } 1762 1763 int 1764 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1765 void *payload, uint64_t offset, uint64_t length) 1766 { 1767 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1768 struct spdk_fs_cb_args *args; 1769 uint64_t rem_length, copy, blob_size, cluster_sz; 1770 uint32_t cache_buffers_filled = 0; 1771 uint8_t *cur_payload; 1772 struct cache_buffer *last; 1773 1774 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1775 1776 if (length == 0) { 1777 return 0; 1778 } 1779 1780 if (offset != file->append_pos) { 1781 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1782 return -EINVAL; 1783 } 1784 1785 pthread_spin_lock(&file->lock); 1786 file->open_for_writing = true; 1787 1788 if (file->last == NULL) { 1789 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1790 cache_append_buffer(file); 1791 } else { 1792 int rc; 1793 1794 file->append_pos += length; 1795 rc = __send_rw_from_file(file, &channel->sem, payload, 1796 offset, length, false); 1797 pthread_spin_unlock(&file->lock); 1798 sem_wait(&channel->sem); 1799 return rc; 1800 } 1801 } 1802 1803 blob_size = __file_get_blob_size(file); 1804 1805 if ((offset + length) > blob_size) { 1806 struct spdk_fs_cb_args extend_args = {}; 1807 1808 cluster_sz = file->fs->bs_opts.cluster_sz; 1809 extend_args.sem = &channel->sem; 1810 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1811 extend_args.file = file; 1812 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1813 pthread_spin_unlock(&file->lock); 1814 file->fs->send_request(__file_extend_blob, &extend_args); 1815 sem_wait(&channel->sem); 1816 } 1817 1818 last = file->last; 1819 rem_length = length; 1820 cur_payload = payload; 1821 while (rem_length > 0) { 1822 copy = last->buf_size - last->bytes_filled; 1823 if (copy > rem_length) { 1824 copy = rem_length; 1825 } 1826 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 1827 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 1828 file->append_pos += copy; 1829 if (file->length < file->append_pos) { 1830 file->length = file->append_pos; 1831 } 1832 cur_payload += copy; 1833 last->bytes_filled += copy; 1834 rem_length -= copy; 1835 if (last->bytes_filled == last->buf_size) { 1836 cache_buffers_filled++; 1837 last = cache_append_buffer(file); 1838 if (last == NULL) { 1839 BLOBFS_TRACE(file, "nomem\n"); 1840 pthread_spin_unlock(&file->lock); 1841 return -ENOMEM; 1842 } 1843 } 1844 } 1845 1846 if (cache_buffers_filled == 0) { 1847 pthread_spin_unlock(&file->lock); 1848 return 0; 1849 } 1850 1851 args = calloc(1, sizeof(*args)); 1852 if (args == NULL) { 1853 pthread_spin_unlock(&file->lock); 1854 return -ENOMEM; 1855 } 1856 1857 args->file = file; 1858 file->fs->send_request(__file_flush, args); 1859 pthread_spin_unlock(&file->lock); 1860 return 0; 1861 } 1862 1863 static void 1864 __readahead_done(void *arg, int bserrno) 1865 { 1866 struct spdk_fs_cb_args *args = arg; 1867 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 1868 struct spdk_file *file = args->file; 1869 1870 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 1871 1872 pthread_spin_lock(&file->lock); 1873 cache_buffer->bytes_filled = args->op.readahead.length; 1874 cache_buffer->bytes_flushed = args->op.readahead.length; 1875 cache_buffer->in_progress = false; 1876 pthread_spin_unlock(&file->lock); 1877 1878 __free_args(args); 1879 } 1880 1881 static void 1882 __readahead(void *_args) 1883 { 1884 struct spdk_fs_cb_args *args = _args; 1885 struct spdk_file *file = args->file; 1886 uint64_t offset, length, start_page, num_pages; 1887 uint32_t page_size; 1888 1889 offset = args->op.readahead.offset; 1890 length = args->op.readahead.length; 1891 assert(length > 0); 1892 1893 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1894 1895 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1896 offset, length, start_page, num_pages); 1897 spdk_bs_io_read_blob(file->blob, file->fs->sync_fs_channel->bs_channel, 1898 args->op.readahead.cache_buffer->buf, 1899 start_page, num_pages, 1900 __readahead_done, args); 1901 } 1902 1903 static uint64_t 1904 __next_cache_buffer_offset(uint64_t offset) 1905 { 1906 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 1907 } 1908 1909 static void 1910 check_readahead(struct spdk_file *file, uint64_t offset) 1911 { 1912 struct spdk_fs_cb_args *args; 1913 1914 offset = __next_cache_buffer_offset(offset); 1915 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 1916 return; 1917 } 1918 1919 args = calloc(1, sizeof(*args)); 1920 if (args == NULL) { 1921 return; 1922 } 1923 1924 BLOBFS_TRACE(file, "offset=%jx\n", offset); 1925 1926 args->file = file; 1927 args->op.readahead.offset = offset; 1928 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 1929 args->op.readahead.cache_buffer->in_progress = true; 1930 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 1931 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 1932 } else { 1933 args->op.readahead.length = CACHE_BUFFER_SIZE; 1934 } 1935 file->fs->send_request(__readahead, args); 1936 } 1937 1938 static int 1939 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 1940 { 1941 struct cache_buffer *buf; 1942 1943 buf = spdk_tree_find_filled_buffer(file->tree, offset); 1944 if (buf == NULL) { 1945 return __send_rw_from_file(file, sem, payload, offset, length, true); 1946 } 1947 1948 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 1949 length = buf->offset + buf->bytes_filled - offset; 1950 } 1951 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 1952 memcpy(payload, &buf->buf[offset - buf->offset], length); 1953 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 1954 pthread_spin_lock(&g_caches_lock); 1955 spdk_tree_remove_buffer(file->tree, buf); 1956 if (file->tree->present_mask == 0) { 1957 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1958 } 1959 pthread_spin_unlock(&g_caches_lock); 1960 } 1961 1962 sem_post(sem); 1963 return 0; 1964 } 1965 1966 int64_t 1967 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 1968 void *payload, uint64_t offset, uint64_t length) 1969 { 1970 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1971 uint64_t final_offset, final_length; 1972 uint32_t sub_reads = 0; 1973 int rc = 0; 1974 1975 pthread_spin_lock(&file->lock); 1976 1977 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 1978 1979 file->open_for_writing = false; 1980 1981 if (length == 0 || offset >= file->length) { 1982 pthread_spin_unlock(&file->lock); 1983 return 0; 1984 } 1985 1986 if (offset + length > file->length) { 1987 length = file->length - offset; 1988 } 1989 1990 if (offset != file->next_seq_offset) { 1991 file->seq_byte_count = 0; 1992 } 1993 file->seq_byte_count += length; 1994 file->next_seq_offset = offset + length; 1995 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 1996 check_readahead(file, offset); 1997 check_readahead(file, offset + CACHE_BUFFER_SIZE); 1998 } 1999 2000 final_length = 0; 2001 final_offset = offset + length; 2002 while (offset < final_offset) { 2003 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2004 if (length > (final_offset - offset)) { 2005 length = final_offset - offset; 2006 } 2007 rc = __file_read(file, payload, offset, length, &channel->sem); 2008 if (rc == 0) { 2009 final_length += length; 2010 } else { 2011 break; 2012 } 2013 payload += length; 2014 offset += length; 2015 sub_reads++; 2016 } 2017 pthread_spin_unlock(&file->lock); 2018 while (sub_reads-- > 0) { 2019 sem_wait(&channel->sem); 2020 } 2021 if (rc == 0) { 2022 return final_length; 2023 } else { 2024 return rc; 2025 } 2026 } 2027 2028 static void 2029 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2030 spdk_file_op_complete cb_fn, void *cb_arg) 2031 { 2032 struct spdk_fs_request *sync_req; 2033 struct spdk_fs_request *flush_req; 2034 struct spdk_fs_cb_args *sync_args; 2035 struct spdk_fs_cb_args *flush_args; 2036 2037 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2038 2039 pthread_spin_lock(&file->lock); 2040 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2041 BLOBFS_TRACE(file, "done - no data to flush\n"); 2042 pthread_spin_unlock(&file->lock); 2043 cb_fn(cb_arg, 0); 2044 return; 2045 } 2046 2047 sync_req = alloc_fs_request(channel); 2048 assert(sync_req != NULL); 2049 sync_args = &sync_req->args; 2050 2051 flush_req = alloc_fs_request(channel); 2052 assert(flush_req != NULL); 2053 flush_args = &flush_req->args; 2054 2055 sync_args->file = file; 2056 sync_args->fn.file_op = cb_fn; 2057 sync_args->arg = cb_arg; 2058 sync_args->op.sync.offset = file->append_pos; 2059 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2060 pthread_spin_unlock(&file->lock); 2061 2062 flush_args->file = file; 2063 channel->send_request(__file_flush, flush_args); 2064 } 2065 2066 int 2067 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2068 { 2069 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2070 2071 _file_sync(file, channel, __sem_post, &channel->sem); 2072 sem_wait(&channel->sem); 2073 2074 return 0; 2075 } 2076 2077 void 2078 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2079 spdk_file_op_complete cb_fn, void *cb_arg) 2080 { 2081 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2082 2083 _file_sync(file, channel, cb_fn, cb_arg); 2084 } 2085 2086 void 2087 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2088 { 2089 BLOBFS_TRACE(file, "priority=%u\n", priority); 2090 file->priority = priority; 2091 2092 } 2093 2094 /* 2095 * Close routines 2096 */ 2097 2098 static void 2099 __file_close_async_done(void *ctx, int bserrno) 2100 { 2101 struct spdk_fs_request *req = ctx; 2102 struct spdk_fs_cb_args *args = &req->args; 2103 2104 args->fn.file_op(args->arg, bserrno); 2105 free_fs_request(req); 2106 } 2107 2108 static void 2109 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2110 { 2111 pthread_spin_lock(&file->lock); 2112 if (file->ref_count == 0) { 2113 pthread_spin_unlock(&file->lock); 2114 __file_close_async_done(req, -EBADF); 2115 return; 2116 } 2117 2118 file->ref_count--; 2119 if (file->ref_count > 0) { 2120 pthread_spin_unlock(&file->lock); 2121 __file_close_async_done(req, 0); 2122 return; 2123 } 2124 2125 pthread_spin_unlock(&file->lock); 2126 2127 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2128 } 2129 2130 static void 2131 __file_close_async__sync_done(void *arg, int fserrno) 2132 { 2133 struct spdk_fs_request *req = arg; 2134 struct spdk_fs_cb_args *args = &req->args; 2135 2136 __file_close_async(args->file, req); 2137 } 2138 2139 void 2140 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2141 { 2142 struct spdk_fs_request *req; 2143 struct spdk_fs_cb_args *args; 2144 2145 req = alloc_fs_request(file->fs->md_fs_channel); 2146 if (req == NULL) { 2147 cb_fn(cb_arg, -ENOMEM); 2148 return; 2149 } 2150 2151 args = &req->args; 2152 args->file = file; 2153 args->fn.file_op = cb_fn; 2154 args->arg = cb_arg; 2155 2156 spdk_file_sync_async(file, file->fs->md_io_channel, __file_close_async__sync_done, req); 2157 } 2158 2159 static void 2160 __file_close_done(void *arg, int fserrno) 2161 { 2162 struct spdk_fs_cb_args *args = arg; 2163 2164 args->rc = fserrno; 2165 sem_post(args->sem); 2166 } 2167 2168 static void 2169 __file_close(void *arg) 2170 { 2171 struct spdk_fs_request *req = arg; 2172 struct spdk_fs_cb_args *args = &req->args; 2173 struct spdk_file *file = args->file; 2174 2175 __file_close_async(file, req); 2176 } 2177 2178 int 2179 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2180 { 2181 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2182 struct spdk_fs_request *req; 2183 struct spdk_fs_cb_args *args; 2184 2185 req = alloc_fs_request(channel); 2186 assert(req != NULL); 2187 2188 args = &req->args; 2189 2190 spdk_file_sync(file, _channel); 2191 BLOBFS_TRACE(file, "name=%s\n", file->name); 2192 args->file = file; 2193 args->sem = &channel->sem; 2194 args->fn.file_op = __file_close_done; 2195 args->arg = req; 2196 channel->send_request(__file_close, req); 2197 sem_wait(&channel->sem); 2198 2199 return args->rc; 2200 } 2201 2202 static void 2203 cache_free_buffers(struct spdk_file *file) 2204 { 2205 BLOBFS_TRACE(file, "free=%s\n", file->name); 2206 pthread_spin_lock(&file->lock); 2207 pthread_spin_lock(&g_caches_lock); 2208 if (file->tree->present_mask == 0) { 2209 pthread_spin_unlock(&g_caches_lock); 2210 pthread_spin_unlock(&file->lock); 2211 return; 2212 } 2213 spdk_tree_free_buffers(file->tree); 2214 if (file->tree->present_mask == 0) { 2215 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2216 } 2217 file->last = NULL; 2218 pthread_spin_unlock(&g_caches_lock); 2219 pthread_spin_unlock(&file->lock); 2220 } 2221 2222 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2223 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2224