1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk/stdinc.h" 35 36 #include "spdk/blobfs.h" 37 #include "spdk/conf.h" 38 #include "blobfs_internal.h" 39 40 #include "spdk/queue.h" 41 #include "spdk/thread.h" 42 #include "spdk/assert.h" 43 #include "spdk/env.h" 44 #include "spdk/util.h" 45 #include "spdk_internal/log.h" 46 47 #define BLOBFS_TRACE(file, str, args...) \ 48 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s " str, file->name, ##args) 49 50 #define BLOBFS_TRACE_RW(file, str, args...) \ 51 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS_RW, "file=%s " str, file->name, ##args) 52 53 #define BLOBFS_DEFAULT_CACHE_SIZE (4ULL * 1024 * 1024 * 1024) 54 #define SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ (1024 * 1024) 55 56 static uint64_t g_fs_cache_size = BLOBFS_DEFAULT_CACHE_SIZE; 57 static struct spdk_mempool *g_cache_pool; 58 static TAILQ_HEAD(, spdk_file) g_caches; 59 static int g_fs_count = 0; 60 static pthread_mutex_t g_cache_init_lock = PTHREAD_MUTEX_INITIALIZER; 61 static pthread_spinlock_t g_caches_lock; 62 63 void 64 spdk_cache_buffer_free(struct cache_buffer *cache_buffer) 65 { 66 spdk_mempool_put(g_cache_pool, cache_buffer->buf); 67 free(cache_buffer); 68 } 69 70 #define CACHE_READAHEAD_THRESHOLD (128 * 1024) 71 72 struct spdk_file { 73 struct spdk_filesystem *fs; 74 struct spdk_blob *blob; 75 char *name; 76 uint64_t length; 77 bool is_deleted; 78 bool open_for_writing; 79 uint64_t length_flushed; 80 uint64_t append_pos; 81 uint64_t seq_byte_count; 82 uint64_t next_seq_offset; 83 uint32_t priority; 84 TAILQ_ENTRY(spdk_file) tailq; 85 spdk_blob_id blobid; 86 uint32_t ref_count; 87 pthread_spinlock_t lock; 88 struct cache_buffer *last; 89 struct cache_tree *tree; 90 TAILQ_HEAD(open_requests_head, spdk_fs_request) open_requests; 91 TAILQ_HEAD(sync_requests_head, spdk_fs_request) sync_requests; 92 TAILQ_ENTRY(spdk_file) cache_tailq; 93 }; 94 95 struct spdk_deleted_file { 96 spdk_blob_id id; 97 TAILQ_ENTRY(spdk_deleted_file) tailq; 98 }; 99 100 struct spdk_filesystem { 101 struct spdk_blob_store *bs; 102 TAILQ_HEAD(, spdk_file) files; 103 struct spdk_bs_opts bs_opts; 104 struct spdk_bs_dev *bdev; 105 fs_send_request_fn send_request; 106 107 struct { 108 uint32_t max_ops; 109 struct spdk_io_channel *sync_io_channel; 110 struct spdk_fs_channel *sync_fs_channel; 111 } sync_target; 112 113 struct { 114 uint32_t max_ops; 115 struct spdk_io_channel *md_io_channel; 116 struct spdk_fs_channel *md_fs_channel; 117 } md_target; 118 119 struct { 120 uint32_t max_ops; 121 } io_target; 122 }; 123 124 struct spdk_fs_cb_args { 125 union { 126 spdk_fs_op_with_handle_complete fs_op_with_handle; 127 spdk_fs_op_complete fs_op; 128 spdk_file_op_with_handle_complete file_op_with_handle; 129 spdk_file_op_complete file_op; 130 spdk_file_stat_op_complete stat_op; 131 } fn; 132 void *arg; 133 sem_t *sem; 134 struct spdk_filesystem *fs; 135 struct spdk_file *file; 136 int rc; 137 bool from_request; 138 union { 139 struct { 140 TAILQ_HEAD(, spdk_deleted_file) deleted_files; 141 } fs_load; 142 struct { 143 uint64_t length; 144 } truncate; 145 struct { 146 struct spdk_io_channel *channel; 147 void *user_buf; 148 void *pin_buf; 149 int is_read; 150 off_t offset; 151 size_t length; 152 uint64_t start_lba; 153 uint64_t num_lba; 154 uint32_t blocklen; 155 } rw; 156 struct { 157 const char *old_name; 158 const char *new_name; 159 } rename; 160 struct { 161 struct cache_buffer *cache_buffer; 162 uint64_t length; 163 } flush; 164 struct { 165 struct cache_buffer *cache_buffer; 166 uint64_t length; 167 uint64_t offset; 168 } readahead; 169 struct { 170 uint64_t offset; 171 TAILQ_ENTRY(spdk_fs_request) tailq; 172 bool xattr_in_progress; 173 } sync; 174 struct { 175 uint32_t num_clusters; 176 } resize; 177 struct { 178 const char *name; 179 uint32_t flags; 180 TAILQ_ENTRY(spdk_fs_request) tailq; 181 } open; 182 struct { 183 const char *name; 184 struct spdk_blob *blob; 185 } create; 186 struct { 187 const char *name; 188 } delete; 189 struct { 190 const char *name; 191 } stat; 192 } op; 193 }; 194 195 static void cache_free_buffers(struct spdk_file *file); 196 197 void 198 spdk_fs_opts_init(struct spdk_blobfs_opts *opts) 199 { 200 opts->cluster_sz = SPDK_BLOBFS_DEFAULT_OPTS_CLUSTER_SZ; 201 } 202 203 static void 204 __initialize_cache(void) 205 { 206 assert(g_cache_pool == NULL); 207 208 g_cache_pool = spdk_mempool_create("spdk_fs_cache", 209 g_fs_cache_size / CACHE_BUFFER_SIZE, 210 CACHE_BUFFER_SIZE, 211 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, 212 SPDK_ENV_SOCKET_ID_ANY); 213 if (!g_cache_pool) { 214 SPDK_ERRLOG("Create mempool failed, you may " 215 "increase the memory and try again\n"); 216 assert(false); 217 } 218 TAILQ_INIT(&g_caches); 219 pthread_spin_init(&g_caches_lock, 0); 220 } 221 222 static void 223 __free_cache(void) 224 { 225 assert(g_cache_pool != NULL); 226 227 spdk_mempool_free(g_cache_pool); 228 g_cache_pool = NULL; 229 } 230 231 static uint64_t 232 __file_get_blob_size(struct spdk_file *file) 233 { 234 uint64_t cluster_sz; 235 236 cluster_sz = file->fs->bs_opts.cluster_sz; 237 return cluster_sz * spdk_blob_get_num_clusters(file->blob); 238 } 239 240 struct spdk_fs_request { 241 struct spdk_fs_cb_args args; 242 TAILQ_ENTRY(spdk_fs_request) link; 243 struct spdk_fs_channel *channel; 244 }; 245 246 struct spdk_fs_channel { 247 struct spdk_fs_request *req_mem; 248 TAILQ_HEAD(, spdk_fs_request) reqs; 249 sem_t sem; 250 struct spdk_filesystem *fs; 251 struct spdk_io_channel *bs_channel; 252 fs_send_request_fn send_request; 253 bool sync; 254 pthread_spinlock_t lock; 255 }; 256 257 static struct spdk_fs_request * 258 alloc_fs_request(struct spdk_fs_channel *channel) 259 { 260 struct spdk_fs_request *req; 261 262 if (channel->sync) { 263 pthread_spin_lock(&channel->lock); 264 } 265 266 req = TAILQ_FIRST(&channel->reqs); 267 if (req) { 268 TAILQ_REMOVE(&channel->reqs, req, link); 269 } 270 271 if (channel->sync) { 272 pthread_spin_unlock(&channel->lock); 273 } 274 275 if (req == NULL) { 276 SPDK_ERRLOG("Cannot allocate req on spdk_fs_channel =%p\n", channel); 277 return NULL; 278 } 279 memset(req, 0, sizeof(*req)); 280 req->channel = channel; 281 req->args.from_request = true; 282 283 return req; 284 } 285 286 static void 287 free_fs_request(struct spdk_fs_request *req) 288 { 289 struct spdk_fs_channel *channel = req->channel; 290 291 if (channel->sync) { 292 pthread_spin_lock(&channel->lock); 293 } 294 295 TAILQ_INSERT_HEAD(&req->channel->reqs, req, link); 296 297 if (channel->sync) { 298 pthread_spin_unlock(&channel->lock); 299 } 300 } 301 302 static int 303 _spdk_fs_channel_create(struct spdk_filesystem *fs, struct spdk_fs_channel *channel, 304 uint32_t max_ops) 305 { 306 uint32_t i; 307 308 channel->req_mem = calloc(max_ops, sizeof(struct spdk_fs_request)); 309 if (!channel->req_mem) { 310 return -1; 311 } 312 313 TAILQ_INIT(&channel->reqs); 314 sem_init(&channel->sem, 0, 0); 315 316 for (i = 0; i < max_ops; i++) { 317 TAILQ_INSERT_TAIL(&channel->reqs, &channel->req_mem[i], link); 318 } 319 320 channel->fs = fs; 321 322 return 0; 323 } 324 325 static int 326 _spdk_fs_md_channel_create(void *io_device, void *ctx_buf) 327 { 328 struct spdk_filesystem *fs; 329 struct spdk_fs_channel *channel = ctx_buf; 330 331 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, md_target); 332 333 return _spdk_fs_channel_create(fs, channel, fs->md_target.max_ops); 334 } 335 336 static int 337 _spdk_fs_sync_channel_create(void *io_device, void *ctx_buf) 338 { 339 struct spdk_filesystem *fs; 340 struct spdk_fs_channel *channel = ctx_buf; 341 342 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, sync_target); 343 344 return _spdk_fs_channel_create(fs, channel, fs->sync_target.max_ops); 345 } 346 347 static int 348 _spdk_fs_io_channel_create(void *io_device, void *ctx_buf) 349 { 350 struct spdk_filesystem *fs; 351 struct spdk_fs_channel *channel = ctx_buf; 352 353 fs = SPDK_CONTAINEROF(io_device, struct spdk_filesystem, io_target); 354 355 return _spdk_fs_channel_create(fs, channel, fs->io_target.max_ops); 356 } 357 358 static void 359 _spdk_fs_channel_destroy(void *io_device, void *ctx_buf) 360 { 361 struct spdk_fs_channel *channel = ctx_buf; 362 363 free(channel->req_mem); 364 if (channel->bs_channel != NULL) { 365 spdk_bs_free_io_channel(channel->bs_channel); 366 } 367 } 368 369 static void 370 __send_request_direct(fs_request_fn fn, void *arg) 371 { 372 fn(arg); 373 } 374 375 static void 376 common_fs_bs_init(struct spdk_filesystem *fs, struct spdk_blob_store *bs) 377 { 378 fs->bs = bs; 379 fs->bs_opts.cluster_sz = spdk_bs_get_cluster_size(bs); 380 fs->md_target.md_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 381 fs->md_target.md_fs_channel->send_request = __send_request_direct; 382 fs->sync_target.sync_fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 383 fs->sync_target.sync_fs_channel->send_request = __send_request_direct; 384 385 pthread_mutex_lock(&g_cache_init_lock); 386 if (g_fs_count == 0) { 387 __initialize_cache(); 388 } 389 g_fs_count++; 390 pthread_mutex_unlock(&g_cache_init_lock); 391 } 392 393 static void 394 init_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 395 { 396 struct spdk_fs_request *req = ctx; 397 struct spdk_fs_cb_args *args = &req->args; 398 struct spdk_filesystem *fs = args->fs; 399 400 if (bserrno == 0) { 401 common_fs_bs_init(fs, bs); 402 } else { 403 free(fs); 404 fs = NULL; 405 } 406 407 args->fn.fs_op_with_handle(args->arg, fs, bserrno); 408 free_fs_request(req); 409 } 410 411 static void 412 fs_conf_parse(void) 413 { 414 struct spdk_conf_section *sp; 415 416 sp = spdk_conf_find_section(NULL, "Blobfs"); 417 if (sp == NULL) { 418 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 419 return; 420 } 421 422 g_fs_cache_buffer_shift = spdk_conf_section_get_intval(sp, "CacheBufferShift"); 423 if (g_fs_cache_buffer_shift <= 0) { 424 g_fs_cache_buffer_shift = CACHE_BUFFER_SHIFT_DEFAULT; 425 } 426 } 427 428 static struct spdk_filesystem * 429 fs_alloc(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn) 430 { 431 struct spdk_filesystem *fs; 432 433 fs = calloc(1, sizeof(*fs)); 434 if (fs == NULL) { 435 return NULL; 436 } 437 438 fs->bdev = dev; 439 fs->send_request = send_request_fn; 440 TAILQ_INIT(&fs->files); 441 442 fs->md_target.max_ops = 512; 443 spdk_io_device_register(&fs->md_target, _spdk_fs_md_channel_create, _spdk_fs_channel_destroy, 444 sizeof(struct spdk_fs_channel), "blobfs_md"); 445 fs->md_target.md_io_channel = spdk_get_io_channel(&fs->md_target); 446 fs->md_target.md_fs_channel = spdk_io_channel_get_ctx(fs->md_target.md_io_channel); 447 448 fs->sync_target.max_ops = 512; 449 spdk_io_device_register(&fs->sync_target, _spdk_fs_sync_channel_create, _spdk_fs_channel_destroy, 450 sizeof(struct spdk_fs_channel), "blobfs_sync"); 451 fs->sync_target.sync_io_channel = spdk_get_io_channel(&fs->sync_target); 452 fs->sync_target.sync_fs_channel = spdk_io_channel_get_ctx(fs->sync_target.sync_io_channel); 453 454 fs->io_target.max_ops = 512; 455 spdk_io_device_register(&fs->io_target, _spdk_fs_io_channel_create, _spdk_fs_channel_destroy, 456 sizeof(struct spdk_fs_channel), "blobfs_io"); 457 458 return fs; 459 } 460 461 static void 462 __wake_caller(void *arg, int fserrno) 463 { 464 struct spdk_fs_cb_args *args = arg; 465 466 args->rc = fserrno; 467 sem_post(args->sem); 468 } 469 470 void 471 spdk_fs_init(struct spdk_bs_dev *dev, struct spdk_blobfs_opts *opt, 472 fs_send_request_fn send_request_fn, 473 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 474 { 475 struct spdk_filesystem *fs; 476 struct spdk_fs_request *req; 477 struct spdk_fs_cb_args *args; 478 struct spdk_bs_opts opts = {}; 479 480 fs = fs_alloc(dev, send_request_fn); 481 if (fs == NULL) { 482 cb_fn(cb_arg, NULL, -ENOMEM); 483 return; 484 } 485 486 fs_conf_parse(); 487 488 req = alloc_fs_request(fs->md_target.md_fs_channel); 489 if (req == NULL) { 490 spdk_put_io_channel(fs->md_target.md_io_channel); 491 spdk_io_device_unregister(&fs->md_target, NULL); 492 spdk_put_io_channel(fs->sync_target.sync_io_channel); 493 spdk_io_device_unregister(&fs->sync_target, NULL); 494 spdk_io_device_unregister(&fs->io_target, NULL); 495 free(fs); 496 cb_fn(cb_arg, NULL, -ENOMEM); 497 return; 498 } 499 500 args = &req->args; 501 args->fn.fs_op_with_handle = cb_fn; 502 args->arg = cb_arg; 503 args->fs = fs; 504 505 spdk_bs_opts_init(&opts); 506 snprintf(opts.bstype.bstype, sizeof(opts.bstype.bstype), "BLOBFS"); 507 if (opt) { 508 opts.cluster_sz = opt->cluster_sz; 509 } 510 spdk_bs_init(dev, &opts, init_cb, req); 511 } 512 513 static struct spdk_file * 514 file_alloc(struct spdk_filesystem *fs) 515 { 516 struct spdk_file *file; 517 518 file = calloc(1, sizeof(*file)); 519 if (file == NULL) { 520 return NULL; 521 } 522 523 file->tree = calloc(1, sizeof(*file->tree)); 524 if (file->tree == NULL) { 525 free(file); 526 return NULL; 527 } 528 529 file->fs = fs; 530 TAILQ_INIT(&file->open_requests); 531 TAILQ_INIT(&file->sync_requests); 532 pthread_spin_init(&file->lock, 0); 533 TAILQ_INSERT_TAIL(&fs->files, file, tailq); 534 file->priority = SPDK_FILE_PRIORITY_LOW; 535 return file; 536 } 537 538 static void fs_load_done(void *ctx, int bserrno); 539 540 static int 541 _handle_deleted_files(struct spdk_fs_request *req) 542 { 543 struct spdk_fs_cb_args *args = &req->args; 544 struct spdk_filesystem *fs = args->fs; 545 546 if (!TAILQ_EMPTY(&args->op.fs_load.deleted_files)) { 547 struct spdk_deleted_file *deleted_file; 548 549 deleted_file = TAILQ_FIRST(&args->op.fs_load.deleted_files); 550 TAILQ_REMOVE(&args->op.fs_load.deleted_files, deleted_file, tailq); 551 spdk_bs_delete_blob(fs->bs, deleted_file->id, fs_load_done, req); 552 free(deleted_file); 553 return 0; 554 } 555 556 return 1; 557 } 558 559 static void 560 fs_load_done(void *ctx, int bserrno) 561 { 562 struct spdk_fs_request *req = ctx; 563 struct spdk_fs_cb_args *args = &req->args; 564 struct spdk_filesystem *fs = args->fs; 565 566 /* The filesystem has been loaded. Now check if there are any files that 567 * were marked for deletion before last unload. Do not complete the 568 * fs_load callback until all of them have been deleted on disk. 569 */ 570 if (_handle_deleted_files(req) == 0) { 571 /* We found a file that's been marked for deleting but not actually 572 * deleted yet. This function will get called again once the delete 573 * operation is completed. 574 */ 575 return; 576 } 577 578 args->fn.fs_op_with_handle(args->arg, fs, 0); 579 free_fs_request(req); 580 581 } 582 583 static void 584 iter_cb(void *ctx, struct spdk_blob *blob, int rc) 585 { 586 struct spdk_fs_request *req = ctx; 587 struct spdk_fs_cb_args *args = &req->args; 588 struct spdk_filesystem *fs = args->fs; 589 uint64_t *length; 590 const char *name; 591 uint32_t *is_deleted; 592 size_t value_len; 593 594 if (rc < 0) { 595 args->fn.fs_op_with_handle(args->arg, fs, rc); 596 free_fs_request(req); 597 return; 598 } 599 600 rc = spdk_blob_get_xattr_value(blob, "name", (const void **)&name, &value_len); 601 if (rc < 0) { 602 args->fn.fs_op_with_handle(args->arg, fs, rc); 603 free_fs_request(req); 604 return; 605 } 606 607 rc = spdk_blob_get_xattr_value(blob, "length", (const void **)&length, &value_len); 608 if (rc < 0) { 609 args->fn.fs_op_with_handle(args->arg, fs, rc); 610 free_fs_request(req); 611 return; 612 } 613 614 assert(value_len == 8); 615 616 /* This file could be deleted last time without close it, then app crashed, so we delete it now */ 617 rc = spdk_blob_get_xattr_value(blob, "is_deleted", (const void **)&is_deleted, &value_len); 618 if (rc < 0) { 619 struct spdk_file *f; 620 621 f = file_alloc(fs); 622 if (f == NULL) { 623 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 624 free_fs_request(req); 625 return; 626 } 627 628 f->name = strdup(name); 629 f->blobid = spdk_blob_get_id(blob); 630 f->length = *length; 631 f->length_flushed = *length; 632 f->append_pos = *length; 633 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "added file %s length=%ju\n", f->name, f->length); 634 } else { 635 struct spdk_deleted_file *deleted_file; 636 637 deleted_file = calloc(1, sizeof(*deleted_file)); 638 if (deleted_file == NULL) { 639 args->fn.fs_op_with_handle(args->arg, fs, -ENOMEM); 640 free_fs_request(req); 641 return; 642 } 643 deleted_file->id = spdk_blob_get_id(blob); 644 TAILQ_INSERT_TAIL(&args->op.fs_load.deleted_files, deleted_file, tailq); 645 } 646 } 647 648 static void 649 load_cb(void *ctx, struct spdk_blob_store *bs, int bserrno) 650 { 651 struct spdk_fs_request *req = ctx; 652 struct spdk_fs_cb_args *args = &req->args; 653 struct spdk_filesystem *fs = args->fs; 654 struct spdk_bs_type bstype; 655 static const struct spdk_bs_type blobfs_type = {"BLOBFS"}; 656 static const struct spdk_bs_type zeros; 657 658 if (bserrno != 0) { 659 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 660 free_fs_request(req); 661 free(fs); 662 return; 663 } 664 665 bstype = spdk_bs_get_bstype(bs); 666 667 if (!memcmp(&bstype, &zeros, sizeof(bstype))) { 668 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "assigning bstype\n"); 669 spdk_bs_set_bstype(bs, blobfs_type); 670 } else if (memcmp(&bstype, &blobfs_type, sizeof(bstype))) { 671 SPDK_DEBUGLOG(SPDK_LOG_BLOB, "not blobfs\n"); 672 SPDK_LOGDUMP(SPDK_LOG_BLOB, "bstype", &bstype, sizeof(bstype)); 673 args->fn.fs_op_with_handle(args->arg, NULL, bserrno); 674 free_fs_request(req); 675 free(fs); 676 return; 677 } 678 679 common_fs_bs_init(fs, bs); 680 fs_load_done(req, 0); 681 } 682 683 static void 684 spdk_fs_io_device_unregister(struct spdk_filesystem *fs) 685 { 686 assert(fs != NULL); 687 spdk_io_device_unregister(&fs->md_target, NULL); 688 spdk_io_device_unregister(&fs->sync_target, NULL); 689 spdk_io_device_unregister(&fs->io_target, NULL); 690 free(fs); 691 } 692 693 static void 694 spdk_fs_free_io_channels(struct spdk_filesystem *fs) 695 { 696 assert(fs != NULL); 697 spdk_fs_free_io_channel(fs->md_target.md_io_channel); 698 spdk_fs_free_io_channel(fs->sync_target.sync_io_channel); 699 } 700 701 void 702 spdk_fs_load(struct spdk_bs_dev *dev, fs_send_request_fn send_request_fn, 703 spdk_fs_op_with_handle_complete cb_fn, void *cb_arg) 704 { 705 struct spdk_filesystem *fs; 706 struct spdk_fs_cb_args *args; 707 struct spdk_fs_request *req; 708 struct spdk_bs_opts bs_opts; 709 710 fs = fs_alloc(dev, send_request_fn); 711 if (fs == NULL) { 712 cb_fn(cb_arg, NULL, -ENOMEM); 713 return; 714 } 715 716 fs_conf_parse(); 717 718 req = alloc_fs_request(fs->md_target.md_fs_channel); 719 if (req == NULL) { 720 spdk_fs_free_io_channels(fs); 721 spdk_fs_io_device_unregister(fs); 722 cb_fn(cb_arg, NULL, -ENOMEM); 723 return; 724 } 725 726 args = &req->args; 727 args->fn.fs_op_with_handle = cb_fn; 728 args->arg = cb_arg; 729 args->fs = fs; 730 TAILQ_INIT(&args->op.fs_load.deleted_files); 731 spdk_bs_opts_init(&bs_opts); 732 bs_opts.iter_cb_fn = iter_cb; 733 bs_opts.iter_cb_arg = req; 734 spdk_bs_load(dev, &bs_opts, load_cb, req); 735 } 736 737 static void 738 unload_cb(void *ctx, int bserrno) 739 { 740 struct spdk_fs_request *req = ctx; 741 struct spdk_fs_cb_args *args = &req->args; 742 struct spdk_filesystem *fs = args->fs; 743 struct spdk_file *file, *tmp; 744 745 TAILQ_FOREACH_SAFE(file, &fs->files, tailq, tmp) { 746 TAILQ_REMOVE(&fs->files, file, tailq); 747 cache_free_buffers(file); 748 free(file->name); 749 free(file->tree); 750 free(file); 751 } 752 753 pthread_mutex_lock(&g_cache_init_lock); 754 g_fs_count--; 755 if (g_fs_count == 0) { 756 __free_cache(); 757 } 758 pthread_mutex_unlock(&g_cache_init_lock); 759 760 args->fn.fs_op(args->arg, bserrno); 761 free(req); 762 763 spdk_fs_io_device_unregister(fs); 764 } 765 766 void 767 spdk_fs_unload(struct spdk_filesystem *fs, spdk_fs_op_complete cb_fn, void *cb_arg) 768 { 769 struct spdk_fs_request *req; 770 struct spdk_fs_cb_args *args; 771 772 /* 773 * We must free the md_channel before unloading the blobstore, so just 774 * allocate this request from the general heap. 775 */ 776 req = calloc(1, sizeof(*req)); 777 if (req == NULL) { 778 cb_fn(cb_arg, -ENOMEM); 779 return; 780 } 781 782 args = &req->args; 783 args->fn.fs_op = cb_fn; 784 args->arg = cb_arg; 785 args->fs = fs; 786 787 spdk_fs_free_io_channels(fs); 788 spdk_bs_unload(fs->bs, unload_cb, req); 789 } 790 791 static struct spdk_file * 792 fs_find_file(struct spdk_filesystem *fs, const char *name) 793 { 794 struct spdk_file *file; 795 796 TAILQ_FOREACH(file, &fs->files, tailq) { 797 if (!strncmp(name, file->name, SPDK_FILE_NAME_MAX)) { 798 return file; 799 } 800 } 801 802 return NULL; 803 } 804 805 void 806 spdk_fs_file_stat_async(struct spdk_filesystem *fs, const char *name, 807 spdk_file_stat_op_complete cb_fn, void *cb_arg) 808 { 809 struct spdk_file_stat stat; 810 struct spdk_file *f = NULL; 811 812 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 813 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 814 return; 815 } 816 817 f = fs_find_file(fs, name); 818 if (f != NULL) { 819 stat.blobid = f->blobid; 820 stat.size = f->append_pos >= f->length ? f->append_pos : f->length; 821 cb_fn(cb_arg, &stat, 0); 822 return; 823 } 824 825 cb_fn(cb_arg, NULL, -ENOENT); 826 } 827 828 static void 829 __copy_stat(void *arg, struct spdk_file_stat *stat, int fserrno) 830 { 831 struct spdk_fs_request *req = arg; 832 struct spdk_fs_cb_args *args = &req->args; 833 834 args->rc = fserrno; 835 if (fserrno == 0) { 836 memcpy(args->arg, stat, sizeof(*stat)); 837 } 838 sem_post(args->sem); 839 } 840 841 static void 842 __file_stat(void *arg) 843 { 844 struct spdk_fs_request *req = arg; 845 struct spdk_fs_cb_args *args = &req->args; 846 847 spdk_fs_file_stat_async(args->fs, args->op.stat.name, 848 args->fn.stat_op, req); 849 } 850 851 int 852 spdk_fs_file_stat(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 853 const char *name, struct spdk_file_stat *stat) 854 { 855 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 856 struct spdk_fs_request *req; 857 int rc; 858 859 req = alloc_fs_request(channel); 860 if (req == NULL) { 861 return -ENOMEM; 862 } 863 864 req->args.fs = fs; 865 req->args.op.stat.name = name; 866 req->args.fn.stat_op = __copy_stat; 867 req->args.arg = stat; 868 req->args.sem = &channel->sem; 869 channel->send_request(__file_stat, req); 870 sem_wait(&channel->sem); 871 872 rc = req->args.rc; 873 free_fs_request(req); 874 875 return rc; 876 } 877 878 static void 879 fs_create_blob_close_cb(void *ctx, int bserrno) 880 { 881 int rc; 882 struct spdk_fs_request *req = ctx; 883 struct spdk_fs_cb_args *args = &req->args; 884 885 rc = args->rc ? args->rc : bserrno; 886 args->fn.file_op(args->arg, rc); 887 free_fs_request(req); 888 } 889 890 static void 891 fs_create_blob_resize_cb(void *ctx, int bserrno) 892 { 893 struct spdk_fs_request *req = ctx; 894 struct spdk_fs_cb_args *args = &req->args; 895 struct spdk_file *f = args->file; 896 struct spdk_blob *blob = args->op.create.blob; 897 uint64_t length = 0; 898 899 args->rc = bserrno; 900 if (bserrno) { 901 spdk_blob_close(blob, fs_create_blob_close_cb, args); 902 return; 903 } 904 905 spdk_blob_set_xattr(blob, "name", f->name, strlen(f->name) + 1); 906 spdk_blob_set_xattr(blob, "length", &length, sizeof(length)); 907 908 spdk_blob_close(blob, fs_create_blob_close_cb, args); 909 } 910 911 static void 912 fs_create_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 913 { 914 struct spdk_fs_request *req = ctx; 915 struct spdk_fs_cb_args *args = &req->args; 916 917 if (bserrno) { 918 args->fn.file_op(args->arg, bserrno); 919 free_fs_request(req); 920 return; 921 } 922 923 args->op.create.blob = blob; 924 spdk_blob_resize(blob, 1, fs_create_blob_resize_cb, req); 925 } 926 927 static void 928 fs_create_blob_create_cb(void *ctx, spdk_blob_id blobid, int bserrno) 929 { 930 struct spdk_fs_request *req = ctx; 931 struct spdk_fs_cb_args *args = &req->args; 932 struct spdk_file *f = args->file; 933 934 if (bserrno) { 935 args->fn.file_op(args->arg, bserrno); 936 free_fs_request(req); 937 return; 938 } 939 940 f->blobid = blobid; 941 spdk_bs_open_blob(f->fs->bs, blobid, fs_create_blob_open_cb, req); 942 } 943 944 void 945 spdk_fs_create_file_async(struct spdk_filesystem *fs, const char *name, 946 spdk_file_op_complete cb_fn, void *cb_arg) 947 { 948 struct spdk_file *file; 949 struct spdk_fs_request *req; 950 struct spdk_fs_cb_args *args; 951 952 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 953 cb_fn(cb_arg, -ENAMETOOLONG); 954 return; 955 } 956 957 file = fs_find_file(fs, name); 958 if (file != NULL) { 959 cb_fn(cb_arg, -EEXIST); 960 return; 961 } 962 963 file = file_alloc(fs); 964 if (file == NULL) { 965 cb_fn(cb_arg, -ENOMEM); 966 return; 967 } 968 969 req = alloc_fs_request(fs->md_target.md_fs_channel); 970 if (req == NULL) { 971 cb_fn(cb_arg, -ENOMEM); 972 return; 973 } 974 975 args = &req->args; 976 args->file = file; 977 args->fn.file_op = cb_fn; 978 args->arg = cb_arg; 979 980 file->name = strdup(name); 981 spdk_bs_create_blob(fs->bs, fs_create_blob_create_cb, args); 982 } 983 984 static void 985 __fs_create_file_done(void *arg, int fserrno) 986 { 987 struct spdk_fs_request *req = arg; 988 struct spdk_fs_cb_args *args = &req->args; 989 990 args->rc = fserrno; 991 sem_post(args->sem); 992 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 993 } 994 995 static void 996 __fs_create_file(void *arg) 997 { 998 struct spdk_fs_request *req = arg; 999 struct spdk_fs_cb_args *args = &req->args; 1000 1001 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.create.name); 1002 spdk_fs_create_file_async(args->fs, args->op.create.name, __fs_create_file_done, req); 1003 } 1004 1005 int 1006 spdk_fs_create_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, const char *name) 1007 { 1008 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1009 struct spdk_fs_request *req; 1010 struct spdk_fs_cb_args *args; 1011 int rc; 1012 1013 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1014 1015 req = alloc_fs_request(channel); 1016 if (req == NULL) { 1017 return -ENOMEM; 1018 } 1019 1020 args = &req->args; 1021 args->fs = fs; 1022 args->op.create.name = name; 1023 args->sem = &channel->sem; 1024 fs->send_request(__fs_create_file, req); 1025 sem_wait(&channel->sem); 1026 rc = args->rc; 1027 free_fs_request(req); 1028 1029 return rc; 1030 } 1031 1032 static void 1033 fs_open_blob_done(void *ctx, struct spdk_blob *blob, int bserrno) 1034 { 1035 struct spdk_fs_request *req = ctx; 1036 struct spdk_fs_cb_args *args = &req->args; 1037 struct spdk_file *f = args->file; 1038 1039 f->blob = blob; 1040 while (!TAILQ_EMPTY(&f->open_requests)) { 1041 req = TAILQ_FIRST(&f->open_requests); 1042 args = &req->args; 1043 TAILQ_REMOVE(&f->open_requests, req, args.op.open.tailq); 1044 args->fn.file_op_with_handle(args->arg, f, bserrno); 1045 free_fs_request(req); 1046 } 1047 } 1048 1049 static void 1050 fs_open_blob_create_cb(void *ctx, int bserrno) 1051 { 1052 struct spdk_fs_request *req = ctx; 1053 struct spdk_fs_cb_args *args = &req->args; 1054 struct spdk_file *file = args->file; 1055 struct spdk_filesystem *fs = args->fs; 1056 1057 if (file == NULL) { 1058 /* 1059 * This is from an open with CREATE flag - the file 1060 * is now created so look it up in the file list for this 1061 * filesystem. 1062 */ 1063 file = fs_find_file(fs, args->op.open.name); 1064 assert(file != NULL); 1065 args->file = file; 1066 } 1067 1068 file->ref_count++; 1069 TAILQ_INSERT_TAIL(&file->open_requests, req, args.op.open.tailq); 1070 if (file->ref_count == 1) { 1071 assert(file->blob == NULL); 1072 spdk_bs_open_blob(fs->bs, file->blobid, fs_open_blob_done, req); 1073 } else if (file->blob != NULL) { 1074 fs_open_blob_done(req, file->blob, 0); 1075 } else { 1076 /* 1077 * The blob open for this file is in progress due to a previous 1078 * open request. When that open completes, it will invoke the 1079 * open callback for this request. 1080 */ 1081 } 1082 } 1083 1084 void 1085 spdk_fs_open_file_async(struct spdk_filesystem *fs, const char *name, uint32_t flags, 1086 spdk_file_op_with_handle_complete cb_fn, void *cb_arg) 1087 { 1088 struct spdk_file *f = NULL; 1089 struct spdk_fs_request *req; 1090 struct spdk_fs_cb_args *args; 1091 1092 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1093 cb_fn(cb_arg, NULL, -ENAMETOOLONG); 1094 return; 1095 } 1096 1097 f = fs_find_file(fs, name); 1098 if (f == NULL && !(flags & SPDK_BLOBFS_OPEN_CREATE)) { 1099 cb_fn(cb_arg, NULL, -ENOENT); 1100 return; 1101 } 1102 1103 if (f != NULL && f->is_deleted == true) { 1104 cb_fn(cb_arg, NULL, -ENOENT); 1105 return; 1106 } 1107 1108 req = alloc_fs_request(fs->md_target.md_fs_channel); 1109 if (req == NULL) { 1110 cb_fn(cb_arg, NULL, -ENOMEM); 1111 return; 1112 } 1113 1114 args = &req->args; 1115 args->fn.file_op_with_handle = cb_fn; 1116 args->arg = cb_arg; 1117 args->file = f; 1118 args->fs = fs; 1119 args->op.open.name = name; 1120 1121 if (f == NULL) { 1122 spdk_fs_create_file_async(fs, name, fs_open_blob_create_cb, req); 1123 } else { 1124 fs_open_blob_create_cb(req, 0); 1125 } 1126 } 1127 1128 static void 1129 __fs_open_file_done(void *arg, struct spdk_file *file, int bserrno) 1130 { 1131 struct spdk_fs_request *req = arg; 1132 struct spdk_fs_cb_args *args = &req->args; 1133 1134 args->file = file; 1135 __wake_caller(args, bserrno); 1136 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1137 } 1138 1139 static void 1140 __fs_open_file(void *arg) 1141 { 1142 struct spdk_fs_request *req = arg; 1143 struct spdk_fs_cb_args *args = &req->args; 1144 1145 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", args->op.open.name); 1146 spdk_fs_open_file_async(args->fs, args->op.open.name, args->op.open.flags, 1147 __fs_open_file_done, req); 1148 } 1149 1150 int 1151 spdk_fs_open_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1152 const char *name, uint32_t flags, struct spdk_file **file) 1153 { 1154 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1155 struct spdk_fs_request *req; 1156 struct spdk_fs_cb_args *args; 1157 int rc; 1158 1159 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1160 1161 req = alloc_fs_request(channel); 1162 if (req == NULL) { 1163 return -ENOMEM; 1164 } 1165 1166 args = &req->args; 1167 args->fs = fs; 1168 args->op.open.name = name; 1169 args->op.open.flags = flags; 1170 args->sem = &channel->sem; 1171 fs->send_request(__fs_open_file, req); 1172 sem_wait(&channel->sem); 1173 rc = args->rc; 1174 if (rc == 0) { 1175 *file = args->file; 1176 } else { 1177 *file = NULL; 1178 } 1179 free_fs_request(req); 1180 1181 return rc; 1182 } 1183 1184 static void 1185 fs_rename_blob_close_cb(void *ctx, int bserrno) 1186 { 1187 struct spdk_fs_request *req = ctx; 1188 struct spdk_fs_cb_args *args = &req->args; 1189 1190 args->fn.fs_op(args->arg, bserrno); 1191 free_fs_request(req); 1192 } 1193 1194 static void 1195 fs_rename_blob_open_cb(void *ctx, struct spdk_blob *blob, int bserrno) 1196 { 1197 struct spdk_fs_request *req = ctx; 1198 struct spdk_fs_cb_args *args = &req->args; 1199 const char *new_name = args->op.rename.new_name; 1200 1201 spdk_blob_set_xattr(blob, "name", new_name, strlen(new_name) + 1); 1202 spdk_blob_close(blob, fs_rename_blob_close_cb, req); 1203 } 1204 1205 static void 1206 __spdk_fs_md_rename_file(struct spdk_fs_request *req) 1207 { 1208 struct spdk_fs_cb_args *args = &req->args; 1209 struct spdk_file *f; 1210 1211 f = fs_find_file(args->fs, args->op.rename.old_name); 1212 if (f == NULL) { 1213 args->fn.fs_op(args->arg, -ENOENT); 1214 free_fs_request(req); 1215 return; 1216 } 1217 1218 free(f->name); 1219 f->name = strdup(args->op.rename.new_name); 1220 args->file = f; 1221 spdk_bs_open_blob(args->fs->bs, f->blobid, fs_rename_blob_open_cb, req); 1222 } 1223 1224 static void 1225 fs_rename_delete_done(void *arg, int fserrno) 1226 { 1227 __spdk_fs_md_rename_file(arg); 1228 } 1229 1230 void 1231 spdk_fs_rename_file_async(struct spdk_filesystem *fs, 1232 const char *old_name, const char *new_name, 1233 spdk_file_op_complete cb_fn, void *cb_arg) 1234 { 1235 struct spdk_file *f; 1236 struct spdk_fs_request *req; 1237 struct spdk_fs_cb_args *args; 1238 1239 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "old=%s new=%s\n", old_name, new_name); 1240 if (strnlen(new_name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1241 cb_fn(cb_arg, -ENAMETOOLONG); 1242 return; 1243 } 1244 1245 req = alloc_fs_request(fs->md_target.md_fs_channel); 1246 if (req == NULL) { 1247 cb_fn(cb_arg, -ENOMEM); 1248 return; 1249 } 1250 1251 args = &req->args; 1252 args->fn.fs_op = cb_fn; 1253 args->fs = fs; 1254 args->arg = cb_arg; 1255 args->op.rename.old_name = old_name; 1256 args->op.rename.new_name = new_name; 1257 1258 f = fs_find_file(fs, new_name); 1259 if (f == NULL) { 1260 __spdk_fs_md_rename_file(req); 1261 return; 1262 } 1263 1264 /* 1265 * The rename overwrites an existing file. So delete the existing file, then 1266 * do the actual rename. 1267 */ 1268 spdk_fs_delete_file_async(fs, new_name, fs_rename_delete_done, req); 1269 } 1270 1271 static void 1272 __fs_rename_file_done(void *arg, int fserrno) 1273 { 1274 struct spdk_fs_request *req = arg; 1275 struct spdk_fs_cb_args *args = &req->args; 1276 1277 __wake_caller(args, fserrno); 1278 } 1279 1280 static void 1281 __fs_rename_file(void *arg) 1282 { 1283 struct spdk_fs_request *req = arg; 1284 struct spdk_fs_cb_args *args = &req->args; 1285 1286 spdk_fs_rename_file_async(args->fs, args->op.rename.old_name, args->op.rename.new_name, 1287 __fs_rename_file_done, req); 1288 } 1289 1290 int 1291 spdk_fs_rename_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1292 const char *old_name, const char *new_name) 1293 { 1294 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1295 struct spdk_fs_request *req; 1296 struct spdk_fs_cb_args *args; 1297 int rc; 1298 1299 req = alloc_fs_request(channel); 1300 if (req == NULL) { 1301 return -ENOMEM; 1302 } 1303 1304 args = &req->args; 1305 1306 args->fs = fs; 1307 args->op.rename.old_name = old_name; 1308 args->op.rename.new_name = new_name; 1309 args->sem = &channel->sem; 1310 fs->send_request(__fs_rename_file, req); 1311 sem_wait(&channel->sem); 1312 rc = args->rc; 1313 free_fs_request(req); 1314 return rc; 1315 } 1316 1317 static void 1318 blob_delete_cb(void *ctx, int bserrno) 1319 { 1320 struct spdk_fs_request *req = ctx; 1321 struct spdk_fs_cb_args *args = &req->args; 1322 1323 args->fn.file_op(args->arg, bserrno); 1324 free_fs_request(req); 1325 } 1326 1327 void 1328 spdk_fs_delete_file_async(struct spdk_filesystem *fs, const char *name, 1329 spdk_file_op_complete cb_fn, void *cb_arg) 1330 { 1331 struct spdk_file *f; 1332 spdk_blob_id blobid; 1333 struct spdk_fs_request *req; 1334 struct spdk_fs_cb_args *args; 1335 1336 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s\n", name); 1337 1338 if (strnlen(name, SPDK_FILE_NAME_MAX + 1) == SPDK_FILE_NAME_MAX + 1) { 1339 cb_fn(cb_arg, -ENAMETOOLONG); 1340 return; 1341 } 1342 1343 f = fs_find_file(fs, name); 1344 if (f == NULL) { 1345 cb_fn(cb_arg, -ENOENT); 1346 return; 1347 } 1348 1349 req = alloc_fs_request(fs->md_target.md_fs_channel); 1350 if (req == NULL) { 1351 cb_fn(cb_arg, -ENOMEM); 1352 return; 1353 } 1354 1355 args = &req->args; 1356 args->fn.file_op = cb_fn; 1357 args->arg = cb_arg; 1358 1359 if (f->ref_count > 0) { 1360 /* If the ref > 0, we mark the file as deleted and delete it when we close it. */ 1361 f->is_deleted = true; 1362 spdk_blob_set_xattr(f->blob, "is_deleted", &f->is_deleted, sizeof(bool)); 1363 spdk_blob_sync_md(f->blob, blob_delete_cb, args); 1364 return; 1365 } 1366 1367 TAILQ_REMOVE(&fs->files, f, tailq); 1368 1369 cache_free_buffers(f); 1370 1371 blobid = f->blobid; 1372 1373 free(f->name); 1374 free(f->tree); 1375 free(f); 1376 1377 spdk_bs_delete_blob(fs->bs, blobid, blob_delete_cb, req); 1378 } 1379 1380 static void 1381 __fs_delete_file_done(void *arg, int fserrno) 1382 { 1383 struct spdk_fs_request *req = arg; 1384 struct spdk_fs_cb_args *args = &req->args; 1385 1386 __wake_caller(args, fserrno); 1387 } 1388 1389 static void 1390 __fs_delete_file(void *arg) 1391 { 1392 struct spdk_fs_request *req = arg; 1393 struct spdk_fs_cb_args *args = &req->args; 1394 1395 spdk_fs_delete_file_async(args->fs, args->op.delete.name, __fs_delete_file_done, req); 1396 } 1397 1398 int 1399 spdk_fs_delete_file(struct spdk_filesystem *fs, struct spdk_io_channel *_channel, 1400 const char *name) 1401 { 1402 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1403 struct spdk_fs_request *req; 1404 struct spdk_fs_cb_args *args; 1405 int rc; 1406 1407 req = alloc_fs_request(channel); 1408 if (req == NULL) { 1409 return -ENOMEM; 1410 } 1411 1412 args = &req->args; 1413 args->fs = fs; 1414 args->op.delete.name = name; 1415 args->sem = &channel->sem; 1416 fs->send_request(__fs_delete_file, req); 1417 sem_wait(&channel->sem); 1418 rc = args->rc; 1419 free_fs_request(req); 1420 1421 return rc; 1422 } 1423 1424 spdk_fs_iter 1425 spdk_fs_iter_first(struct spdk_filesystem *fs) 1426 { 1427 struct spdk_file *f; 1428 1429 f = TAILQ_FIRST(&fs->files); 1430 return f; 1431 } 1432 1433 spdk_fs_iter 1434 spdk_fs_iter_next(spdk_fs_iter iter) 1435 { 1436 struct spdk_file *f = iter; 1437 1438 if (f == NULL) { 1439 return NULL; 1440 } 1441 1442 f = TAILQ_NEXT(f, tailq); 1443 return f; 1444 } 1445 1446 const char * 1447 spdk_file_get_name(struct spdk_file *file) 1448 { 1449 return file->name; 1450 } 1451 1452 uint64_t 1453 spdk_file_get_length(struct spdk_file *file) 1454 { 1455 uint64_t length; 1456 1457 assert(file != NULL); 1458 1459 length = file->append_pos >= file->length ? file->append_pos : file->length; 1460 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s length=0x%jx\n", file->name, length); 1461 return length; 1462 } 1463 1464 static void 1465 fs_truncate_complete_cb(void *ctx, int bserrno) 1466 { 1467 struct spdk_fs_request *req = ctx; 1468 struct spdk_fs_cb_args *args = &req->args; 1469 1470 args->fn.file_op(args->arg, bserrno); 1471 free_fs_request(req); 1472 } 1473 1474 static void 1475 fs_truncate_resize_cb(void *ctx, int bserrno) 1476 { 1477 struct spdk_fs_request *req = ctx; 1478 struct spdk_fs_cb_args *args = &req->args; 1479 struct spdk_file *file = args->file; 1480 uint64_t *length = &args->op.truncate.length; 1481 1482 if (bserrno) { 1483 args->fn.file_op(args->arg, bserrno); 1484 free_fs_request(req); 1485 return; 1486 } 1487 1488 spdk_blob_set_xattr(file->blob, "length", length, sizeof(*length)); 1489 1490 file->length = *length; 1491 if (file->append_pos > file->length) { 1492 file->append_pos = file->length; 1493 } 1494 1495 spdk_blob_sync_md(file->blob, fs_truncate_complete_cb, args); 1496 } 1497 1498 static uint64_t 1499 __bytes_to_clusters(uint64_t length, uint64_t cluster_sz) 1500 { 1501 return (length + cluster_sz - 1) / cluster_sz; 1502 } 1503 1504 void 1505 spdk_file_truncate_async(struct spdk_file *file, uint64_t length, 1506 spdk_file_op_complete cb_fn, void *cb_arg) 1507 { 1508 struct spdk_filesystem *fs; 1509 size_t num_clusters; 1510 struct spdk_fs_request *req; 1511 struct spdk_fs_cb_args *args; 1512 1513 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s old=0x%jx new=0x%jx\n", file->name, file->length, length); 1514 if (length == file->length) { 1515 cb_fn(cb_arg, 0); 1516 return; 1517 } 1518 1519 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 1520 if (req == NULL) { 1521 cb_fn(cb_arg, -ENOMEM); 1522 return; 1523 } 1524 1525 args = &req->args; 1526 args->fn.file_op = cb_fn; 1527 args->arg = cb_arg; 1528 args->file = file; 1529 args->op.truncate.length = length; 1530 fs = file->fs; 1531 1532 num_clusters = __bytes_to_clusters(length, fs->bs_opts.cluster_sz); 1533 1534 spdk_blob_resize(file->blob, num_clusters, fs_truncate_resize_cb, req); 1535 } 1536 1537 static void 1538 __truncate(void *arg) 1539 { 1540 struct spdk_fs_request *req = arg; 1541 struct spdk_fs_cb_args *args = &req->args; 1542 1543 spdk_file_truncate_async(args->file, args->op.truncate.length, 1544 args->fn.file_op, args); 1545 } 1546 1547 int 1548 spdk_file_truncate(struct spdk_file *file, struct spdk_io_channel *_channel, 1549 uint64_t length) 1550 { 1551 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1552 struct spdk_fs_request *req; 1553 struct spdk_fs_cb_args *args; 1554 int rc; 1555 1556 req = alloc_fs_request(channel); 1557 if (req == NULL) { 1558 return -ENOMEM; 1559 } 1560 1561 args = &req->args; 1562 1563 args->file = file; 1564 args->op.truncate.length = length; 1565 args->fn.file_op = __wake_caller; 1566 args->sem = &channel->sem; 1567 1568 channel->send_request(__truncate, req); 1569 sem_wait(&channel->sem); 1570 rc = args->rc; 1571 free_fs_request(req); 1572 1573 return rc; 1574 } 1575 1576 static void 1577 __rw_done(void *ctx, int bserrno) 1578 { 1579 struct spdk_fs_request *req = ctx; 1580 struct spdk_fs_cb_args *args = &req->args; 1581 1582 spdk_dma_free(args->op.rw.pin_buf); 1583 args->fn.file_op(args->arg, bserrno); 1584 free_fs_request(req); 1585 } 1586 1587 static void 1588 __read_done(void *ctx, int bserrno) 1589 { 1590 struct spdk_fs_request *req = ctx; 1591 struct spdk_fs_cb_args *args = &req->args; 1592 1593 assert(req != NULL); 1594 if (args->op.rw.is_read) { 1595 memcpy(args->op.rw.user_buf, 1596 args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1597 args->op.rw.length); 1598 __rw_done(req, 0); 1599 } else { 1600 memcpy(args->op.rw.pin_buf + (args->op.rw.offset & (args->op.rw.blocklen - 1)), 1601 args->op.rw.user_buf, 1602 args->op.rw.length); 1603 spdk_blob_io_write(args->file->blob, args->op.rw.channel, 1604 args->op.rw.pin_buf, 1605 args->op.rw.start_lba, args->op.rw.num_lba, 1606 __rw_done, req); 1607 } 1608 } 1609 1610 static void 1611 __do_blob_read(void *ctx, int fserrno) 1612 { 1613 struct spdk_fs_request *req = ctx; 1614 struct spdk_fs_cb_args *args = &req->args; 1615 1616 if (fserrno) { 1617 __rw_done(req, fserrno); 1618 return; 1619 } 1620 spdk_blob_io_read(args->file->blob, args->op.rw.channel, 1621 args->op.rw.pin_buf, 1622 args->op.rw.start_lba, args->op.rw.num_lba, 1623 __read_done, req); 1624 } 1625 1626 static void 1627 __get_page_parameters(struct spdk_file *file, uint64_t offset, uint64_t length, 1628 uint64_t *start_lba, uint32_t *lba_size, uint64_t *num_lba) 1629 { 1630 uint64_t end_lba; 1631 1632 *lba_size = spdk_bs_get_io_unit_size(file->fs->bs); 1633 *start_lba = offset / *lba_size; 1634 end_lba = (offset + length - 1) / *lba_size; 1635 *num_lba = (end_lba - *start_lba + 1); 1636 } 1637 1638 static void 1639 __readwrite(struct spdk_file *file, struct spdk_io_channel *_channel, 1640 void *payload, uint64_t offset, uint64_t length, 1641 spdk_file_op_complete cb_fn, void *cb_arg, int is_read) 1642 { 1643 struct spdk_fs_request *req; 1644 struct spdk_fs_cb_args *args; 1645 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 1646 uint64_t start_lba, num_lba, pin_buf_length; 1647 uint32_t lba_size; 1648 1649 if (is_read && offset + length > file->length) { 1650 cb_fn(cb_arg, -EINVAL); 1651 return; 1652 } 1653 1654 req = alloc_fs_request(channel); 1655 if (req == NULL) { 1656 cb_fn(cb_arg, -ENOMEM); 1657 return; 1658 } 1659 1660 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 1661 1662 args = &req->args; 1663 args->fn.file_op = cb_fn; 1664 args->arg = cb_arg; 1665 args->file = file; 1666 args->op.rw.channel = channel->bs_channel; 1667 args->op.rw.user_buf = payload; 1668 args->op.rw.is_read = is_read; 1669 args->op.rw.offset = offset; 1670 args->op.rw.length = length; 1671 args->op.rw.blocklen = lba_size; 1672 1673 pin_buf_length = num_lba * lba_size; 1674 args->op.rw.pin_buf = spdk_dma_malloc(pin_buf_length, lba_size, NULL); 1675 if (args->op.rw.pin_buf == NULL) { 1676 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "Failed to allocate buf for: file=%s offset=%jx length=%jx\n", 1677 file->name, offset, length); 1678 free_fs_request(req); 1679 cb_fn(cb_arg, -ENOMEM); 1680 return; 1681 } 1682 1683 args->op.rw.start_lba = start_lba; 1684 args->op.rw.num_lba = num_lba; 1685 1686 if (!is_read && file->length < offset + length) { 1687 spdk_file_truncate_async(file, offset + length, __do_blob_read, req); 1688 } else { 1689 __do_blob_read(req, 0); 1690 } 1691 } 1692 1693 void 1694 spdk_file_write_async(struct spdk_file *file, struct spdk_io_channel *channel, 1695 void *payload, uint64_t offset, uint64_t length, 1696 spdk_file_op_complete cb_fn, void *cb_arg) 1697 { 1698 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 0); 1699 } 1700 1701 void 1702 spdk_file_read_async(struct spdk_file *file, struct spdk_io_channel *channel, 1703 void *payload, uint64_t offset, uint64_t length, 1704 spdk_file_op_complete cb_fn, void *cb_arg) 1705 { 1706 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "file=%s offset=%jx length=%jx\n", 1707 file->name, offset, length); 1708 __readwrite(file, channel, payload, offset, length, cb_fn, cb_arg, 1); 1709 } 1710 1711 struct spdk_io_channel * 1712 spdk_fs_alloc_io_channel(struct spdk_filesystem *fs) 1713 { 1714 struct spdk_io_channel *io_channel; 1715 struct spdk_fs_channel *fs_channel; 1716 1717 io_channel = spdk_get_io_channel(&fs->io_target); 1718 fs_channel = spdk_io_channel_get_ctx(io_channel); 1719 fs_channel->bs_channel = spdk_bs_alloc_io_channel(fs->bs); 1720 fs_channel->send_request = __send_request_direct; 1721 1722 return io_channel; 1723 } 1724 1725 struct spdk_io_channel * 1726 spdk_fs_alloc_io_channel_sync(struct spdk_filesystem *fs) 1727 { 1728 struct spdk_io_channel *io_channel; 1729 struct spdk_fs_channel *fs_channel; 1730 1731 io_channel = spdk_get_io_channel(&fs->io_target); 1732 fs_channel = spdk_io_channel_get_ctx(io_channel); 1733 fs_channel->send_request = fs->send_request; 1734 fs_channel->sync = 1; 1735 pthread_spin_init(&fs_channel->lock, 0); 1736 1737 return io_channel; 1738 } 1739 1740 void 1741 spdk_fs_free_io_channel(struct spdk_io_channel *channel) 1742 { 1743 spdk_put_io_channel(channel); 1744 } 1745 1746 void 1747 spdk_fs_set_cache_size(uint64_t size_in_mb) 1748 { 1749 g_fs_cache_size = size_in_mb * 1024 * 1024; 1750 } 1751 1752 uint64_t 1753 spdk_fs_get_cache_size(void) 1754 { 1755 return g_fs_cache_size / (1024 * 1024); 1756 } 1757 1758 static void __file_flush(void *_args); 1759 1760 static void * 1761 alloc_cache_memory_buffer(struct spdk_file *context) 1762 { 1763 struct spdk_file *file; 1764 void *buf; 1765 1766 buf = spdk_mempool_get(g_cache_pool); 1767 if (buf != NULL) { 1768 return buf; 1769 } 1770 1771 pthread_spin_lock(&g_caches_lock); 1772 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1773 if (!file->open_for_writing && 1774 file->priority == SPDK_FILE_PRIORITY_LOW && 1775 file != context) { 1776 break; 1777 } 1778 } 1779 pthread_spin_unlock(&g_caches_lock); 1780 if (file != NULL) { 1781 cache_free_buffers(file); 1782 buf = spdk_mempool_get(g_cache_pool); 1783 if (buf != NULL) { 1784 return buf; 1785 } 1786 } 1787 1788 pthread_spin_lock(&g_caches_lock); 1789 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1790 if (!file->open_for_writing && file != context) { 1791 break; 1792 } 1793 } 1794 pthread_spin_unlock(&g_caches_lock); 1795 if (file != NULL) { 1796 cache_free_buffers(file); 1797 buf = spdk_mempool_get(g_cache_pool); 1798 if (buf != NULL) { 1799 return buf; 1800 } 1801 } 1802 1803 pthread_spin_lock(&g_caches_lock); 1804 TAILQ_FOREACH(file, &g_caches, cache_tailq) { 1805 if (file != context) { 1806 break; 1807 } 1808 } 1809 pthread_spin_unlock(&g_caches_lock); 1810 if (file != NULL) { 1811 cache_free_buffers(file); 1812 buf = spdk_mempool_get(g_cache_pool); 1813 if (buf != NULL) { 1814 return buf; 1815 } 1816 } 1817 1818 return NULL; 1819 } 1820 1821 static struct cache_buffer * 1822 cache_insert_buffer(struct spdk_file *file, uint64_t offset) 1823 { 1824 struct cache_buffer *buf; 1825 int count = 0; 1826 1827 buf = calloc(1, sizeof(*buf)); 1828 if (buf == NULL) { 1829 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "calloc failed\n"); 1830 return NULL; 1831 } 1832 1833 buf->buf = alloc_cache_memory_buffer(file); 1834 while (buf->buf == NULL) { 1835 /* 1836 * TODO: alloc_cache_memory_buffer() should eventually free 1837 * some buffers. Need a more sophisticated check here, instead 1838 * of just bailing if 100 tries does not result in getting a 1839 * free buffer. This will involve using the sync channel's 1840 * semaphore to block until a buffer becomes available. 1841 */ 1842 if (count++ == 100) { 1843 SPDK_ERRLOG("Could not allocate cache buffer for file=%p on offset=%jx\n", 1844 file, offset); 1845 free(buf); 1846 return NULL; 1847 } 1848 buf->buf = alloc_cache_memory_buffer(file); 1849 } 1850 1851 buf->buf_size = CACHE_BUFFER_SIZE; 1852 buf->offset = offset; 1853 1854 pthread_spin_lock(&g_caches_lock); 1855 if (file->tree->present_mask == 0) { 1856 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 1857 } 1858 file->tree = spdk_tree_insert_buffer(file->tree, buf); 1859 pthread_spin_unlock(&g_caches_lock); 1860 1861 return buf; 1862 } 1863 1864 static struct cache_buffer * 1865 cache_append_buffer(struct spdk_file *file) 1866 { 1867 struct cache_buffer *last; 1868 1869 assert(file->last == NULL || file->last->bytes_filled == file->last->buf_size); 1870 assert((file->append_pos % CACHE_BUFFER_SIZE) == 0); 1871 1872 last = cache_insert_buffer(file, file->append_pos); 1873 if (last == NULL) { 1874 SPDK_DEBUGLOG(SPDK_LOG_BLOBFS, "cache_insert_buffer failed\n"); 1875 return NULL; 1876 } 1877 1878 file->last = last; 1879 1880 return last; 1881 } 1882 1883 static void __check_sync_reqs(struct spdk_file *file); 1884 1885 static void 1886 __file_cache_finish_sync(void *ctx, int bserrno) 1887 { 1888 struct spdk_file *file = ctx; 1889 struct spdk_fs_request *sync_req; 1890 struct spdk_fs_cb_args *sync_args; 1891 1892 pthread_spin_lock(&file->lock); 1893 sync_req = TAILQ_FIRST(&file->sync_requests); 1894 sync_args = &sync_req->args; 1895 assert(sync_args->op.sync.offset <= file->length_flushed); 1896 BLOBFS_TRACE(file, "sync done offset=%jx\n", sync_args->op.sync.offset); 1897 TAILQ_REMOVE(&file->sync_requests, sync_req, args.op.sync.tailq); 1898 pthread_spin_unlock(&file->lock); 1899 1900 sync_args->fn.file_op(sync_args->arg, bserrno); 1901 __check_sync_reqs(file); 1902 1903 pthread_spin_lock(&file->lock); 1904 free_fs_request(sync_req); 1905 pthread_spin_unlock(&file->lock); 1906 } 1907 1908 static void 1909 __free_args(struct spdk_fs_cb_args *args) 1910 { 1911 struct spdk_fs_request *req; 1912 1913 if (!args->from_request) { 1914 free(args); 1915 } else { 1916 /* Depends on args being at the start of the spdk_fs_request structure. */ 1917 req = (struct spdk_fs_request *)args; 1918 free_fs_request(req); 1919 } 1920 } 1921 1922 static void 1923 __check_sync_reqs(struct spdk_file *file) 1924 { 1925 struct spdk_fs_request *sync_req; 1926 1927 pthread_spin_lock(&file->lock); 1928 1929 TAILQ_FOREACH(sync_req, &file->sync_requests, args.op.sync.tailq) { 1930 if (sync_req->args.op.sync.offset <= file->length_flushed) { 1931 break; 1932 } 1933 } 1934 1935 if (sync_req != NULL && !sync_req->args.op.sync.xattr_in_progress) { 1936 BLOBFS_TRACE(file, "set xattr length 0x%jx\n", file->length_flushed); 1937 sync_req->args.op.sync.xattr_in_progress = true; 1938 spdk_blob_set_xattr(file->blob, "length", &file->length_flushed, 1939 sizeof(file->length_flushed)); 1940 1941 pthread_spin_unlock(&file->lock); 1942 spdk_blob_sync_md(file->blob, __file_cache_finish_sync, file); 1943 } else { 1944 pthread_spin_unlock(&file->lock); 1945 } 1946 } 1947 1948 static void 1949 __file_flush_done(void *arg, int bserrno) 1950 { 1951 struct spdk_fs_cb_args *args = arg; 1952 struct spdk_file *file = args->file; 1953 struct cache_buffer *next = args->op.flush.cache_buffer; 1954 1955 BLOBFS_TRACE(file, "length=%jx\n", args->op.flush.length); 1956 1957 pthread_spin_lock(&file->lock); 1958 next->in_progress = false; 1959 next->bytes_flushed += args->op.flush.length; 1960 file->length_flushed += args->op.flush.length; 1961 if (file->length_flushed > file->length) { 1962 file->length = file->length_flushed; 1963 } 1964 if (next->bytes_flushed == next->buf_size) { 1965 BLOBFS_TRACE(file, "write buffer fully flushed 0x%jx\n", file->length_flushed); 1966 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1967 } 1968 1969 /* 1970 * Assert that there is no cached data that extends past the end of the underlying 1971 * blob. 1972 */ 1973 assert(next == NULL || next->offset < __file_get_blob_size(file) || 1974 next->bytes_filled == 0); 1975 1976 pthread_spin_unlock(&file->lock); 1977 1978 __check_sync_reqs(file); 1979 1980 __file_flush(args); 1981 } 1982 1983 static void 1984 __file_flush(void *_args) 1985 { 1986 struct spdk_fs_cb_args *args = _args; 1987 struct spdk_file *file = args->file; 1988 struct cache_buffer *next; 1989 uint64_t offset, length, start_lba, num_lba; 1990 uint32_t lba_size; 1991 1992 pthread_spin_lock(&file->lock); 1993 next = spdk_tree_find_buffer(file->tree, file->length_flushed); 1994 if (next == NULL || next->in_progress) { 1995 /* 1996 * There is either no data to flush, or a flush I/O is already in 1997 * progress. So return immediately - if a flush I/O is in 1998 * progress we will flush more data after that is completed. 1999 */ 2000 __free_args(args); 2001 if (next == NULL) { 2002 /* 2003 * For cases where a file's cache was evicted, and then the 2004 * file was later appended, we will write the data directly 2005 * to disk and bypass cache. So just update length_flushed 2006 * here to reflect that all data was already written to disk. 2007 */ 2008 file->length_flushed = file->append_pos; 2009 } 2010 pthread_spin_unlock(&file->lock); 2011 if (next == NULL) { 2012 /* 2013 * There is no data to flush, but we still need to check for any 2014 * outstanding sync requests to make sure metadata gets updated. 2015 */ 2016 __check_sync_reqs(file); 2017 } 2018 return; 2019 } 2020 2021 offset = next->offset + next->bytes_flushed; 2022 length = next->bytes_filled - next->bytes_flushed; 2023 if (length == 0) { 2024 __free_args(args); 2025 pthread_spin_unlock(&file->lock); 2026 return; 2027 } 2028 args->op.flush.length = length; 2029 args->op.flush.cache_buffer = next; 2030 2031 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2032 2033 next->in_progress = true; 2034 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2035 offset, length, start_lba, num_lba); 2036 pthread_spin_unlock(&file->lock); 2037 spdk_blob_io_write(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2038 next->buf + (start_lba * lba_size) - next->offset, 2039 start_lba, num_lba, __file_flush_done, args); 2040 } 2041 2042 static void 2043 __file_extend_done(void *arg, int bserrno) 2044 { 2045 struct spdk_fs_cb_args *args = arg; 2046 2047 __wake_caller(args, bserrno); 2048 } 2049 2050 static void 2051 __file_extend_resize_cb(void *_args, int bserrno) 2052 { 2053 struct spdk_fs_cb_args *args = _args; 2054 struct spdk_file *file = args->file; 2055 2056 if (bserrno) { 2057 __wake_caller(args, bserrno); 2058 return; 2059 } 2060 2061 spdk_blob_sync_md(file->blob, __file_extend_done, args); 2062 } 2063 2064 static void 2065 __file_extend_blob(void *_args) 2066 { 2067 struct spdk_fs_cb_args *args = _args; 2068 struct spdk_file *file = args->file; 2069 2070 spdk_blob_resize(file->blob, args->op.resize.num_clusters, __file_extend_resize_cb, args); 2071 } 2072 2073 static void 2074 __rw_from_file_done(void *arg, int bserrno) 2075 { 2076 struct spdk_fs_cb_args *args = arg; 2077 2078 __wake_caller(args, bserrno); 2079 __free_args(args); 2080 } 2081 2082 static void 2083 __rw_from_file(void *_args) 2084 { 2085 struct spdk_fs_cb_args *args = _args; 2086 struct spdk_file *file = args->file; 2087 2088 if (args->op.rw.is_read) { 2089 spdk_file_read_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2090 args->op.rw.offset, args->op.rw.length, 2091 __rw_from_file_done, args); 2092 } else { 2093 spdk_file_write_async(file, file->fs->sync_target.sync_io_channel, args->op.rw.user_buf, 2094 args->op.rw.offset, args->op.rw.length, 2095 __rw_from_file_done, args); 2096 } 2097 } 2098 2099 static int 2100 __send_rw_from_file(struct spdk_file *file, sem_t *sem, void *payload, 2101 uint64_t offset, uint64_t length, bool is_read) 2102 { 2103 struct spdk_fs_cb_args *args; 2104 2105 args = calloc(1, sizeof(*args)); 2106 if (args == NULL) { 2107 sem_post(sem); 2108 return -ENOMEM; 2109 } 2110 2111 args->file = file; 2112 args->sem = sem; 2113 args->op.rw.user_buf = payload; 2114 args->op.rw.offset = offset; 2115 args->op.rw.length = length; 2116 args->op.rw.is_read = is_read; 2117 file->fs->send_request(__rw_from_file, args); 2118 return 0; 2119 } 2120 2121 int 2122 spdk_file_write(struct spdk_file *file, struct spdk_io_channel *_channel, 2123 void *payload, uint64_t offset, uint64_t length) 2124 { 2125 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2126 struct spdk_fs_cb_args *args; 2127 uint64_t rem_length, copy, blob_size, cluster_sz; 2128 uint32_t cache_buffers_filled = 0; 2129 uint8_t *cur_payload; 2130 struct cache_buffer *last; 2131 2132 BLOBFS_TRACE_RW(file, "offset=%jx length=%jx\n", offset, length); 2133 2134 if (length == 0) { 2135 return 0; 2136 } 2137 2138 if (offset != file->append_pos) { 2139 BLOBFS_TRACE(file, " error offset=%jx append_pos=%jx\n", offset, file->append_pos); 2140 return -EINVAL; 2141 } 2142 2143 pthread_spin_lock(&file->lock); 2144 file->open_for_writing = true; 2145 2146 if ((file->last == NULL) && (file->append_pos % CACHE_BUFFER_SIZE == 0)) { 2147 cache_append_buffer(file); 2148 } 2149 2150 if (file->last == NULL) { 2151 int rc; 2152 2153 file->append_pos += length; 2154 pthread_spin_unlock(&file->lock); 2155 rc = __send_rw_from_file(file, &channel->sem, payload, 2156 offset, length, false); 2157 sem_wait(&channel->sem); 2158 return rc; 2159 } 2160 2161 blob_size = __file_get_blob_size(file); 2162 2163 if ((offset + length) > blob_size) { 2164 struct spdk_fs_cb_args extend_args = {}; 2165 2166 cluster_sz = file->fs->bs_opts.cluster_sz; 2167 extend_args.sem = &channel->sem; 2168 extend_args.op.resize.num_clusters = __bytes_to_clusters((offset + length), cluster_sz); 2169 extend_args.file = file; 2170 BLOBFS_TRACE(file, "start resize to %u clusters\n", extend_args.op.resize.num_clusters); 2171 pthread_spin_unlock(&file->lock); 2172 file->fs->send_request(__file_extend_blob, &extend_args); 2173 sem_wait(&channel->sem); 2174 if (extend_args.rc) { 2175 return extend_args.rc; 2176 } 2177 } 2178 2179 last = file->last; 2180 rem_length = length; 2181 cur_payload = payload; 2182 while (rem_length > 0) { 2183 copy = last->buf_size - last->bytes_filled; 2184 if (copy > rem_length) { 2185 copy = rem_length; 2186 } 2187 BLOBFS_TRACE_RW(file, " fill offset=%jx length=%jx\n", file->append_pos, copy); 2188 memcpy(&last->buf[last->bytes_filled], cur_payload, copy); 2189 file->append_pos += copy; 2190 if (file->length < file->append_pos) { 2191 file->length = file->append_pos; 2192 } 2193 cur_payload += copy; 2194 last->bytes_filled += copy; 2195 rem_length -= copy; 2196 if (last->bytes_filled == last->buf_size) { 2197 cache_buffers_filled++; 2198 last = cache_append_buffer(file); 2199 if (last == NULL) { 2200 BLOBFS_TRACE(file, "nomem\n"); 2201 pthread_spin_unlock(&file->lock); 2202 return -ENOMEM; 2203 } 2204 } 2205 } 2206 2207 pthread_spin_unlock(&file->lock); 2208 2209 if (cache_buffers_filled == 0) { 2210 return 0; 2211 } 2212 2213 args = calloc(1, sizeof(*args)); 2214 if (args == NULL) { 2215 return -ENOMEM; 2216 } 2217 2218 args->file = file; 2219 file->fs->send_request(__file_flush, args); 2220 return 0; 2221 } 2222 2223 static void 2224 __readahead_done(void *arg, int bserrno) 2225 { 2226 struct spdk_fs_cb_args *args = arg; 2227 struct cache_buffer *cache_buffer = args->op.readahead.cache_buffer; 2228 struct spdk_file *file = args->file; 2229 2230 BLOBFS_TRACE(file, "offset=%jx\n", cache_buffer->offset); 2231 2232 pthread_spin_lock(&file->lock); 2233 cache_buffer->bytes_filled = args->op.readahead.length; 2234 cache_buffer->bytes_flushed = args->op.readahead.length; 2235 cache_buffer->in_progress = false; 2236 pthread_spin_unlock(&file->lock); 2237 2238 __free_args(args); 2239 } 2240 2241 static void 2242 __readahead(void *_args) 2243 { 2244 struct spdk_fs_cb_args *args = _args; 2245 struct spdk_file *file = args->file; 2246 uint64_t offset, length, start_lba, num_lba; 2247 uint32_t lba_size; 2248 2249 offset = args->op.readahead.offset; 2250 length = args->op.readahead.length; 2251 assert(length > 0); 2252 2253 __get_page_parameters(file, offset, length, &start_lba, &lba_size, &num_lba); 2254 2255 BLOBFS_TRACE(file, "offset=%jx length=%jx page start=%jx num=%jx\n", 2256 offset, length, start_lba, num_lba); 2257 spdk_blob_io_read(file->blob, file->fs->sync_target.sync_fs_channel->bs_channel, 2258 args->op.readahead.cache_buffer->buf, 2259 start_lba, num_lba, __readahead_done, args); 2260 } 2261 2262 static uint64_t 2263 __next_cache_buffer_offset(uint64_t offset) 2264 { 2265 return (offset + CACHE_BUFFER_SIZE) & ~(CACHE_TREE_LEVEL_MASK(0)); 2266 } 2267 2268 static void 2269 check_readahead(struct spdk_file *file, uint64_t offset) 2270 { 2271 struct spdk_fs_cb_args *args; 2272 2273 offset = __next_cache_buffer_offset(offset); 2274 if (spdk_tree_find_buffer(file->tree, offset) != NULL || file->length <= offset) { 2275 return; 2276 } 2277 2278 args = calloc(1, sizeof(*args)); 2279 if (args == NULL) { 2280 return; 2281 } 2282 2283 BLOBFS_TRACE(file, "offset=%jx\n", offset); 2284 2285 args->file = file; 2286 args->op.readahead.offset = offset; 2287 args->op.readahead.cache_buffer = cache_insert_buffer(file, offset); 2288 if (!args->op.readahead.cache_buffer) { 2289 BLOBFS_TRACE(file, "Cannot allocate buf for offset=%jx\n", offset); 2290 free(args); 2291 return; 2292 } 2293 2294 args->op.readahead.cache_buffer->in_progress = true; 2295 if (file->length < (offset + CACHE_BUFFER_SIZE)) { 2296 args->op.readahead.length = file->length & (CACHE_BUFFER_SIZE - 1); 2297 } else { 2298 args->op.readahead.length = CACHE_BUFFER_SIZE; 2299 } 2300 file->fs->send_request(__readahead, args); 2301 } 2302 2303 static int 2304 __file_read(struct spdk_file *file, void *payload, uint64_t offset, uint64_t length, sem_t *sem) 2305 { 2306 struct cache_buffer *buf; 2307 int rc; 2308 2309 buf = spdk_tree_find_filled_buffer(file->tree, offset); 2310 if (buf == NULL) { 2311 pthread_spin_unlock(&file->lock); 2312 rc = __send_rw_from_file(file, sem, payload, offset, length, true); 2313 pthread_spin_lock(&file->lock); 2314 return rc; 2315 } 2316 2317 if ((offset + length) > (buf->offset + buf->bytes_filled)) { 2318 length = buf->offset + buf->bytes_filled - offset; 2319 } 2320 BLOBFS_TRACE(file, "read %p offset=%ju length=%ju\n", payload, offset, length); 2321 memcpy(payload, &buf->buf[offset - buf->offset], length); 2322 if ((offset + length) % CACHE_BUFFER_SIZE == 0) { 2323 pthread_spin_lock(&g_caches_lock); 2324 spdk_tree_remove_buffer(file->tree, buf); 2325 if (file->tree->present_mask == 0) { 2326 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2327 } 2328 pthread_spin_unlock(&g_caches_lock); 2329 } 2330 2331 sem_post(sem); 2332 return 0; 2333 } 2334 2335 int64_t 2336 spdk_file_read(struct spdk_file *file, struct spdk_io_channel *_channel, 2337 void *payload, uint64_t offset, uint64_t length) 2338 { 2339 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2340 uint64_t final_offset, final_length; 2341 uint32_t sub_reads = 0; 2342 int rc = 0; 2343 2344 pthread_spin_lock(&file->lock); 2345 2346 BLOBFS_TRACE_RW(file, "offset=%ju length=%ju\n", offset, length); 2347 2348 file->open_for_writing = false; 2349 2350 if (length == 0 || offset >= file->append_pos) { 2351 pthread_spin_unlock(&file->lock); 2352 return 0; 2353 } 2354 2355 if (offset + length > file->append_pos) { 2356 length = file->append_pos - offset; 2357 } 2358 2359 if (offset != file->next_seq_offset) { 2360 file->seq_byte_count = 0; 2361 } 2362 file->seq_byte_count += length; 2363 file->next_seq_offset = offset + length; 2364 if (file->seq_byte_count >= CACHE_READAHEAD_THRESHOLD) { 2365 check_readahead(file, offset); 2366 check_readahead(file, offset + CACHE_BUFFER_SIZE); 2367 } 2368 2369 final_length = 0; 2370 final_offset = offset + length; 2371 while (offset < final_offset) { 2372 length = NEXT_CACHE_BUFFER_OFFSET(offset) - offset; 2373 if (length > (final_offset - offset)) { 2374 length = final_offset - offset; 2375 } 2376 rc = __file_read(file, payload, offset, length, &channel->sem); 2377 if (rc == 0) { 2378 final_length += length; 2379 } else { 2380 break; 2381 } 2382 payload += length; 2383 offset += length; 2384 sub_reads++; 2385 } 2386 pthread_spin_unlock(&file->lock); 2387 while (sub_reads-- > 0) { 2388 sem_wait(&channel->sem); 2389 } 2390 if (rc == 0) { 2391 return final_length; 2392 } else { 2393 return rc; 2394 } 2395 } 2396 2397 static void 2398 _file_sync(struct spdk_file *file, struct spdk_fs_channel *channel, 2399 spdk_file_op_complete cb_fn, void *cb_arg) 2400 { 2401 struct spdk_fs_request *sync_req; 2402 struct spdk_fs_request *flush_req; 2403 struct spdk_fs_cb_args *sync_args; 2404 struct spdk_fs_cb_args *flush_args; 2405 2406 BLOBFS_TRACE(file, "offset=%jx\n", file->append_pos); 2407 2408 pthread_spin_lock(&file->lock); 2409 if (file->append_pos <= file->length_flushed) { 2410 BLOBFS_TRACE(file, "done - no data to flush\n"); 2411 pthread_spin_unlock(&file->lock); 2412 cb_fn(cb_arg, 0); 2413 return; 2414 } 2415 2416 sync_req = alloc_fs_request(channel); 2417 if (!sync_req) { 2418 pthread_spin_unlock(&file->lock); 2419 cb_fn(cb_arg, -ENOMEM); 2420 return; 2421 } 2422 sync_args = &sync_req->args; 2423 2424 flush_req = alloc_fs_request(channel); 2425 if (!flush_req) { 2426 pthread_spin_unlock(&file->lock); 2427 cb_fn(cb_arg, -ENOMEM); 2428 return; 2429 } 2430 flush_args = &flush_req->args; 2431 2432 sync_args->file = file; 2433 sync_args->fn.file_op = cb_fn; 2434 sync_args->arg = cb_arg; 2435 sync_args->op.sync.offset = file->append_pos; 2436 sync_args->op.sync.xattr_in_progress = false; 2437 TAILQ_INSERT_TAIL(&file->sync_requests, sync_req, args.op.sync.tailq); 2438 pthread_spin_unlock(&file->lock); 2439 2440 flush_args->file = file; 2441 channel->send_request(__file_flush, flush_args); 2442 } 2443 2444 int 2445 spdk_file_sync(struct spdk_file *file, struct spdk_io_channel *_channel) 2446 { 2447 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2448 struct spdk_fs_cb_args args = {}; 2449 2450 args.sem = &channel->sem; 2451 _file_sync(file, channel, __wake_caller, &args); 2452 sem_wait(&channel->sem); 2453 2454 return args.rc; 2455 } 2456 2457 void 2458 spdk_file_sync_async(struct spdk_file *file, struct spdk_io_channel *_channel, 2459 spdk_file_op_complete cb_fn, void *cb_arg) 2460 { 2461 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2462 2463 _file_sync(file, channel, cb_fn, cb_arg); 2464 } 2465 2466 void 2467 spdk_file_set_priority(struct spdk_file *file, uint32_t priority) 2468 { 2469 BLOBFS_TRACE(file, "priority=%u\n", priority); 2470 file->priority = priority; 2471 2472 } 2473 2474 /* 2475 * Close routines 2476 */ 2477 2478 static void 2479 __file_close_async_done(void *ctx, int bserrno) 2480 { 2481 struct spdk_fs_request *req = ctx; 2482 struct spdk_fs_cb_args *args = &req->args; 2483 struct spdk_file *file = args->file; 2484 2485 if (file->is_deleted) { 2486 spdk_fs_delete_file_async(file->fs, file->name, blob_delete_cb, ctx); 2487 return; 2488 } 2489 2490 args->fn.file_op(args->arg, bserrno); 2491 free_fs_request(req); 2492 } 2493 2494 static void 2495 __file_close_async(struct spdk_file *file, struct spdk_fs_request *req) 2496 { 2497 struct spdk_blob *blob; 2498 2499 pthread_spin_lock(&file->lock); 2500 if (file->ref_count == 0) { 2501 pthread_spin_unlock(&file->lock); 2502 __file_close_async_done(req, -EBADF); 2503 return; 2504 } 2505 2506 file->ref_count--; 2507 if (file->ref_count > 0) { 2508 pthread_spin_unlock(&file->lock); 2509 req->args.fn.file_op(req->args.arg, 0); 2510 free_fs_request(req); 2511 return; 2512 } 2513 2514 pthread_spin_unlock(&file->lock); 2515 2516 blob = file->blob; 2517 file->blob = NULL; 2518 spdk_blob_close(blob, __file_close_async_done, req); 2519 } 2520 2521 static void 2522 __file_close_async__sync_done(void *arg, int fserrno) 2523 { 2524 struct spdk_fs_request *req = arg; 2525 struct spdk_fs_cb_args *args = &req->args; 2526 2527 __file_close_async(args->file, req); 2528 } 2529 2530 void 2531 spdk_file_close_async(struct spdk_file *file, spdk_file_op_complete cb_fn, void *cb_arg) 2532 { 2533 struct spdk_fs_request *req; 2534 struct spdk_fs_cb_args *args; 2535 2536 req = alloc_fs_request(file->fs->md_target.md_fs_channel); 2537 if (req == NULL) { 2538 cb_fn(cb_arg, -ENOMEM); 2539 return; 2540 } 2541 2542 args = &req->args; 2543 args->file = file; 2544 args->fn.file_op = cb_fn; 2545 args->arg = cb_arg; 2546 2547 spdk_file_sync_async(file, file->fs->md_target.md_io_channel, __file_close_async__sync_done, req); 2548 } 2549 2550 static void 2551 __file_close(void *arg) 2552 { 2553 struct spdk_fs_request *req = arg; 2554 struct spdk_fs_cb_args *args = &req->args; 2555 struct spdk_file *file = args->file; 2556 2557 __file_close_async(file, req); 2558 } 2559 2560 int 2561 spdk_file_close(struct spdk_file *file, struct spdk_io_channel *_channel) 2562 { 2563 struct spdk_fs_channel *channel = spdk_io_channel_get_ctx(_channel); 2564 struct spdk_fs_request *req; 2565 struct spdk_fs_cb_args *args; 2566 2567 req = alloc_fs_request(channel); 2568 if (req == NULL) { 2569 return -ENOMEM; 2570 } 2571 2572 args = &req->args; 2573 2574 spdk_file_sync(file, _channel); 2575 BLOBFS_TRACE(file, "name=%s\n", file->name); 2576 args->file = file; 2577 args->sem = &channel->sem; 2578 args->fn.file_op = __wake_caller; 2579 args->arg = req; 2580 channel->send_request(__file_close, req); 2581 sem_wait(&channel->sem); 2582 2583 return args->rc; 2584 } 2585 2586 int 2587 spdk_file_get_id(struct spdk_file *file, void *id, size_t size) 2588 { 2589 if (size < sizeof(spdk_blob_id)) { 2590 return -EINVAL; 2591 } 2592 2593 memcpy(id, &file->blobid, sizeof(spdk_blob_id)); 2594 2595 return sizeof(spdk_blob_id); 2596 } 2597 2598 static void 2599 cache_free_buffers(struct spdk_file *file) 2600 { 2601 BLOBFS_TRACE(file, "free=%s\n", file->name); 2602 pthread_spin_lock(&file->lock); 2603 pthread_spin_lock(&g_caches_lock); 2604 if (file->tree->present_mask == 0) { 2605 pthread_spin_unlock(&g_caches_lock); 2606 pthread_spin_unlock(&file->lock); 2607 return; 2608 } 2609 spdk_tree_free_buffers(file->tree); 2610 2611 TAILQ_REMOVE(&g_caches, file, cache_tailq); 2612 /* If not freed, put it in the end of the queue */ 2613 if (file->tree->present_mask != 0) { 2614 TAILQ_INSERT_TAIL(&g_caches, file, cache_tailq); 2615 } 2616 file->last = NULL; 2617 pthread_spin_unlock(&g_caches_lock); 2618 pthread_spin_unlock(&file->lock); 2619 } 2620 2621 SPDK_LOG_REGISTER_COMPONENT("blobfs", SPDK_LOG_BLOBFS) 2622 SPDK_LOG_REGISTER_COMPONENT("blobfs_rw", SPDK_LOG_BLOBFS_RW) 2623