1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk/util.h" 44 #include "spdk_internal/log.h" 45 46 #define BLOBFS_TRACE(file, str, args...) \ 47 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 48 49 #define BLOBFS_TRACE_RW(file, str, args...) \ 50 SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 51 52 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 53 54 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 55 static struct spdk_mempool *g_cache_pool; 56 static TAILQ_HEAD(, spdk_file) g_caches; 57 static pthread_spinlock_t g_caches_lock; 58 59 static void 60 __sem_post(void *arg, int bserrno) 61 { 62 sem_t *sem = arg; 63 64 sem_post(sem); 65 } 66 67 void 68 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 69 { 70 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 71 free(cache_buffer); 72 } 73 74 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 75 76 struct spdk_file { 77 struct spdk_filesystem *fs; 78 struct spdk_blob *blob; 79 char *name; 80 uint64_t length; 81 bool open_for_writing; 82 uint64_t length_flushed; 83 uint64_t append_pos; 84 uint64_t seq_byte_count; 85 uint64_t next_seq_offset; 86 uint32_t priority; 87 TAILQ_ENTRY(spdk_file) tailq; 88 spdk_blob_id blobid; 89 uint32_t ref_count; 90 pthread_spinlock_t lock; 91 struct cache_buffer *last; 92 struct cache_tree *tree; 93 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 94 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 95 TAILQ_ENTRY(spdk_file) cache_tailq; 96 }; 97 98 struct spdk_filesystem { 99 struct spdk_blob_store *bs; 100 TAILQ_HEAD(, spdk_file) files; 101 struct spdk_bs_opts bs_opts; 102 struct spdk_bs_dev *bdev; 103 fs_send_request_fn send_request; 104 struct spdk_io_channel *md_io_channel; 105 struct spdk_fs_channel *md_fs_channel; 106 107 struct { 108 uint32_t max_ops; 109 struct spdk_io_channel *sync_io_channel; 110 struct spdk_fs_channel *sync_fs_channel; 111 } sync_target; 112 113 struct { 114 uint32_t max_ops; 115 } io_target; 116 }; 117 118 struct spdk_fs_cb_args { 119 union { 120 spdk_fs_op_with_handle_complete fs_op_with_handle; 121 spdk_fs_op_complete fs_op; 122 spdk_file_op_with_handle_complete file_op_with_handle; 123 spdk_file_op_complete file_op; 124 spdk_file_stat_op_complete stat_op; 125 } fn; 126 void *arg; 127 sem_t *sem; 128 struct spdk_filesystem *fs; 129 struct spdk_file *file; 130 int rc; 131 bool from_request; 132 union { 133 struct { 134 uint64_t length; 135 } truncate; 136 struct { 137 struct spdk_io_channel *channel; 138 void *user_buf; 139 void *pin_buf; 140 int is_read; 141 off_t offset; 142 size_t length; 143 uint64_t start_page; 144 uint64_t num_pages; 145 uint32_t blocklen; 146 } rw; 147 struct { 148 const char *old_name; 149 const char *new_name; 150 } rename; 151 struct { 152 struct cache_buffer *cache_buffer; 153 uint64_t length; 154 } flush; 155 struct { 156 struct cache_buffer *cache_buffer; 157 uint64_t length; 158 uint64_t offset; 159 } readahead; 160 struct { 161 uint64_t offset; 162 TAILQ_ENTRY(spdk_fs_request) tailq; 163 } sync; 164 struct { 165 uint32_t num_clusters; 166 } resize; 167 struct { 168 const char *name; 169 uint32_t flags; 170 TAILQ_ENTRY(spdk_fs_request) tailq; 171 } open; 172 struct { 173 const char *name; 174 } create; 175 struct { 176 const char *name; 177 } delete; 178 struct { 179 const char *name; 180 } stat; 181 } op; 182 }; 183 184 static void cache_free_buffers(struct spdk_file *file); 185 186 static void 187 __initialize_cache(void) 188 { 189 if (g_cache_pool != NULL) { 190 return; 191 } 192 193 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 194 g_fs_cache_size / CACHE_BUFFER_SIZE, 195 CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY); 196 TAILQ_INIT(&g_caches); 197 pthread_spin_init(&g_caches_lock, 0); 198 } 199 200 static uint64_t 201 __file_get_blob_size(struct spdk_file *file) 202 { 203 uint64_t cluster_sz; 204 205 cluster_sz = file->fs->bs_opts.cluster_sz; 206 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 207 } 208 209 struct spdk_fs_request { 210 struct spdk_fs_cb_args args; 211 TAILQ_ENTRY(spdk_fs_request) link; 212 struct spdk_fs_channel *channel; 213 }; 214 215 struct spdk_fs_channel { 216 struct spdk_fs_request *req_mem; 217 TAILQ_HEAD(, spdk_fs_request) reqs; 218 sem_t sem; 219 struct spdk_filesystem *fs; 220 struct spdk_io_channel *bs_channel; 221 fs_send_request_fn send_request; 222 }; 223 224 static struct spdk_fs_request * 225 alloc_fs_request(struct spdk_fs_channel *channel) 226 { 227 struct spdk_fs_request *req; 228 229 req = TAILQ_FIRST(&channel->reqs); 230 if (!req) { 231 return NULL; 232 } 233 TAILQ_REMOVE(&channel->reqs, req, link); 234 memset(req, 0, sizeof(*req)); 235 req->channel = channel; 236 req->args.from_request = true; 237 238 return req; 239 } 240 241 static void 242 free_fs_request(struct spdk_fs_request *req) 243 { 244 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 245 } 246 247 static int 248 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 249 uint32_t max_ops) 250 { 251 uint32_t i; 252 253 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 254 if (!channel->req_mem) { 255 return -1; 256 } 257 258 TAILQ_INIT(&channel->reqs); 259 sem_init(&channel->sem, 0, 0); 260 261 for (i = 0; i < max_ops; i++) { 262 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 263 } 264 265 channel->fs = fs; 266 267 return 0; 268 } 269 270 static int 271 _spdk_fs_md_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 272 { 273 return _spdk_fs_channel_create(io_device, ctx_buf, 512); 274 } 275 276 static int 277 _spdk_fs_sync_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 278 { 279 struct spdk_filesystem *fs; 280 struct spdk_fs_channel *channel = ctx_buf; 281 282 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 283 284 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 285 } 286 287 static int 288 _spdk_fs_io_channel_create(void *io_device, uint32_t priority, void *ctx_buf, void *unique_ctx) 289 { 290 struct spdk_filesystem *fs; 291 struct spdk_fs_channel *channel = ctx_buf; 292 293 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 294 295 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 296 } 297 298 static void 299 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 300 { 301 struct spdk_fs_channel *channel = ctx_buf; 302 303 free(channel->req_mem); 304 if (channel->bs_channel != NULL) { 305 spdk_bs_free_io_channel(channel->bs_channel); 306 } 307 } 308 309 static void 310 __send_request_direct(fs_request_fn fn, void *arg) 311 { 312 fn(arg); 313 } 314 315 static void 316 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 317 { 318 fs->bs = bs; 319 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 320 fs->md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT); 321 fs->md_fs_channel->send_request = __send_request_direct; 322 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, 323 SPDK_IO_PRIORITY_DEFAULT); 324 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 325 } 326 327 static void 328 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 329 { 330 struct spdk_fs_request *req = ctx; 331 struct spdk_fs_cb_args *args = &req->args; 332 struct spdk_filesystem *fs = args->fs; 333 334 if (bserrno == 0) { 335 common_fs_bs_init(fs, bs); 336 } else { 337 free(fs); 338 fs = NULL; 339 } 340 341 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 342 free_fs_request(req); 343 } 344 345 static struct spdk_filesystem * 346 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 347 { 348 struct spdk_filesystem *fs; 349 350 fs = calloc(1, sizeof(*fs)); 351 if (fs == NULL) { 352 return NULL; 353 } 354 355 fs->bdev = dev; 356 fs->send_request = send_request_fn; 357 TAILQ_INIT(&fs->files); 358 spdk_io_device_register(fs, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 359 sizeof(struct spdk_fs_channel)); 360 361 fs->md_io_channel = spdk_get_io_channel(fs, SPDK_IO_PRIORITY_DEFAULT, true, NULL); 362 fs->md_fs_channel = spdk_io_channel_get_ctx(fs->md_io_channel); 363 364 fs->sync_target.max_ops = 512; 365 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 366 sizeof(struct spdk_fs_channel)); 367 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target, SPDK_IO_PRIORITY_DEFAULT, 368 false, NULL); 369 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 370 371 fs->io_target.max_ops = 512; 372 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 373 sizeof(struct spdk_fs_channel)); 374 375 __initialize_cache(); 376 377 return fs; 378 } 379 380 void 381 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 382 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 383 { 384 struct spdk_filesystem *fs; 385 struct spdk_fs_request *req; 386 struct spdk_fs_cb_args *args; 387 388 fs = fs_alloc(dev, send_request_fn); 389 if (fs == NULL) { 390 cb_fn(cb_arg, NULL, -ENOMEM); 391 return; 392 } 393 394 req = alloc_fs_request(fs->md_fs_channel); 395 if (req == NULL) { 396 cb_fn(cb_arg, NULL, -ENOMEM); 397 return; 398 } 399 400 args = &req->args; 401 args->fn.fs_op_with_handle = cb_fn; 402 args->arg = cb_arg; 403 args->fs = fs; 404 405 spdk_bs_init(dev, NULL, init_cb, req); 406 } 407 408 static struct spdk_file * 409 file_alloc(struct spdk_filesystem *fs) 410 { 411 struct spdk_file *file; 412 413 file = calloc(1, sizeof(*file)); 414 if (file == NULL) { 415 return NULL; 416 } 417 418 file->fs = fs; 419 TAILQ_INIT(&file->open_requests); 420 TAILQ_INIT(&file->sync_requests); 421 pthread_spin_init(&file->lock, 0); 422 file->tree = calloc(1, sizeof(*file->tree)); 423 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 424 file->priority = SPDK_FILE_PRIORITY_LOW; 425 return file; 426 } 427 428 static void 429 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 430 { 431 struct spdk_fs_request *req = ctx; 432 struct spdk_fs_cb_args *args = &req->args; 433 struct spdk_filesystem *fs = args->fs; 434 struct spdk_file *f; 435 uint64_t *length; 436 const char *name; 437 size_t value_len; 438 439 if (rc == -ENOENT) { 440 /* Finished iterating */ 441 args->fn.fs_op_with_handle(args->arg, fs, 0); 442 free_fs_request(req); 443 return; 444 } else if (rc < 0) { 445 args->fn.fs_op_with_handle(args->arg, fs, rc); 446 free_fs_request(req); 447 return; 448 } 449 450 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 451 if (rc < 0) { 452 args->fn.fs_op_with_handle(args->arg, fs, rc); 453 free_fs_request(req); 454 return; 455 } 456 457 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 458 if (rc < 0) { 459 args->fn.fs_op_with_handle(args->arg, fs, rc); 460 free_fs_request(req); 461 return; 462 } 463 assert(value_len == 8); 464 465 f = file_alloc(fs); 466 if (f == NULL) { 467 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 468 free_fs_request(req); 469 return; 470 } 471 472 f->name = strdup(name); 473 f->blobid = spdk_blob_get_id(blob); 474 f->length = *length; 475 f->length_flushed = *length; 476 f->append_pos = *length; 477 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 478 479 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 480 } 481 482 static void 483 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 484 { 485 struct spdk_fs_request *req = ctx; 486 struct spdk_fs_cb_args *args = &req->args; 487 struct spdk_filesystem *fs = args->fs; 488 489 if (bserrno != 0) { 490 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 491 free_fs_request(req); 492 free(fs); 493 return; 494 } 495 496 common_fs_bs_init(fs, bs); 497 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 498 } 499 500 void 501 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 502 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 503 { 504 struct spdk_filesystem *fs; 505 struct spdk_fs_cb_args *args; 506 struct spdk_fs_request *req; 507 508 fs = fs_alloc(dev, send_request_fn); 509 if (fs == NULL) { 510 cb_fn(cb_arg, NULL, -ENOMEM); 511 return; 512 } 513 514 req = alloc_fs_request(fs->md_fs_channel); 515 if (req == NULL) { 516 cb_fn(cb_arg, NULL, -ENOMEM); 517 return; 518 } 519 520 args = &req->args; 521 args->fn.fs_op_with_handle = cb_fn; 522 args->arg = cb_arg; 523 args->fs = fs; 524 525 spdk_bs_load(dev, load_cb, req); 526 } 527 528 static void 529 unload_cb(void *ctx, int bserrno) 530 { 531 struct spdk_fs_request *req = ctx; 532 struct spdk_fs_cb_args *args = &req->args; 533 struct spdk_filesystem *fs = args->fs; 534 535 args->fn.fs_op(args->arg, bserrno); 536 free(req); 537 spdk_io_device_unregister(&fs->io_target); 538 spdk_io_device_unregister(&fs->sync_target); 539 spdk_io_device_unregister(fs); 540 free(fs); 541 } 542 543 void 544 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 545 { 546 struct spdk_fs_request *req; 547 struct spdk_fs_cb_args *args; 548 549 /* 550 * We must free the md_channel before unloading the blobstore, so just 551 * allocate this request from the general heap. 552 */ 553 req = calloc(1, sizeof(*req)); 554 if (req == NULL) { 555 cb_fn(cb_arg, -ENOMEM); 556 return; 557 } 558 559 args = &req->args; 560 args->fn.fs_op = cb_fn; 561 args->arg = cb_arg; 562 args->fs = fs; 563 564 spdk_fs_free_io_channel(fs->md_io_channel); 565 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 566 spdk_bs_unload(fs->bs, unload_cb, req); 567 } 568 569 static struct spdk_file * 570 fs_find_file(struct spdk_filesystem *fs, const char *name) 571 { 572 struct spdk_file *file; 573 574 TAILQ_FOREACH(file, &fs->files, tailq) { 575 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 576 return file; 577 } 578 } 579 580 return NULL; 581 } 582 583 void 584 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 585 spdk_file_stat_op_complete cb_fn, void *cb_arg) 586 { 587 struct spdk_file_stat stat; 588 struct spdk_file *f = NULL; 589 590 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 591 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 592 return; 593 } 594 595 f = fs_find_file(fs, name); 596 if (f != NULL) { 597 stat.blobid = f->blobid; 598 stat.size = f->length; 599 cb_fn(cb_arg, &stat, 0); 600 return; 601 } 602 603 cb_fn(cb_arg, NULL, -ENOENT); 604 } 605 606 static void 607 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 608 { 609 struct spdk_fs_request *req = arg; 610 struct spdk_fs_cb_args *args = &req->args; 611 612 args->rc = fserrno; 613 if (fserrno == 0) { 614 memcpy(args->arg, stat, sizeof(*stat)); 615 } 616 sem_post(args->sem); 617 } 618 619 static void 620 __file_stat(void *arg) 621 { 622 struct spdk_fs_request *req = arg; 623 struct spdk_fs_cb_args *args = &req->args; 624 625 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 626 args->fn.stat_op, req); 627 } 628 629 int 630 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 631 const char *name, struct spdk_file_stat *stat) 632 { 633 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 634 struct spdk_fs_request *req; 635 int rc; 636 637 req = alloc_fs_request(channel); 638 assert(req != NULL); 639 640 req->args.fs = fs; 641 req->args.op.stat.name = name; 642 req->args.fn.stat_op = __copy_stat; 643 req->args.arg = stat; 644 req->args.sem = &channel->sem; 645 channel->send_request(__file_stat, req); 646 sem_wait(&channel->sem); 647 648 rc = req->args.rc; 649 free_fs_request(req); 650 651 return rc; 652 } 653 654 static void 655 fs_create_blob_close_cb(void *ctx, int bserrno) 656 { 657 struct spdk_fs_request *req = ctx; 658 struct spdk_fs_cb_args *args = &req->args; 659 660 args->fn.file_op(args->arg, bserrno); 661 free_fs_request(req); 662 } 663 664 static void 665 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 666 { 667 struct spdk_fs_request *req = ctx; 668 struct spdk_fs_cb_args *args = &req->args; 669 struct spdk_file *f = args->file; 670 uint64_t length = 0; 671 672 f->blob = blob; 673 spdk_bs_md_resize_blob(blob, 1); 674 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 675 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 676 677 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 678 } 679 680 static void 681 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 682 { 683 struct spdk_fs_request *req = ctx; 684 struct spdk_fs_cb_args *args = &req->args; 685 struct spdk_file *f = args->file; 686 687 f->blobid = blobid; 688 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 689 } 690 691 void 692 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 693 spdk_file_op_complete cb_fn, void *cb_arg) 694 { 695 struct spdk_file *file; 696 struct spdk_fs_request *req; 697 struct spdk_fs_cb_args *args; 698 699 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 700 cb_fn(cb_arg, -ENAMETOOLONG); 701 return; 702 } 703 704 file = fs_find_file(fs, name); 705 if (file != NULL) { 706 cb_fn(cb_arg, -EEXIST); 707 return; 708 } 709 710 file = file_alloc(fs); 711 if (file == NULL) { 712 cb_fn(cb_arg, -ENOMEM); 713 return; 714 } 715 716 req = alloc_fs_request(fs->md_fs_channel); 717 if (req == NULL) { 718 cb_fn(cb_arg, -ENOMEM); 719 return; 720 } 721 722 args = &req->args; 723 args->file = file; 724 args->fn.file_op = cb_fn; 725 args->arg = cb_arg; 726 727 file->name = strdup(name); 728 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 729 } 730 731 static void 732 __fs_create_file_done(void *arg, int fserrno) 733 { 734 struct spdk_fs_request *req = arg; 735 struct spdk_fs_cb_args *args = &req->args; 736 737 args->rc = fserrno; 738 sem_post(args->sem); 739 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 740 } 741 742 static void 743 __fs_create_file(void *arg) 744 { 745 struct spdk_fs_request *req = arg; 746 struct spdk_fs_cb_args *args = &req->args; 747 748 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 749 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 750 } 751 752 int 753 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 754 { 755 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 756 struct spdk_fs_request *req; 757 struct spdk_fs_cb_args *args; 758 int rc; 759 760 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 761 762 req = alloc_fs_request(channel); 763 assert(req != NULL); 764 765 args = &req->args; 766 args->fs = fs; 767 args->op.create.name = name; 768 args->sem = &channel->sem; 769 fs->send_request(__fs_create_file, req); 770 sem_wait(&channel->sem); 771 rc = args->rc; 772 free_fs_request(req); 773 774 return rc; 775 } 776 777 static void 778 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 779 { 780 struct spdk_fs_request *req = ctx; 781 struct spdk_fs_cb_args *args = &req->args; 782 struct spdk_file *f = args->file; 783 784 f->blob = blob; 785 while (!TAILQ_EMPTY(&f->open_requests)) { 786 req = TAILQ_FIRST(&f->open_requests); 787 args = &req->args; 788 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 789 args->fn.file_op_with_handle(args->arg, f, bserrno); 790 free_fs_request(req); 791 } 792 } 793 794 static void 795 fs_open_blob_create_cb(void *ctx, int bserrno) 796 { 797 struct spdk_fs_request *req = ctx; 798 struct spdk_fs_cb_args *args = &req->args; 799 struct spdk_file *file = args->file; 800 struct spdk_filesystem *fs = args->fs; 801 802 if (file == NULL) { 803 /* 804 * This is from an open with CREATE flag - the file 805 * is now created so look it up in the file list for this 806 * filesystem. 807 */ 808 file = fs_find_file(fs, args->op.open.name); 809 assert(file != NULL); 810 args->file = file; 811 } 812 813 file->ref_count++; 814 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 815 if (file->ref_count == 1) { 816 assert(file->blob == NULL); 817 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 818 } else if (file->blob != NULL) { 819 fs_open_blob_done(req, file->blob, 0); 820 } else { 821 /* 822 * The blob open for this file is in progress due to a previous 823 * open request. When that open completes, it will invoke the 824 * open callback for this request. 825 */ 826 } 827 } 828 829 void 830 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 831 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 832 { 833 struct spdk_file *f = NULL; 834 struct spdk_fs_request *req; 835 struct spdk_fs_cb_args *args; 836 837 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 838 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 839 return; 840 } 841 842 f = fs_find_file(fs, name); 843 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 844 cb_fn(cb_arg, NULL, -ENOENT); 845 return; 846 } 847 848 req = alloc_fs_request(fs->md_fs_channel); 849 if (req == NULL) { 850 cb_fn(cb_arg, NULL, -ENOMEM); 851 return; 852 } 853 854 args = &req->args; 855 args->fn.file_op_with_handle = cb_fn; 856 args->arg = cb_arg; 857 args->file = f; 858 args->fs = fs; 859 args->op.open.name = name; 860 861 if (f == NULL) { 862 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 863 } else { 864 fs_open_blob_create_cb(req, 0); 865 } 866 } 867 868 static void 869 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 870 { 871 struct spdk_fs_request *req = arg; 872 struct spdk_fs_cb_args *args = &req->args; 873 874 args->file = file; 875 args->rc = bserrno; 876 sem_post(args->sem); 877 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 878 } 879 880 static void 881 __fs_open_file(void *arg) 882 { 883 struct spdk_fs_request *req = arg; 884 struct spdk_fs_cb_args *args = &req->args; 885 886 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 887 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 888 __fs_open_file_done, req); 889 } 890 891 int 892 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 893 const char *name, uint32_t flags, struct spdk_file **file) 894 { 895 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 896 struct spdk_fs_request *req; 897 struct spdk_fs_cb_args *args; 898 int rc; 899 900 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 901 902 req = alloc_fs_request(channel); 903 assert(req != NULL); 904 905 args = &req->args; 906 args->fs = fs; 907 args->op.open.name = name; 908 args->op.open.flags = flags; 909 args->sem = &channel->sem; 910 fs->send_request(__fs_open_file, req); 911 sem_wait(&channel->sem); 912 rc = args->rc; 913 if (rc == 0) { 914 *file = args->file; 915 } else { 916 *file = NULL; 917 } 918 free_fs_request(req); 919 920 return rc; 921 } 922 923 static void 924 fs_rename_blob_close_cb(void *ctx, int bserrno) 925 { 926 struct spdk_fs_request *req = ctx; 927 struct spdk_fs_cb_args *args = &req->args; 928 929 args->fn.fs_op(args->arg, bserrno); 930 free_fs_request(req); 931 } 932 933 static void 934 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 935 { 936 struct spdk_fs_request *req = ctx; 937 struct spdk_fs_cb_args *args = &req->args; 938 struct spdk_file *f = args->file; 939 const char *new_name = args->op.rename.new_name; 940 941 f->blob = blob; 942 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 943 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 944 } 945 946 static void 947 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 948 { 949 struct spdk_fs_cb_args *args = &req->args; 950 struct spdk_file *f; 951 952 f = fs_find_file(args->fs, args->op.rename.old_name); 953 if (f == NULL) { 954 args->fn.fs_op(args->arg, -ENOENT); 955 free_fs_request(req); 956 return; 957 } 958 959 free(f->name); 960 f->name = strdup(args->op.rename.new_name); 961 args->file = f; 962 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 963 } 964 965 static void 966 fs_rename_delete_done(void *arg, int fserrno) 967 { 968 __spdk_fs_md_rename_file(arg); 969 } 970 971 void 972 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 973 const char *old_name, const char *new_name, 974 spdk_file_op_complete cb_fn, void *cb_arg) 975 { 976 struct spdk_file *f; 977 struct spdk_fs_request *req; 978 struct spdk_fs_cb_args *args; 979 980 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 981 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 982 cb_fn(cb_arg, -ENAMETOOLONG); 983 return; 984 } 985 986 req = alloc_fs_request(fs->md_fs_channel); 987 if (req == NULL) { 988 cb_fn(cb_arg, -ENOMEM); 989 return; 990 } 991 992 args = &req->args; 993 args->fn.fs_op = cb_fn; 994 args->fs = fs; 995 args->arg = cb_arg; 996 args->op.rename.old_name = old_name; 997 args->op.rename.new_name = new_name; 998 999 f = fs_find_file(fs, new_name); 1000 if (f == NULL) { 1001 __spdk_fs_md_rename_file(req); 1002 return; 1003 } 1004 1005 /* 1006 * The rename overwrites an existing file. So delete the existing file, then 1007 * do the actual rename. 1008 */ 1009 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1010 } 1011 1012 static void 1013 __fs_rename_file_done(void *arg, int fserrno) 1014 { 1015 struct spdk_fs_request *req = arg; 1016 struct spdk_fs_cb_args *args = &req->args; 1017 1018 args->rc = fserrno; 1019 sem_post(args->sem); 1020 } 1021 1022 static void 1023 __fs_rename_file(void *arg) 1024 { 1025 struct spdk_fs_request *req = arg; 1026 struct spdk_fs_cb_args *args = &req->args; 1027 1028 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1029 __fs_rename_file_done, req); 1030 } 1031 1032 int 1033 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1034 const char *old_name, const char *new_name) 1035 { 1036 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1037 struct spdk_fs_request *req; 1038 struct spdk_fs_cb_args *args; 1039 int rc; 1040 1041 req = alloc_fs_request(channel); 1042 assert(req != NULL); 1043 1044 args = &req->args; 1045 1046 args->fs = fs; 1047 args->op.rename.old_name = old_name; 1048 args->op.rename.new_name = new_name; 1049 args->sem = &channel->sem; 1050 fs->send_request(__fs_rename_file, req); 1051 sem_wait(&channel->sem); 1052 rc = args->rc; 1053 free_fs_request(req); 1054 return rc; 1055 } 1056 1057 static void 1058 blob_delete_cb(void *ctx, int bserrno) 1059 { 1060 struct spdk_fs_request *req = ctx; 1061 struct spdk_fs_cb_args *args = &req->args; 1062 1063 args->fn.file_op(args->arg, bserrno); 1064 free_fs_request(req); 1065 } 1066 1067 void 1068 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1069 spdk_file_op_complete cb_fn, void *cb_arg) 1070 { 1071 struct spdk_file *f; 1072 spdk_blob_id blobid; 1073 struct spdk_fs_request *req; 1074 struct spdk_fs_cb_args *args; 1075 1076 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1077 1078 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1079 cb_fn(cb_arg, -ENAMETOOLONG); 1080 return; 1081 } 1082 1083 f = fs_find_file(fs, name); 1084 if (f == NULL) { 1085 cb_fn(cb_arg, -ENOENT); 1086 return; 1087 } 1088 1089 if (f->ref_count > 0) { 1090 /* For now, do not allow deleting files with open references. */ 1091 cb_fn(cb_arg, -EBUSY); 1092 return; 1093 } 1094 1095 req = alloc_fs_request(fs->md_fs_channel); 1096 if (req == NULL) { 1097 cb_fn(cb_arg, -ENOMEM); 1098 return; 1099 } 1100 1101 TAILQ_REMOVE(&fs->files, f, tailq); 1102 1103 cache_free_buffers(f); 1104 1105 blobid = f->blobid; 1106 1107 free(f->name); 1108 free(f->tree); 1109 free(f); 1110 1111 args = &req->args; 1112 args->fn.file_op = cb_fn; 1113 args->arg = cb_arg; 1114 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1115 } 1116 1117 static void 1118 __fs_delete_file_done(void *arg, int fserrno) 1119 { 1120 struct spdk_fs_request *req = arg; 1121 struct spdk_fs_cb_args *args = &req->args; 1122 1123 args->rc = fserrno; 1124 sem_post(args->sem); 1125 } 1126 1127 static void 1128 __fs_delete_file(void *arg) 1129 { 1130 struct spdk_fs_request *req = arg; 1131 struct spdk_fs_cb_args *args = &req->args; 1132 1133 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1134 } 1135 1136 int 1137 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1138 const char *name) 1139 { 1140 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1141 struct spdk_fs_request *req; 1142 struct spdk_fs_cb_args *args; 1143 int rc; 1144 1145 req = alloc_fs_request(channel); 1146 assert(req != NULL); 1147 1148 args = &req->args; 1149 args->fs = fs; 1150 args->op.delete.name = name; 1151 args->sem = &channel->sem; 1152 fs->send_request(__fs_delete_file, req); 1153 sem_wait(&channel->sem); 1154 rc = args->rc; 1155 free_fs_request(req); 1156 1157 return rc; 1158 } 1159 1160 spdk_fs_iter 1161 spdk_fs_iter_first(struct spdk_filesystem *fs) 1162 { 1163 struct spdk_file *f; 1164 1165 f = TAILQ_FIRST(&fs->files); 1166 return f; 1167 } 1168 1169 spdk_fs_iter 1170 spdk_fs_iter_next(spdk_fs_iter iter) 1171 { 1172 struct spdk_file *f = iter; 1173 1174 if (f == NULL) { 1175 return NULL; 1176 } 1177 1178 f = TAILQ_NEXT(f, tailq); 1179 return f; 1180 } 1181 1182 const char * 1183 spdk_file_get_name(struct spdk_file *file) 1184 { 1185 return file->name; 1186 } 1187 1188 uint64_t 1189 spdk_file_get_length(struct spdk_file *file) 1190 { 1191 assert(file != NULL); 1192 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1193 return file->length; 1194 } 1195 1196 static void 1197 fs_truncate_complete_cb(void *ctx, int bserrno) 1198 { 1199 struct spdk_fs_request *req = ctx; 1200 struct spdk_fs_cb_args *args = &req->args; 1201 1202 args->fn.file_op(args->arg, bserrno); 1203 free_fs_request(req); 1204 } 1205 1206 static uint64_t 1207 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1208 { 1209 return (length + cluster_sz - 1) / cluster_sz; 1210 } 1211 1212 void 1213 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1214 spdk_file_op_complete cb_fn, void *cb_arg) 1215 { 1216 struct spdk_filesystem *fs; 1217 size_t num_clusters; 1218 struct spdk_fs_request *req; 1219 struct spdk_fs_cb_args *args; 1220 1221 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1222 if (length == file->length) { 1223 cb_fn(cb_arg, 0); 1224 return; 1225 } 1226 1227 req = alloc_fs_request(file->fs->md_fs_channel); 1228 if (req == NULL) { 1229 cb_fn(cb_arg, -ENOMEM); 1230 return; 1231 } 1232 1233 args = &req->args; 1234 args->fn.file_op = cb_fn; 1235 args->arg = cb_arg; 1236 args->file = file; 1237 fs = file->fs; 1238 1239 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1240 1241 spdk_bs_md_resize_blob(file->blob, num_clusters); 1242 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1243 1244 file->length = length; 1245 if (file->append_pos > file->length) { 1246 file->append_pos = file->length; 1247 } 1248 1249 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1250 } 1251 1252 static void 1253 __truncate(void *arg) 1254 { 1255 struct spdk_fs_request *req = arg; 1256 struct spdk_fs_cb_args *args = &req->args; 1257 1258 spdk_file_truncate_async(args->file, args->op.truncate.length, 1259 args->fn.file_op, args->arg); 1260 } 1261 1262 void 1263 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1264 uint64_t length) 1265 { 1266 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1267 struct spdk_fs_request *req; 1268 struct spdk_fs_cb_args *args; 1269 1270 req = alloc_fs_request(channel); 1271 assert(req != NULL); 1272 1273 args = &req->args; 1274 1275 args->file = file; 1276 args->op.truncate.length = length; 1277 args->fn.file_op = __sem_post; 1278 args->arg = &channel->sem; 1279 1280 channel->send_request(__truncate, req); 1281 sem_wait(&channel->sem); 1282 free_fs_request(req); 1283 } 1284 1285 static void 1286 __rw_done(void *ctx, int bserrno) 1287 { 1288 struct spdk_fs_request *req = ctx; 1289 struct spdk_fs_cb_args *args = &req->args; 1290 1291 spdk_free(args->op.rw.pin_buf); 1292 args->fn.file_op(args->arg, bserrno); 1293 free_fs_request(req); 1294 } 1295 1296 static void 1297 __read_done(void *ctx, int bserrno) 1298 { 1299 struct spdk_fs_request *req = ctx; 1300 struct spdk_fs_cb_args *args = &req->args; 1301 1302 if (args->op.rw.is_read) { 1303 memcpy(args->op.rw.user_buf, 1304 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1305 args->op.rw.length); 1306 __rw_done(req, 0); 1307 } else { 1308 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1309 args->op.rw.user_buf, 1310 args->op.rw.length); 1311 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1312 args->op.rw.pin_buf, 1313 args->op.rw.start_page, args->op.rw.num_pages, 1314 __rw_done, req); 1315 } 1316 } 1317 1318 static void 1319 __do_blob_read(void *ctx, int fserrno) 1320 { 1321 struct spdk_fs_request *req = ctx; 1322 struct spdk_fs_cb_args *args = &req->args; 1323 1324 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1325 args->op.rw.pin_buf, 1326 args->op.rw.start_page, args->op.rw.num_pages, 1327 __read_done, req); 1328 } 1329 1330 static void 1331 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1332 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1333 { 1334 uint64_t end_page; 1335 1336 *page_size = spdk_bs_get_page_size(file->fs->bs); 1337 *start_page = offset / *page_size; 1338 end_page = (offset + length - 1) / *page_size; 1339 *num_pages = (end_page - *start_page + 1); 1340 } 1341 1342 static void 1343 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1344 void *payload, uint64_t offset, uint64_t length, 1345 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1346 { 1347 struct spdk_fs_request *req; 1348 struct spdk_fs_cb_args *args; 1349 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1350 uint64_t start_page, num_pages, pin_buf_length; 1351 uint32_t page_size; 1352 1353 if (is_read && offset + length > file->length) { 1354 cb_fn(cb_arg, -EINVAL); 1355 return; 1356 } 1357 1358 req = alloc_fs_request(channel); 1359 if (req == NULL) { 1360 cb_fn(cb_arg, -ENOMEM); 1361 return; 1362 } 1363 1364 args = &req->args; 1365 args->fn.file_op = cb_fn; 1366 args->arg = cb_arg; 1367 args->file = file; 1368 args->op.rw.channel = channel->bs_channel; 1369 args->op.rw.user_buf = payload; 1370 args->op.rw.is_read = is_read; 1371 args->op.rw.offset = offset; 1372 args->op.rw.length = length; 1373 1374 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1375 pin_buf_length = num_pages * page_size; 1376 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, 4096, NULL); 1377 1378 args->op.rw.start_page = start_page; 1379 args->op.rw.num_pages = num_pages; 1380 1381 if (!is_read && file->length < offset + length) { 1382 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1383 } else { 1384 __do_blob_read(req, 0); 1385 } 1386 } 1387 1388 void 1389 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1390 void *payload, uint64_t offset, uint64_t length, 1391 spdk_file_op_complete cb_fn, void *cb_arg) 1392 { 1393 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1394 } 1395 1396 void 1397 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1398 void *payload, uint64_t offset, uint64_t length, 1399 spdk_file_op_complete cb_fn, void *cb_arg) 1400 { 1401 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1402 file->name, offset, length); 1403 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1404 } 1405 1406 struct spdk_io_channel * 1407 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs, uint32_t priority) 1408 { 1409 struct spdk_io_channel *io_channel; 1410 struct spdk_fs_channel *fs_channel; 1411 1412 io_channel = spdk_get_io_channel(&fs->io_target, priority, false, NULL); 1413 fs_channel = spdk_io_channel_get_ctx(io_channel); 1414 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT); 1415 fs_channel->send_request = __send_request_direct; 1416 1417 return io_channel; 1418 } 1419 1420 struct spdk_io_channel * 1421 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs, uint32_t priority) 1422 { 1423 struct spdk_io_channel *io_channel; 1424 struct spdk_fs_channel *fs_channel; 1425 1426 io_channel = spdk_get_io_channel(&fs->io_target, priority, false, NULL); 1427 fs_channel = spdk_io_channel_get_ctx(io_channel); 1428 fs_channel->send_request = fs->send_request; 1429 1430 return io_channel; 1431 } 1432 1433 void 1434 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1435 { 1436 spdk_put_io_channel(channel); 1437 } 1438 1439 void 1440 spdk_fs_set_cache_size(uint64_t size_in_mb) 1441 { 1442 g_fs_cache_size = size_in_mb * 1024 * 1024; 1443 } 1444 1445 uint64_t 1446 spdk_fs_get_cache_size(void) 1447 { 1448 return g_fs_cache_size / (1024 * 1024); 1449 } 1450 1451 static void __file_flush(void *_args); 1452 1453 static void * 1454 alloc_cache_memory_buffer(struct spdk_file *context) 1455 { 1456 struct spdk_file *file; 1457 void *buf; 1458 1459 buf = spdk_mempool_get(g_cache_pool); 1460 if (buf != NULL) { 1461 return buf; 1462 } 1463 1464 pthread_spin_lock(&g_caches_lock); 1465 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1466 if (!file->open_for_writing && 1467 file->priority == SPDK_FILE_PRIORITY_LOW && 1468 file != context) { 1469 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1470 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1471 break; 1472 } 1473 } 1474 pthread_spin_unlock(&g_caches_lock); 1475 if (file != NULL) { 1476 cache_free_buffers(file); 1477 buf = spdk_mempool_get(g_cache_pool); 1478 if (buf != NULL) { 1479 return buf; 1480 } 1481 } 1482 1483 pthread_spin_lock(&g_caches_lock); 1484 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1485 if (!file->open_for_writing && file != context) { 1486 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1487 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1488 break; 1489 } 1490 } 1491 pthread_spin_unlock(&g_caches_lock); 1492 if (file != NULL) { 1493 cache_free_buffers(file); 1494 buf = spdk_mempool_get(g_cache_pool); 1495 if (buf != NULL) { 1496 return buf; 1497 } 1498 } 1499 1500 pthread_spin_lock(&g_caches_lock); 1501 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1502 if (file != context) { 1503 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1504 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1505 break; 1506 } 1507 } 1508 pthread_spin_unlock(&g_caches_lock); 1509 if (file != NULL) { 1510 cache_free_buffers(file); 1511 buf = spdk_mempool_get(g_cache_pool); 1512 if (buf != NULL) { 1513 return buf; 1514 } 1515 } 1516 1517 assert(false); 1518 return NULL; 1519 } 1520 1521 static struct cache_buffer * 1522 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1523 { 1524 struct cache_buffer *buf; 1525 int count = 0; 1526 1527 buf = calloc(1, sizeof(*buf)); 1528 if (buf == NULL) { 1529 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1530 return NULL; 1531 } 1532 1533 buf->buf = alloc_cache_memory_buffer(file); 1534 if (buf->buf == NULL) { 1535 while (buf->buf == NULL) { 1536 /* 1537 * TODO: alloc_cache_memory_buffer() should eventually free 1538 * some buffers. Need a more sophisticated check here, instead 1539 * of just bailing if 100 tries does not result in getting a 1540 * free buffer. This will involve using the sync channel's 1541 * semaphore to block until a buffer becomes available. 1542 */ 1543 if (count++ == 100) { 1544 SPDK_ERRLOG("could not allocate cache buffer\n"); 1545 assert(false); 1546 free(buf); 1547 return NULL; 1548 } 1549 buf->buf = alloc_cache_memory_buffer(file); 1550 } 1551 } 1552 1553 buf->buf_size = CACHE_BUFFER_SIZE; 1554 buf->offset = offset; 1555 1556 pthread_spin_lock(&g_caches_lock); 1557 if (file->tree->present_mask == 0) { 1558 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1559 } 1560 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1561 pthread_spin_unlock(&g_caches_lock); 1562 1563 return buf; 1564 } 1565 1566 static struct cache_buffer * 1567 cache_append_buffer(struct spdk_file *file) 1568 { 1569 struct cache_buffer *last; 1570 1571 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1572 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1573 1574 last = cache_insert_buffer(file, file->append_pos); 1575 if (last == NULL) { 1576 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1577 return NULL; 1578 } 1579 1580 if (file->last != NULL) { 1581 file->last->next = last; 1582 } 1583 file->last = last; 1584 1585 return last; 1586 } 1587 1588 static void 1589 __wake_caller(struct spdk_fs_cb_args *args) 1590 { 1591 sem_post(args->sem); 1592 } 1593 1594 static void 1595 __file_cache_finish_sync(struct spdk_file *file) 1596 { 1597 struct spdk_fs_request *sync_req; 1598 struct spdk_fs_cb_args *sync_args; 1599 1600 pthread_spin_lock(&file->lock); 1601 while (!TAILQ_EMPTY(&file->sync_requests)) { 1602 sync_req = TAILQ_FIRST(&file->sync_requests); 1603 sync_args = &sync_req->args; 1604 if (sync_args->op.sync.offset > file->length_flushed) { 1605 break; 1606 } 1607 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1608 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1609 pthread_spin_unlock(&file->lock); 1610 sync_args->fn.file_op(sync_args->arg, 0); 1611 pthread_spin_lock(&file->lock); 1612 free_fs_request(sync_req); 1613 } 1614 pthread_spin_unlock(&file->lock); 1615 } 1616 1617 static void 1618 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1619 { 1620 struct spdk_file *file = ctx; 1621 1622 __file_cache_finish_sync(file); 1623 } 1624 1625 static void 1626 __free_args(struct spdk_fs_cb_args *args) 1627 { 1628 struct spdk_fs_request *req; 1629 1630 if (!args->from_request) { 1631 free(args); 1632 } else { 1633 /* Depends on args being at the start of the spdk_fs_request structure. */ 1634 req = (struct spdk_fs_request *)args; 1635 free_fs_request(req); 1636 } 1637 } 1638 1639 static void 1640 __file_flush_done(void *arg, int bserrno) 1641 { 1642 struct spdk_fs_cb_args *args = arg; 1643 struct spdk_fs_request *sync_req; 1644 struct spdk_file *file = args->file; 1645 struct cache_buffer *next = args->op.flush.cache_buffer; 1646 1647 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1648 1649 pthread_spin_lock(&file->lock); 1650 next->in_progress = false; 1651 next->bytes_flushed += args->op.flush.length; 1652 file->length_flushed += args->op.flush.length; 1653 if (file->length_flushed > file->length) { 1654 file->length = file->length_flushed; 1655 } 1656 if (next->bytes_flushed == next->buf_size) { 1657 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1658 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1659 } 1660 1661 TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) { 1662 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1663 break; 1664 } 1665 } 1666 1667 /* 1668 * Assert that there is no cached data that extends past the end of the underlying 1669 * blob. 1670 */ 1671 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1672 next->bytes_filled == 0); 1673 1674 if (sync_req != NULL) { 1675 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1676 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1677 sizeof(file->length_flushed)); 1678 1679 pthread_spin_unlock(&file->lock); 1680 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1681 } else { 1682 pthread_spin_unlock(&file->lock); 1683 __file_cache_finish_sync(file); 1684 } 1685 1686 __file_flush(args); 1687 } 1688 1689 static void 1690 __file_flush(void *_args) 1691 { 1692 struct spdk_fs_cb_args *args = _args; 1693 struct spdk_file *file = args->file; 1694 struct cache_buffer *next; 1695 uint64_t offset, length, start_page, num_pages; 1696 uint32_t page_size; 1697 1698 pthread_spin_lock(&file->lock); 1699 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1700 if (next == NULL || next->in_progress) { 1701 /* 1702 * There is either no data to flush, or a flush I/O is already in 1703 * progress. So return immediately - if a flush I/O is in 1704 * progress we will flush more data after that is completed. 1705 */ 1706 __free_args(args); 1707 pthread_spin_unlock(&file->lock); 1708 return; 1709 } 1710 1711 offset = next->offset + next->bytes_flushed; 1712 length = next->bytes_filled - next->bytes_flushed; 1713 if (length == 0) { 1714 __free_args(args); 1715 pthread_spin_unlock(&file->lock); 1716 return; 1717 } 1718 args->op.flush.length = length; 1719 args->op.flush.cache_buffer = next; 1720 1721 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1722 1723 next->in_progress = true; 1724 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1725 offset, length, start_page, num_pages); 1726 pthread_spin_unlock(&file->lock); 1727 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1728 next->buf + (start_page * page_size) - next->offset, 1729 start_page, num_pages, 1730 __file_flush_done, args); 1731 } 1732 1733 static void 1734 __file_extend_done(void *arg, int bserrno) 1735 { 1736 struct spdk_fs_cb_args *args = arg; 1737 1738 __wake_caller(args); 1739 } 1740 1741 static void 1742 __file_extend_blob(void *_args) 1743 { 1744 struct spdk_fs_cb_args *args = _args; 1745 struct spdk_file *file = args->file; 1746 1747 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1748 1749 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1750 } 1751 1752 static void 1753 __rw_from_file_done(void *arg, int bserrno) 1754 { 1755 struct spdk_fs_cb_args *args = arg; 1756 1757 __wake_caller(args); 1758 __free_args(args); 1759 } 1760 1761 static void 1762 __rw_from_file(void *_args) 1763 { 1764 struct spdk_fs_cb_args *args = _args; 1765 struct spdk_file *file = args->file; 1766 1767 if (args->op.rw.is_read) { 1768 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1769 args->op.rw.offset, args->op.rw.length, 1770 __rw_from_file_done, args); 1771 } else { 1772 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1773 args->op.rw.offset, args->op.rw.length, 1774 __rw_from_file_done, args); 1775 } 1776 } 1777 1778 static int 1779 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1780 uint64_t offset, uint64_t length, bool is_read) 1781 { 1782 struct spdk_fs_cb_args *args; 1783 1784 args = calloc(1, sizeof(*args)); 1785 if (args == NULL) { 1786 sem_post(sem); 1787 return -ENOMEM; 1788 } 1789 1790 args->file = file; 1791 args->sem = sem; 1792 args->op.rw.user_buf = payload; 1793 args->op.rw.offset = offset; 1794 args->op.rw.length = length; 1795 args->op.rw.is_read = is_read; 1796 file->fs->send_request(__rw_from_file, args); 1797 return 0; 1798 } 1799 1800 int 1801 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1802 void *payload, uint64_t offset, uint64_t length) 1803 { 1804 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1805 struct spdk_fs_cb_args *args; 1806 uint64_t rem_length, copy, blob_size, cluster_sz; 1807 uint32_t cache_buffers_filled = 0; 1808 uint8_t *cur_payload; 1809 struct cache_buffer *last; 1810 1811 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1812 1813 if (length == 0) { 1814 return 0; 1815 } 1816 1817 if (offset != file->append_pos) { 1818 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1819 return -EINVAL; 1820 } 1821 1822 pthread_spin_lock(&file->lock); 1823 file->open_for_writing = true; 1824 1825 if (file->last == NULL) { 1826 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1827 cache_append_buffer(file); 1828 } else { 1829 int rc; 1830 1831 file->append_pos += length; 1832 rc = __send_rw_from_file(file, &channel->sem, payload, 1833 offset, length, false); 1834 pthread_spin_unlock(&file->lock); 1835 sem_wait(&channel->sem); 1836 return rc; 1837 } 1838 } 1839 1840 blob_size = __file_get_blob_size(file); 1841 1842 if ((offset + length) > blob_size) { 1843 struct spdk_fs_cb_args extend_args = {}; 1844 1845 cluster_sz = file->fs->bs_opts.cluster_sz; 1846 extend_args.sem = &channel->sem; 1847 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1848 extend_args.file = file; 1849 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1850 pthread_spin_unlock(&file->lock); 1851 file->fs->send_request(__file_extend_blob, &extend_args); 1852 sem_wait(&channel->sem); 1853 } 1854 1855 last = file->last; 1856 rem_length = length; 1857 cur_payload = payload; 1858 while (rem_length > 0) { 1859 copy = last->buf_size - last->bytes_filled; 1860 if (copy > rem_length) { 1861 copy = rem_length; 1862 } 1863 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 1864 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 1865 file->append_pos += copy; 1866 if (file->length < file->append_pos) { 1867 file->length = file->append_pos; 1868 } 1869 cur_payload += copy; 1870 last->bytes_filled += copy; 1871 rem_length -= copy; 1872 if (last->bytes_filled == last->buf_size) { 1873 cache_buffers_filled++; 1874 last = cache_append_buffer(file); 1875 if (last == NULL) { 1876 BLOBFS_TRACE(file, "nomem\n"); 1877 pthread_spin_unlock(&file->lock); 1878 return -ENOMEM; 1879 } 1880 } 1881 } 1882 1883 if (cache_buffers_filled == 0) { 1884 pthread_spin_unlock(&file->lock); 1885 return 0; 1886 } 1887 1888 args = calloc(1, sizeof(*args)); 1889 if (args == NULL) { 1890 pthread_spin_unlock(&file->lock); 1891 return -ENOMEM; 1892 } 1893 1894 args->file = file; 1895 file->fs->send_request(__file_flush, args); 1896 pthread_spin_unlock(&file->lock); 1897 return 0; 1898 } 1899 1900 static void 1901 __readahead_done(void *arg, int bserrno) 1902 { 1903 struct spdk_fs_cb_args *args = arg; 1904 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 1905 struct spdk_file *file = args->file; 1906 1907 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 1908 1909 pthread_spin_lock(&file->lock); 1910 cache_buffer->bytes_filled = args->op.readahead.length; 1911 cache_buffer->bytes_flushed = args->op.readahead.length; 1912 cache_buffer->in_progress = false; 1913 pthread_spin_unlock(&file->lock); 1914 1915 __free_args(args); 1916 } 1917 1918 static void 1919 __readahead(void *_args) 1920 { 1921 struct spdk_fs_cb_args *args = _args; 1922 struct spdk_file *file = args->file; 1923 uint64_t offset, length, start_page, num_pages; 1924 uint32_t page_size; 1925 1926 offset = args->op.readahead.offset; 1927 length = args->op.readahead.length; 1928 assert(length > 0); 1929 1930 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1931 1932 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1933 offset, length, start_page, num_pages); 1934 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1935 args->op.readahead.cache_buffer->buf, 1936 start_page, num_pages, 1937 __readahead_done, args); 1938 } 1939 1940 static uint64_t 1941 __next_cache_buffer_offset(uint64_t offset) 1942 { 1943 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 1944 } 1945 1946 static void 1947 check_readahead(struct spdk_file *file, uint64_t offset) 1948 { 1949 struct spdk_fs_cb_args *args; 1950 1951 offset = __next_cache_buffer_offset(offset); 1952 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 1953 return; 1954 } 1955 1956 args = calloc(1, sizeof(*args)); 1957 if (args == NULL) { 1958 return; 1959 } 1960 1961 BLOBFS_TRACE(file, "offset=%jx\n", offset); 1962 1963 args->file = file; 1964 args->op.readahead.offset = offset; 1965 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 1966 args->op.readahead.cache_buffer->in_progress = true; 1967 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 1968 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 1969 } else { 1970 args->op.readahead.length = CACHE_BUFFER_SIZE; 1971 } 1972 file->fs->send_request(__readahead, args); 1973 } 1974 1975 static int 1976 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 1977 { 1978 struct cache_buffer *buf; 1979 1980 buf = spdk_tree_find_filled_buffer(file->tree, offset); 1981 if (buf == NULL) { 1982 return __send_rw_from_file(file, sem, payload, offset, length, true); 1983 } 1984 1985 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 1986 length = buf->offset + buf->bytes_filled - offset; 1987 } 1988 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 1989 memcpy(payload, &buf->buf[offset - buf->offset], length); 1990 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 1991 pthread_spin_lock(&g_caches_lock); 1992 spdk_tree_remove_buffer(file->tree, buf); 1993 if (file->tree->present_mask == 0) { 1994 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1995 } 1996 pthread_spin_unlock(&g_caches_lock); 1997 } 1998 1999 sem_post(sem); 2000 return 0; 2001 } 2002 2003 int64_t 2004 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2005 void *payload, uint64_t offset, uint64_t length) 2006 { 2007 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2008 uint64_t final_offset, final_length; 2009 uint32_t sub_reads = 0; 2010 int rc = 0; 2011 2012 pthread_spin_lock(&file->lock); 2013 2014 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2015 2016 file->open_for_writing = false; 2017 2018 if (length == 0 || offset >= file->length) { 2019 pthread_spin_unlock(&file->lock); 2020 return 0; 2021 } 2022 2023 if (offset + length > file->length) { 2024 length = file->length - offset; 2025 } 2026 2027 if (offset != file->next_seq_offset) { 2028 file->seq_byte_count = 0; 2029 } 2030 file->seq_byte_count += length; 2031 file->next_seq_offset = offset + length; 2032 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2033 check_readahead(file, offset); 2034 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2035 } 2036 2037 final_length = 0; 2038 final_offset = offset + length; 2039 while (offset < final_offset) { 2040 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2041 if (length > (final_offset - offset)) { 2042 length = final_offset - offset; 2043 } 2044 rc = __file_read(file, payload, offset, length, &channel->sem); 2045 if (rc == 0) { 2046 final_length += length; 2047 } else { 2048 break; 2049 } 2050 payload += length; 2051 offset += length; 2052 sub_reads++; 2053 } 2054 pthread_spin_unlock(&file->lock); 2055 while (sub_reads-- > 0) { 2056 sem_wait(&channel->sem); 2057 } 2058 if (rc == 0) { 2059 return final_length; 2060 } else { 2061 return rc; 2062 } 2063 } 2064 2065 static void 2066 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2067 spdk_file_op_complete cb_fn, void *cb_arg) 2068 { 2069 struct spdk_fs_request *sync_req; 2070 struct spdk_fs_request *flush_req; 2071 struct spdk_fs_cb_args *sync_args; 2072 struct spdk_fs_cb_args *flush_args; 2073 2074 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2075 2076 pthread_spin_lock(&file->lock); 2077 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2078 BLOBFS_TRACE(file, "done - no data to flush\n"); 2079 pthread_spin_unlock(&file->lock); 2080 cb_fn(cb_arg, 0); 2081 return; 2082 } 2083 2084 sync_req = alloc_fs_request(channel); 2085 assert(sync_req != NULL); 2086 sync_args = &sync_req->args; 2087 2088 flush_req = alloc_fs_request(channel); 2089 assert(flush_req != NULL); 2090 flush_args = &flush_req->args; 2091 2092 sync_args->file = file; 2093 sync_args->fn.file_op = cb_fn; 2094 sync_args->arg = cb_arg; 2095 sync_args->op.sync.offset = file->append_pos; 2096 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2097 pthread_spin_unlock(&file->lock); 2098 2099 flush_args->file = file; 2100 channel->send_request(__file_flush, flush_args); 2101 } 2102 2103 int 2104 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2105 { 2106 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2107 2108 _file_sync(file, channel, __sem_post, &channel->sem); 2109 sem_wait(&channel->sem); 2110 2111 return 0; 2112 } 2113 2114 void 2115 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2116 spdk_file_op_complete cb_fn, void *cb_arg) 2117 { 2118 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2119 2120 _file_sync(file, channel, cb_fn, cb_arg); 2121 } 2122 2123 void 2124 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2125 { 2126 BLOBFS_TRACE(file, "priority=%u\n", priority); 2127 file->priority = priority; 2128 2129 } 2130 2131 /* 2132 * Close routines 2133 */ 2134 2135 static void 2136 __file_close_async_done(void *ctx, int bserrno) 2137 { 2138 struct spdk_fs_request *req = ctx; 2139 struct spdk_fs_cb_args *args = &req->args; 2140 2141 args->fn.file_op(args->arg, bserrno); 2142 free_fs_request(req); 2143 } 2144 2145 static void 2146 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2147 { 2148 pthread_spin_lock(&file->lock); 2149 if (file->ref_count == 0) { 2150 pthread_spin_unlock(&file->lock); 2151 __file_close_async_done(req, -EBADF); 2152 return; 2153 } 2154 2155 file->ref_count--; 2156 if (file->ref_count > 0) { 2157 pthread_spin_unlock(&file->lock); 2158 __file_close_async_done(req, 0); 2159 return; 2160 } 2161 2162 pthread_spin_unlock(&file->lock); 2163 2164 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2165 } 2166 2167 static void 2168 __file_close_async__sync_done(void *arg, int fserrno) 2169 { 2170 struct spdk_fs_request *req = arg; 2171 struct spdk_fs_cb_args *args = &req->args; 2172 2173 __file_close_async(args->file, req); 2174 } 2175 2176 void 2177 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2178 { 2179 struct spdk_fs_request *req; 2180 struct spdk_fs_cb_args *args; 2181 2182 req = alloc_fs_request(file->fs->md_fs_channel); 2183 if (req == NULL) { 2184 cb_fn(cb_arg, -ENOMEM); 2185 return; 2186 } 2187 2188 args = &req->args; 2189 args->file = file; 2190 args->fn.file_op = cb_fn; 2191 args->arg = cb_arg; 2192 2193 spdk_file_sync_async(file, file->fs->md_io_channel, __file_close_async__sync_done, req); 2194 } 2195 2196 static void 2197 __file_close_done(void *arg, int fserrno) 2198 { 2199 struct spdk_fs_cb_args *args = arg; 2200 2201 args->rc = fserrno; 2202 sem_post(args->sem); 2203 } 2204 2205 static void 2206 __file_close(void *arg) 2207 { 2208 struct spdk_fs_request *req = arg; 2209 struct spdk_fs_cb_args *args = &req->args; 2210 struct spdk_file *file = args->file; 2211 2212 __file_close_async(file, req); 2213 } 2214 2215 int 2216 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2217 { 2218 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2219 struct spdk_fs_request *req; 2220 struct spdk_fs_cb_args *args; 2221 2222 req = alloc_fs_request(channel); 2223 assert(req != NULL); 2224 2225 args = &req->args; 2226 2227 spdk_file_sync(file, _channel); 2228 BLOBFS_TRACE(file, "name=%s\n", file->name); 2229 args->file = file; 2230 args->sem = &channel->sem; 2231 args->fn.file_op = __file_close_done; 2232 args->arg = req; 2233 channel->send_request(__file_close, req); 2234 sem_wait(&channel->sem); 2235 2236 return args->rc; 2237 } 2238 2239 static void 2240 cache_free_buffers(struct spdk_file *file) 2241 { 2242 BLOBFS_TRACE(file, "free=%s\n", file->name); 2243 pthread_spin_lock(&file->lock); 2244 pthread_spin_lock(&g_caches_lock); 2245 if (file->tree->present_mask == 0) { 2246 pthread_spin_unlock(&g_caches_lock); 2247 pthread_spin_unlock(&file->lock); 2248 return; 2249 } 2250 spdk_tree_free_buffers(file->tree); 2251 if (file->tree->present_mask == 0) { 2252 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2253 } 2254 file->last = NULL; 2255 pthread_spin_unlock(&g_caches_lock); 2256 pthread_spin_unlock(&file->lock); 2257 } 2258 2259 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2260 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2261