1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk_internal/log.h" 44 45 #define BLOBFS_TRACE(file, str, args...) \ 46 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 47 48 #define BLOBFS_TRACE_RW(file, str, args...) \ 49 SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 50 51 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 52 53 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 54 static struct spdk_mempool *g_cache_pool; 55 static TAILQ_HEAD(, spdk_file) g_caches; 56 static pthread_spinlock_t g_caches_lock; 57 58 static void 59 __sem_post(void *arg, int bserrno) 60 { 61 sem_t *sem = arg; 62 63 sem_post(sem); 64 } 65 66 void 67 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 68 { 69 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 70 free(cache_buffer); 71 } 72 73 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 74 75 struct spdk_file { 76 struct spdk_filesystem *fs; 77 struct spdk_blob *blob; 78 char *name; 79 uint64_t length; 80 bool open_for_writing; 81 uint64_t length_flushed; 82 uint64_t append_pos; 83 uint64_t seq_byte_count; 84 uint64_t next_seq_offset; 85 uint32_t priority; 86 TAILQ_ENTRY(spdk_file) tailq; 87 spdk_blob_id blobid; 88 uint32_t ref_count; 89 pthread_spinlock_t lock; 90 struct cache_buffer *last; 91 struct cache_tree *tree; 92 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 93 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 94 TAILQ_ENTRY(spdk_file) cache_tailq; 95 }; 96 97 struct spdk_filesystem { 98 struct spdk_blob_store *bs; 99 TAILQ_HEAD(, spdk_file) files; 100 struct spdk_bs_opts bs_opts; 101 struct spdk_bs_dev *bdev; 102 fs_send_request_fn send_request; 103 struct spdk_io_channel *sync_io_channel; 104 struct spdk_fs_channel *sync_fs_channel; 105 struct spdk_io_channel *md_io_channel; 106 struct spdk_fs_channel *md_fs_channel; 107 }; 108 109 struct spdk_fs_cb_args { 110 union { 111 spdk_fs_op_with_handle_complete fs_op_with_handle; 112 spdk_fs_op_complete fs_op; 113 spdk_file_op_with_handle_complete file_op_with_handle; 114 spdk_file_op_complete file_op; 115 spdk_file_stat_op_complete stat_op; 116 } fn; 117 void *arg; 118 sem_t *sem; 119 struct spdk_filesystem *fs; 120 struct spdk_file *file; 121 int rc; 122 bool from_request; 123 union { 124 struct { 125 uint64_t length; 126 } truncate; 127 struct { 128 struct spdk_io_channel *channel; 129 void *user_buf; 130 void *pin_buf; 131 int is_read; 132 off_t offset; 133 size_t length; 134 uint64_t start_page; 135 uint64_t num_pages; 136 uint32_t blocklen; 137 } rw; 138 struct { 139 const char *old_name; 140 const char *new_name; 141 } rename; 142 struct { 143 struct cache_buffer *cache_buffer; 144 uint64_t length; 145 } flush; 146 struct { 147 struct cache_buffer *cache_buffer; 148 uint64_t length; 149 uint64_t offset; 150 } readahead; 151 struct { 152 uint64_t offset; 153 TAILQ_ENTRY(spdk_fs_request) tailq; 154 } sync; 155 struct { 156 uint32_t num_clusters; 157 } resize; 158 struct { 159 const char *name; 160 uint32_t flags; 161 TAILQ_ENTRY(spdk_fs_request) tailq; 162 } open; 163 struct { 164 const char *name; 165 } create; 166 struct { 167 const char *name; 168 } delete; 169 struct { 170 const char *name; 171 } stat; 172 } op; 173 }; 174 175 static void cache_free_buffers(struct spdk_file *file); 176 177 static void 178 __initialize_cache(void) 179 { 180 if (g_cache_pool != NULL) { 181 return; 182 } 183 184 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 185 g_fs_cache_size / CACHE_BUFFER_SIZE, 186 CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY); 187 TAILQ_INIT(&g_caches); 188 pthread_spin_init(&g_caches_lock, 0); 189 } 190 191 static uint64_t 192 __file_get_blob_size(struct spdk_file *file) 193 { 194 uint64_t cluster_sz; 195 196 cluster_sz = file->fs->bs_opts.cluster_sz; 197 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 198 } 199 200 struct spdk_fs_request { 201 struct spdk_fs_cb_args args; 202 TAILQ_ENTRY(spdk_fs_request) link; 203 struct spdk_fs_channel *channel; 204 }; 205 206 struct spdk_fs_channel { 207 struct spdk_fs_request *req_mem; 208 TAILQ_HEAD(, spdk_fs_request) reqs; 209 sem_t sem; 210 struct spdk_filesystem *fs; 211 struct spdk_io_channel *bs_channel; 212 fs_send_request_fn send_request; 213 }; 214 215 static struct spdk_fs_request * 216 alloc_fs_request(struct spdk_fs_channel *channel) 217 { 218 struct spdk_fs_request *req; 219 220 req = TAILQ_FIRST(&channel->reqs); 221 if (!req) { 222 return NULL; 223 } 224 TAILQ_REMOVE(&channel->reqs, req, link); 225 memset(req, 0, sizeof(*req)); 226 req->channel = channel; 227 req->args.from_request = true; 228 229 return req; 230 } 231 232 static void 233 free_fs_request(struct spdk_fs_request *req) 234 { 235 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 236 } 237 238 static int 239 _spdk_fs_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 240 { 241 struct spdk_filesystem *fs = io_device; 242 struct spdk_fs_channel *channel = ctx_buf; 243 uint32_t max_ops = *(uint32_t *)unique_ctx; 244 uint32_t i; 245 246 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 247 if (!channel->req_mem) { 248 free(channel); 249 return -1; 250 } 251 252 TAILQ_INIT(&channel->reqs); 253 sem_init(&channel->sem, 0, 0); 254 255 for (i = 0; i < max_ops; i++) { 256 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 257 } 258 259 channel->fs = fs; 260 261 return 0; 262 } 263 264 static void 265 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 266 { 267 struct spdk_fs_channel *channel = ctx_buf; 268 269 free(channel->req_mem); 270 if (channel->bs_channel != NULL) { 271 spdk_bs_free_io_channel(channel->bs_channel); 272 } 273 } 274 275 static void 276 __send_request_direct(fs_request_fn fn, void *arg) 277 { 278 fn(arg); 279 } 280 281 static void 282 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 283 { 284 fs->bs = bs; 285 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 286 fs->md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT, 512); 287 fs->md_fs_channel->send_request = __send_request_direct; 288 fs->sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT, 512); 289 fs->sync_fs_channel->send_request = __send_request_direct; 290 } 291 292 static void 293 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 294 { 295 struct spdk_fs_request *req = ctx; 296 struct spdk_fs_cb_args *args = &req->args; 297 struct spdk_filesystem *fs = args->fs; 298 299 if (bserrno == 0) { 300 common_fs_bs_init(fs, bs); 301 } else { 302 free(fs); 303 fs = NULL; 304 } 305 306 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 307 free_fs_request(req); 308 } 309 310 static struct spdk_filesystem * 311 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 312 { 313 struct spdk_filesystem *fs; 314 uint32_t max_ops = 512; 315 316 fs = calloc(1, sizeof(*fs)); 317 if (fs == NULL) { 318 return NULL; 319 } 320 321 fs->bdev = dev; 322 fs->send_request = send_request_fn; 323 TAILQ_INIT(&fs->files); 324 spdk_io_device_register(fs, _spdk_fs_channel_create, _spdk_fs_channel_destroy, 325 sizeof(struct spdk_fs_channel)); 326 327 fs->md_io_channel = spdk_get_io_channel(fs, SPDK_IO_PRIORITY_DEFAULT, true, (void *)&max_ops); 328 fs->md_fs_channel = spdk_io_channel_get_ctx(fs->md_io_channel); 329 330 fs->sync_io_channel = spdk_get_io_channel(fs, SPDK_IO_PRIORITY_DEFAULT, true, (void *)&max_ops); 331 fs->sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_io_channel); 332 333 __initialize_cache(); 334 335 return fs; 336 } 337 338 void 339 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 340 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 341 { 342 struct spdk_filesystem *fs; 343 struct spdk_fs_request *req; 344 struct spdk_fs_cb_args *args; 345 346 fs = fs_alloc(dev, send_request_fn); 347 if (fs == NULL) { 348 cb_fn(cb_arg, NULL, -ENOMEM); 349 return; 350 } 351 352 req = alloc_fs_request(fs->md_fs_channel); 353 if (req == NULL) { 354 cb_fn(cb_arg, NULL, -ENOMEM); 355 return; 356 } 357 358 args = &req->args; 359 args->fn.fs_op_with_handle = cb_fn; 360 args->arg = cb_arg; 361 args->fs = fs; 362 363 spdk_bs_init(dev, NULL, init_cb, req); 364 } 365 366 static struct spdk_file * 367 file_alloc(struct spdk_filesystem *fs) 368 { 369 struct spdk_file *file; 370 371 file = calloc(1, sizeof(*file)); 372 if (file == NULL) { 373 return NULL; 374 } 375 376 file->fs = fs; 377 TAILQ_INIT(&file->open_requests); 378 TAILQ_INIT(&file->sync_requests); 379 pthread_spin_init(&file->lock, 0); 380 file->tree = calloc(1, sizeof(*file->tree)); 381 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 382 file->priority = SPDK_FILE_PRIORITY_LOW; 383 return file; 384 } 385 386 static void 387 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 388 { 389 struct spdk_fs_request *req = ctx; 390 struct spdk_fs_cb_args *args = &req->args; 391 struct spdk_filesystem *fs = args->fs; 392 struct spdk_file *f; 393 uint64_t *length; 394 const char *name; 395 size_t value_len; 396 397 if (rc == -ENOENT) { 398 /* Finished iterating */ 399 args->fn.fs_op_with_handle(args->arg, fs, 0); 400 free_fs_request(req); 401 return; 402 } else if (rc < 0) { 403 args->fn.fs_op_with_handle(args->arg, fs, rc); 404 free_fs_request(req); 405 return; 406 } 407 408 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 409 if (rc < 0) { 410 args->fn.fs_op_with_handle(args->arg, fs, rc); 411 free_fs_request(req); 412 return; 413 } 414 415 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 416 if (rc < 0) { 417 args->fn.fs_op_with_handle(args->arg, fs, rc); 418 free_fs_request(req); 419 return; 420 } 421 assert(value_len == 8); 422 423 f = file_alloc(fs); 424 if (f == NULL) { 425 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 426 free_fs_request(req); 427 return; 428 } 429 430 f->name = strdup(name); 431 f->blobid = spdk_blob_get_id(blob); 432 f->length = *length; 433 f->length_flushed = *length; 434 f->append_pos = *length; 435 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 436 437 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 438 } 439 440 static void 441 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 442 { 443 struct spdk_fs_request *req = ctx; 444 struct spdk_fs_cb_args *args = &req->args; 445 struct spdk_filesystem *fs = args->fs; 446 447 if (bserrno != 0) { 448 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 449 free_fs_request(req); 450 free(fs); 451 return; 452 } 453 454 common_fs_bs_init(fs, bs); 455 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 456 } 457 458 void 459 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 460 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 461 { 462 struct spdk_filesystem *fs; 463 struct spdk_fs_cb_args *args; 464 struct spdk_fs_request *req; 465 466 fs = fs_alloc(dev, send_request_fn); 467 if (fs == NULL) { 468 cb_fn(cb_arg, NULL, -ENOMEM); 469 return; 470 } 471 472 req = alloc_fs_request(fs->md_fs_channel); 473 if (req == NULL) { 474 cb_fn(cb_arg, NULL, -ENOMEM); 475 return; 476 } 477 478 args = &req->args; 479 args->fn.fs_op_with_handle = cb_fn; 480 args->arg = cb_arg; 481 args->fs = fs; 482 483 spdk_bs_load(dev, load_cb, req); 484 } 485 486 static void 487 unload_cb(void *ctx, int bserrno) 488 { 489 struct spdk_fs_request *req = ctx; 490 struct spdk_fs_cb_args *args = &req->args; 491 struct spdk_filesystem *fs = args->fs; 492 493 args->fn.fs_op(args->arg, bserrno); 494 free(req); 495 spdk_io_device_unregister(fs); 496 free(fs); 497 } 498 499 void 500 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 501 { 502 struct spdk_fs_request *req; 503 struct spdk_fs_cb_args *args; 504 505 /* 506 * We must free the md_channel before unloading the blobstore, so just 507 * allocate this request from the general heap. 508 */ 509 req = calloc(1, sizeof(*req)); 510 if (req == NULL) { 511 cb_fn(cb_arg, -ENOMEM); 512 return; 513 } 514 515 args = &req->args; 516 args->fn.fs_op = cb_fn; 517 args->arg = cb_arg; 518 args->fs = fs; 519 520 spdk_fs_free_io_channel(fs->md_io_channel); 521 spdk_fs_free_io_channel(fs->sync_io_channel); 522 spdk_bs_unload(fs->bs, unload_cb, req); 523 } 524 525 static struct spdk_file * 526 fs_find_file(struct spdk_filesystem *fs, const char *name) 527 { 528 struct spdk_file *file; 529 530 TAILQ_FOREACH(file, &fs->files, tailq) { 531 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 532 return file; 533 } 534 } 535 536 return NULL; 537 } 538 539 void 540 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 541 spdk_file_stat_op_complete cb_fn, void *cb_arg) 542 { 543 struct spdk_file_stat stat; 544 struct spdk_file *f = NULL; 545 546 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 547 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 548 return; 549 } 550 551 f = fs_find_file(fs, name); 552 if (f != NULL) { 553 stat.blobid = f->blobid; 554 stat.size = f->length; 555 cb_fn(cb_arg, &stat, 0); 556 return; 557 } 558 559 cb_fn(cb_arg, NULL, -ENOENT); 560 } 561 562 static void 563 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 564 { 565 struct spdk_fs_request *req = arg; 566 struct spdk_fs_cb_args *args = &req->args; 567 568 args->rc = fserrno; 569 if (fserrno == 0) { 570 memcpy(args->arg, stat, sizeof(*stat)); 571 } 572 sem_post(args->sem); 573 } 574 575 static void 576 __file_stat(void *arg) 577 { 578 struct spdk_fs_request *req = arg; 579 struct spdk_fs_cb_args *args = &req->args; 580 581 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 582 args->fn.stat_op, req); 583 } 584 585 int 586 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 587 const char *name, struct spdk_file_stat *stat) 588 { 589 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 590 struct spdk_fs_request *req; 591 int rc; 592 593 req = alloc_fs_request(channel); 594 assert(req != NULL); 595 596 req->args.fs = fs; 597 req->args.op.stat.name = name; 598 req->args.fn.stat_op = __copy_stat; 599 req->args.arg = stat; 600 req->args.sem = &channel->sem; 601 channel->send_request(__file_stat, req); 602 sem_wait(&channel->sem); 603 604 rc = req->args.rc; 605 free_fs_request(req); 606 607 return rc; 608 } 609 610 static void 611 fs_create_blob_close_cb(void *ctx, int bserrno) 612 { 613 struct spdk_fs_request *req = ctx; 614 struct spdk_fs_cb_args *args = &req->args; 615 616 args->fn.file_op(args->arg, bserrno); 617 free_fs_request(req); 618 } 619 620 static void 621 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 622 { 623 struct spdk_fs_request *req = ctx; 624 struct spdk_fs_cb_args *args = &req->args; 625 struct spdk_file *f = args->file; 626 uint64_t length = 0; 627 628 f->blob = blob; 629 spdk_bs_md_resize_blob(blob, 1); 630 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 631 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 632 633 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 634 } 635 636 static void 637 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 638 { 639 struct spdk_fs_request *req = ctx; 640 struct spdk_fs_cb_args *args = &req->args; 641 struct spdk_file *f = args->file; 642 643 f->blobid = blobid; 644 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 645 } 646 647 void 648 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 649 spdk_file_op_complete cb_fn, void *cb_arg) 650 { 651 struct spdk_file *file; 652 struct spdk_fs_request *req; 653 struct spdk_fs_cb_args *args; 654 655 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 656 cb_fn(cb_arg, -ENAMETOOLONG); 657 return; 658 } 659 660 file = fs_find_file(fs, name); 661 if (file != NULL) { 662 cb_fn(cb_arg, -EEXIST); 663 return; 664 } 665 666 file = file_alloc(fs); 667 if (file == NULL) { 668 cb_fn(cb_arg, -ENOMEM); 669 return; 670 } 671 672 req = alloc_fs_request(fs->md_fs_channel); 673 if (req == NULL) { 674 cb_fn(cb_arg, -ENOMEM); 675 return; 676 } 677 678 args = &req->args; 679 args->file = file; 680 args->fn.file_op = cb_fn; 681 args->arg = cb_arg; 682 683 file->name = strdup(name); 684 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 685 } 686 687 static void 688 __fs_create_file_done(void *arg, int fserrno) 689 { 690 struct spdk_fs_request *req = arg; 691 struct spdk_fs_cb_args *args = &req->args; 692 693 args->rc = fserrno; 694 sem_post(args->sem); 695 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 696 } 697 698 static void 699 __fs_create_file(void *arg) 700 { 701 struct spdk_fs_request *req = arg; 702 struct spdk_fs_cb_args *args = &req->args; 703 704 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 705 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 706 } 707 708 int 709 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 710 { 711 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 712 struct spdk_fs_request *req; 713 struct spdk_fs_cb_args *args; 714 int rc; 715 716 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 717 718 req = alloc_fs_request(channel); 719 assert(req != NULL); 720 721 args = &req->args; 722 args->fs = fs; 723 args->op.create.name = name; 724 args->sem = &channel->sem; 725 fs->send_request(__fs_create_file, req); 726 sem_wait(&channel->sem); 727 rc = args->rc; 728 free_fs_request(req); 729 730 return rc; 731 } 732 733 static void 734 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 735 { 736 struct spdk_fs_request *req = ctx; 737 struct spdk_fs_cb_args *args = &req->args; 738 struct spdk_file *f = args->file; 739 740 f->blob = blob; 741 while (!TAILQ_EMPTY(&f->open_requests)) { 742 req = TAILQ_FIRST(&f->open_requests); 743 args = &req->args; 744 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 745 args->fn.file_op_with_handle(args->arg, f, bserrno); 746 free_fs_request(req); 747 } 748 } 749 750 static void 751 fs_open_blob_create_cb(void *ctx, int bserrno) 752 { 753 struct spdk_fs_request *req = ctx; 754 struct spdk_fs_cb_args *args = &req->args; 755 struct spdk_file *file = args->file; 756 struct spdk_filesystem *fs = args->fs; 757 758 if (file == NULL) { 759 /* 760 * This is from an open with CREATE flag - the file 761 * is now created so look it up in the file list for this 762 * filesystem. 763 */ 764 file = fs_find_file(fs, args->op.open.name); 765 assert(file != NULL); 766 args->file = file; 767 } 768 769 file->ref_count++; 770 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 771 if (file->ref_count == 1) { 772 assert(file->blob == NULL); 773 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 774 } else if (file->blob != NULL) { 775 fs_open_blob_done(req, file->blob, 0); 776 } else { 777 /* 778 * The blob open for this file is in progress due to a previous 779 * open request. When that open completes, it will invoke the 780 * open callback for this request. 781 */ 782 } 783 } 784 785 void 786 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 787 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 788 { 789 struct spdk_file *f = NULL; 790 struct spdk_fs_request *req; 791 struct spdk_fs_cb_args *args; 792 793 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 794 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 795 return; 796 } 797 798 f = fs_find_file(fs, name); 799 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 800 cb_fn(cb_arg, NULL, -ENOENT); 801 return; 802 } 803 804 req = alloc_fs_request(fs->md_fs_channel); 805 if (req == NULL) { 806 cb_fn(cb_arg, NULL, -ENOMEM); 807 return; 808 } 809 810 args = &req->args; 811 args->fn.file_op_with_handle = cb_fn; 812 args->arg = cb_arg; 813 args->file = f; 814 args->fs = fs; 815 args->op.open.name = name; 816 817 if (f == NULL) { 818 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 819 } else { 820 fs_open_blob_create_cb(req, 0); 821 } 822 } 823 824 static void 825 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 826 { 827 struct spdk_fs_request *req = arg; 828 struct spdk_fs_cb_args *args = &req->args; 829 830 args->file = file; 831 args->rc = bserrno; 832 sem_post(args->sem); 833 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 834 } 835 836 static void 837 __fs_open_file(void *arg) 838 { 839 struct spdk_fs_request *req = arg; 840 struct spdk_fs_cb_args *args = &req->args; 841 842 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 843 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 844 __fs_open_file_done, req); 845 } 846 847 int 848 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 849 const char *name, uint32_t flags, struct spdk_file **file) 850 { 851 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 852 struct spdk_fs_request *req; 853 struct spdk_fs_cb_args *args; 854 int rc; 855 856 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 857 858 req = alloc_fs_request(channel); 859 assert(req != NULL); 860 861 args = &req->args; 862 args->fs = fs; 863 args->op.open.name = name; 864 args->op.open.flags = flags; 865 args->sem = &channel->sem; 866 fs->send_request(__fs_open_file, req); 867 sem_wait(&channel->sem); 868 rc = args->rc; 869 if (rc == 0) { 870 *file = args->file; 871 } else { 872 *file = NULL; 873 } 874 free_fs_request(req); 875 876 return rc; 877 } 878 879 static void 880 fs_rename_blob_close_cb(void *ctx, int bserrno) 881 { 882 struct spdk_fs_request *req = ctx; 883 struct spdk_fs_cb_args *args = &req->args; 884 885 args->fn.fs_op(args->arg, bserrno); 886 free_fs_request(req); 887 } 888 889 static void 890 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 891 { 892 struct spdk_fs_request *req = ctx; 893 struct spdk_fs_cb_args *args = &req->args; 894 struct spdk_file *f = args->file; 895 const char *new_name = args->op.rename.new_name; 896 897 f->blob = blob; 898 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 899 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 900 } 901 902 static void 903 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 904 { 905 struct spdk_fs_cb_args *args = &req->args; 906 struct spdk_file *f; 907 908 f = fs_find_file(args->fs, args->op.rename.old_name); 909 if (f == NULL) { 910 args->fn.fs_op(args->arg, -ENOENT); 911 free_fs_request(req); 912 return; 913 } 914 915 free(f->name); 916 f->name = strdup(args->op.rename.new_name); 917 args->file = f; 918 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 919 } 920 921 static void 922 fs_rename_delete_done(void *arg, int fserrno) 923 { 924 __spdk_fs_md_rename_file(arg); 925 } 926 927 void 928 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 929 const char *old_name, const char *new_name, 930 spdk_file_op_complete cb_fn, void *cb_arg) 931 { 932 struct spdk_file *f; 933 struct spdk_fs_request *req; 934 struct spdk_fs_cb_args *args; 935 936 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 937 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 938 cb_fn(cb_arg, -ENAMETOOLONG); 939 return; 940 } 941 942 req = alloc_fs_request(fs->md_fs_channel); 943 if (req == NULL) { 944 cb_fn(cb_arg, -ENOMEM); 945 return; 946 } 947 948 args = &req->args; 949 args->fn.fs_op = cb_fn; 950 args->fs = fs; 951 args->arg = cb_arg; 952 args->op.rename.old_name = old_name; 953 args->op.rename.new_name = new_name; 954 955 f = fs_find_file(fs, new_name); 956 if (f == NULL) { 957 __spdk_fs_md_rename_file(req); 958 return; 959 } 960 961 /* 962 * The rename overwrites an existing file. So delete the existing file, then 963 * do the actual rename. 964 */ 965 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 966 } 967 968 static void 969 __fs_rename_file_done(void *arg, int fserrno) 970 { 971 struct spdk_fs_request *req = arg; 972 struct spdk_fs_cb_args *args = &req->args; 973 974 args->rc = fserrno; 975 sem_post(args->sem); 976 } 977 978 static void 979 __fs_rename_file(void *arg) 980 { 981 struct spdk_fs_request *req = arg; 982 struct spdk_fs_cb_args *args = &req->args; 983 984 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 985 __fs_rename_file_done, req); 986 } 987 988 int 989 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 990 const char *old_name, const char *new_name) 991 { 992 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 993 struct spdk_fs_request *req; 994 struct spdk_fs_cb_args *args; 995 int rc; 996 997 req = alloc_fs_request(channel); 998 assert(req != NULL); 999 1000 args = &req->args; 1001 1002 args->fs = fs; 1003 args->op.rename.old_name = old_name; 1004 args->op.rename.new_name = new_name; 1005 args->sem = &channel->sem; 1006 fs->send_request(__fs_rename_file, req); 1007 sem_wait(&channel->sem); 1008 rc = args->rc; 1009 free_fs_request(req); 1010 return rc; 1011 } 1012 1013 static void 1014 blob_delete_cb(void *ctx, int bserrno) 1015 { 1016 struct spdk_fs_request *req = ctx; 1017 struct spdk_fs_cb_args *args = &req->args; 1018 1019 args->fn.file_op(args->arg, bserrno); 1020 free_fs_request(req); 1021 } 1022 1023 void 1024 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1025 spdk_file_op_complete cb_fn, void *cb_arg) 1026 { 1027 struct spdk_file *f; 1028 spdk_blob_id blobid; 1029 struct spdk_fs_request *req; 1030 struct spdk_fs_cb_args *args; 1031 1032 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1033 1034 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1035 cb_fn(cb_arg, -ENAMETOOLONG); 1036 return; 1037 } 1038 1039 f = fs_find_file(fs, name); 1040 if (f == NULL) { 1041 cb_fn(cb_arg, -ENOENT); 1042 return; 1043 } 1044 1045 if (f->ref_count > 0) { 1046 /* For now, do not allow deleting files with open references. */ 1047 cb_fn(cb_arg, -EBUSY); 1048 return; 1049 } 1050 1051 req = alloc_fs_request(fs->md_fs_channel); 1052 if (req == NULL) { 1053 cb_fn(cb_arg, -ENOMEM); 1054 return; 1055 } 1056 1057 TAILQ_REMOVE(&fs->files, f, tailq); 1058 1059 cache_free_buffers(f); 1060 1061 blobid = f->blobid; 1062 1063 free(f->name); 1064 free(f->tree); 1065 free(f); 1066 1067 args = &req->args; 1068 args->fn.file_op = cb_fn; 1069 args->arg = cb_arg; 1070 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1071 } 1072 1073 static void 1074 __fs_delete_file_done(void *arg, int fserrno) 1075 { 1076 struct spdk_fs_request *req = arg; 1077 struct spdk_fs_cb_args *args = &req->args; 1078 1079 args->rc = fserrno; 1080 sem_post(args->sem); 1081 } 1082 1083 static void 1084 __fs_delete_file(void *arg) 1085 { 1086 struct spdk_fs_request *req = arg; 1087 struct spdk_fs_cb_args *args = &req->args; 1088 1089 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1090 } 1091 1092 int 1093 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1094 const char *name) 1095 { 1096 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1097 struct spdk_fs_request *req; 1098 struct spdk_fs_cb_args *args; 1099 int rc; 1100 1101 req = alloc_fs_request(channel); 1102 assert(req != NULL); 1103 1104 args = &req->args; 1105 args->fs = fs; 1106 args->op.delete.name = name; 1107 args->sem = &channel->sem; 1108 fs->send_request(__fs_delete_file, req); 1109 sem_wait(&channel->sem); 1110 rc = args->rc; 1111 free_fs_request(req); 1112 1113 return rc; 1114 } 1115 1116 spdk_fs_iter 1117 spdk_fs_iter_first(struct spdk_filesystem *fs) 1118 { 1119 struct spdk_file *f; 1120 1121 f = TAILQ_FIRST(&fs->files); 1122 return f; 1123 } 1124 1125 spdk_fs_iter 1126 spdk_fs_iter_next(spdk_fs_iter iter) 1127 { 1128 struct spdk_file *f = iter; 1129 1130 if (f == NULL) { 1131 return NULL; 1132 } 1133 1134 f = TAILQ_NEXT(f, tailq); 1135 return f; 1136 } 1137 1138 const char * 1139 spdk_file_get_name(struct spdk_file *file) 1140 { 1141 return file->name; 1142 } 1143 1144 uint64_t 1145 spdk_file_get_length(struct spdk_file *file) 1146 { 1147 assert(file != NULL); 1148 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1149 return file->length; 1150 } 1151 1152 static void 1153 fs_truncate_complete_cb(void *ctx, int bserrno) 1154 { 1155 struct spdk_fs_request *req = ctx; 1156 struct spdk_fs_cb_args *args = &req->args; 1157 1158 args->fn.file_op(args->arg, bserrno); 1159 free_fs_request(req); 1160 } 1161 1162 static uint64_t 1163 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1164 { 1165 return (length + cluster_sz - 1) / cluster_sz; 1166 } 1167 1168 void 1169 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1170 spdk_file_op_complete cb_fn, void *cb_arg) 1171 { 1172 struct spdk_filesystem *fs; 1173 size_t num_clusters; 1174 struct spdk_fs_request *req; 1175 struct spdk_fs_cb_args *args; 1176 1177 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1178 if (length == file->length) { 1179 cb_fn(cb_arg, 0); 1180 return; 1181 } 1182 1183 req = alloc_fs_request(file->fs->md_fs_channel); 1184 if (req == NULL) { 1185 cb_fn(cb_arg, -ENOMEM); 1186 return; 1187 } 1188 1189 args = &req->args; 1190 args->fn.file_op = cb_fn; 1191 args->arg = cb_arg; 1192 args->file = file; 1193 fs = file->fs; 1194 1195 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1196 1197 spdk_bs_md_resize_blob(file->blob, num_clusters); 1198 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1199 1200 file->length = length; 1201 if (file->append_pos > file->length) { 1202 file->append_pos = file->length; 1203 } 1204 1205 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1206 } 1207 1208 static void 1209 __truncate(void *arg) 1210 { 1211 struct spdk_fs_request *req = arg; 1212 struct spdk_fs_cb_args *args = &req->args; 1213 1214 spdk_file_truncate_async(args->file, args->op.truncate.length, 1215 args->fn.file_op, args->arg); 1216 } 1217 1218 void 1219 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1220 uint64_t length) 1221 { 1222 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1223 struct spdk_fs_request *req; 1224 struct spdk_fs_cb_args *args; 1225 1226 req = alloc_fs_request(channel); 1227 assert(req != NULL); 1228 1229 args = &req->args; 1230 1231 args->file = file; 1232 args->op.truncate.length = length; 1233 args->fn.file_op = __sem_post; 1234 args->arg = &channel->sem; 1235 1236 channel->send_request(__truncate, req); 1237 sem_wait(&channel->sem); 1238 free_fs_request(req); 1239 } 1240 1241 static void 1242 __rw_done(void *ctx, int bserrno) 1243 { 1244 struct spdk_fs_request *req = ctx; 1245 struct spdk_fs_cb_args *args = &req->args; 1246 1247 spdk_free(args->op.rw.pin_buf); 1248 args->fn.file_op(args->arg, bserrno); 1249 free_fs_request(req); 1250 } 1251 1252 static void 1253 __read_done(void *ctx, int bserrno) 1254 { 1255 struct spdk_fs_request *req = ctx; 1256 struct spdk_fs_cb_args *args = &req->args; 1257 1258 if (args->op.rw.is_read) { 1259 memcpy(args->op.rw.user_buf, 1260 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1261 args->op.rw.length); 1262 __rw_done(req, 0); 1263 } else { 1264 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1265 args->op.rw.user_buf, 1266 args->op.rw.length); 1267 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1268 args->op.rw.pin_buf, 1269 args->op.rw.start_page, args->op.rw.num_pages, 1270 __rw_done, req); 1271 } 1272 } 1273 1274 static void 1275 __do_blob_read(void *ctx, int fserrno) 1276 { 1277 struct spdk_fs_request *req = ctx; 1278 struct spdk_fs_cb_args *args = &req->args; 1279 1280 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1281 args->op.rw.pin_buf, 1282 args->op.rw.start_page, args->op.rw.num_pages, 1283 __read_done, req); 1284 } 1285 1286 static void 1287 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1288 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1289 { 1290 uint64_t end_page; 1291 1292 *page_size = spdk_bs_get_page_size(file->fs->bs); 1293 *start_page = offset / *page_size; 1294 end_page = (offset + length - 1) / *page_size; 1295 *num_pages = (end_page - *start_page + 1); 1296 } 1297 1298 static void 1299 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1300 void *payload, uint64_t offset, uint64_t length, 1301 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1302 { 1303 struct spdk_fs_request *req; 1304 struct spdk_fs_cb_args *args; 1305 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1306 uint64_t start_page, num_pages, pin_buf_length; 1307 uint32_t page_size; 1308 1309 if (is_read && offset + length > file->length) { 1310 cb_fn(cb_arg, -EINVAL); 1311 return; 1312 } 1313 1314 req = alloc_fs_request(channel); 1315 if (req == NULL) { 1316 cb_fn(cb_arg, -ENOMEM); 1317 return; 1318 } 1319 1320 args = &req->args; 1321 args->fn.file_op = cb_fn; 1322 args->arg = cb_arg; 1323 args->file = file; 1324 args->op.rw.channel = channel->bs_channel; 1325 args->op.rw.user_buf = payload; 1326 args->op.rw.is_read = is_read; 1327 args->op.rw.offset = offset; 1328 args->op.rw.length = length; 1329 1330 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1331 pin_buf_length = num_pages * page_size; 1332 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, 4096, NULL); 1333 1334 args->op.rw.start_page = start_page; 1335 args->op.rw.num_pages = num_pages; 1336 1337 if (!is_read && file->length < offset + length) { 1338 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1339 } else { 1340 __do_blob_read(req, 0); 1341 } 1342 } 1343 1344 void 1345 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1346 void *payload, uint64_t offset, uint64_t length, 1347 spdk_file_op_complete cb_fn, void *cb_arg) 1348 { 1349 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1350 } 1351 1352 void 1353 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1354 void *payload, uint64_t offset, uint64_t length, 1355 spdk_file_op_complete cb_fn, void *cb_arg) 1356 { 1357 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1358 file->name, offset, length); 1359 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1360 } 1361 1362 struct spdk_io_channel * 1363 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs, uint32_t priority) 1364 { 1365 struct spdk_io_channel *io_channel; 1366 struct spdk_fs_channel *fs_channel; 1367 uint32_t max_ops = 512; 1368 1369 io_channel = spdk_get_io_channel(fs, priority, true, (void *)&max_ops); 1370 fs_channel = spdk_io_channel_get_ctx(io_channel); 1371 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT, 512); 1372 fs_channel->send_request = __send_request_direct; 1373 1374 return io_channel; 1375 } 1376 1377 struct spdk_io_channel * 1378 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs, uint32_t priority) 1379 { 1380 struct spdk_io_channel *io_channel; 1381 struct spdk_fs_channel *fs_channel; 1382 uint32_t max_ops = 16; 1383 1384 io_channel = spdk_get_io_channel(fs, priority, true, (void *)&max_ops); 1385 fs_channel = spdk_io_channel_get_ctx(io_channel); 1386 fs_channel->send_request = fs->send_request; 1387 1388 return io_channel; 1389 } 1390 1391 void 1392 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1393 { 1394 spdk_put_io_channel(channel); 1395 } 1396 1397 void 1398 spdk_fs_set_cache_size(uint64_t size_in_mb) 1399 { 1400 g_fs_cache_size = size_in_mb * 1024 * 1024; 1401 } 1402 1403 uint64_t 1404 spdk_fs_get_cache_size(void) 1405 { 1406 return g_fs_cache_size / (1024 * 1024); 1407 } 1408 1409 static void __file_flush(void *_args); 1410 1411 static void * 1412 alloc_cache_memory_buffer(struct spdk_file *context) 1413 { 1414 struct spdk_file *file; 1415 void *buf; 1416 1417 buf = spdk_mempool_get(g_cache_pool); 1418 if (buf != NULL) { 1419 return buf; 1420 } 1421 1422 pthread_spin_lock(&g_caches_lock); 1423 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1424 if (!file->open_for_writing && 1425 file->priority == SPDK_FILE_PRIORITY_LOW && 1426 file != context) { 1427 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1428 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1429 break; 1430 } 1431 } 1432 pthread_spin_unlock(&g_caches_lock); 1433 if (file != NULL) { 1434 cache_free_buffers(file); 1435 buf = spdk_mempool_get(g_cache_pool); 1436 if (buf != NULL) { 1437 return buf; 1438 } 1439 } 1440 1441 pthread_spin_lock(&g_caches_lock); 1442 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1443 if (!file->open_for_writing && file != context) { 1444 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1445 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1446 break; 1447 } 1448 } 1449 pthread_spin_unlock(&g_caches_lock); 1450 if (file != NULL) { 1451 cache_free_buffers(file); 1452 buf = spdk_mempool_get(g_cache_pool); 1453 if (buf != NULL) { 1454 return buf; 1455 } 1456 } 1457 1458 pthread_spin_lock(&g_caches_lock); 1459 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1460 if (file != context) { 1461 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1462 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1463 break; 1464 } 1465 } 1466 pthread_spin_unlock(&g_caches_lock); 1467 if (file != NULL) { 1468 cache_free_buffers(file); 1469 buf = spdk_mempool_get(g_cache_pool); 1470 if (buf != NULL) { 1471 return buf; 1472 } 1473 } 1474 1475 assert(false); 1476 return NULL; 1477 } 1478 1479 static struct cache_buffer * 1480 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1481 { 1482 struct cache_buffer *buf; 1483 int count = 0; 1484 1485 buf = calloc(1, sizeof(*buf)); 1486 if (buf == NULL) { 1487 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1488 return NULL; 1489 } 1490 1491 buf->buf = alloc_cache_memory_buffer(file); 1492 if (buf->buf == NULL) { 1493 while (buf->buf == NULL) { 1494 /* 1495 * TODO: alloc_cache_memory_buffer() should eventually free 1496 * some buffers. Need a more sophisticated check here, instead 1497 * of just bailing if 100 tries does not result in getting a 1498 * free buffer. This will involve using the sync channel's 1499 * semaphore to block until a buffer becomes available. 1500 */ 1501 if (count++ == 100) { 1502 SPDK_ERRLOG("could not allocate cache buffer\n"); 1503 assert(false); 1504 free(buf); 1505 return NULL; 1506 } 1507 buf->buf = alloc_cache_memory_buffer(file); 1508 } 1509 } 1510 1511 buf->buf_size = CACHE_BUFFER_SIZE; 1512 buf->offset = offset; 1513 1514 pthread_spin_lock(&g_caches_lock); 1515 if (file->tree->present_mask == 0) { 1516 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1517 } 1518 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1519 pthread_spin_unlock(&g_caches_lock); 1520 1521 return buf; 1522 } 1523 1524 static struct cache_buffer * 1525 cache_append_buffer(struct spdk_file *file) 1526 { 1527 struct cache_buffer *last; 1528 1529 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1530 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1531 1532 last = cache_insert_buffer(file, file->append_pos); 1533 if (last == NULL) { 1534 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1535 return NULL; 1536 } 1537 1538 if (file->last != NULL) { 1539 file->last->next = last; 1540 } 1541 file->last = last; 1542 1543 return last; 1544 } 1545 1546 static void 1547 __wake_caller(struct spdk_fs_cb_args *args) 1548 { 1549 sem_post(args->sem); 1550 } 1551 1552 static void 1553 __file_cache_finish_sync(struct spdk_file *file) 1554 { 1555 struct spdk_fs_request *sync_req; 1556 struct spdk_fs_cb_args *sync_args; 1557 1558 pthread_spin_lock(&file->lock); 1559 while (!TAILQ_EMPTY(&file->sync_requests)) { 1560 sync_req = TAILQ_FIRST(&file->sync_requests); 1561 sync_args = &sync_req->args; 1562 if (sync_args->op.sync.offset > file->length_flushed) { 1563 break; 1564 } 1565 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1566 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1567 pthread_spin_unlock(&file->lock); 1568 sync_args->fn.file_op(sync_args->arg, 0); 1569 pthread_spin_lock(&file->lock); 1570 free_fs_request(sync_req); 1571 } 1572 pthread_spin_unlock(&file->lock); 1573 } 1574 1575 static void 1576 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1577 { 1578 struct spdk_file *file = ctx; 1579 1580 __file_cache_finish_sync(file); 1581 } 1582 1583 static void 1584 __free_args(struct spdk_fs_cb_args *args) 1585 { 1586 struct spdk_fs_request *req; 1587 1588 if (!args->from_request) { 1589 free(args); 1590 } else { 1591 /* Depends on args being at the start of the spdk_fs_request structure. */ 1592 req = (struct spdk_fs_request *)args; 1593 free_fs_request(req); 1594 } 1595 } 1596 1597 static void 1598 __file_flush_done(void *arg, int bserrno) 1599 { 1600 struct spdk_fs_cb_args *args = arg; 1601 struct spdk_fs_request *sync_req; 1602 struct spdk_file *file = args->file; 1603 struct cache_buffer *next = args->op.flush.cache_buffer; 1604 1605 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1606 1607 pthread_spin_lock(&file->lock); 1608 next->in_progress = false; 1609 next->bytes_flushed += args->op.flush.length; 1610 file->length_flushed += args->op.flush.length; 1611 if (file->length_flushed > file->length) { 1612 file->length = file->length_flushed; 1613 } 1614 if (next->bytes_flushed == next->buf_size) { 1615 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1616 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1617 } 1618 1619 TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) { 1620 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1621 break; 1622 } 1623 } 1624 1625 if (sync_req != NULL) { 1626 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1627 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1628 sizeof(file->length_flushed)); 1629 1630 pthread_spin_unlock(&file->lock); 1631 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1632 } else { 1633 pthread_spin_unlock(&file->lock); 1634 __file_cache_finish_sync(file); 1635 } 1636 1637 /* 1638 * Assert that there is no cached data that extends past the end of the underlying 1639 * blob. 1640 */ 1641 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1642 next->bytes_filled == 0); 1643 1644 __file_flush(args); 1645 } 1646 1647 static void 1648 __file_flush(void *_args) 1649 { 1650 struct spdk_fs_cb_args *args = _args; 1651 struct spdk_file *file = args->file; 1652 struct cache_buffer *next; 1653 uint64_t offset, length, start_page, num_pages; 1654 uint32_t page_size; 1655 1656 pthread_spin_lock(&file->lock); 1657 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1658 if (next == NULL || next->in_progress) { 1659 /* 1660 * There is either no data to flush, or a flush I/O is already in 1661 * progress. So return immediately - if a flush I/O is in 1662 * progress we will flush more data after that is completed. 1663 */ 1664 __free_args(args); 1665 pthread_spin_unlock(&file->lock); 1666 return; 1667 } 1668 1669 offset = next->offset + next->bytes_flushed; 1670 length = next->bytes_filled - next->bytes_flushed; 1671 if (length == 0) { 1672 __free_args(args); 1673 pthread_spin_unlock(&file->lock); 1674 return; 1675 } 1676 args->op.flush.length = length; 1677 args->op.flush.cache_buffer = next; 1678 1679 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1680 1681 next->in_progress = true; 1682 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1683 offset, length, start_page, num_pages); 1684 pthread_spin_unlock(&file->lock); 1685 spdk_bs_io_write_blob(file->blob, file->fs->sync_fs_channel->bs_channel, 1686 next->buf + (start_page * page_size) - next->offset, 1687 start_page, num_pages, 1688 __file_flush_done, args); 1689 } 1690 1691 static void 1692 __file_extend_done(void *arg, int bserrno) 1693 { 1694 struct spdk_fs_cb_args *args = arg; 1695 1696 __wake_caller(args); 1697 } 1698 1699 static void 1700 __file_extend_blob(void *_args) 1701 { 1702 struct spdk_fs_cb_args *args = _args; 1703 struct spdk_file *file = args->file; 1704 1705 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1706 1707 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1708 } 1709 1710 static void 1711 __rw_from_file_done(void *arg, int bserrno) 1712 { 1713 struct spdk_fs_cb_args *args = arg; 1714 1715 __wake_caller(args); 1716 __free_args(args); 1717 } 1718 1719 static void 1720 __rw_from_file(void *_args) 1721 { 1722 struct spdk_fs_cb_args *args = _args; 1723 struct spdk_file *file = args->file; 1724 1725 if (args->op.rw.is_read) { 1726 spdk_file_read_async(file, file->fs->sync_io_channel, args->op.rw.user_buf, 1727 args->op.rw.offset, args->op.rw.length, 1728 __rw_from_file_done, args); 1729 } else { 1730 spdk_file_write_async(file, file->fs->sync_io_channel, args->op.rw.user_buf, 1731 args->op.rw.offset, args->op.rw.length, 1732 __rw_from_file_done, args); 1733 } 1734 } 1735 1736 static int 1737 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1738 uint64_t offset, uint64_t length, bool is_read) 1739 { 1740 struct spdk_fs_cb_args *args; 1741 1742 args = calloc(1, sizeof(*args)); 1743 if (args == NULL) { 1744 sem_post(sem); 1745 return -ENOMEM; 1746 } 1747 1748 args->file = file; 1749 args->sem = sem; 1750 args->op.rw.user_buf = payload; 1751 args->op.rw.offset = offset; 1752 args->op.rw.length = length; 1753 args->op.rw.is_read = is_read; 1754 file->fs->send_request(__rw_from_file, args); 1755 return 0; 1756 } 1757 1758 int 1759 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1760 void *payload, uint64_t offset, uint64_t length) 1761 { 1762 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1763 struct spdk_fs_cb_args *args; 1764 uint64_t rem_length, copy, blob_size, cluster_sz; 1765 uint32_t cache_buffers_filled = 0; 1766 uint8_t *cur_payload; 1767 struct cache_buffer *last; 1768 1769 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1770 1771 if (length == 0) { 1772 return 0; 1773 } 1774 1775 if (offset != file->append_pos) { 1776 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1777 return -EINVAL; 1778 } 1779 1780 pthread_spin_lock(&file->lock); 1781 file->open_for_writing = true; 1782 1783 if (file->last == NULL) { 1784 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1785 cache_append_buffer(file); 1786 } else { 1787 int rc; 1788 1789 file->append_pos += length; 1790 rc = __send_rw_from_file(file, &channel->sem, payload, 1791 offset, length, false); 1792 pthread_spin_unlock(&file->lock); 1793 sem_wait(&channel->sem); 1794 return rc; 1795 } 1796 } 1797 1798 blob_size = __file_get_blob_size(file); 1799 1800 if ((offset + length) > blob_size) { 1801 struct spdk_fs_cb_args extend_args = {}; 1802 1803 cluster_sz = file->fs->bs_opts.cluster_sz; 1804 extend_args.sem = &channel->sem; 1805 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1806 extend_args.file = file; 1807 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1808 pthread_spin_unlock(&file->lock); 1809 file->fs->send_request(__file_extend_blob, &extend_args); 1810 sem_wait(&channel->sem); 1811 } 1812 1813 last = file->last; 1814 rem_length = length; 1815 cur_payload = payload; 1816 while (rem_length > 0) { 1817 copy = last->buf_size - last->bytes_filled; 1818 if (copy > rem_length) { 1819 copy = rem_length; 1820 } 1821 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 1822 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 1823 file->append_pos += copy; 1824 if (file->length < file->append_pos) { 1825 file->length = file->append_pos; 1826 } 1827 cur_payload += copy; 1828 last->bytes_filled += copy; 1829 rem_length -= copy; 1830 if (last->bytes_filled == last->buf_size) { 1831 cache_buffers_filled++; 1832 last = cache_append_buffer(file); 1833 if (last == NULL) { 1834 BLOBFS_TRACE(file, "nomem\n"); 1835 pthread_spin_unlock(&file->lock); 1836 return -ENOMEM; 1837 } 1838 } 1839 } 1840 1841 if (cache_buffers_filled == 0) { 1842 pthread_spin_unlock(&file->lock); 1843 return 0; 1844 } 1845 1846 args = calloc(1, sizeof(*args)); 1847 if (args == NULL) { 1848 pthread_spin_unlock(&file->lock); 1849 return -ENOMEM; 1850 } 1851 1852 args->file = file; 1853 file->fs->send_request(__file_flush, args); 1854 pthread_spin_unlock(&file->lock); 1855 return 0; 1856 } 1857 1858 static void 1859 __readahead_done(void *arg, int bserrno) 1860 { 1861 struct spdk_fs_cb_args *args = arg; 1862 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 1863 struct spdk_file *file = args->file; 1864 1865 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 1866 1867 pthread_spin_lock(&file->lock); 1868 cache_buffer->bytes_filled = args->op.readahead.length; 1869 cache_buffer->bytes_flushed = args->op.readahead.length; 1870 cache_buffer->in_progress = false; 1871 pthread_spin_unlock(&file->lock); 1872 1873 __free_args(args); 1874 } 1875 1876 static void 1877 __readahead(void *_args) 1878 { 1879 struct spdk_fs_cb_args *args = _args; 1880 struct spdk_file *file = args->file; 1881 uint64_t offset, length, start_page, num_pages; 1882 uint32_t page_size; 1883 1884 offset = args->op.readahead.offset; 1885 length = args->op.readahead.length; 1886 assert(length > 0); 1887 1888 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1889 1890 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1891 offset, length, start_page, num_pages); 1892 spdk_bs_io_read_blob(file->blob, file->fs->sync_fs_channel->bs_channel, 1893 args->op.readahead.cache_buffer->buf, 1894 start_page, num_pages, 1895 __readahead_done, args); 1896 } 1897 1898 static uint64_t 1899 __next_cache_buffer_offset(uint64_t offset) 1900 { 1901 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 1902 } 1903 1904 static void 1905 check_readahead(struct spdk_file *file, uint64_t offset) 1906 { 1907 struct spdk_fs_cb_args *args; 1908 1909 offset = __next_cache_buffer_offset(offset); 1910 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 1911 return; 1912 } 1913 1914 args = calloc(1, sizeof(*args)); 1915 if (args == NULL) { 1916 return; 1917 } 1918 1919 BLOBFS_TRACE(file, "offset=%jx\n", offset); 1920 1921 args->file = file; 1922 args->op.readahead.offset = offset; 1923 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 1924 args->op.readahead.cache_buffer->in_progress = true; 1925 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 1926 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 1927 } else { 1928 args->op.readahead.length = CACHE_BUFFER_SIZE; 1929 } 1930 file->fs->send_request(__readahead, args); 1931 } 1932 1933 static int 1934 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 1935 { 1936 struct cache_buffer *buf; 1937 1938 buf = spdk_tree_find_filled_buffer(file->tree, offset); 1939 if (buf == NULL) { 1940 return __send_rw_from_file(file, sem, payload, offset, length, true); 1941 } 1942 1943 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 1944 length = buf->offset + buf->bytes_filled - offset; 1945 } 1946 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 1947 memcpy(payload, &buf->buf[offset - buf->offset], length); 1948 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 1949 pthread_spin_lock(&g_caches_lock); 1950 spdk_tree_remove_buffer(file->tree, buf); 1951 if (file->tree->present_mask == 0) { 1952 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1953 } 1954 pthread_spin_unlock(&g_caches_lock); 1955 } 1956 1957 sem_post(sem); 1958 return 0; 1959 } 1960 1961 int64_t 1962 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 1963 void *payload, uint64_t offset, uint64_t length) 1964 { 1965 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1966 uint64_t final_offset, final_length; 1967 uint32_t sub_reads = 0; 1968 int rc = 0; 1969 1970 pthread_spin_lock(&file->lock); 1971 1972 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 1973 1974 file->open_for_writing = false; 1975 1976 if (length == 0 || offset >= file->length) { 1977 pthread_spin_unlock(&file->lock); 1978 return 0; 1979 } 1980 1981 if (offset + length > file->length) { 1982 length = file->length - offset; 1983 } 1984 1985 if (offset != file->next_seq_offset) { 1986 file->seq_byte_count = 0; 1987 } 1988 file->seq_byte_count += length; 1989 file->next_seq_offset = offset + length; 1990 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 1991 check_readahead(file, offset); 1992 check_readahead(file, offset + CACHE_BUFFER_SIZE); 1993 } 1994 1995 final_length = 0; 1996 final_offset = offset + length; 1997 while (offset < final_offset) { 1998 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 1999 if (length > (final_offset - offset)) { 2000 length = final_offset - offset; 2001 } 2002 rc = __file_read(file, payload, offset, length, &channel->sem); 2003 if (rc == 0) { 2004 final_length += length; 2005 } else { 2006 break; 2007 } 2008 payload += length; 2009 offset += length; 2010 sub_reads++; 2011 } 2012 pthread_spin_unlock(&file->lock); 2013 while (sub_reads-- > 0) { 2014 sem_wait(&channel->sem); 2015 } 2016 if (rc == 0) { 2017 return final_length; 2018 } else { 2019 return rc; 2020 } 2021 } 2022 2023 static void 2024 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2025 spdk_file_op_complete cb_fn, void *cb_arg) 2026 { 2027 struct spdk_fs_request *sync_req; 2028 struct spdk_fs_request *flush_req; 2029 struct spdk_fs_cb_args *sync_args; 2030 struct spdk_fs_cb_args *flush_args; 2031 2032 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2033 2034 pthread_spin_lock(&file->lock); 2035 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2036 BLOBFS_TRACE(file, "done - no data to flush\n"); 2037 pthread_spin_unlock(&file->lock); 2038 cb_fn(cb_arg, 0); 2039 return; 2040 } 2041 2042 sync_req = alloc_fs_request(channel); 2043 assert(sync_req != NULL); 2044 sync_args = &sync_req->args; 2045 2046 flush_req = alloc_fs_request(channel); 2047 assert(flush_req != NULL); 2048 flush_args = &flush_req->args; 2049 2050 sync_args->file = file; 2051 sync_args->fn.file_op = cb_fn; 2052 sync_args->arg = cb_arg; 2053 sync_args->op.sync.offset = file->append_pos; 2054 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2055 pthread_spin_unlock(&file->lock); 2056 2057 flush_args->file = file; 2058 channel->send_request(__file_flush, flush_args); 2059 } 2060 2061 int 2062 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2063 { 2064 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2065 2066 _file_sync(file, channel, __sem_post, &channel->sem); 2067 sem_wait(&channel->sem); 2068 2069 return 0; 2070 } 2071 2072 void 2073 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2074 spdk_file_op_complete cb_fn, void *cb_arg) 2075 { 2076 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2077 2078 _file_sync(file, channel, cb_fn, cb_arg); 2079 } 2080 2081 void 2082 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2083 { 2084 BLOBFS_TRACE(file, "priority=%u\n", priority); 2085 file->priority = priority; 2086 2087 } 2088 2089 /* 2090 * Close routines 2091 */ 2092 2093 static void 2094 __file_close_async_done(void *ctx, int bserrno) 2095 { 2096 struct spdk_fs_request *req = ctx; 2097 struct spdk_fs_cb_args *args = &req->args; 2098 2099 args->fn.file_op(args->arg, bserrno); 2100 free_fs_request(req); 2101 } 2102 2103 static void 2104 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2105 { 2106 pthread_spin_lock(&file->lock); 2107 if (file->ref_count == 0) { 2108 pthread_spin_unlock(&file->lock); 2109 __file_close_async_done(req, -EBADF); 2110 return; 2111 } 2112 2113 file->ref_count--; 2114 if (file->ref_count > 0) { 2115 pthread_spin_unlock(&file->lock); 2116 __file_close_async_done(req, 0); 2117 return; 2118 } 2119 2120 pthread_spin_unlock(&file->lock); 2121 2122 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2123 } 2124 2125 static void 2126 __file_close_async__sync_done(void *arg, int fserrno) 2127 { 2128 struct spdk_fs_request *req = arg; 2129 struct spdk_fs_cb_args *args = &req->args; 2130 2131 __file_close_async(args->file, req); 2132 } 2133 2134 void 2135 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2136 { 2137 struct spdk_fs_request *req; 2138 struct spdk_fs_cb_args *args; 2139 2140 req = alloc_fs_request(file->fs->md_fs_channel); 2141 if (req == NULL) { 2142 cb_fn(cb_arg, -ENOMEM); 2143 return; 2144 } 2145 2146 args = &req->args; 2147 args->file = file; 2148 args->fn.file_op = cb_fn; 2149 args->arg = cb_arg; 2150 2151 spdk_file_sync_async(file, file->fs->md_io_channel, __file_close_async__sync_done, req); 2152 } 2153 2154 static void 2155 __file_close_done(void *arg, int fserrno) 2156 { 2157 struct spdk_fs_cb_args *args = arg; 2158 2159 args->rc = fserrno; 2160 sem_post(args->sem); 2161 } 2162 2163 static void 2164 __file_close(void *arg) 2165 { 2166 struct spdk_fs_request *req = arg; 2167 struct spdk_fs_cb_args *args = &req->args; 2168 struct spdk_file *file = args->file; 2169 2170 __file_close_async(file, req); 2171 } 2172 2173 int 2174 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2175 { 2176 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2177 struct spdk_fs_request *req; 2178 struct spdk_fs_cb_args *args; 2179 2180 req = alloc_fs_request(channel); 2181 assert(req != NULL); 2182 2183 args = &req->args; 2184 2185 spdk_file_sync(file, _channel); 2186 BLOBFS_TRACE(file, "name=%s\n", file->name); 2187 args->file = file; 2188 args->sem = &channel->sem; 2189 args->fn.file_op = __file_close_done; 2190 args->arg = req; 2191 channel->send_request(__file_close, req); 2192 sem_wait(&channel->sem); 2193 2194 return args->rc; 2195 } 2196 2197 static void 2198 cache_free_buffers(struct spdk_file *file) 2199 { 2200 BLOBFS_TRACE(file, "free=%s\n", file->name); 2201 pthread_spin_lock(&file->lock); 2202 pthread_spin_lock(&g_caches_lock); 2203 if (file->tree->present_mask == 0) { 2204 pthread_spin_unlock(&g_caches_lock); 2205 pthread_spin_unlock(&file->lock); 2206 return; 2207 } 2208 spdk_tree_free_buffers(file->tree); 2209 if (file->tree->present_mask == 0) { 2210 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2211 } 2212 file->last = NULL; 2213 pthread_spin_unlock(&g_caches_lock); 2214 pthread_spin_unlock(&file->lock); 2215 } 2216 2217 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2218 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2219