1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "blobfs_internal.h" 38 39 #include "spdk/queue.h" 40 #include "spdk/io_channel.h" 41 #include "spdk/assert.h" 42 #include "spdk/env.h" 43 #include "spdk/util.h" 44 #include "spdk_internal/log.h" 45 46 #define BLOBFS_TRACE(file, str, args...) \ 47 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s " str, file->name, ##args) 48 49 #define BLOBFS_TRACE_RW(file, str, args...) \ 50 SPDK_TRACELOG(SPDK_TRACE_BLOBFS_RW, "file=%s " str, file->name, ##args) 51 52 #define BLOBFS_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 53 54 static uint64_t g_fs_cache_size = BLOBFS_CACHE_SIZE; 55 static struct spdk_mempool *g_cache_pool; 56 static TAILQ_HEAD(, spdk_file) g_caches; 57 static pthread_spinlock_t g_caches_lock; 58 59 static void 60 __sem_post(void *arg, int bserrno) 61 { 62 sem_t *sem = arg; 63 64 sem_post(sem); 65 } 66 67 void 68 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 69 { 70 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 71 free(cache_buffer); 72 } 73 74 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 75 76 struct spdk_file { 77 struct spdk_filesystem *fs; 78 struct spdk_blob *blob; 79 char *name; 80 uint64_t length; 81 bool open_for_writing; 82 uint64_t length_flushed; 83 uint64_t append_pos; 84 uint64_t seq_byte_count; 85 uint64_t next_seq_offset; 86 uint32_t priority; 87 TAILQ_ENTRY(spdk_file) tailq; 88 spdk_blob_id blobid; 89 uint32_t ref_count; 90 pthread_spinlock_t lock; 91 struct cache_buffer *last; 92 struct cache_tree *tree; 93 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 94 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 95 TAILQ_ENTRY(spdk_file) cache_tailq; 96 }; 97 98 struct spdk_filesystem { 99 struct spdk_blob_store *bs; 100 TAILQ_HEAD(, spdk_file) files; 101 struct spdk_bs_opts bs_opts; 102 struct spdk_bs_dev *bdev; 103 fs_send_request_fn send_request; 104 105 struct { 106 uint32_t max_ops; 107 struct spdk_io_channel *sync_io_channel; 108 struct spdk_fs_channel *sync_fs_channel; 109 } sync_target; 110 111 struct { 112 uint32_t max_ops; 113 struct spdk_io_channel *md_io_channel; 114 struct spdk_fs_channel *md_fs_channel; 115 } md_target; 116 117 struct { 118 uint32_t max_ops; 119 } io_target; 120 }; 121 122 struct spdk_fs_cb_args { 123 union { 124 spdk_fs_op_with_handle_complete fs_op_with_handle; 125 spdk_fs_op_complete fs_op; 126 spdk_file_op_with_handle_complete file_op_with_handle; 127 spdk_file_op_complete file_op; 128 spdk_file_stat_op_complete stat_op; 129 } fn; 130 void *arg; 131 sem_t *sem; 132 struct spdk_filesystem *fs; 133 struct spdk_file *file; 134 int rc; 135 bool from_request; 136 union { 137 struct { 138 uint64_t length; 139 } truncate; 140 struct { 141 struct spdk_io_channel *channel; 142 void *user_buf; 143 void *pin_buf; 144 int is_read; 145 off_t offset; 146 size_t length; 147 uint64_t start_page; 148 uint64_t num_pages; 149 uint32_t blocklen; 150 } rw; 151 struct { 152 const char *old_name; 153 const char *new_name; 154 } rename; 155 struct { 156 struct cache_buffer *cache_buffer; 157 uint64_t length; 158 } flush; 159 struct { 160 struct cache_buffer *cache_buffer; 161 uint64_t length; 162 uint64_t offset; 163 } readahead; 164 struct { 165 uint64_t offset; 166 TAILQ_ENTRY(spdk_fs_request) tailq; 167 } sync; 168 struct { 169 uint32_t num_clusters; 170 } resize; 171 struct { 172 const char *name; 173 uint32_t flags; 174 TAILQ_ENTRY(spdk_fs_request) tailq; 175 } open; 176 struct { 177 const char *name; 178 } create; 179 struct { 180 const char *name; 181 } delete; 182 struct { 183 const char *name; 184 } stat; 185 } op; 186 }; 187 188 static void cache_free_buffers(struct spdk_file *file); 189 190 static void 191 __initialize_cache(void) 192 { 193 if (g_cache_pool != NULL) { 194 return; 195 } 196 197 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 198 g_fs_cache_size / CACHE_BUFFER_SIZE, 199 CACHE_BUFFER_SIZE, -1, SPDK_ENV_SOCKET_ID_ANY); 200 TAILQ_INIT(&g_caches); 201 pthread_spin_init(&g_caches_lock, 0); 202 } 203 204 static uint64_t 205 __file_get_blob_size(struct spdk_file *file) 206 { 207 uint64_t cluster_sz; 208 209 cluster_sz = file->fs->bs_opts.cluster_sz; 210 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 211 } 212 213 struct spdk_fs_request { 214 struct spdk_fs_cb_args args; 215 TAILQ_ENTRY(spdk_fs_request) link; 216 struct spdk_fs_channel *channel; 217 }; 218 219 struct spdk_fs_channel { 220 struct spdk_fs_request *req_mem; 221 TAILQ_HEAD(, spdk_fs_request) reqs; 222 sem_t sem; 223 struct spdk_filesystem *fs; 224 struct spdk_io_channel *bs_channel; 225 fs_send_request_fn send_request; 226 }; 227 228 static struct spdk_fs_request * 229 alloc_fs_request(struct spdk_fs_channel *channel) 230 { 231 struct spdk_fs_request *req; 232 233 req = TAILQ_FIRST(&channel->reqs); 234 if (!req) { 235 return NULL; 236 } 237 TAILQ_REMOVE(&channel->reqs, req, link); 238 memset(req, 0, sizeof(*req)); 239 req->channel = channel; 240 req->args.from_request = true; 241 242 return req; 243 } 244 245 static void 246 free_fs_request(struct spdk_fs_request *req) 247 { 248 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 249 } 250 251 static int 252 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 253 uint32_t max_ops) 254 { 255 uint32_t i; 256 257 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 258 if (!channel->req_mem) { 259 return -1; 260 } 261 262 TAILQ_INIT(&channel->reqs); 263 sem_init(&channel->sem, 0, 0); 264 265 for (i = 0; i < max_ops; i++) { 266 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 267 } 268 269 channel->fs = fs; 270 271 return 0; 272 } 273 274 static int 275 _spdk_fs_md_channel_create(void *io_device, uint32_t priority, void *ctx_buf) 276 { 277 struct spdk_filesystem *fs; 278 struct spdk_fs_channel *channel = ctx_buf; 279 280 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 281 282 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 283 } 284 285 static int 286 _spdk_fs_sync_channel_create(void *io_device, uint32_t priority, void *ctx_buf) 287 { 288 struct spdk_filesystem *fs; 289 struct spdk_fs_channel *channel = ctx_buf; 290 291 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 292 293 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 294 } 295 296 static int 297 _spdk_fs_io_channel_create(void *io_device, uint32_t priority, void *ctx_buf) 298 { 299 struct spdk_filesystem *fs; 300 struct spdk_fs_channel *channel = ctx_buf; 301 302 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 303 304 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 305 } 306 307 static void 308 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 309 { 310 struct spdk_fs_channel *channel = ctx_buf; 311 312 free(channel->req_mem); 313 if (channel->bs_channel != NULL) { 314 spdk_bs_free_io_channel(channel->bs_channel); 315 } 316 } 317 318 static void 319 __send_request_direct(fs_request_fn fn, void *arg) 320 { 321 fn(arg); 322 } 323 324 static void 325 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 326 { 327 fs->bs = bs; 328 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 329 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, 330 SPDK_IO_PRIORITY_DEFAULT); 331 fs->md_target.md_fs_channel->send_request = __send_request_direct; 332 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, 333 SPDK_IO_PRIORITY_DEFAULT); 334 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 335 } 336 337 static void 338 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 339 { 340 struct spdk_fs_request *req = ctx; 341 struct spdk_fs_cb_args *args = &req->args; 342 struct spdk_filesystem *fs = args->fs; 343 344 if (bserrno == 0) { 345 common_fs_bs_init(fs, bs); 346 } else { 347 free(fs); 348 fs = NULL; 349 } 350 351 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 352 free_fs_request(req); 353 } 354 355 static struct spdk_filesystem * 356 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 357 { 358 struct spdk_filesystem *fs; 359 360 fs = calloc(1, sizeof(*fs)); 361 if (fs == NULL) { 362 return NULL; 363 } 364 365 fs->bdev = dev; 366 fs->send_request = send_request_fn; 367 TAILQ_INIT(&fs->files); 368 369 fs->md_target.max_ops = 512; 370 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 371 sizeof(struct spdk_fs_channel)); 372 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target, SPDK_IO_PRIORITY_DEFAULT); 373 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 374 375 fs->sync_target.max_ops = 512; 376 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 377 sizeof(struct spdk_fs_channel)); 378 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target, SPDK_IO_PRIORITY_DEFAULT); 379 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 380 381 fs->io_target.max_ops = 512; 382 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 383 sizeof(struct spdk_fs_channel)); 384 385 __initialize_cache(); 386 387 return fs; 388 } 389 390 void 391 spdk_fs_init(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 392 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 393 { 394 struct spdk_filesystem *fs; 395 struct spdk_fs_request *req; 396 struct spdk_fs_cb_args *args; 397 398 fs = fs_alloc(dev, send_request_fn); 399 if (fs == NULL) { 400 cb_fn(cb_arg, NULL, -ENOMEM); 401 return; 402 } 403 404 req = alloc_fs_request(fs->md_target.md_fs_channel); 405 if (req == NULL) { 406 cb_fn(cb_arg, NULL, -ENOMEM); 407 return; 408 } 409 410 args = &req->args; 411 args->fn.fs_op_with_handle = cb_fn; 412 args->arg = cb_arg; 413 args->fs = fs; 414 415 spdk_bs_init(dev, NULL, init_cb, req); 416 } 417 418 static struct spdk_file * 419 file_alloc(struct spdk_filesystem *fs) 420 { 421 struct spdk_file *file; 422 423 file = calloc(1, sizeof(*file)); 424 if (file == NULL) { 425 return NULL; 426 } 427 428 file->fs = fs; 429 TAILQ_INIT(&file->open_requests); 430 TAILQ_INIT(&file->sync_requests); 431 pthread_spin_init(&file->lock, 0); 432 file->tree = calloc(1, sizeof(*file->tree)); 433 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 434 file->priority = SPDK_FILE_PRIORITY_LOW; 435 return file; 436 } 437 438 static void 439 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 440 { 441 struct spdk_fs_request *req = ctx; 442 struct spdk_fs_cb_args *args = &req->args; 443 struct spdk_filesystem *fs = args->fs; 444 struct spdk_file *f; 445 uint64_t *length; 446 const char *name; 447 size_t value_len; 448 449 if (rc == -ENOENT) { 450 /* Finished iterating */ 451 args->fn.fs_op_with_handle(args->arg, fs, 0); 452 free_fs_request(req); 453 return; 454 } else if (rc < 0) { 455 args->fn.fs_op_with_handle(args->arg, fs, rc); 456 free_fs_request(req); 457 return; 458 } 459 460 rc = spdk_bs_md_get_xattr_value(blob, "name", (const void **)&name, &value_len); 461 if (rc < 0) { 462 args->fn.fs_op_with_handle(args->arg, fs, rc); 463 free_fs_request(req); 464 return; 465 } 466 467 rc = spdk_bs_md_get_xattr_value(blob, "length", (const void **)&length, &value_len); 468 if (rc < 0) { 469 args->fn.fs_op_with_handle(args->arg, fs, rc); 470 free_fs_request(req); 471 return; 472 } 473 assert(value_len == 8); 474 475 f = file_alloc(fs); 476 if (f == NULL) { 477 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 478 free_fs_request(req); 479 return; 480 } 481 482 f->name = strdup(name); 483 f->blobid = spdk_blob_get_id(blob); 484 f->length = *length; 485 f->length_flushed = *length; 486 f->append_pos = *length; 487 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 488 489 spdk_bs_md_iter_next(fs->bs, &blob, iter_cb, req); 490 } 491 492 static void 493 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 494 { 495 struct spdk_fs_request *req = ctx; 496 struct spdk_fs_cb_args *args = &req->args; 497 struct spdk_filesystem *fs = args->fs; 498 499 if (bserrno != 0) { 500 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 501 free_fs_request(req); 502 free(fs); 503 return; 504 } 505 506 common_fs_bs_init(fs, bs); 507 spdk_bs_md_iter_first(fs->bs, iter_cb, req); 508 } 509 510 void 511 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 512 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 513 { 514 struct spdk_filesystem *fs; 515 struct spdk_fs_cb_args *args; 516 struct spdk_fs_request *req; 517 518 fs = fs_alloc(dev, send_request_fn); 519 if (fs == NULL) { 520 cb_fn(cb_arg, NULL, -ENOMEM); 521 return; 522 } 523 524 req = alloc_fs_request(fs->md_target.md_fs_channel); 525 if (req == NULL) { 526 cb_fn(cb_arg, NULL, -ENOMEM); 527 return; 528 } 529 530 args = &req->args; 531 args->fn.fs_op_with_handle = cb_fn; 532 args->arg = cb_arg; 533 args->fs = fs; 534 535 spdk_bs_load(dev, load_cb, req); 536 } 537 538 static void 539 unload_cb(void *ctx, int bserrno) 540 { 541 struct spdk_fs_request *req = ctx; 542 struct spdk_fs_cb_args *args = &req->args; 543 struct spdk_filesystem *fs = args->fs; 544 545 args->fn.fs_op(args->arg, bserrno); 546 free(req); 547 548 spdk_io_device_unregister(&fs->io_target); 549 spdk_io_device_unregister(&fs->sync_target); 550 spdk_io_device_unregister(&fs->md_target); 551 552 free(fs); 553 } 554 555 void 556 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 557 { 558 struct spdk_fs_request *req; 559 struct spdk_fs_cb_args *args; 560 561 /* 562 * We must free the md_channel before unloading the blobstore, so just 563 * allocate this request from the general heap. 564 */ 565 req = calloc(1, sizeof(*req)); 566 if (req == NULL) { 567 cb_fn(cb_arg, -ENOMEM); 568 return; 569 } 570 571 args = &req->args; 572 args->fn.fs_op = cb_fn; 573 args->arg = cb_arg; 574 args->fs = fs; 575 576 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 577 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 578 spdk_bs_unload(fs->bs, unload_cb, req); 579 } 580 581 static struct spdk_file * 582 fs_find_file(struct spdk_filesystem *fs, const char *name) 583 { 584 struct spdk_file *file; 585 586 TAILQ_FOREACH(file, &fs->files, tailq) { 587 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 588 return file; 589 } 590 } 591 592 return NULL; 593 } 594 595 void 596 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 597 spdk_file_stat_op_complete cb_fn, void *cb_arg) 598 { 599 struct spdk_file_stat stat; 600 struct spdk_file *f = NULL; 601 602 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 603 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 604 return; 605 } 606 607 f = fs_find_file(fs, name); 608 if (f != NULL) { 609 stat.blobid = f->blobid; 610 stat.size = f->length; 611 cb_fn(cb_arg, &stat, 0); 612 return; 613 } 614 615 cb_fn(cb_arg, NULL, -ENOENT); 616 } 617 618 static void 619 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 620 { 621 struct spdk_fs_request *req = arg; 622 struct spdk_fs_cb_args *args = &req->args; 623 624 args->rc = fserrno; 625 if (fserrno == 0) { 626 memcpy(args->arg, stat, sizeof(*stat)); 627 } 628 sem_post(args->sem); 629 } 630 631 static void 632 __file_stat(void *arg) 633 { 634 struct spdk_fs_request *req = arg; 635 struct spdk_fs_cb_args *args = &req->args; 636 637 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 638 args->fn.stat_op, req); 639 } 640 641 int 642 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 643 const char *name, struct spdk_file_stat *stat) 644 { 645 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 646 struct spdk_fs_request *req; 647 int rc; 648 649 req = alloc_fs_request(channel); 650 assert(req != NULL); 651 652 req->args.fs = fs; 653 req->args.op.stat.name = name; 654 req->args.fn.stat_op = __copy_stat; 655 req->args.arg = stat; 656 req->args.sem = &channel->sem; 657 channel->send_request(__file_stat, req); 658 sem_wait(&channel->sem); 659 660 rc = req->args.rc; 661 free_fs_request(req); 662 663 return rc; 664 } 665 666 static void 667 fs_create_blob_close_cb(void *ctx, int bserrno) 668 { 669 struct spdk_fs_request *req = ctx; 670 struct spdk_fs_cb_args *args = &req->args; 671 672 args->fn.file_op(args->arg, bserrno); 673 free_fs_request(req); 674 } 675 676 static void 677 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 678 { 679 struct spdk_fs_request *req = ctx; 680 struct spdk_fs_cb_args *args = &req->args; 681 struct spdk_file *f = args->file; 682 uint64_t length = 0; 683 684 f->blob = blob; 685 spdk_bs_md_resize_blob(blob, 1); 686 spdk_blob_md_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 687 spdk_blob_md_set_xattr(blob, "length", &length, sizeof(length)); 688 689 spdk_bs_md_close_blob(&f->blob, fs_create_blob_close_cb, args); 690 } 691 692 static void 693 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 694 { 695 struct spdk_fs_request *req = ctx; 696 struct spdk_fs_cb_args *args = &req->args; 697 struct spdk_file *f = args->file; 698 699 f->blobid = blobid; 700 spdk_bs_md_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 701 } 702 703 void 704 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 705 spdk_file_op_complete cb_fn, void *cb_arg) 706 { 707 struct spdk_file *file; 708 struct spdk_fs_request *req; 709 struct spdk_fs_cb_args *args; 710 711 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 712 cb_fn(cb_arg, -ENAMETOOLONG); 713 return; 714 } 715 716 file = fs_find_file(fs, name); 717 if (file != NULL) { 718 cb_fn(cb_arg, -EEXIST); 719 return; 720 } 721 722 file = file_alloc(fs); 723 if (file == NULL) { 724 cb_fn(cb_arg, -ENOMEM); 725 return; 726 } 727 728 req = alloc_fs_request(fs->md_target.md_fs_channel); 729 if (req == NULL) { 730 cb_fn(cb_arg, -ENOMEM); 731 return; 732 } 733 734 args = &req->args; 735 args->file = file; 736 args->fn.file_op = cb_fn; 737 args->arg = cb_arg; 738 739 file->name = strdup(name); 740 spdk_bs_md_create_blob(fs->bs, fs_create_blob_create_cb, args); 741 } 742 743 static void 744 __fs_create_file_done(void *arg, int fserrno) 745 { 746 struct spdk_fs_request *req = arg; 747 struct spdk_fs_cb_args *args = &req->args; 748 749 args->rc = fserrno; 750 sem_post(args->sem); 751 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 752 } 753 754 static void 755 __fs_create_file(void *arg) 756 { 757 struct spdk_fs_request *req = arg; 758 struct spdk_fs_cb_args *args = &req->args; 759 760 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.create.name); 761 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 762 } 763 764 int 765 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 766 { 767 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 768 struct spdk_fs_request *req; 769 struct spdk_fs_cb_args *args; 770 int rc; 771 772 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 773 774 req = alloc_fs_request(channel); 775 assert(req != NULL); 776 777 args = &req->args; 778 args->fs = fs; 779 args->op.create.name = name; 780 args->sem = &channel->sem; 781 fs->send_request(__fs_create_file, req); 782 sem_wait(&channel->sem); 783 rc = args->rc; 784 free_fs_request(req); 785 786 return rc; 787 } 788 789 static void 790 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 791 { 792 struct spdk_fs_request *req = ctx; 793 struct spdk_fs_cb_args *args = &req->args; 794 struct spdk_file *f = args->file; 795 796 f->blob = blob; 797 while (!TAILQ_EMPTY(&f->open_requests)) { 798 req = TAILQ_FIRST(&f->open_requests); 799 args = &req->args; 800 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 801 args->fn.file_op_with_handle(args->arg, f, bserrno); 802 free_fs_request(req); 803 } 804 } 805 806 static void 807 fs_open_blob_create_cb(void *ctx, int bserrno) 808 { 809 struct spdk_fs_request *req = ctx; 810 struct spdk_fs_cb_args *args = &req->args; 811 struct spdk_file *file = args->file; 812 struct spdk_filesystem *fs = args->fs; 813 814 if (file == NULL) { 815 /* 816 * This is from an open with CREATE flag - the file 817 * is now created so look it up in the file list for this 818 * filesystem. 819 */ 820 file = fs_find_file(fs, args->op.open.name); 821 assert(file != NULL); 822 args->file = file; 823 } 824 825 file->ref_count++; 826 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 827 if (file->ref_count == 1) { 828 assert(file->blob == NULL); 829 spdk_bs_md_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 830 } else if (file->blob != NULL) { 831 fs_open_blob_done(req, file->blob, 0); 832 } else { 833 /* 834 * The blob open for this file is in progress due to a previous 835 * open request. When that open completes, it will invoke the 836 * open callback for this request. 837 */ 838 } 839 } 840 841 void 842 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 843 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 844 { 845 struct spdk_file *f = NULL; 846 struct spdk_fs_request *req; 847 struct spdk_fs_cb_args *args; 848 849 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 850 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 851 return; 852 } 853 854 f = fs_find_file(fs, name); 855 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 856 cb_fn(cb_arg, NULL, -ENOENT); 857 return; 858 } 859 860 req = alloc_fs_request(fs->md_target.md_fs_channel); 861 if (req == NULL) { 862 cb_fn(cb_arg, NULL, -ENOMEM); 863 return; 864 } 865 866 args = &req->args; 867 args->fn.file_op_with_handle = cb_fn; 868 args->arg = cb_arg; 869 args->file = f; 870 args->fs = fs; 871 args->op.open.name = name; 872 873 if (f == NULL) { 874 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 875 } else { 876 fs_open_blob_create_cb(req, 0); 877 } 878 } 879 880 static void 881 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 882 { 883 struct spdk_fs_request *req = arg; 884 struct spdk_fs_cb_args *args = &req->args; 885 886 args->file = file; 887 args->rc = bserrno; 888 sem_post(args->sem); 889 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 890 } 891 892 static void 893 __fs_open_file(void *arg) 894 { 895 struct spdk_fs_request *req = arg; 896 struct spdk_fs_cb_args *args = &req->args; 897 898 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", args->op.open.name); 899 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 900 __fs_open_file_done, req); 901 } 902 903 int 904 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 905 const char *name, uint32_t flags, struct spdk_file **file) 906 { 907 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 908 struct spdk_fs_request *req; 909 struct spdk_fs_cb_args *args; 910 int rc; 911 912 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 913 914 req = alloc_fs_request(channel); 915 assert(req != NULL); 916 917 args = &req->args; 918 args->fs = fs; 919 args->op.open.name = name; 920 args->op.open.flags = flags; 921 args->sem = &channel->sem; 922 fs->send_request(__fs_open_file, req); 923 sem_wait(&channel->sem); 924 rc = args->rc; 925 if (rc == 0) { 926 *file = args->file; 927 } else { 928 *file = NULL; 929 } 930 free_fs_request(req); 931 932 return rc; 933 } 934 935 static void 936 fs_rename_blob_close_cb(void *ctx, int bserrno) 937 { 938 struct spdk_fs_request *req = ctx; 939 struct spdk_fs_cb_args *args = &req->args; 940 941 args->fn.fs_op(args->arg, bserrno); 942 free_fs_request(req); 943 } 944 945 static void 946 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 947 { 948 struct spdk_fs_request *req = ctx; 949 struct spdk_fs_cb_args *args = &req->args; 950 struct spdk_file *f = args->file; 951 const char *new_name = args->op.rename.new_name; 952 953 f->blob = blob; 954 spdk_blob_md_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 955 spdk_bs_md_close_blob(&f->blob, fs_rename_blob_close_cb, req); 956 } 957 958 static void 959 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 960 { 961 struct spdk_fs_cb_args *args = &req->args; 962 struct spdk_file *f; 963 964 f = fs_find_file(args->fs, args->op.rename.old_name); 965 if (f == NULL) { 966 args->fn.fs_op(args->arg, -ENOENT); 967 free_fs_request(req); 968 return; 969 } 970 971 free(f->name); 972 f->name = strdup(args->op.rename.new_name); 973 args->file = f; 974 spdk_bs_md_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 975 } 976 977 static void 978 fs_rename_delete_done(void *arg, int fserrno) 979 { 980 __spdk_fs_md_rename_file(arg); 981 } 982 983 void 984 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 985 const char *old_name, const char *new_name, 986 spdk_file_op_complete cb_fn, void *cb_arg) 987 { 988 struct spdk_file *f; 989 struct spdk_fs_request *req; 990 struct spdk_fs_cb_args *args; 991 992 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "old=%s new=%s\n", old_name, new_name); 993 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 994 cb_fn(cb_arg, -ENAMETOOLONG); 995 return; 996 } 997 998 req = alloc_fs_request(fs->md_target.md_fs_channel); 999 if (req == NULL) { 1000 cb_fn(cb_arg, -ENOMEM); 1001 return; 1002 } 1003 1004 args = &req->args; 1005 args->fn.fs_op = cb_fn; 1006 args->fs = fs; 1007 args->arg = cb_arg; 1008 args->op.rename.old_name = old_name; 1009 args->op.rename.new_name = new_name; 1010 1011 f = fs_find_file(fs, new_name); 1012 if (f == NULL) { 1013 __spdk_fs_md_rename_file(req); 1014 return; 1015 } 1016 1017 /* 1018 * The rename overwrites an existing file. So delete the existing file, then 1019 * do the actual rename. 1020 */ 1021 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1022 } 1023 1024 static void 1025 __fs_rename_file_done(void *arg, int fserrno) 1026 { 1027 struct spdk_fs_request *req = arg; 1028 struct spdk_fs_cb_args *args = &req->args; 1029 1030 args->rc = fserrno; 1031 sem_post(args->sem); 1032 } 1033 1034 static void 1035 __fs_rename_file(void *arg) 1036 { 1037 struct spdk_fs_request *req = arg; 1038 struct spdk_fs_cb_args *args = &req->args; 1039 1040 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1041 __fs_rename_file_done, req); 1042 } 1043 1044 int 1045 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1046 const char *old_name, const char *new_name) 1047 { 1048 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1049 struct spdk_fs_request *req; 1050 struct spdk_fs_cb_args *args; 1051 int rc; 1052 1053 req = alloc_fs_request(channel); 1054 assert(req != NULL); 1055 1056 args = &req->args; 1057 1058 args->fs = fs; 1059 args->op.rename.old_name = old_name; 1060 args->op.rename.new_name = new_name; 1061 args->sem = &channel->sem; 1062 fs->send_request(__fs_rename_file, req); 1063 sem_wait(&channel->sem); 1064 rc = args->rc; 1065 free_fs_request(req); 1066 return rc; 1067 } 1068 1069 static void 1070 blob_delete_cb(void *ctx, int bserrno) 1071 { 1072 struct spdk_fs_request *req = ctx; 1073 struct spdk_fs_cb_args *args = &req->args; 1074 1075 args->fn.file_op(args->arg, bserrno); 1076 free_fs_request(req); 1077 } 1078 1079 void 1080 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1081 spdk_file_op_complete cb_fn, void *cb_arg) 1082 { 1083 struct spdk_file *f; 1084 spdk_blob_id blobid; 1085 struct spdk_fs_request *req; 1086 struct spdk_fs_cb_args *args; 1087 1088 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s\n", name); 1089 1090 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1091 cb_fn(cb_arg, -ENAMETOOLONG); 1092 return; 1093 } 1094 1095 f = fs_find_file(fs, name); 1096 if (f == NULL) { 1097 cb_fn(cb_arg, -ENOENT); 1098 return; 1099 } 1100 1101 if (f->ref_count > 0) { 1102 /* For now, do not allow deleting files with open references. */ 1103 cb_fn(cb_arg, -EBUSY); 1104 return; 1105 } 1106 1107 req = alloc_fs_request(fs->md_target.md_fs_channel); 1108 if (req == NULL) { 1109 cb_fn(cb_arg, -ENOMEM); 1110 return; 1111 } 1112 1113 TAILQ_REMOVE(&fs->files, f, tailq); 1114 1115 cache_free_buffers(f); 1116 1117 blobid = f->blobid; 1118 1119 free(f->name); 1120 free(f->tree); 1121 free(f); 1122 1123 args = &req->args; 1124 args->fn.file_op = cb_fn; 1125 args->arg = cb_arg; 1126 spdk_bs_md_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1127 } 1128 1129 static void 1130 __fs_delete_file_done(void *arg, int fserrno) 1131 { 1132 struct spdk_fs_request *req = arg; 1133 struct spdk_fs_cb_args *args = &req->args; 1134 1135 args->rc = fserrno; 1136 sem_post(args->sem); 1137 } 1138 1139 static void 1140 __fs_delete_file(void *arg) 1141 { 1142 struct spdk_fs_request *req = arg; 1143 struct spdk_fs_cb_args *args = &req->args; 1144 1145 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1146 } 1147 1148 int 1149 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1150 const char *name) 1151 { 1152 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1153 struct spdk_fs_request *req; 1154 struct spdk_fs_cb_args *args; 1155 int rc; 1156 1157 req = alloc_fs_request(channel); 1158 assert(req != NULL); 1159 1160 args = &req->args; 1161 args->fs = fs; 1162 args->op.delete.name = name; 1163 args->sem = &channel->sem; 1164 fs->send_request(__fs_delete_file, req); 1165 sem_wait(&channel->sem); 1166 rc = args->rc; 1167 free_fs_request(req); 1168 1169 return rc; 1170 } 1171 1172 spdk_fs_iter 1173 spdk_fs_iter_first(struct spdk_filesystem *fs) 1174 { 1175 struct spdk_file *f; 1176 1177 f = TAILQ_FIRST(&fs->files); 1178 return f; 1179 } 1180 1181 spdk_fs_iter 1182 spdk_fs_iter_next(spdk_fs_iter iter) 1183 { 1184 struct spdk_file *f = iter; 1185 1186 if (f == NULL) { 1187 return NULL; 1188 } 1189 1190 f = TAILQ_NEXT(f, tailq); 1191 return f; 1192 } 1193 1194 const char * 1195 spdk_file_get_name(struct spdk_file *file) 1196 { 1197 return file->name; 1198 } 1199 1200 uint64_t 1201 spdk_file_get_length(struct spdk_file *file) 1202 { 1203 assert(file != NULL); 1204 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s length=0x%jx\n", file->name, file->length); 1205 return file->length; 1206 } 1207 1208 static void 1209 fs_truncate_complete_cb(void *ctx, int bserrno) 1210 { 1211 struct spdk_fs_request *req = ctx; 1212 struct spdk_fs_cb_args *args = &req->args; 1213 1214 args->fn.file_op(args->arg, bserrno); 1215 free_fs_request(req); 1216 } 1217 1218 static uint64_t 1219 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1220 { 1221 return (length + cluster_sz - 1) / cluster_sz; 1222 } 1223 1224 void 1225 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1226 spdk_file_op_complete cb_fn, void *cb_arg) 1227 { 1228 struct spdk_filesystem *fs; 1229 size_t num_clusters; 1230 struct spdk_fs_request *req; 1231 struct spdk_fs_cb_args *args; 1232 1233 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1234 if (length == file->length) { 1235 cb_fn(cb_arg, 0); 1236 return; 1237 } 1238 1239 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1240 if (req == NULL) { 1241 cb_fn(cb_arg, -ENOMEM); 1242 return; 1243 } 1244 1245 args = &req->args; 1246 args->fn.file_op = cb_fn; 1247 args->arg = cb_arg; 1248 args->file = file; 1249 fs = file->fs; 1250 1251 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1252 1253 spdk_bs_md_resize_blob(file->blob, num_clusters); 1254 spdk_blob_md_set_xattr(file->blob, "length", &length, sizeof(length)); 1255 1256 file->length = length; 1257 if (file->append_pos > file->length) { 1258 file->append_pos = file->length; 1259 } 1260 1261 spdk_bs_md_sync_blob(file->blob, fs_truncate_complete_cb, args); 1262 } 1263 1264 static void 1265 __truncate(void *arg) 1266 { 1267 struct spdk_fs_request *req = arg; 1268 struct spdk_fs_cb_args *args = &req->args; 1269 1270 spdk_file_truncate_async(args->file, args->op.truncate.length, 1271 args->fn.file_op, args->arg); 1272 } 1273 1274 void 1275 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1276 uint64_t length) 1277 { 1278 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1279 struct spdk_fs_request *req; 1280 struct spdk_fs_cb_args *args; 1281 1282 req = alloc_fs_request(channel); 1283 assert(req != NULL); 1284 1285 args = &req->args; 1286 1287 args->file = file; 1288 args->op.truncate.length = length; 1289 args->fn.file_op = __sem_post; 1290 args->arg = &channel->sem; 1291 1292 channel->send_request(__truncate, req); 1293 sem_wait(&channel->sem); 1294 free_fs_request(req); 1295 } 1296 1297 static void 1298 __rw_done(void *ctx, int bserrno) 1299 { 1300 struct spdk_fs_request *req = ctx; 1301 struct spdk_fs_cb_args *args = &req->args; 1302 1303 spdk_free(args->op.rw.pin_buf); 1304 args->fn.file_op(args->arg, bserrno); 1305 free_fs_request(req); 1306 } 1307 1308 static void 1309 __read_done(void *ctx, int bserrno) 1310 { 1311 struct spdk_fs_request *req = ctx; 1312 struct spdk_fs_cb_args *args = &req->args; 1313 1314 if (args->op.rw.is_read) { 1315 memcpy(args->op.rw.user_buf, 1316 args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1317 args->op.rw.length); 1318 __rw_done(req, 0); 1319 } else { 1320 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & 0xFFF), 1321 args->op.rw.user_buf, 1322 args->op.rw.length); 1323 spdk_bs_io_write_blob(args->file->blob, args->op.rw.channel, 1324 args->op.rw.pin_buf, 1325 args->op.rw.start_page, args->op.rw.num_pages, 1326 __rw_done, req); 1327 } 1328 } 1329 1330 static void 1331 __do_blob_read(void *ctx, int fserrno) 1332 { 1333 struct spdk_fs_request *req = ctx; 1334 struct spdk_fs_cb_args *args = &req->args; 1335 1336 spdk_bs_io_read_blob(args->file->blob, args->op.rw.channel, 1337 args->op.rw.pin_buf, 1338 args->op.rw.start_page, args->op.rw.num_pages, 1339 __read_done, req); 1340 } 1341 1342 static void 1343 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1344 uint64_t *start_page, uint32_t *page_size, uint64_t *num_pages) 1345 { 1346 uint64_t end_page; 1347 1348 *page_size = spdk_bs_get_page_size(file->fs->bs); 1349 *start_page = offset / *page_size; 1350 end_page = (offset + length - 1) / *page_size; 1351 *num_pages = (end_page - *start_page + 1); 1352 } 1353 1354 static void 1355 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1356 void *payload, uint64_t offset, uint64_t length, 1357 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1358 { 1359 struct spdk_fs_request *req; 1360 struct spdk_fs_cb_args *args; 1361 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1362 uint64_t start_page, num_pages, pin_buf_length; 1363 uint32_t page_size; 1364 1365 if (is_read && offset + length > file->length) { 1366 cb_fn(cb_arg, -EINVAL); 1367 return; 1368 } 1369 1370 req = alloc_fs_request(channel); 1371 if (req == NULL) { 1372 cb_fn(cb_arg, -ENOMEM); 1373 return; 1374 } 1375 1376 args = &req->args; 1377 args->fn.file_op = cb_fn; 1378 args->arg = cb_arg; 1379 args->file = file; 1380 args->op.rw.channel = channel->bs_channel; 1381 args->op.rw.user_buf = payload; 1382 args->op.rw.is_read = is_read; 1383 args->op.rw.offset = offset; 1384 args->op.rw.length = length; 1385 1386 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1387 pin_buf_length = num_pages * page_size; 1388 args->op.rw.pin_buf = spdk_malloc(pin_buf_length, 4096, NULL); 1389 1390 args->op.rw.start_page = start_page; 1391 args->op.rw.num_pages = num_pages; 1392 1393 if (!is_read && file->length < offset + length) { 1394 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1395 } else { 1396 __do_blob_read(req, 0); 1397 } 1398 } 1399 1400 void 1401 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1402 void *payload, uint64_t offset, uint64_t length, 1403 spdk_file_op_complete cb_fn, void *cb_arg) 1404 { 1405 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1406 } 1407 1408 void 1409 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1410 void *payload, uint64_t offset, uint64_t length, 1411 spdk_file_op_complete cb_fn, void *cb_arg) 1412 { 1413 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "file=%s offset=%jx length=%jx\n", 1414 file->name, offset, length); 1415 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1416 } 1417 1418 struct spdk_io_channel * 1419 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs, uint32_t priority) 1420 { 1421 struct spdk_io_channel *io_channel; 1422 struct spdk_fs_channel *fs_channel; 1423 1424 io_channel = spdk_get_io_channel(&fs->io_target, priority); 1425 fs_channel = spdk_io_channel_get_ctx(io_channel); 1426 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs, SPDK_IO_PRIORITY_DEFAULT); 1427 fs_channel->send_request = __send_request_direct; 1428 1429 return io_channel; 1430 } 1431 1432 struct spdk_io_channel * 1433 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs, uint32_t priority) 1434 { 1435 struct spdk_io_channel *io_channel; 1436 struct spdk_fs_channel *fs_channel; 1437 1438 io_channel = spdk_get_io_channel(&fs->io_target, priority); 1439 fs_channel = spdk_io_channel_get_ctx(io_channel); 1440 fs_channel->send_request = fs->send_request; 1441 1442 return io_channel; 1443 } 1444 1445 void 1446 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1447 { 1448 spdk_put_io_channel(channel); 1449 } 1450 1451 void 1452 spdk_fs_set_cache_size(uint64_t size_in_mb) 1453 { 1454 g_fs_cache_size = size_in_mb * 1024 * 1024; 1455 } 1456 1457 uint64_t 1458 spdk_fs_get_cache_size(void) 1459 { 1460 return g_fs_cache_size / (1024 * 1024); 1461 } 1462 1463 static void __file_flush(void *_args); 1464 1465 static void * 1466 alloc_cache_memory_buffer(struct spdk_file *context) 1467 { 1468 struct spdk_file *file; 1469 void *buf; 1470 1471 buf = spdk_mempool_get(g_cache_pool); 1472 if (buf != NULL) { 1473 return buf; 1474 } 1475 1476 pthread_spin_lock(&g_caches_lock); 1477 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1478 if (!file->open_for_writing && 1479 file->priority == SPDK_FILE_PRIORITY_LOW && 1480 file != context) { 1481 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1482 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1483 break; 1484 } 1485 } 1486 pthread_spin_unlock(&g_caches_lock); 1487 if (file != NULL) { 1488 cache_free_buffers(file); 1489 buf = spdk_mempool_get(g_cache_pool); 1490 if (buf != NULL) { 1491 return buf; 1492 } 1493 } 1494 1495 pthread_spin_lock(&g_caches_lock); 1496 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1497 if (!file->open_for_writing && file != context) { 1498 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1499 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1500 break; 1501 } 1502 } 1503 pthread_spin_unlock(&g_caches_lock); 1504 if (file != NULL) { 1505 cache_free_buffers(file); 1506 buf = spdk_mempool_get(g_cache_pool); 1507 if (buf != NULL) { 1508 return buf; 1509 } 1510 } 1511 1512 pthread_spin_lock(&g_caches_lock); 1513 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1514 if (file != context) { 1515 TAILQ_REMOVE(&g_caches, file, cache_tailq); 1516 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1517 break; 1518 } 1519 } 1520 pthread_spin_unlock(&g_caches_lock); 1521 if (file != NULL) { 1522 cache_free_buffers(file); 1523 buf = spdk_mempool_get(g_cache_pool); 1524 if (buf != NULL) { 1525 return buf; 1526 } 1527 } 1528 1529 assert(false); 1530 return NULL; 1531 } 1532 1533 static struct cache_buffer * 1534 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1535 { 1536 struct cache_buffer *buf; 1537 int count = 0; 1538 1539 buf = calloc(1, sizeof(*buf)); 1540 if (buf == NULL) { 1541 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "calloc failed\n"); 1542 return NULL; 1543 } 1544 1545 buf->buf = alloc_cache_memory_buffer(file); 1546 if (buf->buf == NULL) { 1547 while (buf->buf == NULL) { 1548 /* 1549 * TODO: alloc_cache_memory_buffer() should eventually free 1550 * some buffers. Need a more sophisticated check here, instead 1551 * of just bailing if 100 tries does not result in getting a 1552 * free buffer. This will involve using the sync channel's 1553 * semaphore to block until a buffer becomes available. 1554 */ 1555 if (count++ == 100) { 1556 SPDK_ERRLOG("could not allocate cache buffer\n"); 1557 assert(false); 1558 free(buf); 1559 return NULL; 1560 } 1561 buf->buf = alloc_cache_memory_buffer(file); 1562 } 1563 } 1564 1565 buf->buf_size = CACHE_BUFFER_SIZE; 1566 buf->offset = offset; 1567 1568 pthread_spin_lock(&g_caches_lock); 1569 if (file->tree->present_mask == 0) { 1570 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1571 } 1572 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1573 pthread_spin_unlock(&g_caches_lock); 1574 1575 return buf; 1576 } 1577 1578 static struct cache_buffer * 1579 cache_append_buffer(struct spdk_file *file) 1580 { 1581 struct cache_buffer *last; 1582 1583 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1584 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1585 1586 last = cache_insert_buffer(file, file->append_pos); 1587 if (last == NULL) { 1588 SPDK_TRACELOG(SPDK_TRACE_BLOBFS, "cache_insert_buffer failed\n"); 1589 return NULL; 1590 } 1591 1592 if (file->last != NULL) { 1593 file->last->next = last; 1594 } 1595 file->last = last; 1596 1597 return last; 1598 } 1599 1600 static void 1601 __wake_caller(struct spdk_fs_cb_args *args) 1602 { 1603 sem_post(args->sem); 1604 } 1605 1606 static void 1607 __file_cache_finish_sync(struct spdk_file *file) 1608 { 1609 struct spdk_fs_request *sync_req; 1610 struct spdk_fs_cb_args *sync_args; 1611 1612 pthread_spin_lock(&file->lock); 1613 while (!TAILQ_EMPTY(&file->sync_requests)) { 1614 sync_req = TAILQ_FIRST(&file->sync_requests); 1615 sync_args = &sync_req->args; 1616 if (sync_args->op.sync.offset > file->length_flushed) { 1617 break; 1618 } 1619 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1620 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1621 pthread_spin_unlock(&file->lock); 1622 sync_args->fn.file_op(sync_args->arg, 0); 1623 pthread_spin_lock(&file->lock); 1624 free_fs_request(sync_req); 1625 } 1626 pthread_spin_unlock(&file->lock); 1627 } 1628 1629 static void 1630 __file_cache_finish_sync_bs_cb(void *ctx, int bserrno) 1631 { 1632 struct spdk_file *file = ctx; 1633 1634 __file_cache_finish_sync(file); 1635 } 1636 1637 static void 1638 __free_args(struct spdk_fs_cb_args *args) 1639 { 1640 struct spdk_fs_request *req; 1641 1642 if (!args->from_request) { 1643 free(args); 1644 } else { 1645 /* Depends on args being at the start of the spdk_fs_request structure. */ 1646 req = (struct spdk_fs_request *)args; 1647 free_fs_request(req); 1648 } 1649 } 1650 1651 static void 1652 __file_flush_done(void *arg, int bserrno) 1653 { 1654 struct spdk_fs_cb_args *args = arg; 1655 struct spdk_fs_request *sync_req; 1656 struct spdk_file *file = args->file; 1657 struct cache_buffer *next = args->op.flush.cache_buffer; 1658 1659 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1660 1661 pthread_spin_lock(&file->lock); 1662 next->in_progress = false; 1663 next->bytes_flushed += args->op.flush.length; 1664 file->length_flushed += args->op.flush.length; 1665 if (file->length_flushed > file->length) { 1666 file->length = file->length_flushed; 1667 } 1668 if (next->bytes_flushed == next->buf_size) { 1669 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1670 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1671 } 1672 1673 TAILQ_FOREACH_REVERSE(sync_req, &file->sync_requests, sync_requests_head, args.op.sync.tailq) { 1674 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1675 break; 1676 } 1677 } 1678 1679 /* 1680 * Assert that there is no cached data that extends past the end of the underlying 1681 * blob. 1682 */ 1683 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1684 next->bytes_filled == 0); 1685 1686 if (sync_req != NULL) { 1687 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1688 spdk_blob_md_set_xattr(file->blob, "length", &file->length_flushed, 1689 sizeof(file->length_flushed)); 1690 1691 pthread_spin_unlock(&file->lock); 1692 spdk_bs_md_sync_blob(file->blob, __file_cache_finish_sync_bs_cb, file); 1693 } else { 1694 pthread_spin_unlock(&file->lock); 1695 __file_cache_finish_sync(file); 1696 } 1697 1698 __file_flush(args); 1699 } 1700 1701 static void 1702 __file_flush(void *_args) 1703 { 1704 struct spdk_fs_cb_args *args = _args; 1705 struct spdk_file *file = args->file; 1706 struct cache_buffer *next; 1707 uint64_t offset, length, start_page, num_pages; 1708 uint32_t page_size; 1709 1710 pthread_spin_lock(&file->lock); 1711 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1712 if (next == NULL || next->in_progress) { 1713 /* 1714 * There is either no data to flush, or a flush I/O is already in 1715 * progress. So return immediately - if a flush I/O is in 1716 * progress we will flush more data after that is completed. 1717 */ 1718 __free_args(args); 1719 pthread_spin_unlock(&file->lock); 1720 return; 1721 } 1722 1723 offset = next->offset + next->bytes_flushed; 1724 length = next->bytes_filled - next->bytes_flushed; 1725 if (length == 0) { 1726 __free_args(args); 1727 pthread_spin_unlock(&file->lock); 1728 return; 1729 } 1730 args->op.flush.length = length; 1731 args->op.flush.cache_buffer = next; 1732 1733 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1734 1735 next->in_progress = true; 1736 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1737 offset, length, start_page, num_pages); 1738 pthread_spin_unlock(&file->lock); 1739 spdk_bs_io_write_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1740 next->buf + (start_page * page_size) - next->offset, 1741 start_page, num_pages, 1742 __file_flush_done, args); 1743 } 1744 1745 static void 1746 __file_extend_done(void *arg, int bserrno) 1747 { 1748 struct spdk_fs_cb_args *args = arg; 1749 1750 __wake_caller(args); 1751 } 1752 1753 static void 1754 __file_extend_blob(void *_args) 1755 { 1756 struct spdk_fs_cb_args *args = _args; 1757 struct spdk_file *file = args->file; 1758 1759 spdk_bs_md_resize_blob(file->blob, args->op.resize.num_clusters); 1760 1761 spdk_bs_md_sync_blob(file->blob, __file_extend_done, args); 1762 } 1763 1764 static void 1765 __rw_from_file_done(void *arg, int bserrno) 1766 { 1767 struct spdk_fs_cb_args *args = arg; 1768 1769 __wake_caller(args); 1770 __free_args(args); 1771 } 1772 1773 static void 1774 __rw_from_file(void *_args) 1775 { 1776 struct spdk_fs_cb_args *args = _args; 1777 struct spdk_file *file = args->file; 1778 1779 if (args->op.rw.is_read) { 1780 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1781 args->op.rw.offset, args->op.rw.length, 1782 __rw_from_file_done, args); 1783 } else { 1784 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 1785 args->op.rw.offset, args->op.rw.length, 1786 __rw_from_file_done, args); 1787 } 1788 } 1789 1790 static int 1791 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 1792 uint64_t offset, uint64_t length, bool is_read) 1793 { 1794 struct spdk_fs_cb_args *args; 1795 1796 args = calloc(1, sizeof(*args)); 1797 if (args == NULL) { 1798 sem_post(sem); 1799 return -ENOMEM; 1800 } 1801 1802 args->file = file; 1803 args->sem = sem; 1804 args->op.rw.user_buf = payload; 1805 args->op.rw.offset = offset; 1806 args->op.rw.length = length; 1807 args->op.rw.is_read = is_read; 1808 file->fs->send_request(__rw_from_file, args); 1809 return 0; 1810 } 1811 1812 int 1813 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 1814 void *payload, uint64_t offset, uint64_t length) 1815 { 1816 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1817 struct spdk_fs_cb_args *args; 1818 uint64_t rem_length, copy, blob_size, cluster_sz; 1819 uint32_t cache_buffers_filled = 0; 1820 uint8_t *cur_payload; 1821 struct cache_buffer *last; 1822 1823 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 1824 1825 if (length == 0) { 1826 return 0; 1827 } 1828 1829 if (offset != file->append_pos) { 1830 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 1831 return -EINVAL; 1832 } 1833 1834 pthread_spin_lock(&file->lock); 1835 file->open_for_writing = true; 1836 1837 if (file->last == NULL) { 1838 if (file->append_pos % CACHE_BUFFER_SIZE == 0) { 1839 cache_append_buffer(file); 1840 } else { 1841 int rc; 1842 1843 file->append_pos += length; 1844 rc = __send_rw_from_file(file, &channel->sem, payload, 1845 offset, length, false); 1846 pthread_spin_unlock(&file->lock); 1847 sem_wait(&channel->sem); 1848 return rc; 1849 } 1850 } 1851 1852 blob_size = __file_get_blob_size(file); 1853 1854 if ((offset + length) > blob_size) { 1855 struct spdk_fs_cb_args extend_args = {}; 1856 1857 cluster_sz = file->fs->bs_opts.cluster_sz; 1858 extend_args.sem = &channel->sem; 1859 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 1860 extend_args.file = file; 1861 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 1862 pthread_spin_unlock(&file->lock); 1863 file->fs->send_request(__file_extend_blob, &extend_args); 1864 sem_wait(&channel->sem); 1865 } 1866 1867 last = file->last; 1868 rem_length = length; 1869 cur_payload = payload; 1870 while (rem_length > 0) { 1871 copy = last->buf_size - last->bytes_filled; 1872 if (copy > rem_length) { 1873 copy = rem_length; 1874 } 1875 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 1876 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 1877 file->append_pos += copy; 1878 if (file->length < file->append_pos) { 1879 file->length = file->append_pos; 1880 } 1881 cur_payload += copy; 1882 last->bytes_filled += copy; 1883 rem_length -= copy; 1884 if (last->bytes_filled == last->buf_size) { 1885 cache_buffers_filled++; 1886 last = cache_append_buffer(file); 1887 if (last == NULL) { 1888 BLOBFS_TRACE(file, "nomem\n"); 1889 pthread_spin_unlock(&file->lock); 1890 return -ENOMEM; 1891 } 1892 } 1893 } 1894 1895 if (cache_buffers_filled == 0) { 1896 pthread_spin_unlock(&file->lock); 1897 return 0; 1898 } 1899 1900 args = calloc(1, sizeof(*args)); 1901 if (args == NULL) { 1902 pthread_spin_unlock(&file->lock); 1903 return -ENOMEM; 1904 } 1905 1906 args->file = file; 1907 file->fs->send_request(__file_flush, args); 1908 pthread_spin_unlock(&file->lock); 1909 return 0; 1910 } 1911 1912 static void 1913 __readahead_done(void *arg, int bserrno) 1914 { 1915 struct spdk_fs_cb_args *args = arg; 1916 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 1917 struct spdk_file *file = args->file; 1918 1919 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 1920 1921 pthread_spin_lock(&file->lock); 1922 cache_buffer->bytes_filled = args->op.readahead.length; 1923 cache_buffer->bytes_flushed = args->op.readahead.length; 1924 cache_buffer->in_progress = false; 1925 pthread_spin_unlock(&file->lock); 1926 1927 __free_args(args); 1928 } 1929 1930 static void 1931 __readahead(void *_args) 1932 { 1933 struct spdk_fs_cb_args *args = _args; 1934 struct spdk_file *file = args->file; 1935 uint64_t offset, length, start_page, num_pages; 1936 uint32_t page_size; 1937 1938 offset = args->op.readahead.offset; 1939 length = args->op.readahead.length; 1940 assert(length > 0); 1941 1942 __get_page_parameters(file, offset, length, &start_page, &page_size, &num_pages); 1943 1944 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 1945 offset, length, start_page, num_pages); 1946 spdk_bs_io_read_blob(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 1947 args->op.readahead.cache_buffer->buf, 1948 start_page, num_pages, 1949 __readahead_done, args); 1950 } 1951 1952 static uint64_t 1953 __next_cache_buffer_offset(uint64_t offset) 1954 { 1955 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 1956 } 1957 1958 static void 1959 check_readahead(struct spdk_file *file, uint64_t offset) 1960 { 1961 struct spdk_fs_cb_args *args; 1962 1963 offset = __next_cache_buffer_offset(offset); 1964 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 1965 return; 1966 } 1967 1968 args = calloc(1, sizeof(*args)); 1969 if (args == NULL) { 1970 return; 1971 } 1972 1973 BLOBFS_TRACE(file, "offset=%jx\n", offset); 1974 1975 args->file = file; 1976 args->op.readahead.offset = offset; 1977 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 1978 args->op.readahead.cache_buffer->in_progress = true; 1979 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 1980 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 1981 } else { 1982 args->op.readahead.length = CACHE_BUFFER_SIZE; 1983 } 1984 file->fs->send_request(__readahead, args); 1985 } 1986 1987 static int 1988 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 1989 { 1990 struct cache_buffer *buf; 1991 1992 buf = spdk_tree_find_filled_buffer(file->tree, offset); 1993 if (buf == NULL) { 1994 return __send_rw_from_file(file, sem, payload, offset, length, true); 1995 } 1996 1997 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 1998 length = buf->offset + buf->bytes_filled - offset; 1999 } 2000 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2001 memcpy(payload, &buf->buf[offset - buf->offset], length); 2002 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2003 pthread_spin_lock(&g_caches_lock); 2004 spdk_tree_remove_buffer(file->tree, buf); 2005 if (file->tree->present_mask == 0) { 2006 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2007 } 2008 pthread_spin_unlock(&g_caches_lock); 2009 } 2010 2011 sem_post(sem); 2012 return 0; 2013 } 2014 2015 int64_t 2016 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2017 void *payload, uint64_t offset, uint64_t length) 2018 { 2019 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2020 uint64_t final_offset, final_length; 2021 uint32_t sub_reads = 0; 2022 int rc = 0; 2023 2024 pthread_spin_lock(&file->lock); 2025 2026 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2027 2028 file->open_for_writing = false; 2029 2030 if (length == 0 || offset >= file->length) { 2031 pthread_spin_unlock(&file->lock); 2032 return 0; 2033 } 2034 2035 if (offset + length > file->length) { 2036 length = file->length - offset; 2037 } 2038 2039 if (offset != file->next_seq_offset) { 2040 file->seq_byte_count = 0; 2041 } 2042 file->seq_byte_count += length; 2043 file->next_seq_offset = offset + length; 2044 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2045 check_readahead(file, offset); 2046 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2047 } 2048 2049 final_length = 0; 2050 final_offset = offset + length; 2051 while (offset < final_offset) { 2052 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2053 if (length > (final_offset - offset)) { 2054 length = final_offset - offset; 2055 } 2056 rc = __file_read(file, payload, offset, length, &channel->sem); 2057 if (rc == 0) { 2058 final_length += length; 2059 } else { 2060 break; 2061 } 2062 payload += length; 2063 offset += length; 2064 sub_reads++; 2065 } 2066 pthread_spin_unlock(&file->lock); 2067 while (sub_reads-- > 0) { 2068 sem_wait(&channel->sem); 2069 } 2070 if (rc == 0) { 2071 return final_length; 2072 } else { 2073 return rc; 2074 } 2075 } 2076 2077 static void 2078 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2079 spdk_file_op_complete cb_fn, void *cb_arg) 2080 { 2081 struct spdk_fs_request *sync_req; 2082 struct spdk_fs_request *flush_req; 2083 struct spdk_fs_cb_args *sync_args; 2084 struct spdk_fs_cb_args *flush_args; 2085 2086 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2087 2088 pthread_spin_lock(&file->lock); 2089 if (file->append_pos <= file->length_flushed || file->last == NULL) { 2090 BLOBFS_TRACE(file, "done - no data to flush\n"); 2091 pthread_spin_unlock(&file->lock); 2092 cb_fn(cb_arg, 0); 2093 return; 2094 } 2095 2096 sync_req = alloc_fs_request(channel); 2097 assert(sync_req != NULL); 2098 sync_args = &sync_req->args; 2099 2100 flush_req = alloc_fs_request(channel); 2101 assert(flush_req != NULL); 2102 flush_args = &flush_req->args; 2103 2104 sync_args->file = file; 2105 sync_args->fn.file_op = cb_fn; 2106 sync_args->arg = cb_arg; 2107 sync_args->op.sync.offset = file->append_pos; 2108 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2109 pthread_spin_unlock(&file->lock); 2110 2111 flush_args->file = file; 2112 channel->send_request(__file_flush, flush_args); 2113 } 2114 2115 int 2116 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2117 { 2118 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2119 2120 _file_sync(file, channel, __sem_post, &channel->sem); 2121 sem_wait(&channel->sem); 2122 2123 return 0; 2124 } 2125 2126 void 2127 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2128 spdk_file_op_complete cb_fn, void *cb_arg) 2129 { 2130 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2131 2132 _file_sync(file, channel, cb_fn, cb_arg); 2133 } 2134 2135 void 2136 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2137 { 2138 BLOBFS_TRACE(file, "priority=%u\n", priority); 2139 file->priority = priority; 2140 2141 } 2142 2143 /* 2144 * Close routines 2145 */ 2146 2147 static void 2148 __file_close_async_done(void *ctx, int bserrno) 2149 { 2150 struct spdk_fs_request *req = ctx; 2151 struct spdk_fs_cb_args *args = &req->args; 2152 2153 args->fn.file_op(args->arg, bserrno); 2154 free_fs_request(req); 2155 } 2156 2157 static void 2158 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2159 { 2160 pthread_spin_lock(&file->lock); 2161 if (file->ref_count == 0) { 2162 pthread_spin_unlock(&file->lock); 2163 __file_close_async_done(req, -EBADF); 2164 return; 2165 } 2166 2167 file->ref_count--; 2168 if (file->ref_count > 0) { 2169 pthread_spin_unlock(&file->lock); 2170 __file_close_async_done(req, 0); 2171 return; 2172 } 2173 2174 pthread_spin_unlock(&file->lock); 2175 2176 spdk_bs_md_close_blob(&file->blob, __file_close_async_done, req); 2177 } 2178 2179 static void 2180 __file_close_async__sync_done(void *arg, int fserrno) 2181 { 2182 struct spdk_fs_request *req = arg; 2183 struct spdk_fs_cb_args *args = &req->args; 2184 2185 __file_close_async(args->file, req); 2186 } 2187 2188 void 2189 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2190 { 2191 struct spdk_fs_request *req; 2192 struct spdk_fs_cb_args *args; 2193 2194 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2195 if (req == NULL) { 2196 cb_fn(cb_arg, -ENOMEM); 2197 return; 2198 } 2199 2200 args = &req->args; 2201 args->file = file; 2202 args->fn.file_op = cb_fn; 2203 args->arg = cb_arg; 2204 2205 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2206 } 2207 2208 static void 2209 __file_close_done(void *arg, int fserrno) 2210 { 2211 struct spdk_fs_cb_args *args = arg; 2212 2213 args->rc = fserrno; 2214 sem_post(args->sem); 2215 } 2216 2217 static void 2218 __file_close(void *arg) 2219 { 2220 struct spdk_fs_request *req = arg; 2221 struct spdk_fs_cb_args *args = &req->args; 2222 struct spdk_file *file = args->file; 2223 2224 __file_close_async(file, req); 2225 } 2226 2227 int 2228 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2229 { 2230 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2231 struct spdk_fs_request *req; 2232 struct spdk_fs_cb_args *args; 2233 2234 req = alloc_fs_request(channel); 2235 assert(req != NULL); 2236 2237 args = &req->args; 2238 2239 spdk_file_sync(file, _channel); 2240 BLOBFS_TRACE(file, "name=%s\n", file->name); 2241 args->file = file; 2242 args->sem = &channel->sem; 2243 args->fn.file_op = __file_close_done; 2244 args->arg = req; 2245 channel->send_request(__file_close, req); 2246 sem_wait(&channel->sem); 2247 2248 return args->rc; 2249 } 2250 2251 static void 2252 cache_free_buffers(struct spdk_file *file) 2253 { 2254 BLOBFS_TRACE(file, "free=%s\n", file->name); 2255 pthread_spin_lock(&file->lock); 2256 pthread_spin_lock(&g_caches_lock); 2257 if (file->tree->present_mask == 0) { 2258 pthread_spin_unlock(&g_caches_lock); 2259 pthread_spin_unlock(&file->lock); 2260 return; 2261 } 2262 spdk_tree_free_buffers(file->tree); 2263 if (file->tree->present_mask == 0) { 2264 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2265 } 2266 file->last = NULL; 2267 pthread_spin_unlock(&g_caches_lock); 2268 pthread_spin_unlock(&file->lock); 2269 } 2270 2271 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs", SPDK_TRACE_BLOBFS); 2272 SPDK_LOG_REGISTER_TRACE_FLAG("blobfs_rw", SPDK_TRACE_BLOBFS_RW); 2273