1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk_internal/log.h" 44 45 #define BLOBFS_TRACE(file, str, args...) \ 46 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 47 48 #define BLOBFS_TRACE_RW(file, str, args...) \ 49 SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 50 51 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 52 53 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 54 static struct spdk_mempool *g_cache_pool; 55 static TAILQ_HEAD(, spdk_file) g_caches; 56 static pthread_spinlock_t g_caches_lock; 57 58 static void 59 __sem_post(void *arg, int bserrno) 60 { 61 sem_t *sem = arg; 62 63 sem_post(sem); 64 } 65 66 void 67 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 68 { 69 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 70 free(cache_buffer); 71 } 72 73 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 74 75 struct spdk_file { 76 struct spdk_filesystem *fs; 77 struct spdk_blob *blob; 78 char *name; 79 uint64_t length; 80 bool open_for_writing; 81 uint64_t length_flushed; 82 uint64_t append_pos; 83 uint64_t seq_byte_count; 84 uint64_t next_seq_offset; 85 uint32_t priority; 86 TAILQ_ENTRY(spdk_file) tailq; 87 spdk_blob_id blobid; 88 uint32_t ref_count; 89 pthread_spinlock_t lock; 90 struct cache_buffer *last; 91 struct cache_tree *tree; 92 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 93 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 94 TAILQ_ENTRY(spdk_file) cache_tailq; 95 }; 96 97 struct spdk_filesystem { 98 struct spdk_blob_store *bs; 99 TAILQ_HEAD(, spdk_file) files; 100 struct spdk_bs_opts bs_opts; 101 struct spdk_bs_dev *bdev; 102 fs_send_request_fn send_request; 103 struct spdk_io_channel *sync_io_channel; 104 struct spdk_fs_channel *sync_fs_channel; 105 struct spdk_io_channel *md_io_channel; 106 struct spdk_fs_channel *md_fs_channel; 107 }; 108 109 struct spdk_fs_cb_args { 110 union { 111 spdk_fs_op_with_handle_complete fs_op_with_handle; 112 spdk_fs_op_complete fs_op; 113 spdk_file_op_with_handle_complete file_op_with_handle; 114 spdk_file_op_complete file_op; 115 spdk_file_stat_op_complete stat_op; 116 } fn; 117 void *arg; 118 sem_t *sem; 119 struct spdk_filesystem *fs; 120 struct spdk_file *file; 121 int rc; 122 bool from_request; 123 union { 124 struct { 125 uint64_t length; 126 } truncate; 127 struct { 128 struct spdk_io_channel *channel; 129 void *user_buf; 130 void *pin_buf; 131 int is_read; 132 off_t offset; 133 size_t length; 134 uint64_t start_page; 135 uint64_t num_pages; 136 uint32_t blocklen; 137 } rw; 138 struct { 139 const char *old_name; 140 const char *new_name; 141 } rename; 142 struct { 143 struct cache_buffer *cache_buffer; 144 uint64_t length; 145 } flush; 146 struct { 147 struct cache_buffer *cache_buffer; 148 uint64_t length; 149 uint64_t offset; 150 } readahead; 151 struct { 152 uint64_t offset; 153 TAILQ_ENTRY(spdk_fs_request) tailq; 154 } sync; 155 struct { 156 uint32_t num_clusters; 157 } resize; 158 struct { 159 const char *name; 160 uint32_t flags; 161 TAILQ_ENTRY(spdk_fs_request) tailq; 162 } open; 163 struct { 164 const char *name; 165 } create; 166 struct { 167 const char *name; 168 } delete; 169 struct { 170 const char *name; 171 } stat; 172 } op; 173 }; 174 175 static void cache_free_buffers(struct spdk_file *file); 176 177 static void 178 __initialize_cache(void) 179 { 180 if (g_cache_pool != NULL) { 181 return; 182 } 183 184 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 185 g_fs_cache_size / CACHE_BUFFER_SIZE, 186 CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY); 187 TAILQ_INIT(&g_caches); 188 pthread_spin_init(&g_caches_lock, 0); 189 } 190 191 static uint64_t 192 __file_get_blob_size(struct spdk_file *file) 193 { 194 uint64_t cluster_sz; 195 196 cluster_sz = file->fs->bs_opts.cluster_sz; 197 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 198 } 199 200 struct spdk_fs_request { 201 struct spdk_fs_cb_args args; 202 TAILQ_ENTRY(spdk_fs_request) link; 203 struct spdk_fs_channel *channel; 204 }; 205 206 struct spdk_fs_channel { 207 struct spdk_fs_request *req_mem; 208 TAILQ_HEAD(, spdk_fs_request) reqs; 209 sem_t sem; 210 struct spdk_filesystem *fs; 211 struct spdk_io_channel *bs_channel; 212 fs_send_request_fn send_request; 213 }; 214 215 static struct spdk_fs_request * 216 alloc_fs_request(struct spdk_fs_channel *channel) 217 { 218 struct spdk_fs_request *req; 219 220 req = TAILQ_FIRST(&channel->reqs); 221 if (!req) { 222 return NULL; 223 } 224 TAILQ_REMOVE(&channel->reqs, req, link); 225 memset(req, 0, sizeof(*req)); 226 req->channel = channel; 227 req->args.from_request = true; 228 229 return req; 230 } 231 232 static void 233 free_fs_request(struct spdk_fs_request *req) 234 { 235 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 236 } 237 238 static int 239 _spdk_fs_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 240 { 241 struct spdk_filesystem *fs = io_device; 242 struct spdk_fs_channel *channel = ctx_buf; 243 uint32_t max_ops = 512; 244 uint32_t i; 245 246 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 247 if (!channel->req_mem) { 248 return -1; 249 } 250 251 TAILQ_INIT(&channel->reqs); 252 sem_init(&channel->sem, 0, 0); 253 254 for (i = 0; i < max_ops; i++) { 255 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 256 } 257 258 channel->fs = fs; 259 260 return 0; 261 } 262 263 static void 264 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 265 { 266 struct spdk_fs_channel *channel = ctx_buf; 267 268 free(channel->req_mem); 269 if (channel->bs_channel != NULL) { 270 spdk_bs_free_io_channel(channel->bs_channel); 271 } 272 } 273 274 static void 275 __send_request_direct(fs_request_fn fn, void *arg) 276 { 277 fn(arg); 278 } 279 280 static void 281 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 282 { 283 fs->bs = bs; 284 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 285 fs->md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT); 286 fs->md_fs_channel->send_request = __send_request_direct; 287 fs->sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT); 288 fs->sync_fs_channel->send_request = __send_request_direct; 289 } 290 291 static void 292 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 293 { 294 struct spdk_fs_request *req = ctx; 295 struct spdk_fs_cb_args *args = &req->args; 296 struct spdk_filesystem *fs = args->fs; 297 298 if (bserrno == 0) { 299 common_fs_bs_init(fs, bs); 300 } else { 301 free(fs); 302 fs = NULL; 303 } 304 305 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 306 free_fs_request(req); 307 } 308 309 static struct spdk_filesystem * 310 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 311 { 312 struct spdk_filesystem *fs; 313 314 fs = calloc(1, sizeof(*fs)); 315 if (fs == NULL) { 316 return NULL; 317 } 318 319 fs->bdev = dev; 320 fs->send_request = send_request_fn; 321 TAILQ_INIT(&fs->files); 322 spdk_io_device_register(fs, _spdk_fs_channel_create, _spdk_fs_channel_destroy, 323 sizeof(struct spdk_fs_channel)); 324 325 fs->md_io_channel = spdk_get_io_channel(fs, SPDK_IO_PRIORITY_DEFAULT, true, NULL); 326 fs->md_fs_channel = spdk_io_channel_get_ctx(fs->md_io_channel); 327 328 fs->sync_io_channel = spdk_get_io_channel(fs, SPDK_IO_PRIORITY_DEFAULT, true, NULL); 329 fs->sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_io_channel); 330 331 __initialize_cache(); 332 333 return fs; 334 } 335 336 void 337 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 338 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 339 { 340 struct spdk_filesystem *fs; 341 struct spdk_fs_request *req; 342 struct spdk_fs_cb_args *args; 343 344 fs = fs_alloc(dev, send_request_fn); 345 if (fs == NULL) { 346 cb_fn(cb_arg, NULL, -ENOMEM); 347 return; 348 } 349 350 req = alloc_fs_request(fs->md_fs_channel); 351 if (req == NULL) { 352 cb_fn(cb_arg, NULL, -ENOMEM); 353 return; 354 } 355 356 args = &req->args; 357 args->fn.fs_op_with_handle = cb_fn; 358 args->arg = cb_arg; 359 args->fs = fs; 360 361 spdk_bs_init(dev, NULL, init_cb, req); 362 } 363 364 static struct spdk_file * 365 file_alloc(struct spdk_filesystem *fs) 366 { 367 struct spdk_file *file; 368 369 file = calloc(1, sizeof(*file)); 370 if (file == NULL) { 371 return NULL; 372 } 373 374 file->fs = fs; 375 TAILQ_INIT(&file->open_requests); 376 TAILQ_INIT(&file->sync_requests); 377 pthread_spin_init(&file->lock, 0); 378 file->tree = calloc(1, sizeof(*file->tree)); 379 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 380 file->priority = SPDK_FILE_PRIORITY_LOW; 381 return file; 382 } 383 384 static void 385 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 386 { 387 struct spdk_fs_request *req = ctx; 388 struct spdk_fs_cb_args *args = &req->args; 389 struct spdk_filesystem *fs = args->fs; 390 struct spdk_file *f; 391 uint64_t *length; 392 const char *name; 393 size_t value_len; 394 395 if (rc == -ENOENT) { 396 /* Finished iterating */ 397 args->fn.fs_op_with_handle(args->arg, fs, 0); 398 free_fs_request(req); 399 return; 400 } else if (rc < 0) { 401 args->fn.fs_op_with_handle(args->arg, fs, rc); 402 free_fs_request(req); 403 return; 404 } 405 406 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 407 if (rc < 0) { 408 args->fn.fs_op_with_handle(args->arg, fs, rc); 409 free_fs_request(req); 410 return; 411 } 412 413 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 414 if (rc < 0) { 415 args->fn.fs_op_with_handle(args->arg, fs, rc); 416 free_fs_request(req); 417 return; 418 } 419 assert(value_len == 8); 420 421 f = file_alloc(fs); 422 if (f == NULL) { 423 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 424 free_fs_request(req); 425 return; 426 } 427 428 f->name = strdup(name); 429 f->blobid = spdk_blob_get_id(blob); 430 f->length = *length; 431 f->length_flushed = *length; 432 f->append_pos = *length; 433 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 434 435 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 436 } 437 438 static void 439 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 440 { 441 struct spdk_fs_request *req = ctx; 442 struct spdk_fs_cb_args *args = &req->args; 443 struct spdk_filesystem *fs = args->fs; 444 445 if (bserrno != 0) { 446 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 447 free_fs_request(req); 448 free(fs); 449 return; 450 } 451 452 common_fs_bs_init(fs, bs); 453 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 454 } 455 456 void 457 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 458 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 459 { 460 struct spdk_filesystem *fs; 461 struct spdk_fs_cb_args *args; 462 struct spdk_fs_request *req; 463 464 fs = fs_alloc(dev, send_request_fn); 465 if (fs == NULL) { 466 cb_fn(cb_arg, NULL, -ENOMEM); 467 return; 468 } 469 470 req = alloc_fs_request(fs->md_fs_channel); 471 if (req == NULL) { 472 cb_fn(cb_arg, NULL, -ENOMEM); 473 return; 474 } 475 476 args = &req->args; 477 args->fn.fs_op_with_handle = cb_fn; 478 args->arg = cb_arg; 479 args->fs = fs; 480 481 spdk_bs_load(dev, load_cb, req); 482 } 483 484 static void 485 unload_cb(void *ctx, int bserrno) 486 { 487 struct spdk_fs_request *req = ctx; 488 struct spdk_fs_cb_args *args = &req->args; 489 struct spdk_filesystem *fs = args->fs; 490 491 args->fn.fs_op(args->arg, bserrno); 492 free(req); 493 spdk_io_device_unregister(fs); 494 free(fs); 495 } 496 497 void 498 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 499 { 500 struct spdk_fs_request *req; 501 struct spdk_fs_cb_args *args; 502 503 /* 504 * We must free the md_channel before unloading the blobstore, so just 505 * allocate this request from the general heap. 506 */ 507 req = calloc(1, sizeof(*req)); 508 if (req == NULL) { 509 cb_fn(cb_arg, -ENOMEM); 510 return; 511 } 512 513 args = &req->args; 514 args->fn.fs_op = cb_fn; 515 args->arg = cb_arg; 516 args->fs = fs; 517 518 spdk_fs_free_io_channel(fs->md_io_channel); 519 spdk_fs_free_io_channel(fs->sync_io_channel); 520 spdk_bs_unload(fs->bs, unload_cb, req); 521 } 522 523 static struct spdk_file * 524 fs_find_file(struct spdk_filesystem *fs, const char *name) 525 { 526 struct spdk_file *file; 527 528 TAILQ_FOREACH(file, &fs->files, tailq) { 529 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 530 return file; 531 } 532 } 533 534 return NULL; 535 } 536 537 void 538 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 539 spdk_file_stat_op_complete cb_fn, void *cb_arg) 540 { 541 struct spdk_file_stat stat; 542 struct spdk_file *f = NULL; 543 544 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 545 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 546 return; 547 } 548 549 f = fs_find_file(fs, name); 550 if (f != NULL) { 551 stat.blobid = f->blobid; 552 stat.size = f->length; 553 cb_fn(cb_arg, &stat, 0); 554 return; 555 } 556 557 cb_fn(cb_arg, NULL, -ENOENT); 558 } 559 560 static void 561 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 562 { 563 struct spdk_fs_request *req = arg; 564 struct spdk_fs_cb_args *args = &req->args; 565 566 args->rc = fserrno; 567 if (fserrno == 0) { 568 memcpy(args->arg, stat, sizeof(*stat)); 569 } 570 sem_post(args->sem); 571 } 572 573 static void 574 __file_stat(void *arg) 575 { 576 struct spdk_fs_request *req = arg; 577 struct spdk_fs_cb_args *args = &req->args; 578 579 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 580 args->fn.stat_op, req); 581 } 582 583 int 584 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 585 const char *name, struct spdk_file_stat *stat) 586 { 587 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 588 struct spdk_fs_request *req; 589 int rc; 590 591 req = alloc_fs_request(channel); 592 assert(req != NULL); 593 594 req->args.fs = fs; 595 req->args.op.stat.name = name; 596 req->args.fn.stat_op = __copy_stat; 597 req->args.arg = stat; 598 req->args.sem = &channel->sem; 599 channel->send_request(__file_stat, req); 600 sem_wait(&channel->sem); 601 602 rc = req->args.rc; 603 free_fs_request(req); 604 605 return rc; 606 } 607 608 static void 609 fs_create_blob_close_cb(void *ctx, int bserrno) 610 { 611 struct spdk_fs_request *req = ctx; 612 struct spdk_fs_cb_args *args = &req->args; 613 614 args->fn.file_op(args->arg, bserrno); 615 free_fs_request(req); 616 } 617 618 static void 619 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 620 { 621 struct spdk_fs_request *req = ctx; 622 struct spdk_fs_cb_args *args = &req->args; 623 struct spdk_file *f = args->file; 624 uint64_t length = 0; 625 626 f->blob = blob; 627 spdk_bs_md_resize_blob(blob, 1); 628 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 629 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 630 631 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 632 } 633 634 static void 635 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 636 { 637 struct spdk_fs_request *req = ctx; 638 struct spdk_fs_cb_args *args = &req->args; 639 struct spdk_file *f = args->file; 640 641 f->blobid = blobid; 642 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 643 } 644 645 void 646 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 647 spdk_file_op_complete cb_fn, void *cb_arg) 648 { 649 struct spdk_file *file; 650 struct spdk_fs_request *req; 651 struct spdk_fs_cb_args *args; 652 653 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 654 cb_fn(cb_arg, -ENAMETOOLONG); 655 return; 656 } 657 658 file = fs_find_file(fs, name); 659 if (file != NULL) { 660 cb_fn(cb_arg, -EEXIST); 661 return; 662 } 663 664 file = file_alloc(fs); 665 if (file == NULL) { 666 cb_fn(cb_arg, -ENOMEM); 667 return; 668 } 669 670 req = alloc_fs_request(fs->md_fs_channel); 671 if (req == NULL) { 672 cb_fn(cb_arg, -ENOMEM); 673 return; 674 } 675 676 args = &req->args; 677 args->file = file; 678 args->fn.file_op = cb_fn; 679 args->arg = cb_arg; 680 681 file->name = strdup(name); 682 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 683 } 684 685 static void 686 __fs_create_file_done(void *arg, int fserrno) 687 { 688 struct spdk_fs_request *req = arg; 689 struct spdk_fs_cb_args *args = &req->args; 690 691 args->rc = fserrno; 692 sem_post(args->sem); 693 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 694 } 695 696 static void 697 __fs_create_file(void *arg) 698 { 699 struct spdk_fs_request *req = arg; 700 struct spdk_fs_cb_args *args = &req->args; 701 702 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 703 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 704 } 705 706 int 707 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 708 { 709 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 710 struct spdk_fs_request *req; 711 struct spdk_fs_cb_args *args; 712 int rc; 713 714 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 715 716 req = alloc_fs_request(channel); 717 assert(req != NULL); 718 719 args = &req->args; 720 args->fs = fs; 721 args->op.create.name = name; 722 args->sem = &channel->sem; 723 fs->send_request(__fs_create_file, req); 724 sem_wait(&channel->sem); 725 rc = args->rc; 726 free_fs_request(req); 727 728 return rc; 729 } 730 731 static void 732 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 733 { 734 struct spdk_fs_request *req = ctx; 735 struct spdk_fs_cb_args *args = &req->args; 736 struct spdk_file *f = args->file; 737 738 f->blob = blob; 739 while (!TAILQ_EMPTY(&f->open_requests)) { 740 req = TAILQ_FIRST(&f->open_requests); 741 args = &req->args; 742 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 743 args->fn.file_op_with_handle(args->arg, f, bserrno); 744 free_fs_request(req); 745 } 746 } 747 748 static void 749 fs_open_blob_create_cb(void *ctx, int bserrno) 750 { 751 struct spdk_fs_request *req = ctx; 752 struct spdk_fs_cb_args *args = &req->args; 753 struct spdk_file *file = args->file; 754 struct spdk_filesystem *fs = args->fs; 755 756 if (file == NULL) { 757 /* 758 * This is from an open with CREATE flag - the file 759 * is now created so look it up in the file list for this 760 * filesystem. 761 */ 762 file = fs_find_file(fs, args->op.open.name); 763 assert(file != NULL); 764 args->file = file; 765 } 766 767 file->ref_count++; 768 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 769 if (file->ref_count == 1) { 770 assert(file->blob == NULL); 771 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 772 } else if (file->blob != NULL) { 773 fs_open_blob_done(req, file->blob, 0); 774 } else { 775 /* 776 * The blob open for this file is in progress due to a previous 777 * open request. When that open completes, it will invoke the 778 * open callback for this request. 779 */ 780 } 781 } 782 783 void 784 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 785 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 786 { 787 struct spdk_file *f = NULL; 788 struct spdk_fs_request *req; 789 struct spdk_fs_cb_args *args; 790 791 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 792 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 793 return; 794 } 795 796 f = fs_find_file(fs, name); 797 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 798 cb_fn(cb_arg, NULL, -ENOENT); 799 return; 800 } 801 802 req = alloc_fs_request(fs->md_fs_channel); 803 if (req == NULL) { 804 cb_fn(cb_arg, NULL, -ENOMEM); 805 return; 806 } 807 808 args = &req->args; 809 args->fn.file_op_with_handle = cb_fn; 810 args->arg = cb_arg; 811 args->file = f; 812 args->fs = fs; 813 args->op.open.name = name; 814 815 if (f == NULL) { 816 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 817 } else { 818 fs_open_blob_create_cb(req, 0); 819 } 820 } 821 822 static void 823 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 824 { 825 struct spdk_fs_request *req = arg; 826 struct spdk_fs_cb_args *args = &req->args; 827 828 args->file = file; 829 args->rc = bserrno; 830 sem_post(args->sem); 831 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 832 } 833 834 static void 835 __fs_open_file(void *arg) 836 { 837 struct spdk_fs_request *req = arg; 838 struct spdk_fs_cb_args *args = &req->args; 839 840 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 841 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 842 __fs_open_file_done, req); 843 } 844 845 int 846 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 847 const char *name, uint32_t flags, struct spdk_file **file) 848 { 849 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 850 struct spdk_fs_request *req; 851 struct spdk_fs_cb_args *args; 852 int rc; 853 854 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 855 856 req = alloc_fs_request(channel); 857 assert(req != NULL); 858 859 args = &req->args; 860 args->fs = fs; 861 args->op.open.name = name; 862 args->op.open.flags = flags; 863 args->sem = &channel->sem; 864 fs->send_request(__fs_open_file, req); 865 sem_wait(&channel->sem); 866 rc = args->rc; 867 if (rc == 0) { 868 *file = args->file; 869 } else { 870 *file = NULL; 871 } 872 free_fs_request(req); 873 874 return rc; 875 } 876 877 static void 878 fs_rename_blob_close_cb(void *ctx, int bserrno) 879 { 880 struct spdk_fs_request *req = ctx; 881 struct spdk_fs_cb_args *args = &req->args; 882 883 args->fn.fs_op(args->arg, bserrno); 884 free_fs_request(req); 885 } 886 887 static void 888 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 889 { 890 struct spdk_fs_request *req = ctx; 891 struct spdk_fs_cb_args *args = &req->args; 892 struct spdk_file *f = args->file; 893 const char *new_name = args->op.rename.new_name; 894 895 f->blob = blob; 896 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 897 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 898 } 899 900 static void 901 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 902 { 903 struct spdk_fs_cb_args *args = &req->args; 904 struct spdk_file *f; 905 906 f = fs_find_file(args->fs, args->op.rename.old_name); 907 if (f == NULL) { 908 args->fn.fs_op(args->arg, -ENOENT); 909 free_fs_request(req); 910 return; 911 } 912 913 free(f->name); 914 f->name = strdup(args->op.rename.new_name); 915 args->file = f; 916 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 917 } 918 919 static void 920 fs_rename_delete_done(void *arg, int fserrno) 921 { 922 __spdk_fs_md_rename_file(arg); 923 } 924 925 void 926 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 927 const char *old_name, const char *new_name, 928 spdk_file_op_complete cb_fn, void *cb_arg) 929 { 930 struct spdk_file *f; 931 struct spdk_fs_request *req; 932 struct spdk_fs_cb_args *args; 933 934 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 935 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 936 cb_fn(cb_arg, -ENAMETOOLONG); 937 return; 938 } 939 940 req = alloc_fs_request(fs->md_fs_channel); 941 if (req == NULL) { 942 cb_fn(cb_arg, -ENOMEM); 943 return; 944 } 945 946 args = &req->args; 947 args->fn.fs_op = cb_fn; 948 args->fs = fs; 949 args->arg = cb_arg; 950 args->op.rename.old_name = old_name; 951 args->op.rename.new_name = new_name; 952 953 f = fs_find_file(fs, new_name); 954 if (f == NULL) { 955 __spdk_fs_md_rename_file(req); 956 return; 957 } 958 959 /* 960 * The rename overwrites an existing file. So delete the existing file, then 961 * do the actual rename. 962 */ 963 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 964 } 965 966 static void 967 __fs_rename_file_done(void *arg, int fserrno) 968 { 969 struct spdk_fs_request *req = arg; 970 struct spdk_fs_cb_args *args = &req->args; 971 972 args->rc = fserrno; 973 sem_post(args->sem); 974 } 975 976 static void 977 __fs_rename_file(void *arg) 978 { 979 struct spdk_fs_request *req = arg; 980 struct spdk_fs_cb_args *args = &req->args; 981 982 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 983 __fs_rename_file_done, req); 984 } 985 986 int 987 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 988 const char *old_name, const char *new_name) 989 { 990 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 991 struct spdk_fs_request *req; 992 struct spdk_fs_cb_args *args; 993 int rc; 994 995 req = alloc_fs_request(channel); 996 assert(req != NULL); 997 998 args = &req->args; 999 1000 args->fs = fs; 1001 args->op.rename.old_name = old_name; 1002 args->op.rename.new_name = new_name; 1003 args->sem = &channel->sem; 1004 fs->send_request(__fs_rename_file, req); 1005 sem_wait(&channel->sem); 1006 rc = args->rc; 1007 free_fs_request(req); 1008 return rc; 1009 } 1010 1011 static void 1012 blob_delete_cb(void *ctx, int bserrno) 1013 { 1014 struct spdk_fs_request *req = ctx; 1015 struct spdk_fs_cb_args *args = &req->args; 1016 1017 args->fn.file_op(args->arg, bserrno); 1018 free_fs_request(req); 1019 } 1020 1021 void 1022 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1023 spdk_file_op_complete cb_fn, void *cb_arg) 1024 { 1025 struct spdk_file *f; 1026 spdk_blob_id blobid; 1027 struct spdk_fs_request *req; 1028 struct spdk_fs_cb_args *args; 1029 1030 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1031 1032 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1033 cb_fn(cb_arg, -ENAMETOOLONG); 1034 return; 1035 } 1036 1037 f = fs_find_file(fs, name); 1038 if (f == NULL) { 1039 cb_fn(cb_arg, -ENOENT); 1040 return; 1041 } 1042 1043 if (f->ref_count > 0) { 1044 /* For now, do not allow deleting files with open references. */ 1045 cb_fn(cb_arg, -EBUSY); 1046 return; 1047 } 1048 1049 req = alloc_fs_request(fs->md_fs_channel); 1050 if (req == NULL) { 1051 cb_fn(cb_arg, -ENOMEM); 1052 return; 1053 } 1054 1055 TAILQ_REMOVE(&fs->files, f, tailq); 1056 1057 cache_free_buffers(f); 1058 1059 blobid = f->blobid; 1060 1061 free(f->name); 1062 free(f->tree); 1063 free(f); 1064 1065 args = &req->args; 1066 args->fn.file_op = cb_fn; 1067 args->arg = cb_arg; 1068 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1069 } 1070 1071 static void 1072 __fs_delete_file_done(void *arg, int fserrno) 1073 { 1074 struct spdk_fs_request *req = arg; 1075 struct spdk_fs_cb_args *args = &req->args; 1076 1077 args->rc = fserrno; 1078 sem_post(args->sem); 1079 } 1080 1081 static void 1082 __fs_delete_file(void *arg) 1083 { 1084 struct spdk_fs_request *req = arg; 1085 struct spdk_fs_cb_args *args = &req->args; 1086 1087 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1088 } 1089 1090 int 1091 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1092 const char *name) 1093 { 1094 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1095 struct spdk_fs_request *req; 1096 struct spdk_fs_cb_args *args; 1097 int rc; 1098 1099 req = alloc_fs_request(channel); 1100 assert(req != NULL); 1101 1102 args = &req->args; 1103 args->fs = fs; 1104 args->op.delete.name = name; 1105 args->sem = &channel->sem; 1106 fs->send_request(__fs_delete_file, req); 1107 sem_wait(&channel->sem); 1108 rc = args->rc; 1109 free_fs_request(req); 1110 1111 return rc; 1112 } 1113 1114 spdk_fs_iter 1115 spdk_fs_iter_first(struct spdk_filesystem *fs) 1116 { 1117 struct spdk_file *f; 1118 1119 f = TAILQ_FIRST(&fs->files); 1120 return f; 1121 } 1122 1123 spdk_fs_iter 1124 spdk_fs_iter_next(spdk_fs_iter iter) 1125 { 1126 struct spdk_file *f = iter; 1127 1128 if (f == NULL) { 1129 return NULL; 1130 } 1131 1132 f = TAILQ_NEXT(f, tailq); 1133 return f; 1134 } 1135 1136 const char * 1137 spdk_file_get_name(struct spdk_file *file) 1138 { 1139 return file->name; 1140 } 1141 1142 uint64_t 1143 spdk_file_get_length(struct spdk_file *file) 1144 { 1145 assert(file != NULL); 1146 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1147 return file->length; 1148 } 1149 1150 static void 1151 fs_truncate_complete_cb(void *ctx, int bserrno) 1152 { 1153 struct spdk_fs_request *req = ctx; 1154 struct spdk_fs_cb_args *args = &req->args; 1155 1156 args->fn.file_op(args->arg, bserrno); 1157 free_fs_request(req); 1158 } 1159 1160 static uint64_t 1161 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1162 { 1163 return (length + cluster_sz - 1) / cluster_sz; 1164 } 1165 1166 void 1167 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1168 spdk_file_op_complete cb_fn, void *cb_arg) 1169 { 1170 struct spdk_filesystem *fs; 1171 size_t num_clusters; 1172 struct spdk_fs_request *req; 1173 struct spdk_fs_cb_args *args; 1174 1175 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1176 if (length == file->length) { 1177 cb_fn(cb_arg, 0); 1178 return; 1179 } 1180 1181 req = alloc_fs_request(file->fs->md_fs_channel); 1182 if (req == NULL) { 1183 cb_fn(cb_arg, -ENOMEM); 1184 return; 1185 } 1186 1187 args = &req->args; 1188 args->fn.file_op = cb_fn; 1189 args->arg = cb_arg; 1190 args->file = file; 1191 fs = file->fs; 1192 1193 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1194 1195 spdk_bs_md_resize_blob(file->blob, num_clusters); 1196 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1197 1198 file->length = length; 1199 if (file->append_pos > file->length) { 1200 file->append_pos = file->length; 1201 } 1202 1203 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1204 } 1205 1206 static void 1207 __truncate(void *arg) 1208 { 1209 struct spdk_fs_request *req = arg; 1210 struct spdk_fs_cb_args *args = &req->args; 1211 1212 spdk_file_truncate_async(args->file, args->op.truncate.length, 1213 args->fn.file_op, args->arg); 1214 } 1215 1216 void 1217 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1218 uint64_t length) 1219 { 1220 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1221 struct spdk_fs_request *req; 1222 struct spdk_fs_cb_args *args; 1223 1224 req = alloc_fs_request(channel); 1225 assert(req != NULL); 1226 1227 args = &req->args; 1228 1229 args->file = file; 1230 args->op.truncate.length = length; 1231 args->fn.file_op = __sem_post; 1232 args->arg = &channel->sem; 1233 1234 channel->send_request(__truncate, req); 1235 sem_wait(&channel->sem); 1236 free_fs_request(req); 1237 } 1238 1239 static void 1240 __rw_done(void *ctx, int bserrno) 1241 { 1242 struct spdk_fs_request *req = ctx; 1243 struct spdk_fs_cb_args *args = &req->args; 1244 1245 spdk_free(args->op.rw.pin_buf); 1246 args->fn.file_op(args->arg, bserrno); 1247 free_fs_request(req); 1248 } 1249 1250 static void 1251 __read_done(void *ctx, int bserrno) 1252 { 1253 struct spdk_fs_request *req = ctx; 1254 struct spdk_fs_cb_args *args = &req->args; 1255 1256 if (args->op.rw.is_read) { 1257 memcpy(args->op.rw.user_buf, 1258 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1259 args->op.rw.length); 1260 __rw_done(req, 0); 1261 } else { 1262 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1263 args->op.rw.user_buf, 1264 args->op.rw.length); 1265 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1266 args->op.rw.pin_buf, 1267 args->op.rw.start_page, args->op.rw.num_pages, 1268 __rw_done, req); 1269 } 1270 } 1271 1272 static void 1273 __do_blob_read(void *ctx, int fserrno) 1274 { 1275 struct spdk_fs_request *req = ctx; 1276 struct spdk_fs_cb_args *args = &req->args; 1277 1278 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1279 args->op.rw.pin_buf, 1280 args->op.rw.start_page, args->op.rw.num_pages, 1281 __read_done, req); 1282 } 1283 1284 static void 1285 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1286 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1287 { 1288 uint64_t end_page; 1289 1290 *page_size = spdk_bs_get_page_size(file->fs->bs); 1291 *start_page = offset / *page_size; 1292 end_page = (offset + length - 1) / *page_size; 1293 *num_pages = (end_page - *start_page + 1); 1294 } 1295 1296 static void 1297 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1298 void *payload, uint64_t offset, uint64_t length, 1299 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1300 { 1301 struct spdk_fs_request *req; 1302 struct spdk_fs_cb_args *args; 1303 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1304 uint64_t start_page, num_pages, pin_buf_length; 1305 uint32_t page_size; 1306 1307 if (is_read && offset + length > file->length) { 1308 cb_fn(cb_arg, -EINVAL); 1309 return; 1310 } 1311 1312 req = alloc_fs_request(channel); 1313 if (req == NULL) { 1314 cb_fn(cb_arg, -ENOMEM); 1315 return; 1316 } 1317 1318 args = &req->args; 1319 args->fn.file_op = cb_fn; 1320 args->arg = cb_arg; 1321 args->file = file; 1322 args->op.rw.channel = channel->bs_channel; 1323 args->op.rw.user_buf = payload; 1324 args->op.rw.is_read = is_read; 1325 args->op.rw.offset = offset; 1326 args->op.rw.length = length; 1327 1328 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1329 pin_buf_length = num_pages * page_size; 1330 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, 4096, NULL); 1331 1332 args->op.rw.start_page = start_page; 1333 args->op.rw.num_pages = num_pages; 1334 1335 if (!is_read && file->length < offset + length) { 1336 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1337 } else { 1338 __do_blob_read(req, 0); 1339 } 1340 } 1341 1342 void 1343 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1344 void *payload, uint64_t offset, uint64_t length, 1345 spdk_file_op_complete cb_fn, void *cb_arg) 1346 { 1347 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1348 } 1349 1350 void 1351 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1352 void *payload, uint64_t offset, uint64_t length, 1353 spdk_file_op_complete cb_fn, void *cb_arg) 1354 { 1355 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1356 file->name, offset, length); 1357 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1358 } 1359 1360 struct spdk_io_channel * 1361 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs, uint32_t priority) 1362 { 1363 struct spdk_io_channel *io_channel; 1364 struct spdk_fs_channel *fs_channel; 1365 1366 io_channel = spdk_get_io_channel(fs, priority, true, NULL); 1367 fs_channel = spdk_io_channel_get_ctx(io_channel); 1368 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT); 1369 fs_channel->send_request = __send_request_direct; 1370 1371 return io_channel; 1372 } 1373 1374 struct spdk_io_channel * 1375 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs, uint32_t priority) 1376 { 1377 struct spdk_io_channel *io_channel; 1378 struct spdk_fs_channel *fs_channel; 1379 1380 io_channel = spdk_get_io_channel(fs, priority, true, NULL); 1381 fs_channel = spdk_io_channel_get_ctx(io_channel); 1382 fs_channel->send_request = fs->send_request; 1383 1384 return io_channel; 1385 } 1386 1387 void 1388 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1389 { 1390 spdk_put_io_channel(channel); 1391 } 1392 1393 void 1394 spdk_fs_set_cache_size(uint64_t size_in_mb) 1395 { 1396 g_fs_cache_size = size_in_mb * 1024 * 1024; 1397 } 1398 1399 uint64_t 1400 spdk_fs_get_cache_size(void) 1401 { 1402 return g_fs_cache_size / (1024 * 1024); 1403 } 1404 1405 static void __file_flush(void *_args); 1406 1407 static void * 1408 alloc_cache_memory_buffer(struct spdk_file *context) 1409 { 1410 struct spdk_file *file; 1411 void *buf; 1412 1413 buf = spdk_mempool_get(g_cache_pool); 1414 if (buf != NULL) { 1415 return buf; 1416 } 1417 1418 pthread_spin_lock(&g_caches_lock); 1419 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1420 if (!file->open_for_writing && 1421 file->priority == SPDK_FILE_PRIORITY_LOW && 1422 file != context) { 1423 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1424 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1425 break; 1426 } 1427 } 1428 pthread_spin_unlock(&g_caches_lock); 1429 if (file != NULL) { 1430 cache_free_buffers(file); 1431 buf = spdk_mempool_get(g_cache_pool); 1432 if (buf != NULL) { 1433 return buf; 1434 } 1435 } 1436 1437 pthread_spin_lock(&g_caches_lock); 1438 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1439 if (!file->open_for_writing && file != context) { 1440 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1441 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1442 break; 1443 } 1444 } 1445 pthread_spin_unlock(&g_caches_lock); 1446 if (file != NULL) { 1447 cache_free_buffers(file); 1448 buf = spdk_mempool_get(g_cache_pool); 1449 if (buf != NULL) { 1450 return buf; 1451 } 1452 } 1453 1454 pthread_spin_lock(&g_caches_lock); 1455 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1456 if (file != context) { 1457 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1458 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1459 break; 1460 } 1461 } 1462 pthread_spin_unlock(&g_caches_lock); 1463 if (file != NULL) { 1464 cache_free_buffers(file); 1465 buf = spdk_mempool_get(g_cache_pool); 1466 if (buf != NULL) { 1467 return buf; 1468 } 1469 } 1470 1471 assert(false); 1472 return NULL; 1473 } 1474 1475 static struct cache_buffer * 1476 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1477 { 1478 struct cache_buffer *buf; 1479 int count = 0; 1480 1481 buf = calloc(1, sizeof(*buf)); 1482 if (buf == NULL) { 1483 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1484 return NULL; 1485 } 1486 1487 buf->buf = alloc_cache_memory_buffer(file); 1488 if (buf->buf == NULL) { 1489 while (buf->buf == NULL) { 1490 /* 1491 * TODO: alloc_cache_memory_buffer() should eventually free 1492 * some buffers. Need a more sophisticated check here, instead 1493 * of just bailing if 100 tries does not result in getting a 1494 * free buffer. This will involve using the sync channel's 1495 * semaphore to block until a buffer becomes available. 1496 */ 1497 if (count++ == 100) { 1498 SPDK_ERRLOG("could not allocate cache buffer\n"); 1499 assert(false); 1500 free(buf); 1501 return NULL; 1502 } 1503 buf->buf = alloc_cache_memory_buffer(file); 1504 } 1505 } 1506 1507 buf->buf_size = CACHE_BUFFER_SIZE; 1508 buf->offset = offset; 1509 1510 pthread_spin_lock(&g_caches_lock); 1511 if (file->tree->present_mask == 0) { 1512 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1513 } 1514 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1515 pthread_spin_unlock(&g_caches_lock); 1516 1517 return buf; 1518 } 1519 1520 static struct cache_buffer * 1521 cache_append_buffer(struct spdk_file *file) 1522 { 1523 struct cache_buffer *last; 1524 1525 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1526 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1527 1528 last = cache_insert_buffer(file, file->append_pos); 1529 if (last == NULL) { 1530 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1531 return NULL; 1532 } 1533 1534 if (file->last != NULL) { 1535 file->last->next = last; 1536 } 1537 file->last = last; 1538 1539 return last; 1540 } 1541 1542 static void 1543 __wake_caller(struct spdk_fs_cb_args *args) 1544 { 1545 sem_post(args->sem); 1546 } 1547 1548 static void 1549 __file_cache_finish_sync(struct spdk_file *file) 1550 { 1551 struct spdk_fs_request *sync_req; 1552 struct spdk_fs_cb_args *sync_args; 1553 1554 pthread_spin_lock(&file->lock); 1555 while (!TAILQ_EMPTY(&file->sync_requests)) { 1556 sync_req = TAILQ_FIRST(&file->sync_requests); 1557 sync_args = &sync_req->args; 1558 if (sync_args->op.sync.offset > file->length_flushed) { 1559 break; 1560 } 1561 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1562 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1563 pthread_spin_unlock(&file->lock); 1564 sync_args->fn.file_op(sync_args->arg, 0); 1565 pthread_spin_lock(&file->lock); 1566 free_fs_request(sync_req); 1567 } 1568 pthread_spin_unlock(&file->lock); 1569 } 1570 1571 static void 1572 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1573 { 1574 struct spdk_file *file = ctx; 1575 1576 __file_cache_finish_sync(file); 1577 } 1578 1579 static void 1580 __free_args(struct spdk_fs_cb_args *args) 1581 { 1582 struct spdk_fs_request *req; 1583 1584 if (!args->from_request) { 1585 free(args); 1586 } else { 1587 /* Depends on args being at the start of the spdk_fs_request structure. */ 1588 req = (struct spdk_fs_request *)args; 1589 free_fs_request(req); 1590 } 1591 } 1592 1593 static void 1594 __file_flush_done(void *arg, int bserrno) 1595 { 1596 struct spdk_fs_cb_args *args = arg; 1597 struct spdk_fs_request *sync_req; 1598 struct spdk_file *file = args->file; 1599 struct cache_buffer *next = args->op.flush.cache_buffer; 1600 1601 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1602 1603 pthread_spin_lock(&file->lock); 1604 next->in_progress = false; 1605 next->bytes_flushed += args->op.flush.length; 1606 file->length_flushed += args->op.flush.length; 1607 if (file->length_flushed > file->length) { 1608 file->length = file->length_flushed; 1609 } 1610 if (next->bytes_flushed == next->buf_size) { 1611 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1612 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1613 } 1614 1615 TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) { 1616 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1617 break; 1618 } 1619 } 1620 1621 /* 1622 * Assert that there is no cached data that extends past the end of the underlying 1623 * blob. 1624 */ 1625 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1626 next->bytes_filled == 0); 1627 1628 if (sync_req != NULL) { 1629 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1630 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1631 sizeof(file->length_flushed)); 1632 1633 pthread_spin_unlock(&file->lock); 1634 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1635 } else { 1636 pthread_spin_unlock(&file->lock); 1637 __file_cache_finish_sync(file); 1638 } 1639 1640 __file_flush(args); 1641 } 1642 1643 static void 1644 __file_flush(void *_args) 1645 { 1646 struct spdk_fs_cb_args *args = _args; 1647 struct spdk_file *file = args->file; 1648 struct cache_buffer *next; 1649 uint64_t offset, length, start_page, num_pages; 1650 uint32_t page_size; 1651 1652 pthread_spin_lock(&file->lock); 1653 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1654 if (next == NULL || next->in_progress) { 1655 /* 1656 * There is either no data to flush, or a flush I/O is already in 1657 * progress. So return immediately - if a flush I/O is in 1658 * progress we will flush more data after that is completed. 1659 */ 1660 __free_args(args); 1661 pthread_spin_unlock(&file->lock); 1662 return; 1663 } 1664 1665 offset = next->offset + next->bytes_flushed; 1666 length = next->bytes_filled - next->bytes_flushed; 1667 if (length == 0) { 1668 __free_args(args); 1669 pthread_spin_unlock(&file->lock); 1670 return; 1671 } 1672 args->op.flush.length = length; 1673 args->op.flush.cache_buffer = next; 1674 1675 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1676 1677 next->in_progress = true; 1678 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1679 offset, length, start_page, num_pages); 1680 pthread_spin_unlock(&file->lock); 1681 spdk_bs_io_write_blob(file->blob, file->fs->sync_fs_channel->bs_channel, 1682 next->buf + (start_page * page_size) - next->offset, 1683 start_page, num_pages, 1684 __file_flush_done, args); 1685 } 1686 1687 static void 1688 __file_extend_done(void *arg, int bserrno) 1689 { 1690 struct spdk_fs_cb_args *args = arg; 1691 1692 __wake_caller(args); 1693 } 1694 1695 static void 1696 __file_extend_blob(void *_args) 1697 { 1698 struct spdk_fs_cb_args *args = _args; 1699 struct spdk_file *file = args->file; 1700 1701 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1702 1703 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1704 } 1705 1706 static void 1707 __rw_from_file_done(void *arg, int bserrno) 1708 { 1709 struct spdk_fs_cb_args *args = arg; 1710 1711 __wake_caller(args); 1712 __free_args(args); 1713 } 1714 1715 static void 1716 __rw_from_file(void *_args) 1717 { 1718 struct spdk_fs_cb_args *args = _args; 1719 struct spdk_file *file = args->file; 1720 1721 if (args->op.rw.is_read) { 1722 spdk_file_read_async(file, file->fs->sync_io_channel, args->op.rw.user_buf, 1723 args->op.rw.offset, args->op.rw.length, 1724 __rw_from_file_done, args); 1725 } else { 1726 spdk_file_write_async(file, file->fs->sync_io_channel, args->op.rw.user_buf, 1727 args->op.rw.offset, args->op.rw.length, 1728 __rw_from_file_done, args); 1729 } 1730 } 1731 1732 static int 1733 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1734 uint64_t offset, uint64_t length, bool is_read) 1735 { 1736 struct spdk_fs_cb_args *args; 1737 1738 args = calloc(1, sizeof(*args)); 1739 if (args == NULL) { 1740 sem_post(sem); 1741 return -ENOMEM; 1742 } 1743 1744 args->file = file; 1745 args->sem = sem; 1746 args->op.rw.user_buf = payload; 1747 args->op.rw.offset = offset; 1748 args->op.rw.length = length; 1749 args->op.rw.is_read = is_read; 1750 file->fs->send_request(__rw_from_file, args); 1751 return 0; 1752 } 1753 1754 int 1755 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1756 void *payload, uint64_t offset, uint64_t length) 1757 { 1758 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1759 struct spdk_fs_cb_args *args; 1760 uint64_t rem_length, copy, blob_size, cluster_sz; 1761 uint32_t cache_buffers_filled = 0; 1762 uint8_t *cur_payload; 1763 struct cache_buffer *last; 1764 1765 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1766 1767 if (length == 0) { 1768 return 0; 1769 } 1770 1771 if (offset != file->append_pos) { 1772 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1773 return -EINVAL; 1774 } 1775 1776 pthread_spin_lock(&file->lock); 1777 file->open_for_writing = true; 1778 1779 if (file->last == NULL) { 1780 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1781 cache_append_buffer(file); 1782 } else { 1783 int rc; 1784 1785 file->append_pos += length; 1786 rc = __send_rw_from_file(file, &channel->sem, payload, 1787 offset, length, false); 1788 pthread_spin_unlock(&file->lock); 1789 sem_wait(&channel->sem); 1790 return rc; 1791 } 1792 } 1793 1794 blob_size = __file_get_blob_size(file); 1795 1796 if ((offset + length) > blob_size) { 1797 struct spdk_fs_cb_args extend_args = {}; 1798 1799 cluster_sz = file->fs->bs_opts.cluster_sz; 1800 extend_args.sem = &channel->sem; 1801 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1802 extend_args.file = file; 1803 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1804 pthread_spin_unlock(&file->lock); 1805 file->fs->send_request(__file_extend_blob, &extend_args); 1806 sem_wait(&channel->sem); 1807 } 1808 1809 last = file->last; 1810 rem_length = length; 1811 cur_payload = payload; 1812 while (rem_length > 0) { 1813 copy = last->buf_size - last->bytes_filled; 1814 if (copy > rem_length) { 1815 copy = rem_length; 1816 } 1817 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 1818 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 1819 file->append_pos += copy; 1820 if (file->length < file->append_pos) { 1821 file->length = file->append_pos; 1822 } 1823 cur_payload += copy; 1824 last->bytes_filled += copy; 1825 rem_length -= copy; 1826 if (last->bytes_filled == last->buf_size) { 1827 cache_buffers_filled++; 1828 last = cache_append_buffer(file); 1829 if (last == NULL) { 1830 BLOBFS_TRACE(file, "nomem\n"); 1831 pthread_spin_unlock(&file->lock); 1832 return -ENOMEM; 1833 } 1834 } 1835 } 1836 1837 if (cache_buffers_filled == 0) { 1838 pthread_spin_unlock(&file->lock); 1839 return 0; 1840 } 1841 1842 args = calloc(1, sizeof(*args)); 1843 if (args == NULL) { 1844 pthread_spin_unlock(&file->lock); 1845 return -ENOMEM; 1846 } 1847 1848 args->file = file; 1849 file->fs->send_request(__file_flush, args); 1850 pthread_spin_unlock(&file->lock); 1851 return 0; 1852 } 1853 1854 static void 1855 __readahead_done(void *arg, int bserrno) 1856 { 1857 struct spdk_fs_cb_args *args = arg; 1858 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 1859 struct spdk_file *file = args->file; 1860 1861 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 1862 1863 pthread_spin_lock(&file->lock); 1864 cache_buffer->bytes_filled = args->op.readahead.length; 1865 cache_buffer->bytes_flushed = args->op.readahead.length; 1866 cache_buffer->in_progress = false; 1867 pthread_spin_unlock(&file->lock); 1868 1869 __free_args(args); 1870 } 1871 1872 static void 1873 __readahead(void *_args) 1874 { 1875 struct spdk_fs_cb_args *args = _args; 1876 struct spdk_file *file = args->file; 1877 uint64_t offset, length, start_page, num_pages; 1878 uint32_t page_size; 1879 1880 offset = args->op.readahead.offset; 1881 length = args->op.readahead.length; 1882 assert(length > 0); 1883 1884 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1885 1886 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1887 offset, length, start_page, num_pages); 1888 spdk_bs_io_read_blob(file->blob, file->fs->sync_fs_channel->bs_channel, 1889 args->op.readahead.cache_buffer->buf, 1890 start_page, num_pages, 1891 __readahead_done, args); 1892 } 1893 1894 static uint64_t 1895 __next_cache_buffer_offset(uint64_t offset) 1896 { 1897 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 1898 } 1899 1900 static void 1901 check_readahead(struct spdk_file *file, uint64_t offset) 1902 { 1903 struct spdk_fs_cb_args *args; 1904 1905 offset = __next_cache_buffer_offset(offset); 1906 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 1907 return; 1908 } 1909 1910 args = calloc(1, sizeof(*args)); 1911 if (args == NULL) { 1912 return; 1913 } 1914 1915 BLOBFS_TRACE(file, "offset=%jx\n", offset); 1916 1917 args->file = file; 1918 args->op.readahead.offset = offset; 1919 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 1920 args->op.readahead.cache_buffer->in_progress = true; 1921 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 1922 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 1923 } else { 1924 args->op.readahead.length = CACHE_BUFFER_SIZE; 1925 } 1926 file->fs->send_request(__readahead, args); 1927 } 1928 1929 static int 1930 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 1931 { 1932 struct cache_buffer *buf; 1933 1934 buf = spdk_tree_find_filled_buffer(file->tree, offset); 1935 if (buf == NULL) { 1936 return __send_rw_from_file(file, sem, payload, offset, length, true); 1937 } 1938 1939 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 1940 length = buf->offset + buf->bytes_filled - offset; 1941 } 1942 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 1943 memcpy(payload, &buf->buf[offset - buf->offset], length); 1944 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 1945 pthread_spin_lock(&g_caches_lock); 1946 spdk_tree_remove_buffer(file->tree, buf); 1947 if (file->tree->present_mask == 0) { 1948 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1949 } 1950 pthread_spin_unlock(&g_caches_lock); 1951 } 1952 1953 sem_post(sem); 1954 return 0; 1955 } 1956 1957 int64_t 1958 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 1959 void *payload, uint64_t offset, uint64_t length) 1960 { 1961 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1962 uint64_t final_offset, final_length; 1963 uint32_t sub_reads = 0; 1964 int rc = 0; 1965 1966 pthread_spin_lock(&file->lock); 1967 1968 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 1969 1970 file->open_for_writing = false; 1971 1972 if (length == 0 || offset >= file->length) { 1973 pthread_spin_unlock(&file->lock); 1974 return 0; 1975 } 1976 1977 if (offset + length > file->length) { 1978 length = file->length - offset; 1979 } 1980 1981 if (offset != file->next_seq_offset) { 1982 file->seq_byte_count = 0; 1983 } 1984 file->seq_byte_count += length; 1985 file->next_seq_offset = offset + length; 1986 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 1987 check_readahead(file, offset); 1988 check_readahead(file, offset + CACHE_BUFFER_SIZE); 1989 } 1990 1991 final_length = 0; 1992 final_offset = offset + length; 1993 while (offset < final_offset) { 1994 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 1995 if (length > (final_offset - offset)) { 1996 length = final_offset - offset; 1997 } 1998 rc = __file_read(file, payload, offset, length, &channel->sem); 1999 if (rc == 0) { 2000 final_length += length; 2001 } else { 2002 break; 2003 } 2004 payload += length; 2005 offset += length; 2006 sub_reads++; 2007 } 2008 pthread_spin_unlock(&file->lock); 2009 while (sub_reads-- > 0) { 2010 sem_wait(&channel->sem); 2011 } 2012 if (rc == 0) { 2013 return final_length; 2014 } else { 2015 return rc; 2016 } 2017 } 2018 2019 static void 2020 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2021 spdk_file_op_complete cb_fn, void *cb_arg) 2022 { 2023 struct spdk_fs_request *sync_req; 2024 struct spdk_fs_request *flush_req; 2025 struct spdk_fs_cb_args *sync_args; 2026 struct spdk_fs_cb_args *flush_args; 2027 2028 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2029 2030 pthread_spin_lock(&file->lock); 2031 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2032 BLOBFS_TRACE(file, "done - no data to flush\n"); 2033 pthread_spin_unlock(&file->lock); 2034 cb_fn(cb_arg, 0); 2035 return; 2036 } 2037 2038 sync_req = alloc_fs_request(channel); 2039 assert(sync_req != NULL); 2040 sync_args = &sync_req->args; 2041 2042 flush_req = alloc_fs_request(channel); 2043 assert(flush_req != NULL); 2044 flush_args = &flush_req->args; 2045 2046 sync_args->file = file; 2047 sync_args->fn.file_op = cb_fn; 2048 sync_args->arg = cb_arg; 2049 sync_args->op.sync.offset = file->append_pos; 2050 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2051 pthread_spin_unlock(&file->lock); 2052 2053 flush_args->file = file; 2054 channel->send_request(__file_flush, flush_args); 2055 } 2056 2057 int 2058 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2059 { 2060 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2061 2062 _file_sync(file, channel, __sem_post, &channel->sem); 2063 sem_wait(&channel->sem); 2064 2065 return 0; 2066 } 2067 2068 void 2069 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2070 spdk_file_op_complete cb_fn, void *cb_arg) 2071 { 2072 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2073 2074 _file_sync(file, channel, cb_fn, cb_arg); 2075 } 2076 2077 void 2078 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2079 { 2080 BLOBFS_TRACE(file, "priority=%u\n", priority); 2081 file->priority = priority; 2082 2083 } 2084 2085 /* 2086 * Close routines 2087 */ 2088 2089 static void 2090 __file_close_async_done(void *ctx, int bserrno) 2091 { 2092 struct spdk_fs_request *req = ctx; 2093 struct spdk_fs_cb_args *args = &req->args; 2094 2095 args->fn.file_op(args->arg, bserrno); 2096 free_fs_request(req); 2097 } 2098 2099 static void 2100 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2101 { 2102 pthread_spin_lock(&file->lock); 2103 if (file->ref_count == 0) { 2104 pthread_spin_unlock(&file->lock); 2105 __file_close_async_done(req, -EBADF); 2106 return; 2107 } 2108 2109 file->ref_count--; 2110 if (file->ref_count > 0) { 2111 pthread_spin_unlock(&file->lock); 2112 __file_close_async_done(req, 0); 2113 return; 2114 } 2115 2116 pthread_spin_unlock(&file->lock); 2117 2118 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2119 } 2120 2121 static void 2122 __file_close_async__sync_done(void *arg, int fserrno) 2123 { 2124 struct spdk_fs_request *req = arg; 2125 struct spdk_fs_cb_args *args = &req->args; 2126 2127 __file_close_async(args->file, req); 2128 } 2129 2130 void 2131 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2132 { 2133 struct spdk_fs_request *req; 2134 struct spdk_fs_cb_args *args; 2135 2136 req = alloc_fs_request(file->fs->md_fs_channel); 2137 if (req == NULL) { 2138 cb_fn(cb_arg, -ENOMEM); 2139 return; 2140 } 2141 2142 args = &req->args; 2143 args->file = file; 2144 args->fn.file_op = cb_fn; 2145 args->arg = cb_arg; 2146 2147 spdk_file_sync_async(file, file->fs->md_io_channel, __file_close_async__sync_done, req); 2148 } 2149 2150 static void 2151 __file_close_done(void *arg, int fserrno) 2152 { 2153 struct spdk_fs_cb_args *args = arg; 2154 2155 args->rc = fserrno; 2156 sem_post(args->sem); 2157 } 2158 2159 static void 2160 __file_close(void *arg) 2161 { 2162 struct spdk_fs_request *req = arg; 2163 struct spdk_fs_cb_args *args = &req->args; 2164 struct spdk_file *file = args->file; 2165 2166 __file_close_async(file, req); 2167 } 2168 2169 int 2170 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2171 { 2172 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2173 struct spdk_fs_request *req; 2174 struct spdk_fs_cb_args *args; 2175 2176 req = alloc_fs_request(channel); 2177 assert(req != NULL); 2178 2179 args = &req->args; 2180 2181 spdk_file_sync(file, _channel); 2182 BLOBFS_TRACE(file, "name=%s\n", file->name); 2183 args->file = file; 2184 args->sem = &channel->sem; 2185 args->fn.file_op = __file_close_done; 2186 args->arg = req; 2187 channel->send_request(__file_close, req); 2188 sem_wait(&channel->sem); 2189 2190 return args->rc; 2191 } 2192 2193 static void 2194 cache_free_buffers(struct spdk_file *file) 2195 { 2196 BLOBFS_TRACE(file, "free=%s\n", file->name); 2197 pthread_spin_lock(&file->lock); 2198 pthread_spin_lock(&g_caches_lock); 2199 if (file->tree->present_mask == 0) { 2200 pthread_spin_unlock(&g_caches_lock); 2201 pthread_spin_unlock(&file->lock); 2202 return; 2203 } 2204 spdk_tree_free_buffers(file->tree); 2205 if (file->tree->present_mask == 0) { 2206 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2207 } 2208 file->last = NULL; 2209 pthread_spin_unlock(&g_caches_lock); 2210 pthread_spin_unlock(&file->lock); 2211 } 2212 2213 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2214 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2215