1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include <stdbool.h> 35 #include <assert.h> 36 #include <errno.h> 37 #include <stdlib.h> 38 #include <string.h> 39 #include <pthread.h> 40 41 #include "spdk/blobfs.h" 42 #include "blobfs_internal.h" 43 44 #include "spdk/queue.h" 45 #include "spdk/io_channel.h" 46 #include "spdk/assert.h" 47 #include "spdk/env.h" 48 #include "spdk_internal/log.h" 49 50 #define BLOBFS_TRACE(file, str, args...) \ 51 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_TRACE_RW(file, str, args...) \ 54 SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 55 56 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 57 58 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 59 static struct spdk_mempool *g_cache_pool; 60 static TAILQ_HEAD(, spdk_file) g_caches; 61 static pthread_spinlock_t g_caches_lock; 62 63 static void 64 __sem_post(void *arg, int bserrno) 65 { 66 sem_t *sem = arg; 67 68 sem_post(sem); 69 } 70 71 void 72 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 73 { 74 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 75 free(cache_buffer); 76 } 77 78 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 79 80 struct spdk_file { 81 struct spdk_filesystem *fs; 82 struct spdk_blob *blob; 83 char *name; 84 uint64_t length; 85 bool open_for_writing; 86 uint64_t length_flushed; 87 uint64_t append_pos; 88 uint64_t seq_byte_count; 89 uint64_t next_seq_offset; 90 uint32_t priority; 91 TAILQ_ENTRY(spdk_file) tailq; 92 spdk_blob_id blobid; 93 uint32_t ref_count; 94 pthread_spinlock_t lock; 95 struct cache_buffer *last; 96 struct cache_tree *tree; 97 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 98 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 99 TAILQ_ENTRY(spdk_file) cache_tailq; 100 }; 101 102 struct spdk_filesystem { 103 struct spdk_blob_store *bs; 104 TAILQ_HEAD(, spdk_file) files; 105 struct spdk_bs_opts bs_opts; 106 struct spdk_bs_dev *bdev; 107 fs_send_request_fn send_request; 108 struct spdk_io_channel *sync_io_channel; 109 struct spdk_fs_channel *sync_fs_channel; 110 struct spdk_io_channel *md_io_channel; 111 struct spdk_fs_channel *md_fs_channel; 112 }; 113 114 struct spdk_fs_cb_args { 115 union { 116 spdk_fs_op_with_handle_complete fs_op_with_handle; 117 spdk_fs_op_complete fs_op; 118 spdk_file_op_with_handle_complete file_op_with_handle; 119 spdk_file_op_complete file_op; 120 spdk_file_stat_op_complete stat_op; 121 } fn; 122 void *arg; 123 sem_t *sem; 124 struct spdk_filesystem *fs; 125 struct spdk_file *file; 126 int rc; 127 bool from_request; 128 union { 129 struct { 130 uint64_t length; 131 } truncate; 132 struct { 133 struct spdk_io_channel *channel; 134 void *user_buf; 135 void *pin_buf; 136 int is_read; 137 off_t offset; 138 size_t length; 139 uint64_t start_page; 140 uint64_t num_pages; 141 uint32_t blocklen; 142 } rw; 143 struct { 144 const char *old_name; 145 const char *new_name; 146 } rename; 147 struct { 148 struct cache_buffer *cache_buffer; 149 uint64_t length; 150 } flush; 151 struct { 152 struct cache_buffer *cache_buffer; 153 uint64_t length; 154 uint64_t offset; 155 } readahead; 156 struct { 157 uint64_t offset; 158 TAILQ_ENTRY(spdk_fs_request) tailq; 159 } sync; 160 struct { 161 uint32_t num_clusters; 162 } resize; 163 struct { 164 const char *name; 165 uint32_t flags; 166 TAILQ_ENTRY(spdk_fs_request) tailq; 167 } open; 168 struct { 169 const char *name; 170 } create; 171 struct { 172 const char *name; 173 } delete; 174 struct { 175 const char *name; 176 } stat; 177 } op; 178 }; 179 180 static void cache_free_buffers(struct spdk_file *file); 181 182 static void 183 __initialize_cache(void) 184 { 185 if (g_cache_pool != NULL) { 186 return; 187 } 188 189 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 190 g_fs_cache_size / CACHE_BUFFER_SIZE, 191 CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY); 192 TAILQ_INIT(&g_caches); 193 pthread_spin_init(&g_caches_lock, 0); 194 } 195 196 static uint64_t 197 __file_get_blob_size(struct spdk_file *file) 198 { 199 uint64_t cluster_sz; 200 201 cluster_sz = file->fs->bs_opts.cluster_sz; 202 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 203 } 204 205 struct spdk_fs_request { 206 struct spdk_fs_cb_args args; 207 TAILQ_ENTRY(spdk_fs_request) link; 208 struct spdk_fs_channel *channel; 209 }; 210 211 struct spdk_fs_channel { 212 struct spdk_fs_request *req_mem; 213 TAILQ_HEAD(, spdk_fs_request) reqs; 214 sem_t sem; 215 struct spdk_filesystem *fs; 216 struct spdk_io_channel *bs_channel; 217 fs_send_request_fn send_request; 218 }; 219 220 static struct spdk_fs_request * 221 alloc_fs_request(struct spdk_fs_channel *channel) 222 { 223 struct spdk_fs_request *req; 224 225 req = TAILQ_FIRST(&channel->reqs); 226 if (!req) { 227 return NULL; 228 } 229 TAILQ_REMOVE(&channel->reqs, req, link); 230 memset(req, 0, sizeof(*req)); 231 req->channel = channel; 232 req->args.from_request = true; 233 234 return req; 235 } 236 237 static void 238 free_fs_request(struct spdk_fs_request *req) 239 { 240 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 241 } 242 243 static int 244 _spdk_fs_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 245 { 246 struct spdk_filesystem *fs = io_device; 247 struct spdk_fs_channel *channel = ctx_buf; 248 uint32_t max_ops = *(uint32_t *)unique_ctx; 249 uint32_t i; 250 251 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 252 if (!channel->req_mem) { 253 free(channel); 254 return -1; 255 } 256 257 TAILQ_INIT(&channel->reqs); 258 sem_init(&channel->sem, 0, 0); 259 260 for (i = 0; i < max_ops; i++) { 261 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 262 } 263 264 channel->fs = fs; 265 266 return 0; 267 } 268 269 static void 270 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 271 { 272 struct spdk_fs_channel *channel = ctx_buf; 273 274 free(channel->req_mem); 275 if (channel->bs_channel != NULL) { 276 spdk_bs_free_io_channel(channel->bs_channel); 277 } 278 } 279 280 static void 281 __send_request_direct(fs_request_fn fn, void *arg) 282 { 283 fn(arg); 284 } 285 286 static void 287 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 288 { 289 fs->bs = bs; 290 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 291 fs->md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT, 512); 292 fs->md_fs_channel->send_request = __send_request_direct; 293 fs->sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT, 512); 294 fs->sync_fs_channel->send_request = __send_request_direct; 295 } 296 297 static void 298 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 299 { 300 struct spdk_fs_request *req = ctx; 301 struct spdk_fs_cb_args *args = &req->args; 302 struct spdk_filesystem *fs = args->fs; 303 304 if (bserrno == 0) { 305 common_fs_bs_init(fs, bs); 306 } else { 307 free(fs); 308 fs = NULL; 309 } 310 311 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 312 free_fs_request(req); 313 } 314 315 static struct spdk_filesystem * 316 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 317 { 318 struct spdk_filesystem *fs; 319 uint32_t max_ops = 512; 320 321 fs = calloc(1, sizeof(*fs)); 322 if (fs == NULL) { 323 return NULL; 324 } 325 326 fs->bdev = dev; 327 fs->send_request = send_request_fn; 328 TAILQ_INIT(&fs->files); 329 spdk_io_device_register(fs, _spdk_fs_channel_create, _spdk_fs_channel_destroy, 330 sizeof(struct spdk_fs_channel)); 331 332 fs->md_io_channel = spdk_get_io_channel(fs, SPDK_IO_PRIORITY_DEFAULT, true, (void *)&max_ops); 333 fs->md_fs_channel = spdk_io_channel_get_ctx(fs->md_io_channel); 334 335 fs->sync_io_channel = spdk_get_io_channel(fs, SPDK_IO_PRIORITY_DEFAULT, true, (void *)&max_ops); 336 fs->sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_io_channel); 337 338 __initialize_cache(); 339 340 return fs; 341 } 342 343 void 344 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 345 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 346 { 347 struct spdk_filesystem *fs; 348 struct spdk_fs_request *req; 349 struct spdk_fs_cb_args *args; 350 351 fs = fs_alloc(dev, send_request_fn); 352 if (fs == NULL) { 353 cb_fn(cb_arg, NULL, -ENOMEM); 354 return; 355 } 356 357 req = alloc_fs_request(fs->md_fs_channel); 358 if (req == NULL) { 359 cb_fn(cb_arg, NULL, -ENOMEM); 360 return; 361 } 362 363 args = &req->args; 364 args->fn.fs_op_with_handle = cb_fn; 365 args->arg = cb_arg; 366 args->fs = fs; 367 368 spdk_bs_init(dev, NULL, init_cb, req); 369 } 370 371 static struct spdk_file * 372 file_alloc(struct spdk_filesystem *fs) 373 { 374 struct spdk_file *file; 375 376 file = calloc(1, sizeof(*file)); 377 if (file == NULL) { 378 return NULL; 379 } 380 381 file->fs = fs; 382 TAILQ_INIT(&file->open_requests); 383 TAILQ_INIT(&file->sync_requests); 384 pthread_spin_init(&file->lock, 0); 385 file->tree = calloc(1, sizeof(*file->tree)); 386 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 387 file->priority = SPDK_FILE_PRIORITY_LOW; 388 return file; 389 } 390 391 static void 392 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 393 { 394 struct spdk_fs_request *req = ctx; 395 struct spdk_fs_cb_args *args = &req->args; 396 struct spdk_filesystem *fs = args->fs; 397 struct spdk_file *f; 398 uint64_t *length; 399 const char *name; 400 size_t value_len; 401 402 if (rc == -ENOENT) { 403 /* Finished iterating */ 404 args->fn.fs_op_with_handle(args->arg, fs, 0); 405 free_fs_request(req); 406 return; 407 } else if (rc < 0) { 408 args->fn.fs_op_with_handle(args->arg, fs, rc); 409 free_fs_request(req); 410 return; 411 } 412 413 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 414 if (rc < 0) { 415 args->fn.fs_op_with_handle(args->arg, fs, rc); 416 free_fs_request(req); 417 return; 418 } 419 420 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 421 if (rc < 0) { 422 args->fn.fs_op_with_handle(args->arg, fs, rc); 423 free_fs_request(req); 424 return; 425 } 426 assert(value_len == 8); 427 428 f = file_alloc(fs); 429 if (f == NULL) { 430 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 431 free_fs_request(req); 432 return; 433 } 434 435 f->name = strdup(name); 436 f->blobid = spdk_blob_get_id(blob); 437 f->length = *length; 438 f->length_flushed = *length; 439 f->append_pos = *length; 440 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 441 442 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 443 } 444 445 static void 446 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 447 { 448 struct spdk_fs_request *req = ctx; 449 struct spdk_fs_cb_args *args = &req->args; 450 struct spdk_filesystem *fs = args->fs; 451 452 if (bserrno != 0) { 453 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 454 free_fs_request(req); 455 free(fs); 456 return; 457 } 458 459 common_fs_bs_init(fs, bs); 460 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 461 } 462 463 void 464 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 465 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 466 { 467 struct spdk_filesystem *fs; 468 struct spdk_fs_cb_args *args; 469 struct spdk_fs_request *req; 470 471 fs = fs_alloc(dev, send_request_fn); 472 if (fs == NULL) { 473 cb_fn(cb_arg, NULL, -ENOMEM); 474 return; 475 } 476 477 req = alloc_fs_request(fs->md_fs_channel); 478 if (req == NULL) { 479 cb_fn(cb_arg, NULL, -ENOMEM); 480 return; 481 } 482 483 args = &req->args; 484 args->fn.fs_op_with_handle = cb_fn; 485 args->arg = cb_arg; 486 args->fs = fs; 487 488 spdk_bs_load(dev, load_cb, req); 489 } 490 491 static void 492 unload_cb(void *ctx, int bserrno) 493 { 494 struct spdk_fs_request *req = ctx; 495 struct spdk_fs_cb_args *args = &req->args; 496 struct spdk_filesystem *fs = args->fs; 497 498 args->fn.fs_op(args->arg, bserrno); 499 free(req); 500 spdk_io_device_unregister(fs); 501 free(fs); 502 } 503 504 void 505 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 506 { 507 struct spdk_fs_request *req; 508 struct spdk_fs_cb_args *args; 509 510 /* 511 * We must free the md_channel before unloading the blobstore, so just 512 * allocate this request from the general heap. 513 */ 514 req = calloc(1, sizeof(*req)); 515 if (req == NULL) { 516 cb_fn(cb_arg, -ENOMEM); 517 return; 518 } 519 520 args = &req->args; 521 args->fn.fs_op = cb_fn; 522 args->arg = cb_arg; 523 args->fs = fs; 524 525 spdk_fs_free_io_channel(fs->md_io_channel); 526 spdk_fs_free_io_channel(fs->sync_io_channel); 527 spdk_bs_unload(fs->bs, unload_cb, req); 528 } 529 530 static struct spdk_file * 531 fs_find_file(struct spdk_filesystem *fs, const char *name) 532 { 533 struct spdk_file *file; 534 535 TAILQ_FOREACH(file, &fs->files, tailq) { 536 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 537 return file; 538 } 539 } 540 541 return NULL; 542 } 543 544 void 545 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 546 spdk_file_stat_op_complete cb_fn, void *cb_arg) 547 { 548 struct spdk_file_stat stat; 549 struct spdk_file *f = NULL; 550 551 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 552 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 553 return; 554 } 555 556 f = fs_find_file(fs, name); 557 if (f != NULL) { 558 stat.blobid = f->blobid; 559 stat.size = f->length; 560 cb_fn(cb_arg, &stat, 0); 561 return; 562 } 563 564 cb_fn(cb_arg, NULL, -ENOENT); 565 } 566 567 static void 568 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 569 { 570 struct spdk_fs_request *req = arg; 571 struct spdk_fs_cb_args *args = &req->args; 572 573 args->rc = fserrno; 574 if (fserrno == 0) { 575 memcpy(args->arg, stat, sizeof(*stat)); 576 } 577 sem_post(args->sem); 578 } 579 580 static void 581 __file_stat(void *arg) 582 { 583 struct spdk_fs_request *req = arg; 584 struct spdk_fs_cb_args *args = &req->args; 585 586 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 587 args->fn.stat_op, req); 588 } 589 590 int 591 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 592 const char *name, struct spdk_file_stat *stat) 593 { 594 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 595 struct spdk_fs_request *req; 596 int rc; 597 598 req = alloc_fs_request(channel); 599 assert(req != NULL); 600 601 req->args.fs = fs; 602 req->args.op.stat.name = name; 603 req->args.fn.stat_op = __copy_stat; 604 req->args.arg = stat; 605 req->args.sem = &channel->sem; 606 channel->send_request(__file_stat, req); 607 sem_wait(&channel->sem); 608 609 rc = req->args.rc; 610 free_fs_request(req); 611 612 return rc; 613 } 614 615 static void 616 fs_create_blob_close_cb(void *ctx, int bserrno) 617 { 618 struct spdk_fs_request *req = ctx; 619 struct spdk_fs_cb_args *args = &req->args; 620 621 args->fn.file_op(args->arg, bserrno); 622 free_fs_request(req); 623 } 624 625 static void 626 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 627 { 628 struct spdk_fs_request *req = ctx; 629 struct spdk_fs_cb_args *args = &req->args; 630 struct spdk_file *f = args->file; 631 uint64_t length = 0; 632 633 f->blob = blob; 634 spdk_bs_md_resize_blob(blob, 1); 635 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 636 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 637 638 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 639 } 640 641 static void 642 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 643 { 644 struct spdk_fs_request *req = ctx; 645 struct spdk_fs_cb_args *args = &req->args; 646 struct spdk_file *f = args->file; 647 648 f->blobid = blobid; 649 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 650 } 651 652 void 653 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 654 spdk_file_op_complete cb_fn, void *cb_arg) 655 { 656 struct spdk_file *file; 657 struct spdk_fs_request *req; 658 struct spdk_fs_cb_args *args; 659 660 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 661 cb_fn(cb_arg, -ENAMETOOLONG); 662 return; 663 } 664 665 file = fs_find_file(fs, name); 666 if (file != NULL) { 667 cb_fn(cb_arg, -EEXIST); 668 return; 669 } 670 671 file = file_alloc(fs); 672 if (file == NULL) { 673 cb_fn(cb_arg, -ENOMEM); 674 return; 675 } 676 677 req = alloc_fs_request(fs->md_fs_channel); 678 if (req == NULL) { 679 cb_fn(cb_arg, -ENOMEM); 680 return; 681 } 682 683 args = &req->args; 684 args->file = file; 685 args->fn.file_op = cb_fn; 686 args->arg = cb_arg; 687 688 file->name = strdup(name); 689 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 690 } 691 692 static void 693 __fs_create_file_done(void *arg, int fserrno) 694 { 695 struct spdk_fs_request *req = arg; 696 struct spdk_fs_cb_args *args = &req->args; 697 698 args->rc = fserrno; 699 sem_post(args->sem); 700 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 701 } 702 703 static void 704 __fs_create_file(void *arg) 705 { 706 struct spdk_fs_request *req = arg; 707 struct spdk_fs_cb_args *args = &req->args; 708 709 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 710 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 711 } 712 713 int 714 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 715 { 716 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 717 struct spdk_fs_request *req; 718 struct spdk_fs_cb_args *args; 719 int rc; 720 721 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 722 723 req = alloc_fs_request(channel); 724 assert(req != NULL); 725 726 args = &req->args; 727 args->fs = fs; 728 args->op.create.name = name; 729 args->sem = &channel->sem; 730 fs->send_request(__fs_create_file, req); 731 sem_wait(&channel->sem); 732 rc = args->rc; 733 free_fs_request(req); 734 735 return rc; 736 } 737 738 static void 739 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 740 { 741 struct spdk_fs_request *req = ctx; 742 struct spdk_fs_cb_args *args = &req->args; 743 struct spdk_file *f = args->file; 744 745 f->blob = blob; 746 while (!TAILQ_EMPTY(&f->open_requests)) { 747 req = TAILQ_FIRST(&f->open_requests); 748 args = &req->args; 749 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 750 args->fn.file_op_with_handle(args->arg, f, bserrno); 751 free_fs_request(req); 752 } 753 } 754 755 static void 756 fs_open_blob_create_cb(void *ctx, int bserrno) 757 { 758 struct spdk_fs_request *req = ctx; 759 struct spdk_fs_cb_args *args = &req->args; 760 struct spdk_file *file = args->file; 761 struct spdk_filesystem *fs = args->fs; 762 763 if (file == NULL) { 764 file = fs_find_file(fs, args->op.open.name); 765 args->file = file; 766 } 767 768 file->ref_count++; 769 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 770 if (file->ref_count == 1) { 771 assert(file->blob == NULL); 772 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 773 } else if (file->blob != NULL) { 774 fs_open_blob_done(req, file->blob, 0); 775 } else { 776 /* 777 * The blob open for this file is in progress due to a previous 778 * open request. When that open completes, it will invoke the 779 * open callback for this request. 780 */ 781 } 782 } 783 784 void 785 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 786 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 787 { 788 struct spdk_file *f = NULL; 789 struct spdk_fs_request *req; 790 struct spdk_fs_cb_args *args; 791 792 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 793 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 794 return; 795 } 796 797 f = fs_find_file(fs, name); 798 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 799 cb_fn(cb_arg, NULL, -ENOENT); 800 return; 801 } 802 803 req = alloc_fs_request(fs->md_fs_channel); 804 if (req == NULL) { 805 cb_fn(cb_arg, NULL, -ENOMEM); 806 return; 807 } 808 809 args = &req->args; 810 args->fn.file_op_with_handle = cb_fn; 811 args->arg = cb_arg; 812 args->file = f; 813 args->fs = fs; 814 args->op.open.name = name; 815 816 if (f == NULL) { 817 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 818 } else { 819 fs_open_blob_create_cb(req, 0); 820 } 821 } 822 823 static void 824 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 825 { 826 struct spdk_fs_request *req = arg; 827 struct spdk_fs_cb_args *args = &req->args; 828 829 args->file = file; 830 args->rc = bserrno; 831 sem_post(args->sem); 832 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 833 } 834 835 static void 836 __fs_open_file(void *arg) 837 { 838 struct spdk_fs_request *req = arg; 839 struct spdk_fs_cb_args *args = &req->args; 840 841 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 842 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 843 __fs_open_file_done, req); 844 } 845 846 int 847 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 848 const char *name, uint32_t flags, struct spdk_file **file) 849 { 850 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 851 struct spdk_fs_request *req; 852 struct spdk_fs_cb_args *args; 853 int rc; 854 855 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 856 857 req = alloc_fs_request(channel); 858 assert(req != NULL); 859 860 args = &req->args; 861 args->fs = fs; 862 args->op.open.name = name; 863 args->op.open.flags = flags; 864 args->sem = &channel->sem; 865 fs->send_request(__fs_open_file, req); 866 sem_wait(&channel->sem); 867 rc = args->rc; 868 if (rc == 0) { 869 *file = args->file; 870 } else { 871 *file = NULL; 872 } 873 free_fs_request(req); 874 875 return rc; 876 } 877 878 static void 879 fs_rename_blob_close_cb(void *ctx, int bserrno) 880 { 881 struct spdk_fs_request *req = ctx; 882 struct spdk_fs_cb_args *args = &req->args; 883 884 args->fn.fs_op(args->arg, bserrno); 885 free_fs_request(req); 886 } 887 888 static void 889 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 890 { 891 struct spdk_fs_request *req = ctx; 892 struct spdk_fs_cb_args *args = &req->args; 893 struct spdk_file *f = args->file; 894 const char *new_name = args->op.rename.new_name; 895 896 f->blob = blob; 897 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 898 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 899 } 900 901 static void 902 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 903 { 904 struct spdk_fs_cb_args *args = &req->args; 905 struct spdk_file *f; 906 907 f = fs_find_file(args->fs, args->op.rename.old_name); 908 if (f == NULL) { 909 args->fn.fs_op(args->arg, -ENOENT); 910 free_fs_request(req); 911 return; 912 } 913 914 free(f->name); 915 f->name = strdup(args->op.rename.new_name); 916 args->file = f; 917 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 918 } 919 920 static void 921 fs_rename_delete_done(void *arg, int fserrno) 922 { 923 __spdk_fs_md_rename_file(arg); 924 } 925 926 void 927 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 928 const char *old_name, const char *new_name, 929 spdk_file_op_complete cb_fn, void *cb_arg) 930 { 931 struct spdk_file *f; 932 struct spdk_fs_request *req; 933 struct spdk_fs_cb_args *args; 934 935 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 936 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 937 cb_fn(cb_arg, -ENAMETOOLONG); 938 return; 939 } 940 941 req = alloc_fs_request(fs->md_fs_channel); 942 if (req == NULL) { 943 cb_fn(cb_arg, -ENOMEM); 944 return; 945 } 946 947 args = &req->args; 948 args->fn.fs_op = cb_fn; 949 args->fs = fs; 950 args->arg = cb_arg; 951 args->op.rename.old_name = old_name; 952 args->op.rename.new_name = new_name; 953 954 f = fs_find_file(fs, new_name); 955 if (f == NULL) { 956 __spdk_fs_md_rename_file(req); 957 return; 958 } 959 960 /* 961 * The rename overwrites an existing file. So delete the existing file, then 962 * do the actual rename. 963 */ 964 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 965 } 966 967 static void 968 __fs_rename_file_done(void *arg, int fserrno) 969 { 970 struct spdk_fs_request *req = arg; 971 struct spdk_fs_cb_args *args = &req->args; 972 973 args->rc = fserrno; 974 sem_post(args->sem); 975 } 976 977 static void 978 __fs_rename_file(void *arg) 979 { 980 struct spdk_fs_request *req = arg; 981 struct spdk_fs_cb_args *args = &req->args; 982 983 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 984 __fs_rename_file_done, req); 985 } 986 987 int 988 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 989 const char *old_name, const char *new_name) 990 { 991 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 992 struct spdk_fs_request *req; 993 struct spdk_fs_cb_args *args; 994 int rc; 995 996 req = alloc_fs_request(channel); 997 assert(req != NULL); 998 999 args = &req->args; 1000 1001 args->fs = fs; 1002 args->op.rename.old_name = old_name; 1003 args->op.rename.new_name = new_name; 1004 args->sem = &channel->sem; 1005 fs->send_request(__fs_rename_file, req); 1006 sem_wait(&channel->sem); 1007 rc = args->rc; 1008 free_fs_request(req); 1009 return rc; 1010 } 1011 1012 static void 1013 blob_delete_cb(void *ctx, int bserrno) 1014 { 1015 struct spdk_fs_request *req = ctx; 1016 struct spdk_fs_cb_args *args = &req->args; 1017 1018 args->fn.file_op(args->arg, bserrno); 1019 free_fs_request(req); 1020 } 1021 1022 void 1023 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1024 spdk_file_op_complete cb_fn, void *cb_arg) 1025 { 1026 struct spdk_file *f; 1027 spdk_blob_id blobid; 1028 struct spdk_fs_request *req; 1029 struct spdk_fs_cb_args *args; 1030 1031 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1032 1033 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1034 cb_fn(cb_arg, -ENAMETOOLONG); 1035 return; 1036 } 1037 1038 f = fs_find_file(fs, name); 1039 if (f == NULL) { 1040 cb_fn(cb_arg, -ENOENT); 1041 return; 1042 } 1043 1044 if (f->ref_count > 0) { 1045 /* For now, do not allow deleting files with open references. */ 1046 cb_fn(cb_arg, -EBUSY); 1047 return; 1048 } 1049 1050 req = alloc_fs_request(fs->md_fs_channel); 1051 if (req == NULL) { 1052 cb_fn(cb_arg, -ENOMEM); 1053 return; 1054 } 1055 1056 TAILQ_REMOVE(&fs->files, f, tailq); 1057 1058 cache_free_buffers(f); 1059 1060 blobid = f->blobid; 1061 1062 free(f->name); 1063 free(f->tree); 1064 free(f); 1065 1066 args = &req->args; 1067 args->fn.file_op = cb_fn; 1068 args->arg = cb_arg; 1069 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1070 } 1071 1072 static void 1073 __fs_delete_file_done(void *arg, int fserrno) 1074 { 1075 struct spdk_fs_request *req = arg; 1076 struct spdk_fs_cb_args *args = &req->args; 1077 1078 args->rc = fserrno; 1079 sem_post(args->sem); 1080 } 1081 1082 static void 1083 __fs_delete_file(void *arg) 1084 { 1085 struct spdk_fs_request *req = arg; 1086 struct spdk_fs_cb_args *args = &req->args; 1087 1088 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1089 } 1090 1091 int 1092 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1093 const char *name) 1094 { 1095 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1096 struct spdk_fs_request *req; 1097 struct spdk_fs_cb_args *args; 1098 int rc; 1099 1100 req = alloc_fs_request(channel); 1101 assert(req != NULL); 1102 1103 args = &req->args; 1104 args->fs = fs; 1105 args->op.delete.name = name; 1106 args->sem = &channel->sem; 1107 fs->send_request(__fs_delete_file, req); 1108 sem_wait(&channel->sem); 1109 rc = args->rc; 1110 free_fs_request(req); 1111 1112 return rc; 1113 } 1114 1115 spdk_fs_iter 1116 spdk_fs_iter_first(struct spdk_filesystem *fs) 1117 { 1118 struct spdk_file *f; 1119 1120 f = TAILQ_FIRST(&fs->files); 1121 return f; 1122 } 1123 1124 spdk_fs_iter 1125 spdk_fs_iter_next(spdk_fs_iter iter) 1126 { 1127 struct spdk_file *f = iter; 1128 1129 if (f == NULL) { 1130 return NULL; 1131 } 1132 1133 f = TAILQ_NEXT(f, tailq); 1134 return f; 1135 } 1136 1137 const char * 1138 spdk_file_get_name(struct spdk_file *file) 1139 { 1140 return file->name; 1141 } 1142 1143 uint64_t 1144 spdk_file_get_length(struct spdk_file *file) 1145 { 1146 assert(file != NULL); 1147 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1148 return file->length; 1149 } 1150 1151 static void 1152 fs_truncate_complete_cb(void *ctx, int bserrno) 1153 { 1154 struct spdk_fs_request *req = ctx; 1155 struct spdk_fs_cb_args *args = &req->args; 1156 1157 args->fn.file_op(args->arg, bserrno); 1158 free_fs_request(req); 1159 } 1160 1161 static uint64_t 1162 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1163 { 1164 return (length + cluster_sz - 1) / cluster_sz; 1165 } 1166 1167 void 1168 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1169 spdk_file_op_complete cb_fn, void *cb_arg) 1170 { 1171 struct spdk_filesystem *fs; 1172 size_t num_clusters; 1173 struct spdk_fs_request *req; 1174 struct spdk_fs_cb_args *args; 1175 1176 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1177 if (length == file->length) { 1178 cb_fn(cb_arg, 0); 1179 return; 1180 } 1181 1182 req = alloc_fs_request(file->fs->md_fs_channel); 1183 if (req == NULL) { 1184 cb_fn(cb_arg, -ENOMEM); 1185 return; 1186 } 1187 1188 args = &req->args; 1189 args->fn.file_op = cb_fn; 1190 args->arg = cb_arg; 1191 args->file = file; 1192 fs = file->fs; 1193 1194 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1195 1196 spdk_bs_md_resize_blob(file->blob, num_clusters); 1197 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1198 1199 file->length = length; 1200 if (file->append_pos > file->length) { 1201 file->append_pos = file->length; 1202 } 1203 1204 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1205 } 1206 1207 static void 1208 __truncate(void *arg) 1209 { 1210 struct spdk_fs_request *req = arg; 1211 struct spdk_fs_cb_args *args = &req->args; 1212 1213 spdk_file_truncate_async(args->file, args->op.truncate.length, 1214 args->fn.file_op, args->arg); 1215 } 1216 1217 void 1218 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1219 uint64_t length) 1220 { 1221 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1222 struct spdk_fs_request *req; 1223 struct spdk_fs_cb_args *args; 1224 1225 req = alloc_fs_request(channel); 1226 assert(req != NULL); 1227 1228 args = &req->args; 1229 1230 args->file = file; 1231 args->op.truncate.length = length; 1232 args->fn.file_op = __sem_post; 1233 args->arg = &channel->sem; 1234 1235 channel->send_request(__truncate, req); 1236 sem_wait(&channel->sem); 1237 free_fs_request(req); 1238 } 1239 1240 static void 1241 __rw_done(void *ctx, int bserrno) 1242 { 1243 struct spdk_fs_request *req = ctx; 1244 struct spdk_fs_cb_args *args = &req->args; 1245 1246 spdk_free(args->op.rw.pin_buf); 1247 args->fn.file_op(args->arg, bserrno); 1248 free_fs_request(req); 1249 } 1250 1251 static void 1252 __read_done(void *ctx, int bserrno) 1253 { 1254 struct spdk_fs_request *req = ctx; 1255 struct spdk_fs_cb_args *args = &req->args; 1256 1257 if (args->op.rw.is_read) { 1258 memcpy(args->op.rw.user_buf, 1259 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1260 args->op.rw.length); 1261 __rw_done(req, 0); 1262 } else { 1263 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1264 args->op.rw.user_buf, 1265 args->op.rw.length); 1266 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1267 args->op.rw.pin_buf, 1268 args->op.rw.start_page, args->op.rw.num_pages, 1269 __rw_done, req); 1270 } 1271 } 1272 1273 static void 1274 __do_blob_read(void *ctx, int fserrno) 1275 { 1276 struct spdk_fs_request *req = ctx; 1277 struct spdk_fs_cb_args *args = &req->args; 1278 1279 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1280 args->op.rw.pin_buf, 1281 args->op.rw.start_page, args->op.rw.num_pages, 1282 __read_done, req); 1283 } 1284 1285 static void 1286 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1287 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1288 { 1289 uint64_t end_page; 1290 1291 *page_size = spdk_bs_get_page_size(file->fs->bs); 1292 *start_page = offset / *page_size; 1293 end_page = (offset + length - 1) / *page_size; 1294 *num_pages = (end_page - *start_page + 1); 1295 } 1296 1297 static void 1298 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1299 void *payload, uint64_t offset, uint64_t length, 1300 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1301 { 1302 struct spdk_fs_request *req; 1303 struct spdk_fs_cb_args *args; 1304 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1305 uint64_t start_page, num_pages, pin_buf_length; 1306 uint32_t page_size; 1307 1308 if (is_read && offset + length > file->length) { 1309 cb_fn(cb_arg, -EINVAL); 1310 return; 1311 } 1312 1313 req = alloc_fs_request(channel); 1314 if (req == NULL) { 1315 cb_fn(cb_arg, -ENOMEM); 1316 return; 1317 } 1318 1319 args = &req->args; 1320 args->fn.file_op = cb_fn; 1321 args->arg = cb_arg; 1322 args->file = file; 1323 args->op.rw.channel = channel->bs_channel; 1324 args->op.rw.user_buf = payload; 1325 args->op.rw.is_read = is_read; 1326 args->op.rw.offset = offset; 1327 args->op.rw.length = length; 1328 1329 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1330 pin_buf_length = num_pages * page_size; 1331 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, 4096, NULL); 1332 1333 args->op.rw.start_page = start_page; 1334 args->op.rw.num_pages = num_pages; 1335 1336 if (!is_read && file->length < offset + length) { 1337 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1338 } else { 1339 __do_blob_read(req, 0); 1340 } 1341 } 1342 1343 void 1344 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1345 void *payload, uint64_t offset, uint64_t length, 1346 spdk_file_op_complete cb_fn, void *cb_arg) 1347 { 1348 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1349 } 1350 1351 void 1352 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1353 void *payload, uint64_t offset, uint64_t length, 1354 spdk_file_op_complete cb_fn, void *cb_arg) 1355 { 1356 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1357 file->name, offset, length); 1358 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1359 } 1360 1361 struct spdk_io_channel * 1362 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs, uint32_t priority) 1363 { 1364 struct spdk_io_channel *io_channel; 1365 struct spdk_fs_channel *fs_channel; 1366 uint32_t max_ops = 512; 1367 1368 io_channel = spdk_get_io_channel(fs, priority, true, (void *)&max_ops); 1369 fs_channel = spdk_io_channel_get_ctx(io_channel); 1370 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT, 512); 1371 fs_channel->send_request = __send_request_direct; 1372 1373 return io_channel; 1374 } 1375 1376 struct spdk_io_channel * 1377 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs, uint32_t priority) 1378 { 1379 struct spdk_io_channel *io_channel; 1380 struct spdk_fs_channel *fs_channel; 1381 uint32_t max_ops = 16; 1382 1383 io_channel = spdk_get_io_channel(fs, priority, true, (void *)&max_ops); 1384 fs_channel = spdk_io_channel_get_ctx(io_channel); 1385 fs_channel->send_request = fs->send_request; 1386 1387 return io_channel; 1388 } 1389 1390 void 1391 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1392 { 1393 spdk_put_io_channel(channel); 1394 } 1395 1396 void 1397 spdk_fs_set_cache_size(uint64_t size_in_mb) 1398 { 1399 g_fs_cache_size = size_in_mb * 1024 * 1024; 1400 } 1401 1402 uint64_t 1403 spdk_fs_get_cache_size(void) 1404 { 1405 return g_fs_cache_size / (1024 * 1024); 1406 } 1407 1408 static void __file_flush(void *_args); 1409 1410 static void * 1411 alloc_cache_memory_buffer(struct spdk_file *context) 1412 { 1413 struct spdk_file *file; 1414 void *buf; 1415 1416 buf = spdk_mempool_get(g_cache_pool); 1417 if (buf != NULL) { 1418 return buf; 1419 } 1420 1421 pthread_spin_lock(&g_caches_lock); 1422 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1423 if (!file->open_for_writing && 1424 file->priority == SPDK_FILE_PRIORITY_LOW && 1425 file != context) { 1426 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1427 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1428 break; 1429 } 1430 } 1431 pthread_spin_unlock(&g_caches_lock); 1432 if (file != NULL) { 1433 cache_free_buffers(file); 1434 buf = spdk_mempool_get(g_cache_pool); 1435 if (buf != NULL) { 1436 return buf; 1437 } 1438 } 1439 1440 pthread_spin_lock(&g_caches_lock); 1441 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1442 if (!file->open_for_writing && file != context) { 1443 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1444 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1445 break; 1446 } 1447 } 1448 pthread_spin_unlock(&g_caches_lock); 1449 if (file != NULL) { 1450 cache_free_buffers(file); 1451 buf = spdk_mempool_get(g_cache_pool); 1452 if (buf != NULL) { 1453 return buf; 1454 } 1455 } 1456 1457 pthread_spin_lock(&g_caches_lock); 1458 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1459 if (file != context) { 1460 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1461 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1462 break; 1463 } 1464 } 1465 pthread_spin_unlock(&g_caches_lock); 1466 if (file != NULL) { 1467 cache_free_buffers(file); 1468 buf = spdk_mempool_get(g_cache_pool); 1469 if (buf != NULL) { 1470 return buf; 1471 } 1472 } 1473 1474 assert(false); 1475 return NULL; 1476 } 1477 1478 static struct cache_buffer * 1479 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1480 { 1481 struct cache_buffer *buf; 1482 int count = 0; 1483 1484 buf = calloc(1, sizeof(*buf)); 1485 if (buf == NULL) { 1486 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1487 return NULL; 1488 } 1489 1490 buf->buf = alloc_cache_memory_buffer(file); 1491 if (buf->buf == NULL) { 1492 while (buf->buf == NULL) { 1493 count++; 1494 buf->buf = alloc_cache_memory_buffer(file); 1495 /* 1496 * TODO: __free_oldest_cache() should eventually free some buffers. 1497 * Should have a more sophisticated check here, instead of just 1498 * bailing if 100 tries does not result in getting a free buffer. 1499 */ 1500 if (count == 100) { 1501 SPDK_ERRLOG("could not allocate cache buffer\n"); 1502 assert(false); 1503 return NULL; 1504 } 1505 } 1506 } 1507 1508 buf->buf_size = CACHE_BUFFER_SIZE; 1509 buf->offset = offset; 1510 1511 pthread_spin_lock(&g_caches_lock); 1512 if (file->tree->present_mask == 0) { 1513 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1514 } 1515 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1516 pthread_spin_unlock(&g_caches_lock); 1517 1518 return buf; 1519 } 1520 1521 static struct cache_buffer * 1522 cache_append_buffer(struct spdk_file *file) 1523 { 1524 struct cache_buffer *last; 1525 1526 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1527 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1528 1529 last = cache_insert_buffer(file, file->append_pos); 1530 if (last == NULL) { 1531 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1532 return NULL; 1533 } 1534 1535 if (file->last != NULL) { 1536 file->last->next = last; 1537 } 1538 file->last = last; 1539 1540 return last; 1541 } 1542 1543 static void 1544 __wake_caller(struct spdk_fs_cb_args *args) 1545 { 1546 sem_post(args->sem); 1547 } 1548 1549 static void 1550 __file_cache_finish_sync(struct spdk_file *file) 1551 { 1552 struct spdk_fs_request *sync_req; 1553 struct spdk_fs_cb_args *sync_args; 1554 1555 pthread_spin_lock(&file->lock); 1556 while (!TAILQ_EMPTY(&file->sync_requests)) { 1557 sync_req = TAILQ_FIRST(&file->sync_requests); 1558 sync_args = &sync_req->args; 1559 if (sync_args->op.sync.offset > file->length_flushed) { 1560 break; 1561 } 1562 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1563 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1564 pthread_spin_unlock(&file->lock); 1565 sync_args->fn.file_op(sync_args->arg, 0); 1566 pthread_spin_lock(&file->lock); 1567 free_fs_request(sync_req); 1568 } 1569 pthread_spin_unlock(&file->lock); 1570 } 1571 1572 static void 1573 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1574 { 1575 struct spdk_file *file = ctx; 1576 1577 __file_cache_finish_sync(file); 1578 } 1579 1580 static void 1581 __free_args(struct spdk_fs_cb_args *args) 1582 { 1583 struct spdk_fs_request *req; 1584 1585 if (!args->from_request) { 1586 free(args); 1587 } else { 1588 /* Depends on args being at the start of the spdk_fs_request structure. */ 1589 req = (struct spdk_fs_request *)args; 1590 free_fs_request(req); 1591 } 1592 } 1593 1594 static void 1595 __file_flush_done(void *arg, int bserrno) 1596 { 1597 struct spdk_fs_cb_args *args = arg; 1598 struct spdk_fs_request *sync_req; 1599 struct spdk_file *file = args->file; 1600 struct cache_buffer *next = args->op.flush.cache_buffer; 1601 1602 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1603 1604 pthread_spin_lock(&file->lock); 1605 next->in_progress = false; 1606 next->bytes_flushed += args->op.flush.length; 1607 file->length_flushed += args->op.flush.length; 1608 if (file->length_flushed > file->length) { 1609 file->length = file->length_flushed; 1610 } 1611 if (next->bytes_flushed == next->buf_size) { 1612 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1613 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1614 } 1615 1616 TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) { 1617 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1618 break; 1619 } 1620 } 1621 1622 if (sync_req != NULL) { 1623 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1624 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1625 sizeof(file->length_flushed)); 1626 1627 pthread_spin_unlock(&file->lock); 1628 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1629 } else { 1630 pthread_spin_unlock(&file->lock); 1631 __file_cache_finish_sync(file); 1632 } 1633 1634 /* 1635 * Assert that there is no cached data that extends past the end of the underlying 1636 * blob. 1637 */ 1638 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1639 next->bytes_filled == 0); 1640 1641 __file_flush(args); 1642 } 1643 1644 static void 1645 __file_flush(void *_args) 1646 { 1647 struct spdk_fs_cb_args *args = _args; 1648 struct spdk_file *file = args->file; 1649 struct cache_buffer *next; 1650 uint64_t offset, length, start_page, num_pages; 1651 uint32_t page_size; 1652 1653 pthread_spin_lock(&file->lock); 1654 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1655 if (next == NULL || next->in_progress) { 1656 /* 1657 * There is either no data to flush, or a flush I/O is already in 1658 * progress. So return immediately - if a flush I/O is in 1659 * progress we will flush more data after that is completed. 1660 */ 1661 __free_args(args); 1662 pthread_spin_unlock(&file->lock); 1663 return; 1664 } 1665 1666 offset = next->offset + next->bytes_flushed; 1667 length = next->bytes_filled - next->bytes_flushed; 1668 if (length == 0) { 1669 __free_args(args); 1670 pthread_spin_unlock(&file->lock); 1671 return; 1672 } 1673 args->op.flush.length = length; 1674 args->op.flush.cache_buffer = next; 1675 1676 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1677 1678 next->in_progress = true; 1679 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1680 offset, length, start_page, num_pages); 1681 pthread_spin_unlock(&file->lock); 1682 spdk_bs_io_write_blob(file->blob, file->fs->sync_fs_channel->bs_channel, 1683 next->buf + (start_page * page_size) - next->offset, 1684 start_page, num_pages, 1685 __file_flush_done, args); 1686 } 1687 1688 static void 1689 __file_extend_done(void *arg, int bserrno) 1690 { 1691 struct spdk_fs_cb_args *args = arg; 1692 1693 __wake_caller(args); 1694 } 1695 1696 static void 1697 __file_extend_blob(void *_args) 1698 { 1699 struct spdk_fs_cb_args *args = _args; 1700 struct spdk_file *file = args->file; 1701 1702 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1703 1704 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1705 } 1706 1707 static void 1708 __rw_from_file_done(void *arg, int bserrno) 1709 { 1710 struct spdk_fs_cb_args *args = arg; 1711 1712 __wake_caller(args); 1713 __free_args(args); 1714 } 1715 1716 static void 1717 __rw_from_file(void *_args) 1718 { 1719 struct spdk_fs_cb_args *args = _args; 1720 struct spdk_file *file = args->file; 1721 1722 if (args->op.rw.is_read) { 1723 spdk_file_read_async(file, file->fs->sync_io_channel, args->op.rw.user_buf, 1724 args->op.rw.offset, args->op.rw.length, 1725 __rw_from_file_done, args); 1726 } else { 1727 spdk_file_write_async(file, file->fs->sync_io_channel, args->op.rw.user_buf, 1728 args->op.rw.offset, args->op.rw.length, 1729 __rw_from_file_done, args); 1730 } 1731 } 1732 1733 static int 1734 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1735 uint64_t offset, uint64_t length, bool is_read) 1736 { 1737 struct spdk_fs_cb_args *args; 1738 1739 args = calloc(1, sizeof(*args)); 1740 if (args == NULL) { 1741 sem_post(sem); 1742 return -ENOMEM; 1743 } 1744 1745 args->file = file; 1746 args->sem = sem; 1747 args->op.rw.user_buf = payload; 1748 args->op.rw.offset = offset; 1749 args->op.rw.length = length; 1750 args->op.rw.is_read = is_read; 1751 file->fs->send_request(__rw_from_file, args); 1752 return 0; 1753 } 1754 1755 int 1756 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1757 void *payload, uint64_t offset, uint64_t length) 1758 { 1759 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1760 struct spdk_fs_cb_args *args; 1761 uint64_t rem_length, copy, blob_size, cluster_sz; 1762 uint32_t cache_buffers_filled = 0; 1763 uint8_t *cur_payload; 1764 struct cache_buffer *last; 1765 1766 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1767 1768 if (length == 0) { 1769 return 0; 1770 } 1771 1772 if (offset != file->append_pos) { 1773 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1774 return -EINVAL; 1775 } 1776 1777 pthread_spin_lock(&file->lock); 1778 file->open_for_writing = true; 1779 1780 if (file->last == NULL) { 1781 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1782 cache_append_buffer(file); 1783 } else { 1784 int rc; 1785 1786 file->append_pos += length; 1787 rc = __send_rw_from_file(file, &channel->sem, payload, 1788 offset, length, false); 1789 pthread_spin_unlock(&file->lock); 1790 sem_wait(&channel->sem); 1791 return rc; 1792 } 1793 } 1794 1795 blob_size = __file_get_blob_size(file); 1796 1797 if ((offset + length) > blob_size) { 1798 struct spdk_fs_cb_args extend_args = {}; 1799 1800 cluster_sz = file->fs->bs_opts.cluster_sz; 1801 extend_args.sem = &channel->sem; 1802 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1803 extend_args.file = file; 1804 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1805 pthread_spin_unlock(&file->lock); 1806 file->fs->send_request(__file_extend_blob, &extend_args); 1807 sem_wait(&channel->sem); 1808 } 1809 1810 last = file->last; 1811 rem_length = length; 1812 cur_payload = payload; 1813 while (rem_length > 0) { 1814 copy = last->buf_size - last->bytes_filled; 1815 if (copy > rem_length) { 1816 copy = rem_length; 1817 } 1818 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 1819 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 1820 file->append_pos += copy; 1821 if (file->length < file->append_pos) { 1822 file->length = file->append_pos; 1823 } 1824 cur_payload += copy; 1825 last->bytes_filled += copy; 1826 rem_length -= copy; 1827 if (last->bytes_filled == last->buf_size) { 1828 cache_buffers_filled++; 1829 last = cache_append_buffer(file); 1830 if (last == NULL) { 1831 BLOBFS_TRACE(file, "nomem\n"); 1832 pthread_spin_unlock(&file->lock); 1833 return -ENOMEM; 1834 } 1835 } 1836 } 1837 1838 if (cache_buffers_filled == 0) { 1839 pthread_spin_unlock(&file->lock); 1840 return 0; 1841 } 1842 1843 args = calloc(1, sizeof(*args)); 1844 if (args == NULL) { 1845 pthread_spin_unlock(&file->lock); 1846 return -ENOMEM; 1847 } 1848 1849 args->file = file; 1850 file->fs->send_request(__file_flush, args); 1851 pthread_spin_unlock(&file->lock); 1852 return 0; 1853 } 1854 1855 static void 1856 __readahead_done(void *arg, int bserrno) 1857 { 1858 struct spdk_fs_cb_args *args = arg; 1859 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 1860 struct spdk_file *file = args->file; 1861 1862 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 1863 1864 pthread_spin_lock(&file->lock); 1865 cache_buffer->bytes_filled = args->op.readahead.length; 1866 cache_buffer->bytes_flushed = args->op.readahead.length; 1867 cache_buffer->in_progress = false; 1868 pthread_spin_unlock(&file->lock); 1869 1870 __free_args(args); 1871 } 1872 1873 static void 1874 __readahead(void *_args) 1875 { 1876 struct spdk_fs_cb_args *args = _args; 1877 struct spdk_file *file = args->file; 1878 uint64_t offset, length, start_page, num_pages; 1879 uint32_t page_size; 1880 1881 offset = args->op.readahead.offset; 1882 length = args->op.readahead.length; 1883 assert(length > 0); 1884 1885 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1886 1887 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1888 offset, length, start_page, num_pages); 1889 spdk_bs_io_read_blob(file->blob, file->fs->sync_fs_channel->bs_channel, 1890 args->op.readahead.cache_buffer->buf, 1891 start_page, num_pages, 1892 __readahead_done, args); 1893 } 1894 1895 static uint64_t 1896 __next_cache_buffer_offset(uint64_t offset) 1897 { 1898 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 1899 } 1900 1901 static void 1902 check_readahead(struct spdk_file *file, uint64_t offset) 1903 { 1904 struct spdk_fs_cb_args *args; 1905 1906 offset = __next_cache_buffer_offset(offset); 1907 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 1908 return; 1909 } 1910 1911 BLOBFS_TRACE(file, "offset=%jx\n", offset); 1912 args = calloc(1, sizeof(*args)); 1913 args->file = file; 1914 args->op.readahead.offset = offset; 1915 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 1916 args->op.readahead.cache_buffer->in_progress = true; 1917 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 1918 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 1919 } else { 1920 args->op.readahead.length = CACHE_BUFFER_SIZE; 1921 } 1922 file->fs->send_request(__readahead, args); 1923 } 1924 1925 static int 1926 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 1927 { 1928 struct cache_buffer *buf; 1929 1930 buf = spdk_tree_find_filled_buffer(file->tree, offset); 1931 if (buf == NULL) { 1932 return __send_rw_from_file(file, sem, payload, offset, length, true); 1933 } 1934 1935 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 1936 length = buf->offset + buf->bytes_filled - offset; 1937 } 1938 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 1939 memcpy(payload, &buf->buf[offset - buf->offset], length); 1940 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 1941 pthread_spin_lock(&g_caches_lock); 1942 spdk_tree_remove_buffer(file->tree, buf); 1943 if (file->tree->present_mask == 0) { 1944 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1945 } 1946 pthread_spin_unlock(&g_caches_lock); 1947 } 1948 1949 sem_post(sem); 1950 return 0; 1951 } 1952 1953 int64_t 1954 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 1955 void *payload, uint64_t offset, uint64_t length) 1956 { 1957 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1958 uint64_t final_offset, final_length; 1959 uint32_t sub_reads = 0; 1960 int rc = 0; 1961 1962 pthread_spin_lock(&file->lock); 1963 1964 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 1965 1966 file->open_for_writing = false; 1967 1968 if (length == 0 || offset >= file->length) { 1969 pthread_spin_unlock(&file->lock); 1970 return 0; 1971 } 1972 1973 if (offset + length > file->length) { 1974 length = file->length - offset; 1975 } 1976 1977 if (offset != file->next_seq_offset) { 1978 file->seq_byte_count = 0; 1979 } 1980 file->seq_byte_count += length; 1981 file->next_seq_offset = offset + length; 1982 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 1983 check_readahead(file, offset); 1984 check_readahead(file, offset + CACHE_BUFFER_SIZE); 1985 } 1986 1987 final_length = 0; 1988 final_offset = offset + length; 1989 while (offset < final_offset) { 1990 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 1991 if (length > (final_offset - offset)) { 1992 length = final_offset - offset; 1993 } 1994 rc = __file_read(file, payload, offset, length, &channel->sem); 1995 if (rc == 0) { 1996 final_length += length; 1997 } else { 1998 break; 1999 } 2000 payload += length; 2001 offset += length; 2002 sub_reads++; 2003 } 2004 pthread_spin_unlock(&file->lock); 2005 while (sub_reads-- > 0) { 2006 sem_wait(&channel->sem); 2007 } 2008 if (rc == 0) { 2009 return final_length; 2010 } else { 2011 return rc; 2012 } 2013 } 2014 2015 static void 2016 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2017 spdk_file_op_complete cb_fn, void *cb_arg) 2018 { 2019 struct spdk_fs_request *sync_req; 2020 struct spdk_fs_request *flush_req; 2021 struct spdk_fs_cb_args *sync_args; 2022 struct spdk_fs_cb_args *flush_args; 2023 2024 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2025 2026 pthread_spin_lock(&file->lock); 2027 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2028 BLOBFS_TRACE(file, "done - no data to flush\n"); 2029 pthread_spin_unlock(&file->lock); 2030 cb_fn(cb_arg, 0); 2031 return; 2032 } 2033 2034 sync_req = alloc_fs_request(channel); 2035 assert(sync_req != NULL); 2036 sync_args = &sync_req->args; 2037 2038 flush_req = alloc_fs_request(channel); 2039 assert(flush_req != NULL); 2040 flush_args = &flush_req->args; 2041 2042 sync_args->file = file; 2043 sync_args->fn.file_op = cb_fn; 2044 sync_args->arg = cb_arg; 2045 sync_args->op.sync.offset = file->append_pos; 2046 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2047 pthread_spin_unlock(&file->lock); 2048 2049 flush_args->file = file; 2050 channel->send_request(__file_flush, flush_args); 2051 } 2052 2053 int 2054 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2055 { 2056 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2057 2058 _file_sync(file, channel, __sem_post, &channel->sem); 2059 sem_wait(&channel->sem); 2060 2061 return 0; 2062 } 2063 2064 void 2065 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2066 spdk_file_op_complete cb_fn, void *cb_arg) 2067 { 2068 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2069 2070 _file_sync(file, channel, cb_fn, cb_arg); 2071 } 2072 2073 void 2074 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2075 { 2076 BLOBFS_TRACE(file, "priority=%u\n", priority); 2077 file->priority = priority; 2078 2079 } 2080 2081 /* 2082 * Close routines 2083 */ 2084 2085 static void 2086 __file_close_async_done(void *ctx, int bserrno) 2087 { 2088 struct spdk_fs_request *req = ctx; 2089 struct spdk_fs_cb_args *args = &req->args; 2090 2091 args->fn.file_op(args->arg, bserrno); 2092 free_fs_request(req); 2093 } 2094 2095 static void 2096 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2097 { 2098 pthread_spin_lock(&file->lock); 2099 if (file->ref_count == 0) { 2100 pthread_spin_unlock(&file->lock); 2101 __file_close_async_done(req, -EBADF); 2102 return; 2103 } 2104 2105 file->ref_count--; 2106 if (file->ref_count > 0) { 2107 pthread_spin_unlock(&file->lock); 2108 __file_close_async_done(req, 0); 2109 return; 2110 } 2111 2112 pthread_spin_unlock(&file->lock); 2113 2114 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2115 } 2116 2117 static void 2118 __file_close_async__sync_done(void *arg, int fserrno) 2119 { 2120 struct spdk_fs_request *req = arg; 2121 struct spdk_fs_cb_args *args = &req->args; 2122 2123 __file_close_async(args->file, req); 2124 } 2125 2126 void 2127 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2128 { 2129 struct spdk_fs_request *req; 2130 struct spdk_fs_cb_args *args; 2131 2132 req = alloc_fs_request(file->fs->md_fs_channel); 2133 if (req == NULL) { 2134 cb_fn(cb_arg, -ENOMEM); 2135 return; 2136 } 2137 2138 args = &req->args; 2139 args->file = file; 2140 args->fn.file_op = cb_fn; 2141 args->arg = cb_arg; 2142 2143 spdk_file_sync_async(file, file->fs->md_io_channel, __file_close_async__sync_done, req); 2144 } 2145 2146 static void 2147 __file_close_done(void *arg, int fserrno) 2148 { 2149 struct spdk_fs_cb_args *args = arg; 2150 2151 args->rc = fserrno; 2152 sem_post(args->sem); 2153 } 2154 2155 static void 2156 __file_close(void *arg) 2157 { 2158 struct spdk_fs_request *req = arg; 2159 struct spdk_fs_cb_args *args = &req->args; 2160 struct spdk_file *file = args->file; 2161 2162 __file_close_async(file, req); 2163 } 2164 2165 int 2166 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2167 { 2168 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2169 struct spdk_fs_request *req; 2170 struct spdk_fs_cb_args *args; 2171 2172 req = alloc_fs_request(channel); 2173 assert(req != NULL); 2174 2175 args = &req->args; 2176 2177 spdk_file_sync(file, _channel); 2178 BLOBFS_TRACE(file, "name=%s\n", file->name); 2179 args->file = file; 2180 args->sem = &channel->sem; 2181 args->fn.file_op = __file_close_done; 2182 args->arg = req; 2183 channel->send_request(__file_close, req); 2184 sem_wait(&channel->sem); 2185 2186 return args->rc; 2187 } 2188 2189 static void 2190 cache_free_buffers(struct spdk_file *file) 2191 { 2192 BLOBFS_TRACE(file, "free=%s\n", file->name); 2193 pthread_spin_lock(&file->lock); 2194 pthread_spin_lock(&g_caches_lock); 2195 if (file->tree->present_mask == 0) { 2196 pthread_spin_unlock(&g_caches_lock); 2197 pthread_spin_unlock(&file->lock); 2198 return; 2199 } 2200 spdk_tree_free_buffers(file->tree); 2201 if (file->tree->present_mask == 0) { 2202 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2203 } 2204 file->last = NULL; 2205 pthread_spin_unlock(&g_caches_lock); 2206 pthread_spin_unlock(&file->lock); 2207 } 2208 2209 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2210 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2211