1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk/util.h" 44 #include "spdk_internal/log.h" 45 46 #define BLOBFS_TRACE(file, str, args...) \ 47 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 48 49 #define BLOBFS_TRACE_RW(file, str, args...) \ 50 SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 51 52 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 53 54 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 55 static struct spdk_mempool *g_cache_pool; 56 static TAILQ_HEAD(, spdk_file) g_caches; 57 static int g_fs_count = 0; 58 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 59 static pthread_spinlock_t g_caches_lock; 60 61 static void 62 __sem_post(void *arg, int bserrno) 63 { 64 sem_t *sem = arg; 65 66 sem_post(sem); 67 } 68 69 void 70 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 71 { 72 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 73 free(cache_buffer); 74 } 75 76 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 77 78 struct spdk_file { 79 struct spdk_filesystem *fs; 80 struct spdk_blob *blob; 81 char *name; 82 uint64_t length; 83 bool open_for_writing; 84 uint64_t length_flushed; 85 uint64_t append_pos; 86 uint64_t seq_byte_count; 87 uint64_t next_seq_offset; 88 uint32_t priority; 89 TAILQ_ENTRY(spdk_file) tailq; 90 spdk_blob_id blobid; 91 uint32_t ref_count; 92 pthread_spinlock_t lock; 93 struct cache_buffer *last; 94 struct cache_tree *tree; 95 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 96 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 97 TAILQ_ENTRY(spdk_file) cache_tailq; 98 }; 99 100 struct spdk_filesystem { 101 struct spdk_blob_store *bs; 102 TAILQ_HEAD(, spdk_file) files; 103 struct spdk_bs_opts bs_opts; 104 struct spdk_bs_dev *bdev; 105 fs_send_request_fn send_request; 106 107 struct { 108 uint32_t max_ops; 109 struct spdk_io_channel *sync_io_channel; 110 struct spdk_fs_channel *sync_fs_channel; 111 } sync_target; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *md_io_channel; 116 struct spdk_fs_channel *md_fs_channel; 117 } md_target; 118 119 struct { 120 uint32_t max_ops; 121 } io_target; 122 }; 123 124 struct spdk_fs_cb_args { 125 union { 126 spdk_fs_op_with_handle_complete fs_op_with_handle; 127 spdk_fs_op_complete fs_op; 128 spdk_file_op_with_handle_complete file_op_with_handle; 129 spdk_file_op_complete file_op; 130 spdk_file_stat_op_complete stat_op; 131 } fn; 132 void *arg; 133 sem_t *sem; 134 struct spdk_filesystem *fs; 135 struct spdk_file *file; 136 int rc; 137 bool from_request; 138 union { 139 struct { 140 uint64_t length; 141 } truncate; 142 struct { 143 struct spdk_io_channel *channel; 144 void *user_buf; 145 void *pin_buf; 146 int is_read; 147 off_t offset; 148 size_t length; 149 uint64_t start_page; 150 uint64_t num_pages; 151 uint32_t blocklen; 152 } rw; 153 struct { 154 const char *old_name; 155 const char *new_name; 156 } rename; 157 struct { 158 struct cache_buffer *cache_buffer; 159 uint64_t length; 160 } flush; 161 struct { 162 struct cache_buffer *cache_buffer; 163 uint64_t length; 164 uint64_t offset; 165 } readahead; 166 struct { 167 uint64_t offset; 168 TAILQ_ENTRY(spdk_fs_request) tailq; 169 } sync; 170 struct { 171 uint32_t num_clusters; 172 } resize; 173 struct { 174 const char *name; 175 uint32_t flags; 176 TAILQ_ENTRY(spdk_fs_request) tailq; 177 } open; 178 struct { 179 const char *name; 180 } create; 181 struct { 182 const char *name; 183 } delete; 184 struct { 185 const char *name; 186 } stat; 187 } op; 188 }; 189 190 static void cache_free_buffers(struct spdk_file *file); 191 192 static void 193 __initialize_cache(void) 194 { 195 assert(g_cache_pool == NULL); 196 197 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 198 g_fs_cache_size / CACHE_BUFFER_SIZE, 199 CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY); 200 TAILQ_INIT(&g_caches); 201 pthread_spin_init(&g_caches_lock, 0); 202 } 203 204 static void 205 __free_cache(void) 206 { 207 assert(g_cache_pool != NULL); 208 209 spdk_mempool_free(g_cache_pool); 210 g_cache_pool = NULL; 211 } 212 213 static uint64_t 214 __file_get_blob_size(struct spdk_file *file) 215 { 216 uint64_t cluster_sz; 217 218 cluster_sz = file->fs->bs_opts.cluster_sz; 219 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 220 } 221 222 struct spdk_fs_request { 223 struct spdk_fs_cb_args args; 224 TAILQ_ENTRY(spdk_fs_request) link; 225 struct spdk_fs_channel *channel; 226 }; 227 228 struct spdk_fs_channel { 229 struct spdk_fs_request *req_mem; 230 TAILQ_HEAD(, spdk_fs_request) reqs; 231 sem_t sem; 232 struct spdk_filesystem *fs; 233 struct spdk_io_channel *bs_channel; 234 fs_send_request_fn send_request; 235 }; 236 237 static struct spdk_fs_request * 238 alloc_fs_request(struct spdk_fs_channel *channel) 239 { 240 struct spdk_fs_request *req; 241 242 req = TAILQ_FIRST(&channel->reqs); 243 if (!req) { 244 return NULL; 245 } 246 TAILQ_REMOVE(&channel->reqs, req, link); 247 memset(req, 0, sizeof(*req)); 248 req->channel = channel; 249 req->args.from_request = true; 250 251 return req; 252 } 253 254 static void 255 free_fs_request(struct spdk_fs_request *req) 256 { 257 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 258 } 259 260 static int 261 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 262 uint32_t max_ops) 263 { 264 uint32_t i; 265 266 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 267 if (!channel->req_mem) { 268 return -1; 269 } 270 271 TAILQ_INIT(&channel->reqs); 272 sem_init(&channel->sem, 0, 0); 273 274 for (i = 0; i < max_ops; i++) { 275 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 276 } 277 278 channel->fs = fs; 279 280 return 0; 281 } 282 283 static int 284 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 285 { 286 struct spdk_filesystem *fs; 287 struct spdk_fs_channel *channel = ctx_buf; 288 289 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 290 291 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 292 } 293 294 static int 295 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 296 { 297 struct spdk_filesystem *fs; 298 struct spdk_fs_channel *channel = ctx_buf; 299 300 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 301 302 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 303 } 304 305 static int 306 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 307 { 308 struct spdk_filesystem *fs; 309 struct spdk_fs_channel *channel = ctx_buf; 310 311 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 312 313 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 314 } 315 316 static void 317 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 318 { 319 struct spdk_fs_channel *channel = ctx_buf; 320 321 free(channel->req_mem); 322 if (channel->bs_channel != NULL) { 323 spdk_bs_free_io_channel(channel->bs_channel); 324 } 325 } 326 327 static void 328 __send_request_direct(fs_request_fn fn, void *arg) 329 { 330 fn(arg); 331 } 332 333 static void 334 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 335 { 336 fs->bs = bs; 337 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 338 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 339 fs->md_target.md_fs_channel->send_request = __send_request_direct; 340 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 341 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 342 343 pthread_mutex_lock(&g_cache_init_lock); 344 if (g_fs_count == 0) { 345 __initialize_cache(); 346 } 347 g_fs_count++; 348 pthread_mutex_unlock(&g_cache_init_lock); 349 } 350 351 static void 352 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 353 { 354 struct spdk_fs_request *req = ctx; 355 struct spdk_fs_cb_args *args = &req->args; 356 struct spdk_filesystem *fs = args->fs; 357 358 if (bserrno == 0) { 359 common_fs_bs_init(fs, bs); 360 } else { 361 free(fs); 362 fs = NULL; 363 } 364 365 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 366 free_fs_request(req); 367 } 368 369 static struct spdk_filesystem * 370 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 371 { 372 struct spdk_filesystem *fs; 373 374 fs = calloc(1, sizeof(*fs)); 375 if (fs == NULL) { 376 return NULL; 377 } 378 379 fs->bdev = dev; 380 fs->send_request = send_request_fn; 381 TAILQ_INIT(&fs->files); 382 383 fs->md_target.max_ops = 512; 384 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 385 sizeof(struct spdk_fs_channel)); 386 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 387 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 388 389 fs->sync_target.max_ops = 512; 390 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 391 sizeof(struct spdk_fs_channel)); 392 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 393 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 394 395 fs->io_target.max_ops = 512; 396 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 397 sizeof(struct spdk_fs_channel)); 398 399 return fs; 400 } 401 402 void 403 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 404 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 405 { 406 struct spdk_filesystem *fs; 407 struct spdk_fs_request *req; 408 struct spdk_fs_cb_args *args; 409 410 fs = fs_alloc(dev, send_request_fn); 411 if (fs == NULL) { 412 cb_fn(cb_arg, NULL, -ENOMEM); 413 return; 414 } 415 416 req = alloc_fs_request(fs->md_target.md_fs_channel); 417 if (req == NULL) { 418 cb_fn(cb_arg, NULL, -ENOMEM); 419 return; 420 } 421 422 args = &req->args; 423 args->fn.fs_op_with_handle = cb_fn; 424 args->arg = cb_arg; 425 args->fs = fs; 426 427 spdk_bs_init(dev, NULL, init_cb, req); 428 } 429 430 static struct spdk_file * 431 file_alloc(struct spdk_filesystem *fs) 432 { 433 struct spdk_file *file; 434 435 file = calloc(1, sizeof(*file)); 436 if (file == NULL) { 437 return NULL; 438 } 439 440 file->fs = fs; 441 TAILQ_INIT(&file->open_requests); 442 TAILQ_INIT(&file->sync_requests); 443 pthread_spin_init(&file->lock, 0); 444 file->tree = calloc(1, sizeof(*file->tree)); 445 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 446 file->priority = SPDK_FILE_PRIORITY_LOW; 447 return file; 448 } 449 450 static void 451 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 452 { 453 struct spdk_fs_request *req = ctx; 454 struct spdk_fs_cb_args *args = &req->args; 455 struct spdk_filesystem *fs = args->fs; 456 struct spdk_file *f; 457 uint64_t *length; 458 const char *name; 459 size_t value_len; 460 461 if (rc == -ENOENT) { 462 /* Finished iterating */ 463 args->fn.fs_op_with_handle(args->arg, fs, 0); 464 free_fs_request(req); 465 return; 466 } else if (rc < 0) { 467 args->fn.fs_op_with_handle(args->arg, fs, rc); 468 free_fs_request(req); 469 return; 470 } 471 472 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 473 if (rc < 0) { 474 args->fn.fs_op_with_handle(args->arg, fs, rc); 475 free_fs_request(req); 476 return; 477 } 478 479 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 480 if (rc < 0) { 481 args->fn.fs_op_with_handle(args->arg, fs, rc); 482 free_fs_request(req); 483 return; 484 } 485 assert(value_len == 8); 486 487 f = file_alloc(fs); 488 if (f == NULL) { 489 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 490 free_fs_request(req); 491 return; 492 } 493 494 f->name = strdup(name); 495 f->blobid = spdk_blob_get_id(blob); 496 f->length = *length; 497 f->length_flushed = *length; 498 f->append_pos = *length; 499 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 500 501 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 502 } 503 504 static void 505 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 506 { 507 struct spdk_fs_request *req = ctx; 508 struct spdk_fs_cb_args *args = &req->args; 509 struct spdk_filesystem *fs = args->fs; 510 511 if (bserrno != 0) { 512 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 513 free_fs_request(req); 514 free(fs); 515 return; 516 } 517 518 common_fs_bs_init(fs, bs); 519 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 520 } 521 522 void 523 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 524 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 525 { 526 struct spdk_filesystem *fs; 527 struct spdk_fs_cb_args *args; 528 struct spdk_fs_request *req; 529 530 fs = fs_alloc(dev, send_request_fn); 531 if (fs == NULL) { 532 cb_fn(cb_arg, NULL, -ENOMEM); 533 return; 534 } 535 536 req = alloc_fs_request(fs->md_target.md_fs_channel); 537 if (req == NULL) { 538 cb_fn(cb_arg, NULL, -ENOMEM); 539 return; 540 } 541 542 args = &req->args; 543 args->fn.fs_op_with_handle = cb_fn; 544 args->arg = cb_arg; 545 args->fs = fs; 546 547 spdk_bs_load(dev, load_cb, req); 548 } 549 550 static void 551 unload_cb(void *ctx, int bserrno) 552 { 553 struct spdk_fs_request *req = ctx; 554 struct spdk_fs_cb_args *args = &req->args; 555 struct spdk_filesystem *fs = args->fs; 556 557 pthread_mutex_lock(&g_cache_init_lock); 558 g_fs_count--; 559 if (g_fs_count == 0) { 560 __free_cache(); 561 } 562 pthread_mutex_unlock(&g_cache_init_lock); 563 564 args->fn.fs_op(args->arg, bserrno); 565 free(req); 566 567 spdk_io_device_unregister(&fs->io_target); 568 spdk_io_device_unregister(&fs->sync_target); 569 spdk_io_device_unregister(&fs->md_target); 570 571 free(fs); 572 } 573 574 void 575 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 576 { 577 struct spdk_fs_request *req; 578 struct spdk_fs_cb_args *args; 579 580 /* 581 * We must free the md_channel before unloading the blobstore, so just 582 * allocate this request from the general heap. 583 */ 584 req = calloc(1, sizeof(*req)); 585 if (req == NULL) { 586 cb_fn(cb_arg, -ENOMEM); 587 return; 588 } 589 590 args = &req->args; 591 args->fn.fs_op = cb_fn; 592 args->arg = cb_arg; 593 args->fs = fs; 594 595 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 596 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 597 spdk_bs_unload(fs->bs, unload_cb, req); 598 } 599 600 static struct spdk_file * 601 fs_find_file(struct spdk_filesystem *fs, const char *name) 602 { 603 struct spdk_file *file; 604 605 TAILQ_FOREACH(file, &fs->files, tailq) { 606 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 607 return file; 608 } 609 } 610 611 return NULL; 612 } 613 614 void 615 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 616 spdk_file_stat_op_complete cb_fn, void *cb_arg) 617 { 618 struct spdk_file_stat stat; 619 struct spdk_file *f = NULL; 620 621 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 622 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 623 return; 624 } 625 626 f = fs_find_file(fs, name); 627 if (f != NULL) { 628 stat.blobid = f->blobid; 629 stat.size = f->length; 630 cb_fn(cb_arg, &stat, 0); 631 return; 632 } 633 634 cb_fn(cb_arg, NULL, -ENOENT); 635 } 636 637 static void 638 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 639 { 640 struct spdk_fs_request *req = arg; 641 struct spdk_fs_cb_args *args = &req->args; 642 643 args->rc = fserrno; 644 if (fserrno == 0) { 645 memcpy(args->arg, stat, sizeof(*stat)); 646 } 647 sem_post(args->sem); 648 } 649 650 static void 651 __file_stat(void *arg) 652 { 653 struct spdk_fs_request *req = arg; 654 struct spdk_fs_cb_args *args = &req->args; 655 656 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 657 args->fn.stat_op, req); 658 } 659 660 int 661 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 662 const char *name, struct spdk_file_stat *stat) 663 { 664 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 665 struct spdk_fs_request *req; 666 int rc; 667 668 req = alloc_fs_request(channel); 669 assert(req != NULL); 670 671 req->args.fs = fs; 672 req->args.op.stat.name = name; 673 req->args.fn.stat_op = __copy_stat; 674 req->args.arg = stat; 675 req->args.sem = &channel->sem; 676 channel->send_request(__file_stat, req); 677 sem_wait(&channel->sem); 678 679 rc = req->args.rc; 680 free_fs_request(req); 681 682 return rc; 683 } 684 685 static void 686 fs_create_blob_close_cb(void *ctx, int bserrno) 687 { 688 struct spdk_fs_request *req = ctx; 689 struct spdk_fs_cb_args *args = &req->args; 690 691 args->fn.file_op(args->arg, bserrno); 692 free_fs_request(req); 693 } 694 695 static void 696 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 697 { 698 struct spdk_fs_request *req = ctx; 699 struct spdk_fs_cb_args *args = &req->args; 700 struct spdk_file *f = args->file; 701 uint64_t length = 0; 702 703 f->blob = blob; 704 spdk_bs_md_resize_blob(blob, 1); 705 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 706 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 707 708 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 709 } 710 711 static void 712 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 713 { 714 struct spdk_fs_request *req = ctx; 715 struct spdk_fs_cb_args *args = &req->args; 716 struct spdk_file *f = args->file; 717 718 f->blobid = blobid; 719 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 720 } 721 722 void 723 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 724 spdk_file_op_complete cb_fn, void *cb_arg) 725 { 726 struct spdk_file *file; 727 struct spdk_fs_request *req; 728 struct spdk_fs_cb_args *args; 729 730 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 731 cb_fn(cb_arg, -ENAMETOOLONG); 732 return; 733 } 734 735 file = fs_find_file(fs, name); 736 if (file != NULL) { 737 cb_fn(cb_arg, -EEXIST); 738 return; 739 } 740 741 file = file_alloc(fs); 742 if (file == NULL) { 743 cb_fn(cb_arg, -ENOMEM); 744 return; 745 } 746 747 req = alloc_fs_request(fs->md_target.md_fs_channel); 748 if (req == NULL) { 749 cb_fn(cb_arg, -ENOMEM); 750 return; 751 } 752 753 args = &req->args; 754 args->file = file; 755 args->fn.file_op = cb_fn; 756 args->arg = cb_arg; 757 758 file->name = strdup(name); 759 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 760 } 761 762 static void 763 __fs_create_file_done(void *arg, int fserrno) 764 { 765 struct spdk_fs_request *req = arg; 766 struct spdk_fs_cb_args *args = &req->args; 767 768 args->rc = fserrno; 769 sem_post(args->sem); 770 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 771 } 772 773 static void 774 __fs_create_file(void *arg) 775 { 776 struct spdk_fs_request *req = arg; 777 struct spdk_fs_cb_args *args = &req->args; 778 779 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 780 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 781 } 782 783 int 784 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 785 { 786 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 787 struct spdk_fs_request *req; 788 struct spdk_fs_cb_args *args; 789 int rc; 790 791 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 792 793 req = alloc_fs_request(channel); 794 assert(req != NULL); 795 796 args = &req->args; 797 args->fs = fs; 798 args->op.create.name = name; 799 args->sem = &channel->sem; 800 fs->send_request(__fs_create_file, req); 801 sem_wait(&channel->sem); 802 rc = args->rc; 803 free_fs_request(req); 804 805 return rc; 806 } 807 808 static void 809 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 810 { 811 struct spdk_fs_request *req = ctx; 812 struct spdk_fs_cb_args *args = &req->args; 813 struct spdk_file *f = args->file; 814 815 f->blob = blob; 816 while (!TAILQ_EMPTY(&f->open_requests)) { 817 req = TAILQ_FIRST(&f->open_requests); 818 args = &req->args; 819 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 820 args->fn.file_op_with_handle(args->arg, f, bserrno); 821 free_fs_request(req); 822 } 823 } 824 825 static void 826 fs_open_blob_create_cb(void *ctx, int bserrno) 827 { 828 struct spdk_fs_request *req = ctx; 829 struct spdk_fs_cb_args *args = &req->args; 830 struct spdk_file *file = args->file; 831 struct spdk_filesystem *fs = args->fs; 832 833 if (file == NULL) { 834 /* 835 * This is from an open with CREATE flag - the file 836 * is now created so look it up in the file list for this 837 * filesystem. 838 */ 839 file = fs_find_file(fs, args->op.open.name); 840 assert(file != NULL); 841 args->file = file; 842 } 843 844 file->ref_count++; 845 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 846 if (file->ref_count == 1) { 847 assert(file->blob == NULL); 848 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 849 } else if (file->blob != NULL) { 850 fs_open_blob_done(req, file->blob, 0); 851 } else { 852 /* 853 * The blob open for this file is in progress due to a previous 854 * open request. When that open completes, it will invoke the 855 * open callback for this request. 856 */ 857 } 858 } 859 860 void 861 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 862 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 863 { 864 struct spdk_file *f = NULL; 865 struct spdk_fs_request *req; 866 struct spdk_fs_cb_args *args; 867 868 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 869 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 870 return; 871 } 872 873 f = fs_find_file(fs, name); 874 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 875 cb_fn(cb_arg, NULL, -ENOENT); 876 return; 877 } 878 879 req = alloc_fs_request(fs->md_target.md_fs_channel); 880 if (req == NULL) { 881 cb_fn(cb_arg, NULL, -ENOMEM); 882 return; 883 } 884 885 args = &req->args; 886 args->fn.file_op_with_handle = cb_fn; 887 args->arg = cb_arg; 888 args->file = f; 889 args->fs = fs; 890 args->op.open.name = name; 891 892 if (f == NULL) { 893 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 894 } else { 895 fs_open_blob_create_cb(req, 0); 896 } 897 } 898 899 static void 900 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 901 { 902 struct spdk_fs_request *req = arg; 903 struct spdk_fs_cb_args *args = &req->args; 904 905 args->file = file; 906 args->rc = bserrno; 907 sem_post(args->sem); 908 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 909 } 910 911 static void 912 __fs_open_file(void *arg) 913 { 914 struct spdk_fs_request *req = arg; 915 struct spdk_fs_cb_args *args = &req->args; 916 917 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 918 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 919 __fs_open_file_done, req); 920 } 921 922 int 923 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 924 const char *name, uint32_t flags, struct spdk_file **file) 925 { 926 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 927 struct spdk_fs_request *req; 928 struct spdk_fs_cb_args *args; 929 int rc; 930 931 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 932 933 req = alloc_fs_request(channel); 934 assert(req != NULL); 935 936 args = &req->args; 937 args->fs = fs; 938 args->op.open.name = name; 939 args->op.open.flags = flags; 940 args->sem = &channel->sem; 941 fs->send_request(__fs_open_file, req); 942 sem_wait(&channel->sem); 943 rc = args->rc; 944 if (rc == 0) { 945 *file = args->file; 946 } else { 947 *file = NULL; 948 } 949 free_fs_request(req); 950 951 return rc; 952 } 953 954 static void 955 fs_rename_blob_close_cb(void *ctx, int bserrno) 956 { 957 struct spdk_fs_request *req = ctx; 958 struct spdk_fs_cb_args *args = &req->args; 959 960 args->fn.fs_op(args->arg, bserrno); 961 free_fs_request(req); 962 } 963 964 static void 965 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 966 { 967 struct spdk_fs_request *req = ctx; 968 struct spdk_fs_cb_args *args = &req->args; 969 struct spdk_file *f = args->file; 970 const char *new_name = args->op.rename.new_name; 971 972 f->blob = blob; 973 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 974 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 975 } 976 977 static void 978 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 979 { 980 struct spdk_fs_cb_args *args = &req->args; 981 struct spdk_file *f; 982 983 f = fs_find_file(args->fs, args->op.rename.old_name); 984 if (f == NULL) { 985 args->fn.fs_op(args->arg, -ENOENT); 986 free_fs_request(req); 987 return; 988 } 989 990 free(f->name); 991 f->name = strdup(args->op.rename.new_name); 992 args->file = f; 993 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 994 } 995 996 static void 997 fs_rename_delete_done(void *arg, int fserrno) 998 { 999 __spdk_fs_md_rename_file(arg); 1000 } 1001 1002 void 1003 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1004 const char *old_name, const char *new_name, 1005 spdk_file_op_complete cb_fn, void *cb_arg) 1006 { 1007 struct spdk_file *f; 1008 struct spdk_fs_request *req; 1009 struct spdk_fs_cb_args *args; 1010 1011 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1012 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1013 cb_fn(cb_arg, -ENAMETOOLONG); 1014 return; 1015 } 1016 1017 req = alloc_fs_request(fs->md_target.md_fs_channel); 1018 if (req == NULL) { 1019 cb_fn(cb_arg, -ENOMEM); 1020 return; 1021 } 1022 1023 args = &req->args; 1024 args->fn.fs_op = cb_fn; 1025 args->fs = fs; 1026 args->arg = cb_arg; 1027 args->op.rename.old_name = old_name; 1028 args->op.rename.new_name = new_name; 1029 1030 f = fs_find_file(fs, new_name); 1031 if (f == NULL) { 1032 __spdk_fs_md_rename_file(req); 1033 return; 1034 } 1035 1036 /* 1037 * The rename overwrites an existing file. So delete the existing file, then 1038 * do the actual rename. 1039 */ 1040 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1041 } 1042 1043 static void 1044 __fs_rename_file_done(void *arg, int fserrno) 1045 { 1046 struct spdk_fs_request *req = arg; 1047 struct spdk_fs_cb_args *args = &req->args; 1048 1049 args->rc = fserrno; 1050 sem_post(args->sem); 1051 } 1052 1053 static void 1054 __fs_rename_file(void *arg) 1055 { 1056 struct spdk_fs_request *req = arg; 1057 struct spdk_fs_cb_args *args = &req->args; 1058 1059 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1060 __fs_rename_file_done, req); 1061 } 1062 1063 int 1064 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1065 const char *old_name, const char *new_name) 1066 { 1067 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1068 struct spdk_fs_request *req; 1069 struct spdk_fs_cb_args *args; 1070 int rc; 1071 1072 req = alloc_fs_request(channel); 1073 assert(req != NULL); 1074 1075 args = &req->args; 1076 1077 args->fs = fs; 1078 args->op.rename.old_name = old_name; 1079 args->op.rename.new_name = new_name; 1080 args->sem = &channel->sem; 1081 fs->send_request(__fs_rename_file, req); 1082 sem_wait(&channel->sem); 1083 rc = args->rc; 1084 free_fs_request(req); 1085 return rc; 1086 } 1087 1088 static void 1089 blob_delete_cb(void *ctx, int bserrno) 1090 { 1091 struct spdk_fs_request *req = ctx; 1092 struct spdk_fs_cb_args *args = &req->args; 1093 1094 args->fn.file_op(args->arg, bserrno); 1095 free_fs_request(req); 1096 } 1097 1098 void 1099 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1100 spdk_file_op_complete cb_fn, void *cb_arg) 1101 { 1102 struct spdk_file *f; 1103 spdk_blob_id blobid; 1104 struct spdk_fs_request *req; 1105 struct spdk_fs_cb_args *args; 1106 1107 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1108 1109 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1110 cb_fn(cb_arg, -ENAMETOOLONG); 1111 return; 1112 } 1113 1114 f = fs_find_file(fs, name); 1115 if (f == NULL) { 1116 cb_fn(cb_arg, -ENOENT); 1117 return; 1118 } 1119 1120 if (f->ref_count > 0) { 1121 /* For now, do not allow deleting files with open references. */ 1122 cb_fn(cb_arg, -EBUSY); 1123 return; 1124 } 1125 1126 req = alloc_fs_request(fs->md_target.md_fs_channel); 1127 if (req == NULL) { 1128 cb_fn(cb_arg, -ENOMEM); 1129 return; 1130 } 1131 1132 TAILQ_REMOVE(&fs->files, f, tailq); 1133 1134 cache_free_buffers(f); 1135 1136 blobid = f->blobid; 1137 1138 free(f->name); 1139 free(f->tree); 1140 free(f); 1141 1142 args = &req->args; 1143 args->fn.file_op = cb_fn; 1144 args->arg = cb_arg; 1145 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1146 } 1147 1148 static void 1149 __fs_delete_file_done(void *arg, int fserrno) 1150 { 1151 struct spdk_fs_request *req = arg; 1152 struct spdk_fs_cb_args *args = &req->args; 1153 1154 args->rc = fserrno; 1155 sem_post(args->sem); 1156 } 1157 1158 static void 1159 __fs_delete_file(void *arg) 1160 { 1161 struct spdk_fs_request *req = arg; 1162 struct spdk_fs_cb_args *args = &req->args; 1163 1164 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1165 } 1166 1167 int 1168 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1169 const char *name) 1170 { 1171 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1172 struct spdk_fs_request *req; 1173 struct spdk_fs_cb_args *args; 1174 int rc; 1175 1176 req = alloc_fs_request(channel); 1177 assert(req != NULL); 1178 1179 args = &req->args; 1180 args->fs = fs; 1181 args->op.delete.name = name; 1182 args->sem = &channel->sem; 1183 fs->send_request(__fs_delete_file, req); 1184 sem_wait(&channel->sem); 1185 rc = args->rc; 1186 free_fs_request(req); 1187 1188 return rc; 1189 } 1190 1191 spdk_fs_iter 1192 spdk_fs_iter_first(struct spdk_filesystem *fs) 1193 { 1194 struct spdk_file *f; 1195 1196 f = TAILQ_FIRST(&fs->files); 1197 return f; 1198 } 1199 1200 spdk_fs_iter 1201 spdk_fs_iter_next(spdk_fs_iter iter) 1202 { 1203 struct spdk_file *f = iter; 1204 1205 if (f == NULL) { 1206 return NULL; 1207 } 1208 1209 f = TAILQ_NEXT(f, tailq); 1210 return f; 1211 } 1212 1213 const char * 1214 spdk_file_get_name(struct spdk_file *file) 1215 { 1216 return file->name; 1217 } 1218 1219 uint64_t 1220 spdk_file_get_length(struct spdk_file *file) 1221 { 1222 assert(file != NULL); 1223 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1224 return file->length; 1225 } 1226 1227 static void 1228 fs_truncate_complete_cb(void *ctx, int bserrno) 1229 { 1230 struct spdk_fs_request *req = ctx; 1231 struct spdk_fs_cb_args *args = &req->args; 1232 1233 args->fn.file_op(args->arg, bserrno); 1234 free_fs_request(req); 1235 } 1236 1237 static uint64_t 1238 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1239 { 1240 return (length + cluster_sz - 1) / cluster_sz; 1241 } 1242 1243 void 1244 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1245 spdk_file_op_complete cb_fn, void *cb_arg) 1246 { 1247 struct spdk_filesystem *fs; 1248 size_t num_clusters; 1249 struct spdk_fs_request *req; 1250 struct spdk_fs_cb_args *args; 1251 1252 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1253 if (length == file->length) { 1254 cb_fn(cb_arg, 0); 1255 return; 1256 } 1257 1258 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1259 if (req == NULL) { 1260 cb_fn(cb_arg, -ENOMEM); 1261 return; 1262 } 1263 1264 args = &req->args; 1265 args->fn.file_op = cb_fn; 1266 args->arg = cb_arg; 1267 args->file = file; 1268 fs = file->fs; 1269 1270 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1271 1272 spdk_bs_md_resize_blob(file->blob, num_clusters); 1273 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1274 1275 file->length = length; 1276 if (file->append_pos > file->length) { 1277 file->append_pos = file->length; 1278 } 1279 1280 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1281 } 1282 1283 static void 1284 __truncate(void *arg) 1285 { 1286 struct spdk_fs_request *req = arg; 1287 struct spdk_fs_cb_args *args = &req->args; 1288 1289 spdk_file_truncate_async(args->file, args->op.truncate.length, 1290 args->fn.file_op, args->arg); 1291 } 1292 1293 void 1294 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1295 uint64_t length) 1296 { 1297 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1298 struct spdk_fs_request *req; 1299 struct spdk_fs_cb_args *args; 1300 1301 req = alloc_fs_request(channel); 1302 assert(req != NULL); 1303 1304 args = &req->args; 1305 1306 args->file = file; 1307 args->op.truncate.length = length; 1308 args->fn.file_op = __sem_post; 1309 args->arg = &channel->sem; 1310 1311 channel->send_request(__truncate, req); 1312 sem_wait(&channel->sem); 1313 free_fs_request(req); 1314 } 1315 1316 static void 1317 __rw_done(void *ctx, int bserrno) 1318 { 1319 struct spdk_fs_request *req = ctx; 1320 struct spdk_fs_cb_args *args = &req->args; 1321 1322 spdk_dma_free(args->op.rw.pin_buf); 1323 args->fn.file_op(args->arg, bserrno); 1324 free_fs_request(req); 1325 } 1326 1327 static void 1328 __read_done(void *ctx, int bserrno) 1329 { 1330 struct spdk_fs_request *req = ctx; 1331 struct spdk_fs_cb_args *args = &req->args; 1332 1333 if (args->op.rw.is_read) { 1334 memcpy(args->op.rw.user_buf, 1335 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1336 args->op.rw.length); 1337 __rw_done(req, 0); 1338 } else { 1339 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1340 args->op.rw.user_buf, 1341 args->op.rw.length); 1342 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1343 args->op.rw.pin_buf, 1344 args->op.rw.start_page, args->op.rw.num_pages, 1345 __rw_done, req); 1346 } 1347 } 1348 1349 static void 1350 __do_blob_read(void *ctx, int fserrno) 1351 { 1352 struct spdk_fs_request *req = ctx; 1353 struct spdk_fs_cb_args *args = &req->args; 1354 1355 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1356 args->op.rw.pin_buf, 1357 args->op.rw.start_page, args->op.rw.num_pages, 1358 __read_done, req); 1359 } 1360 1361 static void 1362 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1363 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1364 { 1365 uint64_t end_page; 1366 1367 *page_size = spdk_bs_get_page_size(file->fs->bs); 1368 *start_page = offset / *page_size; 1369 end_page = (offset + length - 1) / *page_size; 1370 *num_pages = (end_page - *start_page + 1); 1371 } 1372 1373 static void 1374 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1375 void *payload, uint64_t offset, uint64_t length, 1376 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1377 { 1378 struct spdk_fs_request *req; 1379 struct spdk_fs_cb_args *args; 1380 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1381 uint64_t start_page, num_pages, pin_buf_length; 1382 uint32_t page_size; 1383 1384 if (is_read && offset + length > file->length) { 1385 cb_fn(cb_arg, -EINVAL); 1386 return; 1387 } 1388 1389 req = alloc_fs_request(channel); 1390 if (req == NULL) { 1391 cb_fn(cb_arg, -ENOMEM); 1392 return; 1393 } 1394 1395 args = &req->args; 1396 args->fn.file_op = cb_fn; 1397 args->arg = cb_arg; 1398 args->file = file; 1399 args->op.rw.channel = channel->bs_channel; 1400 args->op.rw.user_buf = payload; 1401 args->op.rw.is_read = is_read; 1402 args->op.rw.offset = offset; 1403 args->op.rw.length = length; 1404 1405 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1406 pin_buf_length = num_pages * page_size; 1407 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, 4096, NULL); 1408 1409 args->op.rw.start_page = start_page; 1410 args->op.rw.num_pages = num_pages; 1411 1412 if (!is_read && file->length < offset + length) { 1413 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1414 } else { 1415 __do_blob_read(req, 0); 1416 } 1417 } 1418 1419 void 1420 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1421 void *payload, uint64_t offset, uint64_t length, 1422 spdk_file_op_complete cb_fn, void *cb_arg) 1423 { 1424 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1425 } 1426 1427 void 1428 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1429 void *payload, uint64_t offset, uint64_t length, 1430 spdk_file_op_complete cb_fn, void *cb_arg) 1431 { 1432 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1433 file->name, offset, length); 1434 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1435 } 1436 1437 struct spdk_io_channel * 1438 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1439 { 1440 struct spdk_io_channel *io_channel; 1441 struct spdk_fs_channel *fs_channel; 1442 1443 io_channel = spdk_get_io_channel(&fs->io_target); 1444 fs_channel = spdk_io_channel_get_ctx(io_channel); 1445 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1446 fs_channel->send_request = __send_request_direct; 1447 1448 return io_channel; 1449 } 1450 1451 struct spdk_io_channel * 1452 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1453 { 1454 struct spdk_io_channel *io_channel; 1455 struct spdk_fs_channel *fs_channel; 1456 1457 io_channel = spdk_get_io_channel(&fs->io_target); 1458 fs_channel = spdk_io_channel_get_ctx(io_channel); 1459 fs_channel->send_request = fs->send_request; 1460 1461 return io_channel; 1462 } 1463 1464 void 1465 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1466 { 1467 spdk_put_io_channel(channel); 1468 } 1469 1470 void 1471 spdk_fs_set_cache_size(uint64_t size_in_mb) 1472 { 1473 g_fs_cache_size = size_in_mb * 1024 * 1024; 1474 } 1475 1476 uint64_t 1477 spdk_fs_get_cache_size(void) 1478 { 1479 return g_fs_cache_size / (1024 * 1024); 1480 } 1481 1482 static void __file_flush(void *_args); 1483 1484 static void * 1485 alloc_cache_memory_buffer(struct spdk_file *context) 1486 { 1487 struct spdk_file *file; 1488 void *buf; 1489 1490 buf = spdk_mempool_get(g_cache_pool); 1491 if (buf != NULL) { 1492 return buf; 1493 } 1494 1495 pthread_spin_lock(&g_caches_lock); 1496 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1497 if (!file->open_for_writing && 1498 file->priority == SPDK_FILE_PRIORITY_LOW && 1499 file != context) { 1500 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1501 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1502 break; 1503 } 1504 } 1505 pthread_spin_unlock(&g_caches_lock); 1506 if (file != NULL) { 1507 cache_free_buffers(file); 1508 buf = spdk_mempool_get(g_cache_pool); 1509 if (buf != NULL) { 1510 return buf; 1511 } 1512 } 1513 1514 pthread_spin_lock(&g_caches_lock); 1515 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1516 if (!file->open_for_writing && file != context) { 1517 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1518 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1519 break; 1520 } 1521 } 1522 pthread_spin_unlock(&g_caches_lock); 1523 if (file != NULL) { 1524 cache_free_buffers(file); 1525 buf = spdk_mempool_get(g_cache_pool); 1526 if (buf != NULL) { 1527 return buf; 1528 } 1529 } 1530 1531 pthread_spin_lock(&g_caches_lock); 1532 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1533 if (file != context) { 1534 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1535 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1536 break; 1537 } 1538 } 1539 pthread_spin_unlock(&g_caches_lock); 1540 if (file != NULL) { 1541 cache_free_buffers(file); 1542 buf = spdk_mempool_get(g_cache_pool); 1543 if (buf != NULL) { 1544 return buf; 1545 } 1546 } 1547 1548 assert(false); 1549 return NULL; 1550 } 1551 1552 static struct cache_buffer * 1553 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1554 { 1555 struct cache_buffer *buf; 1556 int count = 0; 1557 1558 buf = calloc(1, sizeof(*buf)); 1559 if (buf == NULL) { 1560 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1561 return NULL; 1562 } 1563 1564 buf->buf = alloc_cache_memory_buffer(file); 1565 if (buf->buf == NULL) { 1566 while (buf->buf == NULL) { 1567 /* 1568 * TODO: alloc_cache_memory_buffer() should eventually free 1569 * some buffers. Need a more sophisticated check here, instead 1570 * of just bailing if 100 tries does not result in getting a 1571 * free buffer. This will involve using the sync channel's 1572 * semaphore to block until a buffer becomes available. 1573 */ 1574 if (count++ == 100) { 1575 SPDK_ERRLOG("could not allocate cache buffer\n"); 1576 assert(false); 1577 free(buf); 1578 return NULL; 1579 } 1580 buf->buf = alloc_cache_memory_buffer(file); 1581 } 1582 } 1583 1584 buf->buf_size = CACHE_BUFFER_SIZE; 1585 buf->offset = offset; 1586 1587 pthread_spin_lock(&g_caches_lock); 1588 if (file->tree->present_mask == 0) { 1589 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1590 } 1591 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1592 pthread_spin_unlock(&g_caches_lock); 1593 1594 return buf; 1595 } 1596 1597 static struct cache_buffer * 1598 cache_append_buffer(struct spdk_file *file) 1599 { 1600 struct cache_buffer *last; 1601 1602 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1603 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1604 1605 last = cache_insert_buffer(file, file->append_pos); 1606 if (last == NULL) { 1607 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1608 return NULL; 1609 } 1610 1611 if (file->last != NULL) { 1612 file->last->next = last; 1613 } 1614 file->last = last; 1615 1616 return last; 1617 } 1618 1619 static void 1620 __wake_caller(struct spdk_fs_cb_args *args) 1621 { 1622 sem_post(args->sem); 1623 } 1624 1625 static void 1626 __file_cache_finish_sync(struct spdk_file *file) 1627 { 1628 struct spdk_fs_request *sync_req; 1629 struct spdk_fs_cb_args *sync_args; 1630 1631 pthread_spin_lock(&file->lock); 1632 while (!TAILQ_EMPTY(&file->sync_requests)) { 1633 sync_req = TAILQ_FIRST(&file->sync_requests); 1634 sync_args = &sync_req->args; 1635 if (sync_args->op.sync.offset > file->length_flushed) { 1636 break; 1637 } 1638 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1639 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1640 pthread_spin_unlock(&file->lock); 1641 sync_args->fn.file_op(sync_args->arg, 0); 1642 pthread_spin_lock(&file->lock); 1643 free_fs_request(sync_req); 1644 } 1645 pthread_spin_unlock(&file->lock); 1646 } 1647 1648 static void 1649 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1650 { 1651 struct spdk_file *file = ctx; 1652 1653 __file_cache_finish_sync(file); 1654 } 1655 1656 static void 1657 __free_args(struct spdk_fs_cb_args *args) 1658 { 1659 struct spdk_fs_request *req; 1660 1661 if (!args->from_request) { 1662 free(args); 1663 } else { 1664 /* Depends on args being at the start of the spdk_fs_request structure. */ 1665 req = (struct spdk_fs_request *)args; 1666 free_fs_request(req); 1667 } 1668 } 1669 1670 static void 1671 __file_flush_done(void *arg, int bserrno) 1672 { 1673 struct spdk_fs_cb_args *args = arg; 1674 struct spdk_fs_request *sync_req; 1675 struct spdk_file *file = args->file; 1676 struct cache_buffer *next = args->op.flush.cache_buffer; 1677 1678 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1679 1680 pthread_spin_lock(&file->lock); 1681 next->in_progress = false; 1682 next->bytes_flushed += args->op.flush.length; 1683 file->length_flushed += args->op.flush.length; 1684 if (file->length_flushed > file->length) { 1685 file->length = file->length_flushed; 1686 } 1687 if (next->bytes_flushed == next->buf_size) { 1688 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1689 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1690 } 1691 1692 TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) { 1693 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1694 break; 1695 } 1696 } 1697 1698 /* 1699 * Assert that there is no cached data that extends past the end of the underlying 1700 * blob. 1701 */ 1702 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1703 next->bytes_filled == 0); 1704 1705 if (sync_req != NULL) { 1706 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1707 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1708 sizeof(file->length_flushed)); 1709 1710 pthread_spin_unlock(&file->lock); 1711 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1712 } else { 1713 pthread_spin_unlock(&file->lock); 1714 __file_cache_finish_sync(file); 1715 } 1716 1717 __file_flush(args); 1718 } 1719 1720 static void 1721 __file_flush(void *_args) 1722 { 1723 struct spdk_fs_cb_args *args = _args; 1724 struct spdk_file *file = args->file; 1725 struct cache_buffer *next; 1726 uint64_t offset, length, start_page, num_pages; 1727 uint32_t page_size; 1728 1729 pthread_spin_lock(&file->lock); 1730 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1731 if (next == NULL || next->in_progress) { 1732 /* 1733 * There is either no data to flush, or a flush I/O is already in 1734 * progress. So return immediately - if a flush I/O is in 1735 * progress we will flush more data after that is completed. 1736 */ 1737 __free_args(args); 1738 pthread_spin_unlock(&file->lock); 1739 return; 1740 } 1741 1742 offset = next->offset + next->bytes_flushed; 1743 length = next->bytes_filled - next->bytes_flushed; 1744 if (length == 0) { 1745 __free_args(args); 1746 pthread_spin_unlock(&file->lock); 1747 return; 1748 } 1749 args->op.flush.length = length; 1750 args->op.flush.cache_buffer = next; 1751 1752 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1753 1754 next->in_progress = true; 1755 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1756 offset, length, start_page, num_pages); 1757 pthread_spin_unlock(&file->lock); 1758 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1759 next->buf + (start_page * page_size) - next->offset, 1760 start_page, num_pages, 1761 __file_flush_done, args); 1762 } 1763 1764 static void 1765 __file_extend_done(void *arg, int bserrno) 1766 { 1767 struct spdk_fs_cb_args *args = arg; 1768 1769 __wake_caller(args); 1770 } 1771 1772 static void 1773 __file_extend_blob(void *_args) 1774 { 1775 struct spdk_fs_cb_args *args = _args; 1776 struct spdk_file *file = args->file; 1777 1778 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1779 1780 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1781 } 1782 1783 static void 1784 __rw_from_file_done(void *arg, int bserrno) 1785 { 1786 struct spdk_fs_cb_args *args = arg; 1787 1788 __wake_caller(args); 1789 __free_args(args); 1790 } 1791 1792 static void 1793 __rw_from_file(void *_args) 1794 { 1795 struct spdk_fs_cb_args *args = _args; 1796 struct spdk_file *file = args->file; 1797 1798 if (args->op.rw.is_read) { 1799 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1800 args->op.rw.offset, args->op.rw.length, 1801 __rw_from_file_done, args); 1802 } else { 1803 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1804 args->op.rw.offset, args->op.rw.length, 1805 __rw_from_file_done, args); 1806 } 1807 } 1808 1809 static int 1810 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1811 uint64_t offset, uint64_t length, bool is_read) 1812 { 1813 struct spdk_fs_cb_args *args; 1814 1815 args = calloc(1, sizeof(*args)); 1816 if (args == NULL) { 1817 sem_post(sem); 1818 return -ENOMEM; 1819 } 1820 1821 args->file = file; 1822 args->sem = sem; 1823 args->op.rw.user_buf = payload; 1824 args->op.rw.offset = offset; 1825 args->op.rw.length = length; 1826 args->op.rw.is_read = is_read; 1827 file->fs->send_request(__rw_from_file, args); 1828 return 0; 1829 } 1830 1831 int 1832 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1833 void *payload, uint64_t offset, uint64_t length) 1834 { 1835 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1836 struct spdk_fs_cb_args *args; 1837 uint64_t rem_length, copy, blob_size, cluster_sz; 1838 uint32_t cache_buffers_filled = 0; 1839 uint8_t *cur_payload; 1840 struct cache_buffer *last; 1841 1842 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1843 1844 if (length == 0) { 1845 return 0; 1846 } 1847 1848 if (offset != file->append_pos) { 1849 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1850 return -EINVAL; 1851 } 1852 1853 pthread_spin_lock(&file->lock); 1854 file->open_for_writing = true; 1855 1856 if (file->last == NULL) { 1857 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1858 cache_append_buffer(file); 1859 } else { 1860 int rc; 1861 1862 file->append_pos += length; 1863 rc = __send_rw_from_file(file, &channel->sem, payload, 1864 offset, length, false); 1865 pthread_spin_unlock(&file->lock); 1866 sem_wait(&channel->sem); 1867 return rc; 1868 } 1869 } 1870 1871 blob_size = __file_get_blob_size(file); 1872 1873 if ((offset + length) > blob_size) { 1874 struct spdk_fs_cb_args extend_args = {}; 1875 1876 cluster_sz = file->fs->bs_opts.cluster_sz; 1877 extend_args.sem = &channel->sem; 1878 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1879 extend_args.file = file; 1880 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1881 pthread_spin_unlock(&file->lock); 1882 file->fs->send_request(__file_extend_blob, &extend_args); 1883 sem_wait(&channel->sem); 1884 } 1885 1886 last = file->last; 1887 rem_length = length; 1888 cur_payload = payload; 1889 while (rem_length > 0) { 1890 copy = last->buf_size - last->bytes_filled; 1891 if (copy > rem_length) { 1892 copy = rem_length; 1893 } 1894 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 1895 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 1896 file->append_pos += copy; 1897 if (file->length < file->append_pos) { 1898 file->length = file->append_pos; 1899 } 1900 cur_payload += copy; 1901 last->bytes_filled += copy; 1902 rem_length -= copy; 1903 if (last->bytes_filled == last->buf_size) { 1904 cache_buffers_filled++; 1905 last = cache_append_buffer(file); 1906 if (last == NULL) { 1907 BLOBFS_TRACE(file, "nomem\n"); 1908 pthread_spin_unlock(&file->lock); 1909 return -ENOMEM; 1910 } 1911 } 1912 } 1913 1914 if (cache_buffers_filled == 0) { 1915 pthread_spin_unlock(&file->lock); 1916 return 0; 1917 } 1918 1919 args = calloc(1, sizeof(*args)); 1920 if (args == NULL) { 1921 pthread_spin_unlock(&file->lock); 1922 return -ENOMEM; 1923 } 1924 1925 args->file = file; 1926 file->fs->send_request(__file_flush, args); 1927 pthread_spin_unlock(&file->lock); 1928 return 0; 1929 } 1930 1931 static void 1932 __readahead_done(void *arg, int bserrno) 1933 { 1934 struct spdk_fs_cb_args *args = arg; 1935 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 1936 struct spdk_file *file = args->file; 1937 1938 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 1939 1940 pthread_spin_lock(&file->lock); 1941 cache_buffer->bytes_filled = args->op.readahead.length; 1942 cache_buffer->bytes_flushed = args->op.readahead.length; 1943 cache_buffer->in_progress = false; 1944 pthread_spin_unlock(&file->lock); 1945 1946 __free_args(args); 1947 } 1948 1949 static void 1950 __readahead(void *_args) 1951 { 1952 struct spdk_fs_cb_args *args = _args; 1953 struct spdk_file *file = args->file; 1954 uint64_t offset, length, start_page, num_pages; 1955 uint32_t page_size; 1956 1957 offset = args->op.readahead.offset; 1958 length = args->op.readahead.length; 1959 assert(length > 0); 1960 1961 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1962 1963 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1964 offset, length, start_page, num_pages); 1965 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1966 args->op.readahead.cache_buffer->buf, 1967 start_page, num_pages, 1968 __readahead_done, args); 1969 } 1970 1971 static uint64_t 1972 __next_cache_buffer_offset(uint64_t offset) 1973 { 1974 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 1975 } 1976 1977 static void 1978 check_readahead(struct spdk_file *file, uint64_t offset) 1979 { 1980 struct spdk_fs_cb_args *args; 1981 1982 offset = __next_cache_buffer_offset(offset); 1983 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 1984 return; 1985 } 1986 1987 args = calloc(1, sizeof(*args)); 1988 if (args == NULL) { 1989 return; 1990 } 1991 1992 BLOBFS_TRACE(file, "offset=%jx\n", offset); 1993 1994 args->file = file; 1995 args->op.readahead.offset = offset; 1996 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 1997 args->op.readahead.cache_buffer->in_progress = true; 1998 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 1999 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2000 } else { 2001 args->op.readahead.length = CACHE_BUFFER_SIZE; 2002 } 2003 file->fs->send_request(__readahead, args); 2004 } 2005 2006 static int 2007 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2008 { 2009 struct cache_buffer *buf; 2010 2011 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2012 if (buf == NULL) { 2013 return __send_rw_from_file(file, sem, payload, offset, length, true); 2014 } 2015 2016 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2017 length = buf->offset + buf->bytes_filled - offset; 2018 } 2019 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2020 memcpy(payload, &buf->buf[offset - buf->offset], length); 2021 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2022 pthread_spin_lock(&g_caches_lock); 2023 spdk_tree_remove_buffer(file->tree, buf); 2024 if (file->tree->present_mask == 0) { 2025 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2026 } 2027 pthread_spin_unlock(&g_caches_lock); 2028 } 2029 2030 sem_post(sem); 2031 return 0; 2032 } 2033 2034 int64_t 2035 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2036 void *payload, uint64_t offset, uint64_t length) 2037 { 2038 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2039 uint64_t final_offset, final_length; 2040 uint32_t sub_reads = 0; 2041 int rc = 0; 2042 2043 pthread_spin_lock(&file->lock); 2044 2045 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2046 2047 file->open_for_writing = false; 2048 2049 if (length == 0 || offset >= file->length) { 2050 pthread_spin_unlock(&file->lock); 2051 return 0; 2052 } 2053 2054 if (offset + length > file->length) { 2055 length = file->length - offset; 2056 } 2057 2058 if (offset != file->next_seq_offset) { 2059 file->seq_byte_count = 0; 2060 } 2061 file->seq_byte_count += length; 2062 file->next_seq_offset = offset + length; 2063 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2064 check_readahead(file, offset); 2065 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2066 } 2067 2068 final_length = 0; 2069 final_offset = offset + length; 2070 while (offset < final_offset) { 2071 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2072 if (length > (final_offset - offset)) { 2073 length = final_offset - offset; 2074 } 2075 rc = __file_read(file, payload, offset, length, &channel->sem); 2076 if (rc == 0) { 2077 final_length += length; 2078 } else { 2079 break; 2080 } 2081 payload += length; 2082 offset += length; 2083 sub_reads++; 2084 } 2085 pthread_spin_unlock(&file->lock); 2086 while (sub_reads-- > 0) { 2087 sem_wait(&channel->sem); 2088 } 2089 if (rc == 0) { 2090 return final_length; 2091 } else { 2092 return rc; 2093 } 2094 } 2095 2096 static void 2097 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2098 spdk_file_op_complete cb_fn, void *cb_arg) 2099 { 2100 struct spdk_fs_request *sync_req; 2101 struct spdk_fs_request *flush_req; 2102 struct spdk_fs_cb_args *sync_args; 2103 struct spdk_fs_cb_args *flush_args; 2104 2105 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2106 2107 pthread_spin_lock(&file->lock); 2108 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2109 BLOBFS_TRACE(file, "done - no data to flush\n"); 2110 pthread_spin_unlock(&file->lock); 2111 cb_fn(cb_arg, 0); 2112 return; 2113 } 2114 2115 sync_req = alloc_fs_request(channel); 2116 assert(sync_req != NULL); 2117 sync_args = &sync_req->args; 2118 2119 flush_req = alloc_fs_request(channel); 2120 assert(flush_req != NULL); 2121 flush_args = &flush_req->args; 2122 2123 sync_args->file = file; 2124 sync_args->fn.file_op = cb_fn; 2125 sync_args->arg = cb_arg; 2126 sync_args->op.sync.offset = file->append_pos; 2127 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2128 pthread_spin_unlock(&file->lock); 2129 2130 flush_args->file = file; 2131 channel->send_request(__file_flush, flush_args); 2132 } 2133 2134 int 2135 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2136 { 2137 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2138 2139 _file_sync(file, channel, __sem_post, &channel->sem); 2140 sem_wait(&channel->sem); 2141 2142 return 0; 2143 } 2144 2145 void 2146 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2147 spdk_file_op_complete cb_fn, void *cb_arg) 2148 { 2149 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2150 2151 _file_sync(file, channel, cb_fn, cb_arg); 2152 } 2153 2154 void 2155 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2156 { 2157 BLOBFS_TRACE(file, "priority=%u\n", priority); 2158 file->priority = priority; 2159 2160 } 2161 2162 /* 2163 * Close routines 2164 */ 2165 2166 static void 2167 __file_close_async_done(void *ctx, int bserrno) 2168 { 2169 struct spdk_fs_request *req = ctx; 2170 struct spdk_fs_cb_args *args = &req->args; 2171 2172 args->fn.file_op(args->arg, bserrno); 2173 free_fs_request(req); 2174 } 2175 2176 static void 2177 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2178 { 2179 pthread_spin_lock(&file->lock); 2180 if (file->ref_count == 0) { 2181 pthread_spin_unlock(&file->lock); 2182 __file_close_async_done(req, -EBADF); 2183 return; 2184 } 2185 2186 file->ref_count--; 2187 if (file->ref_count > 0) { 2188 pthread_spin_unlock(&file->lock); 2189 __file_close_async_done(req, 0); 2190 return; 2191 } 2192 2193 pthread_spin_unlock(&file->lock); 2194 2195 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2196 } 2197 2198 static void 2199 __file_close_async__sync_done(void *arg, int fserrno) 2200 { 2201 struct spdk_fs_request *req = arg; 2202 struct spdk_fs_cb_args *args = &req->args; 2203 2204 __file_close_async(args->file, req); 2205 } 2206 2207 void 2208 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2209 { 2210 struct spdk_fs_request *req; 2211 struct spdk_fs_cb_args *args; 2212 2213 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2214 if (req == NULL) { 2215 cb_fn(cb_arg, -ENOMEM); 2216 return; 2217 } 2218 2219 args = &req->args; 2220 args->file = file; 2221 args->fn.file_op = cb_fn; 2222 args->arg = cb_arg; 2223 2224 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2225 } 2226 2227 static void 2228 __file_close_done(void *arg, int fserrno) 2229 { 2230 struct spdk_fs_cb_args *args = arg; 2231 2232 args->rc = fserrno; 2233 sem_post(args->sem); 2234 } 2235 2236 static void 2237 __file_close(void *arg) 2238 { 2239 struct spdk_fs_request *req = arg; 2240 struct spdk_fs_cb_args *args = &req->args; 2241 struct spdk_file *file = args->file; 2242 2243 __file_close_async(file, req); 2244 } 2245 2246 int 2247 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2248 { 2249 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2250 struct spdk_fs_request *req; 2251 struct spdk_fs_cb_args *args; 2252 2253 req = alloc_fs_request(channel); 2254 assert(req != NULL); 2255 2256 args = &req->args; 2257 2258 spdk_file_sync(file, _channel); 2259 BLOBFS_TRACE(file, "name=%s\n", file->name); 2260 args->file = file; 2261 args->sem = &channel->sem; 2262 args->fn.file_op = __file_close_done; 2263 args->arg = req; 2264 channel->send_request(__file_close, req); 2265 sem_wait(&channel->sem); 2266 2267 return args->rc; 2268 } 2269 2270 static void 2271 cache_free_buffers(struct spdk_file *file) 2272 { 2273 BLOBFS_TRACE(file, "free=%s\n", file->name); 2274 pthread_spin_lock(&file->lock); 2275 pthread_spin_lock(&g_caches_lock); 2276 if (file->tree->present_mask == 0) { 2277 pthread_spin_unlock(&g_caches_lock); 2278 pthread_spin_unlock(&file->lock); 2279 return; 2280 } 2281 spdk_tree_free_buffers(file->tree); 2282 if (file->tree->present_mask == 0) { 2283 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2284 } 2285 file->last = NULL; 2286 pthread_spin_unlock(&g_caches_lock); 2287 pthread_spin_unlock(&file->lock); 2288 } 2289 2290 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2291 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2292