1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk_internal/log.h" 44 45 #define BLOBFS_TRACE(file, str, args...) \ 46 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 47 48 #define BLOBFS_TRACE_RW(file, str, args...) \ 49 SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 50 51 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 52 53 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 54 static struct spdk_mempool *g_cache_pool; 55 static TAILQ_HEAD(, spdk_file) g_caches; 56 static pthread_spinlock_t g_caches_lock; 57 58 static void 59 __sem_post(void *arg, int bserrno) 60 { 61 sem_t *sem = arg; 62 63 sem_post(sem); 64 } 65 66 void 67 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 68 { 69 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 70 free(cache_buffer); 71 } 72 73 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 74 75 struct spdk_file { 76 struct spdk_filesystem *fs; 77 struct spdk_blob *blob; 78 char *name; 79 uint64_t length; 80 bool open_for_writing; 81 uint64_t length_flushed; 82 uint64_t append_pos; 83 uint64_t seq_byte_count; 84 uint64_t next_seq_offset; 85 uint32_t priority; 86 TAILQ_ENTRY(spdk_file) tailq; 87 spdk_blob_id blobid; 88 uint32_t ref_count; 89 pthread_spinlock_t lock; 90 struct cache_buffer *last; 91 struct cache_tree *tree; 92 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 93 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 94 TAILQ_ENTRY(spdk_file) cache_tailq; 95 }; 96 97 struct spdk_filesystem { 98 struct spdk_blob_store *bs; 99 TAILQ_HEAD(, spdk_file) files; 100 struct spdk_bs_opts bs_opts; 101 struct spdk_bs_dev *bdev; 102 fs_send_request_fn send_request; 103 struct spdk_io_channel *sync_io_channel; 104 struct spdk_fs_channel *sync_fs_channel; 105 struct spdk_io_channel *md_io_channel; 106 struct spdk_fs_channel *md_fs_channel; 107 }; 108 109 struct spdk_fs_cb_args { 110 union { 111 spdk_fs_op_with_handle_complete fs_op_with_handle; 112 spdk_fs_op_complete fs_op; 113 spdk_file_op_with_handle_complete file_op_with_handle; 114 spdk_file_op_complete file_op; 115 spdk_file_stat_op_complete stat_op; 116 } fn; 117 void *arg; 118 sem_t *sem; 119 struct spdk_filesystem *fs; 120 struct spdk_file *file; 121 int rc; 122 bool from_request; 123 union { 124 struct { 125 uint64_t length; 126 } truncate; 127 struct { 128 struct spdk_io_channel *channel; 129 void *user_buf; 130 void *pin_buf; 131 int is_read; 132 off_t offset; 133 size_t length; 134 uint64_t start_page; 135 uint64_t num_pages; 136 uint32_t blocklen; 137 } rw; 138 struct { 139 const char *old_name; 140 const char *new_name; 141 } rename; 142 struct { 143 struct cache_buffer *cache_buffer; 144 uint64_t length; 145 } flush; 146 struct { 147 struct cache_buffer *cache_buffer; 148 uint64_t length; 149 uint64_t offset; 150 } readahead; 151 struct { 152 uint64_t offset; 153 TAILQ_ENTRY(spdk_fs_request) tailq; 154 } sync; 155 struct { 156 uint32_t num_clusters; 157 } resize; 158 struct { 159 const char *name; 160 uint32_t flags; 161 TAILQ_ENTRY(spdk_fs_request) tailq; 162 } open; 163 struct { 164 const char *name; 165 } create; 166 struct { 167 const char *name; 168 } delete; 169 struct { 170 const char *name; 171 } stat; 172 } op; 173 }; 174 175 static void cache_free_buffers(struct spdk_file *file); 176 177 static void 178 __initialize_cache(void) 179 { 180 if (g_cache_pool != NULL) { 181 return; 182 } 183 184 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 185 g_fs_cache_size / CACHE_BUFFER_SIZE, 186 CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY); 187 TAILQ_INIT(&g_caches); 188 pthread_spin_init(&g_caches_lock, 0); 189 } 190 191 static uint64_t 192 __file_get_blob_size(struct spdk_file *file) 193 { 194 uint64_t cluster_sz; 195 196 cluster_sz = file->fs->bs_opts.cluster_sz; 197 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 198 } 199 200 struct spdk_fs_request { 201 struct spdk_fs_cb_args args; 202 TAILQ_ENTRY(spdk_fs_request) link; 203 struct spdk_fs_channel *channel; 204 }; 205 206 struct spdk_fs_channel { 207 struct spdk_fs_request *req_mem; 208 TAILQ_HEAD(, spdk_fs_request) reqs; 209 sem_t sem; 210 struct spdk_filesystem *fs; 211 struct spdk_io_channel *bs_channel; 212 fs_send_request_fn send_request; 213 }; 214 215 static struct spdk_fs_request * 216 alloc_fs_request(struct spdk_fs_channel *channel) 217 { 218 struct spdk_fs_request *req; 219 220 req = TAILQ_FIRST(&channel->reqs); 221 if (!req) { 222 return NULL; 223 } 224 TAILQ_REMOVE(&channel->reqs, req, link); 225 memset(req, 0, sizeof(*req)); 226 req->channel = channel; 227 req->args.from_request = true; 228 229 return req; 230 } 231 232 static void 233 free_fs_request(struct spdk_fs_request *req) 234 { 235 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 236 } 237 238 static int 239 _spdk_fs_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 240 { 241 struct spdk_filesystem *fs = io_device; 242 struct spdk_fs_channel *channel = ctx_buf; 243 uint32_t max_ops = *(uint32_t *)unique_ctx; 244 uint32_t i; 245 246 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 247 if (!channel->req_mem) { 248 return -1; 249 } 250 251 TAILQ_INIT(&channel->reqs); 252 sem_init(&channel->sem, 0, 0); 253 254 for (i = 0; i < max_ops; i++) { 255 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 256 } 257 258 channel->fs = fs; 259 260 return 0; 261 } 262 263 static void 264 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 265 { 266 struct spdk_fs_channel *channel = ctx_buf; 267 268 free(channel->req_mem); 269 if (channel->bs_channel != NULL) { 270 spdk_bs_free_io_channel(channel->bs_channel); 271 } 272 } 273 274 static void 275 __send_request_direct(fs_request_fn fn, void *arg) 276 { 277 fn(arg); 278 } 279 280 static void 281 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 282 { 283 fs->bs = bs; 284 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 285 fs->md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT); 286 fs->md_fs_channel->send_request = __send_request_direct; 287 fs->sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT); 288 fs->sync_fs_channel->send_request = __send_request_direct; 289 } 290 291 static void 292 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 293 { 294 struct spdk_fs_request *req = ctx; 295 struct spdk_fs_cb_args *args = &req->args; 296 struct spdk_filesystem *fs = args->fs; 297 298 if (bserrno == 0) { 299 common_fs_bs_init(fs, bs); 300 } else { 301 free(fs); 302 fs = NULL; 303 } 304 305 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 306 free_fs_request(req); 307 } 308 309 static struct spdk_filesystem * 310 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 311 { 312 struct spdk_filesystem *fs; 313 uint32_t max_ops = 512; 314 315 fs = calloc(1, sizeof(*fs)); 316 if (fs == NULL) { 317 return NULL; 318 } 319 320 fs->bdev = dev; 321 fs->send_request = send_request_fn; 322 TAILQ_INIT(&fs->files); 323 spdk_io_device_register(fs, _spdk_fs_channel_create, _spdk_fs_channel_destroy, 324 sizeof(struct spdk_fs_channel)); 325 326 fs->md_io_channel = spdk_get_io_channel(fs, SPDK_IO_PRIORITY_DEFAULT, true, (void *)&max_ops); 327 fs->md_fs_channel = spdk_io_channel_get_ctx(fs->md_io_channel); 328 329 fs->sync_io_channel = spdk_get_io_channel(fs, SPDK_IO_PRIORITY_DEFAULT, true, (void *)&max_ops); 330 fs->sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_io_channel); 331 332 __initialize_cache(); 333 334 return fs; 335 } 336 337 void 338 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 339 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 340 { 341 struct spdk_filesystem *fs; 342 struct spdk_fs_request *req; 343 struct spdk_fs_cb_args *args; 344 345 fs = fs_alloc(dev, send_request_fn); 346 if (fs == NULL) { 347 cb_fn(cb_arg, NULL, -ENOMEM); 348 return; 349 } 350 351 req = alloc_fs_request(fs->md_fs_channel); 352 if (req == NULL) { 353 cb_fn(cb_arg, NULL, -ENOMEM); 354 return; 355 } 356 357 args = &req->args; 358 args->fn.fs_op_with_handle = cb_fn; 359 args->arg = cb_arg; 360 args->fs = fs; 361 362 spdk_bs_init(dev, NULL, init_cb, req); 363 } 364 365 static struct spdk_file * 366 file_alloc(struct spdk_filesystem *fs) 367 { 368 struct spdk_file *file; 369 370 file = calloc(1, sizeof(*file)); 371 if (file == NULL) { 372 return NULL; 373 } 374 375 file->fs = fs; 376 TAILQ_INIT(&file->open_requests); 377 TAILQ_INIT(&file->sync_requests); 378 pthread_spin_init(&file->lock, 0); 379 file->tree = calloc(1, sizeof(*file->tree)); 380 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 381 file->priority = SPDK_FILE_PRIORITY_LOW; 382 return file; 383 } 384 385 static void 386 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 387 { 388 struct spdk_fs_request *req = ctx; 389 struct spdk_fs_cb_args *args = &req->args; 390 struct spdk_filesystem *fs = args->fs; 391 struct spdk_file *f; 392 uint64_t *length; 393 const char *name; 394 size_t value_len; 395 396 if (rc == -ENOENT) { 397 /* Finished iterating */ 398 args->fn.fs_op_with_handle(args->arg, fs, 0); 399 free_fs_request(req); 400 return; 401 } else if (rc < 0) { 402 args->fn.fs_op_with_handle(args->arg, fs, rc); 403 free_fs_request(req); 404 return; 405 } 406 407 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 408 if (rc < 0) { 409 args->fn.fs_op_with_handle(args->arg, fs, rc); 410 free_fs_request(req); 411 return; 412 } 413 414 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 415 if (rc < 0) { 416 args->fn.fs_op_with_handle(args->arg, fs, rc); 417 free_fs_request(req); 418 return; 419 } 420 assert(value_len == 8); 421 422 f = file_alloc(fs); 423 if (f == NULL) { 424 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 425 free_fs_request(req); 426 return; 427 } 428 429 f->name = strdup(name); 430 f->blobid = spdk_blob_get_id(blob); 431 f->length = *length; 432 f->length_flushed = *length; 433 f->append_pos = *length; 434 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 435 436 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 437 } 438 439 static void 440 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 441 { 442 struct spdk_fs_request *req = ctx; 443 struct spdk_fs_cb_args *args = &req->args; 444 struct spdk_filesystem *fs = args->fs; 445 446 if (bserrno != 0) { 447 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 448 free_fs_request(req); 449 free(fs); 450 return; 451 } 452 453 common_fs_bs_init(fs, bs); 454 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 455 } 456 457 void 458 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 459 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 460 { 461 struct spdk_filesystem *fs; 462 struct spdk_fs_cb_args *args; 463 struct spdk_fs_request *req; 464 465 fs = fs_alloc(dev, send_request_fn); 466 if (fs == NULL) { 467 cb_fn(cb_arg, NULL, -ENOMEM); 468 return; 469 } 470 471 req = alloc_fs_request(fs->md_fs_channel); 472 if (req == NULL) { 473 cb_fn(cb_arg, NULL, -ENOMEM); 474 return; 475 } 476 477 args = &req->args; 478 args->fn.fs_op_with_handle = cb_fn; 479 args->arg = cb_arg; 480 args->fs = fs; 481 482 spdk_bs_load(dev, load_cb, req); 483 } 484 485 static void 486 unload_cb(void *ctx, int bserrno) 487 { 488 struct spdk_fs_request *req = ctx; 489 struct spdk_fs_cb_args *args = &req->args; 490 struct spdk_filesystem *fs = args->fs; 491 492 args->fn.fs_op(args->arg, bserrno); 493 free(req); 494 spdk_io_device_unregister(fs); 495 free(fs); 496 } 497 498 void 499 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 500 { 501 struct spdk_fs_request *req; 502 struct spdk_fs_cb_args *args; 503 504 /* 505 * We must free the md_channel before unloading the blobstore, so just 506 * allocate this request from the general heap. 507 */ 508 req = calloc(1, sizeof(*req)); 509 if (req == NULL) { 510 cb_fn(cb_arg, -ENOMEM); 511 return; 512 } 513 514 args = &req->args; 515 args->fn.fs_op = cb_fn; 516 args->arg = cb_arg; 517 args->fs = fs; 518 519 spdk_fs_free_io_channel(fs->md_io_channel); 520 spdk_fs_free_io_channel(fs->sync_io_channel); 521 spdk_bs_unload(fs->bs, unload_cb, req); 522 } 523 524 static struct spdk_file * 525 fs_find_file(struct spdk_filesystem *fs, const char *name) 526 { 527 struct spdk_file *file; 528 529 TAILQ_FOREACH(file, &fs->files, tailq) { 530 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 531 return file; 532 } 533 } 534 535 return NULL; 536 } 537 538 void 539 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 540 spdk_file_stat_op_complete cb_fn, void *cb_arg) 541 { 542 struct spdk_file_stat stat; 543 struct spdk_file *f = NULL; 544 545 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 546 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 547 return; 548 } 549 550 f = fs_find_file(fs, name); 551 if (f != NULL) { 552 stat.blobid = f->blobid; 553 stat.size = f->length; 554 cb_fn(cb_arg, &stat, 0); 555 return; 556 } 557 558 cb_fn(cb_arg, NULL, -ENOENT); 559 } 560 561 static void 562 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 563 { 564 struct spdk_fs_request *req = arg; 565 struct spdk_fs_cb_args *args = &req->args; 566 567 args->rc = fserrno; 568 if (fserrno == 0) { 569 memcpy(args->arg, stat, sizeof(*stat)); 570 } 571 sem_post(args->sem); 572 } 573 574 static void 575 __file_stat(void *arg) 576 { 577 struct spdk_fs_request *req = arg; 578 struct spdk_fs_cb_args *args = &req->args; 579 580 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 581 args->fn.stat_op, req); 582 } 583 584 int 585 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 586 const char *name, struct spdk_file_stat *stat) 587 { 588 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 589 struct spdk_fs_request *req; 590 int rc; 591 592 req = alloc_fs_request(channel); 593 assert(req != NULL); 594 595 req->args.fs = fs; 596 req->args.op.stat.name = name; 597 req->args.fn.stat_op = __copy_stat; 598 req->args.arg = stat; 599 req->args.sem = &channel->sem; 600 channel->send_request(__file_stat, req); 601 sem_wait(&channel->sem); 602 603 rc = req->args.rc; 604 free_fs_request(req); 605 606 return rc; 607 } 608 609 static void 610 fs_create_blob_close_cb(void *ctx, int bserrno) 611 { 612 struct spdk_fs_request *req = ctx; 613 struct spdk_fs_cb_args *args = &req->args; 614 615 args->fn.file_op(args->arg, bserrno); 616 free_fs_request(req); 617 } 618 619 static void 620 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 621 { 622 struct spdk_fs_request *req = ctx; 623 struct spdk_fs_cb_args *args = &req->args; 624 struct spdk_file *f = args->file; 625 uint64_t length = 0; 626 627 f->blob = blob; 628 spdk_bs_md_resize_blob(blob, 1); 629 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 630 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 631 632 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 633 } 634 635 static void 636 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 637 { 638 struct spdk_fs_request *req = ctx; 639 struct spdk_fs_cb_args *args = &req->args; 640 struct spdk_file *f = args->file; 641 642 f->blobid = blobid; 643 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 644 } 645 646 void 647 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 648 spdk_file_op_complete cb_fn, void *cb_arg) 649 { 650 struct spdk_file *file; 651 struct spdk_fs_request *req; 652 struct spdk_fs_cb_args *args; 653 654 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 655 cb_fn(cb_arg, -ENAMETOOLONG); 656 return; 657 } 658 659 file = fs_find_file(fs, name); 660 if (file != NULL) { 661 cb_fn(cb_arg, -EEXIST); 662 return; 663 } 664 665 file = file_alloc(fs); 666 if (file == NULL) { 667 cb_fn(cb_arg, -ENOMEM); 668 return; 669 } 670 671 req = alloc_fs_request(fs->md_fs_channel); 672 if (req == NULL) { 673 cb_fn(cb_arg, -ENOMEM); 674 return; 675 } 676 677 args = &req->args; 678 args->file = file; 679 args->fn.file_op = cb_fn; 680 args->arg = cb_arg; 681 682 file->name = strdup(name); 683 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 684 } 685 686 static void 687 __fs_create_file_done(void *arg, int fserrno) 688 { 689 struct spdk_fs_request *req = arg; 690 struct spdk_fs_cb_args *args = &req->args; 691 692 args->rc = fserrno; 693 sem_post(args->sem); 694 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 695 } 696 697 static void 698 __fs_create_file(void *arg) 699 { 700 struct spdk_fs_request *req = arg; 701 struct spdk_fs_cb_args *args = &req->args; 702 703 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 704 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 705 } 706 707 int 708 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 709 { 710 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 711 struct spdk_fs_request *req; 712 struct spdk_fs_cb_args *args; 713 int rc; 714 715 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 716 717 req = alloc_fs_request(channel); 718 assert(req != NULL); 719 720 args = &req->args; 721 args->fs = fs; 722 args->op.create.name = name; 723 args->sem = &channel->sem; 724 fs->send_request(__fs_create_file, req); 725 sem_wait(&channel->sem); 726 rc = args->rc; 727 free_fs_request(req); 728 729 return rc; 730 } 731 732 static void 733 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 734 { 735 struct spdk_fs_request *req = ctx; 736 struct spdk_fs_cb_args *args = &req->args; 737 struct spdk_file *f = args->file; 738 739 f->blob = blob; 740 while (!TAILQ_EMPTY(&f->open_requests)) { 741 req = TAILQ_FIRST(&f->open_requests); 742 args = &req->args; 743 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 744 args->fn.file_op_with_handle(args->arg, f, bserrno); 745 free_fs_request(req); 746 } 747 } 748 749 static void 750 fs_open_blob_create_cb(void *ctx, int bserrno) 751 { 752 struct spdk_fs_request *req = ctx; 753 struct spdk_fs_cb_args *args = &req->args; 754 struct spdk_file *file = args->file; 755 struct spdk_filesystem *fs = args->fs; 756 757 if (file == NULL) { 758 /* 759 * This is from an open with CREATE flag - the file 760 * is now created so look it up in the file list for this 761 * filesystem. 762 */ 763 file = fs_find_file(fs, args->op.open.name); 764 assert(file != NULL); 765 args->file = file; 766 } 767 768 file->ref_count++; 769 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 770 if (file->ref_count == 1) { 771 assert(file->blob == NULL); 772 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 773 } else if (file->blob != NULL) { 774 fs_open_blob_done(req, file->blob, 0); 775 } else { 776 /* 777 * The blob open for this file is in progress due to a previous 778 * open request. When that open completes, it will invoke the 779 * open callback for this request. 780 */ 781 } 782 } 783 784 void 785 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 786 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 787 { 788 struct spdk_file *f = NULL; 789 struct spdk_fs_request *req; 790 struct spdk_fs_cb_args *args; 791 792 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 793 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 794 return; 795 } 796 797 f = fs_find_file(fs, name); 798 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 799 cb_fn(cb_arg, NULL, -ENOENT); 800 return; 801 } 802 803 req = alloc_fs_request(fs->md_fs_channel); 804 if (req == NULL) { 805 cb_fn(cb_arg, NULL, -ENOMEM); 806 return; 807 } 808 809 args = &req->args; 810 args->fn.file_op_with_handle = cb_fn; 811 args->arg = cb_arg; 812 args->file = f; 813 args->fs = fs; 814 args->op.open.name = name; 815 816 if (f == NULL) { 817 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 818 } else { 819 fs_open_blob_create_cb(req, 0); 820 } 821 } 822 823 static void 824 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 825 { 826 struct spdk_fs_request *req = arg; 827 struct spdk_fs_cb_args *args = &req->args; 828 829 args->file = file; 830 args->rc = bserrno; 831 sem_post(args->sem); 832 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 833 } 834 835 static void 836 __fs_open_file(void *arg) 837 { 838 struct spdk_fs_request *req = arg; 839 struct spdk_fs_cb_args *args = &req->args; 840 841 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 842 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 843 __fs_open_file_done, req); 844 } 845 846 int 847 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 848 const char *name, uint32_t flags, struct spdk_file **file) 849 { 850 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 851 struct spdk_fs_request *req; 852 struct spdk_fs_cb_args *args; 853 int rc; 854 855 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 856 857 req = alloc_fs_request(channel); 858 assert(req != NULL); 859 860 args = &req->args; 861 args->fs = fs; 862 args->op.open.name = name; 863 args->op.open.flags = flags; 864 args->sem = &channel->sem; 865 fs->send_request(__fs_open_file, req); 866 sem_wait(&channel->sem); 867 rc = args->rc; 868 if (rc == 0) { 869 *file = args->file; 870 } else { 871 *file = NULL; 872 } 873 free_fs_request(req); 874 875 return rc; 876 } 877 878 static void 879 fs_rename_blob_close_cb(void *ctx, int bserrno) 880 { 881 struct spdk_fs_request *req = ctx; 882 struct spdk_fs_cb_args *args = &req->args; 883 884 args->fn.fs_op(args->arg, bserrno); 885 free_fs_request(req); 886 } 887 888 static void 889 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 890 { 891 struct spdk_fs_request *req = ctx; 892 struct spdk_fs_cb_args *args = &req->args; 893 struct spdk_file *f = args->file; 894 const char *new_name = args->op.rename.new_name; 895 896 f->blob = blob; 897 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 898 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 899 } 900 901 static void 902 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 903 { 904 struct spdk_fs_cb_args *args = &req->args; 905 struct spdk_file *f; 906 907 f = fs_find_file(args->fs, args->op.rename.old_name); 908 if (f == NULL) { 909 args->fn.fs_op(args->arg, -ENOENT); 910 free_fs_request(req); 911 return; 912 } 913 914 free(f->name); 915 f->name = strdup(args->op.rename.new_name); 916 args->file = f; 917 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 918 } 919 920 static void 921 fs_rename_delete_done(void *arg, int fserrno) 922 { 923 __spdk_fs_md_rename_file(arg); 924 } 925 926 void 927 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 928 const char *old_name, const char *new_name, 929 spdk_file_op_complete cb_fn, void *cb_arg) 930 { 931 struct spdk_file *f; 932 struct spdk_fs_request *req; 933 struct spdk_fs_cb_args *args; 934 935 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 936 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 937 cb_fn(cb_arg, -ENAMETOOLONG); 938 return; 939 } 940 941 req = alloc_fs_request(fs->md_fs_channel); 942 if (req == NULL) { 943 cb_fn(cb_arg, -ENOMEM); 944 return; 945 } 946 947 args = &req->args; 948 args->fn.fs_op = cb_fn; 949 args->fs = fs; 950 args->arg = cb_arg; 951 args->op.rename.old_name = old_name; 952 args->op.rename.new_name = new_name; 953 954 f = fs_find_file(fs, new_name); 955 if (f == NULL) { 956 __spdk_fs_md_rename_file(req); 957 return; 958 } 959 960 /* 961 * The rename overwrites an existing file. So delete the existing file, then 962 * do the actual rename. 963 */ 964 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 965 } 966 967 static void 968 __fs_rename_file_done(void *arg, int fserrno) 969 { 970 struct spdk_fs_request *req = arg; 971 struct spdk_fs_cb_args *args = &req->args; 972 973 args->rc = fserrno; 974 sem_post(args->sem); 975 } 976 977 static void 978 __fs_rename_file(void *arg) 979 { 980 struct spdk_fs_request *req = arg; 981 struct spdk_fs_cb_args *args = &req->args; 982 983 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 984 __fs_rename_file_done, req); 985 } 986 987 int 988 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 989 const char *old_name, const char *new_name) 990 { 991 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 992 struct spdk_fs_request *req; 993 struct spdk_fs_cb_args *args; 994 int rc; 995 996 req = alloc_fs_request(channel); 997 assert(req != NULL); 998 999 args = &req->args; 1000 1001 args->fs = fs; 1002 args->op.rename.old_name = old_name; 1003 args->op.rename.new_name = new_name; 1004 args->sem = &channel->sem; 1005 fs->send_request(__fs_rename_file, req); 1006 sem_wait(&channel->sem); 1007 rc = args->rc; 1008 free_fs_request(req); 1009 return rc; 1010 } 1011 1012 static void 1013 blob_delete_cb(void *ctx, int bserrno) 1014 { 1015 struct spdk_fs_request *req = ctx; 1016 struct spdk_fs_cb_args *args = &req->args; 1017 1018 args->fn.file_op(args->arg, bserrno); 1019 free_fs_request(req); 1020 } 1021 1022 void 1023 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1024 spdk_file_op_complete cb_fn, void *cb_arg) 1025 { 1026 struct spdk_file *f; 1027 spdk_blob_id blobid; 1028 struct spdk_fs_request *req; 1029 struct spdk_fs_cb_args *args; 1030 1031 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1032 1033 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1034 cb_fn(cb_arg, -ENAMETOOLONG); 1035 return; 1036 } 1037 1038 f = fs_find_file(fs, name); 1039 if (f == NULL) { 1040 cb_fn(cb_arg, -ENOENT); 1041 return; 1042 } 1043 1044 if (f->ref_count > 0) { 1045 /* For now, do not allow deleting files with open references. */ 1046 cb_fn(cb_arg, -EBUSY); 1047 return; 1048 } 1049 1050 req = alloc_fs_request(fs->md_fs_channel); 1051 if (req == NULL) { 1052 cb_fn(cb_arg, -ENOMEM); 1053 return; 1054 } 1055 1056 TAILQ_REMOVE(&fs->files, f, tailq); 1057 1058 cache_free_buffers(f); 1059 1060 blobid = f->blobid; 1061 1062 free(f->name); 1063 free(f->tree); 1064 free(f); 1065 1066 args = &req->args; 1067 args->fn.file_op = cb_fn; 1068 args->arg = cb_arg; 1069 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1070 } 1071 1072 static void 1073 __fs_delete_file_done(void *arg, int fserrno) 1074 { 1075 struct spdk_fs_request *req = arg; 1076 struct spdk_fs_cb_args *args = &req->args; 1077 1078 args->rc = fserrno; 1079 sem_post(args->sem); 1080 } 1081 1082 static void 1083 __fs_delete_file(void *arg) 1084 { 1085 struct spdk_fs_request *req = arg; 1086 struct spdk_fs_cb_args *args = &req->args; 1087 1088 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1089 } 1090 1091 int 1092 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1093 const char *name) 1094 { 1095 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1096 struct spdk_fs_request *req; 1097 struct spdk_fs_cb_args *args; 1098 int rc; 1099 1100 req = alloc_fs_request(channel); 1101 assert(req != NULL); 1102 1103 args = &req->args; 1104 args->fs = fs; 1105 args->op.delete.name = name; 1106 args->sem = &channel->sem; 1107 fs->send_request(__fs_delete_file, req); 1108 sem_wait(&channel->sem); 1109 rc = args->rc; 1110 free_fs_request(req); 1111 1112 return rc; 1113 } 1114 1115 spdk_fs_iter 1116 spdk_fs_iter_first(struct spdk_filesystem *fs) 1117 { 1118 struct spdk_file *f; 1119 1120 f = TAILQ_FIRST(&fs->files); 1121 return f; 1122 } 1123 1124 spdk_fs_iter 1125 spdk_fs_iter_next(spdk_fs_iter iter) 1126 { 1127 struct spdk_file *f = iter; 1128 1129 if (f == NULL) { 1130 return NULL; 1131 } 1132 1133 f = TAILQ_NEXT(f, tailq); 1134 return f; 1135 } 1136 1137 const char * 1138 spdk_file_get_name(struct spdk_file *file) 1139 { 1140 return file->name; 1141 } 1142 1143 uint64_t 1144 spdk_file_get_length(struct spdk_file *file) 1145 { 1146 assert(file != NULL); 1147 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1148 return file->length; 1149 } 1150 1151 static void 1152 fs_truncate_complete_cb(void *ctx, int bserrno) 1153 { 1154 struct spdk_fs_request *req = ctx; 1155 struct spdk_fs_cb_args *args = &req->args; 1156 1157 args->fn.file_op(args->arg, bserrno); 1158 free_fs_request(req); 1159 } 1160 1161 static uint64_t 1162 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1163 { 1164 return (length + cluster_sz - 1) / cluster_sz; 1165 } 1166 1167 void 1168 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1169 spdk_file_op_complete cb_fn, void *cb_arg) 1170 { 1171 struct spdk_filesystem *fs; 1172 size_t num_clusters; 1173 struct spdk_fs_request *req; 1174 struct spdk_fs_cb_args *args; 1175 1176 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1177 if (length == file->length) { 1178 cb_fn(cb_arg, 0); 1179 return; 1180 } 1181 1182 req = alloc_fs_request(file->fs->md_fs_channel); 1183 if (req == NULL) { 1184 cb_fn(cb_arg, -ENOMEM); 1185 return; 1186 } 1187 1188 args = &req->args; 1189 args->fn.file_op = cb_fn; 1190 args->arg = cb_arg; 1191 args->file = file; 1192 fs = file->fs; 1193 1194 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1195 1196 spdk_bs_md_resize_blob(file->blob, num_clusters); 1197 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1198 1199 file->length = length; 1200 if (file->append_pos > file->length) { 1201 file->append_pos = file->length; 1202 } 1203 1204 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1205 } 1206 1207 static void 1208 __truncate(void *arg) 1209 { 1210 struct spdk_fs_request *req = arg; 1211 struct spdk_fs_cb_args *args = &req->args; 1212 1213 spdk_file_truncate_async(args->file, args->op.truncate.length, 1214 args->fn.file_op, args->arg); 1215 } 1216 1217 void 1218 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1219 uint64_t length) 1220 { 1221 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1222 struct spdk_fs_request *req; 1223 struct spdk_fs_cb_args *args; 1224 1225 req = alloc_fs_request(channel); 1226 assert(req != NULL); 1227 1228 args = &req->args; 1229 1230 args->file = file; 1231 args->op.truncate.length = length; 1232 args->fn.file_op = __sem_post; 1233 args->arg = &channel->sem; 1234 1235 channel->send_request(__truncate, req); 1236 sem_wait(&channel->sem); 1237 free_fs_request(req); 1238 } 1239 1240 static void 1241 __rw_done(void *ctx, int bserrno) 1242 { 1243 struct spdk_fs_request *req = ctx; 1244 struct spdk_fs_cb_args *args = &req->args; 1245 1246 spdk_free(args->op.rw.pin_buf); 1247 args->fn.file_op(args->arg, bserrno); 1248 free_fs_request(req); 1249 } 1250 1251 static void 1252 __read_done(void *ctx, int bserrno) 1253 { 1254 struct spdk_fs_request *req = ctx; 1255 struct spdk_fs_cb_args *args = &req->args; 1256 1257 if (args->op.rw.is_read) { 1258 memcpy(args->op.rw.user_buf, 1259 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1260 args->op.rw.length); 1261 __rw_done(req, 0); 1262 } else { 1263 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1264 args->op.rw.user_buf, 1265 args->op.rw.length); 1266 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1267 args->op.rw.pin_buf, 1268 args->op.rw.start_page, args->op.rw.num_pages, 1269 __rw_done, req); 1270 } 1271 } 1272 1273 static void 1274 __do_blob_read(void *ctx, int fserrno) 1275 { 1276 struct spdk_fs_request *req = ctx; 1277 struct spdk_fs_cb_args *args = &req->args; 1278 1279 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1280 args->op.rw.pin_buf, 1281 args->op.rw.start_page, args->op.rw.num_pages, 1282 __read_done, req); 1283 } 1284 1285 static void 1286 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1287 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1288 { 1289 uint64_t end_page; 1290 1291 *page_size = spdk_bs_get_page_size(file->fs->bs); 1292 *start_page = offset / *page_size; 1293 end_page = (offset + length - 1) / *page_size; 1294 *num_pages = (end_page - *start_page + 1); 1295 } 1296 1297 static void 1298 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1299 void *payload, uint64_t offset, uint64_t length, 1300 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1301 { 1302 struct spdk_fs_request *req; 1303 struct spdk_fs_cb_args *args; 1304 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1305 uint64_t start_page, num_pages, pin_buf_length; 1306 uint32_t page_size; 1307 1308 if (is_read && offset + length > file->length) { 1309 cb_fn(cb_arg, -EINVAL); 1310 return; 1311 } 1312 1313 req = alloc_fs_request(channel); 1314 if (req == NULL) { 1315 cb_fn(cb_arg, -ENOMEM); 1316 return; 1317 } 1318 1319 args = &req->args; 1320 args->fn.file_op = cb_fn; 1321 args->arg = cb_arg; 1322 args->file = file; 1323 args->op.rw.channel = channel->bs_channel; 1324 args->op.rw.user_buf = payload; 1325 args->op.rw.is_read = is_read; 1326 args->op.rw.offset = offset; 1327 args->op.rw.length = length; 1328 1329 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1330 pin_buf_length = num_pages * page_size; 1331 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, 4096, NULL); 1332 1333 args->op.rw.start_page = start_page; 1334 args->op.rw.num_pages = num_pages; 1335 1336 if (!is_read && file->length < offset + length) { 1337 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1338 } else { 1339 __do_blob_read(req, 0); 1340 } 1341 } 1342 1343 void 1344 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1345 void *payload, uint64_t offset, uint64_t length, 1346 spdk_file_op_complete cb_fn, void *cb_arg) 1347 { 1348 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1349 } 1350 1351 void 1352 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1353 void *payload, uint64_t offset, uint64_t length, 1354 spdk_file_op_complete cb_fn, void *cb_arg) 1355 { 1356 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1357 file->name, offset, length); 1358 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1359 } 1360 1361 struct spdk_io_channel * 1362 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs, uint32_t priority) 1363 { 1364 struct spdk_io_channel *io_channel; 1365 struct spdk_fs_channel *fs_channel; 1366 uint32_t max_ops = 512; 1367 1368 io_channel = spdk_get_io_channel(fs, priority, true, (void *)&max_ops); 1369 fs_channel = spdk_io_channel_get_ctx(io_channel); 1370 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT); 1371 fs_channel->send_request = __send_request_direct; 1372 1373 return io_channel; 1374 } 1375 1376 struct spdk_io_channel * 1377 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs, uint32_t priority) 1378 { 1379 struct spdk_io_channel *io_channel; 1380 struct spdk_fs_channel *fs_channel; 1381 uint32_t max_ops = 16; 1382 1383 io_channel = spdk_get_io_channel(fs, priority, true, (void *)&max_ops); 1384 fs_channel = spdk_io_channel_get_ctx(io_channel); 1385 fs_channel->send_request = fs->send_request; 1386 1387 return io_channel; 1388 } 1389 1390 void 1391 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1392 { 1393 spdk_put_io_channel(channel); 1394 } 1395 1396 void 1397 spdk_fs_set_cache_size(uint64_t size_in_mb) 1398 { 1399 g_fs_cache_size = size_in_mb * 1024 * 1024; 1400 } 1401 1402 uint64_t 1403 spdk_fs_get_cache_size(void) 1404 { 1405 return g_fs_cache_size / (1024 * 1024); 1406 } 1407 1408 static void __file_flush(void *_args); 1409 1410 static void * 1411 alloc_cache_memory_buffer(struct spdk_file *context) 1412 { 1413 struct spdk_file *file; 1414 void *buf; 1415 1416 buf = spdk_mempool_get(g_cache_pool); 1417 if (buf != NULL) { 1418 return buf; 1419 } 1420 1421 pthread_spin_lock(&g_caches_lock); 1422 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1423 if (!file->open_for_writing && 1424 file->priority == SPDK_FILE_PRIORITY_LOW && 1425 file != context) { 1426 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1427 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1428 break; 1429 } 1430 } 1431 pthread_spin_unlock(&g_caches_lock); 1432 if (file != NULL) { 1433 cache_free_buffers(file); 1434 buf = spdk_mempool_get(g_cache_pool); 1435 if (buf != NULL) { 1436 return buf; 1437 } 1438 } 1439 1440 pthread_spin_lock(&g_caches_lock); 1441 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1442 if (!file->open_for_writing && file != context) { 1443 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1444 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1445 break; 1446 } 1447 } 1448 pthread_spin_unlock(&g_caches_lock); 1449 if (file != NULL) { 1450 cache_free_buffers(file); 1451 buf = spdk_mempool_get(g_cache_pool); 1452 if (buf != NULL) { 1453 return buf; 1454 } 1455 } 1456 1457 pthread_spin_lock(&g_caches_lock); 1458 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1459 if (file != context) { 1460 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1461 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1462 break; 1463 } 1464 } 1465 pthread_spin_unlock(&g_caches_lock); 1466 if (file != NULL) { 1467 cache_free_buffers(file); 1468 buf = spdk_mempool_get(g_cache_pool); 1469 if (buf != NULL) { 1470 return buf; 1471 } 1472 } 1473 1474 assert(false); 1475 return NULL; 1476 } 1477 1478 static struct cache_buffer * 1479 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1480 { 1481 struct cache_buffer *buf; 1482 int count = 0; 1483 1484 buf = calloc(1, sizeof(*buf)); 1485 if (buf == NULL) { 1486 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1487 return NULL; 1488 } 1489 1490 buf->buf = alloc_cache_memory_buffer(file); 1491 if (buf->buf == NULL) { 1492 while (buf->buf == NULL) { 1493 /* 1494 * TODO: alloc_cache_memory_buffer() should eventually free 1495 * some buffers. Need a more sophisticated check here, instead 1496 * of just bailing if 100 tries does not result in getting a 1497 * free buffer. This will involve using the sync channel's 1498 * semaphore to block until a buffer becomes available. 1499 */ 1500 if (count++ == 100) { 1501 SPDK_ERRLOG("could not allocate cache buffer\n"); 1502 assert(false); 1503 free(buf); 1504 return NULL; 1505 } 1506 buf->buf = alloc_cache_memory_buffer(file); 1507 } 1508 } 1509 1510 buf->buf_size = CACHE_BUFFER_SIZE; 1511 buf->offset = offset; 1512 1513 pthread_spin_lock(&g_caches_lock); 1514 if (file->tree->present_mask == 0) { 1515 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1516 } 1517 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1518 pthread_spin_unlock(&g_caches_lock); 1519 1520 return buf; 1521 } 1522 1523 static struct cache_buffer * 1524 cache_append_buffer(struct spdk_file *file) 1525 { 1526 struct cache_buffer *last; 1527 1528 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1529 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1530 1531 last = cache_insert_buffer(file, file->append_pos); 1532 if (last == NULL) { 1533 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1534 return NULL; 1535 } 1536 1537 if (file->last != NULL) { 1538 file->last->next = last; 1539 } 1540 file->last = last; 1541 1542 return last; 1543 } 1544 1545 static void 1546 __wake_caller(struct spdk_fs_cb_args *args) 1547 { 1548 sem_post(args->sem); 1549 } 1550 1551 static void 1552 __file_cache_finish_sync(struct spdk_file *file) 1553 { 1554 struct spdk_fs_request *sync_req; 1555 struct spdk_fs_cb_args *sync_args; 1556 1557 pthread_spin_lock(&file->lock); 1558 while (!TAILQ_EMPTY(&file->sync_requests)) { 1559 sync_req = TAILQ_FIRST(&file->sync_requests); 1560 sync_args = &sync_req->args; 1561 if (sync_args->op.sync.offset > file->length_flushed) { 1562 break; 1563 } 1564 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1565 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1566 pthread_spin_unlock(&file->lock); 1567 sync_args->fn.file_op(sync_args->arg, 0); 1568 pthread_spin_lock(&file->lock); 1569 free_fs_request(sync_req); 1570 } 1571 pthread_spin_unlock(&file->lock); 1572 } 1573 1574 static void 1575 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1576 { 1577 struct spdk_file *file = ctx; 1578 1579 __file_cache_finish_sync(file); 1580 } 1581 1582 static void 1583 __free_args(struct spdk_fs_cb_args *args) 1584 { 1585 struct spdk_fs_request *req; 1586 1587 if (!args->from_request) { 1588 free(args); 1589 } else { 1590 /* Depends on args being at the start of the spdk_fs_request structure. */ 1591 req = (struct spdk_fs_request *)args; 1592 free_fs_request(req); 1593 } 1594 } 1595 1596 static void 1597 __file_flush_done(void *arg, int bserrno) 1598 { 1599 struct spdk_fs_cb_args *args = arg; 1600 struct spdk_fs_request *sync_req; 1601 struct spdk_file *file = args->file; 1602 struct cache_buffer *next = args->op.flush.cache_buffer; 1603 1604 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1605 1606 pthread_spin_lock(&file->lock); 1607 next->in_progress = false; 1608 next->bytes_flushed += args->op.flush.length; 1609 file->length_flushed += args->op.flush.length; 1610 if (file->length_flushed > file->length) { 1611 file->length = file->length_flushed; 1612 } 1613 if (next->bytes_flushed == next->buf_size) { 1614 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1615 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1616 } 1617 1618 TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) { 1619 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1620 break; 1621 } 1622 } 1623 1624 /* 1625 * Assert that there is no cached data that extends past the end of the underlying 1626 * blob. 1627 */ 1628 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1629 next->bytes_filled == 0); 1630 1631 if (sync_req != NULL) { 1632 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1633 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1634 sizeof(file->length_flushed)); 1635 1636 pthread_spin_unlock(&file->lock); 1637 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1638 } else { 1639 pthread_spin_unlock(&file->lock); 1640 __file_cache_finish_sync(file); 1641 } 1642 1643 __file_flush(args); 1644 } 1645 1646 static void 1647 __file_flush(void *_args) 1648 { 1649 struct spdk_fs_cb_args *args = _args; 1650 struct spdk_file *file = args->file; 1651 struct cache_buffer *next; 1652 uint64_t offset, length, start_page, num_pages; 1653 uint32_t page_size; 1654 1655 pthread_spin_lock(&file->lock); 1656 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1657 if (next == NULL || next->in_progress) { 1658 /* 1659 * There is either no data to flush, or a flush I/O is already in 1660 * progress. So return immediately - if a flush I/O is in 1661 * progress we will flush more data after that is completed. 1662 */ 1663 __free_args(args); 1664 pthread_spin_unlock(&file->lock); 1665 return; 1666 } 1667 1668 offset = next->offset + next->bytes_flushed; 1669 length = next->bytes_filled - next->bytes_flushed; 1670 if (length == 0) { 1671 __free_args(args); 1672 pthread_spin_unlock(&file->lock); 1673 return; 1674 } 1675 args->op.flush.length = length; 1676 args->op.flush.cache_buffer = next; 1677 1678 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1679 1680 next->in_progress = true; 1681 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1682 offset, length, start_page, num_pages); 1683 pthread_spin_unlock(&file->lock); 1684 spdk_bs_io_write_blob(file->blob, file->fs->sync_fs_channel->bs_channel, 1685 next->buf + (start_page * page_size) - next->offset, 1686 start_page, num_pages, 1687 __file_flush_done, args); 1688 } 1689 1690 static void 1691 __file_extend_done(void *arg, int bserrno) 1692 { 1693 struct spdk_fs_cb_args *args = arg; 1694 1695 __wake_caller(args); 1696 } 1697 1698 static void 1699 __file_extend_blob(void *_args) 1700 { 1701 struct spdk_fs_cb_args *args = _args; 1702 struct spdk_file *file = args->file; 1703 1704 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1705 1706 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1707 } 1708 1709 static void 1710 __rw_from_file_done(void *arg, int bserrno) 1711 { 1712 struct spdk_fs_cb_args *args = arg; 1713 1714 __wake_caller(args); 1715 __free_args(args); 1716 } 1717 1718 static void 1719 __rw_from_file(void *_args) 1720 { 1721 struct spdk_fs_cb_args *args = _args; 1722 struct spdk_file *file = args->file; 1723 1724 if (args->op.rw.is_read) { 1725 spdk_file_read_async(file, file->fs->sync_io_channel, args->op.rw.user_buf, 1726 args->op.rw.offset, args->op.rw.length, 1727 __rw_from_file_done, args); 1728 } else { 1729 spdk_file_write_async(file, file->fs->sync_io_channel, args->op.rw.user_buf, 1730 args->op.rw.offset, args->op.rw.length, 1731 __rw_from_file_done, args); 1732 } 1733 } 1734 1735 static int 1736 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1737 uint64_t offset, uint64_t length, bool is_read) 1738 { 1739 struct spdk_fs_cb_args *args; 1740 1741 args = calloc(1, sizeof(*args)); 1742 if (args == NULL) { 1743 sem_post(sem); 1744 return -ENOMEM; 1745 } 1746 1747 args->file = file; 1748 args->sem = sem; 1749 args->op.rw.user_buf = payload; 1750 args->op.rw.offset = offset; 1751 args->op.rw.length = length; 1752 args->op.rw.is_read = is_read; 1753 file->fs->send_request(__rw_from_file, args); 1754 return 0; 1755 } 1756 1757 int 1758 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1759 void *payload, uint64_t offset, uint64_t length) 1760 { 1761 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1762 struct spdk_fs_cb_args *args; 1763 uint64_t rem_length, copy, blob_size, cluster_sz; 1764 uint32_t cache_buffers_filled = 0; 1765 uint8_t *cur_payload; 1766 struct cache_buffer *last; 1767 1768 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1769 1770 if (length == 0) { 1771 return 0; 1772 } 1773 1774 if (offset != file->append_pos) { 1775 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1776 return -EINVAL; 1777 } 1778 1779 pthread_spin_lock(&file->lock); 1780 file->open_for_writing = true; 1781 1782 if (file->last == NULL) { 1783 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1784 cache_append_buffer(file); 1785 } else { 1786 int rc; 1787 1788 file->append_pos += length; 1789 rc = __send_rw_from_file(file, &channel->sem, payload, 1790 offset, length, false); 1791 pthread_spin_unlock(&file->lock); 1792 sem_wait(&channel->sem); 1793 return rc; 1794 } 1795 } 1796 1797 blob_size = __file_get_blob_size(file); 1798 1799 if ((offset + length) > blob_size) { 1800 struct spdk_fs_cb_args extend_args = {}; 1801 1802 cluster_sz = file->fs->bs_opts.cluster_sz; 1803 extend_args.sem = &channel->sem; 1804 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1805 extend_args.file = file; 1806 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1807 pthread_spin_unlock(&file->lock); 1808 file->fs->send_request(__file_extend_blob, &extend_args); 1809 sem_wait(&channel->sem); 1810 } 1811 1812 last = file->last; 1813 rem_length = length; 1814 cur_payload = payload; 1815 while (rem_length > 0) { 1816 copy = last->buf_size - last->bytes_filled; 1817 if (copy > rem_length) { 1818 copy = rem_length; 1819 } 1820 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 1821 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 1822 file->append_pos += copy; 1823 if (file->length < file->append_pos) { 1824 file->length = file->append_pos; 1825 } 1826 cur_payload += copy; 1827 last->bytes_filled += copy; 1828 rem_length -= copy; 1829 if (last->bytes_filled == last->buf_size) { 1830 cache_buffers_filled++; 1831 last = cache_append_buffer(file); 1832 if (last == NULL) { 1833 BLOBFS_TRACE(file, "nomem\n"); 1834 pthread_spin_unlock(&file->lock); 1835 return -ENOMEM; 1836 } 1837 } 1838 } 1839 1840 if (cache_buffers_filled == 0) { 1841 pthread_spin_unlock(&file->lock); 1842 return 0; 1843 } 1844 1845 args = calloc(1, sizeof(*args)); 1846 if (args == NULL) { 1847 pthread_spin_unlock(&file->lock); 1848 return -ENOMEM; 1849 } 1850 1851 args->file = file; 1852 file->fs->send_request(__file_flush, args); 1853 pthread_spin_unlock(&file->lock); 1854 return 0; 1855 } 1856 1857 static void 1858 __readahead_done(void *arg, int bserrno) 1859 { 1860 struct spdk_fs_cb_args *args = arg; 1861 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 1862 struct spdk_file *file = args->file; 1863 1864 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 1865 1866 pthread_spin_lock(&file->lock); 1867 cache_buffer->bytes_filled = args->op.readahead.length; 1868 cache_buffer->bytes_flushed = args->op.readahead.length; 1869 cache_buffer->in_progress = false; 1870 pthread_spin_unlock(&file->lock); 1871 1872 __free_args(args); 1873 } 1874 1875 static void 1876 __readahead(void *_args) 1877 { 1878 struct spdk_fs_cb_args *args = _args; 1879 struct spdk_file *file = args->file; 1880 uint64_t offset, length, start_page, num_pages; 1881 uint32_t page_size; 1882 1883 offset = args->op.readahead.offset; 1884 length = args->op.readahead.length; 1885 assert(length > 0); 1886 1887 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1888 1889 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1890 offset, length, start_page, num_pages); 1891 spdk_bs_io_read_blob(file->blob, file->fs->sync_fs_channel->bs_channel, 1892 args->op.readahead.cache_buffer->buf, 1893 start_page, num_pages, 1894 __readahead_done, args); 1895 } 1896 1897 static uint64_t 1898 __next_cache_buffer_offset(uint64_t offset) 1899 { 1900 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 1901 } 1902 1903 static void 1904 check_readahead(struct spdk_file *file, uint64_t offset) 1905 { 1906 struct spdk_fs_cb_args *args; 1907 1908 offset = __next_cache_buffer_offset(offset); 1909 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 1910 return; 1911 } 1912 1913 args = calloc(1, sizeof(*args)); 1914 if (args == NULL) { 1915 return; 1916 } 1917 1918 BLOBFS_TRACE(file, "offset=%jx\n", offset); 1919 1920 args->file = file; 1921 args->op.readahead.offset = offset; 1922 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 1923 args->op.readahead.cache_buffer->in_progress = true; 1924 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 1925 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 1926 } else { 1927 args->op.readahead.length = CACHE_BUFFER_SIZE; 1928 } 1929 file->fs->send_request(__readahead, args); 1930 } 1931 1932 static int 1933 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 1934 { 1935 struct cache_buffer *buf; 1936 1937 buf = spdk_tree_find_filled_buffer(file->tree, offset); 1938 if (buf == NULL) { 1939 return __send_rw_from_file(file, sem, payload, offset, length, true); 1940 } 1941 1942 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 1943 length = buf->offset + buf->bytes_filled - offset; 1944 } 1945 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 1946 memcpy(payload, &buf->buf[offset - buf->offset], length); 1947 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 1948 pthread_spin_lock(&g_caches_lock); 1949 spdk_tree_remove_buffer(file->tree, buf); 1950 if (file->tree->present_mask == 0) { 1951 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1952 } 1953 pthread_spin_unlock(&g_caches_lock); 1954 } 1955 1956 sem_post(sem); 1957 return 0; 1958 } 1959 1960 int64_t 1961 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 1962 void *payload, uint64_t offset, uint64_t length) 1963 { 1964 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1965 uint64_t final_offset, final_length; 1966 uint32_t sub_reads = 0; 1967 int rc = 0; 1968 1969 pthread_spin_lock(&file->lock); 1970 1971 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 1972 1973 file->open_for_writing = false; 1974 1975 if (length == 0 || offset >= file->length) { 1976 pthread_spin_unlock(&file->lock); 1977 return 0; 1978 } 1979 1980 if (offset + length > file->length) { 1981 length = file->length - offset; 1982 } 1983 1984 if (offset != file->next_seq_offset) { 1985 file->seq_byte_count = 0; 1986 } 1987 file->seq_byte_count += length; 1988 file->next_seq_offset = offset + length; 1989 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 1990 check_readahead(file, offset); 1991 check_readahead(file, offset + CACHE_BUFFER_SIZE); 1992 } 1993 1994 final_length = 0; 1995 final_offset = offset + length; 1996 while (offset < final_offset) { 1997 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 1998 if (length > (final_offset - offset)) { 1999 length = final_offset - offset; 2000 } 2001 rc = __file_read(file, payload, offset, length, &channel->sem); 2002 if (rc == 0) { 2003 final_length += length; 2004 } else { 2005 break; 2006 } 2007 payload += length; 2008 offset += length; 2009 sub_reads++; 2010 } 2011 pthread_spin_unlock(&file->lock); 2012 while (sub_reads-- > 0) { 2013 sem_wait(&channel->sem); 2014 } 2015 if (rc == 0) { 2016 return final_length; 2017 } else { 2018 return rc; 2019 } 2020 } 2021 2022 static void 2023 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2024 spdk_file_op_complete cb_fn, void *cb_arg) 2025 { 2026 struct spdk_fs_request *sync_req; 2027 struct spdk_fs_request *flush_req; 2028 struct spdk_fs_cb_args *sync_args; 2029 struct spdk_fs_cb_args *flush_args; 2030 2031 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2032 2033 pthread_spin_lock(&file->lock); 2034 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2035 BLOBFS_TRACE(file, "done - no data to flush\n"); 2036 pthread_spin_unlock(&file->lock); 2037 cb_fn(cb_arg, 0); 2038 return; 2039 } 2040 2041 sync_req = alloc_fs_request(channel); 2042 assert(sync_req != NULL); 2043 sync_args = &sync_req->args; 2044 2045 flush_req = alloc_fs_request(channel); 2046 assert(flush_req != NULL); 2047 flush_args = &flush_req->args; 2048 2049 sync_args->file = file; 2050 sync_args->fn.file_op = cb_fn; 2051 sync_args->arg = cb_arg; 2052 sync_args->op.sync.offset = file->append_pos; 2053 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2054 pthread_spin_unlock(&file->lock); 2055 2056 flush_args->file = file; 2057 channel->send_request(__file_flush, flush_args); 2058 } 2059 2060 int 2061 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2062 { 2063 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2064 2065 _file_sync(file, channel, __sem_post, &channel->sem); 2066 sem_wait(&channel->sem); 2067 2068 return 0; 2069 } 2070 2071 void 2072 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2073 spdk_file_op_complete cb_fn, void *cb_arg) 2074 { 2075 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2076 2077 _file_sync(file, channel, cb_fn, cb_arg); 2078 } 2079 2080 void 2081 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2082 { 2083 BLOBFS_TRACE(file, "priority=%u\n", priority); 2084 file->priority = priority; 2085 2086 } 2087 2088 /* 2089 * Close routines 2090 */ 2091 2092 static void 2093 __file_close_async_done(void *ctx, int bserrno) 2094 { 2095 struct spdk_fs_request *req = ctx; 2096 struct spdk_fs_cb_args *args = &req->args; 2097 2098 args->fn.file_op(args->arg, bserrno); 2099 free_fs_request(req); 2100 } 2101 2102 static void 2103 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2104 { 2105 pthread_spin_lock(&file->lock); 2106 if (file->ref_count == 0) { 2107 pthread_spin_unlock(&file->lock); 2108 __file_close_async_done(req, -EBADF); 2109 return; 2110 } 2111 2112 file->ref_count--; 2113 if (file->ref_count > 0) { 2114 pthread_spin_unlock(&file->lock); 2115 __file_close_async_done(req, 0); 2116 return; 2117 } 2118 2119 pthread_spin_unlock(&file->lock); 2120 2121 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2122 } 2123 2124 static void 2125 __file_close_async__sync_done(void *arg, int fserrno) 2126 { 2127 struct spdk_fs_request *req = arg; 2128 struct spdk_fs_cb_args *args = &req->args; 2129 2130 __file_close_async(args->file, req); 2131 } 2132 2133 void 2134 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2135 { 2136 struct spdk_fs_request *req; 2137 struct spdk_fs_cb_args *args; 2138 2139 req = alloc_fs_request(file->fs->md_fs_channel); 2140 if (req == NULL) { 2141 cb_fn(cb_arg, -ENOMEM); 2142 return; 2143 } 2144 2145 args = &req->args; 2146 args->file = file; 2147 args->fn.file_op = cb_fn; 2148 args->arg = cb_arg; 2149 2150 spdk_file_sync_async(file, file->fs->md_io_channel, __file_close_async__sync_done, req); 2151 } 2152 2153 static void 2154 __file_close_done(void *arg, int fserrno) 2155 { 2156 struct spdk_fs_cb_args *args = arg; 2157 2158 args->rc = fserrno; 2159 sem_post(args->sem); 2160 } 2161 2162 static void 2163 __file_close(void *arg) 2164 { 2165 struct spdk_fs_request *req = arg; 2166 struct spdk_fs_cb_args *args = &req->args; 2167 struct spdk_file *file = args->file; 2168 2169 __file_close_async(file, req); 2170 } 2171 2172 int 2173 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2174 { 2175 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2176 struct spdk_fs_request *req; 2177 struct spdk_fs_cb_args *args; 2178 2179 req = alloc_fs_request(channel); 2180 assert(req != NULL); 2181 2182 args = &req->args; 2183 2184 spdk_file_sync(file, _channel); 2185 BLOBFS_TRACE(file, "name=%s\n", file->name); 2186 args->file = file; 2187 args->sem = &channel->sem; 2188 args->fn.file_op = __file_close_done; 2189 args->arg = req; 2190 channel->send_request(__file_close, req); 2191 sem_wait(&channel->sem); 2192 2193 return args->rc; 2194 } 2195 2196 static void 2197 cache_free_buffers(struct spdk_file *file) 2198 { 2199 BLOBFS_TRACE(file, "free=%s\n", file->name); 2200 pthread_spin_lock(&file->lock); 2201 pthread_spin_lock(&g_caches_lock); 2202 if (file->tree->present_mask == 0) { 2203 pthread_spin_unlock(&g_caches_lock); 2204 pthread_spin_unlock(&file->lock); 2205 return; 2206 } 2207 spdk_tree_free_buffers(file->tree); 2208 if (file->tree->present_mask == 0) { 2209 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2210 } 2211 file->last = NULL; 2212 pthread_spin_unlock(&g_caches_lock); 2213 pthread_spin_unlock(&file->lock); 2214 } 2215 2216 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2217 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2218