1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk/util.h" 44 #include "spdk_internal/log.h" 45 46 #define BLOBFS_TRACE(file, str, args...) \ 47 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 48 49 #define BLOBFS_TRACE_RW(file, str, args...) \ 50 SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 51 52 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 53 54 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 55 static struct spdk_mempool *g_cache_pool; 56 static TAILQ_HEAD(, spdk_file) g_caches; 57 static pthread_spinlock_t g_caches_lock; 58 59 static void 60 __sem_post(void *arg, int bserrno) 61 { 62 sem_t *sem = arg; 63 64 sem_post(sem); 65 } 66 67 void 68 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 69 { 70 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 71 free(cache_buffer); 72 } 73 74 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 75 76 struct spdk_file { 77 struct spdk_filesystem *fs; 78 struct spdk_blob *blob; 79 char *name; 80 uint64_t length; 81 bool open_for_writing; 82 uint64_t length_flushed; 83 uint64_t append_pos; 84 uint64_t seq_byte_count; 85 uint64_t next_seq_offset; 86 uint32_t priority; 87 TAILQ_ENTRY(spdk_file) tailq; 88 spdk_blob_id blobid; 89 uint32_t ref_count; 90 pthread_spinlock_t lock; 91 struct cache_buffer *last; 92 struct cache_tree *tree; 93 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 94 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 95 TAILQ_ENTRY(spdk_file) cache_tailq; 96 }; 97 98 struct spdk_filesystem { 99 struct spdk_blob_store *bs; 100 TAILQ_HEAD(, spdk_file) files; 101 struct spdk_bs_opts bs_opts; 102 struct spdk_bs_dev *bdev; 103 fs_send_request_fn send_request; 104 105 struct { 106 uint32_t max_ops; 107 struct spdk_io_channel *sync_io_channel; 108 struct spdk_fs_channel *sync_fs_channel; 109 } sync_target; 110 111 struct { 112 uint32_t max_ops; 113 struct spdk_io_channel *md_io_channel; 114 struct spdk_fs_channel *md_fs_channel; 115 } md_target; 116 117 struct { 118 uint32_t max_ops; 119 } io_target; 120 }; 121 122 struct spdk_fs_cb_args { 123 union { 124 spdk_fs_op_with_handle_complete fs_op_with_handle; 125 spdk_fs_op_complete fs_op; 126 spdk_file_op_with_handle_complete file_op_with_handle; 127 spdk_file_op_complete file_op; 128 spdk_file_stat_op_complete stat_op; 129 } fn; 130 void *arg; 131 sem_t *sem; 132 struct spdk_filesystem *fs; 133 struct spdk_file *file; 134 int rc; 135 bool from_request; 136 union { 137 struct { 138 uint64_t length; 139 } truncate; 140 struct { 141 struct spdk_io_channel *channel; 142 void *user_buf; 143 void *pin_buf; 144 int is_read; 145 off_t offset; 146 size_t length; 147 uint64_t start_page; 148 uint64_t num_pages; 149 uint32_t blocklen; 150 } rw; 151 struct { 152 const char *old_name; 153 const char *new_name; 154 } rename; 155 struct { 156 struct cache_buffer *cache_buffer; 157 uint64_t length; 158 } flush; 159 struct { 160 struct cache_buffer *cache_buffer; 161 uint64_t length; 162 uint64_t offset; 163 } readahead; 164 struct { 165 uint64_t offset; 166 TAILQ_ENTRY(spdk_fs_request) tailq; 167 } sync; 168 struct { 169 uint32_t num_clusters; 170 } resize; 171 struct { 172 const char *name; 173 uint32_t flags; 174 TAILQ_ENTRY(spdk_fs_request) tailq; 175 } open; 176 struct { 177 const char *name; 178 } create; 179 struct { 180 const char *name; 181 } delete; 182 struct { 183 const char *name; 184 } stat; 185 } op; 186 }; 187 188 static void cache_free_buffers(struct spdk_file *file); 189 190 static void 191 __initialize_cache(void) 192 { 193 if (g_cache_pool != NULL) { 194 return; 195 } 196 197 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 198 g_fs_cache_size / CACHE_BUFFER_SIZE, 199 CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY); 200 TAILQ_INIT(&g_caches); 201 pthread_spin_init(&g_caches_lock, 0); 202 } 203 204 static uint64_t 205 __file_get_blob_size(struct spdk_file *file) 206 { 207 uint64_t cluster_sz; 208 209 cluster_sz = file->fs->bs_opts.cluster_sz; 210 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 211 } 212 213 struct spdk_fs_request { 214 struct spdk_fs_cb_args args; 215 TAILQ_ENTRY(spdk_fs_request) link; 216 struct spdk_fs_channel *channel; 217 }; 218 219 struct spdk_fs_channel { 220 struct spdk_fs_request *req_mem; 221 TAILQ_HEAD(, spdk_fs_request) reqs; 222 sem_t sem; 223 struct spdk_filesystem *fs; 224 struct spdk_io_channel *bs_channel; 225 fs_send_request_fn send_request; 226 }; 227 228 static struct spdk_fs_request * 229 alloc_fs_request(struct spdk_fs_channel *channel) 230 { 231 struct spdk_fs_request *req; 232 233 req = TAILQ_FIRST(&channel->reqs); 234 if (!req) { 235 return NULL; 236 } 237 TAILQ_REMOVE(&channel->reqs, req, link); 238 memset(req, 0, sizeof(*req)); 239 req->channel = channel; 240 req->args.from_request = true; 241 242 return req; 243 } 244 245 static void 246 free_fs_request(struct spdk_fs_request *req) 247 { 248 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 249 } 250 251 static int 252 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 253 uint32_t max_ops) 254 { 255 uint32_t i; 256 257 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 258 if (!channel->req_mem) { 259 return -1; 260 } 261 262 TAILQ_INIT(&channel->reqs); 263 sem_init(&channel->sem, 0, 0); 264 265 for (i = 0; i < max_ops; i++) { 266 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 267 } 268 269 channel->fs = fs; 270 271 return 0; 272 } 273 274 static int 275 _spdk_fs_md_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 276 { 277 struct spdk_filesystem *fs; 278 struct spdk_fs_channel *channel = ctx_buf; 279 280 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 281 282 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 283 } 284 285 static int 286 _spdk_fs_sync_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 287 { 288 struct spdk_filesystem *fs; 289 struct spdk_fs_channel *channel = ctx_buf; 290 291 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 292 293 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 294 } 295 296 static int 297 _spdk_fs_io_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 298 { 299 struct spdk_filesystem *fs; 300 struct spdk_fs_channel *channel = ctx_buf; 301 302 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 303 304 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 305 } 306 307 static void 308 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 309 { 310 struct spdk_fs_channel *channel = ctx_buf; 311 312 free(channel->req_mem); 313 if (channel->bs_channel != NULL) { 314 spdk_bs_free_io_channel(channel->bs_channel); 315 } 316 } 317 318 static void 319 __send_request_direct(fs_request_fn fn, void *arg) 320 { 321 fn(arg); 322 } 323 324 static void 325 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 326 { 327 fs->bs = bs; 328 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 329 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, 330 SPDK_IO_PRIORITY_DEFAULT); 331 fs->md_target.md_fs_channel->send_request = __send_request_direct; 332 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, 333 SPDK_IO_PRIORITY_DEFAULT); 334 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 335 } 336 337 static void 338 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 339 { 340 struct spdk_fs_request *req = ctx; 341 struct spdk_fs_cb_args *args = &req->args; 342 struct spdk_filesystem *fs = args->fs; 343 344 if (bserrno == 0) { 345 common_fs_bs_init(fs, bs); 346 } else { 347 free(fs); 348 fs = NULL; 349 } 350 351 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 352 free_fs_request(req); 353 } 354 355 static struct spdk_filesystem * 356 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 357 { 358 struct spdk_filesystem *fs; 359 360 fs = calloc(1, sizeof(*fs)); 361 if (fs == NULL) { 362 return NULL; 363 } 364 365 fs->bdev = dev; 366 fs->send_request = send_request_fn; 367 TAILQ_INIT(&fs->files); 368 369 fs->md_target.max_ops = 512; 370 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 371 sizeof(struct spdk_fs_channel)); 372 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target, SPDK_IO_PRIORITY_DEFAULT, false, 373 NULL); 374 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 375 376 fs->sync_target.max_ops = 512; 377 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 378 sizeof(struct spdk_fs_channel)); 379 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target, SPDK_IO_PRIORITY_DEFAULT, 380 false, NULL); 381 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 382 383 fs->io_target.max_ops = 512; 384 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 385 sizeof(struct spdk_fs_channel)); 386 387 __initialize_cache(); 388 389 return fs; 390 } 391 392 void 393 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 394 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 395 { 396 struct spdk_filesystem *fs; 397 struct spdk_fs_request *req; 398 struct spdk_fs_cb_args *args; 399 400 fs = fs_alloc(dev, send_request_fn); 401 if (fs == NULL) { 402 cb_fn(cb_arg, NULL, -ENOMEM); 403 return; 404 } 405 406 req = alloc_fs_request(fs->md_target.md_fs_channel); 407 if (req == NULL) { 408 cb_fn(cb_arg, NULL, -ENOMEM); 409 return; 410 } 411 412 args = &req->args; 413 args->fn.fs_op_with_handle = cb_fn; 414 args->arg = cb_arg; 415 args->fs = fs; 416 417 spdk_bs_init(dev, NULL, init_cb, req); 418 } 419 420 static struct spdk_file * 421 file_alloc(struct spdk_filesystem *fs) 422 { 423 struct spdk_file *file; 424 425 file = calloc(1, sizeof(*file)); 426 if (file == NULL) { 427 return NULL; 428 } 429 430 file->fs = fs; 431 TAILQ_INIT(&file->open_requests); 432 TAILQ_INIT(&file->sync_requests); 433 pthread_spin_init(&file->lock, 0); 434 file->tree = calloc(1, sizeof(*file->tree)); 435 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 436 file->priority = SPDK_FILE_PRIORITY_LOW; 437 return file; 438 } 439 440 static void 441 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 442 { 443 struct spdk_fs_request *req = ctx; 444 struct spdk_fs_cb_args *args = &req->args; 445 struct spdk_filesystem *fs = args->fs; 446 struct spdk_file *f; 447 uint64_t *length; 448 const char *name; 449 size_t value_len; 450 451 if (rc == -ENOENT) { 452 /* Finished iterating */ 453 args->fn.fs_op_with_handle(args->arg, fs, 0); 454 free_fs_request(req); 455 return; 456 } else if (rc < 0) { 457 args->fn.fs_op_with_handle(args->arg, fs, rc); 458 free_fs_request(req); 459 return; 460 } 461 462 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 463 if (rc < 0) { 464 args->fn.fs_op_with_handle(args->arg, fs, rc); 465 free_fs_request(req); 466 return; 467 } 468 469 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 470 if (rc < 0) { 471 args->fn.fs_op_with_handle(args->arg, fs, rc); 472 free_fs_request(req); 473 return; 474 } 475 assert(value_len == 8); 476 477 f = file_alloc(fs); 478 if (f == NULL) { 479 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 480 free_fs_request(req); 481 return; 482 } 483 484 f->name = strdup(name); 485 f->blobid = spdk_blob_get_id(blob); 486 f->length = *length; 487 f->length_flushed = *length; 488 f->append_pos = *length; 489 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 490 491 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 492 } 493 494 static void 495 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 496 { 497 struct spdk_fs_request *req = ctx; 498 struct spdk_fs_cb_args *args = &req->args; 499 struct spdk_filesystem *fs = args->fs; 500 501 if (bserrno != 0) { 502 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 503 free_fs_request(req); 504 free(fs); 505 return; 506 } 507 508 common_fs_bs_init(fs, bs); 509 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 510 } 511 512 void 513 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 514 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 515 { 516 struct spdk_filesystem *fs; 517 struct spdk_fs_cb_args *args; 518 struct spdk_fs_request *req; 519 520 fs = fs_alloc(dev, send_request_fn); 521 if (fs == NULL) { 522 cb_fn(cb_arg, NULL, -ENOMEM); 523 return; 524 } 525 526 req = alloc_fs_request(fs->md_target.md_fs_channel); 527 if (req == NULL) { 528 cb_fn(cb_arg, NULL, -ENOMEM); 529 return; 530 } 531 532 args = &req->args; 533 args->fn.fs_op_with_handle = cb_fn; 534 args->arg = cb_arg; 535 args->fs = fs; 536 537 spdk_bs_load(dev, load_cb, req); 538 } 539 540 static void 541 unload_cb(void *ctx, int bserrno) 542 { 543 struct spdk_fs_request *req = ctx; 544 struct spdk_fs_cb_args *args = &req->args; 545 struct spdk_filesystem *fs = args->fs; 546 547 args->fn.fs_op(args->arg, bserrno); 548 free(req); 549 550 spdk_io_device_unregister(&fs->io_target); 551 spdk_io_device_unregister(&fs->sync_target); 552 spdk_io_device_unregister(&fs->md_target); 553 554 free(fs); 555 } 556 557 void 558 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 559 { 560 struct spdk_fs_request *req; 561 struct spdk_fs_cb_args *args; 562 563 /* 564 * We must free the md_channel before unloading the blobstore, so just 565 * allocate this request from the general heap. 566 */ 567 req = calloc(1, sizeof(*req)); 568 if (req == NULL) { 569 cb_fn(cb_arg, -ENOMEM); 570 return; 571 } 572 573 args = &req->args; 574 args->fn.fs_op = cb_fn; 575 args->arg = cb_arg; 576 args->fs = fs; 577 578 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 579 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 580 spdk_bs_unload(fs->bs, unload_cb, req); 581 } 582 583 static struct spdk_file * 584 fs_find_file(struct spdk_filesystem *fs, const char *name) 585 { 586 struct spdk_file *file; 587 588 TAILQ_FOREACH(file, &fs->files, tailq) { 589 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 590 return file; 591 } 592 } 593 594 return NULL; 595 } 596 597 void 598 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 599 spdk_file_stat_op_complete cb_fn, void *cb_arg) 600 { 601 struct spdk_file_stat stat; 602 struct spdk_file *f = NULL; 603 604 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 605 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 606 return; 607 } 608 609 f = fs_find_file(fs, name); 610 if (f != NULL) { 611 stat.blobid = f->blobid; 612 stat.size = f->length; 613 cb_fn(cb_arg, &stat, 0); 614 return; 615 } 616 617 cb_fn(cb_arg, NULL, -ENOENT); 618 } 619 620 static void 621 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 622 { 623 struct spdk_fs_request *req = arg; 624 struct spdk_fs_cb_args *args = &req->args; 625 626 args->rc = fserrno; 627 if (fserrno == 0) { 628 memcpy(args->arg, stat, sizeof(*stat)); 629 } 630 sem_post(args->sem); 631 } 632 633 static void 634 __file_stat(void *arg) 635 { 636 struct spdk_fs_request *req = arg; 637 struct spdk_fs_cb_args *args = &req->args; 638 639 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 640 args->fn.stat_op, req); 641 } 642 643 int 644 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 645 const char *name, struct spdk_file_stat *stat) 646 { 647 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 648 struct spdk_fs_request *req; 649 int rc; 650 651 req = alloc_fs_request(channel); 652 assert(req != NULL); 653 654 req->args.fs = fs; 655 req->args.op.stat.name = name; 656 req->args.fn.stat_op = __copy_stat; 657 req->args.arg = stat; 658 req->args.sem = &channel->sem; 659 channel->send_request(__file_stat, req); 660 sem_wait(&channel->sem); 661 662 rc = req->args.rc; 663 free_fs_request(req); 664 665 return rc; 666 } 667 668 static void 669 fs_create_blob_close_cb(void *ctx, int bserrno) 670 { 671 struct spdk_fs_request *req = ctx; 672 struct spdk_fs_cb_args *args = &req->args; 673 674 args->fn.file_op(args->arg, bserrno); 675 free_fs_request(req); 676 } 677 678 static void 679 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 680 { 681 struct spdk_fs_request *req = ctx; 682 struct spdk_fs_cb_args *args = &req->args; 683 struct spdk_file *f = args->file; 684 uint64_t length = 0; 685 686 f->blob = blob; 687 spdk_bs_md_resize_blob(blob, 1); 688 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 689 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 690 691 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 692 } 693 694 static void 695 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 696 { 697 struct spdk_fs_request *req = ctx; 698 struct spdk_fs_cb_args *args = &req->args; 699 struct spdk_file *f = args->file; 700 701 f->blobid = blobid; 702 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 703 } 704 705 void 706 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 707 spdk_file_op_complete cb_fn, void *cb_arg) 708 { 709 struct spdk_file *file; 710 struct spdk_fs_request *req; 711 struct spdk_fs_cb_args *args; 712 713 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 714 cb_fn(cb_arg, -ENAMETOOLONG); 715 return; 716 } 717 718 file = fs_find_file(fs, name); 719 if (file != NULL) { 720 cb_fn(cb_arg, -EEXIST); 721 return; 722 } 723 724 file = file_alloc(fs); 725 if (file == NULL) { 726 cb_fn(cb_arg, -ENOMEM); 727 return; 728 } 729 730 req = alloc_fs_request(fs->md_target.md_fs_channel); 731 if (req == NULL) { 732 cb_fn(cb_arg, -ENOMEM); 733 return; 734 } 735 736 args = &req->args; 737 args->file = file; 738 args->fn.file_op = cb_fn; 739 args->arg = cb_arg; 740 741 file->name = strdup(name); 742 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 743 } 744 745 static void 746 __fs_create_file_done(void *arg, int fserrno) 747 { 748 struct spdk_fs_request *req = arg; 749 struct spdk_fs_cb_args *args = &req->args; 750 751 args->rc = fserrno; 752 sem_post(args->sem); 753 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 754 } 755 756 static void 757 __fs_create_file(void *arg) 758 { 759 struct spdk_fs_request *req = arg; 760 struct spdk_fs_cb_args *args = &req->args; 761 762 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 763 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 764 } 765 766 int 767 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 768 { 769 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 770 struct spdk_fs_request *req; 771 struct spdk_fs_cb_args *args; 772 int rc; 773 774 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 775 776 req = alloc_fs_request(channel); 777 assert(req != NULL); 778 779 args = &req->args; 780 args->fs = fs; 781 args->op.create.name = name; 782 args->sem = &channel->sem; 783 fs->send_request(__fs_create_file, req); 784 sem_wait(&channel->sem); 785 rc = args->rc; 786 free_fs_request(req); 787 788 return rc; 789 } 790 791 static void 792 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 793 { 794 struct spdk_fs_request *req = ctx; 795 struct spdk_fs_cb_args *args = &req->args; 796 struct spdk_file *f = args->file; 797 798 f->blob = blob; 799 while (!TAILQ_EMPTY(&f->open_requests)) { 800 req = TAILQ_FIRST(&f->open_requests); 801 args = &req->args; 802 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 803 args->fn.file_op_with_handle(args->arg, f, bserrno); 804 free_fs_request(req); 805 } 806 } 807 808 static void 809 fs_open_blob_create_cb(void *ctx, int bserrno) 810 { 811 struct spdk_fs_request *req = ctx; 812 struct spdk_fs_cb_args *args = &req->args; 813 struct spdk_file *file = args->file; 814 struct spdk_filesystem *fs = args->fs; 815 816 if (file == NULL) { 817 /* 818 * This is from an open with CREATE flag - the file 819 * is now created so look it up in the file list for this 820 * filesystem. 821 */ 822 file = fs_find_file(fs, args->op.open.name); 823 assert(file != NULL); 824 args->file = file; 825 } 826 827 file->ref_count++; 828 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 829 if (file->ref_count == 1) { 830 assert(file->blob == NULL); 831 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 832 } else if (file->blob != NULL) { 833 fs_open_blob_done(req, file->blob, 0); 834 } else { 835 /* 836 * The blob open for this file is in progress due to a previous 837 * open request. When that open completes, it will invoke the 838 * open callback for this request. 839 */ 840 } 841 } 842 843 void 844 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 845 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 846 { 847 struct spdk_file *f = NULL; 848 struct spdk_fs_request *req; 849 struct spdk_fs_cb_args *args; 850 851 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 852 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 853 return; 854 } 855 856 f = fs_find_file(fs, name); 857 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 858 cb_fn(cb_arg, NULL, -ENOENT); 859 return; 860 } 861 862 req = alloc_fs_request(fs->md_target.md_fs_channel); 863 if (req == NULL) { 864 cb_fn(cb_arg, NULL, -ENOMEM); 865 return; 866 } 867 868 args = &req->args; 869 args->fn.file_op_with_handle = cb_fn; 870 args->arg = cb_arg; 871 args->file = f; 872 args->fs = fs; 873 args->op.open.name = name; 874 875 if (f == NULL) { 876 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 877 } else { 878 fs_open_blob_create_cb(req, 0); 879 } 880 } 881 882 static void 883 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 884 { 885 struct spdk_fs_request *req = arg; 886 struct spdk_fs_cb_args *args = &req->args; 887 888 args->file = file; 889 args->rc = bserrno; 890 sem_post(args->sem); 891 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 892 } 893 894 static void 895 __fs_open_file(void *arg) 896 { 897 struct spdk_fs_request *req = arg; 898 struct spdk_fs_cb_args *args = &req->args; 899 900 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 901 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 902 __fs_open_file_done, req); 903 } 904 905 int 906 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 907 const char *name, uint32_t flags, struct spdk_file **file) 908 { 909 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 910 struct spdk_fs_request *req; 911 struct spdk_fs_cb_args *args; 912 int rc; 913 914 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 915 916 req = alloc_fs_request(channel); 917 assert(req != NULL); 918 919 args = &req->args; 920 args->fs = fs; 921 args->op.open.name = name; 922 args->op.open.flags = flags; 923 args->sem = &channel->sem; 924 fs->send_request(__fs_open_file, req); 925 sem_wait(&channel->sem); 926 rc = args->rc; 927 if (rc == 0) { 928 *file = args->file; 929 } else { 930 *file = NULL; 931 } 932 free_fs_request(req); 933 934 return rc; 935 } 936 937 static void 938 fs_rename_blob_close_cb(void *ctx, int bserrno) 939 { 940 struct spdk_fs_request *req = ctx; 941 struct spdk_fs_cb_args *args = &req->args; 942 943 args->fn.fs_op(args->arg, bserrno); 944 free_fs_request(req); 945 } 946 947 static void 948 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 949 { 950 struct spdk_fs_request *req = ctx; 951 struct spdk_fs_cb_args *args = &req->args; 952 struct spdk_file *f = args->file; 953 const char *new_name = args->op.rename.new_name; 954 955 f->blob = blob; 956 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 957 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 958 } 959 960 static void 961 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 962 { 963 struct spdk_fs_cb_args *args = &req->args; 964 struct spdk_file *f; 965 966 f = fs_find_file(args->fs, args->op.rename.old_name); 967 if (f == NULL) { 968 args->fn.fs_op(args->arg, -ENOENT); 969 free_fs_request(req); 970 return; 971 } 972 973 free(f->name); 974 f->name = strdup(args->op.rename.new_name); 975 args->file = f; 976 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 977 } 978 979 static void 980 fs_rename_delete_done(void *arg, int fserrno) 981 { 982 __spdk_fs_md_rename_file(arg); 983 } 984 985 void 986 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 987 const char *old_name, const char *new_name, 988 spdk_file_op_complete cb_fn, void *cb_arg) 989 { 990 struct spdk_file *f; 991 struct spdk_fs_request *req; 992 struct spdk_fs_cb_args *args; 993 994 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 995 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 996 cb_fn(cb_arg, -ENAMETOOLONG); 997 return; 998 } 999 1000 req = alloc_fs_request(fs->md_target.md_fs_channel); 1001 if (req == NULL) { 1002 cb_fn(cb_arg, -ENOMEM); 1003 return; 1004 } 1005 1006 args = &req->args; 1007 args->fn.fs_op = cb_fn; 1008 args->fs = fs; 1009 args->arg = cb_arg; 1010 args->op.rename.old_name = old_name; 1011 args->op.rename.new_name = new_name; 1012 1013 f = fs_find_file(fs, new_name); 1014 if (f == NULL) { 1015 __spdk_fs_md_rename_file(req); 1016 return; 1017 } 1018 1019 /* 1020 * The rename overwrites an existing file. So delete the existing file, then 1021 * do the actual rename. 1022 */ 1023 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1024 } 1025 1026 static void 1027 __fs_rename_file_done(void *arg, int fserrno) 1028 { 1029 struct spdk_fs_request *req = arg; 1030 struct spdk_fs_cb_args *args = &req->args; 1031 1032 args->rc = fserrno; 1033 sem_post(args->sem); 1034 } 1035 1036 static void 1037 __fs_rename_file(void *arg) 1038 { 1039 struct spdk_fs_request *req = arg; 1040 struct spdk_fs_cb_args *args = &req->args; 1041 1042 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1043 __fs_rename_file_done, req); 1044 } 1045 1046 int 1047 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1048 const char *old_name, const char *new_name) 1049 { 1050 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1051 struct spdk_fs_request *req; 1052 struct spdk_fs_cb_args *args; 1053 int rc; 1054 1055 req = alloc_fs_request(channel); 1056 assert(req != NULL); 1057 1058 args = &req->args; 1059 1060 args->fs = fs; 1061 args->op.rename.old_name = old_name; 1062 args->op.rename.new_name = new_name; 1063 args->sem = &channel->sem; 1064 fs->send_request(__fs_rename_file, req); 1065 sem_wait(&channel->sem); 1066 rc = args->rc; 1067 free_fs_request(req); 1068 return rc; 1069 } 1070 1071 static void 1072 blob_delete_cb(void *ctx, int bserrno) 1073 { 1074 struct spdk_fs_request *req = ctx; 1075 struct spdk_fs_cb_args *args = &req->args; 1076 1077 args->fn.file_op(args->arg, bserrno); 1078 free_fs_request(req); 1079 } 1080 1081 void 1082 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1083 spdk_file_op_complete cb_fn, void *cb_arg) 1084 { 1085 struct spdk_file *f; 1086 spdk_blob_id blobid; 1087 struct spdk_fs_request *req; 1088 struct spdk_fs_cb_args *args; 1089 1090 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1091 1092 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1093 cb_fn(cb_arg, -ENAMETOOLONG); 1094 return; 1095 } 1096 1097 f = fs_find_file(fs, name); 1098 if (f == NULL) { 1099 cb_fn(cb_arg, -ENOENT); 1100 return; 1101 } 1102 1103 if (f->ref_count > 0) { 1104 /* For now, do not allow deleting files with open references. */ 1105 cb_fn(cb_arg, -EBUSY); 1106 return; 1107 } 1108 1109 req = alloc_fs_request(fs->md_target.md_fs_channel); 1110 if (req == NULL) { 1111 cb_fn(cb_arg, -ENOMEM); 1112 return; 1113 } 1114 1115 TAILQ_REMOVE(&fs->files, f, tailq); 1116 1117 cache_free_buffers(f); 1118 1119 blobid = f->blobid; 1120 1121 free(f->name); 1122 free(f->tree); 1123 free(f); 1124 1125 args = &req->args; 1126 args->fn.file_op = cb_fn; 1127 args->arg = cb_arg; 1128 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1129 } 1130 1131 static void 1132 __fs_delete_file_done(void *arg, int fserrno) 1133 { 1134 struct spdk_fs_request *req = arg; 1135 struct spdk_fs_cb_args *args = &req->args; 1136 1137 args->rc = fserrno; 1138 sem_post(args->sem); 1139 } 1140 1141 static void 1142 __fs_delete_file(void *arg) 1143 { 1144 struct spdk_fs_request *req = arg; 1145 struct spdk_fs_cb_args *args = &req->args; 1146 1147 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1148 } 1149 1150 int 1151 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1152 const char *name) 1153 { 1154 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1155 struct spdk_fs_request *req; 1156 struct spdk_fs_cb_args *args; 1157 int rc; 1158 1159 req = alloc_fs_request(channel); 1160 assert(req != NULL); 1161 1162 args = &req->args; 1163 args->fs = fs; 1164 args->op.delete.name = name; 1165 args->sem = &channel->sem; 1166 fs->send_request(__fs_delete_file, req); 1167 sem_wait(&channel->sem); 1168 rc = args->rc; 1169 free_fs_request(req); 1170 1171 return rc; 1172 } 1173 1174 spdk_fs_iter 1175 spdk_fs_iter_first(struct spdk_filesystem *fs) 1176 { 1177 struct spdk_file *f; 1178 1179 f = TAILQ_FIRST(&fs->files); 1180 return f; 1181 } 1182 1183 spdk_fs_iter 1184 spdk_fs_iter_next(spdk_fs_iter iter) 1185 { 1186 struct spdk_file *f = iter; 1187 1188 if (f == NULL) { 1189 return NULL; 1190 } 1191 1192 f = TAILQ_NEXT(f, tailq); 1193 return f; 1194 } 1195 1196 const char * 1197 spdk_file_get_name(struct spdk_file *file) 1198 { 1199 return file->name; 1200 } 1201 1202 uint64_t 1203 spdk_file_get_length(struct spdk_file *file) 1204 { 1205 assert(file != NULL); 1206 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1207 return file->length; 1208 } 1209 1210 static void 1211 fs_truncate_complete_cb(void *ctx, int bserrno) 1212 { 1213 struct spdk_fs_request *req = ctx; 1214 struct spdk_fs_cb_args *args = &req->args; 1215 1216 args->fn.file_op(args->arg, bserrno); 1217 free_fs_request(req); 1218 } 1219 1220 static uint64_t 1221 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1222 { 1223 return (length + cluster_sz - 1) / cluster_sz; 1224 } 1225 1226 void 1227 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1228 spdk_file_op_complete cb_fn, void *cb_arg) 1229 { 1230 struct spdk_filesystem *fs; 1231 size_t num_clusters; 1232 struct spdk_fs_request *req; 1233 struct spdk_fs_cb_args *args; 1234 1235 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1236 if (length == file->length) { 1237 cb_fn(cb_arg, 0); 1238 return; 1239 } 1240 1241 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1242 if (req == NULL) { 1243 cb_fn(cb_arg, -ENOMEM); 1244 return; 1245 } 1246 1247 args = &req->args; 1248 args->fn.file_op = cb_fn; 1249 args->arg = cb_arg; 1250 args->file = file; 1251 fs = file->fs; 1252 1253 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1254 1255 spdk_bs_md_resize_blob(file->blob, num_clusters); 1256 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1257 1258 file->length = length; 1259 if (file->append_pos > file->length) { 1260 file->append_pos = file->length; 1261 } 1262 1263 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1264 } 1265 1266 static void 1267 __truncate(void *arg) 1268 { 1269 struct spdk_fs_request *req = arg; 1270 struct spdk_fs_cb_args *args = &req->args; 1271 1272 spdk_file_truncate_async(args->file, args->op.truncate.length, 1273 args->fn.file_op, args->arg); 1274 } 1275 1276 void 1277 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1278 uint64_t length) 1279 { 1280 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1281 struct spdk_fs_request *req; 1282 struct spdk_fs_cb_args *args; 1283 1284 req = alloc_fs_request(channel); 1285 assert(req != NULL); 1286 1287 args = &req->args; 1288 1289 args->file = file; 1290 args->op.truncate.length = length; 1291 args->fn.file_op = __sem_post; 1292 args->arg = &channel->sem; 1293 1294 channel->send_request(__truncate, req); 1295 sem_wait(&channel->sem); 1296 free_fs_request(req); 1297 } 1298 1299 static void 1300 __rw_done(void *ctx, int bserrno) 1301 { 1302 struct spdk_fs_request *req = ctx; 1303 struct spdk_fs_cb_args *args = &req->args; 1304 1305 spdk_free(args->op.rw.pin_buf); 1306 args->fn.file_op(args->arg, bserrno); 1307 free_fs_request(req); 1308 } 1309 1310 static void 1311 __read_done(void *ctx, int bserrno) 1312 { 1313 struct spdk_fs_request *req = ctx; 1314 struct spdk_fs_cb_args *args = &req->args; 1315 1316 if (args->op.rw.is_read) { 1317 memcpy(args->op.rw.user_buf, 1318 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1319 args->op.rw.length); 1320 __rw_done(req, 0); 1321 } else { 1322 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1323 args->op.rw.user_buf, 1324 args->op.rw.length); 1325 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1326 args->op.rw.pin_buf, 1327 args->op.rw.start_page, args->op.rw.num_pages, 1328 __rw_done, req); 1329 } 1330 } 1331 1332 static void 1333 __do_blob_read(void *ctx, int fserrno) 1334 { 1335 struct spdk_fs_request *req = ctx; 1336 struct spdk_fs_cb_args *args = &req->args; 1337 1338 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1339 args->op.rw.pin_buf, 1340 args->op.rw.start_page, args->op.rw.num_pages, 1341 __read_done, req); 1342 } 1343 1344 static void 1345 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1346 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1347 { 1348 uint64_t end_page; 1349 1350 *page_size = spdk_bs_get_page_size(file->fs->bs); 1351 *start_page = offset / *page_size; 1352 end_page = (offset + length - 1) / *page_size; 1353 *num_pages = (end_page - *start_page + 1); 1354 } 1355 1356 static void 1357 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1358 void *payload, uint64_t offset, uint64_t length, 1359 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1360 { 1361 struct spdk_fs_request *req; 1362 struct spdk_fs_cb_args *args; 1363 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1364 uint64_t start_page, num_pages, pin_buf_length; 1365 uint32_t page_size; 1366 1367 if (is_read && offset + length > file->length) { 1368 cb_fn(cb_arg, -EINVAL); 1369 return; 1370 } 1371 1372 req = alloc_fs_request(channel); 1373 if (req == NULL) { 1374 cb_fn(cb_arg, -ENOMEM); 1375 return; 1376 } 1377 1378 args = &req->args; 1379 args->fn.file_op = cb_fn; 1380 args->arg = cb_arg; 1381 args->file = file; 1382 args->op.rw.channel = channel->bs_channel; 1383 args->op.rw.user_buf = payload; 1384 args->op.rw.is_read = is_read; 1385 args->op.rw.offset = offset; 1386 args->op.rw.length = length; 1387 1388 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1389 pin_buf_length = num_pages * page_size; 1390 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, 4096, NULL); 1391 1392 args->op.rw.start_page = start_page; 1393 args->op.rw.num_pages = num_pages; 1394 1395 if (!is_read && file->length < offset + length) { 1396 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1397 } else { 1398 __do_blob_read(req, 0); 1399 } 1400 } 1401 1402 void 1403 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1404 void *payload, uint64_t offset, uint64_t length, 1405 spdk_file_op_complete cb_fn, void *cb_arg) 1406 { 1407 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1408 } 1409 1410 void 1411 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1412 void *payload, uint64_t offset, uint64_t length, 1413 spdk_file_op_complete cb_fn, void *cb_arg) 1414 { 1415 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1416 file->name, offset, length); 1417 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1418 } 1419 1420 struct spdk_io_channel * 1421 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs, uint32_t priority) 1422 { 1423 struct spdk_io_channel *io_channel; 1424 struct spdk_fs_channel *fs_channel; 1425 1426 io_channel = spdk_get_io_channel(&fs->io_target, priority, false, NULL); 1427 fs_channel = spdk_io_channel_get_ctx(io_channel); 1428 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT); 1429 fs_channel->send_request = __send_request_direct; 1430 1431 return io_channel; 1432 } 1433 1434 struct spdk_io_channel * 1435 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs, uint32_t priority) 1436 { 1437 struct spdk_io_channel *io_channel; 1438 struct spdk_fs_channel *fs_channel; 1439 1440 io_channel = spdk_get_io_channel(&fs->io_target, priority, false, NULL); 1441 fs_channel = spdk_io_channel_get_ctx(io_channel); 1442 fs_channel->send_request = fs->send_request; 1443 1444 return io_channel; 1445 } 1446 1447 void 1448 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1449 { 1450 spdk_put_io_channel(channel); 1451 } 1452 1453 void 1454 spdk_fs_set_cache_size(uint64_t size_in_mb) 1455 { 1456 g_fs_cache_size = size_in_mb * 1024 * 1024; 1457 } 1458 1459 uint64_t 1460 spdk_fs_get_cache_size(void) 1461 { 1462 return g_fs_cache_size / (1024 * 1024); 1463 } 1464 1465 static void __file_flush(void *_args); 1466 1467 static void * 1468 alloc_cache_memory_buffer(struct spdk_file *context) 1469 { 1470 struct spdk_file *file; 1471 void *buf; 1472 1473 buf = spdk_mempool_get(g_cache_pool); 1474 if (buf != NULL) { 1475 return buf; 1476 } 1477 1478 pthread_spin_lock(&g_caches_lock); 1479 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1480 if (!file->open_for_writing && 1481 file->priority == SPDK_FILE_PRIORITY_LOW && 1482 file != context) { 1483 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1484 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1485 break; 1486 } 1487 } 1488 pthread_spin_unlock(&g_caches_lock); 1489 if (file != NULL) { 1490 cache_free_buffers(file); 1491 buf = spdk_mempool_get(g_cache_pool); 1492 if (buf != NULL) { 1493 return buf; 1494 } 1495 } 1496 1497 pthread_spin_lock(&g_caches_lock); 1498 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1499 if (!file->open_for_writing && file != context) { 1500 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1501 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1502 break; 1503 } 1504 } 1505 pthread_spin_unlock(&g_caches_lock); 1506 if (file != NULL) { 1507 cache_free_buffers(file); 1508 buf = spdk_mempool_get(g_cache_pool); 1509 if (buf != NULL) { 1510 return buf; 1511 } 1512 } 1513 1514 pthread_spin_lock(&g_caches_lock); 1515 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1516 if (file != context) { 1517 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1518 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1519 break; 1520 } 1521 } 1522 pthread_spin_unlock(&g_caches_lock); 1523 if (file != NULL) { 1524 cache_free_buffers(file); 1525 buf = spdk_mempool_get(g_cache_pool); 1526 if (buf != NULL) { 1527 return buf; 1528 } 1529 } 1530 1531 assert(false); 1532 return NULL; 1533 } 1534 1535 static struct cache_buffer * 1536 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1537 { 1538 struct cache_buffer *buf; 1539 int count = 0; 1540 1541 buf = calloc(1, sizeof(*buf)); 1542 if (buf == NULL) { 1543 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1544 return NULL; 1545 } 1546 1547 buf->buf = alloc_cache_memory_buffer(file); 1548 if (buf->buf == NULL) { 1549 while (buf->buf == NULL) { 1550 /* 1551 * TODO: alloc_cache_memory_buffer() should eventually free 1552 * some buffers. Need a more sophisticated check here, instead 1553 * of just bailing if 100 tries does not result in getting a 1554 * free buffer. This will involve using the sync channel's 1555 * semaphore to block until a buffer becomes available. 1556 */ 1557 if (count++ == 100) { 1558 SPDK_ERRLOG("could not allocate cache buffer\n"); 1559 assert(false); 1560 free(buf); 1561 return NULL; 1562 } 1563 buf->buf = alloc_cache_memory_buffer(file); 1564 } 1565 } 1566 1567 buf->buf_size = CACHE_BUFFER_SIZE; 1568 buf->offset = offset; 1569 1570 pthread_spin_lock(&g_caches_lock); 1571 if (file->tree->present_mask == 0) { 1572 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1573 } 1574 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1575 pthread_spin_unlock(&g_caches_lock); 1576 1577 return buf; 1578 } 1579 1580 static struct cache_buffer * 1581 cache_append_buffer(struct spdk_file *file) 1582 { 1583 struct cache_buffer *last; 1584 1585 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1586 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1587 1588 last = cache_insert_buffer(file, file->append_pos); 1589 if (last == NULL) { 1590 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1591 return NULL; 1592 } 1593 1594 if (file->last != NULL) { 1595 file->last->next = last; 1596 } 1597 file->last = last; 1598 1599 return last; 1600 } 1601 1602 static void 1603 __wake_caller(struct spdk_fs_cb_args *args) 1604 { 1605 sem_post(args->sem); 1606 } 1607 1608 static void 1609 __file_cache_finish_sync(struct spdk_file *file) 1610 { 1611 struct spdk_fs_request *sync_req; 1612 struct spdk_fs_cb_args *sync_args; 1613 1614 pthread_spin_lock(&file->lock); 1615 while (!TAILQ_EMPTY(&file->sync_requests)) { 1616 sync_req = TAILQ_FIRST(&file->sync_requests); 1617 sync_args = &sync_req->args; 1618 if (sync_args->op.sync.offset > file->length_flushed) { 1619 break; 1620 } 1621 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1622 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1623 pthread_spin_unlock(&file->lock); 1624 sync_args->fn.file_op(sync_args->arg, 0); 1625 pthread_spin_lock(&file->lock); 1626 free_fs_request(sync_req); 1627 } 1628 pthread_spin_unlock(&file->lock); 1629 } 1630 1631 static void 1632 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1633 { 1634 struct spdk_file *file = ctx; 1635 1636 __file_cache_finish_sync(file); 1637 } 1638 1639 static void 1640 __free_args(struct spdk_fs_cb_args *args) 1641 { 1642 struct spdk_fs_request *req; 1643 1644 if (!args->from_request) { 1645 free(args); 1646 } else { 1647 /* Depends on args being at the start of the spdk_fs_request structure. */ 1648 req = (struct spdk_fs_request *)args; 1649 free_fs_request(req); 1650 } 1651 } 1652 1653 static void 1654 __file_flush_done(void *arg, int bserrno) 1655 { 1656 struct spdk_fs_cb_args *args = arg; 1657 struct spdk_fs_request *sync_req; 1658 struct spdk_file *file = args->file; 1659 struct cache_buffer *next = args->op.flush.cache_buffer; 1660 1661 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1662 1663 pthread_spin_lock(&file->lock); 1664 next->in_progress = false; 1665 next->bytes_flushed += args->op.flush.length; 1666 file->length_flushed += args->op.flush.length; 1667 if (file->length_flushed > file->length) { 1668 file->length = file->length_flushed; 1669 } 1670 if (next->bytes_flushed == next->buf_size) { 1671 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1672 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1673 } 1674 1675 TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) { 1676 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1677 break; 1678 } 1679 } 1680 1681 /* 1682 * Assert that there is no cached data that extends past the end of the underlying 1683 * blob. 1684 */ 1685 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1686 next->bytes_filled == 0); 1687 1688 if (sync_req != NULL) { 1689 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1690 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1691 sizeof(file->length_flushed)); 1692 1693 pthread_spin_unlock(&file->lock); 1694 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1695 } else { 1696 pthread_spin_unlock(&file->lock); 1697 __file_cache_finish_sync(file); 1698 } 1699 1700 __file_flush(args); 1701 } 1702 1703 static void 1704 __file_flush(void *_args) 1705 { 1706 struct spdk_fs_cb_args *args = _args; 1707 struct spdk_file *file = args->file; 1708 struct cache_buffer *next; 1709 uint64_t offset, length, start_page, num_pages; 1710 uint32_t page_size; 1711 1712 pthread_spin_lock(&file->lock); 1713 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1714 if (next == NULL || next->in_progress) { 1715 /* 1716 * There is either no data to flush, or a flush I/O is already in 1717 * progress. So return immediately - if a flush I/O is in 1718 * progress we will flush more data after that is completed. 1719 */ 1720 __free_args(args); 1721 pthread_spin_unlock(&file->lock); 1722 return; 1723 } 1724 1725 offset = next->offset + next->bytes_flushed; 1726 length = next->bytes_filled - next->bytes_flushed; 1727 if (length == 0) { 1728 __free_args(args); 1729 pthread_spin_unlock(&file->lock); 1730 return; 1731 } 1732 args->op.flush.length = length; 1733 args->op.flush.cache_buffer = next; 1734 1735 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1736 1737 next->in_progress = true; 1738 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1739 offset, length, start_page, num_pages); 1740 pthread_spin_unlock(&file->lock); 1741 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1742 next->buf + (start_page * page_size) - next->offset, 1743 start_page, num_pages, 1744 __file_flush_done, args); 1745 } 1746 1747 static void 1748 __file_extend_done(void *arg, int bserrno) 1749 { 1750 struct spdk_fs_cb_args *args = arg; 1751 1752 __wake_caller(args); 1753 } 1754 1755 static void 1756 __file_extend_blob(void *_args) 1757 { 1758 struct spdk_fs_cb_args *args = _args; 1759 struct spdk_file *file = args->file; 1760 1761 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1762 1763 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1764 } 1765 1766 static void 1767 __rw_from_file_done(void *arg, int bserrno) 1768 { 1769 struct spdk_fs_cb_args *args = arg; 1770 1771 __wake_caller(args); 1772 __free_args(args); 1773 } 1774 1775 static void 1776 __rw_from_file(void *_args) 1777 { 1778 struct spdk_fs_cb_args *args = _args; 1779 struct spdk_file *file = args->file; 1780 1781 if (args->op.rw.is_read) { 1782 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1783 args->op.rw.offset, args->op.rw.length, 1784 __rw_from_file_done, args); 1785 } else { 1786 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1787 args->op.rw.offset, args->op.rw.length, 1788 __rw_from_file_done, args); 1789 } 1790 } 1791 1792 static int 1793 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1794 uint64_t offset, uint64_t length, bool is_read) 1795 { 1796 struct spdk_fs_cb_args *args; 1797 1798 args = calloc(1, sizeof(*args)); 1799 if (args == NULL) { 1800 sem_post(sem); 1801 return -ENOMEM; 1802 } 1803 1804 args->file = file; 1805 args->sem = sem; 1806 args->op.rw.user_buf = payload; 1807 args->op.rw.offset = offset; 1808 args->op.rw.length = length; 1809 args->op.rw.is_read = is_read; 1810 file->fs->send_request(__rw_from_file, args); 1811 return 0; 1812 } 1813 1814 int 1815 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1816 void *payload, uint64_t offset, uint64_t length) 1817 { 1818 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1819 struct spdk_fs_cb_args *args; 1820 uint64_t rem_length, copy, blob_size, cluster_sz; 1821 uint32_t cache_buffers_filled = 0; 1822 uint8_t *cur_payload; 1823 struct cache_buffer *last; 1824 1825 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1826 1827 if (length == 0) { 1828 return 0; 1829 } 1830 1831 if (offset != file->append_pos) { 1832 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1833 return -EINVAL; 1834 } 1835 1836 pthread_spin_lock(&file->lock); 1837 file->open_for_writing = true; 1838 1839 if (file->last == NULL) { 1840 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1841 cache_append_buffer(file); 1842 } else { 1843 int rc; 1844 1845 file->append_pos += length; 1846 rc = __send_rw_from_file(file, &channel->sem, payload, 1847 offset, length, false); 1848 pthread_spin_unlock(&file->lock); 1849 sem_wait(&channel->sem); 1850 return rc; 1851 } 1852 } 1853 1854 blob_size = __file_get_blob_size(file); 1855 1856 if ((offset + length) > blob_size) { 1857 struct spdk_fs_cb_args extend_args = {}; 1858 1859 cluster_sz = file->fs->bs_opts.cluster_sz; 1860 extend_args.sem = &channel->sem; 1861 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1862 extend_args.file = file; 1863 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1864 pthread_spin_unlock(&file->lock); 1865 file->fs->send_request(__file_extend_blob, &extend_args); 1866 sem_wait(&channel->sem); 1867 } 1868 1869 last = file->last; 1870 rem_length = length; 1871 cur_payload = payload; 1872 while (rem_length > 0) { 1873 copy = last->buf_size - last->bytes_filled; 1874 if (copy > rem_length) { 1875 copy = rem_length; 1876 } 1877 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 1878 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 1879 file->append_pos += copy; 1880 if (file->length < file->append_pos) { 1881 file->length = file->append_pos; 1882 } 1883 cur_payload += copy; 1884 last->bytes_filled += copy; 1885 rem_length -= copy; 1886 if (last->bytes_filled == last->buf_size) { 1887 cache_buffers_filled++; 1888 last = cache_append_buffer(file); 1889 if (last == NULL) { 1890 BLOBFS_TRACE(file, "nomem\n"); 1891 pthread_spin_unlock(&file->lock); 1892 return -ENOMEM; 1893 } 1894 } 1895 } 1896 1897 if (cache_buffers_filled == 0) { 1898 pthread_spin_unlock(&file->lock); 1899 return 0; 1900 } 1901 1902 args = calloc(1, sizeof(*args)); 1903 if (args == NULL) { 1904 pthread_spin_unlock(&file->lock); 1905 return -ENOMEM; 1906 } 1907 1908 args->file = file; 1909 file->fs->send_request(__file_flush, args); 1910 pthread_spin_unlock(&file->lock); 1911 return 0; 1912 } 1913 1914 static void 1915 __readahead_done(void *arg, int bserrno) 1916 { 1917 struct spdk_fs_cb_args *args = arg; 1918 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 1919 struct spdk_file *file = args->file; 1920 1921 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 1922 1923 pthread_spin_lock(&file->lock); 1924 cache_buffer->bytes_filled = args->op.readahead.length; 1925 cache_buffer->bytes_flushed = args->op.readahead.length; 1926 cache_buffer->in_progress = false; 1927 pthread_spin_unlock(&file->lock); 1928 1929 __free_args(args); 1930 } 1931 1932 static void 1933 __readahead(void *_args) 1934 { 1935 struct spdk_fs_cb_args *args = _args; 1936 struct spdk_file *file = args->file; 1937 uint64_t offset, length, start_page, num_pages; 1938 uint32_t page_size; 1939 1940 offset = args->op.readahead.offset; 1941 length = args->op.readahead.length; 1942 assert(length > 0); 1943 1944 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1945 1946 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1947 offset, length, start_page, num_pages); 1948 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1949 args->op.readahead.cache_buffer->buf, 1950 start_page, num_pages, 1951 __readahead_done, args); 1952 } 1953 1954 static uint64_t 1955 __next_cache_buffer_offset(uint64_t offset) 1956 { 1957 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 1958 } 1959 1960 static void 1961 check_readahead(struct spdk_file *file, uint64_t offset) 1962 { 1963 struct spdk_fs_cb_args *args; 1964 1965 offset = __next_cache_buffer_offset(offset); 1966 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 1967 return; 1968 } 1969 1970 args = calloc(1, sizeof(*args)); 1971 if (args == NULL) { 1972 return; 1973 } 1974 1975 BLOBFS_TRACE(file, "offset=%jx\n", offset); 1976 1977 args->file = file; 1978 args->op.readahead.offset = offset; 1979 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 1980 args->op.readahead.cache_buffer->in_progress = true; 1981 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 1982 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 1983 } else { 1984 args->op.readahead.length = CACHE_BUFFER_SIZE; 1985 } 1986 file->fs->send_request(__readahead, args); 1987 } 1988 1989 static int 1990 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 1991 { 1992 struct cache_buffer *buf; 1993 1994 buf = spdk_tree_find_filled_buffer(file->tree, offset); 1995 if (buf == NULL) { 1996 return __send_rw_from_file(file, sem, payload, offset, length, true); 1997 } 1998 1999 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2000 length = buf->offset + buf->bytes_filled - offset; 2001 } 2002 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2003 memcpy(payload, &buf->buf[offset - buf->offset], length); 2004 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2005 pthread_spin_lock(&g_caches_lock); 2006 spdk_tree_remove_buffer(file->tree, buf); 2007 if (file->tree->present_mask == 0) { 2008 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2009 } 2010 pthread_spin_unlock(&g_caches_lock); 2011 } 2012 2013 sem_post(sem); 2014 return 0; 2015 } 2016 2017 int64_t 2018 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2019 void *payload, uint64_t offset, uint64_t length) 2020 { 2021 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2022 uint64_t final_offset, final_length; 2023 uint32_t sub_reads = 0; 2024 int rc = 0; 2025 2026 pthread_spin_lock(&file->lock); 2027 2028 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2029 2030 file->open_for_writing = false; 2031 2032 if (length == 0 || offset >= file->length) { 2033 pthread_spin_unlock(&file->lock); 2034 return 0; 2035 } 2036 2037 if (offset + length > file->length) { 2038 length = file->length - offset; 2039 } 2040 2041 if (offset != file->next_seq_offset) { 2042 file->seq_byte_count = 0; 2043 } 2044 file->seq_byte_count += length; 2045 file->next_seq_offset = offset + length; 2046 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2047 check_readahead(file, offset); 2048 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2049 } 2050 2051 final_length = 0; 2052 final_offset = offset + length; 2053 while (offset < final_offset) { 2054 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2055 if (length > (final_offset - offset)) { 2056 length = final_offset - offset; 2057 } 2058 rc = __file_read(file, payload, offset, length, &channel->sem); 2059 if (rc == 0) { 2060 final_length += length; 2061 } else { 2062 break; 2063 } 2064 payload += length; 2065 offset += length; 2066 sub_reads++; 2067 } 2068 pthread_spin_unlock(&file->lock); 2069 while (sub_reads-- > 0) { 2070 sem_wait(&channel->sem); 2071 } 2072 if (rc == 0) { 2073 return final_length; 2074 } else { 2075 return rc; 2076 } 2077 } 2078 2079 static void 2080 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2081 spdk_file_op_complete cb_fn, void *cb_arg) 2082 { 2083 struct spdk_fs_request *sync_req; 2084 struct spdk_fs_request *flush_req; 2085 struct spdk_fs_cb_args *sync_args; 2086 struct spdk_fs_cb_args *flush_args; 2087 2088 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2089 2090 pthread_spin_lock(&file->lock); 2091 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2092 BLOBFS_TRACE(file, "done - no data to flush\n"); 2093 pthread_spin_unlock(&file->lock); 2094 cb_fn(cb_arg, 0); 2095 return; 2096 } 2097 2098 sync_req = alloc_fs_request(channel); 2099 assert(sync_req != NULL); 2100 sync_args = &sync_req->args; 2101 2102 flush_req = alloc_fs_request(channel); 2103 assert(flush_req != NULL); 2104 flush_args = &flush_req->args; 2105 2106 sync_args->file = file; 2107 sync_args->fn.file_op = cb_fn; 2108 sync_args->arg = cb_arg; 2109 sync_args->op.sync.offset = file->append_pos; 2110 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2111 pthread_spin_unlock(&file->lock); 2112 2113 flush_args->file = file; 2114 channel->send_request(__file_flush, flush_args); 2115 } 2116 2117 int 2118 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2119 { 2120 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2121 2122 _file_sync(file, channel, __sem_post, &channel->sem); 2123 sem_wait(&channel->sem); 2124 2125 return 0; 2126 } 2127 2128 void 2129 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2130 spdk_file_op_complete cb_fn, void *cb_arg) 2131 { 2132 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2133 2134 _file_sync(file, channel, cb_fn, cb_arg); 2135 } 2136 2137 void 2138 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2139 { 2140 BLOBFS_TRACE(file, "priority=%u\n", priority); 2141 file->priority = priority; 2142 2143 } 2144 2145 /* 2146 * Close routines 2147 */ 2148 2149 static void 2150 __file_close_async_done(void *ctx, int bserrno) 2151 { 2152 struct spdk_fs_request *req = ctx; 2153 struct spdk_fs_cb_args *args = &req->args; 2154 2155 args->fn.file_op(args->arg, bserrno); 2156 free_fs_request(req); 2157 } 2158 2159 static void 2160 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2161 { 2162 pthread_spin_lock(&file->lock); 2163 if (file->ref_count == 0) { 2164 pthread_spin_unlock(&file->lock); 2165 __file_close_async_done(req, -EBADF); 2166 return; 2167 } 2168 2169 file->ref_count--; 2170 if (file->ref_count > 0) { 2171 pthread_spin_unlock(&file->lock); 2172 __file_close_async_done(req, 0); 2173 return; 2174 } 2175 2176 pthread_spin_unlock(&file->lock); 2177 2178 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2179 } 2180 2181 static void 2182 __file_close_async__sync_done(void *arg, int fserrno) 2183 { 2184 struct spdk_fs_request *req = arg; 2185 struct spdk_fs_cb_args *args = &req->args; 2186 2187 __file_close_async(args->file, req); 2188 } 2189 2190 void 2191 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2192 { 2193 struct spdk_fs_request *req; 2194 struct spdk_fs_cb_args *args; 2195 2196 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2197 if (req == NULL) { 2198 cb_fn(cb_arg, -ENOMEM); 2199 return; 2200 } 2201 2202 args = &req->args; 2203 args->file = file; 2204 args->fn.file_op = cb_fn; 2205 args->arg = cb_arg; 2206 2207 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2208 } 2209 2210 static void 2211 __file_close_done(void *arg, int fserrno) 2212 { 2213 struct spdk_fs_cb_args *args = arg; 2214 2215 args->rc = fserrno; 2216 sem_post(args->sem); 2217 } 2218 2219 static void 2220 __file_close(void *arg) 2221 { 2222 struct spdk_fs_request *req = arg; 2223 struct spdk_fs_cb_args *args = &req->args; 2224 struct spdk_file *file = args->file; 2225 2226 __file_close_async(file, req); 2227 } 2228 2229 int 2230 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2231 { 2232 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2233 struct spdk_fs_request *req; 2234 struct spdk_fs_cb_args *args; 2235 2236 req = alloc_fs_request(channel); 2237 assert(req != NULL); 2238 2239 args = &req->args; 2240 2241 spdk_file_sync(file, _channel); 2242 BLOBFS_TRACE(file, "name=%s\n", file->name); 2243 args->file = file; 2244 args->sem = &channel->sem; 2245 args->fn.file_op = __file_close_done; 2246 args->arg = req; 2247 channel->send_request(__file_close, req); 2248 sem_wait(&channel->sem); 2249 2250 return args->rc; 2251 } 2252 2253 static void 2254 cache_free_buffers(struct spdk_file *file) 2255 { 2256 BLOBFS_TRACE(file, "free=%s\n", file->name); 2257 pthread_spin_lock(&file->lock); 2258 pthread_spin_lock(&g_caches_lock); 2259 if (file->tree->present_mask == 0) { 2260 pthread_spin_unlock(&g_caches_lock); 2261 pthread_spin_unlock(&file->lock); 2262 return; 2263 } 2264 spdk_tree_free_buffers(file->tree); 2265 if (file->tree->present_mask == 0) { 2266 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2267 } 2268 file->last = NULL; 2269 pthread_spin_unlock(&g_caches_lock); 2270 pthread_spin_unlock(&file->lock); 2271 } 2272 2273 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2274 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2275